From dbd2c41909373bf1c01eb094ababc6520d659af1 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 29 Oct 2018 12:48:36 -0400 Subject: [PATCH] use event loop for time --- include/llarp/codel.hpp | 15 ++++++---- include/llarp/dht/context.hpp | 3 ++ include/llarp/ev.h | 6 ++++ include/llarp/ip.hpp | 7 ++++- include/llarp/link/server.hpp | 14 ++++++--- include/llarp/logic.h | 4 +-- include/llarp/path.hpp | 8 ++--- include/llarp/pathbuilder.hpp | 5 +++- include/llarp/pathset.hpp | 6 +++- include/llarp/pow.hpp | 2 +- include/llarp/service/Identity.hpp | 2 +- include/llarp/service/IntroSet.hpp | 2 +- include/llarp/service/endpoint.hpp | 15 ++++++++-- include/llarp/timer.h | 8 ++++- libabyss/include/abyss/server.hpp | 6 ++++ libabyss/src/server.cpp | 10 +++---- llarp/dht/context.cpp | 16 ++++++---- llarp/dht/got_intro.cpp | 2 +- llarp/dht/publish_intro.cpp | 7 +++-- llarp/ev.cpp | 26 ++++++++++------ llarp/ev.hpp | 27 ++++++++--------- llarp/ev_epoll.hpp | 6 ++-- llarp/ev_kqueue.hpp | 12 ++++---- llarp/handlers/tun.cpp | 11 ++++--- llarp/link/server.cpp | 11 +++---- llarp/link/utp.cpp | 43 +++++++++++++------------- llarp/logic.cpp | 7 +++-- llarp/path.cpp | 35 +++++++++++----------- llarp/pathbuilder.cpp | 13 +++++--- llarp/pathset.cpp | 5 ++-- llarp/proofofwork.cpp | 4 +-- llarp/relay_commit.cpp | 7 +++-- llarp/router.cpp | 8 ++--- llarp/router.hpp | 7 +++++ llarp/service.cpp | 9 +++--- llarp/service/context.cpp | 3 +- llarp/service/endpoint.cpp | 48 ++++++++++++++---------------- llarp/timer.cpp | 33 +++++++++++++++----- 38 files changed, 273 insertions(+), 180 deletions(-) diff --git a/include/llarp/codel.hpp b/include/llarp/codel.hpp index 25db2b705..5a4dcac43 100644 --- a/include/llarp/codel.hpp +++ b/include/llarp/codel.hpp @@ -41,7 +41,8 @@ namespace llarp size_t MaxSize = 1024 > struct CoDelQueue { - CoDelQueue(const std::string& name) : m_name(name) + CoDelQueue(const std::string& name, const PutTime& put) + : m_name(name), _putTime(put) { } @@ -67,9 +68,9 @@ namespace llarp return false; } - PutTime()(m_Queue[m_QueueIdx]); + _putTime(m_Queue[m_QueueIdx]); if(firstPut == 0) - firstPut = GetTime()(m_Queue[m_QueueIdx]); + firstPut = _getTime(m_Queue[m_QueueIdx]); ++m_QueueIdx; return true; @@ -84,9 +85,9 @@ namespace llarp return; T* t = &m_Queue[m_QueueIdx]; new(t) T(std::forward< Args >(args)...); - PutTime()(m_Queue[m_QueueIdx]); + _putTime(m_Queue[m_QueueIdx]); if(firstPut == 0) - firstPut = GetTime()(m_Queue[m_QueueIdx]); + firstPut = _getTime(m_Queue[m_QueueIdx]); ++m_QueueIdx; } @@ -123,7 +124,7 @@ namespace llarp if(f(*item)) break; --m_QueueIdx; - auto dlt = start - GetTime()(*item); + auto dlt = start - _getTime(*item); // llarp::LogInfo("CoDelQueue::Process - dlt ", dlt); lowest = std::min(dlt, lowest); if(m_QueueIdx == 0) @@ -156,6 +157,8 @@ namespace llarp size_t m_QueueIdx = 0; T m_Queue[MaxSize]; std::string m_name; + GetTime _getTime; + PutTime _putTime; }; // namespace util } // namespace util } // namespace llarp diff --git a/include/llarp/dht/context.hpp b/include/llarp/dht/context.hpp index 364387e41..6bbf32706 100644 --- a/include/llarp/dht/context.hpp +++ b/include/llarp/dht/context.hpp @@ -385,6 +385,9 @@ namespace llarp return ++ids; } + llarp_time_t + Now(); + private: void ExploreNetworkVia(const Key_t& peer); diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 5fec1909e..6412c5ba4 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -16,6 +16,7 @@ #include #include #include +#include /** * ev.h * @@ -45,6 +46,10 @@ llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev, struct llarp_threadpool *tp, struct llarp_logic *logic); +/// get the current time on the event loop +llarp_time_t +llarp_ev_loop_time_now_ms(struct llarp_ev_loop *ev); + /// stop event loop and wait for it to complete all jobs void llarp_ev_loop_stop(struct llarp_ev_loop *ev); @@ -57,6 +62,7 @@ struct llarp_udp_io void *user; void *impl; struct llarp_ev_loop *parent; + /// called every event loop tick after reads void (*tick)(struct llarp_udp_io *); // sockaddr * is the source diff --git a/include/llarp/ip.hpp b/include/llarp/ip.hpp index 728bd46f8..35386c4e5 100644 --- a/include/llarp/ip.hpp +++ b/include/llarp/ip.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #ifndef _WIN32 // unix, linux @@ -101,10 +102,14 @@ namespace llarp struct PutTime { + llarp_ev_loop* loop; + PutTime(llarp_ev_loop* evloop) : loop(evloop) + { + } void operator()(IPv4Packet& pkt) const { - pkt.timestamp = llarp_time_now_ms(); + pkt.timestamp = llarp_ev_loop_time_now_ms(loop); } }; diff --git a/include/llarp/link/server.hpp b/include/llarp/link/server.hpp index dde06982f..6b57a0ef6 100644 --- a/include/llarp/link/server.hpp +++ b/include/llarp/link/server.hpp @@ -18,7 +18,12 @@ namespace llarp struct ILinkLayer { virtual ~ILinkLayer(); - + /// get current time via event loop + llarp_time_t + now() const + { + return llarp_ev_loop_time_now_ms(m_Loop); + } bool HasSessionTo(const PubKey& pk); @@ -111,11 +116,11 @@ namespace llarp // timer cancelled if(left) return; - static_cast< ILinkLayer* >(user)->OnTick(orig, llarp_time_now_ms()); + static_cast< ILinkLayer* >(user)->OnTick(orig); } void - OnTick(uint64_t interval, llarp_time_t now); + OnTick(uint64_t interval); void ScheduleTick(uint64_t interval); @@ -129,7 +134,8 @@ namespace llarp void PutSession(ILinkSession* s); - llarp_logic* m_Logic = nullptr; + llarp_logic* m_Logic = nullptr; + llarp_ev_loop* m_Loop = nullptr; Addr m_ourAddr; llarp_udp_io m_udp; SecretKey m_SecretKey; diff --git a/include/llarp/logic.h b/include/llarp/logic.h index 79b31e740..18db854e1 100644 --- a/include/llarp/logic.h +++ b/include/llarp/logic.h @@ -19,11 +19,11 @@ llarp_init_single_process_logic(struct llarp_threadpool* tp); /// single threaded tick void -llarp_logic_tick(struct llarp_logic* logic); +llarp_logic_tick(struct llarp_logic* logic, llarp_time_t now); /// isolated tick void -llarp_logic_tick_async(struct llarp_logic* logic); +llarp_logic_tick_async(struct llarp_logic* logic, llarp_time_t now); void llarp_free_logic(struct llarp_logic** logic); diff --git a/include/llarp/path.hpp b/include/llarp/path.hpp index 6c87d1cec..f1785391a 100644 --- a/include/llarp/path.hpp +++ b/include/llarp/path.hpp @@ -271,7 +271,7 @@ namespace llarp } void - EnterState(PathStatus st); + EnterState(PathStatus st, llarp_time_t now); llarp_time_t ExpireTime() const @@ -374,16 +374,16 @@ namespace llarp /// called from router tick function void - ExpirePaths(); + ExpirePaths(llarp_time_t now); /// called from router tick function /// builds all paths we need to build at current tick void - BuildPaths(); + BuildPaths(llarp_time_t now); /// called from router tick function void - TickPaths(); + TickPaths(llarp_time_t now); /// track a path builder with this context void diff --git a/include/llarp/pathbuilder.hpp b/include/llarp/pathbuilder.hpp index ba2a123aa..8d946341d 100644 --- a/include/llarp/pathbuilder.hpp +++ b/include/llarp/pathbuilder.hpp @@ -29,7 +29,10 @@ namespace llarp size_t hop); virtual bool - ShouldBuildMore() const; + ShouldBuildMore(llarp_time_t now) const; + + llarp_time_t + Now() const; void BuildOne(); diff --git a/include/llarp/pathset.hpp b/include/llarp/pathset.hpp index 0a293cf3b..1a8d0fc6a 100644 --- a/include/llarp/pathset.hpp +++ b/include/llarp/pathset.hpp @@ -66,9 +66,13 @@ namespace llarp size_t NumInStatus(PathStatus st) const; + /// get time from event loop + virtual llarp_time_t + Now() const = 0; + /// return true if we should build another path virtual bool - ShouldBuildMore() const; + ShouldBuildMore(llarp_time_t now) const; /// return true if we should publish a new hidden service descriptor virtual bool diff --git a/include/llarp/pow.hpp b/include/llarp/pow.hpp index bf2c93026..1e0e9bf9c 100644 --- a/include/llarp/pow.hpp +++ b/include/llarp/pow.hpp @@ -17,7 +17,7 @@ namespace llarp ~PoW(); bool - IsValid(llarp_shorthash_func hashfunc) const; + IsValid(llarp_shorthash_func hashfunc, llarp_time_t now) const; bool DecodeKey(llarp_buffer_t k, llarp_buffer_t* val); diff --git a/include/llarp/service/Identity.hpp b/include/llarp/service/Identity.hpp index 0a33e7f65..ce6aa9a21 100644 --- a/include/llarp/service/Identity.hpp +++ b/include/llarp/service/Identity.hpp @@ -46,7 +46,7 @@ namespace llarp DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf); bool - SignIntroSet(IntroSet& i, llarp_crypto* c) const; + SignIntroSet(IntroSet& i, llarp_crypto* c, llarp_time_t now) const; bool Sign(llarp_crypto*, byte_t* sig, llarp_buffer_t buf) const; diff --git a/include/llarp/service/IntroSet.hpp b/include/llarp/service/IntroSet.hpp index 9ea7d3003..1a419025d 100644 --- a/include/llarp/service/IntroSet.hpp +++ b/include/llarp/service/IntroSet.hpp @@ -148,7 +148,7 @@ namespace llarp DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf); bool - Verify(llarp_crypto* crypto) const; + Verify(llarp_crypto* crypto, llarp_time_t now) const; }; } // namespace service } // namespace llarp diff --git a/include/llarp/service/endpoint.hpp b/include/llarp/service/endpoint.hpp index 48ced9df7..946309d57 100644 --- a/include/llarp/service/endpoint.hpp +++ b/include/llarp/service/endpoint.hpp @@ -6,6 +6,7 @@ #include #include #include +#include // minimum time between interoset shifts #ifndef MIN_SHIFT_INTERVAL @@ -43,6 +44,13 @@ namespace llarp virtual void Tick(llarp_time_t now); + /// get time via event loop + llarp_time_t + Now() + { + return llarp_ev_loop_time_now_ms(EndpointNetLoop()); + } + /// router's logic llarp_logic* RouterLogic(); @@ -263,7 +271,7 @@ namespace llarp ReadyToSend() const; bool - ShouldBuildMore() const; + ShouldBuildMore(llarp_time_t now) const; /// tick internal state /// return true to mark as dead @@ -455,7 +463,7 @@ namespace llarp { RouterLookupJob(Endpoint* p) { - started = llarp_time_now_ms(); + started = p->Now(); txid = p->GenTXID(); } @@ -511,8 +519,9 @@ namespace llarp llarp_time_t lastModified = 0; std::set< IntroSet > result; Tag tag; + Endpoint * parent; - CachedTagResult(const Tag& t) : tag(t) + CachedTagResult(const Tag& t, Endpoint * p) : tag(t), parent(p) { } diff --git a/include/llarp/timer.h b/include/llarp/timer.h index 7b401e87c..388e51f13 100644 --- a/include/llarp/timer.h +++ b/include/llarp/timer.h @@ -2,6 +2,7 @@ #define LLARP_TIMER_H #include #include +#include /** called with userptr, original timeout, left */ typedef void (*llarp_timer_handler_func)(void *, uint64_t, uint64_t); @@ -32,6 +33,11 @@ llarp_timer_remove_job(struct llarp_timer_context *t, uint32_t id); void llarp_timer_stop(struct llarp_timer_context *t); +/// set timer's timestamp, if now is 0 use the current time from system clock, +/// llarp_time_t now +void +llarp_timer_set_time(struct llarp_timer_context *t, llarp_time_t now); + // blocking run timer and send events to thread pool void llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool); @@ -43,7 +49,7 @@ llarp_timer_tick_all(struct llarp_timer_context *t); /// tick all timers into a threadpool asynchronously void llarp_timer_tick_all_async(struct llarp_timer_context *t, - struct llarp_threadpool *pool); + struct llarp_threadpool *pool, llarp_time_t now); void llarp_free_timer(struct llarp_timer_context **t); diff --git a/libabyss/include/abyss/server.hpp b/libabyss/include/abyss/server.hpp index 7c014adca..2c2108ec0 100644 --- a/libabyss/include/abyss/server.hpp +++ b/libabyss/include/abyss/server.hpp @@ -49,6 +49,12 @@ namespace abyss void RemoveConn(IRPCHandler* handler); + llarp_time_t + now() const + { + return llarp_ev_loop_time_now_ms(m_loop); + } + protected: virtual IRPCHandler* CreateHandler(ConnImpl* connimpl) const = 0; diff --git a/libabyss/src/server.cpp b/libabyss/src/server.cpp index 0772b6bc2..821c1159d 100644 --- a/libabyss/src/server.cpp +++ b/libabyss/src/server.cpp @@ -48,7 +48,7 @@ namespace abyss : _conn(c), _parent(p) { handler = nullptr; - m_LastActive = llarp_time_now_ms(); + m_LastActive = p->now(); m_ReadTimeout = readtimeout; // set up tcp members _conn->user = this; @@ -265,7 +265,7 @@ namespace abyss return false; } - m_LastActive = llarp_time_now_ms(); + m_LastActive = _parent->now(); if(m_State < eReadHTTPBody) { const char* end = strstr(buf, "\r\n"); @@ -395,11 +395,11 @@ namespace abyss void BaseReqHandler::Tick() { - auto now = llarp_time_now_ms(); - auto itr = m_Conns.begin(); + auto _now = now(); + auto itr = m_Conns.begin(); while(itr != m_Conns.end()) { - if((*itr)->ShouldClose(now)) + if((*itr)->ShouldClose(_now) itr = m_Conns.erase(itr); else ++itr; diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 674b70142..66a08fa4b 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -115,7 +115,7 @@ namespace llarp if(ctx->services) { // expire intro sets - auto now = llarp_time_now_ms(); + auto now = ctx->Now(); auto &nodes = ctx->services->nodes; auto itr = nodes.begin(); while(itr != nodes.end()) @@ -244,7 +244,7 @@ namespace llarp void Context::CleanupTX() { - auto now = llarp_time_now_ms(); + auto now = Now(); llarp::LogDebug("DHT tick"); pendingRouterLookups.Expire(now); @@ -285,7 +285,7 @@ namespace llarp router->SendToOrQueue(peer, &m); if(keepalive) { - auto now = llarp_time_now_ms(); + auto now = Now(); router->PersistSessionUntil(peer, now + 10000); } } @@ -323,7 +323,7 @@ namespace llarp bool Validate(const service::IntroSet &value) const { - if(!value.Verify(parent->Crypto())) + if(!value.Verify(parent->Crypto(), parent->Now())) { llarp::LogWarn("Got invalid introset from service lookup"); return false; @@ -547,7 +547,7 @@ namespace llarp bool Validate(const service::IntroSet &introset) const { - if(!introset.Verify(parent->Crypto())) + if(!introset.Verify(parent->Crypto(), parent->Now())) { llarp::LogWarn("got invalid introset from tag lookup"); return false; @@ -824,5 +824,11 @@ namespace llarp return &router->crypto; } + llarp_time_t + Context::Now() + { + return llarp_ev_loop_time_now_ms(router->netloop); + } + } // namespace dht } // namespace llarp diff --git a/llarp/dht/got_intro.cpp b/llarp/dht/got_intro.cpp index 57c15378d..ff94abd75 100644 --- a/llarp/dht/got_intro.cpp +++ b/llarp/dht/got_intro.cpp @@ -28,7 +28,7 @@ namespace llarp for(const auto &introset : I) { - if(!introset.Verify(crypto)) + if(!introset.Verify(crypto, dht.Now())) { llarp::LogWarn( "Invalid introset while handling direct GotIntro " diff --git a/llarp/dht/publish_intro.cpp b/llarp/dht/publish_intro.cpp index 779346a81..a205518ac 100644 --- a/llarp/dht/publish_intro.cpp +++ b/llarp/dht/publish_intro.cpp @@ -44,20 +44,21 @@ namespace llarp llarp_dht_context *ctx, std::vector< std::unique_ptr< IMessage > > &replies) const { + auto now = ctx->impl.Now(); if(S > 5) { llarp::LogWarn("invalid S value ", S, " > 5"); return false; } auto &dht = ctx->impl; - if(!I.Verify(&dht.router->crypto)) + if(!I.Verify(&dht.router->crypto, now)) { llarp::LogWarn("invalid introset: ", I); // don't propogate or store replies.emplace_back(new GotIntroMessage({}, txID)); return true; } - if(I.W && !I.W->IsValid(dht.router->crypto.shorthash)) + if(I.W && !I.W->IsValid(dht.router->crypto.shorthash, now)) { llarp::LogWarn("proof of work not good enough for IntroSet"); // don't propogate or store @@ -71,7 +72,7 @@ namespace llarp "failed to calculate hidden service address for PubIntro message"); return false; } - auto now = llarp_time_now_ms(); + now += llarp::service::MAX_INTROSET_TIME_DELTA; if(I.IsExpired(now)) { diff --git a/llarp/ev.cpp b/llarp/ev.cpp index 7dfe894d0..a0c271279 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -30,6 +30,7 @@ llarp_ev_loop_alloc(struct llarp_ev_loop **ev) *ev = new llarp_win32_loop; #endif (*ev)->init(); + (*ev)->_now = llarp_time_now_ms(); } void @@ -44,9 +45,10 @@ llarp_ev_loop_run(struct llarp_ev_loop *ev, struct llarp_logic *logic) { while(ev->running()) { + ev->_now = llarp_time_now_ms(); ev->tick(EV_TICK_INTERVAL); if(ev->running()) - llarp_logic_tick(logic); + llarp_logic_tick(logic, ev->_now); } return 0; } @@ -58,10 +60,11 @@ llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev, { while(ev->running()) { + ev->_now = llarp_time_now_ms(); ev->tick(EV_TICK_INTERVAL); if(ev->running()) { - llarp_logic_tick_async(logic); + llarp_logic_tick_async(logic, ev->_now); llarp_threadpool_tick(tp); } } @@ -85,6 +88,12 @@ llarp_ev_close_udp(struct llarp_udp_io *udp) return -1; } +llarp_time_t +llarp_ev_loop_time_now_ms(struct llarp_ev_loop *loop) +{ + return loop->_now; +} + void llarp_ev_loop_stop(struct llarp_ev_loop *loop) { @@ -120,8 +129,8 @@ bool llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt, size_t sz) { - const byte_t *ptr = (const byte_t *)pkt; - llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl); + const byte_t *ptr = (const byte_t *)pkt; + llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl); if(impl->_shouldClose) return false; while(sz > EV_WRITE_BUF_SZ) @@ -227,9 +236,8 @@ namespace llarp } // namespace llarp - -llarp::ev_io* -llarp_ev_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr) +llarp::ev_io * +llarp_ev_loop::bind_tcp(llarp_tcp_acceptor *tcp, const sockaddr *bindaddr) { int fd = ::socket(bindaddr->sa_family, SOCK_STREAM, 0); if(fd == -1) @@ -237,7 +245,7 @@ llarp_ev_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr) socklen_t sz = sizeof(sockaddr_in); if(bindaddr->sa_family == AF_INET6) { - sz = sizeof(sockaddr_in6); + sz = sizeof(sockaddr_in6); } else if(bindaddr->sa_family == AF_UNIX) { @@ -253,7 +261,7 @@ llarp_ev_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr) ::close(fd); return nullptr; } - llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp); + llarp::ev_io *serv = new llarp::tcp_serv(this, fd, tcp); tcp->impl = serv; return serv; } diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 121e32318..e0c546c00 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -29,11 +29,10 @@ namespace llarp { struct ev_io - { + { struct WriteBuffer { - - llarp_time_t timestamp = 0; + llarp_time_t timestamp = 0; size_t bufsz; byte_t buf[EV_WRITE_BUF_SZ]; @@ -52,19 +51,19 @@ namespace llarp struct GetTime { - llarp_time_t - operator()(const WriteBuffer& w) const + llarp_time_t operator()(const WriteBuffer & buf) const { - return w.timestamp; + return buf.timestamp; } }; struct PutTime { - void - operator()(WriteBuffer& w) const + llarp_ev_loop * loop; + PutTime(llarp_ev_loop * l ) : loop(l) {} + void operator()(WriteBuffer & buf) { - w.timestamp = llarp_time_now_ms(); + buf.timestamp = llarp_ev_loop_time_now_ms(loop); } }; @@ -239,11 +238,11 @@ namespace llarp { if(_shouldClose) return -1; - #ifdef __linux__ +#ifdef __linux__ return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe - #else - return ::send(fd, buf, sz, 0 ); - #endif +#else + return ::send(fd, buf, sz, 0); +#endif } int @@ -303,7 +302,7 @@ namespace llarp struct llarp_ev_loop { byte_t readbuf[EV_READ_BUF_SZ]; - + llarp_time_t _now = 0; virtual bool init() = 0; virtual int diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index d6f1f3869..36c0ecbd2 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -79,8 +79,8 @@ namespace llarp { llarp_tun_io* t; device* tunif; - tun(llarp_tun_io* tio) - : ev_io(-1, new LossyWriteQueue_t("tun_write_queue")) + tun(llarp_tun_io* tio, llarp_ev_loop* l) + : ev_io(-1, new LossyWriteQueue_t("tun_write_queue", l)) , t(tio) , tunif(tuntap_init()) @@ -309,7 +309,7 @@ struct llarp_epoll_loop : public llarp_ev_loop llarp::ev_io* create_tun(llarp_tun_io* tun) { - llarp::tun* t = new llarp::tun(tun); + llarp::tun* t = new llarp::tun(tun, this); if(t->setup()) { return t; diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index 13e9c1ed5..5e1355c50 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -97,12 +97,10 @@ namespace llarp { llarp_tun_io* t; device* tunif; - tun(llarp_tun_io* tio) - : ev_io(-1, new LossyWriteQueue_t("kqueue_tun_write")) - , t(tio) - , tunif(tuntap_init()) - { - }; + tun(llarp_tun_io* tio, llarp_ev_loop* l) + : ev_io(-1, new LossyWriteQueue_t("kqueue_tun_write", l)) + , t(tio) + , tunif(tuntap_init()){}; int sendto(const sockaddr* to, const void* data, size_t sz) @@ -189,7 +187,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop llarp::ev_io* create_tun(llarp_tun_io* tun) { - llarp::tun* t = new llarp::tun(tun); + llarp::tun* t = new llarp::tun(tun, this); if(t->setup()) return t; delete t; diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 53336c0c5..5fae89523 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -19,8 +19,8 @@ namespace llarp { TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r) : service::Endpoint(nickname, r) - , m_UserToNetworkPktQueue(nickname + "_sendq") - , m_NetworkToUserPktQueue(nickname + "_recvq") + , m_UserToNetworkPktQueue(nickname + "_sendq", r->netloop) + , m_NetworkToUserPktQueue(nickname + "_recvq", r->netloop) { tunif.user = this; tunif.netmask = DefaultTunNetmask; @@ -369,7 +369,7 @@ namespace llarp huint32_t TunEndpoint::ObtainIPForAddr(const service::Address &addr) { - llarp_time_t now = llarp_time_now_ms(); + llarp_time_t now = Now(); huint32_t nextIP = {0}; { @@ -440,7 +440,7 @@ namespace llarp void TunEndpoint::MarkIPActive(huint32_t ip) { - m_IPActivity[ip] = std::max(llarp_time_now_ms(), m_IPActivity[ip]); + m_IPActivity[ip] = std::max(Now(), m_IPActivity[ip]); } void @@ -452,9 +452,8 @@ namespace llarp void TunEndpoint::handleTickTun(void *u) { - auto now = llarp_time_now_ms(); TunEndpoint *self = static_cast< TunEndpoint * >(u); - self->TickTun(now); + self->TickTun(self->Now()); } void diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 29915cde3..c323842be 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -30,6 +30,7 @@ namespace llarp ILinkLayer::Configure(llarp_ev_loop* loop, const std::string& ifname, int af, uint16_t port) { + m_Loop = loop; m_udp.user = this; m_udp.recvfrom = &ILinkLayer::udp_recv_from; m_udp.tick = &ILinkLayer::udp_tick; @@ -47,13 +48,13 @@ namespace llarp void ILinkLayer::Pump() { - auto now = llarp_time_now_ms(); + auto _now = now(); { Lock lock(m_AuthedLinksMutex); auto itr = m_AuthedLinks.begin(); while(itr != m_AuthedLinks.end()) { - if(!itr->second->TimedOut(now)) + if(!itr->second->TimedOut(_now)) { itr->second->Pump(); ++itr; @@ -68,7 +69,7 @@ namespace llarp auto itr = m_Pending.begin(); while(itr != m_Pending.end()) { - if(!(*itr)->TimedOut(now)) + if(!(*itr)->TimedOut(_now)) { (*itr)->Pump(); ++itr; @@ -261,9 +262,9 @@ namespace llarp } void - ILinkLayer::OnTick(uint64_t interval, llarp_time_t now) + ILinkLayer::OnTick(uint64_t interval) { - Tick(now); + Tick(now()); ScheduleTick(interval); } diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index ce5b3e8b7..3b1a70e7f 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -180,23 +180,7 @@ namespace llarp EncryptThenHash(const byte_t* ptr, uint32_t sz, bool isLastFragment); bool - QueueWriteBuffers(llarp_buffer_t buf) - { - if(sendq.size() >= MaxSendQueueSize) - return false; - llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr); - lastActive = llarp_time_now_ms(); - size_t sz = buf.sz; - byte_t* ptr = buf.base; - while(sz) - { - uint32_t s = std::min(FragmentBodyPayloadSize, sz); - EncryptThenHash(ptr, s, ((sz - s) == 0)); - ptr += s; - sz -= s; - } - return true; - } + QueueWriteBuffers(llarp_buffer_t buf); void Connect() @@ -578,7 +562,7 @@ namespace llarp SendQueueBacklog = [&]() -> size_t { return sendq.size(); }; SendKeepAlive = [&]() -> bool { - auto now = llarp_time_now_ms(); + auto now = parent->now(); if(sendq.size() == 0 && state == eSessionReady && now > lastActive && now - lastActive > (sessionTimeout / 4)) { @@ -600,7 +584,7 @@ namespace llarp return this->IsTimedOut(now) || this->state == eClose; }; GetPubKey = std::bind(&BaseSession::RemotePubKey, this); - lastActive = llarp_time_now_ms(); + lastActive = parent->now(); // Pump = []() {}; Pump = std::bind(&BaseSession::PumpWrite, this); Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); @@ -661,6 +645,25 @@ namespace llarp return true; } + bool + BaseSession::QueueWriteBuffers(llarp_buffer_t buf) + { + if(sendq.size() >= MaxSendQueueSize) + return false; + llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr); + lastActive = parent->now(); + size_t sz = buf.sz; + byte_t* ptr = buf.base; + while(sz) + { + uint32_t s = std::min(FragmentBodyPayloadSize, sz); + EncryptThenHash(ptr, s, ((sz - s) == 0)); + ptr += s; + sz -= s; + } + return true; + } + bool BaseSession::OutboundLIM(const LinkIntroMessage* msg) { @@ -949,7 +952,7 @@ namespace llarp void BaseSession::Alive() { - lastActive = llarp_time_now_ms(); + lastActive = parent->now(); } } // namespace utp diff --git a/llarp/logic.cpp b/llarp/logic.cpp index 688f128f2..4d3cd42b1 100644 --- a/llarp/logic.cpp +++ b/llarp/logic.cpp @@ -27,16 +27,17 @@ llarp_init_single_process_logic(struct llarp_threadpool* tp) } void -llarp_logic_tick(struct llarp_logic* logic) +llarp_logic_tick(struct llarp_logic* logic, llarp_time_t now) { + llarp_timer_set_time(logic->timer, now); llarp_timer_tick_all(logic->timer); llarp_threadpool_tick(logic->thread); } void -llarp_logic_tick_async(struct llarp_logic* logic) +llarp_logic_tick_async(struct llarp_logic* logic, llarp_time_t now) { - llarp_timer_tick_all_async(logic->timer, logic->thread); + llarp_timer_tick_all_async(logic->timer, logic->thread, now); llarp_threadpool_tick(logic->thread); } diff --git a/llarp/path.cpp b/llarp/path.cpp index f82b4f714..3d2ba9220 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -234,10 +234,9 @@ namespace llarp } void - PathContext::ExpirePaths() + PathContext::ExpirePaths(llarp_time_t now) { util::Lock lock(m_TransitPaths.first); - auto now = llarp_time_now_ms(); auto& map = m_TransitPaths.second; auto itr = map.begin(); while(itr != map.end()) @@ -258,11 +257,11 @@ namespace llarp } void - PathContext::BuildPaths() + PathContext::BuildPaths(llarp_time_t now) { for(auto& builder : m_PathBuilders) { - if(builder->ShouldBuildMore()) + if(builder->ShouldBuildMore(now)) { builder->BuildOne(); } @@ -270,9 +269,8 @@ namespace llarp } void - PathContext::TickPaths() + PathContext::TickPaths(llarp_time_t now) { - auto now = llarp_time_now_ms(); for(auto& builder : m_PathBuilders) builder->Tick(now, m_Router); } @@ -357,7 +355,7 @@ namespace llarp // initialize parts of the introduction intro.router = hops[hsz - 1].rc.pubkey; intro.pathID = hops[hsz - 1].txID; - EnterState(ePathBuilding); + EnterState(ePathBuilding, parent->Now()); } void @@ -397,7 +395,7 @@ namespace llarp } void - Path::EnterState(PathStatus st) + Path::EnterState(PathStatus st, llarp_time_t now) { if(st == ePathTimeout) { @@ -406,7 +404,7 @@ namespace llarp else if(st == ePathBuilding) { llarp::LogInfo("path ", Name(), " is building"); - buildStarted = llarp_time_now_ms(); + buildStarted = now; } _status = st; } @@ -425,7 +423,7 @@ namespace llarp if(dlt >= PATH_BUILD_TIMEOUT) { r->routerProfiling.MarkPathFail(this); - EnterState(ePathTimeout); + EnterState(ePathTimeout, now); return; } } @@ -452,19 +450,19 @@ namespace llarp if(m_CheckForDead(this, dlt)) { r->routerProfiling.MarkPathFail(this); - EnterState(ePathTimeout); + EnterState(ePathTimeout, now); } } else { r->routerProfiling.MarkPathFail(this); - EnterState(ePathTimeout); + EnterState(ePathTimeout, now); } } else if(dlt >= 10000 && m_LastRecvMessage == 0) { r->routerProfiling.MarkPathFail(this); - EnterState(ePathTimeout); + EnterState(ePathTimeout, now); } } } @@ -581,14 +579,15 @@ namespace llarp Path::HandlePathConfirmMessage( const llarp::routing::PathConfirmMessage* msg, llarp_router* r) { + auto now = r->Now(); if(_status == ePathBuilding) { // finish initializing introduction intro.expiresAt = buildStarted + hops[0].lifetime; // confirm that we build the path - EnterState(ePathEstablished); + EnterState(ePathEstablished, now); llarp::LogInfo("path is confirmed tx=", TXID(), " rx=", RXID(), - " took ", llarp_time_now_ms() - buildStarted, " ms"); + " took ", now - buildStarted, " ms"); if(m_BuiltHook) m_BuiltHook(this); m_BuiltHook = nullptr; @@ -602,7 +601,7 @@ namespace llarp llarp::routing::PathLatencyMessage latency; latency.T = llarp_randint(); m_LastLatencyTestID = latency.T; - m_LastLatencyTestTime = llarp_time_now_ms(); + m_LastLatencyTestTime = now; return SendRoutingMessage(&latency, r); } llarp::LogWarn("got unwarrented path confirm message on tx=", RXID(), @@ -617,7 +616,7 @@ namespace llarp { if(m_DataHandler(this, frame)) { - m_LastRecvMessage = llarp_time_now_ms(); + m_LastRecvMessage = m_PathSet->Now(); return true; } } @@ -628,7 +627,7 @@ namespace llarp Path::HandlePathLatencyMessage( const llarp::routing::PathLatencyMessage* msg, llarp_router* r) { - auto now = llarp_time_now_ms(); + auto now = r->Now(); // TODO: reanimate dead paths if they get this message if(msg->L == m_LastLatencyTestID && _status == ePathEstablished) { diff --git a/llarp/pathbuilder.cpp b/llarp/pathbuilder.cpp index 2fde1a28b..d1d5dc56d 100644 --- a/llarp/pathbuilder.cpp +++ b/llarp/pathbuilder.cpp @@ -191,10 +191,9 @@ namespace llarp } bool - Builder::ShouldBuildMore() const + Builder::ShouldBuildMore(llarp_time_t now) const { - auto now = llarp_time_now_ms(); - return llarp::path::PathSet::ShouldBuildMore() && now > lastBuild + return llarp::path::PathSet::ShouldBuildMore(now) && now > lastBuild && now - lastBuild > buildIntervalLimit; } @@ -236,10 +235,16 @@ namespace llarp return true; } + llarp_time_t + Builder::Now() const + { + return router->Now(); + } + void Builder::Build(const std::vector< RouterContact >& hops) { - lastBuild = llarp_time_now_ms(); + lastBuild = Now(); // async generate keys AsyncPathKeyExchangeContext< Builder >* ctx = new AsyncPathKeyExchangeContext< Builder >(&router->crypto); diff --git a/llarp/pathset.cpp b/llarp/pathset.cpp index d8bab8f04..ff9416ac8 100644 --- a/llarp/pathset.cpp +++ b/llarp/pathset.cpp @@ -12,8 +12,9 @@ namespace llarp } bool - PathSet::ShouldBuildMore() const + PathSet::ShouldBuildMore(llarp_time_t now) const { + (void)now; return m_Paths.size() < m_NumPaths; } @@ -160,7 +161,7 @@ namespace llarp void PathSet::HandlePathBuilt(Path* path) { - auto dlt = llarp_time_now_ms() - path->buildStarted; + auto dlt = Now() - path->buildStarted; llarp::LogInfo("Path build took ", dlt, "ms for tx=", path->TXID(), " rx=", path->RXID()); } diff --git a/llarp/proofofwork.cpp b/llarp/proofofwork.cpp index 0c6081a92..061f7482b 100644 --- a/llarp/proofofwork.cpp +++ b/llarp/proofofwork.cpp @@ -26,10 +26,8 @@ namespace llarp } bool - PoW::IsValid(llarp_shorthash_func hashfunc) const + PoW::IsValid(llarp_shorthash_func hashfunc, llarp_time_t now) const { - auto now = llarp_time_now_ms(); - if(now - timestamp > (uint64_t(extendedLifetime) * 1000)) return false; diff --git a/llarp/relay_commit.cpp b/llarp/relay_commit.cpp index 5b75d7ca2..2b84b3ac5 100644 --- a/llarp/relay_commit.cpp +++ b/llarp/relay_commit.cpp @@ -179,7 +179,7 @@ namespace llarp } ~LRCMFrameDecrypt() - { + { delete decrypter; } @@ -226,6 +226,7 @@ namespace llarp static void HandleDecrypted(llarp_buffer_t* buf, LRCMFrameDecrypt* self) { + auto now = self->context->Router()->Now(); auto& info = self->hop->info; if(!buf) { @@ -265,7 +266,7 @@ namespace llarp self->context->Crypto()->shorthash(self->hop->nonceXOR, llarp::Buffer(self->hop->pathKey)); if(self->record.work - && self->record.work->IsValid(self->context->Crypto()->shorthash)) + && self->record.work->IsValid(self->context->Crypto()->shorthash, now)) { llarp::LogDebug("LRCM extended lifetime by ", self->record.work->extendedLifetime, " seconds for ", @@ -280,7 +281,7 @@ namespace llarp } // TODO: check if we really want to accept it - self->hop->started = llarp_time_now_ms(); + self->hop->started = now; size_t sz = self->frames[0].size(); // shift diff --git a/llarp/router.cpp b/llarp/router.cpp index 0638dabb7..42af006a8 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -468,8 +468,8 @@ void llarp_router::Tick() { // llarp::LogDebug("tick router"); - auto now = llarp_time_now_ms(); - paths.ExpirePaths(); + auto now = llarp_ev_loop_time_now_ms(netloop); + paths.ExpirePaths(now); { auto itr = m_PersistingSessions.begin(); while(itr != m_PersistingSessions.end()) @@ -502,14 +502,14 @@ llarp_router::Tick() auto explore = std::max(NumberOfConnectedRouters(), size_t(1)); dht->impl.Explore(explore); } - paths.BuildPaths(); + paths.BuildPaths(now); hiddenServiceContext.Tick(); } if(NumberOfConnectedRouters() < minConnectedRouters) { ConnectToRandomRouters(minConnectedRouters); } - paths.TickPaths(); + paths.TickPaths(now); } void diff --git a/llarp/router.hpp b/llarp/router.hpp index 8fabbbfe4..15e24ad5d 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -242,6 +242,13 @@ struct llarp_router void Tick(); + /// get time from event loop + llarp_time_t + Now() const + { + return llarp_ev_loop_time_now_ms(netloop); + } + /// schedule ticker to call i ms from now void ScheduleTicker(uint64_t i = 1000); diff --git a/llarp/service.cpp b/llarp/service.cpp index 4f5f99de7..c8ad30d35 100644 --- a/llarp/service.cpp +++ b/llarp/service.cpp @@ -273,13 +273,13 @@ namespace llarp } bool - Identity::SignIntroSet(IntroSet& i, llarp_crypto* crypto) const + Identity::SignIntroSet(IntroSet& i, llarp_crypto* crypto, llarp_time_t now) const { if(i.I.size() == 0) return false; // set timestamp // TODO: round to nearest 1000 ms - i.T = llarp_time_now_ms(); + i.T = now; // set service info i.A = pub; // set public encryption key @@ -297,7 +297,7 @@ namespace llarp } bool - IntroSet::Verify(llarp_crypto* crypto) const + IntroSet::Verify(llarp_crypto* crypto, llarp_time_t now) const { byte_t tmp[MAX_INTROSET_SIZE]; auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); @@ -312,10 +312,9 @@ namespace llarp if(!A.Verify(crypto, buf, Z)) return false; // validate PoW - if(W && !W->IsValid(crypto->shorthash)) + if(W && !W->IsValid(crypto->shorthash, now)) return false; // valid timestamps - auto now = llarp_time_now_ms(); // add max clock skew now += MAX_INTROSET_TIME_DELTA; for(const auto& intro : I) diff --git a/llarp/service/context.cpp b/llarp/service/context.cpp index 06d1640e9..4b65f193f 100644 --- a/llarp/service/context.cpp +++ b/llarp/service/context.cpp @@ -1,6 +1,7 @@ #include #include #include +#include "router.hpp" namespace llarp { @@ -22,7 +23,7 @@ namespace llarp void Context::Tick() { - auto now = llarp_time_now_ms(); + auto now = m_Router->Now(); auto itr = m_Endpoints.begin(); while(itr != m_Endpoints.end()) { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 1a5ff5035..b9f7625d6 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -102,7 +102,7 @@ namespace llarp { llarp::LogWarn("could not publish descriptors for endpoint ", Name(), " because we couldn't get enough valid introductions"); - if(ShouldBuildMore() || forceRebuild) + if(ShouldBuildMore(now) || forceRebuild) ManualRebuild(1); return; } @@ -117,7 +117,7 @@ namespace llarp return; } m_IntroSet.topic = m_Tag; - if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto)) + if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto, now)) { llarp::LogWarn("failed to sign introset for endpoint ", Name()); return; @@ -300,7 +300,7 @@ namespace llarp std::set< IntroSet > remote; for(const auto& introset : msg->I) { - if(!introset.Verify(crypto)) + if(!introset.Verify(crypto, Now())) { if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T) { @@ -358,7 +358,7 @@ namespace llarp itr = m_Sessions.insert(std::make_pair(tag, Session{})).first; } itr->second.remote = info; - itr->second.lastUsed = llarp_time_now_ms(); + itr->second.lastUsed = Now(); } bool @@ -380,7 +380,7 @@ namespace llarp itr = m_Sessions.insert(std::make_pair(tag, Session{})).first; } itr->second.intro = intro; - itr->second.lastUsed = llarp_time_now_ms(); + itr->second.lastUsed = Now(); } bool @@ -430,7 +430,7 @@ namespace llarp itr = m_Sessions.insert(std::make_pair(tag, Session{})).first; } itr->second.sharedKey = k; - itr->second.lastUsed = llarp_time_now_ms(); + itr->second.lastUsed = Now(); } bool @@ -469,7 +469,7 @@ namespace llarp Endpoint::CachedTagResult::HandleResponse( const std::set< IntroSet >& introsets) { - auto now = llarp_time_now_ms(); + auto now = parent->Now(); for(const auto& introset : introsets) if(result.insert(introset).second) @@ -505,7 +505,7 @@ namespace llarp { llarp::routing::DHTMessage* msg = new llarp::routing::DHTMessage(); msg->M.emplace_back(new llarp::dht::FindIntroMessage(tag, txid)); - lastRequest = llarp_time_now_ms(); + lastRequest = parent->Now(); return msg; } @@ -563,7 +563,7 @@ namespace llarp auto job = new PublishIntroSetJob(this, GenTXID(), m_IntroSet); if(job->SendRequestViaPath(path, r)) { - m_LastPublishAttempt = llarp_time_now_ms(); + m_LastPublishAttempt = Now(); return true; } return false; @@ -582,7 +582,7 @@ namespace llarp void Endpoint::IntroSetPublished() { - m_LastPublish = llarp_time_now_ms(); + m_LastPublish = Now(); llarp::LogInfo(Name(), " IntroSet publish confirmed"); } @@ -761,7 +761,7 @@ namespace llarp { llarp::LogWarn(Name(), " message ", seq, " dropped by endpoint ", p->Endpoint(), " via ", dst); - if(MarkCurrentIntroBad(llarp_time_now_ms())) + if(MarkCurrentIntroBad(Now())) { llarp::LogInfo(Name(), " switched intros to ", remoteIntro.router, " via ", remoteIntro.pathID); @@ -797,7 +797,7 @@ namespace llarp , m_DataHandler(ep) , m_Endpoint(ep) { - createdAt = llarp_time_now_ms(); + createdAt = ep->Now(); } void @@ -822,7 +822,7 @@ namespace llarp Endpoint::HandlePathDead(void* user) { Endpoint* self = static_cast< Endpoint* >(user); - self->RegenAndPublishIntroSet(llarp_time_now_ms(), true); + self->RegenAndPublishIntroSet(self->Now(), true); } bool @@ -845,7 +845,7 @@ namespace llarp Endpoint::OnLookup(const Address& addr, const IntroSet* introset, const RouterID& endpoint) { - auto now = llarp_time_now_ms(); + auto now = Now(); if(introset == nullptr || introset->IsExpired(now)) { llarp::LogError(Name(), " failed to lookup ", addr.ToString(), " from ", @@ -958,7 +958,7 @@ namespace llarp { remoteIntro = m_NextIntro; // prepare next intro - auto now = llarp_time_now_ms(); + auto now = Now(); for(const auto& intro : currentIntroSet.I) { if(intro.ExpiresSoon(now)) @@ -996,7 +996,7 @@ namespace llarp llarp::LogInfo("introset is old, dropping"); return true; } - auto now = llarp_time_now_ms(); + auto now = Now(); if(i->IsExpired(now)) { llarp::LogError("got expired introset from lookup from ", endpoint); @@ -1028,11 +1028,11 @@ namespace llarp ProtocolType t) { // inbound converstation + auto now = Now(); { auto itr = m_AddressToService.find(remote); if(itr != m_AddressToService.end()) { - auto now = llarp_time_now_ms(); routing::PathTransferMessage transfer; ProtocolFrame& f = transfer.T; path::Path* p = nullptr; @@ -1211,7 +1211,7 @@ namespace llarp Endpoint::OutboundContext::ShiftIntroduction() { bool success = false; - auto now = llarp_time_now_ms(); + auto now = Now(); if(now - lastShift < MIN_SHIFT_INTERVAL) return false; bool shifted = false; @@ -1255,7 +1255,7 @@ namespace llarp Endpoint::SendContext::AsyncEncryptAndSendTo(llarp_buffer_t data, ProtocolType protocol) { - auto now = llarp_time_now_ms(); + auto now = m_Endpoint->Now(); if(remoteIntro.ExpiresSoon(now)) { if(!MarkCurrentIntroBad(now)) @@ -1409,7 +1409,7 @@ namespace llarp { llarp::LogDebug("sent data to ", remoteIntro.pathID, " on ", remoteIntro.router); - lastGoodSend = llarp_time_now_ms(); + lastGoodSend = m_Endpoint->Now(); } else llarp::LogError("Failed to send frame on path"); @@ -1532,17 +1532,15 @@ namespace llarp } bool - Endpoint::OutboundContext::ShouldBuildMore() const + Endpoint::OutboundContext::ShouldBuildMore(llarp_time_t now) const { if(markedBad) return false; - bool should = path::Builder::ShouldBuildMore(); + bool should = path::Builder::ShouldBuildMore(now); // determinte newest intro Introduction intro; if(!GetNewestIntro(intro)) return should; - - auto now = llarp_time_now_ms(); // time from now that the newest intro expires at if(now >= intro.expiresAt) return should; @@ -1567,7 +1565,7 @@ namespace llarp f.T = currentConvoTag; f.S = m_Endpoint->GetSeqNoForConvo(f.T); - auto now = llarp_time_now_ms(); + auto now = m_Endpoint->Now(); if(remoteIntro.ExpiresSoon(now)) { // shift intro diff --git a/llarp/timer.cpp b/llarp/timer.cpp index 5cfe773e6..98387f1da 100644 --- a/llarp/timer.cpp +++ b/llarp/timer.cpp @@ -20,11 +20,11 @@ namespace llarp bool done; bool canceled; - timer(uint64_t ms = 0, void* _user = nullptr, + timer(llarp_time_t now, uint64_t ms = 0, void* _user = nullptr, llarp_timer_handler_func _func = nullptr) : user(_user) , called_at(0) - , started(llarp_time_now_ms()) + , started(now) , timeout(ms) , func(_func) , done(false) @@ -62,6 +62,13 @@ struct llarp_timer_context llarp::util::Condition* ticker = nullptr; std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100); + llarp_time_t m_Now; + + llarp_timer_context() + { + m_Now = llarp_time_now_ms(); + } + uint32_t ids = 0; bool _run = true; @@ -108,11 +115,12 @@ struct llarp_timer_context call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms) { llarp::util::Lock lock(timersMutex); + uint32_t id = ++ids; timers.insert( std::make_pair(id, std::unique_ptr< llarp::timer >( - new llarp::timer(timeout_ms, user, func)))); + new llarp::timer(m_Now, timeout_ms, user, func)))); return id; } @@ -181,19 +189,27 @@ llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id) t->cancel(id); } +void +llarp_timer_set_time(struct llarp_timer_context* t, llarp_time_t now) +{ + if(now == 0) + now = llarp_time_now_ms(); + t->m_Now = now; +} + void llarp_timer_tick_all(struct llarp_timer_context* t) { if(!t->run()) return; - auto now = llarp_time_now_ms(); + std::list< std::unique_ptr< llarp::timer > > hit; { llarp::util::Lock lock(t->timersMutex); auto itr = t->timers.begin(); while(itr != t->timers.end()) { - if(now - itr->second->started >= itr->second->timeout + if(t->m_Now - itr->second->started >= itr->second->timeout || itr->second->canceled) { // timer hit @@ -208,7 +224,7 @@ llarp_timer_tick_all(struct llarp_timer_context* t) { if(h->func) { - h->called_at = now; + h->called_at = t->m_Now; h->exec(); } } @@ -222,8 +238,9 @@ llarp_timer_tick_all_job(void* user) void llarp_timer_tick_all_async(struct llarp_timer_context* t, - struct llarp_threadpool* pool) + struct llarp_threadpool* pool, llarp_time_t now) { + t->m_Now = now; llarp_threadpool_queue_job(pool, {t, llarp_timer_tick_all_job}); } @@ -244,7 +261,7 @@ llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool) { llarp::util::Lock lock(t->timersMutex); // we woke up - llarp_timer_tick_all_async(t, pool); + llarp_timer_tick_all_async(t, pool, llarp_time_now_ms()); } } }