try fixing handover and add snapp traffic to multithreaded crypto workers

pull/850/head
jeff 5 years ago
parent 5863e33825
commit 6c2ebbb925

@ -108,6 +108,34 @@ namespace llarp
return _role;
}
struct Hash
{
  /// Hash functor for Path: xor-folds the first hop's txID, rxID and
  /// upstream router identity into a single size_t so paths sharing a
  /// first hop hash identically.
  size_t
  operator()(const Path& p) const
  {
    const auto& tx = p.hops[0].txID;
    const auto& rx = p.hops[0].rxID;
    const auto& r = p.hops[0].upstream;
    // seed with size_t{0}, not int 0: accumulate's result type is the
    // type of the initial value, and an int seed would truncate
    const size_t rhash = std::accumulate(r.begin(), r.end(), size_t{0},
                                         std::bit_xor< size_t >());
    // fold rx over its full range (was rx.begin(), rx.begin() — an
    // empty range that silently ignored rxID)
    return std::accumulate(rx.begin(), rx.end(),
                           std::accumulate(tx.begin(), tx.end(), rhash,
                                           std::bit_xor< size_t >()),
                           std::bit_xor< size_t >());
  }
};
struct Ptr_Hash
{
size_t
operator()(const std::shared_ptr< Path >& p) const
{
if(p == nullptr)
return 0;
return Hash{}(*p);
}
};
bool
operator<(const Path& other) const
{

@ -17,6 +17,7 @@ namespace llarp
: logic(std::move(l))
, m_remote(std::move(r))
, m_LocalIdentity(localident)
, frame(std::make_shared< ProtocolFrame >())
, introPubKey(introsetPubKey)
, remoteIntro(remote)
, handler(h)
@ -26,34 +27,31 @@ namespace llarp
}
void
AsyncKeyExchange::Result(void* user)
AsyncKeyExchange::Result(std::shared_ptr< AsyncKeyExchange > self)
{
auto* self = static_cast< AsyncKeyExchange* >(user);
// put values
self->handler->PutSenderFor(self->msg.tag, self->m_remote, false);
self->handler->PutCachedSessionKeyFor(self->msg.tag, self->sharedKey);
self->handler->PutIntroFor(self->msg.tag, self->remoteIntro);
self->handler->PutReplyIntroFor(self->msg.tag, self->msg.introReply);
self->hook(self->frame);
delete self;
}
void
AsyncKeyExchange::Encrypt(void* user)
AsyncKeyExchange::Encrypt(std::shared_ptr< AsyncKeyExchange > self)
{
auto* self = static_cast< AsyncKeyExchange* >(user);
// derive ntru session key component
SharedSecret K;
auto crypto = CryptoManager::instance();
crypto->pqe_encrypt(self->frame.C, K, self->introPubKey);
crypto->pqe_encrypt(self->frame->C, K, self->introPubKey);
// randomize Nonce
self->frame.N.Randomize();
self->frame->N.Randomize();
// compute post-handshake session key
// PKE (A, B, N)
SharedSecret sharedSecret;
path_dh_func dh_client = util::memFn(&Crypto::dh_client, crypto);
if(!self->m_LocalIdentity.KeyExchange(dh_client, sharedSecret,
self->m_remote, self->frame.N))
self->m_remote, self->frame->N))
{
LogError("failed to derive x25519 shared key component");
}
@ -70,12 +68,11 @@ namespace llarp
// set version
self->msg.version = LLARP_PROTO_VERSION;
// encrypt and sign
if(self->frame.EncryptAndSign(self->msg, K, self->m_LocalIdentity))
self->logic->queue_job({self, &Result});
if(self->frame->EncryptAndSign(self->msg, K, self->m_LocalIdentity))
self->logic->queue_func(std::bind(&AsyncKeyExchange::Result, self));
else
{
LogError("failed to encrypt and sign");
delete self;
}
}
} // namespace service

@ -12,17 +12,18 @@ namespace llarp
namespace service
{
struct AsyncKeyExchange
: public std::enable_shared_from_this< AsyncKeyExchange >
{
std::shared_ptr< Logic > logic;
SharedSecret sharedKey;
ServiceInfo m_remote;
const Identity& m_LocalIdentity;
ProtocolMessage msg;
ProtocolFrame frame;
std::shared_ptr< ProtocolFrame > frame;
Introduction intro;
const PQPubKey introPubKey;
Introduction remoteIntro;
std::function< void(ProtocolFrame&) > hook;
std::function< void(std::shared_ptr< ProtocolFrame >) > hook;
IDataHandler* handler;
ConvoTag tag;
@ -33,11 +34,11 @@ namespace llarp
const ConvoTag& t, ProtocolType proto);
static void
Result(void* user);
Result(std::shared_ptr< AsyncKeyExchange > user);
/// given protocol message make protocol frame
static void
Encrypt(void* user);
Encrypt(std::shared_ptr< AsyncKeyExchange > user);
};
} // namespace service

@ -1132,30 +1132,30 @@ namespace llarp
if(p)
{
// TODO: check expiration of our end
ProtocolMessage m(f.T);
m.PutBuffer(data);
auto m = std::make_shared< ProtocolMessage >(f.T);
m->PutBuffer(data);
f.N.Randomize();
f.C.Zero();
transfer->Y.Randomize();
m.proto = t;
m.introReply = p->intro;
PutReplyIntroFor(f.T, m.introReply);
m.sender = m_Identity.pub;
m.seqno = GetSeqNoForConvo(f.T);
m->proto = t;
m->introReply = p->intro;
PutReplyIntroFor(f.T, m->introReply);
m->sender = m_Identity.pub;
m->seqno = GetSeqNoForConvo(f.T);
f.S = 1;
f.F = m.introReply.pathID;
f.F = m->introReply.pathID;
transfer->P = remoteIntro.pathID;
if(!f.EncryptAndSign(m, K, m_Identity))
{
LogError("failed to encrypt and sign");
return false;
}
LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router);
{
util::Lock lock(&m_state->m_SendQueueMutex);
m_state->m_SendQueue.emplace_back(transfer, p);
}
return true;
auto self = this;
return CryptoWorker()->addJob([transfer, p, m, K, self]() {
if(not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
{
LogError("failed to encrypt and sign");
return;
}
util::Lock lock(&self->m_state->m_SendQueueMutex);
self->m_state->m_SendQueue.emplace_back(transfer, p);
});
}
}
}
@ -1193,7 +1193,7 @@ namespace llarp
}
m_state->m_PendingTraffic.erase(r);
},
5000, true);
5000);
}
bool

@ -166,7 +166,7 @@ namespace llarp
{
if(itr->second.remote.Addr() == info)
{
if(tags.insert(itr->first).second)
if(tags.emplace(itr->first).second)
{
inserted = true;
}

@ -19,7 +19,7 @@ namespace llarp
{
struct ILookupHolder;
constexpr size_t MaxConcurrentLookups = size_t(4);
constexpr size_t MaxConcurrentLookups = size_t(16);
struct IServiceLookup
{

@ -64,6 +64,7 @@ namespace llarp
if(intro.expiresAt > m_NextIntro.expiresAt)
m_NextIntro = intro;
}
currentConvoTag.Randomize();
}
OutboundContext::~OutboundContext() = default;
@ -160,8 +161,7 @@ namespace llarp
return;
}
}
currentConvoTag.Randomize();
AsyncKeyExchange* ex = new AsyncKeyExchange(
auto ex = std::make_shared< AsyncKeyExchange >(
m_Endpoint->RouterLogic(), remoteIdent, m_Endpoint->GetIdentity(),
currentIntroSet.K, remoteIntro, m_DataHandler, currentConvoTag, t);
@ -170,7 +170,7 @@ namespace llarp
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
ex->frame.F = ex->msg.introReply.pathID;
ex->frame->F = ex->msg.introReply.pathID;
m_Endpoint->CryptoWorker()->addJob(
std::bind(&AsyncKeyExchange::Encrypt, ex));
}
@ -245,7 +245,8 @@ namespace llarp
if(remoteIntro.ExpiresSoon(now))
{
// shift intro if it expires "soon"
ShiftIntroduction();
if(ShiftIntroduction())
SwapIntros(); // swap intros if we shifted
}
// lookup router in intro if set and unknown
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
@ -253,7 +254,7 @@ namespace llarp
auto itr = m_BadIntros.begin();
while(itr != m_BadIntros.end())
{
if(now - itr->second > path::default_lifetime)
if(now > itr->second && now - itr->second > path::default_lifetime)
itr = m_BadIntros.erase(itr);
else
++itr;
@ -274,8 +275,6 @@ namespace llarp
tmp.Randomize();
llarp_buffer_t buf(tmp.data(), tmp.size());
AsyncEncryptAndSendTo(buf, eProtocolControl);
if(currentConvoTag.IsZero())
return false;
return !m_DataHandler->HasConvoTag(currentConvoTag);
}
}

@ -5,6 +5,7 @@
#include <service/endpoint.hpp>
#include <util/thread/logic.hpp>
#include <utility>
#include <unordered_set>
namespace llarp
{
@ -23,12 +24,12 @@ namespace llarp
}
bool
SendContext::Send(const ProtocolFrame& msg, path::Path_ptr path)
SendContext::Send(std::shared_ptr< ProtocolFrame > msg, path::Path_ptr path)
{
util::Lock lock(&m_SendQueueMutex);
m_SendQueue.emplace_back(
std::make_shared< const routing::PathTransferMessage >(
msg, remoteIntro.pathID),
*msg, remoteIntro.pathID),
path);
return true;
}
@ -37,17 +38,27 @@ namespace llarp
SendContext::FlushUpstream()
{
auto r = m_Endpoint->Router();
util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue)
std::unordered_set< path::Path_ptr, path::Path::Ptr_Hash > flushpaths;
{
if(item.second->SendRoutingMessage(*item.first, r))
util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue)
{
lastGoodSend = r->Now();
if(item.second->SendRoutingMessage(*item.first, r))
{
lastGoodSend = r->Now();
flushpaths.emplace(item.second);
}
else
LogError(m_Endpoint->Name(), " failed to send frame on path");
}
else
LogError(m_Endpoint->Name(), " failed to send frame on path");
m_SendQueue.clear();
}
// flush the selected paths' upstream
for(const auto& path : flushpaths)
{
path->FlushUpstream(r);
}
m_SendQueue.clear();
}
/// send on an established convo tag
@ -55,10 +66,10 @@ namespace llarp
SendContext::EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t)
{
SharedSecret shared;
ProtocolFrame f;
f.N.Randomize();
f.T = currentConvoTag;
f.S = ++sequenceNo;
auto f = std::make_shared< ProtocolFrame >();
f->N.Randomize();
f->T = currentConvoTag;
f->S = ++sequenceNo;
auto path = m_PathSet->GetNewestPathByRouter(remoteIntro.router);
if(!path)
@ -68,29 +79,35 @@ namespace llarp
return;
}
if(!m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
if(!m_DataHandler->GetCachedSessionKeyFor(f->T, shared))
{
LogError(m_Endpoint->Name(),
" has no cached session key on session T=", f.T);
" has no cached session key on session T=", f->T);
return;
}
ProtocolMessage m;
m_DataHandler->PutIntroFor(f.T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f.T, path->intro);
m.proto = t;
m.seqno = m_Endpoint->GetSeqNoForConvo(f.T);
m.introReply = path->intro;
f.F = m.introReply.pathID;
m.sender = m_Endpoint->GetIdentity().pub;
m.tag = f.T;
m.PutBuffer(payload);
if(!f.EncryptAndSign(m, shared, m_Endpoint->GetIdentity()))
{
LogError(m_Endpoint->Name(), " failed to sign message");
return;
}
Send(f, path);
auto m = std::make_shared< ProtocolMessage >();
m_DataHandler->PutIntroFor(f->T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f->T, path->intro);
m->proto = t;
m->seqno = m_Endpoint->GetSeqNoForConvo(f->T);
m->introReply = path->intro;
f->F = m->introReply.pathID;
m->sender = m_Endpoint->GetIdentity().pub;
m->tag = f->T;
m->PutBuffer(payload);
auto self = this;
m_Endpoint->CryptoWorker()->addJob([f, m, shared, path, self]() {
if(!f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity()))
{
LogError(self->m_Endpoint->Name(), " failed to sign message");
return;
}
self->m_Endpoint->RouterLogic()->queue_func([self, f, path]() {
self->Send(f, path);
self->FlushUpstream();
});
});
}
void

@ -29,7 +29,7 @@ namespace llarp
/// queue send a fully encrypted hidden service frame
/// via a path
bool
Send(const ProtocolFrame& f, path::Path_ptr path)
Send(std::shared_ptr< ProtocolFrame > f, path::Path_ptr path)
LOCKS_EXCLUDED(m_SendQueueMutex);
/// flush upstream traffic when in router thread

Loading…
Cancel
Save