separate upstream/downstream flush

pull/576/head
Jeff Becker 5 years ago
parent d50b18d7b0
commit 5e0acc1197
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -231,7 +231,7 @@ namespace llarp
} }
bool bool
BaseSession::Flush() BaseSession::FlushUpstream()
{ {
auto now = router->Now(); auto now = router->Now();
auto path = PickRandomEstablishedPath(llarp::path::ePathRoleExit); auto path = PickRandomEstablishedPath(llarp::path::ePathRoleExit);
@ -259,13 +259,18 @@ namespace llarp
item.second.clear(); item.second.clear();
m_Upstream.clear(); m_Upstream.clear();
} }
return true;
}
void
BaseSession::FlushDownstream()
{
while(m_Downstream.size()) while(m_Downstream.size())
{ {
if(m_WritePacket) if(m_WritePacket)
m_WritePacket(m_Downstream.top().second.ConstBuffer()); m_WritePacket(m_Downstream.top().second.ConstBuffer());
m_Downstream.pop(); m_Downstream.pop();
} }
return true;
} }
SNodeSession::SNodeSession( SNodeSession::SNodeSession(

@ -45,7 +45,7 @@ namespace llarp
ShouldBundleRC() const override ShouldBundleRC() const override
{ {
// TODO: make configurable // TODO: make configurable
return true; return false;
} }
void void
@ -67,9 +67,13 @@ namespace llarp
bool bool
QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t packSize); QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t packSize);
/// flush upstream and downstream traffic /// flush upstream to exit via paths
bool bool
Flush(); FlushUpstream();
/// flush downstream to user via tun
void
FlushDownstream();
path::PathRole path::PathRole
GetRoles() const override GetRoles() const override

@ -247,11 +247,13 @@ namespace llarp
auto itr = m_SNodeSessions.begin(); auto itr = m_SNodeSessions.begin();
while(itr != m_SNodeSessions.end()) while(itr != m_SNodeSessions.end())
{ {
if(!itr->second->Flush()) // TODO: move flush upstream to router event loop
if(!itr->second->FlushUpstream())
{ {
LogWarn("failed to flush snode traffic to ", itr->first, LogWarn("failed to flush snode traffic to ", itr->first,
" via outbound session"); " via outbound session");
} }
itr->second->FlushDownstream();
++itr; ++itr;
} }
} }

@ -12,6 +12,7 @@
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <service/context.hpp> #include <service/context.hpp>
#include <util/logic.hpp>
namespace llarp namespace llarp
{ {
@ -228,6 +229,11 @@ namespace llarp
TunEndpoint::Flush() TunEndpoint::Flush()
{ {
FlushSend(); FlushSend();
if(m_Exit)
{
llarp::exit::BaseSession_ptr ex = m_Exit;
RouterLogic()->queue_func([=] { ex->FlushUpstream(); });
}
} }
static bool static bool
@ -772,14 +778,13 @@ namespace llarp
self->FlushSend(); self->FlushSend();
// flush exit traffic queues if it's there // flush exit traffic queues if it's there
if(self->m_Exit) if(self->m_Exit)
self->m_Exit->Flush(); self->m_Exit->FlushDownstream();
// flush snode traffic
self->FlushSNodeTraffic();
// flush network to user // flush network to user
self->m_NetworkToUserPktQueue.Process([tun](net::IPv4Packet &pkt) { self->m_NetworkToUserPktQueue.Process([tun](net::IPv4Packet &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt.Buffer())) if(!llarp_ev_tun_async_write(tun, pkt.Buffer()))
llarp::LogWarn("packet dropped"); llarp::LogWarn("packet dropped");
}); });
self->Pump(self->Now());
} }
void void

@ -169,17 +169,6 @@ namespace llarp
} }
} }
void
Endpoint::FlushSNodeTraffic()
{
auto itr = m_SNodeSessions.begin();
while(itr != m_SNodeSessions.end())
{
itr->second->Flush();
++itr;
}
}
bool bool
Endpoint::IsReady() const Endpoint::IsReady() const
{ {
@ -1131,6 +1120,24 @@ namespace llarp
itr->second->Pump(now); itr->second->Pump(now);
++itr; ++itr;
} }
RouterLogic()->queue_func([&]() {
for(const auto& item : m_SNodeSessions)
item.second->FlushUpstream();
});
EndpointLogic()->queue_func([&]() {
for(const auto& item : m_SNodeSessions)
item.second->FlushDownstream();
});
RouterLogic()->queue_func([&]() {
auto router = Router();
util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue)
item.second->SendRoutingMessage(*item.first, router);
m_SendQueue.clear();
});
} }
bool bool
@ -1193,9 +1200,10 @@ namespace llarp
return false; return false;
} }
LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router); LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router);
auto router = Router(); {
RouterLogic()->queue_func( util::Lock lock(&m_SendQueueMutex);
[=]() { p->SendRoutingMessage(*transfer, router); }); m_SendQueue.emplace_back(transfer, p);
}
return true; return true;
} }
} }

@ -209,9 +209,6 @@ namespace llarp
bool bool
SendToSNodeOrQueue(const RouterID& addr, const llarp_buffer_t& payload); SendToSNodeOrQueue(const RouterID& addr, const llarp_buffer_t& payload);
void
FlushSNodeTraffic();
bool bool
HandleDataDrop(path::Path_ptr p, const PathID_t& dst, uint64_t s); HandleDataDrop(path::Path_ptr p, const PathID_t& dst, uint64_t s);
@ -365,6 +362,11 @@ namespace llarp
std::string m_NetNS; std::string m_NetNS;
bool m_BundleRC = false; bool m_BundleRC = false;
using Msg_ptr = std::shared_ptr< const routing::PathTransferMessage >;
using SendEvent_t = std::pair< Msg_ptr, path::Path_ptr >;
util::Mutex m_SendQueueMutex;
std::deque< SendEvent_t > m_SendQueue;
using PendingTraffic = using PendingTraffic =
std::unordered_map< Address, PendingBufferQueue, Address::Hash >; std::unordered_map< Address, PendingBufferQueue, Address::Hash >;
@ -379,7 +381,7 @@ namespace llarp
using SNodeSessions = std::unordered_multimap< using SNodeSessions = std::unordered_multimap<
RouterID, std::shared_ptr< exit::BaseSession >, RouterID::Hash >; RouterID, std::shared_ptr< exit::BaseSession >, RouterID::Hash >;
util::Mutex m_SNodeSessionsMutex;
SNodeSessions m_SNodeSessions; SNodeSessions m_SNodeSessions;
std::unordered_map< Address, ServiceInfo, Address::Hash > std::unordered_map< Address, ServiceInfo, Address::Hash >

Loading…
Cancel
Save