More progress -- no compile

pull/146/head
Chip Senkbeil 2 years ago
parent 4e42d6cee8
commit c4b7db33d5
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -411,7 +411,7 @@ mod tests {
use super::*;
use crate::data::{DistantRequestData, DistantResponseData};
use distant_net::{
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Request, Response,
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Request, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::{future::Future, time::Duration};
@ -421,7 +421,7 @@ mod tests {
// Configures an lsp process with a means to send & receive data from outside
async fn spawn_lsp_process() -> (
FramedTransport<InmemoryRawTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
RemoteLspProcess,
) {
let (mut t1, t2) = FramedTransport::pair(100);

@ -609,13 +609,13 @@ mod tests {
data::{Error, ErrorKind},
};
use distant_net::{
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Response,
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::time::Duration;
fn make_session() -> (
FramedTransport<InmemoryRawTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
DistantClient,
) {
let (t1, t2) = FramedTransport::pair(100);

@ -197,14 +197,14 @@ mod tests {
};
use crate::DistantClient;
use distant_net::{
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Response,
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
fn make_session() -> (
FramedTransport<InmemoryRawTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
DistantClient,
) {
let (t1, t2) = FramedTransport::pair(100);

@ -185,14 +185,14 @@ mod tests {
use crate::data::ChangeKind;
use crate::DistantClient;
use distant_net::{
Client, FramedTransport, InmemoryRawTransport, IntoSplit, PlainCodec, Response,
Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response,
TypedAsyncRead, TypedAsyncWrite,
};
use std::sync::Arc;
use tokio::sync::Mutex;
fn make_session() -> (
FramedTransport<InmemoryRawTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
DistantClient,
) {
let (t1, t2) = FramedTransport::pair(100);

@ -378,13 +378,12 @@ mod tests {
use super::*;
use crate::data::{Error, ErrorKind};
use distant_net::{
FramedTransport, InmemoryRawTransport, PlainCodec, UntypedTransportRead,
UntypedTransportWrite,
FramedTransport, InmemoryTransport, PlainCodec, UntypedTransportRead, UntypedTransportWrite,
};
fn setup() -> (
DistantManagerClient,
FramedTransport<InmemoryRawTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
) {
let (t1, t2) = FramedTransport::pair(100);
let client =

@ -426,8 +426,8 @@ impl Server for DistantManager {
mod tests {
use super::*;
use distant_net::{
AuthClient, FramedTransport, HeapAuthServer, InmemoryRawTransport, IntoSplit,
MappedListener, OneshotListener, PlainCodec, ServerExt, ServerRef,
AuthClient, FramedTransport, HeapAuthServer, InmemoryTransport, IntoSplit, MappedListener,
OneshotListener, PlainCodec, ServerExt, ServerRef,
};
/// Create a new server, bypassing the start loop
@ -470,7 +470,7 @@ mod tests {
/// Creates a writer & reader with a connected transport
fn setup_distant_writer_reader() -> (
(BoxedDistantWriter, BoxedDistantReader),
FramedTransport<InmemoryRawTransport, PlainCodec>,
FramedTransport<InmemoryTransport, PlainCodec>,
) {
let (t1, t2) = FramedTransport::pair(1);
let (writer, reader) = t1.into_split();

@ -1,5 +1,5 @@
use distant_core::{
net::{FramedTransport, InmemoryRawTransport, IntoSplit, OneshotListener, PlainCodec},
net::{FramedTransport, InmemoryTransport, IntoSplit, OneshotListener, PlainCodec},
BoxedDistantReader, BoxedDistantWriter, Destination, DistantApiServer, DistantChannelExt,
DistantManager, DistantManagerClient, DistantManagerClientConfig, DistantManagerConfig, Map,
};
@ -8,10 +8,10 @@ use std::io;
/// Creates a client transport and server listener for our tests
/// that are connected together
async fn setup() -> (
FramedTransport<InmemoryRawTransport, PlainCodec>,
OneshotListener<FramedTransport<InmemoryRawTransport, PlainCodec>>,
FramedTransport<InmemoryTransport, PlainCodec>,
OneshotListener<FramedTransport<InmemoryTransport, PlainCodec>>,
) {
let (t1, t2) = InmemoryRawTransport::pair(100);
let (t1, t2) = InmemoryTransport::pair(100);
let listener = OneshotListener::from_value(FramedTransport::new(t2, PlainCodec));
let transport = FramedTransport::new(t1, PlainCodec);

@ -1,7 +1,6 @@
mod any;
mod auth;
mod client;
mod codec;
mod id;
mod key;
mod listener;
@ -14,7 +13,6 @@ mod utils;
pub use any::*;
pub use auth::*;
pub use client::*;
pub use codec::*;
pub use id::*;
pub use key::*;
pub use listener::*;

@ -6,11 +6,8 @@ pub struct ServerConnection {
/// Unique identifier tied to the connection
pub id: ConnectionId,
/// Task that is processing incoming requests from the connection
pub(crate) reader_task: Option<JoinHandle<()>>,
/// Task that is processing outgoing responses to the connection
pub(crate) writer_task: Option<JoinHandle<()>>,
/// Task that is processing requests and responses
pub(crate) task: Option<JoinHandle<()>>,
}
impl Default for ServerConnection {
@ -24,27 +21,18 @@ impl ServerConnection {
pub fn new() -> Self {
Self {
id: rand::random(),
reader_task: None,
writer_task: None,
task: None,
}
}
/// Returns true if connection is still processing incoming or outgoing messages
pub fn is_active(&self) -> bool {
let reader_active =
self.reader_task.is_some() && !self.reader_task.as_ref().unwrap().is_finished();
let writer_active =
self.writer_task.is_some() && !self.writer_task.as_ref().unwrap().is_finished();
reader_active || writer_active
self.reader_task.is_some() && !self.reader_task.as_ref().unwrap().is_finished()
}
/// Aborts the connection
pub fn abort(&self) {
if let Some(task) = self.reader_task.as_ref() {
task.abort();
}
if let Some(task) = self.writer_task.as_ref() {
if let Some(task) = self.task.as_ref() {
task.abort();
}
}

@ -1,6 +1,6 @@
use crate::{
utils::Timer, GenericServerRef, Listener, Request, Response, Server, ServerConnection,
ServerCtx, ServerRef, ServerReply, ServerState, Shutdown, TypedAsyncRead, TypedAsyncWrite,
utils::Timer, ConnectionId, GenericServerRef, Interest, Listener, Request, Response, Server,
ServerConnection, ServerCtx, ServerRef, ServerReply, ServerState, Shutdown, TypedTransport,
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
@ -8,7 +8,10 @@ use std::{
io,
sync::{Arc, Weak},
};
use tokio::sync::{mpsc, Mutex};
use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
};
mod tcp;
pub use tcp::*;
@ -33,28 +36,26 @@ pub trait ServerExt {
type Response;
/// Start a new server using the provided listener
fn start<L, R, W>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
fn start<L, T>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
where
L: Listener<Output = (W, R)> + 'static,
R: TypedAsyncRead<Request<Self::Request>> + Send + 'static,
W: TypedAsyncWrite<Response<Self::Response>> + Send + 'static;
L: Listener<Output = T> + 'static,
T: TypedTransport<Input = Self::Request, Output = Self::Response> + Send + 'static;
}
impl<S, Req, Res, Data> ServerExt for S
impl<S> ServerExt for S
where
S: Server<Request = Req, Response = Res, LocalData = Data> + Sync + 'static,
Req: DeserializeOwned + Send + Sync + 'static,
Res: Serialize + Send + 'static,
Data: Default + Send + Sync + 'static,
S: Server + Sync + 'static,
S::Request: DeserializeOwned + Send + Sync + 'static,
S::Response: Serialize + Send + 'static,
S::LocalData: Default + Send + Sync + 'static,
{
type Request = Req;
type Response = Res;
type Request = S::Request;
type Response = S::Response;
fn start<L, R, W>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
fn start<L, T>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
where
L: Listener<Output = (W, R)> + 'static,
R: TypedAsyncRead<Request<Self::Request>> + Send + 'static,
W: TypedAsyncWrite<Response<Self::Response>> + Send + 'static,
L: Listener<Output = T> + 'static,
T: TypedTransport<Input = Self::Request, Output = Self::Response> + Send + 'static,
{
let server = Arc::new(self);
let state = Arc::new(ServerState::new());
@ -65,15 +66,14 @@ where
}
}
async fn task<S, Req, Res, Data, L, R, W>(server: Arc<S>, state: Arc<ServerState>, mut listener: L)
async fn task<S, L, T>(server: Arc<S>, state: Arc<ServerState>, mut listener: L)
where
S: Server<Request = Req, Response = Res, LocalData = Data> + Sync + 'static,
Req: DeserializeOwned + Send + Sync + 'static,
Res: Serialize + Send + 'static,
Data: Default + Send + Sync + 'static,
L: Listener<Output = (W, R)> + 'static,
R: TypedAsyncRead<Request<Req>> + Send + 'static,
W: TypedAsyncWrite<Response<Res>> + Send + 'static,
S: Server<Request = T::Input, Response = T::Output> + Sync + 'static,
S::LocalData: Default + Send + Sync + 'static,
L: Listener<Output = T> + 'static,
T: TypedTransport + Send + 'static,
T::Input: DeserializeOwned + Send + Sync + 'static,
T::Output: Serialize + Send + 'static,
{
// Grab a copy of our server's configuration so we can leverage it below
let config = server.config();
@ -116,7 +116,7 @@ where
// Receive a new connection, exiting if no longer accepting connections or if the shutdown
// signal has been received
let (mut writer, mut reader) = tokio::select! {
let mut transport = tokio::select! {
result = listener.accept() => {
match result {
Ok(x) => x,
@ -150,44 +150,70 @@ where
// Create some default data for the new connection and pass it
// to the callback prior to processing new requests
let local_data = {
let mut data = Data::default();
let mut data = S::LocalData::default();
server.on_accept(&mut data).await;
Arc::new(data)
};
// Start a writer task that reads from a channel and forwards all
// data through the writer
let (tx, mut rx) = mpsc::channel::<Response<Res>>(1);
connection.writer_task = Some(tokio::spawn(async move {
while let Some(data) = rx.recv().await {
// Log our message as a string, which can be expensive
if log_enabled!(Level::Trace) {
trace!(
"[Conn {connection_id}] Sending {}",
&data
.to_vec()
.map(|x| String::from_utf8_lossy(&x).to_string())
.unwrap_or_else(|_| "<Cannot serialize>".to_string())
);
}
if let Err(x) = writer.write(data).await {
error!("[Conn {connection_id}] Failed to send {x}");
break;
}
connection.task = Some(
ConnectionTask {
id: connection_id,
server,
state: Arc::downgrade(&state),
transport,
local_data,
shutdown_timer: shutdown_timer
.as_ref()
.map(Arc::downgrade)
.unwrap_or_default(),
}
}));
// Start a reader task that reads requests and processes them
// using the provided handler
let weak_state = Arc::downgrade(&state);
let weak_shutdown_timer = shutdown_timer
.as_ref()
.map(Arc::downgrade)
.unwrap_or_default();
connection.reader_task = Some(tokio::spawn(async move {
loop {
match reader.read().await {
.spawn(),
);
state
.connections
.write()
.await
.insert(connection_id, connection);
}
}
struct ConnectionTask<S, T, D> {
id: ConnectionId,
server: Arc<S>,
state: Weak<ServerState>,
transport: T,
local_data: D,
shutdown_timer: Weak<Mutex<Timer<()>>>,
}
impl<S, T, D> ConnectionTask<S, T, D>
where
S: Server<Request = T::Input, Response = T::Output, LocalData = D> + Sync + 'static,
D: Default + Send + Sync + 'static,
T: TypedTransport + Send + 'static,
T::Input: DeserializeOwned + Send + Sync + 'static,
T::Output: Serialize + Send + 'static,
{
pub fn spawn(self) -> JoinHandle<()> {
tokio::spawn(self.run())
}
async fn run(self) {
let connection_id = self.id;
// Construct a queue of outgoing responses
let (tx, mut rx) = mpsc::channel::<Response<T::Output>>(1);
loop {
let ready = self
.transport
.ready(Interest::READABLE | Interest::WRITABLE)
.await
.expect("[Conn {connection_id}] Failed to examine ready state");
if ready.is_readable() {
match self.transport.try_read() {
Ok(Some(request)) => {
let reply = ServerReply {
origin_id: request.id.clone(),
@ -198,20 +224,20 @@ where
connection_id,
request,
reply: reply.clone(),
local_data: Arc::clone(&local_data),
local_data: Arc::clone(&self.local_data),
};
server.on_request(ctx).await;
self.server.on_request(ctx).await;
}
Ok(None) => {
debug!("[Conn {connection_id}] Connection closed");
// Remove the connection from our state if it has closed
if let Some(state) = Weak::upgrade(&weak_state) {
state.connections.write().await.remove(&connection_id);
if let Some(state) = Weak::upgrade(&self.weak_state) {
state.connections.write().await.remove(&self.connection_id);
// If we have no more connections, start the timer
if let Some(timer) = Weak::upgrade(&weak_shutdown_timer) {
if let Some(timer) = Weak::upgrade(&self.weak_shutdown_timer) {
if state.connections.read().await.is_empty() {
timer.lock().await.start();
}
@ -219,6 +245,7 @@ where
}
break;
}
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => {
// NOTE: We do NOT break out of the loop, as this could happen
// if someone sends bad data at any point, but does not
@ -228,13 +255,35 @@ where
}
}
}
}));
state
.connections
.write()
.await
.insert(connection_id, connection);
// If our socket is ready to be written to, we try to get the next item from
// the queue and process it
if ready.is_writable() {
match rx.try_recv() {
Ok(data) => {
// Log our message as a string, which can be expensive
if log_enabled!(Level::Trace) {
trace!(
"[Conn {connection_id}] Sending {}",
&data
.to_vec()
.map(|x| String::from_utf8_lossy(&x).to_string())
.unwrap_or_else(|_| "<Cannot serialize>".to_string())
);
}
match self.transport.try_write(data) {
Ok(()) => continue,
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
Err(x) => error!("[Conn {connection_id}] Send failed: {x}"),
}
}
// If we don't have data, we skip
Err(_) => continue,
}
}
}
}
}

@ -49,7 +49,7 @@ pub trait RawTransport: Reconnectable {
Ok(())
}
/// Waits for the transport to be readable to follow up with `try_write`
/// Waits for the transport to be writeable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())

@ -47,7 +47,7 @@ where
}
}
impl FramedRawTransport<super::InmemoryRawTransport, PlainCodec> {
impl FramedRawTransport<super::InmemoryTransport, PlainCodec> {
/// Produces a pair of inmemory transports that are connected to each other using
/// a standard codec
///
@ -55,10 +55,10 @@ impl FramedRawTransport<super::InmemoryRawTransport, PlainCodec> {
pub fn pair(
buffer: usize,
) -> (
FramedRawTransport<super::InmemoryRawTransport, PlainCodec>,
FramedRawTransport<super::InmemoryRawTransport, PlainCodec>,
FramedRawTransport<super::InmemoryTransport, PlainCodec>,
FramedRawTransport<super::InmemoryTransport, PlainCodec>,
) {
let (a, b) = super::InmemoryRawTransport::pair(buffer);
let (a, b) = super::InmemoryTransport::pair(buffer);
let a = FramedRawTransport::new(a, PlainCodec::new());
let b = FramedRawTransport::new(b, PlainCodec::new());
(a, b)

@ -11,7 +11,7 @@ use tokio::sync::mpsc::{
/// Represents a [`RawTransport`] comprised of two inmemory channels
#[derive(Debug)]
pub struct InmemoryRawTransport {
pub struct InmemoryTransport {
tx: mpsc::Sender<Vec<u8>>,
rx: mpsc::Receiver<Vec<u8>>,
@ -19,7 +19,7 @@ pub struct InmemoryRawTransport {
buf: Mutex<Option<Vec<u8>>>,
}
impl InmemoryRawTransport {
impl InmemoryTransport {
pub fn new(tx: mpsc::Sender<Vec<u8>>, rx: mpsc::Receiver<Vec<u8>>) -> Self {
Self {
tx,
@ -80,7 +80,7 @@ impl InmemoryRawTransport {
}
#[async_trait]
impl Reconnectable for InmemoryRawTransport {
impl Reconnectable for InmemoryTransport {
/// Once the underlying channels have closed, there is no way for this transport to
/// re-establish those channels; therefore, reconnecting will always fail with
/// [`ErrorKind::Unsupported`]
@ -92,7 +92,7 @@ impl Reconnectable for InmemoryRawTransport {
}
#[async_trait]
impl RawTransport for InmemoryRawTransport {
impl RawTransport for InmemoryTransport {
fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
// Lock our internal storage to ensure that nothing else mutates it for the lifetime of
// this call as we want to make sure that data is read and stored in order
@ -170,7 +170,7 @@ mod tests {
#[tokio::test]
async fn make_should_return_sender_that_sends_data_to_transport() {
let (tx, _, mut transport) = InmemoryRawTransport::make(3);
let (tx, _, mut transport) = InmemoryTransport::make(3);
tx.send(b"test msg 1".to_vec()).await.unwrap();
tx.send(b"test msg 2".to_vec()).await.unwrap();
@ -199,7 +199,7 @@ mod tests {
#[tokio::test]
async fn make_should_return_receiver_that_receives_data_from_transport() {
let (_, mut rx, mut transport) = InmemoryRawTransport::make(3);
let (_, mut rx, mut transport) = InmemoryTransport::make(3);
transport.write_all(b"test msg 1").await.unwrap();
transport.write_all(b"test msg 2").await.unwrap();

@ -46,6 +46,9 @@ impl fmt::Debug for TcpTransport {
#[async_trait]
impl Reconnectable for TcpTransport {
async fn reconnect(&mut self) -> io::Result<()> {
// Drop the existing connection to ensure we are disconnected before trying again
drop(self.inner);
self.inner = TcpStream::connect((self.addr, self.port)).await?;
Ok(())
}

@ -39,6 +39,9 @@ impl fmt::Debug for UnixSocketTransport {
#[async_trait]
impl Reconnectable for UnixSocketTransport {
async fn reconnect(&mut self) -> io::Result<()> {
// Drop the existing connection to ensure we are disconnected before trying again
drop(self.inner);
self.inner = UnixStream::connect(self.path.as_path()).await?;
Ok(())
}

@ -61,6 +61,9 @@ impl Reconnectable for WindowsPipeTransport {
return Err(io::Error::from(io::ErrorKind::Unsupported));
}
// Drop the existing connection to ensure we are disconnected before trying again
drop(self.inner);
self.inner = NamedPipe::connect_as_client(&self.addr).await?;
Ok(())
}

@ -41,7 +41,7 @@ pub trait TypedTransport: Reconnectable {
Ok(())
}
/// Waits for the transport to be readable to follow up with `try_write`
/// Waits for the transport to be writable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())

@ -1,4 +1,4 @@
use super::{Interest, Ready, TypedTransport};
use super::{Interest, Ready, Reconnectable, TypedTransport};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::io;
@ -6,7 +6,7 @@ use std::io;
/// Interface representing a transport that uses [`serde`] to serialize and deserialize data
/// as it is sent and received
#[async_trait]
pub trait UntypedTransport {
pub trait UntypedTransport: Reconnectable {
/// Attempts to read some data as `T`, returning [`io::Error`] if unable to deserialize
/// or some other error occurs. `Some(T)` is returned if successful. `None` is
/// returned if no more data is available.
@ -39,7 +39,7 @@ pub trait UntypedTransport {
Ok(())
}
/// Waits for the transport to be readable to follow up with `try_write`
/// Waits for the transport to be writable to follow up with `try_write`
async fn writeable(&self) -> io::Result<()> {
let _ = self.ready(Interest::WRITABLE).await?;
Ok(())

Loading…
Cancel
Save