more async cryptography

pull/830/head
Jeff Becker 5 years ago
parent 88cde21b9b
commit 4bf6882c8a

@ -744,6 +744,9 @@ struct llarp_ev_loop
virtual int
tick(int ms) = 0;
virtual bool
add_ticker(std::function< void(void) > ticker) = 0;
virtual void
stop() = 0;
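
Note: the hunk above adds a pure-virtual add_ticker hook to llarp_ev_loop so callers can register a std::function that the loop invokes once per iteration; later in this commit Router registers PumpLL through it. A minimal, self-contained C++ sketch of that consumption pattern (EventLoopSketch and RouterSketch are stand-ins, not the llarp types):

    #include <cstdio>
    #include <functional>
    #include <utility>
    #include <vector>

    // stand-in for llarp_ev_loop: collects tickers and runs them once per
    // iteration (the real loop is backed by libuv, see the next hunk)
    struct EventLoopSketch
    {
      std::vector< std::function< void(void) > > tickers;

      bool
      add_ticker(std::function< void(void) > t)
      {
        tickers.push_back(std::move(t));
        return true;
      }

      void
      run_one_iteration()
      {
        // ...poll for I/O here...
        for(auto& t : tickers)
          t();
      }
    };

    struct RouterSketch
    {
      void
      PumpLL()
      {
        std::printf("pump\n");
      }
    };

    int
    main()
    {
      EventLoopSketch loop;
      RouterSketch router;
      loop.add_ticker(std::bind(&RouterSketch::PumpLL, &router));
      loop.run_one_iteration();
      return 0;
    }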

@ -312,6 +312,39 @@ namespace libuv
}
};
struct ticker_glue : public glue
{
std::function< void(void) > func;
ticker_glue(uv_loop_t* loop, std::function< void(void) > tick) : func(tick)
{
m_Ticker.data = this;
uv_check_init(loop, &m_Ticker);
}
static void
OnTick(uv_check_t* t)
{
static_cast< ticker_glue* >(t->data)->func();
}
bool
Start()
{
return uv_check_start(&m_Ticker, &OnTick) != -1;
}
void
Close() override
{
uv_check_stop(&m_Ticker);
m_Ticker.data = nullptr;
delete this;
}
uv_check_t m_Ticker;
};
struct udp_glue : public glue
{
uv_udp_t m_Handle;
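
Note: ticker_glue above adapts a libuv check handle, which libuv fires once per event-loop iteration after the poll phase, to the std::function ticker API. The sketch below is plain libuv, not the project's glue code, and just shows the uv_check_init / uv_check_start mechanism in isolation; the repeating timer is only there to give the loop periodic work so the check callback keeps firing.

    #include <uv.h>
    #include <cstdio>

    static uv_check_t ticker;
    static uv_timer_t timer;
    static int ticks = 0;

    static void
    on_check(uv_check_t*)
    {
      // check handles run after the loop's poll phase, once per iteration
      std::printf("tick %d\n", ++ticks);
      if(ticks >= 3)
      {
        uv_check_stop(&ticker);
        uv_timer_stop(&timer);
      }
    }

    int
    main()
    {
      uv_loop_t* loop = uv_default_loop();
      uv_check_init(loop, &ticker);
      uv_check_start(&ticker, &on_check);
      // the repeating timer keeps each loop iteration short-lived
      uv_timer_init(loop, &timer);
      uv_timer_start(&timer, [](uv_timer_t*) {}, 10, 10);
      uv_run(loop, UV_RUN_DEFAULT);  // returns once both handles are stopped
      return 0;
    }
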
@ -737,6 +770,18 @@ namespace libuv
return false;
}
bool
Loop::add_ticker(std::function< void(void) > func)
{
auto* ticker = new ticker_glue(m_Impl.get(), func);
if(ticker->Start())
{
return true;
}
delete ticker;
return false;
}
bool
Loop::udp_close(llarp_udp_io* udp)
{

@ -78,6 +78,9 @@ namespace libuv
return nullptr;
}
bool
add_ticker(std::function< void(void) > ticker) override;
/// register event listener
bool
add_ev(llarp::ev_io*, bool) override
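
Note: Loop::add_ticker earlier in this file allocates the ticker_glue with new and never stores it anywhere else; the raw pointer lives in the handle's data field and the object frees itself in Close(). A generic sketch of that self-owning callback pattern, with plain stand-in types rather than libuv handles:

    #include <cstdio>
    #include <functional>
    #include <utility>

    struct TickerGlueSketch
    {
      std::function< void(void) > func;

      explicit TickerGlueSketch(std::function< void(void) > f)
          : func(std::move(f))
      {
      }

      // the loop only ever sees an opaque pointer, like uv_check_t::data
      static void
      OnTick(void* data)
      {
        static_cast< TickerGlueSketch* >(data)->func();
      }

      // nothing else owns the object, so teardown frees it here
      void
      Close()
      {
        delete this;
      }
    };

    int
    main()
    {
      auto* glue = new TickerGlueSketch([] { std::printf("tick\n"); });
      TickerGlueSketch::OnTick(glue);  // what the loop does each iteration
      glue->Close();                   // what loop shutdown does, exactly once
      return 0;
    }

The trade-off in this pattern is that the object's lifetime hinges on the loop calling Close() exactly once; no other code may delete it.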

@ -14,14 +14,10 @@ namespace llarp
: ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout,
closed)
, permitInbound{allowInbound}
, m_CryptoWorker(4, 1024 * 8, "iwp-worker")
{
}
LinkLayer::~LinkLayer()
{
m_CryptoWorker.stop();
}
LinkLayer::~LinkLayer() = default;
void
LinkLayer::Pump()
@ -73,21 +69,7 @@ namespace llarp
void
LinkLayer::QueueWork(std::function< void(void) > func)
{
m_CryptoWorker.addJob(func);
}
bool
LinkLayer::Start(std::shared_ptr< Logic > l)
{
if(!ILinkLayer::Start(l))
return false;
return m_CryptoWorker.start();
}
void
LinkLayer::Stop()
{
ILinkLayer::Stop();
m_Worker->addJob(func);
}
void
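
Note: the two iwp hunks above drop the per-link ThreadPool (m_CryptoWorker) and make QueueWork forward to a worker pool handed in from outside, so crypto jobs from the link layers go onto a pool the router owns rather than one pool per link; the path hunks later in this commit queue onto the router's thread pool in the same way. As a rough, self-contained stand-in for such a shared pool (this is not llarp::thread::ThreadPool, just the same addJob shape over std::thread):

    #include <condition_variable>
    #include <cstdio>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    class WorkerPoolSketch
    {
     public:
      explicit WorkerPoolSketch(size_t threads)
      {
        for(size_t i = 0; i < threads; ++i)
          m_Threads.emplace_back([this] { Run(); });
      }

      ~WorkerPoolSketch()
      {
        {
          std::lock_guard< std::mutex > lock(m_Mutex);
          m_Stop = true;
        }
        m_CV.notify_all();
        for(auto& t : m_Threads)
          t.join();
      }

      // same call shape as m_Worker->addJob(func) in the diff
      void
      addJob(std::function< void(void) > job)
      {
        {
          std::lock_guard< std::mutex > lock(m_Mutex);
          m_Jobs.push(std::move(job));
        }
        m_CV.notify_one();
      }

     private:
      void
      Run()
      {
        for(;;)
        {
          std::function< void(void) > job;
          {
            std::unique_lock< std::mutex > lock(m_Mutex);
            m_CV.wait(lock, [this] { return m_Stop || !m_Jobs.empty(); });
            if(m_Stop && m_Jobs.empty())
              return;
            job = std::move(m_Jobs.front());
            m_Jobs.pop();
          }
          job();
        }
      }

      std::vector< std::thread > m_Threads;
      std::queue< std::function< void(void) > > m_Jobs;
      std::mutex m_Mutex;
      std::condition_variable m_CV;
      bool m_Stop = false;
    };

    int
    main()
    {
      auto worker = std::make_shared< WorkerPoolSketch >(4);
      // several producers (the link layers, paths and transit hops in the
      // diff) can all hold the same shared_ptr and queue work onto it
      for(int i = 0; i < 8; ++i)
        worker->addJob([i] { std::printf("job %d done\n", i); });
      return 0;  // destructor drains the queue and joins the threads
    }

Producers only keep a shared_ptr to the pool, mirroring the m_Worker member added to ILinkLayer further down.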

@ -22,9 +22,6 @@ namespace llarp
~LinkLayer() override;
bool
Start(std::shared_ptr< Logic > l) override;
std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact &rc,
const AddressInfo &ai) override;
@ -38,9 +35,6 @@ namespace llarp
const char *
Name() const override;
void
Stop() override;
uint16_t
Rank() const override;
@ -59,7 +53,6 @@ namespace llarp
private:
std::unordered_map< Addr, RouterID, Addr::Hash > m_AuthedAddrs;
const bool permitInbound;
thread::ThreadPool m_CryptoWorker;
};
using LinkLayer_ptr = std::shared_ptr< LinkLayer >;

@ -42,7 +42,8 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
virtual bool
StartLinks(Logic_ptr logic) = 0;
StartLinks(Logic_ptr logic,
std::shared_ptr< thread::ThreadPool > worker) = 0;
virtual void
Stop() = 0;

@ -89,12 +89,13 @@ namespace llarp
}
bool
LinkManager::StartLinks(Logic_ptr logic)
LinkManager::StartLinks(Logic_ptr logic,
std::shared_ptr< thread::ThreadPool > worker)
{
LogInfo("starting ", outboundLinks.size(), " outbound links");
for(const auto &link : outboundLinks)
{
if(!link->Start(logic))
if(!link->Start(logic, worker))
{
LogWarn("outbound link '", link->Name(), "' failed to start");
return false;
@ -107,7 +108,7 @@ namespace llarp
LogInfo("starting ", inboundLinks.size(), " inbound links");
for(const auto &link : inboundLinks)
{
if(!link->Start(logic))
if(!link->Start(logic, worker))
{
LogWarn("Link ", link->Name(), " failed to start");
return false;

@ -40,7 +40,8 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) override;
bool
StartLinks(Logic_ptr logic) override;
StartLinks(Logic_ptr logic,
std::shared_ptr< thread::ThreadPool > worker) override;
void
Stop() override;

@ -264,9 +264,11 @@ namespace llarp
}
bool
ILinkLayer::Start(std::shared_ptr< Logic > l)
ILinkLayer::Start(std::shared_ptr< Logic > l,
std::shared_ptr< thread::ThreadPool > worker)
{
m_Logic = l;
m_Worker = worker;
m_Logic = l;
ScheduleTick(100);
return true;
}

@ -122,8 +122,9 @@ namespace llarp
bool
TryEstablishTo(RouterContact rc);
virtual bool
Start(std::shared_ptr< llarp::Logic > l);
bool
Start(std::shared_ptr< llarp::Logic > l,
std::shared_ptr< thread::ThreadPool > worker);
virtual void
Stop();
@ -243,7 +244,8 @@ namespace llarp
bool
PutSession(const std::shared_ptr< ILinkSession >& s);
std::shared_ptr< llarp::Logic > m_Logic = nullptr;
std::shared_ptr< llarp::Logic > m_Logic = nullptr;
std::shared_ptr< llarp::thread::ThreadPool > m_Worker = nullptr;
llarp_ev_loop_ptr m_Loop;
Addr m_ourAddr;
llarp_udp_io m_udp;

@ -4,6 +4,8 @@
#include <crypto/types.hpp>
#include <util/types.hpp>
#include <crypto/encrypted_frame.hpp>
#include <messages/relay.hpp>
#include <vector>
#include <memory>
@ -22,6 +24,9 @@ namespace llarp
{
struct IHopHandler
{
using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >;
using TrafficQueue_t = std::vector< TrafficEvent_t >;
virtual ~IHopHandler() = default;
virtual bool
@ -35,14 +40,30 @@ namespace llarp
SendRoutingMessage(const routing::IMessage& msg, AbstractRouter* r) = 0;
// handle data in upstream direction
virtual bool
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) = 0;
AbstractRouter*)
{
m_UpstreamQueue.emplace_back();
auto& pkt = m_UpstreamQueue.back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
return true;
}
// handle data in downstream direction
virtual bool
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) = 0;
AbstractRouter*)
{
m_DownstreamQueue.emplace_back();
auto& pkt = m_DownstreamQueue.back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
return true;
}
/// return timestamp last remote activity happened at
virtual llarp_time_t
@ -57,9 +78,26 @@ namespace llarp
{
return m_SequenceNum++;
}
virtual void
FlushQueues(AbstractRouter* r) = 0;
protected:
uint64_t m_SequenceNum = 0;
TrafficQueue_t m_UpstreamQueue;
TrafficQueue_t m_DownstreamQueue;
virtual void
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r) = 0;
virtual void
HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r) = 0;
};
using HopHandler_ptr = std::shared_ptr< IHopHandler >;
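
Note: in the IHopHandler hunk above, HandleUpstream and HandleDownstream stop doing crypto inline; they only copy the packet and nonce into a per-direction queue, and FlushQueues later hands the whole batch to UpstreamWork / DownstreamWork on the worker. A compact, self-contained sketch of that queue-and-flush shape, with simplified types (Nonce and the worker callback stand in for TunnelNonce and the thread pool):

    #include <array>
    #include <cstddef>
    #include <cstdio>
    #include <functional>
    #include <utility>
    #include <vector>

    using Nonce        = std::array< unsigned char, 32 >;
    using TrafficEvent = std::pair< std::vector< unsigned char >, Nonce >;
    using TrafficQueue = std::vector< TrafficEvent >;

    struct HopHandlerSketch
    {
      TrafficQueue m_UpstreamQueue;

      // like the new HandleUpstream: copy the packet and nonce, nothing else
      bool
      HandleUpstream(const unsigned char* ptr, std::size_t sz, const Nonce& n)
      {
        m_UpstreamQueue.emplace_back(std::vector< unsigned char >(ptr, ptr + sz), n);
        return true;
      }

      // like FlushQueues: move the whole batch out to the worker as one job
      void
      FlushQueues(std::function< void(TrafficQueue) > queueToWorker)
      {
        if(m_UpstreamQueue.empty())
          return;
        queueToWorker(std::move(m_UpstreamQueue));
        m_UpstreamQueue.clear();
      }
    };

    int
    main()
    {
      HopHandlerSketch hop;
      const unsigned char pkt[] = {1, 2, 3};
      hop.HandleUpstream(pkt, sizeof(pkt), Nonce{});
      hop.HandleUpstream(pkt, sizeof(pkt), Nonce{});
      hop.FlushQueues([](TrafficQueue batch) {
        // UpstreamWork would encrypt each event here, then queue the
        // finished messages back onto the logic thread
        std::printf("worker got %zu queued packets\n", batch.size());
      });
      return 0;
    }

In the real code the flush goes through r->threadpool()->addJob and the payload is copied byte-for-byte with std::copy_n, as the Path and TransitHop hunks below show.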

@ -363,24 +363,55 @@ namespace llarp
}
}
bool
Path::HandleUpstream(const llarp_buffer_t& buf, const TunnelNonce& Y,
AbstractRouter* r)
void
Path::HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r)
{
TunnelNonce n = Y;
for(const auto& hop : hops)
for(const auto& msg : msgs)
{
CryptoManager::instance()->xchacha20(buf, hop.shared, n);
n ^= hop.nonceXOR;
if(!r->SendToOrQueue(Upstream(), &msg))
{
LogDebug("failed to send upstream to ", Upstream());
}
}
RelayUpstreamMessage msg;
msg.X = buf;
msg.Y = Y;
msg.pathid = TXID();
if(r->SendToOrQueue(Upstream(), &msg))
return true;
LogError("send to ", Upstream(), " failed");
return false;
}
void
Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector< RelayUpstreamMessage > sendmsgs(msgs.size());
size_t idx = 0;
for(const auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
TunnelNonce n = ev.second;
for(const auto& hop : hops)
{
CryptoManager::instance()->xchacha20(buf, hop.shared, n);
n ^= hop.nonceXOR;
}
auto& msg = sendmsgs[idx];
msg.X = buf;
msg.Y = ev.second;
msg.pathid = TXID();
++idx;
}
r->logic()->queue_func(std::bind(&Path::HandleAllUpstream,
shared_from_this(), std::move(sendmsgs),
r));
}
void
Path::FlushQueues(AbstractRouter* r)
{
if(!m_UpstreamQueue.empty())
r->threadpool()->addJob(std::bind(&Path::UpstreamWork,
shared_from_this(),
std::move(m_UpstreamQueue), r));
if(!m_DownstreamQueue.empty())
r->threadpool()->addJob(std::bind(&Path::DownstreamWork,
shared_from_this(),
std::move(m_DownstreamQueue), r));
}
bool
@ -406,20 +437,43 @@ namespace llarp
return ss.str();
}
bool
Path::HandleDownstream(const llarp_buffer_t& buf, const TunnelNonce& Y,
AbstractRouter* r)
void
Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
TunnelNonce n = Y;
for(const auto& hop : hops)
std::vector< RelayDownstreamMessage > sendMsgs(msgs.size());
size_t idx = 0;
for(auto& ev : msgs)
{
n ^= hop.nonceXOR;
CryptoManager::instance()->xchacha20(buf, hop.shared, n);
const llarp_buffer_t buf(ev.first);
sendMsgs[idx].Y = ev.second;
for(const auto& hop : hops)
{
sendMsgs[idx].Y ^= hop.nonceXOR;
CryptoManager::instance()->xchacha20(buf, hop.shared,
sendMsgs[idx].Y);
}
sendMsgs[idx].X = buf;
++idx;
}
r->logic()->queue_func(std::bind(&Path::HandleAllDownstream,
shared_from_this(), std::move(sendMsgs),
r));
}
void
Path::HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r)
{
for(const auto& msg : msgs)
{
const llarp_buffer_t buf(msg.X);
if(!HandleRoutingMessage(buf, r))
{
LogWarn("failed to handle downstream message");
continue;
}
m_LastRecvMessage = r->Now();
}
if(!HandleRoutingMessage(buf, r))
return false;
m_LastRecvMessage = r->Now();
return true;
}
bool
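
Note: Path::UpstreamWork and DownstreamWork above apply one xchacha20 layer per hop, tweaking the nonce with hop.nonceXOR between layers (DownstreamWork applies the XOR before the cipher, the mirror image of the upstream loop). This layering works because xchacha20 is a stream cipher: applying the same key and nonce a second time strips the layer again. A minimal round-trip demonstration, assuming a reasonably recent libsodium is available; CryptoManager in the diff presumably wraps a similar primitive, but that is an assumption, not something this commit shows:

    #include <sodium.h>
    #include <cassert>
    #include <vector>

    int
    main()
    {
      if(sodium_init() < 0)
        return 1;
      std::vector< unsigned char > msg = {'h', 'e', 'l', 'l', 'o'};
      const std::vector< unsigned char > original = msg;

      unsigned char key[crypto_stream_xchacha20_KEYBYTES];
      unsigned char nonce[crypto_stream_xchacha20_NONCEBYTES];
      randombytes_buf(key, sizeof(key));
      randombytes_buf(nonce, sizeof(nonce));

      // add one onion layer, in place
      crypto_stream_xchacha20_xor(msg.data(), msg.data(), msg.size(), nonce, key);
      assert(msg != original);

      // applying the same keystream again removes the layer
      crypto_stream_xchacha20_xor(msg.data(), msg.data(), msg.size(), nonce, key);
      assert(msg == original);
      return 0;
    }
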

@ -286,16 +286,6 @@ namespace llarp
bool
HandleRoutingMessage(const llarp_buffer_t& buf, AbstractRouter* r);
// handle data in upstream direction
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
// handle data in downstream direction
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
bool
IsReady() const;
@ -334,6 +324,24 @@ namespace llarp
bool
SendExitClose(const routing::CloseExitMessage& msg, AbstractRouter* r);
void
FlushQueues(AbstractRouter* r) override;
protected:
void
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r) override;
void
HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r) override;
private:
/// call obtained exit hooks
bool

@ -252,6 +252,13 @@ namespace llarp
return nullptr;
}
void
PathContext::Pump()
{
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); });
}
void
PathContext::PutTransitHop(std::shared_ptr< TransitHop > hop)
{

@ -37,6 +37,9 @@ namespace llarp
void
ExpirePaths(llarp_time_t now);
void
Pump();
void
AllowTransit();

@ -375,5 +375,12 @@ namespace llarp
return nullptr;
}
void
PathSet::FlushQueues(AbstractRouter* r)
{
ForEachPath([r](const Path_ptr& ptr) { ptr->FlushQueues(r); });
}
} // namespace path
} // namespace llarp

@ -275,6 +275,9 @@ namespace llarp
}
}
void
FlushQueues(AbstractRouter* r);
size_t numPaths;
protected:

@ -114,39 +114,96 @@ namespace llarp
return HandleDownstream(buf, N, r);
}
bool
TransitHop::HandleDownstream(const llarp_buffer_t& buf,
const TunnelNonce& Y, AbstractRouter* r)
void
TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
RelayDownstreamMessage msg;
msg.pathid = info.rxID;
msg.Y = Y ^ nonceXOR;
CryptoManager::instance()->xchacha20(buf, pathKey, Y);
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
return r->SendToOrQueue(info.downstream, &msg);
std::vector< RelayDownstreamMessage > sendmsgs(msgs.size());
size_t idx = 0;
for(auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx];
msg.pathid = info.rxID;
msg.Y = ev.second ^ nonceXOR;
CryptoManager::instance()->xchacha20(buf, pathKey, ev.second);
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
++idx;
}
r->logic()->queue_func(std::bind(&TransitHop::HandleAllDownstream,
shared_from_this(), std::move(sendmsgs),
r));
}
bool
TransitHop::HandleUpstream(const llarp_buffer_t& buf, const TunnelNonce& Y,
AbstractRouter* r)
void
TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector< RelayUpstreamMessage > sendmsgs(msgs.size());
size_t idx = 0;
for(auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx];
CryptoManager::instance()->xchacha20(buf, pathKey, ev.second);
msg.pathid = info.txID;
msg.Y = ev.second ^ nonceXOR;
msg.X = buf;
++idx;
}
r->logic()->queue_func(std::bind(&TransitHop::HandleAllUpstream,
shared_from_this(), std::move(sendmsgs),
r));
}
void
TransitHop::HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r)
{
CryptoManager::instance()->xchacha20(buf, pathKey, Y);
if(IsEndpoint(r->pubkey()))
{
m_LastActivity = r->Now();
return r->ParseRoutingMessageBuffer(buf, this, info.rxID);
for(const auto& msg : msgs)
{
const llarp_buffer_t buf(msg.X);
if(!r->ParseRoutingMessageBuffer(buf, this, info.rxID))
continue;
m_LastActivity = r->Now();
}
}
else
{
for(const auto& msg : msgs)
{
llarp::LogDebug("relay ", msg.X.size(), " bytes upstream from ",
info.downstream, " to ", info.upstream);
r->SendToOrQueue(info.upstream, &msg);
}
}
}
RelayUpstreamMessage msg;
msg.pathid = info.txID;
msg.Y = Y ^ nonceXOR;
void
TransitHop::HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r)
{
for(const auto& msg : msgs)
{
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.downstream, " to ", info.upstream);
r->SendToOrQueue(info.downstream, &msg);
}
}
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes upstream from ",
info.downstream, " to ", info.upstream);
return r->SendToOrQueue(info.upstream, &msg);
void
TransitHop::FlushQueues(AbstractRouter* r)
{
if(!m_UpstreamQueue.empty())
r->threadpool()->addJob(std::bind(&TransitHop::UpstreamWork,
shared_from_this(),
std::move(m_UpstreamQueue), r));
if(!m_DownstreamQueue.empty())
r->threadpool()->addJob(std::bind(&TransitHop::DownstreamWork,
shared_from_this(),
std::move(m_DownstreamQueue), r));
}
bool
@ -228,7 +285,6 @@ namespace llarp
{
if(SendRoutingMessage(reply, r))
{
r->PumpLL();
ep->Close();
return true;
}
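
Note: Path and TransitHop both finish their worker-side batches the same way: bind HandleAllUpstream / HandleAllDownstream on shared_from_this(), move the vector of finished messages into the bound call, and push it onto the logic queue, which is also why this commit makes TransitHop inherit enable_shared_from_this. A self-contained sketch of that hand-back step, with a plain deque standing in for Logic:

    #include <cstdio>
    #include <deque>
    #include <functional>
    #include <memory>
    #include <utility>
    #include <vector>

    struct LogicSketch
    {
      std::deque< std::function< void(void) > > q;

      void
      queue_func(std::function< void(void) > f)
      {
        q.push_back(std::move(f));
      }

      void
      tick()
      {
        while(!q.empty())
        {
          q.front()();
          q.pop_front();
        }
      }
    };

    struct HopSketch : std::enable_shared_from_this< HopSketch >
    {
      void
      HandleAllUpstream(std::vector< int > msgs)
      {
        std::printf("logic thread handles %zu messages\n", msgs.size());
      }

      void
      UpstreamWork(std::vector< int > batch, LogicSketch* logic)
      {
        // ...per-hop crypto would happen here, on the worker thread...
        // the shared_ptr keeps the hop alive until the logic call runs
        logic->queue_func(std::bind(&HopSketch::HandleAllUpstream,
                                    shared_from_this(), std::move(batch)));
      }
    };

    int
    main()
    {
      LogicSketch logic;
      auto hop = std::make_shared< HopSketch >();
      hop->UpstreamWork({1, 2, 3}, &logic);
      logic.tick();  // later, on the logic thread
      return 0;
    }
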

@ -79,7 +79,9 @@ namespace llarp
return info.print(out, -1, -1);
}
struct TransitHop : public IHopHandler, public routing::IMessageHandler
struct TransitHop : public IHopHandler,
public routing::IMessageHandler,
std::enable_shared_from_this< TransitHop >
{
TransitHop();
@ -193,15 +195,23 @@ namespace llarp
bool
HandleDHTMessage(const dht::IMessage& msg, AbstractRouter* r) override;
// handle data in upstream direction
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
void
FlushQueues(AbstractRouter* r) override;
// handle data in downstream direction
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
protected:
void
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,
AbstractRouter* r) override;
void
HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs,
AbstractRouter* r) override;
private:
void

@ -22,6 +22,7 @@
#include <util/metrics/metrics.hpp>
#include <util/str.hpp>
#include <utp/utp.hpp>
#include <ev/ev.hpp>
#include <fstream>
#include <cstdlib>
@ -164,6 +165,9 @@ namespace llarp
void
Router::PumpLL()
{
_logic->tick(time_now_ms());
paths.Pump();
_logic->tick(time_now_ms());
_linkManager.PumpLinks();
}
@ -913,7 +917,7 @@ namespace llarp
return false;
}
_outboundSessionMaker.SetOurRouter(pubkey());
if(!_linkManager.StartLinks(_logic))
if(!_linkManager.StartLinks(_logic, cryptoworker))
{
LogWarn("One or more links failed to start.");
return false;
@ -987,7 +991,7 @@ namespace llarp
}
LogInfo("have ", nodedb->num_loaded(), " routers");
_netloop->add_ticker(std::bind(&Router::PumpLL, this));
ScheduleTicker(1000);
_running.store(true);
_startedAt = Now();
@ -1039,6 +1043,7 @@ namespace llarp
_exitContext.Stop();
if(rpcServer)
rpcServer->Stop();
paths.Pump();
_linkManager.PumpLinks();
_logic->call_later({200, this, &RouterAfterStopIssued});
}
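
Note: Router::PumpLL, now registered as an event-loop ticker via _netloop->add_ticker, ties the pieces together on every loop iteration: the path queues are flushed toward the crypto worker, the logic queue is drained, and the links are pumped. A toy, self-contained sketch of that per-iteration pipeline with stub types; the ordering shown is one plausible reading of the hunk above, and the real PathContext, Logic and LinkManager are of course richer:

    #include <cstdio>

    // stubs standing in for PathContext, Logic and LinkManager
    struct PathsSketch
    {
      void
      Pump()
      {
        std::printf("flush path traffic queues to the crypto worker\n");
      }
    };

    struct LogicSketch
    {
      void
      tick()
      {
        std::printf("run calls queued back from the worker\n");
      }
    };

    struct LinksSketch
    {
      void
      PumpLinks()
      {
        std::printf("pump link layers\n");
      }
    };

    struct RouterSketch
    {
      PathsSketch paths;
      LogicSketch logic;
      LinksSketch links;

      // invoked once per event loop iteration via add_ticker
      void
      PumpLL()
      {
        paths.Pump();
        logic.tick();
        links.PumpLinks();
      }
    };

    int
    main()
    {
      RouterSketch r;
      r.PumpLL();
      return 0;
    }
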

@ -30,6 +30,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
rc.enckey = encryptionKey.toPublic();
}
std::shared_ptr<thread::ThreadPool> worker;
SecretKey signingKey;
SecretKey encryptionKey;
@ -37,6 +39,12 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
bool gotLIM = false;
void Setup()
{
worker = std::make_shared<thread::ThreadPool>(1, 128, "test-worker");
worker->start();
}
const RouterContact&
GetRC() const
{
@ -85,7 +93,7 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
return false;
if(!rc.Sign(signingKey))
return false;
return link->Start(logic);
return link->Start(logic, worker);
}
void
@ -93,6 +101,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
{
if(link)
link->Stop();
if(worker)
worker->stop();
}
void
@ -100,6 +110,7 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
{
Stop();
link.reset();
worker.reset();
}
};
@ -125,6 +136,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
RouterContact::Lifetime = 500;
netLoop = llarp_make_ev_loop();
m_logic.reset(new Logic());
Alice.Setup();
Bob.Setup();
}
void
