Update wezterm-ssh dep to 0.2.0, fix ssh -> distant session, refactor session to have optional details included

pull/96/head
Chip Senkbeil 3 years ago
parent 043ae6ca4b
commit c2e588544f
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

47
Cargo.lock generated

@ -183,14 +183,6 @@ version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async_ossl"
version = "0.1.0"
source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1"
dependencies = [
"openssl",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
@ -580,7 +572,8 @@ dependencies = [
[[package]]
name = "filedescriptor"
version = "0.8.1"
source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed3d8a5e20435ff00469e51a0d82049bae66504b5c429920dadf9bb54d47b3f"
dependencies = [
"libc",
"thiserror",
@ -647,21 +640,6 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "fork"
version = "0.1.18"
@ -1128,20 +1106,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d9facdb76fec0b73c406f125d44d86fdad818d66fef0531eec9233ca425ff4a"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-sys",
]
[[package]]
name = "openssl-src"
version = "111.16.0+1.1.1l"
@ -1247,7 +1211,8 @@ dependencies = [
[[package]]
name = "portable-pty"
version = "0.5.0"
source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b8383c3934bd6da733223ad1b22f8102c46e8bbced07800b2346cc34326ff83"
dependencies = [
"anyhow",
"bitflags",
@ -2053,10 +2018,10 @@ dependencies = [
[[package]]
name = "wezterm-ssh"
version = "0.2.0"
source = "git+https://github.com/chipsenkbeil/wezterm#f25fbb737563666006c166233cab10daf853a3b1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e233b00aaa22b6ef9b573534e529e9672da8662a8355c37acacedd88741ce1ec"
dependencies = [
"anyhow",
"async_ossl",
"base64",
"bitflags",
"camino",

@ -5,10 +5,12 @@ use crate::{
net::{Codec, DataStream, Transport, TransportError},
};
use log::*;
use serde::{Deserialize, Serialize};
use std::{
convert,
net::SocketAddr,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use tokio::{
@ -29,13 +31,54 @@ mod mailbox;
pub use mailbox::Mailbox;
use mailbox::PostOffice;
/// Details about the session
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionDetails {
/// Indicates session is a TCP type
Tcp { addr: SocketAddr, tag: String },
/// Indicates session is a Unix socket type
Socket { path: PathBuf, tag: String },
/// Indicates session type is inmemory
Inmemory { tag: String },
}
impl SessionDetails {
/// Represents the tag associated with the session
pub fn tag(&self) -> &str {
match self {
Self::Tcp { tag, .. } => tag.as_str(),
Self::Socket { tag, .. } => tag.as_str(),
Self::Inmemory { tag } => tag.as_str(),
}
}
/// Represents the socket address associated with the session, if it has one
pub fn addr(&self) -> Option<SocketAddr> {
match self {
Self::Tcp { addr, .. } => Some(*addr),
_ => None,
}
}
/// Represents the path associated with the session, if it has one
pub fn path(&self) -> Option<&Path> {
match self {
Self::Socket { path, .. } => Some(path.as_path()),
_ => None,
}
}
}
/// Represents a session with a remote server that can be used to send requests & receive responses
pub struct Session {
/// Used to send requests to a server
channel: SessionChannel,
/// Textual description of the underlying connection
connection_tag: String,
/// Details about the session
details: Option<SessionDetails>,
/// Contains the task that is running to send requests to a server
request_task: JoinHandle<()>,
@ -54,6 +97,10 @@ impl Session {
U: Codec + Send + 'static,
{
let transport = Transport::<TcpStream, U>::connect(addr, codec).await?;
let details = SessionDetails::Tcp {
addr,
tag: transport.to_connection_tag(),
};
debug!(
"Session has been established with {}",
transport
@ -61,7 +108,7 @@ impl Session {
.map(|x| x.to_string())
.unwrap_or_else(|_| String::from("???"))
);
Self::initialize(transport)
Self::initialize_with_details(transport, Some(details))
}
/// Connect to a remote TCP server, timing out after duration has passed
@ -86,7 +133,12 @@ impl Session {
where
U: Codec + Send + 'static,
{
let transport = Transport::<tokio::net::UnixStream, U>::connect(path, codec).await?;
let p = path.as_ref();
let transport = Transport::<tokio::net::UnixStream, U>::connect(p, codec).await?;
let details = SessionDetails::Socket {
path: p.to_path_buf(),
tag: transport.to_connection_tag(),
};
debug!(
"Session has been established with {}",
transport
@ -94,7 +146,7 @@ impl Session {
.map(|x| format!("{:?}", x))
.unwrap_or_else(|_| String::from("???"))
);
Self::initialize(transport)
Self::initialize_with_details(transport, Some(details))
}
/// Connect to a proxy unix socket, timing out after duration has passed
@ -113,13 +165,24 @@ impl Session {
}
impl Session {
/// Initializes a session using the provided transport
/// Initializes a session using the provided transport and no extra details
pub fn initialize<T, U>(transport: Transport<T, U>) -> io::Result<Self>
where
T: DataStream,
U: Codec + Send + 'static,
{
let connection_tag = transport.to_connection_tag();
Self::initialize_with_details(transport, None)
}
/// Initializes a session using the provided transport and extra details
pub fn initialize_with_details<T, U>(
transport: Transport<T, U>,
details: Option<SessionDetails>,
) -> io::Result<Self>
where
T: DataStream,
U: Codec + Send + 'static,
{
let (mut t_read, mut t_write) = transport.into_split();
let post_office = Arc::new(Mutex::new(PostOffice::new()));
let weak_post_office = Arc::downgrade(&post_office);
@ -190,7 +253,7 @@ impl Session {
Ok(Self {
channel,
connection_tag,
details,
request_task,
response_task,
prune_task,
@ -199,9 +262,9 @@ impl Session {
}
impl Session {
/// Returns a textual description of the underlying connection
pub fn connection_tag(&self) -> &str {
&self.connection_tag
/// Returns details about the session, if it has any
pub fn details(&self) -> Option<&SessionDetails> {
self.details.as_ref()
}
/// Waits for the session to terminate, which results when the receiving end of the network

@ -89,6 +89,30 @@ where
self.0.get_ref().to_connection_tag()
}
/// Returns a reference to the underlying I/O stream
///
/// Note that care should be taken to not tamper with the underlying stream of data coming in
/// as it may corrupt the stream of frames otherwise being worked with
pub fn get_ref(&self) -> &T {
self.0.get_ref()
}
/// Returns a reference to the underlying I/O stream
///
/// Note that care should be taken to not tamper with the underlying stream of data coming in
/// as it may corrupt the stream of frames otherwise being worked with
pub fn get_mut(&mut self) -> &mut T {
self.0.get_mut()
}
/// Consumes the transport, returning its underlying I/O stream
///
/// Note that care should be taken to not tamper with the underlying stream of data coming in
/// as it may corrupt the stream of frames otherwise being worked with.
pub fn into_inner(self) -> T {
self.0.into_inner()
}
/// Splits transport into read and write halves
pub fn into_split(
self,

@ -1,6 +1,6 @@
use crate::{runtime, utils};
use distant_core::{
SecretKey32, Session as DistantSession, SessionChannel, XChaCha20Poly1305Codec,
SecretKey32, Session as DistantSession, SessionChannel, SessionDetails, XChaCha20Poly1305Codec,
};
use distant_ssh2::{IntoDistantSessionOpts, Ssh2Session};
use log::*;
@ -118,8 +118,8 @@ fn with_session<T>(id: usize, f: impl FnOnce(&DistantSession) -> T) -> LuaResult
Ok(f(session))
}
fn get_session_connection_tag(id: usize) -> LuaResult<String> {
with_session(id, |session| session.connection_tag().to_string())
fn get_session_details(id: usize) -> LuaResult<Option<SessionDetails>> {
with_session(id, |session| session.details().cloned())
}
fn get_session_channel(id: usize) -> LuaResult<SessionChannel> {
@ -181,7 +181,7 @@ impl Session {
})
.await
.to_lua_err()?,
Mode::Ssh => ssh_session.into_ssh_client_session().to_lua_err()?,
Mode::Ssh => ssh_session.into_ssh_client_session().await.to_lua_err()?,
};
// Fourth, store our current session in our global map and then return a reference
@ -276,8 +276,8 @@ macro_rules! impl_methods {
impl UserData for Session {
fn add_fields<'lua, F: UserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("id", |_, this| Ok(this.id));
fields.add_field_method_get("connection_tag", |_, this| {
get_session_connection_tag(this.id)
fields.add_field_method_get("details", |lua, this| {
get_session_details(this.id).and_then(|x| to_value!(lua, &x))
});
}

@ -20,7 +20,7 @@ rpassword = "5.0.1"
shell-words = "1.0"
smol = "1.2"
tokio = { version = "1.12.0", features = ["full"] }
wezterm-ssh = { version = "0.2.0", features = ["vendored-openssl"], git = "https://github.com/chipsenkbeil/wezterm" }
wezterm-ssh = { version = "0.2.0", features = ["vendored-openssl"] }
# Optional serde support for data structures
serde = { version = "1.0.126", features = ["derive"], optional = true }

@ -14,7 +14,9 @@ use std::{
sync::Arc,
};
use tokio::sync::{mpsc, Mutex};
use wezterm_ssh::{Child, ExecResult, OpenFileType, OpenOptions, Session as WezSession, WriteMode};
use wezterm_ssh::{
Child, ExecResult, FilePermissions, OpenFileType, OpenOptions, Session as WezSession, WriteMode,
};
const MAX_PIPE_CHUNK_SIZE: usize = 8192;
const READ_PAUSE_MILLIS: u64 = 50;
@ -588,9 +590,12 @@ async fn metadata(
Ok(Outgoing::from(ResponseData::Metadata(Metadata {
canonicalized_path,
file_type,
len: metadata.len(),
len: metadata.size.unwrap_or(0),
// Check that owner, group, or other has write permission (if not, then readonly)
readonly: metadata.is_readonly(),
readonly: metadata
.permissions
.map(FilePermissions::is_readonly)
.unwrap_or(true),
accessed: metadata.accessed.map(u128::from),
modified: metadata.modified.map(u128::from),
created: None,

@ -1,12 +1,14 @@
use async_compat::CompatExt;
use distant_core::{
Request, Session, SessionChannelExt, SessionInfo, Transport, XChaCha20Poly1305Codec,
Request, Session, SessionChannelExt, SessionDetails, SessionInfo, Transport,
XChaCha20Poly1305Codec,
};
use log::*;
use smol::channel::Receiver as SmolReceiver;
use std::{
collections::BTreeMap,
io::{self, Write},
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
time::Duration,
@ -187,6 +189,8 @@ impl Default for Ssh2AuthHandler<'static> {
pub struct Ssh2Session {
session: WezSession,
events: SmolReceiver<WezSessionEvent>,
host: String,
port: u16,
authenticated: bool,
}
@ -243,6 +247,13 @@ impl Ssh2Session {
// Add in any of the other options provided
config.extend(opts.other);
// Port should always exist, otherwise WezSession will panic from unwrap()
let port = config
.get("port")
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?
.parse::<u16>()
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?;
// Establish a connection
let (session, events) =
WezSession::connect(config).map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
@ -250,10 +261,22 @@ impl Ssh2Session {
Ok(Self {
session,
events,
host: host.as_ref().to_string(),
port,
authenticated: false,
})
}
/// Host this session is connected to
pub fn host(&self) -> &str {
&self.host
}
/// Port this session is connected to on remote host
pub fn port(&self) -> u16 {
self.port
}
#[inline]
pub fn is_authenticated(&self) -> bool {
self.authenticated
@ -328,7 +351,23 @@ impl Ssh2Session {
));
}
let mut session = self.into_ssh_client_session()?;
// Determine distinct candidate ip addresses for connecting
let mut candidate_ips = tokio::net::lookup_host(format!("{}:{}", self.host, self.port))
.await?
.into_iter()
.map(|addr| addr.ip())
.collect::<Vec<IpAddr>>();
candidate_ips.sort_unstable();
candidate_ips.dedup();
if candidate_ips.is_empty() {
return Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
format!("Unable to resolve {}:{}", self.host, self.port),
));
}
// Turn our ssh connection into a client session so we can use it to spawn our server
let mut session = self.into_ssh_client_session().await?;
// Build arguments for distant
let mut args = vec![String::from("listen"), String::from("--daemon")];
@ -358,6 +397,7 @@ impl Ssh2Session {
// Spawn distant server and detach it so that we don't kill it when the
// ssh session is closed
debug!("Executing {} {}", bin, args.join(" "));
let mut proc = session
.spawn("<ssh-launch>", bin, args, true)
.await
@ -380,15 +420,29 @@ impl Ssh2Session {
while let Ok(data) = stdout.read().await {
out.push_str(&data);
}
let maybe_info = out
.lines()
.find_map(|line| line.parse::<SessionInfo>().ok());
match maybe_info {
Some(info) => {
let addr = info.to_socket_addr().await?;
let key = info.key;
let codec = XChaCha20Poly1305Codec::from(key);
Session::tcp_connect_timeout(addr, codec, opts.timeout).await
// Try each IP address with the same port to see if one works
let mut err = None;
for ip in candidate_ips {
let addr = SocketAddr::new(ip, info.port);
debug!("Attempting to connect to distant server @ {}", addr);
match Session::tcp_connect_timeout(addr, codec.clone(), opts.timeout).await
{
Ok(session) => return Ok(session),
Err(x) => err = Some(x),
}
}
// If all failed, return the last error we got
Err(err.expect("Err set above"))
}
None => Err(io::Error::new(
io::ErrorKind::InvalidData,
@ -415,7 +469,7 @@ impl Ssh2Session {
/// Consume [`Ssh2Session`] and produce a distant [`Session`] that is powered by an ssh client
/// underneath
pub fn into_ssh_client_session(self) -> io::Result<Session> {
pub async fn into_ssh_client_session(self) -> io::Result<Session> {
// Exit early if not authenticated as this is a requirement
if !self.authenticated {
return Err(io::Error::new(
@ -425,7 +479,19 @@ impl Ssh2Session {
}
let (t1, t2) = Transport::pair(1);
let session = Session::initialize(t1)?;
let addr = tokio::net::lookup_host(format!("{}:{}", self.host, self.port))
.await?
.next()
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::AddrNotAvailable,
format!("Failed to resolve host: {}", self.host),
)
})?;
let tag = t1.to_connection_tag();
let session =
Session::initialize_with_details(t1, Some(SessionDetails::Tcp { addr, tag }))?;
// Spawn tasks that forward requests to the ssh session
// and send back responses from the ssh session

@ -432,5 +432,5 @@ pub async fn session(sshd: &'_ Sshd, _logger: &'_ flexi_logger::LoggerHandle) ->
.await
.unwrap();
ssh2_session.into_ssh_client_session().unwrap()
ssh2_session.into_ssh_client_session().await.unwrap()
}

@ -66,7 +66,10 @@ impl CommandRunner {
.await
.map_err(wrap_err)?;
(session.into_ssh_client_session().map_err(wrap_err)?, None)
(
session.into_ssh_client_session().await.map_err(wrap_err)?,
None,
)
}
Method::Distant => {

Loading…
Cancel
Save