Merge pull request #936 from majestrate/dev

last changes before 0.6.0 version bump
pull/939/head
Jeff 5 years ago committed by GitHub
commit 27b1e36039
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -843,17 +843,12 @@ namespace libuv
{
if(m_Run)
{
#ifdef LOKINET_DEBUG
if((uv_now(&m_Impl) - last_time) > 1000)
{
llarp::LogInfo("UV EVENT LOOP TICKS LAST SECOND: ", loop_run_count,
", LOGIC THREAD JOBS: ", m_Logic->numPendingJobs());
loop_run_count = 0;
last_time = uv_now(&m_Impl);
}
loop_run_count++;
#endif
uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0);
uv_run(&m_Impl, UV_RUN_ONCE);
}

@ -22,11 +22,14 @@ namespace llarp
ILinkSession::Packet_t
OutboundMessage::XMIT() const
{
auto xmit = CreatePacket(Command::eXMIT, 10 + 32);
size_t extra = std::min(m_Data.size(), FragmentSize);
auto xmit = CreatePacket(Command::eXMIT, 10 + 32 + extra, 0, 0);
htobe16buf(xmit.data() + CommandOverhead + PacketOverhead, m_Data.size());
htobe64buf(xmit.data() + 2 + CommandOverhead + PacketOverhead, m_MsgID);
std::copy_n(m_Digest.begin(), m_Digest.size(),
xmit.data() + 10 + CommandOverhead + PacketOverhead);
std::copy_n(m_Data.data(), extra,
xmit.data() + 10 + CommandOverhead + PacketOverhead + 32);
return xmit;
}
@ -111,7 +114,7 @@ namespace llarp
llarp_time_t now)
: m_Data(size_t{sz})
, m_Digset{std::move(h)}
, m_MsgID{msgid}
, m_MsgID(msgid)
, m_LastActiveAt{now}
{
}
@ -125,7 +128,6 @@ namespace llarp
LogWarn("invalid fragment offset ", idx);
return;
}
byte_t *dst = m_Data.data() + idx;
std::copy_n(buf.base, buf.sz, dst);
m_Acks.set(idx / FragmentSize);
@ -189,6 +191,5 @@ namespace llarp
CryptoManager::instance()->shorthash(gotten, buf);
return gotten == m_Digset;
}
} // namespace iwp
} // namespace llarp
} // namespace llarp

@ -191,7 +191,10 @@ namespace llarp
OutboundMessage{msgid, std::move(buf), now, completed})
.first->second;
EncryptAndSend(msg.XMIT());
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
if(buf.size() > FragmentSize)
{
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
LogDebug("send message ", msgid);
return true;
}
@ -688,10 +691,43 @@ namespace llarp
}
}
{
auto itr = m_RXMsgs.find(rxid);
const auto now = m_Parent->Now();
auto itr = m_RXMsgs.find(rxid);
if(itr == m_RXMsgs.end())
m_RXMsgs.emplace(
rxid, InboundMessage{rxid, sz, std::move(h), m_Parent->Now()});
{
itr = m_RXMsgs
.emplace(
rxid,
InboundMessage{rxid, sz, std::move(h), m_Parent->Now()})
.first;
auto _sizeDelta = data.size()
- (CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t)
+ PacketOverhead + 32);
if(_sizeDelta == 0)
{
sz = std::min(sz, uint16_t{FragmentSize});
{
const llarp_buffer_t buf(data.data() + (data.size() - sz), sz);
itr->second.HandleData(0, buf, now);
if(not itr->second.IsCompleted())
{
return;
}
if(not itr->second.Verify())
{
LogError("bad short xmit hash from ", m_RemoteAddr);
return;
}
}
auto msg = std::move(itr->second);
const llarp_buffer_t buf(msg.m_Data);
m_Parent->HandleMessage(this, buf);
m_ReplayFilter.emplace(rxid, m_Parent->Now());
m_SendMACKs.emplace(rxid);
m_RXMsgs.erase(rxid);
}
}
else
LogDebug("got duplicate xmit on ", rxid, " from ", m_RemoteAddr);
}

@ -67,6 +67,9 @@ namespace llarp
virtual size_t
NumberOfConnectedClients() const = 0;
virtual size_t
NumberOfPendingConnections() const = 0;
virtual bool
GetRandomConnectedRouter(RouterContact &router) const = 0;

@ -241,6 +241,23 @@ namespace llarp
return connectedClients.size();
}
size_t
LinkManager::NumberOfPendingConnections() const
{
size_t pending = 0;
for(const auto &link : inboundLinks)
{
pending += link->NumberOfPendingSessions();
}
for(const auto &link : outboundLinks)
{
pending += link->NumberOfPendingSessions();
}
return pending;
}
bool
LinkManager::GetRandomConnectedRouter(RouterContact &router) const
{

@ -66,6 +66,9 @@ namespace llarp
size_t
NumberOfConnectedClients() const override;
size_t
NumberOfPendingConnections() const override;
bool
GetRandomConnectedRouter(RouterContact &router) const override;

@ -198,6 +198,14 @@ namespace llarp
// void
// RemovePending(ILinkSession* s) LOCKS_EXCLUDED(m_PendingMutex);
/// count the number of sessions that are yet to be fully connected
size_t
NumberOfPendingSessions() const
{
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
return m_Pending.size();
}
private:
static void
on_timer_tick(void* user, uint64_t orig, uint64_t left)

@ -55,11 +55,9 @@ namespace llarp
PathContext::FindOwnedPathsWithEndpoint(const RouterID& r)
{
EndpointPathPtrSet found;
m_OurPaths.ForEach([&](const PathSet_ptr& set) {
set->ForEachPath([&](const Path_ptr& p) {
if(p->Endpoint() == r && p->IsReady())
found.insert(p);
});
m_OurPaths.ForEach([&](const Path_ptr& p) {
if(p->Endpoint() == r && p->IsReady())
found.insert(p);
});
return found;
}
@ -151,8 +149,8 @@ namespace llarp
PathContext::AddOwnPath(PathSet_ptr set, Path_ptr path)
{
set->AddPath(path);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->TXID(), set);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->RXID(), set);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->TXID(), path);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->RXID(), path);
}
bool
@ -170,13 +168,11 @@ namespace llarp
{
auto own = MapGet< SyncOwnedPathsMap_t::Lock_t >(
m_OurPaths, id,
[](const PathSet_ptr) -> bool {
[](const Path_ptr) -> bool {
// TODO: is this right?
return true;
},
[remote, id](PathSet_ptr p) -> HopHandler_ptr {
return p->GetByUpstream(remote, id);
});
[](Path_ptr p) -> HopHandler_ptr { return p; });
if(own)
return own;
@ -222,7 +218,7 @@ namespace llarp
auto itr = map.second.find(id);
if(itr != map.second.end())
{
return itr->second;
return itr->second->m_PathSet->GetSelf();
}
return nullptr;
}
@ -260,7 +256,7 @@ namespace llarp
PathContext::PumpUpstream()
{
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->UpstreamFlush(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(m_Router); });
}
void
@ -268,7 +264,7 @@ namespace llarp
{
m_TransitPaths.ForEach(
[&](auto& ptr) { ptr->FlushDownstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->DownstreamFlush(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushDownstream(m_Router); });
}
void
@ -296,6 +292,20 @@ namespace llarp
++itr;
}
}
{
SyncOwnedPathsMap_t::Lock_t lock(&m_OurPaths.first);
auto& map = m_OurPaths.second;
auto itr = map.begin();
while(itr != map.end())
{
if(itr->second->Expired(now))
{
itr = map.erase(itr);
}
else
++itr;
}
}
}
routing::MessageHandler_ptr
@ -323,19 +333,8 @@ namespace llarp
return nullptr;
}
void
PathContext::RemovePathSet(PathSet_ptr set)
void PathContext::RemovePathSet(PathSet_ptr)
{
SyncOwnedPathsMap_t::Lock_t lock(&m_OurPaths.first);
auto& map = m_OurPaths.second;
auto itr = map.begin();
while(itr != map.end())
{
if(itr->second.get() == set.get())
itr = map.erase(itr);
else
++itr;
}
}
} // namespace path
} // namespace llarp

@ -12,6 +12,7 @@
#include <util/types.hpp>
#include <memory>
#include <unordered_map>
namespace llarp
{
@ -104,7 +105,8 @@ namespace llarp
void
RemovePathSet(PathSet_ptr set);
using TransitHopsMap_t = std::multimap< PathID_t, TransitHop_ptr >;
using TransitHopsMap_t =
std::unordered_multimap< PathID_t, TransitHop_ptr, PathID_t::Hash >;
struct SyncTransitMap_t
{
@ -125,7 +127,8 @@ namespace llarp
};
// maps path id -> pathset owner of path
using OwnedPathsMap_t = std::map< PathID_t, PathSet_ptr >;
using OwnedPathsMap_t =
std::unordered_map< PathID_t, Path_ptr, PathID_t::Hash >;
struct SyncOwnedPathsMap_t
{
@ -135,7 +138,7 @@ namespace llarp
OwnedPathsMap_t second GUARDED_BY(first);
void
ForEach(std::function< void(const PathSet_ptr&) > visit)
ForEach(std::function< void(const Path_ptr&) > visit)
{
Lock_t lock(&first);
for(const auto& item : second)

@ -680,8 +680,12 @@ namespace llarp
_linkManager.CheckPersistingSessions(now);
const size_t connected = NumberOfConnectedRouters();
const size_t N = nodedb()->num_loaded();
size_t connected = NumberOfConnectedRouters();
if(not isSvcNode)
{
connected += _linkManager.NumberOfPendingConnections();
}
const size_t N = nodedb()->num_loaded();
if(N < llarp::path::default_len)
{
LogInfo("We need at least ", llarp::path::default_len,
@ -1080,8 +1084,16 @@ namespace llarp
}
void
Router::ConnectToRandomRouters(int want)
Router::ConnectToRandomRouters(int _want)
{
const size_t want = _want;
auto connected = NumberOfConnectedRouters();
if(not IsServiceNode())
{
connected += _linkManager.NumberOfPendingConnections();
}
if(connected >= want)
return;
_outboundSessionMaker.ConnectToRandomRouters(want, Now());
}

@ -96,7 +96,8 @@ namespace llarp
}
if(m_Thread->LooksFull(5))
{
LogErrorExplicit(TAG, LINE, "holy crap, we are trying to queue a job "
LogErrorExplicit(TAG, LINE,
"holy crap, we are trying to queue a job "
"onto the logic thread but it looks full");
METRIC("full");
std::abort();

Loading…
Cancel
Save