fix up libuv close logic

pull/662/head
Jeff Becker 5 years ago
parent d05dd7d526
commit 53d2034a73
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -3,12 +3,19 @@
namespace libuv
{
struct glue
{
virtual ~glue(){};
virtual void
Close() = 0;
};
/// tcp connection glue between llarp and libuv
struct conn_glue
struct conn_glue : public glue
{
uv_tcp_t m_Handle;
uv_connect_t m_Connect;
uv_timer_t m_Ticker;
uv_check_t m_Ticker;
llarp_tcp_connecter* const m_TCP;
llarp_tcp_acceptor* const m_Accept;
llarp_tcp_conn m_Conn;
@ -21,9 +28,10 @@ namespace libuv
{
m_Connect.data = this;
m_Handle.data = this;
m_TCP->impl = this;
uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(loop, &m_Ticker);
uv_check_init(loop, &m_Ticker);
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
}
@ -31,10 +39,11 @@ namespace libuv
conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr)
: m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr)
{
m_Handle.data = this;
m_Connect.data = nullptr;
m_Handle.data = this;
uv_tcp_init(loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(loop, &m_Ticker);
uv_check_init(loop, &m_Ticker);
m_Accept->close = &ExplicitCloseAccept;
m_Conn.write = nullptr;
m_Conn.closed = nullptr;
@ -42,12 +51,13 @@ namespace libuv
conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr)
{
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
m_Handle.data = this;
m_Connect.data = nullptr;
m_Conn.close = &ExplicitClose;
m_Conn.write = &ExplicitWrite;
m_Handle.data = this;
uv_tcp_init(parent->m_Handle.loop, &m_Handle);
m_Ticker.data = this;
uv_timer_init(parent->m_Handle.loop, &m_Ticker);
uv_check_init(parent->m_Handle.loop, &m_Ticker);
}
static void
@ -55,6 +65,7 @@ namespace libuv
{
conn_glue* self = static_cast< conn_glue* >(c->data);
self->HandleConnectResult(status);
c->data = nullptr;
}
bool
@ -151,9 +162,10 @@ namespace libuv
{
llarp::LogError("write failed on tcp: ", uv_strerror(status));
static_cast< conn_glue* >(req->data)->Close();
return;
}
static_cast< conn_glue* >(req->data)->DrainOne();
else
static_cast< conn_glue* >(req->data)->DrainOne();
delete req;
}
void
@ -179,29 +191,52 @@ namespace libuv
static_cast< conn_glue* >(h->data)->HandleClosed();
}
static void
FullClose(uv_handle_t* h)
{
conn_glue* self = static_cast< conn_glue* >(h->data);
h->data = nullptr;
delete self;
llarp::LogInfo("deleted");
}
void
HandleClosed()
{
m_Handle.data = nullptr;
if(m_Accept && m_Accept->closed)
if(m_Accept)
{
if(m_Accept->closed)
m_Accept->closed(m_Accept);
m_Accept->impl = nullptr;
m_Accept->closed(m_Accept);
}
m_Conn.impl = nullptr;
if(m_Conn.closed)
{
m_Conn.closed(&m_Conn);
}
delete this;
m_Conn.impl = nullptr;
llarp::LogInfo("closed");
uv_close((uv_handle_t*)&m_Ticker, &FullClose);
}
static void
OnShutdown(uv_shutdown_t* shut, int code)
{
llarp::LogDebug("shut down ", code);
conn_glue* self = static_cast< conn_glue* >(shut->data);
uv_close((uv_handle_t*)&self->m_Handle, &OnClosed);
delete shut;
}
void
Close()
Close() override
{
llarp::LogDebug("close tcp connection");
uv_timer_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
uv_check_stop(&m_Ticker);
uv_read_stop(Stream());
uv_shutdown_t* shut = new uv_shutdown_t();
shut->data = this;
uv_shutdown(shut, Stream(), &OnShutdown);
}
static void
@ -218,10 +253,9 @@ namespace libuv
}
static void
OnTick(uv_timer_t* t)
OnTick(uv_check_t* t)
{
static_cast< conn_glue* >(t->data)->Tick();
uv_timer_again(t);
}
void
@ -236,7 +270,7 @@ namespace libuv
void
Start()
{
auto result = uv_timer_start((uv_timer_t*)&m_Ticker, &OnTick, 10, 10);
auto result = uv_check_start(&m_Ticker, &OnTick);
if(result)
llarp::LogError("failed to start timer ", uv_strerror(result));
result = uv_read_start(Stream(), &Alloc, &OnRead);
@ -276,7 +310,7 @@ namespace libuv
}
};
struct udp_glue
struct udp_glue : public glue
{
uv_udp_t m_Handle;
uv_check_t m_Ticker;
@ -339,7 +373,9 @@ namespace libuv
SendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz)
{
udp_glue* self = static_cast< udp_glue* >(udp->impl);
uv_buf_t buf = uv_buf_init((char*)ptr, sz);
if(self == nullptr)
return -1;
uv_buf_t buf = uv_buf_init((char*)ptr, sz);
return uv_udp_try_send(&self->m_Handle, &buf, 1, to);
}
@ -371,20 +407,24 @@ namespace libuv
static void
OnClosed(uv_handle_t* h)
{
udp_glue* glue = static_cast< udp_glue* >(h->data);
glue->m_UDP->impl = nullptr;
delete glue;
udp_glue* glue = static_cast< udp_glue* >(h->data);
if(glue)
{
h->data = nullptr;
glue->m_UDP->impl = nullptr;
delete glue;
}
}
void
Close()
Close() override
{
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
};
struct tun_glue
struct tun_glue : public glue
{
uv_poll_t m_Handle;
uv_check_t m_Ticker;
@ -446,12 +486,18 @@ namespace libuv
OnClosed(uv_handle_t* h)
{
tun_glue* self = static_cast< tun_glue* >(h->data);
delete self;
if(self)
{
self->m_Tun->impl = nullptr;
h->data = nullptr;
delete self;
}
}
void
Close()
Close() override
{
uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed);
}
@ -574,8 +620,34 @@ namespace libuv
void
Loop::stop()
{
uv_stop(m_Impl.get());
llarp::LogInfo("stopping event loop");
m_Run.store(false);
CloseAll();
}
void
Loop::CloseAll()
{
llarp::LogInfo("Closing all handles");
uv_walk(
m_Impl.get(),
[](uv_handle_t* h, void*) {
if(uv_is_closing(h))
return;
if(h->data && uv_is_active(h))
{
static_cast< glue* >(h->data)->Close();
}
},
nullptr);
}
void
Loop::stopped()
{
tick(50);
llarp::LogInfo("we have stopped");
}
bool
@ -585,9 +657,9 @@ namespace libuv
udp->impl = impl;
if(impl->Bind())
{
m_CloseFuncs.emplace_back(std::bind(&udp_glue::Close, impl));
return true;
}
delete impl;
return false;
}
@ -610,7 +682,6 @@ namespace libuv
tun->impl = glue;
if(glue->Init(m_Impl.get()))
{
m_CloseFuncs.emplace_back(std::bind(&tun_glue ::Close, glue));
return true;
}
delete glue;

@ -37,13 +37,10 @@ namespace libuv
stop() override;
void
stopped() override
{
for(const auto& func : m_CloseFuncs)
func();
m_CloseFuncs.clear();
llarp::LogInfo("event loop stopped");
}
stopped() override;
void
CloseAll();
bool
udp_listen(llarp_udp_io* l, const sockaddr* src) override;
@ -96,7 +93,6 @@ namespace libuv
std::unique_ptr< uv_loop_t, DestructLoop > m_Impl;
uv_timer_t m_TickTimer;
std::atomic< bool > m_Run;
std::vector< std::function< void(void) > > m_CloseFuncs;
};
} // namespace libuv

Loading…
Cancel
Save