|
|
|
@ -1,6 +1,7 @@
|
|
|
|
|
#include <ev/ev_libuv.hpp>
|
|
|
|
|
#include <net/net_addr.hpp>
|
|
|
|
|
#include <util/thread/logic.hpp>
|
|
|
|
|
#include <util/thread/queue.hpp>
|
|
|
|
|
|
|
|
|
|
#include <cstring>
|
|
|
|
|
|
|
|
|
@ -24,6 +25,33 @@ namespace libuv
|
|
|
|
|
/// tcp connection glue between llarp and libuv
|
|
|
|
|
struct conn_glue : public glue
|
|
|
|
|
{
|
|
|
|
|
using WriteBuffer_t = std::vector< char >;
|
|
|
|
|
|
|
|
|
|
struct WriteEvent
|
|
|
|
|
{
|
|
|
|
|
WriteBuffer_t data;
|
|
|
|
|
uv_write_t request;
|
|
|
|
|
|
|
|
|
|
WriteEvent() = default;
|
|
|
|
|
|
|
|
|
|
explicit WriteEvent(WriteBuffer_t buf) : data(std::move(buf))
|
|
|
|
|
{
|
|
|
|
|
request.data = this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uv_buf_t
|
|
|
|
|
Buffer()
|
|
|
|
|
{
|
|
|
|
|
return uv_buf_init(data.data(), data.size());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uv_write_t*
|
|
|
|
|
Request()
|
|
|
|
|
{
|
|
|
|
|
return &request;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
uv_tcp_t m_Handle;
|
|
|
|
|
uv_connect_t m_Connect;
|
|
|
|
|
uv_check_t m_Ticker;
|
|
|
|
@ -31,11 +59,11 @@ namespace libuv
|
|
|
|
|
llarp_tcp_acceptor* const m_Accept;
|
|
|
|
|
llarp_tcp_conn m_Conn;
|
|
|
|
|
llarp::Addr m_Addr;
|
|
|
|
|
|
|
|
|
|
std::deque< std::vector< char > > m_WriteQueue;
|
|
|
|
|
llarp::thread::Queue< WriteBuffer_t > m_WriteQueue;
|
|
|
|
|
uv_async_t m_WriteNotify;
|
|
|
|
|
|
|
|
|
|
conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr)
|
|
|
|
|
: m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr)
|
|
|
|
|
: m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr), m_WriteQueue(32)
|
|
|
|
|
{
|
|
|
|
|
m_Connect.data = this;
|
|
|
|
|
m_Handle.data = this;
|
|
|
|
@ -43,24 +71,29 @@ namespace libuv
|
|
|
|
|
uv_tcp_init(loop, &m_Handle);
|
|
|
|
|
m_Ticker.data = this;
|
|
|
|
|
uv_check_init(loop, &m_Ticker);
|
|
|
|
|
m_Conn.close = &ExplicitClose;
|
|
|
|
|
m_Conn.write = &ExplicitWrite;
|
|
|
|
|
m_Conn.close = &ExplicitClose;
|
|
|
|
|
m_Conn.write = &ExplicitWrite;
|
|
|
|
|
m_WriteNotify.data = this;
|
|
|
|
|
uv_async_init(loop, &m_WriteNotify, &OnShouldWrite);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr)
|
|
|
|
|
: m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr)
|
|
|
|
|
: m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr), m_WriteQueue(32)
|
|
|
|
|
{
|
|
|
|
|
m_Connect.data = nullptr;
|
|
|
|
|
m_Handle.data = this;
|
|
|
|
|
uv_tcp_init(loop, &m_Handle);
|
|
|
|
|
m_Ticker.data = this;
|
|
|
|
|
uv_check_init(loop, &m_Ticker);
|
|
|
|
|
m_Accept->close = &ExplicitCloseAccept;
|
|
|
|
|
m_Conn.write = nullptr;
|
|
|
|
|
m_Conn.closed = nullptr;
|
|
|
|
|
m_Accept->close = &ExplicitCloseAccept;
|
|
|
|
|
m_Conn.write = nullptr;
|
|
|
|
|
m_Conn.closed = nullptr;
|
|
|
|
|
m_WriteNotify.data = this;
|
|
|
|
|
uv_async_init(loop, &m_WriteNotify, &OnShouldWrite);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr)
|
|
|
|
|
conn_glue(conn_glue* parent)
|
|
|
|
|
: m_TCP(nullptr), m_Accept(nullptr), m_WriteQueue(32)
|
|
|
|
|
{
|
|
|
|
|
m_Connect.data = nullptr;
|
|
|
|
|
m_Conn.close = &ExplicitClose;
|
|
|
|
@ -69,6 +102,8 @@ namespace libuv
|
|
|
|
|
uv_tcp_init(parent->m_Handle.loop, &m_Handle);
|
|
|
|
|
m_Ticker.data = this;
|
|
|
|
|
uv_check_init(parent->m_Handle.loop, &m_Ticker);
|
|
|
|
|
m_WriteNotify.data = this;
|
|
|
|
|
uv_async_init(parent->m_Handle.loop, &m_WriteNotify, &OnShouldWrite);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
@ -177,32 +212,50 @@ namespace libuv
|
|
|
|
|
static void
|
|
|
|
|
OnWritten(uv_write_t* req, int status)
|
|
|
|
|
{
|
|
|
|
|
conn_glue* conn = static_cast< conn_glue* >(req->data);
|
|
|
|
|
if(status)
|
|
|
|
|
WriteEvent* ev = static_cast< WriteEvent* >(req->data);
|
|
|
|
|
if(status == 0)
|
|
|
|
|
{
|
|
|
|
|
llarp::LogError("write failed on tcp: ", uv_strerror(status));
|
|
|
|
|
conn->Close();
|
|
|
|
|
llarp::LogDebug("wrote ", ev->data.size());
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
Call(conn->Stream(), std::bind(&conn_glue::DrainOne, conn));
|
|
|
|
|
delete req;
|
|
|
|
|
{
|
|
|
|
|
llarp::LogDebug("write fail");
|
|
|
|
|
}
|
|
|
|
|
delete ev;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
DrainOne()
|
|
|
|
|
int
|
|
|
|
|
WriteAsync(char* data, size_t sz)
|
|
|
|
|
{
|
|
|
|
|
m_WriteQueue.pop_front();
|
|
|
|
|
if(uv_is_closing((const uv_handle_t*)&m_Handle))
|
|
|
|
|
return -1;
|
|
|
|
|
if(uv_is_closing((const uv_handle_t*)&m_WriteNotify))
|
|
|
|
|
return -1;
|
|
|
|
|
WriteBuffer_t buf(sz);
|
|
|
|
|
std::copy_n(data, sz, buf.begin());
|
|
|
|
|
if(m_WriteQueue.pushBack(std::move(buf))
|
|
|
|
|
== llarp::thread::QueueReturn::Success)
|
|
|
|
|
{
|
|
|
|
|
uv_async_send(&m_WriteNotify);
|
|
|
|
|
return sz;
|
|
|
|
|
}
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int
|
|
|
|
|
WriteAsync(char* data, size_t sz)
|
|
|
|
|
void
|
|
|
|
|
FlushWrite()
|
|
|
|
|
{
|
|
|
|
|
m_WriteQueue.emplace_back(sz);
|
|
|
|
|
std::copy_n(data, sz, m_WriteQueue.back().begin());
|
|
|
|
|
auto buf = uv_buf_init(m_WriteQueue.back().data(), sz);
|
|
|
|
|
auto* req = new uv_write_t();
|
|
|
|
|
req->data = this;
|
|
|
|
|
return uv_write(req, Stream(), &buf, 1, &OnWritten) == 0 ? sz : 0;
|
|
|
|
|
while(not m_WriteQueue.empty())
|
|
|
|
|
{
|
|
|
|
|
auto data = m_WriteQueue.popFront();
|
|
|
|
|
WriteEvent* ev = new WriteEvent(std::move(data));
|
|
|
|
|
auto buf = ev->Buffer();
|
|
|
|
|
if(uv_write(ev->Request(), Stream(), &buf, 1, &OnWritten) != 0)
|
|
|
|
|
{
|
|
|
|
|
Close();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
@ -249,15 +302,25 @@ namespace libuv
|
|
|
|
|
delete shut;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
OnWriteClosed(uv_handle_t* h)
|
|
|
|
|
{
|
|
|
|
|
conn_glue* conn = static_cast< conn_glue* >(h->data);
|
|
|
|
|
auto* shut = new uv_shutdown_t();
|
|
|
|
|
shut->data = conn;
|
|
|
|
|
uv_shutdown(shut, conn->Stream(), &OnShutdown);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
Close() override
|
|
|
|
|
{
|
|
|
|
|
if(uv_is_closing((uv_handle_t*)&m_WriteNotify))
|
|
|
|
|
return;
|
|
|
|
|
llarp::LogDebug("close tcp connection");
|
|
|
|
|
m_WriteQueue.disable();
|
|
|
|
|
uv_close((uv_handle_t*)&m_WriteNotify, &OnWriteClosed);
|
|
|
|
|
uv_check_stop(&m_Ticker);
|
|
|
|
|
uv_read_stop(Stream());
|
|
|
|
|
auto* shut = new uv_shutdown_t();
|
|
|
|
|
shut->data = this;
|
|
|
|
|
uv_shutdown(shut, Stream(), &OnShutdown);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
@ -274,6 +337,12 @@ namespace libuv
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
OnShouldWrite(uv_async_t* h)
|
|
|
|
|
{
|
|
|
|
|
static_cast< conn_glue* >(h->data)->FlushWrite();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
OnTick(uv_check_t* t)
|
|
|
|
|
{
|
|
|
|
@ -714,20 +783,19 @@ namespace libuv
|
|
|
|
|
bool
|
|
|
|
|
Loop::init()
|
|
|
|
|
{
|
|
|
|
|
m_Impl.reset(uv_loop_new());
|
|
|
|
|
if(uv_loop_init(m_Impl.get()) == -1)
|
|
|
|
|
if(uv_loop_init(&m_Impl) == -1)
|
|
|
|
|
return false;
|
|
|
|
|
m_Impl->data = this;
|
|
|
|
|
uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
|
|
|
|
|
m_Impl.data = this;
|
|
|
|
|
uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
|
|
|
|
|
m_TickTimer.data = this;
|
|
|
|
|
m_Run.store(true);
|
|
|
|
|
return uv_timer_init(m_Impl.get(), &m_TickTimer) != -1;
|
|
|
|
|
return uv_timer_init(&m_Impl, &m_TickTimer) != -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
Loop::update_time()
|
|
|
|
|
{
|
|
|
|
|
uv_update_time(m_Impl.get());
|
|
|
|
|
uv_update_time(&m_Impl);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool
|
|
|
|
@ -745,7 +813,7 @@ namespace libuv
|
|
|
|
|
bool
|
|
|
|
|
Loop::tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr)
|
|
|
|
|
{
|
|
|
|
|
auto* impl = new conn_glue(m_Impl.get(), tcp, addr);
|
|
|
|
|
auto* impl = new conn_glue(&m_Impl, tcp, addr);
|
|
|
|
|
tcp->impl = impl;
|
|
|
|
|
if(impl->ConnectAsync())
|
|
|
|
|
return true;
|
|
|
|
@ -764,14 +832,14 @@ namespace libuv
|
|
|
|
|
Loop::tick(int ms)
|
|
|
|
|
{
|
|
|
|
|
uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0);
|
|
|
|
|
uv_run(m_Impl.get(), UV_RUN_ONCE);
|
|
|
|
|
uv_run(&m_Impl, UV_RUN_ONCE);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
Loop::stop()
|
|
|
|
|
{
|
|
|
|
|
uv_stop(m_Impl.get());
|
|
|
|
|
uv_stop(&m_Impl);
|
|
|
|
|
llarp::LogInfo("stopping event loop");
|
|
|
|
|
m_Run.store(false);
|
|
|
|
|
CloseAll();
|
|
|
|
@ -782,7 +850,7 @@ namespace libuv
|
|
|
|
|
{
|
|
|
|
|
llarp::LogInfo("Closing all handles");
|
|
|
|
|
uv_walk(
|
|
|
|
|
m_Impl.get(),
|
|
|
|
|
&m_Impl,
|
|
|
|
|
[](uv_handle_t* h, void*) {
|
|
|
|
|
if(uv_is_closing(h))
|
|
|
|
|
return;
|
|
|
|
@ -804,7 +872,7 @@ namespace libuv
|
|
|
|
|
bool
|
|
|
|
|
Loop::udp_listen(llarp_udp_io* udp, const sockaddr* src)
|
|
|
|
|
{
|
|
|
|
|
auto* impl = new udp_glue(m_Impl.get(), udp, src);
|
|
|
|
|
auto* impl = new udp_glue(&m_Impl, udp, src);
|
|
|
|
|
udp->impl = impl;
|
|
|
|
|
if(impl->Bind())
|
|
|
|
|
{
|
|
|
|
@ -817,7 +885,7 @@ namespace libuv
|
|
|
|
|
bool
|
|
|
|
|
Loop::add_ticker(std::function< void(void) > func)
|
|
|
|
|
{
|
|
|
|
|
auto* ticker = new ticker_glue(m_Impl.get(), func);
|
|
|
|
|
auto* ticker = new ticker_glue(&m_Impl, func);
|
|
|
|
|
if(ticker->Start())
|
|
|
|
|
{
|
|
|
|
|
return true;
|
|
|
|
@ -843,7 +911,7 @@ namespace libuv
|
|
|
|
|
{
|
|
|
|
|
auto* glue = new tun_glue(tun);
|
|
|
|
|
tun->impl = glue;
|
|
|
|
|
if(glue->Init(m_Impl.get()))
|
|
|
|
|
if(glue->Init(&m_Impl))
|
|
|
|
|
{
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -854,7 +922,7 @@ namespace libuv
|
|
|
|
|
bool
|
|
|
|
|
Loop::tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr)
|
|
|
|
|
{
|
|
|
|
|
auto* glue = new conn_glue(m_Impl.get(), tcp, addr);
|
|
|
|
|
auto* glue = new conn_glue(&m_Impl, tcp, addr);
|
|
|
|
|
tcp->impl = glue;
|
|
|
|
|
if(glue->Server())
|
|
|
|
|
return true;
|
|
|
|
@ -866,7 +934,7 @@ namespace libuv
|
|
|
|
|
bool
|
|
|
|
|
Loop::add_pipe(llarp_ev_pkt_pipe* p)
|
|
|
|
|
{
|
|
|
|
|
auto* glue = new pipe_glue(m_Impl.get(), p);
|
|
|
|
|
auto* glue = new pipe_glue(&m_Impl, p);
|
|
|
|
|
if(glue->Start())
|
|
|
|
|
return true;
|
|
|
|
|
delete glue;
|
|
|
|
|