Skip to content

Commit

Permalink
feat: disable event batching by default
Browse files Browse the repository at this point in the history
  • Loading branch information
paxbit committed Sep 10, 2021
1 parent fa12e3e commit a54f7cc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 14 deletions.
20 changes: 16 additions & 4 deletions kopf/_cogs/configs/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,22 @@ class BatchingSettings:
How soon an idle worker is exited and garbage-collected if no events arrive.
"""

batch_window: float = 0.1
"""
How fast/slow does a worker deplete the queue when an event is received.
All events arriving within this window will be ignored except the last one.
batch_window: Optional[float] = None
"""
An optional lossy optimisation to take some load off of event processing:
the debouncing interval (in seconds) with which a worker depletes the
incoming event queue. All events arriving within this window are ignored
except the last one, so only the latest seen state of a resource is handled.

This can have consequences depending on the type and use-case of the
operator. Example: given an event sequence ``CREATED > MODIFIED > DELETED``
of resource A, if this sequence arrives in the incoming event queue within
one ``batch_window``, any handlers for the ``CREATED`` or ``MODIFIED``
events of resource A will never be called. Instead, the first handler called
for A will be the ``DELETED`` handler, if one exists. Depending on what the
operator does, this might be problematic.

If the use-case allows debouncing of event sequences for a resource, set
this to a value ``> 0``. A notable side effect of doing so is an implicit
delay for all handlers, as events are only dispatched to the handlers after
this interval has elapsed.

``None`` (the default) disables batching entirely: every event is processed.
"""

exit_timeout: float = 2.0
Expand Down
21 changes: 11 additions & 10 deletions kopf/_core/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,17 @@ async def worker(
else:
continue
else:
try:
while True:
prev_event = raw_event
next_event = await asyncio.wait_for(
backlog.get(),
timeout=settings.batching.batch_window)
shouldstop = shouldstop or isinstance(next_event, EOS)
raw_event = prev_event if isinstance(next_event, EOS) else next_event
except asyncio.TimeoutError:
pass
if settings.batching.batch_window is not None:
try:
while True:
prev_event = raw_event
next_event = await asyncio.wait_for(
backlog.get(),
timeout=settings.batching.batch_window)
shouldstop = shouldstop or isinstance(next_event, EOS)
raw_event = prev_event if isinstance(next_event, EOS) else next_event
except asyncio.TimeoutError:
pass

# Exit gracefully and immediately on the end-of-stream marker sent by the watcher.
if isinstance(raw_event, EOS):
Expand Down
55 changes: 55 additions & 0 deletions tests/reactor/test_queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,61 @@ async def test_watchevent_batching(settings, resource, processor, timer,
assert actual_uid_val_pairs == expected_uid_val_pairs


@pytest.mark.parametrize('uids, vals, events', [
    pytest.param(['uid1', 'uid1'], ['a', 'b'], [
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'b'}},
    ], id='the same'),
    pytest.param(['uid1', 'uid2'], ['a', 'b'], [
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'b'}},
    ], id='distinct'),
    pytest.param(['uid1', 'uid2', 'uid1', 'uid2', 'uid1', 'uid3'], ['a', 'b', 'c', 'd', 'e', 'f'], [
        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'b'}},
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'c'}},
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'd'}},
        {'type': 'DELETED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'e'}},
        {'type': 'DELETED', 'object': {'metadata': {'uid': 'uid3'}, 'spec': 'f'}},
    ], id='mixed'),
])
@pytest.mark.usefixtures('watcher_limited')
async def test_watchevent_none_batching(settings, resource, processor, timer,
                                        stream, events, uids, vals, event_loop):
    """ Verify that every single stream event is processed when batching is off. """

    # Shrink the timeouts so that the test does not idle; neither timeout
    # is expected to actually be reached -- if it is, something is broken.
    settings.batching.idle_timeout = 100   # must not be involved at all
    settings.batching.exit_timeout = 100   # the exit must happen instantly
    settings.batching.batch_window = None  # the debouncing window is disabled

    # Feed the whole event sequence at once, then mark the end of the stream.
    # Note: the uids intentionally repeat -- no events may be collapsed.
    stream.feed(events)
    stream.close()

    # Run the watcher to completion (near-instantly and test-blocking).
    with timer:
        await watcher(
            namespace=None,
            resource=resource,
            settings=settings,
            processor=processor,
        )

    # With batching disabled, there must be one processor call per event.
    assert processor.await_count == len(events)

    # Every (uid, spec) pair fed into the stream must have been dispatched.
    seen_pairs = {
        (call_kwargs['raw_event']['object']['metadata']['uid'],
         call_kwargs['raw_event']['object']['spec'])
        for _call_args, call_kwargs in processor.call_args_list
    }
    assert seen_pairs == set(zip(uids, vals))

@pytest.mark.parametrize('unique, events', [
pytest.param(1, [
Expand Down

0 comments on commit a54f7cc

Please sign in to comment.