Skip to content

Commit

Permalink
(maint) implement more proactive shutdown behaviors
Browse files Browse the repository at this point in the history
This does several things:
* creates a shutdown? boolean that indicates if the service is being shut down.
* Collects all the futures created for background tasks to explicitly ask them to cancel at shutdown
* Adds a "Thread/isInterrupted" check to terminate if the thread has been asked to cancel and the watcher has not shut down properly
* Adds logging to all Throwable exceptions before rethrowing them.
  • Loading branch information
jonathannewman committed May 15, 2024
1 parent 8823aa8 commit 05f28aa
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,19 @@
"Creates a future and processes events for the passed in watcher.
The future will continue until the underlying WatchService is closed."
[watcher :- (schema/protocol Watcher)
shutdown-fn :- IFn]
shutdown-fn :- IFn
stopped? :- (schema/atom schema/Bool)]
(future
(let [stopped? (atom false)]
(shutdown-fn #(while (not @stopped?)
(try
(let [events (retrieve-events watcher)]
(when-not (empty? events)
(process-events! watcher events)))
(catch ClosedWatchServiceException _e
(reset! stopped? true)
(log/info (trs "Closing watcher {0}" watcher)))))))))
(let [current-thread (Thread/currentThread)]
(shutdown-fn
#(while (and (not @stopped?) (not (.isInterrupted current-thread)))
(try
(let [events (retrieve-events watcher)]
(when-not (empty? events)
(process-events! watcher events)))
(catch ClosedWatchServiceException _e
(log/info (trs "Closing watcher {0}" watcher)))
(catch Throwable e
(log/error e (trs "Fatal error while watching directories"))
(throw e))))))))

Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,44 @@
(:require [clojure.tools.logging :as log]
[puppetlabs.i18n.core :refer [trs]]
[puppetlabs.trapperkeeper.services :as tk]
[puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]]
[puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]]
[puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core :as watch-core])
(:import (java.io IOException)
(java.nio.file WatchService)))
(:import (java.nio.file WatchService)))

(tk/defservice filesystem-watch-service
FilesystemWatchService
[[:ShutdownService shutdown-on-error]]

(init
[this context]
(assoc context :watchers (atom [])))
(assoc context :watchers (atom [])
:watchers-futures (atom [])
:stopping? (atom false)))

(stop
[this context]
(log/info (trs "Shutting down watcher service"))
(reset! (:stopping? context) true)
;; Shut down the WatchServices
(doseq [watcher @(:watchers context)]
(try
(.close ^WatchService (:watch-service watcher))
(catch IOException e
(catch Throwable e
(log/warn e (trs "Exception while closing watch service")))))

(doseq [watchers-future @(:watchers-futures context)]
(try
(future-cancel watchers-future)
(loop [tries 100]
(if (and (pos? tries) (not (future-done? watchers-future)))
(do
(Thread/sleep 10)
(recur (dec tries)))
(log/debug (trs "Future completed with {0} tries left" tries))))
(catch Throwable e
(log/warn e (trs "Exception while closing watch service")))))

(log/info (trs "Done shutting down watcher service"))
context)

(create-watcher
Expand All @@ -31,9 +48,10 @@

(create-watcher
[this options]
(let [{:keys [watchers]} (tk/service-context this)
(let [{:keys [watchers watchers-futures stopping?]} (tk/service-context this)
watcher (watch-core/create-watcher options)
shutdown-fn (partial shutdown-on-error (tk/service-id this))]
(watch-core/watch! watcher shutdown-fn)
shutdown-fn (partial shutdown-on-error (tk/service-id this))
watch-future (watch-core/watch! watcher shutdown-fn stopping?)]
(swap! watchers conj watcher)
(swap! watchers-futures conj watch-future)
watcher)))

0 comments on commit 05f28aa

Please sign in to comment.