diff --git a/src/clj/puppetlabs/trapperkeeper/services/protocols/filesystem_watch_service.clj b/src/clj/puppetlabs/trapperkeeper/services/protocols/filesystem_watch_service.clj index 4a68b2a..3af9324 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/protocols/filesystem_watch_service.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/protocols/filesystem_watch_service.clj @@ -1,6 +1,7 @@ (ns puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service (:require [schema.core :as schema]) - (:import (java.io File))) + (:import (java.io File) + (java.nio.file Path))) (def Event "Schema for an event on a file watched by this service." @@ -25,7 +26,22 @@ When dir is deleted, the behavior is unspecified, left up to the implementation, and may be platform-specific.") - (add-callback! [this callback] + (add-file-callback! [this path-to-file callback] [this path-to-file callback types] + "Adds a callback to a Watcher tha will be invoked when the watched file changes. + The parent directory containing the file must be added via `add-watch-dir!` for the callbacks to be triggered. + The callback will be passed a sequence of Events as its only argument. + The exact events passed to the callback are unspecified, left up to the implementation, + and possibly platform-dependent; however, the following events are guaranteed to be passed to the callback + + * an event of :type :create with :path p, when a file is created at path p + * an event of :type :modify with :path p, when the contents of a file at path p are modified + * an event of :type :delete with :path p, when a file is deleted at path p + + The types of events where the callback is triggered can be limited with the optional fourth argument. The fourth + argument should be the `set` of types of interest from: `create`, `modify`, `delete`, `unknown`. + The default is to specify all types.") + + (add-callback! [this callback] [this callback types] "Adds a callback to a Watcher. The callback will be invoked when any watched directories change. The callback will be passed a sequence of Events as its only argument. The exact events passed to the callback are @@ -38,7 +54,11 @@ Note that, for any of those particular changes, there may also be additional events passed to the callback, such as events on a parent directory of a - changed file.")) + changed file. + + The types of events where the callback is triggered can be limited with the optional third argument. + The types argument should be the `set` of types of interest from: `create`, `modify`, `delete`, `unknown`. + The default is to specify all types.")) (defprotocol FilesystemWatchService (create-watcher [this] [this options] diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj index 74c30b8..0d8daa8 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj @@ -62,6 +62,8 @@ (IllegalArgumentException. (trs "Must pass a boolean value for :recursive (directory watching) option"))))) +(def all-types #{:create :modify :delete :unknown}) + (defrecord WatcherImpl [watch-service callbacks recursive] Watcher @@ -86,9 +88,21 @@ @recursive (:recursive options))))) (watch-protocol/add-watch-dir! this dir)) + (add-file-callback! + [this path-to-file callback] + (watch-protocol/add-file-callback! this path-to-file callback all-types)) + + (add-file-callback! + [_this path-to-file callback types] + (swap! callbacks conj {:callback callback :file path-to-file :types (set types)})) + + (add-callback! + [this callback] + (watch-protocol/add-callback! this callback all-types)) + (add-callback! - [_this callback] - (swap! callbacks conj callback))) + [_this callback types] + (swap! callbacks conj {:callback callback :types (set types)}))) (defn create-watcher ([] @@ -120,7 +134,7 @@ (map #(clojurize % (.watchable watch-key)) events))) (schema/defn retrieve-events :- [Event] - "Blocks until an event the watcher is concerned with has occured. Will then + "Blocks until an event the watcher is concerned with has occurred. Will then poll for a new event, waiting at least `window-min` for a new event to occur. Will continue polling for as long as there are new events that occur within `window-min`, or the `window-max` time limit has been exceeded." @@ -146,26 +160,38 @@ events)) initial-events))) +(defn filter-events + [callback-entry events] + (let [allowed-types (:types callback-entry) + event-filter (fn [event] (contains? allowed-types (:type event))) + ;; construct an optimal filter function based on the type of filtering needed + filter-fn (if (nil? (:file callback-entry)) + ;; check the type match only + event-filter + ;; check the file as well as the type match + (fn [event] (and (event-filter event) + (= (.toPath (:changed-path event)) (:file callback-entry)))))] + (filter filter-fn events))) + (schema/defn process-events! "Process for side effects any events that occurred for watcher's watch-key" [watcher :- (schema/protocol Watcher) events :- [Event]] (let [callbacks @(:callbacks watcher)] + ;; avoid doing a sequence walk if we aren't logging it ;; avoid doing a potentially expensive walk when we aren't logging at :debug (when (log/enabled? :debug) (let [events-by-dir (group-by :watched-path events)] (doseq [[dir events'] events-by-dir] (log/debug (trs "Got {0} event(s) in directory {1}" - (count events') dir))))) - - ;; avoid doing a potentially expensive print-to-string when we aren't logging at :trace + (count events') dir))))) + ;; avoid doing a format and print-to-string if we aren't logging it (when (log/enabled? :trace) (log/tracef "%s\n%s" (trs "Events:") (ks/pprint-to-string events))) - - (doseq [callback callbacks] - (callback events)))) + (doseq [callback-entry callbacks] + ((:callback callback-entry) (filter-events callback-entry events))))) (schema/defn watch! :- Future "Creates and returns a future. Processes events for the passed in watcher within the context of that future. diff --git a/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj b/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj index 3777d49..13e61d3 100644 --- a/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj +++ b/test/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service_test.clj @@ -1,16 +1,16 @@ (ns puppetlabs.trapperkeeper.services.watcher.filesystem-watch-service-test - (:require [clojure.test :refer [deftest testing is use-fixtures]] - [clojure.set :as set] - [schema.test :as schema-test] + (:require [clojure.set :as set] + [clojure.test :refer [deftest is testing use-fixtures]] [me.raynes.fs :as fs] - [puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [add-callback! add-watch-dir! create-watcher]] - [puppetlabs.trapperkeeper.services.watcher.filesystem-watch-service :refer [filesystem-watch-service]] + [puppetlabs.trapperkeeper.app :as tk-app] + [puppetlabs.trapperkeeper.core :as tk] + [puppetlabs.trapperkeeper.internal :as tk-internal] + [puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [add-callback! add-file-callback! add-watch-dir! create-watcher]] [puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core :as watch-core] + [puppetlabs.trapperkeeper.services.watcher.filesystem-watch-service :refer [filesystem-watch-service]] [puppetlabs.trapperkeeper.testutils.bootstrap :refer [with-app-with-config]] [puppetlabs.trapperkeeper.testutils.logging :refer [with-test-logging]] - [puppetlabs.trapperkeeper.core :as tk] - [puppetlabs.trapperkeeper.app :as tk-app] - [puppetlabs.trapperkeeper.internal :as tk-internal]) + [schema.test :as schema-test]) (:import (com.puppetlabs DirWatchUtils) (java.io File) (java.net URI) @@ -26,14 +26,13 @@ (defn contains-events? [dest events] (let [select-keys-set (set (map #(select-keys % [:changed-path :type]) @dest))] - (set/subset? events select-keys-set))) + (set/subset? events select-keys-set))) (defn exactly-matches-event? [dest expected-event] (let [select-keys-set (set (map #(select-keys % [:changed-path :type]) @dest))] (= expected-event select-keys-set))) - (def wait-time "Number of milliseconds wait-for-events! should wait." (* 10 1000)) ;; 10 seconds @@ -41,17 +40,20 @@ (defn wait-for-events "Waits for events to land in dest. If they do, events is returned. Gives up and returns the contents of dest after wait-time." - [dest events] - (let [start-time (System/currentTimeMillis)] - (loop [] - (let [elapsed-time (- (System/currentTimeMillis) start-time)] - (if (contains-events? dest events) - events - (if (< elapsed-time wait-time) - (recur) - (do - (println "timed-out waiting for events") - @dest))))))) + ([dest events] + (wait-for-events dest events wait-time)) + ([dest events timeout-ms] + (let [start-time (System/currentTimeMillis)] + (loop [] + (let [elapsed-time (- (System/currentTimeMillis) start-time)] + (if (contains-events? dest events) + events + (if (< elapsed-time timeout-ms) + (do (Thread/sleep 100) + (recur)) + (do + (println "timed-out waiting for events") + @dest)))))))) (defn wait-for-exactly-event "Waits for event to land in dest. On first registered event or events in dest, @@ -71,14 +73,16 @@ @dest)))))) (defn watch!* - [watcher root callback] + [watcher root callback types] (add-watch-dir! watcher root {:recursive true}) - (add-callback! watcher callback)) + (add-callback! watcher callback types)) ;; TODO perhaps move this (or something similar) up to the TK protocol (defn watch! - [service root callback] - (watch!* (create-watcher service) root callback)) + ([service root callback] + (watch! service root callback [:create :modify :delete :unknown])) + ([service root callback types] + (watch!* (create-watcher service) root callback types))) (deftest ^:integration single-path-test (let [root (fs/temp-dir "single-path-test") @@ -151,6 +155,190 @@ :type :create}}] (is (= events (wait-for-events results events)))))))))) +(deftest ^:integration one-event-type + (let [root (fs/temp-dir "single-path-test") + first-file (fs/file root "first-file") + second-file (fs/file root "second-file") + results (atom []) + callback (make-callback results)] + (with-app-with-config + app [filesystem-watch-service] {} + (let [service (tk-app/get-service app :FilesystemWatchService)] + (watch! service root callback [:modify])) + (testing "callback not invoked until directory changes" + (is (= @results []))) + (testing "callback not invoked when a new file is created" + (spit first-file "foo") + (Thread/sleep 1500) + (is (empty? @results))) + (testing "callback not invoked when another new file is created" + (reset! results []) + (spit second-file "bar") + (Thread/sleep 1500) + (is (empty? @results))) + (testing "reports file modifications" + (testing "of a single file" + (reset! results []) + (spit first-file "something different") + (let [events #{{:changed-path first-file + :type :modify}}] + (is (= events (wait-for-events results events))))) + (testing "of multiple files" + (reset! results []) + (spit first-file "still not the same as before") + (spit second-file "still not the same as before") + (let [events #{{:changed-path first-file + :type :modify} + {:changed-path second-file + :type :modify}}] + (is (= events (wait-for-events results events)))))) + (testing "watch-dir! does not report file deletions" + (testing "of multiple files" + (reset! results []) + (is (fs/delete first-file)) + (is (fs/delete second-file)) + (let [events #{}] + (is (= events (wait-for-events results events)))))) + (testing "re-creation of a deleted directory" + (reset! results []) + (let [sub-dir (fs/file root "sub-dir")] + (testing "Initial directory creation and deletion" + (is (fs/mkdir sub-dir)) + (Thread/sleep 1500) + (is (empty? @results)) + (is (fs/delete sub-dir)) + (Thread/sleep 1500) + (is (empty? @results))) + (testing "Re-creating the directory does not create an event as expected" + (reset! results []) + (is (fs/mkdir sub-dir)) + (Thread/sleep 1500) + (is (empty? @results)))))))) + +(deftest ^:integration one-file-watch + (let [root (fs/temp-dir "single-path-test") + first-file (fs/file root "first-file") + second-file (fs/file root "second-file") + results (atom []) + callback (make-callback results)] + (with-app-with-config + app [filesystem-watch-service] {} + (let [service (tk-app/get-service app :FilesystemWatchService) + watcher (create-watcher service)] + (add-watch-dir! watcher root) + ;; watch only the first path + (add-file-callback! watcher (.toPath first-file) callback)) + (testing "callback not invoked until directory changes" + (is (= @results []))) + (testing "callback invoked when a new file is created" + (spit first-file "foo") + (let [events #{{:changed-path first-file + :type :create}}] + (is (= events (wait-for-events results events))))) + (testing "callback not invoked when another new file is created" + (reset! results []) + (spit second-file "bar") + (Thread/sleep 1500) + (is (empty? @results))) + (testing "reports file modifications" + (testing "of a single file" + (reset! results []) + (spit first-file "something different") + (let [events #{{:changed-path first-file + :type :modify}}] + (is (= events (wait-for-events results events))))) + (testing "of single file when multiple files are changed" + (reset! results []) + (spit first-file "still not the same as before") + (spit second-file "still not the same as before") + (let [events #{{:changed-path first-file + :type :modify}}] + (is (= events (wait-for-events results events)))))) + (testing "watch-dir! reports file deletion" + (testing "of single file when multiples are deleted" + (reset! results []) + (is (fs/delete first-file)) + (is (fs/delete second-file)) + (let [events [{:changed-path first-file + :type :delete}]] + (is (= events (wait-for-events results events)))))) + (testing "re-creation of a deleted directory" + (reset! results []) + (let [sub-dir (fs/file root "sub-dir")] + (testing "Initial directory creation and deletion" + (is (fs/mkdir sub-dir)) + (Thread/sleep 1500) + (is (empty? @results)) + (is (fs/delete sub-dir)) + (Thread/sleep 1500) + (is (empty? @results))) + (testing "Re-creating the directory does not create an event as expected" + (reset! results []) + (is (fs/mkdir sub-dir)) + (Thread/sleep 1500) + (is (empty? @results)))))))) + +(deftest ^:integration one-file-watch-single-event + (let [root (fs/temp-dir "single-path-test") + first-file (fs/file root "first-file") + second-file (fs/file root "second-file") + results (atom []) + callback (make-callback results)] + (with-app-with-config + app [filesystem-watch-service] {} + (let [service (tk-app/get-service app :FilesystemWatchService) + watcher (create-watcher service)] + (add-watch-dir! watcher root) + ;; watch only the first path + (add-file-callback! watcher (.toPath first-file) callback #{:modify})) + (testing "callback not invoked until directory changes" + (is (= @results []))) + (testing "callback not invoked when a new file is created" + (spit first-file "foo") + (Thread/sleep 1500) + (is (empty? @results))) + (testing "callback not invoked when another new file is created" + (reset! results []) + (spit second-file "bar") + (Thread/sleep 1500) + (is (empty? @results))) + (testing "reports file modifications" + (testing "of a single file" + (reset! results []) + (spit first-file "something different") + (let [events #{{:changed-path first-file + :type :modify}}] + (is (= events (wait-for-events results events))))) + (testing "of single file when multiple files are changed" + (reset! results []) + (spit first-file "still not the same as before") + (spit second-file "still not the same as before") + (let [events #{{:changed-path first-file + :type :modify}}] + (is (= events (wait-for-events results events)))))) + (testing "watch-dir! does not report file deletion" + (testing "of single file when multiples are deleted" + (reset! results []) + (is (fs/delete first-file)) + (is (fs/delete second-file)) + (Thread/sleep 1500) + (is (empty? @results)))) + (testing "re-creation of a deleted directory" + (reset! results []) + (let [sub-dir (fs/file root "sub-dir")] + (testing "Initial directory creation and deletion" + (is (fs/mkdir sub-dir)) + (Thread/sleep 1500) + (is (empty? @results)) + (is (fs/delete sub-dir)) + (Thread/sleep 1500) + (is (empty? @results))) + (testing "Re-creating the directory does not create an event as expected" + (reset! results []) + (is (fs/mkdir sub-dir)) + (Thread/sleep 1500) + (is (empty? @results)))))))) + (deftest ^:integration multi-callbacks-test (let [root-1 (fs/temp-dir "multi-root-test-1") root-2 (fs/temp-dir "multi-root-test-2") @@ -464,8 +652,8 @@ test-file-1 (fs/file dir-1 "test-file") test-file-2 (fs/file dir-2 "test-file")] (testing "Watching separate directories" - (watch!* watcher-1 dir-1 callback-1) - (watch!* watcher-2 dir-2 callback-2) + (watch!* watcher-1 dir-1 callback-1 [:create :modify :delete :unknown]) + (watch!* watcher-2 dir-2 callback-2 [:create :modify :delete :unknown]) (testing "Events do not bleed over between watchers" (spit test-file-1 "foo") (spit test-file-2 "foo") @@ -483,7 +671,7 @@ results-3 (atom []) callback-3 (make-callback results-3)] ;; ... and tell it to watch the same directory as the first one. - (watch!* watcher-3 dir-1 callback-3) + (watch!* watcher-3 dir-1 callback-3 [:create :modify :delete :unknown]) (spit test-file-1 "bar") (spit test-file-2 "bar") (let [events-1 #{{:changed-path test-file-1