pull/146/head
Chip Senkbeil 2 years ago
parent a64ed4b665
commit 1bd80ae0db
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,24 +1,24 @@
mod any;
mod auth;
mod client;
/* mod auth;
mod client; */
mod id;
mod key;
mod listener;
mod packet;
mod port;
mod server;
// mod server;
mod transport;
mod utils;
pub use any::*;
pub use auth::*;
pub use client::*;
/* pub use auth::*;
pub use client::*; */
pub use id::*;
pub use key::*;
pub use listener::*;
pub use packet::*;
pub use port::*;
pub use server::*;
// pub use server::*;
pub use transport::*;
pub use log;

@ -64,12 +64,9 @@ impl Listener for TcpListener {
#[cfg(test)]
mod tests {
use super::*;
use crate::RawTransport;
use std::net::{Ipv6Addr, SocketAddr};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot,
task::JoinHandle,
};
use tokio::{sync::oneshot, task::JoinHandle};
#[tokio::test]
async fn should_fail_to_bind_if_port_already_bound() {
@ -109,7 +106,7 @@ mod tests {
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string()))?;
// Get first connection
let mut conn_1 = listener.accept().await?;
let conn_1 = listener.accept().await?;
// Send some data to the first connection (12 bytes)
conn_1.write_all(b"hello conn 1").await?;
@ -120,7 +117,7 @@ mod tests {
assert_eq!(&buf, b"hello server 1");
// Get second connection
let mut conn_2 = listener.accept().await?;
let conn_2 = listener.accept().await?;
// Send some data on to second connection (12 bytes)
conn_2.write_all(b"hello conn 2").await?;
@ -139,7 +136,7 @@ mod tests {
// Connect to the listener twice, sending some bytes and receiving some bytes from each
let mut buf: [u8; 12] = [0; 12];
let mut conn = TcpTransport::connect(&address)
let conn = TcpTransport::connect(&address)
.await
.expect("Conn 1 failed to connect");
conn.write_all(b"hello server 1")
@ -150,7 +147,7 @@ mod tests {
.expect("Conn 1 failed to read");
assert_eq!(&buf, b"hello conn 1");
let mut conn = TcpTransport::connect(&address)
let conn = TcpTransport::connect(&address)
.await
.expect("Conn 2 failed to connect");
conn.write_all(b"hello server 2")

@ -94,12 +94,9 @@ impl Listener for UnixSocketListener {
#[cfg(test)]
mod tests {
use super::*;
use crate::RawTransport;
use tempfile::NamedTempFile;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot,
task::JoinHandle,
};
use tokio::{sync::oneshot, task::JoinHandle};
#[tokio::test]
async fn should_succeed_to_bind_if_file_exists_at_path_but_nothing_listening() {
@ -154,7 +151,7 @@ mod tests {
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.display().to_string()))?;
// Get first connection
let mut conn_1 = listener.accept().await?;
let conn_1 = listener.accept().await?;
// Send some data to the first connection (12 bytes)
conn_1.write_all(b"hello conn 1").await?;
@ -165,7 +162,7 @@ mod tests {
assert_eq!(&buf, b"hello server 1");
// Get second connection
let mut conn_2 = listener.accept().await?;
let conn_2 = listener.accept().await?;
// Send some data on to second connection (12 bytes)
conn_2.write_all(b"hello conn 2").await?;
@ -184,7 +181,7 @@ mod tests {
// Connect to the listener twice, sending some bytes and receiving some bytes from each
let mut buf: [u8; 12] = [0; 12];
let mut conn = UnixSocketTransport::connect(&path)
let conn = UnixSocketTransport::connect(&path)
.await
.expect("Conn 1 failed to connect");
conn.write_all(b"hello server 1")
@ -195,7 +192,7 @@ mod tests {
.expect("Conn 1 failed to read");
assert_eq!(&buf, b"hello conn 1");
let mut conn = UnixSocketTransport::connect(&path)
let conn = UnixSocketTransport::connect(&path)
.await
.expect("Conn 2 failed to connect");
conn.write_all(b"hello server 2")

@ -1,16 +1,16 @@
use async_trait::async_trait;
use std::io;
mod router;
// mod router;
mod raw;
pub use raw::*;
mod typed;
pub use typed::*;
/* mod typed;
pub use typed::*; */
mod untyped;
pub use untyped::*;
/* mod untyped;
pub use untyped::*; */
pub use tokio::io::{Interest, Ready};

@ -2,11 +2,11 @@ use super::{Interest, Ready, Reconnectable};
use async_trait::async_trait;
use std::io;
mod framed;
pub use framed::*;
/* mod framed;
pub use framed::*; */
mod inmemory;
pub use inmemory::*;
/* mod inmemory;
pub use inmemory::*; */
mod tcp;
pub use tcp::*;
@ -14,6 +14,9 @@ pub use tcp::*;
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
mod windows;
@ -54,4 +57,71 @@ pub trait RawTransport: Reconnectable {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Reads exactly `n` bytes where `n` is the length of `buf` by continuing to call [`try_read`]
/// until completed. Calls to [`readable`] are made to ensure the transport is ready. Returns
/// the total bytes read.
///
/// [`try_read`]: RawTransport::try_read
/// [`readable`]: RawTransport::readable
async fn read_exact(&self, buf: &mut [u8]) -> io::Result<usize> {
let mut i = 0;
while i < buf.len() {
self.readable().await?;
match self.try_read(&mut buf[i..]) {
// If we get 0 bytes read, this usually means that the underlying reader
// has closed, so we will return an EOF error to reflect that
//
// NOTE: `try_read` can also return 0 if the buf len is zero, but because we check
// that our index is < len, the situation where we call try_read with a buf
// of len 0 will never happen
Ok(0) => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
Ok(n) => i += n,
// Because we are using `try_read`, it can be possible for it to return
// WouldBlock; so, if we encounter that then we just wait for next readable
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => return Err(x),
}
}
Ok(i)
}
/// Writes all of `buf` by continuing to call [`try_write`] until completed. Calls to
/// [`writeable`] are made to ensure the transport is ready.
///
/// [`try_write`]: RawTransport::try_write
/// [`writable`]: RawTransport::writable
async fn write_all(&self, buf: &[u8]) -> io::Result<()> {
let mut i = 0;
while i < buf.len() {
self.writeable().await?;
match self.try_write(&buf[i..]) {
// If we get 0 bytes written, this usually means that the underlying writer
// has closed, so we will return a broken pipe error to reflect that
//
// NOTE: `try_write` can also return 0 if the buf len is zero, but because we check
// that our index is < len, the situation where we call try_write with a buf
// of len 0 will never happen
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)),
Ok(n) => i += n,
// Because we are using `try_write`, it can be possible for it to return
// WouldBlock; so, if we encounter that then we just wait for next writeable
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => return Err(x),
}
}
Ok(())
}
}

@ -46,9 +46,6 @@ impl fmt::Debug for TcpTransport {
#[async_trait]
impl Reconnectable for TcpTransport {
async fn reconnect(&mut self) -> io::Result<()> {
// Drop the existing connection to ensure we are disconnected before trying again
drop(self.inner);
self.inner = TcpStream::connect((self.addr, self.port)).await?;
Ok(())
}
@ -106,12 +103,13 @@ mod tests {
}
#[tokio::test]
async fn should_be_able_to_send_and_receive_data() {
async fn should_be_able_to_read_and_write_data() {
let (tx, rx) = oneshot::channel();
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let addr = find_ephemeral_addr().await;
// Start listening at the distinct address
@ -141,9 +139,11 @@ mod tests {
// Connect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
let mut conn = TcpTransport::connect(&addr)
let conn = TcpTransport::connect(&addr)
.await
.expect("Conn failed to connect");
// Continually read until we get all of the data
conn.read_exact(&mut buf)
.await
.expect("Conn failed to read");

@ -39,9 +39,6 @@ impl fmt::Debug for UnixSocketTransport {
#[async_trait]
impl Reconnectable for UnixSocketTransport {
async fn reconnect(&mut self) -> io::Result<()> {
// Drop the existing connection to ensure we are disconnected before trying again
drop(self.inner);
self.inner = UnixStream::connect(self.path.as_path()).await?;
Ok(())
}
@ -101,7 +98,7 @@ mod tests {
}
#[tokio::test]
async fn should_be_able_to_send_and_receive_data() {
async fn should_be_able_to_read_and_write_data() {
let (tx, rx) = oneshot::channel();
// Spawn a task that will wait for a connection, send data,
@ -140,7 +137,7 @@ mod tests {
// Connect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
let mut conn = UnixSocketTransport::connect(&path)
let conn = UnixSocketTransport::connect(&path)
.await
.expect("Conn failed to connect");
conn.read_exact(&mut buf)

@ -106,7 +106,7 @@ mod tests {
}
#[tokio::test]
async fn should_be_able_to_send_and_receive_data() {
async fn should_be_able_to_read_and_write_data() {
let (tx, rx) = oneshot::channel();
// Spawn a task that will wait for a connection, send data,

Loading…
Cancel
Save