Refactor to support a payload for request/response with multiple entries; bump to 0.10.0

pull/38/head v0.10.0
Chip Senkbeil 3 years ago
parent 5a5d7f6909
commit 86e4d7f2fc
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

2
Cargo.lock generated

@ -179,7 +179,7 @@ dependencies = [
[[package]]
name = "distant"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"bytes",
"derive_more",

@ -2,7 +2,7 @@
name = "distant"
description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"]
version = "0.9.5"
version = "0.10.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant"

@ -2,7 +2,7 @@ use crate::{
cli::subcommand,
core::{
constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR, TIMEOUT_STR},
data::RequestPayload,
data::RequestData,
},
};
use derive_more::{Display, Error, From, IsVariant};
@ -168,7 +168,7 @@ pub struct ActionSubcommand {
/// Operation to send over the wire if not in interactive mode
#[structopt(subcommand)]
pub operation: Option<RequestPayload>,
pub operation: Option<RequestData>,
}
/// Represents options for binding a server to an IP address

@ -2,7 +2,7 @@ use crate::{
cli::opt::Mode,
core::{
constants::MAX_PIPE_CHUNK_SIZE,
data::{Request, RequestPayload, Response, ResponsePayload},
data::{Request, RequestData, Response, ResponseData},
net::{Client, DataStream},
utils::StringBuf,
},
@ -114,13 +114,15 @@ where
// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
let payload_result = RequestPayload::from_iter_safe(
let result = RequestData::from_iter_safe(
std::iter::once("distant")
.chain(data.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => {
match client.send(Request::new(tenant.as_str(), payload)).await
match result {
Ok(data) => {
match client
.send(Request::new(tenant.as_str(), vec![data]))
.await
{
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
@ -142,7 +144,8 @@ where
// For non-interactive shell mode, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => {
debug!("Client sending stdin: {:?}", data);
let req = Request::new(tenant.as_str(), RequestPayload::ProcStdin { id, data });
let req =
Request::new(tenant.as_str(), vec![RequestData::ProcStdin { id, data }]);
let result = client.send(req).await;
if let Err(x) = result {
@ -161,7 +164,10 @@ where
"Response stream no longer available",
)
})?;
let done = res.payload.is_proc_done() && config.is_proc();
// NOTE: If the loop is for a proxy process, we should assume that the payload
// is all-or-nothing for the done check
let done = config.is_proc() && res.payload.iter().any(|x| x.is_proc_done());
format_response(config.into(), res)?.print();
@ -257,27 +263,40 @@ impl ResponseOut {
}
pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
let payload_cnt = res.payload.len();
Ok(match mode {
Mode::Json => ResponseOut::StdoutLine(format!(
"{}",
serde_json::to_string(&res)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
)),
Mode::Shell => format_shell(res),
// NOTE: For shell, we assume a singular entry in the response's payload
Mode::Shell if payload_cnt != 1 => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Got {} entries in payload data, but shell expects exactly 1",
payload_cnt
),
))
}
Mode::Shell => format_shell(res.payload.into_iter().next().unwrap()),
})
}
fn format_shell(res: Response) -> ResponseOut {
match res.payload {
ResponsePayload::Ok => ResponseOut::None,
ResponsePayload::Error { description } => {
fn format_shell(data: ResponseData) -> ResponseOut {
match data {
ResponseData::Ok => ResponseOut::None,
ResponseData::Error { description } => {
ResponseOut::StderrLine(format!("Failed: '{}'.", description))
}
ResponsePayload::Blob { data } => {
ResponseData::Blob { data } => {
ResponseOut::StdoutLine(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::Text { data } => ResponseOut::StdoutLine(data),
ResponsePayload::DirEntries { entries, .. } => ResponseOut::StdoutLine(format!(
ResponseData::Text { data } => ResponseOut::StdoutLine(data),
ResponseData::DirEntries { entries, .. } => ResponseOut::StdoutLine(format!(
"{}",
entries
.into_iter()
@ -301,7 +320,7 @@ fn format_shell(res: Response) -> ResponseOut {
.collect::<Vec<String>>()
.join("\n"),
)),
ResponsePayload::Metadata {
ResponseData::Metadata {
canonicalized_path,
file_type,
len,
@ -329,7 +348,7 @@ fn format_shell(res: Response) -> ResponseOut {
accessed.unwrap_or_default(),
modified.unwrap_or_default(),
)),
ResponsePayload::ProcEntries { entries } => ResponseOut::StdoutLine(format!(
ResponseData::ProcEntries { entries } => ResponseOut::StdoutLine(format!(
"{}",
entries
.into_iter()
@ -337,10 +356,10 @@ fn format_shell(res: Response) -> ResponseOut {
.collect::<Vec<String>>()
.join("\n"),
)),
ResponsePayload::ProcStart { .. } => ResponseOut::None,
ResponsePayload::ProcStdout { data, .. } => ResponseOut::Stdout(data),
ResponsePayload::ProcStderr { data, .. } => ResponseOut::Stderr(data),
ResponsePayload::ProcDone { id, success, code } => {
ResponseData::ProcStart { .. } => ResponseOut::None,
ResponseData::ProcStdout { data, .. } => ResponseOut::Stdout(data),
ResponseData::ProcStderr { data, .. } => ResponseOut::Stderr(data),
ResponseData::ProcDone { id, success, code } => {
if success {
ResponseOut::None
} else if let Some(code) = code {
@ -349,7 +368,7 @@ fn format_shell(res: Response) -> ResponseOut {
ResponseOut::StderrLine(format!("Proc {} failed", id))
}
}
ResponsePayload::SystemInfo {
ResponseData::SystemInfo {
family,
os,
arch,

@ -1,7 +1,7 @@
use crate::{
cli::opt::{ActionSubcommand, CommonOpt, Mode, SessionInput},
core::{
data::{Request, ResponsePayload},
data::{Request, ResponseData},
net::{Client, DataStream, TransportError},
session::{Session, SessionFile},
utils,
@ -94,18 +94,20 @@ where
if let Some(req) = cmd
.operation
.map(|payload| Request::new(tenant.as_str(), payload))
.map(|payload| Request::new(tenant.as_str(), vec![payload]))
{
is_proc_req = req.payload.is_proc_run();
// NOTE: We know that there is a single payload entry, so it's all-or-nothing
is_proc_req = req.payload.iter().any(|x| x.is_proc_run());
debug!("Client sending request: {:?}", req);
let res = client.send_timeout(req, timeout).await?;
// Store the spawned process id for using in sending stdin (if we spawned a proc)
proc_id = match &res.payload {
ResponsePayload::ProcStart { id } => *id,
_ => 0,
};
// NOTE: We can assume that there is a single payload entry in response to our single
// entry in our request
if let Some(ResponseData::ProcStart { id }) = res.payload.first() {
proc_id = *id;
}
inner::format_response(cmd.mode, res)?.print();
}

@ -2,7 +2,7 @@ use crate::{
cli::opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput},
core::{
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, RequestPayload, Response, ResponsePayload},
data::{Request, RequestData, Response, ResponseData},
net::{Client, Transport, TransportReadHalf, TransportWriteHalf},
session::{Session, SessionFile},
utils,
@ -167,8 +167,9 @@ async fn socket_loop(
tokio::spawn(async move {
while let Some(req) = req_rx.recv().await {
debug!(
"Forwarding request of type {} to server",
req.payload.as_ref()
"Forwarding request of type{} {} to server",
if req.payload.len() > 1 { "s" } else { "" },
req.to_payload_type_string()
);
if let Err(x) = client.fire_timeout(req, duration).await {
error!("Client failed to send request: {:?}", x);
@ -308,22 +309,22 @@ async fn handle_conn_incoming<T>(
// beginning, we exit the function early via return.
let tenant = tenant.unwrap();
// Perform cleanup if done
for id in state.lock().await.processes.as_slice() {
debug!("Cleaning conn {} :: killing process {}", conn_id, id);
if let Err(x) = req_tx
.send(Request::new(
tenant.clone(),
RequestPayload::ProcKill { id: *id },
))
.await
{
error!(
"<Client @ {}> Failed to send kill signal for process {}: {}",
conn_id, id, x
);
break;
}
// Perform cleanup if done by sending a request to kill each running process
// debug!("Cleaning conn {} :: killing process {}", conn_id, id);
if let Err(x) = req_tx
.send(Request::new(
tenant.clone(),
state
.lock()
.await
.processes
.iter()
.map(|id| RequestData::ProcKill { id: *id })
.collect(),
))
.await
{
error!("<Client @ {}> Failed to send kill signals: {}", conn_id, x);
}
}
@ -347,19 +348,21 @@ async fn handle_conn_outgoing<T>(
// Forward along responses that are for our connection
Ok(res) if res.tenant == tenant => {
debug!(
"Conn {} being sent response of type {}",
"Conn {} being sent response of type{} {}",
conn_id,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string(),
);
// If a new process was started, we want to capture the id and
// associate it with the connection
match &res.payload {
ResponsePayload::ProcStart { id } => {
debug!("Tracking proc {} for conn {}", id, conn_id);
state.lock().await.processes.push(*id);
}
_ => {}
let ids = res.payload.iter().filter_map(|x| match x {
ResponseData::ProcStart { id } => Some(*id),
_ => None,
});
for id in ids {
debug!("Tracking proc {} for conn {}", id, conn_id);
state.lock().await.processes.push(id);
}
if let Err(x) = writer.send(res).await {

@ -1,11 +1,11 @@
use crate::core::{
constants::MAX_PIPE_CHUNK_SIZE,
data::{
self, DirEntry, FileType, Request, RequestPayload, Response, ResponsePayload,
RunningProcess,
self, DirEntry, FileType, Request, RequestData, Response, ResponseData, RunningProcess,
},
state::{Process, ServerState},
};
use futures::future;
use log::*;
use std::{
env,
@ -34,93 +34,111 @@ pub(super) async fn process(
tx: Reply,
) -> Result<(), mpsc::error::SendError<Response>> {
async fn inner(
tenant: String,
tenant: Arc<String>,
addr: SocketAddr,
state: HState,
payload: RequestPayload,
data: RequestData,
tx: Reply,
) -> Result<ResponsePayload, Box<dyn std::error::Error>> {
match payload {
RequestPayload::FileRead { path } => file_read(path).await,
RequestPayload::FileReadText { path } => file_read_text(path).await,
RequestPayload::FileWrite { path, data } => file_write(path, data).await,
RequestPayload::FileWriteText { path, text } => file_write(path, text).await,
RequestPayload::FileAppend { path, data } => file_append(path, data).await,
RequestPayload::FileAppendText { path, text } => file_append(path, text).await,
RequestPayload::DirRead {
) -> Result<ResponseData, Box<dyn std::error::Error>> {
match data {
RequestData::FileRead { path } => file_read(path).await,
RequestData::FileReadText { path } => file_read_text(path).await,
RequestData::FileWrite { path, data } => file_write(path, data).await,
RequestData::FileWriteText { path, text } => file_write(path, text).await,
RequestData::FileAppend { path, data } => file_append(path, data).await,
RequestData::FileAppendText { path, text } => file_append(path, text).await,
RequestData::DirRead {
path,
depth,
absolute,
canonicalize,
include_root,
} => dir_read(path, depth, absolute, canonicalize, include_root).await,
RequestPayload::DirCreate { path, all } => dir_create(path, all).await,
RequestPayload::Remove { path, force } => remove(path, force).await,
RequestPayload::Copy { src, dst } => copy(src, dst).await,
RequestPayload::Rename { src, dst } => rename(src, dst).await,
RequestPayload::Metadata { path, canonicalize } => metadata(path, canonicalize).await,
RequestPayload::ProcRun { cmd, args } => {
RequestData::DirCreate { path, all } => dir_create(path, all).await,
RequestData::Remove { path, force } => remove(path, force).await,
RequestData::Copy { src, dst } => copy(src, dst).await,
RequestData::Rename { src, dst } => rename(src, dst).await,
RequestData::Metadata { path, canonicalize } => metadata(path, canonicalize).await,
RequestData::ProcRun { cmd, args } => {
proc_run(tenant.to_string(), addr, state, tx, cmd, args).await
}
RequestPayload::ProcKill { id } => proc_kill(state, id).await,
RequestPayload::ProcStdin { id, data } => proc_stdin(state, id, data).await,
RequestPayload::ProcList {} => proc_list(state).await,
RequestPayload::SystemInfo {} => system_info().await,
RequestData::ProcKill { id } => proc_kill(state, id).await,
RequestData::ProcStdin { id, data } => proc_stdin(state, id, data).await,
RequestData::ProcList {} => proc_list(state).await,
RequestData::SystemInfo {} => system_info().await,
}
}
let tenant = req.tenant.clone();
let res = Response::new(
req.tenant,
Some(req.id),
match inner(tenant, addr, state, req.payload, tx.clone()).await {
Ok(payload) => payload,
Err(x) => ResponsePayload::Error {
let tenant = Arc::new(req.tenant.clone());
// Build up a collection of tasks to run independently
let mut payload_tasks = Vec::new();
for data in req.payload {
let tenant_2 = Arc::clone(&tenant);
let state_2 = Arc::clone(&state);
let tx_2 = tx.clone();
payload_tasks.push(tokio::spawn(async move {
match inner(tenant_2, addr, state_2, data, tx_2).await {
Ok(data) => data,
Err(x) => ResponseData::Error {
description: x.to_string(),
},
}
}));
}
// Collect the results of our tasks into the payload entries
let payload = future::join_all(payload_tasks)
.await
.into_iter()
.map(|x| match x {
Ok(x) => x,
Err(x) => ResponseData::Error {
description: x.to_string(),
},
},
);
})
.collect();
let res = Response::new(req.tenant, Some(req.id), payload);
debug!(
"<Client @ {}> Sending response of type {}",
"<Client @ {}> Sending response of type{} {}",
addr,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
// Send out our primary response from processing the request
tx.send(res).await
}
async fn file_read(path: PathBuf) -> Result<ResponsePayload, Box<dyn Error>> {
Ok(ResponsePayload::Blob {
async fn file_read(path: PathBuf) -> Result<ResponseData, Box<dyn Error>> {
Ok(ResponseData::Blob {
data: tokio::fs::read(path).await?,
})
}
async fn file_read_text(path: PathBuf) -> Result<ResponsePayload, Box<dyn Error>> {
Ok(ResponsePayload::Text {
async fn file_read_text(path: PathBuf) -> Result<ResponseData, Box<dyn Error>> {
Ok(ResponseData::Text {
data: tokio::fs::read_to_string(path).await?,
})
}
async fn file_write(
path: PathBuf,
data: impl AsRef<[u8]>,
) -> Result<ResponsePayload, Box<dyn Error>> {
async fn file_write(path: PathBuf, data: impl AsRef<[u8]>) -> Result<ResponseData, Box<dyn Error>> {
tokio::fs::write(path, data).await?;
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn file_append(
path: PathBuf,
data: impl AsRef<[u8]>,
) -> Result<ResponsePayload, Box<dyn Error>> {
) -> Result<ResponseData, Box<dyn Error>> {
let mut file = tokio::fs::OpenOptions::new()
.append(true)
.open(path)
.await?;
file.write_all(data.as_ref()).await?;
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn dir_read(
@ -129,7 +147,7 @@ async fn dir_read(
absolute: bool,
canonicalize: bool,
include_root: bool,
) -> Result<ResponsePayload, Box<dyn Error>> {
) -> Result<ResponseData, Box<dyn Error>> {
// Canonicalize our provided path to ensure that it is exists, not a loop, and absolute
let root_path = tokio::fs::canonicalize(path).await?;
@ -208,20 +226,20 @@ async fn dir_read(
}
}
Ok(ResponsePayload::DirEntries { entries, errors })
Ok(ResponseData::DirEntries { entries, errors })
}
async fn dir_create(path: PathBuf, all: bool) -> Result<ResponsePayload, Box<dyn Error>> {
async fn dir_create(path: PathBuf, all: bool) -> Result<ResponseData, Box<dyn Error>> {
if all {
tokio::fs::create_dir_all(path).await?;
} else {
tokio::fs::create_dir(path).await?;
}
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn remove(path: PathBuf, force: bool) -> Result<ResponsePayload, Box<dyn Error>> {
async fn remove(path: PathBuf, force: bool) -> Result<ResponseData, Box<dyn Error>> {
let path_metadata = tokio::fs::metadata(path.as_path()).await?;
if path_metadata.is_dir() {
if force {
@ -233,10 +251,10 @@ async fn remove(path: PathBuf, force: bool) -> Result<ResponsePayload, Box<dyn E
tokio::fs::remove_file(path).await?;
}
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn copy(src: PathBuf, dst: PathBuf) -> Result<ResponsePayload, Box<dyn Error>> {
async fn copy(src: PathBuf, dst: PathBuf) -> Result<ResponseData, Box<dyn Error>> {
let src_metadata = tokio::fs::metadata(src.as_path()).await?;
if src_metadata.is_dir() {
for entry in WalkDir::new(src.as_path())
@ -273,16 +291,16 @@ async fn copy(src: PathBuf, dst: PathBuf) -> Result<ResponsePayload, Box<dyn Err
tokio::fs::copy(src, dst).await?;
}
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn rename(src: PathBuf, dst: PathBuf) -> Result<ResponsePayload, Box<dyn Error>> {
async fn rename(src: PathBuf, dst: PathBuf) -> Result<ResponseData, Box<dyn Error>> {
tokio::fs::rename(src, dst).await?;
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn metadata(path: PathBuf, canonicalize: bool) -> Result<ResponsePayload, Box<dyn Error>> {
async fn metadata(path: PathBuf, canonicalize: bool) -> Result<ResponseData, Box<dyn Error>> {
let metadata = tokio::fs::metadata(path.as_path()).await?;
let canonicalized_path = if canonicalize {
Some(tokio::fs::canonicalize(path).await?)
@ -290,7 +308,7 @@ async fn metadata(path: PathBuf, canonicalize: bool) -> Result<ResponsePayload,
None
};
Ok(ResponsePayload::Metadata {
Ok(ResponseData::Metadata {
canonicalized_path,
accessed: metadata
.accessed()
@ -326,7 +344,7 @@ async fn proc_run(
tx: Reply,
cmd: String,
args: Vec<String>,
) -> Result<ResponsePayload, Box<dyn Error>> {
) -> Result<ResponseData, Box<dyn Error>> {
let id = rand::random();
let mut child = Command::new(cmd.to_string())
@ -349,12 +367,13 @@ async fn proc_run(
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStdout { id, data },
vec![ResponseData::ProcStdout { id, data }],
);
debug!(
"<Client @ {}> Sending response of type {}",
"<Client @ {}> Sending response of type{} {}",
addr,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx_2.send(res).await {
break;
@ -384,12 +403,13 @@ async fn proc_run(
let res = Response::new(
tenant_2.as_str(),
None,
ResponsePayload::ProcStderr { id, data },
vec![ResponseData::ProcStderr { id, data }],
);
debug!(
"<Client @ {}> Sending response of type {}",
"<Client @ {}> Sending response of type{} {}",
addr,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx_2.send(res).await {
break;
@ -439,25 +459,27 @@ async fn proc_run(
let res = Response::new(
tenant.as_str(),
None,
ResponsePayload::ProcDone { id, success, code }
vec![ResponseData::ProcDone { id, success, code }]
);
debug!(
"<Client @ {}> Sending response of type {}",
"<Client @ {}> Sending response of type{} {}",
addr,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx.send(res).await {
error!("Failed to send done for process {}!", id);
}
}
Err(x) => {
let res = Response::new(tenant.as_str(), None, ResponsePayload::Error {
let res = Response::new(tenant.as_str(), None, vec![ResponseData::Error {
description: x.to_string()
});
}]);
debug!(
"<Client @ {}> Sending response of type {}",
"<Client @ {}> Sending response of type{} {}",
addr,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx.send(res).await {
error!("Failed to send error for waiting on process {}!", id);
@ -480,13 +502,14 @@ async fn proc_run(
}
let res = Response::new(tenant.as_str(), None, ResponsePayload::ProcDone {
let res = Response::new(tenant.as_str(), None, vec![ResponseData::ProcDone {
id, success: false, code: None
});
}]);
debug!(
"<Client @ {}> Sending response of type {}",
"<Client @ {}> Sending response of type{} {}",
addr,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string()
);
if let Err(_) = tx
.send(res)
@ -508,10 +531,10 @@ async fn proc_run(
};
state.lock().await.push_process(addr, process);
Ok(ResponsePayload::ProcStart { id })
Ok(ResponseData::ProcStart { id })
}
async fn proc_kill(state: HState, id: usize) -> Result<ResponsePayload, Box<dyn Error>> {
async fn proc_kill(state: HState, id: usize) -> Result<ResponseData, Box<dyn Error>> {
if let Some(process) = state.lock().await.processes.remove(&id) {
process.kill_tx.send(()).map_err(|_| {
io::Error::new(
@ -521,25 +544,25 @@ async fn proc_kill(state: HState, id: usize) -> Result<ResponsePayload, Box<dyn
})?;
}
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn proc_stdin(
state: HState,
id: usize,
data: String,
) -> Result<ResponsePayload, Box<dyn Error>> {
) -> Result<ResponseData, Box<dyn Error>> {
if let Some(process) = state.lock().await.processes.get(&id) {
process.stdin_tx.send(data).await.map_err(|_| {
io::Error::new(io::ErrorKind::BrokenPipe, "Unable to send stdin to process")
})?;
}
Ok(ResponsePayload::Ok)
Ok(ResponseData::Ok)
}
async fn proc_list(state: HState) -> Result<ResponsePayload, Box<dyn Error>> {
Ok(ResponsePayload::ProcEntries {
async fn proc_list(state: HState) -> Result<ResponseData, Box<dyn Error>> {
Ok(ResponseData::ProcEntries {
entries: state
.lock()
.await
@ -554,8 +577,8 @@ async fn proc_list(state: HState) -> Result<ResponsePayload, Box<dyn Error>> {
})
}
async fn system_info() -> Result<ResponsePayload, Box<dyn Error>> {
Ok(ResponsePayload::SystemInfo {
async fn system_info() -> Result<ResponseData, Box<dyn Error>> {
Ok(ResponseData::SystemInfo {
family: env::consts::FAMILY.to_string(),
os: env::consts::OS.to_string(),
arch: env::consts::ARCH.to_string(),

@ -158,9 +158,10 @@ async fn request_loop(
match transport.receive::<Request>().await {
Ok(Some(req)) => {
debug!(
"<Client @ {}> Received request of type {}",
"<Client @ {}> Received request of type{} {}",
addr,
req.payload.as_ref()
if req.payload.len() > 1 { "s" } else { "" },
req.to_payload_type_string()
);
if let Err(x) = handler::process(addr, Arc::clone(&state), req, tx.clone()).await {

@ -14,13 +14,13 @@ pub struct Request {
/// A unique id associated with the request
pub id: usize,
/// The main payload containing the type and data of the request
pub payload: RequestPayload,
/// The main payload containing a collection of data comprising one or more actions
pub payload: Vec<RequestData>,
}
impl Request {
/// Creates a new request, generating a unique id for it
pub fn new(tenant: impl Into<String>, payload: RequestPayload) -> Self {
pub fn new(tenant: impl Into<String>, payload: Vec<RequestData>) -> Self {
let id = rand::random();
Self {
tenant: tenant.into(),
@ -28,6 +28,15 @@ impl Request {
payload,
}
}
/// Converts to a string representing the type (or types) contained in the payload
pub fn to_payload_type_string(&self) -> String {
self.payload
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>()
.join(",")
}
}
/// Represents the payload of a request to be performed on the remote machine
@ -39,7 +48,7 @@ impl Request {
content = "data"
)]
#[strum(serialize_all = "snake_case")]
pub enum RequestPayload {
pub enum RequestData {
/// Reads a file from the specified path on the remote machine
#[structopt(visible_aliases = &["cat"])]
FileRead {
@ -229,8 +238,8 @@ pub struct Response {
/// (some responses are sent unprompted)
pub origin_id: Option<usize>,
/// The main payload containing the type and data of the response
pub payload: ResponsePayload,
/// The main payload containing a collection of data comprising one or more results
pub payload: Vec<ResponseData>,
}
impl Response {
@ -238,7 +247,7 @@ impl Response {
pub fn new(
tenant: impl Into<String>,
origin_id: Option<usize>,
payload: ResponsePayload,
payload: Vec<ResponseData>,
) -> Self {
let id = rand::random();
Self {
@ -248,6 +257,15 @@ impl Response {
payload,
}
}
/// Converts to a string representing the type (or types) contained in the payload
pub fn to_payload_type_string(&self) -> String {
self.payload
.iter()
.map(AsRef::as_ref)
.collect::<Vec<&str>>()
.join(",")
}
}
/// Represents the payload of a successful response
@ -259,7 +277,7 @@ impl Response {
content = "data"
)]
#[strum(serialize_all = "snake_case")]
pub enum ResponsePayload {
pub enum ResponseData {
/// General okay with no extra data, returned in cases like
/// creating or removing a directory, copying a file, or renaming
/// a file

Loading…
Cancel
Save