Fix dropped messages on client side and lockup of transport when trying to read and write concurrently

pull/38/head
Chip Senkbeil 3 years ago
parent 6ef55d6e38
commit f59ae7f6ed
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

200
Cargo.lock generated

@ -43,6 +43,15 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.7.0"
@ -89,18 +98,64 @@ dependencies = [
"vec_map",
]
[[package]]
name = "const-oid"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44c32f031ea41b4291d695026c023b95d59db2d8a2c7640800ed56bc8f510f22"
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "cpufeatures"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef"
dependencies = [
"libc",
]
[[package]]
name = "crypto-bigint"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32a398eb1ccfbe7e4f452bc749c44d38dd732e9a253f19da224c416f00ee7f4"
dependencies = [
"generic-array",
"rand_core",
"subtle",
"zeroize",
]
[[package]]
name = "crypto-mac"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "ct-codecs"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3b7eb4404b8195a9abb6356f4ac07d8ba267045c8d6d220ac4dc992e6cc75df"
[[package]]
name = "der"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f215f706081a44cb702c71c39a52c05da637822e9c1645a50b7202689e982d"
dependencies = [
"const-oid",
]
[[package]]
name = "derive_more"
version = "0.99.16"
@ -113,6 +168,15 @@ dependencies = [
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "directories"
version = "3.0.2"
@ -144,6 +208,7 @@ dependencies = [
"fork",
"futures",
"hex",
"k256",
"lazy_static",
"log",
"orion",
@ -160,6 +225,44 @@ dependencies = [
"whoami",
]
[[package]]
name = "ecdsa"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "713c32426287891008edb98f8b5c6abb2130aa043c93a818728fcda78606f274"
dependencies = [
"der",
"elliptic-curve",
"hmac",
"signature",
]
[[package]]
name = "elliptic-curve"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069397e10739989e400628cbc0556a817a8a64119d7a2315767f4456e1332c23"
dependencies = [
"crypto-bigint",
"ff",
"generic-array",
"group",
"pkcs8",
"rand_core",
"subtle",
"zeroize",
]
[[package]]
name = "ff"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63eec06c61e487eecf0f7e6e6372e596a81922c28d33e645d6983ca6493a1af0"
dependencies = [
"rand_core",
"subtle",
]
[[package]]
name = "flexi_logger"
version = "0.18.0"
@ -279,6 +382,16 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.3"
@ -296,6 +409,17 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "group"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c363a5301b8f153d80747126a04b3c82073b9fe3130571a9d170cacdeaf7912"
dependencies = [
"ff",
"rand_core",
"subtle",
]
[[package]]
name = "half"
version = "1.7.1"
@ -326,6 +450,16 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
dependencies = [
"crypto-mac",
"digest",
]
[[package]]
name = "instant"
version = "0.1.10"
@ -350,6 +484,18 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "k256"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "903ae2481bcdfdb7b68e0a9baa4b7c9aff600b9ae2e8e5bb5833b8c91ab851ea"
dependencies = [
"cfg-if",
"ecdsa",
"elliptic-curve",
"sha2",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -452,6 +598,12 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "orion"
version = "0.16.0"
@ -501,6 +653,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbee84ed13e44dd82689fa18348a49934fa79cc774a344c42fc9b301c71b140a"
dependencies = [
"der",
"spki",
]
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -699,6 +861,19 @@ dependencies = [
"serde",
]
[[package]]
name = "sha2"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362ae5752fd2137731f9fa25fd4d9058af34666ca1966fb969119cc35719f12"
dependencies = [
"block-buffer",
"cfg-if",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -708,6 +883,16 @@ dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19772be3c4dd2ceaacf03cb41d5885f2a02c4d8804884918e3a258480803335"
dependencies = [
"digest",
"rand_core",
]
[[package]]
name = "slab"
version = "0.4.3"
@ -720,6 +905,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "spki"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "987637c5ae6b3121aba9d513f869bd2bff11c4cc086c22473befd6649c0bd521"
dependencies = [
"der",
]
[[package]]
name = "strsim"
version = "0.8.0"
@ -884,6 +1078,12 @@ dependencies = [
"tokio",
]
[[package]]
name = "typenum"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]]
name = "unicode-segmentation"
version = "1.8.0"

@ -16,6 +16,7 @@ derive_more = { version = "0.99.16", default-features = false, features = ["disp
directories = "3.0.2"
futures = "0.3.16"
hex = "0.4.3"
k256 = { version = "0.9.6", features = ["ecdh"] }
log = "0.4.14"
orion = "0.16.0"
rand = "0.8.4"

@ -0,0 +1,3 @@
/// Capacity associated with a client broadcasting its received messages that
/// do not have a callback associated
pub static CLIENT_BROADCAST_CHANNEL_CAPACITY: usize = 100;

@ -1,3 +1,4 @@
mod constants;
mod data;
mod net;
mod opt;

@ -2,7 +2,8 @@ mod transport;
pub use transport::{Transport, TransportError, TransportReadHalf, TransportWriteHalf};
use crate::{
data::{Request, Response, ResponsePayload},
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, Response},
utils::Session,
};
use log::*;
@ -12,41 +13,54 @@ use std::{
};
use tokio::{
io,
sync::{oneshot, watch},
sync::{broadcast, oneshot},
};
use tokio_stream::wrappers::WatchStream;
use tokio_stream::wrappers::BroadcastStream;
type Callbacks = Arc<Mutex<HashMap<usize, oneshot::Sender<Response>>>>;
/// Represents a client that can make requests against a server
pub struct Client {
/// Underlying transport used by client
transport: Arc<tokio::sync::Mutex<Transport>>,
t_write: TransportWriteHalf,
/// Collection of callbacks to be invoked upon receiving a response to a request
callbacks: Callbacks,
/// Callback to trigger when a response is received without an origin or with an origin
/// not found in the list of callbacks
rx: watch::Receiver<Response>,
broadcast: broadcast::Sender<Response>,
/// Represents an initial receiver for broadcasted responses that can capture responses
/// prior to a stream being established and consumed
init_broadcast_receiver: Option<broadcast::Receiver<Response>>,
}
impl Client {
/// Establishes a connection using the provided session
pub async fn connect(session: Session) -> io::Result<Self> {
let transport = Arc::new(tokio::sync::Mutex::new(Transport::connect(session).await?));
let transport = Transport::connect(session).await?;
debug!(
"Client has connected to {}",
transport
.peer_addr()
.map(|x| x.to_string())
.unwrap_or_else(|_| String::from("???"))
);
let (mut t_read, t_write) = transport.into_split();
let callbacks: Callbacks = Arc::new(Mutex::new(HashMap::new()));
let (tx, rx) = watch::channel(Response::from(ResponsePayload::Error {
description: String::from("Fake server response"),
}));
let (broadcast, init_broadcast_receiver) =
broadcast::channel(CLIENT_BROADCAST_CHANNEL_CAPACITY);
// Start a task that continually checks for responses and triggers callbacks
let transport_2 = Arc::clone(&transport);
let callbacks_2 = Arc::clone(&callbacks);
let broadcast_2 = broadcast.clone();
tokio::spawn(async move {
loop {
match transport_2.lock().await.receive::<Response>().await {
match t_read.receive::<Response>().await {
Ok(Some(res)) => {
trace!("Client got response: {:?}", res);
let maybe_callback = res
.origin_id
.as_ref()
@ -54,14 +68,16 @@ impl Client {
// If there is an origin to this response, trigger the callback
if let Some(tx) = maybe_callback {
trace!("Client has callback! Triggering!");
if let Err(res) = tx.send(res) {
error!("Failed to trigger callback for response {}", res.id);
}
// Otherwise, this goes into the junk draw of response handlers
} else {
if let Err(x) = tx.send(res) {
error!("Failed to trigger watch: {}", x);
trace!("Client does not have callback! Broadcasting!");
if let Err(x) = broadcast_2.send(res) {
error!("Failed to trigger broadcast: {}", x);
}
}
}
@ -75,20 +91,21 @@ impl Client {
});
Ok(Self {
transport,
t_write,
callbacks,
rx,
broadcast,
init_broadcast_receiver: Some(init_broadcast_receiver),
})
}
/// Sends a request and waits for a response
pub async fn send(&self, req: Request) -> Result<Response, TransportError> {
pub async fn send(&mut self, req: Request) -> Result<Response, TransportError> {
// First, add a callback that will trigger when we get the response for this request
let (tx, rx) = oneshot::channel();
self.callbacks.lock().unwrap().insert(req.id, tx);
// Second, send the request
self.transport.lock().await.send(req).await?;
self.t_write.send(req).await?;
// Third, wait for the response
rx.await
@ -96,7 +113,11 @@ impl Client {
}
/// Creates and returns a new stream of responses that are received with no originating request
pub fn to_response_stream(&self) -> WatchStream<Response> {
WatchStream::new(self.rx.clone())
pub fn to_response_stream(&mut self) -> BroadcastStream<Response> {
BroadcastStream::new(
self.init_broadcast_receiver
.take()
.unwrap_or_else(|| self.broadcast.subscribe()),
)
}
}

@ -7,7 +7,7 @@ use orion::{
errors::UnknownCryptoError,
};
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};
use tokio::{
io,
net::{tcp, TcpStream},
@ -46,7 +46,13 @@ impl Transport {
Ok(Self::new(stream, Arc::new(session.key)))
}
/// Returns the address of the peer the transport is connected to
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_ref().peer_addr()
}
/// Sends some data across the wire
#[allow(dead_code)]
pub async fn send<T: Serialize>(&mut self, data: T) -> Result<(), TransportError> {
// Serialize, encrypt, and then (TODO) sign
// NOTE: Cannot used packed implementation for now due to issues with deserialization
@ -61,6 +67,7 @@ impl Transport {
/// Receives some data from out on the wire, waiting until it's available,
/// returning none if the transport is now closed
#[allow(dead_code)]
pub async fn receive<T: DeserializeOwned>(&mut self) -> Result<Option<T>, TransportError> {
// If data is received, we process like usual
if let Some(data) = self.inner.next().await {

@ -89,7 +89,7 @@ async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error>
session.save().await?;
if cmd.print_startup_data {
println!("DISTANT DATA {} {}", port, session.to_hex_key());
println!("DISTANT DATA {} {}", port, session.to_unprotected_hex_key());
}
Ok(())

@ -207,7 +207,7 @@ async fn proc_run(
// Spawn a task that sends stdout as a response
let tx_2 = tx.clone();
let mut stdout = child.stdout.take().unwrap();
tokio::spawn(async move {
let stdout_task = tokio::spawn(async move {
loop {
let mut data = Vec::new();
match stdout.read_to_end(&mut data).await {
@ -228,7 +228,7 @@ async fn proc_run(
// Spawn a task that sends stderr as a response
let tx_2 = tx.clone();
let mut stderr = child.stderr.take().unwrap();
tokio::spawn(async move {
let stderr_task = tokio::spawn(async move {
loop {
let mut data = Vec::new();
match stderr.read_to_end(&mut data).await {
@ -265,6 +265,14 @@ async fn proc_run(
tokio::spawn(async move {
tokio::select! {
status = child.wait() => {
if let Err(x) = stderr_task.await {
error!("Join on stderr task failed: {}", x);
}
if let Err(x) = stdout_task.await {
error!("Join on stdout task failed: {}", x);
}
match status {
Ok(status) => {
let success = status.success();
@ -292,6 +300,14 @@ async fn proc_run(
error!("Unable to kill process {}: {}", id, x);
}
if let Err(x) = stderr_task.await {
error!("Join on stderr task failed: {}", x);
}
if let Err(x) = stdout_task.await {
error!("Join on stdout task failed: {}", x);
}
if let Err(_) = tx
.send(Response::from(ResponsePayload::ProcDone { id, success: false, code: None }))
.await

@ -23,7 +23,7 @@ pub fn run(cmd: SendSubcommand, opt: CommonOpt) -> Result<(), Error> {
async fn run_async(cmd: SendSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let session = Session::load().await?;
let client = Client::connect(session).await?;
let mut client = Client::connect(session).await?;
let req = Request::from(cmd.operation);
@ -43,6 +43,12 @@ async fn run_async(cmd: SendSubcommand, _opt: CommonOpt) -> Result<(), Error> {
if is_proc_req && not_detach {
let mut stream = client.to_response_stream();
while let Some(res) = stream.next().await {
let res = res.map_err(|_| {
io::Error::new(
io::ErrorKind::BrokenPipe,
"Response stream no longer available",
)
})?;
let done = res.payload.is_proc_done();
print_response(cmd.format, res)?;

@ -42,7 +42,7 @@ pub struct Session {
impl Session {
/// Returns a string representing the secret key as hex
pub fn to_hex_key(&self) -> String {
pub fn to_unprotected_hex_key(&self) -> String {
hex::encode(self.key.unprotected_as_bytes())
}
@ -75,7 +75,7 @@ impl Session {
/// Saves a session to disk
pub async fn save(&self) -> io::Result<()> {
let key_hex_str = self.to_hex_key();
let key_hex_str = self.to_unprotected_hex_key();
// Ensure our cache directory exists
let cache_dir = PROJECT_DIRS.cache_dir();

Loading…
Cancel
Save