diff --git a/CHANGELOG.md b/CHANGELOG.md index df07ac2..3ec16b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - When terminating a connection using `distant manager kill`, the connection is now properly dropped, resulting servers waiting to terminate due to `--shutdown lonely=N` to now shutdown accordingly +- Zombies from spawned servers via `distant launch manager://localhost` are now + properly terminated by checking the exit status of processes ## [0.20.0-alpha.13] diff --git a/src/cli/commands/manager/handlers.rs b/src/cli/commands/manager/handlers.rs index 74d7200..8186312 100644 --- a/src/cli/commands/manager/handlers.rs +++ b/src/cli/commands/manager/handlers.rs @@ -16,8 +16,8 @@ use distant_core::net::manager::{ConnectHandler, LaunchHandler}; use distant_core::protocol::PROTOCOL_VERSION; use log::*; use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::{Child, Command}; -use tokio::sync::Mutex; +use tokio::process::Command; +use tokio::sync::{watch, Mutex}; use crate::options::{BindAddress, ClientLaunchConfig}; @@ -33,15 +33,28 @@ fn invalid(label: &str) -> io::Error { /// Supports launching locally through the manager as defined by `manager://...` pub struct ManagerLaunchHandler { - servers: Mutex>, + shutdown: watch::Sender, } impl ManagerLaunchHandler { pub fn new() -> Self { Self { - servers: Mutex::new(Vec::new()), + shutdown: watch::channel(false).0, } } + + /// Triggers shutdown of any tasks still checking that spawned servers have terminated. + pub fn shutdown(&self) { + let _ = self.shutdown.send(true); + } +} + +impl Drop for ManagerLaunchHandler { + /// Terminates waiting for any servers spawned by this handler, which in turn should + /// shut them down. + fn drop(&mut self) { + self.shutdown(); + } } #[async_trait] @@ -138,9 +151,34 @@ impl LaunchHandler for ManagerLaunchHandler { match stdout.read_line(&mut line).await { Ok(n) if n > 0 => { if let Ok(destination) = line[..n].trim().parse::() { - // Store a reference to the server so we can terminate them - // when this handler is dropped - self.servers.lock().await.push(child); + let mut rx = self.shutdown.subscribe(); + + // Wait for the process to complete in a task. We have to do this + // to properly check the exit status, otherwise if the server + // self-terminates then we get a ZOMBIE process! Oh no! + // + // This also replaces the need to store the children within the + // handler itself and instead uses a watch update to kill the + // task in advance in the case where the child hasn't terminated. + tokio::spawn(async move { + // We don't actually care about the result, just that we're done + loop { + tokio::select! { + result = rx.changed() => { + if result.is_err() { + break; + } + + if *rx.borrow_and_update() { + break; + } + } + _ = child.wait() => { + break; + } + } + } + }); break Ok(destination); } else {