fix up event loop crap so that unit tests pass

pull/1306/head
Jeff Becker 4 years ago
parent 5c039233db
commit e13e886df9
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -276,17 +276,9 @@ if(SUBMODULE_CHECK)
endif() endif()
endif() endif()
# We only actually need pybind11 with WITH_HIVE, but if we don't load it here then something further if(WITH_HIVE)
# down loads a broken PythonInterp that loads Python2, but Python2 headers are not C++17 compatible.
# So load this here universally so that pybind's more intelligent python finder finds python3.x
# (which the crappier loader invoked below then respects).
# however! travis is FUBAR right now so disable it in travis builds
if(TRAVIS_CI_SUCKS OR NOT WITH_HIVE)
message(WARNING "you're on travis (which is broken garbage) or are compiling without routerhive, we have disabled pybind11, THANKS!")
else()
add_subdirectory(external/pybind11 EXCLUDE_FROM_ALL) add_subdirectory(external/pybind11 EXCLUDE_FROM_ALL)
endif() endif()
if(WITH_TESTS) if(WITH_TESTS)
add_subdirectory(external/googletest EXCLUDE_FROM_ALL) add_subdirectory(external/googletest EXCLUDE_FROM_ALL)
endif() endif()

@ -64,7 +64,10 @@ elseif(DOWNLOAD_UV)
endif() endif()
include_directories(${LIBUV_INCLUDE_DIRS}) include_directories(${LIBUV_INCLUDE_DIRS})
find_package(LokiMQ 1.2 QUIET) option(FORCE_LOKIMQ_SUBMODULE "force using lokimq submodule" OFF)
if(NOT FORCE_LOKIMQ_SUBMODULE)
find_package(LokiMQ 1.2)
endif()
if(LokiMQ_FOUND) if(LokiMQ_FOUND)
message(STATUS "using system lokimq") message(STATUS "using system lokimq")
else() else()

@ -368,7 +368,9 @@ namespace libuv
OnTick(uv_check_t* t) OnTick(uv_check_t* t)
{ {
ticker_glue* ticker = static_cast<ticker_glue*>(t->data); ticker_glue* ticker = static_cast<ticker_glue*>(t->data);
LoopCall(t, ticker->func); ticker->func();
Loop* loop = static_cast<Loop*>(t->loop->data);
loop->FlushLogic();
} }
bool bool
@ -841,11 +843,7 @@ namespace libuv
int int
Loop::run() Loop::run()
{ {
uv_timer_start( m_EventLoopThreadID = std::this_thread::get_id();
m_TickTimer,
[](uv_timer_t* t) { static_cast<Loop*>(t->loop->data)->FlushLogic(); },
1000,
1000);
return uv_run(&m_Impl, UV_RUN_DEFAULT); return uv_run(&m_Impl, UV_RUN_DEFAULT);
} }
@ -988,7 +986,6 @@ namespace libuv
void void
Loop::stopped() Loop::stopped()
{ {
tick(50);
llarp::LogInfo("we have stopped"); llarp::LogInfo("we have stopped");
} }
@ -1072,7 +1069,27 @@ namespace libuv
void void
Loop::call_soon(std::function<void(void)> f) Loop::call_soon(std::function<void(void)> f)
{ {
m_LogicCalls.tryPushBack(f); if (not m_EventLoopThreadID.has_value())
{
m_LogicCalls.tryPushBack(f);
uv_async_send(&m_WakeUp);
return;
}
const auto inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id();
while (m_LogicCalls.full() and inEventLoop)
{
FlushLogic();
}
if (inEventLoop)
{
if (m_LogicCalls.tryPushBack(f) != llarp::thread::QueueReturn::Success)
{
f();
}
}
else
m_LogicCalls.pushBack(f);
uv_async_send(&m_WakeUp); uv_async_send(&m_WakeUp);
} }

@ -154,6 +154,7 @@ namespace libuv
llarp::thread::Queue<PendingTimer> m_timerQueue; llarp::thread::Queue<PendingTimer> m_timerQueue;
llarp::thread::Queue<uint32_t> m_timerCancelQueue; llarp::thread::Queue<uint32_t> m_timerCancelQueue;
std::optional<std::thread::id> m_EventLoopThreadID;
}; };
} // namespace libuv } // namespace libuv

@ -176,6 +176,10 @@ namespace llarp
virtual void virtual void
Stop() = 0; Stop() = 0;
/// non gracefully stop the router
virtual void
Die() = 0;
/// pump low level links /// pump low level links
virtual void virtual void
PumpLL() = 0; PumpLL() = 0;

@ -1021,6 +1021,26 @@ namespace llarp
_linkManager.Stop(); _linkManager.Stop();
} }
void
Router::Die()
{
if (!_running)
return;
if (_stopping)
return;
_stopping.store(true);
LogContext::Instance().RevertRuntimeLevel();
LogWarn("stopping router hard");
#if defined(WITH_SYSTEMD)
sd_notify(0, "STOPPING=1\nSTATUS=Shutting down HARD");
#endif
hiddenServiceContext().StopAll();
_exitContext.Stop();
StopLinks();
Close();
}
void void
Router::Stop() Router::Stop()
{ {

@ -352,6 +352,10 @@ namespace llarp
void void
Stop() override; Stop() override;
/// non graceful stop router
void
Die() override;
/// close all sessions and shutdown all links /// close all sessions and shutdown all links
void void
StopLinks(); StopLinks();

@ -15,21 +15,13 @@ namespace llarp
void void
Logic::Call(std::function<void(void)> func) Logic::Call(std::function<void(void)> func)
{ {
if (can_flush()) m_Queue(std::move(func));
{
func();
}
else
{
m_Queue(std::move(func));
}
} }
void void
Logic::SetQueuer(std::function<void(std::function<void(void)>)> q) Logic::SetQueuer(std::function<void(std::function<void(void)>)> q)
{ {
m_Queue = std::move(q); m_Queue = std::move(q);
m_Queue([self = this]() { self->m_ID = std::this_thread::get_id(); });
} }
uint32_t uint32_t
@ -63,16 +55,11 @@ namespace llarp
} }
} }
bool
Logic::can_flush() const
{
return *m_ID == std::this_thread::get_id();
}
void void
Logic::set_event_loop(llarp_ev_loop* loop) Logic::set_event_loop(llarp_ev_loop* loop)
{ {
m_Loop = loop; m_Loop = loop;
SetQueuer([loop](std::function<void(void)> work) { loop->call_soon(work); });
} }
void void

@ -3,7 +3,6 @@
#include <ev/ev.hpp> #include <ev/ev.hpp>
#include <util/mem.h> #include <util/mem.h>
#include <optional>
namespace llarp namespace llarp
{ {
@ -26,9 +25,6 @@ namespace llarp
void void
remove_call(uint32_t id); remove_call(uint32_t id);
bool
can_flush() const;
void void
SetQueuer(std::function<void(std::function<void(void)>)> q); SetQueuer(std::function<void(std::function<void(void)>)> q);
@ -39,9 +35,7 @@ namespace llarp
clear_event_loop(); clear_event_loop();
private: private:
using ID_t = std::thread::id;
llarp_ev_loop* m_Loop = nullptr; llarp_ev_loop* m_Loop = nullptr;
std::optional<ID_t> m_ID;
std::function<void(std::function<void(void)>)> m_Queue; std::function<void(std::function<void(void)>)> m_Queue;
}; };
} // namespace llarp } // namespace llarp

@ -64,8 +64,8 @@ TEST_CASE("key backup bug regression test", "[regress]")
REQUIRE(endpointAddress != ep->GetIdentity().pub.Addr()); REQUIRE(endpointAddress != ep->GetIdentity().pub.Addr());
} }
} }
// close the router "later" so llarp_main_run exits // close the router right away
ctx->CloseAsync(); ctx->router->Die();
}); });
REQUIRE(ctx->Run({}) == 0); REQUIRE(ctx->Run({}) == 0);
ctx.reset(); ctx.reset();

Loading…
Cancel
Save