diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj index b3e03df..e3147c4 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj @@ -8,7 +8,8 @@ (:import (clojure.lang IFn) (java.io File) (java.nio.file StandardWatchEventKinds Path WatchEvent WatchKey FileSystems ClosedWatchServiceException WatchService) - (com.puppetlabs DirWatchUtils))) + (com.puppetlabs DirWatchUtils) + (java.util.concurrent Future))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;; Constants @@ -167,18 +168,25 @@ (callback events)))) (schema/defn watch! - "Creates a future and processes events for the passed in watcher. - The future will continue until the underlying WatchService is closed." + "Creates and returns a future. Processes events for the passed in watcher within the context of that future. + The future will continue until the underlying WatchService is closed, or the future is interrupted." [watcher :- (schema/protocol Watcher) - shutdown-fn :- IFn] + shutdown-fn :- IFn + stopped? :- (schema/atom schema/Bool)] (future - (let [stopped? (atom false)] - (shutdown-fn #(while (not @stopped?) - (try - (let [events (retrieve-events watcher)] - (when-not (empty? events) - (process-events! watcher events))) - (catch ClosedWatchServiceException _e - (reset! stopped? true) - (log/info (trs "Closing watcher {0}" watcher))))))))) + (shutdown-fn + #(while (not @stopped?) + (try + (let [events (retrieve-events watcher)] + (when-not (empty? events) + (process-events! watcher events))) + (catch ClosedWatchServiceException _e + (log/info (trs "Closing watcher {0}" watcher))) + ;; it is possible for `retrieve-events` to generate an InterruptedException if the `.take` occurs when an + ;; interrupt is requested. This is explicitly handled to prevent shutting down the whole application. + (catch InterruptedException _e + (log/info (trs "Watching for events interrupted by thread shutdown"))) + (catch Throwable e + (log/error e (trs "Fatal error while watching for events")) + (throw e))))))) diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj index b5e5c42..a634eb3 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj @@ -2,10 +2,12 @@ (:require [clojure.tools.logging :as log] [puppetlabs.i18n.core :refer [trs]] [puppetlabs.trapperkeeper.services :as tk] - [puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]] + [puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]] [puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core :as watch-core]) - (:import (java.io IOException) - (java.nio.file WatchService))) + (:import (java.nio.file WatchService))) + +(def max-future-cancel-wait-tries 100) +(def future-cancel-wait-sleep-ms 10) (tk/defservice filesystem-watch-service FilesystemWatchService @@ -13,16 +15,34 @@ (init [this context] - (assoc context :watchers (atom []))) + (assoc context :watchers (atom []) + :watchers-futures (atom []) + :stopping? (atom false))) (stop [this context] + (log/info (trs "Shutting down watcher service")) + (reset! (:stopping? context) true) ;; Shut down the WatchServices (doseq [watcher @(:watchers context)] (try (.close ^WatchService (:watch-service watcher)) - (catch IOException e + (catch Throwable e (log/warn e (trs "Exception while closing watch service"))))) + + (doseq [watchers-future @(:watchers-futures context)] + (try + (future-cancel watchers-future) + (loop [tries max-future-cancel-wait-tries] + (if (and (pos? tries) (not (future-done? watchers-future))) + (do + (Thread/sleep future-cancel-wait-sleep-ms) + (recur (dec tries))) + (log/debug (trs "Future completed after {0} tries" (- max-future-cancel-wait-tries tries))))) + (catch Throwable e + (log/warn e (trs "Exception while closing watch service"))))) + + (log/info (trs "Done shutting down watcher service")) context) (create-watcher @@ -31,9 +51,10 @@ (create-watcher [this options] - (let [{:keys [watchers]} (tk/service-context this) + (let [{:keys [watchers watchers-futures stopping?]} (tk/service-context this) watcher (watch-core/create-watcher options) - shutdown-fn (partial shutdown-on-error (tk/service-id this))] - (watch-core/watch! watcher shutdown-fn) + shutdown-fn (partial shutdown-on-error (tk/service-id this)) + watch-future (watch-core/watch! watcher shutdown-fn stopping?)] (swap! watchers conj watcher) + (swap! watchers-futures conj watch-future) watcher)))