From 957a5ed8330544d314a148cc67291c60e48a69a8 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 24 Oct 2018 14:02:42 -0400 Subject: [PATCH] initial epoll event loop implementation for tcp --- .vscode/c_cpp_properties.json | 6 +- .vscode/settings.json | 3 +- CMakeLists.txt | 18 ++- Makefile | 5 +- include/llarp/ev.h | 7 +- libabyss/include/abyss/json.hpp | 14 ++ libabyss/include/abyss/server.hpp | 74 +++++++++++ libabyss/include/libabyss.hpp | 85 +----------- libabyss/main.cpp | 52 ++++++++ libabyss/src/server.cpp | 211 +++++++++++++++++++++++++----- llarp/ev.cpp | 44 +++++-- llarp/ev.hpp | 196 ++++++++++++++++++--------- llarp/ev_epoll.hpp | 166 ++++++++++++++++++++--- llarp/ev_kqueue.hpp | 23 ++-- 14 files changed, 680 insertions(+), 224 deletions(-) create mode 100644 libabyss/include/abyss/json.hpp create mode 100644 libabyss/include/abyss/server.hpp create mode 100644 libabyss/main.cpp diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index 568179363..8c9f05481 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -7,6 +7,8 @@ "${workspaceFolder}/llarp", "${workspaceFolder}/daemon", "${workspaceFolder}/include", + "${workspaceFolder}/libabyss/src", + "${workspaceFolder}/crypto", "${workspaceFolder}/vendor/cppbackport-master/lib" ], "limitSymbolsToIncludedHeaders": true @@ -14,7 +16,9 @@ "includePath": [ "${workspaceFolder}/include", "${workspaceFolder}/llarp", - "${workspaceFolder}/vendor/cppbackport-master/lib" + "${workspaceFolder}/vendor/cppbackport-master/lib", + "${workspaceFolder}/libabyss/include", + "${workspaceFolder}/crypto/include" ], "defines": [], "compilerPath": "/usr/bin/clang", diff --git a/.vscode/settings.json b/.vscode/settings.json index 944e78a92..aaa132724 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -65,6 +65,7 @@ "vector": "cpp", "new": "cpp", "shared_mutex": "cpp", - "complex": "cpp" + "complex": "cpp", + "variant": "cpp" } } \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 5b3214dea..8512b1034 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -478,11 +478,6 @@ set(CLIENT_SRC client/main.cpp ) -set(ALL_SRC ${CLIENT_SRC} ${RC_SRC} ${EXE_SRC} ${DNS_SRC} ${LIB_PLATFORM_SRC} ${LIB_SRC} ${TEST_SRC}) - -foreach(F ${ALL_SRC}) -set_source_files_properties(${F} PROPERTIES COMPILE_FLAGS -DLOG_TAG=\\\"${F}\\\") -endforeach(F) # TODO: exclude this from includes and expose stuff properly for rcutil include_directories(llarp) @@ -491,10 +486,12 @@ include_directories(include) set(RC_EXE rcutil) set(DNS_EXE dns) -set(ABYSS ${CMAKE_SOURCE_DIR}/libabyss) +set(ABYSS libabyss) set(ABYSS_LIB abyss) +set(ABYSS_EXE ${ABYSS_LIB}-main) + include_directories(${ABYSS}/include) set(ABYSS_SRC @@ -505,6 +502,13 @@ set(ABYSS_SRC add_library(${ABYSS_LIB} ${ABYSS_SRC}) + +set(ALL_SRC ${CLIENT_SRC} ${RC_SRC} ${EXE_SRC} ${DNS_SRC} ${LIB_PLATFORM_SRC} ${LIB_SRC} ${TEST_SRC} ${ABYSS_SRC} ${ABYSS}/main.cpp) + +foreach(F ${ALL_SRC}) +set_source_files_properties(${F} PROPERTIES COMPILE_FLAGS -DLOG_TAG=\\\"${F}\\\") +endforeach(F) + if(SHADOW) add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC} ${CRYPTOGRAPHY_SRC}) target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS}) @@ -513,6 +517,7 @@ else() add_executable(${RC_EXE} ${RC_SRC}) add_executable(${EXE} ${EXE_SRC}) add_executable(${CLIENT_EXE} ${CLIENT_SRC}) +add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp) add_executable(${DNS_EXE} ${DNS_SRC}) add_subdirectory(${GTEST_DIR}) @@ -528,6 +533,7 @@ if(WITH_STATIC) target_link_libraries(${PLATFORM_LIB} -lcap) endif() target_link_libraries(${STATIC_LIB} ${CRYPTOGRAPHY_LIB} ${LIBS} ${PLATFORM_LIB}) + target_link_libraries(${ABYSS_EXE} ${STATIC_LIB}) if(NOT WITH_SHARED) target_link_libraries(${EXE} ${STATIC_LINK_LIBS} ${STATIC_LIB} ${PLATFORM_LIB}) diff --git a/Makefile b/Makefile index f44beedd1..3c337c7d5 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ TARGETS = $(REPO)/lokinet SIGS = $(TARGETS:=.sig) EXE = $(BUILD_ROOT)/lokinet TEST_EXE = $(BUILD_ROOT)/testAll - +ABYSS_EXE = $(BUILD_ROOT)/abyss-main DNS_PORT ?= 53 @@ -118,6 +118,9 @@ testnet: test: debug $(TEST_EXE) +abyss: debug + $(ABYSS_EXE) + format: clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$') diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 5708e66c5..5fec1909e 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -99,7 +99,8 @@ struct llarp_tcp_conn }; /// queue async write a buffer in full -void +/// return if we queueed it or not +bool llarp_tcp_conn_async_write(struct llarp_tcp_conn *, const void *, size_t); /// close a tcp connection @@ -114,8 +115,12 @@ struct llarp_tcp_acceptor void *impl; /// parent event loop (dont set me) struct llarp_ev_loop *loop; + /// handle tick + void (*tick)(struct llarp_tcp_acceptor *); /// handle inbound connection void (*accepted)(struct llarp_tcp_acceptor *, struct llarp_tcp_conn *); + /// handle after server socket closed (free-ing is handled by event loop) + void (*closed)(struct llarp_tcp_acceptor *); }; /// bind to an address and start serving async diff --git a/libabyss/include/abyss/json.hpp b/libabyss/include/abyss/json.hpp new file mode 100644 index 000000000..ff73ffd91 --- /dev/null +++ b/libabyss/include/abyss/json.hpp @@ -0,0 +1,14 @@ +#ifndef __ABYSS_JSON_JSON_HPP +#define __ABYSS_JSON_JSON_HPP + +#include + +namespace abyss +{ + namespace json + { + typedef rapidjson::Document Object; + } +} // namespace abyss + +#endif diff --git a/libabyss/include/abyss/server.hpp b/libabyss/include/abyss/server.hpp new file mode 100644 index 000000000..912ca912c --- /dev/null +++ b/libabyss/include/abyss/server.hpp @@ -0,0 +1,74 @@ +#ifndef __ABYSS_SERVER_HPP__ +#define __ABYSS_SERVER_HPP__ + +#include +#include +#include +#include +#include +#include +#include + +namespace abyss +{ + namespace http + { + struct ConnImpl; + + struct IRPCHandler + { + typedef std::string Method_t; + typedef abyss::json::Object Params; + typedef abyss::json::Object Response; + + IRPCHandler(ConnImpl* impl); + + virtual bool + HandleJSONRPC(const Method_t& method, const Params& params, + Response& response) = 0; + + ~IRPCHandler(); + + bool + ShouldClose(llarp_time_t now) const; + + private: + ConnImpl* m_Impl; + }; + + struct BaseReqHandler + { + BaseReqHandler(llarp_time_t req_timeout); + ~BaseReqHandler(); + + bool + ServeAsync(llarp_ev_loop* loop, llarp_logic* logic, + const sockaddr* bindaddr); + + void + RemoveConn(IRPCHandler* handler); + + protected: + virtual IRPCHandler* + CreateHandler(ConnImpl* connimpl) const = 0; + + private: + static void + OnTick(llarp_tcp_acceptor*); + + void + Tick(); + + static void + OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*); + + llarp_ev_loop* m_loop; + llarp_logic* m_Logic; + llarp_tcp_acceptor m_acceptor; + std::list< std::unique_ptr< IRPCHandler > > m_Conns; + llarp_time_t m_ReqTimeout; + }; + } // namespace http +} // namespace abyss + +#endif diff --git a/libabyss/include/libabyss.hpp b/libabyss/include/libabyss.hpp index 96112c2b8..1bc9e1d37 100644 --- a/libabyss/include/libabyss.hpp +++ b/libabyss/include/libabyss.hpp @@ -1,86 +1,5 @@ #ifndef __LIB_ABYSS_HPP__ #define __LIB_ABYSS_HPP__ - -#include -#include -#include -#include -#include -#include - -#ifdef USE_RAPIDJSON -#include -namespace json = rapidjson; -#else -namespace json -{ - struct Document; -} -#endif - -namespace abyss -{ - namespace http - { - struct ConnImpl; - - struct IRPCHandler - { - typedef std::string Method_t; - typedef ::json::Document Params; - typedef ::json::Document Response; - - IRPCHandler(ConnImpl* impl); - - virtual bool - HandleJSONRPC(const Method_t& method, const Params& params, - Response& response) = 0; - - ~IRPCHandler(); - - bool - ShouldClose(llarp_time_t now) const; - - private: - ConnImpl* m_Impl; - }; - - struct BaseReqHandler - { - BaseReqHandler(llarp_time_t req_timeout); - ~BaseReqHandler(); - - bool - ServeAsync(llarp_ev_loop* loop, llarp_logic* logic, - const sockaddr* bindaddr); - - void - RemoveConn(IRPCHandler* handler); - - protected: - virtual IRPCHandler* - CreateHandler(ConnImpl* connimpl) const = 0; - - private: - void - ScheduleTick(llarp_time_t ms); - - static void - OnTick(void* user, llarp_time_t orig, llarp_time_t left); - - void - Tick(); - - static void - OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*); - - llarp_ev_loop* m_loop; - llarp_logic* m_Logic; - llarp_tcp_acceptor m_acceptor; - std::list< std::unique_ptr< IRPCHandler > > m_Conns; - llarp_time_t m_ReqTimeout; - }; - } // namespace http -} // namespace abyss - +#include +#include #endif diff --git a/libabyss/main.cpp b/libabyss/main.cpp new file mode 100644 index 000000000..91ff351ef --- /dev/null +++ b/libabyss/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +struct DemoHandler : public abyss::http::IRPCHandler +{ + DemoHandler(abyss::http::ConnImpl* impl) : abyss::http::IRPCHandler(impl) + { + } + + bool + HandleJSONRPC(const Method_t& method, const Params& params, Response& resp) + { + resp.SetObject().AddMember("test", "value", resp.GetAllocator()); + return true; + } +}; + +struct DemoServer : public abyss::http::BaseReqHandler +{ + DemoServer() : abyss::http::BaseReqHandler(1000) + { + } + + abyss::http::IRPCHandler* + CreateHandler(abyss::http::ConnImpl* impl) const + { + return new DemoHandler(impl); + } +}; + +int +main(int argc, char* argv[]) +{ + signal(SIGPIPE, SIG_IGN); + llarp_threadpool* threadpool = llarp_init_same_process_threadpool(); + llarp_ev_loop* loop = nullptr; + llarp_ev_loop_alloc(&loop); + llarp_logic* logic = llarp_init_single_process_logic(threadpool); + sockaddr_in addr; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = htons(1222); + addr.sin_family = AF_INET; + DemoServer serv; + llarp::Addr a(addr); + llarp::LogInfo("bind to ", a); + if(serv.ServeAsync(loop, logic, a)) + llarp_ev_loop_run_single_process(loop, threadpool, logic); + else + llarp::LogError("Failed to serve: ", strerror(errno)); + return 0; +} diff --git a/libabyss/src/server.cpp b/libabyss/src/server.cpp index 4c3ce1cea..7798196ec 100644 --- a/libabyss/src/server.cpp +++ b/libabyss/src/server.cpp @@ -1,9 +1,10 @@ -#include +#include #include #include #include - -// #include +#include +#include +#include namespace abyss { @@ -12,9 +13,9 @@ namespace abyss struct RequestHeader { typedef std::unordered_multimap< std::string, std::string > Headers_t; - Headers_t m_Headers; + Headers_t Headers; std::string Method; - std::string Version; + std::string Path; }; struct ConnImpl @@ -47,9 +48,11 @@ namespace abyss handler = nullptr; m_LastActive = llarp_time_now_ms(); m_ReadTimeout = readtimeout; - c->read = &OnRead; - c->tick = &OnTick; - c->closed = nullptr; + // set up tcp members + _conn->user = this; + _conn->read = &ConnImpl::OnRead; + _conn->tick = &ConnImpl::OnTick; + _conn->closed = &ConnImpl::OnClosed; m_Bad = false; m_State = eReadHTTPMethodLine; } @@ -59,18 +62,166 @@ namespace abyss } bool - FeedLine(const std::string& line) + FeedLine(std::string& line) + { + switch(m_State) + { + case eReadHTTPMethodLine: + return ProcessMethodLine(line); + case eReadHTTPHeaders: + return ProcessHeaderLine(line); + default: + return false; + } + } + + bool + ProcessMethodLine(std::string& line) + { + // TODO: implement me + auto idx = line.find_first_of(' '); + if(idx == std::string::npos) + return false; + m_Header.Method = line.substr(0, idx); + line = line.substr(idx + 1); + idx = line.find_first_of(' '); + if(idx == std::string::npos) + return false; + m_Header.Path = line.substr(0, idx); + m_State = eReadHTTPHeaders; + return true; + } + + bool + ShouldProcessHeader(const std::string& name) const + { + // TODO: header whitelist + return true; + } + + bool + ProcessHeaderLine(std::string& line) + { + // TODO: implement me + auto idx = line.find_first_of(':'); + if(idx == std::string::npos) + return false; + std::string header = line.substr(0, idx); + std::string val = line.substr(idx); + // to lowercase + std::transform(header.begin(), header.end(), header.begin(), + [](char ch) -> char { return ::tolower(ch); }); + if(ShouldProcessHeader(header)) + { + val = val.substr(val.find_first_not_of(' ')); + m_Header.Headers.insert(std::make_pair(header, val)); + } + return true; + } + + bool + WriteStatusLine(int code, const std::string& message) + { + char buf[128] = {0}; + int sz = snprintf(buf, sizeof(buf), "HTTP/1.0 %d %s\r\n", code, + message.c_str()); + if(sz > 0) + { + llarp::LogInfo("HTTP ", code, " ", message); + return llarp_tcp_conn_async_write(_conn, buf, sz); + } + else + return false; + } + + bool + WriteResponseSimple(int code, const std::string& msg, + const char* contentType, const char* content) + { + if(!WriteStatusLine(code, msg)) + return false; + char buf[128] = {0}; + int sz = + snprintf(buf, sizeof(buf), "Content-Type: %s\r\n", contentType); + if(sz <= 0) + return false; + if(!llarp_tcp_conn_async_write(_conn, buf, sz)) + return false; + size_t contentLength = strlen(content); + sz = snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n\r\n", + contentLength); + if(sz <= 0) + return false; + if(!llarp_tcp_conn_async_write(_conn, buf, sz)) + return false; + if(!llarp_tcp_conn_async_write(_conn, content, contentLength)) + return false; + m_State = eWriteHTTPBody; + return true; + } + + bool + FeedBody(const char* buf, size_t sz) { - return false; + llarp::LogInfo("HTTP ", m_Header.Method, " ", m_Header.Path, " ", sz); + if(sz == 0) + { + return WriteResponseSimple(400, "Bad Request", "text/plain", "nope"); + } + if(m_Header.Method != "POST") + { + return WriteResponseSimple(400, "Bad Request", "text/plain", "nope"); + } + return WriteResponseSimple(200, "OK", "text/json", "{}"); } bool ProcessRead(const char* buf, size_t sz) { if(m_Bad) + { + llarp::LogInfo("we bad"); return false; + } + + m_LastActive = llarp_time_now_ms(); + if(m_State < eReadHTTPBody) + { + if(strstr(buf, "\r\n") == nullptr) + { + // probably too big or small + return false; + } + m_ReadBuf << std::string(buf, sz); + std::string line; + while(std::getline(m_ReadBuf, line, '\n')) + { + if(line[0] == '\r') + { + m_State = eReadHTTPBody; + line = m_ReadBuf.str(); + const char* ptr = strstr(line.c_str(), "\r\n\r\n"); + if(ptr == nullptr) + return false; + line = std::string(ptr + 4); + m_ReadBuf.clear(); + return FeedBody(line.c_str(), line.size()); + } + auto pos = line.find_first_of('\r'); + if(pos == std::string::npos) + { + return false; + } + line = line.substr(0, pos); + + if(!FeedLine(line)) + return false; + } + m_ReadBuf.str(line); + } + else + return FeedBody(buf, sz); - m_ReadBuf << std::string(buf, sz); return true; } @@ -82,6 +233,13 @@ namespace abyss self->MarkBad(); } + static void + OnClosed(llarp_tcp_conn* conn) + { + ConnImpl* self = static_cast< ConnImpl* >(conn->user); + self->_conn = nullptr; + } + static void OnTick(llarp_tcp_conn* conn) { @@ -104,13 +262,18 @@ namespace abyss bool ShouldClose(llarp_time_t now) const { - return now - m_LastActive > m_ReadTimeout || m_Bad; + return now - m_LastActive > m_ReadTimeout || m_Bad + || m_State == eCloseMe; } void Close() { - llarp_tcp_conn_close(_conn); + if(_conn) + { + llarp_tcp_conn_close(_conn); + _conn = nullptr; + } } }; @@ -137,6 +300,8 @@ namespace abyss m_Logic = nullptr; m_acceptor.accepted = &BaseReqHandler::OnAccept; m_acceptor.user = this; + m_acceptor.tick = &OnTick; + m_acceptor.closed = nullptr; } bool @@ -145,22 +310,15 @@ namespace abyss { m_loop = loop; m_Logic = logic; - if(!llarp_tcp_serve(m_loop, &m_acceptor, bindaddr)) - return false; - ScheduleTick(1000); - return true; + return llarp_tcp_serve(m_loop, &m_acceptor, bindaddr); } void - BaseReqHandler::OnTick(void* user, llarp_time_t orig, llarp_time_t left) + BaseReqHandler::OnTick(llarp_tcp_acceptor* tcp) { - if(left) - return; - BaseReqHandler* self = static_cast< BaseReqHandler* >(user); + BaseReqHandler* self = static_cast< BaseReqHandler* >(tcp->user); self->Tick(); - - self->ScheduleTick(orig); } void @@ -177,12 +335,6 @@ namespace abyss } } - void - BaseReqHandler::ScheduleTick(llarp_time_t timeout) - { - llarp_logic_call_later(m_Logic, {timeout, this, &BaseReqHandler::OnTick}); - } - BaseReqHandler::~BaseReqHandler() { llarp_tcp_acceptor_close(&m_acceptor); @@ -201,7 +353,6 @@ namespace abyss return; } connimpl->handler = rpcHandler; - conn->user = connimpl; self->m_Conns.emplace_back(rpcHandler); } } // namespace http diff --git a/llarp/ev.cpp b/llarp/ev.cpp index 25cec7df4..fd7d3e5d0 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -119,23 +119,41 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun) bool llarp_ev_tun_async_write(struct llarp_tun_io *tun, const void *pkt, size_t sz) { - // TODO: queue write - return static_cast< llarp::ev_io * >(tun->impl)->do_write((void *)pkt, sz); + return static_cast< llarp::ev_io * >(tun->impl)->queue_write( + (const byte_t *)pkt, sz); } bool llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp, const struct sockaddr *bindaddr) { - tcp->loop = loop; - // TODO: implement me + tcp->loop = loop; + llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr); + if(impl) + { + tcp->impl = impl; + return loop->add_ev(impl); + } return false; } void llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp) { - // TODO: implement me + llarp::ev_io *impl = static_cast< llarp::ev_io * >(tcp->user); + tcp->impl = nullptr; + tcp->loop->close_ev(impl); + if(tcp->closed) + tcp->closed(tcp); + // dont free acceptor because it may be stack allocated +} + +bool +llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *buf, + size_t sz) +{ + return static_cast< llarp::ev_io * >(conn->impl) + ->queue_write((const byte_t *)buf, sz); } void @@ -143,15 +161,17 @@ llarp_tcp_conn_close(struct llarp_tcp_conn *conn) { if(!conn) return; - llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl); - conn->impl = nullptr; - // deregister - conn->loop->close_ev(impl); - // close fd and delete impl - delete impl; + if(conn->impl) + { + llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl); + // deregister and dealloc + conn->loop->close_ev(impl); + conn->impl = nullptr; + } // call hook if needed if(conn->closed) conn->closed(conn); - // delete + + // delete conn delete conn; } diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 35981d525..24694159d 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -8,7 +8,8 @@ #include #include #include -#include +#include +#include #ifdef _WIN32 #include @@ -26,62 +27,6 @@ namespace llarp { struct ev_io { -#ifndef _WIN32 - int fd; - ev_io(int f) : fd(f), m_writeq("writequeue"){}; -#else - // on windows, udp event loops are socket fds - // and TUN device is a plain old fd - std::variant< SOCKET, HANDLE > fd; - // the unique completion key that helps us to - // identify the object instance for which we receive data - // Here, we'll use the address of the udp_listener instance, converted - // to its literal int/int64 representation. - ULONG_PTR listener_id = 0; - ev_io(SOCKET f) : fd(f), m_writeq("writequeue"){}; - ev_io(HANDLE t) - : fd(t), m_writeq("writequeue"){}; // overload for TUN device, which - // _is_ a regular file descriptor -#endif - virtual int - read(void* buf, size_t sz) = 0; - - virtual int - sendto(const sockaddr* dst, const void* data, size_t sz) = 0; - - virtual void - tick(){}; - - /// used for tun interface and tcp conn - virtual bool - do_write(void* data, size_t sz) - { -#ifndef _WIN32 - return write(fd, data, sz) != -1; -#else - DWORD w; - return WriteFile(std::get< HANDLE >(fd), data, sz, &w, nullptr); -#endif - } - - /// called in event loop when fd is ready for writing - /// requeues anything not written - /// this assumes fd is set to non blocking - virtual void - flush_write() - { - m_writeq.Process([&](WriteBuffer& buffer) { - do_write(buffer.buf, buffer.bufsz); - // if we would block we save the entries for later - // discard entry - }); - /// reset errno - errno = 0; -#if _WIN32 - SetLastError(0); -#endif - } - struct WriteBuffer { llarp_time_t timestamp = 0; @@ -129,11 +74,135 @@ namespace llarp }; }; - llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime, - WriteBuffer::PutTime, WriteBuffer::Compare, - llarp::util::NullMutex, llarp::util::NullLock > - m_writeq; + typedef llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime, + WriteBuffer::PutTime, WriteBuffer::Compare, + llarp::util::NullMutex, + llarp::util::NullLock, 5, 100, 128 > + LossyWriteQueue_t; + + typedef std::deque< WriteBuffer > LosslessWriteQueue_t; + +#ifndef _WIN32 + int fd; + + ev_io(int f) : fd(f) + { + } + + /// for tun + ev_io(int f, LossyWriteQueue_t* lossyqueue) + : fd(f), m_LossyWriteQueue(lossyqueue) + { + } + /// for tcp + ev_io(int f, LosslessWriteQueue_t* q) : fd(f), m_BlockingWriteQueue(q) + { + } +#else + // on windows, udp event loops are socket fds + // and TUN device is a plain old fd + std::variant< SOCKET, HANDLE > fd; + // the unique completion key that helps us to + // identify the object instance for which we receive data + // Here, we'll use the address of the udp_listener instance, converted + // to its literal int/int64 representation. + ULONG_PTR listener_id = 0; + ev_io(SOCKET f) : fd(f), m_writeq("writequeue"){}; + ev_io(HANDLE t) + : fd(t), m_writeq("writequeue"){}; // overload for TUN device, which + // _is_ a regular file descriptor +#endif + virtual int + read(void* buf, size_t sz) = 0; + + virtual int + sendto(const sockaddr* dst, const void* data, size_t sz) + { + return -1; + }; + + virtual void + tick(){}; + + /// used for tun interface and tcp conn + ssize_t + do_write(void* data, size_t sz) + { +#ifndef _WIN32 + return write(fd, data, sz); +#else + DWORD w; + WriteFile(std::get< HANDLE >(fd), data, sz, &w, nullptr); + return w; +#endif + } + + bool + queue_write(const byte_t* buf, size_t sz) + { + if(m_LossyWriteQueue) + { + m_LossyWriteQueue->Emplace(buf, sz); + return true; + } + else if(m_BlockingWriteQueue) + { + m_BlockingWriteQueue->emplace_back(buf, sz); + return true; + } + else + return false; + } + + /// called in event loop when fd is ready for writing + /// requeues anything not written + /// this assumes fd is set to non blocking + virtual void + flush_write() + { + if(m_LossyWriteQueue) + m_LossyWriteQueue->Process([&](WriteBuffer& buffer) { + do_write(buffer.buf, buffer.bufsz); + // if we would block we save the entries for later + // discard entry + }); + else if(m_BlockingWriteQueue) + { + // write buffers + while(m_BlockingWriteQueue->size()) + { + auto& itr = m_BlockingWriteQueue->front(); + ssize_t result = do_write(itr.buf, itr.bufsz); + if(result == -1) + return; + ssize_t dlt = itr.bufsz - result; + if(dlt > 0) + { + // queue remaining to front of queue + WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt); + m_BlockingWriteQueue->pop_front(); + m_BlockingWriteQueue->push_front(buff); + // TODO: errno? + return; + } + m_BlockingWriteQueue->pop_front(); + if(errno == EAGAIN || errno == EWOULDBLOCK) + { + errno = 0; + return; + } + } + } + /// reset errno + errno = 0; +#if _WIN32 + SetLastError(0); +#endif + } + + std::unique_ptr< LossyWriteQueue_t > m_LossyWriteQueue; + std::unique_ptr< LosslessWriteQueue_t > m_BlockingWriteQueue; virtual ~ev_io() { #ifndef _WIN32 @@ -186,6 +255,9 @@ struct llarp_ev_loop virtual llarp::ev_io* create_tun(llarp_tun_io* tun) = 0; + virtual llarp::ev_io* + bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0; + virtual bool add_ev(llarp::ev_io* ev, bool write = false) = 0; @@ -194,7 +266,7 @@ struct llarp_ev_loop virtual ~llarp_ev_loop(){}; - std::vector< std::unique_ptr< llarp::ev_io > > handlers; + std::list< std::unique_ptr< llarp::ev_io > > handlers; void tick_listeners() diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index f55ee0067..670ee364d 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -13,15 +14,106 @@ #include "llarp/net.hpp" #include "logger.hpp" #include "mem.hpp" +#include namespace llarp { - struct tcp_serv : public ev_io + struct tcp_conn : public ev_io { + llarp_tcp_conn* tcp; + tcp_conn(int fd, llarp_tcp_conn* conn) + : ev_io(fd, new LosslessWriteQueue_t()), tcp(conn) + { + } + + virtual int + do_write(const void* buf, size_t sz) + { + return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe + } + + int + read(void* buf, size_t sz) + { + ssize_t amount = ::read(fd, buf, sz); + if(amount > 0) + { + if(tcp->read) + tcp->read(tcp, buf, amount); + } + else + { + // error + llarp_tcp_conn_close(tcp); + return -1; + } + return 0; + } + + void + tick() + { + if(tcp->tick) + tcp->tick(tcp); + } + + int + sendto(const sockaddr*, const void*, size_t) + { + return -1; + } }; - struct tcp_conn : public ev_io + struct tcp_serv : public ev_io { + llarp_ev_loop* loop; + llarp_tcp_acceptor* tcp; + tcp_serv(llarp_ev_loop* l, int fd, llarp_tcp_acceptor* t) + : ev_io(fd), loop(l), tcp(t) + { + // TODO: handle fail + assert(listen(fd, 5) != -1); + } + + void + tick() + { + if(tcp->tick) + tcp->tick(tcp); + } + + /// actually does accept() :^) + int + read(void*, size_t) + { + int new_fd = ::accept(fd, nullptr, nullptr); + if(new_fd == -1) + { + llarp::LogError("failed to accept on ", fd, ":", strerror(errno)); + return -1; + } + + llarp_tcp_conn* conn = new llarp_tcp_conn; + // zero out callbacks + conn->tick = nullptr; + conn->closed = nullptr; + conn->read = nullptr; + // build handler + llarp::tcp_conn* connimpl = new tcp_conn(new_fd, conn); + conn->impl = connimpl; + conn->loop = loop; + if(loop->add_ev(connimpl, true)) + { + // call callback + if(tcp->accepted) + tcp->accepted(tcp, conn); + return 0; + } + // cleanup error + delete conn; + delete connimpl; + return -1; + } }; struct udp_listener : public ev_io @@ -34,14 +126,14 @@ namespace llarp { } - virtual void + void tick() { if(udp->tick) udp->tick(udp); } - virtual int + int read(void* buf, size_t sz) { sockaddr_in6 src; @@ -56,7 +148,7 @@ namespace llarp return 0; } - virtual int + int sendto(const sockaddr* to, const void* data, size_t sz) { socklen_t slen; @@ -85,7 +177,7 @@ namespace llarp llarp_tun_io* t; device* tunif; tun(llarp_tun_io* tio) - : ev_io(-1) + : ev_io(-1, new LossyWriteQueue_t("tun_write_queue")) , t(tio) , tunif(tuntap_init()) @@ -99,7 +191,7 @@ namespace llarp return -1; } - virtual void + void tick() { if(t->tick) @@ -205,9 +297,16 @@ struct llarp_epoll_loop : public llarp_ev_loop while(idx < result) { llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); - if(events[idx].events & EPOLLIN) + if(ev) { - ev->read(readbuf, sizeof(readbuf)); + if(events[idx].events & EPOLLOUT) + { + ev->flush_write(); + } + if(events[idx].events & EPOLLIN) + { + ev->read(readbuf, sizeof(readbuf)); + } } ++idx; } @@ -231,9 +330,16 @@ struct llarp_epoll_loop : public llarp_ev_loop while(idx < result) { llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); - if(events[idx].events & EPOLLIN) + if(ev) { - ev->read(readbuf, sizeof(readbuf)); + if(events[idx].events & EPOLLOUT) + { + ev->flush_write(); + } + if(events[idx].events & EPOLLIN) + { + ev->read(readbuf, sizeof(readbuf)); + } } ++idx; } @@ -296,13 +402,39 @@ struct llarp_epoll_loop : public llarp_ev_loop if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1) return false; // deallocate - std::remove_if(handlers.begin(), handlers.end(), - [ev](const std::unique_ptr< llarp::ev_io >& i) -> bool { - return i.get() == ev; - }); + handlers.erase( + std::remove_if(handlers.begin(), handlers.end(), + [ev](const std::unique_ptr< llarp::ev_io >& i) -> bool { + return i.get() == ev; + })); return true; } + llarp::ev_io* + bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr) + { + int fd = ::socket(bindaddr->sa_family, SOCK_STREAM, 0); + if(fd == -1) + return nullptr; + socklen_t sz = sizeof(sockaddr_in); + if(bindaddr->sa_family == AF_INET6) + { + sz = sizeof(sockaddr_in6); + } + else if(bindaddr->sa_family == AF_UNIX) + { + sz = sizeof(sockaddr_un); + } + if(bind(fd, bindaddr, sz) == -1) + { + ::close(fd); + return nullptr; + } + llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp); + tcp->impl = serv; + return serv; + } + llarp::ev_io* create_tun(llarp_tun_io* tun) { @@ -332,8 +464,8 @@ struct llarp_epoll_loop : public llarp_ev_loop epoll_event ev; ev.data.ptr = e; ev.events = EPOLLIN; - // if(write) - // ev.events |= EPOLLOUT; + if(write) + ev.events |= EPOLLOUT; if(epoll_ctl(epollfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) { delete e; diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index b16cfd1c9..db0ae7f76 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -34,7 +34,8 @@ namespace llarp { } - void tick() + void + tick() { if(udp->tick) udp->tick(udp); @@ -81,7 +82,7 @@ namespace llarp ssize_t sent = ::sendto(fd, data, sz, 0, to, slen); if(sent == -1 || errno) { - llarp::LogError("failed to send udp: ",strerror(errno)); + llarp::LogError("failed to send udp: ", strerror(errno)); errno = 0; } return sent; @@ -130,13 +131,14 @@ namespace llarp } } - void tick() + void + tick() { if(t->tick) t->tick(t); flush_write(); } - + int read(void* buf, size_t sz) { @@ -339,10 +341,11 @@ struct llarp_kqueue_loop : public llarp_ev_loop EV_SET(&change, ev->fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1) { - std::remove_if(handlers.begin(), handlers.end(), - [ev](const std::unique_ptr & i) -> bool { - return i.get() == ev; - }); + handlers.erase(std::remove_if( + handlers.begin(), handlers.end(), + [ev](const std::unique_ptr< llarp::ev_io >& i) -> bool { + return i.get() == ev; + })); return true; } return false; @@ -355,7 +358,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop if(fd == -1) return nullptr; llarp::udp_listener* listener = new llarp::udp_listener(fd, l); - l->impl = listener; + l->impl = listener; return listener; } @@ -385,7 +388,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop // printf("Calling close_ev for [%x] fd[%d]\n", listener, listener->fd); ret = close_ev(listener); l->impl = nullptr; - ret = true; + ret = true; } return ret; }