wire up liblokinet

pull/1576/head
Jeff Becker 3 years ago
parent 2a809c7a30
commit 853cc52efb
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -47,9 +47,6 @@ namespace llarp
virtual ~Context() = default; virtual ~Context() = default;
void
Close();
void void
Setup(const RuntimeOptions& opts); Setup(const RuntimeOptions& opts);
@ -73,6 +70,9 @@ namespace llarp
bool bool
LooksAlive() const; LooksAlive() const;
bool
IsStopping() const;
/// close async /// close async
void void
CloseAsync(); CloseAsync();
@ -111,6 +111,9 @@ namespace llarp
void void
SigINT(); SigINT();
void
Close();
std::unique_ptr<std::promise<void>> closeWaiter; std::unique_ptr<std::promise<void>> closeWaiter;
}; };

@ -28,7 +28,7 @@ extern "C"
lokinet_context_stop(struct lokinet_context*); lokinet_context_stop(struct lokinet_context*);
/// get default lokinet context /// get default lokinet context
/// does not need to be freed by lokinet_context_free /// not to be freed by lokinet_context_free
struct lokinet_context* struct lokinet_context*
lokinet_default(); lokinet_default();
@ -40,35 +40,44 @@ extern "C"
int errno; int errno;
/// the local ip address we mapped the remote endpoint to /// the local ip address we mapped the remote endpoint to
char* local_address; /// null terminated
char local_address[256];
/// the local port we mapped the remote endpoint to /// the local port we mapped the remote endpoint to
int local_port; int local_port;
/// the id of the stream we created
int stream_id;
}; };
/// connect out to a remote endpoint /// connect out to a remote endpoint
/// remote is in the form of "name:port" /// remoteAddr is in the form of "name:port"
/// returns NULL if context was NULL or not started /// localAddr is either NULL for any or in the form of "ip:port" to bind to an explicit address
/// returns a free()-able lokinet_stream_result * that contains the result /// returns a lokinet_stream_result * that contains the result that can be free()'d
struct lokinet_stream_result* struct lokinet_stream_result*
lokinet_outbound_stream(const char* remote, struct lokinet_context* context); lokinet_outbound_stream(
const char* remoteAddr, const char* localAddr, struct lokinet_context* context);
/// stream accept filter determines if we should accept a stream or not /// stream accept filter determines if we should accept a stream or not
/// return 0 to accept /// return 0 to accept
/// return -1 to explicitly reject /// return -1 to explicitly reject
/// return -2 to silently drop /// return -2 to silently drop
typedef int (*lokinet_stream_filter)(const char*, uint16_t, struct sockaddr* const, void*); typedef int (*lokinet_stream_filter)(const char* remote, uint16_t port, void*);
/// set stream accepter filter /// set stream accepter filter
/// passes user parameter into stream filter as void * /// passes user parameter into stream filter as void *
void /// returns stream id
int
lokinet_inbound_stream_filter( lokinet_inbound_stream_filter(
lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* context); lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* context);
/// simple stream acceptor /// simple stream acceptor
/// simple variant of lokinet_inbound_stream_filter that maps port to localhost:port /// simple variant of lokinet_inbound_stream_filter that maps port to localhost:port
void int
lokinet_inbound_stream(uint16_t port, struct lokinet_context* context); lokinet_inbound_stream(uint16_t port, struct lokinet_context* context);
/// close a stream by id
void
lokinet_close_stream(int stream_id, struct lokinet_context* context);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

@ -1353,4 +1353,12 @@ namespace llarp
return def.generateINIConfig(true); return def.generateINIConfig(true);
} }
std::shared_ptr<Config>
Config::EmbeddedConfig()
{
auto config = std::make_shared<Config>(fs::path{""});
config->network.m_endpointType = "null";
return config;
}
} // namespace llarp } // namespace llarp

@ -251,6 +251,10 @@ namespace llarp
void void
AddDefault(std::string section, std::string key, std::string value); AddDefault(std::string section, std::string key, std::string value);
/// create a config with the default parameters for an embedded lokinet
static std::shared_ptr<Config>
EmbeddedConfig();
private: private:
/// Load (initialize) a default config. /// Load (initialize) a default config.
/// ///

@ -129,13 +129,19 @@ namespace llarp
Context::CloseAsync() Context::CloseAsync()
{ {
/// already closing /// already closing
if (closeWaiter) if (IsStopping())
return; return;
if (CallSafe(std::bind(&Context::HandleSignal, this, SIGTERM))) if (CallSafe(std::bind(&Context::HandleSignal, this, SIGTERM)))
closeWaiter = std::make_unique<std::promise<void>>(); closeWaiter = std::make_unique<std::promise<void>>();
} }
bool
Context::IsStopping() const
{
return closeWaiter.operator bool();
}
void void
Context::Wait() Context::Wait()
{ {

@ -16,36 +16,15 @@ namespace llarp
NullEndpoint(AbstractRouter* r, llarp::service::Context* parent) NullEndpoint(AbstractRouter* r, llarp::service::Context* parent)
: llarp::service::Endpoint(r, parent) : llarp::service::Endpoint(r, parent)
{ {
r->loop()->add_ticker([this] { r->loop()->add_ticker([this] { Pump(Now()); });
while (not m_InboundQuic.empty())
{
m_InboundQuic.top().process();
m_InboundQuic.pop();
}
Pump(Now());
});
} }
struct QUICEvent
{
uint64_t seqno;
std::function<void()> process;
bool
operator<(const QUICEvent& other) const
{
return other.seqno < seqno;
}
};
std::priority_queue<QUICEvent> m_InboundQuic;
virtual bool virtual bool
HandleInboundPacket( HandleInboundPacket(
const service::ConvoTag tag, const service::ConvoTag tag,
const llarp_buffer_t& buf, const llarp_buffer_t& buf,
service::ProtocolType t, service::ProtocolType t,
uint64_t seqno) override uint64_t) override
{ {
LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag); LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag);
if (t == service::ProtocolType::Control) if (t == service::ProtocolType::Control)
@ -68,11 +47,7 @@ namespace llarp
return false; return false;
} }
MarkConvoTagActive(tag); MarkConvoTagActive(tag);
std::vector<byte_t> copy; quic->receive_packet(tag, buf);
copy.resize(buf.sz);
std::copy_n(buf.base, buf.sz, copy.data());
m_InboundQuic.push({seqno, [quic, buf = copy, tag]() { quic->receive_packet(tag, buf); }});
m_router->loop()->wakeup();
return true; return true;
} }

@ -4,8 +4,16 @@
#include "llarp.hpp" #include "llarp.hpp"
#include "config/config.hpp" #include "config/config.hpp"
#include <llarp/router/abstractrouter.hpp>
#include <llarp/service/context.hpp>
#include <llarp/quic/tunnel.hpp>
#include <mutex>
struct lokinet_context struct lokinet_context
{ {
std::mutex m_access;
std::shared_ptr<llarp::Context> impl; std::shared_ptr<llarp::Context> impl;
std::unique_ptr<std::thread> runner; std::unique_ptr<std::thread> runner;
@ -18,10 +26,85 @@ struct lokinet_context
if (runner) if (runner)
runner->join(); runner->join();
} }
/// acquire mutex for accessing this context
[[nodiscard]] auto
acquire()
{
return std::unique_lock{m_access};
}
std::unordered_map<int, bool> streams;
void
inbound_stream(int id)
{
streams[id] = true;
}
void
outbound_stream(int id)
{
streams[id] = false;
}
}; };
struct lokinet_context g_context namespace
{}; {
struct lokinet_context g_context
{};
lokinet_stream_result*
stream_error(int err)
{
return new lokinet_stream_result{err, {0}, 0, 0};
}
lokinet_stream_result*
stream_okay(std::string host, int port, int stream_id)
{
auto* result = new lokinet_stream_result{};
std::copy_n(
host.c_str(),
std::min(host.size(), sizeof(result->local_address) - 1),
result->local_address);
result->local_port = port;
result->stream_id = stream_id;
return result;
}
std::pair<std::string, int>
split_host_port(std::string data, std::string proto = "tcp")
{
std::string host, portStr;
if (auto pos = data.find(":"); pos != std::string::npos)
{
host = data.substr(0, pos);
portStr = data.substr(pos + 1);
}
else
throw EINVAL;
if (auto* serv = getservbyname(portStr.c_str(), proto.c_str()))
{
return {host, serv->s_port};
}
else
throw(errno ? errno : EINVAL);
}
int
accept_port(const char* remote, uint16_t port, void* ptr)
{
(void)remote;
if (port == *static_cast<uint16_t*>(ptr))
{
return 0;
}
return -1;
}
} // namespace
extern "C" extern "C"
{ {
@ -40,24 +123,236 @@ extern "C"
void void
lokinet_context_free(struct lokinet_context* ctx) lokinet_context_free(struct lokinet_context* ctx)
{ {
lokinet_context_stop(ctx);
delete ctx; delete ctx;
} }
void void
lokinet_context_start(struct lokinet_context* ctx) lokinet_context_start(struct lokinet_context* ctx)
{ {
if (not ctx)
return;
auto lock = ctx->acquire();
ctx->runner = std::make_unique<std::thread>([ctx]() { ctx->runner = std::make_unique<std::thread>([ctx]() {
auto config = std::make_shared<llarp::Config>(fs::path{""}); ctx->impl->Configure(llarp::Config::EmbeddedConfig());
ctx->impl->Configure(config);
const llarp::RuntimeOptions opts{}; const llarp::RuntimeOptions opts{};
ctx->impl->Setup(opts); ctx->impl->Setup(opts);
ctx->impl->Run(opts);
}); });
} }
void void
lokinet_context_stop(struct lokinet_context* ctx) lokinet_context_stop(struct lokinet_context* ctx)
{ {
ctx->impl->CloseAsync(); if (not ctx)
ctx->impl->Wait(); return;
auto lock = ctx->acquire();
if (not ctx->impl->IsStopping())
{
ctx->impl->CloseAsync();
ctx->impl->Wait();
}
if (ctx->runner)
ctx->runner->join();
ctx->runner.reset();
}
struct lokinet_stream_result*
lokinet_outbound_stream(const char* remote, const char* local, struct lokinet_context* ctx)
{
if (ctx == nullptr)
return stream_error(EHOSTDOWN);
std::promise<lokinet_stream_result*> promise;
{
auto lock = ctx->acquire();
if (not ctx->impl->IsUp())
return stream_error(EHOSTDOWN);
std::string remotehost;
int remoteport;
try
{
auto [h, p] = split_host_port(remote);
remotehost = h;
remoteport = p;
}
catch (int err)
{
return stream_error(err);
}
// TODO: make configurable (?)
std::string endpoint{"default"};
llarp::SockAddr localAddr;
try
{
if (local)
localAddr = llarp::SockAddr{std::string{local}};
else
localAddr = llarp::SockAddr{"127.0.0.1:0"};
}
catch (std::exception& ex)
{
return stream_error(EINVAL);
}
auto call = [&promise,
ctx,
router = ctx->impl->router,
remotehost,
remoteport,
endpoint,
localAddr]() {
auto ep = router->hiddenServiceContext().GetEndpointByName(endpoint);
if (ep == nullptr)
{
promise.set_value(stream_error(EHOSTUNREACH));
return;
}
auto* quic = ep->GetQUICTunnel();
if (quic == nullptr)
{
promise.set_value(stream_error(ENOTSUP));
return;
}
try
{
auto [addr, id] = quic->open(
remotehost, remoteport, [](auto&&) {}, localAddr);
auto [host, port] = split_host_port(addr.toString());
ctx->outbound_stream(id);
promise.set_value(stream_okay(host, port, id));
}
catch (std::exception& ex)
{
promise.set_value(stream_error(ECANCELED));
}
catch (int err)
{
promise.set_value(stream_error(err));
}
};
ctx->impl->CallSafe([call]() {
// we dont want the mainloop to die in case setting the value on the promise fails
try
{
call();
}
catch (...)
{}
});
}
auto future = promise.get_future();
try
{
if (auto status = future.wait_for(std::chrono::seconds{10});
status == std::future_status::ready)
{
return future.get();
}
else
{
promise.set_value(stream_error(ETIMEDOUT));
return future.get();
}
}
catch (std::exception& ex)
{
return stream_error(EBADF);
}
}
int
lokinet_inbound_stream(uint16_t port, struct lokinet_context* ctx)
{
/// FIXME: delete pointer later
return lokinet_inbound_stream_filter(&accept_port, (void*)new std::uintptr_t{port}, ctx);
}
int
lokinet_inbound_stream_filter(
lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* ctx)
{
if (acceptFilter == nullptr)
{
acceptFilter = [](auto, auto, auto) { return 0; };
}
if (not ctx)
return -1;
std::promise<int> promise;
{
auto lock = ctx->acquire();
if (not ctx->impl->IsUp())
{
return -1;
}
ctx->impl->CallSafe([router = ctx->impl->router, acceptFilter, user, &promise]() {
auto ep = router->hiddenServiceContext().GetEndpointByName("default");
auto* quic = ep->GetQUICTunnel();
auto id = quic->listen(
[acceptFilter, user](auto remoteAddr, auto port) -> std::optional<llarp::SockAddr> {
std::string remote{remoteAddr};
if (auto result = acceptFilter(remote.c_str(), port, user))
{
if (result == -1)
{
throw std::invalid_argument{"rejected"};
}
}
else
return llarp::SockAddr{"127.0.0.1:" + std::to_string(port)};
return std::nullopt;
});
promise.set_value(id);
});
}
auto ftr = promise.get_future();
auto id = ftr.get();
{
auto lock = ctx->acquire();
ctx->inbound_stream(id);
}
return id;
}
void
lokinet_close_stream(int stream_id, struct lokinet_context* ctx)
{
if (not ctx)
return;
auto lock = ctx->acquire();
if (not ctx->impl->IsUp())
return;
try
{
std::promise<void> promise;
bool inbound = ctx->streams.at(stream_id);
ctx->impl->CallSafe([stream_id, inbound, router = ctx->impl->router, &promise]() {
auto ep = router->hiddenServiceContext().GetEndpointByName("default");
auto* quic = ep->GetQUICTunnel();
try
{
if (inbound)
quic->forget(stream_id);
else
quic->close(stream_id);
}
catch (...)
{}
promise.set_value();
});
promise.get_future().get();
}
catch (...)
{}
} }
} }

@ -1378,6 +1378,7 @@ namespace llarp
} }
if (not SendToOrQueue(*maybe, pkt, t)) if (not SendToOrQueue(*maybe, pkt, t))
return false; return false;
MarkConvoTagActive(tag);
Loop()->wakeup(); Loop()->wakeup();
return true; return true;
} }
@ -1425,7 +1426,11 @@ namespace llarp
msg.payload.size(), msg.payload.size(),
" bytes seqno=", " bytes seqno=",
msg.seqno); msg.seqno);
if (not HandleInboundPacket(msg.tag, msg.payload, msg.proto, msg.seqno)) if (HandleInboundPacket(msg.tag, msg.payload, msg.proto, msg.seqno))
{
MarkConvoTagActive(msg.tag);
}
else
{ {
LogWarn("Failed to handle inbound message"); LogWarn("Failed to handle inbound message");
} }

Loading…
Cancel
Save