From 02617361c5ae854bb3468287dc8bdecbe2feb560 Mon Sep 17 00:00:00 2001 From: Jason Peacock Date: Mon, 12 Aug 2024 13:53:32 -0500 Subject: [PATCH] Implement an API to retrieve a single device config. The CollectorDaemon is also modified to have an optimized run path when collecting a CLI specified device. ZEN-34895 --- Products/ZenCollector/config/__init__.py | 9 +- Products/ZenCollector/config/task.py | 46 ++- Products/ZenCollector/daemon.py | 297 ++++++++++-------- Products/ZenCollector/interfaces.py | 2 +- Products/ZenCollector/scheduler/scheduler.py | 14 +- Products/ZenCollector/scheduler/task.py | 53 ++-- Products/ZenCollector/services/ConfigCache.py | 37 ++- 7 files changed, 287 insertions(+), 171 deletions(-) diff --git a/Products/ZenCollector/config/__init__.py b/Products/ZenCollector/config/__init__.py index 04257133d5..5f629923b3 100644 --- a/Products/ZenCollector/config/__init__.py +++ b/Products/ZenCollector/config/__init__.py @@ -8,10 +8,15 @@ ############################################################################## from .proxy import ConfigurationProxy -from .task import ConfigurationLoaderTask, DeviceConfigLoader +from .task import ( + ConfigurationLoaderTask, + ManyDeviceConfigLoader, + SingleDeviceConfigLoader, +) __all__ = ( "ConfigurationLoaderTask", "ConfigurationProxy", - "DeviceConfigLoader", + "ManyDeviceConfigLoader", + "SingleDeviceConfigLoader", ) diff --git a/Products/ZenCollector/config/task.py b/Products/ZenCollector/config/task.py index afefeac0d0..dfe179b067 100644 --- a/Products/ZenCollector/config/task.py +++ b/Products/ZenCollector/config/task.py @@ -7,7 +7,6 @@ # ############################################################################## -import itertools import logging import time @@ -66,7 +65,39 @@ def _processThresholds(self, thresholds): self._collector._configureThresholds(thresholds) -class DeviceConfigLoader(object): +class SingleDeviceConfigLoader(object): + """Handles retrieving the config of a single device.""" + + def __init__(self, deviceid, collector, service, options, callback): + self._deviceId = deviceid + self._collector = collector + self._service = service + self._options = options + self._callback = callback + + @property + def deviceIds(self): + return [self._deviceId] + + @defer.inlineCallbacks + def __call__(self): + try: + ref = yield self._collector.getRemoteConfigCacheProxy() + + log.debug("fetching device config for %s", self._deviceId) + # get options from prefs.options and send to remote + config = yield ref.callRemote( + "getDeviceConfig", + self._service, + self._deviceId, + options=self._options.__dict__, + ) + yield self._callback(config) + except Exception: + log.exception("failed to retrieve device configs") + + +class ManyDeviceConfigLoader(object): """Handles retrieving devices from the ConfigCache service.""" def __init__(self, proxy, callback): @@ -81,6 +112,7 @@ def deviceIds(self): @defer.inlineCallbacks def __call__(self): + log.debug("fetching device configs") try: next_time = time.time() config_data = yield self._proxy.getConfigProxies( @@ -104,16 +136,6 @@ def _processConfigs(self, config_data): except Exception: log.exception("failed to process device configs") - def _get_specified_config(self, new, updated): - return next( - ( - cfg - for cfg in itertools.chain(new, updated) - if self._options.device == cfg.configId - ), - None, - ) - def _update_local_cache(self, new, updated, removed): self._deviceIds.difference_update(removed) self._deviceIds.update(cfg.id for cfg in new) diff --git a/Products/ZenCollector/daemon.py b/Products/ZenCollector/daemon.py index 2708b863ea..67f76721ac 100644 --- a/Products/ZenCollector/daemon.py +++ b/Products/ZenCollector/daemon.py @@ -36,7 +36,11 @@ from Products.ZenUtils.observable import ObservableProxy from Products.ZenUtils.Utils import load_config -from .config import ConfigurationLoaderTask, DeviceConfigLoader +from .config import ( + ConfigurationLoaderTask, + ManyDeviceConfigLoader, + SingleDeviceConfigLoader, +) from .interfaces import ( ICollector, ICollectorPreferences, @@ -89,21 +93,11 @@ def __init__( ignored. :type stoppingCallback: any callable, optional """ - # Create the configuration first, so we have the collector name - # available before activating the rest of the Daemon class hierarchy. - if not ICollectorPreferences.providedBy(preferences): - raise TypeError("configuration must provide ICollectorPreferences") - if not ITaskSplitter.providedBy(taskSplitter): - raise TypeError("taskSplitter must provide ITaskSplitter") - if configurationListener is not None: - if not IConfigurationListener.providedBy(configurationListener): - raise TypeError( - "configurationListener must provide IConfigurationListener" - ) + _verify_input_args(preferences, taskSplitter, configurationListener) self._prefs = ObservableProxy(preferences) self._prefs.attachAttributeObserver( - "configCycleInterval", self._rescheduleConfig + "configCycleInterval", self._reschedule_configcycle ) self._taskSplitter = taskSplitter self._configListener = ConfigListenerNotifier() @@ -185,10 +179,6 @@ def __init__( # Let the configuration do any additional startup it might need self.preferences.postStartup() - # Set flag to limit what actions are run after the first run - # of the config loader task. - self.__first_config_task_run = True - # Variables used by enterprise collector in resmgr # # Flag that indicates we have finished loading the configs for the @@ -198,19 +188,50 @@ def __init__( # from zenhub self.encryptionKeyInitialized = False + # Initialize the object used for retrieving properties, thresholds, + # and other non-device configurations from ZenHub. self._configloader = ConfigurationLoaderTask(self, self._configProxy) - self._configloadertask = None - self._configloadertaskd = None - # Define _deviceloader to avoid race condition - # with task stats recording. + # Initialize the object used for retrieving device configurations. if self.options.device: - callback = self._singleDeviceConfigCallback + self._deviceloader = SingleDeviceConfigLoader( + self.options.device, + self, + self.preferences.configurationService, + self.options, + self._singleDeviceConfigCallback, + ) else: - callback = self._manyDeviceConfigCallback - self._deviceloader = DeviceConfigLoader(self._configProxy, callback) - self._deviceloadertask = None - self._deviceloadertaskd = None + self._deviceloader = ManyDeviceConfigLoader( + self._configProxy, self._manyDeviceConfigCallback + ) + + # If cycling is enabled, initialize the tasks that will run + # on an interval. + if self.options.cycle: + self._configcycle = _TaskCycle( + self._configloader, + self._config_update_interval, + self.log, + description="properties, thresholds, etc. retrieval", + now=False + ) + self._devicecycle = _TaskCycle( + self._deviceloader, + self._device_config_update_interval, + self.log, + description="device configuration retrieval", + ) + if self.options.logTaskStats: + self._taskstatscycle = _TaskCycle( + lambda: self._displayStatistics(verbose=True), + self.options.logTaskStats, + self.log, + description="task statistics logging", + now=False, + ) + else: + self._taskstatscycle = None # deprecated; kept for vSphere ZP compatibility self._devices = _DeviceIdProxy(self._deviceloader) @@ -309,9 +330,22 @@ def connected(self): # type: (Self) -> Deferred framework = _getFramework(self.frameworkFactoryName) self.log.debug("using framework factory %r", framework) yield self._initEncryptionKey() - yield self._startConfigCycle() - yield self._startMaintenance() - yield self._startTaskStatsLogging() + + # Initial configuration load + yield self._configloader() + + # Add "post startup" tasks provided by preferences + self._add_poststartuptasks() + + if self.options.cycle: + self._configcycle.start() + self._startMaintenance() + self._devicecycle.start() + if self._taskstatscycle is not None: + self._taskstatscycle.start() + else: + # Since we're going to run once, load the device config(s) now. + yield self._deviceloader() except Exception as ex: self.log.critical("unrecoverable error: %s", ex) self.log.exception("failed during startup") @@ -331,28 +365,28 @@ def _initEncryptionKey(self): # type: (Self) -> Deferred self.encryptionKeyInitialized = True self.log.debug("initialized encryption key") - def _startConfigCycle(self): - self._configloadertask = task.LoopingCall(self._configloader) - self._configloadertaskd = self._configloadertask.start( - self._config_update_interval - ) - reactor.addSystemEventTrigger( - "before", "shutdown", self._stop_configloadertask, "before" - ) - self.log.info( - "started receiving collector config changes interval=%d", - self._config_update_interval, + def _add_poststartuptasks(self): + post_startup_tasks = getattr( + self.preferences, "postStartupTasks", lambda: [] ) + for task_ in post_startup_tasks(): + self._scheduler.addTask(task_, now=True) - def _stop_configloadertask(self): - if self._configloadertask is None: + def _reschedule_configcycle( + self, observable, attrName, oldValue, newValue, **kwargs + ): + if not self.options.cycle: return - self._configloadertask.stop() - self._configloadertask = self._configloadertaskd = None + if oldValue == newValue: + return + self.log.info( + "changed configuration loader task interval from %s to %s minutes", + oldValue, + newValue, + ) + self._configcycle.interval = newValue * 60 def _startMaintenance(self): - if not self.options.cycle: - return interval = self.preferences.cycleInterval if self.worker_id == 0: @@ -369,42 +403,6 @@ def _startMaintenance(self): ) self._maintenanceCycle.start() - def _startDeviceConfigLoader(self): - self._deviceloadertask = task.LoopingCall(self._deviceloader) - self._deviceloadertaskd = self._deviceloadertask.start( - self._device_config_update_interval - ) - reactor.addSystemEventTrigger( - "before", "shutdown", self._stop_deviceloadertask, "before" - ) - self.log.info( - "started receiving device config changes interval=%d", - self._device_config_update_interval, - ) - - def _stop_deviceloadertask(self): - if self._deviceloadertask is None: - return - self._deviceloadertask.stop() - self._deviceloadertask = self._deviceloadertaskd = None - - def _startTaskStatsLogging(self): - if not (self.options.cycle and self.options.logTaskStats): - return - self._taskstatslogger = task.LoopingCall( - self._displayStatistics, verbose=True - ) - self._taskstatsloggerd = self._taskstatslogger.start( - self.options.logTaskStats, now=False - ) - reactor.addSystemEventTrigger( - "before", "shutdown", self._taskstatslogger.stop, "before" - ) - self.log.info( - "started logging task statistics interval=%d", - self.options.logTaskStats, - ) - @defer.inlineCallbacks def getRemoteConfigCacheProxy(self): """Return the remote configuration cache proxy.""" @@ -606,23 +604,6 @@ def stop(self, ignored=""): self.log.exception("exception while stopping daemon") super(CollectorDaemon, self).stop(ignored) - def _rescheduleConfig( - self, observable, attrName, oldValue, newValue, **kwargs - ): - """ - Delete and re-add the configuration tasks to start on new interval. - """ - if oldValue == newValue: - return - self._config_update_interval = newValue * 60 - self._stop_configloadertask() - self.log.info( - "changing collector config task interval from %s to %s minutes", - oldValue, - newValue, - ) - self._startConfigCycle() - def _taskCompleteCallback(self, taskName): # if we're not running a normal daemon cycle then we need to shutdown # once all of our pending tasks have completed @@ -636,18 +617,16 @@ def _taskCompleteCallback(self, taskName): # if all pending tasks have been completed then shutdown the daemon if len(self._pendingTasks) == 0: - self._displayStatistics() + self.log.info( + "completed collection tasks count=%s", + self._completedTasks, + ) self.stop() - def _singleDeviceConfigCallback(self, new, updated, removed): - # type: ( - # Self, - # Sequence[DeviceProxy], - # Sequence[DeviceProxy], - # Sequence[str] - # ) -> None + def _singleDeviceConfigCallback(self, config): + # type: (Self, DeviceProxy) -> None """ - Update the device configs for the devices this collector manages + Update the device config for the device this collector manages when a device is specified on the command line. :param new: a list of new device configurations @@ -657,14 +636,6 @@ def _singleDeviceConfigCallback(self, new, updated, removed): :param removed: ignored :type removed: Sequence[str] """ - config = next( - ( - cfg - for cfg in itertools.chain(new, updated) - if self.options.device == cfg.id - ), - None, - ) if not config: self.log.error( "configuration for %s unavailable -- " @@ -887,14 +858,6 @@ def runPostConfigTasks(self): """ Add post-startup tasks from the preferences. """ - if self.__first_config_task_run: - postStartupTasks = getattr( - self.preferences, "postStartupTasks", lambda: [] - ) - for _task in postStartupTasks(): - self._scheduler.addTask(_task, now=True) - self._startDeviceConfigLoader() - self.__first_config_task_run = False def postStatisticsImpl(self): self._displayStatistics() @@ -959,6 +922,18 @@ def worker_id(self): return getattr(self.options, "workerid", 0) +def _verify_input_args(prefs, tasksplitter, configlistener): + if not ICollectorPreferences.providedBy(prefs): + raise TypeError("configuration must provide ICollectorPreferences") + if not ITaskSplitter.providedBy(tasksplitter): + raise TypeError("taskSplitter must provide ITaskSplitter") + if configlistener is not None: + if not IConfigurationListener.providedBy(configlistener): + raise TypeError( + "configurationListener must provide IConfigurationListener" + ) + + @attr.s(frozen=True, slots=True) class _DeviceConfigSizes(object): new = attr.ib(converter=len) @@ -966,7 +941,7 @@ class _DeviceConfigSizes(object): removed = attr.ib(converter=len) def __nonzero__(self): - return (self.new, self.updated, self.removed) != (0,0,0) + return (self.new, self.updated, self.removed) != (0, 0, 0) class _DeviceIdProxy(object): @@ -988,6 +963,78 @@ def discard(self, deviceId): pass +class _TaskCycle(object): + """ + Invoke a callable object at a regular interval. + """ + + def __init__(self, func, interval, log, description=None, now=True): + self._log = log + self._func = func + self._interval = interval + self._now = now + if description: + self._desc = description + elif hasattr(func, "im_func"): + self._desc = func.im_func.func_name + else: + self._desc = func.__class__.__name__ + self._loop = None + self._loopd = None + self._triggerid = None + + @property + def interval(self): + return self._interval + + @interval.setter + def interval(self, value): + if value == self._interval: + return + self._interval = value + self._reschedule() + + def start(self): + if self._loop is not None: + return + self._loop = task.LoopingCall(self._func) + self._loopd = self._loop.start(self._interval, now=self._now) + self._triggerid = reactor.addSystemEventTrigger( + "before", "shutdown", self.stop + ) + self._log.info( + "started %s task interval=%d now=%s", + self._desc, + self._interval, + self._now, + ) + self._loopd.addCallback(self._logstopped) + self._loopd.addErrback(self._logerror) + + def stop(self): + if self._loop is None: + return + self._loop.stop() + self._loop = self._loopd = None + + def _logstopped(self, *args, **kw): + self._log.info("stopped %s task", self._desc) + + def _logerror(self, result): + self._log.error( + "task did not run func=%s error=%s", self._func, result + ) + + def _reschedule(self): + if self._loop is None: + # cycle is not running, so nothing to reschedule + return + self.stop() + reactor.removeSystemEventTrigger(self._triggerid) + self._triggerid = None + self.start() + + def _always_ok(*args): return True diff --git a/Products/ZenCollector/interfaces.py b/Products/ZenCollector/interfaces.py index 3adceb2132..3930057da5 100644 --- a/Products/ZenCollector/interfaces.py +++ b/Products/ZenCollector/interfaces.py @@ -137,7 +137,7 @@ def getThresholds(): @rtype: an iterable set of threshold definitions """ - def getConfigProxies(configIds=[]): + def getConfigProxies(configIds=None): """ Called by the framework whenever the configuration for this collector should be retrieved. diff --git a/Products/ZenCollector/scheduler/scheduler.py b/Products/ZenCollector/scheduler/scheduler.py index 89bfe8961a..1dd5a662ac 100644 --- a/Products/ZenCollector/scheduler/scheduler.py +++ b/Products/ZenCollector/scheduler/scheduler.py @@ -168,10 +168,10 @@ def shutdown(self, phase): continue stopOrder = getattr(task, "stopOrder", 0) queue = stopQ.setdefault(stopOrder, []) - queue.append((taskName, taskWrapper, task)) + queue.append((taskName, taskWrapper)) for stopOrder in sorted(stopQ): - for taskName, taskWrapper, task in stopQ[stopOrder]: + for taskName, taskWrapper in stopQ[stopOrder]: loopTask = self._loopingCalls[taskName] if loopTask.running: log.debug("Stopping running task %s", taskName) @@ -198,7 +198,7 @@ def _startTask(self, task, delayed, attempts=0): return if task.name in self._tasksToCleanup: - delay = random.randint(0, int(task.interval / 2)) + delay = random.randint(0, int(task.interval / 2)) # noqa: S311 delayed = delayed + delay if attempts > TaskScheduler.ATTEMPTS: del self._tasksToCleanup[task.name] @@ -251,7 +251,7 @@ def _getStartDelay(self, task): time. """ # simple delay of random number between 0 and half the task interval - delay = random.randint(0, int(task.interval / 2)) + delay = random.randint(0, int(task.interval / 2)) # noqa: S311 return delay def taskAdded(self, taskWrapper): @@ -299,7 +299,7 @@ def getTasksForConfig(self, configId): Get all tasks associated with the specified identifier. """ tasks = [] - for taskName, taskWrapper in self._tasks.iteritems(): + for taskWrapper in self._tasks.itervalues(): task = taskWrapper.task if task.configId == configId: tasks.append(task) @@ -494,8 +494,8 @@ def displayStatistics(self, verbose): totalRuns, totalFailedRuns, totalMissedRuns, - self.executor.queued, - self.executor.running, + self._executor.queued, + self._executor.running, ) if self._displaycounts != counts: self._displaycounts = counts diff --git a/Products/ZenCollector/scheduler/task.py b/Products/ZenCollector/scheduler/task.py index ee80337f3e..bac73eccbd 100644 --- a/Products/ZenCollector/scheduler/task.py +++ b/Products/ZenCollector/scheduler/task.py @@ -52,23 +52,8 @@ def running(self): """ Called whenever this task is being run. """ - try: - if hasattr(self.task, "missed"): - self.task._eventService.sendEvent( - { - "eventClass": "/Perf/MissedRuns", - "component": os.path.basename(sys.argv[0]).replace( - ".py", "" - ), - }, - device=self.task._devId, - summary="Task `{}` is being run.".format(self.task.name), - severity=Event.Clear, - eventKey=self.task.name, - ) - del self.task.missed - except Exception: - pass + if hasattr(self.task, "missed"): + self._send_clear_event() self.taskStats.totalRuns += 1 def logTwistedTraceback(self, reason): @@ -93,11 +78,14 @@ def late(self): """ Called whenever this task is late and missed its scheduled run time. """ - try: - # some tasks we don't want to consider a missed run. - if getattr(self.task, "suppress_late", False): - return + # some tasks we don't want to consider a missed run. + if getattr(self.task, "suppress_late", False): + return + self._send_warning_event() + self.taskStats.missedRuns += 1 + def _send_warning_event(self): + try: # send event only for missed runs on devices. self.task._eventService.sendEvent( { @@ -116,8 +104,27 @@ def late(self): ) self.task.missed = True except Exception: - pass - self.taskStats.missedRuns += 1 + if log.isEnabledFor(logging.DEBUG): + log.exception("unable to send /Perf/MissedRuns warning event") + + def _send_clear_event(self): + try: + self.task._eventService.sendEvent( + { + "eventClass": "/Perf/MissedRuns", + "component": os.path.basename(sys.argv[0]).replace( + ".py", "" + ), + }, + device=self.task._devId, + summary="Task `{}` is being run.".format(self.task.name), + severity=Event.Clear, + eventKey=self.task.name, + ) + del self.task.missed + except Exception: + if log.isEnabledFor(logging.DEBUG): + log.exception("unable to send /Perf/MissedRuns clear event") def __call__(self): if self.task.state is TaskStates.STATE_PAUSED and not self.paused: diff --git a/Products/ZenCollector/services/ConfigCache.py b/Products/ZenCollector/services/ConfigCache.py index 02005cd13d..8ee50e8ae5 100644 --- a/Products/ZenCollector/services/ConfigCache.py +++ b/Products/ZenCollector/services/ConfigCache.py @@ -11,7 +11,7 @@ from zope.component import createObject -from Products.ZenCollector.configcache.cache import DeviceQuery +from Products.ZenCollector.configcache.cache import DeviceKey, DeviceQuery from Products.ZenHub.errors import translateError from Products.ZenHub.HubService import HubService from Products.ZenUtils.RedisUtils import getRedisClient, getRedisUrl @@ -132,6 +132,41 @@ def remote_getDeviceConfigs( "removed": list(removed), } + @translateError + def remote_getDeviceConfig(self, servicename, deviceid, options=None): + """ + Returns the configuration for the requested device or None. + + If the device does not exist or if the device is filtered out for + whatever reason, None is returned. + + Otherwise, the configuration for the device is returned. + + @param servicename: Name of the configuration service. + @type servicename: str + @param when: When the last set of devices was returned. + @type when: datetime.datetime + @param deviceid: Name of the device. + @type deviceid: str + @rtype: DeviceProxy | None + """ + self.log.info( + "[ConfigCache] getDeviceConfig(%r, %r, %r)", + servicename, + deviceid, + options, + ) + predicate = getOptionsFilter(options) + key = DeviceKey( + service=servicename, monitor=self.instance, device=deviceid + ) + filtered = tuple(self._filter([key], predicate)) + if len(filtered) == 0: + return None + if key not in self._stores.device: + return None + return self._stores.device.get(key).config + def remote_getOidMap(self, checksum): """ Returns the current OID map if its checksum doesn't match `checksum`.