Refactor ProcStdin to send a string representing a line instead of a vec of bytes

pull/38/head
Chip Senkbeil 3 years ago
parent 4edf8021cc
commit 638638f332
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -119,13 +119,7 @@ where
// For non-interactive shell mode, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => {
debug!("Client sending stdin: {:?}", line);
let req = Request::new(
tenant.as_str(),
RequestPayload::ProcStdin {
id,
data: line.into_bytes(),
},
);
let req = Request::new(tenant.as_str(), RequestPayload::ProcStdin { id, line });
let result = client.send(req).await;
if let Err(x) = result {

@ -58,7 +58,7 @@ pub(super) async fn process(
proc_run(tenant.to_string(), addr, state, tx, cmd, args).await
}
RequestPayload::ProcKill { id } => proc_kill(state, id).await,
RequestPayload::ProcStdin { id, data } => proc_stdin(state, id, data).await,
RequestPayload::ProcStdin { id, line } => proc_stdin(state, id, line).await,
RequestPayload::ProcList {} => proc_list(state).await,
}
}
@ -312,7 +312,6 @@ async fn proc_run(
let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
let stdout_task = tokio::spawn(async move {
loop {
trace!("Reading stdout...");
match stdout.next_line().await {
Ok(Some(line)) => {
let res = Response::new(
@ -365,10 +364,20 @@ async fn proc_run(
// Spawn a task that sends stdin to the process
let mut stdin = child.stdin.take().unwrap();
let (stdin_tx, mut stdin_rx) = mpsc::channel::<Vec<u8>>(1);
let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(1);
tokio::spawn(async move {
while let Some(data) = stdin_rx.recv().await {
if let Err(x) = stdin.write_all(&data).await {
while let Some(mut line) = stdin_rx.recv().await {
// NOTE: If the line coming in does not have a newline character,
// we must add it to properly flush to stdin of process
//
// Given that our data structure is called line, we assume
// that it represents a complete line whether or not it
// ends with the linefeed character
if !line.ends_with('\n') {
line.push('\n');
}
if let Err(x) = stdin.write_all(line.as_bytes()).await {
error!("Failed to send stdin to process {}: {}", id, x);
break;
}
@ -492,10 +501,10 @@ async fn proc_kill(state: HState, id: usize) -> Result<ResponsePayload, Box<dyn
async fn proc_stdin(
state: HState,
id: usize,
data: Vec<u8>,
line: String,
) -> Result<ResponsePayload, Box<dyn Error>> {
if let Some(process) = state.lock().await.processes.get(&id) {
process.stdin_tx.send(data).await.map_err(|_| {
process.stdin_tx.send(line).await.map_err(|_| {
io::Error::new(io::ErrorKind::BrokenPipe, "Unable to send stdin to process")
})?;
}

@ -65,7 +65,7 @@ struct Process {
pub id: usize,
pub cmd: String,
pub args: Vec<String>,
pub stdin_tx: mpsc::Sender<Vec<u8>>,
pub stdin_tx: mpsc::Sender<String>,
pub kill_tx: oneshot::Sender<()>,
}

@ -184,8 +184,8 @@ pub enum RequestPayload {
/// Id of the actively-running process to send stdin data
id: usize,
/// Data to send to stdin of process
data: Vec<u8>,
/// Complete line to stdin of process
line: String,
},
/// Retrieve a list of all processes being managed by the remote server

Loading…
Cancel
Save