|
|
|
@ -4,7 +4,7 @@ use async_trait::async_trait;
|
|
|
|
|
use bytes::{Buf, BytesMut};
|
|
|
|
|
use log::*;
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
use std::{fmt, io};
|
|
|
|
|
use std::{fmt, io, time::Duration};
|
|
|
|
|
|
|
|
|
|
mod codec;
|
|
|
|
|
mod exchange;
|
|
|
|
@ -16,8 +16,11 @@ pub use exchange::*;
|
|
|
|
|
pub use frame::*;
|
|
|
|
|
pub use handshake::*;
|
|
|
|
|
|
|
|
|
|
/// By default, framed transport's initial capacity (and max single-read) will be 8 KiB
|
|
|
|
|
const DEFAULT_CAPACITY: usize = 8 * 1024;
|
|
|
|
|
/// Size of the read buffer when reading bytes to construct a frame
|
|
|
|
|
const READ_BUF_SIZE: usize = 8 * 1024;
|
|
|
|
|
|
|
|
|
|
/// Duration to wait after WouldBlock received during looping operations like `read_frame`
|
|
|
|
|
const SLEEP_DURATION: Duration = Duration::from_millis(50);
|
|
|
|
|
|
|
|
|
|
/// Represents a wrapper around a [`Transport`] that reads and writes using frames defined by a
|
|
|
|
|
/// [`Codec`]. `CAPACITY` represents both the initial capacity of incoming and outgoing buffers as
|
|
|
|
@ -25,20 +28,20 @@ const DEFAULT_CAPACITY: usize = 8 * 1024;
|
|
|
|
|
///
|
|
|
|
|
/// [`try_read`]: Transport::try_read
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct FramedTransport<T, const CAPACITY: usize = DEFAULT_CAPACITY> {
|
|
|
|
|
pub struct FramedTransport<T> {
|
|
|
|
|
inner: T,
|
|
|
|
|
codec: BoxedCodec,
|
|
|
|
|
incoming: BytesMut,
|
|
|
|
|
outgoing: BytesMut,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY> {
|
|
|
|
|
impl<T> FramedTransport<T> {
|
|
|
|
|
pub fn new(inner: T, codec: BoxedCodec) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
inner,
|
|
|
|
|
codec,
|
|
|
|
|
incoming: BytesMut::with_capacity(CAPACITY),
|
|
|
|
|
outgoing: BytesMut::with_capacity(CAPACITY),
|
|
|
|
|
incoming: BytesMut::with_capacity(READ_BUF_SIZE * 2),
|
|
|
|
|
outgoing: BytesMut::with_capacity(READ_BUF_SIZE * 2),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -85,20 +88,16 @@ impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T, const CAPACITY: usize> fmt::Debug for FramedTransport<T, CAPACITY> {
|
|
|
|
|
impl<T> fmt::Debug for FramedTransport<T> {
|
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
|
f.debug_struct("FramedTransport")
|
|
|
|
|
.field("capacity", &CAPACITY)
|
|
|
|
|
.field("incoming", &self.incoming)
|
|
|
|
|
.field("outgoing", &self.outgoing)
|
|
|
|
|
.finish()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY>
|
|
|
|
|
where
|
|
|
|
|
T: Transport,
|
|
|
|
|
{
|
|
|
|
|
impl<T: Transport> FramedTransport<T> {
|
|
|
|
|
/// Waits for the transport to be ready based on the given interest, returning the ready status
|
|
|
|
|
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
|
|
|
|
|
Transport::ready(&self.inner, interest).await
|
|
|
|
@ -142,12 +141,7 @@ where
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY>
|
|
|
|
|
where
|
|
|
|
|
T: Transport,
|
|
|
|
|
{
|
|
|
|
|
/// Reads a frame of bytes by using the [`Codec`] tied to this transport. Returns
|
|
|
|
|
/// `Ok(Some(frame))` upon reading a frame, or `Ok(None)` if the underlying transport has
|
|
|
|
|
/// closed.
|
|
|
|
@ -158,7 +152,7 @@ where
|
|
|
|
|
/// [`ErrorKind::WouldBlock`]: io::ErrorKind::WouldBlock
|
|
|
|
|
pub fn try_read_frame(&mut self) -> io::Result<Option<OwnedFrame>> {
|
|
|
|
|
// Continually read bytes into the incoming queue and then attempt to tease out a frame
|
|
|
|
|
let mut buf = [0; CAPACITY];
|
|
|
|
|
let mut buf = [0; READ_BUF_SIZE];
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
match self.inner.try_read(&mut buf) {
|
|
|
|
@ -202,7 +196,10 @@ where
|
|
|
|
|
self.readable().await?;
|
|
|
|
|
|
|
|
|
|
match self.try_read_frame() {
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => tokio::task::yield_now().await,
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
|
|
|
|
|
// NOTE: We sleep for a little bit before trying again to avoid pegging CPU
|
|
|
|
|
tokio::time::sleep(SLEEP_DURATION).await
|
|
|
|
|
}
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -255,7 +252,8 @@ where
|
|
|
|
|
self.writeable().await?;
|
|
|
|
|
match self.try_flush() {
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
|
|
|
|
|
tokio::task::yield_now().await
|
|
|
|
|
// NOTE: We sleep for a little bit before trying again to avoid pegging CPU
|
|
|
|
|
tokio::time::sleep(SLEEP_DURATION).await
|
|
|
|
|
}
|
|
|
|
|
x => return x,
|
|
|
|
|
}
|
|
|
|
@ -491,7 +489,7 @@ where
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
impl<T, const CAPACITY: usize> Reconnectable for FramedTransport<T, CAPACITY>
|
|
|
|
|
impl<T> Reconnectable for FramedTransport<T>
|
|
|
|
|
where
|
|
|
|
|
T: Transport + Send + Sync,
|
|
|
|
|
{
|
|
|
|
@ -500,7 +498,7 @@ where
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<const CAPACITY: usize> FramedTransport<InmemoryTransport, CAPACITY> {
|
|
|
|
|
impl FramedTransport<InmemoryTransport> {
|
|
|
|
|
/// Produces a pair of inmemory transports that are connected to each other using
|
|
|
|
|
/// a standard codec
|
|
|
|
|
///
|
|
|
|
@ -508,8 +506,8 @@ impl<const CAPACITY: usize> FramedTransport<InmemoryTransport, CAPACITY> {
|
|
|
|
|
pub fn pair(
|
|
|
|
|
buffer: usize,
|
|
|
|
|
) -> (
|
|
|
|
|
FramedTransport<InmemoryTransport, CAPACITY>,
|
|
|
|
|
FramedTransport<InmemoryTransport, CAPACITY>,
|
|
|
|
|
FramedTransport<InmemoryTransport>,
|
|
|
|
|
FramedTransport<InmemoryTransport>,
|
|
|
|
|
) {
|
|
|
|
|
let (a, b) = InmemoryTransport::pair(buffer);
|
|
|
|
|
let a = FramedTransport::new(a, Box::new(PlainCodec::new()));
|
|
|
|
@ -624,7 +622,7 @@ mod tests {
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_frame_should_return_would_block_if_fails_to_read_frame_before_blocking() {
|
|
|
|
|
// Should fail if immediately blocks
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: Box::new(|_| Err(io::Error::from(io::ErrorKind::WouldBlock))),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::READABLE)),
|
|
|
|
@ -638,7 +636,7 @@ mod tests {
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Should fail if not read enough bytes before blocking
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: simulate_try_read(vec![Frame::new(b"some data")], 1, |cnt| cnt == 1),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::READABLE)),
|
|
|
|
@ -654,7 +652,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_frame_should_return_error_if_encountered_error_with_reading_bytes() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: Box::new(|_| Err(io::Error::from(io::ErrorKind::NotConnected))),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::READABLE)),
|
|
|
|
@ -670,7 +668,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_frame_should_return_error_if_encountered_error_during_decode() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: simulate_try_read(vec![Frame::new(b"some data")], 1, |_| false),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::READABLE)),
|
|
|
|
@ -692,7 +690,7 @@ mod tests {
|
|
|
|
|
data.freeze()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: Box::new(move |buf| {
|
|
|
|
|
buf[..data.len()].copy_from_slice(data.as_ref());
|
|
|
|
@ -710,7 +708,7 @@ mod tests {
|
|
|
|
|
fn try_read_frame_should_keep_reading_until_a_frame_is_found() {
|
|
|
|
|
const STEP_SIZE: usize = Frame::HEADER_SIZE + 7;
|
|
|
|
|
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_read: simulate_try_read(
|
|
|
|
|
vec![Frame::new(b"hello world"), Frame::new(b"test hello")],
|
|
|
|
@ -734,7 +732,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_frame_should_return_would_block_if_fails_to_write_frame_before_blocking() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(|_| Err(io::Error::from(io::ErrorKind::WouldBlock))),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::WRITABLE)),
|
|
|
|
@ -755,7 +753,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_frame_should_return_error_if_encountered_error_with_writing_bytes() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(|_| Err(io::Error::from(io::ErrorKind::NotConnected))),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::WRITABLE)),
|
|
|
|
@ -774,7 +772,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_frame_should_return_error_if_encountered_error_during_encode() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(|buf| Ok(buf.len())),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::WRITABLE)),
|
|
|
|
@ -794,7 +792,7 @@ mod tests {
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_frame_should_write_entire_frame_if_possible() {
|
|
|
|
|
let (tx, rx) = std::sync::mpsc::sync_channel(1);
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(move |buf| {
|
|
|
|
|
let len = buf.len();
|
|
|
|
@ -820,7 +818,7 @@ mod tests {
|
|
|
|
|
fn try_write_frame_should_write_any_prior_queued_bytes_before_writing_next_frame() {
|
|
|
|
|
const STEP_SIZE: usize = Frame::HEADER_SIZE + 5;
|
|
|
|
|
let (tx, rx) = std::sync::mpsc::sync_channel(10);
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(move |buf| {
|
|
|
|
|
static mut CNT: usize = 0;
|
|
|
|
@ -875,7 +873,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_flush_should_return_error_if_try_write_fails() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(|_| Err(io::Error::from(io::ErrorKind::NotConnected))),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::WRITABLE)),
|
|
|
|
@ -896,7 +894,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_flush_should_return_error_if_try_write_returns_0_bytes_written() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(|_| Ok(0)),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::WRITABLE)),
|
|
|
|
@ -917,7 +915,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_flush_should_be_noop_if_nothing_to_flush() {
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(|_| Err(io::Error::from(io::ErrorKind::NotConnected))),
|
|
|
|
|
f_ready: Box::new(|_| Ok(Ready::WRITABLE)),
|
|
|
|
@ -934,7 +932,7 @@ mod tests {
|
|
|
|
|
fn try_flush_should_continually_call_try_write_until_outgoing_buffer_is_empty() {
|
|
|
|
|
const STEP_SIZE: usize = 5;
|
|
|
|
|
let (tx, rx) = std::sync::mpsc::sync_channel(10);
|
|
|
|
|
let mut transport = FramedTransport::<_>::new(
|
|
|
|
|
let mut transport = FramedTransport::new(
|
|
|
|
|
TestTransport {
|
|
|
|
|
f_try_write: Box::new(move |buf| {
|
|
|
|
|
let len = std::cmp::min(STEP_SIZE, buf.len());
|
|
|
|
|