diff --git a/CMakeLists.txt b/CMakeLists.txt index 781a8ae8f..c186c5905 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.10) # bionic's cmake version # Has to be set before `project()`, and ignored on non-macos: -set(CMAKE_OSX_DEPLOYMENT_TARGET 10.13 CACHE STRING "macOS deployment target (Apple clang only)") +set(CMAKE_OSX_DEPLOYMENT_TARGET 10.14 CACHE STRING "macOS deployment target (Apple clang only)") find_program(CCACHE_PROGRAM ccache) if(CCACHE_PROGRAM) diff --git a/include/llarp.hpp b/include/llarp.hpp index 880d48dd8..762bf38b1 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -46,7 +46,6 @@ namespace llarp std::unique_ptr crypto; std::unique_ptr cryptoManager; std::unique_ptr router; - std::shared_ptr worker; std::shared_ptr logic; std::unique_ptr config; std::unique_ptr nodedb; diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index 286cd0301..12e2adbbe 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -22,9 +22,7 @@ add_library(lokinet-util util/str.cpp util/thread/logic.cpp util/thread/queue_manager.cpp - util/thread/thread_pool.cpp util/thread/threading.cpp - util/thread/threadpool.cpp util/time.cpp ) add_dependencies(lokinet-util genversion) diff --git a/llarp/context.cpp b/llarp/context.cpp index 67706d3c9..1d2090870 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include @@ -47,10 +47,6 @@ namespace llarp } } - auto threads = config->router.m_workerThreads; - if (threads <= 0) - threads = 1; - worker = std::make_shared(threads, 1024, "llarp-worker"); logic = std::make_shared(); nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string(); @@ -94,9 +90,11 @@ namespace llarp crypto = std::make_unique(); cryptoManager = std::make_unique(crypto.get()); - router = std::make_unique(worker, mainloop, logic); + router = std::make_unique(mainloop, logic); - nodedb = std::make_unique(router->diskworker(), nodedb_dir); + nodedb = std::make_unique( + nodedb_dir, + [r = router.get()](std::function call) { r->QueueDiskIO(std::move(call)); }); if (!router->Configure(config.get(), opts.isRouter, nodedb.get())) throw std::runtime_error("Failed to configure router"); @@ -187,16 +185,9 @@ namespace llarp void Context::Close() { - llarp::LogDebug("stop workers"); - if (worker) - worker->stop(); - llarp::LogDebug("free config"); config.release(); - llarp::LogDebug("free workers"); - worker.reset(); - llarp::LogDebug("free nodedb"); nodedb.release(); diff --git a/llarp/crypto/crypto.hpp b/llarp/crypto/crypto.hpp index 400e6bb64..823908965 100644 --- a/llarp/crypto/crypto.hpp +++ b/llarp/crypto/crypto.hpp @@ -122,7 +122,7 @@ namespace llarp Crypto* m_prevCrypto; public: - CryptoManager(Crypto* crypto) : m_prevCrypto(m_crypto) + explicit CryptoManager(Crypto* crypto) : m_prevCrypto(m_crypto) { m_crypto = crypto; } diff --git a/llarp/crypto/encrypted_frame.hpp b/llarp/crypto/encrypted_frame.hpp index 27fdcf8d0..9c7ad1b05 100644 --- a/llarp/crypto/encrypted_frame.hpp +++ b/llarp/crypto/encrypted_frame.hpp @@ -6,7 +6,6 @@ #include #include #include -#include namespace llarp { @@ -78,12 +77,14 @@ namespace llarp const SecretKey& seckey; EncryptedFrame target; + using WorkFunc_t = std::function; + using WorkerFunction_t = std::function; + void - AsyncDecrypt( - const std::shared_ptr& worker, const EncryptedFrame& frame, User_ptr u) + AsyncDecrypt(const EncryptedFrame& frame, User_ptr u, WorkerFunction_t worker) { target = frame; - worker->addJob(std::bind(&AsyncFrameDecrypter::Decrypt, this, std::move(u))); + worker(std::bind(&AsyncFrameDecrypter::Decrypt, this, std::move(u))); } }; } // namespace llarp diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 4a08e64b2..8137a7f4f 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -26,6 +26,8 @@ llarp_make_ev_loop(size_t queueLength) void llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr logic) { + if (ev == nullptr or logic == nullptr) + return; ev->run(); logic->clear_event_loop(); ev->stopped(); @@ -34,6 +36,8 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptrparent = ev; if (ev->udp_listen(udp, src)) return 0; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 78fa8f3e2..09133c798 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -461,8 +461,28 @@ namespace libuv auto* self = static_cast(udp->impl); if (self == nullptr) return -1; - uv_buf_t buf = uv_buf_init((char*)ptr, sz); - return uv_udp_try_send(&self->m_Handle, &buf, 1, to); + char* data = new char[sz]; + std::copy_n(ptr, sz, data); + uv_buf_t buf = uv_buf_init(data, sz); + uv_udp_send_t* req = new uv_udp_send_t; + req->data = data; + if (uv_udp_send( + req, + &self->m_Handle, + &buf, + 1, + to, + [](uv_udp_send_t* req, int) { + delete[](char*) req->data; + delete req; + }) + != 0) + + { + delete req; + return -1; + } + return 0; } bool diff --git a/llarp/iwp/iwp.cpp b/llarp/iwp/iwp.cpp index 115f0e1ae..408001505 100644 --- a/llarp/iwp/iwp.cpp +++ b/llarp/iwp/iwp.cpp @@ -17,10 +17,11 @@ namespace llarp SessionRenegotiateHandler reneg, TimeoutHandler timeout, SessionClosedHandler closed, - PumpDoneHandler pumpDone) + PumpDoneHandler pumpDone, + WorkerFunc_t work) { return std::make_shared( - keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, true); + keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, work, true); } LinkLayer_ptr @@ -33,10 +34,11 @@ namespace llarp SessionRenegotiateHandler reneg, TimeoutHandler timeout, SessionClosedHandler closed, - PumpDoneHandler pumpDone) + PumpDoneHandler pumpDone, + WorkerFunc_t work) { return std::make_shared( - keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, false); + keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, work, false); } } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/iwp.hpp b/llarp/iwp/iwp.hpp index f33917870..34e080fb3 100644 --- a/llarp/iwp/iwp.hpp +++ b/llarp/iwp/iwp.hpp @@ -6,34 +6,34 @@ #include #include -namespace llarp +namespace llarp::iwp { - namespace iwp - { - LinkLayer_ptr - NewInboundLink( - std::shared_ptr keyManager, - GetRCFunc getrc, - LinkMessageHandler h, - SignBufferFunc sign, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, - TimeoutHandler timeout, - SessionClosedHandler closed, - PumpDoneHandler pumpDone); - LinkLayer_ptr - NewOutboundLink( - std::shared_ptr keyManager, - GetRCFunc getrc, - LinkMessageHandler h, - SignBufferFunc sign, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, - TimeoutHandler timeout, - SessionClosedHandler closed, - PumpDoneHandler pumpDone); + LinkLayer_ptr + NewInboundLink( + std::shared_ptr keyManager, + GetRCFunc getrc, + LinkMessageHandler h, + SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, + TimeoutHandler timeout, + SessionClosedHandler closed, + PumpDoneHandler pumpDone, + WorkerFunc_t work); - } // namespace iwp -} // namespace llarp + LinkLayer_ptr + NewOutboundLink( + std::shared_ptr keyManager, + GetRCFunc getrc, + LinkMessageHandler h, + SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, + TimeoutHandler timeout, + SessionClosedHandler closed, + PumpDoneHandler pumpDone, + WorkerFunc_t work); + +} // namespace llarp::iwp #endif diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index 24f0f70ca..15f758cbe 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -18,8 +18,9 @@ namespace llarp TimeoutHandler timeout, SessionClosedHandler closed, PumpDoneHandler pumpDone, + WorkerFunc_t worker, bool allowInbound) - : ILinkLayer(keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone) + : ILinkLayer(keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, worker) , permitInbound{allowInbound} { } @@ -38,12 +39,6 @@ namespace llarp return 2; } - void - LinkLayer::QueueWork(std::function func) - { - m_Worker->addJob(func); - } - void LinkLayer::RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) { diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index 3fc87d961..7e56f96ff 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -27,6 +26,7 @@ namespace llarp TimeoutHandler timeout, SessionClosedHandler closed, PumpDoneHandler pumpDone, + WorkerFunc_t dowork, bool permitInbound); ~LinkLayer() override; @@ -49,9 +49,6 @@ namespace llarp void UnmapAddr(const IpAddress& addr); - void - QueueWork(std::function work); - private: std::unordered_map m_AuthedAddrs; const bool permitInbound; diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 4004a8ea3..dd395f82b 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -197,7 +197,7 @@ namespace llarp Session::SendMACK() { // send multi acks - while (m_SendMACKs.size() > 0) + while (not m_SendMACKs.empty()) { const auto sz = m_SendMACKs.size(); const auto max = Session::MaxACKSInMACK; @@ -206,11 +206,11 @@ namespace llarp mack[PacketOverhead + CommandOverhead] = byte_t{static_cast(numAcks)}; byte_t* ptr = mack.data() + 3 + PacketOverhead; LogDebug("send ", numAcks, " macks to ", m_RemoteAddr); - auto itr = m_SendMACKs.begin(); + const auto& itr = m_SendMACKs.top(); while (numAcks > 0) { - htobe64buf(ptr, *itr); - itr = m_SendMACKs.erase(itr); + htobe64buf(ptr, itr); + m_SendMACKs.pop(); numAcks--; ptr += sizeof(uint64_t); } diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 4a67d4221..98e9ba8ac 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -8,6 +8,7 @@ #include #include +#include namespace llarp { @@ -25,7 +26,7 @@ namespace llarp /// How long to keep a replay window for static constexpr auto ReplayWindow = (ReceivalTimeout * 3) / 2; /// How often to acks RX messages - static constexpr auto ACKResendInterval = DeliveryTimeout / 2; + static constexpr auto ACKResendInterval = DeliveryTimeout / 4; /// How often to retransmit TX fragments static constexpr auto TXFlushInterval = (DeliveryTimeout / 5) * 4; /// How often we send a keepalive @@ -195,8 +196,8 @@ namespace llarp /// maps rxid to time recieved std::unordered_map m_ReplayFilter; - /// set of rx messages to send in next round of multiacks - std::unordered_set m_SendMACKs; + /// rx messages to send in next round of multiacks + std::priority_queue, std::greater> m_SendMACKs; using CryptoQueue_t = std::list; using CryptoQueue_ptr = std::shared_ptr; diff --git a/llarp/link/i_link_manager.hpp b/llarp/link/i_link_manager.hpp index 615844f8d..002a59f52 100644 --- a/llarp/link/i_link_manager.hpp +++ b/llarp/link/i_link_manager.hpp @@ -44,7 +44,7 @@ namespace llarp AddLink(LinkLayer_ptr link, bool inbound = false) = 0; virtual bool - StartLinks(Logic_ptr logic, std::shared_ptr worker) = 0; + StartLinks(Logic_ptr logic) = 0; virtual void Stop() = 0; diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index 5c80bac41..9caf940c1 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -89,12 +89,12 @@ namespace llarp } bool - LinkManager::StartLinks(Logic_ptr logic, std::shared_ptr worker) + LinkManager::StartLinks(Logic_ptr logic) { LogInfo("starting ", outboundLinks.size(), " outbound links"); for (const auto& link : outboundLinks) { - if (!link->Start(logic, worker)) + if (!link->Start(logic)) { LogWarn("outbound link '", link->Name(), "' failed to start"); return false; @@ -107,7 +107,7 @@ namespace llarp LogInfo("starting ", inboundLinks.size(), " inbound links"); for (const auto& link : inboundLinks) { - if (!link->Start(logic, worker)) + if (!link->Start(logic)) { LogWarn("Link ", link->Name(), " failed to start"); return false; diff --git a/llarp/link/link_manager.hpp b/llarp/link/link_manager.hpp index a4a2e1f58..ae9bfabf7 100644 --- a/llarp/link/link_manager.hpp +++ b/llarp/link/link_manager.hpp @@ -4,7 +4,6 @@ #include #include -#include #include #include @@ -42,7 +41,7 @@ namespace llarp AddLink(LinkLayer_ptr link, bool inbound = false) override; bool - StartLinks(Logic_ptr logic, std::shared_ptr worker) override; + StartLinks(Logic_ptr logic) override; void Stop() override; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 1733a116b..8ac30a7bb 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -22,7 +22,8 @@ namespace llarp SessionRenegotiateHandler reneg, TimeoutHandler timeout, SessionClosedHandler closed, - PumpDoneHandler pumpDone) + PumpDoneHandler pumpDone, + WorkerFunc_t work) : HandleMessage(std::move(handler)) , HandleTimeout(std::move(timeout)) , Sign(std::move(signbuf)) @@ -31,6 +32,7 @@ namespace llarp , SessionClosed(std::move(closed)) , SessionRenegotiate(std::move(reneg)) , PumpDone(std::move(pumpDone)) + , QueueWork(std::move(work)) , m_RouterEncSecret(keyManager->encryptionKey) , m_SecretKey(keyManager->transportKey) { @@ -318,9 +320,8 @@ namespace llarp } bool - ILinkLayer::Start(std::shared_ptr l, std::shared_ptr worker) + ILinkLayer::Start(std::shared_ptr l) { - m_Worker = worker; m_Logic = l; ScheduleTick(LINK_LAYER_TICK_INTERVAL); return true; diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp index f36f90db2..8c9005e5c 100644 --- a/llarp/link/server.hpp +++ b/llarp/link/server.hpp @@ -47,6 +47,10 @@ namespace llarp /// messages to upper layers using PumpDoneHandler = std::function; + using Work_t = std::function; + /// queue work to worker thread + using WorkerFunc_t = std::function; + struct ILinkLayer { ILinkLayer( @@ -58,7 +62,8 @@ namespace llarp SessionRenegotiateHandler renegotiate, TimeoutHandler timeout, SessionClosedHandler closed, - PumpDoneHandler pumpDone); + PumpDoneHandler pumpDone, + WorkerFunc_t doWork); virtual ~ILinkLayer(); /// get current time via event loop @@ -106,7 +111,7 @@ namespace llarp TryEstablishTo(RouterContact rc); bool - Start(std::shared_ptr l, std::shared_ptr worker); + Start(std::shared_ptr l); virtual void Stop(); @@ -176,6 +181,7 @@ namespace llarp SessionRenegotiateHandler SessionRenegotiate; PumpDoneHandler PumpDone; std::shared_ptr keyManager; + WorkerFunc_t QueueWork; std::shared_ptr logic() @@ -223,7 +229,6 @@ namespace llarp PutSession(const std::shared_ptr& s); std::shared_ptr m_Logic = nullptr; - std::shared_ptr m_Worker = nullptr; llarp_ev_loop_ptr m_Loop; IpAddress m_ourAddr; llarp_udp_io m_udp; diff --git a/llarp/messages/relay_commit.cpp b/llarp/messages/relay_commit.cpp index 365b04389..ac0da0a63 100644 --- a/llarp/messages/relay_commit.cpp +++ b/llarp/messages/relay_commit.cpp @@ -241,7 +241,7 @@ namespace llarp auto func = std::bind(&LR_StatusMessage::CreateAndSend, router, pathid, nextHop, pathKey, status); - router->threadpool()->addJob(func); + router->QueueWork(func); } /// this is done from logic thread @@ -478,7 +478,10 @@ namespace llarp auto frameDecrypt = std::make_shared(context, std::move(decrypter), this); // decrypt frames async - frameDecrypt->decrypter->AsyncDecrypt(context->Worker(), frameDecrypt->frames[0], frameDecrypt); + frameDecrypt->decrypter->AsyncDecrypt( + frameDecrypt->frames[0], frameDecrypt, [r = context->Router()](auto func) { + r->QueueWork(std::move(func)); + }); return true; } } // namespace llarp diff --git a/llarp/messages/relay_status.cpp b/llarp/messages/relay_status.cpp index 913958f68..f18de32c4 100644 --- a/llarp/messages/relay_status.cpp +++ b/llarp/messages/relay_status.cpp @@ -55,7 +55,7 @@ namespace llarp queue_handle() { auto func = std::bind(&llarp::LRSM_AsyncHandler::handle, shared_from_this()); - router->threadpool()->addJob(func); + router->QueueWork(func); } }; diff --git a/llarp/net/net.cpp b/llarp/net/net.cpp index 5367f1438..71a1021a7 100644 --- a/llarp/net/net.cpp +++ b/llarp/net/net.cpp @@ -417,6 +417,31 @@ namespace llarp if (ifa) freeifaddrs(ifa); } + namespace net + { + std::string + LoopbackInterfaceName() + { + const auto loopback = IPRange::FromIPv4(127, 0, 0, 0, 8); + std::string ifname; + IterAllNetworkInterfaces([&ifname, loopback](ifaddrs* const i) { + if (i->ifa_addr and i->ifa_addr->sa_family == AF_INET) + { + llarp::nuint32_t addr{((sockaddr_in*)i->ifa_addr)->sin_addr.s_addr}; + if (loopback.ContainsV4(xntohl(addr))) + { + ifname = i->ifa_name; + } + } + }); + if (ifname.empty()) + { + throw std::runtime_error( + "we have no ipv4 loopback interface for some ungodly reason, yeah idk fam"); + } + return ifname; + } + } // namespace net bool GetBestNetIF(std::string& ifname, int af) diff --git a/llarp/net/net_if.hpp b/llarp/net/net_if.hpp index d65dffcbf..90e345425 100644 --- a/llarp/net/net_if.hpp +++ b/llarp/net/net_if.hpp @@ -14,4 +14,12 @@ if_nametoindex(const char* __ifname) __THROW; #include #endif #endif + +namespace llarp::net +{ + /// get the name of the loopback interface + std::string + LoopbackInterfaceName(); +} // namespace llarp::net + #endif diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 4380ab6a4..bbbda8205 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include @@ -84,8 +83,7 @@ llarp_nodedb::RemoveIf(std::function filte ++itr; } } - - disk->addJob(std::bind(&KillRCJobs, files)); + disk(std::bind(&KillRCJobs, files)); } bool @@ -166,7 +164,7 @@ llarp_nodedb::InsertAsync( std::shared_ptr logic, std::function completionHandler) { - disk->addJob([this, rc, logic, completionHandler]() { + disk([this, rc, logic, completionHandler]() { this->Insert(rc); if (logic && completionHandler) { @@ -277,7 +275,7 @@ llarp_nodedb::ShouldSaveToDisk(llarp_time_t now) const void llarp_nodedb::AsyncFlushToDisk() { - disk->addJob(std::bind(&llarp_nodedb::SaveAll, this)); + disk(std::bind(&llarp_nodedb::SaveAll, this)); m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval; } @@ -362,86 +360,47 @@ llarp_nodedb::RemoveStaleRCs(const std::set& keep, llarp_time_t }); } -/* -bool -llarp_nodedb::Save() -{ - auto itr = entries.begin(); - while(itr != entries.end()) - { - llarp::pubkey pk = itr->first; - llarp_rc *rc= itr->second; - - itr++; // advance - } - return true; -} -*/ - -// call request hook -void -logic_threadworker_callback(void* user) -{ - auto* verify_request = static_cast(user); - if (verify_request->hook) - verify_request->hook(verify_request); -} - // write it to disk void disk_threadworker_setRC(llarp_async_verify_rc* verify_request) { verify_request->valid = verify_request->nodedb->Insert(verify_request->rc); if (verify_request->logic) - verify_request->logic->queue_job({verify_request, &logic_threadworker_callback}); + { + LogicCall(verify_request->logic, [verify_request]() { + if (verify_request->hook) + verify_request->hook(verify_request); + }); + } } // we run the crypto verify in the crypto threadpool worker void -crypto_threadworker_verifyrc(void* user) +crypto_threadworker_verifyrc(llarp_async_verify_rc* verify_request) { - auto* verify_request = static_cast(user); llarp::RouterContact rc = verify_request->rc; verify_request->valid = rc.Verify(llarp::time_now_ms()); // if it's valid we need to set it if (verify_request->valid && rc.IsPublicRouter()) { - if (verify_request->diskworker) + if (verify_request->disk) { llarp::LogDebug("RC is valid, saving to disk"); - verify_request->diskworker->addJob(std::bind(&disk_threadworker_setRC, verify_request)); + verify_request->disk(std::bind(&disk_threadworker_setRC, verify_request)); return; } } // callback to logic thread - verify_request->logic->queue_job({verify_request, &logic_threadworker_callback}); -} - -void -nodedb_inform_load_rc(void* user) -{ - auto* job = static_cast(user); - job->hook(job); + LogicCall(verify_request->logic, [verify_request]() { + if (verify_request->hook) + verify_request->hook(verify_request); + }); } void llarp_nodedb_async_verify(struct llarp_async_verify_rc* job) { - job->cryptoworker->addJob(std::bind(&crypto_threadworker_verifyrc, job)); -} - -void -nodedb_async_load_rc(void* user) -{ - auto* job = static_cast(user); - - auto fpath = job->nodedb->getRCFilePath(job->pubkey); - job->loaded = job->nodedb->loadfile(fpath); - if (job->loaded) - { - job->nodedb->Get(job->pubkey, job->result); - } - job->logic->queue_job({job, &nodedb_inform_load_rc}); + job->worker(std::bind(&crypto_threadworker_verifyrc, job)); } void diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index 54030c896..db18f678a 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -17,24 +17,20 @@ * * persistent storage API for router contacts */ - -struct llarp_threadpool; - namespace llarp { class Logic; - - namespace thread - { - class ThreadPool; - } } // namespace llarp struct llarp_nodedb { - explicit llarp_nodedb( - std::shared_ptr diskworker, const std::string rootdir) - : disk(std::move(diskworker)), nodePath(rootdir) + using DiskJob_t = std::function; + using DiskCaller_t = std::function; + using WorkJob_t = std::function; + using WorkCaller_t = std::function; + + explicit llarp_nodedb(const std::string rootdir, DiskCaller_t diskCaller) + : disk(std::move(diskCaller)), nodePath(rootdir) { } @@ -44,7 +40,7 @@ struct llarp_nodedb Clear(); } - std::shared_ptr disk; + const DiskCaller_t disk; mutable llarp::util::Mutex access; // protects entries /// time for next save to disk event, 0 if never happened llarp_time_t m_NextSaveToDisk = 0s; @@ -182,8 +178,8 @@ struct llarp_async_verify_rc llarp_nodedb* nodedb; // llarp::Logic for queue_job std::shared_ptr logic; - std::shared_ptr cryptoworker; - std::shared_ptr diskworker; + llarp_nodedb::WorkCaller_t worker; + llarp_nodedb::DiskCaller_t disk; /// router contact llarp::RouterContact rc; @@ -215,7 +211,7 @@ struct llarp_async_load_rc /// llarp::Logic for calling hook llarp::Logic* logic; /// disk worker threadpool - llarp::thread::ThreadPool* diskworker; + llarp_nodedb::DiskCaller_t disk; /// target pubkey llarp::PubKey pubkey; /// router contact result diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 96470ab7f..05e79d38d 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -451,7 +451,7 @@ namespace llarp { TrafficQueue_ptr data = nullptr; std::swap(m_UpstreamQueue, data); - r->threadpool()->addJob( + r->QueueWork( [self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); }); } } @@ -463,7 +463,7 @@ namespace llarp { TrafficQueue_ptr data = nullptr; std::swap(m_DownstreamQueue, data); - r->threadpool()->addJob( + r->QueueWork( [self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); }); } } diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index b3ad17cfe..5edd2dcf2 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -28,12 +28,6 @@ namespace llarp return m_AllowTransit; } - std::shared_ptr - PathContext::Worker() - { - return m_Router->threadpool(); - } - bool PathContext::CheckPathLimitHitByIP(const IpAddress& ip) { diff --git a/llarp/path/path_context.hpp b/llarp/path/path_context.hpp index 88af93ff9..a8c617804 100644 --- a/llarp/path/path_context.hpp +++ b/llarp/path/path_context.hpp @@ -34,7 +34,7 @@ namespace llarp struct PathContext { - PathContext(AbstractRouter* router); + explicit PathContext(AbstractRouter* router); /// called from router tick function void @@ -147,9 +147,6 @@ namespace llarp } }; - std::shared_ptr - Worker(); - std::shared_ptr logic(); diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index 6643411f3..8620188cc 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -16,6 +16,8 @@ namespace llarp { struct AsyncPathKeyExchangeContext : std::enable_shared_from_this { + using WorkFunc_t = std::function; + using WorkerFunc_t = std::function; using Path_t = path::Path_ptr; using PathSet_t = path::PathSet_ptr; PathSet_t pathset = nullptr; @@ -25,7 +27,7 @@ namespace llarp Handler result; size_t idx = 0; AbstractRouter* router = nullptr; - std::shared_ptr worker; + WorkerFunc_t work; std::shared_ptr logic; LR_CommitMessage LRCM; @@ -100,26 +102,24 @@ namespace llarp else { // next hop - worker->addJob( - std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); + work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); } } /// Generate all keys asynchronously and call handler when done void - AsyncGenerateKeys( - Path_t p, std::shared_ptr l, std::shared_ptr pool, Handler func) + AsyncGenerateKeys(Path_t p, std::shared_ptr l, WorkerFunc_t worker, Handler func) { path = p; logic = l; result = func; - worker = pool; + work = worker; for (size_t i = 0; i < path::max_len; ++i) { LRCM.frames[i].Randomize(); } - pool->addJob(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); + work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); } }; @@ -442,7 +442,10 @@ namespace llarp path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); }); ctx->AsyncGenerateKeys( - path, m_router->logic(), m_router->threadpool(), &PathBuilderKeysGenerated); + path, + m_router->logic(), + [r = m_router](auto func) { r->QueueWork(std::move(func)); }, + &PathBuilderKeysGenerated); } void diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 5f5afd6b8..167f57eda 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -248,9 +248,9 @@ namespace llarp { if (m_UpstreamQueue && not m_UpstreamQueue->empty()) { - r->threadpool()->addJob([self = shared_from_this(), - data = std::move(m_UpstreamQueue), - r]() { self->UpstreamWork(data, r); }); + r->QueueWork([self = shared_from_this(), data = std::move(m_UpstreamQueue), r]() { + self->UpstreamWork(data, r); + }); } m_UpstreamQueue = nullptr; } @@ -260,9 +260,9 @@ namespace llarp { if (m_DownstreamQueue && not m_DownstreamQueue->empty()) { - r->threadpool()->addJob([self = shared_from_this(), - data = std::move(m_DownstreamQueue), - r]() { self->DownstreamWork(data, r); }); + r->QueueWork([self = shared_from_this(), data = std::move(m_DownstreamQueue), r]() { + self->DownstreamWork(data, r); + }); } m_DownstreamQueue = nullptr; } diff --git a/llarp/path/transit_hop.hpp b/llarp/path/transit_hop.hpp index eab437e2c..e29837b1f 100644 --- a/llarp/path/transit_hop.hpp +++ b/llarp/path/transit_hop.hpp @@ -7,6 +7,7 @@ #include #include #include +#include namespace llarp { diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index d303e7ebd..1e9a61509 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -77,7 +77,7 @@ namespace llarp struct AbstractRouter { #ifdef LOKINET_HIVE - tooling::RouterHive* hive; + tooling::RouterHive* hive = nullptr; #endif virtual ~AbstractRouter() = default; @@ -127,11 +127,13 @@ namespace llarp virtual llarp_ev_loop_ptr netloop() const = 0; - virtual std::shared_ptr - threadpool() = 0; + /// call function in crypto worker + virtual void + QueueWork(std::function) = 0; - virtual std::shared_ptr - diskworker() = 0; + /// call function in disk io thread + virtual void + QueueDiskIO(std::function) = 0; virtual service::Context& hiddenServiceContext() = 0; @@ -286,14 +288,10 @@ namespace llarp template void - NotifyRouterEvent(Params&&... args) const + NotifyRouterEvent([[maybe_unused]] Params&&... args) const { - // TODO: no-op when appropriate - auto event = std::make_unique(args...); #ifdef LOKINET_HIVE - hive->NotifyEvent(std::move(event)); -#elif LOKINET_DEBUG - LogDebug(event->ToString()); + hive->NotifyEvent(std::make_unique(args...)); #endif } }; diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index 6a113d821..44f53e69f 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -5,13 +5,13 @@ #include #include -#include #include #include #include #include #include +#include struct llarp_buffer_t; diff --git a/llarp/router/outbound_session_maker.cpp b/llarp/router/outbound_session_maker.cpp index fc220d093..f7e50185d 100644 --- a/llarp/router/outbound_session_maker.cpp +++ b/llarp/router/outbound_session_maker.cpp @@ -46,7 +46,7 @@ namespace llarp } auto func = std::bind(&OutboundSessionMaker::VerifyRC, this, session->GetRemoteRC()); - _threadpool->addJob(func); + work(func); return true; } @@ -157,14 +157,14 @@ namespace llarp Profiling* profiler, std::shared_ptr logic, llarp_nodedb* nodedb, - std::shared_ptr threadpool) + WorkerFunc_t dowork) { _linkManager = linkManager; _rcLookup = rcLookup; _logic = logic; _nodedb = nodedb; - _threadpool = threadpool; _profiler = profiler; + work = dowork; } void diff --git a/llarp/router/outbound_session_maker.hpp b/llarp/router/outbound_session_maker.hpp index 4501267e8..63069f65e 100644 --- a/llarp/router/outbound_session_maker.hpp +++ b/llarp/router/outbound_session_maker.hpp @@ -6,7 +6,6 @@ #include #include #include -#include #include @@ -25,6 +24,9 @@ namespace llarp struct OutboundSessionMaker final : public IOutboundSessionMaker { + using Work_t = std::function; + using WorkerFunc_t = std::function; + using CallbacksQueue = std::list; public: @@ -61,7 +63,7 @@ namespace llarp Profiling* profiler, std::shared_ptr logic, llarp_nodedb* nodedb, - std::shared_ptr threadpool); + WorkerFunc_t work); void SetOurRouter(RouterID r) @@ -113,7 +115,7 @@ namespace llarp Profiling* _profiler = nullptr; llarp_nodedb* _nodedb = nullptr; std::shared_ptr _logic; - std::shared_ptr _threadpool; + WorkerFunc_t work; RouterID us; }; diff --git a/llarp/router/rc_lookup_handler.cpp b/llarp/router/rc_lookup_handler.cpp index 198adf0b5..d696bdb70 100644 --- a/llarp/router/rc_lookup_handler.cpp +++ b/llarp/router/rc_lookup_handler.cpp @@ -194,7 +194,7 @@ namespace llarp return false; auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc); - _threadpool->addJob(func); + _work(func); // update dht if required if (_dht->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey})) @@ -299,7 +299,7 @@ namespace llarp RCLookupHandler::Init( llarp_dht_context* dht, llarp_nodedb* nodedb, - std::shared_ptr threadpool, + WorkerFunc_t dowork, ILinkManager* linkManager, service::Context* hiddenServiceContext, const std::set& strictConnectPubkeys, @@ -309,7 +309,7 @@ namespace llarp { _dht = dht; _nodedb = nodedb; - _threadpool = threadpool; + _work = dowork; _hiddenServiceContext = hiddenServiceContext; _strictConnectPubkeys = strictConnectPubkeys; _bootstrapRCList = bootstrapRCList; diff --git a/llarp/router/rc_lookup_handler.hpp b/llarp/router/rc_lookup_handler.hpp index 85453b566..45a5169c9 100644 --- a/llarp/router/rc_lookup_handler.hpp +++ b/llarp/router/rc_lookup_handler.hpp @@ -5,7 +5,6 @@ #include #include -#include #include #include @@ -27,6 +26,8 @@ namespace llarp struct RCLookupHandler final : public I_RCLookupHandler { public: + using Work_t = std::function; + using WorkerFunc_t = std::function; using CallbacksQueue = std::list; ~RCLookupHandler() override = default; @@ -72,7 +73,7 @@ namespace llarp Init( llarp_dht_context* dht, llarp_nodedb* nodedb, - std::shared_ptr threadpool, + WorkerFunc_t dowork, ILinkManager* linkManager, service::Context* hiddenServiceContext, const std::set& strictConnectPubkeys, @@ -98,7 +99,7 @@ namespace llarp llarp_dht_context* _dht = nullptr; llarp_nodedb* _nodedb = nullptr; - std::shared_ptr _threadpool = nullptr; + WorkerFunc_t _work = nullptr; service::Context* _hiddenServiceContext = nullptr; ILinkManager* _linkManager = nullptr; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index ac544eafc..7783b044f 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -44,19 +44,15 @@ static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s; namespace llarp { - Router::Router( - std::shared_ptr _tp, - llarp_ev_loop_ptr __netloop, - std::shared_ptr l) + Router::Router(llarp_ev_loop_ptr __netloop, std::shared_ptr l) : ready(false) , m_lmq(std::make_shared()) , _netloop(std::move(__netloop)) - , cryptoworker(std::move(_tp)) , _logic(std::move(l)) , paths(this) , _exitContext(this) - , disk(std::make_shared(1, 1000, "diskworker")) , _dht(llarp_dht_context_new(this)) + , m_DiskThread(m_lmq->add_tagged_thread("disk")) , inbound_link_msg_parser(this) , _hiddenServiceContext(this) , m_RPCServer(new rpc::RpcServer(m_lmq, this)) @@ -237,35 +233,34 @@ namespace llarp bool Router::Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb) { - if (nodedb == nullptr) + // we need this first so we can start lmq to fetch keys + if (conf) { - throw std::invalid_argument("nodedb cannot be null"); + enableRPCServer = conf->api.m_enableRPCServer; + rpcBindAddr = conf->api.m_rpcBindAddr; + whitelistRouters = conf->lokid.whitelistRouters; + lokidRPCAddr = conf->lokid.lokidRPCAddr; } - _nodedb = nodedb; - - // we need this first so we can start lmq to fetch keys - enableRPCServer = conf->api.m_enableRPCServer; - rpcBindAddr = conf->api.m_rpcBindAddr; - whitelistRouters = conf->lokid.whitelistRouters; - lokidRPCAddr = conf->lokid.lokidRPCAddr; - if (not StartRpcServer()) throw std::runtime_error("Failed to start rpc server"); m_lmq->start(); + _nodedb = nodedb; + if (whitelistRouters) { m_lokidRpcClient->ConnectAsync(std::string_view{lokidRPCAddr}); } // fetch keys - if (not m_keyManager->initialize(*conf, true, isRouter)) - throw std::runtime_error("KeyManager failed to initialize"); - - if (!FromConfig(conf)) - throw std::runtime_error("FromConfig() failed"); - + if (conf) + { + if (not m_keyManager->initialize(*conf, true, isRouter)) + throw std::runtime_error("KeyManager failed to initialize"); + if (!FromConfig(conf)) + throw std::runtime_error("FromConfig() failed"); + } if (!InitOutboundLinks()) throw std::runtime_error("InitOutboundLinks() failed"); @@ -293,7 +288,7 @@ namespace llarp LogError("RC is invalid, not saving"); return false; } - diskworker()->addJob(std::bind(&Router::HandleSaveRC, this)); + QueueDiskIO(std::bind(&Router::HandleSaveRC, this)); return true; } @@ -308,8 +303,6 @@ namespace llarp { LogInfo("closing router"); llarp_ev_loop_stop(_netloop); - disk->stop(); - disk->shutdown(); _running.store(false); } @@ -527,12 +520,17 @@ namespace llarp // Init components after relevant config settings loaded _outboundMessageHandler.Init(&_linkManager, _logic); _outboundSessionMaker.Init( - &_linkManager, &_rcLookupHandler, &_routerProfiling, _logic, _nodedb, threadpool()); + &_linkManager, + &_rcLookupHandler, + &_routerProfiling, + _logic, + _nodedb, + util::memFn(&AbstractRouter::QueueWork, this)); _linkManager.Init(&_outboundSessionMaker); _rcLookupHandler.Init( _dht, _nodedb, - threadpool(), + util::memFn(&AbstractRouter::QueueWork, this), &_linkManager, &_hiddenServiceContext, strictConnectPubkeys, @@ -552,7 +550,8 @@ namespace llarp util::memFn(&AbstractRouter::CheckRenegotiateValid, this), util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker), util::memFn(&AbstractRouter::SessionClosed, this), - util::memFn(&AbstractRouter::PumpLL, this)); + util::memFn(&AbstractRouter::PumpLL, this), + util::memFn(&AbstractRouter::QueueWork, this)); const std::string& key = serverConfig.interface; int af = serverConfig.addressFamily; @@ -590,7 +589,7 @@ namespace llarp conf->logging.m_logType, conf->logging.m_logFile, conf->router.m_nickname, - diskworker()); + util::memFn(&AbstractRouter::QueueDiskIO, this)); return true; } @@ -753,7 +752,7 @@ namespace llarp // save profiles if (routerProfiling().ShouldSave(now)) { - diskworker()->addJob([&]() { routerProfiling().Save(routerProfilesFile.c_str()); }); + QueueDiskIO([&]() { routerProfiling().Save(routerProfilesFile.c_str()); }); } // save nodedb if (nodedb()->ShouldSaveToDisk(now)) @@ -869,18 +868,6 @@ namespace llarp if (_running || _stopping) return false; - if (!cryptoworker->start()) - { - LogError("crypto worker failed to start"); - return false; - } - - if (!disk->start()) - { - LogError("disk worker failed to start"); - return false; - } - routerProfiling().Load(routerProfilesFile.c_str()); // set public signing key @@ -932,7 +919,7 @@ namespace llarp } } _outboundSessionMaker.SetOurRouter(pubkey()); - if (!_linkManager.StartLinks(_logic, cryptoworker)) + if (!_linkManager.StartLinks(_logic)) { LogWarn("One or more links failed to start."); return false; @@ -1157,7 +1144,8 @@ namespace llarp util::memFn(&AbstractRouter::CheckRenegotiateValid, this), util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker), util::memFn(&AbstractRouter::SessionClosed, this), - util::memFn(&AbstractRouter::PumpLL, this)); + util::memFn(&AbstractRouter::PumpLL, this), + util::memFn(&AbstractRouter::QueueWork, this)); if (!link) throw std::runtime_error("NewOutboundLink() failed to provide a link"); diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 628bbab54..bce756a11 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -34,7 +34,6 @@ #include #include #include -#include #include #include @@ -166,31 +165,30 @@ namespace llarp return _netloop; } - std::shared_ptr - threadpool() override + void + QueueWork(std::function func) override { - return cryptoworker; + m_lmq->job(std::move(func)); } - std::shared_ptr - diskworker() override + void + QueueDiskIO(std::function func) override { - return disk; + m_lmq->job(std::move(func), m_DiskThread); } IpAddress _ourAddress; llarp_ev_loop_ptr _netloop; - std::shared_ptr cryptoworker; std::shared_ptr _logic; path::PathContext paths; exit::Context _exitContext; SecretKey _identity; SecretKey _encryption; - std::shared_ptr disk; llarp_dht_context* _dht = nullptr; llarp_nodedb* _nodedb; llarp_time_t _startedAt; + const lokimq::TaggedThreadID m_DiskThread; llarp_time_t Uptime() const override; @@ -311,10 +309,7 @@ namespace llarp void GossipRCIfNeeded(const RouterContact rc) override; - Router( - std::shared_ptr worker, - llarp_ev_loop_ptr __netloop, - std::shared_ptr logic); + explicit Router(llarp_ev_loop_ptr __netloop, std::shared_ptr logic); ~Router() override; diff --git a/llarp/rpc/lokid_rpc_client.cpp b/llarp/rpc/lokid_rpc_client.cpp index acf134248..f45b333fb 100644 --- a/llarp/rpc/lokid_rpc_client.cpp +++ b/llarp/rpc/lokid_rpc_client.cpp @@ -1,6 +1,5 @@ #include -#include #include #include @@ -173,7 +172,7 @@ namespace llarp promise.set_value(std::nullopt); return; } - if (data.size() < 2) + if (data.empty()) { LogError("failed to get private key, no response"); promise.set_value(std::nullopt); @@ -181,7 +180,7 @@ namespace llarp } try { - auto j = nlohmann::json::parse(data[1]); + auto j = nlohmann::json::parse(data[0]); SecretKey k; if (not k.FromHex(j.at("service_node_ed25519_privkey").get())) { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 2f0b48cde..d8e120c7b 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -689,8 +689,8 @@ namespace llarp { llarp_async_verify_rc* job = new llarp_async_verify_rc(); job->nodedb = Router()->nodedb(); - job->cryptoworker = Router()->threadpool(); - job->diskworker = Router()->diskworker(); + job->worker = util::memFn(&AbstractRouter::QueueWork, Router()); + job->disk = util::memFn(&AbstractRouter::QueueDiskIO, Router()); job->logic = Router()->logic(); job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg, std::placeholders::_1); job->rc = rc; @@ -908,7 +908,7 @@ namespace llarp RemoveConvoTag(frame.T); return true; } - if (!frame.AsyncDecryptAndVerify(EndpointLogic(), p, CryptoWorker(), m_Identity, this)) + if (!frame.AsyncDecryptAndVerify(EndpointLogic(), p, m_Identity, this)) { // send discard ProtocolFrame f; @@ -1240,7 +1240,7 @@ namespace llarp f.F = m->introReply.pathID; transfer->P = remoteIntro.pathID; auto self = this; - return CryptoWorker()->addJob([transfer, p, m, K, self]() { + Router()->QueueWork([transfer, p, m, K, self]() { if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity)) { LogError("failed to encrypt and sign"); @@ -1250,6 +1250,7 @@ namespace llarp util::Lock lock(self->m_state->m_SendQueueMutex); self->m_state->m_SendQueue.emplace_back(transfer, p); }); + return true; } } } @@ -1333,12 +1334,6 @@ namespace llarp return m_state->m_IsolatedLogic ? m_state->m_IsolatedLogic : Router()->logic(); } - std::shared_ptr - Endpoint::CryptoWorker() - { - return Router()->threadpool(); - } - AbstractRouter* Endpoint::Router() { diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index bf35971b6..20a759be7 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -141,10 +141,6 @@ namespace llarp llarp_ev_loop_ptr EndpointNetLoop(); - /// crypto worker threadpool - std::shared_ptr - CryptoWorker(); - AbstractRouter* Router(); diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index 015b4cd54..d7142fafa 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -207,7 +207,7 @@ namespace llarp ex->msg.PutBuffer(payload); ex->msg.introReply = path->intro; frame->F = ex->msg.introReply.pathID; - m_Endpoint->CryptoWorker()->addJob(std::bind(&AsyncKeyExchange::Encrypt, ex, frame)); + m_Endpoint->Router()->QueueWork(std::bind(&AsyncKeyExchange::Encrypt, ex, frame)); } std::string diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp index 39a2af724..49c5d7f88 100644 --- a/llarp/service/protocol.cpp +++ b/llarp/service/protocol.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include namespace llarp @@ -384,7 +385,6 @@ namespace llarp ProtocolFrame::AsyncDecryptAndVerify( std::shared_ptr logic, path::Path_ptr recvPath, - const std::shared_ptr& worker, const Identity& localIdent, Endpoint* handler) const { @@ -397,7 +397,7 @@ namespace llarp auto dh = std::make_shared( logic, localIdent, handler, msg, *this, recvPath->intro); dh->path = recvPath; - worker->addJob(std::bind(&AsyncFrameDecrypt::Work, dh)); + handler->Router()->QueueWork(std::bind(&AsyncFrameDecrypt::Work, dh)); return true; } @@ -415,7 +415,7 @@ namespace llarp return false; } v->frame = *this; - worker->addJob([v, msg = std::move(msg), recvPath = std::move(recvPath)]() { + handler->Router()->QueueWork([v, msg = std::move(msg), recvPath = std::move(recvPath)]() { if (not v->frame.Verify(v->si)) { LogError("Signature failure from ", v->si.Addr()); diff --git a/llarp/service/protocol.hpp b/llarp/service/protocol.hpp index 3e5bb2650..4ca6f4421 100644 --- a/llarp/service/protocol.hpp +++ b/llarp/service/protocol.hpp @@ -125,7 +125,6 @@ namespace llarp AsyncDecryptAndVerify( std::shared_ptr logic, path::Path_ptr fromPath, - const std::shared_ptr& worker, const Identity& localIdent, Endpoint* handler) const; diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index febc15305..959a2a644 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -98,7 +98,7 @@ namespace llarp m->tag = f->T; m->PutBuffer(payload); auto self = this; - m_Endpoint->CryptoWorker()->addJob([f, m, shared, path, self]() { + m_Endpoint->Router()->QueueWork([f, m, shared, path, self]() { if (not f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity())) { LogError(self->m_Endpoint->Name(), " failed to sign message"); diff --git a/llarp/util/logging/file_logger.cpp b/llarp/util/logging/file_logger.cpp index 61d69f9f5..704e615aa 100644 --- a/llarp/util/logging/file_logger.cpp +++ b/llarp/util/logging/file_logger.cpp @@ -26,7 +26,7 @@ namespace llarp // namespace FileLogStream::FileLogStream( - std::shared_ptr disk, FILE* f, llarp_time_t flushInterval, bool closeFile) + std::function disk, FILE* f, llarp_time_t flushInterval, bool closeFile) : m_Lines(1024 * 8) , m_Disk(std::move(disk)) , m_File(f) @@ -112,7 +112,7 @@ namespace llarp { FILE* const f = m_File; auto lines = &m_Lines; - m_Disk->addJob([f, lines]() { Flush(lines, f); }); + m_Disk([f, lines]() { Flush(lines, f); }); m_LastFlush = now; } } // namespace llarp diff --git a/llarp/util/logging/file_logger.hpp b/llarp/util/logging/file_logger.hpp index 43dd51e36..15ce24ceb 100644 --- a/llarp/util/logging/file_logger.hpp +++ b/llarp/util/logging/file_logger.hpp @@ -3,7 +3,6 @@ #include -#include #include #include @@ -14,11 +13,10 @@ namespace llarp /// flushable file based log stream struct FileLogStream : public ILogStream { + using Work_t = std::function; + FileLogStream( - std::shared_ptr disk, - FILE* f, - llarp_time_t flushInterval, - bool closefile = true); + std::function io, FILE* f, llarp_time_t flushInterval, bool closefile = true); ~FileLogStream() override; @@ -65,7 +63,7 @@ namespace llarp void FlushLinesToDisk(llarp_time_t now); - std::shared_ptr m_Disk; + const std::function m_Disk; FILE* const m_File; const llarp_time_t m_FlushInterval; llarp_time_t m_LastFlush = 0s; diff --git a/llarp/util/logging/json_logger.hpp b/llarp/util/logging/json_logger.hpp index cd5526b39..6cf4ff511 100644 --- a/llarp/util/logging/json_logger.hpp +++ b/llarp/util/logging/json_logger.hpp @@ -8,11 +8,11 @@ namespace llarp struct JSONLogStream : public FileLogStream { JSONLogStream( - std::shared_ptr disk, + std::function disk, FILE* f, llarp_time_t flushInterval, bool closeFile) - : FileLogStream(disk, f, flushInterval, closeFile) + : FileLogStream(std::move(disk), f, flushInterval, closeFile) { } diff --git a/llarp/util/logging/logger.cpp b/llarp/util/logging/logger.cpp index f604c07d5..93e108dac 100644 --- a/llarp/util/logging/logger.cpp +++ b/llarp/util/logging/logger.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -108,7 +107,7 @@ namespace llarp LogType type, const std::string& file, const std::string& nickname, - std::shared_ptr threadpool) + std::function io) { SetLogLevel(level); nodeName = nickname; @@ -140,7 +139,7 @@ namespace llarp std::cout << std::flush; LogContext::Instance().logStream = - std::make_unique(threadpool, logfile, 100ms, true); + std::make_unique(io, logfile, 100ms, true); } else { @@ -153,7 +152,7 @@ namespace llarp std::cout << std::flush; LogContext::Instance().logStream = - std::make_unique(threadpool, logfile, 100ms, logfile != stdout); + std::make_unique(io, logfile, 100ms, logfile != stdout); break; case LogType::Syslog: if (logfile) @@ -188,18 +187,3 @@ namespace llarp } } // namespace llarp - -extern "C" -{ - void - cSetLogLevel(LogLevel lvl) - { - llarp::SetLogLevel((llarp::LogLevel)lvl); - } - - void - cSetLogNodeName(const char* name) - { - llarp::LogContext::Instance().nodeName = name; - } -} diff --git a/llarp/util/logging/logger.h b/llarp/util/logging/logger.h deleted file mode 100644 index 8d9f12fb7..000000000 --- a/llarp/util/logging/logger.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef LLARP_LOGGER_H -#define LLARP_LOGGER_H - -#ifdef __cplusplus -extern "C" -{ - enum LogLevel - { - eLogDebug, - eLogInfo, - eLogWarn, - eLogError, - eLogNone - }; - - void - cSetLogLevel(enum LogLevel lvl); - - void - cSetLogNodeName(const char* name); -} -#endif - -#endif diff --git a/llarp/util/logging/logger.hpp b/llarp/util/logging/logger.hpp index 26b7c37d5..a8375fdc3 100644 --- a/llarp/util/logging/logger.hpp +++ b/llarp/util/logging/logger.hpp @@ -5,7 +5,6 @@ #include #include #include -#include namespace llarp { @@ -21,6 +20,8 @@ namespace llarp struct LogContext { + using IOFunc_t = std::function; + LogContext(); LogLevel curLevel = eLogInfo; LogLevel startupLevel = eLogInfo; @@ -50,14 +51,14 @@ namespace llarp /// @param type is the type of logger to set up /// @param file is the file to log to (relevant for types File and Json) /// @param nickname is a tag to add to each log statement - /// @param threadpool is a threadpool where I/O can offloaded + /// @param io is a callable that queues work that does io, async void Initialize( LogLevel level, LogType type, const std::string& file, const std::string& nickname, - std::shared_ptr threadpool); + std::function io); }; /// RAII type to turn logging off diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index 16e84a838..1aded0fd9 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -6,17 +6,6 @@ namespace llarp { - bool - Logic::queue_job(struct llarp_thread_job job) - { - if (job.user && job.work) - { - LogicCall(this, std::bind(job.work, job.user)); - return true; - } - return false; - } - void Logic::stop() { diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index 91e850251..4faf2f730 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -3,7 +3,6 @@ #include #include -#include #include namespace llarp @@ -15,9 +14,6 @@ namespace llarp void stop(); - bool - queue_job(struct llarp_thread_job job); - void Call(std::function func); diff --git a/llarp/util/thread/thread_pool.cpp b/llarp/util/thread/thread_pool.cpp deleted file mode 100644 index a313b1e8c..000000000 --- a/llarp/util/thread/thread_pool.cpp +++ /dev/null @@ -1,331 +0,0 @@ -#include - -#include - -namespace llarp -{ - namespace thread - { - void - ThreadPool::join() - { - for (auto& t : m_threads) - { - if (t.joinable()) - { - t.join(); - } - } - - m_createdThreads = 0; - } - - void - ThreadPool::runJobs() - { - while (m_status.load(std::memory_order_relaxed) == Status::Run) - { - auto functor = m_queue.tryPopFront(); - - if (functor) - { - (*functor)(); - } - else - { - m_idleThreads++; - - if (m_status == Status::Run && m_queue.empty()) - { - m_semaphore.wait(); - } - - m_idleThreads.fetch_sub(1, std::memory_order_relaxed); - } - } - } - - void - ThreadPool::drainQueue() - { - while (m_status.load(std::memory_order_relaxed) == Status::Drain) - { - auto functor = m_queue.tryPopFront(); - - if (!functor) - { - return; - } - - (*functor)(); - } - } - - void - ThreadPool::waitThreads() - { - std::unique_lock lock{m_gateMutex}; - m_numThreadsCV.wait(lock, [this] { return allThreadsReady(); }); - } - - void - ThreadPool::releaseThreads() - { - { - std::lock_guard lock{m_gateMutex}; - m_numThreadsReady = 0; - ++m_gateCount; - } - m_gateCV.notify_all(); - } - - void - ThreadPool::interrupt() - { - std::lock_guard lock{m_gateMutex}; - - size_t count = m_idleThreads; - - for (size_t i = 0; i < count; ++i) - { - m_semaphore.notify(); - } - } - - void - ThreadPool::worker() - { - // Lock will be valid until the end of the statement - size_t gateCount = (std::lock_guard{m_gateMutex}, m_gateCount); - - util::SetThreadName(m_name); - - for (;;) - { - { - std::unique_lock lock{m_gateMutex}; - ++m_numThreadsReady; - m_numThreadsCV.notify_one(); - - m_gateCV.wait(lock, [&] { return gateCount != m_gateCount; }); - - gateCount = m_gateCount; - } - - Status status = m_status.load(std::memory_order_relaxed); - - // Can't use a switch here as we want to load and fall through. - - if (status == Status::Run) - { - runJobs(); - status = m_status; - } - - if (status == Status::Drain) - { - drainQueue(); - } - else if (status == Status::Suspend) - { - continue; - } - else - { - assert(status == Status::Stop); - return; - } - } - } - - bool - ThreadPool::spawn() - { - try - { - m_threads.at(m_createdThreads) = std::thread(std::bind(&ThreadPool::worker, this)); - ++m_createdThreads; - return true; - } - catch (const std::system_error&) - { - return false; - } - } - - ThreadPool::ThreadPool(size_t numThreads, size_t maxJobs, std::string_view name) - : m_queue(maxJobs) - , m_semaphore(0) - , m_idleThreads(0) - , m_status(Status::Stop) - , m_gateCount(0) - , m_numThreadsReady(0) - , m_name(name) - , m_threads(numThreads) - , m_createdThreads(0) - { - assert(numThreads != 0); - assert(maxJobs != 0); - disable(); - } - - ThreadPool::~ThreadPool() - { - shutdown(); - } - - bool - ThreadPool::addJob(const Job& job) - { - assert(job); - - QueueReturn ret = m_queue.pushBack(job); - - if (ret == QueueReturn::Success && m_idleThreads > 0) - { - m_semaphore.notify(); - } - - return ret == QueueReturn::Success; - } - bool - ThreadPool::addJob(Job&& job) - { - assert(job); - QueueReturn ret = m_queue.pushBack(std::move(job)); - - if (ret == QueueReturn::Success && m_idleThreads > 0) - { - m_semaphore.notify(); - } - - return ret == QueueReturn::Success; - } - - bool - ThreadPool::tryAddJob(const Job& job) - { - assert(job); - QueueReturn ret = m_queue.tryPushBack(job); - - if (ret == QueueReturn::Success && m_idleThreads > 0) - { - m_semaphore.notify(); - } - - return ret == QueueReturn::Success; - } - - bool - ThreadPool::tryAddJob(Job&& job) - { - assert(job); - QueueReturn ret = m_queue.tryPushBack(std::move(job)); - - if (ret == QueueReturn::Success && m_idleThreads > 0) - { - m_semaphore.notify(); - } - - return ret == QueueReturn::Success; - } - - void - ThreadPool::drain() - { - util::Lock lock(m_mutex); - - if (m_status.load(std::memory_order_relaxed) == Status::Run) - { - m_status = Status::Drain; - - interrupt(); - waitThreads(); - - m_status = Status::Run; - - releaseThreads(); - } - } - - void - ThreadPool::shutdown() - { - util::Lock lock(m_mutex); - - if (m_status.load(std::memory_order_relaxed) == Status::Run) - { - m_queue.disable(); - m_status = Status::Stop; - - interrupt(); - m_queue.removeAll(); - - join(); - } - } - - bool - ThreadPool::start() - { - util::Lock lock(m_mutex); - - if (m_status.load(std::memory_order_relaxed) != Status::Stop) - { - return true; - } - - for (auto it = (m_threads.begin() + m_createdThreads); it != m_threads.end(); ++it) - { - if (!spawn()) - { - releaseThreads(); - - join(); - - return false; - } - } - - waitThreads(); - - m_queue.enable(); - m_status = Status::Run; - - // `releaseThreads` has a release barrier so workers don't return from - // wait and not see the above store. - - releaseThreads(); - - return true; - } - - void - ThreadPool::stop() - { - util::Lock lock(m_mutex); - - if (m_status.load(std::memory_order_relaxed) == Status::Run) - { - m_queue.disable(); - m_status = Status::Drain; - - // `interrupt` has an acquire barrier (locks a mutex), so nothing will - // be executed before the above store to `status`. - interrupt(); - - waitThreads(); - - m_status = Status::Stop; - - // `releaseThreads` has a release barrier so workers don't return from - // wait and not see the above store. - - releaseThreads(); - - join(); - } - } - - } // namespace thread -} // namespace llarp diff --git a/llarp/util/thread/thread_pool.hpp b/llarp/util/thread/thread_pool.hpp deleted file mode 100644 index 4002c849e..000000000 --- a/llarp/util/thread/thread_pool.hpp +++ /dev/null @@ -1,216 +0,0 @@ -#ifndef LLARP_THREAD_POOL_HPP -#define LLARP_THREAD_POOL_HPP - -#include -#include - -#include -#include -#include -#include -#include - -namespace llarp -{ - namespace thread - { - class ThreadPool - { - // Provide an efficient fixed size threadpool. The following attributes - // of the threadpool are fixed at construction time: - // - the max number of pending jobs - // - the number of threads - public: - using Job = std::function; - using JobQueue = Queue; - - enum class Status - { - Stop, - Run, - Suspend, - Drain - }; - - private: - JobQueue m_queue; // The job queue - util::Semaphore m_semaphore; // The semaphore for the queue. - - std::atomic_size_t m_idleThreads; // Number of idle threads - - util::Mutex m_mutex; - - std::atomic m_status; - - size_t m_gateCount GUARDED_BY(m_gateMutex); - size_t m_numThreadsReady GUARDED_BY(m_gateMutex); // Threads ready to go through the gate. - - std::mutex m_gateMutex; - std::condition_variable m_gateCV; - std::condition_variable m_numThreadsCV; - - std::string m_name; - std::vector m_threads; - size_t m_createdThreads; - - void - join(); - - void - runJobs(); - - void - drainQueue(); - - void - waitThreads(); - - void - releaseThreads(); - - void - interrupt(); - - void - worker(); - - bool - spawn(); - - bool - allThreadsReady() const REQUIRES_SHARED(m_gateMutex) - { - return m_numThreadsReady == m_threads.size(); - } - - public: - ThreadPool(size_t numThreads, size_t maxJobs, std::string_view name); - - ~ThreadPool(); - - // Disable the threadpool. Calls to `addJob` and `tryAddJob` will fail. - // Jobs currently in the pool will not be affected. - void - disable(); - - void - enable(); - - // Add a job to the bool. Note this call will block if the underlying - // queue is full. - // Returns false if the queue is currently disabled. - bool - addJob(const Job& job); - bool - addJob(Job&& job); - - // Try to add a job to the pool. If the queue is full, or the queue is - // disabled, return false. - // This call will not block. - bool - tryAddJob(const Job& job); - bool - tryAddJob(Job&& job); - - // Wait until all current jobs are complete. - // If any jobs are submitted during this time, they **may** or **may not** - // run. - void - drain(); - - // Disable this pool, and cancel all pending jobs. After all currently - // running jobs are complete, join with the threads in the pool. - void - shutdown(); - - // Start this threadpool by spawning `threadCount()` threads. - bool - start(); - - // Disable queueing on this threadpool and wait until all pending jobs - // have finished. - void - stop(); - - bool - enabled() const; - - bool - started() const; - - size_t - activeThreadCount() const; - - // Current number of queued jobs - size_t - jobCount() const; - - // Number of threads passed in the constructor - size_t - threadCount() const; - - // Number of threads currently started in the threadpool - size_t - startedThreadCount() const; - - // Max number of queued jobs - size_t - capacity() const; - }; - - inline void - ThreadPool::disable() - { - m_queue.disable(); - } - - inline void - ThreadPool::enable() - { - m_queue.enable(); - } - - inline bool - ThreadPool::enabled() const - { - return m_queue.enabled(); - } - - inline size_t - ThreadPool::activeThreadCount() const - { - if (m_threads.size() == m_createdThreads) - { - return m_threads.size() - m_idleThreads.load(std::memory_order_relaxed); - } - - return 0; - } - - inline size_t - ThreadPool::threadCount() const - { - return m_threads.size(); - } - - inline size_t - ThreadPool::startedThreadCount() const - { - return m_createdThreads; - } - - inline size_t - ThreadPool::jobCount() const - { - return m_queue.size(); - } - - inline size_t - ThreadPool::capacity() const - { - return m_queue.capacity(); - } - } // namespace thread -} // namespace llarp - -#endif diff --git a/llarp/util/thread/threadpool.cpp b/llarp/util/thread/threadpool.cpp deleted file mode 100644 index 3a66597fa..000000000 --- a/llarp/util/thread/threadpool.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include -#include -#include -#include - -#include -#include -#include - -struct llarp_threadpool* -llarp_init_threadpool(int workers, const char* name, size_t queueLength) -{ - if (workers <= 0) - workers = 1; - return new llarp_threadpool(workers, name, queueLength); -} - -void -llarp_threadpool_join(struct llarp_threadpool* pool) -{ - llarp::LogDebug("threadpool join"); - if (pool->impl) - pool->impl->stop(); - pool->impl.reset(); -} - -void -llarp_threadpool_start(struct llarp_threadpool* pool) -{ - if (pool->impl) - pool->impl->start(); -} - -void -llarp_threadpool_stop(struct llarp_threadpool* pool) -{ - llarp::LogDebug("threadpool stop"); - if (pool->impl) - pool->impl->disable(); -} - -bool -llarp_threadpool_queue_job(struct llarp_threadpool* pool, struct llarp_thread_job job) -{ - return llarp_threadpool_queue_job(pool, std::bind(job.work, job.user)); -} - -bool -llarp_threadpool_queue_job(struct llarp_threadpool* pool, std::function func) -{ - return pool->impl && pool->impl->addJob(func); -} - -void -llarp_threadpool_tick(struct llarp_threadpool* pool) -{ - if (pool->impl) - { - pool->impl->drain(); - } -} - -void -llarp_free_threadpool(struct llarp_threadpool** pool) -{ - if (*pool) - { - delete *pool; - } - *pool = nullptr; -} - -size_t -llarp_threadpool::size() const -{ - return impl ? impl->capacity() : 0; -} - -size_t -llarp_threadpool::pendingJobs() const -{ - return impl ? impl->jobCount() : 0; -} - -size_t -llarp_threadpool::numThreads() const -{ - return impl ? impl->activeThreadCount() : 0; -} diff --git a/llarp/util/thread/threadpool.h b/llarp/util/thread/threadpool.h deleted file mode 100644 index 0b5354a65..000000000 --- a/llarp/util/thread/threadpool.h +++ /dev/null @@ -1,91 +0,0 @@ -#ifndef LLARP_THREADPOOL_H -#define LLARP_THREADPOOL_H - -#include -#include -#include -#include -#include - -#include -#include -#include - -struct llarp_threadpool; - -#ifdef __cplusplus -struct llarp_threadpool -{ - std::unique_ptr impl; - - llarp_threadpool(int workers, std::string_view name, size_t queueLength = size_t{1024 * 8}) - : impl(std::make_unique( - workers, std::max(queueLength, size_t{32}), name)) - { - } - - size_t - size() const; - - size_t - pendingJobs() const; - - size_t - numThreads() const; - - /// see if this thread is full given lookahead amount - bool - LooksFull(size_t lookahead) const - { - return (pendingJobs() + lookahead) >= size(); - } -}; -#endif - -struct llarp_threadpool* -llarp_init_threadpool(int workers, const char* name, size_t queueLength); - -void -llarp_free_threadpool(struct llarp_threadpool** tp); - -using llarp_thread_work_func = void (*)(void*); - -/** job to be done in worker thread */ -struct llarp_thread_job -{ -#ifdef __cplusplus - /** user data to pass to work function */ - void* user{nullptr}; - /** called in threadpool worker thread */ - llarp_thread_work_func work{nullptr}; - - llarp_thread_job(void* u, llarp_thread_work_func w) : user(u), work(w) - { - } - - llarp_thread_job() = default; -#else - void* user; - llarp_thread_work_func work; -#endif -}; - -void -llarp_threadpool_tick(struct llarp_threadpool* tp); - -bool -llarp_threadpool_queue_job(struct llarp_threadpool* tp, struct llarp_thread_job j); - -#ifdef __cplusplus - -bool -llarp_threadpool_queue_job(struct llarp_threadpool* tp, std::function func); - -#endif - -void -llarp_threadpool_start(struct llarp_threadpool* tp); -void -llarp_threadpool_stop(struct llarp_threadpool* tp); - -#endif diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 417ad3527..84f6e4fb3 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,7 +29,6 @@ add_executable(testAll dht/test_llarp_dht_txowner.cpp dns/test_llarp_dns_dns.cpp exit/test_llarp_exit_context.cpp - link/test_llarp_link.cpp llarp_test.cpp net/test_llarp_net.cpp router/test_llarp_router_version.cpp @@ -47,7 +46,6 @@ add_executable(testAll util/test_llarp_util_log_level.cpp util/thread/test_llarp_util_queue_manager.cpp util/thread/test_llarp_util_queue.cpp - util/thread/test_llarp_util_thread_pool.cpp ) target_link_libraries(testAll PUBLIC gmock gtest liblokinet) @@ -76,6 +74,7 @@ add_executable(catchAll config/test_llarp_config_output.cpp net/test_ip_address.cpp net/test_sock_addr.cpp + iwp/test_iwp_session.cpp check_main.cpp) target_link_libraries(catchAll PUBLIC liblokinet Catch2::Catch2) diff --git a/test/exit/test_llarp_exit_context.cpp b/test/exit/test_llarp_exit_context.cpp index 4dfd9d6b7..7059b7bfc 100644 --- a/test/exit/test_llarp_exit_context.cpp +++ b/test/exit/test_llarp_exit_context.cpp @@ -9,10 +9,10 @@ struct ExitTest : public ::testing::Test { - ExitTest() : r(nullptr, nullptr, nullptr), context(&r) + ExitTest() : r(nullptr, nullptr), context(&r) { + r.Configure(nullptr, false, nullptr); } - llarp::Router r; llarp::exit::Context context; }; diff --git a/test/iwp/test_iwp_session.cpp b/test/iwp/test_iwp_session.cpp new file mode 100644 index 000000000..58cce40fa --- /dev/null +++ b/test/iwp/test_iwp_session.cpp @@ -0,0 +1,278 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#undef LOG_TAG +#define LOG_TAG "test_iwp_session.cpp" + +namespace iwp = llarp::iwp; +namespace util = llarp::util; + +/// make an iwp link +template +static llarp::LinkLayer_ptr +make_link(Args_t... args) +{ + if (inbound) + return iwp::NewInboundLink(args...); + else + return iwp::NewOutboundLink(args...); +} +using Logic_ptr = std::shared_ptr; + +/// a single iwp link with associated keys and members to make unit tests work +struct IWPLinkContext +{ + llarp::RouterContact rc; + llarp::IpAddress localAddr; + llarp::LinkLayer_ptr link; + std::shared_ptr keyManager; + llarp::LinkMessageParser m_Parser; + llarp_ev_loop_ptr m_Loop; + /// is the test done on this context ? + bool gucci = false; + + IWPLinkContext(std::string_view addr, llarp_ev_loop_ptr loop) + : localAddr{std::move(addr)} + , keyManager{std::make_shared()} + , m_Parser{nullptr} + , m_Loop{std::move(loop)} + { + // generate keys + llarp::CryptoManager::instance()->identity_keygen(keyManager->identityKey); + llarp::CryptoManager::instance()->encryption_keygen(keyManager->encryptionKey); + llarp::CryptoManager::instance()->encryption_keygen(keyManager->transportKey); + + // set keys in rc + rc.pubkey = keyManager->identityKey.toPublic(); + rc.enckey = keyManager->encryptionKey.toPublic(); + } + + bool + HandleMessage(llarp::ILinkSession* from, const llarp_buffer_t& buf) + { + return m_Parser.ProcessFrom(from, buf); + } + + /// initialize link + template + void + InitLink(std::function established) + { + link = make_link( + keyManager, + // getrc + [&]() -> const llarp::RouterContact& { return rc; }, + // link message handler + util::memFn(&IWPLinkContext::HandleMessage, this), + // sign buffer + [&](llarp::Signature& sig, const llarp_buffer_t& buf) { + REQUIRE(llarp::CryptoManager::instance()->sign(sig, keyManager->identityKey, buf)); + return true; + }, + // established handler + [established](llarp::ILinkSession* s) { + REQUIRE(s != nullptr); + established(s); + return true; + }, + // renegotiate handler + [](llarp::RouterContact newrc, llarp::RouterContact oldrc) { + REQUIRE(newrc.pubkey == oldrc.pubkey); + return true; + }, + // timeout handler + [&](llarp::ILinkSession*) { + llarp_ev_loop_stop(m_Loop); + REQUIRE(false); + }, + // session closed handler + [](llarp::RouterID) {}, + // pump done handler + []() {}, + // do work function + [l = m_Loop](llarp::Work_t work) { l->call_after_delay(1ms, work); }); + REQUIRE(link->Configure( + m_Loop, llarp::net::LoopbackInterfaceName(), AF_INET, *localAddr.getPort())); + + if (inbound) + { + // only add address info on the recipiant's rc + rc.addrs.emplace_back(); + REQUIRE(link->GetOurAddressInfo(rc.addrs.back())); + } + // sign rc + REQUIRE(rc.Sign(keyManager->identityKey)); + REQUIRE(keyManager != nullptr); + } +}; + +using Context_ptr = std::shared_ptr; + +/// run an iwp unit test after setup +/// call take 2 parameters, test and a timeout +/// +/// test is a callable that takes 5 arguments: +/// 0) std::function that starts the iwp links and gives a logic to call with +/// 1) std::function that ends the unit test if we are done +/// 2) std::function that ends the unit test right now as a success +/// 3) client iwp link context (shared_ptr) +/// 4) relay iwp link context (shared_ptr) +/// +/// timeout is a std::chrono::duration that tells the driver how long to run the unit test for +/// before it should assume failure of unit test +template +void +RunIWPTest(Func_t test, Duration_t timeout = 1s) +{ + // shut up logs + llarp::LogSilencer shutup; + + // set up event loop + auto logic = std::make_shared(); + auto loop = llarp_make_ev_loop(); + loop->set_logic(logic); + + llarp::LogContext::Instance().Initialize( + llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) { + loop->call_soon(work); + }); + + // turn off bogon blocking + auto oldBlockBogons = llarp::RouterContact::BlockBogons; + llarp::RouterContact::BlockBogons = false; + + // set up cryptography + llarp::sodium::CryptoLibSodium crypto{}; + llarp::CryptoManager manager{&crypto}; + + // set up client + auto initiator = std::make_shared("127.0.0.1:3001", loop); + // set up server + auto recipiant = std::make_shared("127.0.0.1:3002", loop); + + // function for ending unit test on success + auto endIfDone = [initiator, recipiant, loop, logic]() { + if (initiator->gucci and recipiant->gucci) + { + LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); }); + } + }; + // function to start test and give logic to unit test + auto start = [initiator, recipiant, logic]() { + REQUIRE(initiator->link->Start(logic)); + REQUIRE(recipiant->link->Start(logic)); + return logic; + }; + + // function to end test immediately + auto endTest = [logic, loop]() { LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); }); }; + + loop->call_after_delay( + std::chrono::duration_cast(timeout), []() { REQUIRE(false); }); + test(start, endIfDone, endTest, initiator, recipiant); + llarp_ev_loop_run_single_process(loop, logic); + llarp::RouterContact::BlockBogons = oldBlockBogons; +} + +/// ensure clients can connect to relays +TEST_CASE("IWP handshake", "[iwp]") +{ + RunIWPTest([](std::function start, + std::function endIfDone, + [[maybe_unused]] std::function endTestNow, + Context_ptr alice, + Context_ptr bob) { + // set up initiator + alice->InitLink([=](auto remote) { + REQUIRE(remote->GetRemoteRC() == bob->rc); + alice->gucci = true; + endIfDone(); + }); + // set up recipiant + bob->InitLink([=](auto remote) { + REQUIRE(remote->GetRemoteRC() == alice->rc); + bob->gucci = true; + endIfDone(); + }); + // start unit test + auto logic = start(); + // try establishing a session + LogicCall(logic, [link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); }); + }); +} + +/// ensure relays cannot connect to clients +TEST_CASE("IWP handshake reverse", "[iwp]") +{ + RunIWPTest([](std::function start, + [[maybe_unused]] std::function endIfDone, + std::function endTestNow, + Context_ptr alice, + Context_ptr bob) { + alice->InitLink([](auto) {}); + bob->InitLink([](auto) {}); + // start unit test + auto logic = start(); + // try establishing a session in the wrong direction + LogicCall(logic, [logic, link = bob->link, rc = alice->rc, endTestNow]() { + REQUIRE(not link->TryEstablishTo(rc)); + endTestNow(); + }); + }); +} + +/// ensure iwp can send messages between sessions +TEST_CASE("IWP send messages", "[iwp]") +{ + RunIWPTest([](std::function start, + std::function endIfDone, + std::function endTestNow, + Context_ptr alice, + Context_ptr bob) { + constexpr int aliceNumSend = 128; + int aliceNumSent = 0; + // when alice makes a session to bob send `aliceNumSend` messages to him + alice->InitLink([endIfDone, alice, &aliceNumSent](auto session) { + for (auto index = 0; index < aliceNumSend; index++) + { + alice->m_Loop->call_soon([session, endIfDone, alice, &aliceNumSent]() { + // generate a discard message that is 512 bytes long + llarp::DiscardMessage msg; + std::vector msgBuff(512); + llarp_buffer_t buf(msgBuff); + // add random padding + llarp::CryptoManager::instance()->randomize(buf); + // encode the discard message + msg.BEncode(&buf); + // send the message + session->SendMessageBuffer(msgBuff, [endIfDone, alice, &aliceNumSent](auto status) { + if (status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess) + { + // on successful transmit increment the number we sent + aliceNumSent++; + } + // if we sent all the messages sucessfully we end the unit test + alice->gucci = aliceNumSent == aliceNumSend; + endIfDone(); + }); + }); + } + }); + bob->InitLink([bob](auto) { bob->gucci = true; }); + // start unit test + auto logic = start(); + // try establishing a session from alice to bob + LogicCall(logic, [logic, link = alice->link, rc = bob->rc, endTestNow]() { + REQUIRE(link->TryEstablishTo(rc)); + }); + }); +} diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp deleted file mode 100644 index c50f76976..000000000 --- a/test/link/test_llarp_link.cpp +++ /dev/null @@ -1,329 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include - -using namespace ::llarp; -using namespace ::testing; - -struct LinkLayerTest : public test::LlarpTest -{ - static constexpr uint16_t AlicePort = 41163; - static constexpr uint16_t BobPort = 8088; - - struct Context - { - Context() - { - keyManager = std::make_shared(); - - SecretKey signingKey; - CryptoManager::instance()->identity_keygen(signingKey); - keyManager->identityKey = signingKey; - - SecretKey encryptionKey; - CryptoManager::instance()->encryption_keygen(encryptionKey); - keyManager->encryptionKey = encryptionKey; - - SecretKey transportKey; - CryptoManager::instance()->encryption_keygen(transportKey); - keyManager->transportKey = transportKey; - - rc.pubkey = signingKey.toPublic(); - rc.enckey = encryptionKey.toPublic(); - } - - std::shared_ptr worker; - - std::shared_ptr keyManager; - - RouterContact rc; - - bool madeSession = false; - bool gotLIM = false; - - bool - IsGucci() const - { - return gotLIM && madeSession; - } - - void - Setup() - { - worker = std::make_shared(1, 128, "test-worker"); - worker->start(); - } - - const RouterContact& - GetRC() const - { - return rc; - } - - RouterID - GetRouterID() const - { - return rc.pubkey; - } - - std::shared_ptr link; - - static std::string - localLoopBack() - { -#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || (__APPLE__ && __MACH__) \ - || (__sun) - return "lo0"; -#else - return "lo"; -#endif - } - - bool - Start(std::shared_ptr logic, llarp_ev_loop_ptr loop, uint16_t port) - { - if (!link) - return false; - if (!link->Configure(loop, localLoopBack(), AF_INET, port)) - return false; - /* - * TODO: ephemeral key management - if(!link->GenEphemeralKeys()) - return false; - */ - rc.addrs.emplace_back(); - if (!link->GetOurAddressInfo(rc.addrs[0])) - return false; - if (!rc.Sign(keyManager->identityKey)) - return false; - return link->Start(logic, worker); - } - - void - Stop() - { - if (link) - link->Stop(); - if (worker) - { - worker->drain(); - worker->stop(); - } - } - - void - TearDown() - { - link.reset(); - worker.reset(); - } - }; - - Context Alice; - Context Bob; - - bool success = false; - const bool shouldDebug = false; - - llarp_ev_loop_ptr netLoop; - std::shared_ptr m_logic; - - llarp_time_t oldRCLifetime; - llarp::LogLevel oldLevel; - - LinkLayerTest() : netLoop(nullptr) - { - } - - void - SetUp() - { - oldLevel = llarp::LogContext::Instance().curLevel; - if (shouldDebug) - llarp::SetLogLevel(eLogTrace); - oldRCLifetime = RouterContact::Lifetime; - RouterContact::BlockBogons = false; - RouterContact::Lifetime = 500ms; - netLoop = llarp_make_ev_loop(); - m_logic.reset(new Logic()); - netLoop->set_logic(m_logic); - Alice.Setup(); - Bob.Setup(); - } - - void - TearDown() - { - Alice.TearDown(); - Bob.TearDown(); - m_logic.reset(); - netLoop.reset(); - RouterContact::BlockBogons = true; - RouterContact::Lifetime = oldRCLifetime; - llarp::SetLogLevel(oldLevel); - } - - void - RunMainloop() - { - m_logic->call_later(5s, std::bind(&LinkLayerTest::Stop, this)); - llarp_ev_loop_run_single_process(netLoop, m_logic); - } - - void - Stop() - { - Alice.Stop(); - Bob.Stop(); - llarp_ev_loop_stop(netLoop); - m_logic->stop(); - } -}; - -TEST_F(LinkLayerTest, TestIWP) -{ -#ifdef WIN32 - GTEST_SKIP(); -#else - auto sendDiscardMessage = [](ILinkSession* s, auto callback) -> bool { - // send discard message in reply to complete unit test - std::vector tmp(32); - llarp_buffer_t otherBuf(tmp); - DiscardMessage discard; - if (!discard.BEncode(&otherBuf)) - return false; - return s->SendMessageBuffer(std::move(tmp), callback); - }; - Alice.link = iwp::NewInboundLink( - // KeyManager - Alice.keyManager, - - // GetRCFunc - [&]() -> const RouterContact& { return Alice.GetRC(); }, - - // LinkMessageHandler - [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { - llarp_buffer_t copy(buf.base, buf.sz); - if (not Alice.gotLIM) - { - LinkIntroMessage msg; - if (msg.BDecode(©)) - { - Alice.gotLIM = s->GotLIM(&msg); - } - } - return Alice.gotLIM; - }, - - // SignBufferFunc - [&](Signature& sig, const llarp_buffer_t& buf) -> bool { - return m_crypto.sign(sig, Alice.keyManager->identityKey, buf); - }, - - // SessionEstablishedHandler - [&, this](ILinkSession* s) -> bool { - const auto rc = s->GetRemoteRC(); - if (rc.pubkey != Bob.GetRC().pubkey) - return false; - LogInfo("alice established with bob"); - Alice.madeSession = true; - sendDiscardMessage(s, [&](auto status) { - success = status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess; - LogInfo("message sent to bob suceess=", success); - this->Stop(); - }); - return true; - }, - - // SessionRenegotiateHandler - [&](RouterContact, RouterContact) -> bool { return true; }, - - // TimeoutHandler - [&](ILinkSession* session) { - ASSERT_FALSE(session->IsEstablished()); - Stop(); - }, - - // SessionClosedHandler - [&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); }, - - // PumpDoneHandler - []() {}); - - Bob.link = iwp::NewInboundLink( - // KeyManager - Bob.keyManager, - - // GetRCFunc - [&]() -> const RouterContact& { return Bob.GetRC(); }, - - // LinkMessageHandler - [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { - llarp_buffer_t copy(buf.base, buf.sz); - if (not Bob.gotLIM) - { - LinkIntroMessage msg; - if (msg.BDecode(©)) - { - Bob.gotLIM = s->GotLIM(&msg); - } - return Bob.gotLIM; - } - DiscardMessage discard; - if (discard.BDecode(©)) - { - LogInfo("bog got discard message from alice"); - return true; - } - return false; - }, - - // SignBufferFunc - [&](Signature& sig, const llarp_buffer_t& buf) -> bool { - return m_crypto.sign(sig, Bob.keyManager->identityKey, buf); - }, - - // SessionEstablishedHandler - [&](ILinkSession* s) -> bool { - if (s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) - return false; - LogInfo("bob established with alice"); - Bob.madeSession = true; - - return true; - }, - - // SessionRenegotiateHandler - [&](RouterContact newrc, RouterContact oldrc) -> bool { - return newrc.pubkey == oldrc.pubkey; - }, - - // TimeoutHandler - [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); }, - - // SessionClosedHandler - [&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); }, - - // PumpDoneHandler - []() {}); - - ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort)); - ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort)); - - LogicCall(m_logic, [&]() { ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); }); - - RunMainloop(); - ASSERT_TRUE(Alice.IsGucci()); - ASSERT_TRUE(Bob.IsGucci()); - ASSERT_TRUE(success); -#endif -}; diff --git a/test/nodedb/test_nodedb.cpp b/test/nodedb/test_nodedb.cpp index 26070358b..cb12712c5 100644 --- a/test/nodedb/test_nodedb.cpp +++ b/test/nodedb/test_nodedb.cpp @@ -6,10 +6,10 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]") { - llarp_nodedb nodeDB(nullptr, ""); + llarp_nodedb nodeDB("", nullptr); constexpr uint64_t numRCs = 3; - for(uint64_t i = 0; i < numRCs; ++i) + for (uint64_t i = 0; i < numRCs; ++i) { llarp::RouterContact rc; rc.pubkey[0] = i; @@ -20,7 +20,7 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]") llarp::dht::Key_t key; - std::vector< llarp::RouterContact > results = nodeDB.FindClosestTo(key, 4); + std::vector results = nodeDB.FindClosestTo(key, 4); // we asked for more entries than nodedb had REQUIRE(numRCs == results.size()); @@ -28,7 +28,7 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]") TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]") { - llarp_nodedb nodeDB(nullptr, ""); + llarp_nodedb nodeDB("", nullptr); // insert some RCs: a < b < c llarp::RouterContact a; @@ -47,7 +47,7 @@ TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]") llarp::dht::Key_t key; - std::vector< llarp::RouterContact > results = nodeDB.FindClosestTo(key, 2); + std::vector results = nodeDB.FindClosestTo(key, 2); REQUIRE(2 == results.size()); // we xor'ed with 0x0, so order should be a,b,c diff --git a/test/util/thread/test_llarp_util_thread_pool.cpp b/test/util/thread/test_llarp_util_thread_pool.cpp deleted file mode 100644 index 7c10b824c..000000000 --- a/test/util/thread/test_llarp_util_thread_pool.cpp +++ /dev/null @@ -1,456 +0,0 @@ -#include -#include -#include - -#include -#include -#include - -#include - -using namespace llarp; -using namespace llarp::thread; - -using LockGuard = std::unique_lock< std::mutex >; - -class PoolArgs -{ - public: - std::mutex& mutex; - std::condition_variable& start; - std::condition_variable& stop; - volatile size_t count; - volatile size_t startSignal; - volatile size_t stopSignal; -}; - -class BarrierArgs -{ - public: - util::Barrier& startBarrier; - util::Barrier& stopBarrier; - - std::atomic_size_t count; -}; - -class BasicWorkArgs -{ - public: - std::atomic_size_t count; -}; - -void -simpleFunction(PoolArgs& args) -{ - LockGuard lock(args.mutex); - ++args.count; - ++args.startSignal; - args.start.notify_one(); - - args.stop.wait(lock, [&]() { return args.stopSignal; }); -} - -void -incrementFunction(PoolArgs& args) -{ - LockGuard lock(args.mutex); - ++args.count; - ++args.startSignal; - args.start.notify_one(); -} - -void -barrierFunction(BarrierArgs& args) -{ - args.startBarrier.Block(); - args.count++; - args.stopBarrier.Block(); -} - -void -basicWork(BasicWorkArgs& args) -{ - args.count++; -} - -void -recurse(util::Barrier& barrier, std::atomic_size_t& counter, ThreadPool& pool, - size_t depthLimit) -{ - ASSERT_LE(0u, counter); - ASSERT_GT(depthLimit, counter); - - if(++counter != depthLimit) - { - ASSERT_TRUE( - pool.addJob(std::bind(recurse, std::ref(barrier), std::ref(counter), - std::ref(pool), depthLimit))); - } - - barrier.Block(); -} - -class DestructiveObject -{ - private: - util::Barrier& barrier; - ThreadPool& pool; - - public: - DestructiveObject(util::Barrier& b, ThreadPool& p) : barrier(b), pool(p) - { - } - - ~DestructiveObject() - { - auto job = std::bind(&util::Barrier::Block, &barrier); - pool.addJob(job); - } -}; - -void -destructiveJob(DestructiveObject* obj) -{ - delete obj; -} - -TEST(TestThreadPool, breathing) -{ - static constexpr size_t threads = 10; - static constexpr size_t capacity = 50; - - ThreadPool pool(threads, capacity, "breathing"); - - ASSERT_EQ(0u, pool.startedThreadCount()); - ASSERT_EQ(capacity, pool.capacity()); - ASSERT_EQ(0u, pool.jobCount()); - - ASSERT_TRUE(pool.start()); - - ASSERT_EQ(threads, pool.startedThreadCount()); - ASSERT_EQ(capacity, pool.capacity()); - ASSERT_EQ(0u, pool.jobCount()); - - pool.drain(); -} - -struct AccessorsData -{ - size_t threads; - size_t capacity; -}; - -std::ostream& -operator<<(std::ostream& os, AccessorsData d) -{ - os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]"; - return os; -} - -class Accessors : public ::testing::TestWithParam< AccessorsData > -{ -}; - -TEST_P(Accessors, accessors) -{ - auto d = GetParam(); - - ThreadPool pool(d.threads, d.capacity, "accessors"); - - ASSERT_EQ(d.threads, pool.threadCount()); - ASSERT_EQ(d.capacity, pool.capacity()); - ASSERT_EQ(0u, pool.startedThreadCount()); -} - -static const AccessorsData accessorsData[] = { - {10, 50}, {1, 1}, {50, 100}, {2, 22}, {100, 200}}; - -INSTANTIATE_TEST_SUITE_P(TestThreadPool, Accessors, - ::testing::ValuesIn(accessorsData)); - -struct ClosingData -{ - size_t threads; - size_t capacity; -}; - -std::ostream& -operator<<(std::ostream& os, ClosingData d) -{ - os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]"; - return os; -} - -class Closing : public ::testing::TestWithParam< ClosingData > -{ -}; - -TEST_P(Closing, drain) -{ - auto d = GetParam(); - - std::mutex mutex; - std::condition_variable start; - std::condition_variable stop; - - PoolArgs args{mutex, start, stop, 0, 0, 0}; - - ThreadPool pool(d.threads, d.capacity, "drain"); - - ASSERT_EQ(d.threads, pool.threadCount()); - ASSERT_EQ(d.capacity, pool.capacity()); - ASSERT_EQ(0u, pool.startedThreadCount()); - - auto simpleJob = std::bind(simpleFunction, std::ref(args)); - - ASSERT_FALSE(pool.addJob(simpleJob)); - - ASSERT_TRUE(pool.start()); - ASSERT_EQ(0u, pool.jobCount()); - - LockGuard lock(mutex); - - for(size_t i = 0; i < d.threads; ++i) - { - args.startSignal = 0; - args.stopSignal = 0; - ASSERT_TRUE(pool.addJob(simpleJob)); - - start.wait(lock, [&]() { return args.startSignal; }); - } - - args.stopSignal++; - - lock.unlock(); - - stop.notify_all(); - - pool.drain(); - - ASSERT_EQ(d.threads, pool.startedThreadCount()); - ASSERT_EQ(0u, pool.jobCount()); -} - -TEST_P(Closing, stop) -{ - auto d = GetParam(); - - ThreadPool pool(d.threads, d.capacity, "stop"); - - std::mutex mutex; - std::condition_variable start; - std::condition_variable stop; - - PoolArgs args{mutex, start, stop, 0, 0, 0}; - - ASSERT_EQ(d.threads, pool.threadCount()); - ASSERT_EQ(d.capacity, pool.capacity()); - ASSERT_EQ(0u, pool.startedThreadCount()); - - auto simpleJob = std::bind(simpleFunction, std::ref(args)); - - ASSERT_FALSE(pool.addJob(simpleJob)); - - ASSERT_TRUE(pool.start()); - ASSERT_EQ(0u, pool.jobCount()); - - LockGuard lock(mutex); - - for(size_t i = 0; i < d.capacity; ++i) - { - args.startSignal = 0; - args.stopSignal = 0; - ASSERT_TRUE(pool.addJob(simpleJob)); - - while(i < d.threads && !args.startSignal) - { - start.wait(lock); - } - } - - args.stopSignal++; - - lock.unlock(); - - stop.notify_all(); - - pool.stop(); - - ASSERT_EQ(d.capacity, args.count); - ASSERT_EQ(0u, pool.startedThreadCount()); - ASSERT_EQ(0u, pool.activeThreadCount()); - ASSERT_EQ(0u, pool.jobCount()); -} - -TEST_P(Closing, shutdown) -{ - auto d = GetParam(); - - ThreadPool pool(d.threads, d.capacity, "shutdown"); - - std::mutex mutex; - std::condition_variable start; - std::condition_variable stop; - - PoolArgs args{mutex, start, stop, 0, 0, 0}; - - ASSERT_EQ(d.threads, pool.threadCount()); - ASSERT_EQ(d.capacity, pool.capacity()); - ASSERT_EQ(0u, pool.startedThreadCount()); - - auto simpleJob = std::bind(simpleFunction, std::ref(args)); - - ASSERT_FALSE(pool.addJob(simpleJob)); - - ASSERT_TRUE(pool.start()); - ASSERT_EQ(0u, pool.jobCount()); - - LockGuard lock(mutex); - - for(size_t i = 0; i < d.capacity; ++i) - { - args.startSignal = 0; - args.stopSignal = 0; - ASSERT_TRUE(pool.addJob(simpleJob)); - - while(i < d.threads && !args.startSignal) - { - start.wait(lock); - } - } - - ASSERT_EQ(d.threads, pool.startedThreadCount()); - ASSERT_EQ(d.capacity - d.threads, pool.jobCount()); - - auto incrementJob = std::bind(incrementFunction, std::ref(args)); - - for(size_t i = 0; i < d.threads; ++i) - { - ASSERT_TRUE(pool.addJob(incrementJob)); - } - - args.stopSignal++; - stop.notify_all(); - - lock.unlock(); - - pool.shutdown(); - - ASSERT_EQ(0u, pool.startedThreadCount()); - ASSERT_EQ(0u, pool.activeThreadCount()); - ASSERT_EQ(0u, pool.jobCount()); -} - -ClosingData closingData[] = {{1, 1}, {2, 2}, {10, 10}, - {10, 50}, {50, 75}, {25, 80}}; - -INSTANTIATE_TEST_SUITE_P(TestThreadPool, Closing, - ::testing::ValuesIn(closingData)); - -struct TryAddData -{ - size_t threads; - size_t capacity; -}; - -std::ostream& -operator<<(std::ostream& os, TryAddData d) -{ - os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]"; - return os; -} - -class TryAdd : public ::testing::TestWithParam< TryAddData > -{ -}; - -TEST_P(TryAdd, noblocking) -{ - // Verify that tryAdd does not block. - // Fill the queue, then verify `tryAddJob` does not block. - auto d = GetParam(); - - ThreadPool pool(d.threads, d.capacity, "noblocking"); - - util::Barrier startBarrier(d.threads + 1); - util::Barrier stopBarrier(d.threads + 1); - - BarrierArgs args{startBarrier, stopBarrier, {0}}; - - auto simpleJob = std::bind(barrierFunction, std::ref(args)); - - ASSERT_FALSE(pool.tryAddJob(simpleJob)); - - ASSERT_TRUE(pool.start()); - - for(size_t i = 0; i < d.threads; ++i) - { - ASSERT_TRUE(pool.tryAddJob(simpleJob)); - } - - // Wait for everything to start. - startBarrier.Block(); - - // and that we emptied the queue. - ASSERT_EQ(0u, pool.jobCount()); - - BasicWorkArgs basicWorkArgs = {{0}}; - - auto workJob = std::bind(basicWork, std::ref(basicWorkArgs)); - - for(size_t i = 0; i < d.capacity; ++i) - { - ASSERT_TRUE(pool.tryAddJob(workJob)); - } - - // queue should now be full - ASSERT_FALSE(pool.tryAddJob(workJob)); - - // and finish - stopBarrier.Block(); -} - -TEST(TestThreadPool, recurseJob) -{ - // Verify we can enqueue a job onto the threadpool from a thread which is - // currently executing a threadpool job. - - static constexpr size_t threads = 10; - static constexpr size_t depth = 10; - static constexpr size_t capacity = 100; - - util::Barrier barrier(threads + 1); - std::atomic_size_t counter{0}; - - ThreadPool pool(threads, capacity, "recurse"); - - pool.start(); - - ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier), - std::ref(counter), std::ref(pool), depth))); - - barrier.Block(); - ASSERT_EQ(depth, counter); -} - -TEST(TestThreadPool, destructors) -{ - // Verify that functors have their destructors called outside of threadpool - // locks. - - static constexpr size_t threads = 1; - static constexpr size_t capacity = 100; - - ThreadPool pool(threads, capacity, "destructors"); - - pool.start(); - - util::Barrier barrier(threads + 1); - - { - DestructiveObject* obj = new DestructiveObject(barrier, pool); - ASSERT_TRUE(pool.addJob(std::bind(destructiveJob, obj))); - } - - barrier.Block(); -}