* more tun stuff

* use std::unique_ptr where bare pointers aren't absolutely required
pull/13/head
Jeff Becker 6 years ago
parent 913fb1d88a
commit 1a1f93c171
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -313,6 +313,7 @@ set(LIB_SRC
llarp/encrypted_frame.cpp llarp/encrypted_frame.cpp
llarp/exit_info.cpp llarp/exit_info.cpp
llarp/exit_route.cpp llarp/exit_route.cpp
llarp/ip.cpp
llarp/link_intro.cpp llarp/link_intro.cpp
llarp/link_message.cpp llarp/link_message.cpp
llarp/net.cpp llarp/net.cpp

@ -67,20 +67,18 @@ namespace llarp
} }
void void
Put(T i) Put(std::unique_ptr< T >& ptr)
{ {
Lock_t lock(m_QueueMutex); Lock_t lock(m_QueueMutex);
// llarp::LogInfo("CoDelQueue::Put - adding item, queue now has ", PutTime()(ptr.get());
// m_Queue.size(), " items at ", getTime(*item));
PutTime()(i);
m_Queue.push(i);
if(firstPut == 0) if(firstPut == 0)
firstPut = GetTime()(i); firstPut = GetTime()(ptr.get());
m_Queue.push(std::move(ptr));
} }
template < typename Queue_t > template < typename Func >
void void
Process(Queue_t& result) Process(Func visitor)
{ {
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL; llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
// auto start = llarp_time_now_ms(); // auto start = llarp_time_now_ms();
@ -90,8 +88,8 @@ namespace llarp
while(m_Queue.size()) while(m_Queue.size())
{ {
// llarp::LogInfo("CoDelQueue::Process - queue has ", m_Queue.size()); // llarp::LogInfo("CoDelQueue::Process - queue has ", m_Queue.size());
const auto& item = m_Queue.top(); auto& item = m_Queue.top();
auto dlt = start - GetTime()(item); auto dlt = start - GetTime()(item.get());
// llarp::LogInfo("CoDelQueue::Process - dlt ", dlt); // llarp::LogInfo("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest); lowest = std::min(dlt, lowest);
if(m_Queue.size() == 1) if(m_Queue.size() == 1)
@ -100,9 +98,7 @@ namespace llarp
// lowest, " dropMs: ", dropMs); // lowest, " dropMs: ", dropMs);
if(lowest > dropMs) if(lowest > dropMs)
{ {
// drop
nextTickInterval += initialIntervalMs / std::sqrt(++dropNum); nextTickInterval += initialIntervalMs / std::sqrt(++dropNum);
delete item;
m_Queue.pop(); m_Queue.pop();
break; break;
} }
@ -113,7 +109,7 @@ namespace llarp
} }
} }
// llarp::LogInfo("CoDelQueue::Process - passing"); // llarp::LogInfo("CoDelQueue::Process - passing");
result.push(item); visitor(item);
m_Queue.pop(); m_Queue.pop();
} }
firstPut = 0; firstPut = 0;
@ -123,7 +119,9 @@ namespace llarp
size_t dropNum = 0; size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs; llarp_time_t nextTickInterval = initialIntervalMs;
Mutex_t m_QueueMutex; Mutex_t m_QueueMutex;
std::priority_queue< T, std::vector< T >, Compare > m_Queue; std::priority_queue< std::unique_ptr< T >,
std::vector< std::unique_ptr< T > >, Compare >
m_Queue;
std::string m_name; std::string m_name;
}; };
} // namespace util } // namespace util

@ -178,7 +178,9 @@ struct iwp_async_frame
byte_t buf[1500]; byte_t buf[1500];
}; };
// TODO: remove #ifdef __cplusplus
#include <memory>
struct FramePutTime struct FramePutTime
{ {
void void
@ -199,11 +201,13 @@ struct FrameGetTime
struct FrameCompareTime struct FrameCompareTime
{ {
bool bool
operator()(const iwp_async_frame *left, iwp_async_frame *right) const operator()(const std::unique_ptr< iwp_async_frame > &left,
const std::unique_ptr< iwp_async_frame > &right) const
{ {
return left->created < right->created; return left->created < right->created;
} }
}; };
#endif
/// synchronously decrypt a frame /// synchronously decrypt a frame
bool bool

@ -46,12 +46,14 @@ void
llarp_dht_allow_transit(struct llarp_dht_context* ctx); llarp_dht_allow_transit(struct llarp_dht_context* ctx);
/// put router as a dht peer /// put router as a dht peer
/// internal function do not use
void void
llarp_dht_put_peer(struct llarp_dht_context* ctx, struct llarp_rc* rc); __llarp_dht_put_peer(struct llarp_dht_context* ctx, struct llarp_rc* rc);
/// remove router from tracked dht peer list /// remove router from tracked dht peer list
/// internal function do not use
void void
llarp_dht_remove_peer(struct llarp_dht_context* ctx, const byte_t* id); __llarp_dht_remove_peer(struct llarp_dht_context* ctx, const byte_t* id);
void void
llarp_dht_lookup_router(struct llarp_dht_context* ctx, llarp_dht_lookup_router(struct llarp_dht_context* ctx,

@ -169,4 +169,4 @@ htole64buf(void *buf, uint64_t big64)
htobuf64(buf, htole64(big64)); htobuf64(buf, htole64(big64));
} }
#endif #endif

@ -89,6 +89,9 @@ struct llarp_tun_io
void *user; void *user;
void *impl; void *impl;
struct llarp_ev_loop *parent; struct llarp_ev_loop *parent;
/// called when we are able to write right before we write
/// this happens after reading packets
void (*before_write)(struct llarp_tun_io *);
/// called every event loop tick after reads /// called every event loop tick after reads
void (*tick)(struct llarp_tun_io *); void (*tick)(struct llarp_tun_io *);
void (*recvpkt)(struct llarp_tun_io *, const void *, ssize_t); void (*recvpkt)(struct llarp_tun_io *, const void *, ssize_t);

@ -1,6 +1,8 @@
#ifndef LLARP_HANDLERS_TUN_HPP #ifndef LLARP_HANDLERS_TUN_HPP
#define LLARP_HANDLERS_TUN_HPP #define LLARP_HANDLERS_TUN_HPP
#include <llarp/ev.h> #include <llarp/ev.h>
#include <llarp/codel.hpp>
#include <llarp/ip.hpp>
#include <llarp/service/endpoint.hpp> #include <llarp/service/endpoint.hpp>
#include <llarp/threading.hpp> #include <llarp/threading.hpp>
@ -8,13 +10,13 @@ namespace llarp
{ {
namespace handlers namespace handlers
{ {
static const int DefaultTunNetmask = 16;
static const char DefaultTunIfname[] = "lokinet0";
static const char DefaultTunDstAddr[] = "10.10.0.1";
static const char DefaultTunSrcAddr[] = "10.10.0.2";
struct TunEndpoint : public service::Endpoint struct TunEndpoint : public service::Endpoint
{ {
static constexpr int DefaultNetmask = 16;
static constexpr char DefaultIfname[] = "lokinet0";
static constexpr char DefaultDstAddr[] = "10.10.0.1";
static constexpr char DefaultSrcAddr[] = "10.10.0.2";
TunEndpoint(const std::string& nickname, llarp_router* r); TunEndpoint(const std::string& nickname, llarp_router* r);
~TunEndpoint(); ~TunEndpoint();
@ -41,7 +43,7 @@ namespace llarp
llarp_tun_io tunif; llarp_tun_io tunif;
static void static void
tunifTick(llarp_tun_io* t); tunifBeforeWrite(llarp_tun_io* t);
static void static void
tunifRecvPkt(llarp_tun_io* t, const void* pkt, ssize_t sz); tunifRecvPkt(llarp_tun_io* t, const void* pkt, ssize_t sz);
@ -49,8 +51,25 @@ namespace llarp
static void static void
handleTickTun(void* u); handleTickTun(void* u);
protected:
typedef llarp::util::CoDelQueue<
net::IPv4Packet, net::IPv4Packet::GetTime, net::IPv4Packet::PutTime,
net::IPv4Packet::CompareOrder >
PacketQueue_t;
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;
/// queue for sending packets to user from network
PacketQueue_t m_NetworkToUserPktQueue;
/// return true if we have a remote loki address for this ip address
bool
HasRemoteForIP(const uint32_t& ipv4)
{
return m_IPs.find(ipv4) != m_IPs.end();
}
private: private:
std::promise< bool > m_TunSetupResult; std::promise< bool > m_TunSetupResult;
std::unordered_map< uint32_t, service::Address > m_IPs;
}; };
} // namespace handlers } // namespace handlers
} // namespace llarp } // namespace llarp

@ -0,0 +1,100 @@
#ifndef LLARP_IP_HPP
#define LLARP_IP_HPP
#include <llarp/buffer.h>
#include <llarp/time.h>
#include <netinet/ip.h>
#include <memory>
namespace llarp
{
namespace net
{
struct IPv4Packet
{
static constexpr size_t MaxSize = 1500;
llarp_time_t timestamp;
size_t sz;
byte_t buf[MaxSize];
struct GetTime
{
llarp_time_t
operator()(const IPv4Packet* pkt) const
{
return pkt->timestamp;
}
};
struct PutTime
{
void
operator()(IPv4Packet* pkt) const
{
pkt->timestamp = llarp_time_now_ms();
}
};
struct CompareOrder
{
bool
operator()(const std::unique_ptr< IPv4Packet >& left,
const std::unique_ptr< IPv4Packet >& right)
{
return left->timestamp < right->timestamp;
}
};
iphdr*
Header()
{
return (iphdr*)buf;
}
const iphdr*
Header() const
{
return (iphdr*)buf;
}
uint32_t&
src()
{
return Header()->saddr;
}
uint32_t&
dst()
{
return Header()->daddr;
}
const uint32_t&
src() const
{
return Header()->saddr;
}
const uint32_t&
dst() const
{
return Header()->daddr;
}
/// put the payload of an ip packet
/// recalculate all fields
/// return true on success
/// return false if the payload doesn't fit
bool
PutPayload(llarp_buffer_t buf);
};
/// parse an ipv4 packet
/// returns nullptr if invalid data
/// copies buffer into return value
std::unique_ptr< IPv4Packet >
ParseIPv4Packet(const void* buf, size_t sz);
} // namespace net
} // namespace llarp
#endif

@ -53,7 +53,7 @@ struct frame_state
// typedef std::queue< sendbuf_t * > sendqueue_t; // typedef std::queue< sendbuf_t * > sendqueue_t;
typedef llarp::util::CoDelQueue< typedef llarp::util::CoDelQueue<
InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime, InboundMessage, InboundMessage::GetTime, InboundMessage::PutTime,
InboundMessage::OrderCompare, llarp::util::DummyMutex, InboundMessage::OrderCompare, llarp::util::DummyMutex,
llarp::util::DummyLock > llarp::util::DummyLock >
recvqueue_t; recvqueue_t;

@ -42,7 +42,8 @@ struct InboundMessage
struct OrderCompare struct OrderCompare
{ {
bool bool
operator()(const InboundMessage *left, const InboundMessage *right) const operator()(const std::unique_ptr< InboundMessage > &left,
const std::unique_ptr< InboundMessage > &right) const
{ {
return left->msgid < right->msgid; return left->msgid < right->msgid;
} }
@ -56,4 +57,4 @@ struct InboundMessage
msg->queued = llarp_time_now_ms(); msg->queued = llarp_time_now_ms();
} }
}; };
}; };

@ -2,6 +2,7 @@
#include <llarp/buffer.h> #include <llarp/buffer.h>
#include <llarp/time.h> #include <llarp/time.h>
#include <memory>
#include <queue> #include <queue>
struct sendbuf_t struct sendbuf_t
@ -55,7 +56,7 @@ struct sendbuf_t
struct PutTime struct PutTime
{ {
void void
operator()(sendbuf_t *&buf) const operator()(sendbuf_t *buf) const
{ {
buf->timestamp = llarp_time_now_ms(); buf->timestamp = llarp_time_now_ms();
} }
@ -64,7 +65,8 @@ struct sendbuf_t
struct Compare struct Compare
{ {
bool bool
operator()(const sendbuf_t *left, const sendbuf_t *right) const operator()(const std::unique_ptr< sendbuf_t > &left,
const std::unique_ptr< sendbuf_t > &right) const
{ {
return left->priority < right->priority; return left->priority < right->priority;
} }

@ -4,8 +4,8 @@
#include <llarp/iwp/sendbuf.hpp> #include <llarp/iwp/sendbuf.hpp>
typedef llarp::util::CoDelQueue< typedef llarp::util::CoDelQueue<
sendbuf_t *, sendbuf_t::GetTime, sendbuf_t::PutTime, sendbuf_t::Compare, sendbuf_t, sendbuf_t::GetTime, sendbuf_t::PutTime, sendbuf_t::Compare,
llarp::util::DummyMutex, llarp::util::DummyLock > llarp::util::DummyMutex, llarp::util::DummyLock >
sendqueue_t; sendqueue_t;
#endif #endif

@ -125,11 +125,11 @@ struct llarp_link_session
uint32_t frames = 0; uint32_t frames = 0;
std::atomic< bool > working; std::atomic< bool > working;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime, llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime, FramePutTime,
FrameCompareTime > FrameCompareTime >
outboundFrames; outboundFrames;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime, llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime, FramePutTime,
FrameCompareTime > FrameCompareTime >
decryptedFrames; decryptedFrames;
@ -166,7 +166,8 @@ struct llarp_link_session
add_outbound_message(uint64_t id, transit_message *msg); add_outbound_message(uint64_t id, transit_message *msg);
void void
EncryptOutboundFrames(); EncryptOutboundFrames();
iwp_async_frame *
std::unique_ptr< iwp_async_frame >
alloc_frame(const void *buf, size_t sz); alloc_frame(const void *buf, size_t sz);
void void
decrypt_frame(const void *buf, size_t sz); decrypt_frame(const void *buf, size_t sz);

@ -23,8 +23,9 @@ namespace llarp
private: private:
llarp_router *m_Router; llarp_router *m_Router;
std::unordered_map< std::string, Endpoint * > m_Endpoints; std::unordered_map< std::string, std::unique_ptr< Endpoint > >
m_Endpoints;
}; };
} // namespace service } // namespace service
} // namespace llarp } // namespace llarp
#endif #endif

@ -40,6 +40,10 @@ namespace llarp
llarp_logic* llarp_logic*
EndpointLogic(); EndpointLogic();
/// endpoint's net loop for sending data to user
llarp_ev_loop*
EndpointNetLoop();
llarp_crypto* llarp_crypto*
Crypto(); Crypto();
@ -240,6 +244,9 @@ namespace llarp
bool bool
NetworkIsIsolated() const; NetworkIsIsolated() const;
static void
RunIsolatedMainLoop(void*);
private: private:
bool bool
OnOutboundLookup(const IntroSet* i); /* */ OnOutboundLookup(const IntroSet* i); /* */
@ -268,6 +275,7 @@ namespace llarp
llarp_router* m_Router; llarp_router* m_Router;
llarp_threadpool* m_IsolatedWorker = nullptr; llarp_threadpool* m_IsolatedWorker = nullptr;
llarp_logic* m_IsolatedLogic = nullptr; llarp_logic* m_IsolatedLogic = nullptr;
llarp_ev_loop* m_IsolatedNetLoop = nullptr;
std::string m_Keyfile; std::string m_Keyfile;
std::string m_Name; std::string m_Name;
std::string m_NetNS; std::string m_NetNS;

@ -11,11 +11,12 @@ struct llarp_threadpool *
llarp_init_same_process_threadpool(); llarp_init_same_process_threadpool();
typedef bool (*setup_net_func)(void *); typedef bool (*setup_net_func)(void *);
typedef void (*run_main_func)(void *);
/// for network isolation /// for network isolation
struct llarp_threadpool * struct llarp_threadpool *
llarp_init_isolated_net_threadpool(const char *name, setup_net_func setupNet, llarp_init_isolated_net_threadpool(const char *name, setup_net_func setupNet,
void *context); run_main_func runMain, void *context);
void void
llarp_free_threadpool(struct llarp_threadpool **tp); llarp_free_threadpool(struct llarp_threadpool **tp);

@ -20,7 +20,7 @@ llarp_dht_context_free(struct llarp_dht_context *ctx)
} }
void void
llarp_dht_put_peer(struct llarp_dht_context *ctx, struct llarp_rc *rc) __llarp_dht_put_peer(struct llarp_dht_context *ctx, struct llarp_rc *rc)
{ {
llarp::dht::RCNode n(rc); llarp::dht::RCNode n(rc);
@ -29,7 +29,7 @@ llarp_dht_put_peer(struct llarp_dht_context *ctx, struct llarp_rc *rc)
} }
void void
llarp_dht_remove_peer(struct llarp_dht_context *ctx, const byte_t *id) __llarp_dht_remove_peer(struct llarp_dht_context *ctx, const byte_t *id)
{ {
llarp::dht::Key_t k = id; llarp::dht::Key_t k = id;
llarp::LogDebug("Removing ", k, " to DHT"); llarp::LogDebug("Removing ", k, " to DHT");

@ -39,28 +39,21 @@ namespace llarp
bool bool
queue_write(const void* data, size_t sz) queue_write(const void* data, size_t sz)
{ {
m_writeq.Put(new WriteBuffer(data, sz)); std::unique_ptr< WriteBuffer > buf =
std::unique_ptr< WriteBuffer >(new WriteBuffer(data, sz));
m_writeq.Put(buf);
return m_writeq.Size() <= MAX_WRITE_QUEUE_SIZE; return m_writeq.Size() <= MAX_WRITE_QUEUE_SIZE;
} }
/// called in event loop when fd is ready for writing /// called in event loop when fd is ready for writing
/// drops all buffers that cannot be written in this pump /// drops all buffers that cannot be written in this pump
/// this assumes fd is set to non blocking /// this assumes fd is set to non blocking
void virtual void
flush_write() flush_write()
{ {
std::queue< WriteBuffer* > send; m_writeq.Process([this](const std::unique_ptr< WriteBuffer >& buffer) {
m_writeq.Process(send); write(fd, buffer->buf, buffer->bufsz);
while(send.size()) });
{
auto& buffer = send.front();
if(write(fd, buffer->payload.data(), buffer->payload.size()) == -1)
{
// failed to write
// TODO: should we requeue this buffer?
}
delete buffer;
}
/// reset errno /// reset errno
errno = 0; errno = 0;
} }
@ -68,11 +61,18 @@ namespace llarp
struct WriteBuffer struct WriteBuffer
{ {
llarp_time_t timestamp = 0; llarp_time_t timestamp = 0;
std::vector< byte_t > payload; size_t bufsz;
byte_t buf[1500];
WriteBuffer(const void* ptr, size_t sz) : payload(sz) WriteBuffer(const void* ptr, size_t sz)
{ {
memcpy(payload.data(), ptr, sz); if(sz <= sizeof(buf))
{
bufsz = sz;
memcpy(buf, ptr, bufsz);
}
else
bufsz = 0;
} }
struct GetTime struct GetTime
@ -87,7 +87,7 @@ namespace llarp
struct PutTime struct PutTime
{ {
void void
operator()(WriteBuffer*& w) const operator()(WriteBuffer* w) const
{ {
w->timestamp = llarp_time_now_ms(); w->timestamp = llarp_time_now_ms();
} }
@ -96,14 +96,15 @@ namespace llarp
struct Compare struct Compare
{ {
bool bool
operator()(const WriteBuffer* left, const WriteBuffer* right) const operator()(const std::unique_ptr< WriteBuffer >& left,
const std::unique_ptr< WriteBuffer >& right) const
{ {
return left->timestamp < right->timestamp; return left->timestamp < right->timestamp;
} }
}; };
}; };
llarp::util::CoDelQueue< WriteBuffer*, WriteBuffer::GetTime, llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime,
WriteBuffer::PutTime, WriteBuffer::Compare, WriteBuffer::PutTime, WriteBuffer::Compare,
llarp::util::NullMutex, llarp::util::NullLock > llarp::util::NullMutex, llarp::util::NullLock >
m_writeq; m_writeq;
@ -152,7 +153,7 @@ struct llarp_ev_loop
create_tun(llarp_tun_io* tun) = 0; create_tun(llarp_tun_io* tun) = 0;
virtual bool virtual bool
add_ev(llarp::ev_io* ev, bool write = false) = 0; add_ev(llarp::ev_io* ev, bool write = true) = 0;
virtual bool virtual bool
running() const = 0; running() const = 0;

@ -76,14 +76,26 @@ namespace llarp
int int
sendto(const sockaddr* to, const void* data, size_t sz) sendto(const sockaddr* to, const void* data, size_t sz)
{ {
// TODO: implement me
return -1; return -1;
} }
void
flush_write()
{
if(t->before_write)
{
t->before_write(t);
}
ev_io::flush_write();
}
int int
read(void* buf, size_t sz) read(void* buf, size_t sz)
{ {
return tuntap_read(tunif, buf, sz); ssize_t ret = tuntap_read(tunif, buf, sz);
if(ret > 0 && t->recvpkt)
t->recvpkt(t, buf, ret);
return ret;
} }
bool bool
@ -96,7 +108,7 @@ namespace llarp
if(tuntap_set_ip(tunif, t->ifaddr, t->netmask) == -1) if(tuntap_set_ip(tunif, t->ifaddr, t->netmask) == -1)
return false; return false;
fd = tunif->tun_fd; fd = tunif->tun_fd;
return false; return fd != -1;
} }
~tun() ~tun()

@ -7,15 +7,16 @@ namespace llarp
{ {
TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r) TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r)
: service::Endpoint(nickname, r) : service::Endpoint(nickname, r)
, m_UserToNetworkPktQueue(nickname + "_sendq")
, m_NetworkToUserPktQueue(nickname + "_recvq")
{ {
tunif.user = this; tunif.user = this;
tunif.netmask = TunEndpoint::DefaultNetmask; tunif.netmask = DefaultTunNetmask;
strncpy(tunif.ifaddr, TunEndpoint::DefaultSrcAddr, strncpy(tunif.ifaddr, DefaultTunSrcAddr, sizeof(tunif.ifaddr) - 1);
sizeof(tunif.ifaddr) - 1); strncpy(tunif.ifname, DefaultTunIfname, sizeof(tunif.ifname) - 1);
strncpy(tunif.ifname, TunEndpoint::DefaultIfname, tunif.tick = nullptr;
sizeof(tunif.ifname) - 1); tunif.before_write = &tunifBeforeWrite;
tunif.tick = &tunifTick; tunif.recvpkt = &tunifRecvPkt;
tunif.recvpkt = &tunifRecvPkt;
} }
bool bool
@ -76,13 +77,13 @@ namespace llarp
bool bool
TunEndpoint::SetupTun() TunEndpoint::SetupTun()
{ {
auto evloop = Router()->netloop; return llarp_ev_add_tun(EndpointNetLoop(), &tunif);
return llarp_ev_add_tun(evloop, &tunif);
} }
bool bool
TunEndpoint::SetupNetworking() TunEndpoint::SetupNetworking()
{ {
llarp::LogInfo("Set Up networking for ", Name());
bool result = SetupTun(); bool result = SetupTun();
m_TunSetupResult.set_value(result); m_TunSetupResult.set_value(result);
return result; return result;
@ -104,6 +105,34 @@ namespace llarp
self->TickTun(now); self->TickTun(now);
} }
void
TunEndpoint::TickTun(llarp_time_t now)
{
// called in the isolated thread
}
void
TunEndpoint::tunifBeforeWrite(llarp_tun_io *tun)
{
TunEndpoint *self = static_cast< TunEndpoint * >(tun->user);
self->m_NetworkToUserPktQueue.Process(
[tun](const std::unique_ptr< net::IPv4Packet > &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt->buf, pkt->sz))
llarp::LogWarn("packet dropped");
});
}
void
TunEndpoint::tunifRecvPkt(llarp_tun_io *tun, const void *buf, ssize_t sz)
{
// called for every packet read from user in isolated network thread
TunEndpoint *self = static_cast< TunEndpoint * >(tun->user);
std::unique_ptr< net::IPv4Packet > pkt = net::ParseIPv4Packet(buf, sz);
if(pkt)
self->m_UserToNetworkPktQueue.Put(pkt);
}
TunEndpoint::~TunEndpoint() TunEndpoint::~TunEndpoint()
{ {
} }

@ -0,0 +1,18 @@
#include <llarp/endian.h>
#include <llarp/ip.hpp>
namespace llarp
{
namespace net
{
std::unique_ptr< IPv4Packet >
ParseIPv4Packet(const void* buf, size_t sz)
{
if(sz < 16 || sz > IPv4Packet::MaxSize)
return nullptr;
IPv4Packet* pkt = new IPv4Packet();
memcpy(pkt->buf, buf, sz);
return std::unique_ptr< IPv4Packet >(pkt);
}
} // namespace net
} // namespace llarp

@ -22,35 +22,23 @@ frame_state::Router()
bool bool
frame_state::process_inbound_queue() frame_state::process_inbound_queue()
{ {
std::priority_queue< InboundMessage *, std::vector< InboundMessage * >,
InboundMessage::OrderCompare >
q;
recvqueue.Process(q);
uint64_t last = 0; uint64_t last = 0;
while(q.size()) recvqueue.Process([&](const std::unique_ptr< InboundMessage > &msg) {
{ if(last != msg->msgid)
// TODO: is this right?
auto &front = q.top();
if(last != front->msgid)
{ {
auto buffer = front->Buffer(); auto buffer = msg->Buffer();
if(!Router()->HandleRecvLinkMessage(parent, buffer)) if(!Router()->HandleRecvLinkMessage(parent, buffer))
{ {
llarp::LogWarn("failed to process inbound message ", front->msgid); llarp::LogWarn("failed to process inbound message ", msg->msgid);
llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer); llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer);
} }
last = front->msgid; last = msg->msgid;
} }
else else
{ {
llarp::LogWarn("duplicate inbound message ", last); llarp::LogWarn("duplicate inbound message ", last);
} }
delete front; });
q.pop();
}
// TODO: this isn't right // TODO: this isn't right
return true; return true;
} }
@ -193,8 +181,8 @@ void
frame_state::push_ackfor(uint64_t id, uint32_t bitmask) frame_state::push_ackfor(uint64_t id, uint32_t bitmask)
{ {
llarp::LogDebug("ACK for msgid=", id, " mask=", bitmask); llarp::LogDebug("ACK for msgid=", id, " mask=", bitmask);
auto pkt = new sendbuf_t(12 + 6); auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(12 + 6));
auto body_ptr = init_sendbuf(pkt, eACKS, 12, txflags); auto body_ptr = init_sendbuf(pkt.get(), eACKS, 12, txflags);
htobe64buf(body_ptr, id); htobe64buf(body_ptr, id);
htobe32buf(body_ptr + 8, bitmask); htobe32buf(body_ptr + 8, bitmask);
sendqueue.Put(pkt); sendqueue.Put(pkt);
@ -244,7 +232,9 @@ frame_state::inbound_frame_complete(uint64_t id)
} }
else else
{ {
recvqueue.Put(new InboundMessage(id, msg)); std::unique_ptr< InboundMessage > m =
std::unique_ptr< InboundMessage >(new InboundMessage(id, msg));
recvqueue.Put(m);
success = true; success = true;
} }
} }
@ -398,4 +388,4 @@ void
frame_state::alive() frame_state::alive()
{ {
lastEvent = llarp_time_now_ms(); lastEvent = llarp_time_now_ms();
} }

@ -400,15 +400,9 @@ llarp_link_session::get_parent()
void void
llarp_link_session::TickLogic(llarp_time_t now) llarp_link_session::TickLogic(llarp_time_t now)
{ {
std::queue< iwp_async_frame * > q; decryptedFrames.Process([&](const std::unique_ptr< iwp_async_frame > &msg) {
decryptedFrames.Process(q); handle_frame_decrypt(msg.get());
while(q.size()) });
{
auto &front = q.front();
handle_frame_decrypt(front);
delete front;
q.pop();
}
frame.process_inbound_queue(); frame.process_inbound_queue();
frame.retransmit(now); frame.retransmit(now);
pump(); pump();
@ -452,19 +446,10 @@ llarp_link_session::keepalive()
void void
llarp_link_session::EncryptOutboundFrames() llarp_link_session::EncryptOutboundFrames()
{ {
std::queue< iwp_async_frame * > outq; outboundFrames.Process([&](const std::unique_ptr< iwp_async_frame > &frame) {
outboundFrames.Process(outq); if(iwp_encrypt_frame(frame.get()))
while(outq.size()) handle_frame_encrypt(frame.get());
{ });
auto &front = outq.front();
// if(iwp_encrypt_frame(&front))
// q.push(front);
if(iwp_encrypt_frame(front))
handle_frame_encrypt(front);
delete front;
outq.pop();
}
} }
static void static void
@ -610,14 +595,13 @@ llarp_link_session::decrypt_frame(const void *buf, size_t sz)
// inboundFrames.Put(frame); // inboundFrames.Put(frame);
auto f = alloc_frame(buf, sz); auto f = alloc_frame(buf, sz);
if(iwp_decrypt_frame(f)) if(iwp_decrypt_frame(f.get()))
{ {
decryptedFrames.Put(f); decryptedFrames.Put(f);
} }
else else
{ {
llarp::LogWarn("decrypt frame fail"); llarp::LogWarn("decrypt frame fail");
delete f;
} }
// f->hook = &handle_frame_decrypt; // f->hook = &handle_frame_decrypt;
// iwp_call_async_frame_decrypt(iwp, f); // iwp_call_async_frame_decrypt(iwp, f);
@ -757,8 +741,7 @@ llarp_link_session::recv(const void *buf, size_t sz)
} }
} }
// TODO: fix orphan std::unique_ptr< iwp_async_frame >
iwp_async_frame *
llarp_link_session::alloc_frame(const void *buf, size_t sz) llarp_link_session::alloc_frame(const void *buf, size_t sz)
{ {
// TODO don't hard code 1500 // TODO don't hard code 1500
@ -779,14 +762,14 @@ llarp_link_session::alloc_frame(const void *buf, size_t sz)
// frame->created = now; // frame->created = now;
// llarp::LogInfo("alloc_frame putting into q"); // llarp::LogInfo("alloc_frame putting into q");
// q.Put(frame); // q.Put(frame);
return frame; return std::unique_ptr< iwp_async_frame >(frame);
} }
void void
llarp_link_session::encrypt_frame_async_send(const void *buf, size_t sz) llarp_link_session::encrypt_frame_async_send(const void *buf, size_t sz)
{ {
// 64 bytes frame overhead for nonce and hmac // 64 bytes frame overhead for nonce and hmac
iwp_async_frame *frame = alloc_frame(nullptr, sz + 64); auto frame = alloc_frame(nullptr, sz + 64);
memcpy(frame->buf + 64, buf, sz); memcpy(frame->buf + 64, buf, sz);
// maybe add upto 128 random bytes to the packet // maybe add upto 128 random bytes to the packet
auto padding = llarp_randint() % MAX_PAD; auto padding = llarp_randint() % MAX_PAD;
@ -801,18 +784,11 @@ void
llarp_link_session::pump() llarp_link_session::pump()
{ {
bool flush = false; bool flush = false;
llarp_buffer_t buf; frame.sendqueue.Process([&](const std::unique_ptr< sendbuf_t > &msg) {
std::queue< sendbuf_t * > q; llarp_buffer_t buf = msg->Buffer();
frame.sendqueue.Process(q);
while(q.size())
{
auto &front = q.front();
buf = front->Buffer();
encrypt_frame_async_send(buf.base, buf.sz); encrypt_frame_async_send(buf.base, buf.sz);
delete front;
q.pop();
flush = true; flush = true;
} });
if(flush) if(flush)
PumpCryptoOutbound(); PumpCryptoOutbound();
} }

@ -109,8 +109,8 @@ void
transit_message::generate_xmit(sendqueue_t &queue, byte_t flags) transit_message::generate_xmit(sendqueue_t &queue, byte_t flags)
{ {
uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer); uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer);
auto pkt = new sendbuf_t(sz + 6); auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(sz + 6));
auto body_ptr = init_sendbuf(pkt, eXMIT, sz, flags); auto body_ptr = init_sendbuf(pkt.get(), eXMIT, sz, flags);
memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer)); memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer));
body_ptr += sizeof(msginfo.buffer); body_ptr += sizeof(msginfo.buffer);
memcpy(body_ptr, lastfrag.data(), lastfrag.size()); memcpy(body_ptr, lastfrag.data(), lastfrag.size());
@ -128,8 +128,8 @@ transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags)
if(status.test(frag.first)) if(status.test(frag.first))
continue; continue;
uint16_t sz = 9 + fragsize; uint16_t sz = 9 + fragsize;
auto pkt = new sendbuf_t(sz + 6); auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(sz + 6));
auto body_ptr = init_sendbuf(pkt, eFRAG, sz, flags); auto body_ptr = init_sendbuf(pkt.get(), eFRAG, sz, flags);
htobe64buf(body_ptr, msgid); htobe64buf(body_ptr, msgid);
body_ptr[8] = frag.first; body_ptr[8] = frag.first;
memcpy(body_ptr + 9, frag.second.data(), fragsize); memcpy(body_ptr + 9, frag.second.data(), fragsize);

@ -239,7 +239,8 @@ namespace llarp
} }
for(auto& builder : m_PathBuilders) for(auto& builder : m_PathBuilders)
{ {
builder->ExpirePaths(now); if(builder)
builder->ExpirePaths(now);
} }
} }

@ -32,6 +32,8 @@ namespace llarp
void void
PathSet::ExpirePaths(llarp_time_t now) PathSet::ExpirePaths(llarp_time_t now)
{ {
if(m_Paths.size() == 0)
return;
auto itr = m_Paths.begin(); auto itr = m_Paths.begin();
while(itr != m_Paths.end()) while(itr != m_Paths.end())
{ {
@ -178,4 +180,4 @@ namespace llarp
} }
} // namespace path } // namespace path
} // namespace llarp } // namespace llarp

@ -338,7 +338,7 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
router->validRouters[pk] = job->rc; router->validRouters[pk] = job->rc;
// track valid router in dht // track valid router in dht
llarp_dht_put_peer(router->dht, &router->validRouters[pk]); __llarp_dht_put_peer(router->dht, &router->validRouters[pk]);
// this was an outbound establish job // this was an outbound establish job
if(ctx->establish_job) if(ctx->establish_job)
@ -379,7 +379,8 @@ llarp_router::TryEstablishTo(const llarp::RouterID &remote)
lookup->user = this; lookup->user = this;
llarp_rc_clear(&lookup->result); llarp_rc_clear(&lookup->result);
memcpy(lookup->target, remote, PUBKEYSIZE); memcpy(lookup->target, remote, PUBKEYSIZE);
lookup->hook = &HandleDHTLookupForTryEstablishTo; lookup->hook = &HandleDHTLookupForTryEstablishTo;
lookup->iterative = false;
llarp_dht_lookup_router(this->dht, lookup); llarp_dht_lookup_router(this->dht, lookup);
} }
} }
@ -408,8 +409,6 @@ llarp_router::Tick()
// llarp::LogDebug("tick router"); // llarp::LogDebug("tick router");
auto now = llarp_time_now_ms(); auto now = llarp_time_now_ms();
paths.ExpirePaths(); paths.ExpirePaths();
// TODO: don't do this if we have enough paths already
// FIXME: build paths even if we have inbound links
if(inboundLinks.size() == 0) if(inboundLinks.size() == 0)
{ {
{ {
@ -502,7 +501,7 @@ llarp_router::SessionClosed(const llarp::RouterID &remote)
if(itr == validRouters.end()) if(itr == validRouters.end())
return; return;
llarp_dht_remove_peer(dht, remote); __llarp_dht_remove_peer(dht, remote);
llarp_rc_free(&itr->second); llarp_rc_free(&itr->second);
validRouters.erase(itr); validRouters.erase(itr);
} }

@ -97,8 +97,6 @@ struct llarp_router
llarp::service::Context hiddenServiceContext; llarp::service::Context hiddenServiceContext;
llarp::handlers::TunEndpoint *tunEndpoint = nullptr;
llarp_link *outboundLink = nullptr; llarp_link *outboundLink = nullptr;
std::list< llarp_link * > inboundLinks; std::list< llarp_link * > inboundLinks;

@ -1,3 +1,4 @@
#include <llarp/handlers/tun.hpp>
#include <llarp/service/context.hpp> #include <llarp/service/context.hpp>
namespace llarp namespace llarp
@ -39,7 +40,9 @@ namespace llarp
conf.first); conf.first);
return false; return false;
} }
auto service = new llarp::service::Endpoint(conf.first, m_Router);
std::unique_ptr< llarp::service::Endpoint > service(
new llarp::handlers::TunEndpoint(conf.first, m_Router));
for(const auto &option : conf.second) for(const auto &option : conf.second)
{ {
auto &k = option.first; auto &k = option.first;
@ -54,11 +57,11 @@ namespace llarp
if(service->Start()) if(service->Start())
{ {
llarp::LogInfo("added hidden service endpoint ", service->Name()); llarp::LogInfo("added hidden service endpoint ", service->Name());
m_Endpoints.insert(std::make_pair(conf.first, service)); m_Endpoints.insert(std::make_pair(conf.first, std::move(service)));
return true; return true;
} }
llarp::LogError("failed to start hidden service endpoint ", conf.first); llarp::LogError("failed to start hidden service endpoint ", conf.first);
return false; return false;
} }
} // namespace service } // namespace service
} // namespace llarp } // namespace llarp

@ -49,12 +49,22 @@ namespace llarp
bool bool
Endpoint::IsolateNetwork() Endpoint::IsolateNetwork()
{ {
llarp::LogInfo("isolating network to namespace ", m_NetNS);
m_IsolatedWorker = llarp_init_isolated_net_threadpool( m_IsolatedWorker = llarp_init_isolated_net_threadpool(
m_Name.c_str(), &SetupIsolatedNetwork, this); m_Name.c_str(), &SetupIsolatedNetwork, &RunIsolatedMainLoop, this);
m_IsolatedLogic = llarp_init_single_process_logic(m_IsolatedWorker); m_IsolatedLogic = llarp_init_single_process_logic(m_IsolatedWorker);
return true; return true;
} }
llarp_ev_loop*
Endpoint::EndpointNetLoop()
{
if(m_IsolatedNetLoop)
return m_IsolatedNetLoop;
else
return m_Router->netloop;
}
bool bool
Endpoint::NetworkIsIsolated() const Endpoint::NetworkIsIsolated() const
{ {
@ -559,8 +569,17 @@ namespace llarp
bool bool
Endpoint::DoNetworkIsolation() Endpoint::DoNetworkIsolation()
{ {
/// TODO: implement me llarp_ev_loop_alloc(&m_IsolatedNetLoop);
return false; return SetupNetworking();
}
void
Endpoint::RunIsolatedMainLoop(void* user)
{
Endpoint* self = static_cast< Endpoint* >(user);
llarp_ev_loop_run_single_process(self->m_IsolatedNetLoop,
self->m_IsolatedWorker,
self->m_IsolatedLogic);
} }
void void

@ -110,46 +110,24 @@ namespace llarp
condition.NotifyOne(); condition.NotifyOne();
} }
static int
runIsolated(void *arg)
{
IsolatedPool *self = static_cast< IsolatedPool * >(arg);
if(!self->Isolated())
{
llarp::LogError("failed to set up isolated environment");
return 1;
}
auto func = std::bind(&Pool::Spawn, self, self->m_IsolatedWorkers,
self->m_IsolatedName);
func();
return 0;
}
void void
IsolatedPool::Spawn(int workers, const char *name) IsolatedPool::Spawn(size_t workers, const char *name)
{ {
if(m_isolated)
return;
#ifdef __linux__ #ifdef __linux__
IsolatedPool *self = this; IsolatedPool *self = this;
self->m_IsolatedName = name; self->m_IsolatedName = name;
self->m_IsolatedWorkers = workers; self->m_IsolatedWorkers = workers;
m_isolated = new std::thread([self] { m_isolated = new std::thread([self] {
pid_t isolated; if(unshare(self->m_flags) == -1)
isolated = llarp::LogError("unshared failed: ", strerror(errno));
clone(runIsolated, self->m_childstack + sizeof(self->m_childstack), else
self->m_flags | SIGCHLD, self);
if(isolated == -1)
{ {
llarp::LogError("failed to run isolated threadpool, ", llarp::LogInfo("spawning isolated environment");
strerror(errno)); self->Pool::Spawn(self->m_IsolatedWorkers, self->m_IsolatedName);
return; if(self->Isolated())
} {
llarp::LogInfo("Spawned isolated process pool"); self->MainLoop();
if(waitpid(isolated, nullptr, 0) == -1) }
{
llarp::LogError("failed to wait for pid ", isolated, ", ",
strerror(errno));
} }
}); });
#else #else
@ -172,18 +150,22 @@ namespace llarp
#ifdef __linux__ #ifdef __linux__
NetIsolatedPool::NetIsolatedPool(std::function< bool(void *) > setupNet, NetIsolatedPool::NetIsolatedPool(std::function< bool(void *) > setupNet,
std::function< void(void *) > runMain,
void *user) void *user)
: IsolatedPool(CLONE_NEWNET) : IsolatedPool(CLONE_NEWNET)
{ {
m_NetSetup = setupNet; m_NetSetup = setupNet;
m_RunMain = runMain;
m_user = user; m_user = user;
} }
#else #else
NetIsolatedPool::NetIsolatedPool(std::function< bool(void *) > setupNet, NetIsolatedPool::NetIsolatedPool(std::function< bool(void *) > setupNet,
std::function< void(void *) > runMain,
void *user) void *user)
: IsolatedPool(0) : IsolatedPool(0)
{ {
m_NetSetup = setupNet; m_NetSetup = setupNet;
m_RunMain = runMain;
m_user = user; m_user = user;
} }
#endif #endif
@ -198,10 +180,11 @@ struct llarp_threadpool
std::queue< llarp_thread_job * > jobs; std::queue< llarp_thread_job * > jobs;
llarp_threadpool(int workers, const char *name, bool isolate, llarp_threadpool(int workers, const char *name, bool isolate,
setup_net_func setup = nullptr, void *user = nullptr) setup_net_func setup = nullptr,
run_main_func runmain = nullptr, void *user = nullptr)
{ {
if(isolate) if(isolate)
impl = new llarp::thread::NetIsolatedPool(setup, user); impl = new llarp::thread::NetIsolatedPool(setup, runmain, user);
else else
impl = new llarp::thread::Pool(); impl = new llarp::thread::Pool();
impl->Spawn(workers, name); impl->Spawn(workers, name);
@ -229,9 +212,9 @@ llarp_init_same_process_threadpool()
struct llarp_threadpool * struct llarp_threadpool *
llarp_init_isolated_net_threadpool(const char *name, setup_net_func setup, llarp_init_isolated_net_threadpool(const char *name, setup_net_func setup,
void *context) run_main_func runmain, void *context)
{ {
return new llarp_threadpool(1, name, true, setup, context); return new llarp_threadpool(1, name, true, setup, runmain, context);
} }
void void

@ -61,7 +61,7 @@ namespace llarp
} }
void void
Spawn(int workers, const char* name); Spawn(size_t workers, const char* name);
void void
Join(); Join();
@ -78,12 +78,17 @@ namespace llarp
int m_flags; int m_flags;
int m_IsolatedWorkers = 0; int m_IsolatedWorkers = 0;
const char* m_IsolatedName = nullptr; const char* m_IsolatedName = nullptr;
char m_childstack[(1024 * 1024 * 8)];
virtual void
MainLoop()
{
}
}; };
struct NetIsolatedPool : public IsolatedPool struct NetIsolatedPool : public IsolatedPool
{ {
NetIsolatedPool(std::function< bool(void*) > setupNet, void* user); NetIsolatedPool(std::function< bool(void*) > setupNet,
std::function< void(void*) > runMain, void* user);
bool bool
Isolated() Isolated()
@ -91,7 +96,14 @@ namespace llarp
return m_NetSetup(m_user); return m_NetSetup(m_user);
} }
void
MainLoop()
{
m_RunMain(m_user);
}
std::function< bool(void*) > m_NetSetup; std::function< bool(void*) > m_NetSetup;
std::function< void(void*) > m_RunMain;
void* m_user; void* m_user;
}; };

Loading…
Cancel
Save