RC/RID fetching logic implemented

pull/2226/head
dr7ana 6 months ago
parent 5fa3c2be87
commit 6559617816

@ -434,48 +434,9 @@ namespace llarp
// Sends a "fetch_rcs" control message to `source`.
// NOTE(review): this span is a diff rendering whose +/- markers were lost, so two versions of
// the function are interleaved: one taking (since, explicit_ids) that serializes the request
// and parses the reply inline, and one taking (payload, func) whose entire body is the final
// send_control_message call, forwarding the caller's payload and callback unchanged.
void
LinkManager::fetch_rcs(
const RouterID& source, rc_time since, const std::vector<RouterID>& explicit_ids)
const RouterID& source, std::string payload, std::function<void(oxen::quic::message m)> func)
{
send_control_message(
source,
"fetch_rcs",
RCFetchMessage::serialize(since, explicit_ids),
[this, source = source](oxen::quic::message m) {
// Timed-out requests are only logged; there is no body to parse.
if (m.timed_out)
{
// TODO: keep track of this failure for relay quality metrics?
log::info(link_cat, "RC Fetch to {} timed out", source);
return;
}
try
{
// The consumer is built before the error check so the status string can be read from it.
oxenc::bt_dict_consumer btdc{m.body()};
if (not m)
{
auto reason = btdc.require<std::string_view>(messages::STATUS_KEY);
log::info(link_cat, "RC Fetch to {} returned error: {}", source, reason);
return;
}
// A successful reply must carry an "rcs" list followed by a "time" value in seconds.
auto btlc = btdc.require<oxenc::bt_list_consumer>("rcs"sv);
auto timestamp = rc_time{std::chrono::seconds{btdc.require<int64_t>("time"sv)}};
std::vector<RemoteRC> rcs;
// Each list entry is a dict that a RemoteRC is constructed from.
while (not btlc.is_finished())
{
rcs.emplace_back(btlc.consume_dict_consumer());
}
node_db->ingest_rcs(source, std::move(rcs), timestamp);
}
catch (const std::exception& e)
{
// Any exception raised while decoding the reply ends up here and is only logged.
// TODO: Inform NodeDB of failure (perhaps just a call to rotate_rc_source())
log::info(link_cat, "Failed to parse RC Fetch response from {}: {}", source, e.what());
return;
}
});
send_control_message(source, "fetch_rcs", std::move(payload), std::move(func));
}
void
@ -547,69 +508,16 @@ namespace llarp
}
// Sends a "fetch_router_ids" control message.
// NOTE(review): this span is a diff rendering whose +/- markers were lost, so two versions are
// interleaved: one that relays through the first entry of ep.conns and installs a default
// response handler when `func` is null, and one whose entire body is the final
// send_control_message call, forwarding `payload` and `func` to `via`.
void
LinkManager::fetch_router_ids(const RouterID& source, std::function<void(oxen::quic::message m)> func)
LinkManager::fetch_router_ids(
const RouterID& via, std::string payload, std::function<void(oxen::quic::message m)> func)
{
// Without any connection there is no edge to relay through, so nothing is sent.
if (ep.conns.empty())
{
log::debug(link_cat, "Not attempting to fetch Router IDs: not connected to any relays.");
return;
}
// TODO: randomize? Also, keep track of successful responses and drop this edge
// if not many come back successfully.
RouterID edge = ep.conns.begin()->first;
send_control_message(
edge,
"fetch_router_ids"s,
RouterIDFetch::serialize(source),
(func) ? std::move(func) :
[this, source = source, edge = std::move(edge)](oxen::quic::message m) {
if (not m)
{
log::info(
link_cat,
"Error fetching RouterIDs from source \"{}\" via edge \"{}\"",
source,
edge);
node_db->ingest_router_ids(edge); // empty response == failure
return;
}
try
{
oxenc::bt_dict_consumer btdc{m.body()};
btdc.required("routers");
auto router_id_strings = btdc.consume_list<std::vector<ustring>>();
// The reply is rejected unless it carries a 64-byte signature that verifies against `edge`.
btdc.require_signature("signature", [&edge](ustring_view msg, ustring_view sig) {
if (sig.size() != 64)
throw std::runtime_error{"Invalid signature: not 64 bytes"};
if (not crypto::verify(edge, msg, sig))
throw std::runtime_error{
"Failed to verify signature for fetch RouterIDs response."};
});
std::vector<RouterID> router_ids;
for (const auto& s : router_id_strings)
{
// A single wrong-sized entry discards the whole reply.
// NOTE(review): this path returns without calling ingest_router_ids, unlike the other
// failure paths, so no empty response is recorded for this edge.
if (s.size() != RouterID::SIZE)
{
log::warning(link_cat, "Got bad RouterID from edge \"{}\".", edge);
return;
}
router_ids.emplace_back(s.data());
}
node_db->ingest_router_ids(edge, std::move(router_ids));
return;
}
catch (const std::exception& e)
{
log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what());
}
// Only reached after the catch block: record the failure as an empty response.
node_db->ingest_router_ids(edge); // empty response == failure
});
send_control_message(via, "fetch_router_ids"s, std::move(payload), std::move(func));
}
void

@ -227,13 +227,17 @@ namespace llarp
handle_gossip_rc(oxen::quic::message m);
void
fetch_rcs(const RouterID& source, rc_time since, const std::vector<RouterID>& explicit_ids);
fetch_rcs(
const RouterID& source,
std::string payload,
std::function<void(oxen::quic::message m)> func);
void
handle_fetch_rcs(oxen::quic::message m);
void
fetch_router_ids(const RouterID& source, std::function<void(oxen::quic::message m)> func = nullptr);
fetch_router_ids(
const RouterID& via, std::string payload, std::function<void(oxen::quic::message m)> func);
void
handle_fetch_router_ids(oxen::quic::message m);

@ -2,6 +2,8 @@
#include "crypto/types.hpp"
#include "dht/kademlia.hpp"
#include "messages/rc.hpp"
#include "messages/router_id.hpp"
#include "router_contact.hpp"
#include "util/time.hpp"
@ -117,7 +119,7 @@ namespace llarp
bool
NodeDB::rotate_startup_rc_source()
{
if (client_known_routers.size() < 13)
if (active_client_routers.size() < 13)
{
// do something here
return false;
@ -126,7 +128,7 @@ namespace llarp
RouterID temp = rc_fetch_source;
while (temp == rc_fetch_source)
std::sample(client_known_routers.begin(), client_known_routers.end(), &temp, 1, csrng);
std::sample(active_client_routers.begin(), active_client_routers.end(), &temp, 1, csrng);
rc_fetch_source = std::move(temp);
return true;
@ -145,11 +147,12 @@ namespace llarp
throw std::runtime_error{"Called rotate_rc_source with no connections, does not make sense!"};
// We should not be in this function if active_client_routers isn't populated
if (client_known_routers.size() <= 1)
if (active_client_routers.size() <= 1)
throw std::runtime_error{"Cannot rotate RC source without RC source(s) to rotate to!"};
RemoteRC new_source{};
_router.link_manager().get_random_connected(new_source);
if (conn_count == 1)
{
// if we only have one connection, it must be current rc fetch source
@ -171,7 +174,7 @@ namespace llarp
while (r == rc_fetch_source)
{
std::sample(client_known_routers.begin(), client_known_routers.end(), &r, 1, csrng);
std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng);
}
rc_fetch_source = std::move(r);
return;
@ -187,7 +190,7 @@ namespace llarp
// TODO: trust model
void
NodeDB::ingest_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp)
NodeDB::store_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp)
{
(void)source;
@ -205,85 +208,35 @@ namespace llarp
last_rc_update_relay_timestamp = timestamp;
}
// TODO: trust model
// Records the RouterID list returned by one fetch source.
// NOTE(review): this span is a diff rendering whose +/- markers were lost, so two versions are
// interleaved. The one keyed by a RouterID `source` stores `ids`, counts the response, and once
// the count equals router_id_fetch_sources.size() inserts every returned id into
// client_known_routers and clears router_id_fetch_in_progress. The one taking a RemoteRC only
// stores `ids` under source.router_id().
void
NodeDB::ingest_router_ids(RouterID source, std::vector<RouterID> ids)
NodeDB::ingest_rid_fetch_responses(const RemoteRC& source, std::vector<RouterID> ids)
{
router_id_fetch_responses[source] = std::move(ids);
const auto& rid = source.router_id();
router_id_response_count++;
// Responses are only processed once every selected source has been counted.
if (router_id_response_count == router_id_fetch_sources.size())
{
// TODO: reconcile all the responses, for now just insert all
for (const auto& [rid, responses] : router_id_fetch_responses)
{
// TODO: empty == failure, handle that case
for (const auto& response : responses)
{
// NOTE(review): `response` is a const reference, so std::move here still copies.
client_known_routers.insert(std::move(response));
}
}
router_id_fetch_in_progress = false;
}
router_id_fetch_responses[rid] = std::move(ids);
}
// NOTE(review): this span is a diff rendering whose +/- markers were lost; the bodies of
// fetch_initial() and process_fetched_rids() are interleaved line by line, so individual lines
// cannot be attributed to one function with certainty. Comments below describe each statement
// on its own.
void
NodeDB::fetch_initial()
bool
NodeDB::process_fetched_rids()
{
int num_failures = 0;
[[maybe_unused]]
bool use_bootstrap = false;
RouterID fetch_src;
// NodeDB::load_from_disk is called in Router::Run before any calls to Router::Tick are
// made, which trigger this function call. As a result, client_known_routers should be
// populated (if there was anything to populate it with)
auto num_known = client_known_routers.size();
if (num_known >= MIN_ACTIVE_RIDS)
// Iterates every (source, returned ids) pair recorded in router_id_fetch_responses.
for (const auto& [rid, responses] : router_id_fetch_responses)
{
// Picks one known router at random as the fetch source.
std::sample(client_known_routers.begin(), client_known_routers.end(), &fetch_src, 1, csrng);
}
else
{
// DEFAULT TO BOOTSTRAP ROUTERS HERE
use_bootstrap = true;
log::debug(
logcat,
"Insufficient known active RID's to fetch ({}/{}); defaulting to bootstrap",
num_known,
MIN_ACTIVE_RIDS);
assert(not bootstraps.empty());
// Falls back to the first entry of `bootstraps`.
fetch_src = bootstraps.begin()->first;
}
while (num_failures < MAX_FETCH_ATTEMPTS)
{
// TODO: empty == failure, handle that case
for (const auto& response : responses)
{
// NOTE(review): `response` is a const reference, so std::move here still copies.
active_client_routers.insert(std::move(response));
}
}
router_id_fetch_in_progress = false;
router_id_fetch_in_progress = true;
router_id_response_count = 0;
router_id_fetch_responses.clear();
}
// Stub: ignores `src` and always returns true.
bool
NodeDB::fetch_initial_rcs(const RouterID& src)
{
(void)src;  // silences the unused-parameter warning
return true;
}
// Requests RCs from a source router and retries with a different source until one attempt
// succeeds or MAX_FETCH_ATTEMPTS failures have accumulated.
//   n_fails: failure count to start from, so a caller can carry attempts across calls.
//   initial: when true, sources are drawn at random from active_client_routers instead of
//            starting from rc_fetch_source.
// NOTE(review): this span is a diff rendering whose +/- markers were lost and it contains a
// hunk header, so superseded lines (the no-argument signature, the direct link_manager call
// taking a timestamp) sit next to their replacements and part of the `needed` loop is not shown.
void
NodeDB::fetch_rcs()
NodeDB::fetch_rcs(int n_fails, bool initial)
{
int num_failures = n_fails;
std::vector<RouterID> needed;
const auto now = time_point_now();
for (const auto& [rid, rc] : known_rcs)
@ -292,15 +245,84 @@ namespace llarp
needed.push_back(rid);
}
_router.link_manager().fetch_rcs(
rc_fetch_source, last_rc_update_relay_timestamp, std::move(needed));
// NOTE(review): when `initial` is true and active_client_routers is empty, the modulo below
// has a zero divisor; nothing visible here guards against that.
RouterID src = (initial)
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
: rc_fetch_source;
while (num_failures < MAX_FETCH_ATTEMPTS)
{
// A promise/future pair turns the asynchronous response callback into a result this loop
// can wait on; the shared_ptr keeps the promise alive inside the callback.
auto success = std::make_shared<std::promise<bool>>();
auto f = success->get_future();
_router.link_manager().fetch_rcs(
src,
RCFetchMessage::serialize(last_rc_update_relay_timestamp, needed),
[this, src, p = std::move(success)](oxen::quic::message m) mutable {
if (m.timed_out)
{
log::info(logcat, "RC fetch to {} timed out", src);
p->set_value(false);
return;
}
try
{
// The consumer is built before the error check so the status string can be read from it.
oxenc::bt_dict_consumer btdc{m.body()};
if (not m)
{
auto reason = btdc.require<std::string_view>(messages::STATUS_KEY);
log::info(logcat, "RC fetch to {} returned error: {}", src, reason);
p->set_value(false);
return;
}
// A successful reply must carry an "rcs" list followed by a "time" value in seconds.
auto btlc = btdc.require<oxenc::bt_list_consumer>("rcs"sv);
auto timestamp = rc_time{std::chrono::seconds{btdc.require<int64_t>("time"sv)}};
std::vector<RemoteRC> rcs;
while (not btlc.is_finished())
{
rcs.emplace_back(btlc.consume_dict_consumer());
}
store_fetched_rcs(src, std::move(rcs), timestamp);
p->set_value(true);
}
catch (const std::exception& e)
{
log::info(logcat, "Failed to parse RC fetch response from {}: {}", src, e.what());
p->set_value(false);
return;
}
});
// Blocks the calling thread until the callback above fulfils the promise.
// NOTE(review): if the callback is dispatched on this same thread, this wait would never
// return — confirm the threading model of send_control_message.
if (f.get())
{
log::debug(logcat, "Successfully fetched RC's from {}", src);
// rc_fetch_source is only updated after a successful fetch.
rc_fetch_source = src;
assert(_router.link_manager().have_connection_to(src));
break;
}
++num_failures;
log::debug(
logcat,
"Unable to fetch RC's from {}; rotating RC source ({}/{} attempts)",
src,
num_failures,
MAX_FETCH_ATTEMPTS);
// Pick a new random source for the next attempt.
// NOTE(review): both branches take `% size()` of a container that is not checked for
// emptiness here.
src = (initial)
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
: std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
}
}
// Requests RouterID lists for every entry of router_id_fetch_sources, relaying each request
// through `src`, and re-selects sources until a round is accepted or MAX_FETCH_ATTEMPTS
// failures have accumulated.
//   n_fails: failure count to start from.
//   initial: when true, relay nodes are drawn at random from active_client_routers instead of
//            starting from rc_fetch_source.
// NOTE(review): this span is a diff rendering whose +/- markers were lost and it contains a
// hunk header, so superseded lines (the no-argument signature, the early return, the plain
// per-source loop) sit next to their replacements and some lines are not shown.
void
NodeDB::fetch_router_ids()
NodeDB::fetch_router_ids(int n_fails, bool initial)
{
if (router_id_fetch_in_progress)
return;
assert(not router_id_fetch_in_progress);
int num_failures = n_fails;
if (router_id_fetch_sources.empty())
select_router_id_sources();
@ -313,19 +335,140 @@ namespace llarp
}
router_id_fetch_in_progress = true;
router_id_response_count = 0;
router_id_fetch_responses.clear();
for (const auto& rid : router_id_fetch_sources)
_router.link_manager().fetch_router_ids(rid);
// NOTE(review): when `initial` is true and active_client_routers is empty, the modulo below
// has a zero divisor; nothing visible here guards against that.
RouterID src = (initial)
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
: rc_fetch_source;
// Targets whose request reached the relay but failed; reset at the start of every round.
std::unordered_set<RouterID> fails;
while (num_failures < MAX_FETCH_ATTEMPTS)
{
// NOTE(review): if known_rcs is a std map type, operator[] default-inserts an entry when
// `src` is absent. This reference is bound once per round, so after `src` is rotated inside
// the switch below it still refers to the previous relay's RC.
RemoteRC& src_rc = known_rcs[src];
// NOTE(review): one promise/future pair is created per round but used once per target:
// `success` is moved into the first callback (later callbacks capture an empty pointer),
// and f.get() is called on every iteration although std::future::get may only be called once.
auto success = std::make_shared<std::promise<int>>();
auto f = success->get_future();
fails.clear();
for (const auto& target : router_id_fetch_sources)
{
_router.link_manager().fetch_router_ids(
src,
RouterIDFetch::serialize(target),
// `src_rc` is captured by value, so each callback holds its own copy of the RemoteRC.
[this, src, src_rc, target, p = std::move(success)](oxen::quic::message m) mutable {
// NOTE(review): any falsy message is logged as a timeout, and the arguments are passed
// as (src, target) while the other "from {} via {}" messages pass (target, src).
if (not m)
{
log::info(link_cat, "RID fetch from {} via {} timed out", src, target);
ingest_rid_fetch_responses(src_rc);
p->set_value(-1);
return;
}
try
{
oxenc::bt_dict_consumer btdc{m.body()};
btdc.required("routers");
auto router_id_strings = btdc.consume_list<std::vector<ustring>>();
// The reply is rejected unless it carries a 64-byte signature that verifies against `src`.
btdc.require_signature("signature", [&src](ustring_view msg, ustring_view sig) {
if (sig.size() != 64)
throw std::runtime_error{"Invalid signature: not 64 bytes"};
if (not crypto::verify(src, msg, sig))
throw std::runtime_error{
"Failed to verify signature for fetch RouterIDs response."};
});
std::vector<RouterID> router_ids;
for (const auto& s : router_id_strings)
{
// A single wrong-sized entry discards the whole reply; no empty response is recorded
// on this path.
if (s.size() != RouterID::SIZE)
{
log::warning(
link_cat, "RID fetch from {} via {} returned bad RouterID", target, src);
p->set_value(0);
return;
}
router_ids.emplace_back(s.data());
}
// NOTE(review): the success path returns without fulfilling the promise; no path in
// this callback sets the value 1 that `case 1` below expects.
ingest_rid_fetch_responses(src_rc, std::move(router_ids));
return;
}
catch (const std::exception& e)
{
log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what());
p->set_value(0);
}
// Only reached after the catch block.
ingest_rid_fetch_responses(src_rc); // empty response == failure
});
// Blocks until the callback fulfils the promise. Every `continue` in this switch advances
// the enclosing for-loop to the next target.
switch (f.get())
{
case 1:
log::debug(logcat, "Successfully fetched RID's from {} via {}", target, src);
continue;
case 0:
// RC node relayed our fetch routerID request, but the request failed at the target
log::debug(logcat, "Unsuccessfully fetched RID's from {} via {}", target, src);
fails.insert(target);
continue;
default:
// RC node failed to relay our routerID request; re-select RC node and continue
log::debug(logcat, "RC source {} failed to mediate RID fetching from {}", src, target);
src = (initial)
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
: std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
++num_failures;
// Re-runs the RC fetch, passing the updated failure count.
fetch_rcs(num_failures);
continue;
}
}
// This local shadows the `n_fails` parameter for the rest of the loop body.
auto n_fails = fails.size();
if (n_fails <= MAX_RID_ERRORS)
{
log::debug(
logcat,
"RID fetching was successful ({}/{} acceptable errors)",
fails.size(),
MAX_RID_ERRORS);
rc_fetch_source = src;
assert(_router.link_manager().have_connection_to(src));
// this is where the trust model will do verification based on the similarity of the sets
if (process_fetched_rids())
{
log::debug(logcat, "Accumulated RID's accepted by trust model");
return;
}
log::debug(
logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources...");
// Every current source is passed as excluded, so the whole set is re-selected.
select_router_id_sources(router_id_fetch_sources);
++num_failures;
continue;
}
// more than MAX_RID_ERRORS requests failed, so we will need to rotate the failed rid sources
log::debug(
logcat, "RID fetching found {} failures; reselecting failed RID sources...", n_fails);
++num_failures;
select_router_id_sources(fails);
}
}
void
NodeDB::select_router_id_sources(std::unordered_set<RouterID> excluded)
{
// TODO: bootstrapping should be finished before this is called, so this
// shouldn't happen; need to make sure that's the case.
if (client_known_routers.empty())
// bootstrapping should be finished before this is called, so this
// shouldn't happen; need to make sure that's the case.
if (active_client_routers.empty())
return;
// keep using any we've been using, but remove `excluded` ones
@ -333,9 +476,9 @@ namespace llarp
router_id_fetch_sources.erase(r);
// only know so many routers, so no need to randomize
if (client_known_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size()))
if (active_client_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size()))
{
for (const auto& r : client_known_routers)
for (const auto& r : active_client_routers)
{
if (excluded.count(r))
continue;
@ -347,7 +490,7 @@ namespace llarp
while (router_id_fetch_sources.size() < ROUTER_ID_SOURCE_COUNT)
{
RouterID r;
std::sample(client_known_routers.begin(), client_known_routers.end(), &r, 1, csrng);
std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng);
if (excluded.count(r) == 0)
router_id_fetch_sources.insert(r);
}
@ -457,7 +600,7 @@ namespace llarp
// TODO: the list of relays should be maintained and stored separately from
// the RCs, as we keep older RCs around in case we go offline and need to
// bootstrap, but they shouldn't be in the "good relays" list.
client_known_routers.insert(rid);
active_client_routers.insert(rid);
return true;
});

@ -22,7 +22,10 @@ namespace llarp
{
struct Router;
inline constexpr size_t ROUTER_ID_SOURCE_COUNT{12};
inline constexpr size_t MIN_RID_FETCHES{8};
inline constexpr size_t MIN_ACTIVE_RIDS{24};
inline constexpr size_t MAX_RID_ERRORS{ROUTER_ID_SOURCE_COUNT - MIN_RID_FETCHES};
inline constexpr int MAX_FETCH_ATTEMPTS{10};
class NodeDB
@ -57,23 +60,20 @@ namespace llarp
std::unordered_map<RouterID, rc_time> last_rc_update_times;
// Client list of active RouterID's
std::unordered_set<RouterID> client_known_routers;
std::unordered_set<RouterID> active_client_routers;
// only ever use these specific edges as path first-hops
std::unordered_set<RouterID> pinned_edges;
// rc update info
// rc update info: we only set this upon a SUCCESSFUL fetching
RouterID rc_fetch_source;
rc_time last_rc_update_relay_timestamp;
static constexpr auto ROUTER_ID_SOURCE_COUNT = 12;
std::unordered_set<RouterID> router_id_fetch_sources;
std::unordered_map<RouterID, std::vector<RouterID>> router_id_fetch_responses;
// process responses once all are received (or failed/timed out)
size_t router_id_response_count{0};
std::unordered_map<RouterID, std::vector<RouterID>> router_id_fetch_responses;
bool router_id_fetch_in_progress{false};
bool
@ -136,22 +136,22 @@ namespace llarp
rotate_startup_rc_source();
void
ingest_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp);
store_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp);
void
ingest_router_ids(RouterID source, std::vector<RouterID> ids = {});
ingest_rid_fetch_responses(const RemoteRC& source, std::vector<RouterID> ids = {});
void
fetch_initial();
bool
process_fetched_rids();
bool
fetch_initial_rcs(const RouterID& src);
void
fetch_rcs();
fetch_rcs(int n_fails = 0, bool initial = false);
void
fetch_router_ids();
fetch_router_ids(int n_fails = 0, bool initial = false);
void
select_router_id_sources(std::unordered_set<RouterID> excluded = {});

@ -851,23 +851,28 @@ namespace llarp
}
else
{
int num_failures = 0;
if (needs_initial_fetch)
{
node_db()->fetch_initial();
node_db()->fetch_rcs(num_failures, true);
last_rc_fetch = now_timepoint;
node_db()->fetch_router_ids(num_failures, true);
last_routerid_fetch = now_timepoint;
}
else
{
// (client-only) periodically fetch updated RCs
if (now_timepoint - last_rc_fetch > RC_UPDATE_INTERVAL)
{
node_db()->fetch_rcs();
node_db()->fetch_rcs(num_failures);
last_rc_fetch = now_timepoint;
}
// (client-only) periodically fetch updated RouterID list
if (now_timepoint - last_routerid_fetch > ROUTERID_UPDATE_INTERVAL)
{
node_db()->fetch_router_ids();
node_db()->fetch_router_ids(num_failures);
last_routerid_fetch = now_timepoint;
}
}

Loading…
Cancel
Save