more stuff

pull/1/head
Jeff Becker 6 years ago
parent fbf2778453
commit fe19314484
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -69,6 +69,9 @@ llarp_ai_list_pushback(struct llarp_ai_list *l, struct llarp_ai *ai);
size_t
llarp_ai_list_size(struct llarp_ai_list *l);
void
llarp_ai_list_copy(struct llarp_ai_list *dst, struct llarp_ai_list *src);
/// does this index exist in list
bool
llarp_ai_list_index(struct llarp_ai_list *l, ssize_t idx,

@ -44,6 +44,9 @@ llarp_xi_list_bencode(struct llarp_xi_list *l, llarp_buffer_t *buf);
void
llarp_xi_list_pushback(struct llarp_xi_list *l, struct llarp_xi *xi);
void
llarp_xi_list_copy(struct llarp_xi_list *dst, struct llarp_xi_list *src);
void
llarp_xi_copy(struct llarp_xi *dst, struct llarp_xi *src);

@ -80,10 +80,6 @@ struct llarp_link
void (*iter_sessions)(struct llarp_link *, struct llarp_link_session_iter);
bool (*try_establish)(struct llarp_link *, struct llarp_link_establish_job *);
/**
struct llarp_link_session * (*acquire_session_for_addr)(struct llarp_link
*, const struct sockaddr *);
*/
void (*mark_session_active)(struct llarp_link *, struct llarp_link_session *);
void (*free_impl)(struct llarp_link *);
};

@ -2,6 +2,7 @@
#define LLARP_NODEDB_H
#include <llarp/common.h>
#include <llarp/crypto.h>
#include <llarp/router_contact.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -51,6 +52,13 @@ bool
llarp_nodedb_find_rc(struct llarp_nodedb *n, struct llarp_rc *rc,
llarp_pubkey_t k);
/**
return true if we have a rc with rc.k of value k on disk
otherwise return false
*/
bool
llarp_nodedb_has_rc(struct llarp_nodedb *n, llarp_pubkey_t k);
/**
put an rc into the node db
overwrites with new contents if already present
@ -60,6 +68,35 @@ llarp_nodedb_find_rc(struct llarp_nodedb *n, struct llarp_rc *rc,
bool
llarp_nodedb_put_rc(struct llarp_nodedb *n, struct llarp_rc *rc);
/**
struct for async rc verification
*/
struct llarp_async_verify_rc;
typedef void (*llarp_async_verify_rc_hook_func)(struct llarp_async_verify_rc *);
struct llarp_async_verify_rc
{
void *user;
struct llarp_rc rc;
bool valid;
llarp_async_verify_rc_hook_func hook;
};
/**
struct for async rc verification
data is loaded in disk io threadpool
crypto is done on the crypto worker threadpool
result is called on the logic thread
*/
void
llarp_nodedb_async_verify(struct llarp_nodedb *nodedb,
struct llarp_logic *logic,
struct llarp_crypto *crypto,
struct llarp_threadpool *cryptoworker,
struct llarp_threadpool *diskworker,
struct llarp_async_verify_rc *job);
#ifdef __cplusplus
}
#endif

@ -30,7 +30,7 @@ bool
llarp_configure_router(struct llarp_router *router, struct llarp_config *conf);
void
llarp_run_router(struct llarp_router *router);
llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb);
void
llarp_stop_router(struct llarp_router *router);

@ -210,6 +210,14 @@ llarp_ai_copy(struct llarp_ai *dst, struct llarp_ai *src)
memcpy(dst, src, sizeof(struct llarp_ai));
}
void
llarp_ai_list_copy(struct llarp_ai_list *dst, struct llarp_ai_list *src)
{
dst->list.clear();
for(auto &itr : src->list)
dst->list.emplace_back(itr);
}
void
llarp_ai_list_pushback(struct llarp_ai_list *l, struct llarp_ai *ai)
{

@ -3,7 +3,7 @@
#include <llarp.hpp>
#include "logger.hpp"
#if (__FreeBSD__)
#if(__FreeBSD__)
#include <pthread_np.h>
#endif
@ -95,15 +95,15 @@ namespace llarp
if(llarp_configure_router(router, config))
{
llarp_run_router(router);
llarp_run_router(router, nodedb);
// run net io thread
auto netio = mainloop;
while(num_nethreads--)
{
netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); });
#if (__APPLE__ && __MACH__)
#if(__APPLE__ && __MACH__)
#elif (__FreeBSD__)
#elif(__FreeBSD__)
pthread_set_name_np(netio_threads.back().native_handle(),
"llarp-netio");
#else

@ -7,6 +7,17 @@
namespace llarp
{
typedef std::array< uint8_t, sizeof(llarp_pubkey_t) > pubkey;
struct pubkeyhash
{
std::size_t
operator()(pubkey const& a) const noexcept
{
size_t sz = 0;
memcpy(&sz, a.data(), sizeof(size_t));
return sz;
}
};
}
#endif

@ -175,6 +175,14 @@ llarp_xi_list_decode_item(struct list_reader *r, bool more)
return llarp_xi_bdecode(&l->list.back(), r->buffer);
}
void
llarp_xi_list_copy(struct llarp_xi_list *dst, struct llarp_xi_list *src)
{
dst->list.clear();
for(auto &itr : src->list)
dst->list.emplace_back(itr);
}
bool
llarp_xi_list_bdecode(struct llarp_xi_list *l, llarp_buffer_t *buff)
{

@ -156,4 +156,14 @@ llarp_nodedb_load_dir(struct llarp_nodedb *n, const char *dir)
{
return n->Load(dir);
}
void
llarp_nodedb_async_verify(struct llarp_nodedb *nodedb,
struct llarp_logic *logic,
struct llarp_crypto *crypto,
struct llarp_threadpool *cryptoworker,
struct llarp_threadpool *diskworker,
struct llarp_async_verify_rc *job)
{
}
}

@ -18,6 +18,13 @@ namespace llarp
void
router_iter_config(llarp_config_iterator *iter, const char *section,
const char *key, const char *val);
struct async_verify_context
{
llarp_router *router;
llarp_link_establish_job *establish_job;
};
} // namespace llarp
llarp_router::llarp_router() : ready(false), inbound_msg_handler(this)
@ -164,43 +171,108 @@ llarp_router::connect_job_retry(void *user)
}
void
llarp_router::on_try_connect_result(llarp_link_establish_job *job)
llarp_router::on_verify_client_rc(llarp_async_verify_rc *job)
{
llarp_router *router = static_cast< llarp_router * >(job->user);
if(job->session)
llarp::async_verify_context *ctx =
static_cast< llarp::async_verify_context * >(job->user);
llarp_rc_free(&job->rc);
delete ctx;
}
void
llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
{
llarp::async_verify_context *ctx =
static_cast< llarp::async_verify_context * >(job->user);
auto router = ctx->router;
if(!job->valid)
{
llarp::Warn(__FILE__, "invalid server RC");
if(ctx->establish_job)
{
// was an outbound attempt
auto session = ctx->establish_job->session;
if(session)
session->close(session);
delete ctx->establish_job;
}
llarp_rc_free(&job->rc);
delete job;
return;
}
llarp::Debug(__FILE__, "rc verified");
// this was an outbound establish job
if(ctx->establish_job->session)
{
llarp_rc *remote = job->session->get_remote_router(job->session);
if(remote)
auto session = ctx->establish_job->session;
llarp::pubkey pubkey;
memcpy(&pubkey[0], job->rc.pubkey, pubkey.size());
auto v = router->validRouters.find(pubkey);
if(v != router->validRouters.end())
{
// free previous RC members
llarp_rc_free(&v->second);
}
router->validRouters[pubkey] = job->rc;
auto itr = router->pendingMessages.find(pubkey);
if(itr != router->pendingMessages.end())
{
llarp::pubkey pubkey;
memcpy(&pubkey[0], remote->pubkey, 32);
char tmp[68] = {0};
const char *pubkeystr =
llarp::HexEncode< decltype(pubkey), decltype(tmp) >(pubkey, tmp);
llarp::Info(__FILE__, "session established with ", pubkeystr);
auto itr = router->pendingMessages.find(pubkey);
if(itr != router->pendingMessages.end())
// flush pending
if(itr->second.size())
{
// flush pending
if(itr->second.size())
{
llarp::Info(__FILE__, pubkeystr, " flush ", itr->second.size(),
" pending messages");
}
for(auto &msg : itr->second)
{
auto buf = llarp::Buffer< decltype(msg) >(msg);
job->session->sendto(job->session, buf);
}
router->pendingMessages.erase(itr);
llarp::Debug(__FILE__, "flush ", itr->second.size(),
" pending messages");
}
delete job;
return;
for(auto &msg : itr->second)
{
auto buf = llarp::Buffer< decltype(msg) >(msg);
session->sendto(session, buf);
}
router->pendingMessages.erase(itr);
}
}
else
llarp_rc_free(&job->rc);
delete job;
}
void
llarp_router::on_try_connect_result(llarp_link_establish_job *job)
{
llarp_router *router = static_cast< llarp_router * >(job->user);
if(job->session)
{
auto session = job->session;
router->async_verify_RC(session, false, job);
return;
}
llarp::Info(__FILE__, "session not established");
llarp_logic_queue_job(router->logic, {job, &llarp_router::connect_job_retry});
}
void
llarp_router::async_verify_RC(llarp_link_session *session,
bool isExpectingClient,
llarp_link_establish_job *establish_job)
{
llarp_async_verify_rc *job = new llarp_async_verify_rc{
new llarp::async_verify_context{this, establish_job},
{},
false,
nullptr,
};
llarp_rc_copy(&job->rc, session->get_remote_router(session));
if(isExpectingClient)
job->hook = &llarp_router::on_verify_client_rc;
else
job->hook = &llarp_router::on_verify_server_rc;
llarp_nodedb_async_verify(nodedb, logic, &crypto, tp, disk, job);
}
void
llarp_router::Run()
@ -286,6 +358,8 @@ llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop,
router->netloop = netloop;
router->tp = tp;
router->logic = logic;
// TODO: make disk io threadpool count configurable
router->disk = llarp_init_threadpool(1, "llarp-diskio");
llarp_crypto_libsodium_init(&router->crypto);
}
return router;
@ -306,8 +380,9 @@ llarp_configure_router(struct llarp_router *router, struct llarp_config *conf)
}
void
llarp_run_router(struct llarp_router *router)
llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb)
{
router->nodedb = nodedb;
router->Run();
}

@ -1,12 +1,14 @@
#ifndef LLARP_ROUTER_HPP
#define LLARP_ROUTER_HPP
#include <llarp/link.h>
#include <llarp/nodedb.h>
#include <llarp/path.h>
#include <llarp/router.h>
#include <llarp/router_contact.h>
#include <functional>
#include <list>
#include <map>
#include <unordered_map>
#include <llarp/link_message.hpp>
@ -48,6 +50,9 @@ struct llarp_router
llarp_crypto crypto;
llarp_path_context *paths;
llarp_seckey_t identity;
llarp_threadpool *disk;
llarp_nodedb *nodedb;
llarp::InboundMessageHandler inbound_msg_handler;
@ -55,6 +60,8 @@ struct llarp_router
std::map< llarp::pubkey, std::vector< llarp::Message > > pendingMessages;
std::unordered_map< llarp::pubkey, llarp_rc, llarp::pubkeyhash > validRouters;
llarp_router();
~llarp_router();
@ -88,15 +95,16 @@ struct llarp_router
void
try_connect(fs::path rcfile);
bool
has_session_to(const uint8_t *pubkey) const;
void
QueueSendTo(const byte_t *pubkey, const std::vector< llarp::Message > &msgs);
bool
ProcessLRCM(llarp::LR_CommitMessage msg);
void
async_verify_RC(llarp_link_session *session, bool isExpectingClient,
llarp_link_establish_job *job = nullptr);
static bool
iter_try_connect(llarp_router_link_iter *i, llarp_router *router,
llarp_link *l);
@ -106,6 +114,12 @@ struct llarp_router
static void
connect_job_retry(void *user);
static void
on_verify_client_rc(llarp_async_verify_rc *context);
static void
on_verify_server_rc(llarp_async_verify_rc *context);
};
#endif

@ -92,6 +92,27 @@ llarp_rc_decode_dict(struct dict_reader *r, llarp_buffer_t *key)
return false;
}
void
llarp_rc_copy(struct llarp_rc *dst, struct llarp_rc *src)
{
llarp_rc_free(dst);
memcpy(dst->pubkey, src->pubkey, sizeof(llarp_pubkey_t));
memcpy(dst->signature, src->signature, sizeof(llarp_sig_t));
dst->last_updated = src->last_updated;
if(src->addrs)
{
dst->addrs = llarp_ai_list_new();
llarp_ai_list_copy(dst->addrs, src->addrs);
}
if(src->exits)
{
dst->exits = llarp_xi_list_new();
llarp_xi_list_copy(dst->exits, src->exits);
}
}
bool
llarp_rc_bdecode(struct llarp_rc *rc, llarp_buffer_t *buff)
{

Loading…
Cancel
Save