diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index db3854ce1..fd6a36fbc 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -176,15 +176,15 @@ set(LIB_SRC handlers/null.cpp handlers/tun.cpp hook/shell.cpp - iwp/linklayer.cpp - iwp/outermessage.cpp iwp/iwp.cpp + iwp/linklayer.cpp + iwp/message_buffer.cpp + iwp/session.cpp link/factory.cpp link/i_link_manager.cpp link/link_manager.cpp link/server.cpp link/session.cpp - mempipe/mempipe.cpp messages/dht_immediate.cpp messages/discard.cpp messages/link_intro.cpp diff --git a/llarp/iwp/iwp.cpp b/llarp/iwp/iwp.cpp index 457a51340..eae5384be 100644 --- a/llarp/iwp/iwp.cpp +++ b/llarp/iwp/iwp.cpp @@ -7,22 +7,26 @@ namespace llarp { namespace iwp { - std::unique_ptr< ILinkLayer > - NewServer(const SecretKey& enckey, GetRCFunc getrc, LinkMessageHandler h, - SessionEstablishedHandler est, SessionRenegotiateHandler reneg, - SignBufferFunc sign, TimeoutHandler t, - SessionClosedHandler closed) + LinkLayer_ptr + NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed) { - (void)enckey; - (void)getrc; - (void)h; - (void)est; - (void)reneg; - (void)sign; - (void)t; - (void)closed; - // TODO: implement me - return nullptr; + return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est, + reneg, timeout, closed, true); + } + + LinkLayer_ptr + NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed) + { + return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est, + reneg, timeout, closed, false); } } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/iwp.hpp b/llarp/iwp/iwp.hpp index e7a10413e..0e9eacaac 100644 --- a/llarp/iwp/iwp.hpp +++ b/llarp/iwp/iwp.hpp @@ -2,21 +2,25 @@ #define LLARP_IWP_HPP #include - +#include #include namespace llarp { - struct AbstractRouter; - namespace iwp { - std::unique_ptr< ILinkLayer > - NewServer(const SecretKey& routerEncSecret, llarp::GetRCFunc getrc, - llarp::LinkMessageHandler h, llarp::SessionEstablishedHandler est, - llarp::SessionRenegotiateHandler reneg, - llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout, - llarp::SessionClosedHandler closed); + LinkLayer_ptr + NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed); + LinkLayer_ptr + NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, TimeoutHandler timeout, + SessionClosedHandler closed); } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index 8a984d7c4..9de470693 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -1,16 +1,20 @@ #include +#include namespace llarp { namespace iwp { - LinkLayer::LinkLayer(const SecretKey& enckey, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler t, SessionClosedHandler closed) - : ILinkLayer(enckey, getrc, h, sign, est, reneg, t, closed) + LinkLayer::LinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, + SessionRenegotiateHandler reneg, + TimeoutHandler timeout, SessionClosedHandler closed, + bool allowInbound) + : ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout, + closed) + , permitInbound{allowInbound} { - m_FlowCookie.Randomize(); } LinkLayer::~LinkLayer() = default; @@ -44,135 +48,30 @@ namespace llarp bool LinkLayer::Start(std::shared_ptr< Logic > l) { - if(!ILinkLayer::Start(l)) - return false; - return false; + return ILinkLayer::Start(l); } void LinkLayer::RecvFrom(const Addr& from, const void* pkt, size_t sz) { - m_OuterMsg.Clear(); - llarp_buffer_t sigbuf(pkt, sz); - llarp_buffer_t decodebuf(pkt, sz); - if(!m_OuterMsg.Decode(&decodebuf)) + std::shared_ptr< ILinkSession > session; { - LogError("failed to decode outer message"); - return; - } - NetID ourNetID; - switch(m_OuterMsg.command) - { - case eOCMD_ObtainFlowID: - sigbuf.sz -= m_OuterMsg.Zsig.size(); - if(!CryptoManager::instance()->verify(m_OuterMsg.pubkey, sigbuf, - m_OuterMsg.Zsig)) - { - LogError("failed to verify signature on '", - (char)m_OuterMsg.command, "' message from ", from); - return; - } - if(!ShouldSendFlowID(from)) - { - SendReject(from, "no flo 4u :^)"); - return; - } - if(m_OuterMsg.netid == ourNetID) - { - if(GenFlowIDFor(m_OuterMsg.pubkey, from, m_OuterMsg.flow)) - SendFlowID(from, m_OuterMsg.flow); - else - SendReject(from, "genflow fail"); - } - else - SendReject(from, "bad netid"); + util::Lock lock(&m_PendingMutex); + if(m_Pending.count(from) == 0) + { + m_Pending.insert({from, std::make_shared< Session >(this, from)}); + } + session = m_Pending.find(from)->second; } + const llarp_buffer_t buf{pkt, sz}; + session->Recv_LL(buf); } std::shared_ptr< ILinkSession > LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) { - (void)rc; - (void)ai; - // TODO: implement me - return {}; - } - - void - LinkLayer::SendFlowID(const Addr& to, const FlowID_t& flow) - { - // TODO: implement me - (void)to; - (void)flow; - } - - bool - LinkLayer::VerifyFlowID(const PubKey& pk, const Addr& from, - const FlowID_t& flow) const - { - FlowID_t expected; - if(!GenFlowIDFor(pk, from, expected)) - return false; - return expected == flow; - } - - bool - LinkLayer::GenFlowIDFor(const PubKey& pk, const Addr& from, - FlowID_t& flow) const - { - std::array< byte_t, 128 > tmp = {{0}}; - if(inet_ntop(AF_INET6, from.addr6(), (char*)tmp.data(), tmp.size()) - == nullptr) - return false; - std::copy_n(pk.begin(), pk.size(), tmp.begin() + 64); - std::copy_n(m_FlowCookie.begin(), m_FlowCookie.size(), - tmp.begin() + 64 + pk.size()); - llarp_buffer_t buf(tmp); - ShortHash h; - if(!CryptoManager::instance()->shorthash(h, buf)) - return false; - std::copy_n(h.begin(), flow.size(), flow.begin()); - return true; - } - - bool - LinkLayer::ShouldSendFlowID(const Addr& to) const - { - (void)to; - // TODO: implement me - return false; - } - - void - LinkLayer::SendReject(const Addr& to, const char* msg) - { - if(strlen(msg) > 14) - { - throw std::logic_error("reject message too big"); - } - std::array< byte_t, 120 > pkt; - auto now = Now(); - PubKey pk = GetOurRC().pubkey; - OuterMessage m; - m.CreateReject(msg, now, pk); - llarp_buffer_t encodebuf(pkt); - if(!m.Encode(&encodebuf)) - { - LogError("failed to encode reject message to ", to); - return; - } - llarp_buffer_t signbuf(pkt.data(), pkt.size() - m.Zsig.size()); - if(!Sign(m.Zsig, signbuf)) - { - LogError("failed to sign reject messsage to ", to); - return; - } - std::copy_n(m.Zsig.begin(), m.Zsig.size(), - pkt.begin() + (pkt.size() - m.Zsig.size())); - llarp_buffer_t pktbuf(pkt); - SendTo_LL(to, pktbuf); + return std::make_shared< Session >(this, rc, ai); } } // namespace iwp - } // namespace llarp diff --git a/llarp/iwp/linklayer.hpp b/llarp/iwp/linklayer.hpp index e7265188e..364716de8 100644 --- a/llarp/iwp/linklayer.hpp +++ b/llarp/iwp/linklayer.hpp @@ -6,7 +6,6 @@ #include #include #include -#include namespace llarp { @@ -14,10 +13,11 @@ namespace llarp { struct LinkLayer final : public ILinkLayer { - LinkLayer(const SecretKey &encryptionSecretKey, GetRCFunc getrc, - LinkMessageHandler h, SessionEstablishedHandler established, - SessionRenegotiateHandler reneg, SignBufferFunc sign, - TimeoutHandler timeout, SessionClosedHandler closed); + LinkLayer(const SecretKey &routerEncSecret, GetRCFunc getrc, + LinkMessageHandler h, SignBufferFunc sign, + SessionEstablishedHandler est, SessionRenegotiateHandler reneg, + TimeoutHandler timeout, SessionClosedHandler closed, + bool permitInbound); ~LinkLayer() override; @@ -40,41 +40,14 @@ namespace llarp uint16_t Rank() const override; - /// verify that a new flow id matches addresses and pubkey - bool - VerifyFlowID(const PubKey &pk, const Addr &from, - const FlowID_t &flow) const; - void RecvFrom(const Addr &from, const void *buf, size_t sz) override; private: - bool - GenFlowIDFor(const PubKey &pk, const Addr &from, FlowID_t &flow) const; - - bool - ShouldSendFlowID(const Addr &from) const; - - void - SendReject(const Addr &to, const char *msg); - - void - SendFlowID(const Addr &to, const FlowID_t &flow); - - using ActiveFlows_t = - std::unordered_map< FlowID_t, RouterID, FlowID_t::Hash >; - - ActiveFlows_t m_ActiveFlows; - - using PendingFlows_t = std::unordered_map< Addr, FlowID_t, Addr::Hash >; - /// flows that are pending authentication - PendingFlows_t m_PendingFlows; - - /// cookie used in flow id computation - AlignedBuffer< 32 > m_FlowCookie; - - OuterMessage m_OuterMsg; + const bool permitInbound; }; + + using LinkLayer_ptr = std::shared_ptr< LinkLayer >; } // namespace iwp } // namespace llarp diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp new file mode 100644 index 000000000..b2ed59a87 --- /dev/null +++ b/llarp/iwp/message_buffer.cpp @@ -0,0 +1,160 @@ +#include +#include + +namespace llarp +{ + namespace iwp + { + OutboundMessage::OutboundMessage() : + m_Size{0} {} + + OutboundMessage::OutboundMessage(uint64_t msgid, const llarp_buffer_t& pkt, + ILinkSession::CompletionHandler handler) : + m_Size{std::min(pkt.sz, MAX_LINK_MSG_SIZE)}, + m_MsgID{msgid}, + m_Completed{handler} + { + m_Data.Zero(); + std::copy_n(pkt.base, m_Size, m_Data.begin()); + } + + std::vector + OutboundMessage::XMIT() const + { + std::vector xmit{LLARP_PROTO_VERSION, Command::eXMIT, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + htobe16buf(xmit.data() + 2, m_Size); + htobe64buf(xmit.data() + 4, m_MsgID); + const llarp_buffer_t buf{m_Data.data(), m_Size}; + ShortHash H; + CryptoManager::instance()->shorthash(H, buf); + std::copy(H.begin(), H.end(), std::back_inserter(xmit)); + LogDebug("xmit H=", H.ToHex()); + return xmit; + } + + void + OutboundMessage::Completed() + { + if(m_Completed) + { + m_Completed(ILinkSession::DeliveryStatus::eDeliverySuccess); + } + m_Completed = nullptr; + } + + bool + OutboundMessage::ShouldFlush(llarp_time_t now) const + { + static constexpr llarp_time_t FlushInterval = 250; + return now - m_LastFlush >= FlushInterval; + } + + void + OutboundMessage::Ack(byte_t bitmask) + { + m_Acks = std::bitset<8>(bitmask); + } + + void + OutboundMessage::FlushUnAcked(std::function sendpkt, llarp_time_t now) + { + uint16_t idx = 0; + while(idx < m_Size) + { + if(not m_Acks[idx / FragmentSize]) + { + std::vector frag{LLARP_PROTO_VERSION, Command::eDATA, 0,0,0,0,0,0,0,0,0,0}; + htobe16buf(frag.data() + 2, idx); + htobe64buf(frag.data() + 4, m_MsgID); + std::copy(m_Data.begin() + idx, m_Data.begin() + idx + FragmentSize, std::back_inserter(frag)); + const llarp_buffer_t pkt{frag}; + sendpkt(pkt); + } + idx += FragmentSize; + } + m_LastFlush = now; + } + + bool + OutboundMessage::IsTransmitted() const + { + for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize) + { + if(!m_Acks.test(idx / FragmentSize)) + return false; + } + return true; + } + + InboundMessage::InboundMessage() : m_Size{0} {} + + InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h) : + m_Digset{std::move(h)}, + m_Size{sz}, + m_MsgID{msgid} + {} + + void + InboundMessage::HandleData(uint16_t idx, const byte_t * ptr) + { + if(idx + FragmentSize > MAX_LINK_MSG_SIZE) + return; + auto * dst = m_Data.data() + idx; + std::copy_n(ptr, FragmentSize, dst); + m_Acks.set(idx / FragmentSize); + LogDebug("got fragment ", idx / FragmentSize , " of ", m_Size); + } + + + std::vector + InboundMessage::ACKS() const + { + std::vector acks{LLARP_PROTO_VERSION, Command::eACKS, 0, 0, 0, 0, 0, 0, 0, 0, uint8_t{m_Acks.to_ulong()}}; + + htobe64buf(acks.data() + 2, m_MsgID); + return acks; + } + + bool + InboundMessage::IsCompleted() const + { + for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize) + { + if(!m_Acks.test(idx / FragmentSize)) + return false; + } + return true; + } + + bool + InboundMessage::ShouldSendACKS(llarp_time_t now) const + { + return now - m_LastACKSent > 1000 || IsCompleted(); + } + + void + InboundMessage::SendACKS(std::function sendpkt, llarp_time_t now) + { + auto acks = ACKS(); + const llarp_buffer_t pkt{acks}; + sendpkt(pkt); + m_LastACKSent = now; + } + + bool + InboundMessage::Verify() const + { + ShortHash gotten; + const llarp_buffer_t buf{m_Data.data(), m_Size}; + CryptoManager::instance()->shorthash(gotten, buf); + LogDebug("gotten=",gotten.ToHex()); + if(gotten != m_Digset) + { + DumpBuffer(buf); + return false; + } + return true; + } + + } +} \ No newline at end of file diff --git a/llarp/iwp/message_buffer.hpp b/llarp/iwp/message_buffer.hpp new file mode 100644 index 000000000..0d60e9a0a --- /dev/null +++ b/llarp/iwp/message_buffer.hpp @@ -0,0 +1,97 @@ +#ifndef LLARP_IWP_MESSAGE_BUFFER_HPP +#define LLARP_IWP_MESSAGE_BUFFER_HPP +#include +#include +#include +#include +#include +#include + +namespace llarp +{ + namespace iwp + { + enum Command + { + /// keep alive message + ePING = 0, + /// begin transission + eXMIT = 1, + /// fragment data + eDATA = 2, + /// acknolege fragments + eACKS = 3, + /// close session + eCLOS = 4 + }; + + static constexpr size_t FragmentSize = 1024; + + struct OutboundMessage + { + OutboundMessage(); + OutboundMessage(uint64_t msgid, const llarp_buffer_t& pkt, + ILinkSession::CompletionHandler handler); + + AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data; + uint16_t m_Size = 0; + uint64_t m_MsgID = 0; + std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks; + ILinkSession::CompletionHandler m_Completed; + llarp_time_t m_LastFlush = 0; + + std::vector + XMIT() const; + + void + Ack(byte_t bitmask); + + void + FlushUnAcked(std::function sendpkt, llarp_time_t now); + + bool + ShouldFlush(llarp_time_t now) const; + + void + Completed(); + + bool + IsTransmitted() const; + }; + + struct InboundMessage + { + InboundMessage(); + InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h); + + AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data; + ShortHash m_Digset; + uint16_t m_Size = 0; + uint64_t m_MsgID = 0; + llarp_time_t m_LastACKSent = 0; + std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks; + + void + HandleData(uint16_t idx, const byte_t * ptr); + + bool + IsCompleted() const; + + bool + Verify() const; + + bool + ShouldSendACKS(llarp_time_t now) const; + + void + SendACKS(std::function sendpkt, llarp_time_t now); + + std::vector + ACKS() const; + + }; + + } // namespace iwp +} // namespace llarp + +#endif \ No newline at end of file diff --git a/llarp/iwp/outermessage.cpp b/llarp/iwp/outermessage.cpp deleted file mode 100644 index 3d7ad2af7..000000000 --- a/llarp/iwp/outermessage.cpp +++ /dev/null @@ -1,155 +0,0 @@ -#include -#include - -namespace llarp -{ - namespace iwp - { - std::array< byte_t, 6 > OuterMessage::obtain_flow_id_magic = - std::array< byte_t, 6 >{{'n', 'e', 't', 'i', 'd', '?'}}; - - std::array< byte_t, 6 > OuterMessage::give_flow_id_magic = - std::array< byte_t, 6 >{{'n', 'e', 't', 'i', 'd', '!'}}; - - OuterMessage::OuterMessage() - { - Clear(); - } - - OuterMessage::~OuterMessage() = default; - - void - OuterMessage::Clear() - { - command = 0; - flow.Zero(); - netid.Zero(); - reject.fill(0); - N.Zero(); - X.Zero(); - Xsize = 0; - Zsig.Zero(); - Zhash.Zero(); - pubkey.Zero(); - magic.fill(0); - uinteger = 0; - A.reset(); - } - - void - OuterMessage::CreateReject(const char* msg, llarp_time_t now, - const PubKey& pk) - { - Clear(); - std::copy_n(msg, std::min(strlen(msg), reject.size()), reject.begin()); - uinteger = now; - pubkey = pk; - } - - bool - OuterMessage::Encode(llarp_buffer_t* buf) const - { - if(buf->size_left() < 2) - return false; - *buf->cur = command; - buf->cur++; - *buf->cur = '='; - buf->cur++; - switch(command) - { - case eOCMD_ObtainFlowID: - - case eOCMD_GiveFlowID: - if(!buf->write(reject.begin(), reject.end())) - return false; - if(!buf->write(give_flow_id_magic.begin(), give_flow_id_magic.end())) - return false; - if(!buf->write(flow.begin(), flow.end())) - return false; - if(!buf->write(pubkey.begin(), pubkey.end())) - return false; - return buf->write(Zsig.begin(), Zsig.end()); - default: - return false; - } - } - - bool - OuterMessage::Decode(llarp_buffer_t* buf) - { - static constexpr size_t header_size = 2; - - if(buf->size_left() < header_size) - return false; - command = *buf->cur; - ++buf->cur; - if(*buf->cur != '=') - return false; - ++buf->cur; - switch(command) - { - case eOCMD_ObtainFlowID: - if(!buf->read_into(magic.begin(), magic.end())) - return false; - if(!buf->read_into(netid.begin(), netid.end())) - return false; - if(!buf->read_uint64(uinteger)) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - if(buf->size_left() <= Zsig.size()) - return false; - Xsize = buf->size_left() - Zsig.size(); - if(!buf->read_into(X.begin(), X.begin() + Xsize)) - return false; - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_GiveFlowID: - if(!buf->read_into(magic.begin(), magic.end())) - return false; - if(!buf->read_into(flow.begin(), flow.end())) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - buf->cur += buf->size_left() - Zsig.size(); - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_Reject: - if(!buf->read_into(reject.begin(), reject.end())) - return false; - if(!buf->read_uint64(uinteger)) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - buf->cur += buf->size_left() - Zsig.size(); - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_SessionNegotiate: - if(!buf->read_into(flow.begin(), flow.end())) - return false; - if(!buf->read_into(pubkey.begin(), pubkey.end())) - return false; - if(!buf->read_uint64(uinteger)) - return false; - if(buf->size_left() == Zsig.size() + 32) - { - A = std::make_unique< AlignedBuffer< 32 > >(); - if(!buf->read_into(A->begin(), A->end())) - return false; - } - return buf->read_into(Zsig.begin(), Zsig.end()); - case eOCMD_TransmitData: - if(!buf->read_into(flow.begin(), flow.end())) - return false; - if(!buf->read_into(N.begin(), N.end())) - return false; - if(buf->size_left() <= Zhash.size()) - return false; - Xsize = buf->size_left() - Zhash.size(); - if(!buf->read_into(X.begin(), X.begin() + Xsize)) - return false; - return buf->read_into(Zhash.begin(), Zhash.end()); - default: - return false; - } - } - } // namespace iwp - -} // namespace llarp diff --git a/llarp/iwp/outermessage.hpp b/llarp/iwp/outermessage.hpp deleted file mode 100644 index eefc05593..000000000 --- a/llarp/iwp/outermessage.hpp +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef LLARP_IWP_OUTERMESSAGE_HPP -#define LLARP_IWP_OUTERMESSAGE_HPP - -#include -#include -#include - -#include - -namespace llarp -{ - namespace iwp - { - using FlowID_t = AlignedBuffer< 32 >; - - using OuterCommand_t = byte_t; - - constexpr OuterCommand_t eOCMD_ObtainFlowID = 'O'; - constexpr OuterCommand_t eOCMD_GiveFlowID = 'G'; - constexpr OuterCommand_t eOCMD_Reject = 'R'; - constexpr OuterCommand_t eOCMD_SessionNegotiate = 'S'; - constexpr OuterCommand_t eOCMD_TransmitData = 'D'; - - using InnerCommand_t = byte_t; - - constexpr InnerCommand_t eICMD_KeepAlive = 'k'; - constexpr InnerCommand_t eICMD_KeepAliveAck = 'l'; - constexpr InnerCommand_t eICMD_Congestion = 'c'; - constexpr InnerCommand_t eICMD_AntiCongestion = 'd'; - constexpr InnerCommand_t eICMD_Transmit = 't'; - constexpr InnerCommand_t eICMD_Ack = 'a'; - constexpr InnerCommand_t eICMD_RotateKeys = 'r'; - constexpr InnerCommand_t eICMD_UpgradeProtocol = 'u'; - constexpr InnerCommand_t eICMD_VersionUpgrade = 'v'; - - struct OuterMessage - { - // required members - byte_t command; - FlowID_t flow; - - OuterMessage(); - ~OuterMessage(); - - // static members - static std::array< byte_t, 6 > obtain_flow_id_magic; - static std::array< byte_t, 6 > give_flow_id_magic; - - void - CreateReject(const char *msg, llarp_time_t now, const PubKey &pk); - - // optional members follow - std::array< byte_t, 6 > magic; - NetID netid; - // either timestamp or counter - uint64_t uinteger; - std::array< byte_t, 14 > reject; - AlignedBuffer< 24 > N; - PubKey pubkey; - - std::unique_ptr< AlignedBuffer< 32 > > A; - - static constexpr size_t ipv6_mtu = 1280; - static constexpr size_t overhead_size = 16 + 24 + 32; - static constexpr size_t payload_size = ipv6_mtu - overhead_size; - - AlignedBuffer< payload_size > X; - size_t Xsize; - ShortHash Zhash; - Signature Zsig; - - /// encode to buffer - bool - Encode(llarp_buffer_t *buf) const; - - /// decode from buffer - bool - Decode(llarp_buffer_t *buf); - - /// clear members - void - Clear(); - }; - } // namespace iwp -} // namespace llarp -#endif diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp new file mode 100644 index 000000000..c81bb0b02 --- /dev/null +++ b/llarp/iwp/session.cpp @@ -0,0 +1,536 @@ +#include +#include +#include + +namespace llarp +{ + namespace iwp + { + static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE; + + Session::Session(LinkLayer* p, RouterContact rc, AddressInfo ai) + : m_State{State::Initial} + , m_Inbound{false} + , m_Parent{p} + , m_CreatedAt{p->Now()} + , m_RemoteAddr{ai} + , m_ChosenAI{std::move(ai)} + , m_RemoteRC{std::move(rc)} + { + token.Zero(); + GotLIM = util::memFn(&Session::GotOutboundLIM, this); + } + + Session::Session(LinkLayer* p, Addr from) + : m_State{State::Initial} + , m_Inbound{true} + , m_Parent{p} + , m_CreatedAt{p->Now()} + , m_RemoteAddr{from} + { + token.Randomize(); + GotLIM = util::memFn(&Session::GotInboundLIM, this); + } + + Session::~Session() + { + } + + void + Session::Send_LL(const llarp_buffer_t& pkt) + { + LogDebug("send ", pkt.sz, " to ", m_RemoteAddr); + m_Parent->SendTo_LL(m_RemoteAddr, pkt); + m_LastTX = time_now_ms(); + } + + bool + Session::GotInboundLIM(const LinkIntroMessage * msg) + { + if(msg->rc.enckey != m_RemoteOnionKey) + return false; + m_State = State::Ready; + GotLIM = util::memFn(&Session::GotRenegLIM, this); + return true; + } + + bool + Session::GotOutboundLIM(const LinkIntroMessage * msg) + { + if(msg->rc.pubkey != m_RemoteRC.pubkey) + return false; + m_State = State::LinkIntro; + GotLIM = util::memFn(&Session::GotRenegLIM, this); + SendOurLIM(); + return true; + } + + void + Session::SendOurLIM() + { + LinkIntroMessage msg; + msg.rc = m_Parent->GetOurRC(); + msg.N.Randomize(); + msg.P = 60000; + if(not msg.Sign(m_Parent->Sign)) + { + LogError("failed to sign our RC for ", m_RemoteAddr); + return; + } + AlignedBuffer data; + llarp_buffer_t buf{data}; + if(not msg.BEncode(&buf)) + { + LogError("failed to encode LIM for ", m_RemoteAddr); + } + buf.sz = buf.cur - buf.base; + buf.cur = buf.base; + if(!SendMessageBuffer(buf, nullptr)) + { + LogError("failed to send LIM to ", m_RemoteAddr); + } + LogDebug("sent LIM to ", m_RemoteAddr); + } + + void + Session::EncryptAndSend(const llarp_buffer_t& data) + { + + std::vector< byte_t > pkt; + pkt.resize(data.sz + PacketOverhead); + CryptoManager::instance()->randbytes(pkt.data(), pkt.size()); + llarp_buffer_t pktbuf{pkt}; + pktbuf.base += PacketOverhead; + pktbuf.sz -= PacketOverhead; + byte_t* nonce_ptr = pkt.data() + HMACSIZE; + + + CryptoManager::instance()->xchacha20_alt(pktbuf, data, m_SessionKey, + nonce_ptr); + + pktbuf.base = nonce_ptr; + pktbuf.sz = data.sz + 32; + CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey); + + pktbuf.base = pkt.data(); + pktbuf.sz = pkt.size(); + Send_LL(pktbuf); + } + + void + Session::Close() + { + if(m_State == State::Closed) + return; + const std::vector close_msg = {LLARP_PROTO_VERSION, Command::eCLOS}; + const llarp_buffer_t buf{close_msg}; + EncryptAndSend(buf); + m_State = State::Closed; + } + + bool + Session::SendMessageBuffer(const llarp_buffer_t& buf, + ILinkSession::CompletionHandler completed) + { + const auto msgid = m_TXID++; + auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, buf, completed}) + .first->second; + auto xmit = msg.XMIT(); + const llarp_buffer_t pkt{xmit}; + EncryptAndSend(pkt); + msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), m_Parent->Now()); + LogDebug("send message ", msgid); + return true; + } + + void + Session::Pump() + { + static constexpr llarp_time_t IntroInterval = 500; + const auto now = m_Parent->Now(); + if(m_State == State::Introduction) + { + if(not m_Inbound) + { + // resend intro + if(now - m_LastTX >= IntroInterval) + { + GenerateAndSendIntro(); + } + } + } + else if(m_State == State::Ready || m_State == State::LinkIntro) + { + for(auto itr = m_RXMsgs.begin(); itr != m_RXMsgs.end(); ) + { + if(itr->second.ShouldSendACKS(now)) + { + itr->second.SendACKS(util::memFn(&Session::EncryptAndSend, this), now); + } + if(itr->second.IsCompleted()) + { + if(itr->second.Verify()) + { + const llarp_buffer_t buf{itr->second.m_Data.data(), itr->second.m_Size}; + LogDebug("got message ", itr->first); + m_Parent->HandleMessage(this, buf); + } + else + { + LogError("hash missmatch for message ", itr->first); + } + itr = m_RXMsgs.erase(itr); + continue; + } + ++itr; + } + for(auto itr = m_TXMsgs.begin(); itr != m_TXMsgs.end(); ) + { + if(itr->second.ShouldFlush(now)) + itr->second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); + if(itr->second.IsTransmitted()) + { + LogDebug("sent message ", itr->first); + itr->second.Completed(); + itr = m_TXMsgs.erase(itr); + continue; + } + ++itr; + } + } + } + + bool + Session::GotRenegLIM(const LinkIntroMessage * lim) + { + return m_Parent->SessionRenegotiate(lim->rc, m_RemoteRC); + } + + bool + Session::RenegotiateSession() + { + SendOurLIM(); + return true; + } + + bool + Session::ShouldPing() const + { + static constexpr llarp_time_t PingInterval = 1000; + const auto now = m_Parent->Now(); + return now - m_LastTX > PingInterval; + } + + util::StatusObject + Session::ExtractStatus() const + { + return { + {"remoteAddr", m_RemoteAddr.ToString()}, + {"remoteRC", m_RemoteRC.ExtractStatus()} + }; + } + + bool + Session::TimedOut(llarp_time_t now) const + { + static constexpr llarp_time_t SessionAliveTimeout = 5000; + if(m_State != State::Ready) + return now - m_CreatedAt > SessionAliveTimeout; + return now - m_LastRX > SessionAliveTimeout; + } + + void + Session::Tick(llarp_time_t) + { + } + + using Introduction = AlignedBuffer<64>; + + void + Session::GenerateAndSendIntro() + { + Introduction intro; + + TunnelNonce N; + N.Randomize(); + if(not CryptoManager::instance()->transport_dh_client(m_SessionKey, m_ChosenAI.pubkey, m_Parent->RouterEncryptionSecret(), N)) + { + LogError("failed to transport_dh_client on outbound session to ", m_RemoteAddr); + return; + } + const auto pk = m_Parent->RouterEncryptionSecret().toPublic(); + std::copy_n(pk.begin(), pk.size(), intro.begin()); + std::copy(N.begin(), N.end(), intro.begin() + 32); + LogDebug("pk=", pk.ToHex(), " N=", N.ToHex(), " remote-pk=", m_ChosenAI.pubkey.ToHex()); + std::vector req; + req.resize(intro.size() + (randint() % 64)); + CryptoManager::instance()->randbytes(req.data(), req.size()); + std::copy_n(intro.begin(), intro.size(), req.begin()); + const llarp_buffer_t buf{req}; + Send_LL(buf); + m_State = State::Introduction; + } + + void + Session::HandleCreateSessionRequest(const llarp_buffer_t & buf) + { + std::vector result; + if(not DecryptMessage(buf, result)) + { + LogError("failed to decrypt session request from ", m_RemoteAddr); + return; + } + if(result.size() < token.size()) + { + LogError("bad session request size, ", result.size(), " < ", token.size(), " from ", m_RemoteAddr); + return; + } + if(not std::equal(result.begin(), result.begin() + token.size(), token.begin())) + { + LogError("token missmatch from ", m_RemoteAddr); + return; + } + SendOurLIM(); + m_State = State::LinkIntro; + } + + void + Session::HandleGotIntro(const llarp_buffer_t & buf) + { + if(buf.sz < Introduction::SIZE) + return; + TunnelNonce N; + std::copy_n(buf.base, PubKey::SIZE, m_RemoteOnionKey.begin()); + std::copy_n(buf.base + PubKey::SIZE, TunnelNonce::SIZE, N.begin()); + const PubKey pk = m_Parent->TransportSecretKey().toPublic(); + LogDebug("remote-pk=", m_RemoteOnionKey.ToHex(), " N=", N.ToHex(), " local-pk=", pk.ToHex()); + if(not CryptoManager::instance()->transport_dh_server(m_SessionKey, m_RemoteOnionKey, m_Parent->TransportSecretKey(), N)) + { + LogError("failed to transport_dh_server on inbound intro from ", m_RemoteAddr); + return; + } + std::vector reply; + reply.resize(token.size() + (randint() % 32)); + CryptoManager::instance()->randbytes(reply.data(), reply.size()); + std::copy_n(token.begin(), token.size(), reply.begin()); + const llarp_buffer_t pkt{reply}; + m_LastRX = m_Parent->Now(); + EncryptAndSend(pkt); + m_State = State::Introduction; + } + + void + Session::HandleGotIntroAck(const llarp_buffer_t & buf) + { + std::vector reply; + if(not DecryptMessage(buf, reply)) + { + LogError("intro ack decrypt failed from ", m_RemoteAddr); + return; + } + if(reply.size() < token.size()) + { + LogError("bad intro ack size ", reply.size(), " < ", token.size(), " from ", m_RemoteAddr); + return; + } + m_LastRX = m_Parent->Now(); + std::copy_n(reply.begin(), token.size(), token.begin()); + const llarp_buffer_t pkt{token}; + EncryptAndSend(pkt); + m_State = State::LinkIntro; + } + + bool + Session::DecryptMessage(const llarp_buffer_t & buf, std::vector & result) + { + if(buf.sz <= PacketOverhead) + return false; + ShortHash H; + llarp_buffer_t curbuf{buf.base, buf.sz}; + curbuf.base += ShortHash::SIZE; + curbuf.sz -= ShortHash::SIZE; + if(not CryptoManager::instance()->hmac(H.data(), curbuf, m_SessionKey)) + { + LogError("failed to caclulate keyed hash for ", m_RemoteAddr); + return false; + } + const ShortHash expected{buf.base}; + if(H != expected) + { + LogError("keyed hash missmatch ", H, " != ", expected, " from ", m_RemoteAddr); + return false; + } + const byte_t * nonce_ptr = curbuf.base; + curbuf.base += 32; + curbuf.sz -= 32; + result.resize(buf.sz - PacketOverhead); + const llarp_buffer_t outbuf{result}; + LogDebug("decrypt: ", result.size(), " bytes from ", m_RemoteAddr); + return CryptoManager::instance()->xchacha20_alt(outbuf, curbuf, m_SessionKey, nonce_ptr); + } + + void + Session::Start() + { + if(m_Inbound) + return; + GenerateAndSendIntro(); + } + + void + Session::HandleSessionData(const llarp_buffer_t & buf) + { + std::vector result; + if(not DecryptMessage(buf, result)) + { + LogError("failed to decrypt session data from ", m_RemoteAddr); + return; + } + if(result[0] != LLARP_PROTO_VERSION) + { + LogError("protocol version missmatch ", int(result[0]), " != ", LLARP_PROTO_VERSION); + return; + } + LogDebug("command ", int(result[1]), " from ", m_RemoteAddr); + switch(result[1]) + { + case Command::eXMIT: + HandleXMIT(std::move(result)); + break; + case Command::eDATA: + HandleDATA(std::move(result)); + break; + case Command::eACKS: + HandleACKS(std::move(result)); + break; + case Command::ePING: + HandlePING(std::move(result)); + break; + case Command::eCLOS: + HandleCLOS(std::move(result)); + break; + default: + LogError("invalid command ", int(result[1])); + } + } + + void + Session::HandleXMIT(std::vector data) + { + if(data.size() < 44) + { + LogError("short XMIT from ", m_RemoteAddr, " ", data.size(), " < 44"); + return; + } + uint16_t sz = bufbe16toh(data.data() + 2); + uint64_t rxid = bufbe64toh(data.data() + 4); + ShortHash h{data.data() + 12}; + LogDebug("rxid=", rxid, " sz=", sz, " h=", h.ToHex()); + m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, std::move(h)}); + m_LastRX = m_Parent->Now(); + } + + void + Session::HandleDATA(std::vector data) + { + if(data.size() < FragmentSize + 12) + { + LogError("short DATA from ", m_RemoteAddr, " ", data.size(), " < ", FragmentSize + 8); + return; + } + uint16_t sz = bufbe16toh(data.data() + 2); + uint64_t rxid = bufbe64toh(data.data() + 4); + auto itr = m_RXMsgs.find(rxid); + if(itr == m_RXMsgs.end()) + { + LogWarn("no rxid=", rxid, " for ", m_RemoteAddr); + return; + } + itr->second.HandleData(sz, data.data() + 12); + m_LastRX = m_Parent->Now(); + LogDebug(itr->first, " completed=", itr->second.IsCompleted()); + } + + void + Session::HandleACKS(std::vector data) + { + if(data.size() < 11) + { + LogError("short ACKS from ", m_RemoteAddr, " ", data.size(), " < 11"); + return; + } + uint64_t txid = bufbe64toh(data.data() + 2); + auto itr = m_TXMsgs.find(txid); + if(itr == m_TXMsgs.end()) + { + LogWarn("no txid=", txid, " for ", m_RemoteAddr); + return; + } + itr->second.Ack(data[10]); + m_LastRX = m_Parent->Now(); + } + + void + Session::HandleCLOS(std::vector) + { + Close(); + } + + void + Session::HandlePING(std::vector) + { + m_LastRX = m_Parent->Now(); + } + + bool + Session::SendKeepAlive() + { + // TODO: Implement me + return false; + } + + bool + Session::IsEstablished() const + { + return m_State == State::Ready; + } + + void + Session::Recv_LL(const llarp_buffer_t& buf) + { + switch(m_State) + { + case State::Initial: + if(m_Inbound) + { + // initial data + // enter introduction phase + HandleGotIntro(buf); + } + else + { + // this case should never happen + ::abort(); + } + break; + case State::Introduction: + if(m_Inbound) + { + // we are replying to an intro ack + HandleCreateSessionRequest(buf); + } + else + { + // we got an intro ack + // send a session request + HandleGotIntroAck(buf); + } + break; + case State::LinkIntro: + default: + HandleSessionData(buf); + break; + } + } + } // namespace iwp +} // namespace llarp \ No newline at end of file diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp new file mode 100644 index 000000000..423348bc0 --- /dev/null +++ b/llarp/iwp/session.hpp @@ -0,0 +1,189 @@ +#ifndef LLARP_IWP_SESSION_HPP +#define LLARP_IWP_SESSION_HPP + +#include +#include +#include + +namespace llarp +{ + namespace iwp + { + struct Session : public ILinkSession, + public std::enable_shared_from_this< Session > + { + /// outbound session + Session(LinkLayer* parent, RouterContact rc, AddressInfo ai); + /// inbound session + Session(LinkLayer* parent, Addr from); + + ~Session(); + + void + Pump() override; + + void + Tick(llarp_time_t now) override; + + bool + SendMessageBuffer(const llarp_buffer_t& buf, + CompletionHandler resultHandler) override; + + void + Send_LL(const llarp_buffer_t& pkt); + + void + EncryptAndSend(const llarp_buffer_t& data); + + void + Start() override; + + void + Close() override; + + void + Recv_LL(const llarp_buffer_t& pkt) override; + + bool + SendKeepAlive() override; + + bool + IsEstablished() const override; + + bool + TimedOut(llarp_time_t now) const override; + + PubKey + GetPubKey() const override + { + return m_RemoteRC.pubkey; + } + + Addr + GetRemoteEndpoint() const override + { + return m_RemoteAddr; + } + + RouterContact + GetRemoteRC() const override + { + return m_RemoteRC; + } + + size_t + SendQueueBacklog() const override + { + return m_TXMsgs.size(); + } + + ILinkLayer* + GetLinkLayer() const override + { + return m_Parent; + } + + bool + RenegotiateSession() override; + + bool + ShouldPing() const override; + + util::StatusObject + ExtractStatus() const override; + + private: + enum class State + { + /// we have no data recv'd + Initial, + /// we are in introduction/intro ack phase + Introduction, + /// we sent our LIM + LinkIntro, + /// handshake done and LIM has been obtained + Ready, + /// we are closed now + Closed + }; + State m_State; + /// are we inbound session ? + const bool m_Inbound; + /// parent link layer + LinkLayer* const m_Parent; + const llarp_time_t m_CreatedAt; + const Addr m_RemoteAddr; + + AddressInfo m_ChosenAI; + /// remote rc + RouterContact m_RemoteRC; + /// session key + SharedSecret m_SessionKey; + /// session token + AlignedBuffer<16> token; + + PubKey m_RemoteOnionKey; + + llarp_time_t m_LastTX = 0; + llarp_time_t m_LastRX = 0; + + uint64_t m_TXID = 0; + + std::unordered_map< uint64_t, InboundMessage > m_RXMsgs; + std::unordered_map< uint64_t, OutboundMessage > m_TXMsgs; + + void + HandleGotIntro(const llarp_buffer_t& buf); + + void + HandleGotIntroAck(const llarp_buffer_t& buf); + + void + HandleCreateSessionRequest(const llarp_buffer_t& buf); + + void + ProcessSessionRequest(const llarp_buffer_t& buf); + + void + ProcessCreateSessionReply(const llarp_buffer_t& buf); + + void + HandleSessionData(const llarp_buffer_t& buf); + + bool + DecryptMessage(const llarp_buffer_t & buf, std::vector & result); + + void + GenerateAndSendIntro(); + + bool + GotInboundLIM(const LinkIntroMessage * msg); + + bool + GotOutboundLIM(const LinkIntroMessage * msg); + + bool + GotRenegLIM(const LinkIntroMessage * msg); + + void + SendOurLIM(); + + void + HandleXMIT(std::vector msg); + + void + HandleDATA(std::vector msg); + + void + HandleACKS(std::vector msg); + + void + HandlePING(std::vector msg); + + void + HandleCLOS(std::vector msg); + }; + } // namespace iwp +} // namespace llarp + +#endif \ No newline at end of file diff --git a/llarp/link/factory.cpp b/llarp/link/factory.cpp index ba71f9a1d..734e7b1ef 100644 --- a/llarp/link/factory.cpp +++ b/llarp/link/factory.cpp @@ -1,6 +1,6 @@ #include +#include #include -#include namespace llarp { @@ -41,10 +41,10 @@ namespace llarp if(permitInbound) return llarp::utp::NewInboundLink; return llarp::utp::NewOutboundLink; - case LinkType::eLinkMempipe: + case LinkType::eLinkIWP: if(permitInbound) - return llarp::mempipe::NewInboundLink; - return llarp::mempipe::NewOutboundLink; + return llarp::iwp::NewInboundLink; + return llarp::iwp::NewOutboundLink; default: return nullptr; } diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 39dc1ae96..73a6efe3e 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -27,7 +27,7 @@ namespace llarp /// hook for utp for when we have established a connection virtual void - OnLinkEstablished(ILinkLayer *p) = 0; + OnLinkEstablished(ILinkLayer *){}; /// called every event loop tick virtual void @@ -50,6 +50,13 @@ namespace llarp virtual void Close() = 0; + /// recv packet on low layer + /// not used by utp + virtual void + Recv_LL(const llarp_buffer_t &) + { + } + /// send a keepalive to the remote endpoint virtual bool SendKeepAlive() = 0; diff --git a/llarp/mempipe/mempipe.cpp b/llarp/mempipe/mempipe.cpp deleted file mode 100644 index 3f0042de4..000000000 --- a/llarp/mempipe/mempipe.cpp +++ /dev/null @@ -1,613 +0,0 @@ -#include -#include -#include -#include -#include - -namespace llarp -{ - namespace mempipe - { - struct MemLink; - struct MemSession; - - struct MempipeContext - { - using Nodes_t = - std::unordered_map< RouterID, LinkLayer_ptr, RouterID::Hash >; - Nodes_t _nodes; - using SendEvent = std::tuple< RouterID, RouterID, std::vector< byte_t >, - ILinkSession::CompletionHandler >; - - /// (src, dst, session, hook) - std::vector< SendEvent > _sendQueue; - using NodeConnection_t = std::tuple< RouterID, RouterID >; - - struct NodeConnectionHash - { - size_t - operator()(const NodeConnection_t con) const - { - const auto& a = std::get< 0 >(con); - const auto& b = std::get< 1 >(con); - auto op = std::bit_xor< size_t >(); - return std::accumulate(a.begin(), a.end(), - std::accumulate(b.begin(), b.end(), 0, op), - op); - } - }; - - using NodeConnections_t = - std::unordered_map< NodeConnection_t, std::shared_ptr< MemSession >, - NodeConnectionHash >; - - NodeConnections_t _connections; - - mutable util::Mutex _access; - - void - AddNode(LinkLayer_ptr ptr) LOCKS_EXCLUDED(_access); - - void - RemoveNode(LinkLayer_ptr ptr) LOCKS_EXCLUDED(_access); - - LinkLayer_ptr - FindNode(const RouterID pk) LOCKS_EXCLUDED(_access); - - /// connect src to dst - void - ConnectNode(const RouterID src, const RouterID dst, - const std::shared_ptr< MemSession >& ptr) - LOCKS_EXCLUDED(_access); - - /// remote both src and dst as connected - void - DisconnectNode(const RouterID src, const RouterID dst) - LOCKS_EXCLUDED(_access); - - bool - HasConnection(const RouterID src, const RouterID dst) const - LOCKS_EXCLUDED(_access); - - void - InboundConnection(const RouterID to, - const std::shared_ptr< MemSession >& obsession); - - void - CallLater(std::function< void(void) > f) - { - if(m_Logic && f) - m_Logic->queue_func(f); - else if(f) - LogError("dropping call"); - } - - bool - SendTo(const RouterID src, const RouterID dst, - const std::vector< byte_t > msg, - ILinkSession::CompletionHandler delivery) LOCKS_EXCLUDED(_access); - - void - Pump() LOCKS_EXCLUDED(_access); - - void - Start(llarp_ev_loop_ptr loop) - { - evloop = loop; - m_Run.store(true); - std::promise< void > p; - m_Thread = std::make_unique< std::thread >([&]() { - LogDebug("mempipe started"); - m_Logic = std::make_shared< Logic >(); - p.set_value(); - while(m_Run.load()) - { - m_Logic->tick(time_now_ms()); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - Pump(); - } - m_Logic->stop(); - }); - p.get_future().wait(); - LogDebug("mempipe up"); - } - - ~MempipeContext() - { - m_Run.store(false); - if(m_Thread) - m_Thread->join(); - } - - std::atomic< bool > m_Run; - std::shared_ptr< Logic > m_Logic; - std::unique_ptr< std::thread > m_Thread = nullptr; - llarp_ev_loop_ptr evloop = nullptr; - }; - - using Globals_ptr = std::unique_ptr< MempipeContext >; - - Globals_ptr _globals; - - struct MemSession : public ILinkSession, - public llarp_ev_pkt_pipe, - public std::enable_shared_from_this< MemSession > - { - MemSession(llarp_ev_loop_ptr ev, LinkLayer_ptr _local, - LinkLayer_ptr _remote, bool inbound) - : llarp_ev_pkt_pipe(ev) - , remote{std::move(_remote)} - , parent{std::move(_local)} - , isInbound{inbound} - { - } - - LinkLayer_ptr remote; - LinkLayer_ptr parent; - const bool isInbound; - - util::Mutex _access; - - std::deque< std::vector< byte_t > > m_recvQueue; - std::deque< std::tuple< std::vector< byte_t >, CompletionHandler > > - m_sendQueue; - - llarp_time_t lastRecv = 0; - - PubKey - GetPubKey() const override - { - return remote->GetOurRC().pubkey; - } - - bool - SendKeepAlive() override - { - std::array< byte_t, 128 > pkt; - DiscardMessage msg; - llarp_buffer_t buf{pkt}; - if(!msg.BEncode(&buf)) - return false; - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - return SendMessageBuffer(buf, nullptr); - } - - void - OnRead(const llarp_buffer_t& pkt) override - { - std::vector< byte_t > buf; - buf.resize(pkt.sz); - std::copy_n(pkt.base, pkt.sz, buf.begin()); - Recv(std::move(buf)); - } - - void - Recv(const std::vector< byte_t > msg) LOCKS_EXCLUDED(_access) - { - util::Lock lock(&_access); - m_recvQueue.emplace_back(std::move(msg)); - lastRecv = parent->Now(); - } - - void - OnLinkEstablished(ILinkLayer*) override - { - return; - } - - bool - TimedOut(llarp_time_t now) const override - { - return now >= lastRecv && now - lastRecv > 5000; - } - - void - PumpWrite() LOCKS_EXCLUDED(_access) - { - std::deque< std::tuple< std::vector< byte_t >, CompletionHandler > > q; - { - util::Lock lock(&_access); - if(m_sendQueue.size()) - q = std::move(m_sendQueue); - } - const RouterID src = parent->GetOurRC().pubkey; - const RouterID dst = GetPubKey(); - while(q.size()) - { - const auto& f = q.front(); - _globals->SendTo(src, dst, std::get< 0 >(f), std::get< 1 >(f)); - q.pop_front(); - } - } - - void - PumpRead() LOCKS_EXCLUDED(_access) - { - std::deque< std::vector< byte_t > > q; - { - util::Lock lock(&_access); - if(m_recvQueue.size()) - q = std::move(m_recvQueue); - } - while(q.size()) - { - const llarp_buffer_t buf{q.front()}; - parent->HandleMessage(this, buf); - q.pop_front(); - } - } - - void Tick(llarp_time_t) override - { - Pump(); - } - - void - Pump() override - { - PumpRead(); - PumpWrite(); - } - - void - Close() override - { - auto self = shared_from_this(); - _globals->CallLater([=]() { self->Disconnected(); }); - } - - RouterContact - GetRemoteRC() const override - { - return remote->GetOurRC(); - } - - bool - ShouldPing() const override - { - return true; - } - - bool - SendMessageBuffer(const llarp_buffer_t& pkt, - ILinkSession::CompletionHandler completed) override - { - if(completed == nullptr) - completed = [](ILinkSession::DeliveryStatus) {}; - auto self = shared_from_this(); - std::vector< byte_t > buf(pkt.sz); - std::copy_n(pkt.base, pkt.sz, buf.begin()); - return _globals->SendTo(parent->GetOurRC().pubkey, GetRemoteRC().pubkey, - buf, [=](ILinkSession::DeliveryStatus status) { - self->parent->logic()->call_later( - 10, std::bind(completed, status)); - }); - } - - void - Start() override - { - if(!StartPipe()) - return; - if(isInbound) - return; - LogDebug("outbound start"); - auto self = shared_from_this(); - _globals->CallLater([=]() { - LogDebug("Called inbound connection"); - _globals->InboundConnection(self->GetPubKey(), self); - }); - } - - bool - IsEstablished() const override - { - return _globals->HasConnection(parent->GetOurRC().pubkey, GetPubKey()); - } - - void - Disconnected() - { - _globals->DisconnectNode(parent->GetOurRC().pubkey, GetPubKey()); - } - - bool - RenegotiateSession() override - { - return true; - } - - ILinkLayer* - GetLinkLayer() const override - { - return parent.get(); - } - - util::StatusObject - ExtractStatus() const override - { - return {}; - } - - llarp::Addr - GetRemoteEndpoint() const override - { - return {}; - } - - size_t - SendQueueBacklog() const override - { - return m_sendQueue.size(); - } - }; - - struct MemLink : public ILinkLayer, - public std::enable_shared_from_this< MemLink > - { - MemLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SignBufferFunc sign, - SessionEstablishedHandler est, SessionRenegotiateHandler reneg, - TimeoutHandler timeout, SessionClosedHandler closed, - bool permitInbound) - : ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout, - closed) - , allowInbound(permitInbound) - { - } - - const bool allowInbound; - - bool - KeyGen(SecretKey& k) override - { - k.Zero(); - return true; - } - - const char* - Name() const override - { - return "mempipe"; - } - - uint16_t - Rank() const override - { - return 100; - } - - void - Pump() override - { - LogDebug("memlink pump"); - std::set< RouterID > sessions; - { - Lock l(&m_AuthedLinksMutex); - auto itr = m_AuthedLinks.begin(); - while(itr != m_AuthedLinks.end()) - { - sessions.insert(itr->first); - ++itr; - } - } - ILinkLayer::Pump(); - { - Lock l(&m_AuthedLinksMutex); - for(const auto& pk : sessions) - { - if(m_AuthedLinks.count(pk) == 0) - { - // all sessions were removed - SessionClosed(pk); - } - } - } - } - - void - RecvFrom(const llarp::Addr&, const void*, size_t) override - { - } - - bool - Configure(llarp_ev_loop_ptr ev, const std::string&, int, - uint16_t) override - { - m_Loop = ev; - if(_globals == nullptr) - { - _globals = std::make_unique< MempipeContext >(); - _globals->Start(ev); - } - return _globals != nullptr; - } - - std::shared_ptr< ILinkSession > - NewOutboundSession(const RouterContact& rc, - const AddressInfo& ai) override - { - if(ai.dialect != Name()) - return nullptr; - auto remote = _globals->FindNode(rc.pubkey); - if(remote == nullptr) - return nullptr; - return std::make_shared< MemSession >(m_Loop, shared_from_this(), - remote, false); - } - - bool - Start(std::shared_ptr< Logic > l) override - { - if(!ILinkLayer::Start(l)) - return false; - _globals->AddNode(shared_from_this()); - return true; - } - - void - Stop() override - { - _globals->RemoveNode(shared_from_this()); - } - }; - - LinkLayer_ptr - NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SignBufferFunc sign, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, TimeoutHandler timeout, - SessionClosedHandler closed) - { - return std::make_shared< MemLink >(routerEncSecret, getrc, h, sign, est, - reneg, timeout, closed, false); - } - - LinkLayer_ptr - NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SignBufferFunc sign, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, TimeoutHandler timeout, - SessionClosedHandler closed) - { - return std::make_shared< MemLink >(routerEncSecret, getrc, h, sign, est, - reneg, timeout, closed, true); - } - - void - MempipeContext::AddNode(LinkLayer_ptr ptr) - { - util::Lock lock(&_access); - _nodes.emplace(RouterID(ptr->GetOurRC().pubkey), ptr); - LogInfo("add mempipe node: ", RouterID(ptr->GetOurRC().pubkey)); - } - - bool - MempipeContext::SendTo(const RouterID src, const RouterID dst, - const std::vector< byte_t > msg, - ILinkSession::CompletionHandler delivery) - { - util::Lock lock(&_access); - _sendQueue.emplace_back(std::move(src), std::move(dst), std::move(msg), - std::move(delivery)); - return true; - } - - void - MempipeContext::InboundConnection(const RouterID to, - const std::shared_ptr< MemSession >& ob) - { - LogDebug("inbound connect to ", to, " from ", - RouterID(ob->parent->GetOurRC().pubkey)); - std::shared_ptr< MemSession > other; - { - util::Lock lock(&_access); - auto itr = _nodes.find(to); - if(itr != _nodes.end()) - { - other = std::make_shared< MemSession >(evloop, itr->second, - ob->parent, true); - } - } - if(other) - { - ConnectNode(other->GetPubKey(), ob->GetPubKey(), other); - ConnectNode(ob->GetPubKey(), other->GetPubKey(), ob); - ob->parent->logic()->queue_func([ob]() { - ob->parent->MapAddr(RouterID{ob->GetPubKey()}, ob.get()); - ob->parent->SessionEstablished(ob.get()); - }); - other->parent->logic()->queue_func([other]() { - other->parent->MapAddr(RouterID{other->GetPubKey()}, other.get()); - other->parent->SessionEstablished(other.get()); - }); - } - else - { - ob->Disconnected(); - } - } - - void - MempipeContext::ConnectNode(const RouterID src, const RouterID dst, - const std::shared_ptr< MemSession >& session) - { - LogDebug("connect ", src, " to ", dst); - util::Lock lock(&_access); - _connections.emplace(std::make_pair(std::make_tuple(src, dst), session)); - } - - void - MempipeContext::DisconnectNode(const RouterID src, const RouterID dst) - { - LogDebug("connect ", src, " from ", dst); - util::Lock lock(&_access); - _connections.erase({src, dst}); - } - - LinkLayer_ptr - MempipeContext::FindNode(const RouterID rid) - { - util::Lock lock(&_access); - auto itr = _nodes.find(rid); - if(itr == _nodes.end()) - return nullptr; - return itr->second; - } - - bool - MempipeContext::HasConnection(const RouterID src, const RouterID dst) const - { - util::Lock lock(&_access); - return _connections.find({src, dst}) != _connections.end(); - } - - void - MempipeContext::RemoveNode(LinkLayer_ptr node) - { - util::Lock lock(&_access); - const RouterID pk = node->GetOurRC().pubkey; - _nodes.erase(pk); - auto itr = _connections.begin(); - while(itr != _connections.end()) - { - if(std::get< 0 >(itr->first) == pk || std::get< 1 >(itr->first) == pk) - { - auto s = itr->second->shared_from_this(); - itr->second->GetLinkLayer()->logic()->call_later( - 1, [s]() { s->Disconnected(); }); - } - ++itr; - } - } - - void - MempipeContext::Pump() - { - std::vector< SendEvent > q; - { - util::Lock lock(&_access); - q = std::move(_sendQueue); - } - for(auto& f : q) - { - ILinkSession::DeliveryStatus status = - ILinkSession::DeliveryStatus::eDeliveryDropped; - { - util::Lock lock(&_access); - auto itr = _connections.find({std::get< 0 >(f), std::get< 1 >(f)}); - if(itr != _connections.end()) - { - const llarp_buffer_t pkt{std::get< 2 >(f)}; - if(itr->second->Write(pkt)) - status = ILinkSession::DeliveryStatus::eDeliverySuccess; - } - } - LogDebug(std::get< 0 >(f), "->", std::get< 1 >(f), - " status=", (int)status); - CallLater(std::bind(std::get< 3 >(f), status)); - } - } - } // namespace mempipe -} // namespace llarp \ No newline at end of file diff --git a/llarp/mempipe/mempipe.hpp b/llarp/mempipe/mempipe.hpp deleted file mode 100644 index 91094602a..000000000 --- a/llarp/mempipe/mempipe.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef LLARP_MEMPIPE_MEMPIPE_HPP -#define LLARP_MEMPIPE_MEMPIPE_HPP -#include -#include - -namespace llarp -{ - namespace mempipe - { - LinkLayer_ptr - NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SignBufferFunc sign, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, TimeoutHandler timeout, - SessionClosedHandler closed); - LinkLayer_ptr - NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc, - LinkMessageHandler h, SignBufferFunc sign, - SessionEstablishedHandler est, - SessionRenegotiateHandler reneg, TimeoutHandler timeout, - SessionClosedHandler closed); - } // namespace mempipe -} // namespace llarp - -#endif \ No newline at end of file diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index d214a9cc1..5f03069fb 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -1,8 +1,8 @@ -#include +#include #include #include #include -#include +#include #include #include #include @@ -15,7 +15,7 @@ using namespace ::llarp; using namespace ::testing; -struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > +struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium > { static constexpr uint16_t AlicePort = 5000; static constexpr uint16_t BobPort = 6000; @@ -171,9 +171,9 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > } }; -TEST_F(LinkLayerTest, TestMemPipe) +TEST_F(LinkLayerTest, TestIWP) { - Alice.link = mempipe::NewInboundLink( + Alice.link = iwp::NewInboundLink( Alice.encryptionKey, [&]() -> const RouterContact& { return Alice.GetRC(); }, [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { @@ -221,7 +221,7 @@ TEST_F(LinkLayerTest, TestMemPipe) return s->SendMessageBuffer(otherBuf, nullptr); }; - Bob.link = mempipe::NewInboundLink( + Bob.link = iwp::NewInboundLink( Bob.encryptionKey, [&]() -> const RouterContact& { return Bob.GetRC(); }, [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { LinkIntroMessage msg; @@ -258,7 +258,6 @@ TEST_F(LinkLayerTest, TestMemPipe) RunMainloop(); ASSERT_TRUE(Bob.gotLIM); - ASSERT_TRUE(success); }; TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)