implement timers using libuv

So far only a bit of the code using timers has been modified to use
the new libuv-based timers.  Also only the non-Windows case has been
implemented.  Seems to be working though, so it's a good time to commit.
pull/990/head
Thomas Winget 5 years ago
parent d532c4784e
commit 71bb0dd520

@ -202,6 +202,7 @@ __ ___ ____ _ _ ___ _ _ ____
llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO);
llarp::LogInfo("starting up");
mainloop = llarp_make_ev_loop();
logic->set_event_loop(mainloop.get());
mainloop->set_logic(logic);

@ -38,6 +38,7 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}
logic->clear_event_loop();
ev->stopped();
}

@ -669,8 +669,7 @@ namespace llarp
#ifdef _WIN32
struct llarp_fd_promise
{
void
Set(std::pair< int, int >)
void Set(std::pair< int, int >)
{
}
@ -741,6 +740,13 @@ struct llarp_ev_loop
virtual int
tick(int ms) = 0;
virtual uint32_t
call_after_delay(llarp_time_t delay_ms,
std::function< void(void) > callback) = 0;
virtual void
cancel_delayed_call(uint32_t call_id) = 0;
virtual bool
add_ticker(std::function< void(void) > ticker) = 0;

@ -793,6 +793,19 @@ namespace libuv
}
};
#endif
static void
OnAsyncWake(uv_async_t* async_handle)
{
Loop* loop = static_cast< Loop* >(async_handle->data);
loop->process_timer_queue();
loop->process_cancel_queue();
}
Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20)
{
}
bool
Loop::init()
{
@ -809,7 +822,7 @@ namespace libuv
#else
uv_loop_configure(&m_Impl, UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
#endif
m_TickTimer.data = this;
m_TickTimer->data = this;
m_LogicCaller.data = this;
uv_async_init(&m_Impl, &m_LogicCaller, [](uv_async_t* h) {
Loop* l = static_cast< Loop* >(h->data);
@ -819,8 +832,14 @@ namespace libuv
f();
}
});
m_TickTimer = new uv_timer_t;
m_TickTimer->data = this;
m_Run.store(true);
return uv_timer_init(&m_Impl, &m_TickTimer) != -1;
m_nextID.store(0);
m_WakeUp.data = this;
uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake);
return uv_timer_init(&m_Impl, m_TickTimer) != -1;
}
void
@ -859,12 +878,107 @@ namespace libuv
{
if(m_Run)
{
uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0);
uv_timer_start(m_TickTimer, &OnTickTimeout, ms, 0);
uv_run(&m_Impl, UV_RUN_ONCE);
}
return 0;
}
struct TimerData
{
Loop* loop;
uint64_t job_id;
};
void
CloseUVTimer(uv_timer_t* timer)
{
// have to delete timer handle this way because libuv.
uv_timer_stop(timer);
uv_close((uv_handle_t*)timer,
[](uv_handle_t* handle) { delete(uv_timer_t*)handle; });
}
static void
OnUVTimer(uv_timer_t* timer)
{
TimerData* timer_data = static_cast< TimerData* >(timer->data);
Loop* loop = timer_data->loop;
loop->do_timer_job(timer_data->job_id);
delete timer_data;
CloseUVTimer(timer);
}
uint32_t
Loop::call_after_delay(llarp_time_t delay_ms,
std::function< void(void) > callback)
{
PendingTimer timer;
timer.delay_ms = delay_ms;
timer.callback = callback;
timer.job_id = m_nextID++;
uint64_t job_id = timer.job_id;
m_timerQueue.pushBack(std::move(timer));
uv_async_send(&m_WakeUp);
return job_id;
}
void
Loop::cancel_delayed_call(uint32_t job_id)
{
m_timerCancelQueue.pushBack(job_id);
uv_async_send(&m_WakeUp);
}
void
Loop::process_timer_queue()
{
while(not m_timerQueue.empty())
{
PendingTimer job = m_timerQueue.popFront();
uint64_t job_id = job.job_id;
m_pendingCalls.emplace(job_id, std::move(job.callback));
TimerData* timer_data = new TimerData;
timer_data->loop = this;
timer_data->job_id = job_id;
uv_timer_t* newTimer = new uv_timer_t;
newTimer->data = (void*)timer_data;
uv_timer_init(&m_Impl, newTimer);
uv_timer_start(newTimer, &OnUVTimer, job.delay_ms, 0);
}
}
void
Loop::process_cancel_queue()
{
while(not m_timerCancelQueue.empty())
{
uint64_t job_id = m_timerCancelQueue.popFront();
auto itr = m_pendingCalls.find(job_id);
if(itr != m_pendingCalls.end())
{
m_pendingCalls.erase(itr);
}
}
}
void
Loop::do_timer_job(uint64_t job_id)
{
auto itr = m_pendingCalls.find(job_id);
if(itr != m_pendingCalls.end())
{
LogicCall(m_Logic, itr->second);
m_pendingCalls.erase(itr);
}
}
void
Loop::stop()
{
@ -886,18 +1000,18 @@ namespace libuv
[](uv_handle_t* h, void*) {
if(uv_is_closing(h))
return;
if(h->data && uv_is_active(h))
if(h->data && uv_is_active(h) && h->type != UV_TIMER)
{
static_cast< glue* >(h->data)->Close();
}
else if(h->type == UV_TIMER)
{
CloseUVTimer((uv_timer_t*)h);
}
},
nullptr);
}
Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024)
{
}
void
Loop::stopped()
{

@ -9,10 +9,21 @@
#include <util/thread/queue.hpp>
#include <util/meta/memfn.hpp>
#include <map>
namespace libuv
{
struct Loop final : public llarp_ev_loop
{
typedef std::function< void(void) > Callback;
struct PendingTimer
{
uint64_t job_id;
llarp_time_t delay_ms;
Callback callback;
};
Loop();
bool
@ -37,6 +48,22 @@ namespace libuv
int
tick(int ms) override;
uint32_t
call_after_delay(llarp_time_t delay_ms,
std::function< void(void) > callback) override;
void
cancel_delayed_call(uint32_t job_id) override;
void
process_timer_queue();
void
process_cancel_queue();
void
do_timer_job(uint64_t job_id);
void
stop() override;
@ -104,7 +131,8 @@ namespace libuv
private:
uv_loop_t m_Impl;
uv_timer_t m_TickTimer;
uv_timer_t* m_TickTimer;
uv_async_t m_WakeUp;
std::atomic< bool > m_Run;
uv_async_t m_LogicCaller;
using AtomicQueue_t = llarp::thread::Queue< std::function< void(void) > >;
@ -114,6 +142,12 @@ namespace libuv
uint64_t last_time;
uint64_t loop_run_count;
#endif
std::atomic< uint32_t > m_nextID;
std::map< uint32_t, Callback > m_pendingCalls;
llarp::thread::Queue< PendingTimer > m_timerQueue;
llarp::thread::Queue< uint32_t > m_timerCancelQueue;
};
} // namespace libuv

@ -199,15 +199,15 @@ namespace llarp
}
/// override me in subtype
virtual bool
HandleGotIntroMessage(std::shared_ptr< const dht::GotIntroMessage >)
virtual bool HandleGotIntroMessage(
std::shared_ptr< const dht::GotIntroMessage >)
{
return false;
}
/// override me in subtype
virtual bool
HandleGotRouterMessage(std::shared_ptr< const dht::GotRouterMessage >)
virtual bool HandleGotRouterMessage(
std::shared_ptr< const dht::GotRouterMessage >)
{
return false;
}

@ -32,9 +32,8 @@
#if defined(ANDROID) || defined(IOS)
#include <unistd.h>
#endif
#if defined(WITH_SYSTEMD)
#include <systemd/sd-daemon.h>
#endif
constexpr uint64_t ROUTER_TICK_INTERVAL_MS = 1000;
namespace llarp
{
@ -284,14 +283,11 @@ namespace llarp
}
void
Router::handle_router_ticker(void *user, uint64_t orig, uint64_t left)
Router::handle_router_ticker()
{
if(left)
return;
auto *self = static_cast< Router * >(user);
self->ticker_job_id = 0;
self->Tick();
self->ScheduleTicker(orig);
ticker_job_id = 0;
LogicCall(logic(), std::bind(&Router::Tick, this));
ScheduleTicker(ROUTER_TICK_INTERVAL_MS);
}
bool
@ -765,7 +761,8 @@ namespace llarp
void
Router::ScheduleTicker(uint64_t ms)
{
ticker_job_id = _logic->call_later({ms, this, &handle_router_ticker});
ticker_job_id =
_logic->call_later(ms, std::bind(&Router::handle_router_ticker, this));
}
void
@ -1036,7 +1033,7 @@ namespace llarp
_netloop->add_ticker(std::bind(&Router::PumpLL, this));
ScheduleTicker(1000);
ScheduleTicker(ROUTER_TICK_INTERVAL_MS);
_running.store(true);
_startedAt = Now();
#if defined(WITH_SYSTEMD)
@ -1061,20 +1058,18 @@ namespace llarp
return 0;
}
static void
RouterAfterStopLinks(void *u, uint64_t, uint64_t)
void
Router::AfterStopLinks()
{
auto *self = static_cast< Router * >(u);
self->Close();
Close();
}
static void
RouterAfterStopIssued(void *u, uint64_t, uint64_t)
void
Router::AfterStopIssued()
{
auto *self = static_cast< Router * >(u);
self->StopLinks();
self->nodedb()->AsyncFlushToDisk();
self->_logic->call_later({200, self, &RouterAfterStopLinks});
StopLinks();
nodedb()->AsyncFlushToDisk();
_logic->call_later(200, std::bind(&Router::AfterStopLinks, this));
}
void
@ -1100,7 +1095,7 @@ namespace llarp
rpcServer->Stop();
paths.PumpUpstream();
_linkManager.PumpLinks();
_logic->call_later({200, this, &RouterAfterStopIssued});
_logic->call_later(200, std::bind(&Router::AfterStopIssued, this));
}
bool

@ -466,8 +466,14 @@ namespace llarp
bool
HasSessionTo(const RouterID &remote) const override;
static void
handle_router_ticker(void *user, uint64_t orig, uint64_t left);
void
handle_router_ticker();
void
AfterStopLinks();
void
AfterStopIssued();
private:
std::atomic< bool > _stopping;

@ -65,7 +65,8 @@ namespace llarp
return m_CachedAddr.ToString();
}
bool ServiceInfo::CalculateAddress(std::array< byte_t, 32 >& data) const
bool
ServiceInfo::CalculateAddress(std::array< byte_t, 32 >& data) const
{
std::array< byte_t, 256 > tmp;
llarp_buffer_t buf(tmp);

@ -97,7 +97,8 @@ namespace llarp
}
/// calculate our address
bool CalculateAddress(std::array< byte_t, 32 >& data) const;
bool
CalculateAddress(std::array< byte_t, 32 >& data) const;
bool
BDecode(llarp_buffer_t* buf)

@ -139,10 +139,15 @@ namespace llarp
m_Queue([self = this]() { self->m_ID = std::this_thread::get_id(); });
}
void
uint32_t
Logic::call_later(llarp_time_t timeout, std::function< void(void) > func)
{
llarp_timer_call_func_later(m_Timer, timeout, func);
auto loop = m_Loop;
if(loop != nullptr)
{
return loop->call_after_delay(timeout, func);
}
return 0;
}
uint32_t
@ -159,12 +164,22 @@ namespace llarp
Logic::cancel_call(uint32_t id)
{
llarp_timer_cancel_job(m_Timer, id);
auto loop = m_Loop;
if(loop != nullptr)
{
loop->cancel_delayed_call(id);
}
}
void
Logic::remove_call(uint32_t id)
{
llarp_timer_remove_job(m_Timer, id);
auto loop = m_Loop;
if(loop != nullptr)
{
loop->cancel_delayed_call(id);
}
}
bool
@ -173,4 +188,16 @@ namespace llarp
return m_ID.value() == std::this_thread::get_id();
}
void
Logic::set_event_loop(llarp_ev_loop* loop)
{
m_Loop = loop;
}
void
Logic::clear_event_loop()
{
m_Loop = nullptr;
}
} // namespace llarp

@ -1,6 +1,7 @@
#ifndef LLARP_LOGIC_HPP
#define LLARP_LOGIC_HPP
#include <ev/ev.hpp>
#include <util/mem.h>
#include <util/thread/threadpool.h>
#include <util/thread/timer.hpp>
@ -33,7 +34,7 @@ namespace llarp
uint32_t
call_later(const llarp_timeout_job& job);
void
uint32_t
call_later(llarp_time_t later, std::function< void(void) > func);
void
@ -51,9 +52,16 @@ namespace llarp
void
SetQueuer(std::function< void(std::function< void(void) >) > q);
void
set_event_loop(llarp_ev_loop* loop);
void
clear_event_loop();
private:
using ID_t = std::thread::id;
llarp_threadpool* const m_Thread;
llarp_ev_loop* m_Loop;
llarp_timer_context* const m_Timer;
absl::optional< ID_t > m_ID;
util::ContentionKiller m_Killer;

@ -280,28 +280,6 @@ llarp_timer_tick_all_async(struct llarp_timer_context* t,
llarp_threadpool_queue_job(pool, std::bind(&llarp_timer_tick_all, t));
}
void
llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool)
{
t->ticker = std::make_unique< llarp::util::Condition >();
while(t->run())
{
// wait for timer mutex
if(t->ticker)
{
llarp::util::Lock lock(&t->tickerMutex);
t->ticker->WaitWithTimeout(&t->tickerMutex, t->nextTickLen);
}
if(t->run())
{
llarp::util::Lock lock(&t->timersMutex);
// we woke up
llarp_timer_tick_all_async(t, pool, llarp::time_now_ms());
}
}
}
namespace llarp
{
void

@ -49,10 +49,6 @@ llarp_timer_stop(struct llarp_timer_context *t);
void
llarp_timer_set_time(struct llarp_timer_context *t, llarp_time_t now);
// blocking run timer and send events to thread pool
void
llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool);
/// single threaded run timer, tick all timers
void
llarp_timer_tick_all(struct llarp_timer_context *t);

Loading…
Cancel
Save