diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index 0a4de55fa..1f610c100 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -40,6 +40,8 @@ typedef struct sockaddr_un #include #endif +struct llarp_ev_pkt_pipe; + #ifndef MAX_WRITE_QUEUE_SIZE #define MAX_WRITE_QUEUE_SIZE (1024UL) #endif @@ -772,6 +774,9 @@ struct llarp_ev_loop virtual llarp::ev_io* bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0; + virtual bool + add_pipe(llarp_ev_pkt_pipe* p) = 0; + /// register event listener virtual bool add_ev(llarp::ev_io* ev, bool write) = 0; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 3b9ac4307..93a815892 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -427,6 +427,77 @@ namespace libuv } }; + struct pipe_glue : public glue + { + byte_t m_Buffer[1024 * 8]; + llarp_ev_pkt_pipe* const m_Pipe; + pipe_glue(uv_loop_t* loop, llarp_ev_pkt_pipe* pipe) : m_Pipe(pipe) + { + m_Handle.data = this; + m_Ticker.data = this; + uv_poll_init(loop, &m_Handle, m_Pipe->fd); + uv_check_init(loop, &m_Ticker); + } + + void + Tick() + { + m_Pipe->tick(); + } + + static void + OnRead(uv_poll_t* handle, int status, int) + { + if(status) + { + return; + } + pipe_glue* glue = static_cast< pipe_glue* >(handle->data); + int r = glue->m_Pipe->read(glue->m_Buffer, sizeof(glue->m_Buffer)); + if(r <= 0) + return; + const llarp_buffer_t buf{glue->m_Buffer, size_t{r}}; + glue->m_Pipe->OnRead(buf); + } + + static void + OnClosed(uv_handle_t* h) + { + auto* self = static_cast< pipe_glue* >(h->data); + if(self) + { + h->data = nullptr; + delete self; + } + } + + void + Close() override + { + uv_check_stop(&m_Ticker); + uv_close((uv_handle_t*)&m_Handle, &OnClosed); + } + + static void + OnTick(uv_check_t* h) + { + static_cast< pipe_glue* >(h->data)->Tick(); + } + + bool + Start() + { + if(uv_poll_start(&m_Handle, UV_READABLE, &OnRead)) + return false; + if(uv_check_start(&m_Ticker, &OnTick)) + return false; + return true; + } + + uv_poll_t m_Handle; + uv_check_t m_Ticker; + }; + struct tun_glue : public glue { uv_poll_t m_Handle; @@ -703,4 +774,14 @@ namespace libuv return false; } + bool + Loop::add_pipe(llarp_ev_pkt_pipe* p) + { + auto* glue = new pipe_glue(m_Impl.get(), p); + if(glue->Start()) + return true; + delete glue; + return false; + } + } // namespace libuv diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index b9b85d3df..3ad9a6389 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -1,6 +1,7 @@ #ifndef LLARP_EV_LIBUV_HPP #define LLARP_EV_LIBUV_HPP #include +#include #include #include #include @@ -68,6 +69,9 @@ namespace libuv bool tcp_listen(llarp_tcp_acceptor* tcp, const sockaddr* addr) override; + bool + add_pipe(llarp_ev_pkt_pipe* p) override; + llarp::ev_io* bind_tcp(llarp_tcp_acceptor*, const sockaddr*) override { diff --git a/llarp/ev/pipe.cpp b/llarp/ev/pipe.cpp index 76d141a9e..049e702ee 100644 --- a/llarp/ev/pipe.cpp +++ b/llarp/ev/pipe.cpp @@ -12,7 +12,7 @@ llarp_ev_pkt_pipe::llarp_ev_pkt_pipe(llarp_ev_loop_ptr loop) } bool -llarp_ev_pkt_pipe::Start() +llarp_ev_pkt_pipe::StartPipe() { #if defined(_WIN32) llarp::LogError("llarp_ev_pkt_pipe not supported on win32"); @@ -26,7 +26,7 @@ llarp_ev_pkt_pipe::Start() } fd = _fds[0]; writefd = _fds[1]; - return true; + return m_Loop->add_pipe(this); #endif } diff --git a/llarp/ev/pipe.hpp b/llarp/ev/pipe.hpp index 0bf1953fc..2abff3c48 100644 --- a/llarp/ev/pipe.hpp +++ b/llarp/ev/pipe.hpp @@ -10,7 +10,7 @@ struct llarp_ev_pkt_pipe : public llarp::ev_io /// start the pipe, initialize fds bool - Start(); + StartPipe(); /// write to the pipe from outside the event loop /// returns true on success diff --git a/llarp/mempipe/mempipe.cpp b/llarp/mempipe/mempipe.cpp index 58a0a3696..3f0042de4 100644 --- a/llarp/mempipe/mempipe.cpp +++ b/llarp/mempipe/mempipe.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace llarp { @@ -18,9 +19,8 @@ namespace llarp using SendEvent = std::tuple< RouterID, RouterID, std::vector< byte_t >, ILinkSession::CompletionHandler >; - std::deque< SendEvent > _sendQueue; - /// (src, dst, session, hook) + std::vector< SendEvent > _sendQueue; using NodeConnection_t = std::tuple< RouterID, RouterID >; struct NodeConnectionHash @@ -76,7 +76,10 @@ namespace llarp void CallLater(std::function< void(void) > f) { - m_Logic->call_later(10, f); + if(m_Logic && f) + m_Logic->queue_func(f); + else if(f) + LogError("dropping call"); } bool @@ -88,34 +91,38 @@ namespace llarp Pump() LOCKS_EXCLUDED(_access); void - Start() + Start(llarp_ev_loop_ptr loop) { + evloop = loop; m_Run.store(true); - m_Thread = new std::thread{[&]() { + std::promise< void > p; + m_Thread = std::make_unique< std::thread >([&]() { + LogDebug("mempipe started"); m_Logic = std::make_shared< Logic >(); + p.set_value(); while(m_Run.load()) { - Pump(); m_Logic->tick(time_now_ms()); std::this_thread::sleep_for(std::chrono::milliseconds(1)); + Pump(); } - m_Logic = nullptr; - }}; + m_Logic->stop(); + }); + p.get_future().wait(); + LogDebug("mempipe up"); } ~MempipeContext() { m_Run.store(false); if(m_Thread) - { m_Thread->join(); - delete m_Thread; - } } std::atomic< bool > m_Run; - std::shared_ptr< Logic > m_Logic = nullptr; - std::thread* m_Thread = nullptr; + std::shared_ptr< Logic > m_Logic; + std::unique_ptr< std::thread > m_Thread = nullptr; + llarp_ev_loop_ptr evloop = nullptr; }; using Globals_ptr = std::unique_ptr< MempipeContext >; @@ -123,15 +130,21 @@ namespace llarp Globals_ptr _globals; struct MemSession : public ILinkSession, + public llarp_ev_pkt_pipe, public std::enable_shared_from_this< MemSession > { - MemSession(LinkLayer_ptr _local, LinkLayer_ptr _remote) - : remote(std::move(_remote)), parent(std::move(_local)) + MemSession(llarp_ev_loop_ptr ev, LinkLayer_ptr _local, + LinkLayer_ptr _remote, bool inbound) + : llarp_ev_pkt_pipe(ev) + , remote{std::move(_remote)} + , parent{std::move(_local)} + , isInbound{inbound} { } LinkLayer_ptr remote; LinkLayer_ptr parent; + const bool isInbound; util::Mutex _access; @@ -160,6 +173,15 @@ namespace llarp return SendMessageBuffer(buf, nullptr); } + void + OnRead(const llarp_buffer_t& pkt) override + { + std::vector< byte_t > buf; + buf.resize(pkt.sz); + std::copy_n(pkt.base, pkt.sz, buf.begin()); + Recv(std::move(buf)); + } + void Recv(const std::vector< byte_t > msg) LOCKS_EXCLUDED(_access) { @@ -218,6 +240,7 @@ namespace llarp void Tick(llarp_time_t) override { + Pump(); } void @@ -265,9 +288,16 @@ namespace llarp void Start() override { + if(!StartPipe()) + return; + if(isInbound) + return; + LogDebug("outbound start"); auto self = shared_from_this(); - _globals->CallLater( - [=]() { _globals->InboundConnection(self->GetPubKey(), self); }); + _globals->CallLater([=]() { + LogDebug("Called inbound connection"); + _globals->InboundConnection(self->GetPubKey(), self); + }); } bool @@ -348,16 +378,49 @@ namespace llarp return 100; } + void + Pump() override + { + LogDebug("memlink pump"); + std::set< RouterID > sessions; + { + Lock l(&m_AuthedLinksMutex); + auto itr = m_AuthedLinks.begin(); + while(itr != m_AuthedLinks.end()) + { + sessions.insert(itr->first); + ++itr; + } + } + ILinkLayer::Pump(); + { + Lock l(&m_AuthedLinksMutex); + for(const auto& pk : sessions) + { + if(m_AuthedLinks.count(pk) == 0) + { + // all sessions were removed + SessionClosed(pk); + } + } + } + } + void RecvFrom(const llarp::Addr&, const void*, size_t) override { } bool - Configure(llarp_ev_loop_ptr, const std::string&, int, uint16_t) override + Configure(llarp_ev_loop_ptr ev, const std::string&, int, + uint16_t) override { + m_Loop = ev; if(_globals == nullptr) + { _globals = std::make_unique< MempipeContext >(); + _globals->Start(ev); + } return _globals != nullptr; } @@ -370,7 +433,8 @@ namespace llarp auto remote = _globals->FindNode(rc.pubkey); if(remote == nullptr) return nullptr; - return std::make_shared< MemSession >(shared_from_this(), remote); + return std::make_shared< MemSession >(m_Loop, shared_from_this(), + remote, false); } bool @@ -416,6 +480,7 @@ namespace llarp { util::Lock lock(&_access); _nodes.emplace(RouterID(ptr->GetOurRC().pubkey), ptr); + LogInfo("add mempipe node: ", RouterID(ptr->GetOurRC().pubkey)); } bool @@ -433,19 +498,30 @@ namespace llarp MempipeContext::InboundConnection(const RouterID to, const std::shared_ptr< MemSession >& ob) { + LogDebug("inbound connect to ", to, " from ", + RouterID(ob->parent->GetOurRC().pubkey)); std::shared_ptr< MemSession > other; { util::Lock lock(&_access); auto itr = _nodes.find(to); if(itr != _nodes.end()) { - other = std::make_shared< MemSession >(itr->second, ob->parent); + other = std::make_shared< MemSession >(evloop, itr->second, + ob->parent, true); } } if(other) { ConnectNode(other->GetPubKey(), ob->GetPubKey(), other); ConnectNode(ob->GetPubKey(), other->GetPubKey(), ob); + ob->parent->logic()->queue_func([ob]() { + ob->parent->MapAddr(RouterID{ob->GetPubKey()}, ob.get()); + ob->parent->SessionEstablished(ob.get()); + }); + other->parent->logic()->queue_func([other]() { + other->parent->MapAddr(RouterID{other->GetPubKey()}, other.get()); + other->parent->SessionEstablished(other.get()); + }); } else { @@ -457,6 +533,7 @@ namespace llarp MempipeContext::ConnectNode(const RouterID src, const RouterID dst, const std::shared_ptr< MemSession >& session) { + LogDebug("connect ", src, " to ", dst); util::Lock lock(&_access); _connections.emplace(std::make_pair(std::make_tuple(src, dst), session)); } @@ -464,6 +541,7 @@ namespace llarp void MempipeContext::DisconnectNode(const RouterID src, const RouterID dst) { + LogDebug("connect ", src, " from ", dst); util::Lock lock(&_access); _connections.erase({src, dst}); } @@ -507,27 +585,28 @@ namespace llarp void MempipeContext::Pump() { - std::deque< SendEvent > q; + std::vector< SendEvent > q; { util::Lock lock(&_access); q = std::move(_sendQueue); } - while(q.size()) + for(auto& f : q) { - const auto& f = q.front(); + ILinkSession::DeliveryStatus status = + ILinkSession::DeliveryStatus::eDeliveryDropped; { util::Lock lock(&_access); auto itr = _connections.find({std::get< 0 >(f), std::get< 1 >(f)}); - ILinkSession::DeliveryStatus status = - ILinkSession::DeliveryStatus::eDeliveryDropped; if(itr != _connections.end()) { - status = ILinkSession::DeliveryStatus::eDeliverySuccess; - itr->second->Recv(std::get< 2 >(f)); + const llarp_buffer_t pkt{std::get< 2 >(f)}; + if(itr->second->Write(pkt)) + status = ILinkSession::DeliveryStatus::eDeliverySuccess; } - CallLater(std::bind(std::get< 3 >(f), status)); } - q.pop_front(); + LogDebug(std::get< 0 >(f), "->", std::get< 1 >(f), + " status=", (int)status); + CallLater(std::bind(std::get< 3 >(f), status)); } } } // namespace mempipe diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp index 63aec45f2..d214a9cc1 100644 --- a/test/link/test_llarp_link.cpp +++ b/test/link/test_llarp_link.cpp @@ -2,10 +2,12 @@ #include #include #include +#include #include #include #include + #include #include @@ -118,6 +120,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > void SetUp() { + SetLogLevel(eLogDebug); oldRCLifetime = RouterContact::Lifetime; RouterContact::IgnoreBogons = true; RouterContact::Lifetime = 500; @@ -134,6 +137,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > netLoop.reset(); RouterContact::IgnoreBogons = false; RouterContact::Lifetime = oldRCLifetime; + SetLogLevel(eLogInfo); } static void @@ -167,6 +171,96 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto > } }; +TEST_F(LinkLayerTest, TestMemPipe) +{ + Alice.link = mempipe::NewInboundLink( + Alice.encryptionKey, + [&]() -> const RouterContact& { return Alice.GetRC(); }, + [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { + if(Alice.gotLIM) + { + Alice.Regen(); + return s->RenegotiateSession(); + } + else + { + LinkIntroMessage msg; + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) + return false; + if(!s->GotLIM(&msg)) + return false; + Alice.gotLIM = true; + return true; + } + }, + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Alice.signingKey, buf); + }, + [&](ILinkSession* s) -> bool { + const auto rc = s->GetRemoteRC(); + return rc.pubkey == Bob.GetRC().pubkey; + }, + [&](RouterContact, RouterContact) -> bool { return true; }, + + [&](ILinkSession* session) { + ASSERT_FALSE(session->IsEstablished()); + Stop(); + }, + [&](RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); }); + + auto sendDiscardMessage = [](ILinkSession* s) -> bool { + // send discard message in reply to complete unit test + std::array< byte_t, 32 > tmp; + llarp_buffer_t otherBuf(tmp); + DiscardMessage discard; + if(!discard.BEncode(&otherBuf)) + return false; + otherBuf.sz = otherBuf.cur - otherBuf.base; + otherBuf.cur = otherBuf.base; + return s->SendMessageBuffer(otherBuf, nullptr); + }; + + Bob.link = mempipe::NewInboundLink( + Bob.encryptionKey, [&]() -> const RouterContact& { return Bob.GetRC(); }, + [&](ILinkSession* s, const llarp_buffer_t& buf) -> bool { + LinkIntroMessage msg; + ManagedBuffer copy{buf}; + if(!msg.BDecode(©.underlying)) + return false; + if(!s->GotLIM(&msg)) + return false; + Bob.gotLIM = true; + return sendDiscardMessage(s); + }, + + [&](Signature& sig, const llarp_buffer_t& buf) -> bool { + return m_crypto.sign(sig, Bob.signingKey, buf); + }, + [&](ILinkSession* s) -> bool { + if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey) + return false; + LogInfo("bob established with alice"); + return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(), + sendDiscardMessage); + }, + [&](RouterContact newrc, RouterContact oldrc) -> bool { + success = newrc.pubkey == oldrc.pubkey; + return true; + }, + [&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); }, + [&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); }); + + ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort)); + ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort)); + + ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); + + RunMainloop(); + ASSERT_TRUE(Bob.gotLIM); + ASSERT_TRUE(success); +}; + TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob) { #ifdef WIN32