diff --git a/CHANGELOG.md b/CHANGELOG.md index e5d4757..cf1d26d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `SystemInfo` via ssh backend now reports os when windows detected - `Capabilities` request/response for server and manager that report back the capabilities (and descriptions) supported by the server or manager +- `Search` and `CancelSearch` request/response for server that performs a + search using `grep` crate against paths or file contents, returning results + back as a stream + - New `Searcher` available as part of distant client interface to support + performing a search and getting back results + - Updated `DistantChannelExt` to support creating a `Searcher` and canceling + an ongoing search query + - `distant client action search` now supported, waiting for results and + printing them out ### Changed @@ -37,9 +46,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - `shutdown-after` replaced with `shutdown` that supports three options: - 1. `never` - server will never shutdown automatically - 2. `after=N` - server will shutdown after N seconds - 3. `lonely=N` - server will shutdown N seconds after no connections + 1. `never` - server will never shutdown automatically + 2. `after=N` - server will shutdown after N seconds + 3. `lonely=N` - server will shutdown N seconds after no connections ## [0.17.6] - 2022-08-18 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index fcff2fc..ddcada3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -761,6 +761,7 @@ dependencies = [ "distant-net", "flexi_logger", "futures", + "grep", "hex", "indoc", "log", @@ -905,6 +906,24 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "encoding_rs_io" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cc3c5651fb62ab8aa3103998dade57efdd028544bd300516baa31840c252a83" +dependencies = [ + "encoding_rs", +] + [[package]] name = "err-derive" version = "0.3.1" @@ -1228,6 +1247,90 @@ dependencies = [ "walkdir", ] +[[package]] +name = "grep" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cea81f81c4c120466aef365c225a05c707b002a20f2b9fe3287124b809a8d4f" +dependencies = [ + "grep-cli", + "grep-matcher", + "grep-printer", + "grep-regex", + "grep-searcher", +] + +[[package]] +name = "grep-cli" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dd110c34bb4460d0de5062413b773e385cbf8a85a63fc535590110a09e79e8a" +dependencies = [ + "atty", + "bstr 0.2.17", + "globset", + "lazy_static", + "log", + "regex", + "same-file", + "termcolor", + "winapi-util", +] + +[[package]] +name = "grep-matcher" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d27563c33062cd33003b166ade2bb4fd82db1fd6a86db764dfdad132d46c1cc" +dependencies = [ + "memchr", +] + +[[package]] +name = "grep-printer" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05c271a24daedf5675b61a275a1d0af06e03312ab7856d15433ae6cde044dc72" +dependencies = [ + "base64", + "bstr 0.2.17", + "grep-matcher", + "grep-searcher", + "serde", + "serde_json", + "termcolor", +] + +[[package]] +name = "grep-regex" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1345f8d33c89f2d5b081f2f2a41175adef9fd0bed2fea6a26c96c2deb027e58e" +dependencies = [ + "aho-corasick", + "bstr 0.2.17", + "grep-matcher", + "log", + "regex", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "grep-searcher" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48852bd08f9b4eb3040ecb6d2f4ade224afe880a9a0909c5563cc59fa67932cc" +dependencies = [ + "bstr 0.2.17", + "bytecount", + "encoding_rs", + "encoding_rs_io", + "grep-matcher", + "log", + "memmap2", +] + [[package]] name = "group" version = "0.12.0" @@ -1513,6 +1616,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memmap2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95af15f345b17af2efc8ead6080fb8bc376f8cec1b35277b935637595fe77498" +dependencies = [ + "libc", +] + [[package]] name = "memmem" version = "0.1.1" diff --git a/README.md b/README.md index d31c573..aab52f8 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,9 @@ talk to the server. [RustCrypto/ChaCha20Poly1305](https://github.com/RustCrypto/AEADs/tree/master/chacha20poly1305) Additionally, the core of the distant client and server codebase can be pulled -in to be used with your own Rust crates via the `distant-core` crate. +in to be used with your own Rust crates via the `distant-core` crate. The +networking library, which is agnostic of `distant` protocols, can be used via +the `distant-net` crate. ## Installation @@ -48,6 +50,41 @@ cargo install distant Alternatively, you can clone this repository and build from source following the [build guide](./BUILDING.md). +## Backend Feature Matrix + +Distant supports multiple backends to facilitate remote communication with +another server. Today, these backends include: + +* `distant` - a standalone server acting as the reference implementation +* `ssh` - a wrapper around an `ssh` client that translates the distant protocol + into ssh server requests + +Not every backend supports every feature of distant. Below is a table outlining +the available features and which backend supports each feature: + +| Feature | distant | ssh | +| --------------------- | --------| ----| +| Capabilities | ✅ | ✅ | +| Filesystem I/O | ✅ | ✅ | +| Filesystem Watching | ✅ | ✅ | +| Process Execution | ✅ | ✅ | +| Search | ✅ | ❌ | +| System Information | ✅ | ⚠ | + +* ✅ means full support +* ⚠ means partial support +* ❌ means no support + +### Feature Details + +* `Capabilities` - able to report back what it is capable of performing +* `Filesystem I/O` - able to read from and write to the filesystem +* `Filesystem Watching` - able to receive notifications when changes to the + filesystem occur +* `Process Execution` - able to execute processes +* `Search` - able to search the filesystem +* `System Information` - able to retrieve information about the system + ## Example ### Starting the manager diff --git a/distant-core/Cargo.toml b/distant-core/Cargo.toml index 79de0b9..29ec69e 100644 --- a/distant-core/Cargo.toml +++ b/distant-core/Cargo.toml @@ -21,6 +21,7 @@ bytes = "1.2.1" derive_more = { version = "0.99.17", default-features = false, features = ["as_mut", "as_ref", "deref", "deref_mut", "display", "from", "error", "into", "into_iterator", "is_variant", "try_into"] } distant-net = { version = "=0.18.0", path = "../distant-net" } futures = "0.3.21" +grep = "0.2.10" hex = "0.4.3" log = "0.4.17" notify = { version = "=5.0.0-pre.15", features = ["serde"] } diff --git a/distant-core/src/api.rs b/distant-core/src/api.rs index 3553791..f7d9b61 100644 --- a/distant-core/src/api.rs +++ b/distant-core/src/api.rs @@ -1,7 +1,7 @@ use crate::{ data::{ Capabilities, ChangeKind, DirEntry, Environment, Error, Metadata, ProcessId, PtySize, - SystemInfo, + SearchId, SearchQuery, SystemInfo, }, ConnectionId, DistantMsg, DistantRequestData, DistantResponseData, }; @@ -317,6 +317,34 @@ pub trait DistantApi { unsupported("metadata") } + /// Searches files for matches based on a query. + /// + /// * `query` - the specific query to perform + /// + /// *Override this, otherwise it will return "unsupported" as an error.* + #[allow(unused_variables)] + async fn search( + &self, + ctx: DistantCtx, + query: SearchQuery, + ) -> io::Result { + unsupported("search") + } + + /// Cancels an actively-ongoing search. + /// + /// * `id` - the id of the search to cancel + /// + /// *Override this, otherwise it will return "unsupported" as an error.* + #[allow(unused_variables)] + async fn cancel_search( + &self, + ctx: DistantCtx, + id: SearchId, + ) -> io::Result<()> { + unsupported("cancel_search") + } + /// Spawns a new process, returning its id. /// /// * `cmd` - the full command to run as a new process (including arguments) @@ -613,6 +641,18 @@ where .await .map(DistantResponseData::Metadata) .unwrap_or_else(DistantResponseData::from), + DistantRequestData::Search { query } => server + .api + .search(ctx, query) + .await + .map(|id| DistantResponseData::SearchStarted { id }) + .unwrap_or_else(DistantResponseData::from), + DistantRequestData::CancelSearch { id } => server + .api + .cancel_search(ctx, id) + .await + .map(|_| DistantResponseData::Ok) + .unwrap_or_else(DistantResponseData::from), DistantRequestData::ProcSpawn { cmd, environment, diff --git a/distant-core/src/api/local.rs b/distant-core/src/api/local.rs index 974297c..c744330 100644 --- a/distant-core/src/api/local.rs +++ b/distant-core/src/api/local.rs @@ -1,7 +1,7 @@ use crate::{ data::{ Capabilities, ChangeKind, ChangeKindSet, DirEntry, Environment, FileType, Metadata, - ProcessId, PtySize, SystemInfo, + ProcessId, PtySize, SearchId, SearchQuery, SystemInfo, }, DistantApi, DistantCtx, }; @@ -427,6 +427,29 @@ impl DistantApi for LocalDistantApi { Metadata::read(path, canonicalize, resolve_file_type).await } + async fn search( + &self, + ctx: DistantCtx, + query: SearchQuery, + ) -> io::Result { + debug!( + "[Conn {}] Performing search via {query:?}", + ctx.connection_id, + ); + + self.state.search.start(query, ctx.reply).await + } + + async fn cancel_search( + &self, + ctx: DistantCtx, + id: SearchId, + ) -> io::Result<()> { + debug!("[Conn {}] Cancelling search {id}", ctx.connection_id,); + + self.state.search.cancel(id).await + } + async fn proc_spawn( &self, ctx: DistantCtx, diff --git a/distant-core/src/api/local/state.rs b/distant-core/src/api/local/state.rs index e48fb7f..98484e2 100644 --- a/distant-core/src/api/local/state.rs +++ b/distant-core/src/api/local/state.rs @@ -1,9 +1,15 @@ -use crate::{data::ProcessId, ConnectionId}; +use crate::{ + data::{ProcessId, SearchId}, + ConnectionId, +}; use std::{io, path::PathBuf}; mod process; pub use process::*; +mod search; +pub use search::*; + mod watcher; pub use watcher::*; @@ -12,6 +18,9 @@ pub struct GlobalState { /// State that holds information about processes running on the server pub process: ProcessState, + /// State that holds information about searches running on the server + pub search: SearchState, + /// Watcher used for filesystem events pub watcher: WatcherState, } @@ -20,6 +29,7 @@ impl GlobalState { pub fn initialize() -> io::Result { Ok(Self { process: ProcessState::new(), + search: SearchState::new(), watcher: WatcherState::initialize()?, }) } @@ -34,6 +44,9 @@ pub struct ConnectionState { /// Channel connected to global process state pub(crate) process_channel: ProcessChannel, + /// Channel connected to global search state + pub(crate) search_channel: SearchChannel, + /// Channel connected to global watcher state pub(crate) watcher_channel: WatcherChannel, @@ -42,6 +55,9 @@ pub struct ConnectionState { /// Contains paths being watched that will be unwatched when the connection is closed paths: Vec, + + /// Contains ids of searches that will be terminated when the connection is closed + searches: Vec, } impl Drop for ConnectionState { @@ -49,8 +65,10 @@ impl Drop for ConnectionState { let id = self.id; let processes: Vec = self.processes.drain(..).collect(); let paths: Vec = self.paths.drain(..).collect(); + let searches: Vec = self.searches.drain(..).collect(); let process_channel = self.process_channel.clone(); + let search_channel = self.search_channel.clone(); let watcher_channel = self.watcher_channel.clone(); // NOTE: We cannot (and should not) block during drop to perform cleanup, @@ -60,6 +78,10 @@ impl Drop for ConnectionState { let _ = process_channel.kill(id).await; } + for id in searches { + let _ = search_channel.cancel(id).await; + } + for path in paths { let _ = watcher_channel.unwatch(id, path).await; } diff --git a/distant-core/src/api/local/state/search.rs b/distant-core/src/api/local/state/search.rs new file mode 100644 index 0000000..9d3aa8a --- /dev/null +++ b/distant-core/src/api/local/state/search.rs @@ -0,0 +1,1765 @@ +use crate::data::{ + DistantResponseData, SearchId, SearchQuery, SearchQueryContentsMatch, SearchQueryMatch, + SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQuerySubmatch, + SearchQueryTarget, +}; +use distant_net::Reply; +use grep::{ + matcher::Matcher, + regex::RegexMatcher, + searcher::{Searcher, Sink, SinkMatch}, +}; +use log::*; +use std::{collections::HashMap, io, ops::Deref, path::Path}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use walkdir::{DirEntry, WalkDir}; + +/// Holds information related to active searches on the server +pub struct SearchState { + channel: SearchChannel, + task: JoinHandle<()>, +} + +impl Drop for SearchState { + /// Aborts the task that handles search operations and management + fn drop(&mut self) { + self.abort(); + } +} + +impl SearchState { + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(1); + let task = tokio::spawn(search_task(tx.clone(), rx)); + + Self { + channel: SearchChannel { tx }, + task, + } + } + + #[allow(dead_code)] + pub fn clone_channel(&self) -> SearchChannel { + self.channel.clone() + } + + /// Aborts the process task + pub fn abort(&self) { + self.task.abort(); + } +} + +impl Deref for SearchState { + type Target = SearchChannel; + + fn deref(&self) -> &Self::Target { + &self.channel + } +} + +#[derive(Clone)] +pub struct SearchChannel { + tx: mpsc::Sender, +} + +impl Default for SearchChannel { + /// Creates a new channel that is closed by default + fn default() -> Self { + let (tx, _) = mpsc::channel(1); + Self { tx } + } +} + +impl SearchChannel { + /// Starts a new search using the provided query + pub async fn start( + &self, + query: SearchQuery, + reply: Box>, + ) -> io::Result { + let (cb, rx) = oneshot::channel(); + self.tx + .send(InnerSearchMsg::Start { + query: Box::new(query), + reply, + cb, + }) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Internal search task closed"))?; + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Response to start dropped"))? + } + + /// Cancels an active search + pub async fn cancel(&self, id: SearchId) -> io::Result<()> { + let (cb, rx) = oneshot::channel(); + self.tx + .send(InnerSearchMsg::Cancel { id, cb }) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Internal search task closed"))?; + rx.await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Response to cancel dropped"))? + } +} + +/// Internal message to pass to our task below to perform some action +enum InnerSearchMsg { + Start { + query: Box, + reply: Box>, + cb: oneshot::Sender>, + }, + Cancel { + id: SearchId, + cb: oneshot::Sender>, + }, + InternalRemove { + id: SearchId, + }, +} + +async fn search_task(tx: mpsc::Sender, mut rx: mpsc::Receiver) { + let mut searches: HashMap> = HashMap::new(); + + while let Some(msg) = rx.recv().await { + match msg { + InnerSearchMsg::Start { query, reply, cb } => { + let options = query.options.clone(); + + // Build our executor and send an error if it fails + let mut executor = match SearchQueryExecutor::new(*query) { + Ok(executor) => executor, + Err(x) => { + let _ = cb.send(Err(x)); + return; + } + }; + + // Get the unique search id + let id = executor.id(); + + // Queue up our search internally with a cancel sender + searches.insert(id, executor.take_cancel_tx().unwrap()); + + // Report back the search id + let _ = cb.send(Ok(id)); + + // Spawn our reporter of matches coming from the executor + SearchQueryReporter { + id, + options, + rx: executor.take_match_rx().unwrap(), + reply, + } + .spawn(); + + // Spawn our executor to run + executor.spawn(tx.clone()); + } + InnerSearchMsg::Cancel { id, cb } => { + let _ = cb.send(match searches.remove(&id) { + Some(tx) => { + let _ = tx.send(()); + Ok(()) + } + None => Err(io::Error::new( + io::ErrorKind::Other, + format!("[Query {id}] Cancellation failed because no search found"), + )), + }); + } + InnerSearchMsg::InternalRemove { id } => { + trace!("[Query {id}] Removing internal tracking"); + searches.remove(&id); + } + } + } +} + +struct SearchQueryReporter { + id: SearchId, + options: SearchQueryOptions, + rx: mpsc::UnboundedReceiver, + reply: Box>, +} + +impl SearchQueryReporter { + /// Runs the reporter to completion in an async task + pub fn spawn(self) { + tokio::spawn(self.run()); + } + + async fn run(self) { + let Self { + id, + options, + mut rx, + reply, + } = self; + + // Queue of matches that we hold until reaching pagination + let mut matches = Vec::new(); + let mut total_matches_cnt = 0; + + trace!("[Query {id}] Starting reporter with {options:?}"); + while let Some(m) = rx.recv().await { + matches.push(m); + total_matches_cnt += 1; + + // Check if we've reached the limit, and quit if we have + if let Some(len) = options.limit { + if total_matches_cnt >= len { + trace!("[Query {id}] Reached limit of {len} matches"); + break; + } + } + + // Check if we've reached pagination size, and send queued if so + if let Some(len) = options.pagination { + if matches.len() as u64 >= len { + trace!("[Query {id}] Reached {len} paginated matches"); + if let Err(x) = reply + .send(DistantResponseData::SearchResults { + id, + matches: std::mem::take(&mut matches), + }) + .await + { + error!("[Query {id}] Failed to send paginated matches: {x}"); + } + } + } + } + + // Send any remaining matches + if !matches.is_empty() { + trace!("[Query {id}] Sending {} remaining matches", matches.len()); + if let Err(x) = reply + .send(DistantResponseData::SearchResults { id, matches }) + .await + { + error!("[Query {id}] Failed to send final matches: {x}"); + } + } + + // Report that we are done + trace!("[Query {id}] Reporting as done"); + if let Err(x) = reply.send(DistantResponseData::SearchDone { id }).await { + error!("[Query {id}] Failed to send done status: {x}"); + } + } +} + +struct SearchQueryExecutor { + id: SearchId, + query: SearchQuery, + walk_dir: WalkDir, + matcher: RegexMatcher, + + cancel_tx: Option>, + cancel_rx: oneshot::Receiver<()>, + + match_tx: mpsc::UnboundedSender, + match_rx: Option>, +} + +impl SearchQueryExecutor { + /// Creates a new executor + pub fn new(query: SearchQuery) -> io::Result { + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (match_tx, match_rx) = mpsc::unbounded_channel(); + + let path = query.path.as_path(); + let follow_links = query.options.follow_symbolic_links; + let regex = query.condition.to_regex_string(); + + let matcher = RegexMatcher::new(®ex) + .map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?; + let walk_dir = WalkDir::new(path).follow_links(follow_links); + + let walk_dir = match query.options.min_depth.as_ref().copied() { + Some(depth) => walk_dir.min_depth(depth as usize), + None => walk_dir, + }; + + let walk_dir = match query.options.max_depth.as_ref().copied() { + Some(depth) => walk_dir.max_depth(depth as usize), + None => walk_dir, + }; + + Ok(Self { + id: rand::random(), + query, + matcher, + walk_dir, + + cancel_tx: Some(cancel_tx), + cancel_rx, + + match_tx, + match_rx: Some(match_rx), + }) + } + + pub fn id(&self) -> SearchId { + self.id + } + + pub fn take_cancel_tx(&mut self) -> Option> { + self.cancel_tx.take() + } + + pub fn take_match_rx(&mut self) -> Option> { + self.match_rx.take() + } + + /// Runs the executor to completion in another thread + pub fn spawn(self, tx: mpsc::Sender) { + tokio::task::spawn_blocking(move || { + let id = self.id; + self.run(); + + // Once complete, we need to send a request to remove the search from our list + let _ = tx.blocking_send(InnerSearchMsg::InternalRemove { id }); + }); + } + + fn run(self) { + let id = self.id; + let walk_dir = self.walk_dir; + let tx = self.match_tx; + let mut cancel = self.cancel_rx; + + // Create our path filter we will use to filter out entries that do not match filter + let include_path_filter = match self.query.options.include.as_ref() { + Some(condition) => match SearchQueryPathFilter::new(&condition.to_regex_string()) { + Ok(filter) => { + trace!("[Query {id}] Using regex include path filter for {condition:?}"); + filter + } + Err(x) => { + error!("[Query {id}] Failed to instantiate include path filter: {x}"); + return; + } + }, + None => { + trace!("[Query {id}] Using fixed include path filter of true"); + SearchQueryPathFilter::fixed(true) + } + }; + + // Create our path filter we will use to filter out entries that match filter + let exclude_path_filter = match self.query.options.exclude.as_ref() { + Some(condition) => match SearchQueryPathFilter::new(&condition.to_regex_string()) { + Ok(filter) => { + trace!("[Query {id}] Using regex exclude path filter for {condition:?}"); + filter + } + Err(x) => { + error!("[Query {id}] Failed to instantiate exclude path filter: {x}"); + return; + } + }, + None => { + trace!("[Query {id}] Using fixed exclude path filter of false"); + SearchQueryPathFilter::fixed(false) + } + }; + + let options_filter = SearchQueryOptionsFilter { + target: self.query.target, + options: self.query.options.clone(), + }; + + // Search all entries for matches and report them + for entry in walk_dir + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| include_path_filter.filter(e.path())) + .filter(|e| !exclude_path_filter.filter(e.path())) + .filter(|e| options_filter.filter(e)) + { + // Check if we are being interrupted, and if so exit our loop early + match cancel.try_recv() { + Err(oneshot::error::TryRecvError::Empty) => (), + _ => { + debug!("[Query {id}] Cancelled"); + break; + } + } + + let res = match self.query.target { + // Perform the search against the path itself + SearchQueryTarget::Path => { + let path_str = entry.path().to_string_lossy(); + Searcher::new().search_slice( + &self.matcher, + path_str.as_bytes(), + SearchQueryPathSink { + search_id: id, + path: entry.path(), + matcher: &self.matcher, + callback: |m| Ok(tx.send(m).is_ok()), + }, + ) + } + + // Perform the search against the file's contents + SearchQueryTarget::Contents => Searcher::new().search_path( + &self.matcher, + entry.path(), + SearchQueryContentsSink { + search_id: id, + path: entry.path(), + matcher: &self.matcher, + callback: |m| Ok(tx.send(m).is_ok()), + }, + ), + }; + + if let Err(x) = res { + error!("[Query {id}] Search failed for {:?}: {x}", entry.path()); + } + } + } +} + +struct SearchQueryPathFilter { + matcher: Option, + default_value: bool, +} + +impl SearchQueryPathFilter { + pub fn new(regex: &str) -> io::Result { + Ok(Self { + matcher: Some( + RegexMatcher::new(regex) + .map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?, + ), + default_value: false, + }) + } + + /// Returns a filter that always returns `value` + pub fn fixed(value: bool) -> Self { + Self { + matcher: None, + default_value: value, + } + } + + /// Returns true if path passes the filter + pub fn filter(&self, path: impl AsRef) -> bool { + self.try_filter(path).unwrap_or(false) + } + + fn try_filter(&self, path: impl AsRef) -> io::Result { + match &self.matcher { + Some(matcher) => matcher + .is_match(path.as_ref().to_string_lossy().as_bytes()) + .map_err(|x| io::Error::new(io::ErrorKind::Other, x)), + None => Ok(self.default_value), + } + } +} + +struct SearchQueryOptionsFilter { + target: SearchQueryTarget, + options: SearchQueryOptions, +} + +impl SearchQueryOptionsFilter { + pub fn filter(&self, entry: &DirEntry) -> bool { + // Check if filetype is allowed + let file_type_allowed = self.options.allowed_file_types.is_empty() + || self + .options + .allowed_file_types + .contains(&entry.file_type().into()); + + // Check if target is appropriate + let targeted = match self.target { + SearchQueryTarget::Contents => entry.file_type().is_file(), + _ => true, + }; + + file_type_allowed && targeted + } +} + +#[derive(Clone, Debug)] +struct SearchQueryPathSink<'a, M, F> +where + M: Matcher, + F: FnMut(SearchQueryMatch) -> Result, +{ + search_id: SearchId, + path: &'a Path, + matcher: &'a M, + callback: F, +} + +impl<'a, M, F> Sink for SearchQueryPathSink<'a, M, F> +where + M: Matcher, + F: FnMut(SearchQueryMatch) -> Result, +{ + type Error = io::Error; + + fn matched(&mut self, _searcher: &Searcher, mat: &SinkMatch<'_>) -> Result { + let mut submatches = Vec::new(); + + // Find all matches within the line + let res = self.matcher.find_iter(mat.bytes(), |m| { + let bytes = &mat.bytes()[m]; + submatches.push(SearchQuerySubmatch { + r#match: match std::str::from_utf8(bytes) { + Ok(s) => SearchQueryMatchData::Text(s.to_string()), + Err(_) => SearchQueryMatchData::Bytes(bytes.to_vec()), + }, + start: m.start() as u64, + end: m.end() as u64, + }); + + true + }); + + if let Err(x) = res { + error!( + "[Query {}] SearchQueryPathSink encountered matcher error: {x}", + self.search_id + ); + } + + // If we have at least one submatch, then we have a match + let should_continue = if !submatches.is_empty() { + let r#match = SearchQueryMatch::Path(SearchQueryPathMatch { + path: self.path.to_path_buf(), + submatches, + }); + + (self.callback)(r#match)? + } else { + true + }; + + Ok(should_continue) + } +} + +#[derive(Clone, Debug)] +struct SearchQueryContentsSink<'a, M, F> +where + M: Matcher, + F: FnMut(SearchQueryMatch) -> Result, +{ + search_id: SearchId, + path: &'a Path, + matcher: &'a M, + callback: F, +} + +impl<'a, M, F> Sink for SearchQueryContentsSink<'a, M, F> +where + M: Matcher, + F: FnMut(SearchQueryMatch) -> Result, +{ + type Error = io::Error; + + fn matched(&mut self, _searcher: &Searcher, mat: &SinkMatch<'_>) -> Result { + let mut submatches = Vec::new(); + + // Find all matches within the line + let res = self.matcher.find_iter(mat.bytes(), |m| { + let bytes = &mat.bytes()[m]; + submatches.push(SearchQuerySubmatch { + r#match: match std::str::from_utf8(bytes) { + Ok(s) => SearchQueryMatchData::Text(s.to_string()), + Err(_) => SearchQueryMatchData::Bytes(bytes.to_vec()), + }, + start: m.start() as u64, + end: m.end() as u64, + }); + + true + }); + + if let Err(x) = res { + error!( + "[Query {}] SearchQueryContentsSink encountered matcher error: {x}", + self.search_id + ); + } + + // If we have at least one submatch, then we have a match + let should_continue = if !submatches.is_empty() { + let r#match = SearchQueryMatch::Contents(SearchQueryContentsMatch { + path: self.path.to_path_buf(), + lines: match std::str::from_utf8(mat.bytes()) { + Ok(s) => SearchQueryMatchData::Text(s.to_string()), + Err(_) => SearchQueryMatchData::Bytes(mat.bytes().to_vec()), + }, + + // NOTE: Since we are defining the searcher, we control always including the line + // number, so we can safely unwrap here + line_number: mat.line_number().unwrap(), + + // NOTE: absolute_byte_offset from grep tells us where the bytes start for the + // match, but not inclusive of where within the match + absolute_offset: mat.absolute_byte_offset(), + submatches, + }); + + (self.callback)(r#match)? + } else { + true + }; + + Ok(should_continue) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data::{FileType, SearchQueryCondition, SearchQueryMatchData}; + use assert_fs::prelude::*; + use std::path::PathBuf; + + fn make_path(path: &str) -> PathBuf { + use std::path::MAIN_SEPARATOR; + + // Ensure that our path is compliant with the current platform + let path = path.replace('/', &MAIN_SEPARATOR.to_string()); + + PathBuf::from(path) + } + + fn setup_dir(files: Vec<(&str, &str)>) -> assert_fs::TempDir { + let root = assert_fs::TempDir::new().unwrap(); + + for (path, contents) in files { + root.child(make_path(path)).write_str(contents).unwrap(); + } + + root + } + + fn get_matches(data: DistantResponseData) -> Vec { + match data { + DistantResponseData::SearchResults { matches, .. } => matches, + x => panic!("Did not get search results: {x:?}"), + } + } + + #[tokio::test] + async fn should_send_event_when_query_finished() { + let root = setup_dir(Vec::new()); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::equals(""), + options: Default::default(), + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_send_all_matches_at_once_by_default() { + let root = setup_dir(vec![ + ("path/to/file1.txt", ""), + ("path/to/file2.txt", ""), + ("other/file.txt", ""), + ("dir/other/bin", ""), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex("other"), + options: Default::default(), + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .collect::>(); + + matches.sort_unstable_by_key(|m| m.path.to_path_buf()); + + // Root path len (including trailing separator) + 1 to be at start of child path + let child_start = (root.path().to_string_lossy().len() + 1) as u64; + + assert_eq!( + matches, + vec![ + SearchQueryPathMatch { + path: root.child(make_path("dir/other")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("other".to_string()), + start: child_start + 4, + end: child_start + 9, + }] + }, + SearchQueryPathMatch { + path: root.child(make_path("dir/other/bin")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("other".to_string()), + start: child_start + 4, + end: child_start + 9, + }] + }, + SearchQueryPathMatch { + path: root.child(make_path("other")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("other".to_string()), + start: child_start, + end: child_start + 5, + }] + }, + SearchQueryPathMatch { + path: root.child(make_path("other/file.txt")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("other".to_string()), + start: child_start, + end: child_start + 5, + }] + }, + ] + ); + + assert_eq!( + rx.recv().await, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_support_targeting_paths() { + let root = setup_dir(vec![ + ("path/to/file1.txt", ""), + ("path/to/file2.txt", ""), + ("other/file.txt", ""), + ("other/dir/bin", ""), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex("path"), + options: Default::default(), + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .collect::>(); + + matches.sort_unstable_by_key(|m| m.path.to_path_buf()); + + // Root path len (including trailing separator) + 1 to be at start of child path + let child_start = (root.path().to_string_lossy().len() + 1) as u64; + + assert_eq!( + matches, + vec![ + SearchQueryPathMatch { + path: root.child(make_path("path")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("path".to_string()), + start: child_start, + end: child_start + 4, + }] + }, + SearchQueryPathMatch { + path: root.child(make_path("path/to")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("path".to_string()), + start: child_start, + end: child_start + 4, + }] + }, + SearchQueryPathMatch { + path: root.child(make_path("path/to/file1.txt")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("path".to_string()), + start: child_start, + end: child_start + 4, + }] + }, + SearchQueryPathMatch { + path: root.child(make_path("path/to/file2.txt")).to_path_buf(), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("path".to_string()), + start: child_start, + end: child_start + 4, + }] + } + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_support_targeting_contents() { + let root = setup_dir(vec![ + ("path/to/file1.txt", "some\nlines of text in\na\nfile"), + ("path/to/file2.txt", "more text"), + ("other/file.txt", "some other file with text"), + ("other/dir/bin", "asdfasdfasdfasdfasdfasdfasdfasdfasdf"), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex("text"), + options: Default::default(), + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + matches.sort_unstable_by_key(|m| m.path.to_path_buf()); + + assert_eq!( + matches, + vec![ + SearchQueryContentsMatch { + path: root.child(make_path("other/file.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("some other file with text"), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 21, + end: 25, + }] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file1.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("lines of text in\n"), + line_number: 2, + absolute_offset: 5, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 9, + end: 13, + }] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file2.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("more text"), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 5, + end: 9, + }] + } + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_support_multiple_submatches() { + let root = setup_dir(vec![("path/to/file.txt", "aa ab ac\nba bb bc\nca cb cc")]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex(r"[abc][ab]"), + options: Default::default(), + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + matches.sort_unstable_by_key(|m| m.line_number); + + assert_eq!( + matches, + vec![ + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("aa ab ac\n"), + line_number: 1, + absolute_offset: 0, + submatches: vec![ + SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("aa".to_string()), + start: 0, + end: 2, + }, + SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("ab".to_string()), + start: 3, + end: 5, + } + ] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("ba bb bc\n"), + line_number: 2, + absolute_offset: 9, + submatches: vec![ + SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("ba".to_string()), + start: 0, + end: 2, + }, + SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("bb".to_string()), + start: 3, + end: 5, + } + ] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("ca cb cc"), + line_number: 3, + absolute_offset: 18, + submatches: vec![ + SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("ca".to_string()), + start: 0, + end: 2, + }, + SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("cb".to_string()), + start: 3, + end: 5, + } + ] + }, + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_send_paginated_results_if_specified() { + let root = setup_dir(vec![ + ("path/to/file1.txt", "some\nlines of text in\na\nfile"), + ("path/to/file2.txt", "more text"), + ("other/file.txt", "some other file with text"), + ("other/dir/bin", "asdfasdfasdfasdfasdfasdfasdfasdfasdf"), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex("text"), + options: SearchQueryOptions { + pagination: Some(2), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + // Collect all matches here + let mut matches = Vec::new(); + + // Get first two matches + let paginated_matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + assert_eq!(paginated_matches.len(), 2); + matches.extend(paginated_matches); + + // Get last match + let paginated_matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + assert_eq!(paginated_matches.len(), 1); + matches.extend(paginated_matches); + + // Sort our matches so we can check them all + matches.sort_unstable_by_key(|m| m.path.to_path_buf()); + + assert_eq!( + matches, + vec![ + SearchQueryContentsMatch { + path: root.child(make_path("other/file.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("some other file with text"), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 21, + end: 25, + }] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file1.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("lines of text in\n"), + line_number: 2, + absolute_offset: 5, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 9, + end: 13, + }] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file2.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("more text"), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 5, + end: 9, + }] + } + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_send_maximum_of_limit_results_if_specified() { + let root = setup_dir(vec![ + ("path/to/file1.txt", "some\nlines of text in\na\nfile"), + ("path/to/file2.txt", "more text"), + ("other/file.txt", "some other file with text"), + ("other/dir/bin", "asdfasdfasdfasdfasdfasdfasdfasdfasdf"), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex("text"), + options: SearchQueryOptions { + limit: Some(2), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + // Get all matches and verify the len + let matches = get_matches(rx.recv().await.unwrap()); + assert_eq!(matches.len(), 2); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_send_maximum_of_limit_results_with_pagination_if_specified() { + let root = setup_dir(vec![ + ("path/to/file1.txt", "some\nlines of text in\na\nfile"), + ("path/to/file2.txt", "more text"), + ("other/file.txt", "some other file with text"), + ("other/dir/bin", "asdfasdfasdfasdfasdfasdfasdfasdfasdf"), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex("text"), + options: SearchQueryOptions { + pagination: Some(1), + limit: Some(2), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + // Verify that we get one match at a time up to the limit + let matches = get_matches(rx.recv().await.unwrap()); + assert_eq!(matches.len(), 1); + + let matches = get_matches(rx.recv().await.unwrap()); + assert_eq!(matches.len(), 1); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_traverse_starting_from_min_depth_if_specified() { + let root = setup_dir(vec![ + ("path/to/file1.txt", ""), + ("path/to/file2.txt", ""), + ("other/file.txt", ""), + ("other/dir/bin", ""), + ]); + + async fn test_min_depth( + root: &assert_fs::TempDir, + depth: u64, + expected_paths: Vec, + ) { + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex(".*"), + options: SearchQueryOptions { + min_depth: Some(depth), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut paths = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .map(|m| m.path) + .collect::>(); + + paths.sort_unstable(); + + assert_eq!(paths, expected_paths); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + // Minimum depth of 0 should include root search path + test_min_depth( + &root, + 0, + vec![ + root.to_path_buf(), + root.child(make_path("other")).to_path_buf(), + root.child(make_path("other/dir")).to_path_buf(), + root.child(make_path("other/dir/bin")).to_path_buf(), + root.child(make_path("other/file.txt")).to_path_buf(), + root.child(make_path("path")).to_path_buf(), + root.child(make_path("path/to")).to_path_buf(), + root.child(make_path("path/to/file1.txt")).to_path_buf(), + root.child(make_path("path/to/file2.txt")).to_path_buf(), + ], + ) + .await; + + // Minimum depth of 1 should not root search path + test_min_depth( + &root, + 1, + vec![ + root.child(make_path("other")).to_path_buf(), + root.child(make_path("other/dir")).to_path_buf(), + root.child(make_path("other/dir/bin")).to_path_buf(), + root.child(make_path("other/file.txt")).to_path_buf(), + root.child(make_path("path")).to_path_buf(), + root.child(make_path("path/to")).to_path_buf(), + root.child(make_path("path/to/file1.txt")).to_path_buf(), + root.child(make_path("path/to/file2.txt")).to_path_buf(), + ], + ) + .await; + + // Minimum depth of 2 should not include root or children + test_min_depth( + &root, + 2, + vec![ + root.child(make_path("other/dir")).to_path_buf(), + root.child(make_path("other/dir/bin")).to_path_buf(), + root.child(make_path("other/file.txt")).to_path_buf(), + root.child(make_path("path/to")).to_path_buf(), + root.child(make_path("path/to/file1.txt")).to_path_buf(), + root.child(make_path("path/to/file2.txt")).to_path_buf(), + ], + ) + .await; + + // Minimum depth of 3 should not include root or children or grandchildren + test_min_depth( + &root, + 3, + vec![ + root.child(make_path("other/dir/bin")).to_path_buf(), + root.child(make_path("path/to/file1.txt")).to_path_buf(), + root.child(make_path("path/to/file2.txt")).to_path_buf(), + ], + ) + .await; + } + + #[tokio::test] + async fn should_traverse_no_deeper_than_max_depth_if_specified() { + let root = setup_dir(vec![ + ("path/to/file1.txt", ""), + ("path/to/file2.txt", ""), + ("other/file.txt", ""), + ("other/dir/bin", ""), + ]); + + async fn test_max_depth( + root: &assert_fs::TempDir, + depth: u64, + expected_paths: Vec, + ) { + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex(".*"), + options: SearchQueryOptions { + max_depth: Some(depth), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut paths = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .map(|m| m.path) + .collect::>(); + + paths.sort_unstable(); + + assert_eq!(paths, expected_paths); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + // Maximum depth of 0 should only include root + test_max_depth(&root, 0, vec![root.to_path_buf()]).await; + + // Maximum depth of 1 should only include root and children + test_max_depth( + &root, + 1, + vec![ + root.to_path_buf(), + root.child(make_path("other")).to_path_buf(), + root.child(make_path("path")).to_path_buf(), + ], + ) + .await; + + // Maximum depth of 2 should only include root and children and grandchildren + test_max_depth( + &root, + 2, + vec![ + root.to_path_buf(), + root.child(make_path("other")).to_path_buf(), + root.child(make_path("other/dir")).to_path_buf(), + root.child(make_path("other/file.txt")).to_path_buf(), + root.child(make_path("path")).to_path_buf(), + root.child(make_path("path/to")).to_path_buf(), + ], + ) + .await; + + // Maximum depth of 3 should include everything we have in our test + test_max_depth( + &root, + 3, + vec![ + root.to_path_buf(), + root.child(make_path("other")).to_path_buf(), + root.child(make_path("other/dir")).to_path_buf(), + root.child(make_path("other/dir/bin")).to_path_buf(), + root.child(make_path("other/file.txt")).to_path_buf(), + root.child(make_path("path")).to_path_buf(), + root.child(make_path("path/to")).to_path_buf(), + root.child(make_path("path/to/file1.txt")).to_path_buf(), + root.child(make_path("path/to/file2.txt")).to_path_buf(), + ], + ) + .await; + } + + #[tokio::test] + async fn should_filter_searched_paths_to_only_those_that_match_include_regex() { + let root = setup_dir(vec![ + ("path/to/file1.txt", "some\nlines of text in\na\nfile"), + ("path/to/file2.txt", "more text"), + ("other/file.txt", "some other file with text"), + ("other/dir/bin", "asdfasdfasdfasdfasdfasdfasdfasdfasdf"), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex("text"), + options: SearchQueryOptions { + include: Some(SearchQueryCondition::regex("other")), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + matches.sort_unstable_by_key(|m| m.path.to_path_buf()); + + assert_eq!( + matches, + vec![SearchQueryContentsMatch { + path: root.child(make_path("other/file.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("some other file with text"), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 21, + end: 25, + }] + }] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_filter_searched_paths_to_only_those_that_do_not_match_exclude_regex() { + let root = setup_dir(vec![ + ("path/to/file1.txt", "some\nlines of text in\na\nfile"), + ("path/to/file2.txt", "more text"), + ("other/file.txt", "some other file with text"), + ("other/dir/bin", "asdfasdfasdfasdfasdfasdfasdfasdfasdf"), + ]); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex("text"), + options: SearchQueryOptions { + exclude: Some(SearchQueryCondition::regex("other")), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + matches.sort_unstable_by_key(|m| m.path.to_path_buf()); + + assert_eq!( + matches, + vec![ + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file1.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("lines of text in\n"), + line_number: 2, + absolute_offset: 5, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 9, + end: 13, + }] + }, + SearchQueryContentsMatch { + path: root.child(make_path("path/to/file2.txt")).to_path_buf(), + lines: SearchQueryMatchData::text("more text"), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("text".to_string()), + start: 5, + end: 9, + }] + } + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_return_binary_match_data_if_match_is_not_utf8() { + let root = assert_fs::TempDir::new().unwrap(); + let bin_file = root.child(make_path("file.bin")); + + // Write some invalid bytes, a newline, and then "hello" + bin_file + .write_binary(&[0, 159, 146, 150, 10, 72, 69, 76, 76, 79]) + .unwrap(); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + // NOTE: We provide regex that matches an invalid UTF-8 character by disabling the u flag + // and checking for 0x9F (159) + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Contents, + condition: SearchQueryCondition::regex(r"(?-u:\x9F)"), + options: Default::default(), + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let matches = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_contents_match()) + .collect::>(); + + assert_eq!( + matches, + vec![SearchQueryContentsMatch { + path: root.child(make_path("file.bin")).to_path_buf(), + lines: SearchQueryMatchData::bytes([0, 159, 146, 150, 10]), + line_number: 1, + absolute_offset: 0, + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::bytes([159]), + start: 1, + end: 2, + }] + },] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_filter_searched_paths_to_only_those_are_an_allowed_file_type() { + let root = assert_fs::TempDir::new().unwrap(); + let file = root.child(make_path("file")); + file.touch().unwrap(); + root.child(make_path("dir")).create_dir_all().unwrap(); + root.child(make_path("symlink")) + .symlink_to_file(file.path()) + .unwrap(); + + async fn test_allowed_file_types( + root: &assert_fs::TempDir, + allowed_file_types: Vec, + expected_paths: Vec, + ) { + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex(".*"), + options: SearchQueryOptions { + allowed_file_types: allowed_file_types.iter().copied().collect(), + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut paths = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .map(|m| m.path) + .collect::>(); + + paths.sort_unstable(); + + assert_eq!( + paths, expected_paths, + "Path types did not match allowed: {allowed_file_types:?}" + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + // Empty set of allowed types falls back to allowing everything + test_allowed_file_types( + &root, + vec![], + vec![ + root.to_path_buf(), + root.child("dir").to_path_buf(), + root.child("file").to_path_buf(), + root.child("symlink").to_path_buf(), + ], + ) + .await; + + test_allowed_file_types( + &root, + vec![FileType::File], + vec![root.child("file").to_path_buf()], + ) + .await; + + test_allowed_file_types( + &root, + vec![FileType::Dir], + vec![root.to_path_buf(), root.child("dir").to_path_buf()], + ) + .await; + + test_allowed_file_types( + &root, + vec![FileType::Symlink], + vec![root.child("symlink").to_path_buf()], + ) + .await; + } + + #[tokio::test] + async fn should_follow_not_symbolic_links_if_specified_in_options() { + let root = assert_fs::TempDir::new().unwrap(); + + let file = root.child(make_path("file")); + file.touch().unwrap(); + let dir = root.child(make_path("dir")); + dir.create_dir_all().unwrap(); + root.child(make_path("file_symlink")) + .symlink_to_file(file.path()) + .unwrap(); + root.child(make_path("dir_symlink")) + .symlink_to_dir(dir.path()) + .unwrap(); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex(".*"), + options: SearchQueryOptions { + follow_symbolic_links: true, + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut paths = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .map(|m| m.path) + .collect::>(); + + paths.sort_unstable(); + + assert_eq!( + paths, + vec![ + root.to_path_buf(), + root.child("dir").to_path_buf(), + root.child("dir_symlink").to_path_buf(), + root.child("file").to_path_buf(), + root.child("file_symlink").to_path_buf(), + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } + + #[tokio::test] + async fn should_follow_symbolic_links_if_specified_in_options() { + let root = assert_fs::TempDir::new().unwrap(); + + let file = root.child(make_path("file")); + file.touch().unwrap(); + let dir = root.child(make_path("dir")); + dir.create_dir_all().unwrap(); + root.child(make_path("file_symlink")) + .symlink_to_file(file.path()) + .unwrap(); + root.child(make_path("dir_symlink")) + .symlink_to_dir(dir.path()) + .unwrap(); + + let state = SearchState::new(); + let (reply, mut rx) = mpsc::channel(100); + + // NOTE: Following symlobic links on its own does nothing, but when combined with a file + // type filter, it will evaluate the underlying type of symbolic links and filter + // based on that instead of the the symbolic link + let query = SearchQuery { + path: root.path().to_path_buf(), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::regex(".*"), + options: SearchQueryOptions { + allowed_file_types: vec![FileType::File].into_iter().collect(), + follow_symbolic_links: true, + ..Default::default() + }, + }; + + let search_id = state.start(query, Box::new(reply)).await.unwrap(); + + let mut paths = get_matches(rx.recv().await.unwrap()) + .into_iter() + .filter_map(|m| m.into_path_match()) + .map(|m| m.path) + .collect::>(); + + paths.sort_unstable(); + + assert_eq!( + paths, + vec![ + root.child("file").to_path_buf(), + root.child("file_symlink").to_path_buf(), + ] + ); + + let data = rx.recv().await; + assert_eq!( + data, + Some(DistantResponseData::SearchDone { id: search_id }) + ); + + assert_eq!(rx.recv().await, None); + } +} diff --git a/distant-core/src/client.rs b/distant-core/src/client.rs index 987b401..1371a34 100644 --- a/distant-core/src/client.rs +++ b/distant-core/src/client.rs @@ -4,6 +4,7 @@ use distant_net::{Channel, Client}; mod ext; mod lsp; mod process; +mod searcher; mod watcher; /// Represents a [`Client`] that communicates using the distant protocol @@ -15,4 +16,5 @@ pub type DistantChannel = Channel, DistantMsg io::Error { io::Error::new(io::ErrorKind::Other, "Mismatched response") } -/// Provides convenience functions on top of a [`SessionChannel`] +/// Provides convenience functions on top of a [`Channel`] pub trait DistantChannelExt { /// Appends to a remote file using the data from a collection of bytes fn append_file( @@ -53,6 +54,12 @@ pub trait DistantChannelExt { resolve_file_type: bool, ) -> AsyncReturn<'_, Metadata>; + /// Perform a search + fn search(&mut self, query: impl Into) -> AsyncReturn<'_, Searcher>; + + /// Cancel an active search query + fn cancel_search(&mut self, id: SearchId) -> AsyncReturn<'_, ()>; + /// Reads entries from a directory, returning a tuple of directory entries and failures fn read_dir( &mut self, @@ -249,6 +256,19 @@ impl DistantChannelExt ) } + fn search(&mut self, query: impl Into) -> AsyncReturn<'_, Searcher> { + let query = query.into(); + Box::pin(async move { Searcher::search(self.clone(), query).await }) + } + + fn cancel_search(&mut self, id: SearchId) -> AsyncReturn<'_, ()> { + make_body!( + self, + DistantRequestData::CancelSearch { id }, + @ok + ) + } + fn read_dir( &mut self, path: impl Into, diff --git a/distant-core/src/client/searcher.rs b/distant-core/src/client/searcher.rs new file mode 100644 index 0000000..83a659e --- /dev/null +++ b/distant-core/src/client/searcher.rs @@ -0,0 +1,626 @@ +use crate::{ + client::{DistantChannel, DistantChannelExt}, + constants::CLIENT_SEARCHER_CAPACITY, + data::{DistantRequestData, DistantResponseData, SearchId, SearchQuery, SearchQueryMatch}, + DistantMsg, +}; +use distant_net::Request; +use log::*; +use std::{fmt, io}; +use tokio::{sync::mpsc, task::JoinHandle}; + +/// Represents a searcher for files, directories, and symlinks on the filesystem +pub struct Searcher { + channel: DistantChannel, + id: SearchId, + query: SearchQuery, + task: JoinHandle<()>, + rx: mpsc::Receiver, +} + +impl fmt::Debug for Searcher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Searcher") + .field("id", &self.id) + .field("query", &self.query) + .finish() + } +} + +impl Searcher { + /// Creates a searcher for some query + pub async fn search(mut channel: DistantChannel, query: SearchQuery) -> io::Result { + trace!("Searching using {query:?}",); + + // Submit our run request and get back a mailbox for responses + let mut mailbox = channel + .mail(Request::new(DistantMsg::Single( + DistantRequestData::Search { + query: query.clone(), + }, + ))) + .await?; + + let (tx, rx) = mpsc::channel(CLIENT_SEARCHER_CAPACITY); + + // Wait to get the confirmation of watch as either ok or error + let mut queue: Vec = Vec::new(); + let mut search_id = None; + while let Some(res) = mailbox.next().await { + for data in res.payload.into_vec() { + match data { + // If we get results before the started indicator, queue them up + DistantResponseData::SearchResults { matches, .. } => { + queue.extend(matches); + } + + // Once we get the started indicator, mark as ready to go + DistantResponseData::SearchStarted { id } => { + trace!("[Query {id}] Searcher has started"); + search_id = Some(id); + } + + // If we get an explicit error, convert and return it + DistantResponseData::Error(x) => return Err(io::Error::from(x)), + + // Otherwise, we got something unexpected, and report as such + x => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("Unexpected response: {:?}", x), + )) + } + } + } + + // Exit if we got the confirmation + // NOTE: Doing this later because we want to make sure the entire payload is processed + // first before exiting the loop + if search_id.is_some() { + break; + } + } + + let search_id = match search_id { + // Send out any of our queued changes that we got prior to the acknowledgement + Some(id) => { + trace!("[Query {id}] Forwarding {} queued matches", queue.len()); + for r#match in queue.drain(..) { + if tx.send(r#match).await.is_err() { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("[Query {id}] Queue search match dropped"), + )); + } + } + id + } + + // If we never received an acknowledgement of search before the mailbox closed, + // fail with a missing confirmation error + None => { + return Err(io::Error::new( + io::ErrorKind::Other, + "Search query missing started confirmation", + )) + } + }; + + // Spawn a task that continues to look for search result events and the conclusion of the + // search, discarding anything else that it gets + let task = tokio::spawn({ + async move { + while let Some(res) = mailbox.next().await { + let mut done = false; + + for data in res.payload.into_vec() { + match data { + DistantResponseData::SearchResults { matches, .. } => { + // If we can't queue up a match anymore, we've + // been closed and therefore want to quit + if tx.is_closed() { + break; + } + + // Otherwise, send over the matches + for r#match in matches { + if let Err(x) = tx.send(r#match).await { + error!( + "[Query {search_id}] Searcher failed to send match {:?}", + x.0 + ); + break; + } + } + } + + // Received completion indicator, so close out + DistantResponseData::SearchDone { .. } => { + trace!("[Query {search_id}] Searcher has finished"); + done = true; + break; + } + + _ => continue, + } + } + + if done { + break; + } + } + } + }); + + Ok(Self { + id: search_id, + query, + channel, + task, + rx, + }) + } + + /// Returns a reference to the query this searcher is running + pub fn query(&self) -> &SearchQuery { + &self.query + } + + /// Returns true if the searcher is still actively searching + pub fn is_active(&self) -> bool { + !self.task.is_finished() + } + + /// Returns the next match detected by the searcher, or none if the searcher has concluded + pub async fn next(&mut self) -> Option { + self.rx.recv().await + } + + /// Cancels the search being performed by the watcher + pub async fn cancel(&mut self) -> io::Result<()> { + trace!("[Query {}] Cancelling search", self.id); + self.channel.cancel_search(self.id).await?; + + // Kill our task that processes inbound matches if we have successfully stopped searching + self.task.abort(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data::{ + SearchQueryCondition, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, + SearchQuerySubmatch, SearchQueryTarget, + }; + use crate::DistantClient; + use distant_net::{ + Client, FramedTransport, InmemoryTransport, IntoSplit, PlainCodec, Response, + TypedAsyncRead, TypedAsyncWrite, + }; + use std::{path::PathBuf, sync::Arc}; + use tokio::sync::Mutex; + + fn make_session() -> ( + FramedTransport, + DistantClient, + ) { + let (t1, t2) = FramedTransport::pair(100); + let (writer, reader) = t2.into_split(); + (t1, Client::new(writer, reader).unwrap()) + } + + #[tokio::test] + async fn searcher_should_have_query_reflect_ongoing_query() { + let (mut transport, session) = make_session(); + let test_query = SearchQuery { + path: PathBuf::from("/some/test/path"), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::Regex { + value: String::from("."), + }, + options: SearchQueryOptions::default(), + }; + + // Create a task for searcher as we need to handle the request and a response + // in a separate async block + let search_task = { + let test_query = test_query.clone(); + tokio::spawn(async move { Searcher::search(session.clone_channel(), test_query).await }) + }; + + // Wait until we get the request from the session + let req: Request = transport.read().await.unwrap().unwrap(); + + // Send back an acknowledgement that a search was started + transport + .write(Response::new( + req.id, + DistantResponseData::SearchStarted { id: rand::random() }, + )) + .await + .unwrap(); + + // Get the searcher and verify the query + let searcher = search_task.await.unwrap().unwrap(); + assert_eq!(searcher.query(), &test_query); + } + + #[tokio::test] + async fn searcher_should_support_getting_next_match() { + let (mut transport, session) = make_session(); + let test_query = SearchQuery { + path: PathBuf::from("/some/test/path"), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::Regex { + value: String::from("."), + }, + options: SearchQueryOptions::default(), + }; + + // Create a task for searcher as we need to handle the request and a response + // in a separate async block + let search_task = + tokio::spawn( + async move { Searcher::search(session.clone_channel(), test_query).await }, + ); + + // Wait until we get the request from the session + let req: Request = transport.read().await.unwrap().unwrap(); + + // Send back an acknowledgement that a searcher was created + let id = rand::random::(); + transport + .write(Response::new( + req.id.clone(), + DistantResponseData::SearchStarted { id }, + )) + .await + .unwrap(); + + // Get the searcher + let mut searcher = search_task.await.unwrap().unwrap(); + + // Send some matches related to the file + transport + .write(Response::new( + req.id, + vec![ + DistantResponseData::SearchResults { + id, + matches: vec![ + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/1"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match".to_string()), + start: 3, + end: 7, + }], + }), + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/2"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 2".to_string()), + start: 88, + end: 99, + }], + }), + ], + }, + DistantResponseData::SearchResults { + id, + matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/3"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 3".to_string()), + start: 5, + end: 9, + }], + })], + }, + ], + )) + .await + .unwrap(); + + // Verify that the searcher gets the matches, one at a time + let m = searcher.next().await.expect("Searcher closed unexpectedly"); + assert_eq!( + m, + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/1"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match".to_string()), + start: 3, + end: 7, + }], + }) + ); + + let m = searcher.next().await.expect("Searcher closed unexpectedly"); + assert_eq!( + m, + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/2"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 2".to_string()), + start: 88, + end: 99, + }], + }), + ); + + let m = searcher.next().await.expect("Searcher closed unexpectedly"); + assert_eq!( + m, + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/3"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 3".to_string()), + start: 5, + end: 9, + }], + }) + ); + } + + #[tokio::test] + async fn searcher_should_distinguish_match_events_and_only_receive_matches_for_itself() { + let (mut transport, session) = make_session(); + + let test_query = SearchQuery { + path: PathBuf::from("/some/test/path"), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::Regex { + value: String::from("."), + }, + options: SearchQueryOptions::default(), + }; + + // Create a task for searcher as we need to handle the request and a response + // in a separate async block + let search_task = + tokio::spawn( + async move { Searcher::search(session.clone_channel(), test_query).await }, + ); + + // Wait until we get the request from the session + let req: Request = transport.read().await.unwrap().unwrap(); + + // Send back an acknowledgement that a searcher was created + let id = rand::random(); + transport + .write(Response::new( + req.id.clone(), + DistantResponseData::SearchStarted { id }, + )) + .await + .unwrap(); + + // Get the searcher + let mut searcher = search_task.await.unwrap().unwrap(); + + // Send a match from the appropriate origin + transport + .write(Response::new( + req.id.clone(), + DistantResponseData::SearchResults { + id, + matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/1"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match".to_string()), + start: 3, + end: 7, + }], + })], + }, + )) + .await + .unwrap(); + + // Send a chanmatchge from a different origin + transport + .write(Response::new( + req.id.clone() + "1", + DistantResponseData::SearchResults { + id, + matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/2"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 2".to_string()), + start: 88, + end: 99, + }], + })], + }, + )) + .await + .unwrap(); + + // Send a chanmatchge from the appropriate origin + transport + .write(Response::new( + req.id, + DistantResponseData::SearchResults { + id, + matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/3"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 3".to_string()), + start: 5, + end: 9, + }], + })], + }, + )) + .await + .unwrap(); + + // Verify that the searcher gets the matches, one at a time + let m = searcher.next().await.expect("Searcher closed unexpectedly"); + assert_eq!( + m, + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/1"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match".to_string()), + start: 3, + end: 7, + }], + }) + ); + + let m = searcher.next().await.expect("Watcher closed unexpectedly"); + assert_eq!( + m, + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/3"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 3".to_string()), + start: 5, + end: 9, + }], + }) + ); + } + + #[tokio::test] + async fn searcher_should_stop_receiving_events_if_cancelled() { + let (mut transport, session) = make_session(); + + let test_query = SearchQuery { + path: PathBuf::from("/some/test/path"), + target: SearchQueryTarget::Path, + condition: SearchQueryCondition::Regex { + value: String::from("."), + }, + options: SearchQueryOptions::default(), + }; + + // Create a task for searcher as we need to handle the request and a response + // in a separate async block + let search_task = + tokio::spawn( + async move { Searcher::search(session.clone_channel(), test_query).await }, + ); + + // Wait until we get the request from the session + let req: Request = transport.read().await.unwrap().unwrap(); + + // Send back an acknowledgement that a watcher was created + let id = rand::random::(); + transport + .write(Response::new( + req.id.clone(), + DistantResponseData::SearchStarted { id }, + )) + .await + .unwrap(); + + // Send some matches from the appropriate origin + transport + .write(Response::new( + req.id, + DistantResponseData::SearchResults { + id, + matches: vec![ + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/1"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match".to_string()), + start: 3, + end: 7, + }], + }), + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/2"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 2".to_string()), + start: 88, + end: 99, + }], + }), + ], + }, + )) + .await + .unwrap(); + + // Wait a little bit for all matches to be queued + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Create a task for for cancelling as we need to handle the request and a response + // in a separate async block + let searcher = Arc::new(Mutex::new(search_task.await.unwrap().unwrap())); + + // Verify that the searcher gets the first match + let m = searcher + .lock() + .await + .next() + .await + .expect("Searcher closed unexpectedly"); + assert_eq!( + m, + SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/1"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match".to_string()), + start: 3, + end: 7, + }], + }), + ); + + // Cancel the search, verify the request is sent out, and respond with ok + let searcher_2 = Arc::clone(&searcher); + let cancel_task = tokio::spawn(async move { searcher_2.lock().await.cancel().await }); + + let req: Request = transport.read().await.unwrap().unwrap(); + + transport + .write(Response::new(req.id.clone(), DistantResponseData::Ok)) + .await + .unwrap(); + + // Wait for the cancel to complete + cancel_task.await.unwrap().unwrap(); + + // Send a match that will get ignored + transport + .write(Response::new( + req.id, + DistantResponseData::SearchResults { + id, + matches: vec![SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/3"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 3".to_string()), + start: 5, + end: 9, + }], + })], + }, + )) + .await + .unwrap(); + + // Verify that we get any remaining matches that were received before cancel, + // but nothing new after that + assert_eq!( + searcher.lock().await.next().await, + Some(SearchQueryMatch::Path(SearchQueryPathMatch { + path: PathBuf::from("/some/path/2"), + submatches: vec![SearchQuerySubmatch { + r#match: SearchQueryMatchData::Text("test match 2".to_string()), + start: 88, + end: 99, + }], + })) + ); + assert_eq!(searcher.lock().await.next().await, None); + } +} diff --git a/distant-core/src/constants.rs b/distant-core/src/constants.rs index 2a949ec..2fc79be 100644 --- a/distant-core/src/constants.rs +++ b/distant-core/src/constants.rs @@ -4,6 +4,9 @@ pub const CLIENT_PIPE_CAPACITY: usize = 10000; /// Capacity associated with a client watcher receiving changes pub const CLIENT_WATCHER_CAPACITY: usize = 100; +/// Capacity associated with a client searcher receiving matches +pub const CLIENT_SEARCHER_CAPACITY: usize = 10000; + /// Capacity associated with the server's file watcher to pass events outbound pub const SERVER_WATCHER_CAPACITY: usize = 10000; diff --git a/distant-core/src/data.rs b/distant-core/src/data.rs index 1104f7f..abeca95 100644 --- a/distant-core/src/data.rs +++ b/distant-core/src/data.rs @@ -33,6 +33,9 @@ pub use metadata::*; mod pty; pub use pty::*; +mod search; +pub use search::*; + mod system; pub use system::*; @@ -145,6 +148,7 @@ impl DistantMsg { #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr(feature = "clap", derive(clap::Subcommand))] #[strum_discriminants(derive( + AsRefStr, strum::Display, EnumIter, EnumMessage, @@ -381,6 +385,22 @@ pub enum DistantRequestData { resolve_file_type: bool, }, + /// Searches filesystem using the provided query + #[strum_discriminants(strum(message = "Supports searching filesystem using queries"))] + Search { + /// Query to perform against the filesystem + query: SearchQuery, + }, + + /// Cancels an active search being run against the filesystem + #[strum_discriminants(strum( + message = "Supports canceling an active search against the filesystem" + ))] + CancelSearch { + /// Id of the search to cancel + id: SearchId, + }, + /// Spawns a new process on the remote machine #[cfg_attr(feature = "clap", clap(visible_aliases = &["spawn", "run"]))] #[strum_discriminants(strum(message = "Supports spawning a process"))] @@ -499,6 +519,27 @@ pub enum DistantResponseData { /// Represents metadata about some filesystem object (file, directory, symlink) on remote machine Metadata(Metadata), + /// Represents a search being started + SearchStarted { + /// Arbitrary id associated with search + id: SearchId, + }, + + /// Represents some subset of results for a search query (may not be all of them) + SearchResults { + /// Arbitrary id associated with search + id: SearchId, + + /// Collection of matches from performing a query + matches: Vec, + }, + + /// Represents a search being completed + SearchDone { + /// Arbitrary id associated with search + id: SearchId, + }, + /// Response to starting a new process ProcSpawned { /// Arbitrary id associated with running process diff --git a/distant-core/src/data/capabilities.rs b/distant-core/src/data/capabilities.rs index 4a8fff6..bebd416 100644 --- a/distant-core/src/data/capabilities.rs +++ b/distant-core/src/data/capabilities.rs @@ -53,6 +53,15 @@ impl Capabilities { self.0.take(&cap) } + /// Removes the capability with the described kind, returning true if it existed + pub fn remove(&mut self, kind: impl AsRef) -> bool { + let cap = Capability { + kind: kind.as_ref().to_string(), + description: String::new(), + }; + self.0.remove(&cap) + } + /// Converts into vec of capabilities sorted by kind pub fn into_sorted_vec(self) -> Vec { let mut this = self.0.into_iter().collect::>(); diff --git a/distant-core/src/data/filesystem.rs b/distant-core/src/data/filesystem.rs index 5ca9f53..3cd6d89 100644 --- a/distant-core/src/data/filesystem.rs +++ b/distant-core/src/data/filesystem.rs @@ -1,6 +1,6 @@ use derive_more::IsVariant; use serde::{Deserialize, Serialize}; -use std::path::PathBuf; +use std::{fs::FileType as StdFileType, path::PathBuf}; use strum::AsRefStr; /// Represents information about a single entry within a directory @@ -27,7 +27,7 @@ impl DirEntry { } /// Represents the type associated with a dir entry -#[derive(Copy, Clone, Debug, PartialEq, Eq, AsRefStr, IsVariant, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, AsRefStr, IsVariant, Serialize, Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(rename_all = "snake_case", deny_unknown_fields)] #[strum(serialize_all = "snake_case")] @@ -37,6 +37,18 @@ pub enum FileType { Symlink, } +impl From for FileType { + fn from(ft: StdFileType) -> Self { + if ft.is_dir() { + Self::Dir + } else if ft.is_symlink() { + Self::Symlink + } else { + Self::File + } + } +} + #[cfg(feature = "schemars")] impl FileType { pub fn root_schema() -> schemars::schema::RootSchema { diff --git a/distant-core/src/data/search.rs b/distant-core/src/data/search.rs new file mode 100644 index 0000000..62a0ea8 --- /dev/null +++ b/distant-core/src/data/search.rs @@ -0,0 +1,335 @@ +use super::FileType; +use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, collections::HashSet, path::PathBuf, str::FromStr}; + +/// Id associated with a search +pub type SearchId = u32; + +/// Represents a query to perform against the filesystem +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct SearchQuery { + /// Path in which to perform the query + pub path: PathBuf, + + /// Kind of data to example using conditions + pub target: SearchQueryTarget, + + /// Condition to meet to be considered a match + pub condition: SearchQueryCondition, + + /// Options to apply to the query + #[serde(default)] + pub options: SearchQueryOptions, +} + +#[cfg(feature = "schemars")] +impl SearchQuery { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQuery) + } +} + +impl FromStr for SearchQuery { + type Err = serde_json::error::Error; + + /// Parses search query from a JSON string + fn from_str(s: &str) -> Result { + serde_json::from_str(s) + } +} + +/// Kind of data to examine using conditions +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "snake_case")] +pub enum SearchQueryTarget { + /// Checks path of file, directory, or symlink + Path, + + /// Checks contents of files + Contents, +} + +#[cfg(feature = "schemars")] +impl SearchQueryTarget { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryTarget) + } +} + +/// Condition used to find a match in a search query +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] +pub enum SearchQueryCondition { + /// Begins with some text + EndsWith { value: String }, + + /// Matches some text exactly + Equals { value: String }, + + /// Matches some regex + Regex { value: String }, + + /// Begins with some text + StartsWith { value: String }, +} + +impl SearchQueryCondition { + /// Creates a new instance with `EndsWith` variant + pub fn ends_with(value: impl Into) -> Self { + Self::EndsWith { + value: value.into(), + } + } + + /// Creates a new instance with `Equals` variant + pub fn equals(value: impl Into) -> Self { + Self::Equals { + value: value.into(), + } + } + + /// Creates a new instance with `Regex` variant + pub fn regex(value: impl Into) -> Self { + Self::Regex { + value: value.into(), + } + } + + /// Creates a new instance with `StartsWith` variant + pub fn starts_with(value: impl Into) -> Self { + Self::StartsWith { + value: value.into(), + } + } + + /// Converts the condition in a regex string + pub fn to_regex_string(&self) -> String { + match self { + Self::EndsWith { value } => format!(r"{value}$"), + Self::Equals { value } => format!(r"^{value}$"), + Self::Regex { value } => value.to_string(), + Self::StartsWith { value } => format!(r"^{value}"), + } + } +} + +#[cfg(feature = "schemars")] +impl SearchQueryCondition { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryCondition) + } +} + +/// Options associated with a search query +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct SearchQueryOptions { + /// Restrict search to only these file types (otherwise all are allowed) + #[serde(default)] + pub allowed_file_types: HashSet, + + /// Regex to use to filter paths being searched to only those that match the include condition + #[serde(default)] + pub include: Option, + + /// Regex to use to filter paths being searched to only those that do not match the exclude + /// condition + #[serde(default)] + pub exclude: Option, + + /// Search should follow symbolic links + #[serde(default)] + pub follow_symbolic_links: bool, + + /// Maximum results to return before stopping the query + #[serde(default)] + pub limit: Option, + + /// Minimum depth (directories) to search + /// + /// The smallest depth is 0 and always corresponds to the path given to the new function on + /// this type. Its direct descendents have depth 1, and their descendents have depth 2, and so + /// on. + #[serde(default)] + pub min_depth: Option, + + /// Maximum depth (directories) to search + /// + /// The smallest depth is 0 and always corresponds to the path given to the new function on + /// this type. Its direct descendents have depth 1, and their descendents have depth 2, and so + /// on. + /// + /// Note that this will not simply filter the entries of the iterator, but it will actually + /// avoid descending into directories when the depth is exceeded. + #[serde(default)] + pub max_depth: Option, + + /// Amount of results to batch before sending back excluding final submission that will always + /// include the remaining results even if less than pagination request + #[serde(default)] + pub pagination: Option, +} + +#[cfg(feature = "schemars")] +impl SearchQueryOptions { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryOptions) + } +} + +/// Represents a match for a search query +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")] +pub enum SearchQueryMatch { + /// Matches part of a file's path + Path(SearchQueryPathMatch), + + /// Matches part of a file's contents + Contents(SearchQueryContentsMatch), +} + +impl SearchQueryMatch { + pub fn into_path_match(self) -> Option { + match self { + Self::Path(x) => Some(x), + _ => None, + } + } + + pub fn into_contents_match(self) -> Option { + match self { + Self::Contents(x) => Some(x), + _ => None, + } + } +} + +#[cfg(feature = "schemars")] +impl SearchQueryMatch { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryMatch) + } +} + +/// Represents details for a match on a path +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct SearchQueryPathMatch { + /// Path associated with the match + pub path: PathBuf, + + /// Collection of matches tied to `path` where each submatch's byte offset is relative to + /// `path` + pub submatches: Vec, +} + +#[cfg(feature = "schemars")] +impl SearchQueryPathMatch { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryPathMatch) + } +} + +/// Represents details for a match on a file's contents +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct SearchQueryContentsMatch { + /// Path to file whose contents match + pub path: PathBuf, + + /// Line(s) that matched + pub lines: SearchQueryMatchData, + + /// Line number where match starts (base index 1) + pub line_number: u64, + + /// Absolute byte offset corresponding to the start of `lines` in the data being searched + pub absolute_offset: u64, + + /// Collection of matches tied to `lines` where each submatch's byte offset is relative to + /// `lines` and not the overall content + pub submatches: Vec, +} + +#[cfg(feature = "schemars")] +impl SearchQueryContentsMatch { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryContentsMatch) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct SearchQuerySubmatch { + /// Content matched by query + pub r#match: SearchQueryMatchData, + + /// Byte offset representing start of submatch (inclusive) + pub start: u64, + + /// Byte offset representing end of submatch (exclusive) + pub end: u64, +} + +#[cfg(feature = "schemars")] +impl SearchQuerySubmatch { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQuerySubmatch) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde( + rename_all = "snake_case", + deny_unknown_fields, + tag = "type", + content = "value" +)] +pub enum SearchQueryMatchData { + /// Match represented as UTF-8 text + Text(String), + + /// Match represented as bytes + Bytes(Vec), +} + +impl SearchQueryMatchData { + /// Creates a new instance with `Text` variant + pub fn text(value: impl Into) -> Self { + Self::Text(value.into()) + } + + /// Creates a new instance with `Bytes` variant + pub fn bytes(value: impl Into>) -> Self { + Self::Bytes(value.into()) + } + + /// Returns the UTF-8 str reference to the data, if is valid UTF-8 + pub fn to_str(&self) -> Option<&str> { + match self { + Self::Text(x) => Some(x), + Self::Bytes(x) => std::str::from_utf8(x).ok(), + } + } + + /// Converts data to a UTF-8 string, replacing any invalid UTF-8 sequences with + /// [`U+FFFD REPLACEMENT CHARACTER`](https://doc.rust-lang.org/nightly/core/char/const.REPLACEMENT_CHARACTER.html) + pub fn to_string_lossy(&self) -> Cow<'_, str> { + match self { + Self::Text(x) => Cow::Borrowed(x), + Self::Bytes(x) => String::from_utf8_lossy(x), + } + } +} + +#[cfg(feature = "schemars")] +impl SearchQueryMatchData { + pub fn root_schema() -> schemars::schema::RootSchema { + schemars::schema_for!(SearchQueryMatchData) + } +} diff --git a/distant-core/src/manager/data/capabilities.rs b/distant-core/src/manager/data/capabilities.rs index 0efd781..43a55c0 100644 --- a/distant-core/src/manager/data/capabilities.rs +++ b/distant-core/src/manager/data/capabilities.rs @@ -57,6 +57,15 @@ impl ManagerCapabilities { self.0.take(&cap) } + /// Removes the capability with the described kind, returning true if it existed + pub fn remove(&mut self, kind: impl AsRef) -> bool { + let cap = ManagerCapability { + kind: kind.as_ref().to_string(), + description: String::new(), + }; + self.0.remove(&cap) + } + /// Converts into vec of capabilities sorted by kind pub fn into_sorted_vec(self) -> Vec { let mut this = self.0.into_iter().collect::>(); diff --git a/distant-core/src/manager/data/request.rs b/distant-core/src/manager/data/request.rs index 0d20f0a..b8e0906 100644 --- a/distant-core/src/manager/data/request.rs +++ b/distant-core/src/manager/data/request.rs @@ -3,11 +3,13 @@ use crate::{DistantMsg, DistantRequestData, Map}; use derive_more::IsVariant; use distant_net::Request; use serde::{Deserialize, Serialize}; -use strum::{EnumDiscriminants, EnumIter, EnumMessage, EnumString}; +use strum::{AsRefStr, EnumDiscriminants, EnumIter, EnumMessage, EnumString}; +#[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, EnumDiscriminants, Serialize, Deserialize)] #[cfg_attr(feature = "clap", derive(clap::Subcommand))] #[strum_discriminants(derive( + AsRefStr, strum::Display, EnumIter, EnumMessage, diff --git a/distant-ssh2/src/api.rs b/distant-ssh2/src/api.rs index a8ff2de..3a37e6e 100644 --- a/distant-ssh2/src/api.rs +++ b/distant-ssh2/src/api.rs @@ -7,8 +7,8 @@ use async_once_cell::OnceCell; use async_trait::async_trait; use distant_core::{ data::{ - Capabilities, DirEntry, Environment, FileType, Metadata, ProcessId, PtySize, SystemInfo, - UnixMetadata, + Capabilities, CapabilityKind, DirEntry, Environment, FileType, Metadata, ProcessId, + PtySize, SystemInfo, UnixMetadata, }, DistantApi, DistantCtx, }; @@ -82,7 +82,14 @@ impl DistantApi for SshDistantApi { async fn capabilities(&self, ctx: DistantCtx) -> io::Result { debug!("[Conn {}] Querying capabilities", ctx.connection_id); - Ok(Capabilities::all()) + let mut capabilities = Capabilities::all(); + + // Searching is not supported by ssh implementation + // TODO: Could we have external search using ripgrep's JSON lines API? + capabilities.take(CapabilityKind::Search); + capabilities.take(CapabilityKind::CancelSearch); + + Ok(capabilities) } async fn read_file( diff --git a/src/cli/commands.rs b/src/cli/commands.rs index c34e049..ebf4757 100644 --- a/src/cli/commands.rs +++ b/src/cli/commands.rs @@ -5,6 +5,7 @@ mod generate; mod manager; mod server; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Subcommand)] pub enum DistantSubcommand { /// Perform client commands diff --git a/src/cli/commands/client.rs b/src/cli/commands/client.rs index 10fc8ad..459aa9d 100644 --- a/src/cli/commands/client.rs +++ b/src/cli/commands/client.rs @@ -17,7 +17,7 @@ use distant_core::{ data::{ChangeKindSet, Environment}, net::{IntoSplit, Request, Response, TypedAsyncRead, TypedAsyncWrite}, ConnectionId, Destination, DistantManagerClient, DistantMsg, DistantRequestData, - DistantResponseData, Host, Map, RemoteCommand, Watcher, + DistantResponseData, Host, Map, RemoteCommand, Searcher, Watcher, }; use log::*; use serde_json::{json, Value}; @@ -279,7 +279,7 @@ impl ClientSubcommand { } ); - let formatter = Formatter::shell(); + let mut formatter = Formatter::shell(); debug!("Sending request {:?}", request); match request { @@ -320,6 +320,26 @@ impl ClientSubcommand { } } } + DistantRequestData::Search { query } => { + debug!("Special request creating searcher for {:?}", query); + let mut searcher = Searcher::search(channel, query) + .await + .context("Failed to start search")?; + + // Continue to receive and process matches + while let Some(m) = searcher.next().await { + // TODO: Provide a cleaner way to print just a match + let res = Response::new( + "".to_string(), + DistantMsg::Single(DistantResponseData::SearchResults { + id: 0, + matches: vec![m], + }), + ); + + formatter.print(res).context("Failed to print match")?; + } + } DistantRequestData::Watch { path, recursive, diff --git a/src/cli/commands/client/format.rs b/src/cli/commands/client/format.rs index 4a0e87a..920ec7b 100644 --- a/src/cli/commands/client/format.rs +++ b/src/cli/commands/client/format.rs @@ -1,10 +1,17 @@ use clap::ValueEnum; use distant_core::{ - data::{ChangeKind, DistantMsg, DistantResponseData, Error, FileType, Metadata, SystemInfo}, + data::{ + ChangeKind, DistantMsg, DistantResponseData, Error, FileType, Metadata, + SearchQueryContentsMatch, SearchQueryMatch, SearchQueryPathMatch, SystemInfo, + }, net::Response, }; use log::*; -use std::io::{self, Write}; +use std::{ + collections::HashMap, + io::{self, Write}, + path::PathBuf, +}; use tabled::{object::Rows, style::Style, Alignment, Disable, Modify, Table, Tabled}; #[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] @@ -31,14 +38,24 @@ impl Default for Format { } } +#[derive(Default)] +struct FormatterState { + /// Last seen path during search + pub last_searched_path: Option, +} + pub struct Formatter { format: Format, + state: FormatterState, } impl Formatter { /// Create a new output message for the given response based on the specified format pub fn new(format: Format) -> Self { - Self { format } + Self { + format, + state: Default::default(), + } } /// Creates a new [`Formatter`] using [`Format`] of `Format::Shell` @@ -47,7 +64,7 @@ impl Formatter { } /// Consumes the output message, printing it based on its configuration - pub fn print(&self, res: Response>) -> io::Result<()> { + pub fn print(&mut self, res: Response>) -> io::Result<()> { let output = match self.format { Format::Json => Output::StdoutLine( serde_json::to_vec(&res) @@ -61,7 +78,7 @@ impl Formatter { "Shell does not support batch responses", )) } - Format::Shell => format_shell(res.payload.into_single().unwrap()), + Format::Shell => format_shell(&mut self.state, res.payload.into_single().unwrap()), }; match output { @@ -127,7 +144,7 @@ enum Output { None, } -fn format_shell(data: DistantResponseData) -> Output { +fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output { match data { DistantResponseData::Ok => Output::None, DistantResponseData::Error(Error { description, .. }) => { @@ -283,6 +300,68 @@ fn format_shell(data: DistantResponseData) -> Output { ) .into_bytes(), ), + DistantResponseData::SearchStarted { id } => { + Output::StdoutLine(format!("Query {id} started").into_bytes()) + } + DistantResponseData::SearchDone { .. } => Output::None, + DistantResponseData::SearchResults { matches, .. } => { + let mut files: HashMap<_, Vec> = HashMap::new(); + let mut is_targeting_paths = false; + + for m in matches { + match m { + SearchQueryMatch::Path(SearchQueryPathMatch { path, .. }) => { + // Create the entry with no lines called out + files.entry(path).or_default(); + is_targeting_paths = true; + } + + SearchQueryMatch::Contents(SearchQueryContentsMatch { + path, + lines, + line_number, + .. + }) => { + let file_matches = files.entry(path).or_default(); + + file_matches.push(format!( + "{line_number}:{}", + lines.to_string_lossy().trim_end() + )); + } + } + } + + let mut output = String::new(); + for (path, lines) in files { + use std::fmt::Write; + + // If we are seening a new path, print it out + if state.last_searched_path.as_deref() != Some(path.as_path()) { + // If we have already seen some path before, we would have printed it, and + // we want to add a space between it and the current path, but only if we are + // printing out file content matches and not paths + if state.last_searched_path.is_some() && !is_targeting_paths { + writeln!(&mut output).unwrap(); + } + + writeln!(&mut output, "{}", path.to_string_lossy()).unwrap(); + } + + for line in lines { + writeln!(&mut output, "{line}").unwrap(); + } + + // Update our last seen path + state.last_searched_path = Some(path); + } + + if !output.is_empty() { + Output::Stdout(output.into_bytes()) + } else { + Output::None + } + } DistantResponseData::ProcSpawned { .. } => Output::None, DistantResponseData::ProcStdout { data, .. } => Output::Stdout(data), DistantResponseData::ProcStderr { data, .. } => Output::Stderr(data), diff --git a/tests/cli/action/capabilities.rs b/tests/cli/action/capabilities.rs index 6b6a150..b4f130d 100644 --- a/tests/cli/action/capabilities.rs +++ b/tests/cli/action/capabilities.rs @@ -7,6 +7,8 @@ const EXPECTED_TABLE: &str = indoc! {" +------------------+------------------------------------------------------------------+ | kind | description | +------------------+------------------------------------------------------------------+ +| cancel_search | Supports canceling an active search against the filesystem | ++------------------+------------------------------------------------------------------+ | capabilities | Supports retrieving capabilities | +------------------+------------------------------------------------------------------+ | copy | Supports copying files, directories, and symlinks | @@ -43,6 +45,8 @@ const EXPECTED_TABLE: &str = indoc! {" +------------------+------------------------------------------------------------------+ | rename | Supports renaming files, directories, and symlinks | +------------------+------------------------------------------------------------------+ +| search | Supports searching filesystem using queries | ++------------------+------------------------------------------------------------------+ | system_info | Supports retrieving system information | +------------------+------------------------------------------------------------------+ | unwatch | Supports unwatching filesystem for changes | diff --git a/tests/cli/action/mod.rs b/tests/cli/action/mod.rs index 39cfc6b..b44cc65 100644 --- a/tests/cli/action/mod.rs +++ b/tests/cli/action/mod.rs @@ -13,5 +13,6 @@ mod metadata; mod proc_spawn; mod remove; mod rename; +mod search; mod system_info; mod watch; diff --git a/tests/cli/action/search.rs b/tests/cli/action/search.rs new file mode 100644 index 0000000..a650e78 --- /dev/null +++ b/tests/cli/action/search.rs @@ -0,0 +1,62 @@ +use crate::cli::fixtures::*; +use assert_cmd::Command; +use assert_fs::prelude::*; +use indoc::indoc; +use predicates::Predicate; +use rstest::*; +use serde_json::json; + +const SEARCH_RESULTS_REGEX: &str = indoc! {r" +.*?[\\/]file1.txt +1:some file text + +.*?[\\/]file2.txt +3:textual + +.*?[\\/]file3.txt +1:more content +"}; + +#[rstest] +fn should_search_filesystem_using_query(mut action_cmd: CtxCommand) { + let root = assert_fs::TempDir::new().unwrap(); + root.child("file1.txt").write_str("some file text").unwrap(); + root.child("file2.txt") + .write_str("lines\nof\ntextual\ninformation") + .unwrap(); + root.child("file3.txt").write_str("more content").unwrap(); + + let query = json!({ + "path": root.path().to_string_lossy(), + "target": "contents", + "condition": {"type": "regex", "value": "te[a-z]*\\b"}, + }); + + let stdout_predicate_fn = predicates::function::function(|s: &[u8]| { + let s = std::str::from_utf8(s).unwrap(); + + // Split by empty line, sort, and then rejoin with empty line inbetween + let mut lines = s + .split("\n\n") + .map(|lines| lines.trim_end()) + .collect::>(); + lines.sort_unstable(); + + // Put together sorted text lines + let full_text = lines.join("\n\n"); + + // Verify that it matches our search results regex + let regex_fn = predicates::str::is_match(SEARCH_RESULTS_REGEX).unwrap(); + + regex_fn.eval(&full_text) + }); + + // distant action system-info + action_cmd + .arg("search") + .arg(&serde_json::to_string(&query).unwrap()) + .assert() + .success() + .stdout(stdout_predicate_fn) + .stderr(""); +} diff --git a/tests/cli/repl/mod.rs b/tests/cli/repl/mod.rs index 39cfc6b..b44cc65 100644 --- a/tests/cli/repl/mod.rs +++ b/tests/cli/repl/mod.rs @@ -13,5 +13,6 @@ mod metadata; mod proc_spawn; mod remove; mod rename; +mod search; mod system_info; mod watch; diff --git a/tests/cli/repl/search.rs b/tests/cli/repl/search.rs new file mode 100644 index 0000000..7622d82 --- /dev/null +++ b/tests/cli/repl/search.rs @@ -0,0 +1,82 @@ +use crate::cli::fixtures::*; +use assert_fs::prelude::*; +use rstest::*; +use serde_json::json; + +#[rstest] +#[tokio::test] +async fn should_support_json_search_filesystem_using_query(mut json_repl: CtxCommand) { + let root = assert_fs::TempDir::new().unwrap(); + root.child("file1.txt").write_str("some file text").unwrap(); + root.child("file2.txt") + .write_str("lines\nof\ntextual\ninformation") + .unwrap(); + root.child("file3.txt").write_str("more content").unwrap(); + + let id = rand::random::().to_string(); + let req = json!({ + "id": id, + "payload": { + "type": "search", + "query": { + "path": root.path().to_string_lossy(), + "target": "contents", + "condition": {"type": "regex", "value": "ua"}, + }, + }, + }); + + // Submit search request and get back started confirmation + let res = json_repl.write_and_read_json(req).await.unwrap().unwrap(); + + // Get id from started confirmation + assert_eq!(res["origin_id"], id); + assert_eq!(res["payload"]["type"], "search_started"); + let search_id = res["payload"]["id"] + .as_u64() + .expect("id missing or not number"); + + // Get search results back + let res = json_repl.read_json_from_stdout().await.unwrap().unwrap(); + assert_eq!(res["origin_id"], id); + assert_eq!( + res["payload"], + json!({ + "type": "search_results", + "id": search_id, + "matches": [ + { + "type": "contents", + "path": root.child("file2.txt").to_string_lossy(), + "lines": { + "type": "text", + "value": "textual\n", + }, + "line_number": 3, + "absolute_offset": 9, + "submatches": [ + { + "match": { + "type": "text", + "value": "ua", + }, + "start": 4, + "end": 6, + } + ], + }, + ] + }) + ); + + // Get search completion confirmation + let res = json_repl.read_json_from_stdout().await.unwrap().unwrap(); + assert_eq!(res["origin_id"], id); + assert_eq!( + res["payload"], + json!({ + "type": "search_done", + "id": search_id, + }) + ); +}