Remove libabyss and rpc::Caller/rpc::Server

pull/1306/head
Stephen Shelton 4 years ago committed by Jeff Becker
parent c67db46b12
commit 11951510bf
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -317,7 +317,6 @@ endif()
add_subdirectory(crypto)
add_subdirectory(llarp)
add_subdirectory(libabyss)
add_subdirectory(daemon)
if(WITH_HIVE)
add_subdirectory(pybind)

@ -146,7 +146,6 @@ TARGETS = $(REPO)/lokinet
SIGS = $(TARGETS:=.sig)
EXE = $(BUILD_ROOT)/daemon/lokinet
TEST_EXE = $(BUILD_ROOT)/test/testAll
ABYSS_EXE = $(BUILD_ROOT)/abyss-main
LINT_FILES = $(wildcard llarp/*.cpp)
@ -311,7 +310,7 @@ mac: mac-release
$(MAKE) -C '$(BUILD_ROOT)' package
format:
$(FORMAT) -i $$(find jni daemon llarp include libabyss pybind | grep -E '\.[hc](pp)?$$')
$(FORMAT) -i $$(find jni daemon llarp include pybind | grep -E '\.[h,c](pp)?$$')
format-verify: format
(type $(FORMAT))

@ -1,20 +0,0 @@
add_library(abyss
src/md5.cpp
src/http.cpp
src/client.cpp
src/server.cpp)
target_include_directories(abyss PUBLIC include)
target_link_libraries(abyss PUBLIC lokinet-platform)
enable_lto(abyss)
# for freebsd
if(CMAKE_SYSTEM_NAME MATCHES "FreeBSD")
target_include_directories(abyss SYSTEM PUBLIC /usr/local/include)
endif()
if(BUILD_SHARED_LIBS)
install(TARGETS abyss LIBRARY DESTINATION lib)
endif()
add_log_tag(abyss)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

@ -1,127 +0,0 @@
#ifndef ABYSS_CLIENT_HPP
#define ABYSS_CLIENT_HPP
#include <ev/ev.h>
#include <net/ip_address.hpp>
#include <util/json.hpp>
#include <abyss/http.hpp>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <atomic>
namespace abyss
{
namespace http
{
using RPC_Method_t = std::string;
using RPC_Params = nlohmann::json;
using RPC_Response = nlohmann::json;
using Headers_t = std::unordered_multimap<std::string, std::string>;
using Response = RequestHeader;
struct ConnImpl;
/// jsonrpc response handler for client
struct IRPCClientHandler
{
IRPCClientHandler(ConnImpl* impl);
virtual ~IRPCClientHandler();
/// handle response from rpc server
/// return true on successful handling
/// return false on errors while handling
virtual bool
HandleResponse(RPC_Response response) = 0;
/// populate http request headers
virtual void
PopulateReqHeaders(Headers_t& hdr) = 0;
/// handle fatal internal error while doing request
virtual void
HandleError() = 0;
/// return true if we should close
bool
ShouldClose() const;
/// close underlying connection
void
Close() const;
private:
ConnImpl* m_Impl;
};
/// jsonrpc client
struct JSONRPC
{
using HandlerFactory = std::function<IRPCClientHandler*(ConnImpl*)>;
JSONRPC();
~JSONRPC();
/// start runing on event loop async
/// return true on success otherwise return false
bool
RunAsync(llarp_ev_loop_ptr loop, const llarp::IpAddress& endpoint);
/// must be called after RunAsync returns true
/// queue a call for rpc
void
QueueRPC(RPC_Method_t method, RPC_Params params, HandlerFactory createHandler);
/// drop all pending calls on the floor
void
DropAllCalls();
/// close all connections and stop operation
void
Stop();
/// handle new outbound connection
void
Connected(llarp_tcp_conn* conn);
/// flush queued rpc calls
void
Flush();
std::string username;
std::string password;
private:
struct Call
{
Call(RPC_Method_t&& m, RPC_Params&& p, HandlerFactory&& f)
: method(std::move(m)), params(std::move(p)), createHandler(std::move(f))
{
}
RPC_Method_t method;
RPC_Params params;
HandlerFactory createHandler;
};
static void
OnConnected(llarp_tcp_connecter* connect, llarp_tcp_conn* conn);
static void
OnConnectFail(llarp_tcp_connecter* connect);
static void
OnTick(llarp_tcp_connecter* connect);
std::atomic<bool> m_Run;
llarp_tcp_connecter m_connect;
llarp_ev_loop_ptr m_Loop;
std::deque<Call> m_PendingCalls;
std::list<std::unique_ptr<IRPCClientHandler>> m_Conns;
};
} // namespace http
} // namespace abyss
#endif

@ -1,37 +0,0 @@
#ifndef ABYSS_HTTP_HPP
#define ABYSS_HTTP_HPP
#include <util/json.hpp>
#include <string>
#include <string_view>
#include <unordered_map>
namespace abyss
{
namespace http
{
struct RequestHeader
{
using Headers_t = std::unordered_multimap<std::string, std::string>;
Headers_t Headers;
std::string Method;
std::string Path;
};
struct HeaderReader
{
RequestHeader Header;
virtual ~HeaderReader()
{
}
bool
ProcessHeaderLine(std::string_view line, bool& done);
virtual bool
ShouldProcessHeader(std::string_view line) const = 0;
};
} // namespace http
} // namespace abyss
#endif

@ -1,39 +0,0 @@
#ifndef MD5_HPP
#define MD5_HPP
#include <array>
#include <algorithm>
#include <string>
struct MD5
{
MD5();
void
Update(const unsigned char* ptr, uint32_t len);
void
Final(uint8_t* digest);
uint32_t i[2]; /* number of _bits_ handled mod 2^64 */
uint32_t buf[4]; /* scratch buffer */
unsigned char in[64]; /* input buffer */
/// do md5(str) and return hex encoded digest
static std::string
SumHex(const std::string& str)
{
std::array<uint8_t, 16> digest;
auto dist = str.size();
MD5 m;
m.Update((const unsigned char*)str.c_str(), dist);
m.Final(digest.data());
std::string hex;
std::for_each(digest.begin(), digest.end(), [&hex](const unsigned char& ch) {
char tmpbuf[4] = {0};
std::snprintf(tmpbuf, sizeof(tmpbuf), "%.2x", ch);
hex += std::string(tmpbuf);
});
return hex;
}
};
#endif

@ -1,94 +0,0 @@
#ifndef ABYSS_SERVER_HPP
#define ABYSS_SERVER_HPP
#include <ev/ev.h>
#include <net/sock_addr.hpp>
#include <util/json.hpp>
#include <util/thread/logic.hpp>
#include <util/time.hpp>
#include <nlohmann/json.hpp>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
namespace abyss
{
namespace httpd
{
struct ConnImpl;
struct IRPCHandler
{
using Method_t = std::string;
using Params = nlohmann::json;
using Response = nlohmann::json;
IRPCHandler(ConnImpl* impl);
virtual Response
HandleJSONRPC(Method_t method, const Params& params) = 0;
virtual ~IRPCHandler();
bool
ShouldClose(llarp_time_t now) const;
/// return true if the host header is correct
virtual bool
ValidateHost(const std::string& host) const = 0;
private:
ConnImpl* m_Impl;
};
struct BaseReqHandler
{
BaseReqHandler(llarp_time_t req_timeout);
virtual ~BaseReqHandler();
bool
ServeAsync(
llarp_ev_loop_ptr loop,
std::shared_ptr<llarp::Logic> logic,
const llarp::SockAddr& bindaddr);
void
RemoveConn(IRPCHandler* handler);
/// close the handler and acceptor
void
Close();
llarp_time_t
now() const
{
return llarp_ev_loop_time_now_ms(m_loop);
}
protected:
virtual IRPCHandler*
CreateHandler(ConnImpl* connimpl) = 0;
private:
static void
OnTick(llarp_tcp_acceptor*);
void
Tick();
static void
OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*);
llarp_ev_loop_ptr m_loop;
std::shared_ptr<llarp::Logic> m_Logic;
llarp_tcp_acceptor m_acceptor;
std::list<std::unique_ptr<IRPCHandler>> m_Conns;
llarp_time_t m_ReqTimeout;
};
} // namespace httpd
} // namespace abyss
#endif

@ -1,19 +0,0 @@
#ifndef __LIB_ABYSS_HPP__
#define __LIB_ABYSS_HPP__
#include <abyss/server.hpp>
#include <abyss/client.hpp>
namespace abyss
{
struct Globals
{
Globals()
{
}
~Globals()
{
}
};
} // namespace abyss
#endif

@ -1,148 +0,0 @@
#include <libabyss.hpp>
#include <net/net.hpp>
#include <absl/synchronization/mutex.h>
#ifndef _WIN32
#include <signal.h>
#endif
struct DemoHandler : public abyss::httpd::IRPCHandler
{
DemoHandler(abyss::httpd::ConnImpl* impl) : abyss::httpd::IRPCHandler(impl)
{
}
nonstd::optional<Response>
HandleJSONRPC(Method_t method, const Params& /*params*/) override
{
llarp::LogInfo("method: ", method);
return nonstd::make_optional(Response::object());
}
};
struct DemoCall : public abyss::http::IRPCClientHandler
{
std::function<void(void)> m_Callback;
std::shared_ptr<llarp::Logic> m_Logic;
DemoCall(
abyss::http::ConnImpl* impl,
std::shared_ptr<llarp::Logic> logic,
std::function<void(void)> callback)
: abyss::http::IRPCClientHandler(impl), m_Callback(callback), m_Logic(logic)
{
llarp::LogInfo("new call");
}
bool HandleResponse(abyss::http::RPC_Response) override
{
llarp::LogInfo("response get");
LogicCall(m_Logic, m_Callback);
return true;
}
void
PopulateReqHeaders(ABSL_ATTRIBUTE_UNUSED abyss::http::Headers_t& hdr) override
{
}
void
HandleError() override
{
llarp::LogError("error while handling call: ", strerror(errno));
}
};
struct DemoClient : public abyss::http::JSONRPC
{
llarp_ev_loop_ptr m_Loop;
std::shared_ptr<llarp::Logic> m_Logic;
DemoClient(llarp_ev_loop_ptr l, std::shared_ptr<llarp::Logic> logic)
: abyss::http::JSONRPC(), m_Loop(std::move(l)), m_Logic(logic)
{
}
abyss::http::IRPCClientHandler*
NewConn(abyss::http::ConnImpl* impl)
{
return new DemoCall(impl, m_Logic, std::bind(&llarp_ev_loop_stop, m_Loop));
}
void
DoDemoRequest()
{
QueueRPC(
"test",
nlohmann::json::object(),
std::bind(&DemoClient::NewConn, this, std::placeholders::_1));
Flush();
}
};
struct DemoServer : public abyss::httpd::BaseReqHandler
{
DemoServer() : abyss::httpd::BaseReqHandler(1000)
{
}
abyss::httpd::IRPCHandler*
CreateHandler(abyss::httpd::ConnImpl* impl)
{
return new DemoHandler(impl);
}
};
int
main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[])
{
// Ignore on Windows, we don't even get SIGPIPE (even though native *and*
// emulated UNIX pipes exist - CreatePipe(2), pipe(3))
// Microsoft libc only covers six signals
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#else
WSADATA wsockd;
int err;
err = ::WSAStartup(MAKEWORD(2, 2), &wsockd);
if (err)
{
perror("Failed to start Windows Sockets");
return err;
}
#endif
#ifdef LOKINET_DEBUG
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
llarp::SetLogLevel(llarp::eLogDebug);
// Now that libuv is the single non-Windows event loop impl, we can
// go back to using the normal function
llarp_ev_loop_ptr loop = llarp_make_ev_loop();
auto logic = std::make_shared<llarp::Logic>();
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(1222);
addr.sin_family = AF_INET;
DemoServer serv;
DemoClient client(loop, logic);
llarp::Addr a(addr);
while (true)
{
llarp::LogInfo("bind to ", a);
if (serv.ServeAsync(loop, logic, a))
{
client.RunAsync(loop, a.ToString());
client.DoDemoRequest();
llarp_ev_loop_run_single_process(loop, logic);
return 0;
}
else
{
llarp::LogError("Failed to serve: ", strerror(errno));
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
return 0;
}

@ -1,8 +0,0 @@
# libabyss
![abyss.jpeg](abyss.jpeg)
http client/server with multiple layers of the abysmal hellscape of the bottomless pit of standards by the w3c encapsulated in a neat standalone little library.
Try not to die.

@ -1,472 +0,0 @@
#include <abyss/client.hpp>
#include <abyss/http.hpp>
#include <abyss/md5.hpp>
#include <crypto/crypto.hpp>
#include <util/buffer.hpp>
#include <util/logging/logger.hpp>
namespace abyss
{
namespace http
{
using namespace std::literals;
namespace json = llarp::json;
struct ConnImpl : HeaderReader
{
llarp_tcp_conn* m_Conn;
JSONRPC* m_Parent;
nlohmann::json m_RequestBody;
Headers_t m_SendHeaders;
IRPCClientHandler* handler;
std::unique_ptr<json::IParser> m_BodyParser;
nlohmann::json m_Response;
uint16_t m_AuthTries;
bool m_ShouldAuth;
enum State
{
eInitial,
eReadStatusLine,
eReadResponseHeader,
eReadResponseBody,
eCloseMe
};
State state;
ConnImpl(
llarp_tcp_conn* conn,
JSONRPC* parent,
const RPC_Method_t& method,
const RPC_Params& params,
JSONRPC::HandlerFactory factory)
: m_Conn(conn)
, m_Parent(parent)
, m_RequestBody(nlohmann::json::object())
, m_Response(nlohmann::json::object())
, m_AuthTries(0)
, m_ShouldAuth(false)
, state(eInitial)
{
conn->user = this;
conn->closed = &ConnImpl::OnClosed;
conn->read = &ConnImpl::OnRead;
conn->tick = &ConnImpl::OnTick;
handler = factory(this);
m_RequestBody["jsonrpc"] = "2.0";
llarp::AlignedBuffer<8> p;
p.Randomize();
m_RequestBody["id"] = p.ToHex();
m_RequestBody["method"] = method;
m_RequestBody["params"] = params;
}
static void
OnClosed(llarp_tcp_conn* conn)
{
llarp::LogDebug("connection closed");
ConnImpl* self = static_cast<ConnImpl*>(conn->user);
self->state = eCloseMe;
}
static void
OnRead(llarp_tcp_conn* conn, const llarp_buffer_t& buf)
{
ConnImpl* self = static_cast<ConnImpl*>(conn->user);
if (!self->ProcessRead((const char*)buf.base, buf.sz))
{
self->CloseError("on read failed");
}
}
static void
OnTick(llarp_tcp_conn* /*conn*/)
{
}
bool
ProcessStatusLine(std::string_view line)
{
auto idx = line.find_first_of(' ');
if (idx == std::string_view::npos)
return false;
std::string_view codePart = line.substr(1 + idx);
idx = codePart.find_first_of(' ');
if (idx == std::string_view::npos)
return false;
return HandleStatusCode(codePart.substr(0, idx));
}
bool
ShouldProcessHeader(std::string_view name) const
{
return name == "content-length"sv || name == "content-type"sv
|| name == "www-authenticate"sv;
}
/// return true if we get a 200 status code
bool
HandleStatusCode(std::string_view code)
{
if (code == "200"sv)
return true;
if (code == "401"sv)
{
m_ShouldAuth = true;
return true;
}
return false;
}
bool
RetryWithAuth(const std::string& auth)
{
m_ShouldAuth = false;
auto idx = auth.find_first_of(' ');
if (idx == std::string::npos)
return false;
std::istringstream info(auth.substr(1 + idx));
std::unordered_map<std::string, std::string> opts;
std::string part;
while (std::getline(info, part, ','))
{
idx = part.find_first_of('=');
if (idx == std::string::npos)
return false;
std::string k = part.substr(0, idx);
std::string val;
++idx;
while (idx < part.size())
{
const char ch = part.at(idx);
val += ch;
++idx;
}
opts[k] = val;
}
auto itr = opts.find("algorithm");
if (itr != opts.end() && itr->second == "MD5-sess")
return false;
std::stringstream authgen;
auto strip = [&opts](const std::string& name) -> std::string {
std::string val;
std::for_each(opts[name].begin(), opts[name].end(), [&val](const char& ch) {
if (ch != '"')
val += ch;
});
return val;
};
const auto realm = strip("realm");
const auto nonce = strip("nonce");
const auto qop = strip("qop");
std::string nonceCount = "0000000" + std::to_string(m_AuthTries);
std::string str = m_Parent->username + ":" + realm + ":" + m_Parent->password;
std::string h1 = MD5::SumHex(str);
str = "POST:/json_rpc";
std::string h2 = MD5::SumHex(str);
llarp::AlignedBuffer<8> n;
n.Randomize();
std::string cnonce = n.ToHex();
str = h1 + ":" + nonce + ":" + nonceCount + ":" + cnonce + ":" + qop + ":" + h2;
auto responseH = MD5::SumHex(str);
authgen << "Digest username=\"" << m_Parent->username + "\", realm=\"" << realm
<< "\", uri=\"/json_rpc\", algorithm=MD5, qop=auth, nonce=\"" << nonce
<< "\", response=\"" << responseH << "\", nc=" << nonceCount << ", cnonce=\""
<< cnonce << "\"";
for (const auto& opt : opts)
{
if (opt.first == "algorithm" || opt.first == "realm" || opt.first == "qop"
|| opt.first == "nonce" || opt.first == "stale")
continue;
authgen << ", " << opt.first << "=" << opt.second;
}
m_SendHeaders.clear();
m_SendHeaders.emplace("Host", "localhost");
m_SendHeaders.emplace("Authorization", authgen.str());
SendRequest();
return true;
}
bool
ProcessBody(const char* buf, size_t sz)
{
// we got 401 ?
if (m_ShouldAuth && m_AuthTries < 9)
{
m_AuthTries++;
auto range = Header.Headers.equal_range("www-authenticate");
auto itr = range.first;
while (itr != range.second)
{
if (RetryWithAuth(itr->second))
return true;
else
++itr;
}
return false;
}
// init parser
if (m_BodyParser == nullptr)
{
size_t contentSize = 0;
auto itr = Header.Headers.find("content-length");
// no content-length header
if (itr == Header.Headers.end())
return false;
contentSize = std::stoul(itr->second);
m_BodyParser.reset(json::MakeParser(contentSize));
}
if (m_BodyParser && m_BodyParser->FeedData(buf, sz))
{
switch (m_BodyParser->Parse(m_Response))
{
case json::IParser::eNeedData:
return true;
case json::IParser::eDone:
handler->HandleResponse(std::move(m_Response));
Close();
return true;
case json::IParser::eParseError:
CloseError("json parse error");
return true;
default:
return false;
}
}
else
return false;
}
bool
ProcessRead(const char* buf, size_t sz)
{
if (state == eInitial)
return true;
if (!sz)
return true;
bool done = false;
while (state < eReadResponseBody)
{
const char* end = strstr(buf, "\r\n");
if (!end)
return false;
std::string_view line(buf, end - buf);
switch (state)
{
case eReadStatusLine:
if (!ProcessStatusLine(line))
return false;
sz -= line.size() + (2 * sizeof(char));
state = eReadResponseHeader;
break;
case eReadResponseHeader:
if (!ProcessHeaderLine(line, done))
return false;
sz -= line.size() + (2 * sizeof(char));
if (done)
state = eReadResponseBody;
break;
default:
break;
}
buf = end + (2 * sizeof(char));
end = strstr(buf, "\r\n");
}
if (state == eReadResponseBody)
return ProcessBody(buf, sz);
return state == eCloseMe;
}
bool
ShouldClose() const
{
return state == eCloseMe;
}
void
CloseError(const char* msg)
{
LogError("CloseError: ", msg);
if (handler)
handler->HandleError();
handler = nullptr;
Close();
}
void
Close()
{
if (m_Conn)
llarp_tcp_conn_close(m_Conn);
m_Conn = nullptr;
}
void
SendRequest()
{
// populate request headers
handler->PopulateReqHeaders(m_SendHeaders);
// create request body
std::string body;
std::stringstream ss;
body = m_RequestBody.dump();
m_SendHeaders.emplace("Host", "localhost");
m_SendHeaders.emplace("Content-Type", "application/json");
m_SendHeaders.emplace("Content-Length", std::to_string(body.size()));
m_SendHeaders.emplace("Accept", "application/json");
std::stringstream request;
request << "POST /json_rpc HTTP/1.1\r\n";
for (const auto& item : m_SendHeaders)
request << item.first << ": " << item.second << "\r\n";
request << "\r\n" << body;
std::string buf = request.str();
if (!llarp_tcp_conn_async_write(m_Conn, llarp_buffer_t(buf.c_str(), buf.size())))
{
CloseError("failed to write request");
return;
}
llarp::LogDebug("request sent");
state = eReadStatusLine;
}
};
void
JSONRPC::Flush()
{
/// close idle connections
auto itr = m_Conns.begin();
while (itr != m_Conns.end())
{
if ((*itr)->ShouldClose())
{
(*itr)->Close();
itr = m_Conns.erase(itr);
}
else
++itr;
}
// open at most 10 connections
size_t numCalls = std::min(m_PendingCalls.size(), (size_t)10UL);
llarp::LogDebug("tick connect to rpc ", numCalls, " times");
while (numCalls--)
{
llarp_tcp_async_try_connect(m_Loop.get(), &m_connect);
}
}
IRPCClientHandler::IRPCClientHandler(ConnImpl* impl) : m_Impl(impl)
{
}
bool
IRPCClientHandler::ShouldClose() const
{
return m_Impl && m_Impl->ShouldClose();
}
void
IRPCClientHandler::Close() const
{
if (m_Impl)
m_Impl->Close();
}
IRPCClientHandler::~IRPCClientHandler()
{
if (m_Impl)
delete m_Impl;
}
JSONRPC::JSONRPC()
{
m_Run.store(true);
}
JSONRPC::~JSONRPC()
{
}
void
JSONRPC::QueueRPC(RPC_Method_t method, RPC_Params params, HandlerFactory createHandler)
{
if (m_Run)
m_PendingCalls.emplace_back(std::move(method), std::move(params), std::move(createHandler));
}
bool
JSONRPC::RunAsync(llarp_ev_loop_ptr loop, const llarp::IpAddress& remote)
{
m_connect.remote = remote;
// TODO: ipv6
m_connect.connected = &JSONRPC::OnConnected;
m_connect.error = &JSONRPC::OnConnectFail;
m_connect.user = this;
m_connect.af = AF_INET;
m_Loop = std::move(loop);
return true;
}
void
JSONRPC::OnConnectFail(llarp_tcp_connecter* tcp)
{
JSONRPC* self = static_cast<JSONRPC*>(tcp->user);
llarp::LogError("failed to connect to RPC, dropped all pending calls");
self->DropAllCalls();
}
void
JSONRPC::OnConnected(llarp_tcp_connecter* tcp, llarp_tcp_conn* conn)
{
JSONRPC* self = static_cast<JSONRPC*>(tcp->user);
llarp::LogDebug("connected to RPC");
self->Connected(conn);
}
void
JSONRPC::Connected(llarp_tcp_conn* conn)
{
if (!m_Run)
{
llarp_tcp_conn_close(conn);
return;
}
auto& front = m_PendingCalls.front();
ConnImpl* connimpl =
new ConnImpl(conn, this, front.method, front.params, front.createHandler);
m_PendingCalls.pop_front();
m_Conns.emplace_back(connimpl->handler);
connimpl->SendRequest();
}
void
JSONRPC::Stop()
{
m_Run.store(false);
DropAllCalls();
}
void
JSONRPC::DropAllCalls()
{
while (m_PendingCalls.size())
{
auto& front = m_PendingCalls.front();
IRPCClientHandler* h = front.createHandler(nullptr);
h->HandleError();
delete h;
m_PendingCalls.pop_front();
}
}
} // namespace http
} // namespace abyss

@ -1,40 +0,0 @@
#include <abyss/http.hpp>
#include <util/str.hpp>
#include <algorithm>
namespace abyss
{
namespace http
{
bool
HeaderReader::ProcessHeaderLine(std::string_view line, bool& done)
{
if (line.size() == 0)
{
done = true;
return true;
}
auto idx = line.find_first_of(':');
if (idx == std::string_view::npos)
return false;
std::string_view header = line.substr(0, idx);
std::string_view val = line.substr(1 + idx);
// to lowercase
std::string lowerHeader;
lowerHeader.reserve(header.size());
auto itr = header.begin();
while (itr != header.end())
{
lowerHeader += std::tolower(*itr);
++itr;
}
if (ShouldProcessHeader(lowerHeader))
{
val = val.substr(val.find_first_not_of(' '));
Header.Headers.emplace(std::move(lowerHeader), val);
}
return true;
}
} // namespace http
} // namespace abyss

@ -1,232 +0,0 @@
#include <abyss/md5.hpp>
using UINT4 = uint32_t;
/* forward declaration */
void
Transform(uint32_t* buf, uint32_t* in);
static unsigned char PADDING[64] = {
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
/* F, G and H are basic MD5 functions: selection, majority, parity */
#define F(x, y, z) (((x) & (y)) | ((~x) & (z)))
#define G(x, y, z) (((x) & (z)) | ((y) & (~z)))
#define H(x, y, z) ((x) ^ (y) ^ (z))
#define I(x, y, z) ((y) ^ ((x) | (~z)))
/* ROTATE_LEFT rotates x left n bits */
#define ROTATE_LEFT(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
/* FF, GG, HH, and II transformations for rounds 1, 2, 3, and 4 */
/* Rotation is separate from addition to prevent recomputation */
#define FF(a, b, c, d, x, s, ac) \
{ \
(a) += F((b), (c), (d)) + (x) + (UINT4)(ac); \
(a) = ROTATE_LEFT((a), (s)); \
(a) += (b); \
}
#define GG(a, b, c, d, x, s, ac) \
{ \
(a) += G((b), (c), (d)) + (x) + (UINT4)(ac); \
(a) = ROTATE_LEFT((a), (s)); \
(a) += (b); \
}
#define HH(a, b, c, d, x, s, ac) \
{ \
(a) += H((b), (c), (d)) + (x) + (UINT4)(ac); \
(a) = ROTATE_LEFT((a), (s)); \
(a) += (b); \
}
#define II(a, b, c, d, x, s, ac) \
{ \
(a) += I((b), (c), (d)) + (x) + (UINT4)(ac); \
(a) = ROTATE_LEFT((a), (s)); \
(a) += (b); \
}
MD5::MD5()
{
i[0] = i[1] = (UINT4)0;
/* Load magic initialization constants.
*/
buf[0] = (UINT4)0x67452301;
buf[1] = (UINT4)0xefcdab89;
buf[2] = (UINT4)0x98badcfe;
buf[3] = (UINT4)0x10325476;
}
void
MD5::Update(const unsigned char* inBuf, uint32_t inLen)
{
UINT4 input[16];
int mdi;
/* compute number of bytes mod 64 */
mdi = (int)((this->i[0] >> 3) & 0x3F);
/* update number of bits */
if ((this->i[0] + ((UINT4)inLen << 3)) < this->i[0])
this->i[1]++;
this->i[0] += ((UINT4)inLen << 3);
this->i[1] += ((UINT4)inLen >> 29);
while (inLen--)
{
/* add new character to buffer, increment mdi */
in[mdi++] = *inBuf++;
/* transform if necessary */
if (mdi == 0x40)
{
for (unsigned int j = 0, jj = 0; j < 16; j++, jj += 4)
{
input[j] = (((UINT4)in[jj + 3]) << 24) | (((UINT4)in[jj + 2]) << 16)
| (((UINT4)in[jj + 1]) << 8) | ((UINT4)in[jj]);
}
Transform(this->buf, input);
mdi = 0;
}
}
}
void
MD5::Final(uint8_t* digest)
{
UINT4 input[16];
int mdi;
unsigned int padLen;
/* save number of bits */
input[14] = i[0];
input[15] = i[1];
/* compute number of bytes mod 64 */
mdi = (int)((i[0] >> 3) & 0x3F);
/* pad out to 56 mod 64 */
padLen = (mdi < 56) ? (56 - mdi) : (120 - mdi);
this->Update(PADDING, padLen);
/* append length in bits and transform */
for (unsigned int j = 0, jj = 0; j < 14; j++, jj += 4)
{
input[j] = (((UINT4)in[jj + 3]) << 24) | (((UINT4)in[jj + 2]) << 16)
| (((UINT4)in[jj + 1]) << 8) | ((UINT4)in[jj]);
}
Transform(this->buf, input);
/* store buffer in digest */
for (unsigned int j = 0, jj = 0; j < 4; j++, jj += 4)
{
digest[jj] = (unsigned char)(this->buf[j] & 0xFF);
digest[jj + 1] = (unsigned char)((this->buf[j] >> 8) & 0xFF);
digest[jj + 2] = (unsigned char)((this->buf[j] >> 16) & 0xFF);
digest[jj + 3] = (unsigned char)((this->buf[j] >> 24) & 0xFF);
}
}
/* Basic MD5 step. Transform buf based on in.
*/
void
Transform(UINT4* buf, UINT4* in)
{
UINT4 a = buf[0], b = buf[1], c = buf[2], d = buf[3];
/* Round 1 */
#define S11 7
#define S12 12
#define S13 17
#define S14 22
FF(a, b, c, d, in[0], S11, 3614090360); /* 1 */
FF(d, a, b, c, in[1], S12, 3905402710); /* 2 */
FF(c, d, a, b, in[2], S13, 606105819); /* 3 */
FF(b, c, d, a, in[3], S14, 3250441966); /* 4 */
FF(a, b, c, d, in[4], S11, 4118548399); /* 5 */
FF(d, a, b, c, in[5], S12, 1200080426); /* 6 */
FF(c, d, a, b, in[6], S13, 2821735955); /* 7 */
FF(b, c, d, a, in[7], S14, 4249261313); /* 8 */
FF(a, b, c, d, in[8], S11, 1770035416); /* 9 */
FF(d, a, b, c, in[9], S12, 2336552879); /* 10 */
FF(c, d, a, b, in[10], S13, 4294925233); /* 11 */
FF(b, c, d, a, in[11], S14, 2304563134); /* 12 */
FF(a, b, c, d, in[12], S11, 1804603682); /* 13 */
FF(d, a, b, c, in[13], S12, 4254626195); /* 14 */
FF(c, d, a, b, in[14], S13, 2792965006); /* 15 */
FF(b, c, d, a, in[15], S14, 1236535329); /* 16 */
/* Round 2 */
#define S21 5
#define S22 9
#define S23 14
#define S24 20
GG(a, b, c, d, in[1], S21, 4129170786); /* 17 */
GG(d, a, b, c, in[6], S22, 3225465664); /* 18 */
GG(c, d, a, b, in[11], S23, 643717713); /* 19 */
GG(b, c, d, a, in[0], S24, 3921069994); /* 20 */
GG(a, b, c, d, in[5], S21, 3593408605); /* 21 */
GG(d, a, b, c, in[10], S22, 38016083); /* 22 */
GG(c, d, a, b, in[15], S23, 3634488961); /* 23 */
GG(b, c, d, a, in[4], S24, 3889429448); /* 24 */
GG(a, b, c, d, in[9], S21, 568446438); /* 25 */
GG(d, a, b, c, in[14], S22, 3275163606); /* 26 */
GG(c, d, a, b, in[3], S23, 4107603335); /* 27 */
GG(b, c, d, a, in[8], S24, 1163531501); /* 28 */
GG(a, b, c, d, in[13], S21, 2850285829); /* 29 */
GG(d, a, b, c, in[2], S22, 4243563512); /* 30 */
GG(c, d, a, b, in[7], S23, 1735328473); /* 31 */
GG(b, c, d, a, in[12], S24, 2368359562); /* 32 */
/* Round 3 */
#define S31 4
#define S32 11
#define S33 16
#define S34 23
HH(a, b, c, d, in[5], S31, 4294588738); /* 33 */
HH(d, a, b, c, in[8], S32, 2272392833); /* 34 */
HH(c, d, a, b, in[11], S33, 1839030562); /* 35 */
HH(b, c, d, a, in[14], S34, 4259657740); /* 36 */
HH(a, b, c, d, in[1], S31, 2763975236); /* 37 */
HH(d, a, b, c, in[4], S32, 1272893353); /* 38 */
HH(c, d, a, b, in[7], S33, 4139469664); /* 39 */
HH(b, c, d, a, in[10], S34, 3200236656); /* 40 */
HH(a, b, c, d, in[13], S31, 681279174); /* 41 */
HH(d, a, b, c, in[0], S32, 3936430074); /* 42 */
HH(c, d, a, b, in[3], S33, 3572445317); /* 43 */
HH(b, c, d, a, in[6], S34, 76029189); /* 44 */
HH(a, b, c, d, in[9], S31, 3654602809); /* 45 */
HH(d, a, b, c, in[12], S32, 3873151461); /* 46 */
HH(c, d, a, b, in[15], S33, 530742520); /* 47 */
HH(b, c, d, a, in[2], S34, 3299628645); /* 48 */
/* Round 4 */
#define S41 6
#define S42 10
#define S43 15
#define S44 21
II(a, b, c, d, in[0], S41, 4096336452); /* 49 */
II(d, a, b, c, in[7], S42, 1126891415); /* 50 */
II(c, d, a, b, in[14], S43, 2878612391); /* 51 */
II(b, c, d, a, in[5], S44, 4237533241); /* 52 */
II(a, b, c, d, in[12], S41, 1700485571); /* 53 */
II(d, a, b, c, in[3], S42, 2399980690); /* 54 */
II(c, d, a, b, in[10], S43, 4293915773); /* 55 */
II(b, c, d, a, in[1], S44, 2240044497); /* 56 */
II(a, b, c, d, in[8], S41, 1873313359); /* 57 */
II(d, a, b, c, in[15], S42, 4264355552); /* 58 */
II(c, d, a, b, in[6], S43, 2734768916); /* 59 */
II(b, c, d, a, in[13], S44, 1309151649); /* 60 */
II(a, b, c, d, in[4], S41, 4149444226); /* 61 */
II(d, a, b, c, in[11], S42, 3174756917); /* 62 */
II(c, d, a, b, in[2], S43, 718787259); /* 63 */
II(b, c, d, a, in[9], S44, 3951481745); /* 64 */
buf[0] += a;
buf[1] += b;
buf[2] += c;
buf[3] += d;
}

@ -1,400 +0,0 @@
#include <abyss/server.hpp>
#include <abyss/http.hpp>
#include <util/buffer.hpp>
#include <util/logging/logger.hpp>
#include <util/time.hpp>
#include <algorithm>
#include <sstream>
#include <string>
#include <string_view>
#include <unordered_map>
namespace abyss
{
namespace httpd
{
using namespace std::literals;
namespace json = llarp::json;
struct ConnImpl : abyss::http::HeaderReader
{
llarp_tcp_conn* _conn;
IRPCHandler* handler;
BaseReqHandler* _parent;
llarp_time_t m_LastActive;
llarp_time_t m_ReadTimeout;
bool m_Bad;
std::unique_ptr<json::IParser> m_BodyParser;
nlohmann::json m_Request;
enum HTTPState
{
eReadHTTPMethodLine,
eReadHTTPHeaders,
eReadHTTPBody,
eWriteHTTPStatusLine,
eWriteHTTPHeaders,
eWriteHTTPBody,
eCloseMe
};
HTTPState m_State;
ConnImpl(BaseReqHandler* p, llarp_tcp_conn* c, llarp_time_t readtimeout)
: _conn(c), _parent(p)
{
handler = nullptr;
m_LastActive = p->now();
m_ReadTimeout = readtimeout;
// set up tcp members
_conn->user = this;
_conn->read = &ConnImpl::OnRead;
_conn->tick = &ConnImpl::OnTick;
_conn->closed = &ConnImpl::OnClosed;
m_Bad = false;
m_State = eReadHTTPMethodLine;
}
~ConnImpl() = default;
bool
FeedLine(std::string& line)
{
bool done = false;
switch (m_State)
{
case eReadHTTPMethodLine:
return ProcessMethodLine(line);
case eReadHTTPHeaders:
if (!ProcessHeaderLine(line, done))
return false;
if (done)
m_State = eReadHTTPBody;
return true;
default:
return false;
}
}
bool
ProcessMethodLine(std::string_view line)
{
auto idx = line.find_first_of(' ');
if (idx == std::string_view::npos)
return false;
Header.Method = std::string(line.substr(0, idx));
line = line.substr(idx + 1);
idx = line.find_first_of(' ');
if (idx == std::string_view::npos)
return false;
Header.Path = std::string(line.substr(0, idx));
m_State = eReadHTTPHeaders;
return true;
}
bool
ShouldProcessHeader(std::string_view name) const
{
// TODO: header whitelist
return name == "content-type"sv || name == "content-length"sv || name == "host"sv;
}
bool
WriteResponseSimple(
int code, const std::string& msg, const char* contentType, const char* content)
{
char buf[512] = {0};
size_t contentLength = strlen(content);
int sz = snprintf(
buf,
sizeof(buf),
"HTTP/1.0 %d %s\r\nContent-Type: "
"%s\r\nContent-Length: %zu\r\n\r\n",
code,
msg.c_str(),
contentType,
contentLength);
if (sz <= 0)
return false;
if (!llarp_tcp_conn_async_write(_conn, llarp_buffer_t(buf, sz)))
return false;
m_State = eWriteHTTPBody;
return llarp_tcp_conn_async_write(_conn, llarp_buffer_t(content, contentLength));
}
bool
FeedBody(const char* buf, size_t sz)
{
if (Header.Method != "POST")
{
return WriteResponseSimple(405, "Method Not Allowed", "text/plain", "nope");
}
{
auto itr = Header.Headers.find("content-type");
if (itr == Header.Headers.end())
{
return WriteResponseSimple(
415, "Unsupported Media Type", "text/plain", "no content type provided");
}
else if (itr->second != "application/json"sv)
{
return WriteResponseSimple(
415, "Unsupported Media Type", "text/plain", "this does not look like jsonrpc 2.0");
}
}
// initialize body parser
if (m_BodyParser == nullptr)
{
auto itr = Header.Headers.find("content-length");
if (itr == Header.Headers.end())
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "no content length");
}
ssize_t contentLength = std::stoll(itr->second);
if (contentLength <= 0)
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "bad content length");
}
else
{
m_BodyParser.reset(json::MakeParser(contentLength));
}
itr = Header.Headers.find("host");
if (itr == Header.Headers.end())
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "no host header provided");
}
if (not handler->ValidateHost(itr->second))
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "invalid host header");
}
}
if (!m_BodyParser->FeedData(buf, sz))
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "invalid body size");
}
switch (m_BodyParser->Parse(m_Request))
{
case json::IParser::eNeedData:
return true;
case json::IParser::eParseError:
return WriteResponseSimple(400, "Bad Request", "text/plain", "bad json object");
case json::IParser::eDone:
if (m_Request.is_object() && m_Request.count("params") && m_Request.count("method")
&& m_Request.count("id") && m_Request["id"].is_string()
&& m_Request["method"].is_string() && m_Request["params"].is_object())
{
nlohmann::json response;
response["jsonrpc"] = "2.0";
response["id"] = m_Request["id"].get<std::string>();
auto value = handler->HandleJSONRPC(
m_Request["method"].get<std::string>(), m_Request["params"]);
if (!value.is_null())
response["result"] = std::move(value);
return WriteResponseJSON(response);
}
return WriteResponseSimple(500, "internal error", "text/plain", "nope");
default:
return false;
}
}
bool
WriteResponseJSON(const nlohmann::json& response)
{
std::string responseStr = response.dump();
return WriteResponseSimple(200, "OK", "application/json", responseStr.c_str());
}
bool
ProcessRead(const char* buf, size_t sz)
{
llarp::LogDebug("http read ", sz, " bytes");
if (m_Bad)
{
return false;
}
if (!sz)
return true;
bool done = false;
m_LastActive = _parent->now();
if (m_State < eReadHTTPBody)
{
const char* end = strstr(buf, "\r\n");
while (end)
{
std::string_view line(buf, end - buf);
switch (m_State)
{
case eReadHTTPMethodLine:
if (!ProcessMethodLine(line))
return false;
sz -= line.size() + (2 * sizeof(char));
break;
case eReadHTTPHeaders:
if (!ProcessHeaderLine(line, done))
return false;
sz -= line.size() + (2 * sizeof(char));
if (done)
m_State = eReadHTTPBody;
break;
default:
break;
}
buf = end + (2 * sizeof(char));
end = strstr(buf, "\r\n");
}
}
if (m_State == eReadHTTPBody)
return FeedBody(buf, sz);
return false;
}
static void
OnRead(llarp_tcp_conn* conn, const llarp_buffer_t& buf)
{
ConnImpl* self = static_cast<ConnImpl*>(conn->user);
if (!self->ProcessRead((const char*)buf.base, buf.sz))
self->MarkBad();
}
static void
OnClosed(llarp_tcp_conn* conn)
{
llarp::LogDebug("connection closed");
ConnImpl* self = static_cast<ConnImpl*>(conn->user);
self->_conn = nullptr;
self->m_State = eCloseMe;
}
static void
OnTick(llarp_tcp_conn* conn)
{
ConnImpl* self = static_cast<ConnImpl*>(conn->user);
self->Tick();
}
void
Tick()
{
if (m_Bad)
Close();
}
/// mark bad so next tick we are closed
void
MarkBad()
{
m_Bad = true;
}
bool
ShouldClose(llarp_time_t now) const
{
return now - m_LastActive > m_ReadTimeout || m_Bad || m_State == eCloseMe;
}
void
Close()
{
if (_conn)
{
llarp_tcp_conn_close(_conn);
_conn = nullptr;
}
}
}; // namespace http
IRPCHandler::IRPCHandler(ConnImpl* conn) : m_Impl(conn)
{
}
IRPCHandler::~IRPCHandler()
{
m_Impl->Close();
delete m_Impl;
}
bool
IRPCHandler::ShouldClose(llarp_time_t now) const
{
return m_Impl->ShouldClose(now);
}
BaseReqHandler::BaseReqHandler(llarp_time_t reqtimeout) : m_ReqTimeout(reqtimeout)
{
m_acceptor.accepted = &BaseReqHandler::OnAccept;
m_acceptor.user = this;
m_acceptor.tick = &OnTick;
m_acceptor.closed = nullptr;
}
bool
BaseReqHandler::ServeAsync(
llarp_ev_loop_ptr loop,
std::shared_ptr<llarp::Logic> logic,
const llarp::SockAddr& bindaddr)
{
m_loop = loop;
m_Logic = logic;
return llarp_tcp_serve(m_loop.get(), &m_acceptor, bindaddr);
}
void
BaseReqHandler::OnTick(llarp_tcp_acceptor* tcp)
{
BaseReqHandler* self = static_cast<BaseReqHandler*>(tcp->user);
self->Tick();
}
void
BaseReqHandler::Tick()
{
auto _now = now();
auto itr = m_Conns.begin();
while (itr != m_Conns.end())
{
if ((*itr)->ShouldClose(_now))
itr = m_Conns.erase(itr);
else
++itr;
}
}
void
BaseReqHandler::Close()
{
llarp_tcp_acceptor_close(&m_acceptor);
}
BaseReqHandler::~BaseReqHandler()
{
}
void
BaseReqHandler::OnAccept(llarp_tcp_acceptor* acceptor, llarp_tcp_conn* conn)
{
BaseReqHandler* self = static_cast<BaseReqHandler*>(acceptor->user);
ConnImpl* connimpl = new ConnImpl(self, conn, self->m_ReqTimeout);
IRPCHandler* rpcHandler = self->CreateHandler(connimpl);
if (rpcHandler == nullptr)
{
connimpl->Close();
delete connimpl;
return;
}
connimpl->handler = rpcHandler;
self->m_Conns.emplace_back(rpcHandler);
}
} // namespace httpd
} // namespace abyss

@ -174,7 +174,6 @@ add_library(liblokinet
routing/path_latency_message.cpp
routing/path_transfer_message.cpp
routing/transfer_traffic_message.cpp
rpc/rpc.cpp
service/address.cpp
service/async_key_exchange.cpp
service/context.cpp
@ -221,7 +220,7 @@ if(WITH_HIVE)
target_sources(liblokinet PRIVATE tooling/router_hive.cpp)
endif()
target_link_libraries(liblokinet PUBLIC cxxopts abyss lokinet-platform lokinet-util lokinet-cryptography)
target_link_libraries(liblokinet PUBLIC cxxopts lokinet-platform lokinet-util lokinet-cryptography)
target_link_libraries(liblokinet PRIVATE libunbound)

@ -12,7 +12,6 @@
#include <link/server.hpp>
#include <messages/link_message.hpp>
#include <net/net.hpp>
#include <rpc/rpc.hpp>
#include <stdexcept>
#include <util/buffer.hpp>
#include <util/encode.hpp>
@ -725,8 +724,6 @@ namespace llarp
_hiddenServiceContext.Tick(now);
_exitContext.Tick(now);
if (rpcCaller)
rpcCaller->Tick(now);
// save profiles
if (routerProfiling().ShouldSave(now))
{
@ -828,12 +825,6 @@ namespace llarp
{
rpcBindAddr = DefaultRPCBindAddr;
}
rpcServer = std::make_unique<rpc::Server>(this);
if (not rpcServer->Start(rpcBindAddr))
{
LogError("failed to bind jsonrpc to ", rpcBindAddr);
return false;
}
LogInfo("Bound RPC server to ", rpcBindAddr);
}
@ -848,13 +839,6 @@ namespace llarp
if (whitelistRouters)
{
rpcCaller = std::make_unique<rpc::Caller>(this);
rpcCaller->SetAuth(lokidRPCUser, lokidRPCPassword);
if (not rpcCaller->Start(lokidRPCAddr))
{
LogError("RPC Caller to ", lokidRPCAddr, " failed to start");
return false;
}
LogInfo("RPC Caller to ", lokidRPCAddr, " started");
}
@ -1054,8 +1038,6 @@ namespace llarp
#endif
hiddenServiceContext().StopAll();
_exitContext.Stop();
if (rpcServer)
rpcServer->Stop();
paths.PumpUpstream();
_linkManager.PumpLinks();
_logic->call_later(200ms, std::bind(&Router::AfterStopIssued, this));

@ -24,7 +24,6 @@
#include <router/rc_lookup_handler.hpp>
#include <routing/handler.hpp>
#include <routing/message_parser.hpp>
#include <rpc/rpc.hpp>
#include <service/context.hpp>
#include <stdexcept>
#include <util/buffer.hpp>
@ -244,12 +243,10 @@ namespace llarp
const IpAddress DefaultRPCBindAddr = IpAddress("127.0.0.1:1190");
bool enableRPCServer = false;
std::unique_ptr<rpc::Server> rpcServer;
IpAddress rpcBindAddr = DefaultRPCBindAddr;
const llarp_time_t _randomStartDelay;
/// lokid caller
std::unique_ptr<rpc::Caller> rpcCaller;
IpAddress lokidRPCAddr = IpAddress("127.0.0.1:22023");
std::string lokidRPCUser;
std::string lokidRPCPassword;

@ -1,518 +0,0 @@
#include <rpc/rpc.hpp>
#include <constants/version.hpp>
#include <router/abstractrouter.hpp>
#include <service/context.hpp>
#include <util/logging/logger.hpp>
#include <router_id.hpp>
#include <exit/context.hpp>
#include <util/encode.hpp>
#include <util/meta/memfn.hpp>
#include <libabyss.hpp>
#include <utility>
namespace llarp
{
namespace rpc
{
struct CallerHandler : public ::abyss::http::IRPCClientHandler
{
CallerImpl* m_Parent;
CallerHandler(::abyss::http::ConnImpl* impl, CallerImpl* parent)
: ::abyss::http::IRPCClientHandler(impl), m_Parent(parent)
{
}
~CallerHandler() override = default;
virtual bool
HandleJSONResult(const nlohmann::json& val) = 0;
bool
HandleResponse(::abyss::http::RPC_Response response) override
{
if (!response.is_object())
{
return HandleJSONResult({});
}
const auto itr = response.find("result");
if (itr == response.end())
{
return HandleJSONResult({});
}
if (itr.value().is_object())
{
return HandleJSONResult(itr.value());
}
return false;
}
void
PopulateReqHeaders(abyss::http::Headers_t& hdr) override;
};
struct LokiPingHandler final : public CallerHandler
{
~LokiPingHandler() override = default;
LokiPingHandler(::abyss::http::ConnImpl* impl, CallerImpl* parent)
: CallerHandler(impl, parent)
{
}
bool
HandleJSONResult(const nlohmann::json& result) override
{
if (not result.is_object())
{
LogError("invalid result from lokid ping, not an object");
return false;
}
const auto itr = result.find("status");
if (itr == result.end())
{
LogError("invalid result from lokid ping, no result");
return false;
}
if (not itr->is_string())
{
LogError("invalid result from lokid ping, status not an string");
return false;
}
const auto status = itr->get<std::string>();
if (status != "OK")
{
LogError("lokid ping failed: '", status, "'");
return false;
}
LogInfo("lokid ping: '", status, "'");
return true;
}
void
HandleError() override
{
LogError("Failed to ping lokid");
}
};
struct GetServiceNodeListHandler final : public CallerHandler
{
using PubkeyList_t = std::vector<RouterID>;
using Callback_t = std::function<void(const PubkeyList_t&, bool)>;
~GetServiceNodeListHandler() override = default;
Callback_t handler;
GetServiceNodeListHandler(::abyss::http::ConnImpl* impl, CallerImpl* parent, Callback_t h)
: CallerHandler(impl, parent), handler(std::move(h))
{
}
bool
HandleJSONResult(const nlohmann::json& result) override;
void
HandleError() override
{
handler({}, false);
}
};
struct CallerImpl : public ::abyss::http::JSONRPC
{
AbstractRouter* router;
llarp_time_t m_NextKeyUpdate = 0s;
std::string m_LastBlockHash;
llarp_time_t m_NextPing = 0s;
const llarp_time_t KeyUpdateInterval = 5s;
const llarp_time_t PingInterval = 5min;
using PubkeyList_t = GetServiceNodeListHandler::PubkeyList_t;
CallerImpl(AbstractRouter* r) : ::abyss::http::JSONRPC(), router(r)
{
}
void
Tick(llarp_time_t now)
{
if (not router->IsRunning())
return;
if (not router->IsServiceNode())
return;
if (now >= m_NextKeyUpdate)
{
AsyncUpdatePubkeyList();
m_NextKeyUpdate = now + KeyUpdateInterval;
}
if (now >= m_NextPing)
{
AsyncLokiPing();
m_NextPing = now + PingInterval;
}
Flush();
}
void
SetAuth(const std::string& user, const std::string& passwd)
{
username = user;
password = passwd;
}
void
AsyncLokiPing()
{
LogInfo("Pinging Lokid");
nlohmann::json version(llarp::VERSION);
nlohmann::json params({{"version", version}});
QueueRPC(
"lokinet_ping", std::move(params), util::memFn(&CallerImpl::NewLokinetPingConn, this));
}
void
AsyncUpdatePubkeyList()
{
LogDebug("Updating service node list");
nlohmann::json params = {{"fields",
{
{"pubkey_ed25519", true},
{"active", true},
{"funded", true},
{"block_hash", true},
}},
{"poll_block_hash", m_LastBlockHash}};
QueueRPC(
"get_n_service_nodes",
std::move(params),
util::memFn(&CallerImpl::NewAsyncUpdatePubkeyListConn, this));
}
bool
Start(const IpAddress& remote)
{
return RunAsync(router->netloop(), remote);
}
abyss::http::IRPCClientHandler*
NewLokinetPingConn(abyss::http::ConnImpl* impl)
{
return new LokiPingHandler(impl, this);
}
abyss::http::IRPCClientHandler*
NewAsyncUpdatePubkeyListConn(abyss::http::ConnImpl* impl)
{
return new GetServiceNodeListHandler(
impl, this, util::memFn(&CallerImpl::HandleServiceNodeListUpdated, this));
}
void
HandleServiceNodeListUpdated(const PubkeyList_t& list, bool updated)
{
if (updated)
{
router->SetRouterWhitelist(list);
}
else
LogError("service node list not updated");
}
~CallerImpl() = default;
};
bool
GetServiceNodeListHandler::HandleJSONResult(const nlohmann::json& result)
{
PubkeyList_t keys;
if (not result.is_object())
{
LogWarn("Invalid result: not an object");
handler({}, false);
return false;
}
// If lokid says tells us the block didn't change then nothing to do
const auto unchanged_it = result.find("unchanged");
if (unchanged_it != result.end() and unchanged_it->get<bool>())
return true;
const auto hash_it = result.find("block_hash");
if (hash_it == result.end())
{
LogWarn("Invalid result: no block_hash member");
handler({}, false);
return false;
}
else
m_Parent->m_LastBlockHash = hash_it->get<std::string>();
const auto itr = result.find("service_node_states");
if (itr == result.end())
{
LogWarn("Invalid result: no service_node_states member");
handler({}, false);
return false;
}
if (not itr.value().is_array())
{
LogWarn("Invalid result: service_node_states is not an array");
handler({}, false);
return false;
}
for (const auto& item : itr.value())
{
if (not item.is_object())
continue;
if (not item.value("active", false))
continue;
if (not item.value("funded", false))
continue;
const std::string pk = item.value("pubkey_ed25519", "");
if (pk.empty())
continue;
PubKey k;
if (k.FromString(pk))
keys.emplace_back(std::move(k));
}
handler(keys, not keys.empty());
return true;
}
void
CallerHandler::PopulateReqHeaders(abyss::http::Headers_t& hdr)
{
hdr.emplace("User-Agent", "lokinet rpc (YOLO)");
}
struct Handler : public ::abyss::httpd::IRPCHandler
{
std::string expectedHostname;
AbstractRouter* router;
std::unordered_map<std::string, std::function<Response()>> m_dispatch;
Handler(::abyss::httpd::ConnImpl* conn, AbstractRouter* r, std::string hostname)
: ::abyss::httpd::IRPCHandler(conn)
, expectedHostname(std::move(hostname))
, router(r)
, m_dispatch{{"llarp.admin.die", [=]() { return KillRouter(); }},
{"llarp.admin.wakeup", [=]() { return StartRouter(); }},
{"llarp.admin.link.neighbor", [=]() { return ListNeighbors(); }},
{"llarp.admin.exit.list", [=]() { return ListExitLevels(); }},
{"llarp.admin.dumpstate", [=]() { return DumpState(); }},
{"llarp.admin.status", [=]() { return DumpStatus(); }},
{"llarp.our.addresses", [=]() { return OurAddresses(); }},
{"llarp.version", [=]() { return DumpVersion(); }}}
{
}
~Handler() override = default;
bool
ValidateHost(const std::string& host) const override
{
return host == "localhost" || host == expectedHostname;
}
Response
StartRouter() const
{
const bool rc = router->Run();
return Response{{"status", rc}};
}
Response
DumpState() const
{
return router->ExtractStatus();
}
Response
KillRouter() const
{
if (not router->IsRunning())
return {{"error", "already stopping"}};
router->Stop();
return {{"status", "OK"}};
}
Response
ListExitLevels() const
{
exit::Context::TrafficStats stats;
router->exitContext().CalculateExitTraffic(stats);
Response resp;
for (const auto& stat : stats)
{
resp.emplace_back(Response{
{"ident", stat.first.ToHex()},
{"tx", stat.second.first},
{"rx", stat.second.second},
});
}
return resp;
}
Response
ListNeighbors() const
{
Response resp = Response::array();
router->ForEachPeer(
[&](const ILinkSession* session, bool outbound) {
resp.emplace_back(Response{{"ident", RouterID(session->GetPubKey()).ToString()},
{"svcnode", session->GetRemoteRC().IsPublicRouter()},
{"outbound", outbound}});
},
false);
return resp;
}
Response
DumpStatus() const
{
size_t numServices = 0;
size_t numServicesReady = 0;
Response services = Response::array();
auto visitor = [&](const std::string& name,
const std::shared_ptr<service::Endpoint>& ptr) -> bool {
numServices++;
if (ptr->IsReady())
numServicesReady++;
const Response status{{"ready", ptr->IsReady()},
{"stopped", ptr->IsStopped()},
{"stale", ptr->IntrosetIsStale()}};
services.emplace_back(Response{name, status});
return true;
};
router->hiddenServiceContext().ForEachService(visitor);
const Response resp{{"uptime", to_json(router->Uptime())},
{"servicesTotal", numServices},
{"servicesReady", numServicesReady},
{"services", services}};
return resp;
}
Response
OurAddresses() const
{
Response services;
router->hiddenServiceContext().ForEachService(
[&](const std::string&, const std::shared_ptr<service::Endpoint>& service) {
const service::Address addr = service->GetIdentity().pub.Addr();
services.push_back(addr.ToString());
return true;
});
return Response{{"services", services}};
}
Response
DumpVersion() const
{
const Response resp{{"version", llarp::VERSION_FULL}};
return resp;
}
Response
HandleJSONRPC(Method_t method, const Params& /*params*/) override
{
auto it = m_dispatch.find(method);
if (it != m_dispatch.end())
{
return it->second();
}
return false;
}
};
struct ReqHandlerImpl : public ::abyss::httpd::BaseReqHandler
{
ReqHandlerImpl(AbstractRouter* r, llarp_time_t reqtimeout)
: ::abyss::httpd::BaseReqHandler(reqtimeout), router(r)
{
}
std::string expectedHostname;
AbstractRouter* router;
::abyss::httpd::IRPCHandler*
CreateHandler(::abyss::httpd::ConnImpl* conn) override
{
return new Handler(conn, router, expectedHostname);
}
};
struct ServerImpl
{
AbstractRouter* router;
ReqHandlerImpl _handler;
ServerImpl(AbstractRouter* r) : router(r), _handler(r, 2s)
{
}
~ServerImpl() = default;
void
Stop()
{
_handler.Close();
}
bool
Start(const IpAddress& addr)
{
_handler.expectedHostname = "localhost";
return _handler.ServeAsync(router->netloop(), router->logic(), addr.createSockAddr());
}
};
Caller::Caller(AbstractRouter* r) : m_Impl(std::make_unique<CallerImpl>(r))
{
}
Caller::~Caller() = default;
void
Caller::Stop()
{
m_Impl->Stop();
}
bool
Caller::Start(const IpAddress& addr)
{
return m_Impl->Start(addr);
}
void
Caller::Tick(llarp_time_t now)
{
m_Impl->Tick(now);
}
void
Caller::SetAuth(const std::string& user, const std::string& passwd)
{
m_Impl->SetAuth(user, passwd);
}
Server::Server(AbstractRouter* r) : m_Impl(std::make_unique<ServerImpl>(r))
{
}
Server::~Server() = default;
void
Server::Stop()
{
m_Impl->Stop();
}
bool
Server::Start(const IpAddress& addr)
{
return m_Impl->Start(addr);
}
} // namespace rpc
} // namespace llarp

@ -1,68 +0,0 @@
#ifndef LLARP_RPC_HPP
#define LLARP_RPC_HPP
#include <util/time.hpp>
#include <net/ip_address.hpp>
#include <functional>
#include <memory>
#include <string>
namespace llarp
{
struct PubKey;
struct AbstractRouter;
namespace rpc
{
struct ServerImpl;
/// jsonrpc server
struct Server
{
Server(AbstractRouter* r);
~Server();
bool
Start(const IpAddress& bindaddr);
/// stop and close
void
Stop();
private:
std::unique_ptr<ServerImpl> m_Impl;
};
struct CallerImpl;
/// jsonrpc caller
struct Caller
{
Caller(AbstractRouter* r);
~Caller();
/// set http basic auth for use with remote rpc endpoint
void
SetAuth(const std::string& user, const std::string& password);
/// start with jsonrpc endpoint address
bool
Start(const IpAddress& remote);
/// stop and close
void
Stop();
/// do per second tick
void
Tick(llarp_time_t now);
private:
std::unique_ptr<CallerImpl> m_Impl;
};
} // namespace rpc
} // namespace llarp
#endif

@ -37,10 +37,8 @@ add_executable(testAll
routing/test_llarp_routing_obtainexitmessage.cpp
service/test_llarp_service_address.cpp
service/test_llarp_service_identity.cpp
test_libabyss.cpp
test_llarp_encrypted_frame.cpp
test_llarp_router_contact.cpp
test_md5.cpp
util/meta/test_llarp_util_memfn.cpp
util/meta/test_llarp_util_traits.cpp
util/test_llarp_util_aligned.cpp

@ -1,181 +0,0 @@
#include <libabyss.hpp>
#include <crypto/crypto.hpp>
#include <crypto/crypto_libsodium.hpp>
#include <ev/ev.hpp>
#include <net/net.hpp>
#include <util/thread/threading.hpp>
#include <gtest/gtest.h>
struct AbyssTestBase : public ::testing::Test
{
llarp::sodium::CryptoLibSodium crypto;
llarp_ev_loop_ptr loop = nullptr;
std::shared_ptr<llarp::Logic> logic;
abyss::httpd::BaseReqHandler* server = nullptr;
abyss::http::JSONRPC* client = nullptr;
const std::string method = "test.method";
bool called = false;
AbyssTestBase()
{
llarp::SetLogLevel(llarp::eLogDebug);
}
void
AssertMethod(const std::string& meth) const
{
ASSERT_EQ(meth, method);
}
void
Start()
{
// throw std::runtime_error("FIXME (replace libabyss with lokimq)");
/*
loop = llarp_make_ev_loop();
logic = std::make_shared< llarp::Logic >();
loop->set_logic(logic);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons((llarp::randint() % 2000) + 2000);
addr.sin_family = AF_INET;
llarp::Addr a(addr);
while(true)
{
if(server->ServeAsync(loop, logic, a))
{
client->RunAsync(loop, a.ToString());
logic->call_later(1s, std::bind(&AbyssTestBase::Stop, this));
return;
}
std::this_thread::sleep_for(1s);
}
*/
}
void
Stop()
{
llarp::LogDebug("test case Stop() called");
llarp_ev_loop_stop(loop);
}
void
AsyncStop()
{
LogicCall(logic, std::bind(&AbyssTestBase::Stop, this));
}
~AbyssTestBase()
{
logic.reset();
llarp::SetLogLevel(llarp::eLogInfo);
}
};
struct ClientHandler : public abyss::http::IRPCClientHandler
{
AbyssTestBase* test;
ClientHandler(abyss::http::ConnImpl* impl, AbyssTestBase* parent)
: abyss::http::IRPCClientHandler(impl), test(parent)
{
}
void
HandleError()
{
FAIL() << "unexpected error";
}
void
PopulateReqHeaders(abyss::http::Headers_t& /*hdr*/)
{
}
bool HandleResponse(abyss::http::RPC_Response /*response*/)
{
test->AsyncStop();
return true;
}
};
struct ServerHandler : public abyss::httpd::IRPCHandler
{
AbyssTestBase* test;
ServerHandler(abyss::httpd::ConnImpl* impl, AbyssTestBase* parent)
: abyss::httpd::IRPCHandler(impl), test(parent)
{
}
bool
ValidateHost(const std::string& /*hostname */) const override
{
return true;
}
Response
HandleJSONRPC(Method_t method, const Params& /*params*/) override
{
test->AssertMethod(method);
test->called = true;
return Response();
}
~ServerHandler()
{
}
};
struct AbyssTest : public AbyssTestBase,
public abyss::http::JSONRPC,
public abyss::httpd::BaseReqHandler
{
AbyssTest() : AbyssTestBase(), abyss::http::JSONRPC(), abyss::httpd::BaseReqHandler(1s)
{
client = this;
server = this;
}
abyss::http::IRPCClientHandler*
NewConn(abyss::http::ConnImpl* impl)
{
return new ClientHandler(impl, this);
}
abyss::httpd::IRPCHandler*
CreateHandler(abyss::httpd::ConnImpl* impl)
{
return new ServerHandler(impl, this);
}
void
AsyncFlush()
{
LogicCall(logic, std::bind(&AbyssTest::Flush, this));
}
void
RunLoop()
{
llarp_ev_loop_run_single_process(loop, logic);
}
};
TEST_F(AbyssTest, TestClientAndServer)
{
#if 1
GTEST_SKIP();
#else
Start();
QueueRPC(
method,
nlohmann::json::object(),
std::bind(&AbyssTest::NewConn, this, std::placeholders::_1));
AsyncFlush();
RunLoop();
ASSERT_TRUE(called);
#endif
}

@ -1,9 +0,0 @@
#include <gtest/gtest.h>
#include <abyss/md5.hpp>
TEST(TestMD5, TestMD5)
{
std::string str("The quick brown fox jumps over the lazy dog");
auto H = MD5::SumHex(str);
ASSERT_EQ(H, "9e107d9d372bb6826bd81d3542a419d6");
}
Loading…
Cancel
Save