From 5e0acc11970cc48edc9025a32bd288d7d318c2c8 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 30 Apr 2019 09:56:39 -0400 Subject: [PATCH] separate upstream/downstream flush --- llarp/exit/session.cpp | 9 +++++++-- llarp/exit/session.hpp | 10 +++++++--- llarp/handlers/exit.cpp | 4 +++- llarp/handlers/tun.cpp | 11 ++++++++--- llarp/service/endpoint.cpp | 36 ++++++++++++++++++++++-------------- llarp/service/endpoint.hpp | 10 ++++++---- 6 files changed, 53 insertions(+), 27 deletions(-) diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index 2aaf1e371..8865f590c 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -231,7 +231,7 @@ namespace llarp } bool - BaseSession::Flush() + BaseSession::FlushUpstream() { auto now = router->Now(); auto path = PickRandomEstablishedPath(llarp::path::ePathRoleExit); @@ -259,13 +259,18 @@ namespace llarp item.second.clear(); m_Upstream.clear(); } + return true; + } + + void + BaseSession::FlushDownstream() + { while(m_Downstream.size()) { if(m_WritePacket) m_WritePacket(m_Downstream.top().second.ConstBuffer()); m_Downstream.pop(); } - return true; } SNodeSession::SNodeSession( diff --git a/llarp/exit/session.hpp b/llarp/exit/session.hpp index be9764bcd..8cc910def 100644 --- a/llarp/exit/session.hpp +++ b/llarp/exit/session.hpp @@ -45,7 +45,7 @@ namespace llarp ShouldBundleRC() const override { // TODO: make configurable - return true; + return false; } void @@ -67,9 +67,13 @@ namespace llarp bool QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t packSize); - /// flush upstream and downstream traffic + /// flush upstream to exit via paths bool - Flush(); + FlushUpstream(); + + /// flush downstream to user via tun + void + FlushDownstream(); path::PathRole GetRoles() const override diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index 80d5d92b8..15d584daf 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -247,11 +247,13 @@ namespace llarp auto itr = m_SNodeSessions.begin(); while(itr != m_SNodeSessions.end()) { - if(!itr->second->Flush()) + // TODO: move flush upstream to router event loop + if(!itr->second->FlushUpstream()) { LogWarn("failed to flush snode traffic to ", itr->first, " via outbound session"); } + itr->second->FlushDownstream(); ++itr; } } diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index ee25a7ed0..de809f38b 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace llarp { @@ -228,6 +229,11 @@ namespace llarp TunEndpoint::Flush() { FlushSend(); + if(m_Exit) + { + llarp::exit::BaseSession_ptr ex = m_Exit; + RouterLogic()->queue_func([=] { ex->FlushUpstream(); }); + } } static bool @@ -772,14 +778,13 @@ namespace llarp self->FlushSend(); // flush exit traffic queues if it's there if(self->m_Exit) - self->m_Exit->Flush(); - // flush snode traffic - self->FlushSNodeTraffic(); + self->m_Exit->FlushDownstream(); // flush network to user self->m_NetworkToUserPktQueue.Process([tun](net::IPv4Packet &pkt) { if(!llarp_ev_tun_async_write(tun, pkt.Buffer())) llarp::LogWarn("packet dropped"); }); + self->Pump(self->Now()); } void diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 66db92baa..215ca6479 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -169,17 +169,6 @@ namespace llarp } } - void - Endpoint::FlushSNodeTraffic() - { - auto itr = m_SNodeSessions.begin(); - while(itr != m_SNodeSessions.end()) - { - itr->second->Flush(); - ++itr; - } - } - bool Endpoint::IsReady() const { @@ -1131,6 +1120,24 @@ namespace llarp itr->second->Pump(now); ++itr; } + + RouterLogic()->queue_func([&]() { + for(const auto& item : m_SNodeSessions) + item.second->FlushUpstream(); + }); + + EndpointLogic()->queue_func([&]() { + for(const auto& item : m_SNodeSessions) + item.second->FlushDownstream(); + }); + + RouterLogic()->queue_func([&]() { + auto router = Router(); + util::Lock lock(&m_SendQueueMutex); + for(const auto& item : m_SendQueue) + item.second->SendRoutingMessage(*item.first, router); + m_SendQueue.clear(); + }); } bool @@ -1193,9 +1200,10 @@ namespace llarp return false; } LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router); - auto router = Router(); - RouterLogic()->queue_func( - [=]() { p->SendRoutingMessage(*transfer, router); }); + { + util::Lock lock(&m_SendQueueMutex); + m_SendQueue.emplace_back(transfer, p); + } return true; } } diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index f9a4a09bb..a53188ef8 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -209,9 +209,6 @@ namespace llarp bool SendToSNodeOrQueue(const RouterID& addr, const llarp_buffer_t& payload); - void - FlushSNodeTraffic(); - bool HandleDataDrop(path::Path_ptr p, const PathID_t& dst, uint64_t s); @@ -365,6 +362,11 @@ namespace llarp std::string m_NetNS; bool m_BundleRC = false; + using Msg_ptr = std::shared_ptr< const routing::PathTransferMessage >; + using SendEvent_t = std::pair< Msg_ptr, path::Path_ptr >; + util::Mutex m_SendQueueMutex; + std::deque< SendEvent_t > m_SendQueue; + using PendingTraffic = std::unordered_map< Address, PendingBufferQueue, Address::Hash >; @@ -379,7 +381,7 @@ namespace llarp using SNodeSessions = std::unordered_multimap< RouterID, std::shared_ptr< exit::BaseSession >, RouterID::Hash >; - + util::Mutex m_SNodeSessionsMutex; SNodeSessions m_SNodeSessions; std::unordered_map< Address, ServiceInfo, Address::Hash >