|
|
|
@ -58,7 +58,27 @@ impl<T, const CAPACITY: usize> FramedTransport<T, CAPACITY> {
|
|
|
|
|
self.codec = codec;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Clears the internal buffers used by the transport
|
|
|
|
|
/// Returns a reference to the codec used by the transport.
|
|
|
|
|
///
|
|
|
|
|
/// ### Note
|
|
|
|
|
///
|
|
|
|
|
/// Be careful when accessing the codec to avoid corrupting it through unexpected modifications
|
|
|
|
|
/// as this will place the transport in an undefined state.
|
|
|
|
|
pub fn codec(&self) -> &dyn Codec {
|
|
|
|
|
self.codec.as_ref()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns a mutable reference to the codec used by the transport.
|
|
|
|
|
///
|
|
|
|
|
/// ### Note
|
|
|
|
|
///
|
|
|
|
|
/// Be careful when accessing the codec to avoid corrupting it through unexpected modifications
|
|
|
|
|
/// as this will place the transport in an undefined state.
|
|
|
|
|
pub fn mut_codec(&mut self) -> &mut dyn Codec {
|
|
|
|
|
self.codec.as_mut()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Clears the internal buffers used by the transport.
|
|
|
|
|
pub fn clear(&mut self) {
|
|
|
|
|
self.incoming.clear();
|
|
|
|
|
self.outgoing.clear();
|
|
|
|
@ -506,7 +526,7 @@ mod tests {
|
|
|
|
|
use bytes::BufMut;
|
|
|
|
|
|
|
|
|
|
/// Codec that always succeeds without altering the frame
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
|
|
|
|
struct OkCodec;
|
|
|
|
|
|
|
|
|
|
impl Codec for OkCodec {
|
|
|
|
@ -520,7 +540,7 @@ mod tests {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Codec that always fails
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
|
|
|
|
struct ErrCodec;
|
|
|
|
|
|
|
|
|
|
impl Codec for ErrCodec {
|
|
|
|
@ -533,6 +553,20 @@ mod tests {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Hardcoded custom codec so we can verify it works differently than plain codec
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct CustomCodec;
|
|
|
|
|
|
|
|
|
|
impl Codec for CustomCodec {
|
|
|
|
|
fn encode<'a>(&mut self, _: Frame<'a>) -> io::Result<Frame<'a>> {
|
|
|
|
|
Ok(Frame::new(b"encode"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn decode<'a>(&mut self, _: Frame<'a>) -> io::Result<Frame<'a>> {
|
|
|
|
|
Ok(Frame::new(b"decode"))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Simulate calls to try_read by feeding back `data` in `step` increments, triggering a block
|
|
|
|
|
/// if `block_on` returns true where `block_on` is provided a counter value that is incremented
|
|
|
|
|
/// every time the simulated `try_read` function is called
|
|
|
|
@ -944,22 +978,135 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn handshake_failing_should_ensure_existing_codec_remains() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (mut t1, t2) = FramedTransport::test_pair(100);
|
|
|
|
|
|
|
|
|
|
// Set a different codec on our transport so we can verify it doesn't change
|
|
|
|
|
t1.set_codec(Box::new(CustomCodec));
|
|
|
|
|
|
|
|
|
|
// Drop our transport on the other side to cause an immediate failure
|
|
|
|
|
drop(t2);
|
|
|
|
|
|
|
|
|
|
// Ensure we detect the failure on handshake
|
|
|
|
|
t1.client_handshake().await.unwrap_err();
|
|
|
|
|
|
|
|
|
|
// Verify that the codec did not reset to plain text by using the codec
|
|
|
|
|
assert_eq!(t1.codec.encode(Frame::new(b"test")).unwrap(), b"encode");
|
|
|
|
|
assert_eq!(t1.codec.decode(Frame::new(b"test")).unwrap(), b"decode");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn handshake_should_clear_any_intermittent_buffer_contents_prior_to_handshake() {
|
|
|
|
|
todo!();
|
|
|
|
|
async fn handshake_should_clear_any_intermittent_buffer_contents_prior_to_handshake_failing() {
|
|
|
|
|
let (mut t1, t2) = FramedTransport::test_pair(100);
|
|
|
|
|
|
|
|
|
|
// Set a different codec on our transport so we can verify it doesn't change
|
|
|
|
|
t1.set_codec(Box::new(CustomCodec));
|
|
|
|
|
|
|
|
|
|
// Drop our transport on the other side to cause an immediate failure
|
|
|
|
|
drop(t2);
|
|
|
|
|
|
|
|
|
|
// Put some garbage in our buffers
|
|
|
|
|
t1.incoming.extend_from_slice(b"garbage in");
|
|
|
|
|
t1.outgoing.extend_from_slice(b"garbage out");
|
|
|
|
|
|
|
|
|
|
// Ensure we detect the failure on handshake
|
|
|
|
|
t1.client_handshake().await.unwrap_err();
|
|
|
|
|
|
|
|
|
|
// Verify that the incoming and outgoing buffers are empty
|
|
|
|
|
assert!(t1.incoming.is_empty());
|
|
|
|
|
assert!(t1.outgoing.is_empty());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn handshake_should_clear_any_intermittent_buffer_contents_prior_to_handshake_succeeding()
|
|
|
|
|
{
|
|
|
|
|
let (mut t1, mut t2) = FramedTransport::test_pair(100);
|
|
|
|
|
|
|
|
|
|
// NOTE: Spawn a separate task for one of our transports so we can communicate without
|
|
|
|
|
// deadlocking
|
|
|
|
|
let server_task = tokio::spawn(async move {
|
|
|
|
|
// Wait for handshake to complete
|
|
|
|
|
t2.server_handshake().await.unwrap();
|
|
|
|
|
|
|
|
|
|
// Receive one frame and echo it back
|
|
|
|
|
let frame = t2.read_frame().await.unwrap().unwrap();
|
|
|
|
|
t2.write_frame(frame).await.unwrap();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Put some garbage in our buffers
|
|
|
|
|
t1.incoming.extend_from_slice(b"garbage in");
|
|
|
|
|
t1.outgoing.extend_from_slice(b"garbage out");
|
|
|
|
|
|
|
|
|
|
t1.client_handshake().await.unwrap();
|
|
|
|
|
|
|
|
|
|
// Verify that the transports can still communicate with one another
|
|
|
|
|
t1.write_frame(b"hello world").await.unwrap();
|
|
|
|
|
assert_eq!(t1.read_frame().await.unwrap().unwrap(), b"hello world");
|
|
|
|
|
|
|
|
|
|
// Ensure that the server transport did not error
|
|
|
|
|
server_task.await.unwrap();
|
|
|
|
|
|
|
|
|
|
// Verify that the incoming and outgoing buffers are empty
|
|
|
|
|
assert!(t1.incoming.is_empty());
|
|
|
|
|
assert!(t1.outgoing.is_empty());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn handshake_for_client_should_fail_if_receives_unexpected_frame_instead_of_options() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (mut t1, mut t2) = FramedTransport::test_pair(100);
|
|
|
|
|
|
|
|
|
|
// NOTE: Spawn a separate task for one of our transports so we can communicate without
|
|
|
|
|
// deadlocking
|
|
|
|
|
let server_task = tokio::spawn(async move {
|
|
|
|
|
t2.write_frame(b"not a valid frame for handshake")
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Ensure we detect the failure on handshake
|
|
|
|
|
let err = t1.client_handshake().await.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
|
|
|
|
|
|
|
|
|
|
// Ensure that the server transport did not error
|
|
|
|
|
server_task.await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn handshake_for_client_should_fail_unable_to_send_codec_choice_to_other_side() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (mut t1, mut t2) = FramedTransport::test_pair(100);
|
|
|
|
|
/* #[derive(Debug, Serialize, Deserialize)]
|
|
|
|
|
struct Choice {
|
|
|
|
|
compression_level: Option<CompressionLevel>,
|
|
|
|
|
compression_type: Option<CompressionType>,
|
|
|
|
|
encryption_type: Option<EncryptionType>,
|
|
|
|
|
} */
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
|
|
|
struct Options {
|
|
|
|
|
compression_types: Vec<CompressionType>,
|
|
|
|
|
encryption_types: Vec<EncryptionType>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NOTE: Spawn a separate task for one of our transports so we can communicate without
|
|
|
|
|
// deadlocking
|
|
|
|
|
let server_task = tokio::spawn(async move {
|
|
|
|
|
// Send options, and then quit so the client side will fail
|
|
|
|
|
t2.write_frame(
|
|
|
|
|
utils::serialize_to_vec(&Options {
|
|
|
|
|
compression_types: Vec::new(),
|
|
|
|
|
encryption_types: Vec::new(),
|
|
|
|
|
})
|
|
|
|
|
.unwrap(),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Ensure we detect the failure on handshake
|
|
|
|
|
let err = t1.client_handshake().await.unwrap_err();
|
|
|
|
|
assert_eq!(err.kind(), io::ErrorKind::WriteZero);
|
|
|
|
|
|
|
|
|
|
// Ensure that the server transport did not error
|
|
|
|
|
server_task.await.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|