Some Windows fixes (#1415)

* Should fix some windows service issues

* fix return condition inversion

* Add some Trace level logging

also make the logger actually respect the log level you set.

* event loop should not queue things to itself...

at present, logic thread queue continues until it is empty, so
queueing things onto itself is just wasteful.

* call_later(foreach thing) is better than foreach thing (call later)

also if you already queued those things but they have not happened yet,
there is no sense to queue them to happen again.

* do not queue read on write finish, only on read finish

* failure to start DNS server should be proper startup failure.

without the DNS server working lokinet is...kinda pointless, right?

* format

* don't queue stuff to logic thread if in logic thread
the thing that clears the queue...clears it.  So you're just delaying and adding overhead.

* windows unbound thread sleep instead of just busy-waiting

also clang-format decided I can't have a blank line for some reason...

* fix unbound async worker on windows
pull/1418/head
Thomas Winget 4 years ago committed by GitHub
parent 12eb32a816
commit a91bb35dbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -50,7 +50,7 @@ extern "C" LONG FAR PASCAL
win32_signal_handler(EXCEPTION_POINTERS*);
extern "C" VOID FAR PASCAL
win32_daemon_entry(DWORD, LPTSTR*);
VOID ReportSvcStatus(DWORD, DWORD, DWORD);
BOOL ReportSvcStatus(DWORD, DWORD, DWORD);
VOID
insert_description();
SERVICE_STATUS SvcStatus;
@ -252,6 +252,7 @@ uninstall_win32_daemon()
static void
run_main_context(std::optional<fs::path> confFile, const llarp::RuntimeOptions opts)
{
llarp::LogTrace("start of run_main_context()");
try
{
std::unique_ptr<llarp::Config> conf;
@ -295,6 +296,41 @@ run_main_context(std::optional<fs::path> confFile, const llarp::RuntimeOptions o
}
}
#ifdef _WIN32
void
TellWindowsServiceStopped()
{
::WSACleanup();
if (not start_as_daemon)
return;
llarp::LogInfo("Telling Windows the service has stopped.");
if (not ReportSvcStatus(SERVICE_STOPPED, NO_ERROR, 0))
{
auto error_code = GetLastError();
if (error_code == ERROR_INVALID_DATA)
llarp::LogError(
"SetServiceStatus failed: \"The specified service status structure is invalid.\"");
else if (error_code == ERROR_INVALID_HANDLE)
llarp::LogError("SetServiceStatus failed: \"The specified handle is invalid.\"");
else
llarp::LogError("SetServiceStatus failed with an unknown error.");
}
llarp::LogContext::Instance().ImmediateFlush();
}
class WindowsServiceStopped
{
public:
WindowsServiceStopped() = default;
~WindowsServiceStopped()
{
TellWindowsServiceStopped();
}
};
#endif
int
main(int argc, char* argv[])
{
@ -324,9 +360,9 @@ lokinet_main(int argc, char* argv[])
llarp::RuntimeOptions opts;
#ifdef _WIN32
WindowsServiceStopped stopped_raii;
if (startWinsock())
return -1;
ReportSvcStatus(SERVICE_RUNNING, NO_ERROR, 0);
SetConsoleCtrlHandler(handle_signal_win32, TRUE);
// SetUnhandledExceptionFilter(win32_signal_handler);
@ -468,6 +504,11 @@ lokinet_main(int argc, char* argv[])
std::thread main_thread{std::bind(&run_main_context, configFile, opts)};
auto ftr = exit_code.get_future();
#ifdef _WIN32
ReportSvcStatus(SERVICE_RUNNING, NO_ERROR, 0);
#endif
do
{
// do periodic non lokinet related tasks here
@ -497,6 +538,9 @@ lokinet_main(int argc, char* argv[])
LogError(wtf);
llarp::LogContext::Instance().ImmediateFlush();
}
#ifdef _WIN32
TellWindowsServiceStopped();
#endif
std::abort();
}
} while (ftr.wait_for(std::chrono::seconds(1)) != std::future_status::ready);
@ -521,10 +565,6 @@ lokinet_main(int argc, char* argv[])
}
llarp::LogContext::Instance().ImmediateFlush();
#ifdef _WIN32
::WSACleanup();
ReportSvcStatus(SERVICE_STOPPED, NO_ERROR, code);
#endif
if (ctx)
{
ctx.reset();
@ -533,7 +573,7 @@ lokinet_main(int argc, char* argv[])
}
#ifdef _WIN32
VOID
BOOL
ReportSvcStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint)
{
static DWORD dwCheckPoint = 1;
@ -554,7 +594,7 @@ ReportSvcStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint)
SvcStatus.dwCheckPoint = dwCheckPoint++;
// Report the status of the service to the SCM.
SetServiceStatus(SvcStatusHandle, &SvcStatus);
return SetServiceStatus(SvcStatusHandle, &SvcStatus);
}
VOID FAR PASCAL

@ -40,6 +40,7 @@ namespace llarp
{
if (not SetupUnboundResolver(resolvers))
{
llarp::LogError("Failed to add upstream resolvers during DNS server setup.");
return false;
}
}
@ -49,7 +50,8 @@ namespace llarp
LogicCall(m_ClientLogic, [=]() {
llarp_ev_add_udp(self->m_ClientLoop.get(), &self->m_Client, any.createSockAddr());
});
return llarp_ev_add_udp(self->m_ServerLoop.get(), &self->m_Server, addr.createSockAddr());
return (
llarp_ev_add_udp(self->m_ServerLoop.get(), &self->m_Server, addr.createSockAddr()) == 0);
}
static Proxy::Buffer_t

@ -41,6 +41,8 @@ namespace llarp::dns
runnerThread = std::make_unique<std::thread>([self = shared_from_this()]() {
while (self->started)
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(20ms);
ub_wait(self->unboundContext);
}
});
@ -120,6 +122,11 @@ namespace llarp::dns
{
return false;
}
#ifdef _WIN32
ub_ctx_async(unboundContext, 1);
#endif
started = true;
RegisterPollFD();
return true;

@ -37,10 +37,14 @@ int
llarp_ev_add_udp(struct llarp_ev_loop* ev, struct llarp_udp_io* udp, const llarp::SockAddr& src)
{
if (ev == nullptr or udp == nullptr)
{
llarp::LogError("Attempting llarp_ev_add_udp() with null event loop or udp io struct.");
return -1;
}
udp->parent = ev;
if (ev->udp_listen(udp, src))
return 0;
llarp::LogError("llarp_ev_add_udp() call to udp_listen failed.");
return -1;
}

@ -6,7 +6,11 @@
namespace libuv
{
#define LoopCall(h, ...) LogicCall(static_cast<Loop*>((h)->loop->data)->m_Logic, __VA_ARGS__)
#define LoopCall(h, ...) \
{ \
auto __f = __VA_ARGS__; \
__f(); \
}
struct glue
{
@ -294,8 +298,10 @@ namespace libuv
static void
OnTick(uv_check_t* t)
{
llarp::LogTrace("conn_glue::OnTick() start");
conn_glue* conn = static_cast<conn_glue*>(t->data);
conn->Tick();
llarp::LogTrace("conn_glue::OnTick() end");
}
void
@ -367,10 +373,12 @@ namespace libuv
static void
OnTick(uv_check_t* t)
{
llarp::LogTrace("ticker_glue::OnTick() start");
ticker_glue* ticker = static_cast<ticker_glue*>(t->data);
ticker->func();
Loop* loop = static_cast<Loop*>(t->loop->data);
loop->FlushLogic();
llarp::LogTrace("ticker_glue::OnTick() end");
}
bool
@ -446,8 +454,10 @@ namespace libuv
static void
OnTick(uv_check_t* t)
{
llarp::LogTrace("udp_glue::OnTick() start");
udp_glue* udp = static_cast<udp_glue*>(t->data);
udp->Tick();
llarp::LogTrace("udp_glue::OnTick() end");
}
void
@ -570,8 +580,10 @@ namespace libuv
static void
OnTick(uv_check_t* h)
{
llarp::LogTrace("pipe_glue::OnTick() start");
pipe_glue* pipe = static_cast<pipe_glue*>(h->data);
LoopCall(h, std::bind(&pipe_glue::Tick, pipe));
llarp::LogTrace("pipe_glue::OnTick() end");
}
bool
@ -613,8 +625,10 @@ namespace libuv
static void
OnTick(uv_check_t* timer)
{
llarp::LogTrace("tun_glue::OnTick() start");
tun_glue* tun = static_cast<tun_glue*>(timer->data);
tun->Tick();
llarp::LogTrace("tun_glue::OnTick() end");
}
static void
@ -740,16 +754,19 @@ namespace libuv
void
Loop::FlushLogic()
{
llarp::LogTrace("Loop::FlushLogic() start");
while (not m_LogicCalls.empty())
{
auto f = m_LogicCalls.popFront();
f();
}
llarp::LogTrace("Loop::FlushLogic() end");
}
static void
OnAsyncWake(uv_async_t* async_handle)
{
llarp::LogTrace("OnAsyncWake, ticking event loop.");
Loop* loop = static_cast<Loop*>(async_handle->data);
loop->update_time();
loop->process_timer_queue();
@ -825,6 +842,7 @@ namespace libuv
int
Loop::run()
{
llarp::LogTrace("Loop::run()");
m_EventLoopThreadID = std::this_thread::get_id();
return uv_run(&m_Impl, UV_RUN_DEFAULT);
}
@ -871,6 +889,7 @@ namespace libuv
uint32_t
Loop::call_after_delay(llarp_time_t delay_ms, std::function<void(void)> callback)
{
llarp::LogTrace("Loop::call_after_delay()");
#ifdef TESTNET_SPEED
delay_ms *= TESTNET_SPEED;
#endif
@ -982,6 +1001,7 @@ namespace libuv
{
return true;
}
llarp::LogError("Loop::udp_listen failed to bind");
delete impl;
return false;
}
@ -1061,16 +1081,9 @@ namespace libuv
}
const auto inEventLoop = *m_EventLoopThreadID == std::this_thread::get_id();
while (m_LogicCalls.full() and inEventLoop)
{
FlushLogic();
}
if (inEventLoop)
{
if (m_LogicCalls.tryPushBack(f) != llarp::thread::QueueReturn::Success)
{
LogError("logic job queue is full");
}
f();
}
else
m_LogicCalls.pushBack(f);

@ -3,6 +3,7 @@
#ifdef _WIN32
#include <util/logging/logger.hpp>
#include <atomic>
// a single event queue for the TUN interface
static HANDLE tun_event_queue = INVALID_HANDLE_VALUE;
@ -136,6 +137,7 @@ tun_ev_loop(void* u)
asio_evt_pkt* pkt = nullptr;
BOOL alert;
std::atomic_flag tick_queued;
while (true)
{
alert = GetQueuedCompletionStatus(tun_event_queue, &size, &listener, &ovl, EV_TICK_INTERVAL);
@ -145,15 +147,21 @@ tun_ev_loop(void* u)
// tick listeners on io timeout, this is required to be done every tick
// cycle regardless of any io being done, this manages the internal state
// of the tun logic
for (const auto& tun : tun_listeners)
{
logic->call_soon([tun]() {
if (tick_queued.test_and_set())
continue; // if tick queued, don't queue another
logic->call_soon([&]() {
for (const auto& tun : tun_listeners)
{
tun->flush_write();
if (tun->t->tick)
tun->t->tick(tun->t);
});
}
continue; // let's go at it once more
}
tick_queued.clear();
});
continue;
}
if (listener == (ULONG_PTR)~0)
break;
@ -172,12 +180,7 @@ tun_ev_loop(void* u)
byte_t* readbuf = (byte_t*)malloc(1500);
ev->read(readbuf, 1500);
}
else
{
// ok let's queue another read!
byte_t* readbuf = (byte_t*)malloc(1500);
ev->read(readbuf, 1500);
}
logic->call_soon([ev]() {
ev->flush_write();
if (ev->t->tick)

@ -51,6 +51,7 @@ namespace llarp
void
TunEndpoint::tunifTick(llarp_tun_io* tun)
{
llarp::LogTrace("TunEndpoint::tunifTick()");
auto* self = static_cast<TunEndpoint*>(tun->user);
self->Flush();
}
@ -748,6 +749,7 @@ namespace llarp
};
// event loop ticker
auto ticker = [self, sendpkt]() {
llarp::LogTrace("TunEndpoint ticker() start");
TunEndpoint* ep = self.get();
const bool running = not ep->IsStopped();
auto impl = ep->GetVPNImpl();
@ -772,6 +774,7 @@ namespace llarp
// if impl has a tick function call it
if (impl && impl->parent && impl->parent->tick)
impl->parent->tick(impl->parent);
llarp::LogTrace("TunEndpoint ticker() end");
};
if (not loop->add_ticker(ticker))
{
@ -875,9 +878,8 @@ namespace llarp
}
if (!m_Resolver->Start(m_LocalResolverAddr, m_UpstreamResolvers))
{
// downgrade DNS server failure to a warning
llarp::LogWarn(Name(), " failed to start dns server");
// return false;
llarp::LogError(Name(), " failed to start DNS server");
return false;
}
return true;
}

@ -445,6 +445,8 @@ namespace llarp::net
std::string interface_name{interface_str.data()};
if ((!gateway.S_un.S_addr) and interface_name != ifname)
{
llarp::LogTrace(
"Win32 find gateway: Adding gateway (", interface_name, ") to list of gateways.");
gateways.push_back(std::move(interface_name));
}
});

@ -164,12 +164,14 @@ namespace llarp
void
Router::PumpLL()
{
llarp::LogTrace("Router::PumpLL() start");
if (_stopping.load())
return;
paths.PumpDownstream();
paths.PumpUpstream();
_outboundMessageHandler.Tick();
_linkManager.PumpLinks();
llarp::LogTrace("Router::PumpLL() end");
}
bool
@ -1272,6 +1274,10 @@ namespace llarp
[&](llarp::RouterContact rc) {
if (IsServiceNode())
return;
llarp::LogTrace(
"Before connect, outbound link adding route to (",
rc.addrs[0].toIpAddress().toIP(),
") via gateway.");
m_RoutePoker.AddRoute(rc.addrs[0].toIpAddress().toIP());
},
util::memFn(&Router::ConnectionEstablished, this),

@ -80,10 +80,7 @@ namespace llarp
SetLogLevel(LogLevel lvl)
{
LogContext::Instance().curLevel = lvl;
if (lvl == eLogDebug)
{
LogContext::Instance().runtimeLevel = lvl;
}
LogContext::Instance().runtimeLevel = lvl;
}
LogLevel
@ -107,6 +104,9 @@ namespace llarp
std::function<void(IOFunc_t)> io)
{
SetLogLevel(level);
if (level == eLogTrace)
LogTrace("Set log level to trace.");
nodeName = nickname;
FILE* logfile = nullptr;

Loading…
Cancel
Save