* use weak_ptr on core rpc

* use reachability testing code lifted storage server's code
pull/1659/head
Jeff Becker 3 years ago
parent b830eeb535
commit 9ad90d029d
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -103,6 +103,7 @@ add_library(liblokinet
dns/unbound_resolver.cpp
consensus/table.cpp
consensus/reachability_testing.cpp
bootstrap.cpp
context.cpp

@ -0,0 +1,160 @@
#include "reachability_testing.hpp"
#include <chrono>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/util/logging/logger.hpp>
#include <llarp/crypto/crypto.hpp>
using std::chrono::steady_clock;
namespace llarp::consensus
{
namespace detail
{
std::mt19937_64&
rng()
{
static thread_local std::mt19937_64 generator{std::random_device{}()};
return generator;
}
} // namespace detail
using fseconds = std::chrono::duration<float, std::chrono::seconds::period>;
using fminutes = std::chrono::duration<float, std::chrono::minutes::period>;
static void
check_incoming_tests_impl(
std::string_view name,
const time_point_t& now,
const time_point_t& startup,
detail::incoming_test_state& incoming)
{
const auto elapsed = now - std::max(startup, incoming.last_test);
bool failing = elapsed > reachability_testing::MAX_TIME_WITHOUT_PING;
bool whine = failing != incoming.was_failing
|| (failing && now - incoming.last_whine > reachability_testing::WHINING_INTERVAL);
incoming.was_failing = failing;
if (whine)
{
incoming.last_whine = now;
if (!failing)
{
LogInfo(name, " ping received; port is likely reachable again");
}
else
{
if (incoming.last_test.time_since_epoch() == 0s)
{
LogWarn("Have NEVER received ", name, " pings!");
}
else
{
LogWarn(
"Have not received ",
name,
" pings for a long time: ",
fminutes{elapsed}.count(),
" minutes");
}
LogWarn(
"Please check your ",
name,
" port. Not being reachable "
"over ",
name,
" may result in a deregistration!");
}
}
}
void
reachability_testing::check_incoming_tests(const time_point_t& now)
{
check_incoming_tests_impl("lokinet", now, startup, last);
}
void
reachability_testing::incoming_ping(const time_point_t& now)
{
last.last_test = now;
}
std::optional<RouterID>
reachability_testing::next_random(AbstractRouter* router, const time_point_t& now, bool requeue)
{
if (next_general_test > now)
return std::nullopt;
next_general_test = now
+ std::chrono::duration_cast<time_point_t::duration>(
fseconds(TESTING_INTERVAL(detail::rng())));
// Pull the next element off the queue, but skip ourself, any that are no longer registered, and
// any that are currently known to be failing (those are queued for testing separately).
RouterID my_pk{router->pubkey()};
while (!testing_queue.empty())
{
auto& pk = testing_queue.back();
std::optional<RouterID> sn;
if (pk != my_pk && !failing.count(pk))
sn = pk;
testing_queue.pop_back();
if (sn)
return sn;
}
if (!requeue)
return std::nullopt;
// FIXME: when a *new* node comes online we need to inject it into a random position in the SN
// list with probability (L/N) [L = current list size, N = potential list size]
//
// (FIXME: put this FIXME in a better place ;-) )
// We exhausted the queue so repopulate it and try again
testing_queue.clear();
const auto all = router->GetRouterWhitelist();
testing_queue.insert(testing_queue.begin(), all.begin(), all.end());
std::shuffle(testing_queue.begin(), testing_queue.end(), detail::rng());
// Recurse with the rebuild list, but don't let it try rebuilding again
return next_random(router, now, false);
}
std::vector<std::pair<RouterID, int>>
reachability_testing::get_failing(AbstractRouter*, const time_point_t& now)
{
// Our failing_queue puts the oldest retest times at the top, so pop them off into our result
// until the top node should be retested sometime in the future
std::vector<std::pair<RouterID, int>> result;
while (result.size() < MAX_RETESTS_PER_TICK && !failing_queue.empty())
{
auto& [pk, retest_time, failures] = failing_queue.top();
if (retest_time > now)
break;
result.emplace_back(pk, failures);
failing.erase(pk);
failing_queue.pop();
}
return result;
}
void
reachability_testing::add_failing_node(const RouterID& pk, int previous_failures)
{
using namespace std::chrono;
if (previous_failures < 0)
previous_failures = 0;
auto next_test_in = duration_cast<time_point_t::duration>(
previous_failures * TESTING_BACKOFF + fseconds{TESTING_INTERVAL(detail::rng())});
if (next_test_in > TESTING_BACKOFF_MAX)
next_test_in = TESTING_BACKOFF_MAX;
failing_queue.emplace(pk, steady_clock::now() + next_test_in, previous_failures + 1);
}
} // namespace llarp::consensus

@ -0,0 +1,144 @@
#pragma once
#include <chrono>
#include <queue>
#include <random>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <llarp/util/time.hpp>
#include <llarp/router_id.hpp>
namespace llarp
{
struct AbstractRouter;
}
namespace llarp::consensus
{
namespace detail
{
using clock_t = std::chrono::steady_clock;
using time_point_t = std::chrono::time_point<clock_t>;
// Returns std::greater on the std::get<N>(v)th element value.
template <typename T, size_t N>
struct nth_greater
{
constexpr bool
operator()(const T& lhs, const T& rhs) const
{
return std::greater<std::tuple_element_t<N, T>>{}(std::get<N>(lhs), std::get<N>(rhs));
}
};
struct incoming_test_state
{
time_point_t last_test{};
time_point_t last_whine{};
bool was_failing = false;
};
} // namespace detail
using time_point_t = detail::time_point_t;
using clock_t = detail::clock_t;
class reachability_testing
{
public:
// How often we tick the timer to check whether we need to do any tests.
inline static constexpr auto TESTING_TIMER_INTERVAL = 50ms;
// Distribution for the seconds between node tests: we throw in some randomness to avoid
// potential clustering of tests. (Note that there is some granularity here as the test timer
// only runs every TESTING_TIMER_INTERVAL).
inline static thread_local std::normal_distribution<float> TESTING_INTERVAL{10.0, 3.0};
// The linear backoff after each consecutive test failure before we re-test. Specifically we
// schedule the next re-test for (TESTING_BACKOFF*previous_failures) + TESTING_INTERVAL(rng).
inline static constexpr auto TESTING_BACKOFF = 10s;
// The upper bound for the re-test interval.
inline static constexpr auto TESTING_BACKOFF_MAX = 2min;
// The maximum number of nodes that we will re-test at once (i.e. per TESTING_TIMING_INTERVAL);
// mainly intended to throttle ourselves if, for instance, our own connectivity loss makes us
// accumulate tons of nodes to test all at once. (Despite the random intervals, this can happen
// if we also get decommissioned during which we can't test at all but still have lots of
// failing nodes we want to test right away when we get recommissioned).
inline static constexpr int MAX_RETESTS_PER_TICK = 4;
// Maximum time without a ping before we start whining about it.
//
// We have a probability of about 0.368* of *not* getting pinged within a ping interval (10s),
// and so the probability of not getting a ping for 2 minutes (i.e. 12 test spans) just because
// we haven't been selected is extremely small (0.0000061). It also coincides nicely with
// blockchain time (i.e. two minutes) and our max testing backoff.
//
// * = approx value of ((n-1)/n)^n for non-tiny values of n
inline static constexpr auto MAX_TIME_WITHOUT_PING = 2min;
// How often we whine in the logs about being unreachable
inline static constexpr auto WHINING_INTERVAL = 2min;
private:
// Queue of pubkeys of service nodes to test; we pop off the back of this until the queue
// empties then we refill it with a shuffled list of all pubkeys then pull off of it until it is
// empty again, etc.
std::vector<RouterID> testing_queue;
// The next time for a general test
time_point_t next_general_test = time_point_t::min();
// When we started, so that we know not to hold off on whining about no pings for a while.
const time_point_t startup = clock_t::now();
// Pubkeys, next test times, and sequential failure counts of service nodes that are currently
// in "failed" status along with the last time they failed; we retest them first after 10s then
// back off linearly by an additional 10s up to a max testing interval of 2m30s, until we get a
// successful response.
using FailingPK = std::tuple<RouterID, time_point_t, int>;
std::priority_queue<FailingPK, std::vector<FailingPK>, detail::nth_greater<FailingPK, 1>>
failing_queue;
std::unordered_set<RouterID> failing;
// Track the last time *this node* was tested by other network nodes; used to detect and warn
// about possible network issues.
detail::incoming_test_state last;
public:
// If it is time to perform another random test, this returns the next node to test from the
// testing queue and returns it, also updating the timer for the next test. If it is not yet
// time, or if the queue is empty and cannot current be replenished, returns std::nullopt. If
// the queue empties then this builds a new one by shuffling current public keys in the swarm's
// "all nodes" then starts using the new queue for this an subsequent calls.
//
// `requeue` is mainly for internal use: if false it avoids rebuilding the queue if we run
// out (and instead just return nullopt).
std::optional<RouterID>
next_random(
AbstractRouter* router, const time_point_t& now = clock_t::now(), bool requeue = true);
// Removes and returns up to MAX_RETESTS_PER_TICK nodes that are due to be tested (i.e.
// next-testing-time <= now). Returns [snrecord, #previous-failures] for each.
std::vector<std::pair<RouterID, int>>
get_failing(AbstractRouter* router, const time_point_t& now = clock_t::now());
// Adds a bad node pubkey to the failing list, to be re-tested soon (with a backoff depending on
// `failures`; see TESTING_BACKOFF). `previous_failures` should be the number of previous
// failures *before* this one, i.e. 0 for a random general test; or the failure count returned
// by `get_failing` for repeated failures.
void
add_failing_node(const RouterID& pk, int previous_failures = 0);
// Called when this router receives an incomming session
void
incoming_ping(const time_point_t& now = clock_t::now());
// Check whether we received incoming pings recently
void
check_incoming_tests(const time_point_t& now = clock_t::now());
};
} // namespace llarp::consensus

@ -12,6 +12,7 @@
#include <llarp/router_contact.hpp>
#include <llarp/tooling/router_event.hpp>
#include <llarp/peerstats/peer_db.hpp>
#include <llarp/consensus/reachability_testing.hpp>
#include <optional>
@ -285,6 +286,9 @@ namespace llarp
virtual void
SetRouterWhitelist(const std::vector<RouterID> routers) = 0;
virtual std::unordered_set<RouterID>
GetRouterWhitelist() const = 0;
/// visit each connected link session
virtual void
ForEachPeer(std::function<void(const ILinkSession*, bool)> visit, bool randomize) const = 0;

@ -39,24 +39,15 @@ namespace llarp
const auto router = RouterID(session->GetPubKey());
const bool isOutbound = not session->IsInbound();
const std::string remoteType = session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
LogInfo("session with ", remoteType, " [", router, "] established");
LogInfo(
"session with ", remoteType, " [", router, "] ", isOutbound ? "established" : "received");
if (not _rcLookup->RemoteIsAllowed(router))
{
FinalizeRequest(router, SessionResult::InvalidRouter);
return false;
}
if (isOutbound)
{
// add a callback for this router if it's outbound to inform core if applicable
if (auto rpc = _router->RpcClient())
{
util::Lock l{_mutex};
pendingCallbacks[router].emplace_back([rpc](const auto& router, const auto result) {
rpc->RecordConnection(router, result == SessionResult::Establish);
});
}
}
work([this, rc = session->GetRemoteRC()] { VerifyRC(rc); });
return true;
@ -67,13 +58,6 @@ namespace llarp
{
const auto router = RouterID(session->GetPubKey());
LogWarn("Session establish attempt to ", router, " timed out.", session->GetRemoteEndpoint());
// inform core if needed
if (auto rpc = _router->RpcClient())
{
rpc->RecordConnection(router, false);
}
FinalizeRequest(router, SessionResult::Timeout);
}

@ -84,6 +84,13 @@ namespace llarp
bool useWhitelist_arg,
bool isServiceNode_arg);
std::unordered_set<RouterID>
Whitelist() const
{
util::Lock lock{_mutex};
return whitelistRouters;
}
private:
void
HandleDHTLookupResult(RouterID remote, const std::vector<RouterContact>& results);

@ -63,7 +63,6 @@ namespace llarp
#else
, _randomStartDelay(std::chrono::seconds((llarp::randint() % 30) + 10))
#endif
, m_lokidRpcClient(std::make_shared<rpc::LokidRpcClient>(m_lmq, this))
{
m_keyManager = std::make_shared<KeyManager>();
// for lokid, so we don't close the connection when syncing the whitelist
@ -290,7 +289,10 @@ namespace llarp
auto& conf = *m_Config;
whitelistRouters = conf.lokid.whitelistRouters;
if (whitelistRouters)
{
lokidRPCAddr = oxenmq::address(conf.lokid.lokidRPCAddr);
m_lokidRpcClient = std::make_shared<rpc::LokidRpcClient>(m_lmq, weak_from_this());
}
enableRPCServer = conf.api.m_enableRPCServer;
if (enableRPCServer)
@ -748,7 +750,7 @@ namespace llarp
ss << " snode | known/svc/clients: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | "
<< pathContext().CurrentTransitPaths() << " active paths | "
<< "block " << m_lokidRpcClient->BlockHeight();
<< "block " << (m_lokidRpcClient ? m_lokidRpcClient->BlockHeight() : 0);
}
else
{
@ -1184,21 +1186,37 @@ namespace llarp
if (whitelistRouters)
{
// do service node testing if we are in service node whitelist mode
_loop->call_every(5s, weak_from_this(), [this] {
// dont run tests if we are not running or we are stopping
if (not _running)
return;
// get a random router to test by pubkey
RouterID router{};
if (not _rcLookupHandler.GetRandomWhitelistRouter(router))
{
LogError("could not get random whitelisted router for testing");
return;
}
// try to make a session to this random router
// this will do a dht lookup if needed
_outboundSessionMaker.CreateSessionTo(router, nullptr);
});
_loop->call_every(
consensus::reachability_testing::TESTING_TIMER_INTERVAL, weak_from_this(), [this] {
// dont run tests if we are not running or we are stopping
if (not _running)
return;
auto tests = m_routerTesting.get_failing(this);
if (auto maybe = m_routerTesting.next_random(this))
{
tests.emplace_back(*maybe, 0);
}
for (const auto& [router, fails] : tests)
{
// try to make a session to this random router
// this will do a dht lookup if needed
_outboundSessionMaker.CreateSessionTo(
router, [fails, this](const auto& router, const auto result) {
auto rpc = RpcClient();
if (result != SessionResult::Establish)
{
// failed connection mark it as so
m_routerTesting.add_failing_node(router, fails);
}
if (rpc)
{
// inform as needed
rpc->InformConnection(router, result == SessionResult::Establish);
}
});
}
});
}
LogContext::Instance().DropToRuntimeLevel();
return _running;

@ -133,6 +133,12 @@ namespace llarp
void
SetRouterWhitelist(const std::vector<RouterID> routers) override;
std::unordered_set<RouterID>
GetRouterWhitelist() const override
{
return _rcLookupHandler.Whitelist();
}
exit::Context&
exitContext() override
{
@ -536,6 +542,8 @@ namespace llarp
uint32_t path_build_count = 0;
consensus::reachability_testing m_routerTesting;
bool
ShouldReportStats(llarp_time_t now) const;

@ -32,8 +32,8 @@ namespace llarp
}
}
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r)
: m_lokiMQ(std::move(lmq)), m_Router(r)
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, std::weak_ptr<AbstractRouter> r)
: m_lokiMQ{std::move(lmq)}, m_Router{std::move(r)}
{
// m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
@ -51,18 +51,24 @@ namespace llarp
void
LokidRpcClient::ConnectAsync(oxenmq::address url)
{
if (not m_Router->IsServiceNode())
if (auto router = m_Router.lock())
{
throw std::runtime_error("we cannot talk to lokid while not a service node");
if (not router->IsServiceNode())
{
throw std::runtime_error("we cannot talk to lokid while not a service node");
}
LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
if (auto router = self->m_Router.lock())
{
router->loop()->call([self, url]() { self->ConnectAsync(url); });
}
});
}
LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
self->m_Router->loop()->call([self, url]() { self->ConnectAsync(url); });
});
}
void
@ -211,34 +217,41 @@ namespace llarp
return;
}
// inform router about the new list
m_Router->loop()->call(
[this, nodeList = std::move(nodeList), keymap = std::move(keymap)]() mutable {
m_KeyMap = std::move(keymap);
m_Router->SetRouterWhitelist(std::move(nodeList));
});
if (auto router = m_Router.lock())
{
router->loop()->call(
[this, nodeList = std::move(nodeList), keymap = std::move(keymap)]() mutable {
m_KeyMap = std::move(keymap);
if (auto router = m_Router.lock())
router->SetRouterWhitelist(std::move(nodeList));
});
}
}
void
LokidRpcClient::RecordConnection(RouterID router, bool success)
LokidRpcClient::InformConnection(RouterID router, bool success)
{
m_Router->loop()->call([router, success, this]() {
if (auto itr = m_KeyMap.find(router); itr != m_KeyMap.end())
{
const nlohmann::json request = {
{"passed", success}, {"pubkey", itr->second.ToHex()}, {"type", "lokinet"}};
Request(
"admin.report_peer_status",
[self = shared_from_this()](bool success, std::vector<std::string>) {
if (not success)
{
LogError("Failed to report connection status to oxend");
return;
}
LogDebug("reported connection status to core");
},
request.dump());
}
});
if (auto r = m_Router.lock())
{
r->loop()->call([router, success, this]() {
if (auto itr = m_KeyMap.find(router); itr != m_KeyMap.end())
{
const nlohmann::json request = {
{"passed", success}, {"pubkey", itr->second.ToHex()}, {"type", "lokinet"}};
Request(
"admin.report_peer_status",
[self = shared_from_this()](bool success, std::vector<std::string>) {
if (not success)
{
LogError("Failed to report connection status to oxend");
return;
}
LogDebug("reported connection status to core");
},
request.dump());
}
});
}
}
SecretKey
@ -294,7 +307,7 @@ namespace llarp
const nlohmann::json req{{"type", 2}, {"name_hash", namehash.ToHex()}};
Request(
"rpc.lns_resolve",
[r = m_Router, resultHandler](bool success, std::vector<std::string> data) {
[this, resultHandler](bool success, std::vector<std::string> data) {
std::optional<service::EncryptedName> maybe = std::nullopt;
if (success)
{
@ -318,8 +331,11 @@ namespace llarp
LogError("failed to parse response from lns lookup: ", ex.what());
}
}
r->loop()->call(
[resultHandler, maybe = std::move(maybe)]() { resultHandler(std::move(maybe)); });
if (auto r = m_Router.lock())
{
r->loop()->call(
[resultHandler, maybe = std::move(maybe)]() { resultHandler(std::move(maybe)); });
}
},
req.dump());
}
@ -333,65 +349,66 @@ namespace llarp
LogInfo(" :", str);
}
assert(m_Router != nullptr);
if (not m_Router->peerDb())
{
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
try
if (auto router = m_Router.lock())
{
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
if (not router->peerDb())
{
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
std::vector<std::string> routerIdStrings;
oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
for (const auto& routerIdString : routerIdStrings)
try
{
RouterID id;
if (not id.FromString(routerIdString))
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
return;
}
routerIds.push_back(std::move(id));
}
std::vector<std::string> routerIdStrings;
oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
for (const auto& routerIdString : routerIdStrings)
{
RouterID id;
if (not id.FromString(routerIdString))
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
return;
}
auto statsList = m_Router->peerDb()->listPeerStats(routerIds);
routerIds.push_back(std::move(id));
}
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
auto statsList = router->peerDb()->listPeerStats(routerIds);
PeerStats::BEncodeList(statsList, &llarpBuf);
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
msg.send_reply(std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
PeerStats::BEncodeList(statsList, &llarpBuf);
msg.send_reply(
std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
}
}
}
} // namespace rpc
} // namespace llarp

@ -19,7 +19,7 @@ namespace llarp
/// The LokidRpcClient uses loki-mq to talk to make API requests to lokid.
struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient>
{
explicit LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r);
explicit LokidRpcClient(LMQ_ptr lmq, std::weak_ptr<AbstractRouter> r);
/// Connect to lokid async
void
@ -42,9 +42,9 @@ namespace llarp
dht::Key_t namehash,
std::function<void(std::optional<service::EncryptedName>)> resultHandler);
/// record that if connected to a router successfully
/// inform that if connected to a router successfully
void
RecordConnection(RouterID router, bool success);
InformConnection(RouterID router, bool success);
private:
/// called when we have connected to lokid via lokimq
@ -86,7 +86,7 @@ namespace llarp
std::optional<oxenmq::ConnectionID> m_Connection;
LMQ_ptr m_lokiMQ;
AbstractRouter* const m_Router;
std::weak_ptr<AbstractRouter> m_Router;
std::atomic<bool> m_UpdatingList;
std::unordered_map<RouterID, PubKey> m_KeyMap;

Loading…
Cancel
Save