Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(maint) logging level optimization, streamline shutdown #30

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
(:import (clojure.lang IFn)
(java.io File)
(java.nio.file StandardWatchEventKinds Path WatchEvent WatchKey FileSystems ClosedWatchServiceException WatchService)
(com.puppetlabs DirWatchUtils)))
(com.puppetlabs DirWatchUtils)
(java.util.concurrent Future)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Constants
Expand Down Expand Up @@ -145,39 +146,47 @@
events))
initial-events)))


(schema/defn process-events!
"Process for side-effects any events that occured for watcher's watch-key"
"Process for side effects any events that occurred for watcher's watch-key"
[watcher :- (schema/protocol Watcher)
events :- [Event]]
(let [callbacks @(:callbacks watcher)
events-by-dir (group-by :watched-path events)]
(let [callbacks @(:callbacks watcher)]
;; avoid doing a potentially expensive walk when we aren't logging at :debug
(when (log/enabled? :debug)
(doseq [[dir events'] events-by-dir]
(log/debug (trs "Got {0} event(s) in directory {1}"
(count events') dir))))
(let [events-by-dir (group-by :watched-path events)]
(doseq [[dir events'] events-by-dir]
(log/debug (trs "Got {0} event(s) in directory {1}"
(count events') dir)))))

;; avoid doing a potentially expensive print-to-string when we aren't logging at :trace
(when (log/enabled? :trace)
(log/tracef "%s\n%s"
(trs "Events:")
(ks/pprint-to-string events)))

(doseq [callback callbacks]
(callback events))))

(schema/defn watch!
"Creates a future and processes events for the passed in watcher.
The future will continue until the underlying WatchService is closed."
(schema/defn watch! :- Future
"Creates and returns a future. Processes events for the passed in watcher within the context of that future.
The future will continue until the underlying WatchService is closed, or the future is interrupted."
[watcher :- (schema/protocol Watcher)
shutdown-fn :- IFn]
shutdown-fn :- IFn
stopped? :- (schema/atom schema/Bool)]
(future
(let [stopped? (atom false)]
(shutdown-fn #(while (not @stopped?)
(try
(let [events (retrieve-events watcher)]
(when-not (empty? events)
(process-events! watcher events)))
(catch ClosedWatchServiceException _e
(reset! stopped? true)
(log/info (trs "Closing watcher {0}" watcher)))))))))
(shutdown-fn
#(while (not @stopped?)
(try
(let [events (retrieve-events watcher)]
(when-not (empty? events)
(process-events! watcher events)))
(catch ClosedWatchServiceException _e
(log/info (trs "Closing watcher {0}" watcher)))
;; it is possible for `retrieve-events` to generate an InterruptedException if the `.take` occurs when an
;; interrupt is requested. This is explicitly handled to prevent shutting down the whole application.
(catch InterruptedException _e
(log/info (trs "Watching for events interrupted by thread shutdown")))
(catch Throwable e
(log/error e (trs "Fatal error while watching for events"))
(throw e)))))))

Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,47 @@
(:require [clojure.tools.logging :as log]
[puppetlabs.i18n.core :refer [trs]]
[puppetlabs.trapperkeeper.services :as tk]
[puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]]
[puppetlabs.trapperkeeper.services.protocols.filesystem-watch-service :refer [FilesystemWatchService create-watcher]]
[puppetlabs.trapperkeeper.services.watcher.filesystem-watch-core :as watch-core])
(:import (java.io IOException)
(java.nio.file WatchService)))
(:import (java.nio.file WatchService)))

(def max-future-cancel-wait-tries 100)
(def future-cancel-wait-sleep-ms 10)

(tk/defservice filesystem-watch-service
FilesystemWatchService
[[:ShutdownService shutdown-on-error]]

(init
[this context]
(assoc context :watchers (atom [])))
(assoc context :watchers (atom [])
:watchers-futures (atom [])
:stopping? (atom false)))

(stop
[this context]
(log/info (trs "Shutting down watcher service"))
(reset! (:stopping? context) true)
;; Shut down the WatchServices
(doseq [watcher @(:watchers context)]
(try
(.close ^WatchService (:watch-service watcher))
(catch IOException e
(catch Throwable e
(log/warn e (trs "Exception while closing watch service")))))

(doseq [watchers-future @(:watchers-futures context)]
(try
(future-cancel watchers-future)
(loop [tries max-future-cancel-wait-tries]
(if (and (pos? tries) (not (future-done? watchers-future)))
(do
(Thread/sleep future-cancel-wait-sleep-ms)
(recur (dec tries)))
(log/debug (trs "Future completed after {0} tries" (- max-future-cancel-wait-tries tries)))))
(catch Throwable e
(log/warn e (trs "Exception while closing watch service")))))

(log/info (trs "Done shutting down watcher service"))
context)

(create-watcher
Expand All @@ -31,9 +51,10 @@

(create-watcher
[this options]
(let [{:keys [watchers]} (tk/service-context this)
(let [{:keys [watchers watchers-futures stopping?]} (tk/service-context this)
watcher (watch-core/create-watcher options)
shutdown-fn (partial shutdown-on-error (tk/service-id this))]
(watch-core/watch! watcher shutdown-fn)
shutdown-fn (partial shutdown-on-error (tk/service-id this))
watch-future (watch-core/watch! watcher shutdown-fn stopping?)]
(swap! watchers conj watcher)
(swap! watchers-futures conj watch-future)
watcher)))
Loading