Add features and fixes for upcoming 0.6.0

1. Capture errors when listing directory contents and report them
   as part of the response instead of exiting on first error
2. Refactor DirRead request to support providing a depth (instead
   of the "all" flag), canonicalizing the paths, and have the choice
   of returning absolute paths instead of relative
3. Fix forked process for launch not connecting over TCP to
   server due to tokio runtime being inherited from parent
4. Fix leftover launch process caused by forking w/ the old runtime
5. Fix stdout/stderr of running processes not being reported
   (when process looping and not returning) by wrapping stdout/stderr
   in `BufReader` and sending back one line at a time for each
6. Refactor ProcStdout and ProcStderr responses to send back a line
   field that is a string instead of a data field that is a byte vec
   as we are now reading and sending back whole lines, which makes
   more sense and aligns with output flushing and common stdout/stderr
   processing by other programs
pull/38/head
Chip Senkbeil 3 years ago
parent 046f71ab6e
commit 4edf8021cc
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

2
Cargo.lock generated

@ -179,7 +179,7 @@ dependencies = [
[[package]]
name = "distant"
version = "0.5.0"
version = "0.6.0"
dependencies = [
"bytes",
"derive_more",

@ -2,7 +2,7 @@
name = "distant"
description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"]
version = "0.5.0"
version = "0.6.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant"

@ -201,8 +201,8 @@ pub enum ResponseOut {
impl ResponseOut {
pub fn print(self) {
match self {
Self::Stdout(x) => print!("{}", x),
Self::Stderr(x) => eprint!("{}", x),
Self::Stdout(x) => println!("{}", x),
Self::Stderr(x) => eprintln!("{}", x),
Self::None => {}
}
}
@ -211,7 +211,7 @@ impl ResponseOut {
pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
Ok(match mode {
Mode::Json => ResponseOut::Stdout(format!(
"{}\n",
"{}",
serde_json::to_string(&res)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
)),
@ -223,14 +223,14 @@ fn format_shell(res: Response) -> ResponseOut {
match res.payload {
ResponsePayload::Ok => ResponseOut::None,
ResponsePayload::Error { description } => {
ResponseOut::Stderr(format!("Failed: '{}'.\n", description))
ResponseOut::Stderr(format!("Failed: '{}'.", description))
}
ResponsePayload::Blob { data } => {
ResponseOut::Stdout(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::Text { data } => ResponseOut::Stdout(data),
ResponsePayload::DirEntries { entries } => ResponseOut::Stdout(format!(
"{}\n",
ResponsePayload::DirEntries { entries, .. } => ResponseOut::Stdout(format!(
"{}",
entries
.into_iter()
.map(|entry| {
@ -238,6 +238,12 @@ fn format_shell(res: Response) -> ResponseOut {
"{}{}",
entry.path.as_os_str().to_string_lossy(),
if entry.file_type.is_dir() {
// NOTE: This can be different from the server if
// the server OS is unix and the client is
// not or vice versa; for now, this doesn't
// matter as we only support unix-based
// operating systems, but something to keep
// in mind
std::path::MAIN_SEPARATOR.to_string()
} else {
String::new()
@ -254,7 +260,7 @@ fn format_shell(res: Response) -> ResponseOut {
"Readonly: {}\n",
"Created: {}\n",
"Last Accessed: {}\n",
"Last Modified: {}\n",
"Last Modified: {}",
),
data.file_type.as_ref(),
data.len,
@ -264,7 +270,7 @@ fn format_shell(res: Response) -> ResponseOut {
data.modified.unwrap_or_default(),
)),
ResponsePayload::ProcEntries { entries } => ResponseOut::Stdout(format!(
"{}\n",
"{}",
entries
.into_iter()
.map(|entry| format!("{}: {} {}", entry.id, entry.cmd, entry.args.join(" ")))
@ -272,19 +278,15 @@ fn format_shell(res: Response) -> ResponseOut {
.join("\n"),
)),
ResponsePayload::ProcStart { .. } => ResponseOut::None,
ResponsePayload::ProcStdout { data, .. } => {
ResponseOut::Stdout(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::ProcStderr { data, .. } => {
ResponseOut::Stderr(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::ProcStdout { line, .. } => ResponseOut::Stdout(line),
ResponsePayload::ProcStderr { line, .. } => ResponseOut::Stderr(line),
ResponsePayload::ProcDone { id, success, code } => {
if success {
ResponseOut::None
} else if let Some(code) = code {
ResponseOut::Stderr(format!("Proc {} failed with code {}\n", id, code))
ResponseOut::Stderr(format!("Proc {} failed with code {}", id, code))
} else {
ResponseOut::Stderr(format!("Proc {} failed\n", id))
ResponseOut::Stderr(format!("Proc {} failed", id))
}
}
}

@ -62,8 +62,17 @@ pub fn run(cmd: LaunchSubcommand, opt: CommonOpt) -> Result<(), Error> {
"Forking and entering interactive loop over unix socket {:?}",
session_socket
);
// Force runtime shutdown by dropping it BEFORE forking as otherwise
// this produces a garbage process that won't die
drop(rt);
match daemon(false, false) {
Ok(Fork::Child) => {
// NOTE: We need to create a runtime within the forked process as
// tokio's runtime doesn't support being transferred from
// parent to child in a fork
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { socket_loop(session_socket, session).await })?
}
Ok(_) => {}
@ -109,13 +118,16 @@ async fn keep_loop(session: Session, mode: Mode) -> io::Result<()> {
async fn socket_loop(socket_path: impl AsRef<Path>, session: Session) -> io::Result<()> {
// We need to form a connection with the actual server to forward requests
// and responses between connections
debug!("Connecting to {} {}", session.host, session.port);
let mut client = Client::tcp_connect(session).await?;
// Get a copy of our client's broadcaster so we can have each connection
// subscribe to it for new messages filtered by tenant
debug!("Acquiring client broadcaster");
let broadcaster = client.to_response_broadcaster();
// Spawn task to send to the server requests from connections
debug!("Spawning request forwarding task");
let (req_tx, mut req_rx) = mpsc::channel::<Request>(CLIENT_BROADCAST_CHANNEL_CAPACITY);
tokio::spawn(async move {
while let Some(req) = req_rx.recv().await {
@ -132,7 +144,9 @@ async fn socket_loop(socket_path: impl AsRef<Path>, session: Session) -> io::Res
// Continue to receive connections over the unix socket, store them in our
// connection mapping
debug!("Binding to unix socket: {:?}", socket_path.as_ref());
let listener = tokio::net::UnixListener::bind(socket_path)?;
while let Ok((conn, addr)) = listener.accept().await {
// Establish a proper connection via a handshake, discarding the connection otherwise
let transport = match Transport::from_handshake(conn, None).await {

@ -1,14 +1,19 @@
use super::{Process, State};
use crate::core::data::{
DirEntry, FileType, Metadata, Request, RequestPayload, Response, ResponsePayload,
self, DirEntry, FileType, Metadata, Request, RequestPayload, Response, ResponsePayload,
RunningProcess,
};
use log::*;
use std::{
error::Error, net::SocketAddr, path::PathBuf, process::Stdio, sync::Arc, time::SystemTime,
error::Error,
net::SocketAddr,
path::{Path, PathBuf},
process::Stdio,
sync::Arc,
time::SystemTime,
};
use tokio::{
io::{self, AsyncReadExt, AsyncWriteExt},
io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader},
process::Command,
sync::{mpsc, oneshot, Mutex},
};
@ -38,7 +43,12 @@ pub(super) async fn process(
RequestPayload::FileWriteText { path, text } => file_write(path, text).await,
RequestPayload::FileAppend { path, data } => file_append(path, data).await,
RequestPayload::FileAppendText { path, text } => file_append(path, text).await,
RequestPayload::DirRead { path, all } => dir_read(path, all).await,
RequestPayload::DirRead {
path,
depth,
absolute,
canonicalize,
} => dir_read(path, depth, absolute, canonicalize).await,
RequestPayload::DirCreate { path, all } => dir_create(path, all).await,
RequestPayload::Remove { path, force } => remove(path, force).await,
RequestPayload::Copy { src, dst } => copy(src, dst).await,
@ -107,21 +117,57 @@ async fn file_append(
Ok(ResponsePayload::Ok)
}
async fn dir_read(path: PathBuf, all: bool) -> Result<ResponsePayload, Box<dyn Error>> {
// Traverse, but don't include root directory in entries (hence min depth 1)
let dir = WalkDir::new(path.as_path()).min_depth(1);
// If all, will recursively traverse, otherwise just return directly from dir
let dir = if all { dir } else { dir.max_depth(1) };
async fn dir_read(
path: PathBuf,
depth: usize,
absolute: bool,
canonicalize: bool,
) -> Result<ResponsePayload, Box<dyn Error>> {
// Canonicalize our provided path to ensure that it is exists, not a loop, and absolute
let root_path = tokio::fs::canonicalize(path).await?;
// TODO: Support both returning errors and successfully-traversed entries
// TODO: Support returning full paths instead of always relative?
Ok(ResponsePayload::DirEntries {
entries: dir
.into_iter()
.map(|e| {
e.map(|e| DirEntry {
path: e.path().strip_prefix(path.as_path()).unwrap().to_path_buf(),
// Traverse, but don't include root directory in entries (hence min depth 1)
let dir = WalkDir::new(root_path.as_path()).min_depth(1);
// If depth > 0, will recursively traverse to specified max depth, otherwise
// performs infinite traversal
let dir = if depth > 0 { dir.max_depth(depth) } else { dir };
// Determine our entries and errors
let mut entries = Vec::new();
let mut errors = Vec::new();
for entry in dir {
match entry.map_err(data::Error::from) {
Ok(e) => {
// Canonicalize the path if specified, otherwise just return
// the path as is
let mut path = if canonicalize {
match tokio::fs::canonicalize(e.path()).await {
Ok(path) => path,
Err(x) => {
errors.push(data::Error::from(x));
continue;
}
}
} else {
e.path().to_path_buf()
};
// Strip the path of its prefix based if not flagged as absolute
if !absolute {
// NOTE: In the situation where we canonicalized the path earlier,
// there is no guarantee that our root path is still the
// parent of the symlink's destination; so, in that case we MUST just
// return the path if the strip_prefix fails
path = path
.strip_prefix(root_path.as_path())
.map(Path::to_path_buf)
.unwrap_or(path);
};
entries.push(DirEntry {
path,
file_type: if e.file_type().is_dir() {
FileType::Dir
} else if e.file_type().is_file() {
@ -130,10 +176,13 @@ async fn dir_read(path: PathBuf, all: bool) -> Result<ResponsePayload, Box<dyn E
FileType::SymLink
},
depth: e.depth(),
})
})
.collect::<Result<Vec<DirEntry>, walkdir::Error>>()?,
})
});
}
Err(x) => errors.push(x),
}
}
Ok(ResponsePayload::DirEntries { entries, errors })
}
async fn dir_create(path: PathBuf, all: bool) -> Result<ResponsePayload, Box<dyn Error>> {
@ -260,16 +309,16 @@ async fn proc_run(
// Spawn a task that sends stdout as a response
let tx_2 = tx.clone();
let tenant_2 = tenant.clone();
let mut stdout = child.stdout.take().unwrap();
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let stdout_task = tokio::spawn(async move {
loop {
let mut data = Vec::new();
match stdout.read_to_end(&mut data).await {
Ok(n) if n > 0 => {
trace!("Reading stdout...");
match stdout.next_line().await {
Ok(Some(line)) => {
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStdout { id, data },
ResponsePayload::ProcStdout { id, line },
);
debug!(
"<Client @ {}> Sending response of type {}",
@ -280,7 +329,7 @@ async fn proc_run(
break;
}
}
Ok(_) => break,
Ok(None) => break,
Err(_) => break,
}
}
@ -289,16 +338,15 @@ async fn proc_run(
// Spawn a task that sends stderr as a response
let tx_2 = tx.clone();
let tenant_2 = tenant.clone();
let mut stderr = child.stderr.take().unwrap();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
let stderr_task = tokio::spawn(async move {
loop {
let mut data = Vec::new();
match stderr.read_to_end(&mut data).await {
Ok(n) if n > 0 => {
match stderr.next_line().await {
Ok(Some(line)) => {
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStderr { id, data },
ResponsePayload::ProcStderr { id, line },
);
debug!(
"<Client @ {}> Sending response of type {}",
@ -309,7 +357,7 @@ async fn proc_run(
break;
}
}
Ok(_) => break,
Ok(None) => break,
Err(_) => break,
}
}

@ -39,9 +39,15 @@ struct State {
impl State {
/// Cleans up state associated with a particular client
pub async fn cleanup_client(&mut self, addr: SocketAddr) {
debug!("<Client @ {}> Cleaning up state", addr);
if let Some(ids) = self.client_processes.remove(&addr) {
for id in ids {
if let Some(process) = self.processes.remove(&id) {
trace!(
"<Client @ {}> Requesting proc {} be killed",
addr,
process.id
);
if let Err(_) = process.kill_tx.send(()) {
error!(
"Client {} failed to send process {} kill signal",

@ -9,10 +9,10 @@ pub static SALT_LEN: usize = 16;
lazy_static::lazy_static! {
/// Represents the path to the global session file
pub static ref SESSION_FILE_PATH: PathBuf = env::temp_dir().join("distant.session.txt");
pub static ref SESSION_FILE_PATH: PathBuf = env::temp_dir().join("distant.session");
pub static ref SESSION_FILE_PATH_STR: String = SESSION_FILE_PATH.to_string_lossy().to_string();
/// Represents the path to a socket to communicate instead of a session file
pub static ref SESSION_SOCKET_PATH: PathBuf = env::temp_dir().join("distant.session.sock");
pub static ref SESSION_SOCKET_PATH: PathBuf = env::temp_dir().join("distant.sock");
pub static ref SESSION_SOCKET_PATH_STR: String = SESSION_SOCKET_PATH.to_string_lossy().to_string();
}

@ -1,6 +1,6 @@
use derive_more::IsVariant;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{io, path::PathBuf};
use structopt::StructOpt;
use strum::AsRefStr;
@ -98,9 +98,25 @@ pub enum RequestPayload {
/// The path to the directory on the remote machine
path: PathBuf,
/// Whether or not to read subdirectories recursively
/// Maximum depth to traverse with 0 indicating there is no maximum
/// depth and 1 indicating the most immediate children within the
/// directory
#[serde(default = "one")]
#[structopt(short, long, default_value = "1")]
depth: usize,
/// Whether or not to return absolute or relative paths
#[structopt(short, long)]
all: bool,
absolute: bool,
/// Whether or not to canonicalize the resulting paths, meaning
/// returning the canonical, absolute form of a path with all
/// intermediate components normalized and symbolic links resolved
///
/// Note that the flag absolute must be true to have absolute paths
/// returned, even if canonicalize is flagged as true
#[structopt(short, long)]
canonicalize: bool,
},
/// Creates a directory on the remote machine
@ -248,6 +264,9 @@ pub enum ResponsePayload {
DirEntries {
/// Entries contained within the requested directory
entries: Vec<DirEntry>,
/// Errors encountered while scanning for entries
errors: Vec<Error>,
},
/// Response to reading metadata
@ -267,8 +286,8 @@ pub enum ResponsePayload {
/// Arbitrary id associated with running process
id: usize,
/// Data sent to stdout by process
data: Vec<u8>,
/// Line sent to stdout by the process
line: String,
},
/// Actively-transmitted stderr as part of running process
@ -276,8 +295,8 @@ pub enum ResponsePayload {
/// Arbitrary id associated with running process
id: usize,
/// Data sent to stderr by process
data: Vec<u8>,
/// Line sent to stderr by the process
line: String,
},
/// Response to a process finishing
@ -365,3 +384,41 @@ pub struct RunningProcess {
/// Not the same as the process' pid!
pub id: usize,
}
/// General purpose error type that can be sent across the wire
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Error {
/// Label describing the kind of error
pub kind: String,
/// Description of the error itself
pub description: String,
}
impl From<io::Error> for Error {
fn from(x: io::Error) -> Self {
Self {
kind: format!("{:?}", x.kind()),
description: format!("{}", x),
}
}
}
impl From<walkdir::Error> for Error {
fn from(x: walkdir::Error) -> Self {
if x.io_error().is_some() {
x.into_io_error().map(Self::from).unwrap()
} else {
Self {
kind: String::from("Loop"),
description: format!("{}", x),
}
}
}
}
/// Used to provide a default serde value of 1
const fn one() -> usize {
1
}

Loading…
Cancel
Save