Add support for launch to be interactive

pull/38/head v0.4.0
Chip Senkbeil 3 years ago
parent d9c2b9942a
commit bb7829e3f0
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,4 +1,4 @@
use crate::{subcommand, data::RequestPayload};
use crate::{data::RequestPayload, subcommand};
use derive_more::{Display, Error, From, IsVariant};
use lazy_static::lazy_static;
use std::{
@ -8,7 +8,7 @@ use std::{
str::FromStr,
};
use structopt::StructOpt;
use strum::{EnumString, EnumVariantNames, VariantNames, IntoStaticStr};
use strum::{EnumString, EnumVariantNames, IntoStaticStr, VariantNames};
lazy_static! {
static ref USERNAME: String = whoami::username();
@ -87,17 +87,18 @@ pub enum SessionSubcommand {
Exists,
/// Prints out information about the available sessions
Info {
Info {
/// Represents the format that results should be returned
///
/// Currently, there are two possible formats:
///
/// 1. "json": printing out JSON for external program usage
/// 3. "shell": printing out human-readable results for interactive shell usage
/// 2. "shell": printing out human-readable results for interactive shell usage
#[structopt(
short,
long,
short,
long,
case_insensitive = true,
default_value = "shell",
default_value = Mode::Shell.into(),
possible_values = Mode::VARIANTS
)]
mode: Mode,
@ -105,7 +106,18 @@ pub enum SessionSubcommand {
}
/// Represents the communication medium used for the send command
#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, IsVariant, EnumString, EnumVariantNames)]
#[derive(
Copy,
Clone,
Debug,
Display,
PartialEq,
Eq,
IsVariant,
IntoStaticStr,
EnumString,
EnumVariantNames,
)]
#[strum(serialize_all = "snake_case")]
pub enum Mode {
/// Sends and receives data in JSON format
@ -123,20 +135,25 @@ pub struct ActionSubcommand {
/// Represents the format that results should be returned
///
/// Currently, there are two possible formats:
///
/// 1. "json": printing out JSON for external program usage
/// 3. "shell": printing out human-readable results for interactive shell usage
/// 2. "shell": printing out human-readable results for interactive shell usage
#[structopt(
short,
long,
short,
long,
case_insensitive = true,
default_value = "shell",
default_value = Mode::Shell.into(),
possible_values = Mode::VARIANTS
)]
pub mode: Mode,
/// Represents the medium for retrieving a session for use in performing the action
#[structopt(long, default_value = "file", possible_values = SessionSharing::VARIANTS)]
pub session: SessionSharing,
#[structopt(
long,
default_value = SessionInput::File.into(),
possible_values = SessionInput::VARIANTS
)]
pub session: SessionInput,
/// If specified, commands to send are sent over stdin and responses are received
/// over stdout (and stderr if mode is shell)
@ -149,7 +166,7 @@ pub struct ActionSubcommand {
}
/// Represents options for binding a server to an IP address
#[derive(Copy, Clone, Debug, Display, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, IsVariant)]
pub enum BindAddress {
#[display(fmt = "ssh")]
Ssh,
@ -202,9 +219,48 @@ impl FromStr for BindAddress {
}
/// Represents the means by which to share the session from launching on a remote machine
#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, IntoStaticStr, IsVariant, EnumString, EnumVariantNames)]
#[derive(
Copy,
Clone,
Debug,
Display,
PartialEq,
Eq,
IntoStaticStr,
IsVariant,
EnumString,
EnumVariantNames,
)]
#[strum(serialize_all = "snake_case")]
pub enum SessionSharing {
pub enum SessionOutput {
/// Session is in a file in the form of `DISTANT DATA <host> <port> <auth key>`
File,
/// Special scenario where the session is not shared but is instead kept within the
/// launch program, causing the program itself to listen on stdin for input rather
/// than terminating
Keep,
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// in form of `DISTANT DATA <host> <port> <auth key>`
Pipe,
}
/// Represents the means by which to consume a session when performing an action
#[derive(
Copy,
Clone,
Debug,
Display,
PartialEq,
Eq,
IntoStaticStr,
IsVariant,
EnumString,
EnumVariantNames,
)]
#[strum(serialize_all = "snake_case")]
pub enum SessionInput {
/// Session is in a environment variables
///
/// * `DISTANT_HOST=<host>`
@ -215,28 +271,33 @@ pub enum SessionSharing {
/// Session is in a file in the form of `DISTANT DATA <host> <port> <auth key>`
File,
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// in form of `DISTANT DATA <host> <port> <auth key>`
Pipe,
}
impl SessionSharing {
/// Represents session configurations that can be used for output
pub fn output_variants() -> Vec<&'static str> {
vec![Self::File.into(), Self::Pipe.into()]
}
}
/// Represents subcommand to launch a remote server
#[derive(Debug, StructOpt)]
pub struct LaunchSubcommand {
/// Represents the medium for sharing the session upon launching on a remote machine
#[structopt(
long,
default_value = SessionSharing::File.into(),
possible_values = &SessionSharing::output_variants()
long,
default_value = SessionOutput::File.into(),
possible_values = SessionOutput::VARIANTS
)]
pub session: SessionSharing,
pub session: SessionOutput,
/// Represents the format that results should be returned when session is "keep",
/// causing the launcher to enter an interactive loop to handle input and output
/// itself rather than enabling other clients to connect
#[structopt(
short,
long,
case_insensitive = true,
default_value = Mode::Shell.into(),
possible_values = Mode::VARIANTS
)]
pub mode: Mode,
/// Path to remote program to execute via ssh
#[structopt(short, long, default_value = "distant")]
@ -284,8 +345,8 @@ pub struct LaunchSubcommand {
/// Represents some range of ports
#[derive(Clone, Debug, Display, PartialEq, Eq)]
#[display(
fmt = "{}{}",
start,
fmt = "{}{}",
start,
"end.as_ref().map(|end| format!(\"[:{}]\", end)).unwrap_or_default()"
)]
pub struct PortRange {
@ -383,11 +444,6 @@ pub struct ListenSubcommand {
/// With -p 0, the server will let the operating system pick an available TCP port.
///
/// Please note that this option does not affect the server-side port used by SSH
#[structopt(
short,
long,
value_name = "PORT[:PORT2]",
default_value = "8080:8099"
)]
#[structopt(short, long, value_name = "PORT[:PORT2]", default_value = "8080:8099")]
pub port: PortRange,
}

@ -1,10 +1,9 @@
use crate::{
data::{Request, RequestPayload, Response, ResponsePayload},
net::{Client, TransportError},
opt::{ActionSubcommand, CommonOpt, Mode, SessionSharing},
session::{Session, SessionFile},
net::Client,
opt::Mode,
};
use derive_more::{Display, Error, From};
use derive_more::IsVariant;
use log::*;
use structopt::StructOpt;
use tokio::{
@ -16,98 +15,26 @@ use tokio::{
};
use tokio_stream::StreamExt;
#[derive(Debug, Display, Error, From)]
pub enum Error {
IoError(io::Error),
TransportError(TransportError),
#[display(fmt = "Non-interactive but no operation supplied")]
MissingOperation,
}
pub fn run(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { run_async(cmd, opt).await })
}
async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let session = match cmd.session {
SessionSharing::Environment => Session::from_environment()?,
SessionSharing::File => SessionFile::load().await?.into(),
SessionSharing::Pipe => Session::from_stdin()?,
};
let mut client = Client::connect(session).await?;
if !cmd.interactive && cmd.operation.is_none() {
return Err(Error::MissingOperation);
}
// Special conditions for continuing to process responses
let mut is_proc_req = false;
let mut proc_id = 0;
if let Some(req) = cmd.operation.map(Request::from) {
is_proc_req = req.payload.is_proc_run();
trace!("Client sending request: {:?}", req);
let res = client.send(req).await?;
// Store the spawned process id for using in sending stdin (if we spawned a proc)
proc_id = match &res.payload {
ResponsePayload::ProcStart { id } => *id,
_ => 0,
};
format_response(cmd.mode, res)?.print();
}
// If we are executing a process, we want to continue interacting via stdin and receiving
// results via stdout/stderr
//
// If we are interactive, we want to continue looping regardless
if is_proc_req || cmd.interactive {
interactive_loop(client, proc_id, cmd.mode, cmd.interactive).await?;
}
Ok(())
#[derive(Copy, Clone, PartialEq, Eq, IsVariant)]
pub enum LoopConfig {
Json,
Proc { id: usize },
Shell,
}
fn spawn_stdin_reader() -> mpsc::Receiver<String> {
let (tx, rx) = mpsc::channel(1);
// NOTE: Using blocking I/O per tokio's advice to read from stdin line-by-line and then
// pass the results to a separate async handler to forward to the remote process
std::thread::spawn(move || {
let stdin = std::io::stdin();
loop {
let mut line = String::new();
match stdin.read_line(&mut line) {
Ok(0) | Err(_) => break,
Ok(_) => {
if let Err(x) = tx.blocking_send(line) {
error!(
"Failed to pass along stdin to be sent to remote process: {}",
x
);
}
std::thread::yield_now();
}
}
impl From<LoopConfig> for Mode {
fn from(config: LoopConfig) -> Self {
match config {
LoopConfig::Json => Self::Json,
LoopConfig::Proc { .. } | LoopConfig::Shell => Self::Shell,
}
});
rx
}
}
async fn interactive_loop(
mut client: Client,
id: usize,
mode: Mode,
interactive: bool,
) -> Result<(), Error> {
/// Starts a new action loop that processes requests and receives responses
///
/// id represents the id of a remote process
pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Result<()> {
let mut stream = client.to_response_stream();
// Create a channel that can report when we should stop the loop based on a received request
@ -117,7 +44,7 @@ async fn interactive_loop(
let mut rx = spawn_stdin_reader();
tokio::spawn(async move {
while let Some(line) = rx.recv().await {
match mode {
match config {
// Special exit condition for interactive mode
_ if line.trim() == "exit" => {
if let Err(_) = tx_stop.send(()) {
@ -127,13 +54,13 @@ async fn interactive_loop(
}
// For json mode, all stdin is treated as individual requests
Mode::Json => {
LoopConfig::Json => {
trace!("Client sending request: {:?}", line);
let result = serde_json::from_str(&line)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result {
Ok(req) => match client.send(req).await {
Ok(res) => match format_response(mode, res) {
Ok(res) => match format_response(Mode::Json, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
@ -148,7 +75,7 @@ async fn interactive_loop(
}
// For interactive shell mode, parse stdin as individual commands
Mode::Shell if interactive => {
LoopConfig::Shell => {
if line.trim().is_empty() {
continue;
}
@ -163,7 +90,7 @@ async fn interactive_loop(
);
match payload_result {
Ok(payload) => match client.send(Request::from(payload)).await {
Ok(res) => match format_response(mode, res) {
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
@ -178,7 +105,7 @@ async fn interactive_loop(
}
// For non-interactive shell mode, all stdin is treated as a proc's stdin
Mode::Shell => {
LoopConfig::Proc { id } => {
trace!("Client sending stdin: {:?}", line);
let req = Request::from(RequestPayload::ProcStdin {
id,
@ -202,9 +129,9 @@ async fn interactive_loop(
"Response stream no longer available",
)
})?;
let done = res.payload.is_proc_done() && !interactive;
let done = res.payload.is_proc_done() && config.is_proc();
format_response(mode, res)?.print();
format_response(config.into(), res)?.print();
// If we aren't interactive but are just running a proc and
// we've received the end of the proc, we should exit
@ -221,8 +148,36 @@ async fn interactive_loop(
Ok(())
}
fn spawn_stdin_reader() -> mpsc::Receiver<String> {
let (tx, rx) = mpsc::channel(1);
// NOTE: Using blocking I/O per tokio's advice to read from stdin line-by-line and then
// pass the results to a separate async handler to forward to the remote process
std::thread::spawn(move || {
let stdin = std::io::stdin();
loop {
let mut line = String::new();
match stdin.read_line(&mut line) {
Ok(0) | Err(_) => break,
Ok(_) => {
if let Err(x) = tx.blocking_send(line) {
error!(
"Failed to pass along stdin to be sent to remote process: {}",
x
);
}
std::thread::yield_now();
}
}
}
});
rx
}
/// Represents the output content and destination
enum ResponseOut {
pub enum ResponseOut {
Stdout(String),
Stderr(String),
None,
@ -238,7 +193,7 @@ impl ResponseOut {
}
}
fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
Ok(match mode {
Mode::Json => ResponseOut::Stdout(format!(
"{}\n",

@ -0,0 +1,74 @@
use crate::{
data::{Request, ResponsePayload},
net::{Client, TransportError},
opt::{ActionSubcommand, CommonOpt, Mode, SessionInput},
session::{Session, SessionFile},
};
use derive_more::{Display, Error, From};
use log::*;
use tokio::io;
pub(crate) mod inner;
#[derive(Debug, Display, Error, From)]
pub enum Error {
IoError(io::Error),
TransportError(TransportError),
#[display(fmt = "Non-interactive but no operation supplied")]
MissingOperation,
}
pub fn run(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { run_async(cmd, opt).await })
}
async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> {
let session = match cmd.session {
SessionInput::Environment => Session::from_environment()?,
SessionInput::File => SessionFile::load().await?.into(),
SessionInput::Pipe => Session::from_stdin()?,
};
let mut client = Client::connect(session).await?;
if !cmd.interactive && cmd.operation.is_none() {
return Err(Error::MissingOperation);
}
// Special conditions for continuing to process responses
let mut is_proc_req = false;
let mut proc_id = 0;
if let Some(req) = cmd.operation.map(Request::from) {
is_proc_req = req.payload.is_proc_run();
trace!("Client sending request: {:?}", req);
let res = client.send(req).await?;
// Store the spawned process id for using in sending stdin (if we spawned a proc)
proc_id = match &res.payload {
ResponsePayload::ProcStart { id } => *id,
_ => 0,
};
inner::format_response(cmd.mode, res)?.print();
}
// If we are executing a process, we want to continue interacting via stdin and receiving
// results via stdout/stderr
//
// If we are interactive, we want to continue looping regardless
if is_proc_req || cmd.interactive {
let config = match cmd.mode {
Mode::Json => inner::LoopConfig::Json,
Mode::Shell if cmd.interactive => inner::LoopConfig::Shell,
Mode::Shell => inner::LoopConfig::Proc { id: proc_id },
};
inner::interactive_loop(client, config).await?;
}
Ok(())
}

@ -1,5 +1,6 @@
use crate::{
opt::{CommonOpt, LaunchSubcommand, SessionSharing},
net::Client,
opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput},
session::{Session, SessionFile},
};
use derive_more::{Display, Error, From};
@ -66,12 +67,18 @@ async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error>
session.host = cmd.host;
// Handle sharing resulting session in different ways
// NOTE: Environment is unreachable here as we disallow it from the defined options since
// there is no way to set the shell's environment variables, only this running process
match cmd.session {
SessionSharing::Environment => unreachable!(),
SessionSharing::File => SessionFile::from(session).save().await?,
SessionSharing::Pipe => println!("{}", session.to_unprotected_string()),
SessionOutput::File => SessionFile::from(session).save().await?,
SessionOutput::Keep => {
use crate::subcommand::action::inner;
let client = Client::connect(session).await?;
let config = match cmd.mode {
Mode::Json => inner::LoopConfig::Json,
Mode::Shell => inner::LoopConfig::Shell,
};
inner::interactive_loop(client, config).await?;
}
SessionOutput::Pipe => println!("{}", session.to_unprotected_string()),
}
Ok(())

Loading…
Cancel
Save