Refactor well named functionality in service::Endpoint into new struct

pull/572/head
Michael 5 years ago
parent c96b31ea5d
commit 725ee293c1
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -217,6 +217,7 @@ set(LIB_SRC
service/async_key_exchange.cpp
service/config.cpp
service/context.cpp
service/endpoint_util.cpp
service/endpoint.cpp
service/handler.cpp
service/hidden_service_address_lookup.cpp

@ -10,6 +10,7 @@
#include <nodedb.hpp>
#include <profiling.hpp>
#include <router/abstractrouter.hpp>
#include <service/endpoint_util.hpp>
#include <service/hidden_service_address_lookup.hpp>
#include <service/outbound_context.hpp>
#include <service/protocol.hpp>
@ -147,12 +148,8 @@ namespace llarp
void
Endpoint::FlushSNodeTraffic()
{
auto itr = m_SNodeSessions.begin();
while(itr != m_SNodeSessions.end())
{
itr->second->Flush();
++itr;
}
std::for_each(m_SNodeSessions.begin(), m_SNodeSessions.end(),
[](auto& x) { x.second->Flush(); });
}
util::StatusObject
@ -210,53 +207,11 @@ namespace llarp
}
// expire snode sessions
{
auto itr = m_SNodeSessions.begin();
while(itr != m_SNodeSessions.end())
{
if(itr->second->ShouldRemove() && itr->second->IsStopped())
{
itr = m_SNodeSessions.erase(itr);
continue;
}
// expunge next tick
if(itr->second->IsExpired(now))
itr->second->Stop();
++itr;
}
}
EndpointUtil::ExpireSNodeSessions(now, m_SNodeSessions);
// expire pending tx
{
auto itr = m_PendingLookups.begin();
while(itr != m_PendingLookups.end())
{
if(itr->second->IsTimedOut(now))
{
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
LogInfo(lookup->name, " timed out txid=", lookup->txid);
lookup->HandleResponse({});
itr = m_PendingLookups.erase(itr);
}
else
++itr;
}
}
EndpointUtil::ExpirePendingTx(now, m_PendingLookups);
// expire pending router lookups
{
auto itr = m_PendingRouters.begin();
while(itr != m_PendingRouters.end())
{
if(itr->second.IsExpired(now))
{
LogInfo("lookup for ", itr->first, " timed out");
itr = m_PendingRouters.erase(itr);
}
else
++itr;
}
}
EndpointUtil::ExpirePendingRouterLookups(now, m_PendingRouters);
// prefetch addrs
for(const auto& addr : m_PrefetchAddrs)
@ -265,8 +220,8 @@ namespace llarp
{
if(!EnsurePathToService(
addr,
[](__attribute__((unused)) Address addr,
__attribute__((unused)) OutboundContext* ctx) {},
[](ABSL_ATTRIBUTE_UNUSED Address addr,
ABSL_ATTRIBUTE_UNUSED OutboundContext* ctx) {},
10000))
{
LogWarn("failed to ensure path to ", addr);
@ -316,57 +271,20 @@ namespace llarp
#endif
// deregister dead sessions
{
auto itr = m_DeadSessions.begin();
while(itr != m_DeadSessions.end())
{
if(itr->second->IsDone(now))
itr = m_DeadSessions.erase(itr);
else
++itr;
}
}
EndpointUtil::DeregisterDeadSessions(now, m_DeadSessions);
// tick remote sessions
{
auto itr = m_RemoteSessions.begin();
while(itr != m_RemoteSessions.end())
{
if(itr->second->Tick(now))
{
itr->second->Stop();
m_DeadSessions.emplace(itr->first, std::move(itr->second));
itr = m_RemoteSessions.erase(itr);
}
else
++itr;
}
}
EndpointUtil::TickRemoteSessions(now, m_RemoteSessions, m_DeadSessions);
// expire convotags
{
auto itr = m_Sessions.begin();
while(itr != m_Sessions.end())
{
if(itr->second.IsExpired(now))
itr = m_Sessions.erase(itr);
else
++itr;
}
}
EndpointUtil::ExpireConvoSessions(now, m_Sessions);
}
bool
Endpoint::Stop()
{
// stop remote sessions
for(auto& item : m_RemoteSessions)
{
item.second->Stop();
}
EndpointUtil::StopRemoteSessions(m_RemoteSessions);
// stop snode sessions
for(auto& item : m_SNodeSessions)
{
item.second->Stop();
}
EndpointUtil::StopSnodeSessions(m_SNodeSessions);
return path::Builder::Stop();
}
@ -388,15 +306,7 @@ namespace llarp
bool
Endpoint::HasPathToService(const Address& addr) const
{
auto range = m_RemoteSessions.equal_range(addr);
Sessions::const_iterator itr = range.first;
while(itr != range.second)
{
if(itr->second->ReadyToSend())
return true;
++itr;
}
return false;
return EndpointUtil::HasPathToService(addr, m_RemoteSessions);
}
void
@ -958,8 +868,7 @@ namespace llarp
bool
Endpoint::EnsurePathToService(const Address& remote, PathEnsureHook hook,
__attribute__((unused))
llarp_time_t timeoutMS,
ABSL_ATTRIBUTE_UNUSED llarp_time_t timeoutMS,
bool randomPath)
{
path::Path* path = nullptr;

@ -332,6 +332,8 @@ namespace llarp
std::unique_ptr< exit::BaseSession > m_Exit;
private:
friend struct EndpointUtil;
AbstractRouter* m_Router;
llarp_threadpool* m_IsolatedWorker = nullptr;
Logic* m_IsolatedLogic = nullptr;
@ -387,8 +389,9 @@ namespace llarp
}
};
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >
m_PendingRouters;
using PendingRouters =
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >;
PendingRouters m_PendingRouters;
uint64_t m_CurrentPublishTX = 0;
llarp_time_t m_LastPublish = 0;
@ -397,8 +400,10 @@ namespace llarp
/// our introset
service::IntroSet m_IntroSet;
/// pending remote service lookups by id
std::unordered_map< uint64_t, std::unique_ptr< service::IServiceLookup > >
m_PendingLookups;
using PendingLookups =
std::unordered_map< uint64_t,
std::unique_ptr< service::IServiceLookup > >;
PendingLookups m_PendingLookups;
/// prefetch remote address list
std::set< Address > m_PrefetchAddrs;
/// hidden service tag
@ -409,10 +414,9 @@ namespace llarp
std::list< std::function< bool(void) > > m_OnInit;
/// conversations
using ConvoMap_t =
std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
ConvoMap_t m_Sessions;
ConvoMap m_Sessions;
std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
};

@ -0,0 +1,153 @@
#include <service/endpoint_util.hpp>
#include <service/outbound_context.hpp>
#include <util/logger.hpp>
namespace llarp
{
namespace service
{
void
EndpointUtil::ExpireSNodeSessions(llarp_time_t now,
Endpoint::SNodeSessions& sessions)
{
auto itr = sessions.begin();
while(itr != sessions.end())
{
if(itr->second->ShouldRemove() && itr->second->IsStopped())
{
itr = sessions.erase(itr);
continue;
}
// expunge next tick
if(itr->second->IsExpired(now))
{
itr->second->Stop();
}
++itr;
}
}
void
EndpointUtil::ExpirePendingTx(llarp_time_t now,
Endpoint::PendingLookups& lookups)
{
for(auto itr = lookups.begin(); itr != lookups.end();)
{
if(!itr->second->IsTimedOut(now))
{
++itr;
continue;
}
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
LogInfo(lookup->name, " timed out txid=", lookup->txid);
lookup->HandleResponse({});
itr = lookups.erase(itr);
}
}
void
EndpointUtil::ExpirePendingRouterLookups(llarp_time_t now,
Endpoint::PendingRouters& routers)
{
for(auto itr = routers.begin(); itr != routers.end();)
{
if(!itr->second.IsExpired(now))
{
++itr;
continue;
}
LogInfo("lookup for ", itr->first, " timed out");
itr = routers.erase(itr);
}
}
void
EndpointUtil::DeregisterDeadSessions(llarp_time_t now,
Endpoint::Sessions& sessions)
{
auto itr = sessions.begin();
while(itr != sessions.end())
{
if(itr->second->IsDone(now))
{
itr = sessions.erase(itr);
}
else
{
++itr;
}
}
}
void
EndpointUtil::TickRemoteSessions(llarp_time_t now,
Endpoint::Sessions& remoteSessions,
Endpoint::Sessions& deadSessions)
{
auto itr = remoteSessions.begin();
while(itr != remoteSessions.end())
{
if(itr->second->Tick(now))
{
itr->second->Stop();
deadSessions.emplace(std::move(*itr));
itr = remoteSessions.erase(itr);
}
else
{
++itr;
}
}
}
void
EndpointUtil::ExpireConvoSessions(llarp_time_t now,
Endpoint::ConvoMap& sessions)
{
auto itr = sessions.begin();
while(itr != sessions.end())
{
if(itr->second.IsExpired(now))
itr = sessions.erase(itr);
else
++itr;
}
}
void
EndpointUtil::StopRemoteSessions(Endpoint::Sessions& remoteSessions)
{
for(auto& item : remoteSessions)
{
item.second->Stop();
}
}
void
EndpointUtil::StopSnodeSessions(Endpoint::SNodeSessions& sessions)
{
for(auto& item : sessions)
{
item.second->Stop();
}
}
bool
EndpointUtil::HasPathToService(const Address& addr,
const Endpoint::Sessions& remoteSessions)
{
auto range = remoteSessions.equal_range(addr);
auto itr = range.first;
while(itr != range.second)
{
if(itr->second->ReadyToSend())
return true;
++itr;
}
return false;
}
} // namespace service
} // namespace llarp

@ -0,0 +1,46 @@
#ifndef LLARP_SERVICE_ENDPOINT_UTIL_HPP
#define LLARP_SERVICE_ENDPOINT_UTIL_HPP
#include <service/endpoint.hpp>
namespace llarp
{
namespace service
{
struct EndpointUtil
{
static void
ExpireSNodeSessions(llarp_time_t now, Endpoint::SNodeSessions& sessions);
static void
ExpirePendingTx(llarp_time_t now, Endpoint::PendingLookups& lookups);
static void
ExpirePendingRouterLookups(llarp_time_t now,
Endpoint::PendingRouters& routers);
static void
DeregisterDeadSessions(llarp_time_t now, Endpoint::Sessions& sessions);
static void
TickRemoteSessions(llarp_time_t now, Endpoint::Sessions& remoteSessions,
Endpoint::Sessions& deadSessions);
static void
ExpireConvoSessions(llarp_time_t now, Endpoint::ConvoMap& sessions);
static void
StopRemoteSessions(Endpoint::Sessions& remoteSessions);
static void
StopSnodeSessions(Endpoint::SNodeSessions& sessions);
static bool
HasPathToService(const Address& addr,
const Endpoint::Sessions& remoteSessions);
};
} // namespace service
} // namespace llarp
#endif
Loading…
Cancel
Save