more shit

pull/1658/head
Jeff Becker 3 years ago
parent 7a5dcc3eab
commit 719dd38cf5
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -31,7 +31,7 @@ def main():
ap.add_argument('--ip', type=str, default=None)
ap.add_argument('--ifname', type=str, default='lo')
ap.add_argument('--netid', type=str, default=None)
ap.add_argument('--loglevel', type=str, default='info')
ap.add_argument('--loglevel', type=str, default='debug')
args = ap.parse_args()
if args.valgrind:
@ -66,7 +66,8 @@ def main():
'dir': 'netdb'
}
config['network'] = {
'type' : 'null'
'type' : 'null',
'save-profiles': 'false'
}
config['api'] = {
'enabled': 'false'
@ -74,6 +75,9 @@ def main():
config['lokid'] = {
'enabled': 'false',
}
config["logging"] = {
"level": args.loglevel
}
d = os.path.join(args.dir, svcNodeName(nodeid))
if not os.path.exists(d):
os.mkdir(d)

@ -187,6 +187,14 @@ namespace llarp
util::Lock l(_mutex);
m_PersistingSessions[remote] = std::max(until, m_PersistingSessions[remote]);
if (auto maybe = SessionIsClient(remote))
{
if (*maybe)
{
// mark this as a client so we don't try to back connect
m_Clients.Upsert(remote);
}
}
}
void
@ -335,43 +343,43 @@ namespace llarp
return;
std::vector<RouterID> sessionsNeeded;
std::vector<RouterID> sessionsClosed;
{
util::Lock l(_mutex);
auto itr = m_PersistingSessions.begin();
while (itr != m_PersistingSessions.end())
for (auto [remote, until] : m_PersistingSessions)
{
if (now < itr->second)
if (now < until)
{
auto link = GetLinkWithSessionTo(itr->first);
auto link = GetLinkWithSessionTo(remote);
if (link)
{
link->KeepAliveSessionTo(itr->first);
link->KeepAliveSessionTo(remote);
}
else
else if (not m_Clients.Contains(remote))
{
sessionsNeeded.push_back(itr->first);
sessionsNeeded.push_back(remote);
}
++itr;
}
else
else if (not m_Clients.Contains(remote))
{
const RouterID r(itr->first);
LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr);
for (const auto& link : outboundLinks)
{
link->CloseSessionTo(r);
}
sessionsClosed.push_back(remote);
}
}
}
for (const auto& router : sessionsNeeded)
{
LogInfo("ensuring session to ", router, " for previously made commitment");
_sessionMaker->CreateSessionTo(router, nullptr);
}
ForEachOutboundLink([sessionsClosed](auto link) {
for (const auto& router : sessionsClosed)
{
link->CloseSessionTo(router);
}
});
}
void

@ -111,6 +111,8 @@ namespace llarp
std::unordered_map<RouterID, SessionStats> m_lastRouterStats;
util::DecayingHashSet<RouterID> m_Clients{path::default_lifetime};
IOutboundSessionMaker* _sessionMaker;
};

@ -204,6 +204,7 @@ namespace llarp
static void
OnForwardLRCMResult(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> path,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -236,9 +237,8 @@ namespace llarp
std::abort();
break;
}
router->QueueWork([router, pathid, nextHop, pathKey, status] {
LR_StatusMessage::CreateAndSend(router, pathid, nextHop, pathKey, status);
router->QueueWork([router, path, pathid, nextHop, pathKey, status] {
LR_StatusMessage::CreateAndSend(router, path, pathid, nextHop, pathKey, status);
});
}
@ -251,6 +251,7 @@ namespace llarp
llarp::LogError("duplicate transit hop ", self->hop->info);
LR_StatusMessage::CreateAndSend(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -269,6 +270,7 @@ namespace llarp
llarp::LogError("client path build hit limit ", *self->fromAddr);
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -288,6 +290,7 @@ namespace llarp
"not allowed, dropping build request on the floor");
OnForwardLRCMResult(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -308,6 +311,7 @@ namespace llarp
auto func = std::bind(
&OnForwardLRCMResult,
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,
@ -338,6 +342,7 @@ namespace llarp
if (!LR_StatusMessage::CreateAndSend(
self->context->Router(),
self->hop,
self->hop->info.rxID,
self->hop->info.downstream,
self->hop->pathKey,

@ -22,21 +22,21 @@ namespace llarp
std::array<EncryptedFrame, 8> frames;
uint64_t status = 0;
HopHandler_ptr path;
HopHandler_ptr hop;
AbstractRouter* router;
PathID_t pathid;
LRSM_AsyncHandler(
std::array<EncryptedFrame, 8> _frames,
uint64_t _status,
HopHandler_ptr _path,
HopHandler_ptr _hop,
AbstractRouter* _router,
const PathID_t& pathid)
: frames(std::move(_frames))
, status(_status)
, path(std::move(_path))
, router(_router)
, pathid(pathid)
: frames{std::move(_frames)}
, status{_status}
, hop{std::move(_hop)}
, router{_router}
, pathid{pathid}
{}
~LRSM_AsyncHandler() = default;
@ -45,8 +45,7 @@ namespace llarp
handle()
{
router->NotifyRouterEvent<tooling::PathStatusReceivedEvent>(router->pubkey(), pathid, status);
path->HandleLRSM(status, frames, router);
hop->HandleLRSM(status, frames, router);
}
void
@ -133,16 +132,13 @@ namespace llarp
}
auto path = router->pathContext().GetByUpstream(session->GetPubKey(), pathid);
if (!path)
if (not path)
{
llarp::LogWarn("unhandled LR_Status message: no associated path found pathid=", pathid);
return false;
}
auto handler = std::make_shared<LRSM_AsyncHandler>(frames, status, path, router, pathid);
handler->queue_handle();
return true;
}
@ -157,6 +153,7 @@ namespace llarp
bool
LR_StatusMessage::CreateAndSend(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> hop,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -169,12 +166,9 @@ namespace llarp
message->SetDummyFrames();
if (!message->AddFrame(pathKey, status))
{
return false;
}
message->AddFrame(pathKey, status);
QueueSendMessage(router, nextHop, message);
QueueSendMessage(router, nextHop, message, hop);
return true; // can't guarantee delivery here, as far as we know it's fine
}
@ -221,10 +215,19 @@ namespace llarp
void
LR_StatusMessage::QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg)
AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr<LR_StatusMessage> msg,
std::shared_ptr<path::TransitHop> hop)
{
router->loop()->call(
[router, nextHop, msg = std::move(msg)] { SendMessage(router, nextHop, msg); });
router->loop()->call([router, nextHop, msg = std::move(msg), hop = std::move(hop)] {
SendMessage(router, nextHop, msg);
// destroy hop as needed
if ((msg->status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
hop->QueueDestroySelf(router);
}
});
}
void

@ -18,6 +18,7 @@ namespace llarp
{
struct PathContext;
struct IHopHandler;
struct TransitHop;
} // namespace path
struct LR_StatusRecord
@ -86,6 +87,7 @@ namespace llarp
static bool
CreateAndSend(
AbstractRouter* router,
std::shared_ptr<path::TransitHop> hop,
const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey,
@ -96,7 +98,10 @@ namespace llarp
static void
QueueSendMessage(
AbstractRouter* router, const RouterID nextHop, std::shared_ptr<LR_StatusMessage> msg);
AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr<LR_StatusMessage> msg,
std::shared_ptr<path::TransitHop> hop);
static void
SendMessage(

@ -95,8 +95,9 @@ namespace llarp
typename Map_t,
typename Key_t,
typename CheckValue_t,
typename GetFunc_t>
HopHandler_ptr
typename GetFunc_t,
typename Return_ptr = HopHandler_ptr>
Return_ptr
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
Lock_t lock(map.first);
@ -172,6 +173,46 @@ namespace llarp
});
}
std::optional<std::weak_ptr<TransitHop>>
PathContext::TransitHopByInfo(const TransitHopInfo& info)
{
// this is ugly as sin
auto own = MapGet<
SyncTransitMap_t::Lock_t,
decltype(m_TransitPaths),
PathID_t,
std::function<bool(const std::shared_ptr<TransitHop>&)>,
std::function<TransitHop*(const std::shared_ptr<TransitHop>&)>,
TransitHop*>(
m_TransitPaths,
info.txID,
[info](const auto& hop) -> bool { return hop->info == info; },
[](const auto& hop) -> TransitHop* { return hop.get(); });
if (own)
return own->weak_from_this();
return std::nullopt;
}
std::optional<std::weak_ptr<TransitHop>>
PathContext::TransitHopByUpstream(const RouterID& upstream, const PathID_t& id)
{
// this is ugly as sin as well
auto own = MapGet<
SyncTransitMap_t::Lock_t,
decltype(m_TransitPaths),
PathID_t,
std::function<bool(const std::shared_ptr<TransitHop>&)>,
std::function<TransitHop*(const std::shared_ptr<TransitHop>&)>,
TransitHop*>(
m_TransitPaths,
id,
[upstream](const auto& hop) -> bool { return hop->info.upstream == upstream; },
[](const auto& hop) -> TransitHop* { return hop.get(); });
if (own)
return own->weak_from_this();
return std::nullopt;
}
HopHandler_ptr
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{

@ -77,6 +77,12 @@ namespace llarp
HopHandler_ptr
GetByDownstream(const RouterID& id, const PathID_t& path);
std::optional<std::weak_ptr<TransitHop>>
TransitHopByInfo(const TransitHopInfo&);
std::optional<std::weak_ptr<TransitHop>>
TransitHopByUpstream(const RouterID&, const PathID_t&);
PathSet_ptr
GetLocalPathSet(const PathID_t& id);

@ -64,22 +64,9 @@ namespace llarp
// TODO: add to IHopHandler some notion of "path status"
const uint64_t ourStatus = LR_StatusRecord::SUCCESS;
if (!msg->AddFrame(pathKey, ourStatus))
{
return false;
}
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg);
if ((status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
LogWarn(
"TransitHop received non-successful LR_StatusMessage, queueing "
"self-destruct status=",
LRStatusCodeToString(status));
QueueDestroySelf(r);
}
msg->AddFrame(pathKey, ourStatus);
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg, shared_from_this());
return true;
}

@ -185,6 +185,9 @@ namespace llarp
void
FlushDownstream(AbstractRouter* r) override;
void
QueueDestroySelf(AbstractRouter* r);
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
@ -202,9 +205,6 @@ namespace llarp
void
SetSelfDestruct();
void
QueueDestroySelf(AbstractRouter* r);
std::set<std::shared_ptr<TransitHop>, ComparePtr<std::shared_ptr<TransitHop>>> m_FlushOthers;
thread::Queue<RelayUpstreamMessage> m_UpstreamGather;
thread::Queue<RelayDownstreamMessage> m_DownstreamGather;

@ -38,6 +38,13 @@ namespace llarp
return m_Values.try_emplace(v, now).second;
}
/// upsert will insert or update a value with time as now
void
Upsert(const Val_t& v)
{
m_Values[v] = llarp::time_now_ms();
}
/// decay hashset entries
void
Decay(Time_t now = 0s)

Loading…
Cancel
Save