initial dht implementation

pull/1/head
Jeff Becker 6 years ago
parent bcb5e4fcbb
commit 620b9616a6
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -2,6 +2,7 @@
#define LLARP_DHT_H_
#include <llarp/buffer.h>
#include <llarp/router.h>
/**
* dht.h
@ -17,7 +18,7 @@ struct llarp_dht_context;
/// allocator
struct llarp_dht_context*
llarp_dht_context_new();
llarp_dht_context_new(struct llarp_router* parent);
/// deallocator
void
@ -31,14 +32,40 @@ struct llarp_dht_msg;
typedef bool (*llarp_dht_msg_handler)(struct llarp_dht_msg*,
struct llarp_dht_msg*);
/// start dht context with our location in keyspace
void
llarp_dht_context_set_our_key(struct llarp_dht_context* ctx, const byte_t* key);
llarp_dht_context_start(struct llarp_dht_context* ctx, const byte_t* key);
// override dht message handler with custom handler
void
llarp_dht_set_msg_handler(struct llarp_dht_context* ctx,
llarp_dht_msg_handler func);
struct llarp_router_lookup_job;
typedef void (*llarp_rotuer_lookup_handler)(struct llarp_router_lookup_job*);
struct llarp_router_lookup_job
{
void* user;
llarp_rotuer_lookup_handler hook;
struct llarp_dht_context* dht;
llarp_pubkey_t target;
bool found;
llarp_rc result;
};
// shallow copy
void
llarp_dht_put_local_router(struct llarp_dht_context* ctx, struct llarp_rc* rc);
void
llarp_dht_remove_local_router(struct llarp_dht_context* ctx, const byte_t* id);
void
llarp_dht_lookup_router(struct llarp_dht_context* ctx,
struct llarp_router_lookup_job* job);
#ifdef __cplusplus
}
#endif

@ -4,9 +4,11 @@
#include <llarp/dht.h>
#include <llarp/router.h>
#include <llarp/router_contact.h>
#include <llarp/time.h>
#include <llarp/aligned.hpp>
#include <array>
#include <functional>
#include <map>
#include <vector>
@ -16,17 +18,20 @@ namespace llarp
{
const size_t MAX_MSG_SIZE = 2048;
struct SearchJob;
struct Node
{
llarp_rc rc;
llarp_rc* rc;
const byte_t*
ID() const;
Node();
~Node();
Node() : rc(nullptr)
{
}
Node(llarp_rc* other) : rc(other)
{
}
};
struct Key_t : public llarp::AlignedBuffer< 32 >
@ -55,6 +60,28 @@ namespace llarp
}
};
struct SearchJob
{
const static uint64_t JobTimeout = 30000;
SearchJob();
SearchJob(const Key_t& requestor, const Key_t& target,
llarp_router_lookup_job* job);
void
Completed(const llarp_rc* router, bool timeout = false) const;
bool
IsExpired(llarp_time_t now) const;
private:
llarp_time_t started;
Key_t requestor;
Key_t target;
llarp_router_lookup_job* job;
};
struct XorMetric
{
const Key_t& us;
@ -105,6 +132,10 @@ namespace llarp
bool
FindClosest(const Key_t& target, Key_t& result) const;
bool
FindCloseExcluding(const Key_t& target, Key_t& result,
const Key_t& exclude) const;
BucketStorage_t nodes;
};
@ -115,11 +146,78 @@ namespace llarp
llarp_dht_msg_handler custom_handler = nullptr;
SearchJob*
FindPendingTX(const Key_t& owner, uint64_t txid);
void
RemovePendingLookup(const Key_t& owner, uint64_t txid);
void
Init(const Key_t& us);
LookupRouter(const Key_t& target, const Key_t& whoasked,
const Key_t& askpeer,
llarp_router_lookup_job* job = nullptr);
void
LookupRouterViaJob(llarp_router_lookup_job* job);
void
LookupRouterRelayed(const Key_t& requester, uint64_t txid,
const Key_t& target,
std::vector< IMessage* >& replies);
void
Init(const Key_t& us, llarp_router* router);
void
QueueRouterLookup(llarp_router_lookup_job* job);
static void
handle_cleaner_timer(void* user, uint64_t orig, uint64_t left);
static void
queue_router_lookup(void* user);
llarp_router* router = nullptr;
Bucket* nodes = nullptr;
private:
Bucket* nodes = nullptr;
void
ScheduleCleanupTimer();
void
CleanupTX();
uint64_t ids;
struct TXOwner
{
Key_t requester = {0};
uint64_t txid = 0;
bool
operator==(const TXOwner& other) const
{
return txid == other.txid && requester == other.requester;
}
bool
operator<(const TXOwner& other) const
{
return txid < other.txid && requester < other.requester;
}
};
struct TXOwnerHash
{
std::size_t
operator()(TXOwner const& o) const noexcept
{
std::size_t sz2;
memcpy(&sz2, &o.requester[0], sizeof(std::size_t));
return o.txid ^ (sz2 << 1);
}
};
std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX;
Key_t ourKey;
};
@ -128,6 +226,16 @@ namespace llarp
GotRouterMessage(const Key_t& from) : IMessage(from)
{
}
GotRouterMessage(const Key_t& from, uint64_t id, const llarp_rc* result)
: IMessage(from), txid(id)
{
if(result)
{
R.emplace_back();
llarp_rc_clear(&R.back());
llarp_rc_copy(&R.back(), result);
}
}
~GotRouterMessage();
@ -152,6 +260,11 @@ namespace llarp
{
}
FindRouterMessage(const Key_t& from, const Key_t& target, uint64_t id)
: IMessage(from), K(target), txid(id)
{
}
~FindRouterMessage();
bool
@ -174,6 +287,8 @@ namespace llarp
struct llarp_dht_context
{
llarp::dht::Context impl;
llarp_router* parent;
llarp_dht_context(llarp_router* router);
};
#endif

@ -30,7 +30,7 @@ bool
llarp_rc_verify_sig(struct llarp_crypto *crypto, struct llarp_rc *rc);
void
llarp_rc_copy(struct llarp_rc *dst, struct llarp_rc *src);
llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src);
void
llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem,

@ -3,6 +3,10 @@
#include <llarp/messages/dht_immediate.hpp>
#include "router.hpp"
#include <sodium.h>
#include <set>
namespace llarp
{
DHTImmeidateMessage::~DHTImmeidateMessage()
@ -33,6 +37,7 @@ namespace llarp
if(!bencode_start_dict(buf))
return false;
// message type
if(!bencode_write_bytestring(buf, "a", 1))
return false;
if(!bencode_write_bytestring(buf, "m", 1))
@ -53,6 +58,7 @@ namespace llarp
if(!bencode_end(buf))
return false;
// protocol version
if(!bencode_write_version_entry(buf))
return false;
@ -96,6 +102,20 @@ namespace llarp
GotRouterMessage::HandleMessage(llarp_router *router,
std::vector< IMessage * > &replies) const
{
auto &dht = router->dht->impl;
auto pending = dht.FindPendingTX(From, txid);
if(pending)
{
if(R.size())
pending->Completed(&R[0]);
else
pending->Completed(nullptr);
dht.RemovePendingLookup(From, txid);
return true;
}
llarp::Warn("Got response for DHT transaction we are not tracking, txid=",
txid);
return false;
}
@ -106,7 +126,34 @@ namespace llarp
bool
FindRouterMessage::BEncode(llarp_buffer_t *buf) const
{
return false;
if(!bencode_start_dict(buf))
return false;
// message type
if(!bencode_write_bytestring(buf, "A", 1))
return false;
if(!bencode_write_bytestring(buf, "R", 1))
return false;
// key
if(!bencode_write_bytestring(buf, "K", 1))
return false;
if(!bencode_write_bytestring(buf, K.data(), K.size()))
return false;
// txid
if(!bencode_write_bytestring(buf, "T", 1))
return false;
if(!bencode_write_uint64(buf, txid))
return false;
// version
if(!bencode_write_bytestring(buf, "V", 1))
return false;
if(!bencode_write_uint64(buf, version))
return false;
return bencode_end(buf);
}
bool
@ -129,9 +176,7 @@ namespace llarp
}
if(llarp_buffer_eq(key, "V"))
{
if(!bencode_read_integer(val, &version))
return false;
return version == LLARP_PROTO_VERSION;
return bencode_read_integer(val, &version);
}
return false;
}
@ -140,12 +185,20 @@ namespace llarp
FindRouterMessage::HandleMessage(llarp_router *router,
std::vector< IMessage * > &replies) const
{
return false;
auto &dht = router->dht->impl;
auto pending = dht.FindPendingTX(From, txid);
if(pending)
{
llarp::Warn("Got duplicate DHT lookup from ", From, " txid=", txid);
return false;
}
dht.LookupRouterRelayed(From, txid, K, replies);
return true;
}
struct MessageDecoder
{
Key_t From;
const Key_t &From;
bool firstKey = true;
IMessage *msg = nullptr;
@ -181,6 +234,7 @@ namespace llarp
dec->msg = new GotRouterMessage(dec->From);
break;
default:
llarp::Warn("unknown dht message type: ", (char)*strbuf.base);
// bad msg type
return false;
}
@ -213,7 +267,8 @@ namespace llarp
{
ListDecoder(const Key_t &from, std::vector< IMessage * > &list)
: From(from), l(list){};
Key_t From;
const Key_t &From;
std::vector< IMessage * > &l;
static bool
@ -245,18 +300,77 @@ namespace llarp
return bencode_read_list(buf, &r);
}
Node::Node()
const byte_t *
Node::ID() const
{
if(rc)
return rc->pubkey;
else
return nullptr;
}
SearchJob::SearchJob()
{
started = 0;
requestor.Zero();
target.Zero();
}
SearchJob::SearchJob(const Key_t &asker, const Key_t &key,
llarp_router_lookup_job *j)
: started(llarp_time_now_ms()), requestor(asker), target(key), job(j)
{
}
void
SearchJob::Completed(const llarp_rc *router, bool timeout) const
{
if(job && job->hook)
{
if(router)
{
job->found = true;
llarp_rc_copy(&job->result, router);
}
job->hook(job);
}
}
bool
SearchJob::IsExpired(llarp_time_t now) const
{
return now - started >= JobTimeout;
}
bool
Bucket::FindClosest(const Key_t &target, Key_t &result) const
{
llarp_rc_clear(&rc);
auto itr = nodes.lower_bound(target);
if(itr == nodes.end())
return false;
result = itr->second.ID();
return true;
}
Node::~Node()
bool
Bucket::FindCloseExcluding(const Key_t &target, Key_t &result,
const Key_t &exclude) const
{
llarp_rc_free(&rc);
auto itr = nodes.lower_bound(target);
if(itr == nodes.end())
return false;
if(itr->second.ID() == exclude)
++itr;
if(itr == nodes.end())
return false;
result = itr->second.ID();
return true;
}
Context::Context()
{
randombytes((byte_t *)&ids, sizeof(uint64_t));
}
Context::~Context()
@ -266,20 +380,155 @@ namespace llarp
}
void
Context::Init(const Key_t &us)
Context::handle_cleaner_timer(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
ctx->CleanupTX();
}
void
Context::LookupRouterRelayed(const Key_t &requester, uint64_t txid,
const Key_t &target,
std::vector< IMessage * > &replies)
{
if(target == ourKey)
{
// we are the target, give them our RC
replies.push_back(new GotRouterMessage(requester, txid, &router->rc));
return;
}
Key_t next = ourKey;
nodes->FindClosest(target, next);
if(next == ourKey)
{
// we are closest and don't have a match
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
return;
}
if(next == target)
{
// we know it
replies.push_back(
new GotRouterMessage(requester, txid, nodes->nodes[target].rc));
return;
}
// ask neighbor
LookupRouter(target, requester, next);
}
void
Context::RemovePendingLookup(const Key_t &owner, uint64_t id)
{
auto itr = pendingTX.find({owner, id});
if(itr == pendingTX.end())
return;
pendingTX.erase(itr);
}
SearchJob *
Context::FindPendingTX(const Key_t &owner, uint64_t id)
{
auto itr = pendingTX.find({owner, id});
if(itr == pendingTX.end())
return nullptr;
else
return &itr->second;
}
void
Context::CleanupTX()
{
auto now = llarp_time_now_ms();
std::set< TXOwner > expired;
for(auto &item : pendingTX)
if(item.second.IsExpired(now))
expired.insert(item.first);
for(const auto &e : expired)
{
pendingTX[e].Completed(nullptr, true);
RemovePendingLookup(e.requester, e.txid);
if(e.requester != ourKey)
{
// inform not found
auto msg = new llarp::DHTImmeidateMessage(e.requester);
msg->msgs.push_back(
new GotRouterMessage(e.requester, e.txid, nullptr));
router->SendToOrQueue(e.requester, {msg});
}
}
ScheduleCleanupTimer();
}
void
Context::Init(const Key_t &us, llarp_router *r)
{
router = r;
ourKey = us;
nodes = new Bucket(ourKey);
}
void
Context::ScheduleCleanupTimer()
{
llarp_logic_call_later(router->logic,
{1000, this, &handle_cleaner_timer});
}
void
Context::LookupRouter(const Key_t &target, const Key_t &whoasked,
const Key_t &askpeer, llarp_router_lookup_job *job)
{
auto id = ++ids;
pendingTX[{whoasked, id}] = SearchJob(whoasked, target, job);
llarp::Info("Asking ", askpeer, " for router ", target, " for ",
whoasked);
auto msg = new llarp::DHTImmeidateMessage(askpeer);
msg->msgs.push_back(new FindRouterMessage(askpeer, target, id));
router->SendToOrQueue(askpeer, {msg});
}
void
Context::LookupRouterViaJob(llarp_router_lookup_job *job)
{
Key_t peer;
if(nodes->FindCloseExcluding(job->target, peer, ourKey))
LookupRouter(job->target, ourKey, peer, job);
else if(job->hook)
{
job->found = false;
job->hook(job);
}
}
void
Context::queue_router_lookup(void *user)
{
llarp_router_lookup_job *job =
static_cast< llarp_router_lookup_job * >(user);
job->dht->impl.LookupRouterViaJob(job);
}
}
}
llarp_dht_context::llarp_dht_context(llarp_router *router)
{
parent = router;
}
extern "C" {
struct llarp_dht_context *
llarp_dht_context_new()
llarp_dht_context_new(struct llarp_router *router)
{
return new llarp_dht_context;
return new llarp_dht_context(router);
}
void
@ -288,6 +537,23 @@ llarp_dht_context_free(struct llarp_dht_context *ctx)
delete ctx;
}
void
llarp_dht_put_local_router(struct llarp_dht_context *ctx, struct llarp_rc *rc)
{
ctx->impl.nodes->nodes[rc->pubkey] = rc;
}
void
llarp_dht_remove_local_router(struct llarp_dht_context *ctx, const byte_t *id)
{
auto &nodes = ctx->impl.nodes->nodes;
auto itr = nodes.find(id);
if(itr == nodes.end())
return;
nodes.erase(itr);
}
void
llarp_dht_set_msg_handler(struct llarp_dht_context *ctx,
llarp_dht_msg_handler handler)
@ -296,8 +562,17 @@ llarp_dht_set_msg_handler(struct llarp_dht_context *ctx,
}
void
llarp_dht_context_set_our_key(struct llarp_dht_context *ctx, const byte_t *key)
llarp_dht_context_start(struct llarp_dht_context *ctx, const byte_t *key)
{
ctx->impl.Init(key, ctx->parent);
}
void
llarp_dh_lookup_router(struct llarp_dht_context *ctx,
struct llarp_router_lookup_job *job)
{
ctx->impl.Init(key);
job->dht = ctx;
llarp_logic_queue_job(ctx->parent->logic,
{job, &llarp::dht::Context::queue_router_lookup});
}
}

@ -1232,6 +1232,10 @@ namespace iwp
[src](const auto &item) -> bool { return src == item.second; });
if(itr == std::end(m_Connected))
return;
// remove from dht tracking
llarp_dht_remove_local_router(router->dht, itr->first);
m_Connected.erase(itr);
}

@ -28,7 +28,7 @@ namespace llarp
} // namespace llarp
llarp_router::llarp_router()
: ready(false), dht(llarp_dht_context_new()), inbound_msg_parser(this)
: ready(false), dht(llarp_dht_context_new(this)), inbound_msg_parser(this)
{
llarp_rc_clear(&rc);
}
@ -310,6 +310,8 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
if(job->session)
{
auto session = job->session;
llarp_dht_put_local_router(router->dht,
session->get_remote_router(session));
router->async_verify_RC(session, false, job);
return;
}

@ -93,7 +93,7 @@ llarp_rc_decode_dict(struct dict_reader *r, llarp_buffer_t *key)
}
void
llarp_rc_copy(struct llarp_rc *dst, struct llarp_rc *src)
llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src)
{
llarp_rc_free(dst);
llarp_rc_clear(dst);

Loading…
Cancel
Save