more tweaking

pull/5/head
Jeff Becker 6 years ago
parent d987aa09c7
commit fb13c5ce3e
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -34,6 +34,10 @@ def main():
for nodeid in range(args.svc):
config = CP()
config['router'] = {
'net-threads': '1',
'worker-threads': '4'
}
config['bind'] = {
'lo': str(args.baseport + nodeid)
}
@ -56,6 +60,10 @@ def main():
for nodeid in range(args.clients):
config = CP()
config['router'] = {
'net-threads': '1',
'worker-threads': '2'
}
config['netdb'] = {
'dir': 'netdb'
}

@ -11,7 +11,24 @@ namespace llarp
{
namespace util
{
struct DummyMutex
{
};
template < typename Mutex_t >
struct DummyLock
{
DummyLock(const Mutex_t& mtx){
};
~DummyLock()
{
}
};
template < typename T, typename GetTime, typename PutTime,
typename Mutex_t = std::mutex,
typename Lock_t = std::unique_lock< Mutex_t >,
llarp_time_t dropMs = 20, llarp_time_t initialIntervalMs = 100 >
struct CoDelQueue
{
@ -28,10 +45,17 @@ namespace llarp
}
};
size_t
Size()
{
Lock_t lock(m_QueueMutex);
return m_Queue.size();
}
void
Put(const T& i)
{
std::unique_lock< std::mutex > lock(m_QueueMutex);
Lock_t lock(m_QueueMutex);
// llarp::Info("CoDelQueue::Put - adding item, queue now has ",
// m_Queue.size(), " items at ", getTime(*item));
PutTime()(i);
@ -46,7 +70,7 @@ namespace llarp
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
// auto start = llarp_time_now_ms();
// llarp::Info("CoDelQueue::Process - start at ", start);
std::unique_lock< std::mutex > lock(m_QueueMutex);
Lock_t lock(m_QueueMutex);
auto start = firstPut;
while(m_Queue.size())
{
@ -85,7 +109,7 @@ namespace llarp
llarp_time_t firstPut = 0;
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
std::mutex m_QueueMutex;
Mutex_t m_QueueMutex;
std::priority_queue< T, std::vector< T >, CoDelCompare > m_Queue;
std::string m_name;
};

@ -3,7 +3,9 @@
#include <llarp/router_contact.h>
#include <string.h>
#include <llarp/crypto.hpp>
#include <llarp/router_id.hpp>
#include "buffer.hpp"
#include "logger.hpp"
#include "mem.hpp"
struct llarp_async_iwp
@ -44,15 +46,12 @@ namespace iwp
iwp_async_intro *intro = static_cast< iwp_async_intro * >(user);
llarp::SharedSecret sharedkey;
llarp::ShortHash e_k;
llarp::SymmNonce n;
llarp_crypto *crypto = intro->iwp->crypto;
byte_t tmp[64];
// S = TKE(a.k, b.k, n)
crypto->transport_dh_client(sharedkey, intro->remote_pubkey,
intro->secretkey, intro->nonce);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
// copy nonce
memcpy(n, intro->nonce, 24);
// e_k = HS(b.k + n)
memcpy(tmp, intro->remote_pubkey, 32);
memcpy(tmp + 32, intro->nonce, 32);
@ -62,7 +61,7 @@ namespace iwp
buf.base = intro->buf + 64;
buf.cur = buf.base;
buf.sz = 32;
crypto->xchacha20(buf, e_k, n);
crypto->xchacha20(buf, e_k, intro->nonce);
// h = MDS( n + e + w0, S)
buf.base = intro->buf + 32;
buf.cur = buf.base;
@ -81,27 +80,24 @@ namespace iwp
llarp::SharedSecret sharedkey;
llarp::ShortHash e_K;
llarp::SharedSecret h;
llarp::SymmNonce N;
byte_t tmp[64];
auto OurPK = llarp::seckey_topublic(intro->secretkey);
const auto OurPK = llarp::seckey_topublic(intro->secretkey);
// e_k = HS(b.k + n)
memcpy(tmp, OurPK, 32);
memcpy(tmp + 32, intro->nonce, 32);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
crypto->shorthash(e_K, buf);
// a.k = SD(x, e_k, n[0:24])
memcpy(N, intro->nonce, 24);
buf.base = intro->remote_pubkey;
buf.cur = buf.base;
buf.sz = 32;
memcpy(intro->remote_pubkey, intro->buf + 64, 32);
crypto->xchacha20(buf, e_K, N);
crypto->xchacha20(buf, e_K, intro->nonce);
llarp::Info("handshake from ", llarp::RouterID(intro->remote_pubkey));
// S = TKE(a.k, b.k, n)
crypto->transport_dh_server(sharedkey, intro->remote_pubkey,
intro->secretkey, intro->nonce);
// h = MDS( n + e + w2 )
// h = MDS( n + e + w2, S)
buf.base = intro->buf + 32;
buf.cur = buf.base;
buf.sz = intro->sz - 32;

@ -430,6 +430,48 @@ namespace iwp
}
};
struct InboundMessage
{
uint64_t msgid;
std::vector< byte_t > msg;
llarp_time_t queued = 0;
InboundMessage(uint64_t id, const std::vector< byte_t > &m)
: msgid(id), msg(m)
{
}
bool
operator<(const InboundMessage &other) const
{
return msgid < other.msgid;
}
llarp_buffer_t
Buffer()
{
return llarp::Buffer< decltype(msg) >(msg);
}
struct GetTime
{
llarp_time_t
operator()(const InboundMessage *msg)
{
return msg->queued;
}
};
struct PutTime
{
void
operator()(InboundMessage *msg)
{
msg->queued = llarp_time_now_ms();
}
};
};
struct frame_state
{
byte_t rxflags = 0;
@ -441,11 +483,22 @@ namespace iwp
std::unordered_map< uint64_t, transit_message * > tx;
typedef std::queue< sendbuf_t * > sendqueue_t;
typedef llarp::util::CoDelQueue<
InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime,
llarp::util::DummyMutex,
llarp::util::DummyLock< llarp::util::DummyMutex > >
recvqueue_t;
llarp_router *router = nullptr;
llarp_link_session *parent = nullptr;
sendqueue_t sendqueue;
recvqueue_t recvqueue;
uint64_t nextMsgID = 0;
frame_state() : recvqueue("iwp_inbound_message")
{
}
/// return true if both sides have the same state flags
bool
@ -454,6 +507,24 @@ namespace iwp
return ((rxflags & flags) & (txflags & flags)) == flags;
}
bool
process_inbound_queue()
{
std::queue< InboundMessage * > q;
recvqueue.Process(q);
while(q.size())
{
// TODO: is this right?
nextMsgID = std::max(nextMsgID, q.front()->msgid);
if(!router->HandleRecvLinkMessage(parent, q.front()->Buffer()))
llarp::Warn("failed to process inbound message ", q.front()->msgid);
delete q.front();
q.pop();
}
// TODO: this isn't right
return true;
}
void
clear()
{
@ -568,7 +639,7 @@ namespace iwp
if(itr == rx.end())
{
llarp::Warn("no such RX fragment, msgid=", msgid);
return false;
return true;
}
auto fragsize = itr->second->msginfo.fragsize();
if(fragsize != sz - 9)
@ -772,8 +843,11 @@ namespace iwp
, addr(a)
, state(eInitial)
{
eph_seckey = seckey;
llarp::Zero(&remote_router, sizeof(llarp_rc));
if(seckey)
eph_seckey = seckey;
else
crypto->encryption_keygen(eph_seckey);
llarp_rc_clear(&remote_router);
crypto->randbytes(token, 32);
llarp::Info("session created");
}
@ -936,6 +1010,7 @@ namespace iwp
switch(state)
{
case eInitial:
case eIntroRecv:
// got intro
llarp::Debug("session recv - intro");
on_intro(buf, sz);
@ -1086,15 +1161,7 @@ namespace iwp
handle_verify_introack(iwp_async_introack *introack);
static void
handle_generated_session_start(iwp_async_session_start *start)
{
session *link = static_cast< session * >(start->user);
link->working = false;
if(llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz)
== -1)
llarp::Error("sendto failed");
link->EnterState(eSessionStartSent);
}
handle_generated_session_start(iwp_async_session_start *start);
bool
is_invalidated() const
@ -1284,64 +1351,10 @@ namespace iwp
handle_introack_generated(iwp_async_introack *i);
void
intro_ack()
{
llarp::Debug("session introack");
uint16_t w1sz = rand() % MAX_PAD;
introack.buf = workbuf;
introack.sz = (32 * 3) + w1sz;
// randomize padding
if(w1sz)
crypto->randbytes(introack.buf + (32 * 3), w1sz);
// randomize nonce
introack.nonce = introack.buf + 32;
crypto->randbytes(introack.nonce, 32);
// token
introack.token = token;
// keys
introack.remote_pubkey = remote;
introack.secretkey = eph_seckey;
// call
introack.user = this;
introack.hook = &handle_introack_generated;
working = true;
iwp_call_async_gen_introack(iwp, &introack);
}
intro_ack();
void
on_intro(const void *buf, size_t sz)
{
llarp::Debug("session onintro");
if(sz >= sizeof(workbuf))
{
// too big?
llarp::Error("intro too big");
delete this;
return;
}
// copy so we own it
memcpy(workbuf, buf, sz);
intro.buf = workbuf;
intro.sz = sz;
// give secret key
intro.secretkey = eph_seckey;
// and nonce
intro.nonce = intro.buf + 32;
intro.user = this;
// set call back hook
intro.hook = &handle_verify_intro;
// put remote pubkey into this buffer
intro.remote_pubkey = remote;
// call
EnterState(eIntroRecv);
working = true;
iwp_call_async_verify_intro(iwp, &intro);
}
on_intro(const void *buf, size_t sz);
void
on_intro_ack(const void *buf, size_t sz);
@ -1351,12 +1364,11 @@ namespace iwp
static void
handle_generated_intro(iwp_async_intro *i)
{
llarp::Debug("session handle genintro");
session *link = static_cast< session * >(i->user);
link->working = false;
if(i->buf)
{
llarp::Debug("send intro");
llarp::Info("send intro to ", link->addr);
if(llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz) == -1)
{
llarp::Warn("send intro failed");
@ -1397,6 +1409,8 @@ namespace iwp
intro.user = this;
intro.hook = &handle_generated_intro;
working = true;
llarp::Info("try introduce to transport adddress ",
llarp::RouterID(remote));
iwp_call_async_gen_intro(iwp, &intro);
// start introduce timer
establish_job_id = llarp_logic_call_later(
@ -1457,6 +1471,11 @@ namespace iwp
SessionMap_t m_Connected;
mtx_t m_Connected_Mutex;
typedef std::unordered_map< llarp::Addr, session *, llarp::addrhash >
PendingSessionMap_t;
PendingSessionMap_t m_PendingSessions;
mtx_t m_PendingSessions_Mutex;
llarp::SecretKey seckey;
server(llarp_router *r, llarp_crypto *c, llarp_logic *l,
@ -1474,6 +1493,27 @@ namespace iwp
llarp_async_iwp_free(iwp);
}
bool
has_intro_from(const llarp::Addr &from)
{
std::unique_lock< std::mutex > lock(m_PendingSessions_Mutex);
return m_PendingSessions.find(from) != m_PendingSessions.end();
}
void
put_intro_from(session *s)
{
std::unique_lock< std::mutex > lock(m_PendingSessions_Mutex);
m_PendingSessions[s->addr] = s;
}
void
remove_intro_from(const llarp::Addr &from)
{
std::unique_lock< std::mutex > lock(m_PendingSessions_Mutex);
m_PendingSessions.erase(from);
}
// set that src address has identity pubkey
void
MapAddr(const llarp::Addr &src, const llarp::PubKey &identity)
@ -1551,7 +1591,7 @@ namespace iwp
}
session *
create_session(const llarp::Addr &src)
create_session(llarp::Addr src)
{
auto s = new session(&udp, iwp, crypto, logic, seckey, src);
s->serv = this;
@ -1729,10 +1769,10 @@ namespace iwp
" != ", llarp::AlignedBuffer< 32 >(rxmsg->msginfo.hash()));
return false;
}
session *impl = static_cast< session * >(parent->impl);
success = router->HandleRecvLinkMessage(parent, buf);
if(success)
if(id == nextMsgID)
{
session *impl = static_cast< session * >(parent->impl);
success = router->HandleRecvLinkMessage(parent, buf);
if(id == 0)
{
if(impl->CheckRCValid())
@ -1750,20 +1790,27 @@ namespace iwp
impl->parent->close(impl->parent);
success = false;
}
++nextMsgID;
}
else if(recvqueue.Size() > 2)
{
return process_inbound_queue();
}
}
if(!success)
llarp::Warn("failed to handle inbound message ", id);
}
else
{
llarp::Warn("failed to reassemble message ", id);
else
{
recvqueue.Put(new InboundMessage(id, msg));
success = true;
}
}
delete rxmsg;
rx.erase(id);
return success;
} // namespace iwp
if(!success)
llarp::Warn("Failed to process inbound message ", id);
return success;
}
void
session::handle_verify_intro(iwp_async_intro *intro)
{
@ -1771,13 +1818,97 @@ namespace iwp
self->working = false;
if(!intro->buf)
{
self->serv->remove_intro_from(self->addr);
llarp::Error("intro verify failed from ", self->addr, " via ",
self->serv->addr);
delete self;
return;
}
self->intro_ack();
}
void
session::handle_generated_session_start(iwp_async_session_start *start)
{
session *link = static_cast< session * >(start->user);
link->working = false;
if(llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz) == -1)
llarp::Error("sendto failed");
link->EnterState(eSessionStartSent);
link->serv->remove_intro_from(link->addr);
}
void
session::on_intro(const void *buf, size_t sz)
{
llarp::Debug("session onintro");
if(sz >= sizeof(workbuf))
{
// too big?
llarp::Error("intro too big");
delete this;
return;
}
if(serv->has_intro_from(addr))
{
llarp::Error("duplicate intro from ", addr);
delete this;
return;
}
serv->put_intro_from(this);
// copy so we own it
memcpy(workbuf, buf, sz);
intro.buf = workbuf;
intro.sz = sz;
// give secret key
intro.secretkey = eph_seckey;
// and nonce
intro.nonce = intro.buf + 32;
intro.user = this;
// set call back hook
intro.hook = &handle_verify_intro;
// put remote pubkey into this buffer
intro.remote_pubkey = remote;
// call
EnterState(eIntroRecv);
working = true;
iwp_call_async_verify_intro(iwp, &intro);
}
void
session::intro_ack()
{
if(serv->has_session_to(addr))
{
llarp::Warn("won't ack intro for duplicate session from ", addr);
return;
}
llarp::Debug("session introack");
uint16_t w1sz = rand() % MAX_PAD;
introack.buf = workbuf;
introack.sz = (32 * 3) + w1sz;
// randomize padding
if(w1sz)
crypto->randbytes(introack.buf + (32 * 3), w1sz);
// randomize nonce
introack.nonce = introack.buf + 32;
crypto->randbytes(introack.nonce, 32);
// token
introack.token = token;
// keys
introack.remote_pubkey = remote;
introack.secretkey = eph_seckey;
// call
introack.user = this;
introack.hook = &handle_introack_generated;
working = true;
iwp_call_async_gen_introack(iwp, &introack);
}
void
session::session_established()
{
@ -1792,6 +1923,7 @@ namespace iwp
session::done()
{
auto logic = serv->logic;
serv->remove_intro_from(addr);
if(establish_job_id)
{
llarp_logic_remove_call(logic, establish_job_id);
@ -1817,6 +1949,7 @@ namespace iwp
serv->RemoveSessionByAddr(addr);
return;
}
serv->put_intro_from(this);
// copy buffer so we own it
memcpy(workbuf, buf, sz);
// set intro ack parameters
@ -1916,12 +2049,14 @@ namespace iwp
session::handle_verify_introack(iwp_async_introack *introack)
{
session *link = static_cast< session * >(introack->user);
link->working = false;
if(introack->buf == nullptr)
{
// invalid signature
llarp::Error("introack verify failed from ", link->addr);
// link->serv->RemoveSessionByAddr(link->addr);
link->serv->remove_intro_from(link->addr);
link->serv->RemoveSessionByAddr(link->addr);
return;
}
link->EnterState(eIntroAckRecv);
@ -1948,31 +2083,27 @@ namespace iwp
" frames left");
return !working;
}
// send keepalive if we are established or a session is made
if(state == eEstablished || state == eLIMSent)
{
llarp::Debug("Tick - sending keepalive because state=",
state == eEstablished ? "eEstablished" : "",
state == eLIMSent ? "eLIMSent" : "");
send_keepalive(this);
}
// pump frame state
if(state == eEstablished)
{
// llarp::Debug("Tick - pumping and retransmitting because we're
// eEstablished");
if(now - frame.lastEvent > 1000)
{
send_keepalive(this);
}
frame.retransmit(now);
pump();
PumpCryptoOutbound();
}
// TODO: determine if we are too idle
return false;
return !frame.process_inbound_queue();
}
void
session::PumpCryptoOutbound()
{
working = true;
llarp_threadpool_queue_job(serv->worker, {this, &handle_crypto_outbound});
}
@ -1981,18 +2112,21 @@ namespace iwp
{
session *self = static_cast< session * >(u);
self->EncryptOutboundFrames();
self->working = false;
}
void
session::handle_verify_session_start(iwp_async_session_start *s)
{
session *self = static_cast< session * >(s->user);
self->serv->remove_intro_from(self->addr);
self->working = false;
if(!s->buf)
{
// verify fail
// TODO: remove session?
llarp::Warn("session start verify failed from ", self->addr);
self->serv->RemoveSessionByAddr(self->addr);
return;
}
self->send_LIM();
@ -2037,7 +2171,6 @@ namespace iwp
const char *ifname, int af, uint16_t port)
{
server *link = static_cast< server * >(l->impl);
if(!link->ensure_privkey())
{
llarp::Error("failed to ensure private key");
@ -2114,7 +2247,7 @@ namespace iwp
link->timeout_job_id = 0;
link->logic = logic;
// start cleanup timer
link->issue_cleanup_timer(100);
link->issue_cleanup_timer(500);
return true;
}
@ -2158,6 +2291,8 @@ namespace iwp
s = link->create_session(dst);
link->put_session(dst, s);
}
else
return false;
s->establish_job = job;
s->frame.alive(); // mark it alive
s->introduce(job->ai.enc_key);
@ -2208,31 +2343,25 @@ namespace iwp
session::handle_introack_generated(iwp_async_introack *i)
{
session *link = static_cast< session * >(i->user);
if(i->buf)
if(i->buf && link->serv->has_intro_from(link->addr))
{
// track it with the server here
if(link->serv->has_session_to(link->addr))
{
// duplicate session
llarp::Warn("duplicate session to ", link->addr);
delete link;
return;
}
link->frame.alive();
link->EnterState(eIntroAckSent);
link->serv->put_session(link->addr, link);
llarp::Debug("send introack to ", link->addr, " via ", link->serv->addr);
if(llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz) == -1)
{
llarp::Warn("sendto failed");
return;
}
link->EnterState(eIntroAckSent);
llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz);
}
else
{
// failed to generate?
llarp::Warn("failed to generate introack");
delete link;
}
}
} // namespace iwp

@ -181,15 +181,22 @@ namespace llarp
bool
operator<(const Addr& other) const
{
return port() < other.port() || *addr6() < *other.addr6()
|| af() < other.af();
if(af() == AF_INET && other.af() == AF_INET)
return port() < other.port() || addr4()->s_addr < other.addr4()->s_addr;
else
return port() < other.port() || *addr6() < *other.addr6()
|| af() < other.af();
}
bool
operator==(const Addr& other) const
{
return af() == other.af() && memcmp(addr6(), other.addr6(), 16) == 0
&& port() == other.port();
if(af() == AF_INET && other.af() == AF_INET)
return port() == other.port()
&& addr4()->s_addr == other.addr4()->s_addr;
else
return af() == other.af() && memcmp(addr6(), other.addr6(), 16) == 0
&& port() == other.port();
}
bool

Loading…
Cancel
Save