Merge pull request #630 from majestrate/master

recent stability stuff
pull/633/head
Jeff 5 years ago committed by GitHub
commit 674f272a46

@ -4,14 +4,17 @@
#
import requests
import json
import os
import sys
from collections import defaultdict as Dict
from requests.exceptions import RequestException
def jsonrpc(method, **args):
return requests.post('http://127.0.0.1:1190/', data=json.dumps(
{'method': method, 'params': args, 'id': 0}), headers={'content-type': 'application/json'}).json()
{'method': method, 'params': args, 'id': 'munin'}), headers={'content-type': 'application/json'}).json()
def exit_sessions_main():
@ -20,8 +23,8 @@ def exit_sessions_main():
print("graph_vlabel sessions")
print("graph_category network")
print("graph_info This graph shows the number of exit sessions on a lokinet exit")
print("lokinet.exit.sessions.info Number of exit sessions")
print("lokinet.exit.sessions.label sessions")
print("_exit_sessions.info Number of exit sessions")
print("_exit_sessions.label sessions")
else:
count = 0
try:
@ -29,7 +32,7 @@ def exit_sessions_main():
count = len(j['result'])
except RequestException:
pass
print("lokinet.exit.sessions {}".format(count))
print("_exit_sessions.value {}".format(count))
def peers_main():
@ -38,32 +41,39 @@ def peers_main():
print("graph_vlabel peers")
print("graph_category network")
print("graph_info This graph shows the number of node to node sessions of this lokinet router")
print("lokinet.peers.outbound.info Number of outbound lokinet peers")
print("lokinet.peers.inbound.info Number of inbound lokinet peers")
print("lokinet.peers.outbound.label outbound peers")
print("lokinet.peers.inbound.label inbound peers")
print("_peers_outbound.info Number of outbound lokinet peers")
print("_peers_inbound.info Number of inbound lokinet peers")
print("_peers_outbound.label outbound peers")
print("_peers_inbound.label inbound peers")
print("_peers_clients.info Number of lokinet client peers")
print("_peers_clients.label lokinet client peers")
else:
inbound = 0
outbound = 0
inbound = Dict(int)
outbound = Dict(int)
clients = Dict(int)
try:
j = jsonrpc("llarp.admin.link.neighboors")
for peer in j['result']:
if peer["outbound"]:
outbound += 1
if peer["svcnode"]:
if peer["outbound"]:
outbound[peer['ident']] += 1
else:
inbound[peer['ident']] += 1
else:
inbound += 1
clients[peer['ident']] += 1
except RequestException:
pass
print("lokinet.peers.outbound {}".format(outbound))
print("lokinet.peers.inbound {}".format(inbound))
print("_peers_outbound.value {}".format(len(outbound)))
print("_peers_inbound.value {}".format(len(inbound)))
print("_peers_clients.value {}".format(len(clients)))
if __name__ == '__main__':
if sys.argv[0] == 'lokinet-peers':
exe = os.path.basename(sys.argv[0]).lower()
if exe == 'lokinet_peers':
peers_main()
elif sys.argv[0] == 'lokinet-exit':
elif exe == 'lokinet_exit':
exit_sessions_main()
else:
print(
'please symlink this as `lokinet-peers` or `lokinet-exit` in munin plugins dir')
'please symlink this as `lokinet_peers` or `lokinet_exit` in munin plugins dir')
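
The renamed data fields (dots replaced with underscores) presumably follow munin's rule that a field name may only contain letters, digits and underscores, and the symlink-based dispatch matches munin's convention of selecting a plugin's mode by its executable name. For a quick manual check of the data source, the same RPC call the plugin makes can be issued by hand; the sketch below is illustrative only and assumes lokinet's JSON-RPC endpoint is listening on 127.0.0.1:1190, as in the script above.

import json
import requests

# query the local lokinet JSON-RPC endpoint directly (same call the plugin makes)
reply = requests.post(
    'http://127.0.0.1:1190/',
    data=json.dumps({'method': 'llarp.admin.link.neighboors',
                     'params': {}, 'id': 'munin'}),
    headers={'content-type': 'application/json'}).json()

for peer in reply.get('result', []):
    kind = 'router' if peer['svcnode'] else 'client'
    direction = 'outbound' if peer['outbound'] else 'inbound'
    print(peer['ident'], kind, direction)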

@ -85,6 +85,19 @@ decrement S by 1 and forward to dht peer who is next closest to
the SA of the IS. If S is greater than 3, don't store the IS and
discard this message.
acknowledge introduction message (AIM)
acknowledge the publishing of an introduction
{
A: "A",
P: published_to_counter,
T: transaction_id_uint64,
V: 0
}
increment P by 1 and forward to requester
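
As a rough illustration of the AIM layout and the relay rule above, the following sketch (not lokinet source; bencoding and the actual wire transport are omitted) builds an acknowledgement and bumps P at each hop on the way back to the requester.

def make_aim(txid, published_to=0):
    # A: message type, P: how many nodes the IS has been published to so far,
    # T: transaction id of the original publish request, V: protocol version
    return {'A': 'A', 'P': published_to, 'T': txid, 'V': 0}

def relay_aim(aim):
    # each hop on the path back to the requester increments P before forwarding
    relayed = dict(aim)
    relayed['P'] += 1
    return relayed

print(relay_aim(make_aim(txid=12345)))  # {'A': 'A', 'P': 1, 'T': 12345, 'V': 0}
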
find router contact message (FRCM)
@ -130,3 +143,11 @@ in response to an exploratory router lookup, where FRCM.E is provided and non zero
T: transaction_id_uint64,
V: 0
}
sent in reply to a dht request to indicate transaction timeout
{
A: "T",
T: transaction_id_uint64,
V: 0
}
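
On the requester side this reply simply closes out the matching transaction; a minimal sketch of handling it, with hypothetical bookkeeping for pending requests:

# pending DHT transactions, keyed by transaction id (hypothetical bookkeeping)
pending = {42: 'router lookup'}

def handle_reply(msg):
    # A == "T" marks the timeout reply described above
    if msg.get('A') == 'T' and msg['T'] in pending:
        print('dht transaction', msg['T'], 'timed out:', pending.pop(msg['T']))

handle_reply({'A': 'T', 'T': 42, 'V': 0})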

@ -48,7 +48,6 @@ namespace llarp
{
if(!ILinkLayer::Start(l))
return false;
/// TODO: change me to true when done
return false;
}

@ -127,6 +127,7 @@ namespace llarp
{
llarp::LogInfo("session to ", RouterID(itr->second->GetPubKey()),
" timed out");
itr->second->Close();
itr = m_AuthedLinks.erase(itr);
}
}
@ -145,6 +146,7 @@ namespace llarp
else
{
LogInfo("pending session at ", itr->first, " timed out");
itr->second->Close();
itr = m_Pending.erase(itr);
}
}

@ -93,9 +93,29 @@ namespace llarp
PathContext::ForwardLRCM(const RouterID& nextHop,
const std::array< EncryptedFrame, 8 >& frames)
{
auto msg = std::make_shared< const LR_CommitMessage >(frames);
LogDebug("forwarding LRCM to ", nextHop);
const LR_CommitMessage msg(frames);
return m_Router->SendToOrQueue(nextHop, &msg);
if(m_Router->HasSessionTo(nextHop))
{
return m_Router->SendToOrQueue(nextHop, msg.get());
}
const RouterID router = nextHop;
AbstractRouter* const r = m_Router;
m_Router->EnsureRouter(
nextHop, [msg, r, router](const std::vector< RouterContact >& found) {
if(found.size())
{
r->TryConnectAsync(found[0], 1);
r->SendToOrQueue(router, msg.get());
}
else
LogError("dropped LRCM to ", router,
" as we cannot find in via DHT");
});
LogInfo("we are not directly connected to ", router,
" so we need to do a lookup");
return true;
}
template < typename Map_t, typename Key_t, typename CheckValue_t,
typename GetFunc_t >

@ -84,7 +84,7 @@ namespace llarp
std::make_unique< RouterContact >(ctx->path->hops[ctx->idx].rc);
}
// build record
record.lifetime = path::default_lifetime;
record.version = LLARP_PROTO_VERSION;
record.txid = hop.txID;
record.rxid = hop.rxID;

@ -54,7 +54,7 @@ namespace llarp
Lock_t l(&m_PathsMutex);
for(const auto& item : m_Paths)
{
if(!item.second->Expired(futureTime))
if(item.second->IsReady() && !item.second->Expired(futureTime))
++num;
}
return num;

@ -1,8 +1,19 @@
#include <router/abstractrouter.hpp>
#include <nodedb.hpp>
namespace llarp
{
AbstractRouter::~AbstractRouter()
{
}
void
AbstractRouter::EnsureRouter(RouterID router, RouterLookupHandler handler)
{
std::vector< RouterContact > found(1);
if(nodedb()->Get(router, found[0]))
handler(found);
else
LookupRouter(router, handler);
}
} // namespace llarp

@ -214,8 +214,16 @@ namespace llarp
virtual bool
ConnectionToRouterAllowed(const RouterID &router) const = 0;
/// return true if we have at least 1 session to this router in either
/// direction
virtual bool
HasSessionTo(const RouterID &router) const = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
void
EnsureRouter(RouterID router, RouterLookupHandler handler);
};
} // namespace llarp

@ -61,16 +61,6 @@ struct TryConnectJob
return now > lastAttempt && now - lastAttempt > 5000;
}
void
Failed()
{
llarp::LogInfo("session to ", llarp::RouterID(rc.pubkey), " closed");
if(link)
link->CloseSessionTo(rc.pubkey);
// delete this
router->pendingEstablishJobs.erase(rc.pubkey);
}
void
Success()
{
@ -86,6 +76,8 @@ struct TryConnectJob
{
return Attempt();
}
// discard pending traffic on timeout
router->DiscardOutboundFor(rc.pubkey);
router->routerProfiling().MarkConnectTimeout(rc.pubkey);
if(router->routerProfiling().IsBad(rc.pubkey))
{
@ -532,8 +524,7 @@ namespace llarp
async_verify_context *ctx =
static_cast< async_verify_context * >(job->user);
auto router = ctx->router;
PubKey pk(job->rc.pubkey);
router->m_Clients.insert(pk);
const PubKey pk(job->rc.pubkey);
router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk));
delete ctx;
router->pendingVerifyRC.erase(pk);
@ -546,18 +537,12 @@ namespace llarp
async_verify_context *ctx =
static_cast< async_verify_context * >(job->user);
auto router = ctx->router;
PubKey pk(job->rc.pubkey);
const PubKey pk(job->rc.pubkey);
if(!job->valid)
{
if(ctx->establish_job)
{
// was an outbound attempt
ctx->establish_job->Failed();
}
delete ctx;
router->DiscardOutboundFor(pk);
router->pendingVerifyRC.erase(pk);
return;
}
// we're valid, which means it's already been committed to the nodedb
@ -569,7 +554,7 @@ namespace llarp
router->validRouters.erase(pk);
}
RouterContact rc = job->rc;
const RouterContact rc = job->rc;
router->validRouters.emplace(pk, rc);
@ -1274,10 +1259,17 @@ namespace llarp
LogDebug("keepalive to ", itr->first);
link->KeepAliveSessionTo(itr->first);
}
else if(m_Clients.count(itr->first) == 0)
else
{
LogDebug("establish to ", itr->first);
TryEstablishTo(itr->first);
RouterContact rc;
if(nodedb()->Get(itr->first, rc))
{
if(rc.IsPublicRouter())
{
LogDebug("establish to ", itr->first);
TryConnectAsync(rc, 5);
}
}
}
++itr;
}
@ -1359,12 +1351,12 @@ namespace llarp
if(selected->SendTo(remote, buf))
return;
}
for(const auto &link : outboundLinks)
for(const auto &link : inboundLinks)
{
if(link->SendTo(remote, buf))
return;
}
for(const auto &link : inboundLinks)
for(const auto &link : outboundLinks)
{
if(link->SendTo(remote, buf))
return;
@ -1385,7 +1377,6 @@ namespace llarp
dht()->impl->Nodes()->DelNode(k);
// remove from valid routers if it's a valid router
validRouters.erase(remote);
m_Clients.erase(remote);
LogInfo("Session to ", remote, " fully closed");
}
@ -1416,22 +1407,37 @@ namespace llarp
pendingEstablishJobs.erase(remote);
return;
}
// if for some reason we weren't given a link layer, pick one that has a session to the remote
if(!chosen)
{
DiscardOutboundFor(remote);
pendingEstablishJobs.erase(remote);
return;
for(const auto &link : inboundLinks)
{
if(link->HasSessionTo(remote))
{
chosen = link.get();
break;
}
}
for(const auto &link : outboundLinks)
{
if(link->HasSessionTo(remote))
{
chosen = link.get();
break;
}
}
}
while(itr->second.size())
{
llarp_buffer_t buf(itr->second.front());
if(!chosen->SendTo(remote, buf))
LogWarn("failed to send outbound message to ", remote, " via ",
LogWarn("failed to send queued outbound message to ", remote, " via ",
chosen->Name());
itr->second.pop();
}
pendingEstablishJobs.erase(remote);
outboundMessageQueue.erase(itr);
}
void

@ -301,9 +301,6 @@ namespace llarp
std::unordered_map< RouterID, llarp_time_t, RouterID::Hash >
m_PersistingSessions;
// RCs of connected clients
std::set< RouterID > m_Clients;
// lokinet routers from lokid, maps pubkey to when we think it will expire,
// set to max value right now
std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters;
@ -527,7 +524,7 @@ namespace llarp
const std::vector< RouterContact > &results);
bool
HasSessionTo(const RouterID &remote) const;
HasSessionTo(const RouterID &remote) const override;
void
HandleDHTLookupForTryEstablishTo(

@ -237,7 +237,7 @@ namespace llarp
[&](const ILinkSession* session, bool outbound) {
resp.emplace_back(
Response{{"ident", RouterID(session->GetPubKey()).ToString()},
{"addr", session->GetRemoteEndpoint().ToString()},
{"svcnode", session->GetRemoteRC().IsPublicRouter()},
{"outbound", outbound}});
},
false);

@ -123,14 +123,11 @@ namespace llarp
}
}
// tick active endpoints
for(const auto &item : m_Endpoints)
{
auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end())
{
itr->second->Tick(now);
++itr;
}
item.second->Tick(now);
}
/*
std::vector< RouterID > expired;
m_Router->nodedb()->visit([&](const RouterContact &rc) -> bool {
if(rc.IsExpired(now))
@ -146,6 +143,7 @@ namespace llarp
return false;
return true;
});
*/
}
bool

@ -125,10 +125,8 @@ namespace llarp
if(arg->error_code == UTP_ETIMEDOUT)
{
link->HandleTimeout(session);
utp_close(arg->socket);
}
else
session->Close();
session->Close();
}
return 0;
}
@ -329,8 +327,7 @@ namespace llarp
LinkLayer::NewOutboundSession(const RouterContact& rc,
const AddressInfo& addr)
{
return std::make_shared< OutboundSession >(
this, utp_create_socket(_utp_ctx), rc, addr);
return std::make_shared< OutboundSession >(this, NewSocket(), rc, addr);
}
uint64
@ -351,6 +348,7 @@ namespace llarp
return 0;
}
utp_read_drained(arg->socket);
utp_issue_deferred_acks(arg->context);
}
else
{
@ -396,7 +394,6 @@ namespace llarp
{
session->OnLinkEstablished(self);
}
return 0;
}

@ -189,10 +189,12 @@ namespace llarp
bool
Session::TimedOut(llarp_time_t now) const
{
if(state == eInitial || state == eLinkEstablished)
return false;
if(state == eConnecting)
return now - lastActive > 5000;
if(sendq.size() >= MaxSendQueueSize)
{
if(now <= lastSend)
return false;
return now - lastSend > 5000;
}
// let utp manage this
@ -546,6 +548,7 @@ namespace llarp
if(!itr->second.AppendData(out.cur, length))
{
LogError("inbound buffer is full");
m_RecvMsgs.erase(itr);
return false; // not enough room
}
// mutate key
@ -557,8 +560,6 @@ namespace llarp
if(remaining == 0)
{
// we done with this guy, prune next tick
itr->second.lastActive = 0;
ManagedBuffer buf{itr->second.buffer};
// resize
buf.underlying.sz = buf.underlying.cur - buf.underlying.base;
@ -567,6 +568,7 @@ namespace llarp
// process buffer
LogDebug("got message ", msgid, " from ", remoteAddr);
parent->HandleMessage(this, buf.underlying);
m_RecvMsgs.erase(itr);
}
return true;
}
@ -580,10 +582,12 @@ namespace llarp
{
if(state == eLinkEstablished || state == eSessionReady)
{
// only call shutdown and close when we are actually connected
// only call shutdown when we are actually connected
utp_shutdown(sock, SHUT_RDWR);
utp_close(sock);
}
utp_close(sock);
utp_set_userdata(sock, nullptr);
sock = nullptr;
LogDebug("utp_close ", remoteAddr);
}
}
