Merge pull request #912 from majestrate/logic-thread-fix-2019-11-13

fix logic thread behavior
pull/919/head
Jason Rhinelander 5 years ago committed by GitHub
commit d96d33329b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -44,7 +44,7 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
if(ev->running())
{
ev->update_time();
logic->tick_async(ev->time_now());
logic->tick(ev->time_now());
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}

@ -671,7 +671,8 @@ namespace llarp
#ifdef _WIN32
struct llarp_fd_promise
{
void Set(std::pair< int, int >)
void
Set(std::pair< int, int >)
{
}

@ -596,6 +596,7 @@ namespace llarp
}
}
SendMACK();
Pump();
m_Parent->PumpDone();
}

@ -39,9 +39,17 @@ namespace llarp
}
bool
DecodeKey(ABSL_ATTRIBUTE_UNUSED const llarp_buffer_t& key,
ABSL_ATTRIBUTE_UNUSED llarp_buffer_t* buf) override
DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf) override
{
if(key == "a")
{
llarp_buffer_t strbuf;
if(!bencode_read_string(buf, &strbuf))
return false;
if(strbuf.sz != 1)
return false;
return *strbuf.cur == 'x';
}
return false;
}

@ -199,15 +199,15 @@ namespace llarp
}
/// override me in subtype
virtual bool HandleGotIntroMessage(
std::shared_ptr< const dht::GotIntroMessage >)
virtual bool
HandleGotIntroMessage(std::shared_ptr< const dht::GotIntroMessage >)
{
return false;
}
/// override me in subtype
virtual bool HandleGotRouterMessage(
std::shared_ptr< const dht::GotRouterMessage >)
virtual bool
HandleGotRouterMessage(std::shared_ptr< const dht::GotRouterMessage >)
{
return false;
}

@ -65,8 +65,7 @@ namespace llarp
return m_CachedAddr.ToString();
}
bool
ServiceInfo::CalculateAddress(std::array< byte_t, 32 >& data) const
bool ServiceInfo::CalculateAddress(std::array< byte_t, 32 >& data) const
{
std::array< byte_t, 256 > tmp;
llarp_buffer_t buf(tmp);

@ -89,8 +89,7 @@ namespace llarp
}
/// calculate our address
bool
CalculateAddress(std::array< byte_t, 32 >& data) const;
bool CalculateAddress(std::array< byte_t, 32 >& data) const;
bool
BDecode(llarp_buffer_t* buf)

@ -1,87 +1,80 @@
#include <util/thread/logic.hpp>
#include <util/thread/timer.hpp>
#include <util/logging/logger.hpp>
#include <util/mem.h>
#include <future>
namespace llarp
{
void
Logic::tick(llarp_time_t now)
{
llarp_timer_set_time(this->timer, now);
llarp_timer_tick_all(this->timer);
llarp_threadpool_tick(this->thread);
llarp_timer_tick_all_async(m_Timer, m_Thread, now);
}
Logic::Logic()
: thread(llarp_init_threadpool(1, "llarp-logic"))
, timer(llarp_init_timer())
: m_Thread(llarp_init_threadpool(1, "llarp-logic"))
, m_Timer(llarp_init_timer())
{
llarp_threadpool_start(thread);
llarp_threadpool_start(m_Thread);
/// set thread id
thread->impl->addJob([&]() { id.emplace(std::this_thread::get_id()); });
std::promise< ID_t > result;
// queue setting id and try to get the result back
llarp_threadpool_queue_job(m_Thread, [&]() {
m_ID.emplace(std::this_thread::get_id());
result.set_value(m_ID.value());
});
// get the result back
ID_t spawned = result.get_future().get();
LogDebug("logic thread spawned on ", spawned);
}
Logic::~Logic()
{
llarp_threadpool_stop(this->thread);
llarp_threadpool_join(this->thread);
llarp_free_threadpool(&this->thread);
}
void
Logic::tick_async(llarp_time_t now)
{
llarp_timer_tick_all_async(this->timer, this->thread, now);
}
void
Logic::stop_timer()
{
llarp_timer_stop(this->timer);
delete m_Thread;
llarp_free_timer(m_Timer);
}
void
bool
Logic::queue_job(struct llarp_thread_job job)
{
if(job.user && job.work)
queue_func(std::bind(job.work, job.user));
return job.user && job.work && queue_func(std::bind(job.work, job.user));
}
void
Logic::stop()
{
llarp::LogDebug("logic thread stop");
if(this->thread)
{
llarp_threadpool_stop(this->thread);
}
llarp::LogDebug("logic timer stop");
if(this->timer)
llarp_timer_stop(this->timer);
}
void
Logic::mainloop()
{
llarp_timer_run(this->timer, this->thread);
// stop all timers from happening in the future
queue_func(std::bind(&llarp_timer_stop, m_Timer));
// stop all operations on threadpool
llarp_threadpool_stop(m_Thread);
}
bool
Logic::queue_func(std::function< void(void) >&& f)
Logic::queue_func(std::function< void(void) > f)
{
if(!this->thread->impl->tryAddJob(f))
if(m_Thread->LooksFull(5))
{
call_later(0, f);
LogWarn(
"holy crap, we are trying to queue a job onto the logic thread but "
"it looks full");
if(can_flush())
{
// we are calling in the logic thread and our queue looks full
// defer call to a later time so we don't die like a little bitch
call_later(m_Thread->GuessJobLatency() / 2, f);
return true;
}
}
return true;
return llarp_threadpool_queue_job(m_Thread, f);
}
void
Logic::call_later(llarp_time_t timeout, std::function< void(void) > func)
{
llarp_timer_call_func_later(this->timer, timeout, func);
llarp_timer_call_func_later(m_Timer, timeout, func);
}
uint32_t
@ -91,25 +84,25 @@ namespace llarp
j.user = job.user;
j.timeout = job.timeout;
j.handler = job.handler;
return llarp_timer_call_later(this->timer, j);
return llarp_timer_call_later(m_Timer, j);
}
void
Logic::cancel_call(uint32_t id)
{
llarp_timer_cancel_job(this->timer, id);
llarp_timer_cancel_job(m_Timer, id);
}
void
Logic::remove_call(uint32_t id)
{
llarp_timer_remove_job(this->timer, id);
llarp_timer_remove_job(m_Timer, id);
}
bool
Logic::can_flush() const
{
return id.value() == std::this_thread::get_id();
return m_ID.value() == std::this_thread::get_id();
}
} // namespace llarp

@ -11,36 +11,23 @@ namespace llarp
class Logic
{
public:
struct llarp_threadpool* thread;
struct llarp_timer_context* timer;
absl::optional< std::thread::id > id;
Logic();
~Logic();
/// single threaded tick
/// trigger times as needed
void
tick(llarp_time_t now);
/// isolated tick
void
tick_async(llarp_time_t now);
void
stop_timer();
/// stop all operation and wait for that to die
void
stop();
void
mainloop();
void
bool
queue_job(struct llarp_thread_job job);
bool
queue_func(std::function< void(void) >&& func);
queue_func(std::function< void(void) > func);
uint32_t
call_later(const llarp_timeout_job& job);
@ -56,6 +43,12 @@ namespace llarp
bool
can_flush() const;
private:
using ID_t = std::thread::id;
llarp_threadpool* const m_Thread;
llarp_timer_context* const m_Timer;
absl::optional< ID_t > m_ID;
};
} // namespace llarp

@ -15,18 +15,13 @@ llarp_init_threadpool(int workers, const char *name)
return new llarp_threadpool(workers, name);
}
struct llarp_threadpool *
llarp_init_same_process_threadpool()
{
return new llarp_threadpool();
}
void
llarp_threadpool_join(struct llarp_threadpool *pool)
{
llarp::LogDebug("threadpool join");
if(pool->impl)
pool->impl->drain();
pool->impl->stop();
pool->impl.reset();
}
void
@ -41,64 +36,29 @@ llarp_threadpool_stop(struct llarp_threadpool *pool)
{
llarp::LogDebug("threadpool stop");
if(pool->impl)
pool->impl->stop();
if(pool->jobs)
pool->jobs->disable();
pool->impl->disable();
}
void
llarp_threadpool_wait(struct llarp_threadpool *pool)
{
llarp::LogDebug("threadpool wait");
if(pool->impl)
{
pool->impl->drain();
}
}
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
struct llarp_thread_job job)
{
return llarp_threadpool_queue_job(pool, (std::bind(job.work, job.user)));
return llarp_threadpool_queue_job(pool, std::bind(job.work, job.user));
}
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
std::function< void() > func)
std::function< void(void) > func)
{
if(pool->impl)
{
while(!pool->impl->tryAddJob(func))
{
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}
else
{
// single threaded mode
while(pool->jobs->tryPushBack(func) != llarp::thread::QueueReturn::Success)
{
if(!pool->jobs->enabled())
return;
if(::getpid() == pool->callingPID)
llarp_threadpool_tick(pool);
else
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}
return pool->impl && pool->impl->addJob(func);
}
void
llarp_threadpool_tick(struct llarp_threadpool *pool)
{
while(pool->size())
if(pool->impl)
{
auto job = pool->jobs->tryPopFront();
if(job)
{
(*job)();
}
pool->impl->drain();
}
}
@ -111,3 +71,47 @@ llarp_free_threadpool(struct llarp_threadpool **pool)
}
*pool = nullptr;
}
size_t
llarp_threadpool::size() const
{
return impl ? impl->capacity() : 0;
}
size_t
llarp_threadpool::pendingJobs() const
{
return impl ? impl->jobCount() : 0;
}
size_t
llarp_threadpool::numThreads() const
{
return impl ? impl->activeThreadCount() : 0;
}
llarp_time_t
llarp_threadpool::GuessJobLatency(llarp_time_t granularity) const
{
static const llarp_time_t minimum = llarp_time_t{10};
granularity = std::max(granularity, minimum);
const llarp_time_t _jobs = llarp_time_t{pendingJobs()} * granularity;
const llarp_time_t _capacity =
std::max(llarp_time_t{size()} * granularity, granularity);
const llarp_time_t _numthreads =
std::max(llarp_time_t{numThreads()} * granularity, granularity);
// divisor = log10(granularity)
llarp_time_t divisor = 0;
do
{
granularity /= 10;
if(granularity > 0)
divisor++;
} while(granularity > 0);
// granulairuty is minimum of 10 so log10(granulairuty) is never 0
divisor *= divisor;
// job lag is pending number of jobs divided by job queue length per thread
// divided by log10(granularity) sqaured
const llarp_time_t _queue_length = _capacity / _numthreads;
return _jobs / _queue_length / divisor;
}

@ -5,48 +5,51 @@
#include <util/thread/queue.hpp>
#include <util/thread/thread_pool.hpp>
#include <util/thread/threading.hpp>
#include <util/types.hpp>
#include <absl/base/thread_annotations.h>
#include <memory>
#include <queue>
struct llarp_threadpool;
#ifdef __cplusplus
struct llarp_threadpool
{
std::unique_ptr< llarp::thread::ThreadPool > impl;
std::unique_ptr< llarp::thread::Queue< std::function< void(void) > > > jobs;
const pid_t callingPID;
llarp_threadpool(int workers, llarp::string_view name)
: impl(std::make_unique< llarp::thread::ThreadPool >(workers,
workers * 128, name))
, jobs(nullptr)
, callingPID(0)
{
}
llarp_threadpool()
: jobs(new llarp::thread::Queue< std::function< void() > >(128))
, callingPID(llarp::util::GetPid())
llarp_threadpool(int workers, llarp::string_view name,
size_t queueLength = size_t{1024})
: impl(std::make_unique< llarp::thread::ThreadPool >(
workers, std::max(queueLength, size_t{32}), name))
{
jobs->enable();
}
size_t
size() const
size() const;
size_t
pendingJobs() const;
size_t
numThreads() const;
/// try to guess how big our job latency is on this threadpool
llarp_time_t
GuessJobLatency(llarp_time_t granulairty = 1000) const;
/// see if this thread is full given lookahead amount
bool
LooksFull(size_t lookahead) const
{
if(jobs)
return jobs->size();
return 0;
return (pendingJobs() + lookahead) >= size();
}
};
#endif
struct llarp_threadpool *
llarp_init_threadpool(int workers, const char *name);
/// for single process mode
struct llarp_threadpool *
llarp_init_same_process_threadpool();
void
llarp_free_threadpool(struct llarp_threadpool **tp);
@ -55,44 +58,41 @@ using llarp_thread_work_func = void (*)(void *);
/** job to be done in worker thread */
struct llarp_thread_job
{
#ifdef __cplusplus
/** user data to pass to work function */
void *user{nullptr};
/** called in threadpool worker thread */
llarp_thread_work_func work{nullptr};
#ifdef __cplusplus
llarp_thread_job(void *u, llarp_thread_work_func w) : user(u), work(w)
{
}
llarp_thread_job() = default;
#else
void *user;
llarp_thread_work_func work;
#endif
};
/// for single process mode
void
llarp_threadpool_tick(struct llarp_threadpool *tp);
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
struct llarp_thread_job j);
#ifdef __cplusplus
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
std::function< void() > func);
std::function< void(void) > func);
#endif
void
llarp_threadpool_start(struct llarp_threadpool *tp);
void
llarp_threadpool_stop(struct llarp_threadpool *tp);
void
llarp_threadpool_join(struct llarp_threadpool *tp);
void
llarp_threadpool_wait(struct llarp_threadpool *tp);
#endif

@ -1,5 +1,5 @@
#include <util/thread/timer.hpp>
#include <util/logging/logger.hpp>
#include <util/time.hpp>
#include <atomic>
@ -65,6 +65,9 @@ struct llarp_timer_context
absl::Duration nextTickLen = absl::Milliseconds(100);
llarp_time_t m_Now;
llarp_time_t m_NextRequiredTickAt =
std::numeric_limits< llarp_time_t >::max();
size_t m_NumPendingTimers;
llarp_timer_context()
{
@ -118,46 +121,52 @@ struct llarp_timer_context
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout_ms, user, func));
m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms);
m_NumPendingTimers = timers.size();
return id;
}
uint32_t
call_func_later(std::function< void(void) > func, llarp_time_t timeout)
call_func_later(std::function< void(void) > func, llarp_time_t timeout_ms)
{
llarp::util::Lock lock(&timersMutex);
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout, nullptr, nullptr));
id,
std::make_unique< llarp::timer >(m_Now, timeout_ms, nullptr, nullptr));
timers[id]->deferredFunc = func;
m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms);
m_NumPendingTimers = timers.size();
return id;
}
void
cancel_all() LOCKS_EXCLUDED(timersMutex)
{
std::list< uint32_t > ids;
{
llarp::util::Lock lock(&timersMutex);
for(auto& item : timers)
{
ids.push_back(item.first);
item.second->func = nullptr;
item.second->canceled = true;
}
}
}
for(auto id : ids)
{
cancel(id);
}
bool
ShouldTriggerTimers(llarp_time_t peekAhead) const
{
return m_NumPendingTimers > 0
and (m_Now + peekAhead) >= m_NextRequiredTickAt;
}
};
struct llarp_timer_context*
llarp_init_timer()
{
return new llarp_timer_context;
return new llarp_timer_context();
}
uint32_t
@ -175,11 +184,9 @@ llarp_timer_call_func_later(struct llarp_timer_context* t, llarp_time_t timeout,
}
void
llarp_free_timer(struct llarp_timer_context** t)
llarp_free_timer(struct llarp_timer_context* t)
{
if(*t)
delete *t;
*t = nullptr;
delete t;
}
void
@ -191,6 +198,7 @@ llarp_timer_remove_job(struct llarp_timer_context* t, uint32_t id)
void
llarp_timer_stop(struct llarp_timer_context* t)
{
llarp::LogDebug("timers stopping");
// destroy all timers
// don't call callbacks on timers
{
@ -200,6 +208,7 @@ llarp_timer_stop(struct llarp_timer_context* t)
}
if(t->ticker)
t->ticker->SignalAll();
llarp::LogDebug("timers stopped");
}
void
@ -236,31 +245,39 @@ llarp_timer_tick_all(struct llarp_timer_context* t)
itr = t->timers.erase(itr);
}
else
{
++itr;
}
}
}
for(const auto& h : hit)
while(not hit.empty())
{
const auto& h = hit.front();
h->called_at = t->m_Now;
h->exec();
hit.pop_front();
}
// reindex next tick info
{
if(h->func)
llarp::util::Lock lock(&t->timersMutex);
t->m_Now = llarp::time_now_ms();
t->m_NextRequiredTickAt = std::numeric_limits< llarp_time_t >::max();
for(const auto& item : t->timers)
{
h->called_at = t->m_Now;
h->exec();
t->m_NextRequiredTickAt =
std::min(t->m_NextRequiredTickAt, item.second->timeout + t->m_Now);
}
t->m_NumPendingTimers = t->timers.size();
}
}
static void
llarp_timer_tick_all_job(void* user)
{
llarp_timer_tick_all(static_cast< llarp_timer_context* >(user));
}
void
llarp_timer_tick_all_async(struct llarp_timer_context* t,
struct llarp_threadpool* pool, llarp_time_t now)
{
t->m_Now = now;
llarp_threadpool_queue_job(pool, {t, llarp_timer_tick_all_job});
llarp_timer_set_time(t, now);
if(t->ShouldTriggerTimers(pool->GuessJobLatency()))
llarp_threadpool_queue_job(pool, std::bind(&llarp_timer_tick_all, t));
}
void
@ -302,8 +319,9 @@ namespace llarp
else
call(user, timeout, diff);
}
if(deferredFunc)
if(deferredFunc && not canceled)
deferredFunc();
done = true;
deferredFunc = nullptr;
done = true;
}
} // namespace llarp

@ -60,6 +60,6 @@ llarp_timer_tick_all_async(struct llarp_timer_context *t,
struct llarp_threadpool *pool, llarp_time_t now);
void
llarp_free_timer(struct llarp_timer_context **t);
llarp_free_timer(struct llarp_timer_context *t);
#endif

@ -37,8 +37,15 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
RouterContact rc;
bool madeSession = false;
bool gotLIM = false;
bool
IsGucci() const
{
return gotLIM && madeSession;
}
void Setup()
{
worker = std::make_shared<thread::ThreadPool>(1, 128, "test-worker");
@ -120,11 +127,13 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
Context Bob;
bool success = false;
const bool shouldDebug = true;
llarp_ev_loop_ptr netLoop;
std::shared_ptr< Logic > m_logic;
llarp_time_t oldRCLifetime;
llarp::LogLevel oldLevel;
LinkLayerTest() : netLoop(nullptr)
{
@ -133,6 +142,9 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
void
SetUp()
{
oldLevel = llarp::LogContext::Instance().minLevel;
if(shouldDebug)
llarp::SetLogLevel(eLogDebug);
oldRCLifetime = RouterContact::Lifetime;
RouterContact::BlockBogons = false;
RouterContact::Lifetime = 500;
@ -151,40 +163,23 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
netLoop.reset();
RouterContact::BlockBogons = true;
RouterContact::Lifetime = oldRCLifetime;
}
static void
OnTimeout(void* u, uint64_t, uint64_t left)
{
if(left)
return;
llarp::LogInfo("timed out test");
static_cast< LinkLayerTest* >(u)->Stop();
llarp::SetLogLevel(oldLevel);
}
void
RunMainloop()
{
m_logic->call_later({5000, this, &OnTimeout});
m_logic->call_later(5000, std::bind(&LinkLayerTest::Stop, this));
llarp_ev_loop_run_single_process(netLoop, m_logic);
}
void
Stop()
{
m_logic->queue_func([&]() {
Alice.Stop();
Bob.Stop();
llarp_ev_loop_stop(netLoop);
});
}
bool
AliceGotMessage(const llarp_buffer_t&)
{
success = true;
Stop();
return true;
Alice.Stop();
Bob.Stop();
llarp_ev_loop_stop(netLoop);
m_logic->stop();
}
};
@ -193,33 +188,45 @@ TEST_F(LinkLayerTest, TestIWP)
#ifdef WIN32
GTEST_SKIP();
#else
auto sendDiscardMessage = [](ILinkSession* s, auto callback) -> bool {
// send discard message in reply to complete unit test
std::vector< byte_t> tmp(32);
llarp_buffer_t otherBuf(tmp);
DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return false;
return s->SendMessageBuffer(std::move(tmp), callback);
};
Alice.link = iwp::NewInboundLink(
Alice.encryptionKey,
[&]() -> const RouterContact& { return Alice.GetRC(); },
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
if(Alice.gotLIM)
{
Alice.Regen();
return s->RenegotiateSession();
}
else
{
LinkIntroMessage msg;
ManagedBuffer copy{buf};
if(!msg.BDecode(&copy.underlying))
return false;
if(!s->GotLIM(&msg))
return false;
Alice.gotLIM = true;
return true;
}
llarp_buffer_t copy(buf.base, buf.sz);
if(not Alice.gotLIM)
{
LinkIntroMessage msg;
if(msg.BDecode(&copy))
{
Alice.gotLIM = s->GotLIM(&msg);
}
}
return Alice.gotLIM;
},
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Alice.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
const auto rc = s->GetRemoteRC();
return rc.pubkey == Bob.GetRC().pubkey;
if(rc.pubkey != Bob.GetRC().pubkey)
return false;
LogInfo("alice established with bob");
Alice.madeSession = true;
sendDiscardMessage(s, [&](auto status) {
success = status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess;
LogInfo("message sent to bob suceess=", success);
Stop();
});
return true;
},
[&](RouterContact, RouterContact) -> bool { return true; },
@ -231,27 +238,29 @@ TEST_F(LinkLayerTest, TestIWP)
[]() {})
;
auto sendDiscardMessage = [](ILinkSession* s) -> bool {
// send discard message in reply to complete unit test
std::vector< byte_t> tmp(32);
llarp_buffer_t otherBuf(tmp);
DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return false;
return s->SendMessageBuffer(std::move(tmp), nullptr);
};
Bob.link = iwp::NewInboundLink(
Bob.encryptionKey, [&]() -> const RouterContact& { return Bob.GetRC(); },
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
LinkIntroMessage msg;
ManagedBuffer copy{buf};
if(!msg.BDecode(&copy.underlying))
return false;
if(!s->GotLIM(&msg))
llarp_buffer_t copy(buf.base, buf.sz);
if(not Bob.gotLIM)
{
LinkIntroMessage msg;
if(msg.BDecode(&copy))
{
Bob.gotLIM = s->GotLIM(&msg);
}
return Bob.gotLIM;
}
DiscardMessage discard;
if(discard.BDecode(&copy))
{
LogInfo("bog got discard message from alice");
return true;
}
return false;
Bob.gotLIM = true;
return sendDiscardMessage(s);
},
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
@ -261,12 +270,12 @@ TEST_F(LinkLayerTest, TestIWP)
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
LogInfo("bob established with alice");
return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(),
sendDiscardMessage);
Bob.madeSession = true;
return true;
},
[&](RouterContact newrc, RouterContact oldrc) -> bool {
success = newrc.pubkey == oldrc.pubkey;
return true;
return newrc.pubkey == oldrc.pubkey;
},
[&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); },
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); },
@ -279,6 +288,8 @@ TEST_F(LinkLayerTest, TestIWP)
m_logic->queue_func([&]() { ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC())); });
RunMainloop();
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(Alice.IsGucci());
ASSERT_TRUE(Bob.IsGucci());
ASSERT_TRUE(success);
#endif
};

Loading…
Cancel
Save