diff --git a/contrib/py/quic_tester.py b/contrib/py/quic_tester.py index 83b0a5cd9..03cff9c1b 100755 --- a/contrib/py/quic_tester.py +++ b/contrib/py/quic_tester.py @@ -56,7 +56,8 @@ if not 2 <= len(sys.argv) or any(x in y for x in ("--help", "-h") for y in sys.a action = sys.argv[1].lower() host = sys.argv[2] port = int(sys.argv[3]) - +request_path = len(sys.argv) >= 5 and sys.argv[4] or '/' + beginning_of_time = time.clock_gettime(time.CLOCK_MONOTONIC) #print("Connecting to {}".format(remote), file=sys.stderr) @@ -95,8 +96,8 @@ def success_or_die(response): if action == "connect": result = success_or_die(rpc("llarp.quic_connect", args)) print(result) - cmd = "curl -vv http://{}".format(result["addr"]) - print("exec: {}".format(cmd)) + cmd = "curl -vv http://{}{} -o /dev/null".format(result["addr"], request_path) + print("{}".format(cmd)) os.system(cmd) if action == "listen": result = success_or_die(rpc("llarp.quic_listener", args)) diff --git a/llarp/handlers/null.hpp b/llarp/handlers/null.hpp index 496c8122b..9ce9f63d5 100644 --- a/llarp/handlers/null.hpp +++ b/llarp/handlers/null.hpp @@ -19,7 +19,6 @@ namespace llarp r->loop()->add_ticker([this] { while (not m_InboundQuic.empty()) { - LogInfo(m_InboundQuic.top().seqno); m_InboundQuic.top().process(); m_InboundQuic.pop(); } diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp index 56ea3ead6..495fb076b 100644 --- a/llarp/iwp/message_buffer.cpp +++ b/llarp/iwp/message_buffer.cpp @@ -128,7 +128,7 @@ namespace llarp byte_t* dst = m_Data.data() + idx; std::copy_n(buf.base, buf.sz, dst); m_Acks.set(idx / FragmentSize); - LogDebug("got fragment ", idx / FragmentSize); + LogTrace("got fragment ", idx / FragmentSize); m_LastActiveAt = now; } diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 3e15dee2f..557a45c4d 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -60,7 +60,7 @@ namespace llarp void Session::Send_LL(const byte_t* buf, size_t sz) { - LogDebug("send ", sz, " to ", m_RemoteAddr); + LogTrace("send ", sz, " to ", m_RemoteAddr); const llarp_buffer_t pkt(buf, sz); m_Parent->SendTo_LL(m_RemoteAddr, pkt); m_LastTX = time_now_ms(); @@ -128,7 +128,7 @@ namespace llarp { LogError("failed to send LIM to ", m_RemoteAddr); } - LogDebug("sent LIM to ", m_RemoteAddr); + LogTrace("sent LIM to ", m_RemoteAddr); } void @@ -145,7 +145,7 @@ namespace llarp void Session::EncryptWorker(CryptoQueue_t msgs) { - LogDebug("encrypt worker ", msgs.size(), " messages"); + LogTrace("encrypt worker ", msgs.size(), " messages"); for (auto& pkt : msgs) { llarp_buffer_t pktbuf{pkt}; @@ -195,7 +195,7 @@ namespace llarp msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now); } m_Stats.totalInFlightTX++; - LogDebug("send message ", msgid); + LogDebug("send message ", msgid, " to ", m_RemoteAddr); return true; } @@ -211,7 +211,7 @@ namespace llarp auto mack = CreatePacket(Command::eMACK, 1 + (numAcks * sizeof(uint64_t))); mack[PacketOverhead + CommandOverhead] = byte_t{static_cast(numAcks)}; byte_t* ptr = mack.data() + 3 + PacketOverhead; - LogDebug("send ", numAcks, " macks to ", m_RemoteAddr); + LogTrace("send ", numAcks, " macks to ", m_RemoteAddr); const auto& itr = m_SendMACKs.top(); while (numAcks > 0) { @@ -367,7 +367,7 @@ namespace llarp { m_Stats.totalDroppedTX++; m_Stats.totalInFlightTX--; - LogDebug("Dropped unacked packet to ", m_RemoteAddr); + LogTrace("Dropped unacked packet to ", m_RemoteAddr); itr->second.InformTimeout(); itr = m_TXMsgs.erase(itr); } @@ -439,7 +439,7 @@ namespace llarp LogError("failed to transport_dh_client on outbound session to ", m_RemoteAddr); return; } - LogDebug("sent intro to ", m_RemoteAddr); + LogTrace("sent intro to ", m_RemoteAddr); } void @@ -589,7 +589,7 @@ namespace llarp const TunnelNonce N{curbuf.base}; curbuf.base += 32; curbuf.sz -= 32; - LogDebug("decrypt: ", curbuf.sz, " bytes from ", m_RemoteAddr); + LogTrace("decrypt: ", curbuf.sz, " bytes from ", m_RemoteAddr); return CryptoManager::instance()->xchacha20(curbuf, m_SessionKey, N); } @@ -641,7 +641,7 @@ namespace llarp auto queue = m_PlaintextRecv.popFront(); for (auto& result : queue) { - LogDebug("Command ", int(result[PacketOverhead + 1])); + LogTrace("Command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr); switch (result[PacketOverhead + 1]) { case Command::eXMIT: @@ -688,12 +688,12 @@ namespace llarp LogError("short mack from ", m_RemoteAddr); return; } - LogDebug("got ", int(numAcks), " mack from ", m_RemoteAddr); + LogTrace("got ", int(numAcks), " mack from ", m_RemoteAddr); byte_t* ptr = data.data() + CommandOverhead + PacketOverhead + 1; while (numAcks > 0) { uint64_t acked = bufbe64toh(ptr); - LogDebug("mack containing txid=", acked, " from ", m_RemoteAddr); + LogTrace("mack containing txid=", acked, " from ", m_RemoteAddr); auto itr = m_TXMsgs.find(acked); if (itr != m_TXMsgs.end()) { @@ -704,7 +704,7 @@ namespace llarp } else { - LogDebug("ignored mack for txid=", acked, " from ", m_RemoteAddr); + LogTrace("ignored mack for txid=", acked, " from ", m_RemoteAddr); } ptr += sizeof(uint64_t); numAcks--; @@ -720,7 +720,7 @@ namespace llarp return; } uint64_t txid = bufbe64toh(data.data() + CommandOverhead + PacketOverhead); - LogDebug("got nack on ", txid, " from ", m_RemoteAddr); + LogTrace("got nack on ", txid, " from ", m_RemoteAddr); auto itr = m_TXMsgs.find(txid); if (itr != m_TXMsgs.end()) { @@ -744,7 +744,7 @@ namespace llarp uint64_t rxid = bufbe64toh(data.data() + CommandOverhead + sizeof(uint16_t) + PacketOverhead); ShortHash h{ data.data() + CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t) + PacketOverhead}; - LogDebug("rxid=", rxid, " sz=", sz, " h=", h.ToHex()); + LogTrace("rxid=", rxid, " sz=", sz, " h=", h.ToHex(), " from ", m_RemoteAddr); m_LastRX = m_Parent->Now(); { // check for replay @@ -752,7 +752,7 @@ namespace llarp if (itr != m_ReplayFilter.end()) { m_SendMACKs.emplace(rxid); - LogDebug("duplicate rxid=", rxid, " from ", m_RemoteAddr); + LogTrace("duplicate rxid=", rxid, " from ", m_RemoteAddr); return; } } @@ -784,7 +784,7 @@ namespace llarp } } else - LogDebug("got duplicate xmit on ", rxid, " from ", m_RemoteAddr); + LogTrace("got duplicate xmit on ", rxid, " from ", m_RemoteAddr); } } @@ -804,14 +804,14 @@ namespace llarp { if (m_ReplayFilter.find(rxid) == m_ReplayFilter.end()) { - LogDebug("no rxid=", rxid, " for ", m_RemoteAddr); + LogTrace("no rxid=", rxid, " for ", m_RemoteAddr); auto nack = CreatePacket(Command::eNACK, 8); htobe64buf(nack.data() + PacketOverhead + CommandOverhead, rxid); EncryptAndSend(std::move(nack)); } else { - LogDebug("replay hit for rxid=", rxid, " for ", m_RemoteAddr); + LogTrace("replay hit for rxid=", rxid, " for ", m_RemoteAddr); m_SendMACKs.emplace(rxid); } return; @@ -844,6 +844,7 @@ namespace llarp { m_Parent->HandleMessage(this, msg.m_Data); EncryptAndSend(msg.ACKS()); + LogDebug("recv'd message ", rxid, " from ", m_RemoteAddr); } m_RXMsgs.erase(rxid); } @@ -862,14 +863,14 @@ namespace llarp auto itr = m_TXMsgs.find(txid); if (itr == m_TXMsgs.end()) { - LogDebug("no txid=", txid, " for ", m_RemoteAddr); + LogTrace("no txid=", txid, " for ", m_RemoteAddr); return; } itr->second.Ack(data[10 + PacketOverhead]); if (itr->second.IsTransmitted()) { - LogDebug("sent message ", itr->first); + LogDebug("sent message ", itr->first, " to ", m_RemoteAddr); itr->second.Completed(); itr = m_TXMsgs.erase(itr); } diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index 93821e0d3..0de477a0c 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -17,6 +17,7 @@ #include #include +#include namespace llarp { @@ -547,16 +548,14 @@ namespace llarp { for (const auto& msg : msgs) { - const llarp_buffer_t buf(msg.X); + const llarp_buffer_t buf{msg.X}; m_RXRate += buf.sz; - if (!HandleRoutingMessage(buf, r)) + if (HandleRoutingMessage(buf, r)) { - LogWarn("failed to handle downstream message"); - continue; + r->loop()->wakeup(); + m_LastRecvMessage = r->Now(); } - m_LastRecvMessage = r->Now(); } - FlushUpstream(r); } bool @@ -615,6 +614,7 @@ namespace llarp buf.sz = pad_size; } buf.cur = buf.base; + LogDebug("send routing message ", msg.S, " with ", buf.sz, " bytes to endpoint ", Endpoint()); return HandleUpstream(buf, N, r); } @@ -653,6 +653,7 @@ namespace llarp // send path latency test routing::PathLatencyMessage latency; latency.T = randint(); + latency.S = NextSeqNo(); m_LastLatencyTestID = latency.T; m_LastLatencyTestTime = now; if (!SendRoutingMessage(latency, r)) diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 489275f98..084d8ec22 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -275,6 +275,7 @@ namespace llarp { llarp::routing::PathLatencyMessage reply; reply.L = msg.T; + reply.S = msg.S; return SendRoutingMessage(reply, r); } @@ -435,14 +436,13 @@ namespace llarp const llarp::routing::PathTransferMessage& msg, AbstractRouter* r) { auto path = r->pathContext().GetPathForTransfer(msg.P); - llarp::routing::DataDiscardMessage discarded(msg.P, msg.S); + llarp::routing::DataDiscardMessage discarded{msg.P, msg.S}; if (path == nullptr || msg.T.F != info.txID) { return SendRoutingMessage(discarded, r); } - std::array tmp; - llarp_buffer_t buf(tmp); + llarp_buffer_t buf{tmp}; if (!msg.T.BEncode(&buf)) { llarp::LogWarn(info, " failed to transfer data message, encode failed"); diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index 13dfcd8c5..b08338004 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -321,6 +321,7 @@ namespace llarp::quic TunnelManager::listen(SockAddr addr) { return listen([addr](std::string_view, uint16_t p) -> std::optional { + LogInfo("try accepting ", addr.getPort()); if (p == addr.getPort()) return addr; return std::nullopt; @@ -595,7 +596,8 @@ namespace llarp::quic { if (!ct.client) return; // Happens if we're still waiting for a path to build - assert(ct.client->get_connection()); + if (not ct.client->get_connection()) + return; auto& conn = *ct.client->get_connection(); int available = conn.get_streams_available(); while (available > 0 and not ct.pending_incoming.empty()) diff --git a/llarp/routing/message.hpp b/llarp/routing/message.hpp index 9b127ee8a..8274b956b 100644 --- a/llarp/routing/message.hpp +++ b/llarp/routing/message.hpp @@ -33,6 +33,12 @@ namespace llarp virtual void Clear() = 0; + + bool + operator<(const IMessage& other) const + { + return other.S < S; + } }; } // namespace routing diff --git a/llarp/routing/message_parser.cpp b/llarp/routing/message_parser.cpp index b17b1665e..68e015dc9 100644 --- a/llarp/routing/message_parser.cpp +++ b/llarp/routing/message_parser.cpp @@ -55,7 +55,7 @@ namespace llarp if (strbuf.sz != 1) return false; ourKey = *strbuf.cur; - LogDebug("routing message '", key, "'"); + LogDebug("routing message '", std::string{ourKey, 1}, "'"); switch (ourKey) { case 'D': @@ -123,6 +123,7 @@ namespace llarp if (bencode_read_dict(*this, ©)) { msg->from = from; + LogDebug("handle routing message ", msg->S, " from ", from); result = msg->HandleMessage(h, r); if (!result) { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 7ba3bdefe..d810d9941 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -96,9 +96,10 @@ namespace llarp } void - Endpoint::RegenAndPublishIntroSet(bool forceRebuild) + Endpoint::RegenAndPublishIntroSet() { const auto now = llarp::time_now_ms(); + m_LastIntrosetRegenAttempt = now; std::set introset; if (!GetCurrentIntroductionsWithFilter( introset, [now](const service::Introduction& intro) -> bool { @@ -109,8 +110,7 @@ namespace llarp "could not publish descriptors for endpoint ", Name(), " because we couldn't get enough valid introductions"); - if (ShouldBuildMore(now) || forceRebuild) - ManualRebuild(1); + ManualRebuild(1); return; } introSet().I.clear(); @@ -121,7 +121,7 @@ namespace llarp if (introSet().I.size() == 0) { LogWarn("not enough intros to publish introset for ", Name()); - if (ShouldBuildMore(now) || forceRebuild) + if (ShouldBuildMore(now)) ManualRebuild(1); return; } @@ -631,7 +631,7 @@ namespace llarp + (m_state->m_IntroSet.HasExpiredIntros(now) ? INTROSET_PUBLISH_RETRY_INTERVAL : INTROSET_PUBLISH_INTERVAL); - return now >= next_pub; + return now >= next_pub and m_LastIntrosetRegenAttempt + 1s <= now; } void @@ -916,7 +916,8 @@ namespace llarp routing::DHTMessage msg; auto txid = GenTXID(); msg.M.emplace_back(std::make_unique(txid, router)); - + if (path) + msg.S = path->NextSeqNo(); if (path && path->SendRoutingMessage(msg, Router())) { RouterLookupJob job(this, handler); @@ -1106,7 +1107,7 @@ namespace llarp } } m_SendQueue.tryPushBack( - SendEvent_t{std::make_shared(f, replyPath), path}); + SendEvent_t{std::make_shared(f, replyPath), path}); } void @@ -1148,7 +1149,7 @@ namespace llarp LogWarn("invalidating convotag T=", frame.T); RemoveConvoTag(frame.T); m_SendQueue.tryPushBack( - SendEvent_t{std::make_shared(f, frame.F), p}); + SendEvent_t{std::make_shared(f, frame.F), p}); } } } @@ -1158,7 +1159,8 @@ namespace llarp void Endpoint::HandlePathDied(path::Path_ptr p) { - RegenAndPublishIntroSet(true); + ManualRebuild(1); + RegenAndPublishIntroSet(); path::Builder::HandlePathDied(p); } @@ -1363,9 +1365,21 @@ namespace llarp LogWarn("SendToOrQueue failed: convo tag is zero"); return false; } + LogDebug(Name(), " send ", pkt.sz, " bytes on T=", tag); if (auto maybe = GetEndpointWithConvoTag(tag)) { - return SendToOrQueue(*maybe, pkt, t); + if (auto* ptr = std::get_if
(&*maybe)) + { + if (*ptr == m_Identity.pub.Addr()) + { + Loop()->wakeup(); + return HandleInboundPacket(tag, pkt, t, 0); + } + } + if (not SendToOrQueue(*maybe, pkt, t)) + return false; + Loop()->wakeup(); + return true; } LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag); return false; @@ -1392,14 +1406,30 @@ namespace llarp // send downstream packets to user for snode for (const auto& [router, session] : m_state->m_SNodeSessions) session.first->FlushDownstream(); - // send downstream traffic to user for hidden service + + // handle inbound traffic sorted + std::priority_queue queue; while (not m_InboundTrafficQueue.empty()) { - auto msg = m_InboundTrafficQueue.popFront(); - if (not HandleInboundPacket(msg->tag, msg->payload, msg->proto, msg->seqno)) + // succ it out + queue.emplace(std::move(*m_InboundTrafficQueue.popFront())); + } + while (not queue.empty()) + { + const auto& msg = queue.top(); + LogDebug( + Name(), + " handle inbound packet on ", + msg.tag, + " ", + msg.payload.size(), + " bytes seqno=", + msg.seqno); + if (not HandleInboundPacket(msg.tag, msg.payload, msg.proto, msg.seqno)) { LogWarn("Failed to handle inbound message"); } + queue.pop(); } auto router = Router(); @@ -1413,9 +1443,10 @@ namespace llarp // send queue flush while (not m_SendQueue.empty()) { - auto item = m_SendQueue.popFront(); - item.second->SendRoutingMessage(*item.first, router); - MarkConvoTagActive(item.first->T.T); + SendEvent_t item = m_SendQueue.popFront(); + item.first->S = item.second->NextSeqNo(); + if (item.second->SendRoutingMessage(*item.first, router)) + MarkConvoTagActive(item.first->T.T); } UpstreamFlush(router); @@ -1425,19 +1456,55 @@ namespace llarp std::optional Endpoint::GetBestConvoTagFor(std::variant remote) const { - // get convotag with higest timestamp + // get convotag with lowest estimated RTT if (auto ptr = std::get_if
(&remote)) { - llarp_time_t time = 0s; + llarp_time_t rtt = 30s; std::optional ret = std::nullopt; for (const auto& [tag, session] : Sessions()) { if (tag.IsZero()) continue; - if (session.remote.Addr() == *ptr and session.lastUsed >= time) + if (session.remote.Addr() == *ptr) { - time = session.lastUsed; - ret = tag; + if (*ptr == m_Identity.pub.Addr()) + { + return tag; + } + if (session.inbound) + { + auto path = GetPathByRouter(session.intro.router); + if (path) + { + const auto rttEstimate = (session.replyIntro.latency + path->intro.latency) * 2; + if (rttEstimate < rtt) + { + ret = tag; + rtt = rttEstimate; + } + } + else + { + LogWarn("no path for inbound session T=", tag); + } + } + else + { + auto range = m_state->m_RemoteSessions.equal_range(*ptr); + auto itr = range.first; + while (itr != range.second) + { + if (itr->second->ReadyToSend() and itr->second->estimatedRTT > 0s) + { + if (itr->second->estimatedRTT < rtt) + { + ret = tag; + rtt = itr->second->estimatedRTT; + } + } + itr++; + } + } } } return ret; @@ -1461,6 +1528,15 @@ namespace llarp { if (auto ptr = std::get_if
(&addr)) { + if (*ptr == m_Identity.pub.Addr()) + { + ConvoTag tag{}; + tag.Randomize(); + Sessions()[tag].inbound = false; + MarkConvoTagActive(tag); + Loop()->call_soon([tag, hook]() { hook(tag); }); + return true; + } return EnsurePathToService( *ptr, [hook](auto, auto* ctx) { @@ -1500,6 +1576,7 @@ namespace llarp LogTrace("SendToOrQueue: dropping because data.sz == 0"); return false; } + // inbound conversation const auto now = Now(); @@ -1562,6 +1639,10 @@ namespace llarp PutReplyIntroFor(f.T, m->introReply); m->sender = m_Identity.pub; m->seqno = GetSeqNoForConvo(f.T); + if (m->seqno == 0) + { + LogWarn(Name(), " no session T=", f.T); + } f.S = m->seqno; f.F = m->introReply.pathID; transfer->P = remoteIntro.pathID; diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 31f34da3e..e2a174238 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -61,7 +61,7 @@ namespace llarp public IDataHandler, public EndpointBase { - static const size_t MAX_OUTBOUND_CONTEXT_COUNT = 4; + static const size_t MAX_OUTBOUND_CONTEXT_COUNT = 1; Endpoint(AbstractRouter* r, Context* parent); ~Endpoint() override; @@ -415,7 +415,7 @@ namespace llarp SupportsV6() const = 0; void - RegenAndPublishIntroSet(bool forceRebuild = false); + RegenAndPublishIntroSet(); IServiceLookup* GenerateLookupByTag(const Tag& tag); @@ -473,6 +473,9 @@ namespace llarp public: SendMessageQueue_t m_SendQueue; + private: + llarp_time_t m_LastIntrosetRegenAttempt = 0s; + protected: void FlushRecvData(); diff --git a/llarp/service/endpoint_types.hpp b/llarp/service/endpoint_types.hpp index 7eee10a68..17794955b 100644 --- a/llarp/service/endpoint_types.hpp +++ b/llarp/service/endpoint_types.hpp @@ -24,7 +24,7 @@ namespace llarp struct IServiceLookup; struct OutboundContext; - using Msg_ptr = std::shared_ptr; + using Msg_ptr = std::shared_ptr; using SendEvent_t = std::pair; using SendMessageQueue_t = thread::Queue; diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index 0f5d1a160..cd4c9f7c7 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -269,6 +269,7 @@ namespace llarp OutboundContext::ExtractStatus() const { auto obj = path::Builder::ExtractStatus(); + obj["estimatedRTT"] = to_json(estimatedRTT); obj["currentConvoTag"] = currentConvoTag.ToHex(); obj["remoteIntro"] = remoteIntro.ExtractStatus(); obj["sessionCreatedAt"] = to_json(createdAt); @@ -341,8 +342,26 @@ namespace llarp if (ReadyToSend() and m_ReadyHook) { - m_ReadyHook(this); - m_ReadyHook = nullptr; + KeepAlive(); + const auto path = GetPathByRouter(remoteIntro.router); + if (not path) + { + LogWarn(Name(), " ready but no path to ", remoteIntro.router, " ???"); + return false; + } + const auto rtt = (path->intro.latency + remoteIntro.latency) * 2; + m_router->loop()->call_later( + rtt, [rtt, self = shared_from_this(), hook = std::move(m_ReadyHook)]() { + LogInfo( + self->Name(), + " is ready, RTT is measured as ", + self->estimatedRTT, + " approximated as ", + rtt, + " delta=", + rtt - self->estimatedRTT); + hook(self.get()); + }); } // if we are dead return true so we are removed @@ -628,7 +647,7 @@ namespace llarp LogWarn("invalidating convotag T=", frame.T); m_Endpoint->RemoveConvoTag(frame.T); m_Endpoint->m_SendQueue.tryPushBack( - SendEvent_t{std::make_shared(f, frame.F), p}); + SendEvent_t{std::make_shared(f, frame.F), p}); } } return true; diff --git a/llarp/service/outbound_context.hpp b/llarp/service/outbound_context.hpp index a7d7e3fac..cf374b60c 100644 --- a/llarp/service/outbound_context.hpp +++ b/llarp/service/outbound_context.hpp @@ -126,6 +126,9 @@ namespace llarp return currentIntroSet; } + llarp_time_t + RTT() const; + private: /// swap remoteIntro with next intro void diff --git a/llarp/service/protocol.hpp b/llarp/service/protocol.hpp index 24b56ec82..623e2afa7 100644 --- a/llarp/service/protocol.hpp +++ b/llarp/service/protocol.hpp @@ -66,7 +66,7 @@ namespace llarp bool operator<(const ProtocolMessage& other) const { - return seqno < other.seqno; + return other.seqno < seqno; } }; diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index ede6b6ff9..5ad25ce3c 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -31,7 +31,7 @@ namespace llarp m_Endpoint->Loop()->call([this] { FlushUpstream(); }); } m_SendQueue.pushBack(std::make_pair( - std::make_shared(*msg, remoteIntro.pathID), path)); + std::make_shared(*msg, remoteIntro.pathID), path)); return true; } @@ -40,6 +40,7 @@ namespace llarp { auto r = m_Endpoint->Router(); std::unordered_set flushpaths; + auto rttRMS = 0ms; { do { @@ -47,11 +48,14 @@ namespace llarp if (not maybe) break; auto& item = *maybe; + item.first->S = item.second->NextSeqNo(); if (item.second->SendRoutingMessage(*item.first, r)) { lastGoodSend = r->Now(); flushpaths.emplace(item.second); m_Endpoint->MarkConvoTagActive(item.first->T.T); + const auto rtt = (item.second->intro.latency + remoteIntro.latency) * 2; + rttRMS += rtt * rtt.count(); } } while (not m_SendQueue.empty()); } @@ -60,6 +64,10 @@ namespace llarp { path->FlushUpstream(r); } + if (flushpaths.empty()) + return; + estimatedRTT = std::chrono::milliseconds{ + static_cast(std::sqrt(rttRMS.count() / flushpaths.size()))}; } /// send on an established convo tag diff --git a/llarp/service/sendcontext.hpp b/llarp/service/sendcontext.hpp index da4f19023..c946efd86 100644 --- a/llarp/service/sendcontext.hpp +++ b/llarp/service/sendcontext.hpp @@ -46,9 +46,11 @@ namespace llarp const llarp_time_t createdAt; llarp_time_t sendTimeout = 40s; llarp_time_t connectTimeout = 60s; + llarp_time_t estimatedRTT = 0s; bool markedBad = false; - using Msg_ptr = std::shared_ptr; + using Msg_ptr = std::shared_ptr; using SendEvent_t = std::pair; + thread::Queue m_SendQueue; std::function authResultListener;