diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 213b223a6..3ad144f93 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -95,8 +95,6 @@ add_library(lokinet-quic quic/server.cpp quic/stream.cpp quic/tunnel.cpp - quic/tunnel_client.cpp - quic/tunnel_server.cpp ) target_link_libraries(lokinet-quic PRIVATE lokinet-platform ngtcp2) diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 9207b0f8c..e9ad25752 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -622,16 +622,6 @@ namespace llarp m_SRVRecords.push_back(std::move(newSRV)); }); - conf.defineOption( - "network", - "expose", - ClientOnly, - MultiValue, - Comment{ - "expose a local port via quic for liblokinet", - }, - [this](uint16_t port) { m_quicServerPorts.insert(port); }); - // Deprecated options: conf.defineOption("network", "enabled", Deprecated); } diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 4a0ba8169..9d4072c7c 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -121,8 +121,6 @@ namespace llarp std::optional m_baseV6Address; - std::unordered_set m_quicServerPorts; - // TODO: // on-up // on-down diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 59f44fdb5..455f142e7 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -1,5 +1,4 @@ #include -#include #include // harmless on other platforms #define __USE_MINGW_ANSI_STDIO 1 @@ -12,14 +11,17 @@ #include #include +#include #include #include #include #include #include #include +#include #include #include +#include #include #include @@ -89,8 +91,8 @@ namespace llarp : service::Endpoint(r, parent) , m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop()) { - m_PacketRouter.reset( - new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }}); + m_PacketRouter = std::make_unique( + [this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }); #ifdef ANDROID m_Resolver = std::make_shared(r, this); m_PacketRouter->AddUDPHandler(huint16_t{53}, [&](net::IPPacket pkt) { @@ -1007,6 +1009,23 @@ namespace llarp service::ProtocolType t, uint64_t seqno) { + if (t == service::ProtocolType::QUIC) + { + auto* quic = GetQUICTunnel(); + if (!quic) + { + LogWarn("incoming quic packet but this endpoint is not quic capable; dropping"); + return false; + } + if (buf.sz < 4) + { + LogWarn("invalid incoming quic packet, dropping"); + return false; + } + quic->receive_packet(tag, buf); + return true; + } + if (t != service::ProtocolType::TrafficV4 && t != service::ProtocolType::TrafficV6 && t != service::ProtocolType::Exit) return false; diff --git a/llarp/net/sock_addr.cpp b/llarp/net/sock_addr.cpp index cd4f852fe..67adee372 100644 --- a/llarp/net/sock_addr.cpp +++ b/llarp/net/sock_addr.cpp @@ -197,6 +197,12 @@ namespace llarp return &m_addr; } + size_t + SockAddr::sockaddr_len() const + { + return isIPv6() ? sizeof(m_addr) : sizeof(m_addr4); + } + bool SockAddr::operator<(const SockAddr& other) const { diff --git a/llarp/net/sock_addr.hpp b/llarp/net/sock_addr.hpp index a34aaecfc..9078aa015 100644 --- a/llarp/net/sock_addr.hpp +++ b/llarp/net/sock_addr.hpp @@ -68,6 +68,9 @@ namespace llarp operator const sockaddr_in*() const; operator const sockaddr_in6*() const; + size_t + sockaddr_len() const; + bool operator<(const SockAddr& other) const; diff --git a/llarp/quic/address.cpp b/llarp/quic/address.cpp index 0e62b029c..48dd05261 100644 --- a/llarp/quic/address.cpp +++ b/llarp/quic/address.cpp @@ -6,7 +6,7 @@ namespace llarp::quic { using namespace std::literals; - Address::Address(service::ConvoTag tag) : saddr{tag.ToV6()} + Address::Address(const SockAddr& addr) : saddr{*addr.operator const sockaddr_in6*()} {} Address& @@ -17,8 +17,7 @@ namespace llarp::quic return *this; } - service::ConvoTag - Address::Tag() const + Address::operator service::ConvoTag() const { service::ConvoTag tag{}; tag.FromV6(saddr); @@ -30,9 +29,14 @@ namespace llarp::quic { if (a.addrlen != sizeof(sockaddr_in6)) return "(unknown-addr)"; - char buf[INET6_ADDRSTRLEN] = {0}; - inet_ntop(AF_INET6, &saddr.sin6_addr, buf, INET6_ADDRSTRLEN); - return buf; + std::string result; + result.resize(8 + INET6_ADDRSTRLEN); + result[0] = '['; + inet_ntop(AF_INET6, &saddr.sin6_addr, &result[1], INET6_ADDRSTRLEN); + result.resize(result.find(char{0})); + result += "]:"; + result += std::to_string(ToHost(nuint16_t{saddr.sin6_port}).h); + return result; } std::ostream& diff --git a/llarp/quic/address.hpp b/llarp/quic/address.hpp index 56ea0700d..812f37849 100644 --- a/llarp/quic/address.hpp +++ b/llarp/quic/address.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -14,14 +16,8 @@ namespace llarp::quic { - union sockaddr_any - { - sockaddr_storage storage; - sockaddr sa; - sockaddr_in6 in6; - sockaddr_in in; - }; - + // Wrapper around a sockaddr; ngtcp2 requires more intrusive access that llarp::SockAddr is meant + // to deal with, hence this wrapper (rather than trying to abuse llarp::SockAddr). class Address { sockaddr_in6 saddr{}; @@ -29,19 +25,18 @@ namespace llarp::quic public: Address() = default; - Address(service::ConvoTag tag); + Address(const SockAddr& addr); Address(const Address& other) { *this = other; } - service::ConvoTag - Tag() const; - Address& operator=(const Address&); + // Implicit conversion to sockaddr* and ngtcp2_addr& so that an Address can be passed wherever + // one of those is expected. operator sockaddr*() { return reinterpret_cast(&saddr); @@ -68,6 +63,32 @@ namespace llarp::quic return sizeof(sockaddr_in6); } + // Implicit conversion to a convo tag so you can pass an Address to things taking a ConvoTag + operator service::ConvoTag() const; + + // Returns the lokinet pseudo-port for the quic connection (which routes this quic packet to the + // correct waiting quic instance on the remote). + nuint16_t + port() const + { + return nuint16_t{saddr.sin6_port}; + } + + // Sets the address port + void + port(nuint16_t port) + { + saddr.sin6_port = port.n; + } + + // Implicit conversion to SockAddr for going back to general llarp code + // FIXME: see if this is still needed, I think it may have been refactored away with the + // ConvoTag operator + operator SockAddr() const + { + return SockAddr(saddr); + } + std::string to_string() const; }; diff --git a/llarp/quic/client.cpp b/llarp/quic/client.cpp index 0a255dd6c..13dc7a169 100644 --- a/llarp/quic/client.cpp +++ b/llarp/quic/client.cpp @@ -1,62 +1,46 @@ - #include "client.hpp" +#include "tunnel.hpp" #include #include +#include #include #include #include #include +#include + namespace llarp::quic { - Client::Client(service::ConvoTag tag, service::Endpoint* parent, uint16_t tunnel_port) - : Endpoint{parent, parent->Loop()->MaybeGetUVWLoop()} + Client::Client(service::Endpoint& ep, const SockAddr& remote, uint16_t pseudo_port) : Endpoint{ep} { - // Our UDP socket is now set up, so now we initiate contact with the remote QUIC - Address remote{std::move(tag)}; + default_stream_buffer_size = + 0; // We steal uvw's provided buffers so don't need an outgoing data buffer - Path path{local, remote}; - llarp::LogDebug("Connecting to ", remote); + // *Our* port; we stuff this in the llarp quic header so it knows how to target quic packets + // back to *this* client. + local_addr.port(ToNet(huint16_t{pseudo_port})); + uint16_t tunnel_port = remote.getPort(); if (tunnel_port == 0) throw std::logic_error{"Cannot tunnel to port 0"}; // TODO: need timers for: // - // - timeout (to disconnect if idle for too longer) + // - timeout (to disconnect if idle for too long) // // - probably don't need for lokinet tunnel: change local addr -- attempts to re-bind the local // socket // // - key_update_timer - // - // - delay_stream_timer - - auto connptr = std::make_shared(*this, ConnectionID::random(), path, tunnel_port); - auto& conn = *connptr; - conns.emplace(conn.base_cid, connptr); - - /* Debug("set crypto ctx"); - null_crypto.client_initial(conn); - - auto x = ngtcp2_conn_get_max_data_left(conn); - Debug("mdl = ", x); - */ - - conn.io_ready(); + Path path{local_addr, remote}; + llarp::LogDebug("Connecting to ", remote); - /* - Debug("Opening bidi stream"); - int64_t stream_id; - if (auto rv = ngtcp2_conn_open_bidi_stream(conn, &stream_id, nullptr); - rv != 0) { - Debug("Opening bidi stream failed: ", ngtcp2_strerror(rv)); - assert(rv == NGTCP2_ERR_STREAM_ID_BLOCKED); - } - else { Debug("Opening bidi stream good"); } - */ + auto conn = std::make_shared(*this, ConnectionID::random(), path, tunnel_port); + conn->io_ready(); + conns.emplace(conn->base_cid, std::move(conn)); } std::shared_ptr @@ -72,29 +56,13 @@ namespace llarp::quic return std::get(it->second); } - void - Client::handle_packet(const Packet& p) + size_t + Client::write_packet_header(nuint16_t, uint8_t ecn) { - llarp::LogDebug("Handling incoming client packet: ", buffer_printer{p.data}); - auto maybe_dcid = handle_packet_init(p); - if (!maybe_dcid) - return; - auto& dcid = *maybe_dcid; - - llarp::LogDebug("Incoming connection id ", dcid); - auto [connptr, alias] = get_conn(dcid); - if (!connptr) - { - llarp::LogDebug("CID is ", alias ? "expired alias" : "unknown/expired", "; dropping"); - return; - } - auto& conn = *connptr; - if (alias) - llarp::LogDebug("CID is alias for primary CID ", conn.base_cid); - else - llarp::LogDebug("CID is primary CID"); - - handle_conn_packet(conn, p); + buf_[0] = CLIENT_TO_SERVER; + auto pseudo_port = local_addr.port(); + std::memcpy(&buf_[1], &pseudo_port.n, 2); // remote quic pseudo-port (network order u16) + buf_[3] = std::byte{ecn}; + return 4; } - } // namespace llarp::quic diff --git a/llarp/quic/client.hpp b/llarp/quic/client.hpp index 6cca4a55d..ff94e7ed7 100644 --- a/llarp/quic/client.hpp +++ b/llarp/quic/client.hpp @@ -1,18 +1,26 @@ #pragma once #include "endpoint.hpp" +#include "service/endpoint.hpp" #include +namespace uvw +{ + struct ListenEvent; + class TCPHandle; +} // namespace uvw + namespace llarp::quic { class Client : public Endpoint { public: // Constructs a client that establishes an outgoing connection to `remote` to tunnel packets to - // `tunnel_port` on the remote's lokinet address. `local` can be used to optionally bind to a - // local IP and/or port for the connection. - Client(service::ConvoTag remote, service::Endpoint* parent, uint16_t tunnel_port); + // `remote.getPort()` on the remote's lokinet address. `pseudo_port` is *our* unique local + // identifier which we include in outgoing packets (so that the remote server knows where to + // send the back to *this* client). + Client(service::Endpoint& ep, const SockAddr& remote, uint16_t pseudo_port); // Returns a reference to the client's connection to the server. Returns a nullptr if there is // no connection. @@ -20,8 +28,8 @@ namespace llarp::quic get_connection(); private: - void - handle_packet(const Packet& p) override; + size_t + write_packet_header(nuint16_t remote_port, uint8_t ecn) override; }; } // namespace llarp::quic diff --git a/llarp/quic/connection.cpp b/llarp/quic/connection.cpp index d8d40504f..f7e116146 100644 --- a/llarp/quic/connection.cpp +++ b/llarp/quic/connection.cpp @@ -126,7 +126,7 @@ namespace llarp::quic // At this stage of the protocol with TLS the client sends back TLS info so that // the server can install our rx key; we have to send *something* back to invoke // the server's HANDSHAKE callback (so that it knows handshake is complete) so - // sent the magic again. + // send the magic again. if (auto rv = conn.send_magic(NGTCP2_CRYPTO_LEVEL_HANDSHAKE); rv != 0) return rv; } @@ -262,6 +262,18 @@ namespace llarp::quic // FIXME return 0; } + int + extend_max_local_streams_bidi(ngtcp2_conn* conn_, uint64_t max_streams, void* user_data) + { + LogTrace("######################", __func__); + auto& conn = *static_cast(user_data); + if (conn.on_stream_available) + if (uint64_t left = ngtcp2_conn_get_streams_bidi_left(conn); left > 0) + conn.on_stream_available(conn); + + return 0; + } + int rand( uint8_t* dest, @@ -361,10 +373,11 @@ namespace llarp::quic std::tuple Connection::init() { - io_trigger = endpoint.loop->resource(); + auto loop = endpoint.get_loop(); + io_trigger = loop->resource(); io_trigger->on([this](auto&, auto&) { on_io_ready(); }); - retransmit_timer = endpoint.loop->resource(); + retransmit_timer = loop->resource(); retransmit_timer->on([this](auto&, auto&) { LogTrace("Retransmit timer fired!"); if (auto rv = ngtcp2_conn_handle_expiry(*this, get_timestamp()); rv != 0) @@ -389,6 +402,7 @@ namespace llarp::quic cb.acked_stream_data_offset = acked_stream_data_offset; cb.stream_open = stream_open; cb.stream_reset = stream_reset_cb; + cb.extend_max_local_streams_bidi = extend_max_local_streams_bidi; cb.rand = rand; cb.get_new_connection_id = get_new_connection_id; cb.remove_connection_id = remove_connection_id; @@ -770,7 +784,7 @@ namespace llarp::quic stream->stream_id = id; bool good = true; if (serv->stream_open_callback) - good = serv->stream_open_callback(*serv, *stream, tunnel_port); + good = serv->stream_open_callback(*stream, tunnel_port); if (!good) { LogDebug("stream_open_callback returned failure, dropping stream ", id); @@ -897,16 +911,29 @@ namespace llarp::quic return endpoint.add_connection_id(*this, cidlen); } + bool + Connection::get_handshake_completed() + { + return ngtcp2_conn_get_handshake_completed(*this) != 0; + } + + int + Connection::get_streams_available() + { + uint64_t left = ngtcp2_conn_get_streams_bidi_left(*this); + constexpr int max_int = std::numeric_limits::max(); + if (left > static_cast(max_int)) + return max_int; + return static_cast(left); + } + const std::shared_ptr& Connection::open_stream(Stream::data_callback_t data_cb, Stream::close_callback_t close_cb) { std::shared_ptr stream{new Stream{ *this, std::move(data_cb), std::move(close_cb), endpoint.default_stream_buffer_size}}; if (int rv = ngtcp2_conn_open_bidi_stream(*this, &stream->stream_id.id, stream.get()); rv != 0) - { - LogWarn("Creating stream failed: ", ngtcp2_strerror(rv)); throw std::runtime_error{"Stream creation failed: "s + ngtcp2_strerror(rv)}; - } auto& str = streams[stream->stream_id]; str = std::move(stream); @@ -979,6 +1006,12 @@ namespace llarp::quic if (!ngtcp2_conn_is_server(*this)) endpoint.null_crypto.install_tx_key(*this); ngtcp2_conn_handshake_completed(*this); + + if (on_handshake_complete) + { + on_handshake_complete(*this); + on_handshake_complete = nullptr; + } } // ngtcp2 doesn't expose the varint encoding, but it's fairly simple: diff --git a/llarp/quic/connection.hpp b/llarp/quic/connection.hpp index 7f414c8f8..33b840680 100644 --- a/llarp/quic/connection.hpp +++ b/llarp/quic/connection.hpp @@ -184,26 +184,28 @@ namespace llarp::quic // when the connection is initiated. std::map> streams; - /// Constructs and initializes a new connection received by a Server + /// Constructs and initializes a new incoming connection /// - /// \param s - the Server object on which the connection was initiated + /// \param server - the Server object that owns this connection /// \param base_cid - the local "primary" ConnectionID we use for this connection, typically - /// random \param header - packet header that initiated the connection \param path - the network - /// path to reach the remote - Connection(Server& s, const ConnectionID& base_cid, ngtcp2_pkt_hd& header, const Path& path); + /// random + /// \param header - packet header that initiated the connection \param path - the network path + /// to reach the remote + Connection( + Server& server, const ConnectionID& base_cid, ngtcp2_pkt_hd& header, const Path& path); /// Establishes a connection from the local Client to a remote Server - /// \param c - the Client object from which the connection is being made + /// \param client - the Endpoint object that owns this connection /// \param base_cid - the client's source (i.e. local) connection ID, typically random /// \param path - the network path to reach the remote /// \param tunnel_port - the port that this connection should tunnel to on the remote end - Connection(Client& c, const ConnectionID& scid, const Path& path, uint16_t tunnel_port); + Connection(Client& client, const ConnectionID& scid, const Path& path, uint16_t tunnel_port); // Non-movable, non-copyable: Connection(Connection&&) = delete; + Connection(const Connection&) = delete; Connection& operator=(Connection&&) = delete; - Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; @@ -265,6 +267,25 @@ namespace llarp::quic ConnectionID make_alias_id(size_t cidlen = ConnectionID::max_size()); + // A callback to invoke when the connection handshake completes. Will be cleared after being + // called. + std::function on_handshake_complete; + + // Returns true iff this connection has completed a handshake with the remote end. + bool + get_handshake_completed(); + + // Callback that is invoked whenever new streams become available: i.e. after handshaking, or + // after existing streams are closed. Note that this callback is invoked whenever the number of + // available streams increases, even if it was initially non-zero before the increase. To see + // how many streams are currently available call `get_streams_available()` (it will always be at + // least 1 when this callback is invoked). + std::function on_stream_available; + + // Returns the number of available streams that can currently be opened on the connection + int + get_streams_available(); + // Opens a stream over this connection; when the server receives this it attempts to establish a // TCP connection to the tunnel configured in the connection. The data callback is invoked as // data is received on this stream. The close callback is called if the stream is closed diff --git a/llarp/quic/endpoint.cpp b/llarp/quic/endpoint.cpp index 43ea0ad4a..b8be36f70 100644 --- a/llarp/quic/endpoint.cpp +++ b/llarp/quic/endpoint.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -21,13 +22,12 @@ extern "C" namespace llarp::quic { - Endpoint::Endpoint(service::Endpoint* parent_, std::shared_ptr loop_) - : parent{parent_}, loop{std::move(loop_)} + Endpoint::Endpoint(service::Endpoint& ep) : service_endpoint{ep} { randombytes_buf(static_secret.data(), static_secret.size()); // Set up a callback every 250ms to clean up stale sockets, etc. - expiry_timer = loop->resource(); + expiry_timer = get_loop()->resource(); expiry_timer->on([this](const auto&, auto&) { check_timeouts(); }); expiry_timer->start(250ms, 250ms); @@ -40,6 +40,61 @@ namespace llarp::quic expiry_timer->close(); } + std::shared_ptr + Endpoint::get_loop() + { + auto loop = service_endpoint.Loop()->MaybeGetUVWLoop(); + assert(loop); // This object should never have been constructed if we aren't using uvw + return loop; + } + + void + Endpoint::receive_packet(const SockAddr& src, uint8_t ecn, bstring_view data) + { + // ngtcp2 wants a local address but we don't necessarily have something so just set it to + // IPv4 or IPv6 "unspecified" address (0.0.0.0 or ::) + SockAddr local = src.isIPv6() ? SockAddr{in6addr_any} : SockAddr{nuint32_t{INADDR_ANY}}; + + Packet pkt{Path{local, src}, data, ngtcp2_pkt_info{.ecn = ecn}}; + + LogDebug("[", pkt.path, ",ecn=", pkt.info.ecn, "]: received ", data.size(), " bytes"); + + handle_packet(pkt); + + LogDebug("Done handling packet"); + } + + void + Endpoint::handle_packet(const Packet& p) + { + LogDebug("Handling incoming quic packet: ", buffer_printer{p.data}); + auto maybe_dcid = handle_packet_init(p); + if (!maybe_dcid) + return; + auto& dcid = *maybe_dcid; + + // See if we have an existing connection already established for it + LogDebug("Incoming connection id ", dcid); + auto [connptr, alias] = get_conn(dcid); + if (!connptr) + { + if (alias) + { + LogDebug("CID is an expired alias; dropping"); + return; + } + connptr = accept_initial_connection(p); + if (!connptr) + return; + } + if (alias) + llarp::LogDebug("CID is alias for primary CID ", connptr->base_cid); + else + llarp::LogDebug("CID is primary CID"); + + handle_conn_packet(*connptr, p); + } + std::optional Endpoint::handle_packet_init(const Packet& p) { @@ -119,32 +174,20 @@ namespace llarp::quic return {rv}; } - void - Endpoint::update_ecn(uint32_t ecn) - { - assert(ecn <= std::numeric_limits::max()); - if (ecn_curr != ecn) - { - ecn_curr = ecn; - } - } - io_result - Endpoint::send_packet(const Address& to, bstring_view data, uint32_t ecn) + Endpoint::send_packet(const Address& to, bstring_view data, uint8_t ecn) { - update_ecn(ecn); - - parent->SendTo(to.Tag(), data, service::ProtocolType::QUIC); - LogDebug( - "[", - to.to_string(), - ",ecn=0x", - std::hex, - +ecn_curr, - std::dec, - "]: sent ", - data.size(), - " bytes"); + assert(service_endpoint.Loop()->inEventLoop()); + + size_t header_size = write_packet_header(to.port(), ecn); + size_t outgoing_len = header_size + data.size(); + assert(outgoing_len <= buf_.size()); + std::memcpy(&buf_[header_size], data.data(), data.size()); + bstring_view outgoing{buf_.data(), outgoing_len}; + + service_endpoint.SendToOrQueue(to, outgoing, service::ProtocolType::QUIC); + LogDebug("[", to, "]: sent ", outgoing.size(), " bytes"); + LogTrace("Full quic data: ", buffer_printer{outgoing}); return {}; } @@ -207,15 +250,11 @@ namespace llarp::quic conn.conn_buffer.resize(written); conn.closing = true; - // FIXME: ipv6 - assert(path.local.sockaddr_size() == sizeof(sockaddr_in)); - assert(path.remote.sockaddr_size() == sizeof(sockaddr_in)); - conn.path = path; } assert(conn.closing && !conn.conn_buffer.empty()); - if (auto sent = send_packet(conn.path.remote, conn.conn_buffer, 0); !sent) + if (auto sent = send_packet(conn.path.remote, conn.conn_buffer, 0); not sent) { LogWarn( "Failed to send packet: ", diff --git a/llarp/quic/endpoint.hpp b/llarp/quic/endpoint.hpp index 5e82d9782..aad7fcaca 100644 --- a/llarp/quic/endpoint.hpp +++ b/llarp/quic/endpoint.hpp @@ -6,52 +6,61 @@ #include "null_crypto.hpp" #include "packet.hpp" #include "stream.hpp" -#include "uvw/async.h" +#include #include #include #include #include #include - #include -#include -#include +#include #include -#include - namespace llarp::service { struct Endpoint; } // namespace llarp::service -namespace llarp::net -{ - struct IPPacket; -} - namespace llarp::quic { using namespace std::literals; inline constexpr auto IDLE_TIMEOUT = 5min; + inline constexpr std::byte CLIENT_TO_SERVER{1}; + inline constexpr std::byte SERVER_TO_CLIENT{2}; + + /// QUIC Tunnel Endpoint; this is the class that implements either end of a quic tunnel for both + /// servers and clients. class Endpoint { + public: + /// Called from tun code via TunnelManager to deliver an incoming packet to us. + /// + /// \param src - the source address; this may be a tun interface address, or may be a fake IPv6 + /// address based on the convo tag. The port is not used. + /// \param ecn - the packet ecn parameter + void + receive_packet(const SockAddr& src, uint8_t ecn, bstring_view data); + + /// Returns a shared pointer to the uvw loop. + std::shared_ptr + get_loop(); + protected: /// the service endpoint we are owned by - service::Endpoint* const parent; - // The current outgoing IP ecn value for the socket - uint8_t ecn_curr = 0; + service::Endpoint& service_endpoint; - /// local "address" just a blank - Address local{}; + /// local "address" is the IPv6 unspecified address since we don't have (or care about) the + /// actual local address for building quic packets. The port of this address must be set to our + /// local pseudo-port, for clients, and 0 for a server. + Address local_addr{in6addr_any}; std::shared_ptr expiry_timer; - std::shared_ptr loop; + std::vector buf; // Max theoretical size of a UDP packet is 2^16-1 minus IP/UDP header overhead static constexpr size_t max_buf_size = 64 * 1024; // Max size of a UDP packet that we'll send @@ -96,8 +105,7 @@ namespace llarp::quic friend class Connection; - // Wires up an endpoint connection. - Endpoint(service::Endpoint* ep, std::shared_ptr loop); + explicit Endpoint(service::Endpoint& service_endpoint_); virtual ~Endpoint(); @@ -112,8 +120,8 @@ namespace llarp::quic }; // Called to handle an incoming packet - virtual void - handle_packet(const Packet& p) = 0; + void + handle_packet(const Packet& p); // Internal method: handles initial common packet decoding, returns the connection ID or nullopt // if decoding failed. @@ -123,27 +131,47 @@ namespace llarp::quic void handle_conn_packet(Connection& c, const Packet& p); + // Accept a new incoming connection, i.e. pre-handshake. Returns a nullptr if the connection + // can't be created (e.g. because of invalid initial data), or if incoming connections are not + // accepted by this endpoint (i.e. because it is not a Server instance, or because there are no + // registered listen handlers). The base class default returns nullptr. + virtual std::shared_ptr + accept_initial_connection(const Packet&) + { + return nullptr; + } + // Reads a packet and handles various error conditions. Returns an io_result. Note that it is // possible for the conn_it to be erased from `conns` if the error code is anything other than // success (0) or NGTCP2_ERR_RETRY. io_result read_packet(const Packet& p, Connection& conn); - // Sets up the ECN IP field (IP_TOS for IPv4) for the next outgoing packet sent via - // send_packet(). This does the actual syscall (if ECN is different than currently set), and is - // typically called implicitly via send_packet(). - void - update_ecn(uint32_t ecn); + // Writes the lokinet packet header to the beginning of `buf_`; the header is prepend to quic + // packets to identify which quic server the packet should be delivered to and consists of: + // - type [1 byte]: 1 for client->server packets; 2 for server->client packets (other values + // reserved) + // - port [2 bytes, network order]: client pseudoport (i.e. either a source or destination port + // depending on type) + // - ecn value [1 byte]: provided by ngtcp2. (Only the lower 2 bits are actually used). + // + // \param psuedo_port - the remote's pseudo-port (will be 0 if the remote is a server, > 0 for + // a client remote) + // \param ecn - the ecn value from ngtcp2 + // + // Returns the number of bytes written to buf_. + virtual size_t + write_packet_header(nuint16_t pseudo_port, uint8_t ecn) = 0; // Sends a packet to `to` containing `data`. Returns a non-error io_result on success, // an io_result with .error_code set to the errno of the failure on failure. io_result - send_packet(const Address& to, bstring_view data, uint32_t ecn); + send_packet(const Address& to, bstring_view data, uint8_t ecn); // Wrapper around the above that takes a regular std::string_view (i.e. of chars) and recasts // it to an string_view of std::bytes. io_result - send_packet(const Address& to, std::string_view data, uint32_t ecn) + send_packet(const Address& to, std::string_view data, uint8_t ecn) { return send_packet( to, bstring_view{reinterpret_cast(data.data()), data.size()}, ecn); @@ -151,7 +179,7 @@ namespace llarp::quic // Another wrapper taking a vector io_result - send_packet(const Address& to, const std::vector& data, uint32_t ecn) + send_packet(const Address& to, const std::vector& data, uint8_t ecn) { return send_packet(to, bstring_view{data.data(), data.size()}, ecn); } @@ -212,12 +240,12 @@ namespace llarp::quic // Default stream buffer size for streams opened through this endpoint. size_t default_stream_buffer_size = 64 * 1024; - // Gets a reference to the UV event loop - uvw::Loop& - get_loop() - { - return *loop; - } + // Packet buffer we use when constructing custom packets to fire over lokinet + std::array buf_; + + // Non-copyable, non-movable + Endpoint(const Endpoint&) = delete; + Endpoint(Endpoint&&) = delete; }; } // namespace llarp::quic diff --git a/llarp/quic/server.cpp b/llarp/quic/server.cpp index cad32790c..369426699 100644 --- a/llarp/quic/server.cpp +++ b/llarp/quic/server.cpp @@ -12,59 +12,11 @@ namespace llarp::quic { - Server::Server( - service::Endpoint* parent, - std::shared_ptr loop, - stream_open_callback_t stream_open) - : Endpoint{parent, std::move(loop)}, stream_open_callback{std::move(stream_open)} - {} - - void - Server::handle_packet(const Packet& p) - { - LogDebug("Handling incoming server packet: ", buffer_printer{p.data}); - auto maybe_dcid = handle_packet_init(p); - if (!maybe_dcid) - return; - auto& dcid = *maybe_dcid; - - // See if we have an existing connection already established for it - LogDebug("Incoming connection id ", dcid); - primary_conn_ptr connptr; - if (auto conn_it = conns.find(dcid); conn_it != conns.end()) - { - if (auto* wptr = std::get_if(&conn_it->second)) - { - connptr = wptr->lock(); - if (!connptr) - LogDebug("CID is an expired alias"); - else - LogDebug("CID is an alias for primary CID ", connptr->base_cid); - } - else - { - connptr = var::get(conn_it->second); - LogDebug("CID is primary"); - } - } - else - { - connptr = accept_connection(p); - } - - if (!connptr) - { - LogWarn("invalid or expired connection, ignoring"); - return; - } - - handle_conn_packet(*connptr, p); - } - std::shared_ptr - Server::accept_connection(const Packet& p) + Server::accept_initial_connection(const Packet& p) { LogDebug("Accepting new connection"); + // This is a new incoming connection ngtcp2_pkt_hd hd; auto rv = ngtcp2_accept(&hd, u8data(p.data), p.data.size()); @@ -85,10 +37,6 @@ namespace llarp::quic return nullptr; } - /* - ngtcp2_cid ocid; - ngtcp2_cid *pocid = nullptr; - */ if (hd.type == NGTCP2_PKT_0RTT) { LogWarn("Received 0-RTT packet, which shouldn't happen in our implementation; dropping"); @@ -114,4 +62,13 @@ namespace llarp::quic } } + size_t + Server::write_packet_header(nuint16_t pport, uint8_t ecn) + { + buf_[0] = SERVER_TO_CLIENT; + std::memcpy(&buf_[1], &pport.n, 2); // remote quic pseudo-port (network order u16) + buf_[3] = std::byte{ecn}; + return 4; + } + } // namespace llarp::quic diff --git a/llarp/quic/server.hpp b/llarp/quic/server.hpp index 192809420..7e6e1db3b 100644 --- a/llarp/quic/server.hpp +++ b/llarp/quic/server.hpp @@ -9,11 +9,10 @@ namespace llarp::quic class Server : public Endpoint { public: - using stream_open_callback_t = - std::function; + using stream_open_callback_t = std::function; - Server( - service::Endpoint*, std::shared_ptr loop, stream_open_callback_t stream_opened); + Server(service::Endpoint& service_endpoint) : Endpoint{service_endpoint} + {} // Stream callback: takes the server, the (just-created) stream, and the connection port. // Returns true if the stream should be allowed or false to reject the stream. The callback @@ -21,21 +20,14 @@ namespace llarp::quic // (which means incoming data will simply be dropped). stream_open_callback_t stream_open_callback; - int - setup_null_crypto(ngtcp2_conn* conn); - private: - // Handles an incoming packet by figuring out and handling the connection id; if necessary we - // send back a version negotiation or a connection close frame, or drop the packet (if in the - // draining state). If we get through all of the above then it's a packet to read, in which - // case we pass it on to read_packet(). - void - handle_packet(const Packet& p) override; - - // Creates a new connection from an incoming packet. Returns a nullptr if the connection can't - // be created. + // Accept a new incoming connection, i.e. pre-handshake. Returns a nullptr if the connection + // can't be created (e.g. because of invalid initial data), or is invalid. std::shared_ptr - accept_connection(const Packet& p); + accept_initial_connection(const Packet& p) override; + + size_t + write_packet_header(nuint16_t pport, uint8_t ecn) override; }; } // namespace llarp::quic diff --git a/llarp/quic/stream.cpp b/llarp/quic/stream.cpp index 166f38dc4..6d5b8b3f9 100644 --- a/llarp/quic/stream.cpp +++ b/llarp/quic/stream.cpp @@ -60,7 +60,7 @@ namespace llarp::quic , conn{conn} , stream_id{std::move(id)} , buffer{buffer_size} - , avail_trigger{conn.endpoint.get_loop().resource()} + , avail_trigger{conn.endpoint.get_loop()->resource()} { avail_trigger->on([this](auto&, auto&) { handle_unblocked(); }); } diff --git a/llarp/quic/stream.hpp b/llarp/quic/stream.hpp index 60ba4b377..65c3df66d 100644 --- a/llarp/quic/stream.hpp +++ b/llarp/quic/stream.hpp @@ -258,6 +258,13 @@ namespace llarp::quic : std::get>(user_data).lock()); } + // Returns a reference to the connection that owns this stream + Connection& + get_connection() + { + return conn; + } + private: friend class Connection; diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index d41bb21c1..cff5badb1 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -1,111 +1,634 @@ #include "tunnel.hpp" +#include "service/convotag.hpp" +#include "service/endpoint.hpp" +#include "service/name.hpp" #include "stream.hpp" +#include #include #include +#include +#include +#include +#include +#include -namespace llarp::quic::tunnel +namespace llarp::quic { - // Takes data from the tcp connection and pushes it down the quic tunnel - void - on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client) - { - auto stream = client.data(); - assert(stream); - std::string_view data{event.data.get(), event.length}; - auto peer = client.peer(); - LogDebug(peer.ip, ":", peer.port, " → lokinet ", buffer_printer{data}); - // Steal the buffer from the DataEvent's unique_ptr: - stream->append_buffer(reinterpret_cast(event.data.release()), event.length); - if (stream->used() >= PAUSE_SIZE) - { - LogDebug( - "quic tunnel is congested (have ", - stream->used(), - " bytes in flight); pausing local tcp connection reads"); - client.stop(); - stream->when_available([](llarp::quic::Stream& s) { - auto client = s.data(); - if (s.used() < PAUSE_SIZE) + namespace + { + // Takes data from the tcp connection and pushes it down the quic tunnel + void + on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client) + { + auto stream = client.data(); + assert(stream); + std::string_view data{event.data.get(), event.length}; + auto peer = client.peer(); + LogDebug(peer.ip, ":", peer.port, " → lokinet ", buffer_printer{data}); + // Steal the buffer from the DataEvent's unique_ptr: + stream->append_buffer(reinterpret_cast(event.data.release()), event.length); + if (stream->used() >= tunnel::PAUSE_SIZE) + { + LogDebug( + "quic tunnel is congested (have ", + stream->used(), + " bytes in flight); pausing local tcp connection reads"); + client.stop(); + stream->when_available([](Stream& s) { + auto client = s.data(); + if (s.used() < tunnel::PAUSE_SIZE) + { + LogDebug("quic tunnel is no longer congested; resuming tcp connection reading"); + client->read(); + return true; + } + return false; + }); + } + else + { + LogDebug("Queued ", event.length, " bytes"); + } + } + + // Received data from the quic tunnel and sends it to the TCP connection + void + on_incoming_data(Stream& stream, bstring_view bdata) + { + auto tcp = stream.data(); + assert(tcp); + std::string_view data{reinterpret_cast(bdata.data()), bdata.size()}; + auto peer = tcp->peer(); + LogTrace(peer.ip, ":", peer.port, " ← lokinet ", buffer_printer{data}); + + if (data.empty()) + return; + + // Try first to write immediately from the existing buffer to avoid needing an + // allocation and copy: + auto written = tcp->tryWrite(const_cast(data.data()), data.size()); + if (written < (int)data.size()) + { + data.remove_prefix(written); + + auto wdata = std::make_unique(data.size()); + std::copy(data.begin(), data.end(), wdata.get()); + tcp->write(std::move(wdata), data.size()); + } + } + + // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions + // on the given quic stream. + void + install_stream_forwarding(uvw::TCPHandle& tcp, Stream& stream) + { + tcp.data(stream.shared_from_this()); + stream.weak_data(tcp.weak_from_this()); + + tcp.clear(); // Clear any existing initial event handlers + + tcp.on([](auto&, uvw::TCPHandle& c) { + // This fires sometime after we call `close()` to signal that the close is done. + LogError("Connection closed to ", c.peer().ip, ":", c.peer().port, "; closing quic stream"); + c.data()->close(); + c.data(nullptr); + }); + tcp.on([](auto&, uvw::TCPHandle& c) { + // This fires on eof, most likely because the other side of the TCP connection closed it. + LogError("EOF on connection to ", c.peer().ip, ":", c.peer().port); + c.close(); + }); + tcp.on([](const uvw::ErrorEvent& e, uvw::TCPHandle& tcp) { + LogError( + "ErrorEvent[", + e.name(), + ": ", + e.what(), + "] on connection with ", + tcp.peer().ip, + ":", + tcp.peer().port, + ", shutting down quic stream"); + // Failed to open connection, so close the quic stream + auto stream = tcp.data(); + if (stream) + stream->close(tunnel::ERROR_TCP); + tcp.closeReset(); + }); + tcp.on(on_outgoing_data); + stream.data_callback = on_incoming_data; + } + // This initial data handler is responsible for pulling off the initial stream data that comes + // back, confirming that the tunnel is opened on the other end. Currently this is a null byte + // (CONNECT_INIT) but in the future we might encode additional data here (and, if that happens, + // we want this older implementation to fail). + // + // If the initial byte checks out we replace this handler with the regular stream handler (and + // forward the rest of the data to it if we got more than just the single byte). + void + initial_client_data_handler(uvw::TCPHandle& client, Stream& stream, bstring_view bdata) + { + if (bdata.empty()) + return; + client.clear(); // Clear these initial event handlers: we either set up the proper ones, or + // close + + if (auto b0 = bdata[0]; b0 == tunnel::CONNECT_INIT) + { + // Set up callbacks, which replaces both of these initial callbacks + client.read(); + install_stream_forwarding(client, stream); + + if (bdata.size() > 1) { - LogDebug("quic tunnel is no longer congested; resuming tcp connection reading"); - client->read(); - return true; + bdata.remove_prefix(1); + stream.data_callback(stream, std::move(bdata)); } + LogTrace("starting client reading"); + } + else + { + LogWarn( + "Remote connection returned invalid initial byte (0x", + oxenmq::to_hex(bdata.begin(), bdata.begin() + 1), + "); dropping connection"); + stream.close(tunnel::ERROR_BAD_INIT); + client.closeReset(); + } + stream.io_ready(); + } + + // Initial close handler that gets replaced as soon as we receive a valid byte (in the above + // handler). If this gets called then it means the quic remote quic end closed before we + // established the end-to-end tunnel (for example because the remote's tunnel connection + // failed): + void + initial_client_close_handler( + uvw::TCPHandle& client, Stream& /*stream*/, std::optional error_code) + { + if (error_code && *error_code == tunnel::ERROR_CONNECT) + LogDebug("Remote TCP connection failed, closing local connection"); + else + LogWarn( + "Stream connection closed ", + error_code ? "with error " + std::to_string(*error_code) : "gracefully", + "; closing local TCP connection."); + auto peer = client.peer(); + LogDebug("Closing connection to ", peer.ip, ":", peer.port); + client.clear(); + if (error_code) + client.closeReset(); + else + client.close(); + } + + } // namespace + + TunnelManager::TunnelManager(service::Endpoint& se) : service_endpoint_{se} + { + // Cleanup callback to clear out closed tunnel connections + service_endpoint_.Loop()->call_every(500ms, timer_keepalive_, [this] { + LogTrace("Checking quic tunnels for finished connections"); + for (auto ctit = client_tunnels_.begin(); ctit != client_tunnels_.end();) + { + // Clear any accepted connections that have been closed: + auto& [port, ct] = *ctit; + for (auto it = ct.conns.begin(); it != ct.conns.end();) + { + // TCP connections keep a shared_ptr to their quic::Stream while open and clear it when + // closed. (We don't want to use `.active()` here because we do deliberately temporarily + // stop the TCP connection when the quic side gets congested. + if (not *it or not(*it)->data()) + { + LogDebug("Cleanup up closed outgoing tunnel on quic:", port); + it = ct.conns.erase(it); + } + else + ++it; + } + + // If there are not accepted connections left *and* we stopped listening for new ones then + // destroy the whole thing. + if (ct.conns.empty() and (not ct.tcp or not ct.tcp->active())) + { + LogDebug("All sockets closed on quic:", port, ", destroying tunnel data"); + ctit = client_tunnels_.erase(ctit); + } + else + ++ctit; + } + LogTrace("Done quic tunnel cleanup check"); + }); + } + + void + TunnelManager::make_server() + { + // auto loop = get_loop(); + + server_ = std::make_unique(service_endpoint_); + server_->stream_open_callback = [this](Stream& stream, uint16_t port) -> bool { + stream.close_callback = [](quic::Stream& st, + [[maybe_unused]] std::optional errcode) { + auto tcp = st.data(); + if (tcp) + tcp->close(); + }; + + auto& conn = stream.get_connection(); + auto remote = service_endpoint_.GetEndpointWithConvoTag(conn.path.remote); + if (!remote) + { + LogWarn("Received new stream open from invalid/unknown convo tag, dropping stream"); return false; - }); + } + + auto lokinet_addr = var::visit([](auto&& remote) { return remote.ToString(); }, *remote); + auto tunnel_to = allow_connection(lokinet_addr, port); + if (not tunnel_to) + return false; + LogDebug("quic stream from ", lokinet_addr, " to ", port, " tunnelling to ", *tunnel_to); + + auto tcp = get_loop()->resource(); + auto error_handler = tcp->once( + [&stream, to = *tunnel_to](const uvw::ErrorEvent&, uvw::TCPHandle&) { + LogWarn("Failed to connect to ", to, ", shutting down quic stream"); + stream.close(tunnel::ERROR_CONNECT); + }); + + // As soon as we connect to the local tcp tunnel port we fire a CONNECT_INIT down the stream + // tunnel to let the other end know the connection was successful, then set up regular + // stream handling to handle any other to/from data. + tcp->once( + [streamw = stream.weak_from_this(), error_handler = std::move(error_handler)]( + const uvw::ConnectEvent&, uvw::TCPHandle& tcp) { + auto peer = tcp.peer(); + auto stream = streamw.lock(); + if (!stream) + { + LogWarn( + "Connected to TCP ", + peer.ip, + ":", + peer.port, + " but quic stream has gone away; close/resetting local TCP connection"); + tcp.closeReset(); + return; + } + LogDebug("Connected to ", peer.ip, ":", peer.port, " for quic ", stream->id()); + // Set up the data stream forwarding (which also clears these initial handlers). + install_stream_forwarding(tcp, *stream); + assert(stream->used() == 0); + + // Send the magic byte, and start reading from the tcp tunnel in the logic thread + stream->append_buffer(new std::byte[1]{tunnel::CONNECT_INIT}, 1); + tcp.read(); + }); + + tcp->connect(*tunnel_to->operator const sockaddr*()); + + return true; + }; + } + + int + TunnelManager::listen(ListenHandler handler) + { + if (!handler) + throw std::logic_error{"Cannot call listen() with a null handler"}; + assert(service_endpoint_.Loop()->inEventLoop()); + if (not server_) + make_server(); + + int id = next_handler_id_++; + incoming_handlers_.emplace_hint(incoming_handlers_.end(), id, std::move(handler)); + return id; + } + + int + TunnelManager::listen(uint16_t port) + { + return listen([port](std::string_view, uint16_t p) -> std::optional { + if (p == port) + return SockAddr{127, 0, 0, 1, huint16_t{port}}; + return std::nullopt; + }); + } + + void + TunnelManager::forget(int id) + { + incoming_handlers_.erase(id); + } + + std::optional + TunnelManager::allow_connection(std::string_view lokinet_addr, uint16_t port) + { + for (auto& [id, handler] : incoming_handlers_) + { + try + { + if (auto addr = handler(lokinet_addr, port)) + return addr; + } + catch (const std::exception& e) + { + LogWarn( + "Incoming quic connection from ", + lokinet_addr, + " to ", + port, + " denied via exception (", + e.what(), + ")"); + return std::nullopt; + } + } + LogWarn( + "Incoming quic connection from ", lokinet_addr, " to ", port, " declined by all handlers"); + return std::nullopt; + } + + std::shared_ptr + TunnelManager::get_loop() + { + if (auto loop = service_endpoint_.Loop()->MaybeGetUVWLoop()) + return loop; + throw std::logic_error{"TunnelManager requires a libuv-based event loop"}; + } + + // Finds the first unused key in `map`, starting at `start` and wrapping back to 0 if we hit the + // end. Requires an unsigned int type for the key. Requires nullopt if the map is completely + // full, otherwise returns the free key. + template < + typename K, + typename V, + typename = std::enable_if_t && std::is_unsigned_v>> + static std::optional + find_unused_key(std::map& map, K start) + { + if (map.size() == std::numeric_limits::max()) + return std::nullopt; // The map is completely full + [[maybe_unused]] bool from_zero = (start == K{0}); + + // Start at the first key >= start, then walk 1-by-1 (incrementing start) until we find a + // strictly > key, which means we've found a hole we can use + auto it = map.lower_bound(start); + if (it == map.end()) + return start; + + for (; it != map.end(); ++it, ++start) + if (it->first != start) + return start; + if (start != 0) // `start` didn't wrap which means we found an empty slot + return start; + assert(!from_zero); // There *must* be a free slot somewhere in [0, max] (otherwise the map + // would be completely full and we'd have returned nullopt). + return find_unused_key(map, K{0}); + } + + // Wrap common tasks and cleanup that we need to do from multiple places while establishing a + // tunnel + bool + TunnelManager::continue_connecting( + uint16_t pseudo_port, bool step_success, std::string_view step_name, std::string_view addr) + { + assert(service_endpoint_.Loop()->inEventLoop()); + auto it = client_tunnels_.find(pseudo_port); + if (it == client_tunnels_.end()) + { + LogDebug("QUIC tunnel to ", addr, " closed before ", step_name, " finished"); + return false; + } + if (!step_success) + { + LogWarn("QUIC tunnel to ", addr, " failed during ", step_name, "; aborting tunnel"); + it->second.tcp->closeReset(); + if (it->second.open_cb) + it->second.open_cb(false); + client_tunnels_.erase(it); + } + return step_success; + } + + std::pair + TunnelManager::open( + std::string_view remote_address, uint16_t port, OpenCallback on_open, SockAddr bind_addr) + { + std::string remote_addr = lowercase_ascii_string(std::string{remote_address}); + + std::pair result; + auto& [saddr, pport] = result; + + auto maybe_remote = service::ParseAddress(remote_addr); + if (!maybe_remote) + { + if (not service::NameIsValid(remote_addr)) + throw std::invalid_argument{"Invalid remote lokinet name/address"}; + // Otherwise it's a valid ONS name, so we'll initiate an ONS lookup below + } + + // Open the TCP tunnel right away; it will just block new incoming connections until the quic + // connection is established, but this still allows the caller to connect right away and queue + // an initial request (rather than having to wait via a callback before connecting). It also + // makes sure we can actually listen on the given address before we go ahead with establishing + // the quic connection. + auto tcp_tunnel = get_loop()->resource(); + const char* failed = nullptr; + auto err_handler = + tcp_tunnel->once([&failed](auto& evt, auto&) { failed = evt.what(); }); + tcp_tunnel->bind(*bind_addr.operator const sockaddr*()); + tcp_tunnel->listen(); + tcp_tunnel->erase(err_handler); + + if (failed) + { + tcp_tunnel->closeReset(); + throw std::runtime_error{ + "Failed to bind/listen local TCP tunnel socket on " + bind_addr.toString() + ": " + + failed}; } + + auto bound = tcp_tunnel->sock(); + saddr = SockAddr{bound.ip, static_cast(bound.port)}; + + // Find the first unused psuedo-port value starting from next_pseudo_port_. + if (auto p = find_unused_key(client_tunnels_, next_pseudo_port_)) + pport = *p; else + throw std::runtime_error{ + "Unable to open an outgoing quic connection: too many existing connections"}; + (next_pseudo_port_ = pport)++; + + // We are emplacing into client_tunnels_ here: beyond this point we must not throw until we + // return (or if we do, make sure we remove this row from client_tunnels_ first). + assert(client_tunnels_.count(pport) == 0); + auto& ct = client_tunnels_[pport]; + ct.open_cb = std::move(on_open); + ct.tcp = std::move(tcp_tunnel); + + auto after_path = [this, port, pport = pport, remote_addr](auto maybe_convo) { + if (not continue_connecting(pport, (bool)maybe_convo, "path build", remote_addr)) + return; + SockAddr dest{maybe_convo->ToV6()}; + dest.setPort(port); + make_client(dest, *client_tunnels_.find(pport)); + }; + + if (!maybe_remote) { - LogDebug("Queued ", event.length, " bytes"); + // We were given an ONS address, so it's a two-step process: first we resolve the ONS name, + // then we have to build a path to that address. + service_endpoint_.LookupNameAsync( + remote_addr, + [this, + after_path = std::move(after_path), + pport = pport, + remote_addr = std::move(remote_addr)](auto maybe_remote) { + if (not continue_connecting( + pport, (bool)maybe_remote, "endpoint ONS lookup", remote_addr)) + return; + service_endpoint_.EnsurePathTo(*maybe_remote, after_path, open_timeout); + }); + return result; } + + auto& remote = *maybe_remote; + + // See if we have an existing convo tag we can use to start things immediately + if (auto maybe_convo = service_endpoint_.GetBestConvoTagFor(remote)) + after_path(maybe_convo); + else + service_endpoint_.EnsurePathTo(remote, after_path, open_timeout); + + return result; } - // Received data from the quic tunnel and sends it to the TCP connection void - on_incoming_data(llarp::quic::Stream& stream, llarp::quic::bstring_view bdata) + TunnelManager::close(int id) { - auto tcp = stream.data(); - assert(tcp); - std::string_view data{reinterpret_cast(bdata.data()), bdata.size()}; - auto peer = tcp->peer(); - LogTrace(peer.ip, ":", peer.port, " ← lokinet ", buffer_printer{data}); - - if (data.empty()) - return; + if (auto it = client_tunnels_.find(id); it != client_tunnels_.end()) + { + it->second.tcp->close(); + it->second.tcp.reset(); + } + } - // Try first to write immediately from the existing buffer to avoid needing an - // allocation and copy: - auto written = tcp->tryWrite(const_cast(data.data()), data.size()); - if (written < (int)data.size()) + TunnelManager::ClientTunnel::~ClientTunnel() + { + if (tcp) { - data.remove_prefix(written); + tcp->close(); + tcp.reset(); + } + for (auto& conn : conns) + conn->closeReset(); + conns.clear(); - auto wdata = std::make_unique(data.size()); - std::copy(data.begin(), data.end(), wdata.get()); - tcp->write(std::move(wdata), data.size()); + while (not pending_incoming.empty()) + { + if (auto tcp = pending_incoming.front().lock()) + { + tcp->clear(); + tcp->close(); + } + pending_incoming.pop(); } } void - install_stream_forwarding(uvw::TCPHandle& tcp, llarp::quic::Stream& stream) - { - tcp.data(stream.shared_from_this()); - stream.weak_data(tcp.weak_from_this()); - - tcp.on([](auto&, uvw::TCPHandle& c) { - // This fires sometime after we call `close()` to signal that the close is done. - LogError( - "Connection with ", - c.peer().ip, - ":", - c.peer().port, - " closed directly, closing quic stream"); - c.data()->close(); - }); - tcp.on([](auto&, uvw::TCPHandle& c) { - // This fires on eof, most likely because the other side of the TCP connection closed it. - LogError("EOF on connection with ", c.peer().ip, ":", c.peer().port, ", closing quic stream"); - c.data()->close(); - }); - tcp.on([](const uvw::ErrorEvent& e, uvw::TCPHandle& tcp) { - LogError( - "ErrorEvent[", - e.name(), - ": ", - e.what(), - "] on connection with ", - tcp.peer().ip, - ":", - tcp.peer().port, - ", shutting down quic stream"); - // Failed to open connection, so close the quic stream - auto stream = tcp.data(); - if (stream) - stream->close(ERROR_TCP); - tcp.close(); - }); - tcp.on(tunnel::on_outgoing_data); - stream.data_callback = on_incoming_data; + TunnelManager::make_client(const SockAddr& remote, std::pair& row) + { + assert(remote.getPort() > 0); + auto& [pport, tunnel] = row; + assert(not tunnel.client); + tunnel.client = std::make_unique(service_endpoint_, remote, pport); + auto conn = tunnel.client->get_connection(); + + conn->on_stream_available = [this, id = row.first](Connection& conn) { + if (auto it = client_tunnels_.find(id); it != client_tunnels_.end()) + flush_pending_incoming(it->second, conn); + }; + } + + void + TunnelManager::flush_pending_incoming(ClientTunnel& ct, Connection& conn) + { + int available = conn.get_streams_available(); + while (available > 0 and not ct.pending_incoming.empty()) + { + auto client = ct.pending_incoming.front().lock(); + ct.pending_incoming.pop(); + if (not client) + continue; + + try + { + conn.open_stream( + [client](auto&&... args) { + initial_client_data_handler(*client, std::forward(args)...); + }, + [client](auto&&... args) { + initial_client_close_handler(*client, std::forward(args)...); + }); + available--; + } + catch (const std::exception& e) + { + LogWarn("Opening quic stream failed: ", e.what()); + client->closeReset(); + } + + LogTrace("Set up new stream"); + conn.io_ready(); + } } -} // namespace llarp::quic::tunnel + void + TunnelManager::receive_packet(const service::ConvoTag& tag, const llarp_buffer_t& buf) + { + if (buf.sz <= 4) + { + LogWarn("invalid quic packet: packet size (", buf.sz, ") too small"); + return; + } + auto type = static_cast(buf.base[0]); + nuint16_t pseudo_port_n; + std::memcpy(&pseudo_port_n.n, &buf.base[1], 2); + uint16_t pseudo_port = ToHost(pseudo_port_n).h; + auto ecn = static_cast(buf.base[3]); + bstring_view data{reinterpret_cast(&buf.base[4]), buf.sz - 4}; + + SockAddr remote{tag.ToV6()}; + quic::Endpoint* ep = nullptr; + if (type == CLIENT_TO_SERVER) + { + // Client-to-server: the header port is the return port + remote.setPort(pseudo_port); + if (!server_) + { + LogWarn("Dropping incoming quic packet to server: no listeners"); + return; + } + ep = server_.get(); + } + else if (type == SERVER_TO_CLIENT) + { + // Server-to-client: the header port tells us which client tunnel this is going to + if (auto it = client_tunnels_.find(pseudo_port); it != client_tunnels_.end()) + ep = it->second.client.get(); + + if (not ep) + { + LogWarn("Incoming quic packet to invalid/closed client; dropping"); + return; + } + } + else + { + LogWarn("Invalid incoming quic packet type ", type, "; dropping packet"); + return; + } + ep->receive_packet(remote, ecn, data); + } +} // namespace llarp::quic diff --git a/llarp/quic/tunnel.hpp b/llarp/quic/tunnel.hpp index 45e2bb397..7bcaccc2c 100644 --- a/llarp/quic/tunnel.hpp +++ b/llarp/quic/tunnel.hpp @@ -1,6 +1,10 @@ #pragma once +#include #include "stream.hpp" +#include "address.hpp" +#include "client.hpp" +#include "server.hpp" #include #include @@ -9,45 +13,186 @@ #include -namespace llarp::quic::tunnel +namespace llarp::quic { - // The server sends back a 0x00 to signal that the remote TCP connection was established and that - // it is now accepting stream data; the client is not allowed to send any other data down the - // stream until this comes back (any data sent down the stream before then is discarded.) - inline constexpr std::byte CONNECT_INIT{0x00}; - // QUIC application error codes we sent on failures: - // Failure to establish an initial connection: - inline constexpr uint64_t ERROR_CONNECT{0x5471907}; - // Error if we receive something other than CONNECT_INIT as the initial stream data from the - // server - inline constexpr uint64_t ERROR_BAD_INIT{0x5471908}; - // Close error code sent if we get an error on the TCP socket (other than an initial connect - // failure) - inline constexpr uint64_t ERROR_TCP{0x5471909}; - - // We pause reading from the local TCP socket if we have more than this amount of outstanding - // unacked data in the quic tunnel, then resume once it drops below this. - inline constexpr size_t PAUSE_SIZE = 64 * 1024; - - // Callbacks for network events. The uvw::TCPHandle client must contain a shared pointer to the - // associated llarp::quic::Stream in its data, and the llarp::quic::Stream must contain a weak - // pointer to the uvw::TCPHandle. - - // Callback when we receive data to go out over lokinet, i.e. read from the local TCP socket - void - on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client); - - // Callback when we receive data from lokinet to write to the local TCP socket - void - on_incoming_data(llarp::quic::Stream& stream, llarp::quic::bstring_view bdata); - - // Callback to handle and discard the first incoming 0x00 byte that initiates the stream - void - on_init_incoming_data(llarp::quic::Stream& stream, llarp::quic::bstring_view bdata); - - // Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions on - // the given quic stream. - void - install_stream_forwarding(uvw::TCPHandle& tcp, llarp::quic::Stream& stream); - -} // namespace llarp::quic::tunnel + namespace tunnel + { + // The server sends back a 0x00 to signal that the remote TCP connection was established and + // that it is now accepting stream data; the client is not allowed to send any other data down + // the stream until this comes back (any data sent down the stream before then is discarded.) + inline constexpr std::byte CONNECT_INIT{0x00}; + // QUIC application error codes we sent on failures: + // Failure to establish an initial connection: + inline constexpr uint64_t ERROR_CONNECT{0x5471907}; + // Error if we receive something other than CONNECT_INIT as the initial stream data from the + // server + inline constexpr uint64_t ERROR_BAD_INIT{0x5471908}; + // Close error code sent if we get an error on the TCP socket (other than an initial connect + // failure) + inline constexpr uint64_t ERROR_TCP{0x5471909}; + + // We pause reading from the local TCP socket if we have more than this amount of outstanding + // unacked data in the quic tunnel, then resume once it drops below this. + inline constexpr size_t PAUSE_SIZE = 64 * 1024; + } // namespace tunnel + + /// Manager class for incoming and outgoing QUIC tunnels. + class TunnelManager + { + public: + using ListenHandler = std::function( + std::string_view lokinet_addr, // The remote's full lokinet address + uint16_t port // The requested port the tunnel wants to reach + )>; + + // Timeout for the next `open()`. Note that when `open()` is given a ONS name to resolve this + // includes the resolution time. + std::chrono::milliseconds open_timeout = 10s; + + TunnelManager(service::Endpoint& endpoint); + + /// Adds an incoming listener callback. When a new incoming quic connection is initiated to us + /// by some remote we invoke these callback(s) in order of registration. Each one has three + /// options: + /// - return a concrete llarp::SockAddr giving the TCP address/port to which we should connect + /// new incoming streams over the connection. + /// - returns std::nullopt to decline handling the connection (we will try the next listen + /// handler, in order of registration). + /// - throws an exception (derived from std::exception) in which case we refuse the connection + /// without trying any additional handlers. + /// + /// If `listen()` is not called at all then new incoming connections will be immediately + /// dropped. + /// + /// For plain-C wrappers around this see [FIXME]. + int + listen(ListenHandler handler); + + /// Simple wrapper around `listen(...)` that adds a handler that accepts all incoming + /// connections trying to tunnel to port `port` and maps them to `localhost:port`. + int + listen(uint16_t port); + + /// Removes an incoming connection handler; takes the ID returned by `listen()`. + void + forget(int id); + + /// Called when open succeeds or times out. + using OpenCallback = std::function; + + /// Opens a quic tunnel to some remote lokinet address. (Should only be called from the event + /// loop thread.) + /// + /// \param remote_addr is the lokinet address or ONS name (e.g. `azfojblahblahblah.loki` or + /// `blocks.loki`) that the tunnel should connect to. + /// \param port is the tunneled port on the remote that the client wants to reach. (This is + /// *not* the quic pseudo-port, which is always 0). + /// \param callback callback invoked when the quic connection has been established, or has timed + /// out. + /// \param bind_addr is the bind address and port that we should use for the localhost TCP + /// connection. Use port 0 to let the OS choose a random high port. Defaults to `127.0.0.1:0`. + /// + /// This call immediately opens the local TCP socket, and initiates the lokinet connection and + /// QUIC tunnel to the remote. If the connection fails, the TCP socket will be closed. Note, + /// however, that this TCP socket will block until the underlying quic connection is + /// established. + /// + /// Each connection to the local TCP socket establishes a new stream over the QUIC connection. + /// + /// \return a pair: + /// - SockAddr containing the just-opened localhost socket that tunnels to the remote. This is + /// typically the same IP as `bind_addr`, with the port filled in (if bind_addr had a 0 port). + /// Note that, while you can connect to this socket immediately, it will block until the actual + /// connection and streams are established (and will be closed if they fail). + /// - unique integer that can be passed to close() to stop listening for new connections. This + /// also serves as a unique internal "pseudo-port" number to route returned quic packets to the + /// right connection. + /// + /// TODO: add a callback to invoke when QUIC connection succeeds or fails. + /// TODO: add a plain C wrapper around this + std::pair + open( + std::string_view remote_addr, + uint16_t port, + OpenCallback on_open = {}, + SockAddr bind_addr = {127, 0, 0, 1}); + + /// Start closing an outgoing tunnel; takes the ID returned by `open()`. Note that an existing + /// established tunneled connections will not be forcibly closed; this simply stops accepting + /// new tunnel connections. + void + close(int id); + + /// Called from tun code to deliver a quic packet. + /// + /// \param dest - the convotag for which the packet arrived + /// \param buf - the raw arriving packet + /// + void + receive_packet(const service::ConvoTag& tag, const llarp_buffer_t& buf); + + private: + service::Endpoint& service_endpoint_; + + struct ClientTunnel + { + // quic endpoint + std::unique_ptr client; + // Callback to invoke on quic connection established (true argument) or failed (false arg) + OpenCallback open_cb; + // TCP listening socket + std::shared_ptr tcp; + // Accepted TCP connections + std::unordered_set> conns; + // Queue of incoming connections that are waiting for a stream to become available (either + // because we are still handshaking, or we reached the stream limit). + std::queue> pending_incoming; + + ~ClientTunnel(); + }; + + // pseudo-port -> Client instance (the "port" is used to route incoming quic packets to the + // right quic endpoint); pseudo-ports start at 1. + std::map client_tunnels_; + + uint16_t next_pseudo_port_ = 0; + bool pport_wrapped_ = false; + + bool + continue_connecting( + uint16_t pseudo_port, bool step_success, std::string_view step_name, std::string_view addr); + + void + make_client(const SockAddr& remote, std::pair& row); + + void + flush_pending_incoming(ClientTunnel& ct, Connection& conn); + + // Server instance; this listens on pseudo-port 0 (if it listens). This is automatically + // instantiated the first time `listen()` is called; if not instantiated we simply drop any + // inbound client-to-server quic packets. + std::unique_ptr server_; + + void + make_server(); + + // Called when a new during connection handshaking once we have the established transport + // parameters (which include the port) if this is an incoming connection (and this endpoint is a + // server). This checks handlers to see whether the stream is allowed and, if so, returns a + // SockAddr containing the IP/port the tunnel should map to. Returns nullopt if the connection + // should be rejected. + std::optional + allow_connection(std::string_view lokinet_addr, uint16_t port); + + // Incoming stream handlers + std::map incoming_handlers_; + int next_handler_id_ = 1; + + std::shared_ptr + get_loop(); + + // Cleanup member + std::shared_ptr timer_keepalive_ = std::make_shared(0); + }; + +} // namespace llarp::quic diff --git a/llarp/quic/tunnel_client.cpp b/llarp/quic/tunnel_client.cpp deleted file mode 100644 index 4ae894e75..000000000 --- a/llarp/quic/tunnel_client.cpp +++ /dev/null @@ -1,143 +0,0 @@ -#include "connection.hpp" -#include "client.hpp" -#include "stream.hpp" -#include "tunnel.hpp" -#include - -#include - -#include - -#include -#include - -#include - -/* -using namespace std::literals; - -namespace llarp::quic::tunnel -{ - // When we receive a new incoming connection we immediately initiate a new quic stream. This quic - // stream in turn causes the other end to initiate a TCP connection on whatever port we specified - // in the connection; if the connection is established, it sends back a single byte 0x00 - // (CONNECT_INIT); otherwise it shuts down the stream with an error code. - void - on_new_connection(const uvw::ListenEvent&, uvw::TCPHandle& server) - { - LogDebug("New connection!\n"); - auto client = server.loop().resource(); - server.accept(*client); - - auto conn = server.data(); - std::shared_ptr stream; - try - { - LogTrace("open stream"); - stream = conn->open_stream( - [client](llarp::quic::Stream& stream, llarp::quic::bstring_view bdata) { - if (bdata.empty()) - return; - if (auto b0 = bdata[0]; b0 == tunnel::CONNECT_INIT) - { - // Set up callbacks, which replaces both of these initial callbacks - client->read(); - tunnel::install_stream_forwarding(*client, stream); - - if (bdata.size() > 1) - { - bdata.remove_prefix(1); - stream.data_callback(stream, std::move(bdata)); - } - LogTrace("starting client reading"); - } - else - { - LogWarn( - "Remote connection returned invalid initial byte (0x", - oxenmq::to_hex(bdata.begin(), bdata.begin() + 1), - "); dropping connection"); - client->closeReset(); - stream.close(tunnel::ERROR_BAD_INIT); - } - stream.io_ready(); - }, - [client](llarp::quic::Stream&, std::optional error_code) mutable { - if (error_code && *error_code == tunnel::ERROR_CONNECT) - LogDebug("Remote TCP connection failed, closing local connection"); - else - LogWarn( - "Stream connection closed ", - error_code ? "with error " + std::to_string(*error_code) : "gracefully", - "; closing local TCP connection."); - auto peer = client->peer(); - LogDebug("Closing connection to ", peer.ip, ":", peer.port); - if (error_code) - client->closeReset(); - else - client->close(); - }); - stream->io_ready(); - } - catch (const std::exception& e) - { - LogDebug("open stream failed"); - client->closeReset(); - return; - } - - LogTrace("done stream setup"); - conn->io_ready(); - } - - int - usage(std::string_view arg0, std::string_view msg) - { - std::cerr << msg << "\n\n" - << "Usage: " << arg0 - << " [DESTPORT [SERVERPORT [LISTENPORT]]]\n\nDefaults to ports 4444 4242 5555\n"; - return 1; - } - - int - main(int argc, char* argv[]) - { - auto loop = uvw::Loop::create(); - - std::array ports{{4444, 4242, 5555}}; - for (size_t i = 0; i < ports.size(); i++) - { - if (argc < 2 + (int)i) - break; - if (!parse_int(argv[1 + i], ports[i])) - return usage(argv[0], "Invalid port "s + argv[1 + i]); - } - auto& [dest_port, server_port, listen_port] = ports; - std::cout << "Connecting to quic server at localhost:" << server_port - << " to reach tunneled port " << dest_port - << ", listening on localhost:" << listen_port << "\n"; - - signal(SIGPIPE, SIG_IGN); - - LogDebug("Initializing client"); - auto tunnel_client = std::make_shared( - llarp::quic::Address{{127, 0, 0, 1}, server_port}, // server addr - loop, - dest_port // tunnel destination port - ); - tunnel_client->default_stream_buffer_size = 0; // We steal uvw's provided buffers - LogDebug("Initialized client"); - - // Start listening for TCP connections: - auto server = loop->resource(); - server->data(tunnel_client->get_connection()); - server->on(llarp::quic::tunnel::on_new_connection); - - server->bind("127.0.0.1", listen_port); - server->listen(); - - loop->run(); - } - -} // namespace llarp::quic::tunnel -*/ diff --git a/llarp/quic/tunnel_server.cpp b/llarp/quic/tunnel_server.cpp deleted file mode 100644 index f8c03e16f..000000000 --- a/llarp/quic/tunnel_server.cpp +++ /dev/null @@ -1,158 +0,0 @@ -#include "tunnel_server.hpp" -#include "tunnel.hpp" -#include "connection.hpp" -#include "server.hpp" -#include - -#include - -#include - -/* -using namespace std::literals; - -namespace llarp::quic::tunnel -{ - IncomingTunnel::IncomingTunnel(uint16_t localhost_port) - : IncomingTunnel{ - [localhost_port]( - [[maybe_unused]] const auto& remote, uint16_t port, SockAddr& connect_to) { - if (port != localhost_port) - return AcceptResult::DECLINE; - connect_to.setIPv4(127, 0, 0, 1); - connect_to.setPort(port); - return AcceptResult::ACCEPT; - }} - {} - - int - usage(std::string_view arg0, std::string_view msg) - { - std::cerr << msg << "\n\n" - << "Usage: " << arg0 - << " [LISTENPORT [ALLOWED ...]]\n\nDefaults to listening on 4242 and allowing " - "22,80,4444,8080\n"; - return 1; - } - - int - main(int argc, char* argv[]) - { - uint16_t listen_port = 4242; - std::set allowed_ports{{22, 80, 4444, 8080}}; - - if (argc >= 2 && !parse_int(argv[1], listen_port)) - return usage(argv[0], "Invalid port "s + argv[1]); - if (argc >= 3) - { - allowed_ports.clear(); - for (int i = 2; i < argc; i++) - { - if (argv[i] == "all"sv) - { - allowed_ports.clear(); - break; - } - uint16_t port; - if (!parse_int(argv[i], port)) - return usage(argv[0], "Invalid port "s + argv[i]); - allowed_ports.insert(port); - } - } - - auto loop = uvw::Loop::create(); - - Address listen_addr{{0, 0, 0, 0}, listen_port}; - - signal(SIGPIPE, SIG_IGN); - - // The local address we connect to for incoming connections. (localhost for this demo, should - // be the localhost.loki address for lokinet). - std::string localhost = "127.0.0.1"; - - LogInfo("Initializing QUIC server"); - llarp::quic::Server s{ - listen_addr, - loop, - [loop, localhost, allowed_ports]( - llarp::quic::Server&, llarp::quic::Stream& stream, uint16_t port) { - LogDebug("New incoming quic stream ", stream.id(), " to reach ", localhost, ":", port); - if (port == 0 || !(allowed_ports.empty() || allowed_ports.count(port))) - { - LogWarn( - "quic stream denied by configuration: ", port, " is not a permitted local port"); - return false; - } - - stream.close_callback = [](llarp::quic::Stream& strm, - std::optional error_code) { - LogDebug( - error_code ? "Remote side" : "We", - " closed the quic stream, closing localhost tcp connection"); - if (error_code && *error_code > 0) - LogWarn("Remote quic stream was closed with error code ", *error_code); - auto tcp = strm.data(); - if (!tcp) - LogDebug("Local TCP connection already closed"); - else - tcp->close(); - }; - // Try to open a TCP connection to the configured localhost port; if we establish a - // connection then we immediately send a CONNECT_INIT back down the stream; if we fail - // then we send a fail-to-connect error code. Once we successfully connect both of - // these handlers get replaced with the normal tunnel handlers. - auto tcp = loop->resource(); - auto error_handler = tcp->once( - [&stream, localhost, port](const uvw::ErrorEvent&, uvw::TCPHandle&) { - LogWarn( - "Failed to connect to ", localhost, ":", port, ", shutting down quic stream"); - stream.close(tunnel::ERROR_CONNECT); - }); - tcp->once( - [streamw = stream.weak_from_this(), error_handler = std::move(error_handler)]( - const uvw::ConnectEvent&, uvw::TCPHandle& tcp) { - auto peer = tcp.peer(); - auto stream = streamw.lock(); - if (!stream) - { - LogWarn( - "Connected to ", - peer.ip, - ":", - peer.port, - " but quic stream has gone away; resetting local connection"); - tcp.closeReset(); - return; - } - LogDebug("Connected to ", peer.ip, ":", peer.port, " for quic ", stream->id()); - tcp.erase(error_handler); - tunnel::install_stream_forwarding(tcp, *stream); - assert(stream->used() == 0); - - stream->append_buffer(new std::byte[1]{tunnel::CONNECT_INIT}, 1); - tcp.read(); - }); - - // FIXME, need to configure this - tcp->connect("127.0.0.1", port); - - return true; - }}; - s.default_stream_buffer_size = 0; // We steal uvw's provided buffers - LogDebug("Initialized server"); - std::cout << "Listening on localhost:" << listen_port - << " with tunnel(s) to localhost port(s):"; - if (allowed_ports.empty()) - std::cout << " (any)"; - for (auto p : allowed_ports) - std::cout << ' ' << p; - std::cout << '\n'; - - loop->run(); - - return 0; - } - -} // namespace llarp::quic::tunnel - -*/ diff --git a/llarp/quic/tunnel_server.hpp b/llarp/quic/tunnel_server.hpp deleted file mode 100644 index 67dac4146..000000000 --- a/llarp/quic/tunnel_server.hpp +++ /dev/null @@ -1,80 +0,0 @@ -#pragma once - -#include "address.hpp" -#include -#include - -#include - -namespace llarp::quic::tunnel -{ - enum class AcceptResult : int - { - ACCEPT = 0, // Accepts a connection - DECLINE = -1, // Declines a connection (try other callbacks, refuse if all decline) - REFUSE = -2, // Refuses a connection (don't try any more callbacks) - }; - - // Class that wraps an incoming connection acceptance callback (to allow for callback removal). - // This is not directly constructible: you must construct it via the TunnelServer instance. - class IncomingTunnel final - { - public: - using AcceptCallback = std::function; - - private: - AcceptCallback accept; - - friend class TunnelServer; - - // Constructor with a full callback; invoked via TunnelServer::add_incoming_tunnel - explicit IncomingTunnel(AcceptCallback accept) : accept{std::move(accept)} - {} - - // Constructor for a simple forwarding to a single localhost port. E.g. IncomingTunnel(22) - // allows incoming connections to reach port 22 and forwards them to localhost:22. - explicit IncomingTunnel(uint16_t localhost_port); - - // Constructor for forwarding everything to the same port; this is used by full clients by - // default. - IncomingTunnel(); - }; - - // Class that handles incoming quic connections. This class sets itself up in the llarp event - // loop on construction and maintains a list of incoming acceptor callbacks. When a new incoming - // quic connections is being established we try the callbacks one by one to determine the local - // TCP port the tunnel should be connected to until: - // - a callback sets connect_to and returns AcceptResult::ACCEPT - we connect it to the returned - // address - // - a callback returns AcceptResult::REFUSE - we reject the connection - // - // If a callback returns AcceptResult::DECLINE then we skip that callback and try the next one; if - // all callbacks decline (or we have no callbacks at all) then we reject the connection. - // - // Note that tunnel operations and initialization are done in the event loop thread and so will - // not take effect until the next event loop tick when called from some other thread. - class TunnelServer : public std::enable_shared_from_this - { - public: - explicit TunnelServer(EventLoop_ptr ev); - - // Appends a new tunnel to the end of the queue; all arguments are forwarded to private - // constructor(s) of IncomingTunnel. - template - std::shared_ptr - add_incoming_tunnel(Args&&... args) - { - return std::shared_ptr{new IncomingTunnel{std::forward(args)...}}; - } - - // Removes a tunnel acceptor from the acceptor queue. - void - remove_incoming_tunnel(std::weak_ptr tunnel); - - private: - EventLoop_ptr ev; - std::vector> tunnels; - }; - -} // namespace llarp::quic::tunnel diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index fae0b7182..04217cef0 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -30,7 +30,7 @@ #include #include #include -#include +#include #include #include @@ -56,6 +56,9 @@ namespace llarp m_state->m_Router = r; m_state->m_Name = "endpoint"; m_RecvQueue.enable(); + + if (Loop()->MaybeGetUVWLoop()) + m_quic = std::make_unique(*this); } bool @@ -83,64 +86,6 @@ namespace llarp m_StartupLNSMappings[name] = std::make_pair(range, auth); }); - auto loop = Router()->loop()->MaybeGetUVWLoop(); - assert(loop); - auto callback = [this, loop, ports = conf.m_quicServerPorts]( - quic::Server& serv, quic::Stream& stream, uint16_t port) { - if (ports.count(port) == 0) - { - return false; - } - - stream.close_callback = [](quic::Stream& st, - [[maybe_unused]] std::optional errcode) { - auto tcp = st.data(); - if (tcp) - tcp->close(); - }; - - auto localIP = net::TruncateV6(GetIfAddr()); - - std::string localhost = localIP.ToString(); - - auto tcp = loop->resource(); - auto error_handler = tcp->once( - [&stream, localhost, port](const uvw::ErrorEvent&, uvw::TCPHandle&) { - LogWarn("Failed to connect to ", localhost, ":", port, ", shutting down quic stream"); - stream.close(quic::tunnel::ERROR_CONNECT); - }); - tcp->once( - [streamw = stream.weak_from_this(), error_handler = std::move(error_handler)]( - const uvw::ConnectEvent&, uvw::TCPHandle& tcp) { - auto peer = tcp.peer(); - auto stream = streamw.lock(); - if (!stream) - { - LogWarn( - "Connected to ", - peer.ip, - ":", - peer.port, - " but quic stream has gone away; resetting local connection"); - tcp.closeReset(); - return; - } - LogDebug("Connected to ", peer.ip, ":", peer.port, " for quic ", stream->id()); - tcp.erase(error_handler); - quic::tunnel::install_stream_forwarding(tcp, *stream); - assert(stream->used() == 0); - - stream->append_buffer(new std::byte[1]{quic::tunnel::CONNECT_INIT}, 1); - tcp.read(); - }); - - tcp->connect(localhost, port); - - return true; - }; - - m_QuicServer = std::make_shared(this, loop, callback); - return m_state->Configure(conf); } @@ -1729,5 +1674,11 @@ namespace llarp return itr->second; } + quic::TunnelManager* + Endpoint::GetQUICTunnel() + { + return m_quic.get(); + } + } // namespace service } // namespace llarp diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index a461000df..8faa440e2 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -35,6 +35,11 @@ namespace llarp { + namespace quic + { + class TunnelManager; + } + namespace service { struct AsyncKeyExchange; @@ -386,6 +391,11 @@ namespace llarp std::optional MaybeGetAuthInfoForEndpoint(service::Address addr); + /// Returns a pointer to the quic::Tunnel object handling quic connections for this endpoint. + /// Returns nullptr if quic is not supported. + quic::TunnelManager* + GetQUICTunnel(); + protected: /// parent context that owns this endpoint Context* const context; @@ -437,6 +447,7 @@ namespace llarp std::unique_ptr m_state; std::shared_ptr m_AuthPolicy; std::unordered_map m_RemoteAuthInfos; + std::unique_ptr m_quic; /// (lns name, optional exit range, optional auth info) for looking up on startup std::unordered_map, std::optional>> @@ -462,8 +473,6 @@ namespace llarp ConvoMap& Sessions(); // clang-format on thread::Queue m_RecvQueue; - - std::shared_ptr m_QuicServer; }; using Endpoint_ptr = std::shared_ptr;