Fix ordering of debounced events when multiple files are modified and renamed

Closes notify-rs#587
dfaust committed Aug 4, 2024
1 parent 128bf62 commit 57a24ed
Showing 9 changed files with 327 additions and 53 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -34,6 +34,8 @@ New crate containing public type definitions for the notify and debouncer crates
- CHANGE: add `RecommendedCache`, which automatically enables the file ID cache on Windows and MacOS
and disables it on Linux, where it is not needed

- FIX: ordering of debounced events when multiple files are modified and renamed (e.g. during a safe save performed by Blender)

## debouncer-full 0.3.1 (2023-08-21)

- CHANGE: remove serde binary experiment opt-out after it got removed [#530]
181 changes: 130 additions & 51 deletions notify-debouncer-full/src/lib.rs
@@ -153,7 +153,7 @@ struct Queue {

impl Queue {
fn was_created(&self) -> bool {
self.events.front().map_or(false, |event| {
self.events.front().is_some_and(|event| {
matches!(
event.kind,
EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
@@ -162,7 +162,7 @@
}

fn was_removed(&self) -> bool {
self.events.front().map_or(false, |event| {
self.events.front().is_some_and(|event| {
matches!(
event.kind,
EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
@@ -171,9 +171,48 @@
}
}

#[derive(Debug)]
pub struct BlockEntry {
pub blocker_path: PathBuf,
pub blocker_time: Instant,
pub blockee_path: PathBuf,
}

#[derive(Debug, Default)]
pub struct BlockManager {
entries: Vec<BlockEntry>,
}

impl BlockManager {
pub fn new() -> BlockManager {
BlockManager {
entries: Vec::new(),
}
}

pub fn add_blocker(&mut self, entry: BlockEntry) {
self.entries.push(entry);
}

pub fn remove_blocker(&mut self, path: &PathBuf, time: Instant) {
self.entries
.retain(|entry| entry.blocker_path != *path || entry.blocker_time != time);
}

pub fn is_blocked_by(&self, path: &PathBuf) -> Option<(&PathBuf, Instant)> {
for entry in &self.entries {
if entry.blockee_path == *path {
return Some((&entry.blocker_path, entry.blocker_time));
}
}
None
}
}

#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
queues: HashMap<PathBuf, Queue>,
blocking: BlockManager,
roots: Vec<(PathBuf, RecursiveMode)>,
cache: T,
rename_event: Option<(DebouncedEvent, Option<FileId>)>,
@@ -186,6 +225,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
pub(crate) fn new(cache: T, timeout: Duration) -> Self {
Self {
queues: HashMap::new(),
blocking: BlockManager::new(),
roots: Vec::new(),
cache,
rename_event: None,
@@ -195,11 +235,17 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
}

fn contains_event(&self, path: &PathBuf, time: Instant) -> bool {
self.queues
.get(path)
.is_some_and(|queue| queue.events.iter().any(|event| event.time == time))
}

/// Retrieve a vec of debounced events, removing them if not continuous
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
let now = Instant::now();
let mut events_expired = Vec::with_capacity(self.queues.len());
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
let events_count = self.queues.values().map(|queue| queue.events.len()).sum();
let mut events_expired = Vec::with_capacity(events_count);

if let Some(event) = self.rescan_event.take() {
if now.saturating_duration_since(event.time) >= self.timeout {
@@ -210,48 +256,62 @@
}
}

// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
for (path, mut queue) in self.queues.drain() {
let mut kind_index = HashMap::new();

while let Some(event) = queue.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_expired.remove(idx);

kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}
let mut kind_index: HashMap<PathBuf, HashMap<EventKind, usize>> = HashMap::new();

kind_index.insert(event.kind, events_expired.len());
while let Some(path) = self
.queues
// iterate over all queues
.iter()
// get the first event of every queue
.filter_map(|(path, queue)| queue.events.front().map(|event| (path, event.time)))
// filter out all blocked events
.filter(|(path, _)| {
self.blocking
.is_blocked_by(path)
.map_or(true, |(path, time)| !self.contains_event(path, time))
})
// get the event with the earliest timestamp
.min_by_key(|(_, time)| *time)
// get the path of the event
.map(|(path, _)| path.clone())
{
let event = self
.queues
.get_mut(&path)
.unwrap()
.events
.pop_front()
.unwrap();

events_expired.push(event);
} else {
queue.events.push_front(event);
break;
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
let kind_index = kind_index.entry(path.clone()).or_default();
if let Some(idx) = kind_index.get(&event.kind).copied() {
events_expired.remove(idx);

kind_index.values_mut().for_each(|i| {
if *i > idx {
*i -= 1
}
})
}
}
kind_index.insert(event.kind, events_expired.len());

if !queue.events.is_empty() {
queues_remaining.insert(path, queue);
self.blocking.remove_blocker(&path, event.time);

events_expired.push(event);
} else {
self.queues.get_mut(&path).unwrap().events.push_front(event);

break;
}
}

self.queues = queues_remaining;
self.queues.retain(|_, queue| !queue.events.is_empty());

// order events for different files chronologically, but keep the order of events for the same file
events_expired.sort_by(|event_a, event_b| {
// use the last path because rename events are emitted for the target path
if event_a.paths.last() == event_b.paths.last() {
std::cmp::Ordering::Equal
} else {
event_a.time.cmp(&event_b.time)
}
});
if self.queues.is_empty() {
self.blocking.entries.clear();
}

events_expired
}
@@ -426,18 +486,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
source_queue.events.remove(remove_index);
}

// split off remove or move out event and add it back to the events map
if source_queue.was_removed() {
let event = source_queue.events.pop_front().unwrap();

self.queues.insert(
event.paths[0].clone(),
Queue {
events: [event].into(),
},
);
}

// update paths
for e in &mut source_queue.events {
e.paths = vec![event.paths[0].clone()];
@@ -456,7 +504,12 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}

if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
if !target_queue.was_created() {
if target_queue.was_removed() {
let event = target_queue.events.pop_front().unwrap();
source_queue.events.push_front(event);
}

if !target_queue.was_created() && !source_queue.was_removed() {
let mut remove_event = DebouncedEvent {
event: Event {
kind: EventKind::Remove(RemoveKind::Any),
@@ -474,6 +527,8 @@ impl<T: FileIdCache> DebounceDataInner<T> {
} else {
self.queues.insert(event.paths[0].clone(), source_queue);
}

self.find_blocked_events(&event.paths[0]);
}

fn push_remove_event(&mut self, event: Event, time: Instant) {
Expand Down Expand Up @@ -519,6 +574,25 @@ impl<T: FileIdCache> DebounceDataInner<T> {
);
}
}

fn find_blocked_events(&mut self, path: &Path) {
for queue in self.queues.values_mut() {
for event in &mut queue.events {
if matches!(
event.event.kind,
EventKind::Modify(ModifyKind::Name(RenameMode::Both))
) && event.event.paths[0] == path
{
self.blocking.add_blocker(BlockEntry {
blocker_path: event.event.paths[1].clone(),
blocker_time: event.time,
blockee_path: path.to_path_buf(),
});
break;
}
}
}
}
}

/// Debouncer guard, stops the debouncer on drop.
@@ -756,6 +830,11 @@ mod tests {
"add_remove_parent_event_after_remove_child_event",
"add_errors",
"emit_continuous_modify_content_events",
"emit_create_event_after_safe_save_and_backup_override",
"emit_create_event_after_safe_save_and_backup_rotation_twice",
"emit_create_event_after_safe_save_and_backup_rotation",
"emit_create_event_after_safe_save_and_double_move",
"emit_create_event_after_safe_save_and_double_move_and_recreate",
"emit_events_in_chronological_order",
"emit_events_with_a_prepended_rename_event",
"emit_close_events_only_once",
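
The following is a minimal, self-contained usage sketch of the `BlockManager` bookkeeping introduced above. It is an illustration only, not part of the commit: the type definitions are repeated from the diff so the snippet compiles on its own, and the paths and timestamps are made up.

use std::path::PathBuf;
use std::time::Instant;

// Repeated from the diff above so the example is self-contained.
#[derive(Debug)]
pub struct BlockEntry {
    pub blocker_path: PathBuf,
    pub blocker_time: Instant,
    pub blockee_path: PathBuf,
}

#[derive(Debug, Default)]
pub struct BlockManager {
    entries: Vec<BlockEntry>,
}

impl BlockManager {
    pub fn add_blocker(&mut self, entry: BlockEntry) {
        self.entries.push(entry);
    }

    pub fn remove_blocker(&mut self, path: &PathBuf, time: Instant) {
        self.entries
            .retain(|entry| entry.blocker_path != *path || entry.blocker_time != time);
    }

    pub fn is_blocked_by(&self, path: &PathBuf) -> Option<(&PathBuf, Instant)> {
        for entry in &self.entries {
            if entry.blockee_path == *path {
                return Some((&entry.blocker_path, entry.blocker_time));
            }
        }
        None
    }
}

fn main() {
    let mut blocking = BlockManager::default();
    let rename_time = Instant::now();

    // The earlier `create` event for `/watch/file` must not be flushed before
    // the rename event targeting `/watch/file1` has been flushed.
    blocking.add_blocker(BlockEntry {
        blocker_path: PathBuf::from("/watch/file1"),
        blocker_time: rename_time,
        blockee_path: PathBuf::from("/watch/file"),
    });
    assert!(blocking.is_blocked_by(&PathBuf::from("/watch/file")).is_some());

    // Once the rename event has been emitted, the block is lifted and the
    // `create` event for `/watch/file` may be emitted as well.
    blocking.remove_blocker(&PathBuf::from("/watch/file1"), rename_time);
    assert!(blocking.is_blocked_by(&PathBuf::from("/watch/file")).is_none());
}

In the debouncer itself, `debounced_events` skips any queue whose front event is still blocked and calls `remove_blocker` once the blocking event has been flushed, which is what restores the intended ordering.
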
3 changes: 2 additions & 1 deletion notify-debouncer-full/src/testing.rs
@@ -14,7 +14,7 @@ use notify::{
Error, ErrorKind, Event, EventKind, RecursiveMode,
};

use crate::{DebounceDataInner, DebouncedEvent, FileIdCache, Queue};
use crate::{BlockManager, DebounceDataInner, DebouncedEvent, FileIdCache, Queue};

pub(crate) use schema::TestCase;

@@ -268,6 +268,7 @@ impl schema::State {

DebounceDataInner {
queues,
blocking: BlockManager::new(),
roots: Vec::new(),
cache,
rename_event,
@@ -0,0 +1,33 @@
// Based on the Blender safe save scenario.
//
// In this scenario the backup file is not removed first.
{
state: {}
events: [
{ kind: "create-any", paths: ["/watch/file@"], time: 1 }
{ kind: "rename-from", paths: ["/watch/file"], tracker: 1, time: 3 }
{ kind: "rename-to", paths: ["/watch/file1"], tracker: 1, time: 4 }
{ kind: "rename-from", paths: ["/watch/file@"], tracker: 2, time: 5 }
{ kind: "rename-to", paths: ["/watch/file"], tracker: 2, time: 6 }
]
expected: {
queues: {
/watch/file: {
events: [
{ kind: "create-any", paths: ["*"], time: 1 }
]
}
/watch/file1: {
events: [
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
]
}
}
events: {
long: [
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
{ kind: "create-any", paths: ["/watch/file"], time: 1 }
]
}
}
}
@@ -0,0 +1,41 @@
// https://github.com/notify-rs/notify/issues/587
//
// Blender causes the following events when saving a file:
//
// create test.blend@ (new content)
// delete test.blend1 (delete backup)
// rename test.blend -> test.blend1 (move current to backup)
// rename test.blend@ -> test.blend (move new to current)
{
state: {}
events: [
{ kind: "create-any", paths: ["/watch/file@"], time: 1 }
{ kind: "remove-any", paths: ["/watch/file1"], time: 2 }
{ kind: "rename-from", paths: ["/watch/file"], tracker: 1, time: 3 }
{ kind: "rename-to", paths: ["/watch/file1"], tracker: 1, time: 4 }
{ kind: "rename-from", paths: ["/watch/file@"], tracker: 2, time: 5 }
{ kind: "rename-to", paths: ["/watch/file"], tracker: 2, time: 6 }
]
expected: {
queues: {
/watch/file: {
events: [
{ kind: "create-any", paths: ["*"], time: 1 }
]
}
/watch/file1: {
events: [
{ kind: "remove-any", paths: ["*"], time: 2 }
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
]
}
}
events: {
long: [
{ kind: "remove-any", paths: ["/watch/file1"], time: 2 }
{ kind: "rename-both", paths: ["/watch/file", "/watch/file1"], tracker: 1, time: 3 }
{ kind: "create-any", paths: ["/watch/file"], time: 1 }
]
}
}
}
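
For context, the debounced events described by these fixtures reach users through the crate's public API. Below is a rough consumer-side sketch, not part of this commit, using the setup documented for notify-debouncer-full 0.3.1; the setup on this development branch may differ (for instance with the new `RecommendedCache`), and the `/watch` path simply mirrors the fixtures above.

use std::{path::Path, time::Duration};

use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebounceEventResult};

fn main() {
    // Debounce raw notify events for 2 seconds before emitting them.
    let mut debouncer = new_debouncer(Duration::from_secs(2), None, |result: DebounceEventResult| {
        match result {
            // With this fix, a Blender-style safe save should arrive as:
            // remove(backup), rename(current -> backup), create(current),
            // matching the expected events in the fixture above.
            Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
            Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
        }
    })
    .unwrap();

    // Watch a directory recursively and register it with the file ID cache
    // (0.3.1-style setup).
    debouncer
        .watcher()
        .watch(Path::new("/watch"), RecursiveMode::Recursive)
        .unwrap();
    debouncer.cache().add_root(Path::new("/watch"), RecursiveMode::Recursive);

    // Keep the watcher alive while events are collected.
    std::thread::sleep(Duration::from_secs(60));
}
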
