Several core and lua enhancements

1. Implement system_info and spawn_wait for lua session
2. Implement wait and output for remote process
3. Switch mlua to git latest
4. Update core data error to be error type with io error conversions
5. Add proper error reporting when process gets an error response
6. Update lua launch and connect options to have defaults
pull/96/head
Chip Senkbeil 3 years ago
parent 89d1cf0e5a
commit 788fa48e96
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -273,7 +273,9 @@ jobs:
# built and added to each release manually
files: |
${{ env.MACOS }}/${{ env.MACOS_UNIVERSAL_BIN }}
${{ env.MACOS }}/${{ env.MACOS_UNIVERSAL_LIB }}
${{ env.MACOS }}/${{ env.MACOS_X86_LIB }}
${{ env.MACOS }}/${{ env.MACOS_ARM_LIB }}
${{ env.WIN64 }}/${{ env.WIN64_BIN }}
${{ env.WIN64 }}/${{ env.WIN64_LIB }}
${{ env.LINUX64 }}/${{ env.LINUX64_GNU_BIN }}

6
Cargo.lock generated

@ -1034,8 +1034,7 @@ dependencies = [
[[package]]
name = "mlua"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10220b40602740bbb1bfa676bb477c7587ec1226d26f9a5f379192ea0a3e24f"
source = "git+https://github.com/khvzak/mlua.git#458b06796c39d3c04e7c4c250f84ccf7f5958106"
dependencies = [
"bstr 0.2.17",
"cc",
@ -1055,8 +1054,7 @@ dependencies = [
[[package]]
name = "mlua_derive"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1713774a29db53a48932596dc943439dd54eb56a9efaace716719cc10fa82d5b"
source = "git+https://github.com/khvzak/mlua.git#458b06796c39d3c04e7c4c250f84ccf7f5958106"
dependencies = [
"itertools",
"once_cell",

@ -90,6 +90,11 @@ impl RemoteProcess {
let origin_id = res.origin_id;
match res.payload.into_iter().next().unwrap() {
ResponseData::ProcStart { id } => (id, origin_id),
ResponseData::Error(x) => {
return Err(RemoteProcessError::TransportError(TransportError::IoError(
x.into(),
)))
}
x => {
return Err(RemoteProcessError::TransportError(TransportError::IoError(
io::Error::new(

@ -34,6 +34,9 @@ pub struct Session {
/// Used to send requests to a server
channel: SessionChannel,
/// Textual description of the underlying connection
connection_tag: String,
/// Contains the task that is running to send requests to a server
request_task: JoinHandle<()>,
@ -116,6 +119,7 @@ impl Session {
T: DataStream,
U: Codec + Send + 'static,
{
let connection_tag = transport.to_connection_tag();
let (mut t_read, mut t_write) = transport.into_split();
let post_office = Arc::new(Mutex::new(PostOffice::new()));
let weak_post_office = Arc::downgrade(&post_office);
@ -186,6 +190,7 @@ impl Session {
Ok(Self {
channel,
connection_tag,
request_task,
response_task,
prune_task,
@ -194,6 +199,11 @@ impl Session {
}
impl Session {
/// Returns a textual description of the underlying connection
pub fn connection_tag(&self) -> &str {
&self.connection_tag
}
/// Waits for the session to terminate, which results when the receiving end of the network
/// connection is closed (or the session is shutdown)
pub async fn wait(self) -> Result<(), JoinError> {

@ -501,6 +501,8 @@ pub struct Error {
pub description: String,
}
impl std::error::Error for Error {}
impl From<io::Error> for Error {
fn from(x: io::Error) -> Self {
Self {
@ -510,6 +512,12 @@ impl From<io::Error> for Error {
}
}
impl From<Error> for io::Error {
fn from(x: Error) -> Self {
Self::new(x.kind.into(), x.description)
}
}
impl From<walkdir::Error> for Error {
fn from(x: walkdir::Error) -> Self {
if x.io_error().is_some() {
@ -638,6 +646,32 @@ impl From<io::ErrorKind> for ErrorKind {
}
}
impl From<ErrorKind> for io::ErrorKind {
fn from(kind: ErrorKind) -> Self {
match kind {
ErrorKind::NotFound => Self::NotFound,
ErrorKind::PermissionDenied => Self::PermissionDenied,
ErrorKind::ConnectionRefused => Self::ConnectionRefused,
ErrorKind::ConnectionReset => Self::ConnectionReset,
ErrorKind::ConnectionAborted => Self::ConnectionAborted,
ErrorKind::NotConnected => Self::NotConnected,
ErrorKind::AddrInUse => Self::AddrInUse,
ErrorKind::AddrNotAvailable => Self::AddrNotAvailable,
ErrorKind::BrokenPipe => Self::BrokenPipe,
ErrorKind::AlreadyExists => Self::AlreadyExists,
ErrorKind::WouldBlock => Self::WouldBlock,
ErrorKind::InvalidInput => Self::InvalidInput,
ErrorKind::InvalidData => Self::InvalidData,
ErrorKind::TimedOut => Self::TimedOut,
ErrorKind::WriteZero => Self::WriteZero,
ErrorKind::Interrupted => Self::Interrupted,
ErrorKind::Other => Self::Other,
ErrorKind::UnexpectedEof => Self::UnexpectedEof,
_ => Self::Other,
}
}
}
/// Used to provide a default serde value of 1
const fn one() -> usize {
1

@ -20,7 +20,7 @@ assert_fs = "1.0.4"
distant-core = { path = "../distant-core" }
futures = "0.3.17"
indoc = "1.0.3"
mlua = { version = "0.6.5", features = ["async", "macros", "serialize"] }
mlua = { git = "https://github.com/khvzak/mlua.git", features = ["async", "macros", "serialize"] }
once_cell = "1.8.0"
predicates = "2.0.2"
rstest = "0.11.0"

@ -10,5 +10,7 @@ mod read_file_text;
mod remove;
mod rename;
mod spawn;
mod spawn_wait;
mod system_info;
mod write_file;
mod write_file_text;

@ -114,7 +114,7 @@ fn should_return_back_process_on_success(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process")
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(proc.id >= 0, "Invalid process returned")
})
.exec();
@ -158,7 +158,7 @@ fn should_return_process_that_can_retrieve_stdout(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process")
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(proc, "Missing proc")
// Wait briefly to ensure the process sends stdout
@ -173,7 +173,7 @@ fn should_return_process_that_can_retrieve_stdout(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed reading stdout")
assert(not err, "Unexpectedly failed reading stdout: " .. tostring(err))
assert(stdout == "some stdout", "Unexpected stdout: " .. stdout)
})
.exec();
@ -217,7 +217,7 @@ fn should_return_process_that_can_retrieve_stderr(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process")
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(proc, "Missing proc")
// Wait briefly to ensure the process sends stdout
@ -232,7 +232,7 @@ fn should_return_process_that_can_retrieve_stderr(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed reading stdout")
assert(not err, "Unexpectedly failed reading stdout: " .. tostring(err))
assert(stderr == "some stderr", "Unexpected stderr: " .. stderr)
})
.exec();
@ -271,7 +271,7 @@ fn should_return_error_when_killing_dead_process(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process")
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(proc, "Missing proc")
// Wait briefly to ensure the process dies
@ -314,7 +314,7 @@ fn should_support_killing_processing(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process")
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(proc, "Missing proc")
local f = distant.utils.wrap_async(proc.kill_async, $schedule_fn)
@ -324,7 +324,7 @@ fn should_support_killing_processing(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed to kill process")
assert(not err, "Unexpectedly failed to kill process: " .. tostring(err))
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
@ -362,7 +362,7 @@ fn should_return_error_if_sending_stdin_to_dead_process(ctx: &'_ DistantServerCt
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process")
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(proc, "Missing proc")
// Wait briefly to ensure the process dies
@ -408,7 +408,7 @@ fn should_support_sending_stdin_to_spawned_process(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed spawning process")
assert(not err, "Unexpectedly failed spawning process: " .. tostring(err))
assert(proc, "Missing proc")
local f = distant.utils.wrap_async(proc.write_stdin_async, $schedule_fn)
@ -418,7 +418,7 @@ fn should_support_sending_stdin_to_spawned_process(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed writing stdin")
assert(not err, "Unexpectedly failed writing stdin: " .. tostring(err))
local f = distant.utils.wrap_async(proc.read_stdout_async, $schedule_fn)
local err, stdout
@ -429,7 +429,7 @@ fn should_support_sending_stdin_to_spawned_process(ctx: &'_ DistantServerCtx) {
err = res
end
end)
assert(not err, "Unexpectedly failed reading stdout")
assert(not err, "Unexpectedly failed reading stdout: " .. tostring(err))
assert(stdout == "some text\n", "Unexpected stdout received: " .. stdout)
})
.exec();

@ -0,0 +1,173 @@
use crate::common::{fixtures::*, lua, poll, session};
use assert_fs::prelude::*;
use mlua::chunk;
use once_cell::sync::Lazy;
use rstest::*;
static TEMP_SCRIPT_DIR: Lazy<assert_fs::TempDir> = Lazy::new(|| assert_fs::TempDir::new().unwrap());
static SCRIPT_RUNNER: Lazy<String> = Lazy::new(|| String::from("bash"));
static ECHO_ARGS_TO_STDOUT_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
let script = TEMP_SCRIPT_DIR.child("echo_args_to_stdout.sh");
script
.write_str(indoc::indoc!(
r#"
#/usr/bin/env bash
printf "%s" "$*"
"#
))
.unwrap();
script
});
static ECHO_ARGS_TO_STDERR_SH: Lazy<assert_fs::fixture::ChildPath> = Lazy::new(|| {
let script = TEMP_SCRIPT_DIR.child("echo_args_to_stderr.sh");
script
.write_str(indoc::indoc!(
r#"
#/usr/bin/env bash
printf "%s" "$*" 1>&2
"#
))
.unwrap();
script
});
static DOES_NOT_EXIST_BIN: Lazy<assert_fs::fixture::ChildPath> =
Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin"));
#[rstest]
fn should_return_error_on_failure(ctx: &'_ DistantServerCtx) {
let lua = lua::make().unwrap();
let new_session = session::make_function(&lua, ctx).unwrap();
let schedule_fn = poll::make_function(&lua).unwrap();
let cmd = DOES_NOT_EXIST_BIN.to_str().unwrap().to_string();
let args: Vec<String> = Vec::new();
let result = lua
.load(chunk! {
local session = $new_session()
local distant = require("distant_lua")
local f = distant.utils.wrap_async(session.spawn_wait_async, $schedule_fn)
// Because of our scheduler, the invocation turns async -> sync
local err
f(session, { cmd = $cmd, args = $args }, function(success, res)
if not success then
err = res
end
end)
assert(err, "Unexpectedly succeeded")
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
}
#[rstest]
fn should_return_back_status_on_success(ctx: &'_ DistantServerCtx) {
let lua = lua::make().unwrap();
let new_session = session::make_function(&lua, ctx).unwrap();
let schedule_fn = poll::make_function(&lua).unwrap();
let cmd = SCRIPT_RUNNER.to_string();
let args = vec![ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string()];
let result = lua
.load(chunk! {
local session = $new_session()
local distant = require("distant_lua")
local f = distant.utils.wrap_async(session.spawn_wait_async, $schedule_fn)
// Because of our scheduler, the invocation turns async -> sync
local err, output
f(session, { cmd = $cmd, args = $args }, function(success, res)
if success then
output = res
else
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(output, "Missing process output")
assert(output.success, "Process output returned !success")
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
}
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
// with / but thinks it's on windows and is providing \
#[rstest]
#[cfg_attr(windows, ignore)]
fn should_return_process_that_can_retrieve_stdout(ctx: &'_ DistantServerCtx) {
let lua = lua::make().unwrap();
let new_session = session::make_function(&lua, ctx).unwrap();
let schedule_fn = poll::make_function(&lua).unwrap();
let cmd = SCRIPT_RUNNER.to_string();
let args = vec![
ECHO_ARGS_TO_STDOUT_SH.to_str().unwrap().to_string(),
String::from("some stdout"),
];
let result = lua
.load(chunk! {
local session = $new_session()
local distant = require("distant_lua")
local f = distant.utils.wrap_async(session.spawn_wait_async, $schedule_fn)
// Because of our scheduler, the invocation turns async -> sync
local err, output
f(session, { cmd = $cmd, args = $args }, function(success, res)
if success then
output = res
else
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(output, "Missing process output")
assert(output.stdout, "some stdout")
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
}
// NOTE: Ignoring on windows because it's using WSL which wants a Linux path
// with / but thinks it's on windows and is providing \
#[rstest]
#[cfg_attr(windows, ignore)]
fn should_return_process_that_can_retrieve_stderr(ctx: &'_ DistantServerCtx) {
let lua = lua::make().unwrap();
let new_session = session::make_function(&lua, ctx).unwrap();
let schedule_fn = poll::make_function(&lua).unwrap();
let cmd = SCRIPT_RUNNER.to_string();
let args = vec![
ECHO_ARGS_TO_STDERR_SH.to_str().unwrap().to_string(),
String::from("some stderr"),
];
let result = lua
.load(chunk! {
local session = $new_session()
local distant = require("distant_lua")
local f = distant.utils.wrap_async(session.spawn_wait_async, $schedule_fn)
// Because of our scheduler, the invocation turns async -> sync
local err, output
f(session, { cmd = $cmd, args = $args }, function(success, res)
if success then
output = res
else
err = res
end
end)
assert(not err, "Unexpectedly failed to spawn process: " .. tostring(err))
assert(output, "Missing process output")
assert(output.stderr, "some stderr")
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
}

@ -0,0 +1,33 @@
use crate::common::{fixtures::*, lua, poll, session};
use mlua::chunk;
use rstest::*;
#[rstest]
fn should_return_system_information(ctx: &'_ DistantServerCtx) {
let lua = lua::make().unwrap();
let new_session = session::make_function(&lua, ctx).unwrap();
let schedule_fn = poll::make_function(&lua).unwrap();
let result = lua
.load(chunk! {
local session = $new_session()
local f = require("distant_lua").utils.wrap_async(
session.system_info_async,
$schedule_fn
)
// Because of our scheduler, the invocation turns async -> sync
local err, system_info
f(session, function(success, res)
if success then
system_info = res
else
err = res
end
end)
assert(not err, "Unexpectedly failed")
assert(system_info, "Missing system information")
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
}

@ -10,5 +10,6 @@ mod read_file_text;
mod remove;
mod rename;
mod spawn;
mod system_info;
mod write_file;
mod write_file_text;

@ -0,0 +1,18 @@
use crate::common::{fixtures::*, lua, session};
use mlua::chunk;
use rstest::*;
#[rstest]
fn should_return_system_info(ctx: &'_ DistantServerCtx) {
let lua = lua::make().unwrap();
let new_session = session::make_function(&lua, ctx).unwrap();
let result = lua
.load(chunk! {
local session = $new_session()
local system_info = session:system_info()
assert(system_info, "System info unexpectedly missing")
})
.exec();
assert!(result.is_ok(), "Failed: {}", result.unwrap_err());
}

@ -28,7 +28,7 @@ distant-core = { version = "=0.15.0-alpha.7", path = "../distant-core" }
distant-ssh2 = { version = "=0.15.0-alpha.7", features = ["serde"], path = "../distant-ssh2" }
futures = "0.3.17"
log = "0.4.14"
mlua = { version = "0.6.5", features = ["async", "macros", "module", "serialize"] }
mlua = { git = "https://github.com/khvzak/mlua.git", features = ["async", "macros", "module", "serialize"] }
once_cell = "1.8.0"
oorandom = "11.1.3"
paste = "1.0.5"

@ -0,0 +1,2 @@
/// Default timeout (15 secs)
pub const TIMEOUT_MILLIS: u64 = 15000;

@ -13,6 +13,7 @@ macro_rules! to_value {
}};
}
mod constants;
mod log;
mod runtime;
mod session;

@ -3,6 +3,7 @@ use distant_core::{
SecretKey32, Session as DistantSession, SessionChannel, XChaCha20Poly1305Codec,
};
use distant_ssh2::{IntoDistantSessionOpts, Ssh2Session};
use log::*;
use mlua::{prelude::*, LuaSerdeExt, UserData, UserDataFields, UserDataMethods};
use once_cell::sync::Lazy;
use paste::paste;
@ -12,6 +13,9 @@ use std::{collections::HashMap, io, sync::RwLock};
pub fn make_session_tbl(lua: &Lua) -> LuaResult<LuaTable> {
let tbl = lua.create_table()?;
// get_all() -> Vec<Session>
tbl.set("get_all", lua.create_function(|_, ()| Session::all())?)?;
// get_by_id(id: usize) -> Option<Session>
tbl.set(
"get_by_id",
@ -30,7 +34,9 @@ pub fn make_session_tbl(lua: &Lua) -> LuaResult<LuaTable> {
"launch",
lua.create_function(|lua, opts: LuaValue| {
let opts = LaunchOpts::from_lua(opts, lua)?;
runtime::block_on(Session::launch(opts))
let x = runtime::block_on(Session::launch(opts))?;
trace!("launch: {:?}", x);
Ok(x)
})?,
)?;
@ -101,7 +107,7 @@ fn has_session(id: usize) -> LuaResult<bool> {
.contains_key(&id))
}
fn get_session_channel(id: usize) -> LuaResult<SessionChannel> {
fn with_session<T>(id: usize, f: impl FnOnce(&DistantSession) -> T) -> LuaResult<T> {
let lock = SESSION_MAP.read().map_err(|x| x.to_string().to_lua_err())?;
let session = lock.get(&id).ok_or_else(|| {
io::Error::new(
@ -111,7 +117,15 @@ fn get_session_channel(id: usize) -> LuaResult<SessionChannel> {
.to_lua_err()
})?;
Ok(session.clone_channel())
Ok(f(session))
}
fn get_session_connection_tag(id: usize) -> LuaResult<String> {
with_session(id, |session| session.connection_tag().to_string())
}
fn get_session_channel(id: usize) -> LuaResult<SessionChannel> {
with_session(id, |session| session.clone_channel())
}
/// Holds a reference to the session to perform remote operations
@ -126,8 +140,20 @@ impl Session {
Self { id }
}
/// Retrieves all sessions
pub fn all() -> LuaResult<Vec<Self>> {
Ok(SESSION_MAP
.read()
.map_err(|x| x.to_string().to_lua_err())?
.keys()
.copied()
.map(Self::new)
.collect())
}
/// Launches a new distant session on a remote machine
pub async fn launch(opts: LaunchOpts<'_>) -> LuaResult<Self> {
trace!("Session::launch({:?})", opts);
let LaunchOpts {
host,
mode,
@ -137,12 +163,15 @@ impl Session {
} = opts;
// First, establish a connection to an SSH server
let mut ssh_session = Ssh2Session::connect(host, ssh).to_lua_err()?;
debug!("Connecting to {} {:#?}", host, ssh);
let mut ssh_session = Ssh2Session::connect(host.as_str(), ssh).to_lua_err()?;
// Second, authenticate with the server
debug!("Authenticating against {}", host);
ssh_session.authenticate(handler).await.to_lua_err()?;
// Third, convert our ssh session into a distant session based on desired method
debug!("Mapping session for {} into {:?}", host, mode);
let session = match mode {
Mode::Distant => ssh_session
.into_distant_session(IntoDistantSessionOpts {
@ -156,6 +185,7 @@ impl Session {
// Fourth, store our current session in our global map and then return a reference
let id = utils::rand_u32()? as usize;
debug!("Session {} established against {}", id, host);
SESSION_MAP
.write()
.map_err(|x| x.to_string().to_lua_err())?
@ -165,6 +195,9 @@ impl Session {
/// Connects to an already-running remote distant server
pub async fn connect(opts: ConnectOpts) -> LuaResult<Self> {
trace!("Session::connect({:?})", opts);
debug!("Looking up {}", opts.host);
let addr = tokio::net::lookup_host(format!("{}:{}", opts.host, opts.port))
.await
.to_lua_err()?
@ -177,14 +210,17 @@ impl Session {
})
.to_lua_err()?;
debug!("Constructing codec");
let key: SecretKey32 = opts.key.parse().to_lua_err()?;
let codec = XChaCha20Poly1305Codec::from(key);
debug!("Connecting to {}", addr);
let session = DistantSession::tcp_connect_timeout(addr, codec, opts.timeout)
.await
.to_lua_err()?;
let id = utils::rand_u32()? as usize;
debug!("Session {} established against {}", id, opts.host);
SESSION_MAP
.write()
.map_err(|x| x.to_string().to_lua_err())?
@ -200,15 +236,23 @@ macro_rules! impl_methods {
};
($methods:expr, $name:ident, |$lua:ident, $data:ident| $block:block) => {{
paste! {
$methods.add_method(stringify!([<$name:snake>]), |$lua, this, params: LuaValue| {
$methods.add_method(stringify!([<$name:snake>]), |$lua, this, params: Option<LuaValue>| {
let params: LuaValue = match params {
Some(params) => params,
None => LuaValue::Table($lua.create_table()?),
};
let params: api::[<$name:camel Params>] = $lua.from_value(params)?;
let $data = api::[<$name:snake>](get_session_channel(this.id)?, params)?;
#[allow(unused_braces)]
$block
});
$methods.add_async_method(stringify!([<$name:snake _async>]), |$lua, this, params: LuaValue| async move {
$methods.add_async_method(stringify!([<$name:snake _async>]), |$lua, this, params: Option<LuaValue>| async move {
let rt = crate::runtime::get_runtime()?;
let params: LuaValue = match params {
Some(params) => params,
None => LuaValue::Table($lua.create_table()?),
};
let params: api::[<$name:camel Params>] = $lua.from_value(params)?;
let $data = {
let tmp = rt.spawn(
@ -231,10 +275,13 @@ macro_rules! impl_methods {
impl UserData for Session {
fn add_fields<'lua, F: UserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("id", |_, this| Ok(this.id));
fields.add_field_method_get("connection_tag", |_, this| {
get_session_connection_tag(this.id)
});
}
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_method("is_active", |_, this, _: LuaValue| {
methods.add_method("is_active", |_, this, ()| {
Ok(get_session_channel(this.id).is_ok())
});
@ -271,9 +318,13 @@ impl UserData for Session {
impl_methods!(methods, spawn, |_lua, proc| {
Ok(RemoteProcess::from_distant(proc))
});
impl_methods!(methods, spawn_wait);
impl_methods!(methods, spawn_lsp, |_lua, proc| {
Ok(RemoteLspProcess::from_distant(proc))
});
impl_methods!(methods, system_info, |lua, info| {
Ok(lua.to_value(&info)?)
});
impl_methods!(methods, write_file);
impl_methods!(methods, write_file_text);
}

@ -1,7 +1,10 @@
use crate::runtime;
use crate::{
runtime,
session::proc::{Output, RemoteProcess as LuaRemoteProcess},
};
use distant_core::{
DirEntry, Error as Failure, Metadata, RemoteLspProcess, RemoteProcess, SessionChannel,
SessionChannelExt,
SessionChannelExt, SystemInfo,
};
use mlua::prelude::*;
use once_cell::sync::Lazy;
@ -25,13 +28,13 @@ macro_rules! make_api {
(
$name:ident,
$ret:ty,
{$($(#[$pmeta:meta])* $pname:ident: $ptype:ty),*},
$({$($(#[$pmeta:meta])* $pname:ident: $ptype:ty),+},)?
|$channel:ident, $tenant:ident, $params:ident| $block:block $(,)?
) => {
paste! {
#[derive(Clone, Debug, Deserialize)]
pub struct [<$name:camel Params>] {
$($(#[$pmeta])* $pname: $ptype,)*
$($($(#[$pmeta])* $pname: $ptype,)*)?
#[serde(default = "default_timeout")]
timeout: u64,
@ -165,6 +168,17 @@ make_api!(
|channel, tenant, params| { channel.spawn(tenant, params.cmd, params.args).await }
);
make_api!(
spawn_wait,
Output,
{ cmd: String, args: Vec<String> },
|channel, tenant, params| {
let proc = channel.spawn(tenant, params.cmd, params.args).await.to_lua_err()?;
let id = LuaRemoteProcess::from_distant(proc)?.id;
LuaRemoteProcess::output_async(id).await
}
);
make_api!(
spawn_lsp,
RemoteLspProcess,
@ -172,6 +186,10 @@ make_api!(
|channel, tenant, params| { channel.spawn_lsp(tenant, params.cmd, params.args).await }
);
make_api!(system_info, SystemInfo, |channel, tenant, _params| {
channel.system_info(tenant).await
});
make_api!(
write_file,
(),

@ -1,3 +1,4 @@
use crate::constants::TIMEOUT_MILLIS;
use distant_ssh2::{Ssh2AuthHandler, Ssh2SessionOpts};
use mlua::prelude::*;
use serde::Deserialize;
@ -19,8 +20,8 @@ impl<'lua> FromLua<'lua> for ConnectOpts {
port: tbl.get("port")?,
key: tbl.get("key")?,
timeout: {
let milliseconds: u64 = tbl.get("timeout")?;
Duration::from_millis(milliseconds)
let milliseconds: Option<u64> = tbl.get("timeout")?;
Duration::from_millis(milliseconds.unwrap_or(TIMEOUT_MILLIS))
},
}),
LuaValue::Nil => Err(LuaError::FromLuaConversionError {
@ -100,44 +101,75 @@ impl fmt::Debug for LaunchOpts<'_> {
impl<'lua> FromLua<'lua> for LaunchOpts<'lua> {
fn from_lua(lua_value: LuaValue<'lua>, lua: &'lua Lua) -> LuaResult<Self> {
let Ssh2AuthHandler {
on_authenticate,
on_banner,
on_error,
on_host_verify,
} = Default::default();
match lua_value {
LuaValue::Table(tbl) => Ok(Self {
host: tbl.get("host")?,
mode: lua.from_value(tbl.get("mode")?)?,
mode: {
let mode: Option<LuaValue> = tbl.get("mode")?;
match mode {
Some(value) => lua.from_value(value)?,
None => Default::default(),
}
},
handler: Ssh2AuthHandler {
on_authenticate: {
let f: LuaFunction = tbl.get("on_authenticate")?;
Box::new(move |ev| {
let value = to_value!(lua, &ev)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?;
f.call::<LuaValue, Vec<String>>(value)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
})
let f: Option<LuaFunction> = tbl.get("on_authenticate")?;
match f {
Some(f) => Box::new(move |ev| {
let value = to_value!(lua, &ev)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?;
f.call::<LuaValue, Vec<String>>(value)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
}),
None => on_authenticate,
}
},
on_banner: {
let f: LuaFunction = tbl.get("on_banner")?;
Box::new(move |banner| {
let _ = f.call::<String, ()>(banner.to_string());
})
let f: Option<LuaFunction> = tbl.get("on_banner")?;
match f {
Some(f) => Box::new(move |banner| {
let _ = f.call::<String, ()>(banner.to_string());
}),
None => on_banner,
}
},
on_host_verify: {
let f: LuaFunction = tbl.get("on_host_verify")?;
Box::new(move |host| {
f.call::<String, bool>(host.to_string())
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
})
let f: Option<LuaFunction> = tbl.get("on_host_verify")?;
match f {
Some(f) => Box::new(move |host| {
f.call::<String, bool>(host.to_string())
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))
}),
None => on_host_verify,
}
},
on_error: {
let f: LuaFunction = tbl.get("on_error")?;
Box::new(move |err| {
let _ = f.call::<String, ()>(err.to_string());
})
let f: Option<LuaFunction> = tbl.get("on_error")?;
match f {
Some(f) => Box::new(move |err| {
let _ = f.call::<String, ()>(err.to_string());
}),
None => on_error,
}
},
},
ssh: lua.from_value(tbl.get("ssh")?)?,
ssh: {
let ssh_tbl: Option<LuaValue> = tbl.get("ssh")?;
match ssh_tbl {
Some(value) => lua.from_value(value)?,
None => Default::default(),
}
},
timeout: {
let milliseconds: u64 = tbl.get("timeout")?;
Duration::from_millis(milliseconds)
let milliseconds: Option<u64> = tbl.get("timeout")?;
Duration::from_millis(milliseconds.unwrap_or(TIMEOUT_MILLIS))
},
}),
LuaValue::Nil => Err(LuaError::FromLuaConversionError {

@ -49,7 +49,7 @@ macro_rules! impl_process {
($name:ident, $type:ty, $map_name:ident) => {
#[derive(Copy, Clone, Debug)]
pub struct $name {
id: usize,
pub(crate) id: usize,
}
impl $name {
@ -151,6 +151,60 @@ macro_rules! impl_process {
})
}
fn wait(id: usize) -> LuaResult<(bool, Option<i32>)> {
runtime::block_on(Self::wait_async(id))
}
async fn wait_async(id: usize) -> LuaResult<(bool, Option<i32>)> {
let proc = $map_name.write().await.remove(&id).ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
format!("No remote process found with id {}", id),
)
.to_lua_err()
})?;
proc.wait().await.to_lua_err()
}
fn output(id: usize) -> LuaResult<Output> {
runtime::block_on(Self::output_async(id))
}
pub(crate) async fn output_async(id: usize) -> LuaResult<Output> {
let mut proc = $map_name.write().await.remove(&id).ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
format!("No remote process found with id {}", id),
)
.to_lua_err()
})?;
// Remove the stdout and stderr streams before letting process run to completion
let mut stdout = proc.stdout.take().unwrap();
let mut stderr = proc.stderr.take().unwrap();
// Gather stdout and stderr after process completes
let (success, exit_code) = proc.wait().await.to_lua_err()?;
let mut stdout_buf = String::new();
while let Ok(Some(data)) = stdout.try_read() {
stdout_buf.push_str(&data);
}
let mut stderr_buf = String::new();
while let Ok(Some(data)) = stderr.try_read() {
stderr_buf.push_str(&data);
}
Ok(Output {
success,
exit_code,
stdout: stdout_buf,
stderr: stderr_buf,
})
}
fn abort(id: usize) -> LuaResult<()> {
with_proc!($map_name, id, proc -> {
Ok(proc.abort())
@ -180,6 +234,14 @@ macro_rules! impl_process {
methods.add_async_method("read_stderr_async", |_, this, ()| {
runtime::spawn(Self::read_stderr_async(this.id))
});
methods.add_method("wait", |_, this, ()| Self::wait(this.id));
methods.add_async_method("wait_async", |_, this, ()| {
runtime::spawn(Self::wait_async(this.id))
});
methods.add_method("output", |_, this, ()| Self::output(this.id));
methods.add_async_method("output_async", |_, this, ()| {
runtime::spawn(Self::output_async(this.id))
});
methods.add_method("kill", |_, this, ()| Self::kill(this.id));
methods.add_async_method("kill_async", |_, this, ()| {
runtime::spawn(Self::kill_async(this.id))
@ -190,5 +252,34 @@ macro_rules! impl_process {
};
}
/// Represents process output
#[derive(Clone, Debug)]
pub struct Output {
pub success: bool,
pub exit_code: Option<i32>,
pub stdout: String,
pub stderr: String,
}
impl UserData for Output {
fn add_fields<'lua, F: UserDataFields<'lua, Self>>(fields: &mut F) {
fields.add_field_method_get("success", |_, this| Ok(this.success));
fields.add_field_method_get("exit_code", |_, this| Ok(this.exit_code));
fields.add_field_method_get("stdout", |_, this| Ok(this.stdout.to_string()));
fields.add_field_method_get("stderr", |_, this| Ok(this.stderr.to_string()));
}
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_method("to_tbl", |lua, this, ()| {
let tbl = lua.create_table()?;
tbl.set("success", this.success)?;
tbl.set("exit_code", this.exit_code)?;
tbl.set("stdout", this.stdout.to_string())?;
tbl.set("stderr", this.stdout.to_string())?;
Ok(tbl)
});
}
}
impl_process!(RemoteProcess, DistantRemoteProcess, PROC_MAP);
impl_process!(RemoteLspProcess, DistantRemoteLspProcess, LSP_PROC_MAP);

@ -46,6 +46,7 @@ pub struct Ssh2AuthEvent {
/// Represents options to be provided when establishing an ssh session
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
pub struct Ssh2SessionOpts {
/// List of files from which the user's DSA, ECDSA, Ed25519, or RSA authentication identity
/// is read, defaulting to

Loading…
Cancel
Save