Simul-defer connections

- When two relays repeatedly attempt connections to one another simultaneously, both endpoints defer to the connection initiated by the RouterID that appears first in lexicographical order. The connection initiated by the other endpoint is marked to close quietly (without executing callbacks) and is rejected in the TLS verification hook.
- Bypassing callback execution is critical: if the close callbacks ran, they would clean up the link::Connection object for the connection that is being deferred to, and BOTH connections would end up destroyed.
pull/2232/head
dr7ana 4 months ago
parent 12381c876f
commit 7970ad2d07

@ -442,10 +442,6 @@ namespace
cli.exit(e);
};
// TESTNET:
oxen::log::set_level("quic", oxen::log::Level::critical);
oxen::log::set_level("quicverbose", oxen::log::Level::debug);
if (configFile.has_value())
{
// when we have an explicit filepath

@ -266,29 +266,47 @@ namespace llarp
ep.client_conns.emplace(other, nullptr);
return true;
}
if (alpn == alpns::SN_ALPNS)
{
// verify as service node!
bool result = node_db->registered_routers().count(other);
log::critical(
logcat,
"{} node was {} to confirm remote (RID:{}) is registered; {} connection!",
us,
result ? "able" : "unable",
other,
result ? "allowing" : "rejecting");
if (result)
{
auto [_, b] = ep.service_conns.try_emplace(other, nullptr);
auto [itr, b] = ep.service_conns.try_emplace(other, nullptr);
if (not b)
{
// If we fail to try_emplace a connection to the incoming RID, then we are
// simultaneously dealing with an outbound and inbound from the same connection. To
// resolve this, both endpoints will defer to the connection initiated by the RID
// that appears first in lexicographical order
auto defer_to_incoming = other < router().local_rid();
if (defer_to_incoming)
itr->second->conn->set_close_quietly();
log::critical(
logcat, "{} node rejecting inbound -- already have connection to remote!", us);
logcat,
"{} node received inbound with ongoing outbound to remote (RID:{}); {}!",
us,
other,
defer_to_incoming ? "deferring to inbound" : "rejecting in favor of outbound");
return defer_to_incoming;
}
return result and b;
log::critical(
logcat, "{} node accepting inbound from registered remote (RID:{})", us, other);
}
else
log::critical(
logcat,
"{} node was unable to confirm remote (RID:{}) is registered; rejecting "
"connection!",
us,
other);
return result;
}
@ -313,12 +331,8 @@ namespace llarp
LinkManager::make_control(oxen::quic::connection_interface& ci, const RouterID& rid)
{
auto control_stream = ci.queue_incoming_stream<oxen::quic::BTRequestStream>(
[/* this, */ rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing inbound connection...",
error_code);
// ep.close_connection(rid);
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code);
});
log::critical(logcat, "Queued BTStream to be opened (ID:{})", control_stream->stream_id());
@ -338,9 +352,7 @@ namespace llarp
{
log::critical(logcat, "Configuring inbound connection from relay RID:{}", rid);
if (!it->second)
it->second =
std::make_shared<link::Connection>(ci.shared_from_this(), make_control(ci, rid));
it->second = std::make_shared<link::Connection>(ci.shared_from_this(), make_control(ci, rid));
}
else if (auto it = ep.client_conns.find(rid); it != ep.client_conns.end())
{
@ -663,7 +675,7 @@ namespace llarp
const auto& rid = rc.router_id();
auto res = client_only ? not ep.have_client_conn(rid) : not ep.have_conn(rid);
log::debug(logcat, "RID:{} {}", rid, res ? "ACCEPTED" : "REJECTED");
log::trace(logcat, "RID:{} {}", rid, res ? "ACCEPTED" : "REJECTED");
return res;
};
@ -689,32 +701,34 @@ namespace llarp
// Sends a "gossip_rc" control message carrying `rc` to the entries of ep.service_conns,
// skipping the router the RC belongs to and `last_sender`, then logs how many were sent.
// NOTE(review): this span is a diff rendering in which removed and added lines are
// interleaved, so the declarations and the loop each appear twice -- confirm against the
// committed file before relying on the exact statement sequence shown here.
void
LinkManager::gossip_rc(const RouterID& last_sender, const RemoteRC& rc)
{
int count = 0;
const auto& gossip_src = rc.router_id();
// the work below is handed to the router's loop via call(); last_sender and rc are
// captured by copy
_router.loop()->call([this, last_sender, rc]() {
int count = 0;
const auto& gossip_src = rc.router_id();
for (auto& [rid, conn] : ep.service_conns)
{
// don't send back to the gossip source or the last sender
if (rid == gossip_src or rid == last_sender)
continue;
for (auto& [rid, conn] : ep.service_conns)
{
// don't send back to the gossip source or the last sender
if (rid == gossip_src or rid == last_sender)
continue;
send_control_message(
rid,
"gossip_rc"s,
GossipRCMessage::serialize(last_sender, rc),
[](oxen::quic::message) mutable {
log::critical(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
});
++count;
}
send_control_message(
rid,
"gossip_rc"s,
GossipRCMessage::serialize(last_sender, rc),
// the response handler is a placeholder: it only logs
[](oxen::quic::message) mutable {
log::trace(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
});
++count;
}
log::critical(logcat, "Dispatched {} GossipRC requests!", count);
log::critical(logcat, "Dispatched {} GossipRC requests!", count);
});
}
void
LinkManager::handle_gossip_rc(oxen::quic::message m)
{
log::critical(logcat, "Handling GossipRC request...");
log::debug(logcat, "Handling GossipRC request...");
// RemoteRC constructor wraps deserialization in a try/catch
RemoteRC rc;
@ -730,7 +744,7 @@ namespace llarp
}
catch (const std::exception& e)
{
log::info(link_cat, "Exception handling GossipRC request: {}", e.what());
log::critical(link_cat, "Exception handling GossipRC request: {}", e.what());
return;
}
@ -740,7 +754,7 @@ namespace llarp
gossip_rc(_router.local_rid(), rc);
}
else
log::critical(link_cat, "Received known or old RC, not storing or forwarding.");
log::debug(link_cat, "Received known or old RC, not storing or forwarding.");
}
// TODO: can probably use ::send_control_message instead. Need to discuss the potential difference

@ -128,50 +128,6 @@ namespace llarp
};
} // namespace link
// Result codes for a session attempt.
// NOTE(review): the names suggest session establishment with a remote router -- confirm
// against callers. The quoted label on each enumerator is the text ToString() returns.
enum class SessionResult
{
Establish,       // "success"
Timeout,         // "timeout"
RouterNotFound,  // "not found"
InvalidRouter,   // "invalid router"
NoLink,          // "no link"
EstablishFail    // "establish failed"
};
// Maps a SessionResult to its short human-readable label. A value that matches none of
// the known enumerators yields "???".
constexpr std::string_view
ToString(SessionResult sr)
{
  switch (sr)
  {
    case SessionResult::Establish:
      return "success"sv;
    case SessionResult::Timeout:
      return "timeout"sv;
    case SessionResult::NoLink:
      return "no link"sv;
    case SessionResult::InvalidRouter:
      return "invalid router"sv;
    case SessionResult::RouterNotFound:
      return "not found"sv;
    case SessionResult::EstablishFail:
      return "establish failed"sv;
  }
  // reached only for values outside the enumerator list
  return "???"sv;
}

// Specializes the IsToStringFormattable trait to true for SessionResult (presumably what
// lets the formatting layer render it through ToString() -- confirm at the trait's definition).
template <>
constexpr inline bool IsToStringFormattable<SessionResult> = true;
// A message together with the information needed to dispatch it.
// NOTE(review): presumably held in a MessageQueue until it can be sent -- confirm
// against the code that consumes the queue.
struct PendingMessage
{
  // serialized message payload
  std::string body{};
  // endpoint name; only set by the control-message constructor
  std::optional<std::string> endpoint{};
  // callback taking the oxen::quic::message; empty unless supplied by the caller
  std::function<void(oxen::quic::message)> func{};
  // not set by either constructor; left default-constructed
  RouterID rid;
  // true only when built through the control-message constructor
  bool is_control{false};

  // Body-only message: no endpoint, no callback, is_control stays false.
  PendingMessage(std::string payload) : body{std::move(payload)}
  {}

  // Control message: body plus endpoint name and an optional callback; marks the
  // message as a control message.
  PendingMessage(
      std::string payload,
      std::string endpoint_name,
      std::function<void(oxen::quic::message)> on_response = nullptr)
      : body{std::move(payload)},
        endpoint{std::move(endpoint_name)},
        func{std::move(on_response)},
        is_control{true}
  {}
};

// Double-ended queue of pending messages.
using MessageQueue = std::deque<PendingMessage>;
struct Router;
struct LinkManager
@ -469,12 +425,7 @@ namespace llarp
std::shared_ptr<oxen::quic::BTRequestStream> control_stream =
conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing outbound "
"connection...",
error_code);
// close_connection(rid);
log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code);
});
if (is_snode)
@ -534,11 +485,7 @@ namespace llarp
auto control_stream = conn_interface->template open_stream<oxen::quic::BTRequestStream>(
[rid = rid](oxen::quic::Stream&, uint64_t error_code) {
log::warning(
logcat,
"BTRequestStream closed unexpectedly (ec:{}); closing outbound connection...",
error_code);
// close_connection(rid);
log::warning(logcat, "BTRequestStream closed unexpectedly (ec:{})", error_code);
});
if (is_snode)

@ -381,8 +381,7 @@ namespace llarp
llarp::logRingBuffer = nullptr;
// TESTNET:
oxen::log::set_level("quic", oxen::log::Level::critical);
oxen::log::set_level("quicverbose", oxen::log::Level::debug);
// oxen::log::set_level("quic", oxen::log::Level::critical);
log::debug(logcat, "Configuring router");
@ -789,9 +788,9 @@ namespace llarp
last_rc_gossip = now_timepoint;
// TESTNET: 1 to 5 minutes before testnet gossip interval
// TESTNET: 0 to 3 minutes before testnet gossip interval
auto delta =
std::chrono::seconds{std::uniform_int_distribution<size_t>{60, 300}(llarp::csrng)};
std::chrono::seconds{std::uniform_int_distribution<size_t>{0, 180}(llarp::csrng)};
next_rc_gossip = now_timepoint + TESTNET_GOSSIP_INTERVAL - delta;
}

@ -11,7 +11,6 @@
#include <llarp/ev/ev.hpp>
#include <llarp/exit/context.hpp>
#include <llarp/handlers/tun.hpp>
// #include <llarp/link/link_manager.hpp>
#include <llarp/path/path_context.hpp>
#include <llarp/profiling.hpp>
#include <llarp/router_contact.hpp>
@ -56,7 +55,7 @@ namespace llarp
(INTROSET_RELAY_REDUNDANCY * INTROSET_REQS_PER_RELAY)};
// TESTNET: these constants are shortened for testing purposes
inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{10min};
inline constexpr std::chrono::milliseconds TESTNET_GOSSIP_INTERVAL{15min};
inline constexpr std::chrono::milliseconds RC_UPDATE_INTERVAL{5min};
inline constexpr std::chrono::milliseconds INITIAL_ATTEMPT_INTERVAL{30s};
// as we advance towards full mesh, we try to connect to this number per tick

Loading…
Cancel
Save