diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj index 1b359f2..0079f93 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj @@ -1,5 +1,6 @@ (ns puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core - (:require [clojure.tools.logging :as log] + (:require [clojure.set :as set] + [clojure.tools.logging :as log] [me.raynes.fs :as fs] [schema.core :as schema] [puppetlabs.i18n.core :refer [trs]] @@ -175,14 +176,60 @@ (= (.toPath ^File (:changed-path event)) (:file callback-entry)))))] (filter filter-fn events))) +(defn ->tuple + "Create a tuple of the two fields we care about in an Event. + This simple element querying our sets simple." + [event] + [(:changed-path event) (:type event)]) + +(defn already-seen? + "Expects a set of event tuples (see ->tuple) and a full Event. + Is this a modify event for a changed-path where we've already seen a create or modify event?" + [events {:keys [changed-path type] :as _event}] + (when (= :modify type) + (or (set/subset? #{[changed-path :create]} events) + (set/subset? #{[changed-path :modify]} events)))) + +(defn should-overwrite? + "Expects a set of event tuples (see ->tuple) and a full Event. + Is this a create event that should supersede a previously found modify or create event?" + [events {:keys [changed-path type] :as _event}] + (when (= :create type) + (or (set/subset? #{[changed-path :modify]} events) + (set/subset? #{[changed-path :create]} events)))) + +(defn reducible-remove + [{:keys [tuples] :as memo} item] + (cond + ;; Ignore the new item if we've already seen a value representing it + (already-seen? tuples item) + memo + ;; Remove old value and replace it with the new value + (should-overwrite? tuples item) + (-> memo + (update :tuples set/difference #{[(:changed-path item) :modify]}) + (update :tuples set/union #{(->tuple item)}) + (update :events (fn [es] (filter #(and (= (:changed-path item) (:changed-path %)) + (or + (= :modify (:type %)) + (= :create (:type %)))) + es))) + (update :events conj item)) + ;; default add the event to our tracker + :else + (-> memo + (update :tuples set/union #{(->tuple item)}) + (update :events conj item)))) + (schema/defn process-events! "Process for side effects any events that occurred for watcher's watch-key" [watcher :- (schema/protocol Watcher) events :- [Event]] - (let [callbacks @(:callbacks watcher)] + (let [callbacks @(:callbacks watcher) + compressed-events (->> events (reduce reducible-remove {:tuples #{} :events []}) :events)] ;; avoid doing a potentially expensive walk when we aren't logging at :debug (when (log/enabled? :debug) - (let [events-by-dir (group-by :watched-path events)] + (let [events-by-dir (group-by :watched-path compressed-events)] (doseq [[dir events'] events-by-dir] (log/debug (trs "Got {0} event(s) in directory {1}" (count events') dir))))) @@ -191,10 +238,10 @@ (when (log/enabled? :trace) (log/tracef "%s\n%s" (trs "Events:") - (ks/pprint-to-string events))) + (ks/pprint-to-string compressed-events))) (doseq [callback-entry callbacks] - ((:callback callback-entry) (filter-events callback-entry events))))) + ((:callback callback-entry) (filter-events callback-entry compressed-events))))) (schema/defn watch! :- Future "Creates and returns a future. Processes events for the passed in watcher within the context of that future. diff --git a/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj b/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj index 864d0e7..00510fc 100644 --- a/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj +++ b/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj @@ -23,9 +23,13 @@ (fn [events] (swap! dest concat events))) +(defn format-events + [dest] + (set (map #(select-keys % [:changed-path :type]) @dest))) + (defn contains-events? [dest events] - (let [select-keys-set (set (map #(select-keys % [:changed-path :type]) @dest))] + (let [select-keys-set (format-events dest)] (set/subset? events select-keys-set))) (defn exactly-matches-event? @@ -47,7 +51,7 @@ (loop [] (let [elapsed-time (- (System/currentTimeMillis) start-time)] (if (contains-events? dest events) - events + (format-events dest) (if (< elapsed-time timeout-ms) (do (Thread/sleep 100) (recur)) @@ -100,9 +104,6 @@ (spit first-file "foo") (let [events #{{:changed-path first-file :type :create}}] - ;; This is the first of many weird assertions like this, but it's done - ;; this way on purpose to get decent reporting from failed assertions. - ;; See above the docstring on wait-for-events. (is (= events (wait-for-events results events))))) (testing "callback invoked again when another new file is created" (reset! results []) @@ -144,6 +145,7 @@ (let [events #{{:changed-path sub-dir :type :create}}] (is (= events (wait-for-events results events)))) + (reset! results []) (is (fs/delete sub-dir)) (let [events #{{:changed-path sub-dir :type :delete}}] @@ -260,8 +262,8 @@ (reset! results []) (is (fs/delete first-file)) (is (fs/delete second-file)) - (let [events [{:changed-path first-file - :type :delete}]] + (let [events #{{:changed-path first-file + :type :delete}}] (is (= events (wait-for-events results events)))))) (testing "re-creation of a deleted directory" (reset! results []) @@ -444,7 +446,7 @@ (is (fs/delete-dir intermediate-dir)) (let [events #{{:changed-path intermediate-dir :type :delete}}] - (is (= events (wait-for-events results events))))) + (is (every? #(= :delete (:type %)) (wait-for-events results events))))) (let [another-nested-dir (fs/file root-dir "another-nested-dir") new-nested-file (fs/file another-nested-dir "new-nested-file")] (testing "new nested directory" @@ -464,6 +466,8 @@ (reset! results []) (is (fs/delete-dir another-nested-dir)) (let [events #{{:changed-path another-nested-dir + :type :delete} + {:changed-path new-nested-file :type :delete}}] (is (= events (wait-for-events results events))))))))))) @@ -628,6 +632,8 @@ (testing "Deletion thereof" (is (fs/delete-dir nested-dir)) (let [events #{{:changed-path nested-dir + :type :delete} + {:changed-path test-file-nested :type :delete}}] (is (= events (wait-for-events results events)))) (reset! results [])