Merge pull request #1318 from notlesh/peer-stats-follow-up-2020-07-09

Peer stats follow up 2020 07 09
pull/1321/head
Jeff 4 years ago committed by GitHub
commit 998d4c4ec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,4 @@
local default_deps_base='libsystemd-dev python3-dev libcurl4-openssl-dev libuv1-dev libunbound-dev nettle-dev libssl-dev libevent-dev';
local default_deps_base='libsystemd-dev python3-dev libcurl4-openssl-dev libuv1-dev libunbound-dev nettle-dev libssl-dev libevent-dev libsqlite3-dev';
local default_deps_nocxx='libsodium-dev ' + default_deps_base; // libsodium-dev needs to be >= 1.0.18
local default_deps='g++ ' + default_deps_nocxx; // g++ sometimes needs replacement
local default_windows_deps='mingw-w64-binutils mingw-w64-gcc mingw-w64-crt mingw-w64-headers mingw-w64-winpthreads perl openssh zip bash'; // deps for windows cross compile

3
.gitmodules vendored

@ -31,3 +31,6 @@
path = external/loki-mq
url = https://github.com/loki-project/loki-mq
branch = dev
[submodule "external/sqlite_orm"]
path = external/sqlite_orm
url = https://github.com/fnc12/sqlite_orm

@ -305,6 +305,7 @@ if(SUBMODULE_CHECK)
check_submodule(external/ghc-filesystem)
check_submodule(external/date)
check_submodule(external/pybind11)
check_submodule(external/sqlite_orm)
if (NOT WIN32) # we grab libuv for windows separately in win32-setup/libuv. see note in cmake/win32.cmake.
check_submodule(external/libuv)
endif()
@ -324,6 +325,8 @@ add_subdirectory(external/nlohmann EXCLUDE_FROM_ALL)
add_subdirectory(external/cxxopts EXCLUDE_FROM_ALL)
add_subdirectory(external/date EXCLUDE_FROM_ALL)
include_directories(SYSTEM external/sqlite_orm/include)
add_subdirectory(vendor)
if(ANDROID)

@ -25,6 +25,13 @@ set(UNBOUND_SOURCE unbound-${UNBOUND_VERSION}.tar.gz)
set(UNBOUND_HASH SHA256=b73677c21a71cf92f15cc8cfe76a3d875e40f65b6150081c39620b286582d536
CACHE STRING "unbound source hash")
set(SQLITE3_VERSION 3320200 CACHE STRING "sqlite3 version")
set(SQLITE3_MIRROR ${LOCAL_MIRROR} https://www.sqlite.org/2020
CACHE STRING "sqlite3 download mirror(s)")
set(SQLITE3_SOURCE sqlite-autoconf-${SQLITE3_VERSION}.tar.gz)
set(SQLITE3_HASH SHA512=5b551a1366ce4fd5dfaa687e5021194d34315935b26dd7d71f8abc9935d03c3caea323263a8330fb42038c487cd399e95de68e451cc26d573f852f219c00a02f
CACHE STRING "sqlite3 source hash")
set(SODIUM_VERSION 1.0.18 CACHE STRING "libsodium version")
set(SODIUM_MIRROR ${LOCAL_MIRROR}
https://download.libsodium.org/libsodium/releases
@ -193,6 +200,9 @@ endif()
build_external(sodium)
add_static_target(sodium sodium_external libsodium.a)
build_external(sqlite3)
add_static_target(sqlite3 sqlite3_external libsqlite3.a)
if(ZMQ_VERSION VERSION_LESS 4.3.3 AND CMAKE_CROSSCOMPILING AND ARCH_TRIPLET MATCHES mingw)
set(zmq_patch PATCH_COMMAND patch -p1 -i ${PROJECT_SOURCE_DIR}/contrib/cross/patches/libzmq-pr3601-mingw-build-fix.patch

@ -98,8 +98,8 @@ run_main_context(const fs::path confFile, const llarp::RuntimeOptions opts)
llarp::Config conf;
conf.Load(confFile, opts.isRouter, confFile.parent_path());
ctx = std::shared_ptr<llarp::Context>();
ctx->Configure(opts, {});
ctx = std::make_shared<llarp::Context>();
ctx->Configure(conf);
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
@ -297,7 +297,23 @@ main(int argc, char* argv[])
} while (ftr.wait_for(std::chrono::seconds(1)) != std::future_status::ready);
main_thread.join();
const auto code = ftr.get();
int code = 0;
try
{
code = ftr.get();
}
catch (const std::exception& e)
{
std::cerr << "main thread threw exception: " << e.what() << std::endl;
code = 1;
}
catch (...)
{
std::cerr << "main thread threw non-standard exception" << std::endl;
code = 2;
}
llarp::LogContext::Instance().ImmediateFlush();
#ifdef _WIN32

@ -0,0 +1 @@
Subproject commit f7ef17a6bde6162e8b487deb36519bace412920a

@ -5,6 +5,8 @@
#include <util/types.hpp>
#include <ev/ev.hpp>
#include <nodedb.hpp>
#include <crypto/crypto.hpp>
#include <router/abstractrouter.hpp>
#include <iostream>
#include <map>
@ -14,20 +16,10 @@
struct llarp_ev_loop;
#ifdef LOKINET_HIVE
namespace tooling
{
struct RouterHive;
} // namespace tooling
#endif
namespace llarp
{
class Logic;
struct AbstractRouter;
struct Config;
struct Crypto;
struct CryptoManager;
struct RouterContact;
namespace thread
{
@ -43,12 +35,11 @@ namespace llarp
struct Context
{
std::unique_ptr<Crypto> crypto;
std::unique_ptr<CryptoManager> cryptoManager;
std::unique_ptr<AbstractRouter> router;
std::shared_ptr<Logic> logic;
std::unique_ptr<Config> config;
std::unique_ptr<llarp_nodedb> nodedb;
std::unique_ptr<Crypto> crypto = nullptr;
std::unique_ptr<CryptoManager> cryptoManager = nullptr;
std::unique_ptr<AbstractRouter> router = nullptr;
std::shared_ptr<Logic> logic = nullptr;
std::unique_ptr<llarp_nodedb> nodedb = nullptr;
llarp_ev_loop_ptr mainloop;
std::string nodedb_dir;
@ -67,8 +58,11 @@ namespace llarp
void
HandleSignal(int sig);
bool
Configure(const RuntimeOptions& opts, std::optional<fs::path> dataDir);
/// Configure given the specified config.
///
/// note: consider using std::move() when passing conf in.
void
Configure(Config conf);
bool
IsUp() const;
@ -90,16 +84,20 @@ namespace llarp
bool
CallSafe(std::function<void(void)> f);
#ifdef LOKINET_HIVE
void
InjectHive(tooling::RouterHive* hive);
#endif
/// Creates a router. Can be overridden to allow a different class of router
/// to be created instead. Defaults to llarp::Router.
virtual std::unique_ptr<AbstractRouter>
makeRouter(
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> logic);
protected:
std::unique_ptr<Config> config = nullptr;
private:
void
SigINT();
std::string configfile;
std::unique_ptr<std::promise<void>> closeWaiter;
};

@ -28,19 +28,25 @@ add_dependencies(lokinet-util genversion)
target_include_directories(lokinet-util PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/include)
if(NOT TARGET sqlite3)
add_library(sqlite3 INTERFACE)
pkg_check_modules(SQLITE3 REQUIRED IMPORTED_TARGET sqlite3)
target_link_libraries(sqlite3 INTERFACE PkgConfig::SQLITE3)
endif()
target_link_libraries(lokinet-util PUBLIC
lokinet-cryptography
nlohmann_json::nlohmann_json
filesystem
date::date
lokimq
sqlite3
)
if(ANDROID)
target_link_libraries(lokinet-util PUBLIC log)
endif()
add_library(lokinet-platform
# for networking
ev/ev.cpp
@ -156,6 +162,8 @@ add_library(liblokinet
path/pathbuilder.cpp
path/pathset.cpp
path/transit_hop.cpp
peerstats/peer_db.cpp
peerstats/types.cpp
pow.cpp
profiling.cpp
router/outbound_message_handler.cpp
@ -219,7 +227,11 @@ if(TESTNET)
endif()
if(WITH_HIVE)
target_sources(liblokinet PRIVATE tooling/router_hive.cpp)
target_sources(liblokinet PRIVATE
tooling/router_hive.cpp
tooling/hive_router.cpp
tooling/hive_context.cpp
)
endif()
target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography)

@ -36,6 +36,7 @@ namespace llarp
constexpr int DefaultWorkerThreads = 1;
constexpr int DefaultNetThreads = 1;
constexpr bool DefaultBlockBogons = true;
constexpr bool DefaultEnablePeerStats = false;
conf.defineOption<int>("router", "job-queue-size", false, DefaultJobQueueSize, [this](int arg) {
if (arg < 1024)
@ -128,6 +129,21 @@ namespace llarp
conf.defineOption<std::string>(
"router", "transport-privkey", false, "", AssignmentAcceptor(m_transportKeyFile));
if (not params.isRelay)
{
// TODO: remove this -- all service nodes should run peer db
conf.defineOption<bool>(
"router",
"enable-peer-stats",
false,
DefaultEnablePeerStats,
AssignmentAcceptor(m_enablePeerStats));
}
else
{
m_enablePeerStats = true;
}
}
void
@ -987,6 +1003,40 @@ namespace llarp
"File containing service node's seed.",
});
// extra [network] options
// TODO: probably better to create an [exit] section and only allow it for routers
def.addOptionComments(
"network",
"exit",
{
"Whether or not we should act as an exit node. Beware that this increases demand",
"on the server and may pose liability concerns. Enable at your own risk.",
});
// TODO: define the order of precedence (e.g. is whitelist applied before blacklist?)
// additionally, what's default? What if I don't whitelist anything?
def.addOptionComments(
"network",
"exit-whitelist",
{
"List of destination protocol:port pairs to whitelist, example: udp:*",
"or tcp:80. Multiple values supported.",
});
def.addOptionComments(
"network",
"exit-blacklist",
{
"Blacklist of destinations (same format as whitelist).",
});
def.addOptionComments(
"router",
"enable-peer-stats",
{
"Enable collection of SNode peer stats",
});
return def.generateINIConfig(true);
}

@ -24,8 +24,6 @@
#include <lokimq/address.h>
struct llarp_config;
namespace llarp
{
using SectionValues_t = llarp::ConfigParser::SectionValues_t;
@ -64,6 +62,8 @@ namespace llarp
std::string m_identityKeyFile;
std::string m_transportKeyFile;
bool m_enablePeerStats = false;
void
defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params);
};
@ -222,9 +222,6 @@ namespace llarp
std::string
generateBaseRouterConfig(fs::path defaultDataDir);
llarp_config*
Copy() const;
};
void

@ -27,31 +27,17 @@ namespace llarp
return logic && LogicCall(logic, f);
}
bool
Context::Configure(const RuntimeOptions& opts, std::optional<fs::path> dataDir)
void
Context::Configure(Config conf)
{
if (config)
throw std::runtime_error("Re-configure not supported");
config = std::make_unique<Config>();
if (nullptr != config.get())
throw std::runtime_error("Config already exists");
fs::path defaultDataDir = dataDir ? *dataDir : GetDefaultDataDir();
if (configfile.size())
{
if (!config->Load(configfile.c_str(), opts.isRouter, defaultDataDir))
{
config.release();
llarp::LogError("failed to load config file ", configfile);
return false;
}
}
config = std::make_unique<Config>(std::move(conf));
logic = std::make_shared<Logic>();
nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string();
return true;
}
bool
@ -76,6 +62,10 @@ namespace llarp
void
Context::Setup(const RuntimeOptions& opts)
{
/// Call one of the Configure() methods before calling Setup()
if (not config)
throw std::runtime_error("Cannot call Setup() on context without a Config");
llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO);
llarp::LogInfo("starting up");
if (mainloop == nullptr)
@ -90,12 +80,12 @@ namespace llarp
crypto = std::make_unique<sodium::CryptoLibSodium>();
cryptoManager = std::make_unique<CryptoManager>(crypto.get());
router = std::make_unique<Router>(mainloop, logic);
router = makeRouter(mainloop, logic);
nodedb = std::make_unique<llarp_nodedb>(
nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); });
if (!router->Configure(config.get(), opts.isRouter, nodedb.get()))
if (!router->Configure(*config.get(), opts.isRouter, nodedb.get()))
throw std::runtime_error("Failed to configure router");
// must be done after router is made so we can use its disk io worker
@ -105,6 +95,14 @@ namespace llarp
throw std::runtime_error("Config::Setup() failed to load database");
}
std::unique_ptr<AbstractRouter>
Context::makeRouter(
llarp_ev_loop_ptr netloop,
std::shared_ptr<Logic> logic)
{
return std::make_unique<Router>(netloop, logic);
}
int
Context::Run(const RuntimeOptions& opts)
{
@ -197,14 +195,6 @@ namespace llarp
llarp::LogDebug("free logic");
logic.reset();
}
#ifdef LOKINET_HIVE
void
Context::InjectHive(tooling::RouterHive* hive)
{
router->hive = hive;
}
#endif
} // namespace llarp
extern "C"

@ -127,6 +127,10 @@ namespace llarp
auto* router = dht.GetRouter();
router->NotifyRouterEvent<tooling::RCGossipReceivedEvent>(router->pubkey(), rc);
router->GossipRCIfNeeded(rc);
auto peerDb = router->peerDb();
if (peerDb)
peerDb->handleGossipedRC(rc);
}
}
return true;

@ -76,7 +76,7 @@ namespace llarp
GotLIM = util::memFn(&Session::GotRenegLIM, this);
m_RemoteRC = msg->rc;
m_Parent->MapAddr(m_RemoteRC.pubkey, this);
return m_Parent->SessionEstablished(this);
return m_Parent->SessionEstablished(this, true);
}
bool
@ -96,7 +96,7 @@ namespace llarp
{
self->m_State = State::Ready;
self->m_Parent->MapAddr(self->m_RemoteRC.pubkey, self.get());
self->m_Parent->SessionEstablished(self.get());
self->m_Parent->SessionEstablished(self.get(), false);
}
});
return true;
@ -281,6 +281,13 @@ namespace llarp
return false;
}
SessionStats
Session::GetSessionStats() const
{
// TODO: thread safety
return m_Stats;
}
util::StatusObject
Session::ExtractStatus() const
{

@ -115,6 +115,9 @@ namespace llarp
bool
ShouldPing() const override;
SessionStats
GetSessionStats() const override;
util::StatusObject
ExtractStatus() const override;
@ -141,20 +144,7 @@ namespace llarp
static std::string
StateToString(State state);
State m_State;
struct Stats
{
// rate
uint64_t currentRateRX = 0;
uint64_t currentRateTX = 0;
uint64_t totalPacketsRX = 0;
uint64_t totalAckedTX = 0;
uint64_t totalDroppedTX = 0;
uint64_t totalInFlightTX = 0;
};
Stats m_Stats;
SessionStats m_Stats;
/// are we inbound session ?
const bool m_Inbound;

@ -4,6 +4,7 @@
#include <link/server.hpp>
#include <util/thread/logic.hpp>
#include <util/types.hpp>
#include <peerstats/peer_db.hpp>
#include <functional>
@ -77,6 +78,9 @@ namespace llarp
virtual void
CheckPersistingSessions(llarp_time_t now) = 0;
virtual void
updatePeerDb(std::shared_ptr<PeerDb> peerDb) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};

@ -327,6 +327,55 @@ namespace llarp
}
}
void
LinkManager::updatePeerDb(std::shared_ptr<PeerDb> peerDb)
{
std::vector<std::pair<RouterID, SessionStats>> statsToUpdate;
int64_t diffTotalTX = 0;
ForEachPeer([&](ILinkSession* session) {
// derive RouterID
RouterID id = RouterID(session->GetRemoteRC().pubkey);
SessionStats sessionStats = session->GetSessionStats();
SessionStats diff;
SessionStats& lastStats = m_lastRouterStats[id];
// TODO: operator overloads / member func for diff
diff.currentRateRX = std::max(sessionStats.currentRateRX, lastStats.currentRateRX);
diff.currentRateTX = std::max(sessionStats.currentRateTX, lastStats.currentRateTX);
diff.totalPacketsRX = sessionStats.totalPacketsRX - lastStats.totalPacketsRX;
diff.totalAckedTX = sessionStats.totalAckedTX - lastStats.totalAckedTX;
diff.totalDroppedTX = sessionStats.totalDroppedTX - lastStats.totalDroppedTX;
diffTotalTX = diff.totalAckedTX + diff.totalDroppedTX + diff.totalInFlightTX;
lastStats = sessionStats;
// TODO: if we have both inbound and outbound session, this will overwrite
statsToUpdate.push_back({id, diff});
});
for (auto& routerStats : statsToUpdate)
{
peerDb->modifyPeerStats(routerStats.first, [&](PeerStats& stats) {
// TODO: store separate stats for up vs down
const auto& diff = routerStats.second;
// note that 'currentRateRX' and 'currentRateTX' are per-second
stats.peakBandwidthBytesPerSec = std::max(
stats.peakBandwidthBytesPerSec,
(double)std::max(diff.currentRateRX, diff.currentRateTX));
stats.numPacketsDropped += diff.totalDroppedTX;
stats.numPacketsSent = diff.totalAckedTX;
stats.numPacketsAttempted = diffTotalTX;
// TODO: others -- we have slight mismatch on what we store
});
}
}
util::StatusObject
LinkManager::ExtractStatus() const
{

@ -74,6 +74,9 @@ namespace llarp
void
CheckPersistingSessions(llarp_time_t now) override;
void
updatePeerDb(std::shared_ptr<PeerDb> peerDb) override;
util::StatusObject
ExtractStatus() const override;
@ -96,6 +99,8 @@ namespace llarp
std::unordered_map<RouterID, llarp_time_t, RouterID::Hash> m_PersistingSessions
GUARDED_BY(_mutex);
std::unordered_map<RouterID, SessionStats, RouterID::Hash> m_lastRouterStats;
IOutboundSessionMaker* _sessionMaker;
};

@ -17,34 +17,53 @@
namespace llarp
{
/// handle a link layer message
/// handle a link layer message. this allows for the message to be handled by "upper layers"
///
/// currently called from iwp::Session when messages are sent or received.
using LinkMessageHandler = std::function<bool(ILinkSession*, const llarp_buffer_t&)>;
/// sign a buffer with identity key
/// sign a buffer with identity key. this function should take the given `llarp_buffer_t` and
/// sign it, prividing the signature in the out variable `Signature&`.
///
/// currently called from iwp::Session for signing LIMs (link introduction messages)
using SignBufferFunc = std::function<bool(Signature&, const llarp_buffer_t&)>;
/// handle connection timeout
///
/// currently called from ILinkLayer::Pump() when an unestablished session times out
using TimeoutHandler = std::function<void(ILinkSession*)>;
/// get our RC
///
/// currently called by iwp::Session to include as part of a LIM (link introduction message)
using GetRCFunc = std::function<const llarp::RouterContact&(void)>;
/// handler of session established
/// return false to reject
/// return true to accept
using SessionEstablishedHandler = std::function<bool(ILinkSession*)>;
///
/// currently called in iwp::Session when a valid LIM is received.
using SessionEstablishedHandler = std::function<bool(ILinkSession*, bool)>;
/// f(new, old)
/// handler of session renegotiation
/// returns true if the new rc is valid
/// returns false otherwise and the session is terminated
///
/// currently called from iwp::Session when we receive a renegotiation LIM
using SessionRenegotiateHandler = std::function<bool(llarp::RouterContact, llarp::RouterContact)>;
/// handles close of all sessions with pubkey
///
/// Note that this handler is called while m_AuthedLinksMutex is held
///
/// currently called from iwp::ILinkSession when a previously established session times out
using SessionClosedHandler = std::function<void(llarp::RouterID)>;
/// notifies router that a link session has ended its pump and we should flush
/// messages to upper layers
///
/// currently called at the end of every iwp::Session::Pump() call
using PumpDoneHandler = std::function<void(void)>;
using Work_t = std::function<void(void)>;

@ -15,6 +15,19 @@ namespace llarp
struct ILinkMessage;
struct ILinkLayer;
struct SessionStats
{
// rate
uint64_t currentRateRX = 0;
uint64_t currentRateTX = 0;
uint64_t totalPacketsRX = 0;
uint64_t totalAckedTX = 0;
uint64_t totalDroppedTX = 0;
uint64_t totalInFlightTX = 0;
};
struct ILinkSession
{
virtual ~ILinkSession() = default;
@ -108,6 +121,10 @@ namespace llarp
virtual bool
ShouldPing() const = 0;
/// return the current stats for this session
virtual SessionStats
GetSessionStats() const = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};

@ -0,0 +1,137 @@
#pragma once
#include <sqlite_orm/sqlite_orm.h>
#include <peerstats/types.hpp>
/// Contains some code to help deal with sqlite_orm in hopes of keeping other headers clean
namespace llarp
{
inline auto
initStorage(const std::string& file)
{
using namespace sqlite_orm;
return make_storage(
file,
make_table(
"peerstats",
make_column("routerId", &PeerStats::routerId, primary_key(), unique()),
make_column("numConnectionAttempts", &PeerStats::numConnectionAttempts),
make_column("numConnectionSuccesses", &PeerStats::numConnectionSuccesses),
make_column("numConnectionRejections", &PeerStats::numConnectionRejections),
make_column("numConnectionTimeouts", &PeerStats::numConnectionTimeouts),
make_column("numPathBuilds", &PeerStats::numPathBuilds),
make_column("numPacketsAttempted", &PeerStats::numPacketsAttempted),
make_column("numPacketsSent", &PeerStats::numPacketsSent),
make_column("numPacketsDropped", &PeerStats::numPacketsDropped),
make_column("numPacketsResent", &PeerStats::numPacketsResent),
make_column("numDistinctRCsReceived", &PeerStats::numDistinctRCsReceived),
make_column("numLateRCs", &PeerStats::numLateRCs),
make_column("peakBandwidthBytesPerSec", &PeerStats::peakBandwidthBytesPerSec),
make_column("longestRCReceiveInterval", &PeerStats::longestRCReceiveInterval),
make_column("leastRCRemainingLifetime", &PeerStats::leastRCRemainingLifetime)));
}
using PeerDbStorage = decltype(initStorage(""));
} // namespace llarp
/// "custom" types for sqlite_orm
/// reference: https://github.com/fnc12/sqlite_orm/blob/master/examples/enum_binding.cpp
namespace sqlite_orm
{
/// llarp_time_t serialization
template <>
struct type_printer<llarp_time_t> : public integer_printer
{
};
template <>
struct statement_binder<llarp_time_t>
{
int
bind(sqlite3_stmt* stmt, int index, const llarp_time_t& value)
{
return statement_binder<int64_t>().bind(stmt, index, value.count());
}
};
template <>
struct field_printer<llarp_time_t>
{
std::string
operator()(const llarp_time_t& value) const
{
std::stringstream stream;
stream << value.count();
return stream.str();
}
};
template <>
struct row_extractor<llarp_time_t>
{
llarp_time_t
extract(const char* row_value)
{
int64_t raw = static_cast<int64_t>(atoi(row_value));
return llarp_time_t(raw);
}
llarp_time_t
extract(sqlite3_stmt* stmt, int columnIndex)
{
auto str = sqlite3_column_text(stmt, columnIndex);
return this->extract((const char*)str);
}
};
/// RouterID serialization
template <>
struct type_printer<llarp::RouterID> : public text_printer
{
};
template <>
struct statement_binder<llarp::RouterID>
{
int
bind(sqlite3_stmt* stmt, int index, const llarp::RouterID& value)
{
return statement_binder<std::string>().bind(stmt, index, value.ToString());
}
};
template <>
struct field_printer<llarp::RouterID>
{
std::string
operator()(const llarp::RouterID& value) const
{
return value.ToString();
}
};
template <>
struct row_extractor<llarp::RouterID>
{
llarp::RouterID
extract(const char* row_value)
{
llarp::RouterID id;
if (not id.FromString(row_value))
throw std::runtime_error("Invalid RouterID in sqlite3 database");
return id;
}
llarp::RouterID
extract(sqlite3_stmt* stmt, int columnIndex)
{
auto str = sqlite3_column_text(stmt, columnIndex);
return this->extract((const char*)str);
}
};
} // namespace sqlite_orm

@ -0,0 +1,303 @@
#include <peerstats/peer_db.hpp>
#include <util/logging/logger.hpp>
#include <util/status.hpp>
#include <util/str.hpp>
namespace llarp
{
PeerDb::PeerDb()
{
m_lastFlush.store({});
}
void
PeerDb::loadDatabase(std::optional<fs::path> file)
{
std::lock_guard guard(m_statsLock);
if (m_storage)
throw std::runtime_error("Reloading database not supported"); // TODO
m_peerStats.clear();
// sqlite_orm treats empty-string as an indicator to load a memory-backed database, which we'll
// use if file is an empty-optional
std::string fileString;
if (file.has_value())
{
fileString = file->string();
LogInfo("Loading PeerDb from file ", fileString);
}
else
{
LogInfo("Loading memory-backed PeerDb");
}
m_storage = std::make_unique<PeerDbStorage>(initStorage(fileString));
m_storage->sync_schema(true); // true for "preserve" as in "don't nuke" (how cute!)
auto allStats = m_storage->get_all<PeerStats>();
LogInfo("Loading ", allStats.size(), " PeerStats from table peerstats...");
for (PeerStats& stats : allStats)
{
// we cleared m_peerStats, and the database should enforce that routerId is unique...
assert(m_peerStats.find(stats.routerId) == m_peerStats.end());
stats.stale = false;
m_peerStats[stats.routerId] = stats;
}
}
void
PeerDb::flushDatabase()
{
LogDebug("flushing PeerDb...");
auto start = time_now_ms();
if (not shouldFlush(start))
{
LogWarn("Call to flushDatabase() while already in progress, ignoring");
return;
}
if (not m_storage)
throw std::runtime_error("Cannot flush database before it has been loaded");
std::vector<PeerStats> staleStats;
{
std::lock_guard guard(m_statsLock);
// copy all stale entries
for (auto& entry : m_peerStats)
{
if (entry.second.stale)
{
staleStats.push_back(entry.second);
entry.second.stale = false;
}
}
}
LogInfo("Updating ", staleStats.size(), " stats");
{
auto guard = m_storage->transaction_guard();
for (const auto& stats : staleStats)
{
m_storage->replace(stats);
}
guard.commit();
}
auto end = time_now_ms();
auto elapsed = end - start;
LogInfo("PeerDb flush took about ", elapsed, " seconds");
m_lastFlush.store(end);
}
void
PeerDb::accumulatePeerStats(const RouterID& routerId, const PeerStats& delta)
{
if (routerId != delta.routerId)
throw std::invalid_argument(
stringify("routerId ", routerId, " doesn't match ", delta.routerId));
std::lock_guard guard(m_statsLock);
auto itr = m_peerStats.find(routerId);
if (itr == m_peerStats.end())
itr = m_peerStats.insert({routerId, delta}).first;
else
itr->second += delta;
itr->second.stale = true;
}
void
PeerDb::modifyPeerStats(const RouterID& routerId, std::function<void(PeerStats&)> callback)
{
std::lock_guard guard(m_statsLock);
PeerStats& stats = m_peerStats[routerId];
stats.routerId = routerId;
stats.stale = true;
callback(stats);
}
std::optional<PeerStats>
PeerDb::getCurrentPeerStats(const RouterID& routerId) const
{
std::lock_guard guard(m_statsLock);
auto itr = m_peerStats.find(routerId);
if (itr == m_peerStats.end())
return std::nullopt;
else
return itr->second;
}
std::vector<PeerStats>
PeerDb::listAllPeerStats() const
{
std::lock_guard guard(m_statsLock);
std::vector<PeerStats> statsList;
statsList.reserve(m_peerStats.size());
for (const auto& [routerId, stats] : m_peerStats)
{
statsList.push_back(stats);
}
return statsList;
}
std::vector<PeerStats>
PeerDb::listPeerStats(const std::vector<RouterID>& ids) const
{
std::lock_guard guard(m_statsLock);
std::vector<PeerStats> statsList;
statsList.reserve(ids.size());
for (const auto& id : ids)
{
const auto itr = m_peerStats.find(id);
if (itr != m_peerStats.end())
statsList.push_back(itr->second);
}
return statsList;
}
/// Assume we receive an RC at some point `R` in time which was signed at some point `S` in time
/// and expires at some point `E` in time, as depicted below:
///
/// +-----------------------------+
/// | signed rc | <- useful lifetime of RC
/// +-----------------------------+
/// ^ [ . . . . . . . . ] <----------- window in which we receive this RC gossiped to us
/// | ^ ^
/// | | |
/// S R E
///
/// One useful metric from this is the difference between (E - R), the useful contact time of this
/// RC. As we track this metric over time, the high and low watermarks serve to tell us how
/// quickly we receive signed RCs from a given router and how close to expiration they are when
/// we receive them. The latter is particularly useful, and should always be a positive number for
/// a healthy router. A negative number indicates that we are receiving an expired RC.
///
/// TODO: we actually discard expired RCs, so we currently would not detect a negative value for
/// (E - R)
///
/// Another related metric is the distance between a newly received RC and the previous RC's
/// expiration, which represents how close we came to having no useful RC to work with. This
/// should be a high (positive) number for a healthy router, and if negative indicates that we
/// had no way to contact this router for a period of time.
///
/// E1 E2 E3
/// | | |
/// v | |
/// +-----------------------------+ | |
/// | signed rc 1 | | |
/// +-----------------------------+ | |
/// [ . . . . . ] v |
/// ^ +-----------------------------+ |
/// | | signed rc 2 | |
/// | +-----------------------------+ |
/// | [ . . . . . . . . . . ] v
/// | ^ +-----------------------------+
/// | | | signed rc 3 |
/// | | +-----------------------------+
/// | | [ . . ]
/// | | ^
/// | | |
/// R1 R2 R3
///
/// Example: the delta between (E1 - R2) is healthy, but the delta between (E2 - R3) is indicates
/// that we had a brief period of time where we had no valid (non-expired) RC for this router
/// (because it is negative).
void
PeerDb::handleGossipedRC(const RouterContact& rc, llarp_time_t now)
{
std::lock_guard guard(m_statsLock);
RouterID id(rc.pubkey);
auto& stats = m_peerStats[id];
stats.routerId = id;
const bool isNewRC = (stats.lastRCUpdated < rc.last_updated);
if (isNewRC)
{
stats.numDistinctRCsReceived++;
if (stats.numDistinctRCsReceived > 1)
{
auto prevRCExpiration = (stats.lastRCUpdated + RouterContact::Lifetime);
// we track max expiry as the delta between (last expiration time - time received),
// and this value will be negative for an unhealthy router
// TODO: handle case where new RC is also expired? just ignore?
auto expiry = prevRCExpiration - now;
if (stats.numDistinctRCsReceived == 2)
stats.leastRCRemainingLifetime = expiry;
else
stats.leastRCRemainingLifetime = std::min(stats.leastRCRemainingLifetime, expiry);
}
stats.lastRCUpdated = rc.last_updated;
stats.stale = true;
}
}
void
PeerDb::configure(const RouterConfig& routerConfig)
{
if (not routerConfig.m_enablePeerStats)
throw std::runtime_error("[router]:enable-peer-stats is not enabled");
fs::path dbPath = routerConfig.m_dataDir / "peerstats.sqlite";
loadDatabase(dbPath);
}
bool
PeerDb::shouldFlush(llarp_time_t now)
{
constexpr llarp_time_t TargetFlushInterval = 30s;
return (now - m_lastFlush.load() >= TargetFlushInterval);
}
util::StatusObject
PeerDb::ExtractStatus() const
{
std::lock_guard guard(m_statsLock);
bool loaded = (m_storage.get() != nullptr);
util::StatusObject dbFile = nullptr;
if (loaded)
dbFile = m_storage->filename();
std::vector<util::StatusObject> statsObjs;
statsObjs.reserve(m_peerStats.size());
for (const auto& pair : m_peerStats)
{
statsObjs.push_back(pair.second.toJson());
}
util::StatusObject obj{
{"dbLoaded", loaded},
{"dbFile", dbFile},
{"lastFlushMs", m_lastFlush.load().count()},
{"stats", statsObjs},
};
return obj;
}
}; // namespace llarp

@ -0,0 +1,138 @@
#pragma once
#include <filesystem>
#include <functional>
#include <unordered_map>
#include <sqlite_orm/sqlite_orm.h>
#include <util/fs.hpp>
#include <config/config.hpp>
#include <router_id.hpp>
#include <util/time.hpp>
#include <peerstats/types.hpp>
#include <peerstats/orm.hpp>
namespace llarp
{
/// Maintains a database of stats collected about the connections with our Service Node peers.
/// This uses a sqlite3 database behind the scenes as persistance, but this database is
/// periodically flushed to, meaning that it will become stale as PeerDb accumulates stats without
/// a flush.
struct PeerDb
{
/// Constructor
PeerDb();
/// Loads the database from disk using the provided filepath. If the file is equal to
/// `std::nullopt`, the database will be loaded into memory (useful for testing).
///
/// This must be called prior to calling flushDatabase(), and will truncate any existing data.
///
/// This is a blocking call, both in the sense that it blocks on disk/database I/O and that it
/// will sit on a mutex while the database is loaded.
///
/// @param file is an optional file which doesn't have to exist but must be writable, if a value
/// is provided. If no value is provided, the database will be memory-backed.
/// @throws if sqlite_orm/sqlite3 is unable to open or create a database at the given file
void
loadDatabase(std::optional<fs::path> file);
/// Flushes the database. Must be called after loadDatabase(). This call will block during I/O
/// and should be called in an appropriate threading context. However, it will make a temporary
/// copy of the peer stats so as to avoid sitting on a mutex lock during disk I/O.
///
/// @throws if the database could not be written to (esp. if loadDatabase() has not been called)
void
flushDatabase();
/// Add the given stats to the cummulative stats for the given peer. For cummulative stats, the
/// stats are added together; for watermark stats, the max is kept.
///
/// This is intended to be used in the following pattern:
///
/// 1) Initialize an empty PeerStats
/// 2) Collect relevant stats
/// 3) Call accumulatePeerStats() with the stats
/// 4) Reset the stats to 0
/// 5) <Repeat 2-4 periodically>
///
/// @param routerId is the id of the router whose stats should be modified.
/// @param delta is the stats to add to the existing stats
void
accumulatePeerStats(const RouterID& routerId, const PeerStats& delta);
/// Allows write-access to the stats for a given peer while appropriate mutex lock is held. This
/// is an alternative means of incrementing peer stats that is suitable for one-off
/// modifications.
///
/// Note that this holds m_statsLock during the callback invocation, so the callback should
/// return as quickly as possible.
///
/// @param routerId is the id of the router whose stats should be modified.
/// @param callback is a function which will be called immediately with mutex held
void
modifyPeerStats(const RouterID& routerId, std::function<void(PeerStats&)> callback);
/// Provides a snapshot of the most recent PeerStats we have for the given peer. If we don't
/// have any stats for the peer, std::nullopt
///
/// @param routerId is the RouterID of the requested peer
/// @return a copy of the most recent peer stats or an empty one if no such peer is known
std::optional<PeerStats>
getCurrentPeerStats(const RouterID& routerId) const;
/// Lists all peer stats. This essentially dumps the database into a list of PeerStats objects.
///
/// Note that this avoids disk I/O by copying from our cached map of peers.
///
/// @return a list of all PeerStats we have maintained
std::vector<PeerStats>
listAllPeerStats() const;
/// Lists specific peer stats.
///
/// @param peers is list of RouterIDs which are desired
/// @return a list of the requested peers. Peers not found will be omitted.
std::vector<PeerStats>
listPeerStats(const std::vector<RouterID>& ids) const;
/// Handles a new gossiped RC, updating stats as needed. The database tracks the last
/// advertised update time, so it knows whether this is a new RC or not.
///
/// The given RC is assumed to be valid.
///
/// @param rc is the RouterContact to handle
/// @param now is an optional time representing the current time
void
handleGossipedRC(const RouterContact& rc, llarp_time_t now = time_now_ms());
/// Configures the PeerDb based on RouterConfig
///
/// @param routerConfig
void
configure(const RouterConfig& routerConfig);
/// Returns whether or not we should flush, as determined by the last time we flushed and the
/// configured flush interval.
///
/// @param now is the current[-ish] time
bool
shouldFlush(llarp_time_t now);
/// Get JSON status for API
///
/// @return JSON object representing our current status
util::StatusObject
ExtractStatus() const;
private:
std::unordered_map<RouterID, PeerStats, RouterID::Hash> m_peerStats;
mutable std::mutex m_statsLock;
std::unique_ptr<PeerDbStorage> m_storage;
std::atomic<llarp_time_t> m_lastFlush;
};
} // namespace llarp

@ -0,0 +1,159 @@
#include <peerstats/types.hpp>
#include <util/str.hpp>
#include <stdexcept>
namespace llarp
{
constexpr auto RouterIdKey = "routerId";
constexpr auto NumConnectionAttemptsKey = "numConnectionAttempts";
constexpr auto NumConnectionSuccessesKey = "numConnectionSuccesses";
constexpr auto NumConnectionRejectionsKey = "numConnectionRejections";
constexpr auto NumConnectionTimeoutsKey = "numConnectionTimeouts";
constexpr auto NumPathBuildsKey = "numPathBuilds";
constexpr auto NumPacketsAttemptedKey = "numPacketsAttempted";
constexpr auto NumPacketsSentKey = "numPacketsSent";
constexpr auto NumPacketsDroppedKey = "numPacketsDropped";
constexpr auto NumPacketsResentKey = "numPacketsResent";
constexpr auto NumDistinctRCsReceivedKey = "numDistinctRCsReceived";
constexpr auto NumLateRCsKey = "numLateRCs";
constexpr auto PeakBandwidthBytesPerSecKey = "peakBandwidthBytesPerSec";
constexpr auto LongestRCReceiveIntervalKey = "longestRCReceiveInterval";
constexpr auto LeastRCRemainingLifetimeKey = "leastRCRemainingLifetime";
constexpr auto LastRCUpdatedKey = "lastRCUpdated";
PeerStats::PeerStats() = default;
PeerStats::PeerStats(const RouterID& routerId_) : routerId(routerId_)
{
}
PeerStats&
PeerStats::operator+=(const PeerStats& other)
{
numConnectionAttempts += other.numConnectionAttempts;
numConnectionSuccesses += other.numConnectionSuccesses;
numConnectionRejections += other.numConnectionRejections;
numConnectionTimeouts += other.numConnectionTimeouts;
numPathBuilds += other.numPathBuilds;
numPacketsAttempted += other.numPacketsAttempted;
numPacketsSent += other.numPacketsSent;
numPacketsDropped += other.numPacketsDropped;
numPacketsResent += other.numPacketsResent;
numDistinctRCsReceived += other.numDistinctRCsReceived;
numLateRCs += other.numLateRCs;
peakBandwidthBytesPerSec = std::max(peakBandwidthBytesPerSec, other.peakBandwidthBytesPerSec);
longestRCReceiveInterval = std::max(longestRCReceiveInterval, other.longestRCReceiveInterval);
leastRCRemainingLifetime = std::max(leastRCRemainingLifetime, other.leastRCRemainingLifetime);
lastRCUpdated = std::max(lastRCUpdated, other.lastRCUpdated);
return *this;
}
bool
PeerStats::operator==(const PeerStats& other) const
{
return routerId == other.routerId and numConnectionAttempts == other.numConnectionAttempts
and numConnectionSuccesses == other.numConnectionSuccesses
and numConnectionRejections == other.numConnectionRejections
and numConnectionTimeouts == other.numConnectionTimeouts
and numPathBuilds == other.numPathBuilds
and numPacketsAttempted == other.numPacketsAttempted
and numPacketsSent == other.numPacketsSent and numPacketsDropped == other.numPacketsDropped
and numPacketsResent == other.numPacketsResent
and numDistinctRCsReceived == other.numDistinctRCsReceived
and numLateRCs == other.numLateRCs
and peakBandwidthBytesPerSec == other.peakBandwidthBytesPerSec
and longestRCReceiveInterval == other.longestRCReceiveInterval
and leastRCRemainingLifetime == other.leastRCRemainingLifetime
and lastRCUpdated == other.lastRCUpdated;
}
util::StatusObject
PeerStats::toJson() const
{
return {
{RouterIdKey, routerId.ToString()},
{NumConnectionAttemptsKey, numConnectionAttempts},
{NumConnectionSuccessesKey, numConnectionSuccesses},
{NumConnectionRejectionsKey, numConnectionRejections},
{NumConnectionTimeoutsKey, numConnectionTimeouts},
{NumPathBuildsKey, numPathBuilds},
{NumPacketsAttemptedKey, numPacketsAttempted},
{NumPacketsSentKey, numPacketsSent},
{NumPacketsDroppedKey, numPacketsDropped},
{NumPacketsResentKey, numPacketsResent},
{NumDistinctRCsReceivedKey, numDistinctRCsReceived},
{NumLateRCsKey, numLateRCs},
{PeakBandwidthBytesPerSecKey, peakBandwidthBytesPerSec},
{LongestRCReceiveIntervalKey, longestRCReceiveInterval.count()},
{LeastRCRemainingLifetimeKey, leastRCRemainingLifetime.count()},
{LastRCUpdatedKey, lastRCUpdated.count()},
};
}
void
PeerStats::BEncode(llarp_buffer_t* buf) const
{
if (not buf)
throw std::runtime_error("PeerStats: Can't use null buf");
auto encodeUint64Entry = [&](std::string_view key, uint64_t value) {
if (not bencode_write_uint64_entry(buf, key.data(), key.size(), value))
throw std::runtime_error(stringify("PeerStats: Could not encode ", key));
};
if (not bencode_start_dict(buf))
throw std::runtime_error("PeerStats: Could not create bencode dict");
// TODO: we don't have bencode support for dict entries other than uint64...?
// encodeUint64Entry(RouterIdKey, routerId);
encodeUint64Entry(NumConnectionAttemptsKey, numConnectionAttempts);
encodeUint64Entry(NumConnectionSuccessesKey, numConnectionSuccesses);
encodeUint64Entry(NumConnectionRejectionsKey, numConnectionRejections);
encodeUint64Entry(NumConnectionTimeoutsKey, numConnectionTimeouts);
encodeUint64Entry(NumPathBuildsKey, numPathBuilds);
encodeUint64Entry(NumPacketsAttemptedKey, numPacketsAttempted);
encodeUint64Entry(NumPacketsSentKey, numPacketsSent);
encodeUint64Entry(NumPacketsDroppedKey, numPacketsDropped);
encodeUint64Entry(NumPacketsResentKey, numPacketsResent);
encodeUint64Entry(NumDistinctRCsReceivedKey, numDistinctRCsReceived);
encodeUint64Entry(NumLateRCsKey, numLateRCs);
encodeUint64Entry(PeakBandwidthBytesPerSecKey, (uint64_t)peakBandwidthBytesPerSec);
encodeUint64Entry(LongestRCReceiveIntervalKey, longestRCReceiveInterval.count());
encodeUint64Entry(LeastRCRemainingLifetimeKey, leastRCRemainingLifetime.count());
encodeUint64Entry(LastRCUpdatedKey, lastRCUpdated.count());
if (not bencode_end(buf))
throw std::runtime_error("PeerStats: Could not end bencode dict");
}
void
PeerStats::BEncodeList(const std::vector<PeerStats>& statsList, llarp_buffer_t* buf)
{
if (not buf)
throw std::runtime_error("PeerStats: Can't use null buf");
if (not bencode_start_list(buf))
throw std::runtime_error("PeerStats: Could not create bencode dict");
for (const auto& stats : statsList)
{
stats.BEncode(buf);
}
if (not bencode_end(buf))
throw std::runtime_error("PeerStats: Could not end bencode dict");
}
}; // namespace llarp

@ -0,0 +1,59 @@
#pragma once
#include <chrono>
#include <unordered_map>
#include <router_id.hpp>
#include <util/status.hpp>
#include <util/time.hpp>
/// Types stored in our peerstats database are declared here
namespace llarp
{
// Struct containing stats we know about a peer
struct PeerStats
{
RouterID routerId;
int32_t numConnectionAttempts = 0;
int32_t numConnectionSuccesses = 0;
int32_t numConnectionRejections = 0;
int32_t numConnectionTimeouts = 0;
int32_t numPathBuilds = 0;
int64_t numPacketsAttempted = 0;
int64_t numPacketsSent = 0;
int64_t numPacketsDropped = 0;
int64_t numPacketsResent = 0;
int32_t numDistinctRCsReceived = 0;
int32_t numLateRCs = 0;
double peakBandwidthBytesPerSec = 0;
llarp_time_t longestRCReceiveInterval = 0ms;
llarp_time_t leastRCRemainingLifetime = 0ms;
llarp_time_t lastRCUpdated = 0ms;
// not serialized
bool stale = true;
PeerStats();
PeerStats(const RouterID& routerId);
PeerStats&
operator+=(const PeerStats& other);
bool
operator==(const PeerStats& other) const;
util::StatusObject
toJson() const;
void
BEncode(llarp_buffer_t* buf) const;
static void
BEncodeList(const std::vector<PeerStats>& statsList, llarp_buffer_t* buf);
};
} // namespace llarp

@ -11,9 +11,10 @@
#include <functional>
#include <router_contact.hpp>
#include <tooling/router_event.hpp>
#include <peerstats/peer_db.hpp>
#ifdef LOKINET_HIVE
#include "tooling/router_hive.hpp"
#include "tooling/router_event.hpp"
#endif
struct llarp_buffer_t;
@ -151,11 +152,14 @@ namespace llarp
virtual I_RCLookupHandler&
rcLookupHandler() = 0;
virtual std::shared_ptr<PeerDb>
peerDb() = 0;
virtual bool
Sign(Signature& sig, const llarp_buffer_t& buf) const = 0;
virtual bool
Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb) = 0;
Configure(const Config& conf, bool isRouter, llarp_nodedb* nodedb) = 0;
virtual bool
IsServiceNode() const = 0;
@ -193,19 +197,10 @@ namespace llarp
/// connect to N random routers
virtual void
ConnectToRandomRouters(int N) = 0;
/// inject configuration and reconfigure router
virtual bool
Reconfigure(Config* conf) = 0;
virtual bool
TryConnectAsync(RouterContact rc, uint16_t tries) = 0;
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
virtual bool
ValidateConfig(Config* conf) const = 0;
/// called by link when a remote session has no more sessions open
virtual void
SessionClosed(RouterID remote) = 0;
@ -288,14 +283,23 @@ namespace llarp
virtual void
GossipRCIfNeeded(const RouterContact rc) = 0;
/// Templated convenience function to generate a RouterHive event and
/// delegate to non-templated (and overridable) function for handling.
template <class EventType, class... Params>
void
NotifyRouterEvent([[maybe_unused]] Params&&... args) const
{
#ifdef LOKINET_HIVE
hive->NotifyEvent(std::make_unique<EventType>(std::forward<Params>(args)...));
#endif
// TODO: no-op when appropriate
auto event = std::make_unique<EventType>(args...);
HandleRouterEvent(std::move(event));
}
protected:
/// Virtual function to handle RouterEvent. HiveRouter overrides this in
/// order to inject the event. The default implementation in Router simply
/// logs it.
virtual void
HandleRouterEvent(tooling::RouterEventPtr event) const = 0;
};
} // namespace llarp

@ -1,5 +1,7 @@
#include <router/outbound_session_maker.hpp>
#include <router/abstractrouter.hpp>
#include <tooling/peer_stats_event.hpp>
#include <link/server.hpp>
#include <router_contact.hpp>
#include <nodedb.hpp>
@ -152,6 +154,7 @@ namespace llarp
void
OutboundSessionMaker::Init(
AbstractRouter* router,
ILinkManager* linkManager,
I_RCLookupHandler* rcLookup,
Profiling* profiler,
@ -159,6 +162,7 @@ namespace llarp
llarp_nodedb* nodedb,
WorkerFunc_t dowork)
{
_router = router;
_linkManager = linkManager;
_rcLookup = rcLookup;
_logic = logic;
@ -298,8 +302,18 @@ namespace llarp
void
OutboundSessionMaker::CreatePendingSession(const RouterID& router)
{
util::Lock l(_mutex);
pendingSessions.emplace(router, nullptr);
{
util::Lock l(_mutex);
pendingSessions.emplace(router, nullptr);
}
auto peerDb = _router->peerDb();
if (peerDb)
{
peerDb->modifyPeerStats(router, [](PeerStats& stats) { stats.numConnectionAttempts++; });
}
_router->NotifyRouterEvent<tooling::ConnectionAttemptEvent>(_router->pubkey(), router);
}
void

@ -58,6 +58,7 @@ namespace llarp
void
Init(
AbstractRouter* router,
ILinkManager* linkManager,
I_RCLookupHandler* rcLookup,
Profiling* profiler,
@ -110,6 +111,7 @@ namespace llarp
std::unordered_map<RouterID, CallbacksQueue, RouterID::Hash> pendingCallbacks
GUARDED_BY(_mutex);
AbstractRouter* _router = nullptr;
ILinkManager* _linkManager = nullptr;
I_RCLookupHandler* _rcLookup = nullptr;
Profiling* _profiler = nullptr;

@ -21,8 +21,10 @@
#include <util/meta/memfn.hpp>
#include <util/str.hpp>
#include <ev/ev.hpp>
#include <tooling/peer_stats_event.hpp>
#include "tooling/router_event.hpp"
#include "util/status.hpp"
#include <fstream>
#include <cstdlib>
@ -80,13 +82,18 @@ namespace llarp
{
if (_running)
{
util::StatusObject peerStatsObj = nullptr;
if (m_peerDb)
peerStatsObj = m_peerDb->ExtractStatus();
return util::StatusObject{{"running", true},
{"numNodesKnown", _nodedb->num_loaded()},
{"dht", _dht->impl->ExtractStatus()},
{"services", _hiddenServiceContext.ExtractStatus()},
{"exit", _exitContext.ExtractStatus()},
{"links", _linkManager.ExtractStatus()},
{"outboundMessages", _outboundMessageHandler.ExtractStatus()}};
{"outboundMessages", _outboundMessageHandler.ExtractStatus()},
{"peerStats", peerStatsObj}};
}
else
{
@ -117,6 +124,9 @@ namespace llarp
void
Router::GossipRCIfNeeded(const RouterContact rc)
{
if (disableGossipingRC_TestingOnly())
return;
/// if we are not a service node forget about gossip
if (not IsServiceNode())
return;
@ -212,7 +222,31 @@ namespace llarp
return false;
#endif
#endif
_identity = RpcClient()->ObtainIdentityKey();
constexpr int maxTries = 5;
int numTries = 0;
while (numTries < maxTries)
{
numTries++;
try
{
_identity = RpcClient()->ObtainIdentityKey();
LogWarn("Obtained lokid identity keys");
break;
}
catch (const std::exception& e)
{
LogWarn(
"Failed attempt ",
numTries,
" of ",
maxTries,
" to get lokid identity keys because: ",
e.what());
if (numTries == maxTries)
throw;
}
}
}
else
{
@ -228,16 +262,16 @@ namespace llarp
}
bool
Router::Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb)
Router::Configure(const Config& conf, bool isRouter, llarp_nodedb* nodedb)
{
// we need this first so we can start lmq to fetch keys
if (conf)
{
enableRPCServer = conf->api.m_enableRPCServer;
rpcBindAddr = lokimq::address(conf->api.m_rpcBindAddr);
whitelistRouters = conf->lokid.whitelistRouters;
lokidRPCAddr = lokimq::address(conf->lokid.lokidRPCAddr);
}
whitelistRouters = conf.lokid.whitelistRouters;
if (whitelistRouters)
lokidRPCAddr = lokimq::address(conf.lokid.lokidRPCAddr);
enableRPCServer = conf.api.m_enableRPCServer;
if (enableRPCServer)
rpcBindAddr = lokimq::address(conf.api.m_rpcBindAddr);
if (not StartRpcServer())
throw std::runtime_error("Failed to start rpc server");
@ -251,13 +285,11 @@ namespace llarp
}
// fetch keys
if (conf)
{
if (not m_keyManager->initialize(*conf, true, isRouter))
throw std::runtime_error("KeyManager failed to initialize");
if (!FromConfig(conf))
throw std::runtime_error("FromConfig() failed");
}
if (not m_keyManager->initialize(conf, true, isRouter))
throw std::runtime_error("KeyManager failed to initialize");
if (!FromConfig(conf))
throw std::runtime_error("FromConfig() failed");
if (!InitOutboundLinks())
throw std::runtime_error("InitOutboundLinks() failed");
@ -374,12 +406,12 @@ namespace llarp
}
bool
Router::FromConfig(Config* conf)
Router::FromConfig(const Config& conf)
{
// Set netid before anything else
if (!conf->router.m_netId.empty() && strcmp(conf->router.m_netId.c_str(), llarp::DEFAULT_NETID))
if (!conf.router.m_netId.empty() && strcmp(conf.router.m_netId.c_str(), llarp::DEFAULT_NETID))
{
const auto& netid = conf->router.m_netId;
const auto& netid = conf.router.m_netId;
llarp::LogWarn(
"!!!! you have manually set netid to be '",
netid,
@ -394,36 +426,36 @@ namespace llarp
}
// IWP config
m_OutboundPort = conf->links.m_OutboundLink.port;
m_OutboundPort = conf.links.m_OutboundLink.port;
// Router config
_rc.SetNick(conf->router.m_nickname);
_outboundSessionMaker.maxConnectedRouters = conf->router.m_maxConnectedRouters;
_outboundSessionMaker.minConnectedRouters = conf->router.m_minConnectedRouters;
_rc.SetNick(conf.router.m_nickname);
_outboundSessionMaker.maxConnectedRouters = conf.router.m_maxConnectedRouters;
_outboundSessionMaker.minConnectedRouters = conf.router.m_minConnectedRouters;
encryption_keyfile = m_keyManager->m_encKeyPath;
our_rc_file = m_keyManager->m_rcPath;
transport_keyfile = m_keyManager->m_transportKeyPath;
ident_keyfile = m_keyManager->m_idKeyPath;
_ourAddress = conf->router.m_publicAddress;
_ourAddress = conf.router.m_publicAddress;
RouterContact::BlockBogons = conf->router.m_blockBogons;
RouterContact::BlockBogons = conf.router.m_blockBogons;
// Lokid Config
usingSNSeed = conf->lokid.usingSNSeed;
whitelistRouters = conf->lokid.whitelistRouters;
lokidRPCAddr = lokimq::address(conf->lokid.lokidRPCAddr);
usingSNSeed = conf.lokid.usingSNSeed;
whitelistRouters = conf.lokid.whitelistRouters;
lokidRPCAddr = lokimq::address(conf.lokid.lokidRPCAddr);
if (usingSNSeed)
ident_keyfile = conf->lokid.ident_keyfile;
ident_keyfile = conf.lokid.ident_keyfile;
// TODO: add config flag for "is service node"
if (conf->links.m_InboundLinks.size())
if (conf.links.m_InboundLinks.size())
{
m_isServiceNode = true;
}
networkConfig = conf->network;
networkConfig = conf.network;
/// build a set of strictConnectPubkeys (
/// TODO: make this consistent with config -- do we support multiple strict connections
@ -446,21 +478,21 @@ namespace llarp
throw std::invalid_argument(stringify("invalid key for strict-connect: ", val));
}
std::vector<fs::path> configRouters = conf->connect.routers;
std::vector<fs::path> configRouters = conf.connect.routers;
configRouters.insert(
configRouters.end(), conf->bootstrap.routers.begin(), conf->bootstrap.routers.end());
configRouters.end(), conf.bootstrap.routers.begin(), conf.bootstrap.routers.end());
// if our conf had no bootstrap files specified, try the default location of
// <DATA_DIR>/bootstrap.signed. If this isn't present, leave a useful error message
if (configRouters.size() == 0 and not m_isServiceNode)
{
// TODO: use constant
fs::path defaultBootstrapFile = conf->router.m_dataDir / "bootstrap.signed";
fs::path defaultBootstrapFile = conf.router.m_dataDir / "bootstrap.signed";
if (fs::exists(defaultBootstrapFile))
{
configRouters.push_back(defaultBootstrapFile);
}
else if (not conf->bootstrap.skipBootstrap)
else if (not conf.bootstrap.skipBootstrap)
{
LogError("No bootstrap files specified in config file, and the default");
LogError("bootstrap file ", defaultBootstrapFile, " does not exist.");
@ -515,6 +547,7 @@ namespace llarp
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, _logic);
_outboundSessionMaker.Init(
this,
&_linkManager,
&_rcLookupHandler,
&_routerProfiling,
@ -534,16 +567,16 @@ namespace llarp
m_isServiceNode);
// create inbound links, if we are a service node
for (const LinksConfig::LinkInfo& serverConfig : conf->links.m_InboundLinks)
for (const LinksConfig::LinkInfo& serverConfig : conf.links.m_InboundLinks)
{
auto server = iwp::NewInboundLink(
m_keyManager,
util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&AbstractRouter::Sign, this),
util::memFn(&IOutboundSessionMaker::OnSessionEstablished, &_outboundSessionMaker),
util::memFn(&Router::ConnectionEstablished, this),
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker),
util::memFn(&Router::ConnectionTimedOut, this),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::QueueWork, this));
@ -559,15 +592,15 @@ namespace llarp
}
// Network config
if (conf->network.m_enableProfiling.has_value() and not*conf->network.m_enableProfiling)
if (conf.network.m_enableProfiling.has_value() and not*conf.network.m_enableProfiling)
{
routerProfiling().Disable();
LogWarn("router profiling explicitly disabled");
}
if (!conf->network.m_routerProfilesFile.empty())
if (!conf.network.m_routerProfilesFile.empty())
{
routerProfilesFile = conf->network.m_routerProfilesFile;
routerProfilesFile = conf.network.m_routerProfilesFile;
routerProfiling().Load(routerProfilesFile.c_str());
llarp::LogInfo("setting profiles to ", routerProfilesFile);
}
@ -575,15 +608,27 @@ namespace llarp
// API config
if (not IsServiceNode())
{
hiddenServiceContext().AddEndpoint(*conf);
hiddenServiceContext().AddEndpoint(conf);
}
// peer stats
if (conf.router.m_enablePeerStats)
{
LogInfo("Initializing peerdb...");
m_peerDb = std::make_shared<PeerDb>();
m_peerDb->configure(conf.router);
}
else
{
assert(not IsServiceNode()); // enable peer stats must be enabled for service nodes
}
// Logging config
LogContext::Instance().Initialize(
conf->logging.m_logLevel,
conf->logging.m_logType,
conf->logging.m_logFile,
conf->router.m_nickname,
conf.logging.m_logLevel,
conf.logging.m_logType,
conf.logging.m_logFile,
conf.router.m_nickname,
util::memFn(&AbstractRouter::QueueDiskIO, this));
return true;
@ -754,6 +799,20 @@ namespace llarp
{
nodedb()->AsyncFlushToDisk();
}
if (m_peerDb)
{
// TODO: throttle this?
// TODO: need to capture session stats when session terminates / is removed from link manager
_linkManager.updatePeerDb(m_peerDb);
if (m_peerDb->shouldFlush(now))
{
LogWarn("Queing database flush...");
QueueDiskIO([this]() { m_peerDb->flushDatabase(); });
}
}
// get connected peers
std::set<dht::Key_t> peersWeHave;
_linkManager.ForEachPeer([&peersWeHave](ILinkSession* s) {
@ -791,6 +850,31 @@ namespace llarp
LogInfo("Session to ", remote, " fully closed");
}
void
Router::ConnectionTimedOut(ILinkSession* session)
{
if (m_peerDb)
{
RouterID id{session->GetPubKey()};
// TODO: make sure this is a public router (on whitelist)?
m_peerDb->modifyPeerStats(id, [&](PeerStats& stats) { stats.numConnectionTimeouts++; });
}
_outboundSessionMaker.OnConnectTimeout(session);
}
bool
Router::ConnectionEstablished(ILinkSession* session, bool inbound)
{
RouterID id{session->GetPubKey()};
if (m_peerDb)
{
// TODO: make sure this is a public router (on whitelist)?
m_peerDb->modifyPeerStats(id, [&](PeerStats& stats) { stats.numConnectionSuccesses++; });
}
NotifyRouterEvent<tooling::LinkSessionEstablishedEvent>(pubkey(), id, inbound);
return _outboundSessionMaker.OnSessionEstablished(session);
}
bool
Router::GetRandomConnectedRouter(RouterContact& result) const
{
@ -1101,19 +1185,6 @@ namespace llarp
return true;
}
bool
Router::ValidateConfig(Config* /*conf*/) const
{
return true;
}
bool
Router::Reconfigure(Config*)
{
// TODO: implement me
return true;
}
bool
Router::TryConnectAsync(RouterContact rc, uint16_t tries)
{
@ -1142,9 +1213,9 @@ namespace llarp
util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&AbstractRouter::Sign, this),
util::memFn(&IOutboundSessionMaker::OnSessionEstablished, &_outboundSessionMaker),
util::memFn(&Router::ConnectionEstablished, this),
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker),
util::memFn(&Router::ConnectionTimedOut, this),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::QueueWork, this));
@ -1178,4 +1249,11 @@ namespace llarp
LogDebug("Message failed sending to ", remote);
}
}
void
Router::HandleRouterEvent(tooling::RouterEventPtr event) const
{
LogDebug(event->ToString());
}
} // namespace llarp

@ -16,6 +16,7 @@
#include <messages/link_message_parser.hpp>
#include <nodedb.hpp>
#include <path/path_context.hpp>
#include <peerstats/peer_db.hpp>
#include <profiling.hpp>
#include <router_contact.hpp>
#include <router/outbound_message_handler.hpp>
@ -48,7 +49,7 @@
namespace llarp
{
struct Router final : public AbstractRouter
struct Router : public AbstractRouter
{
llarp_time_t _lastPump = 0s;
bool ready;
@ -306,12 +307,18 @@ namespace llarp
return _rcLookupHandler;
}
std::shared_ptr<PeerDb>
peerDb() override
{
return m_peerDb;
}
void
GossipRCIfNeeded(const RouterContact rc) override;
explicit Router(llarp_ev_loop_ptr __netloop, std::shared_ptr<Logic> logic);
~Router() override;
virtual ~Router() override;
bool
HandleRecvLinkMessageBuffer(ILinkSession* from, const llarp_buffer_t& msg) override;
@ -338,7 +345,7 @@ namespace llarp
Close();
bool
Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override;
Configure(const Config& conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override;
bool
StartRpcServer() override;
@ -385,19 +392,9 @@ namespace llarp
void
try_connect(fs::path rcfile);
/// inject configuration and reconfigure router
bool
Reconfigure(Config* conf) override;
bool
TryConnectAsync(RouterContact rc, uint16_t tries) override;
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
bool
ValidateConfig(Config* conf) const override;
/// send to remote router or queue for sending
/// returns false on overflow
/// returns true on successful queue
@ -427,6 +424,14 @@ namespace llarp
void
SessionClosed(RouterID remote) override;
/// called by link when an unestablished connection times out
void
ConnectionTimedOut(ILinkSession* session);
/// called by link when session is fully established
bool
ConnectionEstablished(ILinkSession* session, bool inbound);
/// call internal router ticker
void
Tick();
@ -495,6 +500,7 @@ namespace llarp
llarp_time_t m_LastStatsReport = 0s;
std::shared_ptr<llarp::KeyManager> m_keyManager;
std::shared_ptr<PeerDb> m_peerDb;
uint32_t path_build_count = 0;
@ -508,10 +514,20 @@ namespace llarp
UpdateOurRC(bool rotateKeys = false);
bool
FromConfig(Config* conf);
FromConfig(const Config& conf);
void
MessageSent(const RouterID& remote, SendStatus status);
protected:
virtual void
HandleRouterEvent(tooling::RouterEventPtr event) const override;
virtual bool
disableGossipingRC_TestingOnly()
{
return false;
};
};
} // namespace llarp

@ -1,5 +1,6 @@
#include <rpc/lokid_rpc_client.hpp>
#include <stdexcept>
#include <util/logging/logger.hpp>
#include <router/abstractrouter.hpp>
@ -36,18 +37,20 @@ namespace llarp
: m_lokiMQ(std::move(lmq)), m_Router(r)
{
// m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
// TODO: proper auth here
auto lokidCategory = m_lokiMQ->add_category("lokid", lokimq::Access{lokimq::AuthLevel::none});
lokidCategory.add_request_command(
"get_peer_stats", [this](lokimq::Message& m) { HandleGetPeerStats(m); });
}
void
LokidRpcClient::ConnectAsync(lokimq::address url)
{
LogInfo("connecting to lokid via LMQ at ", url);
m_lokiMQ->connect_remote(
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](lokimq::ConnectionID c) {
self->m_Connection = std::move(c);
self->Connected();
},
[self = shared_from_this()](lokimq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](lokimq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
LogicCall(self->m_Router->logic(), [self, url]() { self->ConnectAsync(url); });
@ -100,10 +103,18 @@ namespace llarp
constexpr auto PingInterval = 1min;
constexpr auto NodeListUpdateInterval = 30s;
LogInfo("we connected to lokid [", *m_Connection, "]");
Command("admin.lokinet_ping");
m_lokiMQ->add_timer(
[self = shared_from_this()]() { self->Command("admin.lokinet_ping"); }, PingInterval);
auto makePingRequest = [self = shared_from_this()]() {
nlohmann::json payload = {{"version", {VERSION[0], VERSION[1], VERSION[2]}}};
self->Request(
"admin.lokinet_ping",
[](bool success, std::vector<std::string> data) {
(void)data;
LogDebug("Received response for ping. Successful: ", success);
},
payload.dump());
};
makePingRequest();
m_lokiMQ->add_timer(makePingRequest, PingInterval);
m_lokiMQ->add_timer(
[self = shared_from_this()]() { self->UpdateServiceNodeList(); }, NodeListUpdateInterval);
UpdateServiceNodeList();
@ -175,13 +186,13 @@ namespace llarp
"failed to get private key request "
"failed");
}
if (data.empty())
if (data.empty() or data.size() < 2)
{
throw std::runtime_error(
"failed to get private key request "
"data empty");
}
const auto j = nlohmann::json::parse(data[0]);
const auto j = nlohmann::json::parse(data[1]);
SecretKey k;
if (not k.FromHex(j.at("service_node_ed25519_privkey").get<std::string>()))
{
@ -189,8 +200,14 @@ namespace llarp
}
promise.set_value(k);
}
catch (const std::exception& e)
{
LogWarn("Caught exception while trying to request admin keys: ", e.what());
promise.set_exception(std::current_exception());
}
catch (...)
{
LogWarn("Caught non-standard exception while trying to request admin keys");
promise.set_exception(std::current_exception());
}
});
@ -198,5 +215,74 @@ namespace llarp
return ftr.get();
}
void
LokidRpcClient::HandleGetPeerStats(lokimq::Message& msg)
{
LogInfo("Got request for peer stats (size: ", msg.data.size(), ")");
for (auto str : msg.data)
{
LogInfo(" :", str);
}
assert(m_Router != nullptr);
if (not m_Router->peerDb())
{
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
try
{
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
{
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
return;
}
std::vector<std::string> routerIdStrings;
lokimq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
for (const auto& routerIdString : routerIdStrings)
{
RouterID id;
if (not id.FromString(routerIdString))
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
return;
}
routerIds.push_back(std::move(id));
}
auto statsList = m_Router->peerDb()->listPeerStats(routerIds);
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
PeerStats::BEncodeList(statsList, &llarpBuf);
msg.send_reply(std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
}
}
} // namespace rpc
} // namespace llarp

@ -59,6 +59,10 @@ namespace llarp
void
HandleGotServiceNodeList(std::string json);
// Handles request from lokid for peer stats on a specific peer
void
HandleGetPeerStats(lokimq::Message& msg);
std::optional<lokimq::ConnectionID> m_Connection;
LMQ_ptr m_lokiMQ;
std::string m_CurrentBlockHash;

@ -0,0 +1,33 @@
#include <tooling/hive_context.hpp>
#include <tooling/hive_router.hpp>
namespace tooling
{
HiveContext::HiveContext(RouterHive* hive) : m_hive(hive)
{
}
std::unique_ptr<llarp::AbstractRouter>
HiveContext::makeRouter(
llarp_ev_loop_ptr netloop,
std::shared_ptr<llarp::Logic> logic)
{
return std::make_unique<HiveRouter>(netloop, logic, m_hive);
}
HiveRouter*
HiveContext::getRouterAsHiveRouter()
{
if (not router)
return nullptr;
HiveRouter* hiveRouter = dynamic_cast<HiveRouter*>(router.get());
if (hiveRouter == nullptr)
throw std::runtime_error("HiveContext has a router not of type HiveRouter");
return hiveRouter;
}
} // namespace tooling

@ -0,0 +1,30 @@
#pragma once
#include <llarp.hpp>
#include <tooling/hive_router.hpp>
namespace tooling
{
/// HiveContext is a subclass of llarp::Context which allows RouterHive to
/// perform custom behavior which might be undesirable in production code.
struct HiveContext : public llarp::Context
{
HiveContext(RouterHive* hive);
std::unique_ptr<llarp::AbstractRouter>
makeRouter(
llarp_ev_loop_ptr netloop,
std::shared_ptr<llarp::Logic> logic) override;
/// Get this context's router as a HiveRouter.
///
/// Returns nullptr if there is no router or throws an exception if the
/// router is somehow not an instance of HiveRouter.
HiveRouter*
getRouterAsHiveRouter();
protected:
RouterHive* m_hive = nullptr;
};
} // namespace tooling

@ -0,0 +1,37 @@
#include <tooling/hive_router.hpp>
#include <tooling/router_hive.hpp>
namespace tooling
{
HiveRouter::HiveRouter(
llarp_ev_loop_ptr netloop, std::shared_ptr<llarp::Logic> logic, RouterHive* hive)
: Router(netloop, logic), m_hive(hive)
{
}
bool
HiveRouter::disableGossipingRC_TestingOnly()
{
return m_disableGossiping;
}
void
HiveRouter::disableGossiping()
{
m_disableGossiping = true;
}
void
HiveRouter::enableGossiping()
{
m_disableGossiping = false;
}
void
HiveRouter::HandleRouterEvent(RouterEventPtr event) const
{
m_hive->NotifyEvent(std::move(event));
}
} // namespace tooling

@ -0,0 +1,38 @@
#pragma once
#include <router/router.hpp>
namespace tooling
{
/// HiveRouter is a subclass of Router which overrides specific behavior in
/// order to perform testing-related functions. It exists largely to prevent
/// this behavior (which may often be "dangerous") from leaking into release
/// code.
struct HiveRouter : public llarp::Router
{
HiveRouter(
llarp_ev_loop_ptr netloop,
std::shared_ptr<llarp::Logic> logic,
RouterHive* hive);
virtual ~HiveRouter() = default;
/// Override logic to prevent base Router class from gossiping its RC.
virtual bool
disableGossipingRC_TestingOnly() override;
void
disableGossiping();
void
enableGossiping();
protected:
bool m_disableGossiping = false;
RouterHive* m_hive = nullptr;
virtual void
HandleRouterEvent(RouterEventPtr event) const override;
};
} // namespace tooling

@ -0,0 +1,44 @@
#pragma once
#include "router_event.hpp"
namespace tooling
{
struct LinkSessionEstablishedEvent : public RouterEvent
{
llarp::RouterID remoteId;
bool inbound = false;
LinkSessionEstablishedEvent(
const llarp::RouterID& ourRouterId, const llarp::RouterID& remoteId_, bool inbound_)
: RouterEvent("Link: LinkSessionEstablishedEvent", ourRouterId, false)
, remoteId(remoteId_)
, inbound(inbound_)
{
}
std::string
ToString() const
{
return RouterEvent::ToString() + (inbound ? "inbound" : "outbound")
+ " : LinkSessionEstablished with " + remoteId.ToString();
}
};
struct ConnectionAttemptEvent : public RouterEvent
{
llarp::RouterID remoteId;
ConnectionAttemptEvent(const llarp::RouterID& ourRouterId, const llarp::RouterID& remoteId_)
: RouterEvent("Link: ConnectionAttemptEvent", ourRouterId, false), remoteId(remoteId_)
{
}
std::string
ToString() const
{
return RouterEvent::ToString() + " : LinkSessionEstablished with " + remoteId.ToString();
}
};
} // namespace tooling

@ -14,45 +14,43 @@ using namespace std::chrono_literals;
namespace tooling
{
void
RouterHive::AddRouter(
const std::shared_ptr<llarp::Config>& config, std::vector<llarp_main*>* routers, bool isRelay)
RouterHive::AddRouter(const std::shared_ptr<llarp::Config>& config, bool isRouter)
{
llarp_main* ctx = llarp_main_init_from_config(config->Copy(), isRelay);
auto result = llarp_main_setup(ctx, isRelay);
if (result == 0)
{
llarp::Context::Get(ctx)->InjectHive(this);
routers->push_back(ctx);
}
else
{
throw std::runtime_error(llarp::stringify(
"Failed to add RouterHive ",
(isRelay ? "relay" : "client"),
", llarp_main_setup() returned ",
result));
}
auto& container = (isRouter ? relays : clients);
llarp::RuntimeOptions opts;
opts.isRouter = isRouter;
Context_ptr context = std::make_shared<HiveContext>(this);
context->Configure(*config);
context->Setup(opts);
auto routerId = llarp::RouterID(context->router->pubkey());
container[routerId] = context;
std::cout << "Generated router with ID " << routerId << std::endl;
}
void
RouterHive::AddRelay(const std::shared_ptr<llarp::Config>& config)
{
AddRouter(config, &relays, true);
AddRouter(config, true);
}
void
RouterHive::AddClient(const std::shared_ptr<llarp::Config>& config)
{
AddRouter(config, &clients, false);
AddRouter(config, false);
}
void
RouterHive::StartRouters(std::vector<llarp_main*>* routers, bool isRelay)
RouterHive::StartRouters(bool isRelay)
{
for (llarp_main* ctx : *routers)
auto& container = (isRelay ? relays : clients);
for (auto [routerId, ctx] : container)
{
routerMainThreads.emplace_back([=]() {
llarp_main_run(ctx, llarp_main_runtime_opts{false, false, false, isRelay});
ctx->Run(llarp::RuntimeOptions{false, false, isRelay});
});
std::this_thread::sleep_for(2ms);
}
@ -61,39 +59,39 @@ namespace tooling
void
RouterHive::StartRelays()
{
StartRouters(&relays, true);
StartRouters(true);
}
void
RouterHive::StartClients()
{
StartRouters(&clients, false);
StartRouters(false);
}
void
RouterHive::StopRouters()
{
llarp::LogInfo("Signalling all routers to stop");
for (llarp_main* ctx : relays)
for (auto [routerId, ctx] : relays)
{
llarp_main_signal(ctx, 2 /* SIGINT */);
LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); });
}
for (llarp_main* ctx : clients)
for (auto [routerId, ctx] : clients)
{
llarp_main_signal(ctx, 2 /* SIGINT */);
LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); });
}
llarp::LogInfo("Waiting on routers to be stopped");
for (llarp_main* ctx : relays)
for (auto [routerId, ctx] : relays)
{
while (llarp_main_is_running(ctx))
while (ctx->IsUp())
{
std::this_thread::sleep_for(10ms);
}
}
for (llarp_main* ctx : clients)
for (auto [routerId, ctx] : clients)
{
while (llarp_main_is_running(ctx))
while (ctx->IsUp())
{
std::this_thread::sleep_for(10ms);
}
@ -148,46 +146,40 @@ namespace tooling
}
void
RouterHive::VisitRouter(llarp_main* router, std::function<void(Context_ptr)> visit)
RouterHive::VisitRouter(Context_ptr ctx, std::function<void(Context_ptr)> visit)
{
auto ctx = llarp::Context::Get(router);
LogicCall(ctx->logic, [visit, ctx]() { visit(ctx); });
// TODO: this should be called from each router's appropriate Logic thread, e.g.:
// LogicCall(ctx->logic, [visit, ctx]() { visit(ctx); });
// however, this causes visit calls to be deferred
visit(ctx);
}
void
RouterHive::VisitRelay(size_t index, std::function<void(Context_ptr)> visit)
HiveRouter*
RouterHive::GetRelay(const llarp::RouterID& id, bool needMutexLock)
{
if (index >= relays.size())
{
visit(nullptr);
return;
}
VisitRouter(relays[index], visit);
}
auto guard =
needMutexLock ? std::make_optional<std::lock_guard<std::mutex>>(routerMutex) : std::nullopt;
void
RouterHive::VisitClient(size_t index, std::function<void(Context_ptr)> visit)
{
if (index >= clients.size())
{
visit(nullptr);
return;
}
VisitRouter(clients[index], visit);
auto itr = relays.find(id);
if (itr == relays.end())
return nullptr;
auto ctx = itr->second;
return ctx->getRouterAsHiveRouter();
}
std::vector<size_t>
RouterHive::RelayConnectedRelays()
{
std::lock_guard<std::mutex> guard{routerMutex};
std::vector<size_t> results;
results.resize(relays.size());
std::mutex results_lock;
size_t i = 0;
size_t done_count = 0;
for (auto relay : relays)
for (auto [routerId, ctx] : relays)
{
auto ctx = llarp::Context::Get(relay);
LogicCall(ctx->logic, [&, i, ctx]() {
size_t count = ctx->router->NumberOfConnectedRouters();
std::lock_guard<std::mutex> guard{results_lock};
@ -216,17 +208,43 @@ namespace tooling
std::vector<llarp::RouterContact>
RouterHive::GetRelayRCs()
{
std::lock_guard<std::mutex> guard{routerMutex};
std::vector<llarp::RouterContact> results;
results.resize(relays.size());
size_t i = 0;
for (auto relay : relays)
for (auto [routerId, ctx] : relays)
{
auto ctx = llarp::Context::Get(relay);
results[i] = ctx->router->rc();
i++;
}
return results;
}
void
RouterHive::ForEachRelay(std::function<void(Context_ptr)> visit)
{
for (auto [routerId, ctx] : relays)
{
VisitRouter(ctx, visit);
}
}
void
RouterHive::ForEachClient(std::function<void(Context_ptr)> visit)
{
for (auto [routerId, ctx] : clients)
{
VisitRouter(ctx, visit);
}
}
/// safely visit every router context
void
RouterHive::ForEachRouter(std::function<void(Context_ptr)> visit)
{
ForEachRelay(visit);
ForEachClient(visit);
}
} // namespace tooling

@ -4,6 +4,7 @@
#include <llarp.h>
#include <config/config.hpp>
#include <tooling/hive_context.hpp>
#include <vector>
#include <deque>
@ -16,35 +17,26 @@ struct llarp_main;
namespace llarp
{
struct Context;
}
} // namespace llarp
namespace tooling
{
struct HiveRouter; // Hive's version of Router
struct RouterHive
{
using Context_ptr = std::shared_ptr<llarp::Context>;
using Context_ptr = std::shared_ptr<HiveContext>;
private:
void
StartRouters(std::vector<llarp_main*>* routers, bool isRelay);
void
AddRouter(
const std::shared_ptr<llarp::Config>& config,
std::vector<llarp_main*>* routers,
bool isRelay);
StartRouters(bool isRelay);
/// safely visit router
void
VisitRouter(llarp_main* router, std::function<void(Context_ptr)> visit);
AddRouter(const std::shared_ptr<llarp::Config>& config, bool isRelay);
/// safely visit relay at index N
/// safely visit router (asynchronously)
void
VisitRelay(size_t index, std::function<void(Context_ptr)> visit);
/// safely visit client at index N
void
VisitClient(size_t index, std::function<void(Context_ptr)> visit);
VisitRouter(Context_ptr ctx, std::function<void(Context_ptr)> visit);
public:
RouterHive() = default;
@ -73,31 +65,16 @@ namespace tooling
std::deque<RouterEventPtr>
GetAllEvents();
// functions to safely visit each relay and/or client's HiveContext
void
ForEachRelay(std::function<void(Context_ptr)> visit)
{
for (size_t idx = 0; idx < relays.size(); ++idx)
{
VisitRelay(idx, visit);
}
}
ForEachRelay(std::function<void(Context_ptr)> visit);
void
ForEachClient(std::function<void(Context_ptr)> visit)
{
for (size_t idx = 0; idx < clients.size(); ++idx)
{
VisitClient(idx, visit);
}
}
/// safely visit every router context
ForEachClient(std::function<void(Context_ptr)> visit);
void
ForEachRouter(std::function<void(Context_ptr)> visit)
{
ForEachRelay(visit);
ForEachClient(visit);
}
ForEachRouter(std::function<void(Context_ptr)> visit);
HiveRouter*
GetRelay(const llarp::RouterID& id, bool needMutexLock = true);
std::vector<size_t>
RelayConnectedRelays();
@ -105,8 +82,9 @@ namespace tooling
std::vector<llarp::RouterContact>
GetRelayRCs();
std::vector<llarp_main*> relays;
std::vector<llarp_main*> clients;
std::mutex routerMutex;
std::unordered_map<llarp::RouterID, Context_ptr, llarp::RouterID::Hash> relays;
std::unordered_map<llarp::RouterID, Context_ptr, llarp::RouterID::Hash> clients;
std::vector<std::thread> routerMainThreads;

@ -1,11 +1,13 @@
pybind11_add_module(pyllarp MODULE
module.cpp
llarp/context.cpp
llarp/router.cpp
llarp/router_id.cpp
llarp/router_contact.cpp
llarp/crypto/types.cpp
llarp/config.cpp
llarp/logger.cpp
llarp/peerstats.cpp
llarp/dht/dht_types.cpp
llarp/path/path_types.cpp
llarp/path/path_hop_config.cpp

@ -20,6 +20,9 @@ namespace llarp
void
CryptoTypes_Init(py::module& mod);
void
AbstractRouter_Init(py::module& mod);
void
RouterID_Init(py::module& mod);
@ -32,6 +35,12 @@ namespace llarp
void
PathTypes_Init(py::module& mod);
void
PeerDb_Init(py::module& mod);
void
PeerStats_Init(py::module& mod);
namespace dht
{
void
@ -65,4 +74,10 @@ namespace tooling
void
RouterEvent_Init(py::module& mod);
void
HiveContext_Init(py::module& mod);
void
HiveRouter_Init(py::module& mod);
} // namespace tooling

@ -48,7 +48,8 @@ namespace llarp
})
.def_readwrite("workerThreads", &RouterConfig::m_workerThreads)
.def_readwrite("numNetThreads", &RouterConfig::m_numNetThreads)
.def_readwrite("JobQueueSize", &RouterConfig::m_JobQueueSize);
.def_readwrite("JobQueueSize", &RouterConfig::m_JobQueueSize)
.def_readwrite("enablePeerStats", &RouterConfig::m_enablePeerStats);
py::class_<NetworkConfig>(mod, "NetworkConfig")
.def(py::init<>())
@ -101,7 +102,10 @@ namespace llarp
.def_readwrite("usingSNSeed", &LokidConfig::usingSNSeed)
.def_readwrite("whitelistRouters", &LokidConfig::whitelistRouters)
.def_readwrite("ident_keyfile", &LokidConfig::ident_keyfile)
.def_readwrite("lokidRPCAddr", &LokidConfig::lokidRPCAddr);
.def_property(
"lokidRPCAddr",
[](LokidConfig& self) { return self.lokidRPCAddr.full_address().c_str(); },
[](LokidConfig& self, std::string arg) { self.lokidRPCAddr = lokimq::address(arg); });
py::class_<BootstrapConfig>(mod, "BootstrapConfig")
.def(py::init<>())

@ -1,5 +1,6 @@
#include "common.hpp"
#include <llarp.hpp>
#include <tooling/hive_context.hpp>
#include <router/router.cpp>
#include "llarp/handlers/pyhandler.hpp"
namespace llarp
@ -11,8 +12,10 @@ namespace llarp
py::class_<Context, Context_ptr>(mod, "Context")
.def(
"Setup",
[](Context_ptr self, bool isRelay) -> bool { return self->Setup(isRelay) == 0; })
.def("Run", [](Context_ptr self) -> int { return self->Run(llarp_main_runtime_opts{}); })
[](Context_ptr self, bool isRouter) {
self->Setup({false, false, isRouter});
})
.def("Run", [](Context_ptr self) -> int { return self->Run(RuntimeOptions{}); })
.def("Stop", [](Context_ptr self) { self->CloseAsync(); })
.def("IsUp", &Context::IsUp)
.def("IsRelay", [](Context_ptr self) -> bool { return self->router->IsServiceNode(); })
@ -34,4 +37,19 @@ namespace llarp
})
.def("CallSafe", &Context::CallSafe);
}
} // namespace llarp
namespace tooling
{
void
HiveContext_Init(py::module& mod)
{
using HiveContext_ptr = std::shared_ptr<HiveContext>;
py::class_<tooling::HiveContext, HiveContext_ptr, llarp::Context>(mod, "HiveContext")
.def(
"getRouterAsHiveRouter",
&tooling::HiveContext::getRouterAsHiveRouter,
py::return_value_policy::reference);
}
} // namespace tooling

@ -0,0 +1,40 @@
#include "common.hpp"
#include "config/config.hpp"
#include "peerstats/peer_db.hpp"
#include "peerstats/types.hpp"
#include <netinet/in.h>
namespace llarp
{
void
PeerDb_Init(py::module& mod)
{
using PeerDb_ptr = std::shared_ptr<PeerDb>;
py::class_<PeerDb, PeerDb_ptr>(mod, "PeerDb")
.def("getCurrentPeerStats", &PeerDb::getCurrentPeerStats);
}
void
PeerStats_Init(py::module& mod)
{
py::class_<PeerStats>(mod, "PeerStats")
.def_readwrite("routerId", &PeerStats::routerId)
.def_readwrite("numConnectionAttempts", &PeerStats::numConnectionAttempts)
.def_readwrite("numConnectionSuccesses", &PeerStats::numConnectionSuccesses)
.def_readwrite("numConnectionRejections", &PeerStats::numConnectionRejections)
.def_readwrite("numConnectionTimeouts", &PeerStats::numConnectionTimeouts)
.def_readwrite("numPathBuilds", &PeerStats::numPathBuilds)
.def_readwrite("numPacketsAttempted", &PeerStats::numPacketsAttempted)
.def_readwrite("numPacketsSent", &PeerStats::numPacketsSent)
.def_readwrite("numPacketsDropped", &PeerStats::numPacketsDropped)
.def_readwrite("numPacketsResent", &PeerStats::numPacketsResent)
.def_readwrite("numDistinctRCsReceived", &PeerStats::numDistinctRCsReceived)
.def_readwrite("numLateRCs", &PeerStats::numLateRCs)
.def_readwrite("peakBandwidthBytesPerSec", &PeerStats::peakBandwidthBytesPerSec)
.def_readwrite("longestRCReceiveInterval", &PeerStats::longestRCReceiveInterval)
.def_readwrite("leastRCRemainingLifetime", &PeerStats::leastRCRemainingLifetime)
.def_readwrite("lastRCUpdated", &PeerStats::lastRCUpdated)
.def_readwrite("stale", &PeerStats::stale);
}
} // namespace llarp

@ -0,0 +1,28 @@
#include "common.hpp"
#include "router/abstractrouter.hpp"
#include "tooling/hive_router.hpp"
namespace llarp
{
void
AbstractRouter_Init(py::module& mod)
{
py::class_<AbstractRouter>(mod, "AbstractRouter")
.def("rc", &AbstractRouter::rc)
.def("Stop", &AbstractRouter::Stop)
.def("peerDb", &AbstractRouter::peerDb);
}
} // namespace llarp
namespace tooling
{
void
HiveRouter_Init(py::module& mod)
{
py::class_<HiveRouter, llarp::AbstractRouter>(mod, "HiveRouter")
.def("disableGossiping", &HiveRouter::disableGossiping)
.def("enableGossiping", &HiveRouter::enableGossiping);
}
} // namespace tooling

@ -18,8 +18,6 @@ namespace llarp
.def("__repr__", &RouterID::ToString)
.def("__str__", &RouterID::ToString)
.def("ShortString", &RouterID::ShortString)
.def("__eq__", [](const RouterID* const lhs, const RouterID* const rhs) {
return *lhs == *rhs;
});
.def("__eq__", [](const RouterID& lhs, const RouterID& rhs) { return lhs == rhs; });
}
} // namespace llarp

@ -0,0 +1,28 @@
#pragma once
#include "router_event.hpp"
namespace tooling
{
struct LinkSessionEstablishedEvent : public RouterEvent
{
llarp::RouterID remoteId;
bool inbound = false;
LinkSessionEstablishedEvent(
const llarp::RouterID& ourRouterId, const llarp::RouterID& remoteId_, bool inbound_)
: RouterEvent("Link: LinkSessionEstablishedEvent", ourRouterId, false)
, remoteId(remoteId_)
, inbound(inbound_)
{
}
std::string
ToString() const
{
return RouterEvent::ToString() + (inbound ? "inbound" : "outbound")
+ " : LinkSessionEstablished with " + remoteId.ToString();
}
};
} // namespace tooling

@ -5,6 +5,7 @@
#include "tooling/dht_event.hpp"
#include "tooling/path_event.hpp"
#include "tooling/rc_event.hpp"
#include "tooling/peer_stats_event.hpp"
#include <messages/relay_status.hpp>
#include <path/path.hpp>
@ -73,6 +74,13 @@ namespace tooling
mod, "FindRouterReceivedEvent");
py::class_<FindRouterSentEvent, FindRouterEvent, RouterEvent>(mod, "FindRouterSentEvent");
py::class_<LinkSessionEstablishedEvent, RouterEvent>(mod, "LinkSessionEstablishedEvent")
.def_readonly("remoteId", &LinkSessionEstablishedEvent::remoteId)
.def_readonly("inbound", &LinkSessionEstablishedEvent::inbound);
py::class_<ConnectionAttemptEvent, RouterEvent>(mod, "ConnectionAttemptEvent")
.def_readonly("remoteId", &ConnectionAttemptEvent::remoteId);
}
} // namespace tooling

@ -3,13 +3,18 @@
#include "pybind11/iostream.h"
#include <tooling/router_hive.hpp>
#include "router/abstractrouter.hpp"
#include "llarp.hpp"
namespace tooling
{
void
RouterHive_Init(py::module& mod)
{
using RouterHive_ptr = std::shared_ptr<RouterHive>;
using Context_ptr = RouterHive::Context_ptr;
using ContextVisitor = std::function<void(Context_ptr)>;
py::class_<RouterHive, RouterHive_ptr>(mod, "RouterHive")
.def(py::init<>())
.def("AddRelay", &RouterHive::AddRelay)
@ -17,12 +22,34 @@ namespace tooling
.def("StartRelays", &RouterHive::StartRelays)
.def("StartClients", &RouterHive::StartClients)
.def("StopAll", &RouterHive::StopRouters)
.def("ForEachRelay", &RouterHive::ForEachRelay)
.def("ForEachClient", &RouterHive::ForEachClient)
.def("ForEachRouter", &RouterHive::ForEachRouter)
.def(
"ForEachRelay",
[](RouterHive& hive, ContextVisitor visit) {
hive.ForEachRelay([visit](Context_ptr ctx) {
py::gil_scoped_acquire acquire;
visit(std::move(ctx));
});
})
.def(
"ForEachClient",
[](RouterHive& hive, ContextVisitor visit) {
hive.ForEachClient([visit](Context_ptr ctx) {
py::gil_scoped_acquire acquire;
visit(std::move(ctx));
});
})
.def(
"ForEachRouter",
[](RouterHive& hive, ContextVisitor visit) {
hive.ForEachRouter([visit](Context_ptr ctx) {
py::gil_scoped_acquire acquire;
visit(std::move(ctx));
});
})
.def("GetNextEvent", &RouterHive::GetNextEvent)
.def("GetAllEvents", &RouterHive::GetAllEvents)
.def("RelayConnectedRelays", &RouterHive::RelayConnectedRelays)
.def("GetRelayRCs", &RouterHive::GetRelayRCs);
.def("GetRelayRCs", &RouterHive::GetRelayRCs)
.def("GetRelay", &RouterHive::GetRelay);
}
} // namespace tooling

@ -5,10 +5,15 @@ PYBIND11_MODULE(pyllarp, m)
{
tooling::RouterHive_Init(m);
tooling::RouterEvent_Init(m);
llarp::AbstractRouter_Init(m);
tooling::HiveRouter_Init(m);
llarp::PeerDb_Init(m);
llarp::PeerStats_Init(m);
llarp::RouterID_Init(m);
llarp::RouterContact_Init(m);
llarp::CryptoTypes_Init(m);
llarp::Context_Init(m);
tooling::HiveContext_Init(m);
llarp::Config_Init(m);
llarp::dht::DHTTypes_Init(m);
llarp::PathTypes_Init(m);

@ -68,6 +68,8 @@ add_executable(catchAll
util/test_llarp_util_printer.cpp
util/test_llarp_util_str.cpp
util/test_llarp_util_decaying_hashset.cpp
peerstats/test_peer_db.cpp
peerstats/test_peer_types.cpp
config/test_llarp_config_definition.cpp
config/test_llarp_config_output.cpp
net/test_ip_address.cpp

@ -6,39 +6,40 @@
#include <llarp.hpp>
#include <catch2/catch.hpp>
llarp_main*
static const llarp::RuntimeOptions opts = {.background = false, .debug = false, .isRouter = true};
std::shared_ptr<llarp::Context>
make_context()
{
// make config
auto config = new llarp_config();
REQUIRE(config != nullptr);
REQUIRE(config->impl.LoadDefault(true, fs::current_path()));
llarp::Config conf;
conf.LoadDefault(true, fs::current_path());
// set testing defaults
config->impl.network.m_endpointType = "null";
config->impl.bootstrap.skipBootstrap = true;
config->impl.api.m_enableRPCServer = false;
conf.network.m_endpointType = "null";
conf.bootstrap.skipBootstrap = true;
conf.api.m_enableRPCServer = false;
// make a fake inbound link
config->impl.links.m_InboundLinks.emplace_back();
auto& link = config->impl.links.m_InboundLinks.back();
conf.links.m_InboundLinks.emplace_back();
auto& link = conf.links.m_InboundLinks.back();
link.interface = llarp::net::LoopbackInterfaceName();
link.addressFamily = AF_INET;
link.port = 0;
// configure
auto ptr = llarp_main_init_from_config(config, true);
REQUIRE(ptr != nullptr);
llarp_config_free(config);
return ptr;
auto context = std::make_shared<llarp::Context>();
REQUIRE_NOTHROW(context->Configure(std::move(conf)));
return context;
}
TEST_CASE("ensure snode address allocation", "[snode]")
{
llarp::LogSilencer shutup;
auto ctx = make_context();
REQUIRE(llarp_main_setup(ctx, true) == 0);
auto ctx_pp = llarp::Context::Get(ctx);
ctx_pp->CallSafe([ctx_pp]() {
REQUIRE(ctx_pp->router->IsServiceNode());
auto& context = ctx_pp->router->exitContext();
REQUIRE_NOTHROW(ctx->Setup(opts));
ctx->CallSafe([ctx]() {
REQUIRE(ctx->router->IsServiceNode());
auto& context = ctx->router->exitContext();
llarp::PubKey pk;
pk.Randomize();
@ -51,8 +52,9 @@ TEST_CASE("ensure snode address allocation", "[snode]")
REQUIRE(
context.FindEndpointForPath(firstPath)->LocalIP()
== context.FindEndpointForPath(secondPath)->LocalIP());
ctx_pp->CloseAsync();
ctx->CloseAsync();
});
REQUIRE(llarp_main_run(ctx, llarp_main_runtime_opts{.isRelay = true}) == 0);
llarp_main_free(ctx);
REQUIRE(ctx->Run(opts) == 0);
ctx.reset();
}

@ -33,3 +33,15 @@ def HiveArbitrary():
router_hive.Stop()
@pytest.fixture()
def HiveForPeerStats():
router_hive = None
def _make(n_relays, n_clients, netid):
nonlocal router_hive
router_hive = hive.RouterHive(n_relays, n_clients, netid)
router_hive.Start()
return router_hive
yield _make
router_hive.Stop()

@ -65,6 +65,7 @@ class RouterHive(object):
config.router.nickname = "Router%d" % index
config.router.overrideAddress('127.0.0.1:{}'.format(port))
config.router.blockBogons = False
config.router.enablePeerStats = True
config.network.enableProfiling = False
config.network.routerProfilesFile = "%s/profiles.dat" % dirname
@ -80,6 +81,8 @@ class RouterHive(object):
config.api.enableRPCServer = False
config.lokid.whitelistRouters = False
print("adding relay at index %d" % port);
self.hive.AddRelay(config)
@ -110,6 +113,8 @@ class RouterHive(object):
config.api.enableRPCServer = False
config.lokid.whitelistRouters = False
self.hive.AddClient(config)
def InitFirstRC(self):

@ -0,0 +1,96 @@
import pyllarp
from time import time
def test_peer_stats(HiveForPeerStats):
numRelays = 12
hive = HiveForPeerStats(n_relays=numRelays, n_clients=0, netid="hive")
someRouterId = None
def collectStatsForAWhile(duration):
print("collecting router hive stats for {} seconds...", duration)
start_time = time()
cur_time = start_time
# we track the number of attempted sessions and inbound/outbound established sessions
numInbound = 0
numOutbound = 0
numAttempts = 0
nonlocal someRouterId
while cur_time < start_time + duration:
hive.CollectAllEvents()
for event in hive.events:
event_name = event.__class__.__name__
if event_name == "LinkSessionEstablishedEvent":
if event.inbound:
numInbound += 1
else:
numOutbound += 1
if event_name == "ConnectionAttemptEvent":
numAttempts += 1
# we pick an arbitrary router out of our routers
if someRouterId is None:
someRouterId = event.remoteId;
hive.events = []
cur_time = time()
# these should be strictly equal, although there is variation because of
# the time we sample
print("test duration exceeded")
print("in: {} out: {} attempts: {}", numInbound, numOutbound, numAttempts);
totalReceived = tally_rc_received_for_peer(hive.hive, someRouterId)
# every router should have received this relay's RC exactly once
print("total times RC received: {} numRelays: {}", totalReceived, numRelays)
return {
"numInbound": numInbound,
"numOutbound": numOutbound,
"numAttempts": numAttempts,
"totalTargetRCsReceived": totalReceived,
};
results1 = collectStatsForAWhile(30);
assert(results1["totalTargetRCsReceived"] == numRelays)
# stop our router from gossiping
router = hive.hive.GetRelay(someRouterId, True)
router.disableGossiping();
ignore = collectStatsForAWhile(30);
# ensure that no one hears a fresh RC from this router again
print("Starting second (longer) stats collection...")
results2 = collectStatsForAWhile(3600);
assert(results2["totalTargetRCsReceived"] == numRelays) # should not have increased
def tally_rc_received_for_peer(hive, routerId):
numFound = 0
def visit(context):
nonlocal numFound
peerDb = context.getRouterAsHiveRouter().peerDb()
stats = peerDb.getCurrentPeerStats(routerId);
assert(stats.routerId == routerId)
numFound += stats.numDistinctRCsReceived
hive.ForEachRelay(visit)
return numFound;
if __name__ == "__main__":
main()

@ -86,8 +86,9 @@ struct IWPLinkContext
return true;
},
// established handler
[established](llarp::ILinkSession* s) {
[established](llarp::ILinkSession* s, bool linkIsInbound) {
REQUIRE(s != nullptr);
REQUIRE(inbound == linkIsInbound);
established(s);
return true;
},

@ -0,0 +1,217 @@
#include <peerstats/peer_db.hpp>
#include <test_util.hpp>
#include <numeric>
#include <catch2/catch.hpp>
#include "peerstats/types.hpp"
#include "router_contact.hpp"
#include "util/time.hpp"
TEST_CASE("Test PeerDb PeerStats memory storage", "[PeerDb]")
{
const llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0x01);
const llarp::PeerStats empty(id);
llarp::PeerDb db;
CHECK(db.getCurrentPeerStats(id).has_value() == false);
llarp::PeerStats delta(id);
delta.numConnectionAttempts = 4;
delta.peakBandwidthBytesPerSec = 5;
db.accumulatePeerStats(id, delta);
CHECK(* db.getCurrentPeerStats(id) == delta);
delta = llarp::PeerStats(id);
delta.numConnectionAttempts = 5;
delta.peakBandwidthBytesPerSec = 6;
db.accumulatePeerStats(id, delta);
llarp::PeerStats expected(id);
expected.numConnectionAttempts = 9;
expected.peakBandwidthBytesPerSec = 6;
CHECK(* db.getCurrentPeerStats(id) == expected);
}
TEST_CASE("Test PeerDb flush before load", "[PeerDb]")
{
llarp::PeerDb db;
CHECK_THROWS_WITH(db.flushDatabase(), "Cannot flush database before it has been loaded");
}
TEST_CASE("Test PeerDb load twice", "[PeerDb]")
{
llarp::PeerDb db;
CHECK_NOTHROW(db.loadDatabase(std::nullopt));
CHECK_THROWS_WITH(db.loadDatabase(std::nullopt), "Reloading database not supported");
}
TEST_CASE("Test PeerDb nukes stats on load", "[PeerDb]")
{
const llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0x01);
llarp::PeerDb db;
llarp::PeerStats stats(id);
stats.numConnectionAttempts = 1;
db.accumulatePeerStats(id, stats);
CHECK(* db.getCurrentPeerStats(id) == stats);
db.loadDatabase(std::nullopt);
CHECK(db.getCurrentPeerStats(id).has_value() == false);
}
TEST_CASE("Test PeerDb file-backed database reloads properly", "[PeerDb]")
{
const std::string filename = "/tmp/peerdb_test_tmp2.db.sqlite";
const llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0x02);
{
llarp::PeerDb db;
db.loadDatabase(filename);
llarp::PeerStats stats(id);
stats.numConnectionAttempts = 43;
db.accumulatePeerStats(id, stats);
db.flushDatabase();
}
{
llarp::PeerDb db;
db.loadDatabase(filename);
auto stats = db.getCurrentPeerStats(id);
CHECK(stats.has_value() == true);
CHECK(stats->numConnectionAttempts == 43);
}
fs::remove(filename);
}
TEST_CASE("Test PeerDb modifyPeerStats", "[PeerDb]")
{
const llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0xF2);
int numTimesCalled = 0;
llarp::PeerDb db;
db.loadDatabase(std::nullopt);
db.modifyPeerStats(id, [&](llarp::PeerStats& stats) {
numTimesCalled++;
stats.numPathBuilds += 42;
});
db.flushDatabase();
CHECK(numTimesCalled == 1);
auto stats = db.getCurrentPeerStats(id);
CHECK(stats.has_value());
CHECK(stats->numPathBuilds == 42);
}
TEST_CASE("Test PeerDb handleGossipedRC", "[PeerDb]")
{
const llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0xCA);
auto rcLifetime = llarp::RouterContact::Lifetime;
llarp_time_t now = 0s;
llarp::RouterContact rc;
rc.pubkey = llarp::PubKey(id);
rc.last_updated = 10s;
llarp::PeerDb db;
db.handleGossipedRC(rc, now);
auto stats = db.getCurrentPeerStats(id);
CHECK(stats.has_value());
CHECK(stats->leastRCRemainingLifetime == 0ms); // not calculated on first received RC
CHECK(stats->numDistinctRCsReceived == 1);
CHECK(stats->lastRCUpdated == 10000ms);
now = 9s;
db.handleGossipedRC(rc, now);
stats = db.getCurrentPeerStats(id);
CHECK(stats.has_value());
// these values should remain unchanged, this is not a new RC
CHECK(stats->leastRCRemainingLifetime == 0ms);
CHECK(stats->numDistinctRCsReceived == 1);
CHECK(stats->lastRCUpdated == 10000ms);
rc.last_updated = 11s;
db.handleGossipedRC(rc, now);
stats = db.getCurrentPeerStats(id);
// should be (previous expiration time - new received time)
CHECK(stats->leastRCRemainingLifetime == ((10s + rcLifetime) - now));
CHECK(stats->numDistinctRCsReceived == 2);
CHECK(stats->lastRCUpdated == 11000ms);
}
TEST_CASE("Test PeerDb handleGossipedRC expiry calcs", "[PeerDb]")
{
const llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0xF9);
// see comments in peer_db.cpp above PeerDb::handleGossipedRC() for some context around these
// tests and esp. these numbers
const llarp_time_t ref = 48h;
const llarp_time_t rcLifetime = llarp::RouterContact::Lifetime;
// rc1, first rc received
const llarp_time_t s1 = ref;
const llarp_time_t r1 = s1 + 30s;
const llarp_time_t e1 = s1 + rcLifetime;
llarp::RouterContact rc1;
rc1.pubkey = llarp::PubKey(id);
rc1.last_updated = s1;
// rc2, second rc received
// received "healthily", with lots of room to spare before rc1 expires
const llarp_time_t s2 = s1 + 8h;
const llarp_time_t r2 = s2 + 30s; // healthy recv time
const llarp_time_t e2 = s2 + rcLifetime;
llarp::RouterContact rc2;
rc2.pubkey = llarp::PubKey(id);
rc2.last_updated = s2;
// rc3, third rc received
// received "unhealthily" (after rc2 expires)
const llarp_time_t s3 = s2 + 8h;
const llarp_time_t r3 = e2 + 1h; // received after e2
llarp::RouterContact rc3;
rc3.pubkey = llarp::PubKey(id);
rc3.last_updated = s3;
llarp::PeerDb db;
db.handleGossipedRC(rc1, r1);
auto stats1 = db.getCurrentPeerStats(id);
CHECK(stats1.has_value());
CHECK(stats1->leastRCRemainingLifetime == 0ms);
CHECK(stats1->numDistinctRCsReceived == 1);
CHECK(stats1->lastRCUpdated == s1);
db.handleGossipedRC(rc2, r2);
auto stats2 = db.getCurrentPeerStats(id);
CHECK(stats2.has_value());
CHECK(stats2->leastRCRemainingLifetime == (e1 - r2));
CHECK(stats2->leastRCRemainingLifetime > 0ms); // ensure positive indicates healthy
CHECK(stats2->numDistinctRCsReceived == 2);
CHECK(stats2->lastRCUpdated == s2);
db.handleGossipedRC(rc3, r3);
auto stats3 = db.getCurrentPeerStats(id);
CHECK(stats3.has_value());
CHECK(stats3->leastRCRemainingLifetime == (e2 - r3));
CHECK(
stats3->leastRCRemainingLifetime
< 0ms); // ensure negative indicates unhealthy and we use min()
CHECK(stats3->numDistinctRCsReceived == 3);
CHECK(stats3->lastRCUpdated == s3);
}

@ -0,0 +1,77 @@
#include <numeric>
#include <peerstats/types.hpp>
#include <test_util.hpp>
#include <catch2/catch.hpp>
TEST_CASE("Test PeerStats operator+=", "[PeerStats]")
{
llarp::RouterID id = {};
// TODO: test all members
llarp::PeerStats stats(id);
stats.numConnectionAttempts = 1;
stats.peakBandwidthBytesPerSec = 12;
llarp::PeerStats delta(id);
delta.numConnectionAttempts = 2;
delta.peakBandwidthBytesPerSec = 4;
stats += delta;
CHECK(stats.numConnectionAttempts == 3);
CHECK(stats.peakBandwidthBytesPerSec == 12); // should take max(), not add
}
TEST_CASE("Test PeerStats BEncode", "[PeerStats]")
{
llarp::RouterID id = llarp::test::makeBuf<llarp::RouterID>(0x01);
llarp::PeerStats stats(id);
stats.numConnectionAttempts = 1;
stats.numConnectionSuccesses = 2;
stats.numConnectionRejections = 3;
stats.numConnectionTimeouts = 4;
stats.numPathBuilds = 5;
stats.numPacketsAttempted = 6;
stats.numPacketsSent = 7;
stats.numPacketsDropped = 8;
stats.numPacketsResent = 9;
stats.numDistinctRCsReceived = 10;
stats.numLateRCs = 11;
stats.peakBandwidthBytesPerSec = 12.1; // should truncate to 12
stats.longestRCReceiveInterval = 13ms;
stats.leastRCRemainingLifetime = 14ms;
stats.lastRCUpdated = 15ms;
constexpr int bufSize = 4096;
byte_t* raw = new byte_t[bufSize];
llarp_buffer_t buf(raw, bufSize);
CHECK_NOTHROW(stats.BEncode(&buf));
std::string asString = (const char*)raw;
constexpr std::string_view expected =
"d"
"21:numConnectionAttempts" "i1e"
"22:numConnectionSuccesses" "i2e"
"23:numConnectionRejections" "i3e"
"21:numConnectionTimeouts" "i4e"
"13:numPathBuilds" "i5e"
"19:numPacketsAttempted" "i6e"
"14:numPacketsSent" "i7e"
"17:numPacketsDropped" "i8e"
"16:numPacketsResent" "i9e"
"22:numDistinctRCsReceived" "i10e"
"10:numLateRCs" "i11e"
"24:peakBandwidthBytesPerSec" "i12e"
"24:longestRCReceiveInterval" "i13e"
"24:leastRCRemainingLifetime" "i14e"
"13:lastRCUpdated" "i15e"
"e";
CHECK(asString == expected);
delete [] raw;
}

@ -11,13 +11,15 @@ llarp::RuntimeOptions opts = {false, false, false};
static std::shared_ptr<llarp::Context>
make_context(std::optional<fs::path> keyfile)
{
auto context = std::make_shared<llarp::Context>();
context->Configure(opts, {});
llarp::Config conf;
conf.LoadDefault(opts.isRouter, {});
conf.network.m_endpointType = "null";
conf.network.m_keyfile = keyfile;
conf.bootstrap.skipBootstrap = true;
conf.api.m_enableRPCServer = false;
context->config->network.m_endpointType = "null";
context->config->network.m_keyfile = keyfile;
context->config->bootstrap.skipBootstrap = true;
context->config->api.m_enableRPCServer = false;
auto context = std::make_shared<llarp::Context>();
REQUIRE_NOTHROW(context->Configure(std::move(conf)));
return context;
}

Loading…
Cancel
Save