Refactor state searcher, fix not terminating when search is done, refactor formatter to support stateful printing

pull/131/head
Chip Senkbeil 2 years ago
parent d3838a0e22
commit 452e1e3e53
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,7 +1,7 @@
use crate::data::{
DistantResponseData, SearchId, SearchQuery, SearchQueryCondition, SearchQueryContentsMatch,
SearchQueryMatch, SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch,
SearchQuerySubmatch, SearchQueryTarget,
DistantResponseData, SearchId, SearchQuery, SearchQueryContentsMatch, SearchQueryMatch,
SearchQueryMatchData, SearchQueryOptions, SearchQueryPathMatch, SearchQuerySubmatch,
SearchQueryTarget,
};
use distant_net::Reply;
use grep::{
@ -15,7 +15,7 @@ use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
use walkdir::WalkDir;
use walkdir::{DirEntry, WalkDir};
/// Holds information related to active searches on the server
pub struct SearchState {
@ -123,164 +123,37 @@ async fn search_task(tx: mpsc::Sender<InnerSearchMsg>, mut rx: mpsc::Receiver<In
while let Some(msg) = rx.recv().await {
match msg {
InnerSearchMsg::Start { query, reply, cb } => {
let id = rand::random::<SearchId>();
let options = query.options.clone();
// Build our executor and send an error if it fails
let mut executor = match SearchQueryExecutor::new(query) {
Ok(executor) => executor,
Err(x) => {
let _ = cb.send(Err(x));
return;
}
};
// Attach a callback for when the process is finished where
// we will remove it from our above list
let tx = tx.clone();
// Get the unique search id
let id = executor.id();
// Create a cancel channel to support interrupting and stopping the search
let (cancel_tx, mut cancel_rx) = oneshot::channel();
// Queue up our search internally with a cancel sender
searches.insert(id, executor.take_cancel_tx().unwrap());
// Queue up our search internally and report back the id
searches.insert(id, cancel_tx);
// Report back the search id
let _ = cb.send(Ok(id));
let SearchQuery {
path,
target,
condition,
// Spawn our reporter of matches coming from the executor
SearchQueryReporter {
id,
options,
} = query;
// Create a blocking task that will search through all files within the
// query path and look for matches
tokio::task::spawn_blocking(move || {
let SearchQueryOptions {
limit,
pagination,
allowed_file_types,
follow_symbolic_links,
} = options;
// Define our walking setup
let walk_dir = WalkDir::new(path).follow_links(follow_symbolic_links);
// Define our cache of matches
let mut matches = Vec::new();
let (done_tx, mut done_rx) = mpsc::channel(1);
// Pushes a match, clearing and sending matches if we reach pagination,
// and returning true if should continue or false if limit reached
let mut push_match = |m: SearchQueryMatch| -> io::Result<bool> {
matches.push(m);
let should_continue = match limit.as_ref() {
Some(cnt) if *cnt == matches.len() as u64 => {
trace!("[Query {id}] Reached limit of {cnt} matches, so stopping search");
let _ = done_tx.send(());
false
}
_ => true,
};
if let Some(len) = pagination {
if matches.len() as u64 >= len {
trace!(
"[Query {id}] Reached pagination capacity of {len} matches, so forwarding search results to client"
);
let _ = reply.blocking_send(DistantResponseData::SearchResults {
id,
matches: std::mem::take(&mut matches),
});
}
}
Ok(should_continue)
};
// Define our search pattern
let pattern = match condition {
SearchQueryCondition::EndsWith { value } => format!(r"{value}$"),
SearchQueryCondition::Equals { value } => format!(r"^{value}$"),
SearchQueryCondition::Regex { value } => value,
SearchQueryCondition::StartsWith { value } => format!(r"^{value}"),
};
// Define our matcher using regex as the condition and execute the search
match RegexMatcher::new(&pattern) {
Ok(matcher) => {
// Search all entries for matches and report them
for entry in walk_dir.into_iter().filter_map(|e| e.ok()) {
// Check if we are being interrupted, and if so exit our loop early
match cancel_rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => (),
_ => break,
}
// Check if our limit has been reached
match done_rx.try_recv() {
Err(mpsc::error::TryRecvError::Empty) => (),
_ => break,
}
// Skip if provided explicit file types to search
if !allowed_file_types.is_empty()
&& !allowed_file_types.contains(&entry.file_type().into())
{
continue;
}
let res = match target {
// Perform the search against the path itself
SearchQueryTarget::Path => {
let path_str = entry.path().to_string_lossy();
Searcher::new().search_slice(
&matcher,
path_str.as_bytes(),
SearchQueryPathSink {
search_id: id,
path: entry.path(),
matcher: &matcher,
callback: &mut push_match,
},
)
}
// Skip if trying to search contents of non-file
SearchQueryTarget::Contents if !entry.file_type().is_file() => {
continue
}
// Perform the search against the file's contents
SearchQueryTarget::Contents => Searcher::new().search_path(
&matcher,
entry.path(),
SearchQueryContentsSink {
search_id: id,
path: entry.path(),
matcher: &matcher,
callback: &mut push_match,
},
),
};
if let Err(x) = res {
error!(
"[Query {id}] Search failed for {:?}: {x}",
entry.path()
);
}
}
}
Err(x) => {
error!("[Query {id}] Failed to define regex matcher: {x}");
}
}
// Send any remaining matches
if !matches.is_empty() {
trace!("[Query {id}] Sending final {} matches", matches.len());
let _ =
reply.blocking_send(DistantResponseData::SearchResults { id, matches });
}
// Send back our search completion event
let _ = reply.blocking_send(DistantResponseData::SearchDone { id });
rx: executor.take_match_rx().unwrap(),
reply,
}
.spawn();
// Once complete, we need to send a request to remove the search from our list
let _ = tx.blocking_send(InnerSearchMsg::InternalRemove { id });
});
// Spawn our executor to run
executor.spawn(tx.clone());
}
InnerSearchMsg::Cancel { id, cb } => {
let _ = cb.send(match searches.remove(&id) {
@ -295,12 +168,290 @@ async fn search_task(tx: mpsc::Sender<InnerSearchMsg>, mut rx: mpsc::Receiver<In
});
}
InnerSearchMsg::InternalRemove { id } => {
trace!("[Query {id}] Removing internal tracking");
searches.remove(&id);
}
}
}
}
/// Collects matches produced by a search executor and forwards them to the
/// client, honoring the query's limit and pagination options
struct SearchQueryReporter {
    /// Unique id of the search this reporter belongs to
    id: SearchId,
    /// Options controlling limit and pagination behavior
    options: SearchQueryOptions,
    /// Receives matches from the executor as they are found
    rx: mpsc::UnboundedReceiver<SearchQueryMatch>,
    /// Channel used to send results and the done notification back to the client
    reply: Box<dyn Reply<Data = DistantResponseData>>,
}
impl SearchQueryReporter {
    /// Runs the reporter to completion in an async task
    pub fn spawn(self) {
        tokio::spawn(self.run());
    }

    /// Drains the match channel until the executor drops its sender or the
    /// configured limit is reached, batching outgoing results per the
    /// pagination option, and finishes by reporting `SearchDone`
    async fn run(self) {
        let Self {
            id,
            options,
            mut rx,
            reply,
        } = self;

        // Queue of matches that we hold until reaching pagination
        let mut matches = Vec::new();
        let mut total_matches_cnt = 0;

        trace!("[Query {id}] Starting reporter with {options:?}");
        while let Some(m) = rx.recv().await {
            matches.push(m);
            total_matches_cnt += 1;

            // Check if we've reached the limit, and quit if we have
            // (the match that hit the limit is still queued and gets flushed
            // below before the done notification)
            if let Some(len) = options.limit {
                if total_matches_cnt >= len {
                    trace!("[Query {id}] Reached limit of {len} matches");
                    break;
                }
            }

            // Check if we've reached pagination size, and send queued if so
            if let Some(len) = options.pagination {
                if matches.len() as u64 >= len {
                    trace!("[Query {id}] Reached {len} paginated matches");
                    if let Err(x) = reply
                        .send(DistantResponseData::SearchResults {
                            id,
                            // take() clears the queue while reusing the binding
                            matches: std::mem::take(&mut matches),
                        })
                        .await
                    {
                        error!("[Query {id}] Failed to send paginated matches: {x}");
                    }
                }
            }
        }

        // Send any remaining matches
        if !matches.is_empty() {
            trace!("[Query {id}] Sending {} remaining matches", matches.len());
            if let Err(x) = reply
                .send(DistantResponseData::SearchResults { id, matches })
                .await
            {
                error!("[Query {id}] Failed to send final matches: {x}");
            }
        }

        // Report that we are done
        trace!("[Query {id}] Reporting as done");
        if let Err(x) = reply.send(DistantResponseData::SearchDone { id }).await {
            error!("[Query {id}] Failed to send done status: {x}");
        }
    }
}
/// Performs the filesystem walk and match detection for a single search
/// query, emitting matches over an unbounded channel
struct SearchQueryExecutor {
    /// Unique id assigned to this search
    id: SearchId,
    /// The query being executed
    query: SearchQuery,
    /// Pre-configured directory walker rooted at the query path
    walk_dir: WalkDir,
    /// Compiled regex matcher built from the query condition
    matcher: RegexMatcher,
    /// Sender half used to cancel the search; taken by the owner via `take_cancel_tx`
    cancel_tx: Option<oneshot::Sender<()>>,
    /// Receiver polled during the walk to detect cancellation
    cancel_rx: oneshot::Receiver<()>,
    /// Sender over which matches are emitted during the walk
    match_tx: mpsc::UnboundedSender<SearchQueryMatch>,
    /// Receiver half for matches; taken by the reporter via `take_match_rx`
    match_rx: Option<mpsc::UnboundedReceiver<SearchQueryMatch>>,
}
impl SearchQueryExecutor {
    /// Creates a new executor
    ///
    /// Fails with `InvalidInput` if the query's condition cannot be compiled
    /// into a valid regex
    pub fn new(query: SearchQuery) -> io::Result<Self> {
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let (match_tx, match_rx) = mpsc::unbounded_channel();

        let path = query.path.as_path();
        let follow_links = query.options.follow_symbolic_links;
        let regex = query.condition.clone().into_regex_string();

        let matcher = RegexMatcher::new(&regex)
            .map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?;
        let walk_dir = WalkDir::new(path).follow_links(follow_links);

        Ok(Self {
            // Randomly generate a unique id for this search
            id: rand::random(),
            query,
            matcher,
            walk_dir,
            cancel_tx: Some(cancel_tx),
            cancel_rx,
            match_tx,
            match_rx: Some(match_rx),
        })
    }

    /// Returns the unique id of this search
    pub fn id(&self) -> SearchId {
        self.id
    }

    /// Takes the sender used to cancel this search; yields `Some` only once
    pub fn take_cancel_tx(&mut self) -> Option<oneshot::Sender<()>> {
        self.cancel_tx.take()
    }

    /// Takes the receiver of matches found by this search; yields `Some` only once
    pub fn take_match_rx(&mut self) -> Option<mpsc::UnboundedReceiver<SearchQueryMatch>> {
        self.match_rx.take()
    }

    /// Runs the executor to completion in another thread
    pub fn spawn(self, tx: mpsc::Sender<InnerSearchMsg>) {
        // The walk is synchronous filesystem work, so it runs on the
        // blocking thread pool rather than an async task
        tokio::task::spawn_blocking(move || {
            let id = self.id;
            self.run();

            // Once complete, we need to send a request to remove the search from our list
            let _ = tx.blocking_send(InnerSearchMsg::InternalRemove { id });
        });
    }

    /// Walks the filesystem, sending each match over the match channel;
    /// exits early if a cancellation signal is received or the path filter
    /// cannot be constructed
    fn run(self) {
        let id = self.id;
        let walk_dir = self.walk_dir;
        let tx = self.match_tx;
        let mut cancel = self.cancel_rx;

        // Create our path filter we will use to filter entries
        let path_filter = match self.query.options.path_regex.as_deref() {
            Some(regex) => match SearchQueryPathFilter::new(regex) {
                Ok(filter) => {
                    trace!("[Query {id}] Using regex path filter for {regex:?}");
                    filter
                }
                Err(x) => {
                    error!("[Query {id}] Failed to instantiate path filter: {x}");
                    return;
                }
            },
            None => {
                trace!("[Query {id}] Using noop path filter");
                SearchQueryPathFilter::noop()
            }
        };

        let options_filter = SearchQueryOptionsFilter {
            target: self.query.target,
            options: self.query.options.clone(),
        };

        // Search all entries for matches and report them
        //
        // NOTE(review): filter_entry is expected to also prune recursion into
        // directories whose paths fail the regex filter (per walkdir docs) —
        // confirm that is the intended behavior for path_regex
        for entry in walk_dir
            .into_iter()
            .filter_entry(|e| path_filter.filter(e.path()))
            .filter_map(|e| e.ok())
            .filter(|e| options_filter.filter(e))
        {
            // Check if we are being interrupted, and if so exit our loop early
            match cancel.try_recv() {
                Err(oneshot::error::TryRecvError::Empty) => (),
                _ => {
                    debug!("[Query {id}] Cancelled");
                    break;
                }
            }

            let res = match self.query.target {
                // Perform the search against the path itself
                SearchQueryTarget::Path => {
                    let path_str = entry.path().to_string_lossy();
                    Searcher::new().search_slice(
                        &self.matcher,
                        path_str.as_bytes(),
                        SearchQueryPathSink {
                            search_id: id,
                            path: entry.path(),
                            matcher: &self.matcher,
                            // Stop searching this entry once the receiving side has hung up
                            callback: |m| Ok(tx.send(m).is_ok()),
                        },
                    )
                }

                // Perform the search against the file's contents
                SearchQueryTarget::Contents => Searcher::new().search_path(
                    &self.matcher,
                    entry.path(),
                    SearchQueryContentsSink {
                        search_id: id,
                        path: entry.path(),
                        matcher: &self.matcher,
                        callback: |m| Ok(tx.send(m).is_ok()),
                    },
                ),
            };

            if let Err(x) = res {
                error!("[Query {id}] Search failed for {:?}: {x}", entry.path());
            }
        }
    }
}
/// Filters walked paths against an optional regex; `None` means no filtering
struct SearchQueryPathFilter {
    /// Compiled regex, or `None` for the noop (always-pass) filter
    matcher: Option<RegexMatcher>,
}
impl SearchQueryPathFilter {
    /// Builds a filter from the given regex, failing with `InvalidInput` if
    /// the regex does not compile
    pub fn new(regex: &str) -> io::Result<Self> {
        let matcher = RegexMatcher::new(regex)
            .map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?;

        Ok(Self {
            matcher: Some(matcher),
        })
    }

    /// Returns a filter that always passes the path
    pub fn noop() -> Self {
        Self { matcher: None }
    }

    /// Returns true if path passes the filter
    pub fn filter(&self, path: impl AsRef<Path>) -> bool {
        self.try_filter(path).unwrap_or(false)
    }

    /// Applies the regex to the lossy string form of the path, passing
    /// everything when no regex was configured
    fn try_filter(&self, path: impl AsRef<Path>) -> io::Result<bool> {
        let matcher = match self.matcher.as_ref() {
            Some(matcher) => matcher,
            None => return Ok(true),
        };

        matcher
            .is_match(path.as_ref().to_string_lossy().as_bytes())
            .map_err(|x| io::Error::new(io::ErrorKind::Other, x))
    }
}
/// Filters walked entries using query options (allowed file types) and the
/// search target (content searches only apply to regular files)
struct SearchQueryOptionsFilter {
    /// Whether the search targets paths or file contents
    target: SearchQueryTarget,
    /// Options whose `allowed_file_types` restricts eligible entries
    options: SearchQueryOptions,
}
impl SearchQueryOptionsFilter {
    /// Returns true if the entry should be searched, given the configured
    /// file type restrictions and the search target
    pub fn filter(&self, entry: &DirEntry) -> bool {
        let allowed = &self.options.allowed_file_types;

        // An empty set means every file type is allowed; otherwise the
        // entry's type must appear in the set
        if !allowed.is_empty() && !allowed.contains(&entry.file_type().into()) {
            return false;
        }

        // Searching contents only makes sense for regular files
        match self.target {
            SearchQueryTarget::Contents => entry.file_type().is_file(),
            _ => true,
        }
    }
}
#[derive(Clone, Debug)]
struct SearchQueryPathSink<'a, M, F>
where
@ -426,3 +577,38 @@ where
Ok(should_continue)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: Every test below is an unimplemented placeholder (todo!())
    // outlining the intended coverage of the search state machinery:
    // start/done events, default vs paginated delivery, the limit option,
    // and the path_regex filter.

    #[test]
    fn should_send_event_when_starting_query() {
        todo!();
    }

    #[test]
    fn should_send_event_when_query_finished() {
        todo!();
    }

    #[test]
    fn should_send_all_matches_at_once_by_default() {
        todo!();
    }

    #[test]
    fn should_send_paginated_results_if_specified() {
        todo!();
    }

    #[test]
    fn should_send_maximum_of_limit_results_if_specified() {
        todo!();
    }

    #[test]
    fn should_limit_searched_paths_using_regex_filter_if_specified() {
        todo!();
    }
}

@ -49,13 +49,21 @@ impl Searcher {
while let Some(res) = mailbox.next().await {
for data in res.payload.into_vec() {
match data {
// If we get results before the started indicator, queue them up
DistantResponseData::SearchResults { matches, .. } => {
queue.extend(matches);
}
// Once we get the started indicator, mark as ready to go
DistantResponseData::SearchStarted { id } => {
trace!("[Query {id}] Searcher has started");
search_id = Some(id);
}
// If we get an explicit error, convert and return it
DistantResponseData::Error(x) => return Err(io::Error::from(x)),
// Otherwise, we got something unexpected, and report as such
x => {
return Err(io::Error::new(
io::ErrorKind::Other,
@ -81,7 +89,7 @@ impl Searcher {
if tx.send(r#match).await.is_err() {
return Err(io::Error::new(
io::ErrorKind::Other,
"Queue search match dropped",
format!("[Query {id}] Queue search match dropped"),
));
}
}
@ -90,7 +98,12 @@ impl Searcher {
// If we never received an acknowledgement of search before the mailbox closed,
// fail with a missing confirmation error
None => return Err(io::Error::new(io::ErrorKind::Other, "Missing confirmation")),
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
"Search query missing started confirmation",
))
}
};
// Spawn a task that continues to look for search result events and the conclusion of the
@ -98,6 +111,8 @@ impl Searcher {
let task = tokio::spawn({
async move {
while let Some(res) = mailbox.next().await {
let mut done = false;
for data in res.payload.into_vec() {
match data {
DistantResponseData::SearchResults { matches, .. } => {
@ -121,12 +136,18 @@ impl Searcher {
// Received completion indicator, so close out
DistantResponseData::SearchDone { .. } => {
trace!("[Query {search_id}] Searcher has finished");
done = true;
break;
}
_ => continue,
}
}
if done {
break;
}
}
}
});

@ -40,7 +40,7 @@ impl FromStr for SearchQuery {
}
/// Kind of data to examine using conditions
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum SearchQueryTarget {
@ -76,6 +76,18 @@ pub enum SearchQueryCondition {
StartsWith { value: String },
}
impl SearchQueryCondition {
    /// Converts the condition in a regex string
    ///
    /// NOTE(review): `value` is interpolated into the pattern without regex
    /// escaping, so metacharacters in an `EndsWith`/`Equals`/`StartsWith`
    /// value (e.g. `.`, `*`, `$`) are interpreted as regex syntax — confirm
    /// this is intended rather than a literal-match contract
    pub fn into_regex_string(self) -> String {
        match self {
            Self::EndsWith { value } => format!(r"{value}$"),
            Self::Equals { value } => format!(r"^{value}$"),
            Self::Regex { value } => value,
            Self::StartsWith { value } => format!(r"^{value}"),
        }
    }
}
#[cfg(feature = "schemars")]
impl SearchQueryCondition {
pub fn root_schema() -> schemars::schema::RootSchema {
@ -91,6 +103,11 @@ pub struct SearchQueryOptions {
#[serde(default)]
pub allowed_file_types: HashSet<FileType>,
/// Regex to use to filter paths being searched; will prevent recursing into directories that
/// fail the regex
#[serde(default)]
pub path_regex: Option<String>,
/// Search should follow symbolic links
#[serde(default)]
pub follow_symbolic_links: bool,

@ -279,7 +279,7 @@ impl ClientSubcommand {
}
);
let formatter = Formatter::shell();
let mut formatter = Formatter::shell();
debug!("Sending request {:?}", request);
match request {

@ -10,6 +10,7 @@ use log::*;
use std::{
collections::HashMap,
io::{self, Write},
path::PathBuf,
};
use tabled::{object::Rows, style::Style, Alignment, Disable, Modify, Table, Tabled};
@ -37,14 +38,24 @@ impl Default for Format {
}
}
/// Mutable state a [`Formatter`] carries across successive print calls
#[derive(Default)]
struct FormatterState {
    /// Last seen path during search
    pub last_searched_path: Option<PathBuf>,
}
/// Formats responses for output, retaining state between prints so that
/// streamed results (e.g. search matches) can be grouped coherently
pub struct Formatter {
    /// Output format to use when printing responses
    format: Format,
    /// State retained across print calls (e.g. last path seen during search)
    state: FormatterState,
}
impl Formatter {
/// Create a new output message for the given response based on the specified format
pub fn new(format: Format) -> Self {
Self { format }
Self {
format,
state: Default::default(),
}
}
/// Creates a new [`Formatter`] using [`Format`] of `Format::Shell`
@ -53,7 +64,7 @@ impl Formatter {
}
/// Consumes the output message, printing it based on its configuration
pub fn print(&self, res: Response<DistantMsg<DistantResponseData>>) -> io::Result<()> {
pub fn print(&mut self, res: Response<DistantMsg<DistantResponseData>>) -> io::Result<()> {
let output = match self.format {
Format::Json => Output::StdoutLine(
serde_json::to_vec(&res)
@ -67,7 +78,7 @@ impl Formatter {
"Shell does not support batch responses",
))
}
Format::Shell => format_shell(res.payload.into_single().unwrap()),
Format::Shell => format_shell(&mut self.state, res.payload.into_single().unwrap()),
};
match output {
@ -133,7 +144,7 @@ enum Output {
None,
}
fn format_shell(data: DistantResponseData) -> Output {
fn format_shell(state: &mut FormatterState, data: DistantResponseData) -> Output {
match data {
DistantResponseData::Ok => Output::None,
DistantResponseData::Error(Error { description, .. }) => {
@ -311,7 +322,10 @@ fn format_shell(data: DistantResponseData) -> Output {
}) => {
let file_matches = files.entry(path).or_default();
file_matches.push(format!("{line_number}:{}", lines.to_string_lossy()));
file_matches.push(format!(
"{line_number}:{}",
lines.to_string_lossy().trim_end()
));
}
}
}
@ -319,11 +333,24 @@ fn format_shell(data: DistantResponseData) -> Output {
let mut output = String::new();
for (path, lines) in files {
use std::fmt::Write;
writeln!(&mut output, "{}", path.to_string_lossy()).unwrap();
// If we are seening a new path, print it out
if state.last_searched_path.as_deref() != Some(path.as_path()) {
// If we have already seen some path before, we would have printed it, and
// we want to add a space between it and the current path
if state.last_searched_path.is_some() {
writeln!(&mut output).unwrap();
}
writeln!(&mut output, "{}", path.to_string_lossy()).unwrap();
}
for line in lines {
writeln!(&mut output, "{line}").unwrap();
}
writeln!(&mut output).unwrap();
// Update our last seen path
state.last_searched_path = Some(path);
}
if !output.is_empty() {

Loading…
Cancel
Save