Add more logging to try to pin down what's happening

pull/146/head
Chip Senkbeil 2 years ago
parent 32244d9b2e
commit a6952a98bb
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -5,6 +5,7 @@ use serde::{de::DeserializeOwned, Serialize};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use tokio::{
io,
@ -18,6 +19,9 @@ pub use builder::*;
mod channel;
pub use channel::*;
/// Time to wait inbetween connection read/write when nothing was read or written on last pass
const SLEEP_DURATION: Duration = Duration::from_millis(50);
/// Represents a client that can be used to send requests & receive responses from a server
pub struct Client<T, U> {
/// Used to send requests to a server
@ -60,13 +64,22 @@ where
loop {
let ready = tokio::select! {
_ = shutdown_rx.recv() => {
debug!("Client got shutdown signal, so exiting event loop");
break;
}
cb = reconnect_rx.recv() => {
debug!("Client got reconnect signal, so attempting to reconnect");
if let Some(cb) = cb {
let _ = cb.send(Reconnectable::reconnect(&mut transport).await);
let _ = match Reconnectable::reconnect(&mut transport).await {
Ok(()) => cb.send(Ok(())),
Err(x) => {
error!("Client reconnect failed: {x}");
cb.send(Err(x))
}
};
continue;
} else {
error!("Client callback for reconnect missing! Corrupt state!");
break;
}
}
@ -75,35 +88,41 @@ where
}
};
let mut read_blocked = false;
let mut write_blocked = false;
let mut flush_blocked = false;
if ready.is_readable() {
match transport.try_read_frame() {
Ok(Some(frame)) => match UntypedResponse::from_slice(frame.as_item()) {
Ok(response) => {
match response.to_typed_response() {
Ok(response) => {
// Try to send response to appropriate mailbox
// TODO: This will block if full... is that a problem?
// TODO: How should we handle false response? Did logging in past
post_office.deliver_response(response).await;
}
Err(x) => {
if log::log_enabled!(Level::Trace) {
trace!(
"Failed receiving {}",
String::from_utf8_lossy(&response.payload),
);
Ok(Some(frame)) => {
match UntypedResponse::from_slice(frame.as_item()) {
Ok(response) => {
match response.to_typed_response() {
Ok(response) => {
// Try to send response to appropriate mailbox
// TODO: This will block if full... is that a problem?
// TODO: How should we handle false response? Did logging in past
post_office.deliver_response(response).await;
}
Err(x) => {
if log::log_enabled!(Level::Trace) {
trace!(
"Failed receiving {}",
String::from_utf8_lossy(&response.payload),
);
}
error!("Invalid response: {x}");
error!("Invalid response: {x}");
}
}
}
Err(x) => {
error!("Invalid response: {x}");
}
}
Err(x) => {
error!("Invalid response: {x}");
}
},
}
Ok(None) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
Err(x) => {
error!("Failed to read next frame: {x}");
}
@ -115,7 +134,9 @@ where
match request.to_vec() {
Ok(data) => match transport.try_write_frame(data) {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
write_blocked = true
}
Err(x) => error!("Send failed: {x}"),
},
Err(x) => {
@ -126,12 +147,21 @@ where
match transport.try_flush() {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => flush_blocked = true,
Err(x) => {
error!("Failed to flush outgoing data: {x}");
}
}
}
// If we did not read or write anything, sleep a bit to offload CPU usage
if read_blocked && write_blocked && flush_blocked {
trace!(
"Client blocked on read and write, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
tokio::time::sleep(SLEEP_DURATION).await;
}
}
});

@ -134,12 +134,13 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{Client, FramedTransport};
use crate::{auth::Authenticator, Client, FramedTransport};
use std::time::Duration;
use test_log::test;
type TestClient = Client<u8, u8>;
#[tokio::test]
#[test(tokio::test)]
async fn mail_should_return_mailbox_that_receives_responses_until_transport_closes() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session = TestClient::new(t1);
@ -174,7 +175,7 @@ mod tests {
}
}
#[tokio::test]
#[test(tokio::test)]
async fn send_should_wait_until_response_received() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session = TestClient::new(t1);
@ -190,7 +191,7 @@ mod tests {
}
}
#[tokio::test]
#[test(tokio::test)]
async fn send_timeout_should_fail_if_response_not_received_in_time() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session: TestClient = Client::new(t1);
@ -206,7 +207,7 @@ mod tests {
let _req: Request<u8> = Request::from_slice(frame.as_item()).unwrap();
}
#[tokio::test]
#[test(tokio::test)]
async fn fire_should_send_request_and_not_wait_for_response() {
let (t1, mut t2) = FramedTransport::test_pair(100);
let session: TestClient = Client::new(t1);

@ -8,6 +8,7 @@ use serde::{de::DeserializeOwned, Serialize};
use std::{
io,
sync::{Arc, Weak},
time::Duration,
};
use tokio::{
sync::{mpsc, Mutex},
@ -29,6 +30,9 @@ mod windows;
#[cfg(windows)]
pub use windows::*;
/// Time to wait inbetween connection read/write when nothing was read or written on last pass
const SLEEP_DURATION: Duration = Duration::from_millis(50);
/// Extension trait to provide a reference implementation of starting a server
/// that will listen for new connections (exposed as [`TypedAsyncWrite`] and [`TypedAsyncRead`])
/// and process them using the [`Server`] implementation
@ -227,6 +231,11 @@ where
.await
.expect("[Conn {connection_id}] Failed to examine ready state");
// Keep track of whether we read or wrote anything
let mut read_blocked = false;
let mut write_blocked = false;
let mut flush_blocked = false;
if ready.is_readable() {
match transport.try_read_frame() {
Ok(Some(frame)) => match UntypedRequest::from_slice(frame.as_item()) {
@ -277,7 +286,7 @@ where
}
break;
}
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
Err(x) => {
// NOTE: We do NOT break out of the loop, as this could happen
// if someone sends bad data at any point, but does not
@ -291,38 +300,49 @@ where
// If our socket is ready to be written to, we try to get the next item from
// the queue and process it
if ready.is_writable() {
match rx.try_recv() {
Ok(response) => {
// Log our message as a string, which can be expensive
if log_enabled!(Level::Trace) {
trace!(
"[Conn {connection_id}] Sending {}",
&response
.to_vec()
.map(|x| String::from_utf8_lossy(&x).to_string())
.unwrap_or_else(|_| "<Cannot serialize>".to_string())
);
}
if let Ok(response) = rx.try_recv() {
// Log our message as a string, which can be expensive
if log_enabled!(Level::Trace) {
trace!(
"[Conn {connection_id}] Sending {}",
&response
.to_vec()
.map(|x| String::from_utf8_lossy(&x).to_string())
.unwrap_or_else(|_| "<Cannot serialize>".to_string())
);
}
match response.to_vec() {
Ok(data) => match transport.try_write_frame(data) {
Ok(()) => continue,
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => error!("[Conn {connection_id}] Send failed: {x}"),
},
Err(x) => {
error!(
"[Conn {connection_id}] Unable to serialize outgoing response: {x}"
);
continue;
}
match response.to_vec() {
Ok(data) => match transport.try_write_frame(data) {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
Err(x) => error!("[Conn {connection_id}] Send failed: {x}"),
},
Err(x) => {
error!(
"[Conn {connection_id}] Unable to serialize outgoing response: {x}"
);
}
}
}
// If we don't have data, we skip
Err(_) => continue,
match transport.try_flush() {
Ok(()) => (),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => flush_blocked = true,
Err(x) => {
error!("[Conn {connection_id}] Failed to flush outgoing data: {x}");
}
}
}
// If we did not read or write anything, sleep a bit to offload CPU usage
if read_blocked && write_blocked && flush_blocked {
trace!(
"[Conn {connection_id}] Blocked on read and write, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
tokio::time::sleep(SLEEP_DURATION).await;
}
}
}
}

@ -127,12 +127,16 @@ impl<T: Transport> FramedTransport<T> {
pub fn try_flush(&mut self) -> io::Result<()> {
// Continue to send from the outgoing buffer until we either finish or fail
while !self.outgoing.is_empty() {
trace!("try_flush({} bytes)", self.outgoing.len());
match self.inner.try_write(self.outgoing.as_ref()) {
// Getting 0 bytes on write indicates the channel has closed
Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero)),
// Successful write will advance the outgoing buffer
Ok(n) => self.outgoing.advance(n),
Ok(n) => {
trace!("try_flush() successfully flushed {n} bytes");
self.outgoing.advance(n);
}
// Any error (including WouldBlock) will get bubbled up
Err(x) => return Err(x),
@ -198,6 +202,10 @@ impl<T: Transport> FramedTransport<T> {
match self.try_read_frame() {
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
// NOTE: We sleep for a little bit before trying again to avoid pegging CPU
trace!(
"read_frame() blocked, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
tokio::time::sleep(SLEEP_DURATION).await
}
x => return x,
@ -253,6 +261,10 @@ impl<T: Transport> FramedTransport<T> {
match self.try_flush() {
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
// NOTE: We sleep for a little bit before trying again to avoid pegging CPU
trace!(
"write_frame() blocked, so sleeping {}s",
SLEEP_DURATION.as_secs_f32()
);
tokio::time::sleep(SLEEP_DURATION).await
}
x => return x,

Loading…
Cancel
Save