diff --git a/docs/configuration.rst b/docs/configuration.rst
index 568c32eb..61ea0659 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -262,6 +262,59 @@ The default is 0.1 seconds (nearly instant, but not flooding).
 
     settings.watching.server_timeout = 10 * 60
 
+.. _consistency:
+
+Consistency
+===========
+
+Generally, a resource's events and updates streamed from the Kubernetes API
+are processed as soon as possible, with no delays or skipping. However,
+high-level change-detection handlers (on-creation/resume/update/deletion)
+require a consistent state of the resource. *Consistency* means that all
+patches applied by Kopf itself have arrived back via the watch-stream.
+If Kopf did not patch the resource recently, it is consistent by definition.
+
+The *inconsistent* states can be seen in relatively rare circumstances
+on slow networks (operator⟺apiservers) or under high load, especially
+when the operator or a third party patches the resource on their own.
+In those cases, the framework can misinterpret the intermediate states
+and perform double-processing (i.e. double handler execution).
+
+To prevent this, all state-dependent handlers are postponed until
+consistency is reached in one of the following two ways:
+
+* The expected resource version from the PATCH API operation arrives
+  via the watch-stream of the resource within the specified time window.
+* The expected resource version from the PATCH API operation does not arrive
+  via the watch-stream within the specified time window, in which case
+  consistency is assumed (implied) and the processing continues as if
+  the version had arrived, possibly causing the mentioned side-effects.
+
+The time window is measured relative to the time of the last ``PATCH`` call.
+The timeout should be long enough to assume that if the expected resource
+version did not arrive within the specified time, it will never arrive.
+
+.. code-block:: python
+
+    import kopf
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.persistence.consistency_timeout = 10
+
+The default value (5 seconds) aims at the safest scenario out of the box.
+
+The value of ``0`` will effectively disable the consistency tracking
+and declare all resource states as consistent -- even if they are not.
+Use this with care -- e.g. with self-made persistence storages instead of
+Kopf's annotations (see :ref:`progress-storing` and :ref:`diffbase-storing`);
+an example is shown at the end of this section.
+
+The consistency timeout does not affect low-level handlers with no persistence,
+such as ``@kopf.on.event``, ``@kopf.index``, ``@kopf.daemon``, ``@kopf.timer``
+-- these handlers are invoked for each and every watch-event with no delay
+(if they match the :doc:`filters`, of course).
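+
+As an illustrative sketch only (the storage classes are Kopf's built-ins,
+but the field names here are arbitrary), the tracking can be disabled when
+the handling state is kept in the status instead of annotations:
+
+.. code-block:: python
+
+    import kopf
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.persistence.progress_storage = kopf.StatusProgressStorage(field='status.my-op')
+        settings.persistence.diffbase_storage = kopf.StatusDiffBaseStorage(field='status.last-seen')
+        settings.persistence.consistency_timeout = 0  # accept every state as consistent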
+
+
 Finalizers
 ==========
diff --git a/examples/01-minimal/example.py b/examples/01-minimal/example.py
index 06ddbb8f..1d11064b 100644
--- a/examples/01-minimal/example.py
+++ b/examples/01-minimal/example.py
@@ -2,6 +2,12 @@
 
 
 @kopf.on.create('kopfexamples')
-def create_fn(spec, **kwargs):
-    print(f"And here we are! Creating: {spec}")
-    return {'message': 'hello world'}  # will be the new status
+@kopf.on.update('kopfexamples')
+def create_fn(meta, spec, reason, logger, **kwargs):
+    rv = meta.get('resourceVersion')
+    logger.warning(f">>> {rv=} And here we are! {reason=}: {spec}")
+
+
+# @kopf.on.create('kopfexamples')
+# def create_fn2(spec, **kwargs):
+#     print(f"And here we are! Creating2: {spec}")
diff --git a/examples/05-handlers/example.py b/examples/05-handlers/example.py
index f542dfbd..f78d3e44 100644
--- a/examples/05-handlers/example.py
+++ b/examples/05-handlers/example.py
@@ -1,41 +1,8 @@
 import kopf
 
 
-@kopf.on.resume('kopfexamples')
-def resume_fn_1(**kwargs):
-    print(f'RESUMED 1st')
-
-
-@kopf.on.create('kopfexamples')
-def create_fn_1(**kwargs):
-    print('CREATED 1st')
-
-
-@kopf.on.resume('kopfexamples')
-def resume_fn_2(**kwargs):
-    print(f'RESUMED 2nd')
-
-
-@kopf.on.create('kopfexamples')
-def create_fn_2(**kwargs):
-    print('CREATED 2nd')
-
-
-@kopf.on.update('kopfexamples')
-def update_fn(old, new, diff, **kwargs):
-    print('UPDATED')
-
-
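+# The deletion is deliberately delayed: the handler fails with a temporary error
+# for the first few attempts, so the object is released only on a later retry.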
 @kopf.on.delete('kopfexamples')
-def delete_fn_1(**kwargs):
-    print('DELETED 1st')
-
-
-@kopf.on.delete('kopfexamples')
-def delete_fn_2(**kwargs):
-    print('DELETED 2nd')
-
-
-@kopf.on.field('kopfexamples', field='spec.field')
-def field_fn(old, new, **kwargs):
-    print(f'FIELD CHANGED: {old} -> {new}')
+def delete_fn(retry, logger, **_):
+    if retry < 3:
+        raise kopf.TemporaryError("not yet", delay=5)
+    logger.info('DELETED')
diff --git a/kopf/_cogs/configs/configuration.py b/kopf/_cogs/configs/configuration.py
index db8d5178..553d53f0 100644
--- a/kopf/_cogs/configs/configuration.py
+++ b/kopf/_cogs/configs/configuration.py
@@ -28,6 +28,7 @@
 import concurrent.futures
 import dataclasses
 import logging
+import warnings
 from typing import Iterable, Optional, Union
 
 from kopf._cogs.configs import diffbase, progress
@@ -187,6 +188,7 @@ class WatchingSettings:
     """
 
 
+# TODO: is it now WorkerSettings/MultiplexerSettings?
 @dataclasses.dataclass
 class BatchingSettings:
     """
@@ -204,12 +206,6 @@ class BatchingSettings:
    How soon an idle worker is exited and garbage-collected if no events arrive.
    """
 
-    batch_window: float = 0.1
-    """
-    How fast/slow does a worker deplete the queue when an event is received.
-    All events arriving within this window will be ignored except the last one.
-    """
-
     exit_timeout: float = 2.0
     """
     How soon a worker is cancelled when the parent watcher is going to exit.
@@ -223,6 +219,21 @@ class BatchingSettings:
     For more information on error throttling, see :ref:`error-throttling`.
     """
 
+    _batch_window: float = 0.1  # deprecated
+
+    @property
+    def batch_window(self) -> float:
+        """ Deprecated and affects nothing. """
+        warnings.warn("Time-based event batching was removed. Please stop configuring it.",
+                      DeprecationWarning)
+        return self._batch_window
+
+    @batch_window.setter
+    def batch_window(self, value: float) -> None:
+        warnings.warn("Time-based event batching was removed. Please stop configuring it.",
+                      DeprecationWarning)
+        self._batch_window = value
+
 
 @dataclasses.dataclass
 class ScanningSettings:
@@ -374,6 +385,13 @@ class PersistenceSettings:
     How the resource's essence (non-technical, contentful fields) are stored.
     """
 
+    consistency_timeout: float = 5.0
+    """
+    For how long a patched resource version is awaited (seconds).
+
+    See :ref:`consistency` for a detailed explanation.
+    """
+
 
 @dataclasses.dataclass
 class BackgroundSettings:
diff --git a/kopf/_core/actions/application.py b/kopf/_core/actions/application.py
index 805dda23..73926d13 100644
--- a/kopf/_core/actions/application.py
+++ b/kopf/_core/actions/application.py
@@ -20,7 +20,7 @@
 """
 import asyncio
 import datetime
-from typing import Collection, Optional
+from typing import Collection, Optional, Tuple
 
 from kopf._cogs.aiokits import aiotime
 from kopf._cogs.clients import patching
@@ -49,7 +49,7 @@ async def apply(
         delays: Collection[float],
         logger: loggers.ObjectLogger,
         stream_pressure: Optional[asyncio.Event] = None,  # None for tests
-) -> bool:
+) -> Tuple[bool, Optional[str]]:
     delay = min(delays) if delays else None
 
     # Delete dummies on occasion, but don't trigger special patching for them [discussable].
     if patch:  # TODO: LATER: and the dummies are there (without additional methods?)
         settings.persistence.progress_storage.touch(body=body, patch=patch, value=None)
 
     # Actually patch if it was not empty originally or after the dummies removal.
-    await patch_and_check(
+    resource_version = await patch_and_check(
         settings=settings,
         resource=resource,
         logger=logger,
@@ -92,7 +92,7 @@
         value = datetime.datetime.utcnow().isoformat()
         touch = patches.Patch()
         settings.persistence.progress_storage.touch(body=body, patch=touch, value=value)
-        await patch_and_check(
+        resource_version = await patch_and_check(
             settings=settings,
             resource=resource,
             logger=logger,
@@ -101,7 +101,7 @@
         )
     elif not patch:  # no patch/touch and no delay
         applied = True
-    return applied
+    return applied, resource_version
 
 
 async def patch_and_check(
@@ -111,7 +111,7 @@
         body: bodies.Body,
         patch: patches.Patch,
         logger: typedefs.Logger,
-) -> None:
+) -> Optional[str]:  # patched resource version
     """
     Apply a patch and verify that it is applied correctly.
 
@@ -146,3 +146,5 @@
             logger.debug(f"Patching was skipped: the object does not exist anymore.")
         elif inconsistencies:
             logger.warning(f"Patching failed with inconsistencies: {inconsistencies}")
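+        # Report the freshly patched resourceVersion, so that the callers can wait
+        # until this version arrives back via the watch-stream (consistency tracking).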
+        return (resulting_body or {}).get('metadata', {}).get('resourceVersion')
+    return None
diff --git a/kopf/_core/engines/peering.py b/kopf/_core/engines/peering.py
index 834102df..6111c419 100644
--- a/kopf/_core/engines/peering.py
+++ b/kopf/_core/engines/peering.py
@@ -100,6 +100,7 @@ async def process_peering_event(
         # Must be accepted whether used or not -- as passed by watcher()/worker().
         resource_indexed: Optional[aiotoggles.Toggle] = None,  # None for tests & observation
         operator_indexed: Optional[aiotoggles.ToggleSet] = None,  # None for tests & observation
+        consistency_time: Optional[float] = None,  # None for tests & observation
 ) -> None:
     """
     Handle a single update of the peers by us or by other operators.
diff --git a/kopf/_core/reactor/observation.py b/kopf/_core/reactor/observation.py
index 834c26d2..4318f766 100644
--- a/kopf/_core/reactor/observation.py
+++ b/kopf/_core/reactor/observation.py
@@ -150,6 +150,7 @@ async def process_discovered_namespace_event(
         stream_pressure: Optional[asyncio.Event] = None,  # None for tests
         resource_indexed: Optional[aiotoggles.Toggle] = None,  # None for tests & observation
         operator_indexed: Optional[aiotoggles.ToggleSet] = None,  # None for tests & observation
+        consistency_time: Optional[float] = None,  # None for tests & observation
 ) -> None:
     if raw_event['type'] is None:
         return
@@ -169,6 +170,7 @@ async def process_discovered_resource_event(
         stream_pressure: Optional[asyncio.Event] = None,  # None for tests
         resource_indexed: Optional[aiotoggles.Toggle] = None,  # None for tests & observation
         operator_indexed: Optional[aiotoggles.ToggleSet] = None,  # None for tests & observation
+        consistency_time: Optional[float] = None,  # None for tests & observation
 ) -> None:
     # Ignore the initial listing, as all custom resources were already noticed by API listing.
     # This prevents numerous unneccessary API requests at the the start of the operator.
diff --git a/kopf/_core/reactor/processing.py b/kopf/_core/reactor/processing.py
index e2ab6c19..d59b5443 100644
--- a/kopf/_core/reactor/processing.py
+++ b/kopf/_core/reactor/processing.py
@@ -15,9 +15,9 @@
 """
 import asyncio
 import time
-from typing import Collection, Optional, Tuple
+from typing import Collection, NamedTuple, Optional, Tuple
 
-from kopf._cogs.aiokits import aiotoggles
+from kopf._cogs.aiokits import aiotime, aiotoggles
 from kopf._cogs.configs import configuration
 from kopf._cogs.structs import bodies, diffs, ephemera, finalizers, patches, references
 from kopf._core.actions import application, execution, lifecycles, loggers, progression, throttlers
@@ -39,7 +39,8 @@ async def process_resource_event(
         stream_pressure: Optional[asyncio.Event] = None,  # None for tests
         resource_indexed: Optional[aiotoggles.Toggle] = None,  # None for tests & observation
         operator_indexed: Optional[aiotoggles.ToggleSet] = None,  # None for tests & observation
-) -> None:
+        consistency_time: Optional[float] = None,  # None for tests
+) -> Optional[str]:  # patched resource version, if patched
     """
     Handle a single custom object low-level watch-event.
 
@@ -104,9 +105,11 @@
     if operator_indexed is not None and resource_indexed is not None:
         await operator_indexed.drop_toggle(resource_indexed)
     if operator_indexed is not None:
+        # TODO: reconsider this! should on-event be called before indexing is finished?
         await operator_indexed.wait_for(True)  # other resource kinds & objects.
     if stream_pressure is not None and stream_pressure.is_set():
-        return
+        # TODO: reconsider this! we should process events even if the newer ones arrived.
+        return None
 
     # Do the magic -- do the job.
     delays, matched = await process_resource_causes(
@@ -121,13 +124,15 @@
         memory=memory,
         local_logger=local_logger,
         event_logger=event_logger,
+        stream_pressure=stream_pressure,
+        consistency_time=consistency_time,
     )
 
     # Whatever was done, apply the accumulated changes to the object, or sleep-n-touch for delays.
     # But only once, to reduce the number of API calls and the generated irrelevant events.
     # And only if the object is at least supposed to exist (not "GONE"), even if actually does not.
     if raw_event['type'] != 'DELETED':
-        applied = await application.apply(
+        applied, resource_version = await application.apply(
             settings=settings,
             resource=resource,
             body=body,
@@ -136,12 +141,20 @@
             delays=delays,
             stream_pressure=stream_pressure,
         )
+        local_logger.debug(f'ENDED with {resource_version=}')  # TODO: REMOVE!
         if applied and matched:
             local_logger.debug("Handling cycle is finished, waiting for new changes.")
+        return resource_version
+    return None
 
 
-async def process_resource_causes(
-        lifecycle: execution.LifeCycleFn,
+class _Causes(NamedTuple):
+    watching_cause: Optional[causes.WatchingCause]
+    spawning_cause: Optional[causes.SpawningCause]
+    changing_cause: Optional[causes.ChangingCause]
+
+
+def _detect_causes(
         indexers: indexing.OperatorIndexers,
         registry: registries.OperatorRegistry,
         settings: configuration.OperatorSettings,
@@ -152,7 +165,8 @@
         memory: inventory.ResourceMemory,
         local_logger: loggers.ObjectLogger,
         event_logger: loggers.ObjectLogger,
-) -> Tuple[Collection[float], bool]:
+) -> _Causes:
+    """Detect what we are going to do (or to skip) on this processing cycle."""
     finalizer = settings.persistence.finalizer
 
     extra_fields = (
@@ -166,7 +180,6 @@
     new = settings.persistence.progress_storage.clear(essence=new) if new is not None else None
     diff = diffs.diff(old, new)
 
-    # Detect what are we going to do on this processing cycle.
     watching_cause = causes.detect_watching_cause(
         raw_event=raw_event,
         resource=resource,
@@ -202,6 +215,62 @@
         initial=memory.noticed_by_listing and not memory.fully_handled_once,
     ) if registry._changing.has_handlers(resource=resource) else None
 
+    return _Causes(watching_cause, spawning_cause, changing_cause)
+
+
+async def process_resource_causes(
+        lifecycle: execution.LifeCycleFn,
+        indexers: indexing.OperatorIndexers,
+        registry: registries.OperatorRegistry,
+        settings: configuration.OperatorSettings,
+        resource: references.Resource,
+        raw_event: bodies.RawEvent,
+        body: bodies.Body,
+        patch: patches.Patch,
+        memory: inventory.ResourceMemory,
+        local_logger: loggers.ObjectLogger,
+        event_logger: loggers.ObjectLogger,
+        stream_pressure: Optional[asyncio.Event],  # None for tests
+        consistency_time: Optional[float],
+) -> Tuple[Collection[float], bool]:
+    finalizer = settings.persistence.finalizer
+    watching_cause, spawning_cause, changing_cause = _detect_causes(
+        indexers=indexers,
+        registry=registry,
+        settings=settings,
+        resource=resource,
+        raw_event=raw_event,
+        body=body,
+        patch=patch,
+        memory=memory,
+        local_logger=local_logger,
+        event_logger=event_logger,
+    )
+
+    # Invoke all the handlers that should or could be invoked at this processing cycle.
+    # The low-level spies go ASAP always. However, the daemons are spawned before the high-level
+    # handlers and killed after them: the daemons should live throughout the full object lifecycle.
+    if watching_cause is not None:
+        await process_watching_cause(
+            lifecycle=lifecycles.all_at_once,
+            registry=registry,
+            settings=settings,
+            cause=watching_cause,
+        )
+
+    spawning_delays: Collection[float] = []
+    if spawning_cause is not None:
+        spawning_delays = await process_spawning_cause(
+            registry=registry,
+            settings=settings,
+            memory=memory,
+            cause=spawning_cause,
+        )
+
+    # TODO: ----- SPLIT it here! ABOVE: consistency-independent handlers.
+    # TODO: ----- SPLIT it here! BELOW: consistency-requiring handling.
+    # TODO: BUT: finalizers? They do not use any state. But they use the spawning/changing cause.
+
     # If there are any handlers for this resource kind in general, but not for this specific object
     # due to filters, then be blind to it, store no state, and log nothing about the handling cycle.
     if changing_cause is not None and not registry._changing.prematch(cause=changing_cause):
@@ -235,26 +304,26 @@
             finalizers.allow_deletion(body=body, patch=patch, finalizer=finalizer)
             changing_cause = None  # prevent further high-level processing this time
 
-    # Invoke all the handlers that should or could be invoked at this processing cycle.
-    # The low-level spies go ASAP always. However, the daemons are spawned before the high-level
-    # handlers and killed after them: the daemons should live throughout the full object lifecycle.
-    if watching_cause is not None:
-        await process_watching_cause(
-            lifecycle=lifecycles.all_at_once,
-            registry=registry,
-            settings=settings,
-            cause=watching_cause,
-        )
-
-    spawning_delays: Collection[float] = []
-    if spawning_cause is not None:
-        spawning_delays = await process_spawning_cause(
-            registry=registry,
-            settings=settings,
-            memory=memory,
-            cause=spawning_cause,
-        )
-
+    # If the state is inconsistent (yet), wait for new events in the hope that they bring consistency.
+    # If the wait exceeds its time and no new consistent events arrive, then fake the consistency.
+    # However, if a patch is accumulated by now, skip waiting and apply it instantly (by exiting).
+    # In that case, we are guaranteed to be inconsistent, so also skip the state-dependent handlers.
+    # unslept: Optional[float] = None  # TODO: REMOVE
+    consistency_is_required = changing_cause is not None
+    consistency_is_achieved = consistency_time is None  # i.e. preexisting consistency
+    if consistency_is_required and not consistency_is_achieved and not patch and consistency_time:
+        loop = asyncio.get_running_loop()
+        unslept = await aiotime.sleep(consistency_time - loop.time(), wakeup=stream_pressure)
+        consistency_is_achieved = unslept is None  # "woke up" vs. "timed out"
+    if consistency_is_required and not consistency_is_achieved:
+        changing_cause = None  # exit to PATCHing and/or re-iterating over new events.
+    # TODO: REMOVE:
+    # rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
+    # local_logger.debug(f'>>> {rv=} {consistency_is_required=} {consistency_is_achieved=} {unslept=} {changing_cause=}')
+
+    # Now, the consistency is either pre-proven (by receiving or not expecting any resource version)
+    # or implied (by exceeding the allowed consistency-waiting timeout while getting no new events).
+    # So we can go for state-dependent handlers (change detection requires a consistent state).
     changing_delays: Collection[float] = []
     if changing_cause is not None:
         changing_delays = await process_changing_cause(
diff --git a/kopf/_core/reactor/queueing.py b/kopf/_core/reactor/queueing.py
index a0788936..d867d067 100644
--- a/kopf/_core/reactor/queueing.py
+++ b/kopf/_core/reactor/queueing.py
@@ -30,7 +30,7 @@
 
 from typing_extensions import Protocol
 
-from kopf._cogs.aiokits import aiotasks, aiotoggles
+from kopf._cogs.aiokits import aiotasks, aiotime, aiotoggles
 from kopf._cogs.clients import watching
 from kopf._cogs.configs import configuration
 from kopf._cogs.structs import bodies, references
@@ -46,7 +46,8 @@ async def __call__(
             stream_pressure: Optional[asyncio.Event] = None,  # None for tests
             resource_indexed: Optional[aiotoggles.Toggle] = None,  # None for tests & observation
             operator_indexed: Optional[aiotoggles.ToggleSet] = None,  # None for tests & observation
-    ) -> None: ...
+            consistency_time: Optional[float] = None,  # None for tests
+    ) -> Optional[str]: ...  # patched resource version, if patched
 
 
 # An end-of-stream marker sent from the watcher to the workers.
@@ -119,6 +120,13 @@ def get_uid(raw_event: bodies.RawEvent) -> ObjectUid:
     return ObjectUid(uid)
 
 
+def get_version(raw_event: Union[bodies.RawEvent, EOS]) -> Optional[str]:
+    if isinstance(raw_event, EOS):
+        return None
+    else:
+        return raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
+
+
 async def watcher(
         *,
         namespace: references.Namespace,
@@ -164,6 +172,7 @@ def exception_handler(exc: BaseException) -> None:
                                    exception_handler=exception_handler)
     streams: Streams = {}
+    loop = asyncio.get_running_loop()
     try:
         # Either use the existing object's queue, or create a new one together with the per-object job.
         # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
@@ -184,12 +193,21 @@
             if isinstance(raw_event, watching.Bookmark):
                 continue
 
+            # TODO: REMOVE: only for debugging!
+            rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
+            fld = raw_event.get('object', {}).get('spec', {}).get('field')
+            knd = raw_event.get('object', {}).get('kind')
+            nam = raw_event.get('object', {}).get('metadata', {}).get('name')
+            logger.debug(f'STREAM GOT {knd=} {nam=} {rv=} // {fld=} ')
+
             # Multiplex the raw events to per-resource workers/queues. Start the new ones if needed.
             key: ObjectRef = (resource, get_uid(raw_event))
             try:
                 # Feed the worker, as fast as possible, no extra activities.
-                streams[key].pressure.set()  # interrupt current sleeps, if any.
-                await streams[key].backlog.put(raw_event)
+                loop.call_later(3.0, streams[key].pressure.set)
+                loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
+                # streams[key].pressure.set()  # interrupt current sleeps, if any.
+                # await streams[key].backlog.put(raw_event)
             except KeyError:
 
                 # Block the operator's readiness for individual resource's index handlers.
@@ -203,8 +221,10 @@
                 # Start the worker, and feed it initially. Starting can be moderately slow.
                 streams[key] = Stream(backlog=asyncio.Queue(), pressure=asyncio.Event())
-                streams[key].pressure.set()  # interrupt current sleeps, if any.
-                await streams[key].backlog.put(raw_event)
+                # streams[key].pressure.set()  # interrupt current sleeps, if any.
+                # await streams[key].backlog.put(raw_event)
+                loop.call_later(3.0, streams[key].pressure.set)
+                loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
                 await scheduler.spawn(
                     name=f'worker for {key}',
                     coro=worker(
@@ -248,41 +268,48 @@ async def worker(
         *,
         signaller: asyncio.Condition,
-        processor: WatchStreamProcessor,
         settings: configuration.OperatorSettings,
-        resource_indexed: Optional[aiotoggles.Toggle],  # None for tests & observation
-        operator_indexed: Optional[aiotoggles.ToggleSet],  # None for tests & observation
+        processor: WatchStreamProcessor,
+        operator_indexed: Optional[aiotoggles.ToggleSet] = None,  # None for tests & observation
+        resource_indexed: Optional[aiotoggles.Toggle] = None,  # None for tests & non-indexable
         streams: Streams,
         key: ObjectRef,
 ) -> None:
     """
-    The per-object workers consume the object's events and invoke the processors/handlers.
+    A single worker for a single resource object, each running in its own task.
+
+    An object worker consumes the events from the object-dedicated queue filled
+    by the watcher of the whole resource kind (i.e. of all objects of that kind)
+    and invokes the processor (which implies handlers) for that specific object.
 
-    The processor is expected to be an async coroutine, always the one from the framework.
-    In fact, it is either a peering processor, which monitors the peer operators,
-    or a generic resource processor, which internally calls the registered synchronous processors.
+    The processor is an internal coroutine of the framework, not of the user.
+    There are several types of processors involved:
 
-    The per-object worker is a time-limited task, which ends as soon as all the object's events
-    have been handled. The watcher will spawn a new job when and if the new events arrive.
+    - For operator peering: monitors the peers and suspends/resumes operations.
+    - For cluster observation: spawns new watchers for new CRDs/namespaces.
+    - For resource handling: detects changes and calls the user-side handlers.
 
-    To prevent the queue/job deletion and re-creation to happen too often, the jobs wait some
-    reasonable, but small enough time (a few seconds) before actually finishing --
-    in case the new events are there, but the API or the watcher task lags a bit.
+    The worker is time-limited: it exits as soon as all the object's events
+    have been processed and no new events arrive for some time while idling
+    (a few seconds -- to prevent exiting and recreating the workers too often).
+    The watcher will spawn a new worker when (and if) new events arrive.
+    This saves system resources (RAM) on large clusters with low activity.
     """
+    loop = asyncio.get_running_loop()
     backlog = streams[key].backlog
     pressure = streams[key].pressure
     shouldstop = False
+    consistency_time: Optional[float] = None  # None if nothing is expected/awaited.
+    expected_version: Optional[str] = None  # None/non-None is synced with the patch-end-time.
     try:
         while not shouldstop:
 
-            # Try ASAP, but give it a few seconds for the new events to arrive.
-            # If the queue is empty for some time, then finish the object's worker.
-            # If the queue is filled, use the latest event only (within a short time window).
-            # If an EOS marker is received, handle the last real event, then finish the worker ASAP.
+            # Get an event ASAP (no delay) if possible. But expect the queue can be empty.
+            # Save memory by finishing the worker if the backlog is empty for some time.
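+            # Also wait at least until the consistency deadline, if one is currently set:
+            # the idle timeout must not end the worker while a patched version is still awaited.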
+            timeout = max(settings.batching.idle_timeout,
+                          consistency_time - loop.time() if consistency_time is not None else 0)
             try:
-                raw_event = await asyncio.wait_for(
-                    backlog.get(),
-                    timeout=settings.batching.idle_timeout)
+                raw_event = await asyncio.wait_for(backlog.get(), timeout=timeout)
             except asyncio.TimeoutError:
                 # A tricky part! Under high-load or with synchronous blocks of asyncio event-loop,
                 # it is possible that the timeout happens while the queue is filled: depending on
@@ -296,31 +323,40 @@
                     break
                 else:
                     continue
-            else:
-                try:
-                    while True:
-                        prev_event = raw_event
-                        next_event = await asyncio.wait_for(
-                            backlog.get(),
-                            timeout=settings.batching.batch_window)
-                        shouldstop = shouldstop or isinstance(next_event, EOS)
-                        raw_event = prev_event if isinstance(next_event, EOS) else next_event
-                except asyncio.TimeoutError:
-                    pass
 
             # Exit gracefully and immediately on the end-of-stream marker sent by the watcher.
             if isinstance(raw_event, EOS):
-                break
-
-            # Try the processor. In case of errors, show the error, but continue the processing.
+                break  # out of the worker.
+
+            # # TODO: REMOVE: only for debugging!
+            # rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
+            # fld = raw_event.get('object', {}).get('spec', {}).get('field')
+            # knd = raw_event.get('object', {}).get('kind')
+            # nam = raw_event.get('object', {}).get('metadata', {}).get('name')
+            # logger.debug(f'QUEUED GOT {knd=} {nam=} {rv=} exp={expected_version!r} // {fld=} ')
+
+            # Keep track of the resource's consistency for high-level (state-dependent) handlers.
+            # See `settings.persistence.consistency_timeout` for the explanation of consistency.
+            if expected_version is not None and expected_version == get_version(raw_event):
+                expected_version = None
+                consistency_time = None
+
+            # Process the event. It might include sleeping till the time of consistency assumption
+            # (i.e. ignoring that the patched version was not received and behaving as if it was).
             pressure.clear()
-            await processor(
+            newer_patch_version = await processor(
                 raw_event=raw_event,
                 stream_pressure=pressure,
                 resource_indexed=resource_indexed,
                 operator_indexed=operator_indexed,
+                consistency_time=consistency_time,
             )
 
+            # With every new PATCH API call (if done), restart the consistency waiting.
+            if newer_patch_version is not None and settings.persistence.consistency_timeout:
+                expected_version = newer_patch_version
+                consistency_time = loop.time() + settings.persistence.consistency_timeout
+
     except Exception:
         # Log the error for every worker: there can be several of them failing at the same time,
         # but only one will trigger the watcher's failure -- others could be lost if not logged.
diff --git a/tests/handling/test_state_consistency.py b/tests/handling/test_state_consistency.py
new file mode 100644
index 00000000..7835fa39
--- /dev/null
+++ b/tests/handling/test_state_consistency.py
@@ -0,0 +1,155 @@
+"""
+Ensure that the framework properly invokes or ignores the handlers
+depending on the consistency of the incoming stream of events.
+"""
+import asyncio
+import logging
+
+import pytest
+
+import kopf
+from kopf._core.intents.causes import Reason, HANDLER_REASONS
+from kopf._core.engines.indexing import OperatorIndexers
+from kopf._core.reactor.inventory import ResourceMemories
+from kopf._core.reactor.processing import process_resource_event
+from kopf._cogs.structs.ephemera import Memo
+
+
+# TODO: `consistency_time` is the target of the tests:
+#  consistency_time=None, => it processes immediately
+#  consistency_time=in the past, => it processes immediately (as if None)
+#  consistency_time=within the window, => it sleeps until the time, then processes
+#  consistency_time=after the window, => it assumes the consistency, then processes
+#  UPD: no need for within/outside window. Just in the future is enough.
+#  the window limiting is a responsibility of another unit (queueing/worker).
+# ALSO:
+#  with/without change-detecting handlers.
+#  with/without event-watching handlers.
+# ALSO:
+#  when awakened by a new event (stream pressure).
+
+# TODO: Test that the on-event() and on-creation/update/deletion happen in different times,
+# that the sleep is between them, and that the latter ones are executed STRICTLY after consistency.
+# NOW, the timing of the handlers is not tested.
+
+@pytest.mark.parametrize('cause_reason', HANDLER_REASONS)
+async def test_implicit_consistency(
+        resource, registry, settings, handlers, caplog, cause_mock, cause_reason, k8s_mocked, timer):
+    caplog.set_level(logging.DEBUG)
+
+    event_type = None if cause_reason == Reason.RESUME else 'irrelevant'
+    cause_mock.reason = cause_reason
+
+    with timer:
+        event_queue = asyncio.Queue()
+        await process_resource_event(
+            lifecycle=kopf.lifecycles.all_at_once,
+            registry=registry,
+            settings=settings,
+            resource=resource,
+            indexers=OperatorIndexers(),
+            memories=ResourceMemories(),
+            memobase=Memo(),
+            raw_event={'type': event_type, 'object': {}},
+            event_queue=event_queue,
+            consistency_time=None,
+        )
+
+    assert k8s_mocked.sleep.call_count == 0
+    # assert timer.seconds < 0.01
+
+    assert handlers.event_mock.call_count == 1
+    assert handlers.create_mock.call_count == (1 if cause_reason == Reason.CREATE else 0)
+    assert handlers.update_mock.call_count == (1 if cause_reason == Reason.UPDATE else 0)
+    assert handlers.delete_mock.call_count == (1 if cause_reason == Reason.DELETE else 0)
+    assert handlers.resume_mock.call_count == (1 if cause_reason == Reason.RESUME else 0)
+
+
+@pytest.mark.parametrize('cause_reason', HANDLER_REASONS)
+async def test_past_consistency(
+        resource, registry, settings, handlers, caplog, cause_mock, cause_reason, k8s_mocked, timer):
+    caplog.set_level(logging.DEBUG)
+    loop = asyncio.get_running_loop()
+
+    event_type = None if cause_reason == Reason.RESUME else 'irrelevant'
+    cause_mock.reason = cause_reason
+
+    with timer:
+        event_queue = asyncio.Queue()
+        await process_resource_event(
+            lifecycle=kopf.lifecycles.all_at_once,
+            registry=registry,
+            settings=settings,
+            resource=resource,
+            indexers=OperatorIndexers(),
+            memories=ResourceMemories(),
+            memobase=Memo(),
+            raw_event={'type': event_type, 'object': {}},
+            event_queue=event_queue,
+            consistency_time=loop.time() - 10,
+        )
+
+    assert k8s_mocked.sleep.call_count == 1
+    delay = k8s_mocked.sleep.call_args[0][0]
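+    # The consistency deadline lies in the past, so the requested sleep is negative,
+    # i.e. there is effectively no waiting before the processing continues.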
+    assert delay < 0
+
+    # assert timer.seconds < 0.01
+
+    assert handlers.event_mock.call_count == 1
+    assert handlers.create_mock.call_count == (1 if cause_reason == Reason.CREATE else 0)
+    assert handlers.update_mock.call_count == (1 if cause_reason == Reason.UPDATE else 0)
+    assert handlers.delete_mock.call_count == (1 if cause_reason == Reason.DELETE else 0)
+    assert handlers.resume_mock.call_count == (1 if cause_reason == Reason.RESUME else 0)
+
+
+@pytest.mark.parametrize('cause_reason', HANDLER_REASONS)
+async def test_future_consistency(
+        resource, registry, settings, handlers, caplog, cause_mock, cause_reason, k8s_mocked, timer):
+    caplog.set_level(logging.DEBUG)
+    loop = asyncio.get_running_loop()
+
+    settings.persistence.consistency_timeout = 5
+
+    event_type = None if cause_reason == Reason.RESUME else 'irrelevant'
+    cause_mock.reason = cause_reason
+
+    with timer:
+        event_queue = asyncio.Queue()
+        await process_resource_event(
+            lifecycle=kopf.lifecycles.all_at_once,
+            registry=registry,
+            settings=settings,
+            resource=resource,
+            indexers=OperatorIndexers(),
+            memories=ResourceMemories(),
+            memobase=Memo(),
+            raw_event={'type': event_type, 'object': {}},
+            event_queue=event_queue,
+            consistency_time=loop.time() + 3,
+        )
+
+    assert k8s_mocked.sleep.call_count == 1
+    delay = k8s_mocked.sleep.call_args[0][0]
+    assert 2.9 < delay < 3.0
+    # assert timer.seconds < 0.01
+
+    assert handlers.event_mock.call_count == 1
+    assert handlers.create_mock.call_count == (1 if cause_reason == Reason.CREATE else 0)
+    assert handlers.update_mock.call_count == (1 if cause_reason == Reason.UPDATE else 0)
+    assert handlers.delete_mock.call_count == (1 if cause_reason == Reason.DELETE else 0)
+    assert handlers.resume_mock.call_count == (1 if cause_reason == Reason.RESUME else 0)
+
+
+# TODO: we definitely need a loop with a fake time!
+#   and the time should start with 0.
+#   and it should have zero-waste on code overhead, only on sleeps.
+
+
+# TODO:
+# And then, there will be separate splitting for:
+# - watcher() -> processor() with proper/expected consistency_time,
+# - processor() -> handlers() properly according to various consistency times.
+# This leaks some abstractions of how consistency works to the tests, but can be tolerated
+# due to complexity of units, with "consistency time" treated as a unit contract.
+# In addition, the whole bundle can be tested:
+# - watcher() -> handlers() -- i.e. a full simulation of the watch-stream.
diff --git a/tests/reactor/test_processing_consistency.py b/tests/reactor/test_processing_consistency.py
new file mode 100644
index 00000000..c69a503a
--- /dev/null
+++ b/tests/reactor/test_processing_consistency.py
@@ -0,0 +1,49 @@
+import asyncio
+
+import logging
+
+from unittest.mock import Mock
+
+import pytest
+
+import kopf
+from kopf._core.intents.causes import Reason
+from kopf._core.reactor.processing import process_resource_event
+from kopf._core.engines.indexing import OperatorIndexers
+from kopf._core.reactor.inventory import ResourceMemories
+from kopf._cogs.structs.ephemera import Memo
+
+
+async def test_(resource, registry, settings, handlers, caplog, cause_mock):
+    caplog.set_level(logging.DEBUG)
+    cause_mock.reason = Reason.CREATE
+
+    event_queue = asyncio.Queue()
+    rv = await process_resource_event(
+        lifecycle=kopf.lifecycles.all_at_once,
+        registry=registry,
+        settings=settings,
+        resource=resource,
+        indexers=OperatorIndexers(),
+        memories=ResourceMemories(),
+        memobase=Memo(),
+        raw_event={'type': '...', 'object': {}},
+        event_queue=event_queue,
+        # TODO: this is the target of the tests:
+        #  consistency_time=None, => it processes immediately
+        #  consistency_time=in the past, => it processes immediately (as if None)
+        #  consistency_time=within the window, => it sleeps until the time, then processes
+        #  consistency_time=after the window, => it assumes the consistency, then processes
+        # ALSO:
+        #  with/without change-detecting handlers.
+        #  with/without event-watching handlers.
+        # ALSO:
+        #  when awakened by a new event (stream pressure).
+        consistency_time=None,
+    )
+
+    # TODO:
+    # And then, there will be separate splitting for:
+    # - watcher() -> processor() with proper/expected consistency_time,
+    # - processor() -> handlers() properly according to various consistency times.
+    # This leaks some abstractions of how consistency works to the tests, but can be tolerated
+    # due to complexity of units, with "consistency time" treated as a unit contract.
+    # In addition, the whole bundle can be tested:
+    # - watcher() -> handlers() -- i.e. a full simulation of the watch-stream.
diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py
index 2bce4c0b..a2caeca1 100644
--- a/tests/reactor/test_queueing.py
+++ b/tests/reactor/test_queueing.py
@@ -24,6 +24,34 @@ from kopf._core.reactor.queueing import EOS, watcher
 
 
+# TODO: All tests with a simulated stream must use a Kopf-provided tooling.
+# This is reusable in operators!
+# It must inject itself into the prepared kopf runner,
+# but not touch the new ones (unless explicitly commanded to monkeypatch/activate itself).
+# What could such a syntax/tool look like?
+# Supported features of the stream:
+# - Exact events.
+# - Timed pauses!
+# - Synchronization primitives (sync/async: events, toggles, flags, etc)
+# - Callables/awaitables/generators for dynamic event generation (None means no emitted event).
+# - Connection errors.
+# - ? Conditional events when the object reaches some state?
+# BUT: how do we get a patch? It can be done by other libraries.
+def test_me(stream):
+    stream.feed(events1)
+    stream.feed([callableA])
+    stream.feed([coroutineB])
+    stream.feed([generatorC])
+    stream.feed([asyncio_event])
+    stream.feed([threading_event])
+    stream.feed([kopf.testing.StreamPause(1)])
+    stream.pause(1)
+    stream.close()
+    with kopf.testing.KopfRunner(...):
+        ...
+    ...
+
+
 @pytest.mark.parametrize('uids, cnts, events', [
     pytest.param(['uid1'], [1], [
diff --git a/tests/reactor/test_resource_version.py b/tests/reactor/test_resource_version.py
new file mode 100644
index 00000000..a155a63f
--- /dev/null
+++ b/tests/reactor/test_resource_version.py
@@ -0,0 +1,15 @@
+import pytest
+
+from kopf._core.reactor.queueing import get_version, EOS
+
+
+@pytest.mark.parametrize('raw_event, expected_version', [
+    pytest.param(EOS, None, id='eos'),
+    pytest.param({}, None, id='empty-event'),
+    pytest.param({'object': {}}, None, id='empty-object'),
+    pytest.param({'object': {'metadata': {}}}, None, id='empty-metadata'),
+    pytest.param({'object': {'metadata': {'resourceVersion': '123abc'}}}, '123abc', id='123abc'),
+])
+def test_resource_version_detection(raw_event, expected_version):
+    resource_version = get_version(raw_event)
+    assert resource_version == expected_version
diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py
index 02c7d610..5fc9550c 100644
--- a/tests/settings/test_defaults.py
+++ b/tests/settings/test_defaults.py
@@ -1,5 +1,7 @@
 import logging
 
+import pytest
+
 import kopf
 
 
@@ -21,7 +23,6 @@ async def test_declared_public_interface_and_promised_defaults():
     assert settings.batching.worker_limit is None
     assert settings.batching.idle_timeout == 5.0
     assert settings.batching.exit_timeout == 2.0
-    assert settings.batching.batch_window == 0.1
     assert settings.batching.error_delays == (1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)
     assert settings.scanning.disabled == False
     assert settings.admission.server is None
@@ -30,6 +31,7 @@
     assert settings.execution.max_workers is None
     assert settings.networking.request_timeout == 5 * 60
     assert settings.networking.connect_timeout is None
+    assert settings.persistence.consistency_timeout == 5.0
 
 
 async def test_peering_namespaced_is_modified_by_clusterwide():
@@ -44,3 +46,13 @@
     assert settings.peering.clusterwide == False
     settings.peering.namespaced = not settings.peering.namespaced
     assert settings.peering.clusterwide == True
+
+
+async def test_deprecated_batch_window_is_still_persisted():
+    settings = kopf.OperatorSettings()
+    with pytest.warns(DeprecationWarning, match=r"Time-based event batching was removed."):
+        assert settings.batching.batch_window == 0.1
+    with pytest.warns(DeprecationWarning, match=r"Time-based event batching was removed."):
+        settings.batching.batch_window = 123
+    with pytest.warns(DeprecationWarning, match=r"Time-based event batching was removed."):
+        assert settings.batching.batch_window == 123