diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 9c38d2ac1..dbbd16440 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -744,6 +744,9 @@ struct llarp_ev_loop virtual int tick(int ms) = 0; + virtual bool + add_ticker(std::function< void(void) > ticker) = 0; + virtual void stop() = 0; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 732770dda..bddfc6791 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -312,6 +312,39 @@ namespace libuv } }; + struct ticker_glue : public glue + { + std::function< void(void) > func; + + ticker_glue(uv_loop_t* loop, std::function< void(void) > tick) : func(tick) + { + m_Ticker.data = this; + uv_check_init(loop, &m_Ticker); + } + + static void + OnTick(uv_check_t* t) + { + static_cast< ticker_glue* >(t->data)->func(); + } + + bool + Start() + { + return uv_check_start(&m_Ticker, &OnTick) != -1; + } + + void + Close() override + { + uv_check_stop(&m_Ticker); + m_Ticker.data = nullptr; + delete this; + } + + uv_check_t m_Ticker; + }; + struct udp_glue : public glue { uv_udp_t m_Handle; @@ -737,6 +770,18 @@ namespace libuv return false; } + bool + Loop::add_ticker(std::function< void(void) > func) + { + auto* ticker = new ticker_glue(m_Impl.get(), func); + if(ticker->Start()) + { + return true; + } + delete ticker; + return false; + } + bool Loop::udp_close(llarp_udp_io* udp) { diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index 3ad9a6389..ab19fec73 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -78,6 +78,9 @@ namespace libuv return nullptr; } + bool + add_ticker(std::function< void(void) > ticker) override; + /// register event listener bool add_ev(llarp::ev_io*, bool) override diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index b2e308733..7704dcc3e 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -14,14 +14,10 @@ namespace llarp : ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout, closed) , permitInbound{allowInbound} - , m_CryptoWorker(4, 1024 * 8, "iwp-worker") { } - LinkLayer::~LinkLayer() - { - m_CryptoWorker.stop(); - } + LinkLayer::~LinkLayer() = default; void LinkLayer::Pump() @@ -73,21 +69,7 @@ namespace llarp void LinkLayer::QueueWork(std::function< void(void) > func) { - m_CryptoWorker.addJob(func); - } - - bool - LinkLayer::Start(std::shared_ptr< Logic > l) - { - if(!ILinkLayer::Start(l)) - return false; - return m_CryptoWorker.start(); - } - - void - LinkLayer::Stop() - { - ILinkLayer::Stop(); + m_Worker->addJob(func); } void diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index 4ae305ddf..f481ddf61 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -22,9 +22,6 @@ namespace llarp ~LinkLayer() override; - bool - Start(std::shared_ptr< Logic > l) override; - std::shared_ptr< ILinkSession > NewOutboundSession(const RouterContact &rc, const AddressInfo &ai) override; @@ -38,9 +35,6 @@ namespace llarp const char * Name() const override; - void - Stop() override; - uint16_t Rank() const override; @@ -59,7 +53,6 @@ namespace llarp private: std::unordered_map< Addr, RouterID, Addr::Hash > m_AuthedAddrs; const bool permitInbound; - thread::ThreadPool m_CryptoWorker; }; using LinkLayer_ptr = std::shared_ptr< LinkLayer >; diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index 55d32dd4a..d42c9e98d 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -42,7 +42,8 @@ namespace llarp AddLink(LinkLayer_ptr link, bool inbound = false) = 0; virtual bool - StartLinks(Logic_ptr logic) = 0; + StartLinks(Logic_ptr logic, + std::shared_ptr< thread::ThreadPool > worker) = 0; virtual void Stop() = 0; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 69b58dc0a..09aee0d4d 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -89,12 +89,13 @@ namespace llarp } bool - LinkManager::StartLinks(Logic_ptr logic) + LinkManager::StartLinks(Logic_ptr logic, + std::shared_ptr< thread::ThreadPool > worker) { LogInfo("starting ", outboundLinks.size(), " outbound links"); for(const auto &link : outboundLinks) { - if(!link->Start(logic)) + if(!link->Start(logic, worker)) { LogWarn("outbound link '", link->Name(), "' failed to start"); return false; @@ -107,7 +108,7 @@ namespace llarp LogInfo("starting ", inboundLinks.size(), " inbound links"); for(const auto &link : inboundLinks) { - if(!link->Start(logic)) + if(!link->Start(logic, worker)) { LogWarn("Link ", link->Name(), " failed to start"); return false; diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index 877a65816..103ef5869 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -40,7 +40,8 @@ namespace llarp AddLink(LinkLayer_ptr link, bool inbound = false) override; bool - StartLinks(Logic_ptr logic) override; + StartLinks(Logic_ptr logic, + std::shared_ptr< thread::ThreadPool > worker) override; void Stop() override; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 8f0cc2d2d..944b2ebc2 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -264,9 +264,11 @@ namespace llarp } bool - ILinkLayer::Start(std::shared_ptr< Logic > l) + ILinkLayer::Start(std::shared_ptr< Logic > l, + std::shared_ptr< thread::ThreadPool > worker) { - m_Logic = l; + m_Worker = worker; + m_Logic = l; ScheduleTick(100); return true; } diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index 499040db2..4d98af8ed 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -122,8 +122,9 @@ namespace llarp bool TryEstablishTo(RouterContact rc); - virtual bool - Start(std::shared_ptr< llarp::Logic > l); + bool + Start(std::shared_ptr< llarp::Logic > l, + std::shared_ptr< thread::ThreadPool > worker); virtual void Stop(); @@ -243,7 +244,8 @@ namespace llarp bool PutSession(const std::shared_ptr< ILinkSession >& s); - std::shared_ptr< llarp::Logic > m_Logic = nullptr; + std::shared_ptr< llarp::Logic > m_Logic = nullptr; + std::shared_ptr< llarp::thread::ThreadPool > m_Worker = nullptr; llarp_ev_loop_ptr m_Loop; Addr m_ourAddr; llarp_udp_io m_udp; diff --git a/llarp/path/ihophandler.hpp b/llarp/path/ihophandler.hpp index ea672d74b..c0495e57c 100644 --- a/llarp/path/ihophandler.hpp +++ b/llarp/path/ihophandler.hpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include @@ -22,6 +24,9 @@ namespace llarp { struct IHopHandler { + using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >; + using TrafficQueue_t = std::vector< TrafficEvent_t >; + virtual ~IHopHandler() = default; virtual bool @@ -35,14 +40,30 @@ namespace llarp SendRoutingMessage(const routing::IMessage& msg, AbstractRouter* r) = 0; // handle data in upstream direction - virtual bool + bool HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, - AbstractRouter* r) = 0; + AbstractRouter*) + { + m_UpstreamQueue.emplace_back(); + auto& pkt = m_UpstreamQueue.back(); + pkt.first.resize(X.sz); + std::copy_n(X.base, X.sz, pkt.first.begin()); + pkt.second = Y; + return true; + } // handle data in downstream direction - virtual bool + bool HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, - AbstractRouter* r) = 0; + AbstractRouter*) + { + m_DownstreamQueue.emplace_back(); + auto& pkt = m_DownstreamQueue.back(); + pkt.first.resize(X.sz); + std::copy_n(X.base, X.sz, pkt.first.begin()); + pkt.second = Y; + return true; + } /// return timestamp last remote activity happened at virtual llarp_time_t @@ -57,9 +78,26 @@ namespace llarp { return m_SequenceNum++; } + virtual void + FlushQueues(AbstractRouter* r) = 0; protected: uint64_t m_SequenceNum = 0; + TrafficQueue_t m_UpstreamQueue; + TrafficQueue_t m_DownstreamQueue; + + virtual void + UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0; + + virtual void + DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0; + + virtual void + HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, + AbstractRouter* r) = 0; + virtual void + HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs, + AbstractRouter* r) = 0; }; using HopHandler_ptr = std::shared_ptr< IHopHandler >; diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 381ec6f02..d24ca336b 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -363,24 +363,55 @@ namespace llarp } } - bool - Path::HandleUpstream(const llarp_buffer_t& buf, const TunnelNonce& Y, - AbstractRouter* r) + void + Path::HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, + AbstractRouter* r) { - TunnelNonce n = Y; - for(const auto& hop : hops) + for(const auto& msg : msgs) { - CryptoManager::instance()->xchacha20(buf, hop.shared, n); - n ^= hop.nonceXOR; + if(!r->SendToOrQueue(Upstream(), &msg)) + { + LogDebug("failed to send upstream to ", Upstream()); + } } - RelayUpstreamMessage msg; - msg.X = buf; - msg.Y = Y; - msg.pathid = TXID(); - if(r->SendToOrQueue(Upstream(), &msg)) - return true; - LogError("send to ", Upstream(), " failed"); - return false; + } + + void + Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r) + { + std::vector< RelayUpstreamMessage > sendmsgs(msgs.size()); + size_t idx = 0; + for(const auto& ev : msgs) + { + const llarp_buffer_t buf(ev.first); + TunnelNonce n = ev.second; + for(const auto& hop : hops) + { + CryptoManager::instance()->xchacha20(buf, hop.shared, n); + n ^= hop.nonceXOR; + } + auto& msg = sendmsgs[idx]; + msg.X = buf; + msg.Y = ev.second; + msg.pathid = TXID(); + ++idx; + } + r->logic()->queue_func(std::bind(&Path::HandleAllUpstream, + shared_from_this(), std::move(sendmsgs), + r)); + } + + void + Path::FlushQueues(AbstractRouter* r) + { + if(!m_UpstreamQueue.empty()) + r->threadpool()->addJob(std::bind(&Path::UpstreamWork, + shared_from_this(), + std::move(m_UpstreamQueue), r)); + if(!m_DownstreamQueue.empty()) + r->threadpool()->addJob(std::bind(&Path::DownstreamWork, + shared_from_this(), + std::move(m_DownstreamQueue), r)); } bool @@ -406,20 +437,43 @@ namespace llarp return ss.str(); } - bool - Path::HandleDownstream(const llarp_buffer_t& buf, const TunnelNonce& Y, - AbstractRouter* r) + void + Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r) { - TunnelNonce n = Y; - for(const auto& hop : hops) + std::vector< RelayDownstreamMessage > sendMsgs(msgs.size()); + size_t idx = 0; + for(auto& ev : msgs) { - n ^= hop.nonceXOR; - CryptoManager::instance()->xchacha20(buf, hop.shared, n); + const llarp_buffer_t buf(ev.first); + sendMsgs[idx].Y = ev.second; + for(const auto& hop : hops) + { + sendMsgs[idx].Y ^= hop.nonceXOR; + CryptoManager::instance()->xchacha20(buf, hop.shared, + sendMsgs[idx].Y); + } + sendMsgs[idx].X = buf; + ++idx; + } + r->logic()->queue_func(std::bind(&Path::HandleAllDownstream, + shared_from_this(), std::move(sendMsgs), + r)); + } + + void + Path::HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs, + AbstractRouter* r) + { + for(const auto& msg : msgs) + { + const llarp_buffer_t buf(msg.X); + if(!HandleRoutingMessage(buf, r)) + { + LogWarn("failed to handle downstream message"); + continue; + } + m_LastRecvMessage = r->Now(); } - if(!HandleRoutingMessage(buf, r)) - return false; - m_LastRecvMessage = r->Now(); - return true; } bool diff --git a/llarp/path/path.hpp b/llarp/path/path.hpp index 9d7f275a1..e1a9b45c7 100644 --- a/llarp/path/path.hpp +++ b/llarp/path/path.hpp @@ -286,16 +286,6 @@ namespace llarp bool HandleRoutingMessage(const llarp_buffer_t& buf, AbstractRouter* r); - // handle data in upstream direction - bool - HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, - AbstractRouter* r) override; - - // handle data in downstream direction - bool - HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, - AbstractRouter* r) override; - bool IsReady() const; @@ -334,6 +324,24 @@ namespace llarp bool SendExitClose(const routing::CloseExitMessage& msg, AbstractRouter* r); + void + FlushQueues(AbstractRouter* r) override; + + protected: + void + UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + + void + DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + + void + HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, + AbstractRouter* r) override; + + void + HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs, + AbstractRouter* r) override; + private: /// call obtained exit hooks bool diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index 84a19f575..3a53b3d79 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -252,6 +252,13 @@ namespace llarp return nullptr; } + void + PathContext::Pump() + { + m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); }); + m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); }); + } + void PathContext::PutTransitHop(std::shared_ptr< TransitHop > hop) { diff --git a/llarp/path/path_context.hpp b/llarp/path/path_context.hpp index fce0f1454..0234fb57b 100644 --- a/llarp/path/path_context.hpp +++ b/llarp/path/path_context.hpp @@ -37,6 +37,9 @@ namespace llarp void ExpirePaths(llarp_time_t now); + void + Pump(); + void AllowTransit(); diff --git a/llarp/path/pathset.cpp b/llarp/path/pathset.cpp index 5514d51e2..79b3af3b4 100644 --- a/llarp/path/pathset.cpp +++ b/llarp/path/pathset.cpp @@ -375,5 +375,12 @@ namespace llarp return nullptr; } + void + PathSet::FlushQueues(AbstractRouter* r) + + { + ForEachPath([r](const Path_ptr& ptr) { ptr->FlushQueues(r); }); + } + } // namespace path } // namespace llarp diff --git a/llarp/path/pathset.hpp b/llarp/path/pathset.hpp index 802351d63..ed59cb532 100644 --- a/llarp/path/pathset.hpp +++ b/llarp/path/pathset.hpp @@ -275,6 +275,9 @@ namespace llarp } } + void + FlushQueues(AbstractRouter* r); + size_t numPaths; protected: diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index b9a46c24c..858b8f1f1 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -114,39 +114,96 @@ namespace llarp return HandleDownstream(buf, N, r); } - bool - TransitHop::HandleDownstream(const llarp_buffer_t& buf, - const TunnelNonce& Y, AbstractRouter* r) + void + TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r) { - RelayDownstreamMessage msg; - msg.pathid = info.rxID; - msg.Y = Y ^ nonceXOR; - CryptoManager::instance()->xchacha20(buf, pathKey, Y); - msg.X = buf; - llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", - info.upstream, " to ", info.downstream); - return r->SendToOrQueue(info.downstream, &msg); + std::vector< RelayDownstreamMessage > sendmsgs(msgs.size()); + size_t idx = 0; + for(auto& ev : msgs) + { + const llarp_buffer_t buf(ev.first); + auto& msg = sendmsgs[idx]; + msg.pathid = info.rxID; + msg.Y = ev.second ^ nonceXOR; + CryptoManager::instance()->xchacha20(buf, pathKey, ev.second); + msg.X = buf; + llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", + info.upstream, " to ", info.downstream); + ++idx; + } + r->logic()->queue_func(std::bind(&TransitHop::HandleAllDownstream, + shared_from_this(), std::move(sendmsgs), + r)); } - bool - TransitHop::HandleUpstream(const llarp_buffer_t& buf, const TunnelNonce& Y, - AbstractRouter* r) + void + TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r) + { + std::vector< RelayUpstreamMessage > sendmsgs(msgs.size()); + size_t idx = 0; + for(auto& ev : msgs) + { + const llarp_buffer_t buf(ev.first); + auto& msg = sendmsgs[idx]; + CryptoManager::instance()->xchacha20(buf, pathKey, ev.second); + msg.pathid = info.txID; + msg.Y = ev.second ^ nonceXOR; + msg.X = buf; + ++idx; + } + r->logic()->queue_func(std::bind(&TransitHop::HandleAllUpstream, + shared_from_this(), std::move(sendmsgs), + r)); + } + + void + TransitHop::HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, + AbstractRouter* r) { - CryptoManager::instance()->xchacha20(buf, pathKey, Y); if(IsEndpoint(r->pubkey())) { - m_LastActivity = r->Now(); - return r->ParseRoutingMessageBuffer(buf, this, info.rxID); + for(const auto& msg : msgs) + { + const llarp_buffer_t buf(msg.X); + if(!r->ParseRoutingMessageBuffer(buf, this, info.rxID)) + continue; + m_LastActivity = r->Now(); + } } + else + { + for(const auto& msg : msgs) + { + llarp::LogDebug("relay ", msg.X.size(), " bytes upstream from ", + info.downstream, " to ", info.upstream); + r->SendToOrQueue(info.upstream, &msg); + } + } + } - RelayUpstreamMessage msg; - msg.pathid = info.txID; - msg.Y = Y ^ nonceXOR; + void + TransitHop::HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs, + AbstractRouter* r) + { + for(const auto& msg : msgs) + { + llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", + info.downstream, " to ", info.upstream); + r->SendToOrQueue(info.downstream, &msg); + } + } - msg.X = buf; - llarp::LogDebug("relay ", msg.X.size(), " bytes upstream from ", - info.downstream, " to ", info.upstream); - return r->SendToOrQueue(info.upstream, &msg); + void + TransitHop::FlushQueues(AbstractRouter* r) + { + if(!m_UpstreamQueue.empty()) + r->threadpool()->addJob(std::bind(&TransitHop::UpstreamWork, + shared_from_this(), + std::move(m_UpstreamQueue), r)); + if(!m_DownstreamQueue.empty()) + r->threadpool()->addJob(std::bind(&TransitHop::DownstreamWork, + shared_from_this(), + std::move(m_DownstreamQueue), r)); } bool @@ -228,7 +285,6 @@ namespace llarp { if(SendRoutingMessage(reply, r)) { - r->PumpLL(); ep->Close(); return true; } diff --git a/llarp/path/transit_hop.hpp b/llarp/path/transit_hop.hpp index eb958eb50..c29a5b6b4 100644 --- a/llarp/path/transit_hop.hpp +++ b/llarp/path/transit_hop.hpp @@ -79,7 +79,9 @@ namespace llarp return info.print(out, -1, -1); } - struct TransitHop : public IHopHandler, public routing::IMessageHandler + struct TransitHop : public IHopHandler, + public routing::IMessageHandler, + std::enable_shared_from_this< TransitHop > { TransitHop(); @@ -193,15 +195,23 @@ namespace llarp bool HandleDHTMessage(const dht::IMessage& msg, AbstractRouter* r) override; - // handle data in upstream direction - bool - HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, - AbstractRouter* r) override; + void + FlushQueues(AbstractRouter* r) override; - // handle data in downstream direction - bool - HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, - AbstractRouter* r) override; + protected: + void + UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + + void + DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + + void + HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, + AbstractRouter* r) override; + + void + HandleAllDownstream(std::vector< RelayDownstreamMessage > msgs, + AbstractRouter* r) override; private: void diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 9a72d542b..ef55a3942 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -164,6 +165,9 @@ namespace llarp void Router::PumpLL() { + _logic->tick(time_now_ms()); + paths.Pump(); + _logic->tick(time_now_ms()); _linkManager.PumpLinks(); } @@ -913,7 +917,7 @@ namespace llarp return false; } _outboundSessionMaker.SetOurRouter(pubkey()); - if(!_linkManager.StartLinks(_logic)) + if(!_linkManager.StartLinks(_logic, cryptoworker)) { LogWarn("One or more links failed to start."); return false; @@ -987,7 +991,7 @@ namespace llarp } LogInfo("have ", nodedb->num_loaded(), " routers"); - + _netloop->add_ticker(std::bind(&Router::PumpLL, this)); ScheduleTicker(1000); _running.store(true); _startedAt = Now(); @@ -1039,6 +1043,7 @@ namespace llarp _exitContext.Stop(); if(rpcServer) rpcServer->Stop(); + paths.Pump(); _linkManager.PumpLinks(); _logic->call_later({200, this, &RouterAfterStopIssued}); } diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 958e85da8..c8ac1955f 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -30,6 +30,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > rc.enckey = encryptionKey.toPublic(); } + std::shared_ptr worker; + SecretKey signingKey; SecretKey encryptionKey; @@ -37,6 +39,12 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > bool gotLIM = false; + void Setup() + { + worker = std::make_shared(1, 128, "test-worker"); + worker->start(); + } + const RouterContact& GetRC() const { @@ -85,7 +93,7 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > return false; if(!rc.Sign(signingKey)) return false; - return link->Start(logic); + return link->Start(logic, worker); } void @@ -93,6 +101,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > { if(link) link->Stop(); + if(worker) + worker->stop(); } void @@ -100,6 +110,7 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > { Stop(); link.reset(); + worker.reset(); } }; @@ -125,6 +136,8 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > RouterContact::Lifetime = 500; netLoop = llarp_make_ev_loop(); m_logic.reset(new Logic()); + Alice.Setup(); + Bob.Setup(); } void