From 55ed38fb63cc733ecec09a64242aafe3eff48d9a Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Sun, 18 Jun 2023 13:51:16 -0500 Subject: [PATCH] Fix lockup --- distant-net/src/manager/server/connection.rs | 43 ++++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/distant-net/src/manager/server/connection.rs b/distant-net/src/manager/server/connection.rs index 800bf1d..df6d65e 100644 --- a/distant-net/src/manager/server/connection.rs +++ b/distant-net/src/manager/server/connection.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt; use std::io; use log::*; @@ -135,6 +136,17 @@ enum Action { }, } +impl fmt::Debug for Action { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Register { id, .. } => write!(f, "Action::Register {{ id: {id}, .. }}"), + Self::Unregister { id } => write!(f, "Action::Unregister {{ id: {id} }}"), + Self::Read { .. } => write!(f, "Action::Read {{ .. }}"), + Self::Write { id, .. } => write!(f, "Action::Write {{ id: {id}, .. }}"), + } + } +} + /// Internal task to process outgoing [`UntypedRequest`]s. async fn request_task( id: ConnectionId, @@ -142,10 +154,13 @@ async fn request_task( mut rx: mpsc::UnboundedReceiver>, ) { while let Some(req) = rx.recv().await { + trace!("[Conn {id}] Firing off request {}", req.id); if let Err(x) = client.fire(req).await { error!("[Conn {id}] Failed to send request: {x}"); } } + + trace!("[Conn {id}] Manager request task closed"); } /// Internal task to process incoming [`UntypedResponse`]s. @@ -155,10 +170,17 @@ async fn response_task( tx: mpsc::UnboundedSender, ) { while let Some(res) = mailbox.next().await { + trace!( + "[Conn {id}] Receiving response {} to request {}", + res.id, + res.origin_id + ); if let Err(x) = tx.send(Action::Read { res }) { error!("[Conn {id}] Failed to forward received response: {x}"); } } + + trace!("[Conn {id}] Manager response task closed"); } /// Internal task to process [`Action`] items. @@ -174,6 +196,8 @@ async fn action_task( let mut registered = HashMap::new(); while let Some(action) = rx.recv().await { + trace!("[Conn {id}] {action:?}"); + match action { Action::Register { id, reply } => { registered.insert(id, reply); @@ -201,9 +225,20 @@ async fn action_task( id: channel_id, response: res, }; - if let Err(x) = reply.send(response).await { - error!("[Conn {id}] {x}"); - } + + // TODO: This seems to get stuck at times with some change recently, + // so we kick this off in a new task instead. The better solution + // is to switch most of our mpsc usage to be unbounded so we + // don't need an async call. The only bounded ones should be those + // externally facing to the API user, if even that. + // + // https://github.com/chipsenkbeil/distant/issues/205 + let reply = reply.clone(); + tokio::spawn(async move { + if let Err(x) = reply.send(response).await { + error!("[Conn {id}] {x}"); + } + }); } } Action::Write { id, mut req } => { @@ -217,4 +252,6 @@ async fn action_task( } } } + + trace!("[Conn {id}] Manager action task closed"); }