some fixups on link layer

only add trusted routers to DHT
pull/1/head
Jeff Becker 6 years ago
parent b2545dee34
commit 87aabefddc
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -93,8 +93,8 @@ namespace iwp
memcpy(ptr + 2, &sz, 2);
}
uint8_t
flags() const
uint8_t &
flags()
{
return ptr[5];
}
@ -293,12 +293,12 @@ namespace iwp
template < typename T >
void
generate_xmit(T &queue)
generate_xmit(T &queue, byte_t flags = 0)
{
queue.emplace();
auto &xmitbuf = queue.back();
auto body_ptr = init_sendbuf(xmitbuf, eXMIT,
sizeof(msginfo.buffer) + lastfrag.size(), 0);
auto body_ptr = init_sendbuf(
xmitbuf, eXMIT, sizeof(msginfo.buffer) + lastfrag.size(), flags);
memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer));
body_ptr += sizeof(msginfo.buffer);
memcpy(body_ptr, lastfrag.data(), lastfrag.size());
@ -306,7 +306,7 @@ namespace iwp
template < typename T >
void
retransmit_frags(T &queue)
retransmit_frags(T &queue, byte_t flags = 0)
{
auto msgid = msginfo.msgid();
auto fragsize = msginfo.fragsize();
@ -316,7 +316,7 @@ namespace iwp
continue;
queue.emplace();
auto &fragbuf = queue.back();
auto body_ptr = init_sendbuf(fragbuf, eFRAG, 9 + fragsize, 0);
auto body_ptr = init_sendbuf(fragbuf, eFRAG, 9 + fragsize, flags);
memcpy(body_ptr, &msgid, 8);
body_ptr[8] = frag.first;
memcpy(body_ptr + 9, frag.second.data(), fragsize);
@ -381,6 +381,8 @@ namespace iwp
struct frame_state
{
byte_t rxflags = 0;
byte_t txflags = 0;
uint64_t rxids = 0;
uint64_t txids = 0;
llarp_time_t lastEvent = 0;
@ -394,6 +396,13 @@ namespace iwp
sendqueue_t sendqueue;
/// return true if both sides have the same state flags
bool
flags_agree(byte_t flags) const
{
return (rxflags & flags) == (txflags & flags);
}
void
clear()
{
@ -412,8 +421,7 @@ namespace iwp
llarp::Debug("ACK for msgid=", id, " mask=", bitmask);
sendqueue.emplace();
auto &buf = sendqueue.back();
// TODO: set flags to nonzero as needed
init_sendbuf(buf, eACKS, 12, 0);
init_sendbuf(buf, eACKS, 12, txflags);
// TODO: this assumes big endian
memcpy(buf.data() + 6, &id, 8);
memcpy(buf.data() + 14, &bitmask, 4);
@ -540,8 +548,8 @@ namespace iwp
auto itr = tx.emplace(id, msg);
if(itr.second)
{
msg->generate_xmit(sendqueue);
msg->retransmit_frags(sendqueue);
msg->generate_xmit(sendqueue, txflags);
msg->retransmit_frags(sendqueue, txflags);
}
else // duplicate
delete msg;
@ -572,9 +580,17 @@ namespace iwp
process(uint8_t *buf, size_t sz)
{
frame_header hdr(buf);
if(hdr.flags() | eSessionInvalidated)
{
rxflags |= eSessionInvalidated;
}
switch(hdr.msgtype())
{
case eALIV:
if(rxflags & eSessionInvalidated)
{
txflags |= eSessionInvalidated;
}
alive();
return true;
case eXMIT:
@ -608,9 +624,9 @@ namespace iwp
llarp_link_establish_job *establish_job = nullptr;
uint32_t establish_job_id = 0;
uint32_t keepalive_timer_id = 0;
uint32_t frames = 0;
uint16_t ticks = 0;
uint32_t establish_job_id = 0;
uint32_t frames = 0;
llarp::Addr addr;
iwp_async_intro intro;
@ -683,6 +699,9 @@ namespace iwp
pump();
}
static void
handle_invalidate_timer(void *user);
void
pump()
{
@ -767,48 +786,33 @@ namespace iwp
}
static void
send_keepalive(void *user)
{
session *self = static_cast< session * >(user);
// all zeros means keepalive
byte_t tmp[MAX_PAD + 8] = {0};
// 8 bytes iwp header overhead
int padsz = rand() % (sizeof(tmp) - 8);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(padsz)
self->crypto->randbytes(buf.base + 8, padsz);
buf.sz -= padsz;
// send frame after encrypting
self->encrypt_frame_async_send(buf.base, buf.sz);
send_keepalive(void *user);
// send another keepalive
self->schedule_keepalive();
}
static void
handle_keepalive_timer(void *user, uint64_t orig, uint64_t left)
// return true if we should be removed
bool
Tick(uint64_t now)
{
session *self = static_cast< session * >(user);
self->keepalive_timer_id = 0;
// timeout cancelled
if(left)
if(timedout(now, SESSION_TIMEOUT))
{
return;
// we are timed out
// when we are done doing stuff with all of our frames from the crypto
// workers we are done
return frames == 0;
}
auto now = llarp_time_now_ms();
if(self->timedout(now, SESSION_TIMEOUT - 1000))
if(is_invalidated())
{
// about to time out so don't reschedle timer
return;
// both sides agreeed to session invalidation
// terminate our session when all of our frames from the crypto workers
// are done
return frames == 0;
}
llarp_logic_queue_job(self->logic, {self, &send_keepalive});
}
// only send keepalive very 10 ticks
++ticks;
if(ticks % 10 == 0)
send_keepalive(this);
void
schedule_keepalive()
{
keepalive_timer_id = llarp_logic_call_later(
logic, {1000UL, this, &handle_keepalive_timer});
// TODO: determine if we are too idle
return false;
}
void
@ -816,7 +820,6 @@ namespace iwp
{
EnterState(eEstablished);
llarp_logic_cancel_call(logic, establish_job_id);
schedule_keepalive();
}
void
@ -868,7 +871,12 @@ namespace iwp
static void
close(llarp_link_session *s)
{
// TODO: implement
session *impl = static_cast< session * >(s->impl);
// set our side invalidated and close async when the other side also marks
// as session invalidated
impl->frame.txflags |= eSessionInvalidated;
// TODO: add timer for session invalidation
llarp_logic_queue_job(impl->logic, {impl, &send_keepalive});
}
static void
@ -895,6 +903,12 @@ namespace iwp
link->EnterState(eSessionStartSent);
}
bool
is_invalidated() const
{
return frame.flags_agree(eSessionInvalidated);
}
void
session_start()
{
@ -1133,6 +1147,11 @@ namespace iwp
logic, {5000, this, &handle_establish_timeout});
}
// handle session being over
// called right before deallocation
void
done();
void
EnterState(State st)
{
@ -1202,6 +1221,26 @@ namespace iwp
return serv->m_Connected.find(pubkey) != serv->m_Connected.end();
}
static void
HandleSessionTicker(void *user, uint64_t orig, uint64_t left)
{
if(left)
return;
server *serv = static_cast< server * >(user);
auto now = llarp_time_now_ms();
{
lock_t lock(serv->m_sessions_Mutex);
std::set< llarp::Addr > remove;
auto itr = serv->m_sessions.begin();
while(itr != serv->m_sessions.end())
if(static_cast< session * >(itr->second.impl)->Tick(now))
remove.insert(itr->first);
for(const auto &addr : remove)
serv->RemoveSessionByAddr(addr);
}
}
static bool
SendToSession(llarp_link *l, const byte_t *pubkey, llarp_buffer_t buf)
{
@ -1233,8 +1272,8 @@ namespace iwp
if(itr == std::end(m_Connected))
return;
// remove from dht tracking
llarp_dht_remove_local_router(router->dht, itr->first);
// tell router we are done with this session
router->SessionClosed(itr->first);
m_Connected.erase(itr);
}
@ -1298,6 +1337,28 @@ namespace iwp
}
}
void
RemoveSessionByAddr(const llarp::Addr &addr)
{
auto itr = m_sessions.find(addr);
if(itr != m_sessions.end())
{
llarp::Debug("removing session ", addr);
UnmapAddr(addr);
session *s = static_cast< session * >(itr->second.impl);
m_sessions.erase(itr);
s->done();
if(s->frames)
{
llarp::Warn("session has ", s->frames,
" left but is idle, not deallocating session so we "
"leak but don't die");
}
else
delete s;
}
}
void
cleanup_dead()
{
@ -1314,39 +1375,7 @@ namespace iwp
for(const auto &addr : remove)
{
auto itr = m_sessions.find(addr);
if(itr != m_sessions.end())
{
llarp::Debug("session with ", addr, " is stale, removing");
UnmapAddr(addr);
session *s = static_cast< session * >(itr->second.impl);
m_sessions.erase(itr);
if(s->keepalive_timer_id)
{
llarp_logic_remove_call(logic, s->keepalive_timer_id);
}
// cancel establish job
if(s->establish_job_id)
{
llarp_logic_remove_call(logic, s->establish_job_id);
}
if(s->establish_job)
{
auto job = s->establish_job;
job->link = s->serv->parent;
job->session = nullptr;
job->result(job);
}
if(s->frames)
{
llarp::Warn("session has ", s->frames,
" left but is idle, not deallocating session so we "
"leak but don't die");
}
else
delete s;
}
RemoveSessionByAddr(addr);
}
}
}
@ -1469,6 +1498,47 @@ namespace iwp
return success;
}
void
session::done()
{
if(establish_job_id)
{
llarp_logic_remove_call(logic, establish_job_id);
}
if(establish_job)
{
auto job = establish_job;
job->link = serv->parent;
job->session = nullptr;
job->result(job);
}
}
void
session::send_keepalive(void *user)
{
session *self = static_cast< session * >(user);
// if both sides agree on invalidation
if(self->is_invalidated())
{
// don't send keepalive
return;
}
// all zeros means keepalive
byte_t tmp[MAX_PAD + 8] = {0};
// set flags for tx
frame_header hdr(tmp);
hdr.flags() = self->frame.txflags;
// 8 bytes iwp header overhead
int padsz = rand() % (sizeof(tmp) - 8);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(padsz)
self->crypto->randbytes(buf.base + 8, padsz);
buf.sz -= padsz;
// send frame after encrypting
self->encrypt_frame_async_send(buf.base, buf.sz);
}
bool
frame_state::got_acks(frame_header &hdr, size_t sz)
{
@ -1744,7 +1814,6 @@ namespace iwp
}
extern "C" {
void
iwp_link_init(struct llarp_link *link, struct llarp_iwp_args args)
{

@ -239,29 +239,44 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
}
llarp::Debug("rc verified");
// this was an outbound establish job
if(ctx->establish_job->session)
// track valid router in dht
llarp::pubkey pubkey;
memcpy(&pubkey[0], job->rc.pubkey, pubkey.size());
// refresh valid routers RC value if it's there
auto v = router->validRouters.find(pubkey);
if(v != router->validRouters.end())
{
llarp::pubkey pubkey;
memcpy(&pubkey[0], job->rc.pubkey, pubkey.size());
// free previous RC members
llarp_rc_free(&v->second);
}
router->validRouters[pubkey] = job->rc;
// refresh valid routers RC value if it's there
auto v = router->validRouters.find(pubkey);
if(v != router->validRouters.end())
{
// free previous RC members
llarp_rc_free(&v->second);
}
router->validRouters[pubkey] = job->rc;
// track valid router in dht
llarp_dht_put_local_router(router->dht, &router->validRouters[pubkey]);
// this was an outbound establish job
if(ctx->establish_job->session)
{
router->FlushOutboundFor(pubkey);
}
else
llarp_rc_free(&job->rc);
llarp_rc_free(&job->rc);
delete job;
}
void
llarp_router::SessionClosed(const llarp::RouterID &remote)
{
// remove from valid routers and dht if it's a valid router
auto itr = validRouters.find(remote);
if(itr == validRouters.end())
return;
llarp_dht_remove_local_router(dht, remote);
llarp_rc_free(&itr->second);
validRouters.erase(itr);
}
void
llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
{
@ -310,8 +325,6 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
if(job->session)
{
auto session = job->session;
llarp_dht_put_local_router(router->dht,
session->get_remote_router(session));
router->async_verify_RC(session, false, job);
return;
}

@ -122,6 +122,10 @@ struct llarp_router
void
FlushOutbound();
/// called by link when a remote session is expunged
void
SessionClosed(const llarp::RouterID &remote);
void
async_verify_RC(llarp_link_session *session, bool isExpectingClient,
llarp_link_establish_job *job = nullptr);

Loading…
Cancel
Save