Implement upstream DNS using libunbound

pull/1307/head
Thomas Winget 4 years ago
parent b6817646f6
commit c2a30692cf

@ -101,6 +101,7 @@ add_library(liblokinet
dns/rr.cpp
dns/serialize.cpp
dns/server.cpp
dns/unbound_resolver.cpp
consensus/table.cpp
@ -220,7 +221,7 @@ if(WITH_HIVE)
target_sources(liblokinet PRIVATE tooling/router_hive.cpp)
endif()
target_link_libraries(liblokinet PUBLIC cxxopts abyss lokinet-platform lokinet-util lokinet-cryptography)
target_link_libraries(liblokinet PUBLIC cxxopts unbound abyss lokinet-platform lokinet-util lokinet-cryptography)
if(BUILD_SHARED_LIBS)
install(TARGETS lokinet-util lokinet-platform liblokinet LIBRARY DESTINATION lib)

@ -48,9 +48,6 @@ namespace llarp
Message(Message&& other);
Message(const Message& other);
void
UpdateHeader();
void
AddNXReply(RR_TTL_t ttl = 1);

@ -37,8 +37,14 @@ namespace llarp
bool
Proxy::Start(const IpAddress& addr, const std::vector<IpAddress>& resolvers)
{
m_Resolvers.clear();
m_Resolvers = resolvers;
if (resolvers.size())
{
if (not SetupUnboundResolver(resolvers))
{
return false;
}
}
const IpAddress any("0.0.0.0", 0);
auto self = shared_from_this();
LogicCall(m_ClientLogic, [=]() {
@ -89,6 +95,36 @@ namespace llarp
return *itr;
}
bool
Proxy::SetupUnboundResolver(const std::vector<IpAddress>& resolvers)
{
auto replyFunc = [self=weak_from_this()](const SockAddr& to, Message msg)
{
auto this_ptr = self.lock();
if (this_ptr)
{
this_ptr->SendServerMessageTo(to, msg);
}
};
m_UnboundResolver = std::make_shared<UnboundResolver>(m_ServerLoop, std::move(replyFunc));
if (not m_UnboundResolver->Init())
{
m_UnboundResolver = nullptr;
return false;
}
for (const auto& resolver : resolvers)
{
if (not m_UnboundResolver->AddUpstreamResolver(resolver.toString()))
{
m_UnboundResolver = nullptr;
return false;
}
}
return true;
}
void
Proxy::HandleTick(llarp_udp_io*)
{
@ -186,7 +222,6 @@ namespace llarp
}
TX tx = {hdr.id, from};
auto itr = m_Forwarded.find(tx);
Message msg(hdr);
if (!msg.Decode(&pkt))
{
@ -222,7 +257,7 @@ namespace llarp
llarp::LogWarn("failed to handle hooked dns");
}
}
else if (m_Resolvers.size() == 0)
else if (not m_UnboundResolver)
{
// no upstream resolvers
// let's serv fail it
@ -230,26 +265,9 @@ namespace llarp
SendServerMessageTo(from, std::move(msg));
}
else if (itr == m_Forwarded.end())
{
// new forwarded query
tx.from = PickRandomResolver();
m_Forwarded[tx] = from;
LogicCall(m_ClientLogic, [=] {
// do query
const llarp_buffer_t tmpbuf(buf);
llarp_ev_udp_sendto(&self->m_Client, tx.from.createSockAddr(), tmpbuf);
});
}
else
{
// send the query again because it's probably FEC from the requester
const auto resolver = itr->first.from;
LogicCall(m_ClientLogic, [=] {
// send it
const llarp_buffer_t tmpbuf(buf);
llarp_ev_udp_sendto(&self->m_Client, resolver.createSockAddr(), tmpbuf);
});
m_UnboundResolver->Lookup(from, msg);
}
}

@ -5,6 +5,7 @@
#include <ev/ev.h>
#include <net/net.hpp>
#include <util/thread/logic.hpp>
#include <dns/unbound_resolver.hpp>
#include <unordered_map>
@ -55,9 +56,6 @@ namespace llarp
static void
HandleTick(llarp_udp_io*);
void
Tick(llarp_time_t now);
void
HandlePktClient(const SockAddr& from, Buffer_t buf);
@ -73,6 +71,9 @@ namespace llarp
IpAddress
PickRandomResolver() const;
bool
SetupUnboundResolver(const std::vector<IpAddress>& resolvers);
private:
llarp_udp_io m_Server;
llarp_udp_io m_Client;
@ -82,6 +83,7 @@ namespace llarp
Logic_ptr m_ClientLogic;
IQueryHandler* m_QueryHandler;
std::vector<IpAddress> m_Resolvers;
std::shared_ptr<UnboundResolver> m_UnboundResolver;
struct TX
{

@ -0,0 +1,128 @@
#include <dns/unbound_resolver.hpp>
#include <dns/server.hpp>
#include <util/buffer.hpp>
namespace llarp::dns
{
struct PendingUnboundLookup
{
std::weak_ptr<UnboundResolver> resolver;
Message msg;
SockAddr source;
};
void UnboundResolver::Reset()
{
started = false;
if (unboundContext)
{
DeregisterPollFD();
ub_ctx_delete(unboundContext);
}
unboundContext = nullptr;
}
void UnboundResolver::DeregisterPollFD()
{
eventLoop->deregister_poll_fd_readable(ub_fd(unboundContext));
}
void UnboundResolver::RegisterPollFD()
{
eventLoop->register_poll_fd_readable(ub_fd(unboundContext), [=](){ ub_process(unboundContext); });
}
UnboundResolver::UnboundResolver(
llarp_ev_loop_ptr eventLoop,
ReplyFunction replyFunc)
: unboundContext(nullptr), started(false), eventLoop(eventLoop), replyFunc(replyFunc)
{
}
// static callback
void UnboundResolver::Callback(void* data, int err, ub_result* result)
{
std::unique_ptr<PendingUnboundLookup> lookup{static_cast<PendingUnboundLookup*>(data)};
auto this_ptr = lookup->resolver.lock();
if (not this_ptr) return; // resolver is gone, so we don't reply.
if (err != 0)
{
Message& msg = lookup->msg;
msg.AddServFail();
this_ptr->replyFunc(lookup->source, msg);
ub_resolve_free(result);
return;
}
llarp_buffer_t buf;
buf.base = buf.cur = static_cast<byte_t*>(result->answer_packet);
buf.sz = result->answer_len;
MessageHeader hdr;
hdr.Decode(&buf);
hdr.id = lookup->msg.hdr_id;
Message msg(hdr);
msg.Decode(&buf);
this_ptr->replyFunc(lookup->source, msg);
ub_resolve_free(result);
}
bool UnboundResolver::Init()
{
if (started)
{
Reset();
}
unboundContext = ub_ctx_create();
if (not unboundContext)
{
return false;
}
RegisterPollFD();
return true;
}
bool UnboundResolver::AddUpstreamResolver(const std::string& upstreamResolverIP)
{
if (ub_ctx_set_fwd(unboundContext, upstreamResolverIP.c_str()) != 0)
{
Reset();
return false;
}
return true;
}
void UnboundResolver::Lookup(const SockAddr& source, Message& msg)
{
if (not unboundContext)
{
msg.AddServFail();
replyFunc(source, msg);
return;
}
started = true;
const auto& q = msg.questions[0];
auto* lookup = new PendingUnboundLookup{weak_from_this(), msg, source};
int err = ub_resolve_async(unboundContext, q.Name().c_str(), q.qtype, q.qclass, (void*)lookup, &UnboundResolver::Callback, nullptr);
if (err != 0)
{
msg.AddServFail();
replyFunc(source, msg);
return;
}
}
} // namespace llarp::dns

@ -0,0 +1,52 @@
#pragma once
#include <unbound.h>
#include <mutex>
#include <atomic>
#include <memory>
#include <queue>
#include <ev/ev.hpp>
#include <util/thread/logic.hpp>
#include <dns/message.hpp>
namespace llarp::dns
{
using ReplyFunction = std::function<void(const SockAddr& source, const Message& msg)>;
class UnboundResolver : public std::enable_shared_from_this<UnboundResolver>
{
private:
ub_ctx* unboundContext;
bool started;
llarp_ev_loop_ptr eventLoop;
ReplyFunction replyFunc;
void Reset();
void DeregisterPollFD();
void RegisterPollFD();
public:
UnboundResolver(
llarp_ev_loop_ptr eventLoop,
ReplyFunction replyFunc);
static void Callback(void* data, int err, ub_result* result);
// upstream resolver IP can be IPv4 or IPv6
bool Init();
bool AddUpstreamResolver(const std::string& upstreamResolverIP);
void Lookup(const SockAddr& source, Message& msg);
};
} // namespace llarp::dns

@ -810,6 +810,13 @@ struct llarp_ev_loop
virtual void
call_soon(std::function<void(void)> f) = 0;
virtual void
register_poll_fd_readable(int fd, std::function<void(void)> callback) = 0;
virtual void
deregister_poll_fd_readable(int fd) = 0;
};
#endif

@ -957,7 +957,7 @@ namespace libuv
[](uv_handle_t* h, void*) {
if (uv_is_closing(h))
return;
if (h->data && uv_is_active(h) && h->type != UV_TIMER)
if (h->data && uv_is_active(h) && h->type != UV_TIMER && h->type != UV_POLL)
{
static_cast<glue*>(h->data)->Close();
}
@ -1056,4 +1056,48 @@ namespace libuv
uv_async_send(&m_WakeUp);
}
void
OnUVPollFDReadable(uv_poll_t* handle, int status, [[maybe_unused]] int events)
{
if (status < 0) return; // probably fd was closed
auto func = static_cast<libuv::Loop::Callback* >(handle->data);
(*func)();
}
void
Loop::register_poll_fd_readable(int fd, Callback callback)
{
if (m_Polls.count(fd))
{
llarp::LogError("Attempting to create event loop poll on fd ", fd, ", but an event loop poll for that fd already exists.");
return;
}
// new a copy as the one passed in here will go out of scope
auto function_ptr = new Callback(callback);
auto& new_poll = m_Polls[fd];
uv_poll_init(&m_Impl, &new_poll, fd);
new_poll.data = (void *) function_ptr;
uv_poll_start(&new_poll, UV_READABLE, &OnUVPollFDReadable);
}
void
Loop::deregister_poll_fd_readable(int fd)
{
auto itr = m_Polls.find(fd);
if (itr != m_Polls.end())
{
uv_poll_stop(&(itr->second));
auto func = static_cast<Callback* >(itr->second.data);
delete func;
m_Polls.erase(itr);
}
}
} // namespace libuv

@ -125,6 +125,12 @@ namespace libuv
void
call_soon(std::function<void(void)> f) override;
void
register_poll_fd_readable(int fd, Callback callback) override;
void
deregister_poll_fd_readable(int fd) override;
void
FlushLogic();
@ -144,6 +150,8 @@ namespace libuv
std::map<uint32_t, Callback> m_pendingCalls;
std::unordered_map<int, uv_poll_t> m_Polls;
llarp::thread::Queue<PendingTimer> m_timerQueue;
llarp::thread::Queue<uint32_t> m_timerCancelQueue;
};

Loading…
Cancel
Save