|
|
|
@ -400,7 +400,7 @@ namespace llarp
|
|
|
|
|
{
|
|
|
|
|
for (const auto& item : Sessions())
|
|
|
|
|
{
|
|
|
|
|
if (item.second.remote.Addr() == addr && item.second.inbound)
|
|
|
|
|
if (item.second.remote.Addr() == addr and item.second.inbound)
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
@ -420,9 +420,24 @@ namespace llarp
|
|
|
|
|
void
|
|
|
|
|
Endpoint::PutSenderFor(const ConvoTag& tag, const ServiceInfo& info, bool inbound)
|
|
|
|
|
{
|
|
|
|
|
if (info.Addr().IsZero())
|
|
|
|
|
{
|
|
|
|
|
LogError(Name(), " cannot put invalid service info ", info, " T=", tag);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
auto itr = Sessions().find(tag);
|
|
|
|
|
if (itr == Sessions().end())
|
|
|
|
|
{
|
|
|
|
|
if (WantsOutboundSession(info.Addr()) and inbound)
|
|
|
|
|
{
|
|
|
|
|
LogWarn(
|
|
|
|
|
Name(),
|
|
|
|
|
" not adding sender for ",
|
|
|
|
|
info.Addr(),
|
|
|
|
|
" session is inbound and we want outbound T=",
|
|
|
|
|
tag);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
itr = Sessions().emplace(tag, Session{}).first;
|
|
|
|
|
itr->second.inbound = inbound;
|
|
|
|
|
itr->second.remote = info;
|
|
|
|
@ -587,7 +602,10 @@ namespace llarp
|
|
|
|
|
bool
|
|
|
|
|
Endpoint::PublishIntroSet(const EncryptedIntroSet& introset, AbstractRouter* r)
|
|
|
|
|
{
|
|
|
|
|
const auto paths = GetManyPathsWithUniqueEndpoints(this, llarp::dht::IntroSetRelayRedundancy);
|
|
|
|
|
const auto paths = GetManyPathsWithUniqueEndpoints(
|
|
|
|
|
this,
|
|
|
|
|
llarp::dht::IntroSetRelayRedundancy,
|
|
|
|
|
dht::Key_t{introset.derivedSigningKey.as_array()});
|
|
|
|
|
|
|
|
|
|
if (paths.size() != llarp::dht::IntroSetRelayRedundancy)
|
|
|
|
|
{
|
|
|
|
@ -917,6 +935,7 @@ namespace llarp
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
constexpr size_t min_unique_lns_endpoints = 2;
|
|
|
|
|
constexpr size_t max_unique_lns_endpoints = 7;
|
|
|
|
|
|
|
|
|
|
// not enough paths
|
|
|
|
|
if (paths.size() < min_unique_lns_endpoints)
|
|
|
|
@ -951,10 +970,16 @@ namespace llarp
|
|
|
|
|
handler(result);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// pick up to max_unique_lns_endpoints random paths to do lookups from
|
|
|
|
|
std::vector<path::Path_ptr> chosenpaths;
|
|
|
|
|
chosenpaths.insert(chosenpaths.begin(), paths.begin(), paths.end());
|
|
|
|
|
std::shuffle(chosenpaths.begin(), chosenpaths.end(), CSRNG{});
|
|
|
|
|
chosenpaths.resize(std::min(paths.size(), max_unique_lns_endpoints));
|
|
|
|
|
|
|
|
|
|
auto resultHandler =
|
|
|
|
|
m_state->lnsTracker.MakeResultHandler(name, paths.size(), maybeInvalidateCache);
|
|
|
|
|
m_state->lnsTracker.MakeResultHandler(name, chosenpaths.size(), maybeInvalidateCache);
|
|
|
|
|
|
|
|
|
|
for (const auto& path : paths)
|
|
|
|
|
for (const auto& path : chosenpaths)
|
|
|
|
|
{
|
|
|
|
|
LogInfo(Name(), " lookup ", name, " from ", path->Endpoint());
|
|
|
|
|
auto job = new LookupNameJob{this, GenTXID(), name, resultHandler};
|
|
|
|
@ -1075,18 +1100,15 @@ namespace llarp
|
|
|
|
|
|
|
|
|
|
bool
|
|
|
|
|
Endpoint::HandleDataMessage(
|
|
|
|
|
path::Path_ptr path, const PathID_t from, std::shared_ptr<ProtocolMessage> msg)
|
|
|
|
|
path::Path_ptr, const PathID_t from, std::shared_ptr<ProtocolMessage> msg)
|
|
|
|
|
{
|
|
|
|
|
msg->sender.UpdateAddr();
|
|
|
|
|
PutSenderFor(msg->tag, msg->sender, true);
|
|
|
|
|
PutReplyIntroFor(msg->tag, path->intro);
|
|
|
|
|
Introduction intro;
|
|
|
|
|
intro.pathID = from;
|
|
|
|
|
intro.router = PubKey{path->Endpoint()};
|
|
|
|
|
intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt);
|
|
|
|
|
intro.latency = path->intro.latency;
|
|
|
|
|
PutIntroFor(msg->tag, intro);
|
|
|
|
|
PutReplyIntroFor(msg->tag, path->intro);
|
|
|
|
|
Introduction intro = msg->introReply;
|
|
|
|
|
if (HasInboundConvo(msg->sender.Addr()))
|
|
|
|
|
{
|
|
|
|
|
intro.pathID = from;
|
|
|
|
|
}
|
|
|
|
|
PutReplyIntroFor(msg->tag, intro);
|
|
|
|
|
ConvoTagRX(msg->tag);
|
|
|
|
|
return ProcessDataMessage(msg);
|
|
|
|
|
}
|
|
|
|
@ -1429,7 +1451,7 @@ namespace llarp
|
|
|
|
|
if (not m_IntrosetLookupFilter.Insert(remote))
|
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups, remote.ToKey());
|
|
|
|
|
const auto paths = GetManyPathsWithUniqueEndpoints(this, NumParallelLookups);
|
|
|
|
|
|
|
|
|
|
using namespace std::placeholders;
|
|
|
|
|
const dht::Key_t location = remote.ToKey();
|
|
|
|
@ -1453,7 +1475,7 @@ namespace llarp
|
|
|
|
|
path->Endpoint(),
|
|
|
|
|
order,
|
|
|
|
|
GenTXID(),
|
|
|
|
|
timeout + (2 * path->intro.latency));
|
|
|
|
|
timeout + (2 * path->intro.latency) + IntrosetLookupGraceInterval);
|
|
|
|
|
LogInfo(
|
|
|
|
|
"doing lookup for ",
|
|
|
|
|
remote,
|
|
|
|
@ -1791,12 +1813,9 @@ namespace llarp
|
|
|
|
|
LogTrace("SendToOrQueue: dropping because data.sz == 0");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// inbound conversation
|
|
|
|
|
const auto now = Now();
|
|
|
|
|
|
|
|
|
|
if (HasInboundConvo(remote))
|
|
|
|
|
{
|
|
|
|
|
// inbound conversation
|
|
|
|
|
LogTrace("Have inbound convo");
|
|
|
|
|
auto transfer = std::make_shared<routing::PathTransferMessage>();
|
|
|
|
|
ProtocolFrame& f = transfer->T;
|
|
|
|
@ -1805,87 +1824,86 @@ namespace llarp
|
|
|
|
|
if (const auto maybe = GetBestConvoTagFor(remote))
|
|
|
|
|
{
|
|
|
|
|
// the remote guy's intro
|
|
|
|
|
Introduction remoteIntro;
|
|
|
|
|
Introduction replyPath;
|
|
|
|
|
Introduction replyIntro;
|
|
|
|
|
SharedSecret K;
|
|
|
|
|
const auto tag = *maybe;
|
|
|
|
|
|
|
|
|
|
if (!GetCachedSessionKeyFor(tag, K))
|
|
|
|
|
if (not GetCachedSessionKeyFor(tag, K))
|
|
|
|
|
{
|
|
|
|
|
LogError("no cached key for T=", tag);
|
|
|
|
|
LogError(Name(), " no cached key for inbound session from ", remote, " T=", tag);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (!GetIntroFor(tag, remoteIntro))
|
|
|
|
|
if (not GetReplyIntroFor(tag, replyIntro))
|
|
|
|
|
{
|
|
|
|
|
LogError("no intro for T=", tag);
|
|
|
|
|
LogError(Name(), "no reply intro for inbound session from ", remote, " T=", tag);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (GetReplyIntroFor(tag, replyPath))
|
|
|
|
|
// get path for intro
|
|
|
|
|
auto p = GetPathByRouter(replyIntro.router);
|
|
|
|
|
|
|
|
|
|
if (not p)
|
|
|
|
|
{
|
|
|
|
|
// get path for intro
|
|
|
|
|
ForEachPath([&](path::Path_ptr path) {
|
|
|
|
|
if (path->intro == replyPath)
|
|
|
|
|
{
|
|
|
|
|
p = path;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (p && p->ExpiresSoon(now) && path->IsReady()
|
|
|
|
|
&& path->intro.router == replyPath.router)
|
|
|
|
|
{
|
|
|
|
|
p = path;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
LogWarn(
|
|
|
|
|
Name(),
|
|
|
|
|
" has no path for intro router ",
|
|
|
|
|
RouterID{replyIntro.router},
|
|
|
|
|
" for inbound convo T=",
|
|
|
|
|
tag);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
p = GetPathByRouter(remoteIntro.router);
|
|
|
|
|
|
|
|
|
|
if (p)
|
|
|
|
|
f.T = tag;
|
|
|
|
|
// TODO: check expiration of our end
|
|
|
|
|
auto m = std::make_shared<ProtocolMessage>(f.T);
|
|
|
|
|
m->PutBuffer(data);
|
|
|
|
|
f.N.Randomize();
|
|
|
|
|
f.C.Zero();
|
|
|
|
|
f.R = 0;
|
|
|
|
|
transfer->Y.Randomize();
|
|
|
|
|
m->proto = t;
|
|
|
|
|
m->introReply = p->intro;
|
|
|
|
|
m->sender = m_Identity.pub;
|
|
|
|
|
if (auto maybe = GetSeqNoForConvo(f.T))
|
|
|
|
|
{
|
|
|
|
|
f.T = tag;
|
|
|
|
|
// TODO: check expiration of our end
|
|
|
|
|
auto m = std::make_shared<ProtocolMessage>(f.T);
|
|
|
|
|
m->PutBuffer(data);
|
|
|
|
|
f.N.Randomize();
|
|
|
|
|
f.C.Zero();
|
|
|
|
|
f.R = 0;
|
|
|
|
|
transfer->Y.Randomize();
|
|
|
|
|
m->proto = t;
|
|
|
|
|
m->introReply = p->intro;
|
|
|
|
|
PutReplyIntroFor(f.T, m->introReply);
|
|
|
|
|
m->sender = m_Identity.pub;
|
|
|
|
|
if (auto maybe = GetSeqNoForConvo(f.T))
|
|
|
|
|
{
|
|
|
|
|
m->seqno = *maybe;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
LogWarn(Name(), " no session T=", f.T);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
f.S = m->seqno;
|
|
|
|
|
f.F = m->introReply.pathID;
|
|
|
|
|
transfer->P = remoteIntro.pathID;
|
|
|
|
|
auto self = this;
|
|
|
|
|
Router()->QueueWork([transfer, p, m, K, self]() {
|
|
|
|
|
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
|
|
|
|
|
{
|
|
|
|
|
LogError("failed to encrypt and sign");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
|
|
|
|
|
});
|
|
|
|
|
return true;
|
|
|
|
|
m->seqno = *maybe;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
LogTrace("SendToOrQueue failed to return via inbound: no path");
|
|
|
|
|
LogWarn(Name(), " could not set sequence number, no session T=", f.T);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
f.S = m->seqno;
|
|
|
|
|
f.F = p->intro.pathID;
|
|
|
|
|
transfer->P = replyIntro.pathID;
|
|
|
|
|
auto self = this;
|
|
|
|
|
Router()->QueueWork([transfer, p, m, K, self]() {
|
|
|
|
|
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
|
|
|
|
|
{
|
|
|
|
|
LogError("failed to encrypt and sign for sessionn T=", transfer->T.T);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
|
|
|
|
|
});
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
LogWarn("Have inbound convo from ", remote, " but get-best returned none; bug?");
|
|
|
|
|
LogWarn(
|
|
|
|
|
Name(),
|
|
|
|
|
" SendToOrQueue on inbound convo from ",
|
|
|
|
|
remote,
|
|
|
|
|
" but get-best returned none; bug?");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (not WantsOutboundSession(remote))
|
|
|
|
|
{
|
|
|
|
|
LogWarn(
|
|
|
|
|
Name(),
|
|
|
|
|
" SendToOrQueue on outbound session we did not mark as outbound (remote=",
|
|
|
|
|
remote,
|
|
|
|
|
")");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Failed to find a suitable inbound convo, look for outbound
|
|
|
|
|
LogTrace("Not an inbound convo");
|
|
|
|
@ -1900,34 +1918,28 @@ namespace llarp
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// if we want to make an outbound session
|
|
|
|
|
if (WantsOutboundSession(remote))
|
|
|
|
|
{
|
|
|
|
|
LogTrace("Making an outbound session and queuing the data");
|
|
|
|
|
// add pending traffic
|
|
|
|
|
auto& traffic = m_state->m_PendingTraffic;
|
|
|
|
|
traffic[remote].emplace_back(data, t);
|
|
|
|
|
EnsurePathToService(
|
|
|
|
|
remote,
|
|
|
|
|
[self = this](Address addr, OutboundContext* ctx) {
|
|
|
|
|
if (ctx)
|
|
|
|
|
{
|
|
|
|
|
for (auto& pending : self->m_state->m_PendingTraffic[addr])
|
|
|
|
|
{
|
|
|
|
|
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
LogTrace("Making an outbound session and queuing the data");
|
|
|
|
|
// add pending traffic
|
|
|
|
|
auto& traffic = m_state->m_PendingTraffic;
|
|
|
|
|
traffic[remote].emplace_back(data, t);
|
|
|
|
|
EnsurePathToService(
|
|
|
|
|
remote,
|
|
|
|
|
[self = this](Address addr, OutboundContext* ctx) {
|
|
|
|
|
if (ctx)
|
|
|
|
|
{
|
|
|
|
|
for (auto& pending : self->m_state->m_PendingTraffic[addr])
|
|
|
|
|
{
|
|
|
|
|
LogWarn("no path made to ", addr);
|
|
|
|
|
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
|
|
|
|
|
}
|
|
|
|
|
self->m_state->m_PendingTraffic.erase(addr);
|
|
|
|
|
},
|
|
|
|
|
PathAlignmentTimeout());
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
LogDebug("SendOrQueue failed: no inbound/outbound sessions");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
LogWarn("no path made to ", addr);
|
|
|
|
|
}
|
|
|
|
|
self->m_state->m_PendingTraffic.erase(addr);
|
|
|
|
|
},
|
|
|
|
|
PathAlignmentTimeout());
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool
|
|
|
|
|