named threadpool

pull/1/head
Jeff Becker 6 years ago
parent bffb4c3f3a
commit cc97da29d4
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -1,5 +1,5 @@
[router] [router]
threads=1 threads=8
contact-file=router-contact.signed contact-file=router-contact.signed
[netdb] [netdb]

@ -2,7 +2,7 @@
#include <stdio.h> #include <stdio.h>
#include <signal.h> #include <signal.h>
#include <string.h> #include <string.h>
#include <pthread.h>
static void progress() { static void progress() {
printf("."); printf(".");
@ -80,8 +80,8 @@ void iter_main_config(struct llarp_config_iterator *itr, const char *section,
if (!strcmp(section, "router")) { if (!strcmp(section, "router")) {
if (!strcmp(key, "threads")) { if (!strcmp(key, "threads")) {
int workers = atoi(val); int workers = atoi(val);
if (workers > 0 && m->worker == NULL) { if (workers > 0 && m->worker == nullptr) {
m->worker = llarp_init_threadpool(workers); m->worker = llarp_init_threadpool(workers, "llarp-crypto-worker");
} }
} }
} }
@ -93,7 +93,7 @@ void iter_main_config(struct llarp_config_iterator *itr, const char *section,
} }
llarp_main llarp; llarp_main * llarp = nullptr;
void run_net(void * user) void run_net(void * user)
{ {
@ -102,59 +102,60 @@ void run_net(void * user)
void handle_signal(int sig) void handle_signal(int sig)
{ {
printf("interrupted\n"); printf("\ninterrupted\n");
llarp_ev_loop_stop(llarp.mainloop); llarp_ev_loop_stop(llarp->mainloop);
llarp_logic_stop(llarp.logic); llarp_logic_stop(llarp->logic);
printf("closing...");
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
const char *conffname = "daemon.ini"; const char *conffname = "daemon.ini";
if (argc > 1) conffname = argv[1]; if (argc > 1) conffname = argv[1];
llarp_mem_jemalloc(&llarp.mem); llarp = new llarp_main;
auto mem = &llarp.mem; llarp_mem_jemalloc(&llarp->mem);
llarp_new_config(&llarp.config); auto mem = &llarp->mem;
llarp_ev_loop_alloc(&llarp.mainloop); llarp_new_config(&llarp->config);
llarp_crypto_libsodium_init(&llarp.crypto); llarp_ev_loop_alloc(&llarp->mainloop);
llarp_crypto_libsodium_init(&llarp->crypto);
printf("%s loading config file %s\n", LLARP_VERSION, conffname); printf("%s loading config file %s\n", LLARP_VERSION, conffname);
if (!llarp_load_config(llarp.config, conffname)) { if (!llarp_load_config(llarp->config, conffname)) {
llarp_config_iterator iter; llarp_config_iterator iter;
iter.user = &llarp; iter.user = llarp;
iter.visit = iter_main_config; iter.visit = iter_main_config;
llarp_config_iter(llarp.config, &iter); llarp_config_iter(llarp->config, &iter);
llarp.nodedb = llarp_nodedb_new(mem, &llarp.crypto); llarp->nodedb = llarp_nodedb_new(mem, &llarp->crypto);
if (llarp.nodedb_dir[0]) { if (llarp->nodedb_dir[0]) {
llarp.nodedb_dir[sizeof(llarp.nodedb_dir) - 1] = 0; llarp->nodedb_dir[sizeof(llarp->nodedb_dir) - 1] = 0;
char *dir = llarp.nodedb_dir; char *dir = llarp->nodedb_dir;
if (llarp_nodedb_ensure_dir(dir)) { if (llarp_nodedb_ensure_dir(dir)) {
// ensure worker thread pool // ensure worker thread pool
if (!llarp.worker) llarp.worker = llarp_init_threadpool(2); if (!llarp->worker) llarp->worker = llarp_init_threadpool(2, "llarp-crypto-worker");
// ensure logic thread // ensure netio thread
llarp.thread = llarp_init_threadpool(1); llarp->thread = llarp_init_threadpool(1, "llarp-netio");
llarp.logic = llarp_init_logic(mem); llarp->logic = llarp_init_logic(mem);
llarp.router = llarp_init_router(mem, llarp.worker, llarp.mainloop, llarp.logic); llarp->router = llarp_init_router(mem, llarp->worker, llarp->mainloop, llarp->logic);
if (llarp_configure_router(llarp.router, llarp.config)) { if (llarp_configure_router(llarp->router, llarp->config)) {
signal(SIGINT, handle_signal); signal(SIGINT, handle_signal);
printf("starting router\n"); printf("starting router\n");
llarp_run_router(llarp.router); llarp_run_router(llarp->router);
// run mainloop // run mainloop
llarp_threadpool_queue_job(llarp.thread, {llarp.mainloop, &run_net}); llarp_threadpool_queue_job(llarp->thread, {llarp->mainloop, &run_net});
printf("running\n"); printf("running\n");
llarp.exitcode = 0; pthread_setname_np(pthread_self(), "llarp-ticker");
llarp_logic_mainloop(llarp.logic); llarp->exitcode = 0;
llarp_logic_mainloop(llarp->logic);
} else } else
printf("Failed to configure router\n"); printf("Failed to configure router\n");
} else } else
printf("failed to initialize nodedb at %s\n", dir); printf("failed to initialize nodedb at %s\n", dir);
} else } else
printf("no nodedb defined\n"); printf("no nodedb defined\n");
return llarp.shutdown(); return llarp->shutdown();
} else } else
printf("Failed to load config %s\n", conffname); printf("Failed to load config %s\n", conffname);
delete llarp;
return 1; return 1;
} }

@ -6,7 +6,7 @@ extern "C" {
struct llarp_threadpool; struct llarp_threadpool;
struct llarp_threadpool *llarp_init_threadpool(int workers); struct llarp_threadpool *llarp_init_threadpool(int workers, const char * name);
void llarp_free_threadpool(struct llarp_threadpool **tp); void llarp_free_threadpool(struct llarp_threadpool **tp);
typedef void (*llarp_thread_work_func)(void *); typedef void (*llarp_thread_work_func)(void *);
@ -22,7 +22,6 @@ struct llarp_thread_job {
void llarp_threadpool_queue_job(struct llarp_threadpool *tp, void llarp_threadpool_queue_job(struct llarp_threadpool *tp,
struct llarp_thread_job j); struct llarp_thread_job j);
void llarp_threadpool_start(struct llarp_threadpool *tp);
void llarp_threadpool_stop(struct llarp_threadpool *tp); void llarp_threadpool_stop(struct llarp_threadpool *tp);
void llarp_threadpool_join(struct llarp_threadpool *tp); void llarp_threadpool_join(struct llarp_threadpool *tp);

@ -11,7 +11,7 @@ struct llarp_logic* llarp_init_logic(struct llarp_alloc * mem) {
struct llarp_logic* logic = mem->alloc(mem, sizeof(struct llarp_logic), 8); struct llarp_logic* logic = mem->alloc(mem, sizeof(struct llarp_logic), 8);
if (logic) { if (logic) {
logic->mem = mem; logic->mem = mem;
logic->thread = llarp_init_threadpool(1); logic->thread = llarp_init_threadpool(1, "llarp-logic");
logic->timer = llarp_init_timer(); logic->timer = llarp_init_timer();
} }
return logic; return logic;
@ -34,7 +34,6 @@ void llarp_logic_stop(struct llarp_logic* logic) {
} }
void llarp_logic_mainloop(struct llarp_logic* logic) { void llarp_logic_mainloop(struct llarp_logic* logic) {
llarp_threadpool_start(logic->thread);
llarp_timer_run(logic->timer, logic->thread); llarp_timer_run(logic->timer, logic->thread);
} }

@ -1,11 +1,16 @@
#include "threadpool.hpp" #include "threadpool.hpp"
#include <pthread.h>
#include <cstring>
namespace llarp { namespace llarp {
namespace thread { namespace thread {
Pool::Pool(size_t workers) { Pool::Pool(size_t workers, const char * name) {
stop = false; stop = false;
while (workers--) { while (workers--) {
threads.emplace_back([this] { threads.emplace_back([this, name] {
if(name)
pthread_setname_np(pthread_self(), name);
llarp_thread_job job; llarp_thread_job job;
for (;;) { for (;;) {
{ {
@ -55,14 +60,14 @@ void Pool::QueueJob(const llarp_thread_job &job) {
struct llarp_threadpool { struct llarp_threadpool {
llarp::thread::Pool impl; llarp::thread::Pool impl;
llarp_threadpool(int workers) : impl(workers) {} llarp_threadpool(int workers, const char * name) : impl(workers, name) {}
}; };
extern "C" { extern "C" {
struct llarp_threadpool *llarp_init_threadpool(int workers) { struct llarp_threadpool *llarp_init_threadpool(int workers, const char * name) {
if (workers > 0) if (workers > 0)
return new llarp_threadpool(workers); return new llarp_threadpool(workers, name);
else else
return nullptr; return nullptr;
} }

@ -14,7 +14,7 @@ namespace thread {
typedef std::mutex mtx_t; typedef std::mutex mtx_t;
typedef std::unique_lock<mtx_t> lock_t; typedef std::unique_lock<mtx_t> lock_t;
struct Pool { struct Pool {
Pool(size_t sz); Pool(size_t sz, const char * name);
void QueueJob(const llarp_thread_job& job); void QueueJob(const llarp_thread_job& job);
void Join(); void Join();
void Stop(); void Stop();

Loading…
Cancel
Save