Refactor Router code into more classes

This commit refactors functionality from the Router class into separate,
dedicated classes.
There are a few behavior changes that came as a result of discussion on
what the correct behavior should be.
In addition, many things Router was previously doing can now be provided
callback functions to alert the calling point when the asynchronous
action completes, successfully or otherwise.
pull/728/head
Thomas Winget 5 years ago
parent f154c9a0d0
commit baf8019fe5

@ -1,6 +1,11 @@
# Lowest version - android ndk 3.6.0
cmake_minimum_required(VERSION 3.6.0)
find_program(CCACHE_PROGRAM ccache)
if(CCACHE_PROGRAM)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
endif()
set(PROJECT_NAME lokinet)
project(${PROJECT_NAME} C CXX)
@ -45,11 +50,6 @@ add_definitions(-D${CMAKE_SYSTEM_NAME})
get_filename_component(CORE_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/include" ABSOLUTE)
get_filename_component(ABYSS_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/${ABYSS}/include" ABSOLUTE)
find_program(CCACHE_PROGRAM ccache)
if(CCACHE_PROGRAM)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
endif()
if(MSVC_VERSION)
enable_language(ASM_MASM)
list(APPEND CMAKE_ASM_MASM_SOURCE_FILE_EXTENSIONS s)

@ -173,6 +173,8 @@ set(LIB_SRC
iwp/linklayer.cpp
iwp/outermessage.cpp
iwp/iwp.cpp
link/i_link_manager.cpp
link/link_manager.cpp
link/server.cpp
link/session.cpp
messages/dht_immediate.cpp
@ -195,6 +197,12 @@ set(LIB_SRC
pow.cpp
profiling.cpp
router/abstractrouter.cpp
router/i_outbound_message_handler.cpp
router/outbound_message_handler.cpp
router/i_outbound_session_maker.cpp
router/outbound_session_maker.cpp
router/i_rc_lookup_handler.cpp
router/rc_lookup_handler.cpp
router/router.cpp
router_contact.cpp
router_id.cpp

@ -159,8 +159,6 @@ __ ___ ____ _ _ ___ _ _ ____
int
Context::LoadDatabase()
{
nodedb = std::make_unique< llarp_nodedb >(router->diskworker());
if(!llarp_nodedb::ensure_dir(nodedb_dir.c_str()))
{
llarp::LogError("nodedb_dir is incorrect");
@ -233,16 +231,21 @@ __ ___ ____ _ _ ___ _ _ ____
cryptoManager = std::make_unique< CryptoManager >(crypto.get());
router = std::make_unique< Router >(worker, mainloop, logic);
if(!router->Configure(config.get()))
nodedb = std::make_unique< llarp_nodedb >(router->diskworker());
if(!router->Configure(config.get(), nodedb.get()))
{
llarp::LogError("Failed to configure router");
return 1;
}
// must be done after router is made so we can use its disk io worker
// must also be done after configure so that netid is properly set if it
// is provided by config
if(!this->LoadDatabase())
return 1;
return 0;
}

@ -199,6 +199,20 @@ namespace llarp
return _nodes.get();
}
void
PutRCNodeAsync(const RCNode& val) override
{
auto func = std::bind(&Bucket< RCNode >::PutNode, Nodes(), val);
router->logic()->queue_func(func);
}
void
DelRCNodeAsync(const Key_t& val) override
{
auto func = std::bind(&Bucket< RCNode >::DelNode, Nodes(), val);
router->logic()->queue_func(func);
}
const Key_t&
OurKey() const override
{

@ -173,6 +173,12 @@ namespace llarp
virtual Bucket< RCNode >*
Nodes() const = 0;
virtual void
PutRCNodeAsync(const RCNode& val) = 0;
virtual void
DelRCNodeAsync(const Key_t& val) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;

@ -7,19 +7,6 @@ namespace llarp
{
namespace iwp
{
std::unique_ptr< ILinkLayer >
NewServerFromRouter(AbstractRouter* r)
{
return NewServer(
r->encryption(), std::bind(&AbstractRouter::rc, r),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r),
util::memFn(&AbstractRouter::OnSessionEstablished, r),
util::memFn(&AbstractRouter::CheckRenegotiateValid, r),
util::memFn(&AbstractRouter::Sign, r),
util::memFn(&AbstractRouter::OnConnectTimeout, r),
util::memFn(&AbstractRouter::SessionClosed, r));
}
std::unique_ptr< ILinkLayer >
NewServer(const SecretKey& enckey, GetRCFunc getrc, LinkMessageHandler h,
SessionEstablishedHandler est, SessionRenegotiateHandler reneg,

@ -18,9 +18,6 @@ namespace llarp
llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout,
llarp::SessionClosedHandler closed);
std::unique_ptr< ILinkLayer >
NewServerFromRouter(AbstractRouter* r);
} // namespace iwp
} // namespace llarp

@ -0,0 +1 @@
#include <link/i_link_manager.hpp>

@ -0,0 +1,85 @@
#ifndef LLARP_I_LINK_MANAGER_HPP
#define LLARP_I_LINK_MANAGER_HPP
#include <link/server.hpp>
#include <util/types.hpp>
#include <util/logic.hpp>
#include <functional>
struct llarp_buffer_t;
namespace llarp
{
using Logic_ptr = std::shared_ptr< Logic >;
struct RouterContact;
struct ILinkSession;
struct IOutboundSessionMaker;
struct RouterID;
namespace util
{
struct StatusObject;
} // namespace util
struct ILinkManager
{
virtual ~ILinkManager() = default;
virtual LinkLayer_ptr
GetCompatibleLink(const RouterContact &rc) const = 0;
virtual IOutboundSessionMaker *
GetSessionMaker() const = 0;
virtual bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) = 0;
virtual bool
HasSessionTo(const RouterID &remote) const = 0;
virtual void
PumpLinks() = 0;
virtual void
AddLink(LinkLayer_ptr link, bool inbound = false) = 0;
virtual bool
StartLinks(Logic_ptr logic) = 0;
virtual void
Stop() = 0;
virtual void
PersistSessionUntil(const RouterID &remote, llarp_time_t until) = 0;
virtual void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const = 0;
virtual void
ForEachPeer(std::function< void(ILinkSession *) > visit) = 0;
virtual void
ForEachInboundLink(std::function< void(LinkLayer_ptr) > visit) const = 0;
virtual size_t
NumberOfConnectedRouters() const = 0;
virtual size_t
NumberOfConnectedClients() const = 0;
virtual bool
GetRandomConnectedRouter(RouterContact &router) const = 0;
virtual void
CheckPersistingSessions(llarp_time_t now) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};
} // namespace llarp
#endif // LLARP_I_LINK_MANAGER_HPP

@ -0,0 +1,362 @@
#include <link/link_manager.hpp>
#include <router/i_outbound_session_maker.hpp>
#include <crypto/crypto.hpp>
#include <algorithm>
#include <set>
namespace llarp
{
LinkLayer_ptr
LinkManager::GetCompatibleLink(const RouterContact &rc) const
{
if(stopping)
return nullptr;
for(auto &link : outboundLinks)
{
// TODO: may want to add some memory of session failures for a given
// router on a given link and not return that link here for a
// duration
if(!link->IsCompatable(rc))
continue;
return link;
}
return nullptr;
}
IOutboundSessionMaker *
LinkManager::GetSessionMaker() const
{
return _sessionMaker;
}
bool
LinkManager::SendTo(const RouterID &remote, const llarp_buffer_t &buf)
{
if(stopping)
return false;
auto link = GetLinkWithSessionTo(remote);
if(link == nullptr)
{
return false;
}
return link->SendTo(remote, buf);
}
bool
LinkManager::HasSessionTo(const RouterID &remote) const
{
return GetLinkWithSessionTo(remote) != nullptr;
}
void
LinkManager::PumpLinks()
{
for(const auto &link : inboundLinks)
{
link->Pump();
}
for(const auto &link : outboundLinks)
{
link->Pump();
}
}
void
LinkManager::AddLink(LinkLayer_ptr link, bool inbound)
{
util::Lock l(&_mutex);
if(inbound)
{
inboundLinks.emplace(link);
}
else
{
outboundLinks.emplace(link);
}
}
bool
LinkManager::StartLinks(Logic_ptr logic)
{
LogInfo("starting ", outboundLinks.size(), " outbound links");
for(const auto &link : outboundLinks)
{
if(!link->Start(logic))
{
LogWarn("outbound link '", link->Name(), "' failed to start");
return false;
}
LogDebug("Outbound Link ", link->Name(), " started");
}
if(inboundLinks.size())
{
LogInfo("starting ", inboundLinks.size(), " inbound links");
for(const auto &link : inboundLinks)
{
if(!link->Start(logic))
{
LogWarn("Link ", link->Name(), " failed to start");
return false;
}
LogDebug("Inbound Link ", link->Name(), " started");
}
}
return true;
}
void
LinkManager::Stop()
{
if(stopping)
{
return;
}
util::Lock l(&_mutex);
LogInfo("stopping links");
stopping = true;
for(const auto &link : outboundLinks)
link->Stop();
for(const auto &link : inboundLinks)
link->Stop();
}
void
LinkManager::PersistSessionUntil(const RouterID &remote, llarp_time_t until)
{
if(stopping)
return;
util::Lock l(&_mutex);
m_PersistingSessions[remote] =
std::max(until, m_PersistingSessions[remote]);
LogDebug("persist session to ", remote, " until ",
m_PersistingSessions[remote]);
}
void
LinkManager::ForEachPeer(
std::function< void(const ILinkSession *, bool) > visit,
bool randomize) const
{
if(stopping)
return;
for(const auto &link : outboundLinks)
{
link->ForEachSession(
[visit](const ILinkSession *peer) { visit(peer, true); }, randomize);
}
for(const auto &link : inboundLinks)
{
link->ForEachSession(
[visit](const ILinkSession *peer) { visit(peer, false); }, randomize);
}
}
void
LinkManager::ForEachPeer(std::function< void(ILinkSession *) > visit)
{
if(stopping)
return;
for(const auto &link : outboundLinks)
{
link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
}
for(const auto &link : inboundLinks)
{
link->ForEachSession([visit](ILinkSession *peer) { visit(peer); });
}
}
void
LinkManager::ForEachInboundLink(
std::function< void(LinkLayer_ptr) > visit) const
{
for(const auto &link : inboundLinks)
{
visit(link);
}
}
size_t
LinkManager::NumberOfConnectedRouters() const
{
std::set< RouterID > connectedRouters;
auto fn = [&connectedRouters](const ILinkSession *session, bool) {
if(session->IsEstablished())
{
const RouterContact rc(session->GetRemoteRC());
if(rc.IsPublicRouter())
{
connectedRouters.insert(rc.pubkey);
}
}
};
ForEachPeer(fn);
return connectedRouters.size();
}
size_t
LinkManager::NumberOfConnectedClients() const
{
std::set< RouterID > connectedClients;
auto fn = [&connectedClients](const ILinkSession *session, bool) {
if(session->IsEstablished())
{
const RouterContact rc(session->GetRemoteRC());
if(!rc.IsPublicRouter())
{
connectedClients.insert(rc.pubkey);
}
}
};
ForEachPeer(fn);
return connectedClients.size();
}
bool
LinkManager::GetRandomConnectedRouter(RouterContact &router) const
{
std::unordered_map< RouterID, RouterContact, RouterID::Hash >
connectedRouters;
ForEachPeer(
[&connectedRouters](const ILinkSession *peer, bool unused) {
(void)unused;
connectedRouters[peer->GetPubKey()] = peer->GetRemoteRC();
},
false);
const auto sz = connectedRouters.size();
if(sz)
{
auto itr = connectedRouters.begin();
if(sz > 1)
{
std::advance(itr, randint() % sz);
}
router = itr->second;
return true;
}
return false;
}
void
LinkManager::CheckPersistingSessions(llarp_time_t now)
{
if(stopping)
return;
std::vector< RouterID > sessionsNeeded;
{
util::Lock l(&_mutex);
auto itr = m_PersistingSessions.begin();
while(itr != m_PersistingSessions.end())
{
auto link = GetLinkWithSessionTo(itr->first);
if(now < itr->second)
{
if(link)
{
LogDebug("keepalive to ", itr->first);
link->KeepAliveSessionTo(itr->first);
}
else
{
sessionsNeeded.push_back(itr->first);
}
++itr;
}
else
{
const RouterID r(itr->first);
LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr);
}
}
}
for(const auto &router : sessionsNeeded)
{
_sessionMaker->CreateSessionTo(router, nullptr);
}
}
util::StatusObject
LinkManager::ExtractStatus() const
{
std::vector< util::StatusObject > ob_links, ib_links;
std::transform(inboundLinks.begin(), inboundLinks.end(),
std::back_inserter(ib_links),
[](const auto &link) -> util::StatusObject {
return link->ExtractStatus();
});
std::transform(outboundLinks.begin(), outboundLinks.end(),
std::back_inserter(ob_links),
[](const auto &link) -> util::StatusObject {
return link->ExtractStatus();
});
util::StatusObject obj{{"outbound", ob_links}, {"inbound", ib_links}};
return obj;
}
void
LinkManager::Init(IOutboundSessionMaker *sessionMaker)
{
stopping = false;
_sessionMaker = sessionMaker;
}
LinkLayer_ptr
LinkManager::GetLinkWithSessionTo(const RouterID &remote) const
{
if(stopping)
return nullptr;
for(const auto &link : inboundLinks)
{
if(link->HasSessionTo(remote))
{
return link;
}
}
for(const auto &link : outboundLinks)
{
if(link->HasSessionTo(remote))
{
return link;
}
}
return nullptr;
}
} // namespace llarp

@ -0,0 +1,100 @@
#ifndef LLARP_LINK_MANAGER_HPP
#define LLARP_LINK_MANAGER_HPP
#include <link/i_link_manager.hpp>
#include <util/compare_ptr.hpp>
#include <util/threading.hpp>
#include <link/server.hpp>
#include <unordered_map>
#include <set>
#include <atomic>
namespace llarp
{
struct IRouterContactManager;
struct LinkManager final : public ILinkManager
{
public:
~LinkManager() = default;
LinkLayer_ptr
GetCompatibleLink(const RouterContact &rc) const override;
IOutboundSessionMaker *
GetSessionMaker() const override;
bool
SendTo(const RouterID &remote, const llarp_buffer_t &buf) override;
bool
HasSessionTo(const RouterID &remote) const override;
void
PumpLinks() override;
void
AddLink(LinkLayer_ptr link, bool inbound = false) override;
bool
StartLinks(Logic_ptr logic) override;
void
Stop() override;
void
PersistSessionUntil(const RouterID &remote, llarp_time_t until) override;
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const override;
void
ForEachPeer(std::function< void(ILinkSession *) > visit) override;
void
ForEachInboundLink(
std::function< void(LinkLayer_ptr) > visit) const override;
size_t
NumberOfConnectedRouters() const override;
size_t
NumberOfConnectedClients() const override;
bool
GetRandomConnectedRouter(RouterContact &router) const override;
void
CheckPersistingSessions(llarp_time_t now) override;
virtual util::StatusObject
ExtractStatus() const override;
void
Init(IOutboundSessionMaker *sessionMaker);
private:
LinkLayer_ptr
GetLinkWithSessionTo(const RouterID &remote) const;
std::atomic< bool > stopping;
mutable util::Mutex _mutex; // protects m_PersistingSessions
using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
LinkSet outboundLinks;
LinkSet inboundLinks;
// sessions to persist -> timestamp to end persist at
std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
m_PersistingSessions GUARDED_BY(_mutex);
IOutboundSessionMaker *_sessionMaker;
};
} // namespace llarp
#endif // LLARP_LINK_MANAGER_HPP

@ -151,6 +151,13 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc,
InsertAsync(rc, logic, completionHandler);
return true;
}
else if(itr != entries.end())
{
// insertion time is set on...insertion. But it should be updated here
// even if there is no insertion of a new RC, to show that the existing one
// is not "stale"
itr->second.inserted = llarp::time_now_ms();
}
return false;
}
@ -189,6 +196,8 @@ llarp_nodedb::Insert(const llarp::RouterContact &rc)
if(itr != entries.end())
entries.erase(itr);
entries.emplace(rc.pubkey.as_array(), rc);
LogInfo("Added or updated RC for ", llarp::RouterID(rc.pubkey),
" to nodedb. Current nodedb count is: ", entries.size());
}
return true;
}
@ -313,6 +322,26 @@ llarp_nodedb::VisitInsertedBefore(
}
}
void
llarp_nodedb::RemoveStaleRCs(const std::set< llarp::RouterID > &keep,
llarp_time_t cutoff)
{
std::set< llarp::RouterID > removeStale;
// remove stale routers
VisitInsertedBefore(
[&](const llarp::RouterContact &rc) {
if(keep.find(rc.pubkey) != keep.end())
return;
LogInfo("removing stale router: ", llarp::RouterID(rc.pubkey));
removeStale.insert(rc.pubkey);
},
cutoff);
RemoveIf([&removeStale](const llarp::RouterContact &rc) -> bool {
return removeStale.count(rc.pubkey) > 0;
});
}
/*
bool
llarp_nodedb::Save()

@ -60,7 +60,7 @@ struct llarp_nodedb
struct NetDBEntry
{
const llarp::RouterContact rc;
const llarp_time_t inserted;
llarp_time_t inserted;
NetDBEntry(const llarp::RouterContact &data);
};
@ -140,6 +140,9 @@ struct llarp_nodedb
VisitInsertedBefore(std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter) LOCKS_EXCLUDED(access);
void
RemoveStaleRCs(const std::set< llarp::RouterID > &keep, llarp_time_t cutoff);
size_t
num_loaded() const LOCKS_EXCLUDED(access);

@ -3,10 +3,6 @@
namespace llarp
{
AbstractRouter::~AbstractRouter()
{
}
void
AbstractRouter::EnsureRouter(RouterID router, RouterLookupHandler handler)
{

@ -24,6 +24,10 @@ namespace llarp
struct Profiling;
struct SecretKey;
struct Signature;
struct IOutboundMessageHandler;
struct IOutboundSessionMaker;
struct ILinkManager;
struct I_RCLookupHandler;
namespace exit
{
@ -52,10 +56,7 @@ namespace llarp
struct AbstractRouter
{
virtual ~AbstractRouter() = 0;
virtual bool
OnSessionEstablished(ILinkSession *) = 0;
virtual ~AbstractRouter() = default;
virtual bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
@ -106,11 +107,23 @@ namespace llarp
virtual const service::Context &
hiddenServiceContext() const = 0;
virtual IOutboundMessageHandler &
outboundMessageHandler() = 0;
virtual IOutboundSessionMaker &
outboundSessionMaker() = 0;
virtual ILinkManager &
linkManager() = 0;
virtual I_RCLookupHandler &
rcLookupHandler() = 0;
virtual bool
Sign(Signature &sig, const llarp_buffer_t &buf) const = 0;
virtual bool
Configure(Config *conf) = 0;
Configure(Config *conf, llarp_nodedb *nodedb) = 0;
virtual bool
Run(struct llarp_nodedb *nodedb) = 0;
@ -129,9 +142,6 @@ namespace llarp
virtual const byte_t *
pubkey() const = 0;
virtual void
OnConnectTimeout(ILinkSession *session) = 0;
/// connect to N random routers
virtual void
ConnectToRandomRouters(int N) = 0;
@ -219,11 +229,6 @@ namespace llarp
virtual bool
HasSessionTo(const RouterID &router) const = 0;
/// return true if we are currently looking up this router either directly
/// or via an anonymous endpoint
virtual bool
HasPendingRouterLookup(const RouterID &router) const = 0;
virtual util::StatusObject
ExtractStatus() const = 0;

@ -0,0 +1 @@
#include <router/i_outbound_message_handler.hpp>

@ -0,0 +1,43 @@
#ifndef LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
#define LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP
#include <cstdint>
#include <functional>
namespace llarp
{
enum class SendStatus
{
Success,
Timeout,
NoLink,
InvalidRouter,
RouterNotFound,
Congestion
};
struct ILinkMessage;
struct RouterID;
namespace util
{
struct StatusObject;
}
using SendStatusHandler = std::function< void(SendStatus) >;
struct IOutboundMessageHandler
{
virtual ~IOutboundMessageHandler() = default;
virtual bool
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};
} // namespace llarp
#endif // LLARP_ROUTER_I_OUTBOUND_MESSAGE_HANDLER_HPP

@ -0,0 +1 @@
#include <router/i_outbound_session_maker.hpp>

@ -0,0 +1,59 @@
#ifndef LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
#define LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP
#include <util/types.hpp>
#include <functional>
namespace llarp
{
namespace util
{
struct StatusObject;
} // namespace util
struct ILinkSession;
struct RouterID;
struct RouterContact;
enum class SessionResult
{
Establish,
Timeout,
RouterNotFound,
InvalidRouter,
NoLink
};
using RouterCallback =
std::function< void(const RouterID &, const SessionResult) >;
struct IOutboundSessionMaker
{
virtual ~IOutboundSessionMaker() = default;
virtual bool
OnSessionEstablished(ILinkSession *session) = 0;
virtual void
OnConnectTimeout(ILinkSession *session) = 0;
virtual void
CreateSessionTo(const RouterID &router, RouterCallback on_result) = 0;
virtual void
CreateSessionTo(const RouterContact &rc, RouterCallback on_result) = 0;
virtual bool
HavePendingSessionTo(const RouterID &router) const = 0;
virtual void
ConnectToRandomRouters(int numDesired, llarp_time_t now) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};
} // namespace llarp
#endif // LLARP_ROUTER_I_OUTBOUND_SESSION_MAKER_HPP

@ -0,0 +1 @@
#include <router/i_rc_lookup_handler.hpp>

@ -0,0 +1,63 @@
#ifndef LLARP_I_RC_LOOKUP_HANDLER_HPP
#define LLARP_I_RC_LOOKUP_HANDLER_HPP
#include <util/types.hpp>
#include <router_id.hpp>
#include <memory>
#include <set>
#include <vector>
namespace llarp
{
struct RouterContact;
enum class RCRequestResult
{
Success,
InvalidRouter,
RouterNotFound,
BadRC
};
using RCRequestCallback = std::function< void(
const RouterID &, const RouterContact *const, const RCRequestResult) >;
struct I_RCLookupHandler
{
virtual ~I_RCLookupHandler() = default;
virtual void
AddValidRouter(const RouterID &router) = 0;
virtual void
RemoveValidRouter(const RouterID &router) = 0;
virtual void
SetRouterWhitelist(const std::vector< RouterID > &routers) = 0;
virtual void
GetRC(const RouterID &router, RCRequestCallback callback) = 0;
virtual bool
RemoteIsAllowed(const RouterID &remote) const = 0;
virtual bool
CheckRC(const RouterContact &rc) const = 0;
virtual bool
GetRandomWhitelistRouter(RouterID &router) const = 0;
virtual bool
CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) = 0;
virtual void
PeriodicUpdate(llarp_time_t now) = 0;
virtual void
ExploreNetwork() = 0;
};
} // namespace llarp
#endif // LLARP_I_RC_LOOKUP_HANDLER_HPP

@ -0,0 +1,224 @@
#include <router/outbound_message_handler.hpp>
#include <messages/link_message.hpp>
#include <router/i_outbound_session_maker.hpp>
#include <link/i_link_manager.hpp>
#include <constants/link_layer.hpp>
#include <util/memfn.hpp>
#include <util/status.hpp>
#include <algorithm>
#include <cstdlib>
namespace llarp
{
bool
OutboundMessageHandler::QueueMessage(const RouterID &remote,
const ILinkMessage *msg,
SendStatusHandler callback)
{
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);
if(!EncodeBuffer(msg, buf))
{
return false;
}
Message message;
message.first.reserve(buf.sz);
message.second = callback;
std::copy_n(buf.base, buf.sz, message.first.data());
if(SendIfSession(remote, message))
{
return true;
}
bool shouldCreateSession = false;
{
util::Lock l(&_mutex);
// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair = outboundMessageQueue.emplace(remote, MessageQueue());
itr_pair.first->second.push_back(std::move(message));
shouldCreateSession = itr_pair.second;
}
if(shouldCreateSession)
{
QueueSessionCreation(remote);
}
return true;
}
// TODO: this
util::StatusObject
OutboundMessageHandler::ExtractStatus() const
{
util::StatusObject status{};
return status;
}
void
OutboundMessageHandler::Init(ILinkManager *linkManager,
std::shared_ptr< Logic > logic)
{
_linkManager = linkManager;
_logic = logic;
}
void
OutboundMessageHandler::OnSessionEstablished(const RouterID &router)
{
FinalizeRequest(router, SendStatus::Success);
}
void
OutboundMessageHandler::OnConnectTimeout(const RouterID &router)
{
FinalizeRequest(router, SendStatus::Timeout);
}
void
OutboundMessageHandler::OnRouterNotFound(const RouterID &router)
{
FinalizeRequest(router, SendStatus::RouterNotFound);
}
void
OutboundMessageHandler::OnInvalidRouter(const RouterID &router)
{
FinalizeRequest(router, SendStatus::InvalidRouter);
}
void
OutboundMessageHandler::OnNoLink(const RouterID &router)
{
FinalizeRequest(router, SendStatus::NoLink);
}
void
OutboundMessageHandler::OnSessionResult(const RouterID &router,
const SessionResult result)
{
switch(result)
{
case SessionResult::Establish:
OnSessionEstablished(router);
break;
case SessionResult::Timeout:
OnConnectTimeout(router);
break;
case SessionResult::RouterNotFound:
OnRouterNotFound(router);
break;
case SessionResult::InvalidRouter:
OnInvalidRouter(router);
break;
case SessionResult::NoLink:
OnNoLink(router);
break;
default:
LogError("Impossible situation: enum class value out of bounds.");
std::abort();
break;
}
}
void
OutboundMessageHandler::DoCallback(SendStatusHandler callback,
SendStatus status)
{
if(callback)
{
auto func = std::bind(callback, status);
_logic->queue_func(func);
}
}
void
OutboundMessageHandler::QueueSessionCreation(const RouterID &remote)
{
auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
_linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
}
bool
OutboundMessageHandler::EncodeBuffer(const ILinkMessage *msg,
llarp_buffer_t &buf)
{
if(!msg->BEncode(&buf))
{
LogWarn("failed to encode outbound message, buffer size left: ",
buf.size_left());
return false;
}
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return true;
}
bool
OutboundMessageHandler::Send(const RouterID &remote, const Message &msg)
{
llarp_buffer_t buf(msg.first);
if(_linkManager->SendTo(remote, buf))
{
DoCallback(msg.second, SendStatus::Success);
return true;
}
DoCallback(msg.second, SendStatus::Congestion);
return false;
}
bool
OutboundMessageHandler::SendIfSession(const RouterID &remote,
const Message &msg)
{
if(_linkManager->HasSessionTo(remote))
{
return Send(remote, msg);
}
return false;
}
void
OutboundMessageHandler::FinalizeRequest(const RouterID &router,
SendStatus status)
{
MessageQueue movedMessages;
{
util::Lock l(&_mutex);
auto itr = outboundMessageQueue.find(router);
if(itr == outboundMessageQueue.end())
{
return;
}
movedMessages.splice(movedMessages.begin(), itr->second);
outboundMessageQueue.erase(itr);
}
for(const auto &msg : movedMessages)
{
if(status == SendStatus::Success)
{
Send(router, msg);
}
else
{
DoCallback(msg.second, status);
}
}
}
} // namespace llarp

@ -0,0 +1,88 @@
#ifndef LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
#define LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
#include <router/i_outbound_message_handler.hpp>
#include <util/threading.hpp>
#include <util/logic.hpp>
#include <router_id.hpp>
#include <list>
#include <unordered_map>
#include <utility>
struct llarp_buffer_t;
namespace llarp
{
struct ILinkManager;
struct Logic;
enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler
{
public:
~OutboundMessageHandler() = default;
bool
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) override;
util::StatusObject
ExtractStatus() const override;
void
Init(ILinkManager *linkManager, std::shared_ptr< Logic > logic);
private:
using Message = std::pair< std::vector< byte_t >, SendStatusHandler >;
using MessageQueue = std::list< Message >;
void
OnSessionEstablished(const RouterID &router);
void
OnConnectTimeout(const RouterID &router);
void
OnRouterNotFound(const RouterID &router);
void
OnInvalidRouter(const RouterID &router);
void
OnNoLink(const RouterID &router);
void
OnSessionResult(const RouterID &router, const SessionResult result);
void
DoCallback(SendStatusHandler callback, SendStatus status);
void
QueueSessionCreation(const RouterID &remote);
bool
EncodeBuffer(const ILinkMessage *msg, llarp_buffer_t &buf);
bool
Send(const RouterID &remote, const Message &msg);
bool
SendIfSession(const RouterID &remote, const Message &msg);
void
FinalizeRequest(const RouterID &router, SendStatus status);
mutable util::Mutex _mutex; // protects outboundMessageQueue
std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
outboundMessageQueue GUARDED_BY(_mutex);
ILinkManager *_linkManager;
std::shared_ptr< Logic > _logic;
};
} // namespace llarp
#endif // LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP

@ -0,0 +1,317 @@
#include <router/outbound_session_maker.hpp>
#include <link/server.hpp>
#include <router_contact.hpp>
#include <nodedb.hpp>
#include <router/i_rc_lookup_handler.hpp>
#include <link/i_link_manager.hpp>
#include <util/logic.hpp>
#include <util/memfn.hpp>
#include <util/threading.hpp>
#include <util/status.hpp>
#include <crypto/crypto.hpp>
namespace llarp
{
struct PendingSession
{
// TODO: add session establish status metadata, e.g. num retries
const RouterContact rc;
LinkLayer_ptr link;
size_t attemptCount = 0;
PendingSession(const RouterContact &rc, LinkLayer_ptr link)
: rc(rc), link(link)
{
}
};
bool
OutboundSessionMaker::OnSessionEstablished(ILinkSession *session)
{
// TODO: do we want to keep it
const auto router = RouterID(session->GetPubKey());
const std::string remoteType =
session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
LogInfo("session with ", remoteType, " [", router, "] established");
if(not _rcLookup->RemoteIsAllowed(router))
{
FinalizeRequest(router, SessionResult::InvalidRouter);
return false;
}
auto func = std::bind(&OutboundSessionMaker::VerifyRC, this,
session->GetRemoteRC());
_threadpool->addJob(func);
return true;
}
void
OutboundSessionMaker::OnConnectTimeout(ILinkSession *session)
{
// TODO: retry/num attempts
LogWarn("Session establish attempt to ", RouterID(session->GetPubKey()),
" timed out.");
FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
}
void
OutboundSessionMaker::CreateSessionTo(const RouterID &router,
RouterCallback on_result)
{
if(on_result)
{
util::Lock l(&_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
}
if(HavePendingSessionTo(router))
{
return;
}
CreatePendingSession(router);
LogDebug("Creating session establish attempt to ", router, " .");
auto fn = util::memFn(&OutboundSessionMaker::OnRouterContactResult, this);
_rcLookup->GetRC(router, fn);
}
void
OutboundSessionMaker::CreateSessionTo(const RouterContact &rc,
RouterCallback on_result)
{
if(on_result)
{
util::Lock l(&_mutex);
auto itr_pair = pendingCallbacks.emplace(rc.pubkey, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
}
if(not HavePendingSessionTo(rc.pubkey))
{
LogDebug("Creating session establish attempt to ", rc.pubkey, " .");
CreatePendingSession(rc.pubkey);
}
GotRouterContact(rc.pubkey, rc);
}
bool
OutboundSessionMaker::HavePendingSessionTo(const RouterID &router) const
{
util::Lock l(&_mutex);
return pendingSessions.find(router) != pendingSessions.end();
}
void
OutboundSessionMaker::ConnectToRandomRouters(int numDesired, llarp_time_t now)
{
int remainingDesired = numDesired;
_nodedb->visit([&](const RouterContact &other) -> bool {
// check if we really remainingDesired to
if(other.ExpiresSoon(now, 30000)) // TODO: make delta configurable
{
return remainingDesired > 0;
}
if(!_rcLookup->RemoteIsAllowed(other.pubkey))
{
return remainingDesired > 0;
}
if(randint() % 2 == 0
&& !(_linkManager->HasSessionTo(other.pubkey)
|| HavePendingSessionTo(other.pubkey)))
{
CreateSessionTo(other, nullptr);
--remainingDesired;
}
return remainingDesired > 0;
});
LogDebug("connecting to ", numDesired - remainingDesired, " out of ",
numDesired, " random routers");
}
// TODO: this
util::StatusObject
OutboundSessionMaker::ExtractStatus() const
{
util::StatusObject status{};
return status;
}
void
OutboundSessionMaker::Init(
ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool)
{
_linkManager = linkManager;
_rcLookup = rcLookup;
_logic = logic;
_nodedb = nodedb;
_threadpool = threadpool;
}
void
OutboundSessionMaker::DoEstablish(const RouterID &router)
{
util::ReleasableLock l(&_mutex);
auto itr = pendingSessions.find(router);
if(itr == pendingSessions.end())
{
return;
}
const auto &job = itr->second;
if(!job->link->TryEstablishTo(job->rc))
{
// TODO: maybe different failure type?
l.Release();
FinalizeRequest(router, SessionResult::NoLink);
}
}
void
OutboundSessionMaker::GotRouterContact(const RouterID &router,
const RouterContact &rc)
{
{
util::Lock l(&_mutex);
// in case other request found RC for this router after this request was
// made
auto itr = pendingSessions.find(router);
if(itr == pendingSessions.end())
{
return;
}
LinkLayer_ptr link = _linkManager->GetCompatibleLink(rc);
if(!link)
{
FinalizeRequest(router, SessionResult::NoLink);
return;
}
auto session = std::make_shared< PendingSession >(rc, link);
itr->second = session;
}
auto fn = std::bind(&OutboundSessionMaker::DoEstablish, this, router);
_logic->queue_func(fn);
}
void
OutboundSessionMaker::InvalidRouter(const RouterID &router)
{
FinalizeRequest(router, SessionResult::InvalidRouter);
}
void
OutboundSessionMaker::RouterNotFound(const RouterID &router)
{
FinalizeRequest(router, SessionResult::RouterNotFound);
}
void
OutboundSessionMaker::OnRouterContactResult(const RouterID &router,
const RouterContact *const rc,
const RCRequestResult result)
{
if(not HavePendingSessionTo(router))
{
return;
}
switch(result)
{
case RCRequestResult::Success:
if(rc)
{
GotRouterContact(router, *rc);
}
else
{
LogError("RCRequestResult::Success but null rc pointer given");
}
break;
case RCRequestResult::InvalidRouter:
InvalidRouter(router);
break;
case RCRequestResult::RouterNotFound:
RouterNotFound(router);
break;
default:
break;
}
}
void
OutboundSessionMaker::VerifyRC(const RouterContact rc)
{
if(not _rcLookup->CheckRC(rc))
{
FinalizeRequest(rc.pubkey, SessionResult::InvalidRouter);
return;
}
FinalizeRequest(rc.pubkey, SessionResult::Establish);
}
void
OutboundSessionMaker::CreatePendingSession(const RouterID &router)
{
util::Lock l(&_mutex);
pendingSessions.emplace(router, nullptr);
}
void
OutboundSessionMaker::FinalizeRequest(const RouterID &router,
const SessionResult type)
{
CallbacksQueue movedCallbacks;
{
util::Lock l(&_mutex);
// TODO: Router profiling stuff
auto itr = pendingCallbacks.find(router);
if(itr != pendingCallbacks.end())
{
movedCallbacks.splice(movedCallbacks.begin(), itr->second);
pendingCallbacks.erase(itr);
}
}
for(const auto &callback : movedCallbacks)
{
auto func = std::bind(callback, router, type);
_logic->queue_func(func);
}
{
util::Lock l(&_mutex);
pendingSessions.erase(router);
}
}
} // namespace llarp

@ -0,0 +1,103 @@
#ifndef LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
#define LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP
#include <router/i_outbound_session_maker.hpp>
#include <router/i_rc_lookup_handler.hpp>
#include <util/threading.hpp>
#include <util/thread_pool.hpp>
#include <util/logic.hpp>
#include <unordered_map>
#include <list>
#include <memory>
struct llarp_nodedb;
namespace llarp
{
struct PendingSession;
struct ILinkManager;
struct I_RCLookupHandler;
struct OutboundSessionMaker final : public IOutboundSessionMaker
{
using CallbacksQueue = std::list< RouterCallback >;
public:
~OutboundSessionMaker() = default;
bool
OnSessionEstablished(ILinkSession *session) override;
void
OnConnectTimeout(ILinkSession *session) override;
void
CreateSessionTo(const RouterID &router,
RouterCallback on_result) /* override */;
void
CreateSessionTo(const RouterContact &rc,
RouterCallback on_result) /* override */;
bool
HavePendingSessionTo(const RouterID &router) const override;
void
ConnectToRandomRouters(int numDesired, llarp_time_t now) override;
util::StatusObject
ExtractStatus() const override;
void
Init(ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
std::shared_ptr< Logic > logic, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool);
private:
void
DoEstablish(const RouterID &router);
void
GotRouterContact(const RouterID &router, const RouterContact &rc);
void
InvalidRouter(const RouterID &router);
void
RouterNotFound(const RouterID &router);
void
OnRouterContactResult(const RouterID &router, const RouterContact *const rc,
const RCRequestResult result);
void
VerifyRC(const RouterContact rc);
void
CreatePendingSession(const RouterID &router);
void
FinalizeRequest(const RouterID &router, const SessionResult type);
mutable util::Mutex _mutex; // protects pendingSessions, pendingCallbacks
std::unordered_map< RouterID, std::shared_ptr< PendingSession >,
RouterID::Hash >
pendingSessions GUARDED_BY(_mutex);
std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash >
pendingCallbacks GUARDED_BY(_mutex);
ILinkManager *_linkManager;
I_RCLookupHandler *_rcLookup;
std::shared_ptr< Logic > _logic;
llarp_nodedb *_nodedb;
std::shared_ptr< llarp::thread::ThreadPool > _threadpool;
};
} // namespace llarp
#endif // LLARP_ROUTER_OUTBOUND_SESSION_MAKER_HPP

@ -0,0 +1,337 @@
#include <router/rc_lookup_handler.hpp>
#include <link/i_link_manager.hpp>
#include <link/server.hpp>
#include <crypto/crypto.hpp>
#include <service/context.hpp>
#include <router_contact.hpp>
#include <util/memfn.hpp>
#include <util/types.hpp>
#include <util/threading.hpp>
#include <nodedb.hpp>
#include <dht/context.hpp>
#include <iterator>
#include <functional>
namespace llarp
{
void
RCLookupHandler::AddValidRouter(const RouterID &router)
{
util::Lock l(&_mutex);
whitelistRouters.insert(router);
}
void
RCLookupHandler::RemoveValidRouter(const RouterID &router)
{
util::Lock l(&_mutex);
whitelistRouters.erase(router);
}
void
RCLookupHandler::SetRouterWhitelist(const std::vector< RouterID > &routers)
{
util::Lock l(&_mutex);
whitelistRouters.clear();
for(auto &router : routers)
{
whitelistRouters.emplace(router);
}
LogInfo("lokinet service node list now has ", whitelistRouters.size(),
" routers");
}
void
RCLookupHandler::GetRC(const RouterID &router, RCRequestCallback callback)
{
RouterContact remoteRC;
if(_nodedb->Get(router, remoteRC))
{
if(callback)
{
callback(router, &remoteRC, RCRequestResult::Success);
}
FinalizeRequest(router, &remoteRC, RCRequestResult::Success);
return;
}
bool shouldDoLookup = false;
{
util::Lock l(&_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
if(callback)
{
itr_pair.first->second.push_back(callback);
}
shouldDoLookup = itr_pair.second;
}
if(shouldDoLookup)
{
auto fn = std::bind(&RCLookupHandler::HandleDHTLookupResult, this, router,
std::placeholders::_1);
// if we are a client try using the hidden service endpoints
if(!isServiceNode)
{
bool sent = false;
LogInfo("Lookup ", router, " anonymously");
_hiddenServiceContext->ForEachService(
[&](const std::string &,
const std::shared_ptr< service::Endpoint > &ep) -> bool {
const bool success = ep->LookupRouterAnon(router, fn);
sent = sent || success;
return !success;
});
if(sent)
return;
LogWarn("cannot lookup ", router, " anonymously");
}
if(!_dht->impl->LookupRouter(router, fn))
{
FinalizeRequest(router, nullptr, RCRequestResult::RouterNotFound);
}
}
}
bool
RCLookupHandler::RemoteIsAllowed(const RouterID &remote) const
{
if(_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
{
return false;
}
util::Lock l(&_mutex);
if(useWhitelist && whitelistRouters.find(remote) == whitelistRouters.end())
{
return false;
}
return true;
}
bool
RCLookupHandler::CheckRC(const RouterContact &rc) const
{
if(not RemoteIsAllowed(rc.pubkey))
{
_dht->impl->DelRCNodeAsync(dht::Key_t{rc.pubkey});
return false;
}
if(not rc.Verify(_dht->impl->Now()))
{
return false;
}
// update nodedb if required
if(rc.IsPublicRouter())
{
LogInfo("Adding or updating RC for ", RouterID(rc.pubkey),
" to nodedb and dht.");
_nodedb->UpdateAsyncIfNewer(rc);
_dht->impl->PutRCNodeAsync(rc);
}
return true;
}
bool
RCLookupHandler::GetRandomWhitelistRouter(RouterID &router) const
{
util::Lock l(&_mutex);
const auto sz = whitelistRouters.size();
auto itr = whitelistRouters.begin();
if(sz == 0)
return false;
if(sz > 1)
std::advance(itr, randint() % sz);
router = *itr;
return true;
}
bool
RCLookupHandler::CheckRenegotiateValid(RouterContact newrc,
RouterContact oldrc)
{
// missmatch of identity ?
if(newrc.pubkey != oldrc.pubkey)
return false;
if(!RemoteIsAllowed(newrc.pubkey))
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
_threadpool->addJob(func);
// update dht if required
if(_dht->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey}))
{
_dht->impl->Nodes()->PutNode(newrc);
}
// TODO: check for other places that need updating the RC
return true;
}
void
RCLookupHandler::PeriodicUpdate(llarp_time_t now)
{
// try looking up stale routers
std::set< RouterID > routersToLookUp;
_nodedb->VisitInsertedBefore(
[&](const RouterContact &rc) {
if(HavePendingLookup(rc.pubkey))
return;
routersToLookUp.insert(rc.pubkey);
},
now - RouterContact::UpdateInterval);
for(const auto &router : routersToLookUp)
{
GetRC(router, nullptr);
}
_nodedb->RemoveStaleRCs(_bootstrapRouterIDList,
now - RouterContact::StaleInsertionAge);
}
void
RCLookupHandler::ExploreNetwork()
{
if(_bootstrapRCList.size())
{
for(const auto &rc : _bootstrapRCList)
{
LogInfo("Doing explore via bootstrap node: ", RouterID(rc.pubkey));
_dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
}
}
else
{
LogError("we have no bootstrap nodes specified");
}
// TODO: only explore via random subset
// explore via every connected peer
_linkManager->ForEachPeer([&](ILinkSession *s) {
if(!s->IsEstablished())
return;
const RouterContact rc = s->GetRemoteRC();
if(rc.IsPublicRouter()
&& (_bootstrapRCList.find(rc) == _bootstrapRCList.end()))
{
LogInfo("Doing explore via public node: ", RouterID(rc.pubkey));
_dht->impl->ExploreNetworkVia(dht::Key_t{rc.pubkey});
}
});
}
void
RCLookupHandler::Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool,
ILinkManager *linkManager,
service::Context *hiddenServiceContext,
const std::set< RouterID > &strictConnectPubkeys,
const std::set< RouterContact > &bootstrapRCList,
bool useWhitelist_arg, bool isServiceNode_arg)
{
_dht = dht;
_nodedb = nodedb;
_threadpool = threadpool;
_hiddenServiceContext = hiddenServiceContext;
_strictConnectPubkeys = strictConnectPubkeys;
_bootstrapRCList = bootstrapRCList;
_linkManager = linkManager;
useWhitelist = useWhitelist_arg;
isServiceNode = isServiceNode_arg;
for(const auto &rc : _bootstrapRCList)
{
_bootstrapRouterIDList.insert(rc.pubkey);
}
}
void
RCLookupHandler::HandleDHTLookupResult(
RouterID remote, const std::vector< RouterContact > &results)
{
if(not results.size())
{
FinalizeRequest(remote, nullptr, RCRequestResult::RouterNotFound);
return;
}
if(not RemoteIsAllowed(remote))
{
FinalizeRequest(remote, &results[0], RCRequestResult::InvalidRouter);
return;
}
if(not CheckRC(results[0]))
{
FinalizeRequest(remote, &results[0], RCRequestResult::BadRC);
return;
}
FinalizeRequest(remote, &results[0], RCRequestResult::Success);
}
bool
RCLookupHandler::HavePendingLookup(RouterID remote) const
{
return pendingCallbacks.find(remote) != pendingCallbacks.end();
}
bool
RCLookupHandler::RemoteInBootstrap(const RouterID &remote) const
{
for(const auto &rc : _bootstrapRCList)
{
if(rc.pubkey == remote)
{
return true;
}
}
return false;
}
void
RCLookupHandler::FinalizeRequest(const RouterID &router,
const RouterContact *const rc,
RCRequestResult result)
{
CallbacksQueue movedCallbacks;
{
util::Lock l(&_mutex);
auto itr = pendingCallbacks.find(router);
if(itr != pendingCallbacks.end())
{
movedCallbacks.splice(movedCallbacks.begin(), itr->second);
pendingCallbacks.erase(itr);
}
} // lock
for(const auto &callback : movedCallbacks)
{
callback(router, rc, result);
}
}
} // namespace llarp

@ -0,0 +1,112 @@
#ifndef LLARP_RC_LOOKUP_HANDLER_HPP
#define LLARP_RC_LOOKUP_HANDLER_HPP
#include <router/i_rc_lookup_handler.hpp>
#include <util/threading.hpp>
#include <util/thread_pool.hpp>
#include <unordered_map>
#include <set>
#include <list>
struct llarp_nodedb;
struct llarp_dht_context;
namespace llarp
{
namespace service
{
struct Context;
} // namespace service
struct ILinkManager;
struct RCLookupHandler final : public I_RCLookupHandler
{
public:
using CallbacksQueue = std::list< RCRequestCallback >;
~RCLookupHandler() = default;
void
AddValidRouter(const RouterID &router) override;
void
RemoveValidRouter(const RouterID &router) override;
void
SetRouterWhitelist(const std::vector< RouterID > &routers) override;
void
GetRC(const RouterID &router, RCRequestCallback callback) override;
bool
RemoteIsAllowed(const RouterID &remote) const override;
bool
CheckRC(const RouterContact &rc) const override;
bool
GetRandomWhitelistRouter(RouterID &router) const override;
bool
CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) override;
void
PeriodicUpdate(llarp_time_t now) override;
void
ExploreNetwork() override;
void
Init(llarp_dht_context *dht, llarp_nodedb *nodedb,
std::shared_ptr< llarp::thread::ThreadPool > threadpool,
ILinkManager *linkManager, service::Context *hiddenServiceContext,
const std::set< RouterID > &strictConnectPubkeys,
const std::set< RouterContact > &bootstrapRCList,
bool useWhitelist_arg, bool isServiceNode_arg);
private:
void
HandleDHTLookupResult(RouterID remote,
const std::vector< RouterContact > &results);
bool
HavePendingLookup(RouterID remote) const;
bool
RemoteInBootstrap(const RouterID &remote) const;
void
FinalizeRequest(const RouterID &router, const RouterContact *const rc,
RCRequestResult result);
mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters
llarp_dht_context *_dht = nullptr;
llarp_nodedb *_nodedb = nullptr;
std::shared_ptr< llarp::thread::ThreadPool > _threadpool = nullptr;
service::Context *_hiddenServiceContext = nullptr;
ILinkManager *_linkManager = nullptr;
/// explicit whitelist of routers we will connect to directly (not for
/// service nodes)
std::set< RouterID > _strictConnectPubkeys;
std::set< RouterContact > _bootstrapRCList;
std::set< RouterID > _bootstrapRouterIDList;
std::unordered_map< RouterID, CallbacksQueue, RouterID::Hash >
pendingCallbacks GUARDED_BY(_mutex);
bool useWhitelist = false;
bool isServiceNode = false;
std::set< RouterID > whitelistRouters GUARDED_BY(_mutex);
};
} // namespace llarp
#endif // LLARP_RC_LOOKUP_HANDLER_HPP

File diff suppressed because it is too large Load Diff

@ -25,6 +25,10 @@
#include <util/status.hpp>
#include <util/str.hpp>
#include <util/threadpool.h>
#include <router/outbound_message_handler.hpp>
#include <router/outbound_session_maker.hpp>
#include <link/link_manager.hpp>
#include <router/rc_lookup_handler.hpp>
#include <functional>
#include <list>
@ -50,8 +54,6 @@ bool
llarp_loadServiceNodeIdentityKey(const fs::path &fpath,
llarp::SecretKey &secretkey);
struct TryConnectJob;
namespace llarp
{
struct Router final : public AbstractRouter
@ -191,9 +193,6 @@ namespace llarp
bool
Sign(Signature &sig, const llarp_buffer_t &buf) const override;
// buffer for serializing link messages
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
uint16_t m_OutboundPort = 0;
/// always maintain this many connections to other routers
@ -233,10 +232,6 @@ namespace llarp
/// default network config for default network interface
NetConfig_t netConfig;
/// identity keys whitelist of routers we will connect to directly (not for
/// service nodes)
std::set< RouterID > strictConnectPubkeys;
/// bootstrap RCs
std::set< RouterContact > bootstrapRCList;
@ -267,55 +262,47 @@ namespace llarp
std::string lokidRPCUser;
std::string lokidRPCPassword;
using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
LinkSet outboundLinks;
LinkSet inboundLinks;
Profiling _routerProfiling;
std::string routerProfilesFile = "profiles.dat";
using MessageQueue = std::queue< std::vector< byte_t > >;
OutboundMessageHandler _outboundMessageHandler;
OutboundSessionMaker _outboundSessionMaker;
LinkManager _linkManager;
RCLookupHandler _rcLookupHandler;
/// outbound message queue
std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
outboundMessageQueue;
/// loki verified routers
std::unordered_map< RouterID, RouterContact, RouterID::Hash > validRouters;
// pending establishing session with routers
std::unordered_map< RouterID, std::shared_ptr< TryConnectJob >,
RouterID::Hash >
pendingEstablishJobs;
IOutboundMessageHandler &
outboundMessageHandler() override
{
return _outboundMessageHandler;
}
// pending RCs to be verified by pubkey
std::unordered_map< RouterID, llarp_async_verify_rc, RouterID::Hash >
pendingVerifyRC;
IOutboundSessionMaker &
outboundSessionMaker() override
{
return _outboundSessionMaker;
}
// sessions to persist -> timestamp to end persist at
std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
m_PersistingSessions;
ILinkManager &
linkManager() override
{
return _linkManager;
}
// lokinet routers from lokid, maps pubkey to when we think it will expire,
// set to max value right now
std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters;
I_RCLookupHandler &
rcLookupHandler() override
{
return _rcLookupHandler;
}
Router(std::shared_ptr< llarp::thread::ThreadPool > worker,
llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > logic);
~Router();
bool
OnSessionEstablished(ILinkSession *) override;
bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
const llarp_buffer_t &msg) override;
void
AddLink(std::shared_ptr< ILinkLayer > link, bool outbound = false);
bool
InitOutboundLinks();
@ -341,10 +328,7 @@ namespace llarp
AddHiddenService(const service::Config::section_t &config);
bool
Configure(Config *conf) override;
bool
Ready();
Configure(Config *conf, llarp_nodedb *nodedb = nullptr) override;
bool
Run(struct llarp_nodedb *nodedb) override;
@ -381,15 +365,6 @@ namespace llarp
return seckey_topublic(_identity);
}
void
OnConnectTimeout(ILinkSession *session) override;
bool
HasPendingConnectJob(const RouterID &remote);
bool
HasPendingRouterLookup(const RouterID &remote) const override;
void
try_connect(fs::path rcfile);
@ -397,6 +372,9 @@ namespace llarp
bool
Reconfigure(Config *conf) override;
bool
TryConnectAsync(RouterContact rc, uint16_t tries) override;
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
@ -411,37 +389,6 @@ namespace llarp
bool
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) override;
/// sendto or drop
void
SendTo(RouterID remote, const ILinkMessage *msg, ILinkLayer *chosen);
/// manually flush outbound message queue for just 1 router
void
FlushOutboundFor(RouterID remote, ILinkLayer *chosen = nullptr);
void
LookupRouter(RouterID remote, RouterLookupHandler handler) override;
/// manually discard all pending messages to remote router
void
DiscardOutboundFor(const RouterID &remote);
/// try establishing a session to a remote router
void
TryEstablishTo(const RouterID &remote);
/// lookup a router by pubkey when it expires
void
LookupRouterWhenExpired(RouterID remote);
void
HandleDHTLookupForExplore(
RouterID remote, const std::vector< RouterContact > &results) override;
void
HandleRouterLookupForExpireUpdate(
RouterID remote, const std::vector< RouterContact > &results);
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,
bool randomize = false) const override;
@ -458,10 +405,6 @@ namespace llarp
bool
CheckRenegotiateValid(RouterContact newRc, RouterContact oldRC) override;
/// flush outbound message queue
void
FlushOutbound();
/// called by link when a remote session has no more sessions open
void
SessionClosed(RouterID remote) override;
@ -481,9 +424,6 @@ namespace llarp
void
ScheduleTicker(uint64_t i = 1000);
ILinkLayer *
GetLinkWithSessionByPubkey(const RouterID &remote);
/// parse a routing message in a buffer and handle it with a handler if
/// successful parsing return true on parse and handle success otherwise
/// return false
@ -503,52 +443,28 @@ namespace llarp
size_t
NumberOfConnectedClients() const override;
/// count unique router id's given filter to match session
size_t
NumberOfRoutersMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const;
/// count the number of connections that match filter
size_t
NumberOfConnectionsMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const;
bool
TryConnectAsync(RouterContact rc, uint16_t tries) override;
bool
GetRandomConnectedRouter(RouterContact &result) const override;
bool
async_verify_RC(const RouterContact rc);
void
HandleDHTLookupForExplore(
RouterID remote, const std::vector< RouterContact > &results) override;
void
HandleDHTLookupForSendTo(RouterID remote,
const std::vector< RouterContact > &results);
LookupRouter(RouterID remote, RouterLookupHandler resultHandler) override;
bool
HasSessionTo(const RouterID &remote) const override;
void
HandleDHTLookupForTryEstablishTo(
RouterID remote, const std::vector< RouterContact > &results);
static void
on_verify_client_rc(llarp_async_verify_rc *context);
static void
on_verify_server_rc(llarp_async_verify_rc *context);
static void
handle_router_ticker(void *user, uint64_t orig, uint64_t left);
static void
HandleAsyncLoadRCForSendTo(llarp_async_load_rc *async);
private:
std::atomic< bool > _stopping;
std::atomic< bool > _running;
bool m_isServiceNode = false;
llarp_time_t m_LastStatsReport = 0;
bool
@ -572,6 +488,9 @@ namespace llarp
bool
FromConfig(Config *conf);
void
MessageSent(const RouterID &remote, SendStatus status);
};
} // namespace llarp

@ -32,12 +32,10 @@ namespace llarp
/// 1 day for real network
llarp_time_t RouterContact::Lifetime = 24 * 60 * 60 * 1000;
#endif
/// every 30 minutes an RC is stale and needs updating
llarp_time_t RouterContact::UpdateInterval = 30 * 60 * 1000;
// 1 minute window for update
llarp_time_t RouterContact::UpdateWindow = 60 * 1000;
// try 5 times to update
int RouterContact::UpdateTries = 5;
/// update RCs every 5 minutes
llarp_time_t RouterContact::UpdateInterval = 5 * 60 * 1000;
/// an RC inserted long enough ago (30 min) is considered stale and is removed
llarp_time_t RouterContact::StaleInsertionAge = 30 * 60 * 1000;
NetID::NetID(const byte_t *val) : AlignedBuffer< 8 >()
{

@ -70,8 +70,7 @@ namespace llarp
static llarp_time_t Lifetime;
static llarp_time_t UpdateInterval;
static llarp_time_t UpdateWindow;
static int UpdateTries;
static llarp_time_t StaleInsertionAge;
RouterContact()
{

@ -35,9 +35,10 @@ namespace llarp
}
};
using Mutex = absl::Mutex;
using Lock = absl::MutexLock;
using Condition = absl::CondVar;
using Mutex = absl::Mutex;
using Lock = absl::MutexLock;
using ReleasableLock = absl::ReleasableMutexLock;
using Condition = absl::CondVar;
class Semaphore
{

@ -18,19 +18,6 @@ namespace llarp
reneg, timeout, closed);
}
LinkLayer_ptr
NewServerFromRouter(AbstractRouter* r)
{
return NewServer(
r->encryption(), util::memFn(&AbstractRouter::rc, r),
util::memFn(&AbstractRouter::HandleRecvLinkMessageBuffer, r),
util::memFn(&AbstractRouter::OnSessionEstablished, r),
util::memFn(&AbstractRouter::CheckRenegotiateValid, r),
util::memFn(&AbstractRouter::Sign, r),
util::memFn(&AbstractRouter::OnConnectTimeout, r),
util::memFn(&AbstractRouter::SessionClosed, r));
}
} // namespace utp
} // namespace llarp

@ -15,9 +15,6 @@ namespace llarp
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
LinkLayer_ptr
NewServerFromRouter(AbstractRouter* r);
} // namespace utp
} // namespace llarp

@ -117,6 +117,8 @@ namespace llarp
MOCK_METHOD0(AllowTransit, bool&());
MOCK_CONST_METHOD0(Nodes, dht::Bucket< dht::RCNode >*());
MOCK_METHOD1(PutRCNodeAsync, void(const dht::RCNode& val));
MOCK_METHOD1(DelRCNodeAsync, void(const dht::Key_t& val));
};
} // namespace test

Loading…
Cancel
Save