diff --git a/daemon/main.c b/daemon/main.c index d2d4bb36b..9a7562b30 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -6,6 +6,7 @@ struct llarp_main { struct llarp_alloc mem; struct llarp_router *router; struct llarp_threadpool *worker; + struct llarp_threadpool *netio; struct llarp_logic *logic; struct llarp_config *config; struct llarp_nodedb *nodedb; @@ -47,6 +48,13 @@ int shutdown_llarp(struct llarp_main *m) { progress(); if(m->mainloop) llarp_ev_loop_stop(m->mainloop); + + progress(); + if(m->netio) + { + llarp_threadpool_stop(m->netio); + llarp_threadpool_join(m->netio); + } progress(); if(m->worker) @@ -56,7 +64,7 @@ int shutdown_llarp(struct llarp_main *m) { if(m->worker) llarp_threadpool_join(m->worker); - + progress(); if (m->logic) llarp_logic_stop(m->logic); @@ -90,10 +98,17 @@ struct llarp_main llarp = { 0, 0, 0, + 0, {0}, 1 }; +void run_netio(void * user) +{ + struct llarp_ev_loop * loop = user; + llarp_ev_loop_run(loop); +} + int main(int argc, char *argv[]) { const char *conffname = "daemon.ini"; if (argc > 1) conffname = argv[1]; @@ -116,6 +131,8 @@ int main(int argc, char *argv[]) { if (llarp_nodedb_ensure_dir(dir)) { // ensure worker thread pool if (!llarp.worker) llarp.worker = llarp_init_threadpool(2); + // ensire net io thread + llarp.netio = llarp_init_threadpool(1); llarp.router = llarp_init_router(mem, llarp.worker, llarp.mainloop); @@ -123,12 +140,16 @@ int main(int argc, char *argv[]) { llarp.logic = llarp_init_logic(mem); printf("starting router\n"); - llarp_run_router(llarp.router, llarp.logic); - - printf("running mainloop\n"); + // run io loop + struct llarp_thread_job netjob = { + .user = llarp.mainloop, + .work = &run_netio + }; + llarp_threadpool_queue_job(llarp.netio, netjob); + printf("running\n"); llarp.exitcode = 0; - llarp_ev_loop_run(llarp.mainloop); + llarp_logic_mainloop(llarp.logic); } else printf("Failed to configure router\n"); } else diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 3bd2d751a..fe08de920 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -171,6 +171,7 @@ struct server void cleanup_dead() { // todo: implement + printf("cleanup dead\n"); } bool ensure_privkey() diff --git a/llarp/timer.cpp b/llarp/timer.cpp index 0231fde39..d7a4874a0 100644 --- a/llarp/timer.cpp +++ b/llarp/timer.cpp @@ -12,34 +12,41 @@ struct timer { .count(); } + llarp_timer_context * parent; void* user; uint64_t started; uint64_t timeout; llarp_timer_handler_func func; + uint32_t id; - timer(uint64_t ms = 0, void* _user = nullptr, - llarp_timer_handler_func _func = nullptr) - : user(_user), started(now()), timeout(ms), func(_func) {} + timer(llarp_timer_context * ctx=nullptr, uint64_t ms = 0, void* _user = nullptr, + llarp_timer_handler_func _func = nullptr, uint32_t _id=0) + : parent(ctx), user(_user), started(now()), timeout(ms), func(_func), id(_id) {} - void operator()() { - if (func) { - auto ms = now(); - auto diff = ms - started; - if (diff >= timeout) - func(user, timeout, 0); - else - func(user, timeout, diff); - } + + void exec(); + + + static void call(void * user) + { + static_cast(user)->exec(); } + + operator llarp_thread_job () + { + return {this, timer::call}; + } + }; }; // namespace llarp struct llarp_timer_context { + llarp_threadpool * threadpool; std::mutex timersMutex; std::map timers; std::mutex tickerMutex; std::condition_variable ticker; - std::chrono::milliseconds nextTickLen = std::chrono::seconds(1); + std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(10); uint32_t ids = 0; std::atomic _run = true; @@ -54,18 +61,23 @@ struct llarp_timer_context { auto itr = timers.find(id); if (itr != timers.end()) { - itr->second(); - timers.erase(id); + itr->second.exec(); } if (lock) delete lock; } + void remove(uint32_t id) + { + std::unique_lock lock (timersMutex); + timers.erase(id); + } + uint32_t call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms) { std::unique_lock lock(timersMutex); uint32_t id = ++ids; - timers[id] = llarp::timer(timeout_ms); + timers[id] = llarp::timer(this, timeout_ms, user, func, id); return id; } @@ -111,12 +123,39 @@ void llarp_timer_cancel(struct llarp_timer_context* t, uint32_t id) { void llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool) { - std::unique_lock lock(t->tickerMutex); + t->threadpool = pool; while (t->run()) { - auto status = t->ticker.wait_for(lock, t->nextTickLen); - if (status == std::cv_status::no_timeout) { - // we woke up + std::unique_lock lock(t->tickerMutex); + t->ticker.wait_for(lock, t->nextTickLen); + // we woke up + auto now = llarp::timer::now(); + auto itr = t->timers.begin(); + while (itr != t->timers.end()) + { + if(now - itr->second.started >= itr->second.timeout) + { + // timer hit + llarp_threadpool_queue_job(pool, itr->second); + } + ++itr; } } } } + +namespace llarp +{ + void timer::exec() + { + if (func) { + auto ms = now(); + auto diff = ms - started; + if (diff >= timeout) + func(user, timeout, 0); + else + func(user, timeout, diff); + } + if(parent) + parent->remove(id); + } +}