nodedb refactor

* bump zmq static dep
* lokimq -> oxenmq
* llarp_nodedb -> llarp::NodeDB
* remove all crufty api parts of NodeDB
* make NodeDB rc selection api not suck
* make path builder api not suck
* propagate all above changes so that unit tests work and it all compiles
pull/1529/head
Jeff Becker 3 years ago
parent 811b2a3fbf
commit df4ea34a56
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -162,16 +162,16 @@ endif()
option(FORCE_LOKIMQ_SUBMODULE "force using lokimq submodule" OFF)
if(NOT FORCE_LOKIMQ_SUBMODULE)
pkg_check_modules(LOKIMQ liblokimq>=1.2.2)
pkg_check_modules(LOKIMQ liboxenmq>=1.2.2)
endif()
if(LOKIMQ_FOUND)
add_library(lokimq INTERFACE)
link_dep_libs(lokimq INTERFACE "${LOKIMQ_LIBRARY_DIRS}" ${LOKIMQ_LIBRARIES})
target_include_directories(lokimq INTERFACE ${LOKIMQ_INCLUDE_DIRS})
add_library(lokimq::lokimq ALIAS lokimq)
message(STATUS "Found system liblokimq ${LOKIMQ_VERSION}")
add_library(oxenmq INTERFACE)
link_dep_libs(oxenmq INTERFACE "${LOKIMQ_LIBRARY_DIRS}" ${LOKIMQ_LIBRARIES})
target_include_directories(oxenmq INTERFACE ${LOKIMQ_INCLUDE_DIRS})
add_library(oxenmq::oxenmq ALIAS oxenmq)
message(STATUS "Found system liboxenmq ${LOKIMQ_VERSION}")
else()
message(STATUS "using lokimq submodule")
message(STATUS "using oxenmq submodule")
add_subdirectory(${CMAKE_SOURCE_DIR}/external/loki-mq)
endif()

@ -41,11 +41,11 @@ set(SODIUM_SOURCE libsodium-${SODIUM_VERSION}.tar.gz)
set(SODIUM_HASH SHA512=17e8638e46d8f6f7d024fe5559eccf2b8baf23e143fadd472a7d29d228b186d86686a5e6920385fe2020729119a5f12f989c3a782afbd05a8db4819bb18666ef
CACHE STRING "libsodium source hash")
set(ZMQ_VERSION 4.3.3 CACHE STRING "libzmq version")
set(ZMQ_VERSION 4.3.4 CACHE STRING "libzmq version")
set(ZMQ_MIRROR ${LOCAL_MIRROR} https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION}
CACHE STRING "libzmq mirror(s)")
set(ZMQ_SOURCE zeromq-${ZMQ_VERSION}.tar.gz)
set(ZMQ_HASH SHA512=4c18d784085179c5b1fcb753a93813095a12c8d34970f2e1bfca6499be6c9d67769c71c68b7ca54ff181b20390043170e89733c22f76ff1ea46494814f7095b1
set(ZMQ_HASH SHA512=e198ef9f82d392754caadd547537666d4fba0afd7d027749b3adae450516bcf284d241d4616cad3cb4ad9af8c10373d456de92dc6d115b037941659f141e7c0e
CACHE STRING "libzmq source hash")
set(LIBUV_VERSION 1.40.0 CACHE STRING "libuv version")
@ -283,10 +283,7 @@ endif()
if(CMAKE_CROSSCOMPILING AND ARCH_TRIPLET MATCHES mingw)
set(zmq_patch
PATCH_COMMAND ${PROJECT_SOURCE_DIR}/contrib/apply-patches.sh ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-wepoll.patch)
if(ZMQ_VERSION VERSION_LESS 4.3.4)
set(zmq_patch ${zmq_patch} ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-closesocket.patch)
endif()
PATCH_COMMAND ${PROJECT_SOURCE_DIR}/contrib/apply-patches.sh ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-wepoll.patch ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-closesocket.patch)
endif()
build_external(zmq

@ -1,5 +1,5 @@
#!/bin/bash
mkdir -p build-windows
cd build-windows
cmake -G Ninja -DCMAKE_CROSSCOMPILE=ON -DCMAKE_EXE_LINKER_FLAGS=-fstack-protector -DCMAKE_CXX_FLAGS=-fdiagnostics-color=always -DCMAKE_TOOLCHAIN_FILE=../contrib/cross/mingw64.cmake -DBUILD_STATIC_DEPS=ON -DBUILD_PACKAGE=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_TESTING=OFF -DWITH_TESTS=OFF -DNATIVE_BUILD=OFF -DSTATIC_LINK=ON -DWITH_SYSTEMD=OFF -DFORCE_LOKIMQ_SUBMODULE=ON -DSUBMODULE_CHECK=OFF -DWITH_LTO=OFF -DCMAKE_BUILD_TYPE=Release -DCMAKE_CROSSCOMPLING=ON ..
cmake -G Ninja -DCMAKE_EXE_LINKER_FLAGS=-fstack-protector -DCMAKE_CXX_FLAGS=-fdiagnostics-color=always -DCMAKE_TOOLCHAIN_FILE=../contrib/cross/mingw64.cmake -DBUILD_STATIC_DEPS=ON -DBUILD_PACKAGE=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_TESTING=OFF -DWITH_TESTS=OFF -DNATIVE_BUILD=OFF -DSTATIC_LINK=ON -DWITH_SYSTEMD=OFF -DFORCE_LOKIMQ_SUBMODULE=ON -DSUBMODULE_CHECK=OFF -DWITH_LTO=OFF -DCMAKE_BUILD_TYPE=Release ..
ninja package

@ -259,21 +259,21 @@ run_main_context(std::optional<fs::path> confFile, const llarp::RuntimeOptions o
llarp::LogTrace("start of run_main_context()");
try
{
std::unique_ptr<llarp::Config> conf;
std::shared_ptr<llarp::Config> conf;
if (confFile.has_value())
{
llarp::LogInfo("Using config file: ", *confFile);
conf = std::make_unique<llarp::Config>(confFile->parent_path());
conf = std::make_shared<llarp::Config>(confFile->parent_path());
}
else
{
conf = std::make_unique<llarp::Config>(llarp::GetDefaultDataDir());
conf = std::make_shared<llarp::Config>(llarp::GetDefaultDataDir());
}
if (!conf->Load(confFile, opts.isRouter))
throw std::runtime_error{"Config file parsing failed"};
ctx = std::make_shared<llarp::Context>();
ctx->Configure(*conf);
ctx->Configure(std::move(conf));
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);

@ -1,27 +1,31 @@
#ifndef LLARP_HPP
#define LLARP_HPP
#include <llarp.h>
#include <util/fs.hpp>
#include <util/types.hpp>
#include <ev/ev.hpp>
#include <ev/vpn.hpp>
#include <nodedb.hpp>
#include <crypto/crypto.hpp>
#include <router/abstractrouter.hpp>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>
struct llarp_ev_loop;
namespace llarp
{
namespace vpn
{
class Platform;
}
class Logic;
struct Config;
struct RouterContact;
struct Config;
struct Crypto;
struct CryptoManager;
struct AbstractRouter;
struct EventLoop;
class NodeDB;
namespace thread
{
class ThreadPool;
@ -36,12 +40,12 @@ namespace llarp
struct Context
{
std::unique_ptr<Crypto> crypto = nullptr;
std::unique_ptr<CryptoManager> cryptoManager = nullptr;
std::unique_ptr<AbstractRouter> router = nullptr;
std::shared_ptr<Crypto> crypto = nullptr;
std::shared_ptr<CryptoManager> cryptoManager = nullptr;
std::shared_ptr<AbstractRouter> router = nullptr;
std::shared_ptr<Logic> logic = nullptr;
std::unique_ptr<llarp_nodedb> nodedb = nullptr;
llarp_ev_loop_ptr mainloop;
std::shared_ptr<NodeDB> nodedb = nullptr;
std::shared_ptr<EventLoop> mainloop;
std::string nodedb_dir;
virtual ~Context() = default;
@ -49,9 +53,6 @@ namespace llarp
void
Close();
int
LoadDatabase();
void
Setup(const RuntimeOptions& opts);
@ -62,10 +63,8 @@ namespace llarp
HandleSignal(int sig);
/// Configure given the specified config.
///
/// note: consider using std::move() when passing conf in.
void
Configure(Config conf);
Configure(std::shared_ptr<Config> conf);
/// handle SIGHUP
void
@ -93,11 +92,11 @@ namespace llarp
/// Creates a router. Can be overridden to allow a different class of router
/// to be created instead. Defaults to llarp::Router.
virtual std::unique_ptr<AbstractRouter>
makeRouter(llarp_ev_loop_ptr __netloop, std::shared_ptr<Logic> logic);
virtual std::shared_ptr<AbstractRouter>
makeRouter(std::shared_ptr<EventLoop> __netloop, std::shared_ptr<Logic> logic);
/// create the vpn platform for use in creating network interfaces
virtual std::unique_ptr<llarp::vpn::Platform>
virtual std::shared_ptr<llarp::vpn::Platform>
makeVPNPlatform();
protected:

@ -39,7 +39,7 @@ target_link_libraries(lokinet-util PUBLIC
nlohmann_json::nlohmann_json
filesystem
date::date
lokimq
oxenmq
sqlite3
)

@ -21,6 +21,8 @@
#include <iostream>
#include "constants/version.hpp"
namespace lokimq = oxenmq;
namespace llarp
{
// constants for config file default values

@ -23,7 +23,7 @@
#include <vector>
#include <unordered_set>
#include <lokimq/address.h>
#include <oxenmq/address.h>
namespace llarp
{
@ -157,7 +157,7 @@ namespace llarp
{
bool whitelistRouters = false;
fs::path ident_keyfile;
lokimq::address lokidRPCAddr;
oxenmq::address lokidRPCAddr;
void
defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params);

@ -28,16 +28,16 @@ namespace llarp
}
void
Context::Configure(Config conf)
Context::Configure(std::shared_ptr<Config> conf)
{
if (nullptr != config.get())
throw std::runtime_error("Config already exists");
config = std::make_shared<Config>(std::move(conf));
config = std::move(conf);
logic = std::make_shared<Logic>();
nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string();
nodedb_dir = fs::path{config->router.m_dataDir / nodedb_dirname}.string();
}
bool
@ -52,13 +52,6 @@ namespace llarp
return router && router->LooksAlive();
}
int
Context::LoadDatabase()
{
llarp_nodedb::ensure_dir(nodedb_dir.c_str());
return 1;
}
void
Context::Setup(const RuntimeOptions& opts)
{
@ -77,31 +70,25 @@ namespace llarp
mainloop->set_logic(logic);
crypto = std::make_unique<sodium::CryptoLibSodium>();
cryptoManager = std::make_unique<CryptoManager>(crypto.get());
crypto = std::make_shared<sodium::CryptoLibSodium>();
cryptoManager = std::make_shared<CryptoManager>(crypto.get());
router = makeRouter(mainloop, logic);
nodedb = std::make_unique<llarp_nodedb>(
nodedb = std::make_shared<NodeDB>(
nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); });
if (!router->Configure(config, opts.isRouter, nodedb.get()))
if (!router->Configure(config, opts.isRouter, nodedb))
throw std::runtime_error("Failed to configure router");
// must be done after router is made so we can use its disk io worker
// must also be done after configure so that netid is properly set if it
// is provided by config
if (!this->LoadDatabase())
throw std::runtime_error("Config::Setup() failed to load database");
}
std::unique_ptr<AbstractRouter>
Context::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr<Logic> logic)
std::shared_ptr<AbstractRouter>
Context::makeRouter(std::shared_ptr<EventLoop> netloop, std::shared_ptr<Logic> logic)
{
return std::make_unique<Router>(netloop, logic, makeVPNPlatform());
return std::make_shared<Router>(netloop, logic, makeVPNPlatform());
}
std::unique_ptr<vpn::Platform>
std::shared_ptr<vpn::Platform>
Context::makeVPNPlatform()
{
auto plat = vpn::MakeNativePlatform(this);
@ -195,10 +182,10 @@ namespace llarp
config.reset();
llarp::LogDebug("free nodedb");
nodedb.release();
nodedb.reset();
llarp::LogDebug("free router");
router.release();
router.reset();
llarp::LogDebug("free logic");
logic.reset();

@ -7,7 +7,9 @@
#include <iterator>
#include <lokimq/hex.h>
#include <oxenmq/hex.h>
namespace lokimq = oxenmq;
#include <sodium/crypto_sign.h>
#include <sodium/crypto_sign_ed25519.h>

@ -228,7 +228,12 @@ namespace llarp
bool
GetRCFromNodeDB(const Key_t& k, llarp::RouterContact& rc) const override
{
return router->nodedb()->Get(k.as_array(), rc);
if (const auto maybe = router->nodedb()->Get(k.as_array()); maybe.has_value())
{
rc = *maybe;
return true;
}
return false;
}
PendingIntrosetLookups _pendingIntrosetLookups;

@ -106,7 +106,7 @@ namespace llarp
}
auto closestRCs =
dht.GetRouter()->nodedb()->FindClosestTo(location, IntroSetStorageRedundancy);
dht.GetRouter()->nodedb()->FindManyClosestTo(location, IntroSetStorageRedundancy);
if (closestRCs.size() <= relayOrder)
{

@ -1,5 +1,5 @@
#include <dht/messages/findname.hpp>
#include <lokimq/bt_serialize.h>
#include <oxenmq/bt_serialize.h>
#include <dht/context.hpp>
#include <dht/messages/gotname.hpp>
#include <router/abstractrouter.hpp>
@ -7,6 +7,8 @@
#include <path/path_context.hpp>
#include <routing/dht_message.hpp>
namespace lokimq = oxenmq;
namespace llarp::dht
{
FindNameMessage::FindNameMessage(const Key_t& from, Key_t namehash, uint64_t txid)

@ -1,9 +1,11 @@
#include <dht/messages/gotname.hpp>
#include <lokimq/bt_serialize.h>
#include <oxenmq/bt_serialize.h>
#include <dht/context.hpp>
#include <router/abstractrouter.hpp>
#include <path/path_context.hpp>
namespace lokimq = oxenmq;
namespace llarp::dht
{
constexpr size_t NameSizeLimit = 128;

@ -86,7 +86,8 @@ namespace llarp
}
// identify closest 4 routers
auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(addr, IntroSetStorageRedundancy);
auto closestRCs =
dht.GetRouter()->nodedb()->FindManyClosestTo(addr, IntroSetStorageRedundancy);
if (closestRCs.size() != IntroSetStorageRedundancy)
{
llarp::LogWarn("Received PublishIntroMessage but only know ", closestRCs.size(), " nodes");

@ -81,7 +81,7 @@ namespace llarp::vpn
};
/// create native vpn platform
std::unique_ptr<Platform>
std::shared_ptr<Platform>
MakeNativePlatform(llarp::Context* ctx);
} // namespace llarp::vpn

@ -70,30 +70,10 @@ namespace llarp
m_SnodeBlacklist.insert(std::move(snode));
}
bool
BaseSession::SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
llarp::path::PathRole roles)
std::optional<std::vector<RouterContact>>
BaseSession::GetHopsForBuild()
{
std::set<RouterID> exclude = prev;
for (const auto& snode : m_SnodeBlacklist)
{
if (snode != m_ExitRouter)
exclude.insert(snode);
}
exclude.insert(m_ExitRouter);
if (hop == numHops - 1)
{
if (db->Get(m_ExitRouter, cur))
return true;
m_router->LookupRouter(m_ExitRouter, nullptr);
return false;
}
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
return GetHopsAlignedToForBuild(m_ExitRouter);
}
bool
@ -306,9 +286,8 @@ namespace llarp
if (numHops == 1)
{
auto r = m_router;
RouterContact rc;
if (r->nodedb()->Get(m_ExitRouter, rc))
r->TryConnectAsync(rc, 5);
if (const auto maybe = r->nodedb()->Get(m_ExitRouter); maybe.has_value())
r->TryConnectAsync(*maybe, 5);
else
r->LookupRouter(m_ExitRouter, [r](const std::vector<RouterContact>& results) {
if (results.size())

@ -67,13 +67,8 @@ namespace llarp
bool
CheckPathDead(path::Path_ptr p, llarp_time_t dlt);
bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
llarp::path::PathRole roles) override;
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
bool
ShouldBuildMore(llarp_time_t now) const override;

@ -305,20 +305,6 @@ namespace llarp
self->hop->info.upstream, self->hop->ExpireTime() + 10s);
// put hop
self->context->PutTransitHop(self->hop);
// if we have an rc for this hop...
if (self->record.nextRC)
{
// ... and it matches the next hop ...
if (self->record.nextHop == self->record.nextRC->pubkey)
{
// ... and it's valid
const auto now = self->context->Router()->Now();
if (self->record.nextRC->IsPublicRouter() && self->record.nextRC->Verify(now))
{
self->context->Router()->nodedb()->UpdateAsyncIfNewer(*self->record.nextRC.get());
}
}
}
// forward to next hop
using std::placeholders::_1;
auto func = std::bind(

@ -408,10 +408,10 @@ namespace llarp::net
if (parts[1].find_first_not_of('0') == std::string::npos and parts[0] != ifname)
{
const auto& ip = parts[2];
if ((ip.size() == sizeof(uint32_t) * 2) and lokimq::is_hex(ip))
if ((ip.size() == sizeof(uint32_t) * 2) and oxenmq::is_hex(ip))
{
huint32_t x{};
lokimq::from_hex(ip.begin(), ip.end(), reinterpret_cast<char*>(&x.h));
oxenmq::from_hex(ip.begin(), ip.end(), reinterpret_cast<char*>(&x.h));
gateways.emplace_back(x.ToString());
}
}

@ -19,490 +19,255 @@
static const char skiplist_subdirs[] = "0123456789abcdef";
static const std::string RC_FILE_EXT = ".signed";
llarp_nodedb::NetDBEntry::NetDBEntry(llarp::RouterContact value)
: rc(std::move(value)), inserted(llarp::time_now_ms())
{}
bool
llarp_nodedb::Remove(const llarp::RouterID& pk)
{
bool removed = false;
RemoveIf([&](const llarp::RouterContact& rc) -> bool {
if (rc.pubkey == pk)
{
removed = true;
return true;
}
return false;
});
return removed;
}
void
llarp_nodedb::Clear()
namespace llarp
{
llarp::util::Lock lock(access);
entries.clear();
}
NodeDB::Entry::Entry(RouterContact value) : rc(std::move(value)), insertedAt(llarp::time_now_ms())
{}
bool
llarp_nodedb::Get(const llarp::RouterID& pk, llarp::RouterContact& result)
{
llarp::util::Lock l(access);
auto itr = entries.find(pk);
if (itr == entries.end())
return false;
result = itr->second.rc;
return true;
}
void
llarp_nodedb::RemoveIf(std::function<bool(const llarp::RouterContact& rc)> filter)
{
std::set<std::string> files;
static void
EnsureSkiplist(fs::path nodedbDir)
{
llarp::util::Lock l(access);
auto itr = entries.begin();
while (itr != entries.end())
if (not fs::exists(nodedbDir))
{
if (filter(itr->second.rc))
{
files.insert(getRCFilePath(itr->second.rc.pubkey));
itr = entries.erase(itr);
}
// if the old 'netdb' directory exists, move it to this one
fs::path parent = nodedbDir.parent_path();
fs::path old = parent / "netdb";
if (fs::exists(old))
fs::rename(old, nodedbDir);
else
++itr;
fs::create_directory(nodedbDir);
}
}
disk([files = std::move(files)]() {
for (const auto& file : files)
fs::remove(file);
});
}
bool
llarp_nodedb::Has(const llarp::RouterID& pk)
{
llarp::util::Lock lock(access);
return entries.find(pk) != entries.end();
}
llarp::RouterContact
llarp_nodedb::FindClosestTo(const llarp::dht::Key_t& location)
{
llarp::RouterContact rc;
const llarp::dht::XorMetric compare(location);
visit([&rc, compare](const auto& otherRC) -> bool {
if (rc.pubkey.IsZero())
{
rc = otherRC;
return true;
}
if (compare(
llarp::dht::Key_t{otherRC.pubkey.as_array()}, llarp::dht::Key_t{rc.pubkey.as_array()}))
rc = otherRC;
return true;
});
return rc;
}
std::vector<llarp::RouterContact>
llarp_nodedb::FindClosestTo(const llarp::dht::Key_t& location, uint32_t numRouters)
{
llarp::util::Lock lock(access);
std::vector<const llarp::RouterContact*> all;
all.reserve(entries.size());
for (auto& entry : entries)
{
all.push_back(&entry.second.rc);
}
if (not fs::is_directory(nodedbDir))
throw std::runtime_error(llarp::stringify("nodedb ", nodedbDir, " is not a directory"));
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end();
std::partial_sort(
all.begin(),
it_mid,
all.end(),
[compare = llarp::dht::XorMetric{location}](auto* a, auto* b) { return compare(*a, *b); });
std::vector<llarp::RouterContact> closest;
closest.reserve(numRouters);
for (auto it = all.begin(); it != it_mid; ++it)
closest.push_back(**it);
return closest;
}
/// skiplist directory is hex encoded first nibble
/// skiplist filename is <base32encoded>.snode.signed
std::string
llarp_nodedb::getRCFilePath(const llarp::RouterID& pubkey) const
{
std::string hexString = lokimq::to_hex(pubkey.begin(), pubkey.end());
std::string skiplistDir;
llarp::RouterID r(pubkey);
std::string fname = r.ToString();
skiplistDir += hexString[0];
fname += RC_FILE_EXT;
fs::path filepath = nodePath / skiplistDir / fname;
return filepath.string();
}
void
llarp_nodedb::InsertAsync(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> logic,
std::function<void(void)> completionHandler)
{
disk([this, rc, logic, completionHandler]() {
this->Insert(rc);
if (logic && completionHandler)
for (const char& ch : skiplist_subdirs)
{
LogicCall(logic, completionHandler);
// this seems to be a problem on all targets
// perhaps cpp17::fs is just as screwed-up
// attempting to create a folder with no name
// what does this mean...?
if (!ch)
continue;
fs::path sub = nodedbDir / std::string(&ch, 1);
fs::create_directory(sub);
}
});
}
bool
llarp_nodedb::UpdateAsyncIfNewer(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> logic,
std::function<void(void)> completionHandler)
{
llarp::util::Lock lock(access);
auto itr = entries.find(rc.pubkey);
if (itr == entries.end() || itr->second.rc.OtherIsNewer(rc))
{
InsertAsync(rc, logic, completionHandler);
return true;
}
if (itr != entries.end())
{
// insertion time is set on...insertion. But it should be updated here
// even if there is no insertion of a new RC, to show that the existing one
// is not "stale"
itr->second.inserted = llarp::time_now_ms();
}
return false;
}
/// insert
bool
llarp_nodedb::Insert(const llarp::RouterContact& rc)
{
llarp::util::Lock lock(access);
auto itr = entries.find(rc.pubkey.as_array());
if (itr != entries.end())
entries.erase(itr);
entries.emplace(rc.pubkey.as_array(), rc);
LogDebug(
"Added or updated RC for ",
llarp::RouterID(rc.pubkey),
" to nodedb. Current nodedb count is: ",
entries.size());
return true;
}
ssize_t
llarp_nodedb::Load(const fs::path& path)
{
std::error_code ec;
if (!fs::exists(path, ec))
{
return -1;
}
ssize_t loaded = 0;
constexpr auto FlushInterval = 5min;
for (const char& ch : skiplist_subdirs)
NodeDB::NodeDB(fs::path root, std::function<void(std::function<void()>)> diskCaller)
: m_Root{std::move(root)}
, disk(std::move(diskCaller))
, m_NextFlushAt{time_now_ms() + FlushInterval}
{
if (!ch)
continue;
std::string p;
p += ch;
fs::path sub = path / p;
ssize_t l = loadSubdir(sub);
if (l > 0)
loaded += l;
EnsureSkiplist(m_Root);
}
m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
return loaded;
}
void
llarp_nodedb::SaveAll()
{
std::array<byte_t, MAX_RC_SIZE> tmp;
llarp::util::Lock lock(access);
for (const auto& item : entries)
void
NodeDB::Tick(llarp_time_t now)
{
llarp_buffer_t buf(tmp);
if (!item.second.rc.BEncode(&buf))
continue;
buf.sz = buf.cur - buf.base;
const auto filepath = getRCFilePath(item.second.rc.pubkey);
auto optional_ofs = llarp::util::OpenFileStream<std::ofstream>(
filepath, std::ofstream::out | std::ofstream::binary | std::ofstream::trunc);
if (!optional_ofs)
continue;
auto& ofs = *optional_ofs;
ofs.write((char*)buf.base, buf.sz);
ofs.flush();
ofs.close();
if (now > m_NextFlushAt)
{
m_NextFlushAt += FlushInterval;
// make copy of all rcs
std::vector<RouterContact> copy;
for (const auto& item : m_Entries)
copy.push_back(item.second.rc);
// flush them to disk in one big job
// TODO: split this up? idk maybe some day...
disk([this, data = std::move(copy)]() {
for (const auto& rc : data)
{
rc.Write(GetPathForPubkey(rc.pubkey));
}
});
}
}
}
bool
llarp_nodedb::ShouldSaveToDisk(llarp_time_t now) const
{
if (now == 0s)
now = llarp::time_now_ms();
return m_NextSaveToDisk > 0s && m_NextSaveToDisk <= now;
}
fs::path
NodeDB::GetPathForPubkey(RouterID pubkey) const
{
std::string hexString = oxenmq::to_hex(pubkey.begin(), pubkey.end());
std::string skiplistDir;
void
llarp_nodedb::AsyncFlushToDisk()
{
disk([this]() { SaveAll(); });
m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
}
const llarp::RouterID r{pubkey};
std::string fname = r.ToString();
ssize_t
llarp_nodedb::loadSubdir(const fs::path& dir)
{
ssize_t sz = 0;
llarp::util::IterDir(dir, [&](const fs::path& f) -> bool {
if (fs::is_regular_file(f) && loadfile(f))
sz++;
return true;
});
return sz;
}
bool
llarp_nodedb::loadfile(const fs::path& fpath)
{
if (fpath.extension() != RC_FILE_EXT)
return false;
llarp::RouterContact rc;
if (!rc.Read(fpath))
{
llarp::LogError("failed to read file ", fpath);
return false;
skiplistDir += hexString[0];
fname += RC_FILE_EXT;
return m_Root / skiplistDir / fname;
}
if (!rc.Verify(llarp::time_now_ms()))
void
NodeDB::LoadFromDisk()
{
llarp::LogError(fpath, " contains invalid RC");
return false;
for (const char& ch : skiplist_subdirs)
{
if (!ch)
continue;
std::string p;
p += ch;
fs::path sub = m_Root / p;
llarp::util::IterDir(sub, [&](const fs::path& f) -> bool {
if (fs::is_regular_file(f) and f.extension() == RC_FILE_EXT)
{
RouterContact rc{};
if (rc.Read(f) and rc.Verify(time_now_ms()))
m_Entries.emplace(rc.pubkey, rc);
}
return true;
});
}
}
void
NodeDB::SaveToDisk() const
{
llarp::util::Lock lock(access);
entries.emplace(rc.pubkey.as_array(), rc);
for (const auto& item : m_Entries)
{
item.second.rc.Write(GetPathForPubkey(item.first));
}
}
return true;
}
void
llarp_nodedb::visit(std::function<bool(const llarp::RouterContact&)> visit)
{
llarp::util::Lock lock(access);
auto itr = entries.begin();
while (itr != entries.end())
bool
NodeDB::Has(RouterID pk) const
{
if (!visit(itr->second.rc))
return;
++itr;
util::NullLock lock{m_Access};
return m_Entries.find(pk) != m_Entries.end();
}
}
void
llarp_nodedb::VisitInsertedBefore(
std::function<void(const llarp::RouterContact&)> visit, llarp_time_t insertedAfter)
{
llarp::util::Lock lock(access);
auto itr = entries.begin();
while (itr != entries.end())
std::optional<RouterContact>
NodeDB::Get(RouterID pk) const
{
if (itr->second.inserted < insertedAfter)
visit(itr->second.rc);
++itr;
util::NullLock lock{m_Access};
const auto itr = m_Entries.find(pk);
if (itr == m_Entries.end())
return std::nullopt;
return itr->second.rc;
}
}
void
llarp_nodedb::RemoveStaleRCs(const std::set<llarp::RouterID>& keep, llarp_time_t cutoff)
{
std::set<llarp::RouterID> removeStale;
// remove stale routers
VisitInsertedBefore(
[&](const llarp::RouterContact& rc) {
if (keep.find(rc.pubkey) != keep.end())
return;
LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey));
removeStale.insert(rc.pubkey);
},
cutoff);
RemoveIf([&removeStale](const llarp::RouterContact& rc) -> bool {
return removeStale.count(rc.pubkey) > 0;
});
}
// write it to disk
void
disk_threadworker_setRC(llarp_async_verify_rc* verify_request)
{
verify_request->valid = verify_request->nodedb->Insert(verify_request->rc);
if (verify_request->logic)
void
NodeDB::Remove(RouterID pk)
{
LogicCall(verify_request->logic, [verify_request]() {
if (verify_request->hook)
verify_request->hook(verify_request);
});
util::NullLock lock{m_Access};
m_Entries.erase(pk);
AsyncRemoveManyFromDisk({pk});
}
}
// we run the crypto verify in the crypto threadpool worker
void
crypto_threadworker_verifyrc(llarp_async_verify_rc* verify_request)
{
llarp::RouterContact rc = verify_request->rc;
verify_request->valid = rc.Verify(llarp::time_now_ms());
// if it's valid we need to set it
if (verify_request->valid && rc.IsPublicRouter())
void
NodeDB::RemoveStaleRCs(std::unordered_set<RouterID> keep, llarp_time_t cutoff)
{
if (verify_request->disk)
util::NullLock lock{m_Access};
std::unordered_set<RouterID> removed;
auto itr = m_Entries.begin();
while (itr != m_Entries.end())
{
llarp::LogDebug("RC is valid, saving to disk");
verify_request->disk(std::bind(&disk_threadworker_setRC, verify_request));
return;
if (itr->second.insertedAt < cutoff and keep.count(itr->second.rc.pubkey) == 0)
{
removed.insert(itr->second.rc.pubkey);
itr = m_Entries.erase(itr);
}
else
++itr;
}
if (not removed.empty())
AsyncRemoveManyFromDisk(std::move(removed));
}
// callback to logic thread
LogicCall(verify_request->logic, [verify_request]() {
if (verify_request->hook)
verify_request->hook(verify_request);
});
}
void
llarp_nodedb_async_verify(struct llarp_async_verify_rc* job)
{
job->worker(std::bind(&crypto_threadworker_verifyrc, job));
}
void
llarp_nodedb::ensure_dir(const fs::path& nodedbDir)
{
if (not fs::exists(nodedbDir))
void
NodeDB::Put(RouterContact rc)
{
// if the old 'netdb' directory exists, move it to this one
fs::path parent = nodedbDir.parent_path();
fs::path old = parent / "netdb";
if (fs::exists(old))
fs::rename(old, nodedbDir);
else
fs::create_directory(nodedbDir);
util::NullLock lock{m_Access};
m_Entries.erase(rc.pubkey);
m_Entries.emplace(rc.pubkey, rc);
}
if (not fs::is_directory(nodedbDir))
throw std::runtime_error(llarp::stringify("nodedb ", nodedbDir, " is not a directory"));
for (const char& ch : skiplist_subdirs)
size_t
NodeDB::NumLoaded() const
{
// this seems to be a problem on all targets
// perhaps cpp17::fs is just as screwed-up
// attempting to create a folder with no name
// what does this mean...?
if (!ch)
continue;
fs::path sub = nodedbDir / std::string(&ch, 1);
fs::create_directory(sub);
util::NullLock lock{m_Access};
return m_Entries.size();
}
}
ssize_t
llarp_nodedb::LoadAll()
{
return Load(nodePath.c_str());
}
size_t
llarp_nodedb::num_loaded() const
{
llarp::util::Lock l{access};
return entries.size();
}
bool
llarp_nodedb::select_random_exit(llarp::RouterContact& result)
{
llarp::util::Lock lock(access);
const auto sz = entries.size();
auto itr = entries.begin();
if (sz < 3)
return false;
auto idx = llarp::randint() % sz;
if (idx)
std::advance(itr, idx - 1);
while (itr != entries.end())
void
NodeDB::PutIfNewer(RouterContact rc)
{
if (itr->second.rc.IsExit())
util::NullLock lock{m_Access};
auto itr = m_Entries.find(rc.pubkey);
if (itr == m_Entries.end() or itr->second.rc.OtherIsNewer(rc))
{
result = itr->second.rc;
return true;
// delete if existing
if (itr != m_Entries.end())
m_Entries.erase(itr);
// add new entry
m_Entries.emplace(rc.pubkey, rc);
}
++itr;
}
// wrap around
itr = entries.begin();
while (idx--)
void
NodeDB::AsyncRemoveManyFromDisk(std::unordered_set<RouterID> remove) const
{
if (itr->second.rc.IsExit())
// build file list
std::set<fs::path> files;
for (auto id : remove)
{
result = itr->second.rc;
return true;
files.emplace(GetPathForPubkey(std::move(id)));
}
++itr;
// remove them from the disk via the diskio thread
disk([files]() {
for (auto fpath : files)
fs::remove(fpath);
});
}
return false;
}
bool
llarp_nodedb::select_random_hop_excluding(
llarp::RouterContact& result, const std::set<llarp::RouterID>& exclude)
{
llarp::util::Lock lock(access);
/// checking for "guard" status for N = 0 is done by caller inside of
/// pathbuilder's scope
const size_t sz = entries.size();
if (sz < 3)
llarp::RouterContact
NodeDB::FindClosestTo(llarp::dht::Key_t location) const
{
return false;
util::NullLock lock{m_Access};
llarp::RouterContact rc;
const llarp::dht::XorMetric compare(location);
VisitAll([&rc, compare](const auto& otherRC) {
if (rc.pubkey.IsZero())
{
rc = otherRC;
return;
}
if (compare(
llarp::dht::Key_t{otherRC.pubkey.as_array()},
llarp::dht::Key_t{rc.pubkey.as_array()}))
rc = otherRC;
});
return rc;
}
const size_t pos = llarp::randint() % sz;
const auto start = std::next(entries.begin(), pos);
for (auto itr = start; itr != entries.end(); ++itr)
std::vector<RouterContact>
NodeDB::FindManyClosestTo(llarp::dht::Key_t location, uint32_t numRouters) const
{
if (exclude.count(itr->first) == 0 and itr->second.rc.IsPublicRouter())
{
result = itr->second.rc;
return true;
}
}
for (auto itr = entries.begin(); itr != start; ++itr)
{
if (exclude.count(itr->first) == 0 and itr->second.rc.IsPublicRouter())
util::NullLock lock{m_Access};
std::vector<const RouterContact*> all;
const auto& entries = m_Entries;
all.reserve(entries.size());
for (auto& entry : entries)
{
result = itr->second.rc;
return true;
all.push_back(&entry.second.rc);
}
auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end();
std::partial_sort(
all.begin(), it_mid, all.end(), [compare = dht::XorMetric{location}](auto* a, auto* b) {
return compare(*a, *b);
});
std::vector<RouterContact> closest;
closest.reserve(numRouters);
for (auto it = all.begin(); it != it_mid; ++it)
closest.push_back(**it);
return closest;
}
return false;
}
} // namespace llarp

@ -8,221 +8,174 @@
#include <util/thread/threading.hpp>
#include <util/thread/annotations.hpp>
#include <dht/key.hpp>
#include <crypto/crypto.hpp>
#include <set>
#include <optional>
#include <unordered_set>
#include <unordered_map>
#include <utility>
#include <atomic>
/**
* nodedb.hpp
*
* persistent storage API for router contacts
*/
namespace llarp
{
class Logic;
} // namespace llarp
struct llarp_nodedb
{
using DiskJob_t = std::function<void(void)>;
using DiskCaller_t = std::function<void(DiskJob_t)>;
using WorkJob_t = std::function<void(void)>;
using WorkCaller_t = std::function<void(WorkJob_t)>;
explicit llarp_nodedb(const std::string rootdir, DiskCaller_t diskCaller)
: disk(std::move(diskCaller)), nodePath(rootdir)
{}
~llarp_nodedb()
class NodeDB
{
Clear();
}
const DiskCaller_t disk;
mutable llarp::util::Mutex access; // protects entries
/// time for next save to disk event, 0 if never happened
llarp_time_t m_NextSaveToDisk = 0s;
/// how often to save to disk
const llarp_time_t m_SaveInterval = 5min;
struct NetDBEntry
{
const llarp::RouterContact rc;
llarp_time_t inserted;
NetDBEntry(llarp::RouterContact data);
struct Entry
{
const RouterContact rc;
llarp_time_t insertedAt;
explicit Entry(RouterContact rc);
};
using NodeMap = std::unordered_map<RouterID, Entry>;
NodeMap m_Entries;
const fs::path m_Root;
const std::function<void(std::function<void()>)> disk;
llarp_time_t m_NextFlushAt;
mutable util::NullMutex m_Access;
/// asynchronously remove the files for a set of rcs on disk given their public ident key
void
AsyncRemoveManyFromDisk(std::unordered_set<RouterID> idents) const;
/// get filename of an RC file given its public ident key
fs::path
GetPathForPubkey(RouterID pk) const;
public:
explicit NodeDB(fs::path rootdir, std::function<void(std::function<void()>)> diskCaller);
/// load all entries from disk syncrhonously
void
LoadFromDisk();
/// explicit save all RCs to disk synchronously
void
SaveToDisk() const;
/// the number of RCs that are loaded from disk
size_t
NumLoaded() const;
/// do periodic tasks like flush to disk and expiration
void
Tick(llarp_time_t now);
/// find the absolute closets router to a dht location
RouterContact
FindClosestTo(dht::Key_t location) const;
/// find many routers closest to dht key
std::vector<RouterContact>
FindManyClosestTo(dht::Key_t location, uint32_t numRouters) const;
/// return true if we have an rc by its ident pubkey
bool
Has(RouterID pk) const;
/// maybe get an rc by its ident pubkey
std::optional<RouterContact>
Get(RouterID pk) const;
template <typename Filter>
std::optional<RouterContact>
GetIf(Filter visit, bool shuffle = true) const
{
util::NullLock lock{m_Access};
const auto sz = m_Entries.size();
if (sz < 3)
return std::nullopt;
auto begin = m_Entries.begin();
auto start = begin;
if (shuffle)
{
start = std::next(start, randint() % sz);
}
for (auto itr = start; itr != m_Entries.end(); ++itr)
{
if (visit(itr->second.rc))
return itr->second.rc;
}
if (shuffle)
{
for (auto itr = begin; begin != start; ++itr)
{
if (visit(itr->second.rc))
return itr->second.rc;
}
}
return std::nullopt;
}
/// visit all entries
template <typename Visit>
void
VisitAll(Visit visit) const
{
util::NullLock lock{m_Access};
for (const auto& item : m_Entries)
{
visit(item.second.rc);
}
}
/// visit all entries inserted before a timestamp
template <typename Visit>
void
VisitInsertedBefore(Visit visit, llarp_time_t insertedBefore)
{
util::NullLock lock{m_Access};
for (const auto& item : m_Entries)
{
if (item.second.insertedAt < insertedBefore)
visit(item.second.rc);
}
}
/// remove an entry via its ident pubkey
void
Remove(RouterID pk);
/// remove an entry given a filter that inspects the rc
template <typename Filter>
void
RemoveIf(Filter visit)
{
util::NullLock lock{m_Access};
std::unordered_set<RouterID> removed;
auto itr = m_Entries.begin();
while (itr != m_Entries.end())
{
if (visit(itr->second.rc))
{
removed.insert(itr->second.rc.pubkey);
itr = m_Entries.erase(itr);
}
else
++itr;
}
if (not removed.empty())
AsyncRemoveManyFromDisk(std::move(removed));
}
/// remove rcs that are not in keep and have been inserted before cutoff
void
RemoveStaleRCs(std::unordered_set<RouterID> keep, llarp_time_t cutoff);
/// put this rc into the cache if it is not there or newer than the one there already
void
PutIfNewer(RouterContact rc);
/// unconditional put of rc into cache
void
Put(RouterContact rc);
};
using NetDBMap_t = std::unordered_map<llarp::RouterID, NetDBEntry, llarp::RouterID::Hash>;
NetDBMap_t entries GUARDED_BY(access);
fs::path nodePath;
llarp::RouterContact
FindClosestTo(const llarp::dht::Key_t& location);
/// find the $numRouters closest routers to the given DHT key
std::vector<llarp::RouterContact>
FindClosestTo(const llarp::dht::Key_t& location, uint32_t numRouters);
/// return true if we should save our nodedb to disk
bool
ShouldSaveToDisk(llarp_time_t now = 0s) const;
bool
Remove(const llarp::RouterID& pk) EXCLUDES(access);
void
RemoveIf(std::function<bool(const llarp::RouterContact&)> filter) EXCLUDES(access);
void
Clear() EXCLUDES(access);
bool
Get(const llarp::RouterID& pk, llarp::RouterContact& result) EXCLUDES(access);
bool
Has(const llarp::RouterID& pk) EXCLUDES(access);
std::string
getRCFilePath(const llarp::RouterID& pubkey) const;
/// insert without writing to disk
bool
Insert(const llarp::RouterContact& rc) EXCLUDES(access);
/// invokes Insert() asynchronously with an optional completion
/// callback
void
InsertAsync(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> l = nullptr,
std::function<void(void)> completionHandler = nullptr);
/// update rc if newer
/// return true if we started to put this rc in the database
/// retur false if not newer
bool
UpdateAsyncIfNewer(
llarp::RouterContact rc,
std::shared_ptr<llarp::Logic> l = nullptr,
std::function<void(void)> completionHandler = nullptr) EXCLUDES(access);
ssize_t
Load(const fs::path& path);
ssize_t
loadSubdir(const fs::path& dir);
/// save all entries to disk async
void
AsyncFlushToDisk();
bool
loadfile(const fs::path& fpath) EXCLUDES(access);
void
visit(std::function<bool(const llarp::RouterContact&)> visit) EXCLUDES(access);
void
set_dir(const char* dir);
ssize_t
LoadAll();
ssize_t
store_dir(const char* dir);
/// visit all entries inserted into nodedb cache before a timestamp
void
VisitInsertedBefore(
std::function<void(const llarp::RouterContact&)> visit, llarp_time_t insertedAfter)
EXCLUDES(access);
void
RemoveStaleRCs(const std::set<llarp::RouterID>& keep, llarp_time_t cutoff);
size_t
num_loaded() const EXCLUDES(access);
bool
select_random_exit(llarp::RouterContact& rc) EXCLUDES(access);
bool
select_random_hop_excluding(
llarp::RouterContact& result, const std::set<llarp::RouterID>& exclude) EXCLUDES(access);
/// Ensures that the given nodedb 'dir' exists
///
/// @param nodedbDir should be the desired nodedb directory
/// @throws on any filesistem error or if `nodedbDir` exists and is not a directory
static void
ensure_dir(const fs::path& nodedbDir);
void
SaveAll() EXCLUDES(access);
};
/// struct for async rc verification
struct llarp_async_verify_rc;
using llarp_async_verify_rc_hook_func = std::function<void(struct llarp_async_verify_rc*)>;
/// verify rc request
struct llarp_async_verify_rc
{
/// async_verify_context
void* user;
/// nodedb storage
llarp_nodedb* nodedb;
// llarp::Logic for queue_job
std::shared_ptr<llarp::Logic> logic;
llarp_nodedb::WorkCaller_t worker;
llarp_nodedb::DiskCaller_t disk;
/// router contact
llarp::RouterContact rc;
/// result
bool valid;
/// hook
llarp_async_verify_rc_hook_func hook;
};
/**
struct for async rc verification
data is loaded in disk io threadpool
crypto is done on the crypto worker threadpool
result is called on the logic thread
*/
void
llarp_nodedb_async_verify(struct llarp_async_verify_rc* job);
struct llarp_async_load_rc;
using llarp_async_load_rc_hook_func = std::function<void(struct llarp_async_load_rc*)>;
struct llarp_async_load_rc
{
/// async_verify_context
void* user;
/// nodedb storage
llarp_nodedb* nodedb;
/// llarp::Logic for calling hook
llarp::Logic* logic;
/// disk worker threadpool
llarp_nodedb::DiskCaller_t disk;
/// target pubkey
llarp::PubKey pubkey;
/// router contact result
llarp::RouterContact result;
/// set to true if we loaded the rc
bool loaded;
/// hook function called in logic thread
llarp_async_load_rc_hook_func hook;
};
/// asynchronously load an rc from disk
void
llarp_nodedb_async_load_rc(struct llarp_async_load_rc* job);
} // namespace llarp
#endif

@ -205,56 +205,53 @@ namespace llarp
return obj;
}
bool
Builder::SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& exclude,
RouterContact& cur,
size_t hop,
PathRole roles)
std::optional<RouterContact>
Builder::SelectFirstHop() const
{
(void)roles;
size_t tries = 10;
if (hop == 0)
{
if (m_router->NumberOfConnectedRouters() == 0)
{
return false;
}
bool got = false;
m_router->ForEachPeer(
[&](const ILinkSession* s, bool isOutbound) {
if (s && s->IsEstablished() && isOutbound && !got)
{
const RouterContact rc = s->GetRemoteRC();
#ifdef TESTNET
if (got || exclude.count(rc.pubkey))
#else
if (got || exclude.count(rc.pubkey) || m_router->IsBootstrapNode(rc.pubkey))
std::optional<RouterContact> found = std::nullopt;
m_router->ForEachPeer(
[&](const ILinkSession* s, bool isOutbound) {
if (s && s->IsEstablished() && isOutbound && not found.has_value())
{
const RouterContact rc = s->GetRemoteRC();
#ifndef TESTNET
if (m_router->IsBootstrapNode(rc.pubkey))
return;
#endif
return;
cur = rc;
got = true;
}
},
true);
return got;
}
found = rc;
}
},
true);
return found;
}
do
std::optional<std::vector<RouterContact>>
Builder::GetHopsForBuild()
{
std::vector<RouterContact> hops;
{
cur.Clear();
--tries;
std::set<RouterID> excluding = exclude;
if (db->select_random_hop_excluding(cur, excluding))
{
excluding.insert(cur.pubkey);
if (!m_router->routerProfiling().IsBadForPath(cur.pubkey))
return true;
}
} while (tries > 0);
return false;
const auto maybe = SelectFirstHop();
if (not maybe.has_value())
return std::nullopt;
hops.emplace_back(*maybe);
};
for (size_t idx = hops.size(); idx < numHops; ++idx)
{
const auto maybe = m_router->nodedb()->GetIf([&hops, r = m_router](const auto& rc) -> bool {
if (r->routerProfiling().IsBadForPath(rc.pubkey))
return false;
for (const auto& hop : hops)
{
if (hop.pubkey == rc.pubkey)
return false;
}
return true;
});
if (not maybe.has_value())
return std::nullopt;
hops.emplace_back(*maybe);
}
return hops;
}
bool
@ -301,9 +298,8 @@ namespace llarp
void
Builder::BuildOne(PathRole roles)
{
std::vector<RouterContact> hops(numHops);
if (SelectHops(m_router->nodedb(), hops, roles))
Build(hops, roles);
if (const auto maybe = GetHopsForBuild(); maybe.has_value())
Build(*maybe, roles);
}
bool Builder::UrgentBuild(llarp_time_t) const
@ -311,114 +307,60 @@ namespace llarp
return buildIntervalLimit > MIN_PATH_BUILD_INTERVAL * 4;
}
bool
Builder::DoUrgentBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops)
std::optional<std::vector<RouterContact>>
Builder::GetHopsAlignedToForBuild(RouterID endpoint)
{
const auto aligned = m_router->pathContext().FindOwnedPathsWithEndpoint(remote);
/// pick the lowest latency path that aligns to remote
/// note: peer exhaustion is made worse happen here
Path_ptr p;
llarp_time_t min = std::numeric_limits<llarp_time_t>::max();
for (const auto& path : aligned)
{
if (path->intro.latency < min && path->hops.size() == numHops)
{
p = path;
min = path->intro.latency;
}
}
if (p)
std::vector<RouterContact> hops;
{
for (const auto& hop : p->hops)
{
if (hop.rc.pubkey.IsZero())
return false;
hops.emplace_back(hop.rc);
}
}
return true;
}
bool
Builder::DoBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops)
{
std::set<RouterID> routers{remote};
hops.resize(numHops);
auto nodedb = m_router->nodedb();
for (size_t idx = 0; idx < hops.size(); idx++)
const auto maybe = SelectFirstHop();
if (not maybe.has_value())
return std::nullopt;
hops.emplace_back(*maybe);
};
for (size_t idx = hops.size(); idx < numHops; ++idx)
{
hops[idx].Clear();
if (idx == numHops - 1)
if (idx + 1 == numHops)
{
// last hop
if (!nodedb->Get(remote, hops[idx]))
const auto maybe = m_router->nodedb()->Get(endpoint);
if (maybe.has_value())
{
m_router->LookupRouter(remote, nullptr);
return false;
hops.emplace_back(*maybe);
}
else
return std::nullopt;
}
else
{
if (!SelectHop(nodedb, routers, hops[idx], idx, path::ePathRoleAny))
{
return false;
}
const auto maybe =
m_router->nodedb()->GetIf([&hops, r = m_router, endpoint](const auto& rc) -> bool {
if (r->routerProfiling().IsBadForPath(rc.pubkey))
return false;
for (const auto& hop : hops)
{
if (hop.pubkey == rc.pubkey)
return false;
}
return rc.pubkey != endpoint;
});
if (not maybe.has_value())
return std::nullopt;
hops.emplace_back(*maybe);
}
if (hops[idx].pubkey.IsZero())
return false;
routers.insert(hops[idx].pubkey);
}
return true;
return hops;
}
bool
Builder::BuildOneAlignedTo(const RouterID remote)
{
std::vector<RouterContact> hops;
/// if we really need this path build it "dangerously"
if (UrgentBuild(m_router->Now()))
if (const auto maybe = GetHopsAlignedToForBuild(remote); maybe.has_value())
{
if (!DoUrgentBuildAlignedTo(remote, hops))
{
return false;
}
LogInfo(Name(), " building path to ", remote);
Build(*maybe);
return true;
}
if (hops.empty())
{
if (!DoBuildAlignedTo(remote, hops))
{
return false;
}
}
LogInfo(Name(), " building path to ", remote);
Build(hops);
return true;
}
bool
Builder::SelectHops(llarp_nodedb* nodedb, std::vector<RouterContact>& hops, PathRole roles)
{
std::set<RouterID> exclude;
for (size_t idx = 0; idx < hops.size(); ++idx)
{
hops[idx].Clear();
size_t tries = 32;
while (tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles))
{
--tries;
}
if (tries == 0 || hops[idx].pubkey.IsZero())
{
LogWarn(Name(), " failed to select hop ", idx);
return false;
}
exclude.emplace(hops[idx].pubkey);
}
return true;
return false;
}
llarp_time_t
@ -428,7 +370,7 @@ namespace llarp
}
void
Builder::Build(const std::vector<RouterContact>& hops, PathRole roles)
Builder::Build(std::vector<RouterContact> hops, PathRole roles)
{
if (IsStopped())
return;

@ -30,12 +30,6 @@ namespace llarp
void
DoPathBuildBackoff();
bool
DoUrgentBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops);
bool
DoBuildAlignedTo(const RouterID remote, std::vector<RouterContact>& hops);
public:
AbstractRouter* m_router;
SecretKey enckey;
@ -51,14 +45,6 @@ namespace llarp
util::StatusObject
ExtractStatus() const;
bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
PathRole roles) override;
bool
ShouldBuildMore(llarp_time_t now) const override;
@ -107,11 +93,18 @@ namespace llarp
bool
BuildOneAlignedTo(const RouterID endpoint) override;
virtual std::optional<std::vector<RouterContact>>
GetHopsAlignedToForBuild(RouterID endpoint);
void
Build(const std::vector<RouterContact>& hops, PathRole roles = ePathRoleAny) override;
Build(std::vector<RouterContact> hops, PathRole roles = ePathRoleAny) override;
bool
SelectHops(llarp_nodedb* db, std::vector<RouterContact>& hops, PathRole roles = ePathRoleAny);
/// pick a first hop
virtual std::optional<RouterContact>
SelectFirstHop() const;
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
void
ManualRebuild(size_t N, PathRole roles = ePathRoleAny);

@ -14,11 +14,10 @@
#include <map>
#include <tuple>
struct llarp_nodedb;
namespace llarp
{
struct RouterContact;
class NodeDB;
namespace dht
{
@ -109,7 +108,7 @@ namespace llarp
/// manual build on these hops
virtual void
Build(const std::vector<RouterContact>& hops, PathRole roles = ePathRoleAny) = 0;
Build(std::vector<RouterContact> hops, PathRole roles = ePathRoleAny) = 0;
/// tick owned paths
virtual void
@ -261,20 +260,15 @@ namespace llarp
virtual void
ResetInternalState() = 0;
virtual bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
PathRole roles) = 0;
virtual bool
BuildOneAlignedTo(const RouterID endpoint) = 0;
virtual void
SendPacketToRemote(const llarp_buffer_t& pkt) = 0;
virtual std::optional<std::vector<RouterContact>>
GetHopsForBuild() = 0;
void
ForEachPath(std::function<void(const Path_ptr&)> visit) const
{

@ -1,9 +1,11 @@
#include <peerstats/types.hpp>
#include <util/str.hpp>
#include <lokimq/bt_serialize.h>
#include <oxenmq/bt_serialize.h>
#include <stdexcept>
namespace lokimq = oxenmq;
namespace llarp
{
constexpr auto RouterIdKey = "routerId";

@ -20,16 +20,15 @@
struct llarp_buffer_t;
struct llarp_dht_context;
struct llarp_nodedb;
struct llarp_threadpool;
namespace lokimq
namespace oxenmq
{
class LokiMQ;
class OxenMQ;
}
namespace llarp
{
class NodeDB;
class Logic;
struct Config;
struct RouterID;
@ -80,7 +79,7 @@ namespace llarp
class Platform;
}
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>;
using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
struct AbstractRouter
{
@ -108,7 +107,7 @@ namespace llarp
virtual llarp_dht_context*
dht() const = 0;
virtual llarp_nodedb*
virtual std::shared_ptr<NodeDB>
nodedb() const = 0;
virtual const path::PathContext&
@ -178,7 +177,7 @@ namespace llarp
Sign(Signature& sig, const llarp_buffer_t& buf) const = 0;
virtual bool
Configure(std::shared_ptr<Config> conf, bool isRouter, llarp_nodedb* nodedb) = 0;
Configure(std::shared_ptr<Config> conf, bool isRouter, std::shared_ptr<NodeDB> nodedb) = 0;
virtual bool
IsServiceNode() const = 0;

@ -123,8 +123,14 @@ namespace llarp
std::set<RouterID> exclude;
do
{
auto filter = [exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; };
RouterContact other;
if (not _nodedb->select_random_hop_excluding(other, exclude))
if (const auto maybe = _nodedb->GetIf(filter); maybe.has_value())
{
other = *maybe;
}
else
break;
exclude.insert(other.pubkey);
@ -158,14 +164,13 @@ namespace llarp
I_RCLookupHandler* rcLookup,
Profiling* profiler,
std::shared_ptr<Logic> logic,
llarp_nodedb* nodedb,
WorkerFunc_t dowork)
{
_router = router;
_linkManager = linkManager;
_rcLookup = rcLookup;
_logic = logic;
_nodedb = nodedb;
_nodedb = router->nodedb();
_profiler = profiler;
work = dowork;
}

@ -13,8 +13,6 @@
#include <list>
#include <memory>
struct llarp_nodedb;
namespace llarp
{
struct PendingSession;
@ -63,7 +61,6 @@ namespace llarp
I_RCLookupHandler* rcLookup,
Profiling* profiler,
std::shared_ptr<Logic> logic,
llarp_nodedb* nodedb,
WorkerFunc_t work);
void
@ -115,7 +112,7 @@ namespace llarp
ILinkManager* _linkManager = nullptr;
I_RCLookupHandler* _rcLookup = nullptr;
Profiling* _profiler = nullptr;
llarp_nodedb* _nodedb = nullptr;
std::shared_ptr<NodeDB> _nodedb;
std::shared_ptr<Logic> _logic;
WorkerFunc_t work;
RouterID us;

@ -61,8 +61,9 @@ namespace llarp
RouterContact remoteRC;
if (not forceLookup)
{
if (_nodedb->Get(router, remoteRC))
if (const auto maybe = _nodedb->Get(router); maybe.has_value())
{
remoteRC = *maybe;
if (callback)
{
callback(router, &remoteRC, RCRequestResult::Success);
@ -155,7 +156,8 @@ namespace llarp
if (rc.IsPublicRouter())
{
LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht.");
_nodedb->UpdateAsyncIfNewer(rc);
const RouterContact copy{rc};
LogicCall(_logic, [copy, n = _nodedb]() { n->PutIfNewer(copy); });
_dht->impl->PutRCNodeAsync(rc);
}
@ -210,7 +212,7 @@ namespace llarp
RCLookupHandler::PeriodicUpdate(llarp_time_t now)
{
// try looking up stale routers
std::set<RouterID> routersToLookUp;
std::unordered_set<RouterID> routersToLookUp;
_nodedb->VisitInsertedBefore(
[&](const RouterContact& rc) {
@ -231,7 +233,7 @@ namespace llarp
void
RCLookupHandler::ExploreNetwork()
{
const size_t known = _nodedb->num_loaded();
const size_t known = _nodedb->NumLoaded();
if (_bootstrapRCList.empty() && known == 0)
{
LogError("we have no bootstrap nodes specified");
@ -298,17 +300,19 @@ namespace llarp
void
RCLookupHandler::Init(
llarp_dht_context* dht,
llarp_nodedb* nodedb,
std::shared_ptr<NodeDB> nodedb,
std::shared_ptr<Logic> logic,
WorkerFunc_t dowork,
ILinkManager* linkManager,
service::Context* hiddenServiceContext,
const std::set<RouterID>& strictConnectPubkeys,
const std::unordered_set<RouterID>& strictConnectPubkeys,
const std::set<RouterContact>& bootstrapRCList,
bool useWhitelist_arg,
bool isServiceNode_arg)
{
_dht = dht;
_nodedb = nodedb;
_logic = logic;
_work = dowork;
_hiddenServiceContext = hiddenServiceContext;
_strictConnectPubkeys = strictConnectPubkeys;

@ -8,13 +8,16 @@
#include <unordered_map>
#include <set>
#include <unordered_set>
#include <list>
struct llarp_nodedb;
struct llarp_dht_context;
namespace llarp
{
class NodeDB;
class Logic;
namespace service
{
struct Context;
@ -72,11 +75,12 @@ namespace llarp
void
Init(
llarp_dht_context* dht,
llarp_nodedb* nodedb,
std::shared_ptr<NodeDB> nodedb,
std::shared_ptr<Logic> logic,
WorkerFunc_t dowork,
ILinkManager* linkManager,
service::Context* hiddenServiceContext,
const std::set<RouterID>& strictConnectPubkeys,
const std::unordered_set<RouterID>& strictConnectPubkeys,
const std::set<RouterContact>& bootstrapRCList,
bool useWhitelist_arg,
bool isServiceNode_arg);
@ -98,17 +102,18 @@ namespace llarp
mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters
llarp_dht_context* _dht = nullptr;
llarp_nodedb* _nodedb = nullptr;
std::shared_ptr<NodeDB> _nodedb;
std::shared_ptr<Logic> _logic;
WorkerFunc_t _work = nullptr;
service::Context* _hiddenServiceContext = nullptr;
ILinkManager* _linkManager = nullptr;
/// explicit whitelist of routers we will connect to directly (not for
/// service nodes)
std::set<RouterID> _strictConnectPubkeys;
std::unordered_set<RouterID> _strictConnectPubkeys;
std::set<RouterContact> _bootstrapRCList;
std::set<RouterID> _bootstrapRouterIDList;
std::unordered_set<RouterID> _bootstrapRouterIDList;
std::unordered_map<RouterID, CallbacksQueue, RouterID::Hash> pendingCallbacks
GUARDED_BY(_mutex);
@ -116,7 +121,7 @@ namespace llarp
bool useWhitelist = false;
bool isServiceNode = false;
std::set<RouterID> whitelistRouters GUARDED_BY(_mutex);
std::unordered_set<RouterID> whitelistRouters GUARDED_BY(_mutex);
using TimePoint = std::chrono::steady_clock::time_point;
std::unordered_map<RouterID, TimePoint, RouterID::Hash> _routerLookupTimes;

@ -40,7 +40,7 @@
#include <systemd/sd-daemon.h>
#endif
#include <lokimq/lokimq.h>
#include <oxenmq/oxenmq.h>
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s;
@ -49,9 +49,9 @@ namespace llarp
Router::Router(
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> l,
std::unique_ptr<vpn::Platform> vpnPlatform)
std::shared_ptr<vpn::Platform> vpnPlatform)
: ready(false)
, m_lmq(std::make_shared<lokimq::LokiMQ>())
, m_lmq(std::make_shared<oxenmq::OxenMQ>())
, _netloop(std::move(__netloop))
, _logic(std::move(l))
, _vpnPlatform(std::move(vpnPlatform))
@ -93,7 +93,7 @@ namespace llarp
peerStatsObj = m_peerDb->ExtractStatus();
return util::StatusObject{{"running", true},
{"numNodesKnown", _nodedb->num_loaded()},
{"numNodesKnown", _nodedb->NumLoaded()},
{"dht", _dht->impl->ExtractStatus()},
{"services", _hiddenServiceContext.ExtractStatus()},
{"exit", _exitContext.ExtractStatus()},
@ -150,19 +150,13 @@ namespace llarp
return _rcLookupHandler.GetRandomWhitelistRouter(router);
}
auto pick_router = [&](auto& collection) -> bool {
const auto sz = collection.size();
auto itr = collection.begin();
if (sz == 0)
return false;
if (sz > 1)
std::advance(itr, randint() % sz);
router = itr->first;
if (const auto maybe = nodedb()->GetIf([](const auto&) -> bool { return true; }, true);
maybe.has_value())
{
router = maybe->pubkey;
return true;
};
util::Lock l{nodedb()->access};
return pick_router(nodedb()->entries);
}
return false;
}
void
@ -270,7 +264,7 @@ namespace llarp
}
bool
Router::Configure(std::shared_ptr<Config> c, bool isRouter, llarp_nodedb* nodedb)
Router::Configure(std::shared_ptr<Config> c, bool isRouter, std::shared_ptr<NodeDB> nodedb)
{
m_Config = c;
auto& conf = *m_Config;
@ -470,7 +464,7 @@ namespace llarp
/// build a set of strictConnectPubkeys (
/// TODO: make this consistent with config -- do we support multiple strict connections
// or not?
std::set<RouterID> strictConnectPubkeys;
std::unordered_set<RouterID> strictConnectPubkeys;
if (not networkConfig.m_strictConnect.empty())
{
const auto& val = networkConfig.m_strictConnect;
@ -562,12 +556,12 @@ namespace llarp
&_rcLookupHandler,
&_routerProfiling,
_logic,
_nodedb,
util::memFn(&AbstractRouter::QueueWork, this));
_linkManager.Init(&_outboundSessionMaker);
_rcLookupHandler.Init(
_dht,
_nodedb,
_logic,
util::memFn(&AbstractRouter::QueueWork, this),
&_linkManager,
&_hiddenServiceContext,
@ -691,7 +685,7 @@ namespace llarp
Router::ReportStats()
{
const auto now = Now();
LogInfo(nodedb()->num_loaded(), " RCs loaded");
LogInfo(nodedb()->NumLoaded(), " RCs loaded");
LogInfo(bootstrapRCList.size(), " bootstrap peers");
LogInfo(NumberOfConnectedRouters(), " router connections");
if (IsServiceNode())
@ -719,13 +713,13 @@ namespace llarp
ss << "WATCHDOG=1\nSTATUS=v" << llarp::VERSION_STR;
if (IsServiceNode())
{
ss << " snode | known/svc/clients: " << nodedb()->num_loaded() << "/"
ss << " snode | known/svc/clients: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | "
<< pathContext().CurrentTransitPaths() << " active paths";
}
else
{
ss << " client | known/connected: " << nodedb()->num_loaded() << "/"
ss << " client | known/connected: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << " | path success: ";
hiddenServiceContext().ForEachService([&ss](const auto& name, const auto& ep) {
ss << " [" << name << " " << std::setprecision(4)
@ -852,11 +846,8 @@ namespace llarp
{
QueueDiskIO([&]() { routerProfiling().Save(_profilesFile); });
}
// save nodedb
if (nodedb()->ShouldSaveToDisk(now))
{
nodedb()->AsyncFlushToDisk();
}
_nodedb->Tick(now);
if (m_peerDb)
{
@ -908,10 +899,11 @@ namespace llarp
LogInfo("Session to ", remote, " fully closed");
if (IsServiceNode())
return;
RouterContact rc;
if (not nodedb()->Get(remote, rc))
return;
m_RoutePoker.DelRoute(rc.addrs[0].toIpAddress().toIP());
if (const auto maybe = nodedb()->Get(remote); maybe.has_value())
{
for (const auto& addr : maybe->addrs)
m_RoutePoker.DelRoute(addr.toIpAddress().toIP());
}
}
void
@ -1097,31 +1089,20 @@ namespace llarp
}
{
ssize_t loaded = _nodedb->LoadAll();
llarp::LogInfo("loaded ", loaded, " RCs");
if (loaded < 0)
{
// shouldn't be possible
return false;
}
LogInfo("Loading nodedb from disk...");
_nodedb->LoadFromDisk();
}
llarp_dht_context_start(dht(), pubkey());
for (const auto& rc : bootstrapRCList)
{
if (this->nodedb()->Insert(rc))
{
LogInfo("added bootstrap node ", RouterID(rc.pubkey));
}
else
{
LogError("Failed to add bootstrap node ", RouterID(rc.pubkey));
}
nodedb()->Put(rc);
_dht->impl->Nodes()->PutNode(rc);
LogInfo("added bootstrap node ", RouterID{rc.pubkey});
}
LogInfo("have ", _nodedb->num_loaded(), " routers");
LogInfo("have ", _nodedb->NumLoaded(), " routers");
#ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump
@ -1165,7 +1146,7 @@ namespace llarp
Router::AfterStopIssued()
{
StopLinks();
nodedb()->AsyncFlushToDisk();
nodedb()->SaveToDisk();
_logic->call_later(200ms, std::bind(&Router::AfterStopLinks, this));
}

@ -46,7 +46,7 @@
#include <unordered_map>
#include <vector>
#include <lokimq/address.h>
#include <oxenmq/address.h>
namespace llarp
{
@ -103,7 +103,7 @@ namespace llarp
util::StatusObject
ExtractStatus() const override;
llarp_nodedb*
std::shared_ptr<NodeDB>
nodedb() const override
{
return _nodedb;
@ -182,13 +182,13 @@ namespace llarp
llarp_ev_loop_ptr _netloop;
std::shared_ptr<Logic> _logic;
std::unique_ptr<vpn::Platform> _vpnPlatform;
std::shared_ptr<vpn::Platform> _vpnPlatform;
path::PathContext paths;
exit::Context _exitContext;
SecretKey _identity;
SecretKey _encryption;
llarp_dht_context* _dht = nullptr;
llarp_nodedb* _nodedb;
std::shared_ptr<NodeDB> _nodedb;
llarp_time_t _startedAt;
const lokimq::TaggedThreadID m_DiskThread;
@ -329,7 +329,7 @@ namespace llarp
explicit Router(
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> logic,
std::unique_ptr<vpn::Platform> vpnPlatform);
std::shared_ptr<vpn::Platform> vpnPlatform);
virtual ~Router() override;
@ -358,7 +358,7 @@ namespace llarp
Close();
bool
Configure(std::shared_ptr<Config> conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override;
Configure(std::shared_ptr<Config> conf, bool isRouter, std::shared_ptr<NodeDB> nodedb) override;
bool
StartRpcServer() override;

@ -10,7 +10,7 @@
#include <util/printer.hpp>
#include <util/time.hpp>
#include <lokimq/bt_serialize.h>
#include <oxenmq/bt_serialize.h>
#include <fstream>
#include <util/fs.hpp>
@ -250,7 +250,7 @@ namespace llarp
try
{
std::string_view buf_view(reinterpret_cast<char*>(buf->cur), buf->size_left());
lokimq::bt_list_consumer btlist(buf_view);
oxenmq::bt_list_consumer btlist(buf_view);
uint64_t outer_version = btlist.consume_integer<uint64_t>();
@ -284,7 +284,7 @@ namespace llarp
}
bool
RouterContact::DecodeVersion_1(lokimq::bt_list_consumer& btlist)
RouterContact::DecodeVersion_1(oxenmq::bt_list_consumer& btlist)
{
auto signature_string = btlist.consume_string_view();
signed_bt_dict = btlist.consume_dict_data();

@ -17,10 +17,10 @@
#define MAX_RC_SIZE (1024)
#define NICKLEN (32)
namespace lokimq
namespace oxenmq
{
class bt_list_consumer;
} // namespace lokimq
} // namespace oxenmq
namespace llarp
{
@ -230,7 +230,7 @@ namespace llarp
DecodeVersion_0(llarp_buffer_t* buf);
bool
DecodeVersion_1(lokimq::bt_list_consumer& btlist);
DecodeVersion_1(oxenmq::bt_list_consumer& btlist);
};
inline std::ostream&

@ -1,5 +1,7 @@
#include <router_id.hpp>
#include <lokimq/base32z.h>
#include <oxenmq/base32z.h>
namespace lokimq = oxenmq;
namespace llarp
{

@ -57,4 +57,17 @@ namespace llarp
} // namespace llarp
namespace std
{
template <>
struct hash<llarp::RouterID>
{
size_t
operator()(const llarp::RouterID& id) const
{
const llarp::RouterID::Hash h{};
return h(id);
}
};
} // namespace std
#endif

@ -23,11 +23,11 @@ namespace llarp::rpc
return;
m_LMQ->connect_remote(
m_AuthURL,
[self = shared_from_this()](lokimq::ConnectionID c) {
[self = shared_from_this()](oxenmq::ConnectionID c) {
self->m_Conn = std::move(c);
LogInfo("connected to endpoint auth server via ", *self->m_Conn);
},
[self = shared_from_this()](lokimq::ConnectionID, std::string_view fail) {
[self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) {
LogWarn("failed to connect to endpoint auth server: ", fail);
self->m_Endpoint->RouterLogic()->call_later(1s, [self]() { self->Start(); });
});

@ -1,7 +1,7 @@
#pragma once
#include <service/auth.hpp>
#include <lokimq/lokimq.h>
#include <oxenmq/oxenmq.h>
namespace llarp::service
{
@ -13,7 +13,7 @@ namespace llarp::rpc
struct EndpointAuthRPC : public llarp::service::IAuthPolicy,
public std::enable_shared_from_this<EndpointAuthRPC>
{
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>;
using LMQ_ptr = std::shared_ptr<oxenmq::OxenMQ>;
using Endpoint_ptr = std::shared_ptr<llarp::service::Endpoint>;
using Whitelist_t = std::unordered_set<llarp::service::Address, llarp::service::Address::Hash>;
@ -39,6 +39,6 @@ namespace llarp::rpc
const Whitelist_t m_AuthWhitelist;
LMQ_ptr m_LMQ;
Endpoint_ptr m_Endpoint;
std::optional<lokimq::ConnectionID> m_Conn;
std::optional<oxenmq::ConnectionID> m_Conn;
};
} // namespace llarp::rpc

@ -2,19 +2,21 @@
#include <router_id.hpp>
#include <lokimq/lokimq.h>
#include <lokimq/address.h>
#include <oxenmq/oxenmq.h>
#include <oxenmq/address.h>
#include <crypto/types.hpp>
#include <dht/key.hpp>
#include <service/name.hpp>
namespace lokimq = oxenmq;
namespace llarp
{
struct AbstractRouter;
namespace rpc
{
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>;
using LMQ_ptr = std::shared_ptr<lokimq::OxenMQ>;
/// The LokidRpcClient uses loki-mq to talk to make API requests to lokid.
struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient>

@ -55,7 +55,7 @@ namespace llarp::rpc
void
HandleJSONRequest(
lokimq::Message& msg, std::function<void(nlohmann::json, ReplyFunction_t)> handleRequest)
oxenmq::Message& msg, std::function<void(nlohmann::json, ReplyFunction_t)> handleRequest)
{
const auto maybe = MaybeParseJSON(msg);
if (not maybe.has_value())
@ -70,10 +70,8 @@ namespace llarp::rpc
}
try
{
std::promise<std::string> reply;
handleRequest(*maybe, [&reply](std::string result) { reply.set_value(result); });
auto ftr = reply.get_future();
msg.send_reply(ftr.get());
handleRequest(
*maybe, [defer = msg.send_later()](std::string result) { defer.reply(result); });
}
catch (std::exception& ex)
{
@ -82,13 +80,13 @@ namespace llarp::rpc
}
void
RpcServer::AsyncServeRPC(lokimq::address url)
RpcServer::AsyncServeRPC(oxenmq::address url)
{
m_LMQ->listen_plain(url.zmq_address());
m_LMQ->add_category("llarp", lokimq::AuthLevel::none)
m_LMQ->add_category("llarp", oxenmq::AuthLevel::none)
.add_command(
"halt",
[&](lokimq::Message& msg) {
[&](oxenmq::Message& msg) {
if (not m_Router->IsRunning())
{
msg.send_reply(CreateJSONError("router is not running"));
@ -99,25 +97,30 @@ namespace llarp::rpc
})
.add_request_command(
"version",
[r = m_Router](lokimq::Message& msg) {
[r = m_Router](oxenmq::Message& msg) {
util::StatusObject result{{"version", llarp::VERSION_FULL},
{"uptime", to_json(r->Uptime())}};
msg.send_reply(CreateJSONResponse(result));
})
.add_request_command(
"status",
[&](lokimq::Message& msg) {
std::promise<util::StatusObject> result;
LogicCall(m_Router->logic(), [&result, r = m_Router]() {
const auto state = r->ExtractStatus();
result.set_value(state);
[&](oxenmq::Message& msg) {
LogicCall(m_Router->logic(), [defer = msg.send_later(), r = m_Router]() {
std::string data;
if (r->IsRunning())
{
data = CreateJSONResponse(r->ExtractStatus());
}
else
{
data = CreateJSONError("router not yet ready");
}
defer.reply(data);
});
auto ftr = result.get_future();
msg.send_reply(CreateJSONResponse(ftr.get()));
})
.add_request_command(
"exit",
[&](lokimq::Message& msg) {
[&](oxenmq::Message& msg) {
HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) {
if (r->IsServiceNode())
{
@ -261,7 +264,7 @@ namespace llarp::rpc
});
});
})
.add_request_command("config", [&](lokimq::Message& msg) {
.add_request_command("config", [&](oxenmq::Message& msg) {
HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) {
{
const auto itr = obj.find("override");

@ -1,17 +1,19 @@
#pragma once
#include <string_view>
#include <lokimq/lokimq.h>
#include <lokimq/address.h>
#include <oxenmq/oxenmq.h>
#include <oxenmq/address.h>
namespace llarp
{
struct AbstractRouter;
}
namespace lokimq = oxenmq;
namespace llarp::rpc
{
using LMQ_ptr = std::shared_ptr<lokimq::LokiMQ>;
using LMQ_ptr = std::shared_ptr<lokimq::OxenMQ>;
struct RpcServer
{

@ -1,77 +1,75 @@
#include <service/address.hpp>
#include <crypto/crypto.hpp>
#include <lokimq/base32z.h>
#include <oxenmq/base32z.h>
#include <algorithm>
namespace llarp
namespace lokimq = oxenmq;
namespace llarp::service
{
namespace service
const std::set<std::string> Address::AllowedTLDs = {".loki", ".snode"};
bool
Address::PermitTLD(const char* tld)
{
const std::set<std::string> Address::AllowedTLDs = {".loki", ".snode"};
std::string gtld(tld);
std::transform(gtld.begin(), gtld.end(), gtld.begin(), ::tolower);
return AllowedTLDs.count(gtld) != 0;
}
bool
Address::PermitTLD(const char* tld)
std::string
Address::ToString(const char* tld) const
{
if (!PermitTLD(tld))
return "";
std::string str;
if (subdomain.size())
{
std::string gtld(tld);
std::transform(gtld.begin(), gtld.end(), gtld.begin(), ::tolower);
return AllowedTLDs.count(gtld) != 0;
str = subdomain;
str += '.';
}
str += lokimq::to_base32z(begin(), end());
str += tld;
return str;
}
std::string
Address::ToString(const char* tld) const
{
if (!PermitTLD(tld))
return "";
std::string str;
if (subdomain.size())
{
str = subdomain;
str += '.';
}
str += lokimq::to_base32z(begin(), end());
str += tld;
return str;
}
bool
Address::FromString(std::string_view str, const char* tld)
{
if (!PermitTLD(tld))
return false;
// Find, validate, and remove the .tld
const auto pos = str.find_last_of('.');
if (pos == std::string::npos)
return false;
if (str.substr(pos) != tld)
return false;
str = str.substr(0, pos);
bool
Address::FromString(std::string_view str, const char* tld)
// copy subdomains if they are there (and strip them off)
const auto idx = str.find_last_of('.');
if (idx != std::string::npos)
{
if (!PermitTLD(tld))
return false;
// Find, validate, and remove the .tld
const auto pos = str.find_last_of('.');
if (pos == std::string::npos)
return false;
if (str.substr(pos) != tld)
return false;
str = str.substr(0, pos);
// copy subdomains if they are there (and strip them off)
const auto idx = str.find_last_of('.');
if (idx != std::string::npos)
{
subdomain = str.substr(0, idx);
str.remove_prefix(idx + 1);
}
// Ensure we have something valid:
// - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000)
// - must have 51 preceeding base32z chars
// - thus we get 51*5+1 = 256 bits = 32 bytes of output
if (str.size() != 52 || !lokimq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y'))
return false;
lokimq::from_base32z(str.begin(), str.end(), begin());
return true;
subdomain = str.substr(0, idx);
str.remove_prefix(idx + 1);
}
dht::Key_t
Address::ToKey() const
{
PubKey k;
CryptoManager::instance()->derive_subkey(k, PubKey(data()), 1);
return dht::Key_t{k.as_array()};
}
// Ensure we have something valid:
// - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000)
// - must have 51 preceeding base32z chars
// - thus we get 51*5+1 = 256 bits = 32 bytes of output
if (str.size() != 52 || !lokimq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y'))
return false;
lokimq::from_base32z(str.begin(), str.end(), begin());
return true;
}
} // namespace service
} // namespace llarp
dht::Key_t
Address::ToKey() const
{
PubKey k;
CryptoManager::instance()->derive_subkey(k, PubKey(data()), 1);
return dht::Key_t{k.as_array()};
}
} // namespace llarp::service

@ -647,24 +647,65 @@ namespace llarp
}
}
bool
Endpoint::SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
path::PathRole roles)
std::optional<std::vector<RouterContact>>
Endpoint::GetHopsForBuild()
{
std::unordered_set<RouterID> exclude;
ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); });
const auto maybe = m_router->nodedb()->GetIf(
[exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; });
if (not maybe.has_value())
return std::nullopt;
return GetHopsForBuildWithEndpoint(maybe->pubkey);
}
std::optional<std::vector<RouterContact>>
Endpoint::GetHopsForBuildWithEndpoint(RouterID endpoint)
{
std::set<RouterID> exclude = prev;
for (const auto& snode : SnodeBlacklist())
exclude.insert(snode);
if (hop == numHops - 1 and numHops > 1)
std::vector<RouterContact> hops;
// get first hop
if (const auto maybe = SelectFirstHop(); maybe.has_value())
{
// diversify endpoints
ForEachPath([&exclude](const path::Path_ptr& path) { exclude.insert(path->Endpoint()); });
hops.emplace_back(*maybe);
}
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
else
return std::nullopt;
auto filter =
[endpoint, &hops, blacklist = SnodeBlacklist(), r = m_router](const auto& rc) -> bool {
if (blacklist.count(rc.pubkey) > 0)
return false;
if (r->routerProfiling().IsBadForPath(rc.pubkey))
return false;
for (const auto& hop : hops)
{
if (hop.pubkey == rc.pubkey)
return false;
}
return endpoint != rc.pubkey;
};
for (size_t idx = hops.size(); idx < numHops; ++idx)
{
if (idx + 1 == numHops)
{
if (const auto maybe = m_router->nodedb()->Get(endpoint))
{
hops.emplace_back(*maybe);
}
else
return std::nullopt;
}
else if (const auto maybe = m_router->nodedb()->GetIf(filter); maybe.has_value())
{
hops.emplace_back(*maybe);
}
else
return std::nullopt;
}
return hops;
}
void
@ -712,19 +753,18 @@ namespace llarp
}
void
Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j)
Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid)
{
auto& pendingRouters = m_state->m_PendingRouters;
auto itr = pendingRouters.find(j->rc.pubkey);
auto itr = pendingRouters.find(id);
if (itr != pendingRouters.end())
{
if (j->valid)
if (valid)
itr->second.InformResult(msg->foundRCs);
else
itr->second.InformResult({});
pendingRouters.erase(itr);
}
delete j;
}
bool
@ -732,16 +772,15 @@ namespace llarp
{
if (not msg->foundRCs.empty())
{
for (const auto& rc : msg->foundRCs)
for (auto rc : msg->foundRCs)
{
llarp_async_verify_rc* job = new llarp_async_verify_rc();
job->nodedb = Router()->nodedb();
job->worker = util::memFn(&AbstractRouter::QueueWork, Router());
job->disk = util::memFn(&AbstractRouter::QueueDiskIO, Router());
job->logic = Router()->logic();
job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg, std::placeholders::_1);
job->rc = rc;
llarp_nodedb_async_verify(job);
Router()->QueueWork([rc = std::move(rc), logic = Router()->logic(), self = this, msg]() {
bool valid = rc.Verify(llarp::time_now_ms());
LogicCall(logic, [self, valid, rc = std::move(rc), msg]() {
self->Router()->nodedb()->PutIfNewer(rc);
self->HandleVerifyGotRouter(msg, rc.pubkey, valid);
});
});
}
}
else

@ -28,8 +28,6 @@
#define MIN_SHIFT_INTERVAL 5s
#endif
struct llarp_async_verify_rc;
namespace llarp
{
namespace service
@ -364,13 +362,11 @@ namespace llarp
bool
HasExit() const;
bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
path::PathRole roles) override;
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
std::optional<std::vector<RouterContact>>
GetHopsForBuildWithEndpoint(RouterID endpoint);
virtual void
PathBuildStarted(path::Path_ptr path) override;
@ -436,7 +432,7 @@ namespace llarp
private:
void
HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j);
HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid);
bool
OnLookup(const service::Address& addr, std::optional<IntroSet> i, const RouterID& endpoint);

@ -2,393 +2,390 @@
#include <crypto/crypto.hpp>
#include <path/path.hpp>
#include <lokimq/bt_serialize.h>
#include <oxenmq/bt_serialize.h>
namespace lokimq = oxenmq;
namespace llarp
namespace llarp::service
{
namespace service
util::StatusObject
EncryptedIntroSet::ExtractStatus() const
{
util::StatusObject
EncryptedIntroSet::ExtractStatus() const
{
const auto sz = introsetPayload.size();
return {{"location", derivedSigningKey.ToString()},
{"signedAt", to_json(signedAt)},
{"size", sz}};
}
const auto sz = introsetPayload.size();
return {
{"location", derivedSigningKey.ToString()}, {"signedAt", to_json(signedAt)}, {"size", sz}};
}
bool
EncryptedIntroSet::BEncode(llarp_buffer_t* buf) const
{
if (not bencode_start_dict(buf))
return false;
if (not BEncodeWriteDictEntry("d", derivedSigningKey, buf))
return false;
if (not BEncodeWriteDictEntry("n", nounce, buf))
return false;
if (not BEncodeWriteDictInt("s", signedAt.count(), buf))
return false;
if (not bencode_write_bytestring(buf, "x", 1))
return false;
if (not bencode_write_bytestring(buf, introsetPayload.data(), introsetPayload.size()))
return false;
if (not BEncodeWriteDictEntry("z", sig, buf))
return false;
return bencode_end(buf);
}
bool
EncryptedIntroSet::BEncode(llarp_buffer_t* buf) const
bool
EncryptedIntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (key == "x")
{
if (not bencode_start_dict(buf))
llarp_buffer_t strbuf;
if (not bencode_read_string(buf, &strbuf))
return false;
if (not BEncodeWriteDictEntry("d", derivedSigningKey, buf))
if (strbuf.sz > MAX_INTROSET_SIZE)
return false;
if (not BEncodeWriteDictEntry("n", nounce, buf))
return false;
if (not BEncodeWriteDictInt("s", signedAt.count(), buf))
return false;
if (not bencode_write_bytestring(buf, "x", 1))
return false;
if (not bencode_write_bytestring(buf, introsetPayload.data(), introsetPayload.size()))
return false;
if (not BEncodeWriteDictEntry("z", sig, buf))
return false;
return bencode_end(buf);
introsetPayload.resize(strbuf.sz);
std::copy_n(strbuf.base, strbuf.sz, introsetPayload.data());
return true;
}
if (not BEncodeMaybeReadDictEntry("d", derivedSigningKey, read, key, buf))
return false;
bool
EncryptedIntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (key == "x")
{
llarp_buffer_t strbuf;
if (not bencode_read_string(buf, &strbuf))
return false;
if (strbuf.sz > MAX_INTROSET_SIZE)
return false;
introsetPayload.resize(strbuf.sz);
std::copy_n(strbuf.base, strbuf.sz, introsetPayload.data());
return true;
}
if (not BEncodeMaybeReadDictEntry("d", derivedSigningKey, read, key, buf))
return false;
if (not BEncodeMaybeReadDictEntry("n", nounce, read, key, buf))
return false;
if (not BEncodeMaybeReadDictEntry("n", nounce, read, key, buf))
return false;
if (not BEncodeMaybeReadDictInt("s", signedAt, read, key, buf))
return false;
if (not BEncodeMaybeReadDictInt("s", signedAt, read, key, buf))
return false;
if (not BEncodeMaybeReadDictEntry("z", sig, read, key, buf))
return false;
return read;
}
if (not BEncodeMaybeReadDictEntry("z", sig, read, key, buf))
return false;
return read;
}
bool
EncryptedIntroSet::OtherIsNewer(const EncryptedIntroSet& other) const
{
return signedAt < other.signedAt;
}
bool
EncryptedIntroSet::OtherIsNewer(const EncryptedIntroSet& other) const
{
return signedAt < other.signedAt;
}
std::ostream&
EncryptedIntroSet::print(std::ostream& out, int levels, int spaces) const
{
Printer printer(out, levels, spaces);
printer.printAttribute("d", derivedSigningKey);
printer.printAttribute("n", nounce);
printer.printAttribute("s", signedAt.count());
printer.printAttribute("x", "[" + std::to_string(introsetPayload.size()) + " bytes]");
printer.printAttribute("z", sig);
return out;
}
std::optional<IntroSet>
EncryptedIntroSet::MaybeDecrypt(const PubKey& root) const
{
SharedSecret k(root);
IntroSet i;
std::vector<byte_t> payload = introsetPayload;
llarp_buffer_t buf(payload);
CryptoManager::instance()->xchacha20(buf, k, nounce);
if (not i.BDecode(&buf))
return {};
return i;
}
bool
EncryptedIntroSet::IsExpired(llarp_time_t now) const
{
return now >= signedAt + path::default_lifetime;
}
std::ostream&
EncryptedIntroSet::print(std::ostream& out, int levels, int spaces) const
{
Printer printer(out, levels, spaces);
printer.printAttribute("d", derivedSigningKey);
printer.printAttribute("n", nounce);
printer.printAttribute("s", signedAt.count());
printer.printAttribute("x", "[" + std::to_string(introsetPayload.size()) + " bytes]");
printer.printAttribute("z", sig);
return out;
}
bool
EncryptedIntroSet::Sign(const PrivateKey& k)
{
signedAt = llarp::time_now_ms();
if (not k.toPublic(derivedSigningKey))
return false;
sig.Zero();
std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(tmp);
if (not BEncode(&buf))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (not CryptoManager::instance()->sign(sig, k, buf))
return false;
LogDebug("signed encrypted introset: ", *this);
return true;
}
std::optional<IntroSet>
EncryptedIntroSet::MaybeDecrypt(const PubKey& root) const
{
SharedSecret k(root);
IntroSet i;
std::vector<byte_t> payload = introsetPayload;
llarp_buffer_t buf(payload);
CryptoManager::instance()->xchacha20(buf, k, nounce);
if (not i.BDecode(&buf))
return {};
return i;
}
bool
EncryptedIntroSet::Verify(llarp_time_t now) const
{
if (IsExpired(now))
return false;
std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(tmp);
EncryptedIntroSet copy(*this);
copy.sig.Zero();
if (not copy.BEncode(&buf))
return false;
LogDebug("verify encrypted introset: ", copy, " sig = ", sig);
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return CryptoManager::instance()->verify(derivedSigningKey, buf, sig);
}
util::StatusObject
IntroSet::ExtractStatus() const
{
util::StatusObject obj{{"published", to_json(T)}};
std::vector<util::StatusObject> introsObjs;
std::transform(
I.begin(),
I.end(),
std::back_inserter(introsObjs),
[](const auto& intro) -> util::StatusObject { return intro.ExtractStatus(); });
obj["intros"] = introsObjs;
if (!topic.IsZero())
obj["topic"] = topic.ToString();
return obj;
}
bool
IntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (!BEncodeMaybeReadDictEntry("a", A, read, key, buf))
return false;
bool
EncryptedIntroSet::IsExpired(llarp_time_t now) const
if (key == "i")
{
return now >= signedAt + path::default_lifetime;
return BEncodeReadList(I, buf);
}
if (!BEncodeMaybeReadDictEntry("k", K, read, key, buf))
return false;
bool
EncryptedIntroSet::Sign(const PrivateKey& k)
{
signedAt = llarp::time_now_ms();
if (not k.toPublic(derivedSigningKey))
return false;
sig.Zero();
std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(tmp);
if (not BEncode(&buf))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (not CryptoManager::instance()->sign(sig, k, buf))
return false;
LogDebug("signed encrypted introset: ", *this);
return true;
}
if (!BEncodeMaybeReadDictEntry("n", topic, read, key, buf))
return false;
bool
EncryptedIntroSet::Verify(llarp_time_t now) const
if (key == "s")
{
if (IsExpired(now))
return false;
std::array<byte_t, MAX_INTROSET_SIZE + 128> tmp;
llarp_buffer_t buf(tmp);
EncryptedIntroSet copy(*this);
copy.sig.Zero();
if (not copy.BEncode(&buf))
byte_t* begin = buf->cur;
if (not bencode_discard(buf))
return false;
LogDebug("verify encrypted introset: ", copy, " sig = ", sig);
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return CryptoManager::instance()->verify(derivedSigningKey, buf, sig);
}
util::StatusObject
IntroSet::ExtractStatus() const
{
util::StatusObject obj{{"published", to_json(T)}};
std::vector<util::StatusObject> introsObjs;
std::transform(
I.begin(),
I.end(),
std::back_inserter(introsObjs),
[](const auto& intro) -> util::StatusObject { return intro.ExtractStatus(); });
obj["intros"] = introsObjs;
if (!topic.IsZero())
obj["topic"] = topic.ToString();
return obj;
}
byte_t* end = buf->cur;
bool
IntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if (!BEncodeMaybeReadDictEntry("a", A, read, key, buf))
return false;
std::string_view srvString(reinterpret_cast<char*>(begin), end - begin);
if (key == "i")
try
{
return BEncodeReadList(I, buf);
lokimq::bt_deserialize(srvString, SRVs);
}
if (!BEncodeMaybeReadDictEntry("k", K, read, key, buf))
catch (const lokimq::bt_deserialize_invalid& err)
{
LogError("Error decoding SRV records from IntroSet: ", err.what());
return false;
}
read = true;
}
if (!BEncodeMaybeReadDictEntry("n", topic, read, key, buf))
return false;
if (!BEncodeMaybeReadDictInt("t", T, read, key, buf))
return false;
if (key == "s")
{
byte_t* begin = buf->cur;
if (not bencode_discard(buf))
return false;
if (key == "w")
{
W.emplace();
return bencode_decode_dict(*W, buf);
}
byte_t* end = buf->cur;
if (!BEncodeMaybeReadDictInt("v", version, read, key, buf))
return false;
std::string_view srvString(reinterpret_cast<char*>(begin), end - begin);
if (!BEncodeMaybeReadDictEntry("z", Z, read, key, buf))
return false;
try
{
lokimq::bt_deserialize(srvString, SRVs);
}
catch (const lokimq::bt_deserialize_invalid& err)
{
LogError("Error decoding SRV records from IntroSet: ", err.what());
return false;
}
read = true;
}
if (read)
return true;
if (!BEncodeMaybeReadDictInt("t", T, read, key, buf))
return false;
return bencode_discard(buf);
}
if (key == "w")
{
W.emplace();
return bencode_decode_dict(*W, buf);
}
bool
IntroSet::BEncode(llarp_buffer_t* buf) const
{
if (!bencode_start_dict(buf))
return false;
if (!BEncodeWriteDictEntry("a", A, buf))
return false;
// start introduction list
if (!bencode_write_bytestring(buf, "i", 1))
return false;
if (!BEncodeWriteList(I.begin(), I.end(), buf))
return false;
// end introduction list
if (!BEncodeMaybeReadDictInt("v", version, read, key, buf))
return false;
// pq pubkey
if (!BEncodeWriteDictEntry("k", K, buf))
return false;
if (!BEncodeMaybeReadDictEntry("z", Z, read, key, buf))
// topic tag
if (topic.ToString().size())
{
if (!BEncodeWriteDictEntry("n", topic, buf))
return false;
if (read)
return true;
return bencode_discard(buf);
}
bool
IntroSet::BEncode(llarp_buffer_t* buf) const
if (SRVs.size())
{
if (!bencode_start_dict(buf))
return false;
if (!BEncodeWriteDictEntry("a", A, buf))
return false;
// start introduction list
if (!bencode_write_bytestring(buf, "i", 1))
std::string serial = lokimq::bt_serialize(SRVs);
if (!bencode_write_bytestring(buf, "s", 1))
return false;
if (!BEncodeWriteList(I.begin(), I.end(), buf))
if (!buf->write(serial.begin(), serial.end()))
return false;
// end introduction list
}
// Timestamp published
if (!BEncodeWriteDictInt("t", T.count(), buf))
return false;
// pq pubkey
if (!BEncodeWriteDictEntry("k", K, buf))
// write version
if (!BEncodeWriteDictInt("v", version, buf))
return false;
if (W)
{
if (!BEncodeWriteDictEntry("w", *W, buf))
return false;
}
if (!BEncodeWriteDictEntry("z", Z, buf))
return false;
// topic tag
if (topic.ToString().size())
{
if (!BEncodeWriteDictEntry("n", topic, buf))
return false;
}
return bencode_end(buf);
}
if (SRVs.size())
{
std::string serial = lokimq::bt_serialize(SRVs);
if (!bencode_write_bytestring(buf, "s", 1))
return false;
if (!buf->write(serial.begin(), serial.end()))
return false;
}
bool
IntroSet::HasExpiredIntros(llarp_time_t now) const
{
for (const auto& i : I)
if (now >= i.expiresAt)
return true;
return false;
}
// Timestamp published
if (!BEncodeWriteDictInt("t", T.count(), buf))
return false;
bool
IntroSet::IsExpired(llarp_time_t now) const
{
return GetNewestIntroExpiration() < now;
}
// write version
if (!BEncodeWriteDictInt("v", version, buf))
return false;
if (W)
std::vector<llarp::dns::SRVData>
IntroSet::GetMatchingSRVRecords(std::string_view service_proto) const
{
std::vector<llarp::dns::SRVData> records;
for (const auto& tuple : SRVs)
{
if (std::get<0>(tuple) == service_proto)
{
if (!BEncodeWriteDictEntry("w", *W, buf))
return false;
records.push_back(llarp::dns::SRVData::fromTuple(tuple));
}
if (!BEncodeWriteDictEntry("z", Z, buf))
return false;
return bencode_end(buf);
}
bool
IntroSet::HasExpiredIntros(llarp_time_t now) const
return records;
}
bool
IntroSet::Verify(llarp_time_t now) const
{
std::array<byte_t, MAX_INTROSET_SIZE> tmp;
llarp_buffer_t buf(tmp);
IntroSet copy;
copy = *this;
copy.Z.Zero();
if (!copy.BEncode(&buf))
{
for (const auto& i : I)
if (now >= i.expiresAt)
return true;
return false;
}
bool
IntroSet::IsExpired(llarp_time_t now) const
// rewind and resize buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (!A.Verify(buf, Z))
{
return GetNewestIntroExpiration() < now;
return false;
}
std::vector<llarp::dns::SRVData>
IntroSet::GetMatchingSRVRecords(std::string_view service_proto) const
// validate PoW
if (W && !W->IsValid(now))
{
std::vector<llarp::dns::SRVData> records;
for (const auto& tuple : SRVs)
{
if (std::get<0>(tuple) == service_proto)
{
records.push_back(llarp::dns::SRVData::fromTuple(tuple));
}
}
return records;
return false;
}
bool
IntroSet::Verify(llarp_time_t now) const
// valid timestamps
// add max clock skew
now += MAX_INTROSET_TIME_DELTA;
for (const auto& intro : I)
{
std::array<byte_t, MAX_INTROSET_SIZE> tmp;
llarp_buffer_t buf(tmp);
IntroSet copy;
copy = *this;
copy.Z.Zero();
if (!copy.BEncode(&buf))
{
return false;
}
// rewind and resize buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if (!A.Verify(buf, Z))
{
return false;
}
// validate PoW
if (W && !W->IsValid(now))
if (intro.expiresAt > now && intro.expiresAt - now > path::default_lifetime)
{
return false;
}
// valid timestamps
// add max clock skew
now += MAX_INTROSET_TIME_DELTA;
for (const auto& intro : I)
{
if (intro.expiresAt > now && intro.expiresAt - now > path::default_lifetime)
if (!W)
{
if (!W)
{
LogWarn("intro has too high expire time");
return false;
}
if (intro.expiresAt - W->extendedLifetime > path::default_lifetime)
{
return false;
}
LogWarn("intro has too high expire time");
return false;
}
if (intro.expiresAt - W->extendedLifetime > path::default_lifetime)
{
return false;
}
}
if (IsExpired(now))
{
LogWarn("introset expired: ", *this);
return false;
}
return true;
}
llarp_time_t
IntroSet::GetNewestIntroExpiration() const
if (IsExpired(now))
{
llarp_time_t t = 0s;
for (const auto& intro : I)
t = std::max(intro.expiresAt, t);
return t;
LogWarn("introset expired: ", *this);
return false;
}
return true;
}
std::ostream&
IntroSet::print(std::ostream& stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("A", A);
printer.printAttribute("I", I);
printer.printAttribute("K", K);
llarp_time_t
IntroSet::GetNewestIntroExpiration() const
{
llarp_time_t t = 0s;
for (const auto& intro : I)
t = std::max(intro.expiresAt, t);
return t;
}
std::ostream&
IntroSet::print(std::ostream& stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("A", A);
printer.printAttribute("I", I);
printer.printAttribute("K", K);
std::string _topic = topic.ToString();
std::string _topic = topic.ToString();
if (!_topic.empty())
{
printer.printAttribute("topic", _topic);
}
else
{
printer.printAttribute("topic", topic);
}
printer.printAttribute("T", T.count());
if (W)
{
printer.printAttribute("W", *W);
}
else
{
printer.printAttribute("W", "NULL");
}
printer.printAttribute("V", version);
printer.printAttribute("Z", Z);
if (!_topic.empty())
{
printer.printAttribute("topic", _topic);
}
else
{
printer.printAttribute("topic", topic);
}
return stream;
printer.printAttribute("T", T.count());
if (W)
{
printer.printAttribute("W", *W);
}
} // namespace service
} // namespace llarp
else
{
printer.printAttribute("W", "NULL");
}
printer.printAttribute("V", version);
printer.printAttribute("Z", Z);
return stream;
}
} // namespace llarp::service

@ -334,33 +334,16 @@ namespace llarp
: (now >= createdAt && now - createdAt > connectTimeout);
}
bool
OutboundContext::SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
path::PathRole roles)
std::optional<std::vector<RouterContact>>
OutboundContext::GetHopsForBuild()
{
if (m_NextIntro.router.IsZero() || prev.count(m_NextIntro.router))
if (m_NextIntro.router.IsZero())
{
ShiftIntroduction(false);
}
if (m_NextIntro.router.IsZero())
return false;
std::set<RouterID> exclude = prev;
exclude.insert(m_NextIntro.router);
for (const auto& snode : m_Endpoint->SnodeBlacklist())
exclude.insert(snode);
if (hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
if (db->Get(m_NextIntro.router, cur))
return true;
++m_BuildFails;
return false;
}
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
return std::nullopt;
return GetHopsAlignedToForBuild(m_NextIntro.router);
}
bool

@ -106,13 +106,8 @@ namespace llarp
void
HandlePathBuildFailed(path::Path_ptr path) override;
bool
SelectHop(
llarp_nodedb* db,
const std::set<RouterID>& prev,
RouterContact& cur,
size_t hop,
path::PathRole roles) override;
std::optional<std::vector<RouterContact>>
GetHopsForBuild() override;
bool
HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame);

@ -6,7 +6,7 @@
#include <util/meta/traits.hpp>
#include <util/printer.hpp>
#include <lokimq/hex.h>
#include <oxenmq/hex.h>
#include <array>
#include <cstddef>
@ -71,7 +71,7 @@ namespace llarp
friend std::ostream&
operator<<(std::ostream& out, const AlignedBuffer& self)
{
return out << lokimq::to_hex(self.begin(), self.end());
return out << oxenmq::to_hex(self.begin(), self.end());
}
/// bitwise NOT
@ -261,21 +261,21 @@ namespace llarp
std::string
ToHex() const
{
return lokimq::to_hex(begin(), end());
return oxenmq::to_hex(begin(), end());
}
std::string
ShortHex() const
{
return lokimq::to_hex(begin(), begin() + 4);
return oxenmq::to_hex(begin(), begin() + 4);
}
bool
FromHex(std::string_view str)
{
if (str.size() != 2 * size() || !lokimq::is_hex(str))
if (str.size() != 2 * size() || !oxenmq::is_hex(str))
return false;
lokimq::from_hex(str.begin(), str.end(), begin());
oxenmq::from_hex(str.begin(), str.end(), begin());
return true;
}

@ -14,23 +14,23 @@
namespace llarp::vpn
{
std::unique_ptr<Platform>
std::shared_ptr<Platform>
MakeNativePlatform(llarp::Context* ctx)
{
(void)ctx;
std::unique_ptr<Platform> plat;
std::shared_ptr<Platform> plat;
#ifdef _WIN32
plat = std::make_unique<vpn::Win32Platform>();
plat = std::make_shared<vpn::Win32Platform>();
#endif
#ifdef __linux__
#ifdef ANDROID
plat = std::make_unique<vpn::AndroidPlatform>();
plat = std::make_shared<vpn::AndroidPlatform>();
#else
plat = std::make_unique<vpn::LinuxPlatform>();
plat = std::make_shared<vpn::LinuxPlatform>();
#endif
#endif
#ifdef __APPLE__
plat = std::make_unique<vpn::ApplePlatform>();
plat = std::make_shared<vpn::ApplePlatform>();
#endif
return plat;
}

@ -121,7 +121,7 @@ namespace llarp::vpn
{
std::atomic<bool> m_Run;
HANDLE m_Device, m_IOCP;
std::vector<HANDLE> m_Threads;
std::vector<std::thread> m_Threads;
thread::Queue<net::IPPacket> m_ReadQueue;
const InterfaceInfo m_Info;
@ -334,7 +334,7 @@ namespace llarp::vpn
CloseHandle(m_Device);
// close the reader threads
for (auto& thread : m_Threads)
CloseHandle(thread);
thread.join();
}
int
@ -355,21 +355,22 @@ namespace llarp::vpn
return "";
}
static DWORD FAR PASCAL
Loop(void* u)
{
static_cast<Win32Interface*>(u)->ReadLoop();
return 0;
}
void
Start()
{
m_Run = true;
const auto numThreads = std::thread::hardware_concurrency();
m_IOCP = CreateIoCompletionPort(m_Device, nullptr, (ULONG_PTR)this, 1 + numThreads);
// allocate packets
for (size_t idx = 0; idx < numThreads; ++idx)
m_Threads.push_back(CreateThread(nullptr, 0, &Loop, this, 0, nullptr));
m_Packets.emplace_back(new asio_evt_pkt{true});
// create completion port
m_IOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
// attach the handle or some shit
CreateIoCompletionPort(m_Device, m_IOCP, 0, 0);
// spawn reader threads
for (size_t idx = 0; idx < numThreads; ++idx)
m_Threads.emplace_back([this, idx]() { ReadLoop(idx); });
}
net::IPPacket
@ -394,6 +395,8 @@ namespace llarp::vpn
}
};
std::vector<std::unique_ptr<asio_evt_pkt>> m_Packets;
bool
WritePacket(net::IPPacket pkt)
{
@ -406,9 +409,9 @@ namespace llarp::vpn
}
void
ReadLoop()
ReadLoop(size_t packetIndex)
{
std::unique_ptr<asio_evt_pkt> ev = std::make_unique<asio_evt_pkt>(true);
auto& ev = m_Packets[packetIndex];
ev->Read(m_Device);
while (m_Run)
{

@ -11,18 +11,18 @@ static const llarp::RuntimeOptions opts = {.background = false, .debug = false,
std::shared_ptr<llarp::Context>
make_context()
{
llarp::Config conf{fs::current_path()};
conf.Load(std::nullopt, true);
auto conf = std::make_shared<llarp::Config>(fs::current_path());
conf->Load(std::nullopt, true);
// set testing defaults
conf.network.m_endpointType = "null";
conf.bootstrap.seednode = true;
conf.api.m_enableRPCServer = false;
conf.lokid.whitelistRouters = false;
conf.router.m_publicAddress = llarp::IpAddress("1.1.1.1");
conf->network.m_endpointType = "null";
conf->bootstrap.seednode = true;
conf->api.m_enableRPCServer = false;
conf->lokid.whitelistRouters = false;
conf->router.m_publicAddress = llarp::IpAddress("1.1.1.1");
// make a fake inbound link
conf.links.m_InboundLinks.emplace_back();
auto& link = conf.links.m_InboundLinks.back();
conf->links.m_InboundLinks.emplace_back();
auto& link = conf->links.m_InboundLinks.back();
link.interface = "0.0.0.0";
link.addressFamily = AF_INET;
link.port = 0;

@ -4,23 +4,25 @@
#include <router_contact.hpp>
#include <nodedb.hpp>
using llarp_nodedb = llarp::NodeDB;
TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
{
llarp_nodedb nodeDB("", nullptr);
llarp_nodedb nodeDB{fs::current_path(), nullptr};
constexpr uint64_t numRCs = 3;
for (uint64_t i = 0; i < numRCs; ++i)
{
llarp::RouterContact rc;
rc.pubkey[0] = i;
nodeDB.Insert(rc);
nodeDB.Put(rc);
}
REQUIRE(numRCs == nodeDB.num_loaded());
REQUIRE(numRCs == nodeDB.NumLoaded());
llarp::dht::Key_t key;
std::vector<llarp::RouterContact> results = nodeDB.FindClosestTo(key, 4);
std::vector<llarp::RouterContact> results = nodeDB.FindManyClosestTo(key, 4);
// we asked for more entries than nodedb had
REQUIRE(numRCs == results.size());
@ -28,26 +30,26 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]")
{
llarp_nodedb nodeDB("", nullptr);
llarp_nodedb nodeDB{fs::current_path(), nullptr};
// insert some RCs: a < b < c
llarp::RouterContact a;
a.pubkey[0] = 1;
nodeDB.Insert(a);
nodeDB.Put(a);
llarp::RouterContact b;
b.pubkey[0] = 2;
nodeDB.Insert(b);
nodeDB.Put(b);
llarp::RouterContact c;
c.pubkey[0] = 3;
nodeDB.Insert(c);
nodeDB.Put(c);
REQUIRE(3 == nodeDB.num_loaded());
REQUIRE(3 == nodeDB.NumLoaded());
llarp::dht::Key_t key;
std::vector<llarp::RouterContact> results = nodeDB.FindClosestTo(key, 2);
std::vector<llarp::RouterContact> results = nodeDB.FindManyClosestTo(key, 2);
REQUIRE(2 == results.size());
// we xor'ed with 0x0, so order should be a,b,c
@ -57,7 +59,7 @@ TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]")
llarp::dht::Key_t compKey;
compKey.Fill(0xFF);
results = nodeDB.FindClosestTo(compKey, 2);
results = nodeDB.FindManyClosestTo(compKey, 2);
// we xor'ed with 0xF...F, so order should be inverted (c,b,a)
REQUIRE(c.pubkey == results[0].pubkey);

@ -11,12 +11,12 @@ llarp::RuntimeOptions opts = {false, false, false};
static std::shared_ptr<llarp::Context>
make_context(std::optional<fs::path> keyfile)
{
llarp::Config conf{fs::current_path()};
conf.Load(std::nullopt, opts.isRouter);
conf.network.m_endpointType = "null";
conf.network.m_keyfile = keyfile;
conf.bootstrap.seednode = true;
conf.api.m_enableRPCServer = false;
auto conf = std::make_shared<llarp::Config>(fs::current_path());
conf->Load(std::nullopt, opts.isRouter);
conf->network.m_endpointType = "null";
conf->network.m_keyfile = keyfile;
conf->bootstrap.seednode = true;
conf->api.m_enableRPCServer = false;
auto context = std::make_shared<llarp::Context>();
REQUIRE_NOTHROW(context->Configure(std::move(conf)));

Loading…
Cancel
Save