use std::io::{self, stdin, stdout, Read, Write}; use std::os::unix::io::AsRawFd; use std::pin::Pin; use std::task::{Context, Poll}; use pin_project::pin_project; use std::{ io::{Stdin, Stdout}, net::{IpAddr, Ipv4Addr, SocketAddr}, }; use tokio::io::*; use tonic::transport::{server::Connected, Uri}; #[pin_project] pub struct StdioSocket { #[pin] reader: PollEvented>, #[pin] writer: PollEvented>, } pub async fn stdio_connector(_: Uri) -> io::Result> { StdioSocket::try_new() } impl StdioSocket { pub fn try_new() -> io::Result { Self::try_new_rw(stdin(), stdout()) } } impl Connected for StdioSocket { fn remote_addr(&self) -> Option { Some(SocketAddr::new(IpAddr::from(Ipv4Addr::UNSPECIFIED), 8080)) } } impl StdioSocket { pub fn try_new_rw(read: R, write: W) -> io::Result { Ok(StdioSocket { reader: PollEvented::new(async_stdio::EventedStdin::try_new(read)?)?, writer: PollEvented::new(async_stdio::EventedStdout::try_new(write)?)?, }) } } impl AsyncRead for StdioSocket { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { self.project().reader.poll_read(cx, buf) } } impl AsyncWrite for StdioSocket { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { self.project().writer.poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().writer.poll_flush(cx) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().writer.poll_shutdown(cx) } } mod async_stdio { use std::io::{self, Read, Write}; use std::os::unix::io::AsRawFd; use mio::event::Evented; use mio::unix::EventedFd; use mio::{Poll, PollOpt, Ready, Token}; use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK}; pub struct EventedStdin(T); pub struct EventedStdout(T); impl EventedStdin { pub fn try_new(stdin: T) -> io::Result { set_non_blocking_flag(&stdin)?; Ok(EventedStdin(stdin)) } } impl EventedStdout { pub fn try_new(stdout: T) -> io::Result { set_non_blocking_flag(&stdout)?; Ok(EventedStdout(stdout)) } } impl Evented for EventedStdin { fn register( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> io::Result<()> { EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts) } fn reregister( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> io::Result<()> { EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts) } fn deregister(&self, poll: &Poll) -> io::Result<()> { EventedFd(&self.0.as_raw_fd()).deregister(poll) } } impl Read for EventedStdin { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } } impl Evented for EventedStdout { fn register( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> io::Result<()> { EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts) } fn reregister( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> io::Result<()> { EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts) } fn deregister(&self, poll: &Poll) -> io::Result<()> { EventedFd(&self.0.as_raw_fd()).deregister(poll) } } impl Write for EventedStdout { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } fn flush(&mut self) -> io::Result<()> { self.0.flush() } } fn set_non_blocking_flag(stream: &T) -> io::Result<()> { let flags = unsafe { fcntl(stream.as_raw_fd(), F_GETFL, 0) }; if flags < 0 { return Err(std::io::Error::last_os_error()); } if unsafe { fcntl(stream.as_raw_fd(), F_SETFL, flags | O_NONBLOCK) } != 0 { return Err(std::io::Error::last_os_error()); } Ok(()) } }