Compare commits

...

6 Commits

67
Cargo.lock generated

@ -825,9 +825,12 @@ dependencies = [
"bitflags 2.4.0",
"bytes",
"derive_more",
"distant-core-client",
"distant-core-manager",
"distant-core-net",
"distant-core-plugin",
"distant-core-protocol",
"distant-core-server",
"distant-plugin",
"env_logger",
"futures",
"hex",
@ -857,6 +860,34 @@ dependencies = [
"tokio",
]
[[package]]
name = "distant-core-client"
version = "0.21.0"
dependencies = [
"async-trait",
"derive_more",
"distant-core-net",
"env_logger",
"log",
"serde",
"test-log",
"tokio",
]
[[package]]
name = "distant-core-manager"
version = "0.21.0"
dependencies = [
"async-trait",
"derive_more",
"distant-core-net",
"env_logger",
"log",
"serde",
"test-log",
"tokio",
]
[[package]]
name = "distant-core-net"
version = "0.21.0"
@ -889,16 +920,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "distant-core-plugin"
version = "0.21.0"
dependencies = [
"async-trait",
"distant-core-auth",
"distant-core-protocol",
"serde",
]
[[package]]
name = "distant-core-protocol"
version = "0.21.0"
@ -916,6 +937,30 @@ dependencies = [
"strum",
]
[[package]]
name = "distant-core-server"
version = "0.21.0"
dependencies = [
"async-trait",
"derive_more",
"distant-core-net",
"env_logger",
"log",
"serde",
"test-log",
"tokio",
]
[[package]]
name = "distant-plugin"
version = "0.21.0"
dependencies = [
"async-trait",
"distant-core-auth",
"distant-core-protocol",
"serde",
]
[[package]]
name = "distant-plugin-local"
version = "0.21.0"

@ -15,9 +15,12 @@ license = "MIT OR Apache-2.0"
members = [
"distant-core",
"distant-core-auth",
"distant-core-client",
"distant-core-manager",
"distant-core-net",
"distant-core-plugin",
"distant-core-protocol",
"distant-core-server",
"distant-plugin",
"distant-plugin-local",
"distant-plugin-ssh",
]

@ -0,0 +1,24 @@
[package]
name = "distant-core-client"
description = "Core client library for distant, providing mechanisms to connect to a distant-compatible server"
categories = ["network-programming"]
keywords = ["client", "network", "distant"]
version = "0.21.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2021"
homepage = "https://github.com/chipsenkbeil/distant"
repository = "https://github.com/chipsenkbeil/distant"
readme = "README.md"
license = "MIT OR Apache-2.0"
[dependencies]
async-trait = "0.1.68"
derive_more = { version = "0.99.17", default-features = false, features = ["display", "from", "error"] }
distant-core-net = { version = "=0.21.0", path = "../distant-core-net" }
log = "0.4.18"
serde = { version = "1.0.163", features = ["derive"] }
[dev-dependencies]
env_logger = "0.10.0"
test-log = "0.2.11"
tokio = { version = "1.28.2", features = ["full"] }

@ -0,0 +1,24 @@
[package]
name = "distant-core-manager"
description = "Core manager library for distant, providing client & server implementation of a manager"
categories = ["network-programming"]
keywords = ["client", "server", "manager", "network", "distant"]
version = "0.21.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2021"
homepage = "https://github.com/chipsenkbeil/distant"
repository = "https://github.com/chipsenkbeil/distant"
readme = "README.md"
license = "MIT OR Apache-2.0"
[dependencies]
async-trait = "0.1.68"
derive_more = { version = "0.99.17", default-features = false, features = ["display", "from", "error"] }
distant-core-net = { version = "=0.21.0", path = "../distant-core-net" }
log = "0.4.18"
serde = { version = "1.0.163", features = ["derive"] }
[dev-dependencies]
env_logger = "0.10.0"
test-log = "0.2.11"
tokio = { version = "1.28.2", features = ["full"] }

@ -1,8 +1,8 @@
[package]
name = "distant-core-net"
description = "Core network library for distant, providing implementations to support client/server architecture"
description = "Core network library for distant, providing primitives for use in network communication"
categories = ["network-programming"]
keywords = ["api", "async"]
keywords = ["api", "async", "network", "primitives"]
version = "0.21.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2021"

@ -1,19 +1,19 @@
use std::any::Any;
/// Trait used for casting support into the [`Any`] trait object
/// Trait used for casting support into the [`Any`] trait object.
pub trait AsAny: Any {
/// Converts reference to [`Any`]
/// Converts reference to [`Any`].
fn as_any(&self) -> &dyn Any;
/// Converts mutable reference to [`Any`]
/// Converts mutable reference to [`Any`].
fn as_mut_any(&mut self) -> &mut dyn Any;
/// Consumes and produces `Box<dyn Any>`
/// Consumes and produces `Box<dyn Any>`.
fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
/// Blanket implementation that enables any `'static` reference to convert
/// to the [`Any`] type
/// to the [`Any`] type.
impl<T: 'static> AsAny for T {
fn as_any(&self) -> &dyn Any {
self

@ -5,7 +5,7 @@ use distant_core_auth::msg::*;
use distant_core_auth::{AuthHandler, Authenticate, Authenticator};
use log::*;
use crate::common::{utils, FramedTransport, Transport};
use crate::{utils, FramedTransport, Transport};
macro_rules! write_frame {
($transport:expr, $data:expr) => {{

@ -1,25 +0,0 @@
mod any;
mod connection;
mod destination;
mod key;
mod keychain;
mod listener;
mod map;
mod packet;
mod port;
mod transport;
pub(crate) mod utils;
mod version;
pub use any::*;
pub(crate) use connection::Connection;
pub use connection::ConnectionId;
pub use destination::*;
pub use key::*;
pub use keychain::*;
pub use listener::*;
pub use map::*;
pub use packet::*;
pub use port::*;
pub use transport::*;
pub use version::*;

@ -8,16 +8,16 @@ use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
#[cfg(test)]
use crate::common::InmemoryTransport;
use crate::common::{
use crate::InmemoryTransport;
use crate::{
Backup, FramedTransport, HeapSecretKey, Keychain, KeychainResult, Reconnectable, Transport,
TransportExt, Version,
};
/// Id of the connection
/// Id of the connection.
pub type ConnectionId = u32;
/// Represents a connection from either the client or server side
/// Represents a connection from either the client or server side.
#[derive(Debug)]
pub enum Connection<T> {
/// Connection from the client side
@ -179,7 +179,7 @@ where
}
}
/// Type of connection to perform
/// Type of connection to perform.
#[derive(Debug, Serialize, Deserialize)]
enum ConnectType {
/// Indicates that the connection from client to server is no and not a reconnection

@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use crate::common::HeapSecretKey;
use crate::HeapSecretKey;
/// Represents the result of a request to the database.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]

@ -4,14 +4,28 @@
#[cfg(doctest)]
pub struct ReadmeDoctests;
mod any;
mod authentication;
pub mod client;
pub mod common;
pub mod manager;
pub mod server;
mod connection;
mod key;
mod keychain;
mod listener;
mod packet;
mod port;
mod transport;
pub(crate) mod utils;
mod version;
pub use any::*;
pub use connection::*;
pub use key::*;
pub use keychain::*;
pub use listener::*;
pub use packet::*;
pub use port::*;
pub use transport::*;
pub use version::*;
pub use client::{Client, ReconnectStrategy};
/// Authentication functionality tied to network operations.
pub use distant_core_auth as auth;
pub use server::Server;
pub use {log, paste};

@ -5,7 +5,7 @@ use async_trait::async_trait;
use tokio::net::TcpListener as TokioTcpListener;
use super::Listener;
use crate::common::{PortRange, TcpTransport};
use crate::{PortRange, TcpTransport};
/// Represents a [`Listener`] for incoming connections over TCP
pub struct TcpListener {

@ -6,7 +6,7 @@ use async_trait::async_trait;
use tokio::net::{UnixListener, UnixStream};
use super::Listener;
use crate::common::UnixSocketTransport;
use crate::UnixSocketTransport;
/// Represents a [`Listener`] for incoming connections over a Unix socket
pub struct UnixSocketListener {

@ -5,7 +5,7 @@ use async_trait::async_trait;
use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
use super::Listener;
use crate::common::{NamedPipe, WindowsPipeTransport};
use crate::{NamedPipe, WindowsPipeTransport};
/// Represents a [`Listener`] for incoming connections over a named windows pipe
pub struct WindowsPipeListener {

@ -6,7 +6,7 @@ use derive_more::IntoIterator;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::common::{utils, Value};
use crate::{utils, Value};
/// Generates a new [`Header`] of key/value pairs based on literals.
///
@ -18,7 +18,7 @@ use crate::common::{utils, Value};
#[macro_export]
macro_rules! header {
($($key:literal -> $value:expr),* $(,)?) => {{
let mut _header = $crate::common::Header::default();
let mut _header = $crate::Header::default();
$(
_header.insert($key, $value);

@ -6,8 +6,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use super::{read_header_bytes, read_key_eq, read_str_bytes, Header, Id};
use crate::common::utils;
use crate::header;
use crate::{header, utils};
/// Represents a request to send
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]

@ -6,8 +6,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use super::{read_header_bytes, read_key_eq, read_str_bytes, Header, Id};
use crate::common::utils;
use crate::header;
use crate::{header, utils};
/// Represents a response received related to some response
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]

@ -5,7 +5,7 @@ use std::ops::{Deref, DerefMut};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::common::utils;
use crate::utils;
/// Generic value type for data passed through header.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]

@ -9,7 +9,7 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use super::{InmemoryTransport, Interest, Ready, Reconnectable, Transport};
use crate::common::{utils, SecretKey32};
use crate::{utils, SecretKey32};
mod backup;
mod codec;

@ -3,7 +3,7 @@ use std::{fmt, io};
use derive_more::Display;
use super::{Codec, Frame};
use crate::common::{SecretKey, SecretKey32};
use crate::{SecretKey, SecretKey32};
/// Represents the type of encryption for a [`EncryptionCodec`]
#[derive(

@ -6,7 +6,7 @@ use p256::PublicKey;
use rand::rngs::OsRng;
use sha2::Sha256;
use crate::common::SecretKey32;
use crate::SecretKey32;
mod pkb;
pub use pkb::PublicKeyBytes;

@ -203,7 +203,7 @@ mod tests {
use test_log::test;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
#[test]
fn is_rx_closed_should_properly_reflect_if_internal_rx_channel_is_closed() {

@ -79,7 +79,7 @@ mod tests {
use tokio::task::JoinHandle;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
async fn find_ephemeral_addr() -> SocketAddr {
// Start a listener on a distinct port, get its port, and kill it

@ -69,7 +69,7 @@ mod tests {
use tokio::task::JoinHandle;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
async fn start_and_run_server(tx: oneshot::Sender<PathBuf>) -> io::Result<()> {
// Generate a socket path and delete the file after so there is nothing there

@ -93,7 +93,7 @@ mod tests {
use tokio::task::JoinHandle;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
async fn start_and_run_server(tx: oneshot::Sender<String>) -> io::Result<()> {
let pipe = start_server(tx).await?;

@ -1,29 +0,0 @@
/// Full API that represents a distant-compatible server.
pub trait Api {
type FileSystem: FileSystemApi;
type Process: ProcessApi;
type Search: SearchApi;
type SystemInfo: SystemInfoApi;
type Version: VersionApi;
}
/// API supporting filesystem operations.
pub trait FileSystemApi {}
/// API supporting process creation and manipulation.
pub trait ProcessApi {}
/// API supporting searching through the remote system.
pub trait SearchApi {}
/// API supporting retrieval of information about the remote system.
pub trait SystemInfoApi {}
/// API supporting retrieval of the server's version.
pub trait VersionApi {}
/// Generic struct that implements all APIs as unsupported.
pub struct Unsupported;
impl FileSystemApi for Unsupported {
}

@ -0,0 +1,24 @@
[package]
name = "distant-core-server"
description = "Core server library for distant, providing distant-compatible server implementation"
categories = ["network-programming"]
keywords = ["server", "network", "distant"]
version = "0.21.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2021"
homepage = "https://github.com/chipsenkbeil/distant"
repository = "https://github.com/chipsenkbeil/distant"
readme = "README.md"
license = "MIT OR Apache-2.0"
[dependencies]
async-trait = "0.1.68"
derive_more = { version = "0.99.17", default-features = false, features = ["display", "from", "error"] }
distant-core-net = { version = "=0.21.0", path = "../distant-core-net" }
log = "0.4.18"
serde = { version = "1.0.163", features = ["derive"] }
[dev-dependencies]
env_logger = "0.10.0"
test-log = "0.2.11"
tokio = { version = "1.28.2", features = ["full"] }

@ -17,7 +17,10 @@ bitflags = "2.3.1"
bytes = "1.4.0"
derive_more = { version = "0.99.17", default-features = false, features = ["as_mut", "as_ref", "deref", "deref_mut", "display", "from", "error", "into", "into_iterator", "is_variant", "try_into"] }
distant-core-net = { version = "=0.21.0", path = "../distant-core-net" }
distant-core-plugin = { version = "=0.21.0", path = "../distant-core-plugin" }
distant-core-client = { version = "=0.21.0", path = "../distant-core-client" }
distant-core-manager = { version = "=0.21.0", path = "../distant-core-manager" }
distant-core-server = { version = "=0.21.0", path = "../distant-core-server" }
distant-plugin = { version = "=0.21.0", path = "../distant-plugin" }
distant-core-protocol = { version = "=0.21.0", path = "../distant-core-protocol" }
futures = "0.3.28"
hex = "0.4.3"

@ -1,6 +1,6 @@
[package]
name = "distant-core-plugin"
description = "Core plugin library for distant that provides everything needed to build a plugin"
name = "distant-plugin"
description = "Plugin library for distant that provides everything needed to build a plugin"
categories = []
keywords = []
version = "0.21.0"

@ -0,0 +1,251 @@
use std::io;
use std::path::PathBuf;
use async_trait::async_trait;
use distant_core_protocol::*;
mod checker;
mod ctx;
mod unsupported;
pub use checker::*;
pub use ctx::*;
pub use unsupported::*;
/// Full API that represents a distant-compatible server.
#[async_trait]
pub trait Api: Send + Sync {
/// Specific implementation of [`FileSystemApi`] associated with this [`Api`].
type FileSystem: FileSystemApi;
/// Specific implementation of [`ProcessApi`] associated with this [`Api`].
type Process: ProcessApi;
/// Specific implementation of [`SearchApi`] associated with this [`Api`].
type Search: SearchApi;
/// Specific implementation of [`SystemInfoApi`] associated with this [`Api`].
type SystemInfo: SystemInfoApi;
/// Specific implementation of [`VersionApi`] associated with this [`Api`].
type Version: VersionApi;
/// Specific implementation of [`WatchApi`] associated with this [`Api`].
type Watch: WatchApi;
/// Returns a reference to the [`FileSystemApi`] implementation tied to this [`Api`].
fn file_system(&self) -> &Self::FileSystem;
/// Returns a reference to the [`ProcessApi`] implementation tied to this [`Api`].
fn process(&self) -> &Self::Process;
/// Returns a reference to the [`SearchApi`] implementation tied to this [`Api`].
fn search(&self) -> &Self::Search;
/// Returns a reference to the [`SystemInfoApi`] implementation tied to this [`Api`].
fn system_info(&self) -> &Self::SystemInfo;
/// Returns a reference to the [`VersionApi`] implementation tied to this [`Api`].
fn version(&self) -> &Self::Version;
/// Returns a reference to the [`WatchApi`] implementation tied to this [`Api`].
fn watch(&self) -> &Self::Watch;
}
/// API supporting filesystem operations.
#[async_trait]
pub trait FileSystemApi: Send + Sync {
/// Reads bytes from a file.
///
/// * `path` - the path to the file
async fn read_file(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<Vec<u8>>;
/// Reads bytes from a file as text.
///
/// * `path` - the path to the file
async fn read_file_text(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<String>;
/// Writes bytes to a file, overwriting the file if it exists.
///
/// * `path` - the path to the file
/// * `data` - the data to write
async fn write_file(&self, ctx: BoxedCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()>;
/// Writes text to a file, overwriting the file if it exists.
///
/// * `path` - the path to the file
/// * `data` - the data to write
async fn write_file_text(&self, ctx: BoxedCtx, path: PathBuf, data: String) -> io::Result<()>;
/// Writes bytes to the end of a file, creating it if it is missing.
///
/// * `path` - the path to the file
/// * `data` - the data to append
async fn append_file(&self, ctx: BoxedCtx, path: PathBuf, data: Vec<u8>) -> io::Result<()>;
/// Writes bytes to the end of a file, creating it if it is missing.
///
/// * `path` - the path to the file
/// * `data` - the data to append
async fn append_file_text(&self, ctx: BoxedCtx, path: PathBuf, data: String) -> io::Result<()>;
/// Reads entries from a directory.
///
/// * `path` - the path to the directory
/// * `depth` - how far to traverse the directory, 0 being unlimited
/// * `absolute` - if true, will return absolute paths instead of relative paths
/// * `canonicalize` - if true, will canonicalize entry paths before returned
/// * `include_root` - if true, will include the directory specified in the entries
async fn read_dir(
&self,
ctx: BoxedCtx,
path: PathBuf,
depth: usize,
absolute: bool,
canonicalize: bool,
include_root: bool,
) -> io::Result<(Vec<DirEntry>, Vec<io::Error>)>;
/// Creates a directory.
///
/// * `path` - the path to the directory
/// * `all` - if true, will create all missing parent components
async fn create_dir(&self, ctx: BoxedCtx, path: PathBuf, all: bool) -> io::Result<()>;
/// Copies some file or directory.
///
/// * `src` - the path to the file or directory to copy
/// * `dst` - the path where the copy will be placed
async fn copy(&self, ctx: BoxedCtx, src: PathBuf, dst: PathBuf) -> io::Result<()>;
/// Removes some file or directory.
///
/// * `path` - the path to a file or directory
/// * `force` - if true, will remove non-empty directories
async fn remove(&self, ctx: BoxedCtx, path: PathBuf, force: bool) -> io::Result<()>;
/// Renames some file or directory.
///
/// * `src` - the path to the file or directory to rename
/// * `dst` - the new name for the file or directory
async fn rename(&self, ctx: BoxedCtx, src: PathBuf, dst: PathBuf) -> io::Result<()>;
/// Checks if the specified path exists.
///
/// * `path` - the path to the file or directory
async fn exists(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<bool>;
/// Reads metadata for a file or directory.
///
/// * `path` - the path to the file or directory
/// * `canonicalize` - if true, will include a canonicalized path in the metadata
/// * `resolve_file_type` - if true, will resolve symlinks to underlying type (file or dir)
async fn metadata(
&self,
ctx: BoxedCtx,
path: PathBuf,
canonicalize: bool,
resolve_file_type: bool,
) -> io::Result<Metadata>;
/// Sets permissions for a file, directory, or symlink.
///
/// * `path` - the path to the file, directory, or symlink
/// * `resolve_symlink` - if true, will resolve the path to the underlying file/directory
/// * `permissions` - the new permissions to apply
async fn set_permissions(
&self,
ctx: BoxedCtx,
path: PathBuf,
permissions: Permissions,
options: SetPermissionsOptions,
) -> io::Result<()>;
}
/// API supporting process creation and manipulation.
#[async_trait]
pub trait ProcessApi: Send + Sync {
/// Spawns a new process, returning its id.
///
/// * `cmd` - the full command to run as a new process (including arguments)
/// * `environment` - the environment variables to associate with the process
/// * `current_dir` - the alternative current directory to use with the process
/// * `pty` - if provided, will run the process within a PTY of the given size
async fn proc_spawn(
&self,
ctx: BoxedCtx,
cmd: String,
environment: Environment,
current_dir: Option<PathBuf>,
pty: Option<PtySize>,
) -> io::Result<ProcessId>;
/// Kills a running process by its id.
///
/// * `id` - the unique id of the process
async fn proc_kill(&self, ctx: BoxedCtx, id: ProcessId) -> io::Result<()>;
/// Sends data to the stdin of the process with the specified id.
///
/// * `id` - the unique id of the process
/// * `data` - the bytes to send to stdin
async fn proc_stdin(&self, ctx: BoxedCtx, id: ProcessId, data: Vec<u8>) -> io::Result<()>;
/// Resizes the PTY of the process with the specified id.
///
/// * `id` - the unique id of the process
/// * `size` - the new size of the pty
async fn proc_resize_pty(&self, ctx: BoxedCtx, id: ProcessId, size: PtySize) -> io::Result<()>;
}
/// API supporting searching through the remote system.
#[async_trait]
pub trait SearchApi: Send + Sync {
/// Searches files for matches based on a query.
///
/// * `query` - the specific query to perform
async fn search(&self, ctx: BoxedCtx, query: SearchQuery) -> io::Result<SearchId>;
/// Cancels an actively-ongoing search.
///
/// * `id` - the id of the search to cancel
async fn cancel_search(&self, ctx: BoxedCtx, id: SearchId) -> io::Result<()>;
}
/// API supporting retrieval of information about the remote system.
#[async_trait]
pub trait SystemInfoApi: Send + Sync {
/// Retrieves information about the system.
async fn system_info(&self, ctx: BoxedCtx) -> io::Result<SystemInfo>;
}
/// API supporting retrieval of the server's version.
#[async_trait]
pub trait VersionApi: Send + Sync {
/// Retrieves information about the server's capabilities.
async fn version(&self, ctx: BoxedCtx) -> io::Result<Version>;
}
/// API supporting watching of changes to the remote filesystem.
#[async_trait]
pub trait WatchApi: Send + Sync {
/// Watches a file or directory for changes.
///
/// * `path` - the path to the file or directory
/// * `recursive` - if true, will watch for changes within subdirectories and beyond
/// * `only` - if non-empty, will limit reported changes to those included in this list
/// * `except` - if non-empty, will limit reported changes to those not included in this list
async fn watch(
&self,
ctx: BoxedCtx,
path: PathBuf,
recursive: bool,
only: Vec<ChangeKind>,
except: Vec<ChangeKind>,
) -> io::Result<()>;
/// Removes a file or directory from being watched.
///
/// * `path` - the path to the file or directory
async fn unwatch(&self, ctx: BoxedCtx, path: PathBuf) -> io::Result<()>;
}

@ -0,0 +1,68 @@
use std::any::TypeId;
use super::{Api, Unsupported};
/// Utility class to check if various APIs are supported.
pub struct Checker;
impl Checker {
/// Returns true if [`FileSystemApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_file_system_support<T>() -> bool
where
T: Api,
T::FileSystem: 'static,
{
TypeId::of::<T::FileSystem>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`ProcessApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_process_support<T>() -> bool
where
T: Api,
T::Process: 'static,
{
TypeId::of::<T::Process>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`SearchApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_search_support<T>() -> bool
where
T: Api,
T::Search: 'static,
{
TypeId::of::<T::Search>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`SystemInfoApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_system_info_support<T>() -> bool
where
T: Api,
T::SystemInfo: 'static,
{
TypeId::of::<T::SystemInfo>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`VersionApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_version_support<T>() -> bool
where
T: Api,
T::Version: 'static,
{
TypeId::of::<T::Version>() != TypeId::of::<Unsupported>()
}
/// Returns true if [`WatchApi`] is supported. This is checked by ensuring that the
/// implementation of the associated trait is not [`Unsupported`].
pub fn has_watch_support<T>() -> bool
where
T: Api,
T::Watch: 'static,
{
TypeId::of::<T::Watch>() != TypeId::of::<Unsupported>()
}
}

@ -0,0 +1,21 @@
use std::io;
use async_trait::async_trait;
use distant_core_protocol::Response;
/// Type abstraction of a boxed [`Ctx`].
pub type BoxedCtx = Box<dyn Ctx>;
/// Represents a context associated when an API request is being executed, supporting the ability
/// to send responses back asynchronously.
#[async_trait]
pub trait Ctx: Send {
/// Id of the connection associated with this context.
fn connection(&self) -> u32;
/// Clones context, returning a new boxed instance.
fn clone_ctx(&self) -> BoxedCtx;
/// Sends some response back.
fn send(&self, response: Response) -> io::Result<()>;
}

@ -0,0 +1,212 @@
use async_trait::async_trait;
use super::*;
#[inline]
fn unsupported<T>(label: &str) -> io::Result<T> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
format!("{label} is unsupported"),
))
}
/// Generic struct that implements all APIs as unsupported.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Unsupported;
#[async_trait]
impl Api for Unsupported {
type FileSystem = Self;
type Process = Self;
type Search = Self;
type SystemInfo = Self;
type Version = Self;
type Watch = Self;
fn file_system(&self) -> &Self::FileSystem {
self
}
fn process(&self) -> &Self::Process {
self
}
fn search(&self) -> &Self::Search {
self
}
fn system_info(&self) -> &Self::SystemInfo {
self
}
fn version(&self) -> &Self::Version {
self
}
fn watch(&self) -> &Self::Watch {
self
}
}
#[async_trait]
impl FileSystemApi for Unsupported {
async fn read_file(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<Vec<u8>> {
unsupported("read_file")
}
async fn read_file_text(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<String> {
unsupported("read_file_text")
}
async fn write_file(&self, _ctx: BoxedCtx, _path: PathBuf, _data: Vec<u8>) -> io::Result<()> {
unsupported("write_file")
}
async fn write_file_text(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_data: String,
) -> io::Result<()> {
unsupported("write_file_text")
}
async fn append_file(&self, _ctx: BoxedCtx, _path: PathBuf, _data: Vec<u8>) -> io::Result<()> {
unsupported("append_file")
}
async fn append_file_text(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_data: String,
) -> io::Result<()> {
unsupported("append_file_text")
}
async fn read_dir(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_depth: usize,
_absolute: bool,
_canonicalize: bool,
_include_root: bool,
) -> io::Result<(Vec<DirEntry>, Vec<io::Error>)> {
unsupported("read_dir")
}
async fn create_dir(&self, _ctx: BoxedCtx, _path: PathBuf, _all: bool) -> io::Result<()> {
unsupported("create_dir")
}
async fn copy(&self, _ctx: BoxedCtx, _src: PathBuf, _dst: PathBuf) -> io::Result<()> {
unsupported("copy")
}
async fn remove(&self, _ctx: BoxedCtx, _path: PathBuf, _force: bool) -> io::Result<()> {
unsupported("remove")
}
async fn rename(&self, _ctx: BoxedCtx, _src: PathBuf, _dst: PathBuf) -> io::Result<()> {
unsupported("rename")
}
async fn exists(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<bool> {
unsupported("exists")
}
async fn metadata(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_canonicalize: bool,
_resolve_file_type: bool,
) -> io::Result<Metadata> {
unsupported("metadata")
}
async fn set_permissions(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_permissions: Permissions,
_options: SetPermissionsOptions,
) -> io::Result<()> {
unsupported("set_permissions")
}
}
#[async_trait]
impl ProcessApi for Unsupported {
async fn proc_spawn(
&self,
_ctx: BoxedCtx,
_cmd: String,
_environment: Environment,
_current_dir: Option<PathBuf>,
_pty: Option<PtySize>,
) -> io::Result<ProcessId> {
unsupported("proc_spawn")
}
async fn proc_kill(&self, _ctx: BoxedCtx, _id: ProcessId) -> io::Result<()> {
unsupported("proc_kill")
}
async fn proc_stdin(&self, _ctx: BoxedCtx, _id: ProcessId, _data: Vec<u8>) -> io::Result<()> {
unsupported("proc_stdin")
}
async fn proc_resize_pty(
&self,
_ctx: BoxedCtx,
_id: ProcessId,
_size: PtySize,
) -> io::Result<()> {
unsupported("proc_resize_pty")
}
}
#[async_trait]
impl SearchApi for Unsupported {
async fn search(&self, _ctx: BoxedCtx, _query: SearchQuery) -> io::Result<SearchId> {
unsupported("search")
}
async fn cancel_search(&self, _ctx: BoxedCtx, _id: SearchId) -> io::Result<()> {
unsupported("cancel_search")
}
}
#[async_trait]
impl SystemInfoApi for Unsupported {
async fn system_info(&self, _ctx: BoxedCtx) -> io::Result<SystemInfo> {
unsupported("system_info")
}
}
#[async_trait]
impl VersionApi for Unsupported {
async fn version(&self, _ctx: BoxedCtx) -> io::Result<Version> {
unsupported("version")
}
}
#[async_trait]
impl WatchApi for Unsupported {
async fn watch(
&self,
_ctx: BoxedCtx,
_path: PathBuf,
_recursive: bool,
_only: Vec<ChangeKind>,
_except: Vec<ChangeKind>,
) -> io::Result<()> {
unsupported("watch")
}
async fn unwatch(&self, _ctx: BoxedCtx, _path: PathBuf) -> io::Result<()> {
unsupported("unwatch")
}
}

@ -0,0 +1,307 @@
use std::io;
use std::sync::{mpsc, Arc};
use async_trait::async_trait;
use distant_core_protocol::{Error, Request, Response};
use crate::api::{
Api, Ctx, FileSystemApi, ProcessApi, SearchApi, SystemInfoApi, VersionApi, WatchApi,
};
pub type BoxedClient = Box<dyn Client>;
/// Full API for a distant-compatible client.
#[async_trait]
pub trait Client {
/// Sends a request without waiting for a response; this method is able to be used even
/// if the session's receiving line to the remote server has been severed.
async fn fire(&mut self, request: Request) -> io::Result<()>;
/// Sends a request and returns a mailbox that can receive one or more responses, failing if
/// unable to send a request or if the session's receiving line to the remote server has
/// already been severed.
async fn mail(&mut self, request: Request) -> io::Result<mpsc::Receiver<Response>>;
/// Sends a request and waits for a response, failing if unable to send a request or if
/// the session's receiving line to the remote server has already been severed
async fn send(&mut self, request: Request) -> io::Result<Response>;
}
/// Represents a bridge between a [`Client`] and an [`Api`] implementation that maps requests to
/// the API and forwards responses back.
///
/// This can be used to run an Api implementation locally, such as when you want to translate some
/// other platform (e.g. ssh, docker) into a distant-compatible form.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClientBridge<T: Api> {
api: Arc<T>,
}
impl<T: Api> ClientBridge<T> {
/// Creates a new bridge wrapping around the provided api.
pub fn new(api: T) -> Self {
Self { api: Arc::new(api) }
}
}
#[async_trait]
impl<T: Api> Client for ClientBridge<T> {
async fn fire(&mut self, request: Request) -> io::Result<()> {
let _ = self.send(request).await?;
Ok(())
}
async fn mail(&mut self, request: Request) -> io::Result<mpsc::Receiver<Response>> {
#[derive(Clone, Debug)]
struct __Ctx(u32, mpsc::Sender<Response>);
impl Ctx for __Ctx {
fn connection(&self) -> u32 {
self.0
}
fn clone_ctx(&self) -> Box<dyn Ctx> {
Box::new(__Ctx(self.0, self.1.clone()))
}
fn send(&self, response: Response) -> io::Result<()> {
self.1
.send(response)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Bridge has closed"))
}
}
// TODO: Do we give this some unique id? We could randomize it, but would need the
// random crate to do so. Is that even necessary given this represents a "connection"
// and the likelihood that someone creates multiple bridges to the same api is minimal?
let (tx, rx) = mpsc::channel();
let ctx = Box::new(__Ctx(0, tx));
// TODO: This is blocking! How can we make this async? Do we REALLY need to import tokio?
//
// We would need to import tokio to spawn a task to run this...
//
// Alternatively, we could make some sort of trait that is a task queuer that is
// also passed to the bridge and is used to abstract the tokio spawn. Tokio itself
// can implement that trait by creating some newtype that just uses tokio spawn underneath
let _response = handle_request(Arc::clone(&self.api), ctx, request).await;
Ok(rx)
}
async fn send(&mut self, request: Request) -> io::Result<Response> {
let rx = self.mail(request).await?;
// TODO: This is blocking! How can we make this async? Do we REALLY need to import tokio?
//
// If we abstract the mpsc::Receiver to be async, we can make this async without using
// tokio runtime directly. The mail function would return a boxed version of this trait
// and we can await on it like usual
rx.recv()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Bridge has closed"))
}
}
/// Processes an incoming request.
async fn handle_request<T>(api: Arc<T>, ctx: Box<dyn Ctx>, request: Request) -> Response
where
T: Api,
{
match request {
Request::Version {} => {
let api = api.version();
api.version(ctx)
.await
.map(Response::Version)
.unwrap_or_else(Response::from)
}
Request::FileRead { path } => {
let api = api.file_system();
api.read_file(ctx, path)
.await
.map(|data| Response::Blob { data })
.unwrap_or_else(Response::from)
}
Request::FileReadText { path } => {
let api = api.file_system();
api.read_file_text(ctx, path)
.await
.map(|data| Response::Text { data })
.unwrap_or_else(Response::from)
}
Request::FileWrite { path, data } => {
let api = api.file_system();
api.write_file(ctx, path, data)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::FileWriteText { path, text } => {
let api = api.file_system();
api.write_file_text(ctx, path, text)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::FileAppend { path, data } => {
let api = api.file_system();
api.append_file(ctx, path, data)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::FileAppendText { path, text } => {
let api = api.file_system();
api.append_file_text(ctx, path, text)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::DirRead {
path,
depth,
absolute,
canonicalize,
include_root,
} => {
let api = api.file_system();
api.read_dir(ctx, path, depth, absolute, canonicalize, include_root)
.await
.map(|(entries, errors)| Response::DirEntries {
entries,
errors: errors.into_iter().map(Error::from).collect(),
})
.unwrap_or_else(Response::from)
}
Request::DirCreate { path, all } => {
let api = api.file_system();
api.create_dir(ctx, path, all)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Remove { path, force } => {
let api = api.file_system();
api.remove(ctx, path, force)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Copy { src, dst } => {
let api = api.file_system();
api.copy(ctx, src, dst)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Rename { src, dst } => {
let api = api.file_system();
api.rename(ctx, src, dst)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Watch {
path,
recursive,
only,
except,
} => {
let api = api.watch();
api.watch(ctx, path, recursive, only, except)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Unwatch { path } => {
let api = api.watch();
api.unwatch(ctx, path)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Exists { path } => {
let api = api.file_system();
api.exists(ctx, path)
.await
.map(|value| Response::Exists { value })
.unwrap_or_else(Response::from)
}
Request::Metadata {
path,
canonicalize,
resolve_file_type,
} => {
let api = api.file_system();
api.metadata(ctx, path, canonicalize, resolve_file_type)
.await
.map(Response::Metadata)
.unwrap_or_else(Response::from)
}
Request::SetPermissions {
path,
permissions,
options,
} => {
let api = api.file_system();
api.set_permissions(ctx, path, permissions, options)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::Search { query } => {
let api = api.search();
api.search(ctx, query)
.await
.map(|id| Response::SearchStarted { id })
.unwrap_or_else(Response::from)
}
Request::CancelSearch { id } => {
let api = api.search();
api.cancel_search(ctx, id)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::ProcSpawn {
cmd,
environment,
current_dir,
pty,
} => {
let api = api.process();
api.proc_spawn(ctx, cmd.into(), environment, current_dir, pty)
.await
.map(|id| Response::ProcSpawned { id })
.unwrap_or_else(Response::from)
}
Request::ProcKill { id } => {
let api = api.process();
api.proc_kill(ctx, id)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::ProcStdin { id, data } => {
let api = api.process();
api.proc_stdin(ctx, id, data)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::ProcResizePty { id, size } => {
let api = api.process();
api.proc_resize_pty(ctx, id, size)
.await
.map(|_| Response::Ok)
.unwrap_or_else(Response::from)
}
Request::SystemInfo {} => {
let api = api.system_info();
api.system_info(ctx)
.await
.map(Response::SystemInfo)
.unwrap_or_else(Response::from)
}
}
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save