Merge pull request #1664 from oxen-io/dev

dev -> stable for 0.9.3
pull/1766/head
Jason Rhinelander 3 years ago committed by GitHub
commit a56308074d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

3
.gitmodules vendored

@ -35,6 +35,3 @@
[submodule "external/ngtcp2"]
path = external/ngtcp2
url = https://github.com/ngtcp2/ngtcp2.git
[submodule "external/cpr"]
path = external/cpr
url = https://github.com/whoshuu/cpr

@ -16,7 +16,7 @@ if(CCACHE_PROGRAM)
endif()
project(lokinet
VERSION 0.9.2
VERSION 0.9.3
DESCRIPTION "lokinet - IP packet onion router"
LANGUAGES C CXX)

@ -14,7 +14,7 @@ if(NOT MSVC_VERSION)
# GNU ld sees fit to merge *all* the .ident sections in object files
# to .r[o]data section one after the other!
add_compile_options(-fno-ident -Wa,-mbig-obj)
link_libraries( -lws2_32 -lshlwapi -ldbghelp -luser32 -liphlpapi -lpsapi -luserenv )
link_libraries( -lws2_32 -lshlwapi -ldbghelp -luser32 -liphlpapi -lpsapi -luserenv)
# zmq requires windows xp or higher
add_definitions(-DWINVER=0x0501 -D_WIN32_WINNT=0x0501)
endif()
@ -29,12 +29,3 @@ if (NOT STATIC_LINK AND NOT MSVC)
message("must ship compiler runtime libraries with this build: libwinpthread-1.dll, libgcc_s_dw2-1.dll, and libstdc++-6.dll")
message("for release builds, turn on STATIC_LINK in cmake options")
endif()
if (STATIC_LINK)
set(LIBUV_USE_STATIC ON)
if (WOW64_CROSS_COMPILE)
link_libraries( -static-libstdc++ -static-libgcc -static -Wl,--image-base=0x10000,--large-address-aware,--dynamicbase,--pic-executable,-e,_mainCRTStartup,--subsystem,console:5.00 )
else()
link_libraries( -static-libstdc++ -static-libgcc -static -Wl,--image-base=0x10000,--dynamicbase,--pic-executable,-e,mainCRTStartup )
endif()
endif()

@ -1,6 +1,6 @@
if(NOT GUI_ZIP_URL)
set(GUI_ZIP_URL "https://oxen.rocks/oxen-io/loki-network-control-panel/lokinet-gui-windows-32bit-v0.3.7.zip")
set(GUI_ZIP_HASH_OPTS EXPECTED_HASH SHA256=faafb5c7c8b9831f572ed78bb2cf8454bfa0d5f79897ce31e64e4a4331d55045)
set(GUI_ZIP_URL "https://oxen.rocks/oxen-io/loki-network-control-panel/lokinet-gui-windows-32bit-v0.3.8.zip")
set(GUI_ZIP_HASH_OPTS EXPECTED_HASH SHA256=60c2b738cf997e5684f307e5222498fd09143d495a932924105a49bf59ded8bb)
endif()
set(TUNTAP_URL "https://build.openvpn.net/downloads/releases/latest/tap-windows-latest-stable.exe")
@ -32,7 +32,7 @@ set(CPACK_PACKAGE_INSTALL_DIRECTORY "Lokinet")
set(CPACK_NSIS_MUI_ICON "${CMAKE_SOURCE_DIR}/win32-setup/lokinet.ico")
set(CPACK_NSIS_DEFINES "RequestExecutionLevel admin")
set(CPACK_NSIS_ENABLE_UNINSTALL_BEFORE_INSTALL ON)
set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "ifFileExists $INSTDIR\\\\bin\\\\tuntap-install.exe 0 +2\\nExecWait '$INSTDIR\\\\bin\\\\tuntap-install.exe /S'\\nExecWait '$INSTDIR\\\\bin\\\\lokinet.exe --install'\\nExecWait 'sc failure lokinet reset= 60 actions= restart/1000'\\nExecWait '$INSTDIR\\\\bin\\\\lokinet.exe -g C:\\\\ProgramData\\\\lokinet\\\\lokinet.ini'\\nCopyFiles '$INSTDIR\\\\share\\\\bootstrap.signed' C:\\\\ProgramData\\\\lokinet\\\\bootstrap.signed\\nExecWait '$INSTDIR\\\\bin\\\\lokinet-bootstrap.exe'")
set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "ifFileExists $INSTDIR\\\\bin\\\\tuntap-install.exe 0 +2\\nExecWait '$INSTDIR\\\\bin\\\\tuntap-install.exe /S'\\nExecWait '$INSTDIR\\\\bin\\\\lokinet.exe --install'\\nExecWait 'sc failure lokinet reset= 60 actions= restart/1000'\\nExecWait '$INSTDIR\\\\bin\\\\lokinet.exe -g C:\\\\ProgramData\\\\lokinet\\\\lokinet.ini'\\nCopyFiles '$INSTDIR\\\\share\\\\bootstrap.signed' C:\\\\ProgramData\\\\lokinet\\\\bootstrap.signed\\n")
set(CPACK_NSIS_EXTRA_UNINSTALL_COMMANDS "ExecWait 'net stop lokinet'\\nExecWait 'taskkill /f /t /im lokinet-gui.exe'\\nExecWait '$INSTDIR\\\\bin\\\\lokinet.exe --remove'\\nRMDir /r /REBOOTOK C:\\\\ProgramData\\\\lokinet")
set(CPACK_NSIS_CREATE_ICONS_EXTRA
"CreateShortCut '$SMPROGRAMS\\\\$STARTMENU_FOLDER\\\\Lokinet.lnk' '$INSTDIR\\\\share\\\\gui\\\\lokinet-gui.exe'"

@ -4,17 +4,66 @@ import curses
import json
import sys
import time
import platform
import os
from argparse import ArgumentParser as AP
import zmq
is_windows = lambda : platform.system().lower() == 'windows'
is_linux = lambda : platform.system().lower() == 'linux'
try:
import zmq
except ImportError:
print("zmq module not found")
print()
if is_linux():
print("for debian-based linux do:")
print("\tsudo apt install python3-zmq")
print("for other linuxs do:")
print("\tpip3 install --user zmq")
else:
print("install it with:")
print("\tpip3 install --user zmq")
sys.quit()
geo = None
try:
import GeoIP
geo = GeoIP.open("/usr/share/GeoIP/GeoIP.dat", GeoIP.GEOIP_STANDARD)
except Exception as ex:
print('no geoip: {}'.format(ex))
time.sleep(1)
except ImportError:
print("geoip module not found")
print()
if is_linux():
print("for debian-based linux do:")
print("\tsudo apt install python3-geoip")
print("for other linuxs do:")
print("\tpip3 install --user geoip")
print("for other linuxs you are responsible for obtaining your owen geoip databases, glhf")
else:
print("install it with:")
print("\tpip3 install --user geoip")
print("")
print("press enter to continue without geoip")
sys.stdin.read(1)
else:
try:
geoip_env_var = 'GEOIP_DB_FILE'
if is_windows():
geoip_default_db = '.\\GeoIP.dat'
else:
geoip_default_db = "/usr/share/GeoIP/GeoIP.dat"
geoip_db_file = geoip_env_var in os.environ and os.environ[geoip_env_var] or geoip_default_db
if not os.path.exists(geoip_db_file):
print("no geoip database found at {}".format(geoip_db_file))
print("you can override the path to it using the {} environmental variable".format(geoip_env_var))
sys.quit()
geo = GeoIP.open(geoip_db_file, GeoIP.GEOIP_STANDARD)
except Exception as ex:
print('failed to load geoip database: {}'.format(ex))
time.sleep(1)
now = lambda : int(time.time()) * 1000
def ip_to_flag(ip):
@ -38,7 +87,7 @@ class Monitor:
_sample_size = 12
def __init__(self, url):
def __init__(self, url, introsetMode=False):
self.txrate = 0
self.rxrate = 0
self.data = dict()
@ -52,6 +101,7 @@ class Monitor:
self._rpc_socket.connect(url)
self._speed_samples = [(0,0,0,0)] * self._sample_size
self._run = True
self._introsetMode = introsetMode
def rpc(self, method):
self._rpc_socket.send_multipart([method.encode(), b'lokinetmon'+method.encode()])
@ -104,8 +154,14 @@ class Monitor:
@staticmethod
def time_to(timestamp):
""" return time until timestamp in seconds formatted"""
now = time.time() * 1000
return "{} seconds".format(int((timestamp - now) / 1000))
if timestamp:
val = int((timestamp - now()) / 1000)
if val < 0:
return "{} seconds ago".format(0-val)
else:
return "{} seconds".format(val)
else:
return 'never'
@staticmethod
def speed_of(rate):
@ -343,6 +399,102 @@ class Monitor:
self.win.addstr("search for {}".format(transaction["tx"]["target"]))
return y_pos
def display_introsets(self, y_pos, service):
"""
display introsets on a service
"""
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("localhost.loki")
y_pos = self._display_our_introset(y_pos, service)
y_pos += 1
remotes = service['remoteSessions'] or []
for session in remotes:
y_pos = self._display_session_introset(y_pos, session)
def _display_intro(self, y_pos, intro, label, paths):
y_pos += 1
self.win.move(y_pos, 1)
path = 'path' in intro and intro['path'][:4] or '????'
self.win.addstr('{}: ({}|{}) [expires in: {}] [{} paths]'.format(label, intro['router'][:8], path, self.time_to(intro['expiresAt']), self.count_endpoints_in_path(paths, intro['router'])))
return y_pos
@staticmethod
def count_endpoints_in_path(paths, endpoint):
num = 0
for path in paths:
if path['hops'][-1]['router'] == endpoint and path['ready']:
num += 1
return num
@staticmethod
def count_ready_paths(paths):
num = 0
for path in paths:
if path['ready']:
num += 1
return num
@staticmethod
def make_bar(timestamp, scale=1, char='#'):
if timestamp > 0:
return int((abs(timestamp - now()) / 1000) / scale) * char
return ''
def _display_our_introset(self, y_pos, context):
for intro in context['introset']['intros'] or []:
y_pos = self._display_intro(y_pos, intro, "introset intro", context['paths'])
for path in context['paths']:
y_pos = self._display_intro(y_pos, path['intro'], "inbound path intro", context['paths'])
return y_pos
def _display_session_introset(self, y_pos, context):
#print(context.keys())
y_pos += 1
self.win.move(y_pos, 1)
readyState = context['readyToSend'] and '✔️' or '❌'
self.win.addstr('{} ({}) [{}]'.format(context['remoteIdentity'], context['currentConvoTag'], readyState))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr('last good send: {}'.format(self.time_to(context['lastGoodSend'])))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr(self.make_bar(context['lastGoodSend']))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr('last recv: {}'.format(self.time_to(context['lastRecv'])))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr(self.make_bar(context['lastRecv']))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr('last introset update: {}'.format(self.time_to(context['lastIntrosetUpdate'])))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr(self.make_bar(context['lastIntrosetUpdate'], 30))
y_pos += 2
self.win.move(y_pos, 1)
self.win.addstr('last shift: {}'.format(self.time_to(context['lastShift'])))
paths = context['paths'] or []
y_pos = self._display_intro(y_pos + 1, context['nextIntro'], 'next intro', paths) + 1
y_pos = self._display_intro(y_pos, context['remoteIntro'], 'current intro', paths) + 1
for intro in context['currentRemoteIntroset']['intros'] or []:
y_pos = self._display_intro(y_pos, intro, "introset intro", paths)
y_pos += 1
for intro in context['badIntros'] or []:
y_pos = self._display_intro(y_pos, intro, "bad intro", paths)
y_pos += 1
return y_pos
def display_data(self):
""" draw main window """
if self.data is not None:
@ -351,12 +503,16 @@ class Monitor:
services = self.data["services"] or {}
y_pos = 3
try:
y_pos = self.display_links(y_pos, self.data["links"])
for key in services:
y_pos = self.display_service(y_pos, key, services[key])
y_pos = self.display_dht(y_pos, self.data["dht"])
except:
pass
if not self._introsetMode:
y_pos = self.display_links(y_pos, self.data["links"])
for key in services:
y_pos = self.display_service(y_pos, key, services[key])
y_pos = self.display_dht(y_pos, self.data["dht"])
else:
for key in services:
y_pos = self.display_introsets(y_pos, services[key])
except Exception as ex:
print('{}'.format(ex))
else:
self.win.move(1, 1)
self.win.addstr("lokinet offline")
@ -387,8 +543,17 @@ class Monitor:
def main():
""" main function """
ap = AP()
ap.add_argument("--introset", action='store_const', const=True, default=False, help="run in introset inspection mode")
ap.add_argument("--url", default='tcp://127.0.0.1:1190', type=str, help='url to lokinet rpc')
args = ap.parse_args()
mon = Monitor(
len(sys.argv) > 1 and sys.argv[1] or "tcp://127.0.0.1:1190"
args.url,
args.introset
)
mon.run()

@ -31,7 +31,7 @@ def main():
ap.add_argument('--ip', type=str, default=None)
ap.add_argument('--ifname', type=str, default='lo')
ap.add_argument('--netid', type=str, default=None)
ap.add_argument('--loglevel', type=str, default='info')
ap.add_argument('--loglevel', type=str, default='debug')
args = ap.parse_args()
if args.valgrind:
@ -59,11 +59,15 @@ def main():
config['bind'] = {
args.ifname: str(args.baseport + nodeid)
}
config["logging"] = {
"level": args.loglevel
}
config['netdb'] = {
'dir': 'netdb'
}
config['network'] = {
'type' : 'null'
'type' : 'null',
'save-profiles': 'false'
}
config['api'] = {
'enabled': 'false'
@ -71,6 +75,9 @@ def main():
config['lokid'] = {
'enabled': 'false',
}
config["logging"] = {
"level": args.loglevel
}
d = os.path.join(args.dir, svcNodeName(nodeid))
if not os.path.exists(d):
os.mkdir(d)

@ -33,6 +33,7 @@ endif()
foreach(exe lokinet lokinet-vpn lokinet-bootstrap)
if(WIN32 AND NOT MSVC_VERSION)
target_sources(${exe} PRIVATE ../llarp/win32/version.rc)
target_link_libraries(${exe} PRIVATE -static-libstdc++ -static-libgcc --static -Wl,--pic-executable,-e,mainCRTStartup,--subsystem,console:5.00)
target_link_libraries(${exe} PRIVATE ws2_32 iphlpapi)
elseif(CMAKE_SYSTEM_NAME MATCHES "FreeBSD")
target_link_directories(${exe} PRIVATE /usr/local/lib)

@ -25,7 +25,6 @@ if(SUBMODULE_CHECK)
check_submodule(uvw)
check_submodule(cpr)
check_submodule(ngtcp2)
check_submodule(cpr)
endif()
endif()

@ -8,7 +8,7 @@ extern "C"
/// get a free()-able null terminated string that holds our .loki address
/// returns NULL if we dont have one right now
char*
char* EXPORT
lokinet_address(struct lokinet_context*);
#ifdef __cplusplus
}

@ -1,5 +1,7 @@
#pragma once
#include "lokinet_export.h"
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
@ -12,40 +14,40 @@ extern "C"
struct lokinet_context;
/// allocate a new lokinet context
struct lokinet_context*
struct lokinet_context* EXPORT
lokinet_context_new();
/// free a context allocated by lokinet_context_new
void
void EXPORT
lokinet_context_free(struct lokinet_context*);
/// spawn all the threads needed for operation and start running
/// return 0 on success
/// return non zero on fail
int
int EXPORT
lokinet_context_start(struct lokinet_context*);
/// return 0 if we our endpoint has published on the network and is ready to send
/// return -1 if we don't have enough paths ready
/// retrun -2 if we look deadlocked
/// retrun -3 if context was null or not started yet
int
int EXPORT
lokinet_status(struct lokinet_context*);
/// wait at most N milliseconds for lokinet to build paths and get ready
/// return 0 if we are ready
/// return nonzero if we are not ready
int
int EXPORT
lokinet_wait_for_ready(int N, struct lokinet_context*);
/// stop all operations on this lokinet context
void
void EXPORT
lokinet_context_stop(struct lokinet_context*);
/// load a bootstrap RC from memory
/// return 0 on success
/// return non zero on fail
int
int EXPORT
lokinet_add_bootstrap_rc(const char*, size_t, struct lokinet_context*);
#ifdef __cplusplus

@ -0,0 +1,7 @@
#pragma once
#ifdef _WIN32
#define EXPORT __cdecl
#else
#define EXPORT
#endif

@ -1,24 +1,24 @@
#pragma once
#include "lokinet_export.h"
#ifdef __cplusplus
extern "C"
{
#endif
/// change our network id globally across all contexts
void
void EXPORT
lokinet_set_netid(const char*);
/// get our current netid
/// must be free()'d after use
const char*
const char* EXPORT
lokinet_get_netid();
/// set log level
/// possible values: trace, debug, info, warn, error, none
/// return 0 on success
/// return non zero on fail
int
int EXPORT
lokinet_log_level(const char*);
#ifdef __cplusplus

@ -11,11 +11,11 @@ extern "C"
/// poll many sockets for activity
/// each pollfd.fd should be set to the socket id
/// returns 0 on sucess
int
int EXPORT
lokinet_poll(struct pollfd* poll, nfds_t numsockets, struct lokinet_context* ctx);
/// close a udp socket or a stream socket by its id
void
void EXPORT
lokinet_close_socket(int id, struct lokinet_context* ctx);
#ifdef __cplusplus

@ -35,7 +35,7 @@ extern "C"
/// do a srv lookup on host for service
/// caller MUST call lokinet_srv_lookup_done when they are done handling the result
int
int EXPORT
lokinet_srv_lookup(
char* host,
char* service,
@ -51,12 +51,12 @@ extern "C"
/// iterate over each srv record in a lookup result
/// user is passes into hook and called for each result and then with NULL as the result on the
/// end of iteration
void
void EXPORT
lokinet_for_each_srv_record(
struct lokinet_srv_lookup_result* result, lokinet_srv_record_iterator iter, void* user);
/// free internal members of a srv lookup result after use of the result
void
void EXPORT
lokinet_srv_lookup_done(struct lokinet_srv_lookup_result* result);
#ifdef __cplusplus

@ -28,7 +28,7 @@ extern "C"
/// connect out to a remote endpoint
/// remoteAddr is in the form of "name:port"
/// localAddr is either NULL for any or in the form of "ip:port" to bind to an explicit address
void
void EXPORT
lokinet_outbound_stream(
struct lokinet_stream_result* result,
const char* remoteAddr,
@ -44,13 +44,13 @@ extern "C"
/// set stream accepter filter
/// passes user parameter into stream filter as void *
/// returns stream id
int
int EXPORT
lokinet_inbound_stream_filter(
lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* context);
/// simple stream acceptor
/// simple variant of lokinet_inbound_stream_filter that maps port to localhost:port
int
int EXPORT
lokinet_inbound_stream(uint16_t port, struct lokinet_context* context);
#ifdef __cplusplus

@ -42,7 +42,7 @@ extern "C"
/// localAddr is the local ip:port to bind our socket to, if localAddr is NULL then
/// lokinet_udp_sendmmsg MUST be used to send packets return 0 on success return nonzero on fail,
/// containing an errno value
int
int EXPORT
lokinet_udp_establish(
char* remoteHost,
char* remotePort,
@ -66,7 +66,7 @@ extern "C"
///
/// returns 0 on success
/// returns nonzero on error in which it is an errno value
int
int EXPORT
lokinet_udp_bind(
int exposedPort,
char* srv,
@ -78,7 +78,7 @@ extern "C"
/// returns 0 on sucess
///
/// returns non zero errno on error
int
int EXPORT
lokinet_udp_poll(
const int* socket_ids,
size_t numsockets,
@ -93,7 +93,7 @@ extern "C"
};
/// analog to recvmmsg
ssize_t
ssize_t EXPORT
lokinet_udp_recvmmsg(
int socket_id,
struct lokinet_udp_pkt* events,

@ -103,6 +103,7 @@ add_library(liblokinet
dns/unbound_resolver.cpp
consensus/table.cpp
consensus/reachability_testing.cpp
bootstrap.cpp
context.cpp

@ -769,7 +769,7 @@ namespace llarp
}
else
{
info.interface = name;
info.interface = std::string{name};
std::vector<std::string_view> splits = split(value, ',');
for (std::string_view str : splits)

@ -0,0 +1,156 @@
#include "reachability_testing.hpp"
#include <chrono>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/util/logging/logger.hpp>
#include <llarp/crypto/crypto.hpp>
using std::chrono::steady_clock;
namespace llarp::consensus
{
using fseconds = std::chrono::duration<float, std::chrono::seconds::period>;
using fminutes = std::chrono::duration<float, std::chrono::minutes::period>;
static void
check_incoming_tests_impl(
std::string_view name,
const time_point_t& now,
const time_point_t& startup,
detail::incoming_test_state& incoming)
{
const auto elapsed = now - std::max(startup, incoming.last_test);
bool failing = elapsed > reachability_testing::MAX_TIME_WITHOUT_PING;
bool whine = failing != incoming.was_failing
|| (failing && now - incoming.last_whine > reachability_testing::WHINING_INTERVAL);
incoming.was_failing = failing;
if (whine)
{
incoming.last_whine = now;
if (!failing)
{
LogInfo(name, " ping received; port is likely reachable again");
}
else
{
if (incoming.last_test.time_since_epoch() == 0s)
{
LogWarn("Have NEVER received ", name, " pings!");
}
else
{
LogWarn(
"Have not received ",
name,
" pings for a long time: ",
fminutes{elapsed}.count(),
" minutes");
}
LogWarn(
"Please check your ",
name,
" port. Not being reachable "
"over ",
name,
" may result in a deregistration!");
}
}
}
void
reachability_testing::check_incoming_tests(const time_point_t& now)
{
check_incoming_tests_impl("lokinet", now, startup, last);
}
void
reachability_testing::incoming_ping(const time_point_t& now)
{
last.last_test = now;
}
std::optional<RouterID>
reachability_testing::next_random(AbstractRouter* router, const time_point_t& now, bool requeue)
{
if (next_general_test > now)
return std::nullopt;
CSRNG rng;
next_general_test =
now + std::chrono::duration_cast<time_point_t::duration>(fseconds(TESTING_INTERVAL(rng)));
// Pull the next element off the queue, but skip ourself, any that are no longer registered, and
// any that are currently known to be failing (those are queued for testing separately).
RouterID my_pk{router->pubkey()};
while (!testing_queue.empty())
{
auto& pk = testing_queue.back();
std::optional<RouterID> sn;
if (pk != my_pk && !failing.count(pk))
sn = pk;
testing_queue.pop_back();
if (sn)
return sn;
}
if (!requeue)
return std::nullopt;
// FIXME: when a *new* node comes online we need to inject it into a random position in the SN
// list with probability (L/N) [L = current list size, N = potential list size]
//
// (FIXME: put this FIXME in a better place ;-) )
// We exhausted the queue so repopulate it and try again
testing_queue.clear();
const auto all = router->GetRouterWhitelist();
testing_queue.insert(testing_queue.begin(), all.begin(), all.end());
std::shuffle(testing_queue.begin(), testing_queue.end(), rng);
// Recurse with the rebuild list, but don't let it try rebuilding again
return next_random(router, now, false);
}
std::vector<std::pair<RouterID, int>>
reachability_testing::get_failing(const time_point_t& now)
{
// Our failing_queue puts the oldest retest times at the top, so pop them off into our result
// until the top node should be retested sometime in the future
std::vector<std::pair<RouterID, int>> result;
while (result.size() < MAX_RETESTS_PER_TICK && !failing_queue.empty())
{
auto& [pk, retest_time, failures] = failing_queue.top();
if (retest_time > now)
break;
result.emplace_back(pk, failures);
failing_queue.pop();
}
return result;
}
void
reachability_testing::add_failing_node(const RouterID& pk, int previous_failures)
{
using namespace std::chrono;
if (previous_failures < 0)
previous_failures = 0;
CSRNG rng;
auto next_test_in = duration_cast<time_point_t::duration>(
previous_failures * TESTING_BACKOFF + fseconds{TESTING_INTERVAL(rng)});
if (next_test_in > TESTING_BACKOFF_MAX)
next_test_in = TESTING_BACKOFF_MAX;
failing.insert(pk);
failing_queue.emplace(pk, steady_clock::now() + next_test_in, previous_failures + 1);
}
void
reachability_testing::remove_node_from_failing(const RouterID& pk)
{
failing.erase(pk);
}
} // namespace llarp::consensus

@ -0,0 +1,148 @@
#pragma once
#include <chrono>
#include <queue>
#include <random>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <llarp/util/time.hpp>
#include <llarp/router_id.hpp>
namespace llarp
{
struct AbstractRouter;
}
namespace llarp::consensus
{
namespace detail
{
using clock_t = std::chrono::steady_clock;
using time_point_t = std::chrono::time_point<clock_t>;
// Returns std::greater on the std::get<N>(v)th element value.
template <typename T, size_t N>
struct nth_greater
{
constexpr bool
operator()(const T& lhs, const T& rhs) const
{
return std::greater<std::tuple_element_t<N, T>>{}(std::get<N>(lhs), std::get<N>(rhs));
}
};
struct incoming_test_state
{
time_point_t last_test{};
time_point_t last_whine{};
bool was_failing = false;
};
} // namespace detail
using time_point_t = detail::time_point_t;
using clock_t = detail::clock_t;
// How often we tick the timer to check whether we need to do any tests.
constexpr auto REACHABILITY_TESTING_TIMER_INTERVAL = 50ms;
class reachability_testing
{
public:
// Distribution for the seconds between node tests: we throw in some randomness to avoid
// potential clustering of tests. (Note that there is some granularity here as the test timer
// only runs every REACHABILITY_TESTING_TIMER_INTERVAL).
std::normal_distribution<float> TESTING_INTERVAL{10.0, 3.0};
// The linear backoff after each consecutive test failure before we re-test. Specifically we
// schedule the next re-test for (TESTING_BACKOFF*previous_failures) + TESTING_INTERVAL(rng).
inline static constexpr auto TESTING_BACKOFF = 10s;
// The upper bound for the re-test interval.
inline static constexpr auto TESTING_BACKOFF_MAX = 2min;
// The maximum number of nodes that we will re-test at once (i.e. per TESTING_TIMING_INTERVAL);
// mainly intended to throttle ourselves if, for instance, our own connectivity loss makes us
// accumulate tons of nodes to test all at once. (Despite the random intervals, this can happen
// if we also get decommissioned during which we can't test at all but still have lots of
// failing nodes we want to test right away when we get recommissioned).
inline static constexpr int MAX_RETESTS_PER_TICK = 4;
// Maximum time without a ping before we start whining about it.
//
// We have a probability of about 0.368* of *not* getting pinged within a ping interval (10s),
// and so the probability of not getting a ping for 2 minutes (i.e. 12 test spans) just because
// we haven't been selected is extremely small (0.0000061). It also coincides nicely with
// blockchain time (i.e. two minutes) and our max testing backoff.
//
// * = approx value of ((n-1)/n)^n for non-tiny values of n
inline static constexpr auto MAX_TIME_WITHOUT_PING = 2min;
// How often we whine in the logs about being unreachable
inline static constexpr auto WHINING_INTERVAL = 2min;
private:
// Queue of pubkeys of service nodes to test; we pop off the back of this until the queue
// empties then we refill it with a shuffled list of all pubkeys then pull off of it until it is
// empty again, etc.
std::vector<RouterID> testing_queue;
// The next time for a general test
time_point_t next_general_test = time_point_t::min();
// When we started, so that we know not to hold off on whining about no pings for a while.
const time_point_t startup = clock_t::now();
// Pubkeys, next test times, and sequential failure counts of service nodes that are currently
// in "failed" status along with the last time they failed; we retest them first after 10s then
// back off linearly by an additional 10s up to a max testing interval of 2m30s, until we get a
// successful response.
using FailingPK = std::tuple<RouterID, time_point_t, int>;
std::priority_queue<FailingPK, std::vector<FailingPK>, detail::nth_greater<FailingPK, 1>>
failing_queue;
std::unordered_set<RouterID> failing;
// Track the last time *this node* was tested by other network nodes; used to detect and warn
// about possible network issues.
detail::incoming_test_state last;
public:
// If it is time to perform another random test, this returns the next node to test from the
// testing queue and returns it, also updating the timer for the next test. If it is not yet
// time, or if the queue is empty and cannot current be replenished, returns std::nullopt. If
// the queue empties then this builds a new one by shuffling current public keys in the swarm's
// "all nodes" then starts using the new queue for this an subsequent calls.
//
// `requeue` is mainly for internal use: if false it avoids rebuilding the queue if we run
// out (and instead just return nullopt).
std::optional<RouterID>
next_random(
AbstractRouter* router, const time_point_t& now = clock_t::now(), bool requeue = true);
// Removes and returns up to MAX_RETESTS_PER_TICK nodes that are due to be tested (i.e.
// next-testing-time <= now). Returns [snrecord, #previous-failures] for each.
std::vector<std::pair<RouterID, int>>
get_failing(const time_point_t& now = clock_t::now());
// Adds a bad node pubkey to the failing list, to be re-tested soon (with a backoff depending on
// `failures`; see TESTING_BACKOFF). `previous_failures` should be the number of previous
// failures *before* this one, i.e. 0 for a random general test; or the failure count returned
// by `get_failing` for repeated failures.
void
add_failing_node(const RouterID& pk, int previous_failures = 0);
/// removes the public key from the failing set
void
remove_node_from_failing(const RouterID& pk);
// Called when this router receives an incomming session
void
incoming_ping(const time_point_t& now = clock_t::now());
// Check whether we received incoming pings recently
void
check_incoming_tests(const time_point_t& now = clock_t::now());
};
} // namespace llarp::consensus

@ -5,6 +5,6 @@
#include <cstdlib>
constexpr size_t MAX_LINK_MSG_SIZE = 8192;
static constexpr auto DefaultLinkSessionLifetime = 1min;
constexpr size_t MaxSendQueueSize = 1024;
static constexpr auto DefaultLinkSessionLifetime = 5min;
constexpr size_t MaxSendQueueSize = 1024 * 16;
static constexpr auto LinkLayerConnectTimeout = 5s;

@ -20,14 +20,16 @@ namespace llarp
constexpr std::chrono::milliseconds default_lifetime = 20min;
/// minimum into lifetime we will advertise
constexpr std::chrono::milliseconds min_intro_lifetime = default_lifetime / 2;
/// number of slices of path lifetime to spread intros out via
constexpr auto intro_spread_slices = 4;
/// spacing frequency at which we try to build paths for introductions
constexpr std::chrono::milliseconds intro_path_spread = default_lifetime / 5;
constexpr std::chrono::milliseconds intro_path_spread = default_lifetime / intro_spread_slices;
/// Minimum paths to keep around for intros; mainly used at startup (the
/// spread, above, should be able to maintain more than this number of paths
/// normally once things are going).
constexpr std::size_t min_intro_paths = 4;
/// after this many ms a path build times out
constexpr auto build_timeout = 30s;
constexpr auto build_timeout = 10s;
/// measure latency every this interval ms
constexpr auto latency_interval = 20s;

@ -382,7 +382,7 @@ namespace llarp
replies.emplace_back(new GotRouterMessage(requester, txid, {router->rc()}, false));
return;
}
if (not GetRouter()->ConnectionToRouterAllowed(target.as_array()))
if (not GetRouter()->SessionToRouterAllowed(target.as_array()))
{
// explicitly not allowed
replies.emplace_back(new GotRouterMessage(requester, txid, {}, false));

@ -34,7 +34,7 @@ namespace llarp
Key_t peer;
// check if we know this in our nodedb first
if (not dht.GetRouter()->ConnectionToRouterAllowed(targetKey))
if (not dht.GetRouter()->SessionToRouterAllowed(targetKey))
{
// explicitly disallowed by network
replies.emplace_back(new GotRouterMessage(k, txid, {}, false));

@ -130,6 +130,9 @@ namespace llarp
std::string name,
std::string service,
std::function<void(std::vector<dns::SRVData>)> resultHandler) = 0;
virtual void
MarkAddressOutbound(AddressVariant_t remote) = 0;
};
} // namespace llarp

@ -51,6 +51,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
void
BlacklistSNode(const RouterID snode) override;

@ -124,7 +124,7 @@ namespace llarp
++itr;
}
if (not m_Router->ConnectionToRouterAllowed(*rid))
if (not m_Router->PathToRouterAllowed(*rid))
return false;
ObtainSNodeSession(*rid, [data = payload.copy(), type](auto session) {
@ -150,7 +150,7 @@ namespace llarp
return false;
if (auto* rid = std::get_if<RouterID>(&addr))
{
if (m_SNodeKeys.count(PubKey{*rid}) or m_Router->ConnectionToRouterAllowed(*rid))
if (m_SNodeKeys.count(PubKey{*rid}) or m_Router->PathToRouterAllowed(*rid))
{
ObtainSNodeSession(
*rid, [hook, routerID = *rid](std::shared_ptr<exit::BaseSession> session) {
@ -338,7 +338,7 @@ namespace llarp
void
ExitEndpoint::ObtainSNodeSession(const RouterID& router, exit::SessionReadyFunc obtainCb)
{
if (not m_Router->rcLookupHandler().RemoteIsAllowed(router))
if (not m_Router->rcLookupHandler().SessionIsAllowed(router))
{
obtainCb(nullptr);
return;

@ -41,6 +41,8 @@ namespace llarp
void
SRVRecordsChanged() override;
void MarkAddressOutbound(AddressVariant_t) override{};
bool
SendToOrQueue(
service::ConvoTag tag, const llarp_buffer_t& payload, service::ProtocolType t) override;

@ -61,6 +61,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
bool
SupportsV6() const override
{

@ -67,6 +67,11 @@ namespace llarp
: service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop())
{
m_PacketSendWaker = r->loop()->make_waker([this]() { FlushWrite(); });
m_MessageSendWaker = r->loop()->make_waker([this]() {
FlushSend();
Pump(Now());
});
m_PacketRouter = std::make_unique<vpn::PacketRouter>(
[this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); });
#ifdef ANDROID
@ -224,6 +229,12 @@ namespace llarp
{
FlushSend();
Pump(Now());
FlushWrite();
}
void
TunEndpoint::FlushWrite()
{
// flush network to user
while (not m_NetworkToUserPktQueue.empty())
{
@ -281,6 +292,7 @@ namespace llarp
service::Address addr, auto msg, bool isV6) -> bool {
using service::Address;
using service::OutboundContext;
MarkAddressOutbound(addr);
return EnsurePathToService(
addr,
[this, addr, msg, reply, isV6](const Address&, OutboundContext* ctx) {
@ -307,7 +319,7 @@ namespace llarp
service::Address addr, auto msg) -> bool {
using service::Address;
using service::OutboundContext;
MarkAddressOutbound(addr);
return EnsurePathToService(
addr,
[msg, addr, reply](const Address&, OutboundContext* ctx) {
@ -697,6 +709,7 @@ namespace llarp
m_AddrToIP[addr] = ip;
m_SNodes[addr] = SNode;
MarkIPActiveForever(ip);
MarkAddressOutbound(addr);
return true;
}
@ -923,12 +936,13 @@ namespace llarp
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[addr, pkt, self = this](service::Address, service::OutboundContext* ctx) {
[pkt](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
ctx->sendTimeout = 5s;
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
return;
}
self->SendToOrQueue(addr, pkt.ConstBuffer(), service::ProtocolType::Exit);
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
return;
@ -957,12 +971,40 @@ namespace llarp
else
pkt.UpdateIPv6Address({0}, {0});
}
if (SendToOrQueue(to, pkt.Buffer(), type))
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (SendToOrQueue(to, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
return;
}
llarp::LogWarn(Name(), " did not flush packets");
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
}
else
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
}
},
PathAlignmentTimeout());
});
}
@ -1117,6 +1159,8 @@ namespace llarp
pkt.UpdateIPv6Address(src, dst);
}
m_NetworkToUserPktQueue.push(std::move(write));
// wake up packet flushing event so we ensure that all packets are written to user
m_PacketSendWaker->Trigger();
return true;
}
@ -1163,7 +1207,9 @@ namespace llarp
m_AddrToIP[ident] = nextIP;
m_IPToAddr[nextIP] = ident;
m_SNodes[ident] = snode;
llarp::LogInfo(Name(), " mapped ", ident, " to ", nextIP);
var::visit(
[&](auto&& remote) { llarp::LogInfo(Name(), " mapped ", remote, " to ", nextIP); },
addr);
MarkIPActive(nextIP);
return nextIP;
}
@ -1223,6 +1269,7 @@ namespace llarp
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Emplace(std::move(pkt));
m_MessageSendWaker->Trigger();
}
TunEndpoint::~TunEndpoint() = default;

@ -34,6 +34,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
void
Thaw() override;
@ -209,6 +215,10 @@ namespace llarp
virtual void
FlushSend();
/// flush writing ip packets to interface
void
FlushWrite();
/// maps ip to key (host byte order)
std::unordered_map<huint128_t, AlignedBuffer<32>> m_IPToAddr;
/// maps key to ip (host byte order)
@ -228,7 +238,7 @@ namespace llarp
std::function<void(dns::Message)> reply,
bool sendIPv6)
{
if (ctx or HasAddress(addr))
if (ctx)
{
huint128_t ip = ObtainIPForAddr(addr);
query->answers.clear();
@ -275,6 +285,11 @@ namespace llarp
std::set<IPRange> m_OwnedRanges;
/// how long to wait for path alignment
llarp_time_t m_PathAlignmentTimeout;
/// idempotent wakeup for writing packets to user
std::shared_ptr<EventLoopWakeup> m_PacketSendWaker;
/// idempotent wakeup for writing messages to network
std::shared_ptr<EventLoopWakeup> m_MessageSendWaker;
};
} // namespace handlers

@ -329,7 +329,8 @@ namespace llarp
{
if (m_State == State::Ready || m_State == State::LinkIntro)
{
return now > m_LastRX && now - m_LastRX > SessionAliveTimeout;
return now > m_LastRX
&& now - m_LastRX > (m_Inbound ? DefaultLinkSessionLifetime : SessionAliveTimeout);
}
return now - m_CreatedAt >= LinkLayerConnectTimeout;
}

@ -35,6 +35,12 @@ namespace llarp
virtual bool
HasSessionTo(const RouterID& remote) const = 0;
// it is fine to have both an inbound and outbound session with
// another relay, and is useful for network testing. This test
// is more specific for use with "should we connect outbound?"
virtual bool
HasOutboundSessionTo(const RouterID& remote) const = 0;
/// return true if the session with this pubkey is a client
/// return false if the session with this pubkey is a router
/// return std::nullopt we have no session with this pubkey

@ -60,6 +60,17 @@ namespace llarp
return GetLinkWithSessionTo(remote) != nullptr;
}
bool
LinkManager::HasOutboundSessionTo(const RouterID& remote) const
{
for (const auto& link : outboundLinks)
{
if (link->HasSessionTo(remote))
return true;
}
return false;
}
std::optional<bool>
LinkManager::SessionIsClient(RouterID remote) const
{
@ -69,11 +80,8 @@ namespace llarp
if (session)
return not session->IsRelay();
}
for (const auto& link : outboundLinks)
{
if (link->HasSessionTo(remote))
return false;
}
if (HasOutboundSessionTo(remote))
return false;
return std::nullopt;
}
@ -177,10 +185,16 @@ namespace llarp
return;
util::Lock l(_mutex);
auto& curr = m_PersistingSessions[remote];
if (until > curr)
curr = until;
LogDebug("persist session to ", remote, " until ", curr - time_now_ms());
m_PersistingSessions[remote] = std::max(until, m_PersistingSessions[remote]);
if (auto maybe = SessionIsClient(remote))
{
if (*maybe)
{
// mark this as a client so we don't try to back connect
m_Clients.Upsert(remote);
}
}
}
void
@ -329,43 +343,42 @@ namespace llarp
return;
std::vector<RouterID> sessionsNeeded;
std::vector<RouterID> sessionsClosed;
{
util::Lock l(_mutex);
auto itr = m_PersistingSessions.begin();
while (itr != m_PersistingSessions.end())
for (auto [remote, until] : m_PersistingSessions)
{
if (now < itr->second)
if (now < until)
{
auto link = GetLinkWithSessionTo(itr->first);
auto link = GetLinkWithSessionTo(remote);
if (link)
{
link->KeepAliveSessionTo(itr->first);
link->KeepAliveSessionTo(remote);
}
else
else if (not m_Clients.Contains(remote))
{
sessionsNeeded.push_back(itr->first);
sessionsNeeded.push_back(remote);
}
++itr;
}
else
else if (not m_Clients.Contains(remote))
{
const RouterID r(itr->first);
LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr);
for (const auto& link : outboundLinks)
{
link->CloseSessionTo(r);
}
sessionsClosed.push_back(remote);
}
}
}
for (const auto& router : sessionsNeeded)
{
LogDebug("ensuring session to ", router, " for previously made commitment");
_sessionMaker->CreateSessionTo(router, nullptr);
}
for (const auto& router : sessionsClosed)
{
m_PersistingSessions.erase(router);
ForEachOutboundLink([router](auto link) { link->CloseSessionTo(router); });
}
}
void

@ -33,6 +33,9 @@ namespace llarp
bool
HasSessionTo(const RouterID& remote) const override;
bool
HasOutboundSessionTo(const RouterID& remote) const override;
std::optional<bool>
SessionIsClient(RouterID remote) const override;
@ -108,6 +111,8 @@ namespace llarp
std::unordered_map<RouterID, SessionStats> m_lastRouterStats;
util::DecayingHashSet<RouterID> m_Clients{path::default_lifetime};
IOutboundSessionMaker* _sessionMaker;
};

@ -191,28 +191,20 @@ struct lokinet_srv_lookup_private
extern "C"
{
struct lokinet_context*
lokinet_default()
{
if (not g_context)
g_context = std::make_unique<lokinet_context>();
return g_context.get();
}
void
void EXPORT
lokinet_set_netid(const char* netid)
{
llarp::NetID::DefaultValue() = llarp::NetID{reinterpret_cast<const byte_t*>(netid)};
}
const char*
const char* EXPORT
lokinet_get_netid()
{
const auto netid = llarp::NetID::DefaultValue().ToString();
return strdup(netid.c_str());
}
int
int EXPORT
lokinet_log_level(const char* level)
{
if (auto maybe = llarp::LogLevelFromString(level))
@ -223,7 +215,7 @@ extern "C"
return -1;
}
char*
char* EXPORT
lokinet_address(struct lokinet_context* ctx)
{
if (not ctx)
@ -235,7 +227,7 @@ extern "C"
return strdup(addrStr.c_str());
}
int
int EXPORT
lokinet_add_bootstrap_rc(const char* data, size_t datalen, struct lokinet_context* ctx)
{
llarp_buffer_t buf{data, datalen};
@ -253,20 +245,20 @@ extern "C"
return 0;
}
struct lokinet_context*
struct lokinet_context* EXPORT
lokinet_context_new()
{
return new lokinet_context{};
}
void
void EXPORT
lokinet_context_free(struct lokinet_context* ctx)
{
lokinet_context_stop(ctx);
delete ctx;
}
int
int EXPORT
lokinet_context_start(struct lokinet_context* ctx)
{
if (not ctx)
@ -301,7 +293,7 @@ extern "C"
return 0;
}
int
int EXPORT
lokinet_status(struct lokinet_context* ctx)
{
if (ctx == nullptr)
@ -314,7 +306,7 @@ extern "C"
return ctx->endpoint()->IsReady() ? 0 : -1;
}
int
int EXPORT
lokinet_wait_for_ready(int ms, struct lokinet_context* ctx)
{
if (ctx == nullptr)
@ -335,7 +327,7 @@ extern "C"
return ep->IsReady() ? 0 : -1;
}
void
void EXPORT
lokinet_context_stop(struct lokinet_context* ctx)
{
if (not ctx)
@ -354,7 +346,7 @@ extern "C"
ctx->runner.reset();
}
void
void EXPORT
lokinet_outbound_stream(
struct lokinet_stream_result* result,
const char* remote,
@ -477,14 +469,14 @@ extern "C"
}
}
int
int EXPORT
lokinet_inbound_stream(uint16_t port, struct lokinet_context* ctx)
{
/// FIXME: delete pointer later
return lokinet_inbound_stream_filter(&accept_port, (void*)new std::uintptr_t{port}, ctx);
}
int
int EXPORT
lokinet_inbound_stream_filter(
lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* ctx)
{
@ -531,7 +523,7 @@ extern "C"
return id;
}
void
void EXPORT
lokinet_close_stream(int stream_id, struct lokinet_context* ctx)
{
if (not ctx)
@ -564,7 +556,7 @@ extern "C"
{}
}
int
int EXPORT
lokinet_srv_lookup(
char* host,
char* service,
@ -580,7 +572,7 @@ extern "C"
return result->internal->LookupSRV(host, service, ctx);
}
void
void EXPORT
lokinet_for_each_srv_record(
struct lokinet_srv_lookup_result* result, lokinet_srv_record_iterator iter, void* user)
{
@ -594,7 +586,7 @@ extern "C"
}
}
void
void EXPORT
lokinet_srv_lookup_done(struct lokinet_srv_lookup_result* result)
{
if (result == nullptr or result->internal == nullptr)

@ -111,7 +111,7 @@ namespace llarp
{
return path->HandleDownstream(llarp_buffer_t(X), Y, r);
}
llarp::LogWarn("unhandled downstream message id=", pathid);
llarp::LogWarn("no path for downstream message id=", pathid);
return false;
}
} // namespace llarp

@ -204,6 +204,7 @@ namespace llarp
static void
OnForwardLRCMResult(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> path,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -236,9 +237,8 @@ namespace llarp
std::abort();
break;
}
router->QueueWork([router, pathid, nextHop, pathKey, status] {
LR_StatusMessage::CreateAndSend(router, pathid, nextHop, pathKey, status);
router->QueueWork([router, path, pathid, nextHop, pathKey, status] {
LR_StatusMessage::CreateAndSend(router, path, pathid, nextHop, pathKey, status);
});
}
@ -251,6 +251,7 @@ namespace llarp
llarp::LogError("duplicate transit hop ", self->hop->info);
LR_StatusMessage::CreateAndSend(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -269,6 +270,7 @@ namespace llarp
llarp::LogError("client path build hit limit ", *self->fromAddr);
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -279,7 +281,7 @@ namespace llarp
#endif
}
if (!self->context->Router()->ConnectionToRouterAllowed(self->hop->info.upstream))
if (not self->context->Router()->PathToRouterAllowed(self->hop->info.upstream))
{
// we are not allowed to forward it ... now what?
llarp::LogError(
@ -288,6 +290,7 @@ namespace llarp
"not allowed, dropping build request on the floor");
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -305,15 +308,17 @@ namespace llarp
self->context->PutTransitHop(self->hop);
// forward to next hop
using std::placeholders::_1;
auto func = std::bind(
&OnForwardLRCMResult,
self->context->Router(),
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
_1);
auto func = [self](auto status) {
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
status);
self->hop = nullptr;
};
self->context->ForwardLRCM(self->hop->info.upstream, self->frames, func);
self->hop = nullptr;
}
// this is called from the logic thread
@ -338,6 +343,7 @@ namespace llarp
if (!LR_StatusMessage::CreateAndSend(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,

@ -22,21 +22,21 @@ namespace llarp
std::array<EncryptedFrame, 8> frames;
uint64_t status = 0;
HopHandler_ptr path;
HopHandler_ptr hop;
AbstractRouter* router;
PathID_t pathid;
LRSM_AsyncHandler(
std::array<EncryptedFrame, 8> _frames,
uint64_t _status,
HopHandler_ptr _path,
HopHandler_ptr _hop,
AbstractRouter* _router,
const PathID_t& pathid)
: frames(std::move(_frames))
, status(_status)
, path(std::move(_path))
, router(_router)
, pathid(pathid)
: frames{std::move(_frames)}
, status{_status}
, hop{std::move(_hop)}
, router{_router}
, pathid{pathid}
{}
~LRSM_AsyncHandler() = default;
@ -45,8 +45,7 @@ namespace llarp
handle()
{
router->NotifyRouterEvent<tooling::PathStatusReceivedEvent>(router->pubkey(), pathid, status);
path->HandleLRSM(status, frames, router);
hop->HandleLRSM(status, frames, router);
}
void
@ -133,16 +132,13 @@ namespace llarp
}
auto path = router->pathContext().GetByUpstream(session->GetPubKey(), pathid);
if (!path)
if (not path)
{
llarp::LogWarn("unhandled LR_Status message: no associated path found pathid=", pathid);
return false;
}
auto handler = std::make_shared<LRSM_AsyncHandler>(frames, status, path, router, pathid);
handler->queue_handle();
return true;
}
@ -157,6 +153,7 @@ namespace llarp
bool
LR_StatusMessage::CreateAndSend(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> hop,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -169,12 +166,9 @@ namespace llarp
message->SetDummyFrames();
if (!message->AddFrame(pathKey, status))
{
return false;
}
message->AddFrame(pathKey, status);
QueueSendMessage(router, nextHop, message);
QueueSendMessage(router, nextHop, message, hop);
return true; // can't guarantee delivery here, as far as we know it's fine
}
@ -221,10 +215,19 @@ namespace llarp
void
LR_StatusMessage::QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg)
AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr<LR_StatusMessage> msg,
std::shared_ptr<path::TransitHop> hop)
{
router->loop()->call(
[router, nextHop, msg = std::move(msg)] { SendMessage(router, nextHop, msg); });
router->loop()->call([router, nextHop, msg = std::move(msg), hop = std::move(hop)] {
SendMessage(router, nextHop, msg);
// destroy hop as needed
if ((msg->status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
hop->QueueDestroySelf(router);
}
});
}
void
@ -273,4 +276,32 @@ namespace llarp
return status == other.status;
}
std::string
LRStatusCodeToString(uint64_t status)
{
std::map<uint64_t, std::string> codes = {
{LR_StatusRecord::SUCCESS, "success"},
{LR_StatusRecord::FAIL_TIMEOUT, "timeout"},
{LR_StatusRecord::FAIL_CONGESTION, "congestion"},
{LR_StatusRecord::FAIL_DEST_UNKNOWN, "destination unknown"},
{LR_StatusRecord::FAIL_DECRYPT_ERROR, "decrypt error"},
{LR_StatusRecord::FAIL_MALFORMED_RECORD, "malformed record"},
{LR_StatusRecord::FAIL_DEST_INVALID, "destination invalid"},
{LR_StatusRecord::FAIL_CANNOT_CONNECT, "cannot connect"},
{LR_StatusRecord::FAIL_DUPLICATE_HOP, "duplicate hop"}};
std::stringstream ss;
ss << "[";
bool found = false;
for (const auto& [val, message] : codes)
{
if ((status & val) == val)
{
ss << (found ? ", " : "") << message;
found = true;
}
}
ss << "]";
return ss.str();
}
} // namespace llarp

@ -18,6 +18,7 @@ namespace llarp
{
struct PathContext;
struct IHopHandler;
struct TransitHop;
} // namespace path
struct LR_StatusRecord
@ -49,6 +50,9 @@ namespace llarp
OnKey(llarp_buffer_t* buffer, llarp_buffer_t* key);
};
std::string
LRStatusCodeToString(uint64_t status);
struct LR_StatusMessage : public ILinkMessage
{
std::array<EncryptedFrame, 8> frames;
@ -83,6 +87,7 @@ namespace llarp
static bool
CreateAndSend(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> hop,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -93,7 +98,10 @@ namespace llarp
static void
QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg);
AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr<LR_StatusMessage> msg,
std::shared_ptr<path::TransitHop> hop);
static void
SendMessage(

@ -7,6 +7,7 @@
#include <llarp/messages/relay_status.hpp>
#include "pathbuilder.hpp"
#include "transit_hop.hpp"
#include <llarp/nodedb.hpp>
#include <llarp/profiling.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/routing/dht_message.hpp>
@ -25,10 +26,10 @@ namespace llarp
{
Path::Path(
const std::vector<RouterContact>& h,
PathSet* parent,
std::weak_ptr<PathSet> pathset,
PathRole startingRoles,
std::string shortName)
: m_PathSet(parent), _role(startingRoles), m_shortName(std::move(shortName))
: m_PathSet{pathset}, _role{startingRoles}, m_shortName{std::move(shortName)}
{
hops.resize(h.size());
@ -54,7 +55,7 @@ namespace llarp
// initialize parts of the introduction
intro.router = hops[hsz - 1].rc.pubkey;
intro.pathID = hops[hsz - 1].txID;
if (parent)
if (auto parent = m_PathSet.lock())
EnterState(ePathBuilding, parent->Now());
}
@ -184,10 +185,6 @@ namespace llarp
{
failedAt = hops[index + 1].rc.pubkey;
}
else
{
failedAt = hops[index].rc.pubkey;
}
break;
}
++index;
@ -203,7 +200,14 @@ namespace llarp
if (failedAt)
{
r->NotifyRouterEvent<tooling::PathBuildRejectedEvent>(Endpoint(), RXID(), *failedAt);
LogWarn(Name(), " build failed at ", *failedAt);
LogWarn(
Name(),
" build failed at ",
*failedAt,
" status=",
LRStatusCodeToString(currentStatus),
" hops=",
HopsString());
r->routerProfiling().MarkHopFail(*failedAt);
}
else
@ -229,6 +233,13 @@ namespace llarp
llarp::LogDebug(
"Path build failed due to one or more nodes considered an "
"invalid destination");
if (failedAt)
{
r->loop()->call([nodedb = r->nodedb(), router = *failedAt]() {
LogInfo("router ", router, " is deregistered so we remove it");
nodedb->Remove(router);
});
}
}
else if (currentStatus & LR_StatusRecord::FAIL_CANNOT_CONNECT)
{
@ -257,7 +268,10 @@ namespace llarp
edge = *failedAt;
r->loop()->call([r, self = shared_from_this(), edge]() {
self->EnterState(ePathFailed, r->Now());
self->m_PathSet->HandlePathBuildFailedAt(self, edge);
if (auto parent = self->m_PathSet.lock())
{
parent->HandlePathBuildFailedAt(self, edge);
}
});
}
@ -276,7 +290,10 @@ namespace llarp
if (st == ePathExpired && _status == ePathBuilding)
{
_status = st;
m_PathSet->HandlePathBuildTimeout(shared_from_this());
if (auto parent = m_PathSet.lock())
{
parent->HandlePathBuildTimeout(shared_from_this());
}
}
else if (st == ePathBuilding)
{
@ -291,12 +308,19 @@ namespace llarp
{
LogInfo("path ", Name(), " died");
_status = st;
m_PathSet->HandlePathDied(shared_from_this());
if (auto parent = m_PathSet.lock())
{
parent->HandlePathDied(shared_from_this());
}
}
else if (st == ePathEstablished && _status == ePathTimeout)
{
LogInfo("path ", Name(), " reanimated");
}
else if (st == ePathIgnore)
{
LogInfo("path ", Name(), " ignored");
}
_status = st;
}
@ -371,11 +395,31 @@ namespace llarp
void
Path::Rebuild()
{
std::vector<RouterContact> newHops;
for (const auto& hop : hops)
newHops.emplace_back(hop.rc);
LogInfo(Name(), " rebuilding on ", ShortName());
m_PathSet->Build(newHops);
if (auto parent = m_PathSet.lock())
{
std::vector<RouterContact> newHops;
for (const auto& hop : hops)
newHops.emplace_back(hop.rc);
LogInfo(Name(), " rebuilding on ", ShortName());
parent->Build(newHops);
}
}
bool
Path::SendLatencyMessage(AbstractRouter* r)
{
const auto now = r->Now();
// send path latency test
routing::PathLatencyMessage latency{};
latency.T = randint();
latency.S = NextSeqNo();
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
LogDebug(Name(), " send latency test id=", latency.T);
if (not SendRoutingMessage(latency, r))
return false;
FlushUpstream(r);
return true;
}
void
@ -412,12 +456,12 @@ namespace llarp
auto dlt = now - m_LastLatencyTestTime;
if (dlt > path::latency_interval && m_LastLatencyTestID == 0)
{
routing::PathLatencyMessage latency;
latency.T = randint();
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
SendRoutingMessage(latency, r);
FlushUpstream(r);
SendLatencyMessage(r);
// latency test FEC
r->loop()->call_later(2s, [self = shared_from_this(), r]() {
if (self->m_LastLatencyTestID)
self->SendLatencyMessage(r);
});
return;
}
dlt = now - m_LastRecvMessage;
@ -510,7 +554,7 @@ namespace llarp
{
return now >= m_LastRecvMessage + PathReanimationTimeout;
}
if (_status == ePathEstablished)
if (_status == ePathEstablished or _status == ePathIgnore)
{
return now >= ExpireTime();
}
@ -654,16 +698,7 @@ namespace llarp
// persist session with upstream router until the path is done
r->PersistSessionUntil(Upstream(), intro.expiresAt);
MarkActive(now);
// send path latency test
routing::PathLatencyMessage latency;
latency.T = randint();
latency.S = NextSeqNo();
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
if (!SendRoutingMessage(latency, r))
return false;
FlushUpstream(r);
return true;
return SendLatencyMessage(r);
}
LogWarn("got unwarranted path confirm message on tx=", RXID(), " rx=", RXID());
return false;
@ -678,28 +713,48 @@ namespace llarp
bool
Path::HandleHiddenServiceFrame(const service::ProtocolFrame& frame)
{
MarkActive(m_PathSet->Now());
return m_DataHandler && m_DataHandler(shared_from_this(), frame);
if (auto parent = m_PathSet.lock())
{
MarkActive(parent->Now());
return m_DataHandler && m_DataHandler(shared_from_this(), frame);
}
return false;
}
template <typename Samples_t>
static llarp_time_t
computeLatency(const Samples_t& samps)
{
llarp_time_t mean = 0s;
if (samps.empty())
return mean;
for (const auto& samp : samps)
mean += samp;
return mean / samps.size();
}
constexpr auto MaxLatencySamples = 8;
bool
Path::HandlePathLatencyMessage(const routing::PathLatencyMessage& msg, AbstractRouter* r)
Path::HandlePathLatencyMessage(const routing::PathLatencyMessage&, AbstractRouter* r)
{
const auto now = r->Now();
MarkActive(now);
if (msg.L == m_LastLatencyTestID)
if (m_LastLatencyTestID)
{
intro.latency = now - m_LastLatencyTestTime;
m_LatencySamples.emplace_back(now - m_LastLatencyTestTime);
while (m_LatencySamples.size() > MaxLatencySamples)
m_LatencySamples.pop_front();
intro.latency = computeLatency(m_LatencySamples);
m_LastLatencyTestID = 0;
EnterState(ePathEstablished, now);
if (m_BuiltHook)
m_BuiltHook(shared_from_this());
m_BuiltHook = nullptr;
return true;
}
LogWarn("unwarranted path latency message via ", Upstream());
return false;
return true;
}
/// this is the Client's side of handling a DHT message. it's handled

@ -87,7 +87,7 @@ namespace llarp
HopList hops;
PathSet* const m_PathSet;
std::weak_ptr<PathSet> m_PathSet;
service::Introduction intro;
@ -95,7 +95,7 @@ namespace llarp
Path(
const std::vector<RouterContact>& routers,
PathSet* parent,
std::weak_ptr<PathSet> parent,
PathRole startingRoles,
std::string shortName);
@ -400,6 +400,9 @@ namespace llarp
HandleAllDownstream(std::vector<RelayDownstreamMessage> msgs, AbstractRouter* r) override;
private:
bool
SendLatencyMessage(AbstractRouter* r);
/// call obtained exit hooks
bool
InformExitResult(llarp_time_t b);
@ -424,7 +427,7 @@ namespace llarp
uint64_t m_RXRate = 0;
uint64_t m_LastTXRate = 0;
uint64_t m_TXRate = 0;
std::deque<llarp_time_t> m_LatencySamples;
const std::string m_shortName;
};
} // namespace path

@ -95,8 +95,9 @@ namespace llarp
typename Map_t,
typename Key_t,
typename CheckValue_t,
typename GetFunc_t>
HopHandler_ptr
typename GetFunc_t,
typename Return_ptr = HopHandler_ptr>
Return_ptr
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
Lock_t lock(map.first);
@ -172,6 +173,46 @@ namespace llarp
});
}
std::optional<std::weak_ptr<TransitHop>>
PathContext::TransitHopByInfo(const TransitHopInfo& info)
{
// this is ugly as sin
auto own = MapGet<
SyncTransitMap_t::Lock_t,
decltype(m_TransitPaths),
PathID_t,
std::function<bool(const std::shared_ptr<TransitHop>&)>,
std::function<TransitHop*(const std::shared_ptr<TransitHop>&)>,
TransitHop*>(
m_TransitPaths,
info.txID,
[info](const auto& hop) -> bool { return hop->info == info; },
[](const auto& hop) -> TransitHop* { return hop.get(); });
if (own)
return own->weak_from_this();
return std::nullopt;
}
std::optional<std::weak_ptr<TransitHop>>
PathContext::TransitHopByUpstream(const RouterID& upstream, const PathID_t& id)
{
// this is ugly as sin as well
auto own = MapGet<
SyncTransitMap_t::Lock_t,
decltype(m_TransitPaths),
PathID_t,
std::function<bool(const std::shared_ptr<TransitHop>&)>,
std::function<TransitHop*(const std::shared_ptr<TransitHop>&)>,
TransitHop*>(
m_TransitPaths,
id,
[upstream](const auto& hop) -> bool { return hop->info.upstream == upstream; },
[](const auto& hop) -> TransitHop* { return hop.get(); });
if (own)
return own->weak_from_this();
return std::nullopt;
}
HopHandler_ptr
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{
@ -225,7 +266,8 @@ namespace llarp
auto itr = map.second.find(id);
if (itr != map.second.end())
{
return itr->second->m_PathSet->GetSelf();
if (auto parent = itr->second->m_PathSet.lock())
return parent;
}
return nullptr;
}

@ -77,6 +77,12 @@ namespace llarp
HopHandler_ptr
GetByDownstream(const RouterID& id, const PathID_t& path);
std::optional<std::weak_ptr<TransitHop>>
TransitHopByInfo(const TransitHopInfo&);
std::optional<std::weak_ptr<TransitHop>>
TransitHopByUpstream(const RouterID&, const PathID_t&);
PathSet_ptr
GetLocalPathSet(const PathID_t& id);

@ -6,8 +6,10 @@
#include "path_context.hpp"
#include <llarp/profiling.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/router/i_rc_lookup_handler.hpp>
#include <llarp/util/buffer.hpp>
#include <llarp/tooling/path_event.hpp>
#include <llarp/link/link_manager.hpp>
#include <functional>
@ -192,6 +194,9 @@ namespace llarp
void Builder::Tick(llarp_time_t)
{
const auto now = llarp::time_now_ms();
m_router->pathBuildLimiter().Decay(now);
ExpirePaths(now, m_router);
if (ShouldBuildMore(now))
BuildOne();
@ -211,8 +216,8 @@ namespace llarp
{
util::StatusObject obj{
{"buildStats", m_BuildStats.ExtractStatus()},
{"numHops", uint64_t(numHops)},
{"numPaths", uint64_t(numDesiredPaths)}};
{"numHops", uint64_t{numHops}},
{"numPaths", uint64_t{numDesiredPaths}}};
std::transform(
m_Paths.begin(),
m_Paths.end(),
@ -240,6 +245,9 @@ namespace llarp
if (BuildCooldownHit(rc.pubkey))
return;
if (m_router->routerProfiling().IsBadForPath(rc.pubkey))
return;
found = rc;
}
},
@ -251,7 +259,7 @@ namespace llarp
Builder::GetHopsForBuild()
{
auto filter = [r = m_router](const auto& rc) -> bool {
return not r->routerProfiling().IsBadForPath(rc.pubkey);
return not r->routerProfiling().IsBadForPath(rc.pubkey, 1);
};
if (const auto maybe = m_router->nodedb()->GetRandom(filter))
{
@ -264,14 +272,12 @@ namespace llarp
Builder::Stop()
{
_run = false;
// tell all our paths that they have expired
// tell all our paths that they are to be ignored
const auto now = Now();
for (auto& item : m_Paths)
{
item.second->EnterState(ePathExpired, now);
item.second->EnterState(ePathIgnore, now);
}
// remove expired paths
ExpirePaths(now, m_router);
return true;
}
@ -284,7 +290,7 @@ namespace llarp
bool
Builder::ShouldRemove() const
{
return IsStopped();
return IsStopped() and NumInStatus(ePathEstablished) == 0;
}
const SecretKey&
@ -368,7 +374,7 @@ namespace llarp
hopsSet.insert(endpointRC);
hopsSet.insert(hops.begin(), hops.end());
if (r->routerProfiling().IsBadForPath(rc.pubkey))
if (r->routerProfiling().IsBadForPath(rc.pubkey, 1))
return false;
for (const auto& hop : hopsSet)
{
@ -430,7 +436,7 @@ namespace llarp
ctx->pathset = self;
std::string path_shortName = "[path " + m_router->ShortName() + "-";
path_shortName = path_shortName + std::to_string(m_router->NextPathBuildNumber()) + "]";
auto path = std::make_shared<path::Path>(hops, self.get(), roles, std::move(path_shortName));
auto path = std::make_shared<path::Path>(hops, GetWeak(), roles, std::move(path_shortName));
LogInfo(Name(), " build ", path->ShortName(), ": ", path->HopsString());
path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); });
@ -473,6 +479,31 @@ namespace llarp
m_router->routerProfiling().MarkPathTimeout(p.get());
PathSet::HandlePathBuildTimeout(p);
DoPathBuildBackoff();
for (const auto& hop : p->hops)
{
const RouterID router{hop.rc.pubkey};
// look up router and see if it's still on the network
m_router->loop()->call_soon([router, r = m_router]() {
LogInfo("looking up ", router, " because of path build timeout");
r->rcLookupHandler().GetRC(
router,
[r](const auto& router, const auto* rc, auto result) {
if (result == RCRequestResult::Success && rc != nullptr)
{
LogInfo("refreshed rc for ", router);
r->nodedb()->PutIfNewer(*rc);
}
else
{
// remove all connections to this router as it's probably not registered anymore
LogWarn("removing router ", router, " because of path build timeout");
r->linkManager().DeregisterPeer(router);
r->nodedb()->Remove(router);
}
},
true);
});
}
}
void

@ -57,7 +57,7 @@ namespace llarp
DoPathBuildBackoff();
public:
AbstractRouter* m_router;
AbstractRouter* const m_router;
SecretKey enckey;
size_t numHops;
llarp_time_t lastBuild = 0s;

@ -96,9 +96,10 @@ namespace llarp
}
Path_ptr
PathSet::GetEstablishedPathClosestTo(RouterID id, PathRole roles) const
PathSet::GetEstablishedPathClosestTo(
RouterID id, std::unordered_set<RouterID> excluding, PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l{m_PathsMutex};
Path_ptr path = nullptr;
AlignedBuffer<32> dist;
AlignedBuffer<32> to = id;
@ -109,6 +110,8 @@ namespace llarp
continue;
if (!item.second->SupportsAnyRoles(roles))
continue;
if (excluding.count(item.second->Endpoint()))
continue;
AlignedBuffer<32> localDist = item.second->Endpoint() ^ to;
if (localDist < dist)
{
@ -280,44 +283,24 @@ namespace llarp
return itr->second;
}
bool
std::optional<std::set<service::Introduction>>
PathSet::GetCurrentIntroductionsWithFilter(
std::set<service::Introduction>& intros,
std::function<bool(const service::Introduction&)> filter) const
{
intros.clear();
size_t count = 0;
Lock_t l(m_PathsMutex);
std::set<service::Introduction> intros;
Lock_t l{m_PathsMutex};
auto itr = m_Paths.begin();
while (itr != m_Paths.end())
{
if (itr->second->IsReady() && filter(itr->second->intro))
if (itr->second->IsReady() and filter(itr->second->intro))
{
intros.insert(itr->second->intro);
++count;
}
++itr;
}
return count > 0;
}
bool
PathSet::GetCurrentIntroductions(std::set<service::Introduction>& intros) const
{
intros.clear();
size_t count = 0;
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while (itr != m_Paths.end())
{
if (itr->second->IsReady())
{
intros.insert(itr->second->intro);
++count;
}
++itr;
}
return count > 0;
if (intros.empty())
return std::nullopt;
return intros;
}
void

@ -13,6 +13,7 @@
#include <list>
#include <map>
#include <tuple>
#include <unordered_set>
namespace std
{
@ -116,6 +117,10 @@ namespace llarp
virtual PathSet_ptr
GetSelf() = 0;
/// get a weak_ptr of ourself
virtual std::weak_ptr<PathSet>
GetWeak() = 0;
virtual void
BuildOne(PathRole roles = ePathRoleAny) = 0;
@ -235,7 +240,10 @@ namespace llarp
}
Path_ptr
GetEstablishedPathClosestTo(RouterID router, PathRole roles = ePathRoleAny) const;
GetEstablishedPathClosestTo(
RouterID router,
std::unordered_set<RouterID> excluding = {},
PathRole roles = ePathRoleAny) const;
Path_ptr
PickEstablishedPath(PathRole roles = ePathRoleAny) const;
@ -258,14 +266,10 @@ namespace llarp
Path_ptr
GetByEndpointWithID(RouterID router, PathID_t id) const;
bool
std::optional<std::set<service::Introduction>>
GetCurrentIntroductionsWithFilter(
std::set<service::Introduction>& intros,
std::function<bool(const service::Introduction&)> filter) const;
bool
GetCurrentIntroductions(std::set<service::Introduction>& intros) const;
virtual bool
PublishIntroSet(const service::EncryptedIntroSet&, AbstractRouter*)
{

@ -64,22 +64,9 @@ namespace llarp
// TODO: add to IHopHandler some notion of "path status"
const uint64_t ourStatus = LR_StatusRecord::SUCCESS;
if (!msg->AddFrame(pathKey, ourStatus))
{
return false;
}
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg);
if ((status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
LogWarn(
"TransitHop received non-successful LR_StatusMessage, queueing "
"self-destruct status=",
status);
QueueDestroySelf(r);
}
msg->AddFrame(pathKey, ourStatus);
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg, shared_from_this());
return true;
}

@ -185,6 +185,9 @@ namespace llarp
void
FlushDownstream(AbstractRouter* r) override;
void
QueueDestroySelf(AbstractRouter* r);
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
@ -202,9 +205,6 @@ namespace llarp
void
SetSelfDestruct();
void
QueueDestroySelf(AbstractRouter* r);
std::set<std::shared_ptr<TransitHop>, ComparePtr<std::shared_ptr<TransitHop>>> m_FlushOthers;
thread::Queue<RelayUpstreamMessage> m_UpstreamGather;
thread::Queue<RelayDownstreamMessage> m_DownstreamGather;

@ -64,7 +64,7 @@ namespace llarp
void
RouterProfile::Tick()
{
static constexpr auto updateInterval = 30min;
static constexpr auto updateInterval = 30s;
const auto now = llarp::time_now_ms();
if (lastDecay < now && now - lastDecay > updateInterval)
Decay();
@ -96,8 +96,9 @@ namespace llarp
bool
RouterProfile::IsGoodForPath(uint64_t chances) const
{
return checkIsGood(pathFailCount, pathSuccessCount, chances)
and checkIsGood(pathTimeoutCount, pathSuccessCount, chances);
if (pathTimeoutCount > chances)
return false;
return checkIsGood(pathFailCount, pathSuccessCount, chances);
}
Profiling::Profiling() : m_DisableProfiling(false)
@ -210,15 +211,10 @@ namespace llarp
Profiling::MarkPathTimeout(path::Path* p)
{
util::Lock lock{m_ProfilesMutex};
size_t idx = 0;
for (const auto& hop : p->hops)
{
if (idx)
{
m_Profiles[hop.rc.pubkey].pathTimeoutCount += 1;
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms();
}
++idx;
m_Profiles[hop.rc.pubkey].pathTimeoutCount += 1;
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms();
}
}

@ -528,6 +528,7 @@ namespace llarp::quic
if (not continue_connecting(
pport, (bool)maybe_remote, "endpoint ONS lookup", remote_addr))
return;
service_endpoint_.MarkAddressOutbound(*maybe_remote);
service_endpoint_.EnsurePathTo(*maybe_remote, after_path, open_timeout);
});
return result;
@ -539,7 +540,10 @@ namespace llarp::quic
if (auto maybe_convo = service_endpoint_.GetBestConvoTagFor(remote))
after_path(maybe_convo);
else
{
service_endpoint_.MarkAddressOutbound(remote);
service_endpoint_.EnsurePathTo(remote, after_path, open_timeout);
}
return result;
}

@ -12,6 +12,7 @@
#include <llarp/router_contact.hpp>
#include <llarp/tooling/router_event.hpp>
#include <llarp/peerstats/peer_db.hpp>
#include <llarp/consensus/reachability_testing.hpp>
#include <optional>
@ -283,14 +284,21 @@ namespace llarp
/// set router's service node whitelist
virtual void
SetRouterWhitelist(const std::vector<RouterID> routers) = 0;
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) = 0;
virtual std::unordered_set<RouterID>
GetRouterWhitelist() const = 0;
/// visit each connected link session
virtual void
ForEachPeer(std::function<void(const ILinkSession*, bool)> visit, bool randomize) const = 0;
virtual bool
ConnectionToRouterAllowed(const RouterID& router) const = 0;
SessionToRouterAllowed(const RouterID& router) const = 0;
virtual bool
PathToRouterAllowed(const RouterID& router) const = 0;
/// return true if we have an exit as a client
virtual bool

@ -33,13 +33,20 @@ namespace llarp
RemoveValidRouter(const RouterID& router) = 0;
virtual void
SetRouterWhitelist(const std::vector<RouterID>& routers) = 0;
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) = 0;
virtual void
GetRC(const RouterID& router, RCRequestCallback callback, bool forceLookup = false) = 0;
virtual bool
RemoteIsAllowed(const RouterID& remote) const = 0;
PathIsAllowed(const RouterID& remote) const = 0;
virtual bool
SessionIsAllowed(const RouterID& remote) const = 0;
virtual bool
IsGreylisted(const RouterID& remote) const = 0;
virtual bool
CheckRC(const RouterContact& rc) const = 0;

@ -26,12 +26,11 @@ namespace llarp
const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->RemoteIsAllowed(remote))
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->SessionIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
return true;
}
const uint16_t priority = msg.Priority();
std::array<byte_t, MAX_LINK_MSG_SIZE> linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);

@ -13,6 +13,8 @@
#include <llarp/crypto/crypto.hpp>
#include <utility>
#include <llarp/rpc/lokid_rpc_client.hpp>
namespace llarp
{
struct PendingSession
@ -35,18 +37,18 @@ namespace llarp
// TODO: do we want to keep it
const auto router = RouterID(session->GetPubKey());
const bool isOutbound = not session->IsInbound();
const std::string remoteType = session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
LogInfo("session with ", remoteType, " [", router, "] established");
LogInfo(
"session with ", remoteType, " [", router, "] ", isOutbound ? "established" : "received");
if (not _rcLookup->RemoteIsAllowed(router))
if (not _rcLookup->SessionIsAllowed(router))
{
FinalizeRequest(router, SessionResult::InvalidRouter);
return false;
}
auto func = std::bind(&OutboundSessionMaker::VerifyRC, this, session->GetRemoteRC());
work(func);
work([this, rc = session->GetRemoteRC()] { VerifyRC(rc); });
return true;
}
@ -54,13 +56,9 @@ namespace llarp
void
OutboundSessionMaker::OnConnectTimeout(ILinkSession* session)
{
// TODO: retry/num attempts
LogWarn(
"Session establish attempt to ",
RouterID(session->GetPubKey()),
" timed out.",
session->GetRemoteEndpoint());
FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
const auto router = RouterID(session->GetPubKey());
LogWarn("Session establish attempt to ", router, " timed out.", session->GetRemoteEndpoint());
FinalizeRequest(router, SessionResult::Timeout);
}
void
@ -76,6 +74,7 @@ namespace llarp
if (HavePendingSessionTo(router))
{
LogDebug("has pending session to", router);
return;
}
@ -133,7 +132,7 @@ namespace llarp
break;
exclude.insert(other.pubkey);
if (not _rcLookup->RemoteIsAllowed(other.pubkey))
if (not _rcLookup->SessionIsAllowed(other.pubkey))
{
continue;
}
@ -227,12 +226,20 @@ namespace llarp
{
_loop->call([this, router] { DoEstablish(router); });
}
else if (_linkManager->HasSessionTo(router))
{
FinalizeRequest(router, SessionResult::Establish);
}
else
{
FinalizeRequest(router, SessionResult::NoLink);
}
}
bool
OutboundSessionMaker::ShouldConnectTo(const RouterID& router) const
{
if (router == us or not _rcLookup->RemoteIsAllowed(router))
if (router == us or not _rcLookup->SessionIsAllowed(router))
return false;
size_t numPending = 0;
{
@ -240,8 +247,10 @@ namespace llarp
if (pendingSessions.find(router) == pendingSessions.end())
numPending += pendingSessions.size();
}
if (_linkManager->HasSessionTo(router))
if (_linkManager->HasOutboundSessionTo(router))
return false;
if (_router->IsServiceNode())
return true;
return _linkManager->NumberOfConnectedRouters() + numPending < maxConnectedRouters;
}
@ -263,6 +272,7 @@ namespace llarp
{
if (not HavePendingSessionTo(router))
{
LogError("no pending session to ", router);
return;
}
@ -276,6 +286,7 @@ namespace llarp
else
{
LogError("RCRequestResult::Success but null rc pointer given");
InvalidRouter(router);
}
break;
case RCRequestResult::InvalidRouter:
@ -285,6 +296,7 @@ namespace llarp
RouterNotFound(router);
break;
default:
RouterNotFound(router);
break;
}
}

@ -33,17 +33,23 @@ namespace llarp
}
void
RCLookupHandler::SetRouterWhitelist(const std::vector<RouterID>& routers)
RCLookupHandler::SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist)
{
if (routers.empty())
if (whitelist.empty())
return;
util::Lock l(_mutex);
whitelistRouters.clear();
for (auto& router : routers)
greylistRouters.clear();
for (auto& router : whitelist)
{
whitelistRouters.emplace(router);
}
for (auto& router : greylist)
{
greylistRouters.emplace(router);
}
LogInfo("lokinet service node list now has ", whitelistRouters.size(), " routers");
}
@ -119,7 +125,24 @@ namespace llarp
}
bool
RCLookupHandler::RemoteIsAllowed(const RouterID& remote) const
RCLookupHandler::IsGreylisted(const RouterID& remote) const
{
if (_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
{
return false;
}
if (not useWhitelist)
return false;
util::Lock lock{_mutex};
return greylistRouters.count(remote);
}
bool
RCLookupHandler::PathIsAllowed(const RouterID& remote) const
{
if (_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
@ -135,10 +158,27 @@ namespace llarp
return whitelistRouters.count(remote);
}
bool
RCLookupHandler::SessionIsAllowed(const RouterID& remote) const
{
if (_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
{
return false;
}
if (not useWhitelist)
return true;
util::Lock lock{_mutex};
return whitelistRouters.count(remote) or greylistRouters.count(remote);
}
bool
RCLookupHandler::CheckRC(const RouterContact& rc) const
{
if (not RemoteIsAllowed(rc.pubkey))
if (not SessionIsAllowed(rc.pubkey))
{
_dht->impl->DelRCNodeAsync(dht::Key_t{rc.pubkey});
return false;
@ -189,7 +229,7 @@ namespace llarp
if (newrc.pubkey != oldrc.pubkey)
return false;
if (!RemoteIsAllowed(newrc.pubkey))
if (!SessionIsAllowed(newrc.pubkey))
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
@ -332,7 +372,7 @@ namespace llarp
return;
}
if (not RemoteIsAllowed(remote))
if (not SessionIsAllowed(remote))
{
FinalizeRequest(remote, &results[0], RCRequestResult::InvalidRouter);
return;

@ -41,7 +41,9 @@ namespace llarp
RemoveValidRouter(const RouterID& router) override EXCLUDES(_mutex);
void
SetRouterWhitelist(const std::vector<RouterID>& routers) override EXCLUDES(_mutex);
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) override
EXCLUDES(_mutex);
bool
HaveReceivedWhitelist() const override;
@ -51,7 +53,13 @@ namespace llarp
EXCLUDES(_mutex);
bool
RemoteIsAllowed(const RouterID& remote) const override EXCLUDES(_mutex);
PathIsAllowed(const RouterID& remote) const override EXCLUDES(_mutex);
bool
SessionIsAllowed(const RouterID& remote) const override EXCLUDES(_mutex);
bool
IsGreylisted(const RouterID& remote) const override EXCLUDES(_mutex);
bool
CheckRC(const RouterContact& rc) const override;
@ -84,6 +92,13 @@ namespace llarp
bool useWhitelist_arg,
bool isServiceNode_arg);
std::unordered_set<RouterID>
Whitelist() const
{
util::Lock lock{_mutex};
return whitelistRouters;
}
private:
void
HandleDHTLookupResult(RouterID remote, const std::vector<RouterContact>& results);
@ -120,6 +135,7 @@ namespace llarp
bool isServiceNode = false;
std::unordered_set<RouterID> whitelistRouters GUARDED_BY(_mutex);
std::unordered_set<RouterID> greylistRouters GUARDED_BY(_mutex);
using TimePoint = std::chrono::steady_clock::time_point;
std::unordered_map<RouterID, TimePoint> _routerLookupTimes;

@ -42,7 +42,7 @@
#include <oxenmq/oxenmq.h>
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 100ms;
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 250ms;
namespace llarp
{
@ -63,7 +63,6 @@ namespace llarp
#else
, _randomStartDelay(std::chrono::seconds((llarp::randint() % 30) + 10))
#endif
, m_lokidRpcClient(std::make_shared<rpc::LokidRpcClient>(m_lmq, this))
{
m_keyManager = std::make_shared<KeyManager>();
// for lokid, so we don't close the connection when syncing the whitelist
@ -290,7 +289,10 @@ namespace llarp
auto& conf = *m_Config;
whitelistRouters = conf.lokid.whitelistRouters;
if (whitelistRouters)
{
lokidRPCAddr = oxenmq::address(conf.lokid.lokidRPCAddr);
m_lokidRpcClient = std::make_shared<rpc::LokidRpcClient>(m_lmq, weak_from_this());
}
enableRPCServer = conf.api.m_enableRPCServer;
if (enableRPCServer)
@ -375,21 +377,27 @@ namespace llarp
}
bool
Router::LooksDeregistered() const
Router::LooksDecommissioned() const
{
return IsServiceNode() and whitelistRouters and _rcLookupHandler.HaveReceivedWhitelist()
and not _rcLookupHandler.RemoteIsAllowed(pubkey());
and _rcLookupHandler.IsGreylisted(pubkey());
}
bool
Router::SessionToRouterAllowed(const RouterID& router) const
{
return _rcLookupHandler.SessionIsAllowed(router);
}
bool
Router::ConnectionToRouterAllowed(const RouterID& router) const
Router::PathToRouterAllowed(const RouterID& router) const
{
if (LooksDeregistered())
if (LooksDecommissioned())
{
// we are deregistered don't allow any connections outbound at all
// we are decom'd don't allow any paths outbound at all
return false;
}
return _rcLookupHandler.RemoteIsAllowed(router);
return _rcLookupHandler.PathIsAllowed(router);
}
size_t
@ -748,7 +756,7 @@ namespace llarp
ss << " snode | known/svc/clients: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | "
<< pathContext().CurrentTransitPaths() << " active paths | "
<< "block " << m_lokidRpcClient->BlockHeight();
<< "block " << (m_lokidRpcClient ? m_lokidRpcClient->BlockHeight() : 0);
}
else
{
@ -780,7 +788,7 @@ namespace llarp
const bool gotWhitelist = _rcLookupHandler.HaveReceivedWhitelist();
const bool isSvcNode = IsServiceNode();
const bool looksDeregistered = LooksDeregistered();
const bool decom = LooksDecommissioned();
if (_rc.ExpiresSoon(now, std::chrono::milliseconds(randint() % 10000))
|| (now - _rc.last_updated) > rcRegenInterval)
@ -789,8 +797,10 @@ namespace llarp
if (!UpdateOurRC(false))
LogError("Failed to update our RC");
}
else if (not looksDeregistered)
else if (whitelistRouters and gotWhitelist and _rcLookupHandler.SessionIsAllowed(pubkey()))
{
// if we have the whitelist enabled, we have fetched the list and we are in either
// the white or grey list, we want to gossip our RC
GossipRCIfNeeded(_rc);
}
// remove RCs for nodes that are no longer allowed by network policy
@ -815,7 +825,7 @@ namespace llarp
// the whitelist enabled and we got the whitelist
// check against the whitelist and remove if it's not
// in the whitelist OR if there is no whitelist don't remove
return not _rcLookupHandler.RemoteIsAllowed(rc.pubkey);
return not _rcLookupHandler.SessionIsAllowed(rc.pubkey);
});
// find all deregistered relays
@ -827,7 +837,7 @@ namespace llarp
if (not session)
return;
const auto pk = session->GetPubKey();
if (session->IsRelay() and not _rcLookupHandler.RemoteIsAllowed(pk))
if (session->IsRelay() and not _rcLookupHandler.SessionIsAllowed(pk))
{
closePeers.emplace(pk);
}
@ -860,7 +870,7 @@ namespace llarp
const int interval = isSvcNode ? 5 : 2;
const auto timepoint_now = Clock_t::now();
if (timepoint_now >= m_NextExploreAt and not looksDeregistered)
if (timepoint_now >= m_NextExploreAt and not decom)
{
_rcLookupHandler.ExploreNetwork();
m_NextExploreAt = timepoint_now + std::chrono::seconds(interval);
@ -872,15 +882,15 @@ namespace llarp
connectToNum = strictConnect;
}
if (looksDeregistered)
if (decom)
{
// kill all sessions that are open because we think we are deregistered
_linkManager.ForEachPeer([](auto* peer) {
if (peer)
peer->Close();
});
// complain about being deregistered
LogError("We are running as a service node but we seem to be decommissioned");
if (now >= m_NextDecommissionWarn)
{
constexpr auto DecommissionWarnInterval = 30s;
LogError("We are running as a service node but we seem to be decommissioned");
m_NextDecommissionWarn = now + DecommissionWarnInterval;
}
}
else if (connected < connectToNum)
{
@ -1032,9 +1042,10 @@ namespace llarp
}
void
Router::SetRouterWhitelist(const std::vector<RouterID> routers)
Router::SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist)
{
_rcLookupHandler.SetRouterWhitelist(routers);
_rcLookupHandler.SetRouterWhitelist(whitelist, greylist);
}
bool
@ -1181,6 +1192,75 @@ namespace llarp
#if defined(WITH_SYSTEMD)
::sd_notify(0, "READY=1");
#endif
if (whitelistRouters)
{
// do service node testing if we are in service node whitelist mode
_loop->call_every(consensus::REACHABILITY_TESTING_TIMER_INTERVAL, weak_from_this(), [this] {
// dont run tests if we are not running or we are stopping
if (not _running)
return;
// dont run tests if we are decommissioned
if (LooksDecommissioned())
return;
auto tests = m_routerTesting.get_failing();
if (auto maybe = m_routerTesting.next_random(this))
{
tests.emplace_back(*maybe, 0);
}
for (const auto& [router, fails] : tests)
{
if (not SessionToRouterAllowed(router))
{
LogDebug(
router,
" is no longer a registered service node so we remove it from the testing list");
m_routerTesting.remove_node_from_failing(router);
continue;
}
LogDebug("Establishing session to ", router, " for SN testing");
// try to make a session to this random router
// this will do a dht lookup if needed
_outboundSessionMaker.CreateSessionTo(
router, [previous_fails = fails, this](const auto& router, const auto result) {
auto rpc = RpcClient();
if (result != SessionResult::Establish)
{
// failed connection mark it as so
m_routerTesting.add_failing_node(router, previous_fails);
LogInfo(
"FAILED SN connection test to ",
router,
" (",
previous_fails + 1,
" consecutive failures)");
}
else
{
m_routerTesting.remove_node_from_failing(router);
if (previous_fails > 0)
{
LogInfo(
"Successful SN connection test to ",
router,
" after ",
previous_fails,
" failures");
}
else
{
LogDebug("Successful SN connection test to ", router);
}
}
if (rpc)
{
// inform as needed
rpc->InformConnection(router, result == SessionResult::Establish);
}
});
}
});
}
LogContext::Instance().DropToRuntimeLevel();
return _running;
}
@ -1314,7 +1394,7 @@ namespace llarp
return false;
}
if (!_rcLookupHandler.RemoteIsAllowed(rc.pubkey))
if (not _rcLookupHandler.SessionIsAllowed(rc.pubkey))
{
return false;
}
@ -1327,8 +1407,10 @@ namespace llarp
void
Router::QueueWork(std::function<void(void)> func)
{
_loop->call_soon(func);
// m_lmq->job(std::move(func));
if (m_isServiceNode)
_loop->call_soon(std::move(func));
else
m_lmq->job(std::move(func));
}
void

@ -131,7 +131,14 @@ namespace llarp
ModifyOurRC(std::function<std::optional<RouterContact>(RouterContact)> modify) override;
void
SetRouterWhitelist(const std::vector<RouterID> routers) override;
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) override;
std::unordered_set<RouterID>
GetRouterWhitelist() const override
{
return _rcLookupHandler.Whitelist();
}
exit::Context&
exitContext() override
@ -181,9 +188,9 @@ namespace llarp
void
QueueDiskIO(std::function<void(void)> func) override;
/// return true if we look like we are a deregistered service node
/// return true if we look like we are a decommissioned service node
bool
LooksDeregistered() const;
LooksDecommissioned() const;
std::optional<SockAddr> _ourAddress;
@ -392,7 +399,9 @@ namespace llarp
EnsureEncryptionKey();
bool
ConnectionToRouterAllowed(const RouterID& router) const override;
SessionToRouterAllowed(const RouterID& router) const override;
bool
PathToRouterAllowed(const RouterID& router) const override;
void
HandleSaveRC() const;
@ -530,12 +539,14 @@ namespace llarp
bool m_isServiceNode = false;
llarp_time_t m_LastStatsReport = 0s;
llarp_time_t m_NextDecommissionWarn = 0s;
std::shared_ptr<llarp::KeyManager> m_keyManager;
std::shared_ptr<PeerDb> m_peerDb;
uint32_t path_build_count = 0;
consensus::reachability_testing m_routerTesting;
bool
ShouldReportStats(llarp_time_t now) const;

@ -32,8 +32,8 @@ namespace llarp
}
}
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r)
: m_lokiMQ(std::move(lmq)), m_Router(r)
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, std::weak_ptr<AbstractRouter> r)
: m_lokiMQ{std::move(lmq)}, m_Router{std::move(r)}
{
// m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
@ -51,18 +51,24 @@ namespace llarp
void
LokidRpcClient::ConnectAsync(oxenmq::address url)
{
if (not m_Router->IsServiceNode())
if (auto router = m_Router.lock())
{
throw std::runtime_error("we cannot talk to lokid while not a service node");
if (not router->IsServiceNode())
{
throw std::runtime_error("we cannot talk to lokid while not a service node");
}
LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
if (auto router = self->m_Router.lock())
{
router->loop()->call([self, url]() { self->ConnectAsync(url); });
}
});
}
LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
self->m_Router->loop()->call([self, url]() { self->ConnectAsync(url); });
});
}
void
@ -89,25 +95,25 @@ namespace llarp
}
catch (std::exception& ex)
{
LogError("bad block hieght: ", ex.what());
LogError("bad block height: ", ex.what());
return; // bail
}
LogDebug("new block at hieght ", m_BlockHeight);
LogDebug("new block at height ", m_BlockHeight);
// don't upadate on block notification if an update is pending
if (not m_UpdatingList)
UpdateServiceNodeList(std::string{msg.data[1]});
UpdateServiceNodeList();
}
void
LokidRpcClient::UpdateServiceNodeList(std::string topblock)
LokidRpcClient::UpdateServiceNodeList()
{
nlohmann::json request, fields;
fields["pubkey_ed25519"] = true;
fields["service_node_pubkey"] = true;
fields["funded"] = true;
fields["active"] = true;
request["fields"] = fields;
request["active_only"] = true;
if (not topblock.empty())
request["poll_block_hash"] = topblock;
m_UpdatingList = true;
Request(
"rpc.get_service_nodes",
@ -161,7 +167,7 @@ namespace llarp
};
m_lokiMQ->add_timer(makePingRequest, PingInterval);
// initial fetch of service node list
UpdateServiceNodeList("");
UpdateServiceNodeList();
}
void
@ -179,8 +185,8 @@ namespace llarp
}
}
}
std::vector<RouterID> nodeList;
std::unordered_map<RouterID, PubKey> keymap;
std::vector<RouterID> activeNodeList, nonActiveNodeList;
{
const auto itr = j.find("service_node_states");
if (itr != j.end() and itr->is_array())
@ -190,22 +196,81 @@ namespace llarp
const auto ed_itr = j_itr->find("pubkey_ed25519");
if (ed_itr == j_itr->end() or not ed_itr->is_string())
continue;
const auto svc_itr = j_itr->find("service_node_pubkey");
if (svc_itr == j_itr->end() or not svc_itr->is_string())
continue;
const auto funded_itr = j_itr->find("funded");
if (funded_itr == j_itr->end() or not funded_itr->is_boolean())
continue;
const auto active_itr = j_itr->find("active");
if (active_itr == j_itr->end() or not active_itr->is_boolean())
continue;
const bool active = active_itr->get<bool>();
const bool funded = funded_itr->get<bool>();
if (not funded)
continue;
RouterID rid;
if (rid.FromHex(ed_itr->get<std::string>()))
nodeList.emplace_back(std::move(rid));
PubKey pk;
if (rid.FromHex(ed_itr->get<std::string>()) and pk.FromHex(svc_itr->get<std::string>()))
{
keymap[rid] = pk;
if (active)
activeNodeList.emplace_back(std::move(rid));
else
nonActiveNodeList.emplace_back(std::move(rid));
}
}
}
}
if (nodeList.empty())
if (activeNodeList.empty())
{
LogWarn("got empty service node list, ignoring.");
return;
}
// inform router about the new list
m_Router->loop()->call([r = m_Router, nodeList = std::move(nodeList)]() mutable {
r->SetRouterWhitelist(std::move(nodeList));
});
if (auto router = m_Router.lock())
{
auto& loop = router->loop();
loop->call([this,
active = std::move(activeNodeList),
inactive = std::move(nonActiveNodeList),
keymap = std::move(keymap),
router = std::move(router)]() mutable {
m_KeyMap = std::move(keymap);
router->SetRouterWhitelist(active, inactive);
});
}
else
LogWarn("Cannot update whitelist: router object has gone away");
}
void
LokidRpcClient::InformConnection(RouterID router, bool success)
{
if (auto r = m_Router.lock())
{
r->loop()->call([router, success, this]() {
if (auto itr = m_KeyMap.find(router); itr != m_KeyMap.end())
{
const nlohmann::json request = {
{"passed", success}, {"pubkey", itr->second.ToHex()}, {"type", "lokinet"}};
Request(
"admin.report_peer_status",
[self = shared_from_this()](bool success, std::vector<std::string>) {
if (not success)
{
LogError("Failed to report connection status to oxend");
return;
}
LogDebug("reported connection status to core");
},
request.dump());
}
});
}
}
SecretKey
@ -261,7 +326,7 @@ namespace llarp
const nlohmann::json req{{"type", 2}, {"name_hash", namehash.ToHex()}};
Request(
"rpc.lns_resolve",
[r = m_Router, resultHandler](bool success, std::vector<std::string> data) {
[this, resultHandler](bool success, std::vector<std::string> data) {
std::optional<service::EncryptedName> maybe = std::nullopt;
if (success)
{
@ -285,8 +350,11 @@ namespace llarp
LogError("failed to parse response from lns lookup: ", ex.what());
}
}
r->loop()->call(
[resultHandler, maybe = std::move(maybe)]() { resultHandler(std::move(maybe)); });
if (auto r = m_Router.lock())
{
r->loop()->call(
[resultHandler, maybe = std::move(maybe)]() { resultHandler(std::move(maybe)); });
}
},
req.dump());
}
@ -300,65 +368,66 @@ namespace llarp
LogInfo(" :", str);
}
assert(m_Router != nullptr);
if (not m_Router->peerDb())
if (auto router = m_Router.lock())
{
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
try
{
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
if (not router->peerDb())
{
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
std::vector<std::string> routerIdStrings;
oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
for (const auto& routerIdString : routerIdStrings)
try
{
RouterID id;
if (not id.FromString(routerIdString))
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
return;
}
routerIds.push_back(std::move(id));
}
std::vector<std::string> routerIdStrings;
oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
auto statsList = m_Router->peerDb()->listPeerStats(routerIds);
for (const auto& routerIdString : routerIdStrings)
{
RouterID id;
if (not id.FromString(routerIdString))
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
return;
}
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
routerIds.push_back(std::move(id));
}
PeerStats::BEncodeList(statsList, &llarpBuf);
auto statsList = router->peerDb()->listPeerStats(routerIds);
msg.send_reply(std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
PeerStats::BEncodeList(statsList, &llarpBuf);
msg.send_reply(
std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
}
}
}
} // namespace rpc
} // namespace llarp

@ -19,7 +19,7 @@ namespace llarp
/// The LokidRpcClient uses loki-mq to talk to make API requests to lokid.
struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient>
{
explicit LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r);
explicit LokidRpcClient(LMQ_ptr lmq, std::weak_ptr<AbstractRouter> r);
/// Connect to lokid async
void
@ -42,6 +42,10 @@ namespace llarp
dht::Key_t namehash,
std::function<void(std::optional<service::EncryptedName>)> resultHandler);
/// inform that if connected to a router successfully
void
InformConnection(RouterID router, bool success);
private:
/// called when we have connected to lokid via lokimq
void
@ -52,7 +56,7 @@ namespace llarp
Command(std::string_view cmd);
void
UpdateServiceNodeList(std::string topblock);
UpdateServiceNodeList();
template <typename HandlerFunc_t, typename Args_t>
void
@ -82,9 +86,11 @@ namespace llarp
std::optional<oxenmq::ConnectionID> m_Connection;
LMQ_ptr m_lokiMQ;
AbstractRouter* const m_Router;
std::weak_ptr<AbstractRouter> m_Router;
std::atomic<bool> m_UpdatingList;
std::unordered_map<RouterID, PubKey> m_KeyMap;
uint64_t m_BlockHeight;
};

@ -480,6 +480,7 @@ namespace llarp::rpc
onGoodResult("added null exit");
return;
}
ep->MarkAddressOutbound(addr);
ep->EnsurePathToService(
addr,
[onBadResult, onGoodResult, shouldSendAuth, addrStr = addr.ToString()](

@ -101,11 +101,16 @@ namespace llarp
{
const auto now = llarp::time_now_ms();
m_LastIntrosetRegenAttempt = now;
std::set<Introduction> introset;
if (!GetCurrentIntroductionsWithFilter(
introset, [now](const service::Introduction& intro) -> bool {
return not intro.ExpiresSoon(now, path::min_intro_lifetime);
std::set<Introduction, CompareIntroTimestamp> intros;
if (const auto maybe =
GetCurrentIntroductionsWithFilter([now](const service::Introduction& intro) -> bool {
return not intro.ExpiresSoon(
now, path::default_lifetime - path::min_intro_lifetime);
}))
{
intros.insert(maybe->begin(), maybe->end());
}
else
{
LogWarn(
"could not publish descriptors for endpoint ",
@ -146,9 +151,10 @@ namespace llarp
}
introSet().intros.clear();
for (auto& intro : introset)
for (auto& intro : intros)
{
introSet().intros.emplace_back(std::move(intro));
if (introSet().intros.size() < numDesiredPaths)
introSet().intros.emplace_back(std::move(intro));
}
if (introSet().intros.empty())
{
@ -710,8 +716,10 @@ namespace llarp
return false;
auto next_pub = m_state->m_LastPublishAttempt
+ (m_state->m_IntroSet.HasExpiredIntros(now) ? INTROSET_PUBLISH_RETRY_INTERVAL
: INTROSET_PUBLISH_INTERVAL);
+ (m_state->m_IntroSet.HasStaleIntros(
now, path::default_lifetime - path::intro_path_spread)
? IntrosetPublishRetryCooldown
: IntrosetPublishInterval);
return now >= next_pub and m_LastIntrosetRegenAttempt + 1s <= now;
}
@ -739,8 +747,11 @@ namespace llarp
{
std::unordered_set<RouterID> exclude;
ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); });
const auto maybe = m_router->nodedb()->GetRandom(
[exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; });
const auto maybe =
m_router->nodedb()->GetRandom([exclude, r = m_router](const auto& rc) -> bool {
return exclude.count(rc.pubkey) == 0
and not r->routerProfiling().IsBadForPath(rc.pubkey);
});
if (not maybe.has_value())
return std::nullopt;
return GetHopsForBuildWithEndpoint(maybe->pubkey);
@ -758,46 +769,27 @@ namespace llarp
path::Builder::PathBuildStarted(path);
}
constexpr auto MaxOutboundContextPerRemote = 4;
constexpr auto MaxOutboundContextPerRemote = 1;
void
Endpoint::PutNewOutboundContext(const service::IntroSet& introset, llarp_time_t left)
{
Address addr{introset.addressKeys.Addr()};
const Address addr{introset.addressKeys.Addr()};
auto& remoteSessions = m_state->m_RemoteSessions;
auto& serviceLookups = m_state->m_PendingServiceLookups;
if (remoteSessions.count(addr) >= MaxOutboundContextPerRemote)
if (remoteSessions.count(addr) < MaxOutboundContextPerRemote)
{
auto itr = remoteSessions.find(addr);
auto range = serviceLookups.equal_range(addr);
auto i = range.first;
while (i != range.second)
{
itr->second->SetReadyHook(
[callback = i->second, addr](auto session) { callback(addr, session); }, left);
++i;
}
serviceLookups.erase(addr);
return;
remoteSessions.emplace(addr, std::make_shared<OutboundContext>(introset, this));
LogInfo("Created New outbound context for ", addr.ToString());
}
auto session = std::make_shared<OutboundContext>(introset, this);
remoteSessions.emplace(addr, session);
LogInfo("Created New outbound context for ", addr.ToString());
// inform pending
auto range = serviceLookups.equal_range(addr);
auto itr = range.first;
if (itr != range.second)
auto sessionRange = remoteSessions.equal_range(addr);
for (auto itr = sessionRange.first; itr != sessionRange.second; ++itr)
{
session->SetReadyHook(
[callback = itr->second, addr](auto session) { callback(addr, session); }, left);
++itr;
itr->second->AddReadyHook(
[addr, this](auto session) { InformPathToService(addr, session); }, left);
}
serviceLookups.erase(addr);
}
void
@ -924,7 +916,7 @@ namespace llarp
paths.insert(path);
});
constexpr size_t min_unique_lns_endpoints = 3;
constexpr size_t min_unique_lns_endpoints = 2;
// not enough paths
if (paths.size() < min_unique_lns_endpoints)
@ -1013,7 +1005,15 @@ namespace llarp
msg.S = path->NextSeqNo();
if (path && path->SendRoutingMessage(msg, Router()))
{
RouterLookupJob job{this, handler};
RouterLookupJob job{this, [handler, router, nodedb = m_router->nodedb()](auto results) {
if (results.empty())
{
LogInfo("could not find ", router, ", remove it from nodedb");
nodedb->Remove(router);
}
if (handler)
handler(results);
}};
assert(msg.M.size() == 1);
auto dhtMsg = dynamic_cast<FindRouterMessage*>(msg.M[0].get());
@ -1066,11 +1066,11 @@ namespace llarp
void
Endpoint::QueueRecvData(RecvDataEvent ev)
{
if (m_RecvQueue.full() || m_RecvQueue.empty())
if (m_RecvQueue.full() or m_RecvQueue.empty())
{
m_router->loop()->call([this] { FlushRecvData(); });
m_router->loop()->call_soon([this] { FlushRecvData(); });
}
m_RecvQueue.pushBack(std::move(ev));
m_RecvQueue.tryPushBack(std::move(ev));
}
bool
@ -1078,14 +1078,15 @@ namespace llarp
path::Path_ptr path, const PathID_t from, std::shared_ptr<ProtocolMessage> msg)
{
msg->sender.UpdateAddr();
if (not HasOutboundConvo(msg->sender.Addr()))
PutSenderFor(msg->tag, msg->sender, true);
PutSenderFor(msg->tag, msg->sender, true);
PutReplyIntroFor(msg->tag, path->intro);
Introduction intro;
intro.pathID = from;
intro.router = PubKey{path->Endpoint()};
intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt);
intro.latency = path->intro.latency;
PutIntroFor(msg->tag, intro);
PutReplyIntroFor(msg->tag, path->intro);
ConvoTagRX(msg->tag);
return ProcessDataMessage(msg);
}
@ -1178,10 +1179,11 @@ namespace llarp
// not applicable because we are not an exit or don't have an endpoint auth policy
if ((not m_state->m_ExitEnabled) or m_AuthPolicy == nullptr)
return;
ProtocolFrame f;
ProtocolFrame f{};
f.R = AuthResultCodeAsInt(result.code);
f.T = tag;
f.F = path->intro.pathID;
f.N.Randomize();
if (result.code == AuthResultCode::eAuthAccepted)
{
ProtocolMessage msg;
@ -1189,10 +1191,7 @@ namespace llarp
std::vector<byte_t> reason{};
reason.resize(result.reason.size());
std::copy_n(result.reason.c_str(), reason.size(), reason.data());
msg.PutBuffer(reason);
f.N.Randomize();
f.C.Zero();
if (m_AuthPolicy)
msg.proto = ProtocolType::Auth;
else
@ -1234,6 +1233,23 @@ namespace llarp
Sessions().erase(t);
}
void
Endpoint::ResetConvoTag(ConvoTag tag, path::Path_ptr p, PathID_t from)
{
// send reset convo tag message
ProtocolFrame f{};
f.R = 1;
f.T = tag;
f.F = p->intro.pathID;
f.Sign(m_Identity);
{
LogWarn("invalidating convotag T=", tag);
RemoveConvoTag(tag);
m_SendQueue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, from), p});
}
}
bool
Endpoint::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame)
{
@ -1247,29 +1263,13 @@ namespace llarp
if (!frame.Verify(si))
return false;
// remove convotag it doesn't exist
LogWarn("remove convotag T=", frame.T, " R=", frame.R);
LogWarn("remove convotag T=", frame.T, " R=", frame.R, " from ", si.Addr());
RemoveConvoTag(frame.T);
return true;
}
if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this))
{
LogError("Failed to decrypt protocol frame");
if (not frame.C.IsZero())
{
// send reset convo tag message
ProtocolFrame f;
f.R = 1;
f.T = frame.T;
f.F = p->intro.pathID;
f.Sign(m_Identity);
{
LogWarn("invalidating convotag T=", frame.T);
RemoveConvoTag(frame.T);
m_SendQueue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, frame.F), p});
}
}
ResetConvoTag(frame.T, p, frame.F);
}
return true;
}
@ -1279,8 +1279,8 @@ namespace llarp
{
m_router->routerProfiling().MarkPathTimeout(p.get());
ManualRebuild(1);
RegenAndPublishIntroSet();
path::Builder::HandlePathDied(p);
RegenAndPublishIntroSet();
}
bool
@ -1294,22 +1294,65 @@ namespace llarp
const Address& addr,
std::optional<IntroSet> introset,
const RouterID& endpoint,
llarp_time_t timeLeft)
llarp_time_t timeLeft,
uint64_t relayOrder)
{
// tell all our existing remote sessions about this introset update
const auto now = Router()->Now();
auto& fails = m_state->m_ServiceLookupFails;
auto& lookups = m_state->m_PendingServiceLookups;
if (introset)
{
auto& sessions = m_state->m_RemoteSessions;
auto range = sessions.equal_range(addr);
auto itr = range.first;
while (itr != range.second)
{
itr->second->OnIntroSetUpdate(addr, introset, endpoint, timeLeft, relayOrder);
// we got a successful lookup
if (itr->second->ReadyToSend() and not introset->IsExpired(now))
{
// inform all lookups
auto lookup_range = lookups.equal_range(addr);
auto i = lookup_range.first;
while (i != lookup_range.second)
{
i->second(addr, itr->second.get());
++i;
}
lookups.erase(addr);
}
++itr;
}
}
auto& fails = m_state->m_ServiceLookupFails;
if (not introset or introset->IsExpired(now))
{
LogError(Name(), " failed to lookup ", addr.ToString(), " from ", endpoint);
LogError(
Name(),
" failed to lookup ",
addr.ToString(),
" from ",
endpoint,
" order=",
relayOrder);
fails[endpoint] = fails[endpoint] + 1;
// inform one
auto range = lookups.equal_range(addr);
auto itr = range.first;
if (itr != range.second)
const auto pendingForAddr = std::count_if(
m_state->m_PendingLookups.begin(),
m_state->m_PendingLookups.end(),
[addr](const auto& item) -> bool { return item.second->IsFor(addr); });
// inform all if we have no more pending lookups for this address
if (pendingForAddr == 0)
{
itr->second(addr, nullptr);
itr = lookups.erase(itr);
auto range = lookups.equal_range(addr);
auto itr = range.first;
while (itr != range.second)
{
itr->second(addr, nullptr);
itr = lookups.erase(itr);
}
}
return false;
}
@ -1323,9 +1366,10 @@ namespace llarp
}
void
Endpoint::MarkAddressOutbound(const Address& addr)
Endpoint::MarkAddressOutbound(AddressVariant_t addr)
{
m_state->m_OutboundSessions.insert(addr);
if (auto* ptr = std::get_if<Address>(&addr))
m_state->m_OutboundSessions.insert(*ptr);
}
bool
@ -1334,15 +1378,38 @@ namespace llarp
return m_state->m_OutboundSessions.count(addr) > 0;
}
void
Endpoint::InformPathToService(const Address remote, OutboundContext* ctx)
{
auto& serviceLookups = m_state->m_PendingServiceLookups;
auto range = serviceLookups.equal_range(remote);
auto itr = range.first;
while (itr != range.second)
{
itr->second(remote, ctx);
++itr;
}
serviceLookups.erase(remote);
}
bool
Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook, llarp_time_t timeout)
{
if (not WantsOutboundSession(remote))
{
// we don't want to ensure paths to addresses that are inbound
// inform fail right away in that case
hook(remote, nullptr);
return false;
}
/// how many routers to use for lookups
static constexpr size_t NumParallelLookups = 2;
/// how many requests per router
static constexpr size_t RequestsPerLookup = 2;
MarkAddressOutbound(remote);
// add response hook to list for address.
m_state->m_PendingServiceLookups.emplace(remote, hook);
auto& sessions = m_state->m_RemoteSessions;
{
@ -1352,20 +1419,17 @@ namespace llarp
{
if (itr->second->ReadyToSend())
{
hook(remote, itr->second.get());
InformPathToService(remote, itr->second.get());
return true;
}
++itr;
}
}
// add response hook to list for address.
m_state->m_PendingServiceLookups.emplace(remote, hook);
/// check replay filter
if (not m_IntrosetLookupFilter.Insert(remote))
return true;
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups);
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups, remote.ToKey());
using namespace std::placeholders;
const dht::Key_t location = remote.ToKey();
@ -1381,15 +1445,15 @@ namespace llarp
{
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
this,
[this](auto addr, auto result, auto from, auto left) {
return OnLookup(addr, result, from, left);
[this](auto addr, auto result, auto from, auto left, auto order) {
return OnLookup(addr, result, from, left, order);
},
location,
PubKey{remote.as_array()},
path->Endpoint(),
order,
GenTXID(),
timeout);
timeout + (2 * path->intro.latency));
LogInfo(
"doing lookup for ",
remote,
@ -1425,14 +1489,8 @@ namespace llarp
bool
Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h)
{
static constexpr size_t MaxConcurrentSNodeSessions = 16;
auto& nodeSessions = m_state->m_SNodeSessions;
if (nodeSessions.size() >= MaxConcurrentSNodeSessions)
{
// a quick client side work arround before we do proper limiting
LogError(Name(), " has too many snode sessions");
return false;
}
using namespace std::placeholders;
if (nodeSessions.count(snode) == 0)
{
@ -1615,6 +1673,14 @@ namespace llarp
if (session.inbound)
{
auto path = GetPathByRouter(session.replyIntro.router);
// if we have no path to the remote router that's fine still use it just in case this
// is the ONLY one we have
if (path == nullptr)
{
ret = tag;
continue;
}
if (path and path->IsReady())
{
const auto rttEstimate = (session.replyIntro.latency + path->intro.latency) * 2;
@ -1679,6 +1745,13 @@ namespace llarp
Loop()->call_soon([tag, hook]() { hook(tag); });
return true;
}
if (not WantsOutboundSession(*ptr))
{
// we don't want to connect back to inbound sessions
hook(std::nullopt);
return true;
}
return EnsurePathToService(
*ptr,
[hook](auto, auto* ctx) {
@ -1799,8 +1872,7 @@ namespace llarp
LogError("failed to encrypt and sign");
return;
}
self->m_SendQueue.pushBack(SendEvent_t{transfer, p});
;
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
});
return true;
}
@ -1811,7 +1883,7 @@ namespace llarp
}
else
{
LogWarn("Have inbound convo but get-best returned none; bug?");
LogWarn("Have inbound convo from ", remote, " but get-best returned none; bug?");
}
}
@ -1883,10 +1955,13 @@ namespace llarp
bool
Endpoint::ShouldBuildMore(llarp_time_t now) const
{
if (not path::Builder::ShouldBuildMore(now))
if (BuildCooldownHit(now))
return false;
const auto requiredPaths = std::max(numDesiredPaths, path::min_intro_paths);
if (NumInStatus(path::ePathBuilding) >= requiredPaths)
return false;
return ((now - lastBuild) > path::intro_path_spread)
|| NumInStatus(path::ePathEstablished) < path::min_intro_paths;
return NumPathsExistingAt(now + (path::default_lifetime - path::intro_path_spread))
< requiredPaths;
}
AbstractRouter*

@ -49,12 +49,13 @@ namespace llarp
struct OutboundContext;
/// minimum interval for publishing introsets
static constexpr auto INTROSET_PUBLISH_INTERVAL =
std::chrono::milliseconds(path::default_lifetime) / 4;
static constexpr auto IntrosetPublishInterval = path::intro_path_spread / 2;
static constexpr auto INTROSET_PUBLISH_RETRY_INTERVAL = 5s;
/// how agressively should we retry publishing introset on failure
static constexpr auto IntrosetPublishRetryCooldown = 1s;
static constexpr auto INTROSET_LOOKUP_RETRY_COOLDOWN = 3s;
/// how aggressively should we retry looking up introsets
static constexpr auto IntrosetLookupCooldown = 250ms;
struct Endpoint : public path::Builder,
public ILookupHolder,
@ -284,8 +285,8 @@ namespace llarp
bool
WantsOutboundSession(const Address&) const override;
void
MarkAddressOutbound(const Address&) override;
/// this MUST be called if you want to call EnsurePathTo on the given address
void MarkAddressOutbound(AddressVariant_t) override;
bool
ShouldBundleRC() const override
@ -330,6 +331,9 @@ namespace llarp
using SNodeEnsureHook = std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)>;
void
InformPathToService(const Address remote, OutboundContext* ctx);
/// ensure a path to a service node by public key
bool
EnsurePathToSNode(const RouterID remote, SNodeEnsureHook h);
@ -415,6 +419,9 @@ namespace llarp
uint64_t
GenTXID();
void
ResetConvoTag(ConvoTag tag, path::Path_ptr path, PathID_t from);
const std::set<RouterID>&
SnodeBlacklist() const;
@ -471,7 +478,8 @@ namespace llarp
const service::Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t timeLeft);
llarp_time_t timeLeft,
uint64_t relayOrder);
bool
DoNetworkIsolation(bool failed);

@ -37,6 +37,7 @@ namespace llarp
void
EndpointUtil::ExpirePendingTx(llarp_time_t now, PendingLookups& lookups)
{
std::vector<std::unique_ptr<IServiceLookup>> timedout;
for (auto itr = lookups.begin(); itr != lookups.end();)
{
if (!itr->second->IsTimedOut(now))
@ -44,11 +45,14 @@ namespace llarp
++itr;
continue;
}
std::unique_ptr<IServiceLookup> lookup = std::move(itr->second);
timedout.emplace_back(std::move(itr->second));
itr = lookups.erase(itr);
}
for (const auto& lookup : timedout)
{
LogWarn(lookup->name, " timed out txid=", lookup->txid);
lookup->HandleTimeout();
itr = lookups.erase(itr);
}
}
@ -95,7 +99,11 @@ namespace llarp
itr->second->Tick(now);
if (itr->second->Pump(now))
{
LogInfo("marking session as dead T=", itr->second->currentConvoTag);
LogInfo(
"marking session as dead T=",
itr->second->currentConvoTag,
" to ",
itr->second->Addr());
itr->second->Stop();
sessions.erase(itr->second->currentConvoTag);
deadSessions.emplace(std::move(*itr));
@ -106,6 +114,10 @@ namespace llarp
++itr;
}
}
for (auto& item : deadSessions)
{
item.second->Tick(now);
}
}
void
@ -116,7 +128,7 @@ namespace llarp
{
if (itr->second.IsExpired(now))
{
LogInfo("Expire session T=", itr->first);
LogInfo("Expire session T=", itr->first, " to ", itr->second.Addr());
itr = sessions.erase(itr);
}
else

@ -43,15 +43,31 @@ namespace llarp
template <typename Endpoint_t>
static path::Path::UniqueEndpointSet_t
GetManyPathsWithUniqueEndpoints(Endpoint_t* ep, size_t N, size_t tries = 10)
GetManyPathsWithUniqueEndpoints(
Endpoint_t* ep,
size_t N,
std::optional<dht::Key_t> maybeLocation = std::nullopt,
size_t tries = 10)
{
std::unordered_set<RouterID> exclude;
path::Path::UniqueEndpointSet_t paths;
do
{
--tries;
const auto path = ep->PickRandomEstablishedPath();
path::Path_ptr path;
if (maybeLocation)
{
path = ep->GetEstablishedPathClosestTo(RouterID{maybeLocation->as_array()}, exclude);
}
else
{
path = ep->PickRandomEstablishedPath();
}
if (path and path->IsReady())
{
paths.emplace(path);
exclude.insert(path->Endpoint());
}
} while (tries > 0 and paths.size() < N);
return paths;
}

@ -77,10 +77,6 @@ namespace llarp
/// do we want a session outbound to addr
virtual bool
WantsOutboundSession(const Address& addr) const = 0;
virtual void
MarkAddressOutbound(const Address& addr) = 0;
virtual void
QueueRecvData(RecvDataEvent ev) = 0;
};

@ -46,7 +46,7 @@ namespace llarp
found = *maybe;
}
}
return handle(remote, found, endpoint, TimeLeft(time_now_ms()));
return handle(remote, found, endpoint, TimeLeft(time_now_ms()), relayOrder);
}
std::shared_ptr<routing::IMessage>

@ -15,7 +15,7 @@ namespace llarp
uint64_t relayOrder;
const dht::Key_t location;
using HandlerFunc = std::function<bool(
const Address&, std::optional<IntroSet>, const RouterID&, llarp_time_t)>;
const Address&, std::optional<IntroSet>, const RouterID&, llarp_time_t, uint64_t)>;
HandlerFunc handle;
HiddenServiceAddressLookup(
@ -30,6 +30,16 @@ namespace llarp
~HiddenServiceAddressLookup() override = default;
virtual bool
IsFor(EndpointBase::AddressVariant_t addr) const override
{
if (const auto* ptr = std::get_if<Address>(&addr))
{
return Address{rootkey} == *ptr;
}
return false;
}
bool
HandleIntrosetResponse(const std::set<EncryptedIntroSet>& results) override;

@ -9,6 +9,7 @@ namespace llarp
{
util::StatusObject obj{
{"router", router.ToHex()},
{"path", pathID.ToHex()},
{"expiresAt", to_json(expiresAt)},
{"latency", to_json(latency)},
{"version", uint64_t(version)}};
@ -66,8 +67,9 @@ namespace llarp
std::ostream&
Introduction::print(std::ostream& stream, int level, int spaces) const
{
const RouterID r{router};
Printer printer(stream, level, spaces);
printer.printAttribute("k", RouterID(router));
printer.printAttribute("k", r.ToString());
printer.printAttribute("l", latency.count());
printer.printAttribute("p", pathID);
printer.printAttribute("v", version);

@ -77,6 +77,16 @@ namespace llarp
{
return i.print(out, -1, -1);
}
/// comparator for introset timestamp
struct CompareIntroTimestamp
{
bool
operator()(const Introduction& left, const Introduction& right) const
{
return left.expiresAt > right.expiresAt;
}
};
} // namespace service
} // namespace llarp

@ -351,6 +351,15 @@ namespace llarp::service
return false;
}
bool
IntroSet::HasStaleIntros(llarp_time_t now, llarp_time_t delta) const
{
for (const auto& intro : intros)
if (intro.ExpiresSoon(now, delta))
return true;
return false;
}
bool
IntroSet::IsExpired(llarp_time_t now) const
{

@ -69,6 +69,10 @@ namespace llarp
bool
HasExpiredIntros(llarp_time_t now) const;
/// return true if any of our intros expires soon given a delta
bool
HasStaleIntros(llarp_time_t now, llarp_time_t delta) const;
bool
IsExpired(llarp_time_t now) const;

@ -4,6 +4,8 @@
#include "intro_set.hpp"
#include <llarp/path/pathset.hpp>
#include <llarp/endpoint_base.hpp>
#include <set>
namespace llarp
@ -73,6 +75,12 @@ namespace llarp
const std::string name;
RouterID endpoint;
/// return true if this lookup is for a remote address
virtual bool IsFor(EndpointBase::AddressVariant_t) const
{
return false;
}
util::StatusObject
ExtractStatus() const
{

@ -48,6 +48,7 @@ namespace llarp
MarkCurrentIntroBad(Now());
ShiftIntroduction(false);
UpdateIntroSet();
SwapIntros();
}
return true;
}
@ -58,17 +59,22 @@ namespace llarp
: path::Builder{parent->Router(), OutboundContextNumPaths, parent->numHops}
, SendContext{introset.addressKeys, {}, this, parent}
, location{introset.addressKeys.Addr().ToKey()}
, addr{introset.addressKeys.Addr()}
, currentIntroSet{introset}
{
updatingIntroSet = false;
for (const auto& intro : introset.intros)
{
if (intro.expiresAt > m_NextIntro.expiresAt)
if (m_NextIntro.latency == 0s or m_NextIntro.latency > intro.latency)
m_NextIntro = intro;
}
currentConvoTag.Randomize();
lastShift = Now();
// add send and connect timeouts to the parent endpoints path alignment timeout
// this will make it so that there is less of a chance for timing races
sendTimeout += parent->PathAlignmentTimeout();
connectTimeout += parent->PathAlignmentTimeout();
}
OutboundContext::~OutboundContext() = default;
@ -82,12 +88,28 @@ namespace llarp
remoteIntro = m_NextIntro;
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.addressKeys, false);
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
ShiftIntroRouter(m_NextIntro.router);
// if we have not made a handshake to the remote endpoint do so
if (not IntroGenerated())
{
KeepAlive();
}
}
}
Address
OutboundContext::Addr() const
{
return addr;
}
bool
OutboundContext::OnIntroSetUpdate(
const Address&, std::optional<IntroSet> foundIntro, const RouterID& endpoint, llarp_time_t)
const Address&,
std::optional<IntroSet> foundIntro,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder)
{
if (markedBad)
return true;
@ -112,8 +134,9 @@ namespace llarp
return true;
}
currentIntroSet = *foundIntro;
ShiftIntroRouter(RouterID{});
}
else
else if (relayOrder > 0)
{
++m_LookupFails;
LogWarn(Name(), " failed to look up introset, fails=", m_LookupFails);
@ -128,7 +151,7 @@ namespace llarp
return false;
if (remoteIntro.router.IsZero())
return false;
return GetPathByRouter(remoteIntro.router) != nullptr;
return IntroSent();
}
void
@ -145,8 +168,8 @@ namespace llarp
}
if (selectedIntro.router.IsZero() || selectedIntro.ExpiresSoon(now))
return;
LogWarn(Name(), " shfiting intro off of ", r, " to ", RouterID(selectedIntro.router));
m_NextIntro = selectedIntro;
lastShift = now;
}
void
@ -177,7 +200,7 @@ namespace llarp
p->SetDataHandler(util::memFn(&OutboundContext::HandleHiddenServiceFrame, this));
p->SetDropHandler(util::memFn(&OutboundContext::HandleDataDrop, this));
// we now have a path to the next intro, swap intros
if (p->Endpoint() == m_NextIntro.router or p->Endpoint() == remoteIntro.router)
if (p->Endpoint() == m_NextIntro.router)
SwapIntros();
else
{
@ -188,29 +211,25 @@ namespace llarp
void
OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t)
{
if (sentIntro)
if (generatedIntro)
{
LogWarn(Name(), " dropping packet as we are not fully handshaked right now");
return;
}
if (remoteIntro.router.IsZero())
{
LogWarn(Name(), " dropping intro frame we have no intro ready yet");
return;
}
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
auto path = GetPathByRouter(remoteIntro.router);
if (path == nullptr)
{
// try parent as fallback
path = m_Endpoint->GetPathByRouter(remoteIntro.router);
if (path == nullptr)
{
if (!BuildCooldownHit(Now()))
BuildOneAlignedTo(remoteIntro.router);
LogWarn(Name(), " dropping intro frame, no path to ", remoteIntro.router);
return;
}
LogError(Name(), " has no path to ", remoteIntro.router, " when we should have had one");
return;
}
sentIntro = true;
auto frame = std::make_shared<ProtocolFrame>();
frame->Clear();
auto ex = std::make_shared<AsyncKeyExchange>(
m_Endpoint->Loop(),
remoteIdent,
@ -222,22 +241,29 @@ namespace llarp
t);
ex->hook = [self = shared_from_this(), path](auto frame) {
self->Send(std::move(frame), path);
if (not self->Send(std::move(frame), path))
return;
self->m_Endpoint->Loop()->call_later(
self->remoteIntro.latency, [self]() { self->sentIntro = true; });
};
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
frame->F = ex->msg.introReply.pathID;
frame->R = 0;
generatedIntro = true;
// ensure we have a sender put for this convo tag
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.addressKeys, false);
// encrypt frame async
m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
LogInfo("send intro frame");
LogInfo(Name(), " send intro frame T=", currentConvoTag);
}
std::string
OutboundContext::Name() const
{
return "OBContext:" + m_Endpoint->Name() + "-"
+ currentIntroSet.addressKeys.Addr().ToString();
return "OBContext:" + currentIntroSet.addressKeys.Addr().ToString();
}
void
@ -249,10 +275,9 @@ namespace llarp
return;
LogInfo(Name(), " updating introset");
m_LastIntrosetUpdateAt = now;
const auto addr = currentIntroSet.addressKeys.Addr();
// we want to use the parent endpoint's paths because outbound context
// does not implement path::PathSet::HandleGotIntroMessage
const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2);
const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2, location);
uint64_t relayOrder = 0;
for (const auto& path : paths)
{
@ -264,7 +289,7 @@ namespace llarp
path->Endpoint(),
relayOrder,
m_Endpoint->GenTXID(),
5s);
(IntrosetUpdateInterval / 2) + (2 * path->intro.latency));
relayOrder++;
if (job->SendRequestViaPath(path, m_Endpoint->Router()))
updatingIntroSet = true;
@ -280,13 +305,15 @@ namespace llarp
obj["remoteIntro"] = remoteIntro.ExtractStatus();
obj["sessionCreatedAt"] = to_json(createdAt);
obj["lastGoodSend"] = to_json(lastGoodSend);
obj["lastRecv"] = to_json(m_LastInboundTraffic);
obj["lastIntrosetUpdate"] = to_json(m_LastIntrosetUpdateAt);
obj["seqno"] = sequenceNo;
obj["markedBad"] = markedBad;
obj["lastShift"] = to_json(lastShift);
obj["remoteIdentity"] = remoteIdent.Addr().ToString();
obj["remoteIdentity"] = addr.ToString();
obj["currentRemoteIntroset"] = currentIntroSet.ExtractStatus();
obj["nextIntro"] = m_NextIntro.ExtractStatus();
obj["readyToSend"] = ReadyToSend();
std::transform(
m_BadIntros.begin(),
m_BadIntros.end(),
@ -306,36 +333,31 @@ namespace llarp
bool
OutboundContext::Pump(llarp_time_t now)
{
// we are probably dead af
if (m_LookupFails > 16 || m_BuildFails > 10)
return true;
constexpr auto InboundTrafficTimeout = 5s;
if (ReadyToSend() and remoteIntro.router.IsZero())
{
SwapIntros();
}
if (m_GotInboundTraffic and m_LastInboundTraffic + InboundTrafficTimeout <= now)
if ((remoteIntro.router.IsZero() or m_BadIntros.count(remoteIntro))
and GetPathByRouter(m_NextIntro.router))
SwapIntros();
if (m_GotInboundTraffic and m_LastInboundTraffic + sendTimeout <= now)
{
if (std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout)
{
// timeout on other side
MarkCurrentIntroBad(now);
}
// timeout on other side
UpdateIntroSet();
MarkCurrentIntroBad(now);
ShiftIntroRouter(remoteIntro.router);
}
// check for expiration
if (remoteIntro.ExpiresSoon(now))
// check for stale intros
// update the introset if we think we need to
if (currentIntroSet.HasStaleIntros(now, path::intro_path_spread))
{
UpdateIntroSet();
// shift intro if it expires "soon"
if (ShiftIntroduction())
SwapIntros(); // swap intros if we shifted
}
// lookup router in intro if set and unknown
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
if (not m_NextIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
// expire bad intros
auto itr = m_BadIntros.begin();
while (itr != m_BadIntros.end())
@ -346,7 +368,7 @@ namespace llarp
++itr;
}
if (ReadyToSend() and m_ReadyHook)
if (ReadyToSend() and not m_ReadyHooks.empty())
{
const auto path = GetPathByRouter(remoteIntro.router);
if (not path)
@ -354,32 +376,40 @@ namespace llarp
LogWarn(Name(), " ready but no path to ", remoteIntro.router, " ???");
return true;
}
m_ReadyHook(this);
m_ReadyHook = nullptr;
for (const auto& hook : m_ReadyHooks)
hook(this);
m_ReadyHooks.clear();
}
if (lastGoodSend > 0s and now >= lastGoodSend + (sendTimeout / 2))
const auto timeout = std::max(lastGoodSend, m_LastInboundTraffic);
if (lastGoodSend > 0s and now >= timeout + (sendTimeout / 2))
{
// send a keep alive to keep this session alive
KeepAlive();
}
// if we are dead return true so we are removed
return lastGoodSend > 0s ? (now >= lastGoodSend && now - lastGoodSend > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
return timeout > 0s ? (now >= timeout && now - timeout > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
}
void
OutboundContext::SetReadyHook(std::function<void(OutboundContext*)> hook, llarp_time_t timeout)
OutboundContext::AddReadyHook(std::function<void(OutboundContext*)> hook, llarp_time_t timeout)
{
if (m_ReadyHook)
if (ReadyToSend())
{
hook(this);
return;
m_ReadyHook = hook;
m_router->loop()->call_later(timeout, [this]() {
if (m_ReadyHook)
m_ReadyHook(nullptr);
m_ReadyHook = nullptr;
});
}
if (m_ReadyHooks.empty())
{
m_router->loop()->call_later(timeout, [this]() {
LogWarn(Name(), " did not obtain session in time");
for (const auto& hook : m_ReadyHooks)
hook(nullptr);
m_ReadyHooks.clear();
});
}
m_ReadyHooks.push_back(hook);
}
std::optional<std::vector<RouterContact>>
@ -397,16 +427,19 @@ namespace llarp
bool
OutboundContext::ShouldBuildMore(llarp_time_t now) const
{
if (markedBad || not path::Builder::ShouldBuildMore(now))
if (markedBad or path::Builder::BuildCooldownHit(now))
return false;
if (NumInStatus(path::ePathBuilding) >= numDesiredPaths)
return false;
llarp_time_t t = 0s;
ForEachPath([&t](path::Path_ptr path) {
if (path->IsReady())
t = std::max(path->ExpireTime(), t);
size_t numValidPaths = 0;
ForEachPath([now, &numValidPaths](path::Path_ptr path) {
if (not path->IsReady())
return;
if (not path->intro.ExpiresSoon(now, path::default_lifetime - path::intro_path_spread))
numValidPaths++;
});
return t >= now + path::default_lifetime / 4;
return numValidPaths < numDesiredPaths;
}
void
@ -422,12 +455,24 @@ namespace llarp
m_BadIntros[intro] = now;
}
bool
OutboundContext::IntroSent() const
{
return sentIntro;
}
bool
OutboundContext::IntroGenerated() const
{
return generatedIntro;
}
bool
OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
if (now - lastShift < MIN_SHIFT_INTERVAL)
const auto now = Now();
if (abs(now - lastShift) < shiftTimeout)
return false;
bool shifted = false;
std::vector<Introduction> intros = currentIntroSet.intros;
@ -488,7 +533,7 @@ namespace llarp
{
// unconditionally update introset
UpdateIntroSet();
const RouterID endpoint(path->Endpoint());
const RouterID endpoint{path->Endpoint()};
// if a path to our current intro died...
if (endpoint == remoteIntro.router)
{
@ -498,50 +543,13 @@ namespace llarp
if (p->Endpoint() == endpoint && p->IsReady())
++num;
});
// if we have more than two then we are probably fine
if (num > 2)
return;
// if we have one working one ...
if (num == 1)
if (num == 0)
{
num = 0;
ForEachPath([&](const path::Path_ptr& p) {
if (p->Endpoint() == endpoint)
++num;
});
// if we have 2 or more established or pending don't do anything
if (num > 2)
return;
BuildOneAlignedTo(endpoint);
}
else if (num == 0)
{
// we have no paths to this router right now
// hop off it
Introduction picked;
// get the latest intro that isn't on that endpoint
for (const auto& intro : currentIntroSet.intros)
{
if (intro.router == endpoint)
continue;
if (intro.expiresAt > picked.expiresAt)
picked = intro;
}
// we got nothing
if (picked.router.IsZero())
{
return;
}
m_NextIntro = picked;
// check if we have a path to this router
num = 0;
ForEachPath([&](const path::Path_ptr& p) {
// don't count timed out paths
if (p->Status() != path::ePathTimeout && p->Endpoint() == m_NextIntro.router)
++num;
});
// build a path if one isn't already pending build or established
BuildOneAlignedTo(m_NextIntro.router);
// we have no more paths to this endpoint so we want to pivot off of it
MarkCurrentIntroBad(Now());
ShiftIntroRouter(endpoint);
if (m_NextIntro.router != endpoint)
BuildOneAlignedTo(m_NextIntro.router);
}
}
}

@ -38,6 +38,15 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
Address
Addr() const;
bool
Stop() override;
@ -57,7 +66,7 @@ namespace llarp
/// shift the intro off the current router it is using
void
ShiftIntroRouter(const RouterID remote);
ShiftIntroRouter(const RouterID remote) override;
/// mark the current remote intro as bad
void
@ -71,7 +80,7 @@ namespace llarp
ReadyToSend() const;
void
SetReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
AddReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
/// for exits
void
@ -129,19 +138,26 @@ namespace llarp
llarp_time_t
RTT() const;
bool
OnIntroSetUpdate(
const Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder);
private:
/// swap remoteIntro with next intro
void
SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool
OnIntroSetUpdate(
const Address& addr, std::optional<IntroSet> i, const RouterID& endpoint, llarp_time_t);
IntroGenerated() const override;
bool
IntroSent() const override;
const dht::Key_t location;
const Address addr;
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
@ -151,8 +167,9 @@ namespace llarp
uint16_t m_BuildFails = 0;
llarp_time_t m_LastInboundTraffic = 0s;
bool m_GotInboundTraffic = false;
bool generatedIntro = false;
bool sentIntro = false;
std::function<void(OutboundContext*)> m_ReadyHook;
std::vector<std::function<void(OutboundContext*)>> m_ReadyHooks;
llarp_time_t m_LastIntrosetUpdateAt = 0s;
};
} // namespace service

@ -458,15 +458,21 @@ namespace llarp
}
};
handler->Router()->QueueWork(
[v, msg = std::move(msg), recvPath = std::move(recvPath), callback]() {
[v, msg = std::move(msg), recvPath = std::move(recvPath), callback, handler]() {
auto resetTag = [handler, tag = v->frame.T, from = v->frame.F, path = recvPath]() {
handler->ResetConvoTag(tag, path, from);
};
if (not v->frame.Verify(v->si))
{
LogError("Signature failure from ", v->si.Addr());
handler->Loop()->call_soon(resetTag);
return;
}
if (not v->frame.DecryptPayloadInto(v->shared, *msg))
{
LogError("failed to decrypt message");
LogError("failed to decrypt message from ", v->si.Addr());
handler->Loop()->call_soon(resetTag);
return;
}
callback(msg);

@ -96,7 +96,7 @@ namespace llarp
version = other.version;
}
ProtocolFrame() : routing::IMessage()
ProtocolFrame() : routing::IMessage{}
{
Clear();
}

@ -21,18 +21,19 @@ namespace llarp
, m_Endpoint(ep)
, createdAt(ep->Now())
, m_SendQueue(SendContextQueueSize)
{}
{
m_FlushWakeup = ep->Loop()->make_waker([this] { FlushUpstream(); });
}
bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
{
if (m_SendQueue.empty() or m_SendQueue.full())
{
m_Endpoint->Loop()->call([this] { FlushUpstream(); });
}
m_SendQueue.pushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path));
return true;
if (not path->IsReady())
return false;
m_FlushWakeup->Trigger();
return m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success;
}
void
@ -84,13 +85,15 @@ namespace llarp
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if (!path)
{
LogWarn(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);
ShiftIntroRouter(remoteIntro.router);
LogWarn(m_PathSet->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);
return;
}
if (!m_DataHandler->GetCachedSessionKeyFor(f->T, shared))
{
LogWarn(m_Endpoint->Name(), " has no cached session key on session T=", f->T);
LogWarn(
m_PathSet->Name(), " could not send, has no cached session key on session T=", f->T);
return;
}
@ -104,7 +107,7 @@ namespace llarp
}
else
{
LogWarn(m_Endpoint->Name(), " no session T=", f->T);
LogWarn(m_PathSet->Name(), " could not get sequence number for session T=", f->T);
return;
}
m->introReply = path->intro;
@ -115,7 +118,7 @@ namespace llarp
m_Endpoint->Router()->QueueWork([f, m, shared, path, this] {
if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity()))
{
LogError(m_Endpoint->Name(), " failed to sign message");
LogError(m_PathSet->Name(), " failed to sign message");
return;
}
Send(f, path);
@ -140,11 +143,21 @@ namespace llarp
void
SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data, ProtocolType protocol)
{
if (lastGoodSend != 0s)
if (IntroSent())
{
EncryptAndSendTo(data, protocol);
return;
}
// have we generated the initial intro but not sent it yet? bail here so we don't cause
// bullshittery
if (IntroGenerated() and not IntroSent())
{
LogWarn(
m_PathSet->Name(),
" we have generated an intial handshake but have not sent it yet so we drop a packet "
"to prevent bullshittery");
return;
}
const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr());
if (maybe.has_value())
{

@ -44,8 +44,9 @@ namespace llarp
uint64_t sequenceNo = 0;
llarp_time_t lastGoodSend = 0s;
const llarp_time_t createdAt;
llarp_time_t sendTimeout = 40s;
llarp_time_t connectTimeout = 60s;
llarp_time_t sendTimeout = path::build_timeout;
llarp_time_t connectTimeout = path::build_timeout;
llarp_time_t shiftTimeout = (path::build_timeout * 5) / 2;
llarp_time_t estimatedRTT = 0s;
bool markedBad = false;
using Msg_ptr = std::shared_ptr<routing::PathTransferMessage>;
@ -55,6 +56,8 @@ namespace llarp
std::function<void(AuthResult)> authResultListener;
std::shared_ptr<EventLoopWakeup> m_FlushWakeup;
virtual bool
ShiftIntroduction(bool rebuild = true)
{
@ -62,6 +65,9 @@ namespace llarp
return true;
}
virtual void
ShiftIntroRouter(const RouterID) = 0;
virtual void
UpdateIntroSet() = 0;
@ -72,6 +78,11 @@ namespace llarp
AsyncSendAuth(std::function<void(AuthResult)> replyHandler);
private:
virtual bool
IntroGenerated() const = 0;
virtual bool
IntroSent() const = 0;
void
EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);

@ -11,7 +11,7 @@ namespace llarp
{"lastSend", to_json(lastSend)},
{"lastRecv", to_json(lastRecv)},
{"replyIntro", replyIntro.ExtractStatus()},
{"remote", remote.Addr().ToString()},
{"remote", Addr().ToString()},
{"seqno", seqno},
{"tx", messagesSend},
{"rx", messagesRecv},
@ -19,6 +19,12 @@ namespace llarp
return obj;
}
Address
Session::Addr() const
{
return remote.Addr();
}
bool
Session::IsExpired(llarp_time_t now, llarp_time_t lifetime) const
{

@ -49,6 +49,9 @@ namespace llarp
bool
IsExpired(llarp_time_t now, llarp_time_t lifetime = SessionLifetime) const;
Address
Addr() const;
};
} // namespace service

@ -38,6 +38,13 @@ namespace llarp
return m_Values.try_emplace(v, now).second;
}
/// upsert will insert or update a value with time as now
void
Upsert(const Val_t& v)
{
m_Values[v] = llarp::time_now_ms();
}
/// decay hashset entries
void
Decay(Time_t now = 0s)

@ -54,6 +54,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
bool
SupportsV6() const override
{

@ -20,7 +20,7 @@ MakePath(std::vector< char > hops)
std::vector< RC_t > pathHops;
for(const auto& hop : hops)
pathHops.push_back(MakeHop(hop));
return std::make_shared< Path_t >(pathHops, nullptr, 0, "test");
return std::make_shared< Path_t >(pathHops, std::weak_ptr<llarp::path::PathSet>{}, 0, "test");
}
TEST_CASE("UniqueEndpointSet_t has unique endpoints", "[path]")

Loading…
Cancel
Save