|
|
|
@ -173,42 +173,152 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn is_rx_closed_should_properly_reflect_if_internal_rx_channel_is_closed() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
|
|
|
|
|
// Not closed when the channel is empty
|
|
|
|
|
assert!(!transport.is_rx_closed());
|
|
|
|
|
|
|
|
|
|
read_tx.try_send(b"some bytes".to_vec()).unwrap();
|
|
|
|
|
|
|
|
|
|
// Not closed when the channel has data (will queue up data)
|
|
|
|
|
assert!(!transport.is_rx_closed());
|
|
|
|
|
assert_eq!(
|
|
|
|
|
transport.buf.lock().unwrap().as_deref().unwrap(),
|
|
|
|
|
b"some bytes"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Queue up one more set of bytes and then close the channel
|
|
|
|
|
read_tx.try_send(b"more".to_vec()).unwrap();
|
|
|
|
|
drop(read_tx);
|
|
|
|
|
|
|
|
|
|
// Not closed when channel has closed but has something remaining in the queue
|
|
|
|
|
assert!(!transport.is_rx_closed());
|
|
|
|
|
assert_eq!(
|
|
|
|
|
transport.buf.lock().unwrap().as_deref().unwrap(),
|
|
|
|
|
b"some bytesmore"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Closed once there is nothing left in the channel and it has closed
|
|
|
|
|
assert!(transport.is_rx_closed());
|
|
|
|
|
assert_eq!(
|
|
|
|
|
transport.buf.lock().unwrap().as_deref().unwrap(),
|
|
|
|
|
b"some bytesmore"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_should_succeed_if_able_to_read_entire_data_through_channel() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
|
|
|
|
|
// Queue up some data to be read
|
|
|
|
|
read_tx.try_send(b"some bytes".to_vec()).unwrap();
|
|
|
|
|
|
|
|
|
|
let mut buf = [0; 10];
|
|
|
|
|
assert_eq!(transport.try_read(&mut buf).unwrap(), 10);
|
|
|
|
|
assert_eq!(&buf[..10], b"some bytes");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_should_succeed_if_reading_cached_data_from_previous_read() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
|
|
|
|
|
// Queue up some data to be read
|
|
|
|
|
read_tx.try_send(b"some bytes".to_vec()).unwrap();
|
|
|
|
|
|
|
|
|
|
let mut buf = [0; 5];
|
|
|
|
|
assert_eq!(transport.try_read(&mut buf).unwrap(), 5);
|
|
|
|
|
assert_eq!(&buf[..5], b"some ");
|
|
|
|
|
|
|
|
|
|
// Queue up some new data to be read (previous data already consumed)
|
|
|
|
|
read_tx.try_send(b"more".to_vec()).unwrap();
|
|
|
|
|
|
|
|
|
|
let mut buf = [0; 2];
|
|
|
|
|
assert_eq!(transport.try_read(&mut buf).unwrap(), 2);
|
|
|
|
|
assert_eq!(&buf[..2], b"by");
|
|
|
|
|
|
|
|
|
|
// Inmemory still separates buffered bytes from next channel recv()
|
|
|
|
|
let mut buf = [0; 5];
|
|
|
|
|
assert_eq!(transport.try_read(&mut buf).unwrap(), 3);
|
|
|
|
|
assert_eq!(&buf[..3], b"tes");
|
|
|
|
|
|
|
|
|
|
let mut buf = [0; 5];
|
|
|
|
|
assert_eq!(transport.try_read(&mut buf).unwrap(), 4);
|
|
|
|
|
assert_eq!(&buf[..4], b"more");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_should_fail_with_would_block_if_channel_capacity_has_been_reached() {
|
|
|
|
|
todo!();
|
|
|
|
|
fn try_read_should_fail_with_would_block_if_channel_is_empty() {
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (_read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
transport.try_read(&mut [0; 5]).unwrap_err().kind(),
|
|
|
|
|
io::ErrorKind::WouldBlock
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_read_should_succeed_with_zero_bytes_read_if_channel_closed() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
// Drop to close the read channel
|
|
|
|
|
drop(read_tx);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
assert_eq!(transport.try_read(&mut [0; 5]).unwrap(), 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_should_succeed_if_able_to_send_data_through_channel() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (_read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
|
|
|
|
|
let value = b"some bytes";
|
|
|
|
|
assert_eq!(transport.try_write(value).unwrap(), value.len());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_should_fail_with_would_block_if_channel_capacity_has_been_reached() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, _write_rx) = mpsc::channel(1);
|
|
|
|
|
let (_read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
|
|
|
|
|
// Fill up the channel
|
|
|
|
|
transport
|
|
|
|
|
.try_write(b"some bytes")
|
|
|
|
|
.expect("Failed to fill channel");
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
transport.try_write(b"some bytes").unwrap_err().kind(),
|
|
|
|
|
io::ErrorKind::WouldBlock
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn try_write_should_succeed_with_zero_bytes_written_if_channel_closed() {
|
|
|
|
|
todo!();
|
|
|
|
|
let (write_tx, write_rx) = mpsc::channel(1);
|
|
|
|
|
let (_read_tx, read_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
// Drop to close the write channel
|
|
|
|
|
drop(write_rx);
|
|
|
|
|
|
|
|
|
|
let transport = InmemoryTransport::new(write_tx, read_rx);
|
|
|
|
|
assert_eq!(transport.try_write(b"some bytes").unwrap(), 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|