use lokimq workers instead of llarp:🧵:ThreadPool

pull/1306/head
Jeff Becker 4 years ago
parent 30b158b906
commit f4971a88fd
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.10) # bionic's cmake version
# Has to be set before `project()`, and ignored on non-macos:
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.13 CACHE STRING "macOS deployment target (Apple clang only)")
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.14 CACHE STRING "macOS deployment target (Apple clang only)")
find_program(CCACHE_PROGRAM ccache)
if(CCACHE_PROGRAM)

@ -46,7 +46,6 @@ namespace llarp
std::unique_ptr<Crypto> crypto;
std::unique_ptr<CryptoManager> cryptoManager;
std::unique_ptr<AbstractRouter> router;
std::shared_ptr<thread::ThreadPool> worker;
std::shared_ptr<Logic> logic;
std::unique_ptr<Config> config;
std::unique_ptr<llarp_nodedb> nodedb;

@ -22,9 +22,7 @@ add_library(lokinet-util
util/str.cpp
util/thread/logic.cpp
util/thread/queue_manager.cpp
util/thread/thread_pool.cpp
util/thread/threading.cpp
util/thread/threadpool.cpp
util/time.cpp
)
add_dependencies(lokinet-util genversion)

@ -9,7 +9,7 @@
#include <nodedb.hpp>
#include <router/router.hpp>
#include <service/context.hpp>
#include <util/logging/logger.h>
#include <util/logging/logger.hpp>
#include <cxxopts.hpp>
#include <csignal>
@ -47,10 +47,6 @@ namespace llarp
}
}
auto threads = config->router.m_workerThreads;
if (threads <= 0)
threads = 1;
worker = std::make_shared<llarp::thread::ThreadPool>(threads, 1024, "llarp-worker");
logic = std::make_shared<Logic>();
nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string();
@ -94,9 +90,11 @@ namespace llarp
crypto = std::make_unique<sodium::CryptoLibSodium>();
cryptoManager = std::make_unique<CryptoManager>(crypto.get());
router = std::make_unique<Router>(worker, mainloop, logic);
router = std::make_unique<Router>(mainloop, logic);
nodedb = std::make_unique<llarp_nodedb>(router->diskworker(), nodedb_dir);
nodedb = std::make_unique<llarp_nodedb>(
nodedb_dir,
[r = router.get()](std::function<void(void)> call) { r->QueueDiskIO(std::move(call)); });
if (!router->Configure(config.get(), opts.isRouter, nodedb.get()))
throw std::runtime_error("Failed to configure router");
@ -187,16 +185,9 @@ namespace llarp
void
Context::Close()
{
llarp::LogDebug("stop workers");
if (worker)
worker->stop();
llarp::LogDebug("free config");
config.release();
llarp::LogDebug("free workers");
worker.reset();
llarp::LogDebug("free nodedb");
nodedb.release();

@ -122,7 +122,7 @@ namespace llarp
Crypto* m_prevCrypto;
public:
CryptoManager(Crypto* crypto) : m_prevCrypto(m_crypto)
explicit CryptoManager(Crypto* crypto) : m_prevCrypto(m_crypto)
{
m_crypto = crypto;
}

@ -6,7 +6,6 @@
#include <util/buffer.hpp>
#include <utility>
#include <util/mem.h>
#include <util/thread/threadpool.h>
namespace llarp
{
@ -78,12 +77,14 @@ namespace llarp
const SecretKey& seckey;
EncryptedFrame target;
using WorkFunc_t = std::function<void(void)>;
using WorkerFunction_t = std::function<void(WorkFunc_t)>;
void
AsyncDecrypt(
const std::shared_ptr<thread::ThreadPool>& worker, const EncryptedFrame& frame, User_ptr u)
AsyncDecrypt(const EncryptedFrame& frame, User_ptr u, WorkerFunction_t worker)
{
target = frame;
worker->addJob(std::bind(&AsyncFrameDecrypter<User>::Decrypt, this, std::move(u)));
worker(std::bind(&AsyncFrameDecrypter<User>::Decrypt, this, std::move(u)));
}
};
} // namespace llarp

@ -26,6 +26,8 @@ llarp_make_ev_loop(size_t queueLength)
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr<llarp::Logic> logic)
{
if (ev == nullptr or logic == nullptr)
return;
ev->run();
logic->clear_event_loop();
ev->stopped();
@ -34,6 +36,8 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr<llarp::Lo
int
llarp_ev_add_udp(struct llarp_ev_loop* ev, struct llarp_udp_io* udp, const llarp::SockAddr& src)
{
if (ev == nullptr or udp == nullptr)
return -1;
udp->parent = ev;
if (ev->udp_listen(udp, src))
return 0;

@ -461,8 +461,28 @@ namespace libuv
auto* self = static_cast<udp_glue*>(udp->impl);
if (self == nullptr)
return -1;
uv_buf_t buf = uv_buf_init((char*)ptr, sz);
return uv_udp_try_send(&self->m_Handle, &buf, 1, to);
char* data = new char[sz];
std::copy_n(ptr, sz, data);
uv_buf_t buf = uv_buf_init(data, sz);
uv_udp_send_t* req = new uv_udp_send_t;
req->data = data;
if (uv_udp_send(
req,
&self->m_Handle,
&buf,
1,
to,
[](uv_udp_send_t* req, int) {
delete[](char*) req->data;
delete req;
})
!= 0)
{
delete req;
return -1;
}
return 0;
}
bool

@ -17,10 +17,11 @@ namespace llarp
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone)
PumpDoneHandler pumpDone,
WorkerFunc_t work)
{
return std::make_shared<LinkLayer>(
keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, true);
keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, work, true);
}
LinkLayer_ptr
@ -33,10 +34,11 @@ namespace llarp
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone)
PumpDoneHandler pumpDone,
WorkerFunc_t work)
{
return std::make_shared<LinkLayer>(
keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, false);
keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, work, false);
}
} // namespace iwp
} // namespace llarp

@ -6,34 +6,34 @@
#include <memory>
#include <config/key_manager.hpp>
namespace llarp
namespace llarp::iwp
{
namespace iwp
{
LinkLayer_ptr
NewInboundLink(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone);
LinkLayer_ptr
NewOutboundLink(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone);
LinkLayer_ptr
NewInboundLink(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t work);
} // namespace iwp
} // namespace llarp
LinkLayer_ptr
NewOutboundLink(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t work);
} // namespace llarp::iwp
#endif

@ -18,8 +18,9 @@ namespace llarp
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t worker,
bool allowInbound)
: ILinkLayer(keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone)
: ILinkLayer(keyManager, getrc, h, sign, est, reneg, timeout, closed, pumpDone, worker)
, permitInbound{allowInbound}
{
}
@ -38,12 +39,6 @@ namespace llarp
return 2;
}
void
LinkLayer::QueueWork(std::function<void(void)> func)
{
m_Worker->addJob(func);
}
void
LinkLayer::RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt)
{

@ -6,7 +6,6 @@
#include <crypto/encrypted.hpp>
#include <crypto/types.hpp>
#include <link/server.hpp>
#include <util/thread/thread_pool.hpp>
#include <config/key_manager.hpp>
#include <memory>
@ -27,6 +26,7 @@ namespace llarp
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t dowork,
bool permitInbound);
~LinkLayer() override;
@ -49,9 +49,6 @@ namespace llarp
void
UnmapAddr(const IpAddress& addr);
void
QueueWork(std::function<void(void)> work);
private:
std::unordered_map<IpAddress, RouterID, IpAddress::Hash> m_AuthedAddrs;
const bool permitInbound;

@ -197,7 +197,7 @@ namespace llarp
Session::SendMACK()
{
// send multi acks
while (m_SendMACKs.size() > 0)
while (not m_SendMACKs.empty())
{
const auto sz = m_SendMACKs.size();
const auto max = Session::MaxACKSInMACK;
@ -206,11 +206,11 @@ namespace llarp
mack[PacketOverhead + CommandOverhead] = byte_t{static_cast<byte_t>(numAcks)};
byte_t* ptr = mack.data() + 3 + PacketOverhead;
LogDebug("send ", numAcks, " macks to ", m_RemoteAddr);
auto itr = m_SendMACKs.begin();
const auto& itr = m_SendMACKs.top();
while (numAcks > 0)
{
htobe64buf(ptr, *itr);
itr = m_SendMACKs.erase(itr);
htobe64buf(ptr, itr);
m_SendMACKs.pop();
numAcks--;
ptr += sizeof(uint64_t);
}

@ -8,6 +8,7 @@
#include <unordered_set>
#include <deque>
#include <queue>
namespace llarp
{
@ -25,7 +26,7 @@ namespace llarp
/// How long to keep a replay window for
static constexpr auto ReplayWindow = (ReceivalTimeout * 3) / 2;
/// How often to acks RX messages
static constexpr auto ACKResendInterval = DeliveryTimeout / 2;
static constexpr auto ACKResendInterval = DeliveryTimeout / 4;
/// How often to retransmit TX fragments
static constexpr auto TXFlushInterval = (DeliveryTimeout / 5) * 4;
/// How often we send a keepalive
@ -195,8 +196,8 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// set of rx messages to send in next round of multiacks
std::unordered_set<uint64_t> m_SendMACKs;
/// rx messages to send in next round of multiacks
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> m_SendMACKs;
using CryptoQueue_t = std::list<Packet_t>;
using CryptoQueue_ptr = std::shared_ptr<CryptoQueue_t>;

@ -44,7 +44,7 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
virtual bool
StartLinks(Logic_ptr logic, std::shared_ptr<thread::ThreadPool> worker) = 0;
StartLinks(Logic_ptr logic) = 0;
virtual void
Stop() = 0;

@ -89,12 +89,12 @@ namespace llarp
}
bool
LinkManager::StartLinks(Logic_ptr logic, std::shared_ptr<thread::ThreadPool> worker)
LinkManager::StartLinks(Logic_ptr logic)
{
LogInfo("starting ", outboundLinks.size(), " outbound links");
for (const auto& link : outboundLinks)
{
if (!link->Start(logic, worker))
if (!link->Start(logic))
{
LogWarn("outbound link '", link->Name(), "' failed to start");
return false;
@ -107,7 +107,7 @@ namespace llarp
LogInfo("starting ", inboundLinks.size(), " inbound links");
for (const auto& link : inboundLinks)
{
if (!link->Start(logic, worker))
if (!link->Start(logic))
{
LogWarn("Link ", link->Name(), " failed to start");
return false;

@ -4,7 +4,6 @@
#include <link/i_link_manager.hpp>
#include <util/compare_ptr.hpp>
#include <util/thread/threading.hpp>
#include <link/server.hpp>
#include <unordered_map>
@ -42,7 +41,7 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) override;
bool
StartLinks(Logic_ptr logic, std::shared_ptr<thread::ThreadPool> worker) override;
StartLinks(Logic_ptr logic) override;
void
Stop() override;

@ -22,7 +22,8 @@ namespace llarp
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone)
PumpDoneHandler pumpDone,
WorkerFunc_t work)
: HandleMessage(std::move(handler))
, HandleTimeout(std::move(timeout))
, Sign(std::move(signbuf))
@ -31,6 +32,7 @@ namespace llarp
, SessionClosed(std::move(closed))
, SessionRenegotiate(std::move(reneg))
, PumpDone(std::move(pumpDone))
, QueueWork(std::move(work))
, m_RouterEncSecret(keyManager->encryptionKey)
, m_SecretKey(keyManager->transportKey)
{
@ -318,9 +320,8 @@ namespace llarp
}
bool
ILinkLayer::Start(std::shared_ptr<Logic> l, std::shared_ptr<thread::ThreadPool> worker)
ILinkLayer::Start(std::shared_ptr<Logic> l)
{
m_Worker = worker;
m_Logic = l;
ScheduleTick(LINK_LAYER_TICK_INTERVAL);
return true;

@ -47,6 +47,10 @@ namespace llarp
/// messages to upper layers
using PumpDoneHandler = std::function<void(void)>;
using Work_t = std::function<void(void)>;
/// queue work to worker thread
using WorkerFunc_t = std::function<void(Work_t)>;
struct ILinkLayer
{
ILinkLayer(
@ -58,7 +62,8 @@ namespace llarp
SessionRenegotiateHandler renegotiate,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone);
PumpDoneHandler pumpDone,
WorkerFunc_t doWork);
virtual ~ILinkLayer();
/// get current time via event loop
@ -106,7 +111,7 @@ namespace llarp
TryEstablishTo(RouterContact rc);
bool
Start(std::shared_ptr<llarp::Logic> l, std::shared_ptr<thread::ThreadPool> worker);
Start(std::shared_ptr<llarp::Logic> l);
virtual void
Stop();
@ -176,6 +181,7 @@ namespace llarp
SessionRenegotiateHandler SessionRenegotiate;
PumpDoneHandler PumpDone;
std::shared_ptr<KeyManager> keyManager;
WorkerFunc_t QueueWork;
std::shared_ptr<Logic>
logic()
@ -223,7 +229,6 @@ namespace llarp
PutSession(const std::shared_ptr<ILinkSession>& s);
std::shared_ptr<llarp::Logic> m_Logic = nullptr;
std::shared_ptr<llarp::thread::ThreadPool> m_Worker = nullptr;
llarp_ev_loop_ptr m_Loop;
IpAddress m_ourAddr;
llarp_udp_io m_udp;

@ -241,7 +241,7 @@ namespace llarp
auto func =
std::bind(&LR_StatusMessage::CreateAndSend, router, pathid, nextHop, pathKey, status);
router->threadpool()->addJob(func);
router->QueueWork(func);
}
/// this is done from logic thread
@ -478,7 +478,10 @@ namespace llarp
auto frameDecrypt = std::make_shared<LRCMFrameDecrypt>(context, std::move(decrypter), this);
// decrypt frames async
frameDecrypt->decrypter->AsyncDecrypt(context->Worker(), frameDecrypt->frames[0], frameDecrypt);
frameDecrypt->decrypter->AsyncDecrypt(
frameDecrypt->frames[0], frameDecrypt, [r = context->Router()](auto func) {
r->QueueWork(std::move(func));
});
return true;
}
} // namespace llarp

@ -55,7 +55,7 @@ namespace llarp
queue_handle()
{
auto func = std::bind(&llarp::LRSM_AsyncHandler::handle, shared_from_this());
router->threadpool()->addJob(func);
router->QueueWork(func);
}
};

@ -417,6 +417,31 @@ namespace llarp
if (ifa)
freeifaddrs(ifa);
}
namespace net
{
std::string
LoopbackInterfaceName()
{
const auto loopback = IPRange::FromIPv4(127, 0, 0, 0, 8);
std::string ifname;
IterAllNetworkInterfaces([&ifname, loopback](ifaddrs* const i) {
if (i->ifa_addr and i->ifa_addr->sa_family == AF_INET)
{
llarp::nuint32_t addr{((sockaddr_in*)i->ifa_addr)->sin_addr.s_addr};
if (loopback.ContainsV4(xntohl(addr)))
{
ifname = i->ifa_name;
}
}
});
if (ifname.empty())
{
throw std::runtime_error(
"we have no ipv4 loopback interface for some ungodly reason, yeah idk fam");
}
return ifname;
}
} // namespace net
bool
GetBestNetIF(std::string& ifname, int af)

@ -14,4 +14,12 @@ if_nametoindex(const char* __ifname) __THROW;
#include <net/if.h>
#endif
#endif
namespace llarp::net
{
/// get the name of the loopback interface
std::string
LoopbackInterfaceName();
} // namespace llarp::net
#endif

@ -9,7 +9,6 @@
#include <util/logging/logger.hpp>
#include <util/mem.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/thread_pool.hpp>
#include <util/str.hpp>
#include <dht/kademlia.hpp>
@ -84,8 +83,7 @@ llarp_nodedb::RemoveIf(std::function<bool(const llarp::RouterContact& rc)> filte
++itr;
}
}
disk->addJob(std::bind(&KillRCJobs, files));
disk(std::bind(&KillRCJobs, files));
}
bool
@ -166,7 +164,7 @@ llarp_nodedb::InsertAsync(
std::shared_ptr<llarp::Logic> logic,
std::function<void(void)> completionHandler)
{
disk->addJob([this, rc, logic, completionHandler]() {
disk([this, rc, logic, completionHandler]() {
this->Insert(rc);
if (logic && completionHandler)
{
@ -277,7 +275,7 @@ llarp_nodedb::ShouldSaveToDisk(llarp_time_t now) const
void
llarp_nodedb::AsyncFlushToDisk()
{
disk->addJob(std::bind(&llarp_nodedb::SaveAll, this));
disk(std::bind(&llarp_nodedb::SaveAll, this));
m_NextSaveToDisk = llarp::time_now_ms() + m_SaveInterval;
}
@ -362,86 +360,47 @@ llarp_nodedb::RemoveStaleRCs(const std::set<llarp::RouterID>& keep, llarp_time_t
});
}
/*
bool
llarp_nodedb::Save()
{
auto itr = entries.begin();
while(itr != entries.end())
{
llarp::pubkey pk = itr->first;
llarp_rc *rc= itr->second;
itr++; // advance
}
return true;
}
*/
// call request hook
void
logic_threadworker_callback(void* user)
{
auto* verify_request = static_cast<llarp_async_verify_rc*>(user);
if (verify_request->hook)
verify_request->hook(verify_request);
}
// write it to disk
void
disk_threadworker_setRC(llarp_async_verify_rc* verify_request)
{
verify_request->valid = verify_request->nodedb->Insert(verify_request->rc);
if (verify_request->logic)
verify_request->logic->queue_job({verify_request, &logic_threadworker_callback});
{
LogicCall(verify_request->logic, [verify_request]() {
if (verify_request->hook)
verify_request->hook(verify_request);
});
}
}
// we run the crypto verify in the crypto threadpool worker
void
crypto_threadworker_verifyrc(void* user)
crypto_threadworker_verifyrc(llarp_async_verify_rc* verify_request)
{
auto* verify_request = static_cast<llarp_async_verify_rc*>(user);
llarp::RouterContact rc = verify_request->rc;
verify_request->valid = rc.Verify(llarp::time_now_ms());
// if it's valid we need to set it
if (verify_request->valid && rc.IsPublicRouter())
{
if (verify_request->diskworker)
if (verify_request->disk)
{
llarp::LogDebug("RC is valid, saving to disk");
verify_request->diskworker->addJob(std::bind(&disk_threadworker_setRC, verify_request));
verify_request->disk(std::bind(&disk_threadworker_setRC, verify_request));
return;
}
}
// callback to logic thread
verify_request->logic->queue_job({verify_request, &logic_threadworker_callback});
}
void
nodedb_inform_load_rc(void* user)
{
auto* job = static_cast<llarp_async_load_rc*>(user);
job->hook(job);
LogicCall(verify_request->logic, [verify_request]() {
if (verify_request->hook)
verify_request->hook(verify_request);
});
}
void
llarp_nodedb_async_verify(struct llarp_async_verify_rc* job)
{
job->cryptoworker->addJob(std::bind(&crypto_threadworker_verifyrc, job));
}
void
nodedb_async_load_rc(void* user)
{
auto* job = static_cast<llarp_async_load_rc*>(user);
auto fpath = job->nodedb->getRCFilePath(job->pubkey);
job->loaded = job->nodedb->loadfile(fpath);
if (job->loaded)
{
job->nodedb->Get(job->pubkey, job->result);
}
job->logic->queue_job({job, &nodedb_inform_load_rc});
job->worker(std::bind(&crypto_threadworker_verifyrc, job));
}
void

@ -17,24 +17,20 @@
*
* persistent storage API for router contacts
*/
struct llarp_threadpool;
namespace llarp
{
class Logic;
namespace thread
{
class ThreadPool;
}
} // namespace llarp
struct llarp_nodedb
{
explicit llarp_nodedb(
std::shared_ptr<llarp::thread::ThreadPool> diskworker, const std::string rootdir)
: disk(std::move(diskworker)), nodePath(rootdir)
using DiskJob_t = std::function<void(void)>;
using DiskCaller_t = std::function<void(DiskJob_t)>;
using WorkJob_t = std::function<void(void)>;
using WorkCaller_t = std::function<void(WorkJob_t)>;
explicit llarp_nodedb(const std::string rootdir, DiskCaller_t diskCaller)
: disk(std::move(diskCaller)), nodePath(rootdir)
{
}
@ -44,7 +40,7 @@ struct llarp_nodedb
Clear();
}
std::shared_ptr<llarp::thread::ThreadPool> disk;
const DiskCaller_t disk;
mutable llarp::util::Mutex access; // protects entries
/// time for next save to disk event, 0 if never happened
llarp_time_t m_NextSaveToDisk = 0s;
@ -182,8 +178,8 @@ struct llarp_async_verify_rc
llarp_nodedb* nodedb;
// llarp::Logic for queue_job
std::shared_ptr<llarp::Logic> logic;
std::shared_ptr<llarp::thread::ThreadPool> cryptoworker;
std::shared_ptr<llarp::thread::ThreadPool> diskworker;
llarp_nodedb::WorkCaller_t worker;
llarp_nodedb::DiskCaller_t disk;
/// router contact
llarp::RouterContact rc;
@ -215,7 +211,7 @@ struct llarp_async_load_rc
/// llarp::Logic for calling hook
llarp::Logic* logic;
/// disk worker threadpool
llarp::thread::ThreadPool* diskworker;
llarp_nodedb::DiskCaller_t disk;
/// target pubkey
llarp::PubKey pubkey;
/// router contact result

@ -451,7 +451,7 @@ namespace llarp
{
TrafficQueue_ptr data = nullptr;
std::swap(m_UpstreamQueue, data);
r->threadpool()->addJob(
r->QueueWork(
[self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); });
}
}
@ -463,7 +463,7 @@ namespace llarp
{
TrafficQueue_ptr data = nullptr;
std::swap(m_DownstreamQueue, data);
r->threadpool()->addJob(
r->QueueWork(
[self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); });
}
}

@ -28,12 +28,6 @@ namespace llarp
return m_AllowTransit;
}
std::shared_ptr<thread::ThreadPool>
PathContext::Worker()
{
return m_Router->threadpool();
}
bool
PathContext::CheckPathLimitHitByIP(const IpAddress& ip)
{

@ -34,7 +34,7 @@ namespace llarp
struct PathContext
{
PathContext(AbstractRouter* router);
explicit PathContext(AbstractRouter* router);
/// called from router tick function
void
@ -147,9 +147,6 @@ namespace llarp
}
};
std::shared_ptr<thread::ThreadPool>
Worker();
std::shared_ptr<Logic>
logic();

@ -16,6 +16,8 @@ namespace llarp
{
struct AsyncPathKeyExchangeContext : std::enable_shared_from_this<AsyncPathKeyExchangeContext>
{
using WorkFunc_t = std::function<void(void)>;
using WorkerFunc_t = std::function<void(WorkFunc_t)>;
using Path_t = path::Path_ptr;
using PathSet_t = path::PathSet_ptr;
PathSet_t pathset = nullptr;
@ -25,7 +27,7 @@ namespace llarp
Handler result;
size_t idx = 0;
AbstractRouter* router = nullptr;
std::shared_ptr<thread::ThreadPool> worker;
WorkerFunc_t work;
std::shared_ptr<Logic> logic;
LR_CommitMessage LRCM;
@ -100,26 +102,24 @@ namespace llarp
else
{
// next hop
worker->addJob(
std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this()));
work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this()));
}
}
/// Generate all keys asynchronously and call handler when done
void
AsyncGenerateKeys(
Path_t p, std::shared_ptr<Logic> l, std::shared_ptr<thread::ThreadPool> pool, Handler func)
AsyncGenerateKeys(Path_t p, std::shared_ptr<Logic> l, WorkerFunc_t worker, Handler func)
{
path = p;
logic = l;
result = func;
worker = pool;
work = worker;
for (size_t i = 0; i < path::max_len; ++i)
{
LRCM.frames[i].Randomize();
}
pool->addJob(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this()));
work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this()));
}
};
@ -442,7 +442,10 @@ namespace llarp
path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); });
ctx->AsyncGenerateKeys(
path, m_router->logic(), m_router->threadpool(), &PathBuilderKeysGenerated);
path,
m_router->logic(),
[r = m_router](auto func) { r->QueueWork(std::move(func)); },
&PathBuilderKeysGenerated);
}
void

@ -248,9 +248,9 @@ namespace llarp
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
{
r->threadpool()->addJob([self = shared_from_this(),
data = std::move(m_UpstreamQueue),
r]() { self->UpstreamWork(data, r); });
r->QueueWork([self = shared_from_this(), data = std::move(m_UpstreamQueue), r]() {
self->UpstreamWork(data, r);
});
}
m_UpstreamQueue = nullptr;
}
@ -260,9 +260,9 @@ namespace llarp
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
{
r->threadpool()->addJob([self = shared_from_this(),
data = std::move(m_DownstreamQueue),
r]() { self->DownstreamWork(data, r); });
r->QueueWork([self = shared_from_this(), data = std::move(m_DownstreamQueue), r]() {
self->DownstreamWork(data, r);
});
}
m_DownstreamQueue = nullptr;
}

@ -7,6 +7,7 @@
#include <routing/handler.hpp>
#include <router_id.hpp>
#include <util/compare_ptr.hpp>
#include <util/thread/queue.hpp>
namespace llarp
{

@ -77,7 +77,7 @@ namespace llarp
struct AbstractRouter
{
#ifdef LOKINET_HIVE
tooling::RouterHive* hive;
tooling::RouterHive* hive = nullptr;
#endif
virtual ~AbstractRouter() = default;
@ -127,11 +127,13 @@ namespace llarp
virtual llarp_ev_loop_ptr
netloop() const = 0;
virtual std::shared_ptr<thread::ThreadPool>
threadpool() = 0;
/// call function in crypto worker
virtual void
QueueWork(std::function<void(void)>) = 0;
virtual std::shared_ptr<thread::ThreadPool>
diskworker() = 0;
/// call function in disk io thread
virtual void
QueueDiskIO(std::function<void(void)>) = 0;
virtual service::Context&
hiddenServiceContext() = 0;
@ -286,14 +288,10 @@ namespace llarp
template <class EventType, class... Params>
void
NotifyRouterEvent(Params&&... args) const
NotifyRouterEvent([[maybe_unused]] Params&&... args) const
{
// TODO: no-op when appropriate
auto event = std::make_unique<EventType>(args...);
#ifdef LOKINET_HIVE
hive->NotifyEvent(std::move(event));
#elif LOKINET_DEBUG
LogDebug(event->ToString());
hive->NotifyEvent(std::make_unique<EventType>(args...));
#endif
}
};

@ -5,13 +5,13 @@
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp>
#include <util/thread/threading.hpp>
#include <path/path_types.hpp>
#include <router_id.hpp>
#include <list>
#include <unordered_map>
#include <utility>
#include <queue>
struct llarp_buffer_t;

@ -46,7 +46,7 @@ namespace llarp
}
auto func = std::bind(&OutboundSessionMaker::VerifyRC, this, session->GetRemoteRC());
_threadpool->addJob(func);
work(func);
return true;
}
@ -157,14 +157,14 @@ namespace llarp
Profiling* profiler,
std::shared_ptr<Logic> logic,
llarp_nodedb* nodedb,
std::shared_ptr<llarp::thread::ThreadPool> threadpool)
WorkerFunc_t dowork)
{
_linkManager = linkManager;
_rcLookup = rcLookup;
_logic = logic;
_nodedb = nodedb;
_threadpool = threadpool;
_profiler = profiler;
work = dowork;
}
void

@ -6,7 +6,6 @@
#include <router/i_rc_lookup_handler.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/threading.hpp>
#include <util/thread/thread_pool.hpp>
#include <profiling.hpp>
@ -25,6 +24,9 @@ namespace llarp
struct OutboundSessionMaker final : public IOutboundSessionMaker
{
using Work_t = std::function<void(void)>;
using WorkerFunc_t = std::function<void(Work_t)>;
using CallbacksQueue = std::list<RouterCallback>;
public:
@ -61,7 +63,7 @@ namespace llarp
Profiling* profiler,
std::shared_ptr<Logic> logic,
llarp_nodedb* nodedb,
std::shared_ptr<llarp::thread::ThreadPool> threadpool);
WorkerFunc_t work);
void
SetOurRouter(RouterID r)
@ -113,7 +115,7 @@ namespace llarp
Profiling* _profiler = nullptr;
llarp_nodedb* _nodedb = nullptr;
std::shared_ptr<Logic> _logic;
std::shared_ptr<llarp::thread::ThreadPool> _threadpool;
WorkerFunc_t work;
RouterID us;
};

@ -194,7 +194,7 @@ namespace llarp
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
_threadpool->addJob(func);
_work(func);
// update dht if required
if (_dht->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey}))
@ -299,7 +299,7 @@ namespace llarp
RCLookupHandler::Init(
llarp_dht_context* dht,
llarp_nodedb* nodedb,
std::shared_ptr<llarp::thread::ThreadPool> threadpool,
WorkerFunc_t dowork,
ILinkManager* linkManager,
service::Context* hiddenServiceContext,
const std::set<RouterID>& strictConnectPubkeys,
@ -309,7 +309,7 @@ namespace llarp
{
_dht = dht;
_nodedb = nodedb;
_threadpool = threadpool;
_work = dowork;
_hiddenServiceContext = hiddenServiceContext;
_strictConnectPubkeys = strictConnectPubkeys;
_bootstrapRCList = bootstrapRCList;

@ -5,7 +5,6 @@
#include <router/i_rc_lookup_handler.hpp>
#include <util/thread/threading.hpp>
#include <util/thread/thread_pool.hpp>
#include <unordered_map>
#include <set>
@ -27,6 +26,8 @@ namespace llarp
struct RCLookupHandler final : public I_RCLookupHandler
{
public:
using Work_t = std::function<void(void)>;
using WorkerFunc_t = std::function<void(Work_t)>;
using CallbacksQueue = std::list<RCRequestCallback>;
~RCLookupHandler() override = default;
@ -72,7 +73,7 @@ namespace llarp
Init(
llarp_dht_context* dht,
llarp_nodedb* nodedb,
std::shared_ptr<llarp::thread::ThreadPool> threadpool,
WorkerFunc_t dowork,
ILinkManager* linkManager,
service::Context* hiddenServiceContext,
const std::set<RouterID>& strictConnectPubkeys,
@ -98,7 +99,7 @@ namespace llarp
llarp_dht_context* _dht = nullptr;
llarp_nodedb* _nodedb = nullptr;
std::shared_ptr<llarp::thread::ThreadPool> _threadpool = nullptr;
WorkerFunc_t _work = nullptr;
service::Context* _hiddenServiceContext = nullptr;
ILinkManager* _linkManager = nullptr;

@ -44,19 +44,15 @@ static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s;
namespace llarp
{
Router::Router(
std::shared_ptr<llarp::thread::ThreadPool> _tp,
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> l)
Router::Router(llarp_ev_loop_ptr __netloop, std::shared_ptr<Logic> l)
: ready(false)
, m_lmq(std::make_shared<lokimq::LokiMQ>())
, _netloop(std::move(__netloop))
, cryptoworker(std::move(_tp))
, _logic(std::move(l))
, paths(this)
, _exitContext(this)
, disk(std::make_shared<llarp::thread::ThreadPool>(1, 1000, "diskworker"))
, _dht(llarp_dht_context_new(this))
, m_DiskThread(m_lmq->add_tagged_thread("disk"))
, inbound_link_msg_parser(this)
, _hiddenServiceContext(this)
, m_RPCServer(new rpc::RpcServer(m_lmq, this))
@ -237,35 +233,34 @@ namespace llarp
bool
Router::Configure(Config* conf, bool isRouter, llarp_nodedb* nodedb)
{
if (nodedb == nullptr)
// we need this first so we can start lmq to fetch keys
if (conf)
{
throw std::invalid_argument("nodedb cannot be null");
enableRPCServer = conf->api.m_enableRPCServer;
rpcBindAddr = conf->api.m_rpcBindAddr;
whitelistRouters = conf->lokid.whitelistRouters;
lokidRPCAddr = conf->lokid.lokidRPCAddr;
}
_nodedb = nodedb;
// we need this first so we can start lmq to fetch keys
enableRPCServer = conf->api.m_enableRPCServer;
rpcBindAddr = conf->api.m_rpcBindAddr;
whitelistRouters = conf->lokid.whitelistRouters;
lokidRPCAddr = conf->lokid.lokidRPCAddr;
if (not StartRpcServer())
throw std::runtime_error("Failed to start rpc server");
m_lmq->start();
_nodedb = nodedb;
if (whitelistRouters)
{
m_lokidRpcClient->ConnectAsync(std::string_view{lokidRPCAddr});
}
// fetch keys
if (not m_keyManager->initialize(*conf, true, isRouter))
throw std::runtime_error("KeyManager failed to initialize");
if (!FromConfig(conf))
throw std::runtime_error("FromConfig() failed");
if (conf)
{
if (not m_keyManager->initialize(*conf, true, isRouter))
throw std::runtime_error("KeyManager failed to initialize");
if (!FromConfig(conf))
throw std::runtime_error("FromConfig() failed");
}
if (!InitOutboundLinks())
throw std::runtime_error("InitOutboundLinks() failed");
@ -293,7 +288,7 @@ namespace llarp
LogError("RC is invalid, not saving");
return false;
}
diskworker()->addJob(std::bind(&Router::HandleSaveRC, this));
QueueDiskIO(std::bind(&Router::HandleSaveRC, this));
return true;
}
@ -308,8 +303,6 @@ namespace llarp
{
LogInfo("closing router");
llarp_ev_loop_stop(_netloop);
disk->stop();
disk->shutdown();
_running.store(false);
}
@ -527,12 +520,17 @@ namespace llarp
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, _logic);
_outboundSessionMaker.Init(
&_linkManager, &_rcLookupHandler, &_routerProfiling, _logic, _nodedb, threadpool());
&_linkManager,
&_rcLookupHandler,
&_routerProfiling,
_logic,
_nodedb,
util::memFn(&AbstractRouter::QueueWork, this));
_linkManager.Init(&_outboundSessionMaker);
_rcLookupHandler.Init(
_dht,
_nodedb,
threadpool(),
util::memFn(&AbstractRouter::QueueWork, this),
&_linkManager,
&_hiddenServiceContext,
strictConnectPubkeys,
@ -552,7 +550,8 @@ namespace llarp
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this));
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::QueueWork, this));
const std::string& key = serverConfig.interface;
int af = serverConfig.addressFamily;
@ -590,7 +589,7 @@ namespace llarp
conf->logging.m_logType,
conf->logging.m_logFile,
conf->router.m_nickname,
diskworker());
util::memFn(&AbstractRouter::QueueDiskIO, this));
return true;
}
@ -753,7 +752,7 @@ namespace llarp
// save profiles
if (routerProfiling().ShouldSave(now))
{
diskworker()->addJob([&]() { routerProfiling().Save(routerProfilesFile.c_str()); });
QueueDiskIO([&]() { routerProfiling().Save(routerProfilesFile.c_str()); });
}
// save nodedb
if (nodedb()->ShouldSaveToDisk(now))
@ -869,18 +868,6 @@ namespace llarp
if (_running || _stopping)
return false;
if (!cryptoworker->start())
{
LogError("crypto worker failed to start");
return false;
}
if (!disk->start())
{
LogError("disk worker failed to start");
return false;
}
routerProfiling().Load(routerProfilesFile.c_str());
// set public signing key
@ -932,7 +919,7 @@ namespace llarp
}
}
_outboundSessionMaker.SetOurRouter(pubkey());
if (!_linkManager.StartLinks(_logic, cryptoworker))
if (!_linkManager.StartLinks(_logic))
{
LogWarn("One or more links failed to start.");
return false;
@ -1157,7 +1144,8 @@ namespace llarp
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&IOutboundSessionMaker::OnConnectTimeout, &_outboundSessionMaker),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this));
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::QueueWork, this));
if (!link)
throw std::runtime_error("NewOutboundLink() failed to provide a link");

@ -34,7 +34,6 @@
#include <util/status.hpp>
#include <util/str.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/threadpool.h>
#include <util/time.hpp>
#include <functional>
@ -166,31 +165,30 @@ namespace llarp
return _netloop;
}
std::shared_ptr<llarp::thread::ThreadPool>
threadpool() override
void
QueueWork(std::function<void(void)> func) override
{
return cryptoworker;
m_lmq->job(std::move(func));
}
std::shared_ptr<llarp::thread::ThreadPool>
diskworker() override
void
QueueDiskIO(std::function<void(void)> func) override
{
return disk;
m_lmq->job(std::move(func), m_DiskThread);
}
IpAddress _ourAddress;
llarp_ev_loop_ptr _netloop;
std::shared_ptr<llarp::thread::ThreadPool> cryptoworker;
std::shared_ptr<Logic> _logic;
path::PathContext paths;
exit::Context _exitContext;
SecretKey _identity;
SecretKey _encryption;
std::shared_ptr<thread::ThreadPool> disk;
llarp_dht_context* _dht = nullptr;
llarp_nodedb* _nodedb;
llarp_time_t _startedAt;
const lokimq::TaggedThreadID m_DiskThread;
llarp_time_t
Uptime() const override;
@ -311,10 +309,7 @@ namespace llarp
void
GossipRCIfNeeded(const RouterContact rc) override;
Router(
std::shared_ptr<llarp::thread::ThreadPool> worker,
llarp_ev_loop_ptr __netloop,
std::shared_ptr<Logic> logic);
explicit Router(llarp_ev_loop_ptr __netloop, std::shared_ptr<Logic> logic);
~Router() override;

@ -1,6 +1,5 @@
#include <rpc/lokid_rpc_client.hpp>
#include <util/logging/logger.h>
#include <util/logging/logger.hpp>
#include <router/abstractrouter.hpp>
@ -173,7 +172,7 @@ namespace llarp
promise.set_value(std::nullopt);
return;
}
if (data.size() < 2)
if (data.empty())
{
LogError("failed to get private key, no response");
promise.set_value(std::nullopt);
@ -181,7 +180,7 @@ namespace llarp
}
try
{
auto j = nlohmann::json::parse(data[1]);
auto j = nlohmann::json::parse(data[0]);
SecretKey k;
if (not k.FromHex(j.at("service_node_ed25519_privkey").get<std::string>()))
{

@ -689,8 +689,8 @@ namespace llarp
{
llarp_async_verify_rc* job = new llarp_async_verify_rc();
job->nodedb = Router()->nodedb();
job->cryptoworker = Router()->threadpool();
job->diskworker = Router()->diskworker();
job->worker = util::memFn(&AbstractRouter::QueueWork, Router());
job->disk = util::memFn(&AbstractRouter::QueueDiskIO, Router());
job->logic = Router()->logic();
job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg, std::placeholders::_1);
job->rc = rc;
@ -908,7 +908,7 @@ namespace llarp
RemoveConvoTag(frame.T);
return true;
}
if (!frame.AsyncDecryptAndVerify(EndpointLogic(), p, CryptoWorker(), m_Identity, this))
if (!frame.AsyncDecryptAndVerify(EndpointLogic(), p, m_Identity, this))
{
// send discard
ProtocolFrame f;
@ -1240,7 +1240,7 @@ namespace llarp
f.F = m->introReply.pathID;
transfer->P = remoteIntro.pathID;
auto self = this;
return CryptoWorker()->addJob([transfer, p, m, K, self]() {
Router()->QueueWork([transfer, p, m, K, self]() {
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
{
LogError("failed to encrypt and sign");
@ -1250,6 +1250,7 @@ namespace llarp
util::Lock lock(self->m_state->m_SendQueueMutex);
self->m_state->m_SendQueue.emplace_back(transfer, p);
});
return true;
}
}
}
@ -1333,12 +1334,6 @@ namespace llarp
return m_state->m_IsolatedLogic ? m_state->m_IsolatedLogic : Router()->logic();
}
std::shared_ptr<llarp::thread::ThreadPool>
Endpoint::CryptoWorker()
{
return Router()->threadpool();
}
AbstractRouter*
Endpoint::Router()
{

@ -141,10 +141,6 @@ namespace llarp
llarp_ev_loop_ptr
EndpointNetLoop();
/// crypto worker threadpool
std::shared_ptr<llarp::thread::ThreadPool>
CryptoWorker();
AbstractRouter*
Router();

@ -207,7 +207,7 @@ namespace llarp
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
frame->F = ex->msg.introReply.pathID;
m_Endpoint->CryptoWorker()->addJob(std::bind(&AsyncKeyExchange::Encrypt, ex, frame));
m_Endpoint->Router()->QueueWork(std::bind(&AsyncKeyExchange::Encrypt, ex, frame));
}
std::string

@ -6,6 +6,7 @@
#include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <service/endpoint.hpp>
#include <router/abstractrouter.hpp>
#include <utility>
namespace llarp
@ -384,7 +385,6 @@ namespace llarp
ProtocolFrame::AsyncDecryptAndVerify(
std::shared_ptr<Logic> logic,
path::Path_ptr recvPath,
const std::shared_ptr<llarp::thread::ThreadPool>& worker,
const Identity& localIdent,
Endpoint* handler) const
{
@ -397,7 +397,7 @@ namespace llarp
auto dh = std::make_shared<AsyncFrameDecrypt>(
logic, localIdent, handler, msg, *this, recvPath->intro);
dh->path = recvPath;
worker->addJob(std::bind(&AsyncFrameDecrypt::Work, dh));
handler->Router()->QueueWork(std::bind(&AsyncFrameDecrypt::Work, dh));
return true;
}
@ -415,7 +415,7 @@ namespace llarp
return false;
}
v->frame = *this;
worker->addJob([v, msg = std::move(msg), recvPath = std::move(recvPath)]() {
handler->Router()->QueueWork([v, msg = std::move(msg), recvPath = std::move(recvPath)]() {
if (not v->frame.Verify(v->si))
{
LogError("Signature failure from ", v->si.Addr());

@ -125,7 +125,6 @@ namespace llarp
AsyncDecryptAndVerify(
std::shared_ptr<Logic> logic,
path::Path_ptr fromPath,
const std::shared_ptr<llarp::thread::ThreadPool>& worker,
const Identity& localIdent,
Endpoint* handler) const;

@ -98,7 +98,7 @@ namespace llarp
m->tag = f->T;
m->PutBuffer(payload);
auto self = this;
m_Endpoint->CryptoWorker()->addJob([f, m, shared, path, self]() {
m_Endpoint->Router()->QueueWork([f, m, shared, path, self]() {
if (not f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity()))
{
LogError(self->m_Endpoint->Name(), " failed to sign message");

@ -26,7 +26,7 @@ namespace llarp
// namespace
FileLogStream::FileLogStream(
std::shared_ptr<thread::ThreadPool> disk, FILE* f, llarp_time_t flushInterval, bool closeFile)
std::function<void(Work_t)> disk, FILE* f, llarp_time_t flushInterval, bool closeFile)
: m_Lines(1024 * 8)
, m_Disk(std::move(disk))
, m_File(f)
@ -112,7 +112,7 @@ namespace llarp
{
FILE* const f = m_File;
auto lines = &m_Lines;
m_Disk->addJob([f, lines]() { Flush(lines, f); });
m_Disk([f, lines]() { Flush(lines, f); });
m_LastFlush = now;
}
} // namespace llarp

@ -3,7 +3,6 @@
#include <util/logging/logstream.hpp>
#include <util/thread/thread_pool.hpp>
#include <util/thread/queue.hpp>
#include <util/time.hpp>
@ -14,11 +13,10 @@ namespace llarp
/// flushable file based log stream
struct FileLogStream : public ILogStream
{
using Work_t = std::function<void(void)>;
FileLogStream(
std::shared_ptr<thread::ThreadPool> disk,
FILE* f,
llarp_time_t flushInterval,
bool closefile = true);
std::function<void(Work_t)> io, FILE* f, llarp_time_t flushInterval, bool closefile = true);
~FileLogStream() override;
@ -65,7 +63,7 @@ namespace llarp
void
FlushLinesToDisk(llarp_time_t now);
std::shared_ptr<thread::ThreadPool> m_Disk;
const std::function<void(Work_t)> m_Disk;
FILE* const m_File;
const llarp_time_t m_FlushInterval;
llarp_time_t m_LastFlush = 0s;

@ -8,11 +8,11 @@ namespace llarp
struct JSONLogStream : public FileLogStream
{
JSONLogStream(
std::shared_ptr<thread::ThreadPool> disk,
std::function<void(FileLogStream::Work_t)> disk,
FILE* f,
llarp_time_t flushInterval,
bool closeFile)
: FileLogStream(disk, f, flushInterval, closeFile)
: FileLogStream(std::move(disk), f, flushInterval, closeFile)
{
}

@ -1,5 +1,4 @@
#include <util/logging/logger.hpp>
#include <util/logging/logger.h>
#include <util/logging/ostream_logger.hpp>
#include <util/logging/logger_syslog.hpp>
#include <util/logging/file_logger.hpp>
@ -108,7 +107,7 @@ namespace llarp
LogType type,
const std::string& file,
const std::string& nickname,
std::shared_ptr<thread::ThreadPool> threadpool)
std::function<void(IOFunc_t)> io)
{
SetLogLevel(level);
nodeName = nickname;
@ -140,7 +139,7 @@ namespace llarp
std::cout << std::flush;
LogContext::Instance().logStream =
std::make_unique<FileLogStream>(threadpool, logfile, 100ms, true);
std::make_unique<FileLogStream>(io, logfile, 100ms, true);
}
else
{
@ -153,7 +152,7 @@ namespace llarp
std::cout << std::flush;
LogContext::Instance().logStream =
std::make_unique<JSONLogStream>(threadpool, logfile, 100ms, logfile != stdout);
std::make_unique<JSONLogStream>(io, logfile, 100ms, logfile != stdout);
break;
case LogType::Syslog:
if (logfile)
@ -188,18 +187,3 @@ namespace llarp
}
} // namespace llarp
extern "C"
{
void
cSetLogLevel(LogLevel lvl)
{
llarp::SetLogLevel((llarp::LogLevel)lvl);
}
void
cSetLogNodeName(const char* name)
{
llarp::LogContext::Instance().nodeName = name;
}
}

@ -1,24 +0,0 @@
#ifndef LLARP_LOGGER_H
#define LLARP_LOGGER_H
#ifdef __cplusplus
extern "C"
{
enum LogLevel
{
eLogDebug,
eLogInfo,
eLogWarn,
eLogError,
eLogNone
};
void
cSetLogLevel(enum LogLevel lvl);
void
cSetLogNodeName(const char* name);
}
#endif
#endif

@ -5,7 +5,6 @@
#include <util/time.hpp>
#include <util/logging/logstream.hpp>
#include <util/logging/logger_internal.hpp>
#include <util/thread/thread_pool.hpp>
namespace llarp
{
@ -21,6 +20,8 @@ namespace llarp
struct LogContext
{
using IOFunc_t = std::function<void(void)>;
LogContext();
LogLevel curLevel = eLogInfo;
LogLevel startupLevel = eLogInfo;
@ -50,14 +51,14 @@ namespace llarp
/// @param type is the type of logger to set up
/// @param file is the file to log to (relevant for types File and Json)
/// @param nickname is a tag to add to each log statement
/// @param threadpool is a threadpool where I/O can offloaded
/// @param io is a callable that queues work that does io, async
void
Initialize(
LogLevel level,
LogType type,
const std::string& file,
const std::string& nickname,
std::shared_ptr<thread::ThreadPool> threadpool);
std::function<void(IOFunc_t)> io);
};
/// RAII type to turn logging off

@ -6,17 +6,6 @@
namespace llarp
{
bool
Logic::queue_job(struct llarp_thread_job job)
{
if (job.user && job.work)
{
LogicCall(this, std::bind(job.work, job.user));
return true;
}
return false;
}
void
Logic::stop()
{

@ -3,7 +3,6 @@
#include <ev/ev.hpp>
#include <util/mem.h>
#include <util/thread/threadpool.h>
#include <optional>
namespace llarp
@ -15,9 +14,6 @@ namespace llarp
void
stop();
bool
queue_job(struct llarp_thread_job job);
void
Call(std::function<void(void)> func);

@ -1,331 +0,0 @@
#include <util/thread/thread_pool.hpp>
#include <util/thread/threading.hpp>
namespace llarp
{
namespace thread
{
void
ThreadPool::join()
{
for (auto& t : m_threads)
{
if (t.joinable())
{
t.join();
}
}
m_createdThreads = 0;
}
void
ThreadPool::runJobs()
{
while (m_status.load(std::memory_order_relaxed) == Status::Run)
{
auto functor = m_queue.tryPopFront();
if (functor)
{
(*functor)();
}
else
{
m_idleThreads++;
if (m_status == Status::Run && m_queue.empty())
{
m_semaphore.wait();
}
m_idleThreads.fetch_sub(1, std::memory_order_relaxed);
}
}
}
void
ThreadPool::drainQueue()
{
while (m_status.load(std::memory_order_relaxed) == Status::Drain)
{
auto functor = m_queue.tryPopFront();
if (!functor)
{
return;
}
(*functor)();
}
}
void
ThreadPool::waitThreads()
{
std::unique_lock lock{m_gateMutex};
m_numThreadsCV.wait(lock, [this] { return allThreadsReady(); });
}
void
ThreadPool::releaseThreads()
{
{
std::lock_guard lock{m_gateMutex};
m_numThreadsReady = 0;
++m_gateCount;
}
m_gateCV.notify_all();
}
void
ThreadPool::interrupt()
{
std::lock_guard lock{m_gateMutex};
size_t count = m_idleThreads;
for (size_t i = 0; i < count; ++i)
{
m_semaphore.notify();
}
}
void
ThreadPool::worker()
{
// Lock will be valid until the end of the statement
size_t gateCount = (std::lock_guard{m_gateMutex}, m_gateCount);
util::SetThreadName(m_name);
for (;;)
{
{
std::unique_lock lock{m_gateMutex};
++m_numThreadsReady;
m_numThreadsCV.notify_one();
m_gateCV.wait(lock, [&] { return gateCount != m_gateCount; });
gateCount = m_gateCount;
}
Status status = m_status.load(std::memory_order_relaxed);
// Can't use a switch here as we want to load and fall through.
if (status == Status::Run)
{
runJobs();
status = m_status;
}
if (status == Status::Drain)
{
drainQueue();
}
else if (status == Status::Suspend)
{
continue;
}
else
{
assert(status == Status::Stop);
return;
}
}
}
bool
ThreadPool::spawn()
{
try
{
m_threads.at(m_createdThreads) = std::thread(std::bind(&ThreadPool::worker, this));
++m_createdThreads;
return true;
}
catch (const std::system_error&)
{
return false;
}
}
ThreadPool::ThreadPool(size_t numThreads, size_t maxJobs, std::string_view name)
: m_queue(maxJobs)
, m_semaphore(0)
, m_idleThreads(0)
, m_status(Status::Stop)
, m_gateCount(0)
, m_numThreadsReady(0)
, m_name(name)
, m_threads(numThreads)
, m_createdThreads(0)
{
assert(numThreads != 0);
assert(maxJobs != 0);
disable();
}
ThreadPool::~ThreadPool()
{
shutdown();
}
bool
ThreadPool::addJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(job);
if (ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::addJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(std::move(job));
if (ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(job);
if (ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(std::move(job));
if (ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
void
ThreadPool::drain()
{
util::Lock lock(m_mutex);
if (m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_status = Status::Drain;
interrupt();
waitThreads();
m_status = Status::Run;
releaseThreads();
}
}
void
ThreadPool::shutdown()
{
util::Lock lock(m_mutex);
if (m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Stop;
interrupt();
m_queue.removeAll();
join();
}
}
bool
ThreadPool::start()
{
util::Lock lock(m_mutex);
if (m_status.load(std::memory_order_relaxed) != Status::Stop)
{
return true;
}
for (auto it = (m_threads.begin() + m_createdThreads); it != m_threads.end(); ++it)
{
if (!spawn())
{
releaseThreads();
join();
return false;
}
}
waitThreads();
m_queue.enable();
m_status = Status::Run;
// `releaseThreads` has a release barrier so workers don't return from
// wait and not see the above store.
releaseThreads();
return true;
}
void
ThreadPool::stop()
{
util::Lock lock(m_mutex);
if (m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Drain;
// `interrupt` has an acquire barrier (locks a mutex), so nothing will
// be executed before the above store to `status`.
interrupt();
waitThreads();
m_status = Status::Stop;
// `releaseThreads` has a release barrier so workers don't return from
// wait and not see the above store.
releaseThreads();
join();
}
}
} // namespace thread
} // namespace llarp

@ -1,216 +0,0 @@
#ifndef LLARP_THREAD_POOL_HPP
#define LLARP_THREAD_POOL_HPP
#include <util/thread/queue.hpp>
#include <util/thread/threading.hpp>
#include <atomic>
#include <functional>
#include <thread>
#include <vector>
#include <string_view>
namespace llarp
{
namespace thread
{
class ThreadPool
{
// Provide an efficient fixed size threadpool. The following attributes
// of the threadpool are fixed at construction time:
// - the max number of pending jobs
// - the number of threads
public:
using Job = std::function<void()>;
using JobQueue = Queue<Job>;
enum class Status
{
Stop,
Run,
Suspend,
Drain
};
private:
JobQueue m_queue; // The job queue
util::Semaphore m_semaphore; // The semaphore for the queue.
std::atomic_size_t m_idleThreads; // Number of idle threads
util::Mutex m_mutex;
std::atomic<Status> m_status;
size_t m_gateCount GUARDED_BY(m_gateMutex);
size_t m_numThreadsReady GUARDED_BY(m_gateMutex); // Threads ready to go through the gate.
std::mutex m_gateMutex;
std::condition_variable m_gateCV;
std::condition_variable m_numThreadsCV;
std::string m_name;
std::vector<std::thread> m_threads;
size_t m_createdThreads;
void
join();
void
runJobs();
void
drainQueue();
void
waitThreads();
void
releaseThreads();
void
interrupt();
void
worker();
bool
spawn();
bool
allThreadsReady() const REQUIRES_SHARED(m_gateMutex)
{
return m_numThreadsReady == m_threads.size();
}
public:
ThreadPool(size_t numThreads, size_t maxJobs, std::string_view name);
~ThreadPool();
// Disable the threadpool. Calls to `addJob` and `tryAddJob` will fail.
// Jobs currently in the pool will not be affected.
void
disable();
void
enable();
// Add a job to the bool. Note this call will block if the underlying
// queue is full.
// Returns false if the queue is currently disabled.
bool
addJob(const Job& job);
bool
addJob(Job&& job);
// Try to add a job to the pool. If the queue is full, or the queue is
// disabled, return false.
// This call will not block.
bool
tryAddJob(const Job& job);
bool
tryAddJob(Job&& job);
// Wait until all current jobs are complete.
// If any jobs are submitted during this time, they **may** or **may not**
// run.
void
drain();
// Disable this pool, and cancel all pending jobs. After all currently
// running jobs are complete, join with the threads in the pool.
void
shutdown();
// Start this threadpool by spawning `threadCount()` threads.
bool
start();
// Disable queueing on this threadpool and wait until all pending jobs
// have finished.
void
stop();
bool
enabled() const;
bool
started() const;
size_t
activeThreadCount() const;
// Current number of queued jobs
size_t
jobCount() const;
// Number of threads passed in the constructor
size_t
threadCount() const;
// Number of threads currently started in the threadpool
size_t
startedThreadCount() const;
// Max number of queued jobs
size_t
capacity() const;
};
inline void
ThreadPool::disable()
{
m_queue.disable();
}
inline void
ThreadPool::enable()
{
m_queue.enable();
}
inline bool
ThreadPool::enabled() const
{
return m_queue.enabled();
}
inline size_t
ThreadPool::activeThreadCount() const
{
if (m_threads.size() == m_createdThreads)
{
return m_threads.size() - m_idleThreads.load(std::memory_order_relaxed);
}
return 0;
}
inline size_t
ThreadPool::threadCount() const
{
return m_threads.size();
}
inline size_t
ThreadPool::startedThreadCount() const
{
return m_createdThreads;
}
inline size_t
ThreadPool::jobCount() const
{
return m_queue.size();
}
inline size_t
ThreadPool::capacity() const
{
return m_queue.capacity();
}
} // namespace thread
} // namespace llarp
#endif

@ -1,89 +0,0 @@
#include <util/logging/logger.hpp>
#include <util/time.hpp>
#include <util/thread/threadpool.h>
#include <util/thread/thread_pool.hpp>
#include <cstring>
#include <functional>
#include <queue>
struct llarp_threadpool*
llarp_init_threadpool(int workers, const char* name, size_t queueLength)
{
if (workers <= 0)
workers = 1;
return new llarp_threadpool(workers, name, queueLength);
}
void
llarp_threadpool_join(struct llarp_threadpool* pool)
{
llarp::LogDebug("threadpool join");
if (pool->impl)
pool->impl->stop();
pool->impl.reset();
}
void
llarp_threadpool_start(struct llarp_threadpool* pool)
{
if (pool->impl)
pool->impl->start();
}
void
llarp_threadpool_stop(struct llarp_threadpool* pool)
{
llarp::LogDebug("threadpool stop");
if (pool->impl)
pool->impl->disable();
}
bool
llarp_threadpool_queue_job(struct llarp_threadpool* pool, struct llarp_thread_job job)
{
return llarp_threadpool_queue_job(pool, std::bind(job.work, job.user));
}
bool
llarp_threadpool_queue_job(struct llarp_threadpool* pool, std::function<void(void)> func)
{
return pool->impl && pool->impl->addJob(func);
}
void
llarp_threadpool_tick(struct llarp_threadpool* pool)
{
if (pool->impl)
{
pool->impl->drain();
}
}
void
llarp_free_threadpool(struct llarp_threadpool** pool)
{
if (*pool)
{
delete *pool;
}
*pool = nullptr;
}
size_t
llarp_threadpool::size() const
{
return impl ? impl->capacity() : 0;
}
size_t
llarp_threadpool::pendingJobs() const
{
return impl ? impl->jobCount() : 0;
}
size_t
llarp_threadpool::numThreads() const
{
return impl ? impl->activeThreadCount() : 0;
}

@ -1,91 +0,0 @@
#ifndef LLARP_THREADPOOL_H
#define LLARP_THREADPOOL_H
#include <util/thread/queue.hpp>
#include <util/thread/thread_pool.hpp>
#include <util/thread/threading.hpp>
#include <util/thread/annotations.hpp>
#include <util/types.hpp>
#include <memory>
#include <queue>
#include <string_view>
struct llarp_threadpool;
#ifdef __cplusplus
struct llarp_threadpool
{
std::unique_ptr<llarp::thread::ThreadPool> impl;
llarp_threadpool(int workers, std::string_view name, size_t queueLength = size_t{1024 * 8})
: impl(std::make_unique<llarp::thread::ThreadPool>(
workers, std::max(queueLength, size_t{32}), name))
{
}
size_t
size() const;
size_t
pendingJobs() const;
size_t
numThreads() const;
/// see if this thread is full given lookahead amount
bool
LooksFull(size_t lookahead) const
{
return (pendingJobs() + lookahead) >= size();
}
};
#endif
struct llarp_threadpool*
llarp_init_threadpool(int workers, const char* name, size_t queueLength);
void
llarp_free_threadpool(struct llarp_threadpool** tp);
using llarp_thread_work_func = void (*)(void*);
/** job to be done in worker thread */
struct llarp_thread_job
{
#ifdef __cplusplus
/** user data to pass to work function */
void* user{nullptr};
/** called in threadpool worker thread */
llarp_thread_work_func work{nullptr};
llarp_thread_job(void* u, llarp_thread_work_func w) : user(u), work(w)
{
}
llarp_thread_job() = default;
#else
void* user;
llarp_thread_work_func work;
#endif
};
void
llarp_threadpool_tick(struct llarp_threadpool* tp);
bool
llarp_threadpool_queue_job(struct llarp_threadpool* tp, struct llarp_thread_job j);
#ifdef __cplusplus
bool
llarp_threadpool_queue_job(struct llarp_threadpool* tp, std::function<void(void)> func);
#endif
void
llarp_threadpool_start(struct llarp_threadpool* tp);
void
llarp_threadpool_stop(struct llarp_threadpool* tp);
#endif

@ -29,7 +29,6 @@ add_executable(testAll
dht/test_llarp_dht_txowner.cpp
dns/test_llarp_dns_dns.cpp
exit/test_llarp_exit_context.cpp
link/test_llarp_link.cpp
llarp_test.cpp
net/test_llarp_net.cpp
router/test_llarp_router_version.cpp
@ -47,7 +46,6 @@ add_executable(testAll
util/test_llarp_util_log_level.cpp
util/thread/test_llarp_util_queue_manager.cpp
util/thread/test_llarp_util_queue.cpp
util/thread/test_llarp_util_thread_pool.cpp
)
target_link_libraries(testAll PUBLIC gmock gtest liblokinet)
@ -76,6 +74,7 @@ add_executable(catchAll
config/test_llarp_config_output.cpp
net/test_ip_address.cpp
net/test_sock_addr.cpp
iwp/test_iwp_session.cpp
check_main.cpp)
target_link_libraries(catchAll PUBLIC liblokinet Catch2::Catch2)

@ -9,10 +9,10 @@
struct ExitTest : public ::testing::Test
{
ExitTest() : r(nullptr, nullptr, nullptr), context(&r)
ExitTest() : r(nullptr, nullptr), context(&r)
{
r.Configure(nullptr, false, nullptr);
}
llarp::Router r;
llarp::exit::Context context;
};

@ -0,0 +1,278 @@
#include <catch2/catch.hpp>
#include <crypto/crypto.hpp>
#include <crypto/crypto_libsodium.hpp>
#include <string_view>
#include <router_contact.hpp>
#include <iwp/iwp.hpp>
#include <util/meta/memfn.hpp>
#include <messages/link_message_parser.hpp>
#include <messages/discard.hpp>
#include <util/time.hpp>
#undef LOG_TAG
#define LOG_TAG "test_iwp_session.cpp"
namespace iwp = llarp::iwp;
namespace util = llarp::util;
/// make an iwp link
template <bool inbound, typename... Args_t>
static llarp::LinkLayer_ptr
make_link(Args_t... args)
{
if (inbound)
return iwp::NewInboundLink(args...);
else
return iwp::NewOutboundLink(args...);
}
using Logic_ptr = std::shared_ptr<llarp::Logic>;
/// a single iwp link with associated keys and members to make unit tests work
struct IWPLinkContext
{
llarp::RouterContact rc;
llarp::IpAddress localAddr;
llarp::LinkLayer_ptr link;
std::shared_ptr<llarp::KeyManager> keyManager;
llarp::LinkMessageParser m_Parser;
llarp_ev_loop_ptr m_Loop;
/// is the test done on this context ?
bool gucci = false;
IWPLinkContext(std::string_view addr, llarp_ev_loop_ptr loop)
: localAddr{std::move(addr)}
, keyManager{std::make_shared<llarp::KeyManager>()}
, m_Parser{nullptr}
, m_Loop{std::move(loop)}
{
// generate keys
llarp::CryptoManager::instance()->identity_keygen(keyManager->identityKey);
llarp::CryptoManager::instance()->encryption_keygen(keyManager->encryptionKey);
llarp::CryptoManager::instance()->encryption_keygen(keyManager->transportKey);
// set keys in rc
rc.pubkey = keyManager->identityKey.toPublic();
rc.enckey = keyManager->encryptionKey.toPublic();
}
bool
HandleMessage(llarp::ILinkSession* from, const llarp_buffer_t& buf)
{
return m_Parser.ProcessFrom(from, buf);
}
/// initialize link
template <bool inbound>
void
InitLink(std::function<void(llarp::ILinkSession*)> established)
{
link = make_link<inbound>(
keyManager,
// getrc
[&]() -> const llarp::RouterContact& { return rc; },
// link message handler
util::memFn(&IWPLinkContext::HandleMessage, this),
// sign buffer
[&](llarp::Signature& sig, const llarp_buffer_t& buf) {
REQUIRE(llarp::CryptoManager::instance()->sign(sig, keyManager->identityKey, buf));
return true;
},
// established handler
[established](llarp::ILinkSession* s) {
REQUIRE(s != nullptr);
established(s);
return true;
},
// renegotiate handler
[](llarp::RouterContact newrc, llarp::RouterContact oldrc) {
REQUIRE(newrc.pubkey == oldrc.pubkey);
return true;
},
// timeout handler
[&](llarp::ILinkSession*) {
llarp_ev_loop_stop(m_Loop);
REQUIRE(false);
},
// session closed handler
[](llarp::RouterID) {},
// pump done handler
[]() {},
// do work function
[l = m_Loop](llarp::Work_t work) { l->call_after_delay(1ms, work); });
REQUIRE(link->Configure(
m_Loop, llarp::net::LoopbackInterfaceName(), AF_INET, *localAddr.getPort()));
if (inbound)
{
// only add address info on the recipiant's rc
rc.addrs.emplace_back();
REQUIRE(link->GetOurAddressInfo(rc.addrs.back()));
}
// sign rc
REQUIRE(rc.Sign(keyManager->identityKey));
REQUIRE(keyManager != nullptr);
}
};
using Context_ptr = std::shared_ptr<IWPLinkContext>;
/// run an iwp unit test after setup
/// call take 2 parameters, test and a timeout
///
/// test is a callable that takes 5 arguments:
/// 0) std::function<Logic_ptr(void)> that starts the iwp links and gives a logic to call with
/// 1) std::function<void(void)> that ends the unit test if we are done
/// 2) std::function<void(void)> that ends the unit test right now as a success
/// 3) client iwp link context (shared_ptr)
/// 4) relay iwp link context (shared_ptr)
///
/// timeout is a std::chrono::duration that tells the driver how long to run the unit test for
/// before it should assume failure of unit test
template <typename Func_t, typename Duration_t = std::chrono::milliseconds>
void
RunIWPTest(Func_t test, Duration_t timeout = 1s)
{
// shut up logs
llarp::LogSilencer shutup;
// set up event loop
auto logic = std::make_shared<llarp::Logic>();
auto loop = llarp_make_ev_loop();
loop->set_logic(logic);
llarp::LogContext::Instance().Initialize(
llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) {
loop->call_soon(work);
});
// turn off bogon blocking
auto oldBlockBogons = llarp::RouterContact::BlockBogons;
llarp::RouterContact::BlockBogons = false;
// set up cryptography
llarp::sodium::CryptoLibSodium crypto{};
llarp::CryptoManager manager{&crypto};
// set up client
auto initiator = std::make_shared<IWPLinkContext>("127.0.0.1:3001", loop);
// set up server
auto recipiant = std::make_shared<IWPLinkContext>("127.0.0.1:3002", loop);
// function for ending unit test on success
auto endIfDone = [initiator, recipiant, loop, logic]() {
if (initiator->gucci and recipiant->gucci)
{
LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); });
}
};
// function to start test and give logic to unit test
auto start = [initiator, recipiant, logic]() {
REQUIRE(initiator->link->Start(logic));
REQUIRE(recipiant->link->Start(logic));
return logic;
};
// function to end test immediately
auto endTest = [logic, loop]() { LogicCall(logic, [loop]() { llarp_ev_loop_stop(loop); }); };
loop->call_after_delay(
std::chrono::duration_cast<llarp_time_t>(timeout), []() { REQUIRE(false); });
test(start, endIfDone, endTest, initiator, recipiant);
llarp_ev_loop_run_single_process(loop, logic);
llarp::RouterContact::BlockBogons = oldBlockBogons;
}
/// ensure clients can connect to relays
TEST_CASE("IWP handshake", "[iwp]")
{
RunIWPTest([](std::function<Logic_ptr(void)> start,
std::function<void(void)> endIfDone,
[[maybe_unused]] std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
// set up initiator
alice->InitLink<false>([=](auto remote) {
REQUIRE(remote->GetRemoteRC() == bob->rc);
alice->gucci = true;
endIfDone();
});
// set up recipiant
bob->InitLink<true>([=](auto remote) {
REQUIRE(remote->GetRemoteRC() == alice->rc);
bob->gucci = true;
endIfDone();
});
// start unit test
auto logic = start();
// try establishing a session
LogicCall(logic, [link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); });
});
}
/// ensure relays cannot connect to clients
TEST_CASE("IWP handshake reverse", "[iwp]")
{
RunIWPTest([](std::function<Logic_ptr(void)> start,
[[maybe_unused]] std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
alice->InitLink<false>([](auto) {});
bob->InitLink<true>([](auto) {});
// start unit test
auto logic = start();
// try establishing a session in the wrong direction
LogicCall(logic, [logic, link = bob->link, rc = alice->rc, endTestNow]() {
REQUIRE(not link->TryEstablishTo(rc));
endTestNow();
});
});
}
/// ensure iwp can send messages between sessions
TEST_CASE("IWP send messages", "[iwp]")
{
RunIWPTest([](std::function<Logic_ptr(void)> start,
std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
constexpr int aliceNumSend = 128;
int aliceNumSent = 0;
// when alice makes a session to bob send `aliceNumSend` messages to him
alice->InitLink<false>([endIfDone, alice, &aliceNumSent](auto session) {
for (auto index = 0; index < aliceNumSend; index++)
{
alice->m_Loop->call_soon([session, endIfDone, alice, &aliceNumSent]() {
// generate a discard message that is 512 bytes long
llarp::DiscardMessage msg;
std::vector<byte_t> msgBuff(512);
llarp_buffer_t buf(msgBuff);
// add random padding
llarp::CryptoManager::instance()->randomize(buf);
// encode the discard message
msg.BEncode(&buf);
// send the message
session->SendMessageBuffer(msgBuff, [endIfDone, alice, &aliceNumSent](auto status) {
if (status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess)
{
// on successful transmit increment the number we sent
aliceNumSent++;
}
// if we sent all the messages sucessfully we end the unit test
alice->gucci = aliceNumSent == aliceNumSend;
endIfDone();
});
});
}
});
bob->InitLink<true>([bob](auto) { bob->gucci = true; });
// start unit test
auto logic = start();
// try establishing a session from alice to bob
LogicCall(logic, [logic, link = alice->link, rc = bob->rc, endTestNow]() {
REQUIRE(link->TryEstablishTo(rc));
});
});
}

@ -1,329 +0,0 @@
#include <crypto/crypto_libsodium.hpp>
#include <ev/ev.h>
#include <iwp/iwp.hpp>
#include <llarp_test.hpp>
#include <iwp/iwp.hpp>
#include <memory>
#include <messages/link_intro.hpp>
#include <messages/discard.hpp>
#include <test_util.hpp>
#include <gtest/gtest.h>
using namespace ::llarp;
using namespace ::testing;
struct LinkLayerTest : public test::LlarpTest<llarp::sodium::CryptoLibSodium>
{
static constexpr uint16_t AlicePort = 41163;
static constexpr uint16_t BobPort = 8088;
struct Context
{
Context()
{
keyManager = std::make_shared<KeyManager>();
SecretKey signingKey;
CryptoManager::instance()->identity_keygen(signingKey);
keyManager->identityKey = signingKey;
SecretKey encryptionKey;
CryptoManager::instance()->encryption_keygen(encryptionKey);
keyManager->encryptionKey = encryptionKey;
SecretKey transportKey;
CryptoManager::instance()->encryption_keygen(transportKey);
keyManager->transportKey = transportKey;
rc.pubkey = signingKey.toPublic();
rc.enckey = encryptionKey.toPublic();
}
std::shared_ptr<thread::ThreadPool> worker;
std::shared_ptr<KeyManager> keyManager;
RouterContact rc;
bool madeSession = false;
bool gotLIM = false;
bool
IsGucci() const
{
return gotLIM && madeSession;
}
void
Setup()
{
worker = std::make_shared<thread::ThreadPool>(1, 128, "test-worker");
worker->start();
}
const RouterContact&
GetRC() const
{
return rc;
}
RouterID
GetRouterID() const
{
return rc.pubkey;
}
std::shared_ptr<ILinkLayer> link;
static std::string
localLoopBack()
{
#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || (__APPLE__ && __MACH__) \
|| (__sun)
return "lo0";
#else
return "lo";
#endif
}
bool
Start(std::shared_ptr<Logic> logic, llarp_ev_loop_ptr loop, uint16_t port)
{
if (!link)
return false;
if (!link->Configure(loop, localLoopBack(), AF_INET, port))
return false;
/*
* TODO: ephemeral key management
if(!link->GenEphemeralKeys())
return false;
*/
rc.addrs.emplace_back();
if (!link->GetOurAddressInfo(rc.addrs[0]))
return false;
if (!rc.Sign(keyManager->identityKey))
return false;
return link->Start(logic, worker);
}
void
Stop()
{
if (link)
link->Stop();
if (worker)
{
worker->drain();
worker->stop();
}
}
void
TearDown()
{
link.reset();
worker.reset();
}
};
Context Alice;
Context Bob;
bool success = false;
const bool shouldDebug = false;
llarp_ev_loop_ptr netLoop;
std::shared_ptr<Logic> m_logic;
llarp_time_t oldRCLifetime;
llarp::LogLevel oldLevel;
LinkLayerTest() : netLoop(nullptr)
{
}
void
SetUp()
{
oldLevel = llarp::LogContext::Instance().curLevel;
if (shouldDebug)
llarp::SetLogLevel(eLogTrace);
oldRCLifetime = RouterContact::Lifetime;
RouterContact::BlockBogons = false;
RouterContact::Lifetime = 500ms;
netLoop = llarp_make_ev_loop();
m_logic.reset(new Logic());
netLoop->set_logic(m_logic);
Alice.Setup();
Bob.Setup();
}
void
TearDown()
{
Alice.TearDown();
Bob.TearDown();
m_logic.reset();
netLoop.reset();
RouterContact::BlockBogons = true;
RouterContact::Lifetime = oldRCLifetime;
llarp::SetLogLevel(oldLevel);
}
void
RunMainloop()
{
m_logic->call_later(5s, std::bind(&LinkLayerTest::Stop, this));
llarp_ev_loop_run_single_process(netLoop, m_logic);
}
void
Stop()
{
Alice.Stop();
Bob.Stop();
llarp_ev_loop_stop(netLoop);
m_logic->stop();
}
};
TEST_F(LinkLayerTest, TestIWP)
{
#ifdef WIN32
GTEST_SKIP();
#else
auto sendDiscardMessage = [](ILinkSession* s, auto callback) -> bool {
// send discard message in reply to complete unit test
std::vector<byte_t> tmp(32);
llarp_buffer_t otherBuf(tmp);
DiscardMessage discard;
if (!discard.BEncode(&otherBuf))
return false;
return s->SendMessageBuffer(std::move(tmp), callback);
};
Alice.link = iwp::NewInboundLink(
// KeyManager
Alice.keyManager,
// GetRCFunc
[&]() -> const RouterContact& { return Alice.GetRC(); },
// LinkMessageHandler
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
llarp_buffer_t copy(buf.base, buf.sz);
if (not Alice.gotLIM)
{
LinkIntroMessage msg;
if (msg.BDecode(&copy))
{
Alice.gotLIM = s->GotLIM(&msg);
}
}
return Alice.gotLIM;
},
// SignBufferFunc
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Alice.keyManager->identityKey, buf);
},
// SessionEstablishedHandler
[&, this](ILinkSession* s) -> bool {
const auto rc = s->GetRemoteRC();
if (rc.pubkey != Bob.GetRC().pubkey)
return false;
LogInfo("alice established with bob");
Alice.madeSession = true;
sendDiscardMessage(s, [&](auto status) {
success = status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess;
LogInfo("message sent to bob suceess=", success);
this->Stop();
});
return true;
},
// SessionRenegotiateHandler
[&](RouterContact, RouterContact) -> bool { return true; },
// TimeoutHandler
[&](ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
},
// SessionClosedHandler
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); },
// PumpDoneHandler
[]() {});
Bob.link = iwp::NewInboundLink(
// KeyManager
Bob.keyManager,
// GetRCFunc
[&]() -> const RouterContact& { return Bob.GetRC(); },
// LinkMessageHandler
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
llarp_buffer_t copy(buf.base, buf.sz);
if (not Bob.gotLIM)
{
LinkIntroMessage msg;
if (msg.BDecode(&copy))
{
Bob.gotLIM = s->GotLIM(&msg);
}
return Bob.gotLIM;
}
DiscardMessage discard;
if (discard.BDecode(&copy))
{
LogInfo("bog got discard message from alice");
return true;
}
return false;
},
// SignBufferFunc
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Bob.keyManager->identityKey, buf);
},
// SessionEstablishedHandler
[&](ILinkSession* s) -> bool {
if (s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
LogInfo("bob established with alice");
Bob.madeSession = true;
return true;
},
// SessionRenegotiateHandler
[&](RouterContact newrc, RouterContact oldrc) -> bool {
return newrc.pubkey == oldrc.pubkey;
},
// TimeoutHandler
[&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); },
// SessionClosedHandler
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); },
// PumpDoneHandler
[]() {});
ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort));
LogicCall(m_logic, [&]() { ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); });
RunMainloop();
ASSERT_TRUE(Alice.IsGucci());
ASSERT_TRUE(Bob.IsGucci());
ASSERT_TRUE(success);
#endif
};

@ -6,10 +6,10 @@
TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
{
llarp_nodedb nodeDB(nullptr, "");
llarp_nodedb nodeDB("", nullptr);
constexpr uint64_t numRCs = 3;
for(uint64_t i = 0; i < numRCs; ++i)
for (uint64_t i = 0; i < numRCs; ++i)
{
llarp::RouterContact rc;
rc.pubkey[0] = i;
@ -20,7 +20,7 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
llarp::dht::Key_t key;
std::vector< llarp::RouterContact > results = nodeDB.FindClosestTo(key, 4);
std::vector<llarp::RouterContact> results = nodeDB.FindClosestTo(key, 4);
// we asked for more entries than nodedb had
REQUIRE(numRCs == results.size());
@ -28,7 +28,7 @@ TEST_CASE("FindClosestTo returns correct number of elements", "[nodedb][dht]")
TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]")
{
llarp_nodedb nodeDB(nullptr, "");
llarp_nodedb nodeDB("", nullptr);
// insert some RCs: a < b < c
llarp::RouterContact a;
@ -47,7 +47,7 @@ TEST_CASE("FindClosestTo returns properly ordered set", "[nodedb][dht]")
llarp::dht::Key_t key;
std::vector< llarp::RouterContact > results = nodeDB.FindClosestTo(key, 2);
std::vector<llarp::RouterContact> results = nodeDB.FindClosestTo(key, 2);
REQUIRE(2 == results.size());
// we xor'ed with 0x0, so order should be a,b,c

@ -1,456 +0,0 @@
#include <util/thread/thread_pool.hpp>
#include <util/thread/threading.hpp>
#include <util/thread/barrier.hpp>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <gtest/gtest.h>
using namespace llarp;
using namespace llarp::thread;
using LockGuard = std::unique_lock< std::mutex >;
class PoolArgs
{
public:
std::mutex& mutex;
std::condition_variable& start;
std::condition_variable& stop;
volatile size_t count;
volatile size_t startSignal;
volatile size_t stopSignal;
};
class BarrierArgs
{
public:
util::Barrier& startBarrier;
util::Barrier& stopBarrier;
std::atomic_size_t count;
};
class BasicWorkArgs
{
public:
std::atomic_size_t count;
};
void
simpleFunction(PoolArgs& args)
{
LockGuard lock(args.mutex);
++args.count;
++args.startSignal;
args.start.notify_one();
args.stop.wait(lock, [&]() { return args.stopSignal; });
}
void
incrementFunction(PoolArgs& args)
{
LockGuard lock(args.mutex);
++args.count;
++args.startSignal;
args.start.notify_one();
}
void
barrierFunction(BarrierArgs& args)
{
args.startBarrier.Block();
args.count++;
args.stopBarrier.Block();
}
void
basicWork(BasicWorkArgs& args)
{
args.count++;
}
void
recurse(util::Barrier& barrier, std::atomic_size_t& counter, ThreadPool& pool,
size_t depthLimit)
{
ASSERT_LE(0u, counter);
ASSERT_GT(depthLimit, counter);
if(++counter != depthLimit)
{
ASSERT_TRUE(
pool.addJob(std::bind(recurse, std::ref(barrier), std::ref(counter),
std::ref(pool), depthLimit)));
}
barrier.Block();
}
class DestructiveObject
{
private:
util::Barrier& barrier;
ThreadPool& pool;
public:
DestructiveObject(util::Barrier& b, ThreadPool& p) : barrier(b), pool(p)
{
}
~DestructiveObject()
{
auto job = std::bind(&util::Barrier::Block, &barrier);
pool.addJob(job);
}
};
void
destructiveJob(DestructiveObject* obj)
{
delete obj;
}
TEST(TestThreadPool, breathing)
{
static constexpr size_t threads = 10;
static constexpr size_t capacity = 50;
ThreadPool pool(threads, capacity, "breathing");
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(capacity, pool.capacity());
ASSERT_EQ(0u, pool.jobCount());
ASSERT_TRUE(pool.start());
ASSERT_EQ(threads, pool.startedThreadCount());
ASSERT_EQ(capacity, pool.capacity());
ASSERT_EQ(0u, pool.jobCount());
pool.drain();
}
struct AccessorsData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, AccessorsData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class Accessors : public ::testing::TestWithParam< AccessorsData >
{
};
TEST_P(Accessors, accessors)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity, "accessors");
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
}
static const AccessorsData accessorsData[] = {
{10, 50}, {1, 1}, {50, 100}, {2, 22}, {100, 200}};
INSTANTIATE_TEST_SUITE_P(TestThreadPool, Accessors,
::testing::ValuesIn(accessorsData));
struct ClosingData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, ClosingData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class Closing : public ::testing::TestWithParam< ClosingData >
{
};
TEST_P(Closing, drain)
{
auto d = GetParam();
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ThreadPool pool(d.threads, d.capacity, "drain");
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.threads; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
start.wait(lock, [&]() { return args.startSignal; });
}
args.stopSignal++;
lock.unlock();
stop.notify_all();
pool.drain();
ASSERT_EQ(d.threads, pool.startedThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
TEST_P(Closing, stop)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity, "stop");
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.capacity; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
while(i < d.threads && !args.startSignal)
{
start.wait(lock);
}
}
args.stopSignal++;
lock.unlock();
stop.notify_all();
pool.stop();
ASSERT_EQ(d.capacity, args.count);
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(0u, pool.activeThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
TEST_P(Closing, shutdown)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity, "shutdown");
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.capacity; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
while(i < d.threads && !args.startSignal)
{
start.wait(lock);
}
}
ASSERT_EQ(d.threads, pool.startedThreadCount());
ASSERT_EQ(d.capacity - d.threads, pool.jobCount());
auto incrementJob = std::bind(incrementFunction, std::ref(args));
for(size_t i = 0; i < d.threads; ++i)
{
ASSERT_TRUE(pool.addJob(incrementJob));
}
args.stopSignal++;
stop.notify_all();
lock.unlock();
pool.shutdown();
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(0u, pool.activeThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
ClosingData closingData[] = {{1, 1}, {2, 2}, {10, 10},
{10, 50}, {50, 75}, {25, 80}};
INSTANTIATE_TEST_SUITE_P(TestThreadPool, Closing,
::testing::ValuesIn(closingData));
struct TryAddData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, TryAddData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class TryAdd : public ::testing::TestWithParam< TryAddData >
{
};
TEST_P(TryAdd, noblocking)
{
// Verify that tryAdd does not block.
// Fill the queue, then verify `tryAddJob` does not block.
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity, "noblocking");
util::Barrier startBarrier(d.threads + 1);
util::Barrier stopBarrier(d.threads + 1);
BarrierArgs args{startBarrier, stopBarrier, {0}};
auto simpleJob = std::bind(barrierFunction, std::ref(args));
ASSERT_FALSE(pool.tryAddJob(simpleJob));
ASSERT_TRUE(pool.start());
for(size_t i = 0; i < d.threads; ++i)
{
ASSERT_TRUE(pool.tryAddJob(simpleJob));
}
// Wait for everything to start.
startBarrier.Block();
// and that we emptied the queue.
ASSERT_EQ(0u, pool.jobCount());
BasicWorkArgs basicWorkArgs = {{0}};
auto workJob = std::bind(basicWork, std::ref(basicWorkArgs));
for(size_t i = 0; i < d.capacity; ++i)
{
ASSERT_TRUE(pool.tryAddJob(workJob));
}
// queue should now be full
ASSERT_FALSE(pool.tryAddJob(workJob));
// and finish
stopBarrier.Block();
}
TEST(TestThreadPool, recurseJob)
{
// Verify we can enqueue a job onto the threadpool from a thread which is
// currently executing a threadpool job.
static constexpr size_t threads = 10;
static constexpr size_t depth = 10;
static constexpr size_t capacity = 100;
util::Barrier barrier(threads + 1);
std::atomic_size_t counter{0};
ThreadPool pool(threads, capacity, "recurse");
pool.start();
ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier),
std::ref(counter), std::ref(pool), depth)));
barrier.Block();
ASSERT_EQ(depth, counter);
}
TEST(TestThreadPool, destructors)
{
// Verify that functors have their destructors called outside of threadpool
// locks.
static constexpr size_t threads = 1;
static constexpr size_t capacity = 100;
ThreadPool pool(threads, capacity, "destructors");
pool.start();
util::Barrier barrier(threads + 1);
{
DestructiveObject* obj = new DestructiveObject(barrier, pool);
ASSERT_TRUE(pool.addJob(std::bind(destructiveJob, obj)));
}
barrier.Block();
}
Loading…
Cancel
Save