Remove AsyncKeyExchange, HiddenServiceAddressLookup and OutboundContext to their own components

pull/536/head
Michael 5 years ago
parent 3db6d80928
commit 6bf54e0925
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -214,6 +214,8 @@ set(LIB_SRC
routing/path_latency.cpp
routing/path_transfer.cpp
rpc/rpc.cpp
service/async_key_exchange.cpp
service/hidden_service_address_lookup.cpp
service/Identity.cpp
service/Intro.cpp
service/IntroSet.cpp
@ -224,6 +226,7 @@ set(LIB_SRC
service/handler.cpp
service/info.cpp
service/lookup.cpp
service/outbound_context.cpp
service/pendingbuffer.cpp
service/protocol.cpp
service/sendcontext.cpp

@ -336,9 +336,11 @@ namespace llarp
else
{
dns::Message *replyMsg = new dns::Message(std::move(msg));
using service::Address;
using service::OutboundContext;
return EnsurePathToService(
addr,
[=](const service::Address &remote, OutboundContext *ctx) {
[=](const Address &remote, OutboundContext *ctx) {
SendDNSReply(remote, ctx, replyMsg, reply, false, isV6);
},
2000);

@ -0,0 +1,86 @@
#include <service/async_key_exchange.hpp>
#include <crypto/crypto.hpp>
#include <crypto/types.hpp>
#include <util/logic.hpp>
namespace llarp
{
namespace service
{
AsyncKeyExchange::AsyncKeyExchange(Logic* l, Crypto* c,
const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,
const Introduction& remote,
IDataHandler* h, const ConvoTag& t)
: logic(l)
, crypto(c)
, remote(r)
, m_LocalIdentity(localident)
, introPubKey(introsetPubKey)
, remoteIntro(remote)
, handler(h)
, tag(t)
{
}
void
AsyncKeyExchange::Result(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
// put values
self->handler->PutCachedSessionKeyFor(self->msg.tag, self->sharedKey);
self->handler->PutIntroFor(self->msg.tag, self->remoteIntro);
self->handler->PutSenderFor(self->msg.tag, self->remote);
self->handler->PutReplyIntroFor(self->msg.tag, self->msg.introReply);
self->hook(self->frame);
delete self;
}
void
AsyncKeyExchange::Encrypt(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
// derive ntru session key component
SharedSecret K;
self->crypto->pqe_encrypt(self->frame.C, K, self->introPubKey);
// randomize Nonce
self->frame.N.Randomize();
// compure post handshake session key
// PKE (A, B, N)
SharedSecret sharedSecret;
using namespace std::placeholders;
path_dh_func dh_client =
std::bind(&Crypto::dh_client, self->crypto, _1, _2, _3, _4);
if(!self->m_LocalIdentity.KeyExchange(dh_client, sharedSecret,
self->remote, self->frame.N))
{
LogError("failed to derive x25519 shared key component");
}
std::array< byte_t, 64 > tmp = {{0}};
// K
std::copy(K.begin(), K.end(), tmp.begin());
// H (K + PKE(A, B, N))
std::copy(sharedSecret.begin(), sharedSecret.end(), tmp.begin() + 32);
self->crypto->shorthash(self->sharedKey, llarp_buffer_t(tmp));
// set tag
self->msg.tag = self->tag;
// set sender
self->msg.sender = self->m_LocalIdentity.pub;
// set version
self->msg.version = LLARP_PROTO_VERSION;
// set protocol
self->msg.proto = eProtocolTraffic;
// encrypt and sign
if(self->frame.EncryptAndSign(self->crypto, self->msg, K,
self->m_LocalIdentity))
self->logic->queue_job({self, &Result});
else
{
LogError("failed to encrypt and sign");
delete self;
}
}
} // namespace service
} // namespace llarp

@ -0,0 +1,48 @@
#ifndef LLARP_SERVICE_ASYNC_KEY_EXCHANGE_HPP
#define LLARP_SERVICE_ASYNC_KEY_EXCHANGE_HPP
#include <crypto/types.hpp>
#include <service/Identity.hpp>
#include <service/protocol.hpp>
namespace llarp
{
class Logic;
struct Crypto;
namespace service
{
struct AsyncKeyExchange
{
Logic* logic;
Crypto* crypto;
SharedSecret sharedKey;
ServiceInfo remote;
const Identity& m_LocalIdentity;
ProtocolMessage msg;
ProtocolFrame frame;
Introduction intro;
const PQPubKey introPubKey;
Introduction remoteIntro;
std::function< void(ProtocolFrame&) > hook;
IDataHandler* handler;
ConvoTag tag;
AsyncKeyExchange(Logic* l, Crypto* c, const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,
const Introduction& remote, IDataHandler* h,
const ConvoTag& t);
static void
Result(void* user);
/// given protocol message make protocol frame
static void
Encrypt(void* user);
};
} // namespace service
} // namespace llarp
#endif

@ -10,6 +10,8 @@
#include <nodedb.hpp>
#include <profiling.hpp>
#include <router/abstractrouter.hpp>
#include <service/hidden_service_address_lookup.hpp>
#include <service/outbound_context.hpp>
#include <service/protocol.hpp>
#include <util/logic.hpp>
#include <util/str.hpp>
@ -352,13 +354,6 @@ namespace llarp
}
}
bool
Endpoint::OutboundContext::Stop()
{
markedBad = true;
return path::Builder::Stop();
}
bool
Endpoint::Stop()
{
@ -375,13 +370,6 @@ namespace llarp
return path::Builder::Stop();
}
bool
Endpoint::OutboundContext::IsDone(llarp_time_t now) const
{
(void)now;
return AvailablePaths(path::ePathRoleAny) == 0 && ShouldRemove();
}
uint64_t
Endpoint::GenTXID()
{
@ -753,50 +741,6 @@ namespace llarp
LogInfo(Name(), " IntroSet publish confirmed");
}
struct HiddenServiceAddressLookup : public IServiceLookup
{
~HiddenServiceAddressLookup()
{
}
Address remote;
typedef std::function< bool(const Address&, const IntroSet*,
const RouterID&) >
HandlerFunc;
HandlerFunc handle;
HiddenServiceAddressLookup(Endpoint* p, HandlerFunc h,
const Address& addr, uint64_t tx)
: IServiceLookup(p, tx, "HSLookup"), remote(addr), handle(h)
{
}
bool
HandleResponse(const std::set< IntroSet >& results)
{
LogInfo("found ", results.size(), " for ", remote.ToString());
if(results.size() > 0)
{
IntroSet selected;
for(const auto& introset : results)
{
if(selected.OtherIsNewer(introset) && introset.A.Addr() == remote)
selected = introset;
}
return handle(remote, &selected, endpoint);
}
return handle(remote, nullptr, endpoint);
}
routing::IMessage*
BuildRequestMessage()
{
routing::DHTMessage* msg = new routing::DHTMessage();
msg->M.emplace_back(new dht::FindIntroMessage(txid, remote, 0));
return msg;
}
};
bool
Endpoint::DoNetworkIsolation(bool failed)
{
@ -935,25 +879,6 @@ namespace llarp
return true;
}
bool
Endpoint::OutboundContext::HandleDataDrop(path::Path* p,
const PathID_t& dst, uint64_t seq)
{
// pick another intro
if(dst == remoteIntro.pathID && remoteIntro.router == p->Endpoint())
{
LogWarn(Name(), " message ", seq, " dropped by endpoint ",
p->Endpoint(), " via ", dst);
if(MarkCurrentIntroBad(Now()))
{
LogInfo(Name(), " switched intros to ", remoteIntro.router, " via ",
remoteIntro.pathID);
}
UpdateIntroSet(true);
}
return true;
}
bool
Endpoint::HandleDataMessage(const PathID_t& src, ProtocolMessage* msg)
{
@ -1041,104 +966,18 @@ namespace llarp
return true;
}
void
Endpoint::OutboundContext::HandlePathBuilt(path::Path* p)
{
path::Builder::HandlePathBuilt(p);
/// don't use it if we are marked bad
if(markedBad)
return;
p->SetDataHandler(
std::bind(&Endpoint::OutboundContext::HandleHiddenServiceFrame, this,
std::placeholders::_1, std::placeholders::_2));
p->SetDropHandler(std::bind(
&Endpoint::OutboundContext::HandleDataDrop, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
}
void
Endpoint::HandlePathDied(path::Path*)
{
RegenAndPublishIntroSet(Now(), true);
}
void
Endpoint::OutboundContext::HandlePathDied(path::Path* path)
{
// unconditionally update introset
UpdateIntroSet(true);
const RouterID endpoint(path->Endpoint());
// if a path to our current intro died...
if(endpoint == remoteIntro.router)
{
// figure out how many paths to this router we have
size_t num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == endpoint && p->IsReady())
++num;
});
// if we have more than two then we are probably fine
if(num > 2)
return;
// if we have one working one ...
if(num == 1)
{
num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == endpoint)
++num;
});
// if we have 2 or more established or pending don't do anything
if(num > 2)
return;
BuildOneAlignedTo(endpoint);
}
else if(num == 0)
{
// we have no paths to this router right now
// hop off it
Introduction picked;
// get the latest intro that isn't on that endpoint
for(const auto& intro : currentIntroSet.I)
{
if(intro.router == endpoint)
continue;
if(intro.expiresAt > picked.expiresAt)
picked = intro;
}
// we got nothing
if(picked.router.IsZero())
{
return;
}
m_NextIntro = picked;
// check if we have a path to this router
num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == m_NextIntro.router)
++num;
});
// build a path if one isn't already pending build or established
if(num == 0)
BuildOneAlignedTo(m_NextIntro.router);
SwapIntros();
}
}
}
bool
Endpoint::CheckPathIsDead(path::Path*, llarp_time_t dlt)
{
return dlt > path::alive_timeout;
}
bool
Endpoint::OutboundContext::HandleHiddenServiceFrame(
path::Path* p, const ProtocolFrame* frame)
{
return m_Endpoint->HandleHiddenServiceFrame(p, frame);
}
bool
Endpoint::OnLookup(const Address& addr, const IntroSet* introset,
const RouterID& endpoint)
@ -1212,78 +1051,6 @@ namespace llarp
return false;
}
Endpoint::OutboundContext::OutboundContext(const IntroSet& introset,
Endpoint* parent)
: path::Builder(parent->m_Router, parent->m_Router->dht(), 3,
path::default_len)
, SendContext(introset.A, {}, this, parent)
, currentIntroSet(introset)
{
updatingIntroSet = false;
for(const auto intro : introset.I)
{
if(intro.expiresAt > m_NextIntro.expiresAt)
m_NextIntro = intro;
}
}
Endpoint::OutboundContext::~OutboundContext()
{
}
/// actually swap intros
void
Endpoint::OutboundContext::SwapIntros()
{
remoteIntro = m_NextIntro;
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
}
bool
Endpoint::OutboundContext::OnIntroSetUpdate(__attribute__((unused))
const Address& addr,
const IntroSet* i,
const RouterID& endpoint)
{
if(markedBad)
return true;
updatingIntroSet = false;
if(i)
{
if(currentIntroSet.T >= i->T)
{
LogInfo("introset is old, dropping");
return true;
}
auto now = Now();
if(i->IsExpired(now))
{
LogError("got expired introset from lookup from ", endpoint);
return true;
}
currentIntroSet = *i;
if(!ShiftIntroduction())
{
LogWarn("failed to pick new intro during introset update");
}
if(GetPathByRouter(m_NextIntro.router) == nullptr)
BuildOneAlignedTo(m_NextIntro.router);
else
SwapIntros();
}
else
++m_LookupFails;
return true;
}
bool
Endpoint::OutboundContext::ReadyToSend() const
{
return (!remoteIntro.router.IsZero())
&& GetPathByRouter(remoteIntro.router) != nullptr;
}
void
Endpoint::EnsurePathToSNode(const RouterID& snode, SNodeEnsureHook h)
{
@ -1424,436 +1191,18 @@ namespace llarp
5000, true);
}
bool
Endpoint::OutboundContext::BuildOneAlignedTo(const RouterID& remote)
{
LogInfo(Name(), " building path to ", remote);
auto nodedb = m_Endpoint->Router()->nodedb();
std::vector< RouterContact > hops;
hops.resize(numHops);
for(size_t hop = 0; hop < numHops; ++hop)
{
if(hop == 0)
{
if(!SelectHop(nodedb, hops[0], hops[0], 0, path::ePathRoleAny))
return false;
}
else if(hop == numHops - 1)
{
// last hop
if(!nodedb->Get(remote, hops[hop]))
return false;
}
// middle hop
else
{
size_t tries = 5;
do
{
nodedb->select_random_hop_excluding(hops[hop],
{hops[hop - 1].pubkey, remote});
--tries;
} while(m_Endpoint->Router()->routerProfiling().IsBadForPath(
hops[hop].pubkey)
&& tries > 0);
return tries > 0;
}
return false;
}
Build(hops);
return true;
}
bool
Endpoint::OutboundContext::MarkCurrentIntroBad(llarp_time_t now)
{
// insert bad intro
m_BadIntros[remoteIntro] = now;
// unconditional shift
bool shiftedRouter = false;
bool shiftedIntro = false;
// try same router
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
if(router->routerProfiling().IsBadForPath(intro.router))
continue;
auto itr = m_BadIntros.find(intro);
if(itr == m_BadIntros.end() && intro.router == m_NextIntro.router)
{
shiftedIntro = true;
m_NextIntro = intro;
break;
}
}
if(!shiftedIntro)
{
// try any router
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
auto itr = m_BadIntros.find(intro);
if(itr == m_BadIntros.end())
{
// TODO: this should always be true but idk if it really is
shiftedRouter = m_NextIntro.router != intro.router;
shiftedIntro = true;
m_NextIntro = intro;
break;
}
}
}
if(shiftedRouter)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
}
else if(shiftedIntro)
{
SwapIntros();
}
else
{
LogInfo(Name(), " updating introset");
UpdateIntroSet(true);
}
return shiftedIntro;
}
bool
Endpoint::OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
if(now - lastShift < MIN_SHIFT_INTERVAL)
return false;
bool shifted = false;
// to find a intro on the same router as before
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end()
&& remoteIntro.router == intro.router)
{
m_NextIntro = intro;
return true;
}
}
for(const auto& intro : currentIntroSet.I)
{
m_Endpoint->EnsureRouterIsKnown(intro.router);
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end() && m_NextIntro != intro)
{
shifted = intro.router != m_NextIntro.router
|| (now < intro.expiresAt
&& intro.expiresAt - now
> 10 * 1000); // TODO: hardcoded value
m_NextIntro = intro;
success = true;
break;
}
}
if(shifted && rebuild)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
}
return success;
}
struct AsyncKeyExchange
{
Logic* logic;
Crypto* crypto;
SharedSecret sharedKey;
ServiceInfo remote;
const Identity& m_LocalIdentity;
ProtocolMessage msg;
ProtocolFrame frame;
Introduction intro;
const PQPubKey introPubKey;
Introduction remoteIntro;
std::function< void(ProtocolFrame&) > hook;
IDataHandler* handler;
ConvoTag tag;
AsyncKeyExchange(Logic* l, Crypto* c, const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,
const Introduction& remote, IDataHandler* h,
const ConvoTag& t)
: logic(l)
, crypto(c)
, remote(r)
, m_LocalIdentity(localident)
, introPubKey(introsetPubKey)
, remoteIntro(remote)
, handler(h)
, tag(t)
{
}
static void
Result(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
// put values
self->handler->PutCachedSessionKeyFor(self->msg.tag, self->sharedKey);
self->handler->PutIntroFor(self->msg.tag, self->remoteIntro);
self->handler->PutSenderFor(self->msg.tag, self->remote);
self->handler->PutReplyIntroFor(self->msg.tag, self->msg.introReply);
self->hook(self->frame);
delete self;
}
/// given protocol message make protocol frame
static void
Encrypt(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
// derive ntru session key component
SharedSecret K;
self->crypto->pqe_encrypt(self->frame.C, K, self->introPubKey);
// randomize Nonce
self->frame.N.Randomize();
// compure post handshake session key
// PKE (A, B, N)
SharedSecret sharedSecret;
using namespace std::placeholders;
path_dh_func dh_client =
std::bind(&Crypto::dh_client, self->crypto, _1, _2, _3, _4);
if(!self->m_LocalIdentity.KeyExchange(dh_client, sharedSecret,
self->remote, self->frame.N))
{
LogError("failed to derive x25519 shared key component");
}
std::array< byte_t, 64 > tmp = {{0}};
// K
std::copy(K.begin(), K.end(), tmp.begin());
// H (K + PKE(A, B, N))
std::copy(sharedSecret.begin(), sharedSecret.end(), tmp.begin() + 32);
self->crypto->shorthash(self->sharedKey, llarp_buffer_t(tmp));
// set tag
self->msg.tag = self->tag;
// set sender
self->msg.sender = self->m_LocalIdentity.pub;
// set version
self->msg.version = LLARP_PROTO_VERSION;
// set protocol
self->msg.proto = eProtocolTraffic;
// encrypt and sign
if(self->frame.EncryptAndSign(self->crypto, self->msg, K,
self->m_LocalIdentity))
self->logic->queue_job({self, &Result});
else
{
LogError("failed to encrypt and sign");
delete self;
}
}
};
void
Endpoint::EnsureReplyPath(const ServiceInfo& ident)
{
m_AddressToService[ident.Addr()] = ident;
}
void
Endpoint::OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload,
__attribute__((unused))
ProtocolType t)
{
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if(path == nullptr)
{
// try parent as fallback
path = m_Endpoint->GetPathByRouter(remoteIntro.router);
if(path == nullptr)
{
BuildOneAlignedTo(remoteIntro.router);
LogWarn(Name(), " dropping intro frame, no path to ",
remoteIntro.router);
return;
}
}
currentConvoTag.Randomize();
AsyncKeyExchange* ex = new AsyncKeyExchange(
m_Endpoint->RouterLogic(), m_Endpoint->Crypto(), remoteIdent,
m_Endpoint->GetIdentity(), currentIntroSet.K, remoteIntro,
m_DataHandler, currentConvoTag);
ex->hook = std::bind(&Endpoint::OutboundContext::Send, this,
std::placeholders::_1);
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
ex->frame.F = ex->msg.introReply.pathID;
llarp_threadpool_queue_job(m_Endpoint->Worker(),
{ex, &AsyncKeyExchange::Encrypt});
}
std::string
Endpoint::OutboundContext::Name() const
{
return "OBContext:" + m_Endpoint->Name() + "-"
+ currentIntroSet.A.Addr().ToString();
}
void
Endpoint::OutboundContext::UpdateIntroSet(bool randomizePath)
{
if(updatingIntroSet || markedBad)
return;
auto addr = currentIntroSet.A.Addr();
path::Path* path = nullptr;
if(randomizePath)
path = m_Endpoint->PickRandomEstablishedPath();
else
path = m_Endpoint->GetEstablishedPathClosestTo(addr.as_array());
if(path)
{
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
m_Endpoint,
std::bind(&Endpoint::OutboundContext::OnIntroSetUpdate, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3),
addr, m_Endpoint->GenTXID());
updatingIntroSet = job->SendRequestViaPath(path, m_Endpoint->Router());
}
else
{
LogWarn("Cannot update introset no path for outbound session to ",
currentIntroSet.A.Addr().ToString());
}
}
util::StatusObject
Endpoint::OutboundContext::ExtractStatus() const
{
auto obj = path::Builder::ExtractStatus();
obj.Put("currentConvoTag", currentConvoTag.ToHex());
obj.Put("remoteIntro", remoteIntro.ExtractStatus());
obj.Put("sessionCreatedAt", createdAt);
obj.Put("lastGoodSend", lastGoodSend);
obj.Put("seqno", sequenceNo);
obj.Put("markedBad", markedBad);
obj.Put("lastShift", lastShift);
obj.Put("remoteIdentity", remoteIdent.Addr().ToString());
obj.Put("currentRemoteIntroset", currentIntroSet.ExtractStatus());
obj.Put("nextIntro", m_NextIntro.ExtractStatus());
std::vector< util::StatusObject > badIntrosObj;
std::transform(m_BadIntros.begin(), m_BadIntros.end(),
std::back_inserter(badIntrosObj),
[](const auto& item) -> util::StatusObject {
util::StatusObject o{
{"count", item.second},
{"intro", item.first.ExtractStatus()}};
return o;
});
obj.Put("badIntros", badIntrosObj);
return obj;
}
bool
Endpoint::OutboundContext::Tick(llarp_time_t now)
{
// we are probably dead af
if(m_LookupFails > 16 || m_BuildFails > 10)
return true;
// check for expiration
if(remoteIntro.ExpiresSoon(now))
{
// shift intro if it expires "soon"
ShiftIntroduction();
}
// swap if we can
if(remoteIntro != m_NextIntro)
{
if(GetPathByRouter(m_NextIntro.router) != nullptr)
{
// we can safely set remoteIntro to the next one
SwapIntros();
LogInfo(Name(), " swapped intro");
}
}
// lookup router in intro if set and unknown
if(!remoteIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
// expire bad intros
auto itr = m_BadIntros.begin();
while(itr != m_BadIntros.end())
{
if(now - itr->second > path::default_lifetime)
itr = m_BadIntros.erase(itr);
else
++itr;
}
// send control message if we look too quiet
if(lastGoodSend)
{
if(now - lastGoodSend > (sendTimeout / 2))
{
if(!GetNewestPathByRouter(remoteIntro.router))
{
BuildOneAlignedTo(remoteIntro.router);
}
Encrypted< 64 > tmp;
tmp.Randomize();
llarp_buffer_t buf(tmp.data(), tmp.size());
AsyncEncryptAndSendTo(buf, eProtocolControl);
SharedSecret k;
if(currentConvoTag.IsZero())
return false;
return !m_DataHandler->HasConvoTag(currentConvoTag);
}
}
// if we are dead return true so we are removed
return lastGoodSend
? (now >= lastGoodSend && now - lastGoodSend > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
}
bool
Endpoint::HasConvoTag(const ConvoTag& t) const
{
return m_Sessions.find(t) != m_Sessions.end();
}
bool
Endpoint::OutboundContext::SelectHop(llarp_nodedb* db,
const RouterContact& prev,
RouterContact& cur, size_t hop,
path::PathRole roles)
{
if(remoteIntro.router.IsZero())
{
SwapIntros();
}
if(hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
if(db->Get(remoteIntro.router, cur))
return true;
++m_BuildFails;
return false;
}
else if(hop == numHops - 2)
{
return db->select_random_hop_excluding(
cur, {prev.pubkey, remoteIntro.router});
}
return path::Builder::SelectHop(db, prev, cur, hop, roles);
}
uint64_t
Endpoint::GetSeqNoForConvo(const ConvoTag& tag)
{
@ -1863,16 +1212,6 @@ namespace llarp
return ++(itr->second.seqno);
}
bool
Endpoint::OutboundContext::ShouldBuildMore(llarp_time_t now) const
{
if(markedBad)
return false;
if(path::Builder::ShouldBuildMore(now))
return true;
return !ReadyToSend();
}
bool
Endpoint::ShouldBuildMore(llarp_time_t now) const
{

@ -22,10 +22,9 @@ namespace llarp
{
namespace service
{
// forward declare
struct Context;
// forward declare
struct AsyncKeyExchange;
struct Context;
struct OutboundContext;
struct Endpoint : public path::Builder,
public ILookupHolder,
@ -210,107 +209,6 @@ namespace llarp
bool
ShouldBuildMore(llarp_time_t now) const override;
/// context needed to initiate an outbound hidden service session
struct OutboundContext : public path::Builder, public SendContext
{
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext();
util::StatusObject
ExtractStatus() const;
bool
ShouldBundleRC() const override
{
return m_Endpoint->ShouldBundleRC();
}
bool
Stop() override;
bool
HandleDataDrop(path::Path* p, const PathID_t& dst, uint64_t s);
void
HandlePathDied(path::Path* p) override;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool
MarkCurrentIntroBad(llarp_time_t now) override;
/// return true if we are ready to send
bool
ReadyToSend() const;
bool
ShouldBuildMore(llarp_time_t now) const override;
/// tick internal state
/// return true to mark as dead
bool
Tick(llarp_time_t now);
/// return true if it's safe to remove ourselves
bool
IsDone(llarp_time_t now) const;
bool
CheckPathIsDead(path::Path* p, llarp_time_t dlt);
void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) override;
/// issues a lookup to find the current intro set of the remote service
void
UpdateIntroSet(bool randomizePath) override;
bool
BuildOneAlignedTo(const RouterID& remote);
void
HandlePathBuilt(path::Path* path) override;
bool
SelectHop(llarp_nodedb* db, const RouterContact& prev,
RouterContact& cur, size_t hop,
llarp::path::PathRole roles) override;
bool
HandleHiddenServiceFrame(path::Path* p, const ProtocolFrame* frame);
std::string
Name() const override;
private:
/// swap remoteIntro with next intro
void
SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool
OnIntroSetUpdate(const Address& addr, const IntroSet* i,
const RouterID& endpoint);
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
std::unordered_map< Introduction, llarp_time_t, Introduction::Hash >
m_BadIntros;
llarp_time_t lastShift = 0;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
};
// passed a sendto context when we have a path established otherwise
// nullptr if the path was not made before the timeout
using PathEnsureHook = std::function< void(Address, OutboundContext*) >;
@ -376,6 +274,9 @@ namespace llarp
virtual void
IntroSetPublished();
uint64_t
GenTXID();
protected:
/// parent context that owns this endpoint
Context* const context;
@ -423,9 +324,6 @@ namespace llarp
return false;
}
uint64_t
GenTXID();
protected:
IDataHandler* m_DataHandler = nullptr;
Identity m_Identity;

@ -0,0 +1,45 @@
#include <service/hidden_service_address_lookup.hpp>
#include <dht/messages/findintro.hpp>
#include <service/endpoint.hpp>
namespace llarp
{
namespace service
{
HiddenServiceAddressLookup::HiddenServiceAddressLookup(Endpoint* p,
HandlerFunc h,
const Address& addr,
uint64_t tx)
: IServiceLookup(p, tx, "HSLookup"), remote(addr), handle(h)
{
}
bool
HiddenServiceAddressLookup::HandleResponse(
const std::set< IntroSet >& results)
{
LogInfo("found ", results.size(), " for ", remote.ToString());
if(results.size() > 0)
{
IntroSet selected;
for(const auto& introset : results)
{
if(selected.OtherIsNewer(introset) && introset.A.Addr() == remote)
selected = introset;
}
return handle(remote, &selected, endpoint);
}
return handle(remote, nullptr, endpoint);
}
routing::IMessage*
HiddenServiceAddressLookup::BuildRequestMessage()
{
routing::DHTMessage* msg = new routing::DHTMessage();
msg->M.emplace_back(new dht::FindIntroMessage(txid, remote, 0));
return msg;
}
} // namespace service
} // namespace llarp

@ -0,0 +1,36 @@
#ifndef LLARP_SERVICE_HIDDEN_SERVICE_ADDRESS_LOOKUP_HPP
#define LLARP_SERVICE_HIDDEN_SERVICE_ADDRESS_LOOKUP_HPP
#include <messages/dht.hpp>
#include <service/IntroSet.hpp>
#include <service/lookup.hpp>
namespace llarp
{
namespace service
{
struct Endpoint;
struct HiddenServiceAddressLookup : public IServiceLookup
{
Address remote;
using HandlerFunc = std::function< bool(const Address&, const IntroSet*,
const RouterID&) >;
HandlerFunc handle;
HiddenServiceAddressLookup(Endpoint* p, HandlerFunc h,
const Address& addr, uint64_t tx);
~HiddenServiceAddressLookup()
{
}
bool
HandleResponse(const std::set< IntroSet >& results);
routing::IMessage*
BuildRequestMessage();
};
} // namespace service
} // namespace llarp
#endif

@ -0,0 +1,545 @@
#include <service/outbound_context.hpp>
#include <router/abstractrouter.hpp>
#include <service/async_key_exchange.hpp>
#include <service/hidden_service_address_lookup.hpp>
#include <service/endpoint.hpp>
#include <nodedb.hpp>
#include <profiling.hpp>
namespace llarp
{
namespace service
{
bool
OutboundContext::Stop()
{
markedBad = true;
return path::Builder::Stop();
}
bool
OutboundContext::IsDone(llarp_time_t now) const
{
(void)now;
return AvailablePaths(path::ePathRoleAny) == 0 && ShouldRemove();
}
bool
OutboundContext::ShouldBundleRC() const
{
return m_Endpoint->ShouldBundleRC();
}
bool
OutboundContext::HandleDataDrop(path::Path* p, const PathID_t& dst,
uint64_t seq)
{
// pick another intro
if(dst == remoteIntro.pathID && remoteIntro.router == p->Endpoint())
{
LogWarn(Name(), " message ", seq, " dropped by endpoint ",
p->Endpoint(), " via ", dst);
if(MarkCurrentIntroBad(Now()))
{
LogInfo(Name(), " switched intros to ", remoteIntro.router, " via ",
remoteIntro.pathID);
}
UpdateIntroSet(true);
}
return true;
}
OutboundContext::OutboundContext(const IntroSet& introset, Endpoint* parent)
: path::Builder(parent->Router(), parent->Router()->dht(), 3,
path::default_len)
, SendContext(introset.A, {}, this, parent)
, currentIntroSet(introset)
{
updatingIntroSet = false;
for(const auto intro : introset.I)
{
if(intro.expiresAt > m_NextIntro.expiresAt)
m_NextIntro = intro;
}
}
OutboundContext::~OutboundContext()
{
}
/// actually swap intros
void
OutboundContext::SwapIntros()
{
remoteIntro = m_NextIntro;
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
}
bool
OutboundContext::OnIntroSetUpdate(__attribute__((unused))
const Address& addr,
const IntroSet* i,
const RouterID& endpoint)
{
if(markedBad)
return true;
updatingIntroSet = false;
if(i)
{
if(currentIntroSet.T >= i->T)
{
LogInfo("introset is old, dropping");
return true;
}
auto now = Now();
if(i->IsExpired(now))
{
LogError("got expired introset from lookup from ", endpoint);
return true;
}
currentIntroSet = *i;
if(!ShiftIntroduction())
{
LogWarn("failed to pick new intro during introset update");
}
if(GetPathByRouter(m_NextIntro.router) == nullptr)
BuildOneAlignedTo(m_NextIntro.router);
else
SwapIntros();
}
else
++m_LookupFails;
return true;
}
bool
OutboundContext::ReadyToSend() const
{
return (!remoteIntro.router.IsZero())
&& GetPathByRouter(remoteIntro.router) != nullptr;
}
void
OutboundContext::HandlePathBuilt(path::Path* p)
{
path::Builder::HandlePathBuilt(p);
/// don't use it if we are marked bad
if(markedBad)
return;
p->SetDataHandler(std::bind(&OutboundContext::HandleHiddenServiceFrame,
this, std::placeholders::_1,
std::placeholders::_2));
p->SetDropHandler(std::bind(&OutboundContext::HandleDataDrop, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
}
bool
OutboundContext::BuildOneAlignedTo(const RouterID& remote)
{
LogInfo(Name(), " building path to ", remote);
auto nodedb = m_Endpoint->Router()->nodedb();
std::vector< RouterContact > hops;
hops.resize(numHops);
for(size_t hop = 0; hop < numHops; ++hop)
{
if(hop == 0)
{
if(!SelectHop(nodedb, hops[0], hops[0], 0, path::ePathRoleAny))
return false;
}
else if(hop == numHops - 1)
{
// last hop
if(!nodedb->Get(remote, hops[hop]))
return false;
}
// middle hop
else
{
size_t tries = 5;
do
{
nodedb->select_random_hop_excluding(hops[hop],
{hops[hop - 1].pubkey, remote});
--tries;
} while(m_Endpoint->Router()->routerProfiling().IsBadForPath(
hops[hop].pubkey)
&& tries > 0);
return tries > 0;
}
return false;
}
Build(hops);
return true;
}
void
OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload,
__attribute__((unused)) ProtocolType t)
{
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if(path == nullptr)
{
// try parent as fallback
path = m_Endpoint->GetPathByRouter(remoteIntro.router);
if(path == nullptr)
{
BuildOneAlignedTo(remoteIntro.router);
LogWarn(Name(), " dropping intro frame, no path to ",
remoteIntro.router);
return;
}
}
currentConvoTag.Randomize();
AsyncKeyExchange* ex = new AsyncKeyExchange(
m_Endpoint->RouterLogic(), m_Endpoint->Crypto(), remoteIdent,
m_Endpoint->GetIdentity(), currentIntroSet.K, remoteIntro,
m_DataHandler, currentConvoTag);
ex->hook = std::bind(&OutboundContext::Send, this, std::placeholders::_1);
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
ex->frame.F = ex->msg.introReply.pathID;
llarp_threadpool_queue_job(m_Endpoint->Worker(),
{ex, &AsyncKeyExchange::Encrypt});
}
std::string
OutboundContext::Name() const
{
return "OBContext:" + m_Endpoint->Name() + "-"
+ currentIntroSet.A.Addr().ToString();
}
void
OutboundContext::UpdateIntroSet(bool randomizePath)
{
if(updatingIntroSet || markedBad)
return;
auto addr = currentIntroSet.A.Addr();
path::Path* path = nullptr;
if(randomizePath)
path = m_Endpoint->PickRandomEstablishedPath();
else
path = m_Endpoint->GetEstablishedPathClosestTo(addr.as_array());
if(path)
{
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
m_Endpoint,
std::bind(&OutboundContext::OnIntroSetUpdate, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3),
addr, m_Endpoint->GenTXID());
updatingIntroSet = job->SendRequestViaPath(path, m_Endpoint->Router());
}
else
{
LogWarn("Cannot update introset no path for outbound session to ",
currentIntroSet.A.Addr().ToString());
}
}
util::StatusObject
OutboundContext::ExtractStatus() const
{
auto obj = path::Builder::ExtractStatus();
obj.Put("currentConvoTag", currentConvoTag.ToHex());
obj.Put("remoteIntro", remoteIntro.ExtractStatus());
obj.Put("sessionCreatedAt", createdAt);
obj.Put("lastGoodSend", lastGoodSend);
obj.Put("seqno", sequenceNo);
obj.Put("markedBad", markedBad);
obj.Put("lastShift", lastShift);
obj.Put("remoteIdentity", remoteIdent.Addr().ToString());
obj.Put("currentRemoteIntroset", currentIntroSet.ExtractStatus());
obj.Put("nextIntro", m_NextIntro.ExtractStatus());
std::vector< util::StatusObject > badIntrosObj;
std::transform(m_BadIntros.begin(), m_BadIntros.end(),
std::back_inserter(badIntrosObj),
[](const auto& item) -> util::StatusObject {
util::StatusObject o{
{"count", item.second},
{"intro", item.first.ExtractStatus()}};
return o;
});
obj.Put("badIntros", badIntrosObj);
return obj;
}
bool
OutboundContext::Tick(llarp_time_t now)
{
// we are probably dead af
if(m_LookupFails > 16 || m_BuildFails > 10)
return true;
// check for expiration
if(remoteIntro.ExpiresSoon(now))
{
// shift intro if it expires "soon"
ShiftIntroduction();
}
// swap if we can
if(remoteIntro != m_NextIntro)
{
if(GetPathByRouter(m_NextIntro.router) != nullptr)
{
// we can safely set remoteIntro to the next one
SwapIntros();
LogInfo(Name(), " swapped intro");
}
}
// lookup router in intro if set and unknown
if(!remoteIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
// expire bad intros
auto itr = m_BadIntros.begin();
while(itr != m_BadIntros.end())
{
if(now - itr->second > path::default_lifetime)
itr = m_BadIntros.erase(itr);
else
++itr;
}
// send control message if we look too quiet
if(lastGoodSend)
{
if(now - lastGoodSend > (sendTimeout / 2))
{
if(!GetNewestPathByRouter(remoteIntro.router))
{
BuildOneAlignedTo(remoteIntro.router);
}
Encrypted< 64 > tmp;
tmp.Randomize();
llarp_buffer_t buf(tmp.data(), tmp.size());
AsyncEncryptAndSendTo(buf, eProtocolControl);
SharedSecret k;
if(currentConvoTag.IsZero())
return false;
return !m_DataHandler->HasConvoTag(currentConvoTag);
}
}
// if we are dead return true so we are removed
return lastGoodSend
? (now >= lastGoodSend && now - lastGoodSend > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
}
bool
OutboundContext::SelectHop(llarp_nodedb* db, const RouterContact& prev,
RouterContact& cur, size_t hop,
path::PathRole roles)
{
if(remoteIntro.router.IsZero())
{
SwapIntros();
}
if(hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
if(db->Get(remoteIntro.router, cur))
return true;
++m_BuildFails;
return false;
}
else if(hop == numHops - 2)
{
return db->select_random_hop_excluding(
cur, {prev.pubkey, remoteIntro.router});
}
return path::Builder::SelectHop(db, prev, cur, hop, roles);
}
bool
OutboundContext::ShouldBuildMore(llarp_time_t now) const
{
if(markedBad)
return false;
if(path::Builder::ShouldBuildMore(now))
return true;
return !ReadyToSend();
}
bool
OutboundContext::MarkCurrentIntroBad(llarp_time_t now)
{
// insert bad intro
m_BadIntros[remoteIntro] = now;
// unconditional shift
bool shiftedRouter = false;
bool shiftedIntro = false;
// try same router
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
if(router->routerProfiling().IsBadForPath(intro.router))
continue;
auto itr = m_BadIntros.find(intro);
if(itr == m_BadIntros.end() && intro.router == m_NextIntro.router)
{
shiftedIntro = true;
m_NextIntro = intro;
break;
}
}
if(!shiftedIntro)
{
// try any router
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
auto itr = m_BadIntros.find(intro);
if(itr == m_BadIntros.end())
{
// TODO: this should always be true but idk if it really is
shiftedRouter = m_NextIntro.router != intro.router;
shiftedIntro = true;
m_NextIntro = intro;
break;
}
}
}
if(shiftedRouter)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
}
else if(shiftedIntro)
{
SwapIntros();
}
else
{
LogInfo(Name(), " updating introset");
UpdateIntroSet(true);
}
return shiftedIntro;
}
bool
OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
if(now - lastShift < MIN_SHIFT_INTERVAL)
return false;
bool shifted = false;
// to find a intro on the same router as before
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end()
&& remoteIntro.router == intro.router)
{
m_NextIntro = intro;
return true;
}
}
for(const auto& intro : currentIntroSet.I)
{
m_Endpoint->EnsureRouterIsKnown(intro.router);
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end() && m_NextIntro != intro)
{
shifted = intro.router != m_NextIntro.router
|| (now < intro.expiresAt
&& intro.expiresAt - now
> 10 * 1000); // TODO: hardcoded value
m_NextIntro = intro;
success = true;
break;
}
}
if(shifted && rebuild)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
}
return success;
}
void
OutboundContext::HandlePathDied(path::Path* path)
{
// unconditionally update introset
UpdateIntroSet(true);
const RouterID endpoint(path->Endpoint());
// if a path to our current intro died...
if(endpoint == remoteIntro.router)
{
// figure out how many paths to this router we have
size_t num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == endpoint && p->IsReady())
++num;
});
// if we have more than two then we are probably fine
if(num > 2)
return;
// if we have one working one ...
if(num == 1)
{
num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == endpoint)
++num;
});
// if we have 2 or more established or pending don't do anything
if(num > 2)
return;
BuildOneAlignedTo(endpoint);
}
else if(num == 0)
{
// we have no paths to this router right now
// hop off it
Introduction picked;
// get the latest intro that isn't on that endpoint
for(const auto& intro : currentIntroSet.I)
{
if(intro.router == endpoint)
continue;
if(intro.expiresAt > picked.expiresAt)
picked = intro;
}
// we got nothing
if(picked.router.IsZero())
{
return;
}
m_NextIntro = picked;
// check if we have a path to this router
num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == m_NextIntro.router)
++num;
});
// build a path if one isn't already pending build or established
if(num == 0)
BuildOneAlignedTo(m_NextIntro.router);
SwapIntros();
}
}
}
bool
OutboundContext::HandleHiddenServiceFrame(path::Path* p,
const ProtocolFrame* frame)
{
return m_Endpoint->HandleHiddenServiceFrame(p, frame);
}
} // namespace service
} // namespace llarp

@ -0,0 +1,116 @@
#ifndef LLARP_SERVICE_OUTBOUND_CONTEXT_HPP
#define LLARP_SERVICE_OUTBOUND_CONTEXT_HPP
#include <path/pathbuilder.hpp>
#include <service/sendcontext.hpp>
#include <util/status.hpp>
#include <unordered_map>
namespace llarp
{
namespace service
{
struct AsyncKeyExchange;
struct Endpoint;
/// context needed to initiate an outbound hidden service session
struct OutboundContext : public path::Builder, public SendContext
{
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext();
util::StatusObject
ExtractStatus() const;
bool
ShouldBundleRC() const override;
bool
Stop() override;
bool
HandleDataDrop(path::Path* p, const PathID_t& dst, uint64_t s);
void
HandlePathDied(path::Path* p) override;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool
MarkCurrentIntroBad(llarp_time_t now) override;
/// return true if we are ready to send
bool
ReadyToSend() const;
bool
ShouldBuildMore(llarp_time_t now) const override;
/// tick internal state
/// return true to mark as dead
bool
Tick(llarp_time_t now);
/// return true if it's safe to remove ourselves
bool
IsDone(llarp_time_t now) const;
bool
CheckPathIsDead(path::Path* p, llarp_time_t dlt);
void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) override;
/// issues a lookup to find the current intro set of the remote service
void
UpdateIntroSet(bool randomizePath) override;
bool
BuildOneAlignedTo(const RouterID& remote);
void
HandlePathBuilt(path::Path* path) override;
bool
SelectHop(llarp_nodedb* db, const RouterContact& prev, RouterContact& cur,
size_t hop, llarp::path::PathRole roles) override;
bool
HandleHiddenServiceFrame(path::Path* p, const ProtocolFrame* frame);
std::string
Name() const override;
private:
/// swap remoteIntro with next intro
void
SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool
OnIntroSetUpdate(const Address& addr, const IntroSet* i,
const RouterID& endpoint);
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
std::unordered_map< Introduction, llarp_time_t, Introduction::Hash >
m_BadIntros;
llarp_time_t lastShift = 0;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
};
} // namespace service
} // namespace llarp
#endif
Loading…
Cancel
Save