Skip to content

Commit

Permalink
Run the ConfigurationLoaderTask outside the scheduler.
Browse files Browse the repository at this point in the history
ZEN-34238
  • Loading branch information
jpeacock-zenoss committed Sep 11, 2024
1 parent 2c8d64f commit 61dda98
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 139 deletions.
114 changes: 19 additions & 95 deletions Products/ZenCollector/config/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,124 +7,51 @@
#
##############################################################################

"""
The config module provides the implementation of the IConfigurationProxy
interface used within Zenoss Core. This implementation provides basic
configuration retrieval services directly from a remote ZenHub service.
"""

import itertools
import logging
import time

from metrology import Metrology
from twisted.internet import defer
from zope.component import getUtility, queryUtility
from zope.interface import implementer

from Products.ZenHub.PBDaemon import HubDown
from Products.ZenUtils.observable import ObservableMixin

from ..tasks import TaskStates
from ..interfaces import (
ICollector,
IDataService,
IEventService,
IScheduledTask,
IFrameworkFactory,
)

log = logging.getLogger("zen.collector.config")


@implementer(IScheduledTask)
class ConfigurationLoaderTask(ObservableMixin):
class ConfigurationLoaderTask(object):
"""
Periodically retrieves collector configuration via the
IConfigurationProxy service.
"""

STATE_CONNECTING = "CONNECTING"
STATE_FETCH_MISC_CONFIG = "FETCHING_MISC_CONFIG"
STATE_FETCH_DEVICE_CONFIG = "FETCHING_DEVICE_CONFIG"
STATE_PROCESS_DEVICE_CONFIG = "PROCESSING_DEVICE_CONFIG"

def __init__(
self,
name,
configId=None,
scheduleIntervalSeconds=None,
taskConfig=None,
):
if taskConfig is None:
raise TypeError("taskConfig cannot be None")

super(ConfigurationLoaderTask, self).__init__()

self._fetchConfigTimer = Metrology.timer("collectordaemon.configs")

# Needed for interface
self.name = name
self.configId = configId if configId else name
self.state = TaskStates.STATE_IDLE

self._dataService = queryUtility(IDataService)
self._eventService = queryUtility(IEventService)

self._prefs = taskConfig
self.interval = self._prefs.configCycleInterval * 60
self.options = self._prefs.options

self._collector = getUtility(ICollector)
self._collector.heartbeatTimeout = self.options.heartbeatTimeout
log.debug(
"heartbeat timeout set to %ds", self._collector.heartbeatTimeout
)

frameworkFactory = queryUtility(
IFrameworkFactory, self._collector.frameworkFactoryName
)
self._configProxy = frameworkFactory.getConfigurationProxy()
# Deprecated attribute kept because zenvsphere uses it for reasons
# that are no longer relevant.
STATE_FETCH_DEVICE_CONFIG = "n/a"

self.startDelay = 0
def __init__(self, collector, proxy):
self._collector = collector
self._proxy = proxy

@defer.inlineCallbacks
def doTask(self):
"""
Contact zenhub and gather configuration data.
@return: A task to gather configs
@rtype: Twisted deferred object
"""
log.debug("%s gathering configuration", self.name)
self.startTime = time.time()

proxy = self._configProxy
def __call__(self):
try:
propertyItems = yield proxy.getPropertyItems()
self._processPropertyItems(propertyItems)
properties = yield self._proxy.getPropertyItems()
self._processPropertyItems(properties)

thresholdClasses = yield proxy.getThresholdClasses()
thresholdClasses = yield self._proxy.getThresholdClasses()
self._processThresholdClasses(thresholdClasses)

thresholds = yield proxy.getThresholds()
thresholds = yield self._proxy.getThresholds()
self._processThresholds(thresholds)

yield self._collector.runPostConfigTasks()
except Exception as ex:
log.exception("task '%s' failed", self.name)

# stop if a single device was requested and nothing found
if self.options.device or not self.options.cycle:
self._collector.stop()

if isinstance(ex, HubDown):
# Allow the loader to be reaped and re-added
self.state = TaskStates.STATE_COMPLETED
except Exception:
log.exception(
"failed to retrieve collector configuration "
"collection-daemon=%s",
self._collector.name,
)

def _processPropertyItems(self, propertyItems):
log.debug("processing received property items")
self.state = self.STATE_FETCH_MISC_CONFIG
if propertyItems:
self._collector._setCollectorPreferences(propertyItems)

Expand All @@ -138,9 +65,6 @@ def _processThresholds(self, thresholds):
if thresholds:
self._collector._configureThresholds(thresholds)

def cleanup(self):
pass # Required by interface


class DeviceConfigLoader(object):
"""Handles retrieving devices from the ConfigCache service."""
Expand Down Expand Up @@ -187,7 +111,7 @@ def _get_specified_config(self, new, updated):
for cfg in itertools.chain(new, updated)
if self._options.device == cfg.configId
),
None
None,
)

def _update_local_cache(self, new, updated, removed):
Expand Down
105 changes: 61 additions & 44 deletions Products/ZenCollector/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from optparse import SUPPRESS_HELP

import attr

from metrology import Metrology
from metrology.instruments import Gauge
from twisted.internet import defer, reactor, task
Expand All @@ -34,7 +36,7 @@
from Products.ZenUtils.observable import ObservableProxy
from Products.ZenUtils.Utils import load_config

from .config import DeviceConfigLoader
from .config import ConfigurationLoaderTask, DeviceConfigLoader
from .interfaces import (
ICollector,
ICollectorPreferences,
Expand All @@ -49,8 +51,6 @@
from .listeners import ConfigListenerNotifier
from .utils.maintenance import MaintenanceCycle, ZenHubHeartbeatSender

CONFIG_LOADER_NAME = "configLoader"


@implementer(ICollector, IDataService, IEventService)
class CollectorDaemon(RRDDaemon):
Expand Down Expand Up @@ -160,6 +160,8 @@ def __init__(
# value is None, zero, or some other False-like value.
self._device_config_update_interval = 300

self._config_update_interval = self._prefs.configCycleInterval * 60

self._deviceGuids = {}
self._unresponsiveDevices = set()
self._rrd = None
Expand Down Expand Up @@ -204,6 +206,10 @@ def __init__(
# from zenhub
self.encryptionKeyInitialized = False

self._configloader = ConfigurationLoaderTask(self, self._configProxy)
self._configloadertask = None
self._configloadertaskd = None

# Define _deviceloader to avoid race condition
# with task stats recording.
if self.options.device:
Expand Down Expand Up @@ -333,35 +339,26 @@ def _initEncryptionKey(self): # type: (Self) -> Deferred
self.encryptionKeyInitialized = True
self.log.debug("initialized encryption key")

def _startConfigCycle(self, startDelay=0): # type: (Self, float) -> None
framework = _getFramework(self.frameworkFactoryName)
configLoader = framework.getConfigurationLoaderTask()(
CONFIG_LOADER_NAME, taskConfig=self.preferences
def _startConfigCycle(self):
self._configloadertask = task.LoopingCall(self._configloader)
self._configloadertaskd = self._configloadertask.start(
self._config_update_interval
)
configLoader.startDelay = startDelay
# Don't add the config loader task if the scheduler already has
# an instance of it.
if configLoader not in self._scheduler:
# Run initial maintenance cycle as soon as possible
# TODO: should we not run maintenance if running in
# non-cycle mode?
self._scheduler.addTask(configLoader)
self.log.info(
"scheduled task "
"name=%s config-id=%s interval=%s start-delay=%s",
configLoader.name,
configLoader.configId,
getattr(configLoader, "interval", "n/a"),
configLoader.startDelay,
)
else:
self.log.info(
"task already scheduled name=%s config-id=%s",
configLoader.name,
configLoader.configId,
)
reactor.addSystemEventTrigger(
"before", "shutdown", self._stop_configloadertask, "before"
)
self.log.info(
"started receiving collector config changes interval=%d",
self._config_update_interval,
)

def _stop_configloadertask(self, *_trigger_args):
    """
    Stop the periodic collector-config loader, if one is active.

    Extra positional arguments are accepted and ignored:
    reactor.addSystemEventTrigger forwards its trailing arguments to the
    callable it registers, and this method is registered with one.

    Calling this method when no loader is active is a no-op.
    """
    loop = self._configloadertask
    if loop is None:
        return
    # Drop the references first so that a repeated call (e.g. a reschedule
    # followed by the shutdown trigger) finds nothing left to stop.
    self._configloadertask = self._configloadertaskd = None
    # LoopingCall.stop() asserts that the loop is running, so skip it for
    # a loop that has already terminated.
    if loop.running:
        loop.stop()

def _startMaintenance(self): # type: (Self) -> None
def _startMaintenance(self):
if not self.options.cycle:
return
interval = self.preferences.cycleInterval
Expand All @@ -386,13 +383,19 @@ def _startDeviceConfigLoader(self):
self._device_config_update_interval
)
reactor.addSystemEventTrigger(
"before", "shutdown", self._deviceloadertask.stop, "before"
"before", "shutdown", self._stop_deviceloadertask, "before"
)
self.log.info(
"started receiving device config changes interval=%d",
self._device_config_update_interval,
)

def _stop_deviceloadertask(self, *_trigger_args):
    """
    Stop the periodic device-config loader, if one is active.

    Extra positional arguments are accepted and ignored:
    reactor.addSystemEventTrigger forwards its trailing arguments to the
    callable it registers, and this method is registered with one.

    Calling this method when no loader is active is a no-op.
    """
    loop = self._deviceloadertask
    if loop is None:
        return
    # Drop the references first so that a repeated call finds nothing
    # left to stop.
    self._deviceloadertask = self._deviceloadertaskd = None
    # LoopingCall.stop() asserts that the loop is running, so skip it for
    # a loop that has already terminated.
    if loop.running:
        loop.stop()

def _startTaskStatsLogging(self):
if not (self.options.cycle and self.options.logTaskStats):
return
Expand Down Expand Up @@ -485,6 +488,7 @@ def writeMetric(
:return: a deferred that fires when the metric gets published.
"""
timestamp = int(time.time()) if timestamp == "N" else timestamp
threshEventData = threshEventData if threshEventData else {}
tags = {"contextUUID": contextUUID, "key": contextKey}
if self.should_trace_metric(metric, contextKey):
tags["mtrace"] = "{}".format(int(time.time()))
Expand Down Expand Up @@ -622,15 +626,16 @@ def _rescheduleConfig(
"""
Delete and re-add the configuration tasks to start on new interval.
"""
if oldValue != newValue:
self.log.info(
"changing config task interval from %s to %s minutes",
oldValue,
newValue,
)
self._scheduler.removeTasksForConfig(CONFIG_LOADER_NAME)
# values are in minutes, scheduler takes seconds
self._startConfigCycle(newValue * 60)
if oldValue == newValue:
return
self._config_update_interval = newValue * 60
self._stop_configloadertask()
self.log.info(
"changing collector config task interval from %s to %s minutes",
oldValue,
newValue,
)
self._startConfigCycle()

def _taskCompleteCallback(self, taskName):
# if we're not running a normal daemon cycle then we need to shutdown
Expand Down Expand Up @@ -721,11 +726,13 @@ def _manyDeviceConfigCallback(self, new, updated, removed):

self._updateConfig(cfg)

lengths = (len(new), len(updated), len(removed))
logmethod = self.log.debug if lengths == (0, 0, 0) else self.log.info
sizes = _DeviceConfigSizes(new, updated, removed)
logmethod = self.log.debug if not sizes else self.log.info
logmethod(
"processed %d new, %d updated, and %d removed device configs",
*lengths
sizes.new,
sizes.updated,
sizes.removed,
)

def _deleteDevice(self, deviceId):
Expand Down Expand Up @@ -874,7 +881,7 @@ def _pauseUnreachableDevices(self):

# Device ping issues returns as a tuple of (deviceId, count, total)
# and we just want the device id
newUnresponsiveDevices = set(i[0] for i in issues)
newUnresponsiveDevices = {i[0] for i in issues}

clearedDevices = self._unresponsiveDevices.difference(
newUnresponsiveDevices
Expand Down Expand Up @@ -966,6 +973,16 @@ def worker_id(self):
return getattr(self.options, "workerid", 0)


@attr.s(frozen=True, slots=True)
class _DeviceConfigSizes(object):
    """
    Immutable record of how many device configs were new, updated, and
    removed.

    Each attribute is initialized from a sized collection; the `len`
    converter stores only the collection's length.  An instance is falsy
    when all three counts are zero and truthy otherwise.
    """

    new = attr.ib(converter=len)
    updated = attr.ib(converter=len)
    removed = attr.ib(converter=len)

    def __nonzero__(self):
        """Return True when at least one of the counts is non-zero."""
        return (self.new, self.updated, self.removed) != (0, 0, 0)

    # Python 3 consults __bool__ instead of __nonzero__; without this alias
    # an instance would always be truthy there.
    __bool__ = __nonzero__


class _DeviceIdProxy(object):
"""
Exists to maintain an API for ZenPacks that accessed CollectorDaemon's
Expand Down

0 comments on commit 61dda98

Please sign in to comment.