diff --git a/Cargo.lock b/Cargo.lock index a249649..a80c1b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,7 +179,7 @@ dependencies = [ [[package]] name = "distant" -version = "0.7.0" +version = "0.8.0" dependencies = [ "bytes", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index bb9d0ce..46ea8a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "distant" description = "Operate on a remote computer through file and process manipulation" categories = ["command-line-utilities"] -version = "0.7.1" +version = "0.8.0" authors = ["Chip Senkbeil "] edition = "2018" homepage = "https://github.com/chipsenkbeil/distant" diff --git a/src/cli/opt.rs b/src/cli/opt.rs index 18a8149..194bb85 100644 --- a/src/cli/opt.rs +++ b/src/cli/opt.rs @@ -1,7 +1,7 @@ use crate::{ cli::subcommand, core::{ - constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR}, + constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR, TIMEOUT_STR}, data::RequestPayload, }, }; @@ -45,13 +45,18 @@ pub struct CommonOpt { #[structopt(short, long, parse(from_occurrences), global = true)] pub verbose: u8, - /// Quiet mode + /// Quiet mode, suppresses all logging #[structopt(short, long, global = true)] pub quiet: bool, /// Log output to disk instead of stderr #[structopt(long, global = true)] pub log_file: Option, + + /// Represents the maximum time (in milliseconds) to wait for a network + /// request before timing out; a timeout of 0 implies waiting indefinitely + #[structopt(short, long, global = true, default_value = &TIMEOUT_STR)] + pub timeout: usize, } /// Contains options related sessions @@ -321,6 +326,11 @@ pub struct LaunchSubcommand { #[structopt(flatten)] pub session_data: SessionOpt, + /// If specified, launch will fail when attempting to bind to a unix socket that + /// already exists, rather than removing the old socket + #[structopt(long)] + pub fail_if_socket_exists: bool, + /// Runs in background via daemon-mode (does nothing on windows); only applies /// when session is socket #[structopt(short, long)] diff --git a/src/cli/subcommand/action/inner.rs b/src/cli/subcommand/action/inner.rs index 0ae3c45..0eee16b 100644 --- a/src/cli/subcommand/action/inner.rs +++ b/src/cli/subcommand/action/inner.rs @@ -337,5 +337,21 @@ fn format_shell(res: Response) -> ResponseOut { ResponseOut::StderrLine(format!("Proc {} failed", id)) } } + ResponsePayload::SystemInfo { + family, + os, + arch, + current_dir, + main_separator, + } => ResponseOut::StdoutLine(format!( + concat!( + "Family: {:?}\n", + "Operating System: {:?}\n", + "Arch: {:?}\n", + "Cwd: {:?}\n", + "Path Sep: {:?}", + ), + family, os, arch, current_dir, main_separator, + )), } } diff --git a/src/cli/subcommand/action/mod.rs b/src/cli/subcommand/action/mod.rs index 1df25bf..928fd4b 100644 --- a/src/cli/subcommand/action/mod.rs +++ b/src/cli/subcommand/action/mod.rs @@ -9,7 +9,7 @@ use crate::{ }; use derive_more::{Display, Error, From}; use log::*; -use tokio::io; +use tokio::{io, time::Duration}; pub(crate) mod inner; @@ -28,12 +28,15 @@ pub fn run(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> { rt.block_on(async { run_async(cmd, opt).await }) } -async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> { +async fn run_async(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> { + let timeout = Duration::from_millis(opt.timeout as u64); + match cmd.session { SessionInput::Environment => { start( cmd, - Client::tcp_connect(Session::from_environment()?).await?, + Client::tcp_connect_timeout(Session::from_environment()?, timeout).await?, + timeout, ) .await } @@ -41,22 +44,40 @@ async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> let path = cmd.session_data.session_file.clone(); start( cmd, - Client::tcp_connect(SessionFile::load_from(path).await?.into()).await?, + Client::tcp_connect_timeout(SessionFile::load_from(path).await?.into(), timeout) + .await?, + timeout, + ) + .await + } + SessionInput::Pipe => { + start( + cmd, + Client::tcp_connect_timeout(Session::from_stdin()?, timeout).await?, + timeout, ) .await } - SessionInput::Pipe => start(cmd, Client::tcp_connect(Session::from_stdin()?).await?).await, #[cfg(unix)] SessionInput::Socket => { let path = cmd.session_data.session_socket.clone(); - start(cmd, Client::unix_connect(path, None).await?).await + start( + cmd, + Client::unix_connect_timeout(path, None, timeout).await?, + timeout, + ) + .await } #[cfg(not(unix))] SessionInput::Socket => unreachable!(), } } -async fn start(cmd: ActionSubcommand, mut client: Client) -> Result<(), Error> +async fn start( + cmd: ActionSubcommand, + mut client: Client, + timeout: Duration, +) -> Result<(), Error> where T: DataStream + 'static, { @@ -78,7 +99,7 @@ where is_proc_req = req.payload.is_proc_run(); debug!("Client sending request: {:?}", req); - let res = client.send(req).await?; + let res = client.send_timeout(req, timeout).await?; // Store the spawned process id for using in sending stdin (if we spawned a proc) proc_id = match &res.payload { diff --git a/src/cli/subcommand/launch.rs b/src/cli/subcommand/launch.rs index 70bbd7f..1aef89d 100644 --- a/src/cli/subcommand/launch.rs +++ b/src/cli/subcommand/launch.rs @@ -18,6 +18,7 @@ use tokio::{ io::{self, AsyncRead, AsyncWrite}, process::Command, sync::{broadcast, mpsc, oneshot, Mutex}, + time::Duration, }; #[derive(Debug, Display, Error, From)] @@ -46,6 +47,8 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> { let session_file = cmd.session_data.session_file.clone(); let session_socket = cmd.session_data.session_socket.clone(); + let fail_if_socket_exists = cmd.fail_if_socket_exists; + let timeout = Duration::from_millis(opt.timeout as u64); let session = rt.block_on(async { spawn_remote_server(cmd, opt).await })?; @@ -57,7 +60,7 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> { } SessionOutput::Keep => { debug!("Entering interactive loop over stdin"); - rt.block_on(async { keep_loop(session, mode).await })? + rt.block_on(async { keep_loop(session, mode, timeout).await })? } SessionOutput::Pipe => { debug!("Piping session to stdout"); @@ -79,7 +82,9 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> { // tokio's runtime doesn't support being transferred from // parent to child in a fork let rt = tokio::runtime::Runtime::new()?; - rt.block_on(async { socket_loop(session_socket, session).await })? + rt.block_on(async { + socket_loop(session_socket, session, timeout, fail_if_socket_exists).await + })? } Ok(_) => {} Err(x) => return Err(Error::ForkError(x)), @@ -91,7 +96,9 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> { "Entering interactive loop over unix socket {:?}", session_socket ); - rt.block_on(async { socket_loop(session_socket, session).await })? + rt.block_on(async { + socket_loop(session_socket, session, timeout, fail_if_socket_exists).await + })? } #[cfg(not(unix))] SessionOutput::Socket => { @@ -106,9 +113,9 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> { Ok(()) } -async fn keep_loop(session: Session, mode: Mode) -> io::Result<()> { +async fn keep_loop(session: Session, mode: Mode, duration: Duration) -> io::Result<()> { use crate::cli::subcommand::action::inner; - match Client::tcp_connect(session).await { + match Client::tcp_connect_timeout(session, duration).await { Ok(client) => { let config = match mode { Mode::Json => inner::LoopConfig::Json, @@ -121,11 +128,16 @@ async fn keep_loop(session: Session, mode: Mode) -> io::Result<()> { } #[cfg(unix)] -async fn socket_loop(socket_path: impl AsRef, session: Session) -> io::Result<()> { +async fn socket_loop( + socket_path: impl AsRef, + session: Session, + duration: Duration, + fail_if_socket_exists: bool, +) -> io::Result<()> { // We need to form a connection with the actual server to forward requests // and responses between connections debug!("Connecting to {} {}", session.host, session.port); - let mut client = Client::tcp_connect(session).await?; + let mut client = Client::tcp_connect_timeout(session, duration).await?; // Get a copy of our client's broadcaster so we can have each connection // subscribe to it for new messages filtered by tenant @@ -141,13 +153,19 @@ async fn socket_loop(socket_path: impl AsRef, session: Session) -> io::Res "Forwarding request of type {} to server", req.payload.as_ref() ); - if let Err(x) = client.fire(req).await { + if let Err(x) = client.fire_timeout(req, duration).await { error!("Client failed to send request: {:?}", x); break; } } }); + // Remove the socket file if it already exists + if fail_if_socket_exists && socket_path.as_ref().exists() { + debug!("Removing old unix socket instance"); + tokio::fs::remove_file(socket_path.as_ref()).await?; + } + // Continue to receive connections over the unix socket, store them in our // connection mapping debug!("Binding to unix socket: {:?}", socket_path.as_ref()); diff --git a/src/cli/subcommand/listen/handler.rs b/src/cli/subcommand/listen/handler.rs index ec7d189..107ca0d 100644 --- a/src/cli/subcommand/listen/handler.rs +++ b/src/cli/subcommand/listen/handler.rs @@ -8,6 +8,7 @@ use crate::core::{ }; use log::*; use std::{ + env, error::Error, net::SocketAddr, path::{Path, PathBuf}, @@ -51,7 +52,8 @@ pub(super) async fn process( depth, absolute, canonicalize, - } => dir_read(path, depth, absolute, canonicalize).await, + include_root, + } => dir_read(path, depth, absolute, canonicalize, include_root).await, RequestPayload::DirCreate { path, all } => dir_create(path, all).await, RequestPayload::Remove { path, force } => remove(path, force).await, RequestPayload::Copy { src, dst } => copy(src, dst).await, @@ -63,6 +65,7 @@ pub(super) async fn process( RequestPayload::ProcKill { id } => proc_kill(state, id).await, RequestPayload::ProcStdin { id, data } => proc_stdin(state, id, data).await, RequestPayload::ProcList {} => proc_list(state).await, + RequestPayload::SystemInfo {} => system_info().await, } } @@ -125,12 +128,14 @@ async fn dir_read( depth: usize, absolute: bool, canonicalize: bool, + include_root: bool, ) -> Result> { // Canonicalize our provided path to ensure that it is exists, not a loop, and absolute let root_path = tokio::fs::canonicalize(path).await?; - // Traverse, but don't include root directory in entries (hence min depth 1) - let dir = WalkDir::new(root_path.as_path()).min_depth(1); + // Traverse, but don't include root directory in entries (hence min depth 1), unless indicated + // to do so (min depth 0) + let dir = WalkDir::new(root_path.as_path()).min_depth(if include_root { 0 } else { 1 }); // If depth > 0, will recursively traverse to specified max depth, otherwise // performs infinite traversal @@ -140,9 +145,21 @@ async fn dir_read( let mut entries = Vec::new(); let mut errors = Vec::new(); + #[inline] + fn map_file_type(ft: std::fs::FileType) -> FileType { + if ft.is_dir() { + FileType::Dir + } else if ft.is_file() { + FileType::File + } else { + FileType::SymLink + } + } + for entry in dir { match entry.map_err(data::Error::from) { - Ok(e) => { + // For entries within the root, we want to transform the path based on flags + Ok(e) if e.depth() > 0 => { // Canonicalize the path if specified, otherwise just return // the path as is let mut path = if canonicalize { @@ -171,16 +188,20 @@ async fn dir_read( entries.push(DirEntry { path, - file_type: if e.file_type().is_dir() { - FileType::Dir - } else if e.file_type().is_file() { - FileType::File - } else { - FileType::SymLink - }, + file_type: map_file_type(e.file_type()), depth: e.depth(), }); } + + // For the root, we just want to echo back the entry as is + Ok(e) => { + entries.push(DirEntry { + path: e.path().to_path_buf(), + file_type: map_file_type(e.file_type()), + depth: e.depth(), + }); + } + Err(x) => errors.push(x), } } @@ -526,3 +547,13 @@ async fn proc_list(state: HState) -> Result> { .collect(), }) } + +async fn system_info() -> Result> { + Ok(ResponsePayload::SystemInfo { + family: env::consts::FAMILY.to_string(), + os: env::consts::OS.to_string(), + arch: env::consts::ARCH.to_string(), + current_dir: env::current_dir().unwrap_or_default(), + main_separator: std::path::MAIN_SEPARATOR, + }) +} diff --git a/src/core/constants.rs b/src/core/constants.rs index 18b9541..a0abca3 100644 --- a/src/core/constants.rs +++ b/src/core/constants.rs @@ -1,5 +1,9 @@ use std::{env, path::PathBuf}; +/// Represents maximum time (in milliseconds) to wait on a network request +/// before failing (0 meaning indefinitely) +pub const TIMEOUT: usize = 15000; + /// Capacity associated with a client broadcasting its received messages that /// do not have a callback associated pub const CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100; @@ -14,6 +18,8 @@ pub const MAX_PIPE_CHUNK_SIZE: usize = 1024; pub const SALT_LEN: usize = 16; lazy_static::lazy_static! { + pub static ref TIMEOUT_STR: String = TIMEOUT.to_string(); + /// Represents the path to the global session file pub static ref SESSION_FILE_PATH: PathBuf = env::temp_dir().join("distant.session"); pub static ref SESSION_FILE_PATH_STR: String = SESSION_FILE_PATH.to_string_lossy().to_string(); diff --git a/src/core/data.rs b/src/core/data.rs index 314ba8a..1042379 100644 --- a/src/core/data.rs +++ b/src/core/data.rs @@ -117,6 +117,14 @@ pub enum RequestPayload { /// returned, even if canonicalize is flagged as true #[structopt(short, long)] canonicalize: bool, + + /// Whether or not to include the root directory in the retrieved + /// entries + /// + /// If included, the root directory will also be a canonicalized, + /// absolute path and will not follow any of the other flags + #[structopt(long)] + include_root: bool, }, /// Creates a directory on the remote machine @@ -190,6 +198,9 @@ pub enum RequestPayload { /// Retrieve a list of all processes being managed by the remote server ProcList {}, + + /// Retrieve information about the server and the system it is on + SystemInfo {}, } /// Represents an response to a request performed on the remote machine @@ -316,6 +327,28 @@ pub enum ResponsePayload { /// List of managed processes entries: Vec, }, + + /// Response to retrieving information about the server and the system it is on + SystemInfo { + /// Family of the operating system as described in + /// https://doc.rust-lang.org/std/env/consts/constant.FAMILY.html + family: String, + + /// Name of the specific operating system as described in + /// https://doc.rust-lang.org/std/env/consts/constant.OS.html + os: String, + + /// Architecture of the CPI as described in + /// https://doc.rust-lang.org/std/env/consts/constant.ARCH.html + arch: String, + + /// Current working directory of the running server process + current_dir: PathBuf, + + /// Primary separator for path components for the current platform + /// as defined in https://doc.rust-lang.org/std/path/constant.MAIN_SEPARATOR.html + main_separator: char, + }, } /// Represents information about a single entry within a directory diff --git a/src/core/net/mod.rs b/src/core/net/mod.rs index 9cb9f8a..a01614a 100644 --- a/src/core/net/mod.rs +++ b/src/core/net/mod.rs @@ -5,16 +5,19 @@ use crate::core::{ constants::CLIENT_BROADCAST_CHANNEL_CAPACITY, data::{Request, Response}, session::Session, + utils, }; use log::*; use std::{ collections::HashMap, + convert, sync::{Arc, Mutex}, }; use tokio::{ io, net::TcpStream, sync::{broadcast, oneshot}, + time::Duration, }; use tokio_stream::wrappers::BroadcastStream; @@ -53,6 +56,13 @@ impl Client { ); Self::inner_connect(transport).await } + + /// Connect to a remote TCP session, timing out after duration has passed + pub async fn tcp_connect_timeout(session: Session, duration: Duration) -> io::Result { + utils::timeout(duration, Self::tcp_connect(session)) + .await + .and_then(convert::identity) + } } #[cfg(unix)] @@ -72,6 +82,17 @@ impl Client { ); Self::inner_connect(transport).await } + + /// Connect to a proxy unix socket, timing out after duration has passed + pub async fn unix_connect_timeout( + path: impl AsRef, + auth_key: Option>, + duration: Duration, + ) -> io::Result { + utils::timeout(duration, Self::unix_connect(path, auth_key)) + .await + .and_then(convert::identity) + } } impl Client @@ -144,6 +165,18 @@ where .map_err(|x| TransportError::from(io::Error::new(io::ErrorKind::ConnectionAborted, x))) } + /// Sends a request and waits for a response, timing out after duration has passed + pub async fn send_timeout( + &mut self, + req: Request, + duration: Duration, + ) -> Result { + utils::timeout(duration, self.send(req)) + .await + .map_err(TransportError::from) + .and_then(convert::identity) + } + /// Sends a request without waiting for a response /// /// Any response that would be received gets sent over the broadcast channel instead @@ -151,6 +184,18 @@ where self.t_write.send(req).await } + /// Sends a request without waiting for a response, timing out after duration has passed + pub async fn fire_timeout( + &mut self, + req: Request, + duration: Duration, + ) -> Result<(), TransportError> { + utils::timeout(duration, self.fire(req)) + .await + .map_err(TransportError::from) + .and_then(convert::identity) + } + /// Clones a new instance of the broadcaster used by the client pub fn to_response_broadcaster(&self) -> broadcast::Sender { self.broadcast.clone() diff --git a/src/core/utils.rs b/src/core/utils.rs index 8a72308..f708041 100644 --- a/src/core/utils.rs +++ b/src/core/utils.rs @@ -1,10 +1,26 @@ -use std::ops::{Deref, DerefMut}; +use std::{ + future::Future, + ops::{Deref, DerefMut}, + time::Duration, +}; +use tokio::{io, time}; // Generates a new tenant name pub fn new_tenant() -> String { format!("tenant_{}{}", rand::random::(), rand::random::()) } +// Wraps a future in a tokio timeout call, transforming the error into +// an io error +pub async fn timeout(d: Duration, f: F) -> io::Result +where + F: Future, +{ + time::timeout(d, f) + .await + .map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x)) +} + /// Wraps a string to provide some friendly read and write methods #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct StringBuf(String);