From 79cffacafd3493afdc0ac1a6eabf941f52b39704 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 29 Nov 2018 16:19:20 -0500 Subject: [PATCH] new exit packet buffer format --- docs/proto_v0.txt | 3 +++ include/llarp/exit/endpoint.hpp | 30 ++++++++++++++++++--- include/llarp/exit/session.hpp | 1 + include/llarp/ip.hpp | 3 +++ include/llarp/messages/transfer_traffic.hpp | 2 +- include/llarp/path.hpp | 2 +- llarp/exit/endpoint.cpp | 29 +++++++++++++------- llarp/exit/session.cpp | 7 ++--- llarp/exit/transfer_traffic.cpp | 14 ++++++---- llarp/handlers/exit.cpp | 2 +- llarp/ip.cpp | 6 +++++ llarp/path.cpp | 8 +++++- llarp/transit_hop.cpp | 9 ++++++- test/traffic_transfer_unittest.cpp | 4 +-- 14 files changed, 91 insertions(+), 29 deletions(-) diff --git a/docs/proto_v0.txt b/docs/proto_v0.txt index 086c3f68e..503bcd874 100644 --- a/docs/proto_v0.txt +++ b/docs/proto_v0.txt @@ -745,6 +745,9 @@ transfer ip traffic X: "", } +an ip packet buffer is prefixed with a 64 bit big endian unsigned integer +denoting the sequence number for re-ordering followed by the ip packet itself. + X is parsed as a alist of IP packet buffers. for each ip packet the source addresss is extracted and sent on the appropriate network interface. diff --git a/include/llarp/exit/endpoint.hpp b/include/llarp/exit/endpoint.hpp index 3bb3fd9db..ad0a54cac 100644 --- a/include/llarp/exit/endpoint.hpp +++ b/include/llarp/exit/endpoint.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace llarp { @@ -18,6 +19,8 @@ namespace llarp /// persistant exit state for 1 identity on the exit node struct Endpoint { + static constexpr size_t MaxUpstreamQueueSize = 256; + Endpoint(const llarp::PubKey& remoteIdent, const llarp::PathID_t& beginPath, bool rewriteIP, huint32_t ip, llarp::handlers::ExitEndpoint* parent); @@ -47,14 +50,14 @@ namespace llarp bool QueueInboundTraffic(llarp_buffer_t buff); - /// flush inbound traffic queue + /// flush inbound and outbound traffic queues bool - FlushInboundTraffic(); + Flush(); - /// send traffic to service node / internet + /// queue outbound traffic /// does ip rewrite here bool - SendOutboundTraffic(llarp_buffer_t buf); + QueueOutboundTraffic(llarp_buffer_t pkt, uint64_t counter); /// update local path id and cascade information to parent /// return true if success @@ -106,6 +109,25 @@ namespace llarp using TieredQueue = std::map; // maps number of fragments the message will fit in to the queue for it TieredQueue m_DownstreamQueues; + + struct UpstreamBuffer + { + UpstreamBuffer(const llarp::net::IPv4Packet & p, uint64_t c) : pkt(p), counter(c) + { + } + + llarp::net::IPv4Packet pkt; + uint64_t counter; + + bool operator<(const UpstreamBuffer & other) const + { + return counter < other.counter; + } + }; + + using UpstreamQueue_t = std::priority_queue; + UpstreamQueue_t m_UpstreamQueue; + uint64_t m_Counter; }; } // namespace exit } // namespace llarp diff --git a/include/llarp/exit/session.hpp b/include/llarp/exit/session.hpp index f3e5ea8cc..43b35dfb4 100644 --- a/include/llarp/exit/session.hpp +++ b/include/llarp/exit/session.hpp @@ -63,6 +63,7 @@ namespace llarp using UpstreamTrafficQueue_t = std::deque; using TieredQueue_t = std::map; TieredQueue_t m_Upstream; + uint64_t m_Counter; llarp::SecretKey m_ExitIdentity; }; diff --git a/include/llarp/ip.hpp b/include/llarp/ip.hpp index b09aaf89e..7b6a1a62a 100644 --- a/include/llarp/ip.hpp +++ b/include/llarp/ip.hpp @@ -88,6 +88,9 @@ namespace llarp llarp_buffer_t Buffer(); + llarp_buffer_t + ConstBuffer() const; + bool Load(llarp_buffer_t buf); diff --git a/include/llarp/messages/transfer_traffic.hpp b/include/llarp/messages/transfer_traffic.hpp index 54f532abc..c230526ac 100644 --- a/include/llarp/messages/transfer_traffic.hpp +++ b/include/llarp/messages/transfer_traffic.hpp @@ -26,7 +26,7 @@ namespace llarp /// append buffer to X bool - PutBuffer(llarp_buffer_t buf); + PutBuffer(llarp_buffer_t buf, uint64_t counter); bool BEncode(llarp_buffer_t* buf) const override; diff --git a/include/llarp/path.hpp b/include/llarp/path.hpp index f357ac161..73b19dad8 100644 --- a/include/llarp/path.hpp +++ b/include/llarp/path.hpp @@ -290,7 +290,7 @@ namespace llarp using ExitUpdatedFunc = std::function< bool(Path*) >; using ExitClosedFunc = std::function< bool(Path*) >; using ExitTrafficHandlerFunc = - std::function< bool(Path*, llarp_buffer_t) >; + std::function< bool(Path*, llarp_buffer_t, uint64_t) >; /// (path, backoff) backoff is 0 on success using ObtainedExitHandler = std::function< bool(Path*, llarp_time_t) >; diff --git a/llarp/exit/endpoint.cpp b/llarp/exit/endpoint.cpp index f8e8fd747..9d9be1f2a 100644 --- a/llarp/exit/endpoint.cpp +++ b/llarp/exit/endpoint.cpp @@ -13,6 +13,7 @@ namespace llarp , m_CurrentPath(beginPath) , m_IP(ip) , m_RewriteSource(rewriteIP) + , m_Counter(0) { m_LastActive = parent->Now(); } @@ -76,22 +77,23 @@ namespace llarp } bool - Endpoint::SendOutboundTraffic(llarp_buffer_t buf) + Endpoint::QueueOutboundTraffic(llarp_buffer_t buf, uint64_t counter) { + // queue overflow + if(m_UpstreamQueue.size() > MaxUpstreamQueueSize) + return false; + llarp::net::IPv4Packet pkt; if(!pkt.Load(buf)) return false; + huint32_t dst; if(m_RewriteSource) dst = m_Parent->GetIfAddr(); else dst = pkt.dst(); pkt.UpdateIPv4PacketOnDst(m_IP, dst); - if(!m_Parent->QueueOutboundTraffic(pkt.Buffer())) - { - llarp::LogError("failed to queue outbound traffic"); - return false; - } + m_UpstreamQueue.emplace(pkt, counter); m_TxRate += buf.sz; m_LastActive = m_Parent->Now(); return true; @@ -116,21 +118,28 @@ namespace llarp if(queue.size() == 0) { queue.emplace_back(); - return queue.back().PutBuffer(buf); + return queue.back().PutBuffer(buf, m_Counter++); } auto & msg = queue.back(); if(msg.Size() + pktbuf.sz > llarp::routing::ExitPadSize) { queue.emplace_back(); - return queue.back().PutBuffer(pktbuf); + return queue.back().PutBuffer(pktbuf, m_Counter++); } else - return msg.PutBuffer(pktbuf); + return msg.PutBuffer(pktbuf, m_Counter++); } bool - Endpoint::FlushInboundTraffic() + Endpoint::Flush() { + // flush upstream queue + while(m_UpstreamQueue.size()) + { + m_Parent->QueueOutboundTraffic(m_UpstreamQueue.top().pkt.ConstBuffer()); + m_UpstreamQueue.pop(); + } + // flush downstream queue auto path = GetCurrentPath(); bool sent = path != nullptr; if(path) diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index 3c640d508..9fddd433b 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -11,6 +11,7 @@ namespace llarp : llarp::path::Builder(r, r->dht, numpaths, hoplen) , m_ExitRouter(router) , m_WritePacket(writepkt) + , m_Counter(0) { r->crypto.identity_keygen(m_ExitIdentity); } @@ -108,17 +109,17 @@ namespace llarp if(queue.size() == 0) { queue.emplace_back(); - return queue.back().PutBuffer(buf); + return queue.back().PutBuffer(buf, m_Counter++); } auto & back = queue.back(); // pack to nearest N if(back.Size() + buf.sz > N) { queue.emplace_back(); - return queue.back().PutBuffer(buf); + return queue.back().PutBuffer(buf, m_Counter++); } else - return back.PutBuffer(buf); + return back.PutBuffer(buf, m_Counter++); } bool diff --git a/llarp/exit/transfer_traffic.cpp b/llarp/exit/transfer_traffic.cpp index 2370c504a..d7d530d80 100644 --- a/llarp/exit/transfer_traffic.cpp +++ b/llarp/exit/transfer_traffic.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace llarp { @@ -15,14 +16,17 @@ namespace llarp } bool - TransferTrafficMessage::PutBuffer(llarp_buffer_t buf) + TransferTrafficMessage::PutBuffer(llarp_buffer_t buf, uint64_t counter) { if(buf.sz > MaxExitMTU) return false; - X.emplace_back(buf.sz); - memcpy(X.back().data(), buf.base, buf.sz); - // 8 bytes encoding overhead - _size += buf.sz + 8; + X.emplace_back(buf.sz + 8); + byte_t * ptr = X.back().data(); + htobe64buf(ptr, counter); + ptr += 8; + memcpy(ptr, buf.base, buf.sz); + // 8 bytes encoding overhead and 8 bytes counter + _size += buf.sz + 16; return true; } diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index c56ba614d..00c5ed26c 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -90,7 +90,7 @@ namespace llarp auto itr = m_ActiveExits.begin(); while(itr != m_ActiveExits.end()) { - if(!itr->second->FlushInboundTraffic()) + if(!itr->second->Flush()) { llarp::LogWarn("exit session with ", itr->first, " dropped packets"); } diff --git a/llarp/ip.cpp b/llarp/ip.cpp index 321e3bf16..9ad9de41f 100644 --- a/llarp/ip.cpp +++ b/llarp/ip.cpp @@ -23,6 +23,12 @@ namespace llarp return true; } + llarp_buffer_t + IPv4Packet::ConstBuffer() const + { + return llarp::InitBuffer(buf, sz); + } + llarp_buffer_t IPv4Packet::Buffer() { diff --git a/llarp/path.cpp b/llarp/path.cpp index 78a9d636d..e90f8687e 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -6,6 +6,7 @@ #include #include "buffer.hpp" #include "router.hpp" +#include namespace llarp { @@ -801,7 +802,12 @@ namespace llarp return false; bool sent = msg->X.size() > 0; for(const auto & pkt : msg->X) - m_ExitTrafficHandler(this, pkt.Buffer()); + { + if(pkt.size() <= 8) + return false; + uint64_t counter = bufbe64toh(pkt.data()); + m_ExitTrafficHandler(this, llarp::InitBuffer(pkt.data() +8, pkt.size()-8), counter); + } return sent; } diff --git a/llarp/transit_hop.cpp b/llarp/transit_hop.cpp index f572b833b..f93d28017 100644 --- a/llarp/transit_hop.cpp +++ b/llarp/transit_hop.cpp @@ -3,6 +3,7 @@ #include #include "buffer.hpp" #include "router.hpp" +#include namespace llarp { @@ -259,7 +260,13 @@ namespace llarp { bool sent = true; for(const auto & pkt : msg->X) - sent &= endpoint->SendOutboundTraffic(pkt.Buffer()); + { + // check short packet buffer + if(pkt.size() <= 8) + continue; + uint64_t counter = bufbe64toh(pkt.data()); + sent &= endpoint->QueueOutboundTraffic(llarp::InitBuffer(pkt.data() + 8, pkt.size() - 8), counter); + } return sent; } else diff --git a/test/traffic_transfer_unittest.cpp b/test/traffic_transfer_unittest.cpp index 6556650dc..dc6204eae 100644 --- a/test/traffic_transfer_unittest.cpp +++ b/test/traffic_transfer_unittest.cpp @@ -12,7 +12,7 @@ TEST_F(TransferTrafficTest, TestPutBufferOverflow) TransferTrafficMessage msg; byte_t tmp[llarp::routing::MaxExitMTU * 2] = {0}; auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - ASSERT_FALSE(msg.PutBuffer(buf)); + ASSERT_FALSE(msg.PutBuffer(buf, 1)); }; TEST_F(TransferTrafficTest, TestPutBuffer) @@ -20,5 +20,5 @@ TEST_F(TransferTrafficTest, TestPutBuffer) TransferTrafficMessage msg; byte_t tmp[llarp::routing::MaxExitMTU] = {0}; auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - ASSERT_TRUE(msg.PutBuffer(buf)); + ASSERT_TRUE(msg.PutBuffer(buf, 1)); };