From d67002421d83734f1c0856b0902db2184e73ac13 Mon Sep 17 00:00:00 2001
From: Chip Senkbeil
Date: Sun, 29 Oct 2023 14:14:40 -0500
Subject: [PATCH] Splitting out to broken individual crates

---
 Cargo.lock | 3 +
 distant-core-client/Cargo.toml | 1 +
 .../src}/builder.rs | 0
 .../src}/builder/tcp.rs | 0
 .../src}/builder/unix.rs | 0
 .../src}/builder/windows.rs | 0
 .../src}/channel.rs | 0
 .../src}/channel/mailbox.rs | 0
 .../src}/config.rs | 0
 distant-core-client/src/lib.rs | 1331 +++++++++++++++++
 .../src}/reconnect.rs | 0
 .../src}/shutdown.rs | 0
 distant-core-manager/Cargo.toml | 1 +
 .../src}/client.rs | 0
 .../src}/client/channel.rs | 0
 .../src}/data.rs | 0
 .../src}/data/info.rs | 0
 .../src}/data/list.rs | 0
 .../src}/data/request.rs | 0
 .../src}/data/response.rs | 0
 distant-core-manager/src/lib.rs | 16 +
 .../src}/server.rs | 0
 .../src}/server/authentication.rs | 0
 .../src}/server/config.rs | 0
 .../src}/server/connection.rs | 0
 .../src}/server/handler.rs | 0
 .../tests/manager_tests.rs | 0
 distant-core-net/Cargo.toml | 4 +-
 distant-core-net/src/{common => }/any.rs | 10 +-
 distant-core-net/src/authentication.rs | 2 +-
 distant-core-net/src/client.rs | 1331 -----------------
 distant-core-net/src/common.rs | 21 -
 .../src/{common => }/connection.rs | 10 +-
 distant-core-net/src/{common => }/key.rs | 0
 distant-core-net/src/{common => }/keychain.rs | 2 +-
 distant-core-net/src/lib.rs | 26 +-
 distant-core-net/src/{common => }/listener.rs | 0
 .../src/{common => }/listener/mapped.rs | 0
 .../src/{common => }/listener/mpsc.rs | 0
 .../src/{common => }/listener/oneshot.rs | 0
 .../src/{common => }/listener/tcp.rs | 2 +-
 .../src/{common => }/listener/unix.rs | 2 +-
 .../src/{common => }/listener/windows.rs | 2 +-
 distant-core-net/src/manager.rs | 16 -
 distant-core-net/src/{common => }/packet.rs | 0
 .../src/{common => }/packet/header.rs | 4 +-
 .../src/{common => }/packet/request.rs | 3 +-
 .../src/{common => }/packet/response.rs | 3 +-
 .../src/{common => }/packet/value.rs | 2 +-
 distant-core-net/src/{common => }/port.rs | 0
 distant-core-net/src/server.rs | 473 ------
 .../src/{common => }/transport.rs | 0
 .../src/{common => }/transport/framed.rs | 2 +-
 .../{common => }/transport/framed/backup.rs | 0
 .../{common => }/transport/framed/codec.rs | 0
 .../transport/framed/codec/chain.rs | 0
 .../transport/framed/codec/compression.rs | 0
 .../transport/framed/codec/encryption.rs | 2 +-
 .../transport/framed/codec/plain.rs | 0
 .../transport/framed/codec/predicate.rs | 0
 .../{common => }/transport/framed/exchange.rs | 2 +-
 .../transport/framed/exchange/pkb.rs | 0
 .../transport/framed/exchange/salt.rs | 0
 .../{common => }/transport/framed/frame.rs | 0
 .../transport/framed/handshake.rs | 0
 .../src/{common => }/transport/inmemory.rs | 2 +-
 .../src/{common => }/transport/tcp.rs | 2 +-
 .../src/{common => }/transport/test.rs | 0
 .../src/{common => }/transport/unix.rs | 2 +-
 .../src/{common => }/transport/windows.rs | 2 +-
 .../{common => }/transport/windows/pipe.rs | 0
 distant-core-net/src/{common => }/utils.rs | 0
 distant-core-net/src/{common => }/version.rs | 0
 distant-core-server/Cargo.toml | 1 +
 .../src}/builder.rs | 0
 .../src}/builder/tcp.rs | 0
 .../src}/builder/unix.rs | 0
 .../src}/builder/windows.rs | 0
 .../src}/config.rs | 0
 .../src}/connection.rs | 0
 .../src}/context.rs | 0
 distant-core-server/src/lib.rs | 473 ++++++
 .../server => distant-core-server/src}/ref.rs | 0
 .../src}/ref/tcp.rs | 0
 .../src}/ref/unix.rs | 0
 .../src}/ref/windows.rs | 0
 .../src}/reply.rs | 0
 .../src}/shutdown_timer.rs | 0
 .../src}/state.rs | 0
 distant-plugin/src/client.rs | 21 +-
 distant-plugin/src/handlers.rs | 50 +-
 91 files changed, 1939 insertions(+), 1885 deletions(-)
 rename {distant-core-net/src/client => distant-core-client/src}/builder.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/builder/tcp.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/builder/unix.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/builder/windows.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/channel.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/channel/mailbox.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/config.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/reconnect.rs (100%)
 rename {distant-core-net/src/client => distant-core-client/src}/shutdown.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/client.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/client/channel.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/data.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/data/info.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/data/list.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/data/request.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/data/response.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/server.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/server/authentication.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/server/config.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/server/connection.rs (100%)
 rename {distant-core-net/src/manager => distant-core-manager/src}/server/handler.rs (100%)
 rename {distant-core-net => distant-core-manager}/tests/manager_tests.rs (100%)
 rename distant-core-net/src/{common => }/any.rs (77%)
 delete mode 100644 distant-core-net/src/client.rs
 delete mode 100644 distant-core-net/src/common.rs
 rename distant-core-net/src/{common => }/connection.rs (99%)
 rename distant-core-net/src/{common => }/key.rs (100%)
 rename distant-core-net/src/{common => }/keychain.rs (99%)
 rename distant-core-net/src/{common => }/listener.rs (100%)
 rename distant-core-net/src/{common => }/listener/mapped.rs (100%)
 rename distant-core-net/src/{common => }/listener/mpsc.rs (100%)
 rename distant-core-net/src/{common => }/listener/oneshot.rs (100%)
 rename distant-core-net/src/{common => }/listener/tcp.rs (99%)
 rename distant-core-net/src/{common => }/listener/unix.rs (99%)
 rename distant-core-net/src/{common => }/listener/windows.rs (99%)
 delete mode 100644 distant-core-net/src/manager.rs
 rename distant-core-net/src/{common => }/packet.rs (100%)
 rename distant-core-net/src/{common => }/packet/header.rs (96%)
 rename distant-core-net/src/{common => }/packet/request.rs (99%)
 rename distant-core-net/src/{common => }/packet/response.rs (99%)
 rename distant-core-net/src/{common => }/packet/value.rs (98%)
 rename distant-core-net/src/{common => }/port.rs (100%)
 delete mode 100644 distant-core-net/src/server.rs
 rename distant-core-net/src/{common => }/transport.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed.rs (99%)
 rename distant-core-net/src/{common => }/transport/framed/backup.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/codec.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/codec/chain.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/codec/compression.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/codec/encryption.rs (99%)
 rename distant-core-net/src/{common => }/transport/framed/codec/plain.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/codec/predicate.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/exchange.rs (98%)
 rename distant-core-net/src/{common => }/transport/framed/exchange/pkb.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/exchange/salt.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/frame.rs (100%)
 rename distant-core-net/src/{common => }/transport/framed/handshake.rs (100%)
 rename distant-core-net/src/{common => }/transport/inmemory.rs (99%)
 rename distant-core-net/src/{common => }/transport/tcp.rs (99%)
 rename distant-core-net/src/{common => }/transport/test.rs (100%)
 rename distant-core-net/src/{common => }/transport/unix.rs (99%)
 rename distant-core-net/src/{common => }/transport/windows.rs (99%)
 rename distant-core-net/src/{common => }/transport/windows/pipe.rs (100%)
 rename distant-core-net/src/{common => }/utils.rs (100%)
 rename distant-core-net/src/{common => }/version.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/builder.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/builder/tcp.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/builder/unix.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/builder/windows.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/config.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/connection.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/context.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/ref.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/ref/tcp.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/ref/unix.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/ref/windows.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/reply.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/shutdown_timer.rs (100%)
 rename {distant-core-net/src/server => distant-core-server/src}/state.rs (100%)

diff --git a/Cargo.lock b/Cargo.lock
index 948adc7..8a3e98f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -866,6 +866,7 @@ version = "0.21.0"
 dependencies = [
  "async-trait",
  "derive_more",
+ "distant-core-net",
  "env_logger",
  "log",
  "serde",
@@ -879,6 +880,7 @@ version = "0.21.0"
 dependencies = [
  "async-trait",
  "derive_more",
+ "distant-core-net",
  "env_logger",
  "log",
  "serde",
@@ -941,6 +943,7 @@ version = "0.21.0"
 dependencies = [
  "async-trait",
  "derive_more",
+ "distant-core-net",
  "env_logger",
  "log",
  "serde",
diff --git a/distant-core-client/Cargo.toml b/distant-core-client/Cargo.toml
index 9914802..d303533 100644
--- a/distant-core-client/Cargo.toml
+++ b/distant-core-client/Cargo.toml
@@ -14,6 +14,7 @@ license = "MIT OR Apache-2.0"
 
 [dependencies]
 async-trait = "0.1.68"
 derive_more = { version = "0.99.17", default-features = false, features = ["display", "from", "error"] }
+distant-core-net = { version = "=0.21.0", path = "../distant-core-net" }
 log = "0.4.18"
 serde = { version = "1.0.163", features =
["derive"] } diff --git a/distant-core-net/src/client/builder.rs b/distant-core-client/src/builder.rs similarity index 100% rename from distant-core-net/src/client/builder.rs rename to distant-core-client/src/builder.rs diff --git a/distant-core-net/src/client/builder/tcp.rs b/distant-core-client/src/builder/tcp.rs similarity index 100% rename from distant-core-net/src/client/builder/tcp.rs rename to distant-core-client/src/builder/tcp.rs diff --git a/distant-core-net/src/client/builder/unix.rs b/distant-core-client/src/builder/unix.rs similarity index 100% rename from distant-core-net/src/client/builder/unix.rs rename to distant-core-client/src/builder/unix.rs diff --git a/distant-core-net/src/client/builder/windows.rs b/distant-core-client/src/builder/windows.rs similarity index 100% rename from distant-core-net/src/client/builder/windows.rs rename to distant-core-client/src/builder/windows.rs diff --git a/distant-core-net/src/client/channel.rs b/distant-core-client/src/channel.rs similarity index 100% rename from distant-core-net/src/client/channel.rs rename to distant-core-client/src/channel.rs diff --git a/distant-core-net/src/client/channel/mailbox.rs b/distant-core-client/src/channel/mailbox.rs similarity index 100% rename from distant-core-net/src/client/channel/mailbox.rs rename to distant-core-client/src/channel/mailbox.rs diff --git a/distant-core-net/src/client/config.rs b/distant-core-client/src/config.rs similarity index 100% rename from distant-core-net/src/client/config.rs rename to distant-core-client/src/config.rs diff --git a/distant-core-client/src/lib.rs b/distant-core-client/src/lib.rs index e69de29..05350d3 100644 --- a/distant-core-client/src/lib.rs +++ b/distant-core-client/src/lib.rs @@ -0,0 +1,1331 @@ +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::{fmt, io}; + +use log::*; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::sync::{mpsc, oneshot, watch}; +use tokio::task::JoinHandle; + +use crate::common::{ + Connection, FramedTransport, HeapSecretKey, InmemoryTransport, Interest, Reconnectable, + Transport, UntypedRequest, UntypedResponse, +}; + +mod builder; +pub use builder::*; + +mod channel; +pub use channel::*; + +mod config; +pub use config::*; + +mod reconnect; +pub use reconnect::*; + +mod shutdown; +pub use shutdown::*; + +/// Time to wait inbetween connection read/write when nothing was read or written on last pass +const SLEEP_DURATION: Duration = Duration::from_millis(1); + +/// Represents a client that can be used to send requests & receive responses from a server. +/// +/// ### Note +/// +/// This variant does not validate the payload of requests or responses being sent and received. +pub struct UntypedClient { + /// Used to send requests to a server. + channel: UntypedChannel, + + /// Used to watch for changes in the connection state. + watcher: ConnectionWatcher, + + /// Used to send shutdown request to inner task. + shutdown: Box, + + /// Indicates whether the client task will be shutdown when the client is dropped. + shutdown_on_drop: bool, + + /// Contains the task that is running to send requests and receive responses from a server. 
+ task: Option>>, +} + +impl fmt::Debug for UntypedClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UntypedClient") + .field("channel", &self.channel) + .field("shutdown", &"...") + .field("task", &self.task) + .field("shutdown_on_drop", &self.shutdown_on_drop) + .finish() + } +} + +impl Drop for UntypedClient { + fn drop(&mut self) { + if self.shutdown_on_drop { + // TODO: Shutdown is an async operation, can we use it here? + if let Some(task) = self.task.take() { + debug!("Shutdown on drop = true, so aborting client task"); + task.abort(); + } + } + } +} + +impl UntypedClient { + /// Consumes the client, returning a typed variant. + pub fn into_typed_client(mut self) -> Client { + Client { + channel: self.clone_channel().into_typed_channel(), + watcher: self.watcher.clone(), + shutdown: self.shutdown.clone(), + shutdown_on_drop: self.shutdown_on_drop, + task: self.task.take(), + } + } + + /// Convert into underlying channel. + pub fn into_channel(self) -> UntypedChannel { + self.clone_channel() + } + + /// Clones the underlying channel for requests and returns the cloned instance. + pub fn clone_channel(&self) -> UntypedChannel { + self.channel.clone() + } + + /// Waits for the client to terminate, which resolves when the receiving end of the network + /// connection is closed (or the client is shutdown). Returns whether or not the client exited + /// successfully or due to an error. + pub async fn wait(mut self) -> io::Result<()> { + match self.task.take().unwrap().await { + Ok(x) => x, + Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)), + } + } + + /// Abort the client's current connection by forcing its tasks to abort. + pub fn abort(&self) { + if let Some(task) = self.task.as_ref() { + task.abort(); + } + } + + /// Clones the underlying shutdown signaler for the client. This enables you to wait on the + /// client while still having the option to shut it down from somewhere else. + pub fn clone_shutdown(&self) -> Box { + self.shutdown.clone() + } + + /// Signal for the client to shutdown its connection cleanly. + pub async fn shutdown(&self) -> io::Result<()> { + self.shutdown.shutdown().await + } + + /// Returns whether the client should fully shutdown once it is dropped. If true, this will + /// result in all channels tied to the client no longer functioning once the client is dropped. + pub fn will_shutdown_on_drop(&mut self) -> bool { + self.shutdown_on_drop + } + + /// Sets whether the client should fully shutdown once it is dropped. If true, this will result + /// in all channels tied to the client no longer functioning once the client is dropped. + pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) { + self.shutdown_on_drop = shutdown_on_drop; + } + + /// Clones the underlying [`ConnectionStateWatcher`] for the client. + pub fn clone_connection_watcher(&self) -> ConnectionWatcher { + self.watcher.clone() + } + + /// Spawns a new task that continually monitors for connection changes and invokes the function + /// `f` whenever a new change is detected. + pub fn on_connection_change(&self, f: F) -> JoinHandle<()> + where + F: FnMut(ConnectionState) + Send + 'static, + { + self.watcher.on_change(f) + } + + /// Returns true if client's underlying event processing has finished/terminated. + pub fn is_finished(&self) -> bool { + self.task.is_none() || self.task.as_ref().unwrap().is_finished() + } + + /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a + /// specific [`ReconnectStrategy`]. 
+ /// + /// ### Note + /// + /// This will NOT perform any handshakes or authentication procedures nor will it replay any + /// missing frames. This is to be used when establishing a [`Client`] to be run internally + /// within a program. + pub fn spawn_inmemory( + transport: FramedTransport, + config: ClientConfig, + ) -> Self { + let connection = Connection::Client { + id: rand::random(), + reauth_otp: HeapSecretKey::generate(32).unwrap(), + transport, + }; + Self::spawn(connection, config) + } + + /// Spawns a client using the provided [`Connection`]. + pub(crate) fn spawn(mut connection: Connection, config: ClientConfig) -> Self + where + V: Transport + 'static, + { + let post_office = Arc::new(PostOffice::default()); + let weak_post_office = Arc::downgrade(&post_office); + let (tx, mut rx) = mpsc::channel::>(1); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>>(1); + + // Ensure that our transport starts off clean (nothing in buffers or backup) + connection.clear(); + + let ClientConfig { + mut reconnect_strategy, + shutdown_on_drop, + silence_duration, + } = config; + + // Start a task that continually checks for responses and delivers them using the + // post office + let shutdown_tx_2 = shutdown_tx.clone(); + let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected); + let task = tokio::spawn(async move { + let mut needs_reconnect = false; + let mut last_read_frame_time = Instant::now(); + + // NOTE: We hold onto a copy of the shutdown sender, even though we will never use it, + // to prevent the channel from being closed. This is because we do a check to + // see if we get a shutdown signal or ready state, and closing the channel + // would cause recv() to resolve immediately and result in the task shutting + // down. + let _shutdown_tx = shutdown_tx_2; + + loop { + // If we have flagged that a reconnect is needed, attempt to do so + if needs_reconnect { + info!("Client encountered issue, attempting to reconnect"); + debug!("Using strategy {reconnect_strategy:?}"); + match reconnect_strategy.reconnect(&mut connection).await { + Ok(()) => { + info!("Client successfully reconnected!"); + needs_reconnect = false; + last_read_frame_time = Instant::now(); + watcher_tx.send_replace(ConnectionState::Connected); + } + Err(x) => { + error!("Unable to re-establish connection: {x}"); + watcher_tx.send_replace(ConnectionState::Disconnected); + break Err(x); + } + } + } + + macro_rules! silence_needs_reconnect { + () => {{ + info!( + "Client exceeded {}s without server activity, so attempting to reconnect", + silence_duration.as_secs_f32(), + ); + needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); + continue; + }}; + } + + let silence_time_remaining = silence_duration + .checked_sub(last_read_frame_time.elapsed()) + .unwrap_or_default(); + + // NOTE: sleep will not trigger if duration is zero/nanosecond scale, so we + // instead will do an early check here in the case that we need to reconnect + // prior to a sleep while polling the ready status + if silence_time_remaining.as_millis() == 0 { + silence_needs_reconnect!(); + } + + let ready = tokio::select! { + // NOTE: This should NEVER return None as we never allow the channel to close. 
+ cb = shutdown_rx.recv() => { + info!("Client got shutdown signal, so exiting event loop"); + let cb = cb.expect("Impossible: shutdown channel closed!"); + let _ = cb.send(Ok(())); + watcher_tx.send_replace(ConnectionState::Disconnected); + break Ok(()); + } + _ = tokio::time::sleep(silence_time_remaining) => { + silence_needs_reconnect!(); + } + result = connection.ready(Interest::READABLE | Interest::WRITABLE) => { + match result { + Ok(result) => result, + Err(x) => { + error!("Failed to examine ready state: {x}"); + needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); + continue; + } + } + } + }; + + let mut read_blocked = !ready.is_readable(); + let mut write_blocked = !ready.is_writable(); + + if ready.is_readable() { + match connection.try_read_frame() { + // If we get an empty frame, we consider this a heartbeat and want + // to adjust our frame read time and discard it from our backup + Ok(Some(frame)) if frame.is_empty() => { + trace!("Client received heartbeat"); + last_read_frame_time = Instant::now(); + } + + // Otherwise, we attempt to parse a frame as a response + Ok(Some(frame)) => { + last_read_frame_time = Instant::now(); + match UntypedResponse::from_slice(frame.as_item()) { + Ok(response) => { + if log_enabled!(Level::Trace) { + trace!( + "Client receiving (id:{} | origin: {}): {}", + response.id, + response.origin_id, + String::from_utf8_lossy(&response.payload).to_string() + ); + } + + // For trace-level logging, we need to clone the id and + // origin id before passing the response ownership to + // be delivered elsewhere + let (id, origin_id) = if log_enabled!(Level::Trace) { + (response.id.to_string(), response.origin_id.to_string()) + } else { + (String::new(), String::new()) + }; + + // Try to send response to appropriate mailbox + // TODO: This will block if full... is that a problem? + if post_office + .deliver_untyped_response(response.into_owned()) + .await + { + trace!("Client delivered response {id} to {origin_id}"); + } else { + trace!("Client dropped response {id} to {origin_id}"); + } + } + Err(x) => { + error!("Invalid response: {x}"); + } + } + } + + Ok(None) => { + info!("Connection closed"); + needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); + continue; + } + Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true, + Err(x) => { + error!("Failed to read next frame: {x}"); + needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); + continue; + } + } + } + + if ready.is_writable() { + // If we get more data to write, attempt to write it, which will result in + // writing any queued bytes as well. Othewise, we attempt to flush any pending + // outgoing bytes that weren't sent earlier. + if let Ok(request) = rx.try_recv() { + if log_enabled!(Level::Trace) { + trace!( + "Client sending {}", + String::from_utf8_lossy(&request.to_bytes()).to_string() + ); + } + match connection.try_write_frame(request.to_bytes()) { + Ok(()) => (), + Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true, + Err(x) => { + error!("Failed to write frame: {x}"); + needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); + continue; + } + } + } else { + // In the case of flushing, there are two scenarios in which we want to + // mark no write occurring: + // + // 1. When flush did not write any bytes, which can happen when the buffer + // is empty + // 2. 
When the call to write bytes blocks + match connection.try_flush() { + Ok(0) => write_blocked = true, + Ok(_) => (), + Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true, + Err(x) => { + error!("Failed to flush outgoing data: {x}"); + needs_reconnect = true; + watcher_tx.send_replace(ConnectionState::Reconnecting); + continue; + } + } + } + } + + // If we did not read or write anything, sleep a bit to offload CPU usage + if read_blocked && write_blocked { + tokio::time::sleep(SLEEP_DURATION).await; + } + } + }); + + let channel = UntypedChannel { + tx, + post_office: weak_post_office, + }; + + Self { + channel, + watcher: ConnectionWatcher(watcher_rx), + shutdown: Box::new(shutdown_tx), + shutdown_on_drop, + task: Some(task), + } + } +} + +impl Deref for UntypedClient { + type Target = UntypedChannel; + + fn deref(&self) -> &Self::Target { + &self.channel + } +} + +impl DerefMut for UntypedClient { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.channel + } +} + +impl From for UntypedChannel { + fn from(client: UntypedClient) -> Self { + client.into_channel() + } +} + +/// Represents a client that can be used to send requests & receive responses from a server. +pub struct Client { + /// Used to send requests to a server. + channel: Channel, + + /// Used to watch for changes in the connection state. + watcher: ConnectionWatcher, + + /// Used to send shutdown request to inner task. + shutdown: Box, + + /// Indicates whether the client task will be shutdown when the client is dropped. + shutdown_on_drop: bool, + + /// Contains the task that is running to send requests and receive responses from a server. + task: Option>>, +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Client") + .field("channel", &self.channel) + .field("shutdown", &"...") + .field("task", &self.task) + .field("shutdown_on_drop", &self.shutdown_on_drop) + .finish() + } +} + +impl Drop for Client { + fn drop(&mut self) { + if self.shutdown_on_drop { + // TODO: Shutdown is an async operation, can we use it here? + if let Some(task) = self.task.take() { + debug!("Shutdown on drop = true, so aborting client task"); + task.abort(); + } + } + } +} + +impl Client +where + T: Send + Sync + Serialize + 'static, + U: Send + Sync + DeserializeOwned + 'static, +{ + /// Consumes the client, returning an untyped variant. + pub fn into_untyped_client(mut self) -> UntypedClient { + UntypedClient { + channel: self.clone_channel().into_untyped_channel(), + watcher: self.watcher.clone(), + shutdown: self.shutdown.clone(), + shutdown_on_drop: self.shutdown_on_drop, + task: self.task.take(), + } + } + + /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a + /// specific [`ReconnectStrategy`]. + /// + /// ### Note + /// + /// This will NOT perform any handshakes or authentication procedures nor will it replay any + /// missing frames. This is to be used when establishing a [`Client`] to be run internally + /// within a program. + pub fn spawn_inmemory( + transport: FramedTransport, + config: ClientConfig, + ) -> Self { + UntypedClient::spawn_inmemory(transport, config).into_typed_client() + } +} + +impl Client<(), ()> { + /// Creates a new [`ClientBuilder`]. + pub fn build() -> ClientBuilder<(), ()> { + ClientBuilder::new() + } + + /// Creates a new [`ClientBuilder`] configured to use a [`TcpConnector`]. 
+ pub fn tcp(connector: impl Into>) -> ClientBuilder<(), TcpConnector> { + ClientBuilder::new().connector(connector.into()) + } + + /// Creates a new [`ClientBuilder`] configured to use a [`UnixSocketConnector`]. + #[cfg(unix)] + pub fn unix_socket( + connector: impl Into, + ) -> ClientBuilder<(), UnixSocketConnector> { + ClientBuilder::new().connector(connector.into()) + } + + /// Creates a new [`ClientBuilder`] configured to use a local [`WindowsPipeConnector`]. + #[cfg(windows)] + pub fn local_windows_pipe( + connector: impl Into, + ) -> ClientBuilder<(), WindowsPipeConnector> { + let mut connector = connector.into(); + connector.local = true; + ClientBuilder::new().connector(connector) + } + + /// Creates a new [`ClientBuilder`] configured to use a [`WindowsPipeConnector`]. + #[cfg(windows)] + pub fn windows_pipe( + connector: impl Into, + ) -> ClientBuilder<(), WindowsPipeConnector> { + ClientBuilder::new().connector(connector.into()) + } +} + +impl Client { + /// Convert into underlying channel. + pub fn into_channel(self) -> Channel { + self.clone_channel() + } + + /// Clones the underlying channel for requests and returns the cloned instance. + pub fn clone_channel(&self) -> Channel { + self.channel.clone() + } + + /// Waits for the client to terminate, which resolves when the receiving end of the network + /// connection is closed (or the client is shutdown). Returns whether or not the client exited + /// successfully or due to an error. + pub async fn wait(mut self) -> io::Result<()> { + match self.task.take().unwrap().await { + Ok(x) => x, + Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)), + } + } + + /// Abort the client's current connection by forcing its tasks to abort. + pub fn abort(&self) { + if let Some(task) = self.task.as_ref() { + task.abort(); + } + } + + /// Clones the underlying shutdown signaler for the client. This enables you to wait on the + /// client while still having the option to shut it down from somewhere else. + pub fn clone_shutdown(&self) -> Box { + self.shutdown.clone() + } + + /// Signal for the client to shutdown its connection cleanly. + pub async fn shutdown(&self) -> io::Result<()> { + self.shutdown.shutdown().await + } + + /// Returns whether the client should fully shutdown once it is dropped. If true, this will + /// result in all channels tied to the client no longer functioning once the client is dropped. + pub fn will_shutdown_on_drop(&mut self) -> bool { + self.shutdown_on_drop + } + + /// Sets whether the client should fully shutdown once it is dropped. If true, this will result + /// in all channels tied to the client no longer functioning once the client is dropped. + pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) { + self.shutdown_on_drop = shutdown_on_drop; + } + + /// Clones the underlying [`ConnectionStateWatcher`] for the client. + pub fn clone_connection_watcher(&self) -> ConnectionWatcher { + self.watcher.clone() + } + + /// Spawns a new task that continually monitors for connection changes and invokes the function + /// `f` whenever a new change is detected. + pub fn on_connection_change(&self, f: F) -> JoinHandle<()> + where + F: FnMut(ConnectionState) + Send + 'static, + { + self.watcher.on_change(f) + } + + /// Returns true if client's underlying event processing has finished/terminated. 
+ pub fn is_finished(&self) -> bool { + self.task.is_none() || self.task.as_ref().unwrap().is_finished() + } +} + +impl Deref for Client { + type Target = Channel; + + fn deref(&self) -> &Self::Target { + &self.channel + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.channel + } +} + +impl From> for Channel { + fn from(client: Client) -> Self { + client.clone_channel() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::ClientConfig; + use crate::common::{Ready, Request, Response, TestTransport}; + + mod typed { + use test_log::test; + + use super::*; + type TestClient = Client; + + fn spawn_test_client( + connection: Connection, + reconnect_strategy: ReconnectStrategy, + ) -> TestClient + where + T: Transport + 'static, + { + UntypedClient::spawn( + connection, + ClientConfig { + reconnect_strategy, + ..Default::default() + }, + ) + .into_typed_client() + } + + /// Creates a new test transport whose operations do not panic, but do nothing. + #[inline] + fn new_test_transport() -> TestTransport { + TestTransport { + f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), + f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), + f_ready: Box::new(|_| Ok(Ready::EMPTY)), + f_reconnect: Box::new(|| Ok(())), + } + } + + #[test(tokio::test)] + async fn should_write_queued_requests_as_outgoing_frames() { + let (client, mut server) = Connection::pair(100); + + let mut client = spawn_test_client(client, ReconnectStrategy::Fail); + client.fire(Request::new(1u8)).await.unwrap(); + client.fire(Request::new(2u8)).await.unwrap(); + client.fire(Request::new(3u8)).await.unwrap(); + + assert_eq!( + server + .read_frame_as::>() + .await + .unwrap() + .unwrap() + .payload, + 1 + ); + assert_eq!( + server + .read_frame_as::>() + .await + .unwrap() + .unwrap() + .payload, + 2 + ); + assert_eq!( + server + .read_frame_as::>() + .await + .unwrap() + .unwrap() + .payload, + 3 + ); + } + + #[test(tokio::test)] + async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() { + let (client, mut server) = Connection::pair(100); + + // NOTE: Spawn a separate task to handle the response so we do not deadlock + tokio::spawn(async move { + let request = server + .read_frame_as::>() + .await + .unwrap() + .unwrap(); + server + .write_frame_for(&Response::new(request.id, 2u8)) + .await + .unwrap(); + }); + + let mut client = spawn_test_client(client, ReconnectStrategy::Fail); + assert_eq!(client.send(Request::new(1u8)).await.unwrap().payload, 2); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + spawn_test_client( + Connection::test_client({ + let mut transport = new_test_transport(); + + transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into())); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_closed_by_server() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + spawn_test_client( + Connection::test_client({ + let mut transport = 
new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); + + // Report that no bytes were written, indicting the channel was closed + transport.f_try_read = Box::new(|_| Ok(0)); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + spawn_test_client( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); + + // Fail the read + transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into())); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + let mut client = spawn_test_client( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); + + // Fail the write + transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into())); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ); + + // Queue up a request to fail to send + client + .fire(Request::new(123u8)) + .await + .expect("Failed to queue request"); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + let mut client = spawn_test_client( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); + + // Succeed partially with initial try_write, block on second call, and then + // fail during a try_flush + transport.f_try_write = Box::new(|buf| unsafe { + static mut CNT: u8 = 0; + CNT += 1; + if CNT == 1 { + Ok(buf.len() / 2) + } else if CNT == 2 { + Err(io::ErrorKind::WouldBlock.into()) + } else { + Err(io::ErrorKind::Other.into()) + } + }); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + 
ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ); + + // Queue up a request to fail to send + client + .fire(Request::new(123u8)) + .await + .expect("Failed to queue request"); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_exit_if_reconnect_strategy_has_failed_to_connect() { + let (client, server) = Connection::pair(100); + + // Spawn the client, verify the task is running, kill our server, and verify that the + // client does not block trying to reconnect + let client = spawn_test_client(client, ReconnectStrategy::Fail); + assert!(!client.is_finished(), "Client unexpectedly died"); + drop(server); + assert_eq!( + client.wait().await.unwrap_err().kind(), + io::ErrorKind::ConnectionAborted + ); + } + + #[test(tokio::test)] + async fn should_exit_if_shutdown_signal_detected() { + let (client, _server) = Connection::pair(100); + + let client = spawn_test_client(client, ReconnectStrategy::Fail); + client.shutdown().await.unwrap(); + + // NOTE: We wait for the client's task to conclude by using `wait` to ensure we do not + // have a race condition testing the task finished state. This will also verify + // that the task exited cleanly, rather than panicking. + client.wait().await.unwrap(); + } + + #[test(tokio::test)] + async fn should_not_exit_if_shutdown_channel_is_closed() { + let (client, mut server) = Connection::pair(100); + + // NOTE: Spawn a separate task to handle the response so we do not deadlock + tokio::spawn(async move { + let request = server + .read_frame_as::>() + .await + .unwrap() + .unwrap(); + server + .write_frame_for(&Response::new(request.id, 2u8)) + .await + .unwrap(); + }); + + // NOTE: We consume the client to produce a channel without maintaining the shutdown + // channel in order to ensure that dropping the client does not kill the task. + let mut channel = spawn_test_client(client, ReconnectStrategy::Fail).into_channel(); + assert_eq!(channel.send(Request::new(1u8)).await.unwrap().payload, 2); + } + } + + mod untyped { + use test_log::test; + + use super::*; + type TestClient = UntypedClient; + + /// Creates a new test transport whose operations do not panic, but do nothing. 
+ #[inline] + fn new_test_transport() -> TestTransport { + TestTransport { + f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), + f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), + f_ready: Box::new(|_| Ok(Ready::EMPTY)), + f_reconnect: Box::new(|| Ok(())), + } + } + + #[test(tokio::test)] + async fn should_write_queued_requests_as_outgoing_frames() { + let (client, mut server) = Connection::pair(100); + + let mut client = TestClient::spawn(client, Default::default()); + client + .fire(Request::new(1u8).to_untyped_request().unwrap()) + .await + .unwrap(); + client + .fire(Request::new(2u8).to_untyped_request().unwrap()) + .await + .unwrap(); + client + .fire(Request::new(3u8).to_untyped_request().unwrap()) + .await + .unwrap(); + + assert_eq!( + server + .read_frame_as::>() + .await + .unwrap() + .unwrap() + .payload, + 1 + ); + assert_eq!( + server + .read_frame_as::>() + .await + .unwrap() + .unwrap() + .payload, + 2 + ); + assert_eq!( + server + .read_frame_as::>() + .await + .unwrap() + .unwrap() + .payload, + 3 + ); + } + + #[test(tokio::test)] + async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() { + let (client, mut server) = Connection::pair(100); + + // NOTE: Spawn a separate task to handle the response so we do not deadlock + tokio::spawn(async move { + let request = server + .read_frame_as::>() + .await + .unwrap() + .unwrap(); + server + .write_frame_for(&Response::new(request.id, 2u8)) + .await + .unwrap(); + }); + + let mut client = TestClient::spawn(client, Default::default()); + assert_eq!( + client + .send(Request::new(1u8).to_untyped_request().unwrap()) + .await + .unwrap() + .to_typed_response::() + .unwrap() + .payload, + 2 + ); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + TestClient::spawn( + Connection::test_client({ + let mut transport = new_test_transport(); + + transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into())); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ClientConfig { + reconnect_strategy: ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ..Default::default() + }, + ); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_closed_by_server() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + TestClient::spawn( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); + + // Report that no bytes were written, indicting the channel was closed + transport.f_try_read = Box::new(|_| Ok(0)); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ClientConfig { + reconnect_strategy: ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ..Default::default() + }, + ); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn 
should_attempt_to_reconnect_if_connection_errors_while_reading_data() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + TestClient::spawn( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); + + // Fail the read + transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into())); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ClientConfig { + reconnect_strategy: ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ..Default::default() + }, + ); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + let mut client = TestClient::spawn( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); + + // Fail the write + transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into())); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ClientConfig { + reconnect_strategy: ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ..Default::default() + }, + ); + + // Queue up a request to fail to send + client + .fire(Request::new(123u8).to_untyped_request().unwrap()) + .await + .expect("Failed to queue request"); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() { + let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); + let mut client = TestClient::spawn( + Connection::test_client({ + let mut transport = new_test_transport(); + + // Report back that we're readable to trigger try_read + transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); + + // Succeed partially with initial try_write, block on second call, and then + // fail during a try_flush + transport.f_try_write = Box::new(|buf| unsafe { + static mut CNT: u8 = 0; + CNT += 1; + if CNT == 1 { + Ok(buf.len() / 2) + } else if CNT == 2 { + Err(io::ErrorKind::WouldBlock.into()) + } else { + Err(io::ErrorKind::Other.into()) + } + }); + + // Send a signal that the reconnect happened while marking it successful + transport.f_reconnect = Box::new(move || { + reconnect_tx.try_send(()).expect("reconnect tx blocked"); + Ok(()) + }); + + transport + }), + ClientConfig { + reconnect_strategy: ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: None, + timeout: None, + }, + ..Default::default() + }, + ); + + // Queue up a request to fail to send + client + .fire(Request::new(123u8).to_untyped_request().unwrap()) + .await + .expect("Failed to queue request"); + + reconnect_rx.recv().await.expect("Reconnect did not occur"); + } + + #[test(tokio::test)] + async fn should_exit_if_reconnect_strategy_has_failed_to_connect() { + let (client, server) = 
Connection::pair(100); + + // Spawn the client, verify the task is running, kill our server, and verify that the + // client does not block trying to reconnect + let client = TestClient::spawn(client, Default::default()); + assert!(!client.is_finished(), "Client unexpectedly died"); + drop(server); + assert_eq!( + client.wait().await.unwrap_err().kind(), + io::ErrorKind::ConnectionAborted + ); + } + + #[test(tokio::test)] + async fn should_exit_if_shutdown_signal_detected() { + let (client, _server) = Connection::pair(100); + + let client = TestClient::spawn(client, Default::default()); + client.shutdown().await.unwrap(); + + // NOTE: We wait for the client's task to conclude by using `wait` to ensure we do not + // have a race condition testing the task finished state. This will also verify + // that the task exited cleanly, rather than panicking. + client.wait().await.unwrap(); + } + + #[test(tokio::test)] + async fn should_not_exit_if_shutdown_channel_is_closed() { + let (client, mut server) = Connection::pair(100); + + // NOTE: Spawn a separate task to handle the response so we do not deadlock + tokio::spawn(async move { + let request = server + .read_frame_as::>() + .await + .unwrap() + .unwrap(); + server + .write_frame_for(&Response::new(request.id, 2u8)) + .await + .unwrap(); + }); + + // NOTE: We consume the client to produce a channel without maintaining the shutdown + // channel in order to ensure that dropping the client does not kill the task. + let mut channel = TestClient::spawn(client, Default::default()).into_channel(); + assert_eq!( + channel + .send(Request::new(1u8).to_untyped_request().unwrap()) + .await + .unwrap() + .to_typed_response::() + .unwrap() + .payload, + 2 + ); + } + + #[test(tokio::test)] + async fn should_attempt_to_reconnect_if_no_activity_from_server_within_silence_duration() { + let (client, _) = Connection::pair(100); + + // NOTE: We consume the client to produce a channel without maintaining the shutdown + // channel in order to ensure that dropping the client does not kill the task. 
+ let client = TestClient::spawn( + client, + ClientConfig { + silence_duration: Duration::from_millis(100), + reconnect_strategy: ReconnectStrategy::FixedInterval { + interval: Duration::from_millis(50), + max_retries: Some(3), + timeout: None, + }, + ..Default::default() + }, + ); + + let (tx, mut rx) = mpsc::unbounded_channel(); + client.on_connection_change(move |state| tx.send(state).unwrap()); + assert_eq!(rx.recv().await, Some(ConnectionState::Reconnecting)); + assert_eq!(rx.recv().await, Some(ConnectionState::Disconnected)); + assert_eq!(rx.recv().await, None); + } + } +} diff --git a/distant-core-net/src/client/reconnect.rs b/distant-core-client/src/reconnect.rs similarity index 100% rename from distant-core-net/src/client/reconnect.rs rename to distant-core-client/src/reconnect.rs diff --git a/distant-core-net/src/client/shutdown.rs b/distant-core-client/src/shutdown.rs similarity index 100% rename from distant-core-net/src/client/shutdown.rs rename to distant-core-client/src/shutdown.rs diff --git a/distant-core-manager/Cargo.toml b/distant-core-manager/Cargo.toml index e2fef18..f6d2d0b 100644 --- a/distant-core-manager/Cargo.toml +++ b/distant-core-manager/Cargo.toml @@ -14,6 +14,7 @@ license = "MIT OR Apache-2.0" [dependencies] async-trait = "0.1.68" derive_more = { version = "0.99.17", default-features = false, features = ["display", "from", "error"] } +distant-core-net = { version = "=0.21.0", path = "../distant-core-net" } log = "0.4.18" serde = { version = "1.0.163", features = ["derive"] } diff --git a/distant-core-net/src/manager/client.rs b/distant-core-manager/src/client.rs similarity index 100% rename from distant-core-net/src/manager/client.rs rename to distant-core-manager/src/client.rs diff --git a/distant-core-net/src/manager/client/channel.rs b/distant-core-manager/src/client/channel.rs similarity index 100% rename from distant-core-net/src/manager/client/channel.rs rename to distant-core-manager/src/client/channel.rs diff --git a/distant-core-net/src/manager/data.rs b/distant-core-manager/src/data.rs similarity index 100% rename from distant-core-net/src/manager/data.rs rename to distant-core-manager/src/data.rs diff --git a/distant-core-net/src/manager/data/info.rs b/distant-core-manager/src/data/info.rs similarity index 100% rename from distant-core-net/src/manager/data/info.rs rename to distant-core-manager/src/data/info.rs diff --git a/distant-core-net/src/manager/data/list.rs b/distant-core-manager/src/data/list.rs similarity index 100% rename from distant-core-net/src/manager/data/list.rs rename to distant-core-manager/src/data/list.rs diff --git a/distant-core-net/src/manager/data/request.rs b/distant-core-manager/src/data/request.rs similarity index 100% rename from distant-core-net/src/manager/data/request.rs rename to distant-core-manager/src/data/request.rs diff --git a/distant-core-net/src/manager/data/response.rs b/distant-core-manager/src/data/response.rs similarity index 100% rename from distant-core-net/src/manager/data/response.rs rename to distant-core-manager/src/data/response.rs diff --git a/distant-core-manager/src/lib.rs b/distant-core-manager/src/lib.rs index e69de29..0201d71 100644 --- a/distant-core-manager/src/lib.rs +++ b/distant-core-manager/src/lib.rs @@ -0,0 +1,16 @@ +mod client; +mod data; +mod server; + +pub use client::*; +pub use data::*; +pub use server::*; + +use crate::common::Version; + +/// Represents the version associated with the manager's protocol. 
+pub const PROTOCOL_VERSION: Version = Version::new(
+    const_str::parse!(env!("CARGO_PKG_VERSION_MAJOR"), u64),
+    const_str::parse!(env!("CARGO_PKG_VERSION_MINOR"), u64),
+    const_str::parse!(env!("CARGO_PKG_VERSION_PATCH"), u64),
+);
diff --git a/distant-core-net/src/manager/server.rs b/distant-core-manager/src/server.rs
similarity index 100%
rename from distant-core-net/src/manager/server.rs
rename to distant-core-manager/src/server.rs
diff --git a/distant-core-net/src/manager/server/authentication.rs b/distant-core-manager/src/server/authentication.rs
similarity index 100%
rename from distant-core-net/src/manager/server/authentication.rs
rename to distant-core-manager/src/server/authentication.rs
diff --git a/distant-core-net/src/manager/server/config.rs b/distant-core-manager/src/server/config.rs
similarity index 100%
rename from distant-core-net/src/manager/server/config.rs
rename to distant-core-manager/src/server/config.rs
diff --git a/distant-core-net/src/manager/server/connection.rs b/distant-core-manager/src/server/connection.rs
similarity index 100%
rename from distant-core-net/src/manager/server/connection.rs
rename to distant-core-manager/src/server/connection.rs
diff --git a/distant-core-net/src/manager/server/handler.rs b/distant-core-manager/src/server/handler.rs
similarity index 100%
rename from distant-core-net/src/manager/server/handler.rs
rename to distant-core-manager/src/server/handler.rs
diff --git a/distant-core-net/tests/manager_tests.rs b/distant-core-manager/tests/manager_tests.rs
similarity index 100%
rename from distant-core-net/tests/manager_tests.rs
rename to distant-core-manager/tests/manager_tests.rs
diff --git a/distant-core-net/Cargo.toml b/distant-core-net/Cargo.toml
index 388ad83..fa8b8c5 100644
--- a/distant-core-net/Cargo.toml
+++ b/distant-core-net/Cargo.toml
@@ -1,8 +1,8 @@
 [package]
 name = "distant-core-net"
-description = "Core network library for distant, providing implementations to support client/server architecture"
+description = "Core network library for distant, providing primitives for use in network communication"
 categories = ["network-programming"]
-keywords = ["api", "async"]
+keywords = ["api", "async", "network", "primitives"]
 version = "0.21.0"
 authors = ["Chip Senkbeil "]
 edition = "2021"
diff --git a/distant-core-net/src/common/any.rs b/distant-core-net/src/any.rs
similarity index 77%
rename from distant-core-net/src/common/any.rs
rename to distant-core-net/src/any.rs
index a03bc61..c080a1f 100644
--- a/distant-core-net/src/common/any.rs
+++ b/distant-core-net/src/any.rs
@@ -1,19 +1,19 @@
 use std::any::Any;
 
-/// Trait used for casting support into the [`Any`] trait object
+/// Trait used for casting support into the [`Any`] trait object.
 pub trait AsAny: Any {
-    /// Converts reference to [`Any`]
+    /// Converts reference to [`Any`].
     fn as_any(&self) -> &dyn Any;
 
-    /// Converts mutable reference to [`Any`]
+    /// Converts mutable reference to [`Any`].
     fn as_mut_any(&mut self) -> &mut dyn Any;
 
-    /// Consumes and produces `Box<dyn Any>`
+    /// Consumes and produces `Box<dyn Any>`.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
 }
 
 /// Blanket implementation that enables any `'static` reference to convert
-/// to the [`Any`] type
+/// to the [`Any`] type.
impl AsAny for T { fn as_any(&self) -> &dyn Any { self diff --git a/distant-core-net/src/authentication.rs b/distant-core-net/src/authentication.rs index cdba7c6..9786367 100644 --- a/distant-core-net/src/authentication.rs +++ b/distant-core-net/src/authentication.rs @@ -5,7 +5,7 @@ use distant_core_auth::msg::*; use distant_core_auth::{AuthHandler, Authenticate, Authenticator}; use log::*; -use crate::common::{utils, FramedTransport, Transport}; +use crate::{utils, FramedTransport, Transport}; macro_rules! write_frame { ($transport:expr, $data:expr) => {{ diff --git a/distant-core-net/src/client.rs b/distant-core-net/src/client.rs deleted file mode 100644 index 05350d3..0000000 --- a/distant-core-net/src/client.rs +++ /dev/null @@ -1,1331 +0,0 @@ -use std::ops::{Deref, DerefMut}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use std::{fmt, io}; - -use log::*; -use serde::de::DeserializeOwned; -use serde::Serialize; -use tokio::sync::{mpsc, oneshot, watch}; -use tokio::task::JoinHandle; - -use crate::common::{ - Connection, FramedTransport, HeapSecretKey, InmemoryTransport, Interest, Reconnectable, - Transport, UntypedRequest, UntypedResponse, -}; - -mod builder; -pub use builder::*; - -mod channel; -pub use channel::*; - -mod config; -pub use config::*; - -mod reconnect; -pub use reconnect::*; - -mod shutdown; -pub use shutdown::*; - -/// Time to wait inbetween connection read/write when nothing was read or written on last pass -const SLEEP_DURATION: Duration = Duration::from_millis(1); - -/// Represents a client that can be used to send requests & receive responses from a server. -/// -/// ### Note -/// -/// This variant does not validate the payload of requests or responses being sent and received. -pub struct UntypedClient { - /// Used to send requests to a server. - channel: UntypedChannel, - - /// Used to watch for changes in the connection state. - watcher: ConnectionWatcher, - - /// Used to send shutdown request to inner task. - shutdown: Box, - - /// Indicates whether the client task will be shutdown when the client is dropped. - shutdown_on_drop: bool, - - /// Contains the task that is running to send requests and receive responses from a server. - task: Option>>, -} - -impl fmt::Debug for UntypedClient { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("UntypedClient") - .field("channel", &self.channel) - .field("shutdown", &"...") - .field("task", &self.task) - .field("shutdown_on_drop", &self.shutdown_on_drop) - .finish() - } -} - -impl Drop for UntypedClient { - fn drop(&mut self) { - if self.shutdown_on_drop { - // TODO: Shutdown is an async operation, can we use it here? - if let Some(task) = self.task.take() { - debug!("Shutdown on drop = true, so aborting client task"); - task.abort(); - } - } - } -} - -impl UntypedClient { - /// Consumes the client, returning a typed variant. - pub fn into_typed_client(mut self) -> Client { - Client { - channel: self.clone_channel().into_typed_channel(), - watcher: self.watcher.clone(), - shutdown: self.shutdown.clone(), - shutdown_on_drop: self.shutdown_on_drop, - task: self.task.take(), - } - } - - /// Convert into underlying channel. - pub fn into_channel(self) -> UntypedChannel { - self.clone_channel() - } - - /// Clones the underlying channel for requests and returns the cloned instance. 
- pub fn clone_channel(&self) -> UntypedChannel { - self.channel.clone() - } - - /// Waits for the client to terminate, which resolves when the receiving end of the network - /// connection is closed (or the client is shutdown). Returns whether or not the client exited - /// successfully or due to an error. - pub async fn wait(mut self) -> io::Result<()> { - match self.task.take().unwrap().await { - Ok(x) => x, - Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)), - } - } - - /// Abort the client's current connection by forcing its tasks to abort. - pub fn abort(&self) { - if let Some(task) = self.task.as_ref() { - task.abort(); - } - } - - /// Clones the underlying shutdown signaler for the client. This enables you to wait on the - /// client while still having the option to shut it down from somewhere else. - pub fn clone_shutdown(&self) -> Box { - self.shutdown.clone() - } - - /// Signal for the client to shutdown its connection cleanly. - pub async fn shutdown(&self) -> io::Result<()> { - self.shutdown.shutdown().await - } - - /// Returns whether the client should fully shutdown once it is dropped. If true, this will - /// result in all channels tied to the client no longer functioning once the client is dropped. - pub fn will_shutdown_on_drop(&mut self) -> bool { - self.shutdown_on_drop - } - - /// Sets whether the client should fully shutdown once it is dropped. If true, this will result - /// in all channels tied to the client no longer functioning once the client is dropped. - pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) { - self.shutdown_on_drop = shutdown_on_drop; - } - - /// Clones the underlying [`ConnectionStateWatcher`] for the client. - pub fn clone_connection_watcher(&self) -> ConnectionWatcher { - self.watcher.clone() - } - - /// Spawns a new task that continually monitors for connection changes and invokes the function - /// `f` whenever a new change is detected. - pub fn on_connection_change(&self, f: F) -> JoinHandle<()> - where - F: FnMut(ConnectionState) + Send + 'static, - { - self.watcher.on_change(f) - } - - /// Returns true if client's underlying event processing has finished/terminated. - pub fn is_finished(&self) -> bool { - self.task.is_none() || self.task.as_ref().unwrap().is_finished() - } - - /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a - /// specific [`ReconnectStrategy`]. - /// - /// ### Note - /// - /// This will NOT perform any handshakes or authentication procedures nor will it replay any - /// missing frames. This is to be used when establishing a [`Client`] to be run internally - /// within a program. - pub fn spawn_inmemory( - transport: FramedTransport, - config: ClientConfig, - ) -> Self { - let connection = Connection::Client { - id: rand::random(), - reauth_otp: HeapSecretKey::generate(32).unwrap(), - transport, - }; - Self::spawn(connection, config) - } - - /// Spawns a client using the provided [`Connection`]. 
- pub(crate) fn spawn(mut connection: Connection, config: ClientConfig) -> Self - where - V: Transport + 'static, - { - let post_office = Arc::new(PostOffice::default()); - let weak_post_office = Arc::downgrade(&post_office); - let (tx, mut rx) = mpsc::channel::>(1); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>>(1); - - // Ensure that our transport starts off clean (nothing in buffers or backup) - connection.clear(); - - let ClientConfig { - mut reconnect_strategy, - shutdown_on_drop, - silence_duration, - } = config; - - // Start a task that continually checks for responses and delivers them using the - // post office - let shutdown_tx_2 = shutdown_tx.clone(); - let (watcher_tx, watcher_rx) = watch::channel(ConnectionState::Connected); - let task = tokio::spawn(async move { - let mut needs_reconnect = false; - let mut last_read_frame_time = Instant::now(); - - // NOTE: We hold onto a copy of the shutdown sender, even though we will never use it, - // to prevent the channel from being closed. This is because we do a check to - // see if we get a shutdown signal or ready state, and closing the channel - // would cause recv() to resolve immediately and result in the task shutting - // down. - let _shutdown_tx = shutdown_tx_2; - - loop { - // If we have flagged that a reconnect is needed, attempt to do so - if needs_reconnect { - info!("Client encountered issue, attempting to reconnect"); - debug!("Using strategy {reconnect_strategy:?}"); - match reconnect_strategy.reconnect(&mut connection).await { - Ok(()) => { - info!("Client successfully reconnected!"); - needs_reconnect = false; - last_read_frame_time = Instant::now(); - watcher_tx.send_replace(ConnectionState::Connected); - } - Err(x) => { - error!("Unable to re-establish connection: {x}"); - watcher_tx.send_replace(ConnectionState::Disconnected); - break Err(x); - } - } - } - - macro_rules! silence_needs_reconnect { - () => {{ - info!( - "Client exceeded {}s without server activity, so attempting to reconnect", - silence_duration.as_secs_f32(), - ); - needs_reconnect = true; - watcher_tx.send_replace(ConnectionState::Reconnecting); - continue; - }}; - } - - let silence_time_remaining = silence_duration - .checked_sub(last_read_frame_time.elapsed()) - .unwrap_or_default(); - - // NOTE: sleep will not trigger if duration is zero/nanosecond scale, so we - // instead will do an early check here in the case that we need to reconnect - // prior to a sleep while polling the ready status - if silence_time_remaining.as_millis() == 0 { - silence_needs_reconnect!(); - } - - let ready = tokio::select! { - // NOTE: This should NEVER return None as we never allow the channel to close. 
- cb = shutdown_rx.recv() => { - info!("Client got shutdown signal, so exiting event loop"); - let cb = cb.expect("Impossible: shutdown channel closed!"); - let _ = cb.send(Ok(())); - watcher_tx.send_replace(ConnectionState::Disconnected); - break Ok(()); - } - _ = tokio::time::sleep(silence_time_remaining) => { - silence_needs_reconnect!(); - } - result = connection.ready(Interest::READABLE | Interest::WRITABLE) => { - match result { - Ok(result) => result, - Err(x) => { - error!("Failed to examine ready state: {x}"); - needs_reconnect = true; - watcher_tx.send_replace(ConnectionState::Reconnecting); - continue; - } - } - } - }; - - let mut read_blocked = !ready.is_readable(); - let mut write_blocked = !ready.is_writable(); - - if ready.is_readable() { - match connection.try_read_frame() { - // If we get an empty frame, we consider this a heartbeat and want - // to adjust our frame read time and discard it from our backup - Ok(Some(frame)) if frame.is_empty() => { - trace!("Client received heartbeat"); - last_read_frame_time = Instant::now(); - } - - // Otherwise, we attempt to parse a frame as a response - Ok(Some(frame)) => { - last_read_frame_time = Instant::now(); - match UntypedResponse::from_slice(frame.as_item()) { - Ok(response) => { - if log_enabled!(Level::Trace) { - trace!( - "Client receiving (id:{} | origin: {}): {}", - response.id, - response.origin_id, - String::from_utf8_lossy(&response.payload).to_string() - ); - } - - // For trace-level logging, we need to clone the id and - // origin id before passing the response ownership to - // be delivered elsewhere - let (id, origin_id) = if log_enabled!(Level::Trace) { - (response.id.to_string(), response.origin_id.to_string()) - } else { - (String::new(), String::new()) - }; - - // Try to send response to appropriate mailbox - // TODO: This will block if full... is that a problem? - if post_office - .deliver_untyped_response(response.into_owned()) - .await - { - trace!("Client delivered response {id} to {origin_id}"); - } else { - trace!("Client dropped response {id} to {origin_id}"); - } - } - Err(x) => { - error!("Invalid response: {x}"); - } - } - } - - Ok(None) => { - info!("Connection closed"); - needs_reconnect = true; - watcher_tx.send_replace(ConnectionState::Reconnecting); - continue; - } - Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true, - Err(x) => { - error!("Failed to read next frame: {x}"); - needs_reconnect = true; - watcher_tx.send_replace(ConnectionState::Reconnecting); - continue; - } - } - } - - if ready.is_writable() { - // If we get more data to write, attempt to write it, which will result in - // writing any queued bytes as well. Othewise, we attempt to flush any pending - // outgoing bytes that weren't sent earlier. - if let Ok(request) = rx.try_recv() { - if log_enabled!(Level::Trace) { - trace!( - "Client sending {}", - String::from_utf8_lossy(&request.to_bytes()).to_string() - ); - } - match connection.try_write_frame(request.to_bytes()) { - Ok(()) => (), - Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true, - Err(x) => { - error!("Failed to write frame: {x}"); - needs_reconnect = true; - watcher_tx.send_replace(ConnectionState::Reconnecting); - continue; - } - } - } else { - // In the case of flushing, there are two scenarios in which we want to - // mark no write occurring: - // - // 1. When flush did not write any bytes, which can happen when the buffer - // is empty - // 2. 
When the call to write bytes blocks - match connection.try_flush() { - Ok(0) => write_blocked = true, - Ok(_) => (), - Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true, - Err(x) => { - error!("Failed to flush outgoing data: {x}"); - needs_reconnect = true; - watcher_tx.send_replace(ConnectionState::Reconnecting); - continue; - } - } - } - } - - // If we did not read or write anything, sleep a bit to offload CPU usage - if read_blocked && write_blocked { - tokio::time::sleep(SLEEP_DURATION).await; - } - } - }); - - let channel = UntypedChannel { - tx, - post_office: weak_post_office, - }; - - Self { - channel, - watcher: ConnectionWatcher(watcher_rx), - shutdown: Box::new(shutdown_tx), - shutdown_on_drop, - task: Some(task), - } - } -} - -impl Deref for UntypedClient { - type Target = UntypedChannel; - - fn deref(&self) -> &Self::Target { - &self.channel - } -} - -impl DerefMut for UntypedClient { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.channel - } -} - -impl From for UntypedChannel { - fn from(client: UntypedClient) -> Self { - client.into_channel() - } -} - -/// Represents a client that can be used to send requests & receive responses from a server. -pub struct Client { - /// Used to send requests to a server. - channel: Channel, - - /// Used to watch for changes in the connection state. - watcher: ConnectionWatcher, - - /// Used to send shutdown request to inner task. - shutdown: Box, - - /// Indicates whether the client task will be shutdown when the client is dropped. - shutdown_on_drop: bool, - - /// Contains the task that is running to send requests and receive responses from a server. - task: Option>>, -} - -impl fmt::Debug for Client { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Client") - .field("channel", &self.channel) - .field("shutdown", &"...") - .field("task", &self.task) - .field("shutdown_on_drop", &self.shutdown_on_drop) - .finish() - } -} - -impl Drop for Client { - fn drop(&mut self) { - if self.shutdown_on_drop { - // TODO: Shutdown is an async operation, can we use it here? - if let Some(task) = self.task.take() { - debug!("Shutdown on drop = true, so aborting client task"); - task.abort(); - } - } - } -} - -impl Client -where - T: Send + Sync + Serialize + 'static, - U: Send + Sync + DeserializeOwned + 'static, -{ - /// Consumes the client, returning an untyped variant. - pub fn into_untyped_client(mut self) -> UntypedClient { - UntypedClient { - channel: self.clone_channel().into_untyped_channel(), - watcher: self.watcher.clone(), - shutdown: self.shutdown.clone(), - shutdown_on_drop: self.shutdown_on_drop, - task: self.task.take(), - } - } - - /// Spawns a client using the provided [`FramedTransport`] of [`InmemoryTransport`] and a - /// specific [`ReconnectStrategy`]. - /// - /// ### Note - /// - /// This will NOT perform any handshakes or authentication procedures nor will it replay any - /// missing frames. This is to be used when establishing a [`Client`] to be run internally - /// within a program. - pub fn spawn_inmemory( - transport: FramedTransport, - config: ClientConfig, - ) -> Self { - UntypedClient::spawn_inmemory(transport, config).into_typed_client() - } -} - -impl Client<(), ()> { - /// Creates a new [`ClientBuilder`]. - pub fn build() -> ClientBuilder<(), ()> { - ClientBuilder::new() - } - - /// Creates a new [`ClientBuilder`] configured to use a [`TcpConnector`]. 
- pub fn tcp(connector: impl Into>) -> ClientBuilder<(), TcpConnector> { - ClientBuilder::new().connector(connector.into()) - } - - /// Creates a new [`ClientBuilder`] configured to use a [`UnixSocketConnector`]. - #[cfg(unix)] - pub fn unix_socket( - connector: impl Into, - ) -> ClientBuilder<(), UnixSocketConnector> { - ClientBuilder::new().connector(connector.into()) - } - - /// Creates a new [`ClientBuilder`] configured to use a local [`WindowsPipeConnector`]. - #[cfg(windows)] - pub fn local_windows_pipe( - connector: impl Into, - ) -> ClientBuilder<(), WindowsPipeConnector> { - let mut connector = connector.into(); - connector.local = true; - ClientBuilder::new().connector(connector) - } - - /// Creates a new [`ClientBuilder`] configured to use a [`WindowsPipeConnector`]. - #[cfg(windows)] - pub fn windows_pipe( - connector: impl Into, - ) -> ClientBuilder<(), WindowsPipeConnector> { - ClientBuilder::new().connector(connector.into()) - } -} - -impl Client { - /// Convert into underlying channel. - pub fn into_channel(self) -> Channel { - self.clone_channel() - } - - /// Clones the underlying channel for requests and returns the cloned instance. - pub fn clone_channel(&self) -> Channel { - self.channel.clone() - } - - /// Waits for the client to terminate, which resolves when the receiving end of the network - /// connection is closed (or the client is shutdown). Returns whether or not the client exited - /// successfully or due to an error. - pub async fn wait(mut self) -> io::Result<()> { - match self.task.take().unwrap().await { - Ok(x) => x, - Err(x) => Err(io::Error::new(io::ErrorKind::Other, x)), - } - } - - /// Abort the client's current connection by forcing its tasks to abort. - pub fn abort(&self) { - if let Some(task) = self.task.as_ref() { - task.abort(); - } - } - - /// Clones the underlying shutdown signaler for the client. This enables you to wait on the - /// client while still having the option to shut it down from somewhere else. - pub fn clone_shutdown(&self) -> Box { - self.shutdown.clone() - } - - /// Signal for the client to shutdown its connection cleanly. - pub async fn shutdown(&self) -> io::Result<()> { - self.shutdown.shutdown().await - } - - /// Returns whether the client should fully shutdown once it is dropped. If true, this will - /// result in all channels tied to the client no longer functioning once the client is dropped. - pub fn will_shutdown_on_drop(&mut self) -> bool { - self.shutdown_on_drop - } - - /// Sets whether the client should fully shutdown once it is dropped. If true, this will result - /// in all channels tied to the client no longer functioning once the client is dropped. - pub fn shutdown_on_drop(&mut self, shutdown_on_drop: bool) { - self.shutdown_on_drop = shutdown_on_drop; - } - - /// Clones the underlying [`ConnectionStateWatcher`] for the client. - pub fn clone_connection_watcher(&self) -> ConnectionWatcher { - self.watcher.clone() - } - - /// Spawns a new task that continually monitors for connection changes and invokes the function - /// `f` whenever a new change is detected. - pub fn on_connection_change(&self, f: F) -> JoinHandle<()> - where - F: FnMut(ConnectionState) + Send + 'static, - { - self.watcher.on_change(f) - } - - /// Returns true if client's underlying event processing has finished/terminated. 
- pub fn is_finished(&self) -> bool { - self.task.is_none() || self.task.as_ref().unwrap().is_finished() - } -} - -impl Deref for Client { - type Target = Channel; - - fn deref(&self) -> &Self::Target { - &self.channel - } -} - -impl DerefMut for Client { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.channel - } -} - -impl From> for Channel { - fn from(client: Client) -> Self { - client.clone_channel() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::client::ClientConfig; - use crate::common::{Ready, Request, Response, TestTransport}; - - mod typed { - use test_log::test; - - use super::*; - type TestClient = Client; - - fn spawn_test_client( - connection: Connection, - reconnect_strategy: ReconnectStrategy, - ) -> TestClient - where - T: Transport + 'static, - { - UntypedClient::spawn( - connection, - ClientConfig { - reconnect_strategy, - ..Default::default() - }, - ) - .into_typed_client() - } - - /// Creates a new test transport whose operations do not panic, but do nothing. - #[inline] - fn new_test_transport() -> TestTransport { - TestTransport { - f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), - f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), - f_ready: Box::new(|_| Ok(Ready::EMPTY)), - f_reconnect: Box::new(|| Ok(())), - } - } - - #[test(tokio::test)] - async fn should_write_queued_requests_as_outgoing_frames() { - let (client, mut server) = Connection::pair(100); - - let mut client = spawn_test_client(client, ReconnectStrategy::Fail); - client.fire(Request::new(1u8)).await.unwrap(); - client.fire(Request::new(2u8)).await.unwrap(); - client.fire(Request::new(3u8)).await.unwrap(); - - assert_eq!( - server - .read_frame_as::>() - .await - .unwrap() - .unwrap() - .payload, - 1 - ); - assert_eq!( - server - .read_frame_as::>() - .await - .unwrap() - .unwrap() - .payload, - 2 - ); - assert_eq!( - server - .read_frame_as::>() - .await - .unwrap() - .unwrap() - .payload, - 3 - ); - } - - #[test(tokio::test)] - async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() { - let (client, mut server) = Connection::pair(100); - - // NOTE: Spawn a separate task to handle the response so we do not deadlock - tokio::spawn(async move { - let request = server - .read_frame_as::>() - .await - .unwrap() - .unwrap(); - server - .write_frame_for(&Response::new(request.id, 2u8)) - .await - .unwrap(); - }); - - let mut client = spawn_test_client(client, ReconnectStrategy::Fail); - assert_eq!(client.send(Request::new(1u8)).await.unwrap().payload, 2); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - spawn_test_client( - Connection::test_client({ - let mut transport = new_test_transport(); - - transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into())); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_closed_by_server() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - spawn_test_client( - Connection::test_client({ - let mut transport = 
new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); - - // Report that no bytes were written, indicting the channel was closed - transport.f_try_read = Box::new(|_| Ok(0)); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_errors_while_reading_data() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - spawn_test_client( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); - - // Fail the read - transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into())); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - let mut client = spawn_test_client( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); - - // Fail the write - transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into())); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ); - - // Queue up a request to fail to send - client - .fire(Request::new(123u8)) - .await - .expect("Failed to queue request"); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - let mut client = spawn_test_client( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); - - // Succeed partially with initial try_write, block on second call, and then - // fail during a try_flush - transport.f_try_write = Box::new(|buf| unsafe { - static mut CNT: u8 = 0; - CNT += 1; - if CNT == 1 { - Ok(buf.len() / 2) - } else if CNT == 2 { - Err(io::ErrorKind::WouldBlock.into()) - } else { - Err(io::ErrorKind::Other.into()) - } - }); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - 
ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ); - - // Queue up a request to fail to send - client - .fire(Request::new(123u8)) - .await - .expect("Failed to queue request"); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_exit_if_reconnect_strategy_has_failed_to_connect() { - let (client, server) = Connection::pair(100); - - // Spawn the client, verify the task is running, kill our server, and verify that the - // client does not block trying to reconnect - let client = spawn_test_client(client, ReconnectStrategy::Fail); - assert!(!client.is_finished(), "Client unexpectedly died"); - drop(server); - assert_eq!( - client.wait().await.unwrap_err().kind(), - io::ErrorKind::ConnectionAborted - ); - } - - #[test(tokio::test)] - async fn should_exit_if_shutdown_signal_detected() { - let (client, _server) = Connection::pair(100); - - let client = spawn_test_client(client, ReconnectStrategy::Fail); - client.shutdown().await.unwrap(); - - // NOTE: We wait for the client's task to conclude by using `wait` to ensure we do not - // have a race condition testing the task finished state. This will also verify - // that the task exited cleanly, rather than panicking. - client.wait().await.unwrap(); - } - - #[test(tokio::test)] - async fn should_not_exit_if_shutdown_channel_is_closed() { - let (client, mut server) = Connection::pair(100); - - // NOTE: Spawn a separate task to handle the response so we do not deadlock - tokio::spawn(async move { - let request = server - .read_frame_as::>() - .await - .unwrap() - .unwrap(); - server - .write_frame_for(&Response::new(request.id, 2u8)) - .await - .unwrap(); - }); - - // NOTE: We consume the client to produce a channel without maintaining the shutdown - // channel in order to ensure that dropping the client does not kill the task. - let mut channel = spawn_test_client(client, ReconnectStrategy::Fail).into_channel(); - assert_eq!(channel.send(Request::new(1u8)).await.unwrap().payload, 2); - } - } - - mod untyped { - use test_log::test; - - use super::*; - type TestClient = UntypedClient; - - /// Creates a new test transport whose operations do not panic, but do nothing. 
- #[inline] - fn new_test_transport() -> TestTransport { - TestTransport { - f_try_read: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), - f_try_write: Box::new(|_| Err(io::ErrorKind::WouldBlock.into())), - f_ready: Box::new(|_| Ok(Ready::EMPTY)), - f_reconnect: Box::new(|| Ok(())), - } - } - - #[test(tokio::test)] - async fn should_write_queued_requests_as_outgoing_frames() { - let (client, mut server) = Connection::pair(100); - - let mut client = TestClient::spawn(client, Default::default()); - client - .fire(Request::new(1u8).to_untyped_request().unwrap()) - .await - .unwrap(); - client - .fire(Request::new(2u8).to_untyped_request().unwrap()) - .await - .unwrap(); - client - .fire(Request::new(3u8).to_untyped_request().unwrap()) - .await - .unwrap(); - - assert_eq!( - server - .read_frame_as::>() - .await - .unwrap() - .unwrap() - .payload, - 1 - ); - assert_eq!( - server - .read_frame_as::>() - .await - .unwrap() - .unwrap() - .payload, - 2 - ); - assert_eq!( - server - .read_frame_as::>() - .await - .unwrap() - .unwrap() - .payload, - 3 - ); - } - - #[test(tokio::test)] - async fn should_read_incoming_frames_as_responses_and_deliver_them_to_waiting_mailboxes() { - let (client, mut server) = Connection::pair(100); - - // NOTE: Spawn a separate task to handle the response so we do not deadlock - tokio::spawn(async move { - let request = server - .read_frame_as::>() - .await - .unwrap() - .unwrap(); - server - .write_frame_for(&Response::new(request.id, 2u8)) - .await - .unwrap(); - }); - - let mut client = TestClient::spawn(client, Default::default()); - assert_eq!( - client - .send(Request::new(1u8).to_untyped_request().unwrap()) - .await - .unwrap() - .to_typed_response::() - .unwrap() - .payload, - 2 - ); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_fails_to_determine_state() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - TestClient::spawn( - Connection::test_client({ - let mut transport = new_test_transport(); - - transport.f_ready = Box::new(|_| Err(io::ErrorKind::Other.into())); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ClientConfig { - reconnect_strategy: ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ..Default::default() - }, - ); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_closed_by_server() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - TestClient::spawn( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); - - // Report that no bytes were written, indicting the channel was closed - transport.f_try_read = Box::new(|_| Ok(0)); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ClientConfig { - reconnect_strategy: ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ..Default::default() - }, - ); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn 
should_attempt_to_reconnect_if_connection_errors_while_reading_data() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - TestClient::spawn( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::READABLE)); - - // Fail the read - transport.f_try_read = Box::new(|_| Err(io::ErrorKind::Other.into())); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ClientConfig { - reconnect_strategy: ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ..Default::default() - }, - ); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_unable_to_send_new_request() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - let mut client = TestClient::spawn( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); - - // Fail the write - transport.f_try_write = Box::new(|_| Err(io::ErrorKind::Other.into())); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ClientConfig { - reconnect_strategy: ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ..Default::default() - }, - ); - - // Queue up a request to fail to send - client - .fire(Request::new(123u8).to_untyped_request().unwrap()) - .await - .expect("Failed to queue request"); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_connection_unable_to_flush_an_existing_request() { - let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1); - let mut client = TestClient::spawn( - Connection::test_client({ - let mut transport = new_test_transport(); - - // Report back that we're readable to trigger try_read - transport.f_ready = Box::new(|_| Ok(Ready::WRITABLE)); - - // Succeed partially with initial try_write, block on second call, and then - // fail during a try_flush - transport.f_try_write = Box::new(|buf| unsafe { - static mut CNT: u8 = 0; - CNT += 1; - if CNT == 1 { - Ok(buf.len() / 2) - } else if CNT == 2 { - Err(io::ErrorKind::WouldBlock.into()) - } else { - Err(io::ErrorKind::Other.into()) - } - }); - - // Send a signal that the reconnect happened while marking it successful - transport.f_reconnect = Box::new(move || { - reconnect_tx.try_send(()).expect("reconnect tx blocked"); - Ok(()) - }); - - transport - }), - ClientConfig { - reconnect_strategy: ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: None, - timeout: None, - }, - ..Default::default() - }, - ); - - // Queue up a request to fail to send - client - .fire(Request::new(123u8).to_untyped_request().unwrap()) - .await - .expect("Failed to queue request"); - - reconnect_rx.recv().await.expect("Reconnect did not occur"); - } - - #[test(tokio::test)] - async fn should_exit_if_reconnect_strategy_has_failed_to_connect() { - let (client, server) = 
Connection::pair(100); - - // Spawn the client, verify the task is running, kill our server, and verify that the - // client does not block trying to reconnect - let client = TestClient::spawn(client, Default::default()); - assert!(!client.is_finished(), "Client unexpectedly died"); - drop(server); - assert_eq!( - client.wait().await.unwrap_err().kind(), - io::ErrorKind::ConnectionAborted - ); - } - - #[test(tokio::test)] - async fn should_exit_if_shutdown_signal_detected() { - let (client, _server) = Connection::pair(100); - - let client = TestClient::spawn(client, Default::default()); - client.shutdown().await.unwrap(); - - // NOTE: We wait for the client's task to conclude by using `wait` to ensure we do not - // have a race condition testing the task finished state. This will also verify - // that the task exited cleanly, rather than panicking. - client.wait().await.unwrap(); - } - - #[test(tokio::test)] - async fn should_not_exit_if_shutdown_channel_is_closed() { - let (client, mut server) = Connection::pair(100); - - // NOTE: Spawn a separate task to handle the response so we do not deadlock - tokio::spawn(async move { - let request = server - .read_frame_as::>() - .await - .unwrap() - .unwrap(); - server - .write_frame_for(&Response::new(request.id, 2u8)) - .await - .unwrap(); - }); - - // NOTE: We consume the client to produce a channel without maintaining the shutdown - // channel in order to ensure that dropping the client does not kill the task. - let mut channel = TestClient::spawn(client, Default::default()).into_channel(); - assert_eq!( - channel - .send(Request::new(1u8).to_untyped_request().unwrap()) - .await - .unwrap() - .to_typed_response::() - .unwrap() - .payload, - 2 - ); - } - - #[test(tokio::test)] - async fn should_attempt_to_reconnect_if_no_activity_from_server_within_silence_duration() { - let (client, _) = Connection::pair(100); - - // NOTE: We consume the client to produce a channel without maintaining the shutdown - // channel in order to ensure that dropping the client does not kill the task. 
- let client = TestClient::spawn( - client, - ClientConfig { - silence_duration: Duration::from_millis(100), - reconnect_strategy: ReconnectStrategy::FixedInterval { - interval: Duration::from_millis(50), - max_retries: Some(3), - timeout: None, - }, - ..Default::default() - }, - ); - - let (tx, mut rx) = mpsc::unbounded_channel(); - client.on_connection_change(move |state| tx.send(state).unwrap()); - assert_eq!(rx.recv().await, Some(ConnectionState::Reconnecting)); - assert_eq!(rx.recv().await, Some(ConnectionState::Disconnected)); - assert_eq!(rx.recv().await, None); - } - } -} diff --git a/distant-core-net/src/common.rs b/distant-core-net/src/common.rs deleted file mode 100644 index d1f69ff..0000000 --- a/distant-core-net/src/common.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod any; -mod connection; -mod key; -mod keychain; -mod listener; -mod packet; -mod port; -mod transport; -pub(crate) mod utils; -mod version; - -pub use any::*; -pub(crate) use connection::Connection; -pub use connection::ConnectionId; -pub use key::*; -pub use keychain::*; -pub use listener::*; -pub use packet::*; -pub use port::*; -pub use transport::*; -pub use version::*; diff --git a/distant-core-net/src/common/connection.rs b/distant-core-net/src/connection.rs similarity index 99% rename from distant-core-net/src/common/connection.rs rename to distant-core-net/src/connection.rs index 80dc178..d90fbaa 100644 --- a/distant-core-net/src/common/connection.rs +++ b/distant-core-net/src/connection.rs @@ -8,16 +8,16 @@ use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; #[cfg(test)] -use crate::common::InmemoryTransport; -use crate::common::{ +use crate::InmemoryTransport; +use crate::{ Backup, FramedTransport, HeapSecretKey, Keychain, KeychainResult, Reconnectable, Transport, TransportExt, Version, }; -/// Id of the connection +/// Id of the connection. pub type ConnectionId = u32; -/// Represents a connection from either the client or server side +/// Represents a connection from either the client or server side. #[derive(Debug)] pub enum Connection { /// Connection from the client side @@ -179,7 +179,7 @@ where } } -/// Type of connection to perform +/// Type of connection to perform. #[derive(Debug, Serialize, Deserialize)] enum ConnectType { /// Indicates that the connection from client to server is no and not a reconnection diff --git a/distant-core-net/src/common/key.rs b/distant-core-net/src/key.rs similarity index 100% rename from distant-core-net/src/common/key.rs rename to distant-core-net/src/key.rs diff --git a/distant-core-net/src/common/keychain.rs b/distant-core-net/src/keychain.rs similarity index 99% rename from distant-core-net/src/common/keychain.rs rename to distant-core-net/src/keychain.rs index 8ff6a16..d25d43d 100644 --- a/distant-core-net/src/common/keychain.rs +++ b/distant-core-net/src/keychain.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use tokio::sync::RwLock; -use crate::common::HeapSecretKey; +use crate::HeapSecretKey; /// Represents the result of a request to the database. 
#[derive(Copy, Clone, Debug, PartialEq, Eq)] diff --git a/distant-core-net/src/lib.rs b/distant-core-net/src/lib.rs index 4cc6432..f5b1277 100644 --- a/distant-core-net/src/lib.rs +++ b/distant-core-net/src/lib.rs @@ -4,14 +4,28 @@ #[cfg(doctest)] pub struct ReadmeDoctests; +mod any; mod authentication; -pub mod client; -pub mod common; -pub mod manager; -pub mod server; +mod connection; +mod key; +mod keychain; +mod listener; +mod packet; +mod port; +mod transport; +pub(crate) mod utils; +mod version; + +pub use any::*; +pub use connection::*; +pub use key::*; +pub use keychain::*; +pub use listener::*; +pub use packet::*; +pub use port::*; +pub use transport::*; +pub use version::*; -pub use client::{Client, ReconnectStrategy}; /// Authentication functionality tied to network operations. pub use distant_core_auth as auth; -pub use server::Server; pub use {log, paste}; diff --git a/distant-core-net/src/common/listener.rs b/distant-core-net/src/listener.rs similarity index 100% rename from distant-core-net/src/common/listener.rs rename to distant-core-net/src/listener.rs diff --git a/distant-core-net/src/common/listener/mapped.rs b/distant-core-net/src/listener/mapped.rs similarity index 100% rename from distant-core-net/src/common/listener/mapped.rs rename to distant-core-net/src/listener/mapped.rs diff --git a/distant-core-net/src/common/listener/mpsc.rs b/distant-core-net/src/listener/mpsc.rs similarity index 100% rename from distant-core-net/src/common/listener/mpsc.rs rename to distant-core-net/src/listener/mpsc.rs diff --git a/distant-core-net/src/common/listener/oneshot.rs b/distant-core-net/src/listener/oneshot.rs similarity index 100% rename from distant-core-net/src/common/listener/oneshot.rs rename to distant-core-net/src/listener/oneshot.rs diff --git a/distant-core-net/src/common/listener/tcp.rs b/distant-core-net/src/listener/tcp.rs similarity index 99% rename from distant-core-net/src/common/listener/tcp.rs rename to distant-core-net/src/listener/tcp.rs index 0bd4c9f..8189572 100644 --- a/distant-core-net/src/common/listener/tcp.rs +++ b/distant-core-net/src/listener/tcp.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use tokio::net::TcpListener as TokioTcpListener; use super::Listener; -use crate::common::{PortRange, TcpTransport}; +use crate::{PortRange, TcpTransport}; /// Represents a [`Listener`] for incoming connections over TCP pub struct TcpListener { diff --git a/distant-core-net/src/common/listener/unix.rs b/distant-core-net/src/listener/unix.rs similarity index 99% rename from distant-core-net/src/common/listener/unix.rs rename to distant-core-net/src/listener/unix.rs index aabce9d..83ab770 100644 --- a/distant-core-net/src/common/listener/unix.rs +++ b/distant-core-net/src/listener/unix.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use tokio::net::{UnixListener, UnixStream}; use super::Listener; -use crate::common::UnixSocketTransport; +use crate::UnixSocketTransport; /// Represents a [`Listener`] for incoming connections over a Unix socket pub struct UnixSocketListener { diff --git a/distant-core-net/src/common/listener/windows.rs b/distant-core-net/src/listener/windows.rs similarity index 99% rename from distant-core-net/src/common/listener/windows.rs rename to distant-core-net/src/listener/windows.rs index be58cf3..9cf6859 100644 --- a/distant-core-net/src/common/listener/windows.rs +++ b/distant-core-net/src/listener/windows.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions}; use 
super::Listener; -use crate::common::{NamedPipe, WindowsPipeTransport}; +use crate::{NamedPipe, WindowsPipeTransport}; /// Represents a [`Listener`] for incoming connections over a named windows pipe pub struct WindowsPipeListener { diff --git a/distant-core-net/src/manager.rs b/distant-core-net/src/manager.rs deleted file mode 100644 index 0201d71..0000000 --- a/distant-core-net/src/manager.rs +++ /dev/null @@ -1,16 +0,0 @@ -mod client; -mod data; -mod server; - -pub use client::*; -pub use data::*; -pub use server::*; - -use crate::common::Version; - -/// Represents the version associated with the manager's protocol. -pub const PROTOCOL_VERSION: Version = Version::new( - const_str::parse!(env!("CARGO_PKG_VERSION_MAJOR"), u64), - const_str::parse!(env!("CARGO_PKG_VERSION_MINOR"), u64), - const_str::parse!(env!("CARGO_PKG_VERSION_PATCH"), u64), -); diff --git a/distant-core-net/src/common/packet.rs b/distant-core-net/src/packet.rs similarity index 100% rename from distant-core-net/src/common/packet.rs rename to distant-core-net/src/packet.rs diff --git a/distant-core-net/src/common/packet/header.rs b/distant-core-net/src/packet/header.rs similarity index 96% rename from distant-core-net/src/common/packet/header.rs rename to distant-core-net/src/packet/header.rs index f0f478f..b9b10e5 100644 --- a/distant-core-net/src/common/packet/header.rs +++ b/distant-core-net/src/packet/header.rs @@ -6,7 +6,7 @@ use derive_more::IntoIterator; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use crate::common::{utils, Value}; +use crate::{utils, Value}; /// Generates a new [`Header`] of key/value pairs based on literals. /// @@ -18,7 +18,7 @@ use crate::common::{utils, Value}; #[macro_export] macro_rules! header { ($($key:literal -> $value:expr),* $(,)?) 
=> {{ - let mut _header = $crate::common::Header::default(); + let mut _header = $crate::Header::default(); $( _header.insert($key, $value); diff --git a/distant-core-net/src/common/packet/request.rs b/distant-core-net/src/packet/request.rs similarity index 99% rename from distant-core-net/src/common/packet/request.rs rename to distant-core-net/src/packet/request.rs index bd74e34..7056490 100644 --- a/distant-core-net/src/common/packet/request.rs +++ b/distant-core-net/src/packet/request.rs @@ -6,8 +6,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use super::{read_header_bytes, read_key_eq, read_str_bytes, Header, Id}; -use crate::common::utils; -use crate::header; +use crate::{header, utils}; /// Represents a request to send #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] diff --git a/distant-core-net/src/common/packet/response.rs b/distant-core-net/src/packet/response.rs similarity index 99% rename from distant-core-net/src/common/packet/response.rs rename to distant-core-net/src/packet/response.rs index 6056fe6..72a9387 100644 --- a/distant-core-net/src/common/packet/response.rs +++ b/distant-core-net/src/packet/response.rs @@ -6,8 +6,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use super::{read_header_bytes, read_key_eq, read_str_bytes, Header, Id}; -use crate::common::utils; -use crate::header; +use crate::{header, utils}; /// Represents a response received related to some response #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] diff --git a/distant-core-net/src/common/packet/value.rs b/distant-core-net/src/packet/value.rs similarity index 98% rename from distant-core-net/src/common/packet/value.rs rename to distant-core-net/src/packet/value.rs index 0dc25b0..b69f790 100644 --- a/distant-core-net/src/common/packet/value.rs +++ b/distant-core-net/src/packet/value.rs @@ -5,7 +5,7 @@ use std::ops::{Deref, DerefMut}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use crate::common::utils; +use crate::utils; /// Generic value type for data passed through header. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] diff --git a/distant-core-net/src/common/port.rs b/distant-core-net/src/port.rs similarity index 100% rename from distant-core-net/src/common/port.rs rename to distant-core-net/src/port.rs diff --git a/distant-core-net/src/server.rs b/distant-core-net/src/server.rs deleted file mode 100644 index c70ce81..0000000 --- a/distant-core-net/src/server.rs +++ /dev/null @@ -1,473 +0,0 @@ -use std::io; -use std::sync::Arc; -use std::time::Duration; - -use async_trait::async_trait; -use distant_core_auth::Verifier; -use log::*; -use serde::de::DeserializeOwned; -use serde::Serialize; -use tokio::sync::{broadcast, RwLock}; - -use crate::common::{ConnectionId, Listener, Response, Transport, Version}; - -mod builder; -pub use builder::*; - -mod config; -pub use config::*; - -mod connection; -use connection::*; - -mod context; -pub use context::*; - -mod r#ref; -pub use r#ref::*; - -mod reply; -pub use reply::*; - -mod state; -use state::*; - -mod shutdown_timer; -use shutdown_timer::*; - -/// Represents a server that can be used to receive requests & send responses to clients. 
-pub struct Server { - /// Custom configuration details associated with the server - config: ServerConfig, - - /// Handler used to process various server events - handler: T, - - /// Performs authentication using various methods - verifier: Verifier, - - /// Version associated with the server used by clients to verify compatibility - version: Version, -} - -/// Interface for a handler that receives connections and requests -#[async_trait] -pub trait ServerHandler: Send { - /// Type of data received by the server - type Request; - - /// Type of data sent back by the server - type Response; - - /// Invoked upon a new connection becoming established. - #[allow(unused_variables)] - async fn on_connect(&self, id: ConnectionId) -> io::Result<()> { - Ok(()) - } - - /// Invoked upon an existing connection getting dropped. - #[allow(unused_variables)] - async fn on_disconnect(&self, id: ConnectionId) -> io::Result<()> { - Ok(()) - } - - /// Invoked upon receiving a request from a client. The server should process this - /// request, which can be found in `ctx`, and send one or more replies in response. - async fn on_request(&self, ctx: RequestCtx); -} - -impl Server<()> { - /// Creates a new [`Server`], starting with a default configuration, no authentication methods, - /// and no [`ServerHandler`]. - pub fn new() -> Self { - Self { - config: Default::default(), - handler: (), - verifier: Verifier::empty(), - version: Default::default(), - } - } - - /// Creates a new [`TcpServerBuilder`] that is used to construct a [`Server`]. - pub fn tcp() -> TcpServerBuilder<()> { - TcpServerBuilder::default() - } - - /// Creates a new [`UnixSocketServerBuilder`] that is used to construct a [`Server`]. - #[cfg(unix)] - pub fn unix_socket() -> UnixSocketServerBuilder<()> { - UnixSocketServerBuilder::default() - } - - /// Creates a new [`WindowsPipeServerBuilder`] that is used to construct a [`Server`]. - #[cfg(windows)] - pub fn windows_pipe() -> WindowsPipeServerBuilder<()> { - WindowsPipeServerBuilder::default() - } -} - -impl Default for Server<()> { - fn default() -> Self { - Self::new() - } -} - -impl Server { - /// Consumes the current server, replacing its config with `config` and returning it. - pub fn config(self, config: ServerConfig) -> Self { - Self { - config, - handler: self.handler, - verifier: self.verifier, - version: self.version, - } - } - - /// Consumes the current server, replacing its handler with `handler` and returning it. - pub fn handler(self, handler: U) -> Server { - Server { - config: self.config, - handler, - verifier: self.verifier, - version: self.version, - } - } - - /// Consumes the current server, replacing its verifier with `verifier` and returning it. - pub fn verifier(self, verifier: Verifier) -> Self { - Self { - config: self.config, - handler: self.handler, - verifier, - version: self.version, - } - } - - /// Consumes the current server, replacing its version with `version` and returning it. - pub fn version(self, version: Version) -> Self { - Self { - config: self.config, - handler: self.handler, - verifier: self.verifier, - version, - } - } -} - -impl Server -where - T: ServerHandler + Sync + 'static, - T::Request: DeserializeOwned + Send + Sync + 'static, - T::Response: Serialize + Send + 'static, -{ - /// Consumes the server, starting a task to process connections from the `listener` and - /// returning a [`ServerRef`] that can be used to control the active server instance. 
- pub fn start(self, listener: L) -> io::Result - where - L: Listener + 'static, - L::Output: Transport + 'static, - { - let state = Arc::new(ServerState::new()); - let (tx, rx) = broadcast::channel(1); - let task = tokio::spawn(self.task(Arc::clone(&state), listener, tx.clone(), rx)); - - Ok(ServerRef { shutdown: tx, task }) - } - - /// Internal task that is run to receive connections and spawn connection tasks - async fn task( - self, - state: Arc>>, - mut listener: L, - shutdown_tx: broadcast::Sender<()>, - shutdown_rx: broadcast::Receiver<()>, - ) where - L: Listener + 'static, - L::Output: Transport + 'static, - { - let Server { - config, - handler, - verifier, - version, - } = self; - - let handler = Arc::new(handler); - let timer = ShutdownTimer::start(config.shutdown); - let mut notification = timer.clone_notification(); - let timer = Arc::new(RwLock::new(timer)); - let verifier = Arc::new(verifier); - - let mut connection_tasks = Vec::new(); - loop { - // Receive a new connection, exiting if no longer accepting connections or if the shutdown - // signal has been received - let transport = tokio::select! { - result = listener.accept() => { - match result { - Ok(x) => x, - Err(x) => { - error!("Server no longer accepting connections: {x}"); - timer.read().await.abort(); - break; - } - } - } - _ = notification.wait() => { - info!( - "Server shutdown triggered after {}s", - config.shutdown.duration().unwrap_or_default().as_secs_f32(), - ); - - let _ = shutdown_tx.send(()); - - break; - } - }; - - // Ensure that the shutdown timer is cancelled now that we have a connection - timer.read().await.stop(); - - connection_tasks.push( - ConnectionTask::build() - .handler(Arc::downgrade(&handler)) - .state(Arc::downgrade(&state)) - .keychain(state.keychain.clone()) - .transport(transport) - .shutdown(shutdown_rx.resubscribe()) - .shutdown_timer(Arc::downgrade(&timer)) - .sleep_duration(config.connection_sleep) - .heartbeat_duration(config.connection_heartbeat) - .verifier(Arc::downgrade(&verifier)) - .version(version.clone()) - .spawn(), - ); - - // Clean up current tasks being tracked - connection_tasks.retain(|task| !task.is_finished()); - } - - // Once we stop listening, we still want to wait until all connections have terminated - info!("Server waiting for active connections to terminate"); - loop { - connection_tasks.retain(|task| !task.is_finished()); - if connection_tasks.is_empty() { - break; - } - tokio::time::sleep(Duration::from_millis(50)).await; - } - info!("Server task terminated"); - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use async_trait::async_trait; - use distant_core_auth::{AuthenticationMethod, DummyAuthHandler, NoneAuthenticationMethod}; - use test_log::test; - use tokio::sync::mpsc; - - use super::*; - use crate::common::{Connection, InmemoryTransport, MpscListener, Request, Response}; - - macro_rules! 
server_version { - () => { - Version::new(1, 2, 3) - }; - } - - pub struct TestServerHandler; - - #[async_trait] - impl ServerHandler for TestServerHandler { - type Request = u16; - type Response = String; - - async fn on_request(&self, ctx: RequestCtx) { - // Always send back "hello" - ctx.reply.send("hello".to_string()).unwrap(); - } - } - - #[inline] - fn make_test_server(config: ServerConfig) -> Server { - let methods: Vec> = - vec![Box::new(NoneAuthenticationMethod::new())]; - - Server { - config, - handler: TestServerHandler, - verifier: Verifier::new(methods), - version: server_version!(), - } - } - - #[allow(clippy::type_complexity)] - fn make_listener( - buffer: usize, - ) -> ( - mpsc::Sender, - MpscListener, - ) { - MpscListener::channel(buffer) - } - - #[test(tokio::test)] - async fn should_invoke_handler_upon_receiving_a_request() { - // Create a test listener where we will forward a connection - let (tx, listener) = make_listener(100); - - // Make bounded transport pair and send off one of them to act as our connection - let (transport, connection) = InmemoryTransport::pair(100); - tx.send(connection) - .await - .expect("Failed to feed listener a connection"); - - let _server = make_test_server(ServerConfig::default()) - .start(listener) - .expect("Failed to start server"); - - // Perform handshake and authentication with the server before beginning to send data - let mut connection = Connection::client(transport, DummyAuthHandler, server_version!()) - .await - .expect("Failed to connect to server"); - - connection - .write_frame(Request::new(123).to_vec().unwrap()) - .await - .expect("Failed to send request"); - - // Wait for a response - let frame = connection.read_frame().await.unwrap().unwrap(); - let response: Response = Response::from_slice(frame.as_item()).unwrap(); - assert_eq!(response.payload, "hello"); - } - - #[test(tokio::test)] - async fn should_lonely_shutdown_if_no_connections_received_after_n_secs_when_config_set() { - let (_tx, listener) = make_listener(100); - - let server = make_test_server(ServerConfig { - shutdown: Shutdown::Lonely(Duration::from_millis(100)), - ..Default::default() - }) - .start(listener) - .expect("Failed to start server"); - - // Wait for some time - tokio::time::sleep(Duration::from_millis(300)).await; - - assert!(server.is_finished(), "Server shutdown not triggered!"); - } - - #[test(tokio::test)] - async fn should_lonely_shutdown_if_last_connection_terminated_and_then_no_connections_after_n_secs( - ) { - // Create a test listener where we will forward a connection - let (tx, listener) = make_listener(100); - - // Make bounded transport pair and send off one of them to act as our connection - let (transport, connection) = InmemoryTransport::pair(100); - tx.send(connection) - .await - .expect("Failed to feed listener a connection"); - - let server = make_test_server(ServerConfig { - shutdown: Shutdown::Lonely(Duration::from_millis(100)), - ..Default::default() - }) - .start(listener) - .expect("Failed to start server"); - - // Drop the connection by dropping the transport - drop(transport); - - // Wait for some time - tokio::time::sleep(Duration::from_millis(300)).await; - - assert!(server.is_finished(), "Server shutdown not triggered!"); - } - - #[test(tokio::test)] - async fn should_not_lonely_shutdown_as_long_as_a_connection_exists() { - // Create a test listener where we will forward a connection - let (tx, listener) = make_listener(100); - - // Make bounded transport pair and send off one of them to act as our connection - let 
(_transport, connection) = InmemoryTransport::pair(100); - tx.send(connection) - .await - .expect("Failed to feed listener a connection"); - - let server = make_test_server(ServerConfig { - shutdown: Shutdown::Lonely(Duration::from_millis(100)), - ..Default::default() - }) - .start(listener) - .expect("Failed to start server"); - - // Wait for some time - tokio::time::sleep(Duration::from_millis(300)).await; - - assert!(!server.is_finished(), "Server shutdown when it should not!"); - } - - #[test(tokio::test)] - async fn should_shutdown_after_n_seconds_even_with_connections_if_config_set_to_after() { - let (tx, listener) = make_listener(100); - - // Make bounded transport pair and send off one of them to act as our connection - let (_transport, connection) = InmemoryTransport::pair(100); - tx.send(connection) - .await - .expect("Failed to feed listener a connection"); - - let server = make_test_server(ServerConfig { - shutdown: Shutdown::After(Duration::from_millis(100)), - ..Default::default() - }) - .start(listener) - .expect("Failed to start server"); - - // Wait for some time - tokio::time::sleep(Duration::from_millis(300)).await; - - assert!(server.is_finished(), "Server shutdown not triggered!"); - } - - #[test(tokio::test)] - async fn should_shutdown_after_n_seconds_if_config_set_to_after() { - let (_tx, listener) = make_listener(100); - - let server = make_test_server(ServerConfig { - shutdown: Shutdown::After(Duration::from_millis(100)), - ..Default::default() - }) - .start(listener) - .expect("Failed to start server"); - - // Wait for some time - tokio::time::sleep(Duration::from_millis(300)).await; - - assert!(server.is_finished(), "Server shutdown not triggered!"); - } - - #[test(tokio::test)] - async fn should_never_shutdown_if_config_set_to_never() { - let (_tx, listener) = make_listener(100); - - let server = make_test_server(ServerConfig { - shutdown: Shutdown::Never, - ..Default::default() - }) - .start(listener) - .expect("Failed to start server"); - - // Wait for some time - tokio::time::sleep(Duration::from_millis(300)).await; - - assert!(!server.is_finished(), "Server shutdown when it should not!"); - } -} diff --git a/distant-core-net/src/common/transport.rs b/distant-core-net/src/transport.rs similarity index 100% rename from distant-core-net/src/common/transport.rs rename to distant-core-net/src/transport.rs diff --git a/distant-core-net/src/common/transport/framed.rs b/distant-core-net/src/transport/framed.rs similarity index 99% rename from distant-core-net/src/common/transport/framed.rs rename to distant-core-net/src/transport/framed.rs index d5dded8..ef88395 100644 --- a/distant-core-net/src/common/transport/framed.rs +++ b/distant-core-net/src/transport/framed.rs @@ -9,7 +9,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use super::{InmemoryTransport, Interest, Ready, Reconnectable, Transport}; -use crate::common::{utils, SecretKey32}; +use crate::{utils, SecretKey32}; mod backup; mod codec; diff --git a/distant-core-net/src/common/transport/framed/backup.rs b/distant-core-net/src/transport/framed/backup.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/backup.rs rename to distant-core-net/src/transport/framed/backup.rs diff --git a/distant-core-net/src/common/transport/framed/codec.rs b/distant-core-net/src/transport/framed/codec.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/codec.rs rename to distant-core-net/src/transport/framed/codec.rs diff --git 
a/distant-core-net/src/common/transport/framed/codec/chain.rs b/distant-core-net/src/transport/framed/codec/chain.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/codec/chain.rs rename to distant-core-net/src/transport/framed/codec/chain.rs diff --git a/distant-core-net/src/common/transport/framed/codec/compression.rs b/distant-core-net/src/transport/framed/codec/compression.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/codec/compression.rs rename to distant-core-net/src/transport/framed/codec/compression.rs diff --git a/distant-core-net/src/common/transport/framed/codec/encryption.rs b/distant-core-net/src/transport/framed/codec/encryption.rs similarity index 99% rename from distant-core-net/src/common/transport/framed/codec/encryption.rs rename to distant-core-net/src/transport/framed/codec/encryption.rs index f0f6632..6fc1bf2 100644 --- a/distant-core-net/src/common/transport/framed/codec/encryption.rs +++ b/distant-core-net/src/transport/framed/codec/encryption.rs @@ -3,7 +3,7 @@ use std::{fmt, io}; use derive_more::Display; use super::{Codec, Frame}; -use crate::common::{SecretKey, SecretKey32}; +use crate::{SecretKey, SecretKey32}; /// Represents the type of encryption for a [`EncryptionCodec`] #[derive( diff --git a/distant-core-net/src/common/transport/framed/codec/plain.rs b/distant-core-net/src/transport/framed/codec/plain.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/codec/plain.rs rename to distant-core-net/src/transport/framed/codec/plain.rs diff --git a/distant-core-net/src/common/transport/framed/codec/predicate.rs b/distant-core-net/src/transport/framed/codec/predicate.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/codec/predicate.rs rename to distant-core-net/src/transport/framed/codec/predicate.rs diff --git a/distant-core-net/src/common/transport/framed/exchange.rs b/distant-core-net/src/transport/framed/exchange.rs similarity index 98% rename from distant-core-net/src/common/transport/framed/exchange.rs rename to distant-core-net/src/transport/framed/exchange.rs index 3545495..cc8f5ad 100644 --- a/distant-core-net/src/common/transport/framed/exchange.rs +++ b/distant-core-net/src/transport/framed/exchange.rs @@ -6,7 +6,7 @@ use p256::PublicKey; use rand::rngs::OsRng; use sha2::Sha256; -use crate::common::SecretKey32; +use crate::SecretKey32; mod pkb; pub use pkb::PublicKeyBytes; diff --git a/distant-core-net/src/common/transport/framed/exchange/pkb.rs b/distant-core-net/src/transport/framed/exchange/pkb.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/exchange/pkb.rs rename to distant-core-net/src/transport/framed/exchange/pkb.rs diff --git a/distant-core-net/src/common/transport/framed/exchange/salt.rs b/distant-core-net/src/transport/framed/exchange/salt.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/exchange/salt.rs rename to distant-core-net/src/transport/framed/exchange/salt.rs diff --git a/distant-core-net/src/common/transport/framed/frame.rs b/distant-core-net/src/transport/framed/frame.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/frame.rs rename to distant-core-net/src/transport/framed/frame.rs diff --git a/distant-core-net/src/common/transport/framed/handshake.rs b/distant-core-net/src/transport/framed/handshake.rs similarity index 100% rename from distant-core-net/src/common/transport/framed/handshake.rs rename to 
distant-core-net/src/transport/framed/handshake.rs diff --git a/distant-core-net/src/common/transport/inmemory.rs b/distant-core-net/src/transport/inmemory.rs similarity index 99% rename from distant-core-net/src/common/transport/inmemory.rs rename to distant-core-net/src/transport/inmemory.rs index 0034cd6..a316773 100644 --- a/distant-core-net/src/common/transport/inmemory.rs +++ b/distant-core-net/src/transport/inmemory.rs @@ -203,7 +203,7 @@ mod tests { use test_log::test; use super::*; - use crate::common::TransportExt; + use crate::TransportExt; #[test] fn is_rx_closed_should_properly_reflect_if_internal_rx_channel_is_closed() { diff --git a/distant-core-net/src/common/transport/tcp.rs b/distant-core-net/src/transport/tcp.rs similarity index 99% rename from distant-core-net/src/common/transport/tcp.rs rename to distant-core-net/src/transport/tcp.rs index 37558cc..ae52c21 100644 --- a/distant-core-net/src/common/transport/tcp.rs +++ b/distant-core-net/src/transport/tcp.rs @@ -79,7 +79,7 @@ mod tests { use tokio::task::JoinHandle; use super::*; - use crate::common::TransportExt; + use crate::TransportExt; async fn find_ephemeral_addr() -> SocketAddr { // Start a listener on a distinct port, get its port, and kill it diff --git a/distant-core-net/src/common/transport/test.rs b/distant-core-net/src/transport/test.rs similarity index 100% rename from distant-core-net/src/common/transport/test.rs rename to distant-core-net/src/transport/test.rs diff --git a/distant-core-net/src/common/transport/unix.rs b/distant-core-net/src/transport/unix.rs similarity index 99% rename from distant-core-net/src/common/transport/unix.rs rename to distant-core-net/src/transport/unix.rs index bc54a13..efb8c04 100644 --- a/distant-core-net/src/common/transport/unix.rs +++ b/distant-core-net/src/transport/unix.rs @@ -69,7 +69,7 @@ mod tests { use tokio::task::JoinHandle; use super::*; - use crate::common::TransportExt; + use crate::TransportExt; async fn start_and_run_server(tx: oneshot::Sender) -> io::Result<()> { // Generate a socket path and delete the file after so there is nothing there diff --git a/distant-core-net/src/common/transport/windows.rs b/distant-core-net/src/transport/windows.rs similarity index 99% rename from distant-core-net/src/common/transport/windows.rs rename to distant-core-net/src/transport/windows.rs index 1ea3e57..b941f1e 100644 --- a/distant-core-net/src/common/transport/windows.rs +++ b/distant-core-net/src/transport/windows.rs @@ -93,7 +93,7 @@ mod tests { use tokio::task::JoinHandle; use super::*; - use crate::common::TransportExt; + use crate::TransportExt; async fn start_and_run_server(tx: oneshot::Sender) -> io::Result<()> { let pipe = start_server(tx).await?; diff --git a/distant-core-net/src/common/transport/windows/pipe.rs b/distant-core-net/src/transport/windows/pipe.rs similarity index 100% rename from distant-core-net/src/common/transport/windows/pipe.rs rename to distant-core-net/src/transport/windows/pipe.rs diff --git a/distant-core-net/src/common/utils.rs b/distant-core-net/src/utils.rs similarity index 100% rename from distant-core-net/src/common/utils.rs rename to distant-core-net/src/utils.rs diff --git a/distant-core-net/src/common/version.rs b/distant-core-net/src/version.rs similarity index 100% rename from distant-core-net/src/common/version.rs rename to distant-core-net/src/version.rs diff --git a/distant-core-server/Cargo.toml b/distant-core-server/Cargo.toml index 0b43536..eb253a3 100644 --- a/distant-core-server/Cargo.toml +++ 
b/distant-core-server/Cargo.toml @@ -14,6 +14,7 @@ license = "MIT OR Apache-2.0" [dependencies] async-trait = "0.1.68" derive_more = { version = "0.99.17", default-features = false, features = ["display", "from", "error"] } +distant-core-net = { version = "=0.21.0", path = "../distant-core-net" } log = "0.4.18" serde = { version = "1.0.163", features = ["derive"] } diff --git a/distant-core-net/src/server/builder.rs b/distant-core-server/src/builder.rs similarity index 100% rename from distant-core-net/src/server/builder.rs rename to distant-core-server/src/builder.rs diff --git a/distant-core-net/src/server/builder/tcp.rs b/distant-core-server/src/builder/tcp.rs similarity index 100% rename from distant-core-net/src/server/builder/tcp.rs rename to distant-core-server/src/builder/tcp.rs diff --git a/distant-core-net/src/server/builder/unix.rs b/distant-core-server/src/builder/unix.rs similarity index 100% rename from distant-core-net/src/server/builder/unix.rs rename to distant-core-server/src/builder/unix.rs diff --git a/distant-core-net/src/server/builder/windows.rs b/distant-core-server/src/builder/windows.rs similarity index 100% rename from distant-core-net/src/server/builder/windows.rs rename to distant-core-server/src/builder/windows.rs diff --git a/distant-core-net/src/server/config.rs b/distant-core-server/src/config.rs similarity index 100% rename from distant-core-net/src/server/config.rs rename to distant-core-server/src/config.rs diff --git a/distant-core-net/src/server/connection.rs b/distant-core-server/src/connection.rs similarity index 100% rename from distant-core-net/src/server/connection.rs rename to distant-core-server/src/connection.rs diff --git a/distant-core-net/src/server/context.rs b/distant-core-server/src/context.rs similarity index 100% rename from distant-core-net/src/server/context.rs rename to distant-core-server/src/context.rs diff --git a/distant-core-server/src/lib.rs b/distant-core-server/src/lib.rs index e69de29..c70ce81 100644 --- a/distant-core-server/src/lib.rs +++ b/distant-core-server/src/lib.rs @@ -0,0 +1,473 @@ +use std::io; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use distant_core_auth::Verifier; +use log::*; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::sync::{broadcast, RwLock}; + +use crate::common::{ConnectionId, Listener, Response, Transport, Version}; + +mod builder; +pub use builder::*; + +mod config; +pub use config::*; + +mod connection; +use connection::*; + +mod context; +pub use context::*; + +mod r#ref; +pub use r#ref::*; + +mod reply; +pub use reply::*; + +mod state; +use state::*; + +mod shutdown_timer; +use shutdown_timer::*; + +/// Represents a server that can be used to receive requests & send responses to clients. +pub struct Server<T> { + /// Custom configuration details associated with the server + config: ServerConfig, + + /// Handler used to process various server events + handler: T, + + /// Performs authentication using various methods + verifier: Verifier, + + /// Version associated with the server used by clients to verify compatibility + version: Version, +} + +/// Interface for a handler that receives connections and requests +#[async_trait] +pub trait ServerHandler: Send { + /// Type of data received by the server + type Request; + + /// Type of data sent back by the server + type Response; + + /// Invoked upon a new connection becoming established.
+ #[allow(unused_variables)] + async fn on_connect(&self, id: ConnectionId) -> io::Result<()> { + Ok(()) + } + + /// Invoked upon an existing connection getting dropped. + #[allow(unused_variables)] + async fn on_disconnect(&self, id: ConnectionId) -> io::Result<()> { + Ok(()) + } + + /// Invoked upon receiving a request from a client. The server should process this + /// request, which can be found in `ctx`, and send one or more replies in response. + async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>); +} + +impl Server<()> { + /// Creates a new [`Server`], starting with a default configuration, no authentication methods, + /// and no [`ServerHandler`]. + pub fn new() -> Self { + Self { + config: Default::default(), + handler: (), + verifier: Verifier::empty(), + version: Default::default(), + } + } + + /// Creates a new [`TcpServerBuilder`] that is used to construct a [`Server`]. + pub fn tcp() -> TcpServerBuilder<()> { + TcpServerBuilder::default() + } + + /// Creates a new [`UnixSocketServerBuilder`] that is used to construct a [`Server`]. + #[cfg(unix)] + pub fn unix_socket() -> UnixSocketServerBuilder<()> { + UnixSocketServerBuilder::default() + } + + /// Creates a new [`WindowsPipeServerBuilder`] that is used to construct a [`Server`]. + #[cfg(windows)] + pub fn windows_pipe() -> WindowsPipeServerBuilder<()> { + WindowsPipeServerBuilder::default() + } +} + +impl Default for Server<()> { + fn default() -> Self { + Self::new() + } +} + +impl<T> Server<T> { + /// Consumes the current server, replacing its config with `config` and returning it. + pub fn config(self, config: ServerConfig) -> Self { + Self { + config, + handler: self.handler, + verifier: self.verifier, + version: self.version, + } + } + + /// Consumes the current server, replacing its handler with `handler` and returning it. + pub fn handler<U>(self, handler: U) -> Server<U> { + Server { + config: self.config, + handler, + verifier: self.verifier, + version: self.version, + } + } + + /// Consumes the current server, replacing its verifier with `verifier` and returning it. + pub fn verifier(self, verifier: Verifier) -> Self { + Self { + config: self.config, + handler: self.handler, + verifier, + version: self.version, + } + } + + /// Consumes the current server, replacing its version with `version` and returning it. + pub fn version(self, version: Version) -> Self { + Self { + config: self.config, + handler: self.handler, + verifier: self.verifier, + version, + } + } +} + +impl<T> Server<T> +where + T: ServerHandler + Sync + 'static, + T::Request: DeserializeOwned + Send + Sync + 'static, + T::Response: Serialize + Send + 'static, +{ + /// Consumes the server, starting a task to process connections from the `listener` and + /// returning a [`ServerRef`] that can be used to control the active server instance.
+ pub fn start<L>(self, listener: L) -> io::Result<ServerRef> + where + L: Listener + 'static, + L::Output: Transport + 'static, + { + let state = Arc::new(ServerState::new()); + let (tx, rx) = broadcast::channel(1); + let task = tokio::spawn(self.task(Arc::clone(&state), listener, tx.clone(), rx)); + + Ok(ServerRef { shutdown: tx, task }) + } + + /// Internal task that is run to receive connections and spawn connection tasks + async fn task<L>( + self, + state: Arc>>, + mut listener: L, + shutdown_tx: broadcast::Sender<()>, + shutdown_rx: broadcast::Receiver<()>, + ) where + L: Listener + 'static, + L::Output: Transport + 'static, + { + let Server { + config, + handler, + verifier, + version, + } = self; + + let handler = Arc::new(handler); + let timer = ShutdownTimer::start(config.shutdown); + let mut notification = timer.clone_notification(); + let timer = Arc::new(RwLock::new(timer)); + let verifier = Arc::new(verifier); + + let mut connection_tasks = Vec::new(); + loop { + // Receive a new connection, exiting if no longer accepting connections or if the shutdown + // signal has been received + let transport = tokio::select! { + result = listener.accept() => { + match result { + Ok(x) => x, + Err(x) => { + error!("Server no longer accepting connections: {x}"); + timer.read().await.abort(); + break; + } + } + } + _ = notification.wait() => { + info!( + "Server shutdown triggered after {}s", + config.shutdown.duration().unwrap_or_default().as_secs_f32(), + ); + + let _ = shutdown_tx.send(()); + + break; + } + }; + + // Ensure that the shutdown timer is cancelled now that we have a connection + timer.read().await.stop(); + + connection_tasks.push( + ConnectionTask::build() + .handler(Arc::downgrade(&handler)) + .state(Arc::downgrade(&state)) + .keychain(state.keychain.clone()) + .transport(transport) + .shutdown(shutdown_rx.resubscribe()) + .shutdown_timer(Arc::downgrade(&timer)) + .sleep_duration(config.connection_sleep) + .heartbeat_duration(config.connection_heartbeat) + .verifier(Arc::downgrade(&verifier)) + .version(version.clone()) + .spawn(), + ); + + // Clean up current tasks being tracked + connection_tasks.retain(|task| !task.is_finished()); + } + + // Once we stop listening, we still want to wait until all connections have terminated + info!("Server waiting for active connections to terminate"); + loop { + connection_tasks.retain(|task| !task.is_finished()); + if connection_tasks.is_empty() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + info!("Server task terminated"); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use async_trait::async_trait; + use distant_core_auth::{AuthenticationMethod, DummyAuthHandler, NoneAuthenticationMethod}; + use test_log::test; + use tokio::sync::mpsc; + + use super::*; + use crate::common::{Connection, InmemoryTransport, MpscListener, Request, Response}; + + macro_rules!
server_version { + () => { + Version::new(1, 2, 3) + }; + } + + pub struct TestServerHandler; + + #[async_trait] + impl ServerHandler for TestServerHandler { + type Request = u16; + type Response = String; + + async fn on_request(&self, ctx: RequestCtx<Self::Request, Self::Response>) { + // Always send back "hello" + ctx.reply.send("hello".to_string()).unwrap(); + } + } + + #[inline] + fn make_test_server(config: ServerConfig) -> Server<TestServerHandler> { + let methods: Vec<Box<dyn AuthenticationMethod>> = + vec![Box::new(NoneAuthenticationMethod::new())]; + + Server { + config, + handler: TestServerHandler, + verifier: Verifier::new(methods), + version: server_version!(), + } + } + + #[allow(clippy::type_complexity)] + fn make_listener( + buffer: usize, + ) -> ( + mpsc::Sender<InmemoryTransport>, + MpscListener<InmemoryTransport>, + ) { + MpscListener::channel(buffer) + } + + #[test(tokio::test)] + async fn should_invoke_handler_upon_receiving_a_request() { + // Create a test listener where we will forward a connection + let (tx, listener) = make_listener(100); + + // Make bounded transport pair and send off one of them to act as our connection + let (transport, connection) = InmemoryTransport::pair(100); + tx.send(connection) + .await + .expect("Failed to feed listener a connection"); + + let _server = make_test_server(ServerConfig::default()) + .start(listener) + .expect("Failed to start server"); + + // Perform handshake and authentication with the server before beginning to send data + let mut connection = Connection::client(transport, DummyAuthHandler, server_version!()) + .await + .expect("Failed to connect to server"); + + connection + .write_frame(Request::new(123).to_vec().unwrap()) + .await + .expect("Failed to send request"); + + // Wait for a response + let frame = connection.read_frame().await.unwrap().unwrap(); + let response: Response<String> = Response::from_slice(frame.as_item()).unwrap(); + assert_eq!(response.payload, "hello"); + } + + #[test(tokio::test)] + async fn should_lonely_shutdown_if_no_connections_received_after_n_secs_when_config_set() { + let (_tx, listener) = make_listener(100); + + let server = make_test_server(ServerConfig { + shutdown: Shutdown::Lonely(Duration::from_millis(100)), + ..Default::default() + }) + .start(listener) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(server.is_finished(), "Server shutdown not triggered!"); + } + + #[test(tokio::test)] + async fn should_lonely_shutdown_if_last_connection_terminated_and_then_no_connections_after_n_secs( + ) { + // Create a test listener where we will forward a connection + let (tx, listener) = make_listener(100); + + // Make bounded transport pair and send off one of them to act as our connection + let (transport, connection) = InmemoryTransport::pair(100); + tx.send(connection) + .await + .expect("Failed to feed listener a connection"); + + let server = make_test_server(ServerConfig { + shutdown: Shutdown::Lonely(Duration::from_millis(100)), + ..Default::default() + }) + .start(listener) + .expect("Failed to start server"); + + // Drop the connection by dropping the transport + drop(transport); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(server.is_finished(), "Server shutdown not triggered!"); + } + + #[test(tokio::test)] + async fn should_not_lonely_shutdown_as_long_as_a_connection_exists() { + // Create a test listener where we will forward a connection + let (tx, listener) = make_listener(100); + + // Make bounded transport pair and send off one of them to act as our connection + let
(_transport, connection) = InmemoryTransport::pair(100); + tx.send(connection) + .await + .expect("Failed to feed listener a connection"); + + let server = make_test_server(ServerConfig { + shutdown: Shutdown::Lonely(Duration::from_millis(100)), + ..Default::default() + }) + .start(listener) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(!server.is_finished(), "Server shutdown when it should not!"); + } + + #[test(tokio::test)] + async fn should_shutdown_after_n_seconds_even_with_connections_if_config_set_to_after() { + let (tx, listener) = make_listener(100); + + // Make bounded transport pair and send off one of them to act as our connection + let (_transport, connection) = InmemoryTransport::pair(100); + tx.send(connection) + .await + .expect("Failed to feed listener a connection"); + + let server = make_test_server(ServerConfig { + shutdown: Shutdown::After(Duration::from_millis(100)), + ..Default::default() + }) + .start(listener) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(server.is_finished(), "Server shutdown not triggered!"); + } + + #[test(tokio::test)] + async fn should_shutdown_after_n_seconds_if_config_set_to_after() { + let (_tx, listener) = make_listener(100); + + let server = make_test_server(ServerConfig { + shutdown: Shutdown::After(Duration::from_millis(100)), + ..Default::default() + }) + .start(listener) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(server.is_finished(), "Server shutdown not triggered!"); + } + + #[test(tokio::test)] + async fn should_never_shutdown_if_config_set_to_never() { + let (_tx, listener) = make_listener(100); + + let server = make_test_server(ServerConfig { + shutdown: Shutdown::Never, + ..Default::default() + }) + .start(listener) + .expect("Failed to start server"); + + // Wait for some time + tokio::time::sleep(Duration::from_millis(300)).await; + + assert!(!server.is_finished(), "Server shutdown when it should not!"); + } +} diff --git a/distant-core-net/src/server/ref.rs b/distant-core-server/src/ref.rs similarity index 100% rename from distant-core-net/src/server/ref.rs rename to distant-core-server/src/ref.rs diff --git a/distant-core-net/src/server/ref/tcp.rs b/distant-core-server/src/ref/tcp.rs similarity index 100% rename from distant-core-net/src/server/ref/tcp.rs rename to distant-core-server/src/ref/tcp.rs diff --git a/distant-core-net/src/server/ref/unix.rs b/distant-core-server/src/ref/unix.rs similarity index 100% rename from distant-core-net/src/server/ref/unix.rs rename to distant-core-server/src/ref/unix.rs diff --git a/distant-core-net/src/server/ref/windows.rs b/distant-core-server/src/ref/windows.rs similarity index 100% rename from distant-core-net/src/server/ref/windows.rs rename to distant-core-server/src/ref/windows.rs diff --git a/distant-core-net/src/server/reply.rs b/distant-core-server/src/reply.rs similarity index 100% rename from distant-core-net/src/server/reply.rs rename to distant-core-server/src/reply.rs diff --git a/distant-core-net/src/server/shutdown_timer.rs b/distant-core-server/src/shutdown_timer.rs similarity index 100% rename from distant-core-net/src/server/shutdown_timer.rs rename to distant-core-server/src/shutdown_timer.rs diff --git a/distant-core-net/src/server/state.rs b/distant-core-server/src/state.rs similarity index 100% rename from 
distant-core-net/src/server/state.rs rename to distant-core-server/src/state.rs diff --git a/distant-plugin/src/client.rs b/distant-plugin/src/client.rs index 400ef9f..f363d4f 100644 --- a/distant-plugin/src/client.rs +++ b/distant-plugin/src/client.rs @@ -1,5 +1,22 @@ +use std::io; +use std::sync::mpsc; + use async_trait::async_trait; +use distant_core_protocol::{Request, Response}; -/// +/// Full API for a distant-compatible client. #[async_trait] -pub trait Client {} +pub trait Client { + /// Sends a request without waiting for a response; this method is able to be used even + /// if the session's receiving line to the remote server has been severed. + async fn fire(&mut self, request: Request) -> io::Result<()>; + + /// Sends a request and returns a mailbox that can receive one or more responses, failing if + /// unable to send a request or if the session's receiving line to the remote server has + /// already been severed. + async fn mail(&mut self, request: Request) -> io::Result<mpsc::Receiver<Response>>; + + /// Sends a request and waits for a response, failing if unable to send a request or if + /// the session's receiving line to the remote server has already been severed. + async fn send(&mut self, request: Request) -> io::Result<Response>; +} diff --git a/distant-plugin/src/handlers.rs b/distant-plugin/src/handlers.rs index 291ccfb..d220309 100644 --- a/distant-plugin/src/handlers.rs +++ b/distant-plugin/src/handlers.rs @@ -4,6 +4,7 @@ use std::io; use async_trait::async_trait; use distant_core_auth::Authenticator; +use crate::client::Client; use crate::common::{Destination, Map}; /// Boxed [`LaunchHandler`]. @@ -96,21 +97,21 @@ pub trait ConnectHandler: Send + Sync { destination: &Destination, options: &Map, authenticator: &mut dyn Authenticator, - ) -> io::Result; + ) -> io::Result<Box<dyn Client>>; } #[async_trait] impl<F, R> ConnectHandler for F where F: Fn(&Destination, &Map, &mut dyn Authenticator) -> R + Send + Sync + 'static, - R: Future> + Send + 'static, + R: Future<Output = io::Result<Box<dyn Client>>> + Send + 'static, { async fn connect( &self, destination: &Destination, options: &Map, authenticator: &mut dyn Authenticator, - ) -> io::Result { + ) -> io::Result<Box<dyn Client>> { self(destination, options, authenticator).await } } @@ -156,10 +157,10 @@ macro_rules! boxed_connect_handler { #[cfg(test)] mod tests { + use distant_core_auth::*; use test_log::test; use super::*; - use crate::common::FramedTransport; #[inline] fn test_destination() -> Destination { @@ -171,9 +172,48 @@ mod tests { Map::default() } + /// Creates an authenticator that does nothing. #[inline] fn test_authenticator() -> impl Authenticator { - FramedTransport::pair(1).0 + struct __TestAuthenticator; + + impl Authenticator for __TestAuthenticator { + async fn initialize( + &mut self, + initialization: Initialization, + ) -> io::Result<InitializationResponse> { + unimplemented!() + } + + async fn challenge(&mut self, challenge: Challenge) -> io::Result<ChallengeResponse> { + unimplemented!() + } + + async fn verify( + &mut self, + verification: Verification, + ) -> io::Result<VerificationResponse> { + unimplemented!() + } + + async fn info(&mut self, info: Info) -> io::Result<()> { + unimplemented!() + } + + async fn error(&mut self, error: Error) -> io::Result<()> { + unimplemented!() + } + + async fn start_method(&mut self, start_method: StartMethod) -> io::Result<()> { + unimplemented!() + } + + async fn finished(&mut self) -> io::Result<()> { + unimplemented!() + } + } + + __TestAuthenticator } #[test(tokio::test)]
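
For orientation, here is a minimal plugin-side sketch of the revised contract. It is illustrative only and not part of the patch: it assumes the `Client` trait and the new `ConnectHandler::connect` signature introduced above, that `connect` is the trait's only required method, that the code lives inside `distant-plugin`, and that `mail` hands back an `mpsc::Receiver<Response>` (inferred from the `use std::sync::mpsc` import rather than stated in the patch); `StubClient` and `StubConnectHandler` are hypothetical names.

use std::io;
use std::sync::mpsc;

use async_trait::async_trait;
use distant_core_auth::Authenticator;
use distant_core_protocol::{Request, Response};

use crate::client::Client;
use crate::common::{Destination, Map};
use crate::handlers::ConnectHandler;

/// Hypothetical client whose operations all report that no connection is available.
struct StubClient;

#[async_trait]
impl Client for StubClient {
    async fn fire(&mut self, _request: Request) -> io::Result<()> {
        Err(io::Error::new(io::ErrorKind::NotConnected, "stub client"))
    }

    async fn mail(&mut self, _request: Request) -> io::Result<mpsc::Receiver<Response>> {
        Err(io::Error::new(io::ErrorKind::NotConnected, "stub client"))
    }

    async fn send(&mut self, _request: Request) -> io::Result<Response> {
        Err(io::Error::new(io::ErrorKind::NotConnected, "stub client"))
    }
}

/// Hypothetical handler that skips authentication and hands back the stub client.
struct StubConnectHandler;

#[async_trait]
impl ConnectHandler for StubConnectHandler {
    async fn connect(
        &self,
        _destination: &Destination,
        _options: &Map,
        _authenticator: &mut dyn Authenticator,
    ) -> io::Result<Box<dyn Client>> {
        Ok(Box::new(StubClient))
    }
}
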