diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 926c1ceac..dda9a666f 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -434,48 +434,9 @@ namespace llarp void LinkManager::fetch_rcs( - const RouterID& source, rc_time since, const std::vector& explicit_ids) + const RouterID& source, std::string payload, std::function func) { - send_control_message( - source, - "fetch_rcs", - RCFetchMessage::serialize(since, explicit_ids), - [this, source = source](oxen::quic::message m) { - if (m.timed_out) - { - // TODO: keep track of this failure for relay quality metrics? - log::info(link_cat, "RC Fetch to {} timed out", source); - return; - } - try - { - oxenc::bt_dict_consumer btdc{m.body()}; - if (not m) - { - auto reason = btdc.require(messages::STATUS_KEY); - log::info(link_cat, "RC Fetch to {} returned error: {}", source, reason); - return; - } - - auto btlc = btdc.require("rcs"sv); - auto timestamp = rc_time{std::chrono::seconds{btdc.require("time"sv)}}; - - std::vector rcs; - - while (not btlc.is_finished()) - { - rcs.emplace_back(btlc.consume_dict_consumer()); - } - - node_db->ingest_rcs(source, std::move(rcs), timestamp); - } - catch (const std::exception& e) - { - // TODO: Inform NodeDB of failure (perhaps just a call to rotate_rc_source()) - log::info(link_cat, "Failed to parse RC Fetch response from {}: {}", source, e.what()); - return; - } - }); + send_control_message(source, "fetch_rcs", std::move(payload), std::move(func)); } void @@ -547,69 +508,16 @@ namespace llarp } void - LinkManager::fetch_router_ids(const RouterID& source, std::function func) + LinkManager::fetch_router_ids( + const RouterID& via, std::string payload, std::function func) { if (ep.conns.empty()) { log::debug(link_cat, "Not attempting to fetch Router IDs: not connected to any relays."); return; } - // TODO: randomize? Also, keep track of successful responses and drop this edge - // if not many come back successfully. - RouterID edge = ep.conns.begin()->first; - send_control_message( - edge, - "fetch_router_ids"s, - RouterIDFetch::serialize(source), - (func) ? std::move(func) : - [this, source = source, edge = std::move(edge)](oxen::quic::message m) { - if (not m) - { - log::info( - link_cat, - "Error fetching RouterIDs from source \"{}\" via edge \"{}\"", - source, - edge); - node_db->ingest_router_ids(edge); // empty response == failure - return; - } - - try - { - oxenc::bt_dict_consumer btdc{m.body()}; - - btdc.required("routers"); - auto router_id_strings = btdc.consume_list>(); - - btdc.require_signature("signature", [&edge](ustring_view msg, ustring_view sig) { - if (sig.size() != 64) - throw std::runtime_error{"Invalid signature: not 64 bytes"}; - if (not crypto::verify(edge, msg, sig)) - throw std::runtime_error{ - "Failed to verify signature for fetch RouterIDs response."}; - }); - - std::vector router_ids; - for (const auto& s : router_id_strings) - { - if (s.size() != RouterID::SIZE) - { - log::warning(link_cat, "Got bad RouterID from edge \"{}\".", edge); - return; - } - router_ids.emplace_back(s.data()); - } - - node_db->ingest_router_ids(edge, std::move(router_ids)); - return; - } - catch (const std::exception& e) - { - log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what()); - } - node_db->ingest_router_ids(edge); // empty response == failure - }); + send_control_message(via, "fetch_router_ids"s, std::move(payload), std::move(func)); } void diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index cfa4b3c6d..e51f892dd 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -227,13 +227,17 @@ namespace llarp handle_gossip_rc(oxen::quic::message m); void - fetch_rcs(const RouterID& source, rc_time since, const std::vector& explicit_ids); + fetch_rcs( + const RouterID& source, + std::string payload, + std::function func); void handle_fetch_rcs(oxen::quic::message m); void - fetch_router_ids(const RouterID& source, std::function func = nullptr); + fetch_router_ids( + const RouterID& via, std::string payload, std::function func); void handle_fetch_router_ids(oxen::quic::message m); diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 15159f7d6..31834ed50 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -2,6 +2,8 @@ #include "crypto/types.hpp" #include "dht/kademlia.hpp" +#include "messages/rc.hpp" +#include "messages/router_id.hpp" #include "router_contact.hpp" #include "util/time.hpp" @@ -117,7 +119,7 @@ namespace llarp bool NodeDB::rotate_startup_rc_source() { - if (client_known_routers.size() < 13) + if (active_client_routers.size() < 13) { // do something here return false; @@ -126,7 +128,7 @@ namespace llarp RouterID temp = rc_fetch_source; while (temp == rc_fetch_source) - std::sample(client_known_routers.begin(), client_known_routers.end(), &temp, 1, csrng); + std::sample(active_client_routers.begin(), active_client_routers.end(), &temp, 1, csrng); rc_fetch_source = std::move(temp); return true; @@ -145,11 +147,12 @@ namespace llarp throw std::runtime_error{"Called rotate_rc_source with no connections, does not make sense!"}; // We should not be in this function if client_known_routers isn't populated - if (client_known_routers.size() <= 1) + if (active_client_routers.size() <= 1) throw std::runtime_error{"Cannot rotate RC source without RC source(s) to rotate to!"}; RemoteRC new_source{}; _router.link_manager().get_random_connected(new_source); + if (conn_count == 1) { // if we only have one connection, it must be current rc fetch source @@ -171,7 +174,7 @@ namespace llarp while (r == rc_fetch_source) { - std::sample(client_known_routers.begin(), client_known_routers.end(), &r, 1, csrng); + std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng); } rc_fetch_source = std::move(r); return; @@ -187,7 +190,7 @@ namespace llarp // TODO: trust model void - NodeDB::ingest_rcs(RouterID source, std::vector rcs, rc_time timestamp) + NodeDB::store_fetched_rcs(RouterID source, std::vector rcs, rc_time timestamp) { (void)source; @@ -205,85 +208,35 @@ namespace llarp last_rc_update_relay_timestamp = timestamp; } - // TODO: trust model void - NodeDB::ingest_router_ids(RouterID source, std::vector ids) + NodeDB::ingest_rid_fetch_responses(const RemoteRC& source, std::vector ids) { - router_id_fetch_responses[source] = std::move(ids); + const auto& rid = source.router_id(); - router_id_response_count++; - if (router_id_response_count == router_id_fetch_sources.size()) - { - // TODO: reconcile all the responses, for now just insert all - for (const auto& [rid, responses] : router_id_fetch_responses) - { - // TODO: empty == failure, handle that case - for (const auto& response : responses) - { - client_known_routers.insert(std::move(response)); - } - } - router_id_fetch_in_progress = false; - } + router_id_fetch_responses[rid] = std::move(ids); } - void - NodeDB::fetch_initial() + bool + NodeDB::process_fetched_rids() { - int num_failures = 0; - - [[maybe_unused]] - bool use_bootstrap = false; - - RouterID fetch_src; - - // NodeDB::load_from_disk is called in Router::Run before any calls to Router::Tick are - // made, which trigger this function call. As a result, client_known_routers should be - // populated (if there was anything to populate it with) - auto num_known = client_known_routers.size(); - - if (num_known >= MIN_ACTIVE_RIDS) + for (const auto& [rid, responses] : router_id_fetch_responses) { - std::sample(client_known_routers.begin(), client_known_routers.end(), &fetch_src, 1, csrng); - } - else - { - // DEFAULT TO BOOTSTRAP ROUTERS HERE - use_bootstrap = true; - log::debug( - logcat, - "Insufficient known active RID's to fetch ({}/{}); defaulting to bootstrap", - num_known, - MIN_ACTIVE_RIDS); - - assert(not bootstraps.empty()); - fetch_src = bootstraps.begin()->first; - } - - while (num_failures < MAX_FETCH_ATTEMPTS) - { - - - + // TODO: empty == failure, handle that case + for (const auto& response : responses) + { + active_client_routers.insert(std::move(response)); + } } + router_id_fetch_in_progress = false; - router_id_fetch_in_progress = true; - router_id_response_count = 0; - router_id_fetch_responses.clear(); - } - - bool - NodeDB::fetch_initial_rcs(const RouterID& src) - { - (void)src; return true; } void - NodeDB::fetch_rcs() + NodeDB::fetch_rcs(int n_fails, bool initial) { + int num_failures = n_fails; std::vector needed; - const auto now = time_point_now(); for (const auto& [rid, rc] : known_rcs) @@ -292,15 +245,84 @@ namespace llarp needed.push_back(rid); } - _router.link_manager().fetch_rcs( - rc_fetch_source, last_rc_update_relay_timestamp, std::move(needed)); + RouterID src = (initial) + ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) + : rc_fetch_source; + + while (num_failures < MAX_FETCH_ATTEMPTS) + { + auto success = std::make_shared>(); + auto f = success->get_future(); + + _router.link_manager().fetch_rcs( + src, + RCFetchMessage::serialize(last_rc_update_relay_timestamp, needed), + [this, src, p = std::move(success)](oxen::quic::message m) mutable { + if (m.timed_out) + { + log::info(logcat, "RC fetch to {} timed out", src); + p->set_value(false); + return; + } + try + { + oxenc::bt_dict_consumer btdc{m.body()}; + if (not m) + { + auto reason = btdc.require(messages::STATUS_KEY); + log::info(logcat, "RC fetch to {} returned error: {}", src, reason); + p->set_value(false); + return; + } + + auto btlc = btdc.require("rcs"sv); + auto timestamp = rc_time{std::chrono::seconds{btdc.require("time"sv)}}; + + std::vector rcs; + + while (not btlc.is_finished()) + { + rcs.emplace_back(btlc.consume_dict_consumer()); + } + + store_fetched_rcs(src, std::move(rcs), timestamp); + p->set_value(true); + } + catch (const std::exception& e) + { + log::info(logcat, "Failed to parse RC fetch response from {}: {}", src, e.what()); + p->set_value(false); + return; + } + }); + + if (f.get()) + { + log::debug(logcat, "Successfully fetched RC's from {}", src); + rc_fetch_source = src; + assert(_router.link_manager().have_connection_to(src)); + break; + } + + ++num_failures; + log::debug( + logcat, + "Unable to fetch RC's from {}; rotating RC source ({}/{} attempts)", + src, + num_failures, + MAX_FETCH_ATTEMPTS); + + src = (initial) + ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) + : std::next(known_rcs.begin(), csrng() % known_rcs.size())->first; + } } void - NodeDB::fetch_router_ids() + NodeDB::fetch_router_ids(int n_fails, bool initial) { - if (router_id_fetch_in_progress) - return; + assert(not router_id_fetch_in_progress); + int num_failures = n_fails; if (router_id_fetch_sources.empty()) select_router_id_sources(); @@ -313,19 +335,140 @@ namespace llarp } router_id_fetch_in_progress = true; - router_id_response_count = 0; router_id_fetch_responses.clear(); - for (const auto& rid : router_id_fetch_sources) - _router.link_manager().fetch_router_ids(rid); + RouterID src = (initial) + ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) + : rc_fetch_source; + + std::unordered_set fails; + + while (num_failures < MAX_FETCH_ATTEMPTS) + { + RemoteRC& src_rc = known_rcs[src]; + auto success = std::make_shared>(); + auto f = success->get_future(); + fails.clear(); + + for (const auto& target : router_id_fetch_sources) + { + _router.link_manager().fetch_router_ids( + src, + RouterIDFetch::serialize(target), + [this, src, src_rc, target, p = std::move(success)](oxen::quic::message m) mutable { + if (not m) + { + log::info(link_cat, "RID fetch from {} via {} timed out", src, target); + + ingest_rid_fetch_responses(src_rc); + p->set_value(-1); + return; + } + + try + { + oxenc::bt_dict_consumer btdc{m.body()}; + + btdc.required("routers"); + auto router_id_strings = btdc.consume_list>(); + + btdc.require_signature("signature", [&src](ustring_view msg, ustring_view sig) { + if (sig.size() != 64) + throw std::runtime_error{"Invalid signature: not 64 bytes"}; + if (not crypto::verify(src, msg, sig)) + throw std::runtime_error{ + "Failed to verify signature for fetch RouterIDs response."}; + }); + + std::vector router_ids; + + for (const auto& s : router_id_strings) + { + if (s.size() != RouterID::SIZE) + { + log::warning( + link_cat, "RID fetch from {} via {} returned bad RouterID", target, src); + p->set_value(0); + return; + } + + router_ids.emplace_back(s.data()); + } + + ingest_rid_fetch_responses(src_rc, std::move(router_ids)); + return; + } + catch (const std::exception& e) + { + log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what()); + p->set_value(0); + } + + ingest_rid_fetch_responses(src_rc); // empty response == failure + }); + + switch (f.get()) + { + case 1: + log::debug(logcat, "Successfully fetched RID's from {} via {}", target, src); + continue; + case 0: + // RC node relayed our fetch routerID request, but the request failed at the target + log::debug(logcat, "Unsuccessfully fetched RID's from {} via {}", target, src); + fails.insert(target); + continue; + default: + // RC node failed to relay our routerID request; re-select RC node and continue + log::debug(logcat, "RC source {} failed to mediate RID fetching from {}", src, target); + src = (initial) + ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) + : std::next(known_rcs.begin(), csrng() % known_rcs.size())->first; + ++num_failures; + fetch_rcs(num_failures); + continue; + } + } + + auto n_fails = fails.size(); + + if (n_fails <= MAX_RID_ERRORS) + { + log::debug( + logcat, + "RID fetching was successful ({}/{} acceptable errors)", + fails.size(), + MAX_RID_ERRORS); + rc_fetch_source = src; + assert(_router.link_manager().have_connection_to(src)); + + // this is where the trust model will do verification based on the similarity of the sets + if (process_fetched_rids()) + { + log::debug(logcat, "Accumulated RID's accepted by trust model"); + return; + } + + log::debug( + logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources..."); + select_router_id_sources(router_id_fetch_sources); + ++num_failures; + continue; + } + + // we had 4 or more failed requests, so we will need to rotate our rid sources + log::debug( + logcat, "RID fetching found {} failures; reselecting failed RID sources...", n_fails); + ++num_failures; + select_router_id_sources(fails); + } } void NodeDB::select_router_id_sources(std::unordered_set excluded) { - // TODO: bootstrapping should be finished before this is called, so this - // shouldn't happen; need to make sure that's the case. - if (client_known_routers.empty()) + // bootstrapping should be finished before this is called, so this + // shouldn't happen; need to make sure that's the case. + if (active_client_routers.empty()) return; // keep using any we've been using, but remove `excluded` ones @@ -333,9 +476,9 @@ namespace llarp router_id_fetch_sources.erase(r); // only know so many routers, so no need to randomize - if (client_known_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size())) + if (active_client_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size())) { - for (const auto& r : client_known_routers) + for (const auto& r : active_client_routers) { if (excluded.count(r)) continue; @@ -347,7 +490,7 @@ namespace llarp while (router_id_fetch_sources.size() < ROUTER_ID_SOURCE_COUNT) { RouterID r; - std::sample(client_known_routers.begin(), client_known_routers.end(), &r, 1, csrng); + std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng); if (excluded.count(r) == 0) router_id_fetch_sources.insert(r); } @@ -457,7 +600,7 @@ namespace llarp // TODO: the list of relays should be maintained and stored separately from // the RCs, as we keep older RCs around in case we go offline and need to // bootstrap, but they shouldn't be in the "good relays" list. - client_known_routers.insert(rid); + active_client_routers.insert(rid); return true; }); diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index fc396f4ed..8b6b540dd 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -22,7 +22,10 @@ namespace llarp { struct Router; + inline constexpr size_t ROUTER_ID_SOURCE_COUNT{12}; + inline constexpr size_t MIN_RID_FETCHES{8}; inline constexpr size_t MIN_ACTIVE_RIDS{24}; + inline constexpr size_t MAX_RID_ERRORS{ROUTER_ID_SOURCE_COUNT - MIN_RID_FETCHES}; inline constexpr int MAX_FETCH_ATTEMPTS{10}; class NodeDB @@ -57,23 +60,20 @@ namespace llarp std::unordered_map last_rc_update_times; // Client list of active RouterID's - std::unordered_set client_known_routers; + std::unordered_set active_client_routers; // only ever use to specific edges as path first-hops std::unordered_set pinned_edges; - // rc update info + // rc update info: we only set this upon a SUCCESSFUL fetching RouterID rc_fetch_source; rc_time last_rc_update_relay_timestamp; - static constexpr auto ROUTER_ID_SOURCE_COUNT = 12; - std::unordered_set router_id_fetch_sources; - std::unordered_map> router_id_fetch_responses; // process responses once all are received (or failed/timed out) - size_t router_id_response_count{0}; + std::unordered_map> router_id_fetch_responses; bool router_id_fetch_in_progress{false}; bool @@ -136,22 +136,22 @@ namespace llarp rotate_startup_rc_source(); void - ingest_rcs(RouterID source, std::vector rcs, rc_time timestamp); + store_fetched_rcs(RouterID source, std::vector rcs, rc_time timestamp); void - ingest_router_ids(RouterID source, std::vector ids = {}); + ingest_rid_fetch_responses(const RemoteRC& source, std::vector ids = {}); - void - fetch_initial(); + bool + process_fetched_rids(); bool fetch_initial_rcs(const RouterID& src); void - fetch_rcs(); + fetch_rcs(int n_fails = 0, bool initial = false); void - fetch_router_ids(); + fetch_router_ids(int n_fails = 0, bool initial = false); void select_router_id_sources(std::unordered_set excluded = {}); diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index da98cf31b..6a50d2f7d 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -851,23 +851,28 @@ namespace llarp } else { + int num_failures = 0; + if (needs_initial_fetch) { - node_db()->fetch_initial(); + node_db()->fetch_rcs(num_failures, true); + last_rc_fetch = now_timepoint; + node_db()->fetch_router_ids(num_failures, true); + last_routerid_fetch = now_timepoint; } else { // (client-only) periodically fetch updated RCs if (now_timepoint - last_rc_fetch > RC_UPDATE_INTERVAL) { - node_db()->fetch_rcs(); + node_db()->fetch_rcs(num_failures); last_rc_fetch = now_timepoint; } // (client-only) periodically fetch updated RouterID list if (now_timepoint - last_routerid_fetch > ROUTERID_UPDATE_INTERVAL) { - node_db()->fetch_router_ids(); + node_db()->fetch_router_ids(num_failures); last_routerid_fetch = now_timepoint; } }