diff --git a/CMakeLists.txt b/CMakeLists.txt index 2f69162d2..727c48525 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -162,16 +162,16 @@ endif() option(FORCE_LOKIMQ_SUBMODULE "force using lokimq submodule" OFF) if(NOT FORCE_LOKIMQ_SUBMODULE) - pkg_check_modules(LOKIMQ liblokimq>=1.2.2) + pkg_check_modules(LOKIMQ liboxenmq>=1.2.2) endif() if(LOKIMQ_FOUND) - add_library(lokimq INTERFACE) - link_dep_libs(lokimq INTERFACE "${LOKIMQ_LIBRARY_DIRS}" ${LOKIMQ_LIBRARIES}) - target_include_directories(lokimq INTERFACE ${LOKIMQ_INCLUDE_DIRS}) - add_library(lokimq::lokimq ALIAS lokimq) - message(STATUS "Found system liblokimq ${LOKIMQ_VERSION}") + add_library(oxenmq INTERFACE) + link_dep_libs(oxenmq INTERFACE "${LOKIMQ_LIBRARY_DIRS}" ${LOKIMQ_LIBRARIES}) + target_include_directories(oxenmq INTERFACE ${LOKIMQ_INCLUDE_DIRS}) + add_library(oxenmq::oxenmq ALIAS oxenmq) + message(STATUS "Found system liboxenmq ${LOKIMQ_VERSION}") else() - message(STATUS "using lokimq submodule") + message(STATUS "using oxenmq submodule") add_subdirectory(${CMAKE_SOURCE_DIR}/external/loki-mq) endif() diff --git a/cmake/StaticBuild.cmake b/cmake/StaticBuild.cmake index e16591f43..285c679f6 100644 --- a/cmake/StaticBuild.cmake +++ b/cmake/StaticBuild.cmake @@ -41,11 +41,11 @@ set(SODIUM_SOURCE libsodium-${SODIUM_VERSION}.tar.gz) set(SODIUM_HASH SHA512=17e8638e46d8f6f7d024fe5559eccf2b8baf23e143fadd472a7d29d228b186d86686a5e6920385fe2020729119a5f12f989c3a782afbd05a8db4819bb18666ef CACHE STRING "libsodium source hash") -set(ZMQ_VERSION 4.3.3 CACHE STRING "libzmq version") +set(ZMQ_VERSION 4.3.4 CACHE STRING "libzmq version") set(ZMQ_MIRROR ${LOCAL_MIRROR} https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION} CACHE STRING "libzmq mirror(s)") set(ZMQ_SOURCE zeromq-${ZMQ_VERSION}.tar.gz) -set(ZMQ_HASH SHA512=4c18d784085179c5b1fcb753a93813095a12c8d34970f2e1bfca6499be6c9d67769c71c68b7ca54ff181b20390043170e89733c22f76ff1ea46494814f7095b1 +set(ZMQ_HASH SHA512=e198ef9f82d392754caadd547537666d4fba0afd7d027749b3adae450516bcf284d241d4616cad3cb4ad9af8c10373d456de92dc6d115b037941659f141e7c0e CACHE STRING "libzmq source hash") set(LIBUV_VERSION 1.40.0 CACHE STRING "libuv version") @@ -283,10 +283,7 @@ endif() if(CMAKE_CROSSCOMPILING AND ARCH_TRIPLET MATCHES mingw) set(zmq_patch - PATCH_COMMAND ${PROJECT_SOURCE_DIR}/contrib/apply-patches.sh ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-wepoll.patch) - if(ZMQ_VERSION VERSION_LESS 4.3.4) - set(zmq_patch ${zmq_patch} ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-closesocket.patch) - endif() + PATCH_COMMAND ${PROJECT_SOURCE_DIR}/contrib/apply-patches.sh ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-wepoll.patch ${PROJECT_SOURCE_DIR}/contrib/patches/libzmq-mingw-closesocket.patch) endif() build_external(zmq diff --git a/contrib/windows.sh b/contrib/windows.sh index f55e7fcff..6107cc464 100755 --- a/contrib/windows.sh +++ b/contrib/windows.sh @@ -1,5 +1,5 @@ #!/bin/bash mkdir -p build-windows cd build-windows -cmake -G Ninja -DCMAKE_CROSSCOMPILE=ON -DCMAKE_EXE_LINKER_FLAGS=-fstack-protector -DCMAKE_CXX_FLAGS=-fdiagnostics-color=always -DCMAKE_TOOLCHAIN_FILE=../contrib/cross/mingw64.cmake -DBUILD_STATIC_DEPS=ON -DBUILD_PACKAGE=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_TESTING=OFF -DWITH_TESTS=OFF -DNATIVE_BUILD=OFF -DSTATIC_LINK=ON -DWITH_SYSTEMD=OFF -DFORCE_LOKIMQ_SUBMODULE=ON -DSUBMODULE_CHECK=OFF -DWITH_LTO=OFF -DCMAKE_BUILD_TYPE=Release -DCMAKE_CROSSCOMPLING=ON .. +cmake -G Ninja -DCMAKE_EXE_LINKER_FLAGS=-fstack-protector -DCMAKE_CXX_FLAGS=-fdiagnostics-color=always -DCMAKE_TOOLCHAIN_FILE=../contrib/cross/mingw64.cmake -DBUILD_STATIC_DEPS=ON -DBUILD_PACKAGE=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_TESTING=OFF -DWITH_TESTS=OFF -DNATIVE_BUILD=OFF -DSTATIC_LINK=ON -DWITH_SYSTEMD=OFF -DFORCE_LOKIMQ_SUBMODULE=ON -DSUBMODULE_CHECK=OFF -DWITH_LTO=OFF -DCMAKE_BUILD_TYPE=Release .. ninja package diff --git a/daemon/main.cpp b/daemon/main.cpp index c6153fc90..d57de2de2 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -259,21 +259,21 @@ run_main_context(std::optional confFile, const llarp::RuntimeOptions o llarp::LogTrace("start of run_main_context()"); try { - std::unique_ptr conf; + std::shared_ptr conf; if (confFile.has_value()) { llarp::LogInfo("Using config file: ", *confFile); - conf = std::make_unique(confFile->parent_path()); + conf = std::make_shared(confFile->parent_path()); } else { - conf = std::make_unique(llarp::GetDefaultDataDir()); + conf = std::make_shared(llarp::GetDefaultDataDir()); } if (!conf->Load(confFile, opts.isRouter)) throw std::runtime_error{"Config file parsing failed"}; ctx = std::make_shared(); - ctx->Configure(*conf); + ctx->Configure(std::move(conf)); signal(SIGINT, handle_signal); signal(SIGTERM, handle_signal); diff --git a/include/llarp.hpp b/include/llarp.hpp index f529d3c75..5944f7fcc 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -1,27 +1,31 @@ #ifndef LLARP_HPP #define LLARP_HPP #include -#include -#include -#include -#include -#include -#include -#include +#include #include #include #include #include #include -struct llarp_ev_loop; - namespace llarp { + namespace vpn + { + class Platform; + } + class Logic; struct Config; struct RouterContact; + struct Config; + struct Crypto; + struct CryptoManager; + struct AbstractRouter; + struct EventLoop; + class NodeDB; + namespace thread { class ThreadPool; @@ -36,12 +40,12 @@ namespace llarp struct Context { - std::unique_ptr crypto = nullptr; - std::unique_ptr cryptoManager = nullptr; - std::unique_ptr router = nullptr; + std::shared_ptr crypto = nullptr; + std::shared_ptr cryptoManager = nullptr; + std::shared_ptr router = nullptr; std::shared_ptr logic = nullptr; - std::unique_ptr nodedb = nullptr; - llarp_ev_loop_ptr mainloop; + std::shared_ptr nodedb = nullptr; + std::shared_ptr mainloop; std::string nodedb_dir; virtual ~Context() = default; @@ -49,9 +53,6 @@ namespace llarp void Close(); - int - LoadDatabase(); - void Setup(const RuntimeOptions& opts); @@ -62,10 +63,8 @@ namespace llarp HandleSignal(int sig); /// Configure given the specified config. - /// - /// note: consider using std::move() when passing conf in. void - Configure(Config conf); + Configure(std::shared_ptr conf); /// handle SIGHUP void @@ -93,11 +92,11 @@ namespace llarp /// Creates a router. Can be overridden to allow a different class of router /// to be created instead. Defaults to llarp::Router. - virtual std::unique_ptr - makeRouter(llarp_ev_loop_ptr __netloop, std::shared_ptr logic); + virtual std::shared_ptr + makeRouter(std::shared_ptr __netloop, std::shared_ptr logic); /// create the vpn platform for use in creating network interfaces - virtual std::unique_ptr + virtual std::shared_ptr makeVPNPlatform(); protected: diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 71a221e49..1a7ea79d6 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -39,7 +39,7 @@ target_link_libraries(lokinet-util PUBLIC nlohmann_json::nlohmann_json filesystem date::date - lokimq + oxenmq sqlite3 ) diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 58abb8eee..c8279464a 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -21,6 +21,8 @@ #include #include "constants/version.hpp" +namespace lokimq = oxenmq; + namespace llarp { // constants for config file default values diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 4f822a9d5..5f561e2cd 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -23,7 +23,7 @@ #include #include -#include +#include namespace llarp { @@ -157,7 +157,7 @@ namespace llarp { bool whitelistRouters = false; fs::path ident_keyfile; - lokimq::address lokidRPCAddr; + oxenmq::address lokidRPCAddr; void defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params); diff --git a/llarp/context.cpp b/llarp/context.cpp index 7ef9f6b1c..a34d67e9c 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -28,16 +28,16 @@ namespace llarp } void - Context::Configure(Config conf) + Context::Configure(std::shared_ptr conf) { if (nullptr != config.get()) throw std::runtime_error("Config already exists"); - config = std::make_shared(std::move(conf)); + config = std::move(conf); logic = std::make_shared(); - nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string(); + nodedb_dir = fs::path{config->router.m_dataDir / nodedb_dirname}.string(); } bool @@ -52,13 +52,6 @@ namespace llarp return router && router->LooksAlive(); } - int - Context::LoadDatabase() - { - llarp_nodedb::ensure_dir(nodedb_dir.c_str()); - return 1; - } - void Context::Setup(const RuntimeOptions& opts) { @@ -77,31 +70,25 @@ namespace llarp mainloop->set_logic(logic); - crypto = std::make_unique(); - cryptoManager = std::make_unique(crypto.get()); + crypto = std::make_shared(); + cryptoManager = std::make_shared(crypto.get()); router = makeRouter(mainloop, logic); - nodedb = std::make_unique( + nodedb = std::make_shared( nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); - if (!router->Configure(config, opts.isRouter, nodedb.get())) + if (!router->Configure(config, opts.isRouter, nodedb)) throw std::runtime_error("Failed to configure router"); - - // must be done after router is made so we can use its disk io worker - // must also be done after configure so that netid is properly set if it - // is provided by config - if (!this->LoadDatabase()) - throw std::runtime_error("Config::Setup() failed to load database"); } - std::unique_ptr - Context::makeRouter(llarp_ev_loop_ptr netloop, std::shared_ptr logic) + std::shared_ptr + Context::makeRouter(std::shared_ptr netloop, std::shared_ptr logic) { - return std::make_unique(netloop, logic, makeVPNPlatform()); + return std::make_shared(netloop, logic, makeVPNPlatform()); } - std::unique_ptr + std::shared_ptr Context::makeVPNPlatform() { auto plat = vpn::MakeNativePlatform(this); @@ -195,10 +182,10 @@ namespace llarp config.reset(); llarp::LogDebug("free nodedb"); - nodedb.release(); + nodedb.reset(); llarp::LogDebug("free router"); - router.release(); + router.reset(); llarp::LogDebug("free logic"); logic.reset(); diff --git a/llarp/crypto/types.cpp b/llarp/crypto/types.cpp index 1cf5ca3fd..b84512277 100644 --- a/llarp/crypto/types.cpp +++ b/llarp/crypto/types.cpp @@ -7,7 +7,9 @@ #include -#include +#include + +namespace lokimq = oxenmq; #include #include diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index db409b30b..68fa865db 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -228,7 +228,12 @@ namespace llarp bool GetRCFromNodeDB(const Key_t& k, llarp::RouterContact& rc) const override { - return router->nodedb()->Get(k.as_array(), rc); + if (const auto maybe = router->nodedb()->Get(k.as_array()); maybe.has_value()) + { + rc = *maybe; + return true; + } + return false; } PendingIntrosetLookups _pendingIntrosetLookups; diff --git a/llarp/dht/messages/findintro.cpp b/llarp/dht/messages/findintro.cpp index 8a3f63d0a..eb5a3d5f6 100644 --- a/llarp/dht/messages/findintro.cpp +++ b/llarp/dht/messages/findintro.cpp @@ -106,7 +106,7 @@ namespace llarp } auto closestRCs = - dht.GetRouter()->nodedb()->FindClosestTo(location, IntroSetStorageRedundancy); + dht.GetRouter()->nodedb()->FindManyClosestTo(location, IntroSetStorageRedundancy); if (closestRCs.size() <= relayOrder) { diff --git a/llarp/dht/messages/findname.cpp b/llarp/dht/messages/findname.cpp index 360f27e15..1a8bb698e 100644 --- a/llarp/dht/messages/findname.cpp +++ b/llarp/dht/messages/findname.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include @@ -7,6 +7,8 @@ #include #include +namespace lokimq = oxenmq; + namespace llarp::dht { FindNameMessage::FindNameMessage(const Key_t& from, Key_t namehash, uint64_t txid) diff --git a/llarp/dht/messages/gotname.cpp b/llarp/dht/messages/gotname.cpp index 9bbd05fb5..40a4b3302 100644 --- a/llarp/dht/messages/gotname.cpp +++ b/llarp/dht/messages/gotname.cpp @@ -1,9 +1,11 @@ #include -#include +#include #include #include #include +namespace lokimq = oxenmq; + namespace llarp::dht { constexpr size_t NameSizeLimit = 128; diff --git a/llarp/dht/messages/pubintro.cpp b/llarp/dht/messages/pubintro.cpp index 878f9bbf4..0496505b5 100644 --- a/llarp/dht/messages/pubintro.cpp +++ b/llarp/dht/messages/pubintro.cpp @@ -86,7 +86,8 @@ namespace llarp } // identify closest 4 routers - auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(addr, IntroSetStorageRedundancy); + auto closestRCs = + dht.GetRouter()->nodedb()->FindManyClosestTo(addr, IntroSetStorageRedundancy); if (closestRCs.size() != IntroSetStorageRedundancy) { llarp::LogWarn("Received PublishIntroMessage but only know ", closestRCs.size(), " nodes"); diff --git a/llarp/ev/vpn.hpp b/llarp/ev/vpn.hpp index 60c95fe1e..38c5cd6ed 100644 --- a/llarp/ev/vpn.hpp +++ b/llarp/ev/vpn.hpp @@ -81,7 +81,7 @@ namespace llarp::vpn }; /// create native vpn platform - std::unique_ptr + std::shared_ptr MakeNativePlatform(llarp::Context* ctx); } // namespace llarp::vpn diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index 3a075698e..9211c3ba6 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -70,30 +70,10 @@ namespace llarp m_SnodeBlacklist.insert(std::move(snode)); } - bool - BaseSession::SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - llarp::path::PathRole roles) + std::optional> + BaseSession::GetHopsForBuild() { - std::set exclude = prev; - for (const auto& snode : m_SnodeBlacklist) - { - if (snode != m_ExitRouter) - exclude.insert(snode); - } - exclude.insert(m_ExitRouter); - if (hop == numHops - 1) - { - if (db->Get(m_ExitRouter, cur)) - return true; - m_router->LookupRouter(m_ExitRouter, nullptr); - return false; - } - - return path::Builder::SelectHop(db, exclude, cur, hop, roles); + return GetHopsAlignedToForBuild(m_ExitRouter); } bool @@ -306,9 +286,8 @@ namespace llarp if (numHops == 1) { auto r = m_router; - RouterContact rc; - if (r->nodedb()->Get(m_ExitRouter, rc)) - r->TryConnectAsync(rc, 5); + if (const auto maybe = r->nodedb()->Get(m_ExitRouter); maybe.has_value()) + r->TryConnectAsync(*maybe, 5); else r->LookupRouter(m_ExitRouter, [r](const std::vector& results) { if (results.size()) diff --git a/llarp/exit/session.hpp b/llarp/exit/session.hpp index cab5c5d95..625436b5d 100644 --- a/llarp/exit/session.hpp +++ b/llarp/exit/session.hpp @@ -67,13 +67,8 @@ namespace llarp bool CheckPathDead(path::Path_ptr p, llarp_time_t dlt); - bool - SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - llarp::path::PathRole roles) override; + std::optional> + GetHopsForBuild() override; bool ShouldBuildMore(llarp_time_t now) const override; diff --git a/llarp/messages/relay_commit.cpp b/llarp/messages/relay_commit.cpp index ac0da0a63..fbee25177 100644 --- a/llarp/messages/relay_commit.cpp +++ b/llarp/messages/relay_commit.cpp @@ -305,20 +305,6 @@ namespace llarp self->hop->info.upstream, self->hop->ExpireTime() + 10s); // put hop self->context->PutTransitHop(self->hop); - // if we have an rc for this hop... - if (self->record.nextRC) - { - // ... and it matches the next hop ... - if (self->record.nextHop == self->record.nextRC->pubkey) - { - // ... and it's valid - const auto now = self->context->Router()->Now(); - if (self->record.nextRC->IsPublicRouter() && self->record.nextRC->Verify(now)) - { - self->context->Router()->nodedb()->UpdateAsyncIfNewer(*self->record.nextRC.get()); - } - } - } // forward to next hop using std::placeholders::_1; auto func = std::bind( diff --git a/llarp/net/route.cpp b/llarp/net/route.cpp index 5a3037f9c..ea67edc1d 100644 --- a/llarp/net/route.cpp +++ b/llarp/net/route.cpp @@ -408,10 +408,10 @@ namespace llarp::net if (parts[1].find_first_not_of('0') == std::string::npos and parts[0] != ifname) { const auto& ip = parts[2]; - if ((ip.size() == sizeof(uint32_t) * 2) and lokimq::is_hex(ip)) + if ((ip.size() == sizeof(uint32_t) * 2) and oxenmq::is_hex(ip)) { huint32_t x{}; - lokimq::from_hex(ip.begin(), ip.end(), reinterpret_cast(&x.h)); + oxenmq::from_hex(ip.begin(), ip.end(), reinterpret_cast(&x.h)); gateways.emplace_back(x.ToString()); } } diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index d41896a7b..6f01ca5f3 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -19,490 +19,255 @@ static const char skiplist_subdirs[] = "0123456789abcdef"; static const std::string RC_FILE_EXT = ".signed"; -llarp_nodedb::NetDBEntry::NetDBEntry(llarp::RouterContact value) - : rc(std::move(value)), inserted(llarp::time_now_ms()) -{} - -bool -llarp_nodedb::Remove(const llarp::RouterID& pk) -{ - bool removed = false; - RemoveIf([&](const llarp::RouterContact& rc) -> bool { - if (rc.pubkey == pk) - { - removed = true; - return true; - } - return false; - }); - return removed; -} - -void -llarp_nodedb::Clear() +namespace llarp { - llarp::util::Lock lock(access); - entries.clear(); -} + NodeDB::Entry::Entry(RouterContact value) : rc(std::move(value)), insertedAt(llarp::time_now_ms()) + {} -bool -llarp_nodedb::Get(const llarp::RouterID& pk, llarp::RouterContact& result) -{ - llarp::util::Lock l(access); - auto itr = entries.find(pk); - if (itr == entries.end()) - return false; - result = itr->second.rc; - return true; -} - -void -llarp_nodedb::RemoveIf(std::function filter) -{ - std::set files; + static void + EnsureSkiplist(fs::path nodedbDir) { - llarp::util::Lock l(access); - auto itr = entries.begin(); - while (itr != entries.end()) + if (not fs::exists(nodedbDir)) { - if (filter(itr->second.rc)) - { - files.insert(getRCFilePath(itr->second.rc.pubkey)); - itr = entries.erase(itr); - } + // if the old 'netdb' directory exists, move it to this one + fs::path parent = nodedbDir.parent_path(); + fs::path old = parent / "netdb"; + if (fs::exists(old)) + fs::rename(old, nodedbDir); else - ++itr; + fs::create_directory(nodedbDir); } - } - disk([files = std::move(files)]() { - for (const auto& file : files) - fs::remove(file); - }); -} - -bool -llarp_nodedb::Has(const llarp::RouterID& pk) -{ - llarp::util::Lock lock(access); - return entries.find(pk) != entries.end(); -} - -llarp::RouterContact -llarp_nodedb::FindClosestTo(const llarp::dht::Key_t& location) -{ - llarp::RouterContact rc; - const llarp::dht::XorMetric compare(location); - visit([&rc, compare](const auto& otherRC) -> bool { - if (rc.pubkey.IsZero()) - { - rc = otherRC; - return true; - } - if (compare( - llarp::dht::Key_t{otherRC.pubkey.as_array()}, llarp::dht::Key_t{rc.pubkey.as_array()})) - rc = otherRC; - return true; - }); - return rc; -} - -std::vector -llarp_nodedb::FindClosestTo(const llarp::dht::Key_t& location, uint32_t numRouters) -{ - llarp::util::Lock lock(access); - std::vector all; - all.reserve(entries.size()); - for (auto& entry : entries) - { - all.push_back(&entry.second.rc); - } + if (not fs::is_directory(nodedbDir)) + throw std::runtime_error(llarp::stringify("nodedb ", nodedbDir, " is not a directory")); - auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end(); - std::partial_sort( - all.begin(), - it_mid, - all.end(), - [compare = llarp::dht::XorMetric{location}](auto* a, auto* b) { return compare(*a, *b); }); - - std::vector closest; - closest.reserve(numRouters); - for (auto it = all.begin(); it != it_mid; ++it) - closest.push_back(**it); - - return closest; -} - -/// skiplist directory is hex encoded first nibble -/// skiplist filename is .snode.signed -std::string -llarp_nodedb::getRCFilePath(const llarp::RouterID& pubkey) const -{ - std::string hexString = lokimq::to_hex(pubkey.begin(), pubkey.end()); - std::string skiplistDir; - - llarp::RouterID r(pubkey); - std::string fname = r.ToString(); - - skiplistDir += hexString[0]; - fname += RC_FILE_EXT; - fs::path filepath = nodePath / skiplistDir / fname; - return filepath.string(); -} - -void -llarp_nodedb::InsertAsync( - llarp::RouterContact rc, - std::shared_ptr logic, - std::function completionHandler) -{ - disk([this, rc, logic, completionHandler]() { - this->Insert(rc); - if (logic && completionHandler) + for (const char& ch : skiplist_subdirs) { - LogicCall(logic, completionHandler); + // this seems to be a problem on all targets + // perhaps cpp17::fs is just as screwed-up + // attempting to create a folder with no name + // what does this mean...? + if (!ch) + continue; + + fs::path sub = nodedbDir / std::string(&ch, 1); + fs::create_directory(sub); } - }); -} - -bool -llarp_nodedb::UpdateAsyncIfNewer( - llarp::RouterContact rc, - std::shared_ptr logic, - std::function completionHandler) -{ - llarp::util::Lock lock(access); - auto itr = entries.find(rc.pubkey); - if (itr == entries.end() || itr->second.rc.OtherIsNewer(rc)) - { - InsertAsync(rc, logic, completionHandler); - return true; - } - if (itr != entries.end()) - { - // insertion time is set on...insertion. But it should be updated here - // even if there is no insertion of a new RC, to show that the existing one - // is not "stale" - itr->second.inserted = llarp::time_now_ms(); } - return false; -} -/// insert -bool -llarp_nodedb::Insert(const llarp::RouterContact& rc) -{ - llarp::util::Lock lock(access); - auto itr = entries.find(rc.pubkey.as_array()); - if (itr != entries.end()) - entries.erase(itr); - entries.emplace(rc.pubkey.as_array(), rc); - LogDebug( - "Added or updated RC for ", - llarp::RouterID(rc.pubkey), - " to nodedb. Current nodedb count is: ", - entries.size()); - return true; -} - -ssize_t -llarp_nodedb::Load(const fs::path& path) -{ - std::error_code ec; - if (!fs::exists(path, ec)) - { - return -1; - } - ssize_t loaded = 0; + constexpr auto FlushInterval = 5min; - for (const char& ch : skiplist_subdirs) + NodeDB::NodeDB(fs::path root, std::function)> diskCaller) + : m_Root{std::move(root)} + , disk(std::move(diskCaller)) + , m_NextFlushAt{time_now_ms() + FlushInterval} { - if (!ch) - continue; - std::string p; - p += ch; - fs::path sub = path / p; - - ssize_t l = loadSubdir(sub); - if (l > 0) - loaded += l; + EnsureSkiplist(m_Root); } - m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval; - return loaded; -} -void -llarp_nodedb::SaveAll() -{ - std::array tmp; - llarp::util::Lock lock(access); - for (const auto& item : entries) + void + NodeDB::Tick(llarp_time_t now) { - llarp_buffer_t buf(tmp); - - if (!item.second.rc.BEncode(&buf)) - continue; - - buf.sz = buf.cur - buf.base; - const auto filepath = getRCFilePath(item.second.rc.pubkey); - auto optional_ofs = llarp::util::OpenFileStream( - filepath, std::ofstream::out | std::ofstream::binary | std::ofstream::trunc); - if (!optional_ofs) - continue; - auto& ofs = *optional_ofs; - ofs.write((char*)buf.base, buf.sz); - ofs.flush(); - ofs.close(); + if (now > m_NextFlushAt) + { + m_NextFlushAt += FlushInterval; + // make copy of all rcs + std::vector copy; + for (const auto& item : m_Entries) + copy.push_back(item.second.rc); + // flush them to disk in one big job + // TODO: split this up? idk maybe some day... + disk([this, data = std::move(copy)]() { + for (const auto& rc : data) + { + rc.Write(GetPathForPubkey(rc.pubkey)); + } + }); + } } -} -bool -llarp_nodedb::ShouldSaveToDisk(llarp_time_t now) const -{ - if (now == 0s) - now = llarp::time_now_ms(); - return m_NextSaveToDisk > 0s && m_NextSaveToDisk <= now; -} + fs::path + NodeDB::GetPathForPubkey(RouterID pubkey) const + { + std::string hexString = oxenmq::to_hex(pubkey.begin(), pubkey.end()); + std::string skiplistDir; -void -llarp_nodedb::AsyncFlushToDisk() -{ - disk([this]() { SaveAll(); }); - m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval; -} + const llarp::RouterID r{pubkey}; + std::string fname = r.ToString(); -ssize_t -llarp_nodedb::loadSubdir(const fs::path& dir) -{ - ssize_t sz = 0; - llarp::util::IterDir(dir, [&](const fs::path& f) -> bool { - if (fs::is_regular_file(f) && loadfile(f)) - sz++; - return true; - }); - return sz; -} - -bool -llarp_nodedb::loadfile(const fs::path& fpath) -{ - if (fpath.extension() != RC_FILE_EXT) - return false; - llarp::RouterContact rc; - if (!rc.Read(fpath)) - { - llarp::LogError("failed to read file ", fpath); - return false; + skiplistDir += hexString[0]; + fname += RC_FILE_EXT; + return m_Root / skiplistDir / fname; } - if (!rc.Verify(llarp::time_now_ms())) + + void + NodeDB::LoadFromDisk() { - llarp::LogError(fpath, " contains invalid RC"); - return false; + for (const char& ch : skiplist_subdirs) + { + if (!ch) + continue; + std::string p; + p += ch; + fs::path sub = m_Root / p; + + llarp::util::IterDir(sub, [&](const fs::path& f) -> bool { + if (fs::is_regular_file(f) and f.extension() == RC_FILE_EXT) + { + RouterContact rc{}; + if (rc.Read(f) and rc.Verify(time_now_ms())) + m_Entries.emplace(rc.pubkey, rc); + } + return true; + }); + } } + + void + NodeDB::SaveToDisk() const { - llarp::util::Lock lock(access); - entries.emplace(rc.pubkey.as_array(), rc); + for (const auto& item : m_Entries) + { + item.second.rc.Write(GetPathForPubkey(item.first)); + } } - return true; -} -void -llarp_nodedb::visit(std::function visit) -{ - llarp::util::Lock lock(access); - auto itr = entries.begin(); - while (itr != entries.end()) + bool + NodeDB::Has(RouterID pk) const { - if (!visit(itr->second.rc)) - return; - ++itr; + util::NullLock lock{m_Access}; + return m_Entries.find(pk) != m_Entries.end(); } -} -void -llarp_nodedb::VisitInsertedBefore( - std::function visit, llarp_time_t insertedAfter) -{ - llarp::util::Lock lock(access); - auto itr = entries.begin(); - while (itr != entries.end()) + std::optional + NodeDB::Get(RouterID pk) const { - if (itr->second.inserted < insertedAfter) - visit(itr->second.rc); - ++itr; + util::NullLock lock{m_Access}; + const auto itr = m_Entries.find(pk); + if (itr == m_Entries.end()) + return std::nullopt; + return itr->second.rc; } -} -void -llarp_nodedb::RemoveStaleRCs(const std::set& keep, llarp_time_t cutoff) -{ - std::set removeStale; - // remove stale routers - VisitInsertedBefore( - [&](const llarp::RouterContact& rc) { - if (keep.find(rc.pubkey) != keep.end()) - return; - LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey)); - removeStale.insert(rc.pubkey); - }, - cutoff); - - RemoveIf([&removeStale](const llarp::RouterContact& rc) -> bool { - return removeStale.count(rc.pubkey) > 0; - }); -} - -// write it to disk -void -disk_threadworker_setRC(llarp_async_verify_rc* verify_request) -{ - verify_request->valid = verify_request->nodedb->Insert(verify_request->rc); - if (verify_request->logic) + void + NodeDB::Remove(RouterID pk) { - LogicCall(verify_request->logic, [verify_request]() { - if (verify_request->hook) - verify_request->hook(verify_request); - }); + util::NullLock lock{m_Access}; + m_Entries.erase(pk); + AsyncRemoveManyFromDisk({pk}); } -} -// we run the crypto verify in the crypto threadpool worker -void -crypto_threadworker_verifyrc(llarp_async_verify_rc* verify_request) -{ - llarp::RouterContact rc = verify_request->rc; - verify_request->valid = rc.Verify(llarp::time_now_ms()); - // if it's valid we need to set it - if (verify_request->valid && rc.IsPublicRouter()) + void + NodeDB::RemoveStaleRCs(std::unordered_set keep, llarp_time_t cutoff) { - if (verify_request->disk) + util::NullLock lock{m_Access}; + std::unordered_set removed; + auto itr = m_Entries.begin(); + while (itr != m_Entries.end()) { - llarp::LogDebug("RC is valid, saving to disk"); - verify_request->disk(std::bind(&disk_threadworker_setRC, verify_request)); - return; + if (itr->second.insertedAt < cutoff and keep.count(itr->second.rc.pubkey) == 0) + { + removed.insert(itr->second.rc.pubkey); + itr = m_Entries.erase(itr); + } + else + ++itr; } + if (not removed.empty()) + AsyncRemoveManyFromDisk(std::move(removed)); } - // callback to logic thread - LogicCall(verify_request->logic, [verify_request]() { - if (verify_request->hook) - verify_request->hook(verify_request); - }); -} - -void -llarp_nodedb_async_verify(struct llarp_async_verify_rc* job) -{ - job->worker(std::bind(&crypto_threadworker_verifyrc, job)); -} -void -llarp_nodedb::ensure_dir(const fs::path& nodedbDir) -{ - if (not fs::exists(nodedbDir)) + void + NodeDB::Put(RouterContact rc) { - // if the old 'netdb' directory exists, move it to this one - fs::path parent = nodedbDir.parent_path(); - fs::path old = parent / "netdb"; - if (fs::exists(old)) - fs::rename(old, nodedbDir); - else - fs::create_directory(nodedbDir); + util::NullLock lock{m_Access}; + m_Entries.erase(rc.pubkey); + m_Entries.emplace(rc.pubkey, rc); } - if (not fs::is_directory(nodedbDir)) - throw std::runtime_error(llarp::stringify("nodedb ", nodedbDir, " is not a directory")); - - for (const char& ch : skiplist_subdirs) + size_t + NodeDB::NumLoaded() const { - // this seems to be a problem on all targets - // perhaps cpp17::fs is just as screwed-up - // attempting to create a folder with no name - // what does this mean...? - if (!ch) - continue; - - fs::path sub = nodedbDir / std::string(&ch, 1); - fs::create_directory(sub); + util::NullLock lock{m_Access}; + return m_Entries.size(); } -} - -ssize_t -llarp_nodedb::LoadAll() -{ - return Load(nodePath.c_str()); -} - -size_t -llarp_nodedb::num_loaded() const -{ - llarp::util::Lock l{access}; - return entries.size(); -} -bool -llarp_nodedb::select_random_exit(llarp::RouterContact& result) -{ - llarp::util::Lock lock(access); - const auto sz = entries.size(); - auto itr = entries.begin(); - if (sz < 3) - return false; - auto idx = llarp::randint() % sz; - if (idx) - std::advance(itr, idx - 1); - while (itr != entries.end()) + void + NodeDB::PutIfNewer(RouterContact rc) { - if (itr->second.rc.IsExit()) + util::NullLock lock{m_Access}; + auto itr = m_Entries.find(rc.pubkey); + if (itr == m_Entries.end() or itr->second.rc.OtherIsNewer(rc)) { - result = itr->second.rc; - return true; + // delete if existing + if (itr != m_Entries.end()) + m_Entries.erase(itr); + // add new entry + m_Entries.emplace(rc.pubkey, rc); } - ++itr; } - // wrap around - itr = entries.begin(); - while (idx--) + + void + NodeDB::AsyncRemoveManyFromDisk(std::unordered_set remove) const { - if (itr->second.rc.IsExit()) + // build file list + std::set files; + for (auto id : remove) { - result = itr->second.rc; - return true; + files.emplace(GetPathForPubkey(std::move(id))); } - ++itr; + // remove them from the disk via the diskio thread + disk([files]() { + for (auto fpath : files) + fs::remove(fpath); + }); } - return false; -} -bool -llarp_nodedb::select_random_hop_excluding( - llarp::RouterContact& result, const std::set& exclude) -{ - llarp::util::Lock lock(access); - /// checking for "guard" status for N = 0 is done by caller inside of - /// pathbuilder's scope - const size_t sz = entries.size(); - if (sz < 3) + llarp::RouterContact + NodeDB::FindClosestTo(llarp::dht::Key_t location) const { - return false; + util::NullLock lock{m_Access}; + llarp::RouterContact rc; + const llarp::dht::XorMetric compare(location); + VisitAll([&rc, compare](const auto& otherRC) { + if (rc.pubkey.IsZero()) + { + rc = otherRC; + return; + } + if (compare( + llarp::dht::Key_t{otherRC.pubkey.as_array()}, + llarp::dht::Key_t{rc.pubkey.as_array()})) + rc = otherRC; + }); + return rc; } - const size_t pos = llarp::randint() % sz; - const auto start = std::next(entries.begin(), pos); - for (auto itr = start; itr != entries.end(); ++itr) + std::vector + NodeDB::FindManyClosestTo(llarp::dht::Key_t location, uint32_t numRouters) const { - if (exclude.count(itr->first) == 0 and itr->second.rc.IsPublicRouter()) - { - result = itr->second.rc; - return true; - } - } - for (auto itr = entries.begin(); itr != start; ++itr) - { - if (exclude.count(itr->first) == 0 and itr->second.rc.IsPublicRouter()) + util::NullLock lock{m_Access}; + std::vector all; + + const auto& entries = m_Entries; + + all.reserve(entries.size()); + for (auto& entry : entries) { - result = itr->second.rc; - return true; + all.push_back(&entry.second.rc); } + + auto it_mid = numRouters < all.size() ? all.begin() + numRouters : all.end(); + std::partial_sort( + all.begin(), it_mid, all.end(), [compare = dht::XorMetric{location}](auto* a, auto* b) { + return compare(*a, *b); + }); + + std::vector closest; + closest.reserve(numRouters); + for (auto it = all.begin(); it != it_mid; ++it) + closest.push_back(**it); + + return closest; } - return false; -} +} // namespace llarp diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index f28939251..5588d88da 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -8,221 +8,174 @@ #include #include #include +#include #include +#include +#include +#include #include +#include -/** - * nodedb.hpp - * - * persistent storage API for router contacts - */ namespace llarp { class Logic; -} // namespace llarp - -struct llarp_nodedb -{ - using DiskJob_t = std::function; - using DiskCaller_t = std::function; - using WorkJob_t = std::function; - using WorkCaller_t = std::function; - explicit llarp_nodedb(const std::string rootdir, DiskCaller_t diskCaller) - : disk(std::move(diskCaller)), nodePath(rootdir) - - {} - - ~llarp_nodedb() + class NodeDB { - Clear(); - } - - const DiskCaller_t disk; - mutable llarp::util::Mutex access; // protects entries - /// time for next save to disk event, 0 if never happened - llarp_time_t m_NextSaveToDisk = 0s; - /// how often to save to disk - const llarp_time_t m_SaveInterval = 5min; - - struct NetDBEntry - { - const llarp::RouterContact rc; - llarp_time_t inserted; - - NetDBEntry(llarp::RouterContact data); + struct Entry + { + const RouterContact rc; + llarp_time_t insertedAt; + explicit Entry(RouterContact rc); + }; + using NodeMap = std::unordered_map; + + NodeMap m_Entries; + + const fs::path m_Root; + + const std::function)> disk; + + llarp_time_t m_NextFlushAt; + + mutable util::NullMutex m_Access; + + /// asynchronously remove the files for a set of rcs on disk given their public ident key + void + AsyncRemoveManyFromDisk(std::unordered_set idents) const; + + /// get filename of an RC file given its public ident key + fs::path + GetPathForPubkey(RouterID pk) const; + + public: + explicit NodeDB(fs::path rootdir, std::function)> diskCaller); + + /// load all entries from disk syncrhonously + void + LoadFromDisk(); + + /// explicit save all RCs to disk synchronously + void + SaveToDisk() const; + + /// the number of RCs that are loaded from disk + size_t + NumLoaded() const; + + /// do periodic tasks like flush to disk and expiration + void + Tick(llarp_time_t now); + + /// find the absolute closets router to a dht location + RouterContact + FindClosestTo(dht::Key_t location) const; + + /// find many routers closest to dht key + std::vector + FindManyClosestTo(dht::Key_t location, uint32_t numRouters) const; + + /// return true if we have an rc by its ident pubkey + bool + Has(RouterID pk) const; + + /// maybe get an rc by its ident pubkey + std::optional + Get(RouterID pk) const; + + template + std::optional + GetIf(Filter visit, bool shuffle = true) const + { + util::NullLock lock{m_Access}; + const auto sz = m_Entries.size(); + if (sz < 3) + return std::nullopt; + auto begin = m_Entries.begin(); + auto start = begin; + if (shuffle) + { + start = std::next(start, randint() % sz); + } + for (auto itr = start; itr != m_Entries.end(); ++itr) + { + if (visit(itr->second.rc)) + return itr->second.rc; + } + if (shuffle) + { + for (auto itr = begin; begin != start; ++itr) + { + if (visit(itr->second.rc)) + return itr->second.rc; + } + } + return std::nullopt; + } + + /// visit all entries + template + void + VisitAll(Visit visit) const + { + util::NullLock lock{m_Access}; + for (const auto& item : m_Entries) + { + visit(item.second.rc); + } + } + + /// visit all entries inserted before a timestamp + template + void + VisitInsertedBefore(Visit visit, llarp_time_t insertedBefore) + { + util::NullLock lock{m_Access}; + for (const auto& item : m_Entries) + { + if (item.second.insertedAt < insertedBefore) + visit(item.second.rc); + } + } + + /// remove an entry via its ident pubkey + void + Remove(RouterID pk); + + /// remove an entry given a filter that inspects the rc + template + void + RemoveIf(Filter visit) + { + util::NullLock lock{m_Access}; + std::unordered_set removed; + auto itr = m_Entries.begin(); + while (itr != m_Entries.end()) + { + if (visit(itr->second.rc)) + { + removed.insert(itr->second.rc.pubkey); + itr = m_Entries.erase(itr); + } + else + ++itr; + } + if (not removed.empty()) + AsyncRemoveManyFromDisk(std::move(removed)); + } + + /// remove rcs that are not in keep and have been inserted before cutoff + void + RemoveStaleRCs(std::unordered_set keep, llarp_time_t cutoff); + + /// put this rc into the cache if it is not there or newer than the one there already + void + PutIfNewer(RouterContact rc); + + /// unconditional put of rc into cache + void + Put(RouterContact rc); }; - - using NetDBMap_t = std::unordered_map; - - NetDBMap_t entries GUARDED_BY(access); - fs::path nodePath; - - llarp::RouterContact - FindClosestTo(const llarp::dht::Key_t& location); - - /// find the $numRouters closest routers to the given DHT key - std::vector - FindClosestTo(const llarp::dht::Key_t& location, uint32_t numRouters); - - /// return true if we should save our nodedb to disk - bool - ShouldSaveToDisk(llarp_time_t now = 0s) const; - - bool - Remove(const llarp::RouterID& pk) EXCLUDES(access); - - void - RemoveIf(std::function filter) EXCLUDES(access); - - void - Clear() EXCLUDES(access); - - bool - Get(const llarp::RouterID& pk, llarp::RouterContact& result) EXCLUDES(access); - - bool - Has(const llarp::RouterID& pk) EXCLUDES(access); - - std::string - getRCFilePath(const llarp::RouterID& pubkey) const; - - /// insert without writing to disk - bool - Insert(const llarp::RouterContact& rc) EXCLUDES(access); - - /// invokes Insert() asynchronously with an optional completion - /// callback - void - InsertAsync( - llarp::RouterContact rc, - std::shared_ptr l = nullptr, - std::function completionHandler = nullptr); - - /// update rc if newer - /// return true if we started to put this rc in the database - /// retur false if not newer - bool - UpdateAsyncIfNewer( - llarp::RouterContact rc, - std::shared_ptr l = nullptr, - std::function completionHandler = nullptr) EXCLUDES(access); - - ssize_t - Load(const fs::path& path); - - ssize_t - loadSubdir(const fs::path& dir); - /// save all entries to disk async - void - AsyncFlushToDisk(); - - bool - loadfile(const fs::path& fpath) EXCLUDES(access); - - void - visit(std::function visit) EXCLUDES(access); - - void - set_dir(const char* dir); - - ssize_t - LoadAll(); - - ssize_t - store_dir(const char* dir); - - /// visit all entries inserted into nodedb cache before a timestamp - void - VisitInsertedBefore( - std::function visit, llarp_time_t insertedAfter) - EXCLUDES(access); - - void - RemoveStaleRCs(const std::set& keep, llarp_time_t cutoff); - - size_t - num_loaded() const EXCLUDES(access); - - bool - select_random_exit(llarp::RouterContact& rc) EXCLUDES(access); - - bool - select_random_hop_excluding( - llarp::RouterContact& result, const std::set& exclude) EXCLUDES(access); - - /// Ensures that the given nodedb 'dir' exists - /// - /// @param nodedbDir should be the desired nodedb directory - /// @throws on any filesistem error or if `nodedbDir` exists and is not a directory - static void - ensure_dir(const fs::path& nodedbDir); - - void - SaveAll() EXCLUDES(access); -}; - -/// struct for async rc verification -struct llarp_async_verify_rc; - -using llarp_async_verify_rc_hook_func = std::function; - -/// verify rc request -struct llarp_async_verify_rc -{ - /// async_verify_context - void* user; - /// nodedb storage - llarp_nodedb* nodedb; - // llarp::Logic for queue_job - std::shared_ptr logic; - llarp_nodedb::WorkCaller_t worker; - llarp_nodedb::DiskCaller_t disk; - - /// router contact - llarp::RouterContact rc; - /// result - bool valid; - /// hook - llarp_async_verify_rc_hook_func hook; -}; - -/** - struct for async rc verification - data is loaded in disk io threadpool - crypto is done on the crypto worker threadpool - result is called on the logic thread -*/ -void -llarp_nodedb_async_verify(struct llarp_async_verify_rc* job); - -struct llarp_async_load_rc; - -using llarp_async_load_rc_hook_func = std::function; - -struct llarp_async_load_rc -{ - /// async_verify_context - void* user; - /// nodedb storage - llarp_nodedb* nodedb; - /// llarp::Logic for calling hook - llarp::Logic* logic; - /// disk worker threadpool - llarp_nodedb::DiskCaller_t disk; - /// target pubkey - llarp::PubKey pubkey; - /// router contact result - llarp::RouterContact result; - /// set to true if we loaded the rc - bool loaded; - /// hook function called in logic thread - llarp_async_load_rc_hook_func hook; -}; - -/// asynchronously load an rc from disk -void -llarp_nodedb_async_load_rc(struct llarp_async_load_rc* job); - +} // namespace llarp #endif diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index 3afe33f40..4732af963 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -205,56 +205,53 @@ namespace llarp return obj; } - bool - Builder::SelectHop( - llarp_nodedb* db, - const std::set& exclude, - RouterContact& cur, - size_t hop, - PathRole roles) + std::optional + Builder::SelectFirstHop() const { - (void)roles; - size_t tries = 10; - if (hop == 0) - { - if (m_router->NumberOfConnectedRouters() == 0) - { - return false; - } - bool got = false; - m_router->ForEachPeer( - [&](const ILinkSession* s, bool isOutbound) { - if (s && s->IsEstablished() && isOutbound && !got) - { - const RouterContact rc = s->GetRemoteRC(); -#ifdef TESTNET - if (got || exclude.count(rc.pubkey)) -#else - if (got || exclude.count(rc.pubkey) || m_router->IsBootstrapNode(rc.pubkey)) + std::optional found = std::nullopt; + m_router->ForEachPeer( + [&](const ILinkSession* s, bool isOutbound) { + if (s && s->IsEstablished() && isOutbound && not found.has_value()) + { + const RouterContact rc = s->GetRemoteRC(); +#ifndef TESTNET + if (m_router->IsBootstrapNode(rc.pubkey)) + return; #endif - return; - cur = rc; - got = true; - } - }, - true); - return got; - } + found = rc; + } + }, + true); + return found; + } - do + std::optional> + Builder::GetHopsForBuild() + { + std::vector hops; { - cur.Clear(); - --tries; - std::set excluding = exclude; - if (db->select_random_hop_excluding(cur, excluding)) - { - excluding.insert(cur.pubkey); - if (!m_router->routerProfiling().IsBadForPath(cur.pubkey)) - return true; - } - } while (tries > 0); - - return false; + const auto maybe = SelectFirstHop(); + if (not maybe.has_value()) + return std::nullopt; + hops.emplace_back(*maybe); + }; + for (size_t idx = hops.size(); idx < numHops; ++idx) + { + const auto maybe = m_router->nodedb()->GetIf([&hops, r = m_router](const auto& rc) -> bool { + if (r->routerProfiling().IsBadForPath(rc.pubkey)) + return false; + for (const auto& hop : hops) + { + if (hop.pubkey == rc.pubkey) + return false; + } + return true; + }); + if (not maybe.has_value()) + return std::nullopt; + hops.emplace_back(*maybe); + } + return hops; } bool @@ -301,9 +298,8 @@ namespace llarp void Builder::BuildOne(PathRole roles) { - std::vector hops(numHops); - if (SelectHops(m_router->nodedb(), hops, roles)) - Build(hops, roles); + if (const auto maybe = GetHopsForBuild(); maybe.has_value()) + Build(*maybe, roles); } bool Builder::UrgentBuild(llarp_time_t) const @@ -311,114 +307,60 @@ namespace llarp return buildIntervalLimit > MIN_PATH_BUILD_INTERVAL * 4; } - bool - Builder::DoUrgentBuildAlignedTo(const RouterID remote, std::vector& hops) + std::optional> + Builder::GetHopsAlignedToForBuild(RouterID endpoint) { - const auto aligned = m_router->pathContext().FindOwnedPathsWithEndpoint(remote); - /// pick the lowest latency path that aligns to remote - /// note: peer exhaustion is made worse happen here - Path_ptr p; - llarp_time_t min = std::numeric_limits::max(); - for (const auto& path : aligned) - { - if (path->intro.latency < min && path->hops.size() == numHops) - { - p = path; - min = path->intro.latency; - } - } - if (p) + std::vector hops; { - for (const auto& hop : p->hops) - { - if (hop.rc.pubkey.IsZero()) - return false; - hops.emplace_back(hop.rc); - } - } - - return true; - } - - bool - Builder::DoBuildAlignedTo(const RouterID remote, std::vector& hops) - { - std::set routers{remote}; - hops.resize(numHops); - - auto nodedb = m_router->nodedb(); - for (size_t idx = 0; idx < hops.size(); idx++) + const auto maybe = SelectFirstHop(); + if (not maybe.has_value()) + return std::nullopt; + hops.emplace_back(*maybe); + }; + for (size_t idx = hops.size(); idx < numHops; ++idx) { - hops[idx].Clear(); - if (idx == numHops - 1) + if (idx + 1 == numHops) { - // last hop - if (!nodedb->Get(remote, hops[idx])) + const auto maybe = m_router->nodedb()->Get(endpoint); + if (maybe.has_value()) { - m_router->LookupRouter(remote, nullptr); - return false; + hops.emplace_back(*maybe); } + else + return std::nullopt; } else { - if (!SelectHop(nodedb, routers, hops[idx], idx, path::ePathRoleAny)) - { - return false; - } + const auto maybe = + m_router->nodedb()->GetIf([&hops, r = m_router, endpoint](const auto& rc) -> bool { + if (r->routerProfiling().IsBadForPath(rc.pubkey)) + return false; + for (const auto& hop : hops) + { + if (hop.pubkey == rc.pubkey) + return false; + } + return rc.pubkey != endpoint; + }); + + if (not maybe.has_value()) + return std::nullopt; + hops.emplace_back(*maybe); } - if (hops[idx].pubkey.IsZero()) - return false; - routers.insert(hops[idx].pubkey); } - - return true; + return hops; } bool Builder::BuildOneAlignedTo(const RouterID remote) { - std::vector hops; - /// if we really need this path build it "dangerously" - if (UrgentBuild(m_router->Now())) + if (const auto maybe = GetHopsAlignedToForBuild(remote); maybe.has_value()) { - if (!DoUrgentBuildAlignedTo(remote, hops)) - { - return false; - } + LogInfo(Name(), " building path to ", remote); + Build(*maybe); + return true; } - - if (hops.empty()) - { - if (!DoBuildAlignedTo(remote, hops)) - { - return false; - } - } - LogInfo(Name(), " building path to ", remote); - Build(hops); - return true; - } - - bool - Builder::SelectHops(llarp_nodedb* nodedb, std::vector& hops, PathRole roles) - { - std::set exclude; - for (size_t idx = 0; idx < hops.size(); ++idx) - { - hops[idx].Clear(); - size_t tries = 32; - while (tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles)) - { - --tries; - } - if (tries == 0 || hops[idx].pubkey.IsZero()) - { - LogWarn(Name(), " failed to select hop ", idx); - return false; - } - exclude.emplace(hops[idx].pubkey); - } - return true; + return false; } llarp_time_t @@ -428,7 +370,7 @@ namespace llarp } void - Builder::Build(const std::vector& hops, PathRole roles) + Builder::Build(std::vector hops, PathRole roles) { if (IsStopped()) return; diff --git a/llarp/path/pathbuilder.hpp b/llarp/path/pathbuilder.hpp index af8b16068..dce412621 100644 --- a/llarp/path/pathbuilder.hpp +++ b/llarp/path/pathbuilder.hpp @@ -30,12 +30,6 @@ namespace llarp void DoPathBuildBackoff(); - bool - DoUrgentBuildAlignedTo(const RouterID remote, std::vector& hops); - - bool - DoBuildAlignedTo(const RouterID remote, std::vector& hops); - public: AbstractRouter* m_router; SecretKey enckey; @@ -51,14 +45,6 @@ namespace llarp util::StatusObject ExtractStatus() const; - bool - SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - PathRole roles) override; - bool ShouldBuildMore(llarp_time_t now) const override; @@ -107,11 +93,18 @@ namespace llarp bool BuildOneAlignedTo(const RouterID endpoint) override; + virtual std::optional> + GetHopsAlignedToForBuild(RouterID endpoint); + void - Build(const std::vector& hops, PathRole roles = ePathRoleAny) override; + Build(std::vector hops, PathRole roles = ePathRoleAny) override; - bool - SelectHops(llarp_nodedb* db, std::vector& hops, PathRole roles = ePathRoleAny); + /// pick a first hop + virtual std::optional + SelectFirstHop() const; + + std::optional> + GetHopsForBuild() override; void ManualRebuild(size_t N, PathRole roles = ePathRoleAny); diff --git a/llarp/path/pathset.hpp b/llarp/path/pathset.hpp index 56f9dd2ca..55294b1e5 100644 --- a/llarp/path/pathset.hpp +++ b/llarp/path/pathset.hpp @@ -14,11 +14,10 @@ #include #include -struct llarp_nodedb; - namespace llarp { struct RouterContact; + class NodeDB; namespace dht { @@ -109,7 +108,7 @@ namespace llarp /// manual build on these hops virtual void - Build(const std::vector& hops, PathRole roles = ePathRoleAny) = 0; + Build(std::vector hops, PathRole roles = ePathRoleAny) = 0; /// tick owned paths virtual void @@ -261,20 +260,15 @@ namespace llarp virtual void ResetInternalState() = 0; - virtual bool - SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - PathRole roles) = 0; - virtual bool BuildOneAlignedTo(const RouterID endpoint) = 0; virtual void SendPacketToRemote(const llarp_buffer_t& pkt) = 0; + virtual std::optional> + GetHopsForBuild() = 0; + void ForEachPath(std::function visit) const { diff --git a/llarp/peerstats/types.cpp b/llarp/peerstats/types.cpp index 4cd793b0a..3a726b653 100644 --- a/llarp/peerstats/types.cpp +++ b/llarp/peerstats/types.cpp @@ -1,9 +1,11 @@ #include #include -#include +#include #include +namespace lokimq = oxenmq; + namespace llarp { constexpr auto RouterIdKey = "routerId"; diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 08c60325e..5ee99d1a5 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -20,16 +20,15 @@ struct llarp_buffer_t; struct llarp_dht_context; -struct llarp_nodedb; -struct llarp_threadpool; -namespace lokimq +namespace oxenmq { - class LokiMQ; + class OxenMQ; } namespace llarp { + class NodeDB; class Logic; struct Config; struct RouterID; @@ -80,7 +79,7 @@ namespace llarp class Platform; } - using LMQ_ptr = std::shared_ptr; + using LMQ_ptr = std::shared_ptr; struct AbstractRouter { @@ -108,7 +107,7 @@ namespace llarp virtual llarp_dht_context* dht() const = 0; - virtual llarp_nodedb* + virtual std::shared_ptr nodedb() const = 0; virtual const path::PathContext& @@ -178,7 +177,7 @@ namespace llarp Sign(Signature& sig, const llarp_buffer_t& buf) const = 0; virtual bool - Configure(std::shared_ptr conf, bool isRouter, llarp_nodedb* nodedb) = 0; + Configure(std::shared_ptr conf, bool isRouter, std::shared_ptr nodedb) = 0; virtual bool IsServiceNode() const = 0; diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp index 1102e622f..2e2bce2b5 100644 --- a/llarp/router/outbound_session_maker.cpp +++ b/llarp/router/outbound_session_maker.cpp @@ -123,8 +123,14 @@ namespace llarp std::set exclude; do { + auto filter = [exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; }; + RouterContact other; - if (not _nodedb->select_random_hop_excluding(other, exclude)) + if (const auto maybe = _nodedb->GetIf(filter); maybe.has_value()) + { + other = *maybe; + } + else break; exclude.insert(other.pubkey); @@ -158,14 +164,13 @@ namespace llarp I_RCLookupHandler* rcLookup, Profiling* profiler, std::shared_ptr logic, - llarp_nodedb* nodedb, WorkerFunc_t dowork) { _router = router; _linkManager = linkManager; _rcLookup = rcLookup; _logic = logic; - _nodedb = nodedb; + _nodedb = router->nodedb(); _profiler = profiler; work = dowork; } diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp index 063594cbd..8933be01a 100644 --- a/llarp/router/outbound_session_maker.hpp +++ b/llarp/router/outbound_session_maker.hpp @@ -13,8 +13,6 @@ #include #include -struct llarp_nodedb; - namespace llarp { struct PendingSession; @@ -63,7 +61,6 @@ namespace llarp I_RCLookupHandler* rcLookup, Profiling* profiler, std::shared_ptr logic, - llarp_nodedb* nodedb, WorkerFunc_t work); void @@ -115,7 +112,7 @@ namespace llarp ILinkManager* _linkManager = nullptr; I_RCLookupHandler* _rcLookup = nullptr; Profiling* _profiler = nullptr; - llarp_nodedb* _nodedb = nullptr; + std::shared_ptr _nodedb; std::shared_ptr _logic; WorkerFunc_t work; RouterID us; diff --git a/llarp/router/rc_lookup_handler.cpp b/llarp/router/rc_lookup_handler.cpp index d696bdb70..9c4dcd6de 100644 --- a/llarp/router/rc_lookup_handler.cpp +++ b/llarp/router/rc_lookup_handler.cpp @@ -61,8 +61,9 @@ namespace llarp RouterContact remoteRC; if (not forceLookup) { - if (_nodedb->Get(router, remoteRC)) + if (const auto maybe = _nodedb->Get(router); maybe.has_value()) { + remoteRC = *maybe; if (callback) { callback(router, &remoteRC, RCRequestResult::Success); @@ -155,7 +156,8 @@ namespace llarp if (rc.IsPublicRouter()) { LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht."); - _nodedb->UpdateAsyncIfNewer(rc); + const RouterContact copy{rc}; + LogicCall(_logic, [copy, n = _nodedb]() { n->PutIfNewer(copy); }); _dht->impl->PutRCNodeAsync(rc); } @@ -210,7 +212,7 @@ namespace llarp RCLookupHandler::PeriodicUpdate(llarp_time_t now) { // try looking up stale routers - std::set routersToLookUp; + std::unordered_set routersToLookUp; _nodedb->VisitInsertedBefore( [&](const RouterContact& rc) { @@ -231,7 +233,7 @@ namespace llarp void RCLookupHandler::ExploreNetwork() { - const size_t known = _nodedb->num_loaded(); + const size_t known = _nodedb->NumLoaded(); if (_bootstrapRCList.empty() && known == 0) { LogError("we have no bootstrap nodes specified"); @@ -298,17 +300,19 @@ namespace llarp void RCLookupHandler::Init( llarp_dht_context* dht, - llarp_nodedb* nodedb, + std::shared_ptr nodedb, + std::shared_ptr logic, WorkerFunc_t dowork, ILinkManager* linkManager, service::Context* hiddenServiceContext, - const std::set& strictConnectPubkeys, + const std::unordered_set& strictConnectPubkeys, const std::set& bootstrapRCList, bool useWhitelist_arg, bool isServiceNode_arg) { _dht = dht; _nodedb = nodedb; + _logic = logic; _work = dowork; _hiddenServiceContext = hiddenServiceContext; _strictConnectPubkeys = strictConnectPubkeys; diff --git a/llarp/router/rc_lookup_handler.hpp b/llarp/router/rc_lookup_handler.hpp index 45a5169c9..084142965 100644 --- a/llarp/router/rc_lookup_handler.hpp +++ b/llarp/router/rc_lookup_handler.hpp @@ -8,13 +8,16 @@ #include #include +#include #include -struct llarp_nodedb; struct llarp_dht_context; namespace llarp { + class NodeDB; + class Logic; + namespace service { struct Context; @@ -72,11 +75,12 @@ namespace llarp void Init( llarp_dht_context* dht, - llarp_nodedb* nodedb, + std::shared_ptr nodedb, + std::shared_ptr logic, WorkerFunc_t dowork, ILinkManager* linkManager, service::Context* hiddenServiceContext, - const std::set& strictConnectPubkeys, + const std::unordered_set& strictConnectPubkeys, const std::set& bootstrapRCList, bool useWhitelist_arg, bool isServiceNode_arg); @@ -98,17 +102,18 @@ namespace llarp mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters llarp_dht_context* _dht = nullptr; - llarp_nodedb* _nodedb = nullptr; + std::shared_ptr _nodedb; + std::shared_ptr _logic; WorkerFunc_t _work = nullptr; service::Context* _hiddenServiceContext = nullptr; ILinkManager* _linkManager = nullptr; /// explicit whitelist of routers we will connect to directly (not for /// service nodes) - std::set _strictConnectPubkeys; + std::unordered_set _strictConnectPubkeys; std::set _bootstrapRCList; - std::set _bootstrapRouterIDList; + std::unordered_set _bootstrapRouterIDList; std::unordered_map pendingCallbacks GUARDED_BY(_mutex); @@ -116,7 +121,7 @@ namespace llarp bool useWhitelist = false; bool isServiceNode = false; - std::set whitelistRouters GUARDED_BY(_mutex); + std::unordered_set whitelistRouters GUARDED_BY(_mutex); using TimePoint = std::chrono::steady_clock::time_point; std::unordered_map _routerLookupTimes; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 09144f6d6..551bf43e6 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -40,7 +40,7 @@ #include #endif -#include +#include static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; @@ -49,9 +49,9 @@ namespace llarp Router::Router( llarp_ev_loop_ptr __netloop, std::shared_ptr l, - std::unique_ptr vpnPlatform) + std::shared_ptr vpnPlatform) : ready(false) - , m_lmq(std::make_shared()) + , m_lmq(std::make_shared()) , _netloop(std::move(__netloop)) , _logic(std::move(l)) , _vpnPlatform(std::move(vpnPlatform)) @@ -93,7 +93,7 @@ namespace llarp peerStatsObj = m_peerDb->ExtractStatus(); return util::StatusObject{{"running", true}, - {"numNodesKnown", _nodedb->num_loaded()}, + {"numNodesKnown", _nodedb->NumLoaded()}, {"dht", _dht->impl->ExtractStatus()}, {"services", _hiddenServiceContext.ExtractStatus()}, {"exit", _exitContext.ExtractStatus()}, @@ -150,19 +150,13 @@ namespace llarp return _rcLookupHandler.GetRandomWhitelistRouter(router); } - auto pick_router = [&](auto& collection) -> bool { - const auto sz = collection.size(); - auto itr = collection.begin(); - if (sz == 0) - return false; - if (sz > 1) - std::advance(itr, randint() % sz); - router = itr->first; + if (const auto maybe = nodedb()->GetIf([](const auto&) -> bool { return true; }, true); + maybe.has_value()) + { + router = maybe->pubkey; return true; - }; - - util::Lock l{nodedb()->access}; - return pick_router(nodedb()->entries); + } + return false; } void @@ -270,7 +264,7 @@ namespace llarp } bool - Router::Configure(std::shared_ptr c, bool isRouter, llarp_nodedb* nodedb) + Router::Configure(std::shared_ptr c, bool isRouter, std::shared_ptr nodedb) { m_Config = c; auto& conf = *m_Config; @@ -470,7 +464,7 @@ namespace llarp /// build a set of strictConnectPubkeys ( /// TODO: make this consistent with config -- do we support multiple strict connections // or not? - std::set strictConnectPubkeys; + std::unordered_set strictConnectPubkeys; if (not networkConfig.m_strictConnect.empty()) { const auto& val = networkConfig.m_strictConnect; @@ -562,12 +556,12 @@ namespace llarp &_rcLookupHandler, &_routerProfiling, _logic, - _nodedb, util::memFn(&AbstractRouter::QueueWork, this)); _linkManager.Init(&_outboundSessionMaker); _rcLookupHandler.Init( _dht, _nodedb, + _logic, util::memFn(&AbstractRouter::QueueWork, this), &_linkManager, &_hiddenServiceContext, @@ -691,7 +685,7 @@ namespace llarp Router::ReportStats() { const auto now = Now(); - LogInfo(nodedb()->num_loaded(), " RCs loaded"); + LogInfo(nodedb()->NumLoaded(), " RCs loaded"); LogInfo(bootstrapRCList.size(), " bootstrap peers"); LogInfo(NumberOfConnectedRouters(), " router connections"); if (IsServiceNode()) @@ -719,13 +713,13 @@ namespace llarp ss << "WATCHDOG=1\nSTATUS=v" << llarp::VERSION_STR; if (IsServiceNode()) { - ss << " snode | known/svc/clients: " << nodedb()->num_loaded() << "/" + ss << " snode | known/svc/clients: " << nodedb()->NumLoaded() << "/" << NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | " << pathContext().CurrentTransitPaths() << " active paths"; } else { - ss << " client | known/connected: " << nodedb()->num_loaded() << "/" + ss << " client | known/connected: " << nodedb()->NumLoaded() << "/" << NumberOfConnectedRouters() << " | path success: "; hiddenServiceContext().ForEachService([&ss](const auto& name, const auto& ep) { ss << " [" << name << " " << std::setprecision(4) @@ -852,11 +846,8 @@ namespace llarp { QueueDiskIO([&]() { routerProfiling().Save(_profilesFile); }); } - // save nodedb - if (nodedb()->ShouldSaveToDisk(now)) - { - nodedb()->AsyncFlushToDisk(); - } + + _nodedb->Tick(now); if (m_peerDb) { @@ -908,10 +899,11 @@ namespace llarp LogInfo("Session to ", remote, " fully closed"); if (IsServiceNode()) return; - RouterContact rc; - if (not nodedb()->Get(remote, rc)) - return; - m_RoutePoker.DelRoute(rc.addrs[0].toIpAddress().toIP()); + if (const auto maybe = nodedb()->Get(remote); maybe.has_value()) + { + for (const auto& addr : maybe->addrs) + m_RoutePoker.DelRoute(addr.toIpAddress().toIP()); + } } void @@ -1097,31 +1089,20 @@ namespace llarp } { - ssize_t loaded = _nodedb->LoadAll(); - llarp::LogInfo("loaded ", loaded, " RCs"); - if (loaded < 0) - { - // shouldn't be possible - return false; - } + LogInfo("Loading nodedb from disk..."); + _nodedb->LoadFromDisk(); } llarp_dht_context_start(dht(), pubkey()); for (const auto& rc : bootstrapRCList) { - if (this->nodedb()->Insert(rc)) - { - LogInfo("added bootstrap node ", RouterID(rc.pubkey)); - } - else - { - LogError("Failed to add bootstrap node ", RouterID(rc.pubkey)); - } + nodedb()->Put(rc); _dht->impl->Nodes()->PutNode(rc); + LogInfo("added bootstrap node ", RouterID{rc.pubkey}); } - LogInfo("have ", _nodedb->num_loaded(), " routers"); + LogInfo("have ", _nodedb->NumLoaded(), " routers"); #ifdef _WIN32 // windows uses proactor event loop so we need to constantly pump @@ -1165,7 +1146,7 @@ namespace llarp Router::AfterStopIssued() { StopLinks(); - nodedb()->AsyncFlushToDisk(); + nodedb()->SaveToDisk(); _logic->call_later(200ms, std::bind(&Router::AfterStopLinks, this)); } diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 609e6aa38..31151719e 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -46,7 +46,7 @@ #include #include -#include +#include namespace llarp { @@ -103,7 +103,7 @@ namespace llarp util::StatusObject ExtractStatus() const override; - llarp_nodedb* + std::shared_ptr nodedb() const override { return _nodedb; @@ -182,13 +182,13 @@ namespace llarp llarp_ev_loop_ptr _netloop; std::shared_ptr _logic; - std::unique_ptr _vpnPlatform; + std::shared_ptr _vpnPlatform; path::PathContext paths; exit::Context _exitContext; SecretKey _identity; SecretKey _encryption; llarp_dht_context* _dht = nullptr; - llarp_nodedb* _nodedb; + std::shared_ptr _nodedb; llarp_time_t _startedAt; const lokimq::TaggedThreadID m_DiskThread; @@ -329,7 +329,7 @@ namespace llarp explicit Router( llarp_ev_loop_ptr __netloop, std::shared_ptr logic, - std::unique_ptr vpnPlatform); + std::shared_ptr vpnPlatform); virtual ~Router() override; @@ -358,7 +358,7 @@ namespace llarp Close(); bool - Configure(std::shared_ptr conf, bool isRouter, llarp_nodedb* nodedb = nullptr) override; + Configure(std::shared_ptr conf, bool isRouter, std::shared_ptr nodedb) override; bool StartRpcServer() override; diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp index 88b8102e4..cfb452d15 100644 --- a/llarp/router_contact.cpp +++ b/llarp/router_contact.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include @@ -250,7 +250,7 @@ namespace llarp try { std::string_view buf_view(reinterpret_cast(buf->cur), buf->size_left()); - lokimq::bt_list_consumer btlist(buf_view); + oxenmq::bt_list_consumer btlist(buf_view); uint64_t outer_version = btlist.consume_integer(); @@ -284,7 +284,7 @@ namespace llarp } bool - RouterContact::DecodeVersion_1(lokimq::bt_list_consumer& btlist) + RouterContact::DecodeVersion_1(oxenmq::bt_list_consumer& btlist) { auto signature_string = btlist.consume_string_view(); signed_bt_dict = btlist.consume_dict_data(); diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp index be29e761b..2fb9cbfe2 100644 --- a/llarp/router_contact.hpp +++ b/llarp/router_contact.hpp @@ -17,10 +17,10 @@ #define MAX_RC_SIZE (1024) #define NICKLEN (32) -namespace lokimq +namespace oxenmq { class bt_list_consumer; -} // namespace lokimq +} // namespace oxenmq namespace llarp { @@ -230,7 +230,7 @@ namespace llarp DecodeVersion_0(llarp_buffer_t* buf); bool - DecodeVersion_1(lokimq::bt_list_consumer& btlist); + DecodeVersion_1(oxenmq::bt_list_consumer& btlist); }; inline std::ostream& diff --git a/llarp/router_id.cpp b/llarp/router_id.cpp index 7b0d96731..c177fe4c5 100644 --- a/llarp/router_id.cpp +++ b/llarp/router_id.cpp @@ -1,5 +1,7 @@ #include -#include +#include + +namespace lokimq = oxenmq; namespace llarp { diff --git a/llarp/router_id.hpp b/llarp/router_id.hpp index 81f2b34b1..d8ed32b2c 100644 --- a/llarp/router_id.hpp +++ b/llarp/router_id.hpp @@ -57,4 +57,17 @@ namespace llarp } // namespace llarp +namespace std +{ + template <> + struct hash + { + size_t + operator()(const llarp::RouterID& id) const + { + const llarp::RouterID::Hash h{}; + return h(id); + } + }; +} // namespace std #endif diff --git a/llarp/rpc/endpoint_rpc.cpp b/llarp/rpc/endpoint_rpc.cpp index 85830d507..59d876a74 100644 --- a/llarp/rpc/endpoint_rpc.cpp +++ b/llarp/rpc/endpoint_rpc.cpp @@ -23,11 +23,11 @@ namespace llarp::rpc return; m_LMQ->connect_remote( m_AuthURL, - [self = shared_from_this()](lokimq::ConnectionID c) { + [self = shared_from_this()](oxenmq::ConnectionID c) { self->m_Conn = std::move(c); LogInfo("connected to endpoint auth server via ", *self->m_Conn); }, - [self = shared_from_this()](lokimq::ConnectionID, std::string_view fail) { + [self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) { LogWarn("failed to connect to endpoint auth server: ", fail); self->m_Endpoint->RouterLogic()->call_later(1s, [self]() { self->Start(); }); }); diff --git a/llarp/rpc/endpoint_rpc.hpp b/llarp/rpc/endpoint_rpc.hpp index ca0b2e45b..39cc8ae56 100644 --- a/llarp/rpc/endpoint_rpc.hpp +++ b/llarp/rpc/endpoint_rpc.hpp @@ -1,7 +1,7 @@ #pragma once #include -#include +#include namespace llarp::service { @@ -13,7 +13,7 @@ namespace llarp::rpc struct EndpointAuthRPC : public llarp::service::IAuthPolicy, public std::enable_shared_from_this { - using LMQ_ptr = std::shared_ptr; + using LMQ_ptr = std::shared_ptr; using Endpoint_ptr = std::shared_ptr; using Whitelist_t = std::unordered_set; @@ -39,6 +39,6 @@ namespace llarp::rpc const Whitelist_t m_AuthWhitelist; LMQ_ptr m_LMQ; Endpoint_ptr m_Endpoint; - std::optional m_Conn; + std::optional m_Conn; }; } // namespace llarp::rpc diff --git a/llarp/rpc/lokid_rpc_client.hpp b/llarp/rpc/lokid_rpc_client.hpp index 3354316c8..af4594747 100644 --- a/llarp/rpc/lokid_rpc_client.hpp +++ b/llarp/rpc/lokid_rpc_client.hpp @@ -2,19 +2,21 @@ #include -#include -#include +#include +#include #include #include #include +namespace lokimq = oxenmq; + namespace llarp { struct AbstractRouter; namespace rpc { - using LMQ_ptr = std::shared_ptr; + using LMQ_ptr = std::shared_ptr; /// The LokidRpcClient uses loki-mq to talk to make API requests to lokid. struct LokidRpcClient : public std::enable_shared_from_this diff --git a/llarp/rpc/rpc_server.cpp b/llarp/rpc/rpc_server.cpp index 7a4e0da27..7cda55e6d 100644 --- a/llarp/rpc/rpc_server.cpp +++ b/llarp/rpc/rpc_server.cpp @@ -55,7 +55,7 @@ namespace llarp::rpc void HandleJSONRequest( - lokimq::Message& msg, std::function handleRequest) + oxenmq::Message& msg, std::function handleRequest) { const auto maybe = MaybeParseJSON(msg); if (not maybe.has_value()) @@ -70,10 +70,8 @@ namespace llarp::rpc } try { - std::promise reply; - handleRequest(*maybe, [&reply](std::string result) { reply.set_value(result); }); - auto ftr = reply.get_future(); - msg.send_reply(ftr.get()); + handleRequest( + *maybe, [defer = msg.send_later()](std::string result) { defer.reply(result); }); } catch (std::exception& ex) { @@ -82,13 +80,13 @@ namespace llarp::rpc } void - RpcServer::AsyncServeRPC(lokimq::address url) + RpcServer::AsyncServeRPC(oxenmq::address url) { m_LMQ->listen_plain(url.zmq_address()); - m_LMQ->add_category("llarp", lokimq::AuthLevel::none) + m_LMQ->add_category("llarp", oxenmq::AuthLevel::none) .add_command( "halt", - [&](lokimq::Message& msg) { + [&](oxenmq::Message& msg) { if (not m_Router->IsRunning()) { msg.send_reply(CreateJSONError("router is not running")); @@ -99,25 +97,30 @@ namespace llarp::rpc }) .add_request_command( "version", - [r = m_Router](lokimq::Message& msg) { + [r = m_Router](oxenmq::Message& msg) { util::StatusObject result{{"version", llarp::VERSION_FULL}, {"uptime", to_json(r->Uptime())}}; msg.send_reply(CreateJSONResponse(result)); }) .add_request_command( "status", - [&](lokimq::Message& msg) { - std::promise result; - LogicCall(m_Router->logic(), [&result, r = m_Router]() { - const auto state = r->ExtractStatus(); - result.set_value(state); + [&](oxenmq::Message& msg) { + LogicCall(m_Router->logic(), [defer = msg.send_later(), r = m_Router]() { + std::string data; + if (r->IsRunning()) + { + data = CreateJSONResponse(r->ExtractStatus()); + } + else + { + data = CreateJSONError("router not yet ready"); + } + defer.reply(data); }); - auto ftr = result.get_future(); - msg.send_reply(CreateJSONResponse(ftr.get())); }) .add_request_command( "exit", - [&](lokimq::Message& msg) { + [&](oxenmq::Message& msg) { HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) { if (r->IsServiceNode()) { @@ -261,7 +264,7 @@ namespace llarp::rpc }); }); }) - .add_request_command("config", [&](lokimq::Message& msg) { + .add_request_command("config", [&](oxenmq::Message& msg) { HandleJSONRequest(msg, [r = m_Router](nlohmann::json obj, ReplyFunction_t reply) { { const auto itr = obj.find("override"); diff --git a/llarp/rpc/rpc_server.hpp b/llarp/rpc/rpc_server.hpp index 8575b20ec..c0e0b2388 100644 --- a/llarp/rpc/rpc_server.hpp +++ b/llarp/rpc/rpc_server.hpp @@ -1,17 +1,19 @@ #pragma once #include -#include -#include +#include +#include namespace llarp { struct AbstractRouter; } +namespace lokimq = oxenmq; + namespace llarp::rpc { - using LMQ_ptr = std::shared_ptr; + using LMQ_ptr = std::shared_ptr; struct RpcServer { diff --git a/llarp/service/address.cpp b/llarp/service/address.cpp index 4434e1c5a..af6bd922a 100644 --- a/llarp/service/address.cpp +++ b/llarp/service/address.cpp @@ -1,77 +1,75 @@ #include #include -#include +#include #include -namespace llarp +namespace lokimq = oxenmq; + +namespace llarp::service { - namespace service + const std::set Address::AllowedTLDs = {".loki", ".snode"}; + + bool + Address::PermitTLD(const char* tld) { - const std::set Address::AllowedTLDs = {".loki", ".snode"}; + std::string gtld(tld); + std::transform(gtld.begin(), gtld.end(), gtld.begin(), ::tolower); + return AllowedTLDs.count(gtld) != 0; + } - bool - Address::PermitTLD(const char* tld) + std::string + Address::ToString(const char* tld) const + { + if (!PermitTLD(tld)) + return ""; + std::string str; + if (subdomain.size()) { - std::string gtld(tld); - std::transform(gtld.begin(), gtld.end(), gtld.begin(), ::tolower); - return AllowedTLDs.count(gtld) != 0; + str = subdomain; + str += '.'; } + str += lokimq::to_base32z(begin(), end()); + str += tld; + return str; + } - std::string - Address::ToString(const char* tld) const - { - if (!PermitTLD(tld)) - return ""; - std::string str; - if (subdomain.size()) - { - str = subdomain; - str += '.'; - } - str += lokimq::to_base32z(begin(), end()); - str += tld; - return str; - } + bool + Address::FromString(std::string_view str, const char* tld) + { + if (!PermitTLD(tld)) + return false; + // Find, validate, and remove the .tld + const auto pos = str.find_last_of('.'); + if (pos == std::string::npos) + return false; + if (str.substr(pos) != tld) + return false; + str = str.substr(0, pos); - bool - Address::FromString(std::string_view str, const char* tld) + // copy subdomains if they are there (and strip them off) + const auto idx = str.find_last_of('.'); + if (idx != std::string::npos) { - if (!PermitTLD(tld)) - return false; - // Find, validate, and remove the .tld - const auto pos = str.find_last_of('.'); - if (pos == std::string::npos) - return false; - if (str.substr(pos) != tld) - return false; - str = str.substr(0, pos); - - // copy subdomains if they are there (and strip them off) - const auto idx = str.find_last_of('.'); - if (idx != std::string::npos) - { - subdomain = str.substr(0, idx); - str.remove_prefix(idx + 1); - } - - // Ensure we have something valid: - // - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000) - // - must have 51 preceeding base32z chars - // - thus we get 51*5+1 = 256 bits = 32 bytes of output - if (str.size() != 52 || !lokimq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y')) - return false; - - lokimq::from_base32z(str.begin(), str.end(), begin()); - return true; + subdomain = str.substr(0, idx); + str.remove_prefix(idx + 1); } - dht::Key_t - Address::ToKey() const - { - PubKey k; - CryptoManager::instance()->derive_subkey(k, PubKey(data()), 1); - return dht::Key_t{k.as_array()}; - } + // Ensure we have something valid: + // - must end in a 1-bit value: 'o' or 'y' (i.e. 10000 or 00000) + // - must have 51 preceeding base32z chars + // - thus we get 51*5+1 = 256 bits = 32 bytes of output + if (str.size() != 52 || !lokimq::is_base32z(str) || !(str.back() == 'o' || str.back() == 'y')) + return false; + + lokimq::from_base32z(str.begin(), str.end(), begin()); + return true; + } - } // namespace service -} // namespace llarp + dht::Key_t + Address::ToKey() const + { + PubKey k; + CryptoManager::instance()->derive_subkey(k, PubKey(data()), 1); + return dht::Key_t{k.as_array()}; + } +} // namespace llarp::service diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 5ce5f714e..1a1783885 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -647,24 +647,65 @@ namespace llarp } } - bool - Endpoint::SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - path::PathRole roles) + std::optional> + Endpoint::GetHopsForBuild() + { + std::unordered_set exclude; + ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); }); + const auto maybe = m_router->nodedb()->GetIf( + [exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; }); + if (not maybe.has_value()) + return std::nullopt; + return GetHopsForBuildWithEndpoint(maybe->pubkey); + } + std::optional> + Endpoint::GetHopsForBuildWithEndpoint(RouterID endpoint) { - std::set exclude = prev; - for (const auto& snode : SnodeBlacklist()) - exclude.insert(snode); - if (hop == numHops - 1 and numHops > 1) + std::vector hops; + // get first hop + if (const auto maybe = SelectFirstHop(); maybe.has_value()) { - // diversify endpoints - ForEachPath([&exclude](const path::Path_ptr& path) { exclude.insert(path->Endpoint()); }); + hops.emplace_back(*maybe); } - return path::Builder::SelectHop(db, exclude, cur, hop, roles); + else + return std::nullopt; + + auto filter = + [endpoint, &hops, blacklist = SnodeBlacklist(), r = m_router](const auto& rc) -> bool { + if (blacklist.count(rc.pubkey) > 0) + return false; + + if (r->routerProfiling().IsBadForPath(rc.pubkey)) + return false; + + for (const auto& hop : hops) + { + if (hop.pubkey == rc.pubkey) + return false; + } + return endpoint != rc.pubkey; + }; + + for (size_t idx = hops.size(); idx < numHops; ++idx) + { + if (idx + 1 == numHops) + { + if (const auto maybe = m_router->nodedb()->Get(endpoint)) + { + hops.emplace_back(*maybe); + } + else + return std::nullopt; + } + else if (const auto maybe = m_router->nodedb()->GetIf(filter); maybe.has_value()) + { + hops.emplace_back(*maybe); + } + else + return std::nullopt; + } + return hops; } void @@ -712,19 +753,18 @@ namespace llarp } void - Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j) + Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid) { auto& pendingRouters = m_state->m_PendingRouters; - auto itr = pendingRouters.find(j->rc.pubkey); + auto itr = pendingRouters.find(id); if (itr != pendingRouters.end()) { - if (j->valid) + if (valid) itr->second.InformResult(msg->foundRCs); else itr->second.InformResult({}); pendingRouters.erase(itr); } - delete j; } bool @@ -732,16 +772,15 @@ namespace llarp { if (not msg->foundRCs.empty()) { - for (const auto& rc : msg->foundRCs) + for (auto rc : msg->foundRCs) { - llarp_async_verify_rc* job = new llarp_async_verify_rc(); - job->nodedb = Router()->nodedb(); - job->worker = util::memFn(&AbstractRouter::QueueWork, Router()); - job->disk = util::memFn(&AbstractRouter::QueueDiskIO, Router()); - job->logic = Router()->logic(); - job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg, std::placeholders::_1); - job->rc = rc; - llarp_nodedb_async_verify(job); + Router()->QueueWork([rc = std::move(rc), logic = Router()->logic(), self = this, msg]() { + bool valid = rc.Verify(llarp::time_now_ms()); + LogicCall(logic, [self, valid, rc = std::move(rc), msg]() { + self->Router()->nodedb()->PutIfNewer(rc); + self->HandleVerifyGotRouter(msg, rc.pubkey, valid); + }); + }); } } else diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 403d8cbfd..46c75ae4c 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -28,8 +28,6 @@ #define MIN_SHIFT_INTERVAL 5s #endif -struct llarp_async_verify_rc; - namespace llarp { namespace service @@ -364,13 +362,11 @@ namespace llarp bool HasExit() const; - bool - SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - path::PathRole roles) override; + std::optional> + GetHopsForBuild() override; + + std::optional> + GetHopsForBuildWithEndpoint(RouterID endpoint); virtual void PathBuildStarted(path::Path_ptr path) override; @@ -436,7 +432,7 @@ namespace llarp private: void - HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j); + HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, RouterID id, bool valid); bool OnLookup(const service::Address& addr, std::optional i, const RouterID& endpoint); diff --git a/llarp/service/intro_set.cpp b/llarp/service/intro_set.cpp index ee5d5c1ec..427c17473 100644 --- a/llarp/service/intro_set.cpp +++ b/llarp/service/intro_set.cpp @@ -2,393 +2,390 @@ #include #include -#include +#include +namespace lokimq = oxenmq; -namespace llarp +namespace llarp::service { - namespace service + util::StatusObject + EncryptedIntroSet::ExtractStatus() const { - util::StatusObject - EncryptedIntroSet::ExtractStatus() const - { - const auto sz = introsetPayload.size(); - return {{"location", derivedSigningKey.ToString()}, - {"signedAt", to_json(signedAt)}, - {"size", sz}}; - } + const auto sz = introsetPayload.size(); + return { + {"location", derivedSigningKey.ToString()}, {"signedAt", to_json(signedAt)}, {"size", sz}}; + } + + bool + EncryptedIntroSet::BEncode(llarp_buffer_t* buf) const + { + if (not bencode_start_dict(buf)) + return false; + if (not BEncodeWriteDictEntry("d", derivedSigningKey, buf)) + return false; + if (not BEncodeWriteDictEntry("n", nounce, buf)) + return false; + if (not BEncodeWriteDictInt("s", signedAt.count(), buf)) + return false; + if (not bencode_write_bytestring(buf, "x", 1)) + return false; + if (not bencode_write_bytestring(buf, introsetPayload.data(), introsetPayload.size())) + return false; + if (not BEncodeWriteDictEntry("z", sig, buf)) + return false; + return bencode_end(buf); + } - bool - EncryptedIntroSet::BEncode(llarp_buffer_t* buf) const + bool + EncryptedIntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf) + { + bool read = false; + if (key == "x") { - if (not bencode_start_dict(buf)) + llarp_buffer_t strbuf; + if (not bencode_read_string(buf, &strbuf)) return false; - if (not BEncodeWriteDictEntry("d", derivedSigningKey, buf)) + if (strbuf.sz > MAX_INTROSET_SIZE) return false; - if (not BEncodeWriteDictEntry("n", nounce, buf)) - return false; - if (not BEncodeWriteDictInt("s", signedAt.count(), buf)) - return false; - if (not bencode_write_bytestring(buf, "x", 1)) - return false; - if (not bencode_write_bytestring(buf, introsetPayload.data(), introsetPayload.size())) - return false; - if (not BEncodeWriteDictEntry("z", sig, buf)) - return false; - return bencode_end(buf); + introsetPayload.resize(strbuf.sz); + std::copy_n(strbuf.base, strbuf.sz, introsetPayload.data()); + return true; } + if (not BEncodeMaybeReadDictEntry("d", derivedSigningKey, read, key, buf)) + return false; - bool - EncryptedIntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf) - { - bool read = false; - if (key == "x") - { - llarp_buffer_t strbuf; - if (not bencode_read_string(buf, &strbuf)) - return false; - if (strbuf.sz > MAX_INTROSET_SIZE) - return false; - introsetPayload.resize(strbuf.sz); - std::copy_n(strbuf.base, strbuf.sz, introsetPayload.data()); - return true; - } - if (not BEncodeMaybeReadDictEntry("d", derivedSigningKey, read, key, buf)) - return false; + if (not BEncodeMaybeReadDictEntry("n", nounce, read, key, buf)) + return false; - if (not BEncodeMaybeReadDictEntry("n", nounce, read, key, buf)) - return false; + if (not BEncodeMaybeReadDictInt("s", signedAt, read, key, buf)) + return false; - if (not BEncodeMaybeReadDictInt("s", signedAt, read, key, buf)) - return false; + if (not BEncodeMaybeReadDictEntry("z", sig, read, key, buf)) + return false; + return read; + } - if (not BEncodeMaybeReadDictEntry("z", sig, read, key, buf)) - return false; - return read; - } + bool + EncryptedIntroSet::OtherIsNewer(const EncryptedIntroSet& other) const + { + return signedAt < other.signedAt; + } - bool - EncryptedIntroSet::OtherIsNewer(const EncryptedIntroSet& other) const - { - return signedAt < other.signedAt; - } + std::ostream& + EncryptedIntroSet::print(std::ostream& out, int levels, int spaces) const + { + Printer printer(out, levels, spaces); + printer.printAttribute("d", derivedSigningKey); + printer.printAttribute("n", nounce); + printer.printAttribute("s", signedAt.count()); + printer.printAttribute("x", "[" + std::to_string(introsetPayload.size()) + " bytes]"); + printer.printAttribute("z", sig); + return out; + } + + std::optional + EncryptedIntroSet::MaybeDecrypt(const PubKey& root) const + { + SharedSecret k(root); + IntroSet i; + std::vector payload = introsetPayload; + llarp_buffer_t buf(payload); + CryptoManager::instance()->xchacha20(buf, k, nounce); + if (not i.BDecode(&buf)) + return {}; + return i; + } + + bool + EncryptedIntroSet::IsExpired(llarp_time_t now) const + { + return now >= signedAt + path::default_lifetime; + } - std::ostream& - EncryptedIntroSet::print(std::ostream& out, int levels, int spaces) const - { - Printer printer(out, levels, spaces); - printer.printAttribute("d", derivedSigningKey); - printer.printAttribute("n", nounce); - printer.printAttribute("s", signedAt.count()); - printer.printAttribute("x", "[" + std::to_string(introsetPayload.size()) + " bytes]"); - printer.printAttribute("z", sig); - return out; - } + bool + EncryptedIntroSet::Sign(const PrivateKey& k) + { + signedAt = llarp::time_now_ms(); + if (not k.toPublic(derivedSigningKey)) + return false; + sig.Zero(); + std::array tmp; + llarp_buffer_t buf(tmp); + if (not BEncode(&buf)) + return false; + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + if (not CryptoManager::instance()->sign(sig, k, buf)) + return false; + LogDebug("signed encrypted introset: ", *this); + return true; + } - std::optional - EncryptedIntroSet::MaybeDecrypt(const PubKey& root) const - { - SharedSecret k(root); - IntroSet i; - std::vector payload = introsetPayload; - llarp_buffer_t buf(payload); - CryptoManager::instance()->xchacha20(buf, k, nounce); - if (not i.BDecode(&buf)) - return {}; - return i; - } + bool + EncryptedIntroSet::Verify(llarp_time_t now) const + { + if (IsExpired(now)) + return false; + std::array tmp; + llarp_buffer_t buf(tmp); + EncryptedIntroSet copy(*this); + copy.sig.Zero(); + if (not copy.BEncode(&buf)) + return false; + LogDebug("verify encrypted introset: ", copy, " sig = ", sig); + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + return CryptoManager::instance()->verify(derivedSigningKey, buf, sig); + } + + util::StatusObject + IntroSet::ExtractStatus() const + { + util::StatusObject obj{{"published", to_json(T)}}; + std::vector introsObjs; + std::transform( + I.begin(), + I.end(), + std::back_inserter(introsObjs), + [](const auto& intro) -> util::StatusObject { return intro.ExtractStatus(); }); + obj["intros"] = introsObjs; + if (!topic.IsZero()) + obj["topic"] = topic.ToString(); + return obj; + } + + bool + IntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf) + { + bool read = false; + if (!BEncodeMaybeReadDictEntry("a", A, read, key, buf)) + return false; - bool - EncryptedIntroSet::IsExpired(llarp_time_t now) const + if (key == "i") { - return now >= signedAt + path::default_lifetime; + return BEncodeReadList(I, buf); } + if (!BEncodeMaybeReadDictEntry("k", K, read, key, buf)) + return false; - bool - EncryptedIntroSet::Sign(const PrivateKey& k) - { - signedAt = llarp::time_now_ms(); - if (not k.toPublic(derivedSigningKey)) - return false; - sig.Zero(); - std::array tmp; - llarp_buffer_t buf(tmp); - if (not BEncode(&buf)) - return false; - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - if (not CryptoManager::instance()->sign(sig, k, buf)) - return false; - LogDebug("signed encrypted introset: ", *this); - return true; - } + if (!BEncodeMaybeReadDictEntry("n", topic, read, key, buf)) + return false; - bool - EncryptedIntroSet::Verify(llarp_time_t now) const + if (key == "s") { - if (IsExpired(now)) - return false; - std::array tmp; - llarp_buffer_t buf(tmp); - EncryptedIntroSet copy(*this); - copy.sig.Zero(); - if (not copy.BEncode(&buf)) + byte_t* begin = buf->cur; + if (not bencode_discard(buf)) return false; - LogDebug("verify encrypted introset: ", copy, " sig = ", sig); - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - return CryptoManager::instance()->verify(derivedSigningKey, buf, sig); - } - util::StatusObject - IntroSet::ExtractStatus() const - { - util::StatusObject obj{{"published", to_json(T)}}; - std::vector introsObjs; - std::transform( - I.begin(), - I.end(), - std::back_inserter(introsObjs), - [](const auto& intro) -> util::StatusObject { return intro.ExtractStatus(); }); - obj["intros"] = introsObjs; - if (!topic.IsZero()) - obj["topic"] = topic.ToString(); - return obj; - } + byte_t* end = buf->cur; - bool - IntroSet::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf) - { - bool read = false; - if (!BEncodeMaybeReadDictEntry("a", A, read, key, buf)) - return false; + std::string_view srvString(reinterpret_cast(begin), end - begin); - if (key == "i") + try { - return BEncodeReadList(I, buf); + lokimq::bt_deserialize(srvString, SRVs); } - if (!BEncodeMaybeReadDictEntry("k", K, read, key, buf)) + catch (const lokimq::bt_deserialize_invalid& err) + { + LogError("Error decoding SRV records from IntroSet: ", err.what()); return false; + } + read = true; + } - if (!BEncodeMaybeReadDictEntry("n", topic, read, key, buf)) - return false; + if (!BEncodeMaybeReadDictInt("t", T, read, key, buf)) + return false; - if (key == "s") - { - byte_t* begin = buf->cur; - if (not bencode_discard(buf)) - return false; + if (key == "w") + { + W.emplace(); + return bencode_decode_dict(*W, buf); + } - byte_t* end = buf->cur; + if (!BEncodeMaybeReadDictInt("v", version, read, key, buf)) + return false; - std::string_view srvString(reinterpret_cast(begin), end - begin); + if (!BEncodeMaybeReadDictEntry("z", Z, read, key, buf)) + return false; - try - { - lokimq::bt_deserialize(srvString, SRVs); - } - catch (const lokimq::bt_deserialize_invalid& err) - { - LogError("Error decoding SRV records from IntroSet: ", err.what()); - return false; - } - read = true; - } + if (read) + return true; - if (!BEncodeMaybeReadDictInt("t", T, read, key, buf)) - return false; + return bencode_discard(buf); + } - if (key == "w") - { - W.emplace(); - return bencode_decode_dict(*W, buf); - } + bool + IntroSet::BEncode(llarp_buffer_t* buf) const + { + if (!bencode_start_dict(buf)) + return false; + if (!BEncodeWriteDictEntry("a", A, buf)) + return false; + // start introduction list + if (!bencode_write_bytestring(buf, "i", 1)) + return false; + if (!BEncodeWriteList(I.begin(), I.end(), buf)) + return false; + // end introduction list - if (!BEncodeMaybeReadDictInt("v", version, read, key, buf)) - return false; + // pq pubkey + if (!BEncodeWriteDictEntry("k", K, buf)) + return false; - if (!BEncodeMaybeReadDictEntry("z", Z, read, key, buf)) + // topic tag + if (topic.ToString().size()) + { + if (!BEncodeWriteDictEntry("n", topic, buf)) return false; - - if (read) - return true; - - return bencode_discard(buf); } - bool - IntroSet::BEncode(llarp_buffer_t* buf) const + if (SRVs.size()) { - if (!bencode_start_dict(buf)) - return false; - if (!BEncodeWriteDictEntry("a", A, buf)) - return false; - // start introduction list - if (!bencode_write_bytestring(buf, "i", 1)) + std::string serial = lokimq::bt_serialize(SRVs); + if (!bencode_write_bytestring(buf, "s", 1)) return false; - if (!BEncodeWriteList(I.begin(), I.end(), buf)) + if (!buf->write(serial.begin(), serial.end())) return false; - // end introduction list + } + + // Timestamp published + if (!BEncodeWriteDictInt("t", T.count(), buf)) + return false; - // pq pubkey - if (!BEncodeWriteDictEntry("k", K, buf)) + // write version + if (!BEncodeWriteDictInt("v", version, buf)) + return false; + if (W) + { + if (!BEncodeWriteDictEntry("w", *W, buf)) return false; + } + if (!BEncodeWriteDictEntry("z", Z, buf)) + return false; - // topic tag - if (topic.ToString().size()) - { - if (!BEncodeWriteDictEntry("n", topic, buf)) - return false; - } + return bencode_end(buf); + } - if (SRVs.size()) - { - std::string serial = lokimq::bt_serialize(SRVs); - if (!bencode_write_bytestring(buf, "s", 1)) - return false; - if (!buf->write(serial.begin(), serial.end())) - return false; - } + bool + IntroSet::HasExpiredIntros(llarp_time_t now) const + { + for (const auto& i : I) + if (now >= i.expiresAt) + return true; + return false; + } - // Timestamp published - if (!BEncodeWriteDictInt("t", T.count(), buf)) - return false; + bool + IntroSet::IsExpired(llarp_time_t now) const + { + return GetNewestIntroExpiration() < now; + } - // write version - if (!BEncodeWriteDictInt("v", version, buf)) - return false; - if (W) + std::vector + IntroSet::GetMatchingSRVRecords(std::string_view service_proto) const + { + std::vector records; + + for (const auto& tuple : SRVs) + { + if (std::get<0>(tuple) == service_proto) { - if (!BEncodeWriteDictEntry("w", *W, buf)) - return false; + records.push_back(llarp::dns::SRVData::fromTuple(tuple)); } - if (!BEncodeWriteDictEntry("z", Z, buf)) - return false; - - return bencode_end(buf); } - bool - IntroSet::HasExpiredIntros(llarp_time_t now) const + return records; + } + + bool + IntroSet::Verify(llarp_time_t now) const + { + std::array tmp; + llarp_buffer_t buf(tmp); + IntroSet copy; + copy = *this; + copy.Z.Zero(); + if (!copy.BEncode(&buf)) { - for (const auto& i : I) - if (now >= i.expiresAt) - return true; return false; } - - bool - IntroSet::IsExpired(llarp_time_t now) const + // rewind and resize buffer + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + if (!A.Verify(buf, Z)) { - return GetNewestIntroExpiration() < now; + return false; } - - std::vector - IntroSet::GetMatchingSRVRecords(std::string_view service_proto) const + // validate PoW + if (W && !W->IsValid(now)) { - std::vector records; - - for (const auto& tuple : SRVs) - { - if (std::get<0>(tuple) == service_proto) - { - records.push_back(llarp::dns::SRVData::fromTuple(tuple)); - } - } - - return records; + return false; } - - bool - IntroSet::Verify(llarp_time_t now) const + // valid timestamps + // add max clock skew + now += MAX_INTROSET_TIME_DELTA; + for (const auto& intro : I) { - std::array tmp; - llarp_buffer_t buf(tmp); - IntroSet copy; - copy = *this; - copy.Z.Zero(); - if (!copy.BEncode(&buf)) - { - return false; - } - // rewind and resize buffer - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - if (!A.Verify(buf, Z)) - { - return false; - } - // validate PoW - if (W && !W->IsValid(now)) + if (intro.expiresAt > now && intro.expiresAt - now > path::default_lifetime) { - return false; - } - // valid timestamps - // add max clock skew - now += MAX_INTROSET_TIME_DELTA; - for (const auto& intro : I) - { - if (intro.expiresAt > now && intro.expiresAt - now > path::default_lifetime) + if (!W) { - if (!W) - { - LogWarn("intro has too high expire time"); - return false; - } - if (intro.expiresAt - W->extendedLifetime > path::default_lifetime) - { - return false; - } + LogWarn("intro has too high expire time"); + return false; + } + if (intro.expiresAt - W->extendedLifetime > path::default_lifetime) + { + return false; } } - if (IsExpired(now)) - { - LogWarn("introset expired: ", *this); - return false; - } - return true; } - - llarp_time_t - IntroSet::GetNewestIntroExpiration() const + if (IsExpired(now)) { - llarp_time_t t = 0s; - for (const auto& intro : I) - t = std::max(intro.expiresAt, t); - return t; + LogWarn("introset expired: ", *this); + return false; } + return true; + } - std::ostream& - IntroSet::print(std::ostream& stream, int level, int spaces) const - { - Printer printer(stream, level, spaces); - printer.printAttribute("A", A); - printer.printAttribute("I", I); - printer.printAttribute("K", K); + llarp_time_t + IntroSet::GetNewestIntroExpiration() const + { + llarp_time_t t = 0s; + for (const auto& intro : I) + t = std::max(intro.expiresAt, t); + return t; + } + + std::ostream& + IntroSet::print(std::ostream& stream, int level, int spaces) const + { + Printer printer(stream, level, spaces); + printer.printAttribute("A", A); + printer.printAttribute("I", I); + printer.printAttribute("K", K); - std::string _topic = topic.ToString(); + std::string _topic = topic.ToString(); - if (!_topic.empty()) - { - printer.printAttribute("topic", _topic); - } - else - { - printer.printAttribute("topic", topic); - } - - printer.printAttribute("T", T.count()); - if (W) - { - printer.printAttribute("W", *W); - } - else - { - printer.printAttribute("W", "NULL"); - } - printer.printAttribute("V", version); - printer.printAttribute("Z", Z); + if (!_topic.empty()) + { + printer.printAttribute("topic", _topic); + } + else + { + printer.printAttribute("topic", topic); + } - return stream; + printer.printAttribute("T", T.count()); + if (W) + { + printer.printAttribute("W", *W); } - } // namespace service -} // namespace llarp + else + { + printer.printAttribute("W", "NULL"); + } + printer.printAttribute("V", version); + printer.printAttribute("Z", Z); + + return stream; + } +} // namespace llarp::service diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index 0ef24d5da..c98a41cac 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -334,33 +334,16 @@ namespace llarp : (now >= createdAt && now - createdAt > connectTimeout); } - bool - OutboundContext::SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - path::PathRole roles) + std::optional> + OutboundContext::GetHopsForBuild() { - if (m_NextIntro.router.IsZero() || prev.count(m_NextIntro.router)) + if (m_NextIntro.router.IsZero()) { ShiftIntroduction(false); } if (m_NextIntro.router.IsZero()) - return false; - std::set exclude = prev; - exclude.insert(m_NextIntro.router); - for (const auto& snode : m_Endpoint->SnodeBlacklist()) - exclude.insert(snode); - if (hop == numHops - 1) - { - m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router); - if (db->Get(m_NextIntro.router, cur)) - return true; - ++m_BuildFails; - return false; - } - return path::Builder::SelectHop(db, exclude, cur, hop, roles); + return std::nullopt; + return GetHopsAlignedToForBuild(m_NextIntro.router); } bool diff --git a/llarp/service/outbound_context.hpp b/llarp/service/outbound_context.hpp index 81af90e12..6e06ad0a0 100644 --- a/llarp/service/outbound_context.hpp +++ b/llarp/service/outbound_context.hpp @@ -106,13 +106,8 @@ namespace llarp void HandlePathBuildFailed(path::Path_ptr path) override; - bool - SelectHop( - llarp_nodedb* db, - const std::set& prev, - RouterContact& cur, - size_t hop, - path::PathRole roles) override; + std::optional> + GetHopsForBuild() override; bool HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame); diff --git a/llarp/util/aligned.hpp b/llarp/util/aligned.hpp index b3c79f9f6..0db12ea9e 100644 --- a/llarp/util/aligned.hpp +++ b/llarp/util/aligned.hpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include @@ -71,7 +71,7 @@ namespace llarp friend std::ostream& operator<<(std::ostream& out, const AlignedBuffer& self) { - return out << lokimq::to_hex(self.begin(), self.end()); + return out << oxenmq::to_hex(self.begin(), self.end()); } /// bitwise NOT @@ -261,21 +261,21 @@ namespace llarp std::string ToHex() const { - return lokimq::to_hex(begin(), end()); + return oxenmq::to_hex(begin(), end()); } std::string ShortHex() const { - return lokimq::to_hex(begin(), begin() + 4); + return oxenmq::to_hex(begin(), begin() + 4); } bool FromHex(std::string_view str) { - if (str.size() != 2 * size() || !lokimq::is_hex(str)) + if (str.size() != 2 * size() || !oxenmq::is_hex(str)) return false; - lokimq::from_hex(str.begin(), str.end(), begin()); + oxenmq::from_hex(str.begin(), str.end(), begin()); return true; } diff --git a/llarp/vpn/platform.cpp b/llarp/vpn/platform.cpp index e8e9b35f3..74cfe8840 100644 --- a/llarp/vpn/platform.cpp +++ b/llarp/vpn/platform.cpp @@ -14,23 +14,23 @@ namespace llarp::vpn { - std::unique_ptr + std::shared_ptr MakeNativePlatform(llarp::Context* ctx) { (void)ctx; - std::unique_ptr plat; + std::shared_ptr plat; #ifdef _WIN32 - plat = std::make_unique(); + plat = std::make_shared(); #endif #ifdef __linux__ #ifdef ANDROID - plat = std::make_unique(); + plat = std::make_shared(); #else - plat = std::make_unique(); + plat = std::make_shared(); #endif #endif #ifdef __APPLE__ - plat = std::make_unique(); + plat = std::make_shared(); #endif return plat; } diff --git a/llarp/vpn/win32.hpp b/llarp/vpn/win32.hpp index 58d27fb84..a5eea375b 100644 --- a/llarp/vpn/win32.hpp +++ b/llarp/vpn/win32.hpp @@ -121,7 +121,7 @@ namespace llarp::vpn { std::atomic m_Run; HANDLE m_Device, m_IOCP; - std::vector m_Threads; + std::vector m_Threads; thread::Queue m_ReadQueue; const InterfaceInfo m_Info; @@ -334,7 +334,7 @@ namespace llarp::vpn CloseHandle(m_Device); // close the reader threads for (auto& thread : m_Threads) - CloseHandle(thread); + thread.join(); } int @@ -355,21 +355,22 @@ namespace llarp::vpn return ""; } - static DWORD FAR PASCAL - Loop(void* u) - { - static_cast(u)->ReadLoop(); - return 0; - } - void Start() { m_Run = true; const auto numThreads = std::thread::hardware_concurrency(); - m_IOCP = CreateIoCompletionPort(m_Device, nullptr, (ULONG_PTR)this, 1 + numThreads); + // allocate packets for (size_t idx = 0; idx < numThreads; ++idx) - m_Threads.push_back(CreateThread(nullptr, 0, &Loop, this, 0, nullptr)); + m_Packets.emplace_back(new asio_evt_pkt{true}); + + // create completion port + m_IOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0); + // attach the handle or some shit + CreateIoCompletionPort(m_Device, m_IOCP, 0, 0); + // spawn reader threads + for (size_t idx = 0; idx < numThreads; ++idx) + m_Threads.emplace_back([this, idx]() { ReadLoop(idx); }); } net::IPPacket @@ -394,6 +395,8 @@ namespace llarp::vpn } }; + std::vector> m_Packets; + bool WritePacket(net::IPPacket pkt) { @@ -406,9 +409,9 @@ namespace llarp::vpn } void - ReadLoop() + ReadLoop(size_t packetIndex) { - std::unique_ptr ev = std::make_unique(true); + auto& ev = m_Packets[packetIndex]; ev->Read(m_Device); while (m_Run) { diff --git a/test/exit/test_llarp_exit_context.cpp b/test/exit/test_llarp_exit_context.cpp index 89e87e0d3..2b38b9c24 100644 --- a/test/exit/test_llarp_exit_context.cpp +++ b/test/exit/test_llarp_exit_context.cpp @@ -11,18 +11,18 @@ static const llarp::RuntimeOptions opts = {.background = false, .debug = false, std::shared_ptr make_context() { - llarp::Config conf{fs::current_path()}; - conf.Load(std::nullopt, true); + auto conf = std::make_shared(fs::current_path()); + conf->Load(std::nullopt, true); // set testing defaults - conf.network.m_endpointType = "null"; - conf.bootstrap.seednode = true; - conf.api.m_enableRPCServer = false; - conf.lokid.whitelistRouters = false; - conf.router.m_publicAddress = llarp::IpAddress("1.1.1.1"); + conf->network.m_endpointType = "null"; + conf->bootstrap.seednode = true; + conf->api.m_enableRPCServer = false; + conf->lokid.whitelistRouters = false; + conf->router.m_publicAddress = llarp::IpAddress("1.1.1.1"); // make a fake inbound link - conf.links.m_InboundLinks.emplace_back(); - auto& link = conf.links.m_InboundLinks.back(); + conf->links.m_InboundLinks.emplace_back(); + auto& link = conf->links.m_InboundLinks.back(); link.interface = "0.0.0.0"; link.addressFamily = AF_INET; link.port = 0; diff --git a/test/nodedb/test_nodedb.cpp b/test/nodedb/test_nodedb.cpp index cb12712c5..2d23bd27d 100644 --- a/test/nodedb/test_nodedb.cpp +++ b/test/nodedb/test_nodedb.cpp @@ -4,23 +4,25 @@ #include #include +using llarp_nodedb = llarp::NodeDB; + TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]") { - llarp_nodedb nodeDB("", nullptr); + llarp_nodedb nodeDB{fs::current_path(), nullptr}; constexpr uint64_t numRCs = 3; for (uint64_t i = 0; i < numRCs; ++i) { llarp::RouterContact rc; rc.pubkey[0] = i; - nodeDB.Insert(rc); + nodeDB.Put(rc); } - REQUIRE(numRCs == nodeDB.num_loaded()); + REQUIRE(numRCs == nodeDB.NumLoaded()); llarp::dht::Key_t key; - std::vector results = nodeDB.FindClosestTo(key, 4); + std::vector results = nodeDB.FindManyClosestTo(key, 4); // we asked for more entries than nodedb had REQUIRE(numRCs == results.size()); @@ -28,26 +30,26 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]") TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]") { - llarp_nodedb nodeDB("", nullptr); + llarp_nodedb nodeDB{fs::current_path(), nullptr}; // insert some RCs: a < b < c llarp::RouterContact a; a.pubkey[0] = 1; - nodeDB.Insert(a); + nodeDB.Put(a); llarp::RouterContact b; b.pubkey[0] = 2; - nodeDB.Insert(b); + nodeDB.Put(b); llarp::RouterContact c; c.pubkey[0] = 3; - nodeDB.Insert(c); + nodeDB.Put(c); - REQUIRE(3 == nodeDB.num_loaded()); + REQUIRE(3 == nodeDB.NumLoaded()); llarp::dht::Key_t key; - std::vector results = nodeDB.FindClosestTo(key, 2); + std::vector results = nodeDB.FindManyClosestTo(key, 2); REQUIRE(2 == results.size()); // we xor'ed with 0x0, so order should be a,b,c @@ -57,7 +59,7 @@ TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]") llarp::dht::Key_t compKey; compKey.Fill(0xFF); - results = nodeDB.FindClosestTo(compKey, 2); + results = nodeDB.FindManyClosestTo(compKey, 2); // we xor'ed with 0xF...F, so order should be inverted (c,b,a) REQUIRE(c.pubkey == results[0].pubkey); diff --git a/test/regress/2020-06-08-key-backup-bug.cpp b/test/regress/2020-06-08-key-backup-bug.cpp index a7109fb9a..e18a48b03 100644 --- a/test/regress/2020-06-08-key-backup-bug.cpp +++ b/test/regress/2020-06-08-key-backup-bug.cpp @@ -11,12 +11,12 @@ llarp::RuntimeOptions opts = {false, false, false}; static std::shared_ptr make_context(std::optional keyfile) { - llarp::Config conf{fs::current_path()}; - conf.Load(std::nullopt, opts.isRouter); - conf.network.m_endpointType = "null"; - conf.network.m_keyfile = keyfile; - conf.bootstrap.seednode = true; - conf.api.m_enableRPCServer = false; + auto conf = std::make_shared(fs::current_path()); + conf->Load(std::nullopt, opts.isRouter); + conf->network.m_endpointType = "null"; + conf->network.m_keyfile = keyfile; + conf->bootstrap.seednode = true; + conf->api.m_enableRPCServer = false; auto context = std::make_shared(); REQUIRE_NOTHROW(context->Configure(std::move(conf)));