Make outbound message queue PumpLL again if it doesn't send all

pull/1795/head
Jason Rhinelander 3 years ago
parent faf95cbd0a
commit 633431be66

@ -1,9 +1,7 @@
#include "outbound_message_handler.hpp"
#include <llarp/messages/link_message.hpp>
#include "i_outbound_session_maker.hpp"
#include "i_rc_lookup_handler.hpp"
#include <llarp/link/i_link_manager.hpp>
#include "router.hpp"
#include <llarp/constants/link_layer.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/util/status.hpp>
@ -26,7 +24,8 @@ namespace llarp
const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->SessionIsAllowed(remote))
if (not _router->linkManager().SessionIsClient(remote)
and not _router->rcLookupHandler().SessionIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
return true;
@ -47,7 +46,7 @@ namespace llarp
std::copy_n(buf.base, buf.sz, message.first.data());
// if we have a session to the destination, queue the message and return
if (_linkManager->HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg.pathid, priority);
return true;
@ -87,7 +86,8 @@ namespace llarp
m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay();
ProcessOutboundQueue();
SendRoundRobin();
if (/*bool more = */ SendRoundRobin())
_router->PumpLL();
});
}
@ -127,13 +127,9 @@ namespace llarp
}
void
OutboundMessageHandler::Init(
ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop)
OutboundMessageHandler::Init(AbstractRouter* router)
{
_linkManager = linkManager;
_lookupHandler = lookupHandler;
_loop = std::move(loop);
_router = router;
outboundMessageQueues.emplace(zeroID, MessageQueue());
}
@ -168,14 +164,14 @@ namespace llarp
OutboundMessageHandler::DoCallback(SendStatusHandler callback, SendStatus status)
{
if (callback)
_loop->call([f = std::move(callback), status] { f(status); });
_router->loop()->call([f = std::move(callback), status] { f(status); });
}
void
OutboundMessageHandler::QueueSessionCreation(const RouterID& remote)
{
auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
_linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
_router->linkManager().GetSessionMaker()->CreateSessionTo(remote, fn);
}
bool
@ -199,7 +195,7 @@ namespace llarp
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
m_queueStats.sent++;
return _linkManager->SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
return _router->linkManager().SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
@ -212,7 +208,7 @@ namespace llarp
bool
OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg)
{
if (_linkManager->HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(remote))
{
return Send(remote, msg);
}
@ -282,7 +278,7 @@ namespace llarp
}
}
void
bool
OutboundMessageHandler::SendRoundRobin()
{
m_queueStats.numTicks++;
@ -296,7 +292,6 @@ namespace llarp
routing_mq.pop();
}
size_t empty_count = 0;
size_t num_queues = roundRobinOrder.size();
// if any paths have been removed since last tick, remove any stale
@ -317,16 +312,16 @@ namespace llarp
removedSomePaths = false;
num_queues = roundRobinOrder.size();
size_t sent_count = 0;
if (num_queues == 0) // if no queues, return
if (num_queues == 0)
{
return;
return false;
}
// send messages for each pathid in roundRobinOrder, stopping when
// either every path's queue is empty or a set maximum amount of
// messages have been sent.
while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK)
size_t consecutive_empty = 0;
for (size_t sent_count = 0; sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK;)
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
@ -339,24 +334,26 @@ namespace llarp
Send(entry.router, entry.message);
message_queue.pop();
empty_count = 0;
sent_count++;
consecutive_empty = 0;
consecutive_empty++;
}
else
{
empty_count++;
consecutive_empty++;
}
roundRobinOrder.push(std::move(pathid));
// if num_queues empty queues in a row, all queues empty.
if (empty_count == num_queues)
if (consecutive_empty == num_queues)
{
break;
}
}
m_queueStats.perTickMax = std::max((uint32_t)sent_count, m_queueStats.perTickMax);
m_queueStats.perTickMax = std::max((uint32_t)consecutive_empty, m_queueStats.perTickMax);
return consecutive_empty != num_queues;
}
void

@ -17,8 +17,7 @@ struct llarp_buffer_t;
namespace llarp
{
struct ILinkManager;
struct I_RCLookupHandler;
struct AbstractRouter;
enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler
@ -72,7 +71,7 @@ namespace llarp
ExtractStatus() const override;
void
Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop);
Init(AbstractRouter* router);
private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;
@ -160,14 +159,17 @@ namespace llarp
ProcessOutboundQueue();
/*
* Sends all routing messages that have been queued, indicated by pathid 0 when queued.
* Sends routing messages that have been queued, indicated by pathid 0 when queued.
*
* Sends messages from path queues until all are empty or a set cap has been reached.
* This will send one message from each queue in a round-robin fashion such that they
* all have roughly equal access to bandwidth. A notion of priority may be introduced
* at a later time, but for now only routing messages get priority.
*
* Returns true if there is more to send (i.e. we hit the limit before emptying all path
* queues), false if all queues were drained.
*/
void
bool
SendRoundRobin();
/* Invoked when an outbound session establish attempt has concluded.
@ -193,9 +195,7 @@ namespace llarp
std::queue<PathID_t> roundRobinOrder;
ILinkManager* _linkManager;
I_RCLookupHandler* _lookupHandler;
EventLoop_ptr _loop;
AbstractRouter* _router;
util::ContentionKiller m_Killer;

@ -661,7 +661,7 @@ namespace llarp
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers");
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop);
_outboundMessageHandler.Init(this);
_outboundSessionMaker.Init(
this,
&_linkManager,

Loading…
Cancel
Save