diff --git a/distant-core/src/client/process.rs b/distant-core/src/client/process.rs index 7752b91..e63ade5 100644 --- a/distant-core/src/client/process.rs +++ b/distant-core/src/client/process.rs @@ -239,7 +239,9 @@ impl RemoteProcess { pub struct RemoteStdin(mpsc::Sender); impl RemoteStdin { - /// Tries to write to the stdin of the remote process + /// Tries to write to the stdin of the remote process, returning ok if immediately + /// successful, `WouldBlock` if would need to wait to send data, and `BrokenPipe` + /// if stdin has been closed pub fn try_write(&mut self, data: impl Into) -> io::Result<()> { match self.0.try_send(data.into()) { Ok(data) => Ok(data), @@ -268,7 +270,7 @@ pub struct RemoteStdout(mpsc::Receiver); impl RemoteStdout { /// Tries to receive latest stdout for a remote process, yielding `None` - /// if no stdout is available + /// if no stdout is available, and `BrokenPipe` if stdout has been closed pub fn try_read(&mut self) -> io::Result> { match self.0.try_recv() { Ok(data) => Ok(Some(data)), @@ -277,7 +279,8 @@ impl RemoteStdout { } } - /// Retrieves the latest stdout for a specific remote process + /// Retrieves the latest stdout for a specific remote process, and `BrokenPipe` if stdout has + /// been closed pub async fn read(&mut self) -> io::Result { self.0 .recv() @@ -292,7 +295,7 @@ pub struct RemoteStderr(mpsc::Receiver); impl RemoteStderr { /// Tries to receive latest stderr for a remote process, yielding `None` - /// if no stderr is available + /// if no stderr is available, and `BrokenPipe` if stderr has been closed pub fn try_read(&mut self) -> io::Result> { match self.0.try_recv() { Ok(data) => Ok(Some(data)), @@ -301,7 +304,8 @@ impl RemoteStderr { } } - /// Retrieves the latest stderr for a specific remote process + /// Retrieves the latest stderr for a specific remote process, and `BrokenPipe` if stderr has + /// been closed pub async fn read(&mut self) -> io::Result { self.0 .recv() diff --git a/distant-lua/src/constants.rs b/distant-lua/src/constants.rs index cbd57d1..ca11cc2 100644 --- a/distant-lua/src/constants.rs +++ b/distant-lua/src/constants.rs @@ -1,5 +1,8 @@ /// Default timeout (15 secs) pub const TIMEOUT_MILLIS: u64 = 15000; +/// Default polling interval for internal process reading and writing +pub const PROC_POLL_TIMEOUT: u64 = 200; + /// Default polling interval for neovim (0.2 secs) pub const NVIM_POLL_TIMEOUT: u64 = 200; diff --git a/distant-lua/src/session/proc.rs b/distant-lua/src/session/proc.rs index 328efb3..a80094d 100644 --- a/distant-lua/src/session/proc.rs +++ b/distant-lua/src/session/proc.rs @@ -1,10 +1,10 @@ -use crate::runtime; +use crate::{constants::PROC_POLL_TIMEOUT, runtime}; use distant_core::{ RemoteLspProcess as DistantRemoteLspProcess, RemoteProcess as DistantRemoteProcess, }; use mlua::{prelude::*, UserData, UserDataFields, UserDataMethods}; use once_cell::sync::Lazy; -use std::{collections::HashMap, io}; +use std::{collections::HashMap, io, time::Duration}; use tokio::sync::RwLock; /// Contains mapping of id -> remote process for use in maintaining active processes @@ -76,16 +76,33 @@ macro_rules! impl_process { } async fn write_stdin_async(id: usize, data: String) -> LuaResult<()> { - with_proc_async!($map_name, id, proc -> { - proc.stdin - .as_mut() - .ok_or_else(|| { - io::Error::new(io::ErrorKind::BrokenPipe, "Stdin closed").to_lua_err() - })? - .write(data.as_str()) - .await - .to_lua_err() - }) + // NOTE: We must spawn a task that continually tries to send stdin as + // if we wait until successful then we hold the lock the entire time + runtime::spawn(async move { + loop { + let is_done = with_proc_async!($map_name, id, proc -> { + let res = proc.stdin + .as_mut() + .ok_or_else(|| { + io::Error::new(io::ErrorKind::BrokenPipe, "Stdin closed").to_lua_err() + })? + .try_write(data.as_str()); + match res { + Ok(_) => Ok(true), + Err(x) if x.kind() == io::ErrorKind::WouldBlock => Ok(false), + Err(x) => Err(x), + } + })?; + + if is_done { + break; + } + + tokio::time::sleep(Duration::from_millis(PROC_POLL_TIMEOUT)).await; + } + + Ok(()) + }).await } fn close_stdin(id: usize) -> LuaResult<()> { @@ -108,16 +125,25 @@ macro_rules! impl_process { } async fn read_stdout_async(id: usize) -> LuaResult { - with_proc_async!($map_name, id, proc -> { - proc.stdout - .as_mut() - .ok_or_else(|| { - io::Error::new(io::ErrorKind::BrokenPipe, "Stdout closed").to_lua_err() - })? - .read() - .await - .to_lua_err() - }) + // NOTE: We must spawn a task that continually tries to read stdout as + // if we wait until successful then we hold the lock the entire time + runtime::spawn(async move { + loop { + let data = with_proc_async!($map_name, id, proc -> { + proc.stdout + .as_mut() + .ok_or_else(|| { + io::Error::new(io::ErrorKind::BrokenPipe, "Stdout closed").to_lua_err() + })? + .try_read() + .to_lua_err()? + }); + + if let Some(data) = data { + break Ok(data); + } + } + }).await } fn read_stderr(id: usize) -> LuaResult> { @@ -133,16 +159,25 @@ macro_rules! impl_process { } async fn read_stderr_async(id: usize) -> LuaResult { - with_proc_async!($map_name, id, proc -> { - proc.stderr - .as_mut() - .ok_or_else(|| { - io::Error::new(io::ErrorKind::BrokenPipe, "Stderr closed").to_lua_err() - })? - .read() - .await - .to_lua_err() - }) + // NOTE: We must spawn a task that continually tries to read stdout as + // if we wait until successful then we hold the lock the entire time + runtime::spawn(async move { + loop { + let data = with_proc_async!($map_name, id, proc -> { + proc.stderr + .as_mut() + .ok_or_else(|| { + io::Error::new(io::ErrorKind::BrokenPipe, "Stderr closed").to_lua_err() + })? + .try_read() + .to_lua_err()? + }); + + if let Some(data) = data { + break Ok(data); + } + } + }).await } fn kill(id: usize) -> LuaResult<()> { @@ -160,12 +195,18 @@ macro_rules! impl_process { } async fn status_async(id: usize) -> LuaResult> { - with_proc_async!($map_name, id, proc -> { - Ok(proc.status().await.map(|(success, exit_code)| Status { - success, - exit_code, - })) - }) + let lock = $map_name.read().await; + let proc = lock.get(&id).ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("No remote process found with id {}", id), + ) + .to_lua_err() + })?; + Ok(proc.status().await.map(|(success, exit_code)| Status { + success, + exit_code, + })) } fn wait(id: usize) -> LuaResult<(bool, Option)> {