* codel related changes

* add tick callback to udp event loop
pull/3/head
Jeff Becker 6 years ago
parent 86f16ff090
commit 1c26fb5e40
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -50,6 +50,8 @@ struct llarp_udp_io
void *user;
void *impl;
struct llarp_ev_loop *parent;
/// called every event loop tick after reads
void (*tick)(struct llarp_udp_io *);
void (*recvfrom)(struct llarp_udp_io *, const struct sockaddr *, const void *,
ssize_t);
};

@ -5,42 +5,49 @@
#include <functional>
#include <mutex>
#include <queue>
#include <string>
namespace llarp
{
namespace util
{
template < typename T, typename GetTime, llarp_time_t dropMs = 20,
llarp_time_t initialIntervalMs = 50 >
template < typename T, typename GetTime, typename PutTime,
llarp_time_t dropMs = 20, llarp_time_t initialIntervalMs = 100 >
struct CoDelQueue
{
CoDelQueue(const std::string& name) : m_name(name)
{
}
struct CoDelCompare
{
GetTime getTime = GetTime();
bool
operator()(const T& left, const T& right) const
{
return getTime(left) < getTime(right);
return GetTime()(left) < GetTime()(right);
}
};
void
Put(T* item)
Put(const T& i)
{
std::unique_lock< std::mutex > lock(m_QueueMutex);
m_Queue.push(*item);
PutTime()(i);
m_Queue.push(i);
if(firstPut == 0)
firstPut = GetTime()(i);
}
void
Process(std::queue< T >& result)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
auto start = llarp_time_now_ms();
std::unique_lock< std::mutex > lock(m_QueueMutex);
auto start = firstPut;
while(m_Queue.size())
{
const auto& item = m_Queue.top();
auto dlt = start - getTime(item);
auto dlt = start - GetTime()(item);
lowest = std::min(dlt, lowest);
if(m_Queue.size() == 1)
{
@ -48,9 +55,11 @@ namespace llarp
{
// drop
nextTickInterval += initialIntervalMs / std::sqrt(++dropNum);
llarp::Info("CoDel drop ", nextTickInterval, " ms next interval");
llarp::Info("CoDel quque ", m_name, " drop ", nextTickInterval,
" ms next interval lowest=", lowest);
delete item;
m_Queue.pop();
return;
break;
}
else
{
@ -61,13 +70,15 @@ namespace llarp
result.push(item);
m_Queue.pop();
}
firstPut = 0;
}
GetTime getTime = GetTime();
llarp_time_t firstPut = 0;
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
std::mutex m_QueueMutex;
std::priority_queue< T, std::vector< T >, CoDelCompare > m_Queue;
std::string m_name;
};
} // namespace util
} // namespace llarp

@ -13,7 +13,6 @@
#endif
extern "C" {
void
llarp_ev_loop_alloc(struct llarp_ev_loop **ev)
{

@ -3,6 +3,7 @@
#include <llarp/ev.h>
#include <unistd.h>
#include <list>
namespace llarp
{
@ -12,6 +13,7 @@ namespace llarp
ev_io(int f) : fd(f){};
virtual int
read(void* buf, size_t sz) = 0;
virtual int
sendto(const sockaddr* dst, const void* data, size_t sz) = 0;
virtual ~ev_io()
@ -42,6 +44,8 @@ struct llarp_ev_loop
close_ev(llarp::ev_io* ev) = 0;
virtual ~llarp_ev_loop(){};
std::list< llarp_udp_io* > udp_listeners;
};
#endif

@ -131,6 +131,9 @@ struct llarp_epoll_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
return result;
}
@ -166,6 +169,9 @@ struct llarp_epoll_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} while(epollfd != -1);
return result;
}
@ -238,6 +244,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
return false;
}
l->impl = listener;
udp_listeners.push_back(l);
return true;
}
@ -252,6 +259,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
close_ev(listener);
l->impl = nullptr;
delete listener;
udp_listeners.remove(l);
}
return ret;
}

@ -123,6 +123,9 @@ struct llarp_kqueue_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
return result;
}
@ -151,6 +154,9 @@ struct llarp_kqueue_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} while(result != -1);
return result;
}
@ -235,7 +241,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
delete listener;
return false;
}
udp_listeners.push_back(l);
l->impl = listener;
return true;
}
@ -250,6 +256,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
ret = close_ev(listener);
delete listener;
l->impl = nullptr;
udp_listeners.remove(l);
}
return ret;
}

@ -557,7 +557,7 @@ namespace iwp
if(itr == rx.end())
{
llarp::Warn("no such RX fragment, msgid=", msgid);
return false;
return true;
}
auto fragsize = itr->second->msginfo.fragsize();
if(fragsize != sz - 9)
@ -663,9 +663,18 @@ namespace iwp
struct FrameGetTime
{
llarp_time_t
operator()(const iwp_async_frame &frame) const
operator()(const iwp_async_frame *frame) const
{
return frame->created;
}
};
struct FramePutTime
{
void
operator()(iwp_async_frame *frame) const
{
return frame.created;
frame->created = llarp_time_now_ms();
}
};
@ -689,14 +698,19 @@ namespace iwp
llarp_link_establish_job *establish_job = nullptr;
/// cached timestamp for frame creation
llarp_time_t now;
llarp_time_t now, inboundNow;
uint32_t establish_job_id = 0;
uint32_t frames = 0;
bool working = false;
llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime > outboundFrames;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime >
outboundFrames;
/*
std::mutex m_EncryptedFramesMutex;
std::queue< iwp_async_frame > encryptedFrames;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime >
decryptedFrames;
*/
uint32_t pump_send_timer_id = 0;
uint32_t pump_recv_timer_id = 0;
@ -706,6 +720,7 @@ namespace iwp
iwp_async_introack introack;
iwp_async_session_start start;
frame_state frame;
bool started_inbound_codel = false;
byte_t token[32];
byte_t workbuf[MAX_PAD + 128];
@ -727,7 +742,14 @@ namespace iwp
session(llarp_udp_io *u, llarp_async_iwp *i, llarp_crypto *c,
llarp_logic *l, const byte_t *seckey, const llarp::Addr &a)
: udp(u), crypto(c), iwp(i), logic(l), addr(a), state(eInitial)
: udp(u)
, crypto(c)
, iwp(i)
, logic(l)
, outboundFrames("iwp_outbound")
//, decryptedFrames("iwp_inbound")
, addr(a)
, state(eInitial)
{
eph_seckey = seckey;
llarp::Zero(&remote_router, sizeof(llarp_rc));
@ -768,6 +790,7 @@ namespace iwp
" lastfrag=", (int)msg->msginfo.lastfrag());
frame.queue_tx(id, msg);
pump();
PumpCryptoOutbound();
}
static void
@ -792,23 +815,74 @@ namespace iwp
return false;
}
static void
handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left);
void
PumpCrypto();
PumpCryptoOutbound();
/*
void
HandleInboundCodel()
{
std::queue< iwp_async_frame * > outq;
decryptedFrames.Process(outq);
while(outq.size())
{
auto &front = outq.front();
handle_frame_decrypt(front);
delete front;
outq.pop();
}
PumpCryptoOutbound();
}
void
HandleCodelOutboundPump();
static void
handle_inbound_codel_delayed(void *user, uint64_t orig, uint64_t left)
{
if(left)
return;
session *self = static_cast< session * >(user);
self->pump_recv_timer_id = 0;
self->HandleInboundCodel();
self->PumpCodelInbound();
}
void
PumpCodelOutbound()
{
pump_send_timer_id = llarp_logic_call_later(
logic,
{outboundFrames.nextTickInterval, this, &handle_codel_outbound_pump});
}
static void
handle_start_inbound_codel(void *user)
{
session *self = static_cast< session * >(user);
self->HandleInboundCodel();
self->PumpCodelInbound();
}
void
StartInboundCodel()
{
if(started_inbound_codel)
return;
started_inbound_codel = true;
llarp_logic_queue_job(logic, {this, &handle_start_inbound_codel});
}
static void
handle_pump_inbound_codel(void *user)
{
session *self = static_cast< session * >(user);
self->HandleInboundCodel();
}
void
ManualPumpInboundCodel()
{
llarp_logic_queue_job(logic, {this, &handle_pump_inbound_codel});
}
void
PumpCodelInbound()
{
pump_recv_timer_id =
llarp_logic_call_later(logic,
{decryptedFrames.nextTickInterval, this,
&handle_inbound_codel_delayed});
}
*/
void
pump()
{
@ -821,8 +895,6 @@ namespace iwp
encrypt_frame_async_send(buf.base, buf.sz);
frame.pop_next_frame();
}
PumpCrypto();
HandleCodelOutboundPump();
}
// this is called from net thread
@ -1033,8 +1105,22 @@ namespace iwp
{
if(sz > 64)
{
llarp::Debug("decrypt frame ", sz);
auto f = alloc_frame(buf, sz);
auto f = alloc_frame(buf, sz);
/*
if(iwp_decrypt_frame(f))
{
decryptedFrames.Put(f);
if(state == eEstablished)
{
if(pump_recv_timer_id == 0)
PumpCodelInbound();
}
else
ManualPumpInboundCodel();
}
else
llarp::Warn("decrypt frame fail");
*/
f->hook = &handle_frame_decrypt;
iwp_call_async_frame_decrypt(iwp, f);
}
@ -1043,7 +1129,7 @@ namespace iwp
}
static void
handle_crypto_pump(void *u);
handle_crypto_outbound(void *u);
static void
handle_frame_encrypt(iwp_async_frame *frame)
@ -1069,7 +1155,6 @@ namespace iwp
frame->sz = sz;
frame->user = this;
frame->sessionkey = sessionkey;
frame->created = now;
return frame;
}
@ -1089,28 +1174,16 @@ namespace iwp
void
EncryptOutboundFrames()
{
std::queue< iwp_async_frame > q;
std::queue< iwp_async_frame > outq;
std::queue< iwp_async_frame * > outq;
outboundFrames.Process(outq);
while(outq.size())
{
auto &front = outq.front();
if(iwp_encrypt_frame(&front))
q.push(front);
if(iwp_encrypt_frame(front))
handle_frame_encrypt(front);
delete front;
outq.pop();
}
{
std::unique_lock< std::mutex > lock(m_EncryptedFramesMutex);
while(q.size())
{
encryptedFrames.push(q.front());
q.pop();
}
if(encryptedFrames.size() && pump_send_timer_id == 0)
{
PumpCodelOutbound();
}
}
}
static void
@ -1245,12 +1318,13 @@ namespace iwp
{
frame.alive();
state = st;
if(state == eLIMSent || state == eSessionStartSent)
if(state == eSessionStartSent || state == eIntroAckSent)
{
HandleCodelOutboundPump();
PumpCryptoOutbound();
// StartInboundCodel();
}
}
};
}; // namespace iwp
struct server
{
@ -1499,7 +1573,6 @@ namespace iwp
server *link = static_cast< server * >(l);
link->timeout_job_id = 0;
link->TickSessions();
// TODO: exponential backoff for cleanup timer ?
link->issue_cleanup_timer(orig);
}
@ -1678,6 +1751,8 @@ namespace iwp
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
self->now = llarp_time_now_ms();
self->encrypt_frame_async_send(buf.base, buf.sz);
self->pump();
self->PumpCryptoOutbound();
}
bool
@ -1704,8 +1779,8 @@ namespace iwp
auto itr = tx.find(msgid);
if(itr == tx.end())
{
llarp::Error("ACK for missing TX frame msgid=", msgid);
return false;
llarp::Debug("ACK for missing TX frame msgid=", msgid);
return true;
}
transit_message *msg = itr->second;
@ -1778,46 +1853,20 @@ namespace iwp
{
frame.retransmit();
pump();
PumpCryptoOutbound();
}
// TODO: determine if we are too idle
return false;
}
void
session::HandleCodelOutboundPump()
{
{
std::unique_lock< std::mutex > lock(m_EncryptedFramesMutex);
while(encryptedFrames.size())
{
auto &front = encryptedFrames.front();
handle_frame_encrypt(&front);
encryptedFrames.pop();
}
}
}
void
session::handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
session *self = static_cast< session * >(u);
self->pump_send_timer_id = 0;
if(self->timedout(llarp_time_now_ms()))
return;
self->HandleCodelOutboundPump();
self->PumpCodelOutbound();
}
void
session::PumpCrypto()
session::PumpCryptoOutbound()
{
llarp_threadpool_queue_job(serv->worker, {this, &handle_crypto_pump});
llarp_threadpool_queue_job(serv->worker, {this, &handle_crypto_outbound});
}
void
session::handle_crypto_pump(void *u)
session::handle_crypto_outbound(void *u)
{
session *self = static_cast< session * >(u);
self->EncryptOutboundFrames();
@ -1935,6 +1984,7 @@ namespace iwp
link->netloop = netloop;
link->udp.recvfrom = &server::handle_recvfrom;
link->udp.user = link;
link->udp.tick = nullptr;
llarp::Debug("bind IWP link to ", link->addr);
if(llarp_ev_add_udp(link->netloop, &link->udp, link->addr) == -1)
{
@ -1953,7 +2003,7 @@ namespace iwp
link->timeout_job_id = 0;
link->logic = logic;
// start cleanup timer
link->issue_cleanup_timer(1000);
link->issue_cleanup_timer(100);
return true;
}

Loading…
Cancel
Save