Rename distant-core::data to distant-core::protocol and remove distant-core::data::{Msg, Request, Response} from being re-exported at crate root

pull/184/head
Chip Senkbeil 1 year ago
parent 7fceb63aa3
commit 398aff2f12
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- Renamed `distant_core::data` to `distant_core::protocol`
## [0.20.0-alpha.5] ## [0.20.0-alpha.5]
### Added ### Added

@ -7,11 +7,10 @@ use distant_net::common::ConnectionId;
use distant_net::server::{ConnectionCtx, Reply, ServerCtx, ServerHandler}; use distant_net::server::{ConnectionCtx, Reply, ServerCtx, ServerHandler};
use log::*; use log::*;
use crate::data::{ use crate::protocol::{
Capabilities, ChangeKind, DirEntry, Environment, Error, Metadata, ProcessId, PtySize, SearchId, self, Capabilities, ChangeKind, DirEntry, Environment, Error, Metadata, ProcessId, PtySize,
SearchQuery, SystemInfo, SearchId, SearchQuery, SystemInfo,
}; };
use crate::{DistantMsg, DistantRequestData, DistantResponseData};
mod local; mod local;
pub use local::LocalDistantApi; pub use local::LocalDistantApi;
@ -22,7 +21,7 @@ use reply::DistantSingleReply;
/// Represents the context provided to the [`DistantApi`] for incoming requests /// Represents the context provided to the [`DistantApi`] for incoming requests
pub struct DistantCtx<T> { pub struct DistantCtx<T> {
pub connection_id: ConnectionId, pub connection_id: ConnectionId,
pub reply: Box<dyn Reply<Data = DistantResponseData>>, pub reply: Box<dyn Reply<Data = protocol::Response>>,
pub local_data: Arc<T>, pub local_data: Arc<T>,
} }
@ -423,8 +422,8 @@ where
D: Send + Sync, D: Send + Sync,
{ {
type LocalData = D; type LocalData = D;
type Request = DistantMsg<DistantRequestData>; type Request = protocol::Msg<protocol::Request>;
type Response = DistantMsg<DistantResponseData>; type Response = protocol::Msg<protocol::Response>;
/// Overridden to leverage [`DistantApi`] implementation of `on_accept` /// Overridden to leverage [`DistantApi`] implementation of `on_accept`
async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> { async fn on_accept(&self, ctx: ConnectionCtx<'_, Self::LocalData>) -> io::Result<()> {
@ -445,7 +444,7 @@ where
// Process single vs batch requests // Process single vs batch requests
let response = match request.payload { let response = match request.payload {
DistantMsg::Single(data) => { protocol::Msg::Single(data) => {
let ctx = DistantCtx { let ctx = DistantCtx {
connection_id, connection_id,
reply: Box::new(DistantSingleReply::from(reply.clone_reply())), reply: Box::new(DistantSingleReply::from(reply.clone_reply())),
@ -455,13 +454,13 @@ where
let data = handle_request(self, ctx, data).await; let data = handle_request(self, ctx, data).await;
// Report outgoing errors in our debug logs // Report outgoing errors in our debug logs
if let DistantResponseData::Error(x) = &data { if let protocol::Response::Error(x) = &data {
debug!("[Conn {}] {}", connection_id, x); debug!("[Conn {}] {}", connection_id, x);
} }
DistantMsg::Single(data) protocol::Msg::Single(data)
} }
DistantMsg::Batch(list) => { protocol::Msg::Batch(list) => {
let mut out = Vec::new(); let mut out = Vec::new();
for data in list { for data in list {
@ -480,14 +479,14 @@ where
let data = handle_request(self, ctx, data).await; let data = handle_request(self, ctx, data).await;
// Report outgoing errors in our debug logs // Report outgoing errors in our debug logs
if let DistantResponseData::Error(x) = &data { if let protocol::Response::Error(x) = &data {
debug!("[Conn {}] {}", connection_id, x); debug!("[Conn {}] {}", connection_id, x);
} }
out.push(data); out.push(data);
} }
DistantMsg::Batch(out) protocol::Msg::Batch(out)
} }
}; };
@ -512,56 +511,56 @@ where
async fn handle_request<T, D>( async fn handle_request<T, D>(
server: &DistantApiServerHandler<T, D>, server: &DistantApiServerHandler<T, D>,
ctx: DistantCtx<D>, ctx: DistantCtx<D>,
request: DistantRequestData, request: protocol::Request,
) -> DistantResponseData ) -> protocol::Response
where where
T: DistantApi<LocalData = D> + Send + Sync, T: DistantApi<LocalData = D> + Send + Sync,
D: Send + Sync, D: Send + Sync,
{ {
match request { match request {
DistantRequestData::Capabilities {} => server protocol::Request::Capabilities {} => server
.api .api
.capabilities(ctx) .capabilities(ctx)
.await .await
.map(|supported| DistantResponseData::Capabilities { supported }) .map(|supported| protocol::Response::Capabilities { supported })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::FileRead { path } => server protocol::Request::FileRead { path } => server
.api .api
.read_file(ctx, path) .read_file(ctx, path)
.await .await
.map(|data| DistantResponseData::Blob { data }) .map(|data| protocol::Response::Blob { data })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::FileReadText { path } => server protocol::Request::FileReadText { path } => server
.api .api
.read_file_text(ctx, path) .read_file_text(ctx, path)
.await .await
.map(|data| DistantResponseData::Text { data }) .map(|data| protocol::Response::Text { data })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::FileWrite { path, data } => server protocol::Request::FileWrite { path, data } => server
.api .api
.write_file(ctx, path, data) .write_file(ctx, path, data)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::FileWriteText { path, text } => server protocol::Request::FileWriteText { path, text } => server
.api .api
.write_file_text(ctx, path, text) .write_file_text(ctx, path, text)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::FileAppend { path, data } => server protocol::Request::FileAppend { path, data } => server
.api .api
.append_file(ctx, path, data) .append_file(ctx, path, data)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::FileAppendText { path, text } => server protocol::Request::FileAppendText { path, text } => server
.api .api
.append_file_text(ctx, path, text) .append_file_text(ctx, path, text)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::DirRead { protocol::Request::DirRead {
path, path,
depth, depth,
absolute, absolute,
@ -571,36 +570,36 @@ where
.api .api
.read_dir(ctx, path, depth, absolute, canonicalize, include_root) .read_dir(ctx, path, depth, absolute, canonicalize, include_root)
.await .await
.map(|(entries, errors)| DistantResponseData::DirEntries { .map(|(entries, errors)| protocol::Response::DirEntries {
entries, entries,
errors: errors.into_iter().map(Error::from).collect(), errors: errors.into_iter().map(Error::from).collect(),
}) })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::DirCreate { path, all } => server protocol::Request::DirCreate { path, all } => server
.api .api
.create_dir(ctx, path, all) .create_dir(ctx, path, all)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Remove { path, force } => server protocol::Request::Remove { path, force } => server
.api .api
.remove(ctx, path, force) .remove(ctx, path, force)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Copy { src, dst } => server protocol::Request::Copy { src, dst } => server
.api .api
.copy(ctx, src, dst) .copy(ctx, src, dst)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Rename { src, dst } => server protocol::Request::Rename { src, dst } => server
.api .api
.rename(ctx, src, dst) .rename(ctx, src, dst)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Watch { protocol::Request::Watch {
path, path,
recursive, recursive,
only, only,
@ -609,21 +608,21 @@ where
.api .api
.watch(ctx, path, recursive, only, except) .watch(ctx, path, recursive, only, except)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Unwatch { path } => server protocol::Request::Unwatch { path } => server
.api .api
.unwatch(ctx, path) .unwatch(ctx, path)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Exists { path } => server protocol::Request::Exists { path } => server
.api .api
.exists(ctx, path) .exists(ctx, path)
.await .await
.map(|value| DistantResponseData::Exists { value }) .map(|value| protocol::Response::Exists { value })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Metadata { protocol::Request::Metadata {
path, path,
canonicalize, canonicalize,
resolve_file_type, resolve_file_type,
@ -631,21 +630,21 @@ where
.api .api
.metadata(ctx, path, canonicalize, resolve_file_type) .metadata(ctx, path, canonicalize, resolve_file_type)
.await .await
.map(DistantResponseData::Metadata) .map(protocol::Response::Metadata)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::Search { query } => server protocol::Request::Search { query } => server
.api .api
.search(ctx, query) .search(ctx, query)
.await .await
.map(|id| DistantResponseData::SearchStarted { id }) .map(|id| protocol::Response::SearchStarted { id })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::CancelSearch { id } => server protocol::Request::CancelSearch { id } => server
.api .api
.cancel_search(ctx, id) .cancel_search(ctx, id)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::ProcSpawn { protocol::Request::ProcSpawn {
cmd, cmd,
environment, environment,
current_dir, current_dir,
@ -654,31 +653,31 @@ where
.api .api
.proc_spawn(ctx, cmd.into(), environment, current_dir, pty) .proc_spawn(ctx, cmd.into(), environment, current_dir, pty)
.await .await
.map(|id| DistantResponseData::ProcSpawned { id }) .map(|id| protocol::Response::ProcSpawned { id })
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::ProcKill { id } => server protocol::Request::ProcKill { id } => server
.api .api
.proc_kill(ctx, id) .proc_kill(ctx, id)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::ProcStdin { id, data } => server protocol::Request::ProcStdin { id, data } => server
.api .api
.proc_stdin(ctx, id, data) .proc_stdin(ctx, id, data)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::ProcResizePty { id, size } => server protocol::Request::ProcResizePty { id, size } => server
.api .api
.proc_resize_pty(ctx, id, size) .proc_resize_pty(ctx, id, size)
.await .await
.map(|_| DistantResponseData::Ok) .map(|_| protocol::Response::Ok)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
DistantRequestData::SystemInfo {} => server protocol::Request::SystemInfo {} => server
.api .api
.system_info(ctx) .system_info(ctx)
.await .await
.map(DistantResponseData::SystemInfo) .map(protocol::Response::SystemInfo)
.unwrap_or_else(DistantResponseData::from), .unwrap_or_else(protocol::Response::from),
} }
} }

@ -6,7 +6,7 @@ use log::*;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use walkdir::WalkDir; use walkdir::WalkDir;
use crate::data::{ use crate::protocol::{
Capabilities, ChangeKind, ChangeKindSet, DirEntry, Environment, FileType, Metadata, ProcessId, Capabilities, ChangeKind, ChangeKindSet, DirEntry, Environment, FileType, Metadata, ProcessId,
PtySize, SearchId, SearchQuery, SystemInfo, PtySize, SearchId, SearchQuery, SystemInfo,
}; };
@ -503,7 +503,7 @@ mod tests {
use super::*; use super::*;
use crate::api::ConnectionCtx; use crate::api::ConnectionCtx;
use crate::data::DistantResponseData; use crate::protocol::Response;
static TEMP_SCRIPT_DIR: Lazy<assert_fs::TempDir> = static TEMP_SCRIPT_DIR: Lazy<assert_fs::TempDir> =
Lazy::new(|| assert_fs::TempDir::new().unwrap()); Lazy::new(|| assert_fs::TempDir::new().unwrap());
@ -564,13 +564,7 @@ mod tests {
static DOES_NOT_EXIST_BIN: Lazy<assert_fs::fixture::ChildPath> = static DOES_NOT_EXIST_BIN: Lazy<assert_fs::fixture::ChildPath> =
Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin")); Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin"));
async fn setup( async fn setup(buffer: usize) -> (LocalDistantApi, DistantCtx<()>, mpsc::Receiver<Response>) {
buffer: usize,
) -> (
LocalDistantApi,
DistantCtx<()>,
mpsc::Receiver<DistantResponseData>,
) {
let api = LocalDistantApi::initialize().unwrap(); let api = LocalDistantApi::initialize().unwrap();
let (reply, rx) = make_reply(buffer); let (reply, rx) = make_reply(buffer);
let connection_id = rand::random(); let connection_id = rand::random();
@ -592,12 +586,7 @@ mod tests {
(api, ctx, rx) (api, ctx, rx)
} }
fn make_reply( fn make_reply(buffer: usize) -> (Box<dyn Reply<Data = Response>>, mpsc::Receiver<Response>) {
buffer: usize,
) -> (
Box<dyn Reply<Data = DistantResponseData>>,
mpsc::Receiver<DistantResponseData>,
) {
let (tx, rx) = mpsc::channel(buffer); let (tx, rx) = mpsc::channel(buffer);
(Box::new(tx), rx) (Box::new(tx), rx)
} }
@ -1344,12 +1333,12 @@ mod tests {
/// Validates a response as being a series of changes that include the provided paths /// Validates a response as being a series of changes that include the provided paths
fn validate_changed_paths( fn validate_changed_paths(
data: &DistantResponseData, data: &Response,
expected_paths: &[PathBuf], expected_paths: &[PathBuf],
should_panic: bool, should_panic: bool,
) -> bool { ) -> bool {
match data { match data {
DistantResponseData::Changed(change) if should_panic => { Response::Changed(change) if should_panic => {
let paths: Vec<PathBuf> = change let paths: Vec<PathBuf> = change
.paths .paths
.iter() .iter()
@ -1359,7 +1348,7 @@ mod tests {
true true
} }
DistantResponseData::Changed(change) => { Response::Changed(change) => {
let paths: Vec<PathBuf> = change let paths: Vec<PathBuf> = change
.paths .paths
.iter() .iter()
@ -1901,8 +1890,8 @@ mod tests {
let mut got_stdout = false; let mut got_stdout = false;
let mut got_done = false; let mut got_done = false;
let mut check_data = |data: &DistantResponseData| match data { let mut check_data = |data: &Response| match data {
DistantResponseData::ProcStdout { id, data } => { Response::ProcStdout { id, data } => {
assert_eq!( assert_eq!(
*id, proc_id, *id, proc_id,
"Got {}, but expected {} as process id", "Got {}, but expected {} as process id",
@ -1911,7 +1900,7 @@ mod tests {
assert_eq!(data, b"some stdout", "Got wrong stdout"); assert_eq!(data, b"some stdout", "Got wrong stdout");
got_stdout = true; got_stdout = true;
} }
DistantResponseData::ProcDone { id, success, .. } => { Response::ProcDone { id, success, .. } => {
assert_eq!( assert_eq!(
*id, proc_id, *id, proc_id,
"Got {}, but expected {} as process id", "Got {}, but expected {} as process id",
@ -1965,8 +1954,8 @@ mod tests {
let mut got_stderr = false; let mut got_stderr = false;
let mut got_done = false; let mut got_done = false;
let mut check_data = |data: &DistantResponseData| match data { let mut check_data = |data: &Response| match data {
DistantResponseData::ProcStderr { id, data } => { Response::ProcStderr { id, data } => {
assert_eq!( assert_eq!(
*id, proc_id, *id, proc_id,
"Got {}, but expected {} as process id", "Got {}, but expected {} as process id",
@ -1975,7 +1964,7 @@ mod tests {
assert_eq!(data, b"some stderr", "Got wrong stderr"); assert_eq!(data, b"some stderr", "Got wrong stderr");
got_stderr = true; got_stderr = true;
} }
DistantResponseData::ProcDone { id, success, .. } => { Response::ProcDone { id, success, .. } => {
assert_eq!( assert_eq!(
*id, proc_id, *id, proc_id,
"Got {}, but expected {} as process id", "Got {}, but expected {} as process id",
@ -2014,7 +2003,7 @@ mod tests {
// Wait for process to finish // Wait for process to finish
match rx.recv().await.unwrap() { match rx.recv().await.unwrap() {
DistantResponseData::ProcDone { id, .. } => assert_eq!( Response::ProcDone { id, .. } => assert_eq!(
id, proc_id, id, proc_id,
"Got {}, but expected {} as process id", "Got {}, but expected {} as process id",
id, proc_id id, proc_id
@ -2056,7 +2045,7 @@ mod tests {
// Wait for the completion response to come in // Wait for the completion response to come in
match rx.recv().await.unwrap() { match rx.recv().await.unwrap() {
DistantResponseData::ProcDone { id, .. } => assert_eq!( Response::ProcDone { id, .. } => assert_eq!(
id, proc_id, id, proc_id,
"Got {}, but expected {} as process id", "Got {}, but expected {} as process id",
id, proc_id id, proc_id
@ -2124,7 +2113,7 @@ mod tests {
// Third, check the async response of stdout to verify we got stdin // Third, check the async response of stdout to verify we got stdin
match rx.recv().await.unwrap() { match rx.recv().await.unwrap() {
DistantResponseData::ProcStdout { data, .. } => { Response::ProcStdout { data, .. } => {
assert_eq!(data, b"hello world\n", "Mirrored data didn't match"); assert_eq!(data, b"hello world\n", "Mirrored data didn't match");
} }
x => panic!("Unexpected response: {:?}", x), x => panic!("Unexpected response: {:?}", x),

@ -4,7 +4,7 @@ use std::pin::Pin;
use tokio::io; use tokio::io;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::data::{ProcessId, PtySize}; use crate::protocol::{ProcessId, PtySize};
mod pty; mod pty;
pub use pty::*; pub use pty::*;

@ -13,7 +13,7 @@ use super::{
ProcessPty, PtySize, WaitRx, ProcessPty, PtySize, WaitRx,
}; };
use crate::constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_DURATION}; use crate::constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_DURATION};
use crate::data::Environment; use crate::protocol::Environment;
/// Represents a process that is associated with a pty /// Represents a process that is associated with a pty
pub struct PtyProcess { pub struct PtyProcess {

@ -12,7 +12,7 @@ use super::{
wait, ExitStatus, FutureReturn, InputChannel, NoProcessPty, OutputChannel, Process, ProcessId, wait, ExitStatus, FutureReturn, InputChannel, NoProcessPty, OutputChannel, Process, ProcessId,
ProcessKiller, WaitRx, ProcessKiller, WaitRx,
}; };
use crate::data::Environment; use crate::protocol::Environment;
mod tasks; mod tasks;

@ -7,7 +7,7 @@ use distant_net::server::Reply;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::data::{DistantResponseData, Environment, ProcessId, PtySize}; use crate::protocol::{Environment, ProcessId, PtySize, Response};
mod instance; mod instance;
pub use instance::*; pub use instance::*;
@ -71,7 +71,7 @@ impl ProcessChannel {
environment: Environment, environment: Environment,
current_dir: Option<PathBuf>, current_dir: Option<PathBuf>,
pty: Option<PtySize>, pty: Option<PtySize>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<ProcessId> { ) -> io::Result<ProcessId> {
let (cb, rx) = oneshot::channel(); let (cb, rx) = oneshot::channel();
self.tx self.tx
@ -131,7 +131,7 @@ enum InnerProcessMsg {
environment: Environment, environment: Environment,
current_dir: Option<PathBuf>, current_dir: Option<PathBuf>,
pty: Option<PtySize>, pty: Option<PtySize>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
cb: oneshot::Sender<io::Result<ProcessId>>, cb: oneshot::Sender<io::Result<ProcessId>>,
}, },
Resize { Resize {

@ -9,7 +9,7 @@ use tokio::task::JoinHandle;
use crate::api::local::process::{ use crate::api::local::process::{
InputChannel, OutputChannel, Process, ProcessKiller, ProcessPty, PtyProcess, SimpleProcess, InputChannel, OutputChannel, Process, ProcessKiller, ProcessPty, PtyProcess, SimpleProcess,
}; };
use crate::data::{DistantResponseData, Environment, ProcessId, PtySize}; use crate::protocol::{Environment, ProcessId, PtySize, Response};
/// Holds information related to a spawned process on the server /// Holds information related to a spawned process on the server
pub struct ProcessInstance { pub struct ProcessInstance {
@ -65,7 +65,7 @@ impl ProcessInstance {
environment: Environment, environment: Environment,
current_dir: Option<PathBuf>, current_dir: Option<PathBuf>,
pty: Option<PtySize>, pty: Option<PtySize>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<Self> { ) -> io::Result<Self> {
// Build out the command and args from our string // Build out the command and args from our string
let mut cmd_and_args = if cfg!(windows) { let mut cmd_and_args = if cfg!(windows) {
@ -168,14 +168,12 @@ impl ProcessInstance {
async fn stdout_task( async fn stdout_task(
id: ProcessId, id: ProcessId,
mut stdout: Box<dyn OutputChannel>, mut stdout: Box<dyn OutputChannel>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<()> { ) -> io::Result<()> {
loop { loop {
match stdout.recv().await { match stdout.recv().await {
Ok(Some(data)) => { Ok(Some(data)) => {
reply reply.send(Response::ProcStdout { id, data }).await?;
.send(DistantResponseData::ProcStdout { id, data })
.await?;
} }
Ok(None) => return Ok(()), Ok(None) => return Ok(()),
Err(x) => return Err(x), Err(x) => return Err(x),
@ -186,14 +184,12 @@ async fn stdout_task(
async fn stderr_task( async fn stderr_task(
id: ProcessId, id: ProcessId,
mut stderr: Box<dyn OutputChannel>, mut stderr: Box<dyn OutputChannel>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<()> { ) -> io::Result<()> {
loop { loop {
match stderr.recv().await { match stderr.recv().await {
Ok(Some(data)) => { Ok(Some(data)) => {
reply reply.send(Response::ProcStderr { id, data }).await?;
.send(DistantResponseData::ProcStderr { id, data })
.await?;
} }
Ok(None) => return Ok(()), Ok(None) => return Ok(()),
Err(x) => return Err(x), Err(x) => return Err(x),
@ -204,20 +200,20 @@ async fn stderr_task(
async fn wait_task( async fn wait_task(
id: ProcessId, id: ProcessId,
mut child: Box<dyn Process>, mut child: Box<dyn Process>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<()> { ) -> io::Result<()> {
let status = child.wait().await; let status = child.wait().await;
match status { match status {
Ok(status) => { Ok(status) => {
reply reply
.send(DistantResponseData::ProcDone { .send(Response::ProcDone {
id, id,
success: status.success, success: status.success,
code: status.code, code: status.code,
}) })
.await .await
} }
Err(x) => reply.send(DistantResponseData::from(x)).await, Err(x) => reply.send(Response::from(x)).await,
} }
} }

@ -13,8 +13,8 @@ use log::*;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::data::{ use crate::protocol::{
DistantResponseData, SearchId, SearchQuery, SearchQueryContentsMatch, SearchQueryMatch, Response, SearchId, SearchQuery, SearchQueryContentsMatch, SearchQueryMatch,
SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQuerySubmatch, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQuerySubmatch,
SearchQueryTarget, SearchQueryTarget,
}; };
@ -82,7 +82,7 @@ impl SearchChannel {
pub async fn start( pub async fn start(
&self, &self,
query: SearchQuery, query: SearchQuery,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<SearchId> { ) -> io::Result<SearchId> {
let (cb, rx) = oneshot::channel(); let (cb, rx) = oneshot::channel();
self.tx self.tx
@ -113,7 +113,7 @@ impl SearchChannel {
enum InnerSearchMsg { enum InnerSearchMsg {
Start { Start {
query: Box<SearchQuery>, query: Box<SearchQuery>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
cb: oneshot::Sender<io::Result<SearchId>>, cb: oneshot::Sender<io::Result<SearchId>>,
}, },
Cancel { Cancel {
@ -187,7 +187,7 @@ struct SearchQueryReporter {
id: SearchId, id: SearchId,
options: SearchQueryOptions, options: SearchQueryOptions,
rx: mpsc::UnboundedReceiver<SearchQueryMatch>, rx: mpsc::UnboundedReceiver<SearchQueryMatch>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
} }
impl SearchQueryReporter { impl SearchQueryReporter {
@ -226,7 +226,7 @@ impl SearchQueryReporter {
if matches.len() as u64 >= len { if matches.len() as u64 >= len {
trace!("[Query {id}] Reached {len} paginated matches"); trace!("[Query {id}] Reached {len} paginated matches");
if let Err(x) = reply if let Err(x) = reply
.send(DistantResponseData::SearchResults { .send(Response::SearchResults {
id, id,
matches: std::mem::take(&mut matches), matches: std::mem::take(&mut matches),
}) })
@ -241,17 +241,14 @@ impl SearchQueryReporter {
// Send any remaining matches // Send any remaining matches
if !matches.is_empty() { if !matches.is_empty() {
trace!("[Query {id}] Sending {} remaining matches", matches.len()); trace!("[Query {id}] Sending {} remaining matches", matches.len());
if let Err(x) = reply if let Err(x) = reply.send(Response::SearchResults { id, matches }).await {
.send(DistantResponseData::SearchResults { id, matches })
.await
{
error!("[Query {id}] Failed to send final matches: {x}"); error!("[Query {id}] Failed to send final matches: {x}");
} }
} }
// Report that we are done // Report that we are done
trace!("[Query {id}] Reporting as done"); trace!("[Query {id}] Reporting as done");
if let Err(x) = reply.send(DistantResponseData::SearchDone { id }).await { if let Err(x) = reply.send(Response::SearchDone { id }).await {
error!("[Query {id}] Failed to send done status: {x}"); error!("[Query {id}] Failed to send done status: {x}");
} }
} }
@ -813,7 +810,7 @@ mod tests {
use test_log::test; use test_log::test;
use super::*; use super::*;
use crate::data::{FileType, SearchQueryCondition, SearchQueryMatchData}; use crate::protocol::{FileType, SearchQueryCondition, SearchQueryMatchData};
fn make_path(path: &str) -> PathBuf { fn make_path(path: &str) -> PathBuf {
use std::path::MAIN_SEPARATOR; use std::path::MAIN_SEPARATOR;
@ -834,9 +831,9 @@ mod tests {
root root
} }
fn get_matches(data: DistantResponseData) -> Vec<SearchQueryMatch> { fn get_matches(data: Response) -> Vec<SearchQueryMatch> {
match data { match data {
DistantResponseData::SearchResults { matches, .. } => matches, Response::SearchResults { matches, .. } => matches,
x => panic!("Did not get search results: {x:?}"), x => panic!("Did not get search results: {x:?}"),
} }
} }
@ -858,10 +855,7 @@ mod tests {
let search_id = state.start(query, Box::new(reply)).await.unwrap(); let search_id = state.start(query, Box::new(reply)).await.unwrap();
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -937,7 +931,7 @@ mod tests {
assert_eq!( assert_eq!(
rx.recv().await, rx.recv().await,
Some(DistantResponseData::SearchDone { id: search_id }) Some(Response::SearchDone { id: search_id })
); );
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
@ -1013,10 +1007,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1089,10 +1080,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1181,10 +1169,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1277,10 +1262,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1314,10 +1296,7 @@ mod tests {
assert_eq!(matches.len(), 2); assert_eq!(matches.len(), 2);
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1355,10 +1334,7 @@ mod tests {
assert_eq!(matches.len(), 1); assert_eq!(matches.len(), 1);
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1402,10 +1378,7 @@ mod tests {
assert_eq!(paths, expected_paths); assert_eq!(paths, expected_paths);
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1506,10 +1479,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1574,10 +1544,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1629,10 +1596,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1663,10 +1627,7 @@ mod tests {
// Get done indicator next as there were no matches // Get done indicator next as there were no matches
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1715,10 +1676,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1808,10 +1766,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1867,10 +1822,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1937,10 +1889,7 @@ mod tests {
); );
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }
@ -1998,10 +1947,7 @@ mod tests {
} }
let data = rx.recv().await; let data = rx.recv().await;
assert_eq!( assert_eq!(data, Some(Response::SearchDone { id: search_id }));
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None); assert_eq!(rx.recv().await, None);
} }

@ -15,7 +15,7 @@ use tokio::sync::oneshot;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::constants::SERVER_WATCHER_CAPACITY; use crate::constants::SERVER_WATCHER_CAPACITY;
use crate::data::ChangeKind; use crate::protocol::ChangeKind;
mod path; mod path;
pub use path::*; pub use path::*;

@ -5,7 +5,7 @@ use std::{fmt, io};
use distant_net::common::ConnectionId; use distant_net::common::ConnectionId;
use distant_net::server::Reply; use distant_net::server::Reply;
use crate::data::{Change, ChangeKind, ChangeKindSet, DistantResponseData, Error}; use crate::protocol::{Change, ChangeKind, ChangeKindSet, Error, Response};
/// Represents a path registered with a watcher that includes relevant state including /// Represents a path registered with a watcher that includes relevant state including
/// the ability to reply with /// the ability to reply with
@ -29,7 +29,7 @@ pub struct RegisteredPath {
allowed: ChangeKindSet, allowed: ChangeKindSet,
/// Used to send a reply through the connection watching this path /// Used to send a reply through the connection watching this path
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
} }
impl fmt::Debug for RegisteredPath { impl fmt::Debug for RegisteredPath {
@ -69,7 +69,7 @@ impl RegisteredPath {
recursive: bool, recursive: bool,
only: impl Into<ChangeKindSet>, only: impl Into<ChangeKindSet>,
except: impl Into<ChangeKindSet>, except: impl Into<ChangeKindSet>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> io::Result<Self> { ) -> io::Result<Self> {
let raw_path = path.into(); let raw_path = path.into();
let path = tokio::fs::canonicalize(raw_path.as_path()).await?; let path = tokio::fs::canonicalize(raw_path.as_path()).await?;
@ -140,7 +140,7 @@ impl RegisteredPath {
if !paths.is_empty() { if !paths.is_empty() {
self.reply self.reply
.send(DistantResponseData::Changed(Change { kind, paths })) .send(Response::Changed(Change { kind, paths }))
.await .await
.map(|_| true) .map(|_| true)
} else { } else {
@ -171,9 +171,9 @@ impl RegisteredPath {
if !paths.is_empty() || !skip_if_no_paths { if !paths.is_empty() || !skip_if_no_paths {
self.reply self.reply
.send(if paths.is_empty() { .send(if paths.is_empty() {
DistantResponseData::Error(Error::from(msg)) Response::Error(Error::from(msg))
} else { } else {
DistantResponseData::Error(Error::from(format!("{msg} about {paths:?}"))) Response::Error(Error::from(format!("{msg} about {paths:?}")))
}) })
.await .await
.map(|_| true) .map(|_| true)

@ -4,28 +4,27 @@ use std::pin::Pin;
use distant_net::server::Reply; use distant_net::server::Reply;
use crate::api::DistantMsg; use crate::protocol;
use crate::data::DistantResponseData;
/// Wrapper around a reply that can be batch or single, converting /// Wrapper around a reply that can be batch or single, converting
/// a single data into the wrapped type /// a single data into the wrapped type
pub struct DistantSingleReply(Box<dyn Reply<Data = DistantMsg<DistantResponseData>>>); pub struct DistantSingleReply(Box<dyn Reply<Data = protocol::Msg<protocol::Response>>>);
impl From<Box<dyn Reply<Data = DistantMsg<DistantResponseData>>>> for DistantSingleReply { impl From<Box<dyn Reply<Data = protocol::Msg<protocol::Response>>>> for DistantSingleReply {
fn from(reply: Box<dyn Reply<Data = DistantMsg<DistantResponseData>>>) -> Self { fn from(reply: Box<dyn Reply<Data = protocol::Msg<protocol::Response>>>) -> Self {
Self(reply) Self(reply)
} }
} }
impl Reply for DistantSingleReply { impl Reply for DistantSingleReply {
type Data = DistantResponseData; type Data = protocol::Response;
fn send(&self, data: Self::Data) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> { fn send(&self, data: Self::Data) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>> {
self.0.send(DistantMsg::Single(data)) self.0.send(protocol::Msg::Single(data))
} }
fn blocking_send(&self, data: Self::Data) -> io::Result<()> { fn blocking_send(&self, data: Self::Data) -> io::Result<()> {
self.0.blocking_send(DistantMsg::Single(data)) self.0.blocking_send(protocol::Msg::Single(data))
} }
fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> { fn clone_reply(&self) -> Box<dyn Reply<Data = Self::Data>> {

@ -1,7 +1,7 @@
use distant_net::client::Channel; use distant_net::client::Channel;
use distant_net::Client; use distant_net::Client;
use crate::{DistantMsg, DistantRequestData, DistantResponseData}; use crate::protocol;
mod ext; mod ext;
mod lsp; mod lsp;
@ -10,10 +10,12 @@ mod searcher;
mod watcher; mod watcher;
/// Represents a [`Client`] that communicates using the distant protocol /// Represents a [`Client`] that communicates using the distant protocol
pub type DistantClient = Client<DistantMsg<DistantRequestData>, DistantMsg<DistantResponseData>>; pub type DistantClient =
Client<protocol::Msg<protocol::Request>, protocol::Msg<protocol::Response>>;
/// Represents a [`Channel`] that communicates using the distant protocol /// Represents a [`Channel`] that communicates using the distant protocol
pub type DistantChannel = Channel<DistantMsg<DistantRequestData>, DistantMsg<DistantResponseData>>; pub type DistantChannel =
Channel<protocol::Msg<protocol::Request>, protocol::Msg<protocol::Response>>;
pub use ext::*; pub use ext::*;
pub use lsp::*; pub use lsp::*;

@ -10,11 +10,10 @@ use crate::client::{
RemoteCommand, RemoteLspCommand, RemoteLspProcess, RemoteOutput, RemoteProcess, Searcher, RemoteCommand, RemoteLspCommand, RemoteLspProcess, RemoteOutput, RemoteProcess, Searcher,
Watcher, Watcher,
}; };
use crate::data::{ use crate::protocol::{
Capabilities, ChangeKindSet, DirEntry, DistantRequestData, DistantResponseData, Environment, self, Capabilities, ChangeKindSet, DirEntry, Environment, Error as Failure, Metadata, PtySize,
Error as Failure, Metadata, PtySize, SearchId, SearchQuery, SystemInfo, SearchId, SearchQuery, SystemInfo,
}; };
use crate::DistantMsg;
pub type AsyncReturn<'a, T, E = io::Error> = pub type AsyncReturn<'a, T, E = io::Error> =
Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>; Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;
@ -148,21 +147,21 @@ macro_rules! make_body {
($self:expr, $data:expr, @ok) => { ($self:expr, $data:expr, @ok) => {
make_body!($self, $data, |data| { make_body!($self, $data, |data| {
match data { match data {
DistantResponseData::Ok => Ok(()), protocol::Response::Ok => Ok(()),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
}) })
}; };
($self:expr, $data:expr, $and_then:expr) => {{ ($self:expr, $data:expr, $and_then:expr) => {{
let req = Request::new(DistantMsg::Single($data)); let req = Request::new(protocol::Msg::Single($data));
Box::pin(async move { Box::pin(async move {
$self $self
.send(req) .send(req)
.await .await
.and_then(|res| match res.payload { .and_then(|res| match res.payload {
DistantMsg::Single(x) => Ok(x), protocol::Msg::Single(x) => Ok(x),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
}) })
.and_then($and_then) .and_then($and_then)
@ -171,7 +170,7 @@ macro_rules! make_body {
} }
impl DistantChannelExt impl DistantChannelExt
for Channel<DistantMsg<DistantRequestData>, DistantMsg<DistantResponseData>> for Channel<protocol::Msg<protocol::Request>, protocol::Msg<protocol::Response>>
{ {
fn append_file( fn append_file(
&mut self, &mut self,
@ -180,7 +179,7 @@ impl DistantChannelExt
) -> AsyncReturn<'_, ()> { ) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::FileAppend { path: path.into(), data: data.into() }, protocol::Request::FileAppend { path: path.into(), data: data.into() },
@ok @ok
) )
} }
@ -192,7 +191,7 @@ impl DistantChannelExt
) -> AsyncReturn<'_, ()> { ) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::FileAppendText { path: path.into(), text: data.into() }, protocol::Request::FileAppendText { path: path.into(), text: data.into() },
@ok @ok
) )
} }
@ -200,10 +199,10 @@ impl DistantChannelExt
fn capabilities(&mut self) -> AsyncReturn<'_, Capabilities> { fn capabilities(&mut self) -> AsyncReturn<'_, Capabilities> {
make_body!( make_body!(
self, self,
DistantRequestData::Capabilities {}, protocol::Request::Capabilities {},
|data| match data { |data| match data {
DistantResponseData::Capabilities { supported } => Ok(supported), protocol::Response::Capabilities { supported } => Ok(supported),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
) )
@ -212,7 +211,7 @@ impl DistantChannelExt
fn copy(&mut self, src: impl Into<PathBuf>, dst: impl Into<PathBuf>) -> AsyncReturn<'_, ()> { fn copy(&mut self, src: impl Into<PathBuf>, dst: impl Into<PathBuf>) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::Copy { src: src.into(), dst: dst.into() }, protocol::Request::Copy { src: src.into(), dst: dst.into() },
@ok @ok
) )
} }
@ -220,7 +219,7 @@ impl DistantChannelExt
fn create_dir(&mut self, path: impl Into<PathBuf>, all: bool) -> AsyncReturn<'_, ()> { fn create_dir(&mut self, path: impl Into<PathBuf>, all: bool) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::DirCreate { path: path.into(), all }, protocol::Request::DirCreate { path: path.into(), all },
@ok @ok
) )
} }
@ -228,10 +227,10 @@ impl DistantChannelExt
fn exists(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, bool> { fn exists(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, bool> {
make_body!( make_body!(
self, self,
DistantRequestData::Exists { path: path.into() }, protocol::Request::Exists { path: path.into() },
|data| match data { |data| match data {
DistantResponseData::Exists { value } => Ok(value), protocol::Response::Exists { value } => Ok(value),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
) )
@ -245,14 +244,14 @@ impl DistantChannelExt
) -> AsyncReturn<'_, Metadata> { ) -> AsyncReturn<'_, Metadata> {
make_body!( make_body!(
self, self,
DistantRequestData::Metadata { protocol::Request::Metadata {
path: path.into(), path: path.into(),
canonicalize, canonicalize,
resolve_file_type resolve_file_type
}, },
|data| match data { |data| match data {
DistantResponseData::Metadata(x) => Ok(x), protocol::Response::Metadata(x) => Ok(x),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
) )
@ -266,7 +265,7 @@ impl DistantChannelExt
fn cancel_search(&mut self, id: SearchId) -> AsyncReturn<'_, ()> { fn cancel_search(&mut self, id: SearchId) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::CancelSearch { id }, protocol::Request::CancelSearch { id },
@ok @ok
) )
} }
@ -281,7 +280,7 @@ impl DistantChannelExt
) -> AsyncReturn<'_, (Vec<DirEntry>, Vec<Failure>)> { ) -> AsyncReturn<'_, (Vec<DirEntry>, Vec<Failure>)> {
make_body!( make_body!(
self, self,
DistantRequestData::DirRead { protocol::Request::DirRead {
path: path.into(), path: path.into(),
depth, depth,
absolute, absolute,
@ -289,8 +288,8 @@ impl DistantChannelExt
include_root include_root
}, },
|data| match data { |data| match data {
DistantResponseData::DirEntries { entries, errors } => Ok((entries, errors)), protocol::Response::DirEntries { entries, errors } => Ok((entries, errors)),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
) )
@ -299,10 +298,10 @@ impl DistantChannelExt
fn read_file(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, Vec<u8>> { fn read_file(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, Vec<u8>> {
make_body!( make_body!(
self, self,
DistantRequestData::FileRead { path: path.into() }, protocol::Request::FileRead { path: path.into() },
|data| match data { |data| match data {
DistantResponseData::Blob { data } => Ok(data), protocol::Response::Blob { data } => Ok(data),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
) )
@ -311,10 +310,10 @@ impl DistantChannelExt
fn read_file_text(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, String> { fn read_file_text(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, String> {
make_body!( make_body!(
self, self,
DistantRequestData::FileReadText { path: path.into() }, protocol::Request::FileReadText { path: path.into() },
|data| match data { |data| match data {
DistantResponseData::Text { data } => Ok(data), protocol::Response::Text { data } => Ok(data),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
} }
) )
@ -323,7 +322,7 @@ impl DistantChannelExt
fn remove(&mut self, path: impl Into<PathBuf>, force: bool) -> AsyncReturn<'_, ()> { fn remove(&mut self, path: impl Into<PathBuf>, force: bool) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::Remove { path: path.into(), force }, protocol::Request::Remove { path: path.into(), force },
@ok @ok
) )
} }
@ -331,7 +330,7 @@ impl DistantChannelExt
fn rename(&mut self, src: impl Into<PathBuf>, dst: impl Into<PathBuf>) -> AsyncReturn<'_, ()> { fn rename(&mut self, src: impl Into<PathBuf>, dst: impl Into<PathBuf>) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::Rename { src: src.into(), dst: dst.into() }, protocol::Request::Rename { src: src.into(), dst: dst.into() },
@ok @ok
) )
} }
@ -351,12 +350,15 @@ impl DistantChannelExt
fn unwatch(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, ()> { fn unwatch(&mut self, path: impl Into<PathBuf>) -> AsyncReturn<'_, ()> {
fn inner_unwatch( fn inner_unwatch(
channel: &mut Channel<DistantMsg<DistantRequestData>, DistantMsg<DistantResponseData>>, channel: &mut Channel<
protocol::Msg<protocol::Request>,
protocol::Msg<protocol::Response>,
>,
path: impl Into<PathBuf>, path: impl Into<PathBuf>,
) -> AsyncReturn<'_, ()> { ) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
channel, channel,
DistantRequestData::Unwatch { path: path.into() }, protocol::Request::Unwatch { path: path.into() },
@ok @ok
) )
} }
@ -423,9 +425,9 @@ impl DistantChannelExt
} }
fn system_info(&mut self) -> AsyncReturn<'_, SystemInfo> { fn system_info(&mut self) -> AsyncReturn<'_, SystemInfo> {
make_body!(self, DistantRequestData::SystemInfo {}, |data| match data { make_body!(self, protocol::Request::SystemInfo {}, |data| match data {
DistantResponseData::SystemInfo(x) => Ok(x), protocol::Response::SystemInfo(x) => Ok(x),
DistantResponseData::Error(x) => Err(io::Error::from(x)), protocol::Response::Error(x) => Err(io::Error::from(x)),
_ => Err(mismatched_response()), _ => Err(mismatched_response()),
}) })
} }
@ -437,7 +439,7 @@ impl DistantChannelExt
) -> AsyncReturn<'_, ()> { ) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::FileWrite { path: path.into(), data: data.into() }, protocol::Request::FileWrite { path: path.into(), data: data.into() },
@ok @ok
) )
} }
@ -449,7 +451,7 @@ impl DistantChannelExt
) -> AsyncReturn<'_, ()> { ) -> AsyncReturn<'_, ()> {
make_body!( make_body!(
self, self,
DistantRequestData::FileWriteText { path: path.into(), text: data.into() }, protocol::Request::FileWriteText { path: path.into(), text: data.into() },
@ok @ok
) )
} }

@ -3,15 +3,15 @@ use std::ops::{Deref, DerefMut};
use std::path::PathBuf; use std::path::PathBuf;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{self};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::client::{ use crate::client::{
DistantChannel, RemoteCommand, RemoteProcess, RemoteStatus, RemoteStderr, RemoteStdin, DistantChannel, RemoteCommand, RemoteProcess, RemoteStatus, RemoteStderr, RemoteStdin,
RemoteStdout, RemoteStdout,
}; };
use crate::data::{Environment, PtySize}; use crate::protocol::{Environment, PtySize};
mod msg; mod msg;
pub use msg::*; pub use msg::*;
@ -402,7 +402,7 @@ mod tests {
use test_log::test; use test_log::test;
use super::*; use super::*;
use crate::data::{DistantRequestData, DistantResponseData}; use crate::protocol;
/// Timeout used with timeout function /// Timeout used with timeout function
const TIMEOUT: Duration = Duration::from_millis(50); const TIMEOUT: Duration = Duration::from_millis(50);
@ -421,12 +421,12 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = t1.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = t1.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
t1.write_frame_for(&Response::new( t1.write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::ProcSpawned { id: rand::random() }, protocol::Response::ProcSpawned { id: rand::random() },
)) ))
.await .await
.unwrap(); .unwrap();
@ -473,9 +473,9 @@ mod tests {
.unwrap(); .unwrap();
// Validate that the outgoing req is a complete LSP message // Validate that the outgoing req is a complete LSP message
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantRequestData::ProcStdin { data, .. } => { protocol::Request::ProcStdin { data, .. } => {
assert_eq!( assert_eq!(
data, data,
make_lsp_msg(serde_json::json!({ make_lsp_msg(serde_json::json!({
@ -507,7 +507,7 @@ mod tests {
tokio::task::yield_now().await; tokio::task::yield_now().await;
let result = timeout( let result = timeout(
TIMEOUT, TIMEOUT,
transport.read_frame_as::<Request<DistantRequestData>>(), transport.read_frame_as::<Request<protocol::Request>>(),
) )
.await; .await;
assert!(result.is_err(), "Unexpectedly got data: {:?}", result); assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
@ -516,9 +516,9 @@ mod tests {
proc.stdin.as_mut().unwrap().write(msg_b).await.unwrap(); proc.stdin.as_mut().unwrap().write(msg_b).await.unwrap();
// Validate that the outgoing req is a complete LSP message // Validate that the outgoing req is a complete LSP message
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantRequestData::ProcStdin { data, .. } => { protocol::Request::ProcStdin { data, .. } => {
assert_eq!( assert_eq!(
data, data,
make_lsp_msg(serde_json::json!({ make_lsp_msg(serde_json::json!({
@ -551,9 +551,9 @@ mod tests {
.unwrap(); .unwrap();
// Validate that the outgoing req is a complete LSP message // Validate that the outgoing req is a complete LSP message
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantRequestData::ProcStdin { data, .. } => { protocol::Request::ProcStdin { data, .. } => {
assert_eq!( assert_eq!(
data, data,
make_lsp_msg(serde_json::json!({ make_lsp_msg(serde_json::json!({
@ -600,9 +600,9 @@ mod tests {
.unwrap(); .unwrap();
// Validate that the first outgoing req is a complete LSP message matching first // Validate that the first outgoing req is a complete LSP message matching first
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantRequestData::ProcStdin { data, .. } => { protocol::Request::ProcStdin { data, .. } => {
assert_eq!( assert_eq!(
data, data,
make_lsp_msg(serde_json::json!({ make_lsp_msg(serde_json::json!({
@ -615,9 +615,9 @@ mod tests {
} }
// Validate that the second outgoing req is a complete LSP message matching second // Validate that the second outgoing req is a complete LSP message matching second
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantRequestData::ProcStdin { data, .. } => { protocol::Request::ProcStdin { data, .. } => {
assert_eq!( assert_eq!(
data, data,
make_lsp_msg(serde_json::json!({ make_lsp_msg(serde_json::json!({
@ -645,9 +645,9 @@ mod tests {
.unwrap(); .unwrap();
// Validate that the outgoing req is a complete LSP message // Validate that the outgoing req is a complete LSP message
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantRequestData::ProcStdin { data, .. } => { protocol::Request::ProcStdin { data, .. } => {
// Verify the contents AND headers are as expected; in this case, // Verify the contents AND headers are as expected; in this case,
// this will also ensure that the Content-Length is adjusted // this will also ensure that the Content-Length is adjusted
// when the distant scheme was changed to file // when the distant scheme was changed to file
@ -671,7 +671,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStdout { protocol::Response::ProcStdout {
id: proc.id(), id: proc.id(),
data: make_lsp_msg(serde_json::json!({ data: make_lsp_msg(serde_json::json!({
"field1": "a", "field1": "a",
@ -707,7 +707,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStdout { protocol::Response::ProcStdout {
id: proc.id(), id: proc.id(),
data: msg_a.to_vec(), data: msg_a.to_vec(),
}, },
@ -725,7 +725,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStdout { protocol::Response::ProcStdout {
id: proc.id(), id: proc.id(),
data: msg_b.to_vec(), data: msg_b.to_vec(),
}, },
@ -759,7 +759,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStdout { protocol::Response::ProcStdout {
id: proc.id(), id: proc.id(),
data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(), data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
}, },
@ -802,7 +802,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStdout { protocol::Response::ProcStdout {
id: proc.id(), id: proc.id(),
data: format!( data: format!(
"{}{}", "{}{}",
@ -844,7 +844,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStdout { protocol::Response::ProcStdout {
id: proc.id(), id: proc.id(),
data: make_lsp_msg(serde_json::json!({ data: make_lsp_msg(serde_json::json!({
"field1": "distant://some/path", "field1": "distant://some/path",
@ -874,7 +874,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStderr { protocol::Response::ProcStderr {
id: proc.id(), id: proc.id(),
data: make_lsp_msg(serde_json::json!({ data: make_lsp_msg(serde_json::json!({
"field1": "a", "field1": "a",
@ -910,7 +910,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStderr { protocol::Response::ProcStderr {
id: proc.id(), id: proc.id(),
data: msg_a.to_vec(), data: msg_a.to_vec(),
}, },
@ -928,7 +928,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStderr { protocol::Response::ProcStderr {
id: proc.id(), id: proc.id(),
data: msg_b.to_vec(), data: msg_b.to_vec(),
}, },
@ -962,7 +962,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStderr { protocol::Response::ProcStderr {
id: proc.id(), id: proc.id(),
data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(), data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
}, },
@ -1005,7 +1005,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStderr { protocol::Response::ProcStderr {
id: proc.id(), id: proc.id(),
data: format!( data: format!(
"{}{}", "{}{}",
@ -1047,7 +1047,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
proc.origin_id().to_string(), proc.origin_id().to_string(),
DistantResponseData::ProcStderr { protocol::Response::ProcStderr {
id: proc.id(), id: proc.id(),
data: make_lsp_msg(serde_json::json!({ data: make_lsp_msg(serde_json::json!({
"field1": "distant://some/path", "field1": "distant://some/path",

@ -5,15 +5,14 @@ use distant_net::client::Mailbox;
use distant_net::common::{Request, Response}; use distant_net::common::{Request, Response};
use log::*; use log::*;
use tokio::io; use tokio::io;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio::sync::mpsc::{self};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::client::DistantChannel; use crate::client::DistantChannel;
use crate::constants::CLIENT_PIPE_CAPACITY; use crate::constants::CLIENT_PIPE_CAPACITY;
use crate::data::{Cmd, DistantRequestData, DistantResponseData, Environment, ProcessId, PtySize}; use crate::protocol::{self, Cmd, Environment, ProcessId, PtySize};
use crate::DistantMsg;
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemoteOutput { pub struct RemoteOutput {
@ -89,8 +88,8 @@ impl RemoteCommand {
// Submit our run request and get back a mailbox for responses // Submit our run request and get back a mailbox for responses
let mut mailbox = channel let mut mailbox = channel
.mail(Request::new(DistantMsg::Single( .mail(Request::new(protocol::Msg::Single(
DistantRequestData::ProcSpawn { protocol::Request::ProcSpawn {
cmd: Cmd::from(cmd), cmd: Cmd::from(cmd),
pty: self.pty, pty: self.pty,
environment: self.environment.clone(), environment: self.environment.clone(),
@ -104,15 +103,17 @@ impl RemoteCommand {
Some(res) => { Some(res) => {
let origin_id = res.origin_id; let origin_id = res.origin_id;
match res.payload { match res.payload {
DistantMsg::Single(DistantResponseData::ProcSpawned { id }) => (id, origin_id), protocol::Msg::Single(protocol::Response::ProcSpawned { id }) => {
DistantMsg::Single(DistantResponseData::Error(x)) => return Err(x.into()), (id, origin_id)
DistantMsg::Single(x) => { }
protocol::Msg::Single(protocol::Response::Error(x)) => return Err(x.into()),
protocol::Msg::Single(x) => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
format!("Got response type of {}", x.as_ref()), format!("Got response type of {}", x.as_ref()),
)) ))
} }
DistantMsg::Batch(_) => { protocol::Msg::Batch(_) => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::InvalidData, io::ErrorKind::InvalidData,
"Got batch instead of single response", "Got batch instead of single response",
@ -492,7 +493,7 @@ async fn process_outgoing_requests(
match data { match data {
Some(data) => channel.fire( Some(data) => channel.fire(
Request::new( Request::new(
DistantMsg::Single(DistantRequestData::ProcStdin { id, data }) protocol::Msg::Single(protocol::Request::ProcStdin { id, data })
) )
).await?, ).await?,
None => break Err(errors::dead_channel()), None => break Err(errors::dead_channel()),
@ -502,7 +503,7 @@ async fn process_outgoing_requests(
match size { match size {
Some(size) => channel.fire( Some(size) => channel.fire(
Request::new( Request::new(
DistantMsg::Single(DistantRequestData::ProcResizePty { id, size }) protocol::Msg::Single(protocol::Request::ProcResizePty { id, size })
) )
).await?, ).await?,
None => break Err(errors::dead_channel()), None => break Err(errors::dead_channel()),
@ -511,7 +512,7 @@ async fn process_outgoing_requests(
msg = kill_rx.recv() => { msg = kill_rx.recv() => {
if msg.is_some() { if msg.is_some() {
channel.fire(Request::new( channel.fire(Request::new(
DistantMsg::Single(DistantRequestData::ProcKill { id }) protocol::Msg::Single(protocol::Request::ProcKill { id })
)).await?; )).await?;
break Ok(()); break Ok(());
} else { } else {
@ -528,7 +529,7 @@ async fn process_outgoing_requests(
/// Helper function that loops, processing incoming stdout & stderr requests from a remote process /// Helper function that loops, processing incoming stdout & stderr requests from a remote process
async fn process_incoming_responses( async fn process_incoming_responses(
proc_id: ProcessId, proc_id: ProcessId,
mut mailbox: Mailbox<Response<DistantMsg<DistantResponseData>>>, mut mailbox: Mailbox<Response<protocol::Msg<protocol::Response>>>,
stdout_tx: mpsc::Sender<Vec<u8>>, stdout_tx: mpsc::Sender<Vec<u8>>,
stderr_tx: mpsc::Sender<Vec<u8>>, stderr_tx: mpsc::Sender<Vec<u8>>,
kill_tx: mpsc::Sender<()>, kill_tx: mpsc::Sender<()>,
@ -538,7 +539,7 @@ async fn process_incoming_responses(
// Check if any of the payload data is the termination // Check if any of the payload data is the termination
let exit_status = payload.iter().find_map(|data| match data { let exit_status = payload.iter().find_map(|data| match data {
DistantResponseData::ProcDone { id, success, code } if *id == proc_id => { protocol::Response::ProcDone { id, success, code } if *id == proc_id => {
Some((*success, *code)) Some((*success, *code))
} }
_ => None, _ => None,
@ -548,10 +549,10 @@ async fn process_incoming_responses(
// TODO: What should we do about unexpected data? For now, just ignore // TODO: What should we do about unexpected data? For now, just ignore
for data in payload { for data in payload {
match data { match data {
DistantResponseData::ProcStdout { id, data } if id == proc_id => { protocol::Response::ProcStdout { id, data } if id == proc_id => {
let _ = stdout_tx.send(data).await; let _ = stdout_tx.send(data).await;
} }
DistantResponseData::ProcStderr { id, data } if id == proc_id => { protocol::Response::ProcStderr { id, data } if id == proc_id => {
let _ = stderr_tx.send(data).await; let _ = stderr_tx.send(data).await;
} }
_ => {} _ => {}
@ -596,7 +597,7 @@ mod tests {
use super::*; use super::*;
use crate::client::DistantClient; use crate::client::DistantClient;
use crate::data::{Error, ErrorKind}; use crate::protocol::{Error, ErrorKind};
fn make_session() -> (FramedTransport<InmemoryTransport>, DistantClient) { fn make_session() -> (FramedTransport<InmemoryTransport>, DistantClient) {
let (t1, t2) = FramedTransport::pair(100); let (t1, t2) = FramedTransport::pair(100);
@ -616,14 +617,14 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Batch(vec![DistantResponseData::ProcSpawned { id: 1 }]), protocol::Msg::Batch(vec![protocol::Response::ProcSpawned { id: 1 }]),
)) ))
.await .await
.unwrap(); .unwrap();
@ -648,14 +649,14 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::Error(Error { protocol::Msg::Single(protocol::Response::Error(Error {
kind: ErrorKind::BrokenPipe, kind: ErrorKind::BrokenPipe,
description: String::from("some error"), description: String::from("some error"),
})), })),
@ -683,7 +684,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -691,7 +692,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -722,7 +723,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -730,7 +731,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -740,10 +741,10 @@ mod tests {
assert!(proc.kill().await.is_ok(), "Failed to send kill request"); assert!(proc.kill().await.is_ok(), "Failed to send kill request");
// Verify the kill request was sent // Verify the kill request was sent
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantMsg::Single(DistantRequestData::ProcKill { id: proc_id }) => { protocol::Msg::Single(protocol::Request::ProcKill { id: proc_id }) => {
assert_eq!(proc_id, id) assert_eq!(proc_id, id)
} }
x => panic!("Unexpected request: {:?}", x), x => panic!("Unexpected request: {:?}", x),
@ -775,7 +776,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -783,7 +784,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -798,10 +799,10 @@ mod tests {
.unwrap(); .unwrap();
// Verify that a request is made through the session // Verify that a request is made through the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
match req.payload { match req.payload {
DistantMsg::Single(DistantRequestData::ProcStdin { id, data }) => { protocol::Msg::Single(protocol::Request::ProcStdin { id, data }) => {
assert_eq!(id, 12345); assert_eq!(id, 12345);
assert_eq!(data, b"some input"); assert_eq!(data, b"some input");
} }
@ -822,7 +823,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -830,7 +831,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -841,7 +842,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcStdout { protocol::Msg::Single(protocol::Response::ProcStdout {
id, id,
data: b"some out".to_vec(), data: b"some out".to_vec(),
}), }),
@ -866,7 +867,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -874,7 +875,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -885,7 +886,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcStderr { protocol::Msg::Single(protocol::Response::ProcStderr {
id, id,
data: b"some err".to_vec(), data: b"some err".to_vec(),
}), }),
@ -910,7 +911,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -918,7 +919,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -943,7 +944,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -951,7 +952,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -990,7 +991,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -998,7 +999,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -1010,7 +1011,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcDone { protocol::Msg::Single(protocol::Response::ProcDone {
id, id,
success: true, success: true,
code: Some(123), code: Some(123),
@ -1045,7 +1046,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -1053,7 +1054,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -1081,7 +1082,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -1089,7 +1090,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -1124,7 +1125,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -1132,7 +1133,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -1145,7 +1146,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcDone { protocol::Msg::Single(protocol::Response::ProcDone {
id, id,
success: false, success: false,
code: Some(123), code: Some(123),
@ -1177,7 +1178,7 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantMsg<DistantRequestData>> = let req: Request<protocol::Msg<protocol::Request>> =
transport.read_frame_as().await.unwrap().unwrap(); transport.read_frame_as().await.unwrap().unwrap();
// Send back a response through the session // Send back a response through the session
@ -1185,7 +1186,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcSpawned { id }), protocol::Msg::Single(protocol::Response::ProcSpawned { id }),
)) ))
.await .await
.unwrap(); .unwrap();
@ -1198,7 +1199,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcStdout { protocol::Msg::Single(protocol::Response::ProcStdout {
id, id,
data: b"some out".to_vec(), data: b"some out".to_vec(),
}), }),
@ -1210,7 +1211,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantMsg::Single(DistantResponseData::ProcStderr { protocol::Msg::Single(protocol::Response::ProcStderr {
id, id,
data: b"some err".to_vec(), data: b"some err".to_vec(),
}), }),
@ -1222,7 +1223,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantMsg::Single(DistantResponseData::ProcDone { protocol::Msg::Single(protocol::Response::ProcDone {
id, id,
success: false, success: false,
code: Some(123), code: Some(123),

@ -7,10 +7,7 @@ use tokio::task::JoinHandle;
use crate::client::{DistantChannel, DistantChannelExt}; use crate::client::{DistantChannel, DistantChannelExt};
use crate::constants::CLIENT_SEARCHER_CAPACITY; use crate::constants::CLIENT_SEARCHER_CAPACITY;
use crate::data::{ use crate::protocol::{self, SearchId, SearchQuery, SearchQueryMatch};
DistantRequestData, DistantResponseData, SearchId, SearchQuery, SearchQueryMatch,
};
use crate::DistantMsg;
/// Represents a searcher for files, directories, and symlinks on the filesystem /// Represents a searcher for files, directories, and symlinks on the filesystem
pub struct Searcher { pub struct Searcher {
@ -37,8 +34,8 @@ impl Searcher {
// Submit our run request and get back a mailbox for responses // Submit our run request and get back a mailbox for responses
let mut mailbox = channel let mut mailbox = channel
.mail(Request::new(DistantMsg::Single( .mail(Request::new(protocol::Msg::Single(
DistantRequestData::Search { protocol::Request::Search {
query: query.clone(), query: query.clone(),
}, },
))) )))
@ -53,18 +50,18 @@ impl Searcher {
for data in res.payload.into_vec() { for data in res.payload.into_vec() {
match data { match data {
// If we get results before the started indicator, queue them up // If we get results before the started indicator, queue them up
DistantResponseData::SearchResults { matches, .. } => { protocol::Response::SearchResults { matches, .. } => {
queue.extend(matches); queue.extend(matches);
} }
// Once we get the started indicator, mark as ready to go // Once we get the started indicator, mark as ready to go
DistantResponseData::SearchStarted { id } => { protocol::Response::SearchStarted { id } => {
trace!("[Query {id}] Searcher has started"); trace!("[Query {id}] Searcher has started");
search_id = Some(id); search_id = Some(id);
} }
// If we get an explicit error, convert and return it // If we get an explicit error, convert and return it
DistantResponseData::Error(x) => return Err(io::Error::from(x)), protocol::Response::Error(x) => return Err(io::Error::from(x)),
// Otherwise, we got something unexpected, and report as such // Otherwise, we got something unexpected, and report as such
x => { x => {
@ -118,7 +115,7 @@ impl Searcher {
for data in res.payload.into_vec() { for data in res.payload.into_vec() {
match data { match data {
DistantResponseData::SearchResults { matches, .. } => { protocol::Response::SearchResults { matches, .. } => {
// If we can't queue up a match anymore, we've // If we can't queue up a match anymore, we've
// been closed and therefore want to quit // been closed and therefore want to quit
if tx.is_closed() { if tx.is_closed() {
@ -138,7 +135,7 @@ impl Searcher {
} }
// Received completion indicator, so close out // Received completion indicator, so close out
DistantResponseData::SearchDone { .. } => { protocol::Response::SearchDone { .. } => {
trace!("[Query {search_id}] Searcher has finished"); trace!("[Query {search_id}] Searcher has finished");
done = true; done = true;
break; break;
@ -202,7 +199,7 @@ mod tests {
use tokio::sync::Mutex; use tokio::sync::Mutex;
use super::*; use super::*;
use crate::data::{ use crate::protocol::{
SearchQueryCondition, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQueryCondition, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch,
SearchQuerySubmatch, SearchQueryTarget, SearchQuerySubmatch, SearchQueryTarget,
}; };
@ -233,13 +230,13 @@ mod tests {
}; };
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a search was started // Send back an acknowledgement that a search was started
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::SearchStarted { id: rand::random() }, protocol::Response::SearchStarted { id: rand::random() },
)) ))
.await .await
.unwrap(); .unwrap();
@ -269,14 +266,14 @@ mod tests {
); );
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a searcher was created // Send back an acknowledgement that a searcher was created
let id = rand::random::<SearchId>(); let id = rand::random::<SearchId>();
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantResponseData::SearchStarted { id }, protocol::Response::SearchStarted { id },
)) ))
.await .await
.unwrap(); .unwrap();
@ -289,7 +286,7 @@ mod tests {
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
vec![ vec![
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![ matches: vec![
SearchQueryMatch::Path(SearchQueryPathMatch { SearchQueryMatch::Path(SearchQueryPathMatch {
@ -310,7 +307,7 @@ mod tests {
}), }),
], ],
}, },
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch {
path: PathBuf::from("/some/path/3"), path: PathBuf::from("/some/path/3"),
@ -388,14 +385,14 @@ mod tests {
); );
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a searcher was created // Send back an acknowledgement that a searcher was created
let id = rand::random(); let id = rand::random();
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantResponseData::SearchStarted { id }, protocol::Response::SearchStarted { id },
)) ))
.await .await
.unwrap(); .unwrap();
@ -407,7 +404,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch {
path: PathBuf::from("/some/path/1"), path: PathBuf::from("/some/path/1"),
@ -426,7 +423,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone() + "1", req.id.clone() + "1",
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch {
path: PathBuf::from("/some/path/2"), path: PathBuf::from("/some/path/2"),
@ -445,7 +442,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch {
path: PathBuf::from("/some/path/3"), path: PathBuf::from("/some/path/3"),
@ -509,14 +506,14 @@ mod tests {
); );
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a watcher was created // Send back an acknowledgement that a watcher was created
let id = rand::random::<SearchId>(); let id = rand::random::<SearchId>();
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantResponseData::SearchStarted { id }, protocol::Response::SearchStarted { id },
)) ))
.await .await
.unwrap(); .unwrap();
@ -525,7 +522,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![ matches: vec![
SearchQueryMatch::Path(SearchQueryPathMatch { SearchQueryMatch::Path(SearchQueryPathMatch {
@ -580,10 +577,10 @@ mod tests {
let searcher_2 = Arc::clone(&searcher); let searcher_2 = Arc::clone(&searcher);
let cancel_task = tokio::spawn(async move { searcher_2.lock().await.cancel().await }); let cancel_task = tokio::spawn(async move { searcher_2.lock().await.cancel().await });
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
transport transport
.write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
.await .await
.unwrap(); .unwrap();
@ -594,7 +591,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::SearchResults { protocol::Response::SearchResults {
id, id,
matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch {
path: PathBuf::from("/some/path/3"), path: PathBuf::from("/some/path/3"),

@ -8,8 +8,7 @@ use tokio::task::JoinHandle;
use crate::client::{DistantChannel, DistantChannelExt}; use crate::client::{DistantChannel, DistantChannelExt};
use crate::constants::CLIENT_WATCHER_CAPACITY; use crate::constants::CLIENT_WATCHER_CAPACITY;
use crate::data::{Change, ChangeKindSet, DistantRequestData, DistantResponseData}; use crate::protocol::{self, Change, ChangeKindSet};
use crate::DistantMsg;
/// Represents a watcher of some path on a remote machine /// Represents a watcher of some path on a remote machine
pub struct Watcher { pub struct Watcher {
@ -56,8 +55,8 @@ impl Watcher {
// Submit our run request and get back a mailbox for responses // Submit our run request and get back a mailbox for responses
let mut mailbox = channel let mut mailbox = channel
.mail(Request::new(DistantMsg::Single( .mail(Request::new(protocol::Msg::Single(
DistantRequestData::Watch { protocol::Request::Watch {
path: path.to_path_buf(), path: path.to_path_buf(),
recursive, recursive,
only: only.into_sorted_vec(), only: only.into_sorted_vec(),
@ -74,11 +73,11 @@ impl Watcher {
while let Some(res) = mailbox.next().await { while let Some(res) = mailbox.next().await {
for data in res.payload.into_vec() { for data in res.payload.into_vec() {
match data { match data {
DistantResponseData::Changed(change) => queue.push(change), protocol::Response::Changed(change) => queue.push(change),
DistantResponseData::Ok => { protocol::Response::Ok => {
confirmed = true; confirmed = true;
} }
DistantResponseData::Error(x) => return Err(io::Error::from(x)), protocol::Response::Error(x) => return Err(io::Error::from(x)),
x => { x => {
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
@ -118,7 +117,7 @@ impl Watcher {
while let Some(res) = mailbox.next().await { while let Some(res) = mailbox.next().await {
for data in res.payload.into_vec() { for data in res.payload.into_vec() {
match data { match data {
DistantResponseData::Changed(change) => { protocol::Response::Changed(change) => {
// If we can't queue up a change anymore, we've // If we can't queue up a change anymore, we've
// been closed and therefore want to quit // been closed and therefore want to quit
if tx.is_closed() { if tx.is_closed() {
@ -188,7 +187,7 @@ mod tests {
use tokio::sync::Mutex; use tokio::sync::Mutex;
use super::*; use super::*;
use crate::data::ChangeKind; use crate::protocol::ChangeKind;
use crate::DistantClient; use crate::DistantClient;
fn make_session() -> (FramedTransport<InmemoryTransport>, DistantClient) { fn make_session() -> (FramedTransport<InmemoryTransport>, DistantClient) {
@ -215,11 +214,11 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a watcher was created // Send back an acknowledgement that a watcher was created
transport transport
.write_frame_for(&Response::new(req.id, DistantResponseData::Ok)) .write_frame_for(&Response::new(req.id, protocol::Response::Ok))
.await .await
.unwrap(); .unwrap();
@ -247,11 +246,11 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a watcher was created // Send back an acknowledgement that a watcher was created
transport transport
.write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
.await .await
.unwrap(); .unwrap();
@ -263,11 +262,11 @@ mod tests {
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
vec![ vec![
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Access, kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Content, kind: ChangeKind::Content,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
@ -315,11 +314,11 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a watcher was created // Send back an acknowledgement that a watcher was created
transport transport
.write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
.await .await
.unwrap(); .unwrap();
@ -330,7 +329,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone(), req.id.clone(),
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Access, kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
@ -342,7 +341,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id.clone() + "1", req.id.clone() + "1",
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Content, kind: ChangeKind::Content,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
@ -354,7 +353,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Remove, kind: ChangeKind::Remove,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
@ -401,11 +400,11 @@ mod tests {
}); });
// Wait until we get the request from the session // Wait until we get the request from the session
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
// Send back an acknowledgement that a watcher was created // Send back an acknowledgement that a watcher was created
transport transport
.write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
.await .await
.unwrap(); .unwrap();
@ -414,15 +413,15 @@ mod tests {
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
vec![ vec![
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Access, kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Content, kind: ChangeKind::Content,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Remove, kind: ChangeKind::Remove,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),
@ -457,10 +456,10 @@ mod tests {
let watcher_2 = Arc::clone(&watcher); let watcher_2 = Arc::clone(&watcher);
let unwatch_task = tokio::spawn(async move { watcher_2.lock().await.unwatch().await }); let unwatch_task = tokio::spawn(async move { watcher_2.lock().await.unwatch().await });
let req: Request<DistantRequestData> = transport.read_frame_as().await.unwrap().unwrap(); let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
transport transport
.write_frame_for(&Response::new(req.id.clone(), DistantResponseData::Ok)) .write_frame_for(&Response::new(req.id.clone(), protocol::Response::Ok))
.await .await
.unwrap(); .unwrap();
@ -470,7 +469,7 @@ mod tests {
transport transport
.write_frame_for(&Response::new( .write_frame_for(&Response::new(
req.id, req.id,
DistantResponseData::Changed(Change { protocol::Response::Changed(Change {
kind: ChangeKind::Unknown, kind: ChangeKind::Unknown,
paths: vec![test_path.to_path_buf()], paths: vec![test_path.to_path_buf()],
}), }),

@ -7,8 +7,7 @@ pub use client::*;
mod credentials; mod credentials;
pub use credentials::*; pub use credentials::*;
pub mod data; pub mod protocol;
pub use data::{DistantMsg, DistantRequestData, DistantResponseData};
mod constants; mod constants;
mod serde_str; mod serde_str;

@ -45,12 +45,12 @@ pub type Environment = distant_net::common::Map;
#[derive(Clone, Debug, From, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, From, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(untagged)] #[serde(untagged)]
pub enum DistantMsg<T> { pub enum Msg<T> {
Single(T), Single(T),
Batch(Vec<T>), Batch(Vec<T>),
} }
impl<T> DistantMsg<T> { impl<T> Msg<T> {
/// Returns true if msg has a single payload /// Returns true if msg has a single payload
pub fn is_single(&self) -> bool { pub fn is_single(&self) -> bool {
matches!(self, Self::Single(_)) matches!(self, Self::Single(_))
@ -119,9 +119,9 @@ impl<T> DistantMsg<T> {
} }
#[cfg(feature = "schemars")] #[cfg(feature = "schemars")]
impl<T: schemars::JsonSchema> DistantMsg<T> { impl<T: schemars::JsonSchema> Msg<T> {
pub fn root_schema() -> schemars::schema::RootSchema { pub fn root_schema() -> schemars::schema::RootSchema {
schemars::schema_for!(DistantMsg<T>) schemars::schema_for!(Msg<T>)
} }
} }
@ -148,7 +148,7 @@ impl<T: schemars::JsonSchema> DistantMsg<T> {
#[strum_discriminants(name(CapabilityKind))] #[strum_discriminants(name(CapabilityKind))]
#[strum_discriminants(strum(serialize_all = "snake_case"))] #[strum_discriminants(strum(serialize_all = "snake_case"))]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] #[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
pub enum DistantRequestData { pub enum Request {
/// Retrieve information about the server's capabilities /// Retrieve information about the server's capabilities
#[strum_discriminants(strum(message = "Supports retrieving capabilities"))] #[strum_discriminants(strum(message = "Supports retrieving capabilities"))]
Capabilities {}, Capabilities {},
@ -414,9 +414,9 @@ pub enum DistantRequestData {
} }
#[cfg(feature = "schemars")] #[cfg(feature = "schemars")]
impl DistantRequestData { impl Request {
pub fn root_schema() -> schemars::schema::RootSchema { pub fn root_schema() -> schemars::schema::RootSchema {
schemars::schema_for!(DistantRequestData) schemars::schema_for!(Request)
} }
} }
@ -425,7 +425,7 @@ impl DistantRequestData {
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] #[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
pub enum DistantResponseData { pub enum Response {
/// General okay with no extra data, returned in cases like /// General okay with no extra data, returned in cases like
/// creating or removing a directory, copying a file, or renaming /// creating or removing a directory, copying a file, or renaming
/// a file /// a file
@ -535,13 +535,13 @@ pub enum DistantResponseData {
} }
#[cfg(feature = "schemars")] #[cfg(feature = "schemars")]
impl DistantResponseData { impl Response {
pub fn root_schema() -> schemars::schema::RootSchema { pub fn root_schema() -> schemars::schema::RootSchema {
schemars::schema_for!(DistantResponseData) schemars::schema_for!(Response)
} }
} }
impl From<io::Error> for DistantResponseData { impl From<io::Error> for Response {
fn from(x: io::Error) -> Self { fn from(x: io::Error) -> Self {
Self::Error(Error::from(x)) Self::Error(Error::from(x))
} }

@ -101,7 +101,7 @@ impl Metadata {
unix: Some({ unix: Some({
use std::os::unix::prelude::*; use std::os::unix::prelude::*;
let mode = metadata.mode(); let mode = metadata.mode();
crate::data::UnixMetadata::from(mode) crate::protocol::UnixMetadata::from(mode)
}), }),
#[cfg(not(unix))] #[cfg(not(unix))]
unix: None, unix: None,
@ -110,7 +110,7 @@ impl Metadata {
windows: Some({ windows: Some({
use std::os::windows::prelude::*; use std::os::windows::prelude::*;
let attributes = metadata.file_attributes(); let attributes = metadata.file_attributes();
crate::data::WindowsMetadata::from(attributes) crate::protocol::WindowsMetadata::from(attributes)
}), }),
#[cfg(not(windows))] #[cfg(not(windows))]
windows: None, windows: None,

@ -1,5 +1,5 @@
use assert_fs::prelude::*; use assert_fs::prelude::*;
use distant_core::data::ChangeKindSet; use distant_core::protocol::ChangeKindSet;
use distant_core::DistantChannelExt; use distant_core::DistantChannelExt;
use rstest::*; use rstest::*;
use test_log::test; use test_log::test;

@ -7,11 +7,11 @@ use std::time::Duration;
use async_compat::CompatExt; use async_compat::CompatExt;
use async_once_cell::OnceCell; use async_once_cell::OnceCell;
use async_trait::async_trait; use async_trait::async_trait;
use distant_core::data::{ use distant_core::net::server::ConnectionCtx;
use distant_core::protocol::{
Capabilities, CapabilityKind, DirEntry, Environment, FileType, Metadata, ProcessId, PtySize, Capabilities, CapabilityKind, DirEntry, Environment, FileType, Metadata, ProcessId, PtySize,
SystemInfo, UnixMetadata, SystemInfo, UnixMetadata,
}; };
use distant_core::net::server::ConnectionCtx;
use distant_core::{DistantApi, DistantCtx}; use distant_core::{DistantApi, DistantCtx};
use log::*; use log::*;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};

@ -4,8 +4,8 @@ use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use async_compat::CompatExt; use async_compat::CompatExt;
use distant_core::data::{DistantResponseData, Environment, ProcessId, PtySize};
use distant_core::net::server::Reply; use distant_core::net::server::Reply;
use distant_core::protocol::{Environment, ProcessId, PtySize, Response};
use log::*; use log::*;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@ -32,7 +32,7 @@ pub async fn spawn_simple<F, R>(
cmd: &str, cmd: &str,
environment: Environment, environment: Environment,
current_dir: Option<PathBuf>, current_dir: Option<PathBuf>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
cleanup: F, cleanup: F,
) -> io::Result<SpawnResult> ) -> io::Result<SpawnResult>
where where
@ -117,7 +117,7 @@ pub async fn spawn_pty<F, R>(
environment: Environment, environment: Environment,
current_dir: Option<PathBuf>, current_dir: Option<PathBuf>,
size: PtySize, size: PtySize,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
cleanup: F, cleanup: F,
) -> io::Result<SpawnResult> ) -> io::Result<SpawnResult>
where where
@ -205,14 +205,14 @@ where
fn spawn_blocking_stdout_task( fn spawn_blocking_stdout_task(
id: ProcessId, id: ProcessId,
mut reader: impl Read + Send + 'static, mut reader: impl Read + Send + 'static,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop { loop {
match reader.read(&mut buf) { match reader.read(&mut buf) {
Ok(n) if n > 0 => { Ok(n) if n > 0 => {
let payload = DistantResponseData::ProcStdout { let payload = Response::ProcStdout {
id, id,
data: buf[..n].to_vec(), data: buf[..n].to_vec(),
}; };
@ -236,14 +236,14 @@ fn spawn_blocking_stdout_task(
fn spawn_nonblocking_stdout_task( fn spawn_nonblocking_stdout_task(
id: ProcessId, id: ProcessId,
mut reader: impl Read + Send + 'static, mut reader: impl Read + Send + 'static,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop { loop {
match reader.read(&mut buf) { match reader.read(&mut buf) {
Ok(n) if n > 0 => { Ok(n) if n > 0 => {
let payload = DistantResponseData::ProcStdout { let payload = Response::ProcStdout {
id, id,
data: buf[..n].to_vec(), data: buf[..n].to_vec(),
}; };
@ -270,14 +270,14 @@ fn spawn_nonblocking_stdout_task(
fn spawn_nonblocking_stderr_task( fn spawn_nonblocking_stderr_task(
id: ProcessId, id: ProcessId,
mut reader: impl Read + Send + 'static, mut reader: impl Read + Send + 'static,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE]; let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop { loop {
match reader.read(&mut buf) { match reader.read(&mut buf) {
Ok(n) if n > 0 => { Ok(n) if n > 0 => {
let payload = DistantResponseData::ProcStderr { let payload = Response::ProcStderr {
id, id,
data: buf[..n].to_vec(), data: buf[..n].to_vec(),
}; };
@ -348,7 +348,7 @@ fn spawn_cleanup_task<F, R>(
stdin_task: JoinHandle<()>, stdin_task: JoinHandle<()>,
stdout_task: JoinHandle<()>, stdout_task: JoinHandle<()>,
stderr_task: Option<JoinHandle<()>>, stderr_task: Option<JoinHandle<()>>,
reply: Box<dyn Reply<Data = DistantResponseData>>, reply: Box<dyn Reply<Data = Response>>,
cleanup: F, cleanup: F,
) -> JoinHandle<()> ) -> JoinHandle<()>
where where
@ -417,7 +417,7 @@ where
cleanup(id).await; cleanup(id).await;
let payload = DistantResponseData::ProcDone { let payload = Response::ProcDone {
id, id,
success: !should_kill && success, success: !should_kill && success,
code: if success { Some(0) } else { None }, code: if success { Some(0) } else { None },

@ -4,7 +4,7 @@ use std::time::Duration;
use assert_fs::prelude::*; use assert_fs::prelude::*;
use assert_fs::TempDir; use assert_fs::TempDir;
use distant_core::data::{ChangeKindSet, Environment, FileType, Metadata}; use distant_core::protocol::{ChangeKindSet, Environment, FileType, Metadata};
use distant_core::{DistantChannelExt, DistantClient}; use distant_core::{DistantChannelExt, DistantClient};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use predicates::prelude::*; use predicates::prelude::*;

@ -3,7 +3,7 @@ use std::time::Duration;
use assert_fs::prelude::*; use assert_fs::prelude::*;
use assert_fs::TempDir; use assert_fs::TempDir;
use distant_core::data::{ChangeKindSet, Environment, FileType, Metadata}; use distant_core::protocol::{ChangeKindSet, Environment, FileType, Metadata};
use distant_core::{DistantChannelExt, DistantClient}; use distant_core::{DistantChannelExt, DistantClient};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use predicates::prelude::*; use predicates::prelude::*;

@ -4,13 +4,10 @@ use std::path::Path;
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use distant_core::data::{ChangeKindSet, FileType, SearchQuery, SystemInfo};
use distant_core::net::common::{ConnectionId, Host, Map, Request, Response}; use distant_core::net::common::{ConnectionId, Host, Map, Request, Response};
use distant_core::net::manager::ManagerClient; use distant_core::net::manager::ManagerClient;
use distant_core::{ use distant_core::protocol::{self, ChangeKindSet, FileType, SearchQuery, SystemInfo};
DistantChannel, DistantChannelExt, DistantMsg, DistantRequestData, DistantResponseData, use distant_core::{DistantChannel, DistantChannelExt, RemoteCommand, Searcher, Watcher};
RemoteCommand, Searcher, Watcher,
};
use log::*; use log::*;
use serde_json::json; use serde_json::json;
use tabled::object::Rows; use tabled::object::Rows;
@ -281,8 +278,8 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
debug!("Starting api tasks"); debug!("Starting api tasks");
let (msg_tx, mut msg_rx) = mpsc::channel(1); let (msg_tx, mut msg_rx) = mpsc::channel(1);
let request_task = tokio::spawn(async move { let request_task = tokio::spawn(async move {
let mut rx = let mut rx = MsgReceiver::from_stdin()
MsgReceiver::from_stdin().into_rx::<Request<DistantMsg<DistantRequestData>>>(); .into_rx::<Request<protocol::Msg<protocol::Request>>>();
loop { loop {
match rx.recv().await { match rx.recv().await {
Some(Ok(request)) => { Some(Ok(request)) => {
@ -312,7 +309,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
if ready.is_readable() { if ready.is_readable() {
match channel match channel
.try_read_frame_as::<Response<DistantMsg<DistantResponseData>>>() .try_read_frame_as::<Response<protocol::Msg<protocol::Response>>>()
{ {
Ok(Some(msg)) => tx.send_blocking(&msg)?, Ok(Some(msg)) => tx.send_blocking(&msg)?,
Ok(None) => break, Ok(None) => break,
@ -823,11 +820,11 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
depth, absolute, canonicalize, include_root depth, absolute, canonicalize, include_root
); );
let results = channel let results = channel
.send(DistantMsg::Batch(vec![ .send(protocol::Msg::Batch(vec![
DistantRequestData::FileRead { protocol::Request::FileRead {
path: path.to_path_buf(), path: path.to_path_buf(),
}, },
DistantRequestData::DirRead { protocol::Request::DirRead {
path: path.to_path_buf(), path: path.to_path_buf(),
depth, depth,
absolute, absolute,
@ -847,7 +844,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
.context("Got single response to batch request")? .context("Got single response to batch request")?
{ {
match response { match response {
DistantResponseData::DirEntries { entries, .. } => { protocol::Response::DirEntries { entries, .. } => {
#[derive(Tabled)] #[derive(Tabled)]
struct EntryRow { struct EntryRow {
ty: String, ty: String,
@ -874,14 +871,14 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
out.flush().context("Failed to flush stdout")?; out.flush().context("Failed to flush stdout")?;
return Ok(()); return Ok(());
} }
DistantResponseData::Blob { data } => { protocol::Response::Blob { data } => {
let mut out = std::io::stdout(); let mut out = std::io::stdout();
out.write_all(&data) out.write_all(&data)
.context("Failed to write file contents to stdout")?; .context("Failed to write file contents to stdout")?;
out.flush().context("Failed to flush stdout")?; out.flush().context("Failed to flush stdout")?;
return Ok(()); return Ok(());
} }
DistantResponseData::Error(x) => errors.push(x), protocol::Response::Error(x) => errors.push(x),
_ => continue, _ => continue,
} }
} }
@ -1001,7 +998,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
// TODO: Provide a cleaner way to print just a match // TODO: Provide a cleaner way to print just a match
let res = Response::new( let res = Response::new(
"".to_string(), "".to_string(),
DistantMsg::Single(DistantResponseData::SearchResults { protocol::Msg::Single(protocol::Response::SearchResults {
id: 0, id: 0,
matches: vec![m], matches: vec![m],
}), }),
@ -1053,7 +1050,7 @@ async fn async_run(cmd: ClientSubcommand) -> CliResult {
// TODO: Provide a cleaner way to print just a change // TODO: Provide a cleaner way to print just a change
let res = Response::new( let res = Response::new(
"".to_string(), "".to_string(),
DistantMsg::Single(DistantResponseData::Changed(change)), protocol::Msg::Single(protocol::Response::Changed(change)),
); );
formatter.print(res).context("Failed to print change")?; formatter.print(res).context("Failed to print change")?;

@ -1,7 +1,7 @@
use std::path::PathBuf; use std::path::PathBuf;
use anyhow::Context; use anyhow::Context;
use distant_core::data::PtySize; use distant_core::protocol::PtySize;
use distant_core::{DistantChannel, RemoteLspCommand}; use distant_core::{DistantChannel, RemoteLspCommand};
use terminal_size::{terminal_size, Height, Width}; use terminal_size::{terminal_size, Height, Width};

@ -2,7 +2,7 @@ use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use distant_core::data::{Environment, PtySize}; use distant_core::protocol::{Environment, PtySize};
use distant_core::{DistantChannel, DistantChannelExt, RemoteCommand}; use distant_core::{DistantChannel, DistantChannelExt, RemoteCommand};
use log::*; use log::*;
use terminal_size::{terminal_size, Height, Width}; use terminal_size::{terminal_size, Height, Width};

@ -2,11 +2,11 @@ use std::collections::HashMap;
use std::io::{self, Write}; use std::io::{self, Write};
use std::path::PathBuf; use std::path::PathBuf;
use distant_core::data::{
ChangeKind, DistantMsg, DistantResponseData, Error, FileType, Metadata,
SearchQueryContentsMatch, SearchQueryMatch, SearchQueryPathMatch, SystemInfo,
};
use distant_core::net::common::Response; use distant_core::net::common::Response;
use distant_core::protocol::{
self, ChangeKind, Error, FileType, Metadata, SearchQueryContentsMatch, SearchQueryMatch,
SearchQueryPathMatch, SystemInfo,
};
use log::*; use log::*;
use tabled::object::Rows; use tabled::object::Rows;
use tabled::style::Style; use tabled::style::Style;
@ -40,7 +40,7 @@ impl Formatter {
} }
/// Consumes the output message, printing it based on its configuration /// Consumes the output message, printing it based on its configuration
pub fn print(&mut self, res: Response<DistantMsg<DistantResponseData>>) -> io::Result<()> { pub fn print(&mut self, res: Response<protocol::Msg<protocol::Response>>) -> io::Result<()> {
let output = match self.format { let output = match self.format {
Format::Json => Output::StdoutLine( Format::Json => Output::StdoutLine(
serde_json::to_vec(&res) serde_json::to_vec(&res)
@ -120,15 +120,15 @@ enum Output {
None, None,
} }
fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output { fn format_shell(state: &mut FormatterState, data: protocol::Response) -> Output {
match data { match data {
DistantResponseData::Ok => Output::None, protocol::Response::Ok => Output::None,
DistantResponseData::Error(Error { description, .. }) => { protocol::Response::Error(Error { description, .. }) => {
Output::StderrLine(description.into_bytes()) Output::StderrLine(description.into_bytes())
} }
DistantResponseData::Blob { data } => Output::StdoutLine(data), protocol::Response::Blob { data } => Output::StdoutLine(data),
DistantResponseData::Text { data } => Output::StdoutLine(data.into_bytes()), protocol::Response::Text { data } => Output::StdoutLine(data.into_bytes()),
DistantResponseData::DirEntries { entries, .. } => { protocol::Response::DirEntries { entries, .. } => {
#[derive(Tabled)] #[derive(Tabled)]
struct EntryRow { struct EntryRow {
ty: String, ty: String,
@ -151,7 +151,7 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output
Output::Stdout(table) Output::Stdout(table)
} }
DistantResponseData::Changed(change) => Output::StdoutLine( protocol::Response::Changed(change) => Output::StdoutLine(
format!( format!(
"{}{}", "{}{}",
match change.kind { match change.kind {
@ -171,14 +171,14 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output
) )
.into_bytes(), .into_bytes(),
), ),
DistantResponseData::Exists { value: exists } => { protocol::Response::Exists { value: exists } => {
if exists { if exists {
Output::StdoutLine(b"true".to_vec()) Output::StdoutLine(b"true".to_vec())
} else { } else {
Output::StdoutLine(b"false".to_vec()) Output::StdoutLine(b"false".to_vec())
} }
} }
DistantResponseData::Metadata(Metadata { protocol::Response::Metadata(Metadata {
canonicalized_path, canonicalized_path,
file_type, file_type,
len, len,
@ -278,11 +278,11 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output
) )
.into_bytes(), .into_bytes(),
), ),
DistantResponseData::SearchStarted { id } => { protocol::Response::SearchStarted { id } => {
Output::StdoutLine(format!("Query {id} started").into_bytes()) Output::StdoutLine(format!("Query {id} started").into_bytes())
} }
DistantResponseData::SearchDone { .. } => Output::None, protocol::Response::SearchDone { .. } => Output::None,
DistantResponseData::SearchResults { matches, .. } => { protocol::Response::SearchResults { matches, .. } => {
let mut files: HashMap<_, Vec<String>> = HashMap::new(); let mut files: HashMap<_, Vec<String>> = HashMap::new();
let mut is_targeting_paths = false; let mut is_targeting_paths = false;
@ -340,10 +340,10 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output
Output::None Output::None
} }
} }
DistantResponseData::ProcSpawned { .. } => Output::None, protocol::Response::ProcSpawned { .. } => Output::None,
DistantResponseData::ProcStdout { data, .. } => Output::Stdout(data), protocol::Response::ProcStdout { data, .. } => Output::Stdout(data),
DistantResponseData::ProcStderr { data, .. } => Output::Stderr(data), protocol::Response::ProcStderr { data, .. } => Output::Stderr(data),
DistantResponseData::ProcDone { id, success, code } => { protocol::Response::ProcDone { id, success, code } => {
if success { if success {
Output::None Output::None
} else if let Some(code) = code { } else if let Some(code) = code {
@ -352,7 +352,7 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output
Output::StderrLine(format!("Proc {id} failed").into_bytes()) Output::StderrLine(format!("Proc {id} failed").into_bytes())
} }
} }
DistantResponseData::SystemInfo(SystemInfo { protocol::Response::SystemInfo(SystemInfo {
family, family,
os, os,
arch, arch,
@ -375,7 +375,7 @@ fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output
) )
.into_bytes(), .into_bytes(),
), ),
DistantResponseData::Capabilities { supported } => { protocol::Response::Capabilities { supported } => {
#[derive(Tabled)] #[derive(Tabled)]
struct EntryRow { struct EntryRow {
kind: String, kind: String,

@ -4,7 +4,7 @@ use anyhow::Context;
use clap::CommandFactory; use clap::CommandFactory;
use clap_complete::generate as clap_generate; use clap_complete::generate as clap_generate;
use distant_core::net::common::{Request, Response}; use distant_core::net::common::{Request, Response};
use distant_core::{DistantMsg, DistantRequestData, DistantResponseData}; use distant_core::protocol;
use crate::options::{Config, GenerateSubcommand}; use crate::options::{Config, GenerateSubcommand};
use crate::{CliResult, Options}; use crate::{CliResult, Options};
@ -22,10 +22,10 @@ async fn async_run(cmd: GenerateSubcommand) -> CliResult {
GenerateSubcommand::Schema { file } => { GenerateSubcommand::Schema { file } => {
let request_schema = let request_schema =
serde_json::to_value(&Request::<DistantMsg<DistantRequestData>>::root_schema()) serde_json::to_value(&Request::<protocol::Msg<protocol::Request>>::root_schema())
.context("Failed to serialize request schema")?; .context("Failed to serialize request schema")?;
let response_schema = let response_schema =
serde_json::to_value(&Response::<DistantMsg<DistantResponseData>>::root_schema()) serde_json::to_value(&Response::<protocol::Msg<protocol::Response>>::root_schema())
.context("Failed to serialize response schema")?; .context("Failed to serialize response schema")?;
let schema = serde_json::json!({ let schema = serde_json::json!({

@ -5,9 +5,9 @@ use clap::builder::TypedValueParser as _;
use clap::{Parser, Subcommand, ValueEnum, ValueHint}; use clap::{Parser, Subcommand, ValueEnum, ValueHint};
use clap_complete::Shell as ClapCompleteShell; use clap_complete::Shell as ClapCompleteShell;
use derive_more::IsVariant; use derive_more::IsVariant;
use distant_core::data::{ChangeKind, Environment};
use distant_core::net::common::{ConnectionId, Destination, Map, PortRange}; use distant_core::net::common::{ConnectionId, Destination, Map, PortRange};
use distant_core::net::server::Shutdown; use distant_core::net::server::Shutdown;
use distant_core::protocol::{ChangeKind, Environment};
use service_manager::ServiceManagerKind; use service_manager::ServiceManagerKind;
use crate::constants; use crate::constants;

@ -1,8 +1,8 @@
use std::collections::HashSet; use std::collections::HashSet;
use clap::{Args, ValueEnum}; use clap::{Args, ValueEnum};
pub use distant_core::data::SearchQueryCondition as CliSearchQueryCondition; pub use distant_core::protocol::SearchQueryCondition as CliSearchQueryCondition;
use distant_core::data::{FileType, SearchQueryOptions, SearchQueryTarget}; use distant_core::protocol::{FileType, SearchQueryOptions, SearchQueryTarget};
/// Options to customize the search results. /// Options to customize the search results.
#[derive(Args, Clone, Debug, Default, PartialEq, Eq)] #[derive(Args, Clone, Debug, Default, PartialEq, Eq)]

@ -1,4 +1,4 @@
use distant_core::data::{Capabilities, Capability}; use distant_core::protocol::{Capabilities, Capability};
use rstest::*; use rstest::*;
use serde_json::json; use serde_json::json;
use test_log::test; use test_log::test;

Loading…
Cancel
Save