fix threading and stopping

pull/1/head
Jeff Becker 6 years ago
parent e695906c0c
commit 4939c7a5e3
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -1,12 +1,13 @@
#include <llarp.h>
#include <stdio.h>
#include <signal.h>
#include <string.h>
struct llarp_main {
struct llarp_alloc mem;
struct llarp_router *router;
struct llarp_threadpool *worker;
struct llarp_threadpool *netio;
struct llarp_threadpool *thread;
struct llarp_logic *logic;
struct llarp_config *config;
struct llarp_nodedb *nodedb;
@ -41,21 +42,11 @@ static void progress() {
int shutdown_llarp(struct llarp_main *m) {
printf("Shutting down ");
progress();
if(m->router)
llarp_stop_router(m->router);
progress();
if(m->mainloop)
llarp_ev_loop_stop(m->mainloop);
progress();
if(m->netio)
{
llarp_threadpool_stop(m->netio);
llarp_threadpool_join(m->netio);
}
progress();
if(m->worker)
llarp_threadpool_stop(m->worker);
@ -66,7 +57,14 @@ int shutdown_llarp(struct llarp_main *m) {
llarp_threadpool_join(m->worker);
progress();
if (m->logic) llarp_logic_stop(m->logic);
if (m->logic)
{
llarp_logic_stop(m->logic);
}
progress();
if(m->router)
llarp_stop_router(m->router);
progress();
llarp_free_router(&m->router);
@ -103,10 +101,18 @@ struct llarp_main llarp = {
1
};
void run_netio(void * user)
void run_net(void * user)
{
llarp_ev_loop_run(user);
}
void handle_signal(int sig)
{
struct llarp_ev_loop * loop = user;
llarp_ev_loop_run(loop);
printf("interrupted\n");
llarp_ev_loop_stop(llarp.mainloop);
llarp_logic_stop(llarp.logic);
printf("closing...");
}
int main(int argc, char *argv[]) {
@ -131,22 +137,23 @@ int main(int argc, char *argv[]) {
if (llarp_nodedb_ensure_dir(dir)) {
// ensure worker thread pool
if (!llarp.worker) llarp.worker = llarp_init_threadpool(2);
// ensire net io thread
llarp.netio = llarp_init_threadpool(1);
// ensure thread
llarp.thread = llarp_init_threadpool(1);
llarp.router = llarp_init_router(mem, llarp.worker, llarp.mainloop);
if (llarp_configure_router(llarp.router, llarp.config)) {
llarp.logic = llarp_init_logic(mem);
signal(SIGINT, handle_signal);
printf("starting router\n");
llarp_run_router(llarp.router, llarp.logic);
// run io loop
struct llarp_thread_job netjob = {
// run mainloop
struct llarp_thread_job job = {
.user = llarp.mainloop,
.work = &run_netio
.work = &run_net
};
llarp_threadpool_queue_job(llarp.netio, netjob);
llarp_threadpool_queue_job(llarp.thread, job);
printf("running\n");
llarp.exitcode = 0;
llarp_logic_mainloop(llarp.logic);

@ -213,6 +213,10 @@ struct server
//TODO: exponential backoff for cleanup timer ?
link->issue_cleanup_timer(orig);
}
else
{
printf("cleanup canceled\n");
}
}
static void handle_recvfrom(struct llarp_udp_io * udp, const struct sockaddr *saddr, const void * buf, ssize_t sz)

@ -27,14 +27,8 @@ void llarp_free_logic(struct llarp_logic** logic) {
}
}
static void llarp_logic_stop_work(void* user) {
struct llarp_logic* logic = user;
llarp_timer_stop(logic->timer);
}
void llarp_logic_stop(struct llarp_logic* logic) {
struct llarp_thread_job job = {.user = logic, .work = &llarp_logic_stop_work};
llarp_threadpool_queue_job(logic->thread, job);
llarp_timer_stop(logic->timer);
llarp_threadpool_stop(logic->thread);
llarp_threadpool_join(logic->thread);
}
@ -42,7 +36,6 @@ void llarp_logic_stop(struct llarp_logic* logic) {
void llarp_logic_mainloop(struct llarp_logic* logic) {
llarp_threadpool_start(logic->thread);
llarp_timer_run(logic->timer, logic->thread);
llarp_threadpool_wait(logic->thread);
}
uint32_t llarp_logic_call_later(struct llarp_logic* logic, struct llarp_timeout_job job)

@ -34,6 +34,7 @@ void Pool::Stop() {
void Pool::Join() {
for (auto &t : threads) t.join();
threads.clear();
}
void Pool::QueueJob(const llarp_thread_job &job) {

@ -113,8 +113,18 @@ void llarp_free_timer(struct llarp_timer_context** t) {
}
void llarp_timer_stop(struct llarp_timer_context* t) {
t->cancel_all();
t->stop();
{
std::unique_lock<std::mutex> lock(t->tickerMutex);
auto itr = t->timers.begin();
while (itr != t->timers.end())
{
// timer expired
llarp_threadpool_queue_job(t->threadpool, itr->second);
++itr;
}
}
}
void llarp_timer_cancel(struct llarp_timer_context* t, uint32_t id) {
@ -137,7 +147,7 @@ void llarp_timer_run(struct llarp_timer_context* t,
// timer hit
llarp_threadpool_queue_job(pool, itr->second);
}
++itr;
++itr;
}
}
}

Loading…
Cancel
Save