From 3ac715eaddaecb414d9ab387d77f84d96e99476b Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Mon, 12 Jun 2023 20:37:50 -0500 Subject: [PATCH] Refactor GenericServerRef to ServerRef --- CHANGELOG.md | 3 + distant-core/tests/api_tests.rs | 2 +- distant-net/src/server.rs | 4 +- distant-net/src/server/ref.rs | 79 ++------------------------- distant-net/src/server/ref/tcp.rs | 42 ++++++++++---- distant-net/src/server/ref/unix.rs | 39 +++++++++---- distant-net/src/server/ref/windows.rs | 37 +++++++++---- distant-net/tests/manager_tests.rs | 2 +- distant-net/tests/typed_tests.rs | 2 +- distant-net/tests/untyped_tests.rs | 2 +- distant-ssh2/src/lib.rs | 2 +- src/cli/commands/manager.rs | 8 +-- src/cli/commands/server.rs | 4 +- src/cli/common/manager.rs | 10 ++-- 14 files changed, 112 insertions(+), 124 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bffc90..1fdda8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `DistantApi` now handles batch requests in parallel, returning the results in order. To achieve the previous sequential processing of batch requests, the header value `sequence` needs to be set to true +- Rename `GenericServerRef` to `ServerRef` and remove `ServerRef` trait, + refactoring `TcpServerRef`, `UnixSocketServerRef`, and `WindowsPipeServerRef` + to use the struct instead of `Box` ## [0.20.0-alpha.8] diff --git a/distant-core/tests/api_tests.rs b/distant-core/tests/api_tests.rs index a1dbcda..79bc12b 100644 --- a/distant-core/tests/api_tests.rs +++ b/distant-core/tests/api_tests.rs @@ -13,7 +13,7 @@ use distant_net::server::{Server, ServerRef}; /// Stands up an inmemory client and server using the given api. async fn setup( api: impl DistantApi + Send + Sync + 'static, -) -> (DistantClient, Box) { +) -> (DistantClient, ServerRef) { let (t1, t2) = InmemoryTransport::pair(100); let server = Server::new() diff --git a/distant-net/src/server.rs b/distant-net/src/server.rs index 248a176..cdc32f4 100644 --- a/distant-net/src/server.rs +++ b/distant-net/src/server.rs @@ -148,7 +148,7 @@ where { /// Consumes the server, starting a task to process connections from the `listener` and /// returning a [`ServerRef`] that can be used to control the active server instance. - pub fn start(self, listener: L) -> io::Result> + pub fn start(self, listener: L) -> io::Result where L: Listener + 'static, L::Output: Transport + 'static, @@ -157,7 +157,7 @@ where let (tx, rx) = broadcast::channel(1); let task = tokio::spawn(self.task(Arc::clone(&state), listener, tx.clone(), rx)); - Ok(Box::new(GenericServerRef { shutdown: tx, task })) + Ok(ServerRef { shutdown: tx, task }) } /// Internal task that is run to receive connections and spawn connection tasks diff --git a/distant-net/src/server/ref.rs b/distant-net/src/server/ref.rs index 28fe704..eab5212 100644 --- a/distant-net/src/server/ref.rs +++ b/distant-net/src/server/ref.rs @@ -1,94 +1,27 @@ use std::future::Future; -use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; use tokio::sync::broadcast; use tokio::task::{JoinError, JoinHandle}; -use crate::common::AsAny; - -/// Interface to engage with a server instance. -pub trait ServerRef: AsAny + Send { - /// Returns true if the server is no longer running. - fn is_finished(&self) -> bool; - - /// Sends a shutdown signal to the server. - fn shutdown(&self); - - fn wait(self) -> Pin>>> - where - Self: Sized + 'static, - { - Box::pin(async { - let task = tokio::spawn(async move { - while !self.is_finished() { - tokio::time::sleep(Duration::from_millis(100)).await; - } - }); - task.await - .map_err(|x| io::Error::new(io::ErrorKind::Other, x)) - }) - } -} - -impl dyn ServerRef { - /// Attempts to convert this ref into a concrete ref by downcasting - pub fn as_server_ref(&self) -> Option<&R> { - self.as_any().downcast_ref::() - } - - /// Attempts to convert this mutable ref into a concrete mutable ref by downcasting - pub fn as_mut_server_ref(&mut self) -> Option<&mut R> { - self.as_mut_any().downcast_mut::() - } - - /// Attempts to convert this into a concrete, boxed ref by downcasting - pub fn into_boxed_server_ref( - self: Box, - ) -> Result, Box> { - self.into_any().downcast::() - } - - /// Waits for the server to complete by continuously polling the finished state. - pub async fn polling_wait(&self) -> io::Result<()> { - while !self.is_finished() { - tokio::time::sleep(Duration::from_millis(100)).await; - } - Ok(()) - } -} - -/// Represents a generic reference to a server -pub struct GenericServerRef { +/// Represents a reference to a server +pub struct ServerRef { pub(crate) shutdown: broadcast::Sender<()>, pub(crate) task: JoinHandle<()>, } -/// Runtime-specific implementation of [`ServerRef`] for a [`tokio::task::JoinHandle`] -impl ServerRef for GenericServerRef { - fn is_finished(&self) -> bool { +impl ServerRef { + pub fn is_finished(&self) -> bool { self.task.is_finished() } - fn shutdown(&self) { + pub fn shutdown(&self) { let _ = self.shutdown.send(()); } - - fn wait(self) -> Pin>>> - where - Self: Sized + 'static, - { - Box::pin(async { - self.task - .await - .map_err(|x| io::Error::new(io::ErrorKind::Other, x)) - }) - } } -impl Future for GenericServerRef { +impl Future for ServerRef { type Output = Result<(), JoinError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/distant-net/src/server/ref/tcp.rs b/distant-net/src/server/ref/tcp.rs index f0ee233..2367f5f 100644 --- a/distant-net/src/server/ref/tcp.rs +++ b/distant-net/src/server/ref/tcp.rs @@ -1,36 +1,58 @@ +use std::future::Future; use std::net::IpAddr; +use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::task::JoinError; use super::ServerRef; -/// Reference to a TCP server instance +/// Reference to a TCP server instance. pub struct TcpServerRef { pub(crate) addr: IpAddr, pub(crate) port: u16, - pub(crate) inner: Box, + pub(crate) inner: ServerRef, } impl TcpServerRef { - pub fn new(addr: IpAddr, port: u16, inner: Box) -> Self { + pub fn new(addr: IpAddr, port: u16, inner: ServerRef) -> Self { Self { addr, port, inner } } - /// Returns the IP address that the listener is bound to + /// Returns the IP address that the listener is bound to. pub fn ip_addr(&self) -> IpAddr { self.addr } - /// Returns the port that the listener is bound to + /// Returns the port that the listener is bound to. pub fn port(&self) -> u16 { self.port } + + /// Consumes ref, returning inner ref. + pub fn into_inner(self) -> ServerRef { + self.inner + } +} + +impl Future for TcpServerRef { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner.task).poll(cx) + } } -impl ServerRef for TcpServerRef { - fn is_finished(&self) -> bool { - self.inner.is_finished() +impl Deref for TcpServerRef { + type Target = ServerRef; + + fn deref(&self) -> &Self::Target { + &self.inner } +} - fn shutdown(&self) { - self.inner.shutdown(); +impl DerefMut for TcpServerRef { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner } } diff --git a/distant-net/src/server/ref/unix.rs b/distant-net/src/server/ref/unix.rs index 50bdb0f..d21d7ac 100644 --- a/distant-net/src/server/ref/unix.rs +++ b/distant-net/src/server/ref/unix.rs @@ -1,35 +1,52 @@ +use std::future::Future; +use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::task::JoinError; use super::ServerRef; -/// Reference to a unix socket server instance +/// Reference to a unix socket server instance. pub struct UnixSocketServerRef { pub(crate) path: PathBuf, - pub(crate) inner: Box, + pub(crate) inner: ServerRef, } impl UnixSocketServerRef { - pub fn new(path: PathBuf, inner: Box) -> Self { + pub fn new(path: PathBuf, inner: ServerRef) -> Self { Self { path, inner } } - /// Returns the path to the socket + /// Returns the path to the socket. pub fn path(&self) -> &Path { &self.path } - /// Consumes ref, returning inner ref - pub fn into_inner(self) -> Box { + /// Consumes ref, returning inner ref. + pub fn into_inner(self) -> ServerRef { self.inner } } -impl ServerRef for UnixSocketServerRef { - fn is_finished(&self) -> bool { - self.inner.is_finished() +impl Future for UnixSocketServerRef { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner.task).poll(cx) + } +} + +impl Deref for UnixSocketServerRef { + type Target = ServerRef; + + fn deref(&self) -> &Self::Target { + &self.inner } +} - fn shutdown(&self) { - self.inner.shutdown(); +impl DerefMut for UnixSocketServerRef { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner } } diff --git a/distant-net/src/server/ref/windows.rs b/distant-net/src/server/ref/windows.rs index bf768e1..4a1b6ad 100644 --- a/distant-net/src/server/ref/windows.rs +++ b/distant-net/src/server/ref/windows.rs @@ -1,35 +1,52 @@ use std::ffi::{OsStr, OsString}; +use std::future::Future; +use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::task::JoinError; use super::ServerRef; -/// Reference to a unix socket server instance +/// Reference to a windows pipe server instance. pub struct WindowsPipeServerRef { pub(crate) addr: OsString, pub(crate) inner: Box, } impl WindowsPipeServerRef { - pub fn new(addr: OsString, inner: Box) -> Self { + pub fn new(addr: OsString, inner: ServerRef) -> Self { Self { addr, inner } } - /// Returns the addr that the listener is bound to + /// Returns the addr that the listener is bound to. pub fn addr(&self) -> &OsStr { &self.addr } - /// Consumes ref, returning inner ref - pub fn into_inner(self) -> Box { + /// Consumes ref, returning inner ref. + pub fn into_inner(self) -> ServerRef { self.inner } } -impl ServerRef for WindowsPipeServerRef { - fn is_finished(&self) -> bool { - self.inner.is_finished() +impl Future for WindowsPipeServerRef { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner.task).poll(cx) + } +} + +impl Deref for WindowsPipeServerRef { + type Target = ServerRef; + + fn deref(&self) -> &Self::Target { + &self.inner } +} - fn shutdown(&self) { - self.inner.shutdown(); +impl DerefMut for WindowsPipeServerRef { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner } } diff --git a/distant-net/tests/manager_tests.rs b/distant-net/tests/manager_tests.rs index eed4c36..900ca4d 100644 --- a/distant-net/tests/manager_tests.rs +++ b/distant-net/tests/manager_tests.rs @@ -37,7 +37,7 @@ async fn should_be_able_to_establish_a_single_connection_and_communicate_with_a_ let (t1, t2) = InmemoryTransport::pair(100); // Spawn a server on one end and connect to it on the other - let _ = Server::new() + let _server = Server::new() .handler(TestServerHandler) .verifier(Verifier::none()) .start(OneshotListener::from_value(t2))?; diff --git a/distant-net/tests/typed_tests.rs b/distant-net/tests/typed_tests.rs index be19b11..bcfad76 100644 --- a/distant-net/tests/typed_tests.rs +++ b/distant-net/tests/typed_tests.rs @@ -30,7 +30,7 @@ impl ServerHandler for TestServerHandler { async fn should_be_able_to_send_and_receive_typed_payloads_between_client_and_server() { let (t1, t2) = InmemoryTransport::pair(100); - let _ = Server::new() + let _server = Server::new() .handler(TestServerHandler) .verifier(Verifier::none()) .start(OneshotListener::from_value(t2)) diff --git a/distant-net/tests/untyped_tests.rs b/distant-net/tests/untyped_tests.rs index af43d02..8a5d35b 100644 --- a/distant-net/tests/untyped_tests.rs +++ b/distant-net/tests/untyped_tests.rs @@ -30,7 +30,7 @@ impl ServerHandler for TestServerHandler { async fn should_be_able_to_send_and_receive_untyped_payloads_between_client_and_server() { let (t1, t2) = InmemoryTransport::pair(100); - let _ = Server::new() + let _server = Server::new() .handler(TestServerHandler) .verifier(Verifier::none()) .start(OneshotListener::from_value(t2)) diff --git a/distant-ssh2/src/lib.rs b/distant-ssh2/src/lib.rs index 7f93a42..f8cbd7d 100644 --- a/distant-ssh2/src/lib.rs +++ b/distant-ssh2/src/lib.rs @@ -722,7 +722,7 @@ impl Ssh { } /// Consumes [`Ssh`] and produces a [`DistantClient`] and [`ServerRef`] pair. - pub async fn into_distant_pair(self) -> io::Result<(DistantClient, Box)> { + pub async fn into_distant_pair(self) -> io::Result<(DistantClient, ServerRef)> { // Exit early if not authenticated as this is a requirement if !self.authenticated { return Err(io::Error::new( diff --git a/src/cli/commands/manager.rs b/src/cli/commands/manager.rs index 9897328..3eca61d 100644 --- a/src/cli/commands/manager.rs +++ b/src/cli/commands/manager.rs @@ -185,7 +185,7 @@ async fn async_run(cmd: ManagerSubcommand) -> CliResult { "global".to_string() } ); - let manager_ref = Manager { + let manager = Manager { access, config: NetManagerConfig { user, @@ -223,11 +223,7 @@ async fn async_run(cmd: ManagerSubcommand) -> CliResult { .context("Failed to start manager")?; // Let our server run to completion - manager_ref - .as_ref() - .polling_wait() - .await - .context("Failed to wait on manager")?; + manager.await.context("Failed to wait on manager")?; info!("Manager is shutting down"); Ok(()) diff --git a/src/cli/commands/server.rs b/src/cli/commands/server.rs index f385f6d..81f1d1b 100644 --- a/src/cli/commands/server.rs +++ b/src/cli/commands/server.rs @@ -3,7 +3,7 @@ use std::io::{self, Read, Write}; use anyhow::Context; use distant_core::net::auth::Verifier; use distant_core::net::common::{Host, SecretKey32}; -use distant_core::net::server::{Server, ServerConfig as NetServerConfig, ServerRef}; +use distant_core::net::server::{Server, ServerConfig as NetServerConfig}; use distant_core::DistantSingleKeyCredentials; use distant_local::{Config as LocalConfig, WatchConfig as LocalWatchConfig}; use log::*; @@ -212,7 +212,7 @@ async fn async_run(cmd: ServerSubcommand, _is_forked: bool) -> CliResult { } // Let our server run to completion - server.wait().await.context("Failed to wait on server")?; + server.await.context("Failed to wait on server")?; info!("Server is shutting down"); } } diff --git a/src/cli/common/manager.rs b/src/cli/common/manager.rs index 3060214..5aa4e25 100644 --- a/src/cli/common/manager.rs +++ b/src/cli/common/manager.rs @@ -15,7 +15,7 @@ pub struct Manager { impl Manager { /// Begin listening on the network interface specified within [`NetworkConfig`] - pub async fn listen(self) -> anyhow::Result> { + pub async fn listen(self) -> anyhow::Result { let user = self.config.user; #[cfg(unix)] @@ -36,7 +36,7 @@ impl Manager { .with_context(|| format!("Failed to create socket directory {parent:?}"))?; } - let boxed_ref = ManagerServer::new(self.config) + let server = ManagerServer::new(self.config) .verifier(Verifier::none()) .start( UnixSocketListener::bind_with_permissions(socket_path, self.access.into_mode()) @@ -45,7 +45,7 @@ impl Manager { .with_context(|| format!("Failed to start manager at socket {socket_path:?}"))?; info!("Manager listening using unix socket @ {:?}", socket_path); - Ok(boxed_ref) + Ok(server) } #[cfg(windows)] @@ -57,13 +57,13 @@ impl Manager { global_paths::WINDOWS_PIPE_NAME.as_str() }); - let boxed_ref = ManagerServer::new(self.config) + let server = ManagerServer::new(self.config) .verifier(Verifier::none()) .start(WindowsPipeListener::bind_local(pipe_name)?) .with_context(|| format!("Failed to start manager at pipe {pipe_name:?}"))?; info!("Manager listening using windows pipe @ {:?}", pipe_name); - Ok(boxed_ref) + Ok(server) } } }