Refactor GenericServerRef to ServerRef

unused/TracingSupport
Chip Senkbeil 12 months ago
parent 569052d0cd
commit 3ac715eadd
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -24,6 +24,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `DistantApi` now handles batch requests in parallel, returning the results in - `DistantApi` now handles batch requests in parallel, returning the results in
order. To achieve the previous sequential processing of batch requests, the order. To achieve the previous sequential processing of batch requests, the
header value `sequence` needs to be set to true header value `sequence` needs to be set to true
- Rename `GenericServerRef` to `ServerRef` and remove `ServerRef` trait,
refactoring `TcpServerRef`, `UnixSocketServerRef`, and `WindowsPipeServerRef`
to use the struct instead of `Box<dyn ServerRef>`
## [0.20.0-alpha.8] ## [0.20.0-alpha.8]

@ -13,7 +13,7 @@ use distant_net::server::{Server, ServerRef};
/// Stands up an inmemory client and server using the given api. /// Stands up an inmemory client and server using the given api.
async fn setup( async fn setup(
api: impl DistantApi<LocalData = ()> + Send + Sync + 'static, api: impl DistantApi<LocalData = ()> + Send + Sync + 'static,
) -> (DistantClient, Box<dyn ServerRef>) { ) -> (DistantClient, ServerRef) {
let (t1, t2) = InmemoryTransport::pair(100); let (t1, t2) = InmemoryTransport::pair(100);
let server = Server::new() let server = Server::new()

@ -148,7 +148,7 @@ where
{ {
/// Consumes the server, starting a task to process connections from the `listener` and /// Consumes the server, starting a task to process connections from the `listener` and
/// returning a [`ServerRef`] that can be used to control the active server instance. /// returning a [`ServerRef`] that can be used to control the active server instance.
pub fn start<L>(self, listener: L) -> io::Result<Box<dyn ServerRef>> pub fn start<L>(self, listener: L) -> io::Result<ServerRef>
where where
L: Listener + 'static, L: Listener + 'static,
L::Output: Transport + 'static, L::Output: Transport + 'static,
@ -157,7 +157,7 @@ where
let (tx, rx) = broadcast::channel(1); let (tx, rx) = broadcast::channel(1);
let task = tokio::spawn(self.task(Arc::clone(&state), listener, tx.clone(), rx)); let task = tokio::spawn(self.task(Arc::clone(&state), listener, tx.clone(), rx));
Ok(Box::new(GenericServerRef { shutdown: tx, task })) Ok(ServerRef { shutdown: tx, task })
} }
/// Internal task that is run to receive connections and spawn connection tasks /// Internal task that is run to receive connections and spawn connection tasks

@ -1,94 +1,27 @@
use std::future::Future; use std::future::Future;
use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::task::{JoinError, JoinHandle}; use tokio::task::{JoinError, JoinHandle};
use crate::common::AsAny; /// Represents a reference to a server
pub struct ServerRef {
/// Interface to engage with a server instance.
pub trait ServerRef: AsAny + Send {
/// Returns true if the server is no longer running.
fn is_finished(&self) -> bool;
/// Sends a shutdown signal to the server.
fn shutdown(&self);
fn wait(self) -> Pin<Box<dyn Future<Output = io::Result<()>>>>
where
Self: Sized + 'static,
{
Box::pin(async {
let task = tokio::spawn(async move {
while !self.is_finished() {
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
task.await
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
})
}
}
impl dyn ServerRef {
/// Attempts to convert this ref into a concrete ref by downcasting
pub fn as_server_ref<R: ServerRef>(&self) -> Option<&R> {
self.as_any().downcast_ref::<R>()
}
/// Attempts to convert this mutable ref into a concrete mutable ref by downcasting
pub fn as_mut_server_ref<R: ServerRef>(&mut self) -> Option<&mut R> {
self.as_mut_any().downcast_mut::<R>()
}
/// Attempts to convert this into a concrete, boxed ref by downcasting
pub fn into_boxed_server_ref<R: ServerRef>(
self: Box<Self>,
) -> Result<Box<R>, Box<dyn std::any::Any>> {
self.into_any().downcast::<R>()
}
/// Waits for the server to complete by continuously polling the finished state.
pub async fn polling_wait(&self) -> io::Result<()> {
while !self.is_finished() {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(())
}
}
/// Represents a generic reference to a server
pub struct GenericServerRef {
pub(crate) shutdown: broadcast::Sender<()>, pub(crate) shutdown: broadcast::Sender<()>,
pub(crate) task: JoinHandle<()>, pub(crate) task: JoinHandle<()>,
} }
/// Runtime-specific implementation of [`ServerRef`] for a [`tokio::task::JoinHandle`] impl ServerRef {
impl ServerRef for GenericServerRef { pub fn is_finished(&self) -> bool {
fn is_finished(&self) -> bool {
self.task.is_finished() self.task.is_finished()
} }
fn shutdown(&self) { pub fn shutdown(&self) {
let _ = self.shutdown.send(()); let _ = self.shutdown.send(());
} }
fn wait(self) -> Pin<Box<dyn Future<Output = io::Result<()>>>>
where
Self: Sized + 'static,
{
Box::pin(async {
self.task
.await
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
})
}
} }
impl Future for GenericServerRef { impl Future for ServerRef {
type Output = Result<(), JoinError>; type Output = Result<(), JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,36 +1,58 @@
use std::future::Future;
use std::net::IpAddr; use std::net::IpAddr;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::JoinError;
use super::ServerRef; use super::ServerRef;
/// Reference to a TCP server instance /// Reference to a TCP server instance.
pub struct TcpServerRef { pub struct TcpServerRef {
pub(crate) addr: IpAddr, pub(crate) addr: IpAddr,
pub(crate) port: u16, pub(crate) port: u16,
pub(crate) inner: Box<dyn ServerRef>, pub(crate) inner: ServerRef,
} }
impl TcpServerRef { impl TcpServerRef {
pub fn new(addr: IpAddr, port: u16, inner: Box<dyn ServerRef>) -> Self { pub fn new(addr: IpAddr, port: u16, inner: ServerRef) -> Self {
Self { addr, port, inner } Self { addr, port, inner }
} }
/// Returns the IP address that the listener is bound to /// Returns the IP address that the listener is bound to.
pub fn ip_addr(&self) -> IpAddr { pub fn ip_addr(&self) -> IpAddr {
self.addr self.addr
} }
/// Returns the port that the listener is bound to /// Returns the port that the listener is bound to.
pub fn port(&self) -> u16 { pub fn port(&self) -> u16 {
self.port self.port
} }
/// Consumes ref, returning inner ref.
pub fn into_inner(self) -> ServerRef {
self.inner
}
}
impl Future for TcpServerRef {
type Output = Result<(), JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner.task).poll(cx)
}
} }
impl ServerRef for TcpServerRef { impl Deref for TcpServerRef {
fn is_finished(&self) -> bool { type Target = ServerRef;
self.inner.is_finished()
fn deref(&self) -> &Self::Target {
&self.inner
} }
}
fn shutdown(&self) { impl DerefMut for TcpServerRef {
self.inner.shutdown(); fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
} }
} }

@ -1,35 +1,52 @@
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::JoinError;
use super::ServerRef; use super::ServerRef;
/// Reference to a unix socket server instance /// Reference to a unix socket server instance.
pub struct UnixSocketServerRef { pub struct UnixSocketServerRef {
pub(crate) path: PathBuf, pub(crate) path: PathBuf,
pub(crate) inner: Box<dyn ServerRef>, pub(crate) inner: ServerRef,
} }
impl UnixSocketServerRef { impl UnixSocketServerRef {
pub fn new(path: PathBuf, inner: Box<dyn ServerRef>) -> Self { pub fn new(path: PathBuf, inner: ServerRef) -> Self {
Self { path, inner } Self { path, inner }
} }
/// Returns the path to the socket /// Returns the path to the socket.
pub fn path(&self) -> &Path { pub fn path(&self) -> &Path {
&self.path &self.path
} }
/// Consumes ref, returning inner ref /// Consumes ref, returning inner ref.
pub fn into_inner(self) -> Box<dyn ServerRef> { pub fn into_inner(self) -> ServerRef {
self.inner self.inner
} }
} }
impl ServerRef for UnixSocketServerRef { impl Future for UnixSocketServerRef {
fn is_finished(&self) -> bool { type Output = Result<(), JoinError>;
self.inner.is_finished()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner.task).poll(cx)
}
}
impl Deref for UnixSocketServerRef {
type Target = ServerRef;
fn deref(&self) -> &Self::Target {
&self.inner
} }
}
fn shutdown(&self) { impl DerefMut for UnixSocketServerRef {
self.inner.shutdown(); fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
} }
} }

@ -1,35 +1,52 @@
use std::ffi::{OsStr, OsString}; use std::ffi::{OsStr, OsString};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::task::JoinError;
use super::ServerRef; use super::ServerRef;
/// Reference to a unix socket server instance /// Reference to a windows pipe server instance.
pub struct WindowsPipeServerRef { pub struct WindowsPipeServerRef {
pub(crate) addr: OsString, pub(crate) addr: OsString,
pub(crate) inner: Box<dyn ServerRef>, pub(crate) inner: Box<dyn ServerRef>,
} }
impl WindowsPipeServerRef { impl WindowsPipeServerRef {
pub fn new(addr: OsString, inner: Box<dyn ServerRef>) -> Self { pub fn new(addr: OsString, inner: ServerRef) -> Self {
Self { addr, inner } Self { addr, inner }
} }
/// Returns the addr that the listener is bound to /// Returns the addr that the listener is bound to.
pub fn addr(&self) -> &OsStr { pub fn addr(&self) -> &OsStr {
&self.addr &self.addr
} }
/// Consumes ref, returning inner ref /// Consumes ref, returning inner ref.
pub fn into_inner(self) -> Box<dyn ServerRef> { pub fn into_inner(self) -> ServerRef {
self.inner self.inner
} }
} }
impl ServerRef for WindowsPipeServerRef { impl Future for WindowsPipeServerRef {
fn is_finished(&self) -> bool { type Output = Result<(), JoinError>;
self.inner.is_finished()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner.task).poll(cx)
}
}
impl Deref for WindowsPipeServerRef {
type Target = ServerRef;
fn deref(&self) -> &Self::Target {
&self.inner
} }
}
fn shutdown(&self) { impl DerefMut for WindowsPipeServerRef {
self.inner.shutdown(); fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
} }
} }

@ -37,7 +37,7 @@ async fn should_be_able_to_establish_a_single_connection_and_communicate_with_a_
let (t1, t2) = InmemoryTransport::pair(100); let (t1, t2) = InmemoryTransport::pair(100);
// Spawn a server on one end and connect to it on the other // Spawn a server on one end and connect to it on the other
let _ = Server::new() let _server = Server::new()
.handler(TestServerHandler) .handler(TestServerHandler)
.verifier(Verifier::none()) .verifier(Verifier::none())
.start(OneshotListener::from_value(t2))?; .start(OneshotListener::from_value(t2))?;

@ -30,7 +30,7 @@ impl ServerHandler for TestServerHandler {
async fn should_be_able_to_send_and_receive_typed_payloads_between_client_and_server() { async fn should_be_able_to_send_and_receive_typed_payloads_between_client_and_server() {
let (t1, t2) = InmemoryTransport::pair(100); let (t1, t2) = InmemoryTransport::pair(100);
let _ = Server::new() let _server = Server::new()
.handler(TestServerHandler) .handler(TestServerHandler)
.verifier(Verifier::none()) .verifier(Verifier::none())
.start(OneshotListener::from_value(t2)) .start(OneshotListener::from_value(t2))

@ -30,7 +30,7 @@ impl ServerHandler for TestServerHandler {
async fn should_be_able_to_send_and_receive_untyped_payloads_between_client_and_server() { async fn should_be_able_to_send_and_receive_untyped_payloads_between_client_and_server() {
let (t1, t2) = InmemoryTransport::pair(100); let (t1, t2) = InmemoryTransport::pair(100);
let _ = Server::new() let _server = Server::new()
.handler(TestServerHandler) .handler(TestServerHandler)
.verifier(Verifier::none()) .verifier(Verifier::none())
.start(OneshotListener::from_value(t2)) .start(OneshotListener::from_value(t2))

@ -722,7 +722,7 @@ impl Ssh {
} }
/// Consumes [`Ssh`] and produces a [`DistantClient`] and [`ServerRef`] pair. /// Consumes [`Ssh`] and produces a [`DistantClient`] and [`ServerRef`] pair.
pub async fn into_distant_pair(self) -> io::Result<(DistantClient, Box<dyn ServerRef>)> { pub async fn into_distant_pair(self) -> io::Result<(DistantClient, ServerRef)> {
// Exit early if not authenticated as this is a requirement // Exit early if not authenticated as this is a requirement
if !self.authenticated { if !self.authenticated {
return Err(io::Error::new( return Err(io::Error::new(

@ -185,7 +185,7 @@ async fn async_run(cmd: ManagerSubcommand) -> CliResult {
"global".to_string() "global".to_string()
} }
); );
let manager_ref = Manager { let manager = Manager {
access, access,
config: NetManagerConfig { config: NetManagerConfig {
user, user,
@ -223,11 +223,7 @@ async fn async_run(cmd: ManagerSubcommand) -> CliResult {
.context("Failed to start manager")?; .context("Failed to start manager")?;
// Let our server run to completion // Let our server run to completion
manager_ref manager.await.context("Failed to wait on manager")?;
.as_ref()
.polling_wait()
.await
.context("Failed to wait on manager")?;
info!("Manager is shutting down"); info!("Manager is shutting down");
Ok(()) Ok(())

@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
use anyhow::Context; use anyhow::Context;
use distant_core::net::auth::Verifier; use distant_core::net::auth::Verifier;
use distant_core::net::common::{Host, SecretKey32}; use distant_core::net::common::{Host, SecretKey32};
use distant_core::net::server::{Server, ServerConfig as NetServerConfig, ServerRef}; use distant_core::net::server::{Server, ServerConfig as NetServerConfig};
use distant_core::DistantSingleKeyCredentials; use distant_core::DistantSingleKeyCredentials;
use distant_local::{Config as LocalConfig, WatchConfig as LocalWatchConfig}; use distant_local::{Config as LocalConfig, WatchConfig as LocalWatchConfig};
use log::*; use log::*;
@ -212,7 +212,7 @@ async fn async_run(cmd: ServerSubcommand, _is_forked: bool) -> CliResult {
} }
// Let our server run to completion // Let our server run to completion
server.wait().await.context("Failed to wait on server")?; server.await.context("Failed to wait on server")?;
info!("Server is shutting down"); info!("Server is shutting down");
} }
} }

@ -15,7 +15,7 @@ pub struct Manager {
impl Manager { impl Manager {
/// Begin listening on the network interface specified within [`NetworkConfig`] /// Begin listening on the network interface specified within [`NetworkConfig`]
pub async fn listen(self) -> anyhow::Result<Box<dyn ServerRef>> { pub async fn listen(self) -> anyhow::Result<ServerRef> {
let user = self.config.user; let user = self.config.user;
#[cfg(unix)] #[cfg(unix)]
@ -36,7 +36,7 @@ impl Manager {
.with_context(|| format!("Failed to create socket directory {parent:?}"))?; .with_context(|| format!("Failed to create socket directory {parent:?}"))?;
} }
let boxed_ref = ManagerServer::new(self.config) let server = ManagerServer::new(self.config)
.verifier(Verifier::none()) .verifier(Verifier::none())
.start( .start(
UnixSocketListener::bind_with_permissions(socket_path, self.access.into_mode()) UnixSocketListener::bind_with_permissions(socket_path, self.access.into_mode())
@ -45,7 +45,7 @@ impl Manager {
.with_context(|| format!("Failed to start manager at socket {socket_path:?}"))?; .with_context(|| format!("Failed to start manager at socket {socket_path:?}"))?;
info!("Manager listening using unix socket @ {:?}", socket_path); info!("Manager listening using unix socket @ {:?}", socket_path);
Ok(boxed_ref) Ok(server)
} }
#[cfg(windows)] #[cfg(windows)]
@ -57,13 +57,13 @@ impl Manager {
global_paths::WINDOWS_PIPE_NAME.as_str() global_paths::WINDOWS_PIPE_NAME.as_str()
}); });
let boxed_ref = ManagerServer::new(self.config) let server = ManagerServer::new(self.config)
.verifier(Verifier::none()) .verifier(Verifier::none())
.start(WindowsPipeListener::bind_local(pipe_name)?) .start(WindowsPipeListener::bind_local(pipe_name)?)
.with_context(|| format!("Failed to start manager at pipe {pipe_name:?}"))?; .with_context(|| format!("Failed to start manager at pipe {pipe_name:?}"))?;
info!("Manager listening using windows pipe @ {:?}", pipe_name); info!("Manager listening using windows pipe @ {:?}", pipe_name);
Ok(boxed_ref) Ok(server)
} }
} }
} }

Loading…
Cancel
Save