Merge pull request #1907 from majestrate/link-layer-priority-2022-05-02

propagate message priority to link layer
pull/1912/head
majestrate 2 years ago committed by GitHub
commit bad98b5476
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,6 +23,9 @@ local submodules = {
commands: submodule_commands,
};
// cmake options for static deps mirror
local ci_mirror_opts = '-DLOCAL_MIRROR=https://oxen.rocks/deps ';
local apt_get_quiet = 'apt-get -o=Dpkg::Use-Pty=0 -q';
// Regular build on a debian-like system:
@ -140,7 +143,7 @@ local windows_cross_pipeline(name,
'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y build-essential cmake git pkg-config ccache g++-mingw-w64-x86-64-posix nsis zip automake libtool',
'update-alternatives --set x86_64-w64-mingw32-gcc /usr/bin/x86_64-w64-mingw32-gcc-posix',
'update-alternatives --set x86_64-w64-mingw32-g++ /usr/bin/x86_64-w64-mingw32-g++-posix',
'VERBOSE=1 JOBS=' + jobs + ' ./contrib/windows.sh',
'VERBOSE=1 JOBS=' + jobs + ' ./contrib/windows.sh ' + ci_mirror_opts,
] + extra_cmds,
},
],
@ -171,7 +174,7 @@ local linux_cross_pipeline(name,
commands: [
'echo "Building on ${DRONE_STAGE_MACHINE}"',
'VERBOSE=1 JOBS=' + jobs + ' ./contrib/cross.sh ' + std.join(' ', cross_targets) + (if std.length(cmake_extra) > 0 then ' -- ' + cmake_extra else ''),
] + extra_cmds,
],
},
],
};
@ -270,7 +273,7 @@ local mac_builder(name,
// basic system headers. WTF apple:
'export SDKROOT="$(xcrun --sdk macosx --show-sdk-path)"',
'ulimit -n 1024', // because macos sets ulimit to 256 for some reason yeah idk
'./contrib/mac.sh',
'./contrib/mac.sh ' + ci_mirror_opts,
] + extra_cmds,
},
],

@ -48,11 +48,11 @@ set(ZMQ_SOURCE zeromq-${ZMQ_VERSION}.tar.gz)
set(ZMQ_HASH SHA512=e198ef9f82d392754caadd547537666d4fba0afd7d027749b3adae450516bcf284d241d4616cad3cb4ad9af8c10373d456de92dc6d115b037941659f141e7c0e
CACHE STRING "libzmq source hash")
set(LIBUV_VERSION 1.43.0 CACHE STRING "libuv version")
set(LIBUV_VERSION 1.44.1 CACHE STRING "libuv version")
set(LIBUV_MIRROR ${LOCAL_MIRROR} https://dist.libuv.org/dist/v${LIBUV_VERSION}
CACHE STRING "libuv mirror(s)")
set(LIBUV_SOURCE libuv-v${LIBUV_VERSION}.tar.gz)
set(LIBUV_HASH SHA256=90d72bb7ae18de2519d0cac70eb89c319351146b90cd3f91303a492707e693a4
set(LIBUV_HASH SHA512=b4f8944e2c79e3a6a31ded6cccbe4c0eeada50db6bc8a448d7015642795012a4b80ffeef7ca455bb093c59a8950d0e1430566c3c2fa87b73f82699098162d834
CACHE STRING "libuv source hash")
set(ZLIB_VERSION 1.2.12 CACHE STRING "zlib version")

@ -12,10 +12,12 @@
#include <llarp/vpn/packet_router.hpp>
#include <future>
#include <queue>
#include <type_traits>
#include <variant>
#include <llarp/service/protocol_type.hpp>
#include <llarp/util/priority_queue.hpp>
namespace llarp
{
@ -185,14 +187,14 @@ namespace llarp
net::IPPacket pkt;
bool
operator<(const WritePacket& other) const
operator>(const WritePacket& other) const
{
return other.seqno < seqno;
return seqno > other.seqno;
}
};
/// queue for sending packets to user from network
std::priority_queue<WritePacket> m_NetworkToUserPktQueue;
util::ascending_priority_queue<WritePacket> m_NetworkToUserPktQueue;
void
Pump(llarp_time_t now) override;

@ -10,12 +10,14 @@ namespace llarp
uint64_t msgid,
ILinkSession::Message_t msg,
llarp_time_t now,
ILinkSession::CompletionHandler handler)
ILinkSession::CompletionHandler handler,
uint16_t priority)
: m_Data{std::move(msg)}
, m_MsgID{msgid}
, m_Completed{handler}
, m_LastFlush{now}
, m_StartedAt{now}
, m_ResendPriority{priority}
{
const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(m_Digest, buf);

@ -40,7 +40,8 @@ namespace llarp
uint64_t msgid,
ILinkSession::Message_t data,
llarp_time_t now,
ILinkSession::CompletionHandler handler);
ILinkSession::CompletionHandler handler,
uint16_t priority);
ILinkSession::Message_t m_Data;
uint64_t m_MsgID = 0;
@ -49,6 +50,15 @@ namespace llarp
llarp_time_t m_LastFlush = 0s;
ShortHash m_Digest;
llarp_time_t m_StartedAt = 0s;
uint16_t m_ResendPriority;
bool
operator<(const OutboundMessage& msg) const
{
// yes, the first order is reversed as higher means more important
// second part is for queue order
return msg.m_ResendPriority < m_ResendPriority or m_MsgID < msg.m_MsgID;
}
ILinkSession::Packet_t
XMIT() const;

@ -5,6 +5,8 @@
#include <llarp/util/meta/memfn.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <queue>
namespace llarp
{
namespace iwp
@ -126,10 +128,12 @@ namespace llarp
if (not msg.BEncode(&buf))
{
LogError("failed to encode LIM for ", m_RemoteAddr);
return;
}
if (!SendMessageBuffer(std::move(data), h))
if (not SendMessageBuffer(std::move(data), h))
{
LogError("failed to send LIM to ", m_RemoteAddr);
return;
}
LogTrace("sent LIM to ", m_RemoteAddr);
}
@ -183,7 +187,7 @@ namespace llarp
bool
Session::SendMessageBuffer(
ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed)
ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed, uint16_t priority)
{
if (m_TXMsgs.size() >= MaxSendQueueSize)
{
@ -194,8 +198,9 @@ namespace llarp
const auto now = m_Parent->Now();
const auto msgid = m_TXID++;
const auto bufsz = buf.size();
auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed})
.first->second;
auto& msg =
m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed, priority})
.first->second;
TriggerPump();
EncryptAndSend(msg.XMIT());
if (bufsz > FragmentSize)
@ -253,15 +258,22 @@ namespace llarp
msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
}
}
std::priority_queue<
OutboundMessage*,
std::vector<OutboundMessage*>,
ComparePtr<OutboundMessage*>>
to_resend;
for (auto& [id, msg] : m_TXMsgs)
{
if (msg.ShouldFlush(now))
{
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
to_resend.push(&msg);
}
if (not to_resend.empty())
{
for (auto& msg = to_resend.top(); not to_resend.empty(); to_resend.pop())
msg->FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
}
assert(shared_from_this().use_count() > 1);
if (not m_EncryptNext.empty())
{
m_Parent->QueueWork(

@ -8,8 +8,8 @@
#include <map>
#include <unordered_set>
#include <deque>
#include <queue>
#include <llarp/util/priority_queue.hpp>
#include <llarp/util/thread/queue.hpp>
namespace llarp
@ -60,7 +60,10 @@ namespace llarp
Tick(llarp_time_t now) override;
bool
SendMessageBuffer(ILinkSession::Message_t msg, CompletionHandler resultHandler) override;
SendMessageBuffer(
ILinkSession::Message_t msg,
CompletionHandler resultHandler,
uint16_t priority = 0) override;
void
Send_LL(const byte_t* buf, size_t sz);
@ -194,7 +197,7 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// rx messages to send in next round of multiacks
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<>> m_SendMACKs;
util::ascending_priority_queue<uint64_t> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;

@ -30,7 +30,8 @@ namespace llarp
SendTo(
const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed) = 0;
ILinkSession::CompletionHandler completed,
uint16_t priority = 0) = 0;
virtual bool
HasSessionTo(const RouterID& remote) const = 0;

@ -19,7 +19,7 @@ namespace llarp
// TODO: may want to add some memory of session failures for a given
// router on a given link and not return that link here for a
// duration
if (!link->IsCompatable(rc))
if (not link->IsCompatable(rc))
continue;
return link;
@ -36,7 +36,10 @@ namespace llarp
bool
LinkManager::SendTo(
const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed)
const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed,
uint16_t priority)
{
if (stopping)
return false;
@ -51,7 +54,7 @@ namespace llarp
return false;
}
return link->SendTo(remote, buf, completed);
return link->SendTo(remote, buf, completed, priority);
}
bool

@ -28,7 +28,8 @@ namespace llarp
SendTo(
const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed) override;
ILinkSession::CompletionHandler completed,
uint16_t priority) override;
bool
HasSessionTo(const RouterID& remote) const override;

@ -440,7 +440,10 @@ namespace llarp
bool
ILinkLayer::SendTo(
const RouterID& remote, const llarp_buffer_t& buf, ILinkSession::CompletionHandler completed)
const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed,
uint16_t priority)
{
std::shared_ptr<ILinkSession> s;
{
@ -459,7 +462,7 @@ namespace llarp
}
ILinkSession::Message_t pkt(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.begin());
return s && s->SendMessageBuffer(std::move(pkt), completed);
return s && s->SendMessageBuffer(std::move(pkt), completed, priority);
}
bool

@ -148,7 +148,8 @@ namespace llarp
SendTo(
const RouterID& remote,
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed);
ILinkSession::CompletionHandler completed,
uint16_t priority);
virtual bool
GetOurAddressInfo(AddressInfo& addr) const;

@ -57,7 +57,7 @@ namespace llarp
/// send a message buffer to the remote endpoint
virtual bool
SendMessageBuffer(Message_t, CompletionHandler handler) = 0;
SendMessageBuffer(Message_t, CompletionHandler handler, uint16_t priority) = 0;
/// start the connection
virtual void

@ -30,25 +30,28 @@ namespace llarp
DoCallback(callback, SendStatus::InvalidRouter);
return true;
}
const uint16_t priority = msg.Priority();
MessageQueueEntry ent;
ent.router = remote;
ent.inform = std::move(callback);
ent.pathid = msg.pathid;
ent.priority = msg.Priority();
std::array<byte_t, MAX_LINK_MSG_SIZE> linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);
llarp_buffer_t buf{linkmsg_buffer};
if (!EncodeBuffer(msg, buf))
{
return false;
}
Message message;
message.first.resize(buf.sz);
message.second = callback;
ent.message.resize(buf.sz);
std::copy_n(buf.base, buf.sz, message.first.data());
std::copy_n(buf.base, buf.sz, ent.message.data());
// if we have a session to the destination, queue the message and return
if (_router->linkManager().HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg.pathid, priority);
QueueOutboundMessage(std::move(ent));
return true;
}
@ -58,16 +61,11 @@ namespace llarp
// in progress.
bool shouldCreateSession = false;
{
util::Lock l(_mutex);
util::Lock l{_mutex};
// create queue for <remote> if it doesn't exist, and get iterator
auto [queue_itr, is_new] = pendingSessionMessageQueues.emplace(remote, MessageQueue());
MessageQueueEntry entry;
entry.priority = priority;
entry.message = message;
entry.router = remote;
queue_itr->second.push(std::move(entry));
queue_itr->second.push(std::move(ent));
shouldCreateSession = is_new;
}
@ -86,6 +84,9 @@ namespace llarp
m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay();
ProcessOutboundQueue();
// TODO: this probably shouldn't be pumping, as it defeats the purpose
// of having a limit on sends per tick, but chaning it is potentially bad
// and requires testing so it should be changed later.
if (/*bool more = */ SendRoundRobin())
_router->TriggerPump();
});
@ -190,52 +191,48 @@ namespace llarp
}
bool
OutboundMessageHandler::Send(const RouterID& remote, const Message& msg)
OutboundMessageHandler::Send(const MessageQueueEntry& ent)
{
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
const llarp_buffer_t buf{ent.message};
m_queueStats.sent++;
return _router->linkManager().SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
{
DoCallback(callback, SendStatus::Congestion);
}
});
SendStatusHandler callback = ent.inform;
return _router->linkManager().SendTo(
ent.router,
buf,
[this, callback](ILinkSession::DeliveryStatus status) {
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
{
DoCallback(callback, SendStatus::Congestion);
}
},
ent.priority);
}
bool
OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg)
OutboundMessageHandler::SendIfSession(const MessageQueueEntry& ent)
{
if (_router->linkManager().HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(ent.router))
{
return Send(remote, msg);
return Send(ent);
}
return false;
}
bool
OutboundMessageHandler::QueueOutboundMessage(
const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority)
OutboundMessageHandler::QueueOutboundMessage(MessageQueueEntry entry)
{
MessageQueueEntry entry;
entry.message = std::move(msg);
// copy callback in case we need to call it, so we can std::move(entry)
auto callback_copy = entry.message.second;
entry.router = remote;
entry.pathid = pathid;
entry.priority = priority;
auto callback = entry.inform;
if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success)
{
m_queueStats.dropped++;
DoCallback(callback_copy, SendStatus::Congestion);
DoCallback(callback, SendStatus::Congestion);
}
else
{
m_queueStats.queued++;
uint32_t queueSize = outboundQueue.size();
m_queueStats.queueWatermark = std::max(queueSize, m_queueStats.queueWatermark);
}
@ -272,7 +269,7 @@ namespace llarp
}
else
{
DoCallback(entry.message.second, SendStatus::Congestion);
DoCallback(entry.inform, SendStatus::Congestion);
m_queueStats.dropped++;
}
}
@ -288,7 +285,7 @@ namespace llarp
while (not routing_mq.empty())
{
const MessageQueueEntry& entry = routing_mq.top();
Send(entry.router, entry.message);
Send(entry);
routing_mq.pop();
}
@ -331,7 +328,7 @@ namespace llarp
{
const MessageQueueEntry& entry = message_queue.top();
Send(entry.router, entry.message);
Send(entry);
message_queue.pop();
consecutive_empty = 0;
@ -380,11 +377,11 @@ namespace llarp
if (status == SendStatus::Success)
{
Send(entry.router, entry.message);
Send(entry);
}
else
{
DoCallback(entry.message.second, status);
DoCallback(entry.inform, status);
}
movedMessages.pop();
}

@ -6,12 +6,12 @@
#include <llarp/util/thread/queue.hpp>
#include <llarp/util/decaying_hashset.hpp>
#include <llarp/path/path_types.hpp>
#include <llarp/util/priority_queue.hpp>
#include <llarp/router_id.hpp>
#include <list>
#include <unordered_map>
#include <utility>
#include <queue>
struct llarp_buffer_t;
@ -74,22 +74,21 @@ namespace llarp
Init(AbstractRouter* router);
private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;
/* A message that has been queued for sending, but not yet
* processed into an individual path's message queue.
*/
struct MessageQueueEntry
{
uint16_t priority;
Message message;
std::vector<byte_t> message;
SendStatusHandler inform;
PathID_t pathid;
RouterID router;
bool
operator<(const MessageQueueEntry& other) const
operator>(const MessageQueueEntry& other) const
{
return other.priority < priority;
return priority > other.priority;
}
};
@ -104,7 +103,7 @@ namespace llarp
uint32_t numTicks = 0;
};
using MessageQueue = std::priority_queue<MessageQueueEntry>;
using MessageQueue = util::ascending_priority_queue<MessageQueueEntry>;
/* If a session is not yet created with the destination router for a message,
* a special queue is created for that router and an attempt is made to
@ -131,14 +130,14 @@ namespace llarp
* returns the result of the call to LinkManager::SendTo()
*/
bool
Send(const RouterID& remote, const Message& msg);
Send(const MessageQueueEntry& ent);
/* Sends the message along to the link layer if we have a session to the remote
*
* returns the result of the Send() call, or false if no session.
*/
bool
SendIfSession(const RouterID& remote, const Message& msg);
SendIfSession(const MessageQueueEntry& ent);
/* queues a message to the shared outbound message queue.
*
@ -149,8 +148,7 @@ namespace llarp
* are placed in their paths' respective individual queues.
*/
bool
QueueOutboundMessage(
const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority = 0);
QueueOutboundMessage(MessageQueueEntry entry);
/* Processes messages on the shared message queue into their paths' respective
* individual queues.

@ -116,7 +116,7 @@ namespace llarp
m_router->NotifyRouterEvent<tooling::RCGossipSentEvent>(m_router->pubkey(), rc);
// send message
peerSession->SendMessageBuffer(std::move(msg), nullptr);
peerSession->SendMessageBuffer(std::move(msg), nullptr, gossip.Priority());
});
return true;
}

@ -31,6 +31,7 @@
#include <llarp/link/link_manager.hpp>
#include <llarp/tooling/dht_event.hpp>
#include <llarp/quic/tunnel.hpp>
#include <llarp/util/priority_queue.hpp>
#include <optional>
#include <utility>
@ -1629,7 +1630,7 @@ namespace llarp
session->FlushDownstream();
// handle inbound traffic sorted
std::priority_queue<ProtocolMessage> queue;
util::ascending_priority_queue<ProtocolMessage> queue;
while (not m_InboundTrafficQueue.empty())
{
// succ it out

@ -64,9 +64,9 @@ namespace llarp
ProcessAsync(path::Path_ptr p, PathID_t from, std::shared_ptr<ProtocolMessage> self);
bool
operator<(const ProtocolMessage& other) const
operator>(const ProtocolMessage& other) const
{
return other.seqno < seqno;
return seqno > other.seqno;
}
};

@ -0,0 +1,13 @@
#pragma once
#include <queue>
#include <vector>
namespace llarp::util
{
/// priority queue that uses operator > instead of operator <
template <typename T, typename Container = std::vector<T>>
using ascending_priority_queue =
std::priority_queue<T, Container, std::greater<typename Container::value_type>>;
} // namespace llarp::util
Loading…
Cancel
Save