From 752879d7125a3e1a5c461d5e9ac98ca993a709f8 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 23 Mar 2021 16:26:32 -0300 Subject: [PATCH] QUIC lokinet integration refactor Refactors how quic packets get handled: the actual tunnels now live in tunnel.hpp's TunnelManager which holds and manages all the quic<->tcp tunnelling. service::Endpoint now holds a TunnelManager rather than a quic::Server. We only need one quic server, but we need a separate quic client instance per outgoing quic tunnel, and TunnelManager handles all that glue now. Adds QUIC packet handling to get to the right tunnel code. This required multiplexing incoming quic packets, as follows: Adds a very small quic tunnel packet header of 4 bytes: [1, SPORT, ECN] for client->server packets, where SPORT is our source "port" (really: just a uint16_t unique quic instance identifier) or [2, DPORT, ECN] for server->client packets where the DPORT is the SPORT from above. (This also reworks ECN bits to get properly carried over lokinet.) We don't need a destination/source port for the server-side because there is only ever one quic server (and we know we're going to it when the first byte of the header is 1). Removes the config option for quic exposing ports; a full lokinet will simply accept anything incoming on quic and tunnel it to the requested port on the the local endpoint IP (this handler will come in a following commit). Replace ConvoTags with full addresses: we need to carry the port, as well, which the ConvoTag can't give us, so change those to more general SockAddrs from which we can extract both the ConvoTag *and* the port. Add a pending connection queue along with new quic-side handlers to call when a stream becomes available (TunnelManager uses this to wire up pending incoming conns with quic streams as streams open up). Completely get rid of tunnel_server/tunnel_client.cpp code; it is now moved to tunnel.hpp. Add listen()/forget() methods in TunnelManager for setting up quic listening sockets (for liblokinet usage). Add open()/close() methods in TunnelManager for spinning up new quic clients for outgoing quic connections. --- llarp/CMakeLists.txt | 2 - llarp/config/config.cpp | 10 - llarp/config/config.hpp | 2 - llarp/handlers/tun.cpp | 25 +- llarp/net/sock_addr.cpp | 6 + llarp/net/sock_addr.hpp | 3 + llarp/quic/address.cpp | 16 +- llarp/quic/address.hpp | 45 ++- llarp/quic/client.cpp | 80 ++-- llarp/quic/client.hpp | 18 +- llarp/quic/connection.cpp | 47 ++- llarp/quic/connection.hpp | 37 +- llarp/quic/endpoint.cpp | 103 ++++-- llarp/quic/endpoint.hpp | 98 +++-- llarp/quic/server.cpp | 65 +--- llarp/quic/server.hpp | 26 +- llarp/quic/stream.cpp | 2 +- llarp/quic/stream.hpp | 7 + llarp/quic/tunnel.cpp | 693 ++++++++++++++++++++++++++++++----- llarp/quic/tunnel.hpp | 227 +++++++++--- llarp/quic/tunnel_client.cpp | 143 -------- llarp/quic/tunnel_server.cpp | 158 -------- llarp/quic/tunnel_server.hpp | 80 ---- llarp/service/endpoint.cpp | 69 +--- llarp/service/endpoint.hpp | 13 +- 25 files changed, 1157 insertions(+), 818 deletions(-) delete mode 100644 llarp/quic/tunnel_client.cpp delete mode 100644 llarp/quic/tunnel_server.cpp delete mode 100644 llarp/quic/tunnel_server.hpp diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 213b223a6..3ad144f93 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -95,8 +95,6 @@ add_library(lokinet-quic quic/server.cpp quic/stream.cpp quic/tunnel.cpp - quic/tunnel_client.cpp - quic/tunnel_server.cpp ) target_link_libraries(lokinet-quic PRIVATE lokinet-platform ngtcp2) diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 9207b0f8c..e9ad25752 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -622,16 +622,6 @@ namespace llarp m_SRVRecords.push_back(std::move(newSRV)); }); - conf.defineOption( - "network", - "expose", - ClientOnly, - MultiValue, - Comment{ - "expose a local port via quic for liblokinet", - }, - [this](uint16_t port) { m_quicServerPorts.insert(port); }); - // Deprecated options: conf.defineOption("network", "enabled", Deprecated); } diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 4a0ba8169..9d4072c7c 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -121,8 +121,6 @@ namespace llarp std::optional m_baseV6Address; - std::unordered_set m_quicServerPorts; - // TODO: // on-up // on-down diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 59f44fdb5..455f142e7 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -1,5 +1,4 @@ #include -#include #include // harmless on other platforms #define __USE_MINGW_ANSI_STDIO 1 @@ -12,14 +11,17 @@ #include #include +#include #include #include #include #include #include #include +#include #include #include +#include #include #include @@ -89,8 +91,8 @@ namespace llarp : service::Endpoint(r, parent) , m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop()) { - m_PacketRouter.reset( - new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }}); + m_PacketRouter = std::make_unique( + [this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }); #ifdef ANDROID m_Resolver = std::make_shared(r, this); m_PacketRouter->AddUDPHandler(huint16_t{53}, [&](net::IPPacket pkt) { @@ -1007,6 +1009,23 @@ namespace llarp service::ProtocolType t, uint64_t seqno) { + if (t == service::ProtocolType::QUIC) + { + auto* quic = GetQUICTunnel(); + if (!quic) + { + LogWarn("incoming quic packet but this endpoint is not quic capable; dropping"); + return false; + } + if (buf.sz < 4) + { + LogWarn("invalid incoming quic packet, dropping"); + return false; + } + quic->receive_packet(tag, buf); + return true; + } + if (t != service::ProtocolType::TrafficV4 && t != service::ProtocolType::TrafficV6 && t != service::ProtocolType::Exit) return false; diff --git a/llarp/net/sock_addr.cpp b/llarp/net/sock_addr.cpp index cd4f852fe..67adee372 100644 --- a/llarp/net/sock_addr.cpp +++ b/llarp/net/sock_addr.cpp @@ -197,6 +197,12 @@ namespace llarp return &m_addr; } + size_t + SockAddr::sockaddr_len() const + { + return isIPv6() ? sizeof(m_addr) : sizeof(m_addr4); + } + bool SockAddr::operator<(const SockAddr& other) const { diff --git a/llarp/net/sock_addr.hpp b/llarp/net/sock_addr.hpp index a34aaecfc..9078aa015 100644 --- a/llarp/net/sock_addr.hpp +++ b/llarp/net/sock_addr.hpp @@ -68,6 +68,9 @@ namespace llarp operator const sockaddr_in*() const; operator const sockaddr_in6*() const; + size_t + sockaddr_len() const; + bool operator<(const SockAddr& other) const; diff --git a/llarp/quic/address.cpp b/llarp/quic/address.cpp index 0e62b029c..48dd05261 100644 --- a/llarp/quic/address.cpp +++ b/llarp/quic/address.cpp @@ -6,7 +6,7 @@ namespace llarp::quic { using namespace std::literals; - Address::Address(service::ConvoTag tag) : saddr{tag.ToV6()} + Address::Address(const SockAddr& addr) : saddr{*addr.operator const sockaddr_in6*()} {} Address& @@ -17,8 +17,7 @@ namespace llarp::quic return *this; } - service::ConvoTag - Address::Tag() const + Address::operator service::ConvoTag() const { service::ConvoTag tag{}; tag.FromV6(saddr); @@ -30,9 +29,14 @@ namespace llarp::quic { if (a.addrlen != sizeof(sockaddr_in6)) return "(unknown-addr)"; - char buf[INET6_ADDRSTRLEN] = {0}; - inet_ntop(AF_INET6, &saddr.sin6_addr, buf, INET6_ADDRSTRLEN); - return buf; + std::string result; + result.resize(8 + INET6_ADDRSTRLEN); + result[0] = '['; + inet_ntop(AF_INET6, &saddr.sin6_addr, &result[1], INET6_ADDRSTRLEN); + result.resize(result.find(char{0})); + result += "]:"; + result += std::to_string(ToHost(nuint16_t{saddr.sin6_port}).h); + return result; } std::ostream& diff --git a/llarp/quic/address.hpp b/llarp/quic/address.hpp index 56ea0700d..812f37849 100644 --- a/llarp/quic/address.hpp +++ b/llarp/quic/address.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -14,14 +16,8 @@ namespace llarp::quic { - union sockaddr_any - { - sockaddr_storage storage; - sockaddr sa; - sockaddr_in6 in6; - sockaddr_in in; - }; - + // Wrapper around a sockaddr; ngtcp2 requires more intrusive access that llarp::SockAddr is meant + // to deal with, hence this wrapper (rather than trying to abuse llarp::SockAddr). class Address { sockaddr_in6 saddr{}; @@ -29,19 +25,18 @@ namespace llarp::quic public: Address() = default; - Address(service::ConvoTag tag); + Address(const SockAddr& addr); Address(const Address& other) { *this = other; } - service::ConvoTag - Tag() const; - Address& operator=(const Address&); + // Implicit conversion to sockaddr* and ngtcp2_addr& so that an Address can be passed wherever + // one of those is expected. operator sockaddr*() { return reinterpret_cast(&saddr); @@ -68,6 +63,32 @@ namespace llarp::quic return sizeof(sockaddr_in6); } + // Implicit conversion to a convo tag so you can pass an Address to things taking a ConvoTag + operator service::ConvoTag() const; + + // Returns the lokinet pseudo-port for the quic connection (which routes this quic packet to the + // correct waiting quic instance on the remote). + nuint16_t + port() const + { + return nuint16_t{saddr.sin6_port}; + } + + // Sets the address port + void + port(nuint16_t port) + { + saddr.sin6_port = port.n; + } + + // Implicit conversion to SockAddr for going back to general llarp code + // FIXME: see if this is still needed, I think it may have been refactored away with the + // ConvoTag operator + operator SockAddr() const + { + return SockAddr(saddr); + } + std::string to_string() const; }; diff --git a/llarp/quic/client.cpp b/llarp/quic/client.cpp index 0a255dd6c..13dc7a169 100644 --- a/llarp/quic/client.cpp +++ b/llarp/quic/client.cpp @@ -1,62 +1,46 @@ - #include "client.hpp" +#include "tunnel.hpp" #include #include +#include #include #include #include #include +#include + namespace llarp::quic { - Client::Client(service::ConvoTag tag, service::Endpoint* parent, uint16_t tunnel_port) - : Endpoint{parent, parent->Loop()->MaybeGetUVWLoop()} + Client::Client(service::Endpoint& ep, const SockAddr& remote, uint16_t pseudo_port) : Endpoint{ep} { - // Our UDP socket is now set up, so now we initiate contact with the remote QUIC - Address remote{std::move(tag)}; + default_stream_buffer_size = + 0; // We steal uvw's provided buffers so don't need an outgoing data buffer - Path path{local, remote}; - llarp::LogDebug("Connecting to ", remote); + // *Our* port; we stuff this in the llarp quic header so it knows how to target quic packets + // back to *this* client. + local_addr.port(ToNet(huint16_t{pseudo_port})); + uint16_t tunnel_port = remote.getPort(); if (tunnel_port == 0) throw std::logic_error{"Cannot tunnel to port 0"}; // TODO: need timers for: // - // - timeout (to disconnect if idle for too longer) + // - timeout (to disconnect if idle for too long) // // - probably don't need for lokinet tunnel: change local addr -- attempts to re-bind the local // socket // // - key_update_timer - // - // - delay_stream_timer - - auto connptr = std::make_shared(*this, ConnectionID::random(), path, tunnel_port); - auto& conn = *connptr; - conns.emplace(conn.base_cid, connptr); - - /* Debug("set crypto ctx"); - null_crypto.client_initial(conn); - - auto x = ngtcp2_conn_get_max_data_left(conn); - Debug("mdl = ", x); - */ - - conn.io_ready(); + Path path{local_addr, remote}; + llarp::LogDebug("Connecting to ", remote); - /* - Debug("Opening bidi stream"); - int64_t stream_id; - if (auto rv = ngtcp2_conn_open_bidi_stream(conn, &stream_id, nullptr); - rv != 0) { - Debug("Opening bidi stream failed: ", ngtcp2_strerror(rv)); - assert(rv == NGTCP2_ERR_STREAM_ID_BLOCKED); - } - else { Debug("Opening bidi stream good"); } - */ + auto conn = std::make_shared(*this, ConnectionID::random(), path, tunnel_port); + conn->io_ready(); + conns.emplace(conn->base_cid, std::move(conn)); } std::shared_ptr @@ -72,29 +56,13 @@ namespace llarp::quic return std::get(it->second); } - void - Client::handle_packet(const Packet& p) + size_t + Client::write_packet_header(nuint16_t, uint8_t ecn) { - llarp::LogDebug("Handling incoming client packet: ", buffer_printer{p.data}); - auto maybe_dcid = handle_packet_init(p); - if (!maybe_dcid) - return; - auto& dcid = *maybe_dcid; - - llarp::LogDebug("Incoming connection id ", dcid); - auto [connptr, alias] = get_conn(dcid); - if (!connptr) - { - llarp::LogDebug("CID is ", alias ? "expired alias" : "unknown/expired", "; dropping"); - return; - } - auto& conn = *connptr; - if (alias) - llarp::LogDebug("CID is alias for primary CID ", conn.base_cid); - else - llarp::LogDebug("CID is primary CID"); - - handle_conn_packet(conn, p); + buf_[0] = CLIENT_TO_SERVER; + auto pseudo_port = local_addr.port(); + std::memcpy(&buf_[1], &pseudo_port.n, 2); // remote quic pseudo-port (network order u16) + buf_[3] = std::byte{ecn}; + return 4; } - } // namespace llarp::quic diff --git a/llarp/quic/client.hpp b/llarp/quic/client.hpp index 6cca4a55d..ff94e7ed7 100644 --- a/llarp/quic/client.hpp +++ b/llarp/quic/client.hpp @@ -1,18 +1,26 @@ #pragma once #include "endpoint.hpp" +#include "service/endpoint.hpp" #include +namespace uvw +{ + struct ListenEvent; + class TCPHandle; +} // namespace uvw + namespace llarp::quic { class Client : public Endpoint { public: // Constructs a client that establishes an outgoing connection to `remote` to tunnel packets to - // `tunnel_port` on the remote's lokinet address. `local` can be used to optionally bind to a - // local IP and/or port for the connection. - Client(service::ConvoTag remote, service::Endpoint* parent, uint16_t tunnel_port); + // `remote.getPort()` on the remote's lokinet address. `pseudo_port` is *our* unique local + // identifier which we include in outgoing packets (so that the remote server knows where to + // send the back to *this* client). + Client(service::Endpoint& ep, const SockAddr& remote, uint16_t pseudo_port); // Returns a reference to the client's connection to the server. Returns a nullptr if there is // no connection. @@ -20,8 +28,8 @@ namespace llarp::quic get_connection(); private: - void - handle_packet(const Packet& p) override; + size_t + write_packet_header(nuint16_t remote_port, uint8_t ecn) override; }; } // namespace llarp::quic diff --git a/llarp/quic/connection.cpp b/llarp/quic/connection.cpp index d8d40504f..f7e116146 100644 --- a/llarp/quic/connection.cpp +++ b/llarp/quic/connection.cpp @@ -126,7 +126,7 @@ namespace llarp::quic // At this stage of the protocol with TLS the client sends back TLS info so that // the server can install our rx key; we have to send *something* back to invoke // the server's HANDSHAKE callback (so that it knows handshake is complete) so - // sent the magic again. + // send the magic again. if (auto rv = conn.send_magic(NGTCP2_CRYPTO_LEVEL_HANDSHAKE); rv != 0) return rv; } @@ -262,6 +262,18 @@ namespace llarp::quic // FIXME return 0; } + int + extend_max_local_streams_bidi(ngtcp2_conn* conn_, uint64_t max_streams, void* user_data) + { + LogTrace("######################", __func__); + auto& conn = *static_cast(user_data); + if (conn.on_stream_available) + if (uint64_t left = ngtcp2_conn_get_streams_bidi_left(conn); left > 0) + conn.on_stream_available(conn); + + return 0; + } + int rand( uint8_t* dest, @@ -361,10 +373,11 @@ namespace llarp::quic std::tuple Connection::init() { - io_trigger = endpoint.loop->resource(); + auto loop = endpoint.get_loop(); + io_trigger = loop->resource(); io_trigger->on([this](auto&, auto&) { on_io_ready(); }); - retransmit_timer = endpoint.loop->resource(); + retransmit_timer = loop->resource(); retransmit_timer->on([this](auto&, auto&) { LogTrace("Retransmit timer fired!"); if (auto rv = ngtcp2_conn_handle_expiry(*this, get_timestamp()); rv != 0) @@ -389,6 +402,7 @@ namespace llarp::quic cb.acked_stream_data_offset = acked_stream_data_offset; cb.stream_open = stream_open; cb.stream_reset = stream_reset_cb; + cb.extend_max_local_streams_bidi = extend_max_local_streams_bidi; cb.rand = rand; cb.get_new_connection_id = get_new_connection_id; cb.remove_connection_id = remove_connection_id; @@ -770,7 +784,7 @@ namespace llarp::quic stream->stream_id = id; bool good = true; if (serv->stream_open_callback) - good = serv->stream_open_callback(*serv, *stream, tunnel_port); + good = serv->stream_open_callback(*stream, tunnel_port); if (!good) { LogDebug("stream_open_callback returned failure, dropping stream ", id); @@ -897,16 +911,29 @@ namespace llarp::quic return endpoint.add_connection_id(*this, cidlen); } + bool + Connection::get_handshake_completed() + { + return ngtcp2_conn_get_handshake_completed(*this) != 0; + } + + int + Connection::get_streams_available() + { + uint64_t left = ngtcp2_conn_get_streams_bidi_left(*this); + constexpr int max_int = std::numeric_limits::max(); + if (left > static_cast(max_int)) + return max_int; + return static_cast(left); + } + const std::shared_ptr& Connection::open_stream(Stream::data_callback_t data_cb, Stream::close_callback_t close_cb) { std::shared_ptr stream{new Stream{ *this, std::move(data_cb), std::move(close_cb), endpoint.default_stream_buffer_size}}; if (int rv = ngtcp2_conn_open_bidi_stream(*this, &stream->stream_id.id, stream.get()); rv != 0) - { - LogWarn("Creating stream failed: ", ngtcp2_strerror(rv)); throw std::runtime_error{"Stream creation failed: "s + ngtcp2_strerror(rv)}; - } auto& str = streams[stream->stream_id]; str = std::move(stream); @@ -979,6 +1006,12 @@ namespace llarp::quic if (!ngtcp2_conn_is_server(*this)) endpoint.null_crypto.install_tx_key(*this); ngtcp2_conn_handshake_completed(*this); + + if (on_handshake_complete) + { + on_handshake_complete(*this); + on_handshake_complete = nullptr; + } } // ngtcp2 doesn't expose the varint encoding, but it's fairly simple: diff --git a/llarp/quic/connection.hpp b/llarp/quic/connection.hpp index 7f414c8f8..33b840680 100644 --- a/llarp/quic/connection.hpp +++ b/llarp/quic/connection.hpp @@ -184,26 +184,28 @@ namespace llarp::quic // when the connection is initiated. std::map> streams; - /// Constructs and initializes a new connection received by a Server + /// Constructs and initializes a new incoming connection /// - /// \param s - the Server object on which the connection was initiated + /// \param server - the Server object that owns this connection /// \param base_cid - the local "primary" ConnectionID we use for this connection, typically - /// random \param header - packet header that initiated the connection \param path - the network - /// path to reach the remote - Connection(Server& s, const ConnectionID& base_cid, ngtcp2_pkt_hd& header, const Path& path); + /// random + /// \param header - packet header that initiated the connection \param path - the network path + /// to reach the remote + Connection( + Server& server, const ConnectionID& base_cid, ngtcp2_pkt_hd& header, const Path& path); /// Establishes a connection from the local Client to a remote Server - /// \param c - the Client object from which the connection is being made + /// \param client - the Endpoint object that owns this connection /// \param base_cid - the client's source (i.e. local) connection ID, typically random /// \param path - the network path to reach the remote /// \param tunnel_port - the port that this connection should tunnel to on the remote end - Connection(Client& c, const ConnectionID& scid, const Path& path, uint16_t tunnel_port); + Connection(Client& client, const ConnectionID& scid, const Path& path, uint16_t tunnel_port); // Non-movable, non-copyable: Connection(Connection&&) = delete; + Connection(const Connection&) = delete; Connection& operator=(Connection&&) = delete; - Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; @@ -265,6 +267,25 @@ namespace llarp::quic ConnectionID make_alias_id(size_t cidlen = ConnectionID::max_size()); + // A callback to invoke when the connection handshake completes. Will be cleared after being + // called. + std::function on_handshake_complete; + + // Returns true iff this connection has completed a handshake with the remote end. + bool + get_handshake_completed(); + + // Callback that is invoked whenever new streams become available: i.e. after handshaking, or + // after existing streams are closed. Note that this callback is invoked whenever the number of + // available streams increases, even if it was initially non-zero before the increase. To see + // how many streams are currently available call `get_streams_available()` (it will always be at + // least 1 when this callback is invoked). + std::function on_stream_available; + + // Returns the number of available streams that can currently be opened on the connection + int + get_streams_available(); + // Opens a stream over this connection; when the server receives this it attempts to establish a // TCP connection to the tunnel configured in the connection. The data callback is invoked as // data is received on this stream. The close callback is called if the stream is closed diff --git a/llarp/quic/endpoint.cpp b/llarp/quic/endpoint.cpp index 43ea0ad4a..b8be36f70 100644 --- a/llarp/quic/endpoint.cpp +++ b/llarp/quic/endpoint.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -21,13 +22,12 @@ extern "C" namespace llarp::quic { - Endpoint::Endpoint(service::Endpoint* parent_, std::shared_ptr loop_) - : parent{parent_}, loop{std::move(loop_)} + Endpoint::Endpoint(service::Endpoint& ep) : service_endpoint{ep} { randombytes_buf(static_secret.data(), static_secret.size()); // Set up a callback every 250ms to clean up stale sockets, etc. - expiry_timer = loop->resource(); + expiry_timer = get_loop()->resource(); expiry_timer->on([this](const auto&, auto&) { check_timeouts(); }); expiry_timer->start(250ms, 250ms); @@ -40,6 +40,61 @@ namespace llarp::quic expiry_timer->close(); } + std::shared_ptr + Endpoint::get_loop() + { + auto loop = service_endpoint.Loop()->MaybeGetUVWLoop(); + assert(loop); // This object should never have been constructed if we aren't using uvw + return loop; + } + + void + Endpoint::receive_packet(const SockAddr& src, uint8_t ecn, bstring_view data) + { + // ngtcp2 wants a local address but we don't necessarily have something so just set it to + // IPv4 or IPv6 "unspecified" address (0.0.0.0 or ::) + SockAddr local = src.isIPv6() ? SockAddr{in6addr_any} : SockAddr{nuint32_t{INADDR_ANY}}; + + Packet pkt{Path{local, src}, data, ngtcp2_pkt_info{.ecn = ecn}}; + + LogDebug("[", pkt.path, ",ecn=", pkt.info.ecn, "]: received ", data.size(), " bytes"); + + handle_packet(pkt); + + LogDebug("Done handling packet"); + } + + void + Endpoint::handle_packet(const Packet& p) + { + LogDebug("Handling incoming quic packet: ", buffer_printer{p.data}); + auto maybe_dcid = handle_packet_init(p); + if (!maybe_dcid) + return; + auto& dcid = *maybe_dcid; + + // See if we have an existing connection already established for it + LogDebug("Incoming connection id ", dcid); + auto [connptr, alias] = get_conn(dcid); + if (!connptr) + { + if (alias) + { + LogDebug("CID is an expired alias; dropping"); + return; + } + connptr = accept_initial_connection(p); + if (!connptr) + return; + } + if (alias) + llarp::LogDebug("CID is alias for primary CID ", connptr->base_cid); + else + llarp::LogDebug("CID is primary CID"); + + handle_conn_packet(*connptr, p); + } + std::optional Endpoint::handle_packet_init(const Packet& p) { @@ -119,32 +174,20 @@ namespace llarp::quic return {rv}; } - void - Endpoint::update_ecn(uint32_t ecn) - { - assert(ecn <= std::numeric_limits::max()); - if (ecn_curr != ecn) - { - ecn_curr = ecn; - } - } - io_result - Endpoint::send_packet(const Address& to, bstring_view data, uint32_t ecn) + Endpoint::send_packet(const Address& to, bstring_view data, uint8_t ecn) { - update_ecn(ecn); - - parent->SendTo(to.Tag(), data, service::ProtocolType::QUIC); - LogDebug( - "[", - to.to_string(), - ",ecn=0x", - std::hex, - +ecn_curr, - std::dec, - "]: sent ", - data.size(), - " bytes"); + assert(service_endpoint.Loop()->inEventLoop()); + + size_t header_size = write_packet_header(to.port(), ecn); + size_t outgoing_len = header_size + data.size(); + assert(outgoing_len <= buf_.size()); + std::memcpy(&buf_[header_size], data.data(), data.size()); + bstring_view outgoing{buf_.data(), outgoing_len}; + + service_endpoint.SendToOrQueue(to, outgoing, service::ProtocolType::QUIC); + LogDebug("[", to, "]: sent ", outgoing.size(), " bytes"); + LogTrace("Full quic data: ", buffer_printer{outgoing}); return {}; } @@ -207,15 +250,11 @@ namespace llarp::quic conn.conn_buffer.resize(written); conn.closing = true; - // FIXME: ipv6 - assert(path.local.sockaddr_size() == sizeof(sockaddr_in)); - assert(path.remote.sockaddr_size() == sizeof(sockaddr_in)); - conn.path = path; } assert(conn.closing && !conn.conn_buffer.empty()); - if (auto sent = send_packet(conn.path.remote, conn.conn_buffer, 0); !sent) + if (auto sent = send_packet(conn.path.remote, conn.conn_buffer, 0); not sent) { LogWarn( "Failed to send packet: ", diff --git a/llarp/quic/endpoint.hpp b/llarp/quic/endpoint.hpp index 5e82d9782..aad7fcaca 100644 --- a/llarp/quic/endpoint.hpp +++ b/llarp/quic/endpoint.hpp @@ -6,52 +6,61 @@ #include "null_crypto.hpp" #include "packet.hpp" #include "stream.hpp" -#include "uvw/async.h" +#include #include #include #include #include #include - #include -#include -#include +#include #include -#include - namespace llarp::service { struct Endpoint; } // namespace llarp::service -namespace llarp::net -{ - struct IPPacket; -} - namespace llarp::quic { using namespace std::literals; inline constexpr auto IDLE_TIMEOUT = 5min; + inline constexpr std::byte CLIENT_TO_SERVER{1}; + inline constexpr std::byte SERVER_TO_CLIENT{2}; + + /// QUIC Tunnel Endpoint; this is the class that implements either end of a quic tunnel for both + /// servers and clients. class Endpoint { + public: + /// Called from tun code via TunnelManager to deliver an incoming packet to us. + /// + /// \param src - the source address; this may be a tun interface address, or may be a fake IPv6 + /// address based on the convo tag. The port is not used. + /// \param ecn - the packet ecn parameter + void + receive_packet(const SockAddr& src, uint8_t ecn, bstring_view data); + + /// Returns a shared pointer to the uvw loop. + std::shared_ptr + get_loop(); + protected: /// the service endpoint we are owned by - service::Endpoint* const parent; - // The current outgoing IP ecn value for the socket - uint8_t ecn_curr = 0; + service::Endpoint& service_endpoint; - /// local "address" just a blank - Address local{}; + /// local "address" is the IPv6 unspecified address since we don't have (or care about) the + /// actual local address for building quic packets. The port of this address must be set to our + /// local pseudo-port, for clients, and 0 for a server. + Address local_addr{in6addr_any}; std::shared_ptr expiry_timer; - std::shared_ptr loop; + std::vector buf; // Max theoretical size of a UDP packet is 2^16-1 minus IP/UDP header overhead static constexpr size_t max_buf_size = 64 * 1024; // Max size of a UDP packet that we'll send @@ -96,8 +105,7 @@ namespace llarp::quic friend class Connection; - // Wires up an endpoint connection. - Endpoint(service::Endpoint* ep, std::shared_ptr loop); + explicit Endpoint(service::Endpoint& service_endpoint_); virtual ~Endpoint(); @@ -112,8 +120,8 @@ namespace llarp::quic }; // Called to handle an incoming packet - virtual void - handle_packet(const Packet& p) = 0; + void + handle_packet(const Packet& p); // Internal method: handles initial common packet decoding, returns the connection ID or nullopt // if decoding failed. @@ -123,27 +131,47 @@ namespace llarp::quic void handle_conn_packet(Connection& c, const Packet& p); + // Accept a new incoming connection, i.e. pre-handshake. Returns a nullptr if the connection + // can't be created (e.g. because of invalid initial data), or if incoming connections are not + // accepted by this endpoint (i.e. because it is not a Server instance, or because there are no + // registered listen handlers). The base class default returns nullptr. + virtual std::shared_ptr + accept_initial_connection(const Packet&) + { + return nullptr; + } + // Reads a packet and handles various error conditions. Returns an io_result. Note that it is // possible for the conn_it to be erased from `conns` if the error code is anything other than // success (0) or NGTCP2_ERR_RETRY. io_result read_packet(const Packet& p, Connection& conn); - // Sets up the ECN IP field (IP_TOS for IPv4) for the next outgoing packet sent via - // send_packet(). This does the actual syscall (if ECN is different than currently set), and is - // typically called implicitly via send_packet(). - void - update_ecn(uint32_t ecn); + // Writes the lokinet packet header to the beginning of `buf_`; the header is prepend to quic + // packets to identify which quic server the packet should be delivered to and consists of: + // - type [1 byte]: 1 for client->server packets; 2 for server->client packets (other values + // reserved) + // - port [2 bytes, network order]: client pseudoport (i.e. either a source or destination port + // depending on type) + // - ecn value [1 byte]: provided by ngtcp2. (Only the lower 2 bits are actually used). + // + // \param psuedo_port - the remote's pseudo-port (will be 0 if the remote is a server, > 0 for + // a client remote) + // \param ecn - the ecn value from ngtcp2 + // + // Returns the number of bytes written to buf_. + virtual size_t + write_packet_header(nuint16_t pseudo_port, uint8_t ecn) = 0; // Sends a packet to `to` containing `data`. Returns a non-error io_result on success, // an io_result with .error_code set to the errno of the failure on failure. io_result - send_packet(const Address& to, bstring_view data, uint32_t ecn); + send_packet(const Address& to, bstring_view data, uint8_t ecn); // Wrapper around the above that takes a regular std::string_view (i.e. of chars) and recasts // it to an string_view of std::bytes. io_result - send_packet(const Address& to, std::string_view data, uint32_t ecn) + send_packet(const Address& to, std::string_view data, uint8_t ecn) { return send_packet( to, bstring_view{reinterpret_cast(data.data()), data.size()}, ecn); @@ -151,7 +179,7 @@ namespace llarp::quic // Another wrapper taking a vector io_result - send_packet(const Address& to, const std::vector& data, uint32_t ecn) + send_packet(const Address& to, const std::vector& data, uint8_t ecn) { return send_packet(to, bstring_view{data.data(), data.size()}, ecn); } @@ -212,12 +240,12 @@ namespace llarp::quic // Default stream buffer size for streams opened through this endpoint. size_t default_stream_buffer_size = 64 * 1024; - // Gets a reference to the UV event loop - uvw::Loop& - get_loop() - { - return *loop; - } + // Packet buffer we use when constructing custom packets to fire over lokinet + std::array buf_; + + // Non-copyable, non-movable + Endpoint(const Endpoint&) = delete; + Endpoint(Endpoint&&) = delete; }; } // namespace llarp::quic diff --git a/llarp/quic/server.cpp b/llarp/quic/server.cpp index cad32790c..369426699 100644 --- a/llarp/quic/server.cpp +++ b/llarp/quic/server.cpp @@ -12,59 +12,11 @@ namespace llarp::quic { - Server::Server( - service::Endpoint* parent, - std::shared_ptr loop, - stream_open_callback_t stream_open) - : Endpoint{parent, std::move(loop)}, stream_open_callback{std::move(stream_open)} - {} - - void - Server::handle_packet(const Packet& p) - { - LogDebug("Handling incoming server packet: ", buffer_printer{p.data}); - auto maybe_dcid = handle_packet_init(p); - if (!maybe_dcid) - return; - auto& dcid = *maybe_dcid; - - // See if we have an existing connection already established for it - LogDebug("Incoming connection id ", dcid); - primary_conn_ptr connptr; - if (auto conn_it = conns.find(dcid); conn_it != conns.end()) - { - if (auto* wptr = std::get_if(&conn_it->second)) - { - connptr = wptr->lock(); - if (!connptr) - LogDebug("CID is an expired alias"); - else - LogDebug("CID is an alias for primary CID ", connptr->base_cid); - } - else - { - connptr = var::get(conn_it->second); - LogDebug("CID is primary"); - } - } - else - { - connptr = accept_connection(p); - } - - if (!connptr) - { - LogWarn("invalid or expired connection, ignoring"); - return; - } - - handle_conn_packet(*connptr, p); - } - std::shared_ptr - Server::accept_connection(const Packet& p) + Server::accept_initial_connection(const Packet& p) { LogDebug("Accepting new connection"); + // This is a new incoming connection ngtcp2_pkt_hd hd; auto rv = ngtcp2_accept(&hd, u8data(p.data), p.data.size()); @@ -85,10 +37,6 @@ namespace llarp::quic return nullptr; } - /* - ngtcp2_cid ocid; - ngtcp2_cid *pocid = nullptr; - */ if (hd.type == NGTCP2_PKT_0RTT) { LogWarn("Received 0-RTT packet, which shouldn't happen in our implementation; dropping"); @@ -114,4 +62,13 @@ namespace llarp::quic } } + size_t + Server::write_packet_header(nuint16_t pport, uint8_t ecn) + { + buf_[0] = SERVER_TO_CLIENT; + std::memcpy(&buf_[1], &pport.n, 2); // remote quic pseudo-port (network order u16) + buf_[3] = std::byte{ecn}; + return 4; + } + } // namespace llarp::quic diff --git a/llarp/quic/server.hpp b/llarp/quic/server.hpp index 192809420..7e6e1db3b 100644 --- a/llarp/quic/server.hpp +++ b/llarp/quic/server.hpp @@ -9,11 +9,10 @@ namespace llarp::quic class Server : public Endpoint { public: - using stream_open_callback_t = - std::function; + using stream_open_callback_t = std::function; - Server( - service::Endpoint*, std::shared_ptr loop, stream_open_callback_t stream_opened); + Server(service::Endpoint& service_endpoint) : Endpoint{service_endpoint} + {} // Stream callback: takes the server, the (just-created) stream, and the connection port. // Returns true if the stream should be allowed or false to reject the stream. The callback @@ -21,21 +20,14 @@ namespace llarp::quic // (which means incoming data will simply be dropped). stream_open_callback_t stream_open_callback; - int - setup_null_crypto(ngtcp2_conn* conn); - private: - // Handles an incoming packet by figuring out and handling the connection id; if necessary we - // send back a version negotiation or a connection close frame, or drop the packet (if in the - // draining state). If we get through all of the above then it's a packet to read, in which - // case we pass it on to read_packet(). - void - handle_packet(const Packet& p) override; - - // Creates a new connection from an incoming packet. Returns a nullptr if the connection can't - // be created. + // Accept a new incoming connection, i.e. pre-handshake. Returns a nullptr if the connection + // can't be created (e.g. because of invalid initial data), or is invalid. std::shared_ptr - accept_connection(const Packet& p); + accept_initial_connection(const Packet& p) override; + + size_t + write_packet_header(nuint16_t pport, uint8_t ecn) override; }; } // namespace llarp::quic diff --git a/llarp/quic/stream.cpp b/llarp/quic/stream.cpp index 166f38dc4..6d5b8b3f9 100644 --- a/llarp/quic/stream.cpp +++ b/llarp/quic/stream.cpp @@ -60,7 +60,7 @@ namespace llarp::quic , conn{conn} , stream_id{std::move(id)} , buffer{buffer_size} - , avail_trigger{conn.endpoint.get_loop().resource()} + , avail_trigger{conn.endpoint.get_loop()->resource()} { avail_trigger->on([this](auto&, auto&) { handle_unblocked(); }); } diff --git a/llarp/quic/stream.hpp b/llarp/quic/stream.hpp index 60ba4b377..65c3df66d 100644 --- a/llarp/quic/stream.hpp +++ b/llarp/quic/stream.hpp @@ -258,6 +258,13 @@ namespace llarp::quic : std::get>(user_data).lock()); } + // Returns a reference to the connection that owns this stream + Connection& + get_connection() + { + return conn; + } + private: friend class Connection; diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index d41bb21c1..cff5badb1 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -1,111 +1,634 @@ #include "tunnel.hpp" +#include "service/convotag.hpp" +#include "service/endpoint.hpp" +#include "service/name.hpp" #include "stream.hpp" +#include #include #include +#include +#include +#include +#include +#include -namespace llarp::quic::tunnel +namespace llarp::quic { - // Takes data from the tcp connection and pushes it down the quic tunnel - void - on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client) - { - auto stream = client.data(); - assert(stream); - std::string_view data{event.data.get(), event.length}; - auto peer = client.peer(); - LogDebug(peer.ip, ":", peer.port, " → lokinet ", buffer_printer{data}); - // Steal the buffer from the DataEvent's unique_ptr: - stream->append_buffer(reinterpret_cast(event.data.release()), event.length); - if (stream->used() >= PAUSE_SIZE) - { - LogDebug( - "quic tunnel is congested (have ", - stream->used(), - " bytes in flight); pausing local tcp connection reads"); - client.stop(); - stream->when_available([](llarp::quic::Stream& s) { - auto client = s.data(); - if (s.used() < PAUSE_SIZE) + namespace + { + // Takes data from the tcp connection and pushes it down the quic tunnel + void + on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client) + { + auto stream = client.data(); + assert(stream); + std::string_view data{event.data.get(), event.length}; + auto peer = client.peer(); + LogDebug(peer.ip, ":", peer.port, " → lokinet ", buffer_printer{data}); + // Steal the buffer from the DataEvent's unique_ptr: + stream->append_buffer(reinterpret_cast(event.data.release()), event.length); + if (stream->used() >= tunnel::PAUSE_SIZE) + { + LogDebug( + "quic tunnel is congested (have ", + stream->used(), + " bytes in flight); pausing local tcp connection reads"); + client.stop(); + stream->when_available([](Stream& s) { + auto client = s.data(); + if (s.used() < tunnel::PAUSE_SIZE) + { + LogDebug("quic tunnel is no longer congested; resuming tcp connection reading"); + client->read(); + return true; + } + return false; + }); + } + else + { + LogDebug("Queued ", event.length, " bytes"); + } + } + + // Received data from the quic tunnel and sends it to the TCP connection + void + on_incoming_data(Stream& stream, bstring_view bdata) + { + auto tcp = stream.data(); + assert(tcp); + std::string_view data{reinterpret_cast(bdata.data()), bdata.size()}; + auto peer = tcp->peer(); + LogTrace(peer.ip, ":", peer.port, " ← lokinet ", buffer_printer{data}); + + if (data.empty()) + return; + + // Try first to write immediately from the existing buffer to avoid needing an + // allocation and copy: + auto written = tcp->tryWrite(const_cast(data.data()), data.size()); + if (written < (int)data.size()) + { + data.remove_prefix(written); + + auto wdata = std::make_unique(data.size()); + std::copy(data.begin(), data.end(), wdata.get()); + tcp->write(std::move(wdata), data.size()); + } + } + + // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions + // on the given quic stream. + void + install_stream_forwarding(uvw::TCPHandle& tcp, Stream& stream) + { + tcp.data(stream.shared_from_this()); + stream.weak_data(tcp.weak_from_this()); + + tcp.clear(); // Clear any existing initial event handlers + + tcp.on([](auto&, uvw::TCPHandle& c) { + // This fires sometime after we call `close()` to signal that the close is done. + LogError("Connection closed to ", c.peer().ip, ":", c.peer().port, "; closing quic stream"); + c.data()->close(); + c.data(nullptr); + }); + tcp.on([](auto&, uvw::TCPHandle& c) { + // This fires on eof, most likely because the other side of the TCP connection closed it. + LogError("EOF on connection to ", c.peer().ip, ":", c.peer().port); + c.close(); + }); + tcp.on([](const uvw::ErrorEvent& e, uvw::TCPHandle& tcp) { + LogError( + "ErrorEvent[", + e.name(), + ": ", + e.what(), + "] on connection with ", + tcp.peer().ip, + ":", + tcp.peer().port, + ", shutting down quic stream"); + // Failed to open connection, so close the quic stream + auto stream = tcp.data(); + if (stream) + stream->close(tunnel::ERROR_TCP); + tcp.closeReset(); + }); + tcp.on(on_outgoing_data); + stream.data_callback = on_incoming_data; + } + // This initial data handler is responsible for pulling off the initial stream data that comes + // back, confirming that the tunnel is opened on the other end. Currently this is a null byte + // (CONNECT_INIT) but in the future we might encode additional data here (and, if that happens, + // we want this older implementation to fail). + // + // If the initial byte checks out we replace this handler with the regular stream handler (and + // forward the rest of the data to it if we got more than just the single byte). + void + initial_client_data_handler(uvw::TCPHandle& client, Stream& stream, bstring_view bdata) + { + if (bdata.empty()) + return; + client.clear(); // Clear these initial event handlers: we either set up the proper ones, or + // close + + if (auto b0 = bdata[0]; b0 == tunnel::CONNECT_INIT) + { + // Set up callbacks, which replaces both of these initial callbacks + client.read(); + install_stream_forwarding(client, stream); + + if (bdata.size() > 1) { - LogDebug("quic tunnel is no longer congested; resuming tcp connection reading"); - client->read(); - return true; + bdata.remove_prefix(1); + stream.data_callback(stream, std::move(bdata)); } + LogTrace("starting client reading"); + } + else + { + LogWarn( + "Remote connection returned invalid initial byte (0x", + oxenmq::to_hex(bdata.begin(), bdata.begin() + 1), + "); dropping connection"); + stream.close(tunnel::ERROR_BAD_INIT); + client.closeReset(); + } + stream.io_ready(); + } + + // Initial close handler that gets replaced as soon as we receive a valid byte (in the above + // handler). If this gets called then it means the quic remote quic end closed before we + // established the end-to-end tunnel (for example because the remote's tunnel connection + // failed): + void + initial_client_close_handler( + uvw::TCPHandle& client, Stream& /*stream*/, std::optional error_code) + { + if (error_code && *error_code == tunnel::ERROR_CONNECT) + LogDebug("Remote TCP connection failed, closing local connection"); + else + LogWarn( + "Stream connection closed ", + error_code ? "with error " + std::to_string(*error_code) : "gracefully", + "; closing local TCP connection."); + auto peer = client.peer(); + LogDebug("Closing connection to ", peer.ip, ":", peer.port); + client.clear(); + if (error_code) + client.closeReset(); + else + client.close(); + } + + } // namespace + + TunnelManager::TunnelManager(service::Endpoint& se) : service_endpoint_{se} + { + // Cleanup callback to clear out closed tunnel connections + service_endpoint_.Loop()->call_every(500ms, timer_keepalive_, [this] { + LogTrace("Checking quic tunnels for finished connections"); + for (auto ctit = client_tunnels_.begin(); ctit != client_tunnels_.end();) + { + // Clear any accepted connections that have been closed: + auto& [port, ct] = *ctit; + for (auto it = ct.conns.begin(); it != ct.conns.end();) + { + // TCP connections keep a shared_ptr to their quic::Stream while open and clear it when + // closed. (We don't want to use `.active()` here because we do deliberately temporarily + // stop the TCP connection when the quic side gets congested. + if (not *it or not(*it)->data()) + { + LogDebug("Cleanup up closed outgoing tunnel on quic:", port); + it = ct.conns.erase(it); + } + else + ++it; + } + + // If there are not accepted connections left *and* we stopped listening for new ones then + // destroy the whole thing. + if (ct.conns.empty() and (not ct.tcp or not ct.tcp->active())) + { + LogDebug("All sockets closed on quic:", port, ", destroying tunnel data"); + ctit = client_tunnels_.erase(ctit); + } + else + ++ctit; + } + LogTrace("Done quic tunnel cleanup check"); + }); + } + + void + TunnelManager::make_server() + { + // auto loop = get_loop(); + + server_ = std::make_unique(service_endpoint_); + server_->stream_open_callback = [this](Stream& stream, uint16_t port) -> bool { + stream.close_callback = [](quic::Stream& st, + [[maybe_unused]] std::optional errcode) { + auto tcp = st.data(); + if (tcp) + tcp->close(); + }; + + auto& conn = stream.get_connection(); + auto remote = service_endpoint_.GetEndpointWithConvoTag(conn.path.remote); + if (!remote) + { + LogWarn("Received new stream open from invalid/unknown convo tag, dropping stream"); return false; - }); + } + + auto lokinet_addr = var::visit([](auto&& remote) { return remote.ToString(); }, *remote); + auto tunnel_to = allow_connection(lokinet_addr, port); + if (not tunnel_to) + return false; + LogDebug("quic stream from ", lokinet_addr, " to ", port, " tunnelling to ", *tunnel_to); + + auto tcp = get_loop()->resource(); + auto error_handler = tcp->once( + [&stream, to = *tunnel_to](const uvw::ErrorEvent&, uvw::TCPHandle&) { + LogWarn("Failed to connect to ", to, ", shutting down quic stream"); + stream.close(tunnel::ERROR_CONNECT); + }); + + // As soon as we connect to the local tcp tunnel port we fire a CONNECT_INIT down the stream + // tunnel to let the other end know the connection was successful, then set up regular + // stream handling to handle any other to/from data. + tcp->once( + [streamw = stream.weak_from_this(), error_handler = std::move(error_handler)]( + const uvw::ConnectEvent&, uvw::TCPHandle& tcp) { + auto peer = tcp.peer(); + auto stream = streamw.lock(); + if (!stream) + { + LogWarn( + "Connected to TCP ", + peer.ip, + ":", + peer.port, + " but quic stream has gone away; close/resetting local TCP connection"); + tcp.closeReset(); + return; + } + LogDebug("Connected to ", peer.ip, ":", peer.port, " for quic ", stream->id()); + // Set up the data stream forwarding (which also clears these initial handlers). + install_stream_forwarding(tcp, *stream); + assert(stream->used() == 0); + + // Send the magic byte, and start reading from the tcp tunnel in the logic thread + stream->append_buffer(new std::byte[1]{tunnel::CONNECT_INIT}, 1); + tcp.read(); + }); + + tcp->connect(*tunnel_to->operator const sockaddr*()); + + return true; + }; + } + + int + TunnelManager::listen(ListenHandler handler) + { + if (!handler) + throw std::logic_error{"Cannot call listen() with a null handler"}; + assert(service_endpoint_.Loop()->inEventLoop()); + if (not server_) + make_server(); + + int id = next_handler_id_++; + incoming_handlers_.emplace_hint(incoming_handlers_.end(), id, std::move(handler)); + return id; + } + + int + TunnelManager::listen(uint16_t port) + { + return listen([port](std::string_view, uint16_t p) -> std::optional { + if (p == port) + return SockAddr{127, 0, 0, 1, huint16_t{port}}; + return std::nullopt; + }); + } + + void + TunnelManager::forget(int id) + { + incoming_handlers_.erase(id); + } + + std::optional + TunnelManager::allow_connection(std::string_view lokinet_addr, uint16_t port) + { + for (auto& [id, handler] : incoming_handlers_) + { + try + { + if (auto addr = handler(lokinet_addr, port)) + return addr; + } + catch (const std::exception& e) + { + LogWarn( + "Incoming quic connection from ", + lokinet_addr, + " to ", + port, + " denied via exception (", + e.what(), + ")"); + return std::nullopt; + } + } + LogWarn( + "Incoming quic connection from ", lokinet_addr, " to ", port, " declined by all handlers"); + return std::nullopt; + } + + std::shared_ptr + TunnelManager::get_loop() + { + if (auto loop = service_endpoint_.Loop()->MaybeGetUVWLoop()) + return loop; + throw std::logic_error{"TunnelManager requires a libuv-based event loop"}; + } + + // Finds the first unused key in `map`, starting at `start` and wrapping back to 0 if we hit the + // end. Requires an unsigned int type for the key. Requires nullopt if the map is completely + // full, otherwise returns the free key. + template < + typename K, + typename V, + typename = std::enable_if_t && std::is_unsigned_v>> + static std::optional + find_unused_key(std::map& map, K start) + { + if (map.size() == std::numeric_limits::max()) + return std::nullopt; // The map is completely full + [[maybe_unused]] bool from_zero = (start == K{0}); + + // Start at the first key >= start, then walk 1-by-1 (incrementing start) until we find a + // strictly > key, which means we've found a hole we can use + auto it = map.lower_bound(start); + if (it == map.end()) + return start; + + for (; it != map.end(); ++it, ++start) + if (it->first != start) + return start; + if (start != 0) // `start` didn't wrap which means we found an empty slot + return start; + assert(!from_zero); // There *must* be a free slot somewhere in [0, max] (otherwise the map + // would be completely full and we'd have returned nullopt). + return find_unused_key(map, K{0}); + } + + // Wrap common tasks and cleanup that we need to do from multiple places while establishing a + // tunnel + bool + TunnelManager::continue_connecting( + uint16_t pseudo_port, bool step_success, std::string_view step_name, std::string_view addr) + { + assert(service_endpoint_.Loop()->inEventLoop()); + auto it = client_tunnels_.find(pseudo_port); + if (it == client_tunnels_.end()) + { + LogDebug("QUIC tunnel to ", addr, " closed before ", step_name, " finished"); + return false; + } + if (!step_success) + { + LogWarn("QUIC tunnel to ", addr, " failed during ", step_name, "; aborting tunnel"); + it->second.tcp->closeReset(); + if (it->second.open_cb) + it->second.open_cb(false); + client_tunnels_.erase(it); + } + return step_success; + } + + std::pair + TunnelManager::open( + std::string_view remote_address, uint16_t port, OpenCallback on_open, SockAddr bind_addr) + { + std::string remote_addr = lowercase_ascii_string(std::string{remote_address}); + + std::pair result; + auto& [saddr, pport] = result; + + auto maybe_remote = service::ParseAddress(remote_addr); + if (!maybe_remote) + { + if (not service::NameIsValid(remote_addr)) + throw std::invalid_argument{"Invalid remote lokinet name/address"}; + // Otherwise it's a valid ONS name, so we'll initiate an ONS lookup below + } + + // Open the TCP tunnel right away; it will just block new incoming connections until the quic + // connection is established, but this still allows the caller to connect right away and queue + // an initial request (rather than having to wait via a callback before connecting). It also + // makes sure we can actually listen on the given address before we go ahead with establishing + // the quic connection. + auto tcp_tunnel = get_loop()->resource(); + const char* failed = nullptr; + auto err_handler = + tcp_tunnel->once([&failed](auto& evt, auto&) { failed = evt.what(); }); + tcp_tunnel->bind(*bind_addr.operator const sockaddr*()); + tcp_tunnel->listen(); + tcp_tunnel->erase(err_handler); + + if (failed) + { + tcp_tunnel->closeReset(); + throw std::runtime_error{ + "Failed to bind/listen local TCP tunnel socket on " + bind_addr.toString() + ": " + + failed}; } + + auto bound = tcp_tunnel->sock(); + saddr = SockAddr{bound.ip, static_cast(bound.port)}; + + // Find the first unused psuedo-port value starting from next_pseudo_port_. + if (auto p = find_unused_key(client_tunnels_, next_pseudo_port_)) + pport = *p; else + throw std::runtime_error{ + "Unable to open an outgoing quic connection: too many existing connections"}; + (next_pseudo_port_ = pport)++; + + // We are emplacing into client_tunnels_ here: beyond this point we must not throw until we + // return (or if we do, make sure we remove this row from client_tunnels_ first). + assert(client_tunnels_.count(pport) == 0); + auto& ct = client_tunnels_[pport]; + ct.open_cb = std::move(on_open); + ct.tcp = std::move(tcp_tunnel); + + auto after_path = [this, port, pport = pport, remote_addr](auto maybe_convo) { + if (not continue_connecting(pport, (bool)maybe_convo, "path build", remote_addr)) + return; + SockAddr dest{maybe_convo->ToV6()}; + dest.setPort(port); + make_client(dest, *client_tunnels_.find(pport)); + }; + + if (!maybe_remote) { - LogDebug("Queued ", event.length, " bytes"); + // We were given an ONS address, so it's a two-step process: first we resolve the ONS name, + // then we have to build a path to that address. + service_endpoint_.LookupNameAsync( + remote_addr, + [this, + after_path = std::move(after_path), + pport = pport, + remote_addr = std::move(remote_addr)](auto maybe_remote) { + if (not continue_connecting( + pport, (bool)maybe_remote, "endpoint ONS lookup", remote_addr)) + return; + service_endpoint_.EnsurePathTo(*maybe_remote, after_path, open_timeout); + }); + return result; } + + auto& remote = *maybe_remote; + + // See if we have an existing convo tag we can use to start things immediately + if (auto maybe_convo = service_endpoint_.GetBestConvoTagFor(remote)) + after_path(maybe_convo); + else + service_endpoint_.EnsurePathTo(remote, after_path, open_timeout); + + return result; } - // Received data from the quic tunnel and sends it to the TCP connection void - on_incoming_data(llarp::quic::Stream& stream, llarp::quic::bstring_view bdata) + TunnelManager::close(int id) { - auto tcp = stream.data(); - assert(tcp); - std::string_view data{reinterpret_cast(bdata.data()), bdata.size()}; - auto peer = tcp->peer(); - LogTrace(peer.ip, ":", peer.port, " ← lokinet ", buffer_printer{data}); - - if (data.empty()) - return; + if (auto it = client_tunnels_.find(id); it != client_tunnels_.end()) + { + it->second.tcp->close(); + it->second.tcp.reset(); + } + } - // Try first to write immediately from the existing buffer to avoid needing an - // allocation and copy: - auto written = tcp->tryWrite(const_cast(data.data()), data.size()); - if (written < (int)data.size()) + TunnelManager::ClientTunnel::~ClientTunnel() + { + if (tcp) { - data.remove_prefix(written); + tcp->close(); + tcp.reset(); + } + for (auto& conn : conns) + conn->closeReset(); + conns.clear(); - auto wdata = std::make_unique(data.size()); - std::copy(data.begin(), data.end(), wdata.get()); - tcp->write(std::move(wdata), data.size()); + while (not pending_incoming.empty()) + { + if (auto tcp = pending_incoming.front().lock()) + { + tcp->clear(); + tcp->close(); + } + pending_incoming.pop(); } } void - install_stream_forwarding(uvw::TCPHandle& tcp, llarp::quic::Stream& stream) - { - tcp.data(stream.shared_from_this()); - stream.weak_data(tcp.weak_from_this()); - - tcp.on([](auto&, uvw::TCPHandle& c) { - // This fires sometime after we call `close()` to signal that the close is done. - LogError( - "Connection with ", - c.peer().ip, - ":", - c.peer().port, - " closed directly, closing quic stream"); - c.data()->close(); - }); - tcp.on([](auto&, uvw::TCPHandle& c) { - // This fires on eof, most likely because the other side of the TCP connection closed it. - LogError("EOF on connection with ", c.peer().ip, ":", c.peer().port, ", closing quic stream"); - c.data()->close(); - }); - tcp.on([](const uvw::ErrorEvent& e, uvw::TCPHandle& tcp) { - LogError( - "ErrorEvent[", - e.name(), - ": ", - e.what(), - "] on connection with ", - tcp.peer().ip, - ":", - tcp.peer().port, - ", shutting down quic stream"); - // Failed to open connection, so close the quic stream - auto stream = tcp.data(); - if (stream) - stream->close(ERROR_TCP); - tcp.close(); - }); - tcp.on(tunnel::on_outgoing_data); - stream.data_callback = on_incoming_data; + TunnelManager::make_client(const SockAddr& remote, std::pair& row) + { + assert(remote.getPort() > 0); + auto& [pport, tunnel] = row; + assert(not tunnel.client); + tunnel.client = std::make_unique(service_endpoint_, remote, pport); + auto conn = tunnel.client->get_connection(); + + conn->on_stream_available = [this, id = row.first](Connection& conn) { + if (auto it = client_tunnels_.find(id); it != client_tunnels_.end()) + flush_pending_incoming(it->second, conn); + }; + } + + void + TunnelManager::flush_pending_incoming(ClientTunnel& ct, Connection& conn) + { + int available = conn.get_streams_available(); + while (available > 0 and not ct.pending_incoming.empty()) + { + auto client = ct.pending_incoming.front().lock(); + ct.pending_incoming.pop(); + if (not client) + continue; + + try + { + conn.open_stream( + [client](auto&&... args) { + initial_client_data_handler(*client, std::forward(args)...); + }, + [client](auto&&... args) { + initial_client_close_handler(*client, std::forward(args)...); + }); + available--; + } + catch (const std::exception& e) + { + LogWarn("Opening quic stream failed: ", e.what()); + client->closeReset(); + } + + LogTrace("Set up new stream"); + conn.io_ready(); + } } -} // namespace llarp::quic::tunnel + void + TunnelManager::receive_packet(const service::ConvoTag& tag, const llarp_buffer_t& buf) + { + if (buf.sz <= 4) + { + LogWarn("invalid quic packet: packet size (", buf.sz, ") too small"); + return; + } + auto type = static_cast(buf.base[0]); + nuint16_t pseudo_port_n; + std::memcpy(&pseudo_port_n.n, &buf.base[1], 2); + uint16_t pseudo_port = ToHost(pseudo_port_n).h; + auto ecn = static_cast(buf.base[3]); + bstring_view data{reinterpret_cast(&buf.base[4]), buf.sz - 4}; + + SockAddr remote{tag.ToV6()}; + quic::Endpoint* ep = nullptr; + if (type == CLIENT_TO_SERVER) + { + // Client-to-server: the header port is the return port + remote.setPort(pseudo_port); + if (!server_) + { + LogWarn("Dropping incoming quic packet to server: no listeners"); + return; + } + ep = server_.get(); + } + else if (type == SERVER_TO_CLIENT) + { + // Server-to-client: the header port tells us which client tunnel this is going to + if (auto it = client_tunnels_.find(pseudo_port); it != client_tunnels_.end()) + ep = it->second.client.get(); + + if (not ep) + { + LogWarn("Incoming quic packet to invalid/closed client; dropping"); + return; + } + } + else + { + LogWarn("Invalid incoming quic packet type ", type, "; dropping packet"); + return; + } + ep->receive_packet(remote, ecn, data); + } +} // namespace llarp::quic diff --git a/llarp/quic/tunnel.hpp b/llarp/quic/tunnel.hpp index 45e2bb397..7bcaccc2c 100644 --- a/llarp/quic/tunnel.hpp +++ b/llarp/quic/tunnel.hpp @@ -1,6 +1,10 @@ #pragma once +#include #include "stream.hpp" +#include "address.hpp" +#include "client.hpp" +#include "server.hpp" #include #include @@ -9,45 +13,186 @@ #include -namespace llarp::quic::tunnel +namespace llarp::quic { - // The server sends back a 0x00 to signal that the remote TCP connection was established and that - // it is now accepting stream data; the client is not allowed to send any other data down the - // stream until this comes back (any data sent down the stream before then is discarded.) - inline constexpr std::byte CONNECT_INIT{0x00}; - // QUIC application error codes we sent on failures: - // Failure to establish an initial connection: - inline constexpr uint64_t ERROR_CONNECT{0x5471907}; - // Error if we receive something other than CONNECT_INIT as the initial stream data from the - // server - inline constexpr uint64_t ERROR_BAD_INIT{0x5471908}; - // Close error code sent if we get an error on the TCP socket (other than an initial connect - // failure) - inline constexpr uint64_t ERROR_TCP{0x5471909}; - - // We pause reading from the local TCP socket if we have more than this amount of outstanding - // unacked data in the quic tunnel, then resume once it drops below this. - inline constexpr size_t PAUSE_SIZE = 64 * 1024; - - // Callbacks for network events. The uvw::TCPHandle client must contain a shared pointer to the - // associated llarp::quic::Stream in its data, and the llarp::quic::Stream must contain a weak - // pointer to the uvw::TCPHandle. - - // Callback when we receive data to go out over lokinet, i.e. read from the local TCP socket - void - on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client); - - // Callback when we receive data from lokinet to write to the local TCP socket - void - on_incoming_data(llarp::quic::Stream& stream, llarp::quic::bstring_view bdata); - - // Callback to handle and discard the first incoming 0x00 byte that initiates the stream - void - on_init_incoming_data(llarp::quic::Stream& stream, llarp::quic::bstring_view bdata); - - // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions on - // the given quic stream. - void - install_stream_forwarding(uvw::TCPHandle& tcp, llarp::quic::Stream& stream); - -} // namespace llarp::quic::tunnel + namespace tunnel + { + // The server sends back a 0x00 to signal that the remote TCP connection was established and + // that it is now accepting stream data; the client is not allowed to send any other data down + // the stream until this comes back (any data sent down the stream before then is discarded.) + inline constexpr std::byte CONNECT_INIT{0x00}; + // QUIC application error codes we sent on failures: + // Failure to establish an initial connection: + inline constexpr uint64_t ERROR_CONNECT{0x5471907}; + // Error if we receive something other than CONNECT_INIT as the initial stream data from the + // server + inline constexpr uint64_t ERROR_BAD_INIT{0x5471908}; + // Close error code sent if we get an error on the TCP socket (other than an initial connect + // failure) + inline constexpr uint64_t ERROR_TCP{0x5471909}; + + // We pause reading from the local TCP socket if we have more than this amount of outstanding + // unacked data in the quic tunnel, then resume once it drops below this. + inline constexpr size_t PAUSE_SIZE = 64 * 1024; + } // namespace tunnel + + /// Manager class for incoming and outgoing QUIC tunnels. + class TunnelManager + { + public: + using ListenHandler = std::function( + std::string_view lokinet_addr, // The remote's full lokinet address + uint16_t port // The requested port the tunnel wants to reach + )>; + + // Timeout for the next `open()`. Note that when `open()` is given a ONS name to resolve this + // includes the resolution time. + std::chrono::milliseconds open_timeout = 10s; + + TunnelManager(service::Endpoint& endpoint); + + /// Adds an incoming listener callback. When a new incoming quic connection is initiated to us + /// by some remote we invoke these callback(s) in order of registration. Each one has three + /// options: + /// - return a concrete llarp::SockAddr giving the TCP address/port to which we should connect + /// new incoming streams over the connection. + /// - returns std::nullopt to decline handling the connection (we will try the next listen + /// handler, in order of registration). + /// - throws an exception (derived from std::exception) in which case we refuse the connection + /// without trying any additional handlers. + /// + /// If `listen()` is not called at all then new incoming connections will be immediately + /// dropped. + /// + /// For plain-C wrappers around this see [FIXME]. + int + listen(ListenHandler handler); + + /// Simple wrapper around `listen(...)` that adds a handler that accepts all incoming + /// connections trying to tunnel to port `port` and maps them to `localhost:port`. + int + listen(uint16_t port); + + /// Removes an incoming connection handler; takes the ID returned by `listen()`. + void + forget(int id); + + /// Called when open succeeds or times out. + using OpenCallback = std::function; + + /// Opens a quic tunnel to some remote lokinet address. (Should only be called from the event + /// loop thread.) + /// + /// \param remote_addr is the lokinet address or ONS name (e.g. `azfojblahblahblah.loki` or + /// `blocks.loki`) that the tunnel should connect to. + /// \param port is the tunneled port on the remote that the client wants to reach. (This is + /// *not* the quic pseudo-port, which is always 0). + /// \param callback callback invoked when the quic connection has been established, or has timed + /// out. + /// \param bind_addr is the bind address and port that we should use for the localhost TCP + /// connection. Use port 0 to let the OS choose a random high port. Defaults to `127.0.0.1:0`. + /// + /// This call immediately opens the local TCP socket, and initiates the lokinet connection and + /// QUIC tunnel to the remote. If the connection fails, the TCP socket will be closed. Note, + /// however, that this TCP socket will block until the underlying quic connection is + /// established. + /// + /// Each connection to the local TCP socket establishes a new stream over the QUIC connection. + /// + /// \return a pair: + /// - SockAddr containing the just-opened localhost socket that tunnels to the remote. This is + /// typically the same IP as `bind_addr`, with the port filled in (if bind_addr had a 0 port). + /// Note that, while you can connect to this socket immediately, it will block until the actual + /// connection and streams are established (and will be closed if they fail). + /// - unique integer that can be passed to close() to stop listening for new connections. This + /// also serves as a unique internal "pseudo-port" number to route returned quic packets to the + /// right connection. + /// + /// TODO: add a callback to invoke when QUIC connection succeeds or fails. + /// TODO: add a plain C wrapper around this + std::pair + open( + std::string_view remote_addr, + uint16_t port, + OpenCallback on_open = {}, + SockAddr bind_addr = {127, 0, 0, 1}); + + /// Start closing an outgoing tunnel; takes the ID returned by `open()`. Note that an existing + /// established tunneled connections will not be forcibly closed; this simply stops accepting + /// new tunnel connections. + void + close(int id); + + /// Called from tun code to deliver a quic packet. + /// + /// \param dest - the convotag for which the packet arrived + /// \param buf - the raw arriving packet + /// + void + receive_packet(const service::ConvoTag& tag, const llarp_buffer_t& buf); + + private: + service::Endpoint& service_endpoint_; + + struct ClientTunnel + { + // quic endpoint + std::unique_ptr client; + // Callback to invoke on quic connection established (true argument) or failed (false arg) + OpenCallback open_cb; + // TCP listening socket + std::shared_ptr tcp; + // Accepted TCP connections + std::unordered_set> conns; + // Queue of incoming connections that are waiting for a stream to become available (either + // because we are still handshaking, or we reached the stream limit). + std::queue> pending_incoming; + + ~ClientTunnel(); + }; + + // pseudo-port -> Client instance (the "port" is used to route incoming quic packets to the + // right quic endpoint); pseudo-ports start at 1. + std::map client_tunnels_; + + uint16_t next_pseudo_port_ = 0; + bool pport_wrapped_ = false; + + bool + continue_connecting( + uint16_t pseudo_port, bool step_success, std::string_view step_name, std::string_view addr); + + void + make_client(const SockAddr& remote, std::pair& row); + + void + flush_pending_incoming(ClientTunnel& ct, Connection& conn); + + // Server instance; this listens on pseudo-port 0 (if it listens). This is automatically + // instantiated the first time `listen()` is called; if not instantiated we simply drop any + // inbound client-to-server quic packets. + std::unique_ptr server_; + + void + make_server(); + + // Called when a new during connection handshaking once we have the established transport + // parameters (which include the port) if this is an incoming connection (and this endpoint is a + // server). This checks handlers to see whether the stream is allowed and, if so, returns a + // SockAddr containing the IP/port the tunnel should map to. Returns nullopt if the connection + // should be rejected. + std::optional + allow_connection(std::string_view lokinet_addr, uint16_t port); + + // Incoming stream handlers + std::map incoming_handlers_; + int next_handler_id_ = 1; + + std::shared_ptr + get_loop(); + + // Cleanup member + std::shared_ptr timer_keepalive_ = std::make_shared(0); + }; + +} // namespace llarp::quic diff --git a/llarp/quic/tunnel_client.cpp b/llarp/quic/tunnel_client.cpp deleted file mode 100644 index 4ae894e75..000000000 --- a/llarp/quic/tunnel_client.cpp +++ /dev/null @@ -1,143 +0,0 @@ -#include "connection.hpp" -#include "client.hpp" -#include "stream.hpp" -#include "tunnel.hpp" -#include - -#include - -#include - -#include -#include - -#include - -/* -using namespace std::literals; - -namespace llarp::quic::tunnel -{ - // When we receive a new incoming connection we immediately initiate a new quic stream. This quic - // stream in turn causes the other end to initiate a TCP connection on whatever port we specified - // in the connection; if the connection is established, it sends back a single byte 0x00 - // (CONNECT_INIT); otherwise it shuts down the stream with an error code. - void - on_new_connection(const uvw::ListenEvent&, uvw::TCPHandle& server) - { - LogDebug("New connection!\n"); - auto client = server.loop().resource(); - server.accept(*client); - - auto conn = server.data(); - std::shared_ptr stream; - try - { - LogTrace("open stream"); - stream = conn->open_stream( - [client](llarp::quic::Stream& stream, llarp::quic::bstring_view bdata) { - if (bdata.empty()) - return; - if (auto b0 = bdata[0]; b0 == tunnel::CONNECT_INIT) - { - // Set up callbacks, which replaces both of these initial callbacks - client->read(); - tunnel::install_stream_forwarding(*client, stream); - - if (bdata.size() > 1) - { - bdata.remove_prefix(1); - stream.data_callback(stream, std::move(bdata)); - } - LogTrace("starting client reading"); - } - else - { - LogWarn( - "Remote connection returned invalid initial byte (0x", - oxenmq::to_hex(bdata.begin(), bdata.begin() + 1), - "); dropping connection"); - client->closeReset(); - stream.close(tunnel::ERROR_BAD_INIT); - } - stream.io_ready(); - }, - [client](llarp::quic::Stream&, std::optional error_code) mutable { - if (error_code && *error_code == tunnel::ERROR_CONNECT) - LogDebug("Remote TCP connection failed, closing local connection"); - else - LogWarn( - "Stream connection closed ", - error_code ? "with error " + std::to_string(*error_code) : "gracefully", - "; closing local TCP connection."); - auto peer = client->peer(); - LogDebug("Closing connection to ", peer.ip, ":", peer.port); - if (error_code) - client->closeReset(); - else - client->close(); - }); - stream->io_ready(); - } - catch (const std::exception& e) - { - LogDebug("open stream failed"); - client->closeReset(); - return; - } - - LogTrace("done stream setup"); - conn->io_ready(); - } - - int - usage(std::string_view arg0, std::string_view msg) - { - std::cerr << msg << "\n\n" - << "Usage: " << arg0 - << " [DESTPORT [SERVERPORT [LISTENPORT]]]\n\nDefaults to ports 4444 4242 5555\n"; - return 1; - } - - int - main(int argc, char* argv[]) - { - auto loop = uvw::Loop::create(); - - std::array ports{{4444, 4242, 5555}}; - for (size_t i = 0; i < ports.size(); i++) - { - if (argc < 2 + (int)i) - break; - if (!parse_int(argv[1 + i], ports[i])) - return usage(argv[0], "Invalid port "s + argv[1 + i]); - } - auto& [dest_port, server_port, listen_port] = ports; - std::cout << "Connecting to quic server at localhost:" << server_port - << " to reach tunneled port " << dest_port - << ", listening on localhost:" << listen_port << "\n"; - - signal(SIGPIPE, SIG_IGN); - - LogDebug("Initializing client"); - auto tunnel_client = std::make_shared( - llarp::quic::Address{{127, 0, 0, 1}, server_port}, // server addr - loop, - dest_port // tunnel destination port - ); - tunnel_client->default_stream_buffer_size = 0; // We steal uvw's provided buffers - LogDebug("Initialized client"); - - // Start listening for TCP connections: - auto server = loop->resource(); - server->data(tunnel_client->get_connection()); - server->on(llarp::quic::tunnel::on_new_connection); - - server->bind("127.0.0.1", listen_port); - server->listen(); - - loop->run(); - } - -} // namespace llarp::quic::tunnel -*/ diff --git a/llarp/quic/tunnel_server.cpp b/llarp/quic/tunnel_server.cpp deleted file mode 100644 index f8c03e16f..000000000 --- a/llarp/quic/tunnel_server.cpp +++ /dev/null @@ -1,158 +0,0 @@ -#include "tunnel_server.hpp" -#include "tunnel.hpp" -#include "connection.hpp" -#include "server.hpp" -#include - -#include - -#include - -/* -using namespace std::literals; - -namespace llarp::quic::tunnel -{ - IncomingTunnel::IncomingTunnel(uint16_t localhost_port) - : IncomingTunnel{ - [localhost_port]( - [[maybe_unused]] const auto& remote, uint16_t port, SockAddr& connect_to) { - if (port != localhost_port) - return AcceptResult::DECLINE; - connect_to.setIPv4(127, 0, 0, 1); - connect_to.setPort(port); - return AcceptResult::ACCEPT; - }} - {} - - int - usage(std::string_view arg0, std::string_view msg) - { - std::cerr << msg << "\n\n" - << "Usage: " << arg0 - << " [LISTENPORT [ALLOWED ...]]\n\nDefaults to listening on 4242 and allowing " - "22,80,4444,8080\n"; - return 1; - } - - int - main(int argc, char* argv[]) - { - uint16_t listen_port = 4242; - std::set allowed_ports{{22, 80, 4444, 8080}}; - - if (argc >= 2 && !parse_int(argv[1], listen_port)) - return usage(argv[0], "Invalid port "s + argv[1]); - if (argc >= 3) - { - allowed_ports.clear(); - for (int i = 2; i < argc; i++) - { - if (argv[i] == "all"sv) - { - allowed_ports.clear(); - break; - } - uint16_t port; - if (!parse_int(argv[i], port)) - return usage(argv[0], "Invalid port "s + argv[i]); - allowed_ports.insert(port); - } - } - - auto loop = uvw::Loop::create(); - - Address listen_addr{{0, 0, 0, 0}, listen_port}; - - signal(SIGPIPE, SIG_IGN); - - // The local address we connect to for incoming connections. (localhost for this demo, should - // be the localhost.loki address for lokinet). - std::string localhost = "127.0.0.1"; - - LogInfo("Initializing QUIC server"); - llarp::quic::Server s{ - listen_addr, - loop, - [loop, localhost, allowed_ports]( - llarp::quic::Server&, llarp::quic::Stream& stream, uint16_t port) { - LogDebug("New incoming quic stream ", stream.id(), " to reach ", localhost, ":", port); - if (port == 0 || !(allowed_ports.empty() || allowed_ports.count(port))) - { - LogWarn( - "quic stream denied by configuration: ", port, " is not a permitted local port"); - return false; - } - - stream.close_callback = [](llarp::quic::Stream& strm, - std::optional error_code) { - LogDebug( - error_code ? "Remote side" : "We", - " closed the quic stream, closing localhost tcp connection"); - if (error_code && *error_code > 0) - LogWarn("Remote quic stream was closed with error code ", *error_code); - auto tcp = strm.data(); - if (!tcp) - LogDebug("Local TCP connection already closed"); - else - tcp->close(); - }; - // Try to open a TCP connection to the configured localhost port; if we establish a - // connection then we immediately send a CONNECT_INIT back down the stream; if we fail - // then we send a fail-to-connect error code. Once we successfully connect both of - // these handlers get replaced with the normal tunnel handlers. - auto tcp = loop->resource(); - auto error_handler = tcp->once( - [&stream, localhost, port](const uvw::ErrorEvent&, uvw::TCPHandle&) { - LogWarn( - "Failed to connect to ", localhost, ":", port, ", shutting down quic stream"); - stream.close(tunnel::ERROR_CONNECT); - }); - tcp->once( - [streamw = stream.weak_from_this(), error_handler = std::move(error_handler)]( - const uvw::ConnectEvent&, uvw::TCPHandle& tcp) { - auto peer = tcp.peer(); - auto stream = streamw.lock(); - if (!stream) - { - LogWarn( - "Connected to ", - peer.ip, - ":", - peer.port, - " but quic stream has gone away; resetting local connection"); - tcp.closeReset(); - return; - } - LogDebug("Connected to ", peer.ip, ":", peer.port, " for quic ", stream->id()); - tcp.erase(error_handler); - tunnel::install_stream_forwarding(tcp, *stream); - assert(stream->used() == 0); - - stream->append_buffer(new std::byte[1]{tunnel::CONNECT_INIT}, 1); - tcp.read(); - }); - - // FIXME, need to configure this - tcp->connect("127.0.0.1", port); - - return true; - }}; - s.default_stream_buffer_size = 0; // We steal uvw's provided buffers - LogDebug("Initialized server"); - std::cout << "Listening on localhost:" << listen_port - << " with tunnel(s) to localhost port(s):"; - if (allowed_ports.empty()) - std::cout << " (any)"; - for (auto p : allowed_ports) - std::cout << ' ' << p; - std::cout << '\n'; - - loop->run(); - - return 0; - } - -} // namespace llarp::quic::tunnel - -*/ diff --git a/llarp/quic/tunnel_server.hpp b/llarp/quic/tunnel_server.hpp deleted file mode 100644 index 67dac4146..000000000 --- a/llarp/quic/tunnel_server.hpp +++ /dev/null @@ -1,80 +0,0 @@ -#pragma once - -#include "address.hpp" -#include -#include - -#include - -namespace llarp::quic::tunnel -{ - enum class AcceptResult : int - { - ACCEPT = 0, // Accepts a connection - DECLINE = -1, // Declines a connection (try other callbacks, refuse if all decline) - REFUSE = -2, // Refuses a connection (don't try any more callbacks) - }; - - // Class that wraps an incoming connection acceptance callback (to allow for callback removal). - // This is not directly constructible: you must construct it via the TunnelServer instance. - class IncomingTunnel final - { - public: - using AcceptCallback = std::function; - - private: - AcceptCallback accept; - - friend class TunnelServer; - - // Constructor with a full callback; invoked via TunnelServer::add_incoming_tunnel - explicit IncomingTunnel(AcceptCallback accept) : accept{std::move(accept)} - {} - - // Constructor for a simple forwarding to a single localhost port. E.g. IncomingTunnel(22) - // allows incoming connections to reach port 22 and forwards them to localhost:22. - explicit IncomingTunnel(uint16_t localhost_port); - - // Constructor for forwarding everything to the same port; this is used by full clients by - // default. - IncomingTunnel(); - }; - - // Class that handles incoming quic connections. This class sets itself up in the llarp event - // loop on construction and maintains a list of incoming acceptor callbacks. When a new incoming - // quic connections is being established we try the callbacks one by one to determine the local - // TCP port the tunnel should be connected to until: - // - a callback sets connect_to and returns AcceptResult::ACCEPT - we connect it to the returned - // address - // - a callback returns AcceptResult::REFUSE - we reject the connection - // - // If a callback returns AcceptResult::DECLINE then we skip that callback and try the next one; if - // all callbacks decline (or we have no callbacks at all) then we reject the connection. - // - // Note that tunnel operations and initialization are done in the event loop thread and so will - // not take effect until the next event loop tick when called from some other thread. - class TunnelServer : public std::enable_shared_from_this - { - public: - explicit TunnelServer(EventLoop_ptr ev); - - // Appends a new tunnel to the end of the queue; all arguments are forwarded to private - // constructor(s) of IncomingTunnel. - template - std::shared_ptr - add_incoming_tunnel(Args&&... args) - { - return std::shared_ptr{new IncomingTunnel{std::forward(args)...}}; - } - - // Removes a tunnel acceptor from the acceptor queue. - void - remove_incoming_tunnel(std::weak_ptr tunnel); - - private: - EventLoop_ptr ev; - std::vector> tunnels; - }; - -} // namespace llarp::quic::tunnel diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index fae0b7182..04217cef0 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -30,7 +30,7 @@ #include #include #include -#include +#include #include #include @@ -56,6 +56,9 @@ namespace llarp m_state->m_Router = r; m_state->m_Name = "endpoint"; m_RecvQueue.enable(); + + if (Loop()->MaybeGetUVWLoop()) + m_quic = std::make_unique(*this); } bool @@ -83,64 +86,6 @@ namespace llarp m_StartupLNSMappings[name] = std::make_pair(range, auth); }); - auto loop = Router()->loop()->MaybeGetUVWLoop(); - assert(loop); - auto callback = [this, loop, ports = conf.m_quicServerPorts]( - quic::Server& serv, quic::Stream& stream, uint16_t port) { - if (ports.count(port) == 0) - { - return false; - } - - stream.close_callback = [](quic::Stream& st, - [[maybe_unused]] std::optional errcode) { - auto tcp = st.data(); - if (tcp) - tcp->close(); - }; - - auto localIP = net::TruncateV6(GetIfAddr()); - - std::string localhost = localIP.ToString(); - - auto tcp = loop->resource(); - auto error_handler = tcp->once( - [&stream, localhost, port](const uvw::ErrorEvent&, uvw::TCPHandle&) { - LogWarn("Failed to connect to ", localhost, ":", port, ", shutting down quic stream"); - stream.close(quic::tunnel::ERROR_CONNECT); - }); - tcp->once( - [streamw = stream.weak_from_this(), error_handler = std::move(error_handler)]( - const uvw::ConnectEvent&, uvw::TCPHandle& tcp) { - auto peer = tcp.peer(); - auto stream = streamw.lock(); - if (!stream) - { - LogWarn( - "Connected to ", - peer.ip, - ":", - peer.port, - " but quic stream has gone away; resetting local connection"); - tcp.closeReset(); - return; - } - LogDebug("Connected to ", peer.ip, ":", peer.port, " for quic ", stream->id()); - tcp.erase(error_handler); - quic::tunnel::install_stream_forwarding(tcp, *stream); - assert(stream->used() == 0); - - stream->append_buffer(new std::byte[1]{quic::tunnel::CONNECT_INIT}, 1); - tcp.read(); - }); - - tcp->connect(localhost, port); - - return true; - }; - - m_QuicServer = std::make_shared(this, loop, callback); - return m_state->Configure(conf); } @@ -1729,5 +1674,11 @@ namespace llarp return itr->second; } + quic::TunnelManager* + Endpoint::GetQUICTunnel() + { + return m_quic.get(); + } + } // namespace service } // namespace llarp diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index a461000df..8faa440e2 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -35,6 +35,11 @@ namespace llarp { + namespace quic + { + class TunnelManager; + } + namespace service { struct AsyncKeyExchange; @@ -386,6 +391,11 @@ namespace llarp std::optional MaybeGetAuthInfoForEndpoint(service::Address addr); + /// Returns a pointer to the quic::Tunnel object handling quic connections for this endpoint. + /// Returns nullptr if quic is not supported. + quic::TunnelManager* + GetQUICTunnel(); + protected: /// parent context that owns this endpoint Context* const context; @@ -437,6 +447,7 @@ namespace llarp std::unique_ptr m_state; std::shared_ptr m_AuthPolicy; std::unordered_map m_RemoteAuthInfos; + std::unique_ptr m_quic; /// (lns name, optional exit range, optional auth info) for looking up on startup std::unordered_map, std::optional>> @@ -462,8 +473,6 @@ namespace llarp ConvoMap& Sessions(); // clang-format on thread::Queue m_RecvQueue; - - std::shared_ptr m_QuicServer; }; using Endpoint_ptr = std::shared_ptr;