Server builds -- tests do not

pull/146/head
Chip Senkbeil 2 years ago
parent e576207ca6
commit 569661cd04
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

1
Cargo.lock generated

@ -810,6 +810,7 @@ dependencies = [
"bytes",
"chacha20poly1305",
"derive_more",
"dyn-clone",
"flate2",
"hex",
"hkdf",

@ -16,6 +16,7 @@ async-trait = "0.1.57"
bytes = "1.2.1"
chacha20poly1305 = "0.10.0"
derive_more = { version = "0.99.17", default-features = false, features = ["as_mut", "as_ref", "deref", "deref_mut", "display", "from", "error", "into", "into_iterator", "is_variant", "try_into"] }
dyn-clone = "1.0.9"
flate2 = "1.0.24"
hex = "0.4.3"
hkdf = "0.12.3"

@ -1,5 +1,7 @@
use crate::{BoxedCodec, FramedTransport};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::io;
mod config;
pub use config::*;
@ -39,11 +41,14 @@ pub trait Server: Send {
ServerConfig::default()
}
/// Invoked immediately on server start, being provided the raw listener to use (untyped
/// transport), and returning the listener when ready to start (enabling servers that need to
/// tweak a listener to do so)
/* async fn on_start(&mut self, listener: L) -> Box<dyn Listener<Output = >> {
} */
/// Invoked to facilitate a handshake between server and client upon establishing a connection,
/// returning an updated [`FramedTransport`] once the handshake is complete
async fn on_handshake<T: Send>(
&self,
transport: FramedTransport<T, BoxedCodec>,
) -> io::Result<FramedTransport<T, BoxedCodec>> {
Ok(transport)
}
/// Invoked upon a new connection becoming established, which provides a mutable reference to
/// the data created for the connection. This can be useful in performing some additional

@ -27,7 +27,7 @@ impl ServerConnection {
/// Returns true if connection is still processing incoming or outgoing messages
pub fn is_active(&self) -> bool {
self.reader_task.is_some() && !self.reader_task.as_ref().unwrap().is_finished()
self.task.is_some() && !self.task.as_ref().unwrap().is_finished()
}
/// Aborts the connection

@ -1,7 +1,7 @@
use crate::{
utils::Timer, ConnectionId, FramedTransport, GenericServerRef, Interest, Listener, Response,
Server, ServerConnection, ServerCtx, ServerRef, ServerReply, ServerState, Shutdown, Transport,
UntypedRequest,
utils::Timer, ConnectionId, FramedTransport, GenericServerRef, Interest, Listener, PlainCodec,
Response, Server, ServerConnection, ServerCtx, ServerRef, ServerReply, ServerState, Shutdown,
Transport, UntypedRequest,
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
@ -40,7 +40,7 @@ pub trait ServerExt {
fn start<L>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
where
L: Listener + 'static,
L::Output: Transport + Send + 'static;
L::Output: Transport + Send + Sync + 'static;
}
impl<S> ServerExt for S
@ -56,7 +56,7 @@ where
fn start<L>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
where
L: Listener + 'static,
L::Output: Transport + Send + 'static,
L::Output: Transport + Send + Sync + 'static,
{
let server = Arc::new(self);
let state = Arc::new(ServerState::new());
@ -74,7 +74,7 @@ where
S::Response: Serialize + Send + 'static,
S::LocalData: Default + Send + Sync + 'static,
L: Listener + 'static,
L::Output: Transport + Send + 'static,
L::Output: Transport + Send + Sync + 'static,
{
// Grab a copy of our server's configuration so we can leverage it below
let config = server.config();
@ -194,7 +194,7 @@ where
S::Request: DeserializeOwned + Send + Sync + 'static,
S::Response: Serialize + Send + 'static,
D: Default + Send + Sync + 'static,
T: Transport + Send + 'static,
T: Transport + Send + Sync + 'static,
{
pub fn spawn(self) -> JoinHandle<()> {
tokio::spawn(self.run())
@ -206,13 +206,21 @@ where
// Construct a queue of outgoing responses
let (tx, mut rx) = mpsc::channel::<Response<S::Response>>(1);
// TODO: We should perform a handshake here to determine which codec(s) to use in
// collaboration with the client
let mut transport = FramedTransport::new(self.transport);
// Perform a handshake to ensure that the connection is properly established
let mut transport = match self
.server
.on_handshake(FramedTransport::new(self.transport, Box::new(PlainCodec)))
.await
{
Ok(transport) => transport,
Err(x) => {
error!("[Conn {connection_id}] Handshake failed: {x}");
return;
}
};
loop {
let ready = self
.transport
let ready = transport
.ready(Interest::READABLE | Interest::WRITABLE)
.await
.expect("[Conn {connection_id}] Failed to examine ready state");

@ -1,4 +1,4 @@
use crate::{Codec, MappedListener, PortRange, Server, ServerExt, TcpListener, TcpServerRef};
use crate::{PortRange, Server, ServerExt, TcpListener, TcpServerRef};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, net::IpAddr};
@ -11,10 +11,9 @@ pub trait TcpServerExt {
type Response;
/// Start a new server using the provided listener
async fn start<P, C>(self, addr: IpAddr, port: P, codec: C) -> io::Result<TcpServerRef>
async fn start<P>(self, addr: IpAddr, port: P) -> io::Result<TcpServerRef>
where
P: Into<PortRange> + Send,
C: Codec + Send + Sync + 'static;
P: Into<PortRange> + Send;
}
#[async_trait]
@ -28,18 +27,12 @@ where
type Request = S::Request;
type Response = S::Response;
async fn start<P, C>(self, addr: IpAddr, port: P, codec: C) -> io::Result<TcpServerRef>
async fn start<P>(self, addr: IpAddr, port: P) -> io::Result<TcpServerRef>
where
P: Into<PortRange> + Send,
C: Codec + Send + Sync + 'static,
{
let listener = TcpListener::bind(addr, port).await?;
let port = listener.port();
let listener = MappedListener::new(listener, move |transport| {
let transport = FramedTransport::new(transport, codec.clone());
transport.into_split()
});
let inner = ServerExt::start(self, listener)?;
Ok(TcpServerRef { addr, port, inner })
}
@ -70,10 +63,9 @@ mod tests {
#[tokio::test]
async fn should_invoke_handler_upon_receiving_a_request() {
let server =
TcpServerExt::start(TestServer, IpAddr::V6(Ipv6Addr::LOCALHOST), 0, PlainCodec)
.await
.expect("Failed to start TCP server");
let server = TcpServerExt::start(TestServer, IpAddr::V6(Ipv6Addr::LOCALHOST), 0)
.await
.expect("Failed to start TCP server");
let mut client: Client<String, String> = Client::connect(
SocketAddr::from((server.ip_addr(), server.port())),

@ -1,4 +1,4 @@
use crate::{Codec, MappedListener, Server, ServerExt, UnixSocketListener, UnixSocketServerRef};
use crate::{Server, ServerExt, UnixSocketListener, UnixSocketServerRef};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, path::Path};
@ -11,10 +11,9 @@ pub trait UnixSocketServerExt {
type Response;
/// Start a new server using the provided listener
async fn start<P, C>(self, path: P, codec: C) -> io::Result<UnixSocketServerRef>
async fn start<P>(self, path: P) -> io::Result<UnixSocketServerRef>
where
P: AsRef<Path> + Send,
C: Codec + Send + Sync + 'static;
P: AsRef<Path> + Send;
}
#[async_trait]
@ -28,19 +27,14 @@ where
type Request = S::Request;
type Response = S::Response;
async fn start<P, C>(self, path: P, codec: C) -> io::Result<UnixSocketServerRef>
async fn start<P>(self, path: P) -> io::Result<UnixSocketServerRef>
where
P: AsRef<Path> + Send,
C: Codec + Send + Sync + 'static,
{
let path = path.as_ref();
let listener = UnixSocketListener::bind(path).await?;
let path = listener.path().to_path_buf();
let listener = MappedListener::new(listener, move |transport| {
let transport = FramedTransport::new(transport, codec.clone());
transport.into_split()
});
let inner = ServerExt::start(self, listener)?;
Ok(UnixSocketServerRef { path, inner })
}
@ -77,7 +71,7 @@ mod tests {
.path()
.to_path_buf();
let server = UnixSocketServerExt::start(TestServer, path, PlainCodec)
let server = UnixSocketServerExt::start(TestServer, path)
.await
.expect("Failed to start Unix socket server");

@ -1,7 +1,4 @@
use crate::{
Codec, MappedListener, Server, ServerExt, WindowsPipeListener,
WindowsPipeServerRef,
};
use crate::{Server, ServerExt, WindowsPipeListener, WindowsPipeServerRef};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{
@ -17,21 +14,19 @@ pub trait WindowsPipeServerExt {
type Response;
/// Start a new server at the specified address using the given codec
async fn start<A, C>(self, addr: A, codec: C) -> io::Result<WindowsPipeServerRef>
async fn start<A>(self, addr: A) -> io::Result<WindowsPipeServerRef>
where
A: AsRef<OsStr> + Send,
C: Codec + Send + Sync + 'static;
A: AsRef<OsStr> + Send;
/// Start a new server at the specified address via `\\.\pipe\{name}` using the given codec
async fn start_local<N, C>(self, name: N, codec: C) -> io::Result<WindowsPipeServerRef>
async fn start_local<N>(self, name: N) -> io::Result<WindowsPipeServerRef>
where
Self: Sized,
N: AsRef<OsStr> + Send,
C: Codec + Send + Sync + 'static,
{
let mut addr = OsString::from(r"\\.\pipe\");
addr.push(name.as_ref());
self.start(addr, codec).await
self.start(addr).await
}
}
@ -46,19 +41,14 @@ where
type Request = Req;
type Response = Res;
async fn start<A, C>(self, addr: A, codec: C) -> io::Result<WindowsPipeServerRef>
async fn start<A>(self, addr: A) -> io::Result<WindowsPipeServerRef>
where
A: AsRef<OsStr> + Send,
C: Codec + Send + Sync + 'static,
{
let a = addr.as_ref();
let listener = WindowsPipeListener::bind(a)?;
let addr = listener.addr().to_os_string();
let listener = MappedListener::new(listener, move |transport| {
let transport = FramedTransport::new(transport, codec.clone());
transport.into_split()
});
let inner = ServerExt::start(self, listener)?;
Ok(WindowsPipeServerRef { addr, inner })
}
@ -91,7 +81,6 @@ mod tests {
let server = WindowsPipeServerExt::start_local(
TestServer,
format!("test_pip_{}", rand::random::<usize>()),
PlainCodec,
)
.await
.expect("Failed to start Windows pipe server");

@ -15,20 +15,16 @@ const DEFAULT_CAPACITY: usize = 8 * 1024;
/// Represents a wrapper around a [`Transport`] that reads and writes using frames defined by a
/// [`Codec`]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FramedTransport<T, C> {
pub struct FramedTransport<T, U> {
inner: T,
codec: C,
codec: U,
incoming: BytesMut,
outgoing: BytesMut,
}
impl<T, C> FramedTransport<T, C>
where
T: Transport,
C: Codec,
{
pub fn new(inner: T, codec: C) -> Self {
impl<T, U> FramedTransport<T, U> {
pub fn new(inner: T, codec: U) -> Self {
Self {
inner,
codec,
@ -37,6 +33,84 @@ where
}
}
/// Consumes the current transport, replacing it's codec with the provided codec,
/// and returning it. Note that any bytes in the incoming or outgoing buffers will
/// remain in the transport, meaning that this can cause corruption if the bytes
/// in the buffers do not match the new codec.
///
/// For safety, use [`clear`] to wipe the buffers before further use.
///
/// [`clear`]: FramedTransport::clear
pub fn with_codec<C>(self, codec: C) -> FramedTransport<T, C> {
FramedTransport {
inner: self.inner,
codec,
incoming: self.incoming,
outgoing: self.outgoing,
}
}
/// Clears the internal buffers used by the transport
pub fn clear(&mut self) {
self.incoming.clear();
self.outgoing.clear();
}
}
impl<T, U> FramedTransport<T, U>
where
T: Transport,
{
/// Waits for the transport to be ready based on the given interest, returning the ready status
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
Transport::ready(&self.inner, interest).await
}
/// Waits for the transport to be readable to follow up with `try_read`
pub async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be writeable to follow up with `try_write`
pub async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Attempts to flush any remaining bytes in the outgoing queue.
///
/// This is accomplished by continually calling the inner transport's `try_write`. If 0 is
/// returned from a call to `try_write`, this will fail with [`ErrorKind::WriteZero`].
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
pub fn try_flush(&mut self) -> io::Result<()> {
// Continue to send from the outgoing buffer until we either finish or fail
while !self.outgoing.is_empty() {
match self.inner.try_write(self.outgoing.as_ref()) {
// Getting 0 bytes on write indicates the channel has closed
Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero)),
// Successful write will advance the outgoing buffer
Ok(n) => self.outgoing.advance(n),
// Any error (including WouldBlock) will get bubbled up
Err(x) => return Err(x),
}
}
Ok(())
}
}
impl<T, U> FramedTransport<T, U>
where
T: Transport,
U: Codec,
{
/// Reads a frame of bytes by using the [`Codec`] tied to this transport. Returns
/// `Ok(Some(frame))` upon reading a frame, or `Ok(None)` if the underlying transport has
/// closed.
@ -99,57 +173,13 @@ where
// Attempt to write everything in our queue
self.try_flush()
}
/// Attempts to flush any remaining bytes in the outgoing queue.
///
/// This is accomplished by continually calling the inner transport's `try_write`. If 0 is
/// returned from a call to `try_write`, this will fail with [`ErrorKind::WriteZero`].
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
pub fn try_flush(&mut self) -> io::Result<()> {
// Continue to send from the outgoing buffer until we either finish or fail
while !self.outgoing.is_empty() {
match self.inner.try_write(self.outgoing.as_ref()) {
// Getting 0 bytes on write indicates the channel has closed
Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero)),
// Successful write will advance the outgoing buffer
Ok(n) => self.outgoing.advance(n),
// Any error (including WouldBlock) will get bubbled up
Err(x) => return Err(x),
}
}
Ok(())
}
/// Waits for the transport to be ready based on the given interest, returning the ready status
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
Transport::ready(&self.inner, interest).await
}
/// Waits for the transport to be readable to follow up with `try_read`
pub async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be writeable to follow up with `try_write`
pub async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
}
#[async_trait]
impl<T, C> Reconnectable for FramedTransport<T, C>
impl<T, U> Reconnectable for FramedTransport<T, U>
where
T: Transport + Send,
C: Codec + Send,
U: Codec + Send,
{
async fn reconnect(&mut self) -> io::Result<()> {
Reconnectable::reconnect(&mut self.inner).await

@ -1,4 +1,5 @@
use super::Frame;
use dyn_clone::DynClone;
use std::io;
mod chain;
@ -13,10 +14,13 @@ pub use plain::*;
pub use predicate::*;
pub use xchacha20poly1305::*;
/// Represents a [`Box`]ed version of [`Codec`]
pub type BoxedCodec = Box<dyn Codec + Send + Sync>;
/// Represents abstraction that implements specific encoder and decoder logic to transform an
/// arbitrary collection of bytes. This can be used to encrypt and authenticate bytes sent and
/// received by transports.
pub trait Codec: Clone {
pub trait Codec: DynClone {
/// Encodes a frame's item
fn encode<'a>(&mut self, frame: Frame<'a>) -> io::Result<Frame<'a>>;
@ -24,6 +28,31 @@ pub trait Codec: Clone {
fn decode<'a>(&mut self, frame: Frame<'a>) -> io::Result<Frame<'a>>;
}
macro_rules! impl_traits {
($($x:tt)+) => {
impl Clone for Box<dyn $($x)+> {
fn clone(&self) -> Self {
dyn_clone::clone_box(&**self)
}
}
impl Codec for Box<dyn $($x)+> {
fn encode<'a>(&mut self, frame: Frame<'a>) -> io::Result<Frame<'a>> {
Codec::encode(self.as_mut(), frame)
}
fn decode<'a>(&mut self, frame: Frame<'a>) -> io::Result<Frame<'a>> {
Codec::decode(self.as_mut(), frame)
}
}
};
}
impl_traits!(Codec);
impl_traits!(Codec + Send);
impl_traits!(Codec + Sync);
impl_traits!(Codec + Send + Sync);
/// Interface that provides extensions to the codec interface
pub trait CodecExt {
/// Chains this codec with another codec
@ -33,10 +62,7 @@ pub trait CodecExt {
}
impl<C: Codec> CodecExt for C {
fn chain<T>(self, codec: T) -> ChainCodec<Self, T>
where
Self: Sized,
{
fn chain<T>(self, codec: T) -> ChainCodec<Self, T> {
ChainCodec::new(self, codec)
}
}

@ -45,8 +45,8 @@ impl<T, U> ChainCodec<T, U> {
impl<T, U> Codec for ChainCodec<T, U>
where
T: Codec,
U: Codec,
T: Codec + Clone,
U: Codec + Clone,
{
fn encode<'a>(&mut self, frame: Frame<'a>) -> io::Result<Frame<'a>> {
Codec::encode(&mut self.left, frame).and_then(|frame| Codec::encode(&mut self.right, frame))

@ -62,8 +62,8 @@ where
impl<T, U, P> Codec for PredicateCodec<T, U, P>
where
T: Codec,
U: Codec,
T: Codec + Clone,
U: Codec + Clone,
P: Fn(&Frame) -> bool,
{
fn encode<'a>(&mut self, frame: Frame<'a>) -> io::Result<Frame<'a>> {

Loading…
Cancel
Save