new exit packet buffer format

pull/90/head
Jeff Becker 6 years ago
parent 7a63f5c85e
commit 79cffacafd
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -745,6 +745,9 @@ transfer ip traffic
X: "<list of ip packet buffers>", X: "<list of ip packet buffers>",
} }
an ip packet buffer is prefixed with a 64 bit big endian unsigned integer
denoting the sequence number for re-ordering followed by the ip packet itself.
X is parsed as a alist of IP packet buffers. X is parsed as a alist of IP packet buffers.
for each ip packet the source addresss is extracted and sent on the for each ip packet the source addresss is extracted and sent on the
appropriate network interface. appropriate network interface.

@ -4,6 +4,7 @@
#include <llarp/crypto.hpp> #include <llarp/crypto.hpp>
#include <llarp/router.h> #include <llarp/router.h>
#include <llarp/path.hpp> #include <llarp/path.hpp>
#include <llarp/ip.hpp>
namespace llarp namespace llarp
{ {
@ -18,6 +19,8 @@ namespace llarp
/// persistant exit state for 1 identity on the exit node /// persistant exit state for 1 identity on the exit node
struct Endpoint struct Endpoint
{ {
static constexpr size_t MaxUpstreamQueueSize = 256;
Endpoint(const llarp::PubKey& remoteIdent, Endpoint(const llarp::PubKey& remoteIdent,
const llarp::PathID_t& beginPath, bool rewriteIP, huint32_t ip, const llarp::PathID_t& beginPath, bool rewriteIP, huint32_t ip,
llarp::handlers::ExitEndpoint* parent); llarp::handlers::ExitEndpoint* parent);
@ -47,14 +50,14 @@ namespace llarp
bool bool
QueueInboundTraffic(llarp_buffer_t buff); QueueInboundTraffic(llarp_buffer_t buff);
/// flush inbound traffic queue /// flush inbound and outbound traffic queues
bool bool
FlushInboundTraffic(); Flush();
/// send traffic to service node / internet /// queue outbound traffic
/// does ip rewrite here /// does ip rewrite here
bool bool
SendOutboundTraffic(llarp_buffer_t buf); QueueOutboundTraffic(llarp_buffer_t pkt, uint64_t counter);
/// update local path id and cascade information to parent /// update local path id and cascade information to parent
/// return true if success /// return true if success
@ -106,6 +109,25 @@ namespace llarp
using TieredQueue = std::map<uint8_t, InboundTrafficQueue_t>; using TieredQueue = std::map<uint8_t, InboundTrafficQueue_t>;
// maps number of fragments the message will fit in to the queue for it // maps number of fragments the message will fit in to the queue for it
TieredQueue m_DownstreamQueues; TieredQueue m_DownstreamQueues;
struct UpstreamBuffer
{
UpstreamBuffer(const llarp::net::IPv4Packet & p, uint64_t c) : pkt(p), counter(c)
{
}
llarp::net::IPv4Packet pkt;
uint64_t counter;
bool operator<(const UpstreamBuffer & other) const
{
return counter < other.counter;
}
};
using UpstreamQueue_t = std::priority_queue<UpstreamBuffer>;
UpstreamQueue_t m_UpstreamQueue;
uint64_t m_Counter;
}; };
} // namespace exit } // namespace exit
} // namespace llarp } // namespace llarp

@ -63,6 +63,7 @@ namespace llarp
using UpstreamTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>; using UpstreamTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>;
using TieredQueue_t = std::map<uint8_t, UpstreamTrafficQueue_t>; using TieredQueue_t = std::map<uint8_t, UpstreamTrafficQueue_t>;
TieredQueue_t m_Upstream; TieredQueue_t m_Upstream;
uint64_t m_Counter;
llarp::SecretKey m_ExitIdentity; llarp::SecretKey m_ExitIdentity;
}; };

@ -88,6 +88,9 @@ namespace llarp
llarp_buffer_t llarp_buffer_t
Buffer(); Buffer();
llarp_buffer_t
ConstBuffer() const;
bool bool
Load(llarp_buffer_t buf); Load(llarp_buffer_t buf);

@ -26,7 +26,7 @@ namespace llarp
/// append buffer to X /// append buffer to X
bool bool
PutBuffer(llarp_buffer_t buf); PutBuffer(llarp_buffer_t buf, uint64_t counter);
bool bool
BEncode(llarp_buffer_t* buf) const override; BEncode(llarp_buffer_t* buf) const override;

@ -290,7 +290,7 @@ namespace llarp
using ExitUpdatedFunc = std::function< bool(Path*) >; using ExitUpdatedFunc = std::function< bool(Path*) >;
using ExitClosedFunc = std::function< bool(Path*) >; using ExitClosedFunc = std::function< bool(Path*) >;
using ExitTrafficHandlerFunc = using ExitTrafficHandlerFunc =
std::function< bool(Path*, llarp_buffer_t) >; std::function< bool(Path*, llarp_buffer_t, uint64_t) >;
/// (path, backoff) backoff is 0 on success /// (path, backoff) backoff is 0 on success
using ObtainedExitHandler = std::function< bool(Path*, llarp_time_t) >; using ObtainedExitHandler = std::function< bool(Path*, llarp_time_t) >;

@ -13,6 +13,7 @@ namespace llarp
, m_CurrentPath(beginPath) , m_CurrentPath(beginPath)
, m_IP(ip) , m_IP(ip)
, m_RewriteSource(rewriteIP) , m_RewriteSource(rewriteIP)
, m_Counter(0)
{ {
m_LastActive = parent->Now(); m_LastActive = parent->Now();
} }
@ -76,22 +77,23 @@ namespace llarp
} }
bool bool
Endpoint::SendOutboundTraffic(llarp_buffer_t buf) Endpoint::QueueOutboundTraffic(llarp_buffer_t buf, uint64_t counter)
{ {
// queue overflow
if(m_UpstreamQueue.size() > MaxUpstreamQueueSize)
return false;
llarp::net::IPv4Packet pkt; llarp::net::IPv4Packet pkt;
if(!pkt.Load(buf)) if(!pkt.Load(buf))
return false; return false;
huint32_t dst; huint32_t dst;
if(m_RewriteSource) if(m_RewriteSource)
dst = m_Parent->GetIfAddr(); dst = m_Parent->GetIfAddr();
else else
dst = pkt.dst(); dst = pkt.dst();
pkt.UpdateIPv4PacketOnDst(m_IP, dst); pkt.UpdateIPv4PacketOnDst(m_IP, dst);
if(!m_Parent->QueueOutboundTraffic(pkt.Buffer())) m_UpstreamQueue.emplace(pkt, counter);
{
llarp::LogError("failed to queue outbound traffic");
return false;
}
m_TxRate += buf.sz; m_TxRate += buf.sz;
m_LastActive = m_Parent->Now(); m_LastActive = m_Parent->Now();
return true; return true;
@ -116,21 +118,28 @@ namespace llarp
if(queue.size() == 0) if(queue.size() == 0)
{ {
queue.emplace_back(); queue.emplace_back();
return queue.back().PutBuffer(buf); return queue.back().PutBuffer(buf, m_Counter++);
} }
auto & msg = queue.back(); auto & msg = queue.back();
if(msg.Size() + pktbuf.sz > llarp::routing::ExitPadSize) if(msg.Size() + pktbuf.sz > llarp::routing::ExitPadSize)
{ {
queue.emplace_back(); queue.emplace_back();
return queue.back().PutBuffer(pktbuf); return queue.back().PutBuffer(pktbuf, m_Counter++);
} }
else else
return msg.PutBuffer(pktbuf); return msg.PutBuffer(pktbuf, m_Counter++);
} }
bool bool
Endpoint::FlushInboundTraffic() Endpoint::Flush()
{ {
// flush upstream queue
while(m_UpstreamQueue.size())
{
m_Parent->QueueOutboundTraffic(m_UpstreamQueue.top().pkt.ConstBuffer());
m_UpstreamQueue.pop();
}
// flush downstream queue
auto path = GetCurrentPath(); auto path = GetCurrentPath();
bool sent = path != nullptr; bool sent = path != nullptr;
if(path) if(path)

@ -11,6 +11,7 @@ namespace llarp
: llarp::path::Builder(r, r->dht, numpaths, hoplen) : llarp::path::Builder(r, r->dht, numpaths, hoplen)
, m_ExitRouter(router) , m_ExitRouter(router)
, m_WritePacket(writepkt) , m_WritePacket(writepkt)
, m_Counter(0)
{ {
r->crypto.identity_keygen(m_ExitIdentity); r->crypto.identity_keygen(m_ExitIdentity);
} }
@ -108,17 +109,17 @@ namespace llarp
if(queue.size() == 0) if(queue.size() == 0)
{ {
queue.emplace_back(); queue.emplace_back();
return queue.back().PutBuffer(buf); return queue.back().PutBuffer(buf, m_Counter++);
} }
auto & back = queue.back(); auto & back = queue.back();
// pack to nearest N // pack to nearest N
if(back.Size() + buf.sz > N) if(back.Size() + buf.sz > N)
{ {
queue.emplace_back(); queue.emplace_back();
return queue.back().PutBuffer(buf); return queue.back().PutBuffer(buf, m_Counter++);
} }
else else
return back.PutBuffer(buf); return back.PutBuffer(buf, m_Counter++);
} }
bool bool

@ -1,5 +1,6 @@
#include <llarp/messages/transfer_traffic.hpp> #include <llarp/messages/transfer_traffic.hpp>
#include <llarp/routing/handler.hpp> #include <llarp/routing/handler.hpp>
#include <llarp/endian.hpp>
namespace llarp namespace llarp
{ {
@ -15,14 +16,17 @@ namespace llarp
} }
bool bool
TransferTrafficMessage::PutBuffer(llarp_buffer_t buf) TransferTrafficMessage::PutBuffer(llarp_buffer_t buf, uint64_t counter)
{ {
if(buf.sz > MaxExitMTU) if(buf.sz > MaxExitMTU)
return false; return false;
X.emplace_back(buf.sz); X.emplace_back(buf.sz + 8);
memcpy(X.back().data(), buf.base, buf.sz); byte_t * ptr = X.back().data();
// 8 bytes encoding overhead htobe64buf(ptr, counter);
_size += buf.sz + 8; ptr += 8;
memcpy(ptr, buf.base, buf.sz);
// 8 bytes encoding overhead and 8 bytes counter
_size += buf.sz + 16;
return true; return true;
} }

@ -90,7 +90,7 @@ namespace llarp
auto itr = m_ActiveExits.begin(); auto itr = m_ActiveExits.begin();
while(itr != m_ActiveExits.end()) while(itr != m_ActiveExits.end())
{ {
if(!itr->second->FlushInboundTraffic()) if(!itr->second->Flush())
{ {
llarp::LogWarn("exit session with ", itr->first, " dropped packets"); llarp::LogWarn("exit session with ", itr->first, " dropped packets");
} }

@ -23,6 +23,12 @@ namespace llarp
return true; return true;
} }
llarp_buffer_t
IPv4Packet::ConstBuffer() const
{
return llarp::InitBuffer(buf, sz);
}
llarp_buffer_t llarp_buffer_t
IPv4Packet::Buffer() IPv4Packet::Buffer()
{ {

@ -6,6 +6,7 @@
#include <llarp/messages/discard.hpp> #include <llarp/messages/discard.hpp>
#include "buffer.hpp" #include "buffer.hpp"
#include "router.hpp" #include "router.hpp"
#include <llarp/endian.hpp>
namespace llarp namespace llarp
{ {
@ -801,7 +802,12 @@ namespace llarp
return false; return false;
bool sent = msg->X.size() > 0; bool sent = msg->X.size() > 0;
for(const auto & pkt : msg->X) for(const auto & pkt : msg->X)
m_ExitTrafficHandler(this, pkt.Buffer()); {
if(pkt.size() <= 8)
return false;
uint64_t counter = bufbe64toh(pkt.data());
m_ExitTrafficHandler(this, llarp::InitBuffer(pkt.data() +8, pkt.size()-8), counter);
}
return sent; return sent;
} }

@ -3,6 +3,7 @@
#include <llarp/messages/discard.hpp> #include <llarp/messages/discard.hpp>
#include "buffer.hpp" #include "buffer.hpp"
#include "router.hpp" #include "router.hpp"
#include <llarp/endian.hpp>
namespace llarp namespace llarp
{ {
@ -259,7 +260,13 @@ namespace llarp
{ {
bool sent = true; bool sent = true;
for(const auto & pkt : msg->X) for(const auto & pkt : msg->X)
sent &= endpoint->SendOutboundTraffic(pkt.Buffer()); {
// check short packet buffer
if(pkt.size() <= 8)
continue;
uint64_t counter = bufbe64toh(pkt.data());
sent &= endpoint->QueueOutboundTraffic(llarp::InitBuffer(pkt.data() + 8, pkt.size() - 8), counter);
}
return sent; return sent;
} }
else else

@ -12,7 +12,7 @@ TEST_F(TransferTrafficTest, TestPutBufferOverflow)
TransferTrafficMessage msg; TransferTrafficMessage msg;
byte_t tmp[llarp::routing::MaxExitMTU * 2] = {0}; byte_t tmp[llarp::routing::MaxExitMTU * 2] = {0};
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
ASSERT_FALSE(msg.PutBuffer(buf)); ASSERT_FALSE(msg.PutBuffer(buf, 1));
}; };
TEST_F(TransferTrafficTest, TestPutBuffer) TEST_F(TransferTrafficTest, TestPutBuffer)
@ -20,5 +20,5 @@ TEST_F(TransferTrafficTest, TestPutBuffer)
TransferTrafficMessage msg; TransferTrafficMessage msg;
byte_t tmp[llarp::routing::MaxExitMTU] = {0}; byte_t tmp[llarp::routing::MaxExitMTU] = {0};
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
ASSERT_TRUE(msg.PutBuffer(buf)); ASSERT_TRUE(msg.PutBuffer(buf, 1));
}; };

Loading…
Cancel
Save