|
|
|
@ -24,6 +24,7 @@ namespace llarp
|
|
|
|
|
const ILinkMessage *msg,
|
|
|
|
|
SendStatusHandler callback)
|
|
|
|
|
{
|
|
|
|
|
const uint16_t priority = msg->Priority();
|
|
|
|
|
std::array< byte_t, MAX_LINK_MSG_SIZE > linkmsg_buffer;
|
|
|
|
|
llarp_buffer_t buf(linkmsg_buffer);
|
|
|
|
|
|
|
|
|
@ -40,7 +41,7 @@ namespace llarp
|
|
|
|
|
|
|
|
|
|
if(_linkManager->HasSessionTo(remote))
|
|
|
|
|
{
|
|
|
|
|
QueueOutboundMessage(remote, std::move(message), msg->pathid);
|
|
|
|
|
QueueOutboundMessage(remote, std::move(message), msg->pathid, priority);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -53,8 +54,9 @@ namespace llarp
|
|
|
|
|
pendingSessionMessageQueues.emplace(remote, MessageQueue());
|
|
|
|
|
|
|
|
|
|
MessageQueueEntry entry;
|
|
|
|
|
entry.message = message;
|
|
|
|
|
entry.router = remote;
|
|
|
|
|
entry.priority = priority;
|
|
|
|
|
entry.message = message;
|
|
|
|
|
entry.router = remote;
|
|
|
|
|
itr_pair.first->second.push(std::move(entry));
|
|
|
|
|
|
|
|
|
|
shouldCreateSession = itr_pair.second;
|
|
|
|
@ -89,7 +91,14 @@ namespace llarp
|
|
|
|
|
util::StatusObject
|
|
|
|
|
OutboundMessageHandler::ExtractStatus() const
|
|
|
|
|
{
|
|
|
|
|
util::StatusObject status{};
|
|
|
|
|
util::StatusObject status{"queueStats",
|
|
|
|
|
{{"queued", m_queueStats.queued},
|
|
|
|
|
{"dropped", m_queueStats.dropped},
|
|
|
|
|
{"sent", m_queueStats.sent},
|
|
|
|
|
{"queueWatermark", m_queueStats.queueWatermark},
|
|
|
|
|
{"perTickMax", m_queueStats.perTickMax},
|
|
|
|
|
{"numTicks", m_queueStats.numTicks}}};
|
|
|
|
|
|
|
|
|
|
return status;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -201,6 +210,7 @@ namespace llarp
|
|
|
|
|
{
|
|
|
|
|
const llarp_buffer_t buf(msg.first);
|
|
|
|
|
auto callback = msg.second;
|
|
|
|
|
m_queueStats.sent++;
|
|
|
|
|
return _linkManager->SendTo(
|
|
|
|
|
remote, buf, [=](ILinkSession::DeliveryStatus status) {
|
|
|
|
|
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
|
|
|
|
@ -224,18 +234,29 @@ namespace llarp
|
|
|
|
|
bool
|
|
|
|
|
OutboundMessageHandler::QueueOutboundMessage(const RouterID &remote,
|
|
|
|
|
Message &&msg,
|
|
|
|
|
const PathID_t &pathid)
|
|
|
|
|
const PathID_t &pathid,
|
|
|
|
|
uint16_t priority)
|
|
|
|
|
{
|
|
|
|
|
MessageQueueEntry entry;
|
|
|
|
|
entry.message = std::move(msg);
|
|
|
|
|
auto callback_copy = entry.message.second;
|
|
|
|
|
entry.router = remote;
|
|
|
|
|
entry.pathid = pathid;
|
|
|
|
|
entry.priority = priority;
|
|
|
|
|
if(outboundQueue.tryPushBack(std::move(entry))
|
|
|
|
|
!= llarp::thread::QueueReturn::Success)
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.dropped++;
|
|
|
|
|
DoCallback(callback_copy, SendStatus::Congestion);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.queued++;
|
|
|
|
|
|
|
|
|
|
uint32_t queueSize = outboundQueue.size();
|
|
|
|
|
m_queueStats.queueWatermark =
|
|
|
|
|
std::max(queueSize, m_queueStats.queueWatermark);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -257,11 +278,16 @@ namespace llarp
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MessageQueue &path_queue = itr_pair.first->second;
|
|
|
|
|
if(path_queue.size() >= MAX_PATH_QUEUE_SIZE)
|
|
|
|
|
|
|
|
|
|
if(path_queue.size() < MAX_PATH_QUEUE_SIZE)
|
|
|
|
|
{
|
|
|
|
|
path_queue.pop(); // head drop
|
|
|
|
|
path_queue.push(std::move(entry));
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
DoCallback(entry.message.second, SendStatus::Congestion);
|
|
|
|
|
m_queueStats.dropped++;
|
|
|
|
|
}
|
|
|
|
|
path_queue.push(std::move(entry));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -286,14 +312,15 @@ namespace llarp
|
|
|
|
|
void
|
|
|
|
|
OutboundMessageHandler::SendRoundRobin()
|
|
|
|
|
{
|
|
|
|
|
m_queueStats.numTicks++;
|
|
|
|
|
|
|
|
|
|
// send non-routing messages first priority
|
|
|
|
|
auto &non_routing_mq = outboundMessageQueues[zeroID];
|
|
|
|
|
while(not non_routing_mq.empty())
|
|
|
|
|
{
|
|
|
|
|
MessageQueueEntry entry = std::move(non_routing_mq.front());
|
|
|
|
|
non_routing_mq.pop();
|
|
|
|
|
|
|
|
|
|
const MessageQueueEntry &entry = non_routing_mq.top();
|
|
|
|
|
Send(entry.router, entry.message);
|
|
|
|
|
non_routing_mq.pop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t empty_count = 0;
|
|
|
|
@ -329,10 +356,11 @@ namespace llarp
|
|
|
|
|
auto &message_queue = outboundMessageQueues[pathid];
|
|
|
|
|
if(message_queue.size() > 0)
|
|
|
|
|
{
|
|
|
|
|
MessageQueueEntry entry = std::move(message_queue.front());
|
|
|
|
|
message_queue.pop();
|
|
|
|
|
const MessageQueueEntry &entry = message_queue.top();
|
|
|
|
|
|
|
|
|
|
Send(entry.router, entry.message);
|
|
|
|
|
message_queue.pop();
|
|
|
|
|
|
|
|
|
|
empty_count = 0;
|
|
|
|
|
sent_count++;
|
|
|
|
|
}
|
|
|
|
@ -349,6 +377,9 @@ namespace llarp
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m_queueStats.perTickMax =
|
|
|
|
|
std::max((uint32_t)sent_count, m_queueStats.perTickMax);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
@ -372,8 +403,7 @@ namespace llarp
|
|
|
|
|
|
|
|
|
|
while(!movedMessages.empty())
|
|
|
|
|
{
|
|
|
|
|
MessageQueueEntry entry = std::move(movedMessages.front());
|
|
|
|
|
movedMessages.pop();
|
|
|
|
|
const MessageQueueEntry &entry = movedMessages.top();
|
|
|
|
|
|
|
|
|
|
if(status == SendStatus::Success)
|
|
|
|
|
{
|
|
|
|
@ -383,6 +413,7 @@ namespace llarp
|
|
|
|
|
{
|
|
|
|
|
DoCallback(entry.message.second, status);
|
|
|
|
|
}
|
|
|
|
|
movedMessages.pop();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|