From 1a1f93c1712faa3e08ae4383c04349e44c543416 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 17 Aug 2018 15:49:58 -0400 Subject: [PATCH] * more tun stuff * use std::unique_ptr where bare pointers aren't absolutely required --- CMakeLists.txt | 1 + include/llarp/codel.hpp | 26 ++++--- include/llarp/crypto_async.h | 8 ++- include/llarp/dht.h | 6 +- include/llarp/endian.h | 2 +- include/llarp/ev.h | 3 + include/llarp/handlers/tun.hpp | 31 ++++++-- include/llarp/ip.hpp | 100 ++++++++++++++++++++++++++ include/llarp/iwp/frame_state.hpp | 2 +- include/llarp/iwp/inbound_message.hpp | 5 +- include/llarp/iwp/sendbuf.hpp | 6 +- include/llarp/iwp/sendqueue.hpp | 4 +- include/llarp/iwp/session.hpp | 7 +- include/llarp/service/context.hpp | 5 +- include/llarp/service/endpoint.hpp | 8 +++ include/llarp/threadpool.h | 3 +- llarp/dht.cpp | 4 +- llarp/ev.hpp | 43 +++++------ llarp/ev_epoll.hpp | 18 ++++- llarp/handlers/tun.cpp | 47 +++++++++--- llarp/ip.cpp | 18 +++++ llarp/iwp/frame_state.cpp | 34 ++++----- llarp/iwp/session.cpp | 52 ++++---------- llarp/iwp/transit_message.cpp | 8 +-- llarp/path.cpp | 3 +- llarp/pathset.cpp | 4 +- llarp/router.cpp | 9 ++- llarp/router.hpp | 2 - llarp/service/context.cpp | 9 ++- llarp/service/endpoint.cpp | 25 ++++++- llarp/threadpool.cpp | 55 +++++--------- llarp/threadpool.hpp | 18 ++++- 32 files changed, 375 insertions(+), 191 deletions(-) create mode 100644 include/llarp/ip.hpp create mode 100644 llarp/ip.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6a82af031..fa90f3b33 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -313,6 +313,7 @@ set(LIB_SRC llarp/encrypted_frame.cpp llarp/exit_info.cpp llarp/exit_route.cpp + llarp/ip.cpp llarp/link_intro.cpp llarp/link_message.cpp llarp/net.cpp diff --git a/include/llarp/codel.hpp b/include/llarp/codel.hpp index b8f37b100..2715c2422 100644 --- a/include/llarp/codel.hpp +++ b/include/llarp/codel.hpp @@ -67,20 +67,18 @@ namespace llarp } void - Put(T i) + Put(std::unique_ptr< T >& ptr) { Lock_t lock(m_QueueMutex); - // llarp::LogInfo("CoDelQueue::Put - adding item, queue now has ", - // m_Queue.size(), " items at ", getTime(*item)); - PutTime()(i); - m_Queue.push(i); + PutTime()(ptr.get()); if(firstPut == 0) - firstPut = GetTime()(i); + firstPut = GetTime()(ptr.get()); + m_Queue.push(std::move(ptr)); } - template < typename Queue_t > + template < typename Func > void - Process(Queue_t& result) + Process(Func visitor) { llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL; // auto start = llarp_time_now_ms(); @@ -90,8 +88,8 @@ namespace llarp while(m_Queue.size()) { // llarp::LogInfo("CoDelQueue::Process - queue has ", m_Queue.size()); - const auto& item = m_Queue.top(); - auto dlt = start - GetTime()(item); + auto& item = m_Queue.top(); + auto dlt = start - GetTime()(item.get()); // llarp::LogInfo("CoDelQueue::Process - dlt ", dlt); lowest = std::min(dlt, lowest); if(m_Queue.size() == 1) @@ -100,9 +98,7 @@ namespace llarp // lowest, " dropMs: ", dropMs); if(lowest > dropMs) { - // drop nextTickInterval += initialIntervalMs / std::sqrt(++dropNum); - delete item; m_Queue.pop(); break; } @@ -113,7 +109,7 @@ namespace llarp } } // llarp::LogInfo("CoDelQueue::Process - passing"); - result.push(item); + visitor(item); m_Queue.pop(); } firstPut = 0; @@ -123,7 +119,9 @@ namespace llarp size_t dropNum = 0; llarp_time_t nextTickInterval = initialIntervalMs; Mutex_t m_QueueMutex; - std::priority_queue< T, std::vector< T >, Compare > m_Queue; + std::priority_queue< std::unique_ptr< T >, + std::vector< std::unique_ptr< T > >, Compare > + m_Queue; std::string m_name; }; } // namespace util diff --git a/include/llarp/crypto_async.h b/include/llarp/crypto_async.h index a796d8419..4c879e569 100644 --- a/include/llarp/crypto_async.h +++ b/include/llarp/crypto_async.h @@ -178,7 +178,9 @@ struct iwp_async_frame byte_t buf[1500]; }; -// TODO: remove +#ifdef __cplusplus +#include + struct FramePutTime { void @@ -199,11 +201,13 @@ struct FrameGetTime struct FrameCompareTime { bool - operator()(const iwp_async_frame *left, iwp_async_frame *right) const + operator()(const std::unique_ptr< iwp_async_frame > &left, + const std::unique_ptr< iwp_async_frame > &right) const { return left->created < right->created; } }; +#endif /// synchronously decrypt a frame bool diff --git a/include/llarp/dht.h b/include/llarp/dht.h index 3a979a48c..c1f6a48f4 100644 --- a/include/llarp/dht.h +++ b/include/llarp/dht.h @@ -46,12 +46,14 @@ void llarp_dht_allow_transit(struct llarp_dht_context* ctx); /// put router as a dht peer +/// internal function do not use void -llarp_dht_put_peer(struct llarp_dht_context* ctx, struct llarp_rc* rc); +__llarp_dht_put_peer(struct llarp_dht_context* ctx, struct llarp_rc* rc); /// remove router from tracked dht peer list +/// internal function do not use void -llarp_dht_remove_peer(struct llarp_dht_context* ctx, const byte_t* id); +__llarp_dht_remove_peer(struct llarp_dht_context* ctx, const byte_t* id); void llarp_dht_lookup_router(struct llarp_dht_context* ctx, diff --git a/include/llarp/endian.h b/include/llarp/endian.h index 8e33bbd62..5a195f4e5 100644 --- a/include/llarp/endian.h +++ b/include/llarp/endian.h @@ -169,4 +169,4 @@ htole64buf(void *buf, uint64_t big64) htobuf64(buf, htole64(big64)); } -#endif \ No newline at end of file +#endif diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 281d2a422..42e416c2d 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -89,6 +89,9 @@ struct llarp_tun_io void *user; void *impl; struct llarp_ev_loop *parent; + /// called when we are able to write right before we write + /// this happens after reading packets + void (*before_write)(struct llarp_tun_io *); /// called every event loop tick after reads void (*tick)(struct llarp_tun_io *); void (*recvpkt)(struct llarp_tun_io *, const void *, ssize_t); diff --git a/include/llarp/handlers/tun.hpp b/include/llarp/handlers/tun.hpp index 4e4257b47..fe073274c 100644 --- a/include/llarp/handlers/tun.hpp +++ b/include/llarp/handlers/tun.hpp @@ -1,6 +1,8 @@ #ifndef LLARP_HANDLERS_TUN_HPP #define LLARP_HANDLERS_TUN_HPP #include +#include +#include #include #include @@ -8,13 +10,13 @@ namespace llarp { namespace handlers { + static const int DefaultTunNetmask = 16; + static const char DefaultTunIfname[] = "lokinet0"; + static const char DefaultTunDstAddr[] = "10.10.0.1"; + static const char DefaultTunSrcAddr[] = "10.10.0.2"; + struct TunEndpoint : public service::Endpoint { - static constexpr int DefaultNetmask = 16; - static constexpr char DefaultIfname[] = "lokinet0"; - static constexpr char DefaultDstAddr[] = "10.10.0.1"; - static constexpr char DefaultSrcAddr[] = "10.10.0.2"; - TunEndpoint(const std::string& nickname, llarp_router* r); ~TunEndpoint(); @@ -41,7 +43,7 @@ namespace llarp llarp_tun_io tunif; static void - tunifTick(llarp_tun_io* t); + tunifBeforeWrite(llarp_tun_io* t); static void tunifRecvPkt(llarp_tun_io* t, const void* pkt, ssize_t sz); @@ -49,8 +51,25 @@ namespace llarp static void handleTickTun(void* u); + protected: + typedef llarp::util::CoDelQueue< + net::IPv4Packet, net::IPv4Packet::GetTime, net::IPv4Packet::PutTime, + net::IPv4Packet::CompareOrder > + PacketQueue_t; + /// queue for sending packets over the network from us + PacketQueue_t m_UserToNetworkPktQueue; + /// queue for sending packets to user from network + PacketQueue_t m_NetworkToUserPktQueue; + /// return true if we have a remote loki address for this ip address + bool + HasRemoteForIP(const uint32_t& ipv4) + { + return m_IPs.find(ipv4) != m_IPs.end(); + } + private: std::promise< bool > m_TunSetupResult; + std::unordered_map< uint32_t, service::Address > m_IPs; }; } // namespace handlers } // namespace llarp diff --git a/include/llarp/ip.hpp b/include/llarp/ip.hpp new file mode 100644 index 000000000..8fc46fe5c --- /dev/null +++ b/include/llarp/ip.hpp @@ -0,0 +1,100 @@ +#ifndef LLARP_IP_HPP +#define LLARP_IP_HPP +#include +#include +#include +#include + +namespace llarp +{ + namespace net + { + struct IPv4Packet + { + static constexpr size_t MaxSize = 1500; + llarp_time_t timestamp; + size_t sz; + byte_t buf[MaxSize]; + + struct GetTime + { + llarp_time_t + operator()(const IPv4Packet* pkt) const + { + return pkt->timestamp; + } + }; + + struct PutTime + { + void + operator()(IPv4Packet* pkt) const + { + pkt->timestamp = llarp_time_now_ms(); + } + }; + + struct CompareOrder + { + bool + operator()(const std::unique_ptr< IPv4Packet >& left, + const std::unique_ptr< IPv4Packet >& right) + { + return left->timestamp < right->timestamp; + } + }; + + iphdr* + Header() + { + return (iphdr*)buf; + } + + const iphdr* + Header() const + { + return (iphdr*)buf; + } + + uint32_t& + src() + { + return Header()->saddr; + } + + uint32_t& + dst() + { + return Header()->daddr; + } + + const uint32_t& + src() const + { + return Header()->saddr; + } + + const uint32_t& + dst() const + { + return Header()->daddr; + } + + /// put the payload of an ip packet + /// recalculate all fields + /// return true on success + /// return false if the payload doesn't fit + bool + PutPayload(llarp_buffer_t buf); + }; + + /// parse an ipv4 packet + /// returns nullptr if invalid data + /// copies buffer into return value + std::unique_ptr< IPv4Packet > + ParseIPv4Packet(const void* buf, size_t sz); + + } // namespace net +} // namespace llarp + +#endif diff --git a/include/llarp/iwp/frame_state.hpp b/include/llarp/iwp/frame_state.hpp index 0586947fc..77ba5ef3e 100644 --- a/include/llarp/iwp/frame_state.hpp +++ b/include/llarp/iwp/frame_state.hpp @@ -53,7 +53,7 @@ struct frame_state // typedef std::queue< sendbuf_t * > sendqueue_t; typedef llarp::util::CoDelQueue< - InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime, + InboundMessage, InboundMessage::GetTime, InboundMessage::PutTime, InboundMessage::OrderCompare, llarp::util::DummyMutex, llarp::util::DummyLock > recvqueue_t; diff --git a/include/llarp/iwp/inbound_message.hpp b/include/llarp/iwp/inbound_message.hpp index 8b83b9b6f..e153e6a3b 100644 --- a/include/llarp/iwp/inbound_message.hpp +++ b/include/llarp/iwp/inbound_message.hpp @@ -42,7 +42,8 @@ struct InboundMessage struct OrderCompare { bool - operator()(const InboundMessage *left, const InboundMessage *right) const + operator()(const std::unique_ptr< InboundMessage > &left, + const std::unique_ptr< InboundMessage > &right) const { return left->msgid < right->msgid; } @@ -56,4 +57,4 @@ struct InboundMessage msg->queued = llarp_time_now_ms(); } }; -}; \ No newline at end of file +}; diff --git a/include/llarp/iwp/sendbuf.hpp b/include/llarp/iwp/sendbuf.hpp index 185a3e049..b6ac27060 100644 --- a/include/llarp/iwp/sendbuf.hpp +++ b/include/llarp/iwp/sendbuf.hpp @@ -2,6 +2,7 @@ #include #include +#include #include struct sendbuf_t @@ -55,7 +56,7 @@ struct sendbuf_t struct PutTime { void - operator()(sendbuf_t *&buf) const + operator()(sendbuf_t *buf) const { buf->timestamp = llarp_time_now_ms(); } @@ -64,7 +65,8 @@ struct sendbuf_t struct Compare { bool - operator()(const sendbuf_t *left, const sendbuf_t *right) const + operator()(const std::unique_ptr< sendbuf_t > &left, + const std::unique_ptr< sendbuf_t > &right) const { return left->priority < right->priority; } diff --git a/include/llarp/iwp/sendqueue.hpp b/include/llarp/iwp/sendqueue.hpp index fd4777274..78a256bc5 100644 --- a/include/llarp/iwp/sendqueue.hpp +++ b/include/llarp/iwp/sendqueue.hpp @@ -4,8 +4,8 @@ #include typedef llarp::util::CoDelQueue< - sendbuf_t *, sendbuf_t::GetTime, sendbuf_t::PutTime, sendbuf_t::Compare, + sendbuf_t, sendbuf_t::GetTime, sendbuf_t::PutTime, sendbuf_t::Compare, llarp::util::DummyMutex, llarp::util::DummyLock > sendqueue_t; -#endif \ No newline at end of file +#endif diff --git a/include/llarp/iwp/session.hpp b/include/llarp/iwp/session.hpp index 8ab737661..101cab71a 100644 --- a/include/llarp/iwp/session.hpp +++ b/include/llarp/iwp/session.hpp @@ -125,11 +125,11 @@ struct llarp_link_session uint32_t frames = 0; std::atomic< bool > working; - llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime, + llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime, FramePutTime, FrameCompareTime > outboundFrames; - llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime, + llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime, FramePutTime, FrameCompareTime > decryptedFrames; @@ -166,7 +166,8 @@ struct llarp_link_session add_outbound_message(uint64_t id, transit_message *msg); void EncryptOutboundFrames(); - iwp_async_frame * + + std::unique_ptr< iwp_async_frame > alloc_frame(const void *buf, size_t sz); void decrypt_frame(const void *buf, size_t sz); diff --git a/include/llarp/service/context.hpp b/include/llarp/service/context.hpp index c3f5d761b..df505ce89 100644 --- a/include/llarp/service/context.hpp +++ b/include/llarp/service/context.hpp @@ -23,8 +23,9 @@ namespace llarp private: llarp_router *m_Router; - std::unordered_map< std::string, Endpoint * > m_Endpoints; + std::unordered_map< std::string, std::unique_ptr< Endpoint > > + m_Endpoints; }; } // namespace service } // namespace llarp -#endif \ No newline at end of file +#endif diff --git a/include/llarp/service/endpoint.hpp b/include/llarp/service/endpoint.hpp index f6d4bcc27..d1912fd16 100644 --- a/include/llarp/service/endpoint.hpp +++ b/include/llarp/service/endpoint.hpp @@ -40,6 +40,10 @@ namespace llarp llarp_logic* EndpointLogic(); + /// endpoint's net loop for sending data to user + llarp_ev_loop* + EndpointNetLoop(); + llarp_crypto* Crypto(); @@ -240,6 +244,9 @@ namespace llarp bool NetworkIsIsolated() const; + static void + RunIsolatedMainLoop(void*); + private: bool OnOutboundLookup(const IntroSet* i); /* */ @@ -268,6 +275,7 @@ namespace llarp llarp_router* m_Router; llarp_threadpool* m_IsolatedWorker = nullptr; llarp_logic* m_IsolatedLogic = nullptr; + llarp_ev_loop* m_IsolatedNetLoop = nullptr; std::string m_Keyfile; std::string m_Name; std::string m_NetNS; diff --git a/include/llarp/threadpool.h b/include/llarp/threadpool.h index 8df8a87f3..d5029bcec 100644 --- a/include/llarp/threadpool.h +++ b/include/llarp/threadpool.h @@ -11,11 +11,12 @@ struct llarp_threadpool * llarp_init_same_process_threadpool(); typedef bool (*setup_net_func)(void *); +typedef void (*run_main_func)(void *); /// for network isolation struct llarp_threadpool * llarp_init_isolated_net_threadpool(const char *name, setup_net_func setupNet, - void *context); + run_main_func runMain, void *context); void llarp_free_threadpool(struct llarp_threadpool **tp); diff --git a/llarp/dht.cpp b/llarp/dht.cpp index 839989619..111ed9dba 100644 --- a/llarp/dht.cpp +++ b/llarp/dht.cpp @@ -20,7 +20,7 @@ llarp_dht_context_free(struct llarp_dht_context *ctx) } void -llarp_dht_put_peer(struct llarp_dht_context *ctx, struct llarp_rc *rc) +__llarp_dht_put_peer(struct llarp_dht_context *ctx, struct llarp_rc *rc) { llarp::dht::RCNode n(rc); @@ -29,7 +29,7 @@ llarp_dht_put_peer(struct llarp_dht_context *ctx, struct llarp_rc *rc) } void -llarp_dht_remove_peer(struct llarp_dht_context *ctx, const byte_t *id) +__llarp_dht_remove_peer(struct llarp_dht_context *ctx, const byte_t *id) { llarp::dht::Key_t k = id; llarp::LogDebug("Removing ", k, " to DHT"); diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 0dd00bc33..6237a1246 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -39,28 +39,21 @@ namespace llarp bool queue_write(const void* data, size_t sz) { - m_writeq.Put(new WriteBuffer(data, sz)); + std::unique_ptr< WriteBuffer > buf = + std::unique_ptr< WriteBuffer >(new WriteBuffer(data, sz)); + m_writeq.Put(buf); return m_writeq.Size() <= MAX_WRITE_QUEUE_SIZE; } /// called in event loop when fd is ready for writing /// drops all buffers that cannot be written in this pump /// this assumes fd is set to non blocking - void + virtual void flush_write() { - std::queue< WriteBuffer* > send; - m_writeq.Process(send); - while(send.size()) - { - auto& buffer = send.front(); - if(write(fd, buffer->payload.data(), buffer->payload.size()) == -1) - { - // failed to write - // TODO: should we requeue this buffer? - } - delete buffer; - } + m_writeq.Process([this](const std::unique_ptr< WriteBuffer >& buffer) { + write(fd, buffer->buf, buffer->bufsz); + }); /// reset errno errno = 0; } @@ -68,11 +61,18 @@ namespace llarp struct WriteBuffer { llarp_time_t timestamp = 0; - std::vector< byte_t > payload; + size_t bufsz; + byte_t buf[1500]; - WriteBuffer(const void* ptr, size_t sz) : payload(sz) + WriteBuffer(const void* ptr, size_t sz) { - memcpy(payload.data(), ptr, sz); + if(sz <= sizeof(buf)) + { + bufsz = sz; + memcpy(buf, ptr, bufsz); + } + else + bufsz = 0; } struct GetTime @@ -87,7 +87,7 @@ namespace llarp struct PutTime { void - operator()(WriteBuffer*& w) const + operator()(WriteBuffer* w) const { w->timestamp = llarp_time_now_ms(); } @@ -96,14 +96,15 @@ namespace llarp struct Compare { bool - operator()(const WriteBuffer* left, const WriteBuffer* right) const + operator()(const std::unique_ptr< WriteBuffer >& left, + const std::unique_ptr< WriteBuffer >& right) const { return left->timestamp < right->timestamp; } }; }; - llarp::util::CoDelQueue< WriteBuffer*, WriteBuffer::GetTime, + llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime, WriteBuffer::PutTime, WriteBuffer::Compare, llarp::util::NullMutex, llarp::util::NullLock > m_writeq; @@ -152,7 +153,7 @@ struct llarp_ev_loop create_tun(llarp_tun_io* tun) = 0; virtual bool - add_ev(llarp::ev_io* ev, bool write = false) = 0; + add_ev(llarp::ev_io* ev, bool write = true) = 0; virtual bool running() const = 0; diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index d0dceddef..5abcc7698 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -76,14 +76,26 @@ namespace llarp int sendto(const sockaddr* to, const void* data, size_t sz) { - // TODO: implement me return -1; } + void + flush_write() + { + if(t->before_write) + { + t->before_write(t); + } + ev_io::flush_write(); + } + int read(void* buf, size_t sz) { - return tuntap_read(tunif, buf, sz); + ssize_t ret = tuntap_read(tunif, buf, sz); + if(ret > 0 && t->recvpkt) + t->recvpkt(t, buf, ret); + return ret; } bool @@ -96,7 +108,7 @@ namespace llarp if(tuntap_set_ip(tunif, t->ifaddr, t->netmask) == -1) return false; fd = tunif->tun_fd; - return false; + return fd != -1; } ~tun() diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 96b597e60..dd8eee5df 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -7,15 +7,16 @@ namespace llarp { TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r) : service::Endpoint(nickname, r) + , m_UserToNetworkPktQueue(nickname + "_sendq") + , m_NetworkToUserPktQueue(nickname + "_recvq") { tunif.user = this; - tunif.netmask = TunEndpoint::DefaultNetmask; - strncpy(tunif.ifaddr, TunEndpoint::DefaultSrcAddr, - sizeof(tunif.ifaddr) - 1); - strncpy(tunif.ifname, TunEndpoint::DefaultIfname, - sizeof(tunif.ifname) - 1); - tunif.tick = &tunifTick; - tunif.recvpkt = &tunifRecvPkt; + tunif.netmask = DefaultTunNetmask; + strncpy(tunif.ifaddr, DefaultTunSrcAddr, sizeof(tunif.ifaddr) - 1); + strncpy(tunif.ifname, DefaultTunIfname, sizeof(tunif.ifname) - 1); + tunif.tick = nullptr; + tunif.before_write = &tunifBeforeWrite; + tunif.recvpkt = &tunifRecvPkt; } bool @@ -76,13 +77,13 @@ namespace llarp bool TunEndpoint::SetupTun() { - auto evloop = Router()->netloop; - return llarp_ev_add_tun(evloop, &tunif); + return llarp_ev_add_tun(EndpointNetLoop(), &tunif); } bool TunEndpoint::SetupNetworking() { + llarp::LogInfo("Set Up networking for ", Name()); bool result = SetupTun(); m_TunSetupResult.set_value(result); return result; @@ -104,6 +105,34 @@ namespace llarp self->TickTun(now); } + void + TunEndpoint::TickTun(llarp_time_t now) + { + // called in the isolated thread + } + + void + TunEndpoint::tunifBeforeWrite(llarp_tun_io *tun) + { + TunEndpoint *self = static_cast< TunEndpoint * >(tun->user); + self->m_NetworkToUserPktQueue.Process( + [tun](const std::unique_ptr< net::IPv4Packet > &pkt) { + if(!llarp_ev_tun_async_write(tun, pkt->buf, pkt->sz)) + llarp::LogWarn("packet dropped"); + }); + } + + void + TunEndpoint::tunifRecvPkt(llarp_tun_io *tun, const void *buf, ssize_t sz) + { + // called for every packet read from user in isolated network thread + TunEndpoint *self = static_cast< TunEndpoint * >(tun->user); + + std::unique_ptr< net::IPv4Packet > pkt = net::ParseIPv4Packet(buf, sz); + if(pkt) + self->m_UserToNetworkPktQueue.Put(pkt); + } + TunEndpoint::~TunEndpoint() { } diff --git a/llarp/ip.cpp b/llarp/ip.cpp new file mode 100644 index 000000000..3899a2cb9 --- /dev/null +++ b/llarp/ip.cpp @@ -0,0 +1,18 @@ +#include +#include + +namespace llarp +{ + namespace net + { + std::unique_ptr< IPv4Packet > + ParseIPv4Packet(const void* buf, size_t sz) + { + if(sz < 16 || sz > IPv4Packet::MaxSize) + return nullptr; + IPv4Packet* pkt = new IPv4Packet(); + memcpy(pkt->buf, buf, sz); + return std::unique_ptr< IPv4Packet >(pkt); + } + } // namespace net +} // namespace llarp diff --git a/llarp/iwp/frame_state.cpp b/llarp/iwp/frame_state.cpp index 948e226de..6c0733cf0 100644 --- a/llarp/iwp/frame_state.cpp +++ b/llarp/iwp/frame_state.cpp @@ -22,35 +22,23 @@ frame_state::Router() bool frame_state::process_inbound_queue() { - std::priority_queue< InboundMessage *, std::vector< InboundMessage * >, - InboundMessage::OrderCompare > - q; - recvqueue.Process(q); - uint64_t last = 0; - while(q.size()) - { - // TODO: is this right? - auto &front = q.top(); - - if(last != front->msgid) + recvqueue.Process([&](const std::unique_ptr< InboundMessage > &msg) { + if(last != msg->msgid) { - auto buffer = front->Buffer(); + auto buffer = msg->Buffer(); if(!Router()->HandleRecvLinkMessage(parent, buffer)) { - llarp::LogWarn("failed to process inbound message ", front->msgid); + llarp::LogWarn("failed to process inbound message ", msg->msgid); llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer); } - last = front->msgid; + last = msg->msgid; } else { llarp::LogWarn("duplicate inbound message ", last); } - delete front; - - q.pop(); - } + }); // TODO: this isn't right return true; } @@ -193,8 +181,8 @@ void frame_state::push_ackfor(uint64_t id, uint32_t bitmask) { llarp::LogDebug("ACK for msgid=", id, " mask=", bitmask); - auto pkt = new sendbuf_t(12 + 6); - auto body_ptr = init_sendbuf(pkt, eACKS, 12, txflags); + auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(12 + 6)); + auto body_ptr = init_sendbuf(pkt.get(), eACKS, 12, txflags); htobe64buf(body_ptr, id); htobe32buf(body_ptr + 8, bitmask); sendqueue.Put(pkt); @@ -244,7 +232,9 @@ frame_state::inbound_frame_complete(uint64_t id) } else { - recvqueue.Put(new InboundMessage(id, msg)); + std::unique_ptr< InboundMessage > m = + std::unique_ptr< InboundMessage >(new InboundMessage(id, msg)); + recvqueue.Put(m); success = true; } } @@ -398,4 +388,4 @@ void frame_state::alive() { lastEvent = llarp_time_now_ms(); -} \ No newline at end of file +} diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index d8c1f4c7e..91cb3bd58 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -400,15 +400,9 @@ llarp_link_session::get_parent() void llarp_link_session::TickLogic(llarp_time_t now) { - std::queue< iwp_async_frame * > q; - decryptedFrames.Process(q); - while(q.size()) - { - auto &front = q.front(); - handle_frame_decrypt(front); - delete front; - q.pop(); - } + decryptedFrames.Process([&](const std::unique_ptr< iwp_async_frame > &msg) { + handle_frame_decrypt(msg.get()); + }); frame.process_inbound_queue(); frame.retransmit(now); pump(); @@ -452,19 +446,10 @@ llarp_link_session::keepalive() void llarp_link_session::EncryptOutboundFrames() { - std::queue< iwp_async_frame * > outq; - outboundFrames.Process(outq); - while(outq.size()) - { - auto &front = outq.front(); - - // if(iwp_encrypt_frame(&front)) - // q.push(front); - if(iwp_encrypt_frame(front)) - handle_frame_encrypt(front); - delete front; - outq.pop(); - } + outboundFrames.Process([&](const std::unique_ptr< iwp_async_frame > &frame) { + if(iwp_encrypt_frame(frame.get())) + handle_frame_encrypt(frame.get()); + }); } static void @@ -610,14 +595,13 @@ llarp_link_session::decrypt_frame(const void *buf, size_t sz) // inboundFrames.Put(frame); auto f = alloc_frame(buf, sz); - if(iwp_decrypt_frame(f)) + if(iwp_decrypt_frame(f.get())) { decryptedFrames.Put(f); } else { llarp::LogWarn("decrypt frame fail"); - delete f; } // f->hook = &handle_frame_decrypt; // iwp_call_async_frame_decrypt(iwp, f); @@ -757,8 +741,7 @@ llarp_link_session::recv(const void *buf, size_t sz) } } -// TODO: fix orphan -iwp_async_frame * +std::unique_ptr< iwp_async_frame > llarp_link_session::alloc_frame(const void *buf, size_t sz) { // TODO don't hard code 1500 @@ -779,14 +762,14 @@ llarp_link_session::alloc_frame(const void *buf, size_t sz) // frame->created = now; // llarp::LogInfo("alloc_frame putting into q"); // q.Put(frame); - return frame; + return std::unique_ptr< iwp_async_frame >(frame); } void llarp_link_session::encrypt_frame_async_send(const void *buf, size_t sz) { // 64 bytes frame overhead for nonce and hmac - iwp_async_frame *frame = alloc_frame(nullptr, sz + 64); + auto frame = alloc_frame(nullptr, sz + 64); memcpy(frame->buf + 64, buf, sz); // maybe add upto 128 random bytes to the packet auto padding = llarp_randint() % MAX_PAD; @@ -801,18 +784,11 @@ void llarp_link_session::pump() { bool flush = false; - llarp_buffer_t buf; - std::queue< sendbuf_t * > q; - frame.sendqueue.Process(q); - while(q.size()) - { - auto &front = q.front(); - buf = front->Buffer(); + frame.sendqueue.Process([&](const std::unique_ptr< sendbuf_t > &msg) { + llarp_buffer_t buf = msg->Buffer(); encrypt_frame_async_send(buf.base, buf.sz); - delete front; - q.pop(); flush = true; - } + }); if(flush) PumpCryptoOutbound(); } diff --git a/llarp/iwp/transit_message.cpp b/llarp/iwp/transit_message.cpp index f5ca4c36a..6cf8528e2 100644 --- a/llarp/iwp/transit_message.cpp +++ b/llarp/iwp/transit_message.cpp @@ -109,8 +109,8 @@ void transit_message::generate_xmit(sendqueue_t &queue, byte_t flags) { uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer); - auto pkt = new sendbuf_t(sz + 6); - auto body_ptr = init_sendbuf(pkt, eXMIT, sz, flags); + auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(sz + 6)); + auto body_ptr = init_sendbuf(pkt.get(), eXMIT, sz, flags); memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer)); body_ptr += sizeof(msginfo.buffer); memcpy(body_ptr, lastfrag.data(), lastfrag.size()); @@ -128,8 +128,8 @@ transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags) if(status.test(frag.first)) continue; uint16_t sz = 9 + fragsize; - auto pkt = new sendbuf_t(sz + 6); - auto body_ptr = init_sendbuf(pkt, eFRAG, sz, flags); + auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(sz + 6)); + auto body_ptr = init_sendbuf(pkt.get(), eFRAG, sz, flags); htobe64buf(body_ptr, msgid); body_ptr[8] = frag.first; memcpy(body_ptr + 9, frag.second.data(), fragsize); diff --git a/llarp/path.cpp b/llarp/path.cpp index 3ecd56e19..273bcd997 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -239,7 +239,8 @@ namespace llarp } for(auto& builder : m_PathBuilders) { - builder->ExpirePaths(now); + if(builder) + builder->ExpirePaths(now); } } diff --git a/llarp/pathset.cpp b/llarp/pathset.cpp index dba386213..072054ff1 100644 --- a/llarp/pathset.cpp +++ b/llarp/pathset.cpp @@ -32,6 +32,8 @@ namespace llarp void PathSet::ExpirePaths(llarp_time_t now) { + if(m_Paths.size() == 0) + return; auto itr = m_Paths.begin(); while(itr != m_Paths.end()) { @@ -178,4 +180,4 @@ namespace llarp } } // namespace path -} // namespace llarp \ No newline at end of file +} // namespace llarp diff --git a/llarp/router.cpp b/llarp/router.cpp index 12388eed9..bb4dc627f 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -338,7 +338,7 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job) router->validRouters[pk] = job->rc; // track valid router in dht - llarp_dht_put_peer(router->dht, &router->validRouters[pk]); + __llarp_dht_put_peer(router->dht, &router->validRouters[pk]); // this was an outbound establish job if(ctx->establish_job) @@ -379,7 +379,8 @@ llarp_router::TryEstablishTo(const llarp::RouterID &remote) lookup->user = this; llarp_rc_clear(&lookup->result); memcpy(lookup->target, remote, PUBKEYSIZE); - lookup->hook = &HandleDHTLookupForTryEstablishTo; + lookup->hook = &HandleDHTLookupForTryEstablishTo; + lookup->iterative = false; llarp_dht_lookup_router(this->dht, lookup); } } @@ -408,8 +409,6 @@ llarp_router::Tick() // llarp::LogDebug("tick router"); auto now = llarp_time_now_ms(); paths.ExpirePaths(); - // TODO: don't do this if we have enough paths already - // FIXME: build paths even if we have inbound links if(inboundLinks.size() == 0) { { @@ -502,7 +501,7 @@ llarp_router::SessionClosed(const llarp::RouterID &remote) if(itr == validRouters.end()) return; - llarp_dht_remove_peer(dht, remote); + __llarp_dht_remove_peer(dht, remote); llarp_rc_free(&itr->second); validRouters.erase(itr); } diff --git a/llarp/router.hpp b/llarp/router.hpp index 8e048433d..e0908a139 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -97,8 +97,6 @@ struct llarp_router llarp::service::Context hiddenServiceContext; - llarp::handlers::TunEndpoint *tunEndpoint = nullptr; - llarp_link *outboundLink = nullptr; std::list< llarp_link * > inboundLinks; diff --git a/llarp/service/context.cpp b/llarp/service/context.cpp index 73322c3fb..f86df8de9 100644 --- a/llarp/service/context.cpp +++ b/llarp/service/context.cpp @@ -1,3 +1,4 @@ +#include #include namespace llarp @@ -39,7 +40,9 @@ namespace llarp conf.first); return false; } - auto service = new llarp::service::Endpoint(conf.first, m_Router); + + std::unique_ptr< llarp::service::Endpoint > service( + new llarp::handlers::TunEndpoint(conf.first, m_Router)); for(const auto &option : conf.second) { auto &k = option.first; @@ -54,11 +57,11 @@ namespace llarp if(service->Start()) { llarp::LogInfo("added hidden service endpoint ", service->Name()); - m_Endpoints.insert(std::make_pair(conf.first, service)); + m_Endpoints.insert(std::make_pair(conf.first, std::move(service))); return true; } llarp::LogError("failed to start hidden service endpoint ", conf.first); return false; } } // namespace service -} // namespace llarp \ No newline at end of file +} // namespace llarp diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 088b5b1f8..287b9256b 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -49,12 +49,22 @@ namespace llarp bool Endpoint::IsolateNetwork() { + llarp::LogInfo("isolating network to namespace ", m_NetNS); m_IsolatedWorker = llarp_init_isolated_net_threadpool( - m_Name.c_str(), &SetupIsolatedNetwork, this); + m_Name.c_str(), &SetupIsolatedNetwork, &RunIsolatedMainLoop, this); m_IsolatedLogic = llarp_init_single_process_logic(m_IsolatedWorker); return true; } + llarp_ev_loop* + Endpoint::EndpointNetLoop() + { + if(m_IsolatedNetLoop) + return m_IsolatedNetLoop; + else + return m_Router->netloop; + } + bool Endpoint::NetworkIsIsolated() const { @@ -559,8 +569,17 @@ namespace llarp bool Endpoint::DoNetworkIsolation() { - /// TODO: implement me - return false; + llarp_ev_loop_alloc(&m_IsolatedNetLoop); + return SetupNetworking(); + } + + void + Endpoint::RunIsolatedMainLoop(void* user) + { + Endpoint* self = static_cast< Endpoint* >(user); + llarp_ev_loop_run_single_process(self->m_IsolatedNetLoop, + self->m_IsolatedWorker, + self->m_IsolatedLogic); } void diff --git a/llarp/threadpool.cpp b/llarp/threadpool.cpp index b50e81943..bffad36ff 100644 --- a/llarp/threadpool.cpp +++ b/llarp/threadpool.cpp @@ -110,46 +110,24 @@ namespace llarp condition.NotifyOne(); } - static int - runIsolated(void *arg) - { - IsolatedPool *self = static_cast< IsolatedPool * >(arg); - if(!self->Isolated()) - { - llarp::LogError("failed to set up isolated environment"); - return 1; - } - auto func = std::bind(&Pool::Spawn, self, self->m_IsolatedWorkers, - self->m_IsolatedName); - func(); - return 0; - } - void - IsolatedPool::Spawn(int workers, const char *name) + IsolatedPool::Spawn(size_t workers, const char *name) { - if(m_isolated) - return; #ifdef __linux__ IsolatedPool *self = this; self->m_IsolatedName = name; self->m_IsolatedWorkers = workers; m_isolated = new std::thread([self] { - pid_t isolated; - isolated = - clone(runIsolated, self->m_childstack + sizeof(self->m_childstack), - self->m_flags | SIGCHLD, self); - if(isolated == -1) + if(unshare(self->m_flags) == -1) + llarp::LogError("unshared failed: ", strerror(errno)); + else { - llarp::LogError("failed to run isolated threadpool, ", - strerror(errno)); - return; - } - llarp::LogInfo("Spawned isolated process pool"); - if(waitpid(isolated, nullptr, 0) == -1) - { - llarp::LogError("failed to wait for pid ", isolated, ", ", - strerror(errno)); + llarp::LogInfo("spawning isolated environment"); + self->Pool::Spawn(self->m_IsolatedWorkers, self->m_IsolatedName); + if(self->Isolated()) + { + self->MainLoop(); + } } }); #else @@ -172,18 +150,22 @@ namespace llarp #ifdef __linux__ NetIsolatedPool::NetIsolatedPool(std::function< bool(void *) > setupNet, + std::function< void(void *) > runMain, void *user) : IsolatedPool(CLONE_NEWNET) { m_NetSetup = setupNet; + m_RunMain = runMain; m_user = user; } #else NetIsolatedPool::NetIsolatedPool(std::function< bool(void *) > setupNet, + std::function< void(void *) > runMain, void *user) : IsolatedPool(0) { m_NetSetup = setupNet; + m_RunMain = runMain; m_user = user; } #endif @@ -198,10 +180,11 @@ struct llarp_threadpool std::queue< llarp_thread_job * > jobs; llarp_threadpool(int workers, const char *name, bool isolate, - setup_net_func setup = nullptr, void *user = nullptr) + setup_net_func setup = nullptr, + run_main_func runmain = nullptr, void *user = nullptr) { if(isolate) - impl = new llarp::thread::NetIsolatedPool(setup, user); + impl = new llarp::thread::NetIsolatedPool(setup, runmain, user); else impl = new llarp::thread::Pool(); impl->Spawn(workers, name); @@ -229,9 +212,9 @@ llarp_init_same_process_threadpool() struct llarp_threadpool * llarp_init_isolated_net_threadpool(const char *name, setup_net_func setup, - void *context) + run_main_func runmain, void *context) { - return new llarp_threadpool(1, name, true, setup, context); + return new llarp_threadpool(1, name, true, setup, runmain, context); } void diff --git a/llarp/threadpool.hpp b/llarp/threadpool.hpp index e1236745c..05122a29d 100644 --- a/llarp/threadpool.hpp +++ b/llarp/threadpool.hpp @@ -61,7 +61,7 @@ namespace llarp } void - Spawn(int workers, const char* name); + Spawn(size_t workers, const char* name); void Join(); @@ -78,12 +78,17 @@ namespace llarp int m_flags; int m_IsolatedWorkers = 0; const char* m_IsolatedName = nullptr; - char m_childstack[(1024 * 1024 * 8)]; + + virtual void + MainLoop() + { + } }; struct NetIsolatedPool : public IsolatedPool { - NetIsolatedPool(std::function< bool(void*) > setupNet, void* user); + NetIsolatedPool(std::function< bool(void*) > setupNet, + std::function< void(void*) > runMain, void* user); bool Isolated() @@ -91,7 +96,14 @@ namespace llarp return m_NetSetup(m_user); } + void + MainLoop() + { + m_RunMain(m_user); + } + std::function< bool(void*) > m_NetSetup; + std::function< void(void*) > m_RunMain; void* m_user; };