Merge remote-tracking branch 'origin/dev' into logic-thread-fix-2019-11-13

pull/912/head
Jeff Becker 5 years ago
commit b31d7b75fc
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -671,8 +671,7 @@ namespace llarp
#ifdef _WIN32
struct llarp_fd_promise
{
void
Set(std::pair< int, int >)
void Set(std::pair< int, int >)
{
}

@ -4,6 +4,7 @@
#include <link/session.hpp>
#include <router_id.hpp>
#include <util/bencode.hpp>
#include <path/path_types.hpp>
#include <vector>
@ -19,6 +20,8 @@ namespace llarp
ILinkSession* session = nullptr;
uint64_t version = LLARP_PROTO_VERSION;
PathID_t pathid;
ILinkMessage() = default;
virtual ~ILinkMessage() = default;

@ -12,7 +12,6 @@ namespace llarp
{
struct RelayUpstreamMessage : public ILinkMessage
{
PathID_t pathid;
Encrypted< MAX_LINK_MSG_SIZE - 128 > X;
TunnelNonce Y;
@ -37,7 +36,6 @@ namespace llarp
struct RelayDownstreamMessage : public ILinkMessage
{
PathID_t pathid;
Encrypted< MAX_LINK_MSG_SIZE - 128 > X;
TunnelNonce Y;

@ -322,6 +322,9 @@ namespace llarp
self->hop = nullptr;
}
// TODO: If decryption has succeeded here but we otherwise don't
// want to or can't accept the path build request, send
// a status message saying as much.
static void
HandleDecrypted(llarp_buffer_t* buf,
std::shared_ptr< LRCMFrameDecrypt > self)
@ -344,8 +347,16 @@ namespace llarp
return;
}
info.txID = self->record.txid;
info.rxID = self->record.rxid;
info.txID = self->record.txid;
info.rxID = self->record.rxid;
if(info.txID.IsZero() || info.rxID.IsZero())
{
llarp::LogError("LRCM refusing zero pathid");
self->decrypter = nullptr;
return;
}
info.upstream = self->record.nextHop;
// generate path key as we are in a worker thread

@ -31,8 +31,15 @@ namespace llarp
for(size_t idx = 0; idx < hsz; ++idx)
{
hops[idx].rc = h[idx];
hops[idx].txID.Randomize();
hops[idx].rxID.Randomize();
do
{
hops[idx].txID.Randomize();
} while(hops[idx].txID.IsZero());
do
{
hops[idx].rxID.Randomize();
} while(hops[idx].rxID.IsZero());
}
for(size_t idx = 0; idx < hsz - 1; ++idx)

@ -289,6 +289,7 @@ namespace llarp
{
if(itr->second->Expired(now))
{
m_Router->outboundMessageHandler().QueueRemoveEmptyPath(itr->first);
itr = map.erase(itr);
}
else

@ -8,6 +8,7 @@ namespace llarp
{
struct PathID_t final : public AlignedBuffer< PATHIDSIZE >
{
using Hash = AlignedBuffer< PATHIDSIZE >::Hash;
};
} // namespace llarp

@ -165,7 +165,7 @@ namespace llarp
void Builder::Tick(llarp_time_t)
{
const auto now = llarp::time_now_ms();
ExpirePaths(now);
ExpirePaths(now, m_router);
if(ShouldBuildMore(now))
BuildOne();
TickPaths(m_router);

@ -73,7 +73,7 @@ namespace llarp
}
void
PathSet::ExpirePaths(llarp_time_t now)
PathSet::ExpirePaths(llarp_time_t now, AbstractRouter* router)
{
Lock_t l(&m_PathsMutex);
if(m_Paths.size() == 0)
@ -83,6 +83,8 @@ namespace llarp
{
if(itr->second->Expired(now))
{
router->outboundMessageHandler().QueueRemoveEmptyPath(
itr->second->TXID());
itr = m_Paths.erase(itr);
}
else

@ -148,7 +148,7 @@ namespace llarp
GetByUpstream(RouterID remote, PathID_t rxid) const;
void
ExpirePaths(llarp_time_t now);
ExpirePaths(llarp_time_t now, AbstractRouter* router);
/// get the number of paths in this status
size_t
@ -199,15 +199,15 @@ namespace llarp
}
/// override me in subtype
virtual bool
HandleGotIntroMessage(std::shared_ptr< const dht::GotIntroMessage >)
virtual bool HandleGotIntroMessage(
std::shared_ptr< const dht::GotIntroMessage >)
{
return false;
}
/// override me in subtype
virtual bool
HandleGotRouterMessage(std::shared_ptr< const dht::GotRouterMessage >)
virtual bool HandleGotRouterMessage(
std::shared_ptr< const dht::GotRouterMessage >)
{
return false;
}

@ -20,9 +20,14 @@ namespace llarp
struct ILinkMessage;
struct RouterID;
struct PathID_t;
using SendStatusHandler = std::function< void(SendStatus) >;
static const size_t MAX_PATH_QUEUE_SIZE = 40;
static const size_t MAX_OUTBOUND_QUEUE_SIZE = 200;
static const size_t MAX_OUTBOUND_MESSAGES_PER_TICK = 20;
struct IOutboundMessageHandler
{
virtual ~IOutboundMessageHandler() = default;
@ -31,6 +36,12 @@ namespace llarp
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) = 0;
virtual void
Tick() = 0;
virtual void
QueueRemoveEmptyPath(const PathID_t &pathid) = 0;
virtual util::StatusObject
ExtractStatus() const = 0;
};

@ -12,6 +12,13 @@
namespace llarp
{
const PathID_t OutboundMessageHandler::zeroID;
OutboundMessageHandler::OutboundMessageHandler(size_t maxQueueSize)
: outboundQueue(maxQueueSize), removedPaths(20), removedSomePaths(false)
{
}
bool
OutboundMessageHandler::QueueMessage(const RouterID &remote,
const ILinkMessage *msg,
@ -31,8 +38,9 @@ namespace llarp
std::copy_n(buf.base, buf.sz, message.first.data());
if(SendIfSession(remote, message))
if(_linkManager->HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg->pathid);
return true;
}
@ -41,9 +49,13 @@ namespace llarp
util::Lock l(&_mutex);
// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair = outboundMessageQueue.emplace(remote, MessageQueue());
auto itr_pair =
pendingSessionMessageQueues.emplace(remote, MessageQueue());
itr_pair.first->second.push_back(std::move(message));
MessageQueueEntry entry;
entry.message = message;
entry.router = remote;
itr_pair.first->second.push(std::move(entry));
shouldCreateSession = itr_pair.second;
}
@ -56,6 +68,20 @@ namespace llarp
return true;
}
void
OutboundMessageHandler::Tick()
{
ProcessOutboundQueue();
RemoveEmptyPathQueues();
SendRoundRobin();
}
void
OutboundMessageHandler::QueueRemoveEmptyPath(const PathID_t &pathid)
{
removedPaths.pushBack(pathid);
}
// TODO: this
util::StatusObject
OutboundMessageHandler::ExtractStatus() const
@ -70,36 +96,38 @@ namespace llarp
{
_linkManager = linkManager;
_logic = logic;
outboundMessageQueues.emplace(zeroID, MessageQueue());
}
void
OutboundMessageHandler::OnSessionEstablished(const RouterID &router)
{
FinalizeRequest(router, SendStatus::Success);
FinalizeSessionRequest(router, SendStatus::Success);
}
void
OutboundMessageHandler::OnConnectTimeout(const RouterID &router)
{
FinalizeRequest(router, SendStatus::Timeout);
FinalizeSessionRequest(router, SendStatus::Timeout);
}
void
OutboundMessageHandler::OnRouterNotFound(const RouterID &router)
{
FinalizeRequest(router, SendStatus::RouterNotFound);
FinalizeSessionRequest(router, SendStatus::RouterNotFound);
}
void
OutboundMessageHandler::OnInvalidRouter(const RouterID &router)
{
FinalizeRequest(router, SendStatus::InvalidRouter);
FinalizeSessionRequest(router, SendStatus::InvalidRouter);
}
void
OutboundMessageHandler::OnNoLink(const RouterID &router)
{
FinalizeRequest(router, SendStatus::NoLink);
FinalizeSessionRequest(router, SendStatus::NoLink);
}
void
@ -190,34 +218,164 @@ namespace llarp
return false;
}
bool
OutboundMessageHandler::QueueOutboundMessage(const RouterID &remote,
Message &&msg,
const PathID_t &pathid)
{
MessageQueueEntry entry;
entry.message = std::move(msg);
auto callback_copy = entry.message.second;
entry.router = remote;
entry.pathid = pathid;
if(outboundQueue.tryPushBack(std::move(entry))
!= llarp::thread::QueueReturn::Success)
{
DoCallback(callback_copy, SendStatus::Congestion);
}
return true;
}
void
OutboundMessageHandler::FinalizeRequest(const RouterID &router,
SendStatus status)
OutboundMessageHandler::ProcessOutboundQueue()
{
while(not outboundQueue.empty())
{
// TODO: can we add util::thread::Queue::front() for move semantics here?
MessageQueueEntry entry = outboundQueue.popFront();
auto itr_pair =
outboundMessageQueues.emplace(entry.pathid, MessageQueue());
if(itr_pair.second && !entry.pathid.IsZero())
{
roundRobinOrder.push(entry.pathid);
}
MessageQueue &path_queue = itr_pair.first->second;
if(path_queue.size() >= MAX_PATH_QUEUE_SIZE)
{
path_queue.pop(); // head drop
}
path_queue.push(std::move(entry));
}
}
void
OutboundMessageHandler::RemoveEmptyPathQueues()
{
removedSomePaths = (not removedPaths.empty());
while(not removedPaths.empty())
{
auto itr = outboundMessageQueues.find(removedPaths.popFront());
if(itr != outboundMessageQueues.end())
{
outboundMessageQueues.erase(itr);
}
}
}
void
OutboundMessageHandler::SendRoundRobin()
{
// send non-routing messages first priority
auto &non_routing_mq = outboundMessageQueues[zeroID];
while(!non_routing_mq.empty())
{
MessageQueueEntry entry = std::move(non_routing_mq.front());
non_routing_mq.pop();
Send(entry.router, entry.message);
}
size_t empty_count = 0;
size_t num_queues = roundRobinOrder.size();
if(removedSomePaths)
{
for(size_t i = 0; i < num_queues; i++)
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
if(outboundMessageQueues.find(pathid) != outboundMessageQueues.end())
{
roundRobinOrder.push(std::move(pathid));
}
}
}
num_queues = roundRobinOrder.size();
size_t sent_count = 0;
if(num_queues == 0) // if no queues, return
{
return;
}
while(sent_count
< MAX_OUTBOUND_MESSAGES_PER_TICK) // TODO: better stop condition
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
auto &message_queue = outboundMessageQueues[pathid];
if(message_queue.size() > 0)
{
MessageQueueEntry entry = std::move(message_queue.front());
message_queue.pop();
Send(entry.router, entry.message);
empty_count = 0;
sent_count++;
}
else
{
empty_count++;
}
roundRobinOrder.push(std::move(pathid));
// if num_queues empty queues in a row, all queues empty.
if(empty_count == num_queues)
{
break;
}
}
}
void
OutboundMessageHandler::FinalizeSessionRequest(const RouterID &router,
SendStatus status)
{
MessageQueue movedMessages;
{
util::Lock l(&_mutex);
auto itr = outboundMessageQueue.find(router);
auto itr = pendingSessionMessageQueues.find(router);
if(itr == outboundMessageQueue.end())
if(itr == pendingSessionMessageQueues.end())
{
return;
}
movedMessages.splice(movedMessages.begin(), itr->second);
movedMessages.swap(itr->second);
outboundMessageQueue.erase(itr);
pendingSessionMessageQueues.erase(itr);
}
for(const auto &msg : movedMessages)
while(!movedMessages.empty())
{
MessageQueueEntry entry = std::move(movedMessages.front());
movedMessages.pop();
if(status == SendStatus::Success)
{
Send(router, msg);
Send(entry.router, entry.message);
}
else
{
DoCallback(msg.second, status);
DoCallback(entry.message.second, status);
}
}
}

@ -4,7 +4,9 @@
#include <router/i_outbound_message_handler.hpp>
#include <util/thread/logic.hpp>
#include <util/thread/queue.hpp>
#include <util/thread/threading.hpp>
#include <path/path_types.hpp>
#include <router_id.hpp>
#include <list>
@ -24,10 +26,18 @@ namespace llarp
public:
~OutboundMessageHandler() override = default;
OutboundMessageHandler(size_t maxQueueSize = MAX_OUTBOUND_QUEUE_SIZE);
bool
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) override LOCKS_EXCLUDED(_mutex);
void
Tick() override;
void
QueueRemoveEmptyPath(const PathID_t &pathid) override;
util::StatusObject
ExtractStatus() const override;
@ -35,8 +45,16 @@ namespace llarp
Init(ILinkManager *linkManager, std::shared_ptr< Logic > logic);
private:
using Message = std::pair< std::vector< byte_t >, SendStatusHandler >;
using MessageQueue = std::list< Message >;
using Message = std::pair< std::vector< byte_t >, SendStatusHandler >;
struct MessageQueueEntry
{
Message message;
PathID_t pathid;
RouterID router;
};
using MessageQueue = std::queue< MessageQueueEntry >;
void
OnSessionEstablished(const RouterID &router);
@ -71,17 +89,43 @@ namespace llarp
bool
SendIfSession(const RouterID &remote, const Message &msg);
bool
QueueOutboundMessage(const RouterID &remote, Message &&msg,
const PathID_t &pathid);
void
FinalizeRequest(const RouterID &router, SendStatus status)
ProcessOutboundQueue();
void
RemoveEmptyPathQueues();
void
SendRoundRobin();
void
FinalizeSessionRequest(const RouterID &router, SendStatus status)
LOCKS_EXCLUDED(_mutex);
mutable util::Mutex _mutex; // protects outboundMessageQueue
llarp::thread::Queue< MessageQueueEntry > outboundQueue;
llarp::thread::Queue< PathID_t > removedPaths;
bool removedSomePaths;
mutable util::Mutex _mutex; // protects pendingSessionMessageQueues
std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
outboundMessageQueue GUARDED_BY(_mutex);
pendingSessionMessageQueues GUARDED_BY(_mutex);
std::unordered_map< PathID_t, MessageQueue, PathID_t::Hash >
outboundMessageQueues;
std::queue< PathID_t > roundRobinOrder;
ILinkManager *_linkManager;
std::shared_ptr< Logic > _logic;
// paths cannot have pathid "0", so it can be used as the "pathid"
// for non-traffic (control) messages, so they can be prioritized.
static const PathID_t zeroID;
};
} // namespace llarp

@ -175,8 +175,12 @@ namespace llarp
{
if(_stopping.load())
return;
paths.PumpDownstream();
paths.PumpUpstream();
_outboundMessageHandler.Tick();
_linkManager.PumpLinks();
}

@ -65,7 +65,8 @@ namespace llarp
return m_CachedAddr.ToString();
}
bool ServiceInfo::CalculateAddress(std::array< byte_t, 32 >& data) const
bool
ServiceInfo::CalculateAddress(std::array< byte_t, 32 >& data) const
{
std::array< byte_t, 256 > tmp;
llarp_buffer_t buf(tmp);

@ -89,7 +89,8 @@ namespace llarp
}
/// calculate our address
bool CalculateAddress(std::array< byte_t, 32 >& data) const;
bool
CalculateAddress(std::array< byte_t, 32 >& data) const;
bool
BDecode(llarp_buffer_t* buf)

Loading…
Cancel
Save