From 42ffbcca0a41a71d6a221b225a6568662346cf48 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 22 Feb 2021 08:26:32 -0500 Subject: [PATCH] try coleasing inbound packets from iwp --- llarp/ev/ev.hpp | 27 ++++- llarp/ev/ev_libuv.cpp | 64 ++++++++++++ llarp/ev/ev_libuv.hpp | 11 ++ llarp/iwp/iwp.cpp | 30 +++++- llarp/iwp/iwp.hpp | 2 + llarp/iwp/linklayer.cpp | 182 ++++++++++++++++++++-------------- llarp/iwp/linklayer.hpp | 107 +++++++++++--------- llarp/iwp/session.cpp | 111 +++++++++++---------- llarp/iwp/session.hpp | 17 ++-- llarp/router/router.cpp | 2 + test/iwp/test_iwp_session.cpp | 1 + 11 files changed, 366 insertions(+), 188 deletions(-) diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 1737a80bd..5dd0f5ad9 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -59,12 +59,31 @@ namespace llarp class NetworkInterface; } + /// distinct event loop waker uper + class EventLoopWakeup + { + protected: + std::function callback; + + public: + EventLoopWakeup(std::function cb) : callback{cb} + {} + + virtual ~EventLoopWakeup() = default; + + /// async wakeup and call callback once + virtual void + Wakeup() = 0; + + /// end operation + virtual void + End() = 0; + }; + // this (nearly!) abstract base class // is overriden for each platform struct EventLoop { - byte_t readbuf[EV_READ_BUF_SZ] = {0}; - virtual bool init() = 0; @@ -127,6 +146,10 @@ namespace llarp virtual void deregister_poll_fd_readable(int fd) = 0; + + /// make an event loop waker on this event loop + virtual EventLoopWakeup* + make_event_loop_waker(std::function callback) = 0; }; } // namespace llarp #endif diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index d7cde44f8..4af6eec7a 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -20,6 +20,54 @@ namespace libuv Close() = 0; }; + class UVWakeup final : public llarp::EventLoopWakeup, public glue + { + uv_async_t m_Impl; + const int m_Idx; + static void + OnWake(uv_async_t* self) + { + static_cast(self->data)->callback(); + } + + public: + UVWakeup(uv_loop_t* loop, std::function hook, int idx) + : llarp::EventLoopWakeup{hook}, m_Idx{idx} + { + uv_async_init(loop, &m_Impl, OnWake); + m_Impl.data = this; + } + + ~UVWakeup() = default; + + void + Close() override + { + uv_close((uv_handle_t*)&m_Impl, [](uv_handle_t* h) { + auto loop = static_cast(h->loop->data); + loop->delete_waker(static_cast(h->data)->m_Idx); + }); + } + + void + End() override + { + Close(); + } + + void + Wakeup() override + { + uv_async_send(&m_Impl); + } + + bool + operator<(const UVWakeup& other) const + { + return m_Idx < other.m_Idx; + } + }; + struct ticker_glue : public glue { std::function func; @@ -622,4 +670,20 @@ namespace libuv } } + llarp::EventLoopWakeup* + Loop::make_event_loop_waker(std::function callback) + { + auto wake_idx = m_NumWakers++; + auto wake = new UVWakeup{&m_Impl, callback, wake_idx}; + m_Wakers[wake_idx] = wake; + return wake; + } + + void + Loop::delete_waker(int idx) + { + delete m_Wakers[idx]; + m_Wakers.erase(idx); + } + } // namespace libuv diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index afb86c105..4eb162c40 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -12,6 +12,8 @@ namespace libuv { + class UVWakeup; + struct Loop final : public llarp::EventLoop { typedef std::function Callback; @@ -96,6 +98,12 @@ namespace libuv void set_pump_function(std::function pumpll) override; + llarp::EventLoopWakeup* + make_event_loop_waker(std::function callback) override; + + void + delete_waker(int idx); + void FlushLogic(); @@ -122,6 +130,9 @@ namespace libuv llarp::thread::Queue m_timerQueue; llarp::thread::Queue m_timerCancelQueue; std::optional m_EventLoopThreadID; + + int m_NumWakers; + std::unordered_map m_Wakers; }; } // namespace libuv diff --git a/llarp/iwp/iwp.cpp b/llarp/iwp/iwp.cpp index 038c03e28..14ab566d3 100644 --- a/llarp/iwp/iwp.cpp +++ b/llarp/iwp/iwp.cpp @@ -10,6 +10,7 @@ namespace llarp LinkLayer_ptr NewInboundLink( std::shared_ptr keyManager, + std::shared_ptr loop, GetRCFunc getrc, LinkMessageHandler h, SignBufferFunc sign, @@ -22,12 +23,25 @@ namespace llarp WorkerFunc_t work) { return std::make_shared( - keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, work, true); + keyManager, + loop, + getrc, + h, + sign, + before, + est, + reneg, + timeout, + closed, + pumpDone, + work, + true); } LinkLayer_ptr NewOutboundLink( std::shared_ptr keyManager, + std::shared_ptr loop, GetRCFunc getrc, LinkMessageHandler h, SignBufferFunc sign, @@ -40,7 +54,19 @@ namespace llarp WorkerFunc_t work) { return std::make_shared( - keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, work, false); + keyManager, + loop, + getrc, + h, + sign, + before, + est, + reneg, + timeout, + closed, + pumpDone, + work, + false); } } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/iwp.hpp b/llarp/iwp/iwp.hpp index 1b9f71596..2fb15ea82 100644 --- a/llarp/iwp/iwp.hpp +++ b/llarp/iwp/iwp.hpp @@ -11,6 +11,7 @@ namespace llarp::iwp LinkLayer_ptr NewInboundLink( std::shared_ptr keyManager, + std::shared_ptr loop, GetRCFunc getrc, LinkMessageHandler h, SignBufferFunc sign, @@ -25,6 +26,7 @@ namespace llarp::iwp LinkLayer_ptr NewOutboundLink( std::shared_ptr keyManager, + std::shared_ptr loop, GetRCFunc getrc, LinkMessageHandler h, SignBufferFunc sign, diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index a402f1d72..d5f8fd0e0 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -4,96 +4,126 @@ #include #include -namespace llarp +namespace llarp::iwp { - namespace iwp + LinkLayer::LinkLayer( + std::shared_ptr keyManager, + std::shared_ptr ev, + GetRCFunc getrc, + LinkMessageHandler h, + SignBufferFunc sign, + BeforeConnectFunc_t before, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, + TimeoutHandler timeout, + SessionClosedHandler closed, + PumpDoneHandler pumpDone, + WorkerFunc_t worker, + bool allowInbound) + : ILinkLayer( + keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker) + , m_Wakeup{ev->make_event_loop_waker([self = this]() { self->HandleWakeupPlaintext(); })} + , m_PlaintextRecv{1024} + , permitInbound{allowInbound} + + {} + + LinkLayer::~LinkLayer() { - LinkLayer::LinkLayer( - std::shared_ptr keyManager, - GetRCFunc getrc, - LinkMessageHandler h, - SignBufferFunc sign, - BeforeConnectFunc_t before, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, - TimeoutHandler timeout, - SessionClosedHandler closed, - PumpDoneHandler pumpDone, - WorkerFunc_t worker, - bool allowInbound) - : ILinkLayer( - keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker) - , permitInbound{allowInbound} - {} + m_Wakeup->End(); + } - LinkLayer::~LinkLayer() = default; + const char* + LinkLayer::Name() const + { + return "iwp"; + } - const char* - LinkLayer::Name() const + uint16_t + LinkLayer::Rank() const + { + return 2; + } + + void + LinkLayer::RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) + { + std::shared_ptr session; + auto itr = m_AuthedAddrs.find(from); + bool isNewSession = false; + if (itr == m_AuthedAddrs.end()) { - return "iwp"; + Lock_t lock(m_PendingMutex); + if (m_Pending.count(from) == 0) + { + if (not permitInbound) + return; + isNewSession = true; + m_Pending.insert({from, std::make_shared(this, from)}); + } + session = m_Pending.find(from)->second; } - - uint16_t - LinkLayer::Rank() const + else { - return 2; + Lock_t lock(m_AuthedLinksMutex); + auto range = m_AuthedLinks.equal_range(itr->second); + session = range.first->second; } - - void - LinkLayer::RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) + if (session) { - std::shared_ptr session; - auto itr = m_AuthedAddrs.find(from); - bool isNewSession = false; - if (itr == m_AuthedAddrs.end()) - { - Lock_t lock(m_PendingMutex); - if (m_Pending.count(from) == 0) - { - if (not permitInbound) - return; - isNewSession = true; - m_Pending.insert({from, std::make_shared(this, from)}); - } - session = m_Pending.find(from)->second; - } - else - { - Lock_t lock(m_AuthedLinksMutex); - auto range = m_AuthedLinks.equal_range(itr->second); - session = range.first->second; - } - if (session) + bool success = session->Recv_LL(std::move(pkt)); + if (!success and isNewSession) { - bool success = session->Recv_LL(std::move(pkt)); - if (!success and isNewSession) - { - LogWarn("Brand new session failed; removing from pending sessions list"); - m_Pending.erase(m_Pending.find(from)); - } + LogWarn("Brand new session failed; removing from pending sessions list"); + m_Pending.erase(m_Pending.find(from)); } } + } - bool - LinkLayer::MapAddr(const RouterID& r, ILinkSession* s) - { - if (!ILinkLayer::MapAddr(r, s)) - return false; - m_AuthedAddrs.emplace(s->GetRemoteEndpoint(), r); - return true; - } + bool + LinkLayer::MapAddr(const RouterID& r, ILinkSession* s) + { + if (!ILinkLayer::MapAddr(r, s)) + return false; + m_AuthedAddrs.emplace(s->GetRemoteEndpoint(), r); + return true; + } - void - LinkLayer::UnmapAddr(const IpAddress& addr) - { - m_AuthedAddrs.erase(addr); - } + void + LinkLayer::UnmapAddr(const IpAddress& addr) + { + m_AuthedAddrs.erase(addr); + } + + std::shared_ptr + LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) + { + return std::make_shared(this, rc, ai); + } + + void + LinkLayer::AddWakeup(std::weak_ptr session) + { + m_PlaintextRecv.tryPushBack(session); + } - std::shared_ptr - LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) + void + LinkLayer::WakeupPlaintext() + { + m_Wakeup->Wakeup(); + } + + void + LinkLayer::HandleWakeupPlaintext() + { + while (not m_PlaintextRecv.empty()) { - return std::make_shared(this, rc, ai); + auto session = m_PlaintextRecv.popFront(); + auto ptr = session.lock(); + if (ptr) + ptr->HandlePlaintext(); } - } // namespace iwp -} // namespace llarp + PumpDone(); + } + +} // namespace llarp::iwp diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index 1af850486..cc9e90c3a 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -7,56 +7,69 @@ #include #include #include - +#include #include -namespace llarp +#include + +namespace llarp::iwp { - namespace iwp + struct Session; + + struct LinkLayer final : public ILinkLayer { - struct LinkLayer final : public ILinkLayer - { - LinkLayer( - std::shared_ptr keyManager, - GetRCFunc getrc, - LinkMessageHandler h, - SignBufferFunc sign, - BeforeConnectFunc_t before, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, - TimeoutHandler timeout, - SessionClosedHandler closed, - PumpDoneHandler pumpDone, - WorkerFunc_t dowork, - bool permitInbound); - - ~LinkLayer() override; - - std::shared_ptr - NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) override; - - const char* - Name() const override; - - uint16_t - Rank() const override; - - void - RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) override; - - bool - MapAddr(const RouterID& pk, ILinkSession* s) override; - - void - UnmapAddr(const IpAddress& addr); - - private: - std::unordered_map m_AuthedAddrs; - const bool permitInbound; - }; - - using LinkLayer_ptr = std::shared_ptr; - } // namespace iwp -} // namespace llarp + LinkLayer( + std::shared_ptr keyManager, + std::shared_ptr ev, + GetRCFunc getrc, + LinkMessageHandler h, + SignBufferFunc sign, + BeforeConnectFunc_t before, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, + TimeoutHandler timeout, + SessionClosedHandler closed, + PumpDoneHandler pumpDone, + WorkerFunc_t dowork, + bool permitInbound); + + ~LinkLayer() override; + + std::shared_ptr + NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) override; + + const char* + Name() const override; + + uint16_t + Rank() const override; + + void + RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) override; + + bool + MapAddr(const RouterID& pk, ILinkSession* s) override; + + void + UnmapAddr(const IpAddress& addr); + + void + WakeupPlaintext(); + + void + AddWakeup(std::weak_ptr peer); + + private: + void + HandleWakeupPlaintext(); + + EventLoopWakeup* const m_Wakeup; + llarp::thread::Queue> m_PlaintextRecv; + std::unordered_map m_AuthedAddrs; + const bool permitInbound; + }; + + using LinkLayer_ptr = std::shared_ptr; +} // namespace llarp::iwp #endif diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 93fe40251..7fd6874b4 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -26,6 +26,8 @@ namespace llarp return pkt; } + constexpr size_t PlaintextQueueSize = 32; + Session::Session(LinkLayer* p, const RouterContact& rc, const AddressInfo& ai) : m_State{State::Initial} , m_Inbound{false} @@ -34,6 +36,7 @@ namespace llarp , m_RemoteAddr(ai.toIpAddress()) , m_ChosenAI(ai) , m_RemoteRC(rc) + , m_PlaintextRecv{PlaintextQueueSize} { token.Zero(); GotLIM = util::memFn(&Session::GotOutboundLIM, this); @@ -46,6 +49,7 @@ namespace llarp , m_Parent(p) , m_CreatedAt{p->Now()} , m_RemoteAddr(from) + , m_PlaintextRecv{PlaintextQueueSize} { token.Randomize(); GotLIM = util::memFn(&Session::GotInboundLIM, this); @@ -130,23 +134,21 @@ namespace llarp void Session::EncryptAndSend(ILinkSession::Packet_t data) { - if (m_EncryptNext == nullptr) - m_EncryptNext = std::make_shared(); - m_EncryptNext->emplace_back(std::move(data)); + m_EncryptNext.emplace_back(std::move(data)); if (!IsEstablished()) { EncryptWorker(std::move(m_EncryptNext)); - m_EncryptNext = nullptr; + m_EncryptNext = CryptoQueue_t{}; } } void - Session::EncryptWorker(CryptoQueue_ptr msgs) + Session::EncryptWorker(CryptoQueue_t msgs) { - LogDebug("encrypt worker ", msgs->size(), " messages"); - for (auto& pkt : *msgs) + LogDebug("encrypt worker ", msgs.size(), " messages"); + for (auto& pkt : msgs) { - llarp_buffer_t pktbuf(pkt); + llarp_buffer_t pktbuf{pkt}; const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE}; pktbuf.base += PacketOverhead; pktbuf.cur = pktbuf.base; @@ -243,16 +245,17 @@ namespace llarp } auto self = shared_from_this(); assert(self.use_count() > 1); - if (m_EncryptNext && !m_EncryptNext->empty()) + if (not m_EncryptNext.empty()) { - m_Parent->QueueWork([self, data = std::move(m_EncryptNext)] { self->EncryptWorker(data); }); - m_EncryptNext = nullptr; + m_Parent->QueueWork([self, data = m_EncryptNext] { self->EncryptWorker(data); }); + m_EncryptNext.clear(); } - if (m_DecryptNext && !m_DecryptNext->empty()) + if (not m_DecryptNext.empty()) { - m_Parent->QueueWork([self, data = std::move(m_DecryptNext)] { self->DecryptWorker(data); }); - m_DecryptNext = nullptr; + m_Parent->AddWakeup(weak_from_this()); + m_Parent->QueueWork([self, data = m_DecryptNext] { self->DecryptWorker(data); }); + m_DecryptNext.clear(); } } @@ -596,19 +599,19 @@ namespace llarp void Session::HandleSessionData(Packet_t pkt) { - if (m_DecryptNext == nullptr) - m_DecryptNext = std::make_shared(); - m_DecryptNext->emplace_back(std::move(pkt)); + m_DecryptNext.emplace_back(std::move(pkt)); } void - Session::DecryptWorker(CryptoQueue_ptr msgs) + Session::DecryptWorker(CryptoQueue_t msgs) { - CryptoQueue_ptr recvMsgs = std::make_shared(); - for (auto& pkt : *msgs) + auto itr = msgs.begin(); + while (itr != msgs.end()) { + auto& pkt = *itr; if (not DecryptMessageInPlace(pkt)) { + itr = msgs.erase(itr); LogError("failed to decrypt session data from ", m_RemoteAddr); continue; } @@ -616,52 +619,54 @@ namespace llarp { LogError( "protocol version mismatch ", int(pkt[PacketOverhead]), " != ", LLARP_PROTO_VERSION); + itr = msgs.erase(itr); continue; } - recvMsgs->emplace_back(std::move(pkt)); + ++itr; } - LogDebug("decrypted ", recvMsgs->size(), " packets from ", m_RemoteAddr); - LogicCall(m_Parent->logic(), [self = shared_from_this(), msgs = recvMsgs] { - self->HandlePlaintext(std::move(msgs)); - }); + m_PlaintextRecv.pushBack(std::move(msgs)); + m_Parent->WakeupPlaintext(); } void - Session::HandlePlaintext(CryptoQueue_ptr msgs) + Session::HandlePlaintext() { - for (auto& result : *msgs) + while (not m_PlaintextRecv.empty()) { - LogDebug("Command ", int(result[PacketOverhead + 1])); - switch (result[PacketOverhead + 1]) + auto queue = m_PlaintextRecv.popFront(); + for (auto& result : queue) { - case Command::eXMIT: - HandleXMIT(std::move(result)); - break; - case Command::eDATA: - HandleDATA(std::move(result)); - break; - case Command::eACKS: - HandleACKS(std::move(result)); - break; - case Command::ePING: - HandlePING(std::move(result)); - break; - case Command::eNACK: - HandleNACK(std::move(result)); - break; - case Command::eCLOS: - HandleCLOS(std::move(result)); - break; - case Command::eMACK: - HandleMACK(std::move(result)); - break; - default: - LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr); + LogDebug("Command ", int(result[PacketOverhead + 1])); + switch (result[PacketOverhead + 1]) + { + case Command::eXMIT: + HandleXMIT(std::move(result)); + break; + case Command::eDATA: + HandleDATA(std::move(result)); + break; + case Command::eACKS: + HandleACKS(std::move(result)); + break; + case Command::ePING: + HandlePING(std::move(result)); + break; + case Command::eNACK: + HandleNACK(std::move(result)); + break; + case Command::eCLOS: + HandleCLOS(std::move(result)); + break; + case Command::eMACK: + HandleMACK(std::move(result)); + break; + default: + LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr); + } } } SendMACK(); Pump(); - m_Parent->PumpDone(); } void diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 00ae14510..e6b71f2e0 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -126,6 +126,8 @@ namespace llarp { return m_Inbound; } + void + HandlePlaintext(); private: enum class State @@ -189,19 +191,18 @@ namespace llarp /// rx messages to send in next round of multiacks std::priority_queue, std::greater> m_SendMACKs; - using CryptoQueue_t = std::list; - using CryptoQueue_ptr = std::shared_ptr; - CryptoQueue_ptr m_EncryptNext; - CryptoQueue_ptr m_DecryptNext; + using CryptoQueue_t = std::vector; - void - EncryptWorker(CryptoQueue_ptr msgs); + CryptoQueue_t m_EncryptNext; + CryptoQueue_t m_DecryptNext; + + llarp::thread::Queue m_PlaintextRecv; void - DecryptWorker(CryptoQueue_ptr msgs); + EncryptWorker(CryptoQueue_t msgs); void - HandlePlaintext(CryptoQueue_ptr msgs); + DecryptWorker(CryptoQueue_t msgs); void HandleGotIntro(Packet_t pkt); diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 235a64fe4..53b9fb222 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -612,6 +612,7 @@ namespace llarp { auto server = iwp::NewInboundLink( m_keyManager, + netloop(), util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::Sign, this), @@ -1313,6 +1314,7 @@ namespace llarp { auto link = iwp::NewOutboundLink( m_keyManager, + netloop(), util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::Sign, this), diff --git a/test/iwp/test_iwp_session.cpp b/test/iwp/test_iwp_session.cpp index 6da9c6947..e31113bdf 100644 --- a/test/iwp/test_iwp_session.cpp +++ b/test/iwp/test_iwp_session.cpp @@ -76,6 +76,7 @@ struct IWPLinkContext { link = make_link( keyManager, + m_Loop, // getrc [&]() -> const llarp::RouterContact& { return rc; }, // link message handler