From f86a2daf83a8f17d3dd97401c1fecbc032f2b18c Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 31 Mar 2021 06:57:06 -0400 Subject: [PATCH] fixes * Add service::Endpoint::HasOutboundConvo * dont mark outbound convos as inbound * order quic packets --- llarp/handlers/null.hpp | 31 ++++++++++++++++++++++++++++--- llarp/quic/tunnel.cpp | 8 ++++---- llarp/quic/tunnel.hpp | 2 +- llarp/rpc/rpc_server.cpp | 9 +++++++-- llarp/service/endpoint.cpp | 14 +++++++++++++- llarp/service/endpoint.hpp | 3 +++ llarp/service/handler.hpp | 3 +++ 7 files changed, 59 insertions(+), 11 deletions(-) diff --git a/llarp/handlers/null.hpp b/llarp/handlers/null.hpp index 20f17a855..496c8122b 100644 --- a/llarp/handlers/null.hpp +++ b/llarp/handlers/null.hpp @@ -16,15 +16,37 @@ namespace llarp NullEndpoint(AbstractRouter* r, llarp::service::Context* parent) : llarp::service::Endpoint(r, parent) { - r->loop()->add_ticker([this] { Pump(Now()); }); + r->loop()->add_ticker([this] { + while (not m_InboundQuic.empty()) + { + LogInfo(m_InboundQuic.top().seqno); + m_InboundQuic.top().process(); + m_InboundQuic.pop(); + } + Pump(Now()); + }); } + struct QUICEvent + { + uint64_t seqno; + std::function process; + + bool + operator<(const QUICEvent& other) const + { + return other.seqno < seqno; + } + }; + + std::priority_queue m_InboundQuic; + virtual bool HandleInboundPacket( const service::ConvoTag tag, const llarp_buffer_t& buf, service::ProtocolType t, - uint64_t) override + uint64_t seqno) override { if (t == service::ProtocolType::Control) return true; @@ -44,7 +66,10 @@ namespace llarp return false; } MarkConvoTagActive(tag); - quic->receive_packet(tag, buf); + std::vector copy; + copy.resize(buf.sz); + std::copy_n(buf.base, buf.sz, copy.data()); + m_InboundQuic.push({seqno, [quic, buf = copy, tag]() { quic->receive_packet(tag, buf); }}); m_router->loop()->wakeup(); return true; } diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index 4bec1fa31..13dfcd8c5 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -318,11 +318,11 @@ namespace llarp::quic } int - TunnelManager::listen(uint16_t port) + TunnelManager::listen(SockAddr addr) { - return listen([port](std::string_view, uint16_t p) -> std::optional { - if (p == port) - return SockAddr{127, 0, 0, 1, huint16_t{port}}; + return listen([addr](std::string_view, uint16_t p) -> std::optional { + if (p == addr.getPort()) + return addr; return std::nullopt; }); } diff --git a/llarp/quic/tunnel.hpp b/llarp/quic/tunnel.hpp index c6f91aa70..ac466d134 100644 --- a/llarp/quic/tunnel.hpp +++ b/llarp/quic/tunnel.hpp @@ -71,7 +71,7 @@ namespace llarp::quic /// Simple wrapper around `listen(...)` that adds a handler that accepts all incoming /// connections trying to tunnel to port `port` and maps them to `localhost:port`. int - listen(uint16_t port); + listen(SockAddr port); /// Removes an incoming connection handler; takes the ID returned by `listen()`. void diff --git a/llarp/rpc/rpc_server.cpp b/llarp/rpc/rpc_server.cpp index 74a5372fb..d00baebde 100644 --- a/llarp/rpc/rpc_server.cpp +++ b/llarp/rpc/rpc_server.cpp @@ -214,6 +214,10 @@ namespace llarp::rpc if (auto itr = obj.find("endpoint"); itr != obj.end()) endpoint = itr->get(); + std::string remote = "127.0.0.1"; + if (auto itr = obj.find("host"); itr != obj.end()) + remote = itr->get(); + uint16_t port = 0; if (auto itr = obj.find("port"); itr != obj.end()) port = itr->get(); @@ -227,7 +231,7 @@ namespace llarp::rpc reply(CreateJSONError("invalid arguments")); return; } - r->loop()->call([reply, endpoint, port, r, closeID]() { + r->loop()->call([reply, endpoint, remote, port, r, closeID]() { auto ep = GetEndpointByName(r, endpoint); if (not ep) { @@ -245,7 +249,8 @@ namespace llarp::rpc int id = 0; try { - id = quic->listen(port); + SockAddr addr{remote + ":" + std::to_string(port)}; + id = quic->listen(addr); } catch (std::exception& ex) { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index b12bb8f1f..7ba3bdefe 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -325,6 +325,17 @@ namespace llarp return false; } + bool + Endpoint::HasOutboundConvo(const Address& addr) const + { + for (const auto& item : Sessions()) + { + if (item.second.remote.Addr() == addr && not item.second.inbound) + return true; + } + return false; + } + void Endpoint::PutSenderFor(const ConvoTag& tag, const ServiceInfo& info, bool inbound) { @@ -973,7 +984,8 @@ namespace llarp path::Path_ptr path, const PathID_t from, std::shared_ptr msg) { msg->sender.UpdateAddr(); - PutSenderFor(msg->tag, msg->sender, true); + if (not HasOutboundConvo(msg->sender.Addr())) + PutSenderFor(msg->tag, msg->sender, true); PutReplyIntroFor(msg->tag, path->intro); Introduction intro; intro.pathID = from; diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 2258f5ef7..31f34da3e 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -311,6 +311,9 @@ namespace llarp bool HasInboundConvo(const Address& addr) const override; + bool + HasOutboundConvo(const Address& addr) const override; + bool GetCachedSessionKeyFor(const ConvoTag& remote, SharedSecret& secret) const override; void diff --git a/llarp/service/handler.hpp b/llarp/service/handler.hpp index b2683940e..0bea0d039 100644 --- a/llarp/service/handler.hpp +++ b/llarp/service/handler.hpp @@ -66,6 +66,9 @@ namespace llarp virtual bool HasInboundConvo(const Address& addr) const = 0; + virtual bool + HasOutboundConvo(const Address& addr) const = 0; + /// do we want a session outbound to addr virtual bool WantsOutboundSession(const Address& addr) const = 0;