pull/146/head
Chip Senkbeil 2 years ago
parent 1bd80ae0db
commit fbdbcdc152
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -19,10 +19,10 @@ mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
// #[cfg(windows)]
mod windows;
#[cfg(windows)]
// #[cfg(windows)]
pub use windows::*;
/// Represents a type that has a listen interface for receiving raw streams

@ -66,11 +66,8 @@ impl Listener for WindowsPipeListener {
#[cfg(test)]
mod tests {
use super::*;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot,
task::JoinHandle,
};
use crate::RawTransport;
use tokio::{sync::oneshot, task::JoinHandle};
#[tokio::test]
async fn should_fail_to_bind_if_pipe_already_bound() {

@ -17,10 +17,10 @@ mod unix;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
// #[cfg(windows)]
mod windows;
#[cfg(windows)]
// #[cfg(windows)]
pub use windows::*;
/// Interface representing a transport of raw bytes into and out of the system

@ -2,9 +2,6 @@ use super::{Interest, RawTransport, Ready, Reconnectable};
use std::{
ffi::{OsStr, OsString},
fmt, io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
mod pipe;
@ -33,10 +30,7 @@ impl WindowsPipeTransport {
let addr = addr.into();
let inner = NamedPipe::connect_as_client(&addr).await?;
Ok(Self {
addr,
inner,
})
Ok(Self { addr, inner })
}
/// Returns the addr that the listener is bound to
@ -61,9 +55,6 @@ impl Reconnectable for WindowsPipeTransport {
return Err(io::Error::from(io::ErrorKind::Unsupported));
}
// Drop the existing connection to ensure we are disconnected before trying again
drop(self.inner);
self.inner = NamedPipe::connect_as_client(&self.addr).await?;
Ok(())
}
@ -72,27 +63,31 @@ impl Reconnectable for WindowsPipeTransport {
#[async_trait]
impl RawTransport for WindowsPipeTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.try_read(buf)
match self.inner {
NamedPipe::Client(x) => x.try_read(buf),
NamedPipe::Server(x) => x.try_read(buf),
}
}
fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.inner.try_write(buf)
match self.inner {
NamedPipe::Client(x) => x.try_write(buf),
NamedPipe::Server(x) => x.try_write(buf),
}
}
async fn ready(&self, interest: Interest) -> io::Result<Ready> {
self.inner.ready(interest).await
match self.inner {
NamedPipe::Client(x) => x.ready(interest).await,
NamedPipe::Server(x) => x.ready(interest).await,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::windows::named_pipe::ServerOptions,
sync::oneshot,
task::JoinHandle,
};
use tokio::{net::windows::named_pipe::ServerOptions, sync::oneshot, task::JoinHandle};
#[tokio::test]
async fn should_fail_to_connect_if_pipe_does_not_exist() {
@ -112,6 +107,8 @@ mod tests {
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Generate a pipe address (not just a name)
let addr = format!(r"\\.\pipe\test_pipe_{}", rand::random::<usize>());

@ -1,5 +1,5 @@
use derive_more::{From, TryInto};
use std::io;
use std::{ffi::OsStr, io, time::Duration};
use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient, NamedPipeServer};
// Equivalent to winapi::shared::winerror::ERROR_PIPE_BUSY
@ -7,7 +7,7 @@ use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeClient, NamedPipeS
const ERROR_PIPE_BUSY: u32 = 231;
// Time between attempts to connect to a busy pipe
const BUSY_PIPE_SLEEP_MILLIS: u64 = 50;
const BUSY_PIPE_SLEEP_DURATION: Duration = Duration::from_millis(50);
/// Represents a named pipe from either a client or server perspective
#[derive(From, TryInto)]
@ -17,6 +17,11 @@ pub enum NamedPipe {
}
impl NamedPipe {
/// Returns true if the underlying named pipe is a client named pipe
pub fn is_client(&self) -> bool {
matches!(self, Self::Client(_))
}
/// Returns a reference to the underlying named client pipe
pub fn as_client(&self) -> Option<&NamedPipeClient> {
match self {
@ -41,6 +46,11 @@ impl NamedPipe {
}
}
/// Returns true if the underlying named pipe is a server named pipe
pub fn is_server(&self) -> bool {
matches!(self, Self::Server(_))
}
/// Returns a reference to the underlying named server pipe
pub fn as_server(&self) -> Option<&NamedPipeServer> {
match self {
@ -66,7 +76,7 @@ impl NamedPipe {
}
/// Attempts to connect as a client pipe
pub(super) fn connect_as_client(addr: &OsStr) -> io::Result<Self> {
pub(super) async fn connect_as_client(addr: &OsStr) -> io::Result<Self> {
let pipe = loop {
match ClientOptions::new().open(addr) {
Ok(client) => break client,
@ -74,7 +84,7 @@ impl NamedPipe {
Err(e) => return Err(e),
}
tokio::time::sleep(Duration::from_millis(BUSY_PIPE_SLEEP_MILLIS)).await;
tokio::time::sleep(BUSY_PIPE_SLEEP_DURATION).await;
};
Ok(NamedPipe::from(pipe))

Loading…
Cancel
Save