try coleasing inbound packets from iwp

pull/1543/head
Jeff Becker 3 years ago
parent 0c869600df
commit 42ffbcca0a
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -59,12 +59,31 @@ namespace llarp
class NetworkInterface;
}
/// distinct event loop waker uper
class EventLoopWakeup
{
protected:
std::function<void()> callback;
public:
EventLoopWakeup(std::function<void()> cb) : callback{cb}
{}
virtual ~EventLoopWakeup() = default;
/// async wakeup and call callback once
virtual void
Wakeup() = 0;
/// end operation
virtual void
End() = 0;
};
// this (nearly!) abstract base class
// is overriden for each platform
struct EventLoop
{
byte_t readbuf[EV_READ_BUF_SZ] = {0};
virtual bool
init() = 0;
@ -127,6 +146,10 @@ namespace llarp
virtual void
deregister_poll_fd_readable(int fd) = 0;
/// make an event loop waker on this event loop
virtual EventLoopWakeup*
make_event_loop_waker(std::function<void()> callback) = 0;
};
} // namespace llarp
#endif

@ -20,6 +20,54 @@ namespace libuv
Close() = 0;
};
class UVWakeup final : public llarp::EventLoopWakeup, public glue
{
uv_async_t m_Impl;
const int m_Idx;
static void
OnWake(uv_async_t* self)
{
static_cast<UVWakeup*>(self->data)->callback();
}
public:
UVWakeup(uv_loop_t* loop, std::function<void()> hook, int idx)
: llarp::EventLoopWakeup{hook}, m_Idx{idx}
{
uv_async_init(loop, &m_Impl, OnWake);
m_Impl.data = this;
}
~UVWakeup() = default;
void
Close() override
{
uv_close((uv_handle_t*)&m_Impl, [](uv_handle_t* h) {
auto loop = static_cast<libuv::Loop*>(h->loop->data);
loop->delete_waker(static_cast<UVWakeup*>(h->data)->m_Idx);
});
}
void
End() override
{
Close();
}
void
Wakeup() override
{
uv_async_send(&m_Impl);
}
bool
operator<(const UVWakeup& other) const
{
return m_Idx < other.m_Idx;
}
};
struct ticker_glue : public glue
{
std::function<void(void)> func;
@ -622,4 +670,20 @@ namespace libuv
}
}
llarp::EventLoopWakeup*
Loop::make_event_loop_waker(std::function<void()> callback)
{
auto wake_idx = m_NumWakers++;
auto wake = new UVWakeup{&m_Impl, callback, wake_idx};
m_Wakers[wake_idx] = wake;
return wake;
}
void
Loop::delete_waker(int idx)
{
delete m_Wakers[idx];
m_Wakers.erase(idx);
}
} // namespace libuv

@ -12,6 +12,8 @@
namespace libuv
{
class UVWakeup;
struct Loop final : public llarp::EventLoop
{
typedef std::function<void(void)> Callback;
@ -96,6 +98,12 @@ namespace libuv
void
set_pump_function(std::function<void(void)> pumpll) override;
llarp::EventLoopWakeup*
make_event_loop_waker(std::function<void()> callback) override;
void
delete_waker(int idx);
void
FlushLogic();
@ -122,6 +130,9 @@ namespace libuv
llarp::thread::Queue<PendingTimer> m_timerQueue;
llarp::thread::Queue<uint32_t> m_timerCancelQueue;
std::optional<std::thread::id> m_EventLoopThreadID;
int m_NumWakers;
std::unordered_map<int, UVWakeup*> m_Wakers;
};
} // namespace libuv

@ -10,6 +10,7 @@ namespace llarp
LinkLayer_ptr
NewInboundLink(
std::shared_ptr<KeyManager> keyManager,
std::shared_ptr<EventLoop> loop,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
@ -22,12 +23,25 @@ namespace llarp
WorkerFunc_t work)
{
return std::make_shared<LinkLayer>(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, work, true);
keyManager,
loop,
getrc,
h,
sign,
before,
est,
reneg,
timeout,
closed,
pumpDone,
work,
true);
}
LinkLayer_ptr
NewOutboundLink(
std::shared_ptr<KeyManager> keyManager,
std::shared_ptr<EventLoop> loop,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
@ -40,7 +54,19 @@ namespace llarp
WorkerFunc_t work)
{
return std::make_shared<LinkLayer>(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, work, false);
keyManager,
loop,
getrc,
h,
sign,
before,
est,
reneg,
timeout,
closed,
pumpDone,
work,
false);
}
} // namespace iwp
} // namespace llarp

@ -11,6 +11,7 @@ namespace llarp::iwp
LinkLayer_ptr
NewInboundLink(
std::shared_ptr<KeyManager> keyManager,
std::shared_ptr<EventLoop> loop,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
@ -25,6 +26,7 @@ namespace llarp::iwp
LinkLayer_ptr
NewOutboundLink(
std::shared_ptr<KeyManager> keyManager,
std::shared_ptr<EventLoop> loop,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,

@ -4,96 +4,126 @@
#include <memory>
#include <unordered_set>
namespace llarp
namespace llarp::iwp
{
namespace iwp
LinkLayer::LinkLayer(
std::shared_ptr<KeyManager> keyManager,
std::shared_ptr<EventLoop> ev,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
BeforeConnectFunc_t before,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t worker,
bool allowInbound)
: ILinkLayer(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker)
, m_Wakeup{ev->make_event_loop_waker([self = this]() { self->HandleWakeupPlaintext(); })}
, m_PlaintextRecv{1024}
, permitInbound{allowInbound}
{}
LinkLayer::~LinkLayer()
{
LinkLayer::LinkLayer(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
BeforeConnectFunc_t before,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t worker,
bool allowInbound)
: ILinkLayer(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker)
, permitInbound{allowInbound}
{}
m_Wakeup->End();
}
LinkLayer::~LinkLayer() = default;
const char*
LinkLayer::Name() const
{
return "iwp";
}
const char*
LinkLayer::Name() const
uint16_t
LinkLayer::Rank() const
{
return 2;
}
void
LinkLayer::RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt)
{
std::shared_ptr<ILinkSession> session;
auto itr = m_AuthedAddrs.find(from);
bool isNewSession = false;
if (itr == m_AuthedAddrs.end())
{
return "iwp";
Lock_t lock(m_PendingMutex);
if (m_Pending.count(from) == 0)
{
if (not permitInbound)
return;
isNewSession = true;
m_Pending.insert({from, std::make_shared<Session>(this, from)});
}
session = m_Pending.find(from)->second;
}
uint16_t
LinkLayer::Rank() const
else
{
return 2;
Lock_t lock(m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(itr->second);
session = range.first->second;
}
void
LinkLayer::RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt)
if (session)
{
std::shared_ptr<ILinkSession> session;
auto itr = m_AuthedAddrs.find(from);
bool isNewSession = false;
if (itr == m_AuthedAddrs.end())
{
Lock_t lock(m_PendingMutex);
if (m_Pending.count(from) == 0)
{
if (not permitInbound)
return;
isNewSession = true;
m_Pending.insert({from, std::make_shared<Session>(this, from)});
}
session = m_Pending.find(from)->second;
}
else
{
Lock_t lock(m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(itr->second);
session = range.first->second;
}
if (session)
bool success = session->Recv_LL(std::move(pkt));
if (!success and isNewSession)
{
bool success = session->Recv_LL(std::move(pkt));
if (!success and isNewSession)
{
LogWarn("Brand new session failed; removing from pending sessions list");
m_Pending.erase(m_Pending.find(from));
}
LogWarn("Brand new session failed; removing from pending sessions list");
m_Pending.erase(m_Pending.find(from));
}
}
}
bool
LinkLayer::MapAddr(const RouterID& r, ILinkSession* s)
{
if (!ILinkLayer::MapAddr(r, s))
return false;
m_AuthedAddrs.emplace(s->GetRemoteEndpoint(), r);
return true;
}
bool
LinkLayer::MapAddr(const RouterID& r, ILinkSession* s)
{
if (!ILinkLayer::MapAddr(r, s))
return false;
m_AuthedAddrs.emplace(s->GetRemoteEndpoint(), r);
return true;
}
void
LinkLayer::UnmapAddr(const IpAddress& addr)
{
m_AuthedAddrs.erase(addr);
}
void
LinkLayer::UnmapAddr(const IpAddress& addr)
{
m_AuthedAddrs.erase(addr);
}
std::shared_ptr<ILinkSession>
LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& ai)
{
return std::make_shared<Session>(this, rc, ai);
}
void
LinkLayer::AddWakeup(std::weak_ptr<Session> session)
{
m_PlaintextRecv.tryPushBack(session);
}
std::shared_ptr<ILinkSession>
LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& ai)
void
LinkLayer::WakeupPlaintext()
{
m_Wakeup->Wakeup();
}
void
LinkLayer::HandleWakeupPlaintext()
{
while (not m_PlaintextRecv.empty())
{
return std::make_shared<Session>(this, rc, ai);
auto session = m_PlaintextRecv.popFront();
auto ptr = session.lock();
if (ptr)
ptr->HandlePlaintext();
}
} // namespace iwp
} // namespace llarp
PumpDone();
}
} // namespace llarp::iwp

@ -7,56 +7,69 @@
#include <crypto/types.hpp>
#include <link/server.hpp>
#include <config/key_manager.hpp>
#include <util/thread/queue.hpp>
#include <memory>
namespace llarp
#include <ev/ev.hpp>
namespace llarp::iwp
{
namespace iwp
struct Session;
struct LinkLayer final : public ILinkLayer
{
struct LinkLayer final : public ILinkLayer
{
LinkLayer(
std::shared_ptr<KeyManager> keyManager,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
BeforeConnectFunc_t before,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t dowork,
bool permitInbound);
~LinkLayer() override;
std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) override;
const char*
Name() const override;
uint16_t
Rank() const override;
void
RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) override;
bool
MapAddr(const RouterID& pk, ILinkSession* s) override;
void
UnmapAddr(const IpAddress& addr);
private:
std::unordered_map<IpAddress, RouterID, IpAddress::Hash> m_AuthedAddrs;
const bool permitInbound;
};
using LinkLayer_ptr = std::shared_ptr<LinkLayer>;
} // namespace iwp
} // namespace llarp
LinkLayer(
std::shared_ptr<KeyManager> keyManager,
std::shared_ptr<EventLoop> ev,
GetRCFunc getrc,
LinkMessageHandler h,
SignBufferFunc sign,
BeforeConnectFunc_t before,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout,
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t dowork,
bool permitInbound);
~LinkLayer() override;
std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) override;
const char*
Name() const override;
uint16_t
Rank() const override;
void
RecvFrom(const SockAddr& from, ILinkSession::Packet_t pkt) override;
bool
MapAddr(const RouterID& pk, ILinkSession* s) override;
void
UnmapAddr(const IpAddress& addr);
void
WakeupPlaintext();
void
AddWakeup(std::weak_ptr<Session> peer);
private:
void
HandleWakeupPlaintext();
EventLoopWakeup* const m_Wakeup;
llarp::thread::Queue<std::weak_ptr<Session>> m_PlaintextRecv;
std::unordered_map<IpAddress, RouterID, IpAddress::Hash> m_AuthedAddrs;
const bool permitInbound;
};
using LinkLayer_ptr = std::shared_ptr<LinkLayer>;
} // namespace llarp::iwp
#endif

@ -26,6 +26,8 @@ namespace llarp
return pkt;
}
constexpr size_t PlaintextQueueSize = 32;
Session::Session(LinkLayer* p, const RouterContact& rc, const AddressInfo& ai)
: m_State{State::Initial}
, m_Inbound{false}
@ -34,6 +36,7 @@ namespace llarp
, m_RemoteAddr(ai.toIpAddress())
, m_ChosenAI(ai)
, m_RemoteRC(rc)
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Zero();
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
@ -46,6 +49,7 @@ namespace llarp
, m_Parent(p)
, m_CreatedAt{p->Now()}
, m_RemoteAddr(from)
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Randomize();
GotLIM = util::memFn(&Session::GotInboundLIM, this);
@ -130,23 +134,21 @@ namespace llarp
void
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
if (m_EncryptNext == nullptr)
m_EncryptNext = std::make_shared<CryptoQueue_t>();
m_EncryptNext->emplace_back(std::move(data));
m_EncryptNext.emplace_back(std::move(data));
if (!IsEstablished())
{
EncryptWorker(std::move(m_EncryptNext));
m_EncryptNext = nullptr;
m_EncryptNext = CryptoQueue_t{};
}
}
void
Session::EncryptWorker(CryptoQueue_ptr msgs)
Session::EncryptWorker(CryptoQueue_t msgs)
{
LogDebug("encrypt worker ", msgs->size(), " messages");
for (auto& pkt : *msgs)
LogDebug("encrypt worker ", msgs.size(), " messages");
for (auto& pkt : msgs)
{
llarp_buffer_t pktbuf(pkt);
llarp_buffer_t pktbuf{pkt};
const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE};
pktbuf.base += PacketOverhead;
pktbuf.cur = pktbuf.base;
@ -243,16 +245,17 @@ namespace llarp
}
auto self = shared_from_this();
assert(self.use_count() > 1);
if (m_EncryptNext && !m_EncryptNext->empty())
if (not m_EncryptNext.empty())
{
m_Parent->QueueWork([self, data = std::move(m_EncryptNext)] { self->EncryptWorker(data); });
m_EncryptNext = nullptr;
m_Parent->QueueWork([self, data = m_EncryptNext] { self->EncryptWorker(data); });
m_EncryptNext.clear();
}
if (m_DecryptNext && !m_DecryptNext->empty())
if (not m_DecryptNext.empty())
{
m_Parent->QueueWork([self, data = std::move(m_DecryptNext)] { self->DecryptWorker(data); });
m_DecryptNext = nullptr;
m_Parent->AddWakeup(weak_from_this());
m_Parent->QueueWork([self, data = m_DecryptNext] { self->DecryptWorker(data); });
m_DecryptNext.clear();
}
}
@ -596,19 +599,19 @@ namespace llarp
void
Session::HandleSessionData(Packet_t pkt)
{
if (m_DecryptNext == nullptr)
m_DecryptNext = std::make_shared<CryptoQueue_t>();
m_DecryptNext->emplace_back(std::move(pkt));
m_DecryptNext.emplace_back(std::move(pkt));
}
void
Session::DecryptWorker(CryptoQueue_ptr msgs)
Session::DecryptWorker(CryptoQueue_t msgs)
{
CryptoQueue_ptr recvMsgs = std::make_shared<CryptoQueue_t>();
for (auto& pkt : *msgs)
auto itr = msgs.begin();
while (itr != msgs.end())
{
auto& pkt = *itr;
if (not DecryptMessageInPlace(pkt))
{
itr = msgs.erase(itr);
LogError("failed to decrypt session data from ", m_RemoteAddr);
continue;
}
@ -616,52 +619,54 @@ namespace llarp
{
LogError(
"protocol version mismatch ", int(pkt[PacketOverhead]), " != ", LLARP_PROTO_VERSION);
itr = msgs.erase(itr);
continue;
}
recvMsgs->emplace_back(std::move(pkt));
++itr;
}
LogDebug("decrypted ", recvMsgs->size(), " packets from ", m_RemoteAddr);
LogicCall(m_Parent->logic(), [self = shared_from_this(), msgs = recvMsgs] {
self->HandlePlaintext(std::move(msgs));
});
m_PlaintextRecv.pushBack(std::move(msgs));
m_Parent->WakeupPlaintext();
}
void
Session::HandlePlaintext(CryptoQueue_ptr msgs)
Session::HandlePlaintext()
{
for (auto& result : *msgs)
while (not m_PlaintextRecv.empty())
{
LogDebug("Command ", int(result[PacketOverhead + 1]));
switch (result[PacketOverhead + 1])
auto queue = m_PlaintextRecv.popFront();
for (auto& result : queue)
{
case Command::eXMIT:
HandleXMIT(std::move(result));
break;
case Command::eDATA:
HandleDATA(std::move(result));
break;
case Command::eACKS:
HandleACKS(std::move(result));
break;
case Command::ePING:
HandlePING(std::move(result));
break;
case Command::eNACK:
HandleNACK(std::move(result));
break;
case Command::eCLOS:
HandleCLOS(std::move(result));
break;
case Command::eMACK:
HandleMACK(std::move(result));
break;
default:
LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr);
LogDebug("Command ", int(result[PacketOverhead + 1]));
switch (result[PacketOverhead + 1])
{
case Command::eXMIT:
HandleXMIT(std::move(result));
break;
case Command::eDATA:
HandleDATA(std::move(result));
break;
case Command::eACKS:
HandleACKS(std::move(result));
break;
case Command::ePING:
HandlePING(std::move(result));
break;
case Command::eNACK:
HandleNACK(std::move(result));
break;
case Command::eCLOS:
HandleCLOS(std::move(result));
break;
case Command::eMACK:
HandleMACK(std::move(result));
break;
default:
LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr);
}
}
}
SendMACK();
Pump();
m_Parent->PumpDone();
}
void

@ -126,6 +126,8 @@ namespace llarp
{
return m_Inbound;
}
void
HandlePlaintext();
private:
enum class State
@ -189,19 +191,18 @@ namespace llarp
/// rx messages to send in next round of multiacks
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> m_SendMACKs;
using CryptoQueue_t = std::list<Packet_t>;
using CryptoQueue_ptr = std::shared_ptr<CryptoQueue_t>;
CryptoQueue_ptr m_EncryptNext;
CryptoQueue_ptr m_DecryptNext;
using CryptoQueue_t = std::vector<Packet_t>;
void
EncryptWorker(CryptoQueue_ptr msgs);
CryptoQueue_t m_EncryptNext;
CryptoQueue_t m_DecryptNext;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
void
DecryptWorker(CryptoQueue_ptr msgs);
EncryptWorker(CryptoQueue_t msgs);
void
HandlePlaintext(CryptoQueue_ptr msgs);
DecryptWorker(CryptoQueue_t msgs);
void
HandleGotIntro(Packet_t pkt);

@ -612,6 +612,7 @@ namespace llarp
{
auto server = iwp::NewInboundLink(
m_keyManager,
netloop(),
util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&AbstractRouter::Sign, this),
@ -1313,6 +1314,7 @@ namespace llarp
{
auto link = iwp::NewOutboundLink(
m_keyManager,
netloop(),
util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
util::memFn(&AbstractRouter::Sign, this),

@ -76,6 +76,7 @@ struct IWPLinkContext
{
link = make_link<inbound>(
keyManager,
m_Loop,
// getrc
[&]() -> const llarp::RouterContact& { return rc; },
// link message handler

Loading…
Cancel
Save