From 61dda984d731e5e8d2ccd56b4725a11f7a2cb675 Mon Sep 17 00:00:00 2001 From: Jason Peacock Date: Tue, 27 Aug 2024 09:55:30 -0500 Subject: [PATCH] Run the ConfigurationLoaderTask outside the scheduler. ZEN-34238 --- Products/ZenCollector/config/task.py | 114 +++++---------------------- Products/ZenCollector/daemon.py | 105 +++++++++++++----------- 2 files changed, 80 insertions(+), 139 deletions(-) diff --git a/Products/ZenCollector/config/task.py b/Products/ZenCollector/config/task.py index 4b731008da..afefeac0d0 100644 --- a/Products/ZenCollector/config/task.py +++ b/Products/ZenCollector/config/task.py @@ -7,124 +7,51 @@ # ############################################################################## -""" -The config module provides the implementation of the IConfigurationProxy -interface used within Zenoss Core. This implementation provides basic -configuration retrieval services directly from a remote ZenHub service. -""" - import itertools import logging import time -from metrology import Metrology from twisted.internet import defer -from zope.component import getUtility, queryUtility -from zope.interface import implementer - -from Products.ZenHub.PBDaemon import HubDown -from Products.ZenUtils.observable import ObservableMixin - -from ..tasks import TaskStates -from ..interfaces import ( - ICollector, - IDataService, - IEventService, - IScheduledTask, - IFrameworkFactory, -) log = logging.getLogger("zen.collector.config") -@implementer(IScheduledTask) -class ConfigurationLoaderTask(ObservableMixin): +class ConfigurationLoaderTask(object): """ Periodically retrieves collector configuration via the IConfigurationProxy service. """ - STATE_CONNECTING = "CONNECTING" - STATE_FETCH_MISC_CONFIG = "FETCHING_MISC_CONFIG" - STATE_FETCH_DEVICE_CONFIG = "FETCHING_DEVICE_CONFIG" - STATE_PROCESS_DEVICE_CONFIG = "PROCESSING_DEVICE_CONFIG" - - def __init__( - self, - name, - configId=None, - scheduleIntervalSeconds=None, - taskConfig=None, - ): - if taskConfig is None: - raise TypeError("taskConfig cannot be None") - - super(ConfigurationLoaderTask, self).__init__() - - self._fetchConfigTimer = Metrology.timer("collectordaemon.configs") - - # Needed for interface - self.name = name - self.configId = configId if configId else name - self.state = TaskStates.STATE_IDLE - - self._dataService = queryUtility(IDataService) - self._eventService = queryUtility(IEventService) - - self._prefs = taskConfig - self.interval = self._prefs.configCycleInterval * 60 - self.options = self._prefs.options - - self._collector = getUtility(ICollector) - self._collector.heartbeatTimeout = self.options.heartbeatTimeout - log.debug( - "heartbeat timeout set to %ds", self._collector.heartbeatTimeout - ) - - frameworkFactory = queryUtility( - IFrameworkFactory, self._collector.frameworkFactoryName - ) - self._configProxy = frameworkFactory.getConfigurationProxy() + # Deprecated attribute kept because zenvsphere uses it for reasons + # that are no longer relevant. + STATE_FETCH_DEVICE_CONFIG = "n/a" - self.startDelay = 0 + def __init__(self, collector, proxy): + self._collector = collector + self._proxy = proxy @defer.inlineCallbacks - def doTask(self): - """ - Contact zenhub and gather configuration data. - - @return: A task to gather configs - @rtype: Twisted deferred object - """ - log.debug("%s gathering configuration", self.name) - self.startTime = time.time() - - proxy = self._configProxy + def __call__(self): try: - propertyItems = yield proxy.getPropertyItems() - self._processPropertyItems(propertyItems) + properties = yield self._proxy.getPropertyItems() + self._processPropertyItems(properties) - thresholdClasses = yield proxy.getThresholdClasses() + thresholdClasses = yield self._proxy.getThresholdClasses() self._processThresholdClasses(thresholdClasses) - thresholds = yield proxy.getThresholds() + thresholds = yield self._proxy.getThresholds() self._processThresholds(thresholds) yield self._collector.runPostConfigTasks() - except Exception as ex: - log.exception("task '%s' failed", self.name) - - # stop if a single device was requested and nothing found - if self.options.device or not self.options.cycle: - self._collector.stop() - - if isinstance(ex, HubDown): - # Allow the loader to be reaped and re-added - self.state = TaskStates.STATE_COMPLETED + except Exception: + log.exception( + "failed to retrieve collector configuration " + "collection-daemon=%s", + self._collector.name, + ) def _processPropertyItems(self, propertyItems): log.debug("processing received property items") - self.state = self.STATE_FETCH_MISC_CONFIG if propertyItems: self._collector._setCollectorPreferences(propertyItems) @@ -138,9 +65,6 @@ def _processThresholds(self, thresholds): if thresholds: self._collector._configureThresholds(thresholds) - def cleanup(self): - pass # Required by interface - class DeviceConfigLoader(object): """Handles retrieving devices from the ConfigCache service.""" @@ -187,7 +111,7 @@ def _get_specified_config(self, new, updated): for cfg in itertools.chain(new, updated) if self._options.device == cfg.configId ), - None + None, ) def _update_local_cache(self, new, updated, removed): diff --git a/Products/ZenCollector/daemon.py b/Products/ZenCollector/daemon.py index c0e0d987c8..e95473b321 100644 --- a/Products/ZenCollector/daemon.py +++ b/Products/ZenCollector/daemon.py @@ -15,6 +15,8 @@ from optparse import SUPPRESS_HELP +import attr + from metrology import Metrology from metrology.instruments import Gauge from twisted.internet import defer, reactor, task @@ -34,7 +36,7 @@ from Products.ZenUtils.observable import ObservableProxy from Products.ZenUtils.Utils import load_config -from .config import DeviceConfigLoader +from .config import ConfigurationLoaderTask, DeviceConfigLoader from .interfaces import ( ICollector, ICollectorPreferences, @@ -49,8 +51,6 @@ from .listeners import ConfigListenerNotifier from .utils.maintenance import MaintenanceCycle, ZenHubHeartbeatSender -CONFIG_LOADER_NAME = "configLoader" - @implementer(ICollector, IDataService, IEventService) class CollectorDaemon(RRDDaemon): @@ -160,6 +160,8 @@ def __init__( # value is None, zero, or some other False-like value. self._device_config_update_interval = 300 + self._config_update_interval = self._prefs.configCycleInterval * 60 + self._deviceGuids = {} self._unresponsiveDevices = set() self._rrd = None @@ -204,6 +206,10 @@ def __init__( # from zenhub self.encryptionKeyInitialized = False + self._configloader = ConfigurationLoaderTask(self, self._configProxy) + self._configloadertask = None + self._configloadertaskd = None + # Define _deviceloader to avoid race condition # with task stats recording. if self.options.device: @@ -333,35 +339,26 @@ def _initEncryptionKey(self): # type: (Self) -> Deferred self.encryptionKeyInitialized = True self.log.debug("initialized encryption key") - def _startConfigCycle(self, startDelay=0): # type: (Self, float) -> None - framework = _getFramework(self.frameworkFactoryName) - configLoader = framework.getConfigurationLoaderTask()( - CONFIG_LOADER_NAME, taskConfig=self.preferences + def _startConfigCycle(self): + self._configloadertask = task.LoopingCall(self._configloader) + self._configloadertaskd = self._configloadertask.start( + self._config_update_interval ) - configLoader.startDelay = startDelay - # Don't add the config loader task if the scheduler already has - # an instance of it. - if configLoader not in self._scheduler: - # Run initial maintenance cycle as soon as possible - # TODO: should we not run maintenance if running in - # non-cycle mode? - self._scheduler.addTask(configLoader) - self.log.info( - "scheduled task " - "name=%s config-id=%s interval=%s start-delay=%s", - configLoader.name, - configLoader.configId, - getattr(configLoader, "interval", "n/a"), - configLoader.startDelay, - ) - else: - self.log.info( - "task already scheduled name=%s config-id=%s", - configLoader.name, - configLoader.configId, - ) + reactor.addSystemEventTrigger( + "before", "shutdown", self._stop_configloadertask, "before" + ) + self.log.info( + "started receiving collector config changes interval=%d", + self._config_update_interval, + ) + + def _stop_configloadertask(self): + if self._configloadertask is None: + return + self._configloadertask.stop() + self._configloadertask = self._configloadertaskd = None - def _startMaintenance(self): # type: (Self) -> None + def _startMaintenance(self): if not self.options.cycle: return interval = self.preferences.cycleInterval @@ -386,13 +383,19 @@ def _startDeviceConfigLoader(self): self._device_config_update_interval ) reactor.addSystemEventTrigger( - "before", "shutdown", self._deviceloadertask.stop, "before" + "before", "shutdown", self._stop_deviceloadertask, "before" ) self.log.info( "started receiving device config changes interval=%d", self._device_config_update_interval, ) + def _stop_deviceloadertask(self): + if self._deviceloadertask is None: + return + self._deviceloadertask.stop() + self._deviceloadertask = self._deviceloadertaskd = None + def _startTaskStatsLogging(self): if not (self.options.cycle and self.options.logTaskStats): return @@ -485,6 +488,7 @@ def writeMetric( :return: a deferred that fires when the metric gets published. """ timestamp = int(time.time()) if timestamp == "N" else timestamp + threshEventData = threshEventData if threshEventData else {} tags = {"contextUUID": contextUUID, "key": contextKey} if self.should_trace_metric(metric, contextKey): tags["mtrace"] = "{}".format(int(time.time())) @@ -622,15 +626,16 @@ def _rescheduleConfig( """ Delete and re-add the configuration tasks to start on new interval. """ - if oldValue != newValue: - self.log.info( - "changing config task interval from %s to %s minutes", - oldValue, - newValue, - ) - self._scheduler.removeTasksForConfig(CONFIG_LOADER_NAME) - # values are in minutes, scheduler takes seconds - self._startConfigCycle(newValue * 60) + if oldValue == newValue: + return + self._config_update_interval = newValue * 60 + self._stop_configloadertask() + self.log.info( + "changing collector config task interval from %s to %s minutes", + oldValue, + newValue, + ) + self._startConfigCycle() def _taskCompleteCallback(self, taskName): # if we're not running a normal daemon cycle then we need to shutdown @@ -721,11 +726,13 @@ def _manyDeviceConfigCallback(self, new, updated, removed): self._updateConfig(cfg) - lengths = (len(new), len(updated), len(removed)) - logmethod = self.log.debug if lengths == (0, 0, 0) else self.log.info + sizes = _DeviceConfigSizes(new, updated, removed) + logmethod = self.log.debug if not sizes else self.log.info logmethod( "processed %d new, %d updated, and %d removed device configs", - *lengths + sizes.new, + sizes.updated, + sizes.removed, ) def _deleteDevice(self, deviceId): @@ -874,7 +881,7 @@ def _pauseUnreachableDevices(self): # Device ping issues returns as a tuple of (deviceId, count, total) # and we just want the device id - newUnresponsiveDevices = set(i[0] for i in issues) + newUnresponsiveDevices = {i[0] for i in issues} clearedDevices = self._unresponsiveDevices.difference( newUnresponsiveDevices @@ -966,6 +973,16 @@ def worker_id(self): return getattr(self.options, "workerid", 0) +@attr.s(frozen=True, slots=True) +class _DeviceConfigSizes(object): + new = attr.ib(converter=len) + updated = attr.ib(converter=len) + removed = attr.ib(converter=len) + + def __nonzero__(self): + return (self.new, self.updated, self.removed) != (0,0,0) + + class _DeviceIdProxy(object): """ Exists to maintain an API for ZenPacks that accessed CollectorDaemon's