Initial commit

pull/146/head
Chip Senkbeil 2 years ago
parent 7d1b3ba6f0
commit 4e42d6cee8
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

2
Cargo.lock generated

@ -795,7 +795,6 @@ dependencies = [
"bytes",
"chacha20poly1305",
"derive_more",
"futures",
"hex",
"hkdf",
"log",
@ -809,7 +808,6 @@ dependencies = [
"sha2 0.10.2",
"tempfile",
"tokio",
"tokio-util",
]
[[package]]

@ -411,7 +411,7 @@ mod tests {
use super::*;
use crate::data::{DistantRequestData, DistantResponseData};
use distant_net::{
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Request, Response,
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Request, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::{future::Future, time::Duration};
@ -421,7 +421,7 @@ mod tests {
// Configures an lsp process with a means to send & receive data from outside
async fn spawn_lsp_process() -> (
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
RemoteLspProcess,
) {
let (mut t1, t2) = FramedTransport::pair(100);

@ -609,13 +609,13 @@ mod tests {
data::{Error, ErrorKind},
};
use distant_net::{
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response,
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::time::Duration;
fn make_session() -> (
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
DistantClient,
) {
let (t1, t2) = FramedTransport::pair(100);

@ -197,14 +197,14 @@ mod tests {
};
use crate::DistantClient;
use distant_net::{
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response,
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
fn make_session() -> (
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
DistantClient,
) {
let (t1, t2) = FramedTransport::pair(100);

@ -185,14 +185,14 @@ mod tests {
use crate::data::ChangeKind;
use crate::DistantClient;
use distant_net::{
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response,
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::sync::Arc;
use tokio::sync::Mutex;
fn make_session() -> (
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
DistantClient,
) {
let (t1, t2) = FramedTransport::pair(100);

@ -6,8 +6,8 @@ use crate::{
DistantChannel, DistantClient, DistantMsg, DistantRequestData, DistantResponseData, Map,
};
use distant_net::{
router, Auth, AuthServer, Client, IntoSplit, MpscTransport, OneshotListener, Request, Response,
ServerExt, ServerRef, UntypedTransportRead, UntypedTransportWrite,
router, Auth, AuthServer, Client, InmemoryTypedTransport, IntoSplit, OneshotListener, Request,
Response, ServerExt, ServerRef, UntypedTransportRead, UntypedTransportWrite,
};
use log::*;
use std::{
@ -44,7 +44,7 @@ impl Drop for DistantManagerClient {
/// Represents a raw channel between a manager client and some remote server
pub struct RawDistantChannel {
pub transport: MpscTransport<
pub transport: InmemoryTypedTransport<
Request<DistantMsg<DistantRequestData>>,
Response<DistantMsg<DistantResponseData>>,
>,
@ -60,7 +60,7 @@ impl RawDistantChannel {
}
impl Deref for RawDistantChannel {
type Target = MpscTransport<
type Target = InmemoryTypedTransport<
Request<DistantMsg<DistantRequestData>>,
Response<DistantMsg<DistantResponseData>>,
>;
@ -251,7 +251,7 @@ impl DistantManagerClient {
// Spawn reader and writer tasks to forward requests and replies
// using our opened channel
let (t1, t2) = MpscTransport::pair(1);
let (t1, t2) = InmemoryTypedTransport::pair(1);
let (mut writer, mut reader) = t1.into_split();
let mailbox_task = tokio::spawn(async move {
use distant_net::TypedAsyncWrite;
@ -378,12 +378,13 @@ mod tests {
use super::*;
use crate::data::{Error, ErrorKind};
use distant_net::{
FramedTransport, InmemoryTransport, PlainCodec, UntypedTransportRead, UntypedTransportWrite,
FramedTransport, InmemoryRawTransport, PlainCodec, UntypedTransportRead,
UntypedTransportWrite,
};
fn setup() -> (
DistantManagerClient,
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
) {
let (t1, t2) = FramedTransport::pair(100);
let client =

@ -426,8 +426,8 @@ impl Server for DistantManager {
mod tests {
use super::*;
use distant_net::{
AuthClient, FramedTransport, HeapAuthServer, InmemoryTransport, IntoSplit, MappedListener,
OneshotListener, PlainCodec, ServerExt, ServerRef,
AuthClient, FramedTransport, HeapAuthServer, InmemoryRawTransport, IntoSplit,
MappedListener, OneshotListener, PlainCodec, ServerExt, ServerRef,
};
/// Create a new server, bypassing the start loop
@ -470,7 +470,7 @@ mod tests {
/// Creates a writer & reader with a connected transport
fn setup_distant_writer_reader() -> (
(BoxedDistantWriter, BoxedDistantReader),
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
) {
let (t1, t2) = FramedTransport::pair(1);
let (writer, reader) = t1.into_split();

@ -1,5 +1,5 @@
use distant_core::{
net::{FramedTransport, InmemoryTransport, IntoSplit, OneshotListener, PlainCodec},
net::{FramedTransport, InmemoryRawTransport, IntoSplit, OneshotListener, PlainCodec},
BoxedDistantReader, BoxedDistantWriter, Destination, DistantApiServer, DistantChannelExt,
DistantManager, DistantManagerClient, DistantManagerClientConfig, DistantManagerConfig, Map,
};
@ -8,10 +8,10 @@ use std::io;
/// Creates a client transport and server listener for our tests
/// that are connected together
async fn setup() -> (
FramedTransport<InmemoryTransport, PlainCodec>,
OneshotListener<FramedTransport<InmemoryTransport, PlainCodec>>,
FramedTransport<InmemoryRawTransport, PlainCodec>,
OneshotListener<FramedTransport<InmemoryRawTransport, PlainCodec>>,
) {
let (t1, t2) = InmemoryTransport::pair(100);
let (t1, t2) = InmemoryRawTransport::pair(100);
let listener = OneshotListener::from_value(FramedTransport::new(t2, PlainCodec));
let transport = FramedTransport::new(t1, PlainCodec);

@ -16,7 +16,6 @@ async-trait = "0.1.57"
bytes = "1.2.1"
chacha20poly1305 = "0.10.0"
derive_more = { version = "0.99.17", default-features = false, features = ["as_mut", "as_ref", "deref", "deref_mut", "display", "from", "error", "into", "into_iterator", "is_variant", "try_into"] }
futures = "0.3.21"
hex = "0.4.3"
hkdf = "0.12.3"
log = "0.4.17"
@ -28,7 +27,6 @@ sha2 = "0.10.2"
serde = { version = "1.0.142", features = ["derive"] }
serde_bytes = "0.11.7"
tokio = { version = "1.20.1", features = ["full"] }
tokio-util = { version = "0.7.3", features = ["codec"] }
# Optional dependencies based on features
schemars = { version = "0.8.10", optional = true }

@ -199,7 +199,7 @@ where
mod tests {
use super::*;
use crate::{
IntoSplit, MpscListener, MpscTransport, Request, Response, ServerExt, ServerRef,
InmemoryTypedTransport, IntoSplit, MpscListener, Request, Response, ServerExt, ServerRef,
TypedAsyncRead, TypedAsyncWrite,
};
use tokio::sync::mpsc;
@ -621,7 +621,7 @@ mod tests {
on_info: InfoFn,
on_error: ErrorFn,
) -> io::Result<(
MpscTransport<Request<Auth>, Response<Auth>>,
InmemoryTypedTransport<Request<Auth>, Response<Auth>>,
Box<dyn ServerRef>,
)>
where
@ -642,7 +642,8 @@ mod tests {
let (tx, listener) = MpscListener::channel(100);
// Make bounded transport pair and send off one of them to act as our connection
let (transport, connection) = MpscTransport::<Request<Auth>, Response<Auth>>::pair(100);
let (transport, connection) =
InmemoryTypedTransport::<Request<Auth>, Response<Auth>>::pair(100);
tx.send(connection.into_split())
.await
.expect("Failed to feed listener a connection");

@ -1,38 +0,0 @@
use bytes::BytesMut;
use std::io;
use tokio_util::codec::{Decoder, Encoder};
/// Represents abstraction of a codec that implements specific encoder and decoder for distant
pub trait Codec:
for<'a> Encoder<&'a [u8], Error = io::Error> + Decoder<Item = Vec<u8>, Error = io::Error> + Clone
{
fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> io::Result<()>;
fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Vec<u8>>>;
}
macro_rules! impl_traits_for_codec {
($type:ident) => {
impl<'a> tokio_util::codec::Encoder<&'a [u8]> for $type {
type Error = io::Error;
fn encode(&mut self, item: &'a [u8], dst: &mut BytesMut) -> Result<(), Self::Error> {
Codec::encode(self, item, dst)
}
}
impl tokio_util::codec::Decoder for $type {
type Item = Vec<u8>;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Codec::decode(self, src)
}
}
};
}
mod plain;
pub use plain::PlainCodec;
mod xchacha20poly1305;
pub use xchacha20poly1305::XChaCha20Poly1305Codec;

@ -242,8 +242,8 @@ where
mod tests {
use super::*;
use crate::{
IntoSplit, MpscListener, MpscTransport, MpscTransportReadHalf, MpscTransportWriteHalf,
ServerConfig,
InmemoryTypedTransport, IntoSplit, MpscListener, MpscTransportReadHalf,
MpscTransportWriteHalf, ServerConfig,
};
use async_trait::async_trait;
use std::time::Duration;
@ -289,7 +289,7 @@ mod tests {
// Make bounded transport pair and send off one of them to act as our connection
let (mut transport, connection) =
MpscTransport::<Request<u16>, Response<String>>::pair(100);
InmemoryTypedTransport::<Request<u16>, Response<String>>::pair(100);
tx.send(connection.into_split())
.await
.expect("Failed to feed listener a connection");
@ -331,7 +331,8 @@ mod tests {
let (tx, listener) = make_listener(100);
// Make bounded transport pair and send off one of them to act as our connection
let (transport, connection) = MpscTransport::<Request<u16>, Response<String>>::pair(100);
let (transport, connection) =
InmemoryTypedTransport::<Request<u16>, Response<String>>::pair(100);
tx.send(connection.into_split())
.await
.expect("Failed to feed listener a connection");
@ -359,7 +360,8 @@ mod tests {
let (tx, listener) = make_listener(100);
// Make bounded transport pair and send off one of them to act as our connection
let (_transport, connection) = MpscTransport::<Request<u16>, Response<String>>::pair(100);
let (_transport, connection) =
InmemoryTypedTransport::<Request<u16>, Response<String>>::pair(100);
tx.send(connection.into_split())
.await
.expect("Failed to feed listener a connection");
@ -383,7 +385,8 @@ mod tests {
let (tx, listener) = make_listener(100);
// Make bounded transport pair and send off one of them to act as our connection
let (_transport, connection) = MpscTransport::<Request<u16>, Response<String>>::pair(100);
let (_transport, connection) =
InmemoryTypedTransport::<Request<u16>, Response<String>>::pair(100);
tx.send(connection.into_split())
.await
.expect("Failed to feed listener a connection");

@ -1,112 +1,22 @@
use async_trait::async_trait;
use std::{io, marker::Unpin};
use tokio::io::{AsyncRead, AsyncWrite};
/// Interface to split something into writing and reading halves
pub trait IntoSplit {
type Write;
type Read;
fn into_split(self) -> (Self::Write, Self::Read);
}
impl<W, R> IntoSplit for (W, R) {
type Write = W;
type Read = R;
fn into_split(self) -> (Self::Write, Self::Read) {
(self.0, self.1)
}
}
/// Interface representing a transport of raw bytes into and out of the system
pub trait RawTransport: RawTransportRead + RawTransportWrite {}
/// Interface representing a transport of raw bytes into the system
pub trait RawTransportRead: AsyncRead + Send + Unpin {}
/// Interface representing a transport of raw bytes out of the system
pub trait RawTransportWrite: AsyncWrite + Send + Unpin {}
/// Interface representing a transport of typed data into and out of the system
pub trait TypedTransport<W, R>: TypedAsyncRead<R> + TypedAsyncWrite<W> {}
/// Interface to read some structured data asynchronously
#[async_trait]
pub trait TypedAsyncRead<T> {
/// Reads some data, returning `Some(T)` if available or `None` if the reader
/// has closed and no longer is providing data
async fn read(&mut self) -> io::Result<Option<T>>;
}
#[async_trait]
impl<W, R, T> TypedAsyncRead<T> for (W, R)
where
W: Send,
R: TypedAsyncRead<T> + Send,
{
async fn read(&mut self) -> io::Result<Option<T>> {
self.1.read().await
}
}
#[async_trait]
impl<T: Send> TypedAsyncRead<T> for Box<dyn TypedAsyncRead<T> + Send> {
async fn read(&mut self) -> io::Result<Option<T>> {
(**self).read().await
}
}
/// Interface to write some structured data asynchronously
#[async_trait]
pub trait TypedAsyncWrite<T> {
async fn write(&mut self, data: T) -> io::Result<()>;
}
#[async_trait]
impl<W, R, T> TypedAsyncWrite<T> for (W, R)
where
W: TypedAsyncWrite<T> + Send,
R: Send,
T: Send + 'static,
{
async fn write(&mut self, data: T) -> io::Result<()> {
self.0.write(data).await
}
}
#[async_trait]
impl<T: Send> TypedAsyncWrite<T> for Box<dyn TypedAsyncWrite<T> + Send> {
async fn write(&mut self, data: T) -> io::Result<()> {
(**self).write(data).await
}
}
use std::io;
mod router;
mod framed;
pub use framed::*;
mod inmemory;
pub use inmemory::*;
mod mpsc;
pub use mpsc::*;
mod raw;
pub use raw::*;
mod tcp;
pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use unix::*;
mod typed;
pub use typed::*;
mod untyped;
pub use untyped::*;
#[cfg(windows)]
mod windows;
pub use tokio::io::{Interest, Ready};
#[cfg(windows)]
pub use windows::*;
/// Interface representing a connection that is reconnectable
#[async_trait]
pub trait Reconnectable {
/// Attempts to reconnect an already-established connection
async fn reconnect(&mut self) -> io::Result<()>;
}

@ -1,215 +0,0 @@
use crate::{
utils, Codec, IntoSplit, RawTransport, RawTransportRead, RawTransportWrite, UntypedTransport,
UntypedTransportRead, UntypedTransportWrite,
};
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use std::io;
use tokio_util::codec::{Framed, FramedRead, FramedWrite};
#[cfg(test)]
mod test;
#[cfg(test)]
pub use test::*;
mod read;
pub use read::*;
mod write;
pub use write::*;
/// Represents [`TypedTransport`] of data across the network using frames in order to support
/// typed messages instead of arbitrary bytes being sent across the wire.
///
/// Note that this type does **not** implement [`RawTransport`] and instead acts as a wrapper
/// around a transport to provide a higher-level interface
#[derive(Debug)]
pub struct FramedTransport<T, C>(Framed<T, C>)
where
T: RawTransport,
C: Codec;
impl<T, C> FramedTransport<T, C>
where
T: RawTransport,
C: Codec,
{
/// Creates a new instance of the transport, wrapping the stream in a `Framed<T, XChaCha20Poly1305Codec>`
pub fn new(transport: T, codec: C) -> Self {
Self(Framed::new(transport, codec))
}
}
impl<T, C> UntypedTransport for FramedTransport<T, C>
where
T: RawTransport,
C: Codec + Send,
{
}
impl<T, C> IntoSplit for FramedTransport<T, C>
where
T: RawTransport + IntoSplit,
<T as IntoSplit>::Read: RawTransportRead,
<T as IntoSplit>::Write: RawTransportWrite,
C: Codec + Send,
{
type Read = FramedTransportReadHalf<<T as IntoSplit>::Read, C>;
type Write = FramedTransportWriteHalf<<T as IntoSplit>::Write, C>;
fn into_split(self) -> (Self::Write, Self::Read) {
let parts = self.0.into_parts();
let (write_half, read_half) = parts.io.into_split();
// Create our split read half and populate its buffer with existing contents
let mut f_read = FramedRead::new(read_half, parts.codec.clone());
*f_read.read_buffer_mut() = parts.read_buf;
// Create our split write half and populate its buffer with existing contents
let mut f_write = FramedWrite::new(write_half, parts.codec);
*f_write.write_buffer_mut() = parts.write_buf;
let read_half = FramedTransportReadHalf(f_read);
let write_half = FramedTransportWriteHalf(f_write);
(write_half, read_half)
}
}
#[async_trait]
impl<T, C> UntypedTransportWrite for FramedTransport<T, C>
where
T: RawTransport + Send,
C: Codec + Send,
{
async fn write<D>(&mut self, data: D) -> io::Result<()>
where
D: Serialize + Send + 'static,
{
// Serialize data into a byte stream
// NOTE: Cannot used packed implementation for now due to issues with deserialization
let data = utils::serialize_to_vec(&data)?;
// Use underlying codec to send data (may encrypt, sign, etc.)
self.0.send(&data).await
}
}
#[async_trait]
impl<T, C> UntypedTransportRead for FramedTransport<T, C>
where
T: RawTransport + Send,
C: Codec + Send,
{
async fn read<D>(&mut self) -> io::Result<Option<D>>
where
D: DeserializeOwned,
{
// Use underlying codec to receive data (may decrypt, validate, etc.)
if let Some(data) = self.0.next().await {
let data = data?;
// Deserialize byte stream into our expected type
match utils::deserialize_from_slice(&data) {
Ok(data) => Ok(Some(data)),
Err(x) => {
error!("Invalid data: {}", String::from_utf8_lossy(&data));
Err(x)
}
}
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{InmemoryTransport, PlainCodec};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TestData {
name: String,
value: usize,
}
#[tokio::test]
async fn send_should_convert_data_into_byte_stream_and_send_through_stream() {
let (_tx, mut rx, stream) = InmemoryTransport::make(1);
let mut transport = FramedTransport::new(stream, PlainCodec::new());
let data = TestData {
name: String::from("test"),
value: 123,
};
let bytes = utils::serialize_to_vec(&data).unwrap();
let len = (bytes.len() as u64).to_be_bytes();
let mut frame = Vec::new();
frame.extend(len.iter().copied());
frame.extend(bytes);
transport.write(data).await.unwrap();
let outgoing = rx.recv().await.unwrap();
assert_eq!(outgoing, frame);
}
#[tokio::test]
async fn receive_should_return_none_if_stream_is_closed() {
let (_, _, stream) = InmemoryTransport::make(1);
let mut transport = FramedTransport::new(stream, PlainCodec::new());
let result = transport.read::<TestData>().await;
match result {
Ok(None) => {}
x => panic!("Unexpected result: {:?}", x),
}
}
#[tokio::test]
async fn receive_should_fail_if_unable_to_convert_to_type() {
let (tx, _rx, stream) = InmemoryTransport::make(1);
let mut transport = FramedTransport::new(stream, PlainCodec::new());
#[derive(Serialize, Deserialize)]
struct OtherTestData(usize);
let data = OtherTestData(123);
let bytes = utils::serialize_to_vec(&data).unwrap();
let len = (bytes.len() as u64).to_be_bytes();
let mut frame = Vec::new();
frame.extend(len.iter().copied());
frame.extend(bytes);
tx.send(frame).await.unwrap();
let result = transport.read::<TestData>().await;
assert!(result.is_err(), "Unexpectedly succeeded")
}
#[tokio::test]
async fn receive_should_return_some_instance_of_type_when_coming_into_stream() {
let (tx, _rx, stream) = InmemoryTransport::make(1);
let mut transport = FramedTransport::new(stream, PlainCodec::new());
let data = TestData {
name: String::from("test"),
value: 123,
};
let bytes = utils::serialize_to_vec(&data).unwrap();
let len = (bytes.len() as u64).to_be_bytes();
let mut frame = Vec::new();
frame.extend(len.iter().copied());
frame.extend(bytes);
tx.send(frame).await.unwrap();
let received_data = transport.read::<TestData>().await.unwrap().unwrap();
assert_eq!(received_data, data);
}
}

@ -1,115 +0,0 @@
use crate::{transport::framed::utils, Codec, UntypedTransportRead};
use async_trait::async_trait;
use futures::StreamExt;
use log::*;
use serde::de::DeserializeOwned;
use std::io;
use tokio::io::AsyncRead;
use tokio_util::codec::FramedRead;
/// Represents a transport of inbound data from the network using frames in order to support
/// typed messages instead of arbitrary bytes being sent across the wire.
///
/// Note that this type does **not** implement [`AsyncRead`] and instead acts as a
/// wrapper to provide a higher-level interface
pub struct FramedTransportReadHalf<R, C>(pub(super) FramedRead<R, C>)
where
R: AsyncRead,
C: Codec;
#[async_trait]
impl<R, C> UntypedTransportRead for FramedTransportReadHalf<R, C>
where
R: AsyncRead + Send + Unpin,
C: Codec + Send,
{
async fn read<D>(&mut self) -> io::Result<Option<D>>
where
D: DeserializeOwned,
{
// Use underlying codec to receive data (may decrypt, validate, etc.)
if let Some(data) = self.0.next().await {
let data = data?;
// Deserialize byte stream into our expected type
match utils::deserialize_from_slice(&data) {
Ok(data) => Ok(Some(data)),
Err(x) => {
error!("Invalid data: {}", String::from_utf8_lossy(&data));
Err(x)
}
}
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{FramedTransport, InmemoryTransport, IntoSplit, PlainCodec};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TestData {
name: String,
value: usize,
}
#[tokio::test]
async fn receive_should_return_none_if_stream_is_closed() {
let (_, _, stream) = InmemoryTransport::make(1);
let transport = FramedTransport::new(stream, PlainCodec::new());
let (_, mut reader) = transport.into_split();
let result = reader.read::<TestData>().await;
match result {
Ok(None) => {}
x => panic!("Unexpected result: {:?}", x),
}
}
#[tokio::test]
async fn receive_should_fail_if_unable_to_convert_to_type() {
let (tx, _rx, stream) = InmemoryTransport::make(1);
let transport = FramedTransport::new(stream, PlainCodec::new());
let (_, mut reader) = transport.into_split();
#[derive(Serialize, Deserialize)]
struct OtherTestData(usize);
let data = OtherTestData(123);
let bytes = utils::serialize_to_vec(&data).unwrap();
let len = (bytes.len() as u64).to_be_bytes();
let mut frame = Vec::new();
frame.extend(len.iter().copied());
frame.extend(bytes);
tx.send(frame).await.unwrap();
let result = reader.read::<TestData>().await;
assert!(result.is_err(), "Unexpectedly succeeded");
}
#[tokio::test]
async fn receive_should_return_some_instance_of_type_when_coming_into_stream() {
let (tx, _rx, stream) = InmemoryTransport::make(1);
let transport = FramedTransport::new(stream, PlainCodec::new());
let (_, mut reader) = transport.into_split();
let data = TestData {
name: String::from("test"),
value: 123,
};
let bytes = utils::serialize_to_vec(&data).unwrap();
let len = (bytes.len() as u64).to_be_bytes();
let mut frame = Vec::new();
frame.extend(len.iter().copied());
frame.extend(bytes);
tx.send(frame).await.unwrap();
let received_data = reader.read::<TestData>().await.unwrap().unwrap();
assert_eq!(received_data, data);
}
}

@ -1,12 +0,0 @@
use crate::{FramedTransport, InmemoryTransport, PlainCodec};
#[cfg(test)]
impl FramedTransport<InmemoryTransport, PlainCodec> {
/// Makes a connected pair of framed inmemory transports with plain codec for testing purposes
pub fn make_test_pair() -> (
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
) {
Self::pair(100)
}
}

@ -1,72 +0,0 @@
use crate::{transport::framed::utils, Codec, UntypedTransportWrite};
use async_trait::async_trait;
use futures::SinkExt;
use serde::Serialize;
use std::io;
use tokio::io::AsyncWrite;
use tokio_util::codec::FramedWrite;
/// Represents a transport of outbound data to the network using frames in order to support
/// typed messages instead of arbitrary bytes being sent across the wire.
///
/// Note that this type does **not** implement [`AsyncWrite`] and instead acts as a
/// wrapper to provide a higher-level interface
pub struct FramedTransportWriteHalf<W, C>(pub(super) FramedWrite<W, C>)
where
W: AsyncWrite,
C: Codec;
#[async_trait]
impl<W, C> UntypedTransportWrite for FramedTransportWriteHalf<W, C>
where
W: AsyncWrite + Send + Unpin,
C: Codec + Send,
{
async fn write<D>(&mut self, data: D) -> io::Result<()>
where
D: Serialize + Send + 'static,
{
// Serialize data into a byte stream
// NOTE: Cannot used packed implementation for now due to issues with deserialization
let data = utils::serialize_to_vec(&data)?;
// Use underlying codec to send data (may encrypt, sign, etc.)
self.0.send(&data).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{FramedTransport, InmemoryTransport, IntoSplit, PlainCodec};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TestData {
name: String,
value: usize,
}
#[tokio::test]
async fn send_should_convert_data_into_byte_stream_and_send_through_stream() {
let (_tx, mut rx, stream) = InmemoryTransport::make(1);
let transport = FramedTransport::new(stream, PlainCodec::new());
let (mut wh, _) = transport.into_split();
let data = TestData {
name: String::from("test"),
value: 123,
};
let bytes = utils::serialize_to_vec(&data).unwrap();
let len = (bytes.len() as u64).to_be_bytes();
let mut frame = Vec::new();
frame.extend(len.iter().copied());
frame.extend(bytes);
wh.write(data).await.unwrap();
let outgoing = rx.recv().await.unwrap();
assert_eq!(outgoing, frame);
}
}

@ -1,225 +0,0 @@
use crate::{
FramedTransport, IntoSplit, PlainCodec, RawTransport, RawTransportRead, RawTransportWrite,
};
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::mpsc,
};
mod read;
pub use read::*;
mod write;
pub use write::*;
/// Represents a [`RawTransport`] comprised of two inmemory channels
#[derive(Debug)]
pub struct InmemoryTransport {
incoming: InmemoryTransportReadHalf,
outgoing: InmemoryTransportWriteHalf,
}
impl InmemoryTransport {
pub fn new(incoming: mpsc::Receiver<Vec<u8>>, outgoing: mpsc::Sender<Vec<u8>>) -> Self {
Self {
incoming: InmemoryTransportReadHalf::new(incoming),
outgoing: InmemoryTransportWriteHalf::new(outgoing),
}
}
/// Returns (incoming_tx, outgoing_rx, transport)
pub fn make(buffer: usize) -> (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>, Self) {
let (incoming_tx, incoming_rx) = mpsc::channel(buffer);
let (outgoing_tx, outgoing_rx) = mpsc::channel(buffer);
(
incoming_tx,
outgoing_rx,
Self::new(incoming_rx, outgoing_tx),
)
}
/// Returns pair of transports that are connected such that one sends to the other and
/// vice versa
pub fn pair(buffer: usize) -> (Self, Self) {
let (tx, rx, transport) = Self::make(buffer);
(transport, Self::new(rx, tx))
}
}
impl RawTransport for InmemoryTransport {}
impl RawTransportRead for InmemoryTransport {}
impl RawTransportWrite for InmemoryTransport {}
impl IntoSplit for InmemoryTransport {
type Read = InmemoryTransportReadHalf;
type Write = InmemoryTransportWriteHalf;
fn into_split(self) -> (Self::Write, Self::Read) {
(self.outgoing, self.incoming)
}
}
impl AsyncRead for InmemoryTransport {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.incoming).poll_read(cx, buf)
}
}
impl AsyncWrite for InmemoryTransport {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.outgoing).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.outgoing).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.outgoing).poll_shutdown(cx)
}
}
impl FramedTransport<InmemoryTransport, PlainCodec> {
/// Produces a pair of inmemory transports that are connected to each other using
/// a standard codec
///
/// Sets the buffer for message passing for each underlying transport to the given buffer size
pub fn pair(
buffer: usize,
) -> (
FramedTransport<InmemoryTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
) {
let (a, b) = InmemoryTransport::pair(buffer);
let a = FramedTransport::new(a, PlainCodec::new());
let b = FramedTransport::new(b, PlainCodec::new());
(a, b)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
async fn make_should_return_sender_that_sends_data_to_transport() {
let (tx, _, mut transport) = InmemoryTransport::make(3);
tx.send(b"test msg 1".to_vec()).await.unwrap();
tx.send(b"test msg 2".to_vec()).await.unwrap();
tx.send(b"test msg 3".to_vec()).await.unwrap();
// Should get data matching a singular message
let mut buf = [0; 256];
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 1");
// Next call would get the second message
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 2");
// When the last of the senders is dropped, we should still get
// the rest of the data that was sent first before getting
// an indicator that there is no more data
drop(tx);
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 3");
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(len, 0, "Unexpectedly got more data");
}
#[tokio::test]
async fn make_should_return_receiver_that_receives_data_from_transport() {
let (_, mut rx, mut transport) = InmemoryTransport::make(3);
transport.write_all(b"test msg 1").await.unwrap();
transport.write_all(b"test msg 2").await.unwrap();
transport.write_all(b"test msg 3").await.unwrap();
// Should get data matching a singular message
assert_eq!(rx.recv().await, Some(b"test msg 1".to_vec()));
// Next call would get the second message
assert_eq!(rx.recv().await, Some(b"test msg 2".to_vec()));
// When the transport is dropped, we should still get
// the rest of the data that was sent first before getting
// an indicator that there is no more data
drop(transport);
assert_eq!(rx.recv().await, Some(b"test msg 3".to_vec()));
assert_eq!(rx.recv().await, None, "Unexpectedly got more data");
}
#[tokio::test]
async fn into_split_should_provide_a_read_half_that_receives_from_sender() {
let (tx, _, transport) = InmemoryTransport::make(3);
let (_, mut read_half) = transport.into_split();
tx.send(b"test msg 1".to_vec()).await.unwrap();
tx.send(b"test msg 2".to_vec()).await.unwrap();
tx.send(b"test msg 3".to_vec()).await.unwrap();
// Should get data matching a singular message
let mut buf = [0; 256];
let len = read_half.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 1");
// Next call would get the second message
let len = read_half.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 2");
// When the last of the senders is dropped, we should still get
// the rest of the data that was sent first before getting
// an indicator that there is no more data
drop(tx);
let len = read_half.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 3");
let len = read_half.read(&mut buf).await.unwrap();
assert_eq!(len, 0, "Unexpectedly got more data");
}
#[tokio::test]
async fn into_split_should_provide_a_write_half_that_sends_to_receiver() {
let (_, mut rx, transport) = InmemoryTransport::make(3);
let (mut write_half, _) = transport.into_split();
write_half.write_all(b"test msg 1").await.unwrap();
write_half.write_all(b"test msg 2").await.unwrap();
write_half.write_all(b"test msg 3").await.unwrap();
// Should get data matching a singular message
assert_eq!(rx.recv().await, Some(b"test msg 1".to_vec()));
// Next call would get the second message
assert_eq!(rx.recv().await, Some(b"test msg 2".to_vec()));
// When the transport is dropped, we should still get
// the rest of the data that was sent first before getting
// an indicator that there is no more data
drop(write_half);
assert_eq!(rx.recv().await, Some(b"test msg 3".to_vec()));
assert_eq!(rx.recv().await, None, "Unexpectedly got more data");
}
}

@ -1,249 +0,0 @@
use crate::RawTransportRead;
use futures::ready;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, ReadBuf},
sync::mpsc,
};
/// Read portion of an inmemory channel
#[derive(Debug)]
pub struct InmemoryTransportReadHalf {
rx: mpsc::Receiver<Vec<u8>>,
overflow: Vec<u8>,
}
impl InmemoryTransportReadHalf {
pub fn new(rx: mpsc::Receiver<Vec<u8>>) -> Self {
Self {
rx,
overflow: Vec::new(),
}
}
}
impl RawTransportRead for InmemoryTransportReadHalf {}
impl AsyncRead for InmemoryTransportReadHalf {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// If we cannot fit any more into the buffer at the moment, we wait
if buf.remaining() == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Cannot poll as buf.remaining() == 0",
)));
}
// If we have overflow from the last poll, put that in the buffer
if !self.overflow.is_empty() {
if self.overflow.len() > buf.remaining() {
let extra = self.overflow.split_off(buf.remaining());
buf.put_slice(&self.overflow);
self.overflow = extra;
} else {
buf.put_slice(&self.overflow);
self.overflow.clear();
}
return Poll::Ready(Ok(()));
}
// Otherwise, we poll for the next batch to read in
match ready!(self.rx.poll_recv(cx)) {
Some(mut x) => {
if x.len() > buf.remaining() {
self.overflow = x.split_off(buf.remaining());
}
buf.put_slice(&x);
Poll::Ready(Ok(()))
}
None => Poll::Ready(Ok(())),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{InmemoryTransport, IntoSplit};
use tokio::io::AsyncReadExt;
#[tokio::test]
async fn read_half_should_fail_if_buf_has_no_space_remaining() {
let (_tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
let mut buf = [0u8; 0];
match t_read.read(&mut buf).await {
Err(x) if x.kind() == io::ErrorKind::Other => {}
x => panic!("Unexpected result: {:?}", x),
}
}
#[tokio::test]
async fn read_half_should_update_buf_with_all_overflow_from_last_read_if_it_all_fits() {
let (tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
tx.send(vec![1, 2, 3]).await.expect("Failed to send");
let mut buf = [0u8; 2];
// First, read part of the data (first two bytes)
match t_read.read(&mut buf).await {
Ok(n) if n == 2 => assert_eq!(&buf[..n], &[1, 2]),
x => panic!("Unexpected result: {:?}", x),
}
// Second, we send more data because the last message was placed in overflow
tx.send(vec![4, 5, 6]).await.expect("Failed to send");
// Third, read remainder of the overflow from first message (third byte)
match t_read.read(&mut buf).await {
Ok(n) if n == 1 => assert_eq!(&buf[..n], &[3]),
x => panic!("Unexpected result: {:?}", x),
}
// Fourth, verify that we start to receive the next overflow
match t_read.read(&mut buf).await {
Ok(n) if n == 2 => assert_eq!(&buf[..n], &[4, 5]),
x => panic!("Unexpected result: {:?}", x),
}
// Fifth, verify that we get the last bit of overflow
match t_read.read(&mut buf).await {
Ok(n) if n == 1 => assert_eq!(&buf[..n], &[6]),
x => panic!("Unexpected result: {:?}", x),
}
}
#[tokio::test]
async fn read_half_should_update_buf_with_some_of_overflow_that_can_fit() {
let (tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
tx.send(vec![1, 2, 3, 4, 5]).await.expect("Failed to send");
let mut buf = [0u8; 2];
// First, read part of the data (first two bytes)
match t_read.read(&mut buf).await {
Ok(n) if n == 2 => assert_eq!(&buf[..n], &[1, 2]),
x => panic!("Unexpected result: {:?}", x),
}
// Second, we send more data because the last message was placed in overflow
tx.send(vec![6]).await.expect("Failed to send");
// Third, read next chunk of the overflow from first message (next two byte)
match t_read.read(&mut buf).await {
Ok(n) if n == 2 => assert_eq!(&buf[..n], &[3, 4]),
x => panic!("Unexpected result: {:?}", x),
}
// Fourth, read last chunk of the overflow from first message (fifth byte)
match t_read.read(&mut buf).await {
Ok(n) if n == 1 => assert_eq!(&buf[..n], &[5]),
x => panic!("Unexpected result: {:?}", x),
}
}
#[tokio::test]
async fn read_half_should_update_buf_with_all_of_inner_channel_when_it_fits() {
let (tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
let mut buf = [0u8; 5];
tx.send(vec![1, 2, 3, 4, 5]).await.expect("Failed to send");
// First, read all of data that fits exactly
match t_read.read(&mut buf).await {
Ok(n) if n == 5 => assert_eq!(&buf[..n], &[1, 2, 3, 4, 5]),
x => panic!("Unexpected result: {:?}", x),
}
tx.send(vec![6, 7, 8]).await.expect("Failed to send");
// Second, read data that fits within buf
match t_read.read(&mut buf).await {
Ok(n) if n == 3 => assert_eq!(&buf[..n], &[6, 7, 8]),
x => panic!("Unexpected result: {:?}", x),
}
}
#[tokio::test]
async fn read_half_should_update_buf_with_some_of_inner_channel_that_can_fit_and_add_rest_to_overflow(
) {
let (tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
let mut buf = [0u8; 1];
tx.send(vec![1, 2, 3, 4, 5]).await.expect("Failed to send");
// Attempt a read that places more in overflow
match t_read.read(&mut buf).await {
Ok(n) if n == 1 => assert_eq!(&buf[..n], &[1]),
x => panic!("Unexpected result: {:?}", x),
}
// Verify overflow contains the rest
assert_eq!(&t_read.overflow, &[2, 3, 4, 5]);
// Queue up extra data that will not be read until overflow is finished
tx.send(vec![6, 7, 8]).await.expect("Failed to send");
// Read next data point
match t_read.read(&mut buf).await {
Ok(n) if n == 1 => assert_eq!(&buf[..n], &[2]),
x => panic!("Unexpected result: {:?}", x),
}
// Verify overflow contains the rest without having added extra data
assert_eq!(&t_read.overflow, &[3, 4, 5]);
}
#[tokio::test]
async fn read_half_should_yield_pending_if_no_data_available_on_inner_channel() {
let (_tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
let mut buf = [0u8; 1];
// Attempt a read that should yield ok with no change, which is what should
// happen when nothing is read into buf
let f = t_read.read(&mut buf);
tokio::pin!(f);
match futures::poll!(f) {
Poll::Pending => {}
x => panic!("Unexpected poll result: {:?}", x),
}
}
#[tokio::test]
async fn read_half_should_not_update_buf_if_inner_channel_closed() {
let (tx, _rx, transport) = InmemoryTransport::make(1);
let (_t_write, mut t_read) = transport.into_split();
let mut buf = [0u8; 1];
// Drop the channel that would be sending data to the transport
drop(tx);
// Attempt a read that should yield ok with no change, which is what should
// happen when nothing is read into buf
match t_read.read(&mut buf).await {
Ok(n) if n == 0 => assert_eq!(&buf, &[0]),
x => panic!("Unexpected result: {:?}", x),
}
}
}

@ -1,147 +0,0 @@
use crate::RawTransportWrite;
use futures::ready;
use std::{
fmt,
future::Future,
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::{io::AsyncWrite, sync::mpsc};
/// Write portion of an inmemory channel
pub struct InmemoryTransportWriteHalf {
tx: Option<mpsc::Sender<Vec<u8>>>,
task: Option<Pin<Box<dyn Future<Output = io::Result<usize>> + Send + Sync + 'static>>>,
}
impl InmemoryTransportWriteHalf {
pub fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
Self {
tx: Some(tx),
task: None,
}
}
}
impl fmt::Debug for InmemoryTransportWriteHalf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InmemoryTransportWrite")
.field("tx", &self.tx)
.field(
"task",
&if self.tx.is_some() {
"Some(...)"
} else {
"None"
},
)
.finish()
}
}
impl RawTransportWrite for InmemoryTransportWriteHalf {}
impl AsyncWrite for InmemoryTransportWriteHalf {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
loop {
match self.task.as_mut() {
Some(task) => {
let res = ready!(task.as_mut().poll(cx));
self.task.take();
return Poll::Ready(res);
}
None => match self.tx.as_mut() {
Some(tx) => {
let n = buf.len();
let tx_2 = tx.clone();
let data = buf.to_vec();
let task =
Box::pin(async move { tx_2.send(data).await.map(|_| n).or(Ok(0)) });
self.task.replace(task);
}
None => return Poll::Ready(Ok(0)),
},
}
}
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.tx.take();
self.task.take();
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{InmemoryTransport, IntoSplit};
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn write_half_should_return_buf_len_if_can_send_immediately() {
let (_tx, mut rx, transport) = InmemoryTransport::make(1);
let (mut t_write, _t_read) = transport.into_split();
// Write that is not waiting should always succeed with full contents
let n = t_write.write(&[1, 2, 3]).await.expect("Failed to write");
assert_eq!(n, 3, "Unexpected byte count returned");
// Verify we actually had the data sent
let data = rx.try_recv().expect("Failed to recv data");
assert_eq!(data, &[1, 2, 3]);
}
#[tokio::test]
async fn write_half_should_return_support_eventually_sending_by_retrying_when_not_ready() {
let (_tx, mut rx, transport) = InmemoryTransport::make(1);
let (mut t_write, _t_read) = transport.into_split();
// Queue a write already so that we block on the next one
let _ = t_write.write(&[1, 2, 3]).await.expect("Failed to write");
// Verify that the next write is pending
let f = t_write.write(&[4, 5]);
tokio::pin!(f);
match futures::poll!(&mut f) {
Poll::Pending => {}
x => panic!("Unexpected poll result: {:?}", x),
}
// Consume first batch of data so future of second can continue
let data = rx.try_recv().expect("Failed to recv data");
assert_eq!(data, &[1, 2, 3]);
// Verify that poll now returns success
match futures::poll!(f) {
Poll::Ready(Ok(n)) if n == 2 => {}
x => panic!("Unexpected poll result: {:?}", x),
}
// Consume second batch of data
let data = rx.try_recv().expect("Failed to recv data");
assert_eq!(data, &[4, 5]);
}
#[tokio::test]
async fn write_half_should_zero_if_inner_channel_closed() {
let (_tx, rx, transport) = InmemoryTransport::make(1);
let (mut t_write, _t_read) = transport.into_split();
// Drop receiving end that transport would talk to
drop(rx);
// Channel is dropped, so return 0 to indicate no bytes sent
let n = t_write.write(&[1, 2, 3]).await.expect("Failed to write");
assert_eq!(n, 0, "Unexpected byte count returned");
}
}

@ -1,66 +0,0 @@
use crate::{IntoSplit, TypedAsyncRead, TypedAsyncWrite, TypedTransport};
use async_trait::async_trait;
use std::io;
use tokio::sync::mpsc;
mod read;
pub use read::*;
mod write;
pub use write::*;
/// Represents a [`TypedTransport`] of data across the network that uses [`mpsc::Sender`] and
/// [`mpsc::Receiver`] underneath.
#[derive(Debug)]
pub struct MpscTransport<T, U> {
outbound: MpscTransportWriteHalf<T>,
inbound: MpscTransportReadHalf<U>,
}
impl<T, U> MpscTransport<T, U> {
pub fn new(outbound: mpsc::Sender<T>, inbound: mpsc::Receiver<U>) -> Self {
Self {
outbound: MpscTransportWriteHalf::new(outbound),
inbound: MpscTransportReadHalf::new(inbound),
}
}
/// Creates a pair of connected transports using `buffer` as maximum
/// channel capacity for each
pub fn pair(buffer: usize) -> (MpscTransport<T, U>, MpscTransport<U, T>) {
let (t_tx, t_rx) = mpsc::channel(buffer);
let (u_tx, u_rx) = mpsc::channel(buffer);
(
MpscTransport::new(t_tx, u_rx),
MpscTransport::new(u_tx, t_rx),
)
}
}
impl<T: Send, U: Send> TypedTransport<T, U> for MpscTransport<T, U> {}
#[async_trait]
impl<T: Send, U: Send> TypedAsyncWrite<T> for MpscTransport<T, U> {
async fn write(&mut self, data: T) -> io::Result<()> {
self.outbound
.write(data)
.await
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
}
}
#[async_trait]
impl<T: Send, U: Send> TypedAsyncRead<U> for MpscTransport<T, U> {
async fn read(&mut self) -> io::Result<Option<U>> {
self.inbound.read().await
}
}
impl<T, U> IntoSplit for MpscTransport<T, U> {
type Read = MpscTransportReadHalf<U>;
type Write = MpscTransportWriteHalf<T>;
fn into_split(self) -> (Self::Write, Self::Read) {
(self.outbound, self.inbound)
}
}

@ -1,22 +0,0 @@
use crate::TypedAsyncRead;
use async_trait::async_trait;
use std::io;
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct MpscTransportReadHalf<T> {
rx: mpsc::Receiver<T>,
}
impl<T> MpscTransportReadHalf<T> {
pub fn new(rx: mpsc::Receiver<T>) -> Self {
Self { rx }
}
}
#[async_trait]
impl<T: Send> TypedAsyncRead<T> for MpscTransportReadHalf<T> {
async fn read(&mut self) -> io::Result<Option<T>> {
Ok(self.rx.recv().await)
}
}

@ -1,25 +0,0 @@
use crate::TypedAsyncWrite;
use async_trait::async_trait;
use std::io;
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct MpscTransportWriteHalf<T> {
tx: mpsc::Sender<T>,
}
impl<T> MpscTransportWriteHalf<T> {
pub fn new(tx: mpsc::Sender<T>) -> Self {
Self { tx }
}
}
#[async_trait]
impl<T: Send> TypedAsyncWrite<T> for MpscTransportWriteHalf<T> {
async fn write(&mut self, data: T) -> io::Result<()> {
self.tx
.send(data)
.await
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string()))
}
}

@ -0,0 +1,57 @@
use super::{Interest, Ready, Reconnectable};
use async_trait::async_trait;
use std::io;
mod framed;
pub use framed::*;
mod inmemory;
pub use inmemory::*;
mod tcp;
pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use windows::*;
/// Interface representing a transport of raw bytes into and out of the system
#[async_trait]
pub trait RawTransport: Reconnectable {
/// Tries to read data from the transport into the provided buffer, returning how many bytes
/// were read
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to read data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize>;
/// Try to write a buffer to the transport, returning how many bytes were written
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_write(&self, buf: &[u8]) -> io::Result<usize>;
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready>;
/// Waits for the transport to be readable to follow up with `try_read`
async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be readable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
}

@ -0,0 +1,66 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use async_trait::async_trait;
use std::io;
mod codec;
pub use codec::*;
/// Represents a [`RawTransport`] that reads and writes using frames defined by a [`Codec`],
/// which provides the ability to guarantee that data is read and written completely and also
/// follows the format of the given codec such as encryption and authentication of bytes
pub struct FramedRawTransport<T, C>
where
T: RawTransport,
C: Codec,
{
inner: T,
codec: C,
}
#[async_trait]
impl<T, C> Reconnectable for FramedRawTransport<T, C>
where
T: RawTransport,
C: Codec,
{
async fn reconnect(&mut self) -> io::Result<()> {
Reconnectable::reconnect(&mut self.inner).await
}
}
#[async_trait]
impl<T, C> RawTransport for FramedRawTransport<T, C>
where
T: RawTransport,
C: Codec,
{
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
todo!();
}
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
todo!();
}
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
todo!();
}
}
impl FramedRawTransport<super::InmemoryRawTransport, PlainCodec> {
/// Produces a pair of inmemory transports that are connected to each other using
/// a standard codec
///
/// Sets the buffer for message passing for each underlying transport to the given buffer size
pub fn pair(
buffer: usize,
) -> (
FramedRawTransport<super::InmemoryRawTransport, PlainCodec>,
FramedRawTransport<super::InmemoryRawTransport, PlainCodec>,
) {
let (a, b) = super::InmemoryRawTransport::pair(buffer);
let a = FramedRawTransport::new(a, PlainCodec::new());
let b = FramedRawTransport::new(b, PlainCodec::new());
(a, b)
}
}

@ -0,0 +1,14 @@
use bytes::BytesMut;
use std::io;
mod plain;
pub use plain::PlainCodec;
mod xchacha20poly1305;
pub use xchacha20poly1305::XChaCha20Poly1305Codec;
/// Represents abstraction of a codec that implements specific encoder and decoder for distant
pub trait Codec: Clone {
fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> io::Result<()>;
fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Vec<u8>>>;
}

@ -1,7 +1,6 @@
use crate::Codec;
use bytes::{Buf, BufMut, BytesMut};
use std::convert::TryInto;
use tokio::io;
use std::{convert::TryInto, io};
/// Total bytes to use as the len field denoting a frame's size
const LEN_SIZE: usize = 8;
@ -9,7 +8,6 @@ const LEN_SIZE: usize = 8;
/// Represents a codec that just ships messages back and forth with no encryption or authentication
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct PlainCodec;
impl_traits_for_codec!(PlainCodec);
impl PlainCodec {
pub fn new() -> Self {

@ -1,8 +1,7 @@
use crate::{Codec, SecretKey, SecretKey32};
use bytes::{Buf, BufMut, BytesMut};
use chacha20poly1305::{aead::Aead, Key, KeyInit, XChaCha20Poly1305, XNonce};
use std::{convert::TryInto, fmt};
use tokio::io;
use std::{convert::TryInto, fmt, io};
/// Total bytes to use as the len field denoting a frame's size
const LEN_SIZE: usize = 8;
@ -17,7 +16,6 @@ const NONCE_SIZE: usize = 24;
pub struct XChaCha20Poly1305Codec {
cipher: XChaCha20Poly1305,
}
impl_traits_for_codec!(XChaCha20Poly1305Codec);
impl XChaCha20Poly1305Codec {
pub fn new(key: &[u8]) -> Self {

@ -0,0 +1,223 @@
use super::{Interest, RawTransport, Ready, Reconnectable};
use async_trait::async_trait;
use std::{
io,
sync::{Mutex, MutexGuard},
};
use tokio::sync::mpsc::{
self,
error::{TryRecvError, TrySendError},
};
/// Represents a [`RawTransport`] comprised of two inmemory channels
#[derive(Debug)]
pub struct InmemoryRawTransport {
tx: mpsc::Sender<Vec<u8>>,
rx: mpsc::Receiver<Vec<u8>>,
/// Internal storage used when we get more data from a `try_read` than can be returned
buf: Mutex<Option<Vec<u8>>>,
}
impl InmemoryRawTransport {
pub fn new(tx: mpsc::Sender<Vec<u8>>, rx: mpsc::Receiver<Vec<u8>>) -> Self {
Self {
tx,
rx,
buf: Mutex::new(None),
}
}
/// Returns (incoming_tx, outgoing_rx, transport)
pub fn make(buffer: usize) -> (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>, Self) {
let (incoming_tx, incoming_rx) = mpsc::channel(buffer);
let (outgoing_tx, outgoing_rx) = mpsc::channel(buffer);
(
incoming_tx,
outgoing_rx,
Self::new(outgoing_tx, incoming_rx),
)
}
/// Returns pair of transports that are connected such that one sends to the other and
/// vice versa
pub fn pair(buffer: usize) -> (Self, Self) {
let (tx, rx, transport) = Self::make(buffer);
(transport, Self::new(tx, rx))
}
/// Returns true if the read channel is closed, meaning it will no longer receive more data.
/// This does not factor in data remaining in the internal buffer, meaning that this may return
/// true while the transport still has data remaining in the internal buffer.
///
/// NOTE: Because there is no `is_closed` on the receiver, we have to actually try to
/// read from the receiver to see if it is disconnected, adding any received data
/// to our internal buffer if it is not disconnected and has data available
///
/// Track https://github.com/tokio-rs/tokio/issues/4638 for future `is_closed` on rx
fn is_rx_closed(&self) -> bool {
match self.rx.try_recv() {
Ok(mut data) => {
let buf_lock = self.buf.lock().unwrap();
let data = match buf_lock.take() {
Some(existing) => {
existing.append(&mut data);
existing
}
None => data,
};
*buf_lock = Some(data);
true
}
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
}
}
}
#[async_trait]
impl Reconnectable for InmemoryRawTransport {
/// Once the underlying channels have closed, there is no way for this transport to
/// re-establish those channels; therefore, reconnecting will always fail with
/// [`ErrorKind::Unsupported`]
///
/// [`ErrorKind::Unsupported`]: io::ErrorKind::Unsupported
async fn reconnect(&mut self) -> io::Result<()> {
Err(io::Error::from(io::ErrorKind::Unsupported))
}
}
#[async_trait]
impl RawTransport for InmemoryRawTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
// Lock our internal storage to ensure that nothing else mutates it for the lifetime of
// this call as we want to make sure that data is read and stored in order
let buf_lock = self.buf.lock().unwrap();
// Check if we have data in our internal buffer, and if so feed it into the outgoing buf
if let Some(data) = buf_lock.take() {
return Ok(copy_and_store(buf_lock, data, buf));
}
match self.rx.try_recv() {
Ok(data) => Ok(copy_and_store(buf_lock, data, buf)),
Err(TryRecvError::Empty) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Disconnected) => Ok(None),
}
}
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
match self.tx.try_send(buf.to_vec()) {
Ok(()) => Ok(buf.len()),
Err(TrySendError::Full(_)) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Closed(_)) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
}
}
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let mut status = Ready::EMPTY;
if interest.is_readable() {
// TODO: Replace `self.is_rx_closed()` with `self.rx.is_closed()` once the tokio issue
// is resolved that adds `is_closed` to the `mpsc::Receiver`
//
// See https://github.com/tokio-rs/tokio/issues/4638
status |= if self.is_rx_closed() && self.buf.lock().unwrap().is_none() {
Ready::READ_CLOSED
} else {
Ready::READABLE
};
}
if interest.is_writeable() {
status |= if self.tx.is_closed() {
Ready::WRITE_CLOSED
} else {
Ready::WRITABLE
};
}
Ok(status)
}
}
/// Copies `data` into `out`, storing any overflow from `data` into the storage pointed to by the
/// mutex `buf_lock`
fn copy_and_store(buf_lock: MutexGuard<Option<Vec<u8>>>, data: Vec<u8>, out: &mut [u8]) -> usize {
// NOTE: We can get data that is larger than the destination buf; so,
// we store as much as we can and queue up the rest in our temporary
// storage for future retrievals
if data.len() > out.len() {
let n = out.len();
out.copy_from_slice(&data[..n]);
*buf_lock = Some(data.split_off(n));
n
} else {
let n = data.len();
out[..n].copy_from_slice(&data);
n
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
async fn make_should_return_sender_that_sends_data_to_transport() {
let (tx, _, mut transport) = InmemoryRawTransport::make(3);
tx.send(b"test msg 1".to_vec()).await.unwrap();
tx.send(b"test msg 2".to_vec()).await.unwrap();
tx.send(b"test msg 3".to_vec()).await.unwrap();
// Should get data matching a singular message
let mut buf = [0; 256];
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 1");
// Next call would get the second message
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 2");
// When the last of the senders is dropped, we should still get
// the rest of the data that was sent first before getting
// an indicator that there is no more data
drop(tx);
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(&buf[..len], b"test msg 3");
let len = transport.read(&mut buf).await.unwrap();
assert_eq!(len, 0, "Unexpectedly got more data");
}
#[tokio::test]
async fn make_should_return_receiver_that_receives_data_from_transport() {
let (_, mut rx, mut transport) = InmemoryRawTransport::make(3);
transport.write_all(b"test msg 1").await.unwrap();
transport.write_all(b"test msg 2").await.unwrap();
transport.write_all(b"test msg 3").await.unwrap();
// Should get data matching a singular message
assert_eq!(rx.recv().await, Some(b"test msg 1".to_vec()));
// Next call would get the second message
assert_eq!(rx.recv().await, Some(b"test msg 2".to_vec()));
// When the transport is dropped, we should still get
// the rest of the data that was sent first before getting
// an indicator that there is no more data
drop(transport);
assert_eq!(rx.recv().await, Some(b"test msg 3".to_vec()));
assert_eq!(rx.recv().await, None, "Unexpectedly got more data");
}
}

@ -1,17 +1,7 @@
use crate::{IntoSplit, RawTransport, RawTransportRead, RawTransportWrite};
use std::{
fmt, io,
net::IpAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream, ToSocketAddrs,
},
};
use super::{Interest, RawTransport, Ready, Reconnectable};
use async_trait::async_trait;
use std::{fmt, io, net::IpAddr};
use tokio::net::{TcpStream, ToSocketAddrs};
/// Represents a [`RawTransport`] that leverages a TCP stream
pub struct TcpTransport {
@ -53,51 +43,26 @@ impl fmt::Debug for TcpTransport {
}
}
impl RawTransport for TcpTransport {}
impl RawTransportRead for TcpTransport {}
impl RawTransportWrite for TcpTransport {}
impl RawTransportRead for OwnedReadHalf {}
impl RawTransportWrite for OwnedWriteHalf {}
impl IntoSplit for TcpTransport {
type Read = OwnedReadHalf;
type Write = OwnedWriteHalf;
fn into_split(self) -> (Self::Write, Self::Read) {
let (r, w) = TcpStream::into_split(self.inner);
(w, r)
}
}
impl AsyncRead for TcpTransport {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
#[async_trait]
impl Reconnectable for TcpTransport {
async fn reconnect(&mut self) -> io::Result<()> {
self.inner = TcpStream::connect((self.addr, self.port)).await?;
Ok(())
}
}
impl AsyncWrite for TcpTransport {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
#[async_trait]
impl RawTransport for TcpTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.try_read(buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.inner.try_write(buf)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
self.inner.ready(interest).await
}
}
@ -105,12 +70,7 @@ impl AsyncWrite for TcpTransport {
mod tests {
use super::*;
use std::net::{Ipv6Addr, SocketAddr};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
sync::oneshot,
task::JoinHandle,
};
use tokio::{net::TcpListener, sync::oneshot, task::JoinHandle};
async fn find_ephemeral_addr() -> SocketAddr {
// Start a listener on a distinct port, get its port, and kill it

@ -1,17 +1,10 @@
use crate::{IntoSplit, RawTransport, RawTransportRead, RawTransportWrite};
use super::{Interest, RawTransport, Ready, Reconnectable};
use async_trait::async_trait;
use std::{
fmt, io,
path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{
unix::{OwnedReadHalf, OwnedWriteHalf},
UnixStream,
},
};
use tokio::net::UnixStream;
/// Represents a [`RawTransport`] that leverages a Unix socket
pub struct UnixSocketTransport {
@ -43,51 +36,26 @@ impl fmt::Debug for UnixSocketTransport {
}
}
impl RawTransport for UnixSocketTransport {}
impl RawTransportRead for UnixSocketTransport {}
impl RawTransportWrite for UnixSocketTransport {}
impl RawTransportRead for OwnedReadHalf {}
impl RawTransportWrite for OwnedWriteHalf {}
impl IntoSplit for UnixSocketTransport {
type Read = OwnedReadHalf;
type Write = OwnedWriteHalf;
fn into_split(self) -> (Self::Write, Self::Read) {
let (r, w) = UnixStream::into_split(self.inner);
(w, r)
}
}
impl AsyncRead for UnixSocketTransport {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
#[async_trait]
impl Reconnectable for UnixSocketTransport {
async fn reconnect(&mut self) -> io::Result<()> {
self.inner = UnixStream::connect(self.path.as_path()).await?;
Ok(())
}
}
impl AsyncWrite for UnixSocketTransport {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
#[async_trait]
impl RawTransport for UnixSocketTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.try_read(buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.inner.try_write(buf)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
self.inner.ready(interest).await
}
}

@ -1,4 +1,4 @@
use crate::{IntoSplit, RawTransport, RawTransportRead, RawTransportWrite};
use super::{Interest, RawTransport, Ready, Reconnectable};
use std::{
ffi::{OsStr, OsString},
fmt, io,
@ -6,17 +6,6 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf},
net::windows::named_pipe::ClientOptions,
};
// Equivalent to winapi::shared::winerror::ERROR_PIPE_BUSY
// DWORD -> c_uLong -> u32
const ERROR_PIPE_BUSY: u32 = 231;
// Time between attempts to connect to a busy pipe
const BUSY_PIPE_SLEEP_MILLIS: u64 = 50;
mod pipe;
pub use pipe::NamedPipe;
@ -42,20 +31,11 @@ impl WindowsPipeTransport {
/// Address may be something like `\.\pipe\my_pipe_name`
pub async fn connect(addr: impl Into<OsString>) -> io::Result<Self> {
let addr = addr.into();
let pipe = loop {
match ClientOptions::new().open(&addr) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
Err(e) => return Err(e),
}
tokio::time::sleep(Duration::from_millis(BUSY_PIPE_SLEEP_MILLIS)).await;
};
let inner = NamedPipe::connect_as_client(&addr).await?;
Ok(Self {
addr,
inner: NamedPipe::from(pipe),
inner,
})
}
@ -73,51 +53,31 @@ impl fmt::Debug for WindowsPipeTransport {
}
}
impl RawTransport for WindowsPipeTransport {}
impl RawTransportRead for WindowsPipeTransport {}
impl RawTransportWrite for WindowsPipeTransport {}
impl RawTransportRead for ReadHalf<WindowsPipeTransport> {}
impl RawTransportWrite for WriteHalf<WindowsPipeTransport> {}
impl IntoSplit for WindowsPipeTransport {
type Read = ReadHalf<WindowsPipeTransport>;
type Write = WriteHalf<WindowsPipeTransport>;
fn into_split(self) -> (Self::Write, Self::Read) {
let (reader, writer) = tokio::io::split(self);
(writer, reader)
}
}
#[async_trait]
impl Reconnectable for WindowsPipeTransport {
async fn reconnect(&mut self) -> io::Result<()> {
// We cannot reconnect from server-side
if self.inner.is_server() {
return Err(io::Error::from(io::ErrorKind::Unsupported));
}
impl AsyncRead for WindowsPipeTransport {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
self.inner = NamedPipe::connect_as_client(&self.addr).await?;
Ok(())
}
}
impl AsyncWrite for WindowsPipeTransport {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
#[async_trait]
impl RawTransport for WindowsPipeTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.try_read(buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.inner.try_write(buf)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
self.inner.ready(interest).await
}
}

@ -0,0 +1,82 @@
use derive_more::{From, TryInto};
use std::io;
use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient, NamedPipeServer};
// Equivalent to winapi::shared::winerror::ERROR_PIPE_BUSY
// DWORD -> c_uLong -> u32
const ERROR_PIPE_BUSY: u32 = 231;
// Time between attempts to connect to a busy pipe
const BUSY_PIPE_SLEEP_MILLIS: u64 = 50;
/// Represents a named pipe from either a client or server perspective
#[derive(From, TryInto)]
pub enum NamedPipe {
Client(NamedPipeClient),
Server(NamedPipeServer),
}
impl NamedPipe {
/// Returns a reference to the underlying named client pipe
pub fn as_client(&self) -> Option<&NamedPipeClient> {
match self {
Self::Client(x) => Some(x),
_ => None,
}
}
/// Returns a mutable reference to the underlying named client pipe
pub fn as_mut_client(&mut self) -> Option<&mut NamedPipeClient> {
match self {
Self::Client(x) => Some(x),
_ => None,
}
}
/// Consumes and returns the underlying named client pipe
pub fn into_client(self) -> Option<NamedPipeClient> {
match self {
Self::Client(x) => Some(x),
_ => None,
}
}
/// Returns a reference to the underlying named server pipe
pub fn as_server(&self) -> Option<&NamedPipeServer> {
match self {
Self::Server(x) => Some(x),
_ => None,
}
}
/// Returns a mutable reference to the underlying named server pipe
pub fn as_mut_server(&mut self) -> Option<&mut NamedPipeServer> {
match self {
Self::Server(x) => Some(x),
_ => None,
}
}
/// Consumes and returns the underlying named server pipe
pub fn into_server(self) -> Option<NamedPipeServer> {
match self {
Self::Server(x) => Some(x),
_ => None,
}
}
/// Attempts to connect as a client pipe
pub(super) fn connect_as_client(addr: &OsStr) -> io::Result<Self> {
let pipe = loop {
match ClientOptions::new().open(addr) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
Err(e) => return Err(e),
}
tokio::time::sleep(Duration::from_millis(BUSY_PIPE_SLEEP_MILLIS)).await;
};
Ok(NamedPipe::from(pipe))
}
}

@ -0,0 +1,49 @@
use super::{Interest, Ready, Reconnectable};
use async_trait::async_trait;
use std::io;
mod inmemory;
pub use inmemory::*;
/// Interface representing a transport of typed data into and out of the system
#[async_trait]
pub trait TypedTransport: Reconnectable {
/// Type of input the transport can read
type Input;
/// Type of output the transport can write
type Output;
/// Tries to read a value from the transport, returning `Ok(Some(Self::Input))` upon
/// acquiring new input, or `Ok(None)` if the channel has closed.
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to read data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_read(&self) -> io::Result<Option<Self::Input>>;
/// Try to write a value to the transport, returning `Ok(())` upon successfully writing all of
/// the data
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_write(&self, value: Self::Output) -> io::Result<()>;
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready>;
/// Waits for the transport to be readable to follow up with `try_read`
async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be readable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
}

@ -0,0 +1,73 @@
use super::{Interest, Ready, Reconnectable, TypedTransport};
use async_trait::async_trait;
use std::io;
use tokio::sync::mpsc::{
self,
error::{TryRecvError, TrySendError},
};
/// Represents a [`TypedTransport`] of data across the network that uses tokio's mpsc [`Sender`]
/// and [`Receiver`] underneath.
///
/// [`Sender`]: mpsc::Sender
/// [`Receiver`]: mpsc::Receiver
#[derive(Debug)]
pub struct InmemoryTypedTransport<T, U> {
tx: mpsc::Sender<T>,
rx: mpsc::Receiver<U>,
}
impl<T, U> InmemoryTypedTransport<T, U> {
pub fn new(tx: mpsc::Sender<T>, rx: mpsc::Receiver<U>) -> Self {
Self { tx, rx }
}
/// Creates a pair of connected transports using `buffer` as maximum
/// channel capacity for each
pub fn pair(buffer: usize) -> (InmemoryTypedTransport<T, U>, InmemoryTypedTransport<U, T>) {
let (t_tx, t_rx) = mpsc::channel(buffer);
let (u_tx, u_rx) = mpsc::channel(buffer);
(
InmemoryTypedTransport::new(t_tx, u_rx),
InmemoryTypedTransport::new(u_tx, t_rx),
)
}
}
#[async_trait]
impl<T, U> Reconnectable for InmemoryTypedTransport<T, U> {
/// Once the underlying channels have closed, there is no way for this transport to
/// re-establish those channels; therefore, reconnecting will always fail with
/// [`ErrorKind::Unsupported`]
///
/// [`ErrorKind::Unsupported`]: io::ErrorKind::Unsupported
async fn reconnect(&mut self) -> io::Result<()> {
Err(io::Error::from(io::ErrorKind::Unsupported))
}
}
#[async_trait]
impl<T, U> TypedTransport<T, U> for InmemoryTypedTransport<T, U> {
type Input = U;
type Output = T;
fn try_read(&self) -> io::Result<Option<Self::Input>> {
match self.rx.try_recv() {
Ok(x) => Ok(Some(x)),
Err(TryRecvError::Empty) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Disconnected) => Ok(None),
}
}
fn try_write(&self, value: Self::Output) -> io::Result<()> {
match self.tx.try_send(value) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
Err(TryRecvError::Closed(_)) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
}
}
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
todo!();
}
}

@ -1,61 +1,70 @@
use crate::{TypedAsyncRead, TypedAsyncWrite, TypedTransport};
use super::{Interest, Ready, TypedTransport};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::io;
/// Interface representing a transport that uses [`serde`] to serialize and deserialize data
/// as it is sent and received
pub trait UntypedTransport: UntypedTransportRead + UntypedTransportWrite {}
/// Interface representing a transport's read half that uses [`serde`] to deserialize data as it is
/// received
#[async_trait]
pub trait UntypedTransportRead: Send + Unpin {
pub trait UntypedTransport {
/// Attempts to read some data as `T`, returning [`io::Error`] if unable to deserialize
/// or some other error occurs. `Some(T)` is returned if successful. `None` is
/// returned if no more data is available.
async fn read<T>(&mut self) -> io::Result<Option<T>>
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to read data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_read<T>(&self) -> io::Result<Option<T>>
where
T: DeserializeOwned;
}
/// Interface representing a transport's write half that uses [`serde`] to serialize data as it is
/// sent
#[async_trait]
pub trait UntypedTransportWrite: Send + Unpin {
/// Attempts to write some data of type `T`, returning [`io::Error`] if unable to serialize
/// or some other error occurs.
async fn write<T>(&mut self, data: T) -> io::Result<()>
/// Attempts to write some data `T` by serializing it into bytes, returning [`io::Error`] if
/// unable to serialize or some other error occurs
///
/// This call may return an error with [`ErrorKind::WouldBlock`] in the case that the transport
/// is not ready to write data.
///
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
fn try_write<T>(&self, value: &T) -> io::Result<()>
where
T: Serialize + Send + 'static;
}
T: Serialize;
impl<T, W, R> TypedTransport<W, R> for T
where
T: UntypedTransport + Send,
W: Serialize + Send + 'static,
R: DeserializeOwned,
{
}
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready>;
#[async_trait]
impl<W, T> TypedAsyncWrite<T> for W
where
W: UntypedTransportWrite + Send,
T: Serialize + Send + 'static,
{
async fn write(&mut self, data: T) -> io::Result<()> {
W::write(self, data).await
/// Waits for the transport to be readable to follow up with `try_read`
async fn readable(&self) -> io::Result<()> {
let _ = self.ready(Interest::READABLE).await?;
Ok(())
}
/// Waits for the transport to be readable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
}
#[async_trait]
impl<R, T> TypedAsyncRead<T> for R
impl<T, I, O> TypedTransport<I, O> for T
where
R: UntypedTransportRead + Send,
T: DeserializeOwned,
T: UntypedTransport,
I: DeserializeOwned,
O: Serialize,
{
async fn read(&mut self) -> io::Result<Option<T>> {
R::read(self).await
/// Tries to read a value from the transport
fn try_read(&self) -> io::Result<Option<I>> {
UntypedTransport::try_read(self)
}
/// Try to write a value to the transport
fn try_write(&self, value: O) -> io::Result<()> {
UntypedTransport::try_write(self, &value)
}
/// Waits for the transport to be ready based on the given interest, returning the ready status
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
UntypedTransport::ready(self, interest).await
}
}

@ -1,101 +0,0 @@
use derive_more::{From, TryInto};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{self, AsyncRead, AsyncWrite, ReadBuf},
net::windows::named_pipe::{NamedPipeClient, NamedPipeServer},
};
#[derive(From, TryInto)]
pub enum NamedPipe {
Client(NamedPipeClient),
Server(NamedPipeServer),
}
impl NamedPipe {
pub fn as_client(&self) -> Option<&NamedPipeClient> {
match self {
Self::Client(x) => Some(x),
_ => None,
}
}
pub fn as_mut_client(&mut self) -> Option<&mut NamedPipeClient> {
match self {
Self::Client(x) => Some(x),
_ => None,
}
}
pub fn into_client(self) -> Option<NamedPipeClient> {
match self {
Self::Client(x) => Some(x),
_ => None,
}
}
pub fn as_server(&self) -> Option<&NamedPipeServer> {
match self {
Self::Server(x) => Some(x),
_ => None,
}
}
pub fn as_mut_server(&mut self) -> Option<&mut NamedPipeServer> {
match self {
Self::Server(x) => Some(x),
_ => None,
}
}
pub fn into_server(self) -> Option<NamedPipeServer> {
match self {
Self::Server(x) => Some(x),
_ => None,
}
}
}
impl AsyncRead for NamedPipe {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match Pin::get_mut(self) {
Self::Client(x) => Pin::new(x).poll_read(cx, buf),
Self::Server(x) => Pin::new(x).poll_read(cx, buf),
}
}
}
impl AsyncWrite for NamedPipe {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match Pin::get_mut(self) {
Self::Client(x) => Pin::new(x).poll_write(cx, buf),
Self::Server(x) => Pin::new(x).poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
Self::Client(x) => Pin::new(x).poll_flush(cx),
Self::Server(x) => Pin::new(x).poll_flush(cx),
}
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
Self::Client(x) => Pin::new(x).poll_shutdown(cx),
Self::Server(x) => Pin::new(x).poll_shutdown(cx),
}
}
}

@ -1,6 +1,6 @@
use distant_net::{
AuthClient, AuthErrorKind, AuthQuestion, AuthRequest, AuthServer, AuthVerifyKind, Client,
IntoSplit, MpscListener, MpscTransport, ServerExt,
InmemoryTypedTransport, IntoSplit, MpscListener, ServerExt,
};
use std::collections::HashMap;
use tokio::sync::mpsc;
@ -8,7 +8,7 @@ use tokio::sync::mpsc;
/// Spawns a server and client connected together, returning the client
fn setup() -> (AuthClient, mpsc::Receiver<AuthRequest>) {
// Make a pair of inmemory transports that we can use to test client and server connected
let (t1, t2) = MpscTransport::pair(100);
let (t1, t2) = InmemoryTypedTransport::pair(100);
// Create the client
let (writer, reader) = t1.into_split();

Loading…
Cancel
Save