initial tcp connect for epoll

pull/38/head^2
Jeff Becker 6 years ago
parent 6fe6e59bd5
commit 2d279e83fd
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -5,13 +5,17 @@ set(PROJECT_NAME lokinet)
project(${PROJECT_NAME} C CXX ASM)
option(USE_LIBABYSS "enable libabyss" OFF)
option(USE_CXX17 "enable c++17 features" OFF)
# Require C++11
# or C++17 on win32
if (NOT WIN32)
set(CMAKE_CXX_STANDARD 11)
if(USE_CXX17)
set(CMAKE_CXX_STANDARD 17)
else()
set(CMAKE_CXX_STANDARD 11)
endif()
else()
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 17)
endif(NOT WIN32)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
@ -492,6 +496,7 @@ set(DNS_EXE dns)
set(ALL_SRC ${CLIENT_SRC} ${RC_SRC} ${EXE_SRC} ${DNS_SRC} ${LIB_PLATFORM_SRC} ${LIB_SRC} ${TEST_SRC})
if(USE_LIBABYSS)
add_definitions(-DUSE_ABYSS=1)
set(ABYSS libabyss)
set(ABYSS_LIB abyss)
set(ABYSS_EXE ${ABYSS_LIB}-main)
@ -503,6 +508,8 @@ if(USE_LIBABYSS)
${ABYSS}/src/json.cpp)
add_library(${ABYSS_LIB} STATIC ${ABYSS_SRC})
set(ALL_SRC ${ALL_SRC} ${ABYSS_SRC} ${ABYSS}/main.cpp)
add_executable(${ABYSS_EXE} ${ABYSS}/main.cpp)
target_link_libraries(${ABYSS_EXE} ${PLATFORM_LIB})
endif()
foreach(F ${ALL_SRC})

@ -31,11 +31,12 @@ TESTNET_CLIENTS ?= 50
TESTNET_SERVERS ?= 50
TESTNET_DEBUG ?= 0
JSONRPC = OFF
JSONRPC = ON
CXX17 = ON
BUILD_ROOT = $(REPO)/build
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -DUSE_LIBABYSS=$(JSONRPC) '$(REPO)'")
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -DUSE_CXX17=$(CXX17) -DUSE_LIBABYSS=$(JSONRPC) '$(REPO)'")
SCAN_BUILD ?= scan-build
ANALYZE_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -DUSE_LIBABYSS=$(JSONRPC) '$(REPO)' && cd '$(BUILD_ROOT)' && $(SCAN_BUILD) $(MAKE)")

@ -25,6 +25,18 @@ namespace llarp
return dist;
}
bool
operator==(const Key_t& other) const
{
return memcmp(data(), other.data(), 32) == 0;
}
bool
operator!=(const Key_t& other) const
{
return memcmp(data(), other.data(), 32) != 0;
}
bool
operator<(const Key_t& other) const
{

@ -113,6 +113,29 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *, const void *, size_t);
void
llarp_tcp_conn_close(struct llarp_tcp_conn *);
/// handles outbound connections to 1 endpoint
struct llarp_tcp_connecter
{
/// remote address family
int af;
/// remote address string
char remote[512];
/// userdata pointer
void *user;
/// parent event loop (dont set me)
struct llarp_ev_loop *loop;
/// handle outbound connection made
void (*connected)(struct llarp_tcp_connecter *, struct llarp_tcp_conn *);
/// handle outbound connection error
void (*error)(struct llarp_tcp_connecter *);
};
/// async try connecting to a remote connection 1 time
void
llarp_tcp_async_try_connect(struct llarp_ev_loop *l,
struct llarp_tcp_connecter *tcp);
/// handles inbound connections
struct llarp_tcp_acceptor
{
/// userdata pointer
@ -121,7 +144,7 @@ struct llarp_tcp_acceptor
void *impl;
/// parent event loop (dont set me)
struct llarp_ev_loop *loop;
/// handle tick
/// handle event loop tick
void (*tick)(struct llarp_tcp_acceptor *);
/// handle inbound connection
void (*accepted)(struct llarp_tcp_acceptor *, struct llarp_tcp_conn *);

@ -6,7 +6,7 @@
#include <iostream>
#include "logger.hpp"
#include "mem.hpp"
#include <llarp/string_view.hpp>
#include <vector>
#include <stdlib.h> // for itoa
@ -328,6 +328,12 @@ namespace llarp
this->port(p_port);
}
Addr(string_view addr_str, string_view port_str)
{
this->from_char_array(llarp::string_view_string(addr_str).c_str());
this->port(std::stoi(llarp::string_view_string(port_str)));
}
bool
from_char_array(const char* str)
{

@ -3,6 +3,8 @@
#include <llarp/time.h>
#include <llarp/ev.h>
#include <string>
#include <functional>
#include <llarp/crypto.hpp>
// forward declare
struct llarp_router;
@ -13,6 +15,7 @@ namespace llarp
{
struct ServerImpl;
/// jsonrpc server
struct Server
{
Server(llarp_router* r);
@ -25,6 +28,27 @@ namespace llarp
ServerImpl* m_Impl;
};
struct CallerImpl;
/// jsonrpc caller
struct Caller
{
Caller(llarp_router* r);
~Caller();
/// start with jsonrpc endpoint address
bool
Start(const std::string& remote);
/// async test if a router is valid via jsonrpc
void
AsyncVerifyRouter(llarp::PubKey pkey,
std::function< void(llarp::PubKey, bool) > handler);
private:
CallerImpl* m_Impl;
};
} // namespace rpc
} // namespace llarp

@ -0,0 +1,29 @@
#ifndef LLARP_STRING_VIEW_HPP
#define LLARP_STRING_VIEW_HPP
#if __cplusplus >= 201703L
#include <string_view>
#include <string>
namespace llarp
{
typedef std::string_view string_view;
static std::string
string_view_string(const string_view& v)
{
return std::string(v.data(), v.size());
}
} // namespace llarp
#else
#include <string>
namespace llarp
{
typedef std::string string_view;
static std::string
string_view_string(const string_view& v)
{
return v;
};
} // namespace llarp
#endif
#endif

@ -0,0 +1,117 @@
#ifndef __ABYSS_CLIENT_HPP__
#define __ABYSS_CLIENT_HPP__
#include <string>
#include <memory>
#include <list>
#include <deque>
#include <unordered_map>
#include <functional>
#include <abyss/json.hpp>
#include <llarp/ev.h>
namespace abyss
{
namespace http
{
typedef std::string RPC_Method_t;
typedef json::Value RPC_Params;
typedef json::Document RPC_Response;
typedef std::unordered_multimap< std::string, std::string > Headers_t;
struct ConnImpl;
/// jsonrpc response handler for client
struct IRPCClientHandler
{
IRPCClientHandler(ConnImpl* impl);
virtual ~IRPCClientHandler();
/// handle response from rpc server
/// return true on successful handling
/// return false on errors while handling
virtual bool
HandleResponse(const RPC_Response& response) = 0;
/// populate http request headers
virtual void
PopulateReqHeaders(Headers_t& hdr) = 0;
/// handle fatal internal error while doing request
virtual void
HandleError() = 0;
/// return true if we should close
bool
ShouldClose() const;
/// close underlying connection
void
Close() const;
private:
ConnImpl* m_Impl;
};
/// jsonrpc client
struct JSONRPC
{
typedef std::function< IRPCClientHandler*(ConnImpl*) > HandlerFactory;
JSONRPC();
~JSONRPC();
/// start runing on event loop async
/// return true on success otherwise return false
bool
RunAsync(llarp_ev_loop* loop, const std::string& endpoint);
/// must be called after RunAsync returns true
/// queue a call for rpc
void
QueueRPC(RPC_Method_t method, RPC_Params params,
HandlerFactory createHandler);
/// drop all pending calls on the floor
void
DropAllCalls();
/// handle new outbound connection
void
Connected(llarp_tcp_conn* conn);
/// flush queued rpc calls
void
Flush();
private:
struct Call
{
Call(RPC_Method_t&& m, RPC_Params&& p, HandlerFactory&& f)
: method(std::move(m))
, params(std::move(p))
, createHandler(std::move(f))
{
}
RPC_Method_t method;
RPC_Params params;
HandlerFactory createHandler;
};
static void
OnConnected(llarp_tcp_connecter* connect, llarp_tcp_conn* conn);
static void
OnConnectFail(llarp_tcp_connecter* connect);
static void
OnTick(llarp_tcp_connecter* connect);
llarp_tcp_connecter m_connect;
llarp_ev_loop* m_Loop;
std::deque< Call > m_PendingCalls;
std::list< std::unique_ptr< IRPCClientHandler > > m_Conns;
};
} // namespace http
} // namespace abyss
#endif

@ -0,0 +1,38 @@
#ifndef __ABYSS_HTTP_HPP__
#define __ABYSS_HTTP_HPP__
#include <string>
#include <unordered_map>
#include <abyss/json.hpp>
namespace abyss
{
namespace http
{
struct RequestHeader
{
typedef std::unordered_multimap< std::string, std::string > Headers_t;
Headers_t Headers;
std::string Method;
std::string Path;
};
struct HeaderReader
{
RequestHeader Header;
virtual ~HeaderReader()
{
}
bool
ProcessHeaderLine(abyss::string_view line, bool& done);
virtual bool
ShouldProcessHeader(const abyss::string_view& line) const
{
return true;
}
};
} // namespace http
} // namespace abyss
#endif

@ -26,17 +26,7 @@ namespace abyss
#endif
namespace abyss
{
// the code assumes C++11, if you _actually have_ C++17
// and have the ability to switch it on, then some of the code
// using string_view fails since it was made with the assumption
// that string_view is an alias for string!
//
// Specifically, one cannot use std::transform() on _real_ string_views
// since _those_ are read-only at all times - iterator and const_iterator
// are _exactly_ alike -rick
/* #if __cplusplus >= 201703L */
#if 0
#if __cplusplus >= 201703L
typedef std::string_view string_view;
#else
typedef std::string string_view;

@ -12,7 +12,7 @@
namespace abyss
{
namespace http
namespace httpd
{
struct ConnImpl;
@ -75,7 +75,7 @@ namespace abyss
std::list< std::unique_ptr< IRPCHandler > > m_Conns;
llarp_time_t m_ReqTimeout;
};
} // namespace http
} // namespace httpd
} // namespace abyss
#endif

@ -1,5 +1,5 @@
#ifndef __LIB_ABYSS_HPP__
#define __LIB_ABYSS_HPP__
#include <abyss/server.hpp>
#include <abyss/json.hpp>
#include <abyss/client.hpp>
#endif

@ -4,9 +4,9 @@
#include <sys/signal.h>
#endif
struct DemoHandler : public abyss::http::IRPCHandler
struct DemoHandler : public abyss::httpd::IRPCHandler
{
DemoHandler(abyss::http::ConnImpl* impl) : abyss::http::IRPCHandler(impl)
DemoHandler(abyss::httpd::ConnImpl* impl) : abyss::httpd::IRPCHandler(impl)
{
}
@ -20,14 +20,61 @@ struct DemoHandler : public abyss::http::IRPCHandler
}
};
struct DemoServer : public abyss::http::BaseReqHandler
struct DemoCall : public abyss::http::IRPCClientHandler
{
DemoServer() : abyss::http::BaseReqHandler(1000)
DemoCall(abyss::http::ConnImpl* impl) : abyss::http::IRPCClientHandler(impl)
{
llarp::LogInfo("new call");
}
abyss::http::IRPCHandler*
CreateHandler(abyss::http::ConnImpl* impl) const
bool
HandleResponse(const abyss::http::RPC_Response& resp)
{
std::string body;
abyss::json::ToString(resp, body);
llarp::LogInfo("got response body: ", body);
return true;
}
void
PopulateReqHeaders(abyss::http::Headers_t& hdr)
{
}
void
HandleError()
{
llarp::LogError("error while handling call: ", strerror(errno));
}
};
struct DemoClient : public abyss::http::JSONRPC
{
abyss::http::IRPCClientHandler*
NewConn(abyss::http::ConnImpl* impl)
{
return new DemoCall(impl);
}
void
DoDemoRequest()
{
abyss::json::Value params;
params.SetObject();
QueueRPC("test", std::move(params),
std::bind(&DemoClient::NewConn, this, std::placeholders::_1));
Flush();
};
};
struct DemoServer : public abyss::httpd::BaseReqHandler
{
DemoServer() : abyss::httpd::BaseReqHandler(1000)
{
}
abyss::httpd::IRPCHandler*
CreateHandler(abyss::httpd::ConnImpl* impl) const
{
return new DemoHandler(impl);
}
@ -51,6 +98,7 @@ main(int argc, char* argv[])
return err;
}
#endif
llarp::SetLogLevel(llarp::eLogDebug);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop* loop = nullptr;
llarp_ev_loop_alloc(&loop);
@ -60,12 +108,15 @@ main(int argc, char* argv[])
addr.sin_port = htons(1222);
addr.sin_family = AF_INET;
DemoServer serv;
DemoClient client;
llarp::Addr a(addr);
while(true)
{
llarp::LogInfo("bind to ", a);
if(serv.ServeAsync(loop, logic, a))
{
client.RunAsync(loop, a.ToString());
client.DoDemoRequest();
llarp_ev_loop_run_single_process(loop, threadpool, logic);
return 0;
}

@ -0,0 +1,386 @@
#include <abyss/client.hpp>
#include <abyss/http.hpp>
#include <llarp/logger.hpp>
#include <llarp/crypto.hpp>
namespace abyss
{
namespace http
{
struct ConnImpl : HeaderReader
{
// big
static const size_t MAX_BODY_SIZE = (1024 * 1024);
llarp_tcp_conn* m_Conn;
json::Document m_RequestBody;
Headers_t m_SendHeaders;
IRPCClientHandler* handler;
std::unique_ptr< abyss::json::IParser > m_BodyParser;
json::Document m_Response;
enum State
{
eInitial,
eReadStatusLine,
eReadResponseHeader,
eReadResponseBody,
eCloseMe
};
State state;
ConnImpl(llarp_tcp_conn* conn, RPC_Method_t method, RPC_Params params,
JSONRPC::HandlerFactory factory)
: m_Conn(conn), state(eInitial)
{
srand(time(nullptr));
conn->user = this;
conn->closed = &ConnImpl::OnClosed;
conn->read = &ConnImpl::OnRead;
conn->tick = &ConnImpl::OnTick;
handler = factory(this);
m_RequestBody.SetObject();
auto& alloc = m_RequestBody.GetAllocator();
m_RequestBody.AddMember("jsonrpc", json::Value().SetString("2.0"),
alloc);
m_RequestBody.AddMember("id", json::Value(abs(rand())), alloc);
m_RequestBody.AddMember(
"method", json::Value().SetString(method.c_str(), alloc), alloc);
m_RequestBody.AddMember("params", params, alloc);
}
static void
OnClosed(llarp_tcp_conn* conn)
{
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
self->state = eCloseMe;
}
static void
OnRead(llarp_tcp_conn* conn, const void* buf, size_t sz)
{
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
if(!self->ProcessRead((const char*)buf, sz))
self->CloseError();
}
static void
OnTick(llarp_tcp_conn* conn)
{
}
bool
ProcessStatusLine(string_view line) const
{
auto idx = line.find_first_of(' ');
if(idx == string_view::npos)
return false;
string_view codePart = line.substr(1 + idx);
idx = codePart.find_first_of(' ');
if(idx == string_view::npos)
return false;
return HandleStatusCode(codePart.substr(0, idx));
}
/// return true if we get a 200 status code
bool
HandleStatusCode(string_view code) const
{
return code == "200";
}
bool
ProcessBody(const char* buf, size_t sz)
{
// init parser
if(m_BodyParser == nullptr)
{
size_t contentSize = 0;
auto itr = Header.Headers.find("content-length");
// no content-length header
if(itr == Header.Headers.end())
return false;
// check size
contentSize = std::stoul(itr->second);
if(contentSize > MAX_BODY_SIZE)
return false;
m_BodyParser.reset(abyss::json::MakeParser(contentSize));
}
if(m_BodyParser && m_BodyParser->FeedData(buf, sz))
{
switch(m_BodyParser->Parse(m_Response))
{
case json::IParser::eNeedData:
return true;
case json::IParser::eDone:
handler->HandleResponse(m_Response);
Close();
return true;
case json::IParser::eParseError:
handler->HandleError();
return false;
default:
return false;
}
}
else
return false;
}
bool
ProcessRead(const char* buf, size_t sz)
{
if(state == eInitial)
return false;
bool done = false;
while(state < eReadResponseBody)
{
const char* end = strstr(buf, "\r\n");
string_view line(buf, end - buf);
switch(state)
{
case eReadStatusLine:
if(!ProcessStatusLine(line))
return false;
sz -= line.size() + (2 * sizeof(char));
state = eReadResponseHeader;
break;
case eReadResponseHeader:
if(!ProcessHeaderLine(line, done))
return false;
sz -= line.size() + (2 * sizeof(char));
if(done)
state = eReadResponseBody;
break;
default:
break;
}
buf = end + (2 * sizeof(char));
end = strstr(buf, "\r\n");
}
if(state == eReadResponseBody)
return ProcessBody(buf, sz);
return state == eCloseMe;
}
bool
ShouldClose() const
{
return state == eCloseMe;
}
void
CloseError()
{
if(handler)
handler->HandleError();
handler = nullptr;
Close();
}
void
Close()
{
if(m_Conn)
llarp_tcp_conn_close(m_Conn);
m_Conn = nullptr;
}
void
SendRequest()
{
// populate request headers
handler->PopulateReqHeaders(m_SendHeaders);
// create request body
std::string body;
json::ToString(m_RequestBody, body);
// request base
char buf[512] = {0};
snprintf(buf, sizeof(buf),
"POST /rpc HTTP/1.0\r\nContent-Type: "
"application/json\r\nContent-Length: %lu\r\nAccept: "
"application/json\r\n",
body.size());
if(!llarp_tcp_conn_async_write(m_Conn, buf, strnlen(buf, sizeof(buf))))
{
CloseError();
return;
}
// header delimiter
buf[0] = ':';
buf[1] = ' ';
// CRLF
buf[2] = '\r';
buf[3] = '\n';
// write extra request header
for(const auto& item : m_SendHeaders)
{
// header name
if(!llarp_tcp_conn_async_write(m_Conn, item.first.c_str(),
item.first.size()))
{
CloseError();
return;
}
// header delimiter
if(!llarp_tcp_conn_async_write(m_Conn, buf, 2 * sizeof(char)))
{
CloseError();
return;
}
// header value
if(!llarp_tcp_conn_async_write(m_Conn, item.second.c_str(),
item.second.size()))
{
CloseError();
return;
}
// CRLF
if(!llarp_tcp_conn_async_write(m_Conn, buf + 2, 2 * sizeof(char)))
{
CloseError();
return;
}
}
// CRLF
if(!llarp_tcp_conn_async_write(m_Conn, buf + 2, 2 * sizeof(char)))
{
CloseError();
return;
}
// request body
if(!llarp_tcp_conn_async_write(m_Conn, body.c_str(), body.size()))
{
CloseError();
return;
}
llarp::LogDebug("request sent");
state = eReadStatusLine;
}
};
void
JSONRPC::Flush()
{
/// close idle connections
auto itr = m_Conns.begin();
while(itr != m_Conns.end())
{
if((*itr)->ShouldClose())
{
(*itr)->Close();
itr = m_Conns.erase(itr);
}
else
++itr;
}
// open at most 10 connections
size_t numCalls = std::min(m_PendingCalls.size(), 10UL);
llarp::LogDebug("tick connect to rpc ", numCalls, " times");
while(numCalls--)
{
llarp_tcp_async_try_connect(m_Loop, &m_connect);
}
}
IRPCClientHandler::IRPCClientHandler(ConnImpl* impl) : m_Impl(impl)
{
}
bool
IRPCClientHandler::ShouldClose() const
{
return m_Impl && m_Impl->ShouldClose();
}
void
IRPCClientHandler::Close() const
{
if(m_Impl)
m_Impl->Close();
}
IRPCClientHandler::~IRPCClientHandler()
{
if(m_Impl)
delete m_Impl;
}
JSONRPC::JSONRPC()
{
}
JSONRPC::~JSONRPC()
{
}
void
JSONRPC::QueueRPC(RPC_Method_t method, RPC_Params params,
HandlerFactory createHandler)
{
m_PendingCalls.emplace_back(std::move(method), std::move(params),
std::move(createHandler));
}
bool
JSONRPC::RunAsync(llarp_ev_loop* loop, const std::string& remote)
{
strncpy(m_connect.remote, remote.c_str(), sizeof(m_connect.remote));
// TODO: ipv6
m_connect.connected = &JSONRPC::OnConnected;
m_connect.error = &JSONRPC::OnConnectFail;
m_connect.user = this;
m_connect.af = AF_INET;
m_Loop = loop;
return true;
}
void
JSONRPC::OnConnectFail(llarp_tcp_connecter* tcp)
{
JSONRPC* self = static_cast< JSONRPC* >(tcp->user);
llarp::LogError("failed to connect to RPC, dropped all pending calls");
self->DropAllCalls();
}
void
JSONRPC::OnConnected(llarp_tcp_connecter* tcp, llarp_tcp_conn* conn)
{
JSONRPC* self = static_cast< JSONRPC* >(tcp->user);
llarp::LogDebug("connected to RPC");
self->Connected(conn);
}
void
JSONRPC::Connected(llarp_tcp_conn* conn)
{
auto& front = m_PendingCalls.front();
ConnImpl* connimpl =
new ConnImpl(conn, std::move(front.method), std::move(front.params),
std::move(front.createHandler));
m_PendingCalls.pop_front();
m_Conns.emplace_back(connimpl->handler);
connimpl->SendRequest();
}
void
JSONRPC::DropAllCalls()
{
while(m_PendingCalls.size())
{
auto& front = m_PendingCalls.front();
IRPCClientHandler* h = front.createHandler(nullptr);
h->HandleError();
delete h;
m_PendingCalls.pop_front();
}
}
} // namespace http
} // namespace abyss

@ -0,0 +1,36 @@
#include <abyss/http.hpp>
#include <algorithm>
namespace abyss
{
namespace http
{
bool
HeaderReader::ProcessHeaderLine(string_view line, bool& done)
{
if(line.size() == 0)
{
done = true;
return true;
}
auto idx = line.find_first_of(':');
if(idx == string_view::npos)
return false;
string_view header = line.substr(0, idx);
string_view val = line.substr(1 + idx);
// to lowercase
std::string lowerHeader;
auto itr = header.begin();
while(itr != header.end())
{
lowerHeader += ::tolower(*itr);
++itr;
}
if(ShouldProcessHeader(string_view(lowerHeader)))
{
val = val.substr(val.find_first_not_of(' '));
Header.Headers.insert(std::make_pair(lowerHeader, val));
}
return true;
}
} // namespace http
} // namespace abyss

@ -1,4 +1,5 @@
#include <abyss/server.hpp>
#include <abyss/http.hpp>
#include <llarp/time.h>
#include <sstream>
#include <unordered_map>
@ -8,17 +9,9 @@
namespace abyss
{
namespace http
namespace httpd
{
struct RequestHeader
{
typedef std::unordered_multimap< std::string, std::string > Headers_t;
Headers_t Headers;
std::string Method;
std::string Path;
};
struct ConnImpl
struct ConnImpl : abyss::http::HeaderReader
{
llarp_tcp_conn* _conn;
IRPCHandler* handler;
@ -26,7 +19,6 @@ namespace abyss
llarp_time_t m_LastActive;
llarp_time_t m_ReadTimeout;
bool m_Bad;
RequestHeader m_Header;
std::unique_ptr< abyss::json::IParser > m_BodyParser;
json::Document m_Request;
json::Document m_Response;
@ -66,12 +58,17 @@ namespace abyss
bool
FeedLine(std::string& line)
{
bool done = false;
switch(m_State)
{
case eReadHTTPMethodLine:
return ProcessMethodLine(line);
case eReadHTTPHeaders:
return ProcessHeaderLine(line);
if(!ProcessHeaderLine(line, done))
return false;
if(done)
m_State = eReadHTTPBody;
return true;
default:
return false;
}
@ -83,13 +80,13 @@ namespace abyss
auto idx = line.find_first_of(' ');
if(idx == string_view::npos)
return false;
m_Header.Method = line.substr(0, idx);
line = line.substr(idx + 1);
idx = line.find_first_of(' ');
Header.Method = line.substr(0, idx);
line = line.substr(idx + 1);
idx = line.find_first_of(' ');
if(idx == string_view::npos)
return false;
m_Header.Path = line.substr(0, idx);
m_State = eReadHTTPHeaders;
Header.Path = line.substr(0, idx);
m_State = eReadHTTPHeaders;
return true;
}
@ -100,31 +97,6 @@ namespace abyss
return name == "content-type" || name == "content-length";
}
bool
ProcessHeaderLine(string_view line)
{
if(line.size() == 0)
{
// end of headers
m_State = eReadHTTPBody;
return true;
}
auto idx = line.find_first_of(':');
if(idx == string_view::npos)
return false;
string_view header = line.substr(0, idx);
string_view val = line.substr(1 + idx);
// to lowercase
std::transform(header.begin(), header.end(), header.begin(),
[](char ch) -> char { return ::tolower(ch); });
if(ShouldProcessHeader(header))
{
val = val.substr(val.find_first_not_of(' '));
m_Header.Headers.insert(std::make_pair(header, val));
}
return true;
}
bool
WriteStatusLine(int code, const std::string& message)
{
@ -168,14 +140,14 @@ namespace abyss
bool
FeedBody(const char* buf, size_t sz)
{
if(m_Header.Method != "POST")
if(Header.Method != "POST")
{
return WriteResponseSimple(405, "Method Not Allowed", "text/plain",
"nope");
}
{
auto itr = m_Header.Headers.find("content-type");
if(itr == m_Header.Headers.end())
auto itr = Header.Headers.find("content-type");
if(itr == Header.Headers.end())
{
return WriteResponseSimple(415, "Unsupported Media Type",
"text/plain",
@ -192,8 +164,8 @@ namespace abyss
if(m_BodyParser == nullptr)
{
ssize_t contentLength = 0;
auto itr = m_Header.Headers.find("content-length");
if(itr == m_Header.Headers.end())
auto itr = Header.Headers.find("content-length");
if(itr == Header.Headers.end())
{
return WriteResponseSimple(400, "Bad Request", "text/plain",
"no content length");
@ -264,14 +236,14 @@ namespace abyss
{
return false;
}
bool done = false;
m_LastActive = _parent->now();
if(m_State < eReadHTTPBody)
{
const char* end = strstr(buf, "\r\n");
while(end)
{
string_view line(buf, end);
string_view line(buf, end - buf);
switch(m_State)
{
case eReadHTTPMethodLine:
@ -280,9 +252,11 @@ namespace abyss
sz -= line.size() + (2 * sizeof(char));
break;
case eReadHTTPHeaders:
if(!ProcessHeaderLine(line))
if(!ProcessHeaderLine(line, done))
return false;
sz -= line.size() + (2 * sizeof(char));
if(done)
m_State = eReadHTTPBody;
break;
default:
break;
@ -426,5 +400,5 @@ namespace abyss
connimpl->handler = rpcHandler;
self->m_Conns.emplace_back(rpcHandler);
}
} // namespace http
} // namespace httpd
} // namespace abyss

@ -385,9 +385,9 @@ namespace llarp
{
if(handleResult)
handleResult(valuesFound);
else
parent->DHTSendTo(whoasked.node,
new GotIntroMessage(valuesFound, whoasked.txid));
parent->DHTSendTo(whoasked.node,
new GotIntroMessage(valuesFound, whoasked.txid));
}
};
@ -398,7 +398,7 @@ namespace llarp
LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid,
const service::Address &addr, Context *ctx,
const Key_t &askpeer)
: ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 4,
: ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 5,
nullptr)
, localPath(pathid)
{

@ -147,8 +147,8 @@ namespace llarp
}
else
{
llarp::LogError(
"cannot find closer peers for introset lookup for ", S);
// no more closer peers
replies.emplace_back(new GotIntroMessage({}, T));
return true;
}
}
@ -165,7 +165,9 @@ namespace llarp
}
else
{
llarp::LogWarn("no closer peers for tag ", N.ToString());
// no more closer peers
replies.emplace_back(new GotIntroMessage({}, T));
return true;
}
}
else

@ -1,5 +1,6 @@
#include <llarp/ev.h>
#include <llarp/logic.h>
#include <llarp/string_view.hpp>
#include "mem.hpp"
#define EV_TICK_INTERVAL 100
@ -143,6 +144,46 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt,
return impl->queue_write(ptr, sz);
}
void
llarp_tcp_async_try_connect(struct llarp_ev_loop *loop,
struct llarp_tcp_connecter *tcp)
{
tcp->loop = loop;
llarp::string_view addr_str, port_str;
// try parsing address
const char *begin = tcp->remote;
const char *ptr = strstr(tcp->remote, ":");
// get end of address
if(ptr == nullptr)
{
llarp::LogError("bad address: ", tcp->remote);
if(tcp->error)
tcp->error(tcp);
return;
}
const char *end = ptr;
while(*end && (end - begin) < sizeof(tcp->remote))
{
++end;
}
addr_str = llarp::string_view(begin, ptr - begin);
++ptr;
port_str = llarp::string_view(ptr, end - ptr);
// actually parse address
llarp::Addr addr(addr_str, port_str);
llarp::ev_io *conn = loop->tcp_connect(tcp, addr);
if(conn && loop->add_ev(conn, true))
{
llarp::LogDebug("async connect to ", addr);
return;
}
llarp::LogError("async connect failed");
if(tcp->error)
tcp->error(tcp);
}
bool
llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp,
const struct sockaddr *bindaddr)
@ -151,7 +192,6 @@ llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp,
llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr);
if(impl)
{
tcp->impl = impl;
return loop->add_ev(impl);
}
return false;
@ -193,12 +233,12 @@ namespace llarp
{
if(_shouldClose)
{
if(tcp && tcp->closed)
tcp->closed(tcp);
if(tcp.closed)
tcp.closed(&tcp);
return false;
}
else if(tcp->tick)
tcp->tick(tcp);
else if(tcp.tick)
tcp.tick(&tcp);
return true;
}

@ -301,6 +301,12 @@ namespace llarp
{
}
virtual void
error()
{
llarp::LogError(strerror(errno));
}
virtual int
read(void* buf, size_t sz) = 0;
@ -406,18 +412,71 @@ namespace llarp
// on sockets
struct tcp_conn : public ev_io
{
bool _shouldClose = false;
llarp_tcp_conn* tcp;
tcp_conn(int fd, llarp_tcp_conn* conn)
: ev_io(fd, new LosslessWriteQueue_t{}), tcp(conn)
sockaddr_storage _addr;
bool _shouldClose = false;
bool _calledConnected = false;
llarp_tcp_conn tcp;
// null if inbound otherwise outbound
llarp_tcp_connecter* _conn;
/// inbound
tcp_conn(llarp_ev_loop* loop, int fd)
: ev_io(fd, new LosslessWriteQueue_t{}), _conn(nullptr)
{
tcp.impl = this;
tcp.loop = loop;
tcp.closed = nullptr;
tcp.user = nullptr;
tcp.read = nullptr;
tcp.tick = nullptr;
}
/// outbound
tcp_conn(llarp_ev_loop* loop, int fd, const sockaddr* addr,
llarp_tcp_connecter* conn)
: ev_io(fd, new LosslessWriteQueue_t{}), _conn(conn)
{
socklen_t slen = sizeof(sockaddr_in);
if(addr->sa_family == AF_INET6)
slen = sizeof(sockaddr_in6);
else if(addr->sa_family == AF_UNIX)
slen = sizeof(sockaddr_un);
memcpy(&_addr, addr, slen);
tcp.impl = this;
tcp.loop = loop;
tcp.closed = nullptr;
tcp.user = nullptr;
tcp.read = nullptr;
tcp.tick = nullptr;
}
virtual ~tcp_conn()
{
delete tcp;
}
/// start connecting
void
connect();
/// calls connected hooks
void
connected()
{
// we are connected yeh boi
if(_conn)
{
if(_conn->connected && !_calledConnected)
_conn->connected(_conn, &tcp);
}
_calledConnected = true;
}
void
flush_write();
void
error();
virtual ssize_t
do_write(void* buf, size_t sz);
@ -426,7 +485,6 @@ namespace llarp
bool
tick();
};
struct tcp_serv : public ev_io
@ -436,6 +494,7 @@ namespace llarp
tcp_serv(llarp_ev_loop* l, int fd, llarp_tcp_acceptor* t)
: ev_io(fd), loop(l), tcp(t)
{
tcp->impl = this;
}
bool
@ -458,7 +517,8 @@ namespace llarp
struct llarp_ev_loop
{
byte_t readbuf[EV_READ_BUF_SZ] = {0};
llarp_time_t _now = 0;
llarp_time_t _now = 0;
virtual bool
init() = 0;
virtual int
@ -488,6 +548,9 @@ struct llarp_ev_loop
virtual llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0;
virtual llarp::ev_io*
tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) = 0;
/// register event listener
virtual bool
add_ev(llarp::ev_io* ev, bool write = false) = 0;

@ -28,8 +28,8 @@ namespace llarp
if(amount > 0)
{
if(tcp->read)
tcp->read(tcp, buf, amount);
if(tcp.read)
tcp.read(&tcp, buf, amount);
}
else
{
@ -40,6 +40,13 @@ namespace llarp
return 0;
}
void
tcp_conn::flush_write()
{
connected();
ev_io::flush_write();
}
ssize_t
tcp_conn::do_write(void* buf, size_t sz)
{
@ -51,6 +58,46 @@ namespace llarp
return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe
}
void
tcp_conn::connect()
{
socklen_t slen = sizeof(sockaddr_in);
if(_addr.ss_family == AF_UNIX)
slen = sizeof(sockaddr_un);
else if(_addr.ss_family == AF_INET6)
slen = sizeof(sockaddr_in6);
int result = ::connect(fd, (const sockaddr*)&_addr, slen);
if(result == 0)
{
llarp::LogDebug("connected immedidately");
connected();
}
else if(errno == EINPROGRESS)
{
// in progress
llarp::LogDebug("connect in progress");
errno = 0;
return;
}
else if(_conn->error)
{
// wtf?
llarp::LogError("error connecting ", strerror(errno));
_conn->error(_conn);
}
}
void
tcp_conn::error()
{
if(_conn)
{
llarp::LogError("tcp_conn error: ", strerror(errno));
if(_conn->error)
_conn->error(_conn);
}
}
int
tcp_serv::read(void*, size_t)
{
@ -60,24 +107,16 @@ namespace llarp
llarp::LogError("failed to accept on ", fd, ":", strerror(errno));
return -1;
}
llarp_tcp_conn* conn = new llarp_tcp_conn;
// zero out callbacks
conn->tick = nullptr;
conn->closed = nullptr;
conn->read = nullptr;
// build handler
llarp::tcp_conn* connimpl = new tcp_conn(new_fd, conn);
conn->impl = connimpl;
conn->loop = loop;
llarp::tcp_conn* connimpl = new tcp_conn(loop, new_fd);
if(loop->add_ev(connimpl, true))
{
// call callback
if(tcp->accepted)
tcp->accepted(tcp, conn);
tcp->accepted(tcp, &connimpl->tcp);
return 0;
}
// cleanup error
delete conn;
delete connimpl;
return -1;
}
@ -233,6 +272,30 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
}
llarp::ev_io*
tcp_connect(struct llarp_tcp_connecter* tcp, const sockaddr* remoteaddr)
{
// create socket
int fd = ::socket(remoteaddr->sa_family, SOCK_STREAM, 0);
if(fd == -1)
return nullptr;
// set non blocking
int flags = fcntl(fd, F_GETFL, 0);
if(flags == -1)
{
::close(fd);
return nullptr;
}
if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
{
::close(fd);
return nullptr;
}
llarp::tcp_conn* conn = new llarp::tcp_conn(this, fd, remoteaddr, tcp);
conn->connect();
return conn;
}
llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* bindaddr)
{
@ -258,9 +321,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
::close(fd);
return nullptr;
}
llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp);
tcp->impl = serv;
return serv;
return new llarp::tcp_serv(this, fd, tcp);
}
virtual bool
@ -306,13 +367,20 @@ struct llarp_epoll_loop : public llarp_ev_loop
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(ev)
{
if(events[idx].events & EPOLLIN)
if(events[idx].events & EPOLLERR)
{
ev->read(readbuf, sizeof(readbuf));
ev->error();
}
if(events[idx].events & EPOLLOUT)
else
{
ev->flush_write();
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
}
}
++idx;
@ -339,13 +407,20 @@ struct llarp_epoll_loop : public llarp_ev_loop
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(ev)
{
if(events[idx].events & EPOLLIN)
if(events[idx].events & EPOLLERR)
{
ev->read(readbuf, sizeof(readbuf));
ev->error();
}
if(events[idx].events & EPOLLOUT)
else
{
ev->flush_write();
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
}
}
++idx;
@ -437,7 +512,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
epoll_event ev;
ev.data.ptr = e;
ev.events = EPOLLIN;
ev.events = EPOLLIN | EPOLLERR;
if(write)
ev.events |= EPOLLOUT;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, e->fd, &ev) == -1)

@ -218,7 +218,7 @@ namespace llarp
const size_t offset = 0;
#endif
ssize_t ret = tuntap_read(tunif, buf, sz);
if(ret > 4 && t->recvpkt)
if(ret > offset && t->recvpkt)
t->recvpkt(t, ((byte_t*)buf) + offset, ret - offset);
return ret;
}

@ -9,11 +9,101 @@ namespace llarp
namespace rpc
{
#ifdef USE_ABYSS
struct Handler : public ::abyss::http::IRPCHandler
struct CallerHandler : public ::abyss::http::IRPCClientHandler
{
CallerHandler(::abyss::http::ConnImpl* impl)
: ::abyss::http::IRPCClientHandler(impl)
{
}
~CallerHandler()
{
}
void
PopulateReqHeaders(abyss::http::Headers_t& hdr)
{
}
};
struct VerifyRouterHandler : public CallerHandler
{
llarp::PubKey pk;
std::function< void(llarp::PubKey, bool) > handler;
~VerifyRouterHandler()
{
}
VerifyRouterHandler(::abyss::http::ConnImpl* impl, const llarp::PubKey& k,
std::function< void(llarp::PubKey, bool) > h)
: CallerHandler(impl), pk(k), handler(h)
{
}
bool
HandleResponse(const ::abyss::http::RPC_Response& response)
{
handler(pk, true);
return true;
}
void
HandleError()
{
llarp::LogInfo("failed to verify router ", pk);
handler(pk, false);
}
};
struct CallerImpl : public ::abyss::http::JSONRPC
{
llarp_router* router;
CallerImpl(llarp_router* r) : ::abyss::http::JSONRPC(), router(r)
{
}
void
Tick()
{
Flush();
}
bool
Start(const std::string& remote)
{
return RunAsync(router->netloop, remote);
}
abyss::http::IRPCClientHandler*
NewConn(PubKey k, std::function< void(llarp::PubKey, bool) > handler,
abyss::http::ConnImpl* impl)
{
return new VerifyRouterHandler(impl, k, handler);
}
void
AsyncVerifyRouter(llarp::PubKey pk,
std::function< void(llarp::PubKey, bool) > handler)
{
abyss::json::Value params;
params.SetObject();
QueueRPC("get_service_node", std::move(params),
std::bind(&CallerImpl::NewConn, this, pk, handler,
std::placeholders::_1));
}
~CallerImpl()
{
}
};
struct Handler : public ::abyss::httpd::IRPCHandler
{
llarp_router* router;
Handler(::abyss::http::ConnImpl* conn, llarp_router* r)
: ::abyss::http::IRPCHandler(conn), router(r)
Handler(::abyss::httpd::ConnImpl* conn, llarp_router* r)
: ::abyss::httpd::IRPCHandler(conn), router(r)
{
}
@ -59,15 +149,15 @@ namespace llarp
}
};
struct ReqHandlerImpl : public ::abyss::http::BaseReqHandler
struct ReqHandlerImpl : public ::abyss::httpd::BaseReqHandler
{
ReqHandlerImpl(llarp_router* r, llarp_time_t reqtimeout)
: ::abyss::http::BaseReqHandler(reqtimeout), router(r)
: ::abyss::httpd::BaseReqHandler(reqtimeout), router(r)
{
}
llarp_router* router;
::abyss::http::IRPCHandler*
CreateHandler(::abyss::http::ConnImpl* conn) const
::abyss::httpd::IRPCHandler*
CreateHandler(::abyss::httpd::ConnImpl* conn) const
{
return new Handler(conn, router);
}
@ -115,8 +205,61 @@ namespace llarp
return true;
}
};
struct CallerImpl
{
CallerImpl(llarp_router* r)
{
}
~CallerImpl()
{
}
bool
Start(const std::string&)
{
return true;
}
void
Tick()
{
}
void
AsyncVerifyRouter(llarp::PubKey pk,
std::function< void(llarp::PubKey, bool) > result)
{
// always allow routers when not using libabyss
result(pk, true);
}
};
#endif
Caller::Caller(llarp_router* r) : m_Impl(new CallerImpl(r))
{
}
Caller::~Caller()
{
delete m_Impl;
}
bool
Caller::Start(const std::string& addr)
{
return m_Impl->Start(addr);
}
void
Caller::AsyncVerifyRouter(
llarp::PubKey pk, std::function< void(llarp::PubKey, bool) > handler)
{
m_Impl->AsyncVerifyRouter(pk, handler);
}
Server::Server(llarp_router* r) : m_Impl(new ServerImpl(r))
{
}

@ -54,6 +54,29 @@ TEST_F(KademliaDHTTest, TestBucketFindClosest)
ASSERT_TRUE(oldResult == result);
};
TEST_F(KademliaDHTTest, TestBucketOperators)
{
llarp::dht::Key_t zero;
llarp::dht::Key_t one;
llarp::dht::Key_t three;
zero.Zero();
one.Fill(1);
three.Fill(3);
ASSERT_TRUE(zero < one);
ASSERT_TRUE(zero < three);
ASSERT_FALSE(zero > one);
ASSERT_FALSE(zero > three);
ASSERT_TRUE(zero != three);
ASSERT_FALSE(zero == three);
ASSERT_TRUE((zero ^ one) == one);
ASSERT_TRUE(one < three);
ASSERT_TRUE(three > one);
ASSERT_TRUE(one != three);
ASSERT_FALSE(one == three);
ASSERT_TRUE((one ^ three) == (three ^ one));
};
TEST_F(KademliaDHTTest, TestBucketRandomzied)
{
size_t moreNodes = 100;
@ -67,5 +90,9 @@ TEST_F(KademliaDHTTest, TestBucketRandomzied)
llarp::dht::Key_t target;
llarp::dht::Key_t oldResult;
target.Randomize();
oldResult = result;
ASSERT_TRUE(nodes->FindClosest(target, result));
ASSERT_TRUE((result ^ target) < (oldResult ^ target));
ASSERT_TRUE((result ^ target) != (oldResult ^ target));
ASSERT_FALSE((result ^ target) == (oldResult ^ target));
};

Loading…
Cancel
Save