Skip to content

Commit

Permalink
Merge pull request #30 from jonathannewman/maint/main/streamline-shut…
Browse files Browse the repository at this point in the history
…down
  • Loading branch information
jonathannewman authored May 16, 2024
2 parents e1a5d74 + c780ac9 commit c24619f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
(:import (clojure.lang IFn)
(java.io File)
(java.nio.file StandardWatchEventKinds Path WatchEvent WatchKey FileSystems ClosedWatchServiceException WatchService)
(com.puppetlabs DirWatchUtils)))
(com.puppetlabs DirWatchUtils)
(java.util.concurrent Future)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Constants
Expand Down Expand Up @@ -145,39 +146,47 @@
events))
initial-events)))


(schema/defn process-events!
"Process for side-effects any events that occured for watcher's watch-key"
"Process for side effects any events that occurred for watcher's watch-key"
[watcher :- (schema/protocol Watcher)
events :- [Event]]
(let [callbacks @(:callbacks watcher)
events-by-dir (group-by :watched-path events)]
(let [callbacks @(:callbacks watcher)]
;; avoid doing a potentially expensive walk when we aren't logging at :debug
(when (log/enabled? :debug)
(doseq [[dir events'] events-by-dir]
(log/debug (trs "Got {0} event(s) in directory {1}"
(count events') dir))))
(let [events-by-dir (group-by :watched-path events)]
(doseq [[dir events'] events-by-dir]
(log/debug (trs "Got {0} event(s) in directory {1}"
(count events') dir)))))

;; avoid doing a potentially expensive print-to-string when we aren't logging at :trace
(when (log/enabled? :trace)
(log/tracef "%s\n%s"
(trs "Events:")
(ks/pprint-to-string events)))

(doseq [callback callbacks]
(callback events))))

(schema/defn watch!
"Creates a future and processes events for the passed in watcher.
The future will continue until the underlying WatchService is closed."
(schema/defn watch! :- Future
"Creates and returns a future. Processes events for the passed in watcher within the context of that future.
The future will continue until the underlying WatchService is closed, or the future is interrupted."
[watcher :- (schema/protocol Watcher)
shutdown-fn :- IFn]
shutdown-fn :- IFn
stopped? :- (schema/atom schema/Bool)]
(future
(let [stopped? (atom false)]
(shutdown-fn #(while (not @stopped?)
(try
(let [events (retrieve-events watcher)]
(when-not (empty? events)
(process-events! watcher events)))
(catch ClosedWatchServiceException _e
(reset! stopped? true)
(log/info (trs "Closing watcher {0}" watcher)))))))))
(shutdown-fn
#(while (not @stopped?)
(try
(let [events (retrieve-events watcher)]
(when-not (empty? events)
(process-events! watcher events)))
(catch ClosedWatchServiceException _e
(log/info (trs "Closing watcher {0}" watcher)))
;; it is possible for `retrieve-events` to generate an InterruptedException if the `.take` occurs when an
;; interrupt is requested. This is explicitly handled to prevent shutting down the whole application.
(catch InterruptedException _e
(log/info (trs "Watching for events interrupted by thread shutdown")))
(catch Throwable e
(log/error e (trs "Fatal error while watching for events"))
(throw e)))))))

Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,47 @@
(:require [clojure.tools.logging :as log]
[puppetlabs.i18n.core :refer [trs]]
[puppetlabs.trapperkeeper.services :as tk]
[puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]]
[puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]]
[puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core :as watch-core])
(:import (java.io IOException)
(java.nio.file WatchService)))
(:import (java.nio.file WatchService)))

(def max-future-cancel-wait-tries 100)
(def future-cancel-wait-sleep-ms 10)

(tk/defservice filesystem-watch-service
FilesystemWatchService
[[:ShutdownService shutdown-on-error]]

(init
[this context]
(assoc context :watchers (atom [])))
(assoc context :watchers (atom [])
:watchers-futures (atom [])
:stopping? (atom false)))

(stop
[this context]
(log/info (trs "Shutting down watcher service"))
(reset! (:stopping? context) true)
;; Shut down the WatchServices
(doseq [watcher @(:watchers context)]
(try
(.close ^WatchService (:watch-service watcher))
(catch IOException e
(catch Throwable e
(log/warn e (trs "Exception while closing watch service")))))

(doseq [watchers-future @(:watchers-futures context)]
(try
(future-cancel watchers-future)
(loop [tries max-future-cancel-wait-tries]
(if (and (pos? tries) (not (future-done? watchers-future)))
(do
(Thread/sleep future-cancel-wait-sleep-ms)
(recur (dec tries)))
(log/debug (trs "Future completed after {0} tries" (- max-future-cancel-wait-tries tries)))))
(catch Throwable e
(log/warn e (trs "Exception while closing watch service")))))

(log/info (trs "Done shutting down watcher service"))
context)

(create-watcher
Expand All @@ -31,9 +51,10 @@

(create-watcher
[this options]
(let [{:keys [watchers]} (tk/service-context this)
(let [{:keys [watchers watchers-futures stopping?]} (tk/service-context this)
watcher (watch-core/create-watcher options)
shutdown-fn (partial shutdown-on-error (tk/service-id this))]
(watch-core/watch! watcher shutdown-fn)
shutdown-fn (partial shutdown-on-error (tk/service-id this))
watch-future (watch-core/watch! watcher shutdown-fn stopping?)]
(swap! watchers conj watcher)
(swap! watchers-futures conj watch-future)
watcher)))

0 comments on commit c24619f

Please sign in to comment.