use event loop for time

pull/36/head
Jeff Becker 6 years ago
parent c5e2cffdbb
commit dbd2c41909
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -41,7 +41,8 @@ namespace llarp
size_t MaxSize = 1024 > size_t MaxSize = 1024 >
struct CoDelQueue struct CoDelQueue
{ {
CoDelQueue(const std::string& name) : m_name(name) CoDelQueue(const std::string& name, const PutTime& put)
: m_name(name), _putTime(put)
{ {
} }
@ -67,9 +68,9 @@ namespace llarp
return false; return false;
} }
PutTime()(m_Queue[m_QueueIdx]); _putTime(m_Queue[m_QueueIdx]);
if(firstPut == 0) if(firstPut == 0)
firstPut = GetTime()(m_Queue[m_QueueIdx]); firstPut = _getTime(m_Queue[m_QueueIdx]);
++m_QueueIdx; ++m_QueueIdx;
return true; return true;
@ -84,9 +85,9 @@ namespace llarp
return; return;
T* t = &m_Queue[m_QueueIdx]; T* t = &m_Queue[m_QueueIdx];
new(t) T(std::forward< Args >(args)...); new(t) T(std::forward< Args >(args)...);
PutTime()(m_Queue[m_QueueIdx]); _putTime(m_Queue[m_QueueIdx]);
if(firstPut == 0) if(firstPut == 0)
firstPut = GetTime()(m_Queue[m_QueueIdx]); firstPut = _getTime(m_Queue[m_QueueIdx]);
++m_QueueIdx; ++m_QueueIdx;
} }
@ -123,7 +124,7 @@ namespace llarp
if(f(*item)) if(f(*item))
break; break;
--m_QueueIdx; --m_QueueIdx;
auto dlt = start - GetTime()(*item); auto dlt = start - _getTime(*item);
// llarp::LogInfo("CoDelQueue::Process - dlt ", dlt); // llarp::LogInfo("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest); lowest = std::min(dlt, lowest);
if(m_QueueIdx == 0) if(m_QueueIdx == 0)
@ -156,6 +157,8 @@ namespace llarp
size_t m_QueueIdx = 0; size_t m_QueueIdx = 0;
T m_Queue[MaxSize]; T m_Queue[MaxSize];
std::string m_name; std::string m_name;
GetTime _getTime;
PutTime _putTime;
}; // namespace util }; // namespace util
} // namespace util } // namespace util
} // namespace llarp } // namespace llarp

@ -385,6 +385,9 @@ namespace llarp
return ++ids; return ++ids;
} }
llarp_time_t
Now();
private: private:
void void
ExploreNetworkVia(const Key_t& peer); ExploreNetworkVia(const Key_t& peer);

@ -16,6 +16,7 @@
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <tuntap.h> #include <tuntap.h>
#include <llarp/time.h>
/** /**
* ev.h * ev.h
* *
@ -45,6 +46,10 @@ llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
struct llarp_threadpool *tp, struct llarp_threadpool *tp,
struct llarp_logic *logic); struct llarp_logic *logic);
/// get the current time on the event loop
llarp_time_t
llarp_ev_loop_time_now_ms(struct llarp_ev_loop *ev);
/// stop event loop and wait for it to complete all jobs /// stop event loop and wait for it to complete all jobs
void void
llarp_ev_loop_stop(struct llarp_ev_loop *ev); llarp_ev_loop_stop(struct llarp_ev_loop *ev);
@ -57,6 +62,7 @@ struct llarp_udp_io
void *user; void *user;
void *impl; void *impl;
struct llarp_ev_loop *parent; struct llarp_ev_loop *parent;
/// called every event loop tick after reads /// called every event loop tick after reads
void (*tick)(struct llarp_udp_io *); void (*tick)(struct llarp_udp_io *);
// sockaddr * is the source // sockaddr * is the source

@ -3,6 +3,7 @@
#include <llarp/buffer.h> #include <llarp/buffer.h>
#include <llarp/time.h> #include <llarp/time.h>
#include <llarp/net.hpp> #include <llarp/net.hpp>
#include <llarp/ev.h>
#ifndef _WIN32 #ifndef _WIN32
// unix, linux // unix, linux
@ -101,10 +102,14 @@ namespace llarp
struct PutTime struct PutTime
{ {
llarp_ev_loop* loop;
PutTime(llarp_ev_loop* evloop) : loop(evloop)
{
}
void void
operator()(IPv4Packet& pkt) const operator()(IPv4Packet& pkt) const
{ {
pkt.timestamp = llarp_time_now_ms(); pkt.timestamp = llarp_ev_loop_time_now_ms(loop);
} }
}; };

@ -18,7 +18,12 @@ namespace llarp
struct ILinkLayer struct ILinkLayer
{ {
virtual ~ILinkLayer(); virtual ~ILinkLayer();
/// get current time via event loop
llarp_time_t
now() const
{
return llarp_ev_loop_time_now_ms(m_Loop);
}
bool bool
HasSessionTo(const PubKey& pk); HasSessionTo(const PubKey& pk);
@ -111,11 +116,11 @@ namespace llarp
// timer cancelled // timer cancelled
if(left) if(left)
return; return;
static_cast< ILinkLayer* >(user)->OnTick(orig, llarp_time_now_ms()); static_cast< ILinkLayer* >(user)->OnTick(orig);
} }
void void
OnTick(uint64_t interval, llarp_time_t now); OnTick(uint64_t interval);
void void
ScheduleTick(uint64_t interval); ScheduleTick(uint64_t interval);
@ -129,7 +134,8 @@ namespace llarp
void void
PutSession(ILinkSession* s); PutSession(ILinkSession* s);
llarp_logic* m_Logic = nullptr; llarp_logic* m_Logic = nullptr;
llarp_ev_loop* m_Loop = nullptr;
Addr m_ourAddr; Addr m_ourAddr;
llarp_udp_io m_udp; llarp_udp_io m_udp;
SecretKey m_SecretKey; SecretKey m_SecretKey;

@ -19,11 +19,11 @@ llarp_init_single_process_logic(struct llarp_threadpool* tp);
/// single threaded tick /// single threaded tick
void void
llarp_logic_tick(struct llarp_logic* logic); llarp_logic_tick(struct llarp_logic* logic, llarp_time_t now);
/// isolated tick /// isolated tick
void void
llarp_logic_tick_async(struct llarp_logic* logic); llarp_logic_tick_async(struct llarp_logic* logic, llarp_time_t now);
void void
llarp_free_logic(struct llarp_logic** logic); llarp_free_logic(struct llarp_logic** logic);

@ -271,7 +271,7 @@ namespace llarp
} }
void void
EnterState(PathStatus st); EnterState(PathStatus st, llarp_time_t now);
llarp_time_t llarp_time_t
ExpireTime() const ExpireTime() const
@ -374,16 +374,16 @@ namespace llarp
/// called from router tick function /// called from router tick function
void void
ExpirePaths(); ExpirePaths(llarp_time_t now);
/// called from router tick function /// called from router tick function
/// builds all paths we need to build at current tick /// builds all paths we need to build at current tick
void void
BuildPaths(); BuildPaths(llarp_time_t now);
/// called from router tick function /// called from router tick function
void void
TickPaths(); TickPaths(llarp_time_t now);
/// track a path builder with this context /// track a path builder with this context
void void

@ -29,7 +29,10 @@ namespace llarp
size_t hop); size_t hop);
virtual bool virtual bool
ShouldBuildMore() const; ShouldBuildMore(llarp_time_t now) const;
llarp_time_t
Now() const;
void void
BuildOne(); BuildOne();

@ -66,9 +66,13 @@ namespace llarp
size_t size_t
NumInStatus(PathStatus st) const; NumInStatus(PathStatus st) const;
/// get time from event loop
virtual llarp_time_t
Now() const = 0;
/// return true if we should build another path /// return true if we should build another path
virtual bool virtual bool
ShouldBuildMore() const; ShouldBuildMore(llarp_time_t now) const;
/// return true if we should publish a new hidden service descriptor /// return true if we should publish a new hidden service descriptor
virtual bool virtual bool

@ -17,7 +17,7 @@ namespace llarp
~PoW(); ~PoW();
bool bool
IsValid(llarp_shorthash_func hashfunc) const; IsValid(llarp_shorthash_func hashfunc, llarp_time_t now) const;
bool bool
DecodeKey(llarp_buffer_t k, llarp_buffer_t* val); DecodeKey(llarp_buffer_t k, llarp_buffer_t* val);

@ -46,7 +46,7 @@ namespace llarp
DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf); DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf);
bool bool
SignIntroSet(IntroSet& i, llarp_crypto* c) const; SignIntroSet(IntroSet& i, llarp_crypto* c, llarp_time_t now) const;
bool bool
Sign(llarp_crypto*, byte_t* sig, llarp_buffer_t buf) const; Sign(llarp_crypto*, byte_t* sig, llarp_buffer_t buf) const;

@ -148,7 +148,7 @@ namespace llarp
DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf); DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf);
bool bool
Verify(llarp_crypto* crypto) const; Verify(llarp_crypto* crypto, llarp_time_t now) const;
}; };
} // namespace service } // namespace service
} // namespace llarp } // namespace llarp

@ -6,6 +6,7 @@
#include <llarp/service/handler.hpp> #include <llarp/service/handler.hpp>
#include <llarp/service/protocol.hpp> #include <llarp/service/protocol.hpp>
#include <llarp/path.hpp> #include <llarp/path.hpp>
#include <llarp/ev.h>
// minimum time between interoset shifts // minimum time between interoset shifts
#ifndef MIN_SHIFT_INTERVAL #ifndef MIN_SHIFT_INTERVAL
@ -43,6 +44,13 @@ namespace llarp
virtual void virtual void
Tick(llarp_time_t now); Tick(llarp_time_t now);
/// get time via event loop
llarp_time_t
Now()
{
return llarp_ev_loop_time_now_ms(EndpointNetLoop());
}
/// router's logic /// router's logic
llarp_logic* llarp_logic*
RouterLogic(); RouterLogic();
@ -263,7 +271,7 @@ namespace llarp
ReadyToSend() const; ReadyToSend() const;
bool bool
ShouldBuildMore() const; ShouldBuildMore(llarp_time_t now) const;
/// tick internal state /// tick internal state
/// return true to mark as dead /// return true to mark as dead
@ -455,7 +463,7 @@ namespace llarp
{ {
RouterLookupJob(Endpoint* p) RouterLookupJob(Endpoint* p)
{ {
started = llarp_time_now_ms(); started = p->Now();
txid = p->GenTXID(); txid = p->GenTXID();
} }
@ -511,8 +519,9 @@ namespace llarp
llarp_time_t lastModified = 0; llarp_time_t lastModified = 0;
std::set< IntroSet > result; std::set< IntroSet > result;
Tag tag; Tag tag;
Endpoint * parent;
CachedTagResult(const Tag& t) : tag(t) CachedTagResult(const Tag& t, Endpoint * p) : tag(t), parent(p)
{ {
} }

@ -2,6 +2,7 @@
#define LLARP_TIMER_H #define LLARP_TIMER_H
#include <llarp/common.h> #include <llarp/common.h>
#include <llarp/threadpool.h> #include <llarp/threadpool.h>
#include <llarp/time.h>
/** called with userptr, original timeout, left */ /** called with userptr, original timeout, left */
typedef void (*llarp_timer_handler_func)(void *, uint64_t, uint64_t); typedef void (*llarp_timer_handler_func)(void *, uint64_t, uint64_t);
@ -32,6 +33,11 @@ llarp_timer_remove_job(struct llarp_timer_context *t, uint32_t id);
void void
llarp_timer_stop(struct llarp_timer_context *t); llarp_timer_stop(struct llarp_timer_context *t);
/// set timer's timestamp, if now is 0 use the current time from system clock,
/// llarp_time_t now
void
llarp_timer_set_time(struct llarp_timer_context *t, llarp_time_t now);
// blocking run timer and send events to thread pool // blocking run timer and send events to thread pool
void void
llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool); llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool);
@ -43,7 +49,7 @@ llarp_timer_tick_all(struct llarp_timer_context *t);
/// tick all timers into a threadpool asynchronously /// tick all timers into a threadpool asynchronously
void void
llarp_timer_tick_all_async(struct llarp_timer_context *t, llarp_timer_tick_all_async(struct llarp_timer_context *t,
struct llarp_threadpool *pool); struct llarp_threadpool *pool, llarp_time_t now);
void void
llarp_free_timer(struct llarp_timer_context **t); llarp_free_timer(struct llarp_timer_context **t);

@ -49,6 +49,12 @@ namespace abyss
void void
RemoveConn(IRPCHandler* handler); RemoveConn(IRPCHandler* handler);
llarp_time_t
now() const
{
return llarp_ev_loop_time_now_ms(m_loop);
}
protected: protected:
virtual IRPCHandler* virtual IRPCHandler*
CreateHandler(ConnImpl* connimpl) const = 0; CreateHandler(ConnImpl* connimpl) const = 0;

@ -48,7 +48,7 @@ namespace abyss
: _conn(c), _parent(p) : _conn(c), _parent(p)
{ {
handler = nullptr; handler = nullptr;
m_LastActive = llarp_time_now_ms(); m_LastActive = p->now();
m_ReadTimeout = readtimeout; m_ReadTimeout = readtimeout;
// set up tcp members // set up tcp members
_conn->user = this; _conn->user = this;
@ -265,7 +265,7 @@ namespace abyss
return false; return false;
} }
m_LastActive = llarp_time_now_ms(); m_LastActive = _parent->now();
if(m_State < eReadHTTPBody) if(m_State < eReadHTTPBody)
{ {
const char* end = strstr(buf, "\r\n"); const char* end = strstr(buf, "\r\n");
@ -395,11 +395,11 @@ namespace abyss
void void
BaseReqHandler::Tick() BaseReqHandler::Tick()
{ {
auto now = llarp_time_now_ms(); auto _now = now();
auto itr = m_Conns.begin(); auto itr = m_Conns.begin();
while(itr != m_Conns.end()) while(itr != m_Conns.end())
{ {
if((*itr)->ShouldClose(now)) if((*itr)->ShouldClose(_now)
itr = m_Conns.erase(itr); itr = m_Conns.erase(itr);
else else
++itr; ++itr;

@ -115,7 +115,7 @@ namespace llarp
if(ctx->services) if(ctx->services)
{ {
// expire intro sets // expire intro sets
auto now = llarp_time_now_ms(); auto now = ctx->Now();
auto &nodes = ctx->services->nodes; auto &nodes = ctx->services->nodes;
auto itr = nodes.begin(); auto itr = nodes.begin();
while(itr != nodes.end()) while(itr != nodes.end())
@ -244,7 +244,7 @@ namespace llarp
void void
Context::CleanupTX() Context::CleanupTX()
{ {
auto now = llarp_time_now_ms(); auto now = Now();
llarp::LogDebug("DHT tick"); llarp::LogDebug("DHT tick");
pendingRouterLookups.Expire(now); pendingRouterLookups.Expire(now);
@ -285,7 +285,7 @@ namespace llarp
router->SendToOrQueue(peer, &m); router->SendToOrQueue(peer, &m);
if(keepalive) if(keepalive)
{ {
auto now = llarp_time_now_ms(); auto now = Now();
router->PersistSessionUntil(peer, now + 10000); router->PersistSessionUntil(peer, now + 10000);
} }
} }
@ -323,7 +323,7 @@ namespace llarp
bool bool
Validate(const service::IntroSet &value) const Validate(const service::IntroSet &value) const
{ {
if(!value.Verify(parent->Crypto())) if(!value.Verify(parent->Crypto(), parent->Now()))
{ {
llarp::LogWarn("Got invalid introset from service lookup"); llarp::LogWarn("Got invalid introset from service lookup");
return false; return false;
@ -547,7 +547,7 @@ namespace llarp
bool bool
Validate(const service::IntroSet &introset) const Validate(const service::IntroSet &introset) const
{ {
if(!introset.Verify(parent->Crypto())) if(!introset.Verify(parent->Crypto(), parent->Now()))
{ {
llarp::LogWarn("got invalid introset from tag lookup"); llarp::LogWarn("got invalid introset from tag lookup");
return false; return false;
@ -824,5 +824,11 @@ namespace llarp
return &router->crypto; return &router->crypto;
} }
llarp_time_t
Context::Now()
{
return llarp_ev_loop_time_now_ms(router->netloop);
}
} // namespace dht } // namespace dht
} // namespace llarp } // namespace llarp

@ -28,7 +28,7 @@ namespace llarp
for(const auto &introset : I) for(const auto &introset : I)
{ {
if(!introset.Verify(crypto)) if(!introset.Verify(crypto, dht.Now()))
{ {
llarp::LogWarn( llarp::LogWarn(
"Invalid introset while handling direct GotIntro " "Invalid introset while handling direct GotIntro "

@ -44,20 +44,21 @@ namespace llarp
llarp_dht_context *ctx, llarp_dht_context *ctx,
std::vector< std::unique_ptr< IMessage > > &replies) const std::vector< std::unique_ptr< IMessage > > &replies) const
{ {
auto now = ctx->impl.Now();
if(S > 5) if(S > 5)
{ {
llarp::LogWarn("invalid S value ", S, " > 5"); llarp::LogWarn("invalid S value ", S, " > 5");
return false; return false;
} }
auto &dht = ctx->impl; auto &dht = ctx->impl;
if(!I.Verify(&dht.router->crypto)) if(!I.Verify(&dht.router->crypto, now))
{ {
llarp::LogWarn("invalid introset: ", I); llarp::LogWarn("invalid introset: ", I);
// don't propogate or store // don't propogate or store
replies.emplace_back(new GotIntroMessage({}, txID)); replies.emplace_back(new GotIntroMessage({}, txID));
return true; return true;
} }
if(I.W && !I.W->IsValid(dht.router->crypto.shorthash)) if(I.W && !I.W->IsValid(dht.router->crypto.shorthash, now))
{ {
llarp::LogWarn("proof of work not good enough for IntroSet"); llarp::LogWarn("proof of work not good enough for IntroSet");
// don't propogate or store // don't propogate or store
@ -71,7 +72,7 @@ namespace llarp
"failed to calculate hidden service address for PubIntro message"); "failed to calculate hidden service address for PubIntro message");
return false; return false;
} }
auto now = llarp_time_now_ms();
now += llarp::service::MAX_INTROSET_TIME_DELTA; now += llarp::service::MAX_INTROSET_TIME_DELTA;
if(I.IsExpired(now)) if(I.IsExpired(now))
{ {

@ -30,6 +30,7 @@ llarp_ev_loop_alloc(struct llarp_ev_loop **ev)
*ev = new llarp_win32_loop; *ev = new llarp_win32_loop;
#endif #endif
(*ev)->init(); (*ev)->init();
(*ev)->_now = llarp_time_now_ms();
} }
void void
@ -44,9 +45,10 @@ llarp_ev_loop_run(struct llarp_ev_loop *ev, struct llarp_logic *logic)
{ {
while(ev->running()) while(ev->running())
{ {
ev->_now = llarp_time_now_ms();
ev->tick(EV_TICK_INTERVAL); ev->tick(EV_TICK_INTERVAL);
if(ev->running()) if(ev->running())
llarp_logic_tick(logic); llarp_logic_tick(logic, ev->_now);
} }
return 0; return 0;
} }
@ -58,10 +60,11 @@ llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
{ {
while(ev->running()) while(ev->running())
{ {
ev->_now = llarp_time_now_ms();
ev->tick(EV_TICK_INTERVAL); ev->tick(EV_TICK_INTERVAL);
if(ev->running()) if(ev->running())
{ {
llarp_logic_tick_async(logic); llarp_logic_tick_async(logic, ev->_now);
llarp_threadpool_tick(tp); llarp_threadpool_tick(tp);
} }
} }
@ -85,6 +88,12 @@ llarp_ev_close_udp(struct llarp_udp_io *udp)
return -1; return -1;
} }
llarp_time_t
llarp_ev_loop_time_now_ms(struct llarp_ev_loop *loop)
{
return loop->_now;
}
void void
llarp_ev_loop_stop(struct llarp_ev_loop *loop) llarp_ev_loop_stop(struct llarp_ev_loop *loop)
{ {
@ -120,8 +129,8 @@ bool
llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt, llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt,
size_t sz) size_t sz)
{ {
const byte_t *ptr = (const byte_t *)pkt; const byte_t *ptr = (const byte_t *)pkt;
llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl); llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl);
if(impl->_shouldClose) if(impl->_shouldClose)
return false; return false;
while(sz > EV_WRITE_BUF_SZ) while(sz > EV_WRITE_BUF_SZ)
@ -227,9 +236,8 @@ namespace llarp
} // namespace llarp } // namespace llarp
llarp::ev_io *
llarp::ev_io* llarp_ev_loop::bind_tcp(llarp_tcp_acceptor *tcp, const sockaddr *bindaddr)
llarp_ev_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
{ {
int fd = ::socket(bindaddr->sa_family, SOCK_STREAM, 0); int fd = ::socket(bindaddr->sa_family, SOCK_STREAM, 0);
if(fd == -1) if(fd == -1)
@ -237,7 +245,7 @@ llarp_ev_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
socklen_t sz = sizeof(sockaddr_in); socklen_t sz = sizeof(sockaddr_in);
if(bindaddr->sa_family == AF_INET6) if(bindaddr->sa_family == AF_INET6)
{ {
sz = sizeof(sockaddr_in6); sz = sizeof(sockaddr_in6);
} }
else if(bindaddr->sa_family == AF_UNIX) else if(bindaddr->sa_family == AF_UNIX)
{ {
@ -253,7 +261,7 @@ llarp_ev_loop::bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
::close(fd); ::close(fd);
return nullptr; return nullptr;
} }
llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp); llarp::ev_io *serv = new llarp::tcp_serv(this, fd, tcp);
tcp->impl = serv; tcp->impl = serv;
return serv; return serv;
} }

@ -29,11 +29,10 @@
namespace llarp namespace llarp
{ {
struct ev_io struct ev_io
{ {
struct WriteBuffer struct WriteBuffer
{ {
llarp_time_t timestamp = 0;
llarp_time_t timestamp = 0;
size_t bufsz; size_t bufsz;
byte_t buf[EV_WRITE_BUF_SZ]; byte_t buf[EV_WRITE_BUF_SZ];
@ -52,19 +51,19 @@ namespace llarp
struct GetTime struct GetTime
{ {
llarp_time_t llarp_time_t operator()(const WriteBuffer & buf) const
operator()(const WriteBuffer& w) const
{ {
return w.timestamp; return buf.timestamp;
} }
}; };
struct PutTime struct PutTime
{ {
void llarp_ev_loop * loop;
operator()(WriteBuffer& w) const PutTime(llarp_ev_loop * l ) : loop(l) {}
void operator()(WriteBuffer & buf)
{ {
w.timestamp = llarp_time_now_ms(); buf.timestamp = llarp_ev_loop_time_now_ms(loop);
} }
}; };
@ -239,11 +238,11 @@ namespace llarp
{ {
if(_shouldClose) if(_shouldClose)
return -1; return -1;
#ifdef __linux__ #ifdef __linux__
return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe
#else #else
return ::send(fd, buf, sz, 0 ); return ::send(fd, buf, sz, 0);
#endif #endif
} }
int int
@ -303,7 +302,7 @@ namespace llarp
struct llarp_ev_loop struct llarp_ev_loop
{ {
byte_t readbuf[EV_READ_BUF_SZ]; byte_t readbuf[EV_READ_BUF_SZ];
llarp_time_t _now = 0;
virtual bool virtual bool
init() = 0; init() = 0;
virtual int virtual int

@ -79,8 +79,8 @@ namespace llarp
{ {
llarp_tun_io* t; llarp_tun_io* t;
device* tunif; device* tunif;
tun(llarp_tun_io* tio) tun(llarp_tun_io* tio, llarp_ev_loop* l)
: ev_io(-1, new LossyWriteQueue_t("tun_write_queue")) : ev_io(-1, new LossyWriteQueue_t("tun_write_queue", l))
, t(tio) , t(tio)
, tunif(tuntap_init()) , tunif(tuntap_init())
@ -309,7 +309,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
llarp::ev_io* llarp::ev_io*
create_tun(llarp_tun_io* tun) create_tun(llarp_tun_io* tun)
{ {
llarp::tun* t = new llarp::tun(tun); llarp::tun* t = new llarp::tun(tun, this);
if(t->setup()) if(t->setup())
{ {
return t; return t;

@ -97,12 +97,10 @@ namespace llarp
{ {
llarp_tun_io* t; llarp_tun_io* t;
device* tunif; device* tunif;
tun(llarp_tun_io* tio) tun(llarp_tun_io* tio, llarp_ev_loop* l)
: ev_io(-1, new LossyWriteQueue_t("kqueue_tun_write")) : ev_io(-1, new LossyWriteQueue_t("kqueue_tun_write", l))
, t(tio) , t(tio)
, tunif(tuntap_init()) , tunif(tuntap_init()){};
{
};
int int
sendto(const sockaddr* to, const void* data, size_t sz) sendto(const sockaddr* to, const void* data, size_t sz)
@ -189,7 +187,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
llarp::ev_io* llarp::ev_io*
create_tun(llarp_tun_io* tun) create_tun(llarp_tun_io* tun)
{ {
llarp::tun* t = new llarp::tun(tun); llarp::tun* t = new llarp::tun(tun, this);
if(t->setup()) if(t->setup())
return t; return t;
delete t; delete t;

@ -19,8 +19,8 @@ namespace llarp
{ {
TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r) TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r)
: service::Endpoint(nickname, r) : service::Endpoint(nickname, r)
, m_UserToNetworkPktQueue(nickname + "_sendq") , m_UserToNetworkPktQueue(nickname + "_sendq", r->netloop)
, m_NetworkToUserPktQueue(nickname + "_recvq") , m_NetworkToUserPktQueue(nickname + "_recvq", r->netloop)
{ {
tunif.user = this; tunif.user = this;
tunif.netmask = DefaultTunNetmask; tunif.netmask = DefaultTunNetmask;
@ -369,7 +369,7 @@ namespace llarp
huint32_t huint32_t
TunEndpoint::ObtainIPForAddr(const service::Address &addr) TunEndpoint::ObtainIPForAddr(const service::Address &addr)
{ {
llarp_time_t now = llarp_time_now_ms(); llarp_time_t now = Now();
huint32_t nextIP = {0}; huint32_t nextIP = {0};
{ {
@ -440,7 +440,7 @@ namespace llarp
void void
TunEndpoint::MarkIPActive(huint32_t ip) TunEndpoint::MarkIPActive(huint32_t ip)
{ {
m_IPActivity[ip] = std::max(llarp_time_now_ms(), m_IPActivity[ip]); m_IPActivity[ip] = std::max(Now(), m_IPActivity[ip]);
} }
void void
@ -452,9 +452,8 @@ namespace llarp
void void
TunEndpoint::handleTickTun(void *u) TunEndpoint::handleTickTun(void *u)
{ {
auto now = llarp_time_now_ms();
TunEndpoint *self = static_cast< TunEndpoint * >(u); TunEndpoint *self = static_cast< TunEndpoint * >(u);
self->TickTun(now); self->TickTun(self->Now());
} }
void void

@ -30,6 +30,7 @@ namespace llarp
ILinkLayer::Configure(llarp_ev_loop* loop, const std::string& ifname, int af, ILinkLayer::Configure(llarp_ev_loop* loop, const std::string& ifname, int af,
uint16_t port) uint16_t port)
{ {
m_Loop = loop;
m_udp.user = this; m_udp.user = this;
m_udp.recvfrom = &ILinkLayer::udp_recv_from; m_udp.recvfrom = &ILinkLayer::udp_recv_from;
m_udp.tick = &ILinkLayer::udp_tick; m_udp.tick = &ILinkLayer::udp_tick;
@ -47,13 +48,13 @@ namespace llarp
void void
ILinkLayer::Pump() ILinkLayer::Pump()
{ {
auto now = llarp_time_now_ms(); auto _now = now();
{ {
Lock lock(m_AuthedLinksMutex); Lock lock(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin(); auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end()) while(itr != m_AuthedLinks.end())
{ {
if(!itr->second->TimedOut(now)) if(!itr->second->TimedOut(_now))
{ {
itr->second->Pump(); itr->second->Pump();
++itr; ++itr;
@ -68,7 +69,7 @@ namespace llarp
auto itr = m_Pending.begin(); auto itr = m_Pending.begin();
while(itr != m_Pending.end()) while(itr != m_Pending.end())
{ {
if(!(*itr)->TimedOut(now)) if(!(*itr)->TimedOut(_now))
{ {
(*itr)->Pump(); (*itr)->Pump();
++itr; ++itr;
@ -261,9 +262,9 @@ namespace llarp
} }
void void
ILinkLayer::OnTick(uint64_t interval, llarp_time_t now) ILinkLayer::OnTick(uint64_t interval)
{ {
Tick(now); Tick(now());
ScheduleTick(interval); ScheduleTick(interval);
} }

@ -180,23 +180,7 @@ namespace llarp
EncryptThenHash(const byte_t* ptr, uint32_t sz, bool isLastFragment); EncryptThenHash(const byte_t* ptr, uint32_t sz, bool isLastFragment);
bool bool
QueueWriteBuffers(llarp_buffer_t buf) QueueWriteBuffers(llarp_buffer_t buf);
{
if(sendq.size() >= MaxSendQueueSize)
return false;
llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr);
lastActive = llarp_time_now_ms();
size_t sz = buf.sz;
byte_t* ptr = buf.base;
while(sz)
{
uint32_t s = std::min(FragmentBodyPayloadSize, sz);
EncryptThenHash(ptr, s, ((sz - s) == 0));
ptr += s;
sz -= s;
}
return true;
}
void void
Connect() Connect()
@ -578,7 +562,7 @@ namespace llarp
SendQueueBacklog = [&]() -> size_t { return sendq.size(); }; SendQueueBacklog = [&]() -> size_t { return sendq.size(); };
SendKeepAlive = [&]() -> bool { SendKeepAlive = [&]() -> bool {
auto now = llarp_time_now_ms(); auto now = parent->now();
if(sendq.size() == 0 && state == eSessionReady && now > lastActive if(sendq.size() == 0 && state == eSessionReady && now > lastActive
&& now - lastActive > (sessionTimeout / 4)) && now - lastActive > (sessionTimeout / 4))
{ {
@ -600,7 +584,7 @@ namespace llarp
return this->IsTimedOut(now) || this->state == eClose; return this->IsTimedOut(now) || this->state == eClose;
}; };
GetPubKey = std::bind(&BaseSession::RemotePubKey, this); GetPubKey = std::bind(&BaseSession::RemotePubKey, this);
lastActive = llarp_time_now_ms(); lastActive = parent->now();
// Pump = []() {}; // Pump = []() {};
Pump = std::bind(&BaseSession::PumpWrite, this); Pump = std::bind(&BaseSession::PumpWrite, this);
Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1);
@ -661,6 +645,25 @@ namespace llarp
return true; return true;
} }
bool
BaseSession::QueueWriteBuffers(llarp_buffer_t buf)
{
if(sendq.size() >= MaxSendQueueSize)
return false;
llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr);
lastActive = parent->now();
size_t sz = buf.sz;
byte_t* ptr = buf.base;
while(sz)
{
uint32_t s = std::min(FragmentBodyPayloadSize, sz);
EncryptThenHash(ptr, s, ((sz - s) == 0));
ptr += s;
sz -= s;
}
return true;
}
bool bool
BaseSession::OutboundLIM(const LinkIntroMessage* msg) BaseSession::OutboundLIM(const LinkIntroMessage* msg)
{ {
@ -949,7 +952,7 @@ namespace llarp
void void
BaseSession::Alive() BaseSession::Alive()
{ {
lastActive = llarp_time_now_ms(); lastActive = parent->now();
} }
} // namespace utp } // namespace utp

@ -27,16 +27,17 @@ llarp_init_single_process_logic(struct llarp_threadpool* tp)
} }
void void
llarp_logic_tick(struct llarp_logic* logic) llarp_logic_tick(struct llarp_logic* logic, llarp_time_t now)
{ {
llarp_timer_set_time(logic->timer, now);
llarp_timer_tick_all(logic->timer); llarp_timer_tick_all(logic->timer);
llarp_threadpool_tick(logic->thread); llarp_threadpool_tick(logic->thread);
} }
void void
llarp_logic_tick_async(struct llarp_logic* logic) llarp_logic_tick_async(struct llarp_logic* logic, llarp_time_t now)
{ {
llarp_timer_tick_all_async(logic->timer, logic->thread); llarp_timer_tick_all_async(logic->timer, logic->thread, now);
llarp_threadpool_tick(logic->thread); llarp_threadpool_tick(logic->thread);
} }

@ -234,10 +234,9 @@ namespace llarp
} }
void void
PathContext::ExpirePaths() PathContext::ExpirePaths(llarp_time_t now)
{ {
util::Lock lock(m_TransitPaths.first); util::Lock lock(m_TransitPaths.first);
auto now = llarp_time_now_ms();
auto& map = m_TransitPaths.second; auto& map = m_TransitPaths.second;
auto itr = map.begin(); auto itr = map.begin();
while(itr != map.end()) while(itr != map.end())
@ -258,11 +257,11 @@ namespace llarp
} }
void void
PathContext::BuildPaths() PathContext::BuildPaths(llarp_time_t now)
{ {
for(auto& builder : m_PathBuilders) for(auto& builder : m_PathBuilders)
{ {
if(builder->ShouldBuildMore()) if(builder->ShouldBuildMore(now))
{ {
builder->BuildOne(); builder->BuildOne();
} }
@ -270,9 +269,8 @@ namespace llarp
} }
void void
PathContext::TickPaths() PathContext::TickPaths(llarp_time_t now)
{ {
auto now = llarp_time_now_ms();
for(auto& builder : m_PathBuilders) for(auto& builder : m_PathBuilders)
builder->Tick(now, m_Router); builder->Tick(now, m_Router);
} }
@ -357,7 +355,7 @@ namespace llarp
// initialize parts of the introduction // initialize parts of the introduction
intro.router = hops[hsz - 1].rc.pubkey; intro.router = hops[hsz - 1].rc.pubkey;
intro.pathID = hops[hsz - 1].txID; intro.pathID = hops[hsz - 1].txID;
EnterState(ePathBuilding); EnterState(ePathBuilding, parent->Now());
} }
void void
@ -397,7 +395,7 @@ namespace llarp
} }
void void
Path::EnterState(PathStatus st) Path::EnterState(PathStatus st, llarp_time_t now)
{ {
if(st == ePathTimeout) if(st == ePathTimeout)
{ {
@ -406,7 +404,7 @@ namespace llarp
else if(st == ePathBuilding) else if(st == ePathBuilding)
{ {
llarp::LogInfo("path ", Name(), " is building"); llarp::LogInfo("path ", Name(), " is building");
buildStarted = llarp_time_now_ms(); buildStarted = now;
} }
_status = st; _status = st;
} }
@ -425,7 +423,7 @@ namespace llarp
if(dlt >= PATH_BUILD_TIMEOUT) if(dlt >= PATH_BUILD_TIMEOUT)
{ {
r->routerProfiling.MarkPathFail(this); r->routerProfiling.MarkPathFail(this);
EnterState(ePathTimeout); EnterState(ePathTimeout, now);
return; return;
} }
} }
@ -452,19 +450,19 @@ namespace llarp
if(m_CheckForDead(this, dlt)) if(m_CheckForDead(this, dlt))
{ {
r->routerProfiling.MarkPathFail(this); r->routerProfiling.MarkPathFail(this);
EnterState(ePathTimeout); EnterState(ePathTimeout, now);
} }
} }
else else
{ {
r->routerProfiling.MarkPathFail(this); r->routerProfiling.MarkPathFail(this);
EnterState(ePathTimeout); EnterState(ePathTimeout, now);
} }
} }
else if(dlt >= 10000 && m_LastRecvMessage == 0) else if(dlt >= 10000 && m_LastRecvMessage == 0)
{ {
r->routerProfiling.MarkPathFail(this); r->routerProfiling.MarkPathFail(this);
EnterState(ePathTimeout); EnterState(ePathTimeout, now);
} }
} }
} }
@ -581,14 +579,15 @@ namespace llarp
Path::HandlePathConfirmMessage( Path::HandlePathConfirmMessage(
const llarp::routing::PathConfirmMessage* msg, llarp_router* r) const llarp::routing::PathConfirmMessage* msg, llarp_router* r)
{ {
auto now = r->Now();
if(_status == ePathBuilding) if(_status == ePathBuilding)
{ {
// finish initializing introduction // finish initializing introduction
intro.expiresAt = buildStarted + hops[0].lifetime; intro.expiresAt = buildStarted + hops[0].lifetime;
// confirm that we build the path // confirm that we build the path
EnterState(ePathEstablished); EnterState(ePathEstablished, now);
llarp::LogInfo("path is confirmed tx=", TXID(), " rx=", RXID(), llarp::LogInfo("path is confirmed tx=", TXID(), " rx=", RXID(),
" took ", llarp_time_now_ms() - buildStarted, " ms"); " took ", now - buildStarted, " ms");
if(m_BuiltHook) if(m_BuiltHook)
m_BuiltHook(this); m_BuiltHook(this);
m_BuiltHook = nullptr; m_BuiltHook = nullptr;
@ -602,7 +601,7 @@ namespace llarp
llarp::routing::PathLatencyMessage latency; llarp::routing::PathLatencyMessage latency;
latency.T = llarp_randint(); latency.T = llarp_randint();
m_LastLatencyTestID = latency.T; m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = llarp_time_now_ms(); m_LastLatencyTestTime = now;
return SendRoutingMessage(&latency, r); return SendRoutingMessage(&latency, r);
} }
llarp::LogWarn("got unwarrented path confirm message on tx=", RXID(), llarp::LogWarn("got unwarrented path confirm message on tx=", RXID(),
@ -617,7 +616,7 @@ namespace llarp
{ {
if(m_DataHandler(this, frame)) if(m_DataHandler(this, frame))
{ {
m_LastRecvMessage = llarp_time_now_ms(); m_LastRecvMessage = m_PathSet->Now();
return true; return true;
} }
} }
@ -628,7 +627,7 @@ namespace llarp
Path::HandlePathLatencyMessage( Path::HandlePathLatencyMessage(
const llarp::routing::PathLatencyMessage* msg, llarp_router* r) const llarp::routing::PathLatencyMessage* msg, llarp_router* r)
{ {
auto now = llarp_time_now_ms(); auto now = r->Now();
// TODO: reanimate dead paths if they get this message // TODO: reanimate dead paths if they get this message
if(msg->L == m_LastLatencyTestID && _status == ePathEstablished) if(msg->L == m_LastLatencyTestID && _status == ePathEstablished)
{ {

@ -191,10 +191,9 @@ namespace llarp
} }
bool bool
Builder::ShouldBuildMore() const Builder::ShouldBuildMore(llarp_time_t now) const
{ {
auto now = llarp_time_now_ms(); return llarp::path::PathSet::ShouldBuildMore(now) && now > lastBuild
return llarp::path::PathSet::ShouldBuildMore() && now > lastBuild
&& now - lastBuild > buildIntervalLimit; && now - lastBuild > buildIntervalLimit;
} }
@ -236,10 +235,16 @@ namespace llarp
return true; return true;
} }
llarp_time_t
Builder::Now() const
{
return router->Now();
}
void void
Builder::Build(const std::vector< RouterContact >& hops) Builder::Build(const std::vector< RouterContact >& hops)
{ {
lastBuild = llarp_time_now_ms(); lastBuild = Now();
// async generate keys // async generate keys
AsyncPathKeyExchangeContext< Builder >* ctx = AsyncPathKeyExchangeContext< Builder >* ctx =
new AsyncPathKeyExchangeContext< Builder >(&router->crypto); new AsyncPathKeyExchangeContext< Builder >(&router->crypto);

@ -12,8 +12,9 @@ namespace llarp
} }
bool bool
PathSet::ShouldBuildMore() const PathSet::ShouldBuildMore(llarp_time_t now) const
{ {
(void)now;
return m_Paths.size() < m_NumPaths; return m_Paths.size() < m_NumPaths;
} }
@ -160,7 +161,7 @@ namespace llarp
void void
PathSet::HandlePathBuilt(Path* path) PathSet::HandlePathBuilt(Path* path)
{ {
auto dlt = llarp_time_now_ms() - path->buildStarted; auto dlt = Now() - path->buildStarted;
llarp::LogInfo("Path build took ", dlt, "ms for tx=", path->TXID(), llarp::LogInfo("Path build took ", dlt, "ms for tx=", path->TXID(),
" rx=", path->RXID()); " rx=", path->RXID());
} }

@ -26,10 +26,8 @@ namespace llarp
} }
bool bool
PoW::IsValid(llarp_shorthash_func hashfunc) const PoW::IsValid(llarp_shorthash_func hashfunc, llarp_time_t now) const
{ {
auto now = llarp_time_now_ms();
if(now - timestamp > (uint64_t(extendedLifetime) * 1000)) if(now - timestamp > (uint64_t(extendedLifetime) * 1000))
return false; return false;

@ -179,7 +179,7 @@ namespace llarp
} }
~LRCMFrameDecrypt() ~LRCMFrameDecrypt()
{ {
delete decrypter; delete decrypter;
} }
@ -226,6 +226,7 @@ namespace llarp
static void static void
HandleDecrypted(llarp_buffer_t* buf, LRCMFrameDecrypt* self) HandleDecrypted(llarp_buffer_t* buf, LRCMFrameDecrypt* self)
{ {
auto now = self->context->Router()->Now();
auto& info = self->hop->info; auto& info = self->hop->info;
if(!buf) if(!buf)
{ {
@ -265,7 +266,7 @@ namespace llarp
self->context->Crypto()->shorthash(self->hop->nonceXOR, self->context->Crypto()->shorthash(self->hop->nonceXOR,
llarp::Buffer(self->hop->pathKey)); llarp::Buffer(self->hop->pathKey));
if(self->record.work if(self->record.work
&& self->record.work->IsValid(self->context->Crypto()->shorthash)) && self->record.work->IsValid(self->context->Crypto()->shorthash, now))
{ {
llarp::LogDebug("LRCM extended lifetime by ", llarp::LogDebug("LRCM extended lifetime by ",
self->record.work->extendedLifetime, " seconds for ", self->record.work->extendedLifetime, " seconds for ",
@ -280,7 +281,7 @@ namespace llarp
} }
// TODO: check if we really want to accept it // TODO: check if we really want to accept it
self->hop->started = llarp_time_now_ms(); self->hop->started = now;
size_t sz = self->frames[0].size(); size_t sz = self->frames[0].size();
// shift // shift

@ -468,8 +468,8 @@ void
llarp_router::Tick() llarp_router::Tick()
{ {
// llarp::LogDebug("tick router"); // llarp::LogDebug("tick router");
auto now = llarp_time_now_ms(); auto now = llarp_ev_loop_time_now_ms(netloop);
paths.ExpirePaths(); paths.ExpirePaths(now);
{ {
auto itr = m_PersistingSessions.begin(); auto itr = m_PersistingSessions.begin();
while(itr != m_PersistingSessions.end()) while(itr != m_PersistingSessions.end())
@ -502,14 +502,14 @@ llarp_router::Tick()
auto explore = std::max(NumberOfConnectedRouters(), size_t(1)); auto explore = std::max(NumberOfConnectedRouters(), size_t(1));
dht->impl.Explore(explore); dht->impl.Explore(explore);
} }
paths.BuildPaths(); paths.BuildPaths(now);
hiddenServiceContext.Tick(); hiddenServiceContext.Tick();
} }
if(NumberOfConnectedRouters() < minConnectedRouters) if(NumberOfConnectedRouters() < minConnectedRouters)
{ {
ConnectToRandomRouters(minConnectedRouters); ConnectToRandomRouters(minConnectedRouters);
} }
paths.TickPaths(); paths.TickPaths(now);
} }
void void

@ -242,6 +242,13 @@ struct llarp_router
void void
Tick(); Tick();
/// get time from event loop
llarp_time_t
Now() const
{
return llarp_ev_loop_time_now_ms(netloop);
}
/// schedule ticker to call i ms from now /// schedule ticker to call i ms from now
void void
ScheduleTicker(uint64_t i = 1000); ScheduleTicker(uint64_t i = 1000);

@ -273,13 +273,13 @@ namespace llarp
} }
bool bool
Identity::SignIntroSet(IntroSet& i, llarp_crypto* crypto) const Identity::SignIntroSet(IntroSet& i, llarp_crypto* crypto, llarp_time_t now) const
{ {
if(i.I.size() == 0) if(i.I.size() == 0)
return false; return false;
// set timestamp // set timestamp
// TODO: round to nearest 1000 ms // TODO: round to nearest 1000 ms
i.T = llarp_time_now_ms(); i.T = now;
// set service info // set service info
i.A = pub; i.A = pub;
// set public encryption key // set public encryption key
@ -297,7 +297,7 @@ namespace llarp
} }
bool bool
IntroSet::Verify(llarp_crypto* crypto) const IntroSet::Verify(llarp_crypto* crypto, llarp_time_t now) const
{ {
byte_t tmp[MAX_INTROSET_SIZE]; byte_t tmp[MAX_INTROSET_SIZE];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
@ -312,10 +312,9 @@ namespace llarp
if(!A.Verify(crypto, buf, Z)) if(!A.Verify(crypto, buf, Z))
return false; return false;
// validate PoW // validate PoW
if(W && !W->IsValid(crypto->shorthash)) if(W && !W->IsValid(crypto->shorthash, now))
return false; return false;
// valid timestamps // valid timestamps
auto now = llarp_time_now_ms();
// add max clock skew // add max clock skew
now += MAX_INTROSET_TIME_DELTA; now += MAX_INTROSET_TIME_DELTA;
for(const auto& intro : I) for(const auto& intro : I)

@ -1,6 +1,7 @@
#include <llarp/handlers/tun.hpp> #include <llarp/handlers/tun.hpp>
#include <llarp/service/context.hpp> #include <llarp/service/context.hpp>
#include <llarp/service/endpoint.hpp> #include <llarp/service/endpoint.hpp>
#include "router.hpp"
namespace llarp namespace llarp
{ {
@ -22,7 +23,7 @@ namespace llarp
void void
Context::Tick() Context::Tick()
{ {
auto now = llarp_time_now_ms(); auto now = m_Router->Now();
auto itr = m_Endpoints.begin(); auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end()) while(itr != m_Endpoints.end())
{ {

@ -102,7 +102,7 @@ namespace llarp
{ {
llarp::LogWarn("could not publish descriptors for endpoint ", Name(), llarp::LogWarn("could not publish descriptors for endpoint ", Name(),
" because we couldn't get enough valid introductions"); " because we couldn't get enough valid introductions");
if(ShouldBuildMore() || forceRebuild) if(ShouldBuildMore(now) || forceRebuild)
ManualRebuild(1); ManualRebuild(1);
return; return;
} }
@ -117,7 +117,7 @@ namespace llarp
return; return;
} }
m_IntroSet.topic = m_Tag; m_IntroSet.topic = m_Tag;
if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto)) if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto, now))
{ {
llarp::LogWarn("failed to sign introset for endpoint ", Name()); llarp::LogWarn("failed to sign introset for endpoint ", Name());
return; return;
@ -300,7 +300,7 @@ namespace llarp
std::set< IntroSet > remote; std::set< IntroSet > remote;
for(const auto& introset : msg->I) for(const auto& introset : msg->I)
{ {
if(!introset.Verify(crypto)) if(!introset.Verify(crypto, Now()))
{ {
if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T) if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T)
{ {
@ -358,7 +358,7 @@ namespace llarp
itr = m_Sessions.insert(std::make_pair(tag, Session{})).first; itr = m_Sessions.insert(std::make_pair(tag, Session{})).first;
} }
itr->second.remote = info; itr->second.remote = info;
itr->second.lastUsed = llarp_time_now_ms(); itr->second.lastUsed = Now();
} }
bool bool
@ -380,7 +380,7 @@ namespace llarp
itr = m_Sessions.insert(std::make_pair(tag, Session{})).first; itr = m_Sessions.insert(std::make_pair(tag, Session{})).first;
} }
itr->second.intro = intro; itr->second.intro = intro;
itr->second.lastUsed = llarp_time_now_ms(); itr->second.lastUsed = Now();
} }
bool bool
@ -430,7 +430,7 @@ namespace llarp
itr = m_Sessions.insert(std::make_pair(tag, Session{})).first; itr = m_Sessions.insert(std::make_pair(tag, Session{})).first;
} }
itr->second.sharedKey = k; itr->second.sharedKey = k;
itr->second.lastUsed = llarp_time_now_ms(); itr->second.lastUsed = Now();
} }
bool bool
@ -469,7 +469,7 @@ namespace llarp
Endpoint::CachedTagResult::HandleResponse( Endpoint::CachedTagResult::HandleResponse(
const std::set< IntroSet >& introsets) const std::set< IntroSet >& introsets)
{ {
auto now = llarp_time_now_ms(); auto now = parent->Now();
for(const auto& introset : introsets) for(const auto& introset : introsets)
if(result.insert(introset).second) if(result.insert(introset).second)
@ -505,7 +505,7 @@ namespace llarp
{ {
llarp::routing::DHTMessage* msg = new llarp::routing::DHTMessage(); llarp::routing::DHTMessage* msg = new llarp::routing::DHTMessage();
msg->M.emplace_back(new llarp::dht::FindIntroMessage(tag, txid)); msg->M.emplace_back(new llarp::dht::FindIntroMessage(tag, txid));
lastRequest = llarp_time_now_ms(); lastRequest = parent->Now();
return msg; return msg;
} }
@ -563,7 +563,7 @@ namespace llarp
auto job = new PublishIntroSetJob(this, GenTXID(), m_IntroSet); auto job = new PublishIntroSetJob(this, GenTXID(), m_IntroSet);
if(job->SendRequestViaPath(path, r)) if(job->SendRequestViaPath(path, r))
{ {
m_LastPublishAttempt = llarp_time_now_ms(); m_LastPublishAttempt = Now();
return true; return true;
} }
return false; return false;
@ -582,7 +582,7 @@ namespace llarp
void void
Endpoint::IntroSetPublished() Endpoint::IntroSetPublished()
{ {
m_LastPublish = llarp_time_now_ms(); m_LastPublish = Now();
llarp::LogInfo(Name(), " IntroSet publish confirmed"); llarp::LogInfo(Name(), " IntroSet publish confirmed");
} }
@ -761,7 +761,7 @@ namespace llarp
{ {
llarp::LogWarn(Name(), " message ", seq, " dropped by endpoint ", llarp::LogWarn(Name(), " message ", seq, " dropped by endpoint ",
p->Endpoint(), " via ", dst); p->Endpoint(), " via ", dst);
if(MarkCurrentIntroBad(llarp_time_now_ms())) if(MarkCurrentIntroBad(Now()))
{ {
llarp::LogInfo(Name(), " switched intros to ", remoteIntro.router, llarp::LogInfo(Name(), " switched intros to ", remoteIntro.router,
" via ", remoteIntro.pathID); " via ", remoteIntro.pathID);
@ -797,7 +797,7 @@ namespace llarp
, m_DataHandler(ep) , m_DataHandler(ep)
, m_Endpoint(ep) , m_Endpoint(ep)
{ {
createdAt = llarp_time_now_ms(); createdAt = ep->Now();
} }
void void
@ -822,7 +822,7 @@ namespace llarp
Endpoint::HandlePathDead(void* user) Endpoint::HandlePathDead(void* user)
{ {
Endpoint* self = static_cast< Endpoint* >(user); Endpoint* self = static_cast< Endpoint* >(user);
self->RegenAndPublishIntroSet(llarp_time_now_ms(), true); self->RegenAndPublishIntroSet(self->Now(), true);
} }
bool bool
@ -845,7 +845,7 @@ namespace llarp
Endpoint::OnLookup(const Address& addr, const IntroSet* introset, Endpoint::OnLookup(const Address& addr, const IntroSet* introset,
const RouterID& endpoint) const RouterID& endpoint)
{ {
auto now = llarp_time_now_ms(); auto now = Now();
if(introset == nullptr || introset->IsExpired(now)) if(introset == nullptr || introset->IsExpired(now))
{ {
llarp::LogError(Name(), " failed to lookup ", addr.ToString(), " from ", llarp::LogError(Name(), " failed to lookup ", addr.ToString(), " from ",
@ -958,7 +958,7 @@ namespace llarp
{ {
remoteIntro = m_NextIntro; remoteIntro = m_NextIntro;
// prepare next intro // prepare next intro
auto now = llarp_time_now_ms(); auto now = Now();
for(const auto& intro : currentIntroSet.I) for(const auto& intro : currentIntroSet.I)
{ {
if(intro.ExpiresSoon(now)) if(intro.ExpiresSoon(now))
@ -996,7 +996,7 @@ namespace llarp
llarp::LogInfo("introset is old, dropping"); llarp::LogInfo("introset is old, dropping");
return true; return true;
} }
auto now = llarp_time_now_ms(); auto now = Now();
if(i->IsExpired(now)) if(i->IsExpired(now))
{ {
llarp::LogError("got expired introset from lookup from ", endpoint); llarp::LogError("got expired introset from lookup from ", endpoint);
@ -1028,11 +1028,11 @@ namespace llarp
ProtocolType t) ProtocolType t)
{ {
// inbound converstation // inbound converstation
auto now = Now();
{ {
auto itr = m_AddressToService.find(remote); auto itr = m_AddressToService.find(remote);
if(itr != m_AddressToService.end()) if(itr != m_AddressToService.end())
{ {
auto now = llarp_time_now_ms();
routing::PathTransferMessage transfer; routing::PathTransferMessage transfer;
ProtocolFrame& f = transfer.T; ProtocolFrame& f = transfer.T;
path::Path* p = nullptr; path::Path* p = nullptr;
@ -1211,7 +1211,7 @@ namespace llarp
Endpoint::OutboundContext::ShiftIntroduction() Endpoint::OutboundContext::ShiftIntroduction()
{ {
bool success = false; bool success = false;
auto now = llarp_time_now_ms(); auto now = Now();
if(now - lastShift < MIN_SHIFT_INTERVAL) if(now - lastShift < MIN_SHIFT_INTERVAL)
return false; return false;
bool shifted = false; bool shifted = false;
@ -1255,7 +1255,7 @@ namespace llarp
Endpoint::SendContext::AsyncEncryptAndSendTo(llarp_buffer_t data, Endpoint::SendContext::AsyncEncryptAndSendTo(llarp_buffer_t data,
ProtocolType protocol) ProtocolType protocol)
{ {
auto now = llarp_time_now_ms(); auto now = m_Endpoint->Now();
if(remoteIntro.ExpiresSoon(now)) if(remoteIntro.ExpiresSoon(now))
{ {
if(!MarkCurrentIntroBad(now)) if(!MarkCurrentIntroBad(now))
@ -1409,7 +1409,7 @@ namespace llarp
{ {
llarp::LogDebug("sent data to ", remoteIntro.pathID, " on ", llarp::LogDebug("sent data to ", remoteIntro.pathID, " on ",
remoteIntro.router); remoteIntro.router);
lastGoodSend = llarp_time_now_ms(); lastGoodSend = m_Endpoint->Now();
} }
else else
llarp::LogError("Failed to send frame on path"); llarp::LogError("Failed to send frame on path");
@ -1532,17 +1532,15 @@ namespace llarp
} }
bool bool
Endpoint::OutboundContext::ShouldBuildMore() const Endpoint::OutboundContext::ShouldBuildMore(llarp_time_t now) const
{ {
if(markedBad) if(markedBad)
return false; return false;
bool should = path::Builder::ShouldBuildMore(); bool should = path::Builder::ShouldBuildMore(now);
// determinte newest intro // determinte newest intro
Introduction intro; Introduction intro;
if(!GetNewestIntro(intro)) if(!GetNewestIntro(intro))
return should; return should;
auto now = llarp_time_now_ms();
// time from now that the newest intro expires at // time from now that the newest intro expires at
if(now >= intro.expiresAt) if(now >= intro.expiresAt)
return should; return should;
@ -1567,7 +1565,7 @@ namespace llarp
f.T = currentConvoTag; f.T = currentConvoTag;
f.S = m_Endpoint->GetSeqNoForConvo(f.T); f.S = m_Endpoint->GetSeqNoForConvo(f.T);
auto now = llarp_time_now_ms(); auto now = m_Endpoint->Now();
if(remoteIntro.ExpiresSoon(now)) if(remoteIntro.ExpiresSoon(now))
{ {
// shift intro // shift intro

@ -20,11 +20,11 @@ namespace llarp
bool done; bool done;
bool canceled; bool canceled;
timer(uint64_t ms = 0, void* _user = nullptr, timer(llarp_time_t now, uint64_t ms = 0, void* _user = nullptr,
llarp_timer_handler_func _func = nullptr) llarp_timer_handler_func _func = nullptr)
: user(_user) : user(_user)
, called_at(0) , called_at(0)
, started(llarp_time_now_ms()) , started(now)
, timeout(ms) , timeout(ms)
, func(_func) , func(_func)
, done(false) , done(false)
@ -62,6 +62,13 @@ struct llarp_timer_context
llarp::util::Condition* ticker = nullptr; llarp::util::Condition* ticker = nullptr;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100); std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100);
llarp_time_t m_Now;
llarp_timer_context()
{
m_Now = llarp_time_now_ms();
}
uint32_t ids = 0; uint32_t ids = 0;
bool _run = true; bool _run = true;
@ -108,11 +115,12 @@ struct llarp_timer_context
call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms) call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms)
{ {
llarp::util::Lock lock(timersMutex); llarp::util::Lock lock(timersMutex);
uint32_t id = ++ids; uint32_t id = ++ids;
timers.insert( timers.insert(
std::make_pair(id, std::make_pair(id,
std::unique_ptr< llarp::timer >( std::unique_ptr< llarp::timer >(
new llarp::timer(timeout_ms, user, func)))); new llarp::timer(m_Now, timeout_ms, user, func))));
return id; return id;
} }
@ -181,19 +189,27 @@ llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id)
t->cancel(id); t->cancel(id);
} }
void
llarp_timer_set_time(struct llarp_timer_context* t, llarp_time_t now)
{
if(now == 0)
now = llarp_time_now_ms();
t->m_Now = now;
}
void void
llarp_timer_tick_all(struct llarp_timer_context* t) llarp_timer_tick_all(struct llarp_timer_context* t)
{ {
if(!t->run()) if(!t->run())
return; return;
auto now = llarp_time_now_ms();
std::list< std::unique_ptr< llarp::timer > > hit; std::list< std::unique_ptr< llarp::timer > > hit;
{ {
llarp::util::Lock lock(t->timersMutex); llarp::util::Lock lock(t->timersMutex);
auto itr = t->timers.begin(); auto itr = t->timers.begin();
while(itr != t->timers.end()) while(itr != t->timers.end())
{ {
if(now - itr->second->started >= itr->second->timeout if(t->m_Now - itr->second->started >= itr->second->timeout
|| itr->second->canceled) || itr->second->canceled)
{ {
// timer hit // timer hit
@ -208,7 +224,7 @@ llarp_timer_tick_all(struct llarp_timer_context* t)
{ {
if(h->func) if(h->func)
{ {
h->called_at = now; h->called_at = t->m_Now;
h->exec(); h->exec();
} }
} }
@ -222,8 +238,9 @@ llarp_timer_tick_all_job(void* user)
void void
llarp_timer_tick_all_async(struct llarp_timer_context* t, llarp_timer_tick_all_async(struct llarp_timer_context* t,
struct llarp_threadpool* pool) struct llarp_threadpool* pool, llarp_time_t now)
{ {
t->m_Now = now;
llarp_threadpool_queue_job(pool, {t, llarp_timer_tick_all_job}); llarp_threadpool_queue_job(pool, {t, llarp_timer_tick_all_job});
} }
@ -244,7 +261,7 @@ llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool)
{ {
llarp::util::Lock lock(t->timersMutex); llarp::util::Lock lock(t->timersMutex);
// we woke up // we woke up
llarp_timer_tick_all_async(t, pool); llarp_timer_tick_all_async(t, pool, llarp_time_now_ms());
} }
} }
} }

Loading…
Cancel
Save