add graceful stop for path builders and friends

pull/181/head
Jeff Becker 5 years ago
parent d58fab8440
commit 21d1998a38
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -58,6 +58,9 @@ namespace llarp
HandleSignal(int sig);
private:
bool
Configure();
void
SigINT();

@ -11,6 +11,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <atomic>
namespace abyss
{
@ -77,6 +78,10 @@ namespace abyss
void
DropAllCalls();
/// close all connections and stop operation
void
Stop();
/// handle new outbound connection
void
Connected(llarp_tcp_conn* conn);
@ -108,6 +113,7 @@ namespace abyss
static void
OnTick(llarp_tcp_connecter* connect);
std::atomic< bool > m_Run;
llarp_tcp_connecter m_connect;
llarp_ev_loop* m_Loop;
std::deque< Call > m_PendingCalls;

@ -334,6 +334,7 @@ namespace abyss
JSONRPC::JSONRPC()
{
m_Run.store(true);
}
JSONRPC::~JSONRPC()
@ -344,8 +345,9 @@ namespace abyss
JSONRPC::QueueRPC(RPC_Method_t method, RPC_Params params,
HandlerFactory createHandler)
{
m_PendingCalls.emplace_back(std::move(method), std::move(params),
std::move(createHandler));
if(m_Run)
m_PendingCalls.emplace_back(std::move(method), std::move(params),
std::move(createHandler));
}
bool
@ -380,6 +382,11 @@ namespace abyss
void
JSONRPC::Connected(llarp_tcp_conn* conn)
{
if(!m_Run)
{
llarp_tcp_conn_close(conn);
return;
}
auto& front = m_PendingCalls.front();
ConnImpl* connimpl =
new ConnImpl(conn, std::move(front.method), std::move(front.params),
@ -389,6 +396,13 @@ namespace abyss
connimpl->SendRequest();
}
void
JSONRPC::Stop()
{
m_Run.store(false);
DropAllCalls();
}
void
JSONRPC::DropAllCalls()
{

@ -28,7 +28,7 @@ namespace llarp
}
bool
Context::ReloadConfig()
Context::Configure()
{
// llarp::LogInfo("loading config at ", configfile);
if(llarp_load_config(config, configfile.c_str()))
@ -41,7 +41,7 @@ namespace llarp
iter.user = this;
iter.visit = &iter_config;
llarp_config_iter(config, &iter);
return router->ReloadConfig(config);
return true;
}
void
@ -174,6 +174,7 @@ namespace llarp
// run net io thread
llarp::LogInfo("running mainloop");
llarp_ev_loop_run_single_process(mainloop, worker, logic);
// waits for router graceful stop
return 0;
}
@ -189,7 +190,34 @@ namespace llarp
if(sig == SIGHUP)
{
llarp::LogInfo("SIGHUP");
ReloadConfig();
if(router)
{
llarp_config *newconfig = nullptr;
llarp_new_config(&newconfig);
if(llarp_load_config(newconfig, configfile.c_str()))
{
llarp_free_config(&newconfig);
llarp::LogError("failed to load config file ", configfile);
return;
}
// validate config
if(!router->ValidateConfig(newconfig))
{
llarp::LogWarn("new configuration is invalid");
llarp_free_config(&newconfig);
return;
}
// reconfigure
if(!router->Reconfigure(newconfig))
{
llarp_free_config(&newconfig);
llarp::LogError("Failed to reconfigure so we will stop.");
router->Stop();
return;
}
llarp::LogInfo("router reconfigured");
llarp_free_config(&newconfig);
}
}
#endif
}
@ -197,24 +225,23 @@ namespace llarp
void
Context::SigINT()
{
Close();
if(router)
{
/// async stop router on sigint
router->Stop();
}
else
{
if(logic)
logic->stop();
llarp_ev_loop_stop(mainloop);
Close();
}
}
void
Context::Close()
{
llarp::LogDebug("stopping logic");
if(logic)
logic->stop();
llarp::LogDebug("stopping event loop");
if(mainloop)
llarp_ev_loop_stop(mainloop);
llarp::LogDebug("stop router");
if(router)
router->Stop();
llarp::LogDebug("stop workers");
if(worker)
llarp_threadpool_stop(worker);
@ -232,14 +259,18 @@ namespace llarp
llarp::LogDebug("free nodedb");
delete nodedb;
llarp::LogDebug("stopping event loop");
llarp_ev_loop_stop(mainloop);
llarp::LogDebug("free router");
delete router;
llarp::LogDebug("free logic");
delete logic;
if(router)
{
llarp::LogDebug("free router");
delete router;
router = nullptr;
}
if(logic)
{
llarp::LogDebug("free logic");
delete logic;
logic = nullptr;
}
}
bool
@ -247,7 +278,7 @@ namespace llarp
{
llarp_new_config(&config);
configfile = fname;
return ReloadConfig();
return Configure();
}
} // namespace llarp

@ -13,12 +13,36 @@ namespace llarp
void
Context::Tick(llarp_time_t now)
{
{
auto itr = m_Exits.begin();
while(itr != m_Exits.end())
{
itr->second->Tick(now);
++itr;
}
}
{
auto itr = m_Closed.begin();
while(itr != m_Closed.end())
{
if((*itr)->ShouldRemove())
itr = m_Closed.erase(itr);
else
++itr;
}
}
}
void
Context::Stop()
{
auto itr = m_Exits.begin();
while(itr != m_Exits.end())
{
itr->second->Tick(now);
++itr;
itr->second->Stop();
m_Closed.emplace_back(std::move(itr->second));
itr = m_Exits.erase(itr);
}
}

@ -24,6 +24,10 @@ namespace llarp
void
ClearAllEndpoints();
/// send close to all exit sessions and remove all sessions
void
Stop();
bool
AddExitEndpoint(const std::string &name, const Config_t &config);
@ -47,6 +51,7 @@ namespace llarp
std::unordered_map< std::string,
std::unique_ptr< llarp::handlers::ExitEndpoint > >
m_Exits;
std::list< std::unique_ptr< llarp::handlers::ExitEndpoint > > m_Closed;
};
} // namespace exit
} // namespace llarp

@ -92,6 +92,23 @@ namespace llarp
return true;
}
bool
BaseSession::Stop()
{
auto sendExitClose = [&](llarp::path::Path* p) {
if(p->SupportsAnyRoles(llarp::path::ePathRoleExit))
{
llarp::LogInfo(Name(), " closing exit path");
llarp::routing::CloseExitMessage msg;
if(!(msg.Sign(&router->crypto, m_ExitIdentity)
&& p->SendExitClose(&msg, router)))
llarp::LogWarn(Name(), " failed to send exit close message");
}
};
ForEachPath(sendExitClose);
return llarp::path::Builder::Stop();
}
bool
BaseSession::HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf,
uint64_t counter)

@ -42,6 +42,10 @@ namespace llarp
bool
Flush();
/// send close and stop session
bool
Stop() override;
bool
IsReady() const;

@ -248,6 +248,23 @@ namespace llarp
return m_IfAddr;
}
bool
ExitEndpoint::Stop()
{
for(auto &item : m_SNodeSessions)
item.second->Stop();
return true;
}
bool
ExitEndpoint::ShouldRemove() const
{
for(auto &item : m_SNodeSessions)
if(!item.second->ShouldRemove())
return false;
return true;
}
bool
ExitEndpoint::HasLocalMappedAddrFor(const llarp::PubKey &pk) const
{

@ -87,6 +87,12 @@ namespace llarp
bool
Start();
bool
Stop();
bool
ShouldRemove() const;
bool
HasLocalMappedAddrFor(const llarp::PubKey& pk) const;

@ -450,6 +450,14 @@ namespace llarp
Endpoint::Tick(now);
}
bool
TunEndpoint::Stop()
{
if(m_Exit)
m_Exit->Stop();
return llarp::service::Endpoint::Stop();
}
void
TunEndpoint::FlushSend()
{

@ -46,6 +46,9 @@ namespace llarp
bool
Start() override;
bool
Stop() override;
bool
IsSNode() const;

@ -733,6 +733,16 @@ namespace llarp
return SendRoutingMessage(msg, r);
}
bool
Path::SendExitClose(const llarp::routing::CloseExitMessage* msg,
llarp::Router* r)
{
llarp::LogInfo(Name(), " closing exit to ", Endpoint());
// mark as not exit anymore
_role &= ~ePathRoleExit;
return SendRoutingMessage(msg, r);
}
bool
Path::HandleObtainExitMessage(const llarp::routing::ObtainExitMessage* msg,
llarp::Router* r)

@ -505,6 +505,10 @@ namespace llarp
SendExitRequest(const llarp::routing::ObtainExitMessage* msg,
llarp::Router* r);
bool
SendExitClose(const llarp::routing::CloseExitMessage* msg,
llarp::Router* r);
protected:
llarp::routing::InboundMessageParser m_InboundMessageParser;

@ -12,7 +12,7 @@ namespace llarp
struct AsyncPathKeyExchangeContext
{
typedef llarp::path::Path Path_t;
typedef llarp::path::PathSet PathSet_t;
typedef llarp::path::Builder PathSet_t;
PathSet_t* pathset = nullptr;
Path_t* path = nullptr;
typedef std::function< void(AsyncPathKeyExchangeContext< User >*) > Handler;
@ -135,14 +135,15 @@ namespace llarp
}
};
void
pathbuilder_generated_keys(AsyncPathKeyExchangeContext< path::Builder >* ctx)
static void
PathBuilderKeysGenerated(AsyncPathKeyExchangeContext< path::Builder >* ctx)
{
RouterID remote = ctx->path->Upstream();
const ILinkMessage* msg = &ctx->LRCM;
if(!ctx->router->SendToOrQueue(remote, msg))
{
llarp::LogError("failed to send LRCM");
llarp::LogError("failed to send LRCM to ", remote);
ctx->pathset->keygens--;
return;
}
@ -150,6 +151,8 @@ namespace llarp
ctx->router->PersistSessionUntil(remote, ctx->path->ExpireTime());
// add own path
ctx->router->paths.AddOwnPath(ctx->pathset, ctx->path);
// decrement keygen counter
ctx->pathset->keygens--;
}
namespace path
@ -163,6 +166,8 @@ namespace llarp
{
p_router->paths.AddPathBuilder(this);
p_router->crypto.encryption_keygen(enckey);
_run.store(true);
keygens.store(0);
}
Builder::~Builder()
@ -189,6 +194,21 @@ namespace llarp
return false;
}
bool
Builder::Stop()
{
_run.store(false);
return true;
}
bool
Builder::ShouldRemove() const
{
if(_run)
return false;
return keygens.load() > 0;
}
const byte_t*
Builder::GetTunnelEncryptionSecretKey() const
{
@ -249,6 +269,8 @@ namespace llarp
void
Builder::Build(const std::vector< RouterContact >& hops, PathRole roles)
{
if(!_run)
return;
lastBuild = Now();
// async generate keys
AsyncPathKeyExchangeContext< Builder >* ctx =
@ -258,8 +280,9 @@ namespace llarp
auto path = new llarp::path::Path(hops, this, roles);
path->SetBuildResultHook(std::bind(&llarp::path::Builder::HandlePathBuilt,
this, std::placeholders::_1));
++keygens;
ctx->AsyncGenerateKeys(path, router->logic, router->tp, this,
&pathbuilder_generated_keys);
&PathBuilderKeysGenerated);
}
void

@ -2,6 +2,7 @@
#define LLARP_PATHBUILDER_HPP_
#include <pathset.hpp>
#include <atomic>
namespace llarp
{
@ -12,12 +13,21 @@ namespace llarp
struct Builder : public PathSet
{
protected:
/// flag for PathSet::Stop()
std::atomic< bool > _run;
public:
llarp::Router* router;
struct llarp_dht_context* dht;
llarp::SecretKey enckey;
size_t numHops;
llarp_time_t lastBuild = 0;
llarp_time_t buildIntervalLimit = MIN_PATH_BUILD_INTERVAL;
// how many keygens are currently happening
std::atomic< uint8_t > keygens;
/// construct
Builder(llarp::Router* p_router, struct llarp_dht_context* p_dht,
size_t numPaths, size_t numHops);
@ -31,6 +41,12 @@ namespace llarp
virtual bool
ShouldBuildMore(llarp_time_t now) const;
virtual bool
Stop();
bool
ShouldRemove() const override;
llarp_time_t
Now() const;

@ -101,6 +101,15 @@ namespace llarp
virtual llarp_time_t
Now() const = 0;
/// stop this pathset and mark it as to be removed
virtual bool
Stop() = 0;
/// return true if we can and should remove this pathset and underlying
/// resources from its parent context
virtual bool
ShouldRemove() const = 0;
/// return true if we should build another path
virtual bool
ShouldBuildMore(llarp_time_t now) const;
@ -183,6 +192,18 @@ namespace llarp
protected:
size_t m_NumPaths;
void
ForEachPath(std::function< void(Path*) > visit)
{
Lock_t lock(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
visit(itr->second);
++itr;
}
}
private:
using PathInfo_t = std::pair< RouterID, PathID_t >;

@ -207,6 +207,8 @@ namespace llarp
#else
disk = llarp_init_threadpool(1, "llarp-diskio");
#endif
_stopping.store(false);
_running.store(false);
}
Router::~Router()
@ -218,6 +220,9 @@ namespace llarp
Router::HandleRecvLinkMessageBuffer(llarp::ILinkSession *session,
llarp_buffer_t buf)
{
if(_stopping)
return true;
if(!session)
{
llarp::LogWarn("no link session");
@ -440,16 +445,11 @@ namespace llarp
void
Router::Close()
{
for(const auto &link : inboundLinks)
{
link->Stop();
}
if(logic)
logic->stop();
llarp_ev_loop_stop(netloop);
inboundLinks.clear();
if(outboundLink)
{
outboundLink->Stop();
outboundLink.reset(nullptr);
}
outboundLink.reset(nullptr);
}
void
@ -852,6 +852,8 @@ namespace llarp
bool
Router::Run(struct llarp_nodedb *nodedb)
{
if(_running || _stopping)
return false;
this->nodedb = nodedb;
if(enableRPCServer)
@ -1026,14 +1028,49 @@ namespace llarp
}
llarp_dht_context_start(dht, pubkey());
ScheduleTicker(1000);
return true;
_running.store(true);
return _running;
}
static void
RouterAfterStopLinks(void *u, uint64_t, uint64_t)
{
Router *self = static_cast< Router * >(u);
self->Close();
}
static void
RouterAfterStopIssued(void *u, uint64_t, uint64_t)
{
Router *self = static_cast< Router * >(u);
self->StopLinks();
self->logic->call_later({200, self, &RouterAfterStopLinks});
}
void
Router::StopLinks()
{
llarp::LogInfo("stopping links");
outboundLink->Stop();
for(auto &link : inboundLinks)
link->Stop();
}
void
Router::Stop()
{
this->Close();
this->routerProfiling.Save(this->routerProfilesFile.c_str());
if(!_running)
return;
if(_stopping)
return;
_stopping.store(true);
llarp::LogInfo("stopping router");
hiddenServiceContext.StopAll();
exitContext.Stop();
if(rpcServer)
rpcServer->Stop();
logic->call_later({200, this, &RouterAfterStopIssued});
}
bool
@ -1076,8 +1113,68 @@ namespace llarp
return exitContext.AddExitEndpoint("default-connectivity", netConfig);
}
/// validate a new configuration against an already made and running
/// router
struct RouterConfigValidator
{
static void
ValidateEntry(llarp_config_iterator *i, const char *section,
const char *key, const char *val)
{
RouterConfigValidator *self =
static_cast< RouterConfigValidator * >(i->user);
if(self->valid)
{
if(!self->OnEntry(section, key, val))
{
llarp::LogError("invalid entry in section [", section, "]: '", key,
"'='", val, "'");
self->valid = false;
}
}
}
const Router *router;
llarp_config *config;
bool valid;
RouterConfigValidator(const Router *r, llarp_config *conf)
: router(r), config(conf), valid(true)
{
}
/// checks the (section, key, value) config tuple
/// return false if that entry conflicts
/// with existing configuration in router
bool
OnEntry(const char *, const char *, const char *) const
{
// TODO: implement me
return true;
}
/// do validation
/// return true if this config is valid
/// return false if this config is not valid
bool
Validate()
{
llarp_config_iterator iter;
iter.user = this;
iter.visit = &ValidateEntry;
llarp_config_iter(config, &iter);
return valid;
}
};
bool
Router::ValidateConfig(llarp_config *conf) const
{
RouterConfigValidator validator(this, conf);
return validator.Validate();
}
bool
Router::ReloadConfig(__attribute__((unused)) const llarp_config *conf)
Router::Reconfigure(llarp_config *)
{
// TODO: implement me
return true;

@ -229,9 +229,14 @@ namespace llarp
bool
Run(struct llarp_nodedb *nodedb);
/// stop running the router logic gracefully
void
Stop();
/// close all sessions and shutdown all links
void
StopLinks();
void
PersistSessionUntil(const llarp::RouterID &remote, llarp_time_t until);
@ -262,8 +267,15 @@ namespace llarp
void
try_connect(fs::path rcfile);
/// inject configuration and reconfigure router
bool
Reconfigure(llarp_config *conf);
/// validate new configuration against old one
/// return true on 100% valid
/// return false if not 100% valid
bool
ReloadConfig(const llarp_config *conf);
ValidateConfig(llarp_config *conf) const;
/// send to remote router or queue for sending
/// returns false on overflow
@ -365,6 +377,12 @@ namespace llarp
HandleAsyncLoadRCForSendTo(llarp_async_load_rc *async);
private:
std::atomic< bool > _stopping;
std::atomic< bool > _running;
bool
UpdateOurRC(bool rotateKeys = true);
template < typename Config >
void
mergeHiddenServiceConfig(const Config &in, Config &out)

@ -284,6 +284,12 @@ namespace llarp
{
}
void
Stop()
{
_handler.Close();
}
bool
Start(const std::string& addr)
{
@ -313,6 +319,11 @@ namespace llarp
{
return true;
}
void
Stop()
{
}
};
struct CallerImpl
@ -331,6 +342,11 @@ namespace llarp
return true;
}
void
Stop()
{
}
void
Tick(llarp_time_t now)
{
@ -349,6 +365,12 @@ namespace llarp
delete m_Impl;
}
void
Caller::Stop()
{
m_Impl->Stop();
}
bool
Caller::Start(const std::string& addr)
{
@ -370,6 +392,12 @@ namespace llarp
delete m_Impl;
}
void
Server::Stop()
{
m_Impl->Stop();
}
bool
Server::Start(const std::string& addr)
{

@ -25,6 +25,10 @@ namespace llarp
bool
Start(const std::string& bindaddr);
/// stop and close
void
Stop();
private:
ServerImpl* m_Impl;
};
@ -41,6 +45,10 @@ namespace llarp
bool
Start(const std::string& remote);
/// stop and close
void
Stop();
/// test if a router is valid
bool
VerifyRouter(const llarp::PubKey& pk);

@ -12,22 +12,57 @@ namespace llarp
}
Context::~Context()
{
}
bool
Context::StopAll()
{
auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end())
{
itr->second->Stop();
m_Stopped.emplace_back(std::move(itr->second));
itr = m_Endpoints.erase(itr);
}
return true;
}
bool
Context::RemoveEndpoint(const std::string &name)
{
auto itr = m_Endpoints.find(name);
if(itr == m_Endpoints.end())
return false;
std::unique_ptr< Endpoint > ep = std::move(itr->second);
m_Endpoints.erase(itr);
ep->Stop();
m_Stopped.emplace_back(std::move(ep));
return true;
}
void
Context::Tick(llarp_time_t now)
{
auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end())
// erase stopped endpoints that are done
{
itr->second->Tick(now);
++itr;
auto itr = m_Stopped.begin();
while(itr != m_Stopped.end())
{
if((*itr)->ShouldRemove())
itr = m_Stopped.erase(itr);
else
++itr;
}
}
// tick active endpoints
{
auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end())
{
itr->second->Tick(now);
++itr;
}
}
}

@ -21,6 +21,10 @@ namespace llarp
void
Tick(llarp_time_t now);
/// stop all held services
bool
StopAll();
bool
hasEndpoints();
@ -64,13 +68,20 @@ namespace llarp
MapAddressAll(const llarp::service::Address &addr,
llarp::Addr &localPrivateIpAddr);
/// add default endpoint with options
bool
AddDefaultEndpoint(
const std::unordered_multimap< std::string, std::string > &opts);
/// add endpoint via config
bool
AddEndpoint(const Config::section_t &conf, bool autostart = false);
/// stop and remove an endpoint by name
/// return false if we don't have the hidden service with that name
bool
RemoveEndpoint(const std::string &name);
bool
StartAll();
@ -78,6 +89,7 @@ namespace llarp
llarp::Router *m_Router;
std::unordered_map< std::string, std::unique_ptr< Endpoint > >
m_Endpoints;
std::list< std::unique_ptr< Endpoint > > m_Stopped;
};
} // namespace service
} // namespace llarp

@ -251,10 +251,10 @@ namespace llarp
{
if(itr->second->Tick(now))
{
m_DeadSessions
.insert(std::make_pair(itr->first, std::move(itr->second)))
->second->markedBad = true;
itr = m_RemoteSessions.erase(itr);
itr->second->Stop();
m_DeadSessions.insert(
std::make_pair(itr->first, std::move(itr->second)));
itr = m_RemoteSessions.erase(itr);
}
else
++itr;
@ -273,10 +273,33 @@ namespace llarp
}
}
bool
Endpoint::OutboundContext::Stop()
{
markedBad = true;
return llarp::path::Builder::Stop();
}
bool
Endpoint::Stop()
{
// stop remote sessions
for(auto& item : m_RemoteSessions)
{
item.second->Stop();
}
// stop snode sessions
for(auto& item : m_SNodeSessions)
{
item.second->Stop();
}
return llarp::path::Builder::Stop();
}
bool
Endpoint::OutboundContext::IsDone(llarp_time_t now) const
{
return now - lastGoodSend > DEFAULT_PATH_LIFETIME;
return now - lastGoodSend > DEFAULT_PATH_LIFETIME && ShouldRemove();
}
uint64_t

@ -74,13 +74,6 @@ namespace llarp
virtual bool
Start();
virtual bool
Stop()
{
// TODO: implement me
return false;
}
virtual std::string
Name() const;
@ -143,6 +136,14 @@ namespace llarp
void
EnsureRouterIsKnown(const RouterID& router);
/// lookup a router via closest path
bool
LookupRouterAnon(RouterID router);
/// stop this endpoint
bool
Stop() override;
const Identity&
GetIdentity() const
{
@ -247,6 +248,9 @@ namespace llarp
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext();
bool
Stop() override;
bool
HandleDataDrop(path::Path* p, const PathID_t& dst, uint64_t s);

@ -295,7 +295,7 @@ namespace llarp
llarp::LogWarn(info, " failed to transfer data message, encode failed");
return SendRoutingMessage(&discarded, r);
}
// rewind0
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send

Loading…
Cancel
Save