Fix frame_read and frame_write not allowing other async tasks to run on WouldBlock

pull/146/head
Chip Senkbeil 2 years ago
parent 2a61796d04
commit d1757b9acf
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

32
Cargo.lock generated

@ -811,6 +811,7 @@ dependencies = [
"chacha20poly1305",
"derive_more",
"dyn-clone",
"env_logger",
"flate2",
"hex",
"hkdf",
@ -824,6 +825,7 @@ dependencies = [
"serde_bytes",
"sha2 0.10.2",
"tempfile",
"test-log",
"tokio",
]
@ -942,6 +944,19 @@ dependencies = [
"encoding_rs",
]
[[package]]
name = "env_logger"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c90bf5f19754d10198ccb95b70664fc925bd1fc090a0fd9a6ebc54acc8cd6272"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "err-derive"
version = "0.3.1"
@ -1415,6 +1430,12 @@ dependencies = [
"digest 0.10.3",
]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "ignore"
version = "0.4.18"
@ -3053,6 +3074,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "test-log"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f0c854faeb68a048f0f2dc410c5ddae3bf83854ef0e4977d58306a5edef50e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "textwrap"
version = "0.15.0"

@ -34,4 +34,6 @@ tokio = { version = "1.20.1", features = ["full"] }
schemars = { version = "0.8.10", optional = true }
[dev-dependencies]
env_logger = "0.9.1"
tempfile = "3.3.0"
test-log = "0.2.11"

@ -1,4 +1,4 @@
use super::{Interest, Ready, Reconnectable, Transport};
use super::{InmemoryTransport, Interest, Ready, Reconnectable, Transport};
use crate::utils;
use async_trait::async_trait;
use bytes::{Buf, BytesMut};
@ -182,7 +182,7 @@ where
self.readable().await?;
match self.try_read_frame() {
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) if x.kind() == io::ErrorKind::WouldBlock => tokio::task::yield_now().await,
x => return x,
}
}
@ -222,7 +222,9 @@ where
Err(x) if x.kind() == io::ErrorKind::WouldBlock => loop {
self.writeable().await?;
match self.try_flush() {
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
tokio::task::yield_now().await
}
x => return x,
}
},
@ -466,7 +468,7 @@ where
}
}
impl<const CAPACITY: usize> FramedTransport<super::InmemoryTransport, CAPACITY> {
impl<const CAPACITY: usize> FramedTransport<InmemoryTransport, CAPACITY> {
/// Produces a pair of inmemory transports that are connected to each other using
/// a standard codec
///
@ -474,10 +476,10 @@ impl<const CAPACITY: usize> FramedTransport<super::InmemoryTransport, CAPACITY>
pub fn pair(
buffer: usize,
) -> (
FramedTransport<super::InmemoryTransport, CAPACITY>,
FramedTransport<super::InmemoryTransport, CAPACITY>,
FramedTransport<InmemoryTransport, CAPACITY>,
FramedTransport<InmemoryTransport, CAPACITY>,
) {
let (a, b) = super::InmemoryTransport::pair(buffer);
let (a, b) = InmemoryTransport::pair(buffer);
let a = FramedTransport::new(a, Box::new(PlainCodec::new()));
let b = FramedTransport::new(b, Box::new(PlainCodec::new()));
(a, b)
@ -485,13 +487,13 @@ impl<const CAPACITY: usize> FramedTransport<super::InmemoryTransport, CAPACITY>
}
#[cfg(test)]
impl FramedTransport<super::InmemoryTransport> {
impl FramedTransport<InmemoryTransport> {
/// Generates a test pair with default capacity
pub fn test_pair(
buffer: usize,
) -> (
FramedTransport<super::InmemoryTransport>,
FramedTransport<super::InmemoryTransport>,
FramedTransport<InmemoryTransport>,
FramedTransport<InmemoryTransport>,
) {
Self::pair(buffer)
}
@ -915,9 +917,29 @@ mod tests {
);
}
#[tokio::test]
#[test_log::test(tokio::test)]
async fn handshake_should_configure_transports_with_matching_codec() {
todo!();
let (mut t1, mut t2) = FramedTransport::test_pair(100);
// NOTE: Spawn a separate task for one of our transports so we can communicate without
// deadlocking
let server_task = tokio::spawn(async move {
// Wait for handshake to complete
t2.server_handshake().await.unwrap();
// Receive one frame and echo it back
let frame = t2.read_frame().await.unwrap().unwrap();
t2.write_frame(frame).await.unwrap();
});
t1.client_handshake().await.unwrap();
// Verify that the transports can still communicate with one another
t1.write_frame(b"hello world").await.unwrap();
assert_eq!(t1.read_frame().await.unwrap().unwrap(), b"hello world");
// Ensure that the server transport did not error
server_task.await.unwrap();
}
#[tokio::test]

Loading…
Cancel
Save