make logic and net thread one in the same

pull/960/head
Jeff Becker 5 years ago committed by Stephen Shelton
parent 581306e35f
commit cec36b62b5

@ -76,7 +76,6 @@ run_main_context(std::string conffname, llarp_main_runtime_opts opts)
llarp::util::SetThreadName("llarp-mainloop");
if(code == 0)
code = llarp_main_run(ctx, opts);
llarp_main_free(ctx);
}
exit_code.set_value(code);
}
@ -311,5 +310,9 @@ main(int argc, char *argv[])
#ifdef _WIN32
::WSACleanup();
#endif
if(ctx)
{
llarp_main_free(ctx);
}
return code;
}

@ -815,6 +815,9 @@ struct llarp_ev_loop
}
}
}
virtual void
call_soon(std::function< void(void) > f) = 0;
};
struct PacketBuffer

@ -802,7 +802,19 @@ namespace libuv
m_Impl.data = this;
uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
m_TickTimer.data = this;
m_TickTimer.data = this;
m_LogicCaller.data = this;
uv_async_init(&m_Impl, &m_LogicCaller, [](uv_async_t* h) {
Loop* l = static_cast< Loop* >(h->data);
Queue_t* jobs = l->m_LogicCalls.load();
l->m_LogicCalls.store(new Queue_t());
while(not jobs->empty())
{
jobs->front()();
jobs->pop_front();
}
delete jobs;
});
m_Run.store(true);
return uv_timer_init(&m_Impl, &m_TickTimer) != -1;
}
@ -878,6 +890,11 @@ namespace libuv
nullptr);
}
Loop::Loop() : llarp_ev_loop()
{
m_LogicCalls.store(new Queue_t());
}
void
Loop::stopped()
{
@ -957,6 +974,13 @@ namespace libuv
return false;
}
void
Loop::call_soon(std::function< void(void) > f)
{
m_LogicCalls.load()->emplace_back(f);
uv_async_send(&m_LogicCaller);
}
} // namespace libuv
bool

@ -6,11 +6,14 @@
#include <vector>
#include <functional>
#include <util/thread/logic.hpp>
#include <util/meta/memfn.hpp>
namespace libuv
{
struct Loop final : public llarp_ev_loop
{
Loop();
bool
init() override;
@ -90,14 +93,22 @@ namespace libuv
set_logic(std::shared_ptr< llarp::Logic > l) override
{
m_Logic = l;
m_Logic->SetQueuer(llarp::util::memFn(&Loop::call_soon, this));
}
std::shared_ptr< llarp::Logic > m_Logic;
void
call_soon(std::function< void(void) > f) override;
private:
uv_loop_t m_Impl;
uv_timer_t m_TickTimer;
std::atomic< bool > m_Run;
uv_async_t m_LogicCaller;
using Queue_t = std::deque< std::function< void(void) > >;
using AtomicQueue_t = std::atomic< Queue_t* >;
AtomicQueue_t m_LogicCalls;
#ifdef LOKINET_DEBUG
uint64_t last_time;

@ -63,6 +63,10 @@ namespace llarp
SetLogLevel(LogLevel lvl)
{
LogContext::Instance().curLevel = lvl;
if(lvl == eLogDebug)
{
LogContext::Instance().runtimeLevel = lvl;
}
}
} // namespace llarp

@ -94,6 +94,11 @@ namespace llarp
f();
return true;
}
if(m_Queue)
{
m_Queue(f);
return true;
}
if(m_Thread->LooksFull(5))
{
LogErrorExplicit(TAG, LINE,
@ -113,6 +118,13 @@ namespace llarp
#undef METRIC
}
void
Logic::SetQueuer(std::function< void(std::function< void(void) >) > q)
{
m_Queue = q;
m_Queue([self = this]() { self->m_ID = std::this_thread::get_id(); });
}
void
Logic::call_later(llarp_time_t timeout, std::function< void(void) > func)
{

@ -48,12 +48,16 @@ namespace llarp
bool
can_flush() const;
void
SetQueuer(std::function< void(std::function< void(void) >) > q);
private:
using ID_t = std::thread::id;
llarp_threadpool* const m_Thread;
llarp_timer_context* const m_Timer;
absl::optional< ID_t > m_ID;
util::ContentionKiller m_Killer;
std::function< void(std::function< void(void) >) > m_Queue;
};
} // namespace llarp

Loading…
Cancel
Save