diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj index b3e03df..d4411bb 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_core.clj @@ -170,15 +170,19 @@ "Creates a future and processes events for the passed in watcher. The future will continue until the underlying WatchService is closed." [watcher :- (schema/protocol Watcher) - shutdown-fn :- IFn] + shutdown-fn :- IFn + stopped? :- (schema/atom schema/Bool)] (future - (let [stopped? (atom false)] - (shutdown-fn #(while (not @stopped?) - (try - (let [events (retrieve-events watcher)] - (when-not (empty? events) - (process-events! watcher events))) - (catch ClosedWatchServiceException _e - (reset! stopped? true) - (log/info (trs "Closing watcher {0}" watcher))))))))) + (let [current-thread (Thread/currentThread)] + (shutdown-fn + #(while (and (not @stopped?) (not (.isInterrupted current-thread))) + (try + (let [events (retrieve-events watcher)] + (when-not (empty? events) + (process-events! watcher events))) + (catch ClosedWatchServiceException _e + (log/info (trs "Closing watcher {0}" watcher))) + (catch Throwable e + (log/error e (trs "Fatal error while watching directories")) + (throw e)))))))) diff --git a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj index b5e5c42..f6c20ed 100644 --- a/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj +++ b/src/clj/puppetlabs/trapperkeeper/services/watcher/filesystem_watch_service.clj @@ -2,10 +2,9 @@ (:require [clojure.tools.logging :as log] [puppetlabs.i18n.core :refer [trs]] [puppetlabs.trapperkeeper.services :as tk] - [puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]] + [puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]] [puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core :as watch-core]) - (:import (java.io IOException) - (java.nio.file WatchService))) + (:import (java.nio.file WatchService))) (tk/defservice filesystem-watch-service FilesystemWatchService @@ -13,16 +12,34 @@ (init [this context] - (assoc context :watchers (atom []))) + (assoc context :watchers (atom []) + :watchers-futures (atom []) + :stopping? (atom false))) (stop [this context] + (log/info (trs "Shutting down watcher service")) + (reset! (:stopping? context) true) ;; Shut down the WatchServices (doseq [watcher @(:watchers context)] (try (.close ^WatchService (:watch-service watcher)) - (catch IOException e + (catch Throwable e (log/warn e (trs "Exception while closing watch service"))))) + + (doseq [watchers-future @(:watchers-futures context)] + (try + (future-cancel watchers-future) + (loop [tries 100] + (if (and (pos? tries) (not (future-done? watchers-future))) + (do + (Thread/sleep 10) + (recur (dec tries))) + (log/debug (trs "Future completed with {0} tries left" tries)))) + (catch Throwable e + (log/warn e (trs "Exception while closing watch service"))))) + + (log/info (trs "Done shutting down watcher service")) context) (create-watcher @@ -31,9 +48,10 @@ (create-watcher [this options] - (let [{:keys [watchers]} (tk/service-context this) + (let [{:keys [watchers watchers-futures stopping?]} (tk/service-context this) watcher (watch-core/create-watcher options) - shutdown-fn (partial shutdown-on-error (tk/service-id this))] - (watch-core/watch! watcher shutdown-fn) + shutdown-fn (partial shutdown-on-error (tk/service-id this)) + watch-future (watch-core/watch! watcher shutdown-fn stopping?)] (swap! watchers conj watcher) + (swap! watchers-futures conj watch-future) watcher)))