Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watch_filtered, filtering paths to watch. #632

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions notify/src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#![allow(non_upper_case_globals, dead_code)]

use crate::event::*;
use crate::{unbounded, Config, Error, EventHandler, RecursiveMode, Result, Sender, Watcher};
use crate::{
unbounded, Config, Error, EventHandler, RecursiveMode, Result, Sender, WatchFilter, Watcher,
};
use fsevent_sys as fs;
use fsevent_sys::core_foundation as cf;
use std::collections::HashMap;
Expand Down Expand Up @@ -66,7 +68,7 @@ pub struct FsEventWatcher {
flags: fs::FSEventStreamCreateFlags,
event_handler: Arc<Mutex<dyn EventHandler>>,
runloop: Option<(cf::CFRunLoopRef, thread::JoinHandle<()>)>,
recursive_info: HashMap<PathBuf, bool>,
recursive_info: HashMap<PathBuf, (bool, WatchFilter)>,
}

impl fmt::Debug for FsEventWatcher {
Expand Down Expand Up @@ -242,7 +244,7 @@ fn translate_flags(flags: StreamFlags, precise: bool) -> Vec<Event> {

struct StreamContextInfo {
event_handler: Arc<Mutex<dyn EventHandler>>,
recursive_info: HashMap<PathBuf, bool>,
recursive_info: HashMap<PathBuf, (bool, WatchFilter)>,
}

// Free the context when the stream created by `FSEventStreamCreate` is released.
Expand Down Expand Up @@ -280,9 +282,14 @@ impl FsEventWatcher {
})
}

fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
fn watch_inner(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
watch_filter: WatchFilter,
) -> Result<()> {
self.stop();
let result = self.append_path(path, recursive_mode);
let result = self.append_path(path, recursive_mode, watch_filter);
// ignore return error: may be empty path list
let _ = self.run();
result
Expand Down Expand Up @@ -360,7 +367,12 @@ impl FsEventWatcher {
}

// https://github.com/thibaudgg/rb-fsevent/blob/master/ext/fsevent_watch/main.c
fn append_path(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
fn append_path(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
watch_filter: WatchFilter,
) -> Result<()> {
if !path.exists() {
return Err(Error::path_not_found().add_path(path.into()));
}
Expand All @@ -378,8 +390,10 @@ impl FsEventWatcher {
cf::CFArrayAppendValue(self.paths, cf_path);
cf::CFRelease(cf_path);
}
self.recursive_info
.insert(canonical_path, recursive_mode.is_recursive());
self.recursive_info.insert(
canonical_path,
(recursive_mode.is_recursive(), watch_filter),
);
Ok(())
}

Expand Down Expand Up @@ -522,8 +536,8 @@ unsafe fn callback_impl(
});

let mut handle_event = false;
for (p, r) in &(*info).recursive_info {
if path.starts_with(p) {
for (p, (r, filt)) in &(*info).recursive_info {
if path.starts_with(p) && filt.should_watch(p) {
if *r || &path == p {
handle_event = true;
break;
Expand Down Expand Up @@ -557,8 +571,13 @@ impl Watcher for FsEventWatcher {
Self::from_event_handler(Arc::new(Mutex::new(event_handler)))
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
fn watch_filtered(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
watch_filter: WatchFilter,
) -> Result<()> {
self.watch_inner(path, recursive_mode, watch_filter)
}

fn unwatch(&mut self, path: &Path) -> Result<()> {
Expand Down
90 changes: 65 additions & 25 deletions notify/src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! will return events for the directory itself, and for files inside the directory.

use super::event::*;
use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, WatchFilter, Watcher};
use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
use inotify as inotify_sys;
use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
Expand Down Expand Up @@ -37,7 +37,7 @@ struct EventLoop {
inotify: Option<Inotify>,
event_handler: Box<dyn EventHandler>,
/// PathBuf -> (WatchDescriptor, WatchMask, is_recursive, is_dir)
watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool, WatchFilter)>,
paths: HashMap<WatchDescriptor, PathBuf>,
rename_event: Option<Event>,
}
Expand All @@ -50,7 +50,7 @@ pub struct INotifyWatcher {
}

enum EventLoopMsg {
AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
AddWatch(PathBuf, RecursiveMode, WatchFilter, Sender<Result<()>>),
RemoveWatch(PathBuf, Sender<Result<()>>),
Shutdown,
Configure(Config, BoundSender<Result<bool>>),
Expand All @@ -60,15 +60,15 @@ enum EventLoopMsg {
fn add_watch_by_event(
path: &Option<PathBuf>,
event: &inotify_sys::Event<&OsStr>,
watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
add_watches: &mut Vec<PathBuf>,
watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool, WatchFilter)>,
add_watches: &mut Vec<(PathBuf, WatchFilter)>,
) {
if let Some(ref path) = *path {
if event.mask.contains(EventMask::ISDIR) {
if let Some(parent_path) = path.parent() {
if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
if let Some(&(_, _, is_recursive, _, ref filter)) = watches.get(parent_path) {
if is_recursive {
add_watches.push(path.to_owned());
add_watches.push((path.to_owned(), filter.clone()));
}
}
}
Expand All @@ -79,7 +79,7 @@ fn add_watch_by_event(
#[inline]
fn remove_watch_by_event(
path: &Option<PathBuf>,
watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool, WatchFilter)>,
remove_watches: &mut Vec<PathBuf>,
) {
if let Some(ref path) = *path {
Expand Down Expand Up @@ -166,8 +166,13 @@ impl EventLoop {
fn handle_messages(&mut self) {
while let Ok(msg) = self.event_loop_rx.try_recv() {
match msg {
EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
EventLoopMsg::AddWatch(path, recursive_mode, watch_filter, tx) => {
let _ = tx.send(self.add_watch(
path,
recursive_mode.is_recursive(),
true,
watch_filter,
));
}
EventLoopMsg::RemoveWatch(path, tx) => {
let _ = tx.send(self.remove_watch(path, false));
Expand Down Expand Up @@ -301,8 +306,8 @@ impl EventLoop {
Some(watched_path) => {
let current_watch = self.watches.get(watched_path);
match current_watch {
Some(&(_, _, _, true)) => RemoveKind::Folder,
Some(&(_, _, _, false)) => RemoveKind::File,
Some(&(_, _, _, true, _)) => RemoveKind::Folder,
Some(&(_, _, _, false, _)) => RemoveKind::File,
None => RemoveKind::Other,
}
}
Expand Down Expand Up @@ -385,24 +390,40 @@ impl EventLoop {
self.remove_watch(path, true).ok();
}

for path in add_watches {
self.add_watch(path, true, false).ok();
for (path, filter) in add_watches {
self.add_watch(path, true, false, filter).ok();
}
}

fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
fn add_watch(
&mut self,
path: PathBuf,
is_recursive: bool,
mut watch_self: bool,
watch_filter: WatchFilter,
) -> Result<()> {
if !watch_filter.should_watch(&path) {
return Ok(());
}

// If the watch is not recursive, or if we determine (by stat'ing the path to get its
// metadata) that the watched path is not a directory, add a single path watch.
if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
return self.add_single_watch(path, false, true);
return self.add_single_watch(path, false, true, WatchFilter::accept_all());
}

for entry in WalkDir::new(path)
.follow_links(true)
.into_iter()
.filter_map(filter_dir)
.filter(|e| watch_filter.should_watch(e.path()))
{
self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
self.add_single_watch(
entry.path().to_path_buf(),
is_recursive,
watch_self,
watch_filter.clone(),
)?;
watch_self = false;
}

Expand All @@ -414,6 +435,7 @@ impl EventLoop {
path: PathBuf,
is_recursive: bool,
watch_self: bool,
watch_filter: WatchFilter,
) -> Result<()> {
let mut watchmask = WatchMask::ATTRIB
| WatchMask::CREATE
Expand All @@ -428,7 +450,7 @@ impl EventLoop {
watchmask.insert(WatchMask::MOVE_SELF);
}

if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
if let Some(&(_, old_watchmask, _, _, _)) = self.watches.get(&path) {
watchmask.insert(old_watchmask);
watchmask.insert(WatchMask::MASK_ADD);
}
Expand All @@ -449,8 +471,16 @@ impl EventLoop {
Ok(w) => {
watchmask.remove(WatchMask::MASK_ADD);
let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
self.watches
.insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
self.watches.insert(
path.clone(),
(
w.clone(),
watchmask,
is_recursive,
is_dir,
watch_filter.clone(),
),
);
self.paths.insert(w, path);
Ok(())
}
Expand All @@ -463,7 +493,7 @@ impl EventLoop {
fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
match self.watches.remove(&path) {
None => return Err(Error::watch_not_found().add_path(path)),
Some((w, _, is_recursive, _)) => {
Some((w, _, is_recursive, _, _)) => {
if let Some(ref mut inotify) = self.inotify {
let mut inotify_watches = inotify.watches();
log::trace!("removing inotify watch: {}", path.display());
Expand Down Expand Up @@ -531,15 +561,20 @@ impl INotifyWatcher {
Ok(INotifyWatcher { channel, waker })
}

fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
fn watch_inner(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
watch_filter: WatchFilter,
) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
let (tx, rx) = unbounded();
let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
let msg = EventLoopMsg::AddWatch(pb, recursive_mode, watch_filter, tx);

// we expect the event loop to live and reply => unwraps must not panic
self.channel.send(msg).unwrap();
Expand Down Expand Up @@ -570,8 +605,13 @@ impl Watcher for INotifyWatcher {
Self::from_event_handler(Box::new(event_handler))
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
fn watch_filtered(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
watch_filter: WatchFilter,
) -> Result<()> {
self.watch_inner(path, recursive_mode, watch_filter)
}

fn unwatch(&mut self, path: &Path) -> Result<()> {
Expand Down
59 changes: 57 additions & 2 deletions notify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@
pub use config::{Config, RecursiveMode};
pub use error::{Error, ErrorKind, Result};
pub use notify_types::event::{self, Event, EventKind};
use std::path::Path;
use std::{path::Path, sync::Arc};

#[allow(dead_code)]
#[cfg(feature = "crossbeam-channel")]
Expand Down Expand Up @@ -324,6 +324,37 @@ pub enum WatcherKind {
NullWatcher,
}

type FilterFn = dyn Fn(&Path) -> bool + Send + Sync;
/// Path filter to limit what gets watched.
#[derive(Clone)]
pub struct WatchFilter(Option<Arc<FilterFn>>);

impl std::fmt::Debug for WatchFilter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("WatchFilterFn")
.field(&self.0.as_ref().map_or("no filter", |_| "filter fn"))
.finish()
}
}

impl WatchFilter {
/// A filter that accepts any path, use to watch all paths.
pub fn accept_all() -> WatchFilter {
WatchFilter(None)
}

/// A fitler to limit the paths that get watched.
///
/// Only paths for which `filter` returns `true` will be watched.
pub fn with_filter(filter: Arc<FilterFn>) -> WatchFilter {
WatchFilter(Some(filter))
}

fn should_watch(&self, path: &Path) -> bool {
self.0.as_ref().map_or(true, |f| f(path))
}
}

/// Type that can deliver file activity notifications
///
/// Watcher is implemented per platform using the best implementation available on that platform.
Expand All @@ -349,7 +380,31 @@ pub trait Watcher {
///
/// [#165]: https://github.com/notify-rs/notify/issues/165
/// [#166]: https://github.com/notify-rs/notify/issues/166
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>;
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_filtered(path, recursive_mode, WatchFilter::accept_all())
}

/// Begin watching a new path, filtering out sub-paths by name.
///
/// If the `path` is a directory, `recursive_mode` will be evaluated. If `recursive_mode` is
/// `RecursiveMode::Recursive` events will be delivered for all files in that tree. Otherwise
/// only the directory and its immediate children will be watched.
///
/// If the `path` is a file, `recursive_mode` will be ignored and events will be delivered only
/// for the file.
///
/// On some platforms, if the `path` is renamed or removed while being watched, behaviour may
/// be unexpected. See discussions in [#165] and [#166]. If less surprising behaviour is wanted
/// one may non-recursively watch the _parent_ directory as well and manage related events.
///
/// [#165]: https://github.com/notify-rs/notify/issues/165
/// [#166]: https://github.com/notify-rs/notify/issues/166
fn watch_filtered(
&mut self,
path: &Path,
recursive_mode: RecursiveMode,
watch_filter: WatchFilter,
) -> Result<()>;

/// Stop watching a path.
///
Expand Down
Loading