try tiered packing based off size of packet

pull/90/head
Jeff Becker 6 years ago
parent 11372c9544
commit 2d83ad4637
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -103,7 +103,9 @@ namespace llarp
llarp_time_t m_LastActive;
bool m_RewriteSource;
using InboundTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>;
InboundTrafficQueue_t m_DownstreamQueue;
using TieredQueue = std::map<uint8_t, InboundTrafficQueue_t>;
// maps number of fragments the message will fit in to the queue for it
TieredQueue m_DownstreamQueues;
};
} // namespace exit
} // namespace llarp

@ -61,7 +61,8 @@ namespace llarp
private:
using UpstreamTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>;
UpstreamTrafficQueue_t m_UpstreamQueue;
using TieredQueue_t = std::map<uint8_t, UpstreamTrafficQueue_t>;
TieredQueue_t m_Upstream;
llarp::SecretKey m_ExitIdentity;
};

@ -126,6 +126,15 @@ namespace llarp
}
};
struct CompareSize
{
bool
operator()(const IPv4Packet& left, const IPv4Packet& right)
{
return left.sz < right.sz;
}
};
struct CompareOrder
{
bool

@ -9,7 +9,7 @@ namespace llarp
{
namespace routing
{
constexpr size_t ExitPadSize = 768;
constexpr size_t ExitPadSize = 512 * 3;
constexpr size_t MaxExitMTU = 1500;
struct TransferTrafficMessage final : public IMessage
{

@ -110,15 +110,19 @@ namespace llarp
else
src = pkt.src();
pkt.UpdateIPv4PacketOnDst(src, m_IP);
if(m_DownstreamQueue.size() == 0)
m_DownstreamQueue.emplace_back();
auto pktbuf = pkt.Buffer();
auto & msg = m_DownstreamQueue.back();
uint8_t queue_idx = pktbuf.sz / llarp::routing::ExitPadSize;
auto & queue = m_DownstreamQueues[queue_idx];
if(queue.size() == 0)
{
queue.emplace_back();
return queue.back().PutBuffer(buf);
}
auto & msg = queue.back();
if(msg.Size() + pktbuf.sz > llarp::routing::ExitPadSize)
{
m_DownstreamQueue.emplace_back();
return m_DownstreamQueue.back().PutBuffer(pktbuf);
queue.emplace_back();
return queue.back().PutBuffer(pktbuf);
}
else
return msg.PutBuffer(pktbuf);
@ -128,20 +132,27 @@ namespace llarp
Endpoint::FlushInboundTraffic()
{
auto path = GetCurrentPath();
bool sent = m_DownstreamQueue.size() == 0 && path;
bool sent = path != nullptr;
if(path)
{
for(auto & msg : m_DownstreamQueue)
for(auto & item : m_DownstreamQueues)
{
msg.S = path->NextSeqNo();
if(path->SendRoutingMessage(&msg, m_Parent->Router()))
auto & queue = item.second;
while(queue.size())
{
m_RxRate += msg.Size();
sent = true;
auto & msg = queue.front();
msg.S = path->NextSeqNo();
if(path->SendRoutingMessage(&msg, m_Parent->Router()))
{
m_RxRate += msg.Size();
sent = true;
}
queue.pop_front();
}
}
}
m_DownstreamQueue.clear();
for(auto & item : m_DownstreamQueues)
item.second.clear();
return sent;
}

@ -100,18 +100,22 @@ namespace llarp
bool
BaseSession::QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t N)
{
auto buf = pkt.Buffer();
auto & queue = m_Upstream[buf.sz / N];
// queue overflow
if(m_UpstreamQueue.size() >= MaxUpstreamQueueLength)
if(queue.size() >= MaxUpstreamQueueLength)
return false;
if(m_UpstreamQueue.size() == 0)
m_UpstreamQueue.emplace_back();
auto & back = m_UpstreamQueue.back();
auto buf = pkt.Buffer();
if(queue.size() == 0)
{
queue.emplace_back();
return queue.back().PutBuffer(buf);
}
auto & back = queue.back();
// pack to nearest N
if(back.Size() + buf.sz > N)
{
m_UpstreamQueue.emplace_back();
return m_UpstreamQueue.back().PutBuffer(buf);
queue.emplace_back();
return queue.back().PutBuffer(buf);
}
else
return back.PutBuffer(buf);
@ -130,15 +134,20 @@ namespace llarp
if(!path)
{
// discard
m_UpstreamQueue.clear();
for(auto & item : m_Upstream)
item.second.clear();
return false;
}
while(m_UpstreamQueue.size())
for (auto & item : m_Upstream)
{
auto & msg = m_UpstreamQueue.front();
msg.S = path->NextSeqNo();
path->SendRoutingMessage(&msg, router);
m_UpstreamQueue.pop_front();
auto & queue = item.second;
while(queue.size())
{
auto & msg = queue.front();
msg.S = path->NextSeqNo();
path->SendRoutingMessage(&msg, router);
queue.pop_front();
}
}
return true;
}

Loading…
Cancel
Save