diff --git a/Makefile b/Makefile index 88315facf..27157fc82 100644 --- a/Makefile +++ b/Makefile @@ -11,13 +11,18 @@ CXX ?= c++ TARGETS = lokinet SIGS = $(TARGETS:=.sig) + SHADOW_ROOT ?= $(HOME)/.shadow SHADOW_BIN=$(SHADOW_ROOT)/bin/shadow SHADOW_CONFIG=$(REPO)/shadow.config.xml SHADOW_PLUGIN=$(REPO)/libshadow-plugin-llarp.so SHADOW_LOG=$(REPO)/shadow.log.txt -TESTNET_ROOT=$(REPO)/testnet_tmp +SHADOW_SRC ?= $(HOME)/git/shadow +SHADOW_PARSE ?= python $(SHADOW_SRC)/src/tools/parse-shadow.py - -m 0 --packet-data +SHADOW_PLOT ?= python $(SHADOW_SRC)/src/tools/plot-shadow.py -d $(REPO) LokiNET -c $(SHADOW_CONFIG) -r 10000 -e '.*' + +TESTNET_ROOT=/tmp/lokinet_testnet_tmp TESTNET_CONF=$(TESTNET_ROOT)/supervisor.conf TESTNET_LOG=$(TESTNET_ROOT)/testnet.log @@ -63,9 +68,14 @@ shadow-build: shadow-configure ninja clean ninja -shadow: shadow-build +shadow-run: shadow-build python3 contrib/shadow/genconf.py $(SHADOW_CONFIG) - bash -c "$(SHADOW_BIN) -w $$(cat /proc/cpuinfo | grep processor | wc -l) $(SHADOW_CONFIG) &> $(SHADOW_LOG)" + bash -c "$(SHADOW_BIN) -w $$(cat /proc/cpuinfo | grep processor | wc -l) $(SHADOW_CONFIG) | $(SHADOW_PARSE)" + +shadow-plot: shadow-run + $(SHADOW_PLOT) + +shadow: shadow-plot testnet-configure: clean cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) diff --git a/contrib/shadow/genconf.py b/contrib/shadow/genconf.py index 9d816b1dc..b94bc3505 100644 --- a/contrib/shadow/genconf.py +++ b/contrib/shadow/genconf.py @@ -71,6 +71,7 @@ def makeSVCNode(settings, name, id, port): def genconf(settings, outf): root = etree.Element('shadow') + root.attrib["environment"] = 'LLARP_SHADOW=1' topology = etree.SubElement(root, 'topology') topology.attrib['path'] = getSetting(settings, 'topology', os.path.join( shadowRoot, 'share', 'topology.graphml.xml')) diff --git a/daemon.ini b/daemon.ini index a74b326cc..7ea02c292 100644 --- a/daemon.ini +++ b/daemon.ini @@ -7,7 +7,7 @@ ident-privkey=server-ident.key #public-port=1090 [netdb] -dir=./tmp-nodes +dir=/tmp/nodes [connect] #i2p.rocks=i2p.rocks.signed.txt diff --git a/daemon/main.cpp b/daemon/main.cpp index d2197afed..54af82cb4 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -3,6 +3,7 @@ #include #include #include // for MIN +#include struct llarp_main *ctx = 0; @@ -13,13 +14,15 @@ handle_signal(int sig) llarp_main_signal(ctx, sig); } -#ifndef TESTNET -#define TESTNET 0 -#endif - int main(int argc, char *argv[]) { + bool multiThreaded = true; + const char *singleThreadVar = getenv("LLARP_SHADOW"); + if(singleThreadVar && std::string(singleThreadVar) == "1") + { + multiThreaded = false; + } const char *conffname = "daemon.ini"; int c; while(1) @@ -65,13 +68,13 @@ main(int argc, char *argv[]) } } - ctx = llarp_main_init(conffname, !TESTNET); + ctx = llarp_main_init(conffname, multiThreaded); int code = 1; if(ctx) { signal(SIGINT, handle_signal); code = llarp_main_run(ctx); - // llarp_main_free(ctx); + llarp_main_free(ctx); } return code; } diff --git a/include/llarp/codel.hpp b/include/llarp/codel.hpp index bef9c0b5c..7492c330f 100644 --- a/include/llarp/codel.hpp +++ b/include/llarp/codel.hpp @@ -25,26 +25,36 @@ namespace llarp { } }; + template < typename T, typename GetTime > + struct CoDelCompareTime + { + bool + operator()(const T& left, const T& right) const + { + return GetTime()(left) < GetTime()(right); + } + }; - template < typename T, typename GetTime, typename PutTime, + template < typename T > + struct CoDelComparePriority + { + bool + operator()(const T& left, const T& right) const + { + return left < right; + } + }; + + template < typename T, typename GetTime, typename PutTime, typename Compare, typename Mutex_t = std::mutex, typename Lock_t = std::unique_lock< Mutex_t >, - llarp_time_t dropMs = 20, llarp_time_t initialIntervalMs = 100 > + llarp_time_t dropMs = 5, llarp_time_t initialIntervalMs = 100 > struct CoDelQueue { CoDelQueue(const std::string& name) : m_name(name) { } - struct CoDelCompare - { - bool - operator()(const T& left, const T& right) const - { - return GetTime()(left) < GetTime()(right); - } - }; - size_t Size() { @@ -53,7 +63,7 @@ namespace llarp } void - Put(const T& i) + Put(T i) { Lock_t lock(m_QueueMutex); // llarp::LogInfo("CoDelQueue::Put - adding item, queue now has ", @@ -111,7 +121,7 @@ namespace llarp size_t dropNum = 0; llarp_time_t nextTickInterval = initialIntervalMs; Mutex_t m_QueueMutex; - std::priority_queue< T, std::vector< T >, CoDelCompare > m_Queue; + std::priority_queue< T, std::vector< T >, Compare > m_Queue; std::string m_name; }; } // namespace util diff --git a/include/llarp/crypto.h b/include/llarp/crypto.h index 658b05edd..e2b81b326 100644 --- a/include/llarp/crypto.h +++ b/include/llarp/crypto.h @@ -106,4 +106,8 @@ llarp_crypto_libsodium_init(struct llarp_crypto *c); bool llarp_crypto_initialized(struct llarp_crypto *c); +/// return random 64bit unsigned interger +uint64_t +llarp_randint(); + #endif diff --git a/include/llarp/crypto_async.h b/include/llarp/crypto_async.h index b999ea866..a796d8419 100644 --- a/include/llarp/crypto_async.h +++ b/include/llarp/crypto_async.h @@ -196,6 +196,15 @@ struct FrameGetTime } }; +struct FrameCompareTime +{ + bool + operator()(const iwp_async_frame *left, iwp_async_frame *right) const + { + return left->created < right->created; + } +}; + /// synchronously decrypt a frame bool iwp_decrypt_frame(struct iwp_async_frame *frame); diff --git a/include/llarp/dht/messages/gotintro.hpp b/include/llarp/dht/messages/gotintro.hpp index e8f6fa0b6..6807fba71 100644 --- a/include/llarp/dht/messages/gotintro.hpp +++ b/include/llarp/dht/messages/gotintro.hpp @@ -2,6 +2,7 @@ #define LLARP_DHT_MESSAGES_GOT_INTRO_HPP #include #include +#include namespace llarp { @@ -10,14 +11,14 @@ namespace llarp /// acknologement to PublishIntroMessage or reply to FinIntroMessage struct GotIntroMessage : public IMessage { - std::set< llarp::service::IntroSet > I; + std::vector< llarp::service::IntroSet > I; uint64_t T = 0; GotIntroMessage(const Key_t& from) : IMessage(from) { } - GotIntroMessage(const std::set< llarp::service::IntroSet >& results, + GotIntroMessage(const std::vector< llarp::service::IntroSet >& results, uint64_t txid); ~GotIntroMessage(); diff --git a/include/llarp/dht/messages/pubintro.hpp b/include/llarp/dht/messages/pubintro.hpp index eab2cae91..9c7d1c4ef 100644 --- a/include/llarp/dht/messages/pubintro.hpp +++ b/include/llarp/dht/messages/pubintro.hpp @@ -2,6 +2,8 @@ #define LLARP_DHT_MESSAGES_PUB_INTRO_HPP #include #include +#include + namespace llarp { namespace dht @@ -9,7 +11,7 @@ namespace llarp struct PublishIntroMessage : public IMessage { llarp::service::IntroSet I; - std::set< Key_t > E; + std::vector< Key_t > E; uint64_t R = 0; uint64_t S = 0; uint64_t txID = 0; @@ -19,7 +21,7 @@ namespace llarp } PublishIntroMessage(const llarp::service::IntroSet& i, uint64_t tx, - uint64_t s, const std::set< Key_t >& exclude = {}) + uint64_t s, const std::vector< Key_t >& exclude = {}) : IMessage({}), E(exclude), txID(tx) { I = i; diff --git a/include/llarp/dht/search_job.hpp b/include/llarp/dht/search_job.hpp index 15c6ef8de..19091ff1c 100644 --- a/include/llarp/dht/search_job.hpp +++ b/include/llarp/dht/search_job.hpp @@ -7,6 +7,7 @@ #include #include #include +#include namespace llarp { @@ -17,7 +18,8 @@ namespace llarp { const static uint64_t JobTimeout = 30000; - typedef std::function< void(const std::set< llarp::service::IntroSet >&) > + typedef std::function< void( + const std::vector< llarp::service::IntroSet >&) > IntroSetHookFunc; SearchJob(); /// for routers @@ -36,7 +38,8 @@ namespace llarp FoundRouter(const llarp_rc* router) const; void - FoundIntros(const std::set< llarp::service::IntroSet >& introset) const; + FoundIntros( + const std::vector< llarp::service::IntroSet >& introset) const; void Timeout() const; diff --git a/include/llarp/iwp/frame_state.hpp b/include/llarp/iwp/frame_state.hpp index 685650f38..0586947fc 100644 --- a/include/llarp/iwp/frame_state.hpp +++ b/include/llarp/iwp/frame_state.hpp @@ -8,6 +8,7 @@ #include "llarp/time.h" #include "llarp/types.h" #include "sendbuf.hpp" +#include "sendqueue.hpp" #include "transit_message.hpp" #include @@ -49,10 +50,12 @@ struct frame_state rx; std::unordered_map< uint64_t, transit_message * > tx; - typedef std::queue< sendbuf_t * > sendqueue_t; + // typedef std::queue< sendbuf_t * > sendqueue_t; + typedef llarp::util::CoDelQueue< InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime, - llarp::util::DummyMutex, llarp::util::DummyLock > + InboundMessage::OrderCompare, llarp::util::DummyMutex, + llarp::util::DummyLock > recvqueue_t; llarp_link_session *parent = nullptr; @@ -62,7 +65,9 @@ struct frame_state uint64_t nextMsgID = 0; frame_state(llarp_link_session *session) - : parent(session), recvqueue("iwp_inbound_message") + : parent(session) + , sendqueue("iwp_outbound_message") + , recvqueue("iwp_inbound_message") { } diff --git a/include/llarp/iwp/sendbuf.hpp b/include/llarp/iwp/sendbuf.hpp index 88a52fbb6..185a3e049 100644 --- a/include/llarp/iwp/sendbuf.hpp +++ b/include/llarp/iwp/sendbuf.hpp @@ -1,7 +1,7 @@ #pragma once -#include "llarp/buffer.h" - +#include +#include #include struct sendbuf_t @@ -19,6 +19,8 @@ struct sendbuf_t size_t sz; + byte_t priority = 255; + size_t size() const { @@ -31,8 +33,45 @@ struct sendbuf_t return _buf; } + llarp_buffer_t + Buffer() + { + llarp_buffer_t buf; + buf.base = _buf; + buf.sz = sz; + buf.cur = buf.base; + return buf; + } + + struct GetTime + { + llarp_time_t + operator()(const sendbuf_t *buf) const + { + return buf->timestamp; + } + }; + + struct PutTime + { + void + operator()(sendbuf_t *&buf) const + { + buf->timestamp = llarp_time_now_ms(); + } + }; + + struct Compare + { + bool + operator()(const sendbuf_t *left, const sendbuf_t *right) const + { + return left->priority < right->priority; + } + }; + + llarp_time_t timestamp = 0; + private: byte_t *_buf = nullptr; }; - -typedef std::queue< sendbuf_t * > sendqueue_t; \ No newline at end of file diff --git a/include/llarp/iwp/sendqueue.hpp b/include/llarp/iwp/sendqueue.hpp index 21f8cb6aa..fd4777274 100644 --- a/include/llarp/iwp/sendqueue.hpp +++ b/include/llarp/iwp/sendqueue.hpp @@ -1,10 +1,11 @@ #ifndef LLARP_IWP_SENDQUEUE_HPP #define LLARP_IWP_SENDQUEUE_HPP -#include #include +#include -typedef llarp::util::CoDelQueue sendqueue_t; +typedef llarp::util::CoDelQueue< + sendbuf_t *, sendbuf_t::GetTime, sendbuf_t::PutTime, sendbuf_t::Compare, + llarp::util::DummyMutex, llarp::util::DummyLock > + sendqueue_t; #endif \ No newline at end of file diff --git a/include/llarp/iwp/server.hpp b/include/llarp/iwp/server.hpp index 697b2d312..101a1e04e 100644 --- a/include/llarp/iwp/server.hpp +++ b/include/llarp/iwp/server.hpp @@ -46,7 +46,7 @@ struct llarp_link SessionMap_t m_Connected; mtx_t m_Connected_Mutex; - bool pumpingLogic = false; + std::atomic< bool > pumpingLogic; typedef std::unordered_map< llarp::Addr, llarp_link_session *, llarp::addrhash > @@ -65,6 +65,7 @@ struct llarp_link { strncpy(keyfile, args.keyfile, sizeof(keyfile)); iwp = llarp_async_iwp_new(crypto, logic, worker); + pumpingLogic.store(false); } ~llarp_link() @@ -126,6 +127,20 @@ struct llarp_link } for(const auto &addr : remove) RemoveSessionByAddr(addr); + + { + lock_t lock(m_PendingSessions_Mutex); + auto itr = m_PendingSessions.begin(); + while(itr != m_PendingSessions.end()) + { + if(itr->second->timedout(now)) + { + itr = m_PendingSessions.erase(itr); + } + else + ++itr; + } + } } static bool diff --git a/include/llarp/iwp/session.hpp b/include/llarp/iwp/session.hpp index ad2c2d72c..1bfca0fe4 100644 --- a/include/llarp/iwp/session.hpp +++ b/include/llarp/iwp/session.hpp @@ -117,13 +117,15 @@ struct llarp_link_session uint32_t frames = 0; std::atomic< bool > working; - llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime > + llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime, + FrameCompareTime > outboundFrames; /* std::mutex m_EncryptedFramesMutex; std::queue< iwp_async_frame > encryptedFrames; */ - llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime > + llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime, + FrameCompareTime > decryptedFrames; uint32_t pump_send_timer_id = 0; @@ -133,8 +135,6 @@ struct llarp_link_session iwp_async_intro intro; iwp_async_introack introack; iwp_async_session_start start; - // frame_state frame; - bool started_inbound_codel = false; byte_t token[32]; byte_t workbuf[MAX_PAD + 128]; diff --git a/include/llarp/iwp/transit_message.hpp b/include/llarp/iwp/transit_message.hpp index c945ed8b8..4b4257864 100644 --- a/include/llarp/iwp/transit_message.hpp +++ b/include/llarp/iwp/transit_message.hpp @@ -2,6 +2,7 @@ #include "llarp/types.h" #include "sendbuf.hpp" +#include "sendqueue.hpp" #include "xmit.hpp" #include diff --git a/include/llarp/service/IntroSet.hpp b/include/llarp/service/IntroSet.hpp index e2189dbb7..cbc26cfac 100644 --- a/include/llarp/service/IntroSet.hpp +++ b/include/llarp/service/IntroSet.hpp @@ -9,7 +9,7 @@ #include #include -#include +#include namespace llarp { @@ -20,7 +20,7 @@ namespace llarp struct IntroSet : public llarp::IBEncodeMessage { ServiceInfo A; - std::set< Introduction > I; + std::vector< Introduction > I; Tag topic; llarp::PoW* W = nullptr; llarp::Signature Z; diff --git a/include/llarp/service/endpoint.hpp b/include/llarp/service/endpoint.hpp index 947907fd7..d51665f20 100644 --- a/include/llarp/service/endpoint.hpp +++ b/include/llarp/service/endpoint.hpp @@ -106,10 +106,10 @@ namespace llarp uint64_t sequenceNo = 0; llarp::SharedSecret sharedKey; - llarp::util::CoDelQueue< ProtocolMessage*, ProtocolMessage::GetTime, - ProtocolMessage::PutTime, - llarp::util::DummyMutex, - llarp::util::DummyLock > + llarp::util::CoDelQueue< + ProtocolMessage*, ProtocolMessage::GetTime, + ProtocolMessage::PutTime, ProtocolMessage::Compare, + llarp::util::DummyMutex, llarp::util::DummyLock > m_SendQueue; Endpoint* m_Parent; }; diff --git a/include/llarp/service/protocol.hpp b/include/llarp/service/protocol.hpp index 04fa183e1..6f32e9104 100644 --- a/include/llarp/service/protocol.hpp +++ b/include/llarp/service/protocol.hpp @@ -17,12 +17,13 @@ namespace llarp struct ProtocolMessage : public llarp::IBEncodeMessage { - ProtocolMessage(ProtocolType t); + ProtocolMessage(ProtocolType t, uint64_t seqno); ~ProtocolMessage(); ProtocolType proto; llarp_time_t queued = 0; std::vector< byte_t > payload; llarp::KeyExchangeNonce N; + uint64_t sequenceNum; bool DecodeKey(llarp_buffer_t key, llarp_buffer_t* val); @@ -32,6 +33,16 @@ namespace llarp void PutBuffer(llarp_buffer_t payload); + struct Compare + { + bool + operator()(const ProtocolMessage* left, + const ProtocolMessage* right) const + { + return left->sequenceNum < right->sequenceNum; + } + }; + struct GetTime { llarp_time_t diff --git a/llarp/crypto_libsodium.cpp b/llarp/crypto_libsodium.cpp index 960640148..e21515eb8 100644 --- a/llarp/crypto_libsodium.cpp +++ b/llarp/crypto_libsodium.cpp @@ -159,3 +159,11 @@ llarp_crypto_libsodium_init(struct llarp_crypto *c) c->randbytes(&seed, sizeof(seed)); srand(seed); } + +uint64_t +llarp_randint() +{ + uint64_t i; + randombytes((byte_t *)&i, sizeof(i)); + return i; +} \ No newline at end of file diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 4baf02b86..32371eed3 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include "router.hpp" namespace llarp @@ -57,7 +58,7 @@ namespace llarp } void - OnResult(const std::set< service::IntroSet > &results) + OnResult(const std::vector< service::IntroSet > &results) { auto path = m_router->paths.GetByUpstream(m_router->dht->impl.OurKey(), pathID); @@ -68,7 +69,13 @@ namespace llarp localtags.insert(introset); } llarp::routing::DHTMessage msg; - msg.M.push_back(new llarp::dht::GotIntroMessage(localtags, txid)); + auto sz = localtags.size(); + std::vector< service::IntroSet > intros(sz); + for(const auto &i : localtags) + { + intros[--sz] = i; + } + msg.M.push_back(new llarp::dht::GotIntroMessage(intros, txid)); path->SendRoutingMessage(&msg, m_router); } else @@ -92,7 +99,8 @@ namespace llarp TXOwner ownerKey; ownerKey.node = peer; ownerKey.txid = id; - SearchJob job(from, txid, [](const std::set< service::IntroSet > &) {}); + SearchJob job(from, txid, + [](const std::vector< service::IntroSet > &) {}); pendingTX[ownerKey] = job; auto msg = new llarp::DHTImmeidateMessage(peer); msg->msgs.push_back(new PublishIntroMessage(introset, id, S)); @@ -130,7 +138,7 @@ namespace llarp auto itr = nodes.begin(); // start at random middle point - auto start = rand() % nodes.size(); + auto start = llarp_randint() % nodes.size(); std::advance(itr, start); auto end = itr; while(itr != nodes.end()) @@ -308,7 +316,7 @@ namespace llarp } void - OnResult(const std::set< llarp::service::IntroSet > &results) + OnResult(const std::vector< llarp::service::IntroSet > &results) { if(replyNode != m_Router->dht->impl.OurKey()) { diff --git a/llarp/dht/find_intro.cpp b/llarp/dht/find_intro.cpp index f29e4b2ec..be3abf0c3 100644 --- a/llarp/dht/find_intro.cpp +++ b/llarp/dht/find_intro.cpp @@ -148,7 +148,9 @@ namespace llarp { if(iterative) { - auto introsets = dht.FindRandomIntroSetsWithTag(N, 8); + std::vector< service::IntroSet > introsets; + for(const auto& introset : dht.FindRandomIntroSetsWithTag(N, 8)) + introsets.push_back(introset); // we are iterative and don't have it, reply with a direct reply replies.push_back(new GotIntroMessage(introsets, T)); } diff --git a/llarp/dht/got_intro.cpp b/llarp/dht/got_intro.cpp index ef97d9f1b..174791082 100644 --- a/llarp/dht/got_intro.cpp +++ b/llarp/dht/got_intro.cpp @@ -9,7 +9,7 @@ namespace llarp namespace dht { GotIntroMessage::GotIntroMessage( - const std::set< llarp::service::IntroSet > &results, uint64_t tx) + const std::vector< llarp::service::IntroSet > &results, uint64_t tx) : IMessage({}), I(results), T(tx) { } @@ -24,7 +24,6 @@ namespace llarp { auto &dht = ctx->impl; auto crypto = &dht.router->crypto; - std::set< service::IntroSet > introsets; for(const auto &introset : I) { @@ -36,22 +35,11 @@ namespace llarp From); return false; } - llarp::dht::Key_t addr; - if(!introset.A.CalculateAddress(addr)) - { - llarp::LogWarn( - "failed to calculate hidden service address for direct " - "GotIntro " - "message from ", - From); - return false; - } - introsets.insert(introset); } auto pending = dht.FindPendingTX(From, T); if(pending) { - pending->FoundIntros(introsets); + pending->FoundIntros(I); dht.RemovePendingLookup(From, T); return true; } @@ -82,7 +70,7 @@ namespace llarp { if(llarp_buffer_eq(key, "I")) { - return BEncodeReadSet(I, buf); + return BEncodeReadList(I, buf); } bool read = false; if(!BEncodeMaybeReadDictInt("T", T, read, key, buf)) diff --git a/llarp/dht/publish_intro.cpp b/llarp/dht/publish_intro.cpp index fcd3e3677..30bee74f5 100644 --- a/llarp/dht/publish_intro.cpp +++ b/llarp/dht/publish_intro.cpp @@ -19,7 +19,7 @@ namespace llarp bool read = false; if(llarp_buffer_eq(key, "E")) { - return BEncodeReadSet(E, val); + return BEncodeReadList(E, val); } if(!BEncodeMaybeReadDictEntry("I", I, read, key, val)) return false; @@ -69,8 +69,11 @@ namespace llarp dht.services->PutNode(I); replies.push_back(new GotIntroMessage({I}, txID)); Key_t peer; - std::set< Key_t > exclude = E; + std::set< Key_t > exclude; + for(const auto &e : E) + exclude.insert(e); exclude.insert(From); + exclude.insert(dht.OurKey()); if(S && dht.nodes->FindCloseExcluding(addr, peer, exclude)) { dht.PropagateIntroSetTo(From, txID, I, peer, S - 1); diff --git a/llarp/dht/search_job.cpp b/llarp/dht/search_job.cpp index 773fa776b..df9ef01f7 100644 --- a/llarp/dht/search_job.cpp +++ b/llarp/dht/search_job.cpp @@ -46,7 +46,7 @@ namespace llarp void SearchJob::FoundIntros( - const std::set< llarp::service::IntroSet > &introsets) const + const std::vector< llarp::service::IntroSet > &introsets) const { if(foundIntroHook) foundIntroHook(introsets); diff --git a/llarp/iwp/frame_state.cpp b/llarp/iwp/frame_state.cpp index 30d90030a..ecbaaab96 100644 --- a/llarp/iwp/frame_state.cpp +++ b/llarp/iwp/frame_state.cpp @@ -27,19 +27,29 @@ frame_state::process_inbound_queue() // TODO: is this right? auto &front = q.top(); // the items are already sorted anyways so this doesn't really do much - nextMsgID = std::max(nextMsgID, front->msgid); - auto buffer = front->Buffer(); - if(!Router()->HandleRecvLinkMessage(parent, buffer)) + + if(front->msgid < nextMsgID && nextMsgID - front->msgid > 1) { - llarp::LogWarn("failed to process inbound message ", front->msgid); - llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer); + // re queue + recvqueue.Put(front); + nextMsgID = front->msgid; + } + else + { + auto buffer = front->Buffer(); + if(!Router()->HandleRecvLinkMessage(parent, buffer)) + { + llarp::LogWarn("failed to process inbound message ", front->msgid); + llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer); + } + else + { + nextMsgID = std::max(front->msgid, nextMsgID + 1); + } + delete front; } - delete front; q.pop(); - increment = true; } - if(increment) - ++nextMsgID; // TODO: this isn't right return true; } @@ -145,13 +155,13 @@ frame_state::got_frag(frame_header hdr, size_t sz) auto idItr = rxIDs.find(msgid); if(idItr == rxIDs.end()) { - llarp::LogWarn("no such RX fragment, msgid=", msgid); + push_ackfor(msgid, 0); return true; } auto itr = rx.find(idItr->second); if(itr == rx.end()) { - llarp::LogWarn("no such RX fragment, msgid=", msgid); + push_ackfor(msgid, 0); return true; } auto fragsize = itr->second->msginfo.fragsize(); @@ -184,11 +194,12 @@ void frame_state::push_ackfor(uint64_t id, uint32_t bitmask) { llarp::LogDebug("ACK for msgid=", id, " mask=", bitmask); - sendqueue.push(new sendbuf_t(12 + 6)); - auto body_ptr = init_sendbuf(sendqueue.back(), eACKS, 12, txflags); + auto pkt = new sendbuf_t(12 + 6); + auto body_ptr = init_sendbuf(pkt, eACKS, 12, txflags); // TODO: this assumes big endian memcpy(body_ptr, &id, 8); memcpy(body_ptr + 8, &bitmask, 4); + sendqueue.Put(pkt); } bool @@ -331,6 +342,7 @@ frame_state::process(byte_t *buf, size_t sz) } } +/* bool frame_state::next_frame(llarp_buffer_t *buf) { @@ -354,6 +366,7 @@ frame_state::pop_next_frame() delete buf; sendqueue.pop(); } +*/ void frame_state::queue_tx(uint64_t id, transit_message *msg) diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 294f67ff1..f5bd6978d 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -424,6 +424,7 @@ llarp_link_session::TickLogic() q.pop(); } frame.process_inbound_queue(); + frame.retransmit(now); pump(); } @@ -453,15 +454,9 @@ llarp_link_session::Tick(llarp_time_t now) if(now - lastKeepalive > KEEP_ALIVE_INTERVAL) send_keepalive(this); } - - // pump frame state if(state == eEstablished) { - // llarp::LogDebug("Tick - pumping and retransmitting because we're - // eEstablished"); - - frame.retransmit(now); - pump(); + this->now = now; } return false; } @@ -558,7 +553,7 @@ llarp_link_session::introduce(uint8_t *pub) llarp::LogDebug("session introduce"); memcpy(remote, pub, PUBKEYSIZE); intro.buf = workbuf; - size_t w0sz = (rand() % MAX_PAD); + size_t w0sz = (llarp_randint() % MAX_PAD); intro.sz = (32 * 3) + w0sz; // randomize w0 if(w0sz) @@ -576,8 +571,6 @@ llarp_link_session::introduce(uint8_t *pub) intro.user = this; intro.hook = &handle_generated_intro; working = true; - llarp::LogInfo("try introduce to transport adddress ", - llarp::RouterID(remote)); iwp_call_async_gen_intro(iwp, &intro); // start introduce timer establish_job_id = llarp_logic_call_later( @@ -630,7 +623,7 @@ void llarp_link_session::session_start() { llarp::LogInfo("session gen start"); - size_t w2sz = rand() % MAX_PAD; + size_t w2sz = llarp_randint() % MAX_PAD; start.buf = workbuf; start.sz = w2sz + (32 * 3); start.nonce = workbuf + 32; @@ -682,7 +675,7 @@ llarp_link_session::intro_ack() return; } llarp::LogDebug("session introack"); - uint16_t w1sz = rand() % MAX_PAD; + uint16_t w1sz = llarp_randint() % MAX_PAD; introack.buf = workbuf; introack.sz = (32 * 3) + w1sz; // randomize padding @@ -785,7 +778,7 @@ llarp_link_session::encrypt_frame_async_send(const void *buf, size_t sz) iwp_async_frame *frame = alloc_frame(nullptr, sz + 64); memcpy(frame->buf + 64, buf, sz); // maybe add upto 128 random bytes to the packet - auto padding = rand() % MAX_PAD; + auto padding = llarp_randint() % MAX_PAD; if(padding) crypto->randbytes(frame->buf + 64 + sz, padding); frame->sz += padding; @@ -802,10 +795,15 @@ llarp_link_session::pump() bool flush = false; now = llarp_time_now_ms(); llarp_buffer_t buf; - while(frame.next_frame(&buf)) + std::queue< sendbuf_t * > q; + frame.sendqueue.Process(q); + while(q.size()) { + auto &front = q.front(); + buf = front->Buffer(); encrypt_frame_async_send(buf.base, buf.sz); - frame.pop_next_frame(); + delete front; + q.pop(); flush = true; } if(flush) diff --git a/llarp/iwp/transit_message.cpp b/llarp/iwp/transit_message.cpp index dd30911f8..97b42ce53 100644 --- a/llarp/iwp/transit_message.cpp +++ b/llarp/iwp/transit_message.cpp @@ -97,12 +97,13 @@ transit_message::completed() const void transit_message::generate_xmit(sendqueue_t &queue, byte_t flags) { - uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer); - queue.push(new sendbuf_t(sz + 6)); - auto body_ptr = init_sendbuf(queue.back(), eXMIT, sz, flags); + uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer); + auto pkt = new sendbuf_t(sz + 6); + auto body_ptr = init_sendbuf(pkt, eXMIT, sz, flags); memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer)); body_ptr += sizeof(msginfo.buffer); memcpy(body_ptr, lastfrag.data(), lastfrag.size()); + queue.Put(pkt); } // template < typename T > @@ -115,13 +116,14 @@ transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags) { if(status.test(frag.first)) continue; - uint16_t sz = 9 + fragsize; - queue.push(new sendbuf_t(sz + 6)); - auto body_ptr = init_sendbuf(queue.back(), eFRAG, sz, flags); + uint16_t sz = 9 + fragsize; + auto pkt = new sendbuf_t(sz + 6); + auto body_ptr = init_sendbuf(pkt, eFRAG, sz, flags); // TODO: assumes big endian memcpy(body_ptr, &msgid, 8); body_ptr[8] = frag.first; memcpy(body_ptr + 9, frag.second.data(), fragsize); + queue.Put(pkt); } lastRetransmit = llarp_time_now_ms(); } diff --git a/llarp/mem.hpp b/llarp/mem.hpp index 26be41197..ba104d850 100644 --- a/llarp/mem.hpp +++ b/llarp/mem.hpp @@ -51,6 +51,8 @@ namespace llarp if(idx % align == 0) printf("\n"); } + printf("\n"); + fflush(stdout); } } // namespace llarp diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index d16d4e46c..7aca063a6 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -439,7 +439,7 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n, struct llarp_rc *prev, auto itr = n->entries.begin(); if(sz > 1) { - auto idx = rand() % sz; + auto idx = llarp_randint() % sz; std::advance(itr, idx); } if(memcmp(prev->pubkey, itr->second.pubkey, PUBKEYSIZE) == 0) @@ -453,7 +453,7 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n, struct llarp_rc *prev, auto itr = n->entries.begin(); if(sz > 1) { - auto idx = rand() % sz; + auto idx = llarp_randint() % sz; std::advance(itr, idx); } llarp_rc_copy(result, &itr->second); diff --git a/llarp/path.cpp b/llarp/path.cpp index 72a9a9532..2edf97b0b 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -64,7 +64,7 @@ namespace llarp PathContext::ForwardLRCM(const RouterID& nextHop, std::deque< EncryptedFrame >& frames) { - llarp::LogInfo("fowarding LRCM to ", nextHop); + llarp::LogDebug("fowarding LRCM to ", nextHop); LR_CommitMessage* msg = new LR_CommitMessage; while(frames.size()) { @@ -226,7 +226,7 @@ namespace llarp if(itr->second->Expired(now)) { TransitHop* path = itr->second; - llarp::LogInfo("transit path expired ", path->info); + llarp::LogDebug("transit path expired ", path->info); removePaths.insert(path); } ++itr; @@ -335,7 +335,7 @@ namespace llarp if(dlt > 5000 && m_LastLatencyTestID == 0) { llarp::routing::PathLatencyMessage latency; - latency.T = rand(); + latency.T = llarp_randint(); m_LastLatencyTestID = latency.T; m_LastLatencyTestTime = now; SendRoutingMessage(&latency, r); @@ -432,7 +432,7 @@ namespace llarp m_BuiltHook = nullptr; llarp::routing::PathLatencyMessage latency; - latency.T = rand(); + latency.T = llarp_randint(); m_LastLatencyTestID = latency.T; m_LastLatencyTestTime = llarp_time_now_ms(); return SendRoutingMessage(&latency, r); diff --git a/llarp/pathset.cpp b/llarp/pathset.cpp index 9f1fb7796..8b391fc12 100644 --- a/llarp/pathset.cpp +++ b/llarp/pathset.cpp @@ -121,7 +121,7 @@ namespace llarp auto sz = established.size(); if(sz) { - return established[rand() % sz]; + return established[llarp_randint() % sz]; } else return nullptr; diff --git a/llarp/relay_commit.cpp b/llarp/relay_commit.cpp index d36b763cb..58976300c 100644 --- a/llarp/relay_commit.cpp +++ b/llarp/relay_commit.cpp @@ -249,21 +249,21 @@ namespace llarp if(self->record.work && self->record.work->IsValid(self->context->Crypto()->shorthash)) { - llarp::LogInfo("LRCM extended lifetime by ", - self->record.work->extendedLifetime, " seconds for ", - info); + llarp::LogDebug("LRCM extended lifetime by ", + self->record.work->extendedLifetime, " seconds for ", + info); self->hop->lifetime += 1000 * self->record.work->extendedLifetime; } else if(self->record.lifetime < 600 && self->record.lifetime > 10) { self->hop->lifetime = self->record.lifetime; - llarp::LogInfo("LRCM short lifespan set to ", self->hop->lifetime, - " seconds for ", info); + llarp::LogDebug("LRCM short lifespan set to ", self->hop->lifetime, + " seconds for ", info); } // TODO: check if we really want to accept it self->hop->started = llarp_time_now_ms(); - llarp::LogInfo("Accepted ", self->hop->info); + llarp::LogDebug("Accepted ", self->hop->info); self->context->PutTransitHop(self->hop); size_t sz = self->frames.front().size(); @@ -277,7 +277,7 @@ namespace llarp if(self->context->HopIsUs(info.upstream)) { // we are the farthest hop - llarp::LogInfo("We are the farthest hop for ", info); + llarp::LogDebug("We are the farthest hop for ", info); // send a LRAM down the path llarp_logic_queue_job(self->context->Logic(), {self, &SendPathConfirm}); } diff --git a/llarp/router.cpp b/llarp/router.cpp index abf90afca..628630f7d 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -716,13 +716,13 @@ llarp_router::Run() // initialize as service node InitServiceNode(); // immediate connect all for service node - uint64_t delay = rand() % 100; + uint64_t delay = llarp_randint() % 100; llarp_logic_call_later(logic, {delay, this, &ConnectAll}); } else { // delayed connect all for clients - uint64_t delay = ((rand() % 10) * 500) + 1000; + uint64_t delay = ((llarp_randint() % 10) * 500) + 500; llarp_logic_call_later(logic, {delay, this, &ConnectAll}); } diff --git a/llarp/service.cpp b/llarp/service.cpp index 5a146c665..370d04ce7 100644 --- a/llarp/service.cpp +++ b/llarp/service.cpp @@ -22,7 +22,7 @@ namespace llarp if(llarp_buffer_eq(key, "i")) { - return BEncodeReadSet(I, buf); + return BEncodeReadList(I, buf); } if(!BEncodeMaybeReadDictEntry("n", topic, read, key, buf)) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 25f0e2f9f..23548a617 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -73,7 +73,9 @@ namespace llarp " because we couldn't get any introductions"); return; } - m_IntroSet.I = I; + m_IntroSet.I.clear(); + for(const auto& intro : I) + m_IntroSet.I.push_back(intro); m_IntroSet.topic = m_Tag; if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto)) { @@ -125,7 +127,7 @@ namespace llarp uint64_t Endpoint::GenTXID() { - uint64_t txid = rand(); + uint64_t txid = llarp_randint(); while(m_PendingLookups.find(txid) != m_PendingLookups.end()) ++txid; return txid; @@ -172,7 +174,7 @@ namespace llarp { llarp::LogWarn("invalid lookup response for hidden service endpoint ", Name(), " txid=", msg->T); - return false; + return true; } bool result = itr->second->HandleResponse(remote); m_PendingLookups.erase(itr); @@ -255,7 +257,7 @@ namespace llarp auto path = PickRandomEstablishedPath(); if(path) { - m_CurrentPublishTX = rand(); + m_CurrentPublishTX = llarp_randint(); llarp::routing::DHTMessage msg; msg.M.push_back(new llarp::dht::PublishIntroMessage( m_IntroSet, m_CurrentPublishTX, 3)); @@ -371,7 +373,7 @@ namespace llarp { auto sendto = std::bind(&OutboundContext::SendMessage, this, std::placeholders::_1); - ProtocolMessage* msg = new ProtocolMessage(protocol); + ProtocolMessage* msg = new ProtocolMessage(protocol, sequenceNo); msg->PutBuffer(data); if(sequenceNo) { diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp index e0997c1fc..db7cc0e19 100644 --- a/llarp/service/protocol.cpp +++ b/llarp/service/protocol.cpp @@ -4,7 +4,8 @@ namespace llarp { namespace service { - ProtocolMessage::ProtocolMessage(ProtocolType t) : proto(t) + ProtocolMessage::ProtocolMessage(ProtocolType t, uint64_t seqno) + : proto(t), sequenceNum(seqno) { } diff --git a/test/hiddenservice_unittest.cpp b/test/hiddenservice_unittest.cpp index ba6f3e518..058651736 100644 --- a/test/hiddenservice_unittest.cpp +++ b/test/hiddenservice_unittest.cpp @@ -36,7 +36,7 @@ TEST_F(HiddenServiceTest, TestGenerateIntroSet) intro.expiresAt = 1000; intro.router.Randomize(); intro.pathID.Randomize(); - I.I.insert(intro); + I.I.push_back(intro); } ASSERT_TRUE(ident.SignIntroSet(I, Crypto())); ASSERT_TRUE(I.VerifySignature(Crypto()));