make it compile and link

pull/1/head
Jeff Becker 6 years ago
parent f50a49b7cb
commit a704a28f34
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -3,6 +3,7 @@
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <sys/socket.h>
#ifdef __cplusplus
@ -35,14 +36,19 @@ extern "C" {
struct llarp_ev_job
{
/** the loop this job belongs to */
struct llarp_ev_loop * loop;
/** user data */
void * user;
/** work is called async when ready in the event loop thread */
void (*work)(struct llarp_ev_job *);
};
/** call work async in event loop thread (thread safe) */
void llarp_ev_async(struct llarp_ev_loop * ev, struct llarp_ev_job job);
/**
call work async in event loop thread (thread safe)
return true if we queued the job otherwise return false
*/
bool llarp_ev_async(struct llarp_ev_loop * ev, struct llarp_ev_job job);
#ifdef __cplusplus
}

@ -13,10 +13,14 @@ extern "C" {
/** job to be done in worker thread */
struct llarp_thread_job
{
/** calls result async after work is executed */
/**
called async after work is executed
*/
struct llarp_ev_job * result;
/** user data to pass to work function */
void * user;
/** called in threadpool worker thread */
void (*work)(struct llarp_thread_job *);
void (*work)(void *);
};
void llarp_threadpool_queue_job(struct llarp_threadpool * tp, struct llarp_thread_job j);

@ -27,6 +27,11 @@ namespace llarp
{
return llarp_g_mem.alloc(sz, alignment<udp_listener>());
}
static void operator delete(void * ptr)
{
llarp_g_mem.free(ptr);
}
uv_udp_t _handle;
struct llarp_udp_listener * listener;
@ -65,11 +70,29 @@ namespace llarp
{
udp_listener * l = static_cast<udp_listener *>(handle->data);
l->closed();
llarp_g_mem.free(l);
delete l;
}
}
namespace llarp
{
static void ev_handle_async_closed(uv_handle_t * handle)
{
struct llarp_ev_job * ev = static_cast<llarp_ev_job *>(handle->data);
llarp_g_mem.free(ev);
llarp_g_mem.free(handle);
}
static void ev_handle_async(uv_async_t * handle)
{
struct llarp_ev_job * ev = static_cast<llarp_ev_job *>(handle->data);
ev->work(ev);
uv_close((uv_handle_t *)handle, ev_handle_async_closed);
}
}
extern "C" {
void llarp_ev_loop_alloc(struct llarp_ev_loop ** ev)
{
@ -135,4 +158,27 @@ extern "C" {
}
return ret;
}
void llarp_ev_loop_stop(struct llarp_ev_loop * loop)
{
uv_stop(loop->loop());
}
bool llarp_ev_async(struct llarp_ev_loop * loop, struct llarp_ev_job job)
{
struct llarp_ev_job * job_copy = static_cast<struct llarp_ev_job *>(llarp_g_mem.alloc(sizeof(struct llarp_ev_job), llarp::alignment<llarp_ev_job>()));
job_copy->work = job.work;
job_copy->loop = loop;
job_copy->user = job.user;
uv_async_t * async = static_cast<uv_async_t *>(llarp_g_mem.alloc(sizeof(uv_async_t), llarp::alignment<uv_async_t>()));
async->data = job_copy;
if(uv_async_init(loop->loop(), async, llarp::ev_handle_async) == 0 && uv_async_send(async))
return true;
else
{
llarp_g_mem.free(job_copy);
llarp_g_mem.free(async);
return false;
}
}
}

@ -7,43 +7,30 @@ bool operator < (const sockaddr_in6 addr0, const sockaddr_in6 addr1)
return memcmp(addr0.sin6_addr.s6_addr, addr1.sin6_addr.s6_addr, sizeof(addr0.sin6_addr)) && addr0.sin6_port < addr1.sin6_port;
}
namespace llarp
{
extern "C"{
struct llarp_link * llarp_link_alloc()
{
return new llarp_link;
}
static void link_recv_from(struct llarp_udp_listener * l, const struct sockaddr * src, char * buff, ssize_t sz)
void llarp_link_free(struct llarp_link ** l)
{
if(src && src->sa_family == AF_INET6)
{
Link * link = static_cast<Link*>(l->user);
struct sockaddr_in6 remote;
memcpy(&remote, src, sizeof(sockaddr_in6));
auto itr = link->sessions.find(remote);
if(itr == link->sessions.end())
{
link->sessions[remote] = std::make_shared<PeerSession>(link->_crypto, remote);
}
link->sessions[remote]->RecvFrom(buff, sz);
}
if(*l) delete *l;
*l =nullptr;
}
Link::Link(llarp_crypto * crypto) : _crypto(crypto)
struct llarp_udp_listener * llarp_link_udp_listener(struct llarp_link *l)
{
_listener.user = this;
_listener.recvfrom = link_recv_from;
return &l->listener;
}
PeerSession::PeerSession(llarp_crypto * crypto, sockaddr_in6 remote) :
lastRX(0),
remoteAddr(remote),
_crypto(crypto),
state(eHandshakeInboundInit)
bool llarp_link_configure(struct llarp_link * link, const char * ifname, int af)
{
memset(sessionKey, 0, sizeof(sessionKey));
return false;
}
void PeerSession::RecvFrom(const char * buff, ssize_t sz)
void llarp_link_stop(struct llarp_link * link)
{
lastRX = llarp_time_now_ms();
llarp_ev_close_udp_listener(&link->listener);
}
}

@ -9,66 +9,25 @@
#include <llarp/ev.h>
#include <llarp/router_contact.h>
#include "mem.hpp"
namespace llarp
struct llarp_link
{
struct Link;
struct PeerSession
static void * operator new(size_t sz)
{
sockaddr_in6 remoteAddr;
llarp_rc rc;
llarp_sharedkey_t sessionKey;
uint64_t lastRX;
llarp_crypto * _crypto;
enum State
{
eStateNULL,
eHandshakeInboundInit,
eHandshakeOutboundInit,
eHandshakeInboundRepliedInit,
eHandshakeOutboundGotReply,
eHandshakeInboundGotAck,
eHandshakeOutboundGotAck,
eEstablished,
eTimeout
};
State state;
/** inbound session */
PeerSession(llarp_crypto * crypto, sockaddr_in6 remote);
/** outbound session */
PeerSession(llarp_crypto * crypto, llarp_rc rc);
void SendTo(Link * link, const char * buff, std::size_t sz);
void RecvFrom(const char * buff, ssize_t sz);
typedef std::shared_ptr<PeerSession> Ptr;
};
return llarp_g_mem.alloc(sz, llarp::alignment<llarp_link>());
}
struct Link
static void operator delete(void * ptr)
{
typedef std::map<sockaddr_in6, PeerSession::Ptr> Sessions;
Sessions sessions;
llarp_seckey_t transportSecKey;
llarp_pubkey_t transportPubKey;
llarp_crypto * _crypto;
Link(llarp_crypto * crypto);
llarp_udp_listener _listener;
llarp_udp_listener * Listener() { return &_listener; }
};
}
llarp_g_mem.free(ptr);
}
struct sockaddr_in6 localaddr;
int af;
llarp_udp_listener listener;
};
#endif

@ -89,14 +89,17 @@ extern "C" {
});
}
void llarp_stop_router(struct llarp_router * router)
{
router->Close();
}
void llarp_free_router(struct llarp_router ** router)
{
if(*router)
{
llarp_router * r = *router;
r->Close();
r->ForEachLink([](llarp_link * link) { llarp_g_mem.free(link); });
delete r;
(*router)->ForEachLink([](llarp_link * link) { llarp_g_mem.free(link); });
delete *router;
}
*router = nullptr;
}

@ -0,0 +1,96 @@
#include "threadpool.hpp"
#include <iostream>
namespace llarp
{
namespace thread
{
Pool::Pool(size_t workers)
{
stop.store(true);
while(workers--)
{
threads.emplace_back(
[this]
{
for(;;)
{
llarp_thread_job job;
{
lock_t lock(this->queue_mutex);
this->condition.wait(lock, [this]{ return this->stop || !this->jobs.empty(); });
if(this->stop && this->jobs.empty())
return;
job = std::move(this->jobs.front());
this->jobs.pop();
}
// do work
job.work(job.user);
// inform result if needed
if(job.result && job.result->loop)
if(!llarp_ev_async(job.result->loop, *job.result))
{
std::cerr << "failed to queue result in thread worker" << std::endl;
}
}
});
}
}
void Pool::Join()
{
{
lock_t lock(queue_mutex);
stop.store(true);
}
condition.notify_all();
for(auto & t : threads)
t.join();
}
void Pool::QueueJob(llarp_thread_job job)
{
{
lock_t lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
jobs.emplace(job);
}
condition.notify_one();
}
}
}
struct llarp_threadpool
{
llarp::thread::Pool impl;
llarp_threadpool(int workers) : impl(workers) {}
};
extern "C" {
struct llarp_threadpool * llarp_init_threadpool(int workers)
{
if(workers > 0)
return new llarp_threadpool(workers);
else
return nullptr;
}
void llarp_threadpool_join(struct llarp_threadpool * pool)
{
pool->impl.Join();
}
void llarp_free_threadpool(struct llarp_threadpool ** pool)
{
delete *pool;
*pool = nullptr;
}
}

@ -0,0 +1,36 @@
#ifndef LLARP_THREADPOOL_HPP
#define LLARP_THREADPOOL_HPP
#include <llarp/threadpool.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
namespace llarp
{
namespace thread
{
typedef std::mutex mtx_t;
typedef std::unique_lock<mtx_t> lock_t;
struct Pool
{
Pool(size_t sz);
void QueueJob(llarp_thread_job job);
void Join();
std::vector<std::thread> threads;
std::queue<llarp_thread_job> jobs;
mtx_t queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop;
};
}
}
#endif
Loading…
Cancel
Save