From a5b7a7e35b27a4084469d7610f2fd716af4557e9 Mon Sep 17 00:00:00 2001 From: dr7ana Date: Sun, 7 Jan 2024 13:15:55 -0800 Subject: [PATCH] Deprecate pending_msg_que in favor of libquic internal stream buffers --- llarp/exit/session.cpp | 64 ++++++++-------- llarp/link/link_manager.cpp | 147 ++++++++++++------------------------ llarp/link/link_manager.hpp | 96 +++++++++++++++++++---- llarp/router/router.cpp | 12 --- llarp/router/router.hpp | 6 -- 5 files changed, 163 insertions(+), 162 deletions(-) diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index 5f1c6192e..2802fd9ca 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -246,38 +246,38 @@ namespace llarp::exit bool BaseSession::FlushUpstream() { - auto now = router->now(); - auto path = PickEstablishedPath(llarp::path::ePathRoleExit); - if (path) - { - // for (auto& [i, queue] : m_Upstream) - // { - // while (queue.size()) - // { - // auto& msg = queue.front(); - // msg.sequence_number = path->NextSeqNo(); - // path->SendRoutingMessage(msg, router); - // queue.pop_front(); - // } - // } - } - else - { - // if (m_Upstream.size()) - // llarp::LogWarn("no path for exit session"); - // // discard upstream - // for (auto& [i, queue] : m_Upstream) - // queue.clear(); - // m_Upstream.clear(); - - if (numHops == 1) - { - if (const auto maybe = router->node_db()->get_rc(exit_router); maybe.has_value()) - router->connect_to(*maybe); - } - else if (UrgentBuild(now)) - BuildOneAlignedTo(exit_router); - } + // auto now = router->now(); + // auto path = PickEstablishedPath(llarp::path::ePathRoleExit); + // if (path) + // { + // for (auto& [i, queue] : m_Upstream) + // { + // while (queue.size()) + // { + // auto& msg = queue.front(); + // msg.sequence_number = path->NextSeqNo(); + // path->SendRoutingMessage(msg, router); + // queue.pop_front(); + // } + // } + // } + // else + // { + // if (m_Upstream.size()) + // llarp::LogWarn("no path for exit session"); + // // discard upstream + // for (auto& [i, queue] : m_Upstream) + // queue.clear(); + // m_Upstream.clear(); + + // if (numHops == 1) + // { + // if (const auto maybe = router->node_db()->get_rc(exit_router); maybe.has_value()) + // router->connect_to(*maybe); + // } + // else if (UrgentBuild(now)) + // BuildOneAlignedTo(exit_router); + // } return true; } diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index b850ec462..9e3db54dc 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -352,36 +352,6 @@ namespace llarp if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end()) { log::critical(logcat, "Fetched configured outbound connection to relay RID:{}", rid); - - auto& conn = it->second->conn; - auto& str = it->second->control_stream; - - if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end()) - { - log::critical(logcat, "Clearing pending queue for RID:{}", rid); - auto& que = itr->second; - - while (not que.empty()) - { - auto& msg = que.front(); - - if (msg.is_control) - { - log::critical( - logcat, "Dispatching {} request (stream ID: {})!", *msg.endpoint, str->stream_id()); - str->command(std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func)); - } - else - { - log::critical(logcat, "DIspatching data message: {}", msg.body); - conn->send_datagram(std::move(msg.body)); - } - - que.pop_front(); - } - } - - log::warning(logcat, "Pending queue empty for RID:{}", rid); } else { @@ -427,10 +397,6 @@ namespace llarp [this, scid = ci.scid(), rid = RouterID{ci.remote_key()}, error_code = ec]() { log::critical(quic_cat, "Purging quic connection CID:{} (ec:{})", scid, error_code); - // in case this didn't clear earlier, do it now - if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end()) - pending_conn_msg_queue.erase(p_itr); - if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end()) { log::critical(quic_cat, "Quic connection to relay RID:{} purged successfully", rid); @@ -453,7 +419,12 @@ namespace llarp std::string body, std::function func) { - assert(func); // makes no sense to send control message and ignore response + // DISCUSS: revisit if this assert makes sense. If so, there's no need to if (func) the + // next logic block + assert(func); // makes no sense to send control message and ignore response (maybe gossip?) + + if (is_stopping) + return false; if (func) { @@ -463,19 +434,6 @@ namespace llarp }; } - return send_control_message_impl(remote, std::move(endpoint), std::move(body), std::move(func)); - } - - bool - LinkManager::send_control_message_impl( - const RouterID& remote, - std::string endpoint, - std::string body, - std::function func) - { - if (is_stopping) - return false; - if (auto conn = ep.get_conn(remote); conn) { conn->control_stream->command(std::move(endpoint), std::move(body), std::move(func)); @@ -487,21 +445,7 @@ namespace llarp endpoint = std::move(endpoint), body = std::move(body), f = std::move(func)]() { - auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f)); - - if (auto it = pending_conn_msg_queue.find(remote); it != pending_conn_msg_queue.end()) - { - it->second.push_back(std::move(pending)); - log::critical( - logcat, "Connection to RID:{} is pending; message appended to send queue!", remote); - } - else - { - log::critical(logcat, "Connection to RID:{} is pending; creating send queue!", remote); - auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue()); - itr->second.push_back(std::move(pending)); - connect_to(remote); - } + connect_and_send(remote, std::move(endpoint), std::move(body), std::move(f)); }); return false; @@ -520,12 +464,7 @@ namespace llarp } _router.loop()->call([this, body = std::move(body), remote]() { - auto pending = PendingMessage(std::move(body)); - - auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue()); - itr->second.push_back(std::move(pending)); - - connect_to(remote); + connect_and_send(remote, std::nullopt, std::move(body)); }); return false; @@ -550,12 +489,35 @@ namespace llarp } void - LinkManager::connect_to(const RouterID& rid, conn_open_hook hook) + LinkManager::connect_and_send( + const RouterID& router, + std::optional endpoint, + std::string body, + std::function func) { - if (auto rc = node_db->get_rc(rid)) - connect_to(*rc, std::move(hook)); + // by the time we have called this, we have already checked if we have a connection to this RID + // in ::send_control_message_impl, at which point we will dispatch on that stream + if (auto rc = node_db->get_rc(router)) + { + const auto& remote_addr = rc->addr(); + + if (auto rv = ep.establish_and_send( + oxen::quic::RemoteAddress{router.ToView(), remote_addr}, + *rc, + std::move(endpoint), + std::move(body), + std::move(func)); + rv) + { + log::info(quic_cat, "Begun establishing connection to {}", remote_addr); + return; + } + + log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr); + } else - log::warning(quic_cat, "Could not find RouterContact for connection to rid:{}", rid); + log::error( + quic_cat, "Error: Could not find RC for connection to rid:{}, message not sent!", router); } void @@ -573,8 +535,6 @@ namespace llarp const auto& remote_addr = rc.addr(); - // TODO: confirm remote end is using the expected pubkey (RouterID). - // TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation) if (auto rv = ep.establish_connection( oxen::quic::RemoteAddress{rid.ToView(), remote_addr}, rc, @@ -767,35 +727,28 @@ namespace llarp log::critical(link_cat, "Received known or old RC, not storing or forwarding."); } + // TODO: can probably use ::send_control_message instead. Need to discuss the potential difference + // in calling Endpoint::get_service_conn vs Endpoint::get_conn void LinkManager::fetch_bootstrap_rcs( const RemoteRC& source, std::string payload, std::function func) { - _router.loop()->call([this, source, payload, f = std::move(func)]() mutable { - if (f) - { - f = [this, func = std::move(f)](oxen::quic::message m) mutable { - _router.loop()->call( - [f = std::move(func), msg = std::move(m)]() mutable { f(std::move(msg)); }); - }; - } - - const auto& rid = source.router_id(); - - if (auto conn = ep.get_service_conn(rid); conn) - { - conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(f)); - log::critical(logcat, "Dispatched bootstrap fetch request!"); - return; - } + func = [this, f = std::move(func)](oxen::quic::message m) mutable { + _router.loop()->call( + [func = std::move(f), msg = std::move(m)]() mutable { func(std::move(msg)); }); + }; - log::critical(logcat, "Queuing bootstrap fetch request to {}", rid); - auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f)); + const auto& rid = source.router_id(); - auto [itr, b] = pending_conn_msg_queue.emplace(rid, MessageQueue()); - itr->second.push_back(std::move(pending)); + if (auto conn = ep.get_service_conn(rid); conn) + { + conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(func)); + log::critical(logcat, "Dispatched bootstrap fetch request!"); + return; + } - connect_to(source); + _router.loop()->call([this, source, payload, f = std::move(func), rid = rid]() mutable { + connect_and_send(rid, "bfetch_rcs"s, std::move(payload), std::move(f)); }); } diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index 028d9f3ee..c73662308 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -106,6 +106,16 @@ namespace llarp establish_connection( const oxen::quic::RemoteAddress& remote, const RemoteRC& rc, Opt&&... opts); + template + bool + establish_and_send( + const oxen::quic::RemoteAddress& remote, + const RemoteRC& rc, + std::optional endpoint, + std::string body, + std::function func = nullptr, + Opt&&... opts); + void for_each_connection(std::function func); @@ -188,13 +198,6 @@ namespace llarp private: explicit LinkManager(Router& r); - bool - send_control_message_impl( - const RouterID& remote, - std::string endpoint, - std::string body, - std::function = nullptr); - friend struct link::Endpoint; std::atomic is_stopping; @@ -202,9 +205,6 @@ namespace llarp // sessions to persist -> timestamp to end persist at std::unordered_map persisting_conns; - // holds any messages we attempt to send while connections are establishing - std::unordered_map pending_conn_msg_queue; - util::DecayingHashSet clients{path::DEFAULT_LIFETIME}; std::shared_ptr node_db; @@ -225,9 +225,6 @@ namespace llarp void recv_data_message(oxen::quic::dgram_interface& dgi, bstring dgram); - void - recv_control_message(oxen::quic::message msg); - std::shared_ptr make_control(oxen::quic::connection_interface& ci, const RouterID& rid); @@ -309,10 +306,14 @@ namespace llarp test_reachability(const RouterID& rid, conn_open_hook, conn_closed_hook); void - connect_to(const RouterID& router, conn_open_hook = nullptr); + connect_to(const RemoteRC& rc, conn_open_hook = nullptr, conn_closed_hook = nullptr); void - connect_to(const RemoteRC& rc, conn_open_hook = nullptr, conn_closed_hook = nullptr); + connect_and_send( + const RouterID& router, + std::optional endpoint, + std::string body, + std::function func = nullptr); void close_connection(RouterID rid); @@ -426,6 +427,71 @@ namespace llarp namespace link { + template + bool + Endpoint::establish_and_send( + const oxen::quic::RemoteAddress& remote, + const RemoteRC& rc, + std::optional ep, + std::string body, + std::function func, + Opt&&... opts) + { + try + { + const auto& rid = rc.router_id(); + const auto& is_snode = _is_service_node; + const auto& is_control = ep.has_value(); + const auto us = is_snode ? "Relay"s : "Client"s; + + log::critical(logcat, "Establishing connection to RID:{}", rid); + // add to service conns + auto [itr, b] = service_conns.emplace(rid, nullptr); + + auto conn_interface = endpoint->connect( + remote, + link_manager.tls_creds, + is_snode ? ROUTER_KEEP_ALIVE : CLIENT_KEEP_ALIVE, + std::forward(opts)...); + + // auto + std::shared_ptr control_stream = + conn_interface->template open_stream( + [this, rid = rid](oxen::quic::Stream&, uint64_t error_code) { + log::warning( + logcat, + "BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...", + error_code); + close_connection(rid); + }); + + if (is_snode) + link_manager.register_commands(control_stream, rid); + else + log::critical(logcat, "Client NOT registering BTStream commands!"); + + log::critical( + logcat, + "{} dispatching {} on outbound connection to remote (rid:{})", + us, + is_control ? "control message (ep:{})"_format(ep) : "data message", + rid); + + (is_control) ? control_stream->command(std::move(*ep), std::move(body), std::move(func)) + : conn_interface->send_datagram(std::move(body)); + + itr->second = std::make_shared(conn_interface, control_stream, true); + + log::critical(logcat, "Outbound connection to RID:{} added to service conns...", rid); + return true; + } + catch (...) + { + log::error(quic_cat, "Error: failed to establish connection to {}", remote); + return false; + } + } + template bool Endpoint::establish_connection( diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 389a864af..85e3b7ba3 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -247,18 +247,6 @@ namespace llarp loop_wakeup->Trigger(); } - void - Router::connect_to(const RouterID& rid) - { - _link_manager->connect_to(rid); - } - - void - Router::connect_to(const RemoteRC& rc) - { - _link_manager->connect_to(rc); - } - bool Router::send_data_message(const RouterID& remote, std::string payload) { diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 7880a49aa..ea2774e15 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -202,12 +202,6 @@ namespace llarp void for_each_connection(std::function func); - void - connect_to(const RouterID& rid); - - void - connect_to(const RemoteRC& rc); - const Contacts& contacts() const {