Refactor to use ignore::WalkParallel

pull/136/head
Chip Senkbeil 2 years ago
parent 32c9775632
commit 84fb2beec5
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -9,11 +9,11 @@ use grep::{
regex::RegexMatcher,
searcher::{Searcher, Sink, SinkMatch},
};
use ignore::{DirEntry, WalkBuilder, WalkParallel};
use ignore::{DirEntry, ParallelVisitor, ParallelVisitorBuilder, WalkBuilder, WalkParallel};
use log::*;
use std::{collections::HashMap, io, ops::Deref, path::Path};
use tokio::{
sync::{mpsc, oneshot},
sync::{broadcast, mpsc, oneshot},
task::JoinHandle,
};
@ -122,7 +122,7 @@ enum InnerSearchMsg {
}
async fn search_task(tx: mpsc::Sender<InnerSearchMsg>, mut rx: mpsc::Receiver<InnerSearchMsg>) {
let mut searches: HashMap<SearchId, oneshot::Sender<()>> = HashMap::new();
let mut searches: HashMap<SearchId, broadcast::Sender<()>> = HashMap::new();
while let Some(msg) = rx.recv().await {
match msg {
@ -259,8 +259,8 @@ struct SearchQueryExecutor {
walker: WalkParallel,
matcher: RegexMatcher,
cancel_tx: Option<oneshot::Sender<()>>,
cancel_rx: oneshot::Receiver<()>,
cancel_tx: Option<broadcast::Sender<()>>,
cancel_rx: broadcast::Receiver<()>,
match_tx: mpsc::UnboundedSender<SearchQueryMatch>,
match_rx: Option<mpsc::UnboundedReceiver<SearchQueryMatch>>,
@ -269,46 +269,39 @@ struct SearchQueryExecutor {
impl SearchQueryExecutor {
/// Creates a new executor
pub fn new(query: SearchQuery) -> io::Result<Self> {
let (cancel_tx, cancel_rx) = oneshot::channel();
let (cancel_tx, cancel_rx) = broadcast::channel(1);
let (match_tx, match_rx) = mpsc::unbounded_channel();
let regex = query.condition.to_regex_string();
let matcher = RegexMatcher::new(&regex)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidInput, x))?;
let mut walker_builder: Option<WalkBuilder> = None;
for path in query.paths.iter() {
let path = path.as_path();
let follow_links = query.options.follow_symbolic_links;
walker_builder = match walker_builder {
Some(builder) => {
builder.add(path);
Some(builder)
}
None => {
let mut builder = WalkBuilder::new(path);
builder.hidden(false).follow_links(follow_links);
Some(builder)
}
};
if query.paths.is_empty() {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing paths"));
}
if let Some(depth) = query.options.max_depth.as_ref().copied() {
walker_builder
.as_mut()
.unwrap()
.max_depth(Some(depth as usize));
}
let mut walker_builder = WalkBuilder::new(&query.paths[0]);
for path in &query.paths[1..] {
walker_builder.add(path);
}
walker_builder
.skip_stdout(true)
.follow_links(query.options.follow_symbolic_links)
.max_depth(
query
.options
.max_depth
.as_ref()
.copied()
.map(|d| d as usize),
);
Ok(Self {
id: rand::random(),
query,
matcher,
walker: walker_builder
.map(|builder| builder.build_parallel())
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing paths"))?,
walker: walker_builder.build_parallel(),
cancel_tx: Some(cancel_tx),
cancel_rx,
@ -321,7 +314,7 @@ impl SearchQueryExecutor {
self.id
}
pub fn take_cancel_tx(&mut self) -> Option<oneshot::Sender<()>> {
pub fn take_cancel_tx(&mut self) -> Option<broadcast::Sender<()>> {
self.cancel_tx.take()
}
@ -344,7 +337,8 @@ impl SearchQueryExecutor {
let id = self.id;
let walker = self.walker;
let tx = self.match_tx;
let mut cancel = self.cancel_rx;
let cancel = self.cancel_rx;
let matcher = self.matcher;
// Create our path filter we will use to filter out entries that do not match filter
let include_path_filter = match self.query.options.include.as_ref() {
@ -387,72 +381,127 @@ impl SearchQueryExecutor {
options: self.query.options.clone(),
};
let mut builder = SearchQueryExecutorParallelVistorBuilder {
search_id: self.id,
target: self.query.target,
cancel,
tx,
matcher: &matcher,
include_path_filter: &include_path_filter,
exclude_path_filter: &exclude_path_filter,
options_filter: &options_filter,
};
// Search all entries for matches and report them
walker.run(|| {
let tx = tx.clone();
Box::new(move |result| {
use ignore::WalkState;
// Get the entry, skipping errors with directories, and continuing on
// errors with non-directories
let entry = match result {
Ok(entry) => entry,
Err(_) => return WalkState::Skip,
};
//
// NOTE: This should block waiting for all threads to complete
walker.visit(&mut builder);
}
}
// Validate the path of the entry should be processed
if !include_path_filter.filter(entry.path())
|| exclude_path_filter.filter(entry.path())
|| !options_filter.filter(&entry)
{
return WalkState::Skip;
}
/// Factory for the visitors used by the parallel directory walk.
///
/// Each call to `build` yields a new `SearchQueryExecutorParallelVistor` that receives
/// copies of the identifiers, its own handles to the cancellation and match channels, and
/// the same borrowed matcher and filters held here for the lifetime `'a`.
struct SearchQueryExecutorParallelVistorBuilder<'a> {
/// Id of the search; copied into every visitor that is built
search_id: SearchId,
/// What the query is matched against; copied into every visitor that is built
target: SearchQueryTarget,
/// Cancellation receiver; each built visitor gets its own receiver via `resubscribe()`
cancel: broadcast::Receiver<()>,
/// Channel on which matches are reported; cloned for every visitor that is built
tx: mpsc::UnboundedSender<SearchQueryMatch>,
/// Matcher shared by reference with every visitor
matcher: &'a RegexMatcher,
/// Include filter shared by reference with every visitor
include_path_filter: &'a SearchQueryPathFilter,
/// Exclude filter shared by reference with every visitor
exclude_path_filter: &'a SearchQueryPathFilter,
/// Options-based entry filter shared by reference with every visitor
options_filter: &'a SearchQueryOptionsFilter,
}
// Check if we are being interrupted, and if so exit our loop early
match cancel.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => (),
_ => {
debug!("[Query {id}] Cancelled");
return WalkState::Quit;
}
}
impl<'a> ParallelVisitorBuilder<'a> for SearchQueryExecutorParallelVistorBuilder<'a> {
    /// Creates a new boxed visitor for the parallel walk.
    ///
    /// The visitor receives a copy of the search id and target, a freshly resubscribed
    /// cancellation receiver, a clone of the match sender, and the same shared references
    /// to the matcher and filters that this builder holds.
    fn build(&mut self) -> Box<dyn ParallelVisitor + 'a> {
        // Handles that must be distinct per visitor
        let cancel = self.cancel.resubscribe();
        let tx = self.tx.clone();

        // Everything else is either `Copy` or a shared reference valid for `'a`
        let visitor = SearchQueryExecutorParallelVistor {
            search_id: self.search_id,
            target: self.target,
            cancel,
            tx,
            matcher: self.matcher,
            include_path_filter: self.include_path_filter,
            exclude_path_filter: self.exclude_path_filter,
            options_filter: self.options_filter,
        };

        Box::new(visitor)
    }
}
let res = match self.query.target {
// Perform the search against the path itself
SearchQueryTarget::Path => {
let path_str = entry.path().to_string_lossy();
Searcher::new().search_slice(
&self.matcher,
path_str.as_bytes(),
SearchQueryPathSink {
search_id: id,
path: entry.path(),
matcher: &self.matcher,
callback: |m| Ok(tx.send(m).is_ok()),
},
)
}
/// Visitor that evaluates a single walk entry at a time for one search.
///
/// Its `visit` implementation filters the entry, checks for cancellation, runs the
/// matcher against either the entry's path or its contents (depending on `target`), and
/// forwards matches over `tx`.
struct SearchQueryExecutorParallelVistor<'a> {
/// Id of the search; used in log messages and attached to the sinks that produce matches
search_id: SearchId,
/// Selects whether the entry's path or the file's contents are searched
target: SearchQueryTarget,
/// Polled with `try_recv()` on each visited entry that passes the filters
cancel: broadcast::Receiver<()>,
/// Channel on which matches are sent by the sink callbacks
tx: mpsc::UnboundedSender<SearchQueryMatch>,
/// Matcher handed to the searcher and to the sinks
matcher: &'a RegexMatcher,
/// Entries whose path is rejected by this filter are not searched
include_path_filter: &'a SearchQueryPathFilter,
/// Entries whose path is accepted by this filter are not searched
exclude_path_filter: &'a SearchQueryPathFilter,
/// Entries rejected by this filter are not searched
options_filter: &'a SearchQueryOptionsFilter,
}
// Perform the search against the file's contents
SearchQueryTarget::Contents => Searcher::new().search_path(
&self.matcher,
entry.path(),
SearchQueryContentsSink {
search_id: id,
path: entry.path(),
matcher: &self.matcher,
callback: |m| Ok(tx.send(m).is_ok()),
},
),
};
impl<'a> ParallelVisitor for SearchQueryExecutorParallelVistor<'a> {
/// Handles one entry produced by the walk.
///
/// Flow, in order:
/// 1. An entry that is an `Err` returns `WalkState::Skip`.
/// 2. An entry rejected by the include/exclude/options filters returns
///    `WalkState::Continue` without being searched.
/// 3. If polling `cancel` yields anything other than `Empty`, returns `WalkState::Quit`.
/// 4. Otherwise the path string or the file contents are searched according to
///    `self.target`; matches are sent over `self.tx`, a search error is logged, and
///    `WalkState::Continue` is returned.
fn visit(&mut self, entry: Result<DirEntry, ignore::Error>) -> ignore::WalkState {
use ignore::WalkState;
// Copied into a local so it can be used in log messages and sinks below
let id = self.search_id;
if let Err(x) = res {
error!("[Query {id}] Search failed for {:?}: {x}", entry.path());
}
// Get the entry, skipping errors with directories, and continuing on
// errors with non-directories
let entry = match entry {
Ok(entry) => entry,
Err(_) => return WalkState::Skip,
};
WalkState::Continue
})
})
// Validate the path of the entry should be processed
//
// NOTE: We do not SKIP here as we cannot cancel a directory traversal early as this can
// cause us to miss relevant submatches deeper in the traversal
if !self.include_path_filter.filter(entry.path())
|| self.exclude_path_filter.filter(entry.path())
|| !self.options_filter.filter(&entry)
{
return WalkState::Continue;
}
// Check if we are being interrupted, and if so exit our loop early
//
// NOTE(review): entries rejected by the filters return above before reaching this
// check, so cancellation is only observed on entries that pass the filters
match self.cancel.try_recv() {
// Nothing pending on the cancellation channel: keep going
Err(broadcast::error::TryRecvError::Empty) => (),
// A received value or any other receive error ends the walk
_ => {
debug!("[Query {id}] Cancelled");
return WalkState::Quit;
}
}
let res = match self.target {
// Perform the search against the path itself
SearchQueryTarget::Path => {
// Lossy conversion: the path is matched as text via its byte representation
let path_str = entry.path().to_string_lossy();
Searcher::new().search_slice(
self.matcher,
path_str.as_bytes(),
SearchQueryPathSink {
search_id: id,
path: entry.path(),
matcher: self.matcher,
// Forward the match; the callback result is whether the send succeeded
callback: |m| Ok(self.tx.send(m).is_ok()),
},
)
}
// Perform the search against the file's contents
SearchQueryTarget::Contents => Searcher::new().search_path(
self.matcher,
entry.path(),
SearchQueryContentsSink {
search_id: id,
path: entry.path(),
matcher: self.matcher,
// Forward the match; the callback result is whether the send succeeded
callback: |m| Ok(self.tx.send(m).is_ok()),
},
),
};
// A failed search is logged only; the walk carries on with the next entry
if let Err(x) = res {
error!("[Query {id}] Search failed for {:?}: {x}", entry.path());
}
WalkState::Continue
}
}
@ -1208,116 +1257,6 @@ mod tests {
assert_eq!(rx.recv().await, None);
}
#[tokio::test]
async fn should_traverse_starting_from_min_depth_if_specified() {
let root = setup_dir(vec![
("path/to/file1.txt", ""),
("path/to/file2.txt", ""),
("other/file.txt", ""),
("other/dir/bin", ""),
]);
async fn test_min_depth(
root: &assert_fs::TempDir,
depth: u64,
expected_paths: Vec<PathBuf>,
) {
let state = SearchState::new();
let (reply, mut rx) = mpsc::channel(100);
let query = SearchQuery {
paths: vec![root.path().to_path_buf()],
target: SearchQueryTarget::Path,
condition: SearchQueryCondition::regex(".*"),
options: SearchQueryOptions {
min_depth: Some(depth),
..Default::default()
},
};
let search_id = state.start(query, Box::new(reply)).await.unwrap();
let mut paths = get_matches(rx.recv().await.unwrap())
.into_iter()
.filter_map(|m| m.into_path_match())
.map(|m| m.path)
.collect::<Vec<_>>();
paths.sort_unstable();
assert_eq!(paths, expected_paths);
let data = rx.recv().await;
assert_eq!(
data,
Some(DistantResponseData::SearchDone { id: search_id })
);
assert_eq!(rx.recv().await, None);
}
// Minimum depth of 0 should include root search path
test_min_depth(
&root,
0,
vec![
root.to_path_buf(),
root.child(make_path("other")).to_path_buf(),
root.child(make_path("other/dir")).to_path_buf(),
root.child(make_path("other/dir/bin")).to_path_buf(),
root.child(make_path("other/file.txt")).to_path_buf(),
root.child(make_path("path")).to_path_buf(),
root.child(make_path("path/to")).to_path_buf(),
root.child(make_path("path/to/file1.txt")).to_path_buf(),
root.child(make_path("path/to/file2.txt")).to_path_buf(),
],
)
.await;
// Minimum depth of 1 should not include root search path
test_min_depth(
&root,
1,
vec![
root.child(make_path("other")).to_path_buf(),
root.child(make_path("other/dir")).to_path_buf(),
root.child(make_path("other/dir/bin")).to_path_buf(),
root.child(make_path("other/file.txt")).to_path_buf(),
root.child(make_path("path")).to_path_buf(),
root.child(make_path("path/to")).to_path_buf(),
root.child(make_path("path/to/file1.txt")).to_path_buf(),
root.child(make_path("path/to/file2.txt")).to_path_buf(),
],
)
.await;
// Minimum depth of 2 should not include root or children
test_min_depth(
&root,
2,
vec![
root.child(make_path("other/dir")).to_path_buf(),
root.child(make_path("other/dir/bin")).to_path_buf(),
root.child(make_path("other/file.txt")).to_path_buf(),
root.child(make_path("path/to")).to_path_buf(),
root.child(make_path("path/to/file1.txt")).to_path_buf(),
root.child(make_path("path/to/file2.txt")).to_path_buf(),
],
)
.await;
// Minimum depth of 3 should not include root or children or grandchildren
test_min_depth(
&root,
3,
vec![
root.child(make_path("other/dir/bin")).to_path_buf(),
root.child(make_path("path/to/file1.txt")).to_path_buf(),
root.child(make_path("path/to/file2.txt")).to_path_buf(),
],
)
.await;
}
#[tokio::test]
async fn should_traverse_no_deeper_than_max_depth_if_specified() {
let root = setup_dir(vec![

@ -170,15 +170,6 @@ pub struct SearchQueryOptions {
#[serde(default)]
pub limit: Option<u64>,
/// Minimum depth (directories) to search
///
/// The smallest depth is 0 and always corresponds to the path given to the new function on
/// this type. Its direct descendents have depth 1, and their descendents have depth 2, and so
/// on.
#[cfg_attr(feature = "clap", clap(long))]
#[serde(default)]
pub min_depth: Option<u64>,
/// Maximum depth (directories) to search
///
/// The smallest depth is 0 and always corresponds to the path given to the new function on

Loading…
Cancel
Save