Merge pull request #1658 from majestrate/network-stability-fixes-2021-06-02

Network stability fixes
pull/1664/head
Jeff 3 years ago committed by GitHub
commit 57186110f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,17 +4,66 @@ import curses
import json
import sys
import time
import platform
import os
from argparse import ArgumentParser as AP
import zmq
is_windows = lambda : platform.system().lower() == 'windows'
is_linux = lambda : platform.system().lower() == 'linux'
try:
import zmq
except ImportError:
print("zmq module not found")
print()
if is_linux():
print("for debian-based linux do:")
print("\tsudo apt install python3-zmq")
print("for other linuxs do:")
print("\tpip3 install --user zmq")
else:
print("install it with:")
print("\tpip3 install --user zmq")
sys.quit()
geo = None
try:
import GeoIP
geo = GeoIP.open("/usr/share/GeoIP/GeoIP.dat", GeoIP.GEOIP_STANDARD)
except Exception as ex:
print('no geoip: {}'.format(ex))
time.sleep(1)
except ImportError:
print("geoip module not found")
print()
if is_linux():
print("for debian-based linux do:")
print("\tsudo apt install python3-geoip")
print("for other linuxs do:")
print("\tpip3 install --user geoip")
print("for other linuxs you are responsible for obtaining your owen geoip databases, glhf")
else:
print("install it with:")
print("\tpip3 install --user geoip")
print("")
print("press enter to continue without geoip")
sys.stdin.read(1)
else:
try:
geoip_env_var = 'GEOIP_DB_FILE'
if is_windows():
geoip_default_db = '.\\GeoIP.dat'
else:
geoip_default_db = "/usr/share/GeoIP/GeoIP.dat"
geoip_db_file = geoip_env_var in os.environ and os.environ[geoip_env_var] or geoip_default_db
if not os.path.exists(geoip_db_file):
print("no geoip database found at {}".format(geoip_db_file))
print("you can override the path to it using the {} environmental variable".format(geoip_env_var))
sys.quit()
geo = GeoIP.open(geoip_db_file, GeoIP.GEOIP_STANDARD)
except Exception as ex:
print('failed to load geoip database: {}'.format(ex))
time.sleep(1)
now = lambda : int(time.time()) * 1000
def ip_to_flag(ip):
@ -38,7 +87,7 @@ class Monitor:
_sample_size = 12
def __init__(self, url):
def __init__(self, url, introsetMode=False):
self.txrate = 0
self.rxrate = 0
self.data = dict()
@ -52,6 +101,7 @@ class Monitor:
self._rpc_socket.connect(url)
self._speed_samples = [(0,0,0,0)] * self._sample_size
self._run = True
self._introsetMode = introsetMode
def rpc(self, method):
self._rpc_socket.send_multipart([method.encode(), b'lokinetmon'+method.encode()])
@ -104,8 +154,14 @@ class Monitor:
@staticmethod
def time_to(timestamp):
""" return time until timestamp in seconds formatted"""
now = time.time() * 1000
return "{} seconds".format(int((timestamp - now) / 1000))
if timestamp:
val = int((timestamp - now()) / 1000)
if val < 0:
return "{} seconds ago".format(0-val)
else:
return "{} seconds".format(val)
else:
return 'never'
@staticmethod
def speed_of(rate):
@ -343,6 +399,102 @@ class Monitor:
self.win.addstr("search for {}".format(transaction["tx"]["target"]))
return y_pos
def display_introsets(self, y_pos, service):
"""
display introsets on a service
"""
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr("localhost.loki")
y_pos = self._display_our_introset(y_pos, service)
y_pos += 1
remotes = service['remoteSessions'] or []
for session in remotes:
y_pos = self._display_session_introset(y_pos, session)
def _display_intro(self, y_pos, intro, label, paths):
y_pos += 1
self.win.move(y_pos, 1)
path = 'path' in intro and intro['path'][:4] or '????'
self.win.addstr('{}: ({}|{}) [expires in: {}] [{} paths]'.format(label, intro['router'][:8], path, self.time_to(intro['expiresAt']), self.count_endpoints_in_path(paths, intro['router'])))
return y_pos
@staticmethod
def count_endpoints_in_path(paths, endpoint):
num = 0
for path in paths:
if path['hops'][-1]['router'] == endpoint and path['ready']:
num += 1
return num
@staticmethod
def count_ready_paths(paths):
num = 0
for path in paths:
if path['ready']:
num += 1
return num
@staticmethod
def make_bar(timestamp, scale=1, char='#'):
if timestamp > 0:
return int((abs(timestamp - now()) / 1000) / scale) * char
return ''
def _display_our_introset(self, y_pos, context):
for intro in context['introset']['intros'] or []:
y_pos = self._display_intro(y_pos, intro, "introset intro", context['paths'])
for path in context['paths']:
y_pos = self._display_intro(y_pos, path['intro'], "inbound path intro", context['paths'])
return y_pos
def _display_session_introset(self, y_pos, context):
#print(context.keys())
y_pos += 1
self.win.move(y_pos, 1)
readyState = context['readyToSend'] and '✔️' or '❌'
self.win.addstr('{} ({}) [{}]'.format(context['remoteIdentity'], context['currentConvoTag'], readyState))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr('last good send: {}'.format(self.time_to(context['lastGoodSend'])))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr(self.make_bar(context['lastGoodSend']))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr('last recv: {}'.format(self.time_to(context['lastRecv'])))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr(self.make_bar(context['lastRecv']))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr('last introset update: {}'.format(self.time_to(context['lastIntrosetUpdate'])))
y_pos += 1
self.win.move(y_pos, 1)
self.win.addstr(self.make_bar(context['lastIntrosetUpdate'], 30))
y_pos += 2
self.win.move(y_pos, 1)
self.win.addstr('last shift: {}'.format(self.time_to(context['lastShift'])))
paths = context['paths'] or []
y_pos = self._display_intro(y_pos + 1, context['nextIntro'], 'next intro', paths) + 1
y_pos = self._display_intro(y_pos, context['remoteIntro'], 'current intro', paths) + 1
for intro in context['currentRemoteIntroset']['intros'] or []:
y_pos = self._display_intro(y_pos, intro, "introset intro", paths)
y_pos += 1
for intro in context['badIntros'] or []:
y_pos = self._display_intro(y_pos, intro, "bad intro", paths)
y_pos += 1
return y_pos
def display_data(self):
""" draw main window """
if self.data is not None:
@ -351,12 +503,16 @@ class Monitor:
services = self.data["services"] or {}
y_pos = 3
try:
y_pos = self.display_links(y_pos, self.data["links"])
for key in services:
y_pos = self.display_service(y_pos, key, services[key])
y_pos = self.display_dht(y_pos, self.data["dht"])
except:
pass
if not self._introsetMode:
y_pos = self.display_links(y_pos, self.data["links"])
for key in services:
y_pos = self.display_service(y_pos, key, services[key])
y_pos = self.display_dht(y_pos, self.data["dht"])
else:
for key in services:
y_pos = self.display_introsets(y_pos, services[key])
except Exception as ex:
print('{}'.format(ex))
else:
self.win.move(1, 1)
self.win.addstr("lokinet offline")
@ -387,8 +543,17 @@ class Monitor:
def main():
""" main function """
ap = AP()
ap.add_argument("--introset", action='store_const', const=True, default=False, help="run in introset inspection mode")
ap.add_argument("--url", default='tcp://127.0.0.1:1190', type=str, help='url to lokinet rpc')
args = ap.parse_args()
mon = Monitor(
len(sys.argv) > 1 and sys.argv[1] or "tcp://127.0.0.1:1190"
args.url,
args.introset
)
mon.run()

@ -31,7 +31,7 @@ def main():
ap.add_argument('--ip', type=str, default=None)
ap.add_argument('--ifname', type=str, default='lo')
ap.add_argument('--netid', type=str, default=None)
ap.add_argument('--loglevel', type=str, default='info')
ap.add_argument('--loglevel', type=str, default='debug')
args = ap.parse_args()
if args.valgrind:
@ -59,11 +59,15 @@ def main():
config['bind'] = {
args.ifname: str(args.baseport + nodeid)
}
config["logging"] = {
"level": args.loglevel
}
config['netdb'] = {
'dir': 'netdb'
}
config['network'] = {
'type' : 'null'
'type' : 'null',
'save-profiles': 'false'
}
config['api'] = {
'enabled': 'false'
@ -71,6 +75,9 @@ def main():
config['lokid'] = {
'enabled': 'false',
}
config["logging"] = {
"level": args.loglevel
}
d = os.path.join(args.dir, svcNodeName(nodeid))
if not os.path.exists(d):
os.mkdir(d)

@ -769,7 +769,7 @@ namespace llarp
}
else
{
info.interface = name;
info.interface = std::string{name};
std::vector<std::string_view> splits = split(value, ',');
for (std::string_view str : splits)

@ -5,6 +5,6 @@
#include <cstdlib>
constexpr size_t MAX_LINK_MSG_SIZE = 8192;
static constexpr auto DefaultLinkSessionLifetime = 1min;
constexpr size_t MaxSendQueueSize = 1024;
static constexpr auto DefaultLinkSessionLifetime = 5min;
constexpr size_t MaxSendQueueSize = 1024 * 16;
static constexpr auto LinkLayerConnectTimeout = 5s;

@ -20,14 +20,16 @@ namespace llarp
constexpr std::chrono::milliseconds default_lifetime = 20min;
/// minimum into lifetime we will advertise
constexpr std::chrono::milliseconds min_intro_lifetime = default_lifetime / 2;
/// number of slices of path lifetime to spread intros out via
constexpr auto intro_spread_slices = 4;
/// spacing frequency at which we try to build paths for introductions
constexpr std::chrono::milliseconds intro_path_spread = default_lifetime / 5;
constexpr std::chrono::milliseconds intro_path_spread = default_lifetime / intro_spread_slices;
/// Minimum paths to keep around for intros; mainly used at startup (the
/// spread, above, should be able to maintain more than this number of paths
/// normally once things are going).
constexpr std::size_t min_intro_paths = 4;
/// after this many ms a path build times out
constexpr auto build_timeout = 30s;
constexpr auto build_timeout = 10s;
/// measure latency every this interval ms
constexpr auto latency_interval = 20s;

@ -130,6 +130,9 @@ namespace llarp
std::string name,
std::string service,
std::function<void(std::vector<dns::SRVData>)> resultHandler) = 0;
virtual void
MarkAddressOutbound(AddressVariant_t remote) = 0;
};
} // namespace llarp

@ -51,6 +51,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
void
BlacklistSNode(const RouterID snode) override;

@ -41,6 +41,8 @@ namespace llarp
void
SRVRecordsChanged() override;
void MarkAddressOutbound(AddressVariant_t) override{};
bool
SendToOrQueue(
service::ConvoTag tag, const llarp_buffer_t& payload, service::ProtocolType t) override;

@ -61,6 +61,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
bool
SupportsV6() const override
{

@ -67,6 +67,8 @@ namespace llarp
: service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop())
{
m_PacketSendWaker = r->loop()->make_waker([this]() { FlushWrite(); });
m_MessageSendWaker = r->loop()->make_waker([this]() { FlushSend(); });
m_PacketRouter = std::make_unique<vpn::PacketRouter>(
[this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); });
#ifdef ANDROID
@ -224,6 +226,12 @@ namespace llarp
{
FlushSend();
Pump(Now());
FlushWrite();
}
void
TunEndpoint::FlushWrite()
{
// flush network to user
while (not m_NetworkToUserPktQueue.empty())
{
@ -281,6 +289,7 @@ namespace llarp
service::Address addr, auto msg, bool isV6) -> bool {
using service::Address;
using service::OutboundContext;
MarkAddressOutbound(addr);
return EnsurePathToService(
addr,
[this, addr, msg, reply, isV6](const Address&, OutboundContext* ctx) {
@ -307,7 +316,7 @@ namespace llarp
service::Address addr, auto msg) -> bool {
using service::Address;
using service::OutboundContext;
MarkAddressOutbound(addr);
return EnsurePathToService(
addr,
[msg, addr, reply](const Address&, OutboundContext* ctx) {
@ -697,6 +706,7 @@ namespace llarp
m_AddrToIP[addr] = ip;
m_SNodes[addr] = SNode;
MarkIPActiveForever(ip);
MarkAddressOutbound(addr);
return true;
}
@ -923,12 +933,13 @@ namespace llarp
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[addr, pkt, self = this](service::Address, service::OutboundContext* ctx) {
[pkt](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
ctx->sendTimeout = 5s;
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
return;
}
self->SendToOrQueue(addr, pkt.ConstBuffer(), service::ProtocolType::Exit);
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
return;
@ -957,12 +968,40 @@ namespace llarp
else
pkt.UpdateIPv6Address({0}, {0});
}
if (SendToOrQueue(to, pkt.Buffer(), type))
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (SendToOrQueue(to, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
return;
}
llarp::LogWarn(Name(), " did not flush packets");
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
}
else
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
}
},
PathAlignmentTimeout());
});
}
@ -1117,6 +1156,8 @@ namespace llarp
pkt.UpdateIPv6Address(src, dst);
}
m_NetworkToUserPktQueue.push(std::move(write));
// wake up packet flushing event so we ensure that all packets are written to user
m_PacketSendWaker->Trigger();
return true;
}
@ -1163,7 +1204,9 @@ namespace llarp
m_AddrToIP[ident] = nextIP;
m_IPToAddr[nextIP] = ident;
m_SNodes[ident] = snode;
llarp::LogInfo(Name(), " mapped ", ident, " to ", nextIP);
var::visit(
[&](auto&& remote) { llarp::LogInfo(Name(), " mapped ", remote, " to ", nextIP); },
addr);
MarkIPActive(nextIP);
return nextIP;
}
@ -1223,6 +1266,7 @@ namespace llarp
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Emplace(std::move(pkt));
m_MessageSendWaker->Trigger();
}
TunEndpoint::~TunEndpoint() = default;

@ -34,6 +34,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
void
Thaw() override;
@ -209,6 +215,10 @@ namespace llarp
virtual void
FlushSend();
/// flush writing ip packets to interface
void
FlushWrite();
/// maps ip to key (host byte order)
std::unordered_map<huint128_t, AlignedBuffer<32>> m_IPToAddr;
/// maps key to ip (host byte order)
@ -228,7 +238,7 @@ namespace llarp
std::function<void(dns::Message)> reply,
bool sendIPv6)
{
if (ctx or HasAddress(addr))
if (ctx)
{
huint128_t ip = ObtainIPForAddr(addr);
query->answers.clear();
@ -275,6 +285,11 @@ namespace llarp
std::set<IPRange> m_OwnedRanges;
/// how long to wait for path alignment
llarp_time_t m_PathAlignmentTimeout;
/// idempotent wakeup for writing packets to user
std::shared_ptr<EventLoopWakeup> m_PacketSendWaker;
/// idempotent wakeup for writing messages to network
std::shared_ptr<EventLoopWakeup> m_MessageSendWaker;
};
} // namespace handlers

@ -329,7 +329,8 @@ namespace llarp
{
if (m_State == State::Ready || m_State == State::LinkIntro)
{
return now > m_LastRX && now - m_LastRX > SessionAliveTimeout;
return now > m_LastRX
&& now - m_LastRX > (m_Inbound ? DefaultLinkSessionLifetime : SessionAliveTimeout);
}
return now - m_CreatedAt >= LinkLayerConnectTimeout;
}

@ -185,10 +185,16 @@ namespace llarp
return;
util::Lock l(_mutex);
auto& curr = m_PersistingSessions[remote];
if (until > curr)
curr = until;
LogDebug("persist session to ", remote, " until ", curr - time_now_ms());
m_PersistingSessions[remote] = std::max(until, m_PersistingSessions[remote]);
if (auto maybe = SessionIsClient(remote))
{
if (*maybe)
{
// mark this as a client so we don't try to back connect
m_Clients.Upsert(remote);
}
}
}
void
@ -337,43 +343,42 @@ namespace llarp
return;
std::vector<RouterID> sessionsNeeded;
std::vector<RouterID> sessionsClosed;
{
util::Lock l(_mutex);
auto itr = m_PersistingSessions.begin();
while (itr != m_PersistingSessions.end())
for (auto [remote, until] : m_PersistingSessions)
{
if (now < itr->second)
if (now < until)
{
auto link = GetLinkWithSessionTo(itr->first);
auto link = GetLinkWithSessionTo(remote);
if (link)
{
link->KeepAliveSessionTo(itr->first);
link->KeepAliveSessionTo(remote);
}
else
else if (not m_Clients.Contains(remote))
{
sessionsNeeded.push_back(itr->first);
sessionsNeeded.push_back(remote);
}
++itr;
}
else
else if (not m_Clients.Contains(remote))
{
const RouterID r(itr->first);
LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr);
for (const auto& link : outboundLinks)
{
link->CloseSessionTo(r);
}
sessionsClosed.push_back(remote);
}
}
}
for (const auto& router : sessionsNeeded)
{
LogInfo("ensuring session to ", router, " for previously made commitment");
_sessionMaker->CreateSessionTo(router, nullptr);
}
for (const auto& router : sessionsClosed)
{
m_PersistingSessions.erase(router);
ForEachOutboundLink([router](auto link) { link->CloseSessionTo(router); });
}
}
void

@ -111,6 +111,8 @@ namespace llarp
std::unordered_map<RouterID, SessionStats> m_lastRouterStats;
util::DecayingHashSet<RouterID> m_Clients{path::default_lifetime};
IOutboundSessionMaker* _sessionMaker;
};

@ -111,7 +111,7 @@ namespace llarp
{
return path->HandleDownstream(llarp_buffer_t(X), Y, r);
}
llarp::LogWarn("unhandled downstream message id=", pathid);
llarp::LogWarn("no path for downstream message id=", pathid);
return false;
}
} // namespace llarp

@ -204,6 +204,7 @@ namespace llarp
static void
OnForwardLRCMResult(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> path,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -236,9 +237,8 @@ namespace llarp
std::abort();
break;
}
router->QueueWork([router, pathid, nextHop, pathKey, status] {
LR_StatusMessage::CreateAndSend(router, pathid, nextHop, pathKey, status);
router->QueueWork([router, path, pathid, nextHop, pathKey, status] {
LR_StatusMessage::CreateAndSend(router, path, pathid, nextHop, pathKey, status);
});
}
@ -251,6 +251,7 @@ namespace llarp
llarp::LogError("duplicate transit hop ", self->hop->info);
LR_StatusMessage::CreateAndSend(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -269,6 +270,7 @@ namespace llarp
llarp::LogError("client path build hit limit ", *self->fromAddr);
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -288,6 +290,7 @@ namespace llarp
"not allowed, dropping build request on the floor");
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -305,13 +308,15 @@ namespace llarp
self->context->PutTransitHop(self->hop);
// forward to next hop
using std::placeholders::_1;
auto func = std::bind(
&OnForwardLRCMResult,
self->context->Router(),
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
_1);
auto func = [self](auto status) {
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
status);
};
self->context->ForwardLRCM(self->hop->info.upstream, self->frames, func);
self->hop = nullptr;
}
@ -338,6 +343,7 @@ namespace llarp
if (!LR_StatusMessage::CreateAndSend(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,

@ -22,21 +22,21 @@ namespace llarp
std::array<EncryptedFrame, 8> frames;
uint64_t status = 0;
HopHandler_ptr path;
HopHandler_ptr hop;
AbstractRouter* router;
PathID_t pathid;
LRSM_AsyncHandler(
std::array<EncryptedFrame, 8> _frames,
uint64_t _status,
HopHandler_ptr _path,
HopHandler_ptr _hop,
AbstractRouter* _router,
const PathID_t& pathid)
: frames(std::move(_frames))
, status(_status)
, path(std::move(_path))
, router(_router)
, pathid(pathid)
: frames{std::move(_frames)}
, status{_status}
, hop{std::move(_hop)}
, router{_router}
, pathid{pathid}
{}
~LRSM_AsyncHandler() = default;
@ -45,8 +45,7 @@ namespace llarp
handle()
{
router->NotifyRouterEvent<tooling::PathStatusReceivedEvent>(router->pubkey(), pathid, status);
path->HandleLRSM(status, frames, router);
hop->HandleLRSM(status, frames, router);
}
void
@ -133,16 +132,13 @@ namespace llarp
}
auto path = router->pathContext().GetByUpstream(session->GetPubKey(), pathid);
if (!path)
if (not path)
{
llarp::LogWarn("unhandled LR_Status message: no associated path found pathid=", pathid);
return false;
}
auto handler = std::make_shared<LRSM_AsyncHandler>(frames, status, path, router, pathid);
handler->queue_handle();
return true;
}
@ -157,6 +153,7 @@ namespace llarp
bool
LR_StatusMessage::CreateAndSend(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> hop,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -169,12 +166,9 @@ namespace llarp
message->SetDummyFrames();
if (!message->AddFrame(pathKey, status))
{
return false;
}
message->AddFrame(pathKey, status);
QueueSendMessage(router, nextHop, message);
QueueSendMessage(router, nextHop, message, hop);
return true; // can't guarantee delivery here, as far as we know it's fine
}
@ -221,10 +215,19 @@ namespace llarp
void
LR_StatusMessage::QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg)
AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr<LR_StatusMessage> msg,
std::shared_ptr<path::TransitHop> hop)
{
router->loop()->call(
[router, nextHop, msg = std::move(msg)] { SendMessage(router, nextHop, msg); });
router->loop()->call([router, nextHop, msg = std::move(msg), hop = std::move(hop)] {
SendMessage(router, nextHop, msg);
// destroy hop as needed
if ((msg->status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
hop->QueueDestroySelf(router);
}
});
}
void
@ -273,4 +276,32 @@ namespace llarp
return status == other.status;
}
std::string
LRStatusCodeToString(uint64_t status)
{
std::map<uint64_t, std::string> codes = {
{LR_StatusRecord::SUCCESS, "success"},
{LR_StatusRecord::FAIL_TIMEOUT, "timeout"},
{LR_StatusRecord::FAIL_CONGESTION, "congestion"},
{LR_StatusRecord::FAIL_DEST_UNKNOWN, "destination unknown"},
{LR_StatusRecord::FAIL_DECRYPT_ERROR, "decrypt error"},
{LR_StatusRecord::FAIL_MALFORMED_RECORD, "malformed record"},
{LR_StatusRecord::FAIL_DEST_INVALID, "destination invalid"},
{LR_StatusRecord::FAIL_CANNOT_CONNECT, "cannot connect"},
{LR_StatusRecord::FAIL_DUPLICATE_HOP, "duplicate hop"}};
std::stringstream ss;
ss << "[";
bool found = false;
for (const auto& [val, message] : codes)
{
if ((status & val) == val)
{
ss << (found ? ", " : "") << message;
found = true;
}
}
ss << "]";
return ss.str();
}
} // namespace llarp

@ -18,6 +18,7 @@ namespace llarp
{
struct PathContext;
struct IHopHandler;
struct TransitHop;
} // namespace path
struct LR_StatusRecord
@ -49,6 +50,9 @@ namespace llarp
OnKey(llarp_buffer_t* buffer, llarp_buffer_t* key);
};
std::string
LRStatusCodeToString(uint64_t status);
struct LR_StatusMessage : public ILinkMessage
{
std::array<EncryptedFrame, 8> frames;
@ -83,6 +87,7 @@ namespace llarp
static bool
CreateAndSend(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> hop,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -93,7 +98,10 @@ namespace llarp
static void
QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg);
AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr<LR_StatusMessage> msg,
std::shared_ptr<path::TransitHop> hop);
static void
SendMessage(

@ -7,6 +7,7 @@
#include <llarp/messages/relay_status.hpp>
#include "pathbuilder.hpp"
#include "transit_hop.hpp"
#include <llarp/nodedb.hpp>
#include <llarp/profiling.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/routing/dht_message.hpp>
@ -25,10 +26,10 @@ namespace llarp
{
Path::Path(
const std::vector<RouterContact>& h,
PathSet* parent,
std::weak_ptr<PathSet> pathset,
PathRole startingRoles,
std::string shortName)
: m_PathSet(parent), _role(startingRoles), m_shortName(std::move(shortName))
: m_PathSet{pathset}, _role{startingRoles}, m_shortName{std::move(shortName)}
{
hops.resize(h.size());
@ -54,7 +55,7 @@ namespace llarp
// initialize parts of the introduction
intro.router = hops[hsz - 1].rc.pubkey;
intro.pathID = hops[hsz - 1].txID;
if (parent)
if (auto parent = m_PathSet.lock())
EnterState(ePathBuilding, parent->Now());
}
@ -184,10 +185,6 @@ namespace llarp
{
failedAt = hops[index + 1].rc.pubkey;
}
else
{
failedAt = hops[index].rc.pubkey;
}
break;
}
++index;
@ -203,7 +200,14 @@ namespace llarp
if (failedAt)
{
r->NotifyRouterEvent<tooling::PathBuildRejectedEvent>(Endpoint(), RXID(), *failedAt);
LogWarn(Name(), " build failed at ", *failedAt);
LogWarn(
Name(),
" build failed at ",
*failedAt,
" status=",
LRStatusCodeToString(currentStatus),
" hops=",
HopsString());
r->routerProfiling().MarkHopFail(*failedAt);
}
else
@ -229,6 +233,13 @@ namespace llarp
llarp::LogDebug(
"Path build failed due to one or more nodes considered an "
"invalid destination");
if (failedAt)
{
r->loop()->call([nodedb = r->nodedb(), router = *failedAt]() {
LogInfo("router ", router, " is deregistered so we remove it");
nodedb->Remove(router);
});
}
}
else if (currentStatus & LR_StatusRecord::FAIL_CANNOT_CONNECT)
{
@ -257,7 +268,10 @@ namespace llarp
edge = *failedAt;
r->loop()->call([r, self = shared_from_this(), edge]() {
self->EnterState(ePathFailed, r->Now());
self->m_PathSet->HandlePathBuildFailedAt(self, edge);
if (auto parent = self->m_PathSet.lock())
{
parent->HandlePathBuildFailedAt(self, edge);
}
});
}
@ -276,7 +290,10 @@ namespace llarp
if (st == ePathExpired && _status == ePathBuilding)
{
_status = st;
m_PathSet->HandlePathBuildTimeout(shared_from_this());
if (auto parent = m_PathSet.lock())
{
parent->HandlePathBuildTimeout(shared_from_this());
}
}
else if (st == ePathBuilding)
{
@ -291,12 +308,19 @@ namespace llarp
{
LogInfo("path ", Name(), " died");
_status = st;
m_PathSet->HandlePathDied(shared_from_this());
if (auto parent = m_PathSet.lock())
{
parent->HandlePathDied(shared_from_this());
}
}
else if (st == ePathEstablished && _status == ePathTimeout)
{
LogInfo("path ", Name(), " reanimated");
}
else if (st == ePathIgnore)
{
LogInfo("path ", Name(), " ignored");
}
_status = st;
}
@ -371,11 +395,14 @@ namespace llarp
void
Path::Rebuild()
{
std::vector<RouterContact> newHops;
for (const auto& hop : hops)
newHops.emplace_back(hop.rc);
LogInfo(Name(), " rebuilding on ", ShortName());
m_PathSet->Build(newHops);
if (auto parent = m_PathSet.lock())
{
std::vector<RouterContact> newHops;
for (const auto& hop : hops)
newHops.emplace_back(hop.rc);
LogInfo(Name(), " rebuilding on ", ShortName());
parent->Build(newHops);
}
}
void
@ -418,6 +445,9 @@ namespace llarp
m_LastLatencyTestTime = now;
SendRoutingMessage(latency, r);
FlushUpstream(r);
// reset ID so we don't mark ourself as dead if we drop a latency sample
r->loop()->call_later(
1s, [self = shared_from_this()]() { self->m_LastLatencyTestID = 0; });
return;
}
dlt = now - m_LastRecvMessage;
@ -510,7 +540,7 @@ namespace llarp
{
return now >= m_LastRecvMessage + PathReanimationTimeout;
}
if (_status == ePathEstablished)
if (_status == ePathEstablished or _status == ePathIgnore)
{
return now >= ExpireTime();
}
@ -678,10 +708,28 @@ namespace llarp
bool
Path::HandleHiddenServiceFrame(const service::ProtocolFrame& frame)
{
MarkActive(m_PathSet->Now());
return m_DataHandler && m_DataHandler(shared_from_this(), frame);
if (auto parent = m_PathSet.lock())
{
MarkActive(parent->Now());
return m_DataHandler && m_DataHandler(shared_from_this(), frame);
}
return false;
}
template <typename Samples_t>
static llarp_time_t
computeLatency(const Samples_t& samps)
{
llarp_time_t mean = 0s;
if (samps.empty())
return mean;
for (const auto& samp : samps)
mean += samp;
return mean / samps.size();
}
constexpr auto MaxLatencySamples = 8;
bool
Path::HandlePathLatencyMessage(const routing::PathLatencyMessage& msg, AbstractRouter* r)
{
@ -689,17 +737,19 @@ namespace llarp
MarkActive(now);
if (msg.L == m_LastLatencyTestID)
{
intro.latency = now - m_LastLatencyTestTime;
m_LatencySamples.emplace_back(now - m_LastLatencyTestTime);
while (m_LatencySamples.size() > MaxLatencySamples)
m_LatencySamples.pop_front();
intro.latency = computeLatency(m_LatencySamples);
m_LastLatencyTestID = 0;
EnterState(ePathEstablished, now);
if (m_BuiltHook)
m_BuiltHook(shared_from_this());
m_BuiltHook = nullptr;
return true;
}
LogWarn("unwarranted path latency message via ", Upstream());
return false;
return true;
}
/// this is the Client's side of handling a DHT message. it's handled

@ -87,7 +87,7 @@ namespace llarp
HopList hops;
PathSet* const m_PathSet;
std::weak_ptr<PathSet> m_PathSet;
service::Introduction intro;
@ -95,7 +95,7 @@ namespace llarp
Path(
const std::vector<RouterContact>& routers,
PathSet* parent,
std::weak_ptr<PathSet> parent,
PathRole startingRoles,
std::string shortName);
@ -424,7 +424,7 @@ namespace llarp
uint64_t m_RXRate = 0;
uint64_t m_LastTXRate = 0;
uint64_t m_TXRate = 0;
std::deque<llarp_time_t> m_LatencySamples;
const std::string m_shortName;
};
} // namespace path

@ -95,8 +95,9 @@ namespace llarp
typename Map_t,
typename Key_t,
typename CheckValue_t,
typename GetFunc_t>
HopHandler_ptr
typename GetFunc_t,
typename Return_ptr = HopHandler_ptr>
Return_ptr
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
Lock_t lock(map.first);
@ -172,6 +173,46 @@ namespace llarp
});
}
std::optional<std::weak_ptr<TransitHop>>
PathContext::TransitHopByInfo(const TransitHopInfo& info)
{
// this is ugly as sin
auto own = MapGet<
SyncTransitMap_t::Lock_t,
decltype(m_TransitPaths),
PathID_t,
std::function<bool(const std::shared_ptr<TransitHop>&)>,
std::function<TransitHop*(const std::shared_ptr<TransitHop>&)>,
TransitHop*>(
m_TransitPaths,
info.txID,
[info](const auto& hop) -> bool { return hop->info == info; },
[](const auto& hop) -> TransitHop* { return hop.get(); });
if (own)
return own->weak_from_this();
return std::nullopt;
}
std::optional<std::weak_ptr<TransitHop>>
PathContext::TransitHopByUpstream(const RouterID& upstream, const PathID_t& id)
{
// this is ugly as sin as well
auto own = MapGet<
SyncTransitMap_t::Lock_t,
decltype(m_TransitPaths),
PathID_t,
std::function<bool(const std::shared_ptr<TransitHop>&)>,
std::function<TransitHop*(const std::shared_ptr<TransitHop>&)>,
TransitHop*>(
m_TransitPaths,
id,
[upstream](const auto& hop) -> bool { return hop->info.upstream == upstream; },
[](const auto& hop) -> TransitHop* { return hop.get(); });
if (own)
return own->weak_from_this();
return std::nullopt;
}
HopHandler_ptr
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{
@ -225,7 +266,8 @@ namespace llarp
auto itr = map.second.find(id);
if (itr != map.second.end())
{
return itr->second->m_PathSet->GetSelf();
if (auto parent = itr->second->m_PathSet.lock())
return parent;
}
return nullptr;
}

@ -77,6 +77,12 @@ namespace llarp
HopHandler_ptr
GetByDownstream(const RouterID& id, const PathID_t& path);
std::optional<std::weak_ptr<TransitHop>>
TransitHopByInfo(const TransitHopInfo&);
std::optional<std::weak_ptr<TransitHop>>
TransitHopByUpstream(const RouterID&, const PathID_t&);
PathSet_ptr
GetLocalPathSet(const PathID_t& id);

@ -6,8 +6,10 @@
#include "path_context.hpp"
#include <llarp/profiling.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/router/i_rc_lookup_handler.hpp>
#include <llarp/util/buffer.hpp>
#include <llarp/tooling/path_event.hpp>
#include <llarp/link/link_manager.hpp>
#include <functional>
@ -192,6 +194,9 @@ namespace llarp
void Builder::Tick(llarp_time_t)
{
const auto now = llarp::time_now_ms();
m_router->pathBuildLimiter().Decay(now);
ExpirePaths(now, m_router);
if (ShouldBuildMore(now))
BuildOne();
@ -211,8 +216,8 @@ namespace llarp
{
util::StatusObject obj{
{"buildStats", m_BuildStats.ExtractStatus()},
{"numHops", uint64_t(numHops)},
{"numPaths", uint64_t(numDesiredPaths)}};
{"numHops", uint64_t{numHops}},
{"numPaths", uint64_t{numDesiredPaths}}};
std::transform(
m_Paths.begin(),
m_Paths.end(),
@ -240,6 +245,9 @@ namespace llarp
if (BuildCooldownHit(rc.pubkey))
return;
if (m_router->routerProfiling().IsBadForPath(rc.pubkey))
return;
found = rc;
}
},
@ -251,7 +259,7 @@ namespace llarp
Builder::GetHopsForBuild()
{
auto filter = [r = m_router](const auto& rc) -> bool {
return not r->routerProfiling().IsBadForPath(rc.pubkey);
return not r->routerProfiling().IsBadForPath(rc.pubkey, 1);
};
if (const auto maybe = m_router->nodedb()->GetRandom(filter))
{
@ -264,14 +272,12 @@ namespace llarp
Builder::Stop()
{
_run = false;
// tell all our paths that they have expired
// tell all our paths that they are to be ignored
const auto now = Now();
for (auto& item : m_Paths)
{
item.second->EnterState(ePathExpired, now);
item.second->EnterState(ePathIgnore, now);
}
// remove expired paths
ExpirePaths(now, m_router);
return true;
}
@ -284,7 +290,7 @@ namespace llarp
bool
Builder::ShouldRemove() const
{
return IsStopped();
return IsStopped() and NumInStatus(ePathEstablished) == 0;
}
const SecretKey&
@ -368,7 +374,7 @@ namespace llarp
hopsSet.insert(endpointRC);
hopsSet.insert(hops.begin(), hops.end());
if (r->routerProfiling().IsBadForPath(rc.pubkey))
if (r->routerProfiling().IsBadForPath(rc.pubkey, 1))
return false;
for (const auto& hop : hopsSet)
{
@ -430,7 +436,7 @@ namespace llarp
ctx->pathset = self;
std::string path_shortName = "[path " + m_router->ShortName() + "-";
path_shortName = path_shortName + std::to_string(m_router->NextPathBuildNumber()) + "]";
auto path = std::make_shared<path::Path>(hops, self.get(), roles, std::move(path_shortName));
auto path = std::make_shared<path::Path>(hops, GetWeak(), roles, std::move(path_shortName));
LogInfo(Name(), " build ", path->ShortName(), ": ", path->HopsString());
path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); });
@ -473,6 +479,31 @@ namespace llarp
m_router->routerProfiling().MarkPathTimeout(p.get());
PathSet::HandlePathBuildTimeout(p);
DoPathBuildBackoff();
for (const auto& hop : p->hops)
{
const RouterID router{hop.rc.pubkey};
// look up router and see if it's still on the network
m_router->loop()->call_soon([router, r = m_router]() {
LogInfo("looking up ", router, " because of path build timeout");
r->rcLookupHandler().GetRC(
router,
[r](const auto& router, const auto* rc, auto result) {
if (result == RCRequestResult::Success && rc != nullptr)
{
LogInfo("refreshed rc for ", router);
r->nodedb()->PutIfNewer(*rc);
}
else
{
// remove all connections to this router as it's probably not registered anymore
LogWarn("removing router ", router, " because of path build timeout");
r->linkManager().DeregisterPeer(router);
r->nodedb()->Remove(router);
}
},
true);
});
}
}
void

@ -57,7 +57,7 @@ namespace llarp
DoPathBuildBackoff();
public:
AbstractRouter* m_router;
AbstractRouter* const m_router;
SecretKey enckey;
size_t numHops;
llarp_time_t lastBuild = 0s;

@ -96,9 +96,10 @@ namespace llarp
}
Path_ptr
PathSet::GetEstablishedPathClosestTo(RouterID id, PathRole roles) const
PathSet::GetEstablishedPathClosestTo(
RouterID id, std::unordered_set<RouterID> excluding, PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l{m_PathsMutex};
Path_ptr path = nullptr;
AlignedBuffer<32> dist;
AlignedBuffer<32> to = id;
@ -109,6 +110,8 @@ namespace llarp
continue;
if (!item.second->SupportsAnyRoles(roles))
continue;
if (excluding.count(item.second->Endpoint()))
continue;
AlignedBuffer<32> localDist = item.second->Endpoint() ^ to;
if (localDist < dist)
{
@ -280,44 +283,24 @@ namespace llarp
return itr->second;
}
bool
std::optional<std::set<service::Introduction>>
PathSet::GetCurrentIntroductionsWithFilter(
std::set<service::Introduction>& intros,
std::function<bool(const service::Introduction&)> filter) const
{
intros.clear();
size_t count = 0;
Lock_t l(m_PathsMutex);
std::set<service::Introduction> intros;
Lock_t l{m_PathsMutex};
auto itr = m_Paths.begin();
while (itr != m_Paths.end())
{
if (itr->second->IsReady() && filter(itr->second->intro))
if (itr->second->IsReady() and filter(itr->second->intro))
{
intros.insert(itr->second->intro);
++count;
}
++itr;
}
return count > 0;
}
bool
PathSet::GetCurrentIntroductions(std::set<service::Introduction>& intros) const
{
intros.clear();
size_t count = 0;
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while (itr != m_Paths.end())
{
if (itr->second->IsReady())
{
intros.insert(itr->second->intro);
++count;
}
++itr;
}
return count > 0;
if (intros.empty())
return std::nullopt;
return intros;
}
void

@ -13,6 +13,7 @@
#include <list>
#include <map>
#include <tuple>
#include <unordered_set>
namespace std
{
@ -116,6 +117,10 @@ namespace llarp
virtual PathSet_ptr
GetSelf() = 0;
/// get a weak_ptr of ourself
virtual std::weak_ptr<PathSet>
GetWeak() = 0;
virtual void
BuildOne(PathRole roles = ePathRoleAny) = 0;
@ -235,7 +240,10 @@ namespace llarp
}
Path_ptr
GetEstablishedPathClosestTo(RouterID router, PathRole roles = ePathRoleAny) const;
GetEstablishedPathClosestTo(
RouterID router,
std::unordered_set<RouterID> excluding = {},
PathRole roles = ePathRoleAny) const;
Path_ptr
PickEstablishedPath(PathRole roles = ePathRoleAny) const;
@ -258,14 +266,10 @@ namespace llarp
Path_ptr
GetByEndpointWithID(RouterID router, PathID_t id) const;
bool
std::optional<std::set<service::Introduction>>
GetCurrentIntroductionsWithFilter(
std::set<service::Introduction>& intros,
std::function<bool(const service::Introduction&)> filter) const;
bool
GetCurrentIntroductions(std::set<service::Introduction>& intros) const;
virtual bool
PublishIntroSet(const service::EncryptedIntroSet&, AbstractRouter*)
{

@ -64,22 +64,9 @@ namespace llarp
// TODO: add to IHopHandler some notion of "path status"
const uint64_t ourStatus = LR_StatusRecord::SUCCESS;
if (!msg->AddFrame(pathKey, ourStatus))
{
return false;
}
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg);
if ((status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
LogWarn(
"TransitHop received non-successful LR_StatusMessage, queueing "
"self-destruct status=",
status);
QueueDestroySelf(r);
}
msg->AddFrame(pathKey, ourStatus);
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg, shared_from_this());
return true;
}

@ -185,6 +185,9 @@ namespace llarp
void
FlushDownstream(AbstractRouter* r) override;
void
QueueDestroySelf(AbstractRouter* r);
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
@ -202,9 +205,6 @@ namespace llarp
void
SetSelfDestruct();
void
QueueDestroySelf(AbstractRouter* r);
std::set<std::shared_ptr<TransitHop>, ComparePtr<std::shared_ptr<TransitHop>>> m_FlushOthers;
thread::Queue<RelayUpstreamMessage> m_UpstreamGather;
thread::Queue<RelayDownstreamMessage> m_DownstreamGather;

@ -64,7 +64,7 @@ namespace llarp
void
RouterProfile::Tick()
{
static constexpr auto updateInterval = 30min;
static constexpr auto updateInterval = 30s;
const auto now = llarp::time_now_ms();
if (lastDecay < now && now - lastDecay > updateInterval)
Decay();
@ -96,8 +96,9 @@ namespace llarp
bool
RouterProfile::IsGoodForPath(uint64_t chances) const
{
return checkIsGood(pathFailCount, pathSuccessCount, chances)
and checkIsGood(pathTimeoutCount, pathSuccessCount, chances);
if (pathTimeoutCount > chances)
return false;
return checkIsGood(pathFailCount, pathSuccessCount, chances);
}
Profiling::Profiling() : m_DisableProfiling(false)
@ -210,15 +211,10 @@ namespace llarp
Profiling::MarkPathTimeout(path::Path* p)
{
util::Lock lock{m_ProfilesMutex};
size_t idx = 0;
for (const auto& hop : p->hops)
{
if (idx)
{
m_Profiles[hop.rc.pubkey].pathTimeoutCount += 1;
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms();
}
++idx;
m_Profiles[hop.rc.pubkey].pathTimeoutCount += 1;
m_Profiles[hop.rc.pubkey].lastUpdated = llarp::time_now_ms();
}
}

@ -528,6 +528,7 @@ namespace llarp::quic
if (not continue_connecting(
pport, (bool)maybe_remote, "endpoint ONS lookup", remote_addr))
return;
service_endpoint_.MarkAddressOutbound(*maybe_remote);
service_endpoint_.EnsurePathTo(*maybe_remote, after_path, open_timeout);
});
return result;
@ -539,7 +540,10 @@ namespace llarp::quic
if (auto maybe_convo = service_endpoint_.GetBestConvoTagFor(remote))
after_path(maybe_convo);
else
{
service_endpoint_.MarkAddressOutbound(remote);
service_endpoint_.EnsurePathTo(remote, after_path, open_timeout);
}
return result;
}

@ -42,7 +42,7 @@
#include <oxenmq/oxenmq.h>
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 100ms;
static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 250ms;
namespace llarp
{
@ -885,7 +885,12 @@ namespace llarp
if (decom)
{
// complain about being deregistered
LogError("We are running as a service node but we seem to be decommissioned");
if (now >= m_NextDecommissionWarn)
{
constexpr auto DecommissionWarnInterval = 30s;
LogError("We are running as a service node but we seem to be decommissioned");
m_NextDecommissionWarn = now + DecommissionWarnInterval;
}
}
else if (connected < connectToNum)
{
@ -1402,8 +1407,10 @@ namespace llarp
void
Router::QueueWork(std::function<void(void)> func)
{
_loop->call_soon(func);
// m_lmq->job(std::move(func));
if (m_isServiceNode)
_loop->call_soon(std::move(func));
else
m_lmq->job(std::move(func));
}
void

@ -539,7 +539,7 @@ namespace llarp
bool m_isServiceNode = false;
llarp_time_t m_LastStatsReport = 0s;
llarp_time_t m_NextDecommissionWarn = 0s;
std::shared_ptr<llarp::KeyManager> m_keyManager;
std::shared_ptr<PeerDb> m_peerDb;

@ -480,6 +480,7 @@ namespace llarp::rpc
onGoodResult("added null exit");
return;
}
ep->MarkAddressOutbound(addr);
ep->EnsurePathToService(
addr,
[onBadResult, onGoodResult, shouldSendAuth, addrStr = addr.ToString()](

@ -101,11 +101,16 @@ namespace llarp
{
const auto now = llarp::time_now_ms();
m_LastIntrosetRegenAttempt = now;
std::set<Introduction> introset;
if (!GetCurrentIntroductionsWithFilter(
introset, [now](const service::Introduction& intro) -> bool {
return not intro.ExpiresSoon(now, path::min_intro_lifetime);
std::set<Introduction, CompareIntroTimestamp> intros;
if (const auto maybe =
GetCurrentIntroductionsWithFilter([now](const service::Introduction& intro) -> bool {
return not intro.ExpiresSoon(
now, path::default_lifetime - path::min_intro_lifetime);
}))
{
intros.insert(maybe->begin(), maybe->end());
}
else
{
LogWarn(
"could not publish descriptors for endpoint ",
@ -146,9 +151,10 @@ namespace llarp
}
introSet().intros.clear();
for (auto& intro : introset)
for (auto& intro : intros)
{
introSet().intros.emplace_back(std::move(intro));
if (introSet().intros.size() < numDesiredPaths)
introSet().intros.emplace_back(std::move(intro));
}
if (introSet().intros.empty())
{
@ -710,8 +716,10 @@ namespace llarp
return false;
auto next_pub = m_state->m_LastPublishAttempt
+ (m_state->m_IntroSet.HasExpiredIntros(now) ? INTROSET_PUBLISH_RETRY_INTERVAL
: INTROSET_PUBLISH_INTERVAL);
+ (m_state->m_IntroSet.HasStaleIntros(
now, path::default_lifetime - path::intro_path_spread)
? IntrosetPublishRetryCooldown
: IntrosetPublishInterval);
return now >= next_pub and m_LastIntrosetRegenAttempt + 1s <= now;
}
@ -739,8 +747,11 @@ namespace llarp
{
std::unordered_set<RouterID> exclude;
ForEachPath([&exclude](auto path) { exclude.insert(path->Endpoint()); });
const auto maybe = m_router->nodedb()->GetRandom(
[exclude](const auto& rc) -> bool { return exclude.count(rc.pubkey) == 0; });
const auto maybe =
m_router->nodedb()->GetRandom([exclude, r = m_router](const auto& rc) -> bool {
return exclude.count(rc.pubkey) == 0
and not r->routerProfiling().IsBadForPath(rc.pubkey);
});
if (not maybe.has_value())
return std::nullopt;
return GetHopsForBuildWithEndpoint(maybe->pubkey);
@ -758,46 +769,27 @@ namespace llarp
path::Builder::PathBuildStarted(path);
}
constexpr auto MaxOutboundContextPerRemote = 4;
constexpr auto MaxOutboundContextPerRemote = 1;
void
Endpoint::PutNewOutboundContext(const service::IntroSet& introset, llarp_time_t left)
{
Address addr{introset.addressKeys.Addr()};
const Address addr{introset.addressKeys.Addr()};
auto& remoteSessions = m_state->m_RemoteSessions;
auto& serviceLookups = m_state->m_PendingServiceLookups;
if (remoteSessions.count(addr) >= MaxOutboundContextPerRemote)
if (remoteSessions.count(addr) < MaxOutboundContextPerRemote)
{
auto itr = remoteSessions.find(addr);
auto range = serviceLookups.equal_range(addr);
auto i = range.first;
while (i != range.second)
{
itr->second->SetReadyHook(
[callback = i->second, addr](auto session) { callback(addr, session); }, left);
++i;
}
serviceLookups.erase(addr);
return;
remoteSessions.emplace(addr, std::make_shared<OutboundContext>(introset, this));
LogInfo("Created New outbound context for ", addr.ToString());
}
auto session = std::make_shared<OutboundContext>(introset, this);
remoteSessions.emplace(addr, session);
LogInfo("Created New outbound context for ", addr.ToString());
// inform pending
auto range = serviceLookups.equal_range(addr);
auto itr = range.first;
if (itr != range.second)
auto sessionRange = remoteSessions.equal_range(addr);
for (auto itr = sessionRange.first; itr != sessionRange.second; ++itr)
{
session->SetReadyHook(
[callback = itr->second, addr](auto session) { callback(addr, session); }, left);
++itr;
itr->second->AddReadyHook(
[addr, this](auto session) { InformPathToService(addr, session); }, left);
}
serviceLookups.erase(addr);
}
void
@ -924,7 +916,7 @@ namespace llarp
paths.insert(path);
});
constexpr size_t min_unique_lns_endpoints = 3;
constexpr size_t min_unique_lns_endpoints = 2;
// not enough paths
if (paths.size() < min_unique_lns_endpoints)
@ -1013,7 +1005,15 @@ namespace llarp
msg.S = path->NextSeqNo();
if (path && path->SendRoutingMessage(msg, Router()))
{
RouterLookupJob job{this, handler};
RouterLookupJob job{this, [handler, router, nodedb = m_router->nodedb()](auto results) {
if (results.empty())
{
LogInfo("could not find ", router, ", remove it from nodedb");
nodedb->Remove(router);
}
if (handler)
handler(results);
}};
assert(msg.M.size() == 1);
auto dhtMsg = dynamic_cast<FindRouterMessage*>(msg.M[0].get());
@ -1066,11 +1066,11 @@ namespace llarp
void
Endpoint::QueueRecvData(RecvDataEvent ev)
{
if (m_RecvQueue.full() || m_RecvQueue.empty())
if (m_RecvQueue.full() or m_RecvQueue.empty())
{
m_router->loop()->call([this] { FlushRecvData(); });
m_router->loop()->call_soon([this] { FlushRecvData(); });
}
m_RecvQueue.pushBack(std::move(ev));
m_RecvQueue.tryPushBack(std::move(ev));
}
bool
@ -1078,14 +1078,15 @@ namespace llarp
path::Path_ptr path, const PathID_t from, std::shared_ptr<ProtocolMessage> msg)
{
msg->sender.UpdateAddr();
if (not HasOutboundConvo(msg->sender.Addr()))
PutSenderFor(msg->tag, msg->sender, true);
PutSenderFor(msg->tag, msg->sender, true);
PutReplyIntroFor(msg->tag, path->intro);
Introduction intro;
intro.pathID = from;
intro.router = PubKey{path->Endpoint()};
intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt);
intro.latency = path->intro.latency;
PutIntroFor(msg->tag, intro);
PutReplyIntroFor(msg->tag, path->intro);
ConvoTagRX(msg->tag);
return ProcessDataMessage(msg);
}
@ -1178,10 +1179,11 @@ namespace llarp
// not applicable because we are not an exit or don't have an endpoint auth policy
if ((not m_state->m_ExitEnabled) or m_AuthPolicy == nullptr)
return;
ProtocolFrame f;
ProtocolFrame f{};
f.R = AuthResultCodeAsInt(result.code);
f.T = tag;
f.F = path->intro.pathID;
f.N.Randomize();
if (result.code == AuthResultCode::eAuthAccepted)
{
ProtocolMessage msg;
@ -1189,10 +1191,7 @@ namespace llarp
std::vector<byte_t> reason{};
reason.resize(result.reason.size());
std::copy_n(result.reason.c_str(), reason.size(), reason.data());
msg.PutBuffer(reason);
f.N.Randomize();
f.C.Zero();
if (m_AuthPolicy)
msg.proto = ProtocolType::Auth;
else
@ -1234,6 +1233,23 @@ namespace llarp
Sessions().erase(t);
}
void
Endpoint::ResetConvoTag(ConvoTag tag, path::Path_ptr p, PathID_t from)
{
// send reset convo tag message
ProtocolFrame f{};
f.R = 1;
f.T = tag;
f.F = p->intro.pathID;
f.Sign(m_Identity);
{
LogWarn("invalidating convotag T=", tag);
RemoveConvoTag(tag);
m_SendQueue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, from), p});
}
}
bool
Endpoint::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame)
{
@ -1247,29 +1263,13 @@ namespace llarp
if (!frame.Verify(si))
return false;
// remove convotag it doesn't exist
LogWarn("remove convotag T=", frame.T, " R=", frame.R);
LogWarn("remove convotag T=", frame.T, " R=", frame.R, " from ", si.Addr());
RemoveConvoTag(frame.T);
return true;
}
if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this))
{
LogError("Failed to decrypt protocol frame");
if (not frame.C.IsZero())
{
// send reset convo tag message
ProtocolFrame f;
f.R = 1;
f.T = frame.T;
f.F = p->intro.pathID;
f.Sign(m_Identity);
{
LogWarn("invalidating convotag T=", frame.T);
RemoveConvoTag(frame.T);
m_SendQueue.tryPushBack(
SendEvent_t{std::make_shared<routing::PathTransferMessage>(f, frame.F), p});
}
}
ResetConvoTag(frame.T, p, frame.F);
}
return true;
}
@ -1279,8 +1279,8 @@ namespace llarp
{
m_router->routerProfiling().MarkPathTimeout(p.get());
ManualRebuild(1);
RegenAndPublishIntroSet();
path::Builder::HandlePathDied(p);
RegenAndPublishIntroSet();
}
bool
@ -1294,22 +1294,65 @@ namespace llarp
const Address& addr,
std::optional<IntroSet> introset,
const RouterID& endpoint,
llarp_time_t timeLeft)
llarp_time_t timeLeft,
uint64_t relayOrder)
{
// tell all our existing remote sessions about this introset update
const auto now = Router()->Now();
auto& fails = m_state->m_ServiceLookupFails;
auto& lookups = m_state->m_PendingServiceLookups;
if (introset)
{
auto& sessions = m_state->m_RemoteSessions;
auto range = sessions.equal_range(addr);
auto itr = range.first;
while (itr != range.second)
{
itr->second->OnIntroSetUpdate(addr, introset, endpoint, timeLeft, relayOrder);
// we got a successful lookup
if (itr->second->ReadyToSend() and not introset->IsExpired(now))
{
// inform all lookups
auto lookup_range = lookups.equal_range(addr);
auto i = lookup_range.first;
while (i != lookup_range.second)
{
i->second(addr, itr->second.get());
++i;
}
lookups.erase(addr);
}
++itr;
}
}
auto& fails = m_state->m_ServiceLookupFails;
if (not introset or introset->IsExpired(now))
{
LogError(Name(), " failed to lookup ", addr.ToString(), " from ", endpoint);
LogError(
Name(),
" failed to lookup ",
addr.ToString(),
" from ",
endpoint,
" order=",
relayOrder);
fails[endpoint] = fails[endpoint] + 1;
// inform one
auto range = lookups.equal_range(addr);
auto itr = range.first;
if (itr != range.second)
const auto pendingForAddr = std::count_if(
m_state->m_PendingLookups.begin(),
m_state->m_PendingLookups.end(),
[addr](const auto& item) -> bool { return item.second->IsFor(addr); });
// inform all if we have no more pending lookups for this address
if (pendingForAddr == 0)
{
itr->second(addr, nullptr);
itr = lookups.erase(itr);
auto range = lookups.equal_range(addr);
auto itr = range.first;
while (itr != range.second)
{
itr->second(addr, nullptr);
itr = lookups.erase(itr);
}
}
return false;
}
@ -1323,9 +1366,10 @@ namespace llarp
}
void
Endpoint::MarkAddressOutbound(const Address& addr)
Endpoint::MarkAddressOutbound(AddressVariant_t addr)
{
m_state->m_OutboundSessions.insert(addr);
if (auto* ptr = std::get_if<Address>(&addr))
m_state->m_OutboundSessions.insert(*ptr);
}
bool
@ -1334,15 +1378,38 @@ namespace llarp
return m_state->m_OutboundSessions.count(addr) > 0;
}
void
Endpoint::InformPathToService(const Address remote, OutboundContext* ctx)
{
auto& serviceLookups = m_state->m_PendingServiceLookups;
auto range = serviceLookups.equal_range(remote);
auto itr = range.first;
while (itr != range.second)
{
itr->second(remote, ctx);
++itr;
}
serviceLookups.erase(remote);
}
bool
Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook, llarp_time_t timeout)
{
if (not WantsOutboundSession(remote))
{
// we don't want to ensure paths to addresses that are inbound
// inform fail right away in that case
hook(remote, nullptr);
return false;
}
/// how many routers to use for lookups
static constexpr size_t NumParallelLookups = 2;
/// how many requests per router
static constexpr size_t RequestsPerLookup = 2;
MarkAddressOutbound(remote);
// add response hook to list for address.
m_state->m_PendingServiceLookups.emplace(remote, hook);
auto& sessions = m_state->m_RemoteSessions;
{
@ -1352,20 +1419,17 @@ namespace llarp
{
if (itr->second->ReadyToSend())
{
hook(remote, itr->second.get());
InformPathToService(remote, itr->second.get());
return true;
}
++itr;
}
}
// add response hook to list for address.
m_state->m_PendingServiceLookups.emplace(remote, hook);
/// check replay filter
if (not m_IntrosetLookupFilter.Insert(remote))
return true;
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups);
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups, remote.ToKey());
using namespace std::placeholders;
const dht::Key_t location = remote.ToKey();
@ -1381,15 +1445,15 @@ namespace llarp
{
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
this,
[this](auto addr, auto result, auto from, auto left) {
return OnLookup(addr, result, from, left);
[this](auto addr, auto result, auto from, auto left, auto order) {
return OnLookup(addr, result, from, left, order);
},
location,
PubKey{remote.as_array()},
path->Endpoint(),
order,
GenTXID(),
timeout);
timeout + (2 * path->intro.latency));
LogInfo(
"doing lookup for ",
remote,
@ -1425,14 +1489,8 @@ namespace llarp
bool
Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h)
{
static constexpr size_t MaxConcurrentSNodeSessions = 16;
auto& nodeSessions = m_state->m_SNodeSessions;
if (nodeSessions.size() >= MaxConcurrentSNodeSessions)
{
// a quick client side work arround before we do proper limiting
LogError(Name(), " has too many snode sessions");
return false;
}
using namespace std::placeholders;
if (nodeSessions.count(snode) == 0)
{
@ -1615,6 +1673,14 @@ namespace llarp
if (session.inbound)
{
auto path = GetPathByRouter(session.replyIntro.router);
// if we have no path to the remote router that's fine still use it just in case this
// is the ONLY one we have
if (path == nullptr)
{
ret = tag;
continue;
}
if (path and path->IsReady())
{
const auto rttEstimate = (session.replyIntro.latency + path->intro.latency) * 2;
@ -1679,6 +1745,13 @@ namespace llarp
Loop()->call_soon([tag, hook]() { hook(tag); });
return true;
}
if (not WantsOutboundSession(*ptr))
{
// we don't want to connect back to inbound sessions
hook(std::nullopt);
return true;
}
return EnsurePathToService(
*ptr,
[hook](auto, auto* ctx) {
@ -1799,8 +1872,7 @@ namespace llarp
LogError("failed to encrypt and sign");
return;
}
self->m_SendQueue.pushBack(SendEvent_t{transfer, p});
;
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
});
return true;
}
@ -1811,7 +1883,7 @@ namespace llarp
}
else
{
LogWarn("Have inbound convo but get-best returned none; bug?");
LogWarn("Have inbound convo from ", remote, " but get-best returned none; bug?");
}
}
@ -1883,10 +1955,13 @@ namespace llarp
bool
Endpoint::ShouldBuildMore(llarp_time_t now) const
{
if (not path::Builder::ShouldBuildMore(now))
if (BuildCooldownHit(now))
return false;
const auto requiredPaths = std::max(numDesiredPaths, path::min_intro_paths);
if (NumInStatus(path::ePathBuilding) >= requiredPaths)
return false;
return ((now - lastBuild) > path::intro_path_spread)
|| NumInStatus(path::ePathEstablished) < path::min_intro_paths;
return NumPathsExistingAt(now + (path::default_lifetime - path::intro_path_spread))
< requiredPaths;
}
AbstractRouter*

@ -49,12 +49,13 @@ namespace llarp
struct OutboundContext;
/// minimum interval for publishing introsets
static constexpr auto INTROSET_PUBLISH_INTERVAL =
std::chrono::milliseconds(path::default_lifetime) / 4;
static constexpr auto IntrosetPublishInterval = path::intro_path_spread / 2;
static constexpr auto INTROSET_PUBLISH_RETRY_INTERVAL = 5s;
/// how agressively should we retry publishing introset on failure
static constexpr auto IntrosetPublishRetryCooldown = 1s;
static constexpr auto INTROSET_LOOKUP_RETRY_COOLDOWN = 3s;
/// how aggressively should we retry looking up introsets
static constexpr auto IntrosetLookupCooldown = 250ms;
struct Endpoint : public path::Builder,
public ILookupHolder,
@ -284,8 +285,8 @@ namespace llarp
bool
WantsOutboundSession(const Address&) const override;
void
MarkAddressOutbound(const Address&) override;
/// this MUST be called if you want to call EnsurePathTo on the given address
void MarkAddressOutbound(AddressVariant_t) override;
bool
ShouldBundleRC() const override
@ -330,6 +331,9 @@ namespace llarp
using SNodeEnsureHook = std::function<void(const RouterID, exit::BaseSession_ptr, ConvoTag)>;
void
InformPathToService(const Address remote, OutboundContext* ctx);
/// ensure a path to a service node by public key
bool
EnsurePathToSNode(const RouterID remote, SNodeEnsureHook h);
@ -415,6 +419,9 @@ namespace llarp
uint64_t
GenTXID();
void
ResetConvoTag(ConvoTag tag, path::Path_ptr path, PathID_t from);
const std::set<RouterID>&
SnodeBlacklist() const;
@ -471,7 +478,8 @@ namespace llarp
const service::Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t timeLeft);
llarp_time_t timeLeft,
uint64_t relayOrder);
bool
DoNetworkIsolation(bool failed);

@ -37,6 +37,7 @@ namespace llarp
void
EndpointUtil::ExpirePendingTx(llarp_time_t now, PendingLookups& lookups)
{
std::vector<std::unique_ptr<IServiceLookup>> timedout;
for (auto itr = lookups.begin(); itr != lookups.end();)
{
if (!itr->second->IsTimedOut(now))
@ -44,11 +45,14 @@ namespace llarp
++itr;
continue;
}
std::unique_ptr<IServiceLookup> lookup = std::move(itr->second);
timedout.emplace_back(std::move(itr->second));
itr = lookups.erase(itr);
}
for (const auto& lookup : timedout)
{
LogWarn(lookup->name, " timed out txid=", lookup->txid);
lookup->HandleTimeout();
itr = lookups.erase(itr);
}
}
@ -95,7 +99,11 @@ namespace llarp
itr->second->Tick(now);
if (itr->second->Pump(now))
{
LogInfo("marking session as dead T=", itr->second->currentConvoTag);
LogInfo(
"marking session as dead T=",
itr->second->currentConvoTag,
" to ",
itr->second->Addr());
itr->second->Stop();
sessions.erase(itr->second->currentConvoTag);
deadSessions.emplace(std::move(*itr));
@ -106,6 +114,10 @@ namespace llarp
++itr;
}
}
for (auto& item : deadSessions)
{
item.second->Tick(now);
}
}
void
@ -116,7 +128,7 @@ namespace llarp
{
if (itr->second.IsExpired(now))
{
LogInfo("Expire session T=", itr->first);
LogInfo("Expire session T=", itr->first, " to ", itr->second.Addr());
itr = sessions.erase(itr);
}
else

@ -43,15 +43,31 @@ namespace llarp
template <typename Endpoint_t>
static path::Path::UniqueEndpointSet_t
GetManyPathsWithUniqueEndpoints(Endpoint_t* ep, size_t N, size_t tries = 10)
GetManyPathsWithUniqueEndpoints(
Endpoint_t* ep,
size_t N,
std::optional<dht::Key_t> maybeLocation = std::nullopt,
size_t tries = 10)
{
std::unordered_set<RouterID> exclude;
path::Path::UniqueEndpointSet_t paths;
do
{
--tries;
const auto path = ep->PickRandomEstablishedPath();
path::Path_ptr path;
if (maybeLocation)
{
path = ep->GetEstablishedPathClosestTo(RouterID{maybeLocation->as_array()}, exclude);
}
else
{
path = ep->PickRandomEstablishedPath();
}
if (path and path->IsReady())
{
paths.emplace(path);
exclude.insert(path->Endpoint());
}
} while (tries > 0 and paths.size() < N);
return paths;
}

@ -77,10 +77,6 @@ namespace llarp
/// do we want a session outbound to addr
virtual bool
WantsOutboundSession(const Address& addr) const = 0;
virtual void
MarkAddressOutbound(const Address& addr) = 0;
virtual void
QueueRecvData(RecvDataEvent ev) = 0;
};

@ -46,7 +46,7 @@ namespace llarp
found = *maybe;
}
}
return handle(remote, found, endpoint, TimeLeft(time_now_ms()));
return handle(remote, found, endpoint, TimeLeft(time_now_ms()), relayOrder);
}
std::shared_ptr<routing::IMessage>

@ -15,7 +15,7 @@ namespace llarp
uint64_t relayOrder;
const dht::Key_t location;
using HandlerFunc = std::function<bool(
const Address&, std::optional<IntroSet>, const RouterID&, llarp_time_t)>;
const Address&, std::optional<IntroSet>, const RouterID&, llarp_time_t, uint64_t)>;
HandlerFunc handle;
HiddenServiceAddressLookup(
@ -30,6 +30,16 @@ namespace llarp
~HiddenServiceAddressLookup() override = default;
virtual bool
IsFor(EndpointBase::AddressVariant_t addr) const override
{
if (const auto* ptr = std::get_if<Address>(&addr))
{
return Address{rootkey} == *ptr;
}
return false;
}
bool
HandleIntrosetResponse(const std::set<EncryptedIntroSet>& results) override;

@ -9,6 +9,7 @@ namespace llarp
{
util::StatusObject obj{
{"router", router.ToHex()},
{"path", pathID.ToHex()},
{"expiresAt", to_json(expiresAt)},
{"latency", to_json(latency)},
{"version", uint64_t(version)}};
@ -66,8 +67,9 @@ namespace llarp
std::ostream&
Introduction::print(std::ostream& stream, int level, int spaces) const
{
const RouterID r{router};
Printer printer(stream, level, spaces);
printer.printAttribute("k", RouterID(router));
printer.printAttribute("k", r.ToString());
printer.printAttribute("l", latency.count());
printer.printAttribute("p", pathID);
printer.printAttribute("v", version);

@ -77,6 +77,16 @@ namespace llarp
{
return i.print(out, -1, -1);
}
/// comparator for introset timestamp
struct CompareIntroTimestamp
{
bool
operator()(const Introduction& left, const Introduction& right) const
{
return left.expiresAt > right.expiresAt;
}
};
} // namespace service
} // namespace llarp

@ -351,6 +351,15 @@ namespace llarp::service
return false;
}
bool
IntroSet::HasStaleIntros(llarp_time_t now, llarp_time_t delta) const
{
for (const auto& intro : intros)
if (intro.ExpiresSoon(now, delta))
return true;
return false;
}
bool
IntroSet::IsExpired(llarp_time_t now) const
{

@ -69,6 +69,10 @@ namespace llarp
bool
HasExpiredIntros(llarp_time_t now) const;
/// return true if any of our intros expires soon given a delta
bool
HasStaleIntros(llarp_time_t now, llarp_time_t delta) const;
bool
IsExpired(llarp_time_t now) const;

@ -4,6 +4,8 @@
#include "intro_set.hpp"
#include <llarp/path/pathset.hpp>
#include <llarp/endpoint_base.hpp>
#include <set>
namespace llarp
@ -73,6 +75,12 @@ namespace llarp
const std::string name;
RouterID endpoint;
/// return true if this lookup is for a remote address
virtual bool IsFor(EndpointBase::AddressVariant_t) const
{
return false;
}
util::StatusObject
ExtractStatus() const
{

@ -48,6 +48,7 @@ namespace llarp
MarkCurrentIntroBad(Now());
ShiftIntroduction(false);
UpdateIntroSet();
SwapIntros();
}
return true;
}
@ -58,17 +59,22 @@ namespace llarp
: path::Builder{parent->Router(), OutboundContextNumPaths, parent->numHops}
, SendContext{introset.addressKeys, {}, this, parent}
, location{introset.addressKeys.Addr().ToKey()}
, addr{introset.addressKeys.Addr()}
, currentIntroSet{introset}
{
updatingIntroSet = false;
for (const auto& intro : introset.intros)
{
if (intro.expiresAt > m_NextIntro.expiresAt)
if (m_NextIntro.latency == 0s or m_NextIntro.latency > intro.latency)
m_NextIntro = intro;
}
currentConvoTag.Randomize();
lastShift = Now();
// add send and connect timeouts to the parent endpoints path alignment timeout
// this will make it so that there is less of a chance for timing races
sendTimeout += parent->PathAlignmentTimeout();
connectTimeout += parent->PathAlignmentTimeout();
}
OutboundContext::~OutboundContext() = default;
@ -82,12 +88,28 @@ namespace llarp
remoteIntro = m_NextIntro;
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.addressKeys, false);
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
ShiftIntroRouter(m_NextIntro.router);
// if we have not made a handshake to the remote endpoint do so
if (not IntroGenerated())
{
KeepAlive();
}
}
}
Address
OutboundContext::Addr() const
{
return addr;
}
bool
OutboundContext::OnIntroSetUpdate(
const Address&, std::optional<IntroSet> foundIntro, const RouterID& endpoint, llarp_time_t)
const Address&,
std::optional<IntroSet> foundIntro,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder)
{
if (markedBad)
return true;
@ -112,8 +134,9 @@ namespace llarp
return true;
}
currentIntroSet = *foundIntro;
ShiftIntroRouter(RouterID{});
}
else
else if (relayOrder > 0)
{
++m_LookupFails;
LogWarn(Name(), " failed to look up introset, fails=", m_LookupFails);
@ -128,7 +151,7 @@ namespace llarp
return false;
if (remoteIntro.router.IsZero())
return false;
return GetPathByRouter(remoteIntro.router) != nullptr;
return IntroSent();
}
void
@ -145,8 +168,8 @@ namespace llarp
}
if (selectedIntro.router.IsZero() || selectedIntro.ExpiresSoon(now))
return;
LogWarn(Name(), " shfiting intro off of ", r, " to ", RouterID(selectedIntro.router));
m_NextIntro = selectedIntro;
lastShift = now;
}
void
@ -177,7 +200,7 @@ namespace llarp
p->SetDataHandler(util::memFn(&OutboundContext::HandleHiddenServiceFrame, this));
p->SetDropHandler(util::memFn(&OutboundContext::HandleDataDrop, this));
// we now have a path to the next intro, swap intros
if (p->Endpoint() == m_NextIntro.router or p->Endpoint() == remoteIntro.router)
if (p->Endpoint() == m_NextIntro.router)
SwapIntros();
else
{
@ -188,29 +211,25 @@ namespace llarp
void
OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t)
{
if (sentIntro)
if (generatedIntro)
{
LogWarn(Name(), " dropping packet as we are not fully handshaked right now");
return;
}
if (remoteIntro.router.IsZero())
{
LogWarn(Name(), " dropping intro frame we have no intro ready yet");
return;
}
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
auto path = GetPathByRouter(remoteIntro.router);
if (path == nullptr)
{
// try parent as fallback
path = m_Endpoint->GetPathByRouter(remoteIntro.router);
if (path == nullptr)
{
if (!BuildCooldownHit(Now()))
BuildOneAlignedTo(remoteIntro.router);
LogWarn(Name(), " dropping intro frame, no path to ", remoteIntro.router);
return;
}
LogError(Name(), " has no path to ", remoteIntro.router, " when we should have had one");
return;
}
sentIntro = true;
auto frame = std::make_shared<ProtocolFrame>();
frame->Clear();
auto ex = std::make_shared<AsyncKeyExchange>(
m_Endpoint->Loop(),
remoteIdent,
@ -222,22 +241,29 @@ namespace llarp
t);
ex->hook = [self = shared_from_this(), path](auto frame) {
self->Send(std::move(frame), path);
if (not self->Send(std::move(frame), path))
return;
self->m_Endpoint->Loop()->call_later(
self->remoteIntro.latency, [self]() { self->sentIntro = true; });
};
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
frame->F = ex->msg.introReply.pathID;
frame->R = 0;
generatedIntro = true;
// ensure we have a sender put for this convo tag
m_DataHandler->PutSenderFor(currentConvoTag, currentIntroSet.addressKeys, false);
// encrypt frame async
m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
LogInfo("send intro frame");
LogInfo(Name(), " send intro frame T=", currentConvoTag);
}
std::string
OutboundContext::Name() const
{
return "OBContext:" + m_Endpoint->Name() + "-"
+ currentIntroSet.addressKeys.Addr().ToString();
return "OBContext:" + currentIntroSet.addressKeys.Addr().ToString();
}
void
@ -249,10 +275,9 @@ namespace llarp
return;
LogInfo(Name(), " updating introset");
m_LastIntrosetUpdateAt = now;
const auto addr = currentIntroSet.addressKeys.Addr();
// we want to use the parent endpoint's paths because outbound context
// does not implement path::PathSet::HandleGotIntroMessage
const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2);
const auto paths = GetManyPathsWithUniqueEndpoints(m_Endpoint, 2, location);
uint64_t relayOrder = 0;
for (const auto& path : paths)
{
@ -264,7 +289,7 @@ namespace llarp
path->Endpoint(),
relayOrder,
m_Endpoint->GenTXID(),
5s);
(IntrosetUpdateInterval / 2) + (2 * path->intro.latency));
relayOrder++;
if (job->SendRequestViaPath(path, m_Endpoint->Router()))
updatingIntroSet = true;
@ -280,13 +305,15 @@ namespace llarp
obj["remoteIntro"] = remoteIntro.ExtractStatus();
obj["sessionCreatedAt"] = to_json(createdAt);
obj["lastGoodSend"] = to_json(lastGoodSend);
obj["lastRecv"] = to_json(m_LastInboundTraffic);
obj["lastIntrosetUpdate"] = to_json(m_LastIntrosetUpdateAt);
obj["seqno"] = sequenceNo;
obj["markedBad"] = markedBad;
obj["lastShift"] = to_json(lastShift);
obj["remoteIdentity"] = remoteIdent.Addr().ToString();
obj["remoteIdentity"] = addr.ToString();
obj["currentRemoteIntroset"] = currentIntroSet.ExtractStatus();
obj["nextIntro"] = m_NextIntro.ExtractStatus();
obj["readyToSend"] = ReadyToSend();
std::transform(
m_BadIntros.begin(),
m_BadIntros.end(),
@ -306,36 +333,31 @@ namespace llarp
bool
OutboundContext::Pump(llarp_time_t now)
{
// we are probably dead af
if (m_LookupFails > 16 || m_BuildFails > 10)
return true;
constexpr auto InboundTrafficTimeout = 5s;
if (ReadyToSend() and remoteIntro.router.IsZero())
{
SwapIntros();
}
if (m_GotInboundTraffic and m_LastInboundTraffic + InboundTrafficTimeout <= now)
if ((remoteIntro.router.IsZero() or m_BadIntros.count(remoteIntro))
and GetPathByRouter(m_NextIntro.router))
SwapIntros();
if (m_GotInboundTraffic and m_LastInboundTraffic + sendTimeout <= now)
{
if (std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout)
{
// timeout on other side
MarkCurrentIntroBad(now);
}
// timeout on other side
UpdateIntroSet();
MarkCurrentIntroBad(now);
ShiftIntroRouter(remoteIntro.router);
}
// check for expiration
if (remoteIntro.ExpiresSoon(now))
// check for stale intros
// update the introset if we think we need to
if (currentIntroSet.HasStaleIntros(now, path::intro_path_spread))
{
UpdateIntroSet();
// shift intro if it expires "soon"
if (ShiftIntroduction())
SwapIntros(); // swap intros if we shifted
}
// lookup router in intro if set and unknown
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
if (not m_NextIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
// expire bad intros
auto itr = m_BadIntros.begin();
while (itr != m_BadIntros.end())
@ -346,7 +368,7 @@ namespace llarp
++itr;
}
if (ReadyToSend() and m_ReadyHook)
if (ReadyToSend() and not m_ReadyHooks.empty())
{
const auto path = GetPathByRouter(remoteIntro.router);
if (not path)
@ -354,32 +376,40 @@ namespace llarp
LogWarn(Name(), " ready but no path to ", remoteIntro.router, " ???");
return true;
}
m_ReadyHook(this);
m_ReadyHook = nullptr;
for (const auto& hook : m_ReadyHooks)
hook(this);
m_ReadyHooks.clear();
}
if (lastGoodSend > 0s and now >= lastGoodSend + (sendTimeout / 2))
const auto timeout = std::max(lastGoodSend, m_LastInboundTraffic);
if (lastGoodSend > 0s and now >= timeout + (sendTimeout / 2))
{
// send a keep alive to keep this session alive
KeepAlive();
}
// if we are dead return true so we are removed
return lastGoodSend > 0s ? (now >= lastGoodSend && now - lastGoodSend > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
return timeout > 0s ? (now >= timeout && now - timeout > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
}
void
OutboundContext::SetReadyHook(std::function<void(OutboundContext*)> hook, llarp_time_t timeout)
OutboundContext::AddReadyHook(std::function<void(OutboundContext*)> hook, llarp_time_t timeout)
{
if (m_ReadyHook)
if (ReadyToSend())
{
hook(this);
return;
m_ReadyHook = hook;
m_router->loop()->call_later(timeout, [this]() {
if (m_ReadyHook)
m_ReadyHook(nullptr);
m_ReadyHook = nullptr;
});
}
if (m_ReadyHooks.empty())
{
m_router->loop()->call_later(timeout, [this]() {
LogWarn(Name(), " did not obtain session in time");
for (const auto& hook : m_ReadyHooks)
hook(nullptr);
m_ReadyHooks.clear();
});
}
m_ReadyHooks.push_back(hook);
}
std::optional<std::vector<RouterContact>>
@ -397,16 +427,22 @@ namespace llarp
bool
OutboundContext::ShouldBuildMore(llarp_time_t now) const
{
if (markedBad || not path::Builder::ShouldBuildMore(now))
if (markedBad or path::Builder::BuildCooldownHit(now))
return false;
if (NumInStatus(path::ePathBuilding) >= numDesiredPaths)
return false;
llarp_time_t t = 0s;
ForEachPath([&t](path::Path_ptr path) {
if (path->IsReady())
t = std::max(path->ExpireTime(), t);
if (m_BadIntros.count(remoteIntro))
return true;
size_t numValidPaths = 0;
ForEachPath([now, &numValidPaths](path::Path_ptr path) {
if (not path->IsReady())
return;
if (not path->intro.ExpiresSoon(now, path::default_lifetime - path::intro_path_spread))
numValidPaths++;
});
return t >= now + path::default_lifetime / 4;
return numValidPaths < numDesiredPaths;
}
void
@ -422,12 +458,24 @@ namespace llarp
m_BadIntros[intro] = now;
}
bool
OutboundContext::IntroSent() const
{
return sentIntro;
}
bool
OutboundContext::IntroGenerated() const
{
return generatedIntro;
}
bool
OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
if (now - lastShift < MIN_SHIFT_INTERVAL)
const auto now = Now();
if (abs(now - lastShift) < shiftTimeout)
return false;
bool shifted = false;
std::vector<Introduction> intros = currentIntroSet.intros;
@ -488,7 +536,7 @@ namespace llarp
{
// unconditionally update introset
UpdateIntroSet();
const RouterID endpoint(path->Endpoint());
const RouterID endpoint{path->Endpoint()};
// if a path to our current intro died...
if (endpoint == remoteIntro.router)
{
@ -498,50 +546,13 @@ namespace llarp
if (p->Endpoint() == endpoint && p->IsReady())
++num;
});
// if we have more than two then we are probably fine
if (num > 2)
return;
// if we have one working one ...
if (num == 1)
if (num == 0)
{
num = 0;
ForEachPath([&](const path::Path_ptr& p) {
if (p->Endpoint() == endpoint)
++num;
});
// if we have 2 or more established or pending don't do anything
if (num > 2)
return;
BuildOneAlignedTo(endpoint);
}
else if (num == 0)
{
// we have no paths to this router right now
// hop off it
Introduction picked;
// get the latest intro that isn't on that endpoint
for (const auto& intro : currentIntroSet.intros)
{
if (intro.router == endpoint)
continue;
if (intro.expiresAt > picked.expiresAt)
picked = intro;
}
// we got nothing
if (picked.router.IsZero())
{
return;
}
m_NextIntro = picked;
// check if we have a path to this router
num = 0;
ForEachPath([&](const path::Path_ptr& p) {
// don't count timed out paths
if (p->Status() != path::ePathTimeout && p->Endpoint() == m_NextIntro.router)
++num;
});
// build a path if one isn't already pending build or established
BuildOneAlignedTo(m_NextIntro.router);
// we have no more paths to this endpoint so we want to pivot off of it
MarkCurrentIntroBad(Now());
ShiftIntroRouter(endpoint);
if (m_NextIntro.router != endpoint)
BuildOneAlignedTo(m_NextIntro.router);
}
}
}

@ -38,6 +38,15 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
Address
Addr() const;
bool
Stop() override;
@ -57,7 +66,7 @@ namespace llarp
/// shift the intro off the current router it is using
void
ShiftIntroRouter(const RouterID remote);
ShiftIntroRouter(const RouterID remote) override;
/// mark the current remote intro as bad
void
@ -71,7 +80,7 @@ namespace llarp
ReadyToSend() const;
void
SetReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
AddReadyHook(std::function<void(OutboundContext*)> readyHook, llarp_time_t timeout);
/// for exits
void
@ -129,19 +138,26 @@ namespace llarp
llarp_time_t
RTT() const;
bool
OnIntroSetUpdate(
const Address& addr,
std::optional<IntroSet> i,
const RouterID& endpoint,
llarp_time_t,
uint64_t relayOrder);
private:
/// swap remoteIntro with next intro
void
SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool
OnIntroSetUpdate(
const Address& addr, std::optional<IntroSet> i, const RouterID& endpoint, llarp_time_t);
IntroGenerated() const override;
bool
IntroSent() const override;
const dht::Key_t location;
const Address addr;
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
@ -151,8 +167,9 @@ namespace llarp
uint16_t m_BuildFails = 0;
llarp_time_t m_LastInboundTraffic = 0s;
bool m_GotInboundTraffic = false;
bool generatedIntro = false;
bool sentIntro = false;
std::function<void(OutboundContext*)> m_ReadyHook;
std::vector<std::function<void(OutboundContext*)>> m_ReadyHooks;
llarp_time_t m_LastIntrosetUpdateAt = 0s;
};
} // namespace service

@ -458,15 +458,21 @@ namespace llarp
}
};
handler->Router()->QueueWork(
[v, msg = std::move(msg), recvPath = std::move(recvPath), callback]() {
[v, msg = std::move(msg), recvPath = std::move(recvPath), callback, handler]() {
auto resetTag = [handler, tag = v->frame.T, from = v->frame.F, path = recvPath]() {
handler->ResetConvoTag(tag, path, from);
};
if (not v->frame.Verify(v->si))
{
LogError("Signature failure from ", v->si.Addr());
handler->Loop()->call_soon(resetTag);
return;
}
if (not v->frame.DecryptPayloadInto(v->shared, *msg))
{
LogError("failed to decrypt message");
LogError("failed to decrypt message from ", v->si.Addr());
handler->Loop()->call_soon(resetTag);
return;
}
callback(msg);

@ -96,7 +96,7 @@ namespace llarp
version = other.version;
}
ProtocolFrame() : routing::IMessage()
ProtocolFrame() : routing::IMessage{}
{
Clear();
}

@ -21,18 +21,19 @@ namespace llarp
, m_Endpoint(ep)
, createdAt(ep->Now())
, m_SendQueue(SendContextQueueSize)
{}
{
m_FlushWakeup = ep->Loop()->make_waker([this] { FlushUpstream(); });
}
bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
{
if (m_SendQueue.empty() or m_SendQueue.full())
{
m_Endpoint->Loop()->call([this] { FlushUpstream(); });
}
m_SendQueue.pushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path));
return true;
if (not path->IsReady())
return false;
m_FlushWakeup->Trigger();
return m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success;
}
void
@ -84,13 +85,15 @@ namespace llarp
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if (!path)
{
LogWarn(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);
ShiftIntroRouter(remoteIntro.router);
LogWarn(m_PathSet->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);
return;
}
if (!m_DataHandler->GetCachedSessionKeyFor(f->T, shared))
{
LogWarn(m_Endpoint->Name(), " has no cached session key on session T=", f->T);
LogWarn(
m_PathSet->Name(), " could not send, has no cached session key on session T=", f->T);
return;
}
@ -104,7 +107,7 @@ namespace llarp
}
else
{
LogWarn(m_Endpoint->Name(), " no session T=", f->T);
LogWarn(m_PathSet->Name(), " could not get sequence number for session T=", f->T);
return;
}
m->introReply = path->intro;
@ -115,7 +118,7 @@ namespace llarp
m_Endpoint->Router()->QueueWork([f, m, shared, path, this] {
if (not f->EncryptAndSign(*m, shared, m_Endpoint->GetIdentity()))
{
LogError(m_Endpoint->Name(), " failed to sign message");
LogError(m_PathSet->Name(), " failed to sign message");
return;
}
Send(f, path);
@ -140,11 +143,21 @@ namespace llarp
void
SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data, ProtocolType protocol)
{
if (lastGoodSend != 0s)
if (IntroSent())
{
EncryptAndSendTo(data, protocol);
return;
}
// have we generated the initial intro but not sent it yet? bail here so we don't cause
// bullshittery
if (IntroGenerated() and not IntroSent())
{
LogWarn(
m_PathSet->Name(),
" we have generated an intial handshake but have not sent it yet so we drop a packet "
"to prevent bullshittery");
return;
}
const auto maybe = m_Endpoint->MaybeGetAuthInfoForEndpoint(remoteIdent.Addr());
if (maybe.has_value())
{

@ -44,8 +44,9 @@ namespace llarp
uint64_t sequenceNo = 0;
llarp_time_t lastGoodSend = 0s;
const llarp_time_t createdAt;
llarp_time_t sendTimeout = 40s;
llarp_time_t connectTimeout = 60s;
llarp_time_t sendTimeout = path::build_timeout;
llarp_time_t connectTimeout = path::build_timeout;
llarp_time_t shiftTimeout = (path::build_timeout * 5) / 2;
llarp_time_t estimatedRTT = 0s;
bool markedBad = false;
using Msg_ptr = std::shared_ptr<routing::PathTransferMessage>;
@ -55,6 +56,8 @@ namespace llarp
std::function<void(AuthResult)> authResultListener;
std::shared_ptr<EventLoopWakeup> m_FlushWakeup;
virtual bool
ShiftIntroduction(bool rebuild = true)
{
@ -62,6 +65,9 @@ namespace llarp
return true;
}
virtual void
ShiftIntroRouter(const RouterID) = 0;
virtual void
UpdateIntroSet() = 0;
@ -72,6 +78,11 @@ namespace llarp
AsyncSendAuth(std::function<void(AuthResult)> replyHandler);
private:
virtual bool
IntroGenerated() const = 0;
virtual bool
IntroSent() const = 0;
void
EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);

@ -11,7 +11,7 @@ namespace llarp
{"lastSend", to_json(lastSend)},
{"lastRecv", to_json(lastRecv)},
{"replyIntro", replyIntro.ExtractStatus()},
{"remote", remote.Addr().ToString()},
{"remote", Addr().ToString()},
{"seqno", seqno},
{"tx", messagesSend},
{"rx", messagesRecv},
@ -19,6 +19,12 @@ namespace llarp
return obj;
}
Address
Session::Addr() const
{
return remote.Addr();
}
bool
Session::IsExpired(llarp_time_t now, llarp_time_t lifetime) const
{

@ -49,6 +49,9 @@ namespace llarp
bool
IsExpired(llarp_time_t now, llarp_time_t lifetime = SessionLifetime) const;
Address
Addr() const;
};
} // namespace service

@ -38,6 +38,13 @@ namespace llarp
return m_Values.try_emplace(v, now).second;
}
/// upsert will insert or update a value with time as now
void
Upsert(const Val_t& v)
{
m_Values[v] = llarp::time_now_ms();
}
/// decay hashset entries
void
Decay(Time_t now = 0s)

@ -54,6 +54,12 @@ namespace llarp
return shared_from_this();
}
std::weak_ptr<path::PathSet>
GetWeak() override
{
return weak_from_this();
}
bool
SupportsV6() const override
{

@ -20,7 +20,7 @@ MakePath(std::vector< char > hops)
std::vector< RC_t > pathHops;
for(const auto& hop : hops)
pathHops.push_back(MakeHop(hop));
return std::make_shared< Path_t >(pathHops, nullptr, 0, "test");
return std::make_shared< Path_t >(pathHops, std::weak_ptr<llarp::path::PathSet>{}, 0, "test");
}
TEST_CASE("UniqueEndpointSet_t has unique endpoints", "[path]")

Loading…
Cancel
Save