Merge Logic functions into EventLoop

loop->call(...) is similar to the old logic->Call(...), but is smart
about the current thread: if called from within the event loop it simply
runs the argument directly, otherwise it queues it.

Similarly most of the other event loop calls are also now thread-aware:
for example, `call_later(...)` can queue the job directly when called if
in the event loop rather than having to double-queue through the even
loop (once to call, then inside the call to initiate the time).
pull/1557/head
Jason Rhinelander 3 years ago
parent 5b555ee5aa
commit ccc7b5c9e9

@ -6,7 +6,6 @@
#include <util/logging/logger.hpp> #include <util/logging/logger.hpp>
#include <util/logging/ostream_logger.hpp> #include <util/logging/ostream_logger.hpp>
#include <util/str.hpp> #include <util/str.hpp>
#include <util/thread/logic.hpp>
#ifdef _WIN32 #ifdef _WIN32
#include <dbghelp.h> #include <dbghelp.h>
@ -69,7 +68,7 @@ void
handle_signal(int sig) handle_signal(int sig)
{ {
if (ctx) if (ctx)
LogicCall(ctx->logic, std::bind(&llarp::Context::HandleSignal, ctx.get(), sig)); ctx->loop->call([sig] { ctx->HandleSignal(sig); });
else else
std::cerr << "Received signal " << sig << ", but have no context yet. Ignoring!" << std::endl; std::cerr << "Received signal " << sig << ", but have no context yet. Ignoring!" << std::endl;
} }

@ -16,7 +16,7 @@ namespace llarp
class Platform; class Platform;
} }
class Logic; class EventLoop;
struct Config; struct Config;
struct RouterContact; struct RouterContact;
struct Config; struct Config;
@ -43,9 +43,8 @@ namespace llarp
std::shared_ptr<Crypto> crypto = nullptr; std::shared_ptr<Crypto> crypto = nullptr;
std::shared_ptr<CryptoManager> cryptoManager = nullptr; std::shared_ptr<CryptoManager> cryptoManager = nullptr;
std::shared_ptr<AbstractRouter> router = nullptr; std::shared_ptr<AbstractRouter> router = nullptr;
std::shared_ptr<Logic> logic = nullptr; std::shared_ptr<EventLoop> loop = nullptr;
std::shared_ptr<NodeDB> nodedb = nullptr; std::shared_ptr<NodeDB> nodedb = nullptr;
std::shared_ptr<EventLoop> mainloop;
std::string nodedb_dir; std::string nodedb_dir;
virtual ~Context() = default; virtual ~Context() = default;
@ -93,7 +92,7 @@ namespace llarp
/// Creates a router. Can be overridden to allow a different class of router /// Creates a router. Can be overridden to allow a different class of router
/// to be created instead. Defaults to llarp::Router. /// to be created instead. Defaults to llarp::Router.
virtual std::shared_ptr<AbstractRouter> virtual std::shared_ptr<AbstractRouter>
makeRouter(std::shared_ptr<EventLoop> __netloop, std::shared_ptr<Logic> logic); makeRouter(const std::shared_ptr<EventLoop>& loop);
/// create the vpn platform for use in creating network interfaces /// create the vpn platform for use in creating network interfaces
virtual std::shared_ptr<llarp::vpn::Platform> virtual std::shared_ptr<llarp::vpn::Platform>

@ -19,7 +19,6 @@ add_library(lokinet-util
util/mem.cpp util/mem.cpp
util/printer.cpp util/printer.cpp
util/str.cpp util/str.cpp
util/thread/logic.cpp
util/thread/queue_manager.cpp util/thread/queue_manager.cpp
util/thread/threading.cpp util/thread/threading.cpp
util/time.cpp util/time.cpp

@ -7,6 +7,7 @@
#include <dht/context.hpp> #include <dht/context.hpp>
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include <ev/vpnio.hpp> #include <ev/vpnio.hpp>
#include <memory>
#include <nodedb.hpp> #include <nodedb.hpp>
#include <router/router.hpp> #include <router/router.hpp>
#include <service/context.hpp> #include <service/context.hpp>
@ -25,7 +26,10 @@ namespace llarp
bool bool
Context::CallSafe(std::function<void(void)> f) Context::CallSafe(std::function<void(void)> f)
{ {
return logic && LogicCall(logic, f); if (!loop)
return false;
loop->call(std::move(f));
return true;
} }
void void
@ -36,8 +40,6 @@ namespace llarp
config = std::move(conf); config = std::move(conf);
logic = std::make_shared<Logic>();
nodedb_dir = fs::path{config->router.m_dataDir / nodedb_dirname}.string(); nodedb_dir = fs::path{config->router.m_dataDir / nodedb_dirname}.string();
} }
@ -62,19 +64,16 @@ namespace llarp
llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO); llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO);
llarp::LogInfo("starting up"); llarp::LogInfo("starting up");
if (mainloop == nullptr) if (!loop)
{ {
auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize); auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize);
mainloop = EventLoop::create(jobQueueSize); loop = EventLoop::create(jobQueueSize);
} }
logic->set_event_loop(mainloop.get());
mainloop->set_logic(logic);
crypto = std::make_shared<sodium::CryptoLibSodium>(); crypto = std::make_shared<sodium::CryptoLibSodium>();
cryptoManager = std::make_shared<CryptoManager>(crypto.get()); cryptoManager = std::make_shared<CryptoManager>(crypto.get());
router = makeRouter(mainloop, logic); router = makeRouter(loop);
nodedb = std::make_shared<NodeDB>( nodedb = std::make_shared<NodeDB>(
nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); });
@ -84,9 +83,10 @@ namespace llarp
} }
std::shared_ptr<AbstractRouter> std::shared_ptr<AbstractRouter>
Context::makeRouter(std::shared_ptr<EventLoop> netloop, std::shared_ptr<Logic> logic) Context::makeRouter(const EventLoop_ptr& loop)
{ {
return std::make_shared<Router>(netloop, logic, makeVPNPlatform()); return std::static_pointer_cast<AbstractRouter>(
std::make_shared<Router>(loop, makeVPNPlatform()));
} }
std::shared_ptr<vpn::Platform> std::shared_ptr<vpn::Platform>
@ -117,7 +117,7 @@ namespace llarp
// run net io thread // run net io thread
llarp::LogInfo("running mainloop"); llarp::LogInfo("running mainloop");
mainloop->run(*logic); loop->run();
if (closeWaiter) if (closeWaiter)
{ {
closeWaiter->set_value(); closeWaiter->set_value();
@ -188,8 +188,8 @@ namespace llarp
llarp::LogDebug("free router"); llarp::LogDebug("free router");
router.reset(); router.reset();
llarp::LogDebug("free logic"); llarp::LogDebug("free loop");
logic.reset(); loop.reset();
} }
#if defined(ANDROID) #if defined(ANDROID)

@ -17,7 +17,6 @@
#include <path/path_context.hpp> #include <path/path_context.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <routing/dht_message.hpp> #include <routing/dht_message.hpp>
#include <util/thread/logic.hpp>
#include <nodedb.hpp> #include <nodedb.hpp>
#include <profiling.hpp> #include <profiling.hpp>
#include <router/i_rc_lookup_handler.hpp> #include <router/i_rc_lookup_handler.hpp>
@ -161,7 +160,7 @@ namespace llarp
GetIntroSetByLocation(const Key_t& location) const override; GetIntroSetByLocation(const Key_t& location) const override;
void void
handle_cleaner_timer(uint64_t interval); handle_cleaner_timer();
/// explore dht for new routers /// explore dht for new routers
void void
@ -202,15 +201,13 @@ namespace llarp
void void
PutRCNodeAsync(const RCNode& val) override PutRCNodeAsync(const RCNode& val) override
{ {
auto func = std::bind(&Bucket<RCNode>::PutNode, Nodes(), val); router->loop()->call([nodes = Nodes(), val] { nodes->PutNode(val); });
LogicCall(router->logic(), func);
} }
void void
DelRCNodeAsync(const Key_t& val) override DelRCNodeAsync(const Key_t& val) override
{ {
auto func = std::bind(&Bucket<RCNode>::DelNode, Nodes(), val); router->loop()->call([nodes = Nodes(), val] { nodes->DelNode(val); });
LogicCall(router->logic(), func);
} }
const Key_t& const Key_t&
@ -289,8 +286,7 @@ namespace llarp
ExploreNetworkVia(const Key_t& peer) override; ExploreNetworkVia(const Key_t& peer) override;
private: private:
void std::shared_ptr<int> _timer_keepalive;
ScheduleCleanupTimer();
void void
CleanupTX(); CleanupTX();
@ -333,7 +329,7 @@ namespace llarp
} }
void void
Context::handle_cleaner_timer(__attribute__((unused)) uint64_t interval) Context::handle_cleaner_timer()
{ {
// clean up transactions // clean up transactions
CleanupTX(); CleanupTX();
@ -354,7 +350,6 @@ namespace llarp
++itr; ++itr;
} }
} }
ScheduleCleanupTimer();
} }
void void
@ -458,14 +453,8 @@ namespace llarp
_services = std::make_unique<Bucket<ISNode>>(ourKey, llarp::randint); _services = std::make_unique<Bucket<ISNode>>(ourKey, llarp::randint);
llarp::LogDebug("initialize dht with key ", ourKey); llarp::LogDebug("initialize dht with key ", ourKey);
// start cleanup timer // start cleanup timer
ScheduleCleanupTimer(); _timer_keepalive = std::make_shared<int>(0);
} router->loop()->call_every(1s, _timer_keepalive, [this] { handle_cleaner_timer(); });
void
Context::ScheduleCleanupTimer()
{
router->logic()->call_later(
1s, std::bind(&llarp::dht::Context::handle_cleaner_timer, this, 1000));
} }
void void

@ -1,19 +1,18 @@
#include <dns/server.hpp> #include <dns/server.hpp>
#include <dns/dns.hpp> #include <dns/dns.hpp>
#include <crypto/crypto.hpp> #include <crypto/crypto.hpp>
#include <util/thread/logic.hpp>
#include <array> #include <array>
#include <utility> #include <utility>
#include <ev/udp_handle.hpp> #include <ev/udp_handle.hpp>
namespace llarp::dns namespace llarp::dns
{ {
PacketHandler::PacketHandler(Logic_ptr logic, IQueryHandler* h) PacketHandler::PacketHandler(EventLoop_ptr loop, IQueryHandler* h)
: m_QueryHandler{h}, m_Logic{std::move(logic)} : m_QueryHandler{h}, m_Loop{std::move(loop)}
{} {}
Proxy::Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* h) Proxy::Proxy(EventLoop_ptr loop, IQueryHandler* h)
: PacketHandler{logic, h}, m_Loop(std::move(loop)) : PacketHandler{loop, h}, m_Loop(std::move(loop))
{ {
m_Server = m_Server =
m_Loop->udp([this](UDPHandle&, SockAddr a, OwnedBuffer buf) { HandlePacket(a, a, buf); }); m_Loop->udp([this](UDPHandle&, SockAddr a, OwnedBuffer buf) { HandlePacket(a, a, buf); });
@ -65,7 +64,7 @@ namespace llarp::dns
}; };
m_UnboundResolver = m_UnboundResolver =
std::make_shared<UnboundResolver>(m_Logic, std::move(replyFunc), std::move(failFunc)); std::make_shared<UnboundResolver>(m_Loop, std::move(replyFunc), std::move(failFunc));
if (not m_UnboundResolver->Init()) if (not m_UnboundResolver->Init())
{ {
llarp::LogError("Failed to initialize upstream DNS resolver."); llarp::LogError("Failed to initialize upstream DNS resolver.");

@ -4,7 +4,6 @@
#include <dns/message.hpp> #include <dns/message.hpp>
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include <net/net.hpp> #include <net/net.hpp>
#include <util/thread/logic.hpp>
#include <dns/unbound_resolver.hpp> #include <dns/unbound_resolver.hpp>
#include <unordered_map> #include <unordered_map>
@ -32,9 +31,7 @@ namespace llarp
class PacketHandler : public std::enable_shared_from_this<PacketHandler> class PacketHandler : public std::enable_shared_from_this<PacketHandler>
{ {
public: public:
using Logic_ptr = std::shared_ptr<Logic>; explicit PacketHandler(EventLoop_ptr loop, IQueryHandler* handler);
explicit PacketHandler(Logic_ptr logic, IQueryHandler* handler);
virtual ~PacketHandler() = default; virtual ~PacketHandler() = default;
@ -67,15 +64,14 @@ namespace llarp
IQueryHandler* const m_QueryHandler; IQueryHandler* const m_QueryHandler;
std::set<IpAddress> m_Resolvers; std::set<IpAddress> m_Resolvers;
std::shared_ptr<UnboundResolver> m_UnboundResolver; std::shared_ptr<UnboundResolver> m_UnboundResolver;
Logic_ptr m_Logic; EventLoop_ptr m_Loop;
}; };
// Proxying DNS handler that listens on a UDP port for proper DNS requests. // Proxying DNS handler that listens on a UDP port for proper DNS requests.
class Proxy : public PacketHandler class Proxy : public PacketHandler
{ {
public: public:
using Logic_ptr = std::shared_ptr<Logic>; explicit Proxy(EventLoop_ptr loop, IQueryHandler* handler);
explicit Proxy(EventLoop_ptr loop, Logic_ptr logic, IQueryHandler* handler);
bool bool
Start(SockAddr localaddr, std::vector<IpAddress> resolvers) override; Start(SockAddr localaddr, std::vector<IpAddress> resolvers) override;

@ -36,11 +36,11 @@ namespace llarp::dns
} }
UnboundResolver::UnboundResolver( UnboundResolver::UnboundResolver(
std::shared_ptr<Logic> logic, ReplyFunction reply, FailFunction fail) EventLoop_ptr loop, ReplyFunction reply, FailFunction fail)
: unboundContext(nullptr) : unboundContext(nullptr)
, started(false) , started(false)
, replyFunc(logic->make_caller(std::move(reply))) , replyFunc(loop->make_caller(std::move(reply)))
, failFunc(logic->make_caller(std::move(fail))) , failFunc(loop->make_caller(std::move(fail)))
{} {}
// static callback // static callback

@ -7,7 +7,6 @@
#include <queue> #include <queue>
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include <util/thread/logic.hpp>
#include <dns/message.hpp> #include <dns/message.hpp>
@ -37,7 +36,7 @@ namespace llarp::dns
Reset(); Reset();
public: public:
UnboundResolver(std::shared_ptr<Logic> logic, ReplyFunction replyFunc, FailFunction failFunc); UnboundResolver(EventLoop_ptr loop, ReplyFunction replyFunc, FailFunction failFunc);
static void static void
Callback(void* data, int err, ub_result* result); Callback(void* data, int err, ub_result* result);

@ -1,7 +1,6 @@
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include <util/mem.hpp> #include <util/mem.hpp>
#include <util/str.hpp> #include <util/str.hpp>
#include <util/thread/logic.hpp>
#include <cstddef> #include <cstddef>
#include <cstring> #include <cstring>
@ -17,13 +16,4 @@ namespace llarp
{ {
return std::make_shared<llarp::uv::Loop>(queueLength); return std::make_shared<llarp::uv::Loop>(queueLength);
} }
void
EventLoop::run(Logic& logic)
{
run_loop();
logic.clear_event_loop();
stopped();
}
} // namespace llarp } // namespace llarp

@ -52,7 +52,6 @@ struct llarp_ev_pkt_pipe;
namespace llarp namespace llarp
{ {
class Logic;
struct SockAddr; struct SockAddr;
struct UDPHandle; struct UDPHandle;
@ -99,17 +98,13 @@ namespace llarp
// this (nearly!) abstract base class // this (nearly!) abstract base class
// is overriden for each platform // is overriden for each platform
struct EventLoop class EventLoop
// : std::enable_shared_from_this<EventLoop> // FIXME: do I actually need shared_from_this()?
{ {
public:
// Runs the event loop. This does not return until sometime after `stop()` is called (and so // Runs the event loop. This does not return until sometime after `stop()` is called (and so
// typically you want to run this in its own thread). // typically you want to run this in its own thread).
void
run(Logic& logic);
// Actually runs the underlying implementation event loop; called by run().
virtual void virtual void
run_loop() = 0; run() = 0;
virtual bool virtual bool
running() const = 0; running() const = 0;
@ -120,14 +115,89 @@ namespace llarp
return llarp::time_now_ms(); return llarp::time_now_ms();
} }
// Calls a function/lambda/etc. If invoked from within the event loop itself this calls the
// given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the
// next event loop iteration.
template <typename Callable> void call(Callable&& f) {
if (inEventLoop())
f();
else
call_soon(std::forward<Callable>(f));
}
// Queues a function to be called on the next event loop cycle and triggers it to be called as
// soon as possible; can be called from any thread. Note that, unlike `call()`, this queues the
// job even if called from the event loop thread itself and so you *usually* want to use
// `call()` instead.
virtual void virtual void
stopped() call_soon(std::function<void(void)> f) = 0;
{}
// Adds a timer to the event loop; should only be called from the logic thread (and so is // Adds a timer to the event loop to invoke the given callback after a delay.
// typically scheduled via a call to Logic::call_later()).
virtual void virtual void
call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback) = 0; call_later(llarp_time_t delay_ms, std::function<void(void)> callback) = 0;
// Created a repeated timer that fires ever `repeat` time unit. Lifetime of the event
// is tied to `owner`: callbacks will be invoked so long as `owner` remains alive, but
// the first time it repeats after `owner` has been destroyed the internal timer object will
// be destroyed and no more callbacks will be invoked.
//
// Intended to be used as:
//
// loop->call_every(100ms, weak_from_this(), [this] { some_func(); });
//
// Alternative, when shared_from_this isn't available for a type, you can use a local member
// shared_ptr (or even create a simple one, for more fine-grained control) to tie the lifetime:
//
// m_keepalive = std::make_shared<int>(42);
// loop->call_every(100ms, m_keepalive, [this] { some_func(); });
//
template <typename Callable> // Templated so that the compiler can inline the call
void
call_every(llarp_time_t repeat, std::weak_ptr<void> owner, Callable f)
{
auto repeater = make_repeater();
auto& r = *repeater; // reference *before* we pass ownership into the lambda below
r.start(
repeat,
[repeater = std::move(repeater), owner = std::move(owner), f = std::move(f)]() mutable {
if (auto ptr = owner.lock())
f();
else
repeater.reset(); // Trigger timer removal on tied object destruction (we should be the only thing holding
// the repeater; ideally it would be a unique_ptr, but
// std::function says nuh-uh).
});
}
// Wraps a lambda with a lambda that triggers it to be called via loop->call()
// when invoked. E.g.:
//
// auto x = loop->make_caller([] (int a) { std::cerr << a; });
// x(42);
// x(99);
//
// will schedule two calls of the inner lambda (with different arguments) in the event loop.
// Arguments are forwarded to the inner lambda (allowing moving arguments into it).
template <typename Callable>
auto
make_caller(Callable f)
{
return [this, f = std::move(f)](auto&&... args) {
if (inEventLoop())
return f(std::forward<decltype(args)>(args)...);
// This shared pointer in a pain in the ass but needed because this lambda is going into a
// std::function that only accepts copyable lambdas. I *want* to simply capture:
// args=std::make_tuple(std::forward<decltype(args)>(args)...) but that fails if any given
// arguments aren't copyable (because of std::function). Dammit.
auto args_tuple_ptr = std::make_shared<std::tuple<std::decay_t<decltype(args)>...>>(
std::forward<decltype(args)>(args)...);
call_soon([f, args = std::move(args_tuple_ptr)]() mutable {
// Moving away the tuple args here is okay because this lambda will only be invoked once
std::apply(f, std::move(*args));
});
};
}
virtual bool virtual bool
add_network_interface( add_network_interface(
@ -146,15 +216,8 @@ namespace llarp
virtual std::shared_ptr<UDPHandle> virtual std::shared_ptr<UDPHandle>
udp(UDPReceiveFunc on_recv) = 0; udp(UDPReceiveFunc on_recv) = 0;
/// give this event loop a logic thread for calling
virtual void
set_logic(const std::shared_ptr<llarp::Logic>& logic) = 0;
virtual ~EventLoop() = default; virtual ~EventLoop() = default;
virtual void
call_soon(std::function<void(void)> f) = 0;
/// set the function that is called once per cycle the flush all the queues /// set the function that is called once per cycle the flush all the queues
virtual void virtual void
set_pump_function(std::function<void(void)> pumpll) = 0; set_pump_function(std::function<void(void)> pumpll) = 0;
@ -167,7 +230,7 @@ namespace llarp
make_waker(std::function<void()> callback) = 0; make_waker(std::function<void()> callback) = 0;
// Initializes a new repeated task object. Note that the task is not actually added to the event // Initializes a new repeated task object. Note that the task is not actually added to the event
// loop until you call start() on the returned object. Typically invoked via Logic::call_every. // loop until you call start() on the returned object. Typically invoked via call_every.
virtual std::shared_ptr<EventLoopRepeater> virtual std::shared_ptr<EventLoopRepeater>
make_repeater() = 0; make_repeater() = 0;
@ -177,7 +240,7 @@ namespace llarp
// Returns true if called from within the event loop thread, false otherwise. // Returns true if called from within the event loop thread, false otherwise.
virtual bool virtual bool
inEventLoopThread() const = 0; inEventLoop() const = 0;
}; };
using EventLoop_ptr = std::shared_ptr<EventLoop>; using EventLoop_ptr = std::shared_ptr<EventLoop>;

@ -3,7 +3,6 @@
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <type_traits> #include <type_traits>
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp> #include <util/thread/queue.hpp>
#include <cstring> #include <cstring>
@ -130,11 +129,14 @@ namespace llarp::uv
} }
void void
Loop::run_loop() Loop::run()
{ {
llarp::LogTrace("Loop::run_loop()"); llarp::LogTrace("Loop::run_loop()");
m_EventLoopThreadID = std::this_thread::get_id(); m_EventLoopThreadID = std::this_thread::get_id();
m_Impl->run(); m_Impl->run();
m_Impl->close();
m_Impl.reset();
llarp::LogInfo("we have stopped");
} }
void void
@ -150,23 +152,37 @@ namespace llarp::uv
std::make_shared<llarp::uv::UDPHandle>(*m_Impl, std::move(on_recv))); std::make_shared<llarp::uv::UDPHandle>(*m_Impl, std::move(on_recv)));
} }
// TODO: replace this one-shot timer mechanism with a repeated timer, because most likely static void setup_oneshot_timer(uvw::Loop& loop, llarp_time_t delay, std::function<void()> callback) {
// everything using this is repeating scheduling itself all the time and would be better served by auto timer = loop.resource<uvw::TimerHandle>();
// a repeating uv_timer. timer->on<uvw::TimerEvent>([f = std::move(callback)](const auto&, auto& timer) {
f();
timer.stop();
timer.close();
});
timer->start(delay, 0ms);
}
void void
Loop::call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback) Loop::call_later(llarp_time_t delay_ms, std::function<void(void)> callback)
{ {
llarp::LogTrace("Loop::call_after_delay()"); llarp::LogTrace("Loop::call_after_delay()");
#ifdef TESTNET_SPEED #ifdef TESTNET_SPEED
delay_ms *= TESTNET_SPEED; delay_ms *= TESTNET_SPEED;
#endif #endif
auto timer = m_Impl->resource<uvw::TimerHandle>();
timer->on<uvw::TimerEvent>([f = std::move(callback)](const auto&, auto& timer) { if (inEventLoop())
f(); setup_oneshot_timer(*m_Impl, delay_ms, std::move(callback));
timer.stop(); else
timer.close(); {
}); call_soon([this, f = std::move(callback), target_time = time_now() + delay_ms] {
timer->start(delay_ms, 0ms); // Recalculate delay because it may have taken some time to get ourselves into the logic thread
auto updated_delay = target_time - time_now();
if (updated_delay <= 0ms)
f(); // Timer already expired!
else
setup_oneshot_timer(*m_Impl, updated_delay, std::move(f));
});
}
} }
void void
@ -174,6 +190,9 @@ namespace llarp::uv
{ {
if (m_Run) if (m_Run)
{ {
if (not inEventLoop())
return call_soon([this] { stop(); });
llarp::LogInfo("stopping event loop"); llarp::LogInfo("stopping event loop");
m_Impl->walk([](auto&& handle) { m_Impl->walk([](auto&& handle) {
if constexpr (!std::is_pointer_v<std::remove_reference_t<decltype(handle)>>) if constexpr (!std::is_pointer_v<std::remove_reference_t<decltype(handle)>>)
@ -181,19 +200,9 @@ namespace llarp::uv
}); });
llarp::LogDebug("Closed all handles, stopping the loop"); llarp::LogDebug("Closed all handles, stopping the loop");
m_Impl->stop(); m_Impl->stop();
}
m_Run.store(false);
}
void m_Run.store(false);
Loop::stopped()
{
if (m_Impl)
{
m_Impl->close();
m_Impl.reset();
} }
llarp::LogInfo("we have stopped");
} }
bool bool
@ -248,9 +257,8 @@ namespace llarp::uv
m_WakeUp->send(); m_WakeUp->send();
return; return;
} }
const bool inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id();
if (inEventLoop and m_LogicCalls.full()) if (inEventLoop() and m_LogicCalls.full())
{ {
FlushLogic(); FlushLogic();
} }
@ -333,7 +341,7 @@ namespace llarp::uv
} }
bool bool
Loop::inEventLoopThread() const Loop::inEventLoop() const
{ {
return m_EventLoopThreadID and *m_EventLoopThreadID == std::this_thread::get_id(); return m_EventLoopThreadID and *m_EventLoopThreadID == std::this_thread::get_id();
} }

@ -2,7 +2,6 @@
#define LLARP_EV_LIBUV_HPP #define LLARP_EV_LIBUV_HPP
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include "udp_handle.hpp" #include "udp_handle.hpp"
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp> #include <util/thread/queue.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
@ -20,20 +19,21 @@ namespace llarp::uv
class UVWakeup; class UVWakeup;
class UVRepeater; class UVRepeater;
struct Loop final : public llarp::EventLoop class Loop final : public llarp::EventLoop
{ {
public:
using Callback = std::function<void()>; using Callback = std::function<void()>;
Loop(size_t queue_size); Loop(size_t queue_size);
void void
run_loop() override; run() override;
bool bool
running() const override; running() const override;
void void
call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback) override; call_later(llarp_time_t delay_ms, std::function<void(void)> callback) override;
void void
tick_event_loop(); tick_event_loop();
@ -41,9 +41,6 @@ namespace llarp::uv
void void
stop() override; stop() override;
void
stopped() override;
bool bool
add_ticker(std::function<void(void)> ticker) override; add_ticker(std::function<void(void)> ticker) override;
@ -52,12 +49,6 @@ namespace llarp::uv
std::shared_ptr<llarp::vpn::NetworkInterface> netif, std::shared_ptr<llarp::vpn::NetworkInterface> netif,
std::function<void(llarp::net::IPPacket)> handler) override; std::function<void(llarp::net::IPPacket)> handler) override;
void
set_logic(const std::shared_ptr<llarp::Logic>& l) override
{
l->SetQueuer([this](std::function<void()> f) { call_soon(std::move(f)); });
}
void void
call_soon(std::function<void(void)> f) override; call_soon(std::function<void(void)> f) override;
@ -79,7 +70,7 @@ namespace llarp::uv
std::function<void(void)> PumpLL; std::function<void(void)> PumpLL;
bool bool
inEventLoopThread() const override; inEventLoop() const override;
private: private:
std::shared_ptr<uvw::Loop> m_Impl; std::shared_ptr<uvw::Loop> m_Impl;

@ -1,7 +1,6 @@
#include <ev/vpnio.hpp> #include <ev/vpnio.hpp>
#include <llarp.hpp> #include <llarp.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <util/thread/logic.hpp>
void void
llarp_vpn_io_impl::AsyncClose() llarp_vpn_io_impl::AsyncClose()

@ -13,12 +13,12 @@ namespace llarp
{ {
namespace handlers namespace handlers
{ {
ExitEndpoint::ExitEndpoint(const std::string& name, AbstractRouter* r) ExitEndpoint::ExitEndpoint(std::string name, AbstractRouter* r)
: m_Router(r) : m_Router(r)
, m_Resolver(std::make_shared<dns::Proxy>(r->netloop(), r->logic(), this)) , m_Resolver(std::make_shared<dns::Proxy>(r->loop(), this))
, m_Name(name) , m_Name(std::move(name))
, m_LocalResolverAddr("127.0.0.1", 53) , m_LocalResolverAddr("127.0.0.1", 53)
, m_InetToNetwork(name + "_exit_rx", r->netloop(), r->netloop()) , m_InetToNetwork(name + "_exit_rx", r->loop(), r->loop())
{ {
m_ShouldInitTun = true; m_ShouldInitTun = true;
@ -312,15 +312,14 @@ namespace llarp
llarp::LogError("Could not create interface"); llarp::LogError("Could not create interface");
return false; return false;
} }
auto loop = GetRouter()->netloop(); if (not GetRouter()->loop()->add_network_interface(
if (not loop->add_network_interface( m_NetIf, [this](net::IPPacket pkt) { OnInetPacket(std::move(pkt)); }))
m_NetIf, [&](net::IPPacket pkt) { OnInetPacket(std::move(pkt)); }))
{ {
llarp::LogWarn("Could not create tunnel for exit endpoint"); llarp::LogWarn("Could not create tunnel for exit endpoint");
return false; return false;
} }
loop->add_ticker([&]() { Flush(); }); GetRouter()->loop()->add_ticker([this] { Flush(); });
llarp::LogInfo("Trying to start resolver ", m_LocalResolverAddr.toString()); llarp::LogInfo("Trying to start resolver ", m_LocalResolverAddr.toString());
return m_Resolver->Start(m_LocalResolverAddr.createSockAddr(), m_UpstreamResolvers); return m_Resolver->Start(m_LocalResolverAddr.createSockAddr(), m_UpstreamResolvers);

@ -13,7 +13,7 @@ namespace llarp
{ {
struct ExitEndpoint : public dns::IQueryHandler struct ExitEndpoint : public dns::IQueryHandler
{ {
ExitEndpoint(const std::string& name, AbstractRouter* r); ExitEndpoint(std::string name, AbstractRouter* r);
~ExitEndpoint() override; ~ExitEndpoint() override;
void void

@ -18,7 +18,6 @@
#include <service/outbound_context.hpp> #include <service/outbound_context.hpp>
#include <service/name.hpp> #include <service/name.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <nodedb.hpp> #include <nodedb.hpp>
#include <rpc/endpoint_rpc.hpp> #include <rpc/endpoint_rpc.hpp>
@ -48,7 +47,7 @@ namespace llarp
TunEndpoint* const m_Endpoint; TunEndpoint* const m_Endpoint;
explicit DnsInterceptor(AbstractRouter* router, TunEndpoint* ep) explicit DnsInterceptor(AbstractRouter* router, TunEndpoint* ep)
: dns::PacketHandler{router->logic(), ep}, m_Endpoint{ep} {}; : dns::PacketHandler{router->loop(), ep}, m_Endpoint{ep} {};
void void
SendServerMessageBufferTo( SendServerMessageBufferTo(
@ -97,7 +96,7 @@ namespace llarp
TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent) TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent)
: service::Endpoint(r, parent) : service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->netloop(), r->netloop()) , m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop())
{ {
m_PacketRouter.reset( m_PacketRouter.reset(
new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }}); new vpn::PacketRouter{[&](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }});
@ -121,7 +120,7 @@ namespace llarp
HandleGotUserPacket(std::move(pkt)); HandleGotUserPacket(std::move(pkt));
}); });
#else #else
m_Resolver = std::make_shared<dns::Proxy>(r->netloop(), r->logic(), this); m_Resolver = std::make_shared<dns::Proxy>(r->loop(), this);
#endif #endif
} }
@ -838,9 +837,8 @@ namespace llarp
m_IfName = m_NetIf->IfName(); m_IfName = m_NetIf->IfName();
LogInfo(Name(), " got network interface ", m_IfName); LogInfo(Name(), " got network interface ", m_IfName);
auto netloop = Router()->netloop(); if (not Router()->loop()->add_network_interface(
if (not netloop->add_network_interface( m_NetIf, [this](net::IPPacket pkt) { m_PacketRouter->HandleIPPacket(std::move(pkt)); }))
m_NetIf, [&](net::IPPacket pkt) { m_PacketRouter->HandleIPPacket(std::move(pkt)); }))
{ {
LogError(Name(), " failed to add network interface"); LogError(Name(), " failed to add network interface");
return false; return false;
@ -853,7 +851,7 @@ namespace llarp
LogInfo(Name(), " has ipv6 address ", m_OurIPv6); LogInfo(Name(), " has ipv6 address ", m_OurIPv6);
} }
netloop->add_ticker([&]() { Flush(); }); Router()->loop()->add_ticker([this] { Flush(); });
if (m_OnUp) if (m_OnUp)
{ {

@ -2,7 +2,6 @@
#define LLARP_I_LINK_MANAGER_HPP #define LLARP_I_LINK_MANAGER_HPP
#include <link/server.hpp> #include <link/server.hpp>
#include <util/thread/logic.hpp>
#include <util/types.hpp> #include <util/types.hpp>
#include <peerstats/peer_db.hpp> #include <peerstats/peer_db.hpp>
@ -13,8 +12,6 @@ struct llarp_buffer_t;
namespace llarp namespace llarp
{ {
using Logic_ptr = std::shared_ptr<Logic>;
struct RouterContact; struct RouterContact;
struct ILinkSession; struct ILinkSession;
struct IOutboundSessionMaker; struct IOutboundSessionMaker;
@ -52,7 +49,7 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) = 0; AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
virtual bool virtual bool
StartLinks(Logic_ptr logic) = 0; StartLinks(const EventLoop_ptr& loop) = 0;
virtual void virtual void
Stop() = 0; Stop() = 0;

@ -121,12 +121,12 @@ namespace llarp
} }
bool bool
LinkManager::StartLinks(Logic_ptr logic) LinkManager::StartLinks(const EventLoop_ptr& loop)
{ {
LogInfo("starting ", outboundLinks.size(), " outbound links"); LogInfo("starting ", outboundLinks.size(), " outbound links");
for (const auto& link : outboundLinks) for (const auto& link : outboundLinks)
{ {
if (!link->Start(logic)) if (!link->Start(loop))
{ {
LogWarn("outbound link '", link->Name(), "' failed to start"); LogWarn("outbound link '", link->Name(), "' failed to start");
return false; return false;
@ -139,7 +139,7 @@ namespace llarp
LogInfo("starting ", inboundLinks.size(), " inbound links"); LogInfo("starting ", inboundLinks.size(), " inbound links");
for (const auto& link : inboundLinks) for (const auto& link : inboundLinks)
{ {
if (!link->Start(logic)) if (!link->Start(loop))
{ {
LogWarn("Link ", link->Name(), " failed to start"); LogWarn("Link ", link->Name(), " failed to start");
return false; return false;

@ -47,7 +47,7 @@ namespace llarp
AddLink(LinkLayer_ptr link, bool inbound = false) override; AddLink(LinkLayer_ptr link, bool inbound = false) override;
bool bool
StartLinks(Logic_ptr logic) override; StartLinks(const EventLoop_ptr& loop) override;
void void
Stop() override; Stop() override;

@ -341,11 +341,11 @@ namespace llarp
} }
bool bool
ILinkLayer::Start(const std::shared_ptr<Logic>& logic) ILinkLayer::Start(const EventLoop_ptr& loop)
{ {
// Tie the lifetime of this repeater to this arbitrary shared_ptr: // Tie the lifetime of this repeater to this arbitrary shared_ptr:
m_repeater_keepalive = std::make_shared<int>(42); m_repeater_keepalive = std::make_shared<int>(0);
logic->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); }); loop->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
return true; return true;
} }

@ -7,7 +7,6 @@
#include <net/sock_addr.hpp> #include <net/sock_addr.hpp>
#include <router_contact.hpp> #include <router_contact.hpp>
#include <util/status.hpp> #include <util/status.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/threading.hpp> #include <util/thread/threading.hpp>
#include <config/key_manager.hpp> #include <config/key_manager.hpp>
@ -132,7 +131,7 @@ namespace llarp
TryEstablishTo(RouterContact rc); TryEstablishTo(RouterContact rc);
bool bool
Start(const std::shared_ptr<llarp::Logic>& l); Start(const EventLoop_ptr& loop);
virtual void virtual void
Stop(); Stop();

@ -12,7 +12,6 @@
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/logging/logger.hpp> #include <util/logging/logger.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <tooling/path_event.hpp> #include <tooling/path_event.hpp>
#include <functional> #include <functional>
@ -238,10 +237,8 @@ namespace llarp
break; break;
} }
auto func = router->QueueWork([router, pathid, nextHop, pathKey, status] {
std::bind(&LR_StatusMessage::CreateAndSend, router, pathid, nextHop, pathKey, status); LR_StatusMessage::CreateAndSend(router, pathid, nextHop, pathKey, status); });
router->QueueWork(func);
} }
/// this is done from logic thread /// this is done from logic thread
@ -438,7 +435,7 @@ namespace llarp
// we are the farthest hop // we are the farthest hop
llarp::LogDebug("We are the farthest hop for ", info); llarp::LogDebug("We are the farthest hop for ", info);
// send a LRSM down the path // send a LRSM down the path
LogicCall(self->context->logic(), [=]() { self->context->loop()->call([self] {
SendPathConfirm(self); SendPathConfirm(self);
self->decrypter = nullptr; self->decrypter = nullptr;
}); });
@ -447,7 +444,7 @@ namespace llarp
{ {
// forward upstream // forward upstream
// we are still in the worker thread so post job to logic // we are still in the worker thread so post job to logic
LogicCall(self->context->logic(), [=]() { self->context->loop()->call([self] {
SendLRCM(self); SendLRCM(self);
self->decrypter = nullptr; self->decrypter = nullptr;
}); });

@ -9,7 +9,6 @@
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/logging/logger.hpp> #include <util/logging/logger.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <tooling/path_event.hpp> #include <tooling/path_event.hpp>
#include <functional> #include <functional>
@ -224,8 +223,8 @@ namespace llarp
LR_StatusMessage::QueueSendMessage( LR_StatusMessage::QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg) AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg)
{ {
auto func = std::bind(&LR_StatusMessage::SendMessage, router, nextHop, msg); router->loop()->call([router, nextHop, msg=std::move(msg)] {
LogicCall(router->logic(), func); SendMessage(router, nextHop, msg); });
} }
void void

@ -7,7 +7,6 @@
#include <util/fs.hpp> #include <util/fs.hpp>
#include <util/logging/logger.hpp> #include <util/logging/logger.hpp>
#include <util/mem.hpp> #include <util/mem.hpp>
#include <util/thread/logic.hpp>
#include <util/str.hpp> #include <util/str.hpp>
#include <dht/kademlia.hpp> #include <dht/kademlia.hpp>

@ -20,8 +20,6 @@
namespace llarp namespace llarp
{ {
class Logic;
class NodeDB class NodeDB
{ {
struct Entry struct Entry

@ -14,7 +14,6 @@
#include <routing/transfer_traffic_message.hpp> #include <routing/transfer_traffic_message.hpp>
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/endian.hpp> #include <util/endian.hpp>
#include <util/thread/logic.hpp>
#include <tooling/path_event.hpp> #include <tooling/path_event.hpp>
#include <deque> #include <deque>
@ -174,8 +173,7 @@ namespace llarp
if ((currentStatus & LR_StatusRecord::SUCCESS) == LR_StatusRecord::SUCCESS) if ((currentStatus & LR_StatusRecord::SUCCESS) == LR_StatusRecord::SUCCESS)
{ {
llarp::LogDebug("LR_Status message processed, path build successful"); llarp::LogDebug("LR_Status message processed, path build successful");
auto self = shared_from_this(); r->loop()->call([r, self = shared_from_this()] { self->HandlePathConfirmMessage(r); });
LogicCall(r->logic(), [=]() { self->HandlePathConfirmMessage(r); });
} }
else else
{ {
@ -231,8 +229,7 @@ namespace llarp
{ {
llarp::LogDebug("Path build failed for an unspecified reason"); llarp::LogDebug("Path build failed for an unspecified reason");
} }
auto self = shared_from_this(); r->loop()->call([r, self = shared_from_this()]() { self->EnterState(ePathFailed, r->Now()); });
LogicCall(r->logic(), [=]() { self->EnterState(ePathFailed, r->Now()); });
} }
// TODO: meaningful return value? // TODO: meaningful return value?
@ -439,7 +436,7 @@ namespace llarp
msg.pathid = TXID(); msg.pathid = TXID();
++idx; ++idx;
} }
LogicCall(r->logic(), [self = shared_from_this(), data = std::move(sendmsgs), r]() { r->loop()->call([self = shared_from_this(), data = std::move(sendmsgs), r] () mutable {
self->HandleAllUpstream(std::move(data), r); self->HandleAllUpstream(std::move(data), r);
}); });
} }
@ -509,7 +506,7 @@ namespace llarp
sendMsgs[idx].X = buf; sendMsgs[idx].X = buf;
++idx; ++idx;
} }
LogicCall(r->logic(), [self = shared_from_this(), msgs = std::move(sendMsgs), r]() { r->loop()->call([self = shared_from_this(), msgs = std::move(sendMsgs), r] () mutable {
self->HandleAllDownstream(std::move(msgs), r); self->HandleAllDownstream(std::move(msgs), r);
}); });
} }

@ -28,7 +28,6 @@
namespace llarp namespace llarp
{ {
class Logic;
struct AbstractRouter; struct AbstractRouter;
struct LR_CommitMessage; struct LR_CommitMessage;

@ -42,10 +42,10 @@ namespace llarp
#endif #endif
} }
std::shared_ptr<Logic> const EventLoop_ptr&
PathContext::logic() PathContext::loop()
{ {
return m_Router->logic(); return m_Router->loop();
} }
const SecretKey& const SecretKey&

@ -18,7 +18,6 @@
namespace llarp namespace llarp
{ {
class Logic;
struct AbstractRouter; struct AbstractRouter;
struct LR_CommitMessage; struct LR_CommitMessage;
struct RelayDownstreamMessage; struct RelayDownstreamMessage;
@ -147,8 +146,8 @@ namespace llarp
} }
}; };
std::shared_ptr<Logic> const EventLoop_ptr&
logic(); loop();
AbstractRouter* AbstractRouter*
Router(); Router();

@ -7,7 +7,6 @@
#include <profiling.hpp> #include <profiling.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/thread/logic.hpp>
#include <tooling/path_event.hpp> #include <tooling/path_event.hpp>
#include <functional> #include <functional>
@ -28,7 +27,7 @@ namespace llarp
size_t idx = 0; size_t idx = 0;
AbstractRouter* router = nullptr; AbstractRouter* router = nullptr;
WorkerFunc_t work; WorkerFunc_t work;
std::shared_ptr<Logic> logic; EventLoop_ptr loop;
LR_CommitMessage LRCM; LR_CommitMessage LRCM;
void void
@ -97,21 +96,21 @@ namespace llarp
{ {
// farthest hop // farthest hop
// TODO: encrypt junk frames because our public keys are not eligator // TODO: encrypt junk frames because our public keys are not eligator
LogicCall(logic, std::bind(result, shared_from_this())); loop->call([self = shared_from_this()] { self->result(self); });
} }
else else
{ {
// next hop // next hop
work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); work([self = shared_from_this()] { self->GenerateNextKey(); });
} }
} }
/// Generate all keys asynchronously and call handler when done /// Generate all keys asynchronously and call handler when done
void void
AsyncGenerateKeys(Path_t p, std::shared_ptr<Logic> l, WorkerFunc_t worker, Handler func) AsyncGenerateKeys(Path_t p, EventLoop_ptr l, WorkerFunc_t worker, Handler func)
{ {
path = p; path = p;
logic = l; loop = std::move(l);
result = func; result = func;
work = worker; work = worker;
@ -119,7 +118,7 @@ namespace llarp
{ {
LRCM.frames[i].Randomize(); LRCM.frames[i].Randomize();
} }
work(std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, shared_from_this())); work([self = shared_from_this()] { self->GenerateNextKey(); });
} }
}; };
@ -393,7 +392,7 @@ namespace llarp
path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); }); path->SetBuildResultHook([self](Path_ptr p) { self->HandlePathBuilt(p); });
ctx->AsyncGenerateKeys( ctx->AsyncGenerateKeys(
path, path,
m_router->logic(), m_router->loop(),
[r = m_router](auto func) { r->QueueWork(std::move(func)); }, [r = m_router](auto func) { r->QueueWork(std::move(func)); },
&PathBuilderKeysGenerated); &PathBuilderKeysGenerated);
} }

@ -121,13 +121,10 @@ namespace llarp
{ {
auto flushIt = [self = shared_from_this(), r]() { auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs; std::vector<RelayDownstreamMessage> msgs;
do while (auto maybe = self->m_DownstreamGather.tryPopFront())
{ {
auto maybe = self->m_DownstreamGather.tryPopFront(); msgs.push_back(*maybe);
if (not maybe) }
break;
msgs.emplace_back(*maybe);
} while (true);
self->HandleAllDownstream(std::move(msgs), r); self->HandleAllDownstream(std::move(msgs), r);
}; };
for (auto& ev : *msgs) for (auto& ev : *msgs)
@ -147,12 +144,12 @@ namespace llarp
info.downstream); info.downstream);
if (m_DownstreamGather.full()) if (m_DownstreamGather.full())
{ {
LogicCall(r->logic(), flushIt); r->loop()->call(flushIt);
} }
if (m_DownstreamGather.enabled()) if (m_DownstreamGather.enabled())
m_DownstreamGather.pushBack(msg); m_DownstreamGather.pushBack(msg);
} }
LogicCall(r->logic(), flushIt); r->loop()->call(flushIt);
} }
void void
@ -160,13 +157,10 @@ namespace llarp
{ {
auto flushIt = [self = shared_from_this(), r]() { auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayUpstreamMessage> msgs; std::vector<RelayUpstreamMessage> msgs;
do while (auto maybe = self->m_UpstreamGather.tryPopFront())
{ {
auto maybe = self->m_UpstreamGather.tryPopFront(); msgs.push_back(*maybe);
if (not maybe) }
break;
msgs.emplace_back(*maybe);
} while (true);
self->HandleAllUpstream(std::move(msgs), r); self->HandleAllUpstream(std::move(msgs), r);
}; };
for (auto& ev : *msgs) for (auto& ev : *msgs)
@ -179,12 +173,12 @@ namespace llarp
msg.X = buf; msg.X = buf;
if (m_UpstreamGather.full()) if (m_UpstreamGather.full())
{ {
LogicCall(r->logic(), flushIt); r->loop()->call(flushIt);
} }
if (m_UpstreamGather.enabled()) if (m_UpstreamGather.enabled())
m_UpstreamGather.pushBack(msg); m_UpstreamGather.pushBack(msg);
} }
LogicCall(r->logic(), flushIt); r->loop()->call(flushIt);
} }
void void
@ -488,8 +482,7 @@ namespace llarp
void void
TransitHop::QueueDestroySelf(AbstractRouter* r) TransitHop::QueueDestroySelf(AbstractRouter* r)
{ {
auto func = std::bind(&TransitHop::SetSelfDestruct, shared_from_this()); r->loop()->call([self = shared_from_this()] { self->SetSelfDestruct(); });
LogicCall(r->logic(), func);
} }
} // namespace path } // namespace path
} // namespace llarp } // namespace llarp

@ -29,7 +29,6 @@ namespace oxenmq
namespace llarp namespace llarp
{ {
class NodeDB; class NodeDB;
class Logic;
struct Config; struct Config;
struct RouterID; struct RouterID;
struct ILinkMessage; struct ILinkMessage;
@ -101,9 +100,6 @@ namespace llarp
virtual const std::shared_ptr<rpc::LokidRpcClient>& virtual const std::shared_ptr<rpc::LokidRpcClient>&
RpcClient() const = 0; RpcClient() const = 0;
virtual const std::shared_ptr<Logic>&
logic() const = 0;
virtual llarp_dht_context* virtual llarp_dht_context*
dht() const = 0; dht() const = 0;
@ -135,7 +131,7 @@ namespace llarp
routerProfiling() = 0; routerProfiling() = 0;
virtual const EventLoop_ptr& virtual const EventLoop_ptr&
netloop() const = 0; loop() const = 0;
/// call function in crypto worker /// call function in crypto worker
virtual void QueueWork(std::function<void(void)>) = 0; virtual void QueueWork(std::function<void(void)>) = 0;

@ -113,11 +113,11 @@ namespace llarp
void void
OutboundMessageHandler::Init( OutboundMessageHandler::Init(
ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, std::shared_ptr<Logic> logic) ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop)
{ {
_linkManager = linkManager; _linkManager = linkManager;
_lookupHandler = lookupHandler; _lookupHandler = lookupHandler;
_logic = logic; _loop = std::move(loop);
outboundMessageQueues.emplace(zeroID, MessageQueue()); outboundMessageQueues.emplace(zeroID, MessageQueue());
} }
@ -184,8 +184,8 @@ namespace llarp
{ {
if (callback) if (callback)
{ {
auto f = std::bind(callback, status); auto f = [f=std::move(callback), status] { f(status); };
LogicCall(_logic, [self = this, f]() { self->m_Killer.TryAccess(f); }); _loop->call([this, f=std::move(f)] { m_Killer.TryAccess(f); });
} }
} }

@ -3,7 +3,7 @@
#include <router/i_outbound_message_handler.hpp> #include <router/i_outbound_message_handler.hpp>
#include <util/thread/logic.hpp> #include <ev/ev.hpp>
#include <util/thread/queue.hpp> #include <util/thread/queue.hpp>
#include <path/path_types.hpp> #include <path/path_types.hpp>
#include <router_id.hpp> #include <router_id.hpp>
@ -19,7 +19,6 @@ namespace llarp
{ {
struct ILinkManager; struct ILinkManager;
struct I_RCLookupHandler; struct I_RCLookupHandler;
class Logic;
enum class SessionResult; enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler struct OutboundMessageHandler final : public IOutboundMessageHandler
@ -43,7 +42,7 @@ namespace llarp
ExtractStatus() const override; ExtractStatus() const override;
void void
Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, std::shared_ptr<Logic> logic); Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop);
private: private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>; using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;
@ -139,7 +138,7 @@ namespace llarp
ILinkManager* _linkManager; ILinkManager* _linkManager;
I_RCLookupHandler* _lookupHandler; I_RCLookupHandler* _lookupHandler;
std::shared_ptr<Logic> _logic; EventLoop_ptr _loop;
util::ContentionKiller m_Killer; util::ContentionKiller m_Killer;

@ -8,7 +8,6 @@
#include <router/i_rc_lookup_handler.hpp> #include <router/i_rc_lookup_handler.hpp>
#include <link/i_link_manager.hpp> #include <link/i_link_manager.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/threading.hpp> #include <util/thread/threading.hpp>
#include <util/status.hpp> #include <util/status.hpp>
#include <crypto/crypto.hpp> #include <crypto/crypto.hpp>
@ -163,16 +162,16 @@ namespace llarp
ILinkManager* linkManager, ILinkManager* linkManager,
I_RCLookupHandler* rcLookup, I_RCLookupHandler* rcLookup,
Profiling* profiler, Profiling* profiler,
std::shared_ptr<Logic> logic, EventLoop_ptr loop,
WorkerFunc_t dowork) WorkerFunc_t dowork)
{ {
_router = router; _router = router;
_linkManager = linkManager; _linkManager = linkManager;
_rcLookup = rcLookup; _rcLookup = rcLookup;
_logic = logic; _loop = std::move(loop);
_nodedb = router->nodedb(); _nodedb = router->nodedb();
_profiler = profiler; _profiler = profiler;
work = dowork; work = std::move(dowork);
} }
void void
@ -226,8 +225,7 @@ namespace llarp
} }
if (ShouldConnectTo(router)) if (ShouldConnectTo(router))
{ {
auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router); _loop->call([this, router] { DoEstablish(router); });
LogicCall(_logic, fn);
} }
} }
@ -348,8 +346,7 @@ namespace llarp
for (const auto& callback : movedCallbacks) for (const auto& callback : movedCallbacks)
{ {
auto func = std::bind(callback, router, type); _loop->call([callback, router, type] { return callback(router, type); });
LogicCall(_logic, func);
} }
{ {

@ -4,7 +4,6 @@
#include <router/i_outbound_session_maker.hpp> #include <router/i_outbound_session_maker.hpp>
#include <router/i_rc_lookup_handler.hpp> #include <router/i_rc_lookup_handler.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/threading.hpp> #include <util/thread/threading.hpp>
#include <profiling.hpp> #include <profiling.hpp>
@ -60,7 +59,7 @@ namespace llarp
ILinkManager* linkManager, ILinkManager* linkManager,
I_RCLookupHandler* rcLookup, I_RCLookupHandler* rcLookup,
Profiling* profiler, Profiling* profiler,
std::shared_ptr<Logic> logic, EventLoop_ptr loop,
WorkerFunc_t work); WorkerFunc_t work);
void void
@ -113,7 +112,7 @@ namespace llarp
I_RCLookupHandler* _rcLookup = nullptr; I_RCLookupHandler* _rcLookup = nullptr;
Profiling* _profiler = nullptr; Profiling* _profiler = nullptr;
std::shared_ptr<NodeDB> _nodedb; std::shared_ptr<NodeDB> _nodedb;
std::shared_ptr<Logic> _logic; EventLoop_ptr _loop;
WorkerFunc_t work; WorkerFunc_t work;
RouterID us; RouterID us;
}; };

@ -157,7 +157,7 @@ namespace llarp
{ {
LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht."); LogDebug("Adding or updating RC for ", RouterID(rc.pubkey), " to nodedb and dht.");
const RouterContact copy{rc}; const RouterContact copy{rc};
LogicCall(_logic, [copy, n = _nodedb]() { n->PutIfNewer(copy); }); _loop->call([rc, n=_nodedb] { n->PutIfNewer(rc); });
_dht->impl->PutRCNodeAsync(rc); _dht->impl->PutRCNodeAsync(rc);
} }
@ -301,7 +301,7 @@ namespace llarp
RCLookupHandler::Init( RCLookupHandler::Init(
llarp_dht_context* dht, llarp_dht_context* dht,
std::shared_ptr<NodeDB> nodedb, std::shared_ptr<NodeDB> nodedb,
std::shared_ptr<Logic> logic, EventLoop_ptr loop,
WorkerFunc_t dowork, WorkerFunc_t dowork,
ILinkManager* linkManager, ILinkManager* linkManager,
service::Context* hiddenServiceContext, service::Context* hiddenServiceContext,
@ -311,9 +311,9 @@ namespace llarp
bool isServiceNode_arg) bool isServiceNode_arg)
{ {
_dht = dht; _dht = dht;
_nodedb = nodedb; _nodedb = std::move(nodedb);
_logic = logic; _loop = std::move(loop);
_work = dowork; _work = std::move(dowork);
_hiddenServiceContext = hiddenServiceContext; _hiddenServiceContext = hiddenServiceContext;
_strictConnectPubkeys = strictConnectPubkeys; _strictConnectPubkeys = strictConnectPubkeys;
_bootstrapRCList = bootstrapRCList; _bootstrapRCList = bootstrapRCList;

@ -16,7 +16,7 @@ struct llarp_dht_context;
namespace llarp namespace llarp
{ {
class NodeDB; class NodeDB;
class Logic; class EventLoop;
namespace service namespace service
{ {
@ -76,7 +76,7 @@ namespace llarp
Init( Init(
llarp_dht_context* dht, llarp_dht_context* dht,
std::shared_ptr<NodeDB> nodedb, std::shared_ptr<NodeDB> nodedb,
std::shared_ptr<Logic> logic, std::shared_ptr<EventLoop> loop,
WorkerFunc_t dowork, WorkerFunc_t dowork,
ILinkManager* linkManager, ILinkManager* linkManager,
service::Context* hiddenServiceContext, service::Context* hiddenServiceContext,
@ -103,7 +103,7 @@ namespace llarp
llarp_dht_context* _dht = nullptr; llarp_dht_context* _dht = nullptr;
std::shared_ptr<NodeDB> _nodedb; std::shared_ptr<NodeDB> _nodedb;
std::shared_ptr<Logic> _logic; std::shared_ptr<EventLoop> _loop;
WorkerFunc_t _work = nullptr; WorkerFunc_t _work = nullptr;
service::Context* _hiddenServiceContext = nullptr; service::Context* _hiddenServiceContext = nullptr;
ILinkManager* _linkManager = nullptr; ILinkManager* _linkManager = nullptr;

@ -47,11 +47,10 @@ static constexpr std::chrono::milliseconds ROUTER_TICK_INTERVAL = 1s;
namespace llarp namespace llarp
{ {
Router::Router( Router::Router(
EventLoop_ptr netloop, std::shared_ptr<Logic> l, std::shared_ptr<vpn::Platform> vpnPlatform) EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform)
: ready(false) : ready(false)
, m_lmq(std::make_shared<oxenmq::OxenMQ>()) , m_lmq(std::make_shared<oxenmq::OxenMQ>())
, _netloop(std::move(netloop)) , _loop(std::move(loop))
, _logic(std::move(l))
, _vpnPlatform(std::move(vpnPlatform)) , _vpnPlatform(std::move(vpnPlatform))
, paths(this) , paths(this)
, _exitContext(this) , _exitContext(this)
@ -364,7 +363,7 @@ namespace llarp
if (_onDown) if (_onDown)
_onDown(); _onDown();
LogInfo("closing router"); LogInfo("closing router");
_netloop->stop(); _loop->stop();
_running.store(false); _running.store(false);
} }
@ -563,19 +562,19 @@ namespace llarp
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers"); LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers");
// Init components after relevant config settings loaded // Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _logic); _outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop);
_outboundSessionMaker.Init( _outboundSessionMaker.Init(
this, this,
&_linkManager, &_linkManager,
&_rcLookupHandler, &_rcLookupHandler,
&_routerProfiling, &_routerProfiling,
_logic, _loop,
util::memFn(&AbstractRouter::QueueWork, this)); util::memFn(&AbstractRouter::QueueWork, this));
_linkManager.Init(&_outboundSessionMaker); _linkManager.Init(&_outboundSessionMaker);
_rcLookupHandler.Init( _rcLookupHandler.Init(
_dht, _dht,
_nodedb, _nodedb,
_logic, _loop,
util::memFn(&AbstractRouter::QueueWork, this), util::memFn(&AbstractRouter::QueueWork, this),
&_linkManager, &_linkManager,
&_hiddenServiceContext, &_hiddenServiceContext,
@ -603,7 +602,7 @@ namespace llarp
{ {
auto server = iwp::NewInboundLink( auto server = iwp::NewInboundLink(
m_keyManager, m_keyManager,
netloop(), loop(),
util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&AbstractRouter::Sign, this), util::memFn(&AbstractRouter::Sign, this),
@ -618,7 +617,7 @@ namespace llarp
const std::string& key = serverConfig.interface; const std::string& key = serverConfig.interface;
int af = serverConfig.addressFamily; int af = serverConfig.addressFamily;
uint16_t port = serverConfig.port; uint16_t port = serverConfig.port;
if (!server->Configure(netloop(), key, af, port)) if (!server->Configure(loop(), key, af, port))
{ {
throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port)); throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port));
} }
@ -1059,7 +1058,7 @@ namespace llarp
} }
} }
_outboundSessionMaker.SetOurRouter(pubkey()); _outboundSessionMaker.SetOurRouter(pubkey());
if (!_linkManager.StartLinks(_logic)) if (!_linkManager.StartLinks(_loop))
{ {
LogWarn("One or more links failed to start."); LogWarn("One or more links failed to start.");
return false; return false;
@ -1120,11 +1119,11 @@ namespace llarp
#ifdef _WIN32 #ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump // windows uses proactor event loop so we need to constantly pump
_netloop->add_ticker([this] { PumpLL(); }); _loop->add_ticker([this] { PumpLL(); });
#else #else
_netloop->set_pump_function([this] { PumpLL(); }); _loop->set_pump_function([this] { PumpLL(); });
#endif #endif
_logic->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); }); _loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_running.store(true); _running.store(true);
_startedAt = Now(); _startedAt = Now();
#if defined(WITH_SYSTEMD) #if defined(WITH_SYSTEMD)
@ -1161,7 +1160,7 @@ namespace llarp
{ {
StopLinks(); StopLinks();
nodedb()->SaveToDisk(); nodedb()->SaveToDisk();
_logic->call_later(200ms, std::bind(&Router::AfterStopLinks, this)); _loop->call_later(200ms, [this] { AfterStopLinks(); });
} }
void void
@ -1208,7 +1207,7 @@ namespace llarp
_exitContext.Stop(); _exitContext.Stop();
paths.PumpUpstream(); paths.PumpUpstream();
_linkManager.PumpLinks(); _linkManager.PumpLinks();
_logic->call_later(200ms, std::bind(&Router::AfterStopIssued, this)); _loop->call_later(200ms, [this] { AfterStopIssued(); });
} }
bool bool
@ -1299,7 +1298,7 @@ namespace llarp
{ {
auto link = iwp::NewOutboundLink( auto link = iwp::NewOutboundLink(
m_keyManager, m_keyManager,
netloop(), loop(),
util::memFn(&AbstractRouter::rc, this), util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this), util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&AbstractRouter::Sign, this), util::memFn(&AbstractRouter::Sign, this),
@ -1324,7 +1323,7 @@ namespace llarp
for (const auto af : {AF_INET, AF_INET6}) for (const auto af : {AF_INET, AF_INET6})
{ {
if (not link->Configure(netloop(), "*", af, m_OutboundPort)) if (not link->Configure(loop(), "*", af, m_OutboundPort))
continue; continue;
#if defined(ANDROID) #if defined(ANDROID)

@ -35,7 +35,6 @@
#include <util/mem.hpp> #include <util/mem.hpp>
#include <util/status.hpp> #include <util/status.hpp>
#include <util/str.hpp> #include <util/str.hpp>
#include <util/thread/logic.hpp>
#include <util/time.hpp> #include <util/time.hpp>
#include <functional> #include <functional>
@ -88,12 +87,6 @@ namespace llarp
return m_lokidRpcClient; return m_lokidRpcClient;
} }
const std::shared_ptr<Logic>&
logic() const override
{
return _logic;
}
llarp_dht_context* llarp_dht_context*
dht() const override dht() const override
{ {
@ -161,9 +154,9 @@ namespace llarp
} }
const EventLoop_ptr& const EventLoop_ptr&
netloop() const override loop() const override
{ {
return _netloop; return _loop;
} }
vpn::Platform* vpn::Platform*
@ -180,8 +173,7 @@ namespace llarp
std::optional<SockAddr> _ourAddress; std::optional<SockAddr> _ourAddress;
EventLoop_ptr _netloop; EventLoop_ptr _loop;
std::shared_ptr<Logic> _logic;
std::shared_ptr<vpn::Platform> _vpnPlatform; std::shared_ptr<vpn::Platform> _vpnPlatform;
path::PathContext paths; path::PathContext paths;
exit::Context _exitContext; exit::Context _exitContext;
@ -325,8 +317,7 @@ namespace llarp
GossipRCIfNeeded(const RouterContact rc) override; GossipRCIfNeeded(const RouterContact rc) override;
explicit Router( explicit Router(
EventLoop_ptr netloop, EventLoop_ptr loop,
std::shared_ptr<Logic> logic,
std::shared_ptr<vpn::Platform> vpnPlatform); std::shared_ptr<vpn::Platform> vpnPlatform);
~Router() override; ~Router() override;

@ -29,7 +29,7 @@ namespace llarp::rpc
}, },
[self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) { [self = shared_from_this()](oxenmq::ConnectionID, std::string_view fail) {
LogWarn("failed to connect to endpoint auth server: ", fail); LogWarn("failed to connect to endpoint auth server: ", fail);
self->m_Endpoint->Logic()->call_later(1s, [self] { self->Start(); }); self->m_Endpoint->Loop()->call_later(1s, [self] { self->Start(); });
}); });
} }
@ -44,11 +44,10 @@ namespace llarp::rpc
std::shared_ptr<llarp::service::ProtocolMessage> msg, std::shared_ptr<llarp::service::ProtocolMessage> msg,
std::function<void(service::AuthResult)> hook) std::function<void(service::AuthResult)> hook)
{ {
assert(m_Endpoint->Logic()->inLogicThread());
service::ConvoTag tag = msg->tag; service::ConvoTag tag = msg->tag;
m_PendingAuths.insert(tag); m_PendingAuths.insert(tag);
const auto from = msg->sender.Addr(); const auto from = msg->sender.Addr();
auto reply = m_Endpoint->Logic()->make_caller([this, tag, hook](service::AuthResult result) { auto reply = m_Endpoint->Loop()->make_caller([this, tag, hook](service::AuthResult result) {
m_PendingAuths.erase(tag); m_PendingAuths.erase(tag);
hook(result); hook(result);
}); });

@ -8,7 +8,6 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <util/time.hpp> #include <util/time.hpp>
#include <util/thread/logic.hpp>
namespace llarp namespace llarp
{ {
@ -57,7 +56,7 @@ namespace llarp
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); }, [self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) { [self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f); llarp::LogWarn("Failed to connect to lokid: ", f);
LogicCall(self->m_Router->logic(), [self, url]() { self->ConnectAsync(url); }); self->m_Router->loop()->call([self, url]() { self->ConnectAsync(url); });
}); });
} }
@ -170,7 +169,7 @@ namespace llarp
return; return;
} }
// inform router about the new list // inform router about the new list
LogicCall(m_Router->logic(), [r = m_Router, nodeList = std::move(nodeList)]() mutable { m_Router->loop()->call([r = m_Router, nodeList = std::move(nodeList)]() mutable {
r->SetRouterWhitelist(std::move(nodeList)); r->SetRouterWhitelist(std::move(nodeList));
}); });
} }
@ -252,7 +251,7 @@ namespace llarp
LogError("failed to parse response from lns lookup: ", ex.what()); LogError("failed to parse response from lns lookup: ", ex.what());
} }
} }
LogicCall(r->logic(), [resultHandler, maybe]() { resultHandler(maybe); }); r->loop()->call([resultHandler, maybe=std::move(maybe)]() { resultHandler(std::move(maybe)); });
}, },
req.dump()); req.dump());
} }

@ -1,6 +1,5 @@
#include "rpc_server.hpp" #include "rpc_server.hpp"
#include <router/route_poker.hpp> #include <router/route_poker.hpp>
#include <util/thread/logic.hpp>
#include <constants/version.hpp> #include <constants/version.hpp>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <net/ip_range.hpp> #include <net/ip_range.hpp>
@ -106,7 +105,7 @@ namespace llarp::rpc
.add_request_command( .add_request_command(
"status", "status",
[&](oxenmq::Message& msg) { [&](oxenmq::Message& msg) {
LogicCall(m_Router->logic(), [defer = msg.send_later(), r = m_Router]() { m_Router->loop()->call([defer = msg.send_later(), r = m_Router]() {
std::string data; std::string data;
if (r->IsRunning()) if (r->IsRunning())
{ {
@ -158,7 +157,7 @@ namespace llarp::rpc
reply(CreateJSONError("no action taken")); reply(CreateJSONError("no action taken"));
return; return;
} }
LogicCall(r->logic(), [r, endpoint, kills, reply]() { r->loop()->call([r, endpoint, kills, reply]() {
auto ep = r->hiddenServiceContext().GetEndpointByName(endpoint); auto ep = r->hiddenServiceContext().GetEndpointByName(endpoint);
if (ep == nullptr) if (ep == nullptr)
{ {
@ -236,8 +235,7 @@ namespace llarp::rpc
{ {
endpoint = endpoint_itr->get<std::string>(); endpoint = endpoint_itr->get<std::string>();
} }
LogicCall( r->loop()->call([map, exit, lnsExit, range, token, endpoint, r, reply]() mutable {
r->logic(), [map, exit, lnsExit, range, token, endpoint, r, reply]() mutable {
auto ep = r->hiddenServiceContext().GetEndpointByName(endpoint); auto ep = r->hiddenServiceContext().GetEndpointByName(endpoint);
if (ep == nullptr) if (ep == nullptr)
{ {
@ -274,6 +272,7 @@ namespace llarp::rpc
} }
ctx->AsyncSendAuth( ctx->AsyncSendAuth(
[onGoodResult, reply](service::AuthResult result) { [onGoodResult, reply](service::AuthResult result) {
// TODO: refactor this code. We are 5 lambdas deep here!
if (result.code != service::AuthResultCode::eAuthAccepted) if (result.code != service::AuthResultCode::eAuthAccepted)
{ {
reply(CreateJSONError(result.reason)); reply(CreateJSONError(result.reason));

@ -3,7 +3,6 @@
#include <crypto/crypto.hpp> #include <crypto/crypto.hpp>
#include <crypto/types.hpp> #include <crypto/types.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <utility> #include <utility>
namespace llarp namespace llarp
@ -11,7 +10,7 @@ namespace llarp
namespace service namespace service
{ {
AsyncKeyExchange::AsyncKeyExchange( AsyncKeyExchange::AsyncKeyExchange(
std::shared_ptr<Logic> l, EventLoop_ptr l,
ServiceInfo r, ServiceInfo r,
const Identity& localident, const Identity& localident,
const PQPubKey& introsetPubKey, const PQPubKey& introsetPubKey,
@ -19,7 +18,7 @@ namespace llarp
IDataHandler* h, IDataHandler* h,
const ConvoTag& t, const ConvoTag& t,
ProtocolType proto) ProtocolType proto)
: logic(std::move(l)) : loop(std::move(l))
, m_remote(std::move(r)) , m_remote(std::move(r))
, m_LocalIdentity(localident) , m_LocalIdentity(localident)
, introPubKey(introsetPubKey) , introPubKey(introsetPubKey)
@ -74,7 +73,7 @@ namespace llarp
self->msg.version = LLARP_PROTO_VERSION; self->msg.version = LLARP_PROTO_VERSION;
// encrypt and sign // encrypt and sign
if (frame->EncryptAndSign(self->msg, K, self->m_LocalIdentity)) if (frame->EncryptAndSign(self->msg, K, self->m_LocalIdentity))
LogicCall(self->logic, std::bind(&AsyncKeyExchange::Result, self, frame)); self->loop->call([self, frame] { AsyncKeyExchange::Result(self, frame); });
else else
{ {
LogError("failed to encrypt and sign"); LogError("failed to encrypt and sign");

@ -7,13 +7,11 @@
namespace llarp namespace llarp
{ {
class Logic;
namespace service namespace service
{ {
struct AsyncKeyExchange : public std::enable_shared_from_this<AsyncKeyExchange> struct AsyncKeyExchange : public std::enable_shared_from_this<AsyncKeyExchange>
{ {
std::shared_ptr<Logic> logic; EventLoop_ptr loop;
SharedSecret sharedKey; SharedSecret sharedKey;
ServiceInfo m_remote; ServiceInfo m_remote;
const Identity& m_LocalIdentity; const Identity& m_LocalIdentity;
@ -26,7 +24,7 @@ namespace llarp
ConvoTag tag; ConvoTag tag;
AsyncKeyExchange( AsyncKeyExchange(
std::shared_ptr<Logic> l, EventLoop_ptr l,
ServiceInfo r, ServiceInfo r,
const Identity& localident, const Identity& localident,
const PQPubKey& introsetPubKey, const PQPubKey& introsetPubKey,

@ -21,7 +21,6 @@
#include <service/hidden_service_address_lookup.hpp> #include <service/hidden_service_address_lookup.hpp>
#include <service/outbound_context.hpp> #include <service/outbound_context.hpp>
#include <service/protocol.hpp> #include <service/protocol.hpp>
#include <util/thread/logic.hpp>
#include <util/str.hpp> #include <util/str.hpp>
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
@ -719,13 +718,13 @@ namespace llarp
{ {
if (not msg->foundRCs.empty()) if (not msg->foundRCs.empty())
{ {
for (auto rc : msg->foundRCs) for (auto& rc : msg->foundRCs)
{ {
Router()->QueueWork([rc = std::move(rc), logic = Router()->logic(), self = this, msg]() { Router()->QueueWork([this, rc, msg]() mutable {
bool valid = rc.Verify(llarp::time_now_ms()); bool valid = rc.Verify(llarp::time_now_ms());
LogicCall(logic, [self, valid, rc = std::move(rc), msg]() { Router()->loop()->call([this, valid, rc = std::move(rc), msg] {
self->Router()->nodedb()->PutIfNewer(rc); Router()->nodedb()->PutIfNewer(rc);
self->HandleVerifyGotRouter(msg, rc.pubkey, valid); HandleVerifyGotRouter(msg, rc.pubkey, valid);
}); });
}); });
} }
@ -920,8 +919,7 @@ namespace llarp
{ {
if (m_RecvQueue.full() || m_RecvQueue.empty()) if (m_RecvQueue.full() || m_RecvQueue.empty())
{ {
auto self = this; m_router->loop()->call([this] { FlushRecvData(); });
LogicCall(m_router->logic(), [self]() { self->FlushRecvData(); });
} }
m_RecvQueue.pushBack(std::move(ev)); m_RecvQueue.pushBack(std::move(ev));
} }
@ -990,7 +988,7 @@ namespace llarp
} }
else else
{ {
Router()->logic()->Call([h = std::move(hook)] { Router()->loop()->call([h = std::move(hook)] {
h({AuthResultCode::eAuthAccepted, "OK"}); h({AuthResultCode::eAuthAccepted, "OK"});
}); });
} }
@ -1072,7 +1070,7 @@ namespace llarp
RemoveConvoTag(frame.T); RemoveConvoTag(frame.T);
return true; return true;
} }
if (not frame.AsyncDecryptAndVerify(Router()->logic(), p, m_Identity, this)) if (not frame.AsyncDecryptAndVerify(Router()->loop(), p, m_Identity, this))
{ {
// send reset convo tag message // send reset convo tag message
ProtocolFrame f; ProtocolFrame f;
@ -1509,10 +1507,10 @@ namespace llarp
return m_state->m_Router; return m_state->m_Router;
} }
const std::shared_ptr<llarp::Logic>& const EventLoop_ptr&
Endpoint::Logic() Endpoint::Loop()
{ {
return Router()->logic(); return Router()->loop();
} }
void void

@ -18,7 +18,6 @@
#include <service/lookup.hpp> #include <service/lookup.hpp>
#include <hook/ihook.hpp> #include <hook/ihook.hpp>
#include <util/compare_ptr.hpp> #include <util/compare_ptr.hpp>
#include <util/thread/logic.hpp>
#include <service/endpoint_types.hpp> #include <service/endpoint_types.hpp>
#include <service/auth.hpp> #include <service/auth.hpp>
@ -136,10 +135,10 @@ namespace llarp
void void
ResetInternalState() override; ResetInternalState() override;
/// logic (via router) /// loop (via router)
/// use when sending any data on a path /// use when sending any data on a path
const std::shared_ptr<llarp::Logic>& const EventLoop_ptr&
Logic(); Loop();
AbstractRouter* AbstractRouter*
Router(); Router();

@ -3,7 +3,6 @@
#include <path/path.hpp> #include <path/path.hpp>
#include <util/time.hpp> #include <util/time.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <util/thread/logic.hpp>
#include <utility> #include <utility>
namespace llarp namespace llarp
@ -25,8 +24,7 @@ namespace llarp
auto msg = BuildRequestMessage(); auto msg = BuildRequestMessage();
if (!msg) if (!msg)
return false; return false;
endpoint = path->Endpoint(); r->loop()->call([path=std::move(path), msg=std::move(msg), r] { path->SendRoutingMessage(*msg, r); });
LogicCall(r->logic(), [=]() { path->SendRoutingMessage(*msg, r); });
return true; return true;
} }
} // namespace service } // namespace service

@ -202,7 +202,7 @@ namespace llarp
currentConvoTag.Randomize(); currentConvoTag.Randomize();
auto frame = std::make_shared<ProtocolFrame>(); auto frame = std::make_shared<ProtocolFrame>();
auto ex = std::make_shared<AsyncKeyExchange>( auto ex = std::make_shared<AsyncKeyExchange>(
m_Endpoint->Logic(), m_Endpoint->Loop(),
remoteIdent, remoteIdent,
m_Endpoint->GetIdentity(), m_Endpoint->GetIdentity(),
currentIntroSet.K, currentIntroSet.K,
@ -211,12 +211,12 @@ namespace llarp
currentConvoTag, currentConvoTag,
t); t);
ex->hook = std::bind(&OutboundContext::Send, shared_from_this(), std::placeholders::_1, path); ex->hook = [self = shared_from_this(), path](auto frame) { self->Send(std::move(frame), path); };
ex->msg.PutBuffer(payload); ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro; ex->msg.introReply = path->intro;
frame->F = ex->msg.introReply.pathID; frame->F = ex->msg.introReply.pathID;
m_Endpoint->Router()->QueueWork(std::bind(&AsyncKeyExchange::Encrypt, ex, frame)); m_Endpoint->Router()->QueueWork([ex, frame] { return AsyncKeyExchange::Encrypt(ex, frame); });
} }
std::string std::string
@ -581,7 +581,7 @@ namespace llarp
}; };
} }
const auto& ident = m_Endpoint->GetIdentity(); const auto& ident = m_Endpoint->GetIdentity();
if (not frame.AsyncDecryptAndVerify(m_Endpoint->Logic(), p, ident, m_Endpoint, hook)) if (not frame.AsyncDecryptAndVerify(m_Endpoint->Loop(), p, ident, m_Endpoint, hook))
{ {
// send reset convo tag message // send reset convo tag message
ProtocolFrame f; ProtocolFrame f;

@ -4,7 +4,6 @@
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/mem.hpp> #include <util/mem.hpp>
#include <util/meta/memfn.hpp> #include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <service/endpoint.hpp> #include <service/endpoint.hpp>
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <utility> #include <utility>
@ -271,7 +270,7 @@ namespace llarp
struct AsyncFrameDecrypt struct AsyncFrameDecrypt
{ {
path::Path_ptr path; path::Path_ptr path;
std::shared_ptr<Logic> logic; EventLoop_ptr loop;
std::shared_ptr<ProtocolMessage> msg; std::shared_ptr<ProtocolMessage> msg;
const Identity& m_LocalIdentity; const Identity& m_LocalIdentity;
Endpoint* handler; Endpoint* handler;
@ -279,13 +278,13 @@ namespace llarp
const Introduction fromIntro; const Introduction fromIntro;
AsyncFrameDecrypt( AsyncFrameDecrypt(
std::shared_ptr<Logic> l, EventLoop_ptr l,
const Identity& localIdent, const Identity& localIdent,
Endpoint* h, Endpoint* h,
std::shared_ptr<ProtocolMessage> m, std::shared_ptr<ProtocolMessage> m,
const ProtocolFrame& f, const ProtocolFrame& f,
const Introduction& recvIntro) const Introduction& recvIntro)
: logic(std::move(l)) : loop(std::move(l))
, msg(std::move(m)) , msg(std::move(m))
, m_LocalIdentity(localIdent) , m_LocalIdentity(localIdent)
, handler(h) , handler(h)
@ -403,7 +402,7 @@ namespace llarp
bool bool
ProtocolFrame::AsyncDecryptAndVerify( ProtocolFrame::AsyncDecryptAndVerify(
std::shared_ptr<Logic> logic, EventLoop_ptr loop,
path::Path_ptr recvPath, path::Path_ptr recvPath,
const Identity& localIdent, const Identity& localIdent,
Endpoint* handler, Endpoint* handler,
@ -416,9 +415,9 @@ namespace llarp
LogInfo("Got protocol frame with new convo"); LogInfo("Got protocol frame with new convo");
// we need to dh // we need to dh
auto dh = std::make_shared<AsyncFrameDecrypt>( auto dh = std::make_shared<AsyncFrameDecrypt>(
logic, localIdent, handler, msg, *this, recvPath->intro); loop, localIdent, handler, msg, *this, recvPath->intro);
dh->path = recvPath; dh->path = recvPath;
handler->Router()->QueueWork(std::bind(&AsyncFrameDecrypt::Work, dh)); handler->Router()->QueueWork([dh = std::move(dh)] { return AsyncFrameDecrypt::Work(dh); });
return true; return true;
} }
@ -436,10 +435,10 @@ namespace llarp
return false; return false;
} }
v->frame = *this; v->frame = *this;
auto callback = [logic, hook](std::shared_ptr<ProtocolMessage> msg) { auto callback = [loop, hook](std::shared_ptr<ProtocolMessage> msg) {
if (hook) if (hook)
{ {
LogicCall(logic, [msg, hook]() { hook(msg); }); loop->call([msg, hook]() { hook(msg); });
} }
}; };
handler->Router()->QueueWork( handler->Router()->QueueWork(

@ -20,8 +20,6 @@ struct llarp_threadpool;
namespace llarp namespace llarp
{ {
class Logic;
namespace path namespace path
{ {
/// forward declare /// forward declare
@ -127,7 +125,7 @@ namespace llarp
bool bool
AsyncDecryptAndVerify( AsyncDecryptAndVerify(
std::shared_ptr<Logic> logic, EventLoop_ptr loop,
path::Path_ptr fromPath, path::Path_ptr fromPath,
const Identity& localIdent, const Identity& localIdent,
Endpoint* handler, Endpoint* handler,

@ -3,7 +3,6 @@
#include <router/abstractrouter.hpp> #include <router/abstractrouter.hpp>
#include <routing/path_transfer_message.hpp> #include <routing/path_transfer_message.hpp>
#include <service/endpoint.hpp> #include <service/endpoint.hpp>
#include <util/thread/logic.hpp>
#include <utility> #include <utility>
#include <unordered_set> #include <unordered_set>
@ -29,7 +28,7 @@ namespace llarp
{ {
if (m_SendQueue.empty() or m_SendQueue.full()) if (m_SendQueue.empty() or m_SendQueue.full())
{ {
LogicCall(m_Endpoint->Logic(), [this] { FlushUpstream(); }); m_Endpoint->Loop()->call([this] { FlushUpstream(); });
} }
m_SendQueue.pushBack(std::make_pair( m_SendQueue.pushBack(std::make_pair(
std::make_shared<const routing::PathTransferMessage>(*msg, remoteIntro.pathID), path)); std::make_shared<const routing::PathTransferMessage>(*msg, remoteIntro.pathID), path));

@ -8,9 +8,9 @@ namespace tooling
{} {}
std::shared_ptr<llarp::AbstractRouter> std::shared_ptr<llarp::AbstractRouter>
HiveContext::makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr<llarp::Logic> logic) HiveContext::makeRouter(llarp::EventLoop_ptr loop)
{ {
return std::make_shared<HiveRouter>(netloop, logic, makeVPNPlatform(), m_hive); return std::make_shared<HiveRouter>(loop, makeVPNPlatform(), m_hive);
} }
HiveRouter* HiveRouter*

@ -12,7 +12,7 @@ namespace tooling
HiveContext(RouterHive* hive); HiveContext(RouterHive* hive);
std::shared_ptr<llarp::AbstractRouter> std::shared_ptr<llarp::AbstractRouter>
makeRouter(llarp::EventLoop_ptr netloop, std::shared_ptr<llarp::Logic> logic) override; makeRouter(llarp::EventLoop_ptr loop) override;
/// Get this context's router as a HiveRouter. /// Get this context's router as a HiveRouter.
/// ///

@ -5,11 +5,10 @@
namespace tooling namespace tooling
{ {
HiveRouter::HiveRouter( HiveRouter::HiveRouter(
llarp::EventLoop_ptr netloop, llarp::EventLoop_ptr loop,
std::shared_ptr<llarp::Logic> logic,
std::shared_ptr<llarp::vpn::Platform> plat, std::shared_ptr<llarp::vpn::Platform> plat,
RouterHive* hive) RouterHive* hive)
: Router(netloop, logic, plat), m_hive(hive) : Router(loop, plat), m_hive(hive)
{} {}
bool bool

@ -11,8 +11,7 @@ namespace tooling
struct HiveRouter : public llarp::Router struct HiveRouter : public llarp::Router
{ {
explicit HiveRouter( explicit HiveRouter(
llarp::EventLoop_ptr netloop, llarp::EventLoop_ptr loop,
std::shared_ptr<llarp::Logic> logic,
std::shared_ptr<llarp::vpn::Platform> vpnPlatform, std::shared_ptr<llarp::vpn::Platform> vpnPlatform,
RouterHive* hive); RouterHive* hive);

@ -2,7 +2,6 @@
#include "llarp.h" #include "llarp.h"
#include "llarp.hpp" #include "llarp.hpp"
#include "util/thread/logic.hpp"
#include "util/str.hpp" #include "util/str.hpp"
#include "router/abstractrouter.hpp" #include "router/abstractrouter.hpp"
@ -73,13 +72,13 @@ namespace tooling
RouterHive::StopRouters() RouterHive::StopRouters()
{ {
llarp::LogInfo("Signalling all routers to stop"); llarp::LogInfo("Signalling all routers to stop");
for (auto [routerId, ctx] : relays) for (auto& [routerId, ctx] : relays)
{ {
LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); }); ctx->mainloop->call([ctx=ctx]() { ctx->HandleSignal(SIGINT); });
} }
for (auto [routerId, ctx] : clients) for (auto& [routerId, ctx] : clients)
{ {
LogicCall(ctx->logic, [ctx]() { ctx->HandleSignal(SIGINT); }); ctx->mainloop->call([ctx=ctx]() { ctx->HandleSignal(SIGINT); });
} }
llarp::LogInfo("Waiting on routers to be stopped"); llarp::LogInfo("Waiting on routers to be stopped");
@ -149,8 +148,8 @@ namespace tooling
void void
RouterHive::VisitRouter(Context_ptr ctx, std::function<void(Context_ptr)> visit) RouterHive::VisitRouter(Context_ptr ctx, std::function<void(Context_ptr)> visit)
{ {
// TODO: this should be called from each router's appropriate Logic thread, e.g.: // TODO: this should be called from each router's appropriate Loop, e.g.:
// LogicCall(ctx->logic, [visit, ctx]() { visit(ctx); }); // ctx->mainloop->call([visit, ctx]() { visit(ctx); });
// however, this causes visit calls to be deferred // however, this causes visit calls to be deferred
visit(ctx); visit(ctx);
} }
@ -172,18 +171,18 @@ namespace tooling
std::vector<size_t> std::vector<size_t>
RouterHive::RelayConnectedRelays() RouterHive::RelayConnectedRelays()
{ {
std::lock_guard<std::mutex> guard{routerMutex}; std::lock_guard guard{routerMutex};
std::vector<size_t> results; std::vector<size_t> results;
results.resize(relays.size()); results.resize(relays.size());
std::mutex results_lock; std::mutex results_lock;
size_t i = 0; size_t i = 0;
size_t done_count = 0; size_t done_count = 0;
for (auto [routerId, ctx] : relays) for (auto& [routerId, ctx] : relays)
{ {
LogicCall(ctx->logic, [&, i, ctx]() { ctx->mainloop->call([&, i, ctx=ctx]() {
size_t count = ctx->router->NumberOfConnectedRouters(); size_t count = ctx->router->NumberOfConnectedRouters();
std::lock_guard<std::mutex> guard{results_lock}; std::lock_guard guard{results_lock};
results[i] = count; results[i] = count;
done_count++; done_count++;
}); });
@ -194,7 +193,7 @@ namespace tooling
{ {
size_t read_done_count = 0; size_t read_done_count = 0;
{ {
std::lock_guard<std::mutex> guard{results_lock}; std::lock_guard guard{results_lock};
read_done_count = done_count; read_done_count = done_count;
} }
if (read_done_count == relays.size()) if (read_done_count == relays.size())

@ -1,48 +0,0 @@
#include <util/thread/logic.hpp>
#include <util/logging/logger.hpp>
#include <util/mem.h>
#include <future>
namespace llarp
{
void
Logic::stop()
{
llarp::LogDebug("logic thread stop");
}
void
Logic::Call(std::function<void(void)> func)
{
m_Queue(std::move(func));
}
void
Logic::SetQueuer(std::function<void(std::function<void(void)>)> q)
{
m_Queue = std::move(q);
}
void
Logic::call_later(llarp_time_t timeout, std::function<void(void)> func)
{
Call([this, timeout, f = std::move(func)]() mutable {
m_Loop->call_after_delay(timeout, std::move(f));
});
}
void
Logic::set_event_loop(EventLoop* loop)
{
m_Loop = loop;
SetQueuer([loop](std::function<void(void)> work) { loop->call_soon(work); });
}
void
Logic::clear_event_loop()
{
m_Loop = nullptr;
}
} // namespace llarp

@ -1,114 +0,0 @@
#ifndef LLARP_LOGIC_HPP
#define LLARP_LOGIC_HPP
#include <ev/ev.hpp>
#include <util/mem.h>
namespace llarp
{
class Logic
{
public:
/// stop all operation and wait for that to die
void
stop();
void
Call(std::function<void(void)> func);
// Calls the given function once, after the given delay.
void
call_later(llarp_time_t later, std::function<void(void)> func);
// Calls the given function repeatedly, forever, as long as the event loop lasts; the initial
// call will be after the given delay.
void
call_forever(llarp_time_t repeat, std::function<void(void)> func);
// Created a repeated timer, like call_forever(repeat, func), but ties the lifetime of the
// callback to `owner`: callbacks will be invoked so long as `owner` remains alive, but
// thereafter the callback will be destroyed. Intended to be used as:
//
// logic->call_every(100ms, shared_from_this(), [this] { some_func(); });
//
template <typename Callable>
void
call_every(llarp_time_t repeat, std::weak_ptr<void> owner, Callable f)
{
auto repeater = m_Loop->make_repeater();
auto& r = *repeater;
r.start(
repeat,
[repeater = std::move(repeater), owner = std::move(owner), f = std::move(f)]() mutable {
if (auto ptr = owner.lock())
f();
else
repeater.reset(); // Remove timer on destruction (we should be the only thing holder
// the repeater)
});
}
// Wraps a lambda with a lambda that triggers it to be called via Logic::Call()
// when invoked. E.g.:
//
// auto x = logic->make_caller([] (int a) { std::cerr << a; });
// x(42);
// x(99);
//
// will schedule two calls of the inner lambda (with different arguments) in the logic thread.
// Arguments are forwarded to the inner lambda (allowing moving arguments into it).
template <typename Callable>
auto
make_caller(Callable&& f)
{
return [this, f = std::forward<Callable>(f)](auto&&... args) {
// This shared pointer in a pain in the ass but needed because this lambda is going into a
// std::function that only accepts copyable lambdas. I *want* to simply capture:
// args=std::make_tuple(std::forward<decltype(args)>(args)...)
// but that fails if any given args aren't copyable. Dammit.
auto args_tuple_ptr = std::make_shared<std::tuple<std::decay_t<decltype(args)>...>>(
std::forward<decltype(args)>(args)...);
Call([f, args = std::move(args_tuple_ptr)]() mutable {
// Moving away the tuple args here is okay because this lambda will only be invoked once
std::apply(f, std::move(*args));
});
};
}
void
SetQueuer(std::function<void(std::function<void(void)>)> q);
EventLoop*
event_loop()
{
return m_Loop;
}
void
set_event_loop(EventLoop* loop);
void
clear_event_loop();
bool
inLogicThread() const
{
return m_Loop and m_Loop->inEventLoopThread();
}
private:
EventLoop* m_Loop = nullptr;
std::function<void(std::function<void(void)>)> m_Queue;
};
} // namespace llarp
/// this used to be a macro
template <typename Logic_ptr, typename Func_t>
static bool
LogicCall(const Logic_ptr& logic, Func_t func)
{
logic->Call(std::move(func));
return true;
}
#endif

@ -20,16 +20,14 @@ namespace iwp = llarp::iwp;
namespace util = llarp::util; namespace util = llarp::util;
/// make an iwp link /// make an iwp link
template <bool inbound, typename... Args_t> template <bool inbound, typename... Args>
static llarp::LinkLayer_ptr static llarp::LinkLayer_ptr
make_link(Args_t... args) make_link(Args&&... args)
{ {
if (inbound) if (inbound)
return iwp::NewInboundLink(args...); return iwp::NewInboundLink(std::forward<Args>(args)...);
else return iwp::NewOutboundLink(std::forward<Args>(args)...);
return iwp::NewOutboundLink(args...);
} }
using Logic_ptr = std::shared_ptr<llarp::Logic>;
/// a single iwp link with associated keys and members to make unit tests work /// a single iwp link with associated keys and members to make unit tests work
struct IWPLinkContext struct IWPLinkContext
@ -135,7 +133,7 @@ using Context_ptr = std::shared_ptr<IWPLinkContext>;
/// call take 2 parameters, test and a timeout /// call take 2 parameters, test and a timeout
/// ///
/// test is a callable that takes 5 arguments: /// test is a callable that takes 5 arguments:
/// 0) std::function<Logic_ptr(void)> that starts the iwp links and gives a logic to call with /// 0) std::function<EventLoop_ptr(void)> that starts the iwp links and gives an event loop to call with
/// 1) std::function<void(void)> that ends the unit test if we are done /// 1) std::function<void(void)> that ends the unit test if we are done
/// 2) std::function<void(void)> that ends the unit test right now as a success /// 2) std::function<void(void)> that ends the unit test right now as a success
/// 3) client iwp link context (shared_ptr) /// 3) client iwp link context (shared_ptr)
@ -150,10 +148,7 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s)
// shut up logs // shut up logs
llarp::LogSilencer shutup; llarp::LogSilencer shutup;
// set up event loop // set up event loop
auto logic = std::make_shared<llarp::Logic>();
auto loop = llarp::EventLoop::create(); auto loop = llarp::EventLoop::create();
loop->set_logic(logic);
logic->set_event_loop(loop.get());
llarp::LogContext::Instance().Initialize( llarp::LogContext::Instance().Initialize(
llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) { llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) {
@ -174,33 +169,32 @@ RunIWPTest(Func_t test, Duration_t timeout = 10s)
auto recipiant = std::make_shared<IWPLinkContext>("127.0.0.1:3002", loop); auto recipiant = std::make_shared<IWPLinkContext>("127.0.0.1:3002", loop);
// function for ending unit test on success // function for ending unit test on success
auto endIfDone = [initiator, recipiant, loop, logic]() { auto endIfDone = [initiator, recipiant, loop]() {
if (initiator->gucci and recipiant->gucci) if (initiator->gucci and recipiant->gucci)
{ {
LogicCall(logic, [loop] { loop->stop(); }); loop->stop();
} }
}; };
// function to start test and give logic to unit test // function to start test and give loop to unit test
auto start = [initiator, recipiant, logic]() { auto start = [initiator, recipiant, loop]() {
REQUIRE(initiator->link->Start(logic)); REQUIRE(initiator->link->Start(loop));
REQUIRE(recipiant->link->Start(logic)); REQUIRE(recipiant->link->Start(loop));
return logic; return loop;
}; };
// function to end test immediately // function to end test immediately
auto endTest = [logic, loop]() { LogicCall(logic, [loop] { loop->stop(); }); }; auto endTest = [loop] { loop->stop(); };
loop->call_after_delay( loop->call_later(timeout, [] { FAIL("test timeout"); });
std::chrono::duration_cast<llarp_time_t>(timeout), []() { FAIL("test timeout"); });
test(start, endIfDone, endTest, initiator, recipiant); test(start, endIfDone, endTest, initiator, recipiant);
loop->run(*logic); loop->run();
llarp::RouterContact::BlockBogons = oldBlockBogons; llarp::RouterContact::BlockBogons = oldBlockBogons;
} }
/// ensure clients can connect to relays /// ensure clients can connect to relays
TEST_CASE("IWP handshake", "[iwp]") TEST_CASE("IWP handshake", "[iwp]")
{ {
RunIWPTest([](std::function<Logic_ptr(void)> start, RunIWPTest([](std::function<llarp::EventLoop_ptr(void)> start,
std::function<void(void)> endIfDone, std::function<void(void)> endIfDone,
[[maybe_unused]] std::function<void(void)> endTestNow, [[maybe_unused]] std::function<void(void)> endTestNow,
Context_ptr alice, Context_ptr alice,
@ -218,16 +212,16 @@ TEST_CASE("IWP handshake", "[iwp]")
endIfDone(); endIfDone();
}); });
// start unit test // start unit test
auto logic = start(); auto loop = start();
// try establishing a session // try establishing a session
LogicCall(logic, [link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); }); loop->call([link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); });
}); });
} }
/// ensure relays cannot connect to clients /// ensure relays cannot connect to clients
TEST_CASE("IWP handshake reverse", "[iwp]") TEST_CASE("IWP handshake reverse", "[iwp]")
{ {
RunIWPTest([](std::function<Logic_ptr(void)> start, RunIWPTest([](std::function<llarp::EventLoop_ptr(void)> start,
[[maybe_unused]] std::function<void(void)> endIfDone, [[maybe_unused]] std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow, std::function<void(void)> endTestNow,
Context_ptr alice, Context_ptr alice,
@ -235,9 +229,9 @@ TEST_CASE("IWP handshake reverse", "[iwp]")
alice->InitLink<false>([](auto) {}); alice->InitLink<false>([](auto) {});
bob->InitLink<true>([](auto) {}); bob->InitLink<true>([](auto) {});
// start unit test // start unit test
auto logic = start(); auto loop = start();
// try establishing a session in the wrong direction // try establishing a session in the wrong direction
LogicCall(logic, [logic, link = bob->link, rc = alice->rc, endTestNow]() { loop->call([link = bob->link, rc = alice->rc, endTestNow] {
REQUIRE(not link->TryEstablishTo(rc)); REQUIRE(not link->TryEstablishTo(rc));
endTestNow(); endTestNow();
}); });
@ -249,7 +243,7 @@ TEST_CASE("IWP send messages", "[iwp]")
{ {
int aliceNumSent = 0; int aliceNumSent = 0;
int bobNumSent = 0; int bobNumSent = 0;
RunIWPTest([&aliceNumSent, &bobNumSent](std::function<Logic_ptr(void)> start, RunIWPTest([&aliceNumSent, &bobNumSent](std::function<llarp::EventLoop_ptr(void)> start,
std::function<void(void)> endIfDone, std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow, std::function<void(void)> endTestNow,
Context_ptr alice, Context_ptr alice,
@ -309,9 +303,9 @@ TEST_CASE("IWP send messages", "[iwp]")
} }
}); });
// start unit test // start unit test
auto logic = start(); auto loop = start();
// try establishing a session from alice to bob // try establishing a session from alice to bob
LogicCall(logic, [logic, link = alice->link, rc = bob->rc, endTestNow]() { loop->call([link = alice->link, rc = bob->rc, endTestNow]() {
REQUIRE(link->TryEstablishTo(rc)); REQUIRE(link->TryEstablishTo(rc));
}); });
}); });

Loading…
Cancel
Save