refactor event loop stuff

find some place in the C code to place the worker thread procedure
until such time that michael presents the new thread pool class

fix unix

get a new event port each time and delet in the event loop after use
pull/66/head
despair 6 years ago
parent d9863128b9
commit e18f786ab2

3
.gitignore vendored

@ -43,4 +43,5 @@ lokinet.exe
rapidjson/
.gradle/
.gradle/
build64/

@ -48,12 +48,12 @@ else()
# found it. this is GNU only
add_compile_options(-Wno-cast-function-type)
endif(USING_CLANG)
else()
add_compile_options(-Wno-cast-function-type)
add_compile_options($<$<COMPILE_LANGUAGE:C>:-Wno-cast-function-type>)
endif()
if(WIN32)
add_compile_options($<$<COMPILE_LANGUAGE:C>:-Wno-bad-function-cast>)
add_compile_options($<$<COMPILE_LANGUAGE:C>:-Wno-cast-function-type>)
set(FS_LIB stdc++fs)
endif(WIN32)

@ -143,9 +143,6 @@ extern "C"
char if_name[IF_NAMESIZE];
#if defined(FreeBSD)
int mode;
#endif
#if defined(Windows)
OVERLAPPED ovl[2];
#endif
};

@ -36,7 +36,7 @@ namespace llarp
{
llarp_time_t timestamp = 0;
size_t bufsz;
byte_t buf[EV_WRITE_BUF_SZ] = {0};
byte_t buf[EV_WRITE_BUF_SZ];
WriteBuffer() = default;
@ -60,6 +60,20 @@ namespace llarp
}
};
struct GetNow
{
llarp_ev_loop* loop;
GetNow(llarp_ev_loop* l) : loop(l)
{
}
llarp_time_t
operator()() const
{
return llarp_ev_loop_time_now_ms(loop);
}
};
struct PutTime
{
llarp_ev_loop* loop;
@ -83,10 +97,10 @@ namespace llarp
};
};
typedef llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime,
WriteBuffer::PutTime, WriteBuffer::Compare,
llarp::util::NullMutex,
llarp::util::NullLock, 5, 100, 128 >
typedef llarp::util::CoDelQueue<
WriteBuffer, WriteBuffer::GetTime, WriteBuffer::PutTime,
WriteBuffer::Compare, WriteBuffer::GetNow, llarp::util::NullMutex,
llarp::util::NullLock, 5, 100, 1024 >
LossyWriteQueue_t;
typedef std::deque< WriteBuffer > LosslessWriteQueue_t;
@ -97,24 +111,16 @@ namespace llarp
ULONG_PTR listener_id = 0;
bool isTCP = false;
bool write = false;
WSAOVERLAPPED portfd[2];
// constructors
// for udp
win32_ev_io(SOCKET f) : fd(f)
{
memset((void*)&portfd[0], 0, sizeof(WSAOVERLAPPED) * 2);
};
win32_ev_io(SOCKET f) : fd(f){};
// for tun
win32_ev_io(HANDLE t, LossyWriteQueue_t* q) : fd(t), m_LossyWriteQueue(q)
{
memset((void*)&portfd[0], 0, sizeof(WSAOVERLAPPED) * 2);
}
win32_ev_io(HANDLE t, LossyWriteQueue_t* q) : fd(t), m_LossyWriteQueue(q){}
// for tcp
win32_ev_io(SOCKET f, LosslessWriteQueue_t* q)
: fd(f), m_BlockingWriteQueue(q)
{
memset((void*)&portfd[0], 0, sizeof(WSAOVERLAPPED) * 2);
isTCP = true;
}
@ -141,13 +147,14 @@ namespace llarp
virtual ssize_t
do_write(void* data, size_t sz)
{
// DWORD w;
// hmm, think we should deallocate event ports in the loop itself
WSAOVERLAPPED* portfd = new WSAOVERLAPPED;
if(std::holds_alternative< HANDLE >(fd))
WriteFile(std::get< HANDLE >(fd), data, sz, nullptr, &portfd[1]);
WriteFile(std::get< HANDLE >(fd), data, sz, nullptr, portfd);
else
WriteFile((HANDLE)std::get< SOCKET >(fd), data, sz, nullptr,
&portfd[1]);
return sz;
portfd);
return sz; // we grab the error in the event loop
}
bool

@ -216,7 +216,7 @@ namespace llarp
llarp_tun_io* t;
device* tunif;
tun(llarp_tun_io* tio, llarp_ev_loop* l)
: ev_io(-1, new LossyWriteQueue_t("kqueue_tun_write", l))
: ev_io(-1, new LossyWriteQueue_t("kqueue_tun_write", l, l))
, t(tio)
, tunif(tuntap_init()){};

@ -19,13 +19,12 @@ namespace llarp
int
tcp_conn::read(void* buf, size_t sz)
{
WSABUF r_buf = {(u_long)sz, (char*)buf};
DWORD amount = 0;
WSABUF r_buf = {(u_long)sz, (char*)buf};
WSAOVERLAPPED* portfd = new WSAOVERLAPPED;
WSARecv(std::get< SOCKET >(fd), &r_buf, 1, nullptr, 0, &portfd[0], nullptr);
GetOverlappedResult((HANDLE)std::get< SOCKET >(fd), &portfd[0], &amount,
TRUE);
if(amount > 0)
WSARecv(std::get< SOCKET >(fd), &r_buf, 1, nullptr, 0, portfd, nullptr);
if(WSAGetLastError() == 997)
{
if(tcp.read)
tcp.read(&tcp, buf, amount);
@ -34,6 +33,7 @@ namespace llarp
{
// error
_shouldClose = true;
delete portfd;
return -1;
}
return 0;
@ -42,16 +42,18 @@ namespace llarp
ssize_t
tcp_conn::do_write(void* buf, size_t sz)
{
WSABUF s_buf = {(u_long)sz, (char*)buf};
DWORD sent = 0;
WSABUF s_buf = {(u_long)sz, (char*)buf};
WSAOVERLAPPED* portfd = new WSAOVERLAPPED;
if(_shouldClose)
{
delete portfd;
return -1;
}
WSASend(std::get< SOCKET >(fd), &s_buf, 1, nullptr, 0, &portfd[1], nullptr);
GetOverlappedResult((HANDLE)std::get< SOCKET >(fd), &portfd[1], &sent,
TRUE);
return sent;
WSASend(std::get< SOCKET >(fd), &s_buf, 1, nullptr, 0, portfd, nullptr);
return sz;
}
void
@ -137,6 +139,7 @@ namespace llarp
read(void* buf, size_t sz)
{
printf("read\n");
WSAOVERLAPPED* portfd = new WSAOVERLAPPED;
sockaddr_in6 src;
socklen_t slen = sizeof(src);
sockaddr* addr = (sockaddr*)&src;
@ -145,12 +148,13 @@ namespace llarp
// WSARecvFrom
llarp::LogDebug("read ", sz, " bytes from socket");
int ret = ::WSARecvFrom(std::get< SOCKET >(fd), &wbuf, 1, nullptr, &flags,
addr, &slen, &portfd[0], nullptr);
addr, &slen, portfd, nullptr);
// 997 is the error code for queued ops
int s_errno = ::WSAGetLastError();
if(ret && s_errno != 997)
{
llarp::LogWarn("recv socket error ", s_errno);
delete portfd;
return -1;
}
udp->recvfrom(udp, addr, buf, sz);
@ -161,6 +165,7 @@ namespace llarp
sendto(const sockaddr* to, const void* data, size_t sz)
{
printf("sendto\n");
WSAOVERLAPPED* portfd = new WSAOVERLAPPED;
socklen_t slen;
WSABUF wbuf = {(u_long)sz, (char*)data};
switch(to->sa_family)
@ -177,11 +182,12 @@ namespace llarp
// WSASendTo
llarp::LogDebug("write ", sz, " bytes into socket");
ssize_t sent = ::WSASendTo(std::get< SOCKET >(fd), &wbuf, 1, nullptr, 0,
to, slen, &portfd[1], nullptr);
to, slen, portfd, nullptr);
int s_errno = ::WSAGetLastError();
if(sent && s_errno != 997)
{
llarp::LogWarn("send socket error ", s_errno);
delete portfd;
return -1;
}
return 0;
@ -192,10 +198,9 @@ namespace llarp
{
llarp_tun_io* t;
device* tunif;
OVERLAPPED* tun_async[2];
tun(llarp_tun_io* tio, llarp_ev_loop* l)
: ev_io(INVALID_HANDLE_VALUE,
new LossyWriteQueue_t("win32_tun_write", l))
new LossyWriteQueue_t("win32_tun_write_queue", l, l))
, t(tio)
, tunif(tuntap_init()){};
@ -230,7 +235,8 @@ namespace llarp
ssize_t
do_write(void* data, size_t sz)
{
return WriteFile(std::get< HANDLE >(fd), data, sz, nullptr, tun_async[1]);
OVERLAPPED* tun_async = new OVERLAPPED;
return WriteFile(std::get< HANDLE >(fd), data, sz, nullptr, tun_async);
}
int
@ -247,8 +253,6 @@ namespace llarp
bool
setup()
{
llarp::LogDebug("set ifname to ", t->ifname);
if(tuntap_start(tunif, TUNTAP_MODE_TUNNEL, 0) == -1)
{
llarp::LogWarn("failed to start interface");
@ -265,9 +269,7 @@ namespace llarp
return false;
}
fd = tunif->tun_fd;
tun_async[0] = &tunif->ovl[0];
tun_async[1] = &tunif->ovl[1];
fd = tunif->tun_fd;
if(std::get< HANDLE >(fd) == INVALID_HANDLE_VALUE)
return false;
@ -360,47 +362,72 @@ struct llarp_win32_loop : public llarp_ev_loop
}
// it works! -despair86, 3-Aug-18 @0420
// if this works, may consider digging up the other code
// from the grave...
int
tick(int ms)
{
OVERLAPPED_ENTRY events[1024];
memset(&events, 0, sizeof(OVERLAPPED_ENTRY) * 1024);
ULONG result = 0;
::GetQueuedCompletionStatusEx(iocpfd, events, 1024, &result, ms, false);
ULONG idx = 0;
while(idx < result)
{
llarp::ev_io* ev =
reinterpret_cast< llarp::ev_io* >(events[idx].lpCompletionKey);
if(ev && events[idx].lpOverlapped)
DWORD iolen = 0;
ULONG_PTR ev_id = 0;
WSAOVERLAPPED* qdata = nullptr;
int idx = 0;
while(GetQueuedCompletionStatus(iocpfd, &iolen, &ev_id, &qdata, ms))
{
llarp::ev_io* ev = reinterpret_cast< llarp::ev_io* >(ev_id);
if(ev)
{
auto amount =
std::min(EV_READ_BUF_SZ, events[idx].dwNumberOfBytesTransferred);
if(ev->write)
ev->flush_write_buffers(amount);
ev->flush_write_buffers(iolen);
else
{
memcpy(readbuf, events[idx].lpOverlapped->Pointer, amount);
ev->read(readbuf, amount);
memcpy(readbuf, qdata->Pointer, iolen);
ev->read(readbuf, iolen);
}
}
++idx;
delete qdata;
}
tick_listeners();
return result;
if (!idx)
idx--;
return idx;
// shelve this for a bit...
/*
OVERLAPPED_ENTRY events[1024];
memset(&events, 0, sizeof(OVERLAPPED_ENTRY) * 1024);
ULONG result = 0;
::GetQueuedCompletionStatusEx(iocpfd, events, 1024, &result, ms, false);
ULONG idx = 0;
while(idx < result)
{
llarp::ev_io* ev =
reinterpret_cast< llarp::ev_io* >(events[idx].lpCompletionKey);
if(ev && events[idx].lpOverlapped)
{
auto amount =
std::min(EV_READ_BUF_SZ,
events[idx].dwNumberOfBytesTransferred); if(ev->write)
ev->flush_write_buffers(amount);
else
{
memcpy(readbuf, events[idx].lpOverlapped->Pointer, amount);
ev->read(readbuf, amount);
}
delete events[idx].lpOverlapped;
}
++idx;
}
tick_listeners();
return idx;
*/
}
// ok apparently this isn't being used yet...
int
run()
{
// The only field we really care about is
// the listener_id, as it contains the address
// of the udp_listener instance.
DWORD iolen = 0;
// ULONG_PTR is guaranteed to be the same size
// as an arch-specific pointer value
DWORD iolen = 0;
ULONG_PTR ev_id = 0;
WSAOVERLAPPED* qdata = nullptr;
int idx = 0;
@ -631,4 +658,17 @@ struct llarp_win32_loop : public llarp_ev_loop
}
};
// This hands us a new event port for the tun
// read/write functions that can later be freed with
// operator delete in the event loop tick
extern "C"
{
OVERLAPPED*
getTunEvPort()
{
OVERLAPPED* newport = new OVERLAPPED;
return newport;
}
}
#endif

@ -31,6 +31,14 @@
#define FILE_ANY_ACCESS 0x00000000
#define METHOD_BUFFERED 0
// from ev_win32.hpp
// gives us a fresh OVERLAPPED port
// from the C++ arena
// that can later be destructed with
// operator delete at the end of the event loop
OVERLAPPED *
getTunEvPort();
/* From OpenVPN tap driver, common.h */
#define TAP_CONTROL_CODE(request, method) \
CTL_CODE(FILE_DEVICE_UNKNOWN, request, method, FILE_ANY_ACCESS)
@ -415,9 +423,11 @@ tuntap_sys_set_ipv6(struct device *dev, t_tun_in6_addr *s, uint32_t mask)
int
tuntap_read(struct device *dev, void *buf, size_t size)
{
/* free this somewhere! */
OVERLAPPED *ovl = getTunEvPort();
if(size)
{
ReadFile(dev->tun_fd, buf, (DWORD)size, NULL, &dev->ovl[0]);
ReadFile(dev->tun_fd, buf, (DWORD)size, NULL, ovl);
int errcode = GetLastError();
@ -425,6 +435,7 @@ tuntap_read(struct device *dev, void *buf, size_t size)
{
tuntap_log(TUNTAP_LOG_ERR,
(const char *)formated_error(L"%1%0", errcode));
free(ovl);
return -1;
}
}
@ -434,15 +445,17 @@ tuntap_read(struct device *dev, void *buf, size_t size)
int
tuntap_write(struct device *dev, void *buf, size_t size)
{
OVERLAPPED *ovl = getTunEvPort();
if(size)
{
WriteFile(dev->tun_fd, buf, (DWORD)size, NULL, &dev->ovl[1]);
WriteFile(dev->tun_fd, buf, (DWORD)size, NULL, ovl);
int errcode = GetLastError();
if(errcode != 997)
{
tuntap_log(TUNTAP_LOG_ERR,
(const char *)formated_error(L"%1%0", errcode));
free(ovl);
return -1;
}
}

@ -61,11 +61,8 @@ extern "C"
dev->tun_fd = TUNFD_INVALID_VALUE;
dev->ctrl_sock = -1;
dev->flags = 0;
#if defined(Windows)
memset(&dev->ovl[0], 0, sizeof(OVERLAPPED) * 2);
#endif
__tuntap_log = &tuntap_log_default;
__tuntap_log = &tuntap_log_default;
return dev;
}

Loading…
Cancel
Save