de changes

pull/35/head
Jeff Becker 6 years ago
parent ca1243f392
commit 612057003a
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -318,7 +318,6 @@ set(LIB_SRC
${UTP_SRC}
${NTRU_SRC}
llarp/address_info.cpp
llarp/arpc.cpp
llarp/bencode.cpp
llarp/buffer.cpp
llarp/config.cpp
@ -348,6 +347,7 @@ set(LIB_SRC
llarp/relay_up_down.cpp
llarp/router_contact.cpp
llarp/router.cpp
llarp/rpc.cpp
llarp/service.cpp
llarp/transit_hop.cpp
llarp/testnet.c
@ -439,8 +439,19 @@ include_directories(${sodium_INCLUDE_DIR})
set(RC_EXE rcutil)
set(DNS_EXE dns)
add_subdirectory(${CMAKE_SOURCE_DIR}/libabyss)
set(ABYSS ${CMAKE_SOURCE_DIR}/libabyss)
set(ABYSS_LIB abyss)
include_directories(${ABYSS}/include)
set(ABYSS_SRC
${ABYSS}/src/http.cpp
${ABYSS}/src/client.cpp
${ABYSS}/src/server.cpp
${ABYSS}/src/lib.cpp)
add_library(${ABYSS_LIB} ${ABYSS_SRC})
if(SHADOW)
add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC})
@ -451,21 +462,19 @@ add_executable(${RC_EXE} ${RC_SRC})
add_executable(${EXE} ${EXE_SRC})
add_executable(${CLIENT_EXE} ${CLIENT_SRC})
target_link_libraries(${CLIENT_EXE} ${ABYSS_LIB})
target_link_libraries(${EXE} ${ABYSS_LIB})
add_executable(${DNS_EXE} ${DNS_SRC})
add_subdirectory(${GTEST_DIR})
include_directories(${GTEST_DIR}/include ${GTEST_DIR})
add_executable(${TEST_EXE} ${TEST_SRC})
if(WITH_STATIC)
add_library(${STATIC_LIB} STATIC ${LIB_SRC})
if(NOT HAVE_CXX17_FILESYSTEM)
add_library(${BACKPORT_LIB} STATIC ${CPP_BACKPORT_SRC})
endif(NOT HAVE_CXX17_FILESYSTEM)
add_library(${PLATFORM_LIB} STATIC ${LIB_PLATFORM_SRC})
target_link_libraries(${PLATFORM_LIB} ${THREAD_LIB})
target_link_libraries(${PLATFORM_LIB} ${THREAD_LIB} ${ABYSS_LIB})
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
target_link_libraries(${PLATFORM_LIB} -lcap)
endif()

@ -1,64 +0,0 @@
#ifndef LLARP_ARPC_HPP
#define LLARP_ARPC_HPP
#include <llarp/bencode.hpp>
#include <llarp/crypto.hpp>
#include <llarp/logger.hpp>
#include <llarp/time.h>
#include <llarp/endian.h>
#include <llarp/ev.h>
#include <functional>
#include <string>
#include <map>
#include <unordered_map>
#ifndef _WIN32
#include <sys/un.h>
#endif
#include <llarp/net.hpp>
// forward declare
struct llarp_router;
namespace llarp
{
namespace arpc
{
// forward declare
struct BaseMessage;
struct Server
{
llarp_tcp_acceptor m_acceptor;
llarp_router* router;
Server(llarp_router* r);
static void
OnAccept(llarp_tcp_acceptor* a, llarp_tcp_conn* conn);
bool
Start(const std::string& bindaddr);
const llarp_crypto*
Crypto() const;
const byte_t*
SigningPublicKey() const
{
return llarp::seckey_topublic(SigningPrivateKey());
}
const byte_t*
SigningPrivateKey() const;
bool
Sign(BaseMessage* msg) const;
};
} // namespace arpc
} // namespace llarp
#endif

@ -94,6 +94,8 @@ struct llarp_tcp_conn
void (*read)(struct llarp_tcp_conn *, const void *, size_t);
/// handle close event (free-ing is handled by event loop)
void (*closed)(struct llarp_tcp_conn *);
/// handle event loop tick
void (*tick)(struct llarp_tcp_conn *);
};
/// queue async write a buffer in full
@ -120,7 +122,8 @@ struct llarp_tcp_acceptor
/// return false if failed to bind
/// return true on successs
bool
llarp_tcp_serve(struct llarp_tcp_acceptor *t, const sockaddr *bindaddr);
llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *t,
const sockaddr *bindaddr);
/// close and stop accepting connections
void

@ -0,0 +1,31 @@
#ifndef LLARP_RPC_HPP
#define LLARP_RPC_HPP
#include <llarp/time.h>
#include <llarp/ev.h>
#include <string>
// forward declare
struct llarp_router;
namespace llarp
{
namespace rpc
{
struct ServerImpl;
struct Server
{
Server(llarp_router* r);
~Server();
bool
Start(const std::string& bindaddr);
private:
ServerImpl* m_Impl;
};
} // namespace rpc
} // namespace llarp
#endif

@ -1,4 +1,3 @@
project(abyss)
set(ABYSS src)

@ -0,0 +1,13 @@
#ifdnef __LIB_ABYSS_H__
#define __LIB_ABYSS_H__
#include <llarp/ev.h>
#ifdef __cplusplus
extern "C"
{
#endif
#ifdef __cplusplus
}
#endif
#endif

@ -0,0 +1,39 @@
#ifndef __LIB_ABYSS_HPP__
#define __LIB_ABYSS_HPP__
#include <llarp/ev.h>
#include <llarp/logic.h>
#include <llarp/time.h>
#include <vector>
#include <memory>
namespace abyss
{
namespace http
{
// forward declare
struct ConnHandler;
struct BaseReqHandler
{
BaseReqHandler(llarp_time_t req_timeout);
~BaseReqHandler();
bool
ServeAsync(llarp_ev_loop* loop, llarp_logic* logic,
const sockaddr* bindaddr);
private:
static void
OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*);
llarp_ev_loop* m_loop;
llarp_logic* m_Logic;
llarp_tcp_acceptor m_acceptor;
std::vector< std::unique_ptr< ConnHandler > > m_Conns;
llarp_time_t m_ReqTimeout;
};
} // namespace http
} // namespace abyss
#endif

@ -0,0 +1,56 @@
#include <libabyss.hpp>
#include <llarp/time.h>
namespace abyss
{
namespace http
{
struct ConnHandler
{
llarp_tcp_conn* _conn;
llarp_time_t m_LastActive;
llarp_time_t m_ReadTimeout;
ConnHandler(llarp_tcp_conn* c, llarp_time_t readtimeout) : _conn(c)
{
m_LastActive = llarp_time_now_ms();
m_ReadTimeout = readtimeout;
}
bool
ShouldClose(llarp_time_t now) const
{
return now - m_LastActive > m_ReadTimeout;
}
void
Begin()
{
}
};
BaseReqHandler::BaseReqHandler(llarp_time_t reqtimeout)
: m_ReqTimeout(reqtimeout)
{
m_loop = nullptr;
m_Logic = nullptr;
m_acceptor.accepted = &BaseReqHandler::OnAccept;
m_acceptor.user = this;
}
BaseReqHandler::~BaseReqHandler()
{
llarp_tcp_acceptor_close(&m_acceptor);
}
void
BaseReqHandler::OnAccept(llarp_tcp_acceptor* acceptor, llarp_tcp_conn* conn)
{
BaseReqHandler* self = static_cast< BaseReqHandler* >(acceptor->user);
ConnHandler* handler = new ConnHandler(conn, self->m_ReqTimeout);
conn->user = handler;
self->m_Conns.emplace_back(handler);
}
} // namespace http
} // namespace abyss

@ -1,405 +0,0 @@
#include <llarp/arpc.hpp>
namespace llarp
{
namespace arpc
{
/// interface for request messages
struct IRequest
{
/// returns false if errmsg is set
/// returns true if retval is set
virtual bool
HandleRequest(Server* ctx, std::unique_ptr< BaseMessage >& retval,
std::string& errmsg) const = 0;
};
struct BaseMessage : public llarp::IBEncodeMessage, public IRequest
{
static constexpr size_t MaxIDSize = 128;
/// maximum size of a message
static constexpr size_t MaxSize = 1024 * 8;
BaseMessage()
{
timestamp = llarp_time_now_ms();
zkey.Zero();
zsig.Zero();
}
std::string m_id;
llarp_time_t timestamp;
llarp::PubKey zkey;
llarp::Signature zsig;
/// override me
virtual std::string
Method() const = 0;
/// encode the entire message
bool
BEncode(llarp_buffer_t* buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictString("aRPC-method", Method(), buf))
return false;
if(!BEncodeWriteDictString("id", m_id, buf))
return false;
if(!BEncodeBody(buf))
return false;
if(!zkey.IsZero())
{
if(!BEncodeWriteDictEntry("z-key", zkey, buf))
return false;
if(!BEncodeWriteDictEntry("z-sig", zsig, buf))
return false;
}
return bencode_end(buf);
}
bool
DecodeKey(llarp_buffer_t k, llarp_buffer_t* buf)
{
if(llarp_buffer_eq(k, "id"))
{
return DecodeID(buf);
}
if(llarp_buffer_eq(k, "params"))
{
return DecodeParams(buf);
}
return false;
}
protected:
typedef bool (*ParamDecoder)(dict_reader*, llarp_buffer_t*);
virtual ParamDecoder
GetParamDecoder() const = 0;
bool
DecodeParams(llarp_buffer_t* buf)
{
dict_reader r;
r.user = this;
r.on_key = GetParamDecoder();
return bencode_read_dict(buf, &r);
}
bool
DecodeID(llarp_buffer_t* buf)
{
llarp_buffer_t strbuf;
if(!bencode_read_string(buf, &strbuf))
return false;
if(strbuf.sz > MaxIDSize) // too big
return false;
m_id = std::string((char*)strbuf.base, strbuf.sz);
return true;
}
/// encode body of message
virtual bool
BEncodeBody(llarp_buffer_t* buf) const = 0;
};
struct ConnHandler
{
ConnHandler(Server* s, llarp_tcp_conn* c) : parent(s), m_conn(c)
{
left = 0;
readingHeader = true;
}
bool readingHeader;
Server* parent;
llarp_tcp_conn* m_conn;
AlignedBuffer< BaseMessage::MaxSize > buf;
uint16_t left;
void
ParseMessage();
void
Close()
{
llarp_tcp_conn_close(m_conn);
}
static void
OnClosed(llarp_tcp_conn* conn)
{
ConnHandler* self = static_cast< ConnHandler* >(conn->user);
delete self;
}
static void
OnRead(llarp_tcp_conn* conn, const void* buf, size_t sz)
{
ConnHandler* self = static_cast< ConnHandler* >(conn->user);
const byte_t* ptr = (const byte_t*)buf;
do
{
if(self->readingHeader)
{
self->left = bufbe16toh(ptr);
sz -= 2;
ptr += 2;
self->readingHeader = false;
}
size_t dlt = std::min((size_t)self->left, sz);
memcpy(self->buf.data() + (self->buf.size() - self->left), ptr, dlt);
self->left -= dlt;
sz -= dlt;
if(self->left == 0)
{
self->ParseMessage();
self->readingHeader = true;
}
} while(sz > 0);
}
};
/// base type for ping req/resp
struct Ping : public BaseMessage
{
Ping() : BaseMessage()
{
}
uint64_t ping;
std::string
Method() const
{
return "llarp.rpc.ping";
}
bool
BEncodeBody(llarp_buffer_t* buf) const
{
if(!bencode_write_bytestring(buf, "params", 6))
return false;
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictInt("ping", ping, buf))
return false;
return bencode_end(buf);
}
static bool
OnParamKey(dict_reader* r, llarp_buffer_t* k)
{
Ping* self = static_cast< Ping* >(r->user);
if(k && llarp_buffer_eq(*k, "ping"))
{
return bencode_read_integer(r->buffer, &self->ping);
}
else
return k == nullptr;
}
virtual ParamDecoder
GetParamDecoder() const
{
return &OnParamKey;
}
};
struct PingResponse : public Ping
{
PingResponse(uint64_t p) : Ping()
{
ping = p;
}
bool
HandleRequest(Server*, std::unique_ptr< BaseMessage >&,
std::string&) const
{
/// TODO: handle client response
llarp::LogInfo(Method(), "pong ", ping);
return false;
}
};
struct PingRequest : public Ping
{
bool
HandleRequest(Server* serv, std::unique_ptr< BaseMessage >& retval,
std::string& errmsg) const
{
PingResponse* resp = new PingResponse(ping);
if(!serv->Sign(resp))
{
errmsg = "failed to sign response";
return false;
}
retval.reset(resp);
return true;
}
};
struct MessageReader
{
dict_reader m_reader;
BaseMessage* msg = nullptr;
MessageReader()
{
m_reader.user = this;
m_reader.on_key = &OnKey;
}
static bool
OnKey(dict_reader* r, llarp_buffer_t* key)
{
static std::unordered_map< std::string,
const std::function< BaseMessage*(void) > >
msgConstructors = {
{"llarp.rpc.ping",
[]() -> BaseMessage* { return new PingRequest(); }},
};
MessageReader* self = static_cast< MessageReader* >(r->user);
if(self->msg == nullptr)
{
// first key
if(key == nullptr || !llarp_buffer_eq(*key, "aRPC-method"))
{
// bad value
return false;
}
llarp_buffer_t strbuf;
if(!bencode_read_string(r->buffer, &strbuf))
return false;
std::string method = std::string((char*)strbuf.base, strbuf.sz);
auto itr = msgConstructors.find(method);
if(itr == msgConstructors.end())
{
// no such method
return false;
}
else
self->msg = itr->second();
return true;
}
else if(key)
return self->msg->DecodeKey(*key, r->buffer);
else
return true;
}
bool
DecodeMessage(llarp_buffer_t* buf,
std::unique_ptr< BaseMessage >& request)
{
msg = nullptr;
if(!bencode_read_dict(buf, &m_reader))
return false;
request.reset(msg);
return true;
}
};
Server::Server(llarp_router* r)
{
router = r;
m_acceptor.user = this;
m_acceptor.accepted = &OnAccept;
}
bool
Server::Start(const std::string& bindaddr)
{
llarp::Addr addr;
sockaddr* saddr = nullptr;
#ifndef _WIN32
sockaddr_un unaddr;
if(bindaddr.find("unix:") == 0)
{
unaddr.sun_family = AF_UNIX;
strncpy(unaddr.sun_path, bindaddr.substr(5).c_str(),
sizeof(unaddr.sun_path));
saddr = (sockaddr*)&unaddr;
}
else
#endif
{
// TODO: ipv6
auto idx = bindaddr.find(':');
std::string host = bindaddr.substr(0, idx);
uint16_t port = std::stoi(bindaddr.substr(idx + 1));
addr = llarp::Addr(host, port);
saddr = (sockaddr*)addr;
}
return llarp_tcp_serve(&m_acceptor, saddr);
}
void
Server::OnAccept(llarp_tcp_acceptor* a, llarp_tcp_conn* conn)
{
Server* self = static_cast< Server* >(a->user);
conn->user = new ConnHandler(self, conn);
conn->read = &ConnHandler::OnRead;
conn->closed = &ConnHandler::OnClosed;
}
bool
Server::Sign(BaseMessage* msg) const
{
msg->zkey = SigningPublicKey();
msg->zsig.Zero();
llarp::Signature sig;
//
byte_t tmp[BaseMessage::MaxSize];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(!msg->BEncode(&buf))
return false;
// rewind buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if(!Crypto()->sign(sig, SigningPrivateKey(), buf))
return false;
msg->zsig = sig;
return true;
}
void
ConnHandler::ParseMessage()
{
std::unique_ptr< BaseMessage > msg;
std::unique_ptr< BaseMessage > response;
std::string errmsg;
MessageReader r;
auto tmp = llarp::Buffer(buf);
if(!r.DecodeMessage(&tmp, msg))
{
llarp::LogError("failed to decode message");
Close();
return;
}
// handle request
if(!msg->HandleRequest(parent, response, errmsg))
{
// TODO: send error reply
llarp::LogError("failed to handle api message: ", errmsg);
Close();
return;
}
if(!parent->Sign(response.get()))
{
llarp::LogError("failed to sign response");
Close();
}
}
} // namespace arpc
} // namespace llarp

@ -111,7 +111,6 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun)
tun->impl = dev;
if(dev)
{
loop->tun_listeners.push_back(tun);
return loop->add_ev(dev, true);
}
return false;
@ -131,6 +130,12 @@ llarp_tcp_serve(struct llarp_tcp_acceptor *tcp, const struct sockaddr *bindaddr)
return false;
}
void
llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp)
{
// TODO: implement me
}
void
llarp_tcp_conn_close(struct llarp_tcp_conn *conn)
{

@ -49,6 +49,9 @@ namespace llarp
virtual int
sendto(const sockaddr* dst, const void* data, size_t sz) = 0;
virtual void
tick(){};
/// used for tun interface and tcp conn
virtual bool
do_write(void* data, size_t sz)
@ -191,23 +194,13 @@ struct llarp_ev_loop
virtual ~llarp_ev_loop(){};
std::vector< llarp_udp_io* > udp_listeners;
std::vector< llarp_tun_io* > tun_listeners;
std::vector< std::unique_ptr< llarp::ev_io > > handlers;
void
tick_listeners()
{
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
for(auto& l : tun_listeners)
{
if(l->tick)
l->tick(l);
if(l->before_write)
l->before_write(l);
static_cast< llarp::ev_io* >(l->impl)->flush_write();
}
for(const auto& h : handlers)
h->tick();
}
};

@ -16,6 +16,14 @@
namespace llarp
{
struct tcp_serv : public ev_io
{
};
struct tcp_conn : public ev_io
{
};
struct udp_listener : public ev_io
{
llarp_udp_io* udp;
@ -26,6 +34,13 @@ namespace llarp
{
}
virtual void
tick()
{
if(udp->tick)
udp->tick(udp);
}
virtual int
read(void* buf, size_t sz)
{
@ -84,6 +99,14 @@ namespace llarp
return -1;
}
virtual void
tick()
{
if(t->tick)
t->tick(t);
flush_write();
}
void
flush_write()
{
@ -270,7 +293,14 @@ struct llarp_epoll_loop : public llarp_ev_loop
bool
close_ev(llarp::ev_io* ev)
{
return epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) != -1;
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1)
return false;
// deallocate
std::remove_if(handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
});
return true;
}
llarp::ev_io*
@ -278,7 +308,10 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
llarp::tun* t = new llarp::tun(tun);
if(t->setup())
{
handlers.emplace_back(t);
return t;
}
delete t;
return nullptr;
}
@ -289,9 +322,9 @@ struct llarp_epoll_loop : public llarp_ev_loop
int fd = udp_bind(src);
if(fd == -1)
return nullptr;
llarp::udp_listener* listener = new llarp::udp_listener(fd, l);
l->impl = listener;
udp_listeners.push_back(l);
handlers.emplace_back(new llarp::udp_listener(fd, l));
llarp::ev_io* listener = handlers.back().get();
l->impl = listener;
return listener;
}
@ -321,9 +354,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
close_ev(listener);
l->impl = nullptr;
delete listener;
std::remove_if(udp_listeners.begin(), udp_listeners.end(),
[l](llarp_udp_io* i) -> bool { return i == l; });
ret = true;
}
return ret;
}

@ -3,7 +3,7 @@
#include <llarp/iwp.hpp>
#include <llarp/link_message.hpp>
#include <llarp/link/utp.hpp>
#include <llarp/arpc.hpp>
#include <llarp/rpc.hpp>
#include "buffer.hpp"
#include "encode.hpp"
@ -655,7 +655,7 @@ llarp_router::Run()
{
rpcBindAddr = DefaultRPCBindAddr;
}
rpcServer = std::make_unique< llarp::arpc::Server >(this);
rpcServer = std::make_unique< llarp::rpc::Server >(this);
if(!rpcServer->Start(rpcBindAddr))
{
llarp::LogError("Binding rpc server to ", rpcBindAddr, " failed");
@ -1252,19 +1252,4 @@ namespace llarp
}
}
}
namespace arpc
{
const byte_t *
Server::SigningPrivateKey() const
{
return router->identity;
}
const llarp_crypto *
Server::Crypto() const
{
return &router->crypto;
}
} // namespace arpc
} // namespace llarp

@ -5,7 +5,7 @@
#include <llarp/router_contact.hpp>
#include <llarp/path.hpp>
#include <llarp/link_layer.hpp>
#include <llarp/arpc.hpp>
#include <llarp/rpc.hpp>
#include <functional>
#include <list>
@ -105,7 +105,7 @@ struct llarp_router
std::string DefaultRPCBindAddr = "127.0.0.1:1190";
bool enableRPCServer = false;
std::unique_ptr< llarp::arpc::Server > rpcServer;
std::unique_ptr< llarp::rpc::Server > rpcServer;
std::string rpcBindAddr = DefaultRPCBindAddr;
std::unique_ptr< llarp::ILinkLayer > outboundLink;

@ -0,0 +1,40 @@
#include <llarp/rpc.hpp>
#include <libabyss.hpp>
namespace llarp
{
namespace rpc
{
struct ServerImpl : public ::abyss::http::BaseReqHandler
{
llarp_router* router;
ServerImpl(llarp_router* r)
: ::abyss::http::BaseReqHandler(2000), router(r)
{
}
bool
Start(const std::string& addr)
{
return false;
}
};
Server::Server(llarp_router* r) : m_Impl(new ServerImpl(r))
{
}
Server::~Server()
{
delete m_Impl;
}
bool
Server::Start(const std::string& addr)
{
return m_Impl->Start(addr);
}
} // namespace rpc
} // namespace llarp
Loading…
Cancel
Save