add likn layer delivery timeout notification for iwp

pull/783/head
Jeff Becker 5 years ago
parent 94f8531776
commit 0241851b72
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -11,6 +11,8 @@
#include <util/memfn.hpp>
#include <util/str.hpp>
#include <absl/strings/strip.h>
#include <cstdlib>
#include <fstream>
#include <ios>
@ -208,29 +210,20 @@ namespace llarp
std::string v = tostr(val);
std::string::size_type idx;
static constexpr char delimiter = ',';
static const auto strip_spaces = [](const auto &begin,
const auto &end) -> std::string {
std::string val;
std::for_each(begin, end, [&val](const char &ch) {
// strip spaces
if(::isspace(ch) || ch == delimiter)
return;
val += ch;
});
return val;
};
do
{
idx = v.find_first_of(delimiter);
if(idx != std::string::npos)
{
parsed_opts.emplace(strip_spaces(v.begin(), v.begin() + idx));
std::string val = v.substr(0, idx);
absl::StripAsciiWhitespace(&val);
parsed_opts.emplace(std::move(val));
v = v.substr(idx + 1);
}
else
{
parsed_opts.insert(strip_spaces(v.begin(), v.end()));
absl::StripAsciiWhitespace(&v);
parsed_opts.insert(std::move(v));
}
} while(idx != std::string::npos);
std::unordered_set< std::string > opts;

@ -6,10 +6,12 @@ namespace llarp
namespace iwp
{
OutboundMessage::OutboundMessage(uint64_t msgid, const llarp_buffer_t &pkt,
llarp_time_t now,
ILinkSession::CompletionHandler handler)
: m_Size{(uint16_t)std::min(pkt.sz, MAX_LINK_MSG_SIZE)}
, m_MsgID{msgid}
, m_Completed{handler}
, m_StartedAt{now}
{
m_Data.Zero();
std::copy_n(pkt.base, m_Size, m_Data.begin());
@ -95,13 +97,35 @@ namespace llarp
return true;
}
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h)
: m_Digset{std::move(h)}, m_Size{sz}, m_MsgID{msgid}
bool
OutboundMessage::IsTimedOut(const llarp_time_t now) const
{
// TODO: make configurable by outbound message deliverer
return now > m_StartedAt && now - m_StartedAt > 5000;
}
void
OutboundMessage::InformTimeout()
{
if(m_Completed)
{
m_Completed(ILinkSession::DeliveryStatus::eDeliveryDropped);
}
m_Completed = nullptr;
}
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
llarp_time_t now)
: m_Digset{std::move(h)}
, m_Size{sz}
, m_MsgID{msgid}
, m_LastActiveAt{now}
{
}
void
InboundMessage::HandleData(uint16_t idx, const byte_t *ptr)
InboundMessage::HandleData(uint16_t idx, const byte_t *ptr,
llarp_time_t now)
{
if(idx + FragmentSize > MAX_LINK_MSG_SIZE)
return;
@ -109,6 +133,7 @@ namespace llarp
std::copy_n(ptr, FragmentSize, dst);
m_Acks.set(idx / FragmentSize);
LogDebug("got fragment ", idx / FragmentSize, " of ", m_Size);
m_LastActiveAt = now;
}
std::vector< byte_t >

@ -33,6 +33,7 @@ namespace llarp
{
OutboundMessage() = default;
OutboundMessage(uint64_t msgid, const llarp_buffer_t &pkt,
llarp_time_t now,
ILinkSession::CompletionHandler handler);
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
@ -42,6 +43,7 @@ namespace llarp
ILinkSession::CompletionHandler m_Completed;
llarp_time_t m_LastFlush = 0;
ShortHash digest;
llarp_time_t m_StartedAt = 0;
std::vector< byte_t >
XMIT() const;
@ -61,26 +63,37 @@ namespace llarp
bool
IsTransmitted() const;
bool
IsTimedOut(llarp_time_t now) const;
void
InformTimeout();
};
struct InboundMessage
{
InboundMessage() = default;
InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h);
InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
llarp_time_t now);
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
ShortHash m_Digset;
uint16_t m_Size = 0;
uint64_t m_MsgID = 0;
llarp_time_t m_LastACKSent = 0;
uint16_t m_Size = 0;
uint64_t m_MsgID = 0;
llarp_time_t m_LastACKSent = 0;
llarp_time_t m_LastActiveAt = 0;
std::bitset< MAX_LINK_MSG_SIZE / FragmentSize > m_Acks;
void
HandleData(uint16_t idx, const byte_t *ptr);
HandleData(uint16_t idx, const byte_t *ptr, llarp_time_t now);
bool
IsCompleted() const;
bool
IsTimedOut(llarp_time_t now) const;
bool
Verify() const;

@ -147,15 +147,15 @@ namespace llarp
Session::SendMessageBuffer(const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed)
{
const auto now = m_Parent->Now();
const auto msgid = m_TXID++;
auto& msg =
m_TXMsgs.emplace(msgid, OutboundMessage{msgid, buf, completed})
m_TXMsgs.emplace(msgid, OutboundMessage{msgid, buf, now, completed})
.first->second;
const auto xmit = msg.XMIT();
const llarp_buffer_t pkt{xmit};
EncryptAndSend(pkt);
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this),
m_Parent->Now());
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
LogDebug("send message ", msgid);
return true;
}
@ -168,19 +168,21 @@ namespace llarp
{
if(ShouldPing())
SendKeepAlive();
for(auto itr = m_RXMsgs.begin(); itr != m_RXMsgs.end(); ++itr)
for(auto& item : m_RXMsgs)
{
if(itr->second.ShouldSendACKS(now))
if(item.second.ShouldSendACKS(now))
{
itr->second.SendACKS(util::memFn(&Session::EncryptAndSend, this),
item.second.SendACKS(util::memFn(&Session::EncryptAndSend, this),
now);
}
}
for(auto itr = m_TXMsgs.begin(); itr != m_TXMsgs.end(); ++itr)
for(auto& item : m_TXMsgs)
{
if(itr->second.ShouldFlush(now))
itr->second.FlushUnAcked(
if(item.second.ShouldFlush(now))
{
item.second.FlushUnAcked(
util::memFn(&Session::EncryptAndSend, this), now);
}
}
}
}
@ -229,8 +231,27 @@ namespace llarp
return now - m_CreatedAt > SessionAliveTimeout;
}
void Session::Tick(llarp_time_t)
void
Session::Tick(llarp_time_t now)
{
// remove pending outbound messsages that timed out
// inform waiters
auto itr = m_TXMsgs.begin();
while(itr != m_TXMsgs.end())
{
if(itr->second.IsTimedOut(now))
{
itr->second.InformTimeout();
itr = m_TXMsgs.erase(itr);
}
else
++itr;
}
// remove pending inbound messages that timed out
std::remove_if(m_RXMsgs.begin(), m_RXMsgs.end(),
[now](const auto& item) -> bool {
return item.second.IsTimedOut(now);
});
}
using Introduction = AlignedBuffer< 64 >;

@ -170,18 +170,13 @@ namespace llarp
{
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
if(!_linkManager->SendTo(
remote, buf, [=](ILinkSession::DeliveryStatus status) {
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
DoCallback(callback, SendStatus::Congestion);
}))
{
DoCallback(callback, SendStatus::Congestion);
return false;
}
return true;
return _linkManager->SendTo(
remote, buf, [=](ILinkSession::DeliveryStatus status) {
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
DoCallback(callback, SendStatus::Congestion);
});
}
bool

Loading…
Cancel
Save