pack exit traffic

pull/90/head
Jeff Becker 6 years ago
parent d25d35c06e
commit e6037ff060
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -742,10 +742,11 @@ transfer ip traffic
A: "I",
S: uint64_sequence_number,
V: 0,
X: "<N bytes ip packet>"
X: "<list of ip packet buffers>",
}
X is parsed as an IP packet and the source addresss is extracted and sent on the
X is parsed as a alist of IP packet buffers.
for each ip packet the source addresss is extracted and sent on the
appropriate network interface.
When we recieve an ip packet from the internet to an exit address, we put it

@ -167,7 +167,7 @@ namespace llarp
Key_t askpeer;
if(!nodes->FindClosest(target.data(), askpeer))
return false;
LookupRouterRecursive(target, OurKey(), ++ids, askpeer, result);
LookupRouterRecursive(target, OurKey(), 0, askpeer, result);
return true;
}

@ -12,7 +12,7 @@ namespace llarp
/// encrypted buffer base type
struct Encrypted
{
Encrypted(Encrypted&&) = delete;
Encrypted(Encrypted&& other);
Encrypted(const Encrypted& other);
Encrypted();
Encrypted(const byte_t* buf, size_t sz);

@ -43,9 +43,13 @@ namespace llarp
void
Tick(llarp_time_t now);
/// handle traffic from service node / internet
/// queue traffic from service node / internet to be transmitted
bool
SendInboundTraffic(llarp_buffer_t buff);
QueueInboundTraffic(llarp_buffer_t buff);
/// flush inbound traffic queue
bool
FlushInboundTraffic();
/// send traffic to service node / internet
/// does ip rewrite here
@ -98,6 +102,8 @@ namespace llarp
uint64_t m_TxRate, m_RxRate;
llarp_time_t m_LastActive;
bool m_RewriteSource;
using InboundTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>;
InboundTrafficQueue_t m_DownstreamQueue;
};
} // namespace exit
} // namespace llarp

@ -2,6 +2,8 @@
#define LLARP_EXIT_SESSION_HPP
#include <llarp/pathbuilder.hpp>
#include <llarp/ip.hpp>
#include <llarp/messages/transfer_traffic.hpp>
#include <deque>
namespace llarp
{
@ -27,7 +29,11 @@ namespace llarp
HandlePathBuilt(llarp::path::Path* p) override;
bool
SendUpstreamTraffic(llarp::net::IPv4Packet pkt);
QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t packSize);
bool
FlushUpstreamTraffic();
protected:
llarp::RouterID m_ExitRouter;
@ -43,7 +49,9 @@ namespace llarp
bool
HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf);
private:
private:
using UpstreamTrafficQueue_t = std::deque<llarp::routing::TransferTrafficMessage>;
UpstreamTrafficQueue_t m_UpstreamQueue;
llarp::SecretKey m_ExitIdentity;
};

@ -85,7 +85,7 @@ namespace llarp
GetIfAddr() const;
void
FlushInbound();
Flush();
private:
huint32_t

@ -1,6 +1,7 @@
#ifndef LLARP_MESSAGES_TRANSFER_TRAFFIC_HPP
#define LLARP_MESSAGES_TRANSFER_TRAFFIC_HPP
#include <llarp/routing/message.hpp>
#include <llarp/encrypted.hpp>
#include <llarp/crypto.hpp>
#include <vector>
@ -11,11 +12,18 @@ namespace llarp
constexpr size_t MaxExitMTU = 1500;
struct TransferTrafficMessage final : public IMessage
{
std::vector< byte_t > X;
std::vector<llarp::Encrypted> X;
size_t _size = 0;
size_t Size() const
{
return _size;
}
TransferTrafficMessage&
operator=(const TransferTrafficMessage& other);
/// append buffer to X
bool
PutBuffer(llarp_buffer_t buf);

@ -800,7 +800,7 @@ namespace llarp
RouterLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, txid);
TXOwner peer(askpeer, ++ids);
if(target != askpeer)
{
pendingRouterLookups.NewTX(

@ -10,6 +10,12 @@ namespace llarp
UpdateBuffer();
}
Encrypted::Encrypted(Encrypted && other)
{
_data = std::move(other._data);
UpdateBuffer();
}
Encrypted::Encrypted(const Encrypted& other)
: Encrypted(other.data(), other.size())
{

@ -98,12 +98,9 @@ namespace llarp
}
bool
Endpoint::SendInboundTraffic(llarp_buffer_t buf)
Endpoint::QueueInboundTraffic(llarp_buffer_t buf)
{
auto path = GetCurrentPath();
if(path)
{
llarp::net::IPv4Packet pkt;
llarp::net::IPv4Packet pkt;
if(!pkt.Load(buf))
return false;
@ -114,16 +111,37 @@ namespace llarp
src = pkt.src();
pkt.UpdateIPv4PacketOnDst(src, m_IP);
llarp::routing::TransferTrafficMessage msg;
if(!msg.PutBuffer(pkt.Buffer()))
return false;
msg.S = path->NextSeqNo();
if(!path->SendRoutingMessage(&msg, m_Parent->Router()))
return false;
m_RxRate += buf.sz;
return true;
if(m_DownstreamQueue.size() == 0)
m_DownstreamQueue.emplace_back();
auto pktbuf = pkt.Buffer();
auto & msg = m_DownstreamQueue.back();
if(msg.Size() + pktbuf.sz > 1024)
{
m_DownstreamQueue.emplace_back();
return m_DownstreamQueue.back().PutBuffer(pktbuf);
}
else
return msg.PutBuffer(pktbuf);
}
bool Endpoint::FlushInboundTraffic()
{
auto path = GetCurrentPath();
bool sent = false;
if(path)
{
for(auto & msg : m_DownstreamQueue)
{
msg.S = path->NextSeqNo();
if(path->SendRoutingMessage(&msg, m_Parent->Router()))
{
m_RxRate += msg.Size();
sent = true;
}
}
}
return false;
m_DownstreamQueue.clear();
return sent;
}
llarp::path::IHopHandler*

@ -101,16 +101,39 @@ namespace llarp
}
bool
BaseSession::SendUpstreamTraffic(llarp::net::IPv4Packet pkt)
BaseSession::QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t N)
{
if(m_UpstreamQueue.size() == 0)
m_UpstreamQueue.emplace_back();
auto & back = m_UpstreamQueue.back();
auto buf = pkt.Buffer();
// pack to nearest N
if(back.Size() + buf.sz > N)
{
m_UpstreamQueue.emplace_back();
return m_UpstreamQueue.back().PutBuffer(buf);
}
else
return back.PutBuffer(buf);
}
bool BaseSession::FlushUpstreamTraffic()
{
auto path = PickRandomEstablishedPath(llarp::path::ePathRoleExit);
if(!path)
{
// discard
m_UpstreamQueue.clear();
return false;
llarp::routing::TransferTrafficMessage transfer;
transfer.S = path->NextSeqNo();
if(!transfer.PutBuffer(pkt.Buffer()))
return false;
return path->SendRoutingMessage(&transfer, router);
}
while(m_UpstreamQueue.size())
{
auto & msg = m_UpstreamQueue.front();
msg.S = path->NextSeqNo();
path->SendRoutingMessage(&msg, router);
m_UpstreamQueue.pop_front();
}
return true;
}
} // namespace exit

@ -19,8 +19,10 @@ namespace llarp
{
if(buf.sz > MaxExitMTU)
return false;
X.resize(buf.sz);
memcpy(X.data(), buf.base, buf.sz);
X.emplace_back(buf.sz);
memcpy(X.back().data(), buf.base, buf.sz);
// 8 bytes encoding overhead
_size += buf.sz + 8;
return true;
}
@ -35,10 +37,7 @@ namespace llarp
return false;
if(!BEncodeWriteDictInt("V", version, buf))
return false;
if(!bencode_write_bytestring(buf, "X", 1))
return false;
if(!bencode_write_bytestring(buf, X.data(), X.size()))
if(!BEncodeWriteDictList("X", X, buf))
return false;
return bencode_end(buf);
}
@ -51,13 +50,8 @@ namespace llarp
return false;
if(!BEncodeMaybeReadDictInt("V", version, read, key, buf))
return false;
if(llarp_buffer_eq(key, "X"))
{
llarp_buffer_t strbuf;
if(!bencode_read_string(buf, &strbuf))
return false;
return PutBuffer(strbuf);
}
if(!BEncodeMaybeReadDictList("X", X, read, key, buf))
return false;
return read;
}

@ -15,9 +15,9 @@ namespace llarp
llarp::InitBuffer(pkt, sz));
}
static void
ExitHandlerFlushInbound(llarp_tun_io *tun)
ExitHandlerFlush(llarp_tun_io *tun)
{
static_cast< ExitEndpoint * >(tun->user)->FlushInbound();
static_cast< ExitEndpoint * >(tun->user)->Flush();
}
ExitEndpoint::ExitEndpoint(const std::string &name, llarp_router *r)
@ -29,7 +29,7 @@ namespace llarp
{
m_Tun.user = this;
m_Tun.recvpkt = &ExitHandlerRecvPkt;
m_Tun.tick = &ExitHandlerFlushInbound;
m_Tun.tick = &ExitHandlerFlush;
m_ShouldInitTun = true;
}
@ -44,7 +44,7 @@ namespace llarp
}
void
ExitEndpoint::FlushInbound()
ExitEndpoint::Flush()
{
auto now = Now();
m_InetToNetwork.Process([&](Pkt_t &pkt) {
@ -82,12 +82,21 @@ namespace llarp
}
else
{
if(!ep->SendInboundTraffic(pkt.Buffer()))
if(!ep->QueueInboundTraffic(pkt.Buffer()))
{
llarp::LogWarn(Name(), " dropped inbound traffic for session ", pk, " as we are overloaded (probably)");
}
}
});
auto itr = m_ActiveExits.begin();
while(itr != m_ActiveExits.end())
{
if(!itr->second->FlushInboundTraffic())
{
llarp::LogWarn("exit session with ", itr->first, " dropped packets");
}
++itr;
}
}
bool

@ -350,7 +350,7 @@ namespace llarp
if(m_Exit)
{
pkt.UpdateIPv4PacketOnDst({0}, pkt.dst());
m_Exit->SendUpstreamTraffic(std::move(pkt));
m_Exit->QueueUpstreamTraffic(std::move(pkt), 1024);
}
else
llarp::LogWarn(Name(), " has no endpoint for ", pkt.dst());
@ -368,6 +368,8 @@ namespace llarp
}
return true;
});
if(m_Exit)
m_Exit->FlushUpstreamTraffic();
}
bool

Loading…
Cancel
Save