From 4e2f0ccbf843dd21e9da6225b31e53f9310965ec Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 21 Aug 2018 14:17:16 -0400 Subject: [PATCH] * add ProcessIf in CoDel queue to requeue unprocessed items * more tun code * make event loop tick and write with tun --- include/llarp/codel.hpp | 52 ++++++++++++++++++++---- include/llarp/handlers/tun.hpp | 20 ++++++++- llarp/ev.cpp | 8 +++- llarp/ev.hpp | 28 ++++++++++--- llarp/ev_epoll.hpp | 26 ++++++------ llarp/handlers/tun.cpp | 74 ++++++++++++++++++++++++++++++---- 6 files changed, 170 insertions(+), 38 deletions(-) diff --git a/include/llarp/codel.hpp b/include/llarp/codel.hpp index b1d243f82..628a5896e 100644 --- a/include/llarp/codel.hpp +++ b/include/llarp/codel.hpp @@ -113,20 +113,43 @@ namespace llarp } } - template < typename Func > + template < typename Q > void - Process(Func visitor) + _sort(Q& queue) + { + std::vector< std::unique_ptr< T > > q; + while(queue.size()) + { + q.push_back(std::move(queue.front())); + queue.pop(); + } + std::sort(q.begin(), q.end(), Compare()); + auto itr = q.begin(); + while(itr != q.end()) + { + queue.push(std::move(*itr)); + ++itr; + } + } + + /// visit returns true to discard entry otherwise the entry is + /// re quened + template < typename Visit > + void + ProcessIf(Visit visitor) { llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL; // auto start = llarp_time_now_ms(); // llarp::LogInfo("CoDelQueue::Process - start at ", start); Lock_t lock(m_QueueMutex); + _sort(m_Queue); auto start = firstPut; + std::queue< std::unique_ptr< T > > requeue; while(m_Queue.size()) { llarp::LogDebug("CoDelQueue::Process - queue has ", m_Queue.size()); - const auto& item = m_Queue.top(); - auto dlt = start - GetTime()(item.get()); + auto& item = m_Queue.front(); + auto dlt = start - GetTime()(item.get()); // llarp::LogInfo("CoDelQueue::Process - dlt ", dlt); lowest = std::min(dlt, lowest); if(m_Queue.size() == 1) @@ -146,19 +169,32 @@ namespace llarp } } // llarp::LogInfo("CoDelQueue::Process - passing"); - visitor(item); + if(!visitor(item)) + { + // requeue item as we are not done + requeue.push(std::move(item)); + } m_Queue.pop(); } + m_Queue = std::move(requeue); firstPut = 0; } + template < typename Func > + void + Process(Func visitor) + { + ProcessIf([visitor](const std::unique_ptr< T >& t) -> bool { + visitor(t); + return true; + }); + } + llarp_time_t firstPut = 0; size_t dropNum = 0; llarp_time_t nextTickInterval = initialIntervalMs; Mutex_t m_QueueMutex; - std::priority_queue< std::unique_ptr< T >, - std::vector< std::unique_ptr< T > >, Compare > - m_Queue; + std::queue< std::unique_ptr< T > > m_Queue; std::string m_name; }; } // namespace util diff --git a/include/llarp/handlers/tun.hpp b/include/llarp/handlers/tun.hpp index 53a6e61ce..e45911f3e 100644 --- a/include/llarp/handlers/tun.hpp +++ b/include/llarp/handlers/tun.hpp @@ -41,6 +41,7 @@ namespace llarp SetupNetworking(); /// overrides Endpoint + /// handle inbound traffic void HandleDataMessage(service::ProtocolMessage* msg); @@ -54,12 +55,15 @@ namespace llarp llarp_tun_io tunif; + /// called before writing to tun interface static void tunifBeforeWrite(llarp_tun_io* t); + /// called every time we wish to read a packet from the tun interface static void tunifRecvPkt(llarp_tun_io* t, const void* pkt, ssize_t sz); + /// called in the endpoint logic thread static void handleTickTun(void* u); @@ -75,17 +79,31 @@ namespace llarp /// return true if we have a remote loki address for this ip address bool HasRemoteForIP(const uint32_t& ipv4) const; - + /// get ip address for service address unconditionally uint32_t ObtainIPForAddr(const service::Address& addr); + /// mark this address as active + void + MarkIPActive(uint32_t ip); + private: + /// handles setup, given value true on success and false on failure to set + /// up interface std::promise< bool > m_TunSetupResult; + /// maps ip to service address std::unordered_map< uint32_t, service::Address > m_IPToAddr; + /// maps service address to ip std::unordered_map< service::Address, uint32_t, service::Address::Hash > m_AddrToIP; + /// maps ip address to timestamp last active + std::unordered_map< uint32_t, llarp_time_t > m_IPActivity; + /// our ip address uint32_t m_OurIP; + /// next ip address to allocate uint32_t m_NextIP; + /// highest ip address to allocate + uint32_t m_MaxIP; }; } // namespace handlers } // namespace llarp diff --git a/llarp/ev.cpp b/llarp/ev.cpp index 1ab547cb3..599545ca2 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -104,9 +104,13 @@ llarp_ev_udp_sendto(struct llarp_udp_io *udp, const sockaddr *to, bool llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun) { - auto dev = loop->create_tun(tun); + auto dev = loop->create_tun(tun); + tun->impl = dev; if(dev) - return loop->add_ev(dev, false); + { + loop->tun_listeners.push_back(tun); + return loop->add_ev(dev, true); + } return false; } diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 4a552ac79..618e55a4a 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -39,24 +39,35 @@ namespace llarp bool queue_write(const void* data, size_t sz) { - m_writeq.Emplace(data, sz); - return m_writeq.Size() <= MAX_WRITE_QUEUE_SIZE; + return m_writeq.EmplaceIf( + [&](WriteBuffer* pkt) -> bool { + return m_writeq.Size() < MAX_WRITE_QUEUE_SIZE + && sz <= sizeof(pkt->buf); + }, + data, sz); } /// called in event loop when fd is ready for writing - /// drops all buffers that cannot be written in this pump + /// requeues anything not written /// this assumes fd is set to non blocking virtual void flush_write() { - m_writeq.Process([this](const std::unique_ptr< WriteBuffer >& buffer) { + m_writeq.ProcessIf( + [&](const std::unique_ptr< WriteBuffer >& buffer) -> bool { // todo: wtf??? #ifndef _WIN32 - write(fd, buffer->buf, buffer->bufsz); + if(write(fd, buffer->buf, buffer->bufsz) == -1) + { + // if we would block we save the entries for later + return errno == EWOULDBLOCK || errno == EAGAIN; + } + // discard entry + return true; #else // writefile #endif - }); + }); /// reset errno errno = 0; } @@ -173,8 +184,13 @@ struct llarp_ev_loop if(l->tick) l->tick(l); for(auto& l : tun_listeners) + { if(l->tick) l->tick(l); + if(l->before_write) + l->before_write(l); + static_cast< llarp::ev_io* >(l->impl)->flush_write(); + } } }; diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index a9cdbbf3a..9d5704309 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -1,5 +1,6 @@ #ifndef EV_EPOLL_HPP #define EV_EPOLL_HPP +#include #include #include #include @@ -85,8 +86,8 @@ namespace llarp if(t->before_write) { t->before_write(t); - ev_io::flush_write(); } + ev_io::flush_write(); } int @@ -112,7 +113,13 @@ namespace llarp if(tuntap_up(tunif) == -1) return false; fd = tunif->tun_fd; - return fd != -1; + if(fd == -1) + return false; + // set non blocking + int flags = fcntl(fd, F_GETFL, 0); + if(flags == -1) + return false; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; } ~tun() @@ -172,7 +179,6 @@ struct llarp_epoll_loop : public llarp_ev_loop { epoll_event events[1024]; int result; - result = epoll_wait(epollfd, events, 1024, ms); if(result > 0) { @@ -189,14 +195,11 @@ struct llarp_epoll_loop : public llarp_ev_loop if(events[idx].events & EPOLLIN) { ev->read(readbuf, sizeof(readbuf)); - if(events[idx].events & EPOLLOUT) - ev->flush_write(); } ++idx; } } - if(result != -1) - tick_listeners(); + tick_listeners(); return result; } @@ -223,14 +226,11 @@ struct llarp_epoll_loop : public llarp_ev_loop if(events[idx].events & EPOLLIN) { ev->read(readbuf, sizeof(readbuf)); - if(events[idx].events & EPOLLOUT) - ev->flush_write(); } ++idx; } } - if(result != -1) - tick_listeners(); + tick_listeners(); } while(epollfd != -1); return result; } @@ -315,8 +315,8 @@ struct llarp_epoll_loop : public llarp_ev_loop epoll_event ev; ev.data.ptr = e; ev.events = EPOLLIN; - if(write) - ev.events |= EPOLLOUT; + // if(write) + // ev.events |= EPOLLOUT; if(epoll_ctl(epollfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) { delete e; diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 0ffffa6e6..a6586cef4 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -74,6 +74,14 @@ namespace llarp return m_TunSetupResult.get_future().get(); } + constexpr uint32_t + netmask_ipv4_bits(uint32_t netmask) + { + return (32 - netmask) + ? (1 << (32 - (netmask + 1))) | netmask_ipv4_bits(netmask + 1) + : 0; + } + bool TunEndpoint::SetupTun() { @@ -84,9 +92,16 @@ namespace llarp } m_OurIP = inet_addr(tunif.ifaddr); m_NextIP = m_OurIP; - char buf[128] = {0}; + uint32_t mask = tunif.netmask; + + uint32_t baseaddr = (ntohs(m_OurIP) & netmask_ipv4_bits(mask)); + m_MaxIP = (ntohs(baseaddr) | ~ntohs(netmask_ipv4_bits(mask))); + char buf[128] = {0}; llarp::LogInfo(Name(), " set ", tunif.ifname, " to have address ", inet_ntop(AF_INET, &m_OurIP, buf, sizeof(buf))); + + llarp::LogInfo(Name(), " allocated up to ", + inet_ntop(AF_INET, &m_MaxIP, buf, sizeof(buf))); return true; } @@ -138,13 +153,50 @@ namespace llarp uint32_t TunEndpoint::ObtainIPForAddr(const service::Address &addr) { - auto itr = m_AddrToIP.find(addr); - if(itr != m_AddrToIP.end()) - return itr->second; + { + // previously allocated address + auto itr = m_AddrToIP.find(addr); + if(itr != m_AddrToIP.end()) + return itr->second; + } + llarp_time_t now = llarp_time_now_ms(); + uint32_t nextIP; + if(m_NextIP < m_MaxIP) + { + nextIP = ++m_NextIP; + m_AddrToIP.insert(std::make_pair(addr, nextIP)); + m_IPToAddr.insert(std::make_pair(nextIP, addr)); + } + else + { + // we are full + // expire least active ip + // TODO: prevent DoS + std::pair< uint32_t, llarp_time_t > oldest = {0, 0}; + + // find oldest entry + auto itr = m_IPActivity.begin(); + while(itr != m_IPActivity.end()) + { + if(itr->second <= now) + { + if((now - itr->second) > oldest.second) + { + oldest.first = itr->first; + oldest.second = itr->second; + } + } + ++itr; + } + // remap address + m_IPToAddr[oldest.first] = addr; + m_AddrToIP[addr] = oldest.first; + nextIP = oldest.first; + } + + // mark ip active + m_IPActivity[nextIP] = now; - uint32_t nextIP = ++m_NextIP; - m_AddrToIP.insert(std::make_pair(addr, nextIP)); - m_IPToAddr.insert(std::make_pair(nextIP, addr)); return nextIP; } @@ -154,6 +206,12 @@ namespace llarp return m_IPToAddr.find(ip) != m_IPToAddr.end(); } + void + TunEndpoint::MarkIPActive(uint32_t ip) + { + m_IPActivity[ip] = llarp_time_now_ms(); + } + void TunEndpoint::handleTickTun(void *u) { @@ -171,8 +229,8 @@ namespace llarp void TunEndpoint::tunifBeforeWrite(llarp_tun_io *tun) { + // called in the isolated network thread TunEndpoint *self = static_cast< TunEndpoint * >(tun->user); - llarp::LogDebug("tunifBeforeWrite"); self->m_NetworkToUserPktQueue.Process( [tun](const std::unique_ptr< net::IPv4Packet > &pkt) { if(!llarp_ev_tun_async_write(tun, pkt->buf, pkt->sz))