diff --git a/CHANGELOG.md b/CHANGELOG.md index f5ea2c0..391a72a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Renamed `distant_core::data` to `distant_core::protocol` + ## [0.20.0-alpha.5] ### Added diff --git a/distant-core/src/api.rs b/distant-core/src/api.rs index ced8018..3be21ab 100644 --- a/distant-core/src/api.rs +++ b/distant-core/src/api.rs @@ -7,11 +7,10 @@ use distant_net::common::ConnectionId; use distant_net::server::{ConnectionCtx, Reply, ServerCtx, ServerHandler}; use log::*; -use crate::data::{ - Capabilities, ChangeKind, DirEntry, Environment, Error, Metadata, ProcessId, PtySize, SearchId, - SearchQuery, SystemInfo, +use crate::protocol::{ + self, Capabilities, ChangeKind, DirEntry, Environment, Error, Metadata, ProcessId, PtySize, + SearchId, SearchQuery, SystemInfo, }; -use crate::{DistantMsg, DistantRequestData, DistantResponseData}; mod local; pub use local::LocalDistantApi; @@ -22,7 +21,7 @@ use reply::DistantSingleReply; /// Represents the context provided to the [`DistantApi`] for incoming requests pub struct DistantCtx { pub connection_id: ConnectionId, - pub reply: Box>, + pub reply: Box>, pub local_data: Arc, } @@ -423,8 +422,8 @@ where D: Send + Sync, { type LocalData = D; - type Request = DistantMsg; - type Response = DistantMsg; + type Request = protocol::Msg; + type Response = protocol::Msg; /// Overridden to leverage [`DistantApi`] implementation of `on_accept` async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> { @@ -445,7 +444,7 @@ where // Process single vs batch requests let response = match request.payload { - DistantMsg::Single(data) => { + protocol::Msg::Single(data) => { let ctx = DistantCtx { connection_id, reply: Box::new(DistantSingleReply::from(reply.clone_reply())), @@ -455,13 +454,13 @@ where let data = handle_request(self, ctx, data).await; // Report outgoing errors in our debug logs - if let DistantResponseData::Error(x) = &data { + if let protocol::Response::Error(x) = &data { debug!("[Conn {}] {}", connection_id, x); } - DistantMsg::Single(data) + protocol::Msg::Single(data) } - DistantMsg::Batch(list) => { + protocol::Msg::Batch(list) => { let mut out = Vec::new(); for data in list { @@ -480,14 +479,14 @@ where let data = handle_request(self, ctx, data).await; // Report outgoing errors in our debug logs - if let DistantResponseData::Error(x) = &data { + if let protocol::Response::Error(x) = &data { debug!("[Conn {}] {}", connection_id, x); } out.push(data); } - DistantMsg::Batch(out) + protocol::Msg::Batch(out) } }; @@ -512,56 +511,56 @@ where async fn handle_request( server: &DistantApiServerHandler, ctx: DistantCtx, - request: DistantRequestData, -) -> DistantResponseData + request: protocol::Request, +) -> protocol::Response where T: DistantApi + Send + Sync, D: Send + Sync, { match request { - DistantRequestData::Capabilities {} => server + protocol::Request::Capabilities {} => server .api .capabilities(ctx) .await - .map(|supported| DistantResponseData::Capabilities { supported }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::FileRead { path } => server + .map(|supported| protocol::Response::Capabilities { supported }) + .unwrap_or_else(protocol::Response::from), + protocol::Request::FileRead { path } => server .api .read_file(ctx, path) .await - .map(|data| DistantResponseData::Blob { data }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::FileReadText { path } => server + .map(|data| protocol::Response::Blob { data }) + .unwrap_or_else(protocol::Response::from), + protocol::Request::FileReadText { path } => server .api .read_file_text(ctx, path) .await - .map(|data| DistantResponseData::Text { data }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::FileWrite { path, data } => server + .map(|data| protocol::Response::Text { data }) + .unwrap_or_else(protocol::Response::from), + protocol::Request::FileWrite { path, data } => server .api .write_file(ctx, path, data) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::FileWriteText { path, text } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::FileWriteText { path, text } => server .api .write_file_text(ctx, path, text) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::FileAppend { path, data } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::FileAppend { path, data } => server .api .append_file(ctx, path, data) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::FileAppendText { path, text } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::FileAppendText { path, text } => server .api .append_file_text(ctx, path, text) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::DirRead { + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::DirRead { path, depth, absolute, @@ -571,36 +570,36 @@ where .api .read_dir(ctx, path, depth, absolute, canonicalize, include_root) .await - .map(|(entries, errors)| DistantResponseData::DirEntries { + .map(|(entries, errors)| protocol::Response::DirEntries { entries, errors: errors.into_iter().map(Error::from).collect(), }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::DirCreate { path, all } => server + .unwrap_or_else(protocol::Response::from), + protocol::Request::DirCreate { path, all } => server .api .create_dir(ctx, path, all) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Remove { path, force } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Remove { path, force } => server .api .remove(ctx, path, force) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Copy { src, dst } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Copy { src, dst } => server .api .copy(ctx, src, dst) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Rename { src, dst } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Rename { src, dst } => server .api .rename(ctx, src, dst) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Watch { + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Watch { path, recursive, only, @@ -609,21 +608,21 @@ where .api .watch(ctx, path, recursive, only, except) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Unwatch { path } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Unwatch { path } => server .api .unwatch(ctx, path) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Exists { path } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Exists { path } => server .api .exists(ctx, path) .await - .map(|value| DistantResponseData::Exists { value }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Metadata { + .map(|value| protocol::Response::Exists { value }) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Metadata { path, canonicalize, resolve_file_type, @@ -631,21 +630,21 @@ where .api .metadata(ctx, path, canonicalize, resolve_file_type) .await - .map(DistantResponseData::Metadata) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::Search { query } => server + .map(protocol::Response::Metadata) + .unwrap_or_else(protocol::Response::from), + protocol::Request::Search { query } => server .api .search(ctx, query) .await - .map(|id| DistantResponseData::SearchStarted { id }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::CancelSearch { id } => server + .map(|id| protocol::Response::SearchStarted { id }) + .unwrap_or_else(protocol::Response::from), + protocol::Request::CancelSearch { id } => server .api .cancel_search(ctx, id) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::ProcSpawn { + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::ProcSpawn { cmd, environment, current_dir, @@ -654,31 +653,31 @@ where .api .proc_spawn(ctx, cmd.into(), environment, current_dir, pty) .await - .map(|id| DistantResponseData::ProcSpawned { id }) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::ProcKill { id } => server + .map(|id| protocol::Response::ProcSpawned { id }) + .unwrap_or_else(protocol::Response::from), + protocol::Request::ProcKill { id } => server .api .proc_kill(ctx, id) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::ProcStdin { id, data } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::ProcStdin { id, data } => server .api .proc_stdin(ctx, id, data) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::ProcResizePty { id, size } => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::ProcResizePty { id, size } => server .api .proc_resize_pty(ctx, id, size) .await - .map(|_| DistantResponseData::Ok) - .unwrap_or_else(DistantResponseData::from), - DistantRequestData::SystemInfo {} => server + .map(|_| protocol::Response::Ok) + .unwrap_or_else(protocol::Response::from), + protocol::Request::SystemInfo {} => server .api .system_info(ctx) .await - .map(DistantResponseData::SystemInfo) - .unwrap_or_else(DistantResponseData::from), + .map(protocol::Response::SystemInfo) + .unwrap_or_else(protocol::Response::from), } } diff --git a/distant-core/src/api/local.rs b/distant-core/src/api/local.rs index bc6d4cc..f6849d6 100644 --- a/distant-core/src/api/local.rs +++ b/distant-core/src/api/local.rs @@ -6,7 +6,7 @@ use log::*; use tokio::io::AsyncWriteExt; use walkdir::WalkDir; -use crate::data::{ +use crate::protocol::{ Capabilities, ChangeKind, ChangeKindSet, DirEntry, Environment, FileType, Metadata, ProcessId, PtySize, SearchId, SearchQuery, SystemInfo, }; @@ -503,7 +503,7 @@ mod tests { use super::*; use crate::api::ConnectionCtx; - use crate::data::DistantResponseData; + use crate::protocol::Response; static TEMP_SCRIPT_DIR: Lazy = Lazy::new(|| assert_fs::TempDir::new().unwrap()); @@ -564,13 +564,7 @@ mod tests { static DOES_NOT_EXIST_BIN: Lazy = Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin")); - async fn setup( - buffer: usize, - ) -> ( - LocalDistantApi, - DistantCtx<()>, - mpsc::Receiver, - ) { + async fn setup(buffer: usize) -> (LocalDistantApi, DistantCtx<()>, mpsc::Receiver) { let api = LocalDistantApi::initialize().unwrap(); let (reply, rx) = make_reply(buffer); let connection_id = rand::random(); @@ -592,12 +586,7 @@ mod tests { (api, ctx, rx) } - fn make_reply( - buffer: usize, - ) -> ( - Box>, - mpsc::Receiver, - ) { + fn make_reply(buffer: usize) -> (Box>, mpsc::Receiver) { let (tx, rx) = mpsc::channel(buffer); (Box::new(tx), rx) } @@ -1344,12 +1333,12 @@ mod tests { /// Validates a response as being a series of changes that include the provided paths fn validate_changed_paths( - data: &DistantResponseData, + data: &Response, expected_paths: &[PathBuf], should_panic: bool, ) -> bool { match data { - DistantResponseData::Changed(change) if should_panic => { + Response::Changed(change) if should_panic => { let paths: Vec = change .paths .iter() @@ -1359,7 +1348,7 @@ mod tests { true } - DistantResponseData::Changed(change) => { + Response::Changed(change) => { let paths: Vec = change .paths .iter() @@ -1901,8 +1890,8 @@ mod tests { let mut got_stdout = false; let mut got_done = false; - let mut check_data = |data: &DistantResponseData| match data { - DistantResponseData::ProcStdout { id, data } => { + let mut check_data = |data: &Response| match data { + Response::ProcStdout { id, data } => { assert_eq!( *id, proc_id, "Got {}, but expected {} as process id", @@ -1911,7 +1900,7 @@ mod tests { assert_eq!(data, b"some stdout", "Got wrong stdout"); got_stdout = true; } - DistantResponseData::ProcDone { id, success, .. } => { + Response::ProcDone { id, success, .. } => { assert_eq!( *id, proc_id, "Got {}, but expected {} as process id", @@ -1965,8 +1954,8 @@ mod tests { let mut got_stderr = false; let mut got_done = false; - let mut check_data = |data: &DistantResponseData| match data { - DistantResponseData::ProcStderr { id, data } => { + let mut check_data = |data: &Response| match data { + Response::ProcStderr { id, data } => { assert_eq!( *id, proc_id, "Got {}, but expected {} as process id", @@ -1975,7 +1964,7 @@ mod tests { assert_eq!(data, b"some stderr", "Got wrong stderr"); got_stderr = true; } - DistantResponseData::ProcDone { id, success, .. } => { + Response::ProcDone { id, success, .. } => { assert_eq!( *id, proc_id, "Got {}, but expected {} as process id", @@ -2014,7 +2003,7 @@ mod tests { // Wait for process to finish match rx.recv().await.unwrap() { - DistantResponseData::ProcDone { id, .. } => assert_eq!( + Response::ProcDone { id, .. } => assert_eq!( id, proc_id, "Got {}, but expected {} as process id", id, proc_id @@ -2056,7 +2045,7 @@ mod tests { // Wait for the completion response to come in match rx.recv().await.unwrap() { - DistantResponseData::ProcDone { id, .. } => assert_eq!( + Response::ProcDone { id, .. } => assert_eq!( id, proc_id, "Got {}, but expected {} as process id", id, proc_id @@ -2124,7 +2113,7 @@ mod tests { // Third, check the async response of stdout to verify we got stdin match rx.recv().await.unwrap() { - DistantResponseData::ProcStdout { data, .. } => { + Response::ProcStdout { data, .. } => { assert_eq!(data, b"hello world\n", "Mirrored data didn't match"); } x => panic!("Unexpected response: {:?}", x), diff --git a/distant-core/src/api/local/process.rs b/distant-core/src/api/local/process.rs index c0d7826..91be365 100644 --- a/distant-core/src/api/local/process.rs +++ b/distant-core/src/api/local/process.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use tokio::io; use tokio::sync::mpsc; -use crate::data::{ProcessId, PtySize}; +use crate::protocol::{ProcessId, PtySize}; mod pty; pub use pty::*; diff --git a/distant-core/src/api/local/process/pty.rs b/distant-core/src/api/local/process/pty.rs index 3b2fb6e..03f80d6 100644 --- a/distant-core/src/api/local/process/pty.rs +++ b/distant-core/src/api/local/process/pty.rs @@ -13,7 +13,7 @@ use super::{ ProcessPty, PtySize, WaitRx, }; use crate::constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_DURATION}; -use crate::data::Environment; +use crate::protocol::Environment; /// Represents a process that is associated with a pty pub struct PtyProcess { diff --git a/distant-core/src/api/local/process/simple.rs b/distant-core/src/api/local/process/simple.rs index 3fa14b2..d959fdd 100644 --- a/distant-core/src/api/local/process/simple.rs +++ b/distant-core/src/api/local/process/simple.rs @@ -12,7 +12,7 @@ use super::{ wait, ExitStatus, FutureReturn, InputChannel, NoProcessPty, OutputChannel, Process, ProcessId, ProcessKiller, WaitRx, }; -use crate::data::Environment; +use crate::protocol::Environment; mod tasks; diff --git a/distant-core/src/api/local/state/process.rs b/distant-core/src/api/local/state/process.rs index 0bcc120..dd62e40 100644 --- a/distant-core/src/api/local/state/process.rs +++ b/distant-core/src/api/local/state/process.rs @@ -7,7 +7,7 @@ use distant_net::server::Reply; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use crate::data::{DistantResponseData, Environment, ProcessId, PtySize}; +use crate::protocol::{Environment, ProcessId, PtySize, Response}; mod instance; pub use instance::*; @@ -71,7 +71,7 @@ impl ProcessChannel { environment: Environment, current_dir: Option, pty: Option, - reply: Box>, + reply: Box>, ) -> io::Result { let (cb, rx) = oneshot::channel(); self.tx @@ -131,7 +131,7 @@ enum InnerProcessMsg { environment: Environment, current_dir: Option, pty: Option, - reply: Box>, + reply: Box>, cb: oneshot::Sender>, }, Resize { diff --git a/distant-core/src/api/local/state/process/instance.rs b/distant-core/src/api/local/state/process/instance.rs index 011dbd1..4b2cddb 100644 --- a/distant-core/src/api/local/state/process/instance.rs +++ b/distant-core/src/api/local/state/process/instance.rs @@ -9,7 +9,7 @@ use tokio::task::JoinHandle; use crate::api::local::process::{ InputChannel, OutputChannel, Process, ProcessKiller, ProcessPty, PtyProcess, SimpleProcess, }; -use crate::data::{DistantResponseData, Environment, ProcessId, PtySize}; +use crate::protocol::{Environment, ProcessId, PtySize, Response}; /// Holds information related to a spawned process on the server pub struct ProcessInstance { @@ -65,7 +65,7 @@ impl ProcessInstance { environment: Environment, current_dir: Option, pty: Option, - reply: Box>, + reply: Box>, ) -> io::Result { // Build out the command and args from our string let mut cmd_and_args = if cfg!(windows) { @@ -168,14 +168,12 @@ impl ProcessInstance { async fn stdout_task( id: ProcessId, mut stdout: Box, - reply: Box>, + reply: Box>, ) -> io::Result<()> { loop { match stdout.recv().await { Ok(Some(data)) => { - reply - .send(DistantResponseData::ProcStdout { id, data }) - .await?; + reply.send(Response::ProcStdout { id, data }).await?; } Ok(None) => return Ok(()), Err(x) => return Err(x), @@ -186,14 +184,12 @@ async fn stdout_task( async fn stderr_task( id: ProcessId, mut stderr: Box, - reply: Box>, + reply: Box>, ) -> io::Result<()> { loop { match stderr.recv().await { Ok(Some(data)) => { - reply - .send(DistantResponseData::ProcStderr { id, data }) - .await?; + reply.send(Response::ProcStderr { id, data }).await?; } Ok(None) => return Ok(()), Err(x) => return Err(x), @@ -204,20 +200,20 @@ async fn stderr_task( async fn wait_task( id: ProcessId, mut child: Box, - reply: Box>, + reply: Box>, ) -> io::Result<()> { let status = child.wait().await; match status { Ok(status) => { reply - .send(DistantResponseData::ProcDone { + .send(Response::ProcDone { id, success: status.success, code: status.code, }) .await } - Err(x) => reply.send(DistantResponseData::from(x)).await, + Err(x) => reply.send(Response::from(x)).await, } } diff --git a/distant-core/src/api/local/state/search.rs b/distant-core/src/api/local/state/search.rs index 432068b..f00ccdb 100644 --- a/distant-core/src/api/local/state/search.rs +++ b/distant-core/src/api/local/state/search.rs @@ -13,8 +13,8 @@ use log::*; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::task::JoinHandle; -use crate::data::{ - DistantResponseData, SearchId, SearchQuery, SearchQueryContentsMatch, SearchQueryMatch, +use crate::protocol::{ + Response, SearchId, SearchQuery, SearchQueryContentsMatch, SearchQueryMatch, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQuerySubmatch, SearchQueryTarget, }; @@ -82,7 +82,7 @@ impl SearchChannel { pub async fn start( &self, query: SearchQuery, - reply: Box>, + reply: Box>, ) -> io::Result { let (cb, rx) = oneshot::channel(); self.tx @@ -113,7 +113,7 @@ impl SearchChannel { enum InnerSearchMsg { Start { query: Box, - reply: Box>, + reply: Box>, cb: oneshot::Sender>, }, Cancel { @@ -187,7 +187,7 @@ struct SearchQueryReporter { id: SearchId, options: SearchQueryOptions, rx: mpsc::UnboundedReceiver, - reply: Box>, + reply: Box>, } impl SearchQueryReporter { @@ -226,7 +226,7 @@ impl SearchQueryReporter { if matches.len() as u64 >= len { trace!("[Query {id}] Reached {len} paginated matches"); if let Err(x) = reply - .send(DistantResponseData::SearchResults { + .send(Response::SearchResults { id, matches: std::mem::take(&mut matches), }) @@ -241,17 +241,14 @@ impl SearchQueryReporter { // Send any remaining matches if !matches.is_empty() { trace!("[Query {id}] Sending {} remaining matches", matches.len()); - if let Err(x) = reply - .send(DistantResponseData::SearchResults { id, matches }) - .await - { + if let Err(x) = reply.send(Response::SearchResults { id, matches }).await { error!("[Query {id}] Failed to send final matches: {x}"); } } // Report that we are done trace!("[Query {id}] Reporting as done"); - if let Err(x) = reply.send(DistantResponseData::SearchDone { id }).await { + if let Err(x) = reply.send(Response::SearchDone { id }).await { error!("[Query {id}] Failed to send done status: {x}"); } } @@ -813,7 +810,7 @@ mod tests { use test_log::test; use super::*; - use crate::data::{FileType, SearchQueryCondition, SearchQueryMatchData}; + use crate::protocol::{FileType, SearchQueryCondition, SearchQueryMatchData}; fn make_path(path: &str) -> PathBuf { use std::path::MAIN_SEPARATOR; @@ -834,9 +831,9 @@ mod tests { root } - fn get_matches(data: DistantResponseData) -> Vec { + fn get_matches(data: Response) -> Vec { match data { - DistantResponseData::SearchResults { matches, .. } => matches, + Response::SearchResults { matches, .. } => matches, x => panic!("Did not get search results: {x:?}"), } } @@ -858,10 +855,7 @@ mod tests { let search_id = state.start(query, Box::new(reply)).await.unwrap(); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -937,7 +931,7 @@ mod tests { assert_eq!( rx.recv().await, - Some(DistantResponseData::SearchDone { id: search_id }) + Some(Response::SearchDone { id: search_id }) ); assert_eq!(rx.recv().await, None); @@ -1013,10 +1007,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1089,10 +1080,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1181,10 +1169,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1277,10 +1262,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1314,10 +1296,7 @@ mod tests { assert_eq!(matches.len(), 2); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1355,10 +1334,7 @@ mod tests { assert_eq!(matches.len(), 1); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1402,10 +1378,7 @@ mod tests { assert_eq!(paths, expected_paths); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1506,10 +1479,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1574,10 +1544,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1629,10 +1596,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1663,10 +1627,7 @@ mod tests { // Get done indicator next as there were no matches let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1715,10 +1676,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1808,10 +1766,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1867,10 +1822,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1937,10 +1889,7 @@ mod tests { ); let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } @@ -1998,10 +1947,7 @@ mod tests { } let data = rx.recv().await; - assert_eq!( - data, - Some(DistantResponseData::SearchDone { id: search_id }) - ); + assert_eq!(data, Some(Response::SearchDone { id: search_id })); assert_eq!(rx.recv().await, None); } diff --git a/distant-core/src/api/local/state/watcher.rs b/distant-core/src/api/local/state/watcher.rs index 7b45fbe..a564875 100644 --- a/distant-core/src/api/local/state/watcher.rs +++ b/distant-core/src/api/local/state/watcher.rs @@ -15,7 +15,7 @@ use tokio::sync::oneshot; use tokio::task::JoinHandle; use crate::constants::SERVER_WATCHER_CAPACITY; -use crate::data::ChangeKind; +use crate::protocol::ChangeKind; mod path; pub use path::*; diff --git a/distant-core/src/api/local/state/watcher/path.rs b/distant-core/src/api/local/state/watcher/path.rs index ddfc394..d689262 100644 --- a/distant-core/src/api/local/state/watcher/path.rs +++ b/distant-core/src/api/local/state/watcher/path.rs @@ -5,7 +5,7 @@ use std::{fmt, io}; use distant_net::common::ConnectionId; use distant_net::server::Reply; -use crate::data::{Change, ChangeKind, ChangeKindSet, DistantResponseData, Error}; +use crate::protocol::{Change, ChangeKind, ChangeKindSet, Error, Response}; /// Represents a path registered with a watcher that includes relevant state including /// the ability to reply with @@ -29,7 +29,7 @@ pub struct RegisteredPath { allowed: ChangeKindSet, /// Used to send a reply through the connection watching this path - reply: Box>, + reply: Box>, } impl fmt::Debug for RegisteredPath { @@ -69,7 +69,7 @@ impl RegisteredPath { recursive: bool, only: impl Into, except: impl Into, - reply: Box>, + reply: Box>, ) -> io::Result { let raw_path = path.into(); let path = tokio::fs::canonicalize(raw_path.as_path()).await?; @@ -140,7 +140,7 @@ impl RegisteredPath { if !paths.is_empty() { self.reply - .send(DistantResponseData::Changed(Change { kind, paths })) + .send(Response::Changed(Change { kind, paths })) .await .map(|_| true) } else { @@ -171,9 +171,9 @@ impl RegisteredPath { if !paths.is_empty() || !skip_if_no_paths { self.reply .send(if paths.is_empty() { - DistantResponseData::Error(Error::from(msg)) + Response::Error(Error::from(msg)) } else { - DistantResponseData::Error(Error::from(format!("{msg} about {paths:?}"))) + Response::Error(Error::from(format!("{msg} about {paths:?}"))) }) .await .map(|_| true) diff --git a/distant-core/src/api/reply.rs b/distant-core/src/api/reply.rs index 843c445..0c69822 100644 --- a/distant-core/src/api/reply.rs +++ b/distant-core/src/api/reply.rs @@ -4,28 +4,27 @@ use std::pin::Pin; use distant_net::server::Reply; -use crate::api::DistantMsg; -use crate::data::DistantResponseData; +use crate::protocol; /// Wrapper around a reply that can be batch or single, converting /// a single data into the wrapped type -pub struct DistantSingleReply(Box>>); +pub struct DistantSingleReply(Box>>); -impl From>>> for DistantSingleReply { - fn from(reply: Box>>) -> Self { +impl From>>> for DistantSingleReply { + fn from(reply: Box>>) -> Self { Self(reply) } } impl Reply for DistantSingleReply { - type Data = DistantResponseData; + type Data = protocol::Response; fn send(&self, data: Self::Data) -> Pin> + Send + '_>> { - self.0.send(DistantMsg::Single(data)) + self.0.send(protocol::Msg::Single(data)) } fn blocking_send(&self, data: Self::Data) -> io::Result<()> { - self.0.blocking_send(DistantMsg::Single(data)) + self.0.blocking_send(protocol::Msg::Single(data)) } fn clone_reply(&self) -> Box> { diff --git a/distant-core/src/client.rs b/distant-core/src/client.rs index 9d2e48f..b700762 100644 --- a/distant-core/src/client.rs +++ b/distant-core/src/client.rs @@ -1,7 +1,7 @@ use distant_net::client::Channel; use distant_net::Client; -use crate::{DistantMsg, DistantRequestData, DistantResponseData}; +use crate::protocol; mod ext; mod lsp; @@ -10,10 +10,12 @@ mod searcher; mod watcher; /// Represents a [`Client`] that communicates using the distant protocol -pub type DistantClient = Client, DistantMsg>; +pub type DistantClient = + Client, protocol::Msg>; /// Represents a [`Channel`] that communicates using the distant protocol -pub type DistantChannel = Channel, DistantMsg>; +pub type DistantChannel = + Channel, protocol::Msg>; pub use ext::*; pub use lsp::*; diff --git a/distant-core/src/client/ext.rs b/distant-core/src/client/ext.rs index d49ae1c..9449f8c 100644 --- a/distant-core/src/client/ext.rs +++ b/distant-core/src/client/ext.rs @@ -10,11 +10,10 @@ use crate::client::{ RemoteCommand, RemoteLspCommand, RemoteLspProcess, RemoteOutput, RemoteProcess, Searcher, Watcher, }; -use crate::data::{ - Capabilities, ChangeKindSet, DirEntry, DistantRequestData, DistantResponseData, Environment, - Error as Failure, Metadata, PtySize, SearchId, SearchQuery, SystemInfo, +use crate::protocol::{ + self, Capabilities, ChangeKindSet, DirEntry, Environment, Error as Failure, Metadata, PtySize, + SearchId, SearchQuery, SystemInfo, }; -use crate::DistantMsg; pub type AsyncReturn<'a, T, E = io::Error> = Pin> + Send + 'a>>; @@ -148,21 +147,21 @@ macro_rules! make_body { ($self:expr, $data:expr, @ok) => { make_body!($self, $data, |data| { match data { - DistantResponseData::Ok => Ok(()), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::Ok => Ok(()), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } }) }; ($self:expr, $data:expr, $and_then:expr) => {{ - let req = Request::new(DistantMsg::Single($data)); + let req = Request::new(protocol::Msg::Single($data)); Box::pin(async move { $self .send(req) .await .and_then(|res| match res.payload { - DistantMsg::Single(x) => Ok(x), + protocol::Msg::Single(x) => Ok(x), _ => Err(mismatched_response()), }) .and_then($and_then) @@ -171,7 +170,7 @@ macro_rules! make_body { } impl DistantChannelExt - for Channel, DistantMsg> + for Channel, protocol::Msg> { fn append_file( &mut self, @@ -180,7 +179,7 @@ impl DistantChannelExt ) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::FileAppend { path: path.into(), data: data.into() }, + protocol::Request::FileAppend { path: path.into(), data: data.into() }, @ok ) } @@ -192,7 +191,7 @@ impl DistantChannelExt ) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::FileAppendText { path: path.into(), text: data.into() }, + protocol::Request::FileAppendText { path: path.into(), text: data.into() }, @ok ) } @@ -200,10 +199,10 @@ impl DistantChannelExt fn capabilities(&mut self) -> AsyncReturn<'_, Capabilities> { make_body!( self, - DistantRequestData::Capabilities {}, + protocol::Request::Capabilities {}, |data| match data { - DistantResponseData::Capabilities { supported } => Ok(supported), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::Capabilities { supported } => Ok(supported), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } ) @@ -212,7 +211,7 @@ impl DistantChannelExt fn copy(&mut self, src: impl Into, dst: impl Into) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::Copy { src: src.into(), dst: dst.into() }, + protocol::Request::Copy { src: src.into(), dst: dst.into() }, @ok ) } @@ -220,7 +219,7 @@ impl DistantChannelExt fn create_dir(&mut self, path: impl Into, all: bool) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::DirCreate { path: path.into(), all }, + protocol::Request::DirCreate { path: path.into(), all }, @ok ) } @@ -228,10 +227,10 @@ impl DistantChannelExt fn exists(&mut self, path: impl Into) -> AsyncReturn<'_, bool> { make_body!( self, - DistantRequestData::Exists { path: path.into() }, + protocol::Request::Exists { path: path.into() }, |data| match data { - DistantResponseData::Exists { value } => Ok(value), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::Exists { value } => Ok(value), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } ) @@ -245,14 +244,14 @@ impl DistantChannelExt ) -> AsyncReturn<'_, Metadata> { make_body!( self, - DistantRequestData::Metadata { + protocol::Request::Metadata { path: path.into(), canonicalize, resolve_file_type }, |data| match data { - DistantResponseData::Metadata(x) => Ok(x), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::Metadata(x) => Ok(x), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } ) @@ -266,7 +265,7 @@ impl DistantChannelExt fn cancel_search(&mut self, id: SearchId) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::CancelSearch { id }, + protocol::Request::CancelSearch { id }, @ok ) } @@ -281,7 +280,7 @@ impl DistantChannelExt ) -> AsyncReturn<'_, (Vec, Vec)> { make_body!( self, - DistantRequestData::DirRead { + protocol::Request::DirRead { path: path.into(), depth, absolute, @@ -289,8 +288,8 @@ impl DistantChannelExt include_root }, |data| match data { - DistantResponseData::DirEntries { entries, errors } => Ok((entries, errors)), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::DirEntries { entries, errors } => Ok((entries, errors)), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } ) @@ -299,10 +298,10 @@ impl DistantChannelExt fn read_file(&mut self, path: impl Into) -> AsyncReturn<'_, Vec> { make_body!( self, - DistantRequestData::FileRead { path: path.into() }, + protocol::Request::FileRead { path: path.into() }, |data| match data { - DistantResponseData::Blob { data } => Ok(data), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::Blob { data } => Ok(data), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } ) @@ -311,10 +310,10 @@ impl DistantChannelExt fn read_file_text(&mut self, path: impl Into) -> AsyncReturn<'_, String> { make_body!( self, - DistantRequestData::FileReadText { path: path.into() }, + protocol::Request::FileReadText { path: path.into() }, |data| match data { - DistantResponseData::Text { data } => Ok(data), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + protocol::Response::Text { data } => Ok(data), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), } ) @@ -323,7 +322,7 @@ impl DistantChannelExt fn remove(&mut self, path: impl Into, force: bool) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::Remove { path: path.into(), force }, + protocol::Request::Remove { path: path.into(), force }, @ok ) } @@ -331,7 +330,7 @@ impl DistantChannelExt fn rename(&mut self, src: impl Into, dst: impl Into) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::Rename { src: src.into(), dst: dst.into() }, + protocol::Request::Rename { src: src.into(), dst: dst.into() }, @ok ) } @@ -351,12 +350,15 @@ impl DistantChannelExt fn unwatch(&mut self, path: impl Into) -> AsyncReturn<'_, ()> { fn inner_unwatch( - channel: &mut Channel, DistantMsg>, + channel: &mut Channel< + protocol::Msg, + protocol::Msg, + >, path: impl Into, ) -> AsyncReturn<'_, ()> { make_body!( channel, - DistantRequestData::Unwatch { path: path.into() }, + protocol::Request::Unwatch { path: path.into() }, @ok ) } @@ -423,9 +425,9 @@ impl DistantChannelExt } fn system_info(&mut self) -> AsyncReturn<'_, SystemInfo> { - make_body!(self, DistantRequestData::SystemInfo {}, |data| match data { - DistantResponseData::SystemInfo(x) => Ok(x), - DistantResponseData::Error(x) => Err(io::Error::from(x)), + make_body!(self, protocol::Request::SystemInfo {}, |data| match data { + protocol::Response::SystemInfo(x) => Ok(x), + protocol::Response::Error(x) => Err(io::Error::from(x)), _ => Err(mismatched_response()), }) } @@ -437,7 +439,7 @@ impl DistantChannelExt ) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::FileWrite { path: path.into(), data: data.into() }, + protocol::Request::FileWrite { path: path.into(), data: data.into() }, @ok ) } @@ -449,7 +451,7 @@ impl DistantChannelExt ) -> AsyncReturn<'_, ()> { make_body!( self, - DistantRequestData::FileWriteText { path: path.into(), text: data.into() }, + protocol::Request::FileWriteText { path: path.into(), text: data.into() }, @ok ) } diff --git a/distant-core/src/client/lsp.rs b/distant-core/src/client/lsp.rs index 873704f..7ead3ce 100644 --- a/distant-core/src/client/lsp.rs +++ b/distant-core/src/client/lsp.rs @@ -3,15 +3,15 @@ use std::ops::{Deref, DerefMut}; use std::path::PathBuf; use futures::stream::{Stream, StreamExt}; +use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::{self}; use tokio::task::JoinHandle; use crate::client::{ DistantChannel, RemoteCommand, RemoteProcess, RemoteStatus, RemoteStderr, RemoteStdin, RemoteStdout, }; -use crate::data::{Environment, PtySize}; +use crate::protocol::{Environment, PtySize}; mod msg; pub use msg::*; @@ -402,7 +402,7 @@ mod tests { use test_log::test; use super::*; - use crate::data::{DistantRequestData, DistantResponseData}; + use crate::protocol; /// Timeout used with timeout function const TIMEOUT: Duration = Duration::from_millis(50); @@ -421,12 +421,12 @@ mod tests { }); // Wait until we get the request from the session - let req: Request = t1.read_frame_as().await.unwrap().unwrap(); + let req: Request = t1.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session t1.write_frame_for(&Response::new( req.id, - DistantResponseData::ProcSpawned { id: rand::random() }, + protocol::Response::ProcSpawned { id: rand::random() }, )) .await .unwrap(); @@ -473,9 +473,9 @@ mod tests { .unwrap(); // Validate that the outgoing req is a complete LSP message - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantRequestData::ProcStdin { data, .. } => { + protocol::Request::ProcStdin { data, .. } => { assert_eq!( data, make_lsp_msg(serde_json::json!({ @@ -507,7 +507,7 @@ mod tests { tokio::task::yield_now().await; let result = timeout( TIMEOUT, - transport.read_frame_as::>(), + transport.read_frame_as::>(), ) .await; assert!(result.is_err(), "Unexpectedly got data: {:?}", result); @@ -516,9 +516,9 @@ mod tests { proc.stdin.as_mut().unwrap().write(msg_b).await.unwrap(); // Validate that the outgoing req is a complete LSP message - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantRequestData::ProcStdin { data, .. } => { + protocol::Request::ProcStdin { data, .. } => { assert_eq!( data, make_lsp_msg(serde_json::json!({ @@ -551,9 +551,9 @@ mod tests { .unwrap(); // Validate that the outgoing req is a complete LSP message - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantRequestData::ProcStdin { data, .. } => { + protocol::Request::ProcStdin { data, .. } => { assert_eq!( data, make_lsp_msg(serde_json::json!({ @@ -600,9 +600,9 @@ mod tests { .unwrap(); // Validate that the first outgoing req is a complete LSP message matching first - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantRequestData::ProcStdin { data, .. } => { + protocol::Request::ProcStdin { data, .. } => { assert_eq!( data, make_lsp_msg(serde_json::json!({ @@ -615,9 +615,9 @@ mod tests { } // Validate that the second outgoing req is a complete LSP message matching second - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantRequestData::ProcStdin { data, .. } => { + protocol::Request::ProcStdin { data, .. } => { assert_eq!( data, make_lsp_msg(serde_json::json!({ @@ -645,9 +645,9 @@ mod tests { .unwrap(); // Validate that the outgoing req is a complete LSP message - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantRequestData::ProcStdin { data, .. } => { + protocol::Request::ProcStdin { data, .. } => { // Verify the contents AND headers are as expected; in this case, // this will also ensure that the Content-Length is adjusted // when the distant scheme was changed to file @@ -671,7 +671,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStdout { + protocol::Response::ProcStdout { id: proc.id(), data: make_lsp_msg(serde_json::json!({ "field1": "a", @@ -707,7 +707,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStdout { + protocol::Response::ProcStdout { id: proc.id(), data: msg_a.to_vec(), }, @@ -725,7 +725,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStdout { + protocol::Response::ProcStdout { id: proc.id(), data: msg_b.to_vec(), }, @@ -759,7 +759,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStdout { + protocol::Response::ProcStdout { id: proc.id(), data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(), }, @@ -802,7 +802,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStdout { + protocol::Response::ProcStdout { id: proc.id(), data: format!( "{}{}", @@ -844,7 +844,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStdout { + protocol::Response::ProcStdout { id: proc.id(), data: make_lsp_msg(serde_json::json!({ "field1": "distant://some/path", @@ -874,7 +874,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStderr { + protocol::Response::ProcStderr { id: proc.id(), data: make_lsp_msg(serde_json::json!({ "field1": "a", @@ -910,7 +910,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStderr { + protocol::Response::ProcStderr { id: proc.id(), data: msg_a.to_vec(), }, @@ -928,7 +928,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStderr { + protocol::Response::ProcStderr { id: proc.id(), data: msg_b.to_vec(), }, @@ -962,7 +962,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStderr { + protocol::Response::ProcStderr { id: proc.id(), data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(), }, @@ -1005,7 +1005,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStderr { + protocol::Response::ProcStderr { id: proc.id(), data: format!( "{}{}", @@ -1047,7 +1047,7 @@ mod tests { transport .write_frame_for(&Response::new( proc.origin_id().to_string(), - DistantResponseData::ProcStderr { + protocol::Response::ProcStderr { id: proc.id(), data: make_lsp_msg(serde_json::json!({ "field1": "distant://some/path", diff --git a/distant-core/src/client/process.rs b/distant-core/src/client/process.rs index 827364c..aa2bd3a 100644 --- a/distant-core/src/client/process.rs +++ b/distant-core/src/client/process.rs @@ -5,15 +5,14 @@ use distant_net::client::Mailbox; use distant_net::common::{Request, Response}; use log::*; use tokio::io; +use tokio::sync::mpsc; use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; -use tokio::sync::mpsc::{self}; use tokio::sync::RwLock; use tokio::task::JoinHandle; use crate::client::DistantChannel; use crate::constants::CLIENT_PIPE_CAPACITY; -use crate::data::{Cmd, DistantRequestData, DistantResponseData, Environment, ProcessId, PtySize}; -use crate::DistantMsg; +use crate::protocol::{self, Cmd, Environment, ProcessId, PtySize}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct RemoteOutput { @@ -89,8 +88,8 @@ impl RemoteCommand { // Submit our run request and get back a mailbox for responses let mut mailbox = channel - .mail(Request::new(DistantMsg::Single( - DistantRequestData::ProcSpawn { + .mail(Request::new(protocol::Msg::Single( + protocol::Request::ProcSpawn { cmd: Cmd::from(cmd), pty: self.pty, environment: self.environment.clone(), @@ -104,15 +103,17 @@ impl RemoteCommand { Some(res) => { let origin_id = res.origin_id; match res.payload { - DistantMsg::Single(DistantResponseData::ProcSpawned { id }) => (id, origin_id), - DistantMsg::Single(DistantResponseData::Error(x)) => return Err(x.into()), - DistantMsg::Single(x) => { + protocol::Msg::Single(protocol::Response::ProcSpawned { id }) => { + (id, origin_id) + } + protocol::Msg::Single(protocol::Response::Error(x)) => return Err(x.into()), + protocol::Msg::Single(x) => { return Err(io::Error::new( io::ErrorKind::InvalidData, format!("Got response type of {}", x.as_ref()), )) } - DistantMsg::Batch(_) => { + protocol::Msg::Batch(_) => { return Err(io::Error::new( io::ErrorKind::InvalidData, "Got batch instead of single response", @@ -492,7 +493,7 @@ async fn process_outgoing_requests( match data { Some(data) => channel.fire( Request::new( - DistantMsg::Single(DistantRequestData::ProcStdin { id, data }) + protocol::Msg::Single(protocol::Request::ProcStdin { id, data }) ) ).await?, None => break Err(errors::dead_channel()), @@ -502,7 +503,7 @@ async fn process_outgoing_requests( match size { Some(size) => channel.fire( Request::new( - DistantMsg::Single(DistantRequestData::ProcResizePty { id, size }) + protocol::Msg::Single(protocol::Request::ProcResizePty { id, size }) ) ).await?, None => break Err(errors::dead_channel()), @@ -511,7 +512,7 @@ async fn process_outgoing_requests( msg = kill_rx.recv() => { if msg.is_some() { channel.fire(Request::new( - DistantMsg::Single(DistantRequestData::ProcKill { id }) + protocol::Msg::Single(protocol::Request::ProcKill { id }) )).await?; break Ok(()); } else { @@ -528,7 +529,7 @@ async fn process_outgoing_requests( /// Helper function that loops, processing incoming stdout & stderr requests from a remote process async fn process_incoming_responses( proc_id: ProcessId, - mut mailbox: Mailbox>>, + mut mailbox: Mailbox>>, stdout_tx: mpsc::Sender>, stderr_tx: mpsc::Sender>, kill_tx: mpsc::Sender<()>, @@ -538,7 +539,7 @@ async fn process_incoming_responses( // Check if any of the payload data is the termination let exit_status = payload.iter().find_map(|data| match data { - DistantResponseData::ProcDone { id, success, code } if *id == proc_id => { + protocol::Response::ProcDone { id, success, code } if *id == proc_id => { Some((*success, *code)) } _ => None, @@ -548,10 +549,10 @@ async fn process_incoming_responses( // TODO: What should we do about unexpected data? For now, just ignore for data in payload { match data { - DistantResponseData::ProcStdout { id, data } if id == proc_id => { + protocol::Response::ProcStdout { id, data } if id == proc_id => { let _ = stdout_tx.send(data).await; } - DistantResponseData::ProcStderr { id, data } if id == proc_id => { + protocol::Response::ProcStderr { id, data } if id == proc_id => { let _ = stderr_tx.send(data).await; } _ => {} @@ -596,7 +597,7 @@ mod tests { use super::*; use crate::client::DistantClient; - use crate::data::{Error, ErrorKind}; + use crate::protocol::{Error, ErrorKind}; fn make_session() -> (FramedTransport, DistantClient) { let (t1, t2) = FramedTransport::pair(100); @@ -616,14 +617,14 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session transport .write_frame_for(&Response::new( req.id, - DistantMsg::Batch(vec![DistantResponseData::ProcSpawned { id: 1 }]), + protocol::Msg::Batch(vec![protocol::Response::ProcSpawned { id: 1 }]), )) .await .unwrap(); @@ -648,14 +649,14 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::Error(Error { + protocol::Msg::Single(protocol::Response::Error(Error { kind: ErrorKind::BrokenPipe, description: String::from("some error"), })), @@ -683,7 +684,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -691,7 +692,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -722,7 +723,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -730,7 +731,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -740,10 +741,10 @@ mod tests { assert!(proc.kill().await.is_ok(), "Failed to send kill request"); // Verify the kill request was sent - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantMsg::Single(DistantRequestData::ProcKill { id: proc_id }) => { + protocol::Msg::Single(protocol::Request::ProcKill { id: proc_id }) => { assert_eq!(proc_id, id) } x => panic!("Unexpected request: {:?}", x), @@ -775,7 +776,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -783,7 +784,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -798,10 +799,10 @@ mod tests { .unwrap(); // Verify that a request is made through the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); match req.payload { - DistantMsg::Single(DistantRequestData::ProcStdin { id, data }) => { + protocol::Msg::Single(protocol::Request::ProcStdin { id, data }) => { assert_eq!(id, 12345); assert_eq!(data, b"some input"); } @@ -822,7 +823,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -830,7 +831,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -841,7 +842,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcStdout { + protocol::Msg::Single(protocol::Response::ProcStdout { id, data: b"some out".to_vec(), }), @@ -866,7 +867,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -874,7 +875,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -885,7 +886,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcStderr { + protocol::Msg::Single(protocol::Response::ProcStderr { id, data: b"some err".to_vec(), }), @@ -910,7 +911,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -918,7 +919,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -943,7 +944,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -951,7 +952,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -990,7 +991,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -998,7 +999,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -1010,7 +1011,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcDone { + protocol::Msg::Single(protocol::Response::ProcDone { id, success: true, code: Some(123), @@ -1045,7 +1046,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -1053,7 +1054,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -1081,7 +1082,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -1089,7 +1090,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -1124,7 +1125,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -1132,7 +1133,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -1145,7 +1146,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcDone { + protocol::Msg::Single(protocol::Response::ProcDone { id, success: false, code: Some(123), @@ -1177,7 +1178,7 @@ mod tests { }); // Wait until we get the request from the session - let req: Request> = + let req: Request> = transport.read_frame_as().await.unwrap().unwrap(); // Send back a response through the session @@ -1185,7 +1186,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcSpawned { id }), + protocol::Msg::Single(protocol::Response::ProcSpawned { id }), )) .await .unwrap(); @@ -1198,7 +1199,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcStdout { + protocol::Msg::Single(protocol::Response::ProcStdout { id, data: b"some out".to_vec(), }), @@ -1210,7 +1211,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantMsg::Single(DistantResponseData::ProcStderr { + protocol::Msg::Single(protocol::Response::ProcStderr { id, data: b"some err".to_vec(), }), @@ -1222,7 +1223,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantMsg::Single(DistantResponseData::ProcDone { + protocol::Msg::Single(protocol::Response::ProcDone { id, success: false, code: Some(123), diff --git a/distant-core/src/client/searcher.rs b/distant-core/src/client/searcher.rs index d961a93..cef0542 100644 --- a/distant-core/src/client/searcher.rs +++ b/distant-core/src/client/searcher.rs @@ -7,10 +7,7 @@ use tokio::task::JoinHandle; use crate::client::{DistantChannel, DistantChannelExt}; use crate::constants::CLIENT_SEARCHER_CAPACITY; -use crate::data::{ - DistantRequestData, DistantResponseData, SearchId, SearchQuery, SearchQueryMatch, -}; -use crate::DistantMsg; +use crate::protocol::{self, SearchId, SearchQuery, SearchQueryMatch}; /// Represents a searcher for files, directories, and symlinks on the filesystem pub struct Searcher { @@ -37,8 +34,8 @@ impl Searcher { // Submit our run request and get back a mailbox for responses let mut mailbox = channel - .mail(Request::new(DistantMsg::Single( - DistantRequestData::Search { + .mail(Request::new(protocol::Msg::Single( + protocol::Request::Search { query: query.clone(), }, ))) @@ -53,18 +50,18 @@ impl Searcher { for data in res.payload.into_vec() { match data { // If we get results before the started indicator, queue them up - DistantResponseData::SearchResults { matches, .. } => { + protocol::Response::SearchResults { matches, .. } => { queue.extend(matches); } // Once we get the started indicator, mark as ready to go - DistantResponseData::SearchStarted { id } => { + protocol::Response::SearchStarted { id } => { trace!("[Query {id}] Searcher has started"); search_id = Some(id); } // If we get an explicit error, convert and return it - DistantResponseData::Error(x) => return Err(io::Error::from(x)), + protocol::Response::Error(x) => return Err(io::Error::from(x)), // Otherwise, we got something unexpected, and report as such x => { @@ -118,7 +115,7 @@ impl Searcher { for data in res.payload.into_vec() { match data { - DistantResponseData::SearchResults { matches, .. } => { + protocol::Response::SearchResults { matches, .. } => { // If we can't queue up a match anymore, we've // been closed and therefore want to quit if tx.is_closed() { @@ -138,7 +135,7 @@ impl Searcher { } // Received completion indicator, so close out - DistantResponseData::SearchDone { .. } => { + protocol::Response::SearchDone { .. } => { trace!("[Query {search_id}] Searcher has finished"); done = true; break; @@ -202,7 +199,7 @@ mod tests { use tokio::sync::Mutex; use super::*; - use crate::data::{ + use crate::protocol::{ SearchQueryCondition, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQuerySubmatch, SearchQueryTarget, }; @@ -233,13 +230,13 @@ mod tests { }; // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a search was started transport .write_frame_for(&Response::new( req.id, - DistantResponseData::SearchStarted { id: rand::random() }, + protocol::Response::SearchStarted { id: rand::random() }, )) .await .unwrap(); @@ -269,14 +266,14 @@ mod tests { ); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a searcher was created let id = rand::random::(); transport .write_frame_for(&Response::new( req.id.clone(), - DistantResponseData::SearchStarted { id }, + protocol::Response::SearchStarted { id }, )) .await .unwrap(); @@ -289,7 +286,7 @@ mod tests { .write_frame_for(&Response::new( req.id, vec![ - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![ SearchQueryMatch::Path(SearchQueryPathMatch { @@ -310,7 +307,7 @@ mod tests { }), ], }, - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { path: PathBuf::from("/some/path/3"), @@ -388,14 +385,14 @@ mod tests { ); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a searcher was created let id = rand::random(); transport .write_frame_for(&Response::new( req.id.clone(), - DistantResponseData::SearchStarted { id }, + protocol::Response::SearchStarted { id }, )) .await .unwrap(); @@ -407,7 +404,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { path: PathBuf::from("/some/path/1"), @@ -426,7 +423,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone() + "1", - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { path: PathBuf::from("/some/path/2"), @@ -445,7 +442,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { path: PathBuf::from("/some/path/3"), @@ -509,14 +506,14 @@ mod tests { ); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a watcher was created let id = rand::random::(); transport .write_frame_for(&Response::new( req.id.clone(), - DistantResponseData::SearchStarted { id }, + protocol::Response::SearchStarted { id }, )) .await .unwrap(); @@ -525,7 +522,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![ SearchQueryMatch::Path(SearchQueryPathMatch { @@ -580,10 +577,10 @@ mod tests { let searcher_2 = Arc::clone(&searcher); let cancel_task = tokio::spawn(async move { searcher_2.lock().await.cancel().await }); - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); transport - .write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) + .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok)) .await .unwrap(); @@ -594,7 +591,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantResponseData::SearchResults { + protocol::Response::SearchResults { id, matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { path: PathBuf::from("/some/path/3"), diff --git a/distant-core/src/client/watcher.rs b/distant-core/src/client/watcher.rs index f0c2d4f..1708916 100644 --- a/distant-core/src/client/watcher.rs +++ b/distant-core/src/client/watcher.rs @@ -8,8 +8,7 @@ use tokio::task::JoinHandle; use crate::client::{DistantChannel, DistantChannelExt}; use crate::constants::CLIENT_WATCHER_CAPACITY; -use crate::data::{Change, ChangeKindSet, DistantRequestData, DistantResponseData}; -use crate::DistantMsg; +use crate::protocol::{self, Change, ChangeKindSet}; /// Represents a watcher of some path on a remote machine pub struct Watcher { @@ -56,8 +55,8 @@ impl Watcher { // Submit our run request and get back a mailbox for responses let mut mailbox = channel - .mail(Request::new(DistantMsg::Single( - DistantRequestData::Watch { + .mail(Request::new(protocol::Msg::Single( + protocol::Request::Watch { path: path.to_path_buf(), recursive, only: only.into_sorted_vec(), @@ -74,11 +73,11 @@ impl Watcher { while let Some(res) = mailbox.next().await { for data in res.payload.into_vec() { match data { - DistantResponseData::Changed(change) => queue.push(change), - DistantResponseData::Ok => { + protocol::Response::Changed(change) => queue.push(change), + protocol::Response::Ok => { confirmed = true; } - DistantResponseData::Error(x) => return Err(io::Error::from(x)), + protocol::Response::Error(x) => return Err(io::Error::from(x)), x => { return Err(io::Error::new( io::ErrorKind::Other, @@ -118,7 +117,7 @@ impl Watcher { while let Some(res) = mailbox.next().await { for data in res.payload.into_vec() { match data { - DistantResponseData::Changed(change) => { + protocol::Response::Changed(change) => { // If we can't queue up a change anymore, we've // been closed and therefore want to quit if tx.is_closed() { @@ -188,7 +187,7 @@ mod tests { use tokio::sync::Mutex; use super::*; - use crate::data::ChangeKind; + use crate::protocol::ChangeKind; use crate::DistantClient; fn make_session() -> (FramedTransport, DistantClient) { @@ -215,11 +214,11 @@ mod tests { }); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a watcher was created transport - .write_frame_for(&Response::new(req.id, DistantResponseData::Ok)) + .write_frame_for(&Response::new(req.id, protocol::Response::Ok)) .await .unwrap(); @@ -247,11 +246,11 @@ mod tests { }); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a watcher was created transport - .write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) + .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok)) .await .unwrap(); @@ -263,11 +262,11 @@ mod tests { .write_frame_for(&Response::new( req.id, vec![ - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Access, paths: vec![test_path.to_path_buf()], }), - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Content, paths: vec![test_path.to_path_buf()], }), @@ -315,11 +314,11 @@ mod tests { }); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a watcher was created transport - .write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) + .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok)) .await .unwrap(); @@ -330,7 +329,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone(), - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Access, paths: vec![test_path.to_path_buf()], }), @@ -342,7 +341,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id.clone() + "1", - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Content, paths: vec![test_path.to_path_buf()], }), @@ -354,7 +353,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Remove, paths: vec![test_path.to_path_buf()], }), @@ -401,11 +400,11 @@ mod tests { }); // Wait until we get the request from the session - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); // Send back an acknowledgement that a watcher was created transport - .write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) + .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok)) .await .unwrap(); @@ -414,15 +413,15 @@ mod tests { .write_frame_for(&Response::new( req.id, vec![ - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Access, paths: vec![test_path.to_path_buf()], }), - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Content, paths: vec![test_path.to_path_buf()], }), - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Remove, paths: vec![test_path.to_path_buf()], }), @@ -457,10 +456,10 @@ mod tests { let watcher_2 = Arc::clone(&watcher); let unwatch_task = tokio::spawn(async move { watcher_2.lock().await.unwatch().await }); - let req: Request = transport.read_frame_as().await.unwrap().unwrap(); + let req: Request = transport.read_frame_as().await.unwrap().unwrap(); transport - .write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) + .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok)) .await .unwrap(); @@ -470,7 +469,7 @@ mod tests { transport .write_frame_for(&Response::new( req.id, - DistantResponseData::Changed(Change { + protocol::Response::Changed(Change { kind: ChangeKind::Unknown, paths: vec![test_path.to_path_buf()], }), diff --git a/distant-core/src/lib.rs b/distant-core/src/lib.rs index 7189a9a..2ab5778 100644 --- a/distant-core/src/lib.rs +++ b/distant-core/src/lib.rs @@ -7,8 +7,7 @@ pub use client::*; mod credentials; pub use credentials::*; -pub mod data; -pub use data::{DistantMsg, DistantRequestData, DistantResponseData}; +pub mod protocol; mod constants; mod serde_str; diff --git a/distant-core/src/data.rs b/distant-core/src/protocol.rs similarity index 97% rename from distant-core/src/data.rs rename to distant-core/src/protocol.rs index a4b431f..29ab870 100644 --- a/distant-core/src/data.rs +++ b/distant-core/src/protocol.rs @@ -45,12 +45,12 @@ pub type Environment = distant_net::common::Map; #[derive(Clone, Debug, From, PartialEq, Eq, Serialize, Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(untagged)] -pub enum DistantMsg { +pub enum Msg { Single(T), Batch(Vec), } -impl DistantMsg { +impl Msg { /// Returns true if msg has a single payload pub fn is_single(&self) -> bool { matches!(self, Self::Single(_)) @@ -119,9 +119,9 @@ impl DistantMsg { } #[cfg(feature = "schemars")] -impl DistantMsg { +impl Msg { pub fn root_schema() -> schemars::schema::RootSchema { - schemars::schema_for!(DistantMsg) + schemars::schema_for!(Msg) } } @@ -148,7 +148,7 @@ impl DistantMsg { #[strum_discriminants(name(CapabilityKind))] #[strum_discriminants(strum(serialize_all = "snake_case"))] #[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] -pub enum DistantRequestData { +pub enum Request { /// Retrieve information about the server's capabilities #[strum_discriminants(strum(message = "Supports retrieving capabilities"))] Capabilities {}, @@ -414,9 +414,9 @@ pub enum DistantRequestData { } #[cfg(feature = "schemars")] -impl DistantRequestData { +impl Request { pub fn root_schema() -> schemars::schema::RootSchema { - schemars::schema_for!(DistantRequestData) + schemars::schema_for!(Request) } } @@ -425,7 +425,7 @@ impl DistantRequestData { #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] #[strum(serialize_all = "snake_case")] -pub enum DistantResponseData { +pub enum Response { /// General okay with no extra data, returned in cases like /// creating or removing a directory, copying a file, or renaming /// a file @@ -535,13 +535,13 @@ pub enum DistantResponseData { } #[cfg(feature = "schemars")] -impl DistantResponseData { +impl Response { pub fn root_schema() -> schemars::schema::RootSchema { - schemars::schema_for!(DistantResponseData) + schemars::schema_for!(Response) } } -impl From for DistantResponseData { +impl From for Response { fn from(x: io::Error) -> Self { Self::Error(Error::from(x)) } diff --git a/distant-core/src/data/capabilities.rs b/distant-core/src/protocol/capabilities.rs similarity index 100% rename from distant-core/src/data/capabilities.rs rename to distant-core/src/protocol/capabilities.rs diff --git a/distant-core/src/data/change.rs b/distant-core/src/protocol/change.rs similarity index 100% rename from distant-core/src/data/change.rs rename to distant-core/src/protocol/change.rs diff --git a/distant-core/src/data/cmd.rs b/distant-core/src/protocol/cmd.rs similarity index 100% rename from distant-core/src/data/cmd.rs rename to distant-core/src/protocol/cmd.rs diff --git a/distant-core/src/data/error.rs b/distant-core/src/protocol/error.rs similarity index 100% rename from distant-core/src/data/error.rs rename to distant-core/src/protocol/error.rs diff --git a/distant-core/src/data/filesystem.rs b/distant-core/src/protocol/filesystem.rs similarity index 100% rename from distant-core/src/data/filesystem.rs rename to distant-core/src/protocol/filesystem.rs diff --git a/distant-core/src/data/metadata.rs b/distant-core/src/protocol/metadata.rs similarity index 99% rename from distant-core/src/data/metadata.rs rename to distant-core/src/protocol/metadata.rs index ad4525e..dcd7208 100644 --- a/distant-core/src/data/metadata.rs +++ b/distant-core/src/protocol/metadata.rs @@ -101,7 +101,7 @@ impl Metadata { unix: Some({ use std::os::unix::prelude::*; let mode = metadata.mode(); - crate::data::UnixMetadata::from(mode) + crate::protocol::UnixMetadata::from(mode) }), #[cfg(not(unix))] unix: None, @@ -110,7 +110,7 @@ impl Metadata { windows: Some({ use std::os::windows::prelude::*; let attributes = metadata.file_attributes(); - crate::data::WindowsMetadata::from(attributes) + crate::protocol::WindowsMetadata::from(attributes) }), #[cfg(not(windows))] windows: None, diff --git a/distant-core/src/data/pty.rs b/distant-core/src/protocol/pty.rs similarity index 100% rename from distant-core/src/data/pty.rs rename to distant-core/src/protocol/pty.rs diff --git a/distant-core/src/data/search.rs b/distant-core/src/protocol/search.rs similarity index 100% rename from distant-core/src/data/search.rs rename to distant-core/src/protocol/search.rs diff --git a/distant-core/src/data/system.rs b/distant-core/src/protocol/system.rs similarity index 100% rename from distant-core/src/data/system.rs rename to distant-core/src/protocol/system.rs diff --git a/distant-core/src/data/utils.rs b/distant-core/src/protocol/utils.rs similarity index 100% rename from distant-core/src/data/utils.rs rename to distant-core/src/protocol/utils.rs diff --git a/distant-core/tests/stress/distant/watch.rs b/distant-core/tests/stress/distant/watch.rs index ad690ee..ea4e442 100644 --- a/distant-core/tests/stress/distant/watch.rs +++ b/distant-core/tests/stress/distant/watch.rs @@ -1,5 +1,5 @@ use assert_fs::prelude::*; -use distant_core::data::ChangeKindSet; +use distant_core::protocol::ChangeKindSet; use distant_core::DistantChannelExt; use rstest::*; use test_log::test; diff --git a/distant-ssh2/src/api.rs b/distant-ssh2/src/api.rs index f31062c..bace98a 100644 --- a/distant-ssh2/src/api.rs +++ b/distant-ssh2/src/api.rs @@ -7,11 +7,11 @@ use std::time::Duration; use async_compat::CompatExt; use async_once_cell::OnceCell; use async_trait::async_trait; -use distant_core::data::{ +use distant_core::net::server::ConnectionCtx; +use distant_core::protocol::{ Capabilities, CapabilityKind, DirEntry, Environment, FileType, Metadata, ProcessId, PtySize, SystemInfo, UnixMetadata, }; -use distant_core::net::server::ConnectionCtx; use distant_core::{DistantApi, DistantCtx}; use log::*; use tokio::sync::{mpsc, RwLock}; diff --git a/distant-ssh2/src/process.rs b/distant-ssh2/src/process.rs index f7c119d..4fd3243 100644 --- a/distant-ssh2/src/process.rs +++ b/distant-ssh2/src/process.rs @@ -4,8 +4,8 @@ use std::path::PathBuf; use std::time::Duration; use async_compat::CompatExt; -use distant_core::data::{DistantResponseData, Environment, ProcessId, PtySize}; use distant_core::net::server::Reply; +use distant_core::protocol::{Environment, ProcessId, PtySize, Response}; use log::*; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -32,7 +32,7 @@ pub async fn spawn_simple( cmd: &str, environment: Environment, current_dir: Option, - reply: Box>, + reply: Box>, cleanup: F, ) -> io::Result where @@ -117,7 +117,7 @@ pub async fn spawn_pty( environment: Environment, current_dir: Option, size: PtySize, - reply: Box>, + reply: Box>, cleanup: F, ) -> io::Result where @@ -205,14 +205,14 @@ where fn spawn_blocking_stdout_task( id: ProcessId, mut reader: impl Read + Send + 'static, - reply: Box>, + reply: Box>, ) -> JoinHandle<()> { tokio::task::spawn_blocking(move || { let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; loop { match reader.read(&mut buf) { Ok(n) if n > 0 => { - let payload = DistantResponseData::ProcStdout { + let payload = Response::ProcStdout { id, data: buf[..n].to_vec(), }; @@ -236,14 +236,14 @@ fn spawn_blocking_stdout_task( fn spawn_nonblocking_stdout_task( id: ProcessId, mut reader: impl Read + Send + 'static, - reply: Box>, + reply: Box>, ) -> JoinHandle<()> { tokio::spawn(async move { let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; loop { match reader.read(&mut buf) { Ok(n) if n > 0 => { - let payload = DistantResponseData::ProcStdout { + let payload = Response::ProcStdout { id, data: buf[..n].to_vec(), }; @@ -270,14 +270,14 @@ fn spawn_nonblocking_stdout_task( fn spawn_nonblocking_stderr_task( id: ProcessId, mut reader: impl Read + Send + 'static, - reply: Box>, + reply: Box>, ) -> JoinHandle<()> { tokio::spawn(async move { let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; loop { match reader.read(&mut buf) { Ok(n) if n > 0 => { - let payload = DistantResponseData::ProcStderr { + let payload = Response::ProcStderr { id, data: buf[..n].to_vec(), }; @@ -348,7 +348,7 @@ fn spawn_cleanup_task( stdin_task: JoinHandle<()>, stdout_task: JoinHandle<()>, stderr_task: Option>, - reply: Box>, + reply: Box>, cleanup: F, ) -> JoinHandle<()> where @@ -417,7 +417,7 @@ where cleanup(id).await; - let payload = DistantResponseData::ProcDone { + let payload = Response::ProcDone { id, success: !should_kill && success, code: if success { Some(0) } else { None }, diff --git a/distant-ssh2/tests/ssh2/client.rs b/distant-ssh2/tests/ssh2/client.rs index 560f133..c1acc99 100644 --- a/distant-ssh2/tests/ssh2/client.rs +++ b/distant-ssh2/tests/ssh2/client.rs @@ -4,7 +4,7 @@ use std::time::Duration; use assert_fs::prelude::*; use assert_fs::TempDir; -use distant_core::data::{ChangeKindSet, Environment, FileType, Metadata}; +use distant_core::protocol::{ChangeKindSet, Environment, FileType, Metadata}; use distant_core::{DistantChannelExt, DistantClient}; use once_cell::sync::Lazy; use predicates::prelude::*; diff --git a/distant-ssh2/tests/ssh2/launched.rs b/distant-ssh2/tests/ssh2/launched.rs index 5d5ac82..5237e91 100644 --- a/distant-ssh2/tests/ssh2/launched.rs +++ b/distant-ssh2/tests/ssh2/launched.rs @@ -3,7 +3,7 @@ use std::time::Duration; use assert_fs::prelude::*; use assert_fs::TempDir; -use distant_core::data::{ChangeKindSet, Environment, FileType, Metadata}; +use distant_core::protocol::{ChangeKindSet, Environment, FileType, Metadata}; use distant_core::{DistantChannelExt, DistantClient}; use once_cell::sync::Lazy; use predicates::prelude::*; diff --git a/src/cli/commands/client.rs b/src/cli/commands/client.rs index a174d12..28bc3a4 100644 --- a/src/cli/commands/client.rs +++ b/src/cli/commands/client.rs @@ -4,13 +4,10 @@ use std::path::Path; use std::time::Duration; use anyhow::Context; -use distant_core::data::{ChangeKindSet, FileType, SearchQuery, SystemInfo}; use distant_core::net::common::{ConnectionId, Host, Map, Request, Response}; use distant_core::net::manager::ManagerClient; -use distant_core::{ - DistantChannel, DistantChannelExt, DistantMsg, DistantRequestData, DistantResponseData, - RemoteCommand, Searcher, Watcher, -}; +use distant_core::protocol::{self, ChangeKindSet, FileType, SearchQuery, SystemInfo}; +use distant_core::{DistantChannel, DistantChannelExt, RemoteCommand, Searcher, Watcher}; use log::*; use serde_json::json; use tabled::object::Rows; @@ -281,8 +278,8 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { debug!("Starting api tasks"); let (msg_tx, mut msg_rx) = mpsc::channel(1); let request_task = tokio::spawn(async move { - let mut rx = - MsgReceiver::from_stdin().into_rx::>>(); + let mut rx = MsgReceiver::from_stdin() + .into_rx::>>(); loop { match rx.recv().await { Some(Ok(request)) => { @@ -312,7 +309,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { if ready.is_readable() { match channel - .try_read_frame_as::>>() + .try_read_frame_as::>>() { Ok(Some(msg)) => tx.send_blocking(&msg)?, Ok(None) => break, @@ -823,11 +820,11 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { depth, absolute, canonicalize, include_root ); let results = channel - .send(DistantMsg::Batch(vec![ - DistantRequestData::FileRead { + .send(protocol::Msg::Batch(vec![ + protocol::Request::FileRead { path: path.to_path_buf(), }, - DistantRequestData::DirRead { + protocol::Request::DirRead { path: path.to_path_buf(), depth, absolute, @@ -847,7 +844,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { .context("Got single response to batch request")? { match response { - DistantResponseData::DirEntries { entries, .. } => { + protocol::Response::DirEntries { entries, .. } => { #[derive(Tabled)] struct EntryRow { ty: String, @@ -874,14 +871,14 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { out.flush().context("Failed to flush stdout")?; return Ok(()); } - DistantResponseData::Blob { data } => { + protocol::Response::Blob { data } => { let mut out = std::io::stdout(); out.write_all(&data) .context("Failed to write file contents to stdout")?; out.flush().context("Failed to flush stdout")?; return Ok(()); } - DistantResponseData::Error(x) => errors.push(x), + protocol::Response::Error(x) => errors.push(x), _ => continue, } } @@ -1001,7 +998,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { // TODO: Provide a cleaner way to print just a match let res = Response::new( "".to_string(), - DistantMsg::Single(DistantResponseData::SearchResults { + protocol::Msg::Single(protocol::Response::SearchResults { id: 0, matches: vec![m], }), @@ -1053,7 +1050,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult { // TODO: Provide a cleaner way to print just a change let res = Response::new( "".to_string(), - DistantMsg::Single(DistantResponseData::Changed(change)), + protocol::Msg::Single(protocol::Response::Changed(change)), ); formatter.print(res).context("Failed to print change")?; diff --git a/src/cli/commands/client/lsp.rs b/src/cli/commands/client/lsp.rs index b1f982a..f7cee2b 100644 --- a/src/cli/commands/client/lsp.rs +++ b/src/cli/commands/client/lsp.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use anyhow::Context; -use distant_core::data::PtySize; +use distant_core::protocol::PtySize; use distant_core::{DistantChannel, RemoteLspCommand}; use terminal_size::{terminal_size, Height, Width}; diff --git a/src/cli/commands/client/shell.rs b/src/cli/commands/client/shell.rs index 2018668..1fc9218 100644 --- a/src/cli/commands/client/shell.rs +++ b/src/cli/commands/client/shell.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use std::time::Duration; use anyhow::Context; -use distant_core::data::{Environment, PtySize}; +use distant_core::protocol::{Environment, PtySize}; use distant_core::{DistantChannel, DistantChannelExt, RemoteCommand}; use log::*; use terminal_size::{terminal_size, Height, Width}; diff --git a/src/cli/commands/common/format.rs b/src/cli/commands/common/format.rs index af3261a..8a5f765 100644 --- a/src/cli/commands/common/format.rs +++ b/src/cli/commands/common/format.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; use std::io::{self, Write}; use std::path::PathBuf; -use distant_core::data::{ - ChangeKind, DistantMsg, DistantResponseData, Error, FileType, Metadata, - SearchQueryContentsMatch, SearchQueryMatch, SearchQueryPathMatch, SystemInfo, -}; use distant_core::net::common::Response; +use distant_core::protocol::{ + self, ChangeKind, Error, FileType, Metadata, SearchQueryContentsMatch, SearchQueryMatch, + SearchQueryPathMatch, SystemInfo, +}; use log::*; use tabled::object::Rows; use tabled::style::Style; @@ -40,7 +40,7 @@ impl Formatter { } /// Consumes the output message, printing it based on its configuration - pub fn print(&mut self, res: Response>) -> io::Result<()> { + pub fn print(&mut self, res: Response>) -> io::Result<()> { let output = match self.format { Format::Json => Output::StdoutLine( serde_json::to_vec(&res) @@ -120,15 +120,15 @@ enum Output { None, } -fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output { +fn format_shell(state: &mut FormatterState, data: protocol::Response) -> Output { match data { - DistantResponseData::Ok => Output::None, - DistantResponseData::Error(Error { description, .. }) => { + protocol::Response::Ok => Output::None, + protocol::Response::Error(Error { description, .. }) => { Output::StderrLine(description.into_bytes()) } - DistantResponseData::Blob { data } => Output::StdoutLine(data), - DistantResponseData::Text { data } => Output::StdoutLine(data.into_bytes()), - DistantResponseData::DirEntries { entries, .. } => { + protocol::Response::Blob { data } => Output::StdoutLine(data), + protocol::Response::Text { data } => Output::StdoutLine(data.into_bytes()), + protocol::Response::DirEntries { entries, .. } => { #[derive(Tabled)] struct EntryRow { ty: String, @@ -151,7 +151,7 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output Output::Stdout(table) } - DistantResponseData::Changed(change) => Output::StdoutLine( + protocol::Response::Changed(change) => Output::StdoutLine( format!( "{}{}", match change.kind { @@ -171,14 +171,14 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output ) .into_bytes(), ), - DistantResponseData::Exists { value: exists } => { + protocol::Response::Exists { value: exists } => { if exists { Output::StdoutLine(b"true".to_vec()) } else { Output::StdoutLine(b"false".to_vec()) } } - DistantResponseData::Metadata(Metadata { + protocol::Response::Metadata(Metadata { canonicalized_path, file_type, len, @@ -278,11 +278,11 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output ) .into_bytes(), ), - DistantResponseData::SearchStarted { id } => { + protocol::Response::SearchStarted { id } => { Output::StdoutLine(format!("Query {id} started").into_bytes()) } - DistantResponseData::SearchDone { .. } => Output::None, - DistantResponseData::SearchResults { matches, .. } => { + protocol::Response::SearchDone { .. } => Output::None, + protocol::Response::SearchResults { matches, .. } => { let mut files: HashMap<_, Vec> = HashMap::new(); let mut is_targeting_paths = false; @@ -340,10 +340,10 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output Output::None } } - DistantResponseData::ProcSpawned { .. } => Output::None, - DistantResponseData::ProcStdout { data, .. } => Output::Stdout(data), - DistantResponseData::ProcStderr { data, .. } => Output::Stderr(data), - DistantResponseData::ProcDone { id, success, code } => { + protocol::Response::ProcSpawned { .. } => Output::None, + protocol::Response::ProcStdout { data, .. } => Output::Stdout(data), + protocol::Response::ProcStderr { data, .. } => Output::Stderr(data), + protocol::Response::ProcDone { id, success, code } => { if success { Output::None } else if let Some(code) = code { @@ -352,7 +352,7 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output Output::StderrLine(format!("Proc {id} failed").into_bytes()) } } - DistantResponseData::SystemInfo(SystemInfo { + protocol::Response::SystemInfo(SystemInfo { family, os, arch, @@ -375,7 +375,7 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output ) .into_bytes(), ), - DistantResponseData::Capabilities { supported } => { + protocol::Response::Capabilities { supported } => { #[derive(Tabled)] struct EntryRow { kind: String, diff --git a/src/cli/commands/generate.rs b/src/cli/commands/generate.rs index f16cd1f..eef1875 100644 --- a/src/cli/commands/generate.rs +++ b/src/cli/commands/generate.rs @@ -4,7 +4,7 @@ use anyhow::Context; use clap::CommandFactory; use clap_complete::generate as clap_generate; use distant_core::net::common::{Request, Response}; -use distant_core::{DistantMsg, DistantRequestData, DistantResponseData}; +use distant_core::protocol; use crate::options::{Config, GenerateSubcommand}; use crate::{CliResult, Options}; @@ -22,10 +22,10 @@ async fn async_run(cmd: GenerateSubcommand) -> CliResult { GenerateSubcommand::Schema { file } => { let request_schema = - serde_json::to_value(&Request::>::root_schema()) + serde_json::to_value(&Request::>::root_schema()) .context("Failed to serialize request schema")?; let response_schema = - serde_json::to_value(&Response::>::root_schema()) + serde_json::to_value(&Response::>::root_schema()) .context("Failed to serialize response schema")?; let schema = serde_json::json!({ diff --git a/src/options.rs b/src/options.rs index 34158f4..a480dde 100644 --- a/src/options.rs +++ b/src/options.rs @@ -5,9 +5,9 @@ use clap::builder::TypedValueParser as _; use clap::{Parser, Subcommand, ValueEnum, ValueHint}; use clap_complete::Shell as ClapCompleteShell; use derive_more::IsVariant; -use distant_core::data::{ChangeKind, Environment}; use distant_core::net::common::{ConnectionId, Destination, Map, PortRange}; use distant_core::net::server::Shutdown; +use distant_core::protocol::{ChangeKind, Environment}; use service_manager::ServiceManagerKind; use crate::constants; diff --git a/src/options/common/search.rs b/src/options/common/search.rs index 643bc46..2819f5a 100644 --- a/src/options/common/search.rs +++ b/src/options/common/search.rs @@ -1,8 +1,8 @@ use std::collections::HashSet; use clap::{Args, ValueEnum}; -pub use distant_core::data::SearchQueryCondition as CliSearchQueryCondition; -use distant_core::data::{FileType, SearchQueryOptions, SearchQueryTarget}; +pub use distant_core::protocol::SearchQueryCondition as CliSearchQueryCondition; +use distant_core::protocol::{FileType, SearchQueryOptions, SearchQueryTarget}; /// Options to customize the search results. #[derive(Args, Clone, Debug, Default, PartialEq, Eq)] diff --git a/tests/cli/api/capabilities.rs b/tests/cli/api/capabilities.rs index 076d137..149da94 100644 --- a/tests/cli/api/capabilities.rs +++ b/tests/cli/api/capabilities.rs @@ -1,4 +1,4 @@ -use distant_core::data::{Capabilities, Capability}; +use distant_core::protocol::{Capabilities, Capability}; use rstest::*; use serde_json::json; use test_log::test;