More logging

unused/TracingSupport
Chip Senkbeil 12 months ago
parent 78fa43c1bf
commit a108abbf20
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -212,16 +212,11 @@ impl UntypedClient {
// down. // down.
let _shutdown_tx = shutdown_tx_2; let _shutdown_tx = shutdown_tx_2;
// Keep track of block status so we can log appropriately
let mut was_blocked = false;
loop { loop {
// If we have flagged that a reconnect is needed, attempt to do so // If we have flagged that a reconnect is needed, attempt to do so
if needs_reconnect { if needs_reconnect {
info!("Client encountered issue, attempting to reconnect"); info!("Client encountered issue, attempting to reconnect");
if log::log_enabled!(log::Level::Debug) { debug!("Using strategy {reconnect_strategy:?}");
debug!("Using strategy {reconnect_strategy:?}");
}
match reconnect_strategy.reconnect(&mut connection).await { match reconnect_strategy.reconnect(&mut connection).await {
Ok(()) => { Ok(()) => {
info!("Client successfully reconnected!"); info!("Client successfully reconnected!");
@ -239,7 +234,7 @@ impl UntypedClient {
macro_rules! silence_needs_reconnect { macro_rules! silence_needs_reconnect {
() => {{ () => {{
debug!( info!(
"Client exceeded {}s without server activity, so attempting to reconnect", "Client exceeded {}s without server activity, so attempting to reconnect",
silence_duration.as_secs_f32(), silence_duration.as_secs_f32(),
); );
@ -263,7 +258,7 @@ impl UntypedClient {
let ready = tokio::select! { let ready = tokio::select! {
// NOTE: This should NEVER return None as we never allow the channel to close. // NOTE: This should NEVER return None as we never allow the channel to close.
cb = shutdown_rx.recv() => { cb = shutdown_rx.recv() => {
debug!("Client got shutdown signal, so exiting event loop"); info!("Client got shutdown signal, so exiting event loop");
let cb = cb.expect("Impossible: shutdown channel closed!"); let cb = cb.expect("Impossible: shutdown channel closed!");
let _ = cb.send(Ok(())); let _ = cb.send(Ok(()));
watcher_tx.send_replace(ConnectionState::Disconnected); watcher_tx.send_replace(ConnectionState::Disconnected);
@ -338,7 +333,7 @@ impl UntypedClient {
} }
Ok(None) => { Ok(None) => {
debug!("Connection closed"); info!("Connection closed");
needs_reconnect = true; needs_reconnect = true;
watcher_tx.send_replace(ConnectionState::Reconnecting); watcher_tx.send_replace(ConnectionState::Reconnecting);
continue; continue;
@ -398,18 +393,6 @@ impl UntypedClient {
// If we did not read or write anything, sleep a bit to offload CPU usage // If we did not read or write anything, sleep a bit to offload CPU usage
if read_blocked && write_blocked { if read_blocked && write_blocked {
tokio::time::sleep(SLEEP_DURATION).await; tokio::time::sleep(SLEEP_DURATION).await;
if !was_blocked {
trace!("Client entering blocked state");
}
was_blocked = true;
} else {
if was_blocked {
trace!("Client exiting blocked state");
}
was_blocked = false;
} }
} }
}); });

@ -188,6 +188,7 @@ impl ServerHandler for ManagerServer {
type Response = ManagerResponse; type Response = ManagerResponse;
async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) { async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) {
debug!("manager::on_request({ctx:?})");
let RequestCtx { let RequestCtx {
connection_id, connection_id,
request, request,
@ -195,113 +196,161 @@ impl ServerHandler for ManagerServer {
} = ctx; } = ctx;
let response = match request.payload { let response = match request.payload {
ManagerRequest::Capabilities {} => match self.capabilities().await { ManagerRequest::Capabilities {} => {
Ok(supported) => ManagerResponse::Capabilities { supported }, debug!("Looking up capabilities");
Err(x) => ManagerResponse::from(x), match self.capabilities().await {
}, Ok(supported) => ManagerResponse::Capabilities { supported },
Err(x) => ManagerResponse::from(x),
}
}
ManagerRequest::Launch { ManagerRequest::Launch {
destination, destination,
options, options,
} => match self } => {
.launch( info!("Launching {destination} with {options}");
*destination, match self
options, .launch(
ManagerAuthenticator { *destination,
reply: reply.clone(), options,
registry: Arc::clone(&self.registry), ManagerAuthenticator {
}, reply: reply.clone(),
) registry: Arc::clone(&self.registry),
.await },
{ )
Ok(destination) => ManagerResponse::Launched { destination }, .await
Err(x) => ManagerResponse::from(x), {
}, Ok(destination) => ManagerResponse::Launched { destination },
Err(x) => ManagerResponse::from(x),
}
}
ManagerRequest::Connect { ManagerRequest::Connect {
destination, destination,
options, options,
} => match self } => {
.connect( info!("Connecting to {destination} with {options}");
*destination, match self
options, .connect(
ManagerAuthenticator { *destination,
reply: reply.clone(), options,
registry: Arc::clone(&self.registry), ManagerAuthenticator {
}, reply: reply.clone(),
) registry: Arc::clone(&self.registry),
.await },
{ )
Ok(id) => ManagerResponse::Connected { id }, .await
Err(x) => ManagerResponse::from(x), {
}, Ok(id) => ManagerResponse::Connected { id },
Err(x) => ManagerResponse::from(x),
}
}
ManagerRequest::Authenticate { id, msg } => { ManagerRequest::Authenticate { id, msg } => {
trace!("Retrieving authentication callback registry");
match self.registry.write().await.remove(&id) { match self.registry.write().await.remove(&id) {
Some(cb) => match cb.send(msg) { Some(cb) => {
Ok(_) => return, trace!("Sending {msg:?} through authentication callback");
Err(_) => ManagerResponse::Error { match cb.send(msg) {
description: "Unable to forward authentication callback".to_string(), Ok(_) => return,
}, Err(_) => ManagerResponse::Error {
}, description: "Unable to forward authentication callback"
.to_string(),
},
}
}
None => ManagerResponse::from(io::Error::new( None => ManagerResponse::from(io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
"Invalid authentication id", "Invalid authentication id",
)), )),
} }
} }
ManagerRequest::OpenChannel { id } => match self.connections.read().await.get(&id) { ManagerRequest::OpenChannel { id } => {
Some(connection) => match connection.open_channel(reply.clone()) { debug!("Attempting to retrieve connection {id}");
Ok(channel) => { match self.connections.read().await.get(&id) {
debug!("[Conn {id}] Channel {} has been opened", channel.id()); Some(connection) => {
let id = channel.id(); debug!("Opening channel through connection {id}");
self.channels.write().await.insert(id, channel); match connection.open_channel(reply.clone()) {
ManagerResponse::ChannelOpened { id } Ok(channel) => {
info!("[Conn {id}] Channel {} has been opened", channel.id());
let id = channel.id();
self.channels.write().await.insert(id, channel);
ManagerResponse::ChannelOpened { id }
}
Err(x) => ManagerResponse::from(x),
}
} }
Err(x) => ManagerResponse::from(x), None => ManagerResponse::from(io::Error::new(
}, io::ErrorKind::NotConnected,
None => ManagerResponse::from(io::Error::new( "Connection does not exist",
io::ErrorKind::NotConnected, )),
"Connection does not exist", }
)), }
},
ManagerRequest::Channel { id, request } => { ManagerRequest::Channel { id, request } => {
debug!("Attempting to retrieve channel {id}");
match self.channels.read().await.get(&id) { match self.channels.read().await.get(&id) {
// TODO: For now, we are NOT sending back a response to acknowledge // TODO: For now, we are NOT sending back a response to acknowledge
// a successful channel send. We could do this in order for // a successful channel send. We could do this in order for
// the client to listen for a complete send, but is it worth it? // the client to listen for a complete send, but is it worth it?
Some(channel) => match channel.send(request) { Some(channel) => {
Ok(_) => return, debug!("Sending {request:?} through channel {id}");
Err(x) => ManagerResponse::from(x), match channel.send(request) {
}, Ok(_) => return,
Err(x) => ManagerResponse::from(x),
}
}
None => ManagerResponse::from(io::Error::new(
io::ErrorKind::NotConnected,
"Channel is not open or does not exist",
)),
}
}
ManagerRequest::CloseChannel { id } => {
debug!("Attempting to remove channel {id}");
match self.channels.write().await.remove(&id) {
Some(channel) => {
debug!("Removed channel {}", channel.id());
match channel.close() {
Ok(_) => {
info!("Channel {id} has been closed");
ManagerResponse::ChannelClosed { id }
}
Err(x) => ManagerResponse::from(x),
}
}
None => ManagerResponse::from(io::Error::new( None => ManagerResponse::from(io::Error::new(
io::ErrorKind::NotConnected, io::ErrorKind::NotConnected,
"Channel is not open or does not exist", "Channel is not open or does not exist",
)), )),
} }
} }
ManagerRequest::CloseChannel { id } => match self.channels.write().await.remove(&id) { ManagerRequest::Info { id } => {
Some(channel) => match channel.close() { debug!("Attempting to retrieve information for connection {id}");
Ok(_) => { match self.info(id).await {
debug!("Channel {id} has been closed"); Ok(info) => {
ManagerResponse::ChannelClosed { id } info!("Retrieved information for connection {id}");
ManagerResponse::Info(info)
} }
Err(x) => ManagerResponse::from(x), Err(x) => ManagerResponse::from(x),
}, }
None => ManagerResponse::from(io::Error::new( }
io::ErrorKind::NotConnected, ManagerRequest::List => {
"Channel is not open or does not exist", debug!("Attempting to retrieve the list of connections");
)), match self.list().await {
}, Ok(list) => {
ManagerRequest::Info { id } => match self.info(id).await { info!("Retrieved list of connections");
Ok(info) => ManagerResponse::Info(info), ManagerResponse::List(list)
Err(x) => ManagerResponse::from(x), }
}, Err(x) => ManagerResponse::from(x),
ManagerRequest::List => match self.list().await { }
Ok(list) => ManagerResponse::List(list), }
Err(x) => ManagerResponse::from(x), ManagerRequest::Kill { id } => {
}, debug!("Attempting to kill connection {id}");
ManagerRequest::Kill { id } => match self.kill(id).await { match self.kill(id).await {
Ok(()) => ManagerResponse::Killed, Ok(()) => {
Err(x) => ManagerResponse::from(x), info!("Killed connection {id}");
}, ManagerResponse::Killed
}
Err(x) => ManagerResponse::from(x),
}
}
}; };
if let Err(x) = reply.send(response).await { if let Err(x) = reply.send(response).await {

@ -453,9 +453,6 @@ where
// Store our connection details // Store our connection details
state.connections.write().await.insert(id, connection_state); state.connections.write().await.insert(id, connection_state);
// Keep track of block status so we can log appropriately
let mut was_blocked = false;
debug!("[Conn {id}] Beginning read/write loop"); debug!("[Conn {id}] Beginning read/write loop");
loop { loop {
let ready = match await_or_shutdown!( let ready = match await_or_shutdown!(
@ -477,10 +474,14 @@ where
Ok(Some(frame)) => match UntypedRequest::from_slice(frame.as_item()) { Ok(Some(frame)) => match UntypedRequest::from_slice(frame.as_item()) {
Ok(request) => match request.to_typed_request() { Ok(request) => match request.to_typed_request() {
Ok(request) => { Ok(request) => {
debug!( if log::log_enabled!(Level::Debug) {
"[Conn {id}] New request {} | header {}", let debug_header = if !request.header.is_empty() {
request.id, request.header format!(" | header {}", request.header)
); } else {
String::new()
};
debug!("[Conn {id}] New request {}{debug_header}", request.id);
}
let origin_id = request.id.clone(); let origin_id = request.id.clone();
let ctx = RequestCtx { let ctx = RequestCtx {
connection_id: id, connection_id: id,
@ -580,18 +581,6 @@ where
// If we did not read or write anything, sleep a bit to offload CPU usage // If we did not read or write anything, sleep a bit to offload CPU usage
if read_blocked && write_blocked { if read_blocked && write_blocked {
tokio::time::sleep(sleep_duration).await; tokio::time::sleep(sleep_duration).await;
if !was_blocked {
trace!("[Conn {id}] Entering blocked state");
}
was_blocked = true;
} else {
if was_blocked {
trace!("[Conn {id}] Exiting blocked state");
}
was_blocked = false;
} }
} }
} }

@ -1,5 +1,6 @@
use super::ServerReply; use super::ServerReply;
use crate::common::{ConnectionId, Request}; use crate::common::{ConnectionId, Request};
use std::fmt;
/// Represents contextual information for working with an inbound request. /// Represents contextual information for working with an inbound request.
pub struct RequestCtx<T, U> { pub struct RequestCtx<T, U> {
@ -12,3 +13,16 @@ pub struct RequestCtx<T, U> {
/// Used to send replies back to be sent out by the server. /// Used to send replies back to be sent out by the server.
pub reply: ServerReply<U>, pub reply: ServerReply<U>,
} }
impl<T, U> fmt::Debug for RequestCtx<T, U>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RequestCtx")
.field("connection_id", &self.connection_id)
.field("request", &self.request)
.field("reply", &"...")
.finish()
}
}

Loading…
Cancel
Save