From c6d77e72f2a8166f5a8f7866840d84ae26f10cdb Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 7 Jan 2020 06:59:17 -0500 Subject: [PATCH] fix up tcp connection logic --- llarp/ev/ev.cpp | 1 + llarp/ev/ev.hpp | 2 +- llarp/ev/ev_libuv.cpp | 95 +++++++++++-------------------------------- 3 files changed, 26 insertions(+), 72 deletions(-) diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 4eb65ae8c..1d952c1bc 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -135,6 +135,7 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const llarp_buffer_t &b) if(amount <= 0) { llarp::LogError("write underrun"); + llarp_tcp_conn_close(conn); return false; } buf.underlying.cur += amount; diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index b10400ebd..e8f853061 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -49,7 +49,7 @@ struct llarp_ev_pkt_pipe; #define EV_READ_BUF_SZ (4 * 1024UL) #endif #ifndef EV_WRITE_BUF_SZ -#define EV_WRITE_BUF_SZ (2 * 1024UL) +#define EV_WRITE_BUF_SZ (4 * 1024UL) #endif /// do io and reset errno after diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index a1ee29519..08dbfd9bb 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -29,9 +29,11 @@ namespace libuv WriteEvent() = default; - explicit WriteEvent(WriteBuffer_t buf) : data(std::move(buf)) + explicit WriteEvent(size_t sz, char* ptr) { request.data = this; + data.resize(sz); + std::copy_n(ptr, sz, data.begin()); } uv_buf_t @@ -54,11 +56,9 @@ namespace libuv llarp_tcp_acceptor* const m_Accept; llarp_tcp_conn m_Conn; llarp::Addr m_Addr; - llarp::thread::Queue< WriteBuffer_t > m_WriteQueue; - uv_async_t m_WriteNotify; conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr) - : m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr), m_WriteQueue(32) + : m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr) { m_Connect.data = this; m_Handle.data = this; @@ -66,29 +66,24 @@ namespace libuv uv_tcp_init(loop, &m_Handle); m_Ticker.data = this; uv_check_init(loop, &m_Ticker); - m_Conn.close = &ExplicitClose; - m_Conn.write = &ExplicitWrite; - m_WriteNotify.data = this; - uv_async_init(loop, &m_WriteNotify, &OnShouldWrite); + m_Conn.close = &ExplicitClose; + m_Conn.write = &ExplicitWrite; } conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr) - : m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr), m_WriteQueue(32) + : m_TCP(nullptr), m_Accept(tcp), m_Addr(*addr) { m_Connect.data = nullptr; m_Handle.data = this; uv_tcp_init(loop, &m_Handle); m_Ticker.data = this; uv_check_init(loop, &m_Ticker); - m_Accept->close = &ExplicitCloseAccept; - m_Conn.write = nullptr; - m_Conn.closed = nullptr; - m_WriteNotify.data = this; - uv_async_init(loop, &m_WriteNotify, &OnShouldWrite); + m_Accept->close = &ExplicitCloseAccept; + m_Conn.write = nullptr; + m_Conn.closed = nullptr; } - conn_glue(conn_glue* parent) - : m_TCP(nullptr), m_Accept(nullptr), m_WriteQueue(32) + conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr) { m_Connect.data = nullptr; m_Conn.close = &ExplicitClose; @@ -97,8 +92,6 @@ namespace libuv uv_tcp_init(parent->m_Handle.loop, &m_Handle); m_Ticker.data = this; uv_check_init(parent->m_Handle.loop, &m_Ticker); - m_WriteNotify.data = this; - uv_async_init(parent->m_Handle.loop, &m_WriteNotify, &OnShouldWrite); } static void @@ -221,45 +214,19 @@ namespace libuv { if(uv_is_closing((const uv_handle_t*)&m_Handle)) return -1; - if(uv_is_closing((const uv_handle_t*)&m_WriteNotify)) - return -1; - WriteBuffer_t buf(sz); - std::copy_n(data, sz, buf.begin()); - int result = -1; - if(m_WriteQueue.tryPushBack(std::move(buf)) - == llarp::thread::QueueReturn::Success) - { - result = sz; - } - else - { - WriteFail(); - } - uv_async_send(&m_WriteNotify); - return result; - } - - void - FlushWrite() - { - while(not m_WriteQueue.empty()) - { - auto data = m_WriteQueue.popFront(); - WriteEvent* ev = new WriteEvent(std::move(data)); - auto buf = ev->Buffer(); - if(uv_write(ev->Request(), Stream(), &buf, 1, &OnWritten) != 0) - { - Close(); - return; - } - } + WriteEvent* ev = new WriteEvent(sz, data); + auto buf = ev->Buffer(); + if(uv_write(ev->Request(), Stream(), &buf, 1, &OnWritten) == 0) + return sz; + delete ev; + return -1; } static void OnClosed(uv_handle_t* h) { conn_glue* conn = static_cast< conn_glue* >(h->data); - LoopCall(h, std::bind(&conn_glue::HandleClosed, conn)); + conn->HandleClosed(); } static void @@ -299,25 +266,17 @@ namespace libuv delete shut; } - static void - OnWriteClosed(uv_handle_t* h) - { - conn_glue* conn = static_cast< conn_glue* >(h->data); - auto* shut = new uv_shutdown_t(); - shut->data = conn; - uv_shutdown(shut, conn->Stream(), &OnShutdown); - } - void Close() override { - if(uv_is_closing((uv_handle_t*)&m_WriteNotify)) + if(uv_is_closing((uv_handle_t*)Stream())) return; llarp::LogDebug("close tcp connection"); - m_WriteQueue.disable(); - uv_close((uv_handle_t*)&m_WriteNotify, &OnWriteClosed); uv_check_stop(&m_Ticker); uv_read_stop(Stream()); + auto* shut = new uv_shutdown_t(); + shut->data = this; + uv_shutdown(shut, Stream(), &OnShutdown); } static void @@ -326,7 +285,7 @@ namespace libuv if(status == 0) { conn_glue* conn = static_cast< conn_glue* >(stream->data); - LoopCall(stream, std::bind(&conn_glue::Accept, conn)); + conn->Accept(); } else { @@ -334,17 +293,11 @@ namespace libuv } } - static void - OnShouldWrite(uv_async_t* h) - { - static_cast< conn_glue* >(h->data)->FlushWrite(); - } - static void OnTick(uv_check_t* t) { conn_glue* conn = static_cast< conn_glue* >(t->data); - LoopCall(t, std::bind(&conn_glue::Tick, conn)); + conn->Tick(); } void