From 0174e24b19525893a2354b499f6a3fd3a5585d3f Mon Sep 17 00:00:00 2001 From: Justin Stoller Date: Tue, 22 Oct 2024 11:33:08 -0700 Subject: [PATCH] (PE-37592) Compress events for the same changed-path Previously, we would pass all events into a registered callback which triggered for the given watch-path. However, in some cases we may receive a `:create` event and then a `:modify` event as a new file is written to disk. We could also receive multiple `:modify` events, and because all of the events received from a WatchKey can be in any order we may receive the `:create` event after a `:modify` event. Eg, [`:modify`, `:create`, `:modify`, `:modify`] (imagine that these vectors are Events all sharing the same changed-path). However, we want to compress modifies into any creation events. This will reduce the events given to callbacks. For example, the events [`:create`, `:modify`] will reduce to only [`:create`], while [`:modify`, `:create`] will also reduce to [`:create`]; events like [`:modify`, `:modify`, `:modify`] will reduce to [`:modify`]. We do not compress delete events, so events like [`:modify`, `:create`, `:modify`, `:delete`] will be reduced to [`:create`, `:delete`]. The tests had the pecular property of collecting events until a given event was received, then only returning _the given event_. The tests have been updated to now return all the events received. Which, without this patch, many tests fail because they receive modify and create events, when the test is only expecting a create event. The tests now pass with few changes because of the implemented event compression. --- .../watcher/filesystem_watch_core.clj | 57 +++++++++++++++++-- .../watcher/filesystem_watch_service_test.clj | 22 ++++--- 2 files changed, 66 insertions(+), 13 deletions(-) diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj index 1b359f2..0079f93 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj @@ -1,5 +1,6 @@ (ns puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core - (:require [clojure.tools.logging :as log] + (:require [clojure.set :as set] + [clojure.tools.logging :as log] [me.raynes.fs :as fs] [schema.core :as schema] [puppetlabs.i18n.core :refer [trs]] @@ -175,14 +176,60 @@ (= (.toPath ^File (:changed-path event)) (:file callback-entry)))))] (filter filter-fn events))) +(defn ->tuple + "Create a tuple of the two fields we care about in an Event. + This simple element querying our sets simple." + [event] + [(:changed-path event) (:type event)]) + +(defn already-seen? + "Expects a set of event tuples (see ->tuple) and a full Event. + Is this a modify event for a changed-path where we've already seen a create or modify event?" + [events {:keys [changed-path type] :as _event}] + (when (= :modify type) + (or (set/subset? #{[changed-path :create]} events) + (set/subset? #{[changed-path :modify]} events)))) + +(defn should-overwrite? + "Expects a set of event tuples (see ->tuple) and a full Event. + Is this a create event that should supersede a previously found modify or create event?" + [events {:keys [changed-path type] :as _event}] + (when (= :create type) + (or (set/subset? #{[changed-path :modify]} events) + (set/subset? #{[changed-path :create]} events)))) + +(defn reducible-remove + [{:keys [tuples] :as memo} item] + (cond + ;; Ignore the new item if we've already seen a value representing it + (already-seen? tuples item) + memo + ;; Remove old value and replace it with the new value + (should-overwrite? tuples item) + (-> memo + (update :tuples set/difference #{[(:changed-path item) :modify]}) + (update :tuples set/union #{(->tuple item)}) + (update :events (fn [es] (filter #(and (= (:changed-path item) (:changed-path %)) + (or + (= :modify (:type %)) + (= :create (:type %)))) + es))) + (update :events conj item)) + ;; default add the event to our tracker + :else + (-> memo + (update :tuples set/union #{(->tuple item)}) + (update :events conj item)))) + (schema/defn process-events! "Process for side effects any events that occurred for watcher's watch-key" [watcher :- (schema/protocol Watcher) events :- [Event]] - (let [callbacks @(:callbacks watcher)] + (let [callbacks @(:callbacks watcher) + compressed-events (->> events (reduce reducible-remove {:tuples #{} :events []}) :events)] ;; avoid doing a potentially expensive walk when we aren't logging at :debug (when (log/enabled? :debug) - (let [events-by-dir (group-by :watched-path events)] + (let [events-by-dir (group-by :watched-path compressed-events)] (doseq [[dir events'] events-by-dir] (log/debug (trs "Got {0} event(s) in directory {1}" (count events') dir))))) @@ -191,10 +238,10 @@ (when (log/enabled? :trace) (log/tracef "%s\n%s" (trs "Events:") - (ks/pprint-to-string events))) + (ks/pprint-to-string compressed-events))) (doseq [callback-entry callbacks] - ((:callback callback-entry) (filter-events callback-entry events))))) + ((:callback callback-entry) (filter-events callback-entry compressed-events))))) (schema/defn watch! :- Future "Creates and returns a future. Processes events for the passed in watcher within the context of that future. diff --git a/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj b/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj index 864d0e7..00510fc 100644 --- a/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj +++ b/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj @@ -23,9 +23,13 @@ (fn [events] (swap! dest concat events))) +(defn format-events + [dest] + (set (map #(select-keys % [:changed-path :type]) @dest))) + (defn contains-events? [dest events] - (let [select-keys-set (set (map #(select-keys % [:changed-path :type]) @dest))] + (let [select-keys-set (format-events dest)] (set/subset? events select-keys-set))) (defn exactly-matches-event? @@ -47,7 +51,7 @@ (loop [] (let [elapsed-time (- (System/currentTimeMillis) start-time)] (if (contains-events? dest events) - events + (format-events dest) (if (< elapsed-time timeout-ms) (do (Thread/sleep 100) (recur)) @@ -100,9 +104,6 @@ (spit first-file "foo") (let [events #{{:changed-path first-file :type :create}}] - ;; This is the first of many weird assertions like this, but it's done - ;; this way on purpose to get decent reporting from failed assertions. - ;; See above the docstring on wait-for-events. (is (= events (wait-for-events results events))))) (testing "callback invoked again when another new file is created" (reset! results []) @@ -144,6 +145,7 @@ (let [events #{{:changed-path sub-dir :type :create}}] (is (= events (wait-for-events results events)))) + (reset! results []) (is (fs/delete sub-dir)) (let [events #{{:changed-path sub-dir :type :delete}}] @@ -260,8 +262,8 @@ (reset! results []) (is (fs/delete first-file)) (is (fs/delete second-file)) - (let [events [{:changed-path first-file - :type :delete}]] + (let [events #{{:changed-path first-file + :type :delete}}] (is (= events (wait-for-events results events)))))) (testing "re-creation of a deleted directory" (reset! results []) @@ -444,7 +446,7 @@ (is (fs/delete-dir intermediate-dir)) (let [events #{{:changed-path intermediate-dir :type :delete}}] - (is (= events (wait-for-events results events))))) + (is (every? #(= :delete (:type %)) (wait-for-events results events))))) (let [another-nested-dir (fs/file root-dir "another-nested-dir") new-nested-file (fs/file another-nested-dir "new-nested-file")] (testing "new nested directory" @@ -464,6 +466,8 @@ (reset! results []) (is (fs/delete-dir another-nested-dir)) (let [events #{{:changed-path another-nested-dir + :type :delete} + {:changed-path new-nested-file :type :delete}}] (is (= events (wait-for-events results events))))))))))) @@ -628,6 +632,8 @@ (testing "Deletion thereof" (is (fs/delete-dir nested-dir)) (let [events #{{:changed-path nested-dir + :type :delete} + {:changed-path test-file-nested :type :delete}}] (is (= events (wait-for-events results events)))) (reset! results [])