mirror of https://github.com/oxen-io/lokinet
initial iwp
parent
aea4542edd
commit
426ee41c46
@ -0,0 +1,160 @@
|
||||
#include <iwp/message_buffer.hpp>
|
||||
#include <crypto/crypto.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace iwp
|
||||
{
|
||||
OutboundMessage::OutboundMessage() :
|
||||
m_Size{0} {}
|
||||
|
||||
OutboundMessage::OutboundMessage(uint64_t msgid, const llarp_buffer_t& pkt,
|
||||
ILinkSession::CompletionHandler handler) :
|
||||
m_Size{std::min(pkt.sz, MAX_LINK_MSG_SIZE)},
|
||||
m_MsgID{msgid},
|
||||
m_Completed{handler}
|
||||
{
|
||||
m_Data.Zero();
|
||||
std::copy_n(pkt.base, m_Size, m_Data.begin());
|
||||
}
|
||||
|
||||
std::vector<byte_t>
|
||||
OutboundMessage::XMIT() const
|
||||
{
|
||||
std::vector<byte_t> xmit{LLARP_PROTO_VERSION, Command::eXMIT, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
|
||||
htobe16buf(xmit.data() + 2, m_Size);
|
||||
htobe64buf(xmit.data() + 4, m_MsgID);
|
||||
const llarp_buffer_t buf{m_Data.data(), m_Size};
|
||||
ShortHash H;
|
||||
CryptoManager::instance()->shorthash(H, buf);
|
||||
std::copy(H.begin(), H.end(), std::back_inserter(xmit));
|
||||
LogDebug("xmit H=", H.ToHex());
|
||||
return xmit;
|
||||
}
|
||||
|
||||
void
|
||||
OutboundMessage::Completed()
|
||||
{
|
||||
if(m_Completed)
|
||||
{
|
||||
m_Completed(ILinkSession::DeliveryStatus::eDeliverySuccess);
|
||||
}
|
||||
m_Completed = nullptr;
|
||||
}
|
||||
|
||||
bool
|
||||
OutboundMessage::ShouldFlush(llarp_time_t now) const
|
||||
{
|
||||
static constexpr llarp_time_t FlushInterval = 250;
|
||||
return now - m_LastFlush >= FlushInterval;
|
||||
}
|
||||
|
||||
void
|
||||
OutboundMessage::Ack(byte_t bitmask)
|
||||
{
|
||||
m_Acks = std::bitset<8>(bitmask);
|
||||
}
|
||||
|
||||
void
|
||||
OutboundMessage::FlushUnAcked(std::function<void(const llarp_buffer_t &)> sendpkt, llarp_time_t now)
|
||||
{
|
||||
uint16_t idx = 0;
|
||||
while(idx < m_Size)
|
||||
{
|
||||
if(not m_Acks[idx / FragmentSize])
|
||||
{
|
||||
std::vector<byte_t> frag{LLARP_PROTO_VERSION, Command::eDATA, 0,0,0,0,0,0,0,0,0,0};
|
||||
htobe16buf(frag.data() + 2, idx);
|
||||
htobe64buf(frag.data() + 4, m_MsgID);
|
||||
std::copy(m_Data.begin() + idx, m_Data.begin() + idx + FragmentSize, std::back_inserter(frag));
|
||||
const llarp_buffer_t pkt{frag};
|
||||
sendpkt(pkt);
|
||||
}
|
||||
idx += FragmentSize;
|
||||
}
|
||||
m_LastFlush = now;
|
||||
}
|
||||
|
||||
bool
|
||||
OutboundMessage::IsTransmitted() const
|
||||
{
|
||||
for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize)
|
||||
{
|
||||
if(!m_Acks.test(idx / FragmentSize))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
InboundMessage::InboundMessage() : m_Size{0} {}
|
||||
|
||||
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h) :
|
||||
m_Digset{std::move(h)},
|
||||
m_Size{sz},
|
||||
m_MsgID{msgid}
|
||||
{}
|
||||
|
||||
void
|
||||
InboundMessage::HandleData(uint16_t idx, const byte_t * ptr)
|
||||
{
|
||||
if(idx + FragmentSize > MAX_LINK_MSG_SIZE)
|
||||
return;
|
||||
auto * dst = m_Data.data() + idx;
|
||||
std::copy_n(ptr, FragmentSize, dst);
|
||||
m_Acks.set(idx / FragmentSize);
|
||||
LogDebug("got fragment ", idx / FragmentSize , " of ", m_Size);
|
||||
}
|
||||
|
||||
|
||||
std::vector<byte_t>
|
||||
InboundMessage::ACKS() const
|
||||
{
|
||||
std::vector<byte_t> acks{LLARP_PROTO_VERSION, Command::eACKS, 0, 0, 0, 0, 0, 0, 0, 0, uint8_t{m_Acks.to_ulong()}};
|
||||
|
||||
htobe64buf(acks.data() + 2, m_MsgID);
|
||||
return acks;
|
||||
}
|
||||
|
||||
bool
|
||||
InboundMessage::IsCompleted() const
|
||||
{
|
||||
for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize)
|
||||
{
|
||||
if(!m_Acks.test(idx / FragmentSize))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
InboundMessage::ShouldSendACKS(llarp_time_t now) const
|
||||
{
|
||||
return now - m_LastACKSent > 1000 || IsCompleted();
|
||||
}
|
||||
|
||||
void
|
||||
InboundMessage::SendACKS(std::function<void(const llarp_buffer_t &)> sendpkt, llarp_time_t now)
|
||||
{
|
||||
auto acks = ACKS();
|
||||
const llarp_buffer_t pkt{acks};
|
||||
sendpkt(pkt);
|
||||
m_LastACKSent = now;
|
||||
}
|
||||
|
||||
bool
|
||||
InboundMessage::Verify() const
|
||||
{
|
||||
ShortHash gotten;
|
||||
const llarp_buffer_t buf{m_Data.data(), m_Size};
|
||||
CryptoManager::instance()->shorthash(gotten, buf);
|
||||
LogDebug("gotten=",gotten.ToHex());
|
||||
if(gotten != m_Digset)
|
||||
{
|
||||
DumpBuffer(buf);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,97 @@
|
||||
#ifndef LLARP_IWP_MESSAGE_BUFFER_HPP
|
||||
#define LLARP_IWP_MESSAGE_BUFFER_HPP
|
||||
#include <vector>
|
||||
#include <constants/link_layer.hpp>
|
||||
#include <link/session.hpp>
|
||||
#include <util/aligned.hpp>
|
||||
#include <util/buffer.hpp>
|
||||
#include <util/types.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace iwp
|
||||
{
|
||||
enum Command
|
||||
{
|
||||
/// keep alive message
|
||||
ePING = 0,
|
||||
/// begin transission
|
||||
eXMIT = 1,
|
||||
/// fragment data
|
||||
eDATA = 2,
|
||||
/// acknolege fragments
|
||||
eACKS = 3,
|
||||
/// close session
|
||||
eCLOS = 4
|
||||
};
|
||||
|
||||
static constexpr size_t FragmentSize = 1024;
|
||||
|
||||
struct OutboundMessage
|
||||
{
|
||||
OutboundMessage();
|
||||
OutboundMessage(uint64_t msgid, const llarp_buffer_t& pkt,
|
||||
ILinkSession::CompletionHandler handler);
|
||||
|
||||
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
|
||||
uint16_t m_Size = 0;
|
||||
uint64_t m_MsgID = 0;
|
||||
std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks;
|
||||
ILinkSession::CompletionHandler m_Completed;
|
||||
llarp_time_t m_LastFlush = 0;
|
||||
|
||||
std::vector<byte_t>
|
||||
XMIT() const;
|
||||
|
||||
void
|
||||
Ack(byte_t bitmask);
|
||||
|
||||
void
|
||||
FlushUnAcked(std::function<void(const llarp_buffer_t &)> sendpkt, llarp_time_t now);
|
||||
|
||||
bool
|
||||
ShouldFlush(llarp_time_t now) const;
|
||||
|
||||
void
|
||||
Completed();
|
||||
|
||||
bool
|
||||
IsTransmitted() const;
|
||||
};
|
||||
|
||||
struct InboundMessage
|
||||
{
|
||||
InboundMessage();
|
||||
InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h);
|
||||
|
||||
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
|
||||
ShortHash m_Digset;
|
||||
uint16_t m_Size = 0;
|
||||
uint64_t m_MsgID = 0;
|
||||
llarp_time_t m_LastACKSent = 0;
|
||||
std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks;
|
||||
|
||||
void
|
||||
HandleData(uint16_t idx, const byte_t * ptr);
|
||||
|
||||
bool
|
||||
IsCompleted() const;
|
||||
|
||||
bool
|
||||
Verify() const;
|
||||
|
||||
bool
|
||||
ShouldSendACKS(llarp_time_t now) const;
|
||||
|
||||
void
|
||||
SendACKS(std::function<void(const llarp_buffer_t &)> sendpkt, llarp_time_t now);
|
||||
|
||||
std::vector<byte_t>
|
||||
ACKS() const;
|
||||
|
||||
};
|
||||
|
||||
} // namespace iwp
|
||||
} // namespace llarp
|
||||
|
||||
#endif
|
@ -1,155 +0,0 @@
|
||||
#include <iwp/outermessage.hpp>
|
||||
#include <memory>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace iwp
|
||||
{
|
||||
std::array< byte_t, 6 > OuterMessage::obtain_flow_id_magic =
|
||||
std::array< byte_t, 6 >{{'n', 'e', 't', 'i', 'd', '?'}};
|
||||
|
||||
std::array< byte_t, 6 > OuterMessage::give_flow_id_magic =
|
||||
std::array< byte_t, 6 >{{'n', 'e', 't', 'i', 'd', '!'}};
|
||||
|
||||
OuterMessage::OuterMessage()
|
||||
{
|
||||
Clear();
|
||||
}
|
||||
|
||||
OuterMessage::~OuterMessage() = default;
|
||||
|
||||
void
|
||||
OuterMessage::Clear()
|
||||
{
|
||||
command = 0;
|
||||
flow.Zero();
|
||||
netid.Zero();
|
||||
reject.fill(0);
|
||||
N.Zero();
|
||||
X.Zero();
|
||||
Xsize = 0;
|
||||
Zsig.Zero();
|
||||
Zhash.Zero();
|
||||
pubkey.Zero();
|
||||
magic.fill(0);
|
||||
uinteger = 0;
|
||||
A.reset();
|
||||
}
|
||||
|
||||
void
|
||||
OuterMessage::CreateReject(const char* msg, llarp_time_t now,
|
||||
const PubKey& pk)
|
||||
{
|
||||
Clear();
|
||||
std::copy_n(msg, std::min(strlen(msg), reject.size()), reject.begin());
|
||||
uinteger = now;
|
||||
pubkey = pk;
|
||||
}
|
||||
|
||||
bool
|
||||
OuterMessage::Encode(llarp_buffer_t* buf) const
|
||||
{
|
||||
if(buf->size_left() < 2)
|
||||
return false;
|
||||
*buf->cur = command;
|
||||
buf->cur++;
|
||||
*buf->cur = '=';
|
||||
buf->cur++;
|
||||
switch(command)
|
||||
{
|
||||
case eOCMD_ObtainFlowID:
|
||||
|
||||
case eOCMD_GiveFlowID:
|
||||
if(!buf->write(reject.begin(), reject.end()))
|
||||
return false;
|
||||
if(!buf->write(give_flow_id_magic.begin(), give_flow_id_magic.end()))
|
||||
return false;
|
||||
if(!buf->write(flow.begin(), flow.end()))
|
||||
return false;
|
||||
if(!buf->write(pubkey.begin(), pubkey.end()))
|
||||
return false;
|
||||
return buf->write(Zsig.begin(), Zsig.end());
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
OuterMessage::Decode(llarp_buffer_t* buf)
|
||||
{
|
||||
static constexpr size_t header_size = 2;
|
||||
|
||||
if(buf->size_left() < header_size)
|
||||
return false;
|
||||
command = *buf->cur;
|
||||
++buf->cur;
|
||||
if(*buf->cur != '=')
|
||||
return false;
|
||||
++buf->cur;
|
||||
switch(command)
|
||||
{
|
||||
case eOCMD_ObtainFlowID:
|
||||
if(!buf->read_into(magic.begin(), magic.end()))
|
||||
return false;
|
||||
if(!buf->read_into(netid.begin(), netid.end()))
|
||||
return false;
|
||||
if(!buf->read_uint64(uinteger))
|
||||
return false;
|
||||
if(!buf->read_into(pubkey.begin(), pubkey.end()))
|
||||
return false;
|
||||
if(buf->size_left() <= Zsig.size())
|
||||
return false;
|
||||
Xsize = buf->size_left() - Zsig.size();
|
||||
if(!buf->read_into(X.begin(), X.begin() + Xsize))
|
||||
return false;
|
||||
return buf->read_into(Zsig.begin(), Zsig.end());
|
||||
case eOCMD_GiveFlowID:
|
||||
if(!buf->read_into(magic.begin(), magic.end()))
|
||||
return false;
|
||||
if(!buf->read_into(flow.begin(), flow.end()))
|
||||
return false;
|
||||
if(!buf->read_into(pubkey.begin(), pubkey.end()))
|
||||
return false;
|
||||
buf->cur += buf->size_left() - Zsig.size();
|
||||
return buf->read_into(Zsig.begin(), Zsig.end());
|
||||
case eOCMD_Reject:
|
||||
if(!buf->read_into(reject.begin(), reject.end()))
|
||||
return false;
|
||||
if(!buf->read_uint64(uinteger))
|
||||
return false;
|
||||
if(!buf->read_into(pubkey.begin(), pubkey.end()))
|
||||
return false;
|
||||
buf->cur += buf->size_left() - Zsig.size();
|
||||
return buf->read_into(Zsig.begin(), Zsig.end());
|
||||
case eOCMD_SessionNegotiate:
|
||||
if(!buf->read_into(flow.begin(), flow.end()))
|
||||
return false;
|
||||
if(!buf->read_into(pubkey.begin(), pubkey.end()))
|
||||
return false;
|
||||
if(!buf->read_uint64(uinteger))
|
||||
return false;
|
||||
if(buf->size_left() == Zsig.size() + 32)
|
||||
{
|
||||
A = std::make_unique< AlignedBuffer< 32 > >();
|
||||
if(!buf->read_into(A->begin(), A->end()))
|
||||
return false;
|
||||
}
|
||||
return buf->read_into(Zsig.begin(), Zsig.end());
|
||||
case eOCMD_TransmitData:
|
||||
if(!buf->read_into(flow.begin(), flow.end()))
|
||||
return false;
|
||||
if(!buf->read_into(N.begin(), N.end()))
|
||||
return false;
|
||||
if(buf->size_left() <= Zhash.size())
|
||||
return false;
|
||||
Xsize = buf->size_left() - Zhash.size();
|
||||
if(!buf->read_into(X.begin(), X.begin() + Xsize))
|
||||
return false;
|
||||
return buf->read_into(Zhash.begin(), Zhash.end());
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} // namespace iwp
|
||||
|
||||
} // namespace llarp
|
@ -1,86 +0,0 @@
|
||||
#ifndef LLARP_IWP_OUTERMESSAGE_HPP
|
||||
#define LLARP_IWP_OUTERMESSAGE_HPP
|
||||
|
||||
#include <crypto/types.hpp>
|
||||
#include <router_contact.hpp>
|
||||
#include <util/aligned.hpp>
|
||||
|
||||
#include <array>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace iwp
|
||||
{
|
||||
using FlowID_t = AlignedBuffer< 32 >;
|
||||
|
||||
using OuterCommand_t = byte_t;
|
||||
|
||||
constexpr OuterCommand_t eOCMD_ObtainFlowID = 'O';
|
||||
constexpr OuterCommand_t eOCMD_GiveFlowID = 'G';
|
||||
constexpr OuterCommand_t eOCMD_Reject = 'R';
|
||||
constexpr OuterCommand_t eOCMD_SessionNegotiate = 'S';
|
||||
constexpr OuterCommand_t eOCMD_TransmitData = 'D';
|
||||
|
||||
using InnerCommand_t = byte_t;
|
||||
|
||||
constexpr InnerCommand_t eICMD_KeepAlive = 'k';
|
||||
constexpr InnerCommand_t eICMD_KeepAliveAck = 'l';
|
||||
constexpr InnerCommand_t eICMD_Congestion = 'c';
|
||||
constexpr InnerCommand_t eICMD_AntiCongestion = 'd';
|
||||
constexpr InnerCommand_t eICMD_Transmit = 't';
|
||||
constexpr InnerCommand_t eICMD_Ack = 'a';
|
||||
constexpr InnerCommand_t eICMD_RotateKeys = 'r';
|
||||
constexpr InnerCommand_t eICMD_UpgradeProtocol = 'u';
|
||||
constexpr InnerCommand_t eICMD_VersionUpgrade = 'v';
|
||||
|
||||
struct OuterMessage
|
||||
{
|
||||
// required members
|
||||
byte_t command;
|
||||
FlowID_t flow;
|
||||
|
||||
OuterMessage();
|
||||
~OuterMessage();
|
||||
|
||||
// static members
|
||||
static std::array< byte_t, 6 > obtain_flow_id_magic;
|
||||
static std::array< byte_t, 6 > give_flow_id_magic;
|
||||
|
||||
void
|
||||
CreateReject(const char *msg, llarp_time_t now, const PubKey &pk);
|
||||
|
||||
// optional members follow
|
||||
std::array< byte_t, 6 > magic;
|
||||
NetID netid;
|
||||
// either timestamp or counter
|
||||
uint64_t uinteger;
|
||||
std::array< byte_t, 14 > reject;
|
||||
AlignedBuffer< 24 > N;
|
||||
PubKey pubkey;
|
||||
|
||||
std::unique_ptr< AlignedBuffer< 32 > > A;
|
||||
|
||||
static constexpr size_t ipv6_mtu = 1280;
|
||||
static constexpr size_t overhead_size = 16 + 24 + 32;
|
||||
static constexpr size_t payload_size = ipv6_mtu - overhead_size;
|
||||
|
||||
AlignedBuffer< payload_size > X;
|
||||
size_t Xsize;
|
||||
ShortHash Zhash;
|
||||
Signature Zsig;
|
||||
|
||||
/// encode to buffer
|
||||
bool
|
||||
Encode(llarp_buffer_t *buf) const;
|
||||
|
||||
/// decode from buffer
|
||||
bool
|
||||
Decode(llarp_buffer_t *buf);
|
||||
|
||||
/// clear members
|
||||
void
|
||||
Clear();
|
||||
};
|
||||
} // namespace iwp
|
||||
} // namespace llarp
|
||||
#endif
|
@ -0,0 +1,536 @@
|
||||
#include <iwp/session.hpp>
|
||||
#include <util/memfn.hpp>
|
||||
#include <messages/link_intro.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace iwp
|
||||
{
|
||||
static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE;
|
||||
|
||||
Session::Session(LinkLayer* p, RouterContact rc, AddressInfo ai)
|
||||
: m_State{State::Initial}
|
||||
, m_Inbound{false}
|
||||
, m_Parent{p}
|
||||
, m_CreatedAt{p->Now()}
|
||||
, m_RemoteAddr{ai}
|
||||
, m_ChosenAI{std::move(ai)}
|
||||
, m_RemoteRC{std::move(rc)}
|
||||
{
|
||||
token.Zero();
|
||||
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
|
||||
}
|
||||
|
||||
Session::Session(LinkLayer* p, Addr from)
|
||||
: m_State{State::Initial}
|
||||
, m_Inbound{true}
|
||||
, m_Parent{p}
|
||||
, m_CreatedAt{p->Now()}
|
||||
, m_RemoteAddr{from}
|
||||
{
|
||||
token.Randomize();
|
||||
GotLIM = util::memFn(&Session::GotInboundLIM, this);
|
||||
}
|
||||
|
||||
Session::~Session()
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
Session::Send_LL(const llarp_buffer_t& pkt)
|
||||
{
|
||||
LogDebug("send ", pkt.sz, " to ", m_RemoteAddr);
|
||||
m_Parent->SendTo_LL(m_RemoteAddr, pkt);
|
||||
m_LastTX = time_now_ms();
|
||||
}
|
||||
|
||||
bool
|
||||
Session::GotInboundLIM(const LinkIntroMessage * msg)
|
||||
{
|
||||
if(msg->rc.enckey != m_RemoteOnionKey)
|
||||
return false;
|
||||
m_State = State::Ready;
|
||||
GotLIM = util::memFn(&Session::GotRenegLIM, this);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Session::GotOutboundLIM(const LinkIntroMessage * msg)
|
||||
{
|
||||
if(msg->rc.pubkey != m_RemoteRC.pubkey)
|
||||
return false;
|
||||
m_State = State::LinkIntro;
|
||||
GotLIM = util::memFn(&Session::GotRenegLIM, this);
|
||||
SendOurLIM();
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
Session::SendOurLIM()
|
||||
{
|
||||
LinkIntroMessage msg;
|
||||
msg.rc = m_Parent->GetOurRC();
|
||||
msg.N.Randomize();
|
||||
msg.P = 60000;
|
||||
if(not msg.Sign(m_Parent->Sign))
|
||||
{
|
||||
LogError("failed to sign our RC for ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
AlignedBuffer<LinkIntroMessage::MaxSize> data;
|
||||
llarp_buffer_t buf{data};
|
||||
if(not msg.BEncode(&buf))
|
||||
{
|
||||
LogError("failed to encode LIM for ", m_RemoteAddr);
|
||||
}
|
||||
buf.sz = buf.cur - buf.base;
|
||||
buf.cur = buf.base;
|
||||
if(!SendMessageBuffer(buf, nullptr))
|
||||
{
|
||||
LogError("failed to send LIM to ", m_RemoteAddr);
|
||||
}
|
||||
LogDebug("sent LIM to ", m_RemoteAddr);
|
||||
}
|
||||
|
||||
void
|
||||
Session::EncryptAndSend(const llarp_buffer_t& data)
|
||||
{
|
||||
|
||||
std::vector< byte_t > pkt;
|
||||
pkt.resize(data.sz + PacketOverhead);
|
||||
CryptoManager::instance()->randbytes(pkt.data(), pkt.size());
|
||||
llarp_buffer_t pktbuf{pkt};
|
||||
pktbuf.base += PacketOverhead;
|
||||
pktbuf.sz -= PacketOverhead;
|
||||
byte_t* nonce_ptr = pkt.data() + HMACSIZE;
|
||||
|
||||
|
||||
CryptoManager::instance()->xchacha20_alt(pktbuf, data, m_SessionKey,
|
||||
nonce_ptr);
|
||||
|
||||
pktbuf.base = nonce_ptr;
|
||||
pktbuf.sz = data.sz + 32;
|
||||
CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey);
|
||||
|
||||
pktbuf.base = pkt.data();
|
||||
pktbuf.sz = pkt.size();
|
||||
Send_LL(pktbuf);
|
||||
}
|
||||
|
||||
void
|
||||
Session::Close()
|
||||
{
|
||||
if(m_State == State::Closed)
|
||||
return;
|
||||
const std::vector<byte_t> close_msg = {LLARP_PROTO_VERSION, Command::eCLOS};
|
||||
const llarp_buffer_t buf{close_msg};
|
||||
EncryptAndSend(buf);
|
||||
m_State = State::Closed;
|
||||
}
|
||||
|
||||
bool
|
||||
Session::SendMessageBuffer(const llarp_buffer_t& buf,
|
||||
ILinkSession::CompletionHandler completed)
|
||||
{
|
||||
const auto msgid = m_TXID++;
|
||||
auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, buf, completed})
|
||||
.first->second;
|
||||
auto xmit = msg.XMIT();
|
||||
const llarp_buffer_t pkt{xmit};
|
||||
EncryptAndSend(pkt);
|
||||
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), m_Parent->Now());
|
||||
LogDebug("send message ", msgid);
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
Session::Pump()
|
||||
{
|
||||
static constexpr llarp_time_t IntroInterval = 500;
|
||||
const auto now = m_Parent->Now();
|
||||
if(m_State == State::Introduction)
|
||||
{
|
||||
if(not m_Inbound)
|
||||
{
|
||||
// resend intro
|
||||
if(now - m_LastTX >= IntroInterval)
|
||||
{
|
||||
GenerateAndSendIntro();
|
||||
}
|
||||
}
|
||||
}
|
||||
else if(m_State == State::Ready || m_State == State::LinkIntro)
|
||||
{
|
||||
for(auto itr = m_RXMsgs.begin(); itr != m_RXMsgs.end(); )
|
||||
{
|
||||
if(itr->second.ShouldSendACKS(now))
|
||||
{
|
||||
itr->second.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
|
||||
}
|
||||
if(itr->second.IsCompleted())
|
||||
{
|
||||
if(itr->second.Verify())
|
||||
{
|
||||
const llarp_buffer_t buf{itr->second.m_Data.data(), itr->second.m_Size};
|
||||
LogDebug("got message ", itr->first);
|
||||
m_Parent->HandleMessage(this, buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogError("hash missmatch for message ", itr->first);
|
||||
}
|
||||
itr = m_RXMsgs.erase(itr);
|
||||
continue;
|
||||
}
|
||||
++itr;
|
||||
}
|
||||
for(auto itr = m_TXMsgs.begin(); itr != m_TXMsgs.end(); )
|
||||
{
|
||||
if(itr->second.ShouldFlush(now))
|
||||
itr->second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
|
||||
if(itr->second.IsTransmitted())
|
||||
{
|
||||
LogDebug("sent message ", itr->first);
|
||||
itr->second.Completed();
|
||||
itr = m_TXMsgs.erase(itr);
|
||||
continue;
|
||||
}
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
Session::GotRenegLIM(const LinkIntroMessage * lim)
|
||||
{
|
||||
return m_Parent->SessionRenegotiate(lim->rc, m_RemoteRC);
|
||||
}
|
||||
|
||||
bool
|
||||
Session::RenegotiateSession()
|
||||
{
|
||||
SendOurLIM();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
Session::ShouldPing() const
|
||||
{
|
||||
static constexpr llarp_time_t PingInterval = 1000;
|
||||
const auto now = m_Parent->Now();
|
||||
return now - m_LastTX > PingInterval;
|
||||
}
|
||||
|
||||
util::StatusObject
|
||||
Session::ExtractStatus() const
|
||||
{
|
||||
return {
|
||||
{"remoteAddr", m_RemoteAddr.ToString()},
|
||||
{"remoteRC", m_RemoteRC.ExtractStatus()}
|
||||
};
|
||||
}
|
||||
|
||||
bool
|
||||
Session::TimedOut(llarp_time_t now) const
|
||||
{
|
||||
static constexpr llarp_time_t SessionAliveTimeout = 5000;
|
||||
if(m_State != State::Ready)
|
||||
return now - m_CreatedAt > SessionAliveTimeout;
|
||||
return now - m_LastRX > SessionAliveTimeout;
|
||||
}
|
||||
|
||||
void
|
||||
Session::Tick(llarp_time_t)
|
||||
{
|
||||
}
|
||||
|
||||
using Introduction = AlignedBuffer<64>;
|
||||
|
||||
void
|
||||
Session::GenerateAndSendIntro()
|
||||
{
|
||||
Introduction intro;
|
||||
|
||||
TunnelNonce N;
|
||||
N.Randomize();
|
||||
if(not CryptoManager::instance()->transport_dh_client(m_SessionKey, m_ChosenAI.pubkey, m_Parent->RouterEncryptionSecret(), N))
|
||||
{
|
||||
LogError("failed to transport_dh_client on outbound session to ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
const auto pk = m_Parent->RouterEncryptionSecret().toPublic();
|
||||
std::copy_n(pk.begin(), pk.size(), intro.begin());
|
||||
std::copy(N.begin(), N.end(), intro.begin() + 32);
|
||||
LogDebug("pk=", pk.ToHex(), " N=", N.ToHex(), " remote-pk=", m_ChosenAI.pubkey.ToHex());
|
||||
std::vector<byte_t> req;
|
||||
req.resize(intro.size() + (randint() % 64));
|
||||
CryptoManager::instance()->randbytes(req.data(), req.size());
|
||||
std::copy_n(intro.begin(), intro.size(), req.begin());
|
||||
const llarp_buffer_t buf{req};
|
||||
Send_LL(buf);
|
||||
m_State = State::Introduction;
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleCreateSessionRequest(const llarp_buffer_t & buf)
|
||||
{
|
||||
std::vector<byte_t> result;
|
||||
if(not DecryptMessage(buf, result))
|
||||
{
|
||||
LogError("failed to decrypt session request from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
if(result.size() < token.size())
|
||||
{
|
||||
LogError("bad session request size, ", result.size(), " < ", token.size(), " from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
if(not std::equal(result.begin(), result.begin() + token.size(), token.begin()))
|
||||
{
|
||||
LogError("token missmatch from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
SendOurLIM();
|
||||
m_State = State::LinkIntro;
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleGotIntro(const llarp_buffer_t & buf)
|
||||
{
|
||||
if(buf.sz < Introduction::SIZE)
|
||||
return;
|
||||
TunnelNonce N;
|
||||
std::copy_n(buf.base, PubKey::SIZE, m_RemoteOnionKey.begin());
|
||||
std::copy_n(buf.base + PubKey::SIZE, TunnelNonce::SIZE, N.begin());
|
||||
const PubKey pk = m_Parent->TransportSecretKey().toPublic();
|
||||
LogDebug("remote-pk=", m_RemoteOnionKey.ToHex(), " N=", N.ToHex(), " local-pk=", pk.ToHex());
|
||||
if(not CryptoManager::instance()->transport_dh_server(m_SessionKey, m_RemoteOnionKey, m_Parent->TransportSecretKey(), N))
|
||||
{
|
||||
LogError("failed to transport_dh_server on inbound intro from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
std::vector<byte_t> reply;
|
||||
reply.resize(token.size() + (randint() % 32));
|
||||
CryptoManager::instance()->randbytes(reply.data(), reply.size());
|
||||
std::copy_n(token.begin(), token.size(), reply.begin());
|
||||
const llarp_buffer_t pkt{reply};
|
||||
m_LastRX = m_Parent->Now();
|
||||
EncryptAndSend(pkt);
|
||||
m_State = State::Introduction;
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleGotIntroAck(const llarp_buffer_t & buf)
|
||||
{
|
||||
std::vector<byte_t> reply;
|
||||
if(not DecryptMessage(buf, reply))
|
||||
{
|
||||
LogError("intro ack decrypt failed from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
if(reply.size() < token.size())
|
||||
{
|
||||
LogError("bad intro ack size ", reply.size(), " < ", token.size(), " from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
m_LastRX = m_Parent->Now();
|
||||
std::copy_n(reply.begin(), token.size(), token.begin());
|
||||
const llarp_buffer_t pkt{token};
|
||||
EncryptAndSend(pkt);
|
||||
m_State = State::LinkIntro;
|
||||
}
|
||||
|
||||
bool
|
||||
Session::DecryptMessage(const llarp_buffer_t & buf, std::vector<byte_t> & result)
|
||||
{
|
||||
if(buf.sz <= PacketOverhead)
|
||||
return false;
|
||||
ShortHash H;
|
||||
llarp_buffer_t curbuf{buf.base, buf.sz};
|
||||
curbuf.base += ShortHash::SIZE;
|
||||
curbuf.sz -= ShortHash::SIZE;
|
||||
if(not CryptoManager::instance()->hmac(H.data(), curbuf, m_SessionKey))
|
||||
{
|
||||
LogError("failed to caclulate keyed hash for ", m_RemoteAddr);
|
||||
return false;
|
||||
}
|
||||
const ShortHash expected{buf.base};
|
||||
if(H != expected)
|
||||
{
|
||||
LogError("keyed hash missmatch ", H, " != ", expected, " from ", m_RemoteAddr);
|
||||
return false;
|
||||
}
|
||||
const byte_t * nonce_ptr = curbuf.base;
|
||||
curbuf.base += 32;
|
||||
curbuf.sz -= 32;
|
||||
result.resize(buf.sz - PacketOverhead);
|
||||
const llarp_buffer_t outbuf{result};
|
||||
LogDebug("decrypt: ", result.size(), " bytes from ", m_RemoteAddr);
|
||||
return CryptoManager::instance()->xchacha20_alt(outbuf, curbuf, m_SessionKey, nonce_ptr);
|
||||
}
|
||||
|
||||
void
|
||||
Session::Start()
|
||||
{
|
||||
if(m_Inbound)
|
||||
return;
|
||||
GenerateAndSendIntro();
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleSessionData(const llarp_buffer_t & buf)
|
||||
{
|
||||
std::vector<byte_t> result;
|
||||
if(not DecryptMessage(buf, result))
|
||||
{
|
||||
LogError("failed to decrypt session data from ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
if(result[0] != LLARP_PROTO_VERSION)
|
||||
{
|
||||
LogError("protocol version missmatch ", int(result[0]), " != ", LLARP_PROTO_VERSION);
|
||||
return;
|
||||
}
|
||||
LogDebug("command ", int(result[1]), " from ", m_RemoteAddr);
|
||||
switch(result[1])
|
||||
{
|
||||
case Command::eXMIT:
|
||||
HandleXMIT(std::move(result));
|
||||
break;
|
||||
case Command::eDATA:
|
||||
HandleDATA(std::move(result));
|
||||
break;
|
||||
case Command::eACKS:
|
||||
HandleACKS(std::move(result));
|
||||
break;
|
||||
case Command::ePING:
|
||||
HandlePING(std::move(result));
|
||||
break;
|
||||
case Command::eCLOS:
|
||||
HandleCLOS(std::move(result));
|
||||
break;
|
||||
default:
|
||||
LogError("invalid command ", int(result[1]));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleXMIT(std::vector<byte_t> data)
|
||||
{
|
||||
if(data.size() < 44)
|
||||
{
|
||||
LogError("short XMIT from ", m_RemoteAddr, " ", data.size(), " < 44");
|
||||
return;
|
||||
}
|
||||
uint16_t sz = bufbe16toh(data.data() + 2);
|
||||
uint64_t rxid = bufbe64toh(data.data() + 4);
|
||||
ShortHash h{data.data() + 12};
|
||||
LogDebug("rxid=", rxid, " sz=", sz, " h=", h.ToHex());
|
||||
m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, std::move(h)});
|
||||
m_LastRX = m_Parent->Now();
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleDATA(std::vector<byte_t> data)
|
||||
{
|
||||
if(data.size() < FragmentSize + 12)
|
||||
{
|
||||
LogError("short DATA from ", m_RemoteAddr, " ", data.size(), " < ", FragmentSize + 8);
|
||||
return;
|
||||
}
|
||||
uint16_t sz = bufbe16toh(data.data() + 2);
|
||||
uint64_t rxid = bufbe64toh(data.data() + 4);
|
||||
auto itr = m_RXMsgs.find(rxid);
|
||||
if(itr == m_RXMsgs.end())
|
||||
{
|
||||
LogWarn("no rxid=", rxid, " for ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
itr->second.HandleData(sz, data.data() + 12);
|
||||
m_LastRX = m_Parent->Now();
|
||||
LogDebug(itr->first, " completed=", itr->second.IsCompleted());
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleACKS(std::vector<byte_t> data)
|
||||
{
|
||||
if(data.size() < 11)
|
||||
{
|
||||
LogError("short ACKS from ", m_RemoteAddr, " ", data.size(), " < 11");
|
||||
return;
|
||||
}
|
||||
uint64_t txid = bufbe64toh(data.data() + 2);
|
||||
auto itr = m_TXMsgs.find(txid);
|
||||
if(itr == m_TXMsgs.end())
|
||||
{
|
||||
LogWarn("no txid=", txid, " for ", m_RemoteAddr);
|
||||
return;
|
||||
}
|
||||
itr->second.Ack(data[10]);
|
||||
m_LastRX = m_Parent->Now();
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandleCLOS(std::vector<byte_t>)
|
||||
{
|
||||
Close();
|
||||
}
|
||||
|
||||
void
|
||||
Session::HandlePING(std::vector<byte_t>)
|
||||
{
|
||||
m_LastRX = m_Parent->Now();
|
||||
}
|
||||
|
||||
bool
|
||||
Session::SendKeepAlive()
|
||||
{
|
||||
// TODO: Implement me
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
Session::IsEstablished() const
|
||||
{
|
||||
return m_State == State::Ready;
|
||||
}
|
||||
|
||||
void
|
||||
Session::Recv_LL(const llarp_buffer_t& buf)
|
||||
{
|
||||
switch(m_State)
|
||||
{
|
||||
case State::Initial:
|
||||
if(m_Inbound)
|
||||
{
|
||||
// initial data
|
||||
// enter introduction phase
|
||||
HandleGotIntro(buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
// this case should never happen
|
||||
::abort();
|
||||
}
|
||||
break;
|
||||
case State::Introduction:
|
||||
if(m_Inbound)
|
||||
{
|
||||
// we are replying to an intro ack
|
||||
HandleCreateSessionRequest(buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
// we got an intro ack
|
||||
// send a session request
|
||||
HandleGotIntroAck(buf);
|
||||
}
|
||||
break;
|
||||
case State::LinkIntro:
|
||||
default:
|
||||
HandleSessionData(buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} // namespace iwp
|
||||
} // namespace llarp
|
@ -0,0 +1,189 @@
|
||||
#ifndef LLARP_IWP_SESSION_HPP
|
||||
#define LLARP_IWP_SESSION_HPP
|
||||
|
||||
#include <link/session.hpp>
|
||||
#include <iwp/linklayer.hpp>
|
||||
#include <iwp/message_buffer.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace iwp
|
||||
{
|
||||
struct Session : public ILinkSession,
|
||||
public std::enable_shared_from_this< Session >
|
||||
{
|
||||
/// outbound session
|
||||
Session(LinkLayer* parent, RouterContact rc, AddressInfo ai);
|
||||
/// inbound session
|
||||
Session(LinkLayer* parent, Addr from);
|
||||
|
||||
~Session();
|
||||
|
||||
void
|
||||
Pump() override;
|
||||
|
||||
void
|
||||
Tick(llarp_time_t now) override;
|
||||
|
||||
bool
|
||||
SendMessageBuffer(const llarp_buffer_t& buf,
|
||||
CompletionHandler resultHandler) override;
|
||||
|
||||
void
|
||||
Send_LL(const llarp_buffer_t& pkt);
|
||||
|
||||
void
|
||||
EncryptAndSend(const llarp_buffer_t& data);
|
||||
|
||||
void
|
||||
Start() override;
|
||||
|
||||
void
|
||||
Close() override;
|
||||
|
||||
void
|
||||
Recv_LL(const llarp_buffer_t& pkt) override;
|
||||
|
||||
bool
|
||||
SendKeepAlive() override;
|
||||
|
||||
bool
|
||||
IsEstablished() const override;
|
||||
|
||||
bool
|
||||
TimedOut(llarp_time_t now) const override;
|
||||
|
||||
PubKey
|
||||
GetPubKey() const override
|
||||
{
|
||||
return m_RemoteRC.pubkey;
|
||||
}
|
||||
|
||||
Addr
|
||||
GetRemoteEndpoint() const override
|
||||
{
|
||||
return m_RemoteAddr;
|
||||
}
|
||||
|
||||
RouterContact
|
||||
GetRemoteRC() const override
|
||||
{
|
||||
return m_RemoteRC;
|
||||
}
|
||||
|
||||
size_t
|
||||
SendQueueBacklog() const override
|
||||
{
|
||||
return m_TXMsgs.size();
|
||||
}
|
||||
|
||||
ILinkLayer*
|
||||
GetLinkLayer() const override
|
||||
{
|
||||
return m_Parent;
|
||||
}
|
||||
|
||||
bool
|
||||
RenegotiateSession() override;
|
||||
|
||||
bool
|
||||
ShouldPing() const override;
|
||||
|
||||
util::StatusObject
|
||||
ExtractStatus() const override;
|
||||
|
||||
private:
|
||||
enum class State
|
||||
{
|
||||
/// we have no data recv'd
|
||||
Initial,
|
||||
/// we are in introduction/intro ack phase
|
||||
Introduction,
|
||||
/// we sent our LIM
|
||||
LinkIntro,
|
||||
/// handshake done and LIM has been obtained
|
||||
Ready,
|
||||
/// we are closed now
|
||||
Closed
|
||||
};
|
||||
State m_State;
|
||||
/// are we inbound session ?
|
||||
const bool m_Inbound;
|
||||
/// parent link layer
|
||||
LinkLayer* const m_Parent;
|
||||
const llarp_time_t m_CreatedAt;
|
||||
const Addr m_RemoteAddr;
|
||||
|
||||
AddressInfo m_ChosenAI;
|
||||
/// remote rc
|
||||
RouterContact m_RemoteRC;
|
||||
/// session key
|
||||
SharedSecret m_SessionKey;
|
||||
/// session token
|
||||
AlignedBuffer<16> token;
|
||||
|
||||
PubKey m_RemoteOnionKey;
|
||||
|
||||
llarp_time_t m_LastTX = 0;
|
||||
llarp_time_t m_LastRX = 0;
|
||||
|
||||
uint64_t m_TXID = 0;
|
||||
|
||||
std::unordered_map< uint64_t, InboundMessage > m_RXMsgs;
|
||||
std::unordered_map< uint64_t, OutboundMessage > m_TXMsgs;
|
||||
|
||||
void
|
||||
HandleGotIntro(const llarp_buffer_t& buf);
|
||||
|
||||
void
|
||||
HandleGotIntroAck(const llarp_buffer_t& buf);
|
||||
|
||||
void
|
||||
HandleCreateSessionRequest(const llarp_buffer_t& buf);
|
||||
|
||||
void
|
||||
ProcessSessionRequest(const llarp_buffer_t& buf);
|
||||
|
||||
void
|
||||
ProcessCreateSessionReply(const llarp_buffer_t& buf);
|
||||
|
||||
void
|
||||
HandleSessionData(const llarp_buffer_t& buf);
|
||||
|
||||
bool
|
||||
DecryptMessage(const llarp_buffer_t & buf, std::vector<byte_t> & result);
|
||||
|
||||
void
|
||||
GenerateAndSendIntro();
|
||||
|
||||
bool
|
||||
GotInboundLIM(const LinkIntroMessage * msg);
|
||||
|
||||
bool
|
||||
GotOutboundLIM(const LinkIntroMessage * msg);
|
||||
|
||||
bool
|
||||
GotRenegLIM(const LinkIntroMessage * msg);
|
||||
|
||||
void
|
||||
SendOurLIM();
|
||||
|
||||
void
|
||||
HandleXMIT(std::vector<byte_t> msg);
|
||||
|
||||
void
|
||||
HandleDATA(std::vector<byte_t> msg);
|
||||
|
||||
void
|
||||
HandleACKS(std::vector<byte_t> msg);
|
||||
|
||||
void
|
||||
HandlePING(std::vector<byte_t> msg);
|
||||
|
||||
void
|
||||
HandleCLOS(std::vector<byte_t> msg);
|
||||
};
|
||||
} // namespace iwp
|
||||
} // namespace llarp
|
||||
|
||||
#endif
|
@ -1,613 +0,0 @@
|
||||
#include <mempipe/mempipe.hpp>
|
||||
#include <messages/discard.hpp>
|
||||
#include <util/logic.hpp>
|
||||
#include <util/time.hpp>
|
||||
#include <ev/pipe.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace mempipe
|
||||
{
|
||||
struct MemLink;
|
||||
struct MemSession;
|
||||
|
||||
struct MempipeContext
|
||||
{
|
||||
using Nodes_t =
|
||||
std::unordered_map< RouterID, LinkLayer_ptr, RouterID::Hash >;
|
||||
Nodes_t _nodes;
|
||||
using SendEvent = std::tuple< RouterID, RouterID, std::vector< byte_t >,
|
||||
ILinkSession::CompletionHandler >;
|
||||
|
||||
/// (src, dst, session, hook)
|
||||
std::vector< SendEvent > _sendQueue;
|
||||
using NodeConnection_t = std::tuple< RouterID, RouterID >;
|
||||
|
||||
struct NodeConnectionHash
|
||||
{
|
||||
size_t
|
||||
operator()(const NodeConnection_t con) const
|
||||
{
|
||||
const auto& a = std::get< 0 >(con);
|
||||
const auto& b = std::get< 1 >(con);
|
||||
auto op = std::bit_xor< size_t >();
|
||||
return std::accumulate(a.begin(), a.end(),
|
||||
std::accumulate(b.begin(), b.end(), 0, op),
|
||||
op);
|
||||
}
|
||||
};
|
||||
|
||||
using NodeConnections_t =
|
||||
std::unordered_map< NodeConnection_t, std::shared_ptr< MemSession >,
|
||||
NodeConnectionHash >;
|
||||
|
||||
NodeConnections_t _connections;
|
||||
|
||||
mutable util::Mutex _access;
|
||||
|
||||
void
|
||||
AddNode(LinkLayer_ptr ptr) LOCKS_EXCLUDED(_access);
|
||||
|
||||
void
|
||||
RemoveNode(LinkLayer_ptr ptr) LOCKS_EXCLUDED(_access);
|
||||
|
||||
LinkLayer_ptr
|
||||
FindNode(const RouterID pk) LOCKS_EXCLUDED(_access);
|
||||
|
||||
/// connect src to dst
|
||||
void
|
||||
ConnectNode(const RouterID src, const RouterID dst,
|
||||
const std::shared_ptr< MemSession >& ptr)
|
||||
LOCKS_EXCLUDED(_access);
|
||||
|
||||
/// remote both src and dst as connected
|
||||
void
|
||||
DisconnectNode(const RouterID src, const RouterID dst)
|
||||
LOCKS_EXCLUDED(_access);
|
||||
|
||||
bool
|
||||
HasConnection(const RouterID src, const RouterID dst) const
|
||||
LOCKS_EXCLUDED(_access);
|
||||
|
||||
void
|
||||
InboundConnection(const RouterID to,
|
||||
const std::shared_ptr< MemSession >& obsession);
|
||||
|
||||
void
|
||||
CallLater(std::function< void(void) > f)
|
||||
{
|
||||
if(m_Logic && f)
|
||||
m_Logic->queue_func(f);
|
||||
else if(f)
|
||||
LogError("dropping call");
|
||||
}
|
||||
|
||||
bool
|
||||
SendTo(const RouterID src, const RouterID dst,
|
||||
const std::vector< byte_t > msg,
|
||||
ILinkSession::CompletionHandler delivery) LOCKS_EXCLUDED(_access);
|
||||
|
||||
void
|
||||
Pump() LOCKS_EXCLUDED(_access);
|
||||
|
||||
void
|
||||
Start(llarp_ev_loop_ptr loop)
|
||||
{
|
||||
evloop = loop;
|
||||
m_Run.store(true);
|
||||
std::promise< void > p;
|
||||
m_Thread = std::make_unique< std::thread >([&]() {
|
||||
LogDebug("mempipe started");
|
||||
m_Logic = std::make_shared< Logic >();
|
||||
p.set_value();
|
||||
while(m_Run.load())
|
||||
{
|
||||
m_Logic->tick(time_now_ms());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
Pump();
|
||||
}
|
||||
m_Logic->stop();
|
||||
});
|
||||
p.get_future().wait();
|
||||
LogDebug("mempipe up");
|
||||
}
|
||||
|
||||
~MempipeContext()
|
||||
{
|
||||
m_Run.store(false);
|
||||
if(m_Thread)
|
||||
m_Thread->join();
|
||||
}
|
||||
|
||||
std::atomic< bool > m_Run;
|
||||
std::shared_ptr< Logic > m_Logic;
|
||||
std::unique_ptr< std::thread > m_Thread = nullptr;
|
||||
llarp_ev_loop_ptr evloop = nullptr;
|
||||
};
|
||||
|
||||
using Globals_ptr = std::unique_ptr< MempipeContext >;
|
||||
|
||||
Globals_ptr _globals;
|
||||
|
||||
struct MemSession : public ILinkSession,
|
||||
public llarp_ev_pkt_pipe,
|
||||
public std::enable_shared_from_this< MemSession >
|
||||
{
|
||||
MemSession(llarp_ev_loop_ptr ev, LinkLayer_ptr _local,
|
||||
LinkLayer_ptr _remote, bool inbound)
|
||||
: llarp_ev_pkt_pipe(ev)
|
||||
, remote{std::move(_remote)}
|
||||
, parent{std::move(_local)}
|
||||
, isInbound{inbound}
|
||||
{
|
||||
}
|
||||
|
||||
LinkLayer_ptr remote;
|
||||
LinkLayer_ptr parent;
|
||||
const bool isInbound;
|
||||
|
||||
util::Mutex _access;
|
||||
|
||||
std::deque< std::vector< byte_t > > m_recvQueue;
|
||||
std::deque< std::tuple< std::vector< byte_t >, CompletionHandler > >
|
||||
m_sendQueue;
|
||||
|
||||
llarp_time_t lastRecv = 0;
|
||||
|
||||
PubKey
|
||||
GetPubKey() const override
|
||||
{
|
||||
return remote->GetOurRC().pubkey;
|
||||
}
|
||||
|
||||
bool
|
||||
SendKeepAlive() override
|
||||
{
|
||||
std::array< byte_t, 128 > pkt;
|
||||
DiscardMessage msg;
|
||||
llarp_buffer_t buf{pkt};
|
||||
if(!msg.BEncode(&buf))
|
||||
return false;
|
||||
buf.sz = buf.cur - buf.base;
|
||||
buf.cur = buf.base;
|
||||
return SendMessageBuffer(buf, nullptr);
|
||||
}
|
||||
|
||||
void
|
||||
OnRead(const llarp_buffer_t& pkt) override
|
||||
{
|
||||
std::vector< byte_t > buf;
|
||||
buf.resize(pkt.sz);
|
||||
std::copy_n(pkt.base, pkt.sz, buf.begin());
|
||||
Recv(std::move(buf));
|
||||
}
|
||||
|
||||
void
|
||||
Recv(const std::vector< byte_t > msg) LOCKS_EXCLUDED(_access)
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
m_recvQueue.emplace_back(std::move(msg));
|
||||
lastRecv = parent->Now();
|
||||
}
|
||||
|
||||
void
|
||||
OnLinkEstablished(ILinkLayer*) override
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
bool
|
||||
TimedOut(llarp_time_t now) const override
|
||||
{
|
||||
return now >= lastRecv && now - lastRecv > 5000;
|
||||
}
|
||||
|
||||
void
|
||||
PumpWrite() LOCKS_EXCLUDED(_access)
|
||||
{
|
||||
std::deque< std::tuple< std::vector< byte_t >, CompletionHandler > > q;
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
if(m_sendQueue.size())
|
||||
q = std::move(m_sendQueue);
|
||||
}
|
||||
const RouterID src = parent->GetOurRC().pubkey;
|
||||
const RouterID dst = GetPubKey();
|
||||
while(q.size())
|
||||
{
|
||||
const auto& f = q.front();
|
||||
_globals->SendTo(src, dst, std::get< 0 >(f), std::get< 1 >(f));
|
||||
q.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
PumpRead() LOCKS_EXCLUDED(_access)
|
||||
{
|
||||
std::deque< std::vector< byte_t > > q;
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
if(m_recvQueue.size())
|
||||
q = std::move(m_recvQueue);
|
||||
}
|
||||
while(q.size())
|
||||
{
|
||||
const llarp_buffer_t buf{q.front()};
|
||||
parent->HandleMessage(this, buf);
|
||||
q.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
void Tick(llarp_time_t) override
|
||||
{
|
||||
Pump();
|
||||
}
|
||||
|
||||
void
|
||||
Pump() override
|
||||
{
|
||||
PumpRead();
|
||||
PumpWrite();
|
||||
}
|
||||
|
||||
void
|
||||
Close() override
|
||||
{
|
||||
auto self = shared_from_this();
|
||||
_globals->CallLater([=]() { self->Disconnected(); });
|
||||
}
|
||||
|
||||
RouterContact
|
||||
GetRemoteRC() const override
|
||||
{
|
||||
return remote->GetOurRC();
|
||||
}
|
||||
|
||||
bool
|
||||
ShouldPing() const override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
SendMessageBuffer(const llarp_buffer_t& pkt,
|
||||
ILinkSession::CompletionHandler completed) override
|
||||
{
|
||||
if(completed == nullptr)
|
||||
completed = [](ILinkSession::DeliveryStatus) {};
|
||||
auto self = shared_from_this();
|
||||
std::vector< byte_t > buf(pkt.sz);
|
||||
std::copy_n(pkt.base, pkt.sz, buf.begin());
|
||||
return _globals->SendTo(parent->GetOurRC().pubkey, GetRemoteRC().pubkey,
|
||||
buf, [=](ILinkSession::DeliveryStatus status) {
|
||||
self->parent->logic()->call_later(
|
||||
10, std::bind(completed, status));
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
Start() override
|
||||
{
|
||||
if(!StartPipe())
|
||||
return;
|
||||
if(isInbound)
|
||||
return;
|
||||
LogDebug("outbound start");
|
||||
auto self = shared_from_this();
|
||||
_globals->CallLater([=]() {
|
||||
LogDebug("Called inbound connection");
|
||||
_globals->InboundConnection(self->GetPubKey(), self);
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
IsEstablished() const override
|
||||
{
|
||||
return _globals->HasConnection(parent->GetOurRC().pubkey, GetPubKey());
|
||||
}
|
||||
|
||||
void
|
||||
Disconnected()
|
||||
{
|
||||
_globals->DisconnectNode(parent->GetOurRC().pubkey, GetPubKey());
|
||||
}
|
||||
|
||||
bool
|
||||
RenegotiateSession() override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
ILinkLayer*
|
||||
GetLinkLayer() const override
|
||||
{
|
||||
return parent.get();
|
||||
}
|
||||
|
||||
util::StatusObject
|
||||
ExtractStatus() const override
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
llarp::Addr
|
||||
GetRemoteEndpoint() const override
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
size_t
|
||||
SendQueueBacklog() const override
|
||||
{
|
||||
return m_sendQueue.size();
|
||||
}
|
||||
};
|
||||
|
||||
struct MemLink : public ILinkLayer,
|
||||
public std::enable_shared_from_this< MemLink >
|
||||
{
|
||||
MemLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
|
||||
LinkMessageHandler h, SignBufferFunc sign,
|
||||
SessionEstablishedHandler est, SessionRenegotiateHandler reneg,
|
||||
TimeoutHandler timeout, SessionClosedHandler closed,
|
||||
bool permitInbound)
|
||||
: ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout,
|
||||
closed)
|
||||
, allowInbound(permitInbound)
|
||||
{
|
||||
}
|
||||
|
||||
const bool allowInbound;
|
||||
|
||||
bool
|
||||
KeyGen(SecretKey& k) override
|
||||
{
|
||||
k.Zero();
|
||||
return true;
|
||||
}
|
||||
|
||||
const char*
|
||||
Name() const override
|
||||
{
|
||||
return "mempipe";
|
||||
}
|
||||
|
||||
uint16_t
|
||||
Rank() const override
|
||||
{
|
||||
return 100;
|
||||
}
|
||||
|
||||
void
|
||||
Pump() override
|
||||
{
|
||||
LogDebug("memlink pump");
|
||||
std::set< RouterID > sessions;
|
||||
{
|
||||
Lock l(&m_AuthedLinksMutex);
|
||||
auto itr = m_AuthedLinks.begin();
|
||||
while(itr != m_AuthedLinks.end())
|
||||
{
|
||||
sessions.insert(itr->first);
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
ILinkLayer::Pump();
|
||||
{
|
||||
Lock l(&m_AuthedLinksMutex);
|
||||
for(const auto& pk : sessions)
|
||||
{
|
||||
if(m_AuthedLinks.count(pk) == 0)
|
||||
{
|
||||
// all sessions were removed
|
||||
SessionClosed(pk);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
RecvFrom(const llarp::Addr&, const void*, size_t) override
|
||||
{
|
||||
}
|
||||
|
||||
bool
|
||||
Configure(llarp_ev_loop_ptr ev, const std::string&, int,
|
||||
uint16_t) override
|
||||
{
|
||||
m_Loop = ev;
|
||||
if(_globals == nullptr)
|
||||
{
|
||||
_globals = std::make_unique< MempipeContext >();
|
||||
_globals->Start(ev);
|
||||
}
|
||||
return _globals != nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr< ILinkSession >
|
||||
NewOutboundSession(const RouterContact& rc,
|
||||
const AddressInfo& ai) override
|
||||
{
|
||||
if(ai.dialect != Name())
|
||||
return nullptr;
|
||||
auto remote = _globals->FindNode(rc.pubkey);
|
||||
if(remote == nullptr)
|
||||
return nullptr;
|
||||
return std::make_shared< MemSession >(m_Loop, shared_from_this(),
|
||||
remote, false);
|
||||
}
|
||||
|
||||
bool
|
||||
Start(std::shared_ptr< Logic > l) override
|
||||
{
|
||||
if(!ILinkLayer::Start(l))
|
||||
return false;
|
||||
_globals->AddNode(shared_from_this());
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
Stop() override
|
||||
{
|
||||
_globals->RemoveNode(shared_from_this());
|
||||
}
|
||||
};
|
||||
|
||||
LinkLayer_ptr
|
||||
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
|
||||
LinkMessageHandler h, SignBufferFunc sign,
|
||||
SessionEstablishedHandler est,
|
||||
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
|
||||
SessionClosedHandler closed)
|
||||
{
|
||||
return std::make_shared< MemLink >(routerEncSecret, getrc, h, sign, est,
|
||||
reneg, timeout, closed, false);
|
||||
}
|
||||
|
||||
LinkLayer_ptr
|
||||
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
|
||||
LinkMessageHandler h, SignBufferFunc sign,
|
||||
SessionEstablishedHandler est,
|
||||
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
|
||||
SessionClosedHandler closed)
|
||||
{
|
||||
return std::make_shared< MemLink >(routerEncSecret, getrc, h, sign, est,
|
||||
reneg, timeout, closed, true);
|
||||
}
|
||||
|
||||
void
|
||||
MempipeContext::AddNode(LinkLayer_ptr ptr)
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
_nodes.emplace(RouterID(ptr->GetOurRC().pubkey), ptr);
|
||||
LogInfo("add mempipe node: ", RouterID(ptr->GetOurRC().pubkey));
|
||||
}
|
||||
|
||||
bool
|
||||
MempipeContext::SendTo(const RouterID src, const RouterID dst,
|
||||
const std::vector< byte_t > msg,
|
||||
ILinkSession::CompletionHandler delivery)
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
_sendQueue.emplace_back(std::move(src), std::move(dst), std::move(msg),
|
||||
std::move(delivery));
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
MempipeContext::InboundConnection(const RouterID to,
|
||||
const std::shared_ptr< MemSession >& ob)
|
||||
{
|
||||
LogDebug("inbound connect to ", to, " from ",
|
||||
RouterID(ob->parent->GetOurRC().pubkey));
|
||||
std::shared_ptr< MemSession > other;
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
auto itr = _nodes.find(to);
|
||||
if(itr != _nodes.end())
|
||||
{
|
||||
other = std::make_shared< MemSession >(evloop, itr->second,
|
||||
ob->parent, true);
|
||||
}
|
||||
}
|
||||
if(other)
|
||||
{
|
||||
ConnectNode(other->GetPubKey(), ob->GetPubKey(), other);
|
||||
ConnectNode(ob->GetPubKey(), other->GetPubKey(), ob);
|
||||
ob->parent->logic()->queue_func([ob]() {
|
||||
ob->parent->MapAddr(RouterID{ob->GetPubKey()}, ob.get());
|
||||
ob->parent->SessionEstablished(ob.get());
|
||||
});
|
||||
other->parent->logic()->queue_func([other]() {
|
||||
other->parent->MapAddr(RouterID{other->GetPubKey()}, other.get());
|
||||
other->parent->SessionEstablished(other.get());
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
ob->Disconnected();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
MempipeContext::ConnectNode(const RouterID src, const RouterID dst,
|
||||
const std::shared_ptr< MemSession >& session)
|
||||
{
|
||||
LogDebug("connect ", src, " to ", dst);
|
||||
util::Lock lock(&_access);
|
||||
_connections.emplace(std::make_pair(std::make_tuple(src, dst), session));
|
||||
}
|
||||
|
||||
void
|
||||
MempipeContext::DisconnectNode(const RouterID src, const RouterID dst)
|
||||
{
|
||||
LogDebug("connect ", src, " from ", dst);
|
||||
util::Lock lock(&_access);
|
||||
_connections.erase({src, dst});
|
||||
}
|
||||
|
||||
LinkLayer_ptr
|
||||
MempipeContext::FindNode(const RouterID rid)
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
auto itr = _nodes.find(rid);
|
||||
if(itr == _nodes.end())
|
||||
return nullptr;
|
||||
return itr->second;
|
||||
}
|
||||
|
||||
bool
|
||||
MempipeContext::HasConnection(const RouterID src, const RouterID dst) const
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
return _connections.find({src, dst}) != _connections.end();
|
||||
}
|
||||
|
||||
void
|
||||
MempipeContext::RemoveNode(LinkLayer_ptr node)
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
const RouterID pk = node->GetOurRC().pubkey;
|
||||
_nodes.erase(pk);
|
||||
auto itr = _connections.begin();
|
||||
while(itr != _connections.end())
|
||||
{
|
||||
if(std::get< 0 >(itr->first) == pk || std::get< 1 >(itr->first) == pk)
|
||||
{
|
||||
auto s = itr->second->shared_from_this();
|
||||
itr->second->GetLinkLayer()->logic()->call_later(
|
||||
1, [s]() { s->Disconnected(); });
|
||||
}
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
MempipeContext::Pump()
|
||||
{
|
||||
std::vector< SendEvent > q;
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
q = std::move(_sendQueue);
|
||||
}
|
||||
for(auto& f : q)
|
||||
{
|
||||
ILinkSession::DeliveryStatus status =
|
||||
ILinkSession::DeliveryStatus::eDeliveryDropped;
|
||||
{
|
||||
util::Lock lock(&_access);
|
||||
auto itr = _connections.find({std::get< 0 >(f), std::get< 1 >(f)});
|
||||
if(itr != _connections.end())
|
||||
{
|
||||
const llarp_buffer_t pkt{std::get< 2 >(f)};
|
||||
if(itr->second->Write(pkt))
|
||||
status = ILinkSession::DeliveryStatus::eDeliverySuccess;
|
||||
}
|
||||
}
|
||||
LogDebug(std::get< 0 >(f), "->", std::get< 1 >(f),
|
||||
" status=", (int)status);
|
||||
CallLater(std::bind(std::get< 3 >(f), status));
|
||||
}
|
||||
}
|
||||
} // namespace mempipe
|
||||
} // namespace llarp
|
@ -1,25 +0,0 @@
|
||||
#ifndef LLARP_MEMPIPE_MEMPIPE_HPP
|
||||
#define LLARP_MEMPIPE_MEMPIPE_HPP
|
||||
#include <memory>
|
||||
#include <link/server.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace mempipe
|
||||
{
|
||||
LinkLayer_ptr
|
||||
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
|
||||
LinkMessageHandler h, SignBufferFunc sign,
|
||||
SessionEstablishedHandler est,
|
||||
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
|
||||
SessionClosedHandler closed);
|
||||
LinkLayer_ptr
|
||||
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
|
||||
LinkMessageHandler h, SignBufferFunc sign,
|
||||
SessionEstablishedHandler est,
|
||||
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
|
||||
SessionClosedHandler closed);
|
||||
} // namespace mempipe
|
||||
} // namespace llarp
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue