Initial commit

pull/195/head
Chip Senkbeil 1 year ago
parent 9da7679081
commit dd91fffeea
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

22
Cargo.lock generated

@ -902,6 +902,7 @@ dependencies = [
"indoc",
"log",
"notify",
"notify-debouncer-full",
"num_cpus",
"once_cell",
"portable-pty 0.8.1",
@ -1142,6 +1143,15 @@ dependencies = [
"subtle",
]
[[package]]
name = "file-id"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13be71e6ca82e91bc0cb862bebaac0b2d1924a5a1d970c822b2f98b63fda8c3"
dependencies = [
"winapi-util",
]
[[package]]
name = "file-mode"
version = "0.1.2"
@ -1975,6 +1985,18 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "notify-debouncer-full"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4812c1eb49be776fb8df4961623bdc01ec9dfdc1abe8211ceb09150a2e64219"
dependencies = [
"file-id",
"notify",
"parking_lot 0.12.1",
"walkdir",
]
[[package]]
name = "ntapi"
version = "0.4.1"

@ -25,7 +25,8 @@ distant-core = { version = "=0.20.0-alpha.7", path = "../distant-core" }
grep = "0.2.12"
ignore = "0.4.20"
log = "0.4.18"
notify = { version = "6.0.0", default-features = false }
notify = { version = "6.0.0", default-features = false, features = ["macos_fsevent"] }
notify-debouncer-full = { version = "0.1.0", default-features = false }
num_cpus = "1.15.0"
portable-pty = "0.8.1"
rand = { version = "0.8.5", features = ["getrandom"] }

@ -2,6 +2,7 @@ use std::path::{Path, PathBuf};
use std::time::SystemTime;
use std::{env, io};
use crate::config::Config;
use async_trait::async_trait;
use distant_core::protocol::{
Capabilities, ChangeKind, ChangeKindSet, DirEntry, Environment, FileType, Metadata,
@ -15,7 +16,6 @@ use tokio::io::AsyncWriteExt;
use walkdir::WalkDir;
mod process;
mod state;
use state::*;
@ -29,9 +29,9 @@ pub struct LocalDistantApi {
impl LocalDistantApi {
/// Initialize the api instance
pub fn initialize() -> io::Result<Self> {
pub fn initialize(config: Config) -> io::Result<Self> {
Ok(Self {
state: GlobalState::initialize()?,
state: GlobalState::initialize(config)?,
})
}
}
@ -709,6 +709,7 @@ mod tests {
use tokio::sync::mpsc;
use super::*;
use crate::config::WatchConfig;
static TEMP_SCRIPT_DIR: Lazy<assert_fs::TempDir> =
Lazy::new(|| assert_fs::TempDir::new().unwrap());
@ -769,8 +770,16 @@ mod tests {
static DOES_NOT_EXIST_BIN: Lazy<assert_fs::fixture::ChildPath> =
Lazy::new(|| TEMP_SCRIPT_DIR.child("does_not_exist_bin"));
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100);
async fn setup(buffer: usize) -> (LocalDistantApi, DistantCtx<()>, mpsc::Receiver<Response>) {
let api = LocalDistantApi::initialize().unwrap();
let api = LocalDistantApi::initialize(Config {
watch: WatchConfig {
debounce_timeout: DEBOUNCE_TIMEOUT,
..Default::default()
},
})
.unwrap();
let (reply, rx) = make_reply(buffer);
let connection_id = rand::random();
@ -1630,7 +1639,7 @@ mod tests {
// Sleep a bit to give time to get all changes happening
// TODO: Can we slim down this sleep? Or redesign test in some other way?
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(DEBOUNCE_TIMEOUT + Duration::from_millis(100)).await;
// Collect all responses, as we may get multiple for interactions within a directory
let mut responses = Vec::new();

@ -1,3 +1,4 @@
use crate::config::Config;
use std::io;
mod process;
@ -22,11 +23,13 @@ pub struct GlobalState {
}
impl GlobalState {
pub fn initialize() -> io::Result<Self> {
pub fn initialize(config: Config) -> io::Result<Self> {
Ok(Self {
process: ProcessState::new(),
search: SearchState::new(),
watcher: WatcherState::initialize()?,
watcher: WatcherBuilder::new()
.with_config(config.watch)
.initialize()?,
})
}
}

@ -2,15 +2,18 @@ use std::collections::HashMap;
use std::io;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::config::WatchConfig;
use distant_core::net::common::ConnectionId;
use distant_core::protocol::ChangeKind;
use log::*;
use notify::event::{AccessKind, AccessMode, ModifyKind};
use notify::{
Config as WatcherConfig, Error as WatcherError, ErrorKind as WatcherErrorKind,
Event as WatcherEvent, EventKind, PollWatcher, RecursiveMode, Watcher,
Event as WatcherEvent, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
};
use notify_debouncer_full::{new_debouncer_opt, DebounceEventResult, Debouncer, FileIdMap};
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
@ -20,38 +23,36 @@ use crate::constants::SERVER_WATCHER_CAPACITY;
mod path;
pub use path::*;
/// Holds information related to watched paths on the server
pub struct WatcherState {
channel: WatcherChannel,
task: JoinHandle<()>,
/// Builder for a watcher.
#[derive(Default)]
pub struct WatcherBuilder {
config: WatchConfig,
}
impl Drop for WatcherState {
/// Aborts the task that handles watcher path operations and management
fn drop(&mut self) {
self.abort();
impl WatcherBuilder {
/// Creates a new builder configured to use the native watcher using default configuration.
pub fn new() -> Self {
Self::default()
}
/// Swaps the configuration with the provided one.
pub fn with_config(self, config: WatchConfig) -> Self {
Self { config }
}
}
impl WatcherState {
/// Will create a watcher and initialize watched paths to be empty
pub fn initialize() -> io::Result<Self> {
pub fn initialize(self) -> io::Result<WatcherState> {
// NOTE: Cannot be something small like 1 as this seems to cause a deadlock sometimes
// with a large volume of watch requests
let (tx, rx) = mpsc::channel(SERVER_WATCHER_CAPACITY);
macro_rules! spawn_watcher {
($watcher:ident) => {{
Self {
channel: WatcherChannel { tx },
task: tokio::spawn(watcher_task($watcher, rx)),
}
}};
}
let watcher_config = WatcherConfig::default()
.with_compare_contents(self.config.compare_contents)
.with_poll_interval(self.config.poll_interval.unwrap_or(Duration::from_secs(30)));
macro_rules! event_handler {
($tx:ident) => {
move |res| match $tx.try_send(match res {
macro_rules! process_event {
($tx:ident, $evt:expr) => {
match $tx.try_send(match $evt {
Ok(x) => InnerWatcherMsg::Event { ev: x },
Err(x) => InnerWatcherMsg::Error { err: x },
}) {
@ -69,30 +70,83 @@ impl WatcherState {
};
}
macro_rules! new_debouncer {
($watcher:ident, $tx:ident) => {{
new_debouncer_opt::<_, $watcher, FileIdMap>(
self.config.debounce_timeout,
self.config.debounce_tick_rate,
move |result: DebounceEventResult| match result {
Ok(events) => {
for x in events {
process_event!($tx, Ok(x));
}
}
Err(errors) => {
for x in errors {
process_event!($tx, Err(x));
}
}
},
FileIdMap::new(),
watcher_config,
)
}};
}
macro_rules! spawn_task {
($debouncer:expr) => {{
WatcherState {
channel: WatcherChannel { tx },
task: tokio::spawn(watcher_task($debouncer, rx)),
}
}};
}
let tx = tx.clone();
let result = {
let tx = tx.clone();
notify::recommended_watcher(event_handler!(tx))
};
match result {
Ok(watcher) => Ok(spawn_watcher!(watcher)),
Err(x) => match x.kind {
// notify-rs has a bug on Mac M1 with Docker and Linux, so we detect that error
// and fall back to the poll watcher if this occurs
//
// https://github.com/notify-rs/notify/issues/423
WatcherErrorKind::Io(x) if x.raw_os_error() == Some(38) => {
warn!("Recommended watcher is unsupported! Falling back to polling watcher!");
let watcher = PollWatcher::new(event_handler!(tx), WatcherConfig::default())
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
Ok(spawn_watcher!(watcher))
if self.config.native {
let result = {
let tx = tx.clone();
new_debouncer!(RecommendedWatcher, tx)
};
match result {
Ok(debouncer) => Ok(spawn_task!(debouncer)),
Err(x) => {
match x.kind {
// notify-rs has a bug on Mac M1 with Docker and Linux, so we detect that error
// and fall back to the poll watcher if this occurs
//
// https://github.com/notify-rs/notify/issues/423
WatcherErrorKind::Io(x) if x.raw_os_error() == Some(38) => {
warn!("Recommended watcher is unsupported! Falling back to polling watcher!");
Ok(spawn_task!(new_debouncer!(PollWatcher, tx)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?))
}
_ => Err(io::Error::new(io::ErrorKind::Other, x)),
}
}
_ => Err(io::Error::new(io::ErrorKind::Other, x)),
},
}
} else {
Ok(spawn_task!(new_debouncer!(PollWatcher, tx)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?))
}
}
}
/// Holds information related to watched paths on the server
pub struct WatcherState {
channel: WatcherChannel,
task: JoinHandle<()>,
}
impl Drop for WatcherState {
/// Aborts the task that handles watcher path operations and management
fn drop(&mut self) {
self.abort();
}
}
impl WatcherState {
/// Aborts the watcher task
pub fn abort(&self) {
self.task.abort();
@ -169,7 +223,12 @@ enum InnerWatcherMsg {
},
}
async fn watcher_task(mut watcher: impl Watcher, mut rx: mpsc::Receiver<InnerWatcherMsg>) {
async fn watcher_task<W>(
mut debouncer: Debouncer<W, FileIdMap>,
mut rx: mpsc::Receiver<InnerWatcherMsg>,
) where
W: Watcher,
{
// TODO: Optimize this in some way to be more performant than
// checking every path whenever an event comes in
let mut registered_paths: Vec<RegisteredPath> = Vec::new();
@ -193,7 +252,8 @@ async fn watcher_task(mut watcher: impl Watcher, mut rx: mpsc::Receiver<InnerWat
// Send an okay because we always succeed in this case
let _ = cb.send(Ok(()));
} else {
let res = watcher
let res = debouncer
.watcher()
.watch(
registered_path.path(),
if registered_path.is_recursive() {
@ -233,7 +293,8 @@ async fn watcher_task(mut watcher: impl Watcher, mut rx: mpsc::Receiver<InnerWat
// 3. Otherwise, we return okay because we succeeded
if *cnt <= removed_cnt {
let _ = cb.send(
watcher
debouncer
.watcher()
.unwatch(&path)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x)),
);

@ -0,0 +1,28 @@
use std::time::Duration;
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Config {
pub watch: WatchConfig,
}
/// Configuration specifically for watching files and directories.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WatchConfig {
pub native: bool,
pub poll_interval: Option<Duration>,
pub compare_contents: bool,
pub debounce_timeout: Duration,
pub debounce_tick_rate: Option<Duration>,
}
impl Default for WatchConfig {
fn default() -> Self {
Self {
native: true,
poll_interval: None,
compare_contents: false,
debounce_timeout: Duration::from_secs(2),
debounce_tick_rate: None,
}
}
}

@ -5,8 +5,10 @@
pub struct ReadmeDoctests;
mod api;
mod config;
mod constants;
pub use api::LocalDistantApi;
pub use config::*;
use distant_core::{DistantApi, DistantApiServerHandler};
/// Implementation of [`DistantApiServerHandler`] using [`LocalDistantApi`].
@ -14,8 +16,8 @@ pub type LocalDistantApiServerHandler =
DistantApiServerHandler<LocalDistantApi, <LocalDistantApi as DistantApi>::LocalData>;
/// Initializes a new [`LocalDistantApiServerHandler`].
pub fn initialize_handler() -> std::io::Result<LocalDistantApiServerHandler> {
pub fn initialize_handler(config: Config) -> std::io::Result<LocalDistantApiServerHandler> {
Ok(LocalDistantApiServerHandler::new(
LocalDistantApi::initialize()?,
LocalDistantApi::initialize(config)?,
))
}

@ -22,7 +22,7 @@ impl DistantClientCtx {
let (started_tx, mut started_rx) = mpsc::channel::<u16>(1);
tokio::spawn(async move {
if let Ok(api) = LocalDistantApi::initialize() {
if let Ok(api) = LocalDistantApi::initialize(Default::default()) {
let port: PortRange = "0".parse().unwrap();
let port = {
let handler = DistantApiServerHandler::new(api);

@ -140,7 +140,7 @@ async fn async_run(cmd: ServerSubcommand, _is_forked: bool) -> CliResult {
"using an ephemeral port".to_string()
}
);
let handler = distant_local::initialize_handler()
let handler = distant_local::initialize_handler(Default::default())
.context("Failed to create local distant api")?;
let server = Server::tcp()
.config(NetServerConfig {

Loading…
Cancel
Save