Update stdin/stdout/stderr pipes to read into buffers instead of waiting for newlines

pull/38/head v0.7.0
Chip Senkbeil 3 years ago
parent a15a707f1d
commit b3a4d79507
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

2
Cargo.lock generated

@ -179,7 +179,7 @@ dependencies = [
[[package]]
name = "distant"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"bytes",
"derive_more",

@ -2,7 +2,7 @@
name = "distant"
description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"]
version = "0.6.0"
version = "0.7.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant"

@ -1,8 +1,10 @@
use crate::{
cli::opt::Mode,
core::{
constants::MAX_PIPE_CHUNK_SIZE,
data::{Request, RequestPayload, Response, ResponsePayload},
net::{Client, DataStream},
utils::StringBuf,
},
};
use derive_more::IsVariant;
@ -53,10 +55,12 @@ where
// We also want to spawn a task to handle sending stdin to the remote process
let mut rx = spawn_stdin_reader();
tokio::spawn(async move {
while let Some(line) = rx.recv().await {
let mut buf = StringBuf::new();
while let Some(data) = rx.recv().await {
match config {
// Special exit condition for interactive mode
_ if line.trim() == "exit" => {
_ if buf.trim() == "exit" => {
if let Err(_) = tx_stop.send(()) {
error!("Failed to close interactive loop!");
}
@ -65,61 +69,80 @@ where
// For json mode, all stdin is treated as individual requests
LoopConfig::Json => {
debug!("Client sending request: {:?}", line);
let result = serde_json::from_str(&line)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result {
Ok(req) => match client.send(req).await {
Ok(res) => match format_response(Mode::Json, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
buf.push_str(&data);
let (lines, new_buf) = buf.into_full_lines();
buf = new_buf;
// For each complete line, parse it as json and
if let Some(lines) = lines {
for data in lines.lines() {
debug!("Client sending request: {:?}", data);
let result = serde_json::from_str(&data)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result {
Ok(req) => match client.send(req).await {
Ok(res) => match format_response(Mode::Json, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
}
},
Err(x) => {
error!("Failed to serialize request ('{}'): {}", data, x);
}
}
},
Err(x) => {
error!("Failed to serialize request: {}", x);
}
}
}
// For interactive shell mode, parse stdin as individual commands
LoopConfig::Shell => {
if line.trim().is_empty() {
continue;
}
buf.push_str(&data);
let (lines, new_buf) = buf.into_full_lines();
buf = new_buf;
debug!("Client sending command: {:?}", line);
if let Some(lines) = lines {
for data in lines.lines() {
trace!("Shell processing line: {:?}", data);
if data.trim().is_empty() {
continue;
}
// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
let payload_result = RequestPayload::from_iter_safe(
std::iter::once("distant")
.chain(line.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => {
match client.send(Request::new(tenant.as_str(), payload)).await {
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
debug!("Client sending command: {:?}", data);
// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
let payload_result = RequestPayload::from_iter_safe(
std::iter::once("distant")
.chain(data.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => {
match client.send(Request::new(tenant.as_str(), payload)).await
{
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
}
}
}
Err(x) => {
error!("Failed to send request: {}", x)
error!("Failed to parse command: {}", x);
}
}
}
Err(x) => {
error!("Failed to parse command: {}", x);
}
}
}
// For non-interactive shell mode, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => {
debug!("Client sending stdin: {:?}", line);
let req = Request::new(tenant.as_str(), RequestPayload::ProcStdin { id, line });
debug!("Client sending stdin: {:?}", data);
let req = Request::new(tenant.as_str(), RequestPayload::ProcStdin { id, data });
let result = client.send(req).await;
if let Err(x) = result {
@ -163,18 +186,28 @@ fn spawn_stdin_reader() -> mpsc::Receiver<String> {
// NOTE: Using blocking I/O per tokio's advice to read from stdin line-by-line and then
// pass the results to a separate async handler to forward to the remote process
std::thread::spawn(move || {
let stdin = std::io::stdin();
use std::io::{self, BufReader, Read};
let mut stdin = BufReader::new(io::stdin());
// Maximum chunk that we expect to read at any one time
let mut buf = [0; MAX_PIPE_CHUNK_SIZE];
loop {
let mut line = String::new();
match stdin.read_line(&mut line) {
match stdin.read(&mut buf) {
Ok(0) | Err(_) => break,
Ok(_) => {
if let Err(x) = tx.blocking_send(line) {
error!(
"Failed to pass along stdin to be sent to remote process: {}",
x
);
Ok(n) => {
match String::from_utf8(buf[..n].to_vec()) {
Ok(text) => {
if let Err(x) = tx.blocking_send(text) {
error!(
"Failed to pass along stdin to be sent to remote process: {}",
x
);
}
}
Err(x) => {
error!("Input over stdin is invalid: {}", x);
}
}
std::thread::yield_now();
}
@ -188,15 +221,19 @@ fn spawn_stdin_reader() -> mpsc::Receiver<String> {
/// Represents the output content and destination
pub enum ResponseOut {
Stdout(String),
StdoutLine(String),
Stderr(String),
StderrLine(String),
None,
}
impl ResponseOut {
pub fn print(self) {
match self {
Self::Stdout(x) => println!("{}", x),
Self::Stderr(x) => eprintln!("{}", x),
Self::Stdout(x) => print!("{}", x),
Self::StdoutLine(x) => println!("{}", x),
Self::Stderr(x) => eprint!("{}", x),
Self::StderrLine(x) => eprintln!("{}", x),
Self::None => {}
}
}
@ -204,7 +241,7 @@ impl ResponseOut {
pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
Ok(match mode {
Mode::Json => ResponseOut::Stdout(format!(
Mode::Json => ResponseOut::StdoutLine(format!(
"{}",
serde_json::to_string(&res)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
@ -217,13 +254,13 @@ fn format_shell(res: Response) -> ResponseOut {
match res.payload {
ResponsePayload::Ok => ResponseOut::None,
ResponsePayload::Error { description } => {
ResponseOut::Stderr(format!("Failed: '{}'.", description))
ResponseOut::StderrLine(format!("Failed: '{}'.", description))
}
ResponsePayload::Blob { data } => {
ResponseOut::Stdout(String::from_utf8_lossy(&data).to_string())
ResponseOut::StdoutLine(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::Text { data } => ResponseOut::Stdout(data),
ResponsePayload::DirEntries { entries, .. } => ResponseOut::Stdout(format!(
ResponsePayload::Text { data } => ResponseOut::StdoutLine(data),
ResponsePayload::DirEntries { entries, .. } => ResponseOut::StdoutLine(format!(
"{}",
entries
.into_iter()
@ -247,7 +284,7 @@ fn format_shell(res: Response) -> ResponseOut {
.collect::<Vec<String>>()
.join("\n"),
)),
ResponsePayload::Metadata { data } => ResponseOut::Stdout(format!(
ResponsePayload::Metadata { data } => ResponseOut::StdoutLine(format!(
concat!(
"Type: {}\n",
"Len: {}\n",
@ -263,7 +300,7 @@ fn format_shell(res: Response) -> ResponseOut {
data.accessed.unwrap_or_default(),
data.modified.unwrap_or_default(),
)),
ResponsePayload::ProcEntries { entries } => ResponseOut::Stdout(format!(
ResponsePayload::ProcEntries { entries } => ResponseOut::StdoutLine(format!(
"{}",
entries
.into_iter()
@ -272,15 +309,15 @@ fn format_shell(res: Response) -> ResponseOut {
.join("\n"),
)),
ResponsePayload::ProcStart { .. } => ResponseOut::None,
ResponsePayload::ProcStdout { line, .. } => ResponseOut::Stdout(line),
ResponsePayload::ProcStderr { line, .. } => ResponseOut::Stderr(line),
ResponsePayload::ProcStdout { data, .. } => ResponseOut::Stdout(data),
ResponsePayload::ProcStderr { data, .. } => ResponseOut::Stderr(data),
ResponsePayload::ProcDone { id, success, code } => {
if success {
ResponseOut::None
} else if let Some(code) = code {
ResponseOut::Stderr(format!("Proc {} failed with code {}", id, code))
ResponseOut::StderrLine(format!("Proc {} failed with code {}", id, code))
} else {
ResponseOut::Stderr(format!("Proc {} failed", id))
ResponseOut::StderrLine(format!("Proc {} failed", id))
}
}
}

@ -1,4 +1,5 @@
use crate::core::{
constants::MAX_PIPE_CHUNK_SIZE,
data::{
self, DirEntry, FileType, Metadata, Request, RequestPayload, Response, ResponsePayload,
RunningProcess,
@ -15,7 +16,7 @@ use std::{
time::SystemTime,
};
use tokio::{
io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader},
io::{self, AsyncReadExt, AsyncWriteExt},
process::Command,
sync::{mpsc, oneshot, Mutex},
};
@ -60,7 +61,7 @@ pub(super) async fn process(
proc_run(tenant.to_string(), addr, state, tx, cmd, args).await
}
RequestPayload::ProcKill { id } => proc_kill(state, id).await,
RequestPayload::ProcStdin { id, line } => proc_stdin(state, id, line).await,
RequestPayload::ProcStdin { id, data } => proc_stdin(state, id, data).await,
RequestPayload::ProcList {} => proc_list(state).await,
}
}
@ -311,26 +312,33 @@ async fn proc_run(
// Spawn a task that sends stdout as a response
let tx_2 = tx.clone();
let tenant_2 = tenant.clone();
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let mut stdout = child.stdout.take().unwrap();
let stdout_task = tokio::spawn(async move {
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop {
match stdout.next_line().await {
Ok(Some(line)) => {
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStdout { id, line },
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx_2.send(res).await {
match stdout.read(&mut buf).await {
Ok(n) if n > 0 => match String::from_utf8(buf[..n].to_vec()) {
Ok(data) => {
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStdout { id, data },
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx_2.send(res).await {
break;
}
}
Err(x) => {
error!("Invalid data read from stdout pipe: {}", x);
break;
}
}
Ok(None) => break,
},
Ok(_) => break,
Err(_) => break,
}
}
@ -339,26 +347,33 @@ async fn proc_run(
// Spawn a task that sends stderr as a response
let tx_2 = tx.clone();
let tenant_2 = tenant.clone();
let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines();
let mut stderr = child.stderr.take().unwrap();
let stderr_task = tokio::spawn(async move {
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop {
match stderr.next_line().await {
Ok(Some(line)) => {
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStderr { id, line },
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx_2.send(res).await {
match stderr.read(&mut buf).await {
Ok(n) if n > 0 => match String::from_utf8(buf[..n].to_vec()) {
Ok(data) => {
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStderr { id, data },
);
debug!(
"<Client @ {}> Sending response of type {}",
addr,
res.payload.as_ref()
);
if let Err(_) = tx_2.send(res).await {
break;
}
}
Err(x) => {
error!("Invalid data read from stdout pipe: {}", x);
break;
}
}
Ok(None) => break,
},
Ok(_) => break,
Err(_) => break,
}
}
@ -368,17 +383,7 @@ async fn proc_run(
let mut stdin = child.stdin.take().unwrap();
let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(1);
tokio::spawn(async move {
while let Some(mut line) = stdin_rx.recv().await {
// NOTE: If the line coming in does not have a newline character,
// we must add it to properly flush to stdin of process
//
// Given that our data structure is called line, we assume
// that it represents a complete line whether or not it
// ends with the linefeed character
if !line.ends_with('\n') {
line.push('\n');
}
while let Some(line) = stdin_rx.recv().await {
if let Err(x) = stdin.write_all(line.as_bytes()).await {
error!("Failed to send stdin to process {}: {}", id, x);
break;
@ -495,10 +500,10 @@ async fn proc_kill(state: HState, id: usize) -> Result<ResponsePayload, Box<dyn
async fn proc_stdin(
state: HState,
id: usize,
line: String,
data: String,
) -> Result<ResponsePayload, Box<dyn Error>> {
if let Some(process) = state.lock().await.processes.get(&id) {
process.stdin_tx.send(line).await.map_err(|_| {
process.stdin_tx.send(data).await.map_err(|_| {
io::Error::new(io::ErrorKind::BrokenPipe, "Unable to send stdin to process")
})?;
}

@ -2,10 +2,16 @@ use std::{env, path::PathBuf};
/// Capacity associated with a client broadcasting its received messages that
/// do not have a callback associated
pub static CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100;
pub const CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100;
/// Represents the maximum size (in bytes) that data will be read from pipes
/// per individual `read` call
///
/// Current setting is 1k size
pub const MAX_PIPE_CHUNK_SIZE: usize = 1024;
/// Represents the length of the salt to use for encryption
pub static SALT_LEN: usize = 16;
pub const SALT_LEN: usize = 16;
lazy_static::lazy_static! {
/// Represents the path to the global session file

@ -184,8 +184,8 @@ pub enum RequestPayload {
/// Id of the actively-running process to send stdin data
id: usize,
/// Complete line to stdin of process
line: String,
/// Data to send to a process's stdin pipe
data: String,
},
/// Retrieve a list of all processes being managed by the remote server
@ -286,8 +286,8 @@ pub enum ResponsePayload {
/// Arbitrary id associated with running process
id: usize,
/// Line sent to stdout by the process
line: String,
/// Data read from a process' stdout pipe
data: String,
},
/// Actively-transmitted stderr as part of running process
@ -295,8 +295,8 @@ pub enum ResponsePayload {
/// Arbitrary id associated with running process
id: usize,
/// Line sent to stderr by the process
line: String,
/// Data read from a process' stderr pipe
data: String,
},
/// Response to a process finishing

@ -1,4 +1,56 @@
use std::ops::{Deref, DerefMut};
// Generates a new tenant name
pub fn new_tenant() -> String {
format!("tenant_{}{}", rand::random::<u16>(), rand::random::<u8>())
}
/// Wraps a string to provide some friendly read and write methods
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StringBuf(String);
impl StringBuf {
pub fn new() -> Self {
Self(String::new())
}
/// Consumes data within the buffer that represent full lines (end with a newline) and returns
/// the string containing those lines.
///
/// The remaining buffer contains are returned as the second part of the tuple
pub fn into_full_lines(mut self) -> (Option<String>, StringBuf) {
match self.rfind('\n') {
Some(idx) => {
let remaining = self.0.split_off(idx + 1);
(Some(self.0), Self(remaining))
}
None => (None, self),
}
}
}
impl From<String> for StringBuf {
fn from(x: String) -> Self {
Self(x)
}
}
impl From<StringBuf> for String {
fn from(x: StringBuf) -> Self {
x.0
}
}
impl Deref for StringBuf {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for StringBuf {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

Loading…
Cancel
Save