Compare commits

...

6 Commits

@ -511,7 +511,7 @@ mod tests {
use test_log::test;
use super::*;
use crate::common::Frame;
use crate::Frame;
macro_rules! server_version {
() => {

@ -74,7 +74,7 @@ mod tests {
use tokio::task::JoinHandle;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
#[test(tokio::test)]
async fn should_fail_to_bind_if_port_already_bound() {

@ -100,7 +100,7 @@ mod tests {
use tokio::task::JoinHandle;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
#[test(tokio::test)]
async fn should_succeed_to_bind_if_file_exists_at_path_but_nothing_listening() {

@ -71,7 +71,7 @@ mod tests {
use tokio::task::JoinHandle;
use super::*;
use crate::common::TransportExt;
use crate::TransportExt;
#[test(tokio::test)]
async fn should_fail_to_bind_if_pipe_already_bound() {

@ -915,7 +915,7 @@ mod tests {
use test_log::test;
use super::*;
use crate::common::TestTransport;
use crate::TestTransport;
/// Codec that always succeeds without altering the frame
#[derive(Clone, Debug, PartialEq, Eq)]

@ -15,7 +15,7 @@ impl Version {
/// major version is `0`.
///
/// ```
/// use distant_core_net::common::Version;
/// use distant_core_net::Version;
///
/// // Matching versions are compatible
/// let a = Version::new(1, 2, 3);

@ -8,29 +8,29 @@ use tokio::sync::mpsc;
use crate::api::{
Api, Ctx, FileSystemApi, ProcessApi, SearchApi, SystemInfoApi, VersionApi, WatchApi,
};
pub type BoxedClient = Box<dyn Client>;
use crate::common::Stream;
/// Full API for a distant-compatible client.
#[async_trait]
pub trait Client {
/// Sends a request and returns a stream of responses, failing if unable to send a request or
/// if the session's receiving line to the remote server has already been severed.
async fn mail(&mut self, request: Request) -> io::Result<mpsc::UnboundedReceiver<Response>>;
async fn send(&mut self, request: Request) -> io::Result<Box<dyn Stream<Item = Response>>>;
/// Sends a request and waits for a response, failing if unable to send a request or if
/// Sends a request and waits for a single response, failing if unable to send a request or if
/// the session's receiving line to the remote server has already been severed.
async fn send(&mut self, request: Request) -> io::Result<Response> {
let mut rx = self.mail(request).await?;
rx.recv()
async fn ask(&mut self, request: Request) -> io::Result<Response> {
self.send(request)
.await?
.next()
.await
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Channel has closed"))
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Stream has closed"))
}
/// Sends a request without waiting for a response; this method is able to be used even
/// Sends a request without waiting for any response; this method is able to be used even
/// if the session's receiving line to the remote server has been severed.
async fn fire(&mut self, request: Request) -> io::Result<()> {
let _ = self.send(request).await?;
let _ = self.ask(request).await?;
Ok(())
}
}
@ -54,8 +54,7 @@ impl<T: Api> ClientBridge<T> {
#[async_trait]
impl<T: Api + 'static> Client for ClientBridge<T> {
async fn mail(&mut self, request: Request) -> io::Result<mpsc::UnboundedReceiver<Response>> {
#[derive(Clone, Debug)]
async fn send(&mut self, request: Request) -> io::Result<Box<dyn Stream<Item = Response>>> {
struct __Ctx(u32, mpsc::UnboundedSender<Response>);
impl Ctx for __Ctx {
@ -77,6 +76,9 @@ impl<T: Api + 'static> Client for ClientBridge<T> {
let (tx, rx) = mpsc::unbounded_channel();
let ctx = Box::new(__Ctx(rand::random(), tx));
// Spawn a task that will perform the request using the api. We use a task to allow the
// async engine to not get blocked awaiting immediately for the response to arrive before
// returning the stream of responses.
tokio::task::spawn({
let api = Arc::clone(&self.api);
async move {
@ -89,7 +91,7 @@ impl<T: Api + 'static> Client for ClientBridge<T> {
}
});
Ok(rx)
Ok(Box::new(rx))
}
}

@ -1,6 +1,8 @@
mod destination;
mod map;
mod stream;
mod utils;
pub use destination::{Destination, Host, HostParseError};
pub use map::{Map, MapParseError};
pub use stream::Stream;

@ -0,0 +1,30 @@
use async_trait::async_trait;
use tokio::sync::mpsc;
/// Interface to an asynchronous stream of items.
#[async_trait]
pub trait Stream: Send {
type Item: Send;
/// Retrieves the next item from the stream, returning `None` if no more items are available
/// from the stream.
async fn next(&mut self) -> Option<Self::Item>;
}
#[async_trait]
impl<T: Send> Stream for mpsc::UnboundedReceiver<T> {
type Item = T;
async fn next(&mut self) -> Option<Self::Item> {
self.recv().await
}
}
#[async_trait]
impl<T: Send> Stream for mpsc::Receiver<T> {
type Item = T;
async fn next(&mut self) -> Option<Self::Item> {
self.recv().await
}
}

@ -7,12 +7,6 @@ use distant_core_auth::Authenticator;
use crate::client::Client;
use crate::common::{Destination, Map};
/// Boxed [`LaunchHandler`].
pub type BoxedLaunchHandler = Box<dyn LaunchHandler>;
/// Boxed [`ConnectHandler`].
pub type BoxedConnectHandler = Box<dyn ConnectHandler>;
/// Interface for a handler to launch a server, returning the destination to the server.
#[async_trait]
pub trait LaunchHandler: Send + Sync {
@ -67,7 +61,7 @@ where
#[macro_export]
macro_rules! boxed_launch_handler {
(|$destination:ident, $options:ident, $authenticator:ident| $(async)? $body:block) => {{
let x: $crate::handlers::BoxedLaunchHandler = Box::new(
let x: Box<dyn $crate::handlers::LaunchHandler> = Box::new(
|$destination: &$crate::common::Destination,
$options: &$crate::common::Map,
$authenticator: &mut dyn $crate::auth::Authenticator| async { $body },
@ -75,7 +69,7 @@ macro_rules! boxed_launch_handler {
x
}};
(move |$destination:ident, $options:ident, $authenticator:ident| $(async)? $body:block) => {{
let x: $crate::handlers::BoxedLaunchHandler = Box::new(
let x: Box<dyn $crate::handlers::LaunchHandler> = Box::new(
move |$destination: &$crate::common::Destination,
$options: &$crate::common::Map,
$authenticator: &mut dyn $crate::auth::Authenticator| async move { $body },
@ -138,7 +132,7 @@ where
#[macro_export]
macro_rules! boxed_connect_handler {
(|$destination:ident, $options:ident, $authenticator:ident| $(async)? $body:block) => {{
let x: $crate::handlers::BoxedConnectHandler = Box::new(
let x: Box<dyn $crate::handlers::ConnectHandler> = Box::new(
|$destination: &$crate::common::Destination,
$options: &$crate::common::Map,
$authenticator: &mut dyn $crate::auth::Authenticator| async { $body },
@ -146,7 +140,7 @@ macro_rules! boxed_connect_handler {
x
}};
(move |$destination:ident, $options:ident, $authenticator:ident| $(async)? $body:block) => {{
let x: $crate::handlers::BoxedConnectHandler = Box::new(
let x: Box<dyn $crate::handlers::ConnectHandler> = Box::new(
move |$destination: &$crate::common::Destination,
$options: &$crate::common::Map,
$authenticator: &mut dyn $crate::auth::Authenticator| async move { $body },

@ -11,3 +11,110 @@ pub mod handlers;
pub use distant_core_auth as auth;
pub use distant_core_protocol as protocol;
/// Interface to a plugin that can register new handlers for launching and connecting to
/// distant-compatible servers.
pub trait Plugin {
/// Returns a unique name associated with the plugin.
fn name(&self) -> &'static str;
/// Invoked immediately after the plugin is loaded. Used for initialization.
#[allow(unused_variables)]
fn on_load(&self, registry: &mut PluginRegistry) {}
/// Invoked immediately before the plugin is unloaded. Used for deallocation of resources.
fn on_unload(&self) {}
}
/// Registry that contains various handlers and other information tied to plugins.
#[derive(Default)]
pub struct PluginRegistry {
/// Names of loaded plugins.
loaded: Vec<&'static str>,
/// Launch handlers registered by plugins, keyed by scheme.
launch_handlers: std::collections::HashMap<String, Box<dyn handlers::LaunchHandler>>,
/// Connect handlers registered by plugins, keyed by scheme.
connect_handlers: std::collections::HashMap<String, Box<dyn handlers::ConnectHandler>>,
}
impl PluginRegistry {
pub fn new() -> Self {
Self::default()
}
/// Returns a list of plugin names associated with this registry.
pub fn plugin_names(&self) -> &[&'static str] {
&self.loaded
}
/// Inserts the name of the plugin into the registry. If it already exists, nothing happens.
pub fn insert_plugin_name(&mut self, name: &'static str) {
if !self.loaded.contains(&name) {
self.loaded.push(name);
}
}
/// Returns a reference to the launch handler associated with the `scheme` if one exists.
pub fn launch_handler(&self, scheme: impl AsRef<str>) -> Option<&dyn handlers::LaunchHandler> {
self.launch_handlers
.get(scheme.as_ref())
.map(|x| x.as_ref())
}
/// Inserts a new `handler` for `scheme`. Returns true if successfully inserted, otherwise
/// false if the scheme is already taken.
pub fn insert_launch_handler(
&mut self,
scheme: impl Into<String>,
handler: impl handlers::LaunchHandler + 'static,
) -> bool {
use std::collections::hash_map::Entry;
let scheme = scheme.into();
if let Entry::Vacant(e) = self.launch_handlers.entry(scheme) {
e.insert(Box::new(handler));
true
} else {
false
}
}
/// Returns a reference to the connect handler associated with the `scheme` if one exists.
pub fn connect_handler(
&self,
scheme: impl AsRef<str>,
) -> Option<&dyn handlers::ConnectHandler> {
self.connect_handlers
.get(scheme.as_ref())
.map(|x| x.as_ref())
}
/// Inserts a new `handler` for `scheme`. Returns true if successfully inserted, otherwise
/// false if the scheme is already taken.
pub fn insert_connect_handler(
&mut self,
scheme: impl Into<String>,
handler: impl handlers::ConnectHandler + 'static,
) -> bool {
use std::collections::hash_map::Entry;
let scheme = scheme.into();
if let Entry::Vacant(e) = self.connect_handlers.entry(scheme) {
e.insert(Box::new(handler));
true
} else {
false
}
}
}
impl std::fmt::Debug for PluginRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PluginHandlerRegistry")
.field("launch_handlers", &self.launch_handlers.keys())
.field("connect_handlers", &self.connect_handlers.keys())
.finish()
}
}

Loading…
Cancel
Save