order packets when writing to network interface (#1372)

* order packet writes on userside

* make it compile

* fix pybind
pull/1401/head
Jeff 4 years ago committed by GitHub
parent b9f6af760b
commit 50aea744f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,7 +16,7 @@ namespace llarp
virtual bool
HandleInboundPacket(
const service::ConvoTag, const llarp_buffer_t&, service::ProtocolType) override
const service::ConvoTag, const llarp_buffer_t&, service::ProtocolType, uint64_t) override
{
return true;
}

@ -31,10 +31,14 @@ namespace llarp
namespace handlers
{
void
TunEndpoint::FlushToUser(std::function<bool(net::IPPacket&)> send)
TunEndpoint::FlushToUser(std::function<bool(const net::IPPacket&)> send)
{
// flush network to user
m_NetworkToUserPktQueue.Process(send);
while (not m_NetworkToUserPktQueue.empty())
{
send(m_NetworkToUserPktQueue.top().pkt);
m_NetworkToUserPktQueue.pop();
}
}
bool
@ -54,7 +58,6 @@ namespace llarp
TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent, bool lazyVPN)
: service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->netloop(), r->netloop())
, m_NetworkToUserPktQueue("endpoint_recvq", r->netloop(), r->netloop())
, m_Resolver(std::make_shared<dns::Proxy>(
r->netloop(), r->logic(), r->netloop(), r->logic(), this))
{
@ -223,13 +226,6 @@ namespace llarp
return m_IPToAddr.find(ip) != m_IPToAddr.end();
}
bool
TunEndpoint::QueueOutboundTraffic(llarp::net::IPPacket&& pkt)
{
return m_NetworkToUserPktQueue.EmplaceIf(
[](llarp::net::IPPacket&) -> bool { return true; }, std::move(pkt));
}
void
TunEndpoint::Flush()
{
@ -669,7 +665,7 @@ namespace llarp
llarp::LogInfo(Name(), " got vpn interface");
auto self = shared_from_this();
// function to queue a packet to send to vpn interface
auto sendpkt = [self](net::IPPacket& pkt) -> bool {
auto sendpkt = [self](const net::IPPacket& pkt) -> bool {
// drop if no endpoint
auto impl = self->GetVPNImpl();
// drop if no vpn interface
@ -860,7 +856,7 @@ namespace llarp
const auto icmp = pkt.MakeICMPUnreachable();
if (icmp.has_value())
{
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src);
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
}
}
else
@ -927,7 +923,10 @@ namespace llarp
bool
TunEndpoint::HandleInboundPacket(
const service::ConvoTag tag, const llarp_buffer_t& buf, service::ProtocolType t)
const service::ConvoTag tag,
const llarp_buffer_t& buf,
service::ProtocolType t,
uint64_t seqno)
{
if (t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6
&& t != service::eProtocolExit)
@ -972,28 +971,31 @@ namespace llarp
src = ObtainIPForAddr(addr, snode);
dst = m_OurIP;
}
HandleWriteIPPacket(buf, src, dst);
HandleWriteIPPacket(buf, src, dst, seqno);
return true;
}
bool
TunEndpoint::HandleWriteIPPacket(const llarp_buffer_t& b, huint128_t src, huint128_t dst)
TunEndpoint::HandleWriteIPPacket(
const llarp_buffer_t& b, huint128_t src, huint128_t dst, uint64_t seqno)
{
ManagedBuffer buf(b);
return m_NetworkToUserPktQueue.EmplaceIf([buf, src, dst](net::IPPacket& pkt) -> bool {
// load
if (!pkt.Load(buf))
return false;
if (pkt.IsV4())
{
pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(src)), xhtonl(net::TruncateV6(dst)));
}
else if (pkt.IsV6())
{
pkt.UpdateIPv6Address(src, dst);
}
return true;
});
WritePacket write;
write.seqno = seqno;
auto& pkt = write.pkt;
// load
if (!pkt.Load(buf))
return false;
if (pkt.IsV4())
{
pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(src)), xhtonl(net::TruncateV6(dst)));
}
else if (pkt.IsV6())
{
pkt.UpdateIPv6Address(src, dst);
}
m_NetworkToUserPktQueue.push(std::move(write));
return true;
}
huint128_t
@ -1097,8 +1099,8 @@ namespace llarp
// called in the isolated network thread
auto* self = static_cast<TunEndpoint*>(tun->user);
self->Flush();
self->FlushToUser([self, tun](net::IPPacket& pkt) -> bool {
if (not llarp_ev_tun_async_write(tun, pkt.Buffer()))
self->FlushToUser([self, tun](const net::IPPacket& pkt) -> bool {
if (not llarp_ev_tun_async_write(tun, pkt.ConstBuffer()))
{
llarp::LogWarn(self->Name(), " packet dropped");
}

@ -12,6 +12,7 @@
#include <util/thread/threading.hpp>
#include <future>
#include <queue>
namespace llarp
{
@ -87,11 +88,15 @@ namespace llarp
/// overrides Endpoint
bool
HandleInboundPacket(
const service::ConvoTag tag, const llarp_buffer_t& pkt, service::ProtocolType t) override;
const service::ConvoTag tag,
const llarp_buffer_t& pkt,
service::ProtocolType t,
uint64_t seqno) override;
/// handle inbound traffic
bool
HandleWriteIPPacket(const llarp_buffer_t& buf, huint128_t src, huint128_t dst);
HandleWriteIPPacket(
const llarp_buffer_t& buf, huint128_t src, huint128_t dst, uint64_t seqno);
/// queue outbound packet to the world
bool
@ -190,8 +195,21 @@ namespace llarp
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;
struct WritePacket
{
uint64_t seqno;
net::IPPacket pkt;
bool
operator<(const WritePacket& other) const
{
return other.seqno < seqno;
}
};
/// queue for sending packets to user from network
PacketQueue_t m_NetworkToUserPktQueue;
std::priority_queue<WritePacket> m_NetworkToUserPktQueue;
/// return true if we have a remote loki address for this ip address
bool
HasRemoteForIP(huint128_t ipv4) const;
@ -279,7 +297,7 @@ namespace llarp
/// send function returns true to indicate stop iteration and do codel
/// drop
void
FlushToUser(std::function<bool(net::IPPacket&)> sendfunc);
FlushToUser(std::function<bool(const net::IPPacket&)> sendfunc);
};
} // namespace handlers

@ -1189,7 +1189,7 @@ namespace llarp
return false;
pkt.UpdateIPv4Address(src, dst);
/// TODO: V6
return HandleInboundPacket(tag, pkt.ConstBuffer(), eProtocolTrafficV4);
return HandleInboundPacket(tag, pkt.ConstBuffer(), eProtocolTrafficV4, 0);
},
Router(),
numPaths,
@ -1244,7 +1244,7 @@ namespace llarp
{
auto msg = queue.popFront();
const llarp_buffer_t buf(msg->payload);
HandleInboundPacket(msg->tag, buf, msg->proto);
HandleInboundPacket(msg->tag, buf, msg->proto, msg->seqno);
}
};
@ -1352,7 +1352,7 @@ namespace llarp
PutReplyIntroFor(f.T, m->introReply);
m->sender = m_Identity.pub;
m->seqno = GetSeqNoForConvo(f.T);
f.S = 1;
f.S = m->seqno;
f.F = m->introReply.pathID;
transfer->P = remoteIntro.pathID;
auto self = this;
@ -1420,7 +1420,8 @@ namespace llarp
auto itr = Sessions().find(tag);
if (itr == Sessions().end())
return 0;
return ++(itr->second.seqno);
itr->second.seqno += 1;
return itr->second.seqno;
}
bool

@ -208,7 +208,8 @@ namespace llarp
/// handle packet io from service node or hidden service to frontend
virtual bool
HandleInboundPacket(const ConvoTag tag, const llarp_buffer_t& pkt, ProtocolType t) = 0;
HandleInboundPacket(
const ConvoTag tag, const llarp_buffer_t& pkt, ProtocolType t, uint64_t seqno) = 0;
// virtual bool
// HandleWriteIPPacket(const llarp_buffer_t& pkt,

@ -25,7 +25,8 @@ namespace llarp
HandleInboundPacket(
const service::ConvoTag tag,
const llarp_buffer_t& pktbuf,
service::ProtocolType proto) override
service::ProtocolType proto,
uint64_t) override
{
if (handlePacket)
{

Loading…
Cancel
Save