Fix connection close handling

Replace stream_reset (which typically isn't called) with a stream_close
handler (which is already called whether or not it was a reset).  Most
importantly, the server side needs to extend the max bidi streams
counter during stream_close (otherwise we run out when we hit the
limit and new connections just stall).
pull/1576/head
Jason Rhinelander 3 years ago committed by Jeff Becker
parent 8bc60a59ac
commit 5e912600f8
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -242,16 +242,16 @@ namespace llarp::quic
return static_cast<Connection*>(user_data)->stream_opened({stream_id});
}
int
stream_reset_cb(
stream_close_cb(
ngtcp2_conn* conn,
int64_t stream_id,
uint64_t final_size,
uint64_t app_error_code,
void* user_data,
void* stream_user_data)
{
LogTrace("######################", __func__);
return static_cast<Connection*>(user_data)->stream_reset({stream_id}, app_error_code);
static_cast<Connection*>(user_data)->stream_closed({stream_id}, app_error_code);
return 0;
}
// (client only)
@ -392,7 +392,7 @@ namespace llarp::quic
cb.recv_stream_data = recv_stream_data;
cb.acked_stream_data_offset = acked_stream_data_offset;
cb.stream_open = stream_open;
cb.stream_reset = stream_reset_cb;
cb.stream_close = stream_close_cb;
cb.extend_max_local_streams_bidi = extend_max_local_streams_bidi;
cb.rand = rand;
cb.get_new_connection_id = get_new_connection_id;
@ -535,8 +535,6 @@ namespace llarp::quic
retransmit_timer->stop();
retransmit_timer->close();
}
for (auto& [id, str] : streams)
str->hard_close();
}
void
@ -622,12 +620,6 @@ namespace llarp::quic
{
auto& stream = **it;
auto bufs = stream.pending();
if (stream.is_shutdown
|| (bufs.empty() && !stream.is_new && !(stream.is_closing && !stream.sent_fin)))
{
it = strs.erase(it);
continue;
}
std::vector<ngtcp2_vec> vecs;
vecs.reserve(bufs.size());
std::transform(bufs.begin(), bufs.end(), std::back_inserter(vecs), [](const auto& buf) {
@ -840,10 +832,8 @@ namespace llarp::quic
}
if (fin)
{
if (str->close_callback)
str->close_callback(*str, std::nullopt);
streams.erase(id);
io_ready();
LogTrace("Stream ", str->id(), " closed by remote");
// Don't cleanup here; stream_closed is going to be called right away to deal with that
}
else
{
@ -853,26 +843,33 @@ namespace llarp::quic
return 0;
}
int
Connection::stream_reset(StreamID id, uint64_t app_code)
void
Connection::stream_closed(StreamID id, uint64_t app_code)
{
LogDebug(id, " reset with code ", app_code);
assert(ngtcp2_is_bidi_stream(id.id));
LogDebug(id, " closed with code ", app_code);
auto it = streams.find(id);
if (it == streams.end())
return NGTCP2_ERR_CALLBACK_FAILURE;
return;
auto& stream = *it->second;
const bool was_closing = stream.is_closing;
stream.is_closing = true;
stream.is_closing = stream.is_shutdown = true;
if (!was_closing && stream.close_callback)
{
LogDebug("Invoke stream close callback");
stream.close_callback(stream, app_code);
stream.close_callback(stream, app_code == 0 ? std::nullopt : std::optional<uint64_t>{app_code});
}
LogDebug("Erasing stream ", id, " from ", (void*)it->second.get());
streams.erase(it);
return 0;
if (!ngtcp2_conn_is_local_stream(*this, id.id))
ngtcp2_conn_extend_max_streams_bidi(*this, 1);
io_ready(); // Probably superfluous but sometimes we might need to send a FIN or something.
}
int
Connection::stream_ack(StreamID id, size_t size)
{

@ -254,11 +254,11 @@ namespace llarp::quic
int
stream_receive(StreamID id, bstring_view data, bool fin);
// Called when a stream is closed/reset
int
stream_reset(StreamID id, uint64_t app_error_code);
// Called when a stream is closed
void
stream_closed(StreamID id, uint64_t app_error_code);
// Called when stream data has been acknoledged and can be freed
// Called when stream data has been acknowledged and can be freed
int
stream_ack(StreamID id, size_t size);

@ -71,7 +71,16 @@ namespace llarp::quic
Stream::~Stream()
{
hard_close();
LogTrace("Destroying stream ", stream_id);
if (avail_trigger)
{
avail_trigger->close();
avail_trigger.reset();
}
bool was_closing = is_closing;
is_closing = is_shutdown = true;
if (!was_closing && close_callback)
close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED);
}
void
@ -330,19 +339,6 @@ namespace llarp::quic
conn.io_ready();
}
void
Stream::hard_close()
{
if (avail_trigger)
{
avail_trigger->close();
avail_trigger.reset();
}
if (!is_closing && close_callback)
close_callback(*this, STREAM_ERROR_CONNECTION_EXPIRED);
is_closing = is_shutdown = true;
}
void
Stream::data(std::shared_ptr<void> data)
{

@ -84,7 +84,10 @@ namespace llarp::quic
close_tcp_pair(quic::Stream& st, std::optional<uint64_t> /*errcode*/)
{
if (auto tcp = st.data<uvw::TCPHandle>())
{
LogTrace("Closing TCP connection");
tcp->close();
}
};
// Creates a new tcp handle that forwards incoming data/errors/closes into appropriate actions
// on the given quic stream.
@ -98,9 +101,9 @@ namespace llarp::quic
tcp.on<uvw::CloseEvent>([](auto&, uvw::TCPHandle& c) {
// This fires sometime after we call `close()` to signal that the close is done.
LogError("Connection closed to ", c.peer().ip, ":", c.peer().port, "; closing quic stream");
if (auto stream = c.data<Stream>())
{
LogInfo("Local TCP connection closed, closing associated quic stream ", stream->id());
stream->close();
stream->data(nullptr);
}
@ -108,7 +111,7 @@ namespace llarp::quic
});
tcp.on<uvw::EndEvent>([](auto&, uvw::TCPHandle& c) {
// This fires on eof, most likely because the other side of the TCP connection closed it.
LogError("EOF on connection to ", c.peer().ip, ":", c.peer().port);
LogInfo("EOF on connection to ", c.peer().ip, ":", c.peer().port);
c.close();
});
tcp.on<uvw::ErrorEvent>([](const uvw::ErrorEvent& e, uvw::TCPHandle& tcp) {

Loading…
Cancel
Save