* add ProcessIf in CoDel queue to requeue unprocessed items

* more tun code
* make event loop tick and write with tun
pull/14/head
Jeff Becker 6 years ago
parent 4aebbda640
commit 4e2f0ccbf8
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -113,20 +113,43 @@ namespace llarp
}
}
template < typename Func >
template < typename Q >
void
Process(Func visitor)
_sort(Q& queue)
{
std::vector< std::unique_ptr< T > > q;
while(queue.size())
{
q.push_back(std::move(queue.front()));
queue.pop();
}
std::sort(q.begin(), q.end(), Compare());
auto itr = q.begin();
while(itr != q.end())
{
queue.push(std::move(*itr));
++itr;
}
}
/// visit returns true to discard entry otherwise the entry is
/// re quened
template < typename Visit >
void
ProcessIf(Visit visitor)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
// auto start = llarp_time_now_ms();
// llarp::LogInfo("CoDelQueue::Process - start at ", start);
Lock_t lock(m_QueueMutex);
_sort(m_Queue);
auto start = firstPut;
std::queue< std::unique_ptr< T > > requeue;
while(m_Queue.size())
{
llarp::LogDebug("CoDelQueue::Process - queue has ", m_Queue.size());
const auto& item = m_Queue.top();
auto dlt = start - GetTime()(item.get());
auto& item = m_Queue.front();
auto dlt = start - GetTime()(item.get());
// llarp::LogInfo("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest);
if(m_Queue.size() == 1)
@ -146,19 +169,32 @@ namespace llarp
}
}
// llarp::LogInfo("CoDelQueue::Process - passing");
visitor(item);
if(!visitor(item))
{
// requeue item as we are not done
requeue.push(std::move(item));
}
m_Queue.pop();
}
m_Queue = std::move(requeue);
firstPut = 0;
}
template < typename Func >
void
Process(Func visitor)
{
ProcessIf([visitor](const std::unique_ptr< T >& t) -> bool {
visitor(t);
return true;
});
}
llarp_time_t firstPut = 0;
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
Mutex_t m_QueueMutex;
std::priority_queue< std::unique_ptr< T >,
std::vector< std::unique_ptr< T > >, Compare >
m_Queue;
std::queue< std::unique_ptr< T > > m_Queue;
std::string m_name;
};
} // namespace util

@ -41,6 +41,7 @@ namespace llarp
SetupNetworking();
/// overrides Endpoint
/// handle inbound traffic
void
HandleDataMessage(service::ProtocolMessage* msg);
@ -54,12 +55,15 @@ namespace llarp
llarp_tun_io tunif;
/// called before writing to tun interface
static void
tunifBeforeWrite(llarp_tun_io* t);
/// called every time we wish to read a packet from the tun interface
static void
tunifRecvPkt(llarp_tun_io* t, const void* pkt, ssize_t sz);
/// called in the endpoint logic thread
static void
handleTickTun(void* u);
@ -75,17 +79,31 @@ namespace llarp
/// return true if we have a remote loki address for this ip address
bool
HasRemoteForIP(const uint32_t& ipv4) const;
/// get ip address for service address unconditionally
uint32_t
ObtainIPForAddr(const service::Address& addr);
/// mark this address as active
void
MarkIPActive(uint32_t ip);
private:
/// handles setup, given value true on success and false on failure to set
/// up interface
std::promise< bool > m_TunSetupResult;
/// maps ip to service address
std::unordered_map< uint32_t, service::Address > m_IPToAddr;
/// maps service address to ip
std::unordered_map< service::Address, uint32_t, service::Address::Hash >
m_AddrToIP;
/// maps ip address to timestamp last active
std::unordered_map< uint32_t, llarp_time_t > m_IPActivity;
/// our ip address
uint32_t m_OurIP;
/// next ip address to allocate
uint32_t m_NextIP;
/// highest ip address to allocate
uint32_t m_MaxIP;
};
} // namespace handlers
} // namespace llarp

@ -104,9 +104,13 @@ llarp_ev_udp_sendto(struct llarp_udp_io *udp, const sockaddr *to,
bool
llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun)
{
auto dev = loop->create_tun(tun);
auto dev = loop->create_tun(tun);
tun->impl = dev;
if(dev)
return loop->add_ev(dev, false);
{
loop->tun_listeners.push_back(tun);
return loop->add_ev(dev, true);
}
return false;
}

@ -39,24 +39,35 @@ namespace llarp
bool
queue_write(const void* data, size_t sz)
{
m_writeq.Emplace(data, sz);
return m_writeq.Size() <= MAX_WRITE_QUEUE_SIZE;
return m_writeq.EmplaceIf(
[&](WriteBuffer* pkt) -> bool {
return m_writeq.Size() < MAX_WRITE_QUEUE_SIZE
&& sz <= sizeof(pkt->buf);
},
data, sz);
}
/// called in event loop when fd is ready for writing
/// drops all buffers that cannot be written in this pump
/// requeues anything not written
/// this assumes fd is set to non blocking
virtual void
flush_write()
{
m_writeq.Process([this](const std::unique_ptr< WriteBuffer >& buffer) {
m_writeq.ProcessIf(
[&](const std::unique_ptr< WriteBuffer >& buffer) -> bool {
// todo: wtf???
#ifndef _WIN32
write(fd, buffer->buf, buffer->bufsz);
if(write(fd, buffer->buf, buffer->bufsz) == -1)
{
// if we would block we save the entries for later
return errno == EWOULDBLOCK || errno == EAGAIN;
}
// discard entry
return true;
#else
// writefile
#endif
});
});
/// reset errno
errno = 0;
}
@ -173,8 +184,13 @@ struct llarp_ev_loop
if(l->tick)
l->tick(l);
for(auto& l : tun_listeners)
{
if(l->tick)
l->tick(l);
if(l->before_write)
l->before_write(l);
static_cast< llarp::ev_io* >(l->impl)->flush_write();
}
}
};

@ -1,5 +1,6 @@
#ifndef EV_EPOLL_HPP
#define EV_EPOLL_HPP
#include <fcntl.h>
#include <llarp/buffer.h>
#include <llarp/net.h>
#include <signal.h>
@ -85,8 +86,8 @@ namespace llarp
if(t->before_write)
{
t->before_write(t);
ev_io::flush_write();
}
ev_io::flush_write();
}
int
@ -112,7 +113,13 @@ namespace llarp
if(tuntap_up(tunif) == -1)
return false;
fd = tunif->tun_fd;
return fd != -1;
if(fd == -1)
return false;
// set non blocking
int flags = fcntl(fd, F_GETFL, 0);
if(flags == -1)
return false;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
~tun()
@ -172,7 +179,6 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
epoll_event events[1024];
int result;
result = epoll_wait(epollfd, events, 1024, ms);
if(result > 0)
{
@ -189,14 +195,11 @@ struct llarp_epoll_loop : public llarp_ev_loop
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
if(events[idx].events & EPOLLOUT)
ev->flush_write();
}
++idx;
}
}
if(result != -1)
tick_listeners();
tick_listeners();
return result;
}
@ -223,14 +226,11 @@ struct llarp_epoll_loop : public llarp_ev_loop
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
if(events[idx].events & EPOLLOUT)
ev->flush_write();
}
++idx;
}
}
if(result != -1)
tick_listeners();
tick_listeners();
} while(epollfd != -1);
return result;
}
@ -315,8 +315,8 @@ struct llarp_epoll_loop : public llarp_ev_loop
epoll_event ev;
ev.data.ptr = e;
ev.events = EPOLLIN;
if(write)
ev.events |= EPOLLOUT;
// if(write)
// ev.events |= EPOLLOUT;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, e->fd, &ev) == -1)
{
delete e;

@ -74,6 +74,14 @@ namespace llarp
return m_TunSetupResult.get_future().get();
}
constexpr uint32_t
netmask_ipv4_bits(uint32_t netmask)
{
return (32 - netmask)
? (1 << (32 - (netmask + 1))) | netmask_ipv4_bits(netmask + 1)
: 0;
}
bool
TunEndpoint::SetupTun()
{
@ -84,9 +92,16 @@ namespace llarp
}
m_OurIP = inet_addr(tunif.ifaddr);
m_NextIP = m_OurIP;
char buf[128] = {0};
uint32_t mask = tunif.netmask;
uint32_t baseaddr = (ntohs(m_OurIP) & netmask_ipv4_bits(mask));
m_MaxIP = (ntohs(baseaddr) | ~ntohs(netmask_ipv4_bits(mask)));
char buf[128] = {0};
llarp::LogInfo(Name(), " set ", tunif.ifname, " to have address ",
inet_ntop(AF_INET, &m_OurIP, buf, sizeof(buf)));
llarp::LogInfo(Name(), " allocated up to ",
inet_ntop(AF_INET, &m_MaxIP, buf, sizeof(buf)));
return true;
}
@ -138,13 +153,50 @@ namespace llarp
uint32_t
TunEndpoint::ObtainIPForAddr(const service::Address &addr)
{
auto itr = m_AddrToIP.find(addr);
if(itr != m_AddrToIP.end())
return itr->second;
{
// previously allocated address
auto itr = m_AddrToIP.find(addr);
if(itr != m_AddrToIP.end())
return itr->second;
}
llarp_time_t now = llarp_time_now_ms();
uint32_t nextIP;
if(m_NextIP < m_MaxIP)
{
nextIP = ++m_NextIP;
m_AddrToIP.insert(std::make_pair(addr, nextIP));
m_IPToAddr.insert(std::make_pair(nextIP, addr));
}
else
{
// we are full
// expire least active ip
// TODO: prevent DoS
std::pair< uint32_t, llarp_time_t > oldest = {0, 0};
// find oldest entry
auto itr = m_IPActivity.begin();
while(itr != m_IPActivity.end())
{
if(itr->second <= now)
{
if((now - itr->second) > oldest.second)
{
oldest.first = itr->first;
oldest.second = itr->second;
}
}
++itr;
}
// remap address
m_IPToAddr[oldest.first] = addr;
m_AddrToIP[addr] = oldest.first;
nextIP = oldest.first;
}
// mark ip active
m_IPActivity[nextIP] = now;
uint32_t nextIP = ++m_NextIP;
m_AddrToIP.insert(std::make_pair(addr, nextIP));
m_IPToAddr.insert(std::make_pair(nextIP, addr));
return nextIP;
}
@ -154,6 +206,12 @@ namespace llarp
return m_IPToAddr.find(ip) != m_IPToAddr.end();
}
void
TunEndpoint::MarkIPActive(uint32_t ip)
{
m_IPActivity[ip] = llarp_time_now_ms();
}
void
TunEndpoint::handleTickTun(void *u)
{
@ -171,8 +229,8 @@ namespace llarp
void
TunEndpoint::tunifBeforeWrite(llarp_tun_io *tun)
{
// called in the isolated network thread
TunEndpoint *self = static_cast< TunEndpoint * >(tun->user);
llarp::LogDebug("tunifBeforeWrite");
self->m_NetworkToUserPktQueue.Process(
[tun](const std::unique_ptr< net::IPv4Packet > &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt->buf, pkt->sz))

Loading…
Cancel
Save