diff --git a/include/tuntap.h b/include/tuntap.h
index 8a56eb28e..6cc444a70 100644
--- a/include/tuntap.h
+++ b/include/tuntap.h
@@ -147,7 +147,7 @@ extern "C"
int flags; /* ifr.ifr_flags on Unix */
char if_name[IF_NAMESIZE];
#if defined(Windows)
- int idx; /* needed to set ipv6 address */
+ int idx; /* needed to set ipv6 address */
#endif
#if defined(FreeBSD)
int mode;
diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt
index 36510c0e1..a05435ba5 100644
--- a/llarp/CMakeLists.txt
+++ b/llarp/CMakeLists.txt
@@ -178,10 +178,12 @@ set(LIB_SRC
iwp/linklayer.cpp
iwp/outermessage.cpp
iwp/iwp.cpp
+ link/factory.cpp
link/i_link_manager.cpp
link/link_manager.cpp
link/server.cpp
link/session.cpp
+ mempipe/mempipe.cpp
messages/dht_immediate.cpp
messages/discard.cpp
messages/link_intro.cpp
diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp
index bf10ff91d..c68204354 100644
--- a/llarp/config/config.cpp
+++ b/llarp/config/config.cpp
@@ -33,6 +33,11 @@ namespace llarp
void
RouterConfig::fromSection(string_view key, string_view val)
{
+ if(key == "default-protocol")
+ {
+ m_DefaultLinkProto = tostr(val);
+ LogInfo("overriding default link protocol to '", val, "'");
+ }
if(key == "netid")
{
if(val.size() <= NetID::size())
@@ -194,9 +199,8 @@ namespace llarp
}
void
- IwpConfig::fromSection(string_view key, string_view val)
+ LinksConfig::fromSection(string_view key, string_view val)
{
- // try IPv4 first
uint16_t proto = 0;
std::set< std::string > parsed_opts;
@@ -215,7 +219,7 @@ namespace llarp
parsed_opts.insert(v);
}
} while(idx != std::string::npos);
-
+ std::set< std::string > opts;
/// for each option
for(const auto &item : parsed_opts)
{
@@ -229,15 +233,20 @@ namespace llarp
proto = port;
}
}
+ else
+ {
+ opts.insert(item);
+ }
}
if(key == "*")
{
- m_OutboundPort = proto;
+ m_OutboundLink = {"*", AF_INET, fromEnv(proto, "OUTBOUND_PORT"),
+ std::move(opts)};
}
else
{
- m_servers.emplace_back(tostr(key), AF_INET, proto);
+ m_InboundLinks.emplace_back(tostr(key), AF_INET, proto, std::move(opts));
}
}
@@ -434,7 +443,7 @@ namespace llarp
connect = find_section< ConnectConfig >(parser, "connect");
netdb = find_section< NetdbConfig >(parser, "netdb");
dns = find_section< DnsConfig >(parser, "dns");
- iwp_links = find_section< IwpConfig >(parser, "bind");
+ links = find_section< LinksConfig >(parser, "bind");
services = find_section< ServicesConfig >(parser, "services");
system = find_section< SystemConfig >(parser, "system");
metrics = find_section< MetricsConfig >(parser, "metrics");
diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp
index 6ed3ee297..916144cee 100644
--- a/llarp/config/config.hpp
+++ b/llarp/config/config.hpp
@@ -128,6 +128,8 @@ namespace llarp
int m_workerThreads = 1;
int m_numNetThreads = 1;
+ std::string m_DefaultLinkProto = "utp";
+
public:
// clang-format off
size_t minConnectedRouters() const { return fromEnv(m_minConnectedRouters, "MIN_CONNECTED_ROUTERS"); }
@@ -143,6 +145,7 @@ namespace llarp
const AddressInfo& addrInfo() const { return m_addrInfo; }
int workerThreads() const { return fromEnv(m_workerThreads, "WORKER_THREADS"); }
int numNetThreads() const { return fromEnv(m_numNetThreads, "NUM_NET_THREADS"); }
+ std::string defaultLinkProto() const { return fromEnv(m_DefaultLinkProto, "LINK_PROTO"); }
// clang-format on
void
@@ -194,20 +197,27 @@ namespace llarp
fromSection(string_view key, string_view val);
};
- class IwpConfig
+ class LinksConfig
{
public:
- using Servers = std::vector< std::tuple< std::string, int, uint16_t > >;
+ static constexpr int Interface = 0;
+ static constexpr int AddressFamily = 1;
+ static constexpr int Port = 2;
+ static constexpr int Options = 3;
- private:
- uint16_t m_OutboundPort = 0;
+ using ServerOptions = std::set< std::string >;
+ using LinkInfo = std::tuple< std::string, int, uint16_t, ServerOptions >;
+ using Links = std::vector< LinkInfo >;
- Servers m_servers;
+ private:
+ LinkInfo m_OutboundLink;
+ Links m_InboundLinks;
public:
// clang-format off
- uint16_t outboundPort() const { return fromEnv(m_OutboundPort, "OUTBOUND_PORT"); }
- const Servers& servers() const { return m_servers; }
+ const LinkInfo& outboundLink() const { return m_OutboundLink; }
+
+ const Links& inboundLinks() const { return m_InboundLinks; }
// clang-format on
void
@@ -306,7 +316,7 @@ namespace llarp
ConnectConfig connect;
NetdbConfig netdb;
DnsConfig dns;
- IwpConfig iwp_links;
+ LinksConfig links;
ServicesConfig services;
SystemConfig system;
MetricsConfig metrics;
diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp
index feb6221c6..224f0a1aa 100644
--- a/llarp/ev/ev_libuv.cpp
+++ b/llarp/ev/ev_libuv.cpp
@@ -633,16 +633,17 @@ namespace libuv
Loop::CloseAll()
{
llarp::LogInfo("Closing all handles");
- uv_walk(m_Impl.get(),
- [](uv_handle_t* h, void*) {
- if(uv_is_closing(h))
- return;
- if(h->data && uv_is_active(h))
- {
- static_cast< glue* >(h->data)->Close();
- }
- },
- nullptr);
+ uv_walk(
+ m_Impl.get(),
+ [](uv_handle_t* h, void*) {
+ if(uv_is_closing(h))
+ return;
+ if(h->data && uv_is_active(h))
+ {
+ static_cast< glue* >(h->data)->Close();
+ }
+ },
+ nullptr);
}
void
diff --git a/llarp/link/factory.cpp b/llarp/link/factory.cpp
new file mode 100644
index 000000000..ba71f9a1d
--- /dev/null
+++ b/llarp/link/factory.cpp
@@ -0,0 +1,52 @@
+#include
+#include
+#include
+
+namespace llarp
+{
+ LinkFactory::LinkType
+ LinkFactory::TypeFromName(string_view str)
+ {
+ if(str == "utp")
+ return LinkType::eLinkUTP;
+ if(str == "iwp")
+ return LinkType::eLinkIWP;
+ if(str == "mempipe")
+ return LinkType::eLinkMempipe;
+ return LinkType::eLinkUnknown;
+ }
+
+ std::string
+ LinkFactory::NameFromType(LinkFactory::LinkType tp)
+ {
+ switch(tp)
+ {
+ case LinkType::eLinkUTP:
+ return "utp";
+ case LinkType::eLinkIWP:
+ return "iwp";
+ case LinkType::eLinkMempipe:
+ return "mempipe";
+ default:
+ return "unspec";
+ }
+ }
+
+ LinkFactory::Factory
+ LinkFactory::Obtain(LinkFactory::LinkType tp, bool permitInbound)
+ {
+ switch(tp)
+ {
+ case LinkType::eLinkUTP:
+ if(permitInbound)
+ return llarp::utp::NewInboundLink;
+ return llarp::utp::NewOutboundLink;
+ case LinkType::eLinkMempipe:
+ if(permitInbound)
+ return llarp::mempipe::NewInboundLink;
+ return llarp::mempipe::NewOutboundLink;
+ default:
+ return nullptr;
+ }
+ }
+} // namespace llarp
\ No newline at end of file
diff --git a/llarp/link/factory.hpp b/llarp/link/factory.hpp
new file mode 100644
index 000000000..5acf3063e
--- /dev/null
+++ b/llarp/link/factory.hpp
@@ -0,0 +1,43 @@
+#ifndef LLARP_LINK_FACTORY_HPP
+#define LLARP_LINK_FACTORY_HPP
+#include
+#include
+
+#include
+
+namespace llarp
+{
+ /// LinkFactory is responsible for returning std::functions that create the
+ /// link layer types
+ struct LinkFactory
+ {
+ enum class LinkType
+ {
+ eLinkUTP,
+ eLinkIWP,
+ eLinkMempipe,
+ eLinkUnknown
+ };
+
+ using Factory = std::function< LinkLayer_ptr(
+ const SecretKey&, GetRCFunc, LinkMessageHandler, SignBufferFunc,
+ SessionEstablishedHandler, SessionRenegotiateHandler, TimeoutHandler,
+ SessionClosedHandler) >;
+
+ /// get link type by name string
+ /// if invalid returns eLinkUnspec
+ static LinkType
+ TypeFromName(string_view name);
+
+ /// turns a link type into a string representation
+ static std::string
+ NameFromType(LinkType t);
+
+ /// obtain a link factory of a certain type
+ static Factory
+ Obtain(LinkType t, bool permitInbound);
+ };
+
+} // namespace llarp
+
+#endif
\ No newline at end of file
diff --git a/llarp/link/server.hpp b/llarp/link/server.hpp
index 74f3cf3cf..3f12f455d 100644
--- a/llarp/link/server.hpp
+++ b/llarp/link/server.hpp
@@ -103,7 +103,7 @@ namespace llarp
llarp_ev_udp_sendto(&m_udp, to, pkt);
}
- bool
+ virtual bool
Configure(llarp_ev_loop_ptr loop, const std::string& ifname, int af,
uint16_t port);
@@ -125,7 +125,7 @@ namespace llarp
virtual bool
Start(std::shared_ptr< llarp::Logic > l);
- void
+ virtual void
Stop();
virtual const char*
@@ -140,11 +140,11 @@ namespace llarp
void
KeepAliveSessionTo(const RouterID& remote);
- bool
+ virtual bool
SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed);
- bool
+ virtual bool
GetOurAddressInfo(AddressInfo& addr) const;
bool
@@ -200,6 +200,12 @@ namespace llarp
SessionClosedHandler SessionClosed;
SessionRenegotiateHandler SessionRenegotiate;
+ std::shared_ptr< Logic >
+ logic()
+ {
+ return m_Logic;
+ }
+
bool
operator<(const ILinkLayer& other) const
{
diff --git a/llarp/mempipe/mempipe.cpp b/llarp/mempipe/mempipe.cpp
new file mode 100644
index 000000000..58a0a3696
--- /dev/null
+++ b/llarp/mempipe/mempipe.cpp
@@ -0,0 +1,534 @@
+#include
+#include
+#include
+#include
+
+namespace llarp
+{
+ namespace mempipe
+ {
+ struct MemLink;
+ struct MemSession;
+
+ struct MempipeContext
+ {
+ using Nodes_t =
+ std::unordered_map< RouterID, LinkLayer_ptr, RouterID::Hash >;
+ Nodes_t _nodes;
+ using SendEvent = std::tuple< RouterID, RouterID, std::vector< byte_t >,
+ ILinkSession::CompletionHandler >;
+
+ std::deque< SendEvent > _sendQueue;
+
+ /// (src, dst, session, hook)
+ using NodeConnection_t = std::tuple< RouterID, RouterID >;
+
+ struct NodeConnectionHash
+ {
+ size_t
+ operator()(const NodeConnection_t con) const
+ {
+ const auto& a = std::get< 0 >(con);
+ const auto& b = std::get< 1 >(con);
+ auto op = std::bit_xor< size_t >();
+ return std::accumulate(a.begin(), a.end(),
+ std::accumulate(b.begin(), b.end(), 0, op),
+ op);
+ }
+ };
+
+ using NodeConnections_t =
+ std::unordered_map< NodeConnection_t, std::shared_ptr< MemSession >,
+ NodeConnectionHash >;
+
+ NodeConnections_t _connections;
+
+ mutable util::Mutex _access;
+
+ void
+ AddNode(LinkLayer_ptr ptr) LOCKS_EXCLUDED(_access);
+
+ void
+ RemoveNode(LinkLayer_ptr ptr) LOCKS_EXCLUDED(_access);
+
+ LinkLayer_ptr
+ FindNode(const RouterID pk) LOCKS_EXCLUDED(_access);
+
+ /// connect src to dst
+ void
+ ConnectNode(const RouterID src, const RouterID dst,
+ const std::shared_ptr< MemSession >& ptr)
+ LOCKS_EXCLUDED(_access);
+
+ /// remote both src and dst as connected
+ void
+ DisconnectNode(const RouterID src, const RouterID dst)
+ LOCKS_EXCLUDED(_access);
+
+ bool
+ HasConnection(const RouterID src, const RouterID dst) const
+ LOCKS_EXCLUDED(_access);
+
+ void
+ InboundConnection(const RouterID to,
+ const std::shared_ptr< MemSession >& obsession);
+
+ void
+ CallLater(std::function< void(void) > f)
+ {
+ m_Logic->call_later(10, f);
+ }
+
+ bool
+ SendTo(const RouterID src, const RouterID dst,
+ const std::vector< byte_t > msg,
+ ILinkSession::CompletionHandler delivery) LOCKS_EXCLUDED(_access);
+
+ void
+ Pump() LOCKS_EXCLUDED(_access);
+
+ void
+ Start()
+ {
+ m_Run.store(true);
+ m_Thread = new std::thread{[&]() {
+ m_Logic = std::make_shared< Logic >();
+ while(m_Run.load())
+ {
+ Pump();
+ m_Logic->tick(time_now_ms());
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ m_Logic = nullptr;
+ }};
+ }
+
+ ~MempipeContext()
+ {
+ m_Run.store(false);
+ if(m_Thread)
+ {
+ m_Thread->join();
+ delete m_Thread;
+ }
+ }
+
+ std::atomic< bool > m_Run;
+ std::shared_ptr< Logic > m_Logic = nullptr;
+ std::thread* m_Thread = nullptr;
+ };
+
+ using Globals_ptr = std::unique_ptr< MempipeContext >;
+
+ Globals_ptr _globals;
+
+ struct MemSession : public ILinkSession,
+ public std::enable_shared_from_this< MemSession >
+ {
+ MemSession(LinkLayer_ptr _local, LinkLayer_ptr _remote)
+ : remote(std::move(_remote)), parent(std::move(_local))
+ {
+ }
+
+ LinkLayer_ptr remote;
+ LinkLayer_ptr parent;
+
+ util::Mutex _access;
+
+ std::deque< std::vector< byte_t > > m_recvQueue;
+ std::deque< std::tuple< std::vector< byte_t >, CompletionHandler > >
+ m_sendQueue;
+
+ llarp_time_t lastRecv = 0;
+
+ PubKey
+ GetPubKey() const override
+ {
+ return remote->GetOurRC().pubkey;
+ }
+
+ bool
+ SendKeepAlive() override
+ {
+ std::array< byte_t, 128 > pkt;
+ DiscardMessage msg;
+ llarp_buffer_t buf{pkt};
+ if(!msg.BEncode(&buf))
+ return false;
+ buf.sz = buf.cur - buf.base;
+ buf.cur = buf.base;
+ return SendMessageBuffer(buf, nullptr);
+ }
+
+ void
+ Recv(const std::vector< byte_t > msg) LOCKS_EXCLUDED(_access)
+ {
+ util::Lock lock(&_access);
+ m_recvQueue.emplace_back(std::move(msg));
+ lastRecv = parent->Now();
+ }
+
+ void
+ OnLinkEstablished(ILinkLayer*) override
+ {
+ return;
+ }
+
+ bool
+ TimedOut(llarp_time_t now) const override
+ {
+ return now >= lastRecv && now - lastRecv > 5000;
+ }
+
+ void
+ PumpWrite() LOCKS_EXCLUDED(_access)
+ {
+ std::deque< std::tuple< std::vector< byte_t >, CompletionHandler > > q;
+ {
+ util::Lock lock(&_access);
+ if(m_sendQueue.size())
+ q = std::move(m_sendQueue);
+ }
+ const RouterID src = parent->GetOurRC().pubkey;
+ const RouterID dst = GetPubKey();
+ while(q.size())
+ {
+ const auto& f = q.front();
+ _globals->SendTo(src, dst, std::get< 0 >(f), std::get< 1 >(f));
+ q.pop_front();
+ }
+ }
+
+ void
+ PumpRead() LOCKS_EXCLUDED(_access)
+ {
+ std::deque< std::vector< byte_t > > q;
+ {
+ util::Lock lock(&_access);
+ if(m_recvQueue.size())
+ q = std::move(m_recvQueue);
+ }
+ while(q.size())
+ {
+ const llarp_buffer_t buf{q.front()};
+ parent->HandleMessage(this, buf);
+ q.pop_front();
+ }
+ }
+
+ void Tick(llarp_time_t) override
+ {
+ }
+
+ void
+ Pump() override
+ {
+ PumpRead();
+ PumpWrite();
+ }
+
+ void
+ Close() override
+ {
+ auto self = shared_from_this();
+ _globals->CallLater([=]() { self->Disconnected(); });
+ }
+
+ RouterContact
+ GetRemoteRC() const override
+ {
+ return remote->GetOurRC();
+ }
+
+ bool
+ ShouldPing() const override
+ {
+ return true;
+ }
+
+ bool
+ SendMessageBuffer(const llarp_buffer_t& pkt,
+ ILinkSession::CompletionHandler completed) override
+ {
+ if(completed == nullptr)
+ completed = [](ILinkSession::DeliveryStatus) {};
+ auto self = shared_from_this();
+ std::vector< byte_t > buf(pkt.sz);
+ std::copy_n(pkt.base, pkt.sz, buf.begin());
+ return _globals->SendTo(parent->GetOurRC().pubkey, GetRemoteRC().pubkey,
+ buf, [=](ILinkSession::DeliveryStatus status) {
+ self->parent->logic()->call_later(
+ 10, std::bind(completed, status));
+ });
+ }
+
+ void
+ Start() override
+ {
+ auto self = shared_from_this();
+ _globals->CallLater(
+ [=]() { _globals->InboundConnection(self->GetPubKey(), self); });
+ }
+
+ bool
+ IsEstablished() const override
+ {
+ return _globals->HasConnection(parent->GetOurRC().pubkey, GetPubKey());
+ }
+
+ void
+ Disconnected()
+ {
+ _globals->DisconnectNode(parent->GetOurRC().pubkey, GetPubKey());
+ }
+
+ bool
+ RenegotiateSession() override
+ {
+ return true;
+ }
+
+ ILinkLayer*
+ GetLinkLayer() const override
+ {
+ return parent.get();
+ }
+
+ util::StatusObject
+ ExtractStatus() const override
+ {
+ return {};
+ }
+
+ llarp::Addr
+ GetRemoteEndpoint() const override
+ {
+ return {};
+ }
+
+ size_t
+ SendQueueBacklog() const override
+ {
+ return m_sendQueue.size();
+ }
+ };
+
+ struct MemLink : public ILinkLayer,
+ public std::enable_shared_from_this< MemLink >
+ {
+ MemLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est, SessionRenegotiateHandler reneg,
+ TimeoutHandler timeout, SessionClosedHandler closed,
+ bool permitInbound)
+ : ILinkLayer(routerEncSecret, getrc, h, sign, est, reneg, timeout,
+ closed)
+ , allowInbound(permitInbound)
+ {
+ }
+
+ const bool allowInbound;
+
+ bool
+ KeyGen(SecretKey& k) override
+ {
+ k.Zero();
+ return true;
+ }
+
+ const char*
+ Name() const override
+ {
+ return "mempipe";
+ }
+
+ uint16_t
+ Rank() const override
+ {
+ return 100;
+ }
+
+ void
+ RecvFrom(const llarp::Addr&, const void*, size_t) override
+ {
+ }
+
+ bool
+ Configure(llarp_ev_loop_ptr, const std::string&, int, uint16_t) override
+ {
+ if(_globals == nullptr)
+ _globals = std::make_unique< MempipeContext >();
+ return _globals != nullptr;
+ }
+
+ std::shared_ptr< ILinkSession >
+ NewOutboundSession(const RouterContact& rc,
+ const AddressInfo& ai) override
+ {
+ if(ai.dialect != Name())
+ return nullptr;
+ auto remote = _globals->FindNode(rc.pubkey);
+ if(remote == nullptr)
+ return nullptr;
+ return std::make_shared< MemSession >(shared_from_this(), remote);
+ }
+
+ bool
+ Start(std::shared_ptr< Logic > l) override
+ {
+ if(!ILinkLayer::Start(l))
+ return false;
+ _globals->AddNode(shared_from_this());
+ return true;
+ }
+
+ void
+ Stop() override
+ {
+ _globals->RemoveNode(shared_from_this());
+ }
+ };
+
+ LinkLayer_ptr
+ NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed)
+ {
+ return std::make_shared< MemLink >(routerEncSecret, getrc, h, sign, est,
+ reneg, timeout, closed, false);
+ }
+
+ LinkLayer_ptr
+ NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed)
+ {
+ return std::make_shared< MemLink >(routerEncSecret, getrc, h, sign, est,
+ reneg, timeout, closed, true);
+ }
+
+ void
+ MempipeContext::AddNode(LinkLayer_ptr ptr)
+ {
+ util::Lock lock(&_access);
+ _nodes.emplace(RouterID(ptr->GetOurRC().pubkey), ptr);
+ }
+
+ bool
+ MempipeContext::SendTo(const RouterID src, const RouterID dst,
+ const std::vector< byte_t > msg,
+ ILinkSession::CompletionHandler delivery)
+ {
+ util::Lock lock(&_access);
+ _sendQueue.emplace_back(std::move(src), std::move(dst), std::move(msg),
+ std::move(delivery));
+ return true;
+ }
+
+ void
+ MempipeContext::InboundConnection(const RouterID to,
+ const std::shared_ptr< MemSession >& ob)
+ {
+ std::shared_ptr< MemSession > other;
+ {
+ util::Lock lock(&_access);
+ auto itr = _nodes.find(to);
+ if(itr != _nodes.end())
+ {
+ other = std::make_shared< MemSession >(itr->second, ob->parent);
+ }
+ }
+ if(other)
+ {
+ ConnectNode(other->GetPubKey(), ob->GetPubKey(), other);
+ ConnectNode(ob->GetPubKey(), other->GetPubKey(), ob);
+ }
+ else
+ {
+ ob->Disconnected();
+ }
+ }
+
+ void
+ MempipeContext::ConnectNode(const RouterID src, const RouterID dst,
+ const std::shared_ptr< MemSession >& session)
+ {
+ util::Lock lock(&_access);
+ _connections.emplace(std::make_pair(std::make_tuple(src, dst), session));
+ }
+
+ void
+ MempipeContext::DisconnectNode(const RouterID src, const RouterID dst)
+ {
+ util::Lock lock(&_access);
+ _connections.erase({src, dst});
+ }
+
+ LinkLayer_ptr
+ MempipeContext::FindNode(const RouterID rid)
+ {
+ util::Lock lock(&_access);
+ auto itr = _nodes.find(rid);
+ if(itr == _nodes.end())
+ return nullptr;
+ return itr->second;
+ }
+
+ bool
+ MempipeContext::HasConnection(const RouterID src, const RouterID dst) const
+ {
+ util::Lock lock(&_access);
+ return _connections.find({src, dst}) != _connections.end();
+ }
+
+ void
+ MempipeContext::RemoveNode(LinkLayer_ptr node)
+ {
+ util::Lock lock(&_access);
+ const RouterID pk = node->GetOurRC().pubkey;
+ _nodes.erase(pk);
+ auto itr = _connections.begin();
+ while(itr != _connections.end())
+ {
+ if(std::get< 0 >(itr->first) == pk || std::get< 1 >(itr->first) == pk)
+ {
+ auto s = itr->second->shared_from_this();
+ itr->second->GetLinkLayer()->logic()->call_later(
+ 1, [s]() { s->Disconnected(); });
+ }
+ ++itr;
+ }
+ }
+
+ void
+ MempipeContext::Pump()
+ {
+ std::deque< SendEvent > q;
+ {
+ util::Lock lock(&_access);
+ q = std::move(_sendQueue);
+ }
+ while(q.size())
+ {
+ const auto& f = q.front();
+ {
+ util::Lock lock(&_access);
+ auto itr = _connections.find({std::get< 0 >(f), std::get< 1 >(f)});
+ ILinkSession::DeliveryStatus status =
+ ILinkSession::DeliveryStatus::eDeliveryDropped;
+ if(itr != _connections.end())
+ {
+ status = ILinkSession::DeliveryStatus::eDeliverySuccess;
+ itr->second->Recv(std::get< 2 >(f));
+ }
+ CallLater(std::bind(std::get< 3 >(f), status));
+ }
+ q.pop_front();
+ }
+ }
+ } // namespace mempipe
+} // namespace llarp
\ No newline at end of file
diff --git a/llarp/mempipe/mempipe.hpp b/llarp/mempipe/mempipe.hpp
new file mode 100644
index 000000000..91094602a
--- /dev/null
+++ b/llarp/mempipe/mempipe.hpp
@@ -0,0 +1,25 @@
+#ifndef LLARP_MEMPIPE_MEMPIPE_HPP
+#define LLARP_MEMPIPE_MEMPIPE_HPP
+#include
+#include
+
+namespace llarp
+{
+ namespace mempipe
+ {
+ LinkLayer_ptr
+ NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed);
+ LinkLayer_ptr
+ NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed);
+ } // namespace mempipe
+} // namespace llarp
+
+#endif
\ No newline at end of file
diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp
index 97f69d3fe..c3585cff1 100644
--- a/llarp/path/path.cpp
+++ b/llarp/path/path.cpp
@@ -103,8 +103,8 @@ namespace llarp
}
bool
- Path::HandleLRSM(uint64_t status,
- std::array< EncryptedFrame, 8 >& frames, AbstractRouter* r)
+ Path::HandleLRSM(uint64_t status, std::array< EncryptedFrame, 8 >& frames,
+ AbstractRouter* r)
{
uint64_t currentStatus = status;
diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp
index b2460f84b..84a19f575 100644
--- a/llarp/path/path_context.cpp
+++ b/llarp/path/path_context.cpp
@@ -164,14 +164,15 @@ namespace llarp
HopHandler_ptr
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{
- auto own = MapGet(m_OurPaths, id,
- [](const PathSet_ptr) -> bool {
- // TODO: is this right?
- return true;
- },
- [remote, id](PathSet_ptr p) -> HopHandler_ptr {
- return p->GetByUpstream(remote, id);
- });
+ auto own = MapGet(
+ m_OurPaths, id,
+ [](const PathSet_ptr) -> bool {
+ // TODO: is this right?
+ return true;
+ },
+ [remote, id](PathSet_ptr p) -> HopHandler_ptr {
+ return p->GetByUpstream(remote, id);
+ });
if(own)
return own;
diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp
index 686e44d1a..c0207304a 100644
--- a/llarp/router/router.cpp
+++ b/llarp/router/router.cpp
@@ -368,9 +368,17 @@ namespace llarp
// reset netid in our rc
_rc.netID = llarp::NetID();
}
+ const auto linktypename = conf->router.defaultLinkProto();
+ _defaultLinkType = LinkFactory::TypeFromName(linktypename);
+ if(_defaultLinkType == LinkFactory::LinkType::eLinkUnknown)
+ {
+ LogError("failed to set link type to '", linktypename,
+ "' as that is invalid");
+ return false;
+ }
// IWP config
- m_OutboundPort = conf->iwp_links.outboundPort();
+ m_OutboundPort = std::get< LinksConfig::Port >(conf->links.outboundLink());
// Router config
_rc.SetNick(conf->router.nickname());
maxConnectedRouters = conf->router.maxConnectedRouters();
@@ -391,7 +399,7 @@ namespace llarp
lokidRPCPassword = conf->lokid.lokidRPCPassword;
// TODO: add config flag for "is service node"
- if(conf->iwp_links.servers().size())
+ if(conf->links.inboundLinks().size())
{
m_isServiceNode = true;
}
@@ -479,15 +487,34 @@ namespace llarp
}
// create inbound links, if we are a service node
- for(const auto &serverConfig : conf->iwp_links.servers())
+ for(const auto &serverConfig : conf->links.inboundLinks())
{
- auto server = llarp::utp::NewInboundLink(
+ // get default factory
+ auto inboundLinkFactory = LinkFactory::Obtain(_defaultLinkType, true);
+ // for each option if provided ...
+ for(const auto &opt : std::get< LinksConfig::Options >(serverConfig))
+ {
+ // try interpreting it as a link type
+ const auto linktype = LinkFactory::TypeFromName(opt);
+ if(linktype != LinkFactory::LinkType::eLinkUnknown)
+ {
+ // override link factory if it's a valid link type
+ auto factory = LinkFactory::Obtain(linktype, true);
+ if(factory)
+ {
+ inboundLinkFactory = std::move(factory);
+ break;
+ }
+ }
+ }
+
+ auto server = inboundLinkFactory(
encryption(), util::memFn(&AbstractRouter::rc, this),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
+ util::memFn(&AbstractRouter::Sign, this),
util::memFn(&IOutboundSessionMaker::OnSessionEstablished,
&_outboundSessionMaker),
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
- util::memFn(&AbstractRouter::Sign, this),
util::memFn(&IOutboundSessionMaker::OnConnectTimeout,
&_outboundSessionMaker),
util::memFn(&AbstractRouter::SessionClosed, this));
@@ -498,9 +525,9 @@ namespace llarp
return false;
}
- const auto &key = std::get< 0 >(serverConfig);
- int af = std::get< 1 >(serverConfig);
- uint16_t port = std::get< 2 >(serverConfig);
+ const auto &key = std::get< LinksConfig::Interface >(serverConfig);
+ int af = std::get< LinksConfig::AddressFamily >(serverConfig);
+ uint16_t port = std::get< LinksConfig::Port >(serverConfig);
if(!server->Configure(netloop(), key, af, port))
{
LogError("failed to bind inbound link on ", key, " port ", port);
@@ -1059,48 +1086,44 @@ namespace llarp
bool
Router::InitOutboundLinks()
{
- using LinkFactory = std::function< LinkLayer_ptr(
- const SecretKey &, GetRCFunc, LinkMessageHandler,
- SessionEstablishedHandler, SessionRenegotiateHandler, SignBufferFunc,
- TimeoutHandler, SessionClosedHandler) >;
+ const auto linkTypeName = LinkFactory::NameFromType(_defaultLinkType);
+ LogInfo("initialize outbound link: ", linkTypeName);
+ auto factory = LinkFactory::Obtain(_defaultLinkType, false);
+ if(factory == nullptr)
+ {
+ LogError("cannot initialize outbound link of type '", linkTypeName,
+ "' as it has no implementation");
+ return false;
+ }
+ auto link =
+ factory(encryption(), util::memFn(&AbstractRouter::rc, this),
+ util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
+ util::memFn(&AbstractRouter::Sign, this),
+ util::memFn(&IOutboundSessionMaker::OnSessionEstablished,
+ &_outboundSessionMaker),
+ util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
+ util::memFn(&IOutboundSessionMaker::OnConnectTimeout,
+ &_outboundSessionMaker),
+ util::memFn(&AbstractRouter::SessionClosed, this));
+
+ if(!link)
+ return false;
+ if(!link->EnsureKeys(transport_keyfile.string().c_str()))
+ {
+ LogError("failed to load ", transport_keyfile);
+ return false;
+ }
- static std::list< LinkFactory > linkFactories = {utp::NewOutboundLink,
- iwp::NewServer};
+ const auto afs = {AF_INET, AF_INET6};
- bool addedAtLeastOne = false;
- for(const auto &factory : linkFactories)
+ for(const auto af : afs)
{
- auto link = factory(
- encryption(), util::memFn(&AbstractRouter::rc, this),
- util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, this),
- util::memFn(&IOutboundSessionMaker::OnSessionEstablished,
- &_outboundSessionMaker),
- util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
- util::memFn(&AbstractRouter::Sign, this),
- util::memFn(&IOutboundSessionMaker::OnConnectTimeout,
- &_outboundSessionMaker),
- util::memFn(&AbstractRouter::SessionClosed, this));
-
- if(!link)
+ if(!link->Configure(netloop(), "*", af, m_OutboundPort))
continue;
- if(!link->EnsureKeys(transport_keyfile.string().c_str()))
- {
- LogError("failed to load ", transport_keyfile);
- continue;
- }
-
- const auto afs = {AF_INET, AF_INET6};
-
- for(const auto af : afs)
- {
- if(!link->Configure(netloop(), "*", af, m_OutboundPort))
- continue;
- _linkManager.AddLink(std::move(link), false);
- addedAtLeastOne = true;
- break;
- }
+ _linkManager.AddLink(std::move(link), false);
+ return true;
}
- return addedAtLeastOne;
+ return false;
}
bool
diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp
index 2b6b2df86..e4f2fb4d0 100644
--- a/llarp/router/router.hpp
+++ b/llarp/router/router.hpp
@@ -28,6 +28,7 @@
#include
#include
#include
+#include
#include
#include
@@ -175,6 +176,8 @@ namespace llarp
struct sockaddr_in ip4addr;
AddressInfo addrInfo;
+ LinkFactory::LinkType _defaultLinkType;
+
llarp_ev_loop_ptr _netloop;
std::shared_ptr< llarp::thread::ThreadPool > cryptoworker;
std::shared_ptr< Logic > _logic;
diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp
index f00fb9e9d..f70194c50 100644
--- a/llarp/service/endpoint.cpp
+++ b/llarp/service/endpoint.cpp
@@ -510,7 +510,7 @@ namespace llarp
{
auto msg = std::make_shared< routing::DHTMessage >();
msg->M.emplace_back(
- std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 1));
+ std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 5));
return msg;
}
diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp
index a42d24aba..31dab271e 100644
--- a/llarp/service/protocol.cpp
+++ b/llarp/service/protocol.cpp
@@ -309,7 +309,8 @@ namespace llarp
if(self->frame.T != self->msg->tag)
{
- LogError("convotag missmatch: ", self->frame.T, " != ", self->msg->tag);
+ LogError("convotag missmatch: ", self->frame.T,
+ " != ", self->msg->tag);
self->msg.reset();
delete self;
return;
diff --git a/llarp/utp/utp.cpp b/llarp/utp/utp.cpp
index 01d47502d..911fdc309 100644
--- a/llarp/utp/utp.cpp
+++ b/llarp/utp/utp.cpp
@@ -10,9 +10,10 @@ namespace llarp
{
LinkLayer_ptr
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
- LinkMessageHandler h, SessionEstablishedHandler est,
- SessionRenegotiateHandler reneg, SignBufferFunc sign,
- TimeoutHandler timeout, SessionClosedHandler closed)
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed, false);
@@ -20,9 +21,10 @@ namespace llarp
LinkLayer_ptr
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
- LinkMessageHandler h, SessionEstablishedHandler est,
- SessionRenegotiateHandler reneg, SignBufferFunc sign,
- TimeoutHandler timeout, SessionClosedHandler closed)
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed, true);
diff --git a/llarp/utp/utp.hpp b/llarp/utp/utp.hpp
index 10857b796..d368065e2 100644
--- a/llarp/utp/utp.hpp
+++ b/llarp/utp/utp.hpp
@@ -6,20 +6,20 @@
namespace llarp
{
- struct AbstractRouter;
-
namespace utp
{
LinkLayer_ptr
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
- LinkMessageHandler h, SessionEstablishedHandler est,
- SessionRenegotiateHandler reneg, SignBufferFunc sign,
- TimeoutHandler timeout, SessionClosedHandler closed);
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed);
LinkLayer_ptr
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
- LinkMessageHandler h, SessionEstablishedHandler est,
- SessionRenegotiateHandler reneg, SignBufferFunc sign,
- TimeoutHandler timeout, SessionClosedHandler closed);
+ LinkMessageHandler h, SignBufferFunc sign,
+ SessionEstablishedHandler est,
+ SessionRenegotiateHandler reneg, TimeoutHandler timeout,
+ SessionClosedHandler closed);
/// shim
const auto NewServer = NewInboundLink;
} // namespace utp
diff --git a/test/config/test_llarp_config_config.cpp b/test/config/test_llarp_config_config.cpp
index 55eadeb3c..a31a7ed79 100644
--- a/test/config/test_llarp_config_config.cpp
+++ b/test/config/test_llarp_config_config.cpp
@@ -102,10 +102,10 @@ metric-tank-host=52.80.56.123:2003
ASSERT_FALSE(config.metrics.disableMetrics);
{
- using kv = IwpConfig::Servers::value_type;
+ using kv = LinksConfig::Links::value_type;
- ASSERT_THAT(config.iwp_links.servers(),
- UnorderedElementsAre(kv("eth0", AF_INET, 5501)));
+ ASSERT_THAT(config.links.inboundLinks(),
+ UnorderedElementsAre(kv("eth0", AF_INET, 5501, {})));
}
ASSERT_THAT(config.bootstrap.routers,
diff --git a/test/link/test_llarp_link.cpp b/test/link/test_llarp_link.cpp
index 3c83b5050..63aec45f2 100644
--- a/test/link/test_llarp_link.cpp
+++ b/test/link/test_llarp_link.cpp
@@ -193,14 +193,15 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
return true;
}
},
+ [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
+ return m_crypto.sign(sig, Alice.signingKey, buf);
+ },
[&](ILinkSession* s) -> bool {
const auto rc = s->GetRemoteRC();
return rc.pubkey == Bob.GetRC().pubkey;
},
[&](RouterContact, RouterContact) -> bool { return true; },
- [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
- return m_crypto.sign(sig, Alice.signingKey, buf);
- },
+
[&](ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
@@ -231,6 +232,10 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
Bob.gotLIM = true;
return sendDiscardMessage(s);
},
+
+ [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
+ return m_crypto.sign(sig, Bob.signingKey, buf);
+ },
[&](ILinkSession* s) -> bool {
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
@@ -242,9 +247,6 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
success = newrc.pubkey == oldrc.pubkey;
return true;
},
- [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
- return m_crypto.sign(sig, Bob.signingKey, buf);
- },
[&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); },
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
@@ -280,6 +282,9 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return false;
}
return AliceGotMessage(buf);
+ },
+ [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
+ return m_crypto.sign(sig, Alice.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
if(s->GetRemoteRC().pubkey != Bob.GetRC().pubkey)
@@ -288,9 +293,7 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return true;
},
[&](RouterContact, RouterContact) -> bool { return true; },
- [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
- return m_crypto.sign(sig, Alice.signingKey, buf);
- },
+
[&](ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
@@ -312,6 +315,9 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return false;
}
return true;
+ },
+ [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
+ return m_crypto.sign(sig, Bob.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
@@ -332,9 +338,6 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return true;
},
[&](RouterContact, RouterContact) -> bool { return true; },
- [&](Signature& sig, const llarp_buffer_t& buf) -> bool {
- return m_crypto.sign(sig, Bob.signingKey, buf);
- },
[&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); },
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });