order downstream packets

pull/170/head
Jeff Becker 6 years ago
parent c7f78427e0
commit 8e6046ff1c
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -59,9 +59,11 @@ namespace llarp
p->SetDropHandler(std::bind(&BaseSession::HandleTrafficDrop, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
p->SetExitTrafficHandler(std::bind(&BaseSession::HandleTraffic, this,
std::placeholders::_1,
std::placeholders::_2));
p->SetExitTrafficHandler(
std::bind(&BaseSession::HandleTraffic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
p->AddObtainExitHandler(std::bind(&BaseSession::HandleGotExit, this,
std::placeholders::_1,
std::placeholders::_2));
@ -91,13 +93,16 @@ namespace llarp
}
bool
BaseSession::HandleTraffic(llarp::path::Path* p, llarp_buffer_t pkt)
BaseSession::HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf,
uint64_t counter)
{
(void)p;
if(m_WritePacket)
{
if(!m_WritePacket(pkt))
llarp::net::IPv4Packet pkt;
if(!pkt.Load(buf))
return false;
m_Downstream.emplace(counter, pkt);
m_LastUse = router->Now();
return true;
}
@ -153,7 +158,7 @@ namespace llarp
}
bool
BaseSession::FlushUpstreamTraffic()
BaseSession::Flush()
{
auto now = router->Now();
auto path = PickRandomEstablishedPath(llarp::path::ePathRoleExit);
@ -176,6 +181,12 @@ namespace llarp
queue.pop_front();
}
}
while(m_Downstream.size())
{
if(m_WritePacket)
m_WritePacket(m_Downstream.top().second.ConstBuffer());
m_Downstream.pop();
}
return true;
}

@ -7,6 +7,7 @@
#include <pathbuilder.hpp>
#include <deque>
#include <queue>
namespace llarp
{
@ -37,8 +38,9 @@ namespace llarp
bool
QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t packSize);
/// flush upstream and downstream traffic
bool
FlushUpstreamTraffic();
Flush();
bool
IsReady() const;
@ -71,13 +73,30 @@ namespace llarp
HandleGotExit(llarp::path::Path* p, llarp_time_t b);
bool
HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf);
HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf, uint64_t seqno);
private:
using UpstreamTrafficQueue_t =
std::deque< llarp::routing::TransferTrafficMessage >;
using TieredQueue_t = std::map< uint8_t, UpstreamTrafficQueue_t >;
TieredQueue_t m_Upstream;
using DownstreamPkt = std::pair< uint64_t, llarp::net::IPv4Packet >;
struct DownstreamPktSorter
{
bool
operator()(const DownstreamPkt& left, const DownstreamPkt& right) const
{
return left.first < right.first;
}
};
using DownstreamTrafficQueue_t =
std::priority_queue< DownstreamPkt, std::vector< DownstreamPkt >,
DownstreamPktSorter >;
DownstreamTrafficQueue_t m_Downstream;
uint64_t m_Counter;
llarp_time_t m_LastUse;
};

@ -206,7 +206,7 @@ namespace llarp
auto itr = m_SNodeSessions.begin();
while(itr != m_SNodeSessions.end())
{
if(!itr->second->FlushUpstreamTraffic())
if(!itr->second->Flush())
{
llarp::LogWarn("failed to flushsnode traffic to ", itr->first,
" via outbound session");

@ -490,7 +490,7 @@ namespace llarp
return true;
});
if(m_Exit)
m_Exit->FlushUpstreamTraffic();
m_Exit->Flush();
FlushSNodeTraffic();
}

@ -129,7 +129,7 @@ namespace llarp
auto itr = m_SNodeSessions.begin();
while(itr != m_SNodeSessions.end())
{
itr->second->FlushUpstreamTraffic();
itr->second->Flush();
++itr;
}
}

Loading…
Cancel
Save