Skip to content

Commit

Permalink
WIP: a repro with an artificial inconsistency
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Vasilyev <[email protected]>
  • Loading branch information
nolar committed Oct 3, 2021
1 parent b561141 commit 051496e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 7 deletions.
82 changes: 79 additions & 3 deletions examples/01-minimal/example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,83 @@
import logging
import kopf
import dataclasses


# @kopf.on.login()
# def delayed_k3s(**_):
# conn = kopf.login_via_pykube(logger=logging.getLogger('xxx'))
# if conn:
# return dataclasses.replace(conn, server=conn.server.rsplit(':', 1)[0] + ':11223')


@kopf.on.create('kopfexamples')
def create_fn(spec, **kwargs):
    """Creation handler: announce the incoming spec and seed the status."""
    print(f"And here we are! Creating: {spec}")
    # Whatever mapping the handler returns is merged by Kopf into the
    # object's status under this handler's key.
    result = {'message': 'hello world'}  # will be the new status
    return result
@kopf.on.update('kopfexamples')
def update_fn(meta, spec, reason, logger, **kwargs):
    """Update handler: log the observed resourceVersion, reason, and spec.

    Renamed from ``create_fn``: the module previously defined two handler
    functions with the same name, so the second ``def`` rebound the module
    attribute, and Kopf derives the handler id from the function name —
    two handlers sharing one id collide in the state-tracking annotations.
    """
    # The resourceVersion identifies which stored state this event reflects;
    # it is the key datum when reproducing the consistency race in the
    # timeline documented below.
    rv = meta.get('resourceVersion')
    logger.warning(f">>> {rv=} And here we are! {reason=}: {spec}")


# @kopf.on.create('kopfexamples')
# def create_fn2(spec, **kwargs):
# print(f"And here we are! Creating2: {spec}")

"""
=======================================================================================================================
Trigger with (delete the object first!):
$ kubectl apply -f examples/obj.yaml && sleep 1 && kubectl patch -f examples/obj.yaml --type merge -p '{"spec": {"field": 2}}'
=======================================================================================================================
The timeline of a misbehaving operator (with an artificial latency of 3 seconds):
/-- kubectl creates an object (state a=s0)
| ... sleep 1s
| /-- kubectl patches the spec.field with the patch "p1", creates state "b"=s0+p1
| | /-- Kopf patches with annotations (state c=s0+p1+p2)
| | | /-- Kopf patches with annotations (the same state d=s0+p1+p2+p3, d==c)
↓ ↓ | |
----+-//-aaaaabbbbbbbcccccdddddddddddddddddd--> which state is stored in kubernetes
↓ ↓ ↑↓ ↑↓
| | || |\----3s----\
| | |\---+3s----\ |
| \----3s+---\| | |
\----3s----\| || | |
↓↑ ↓↑ ↓ ↓
----+-//------------aaaaabbbbbbbbcccccdddddd--> which state is seen by the operator
↓ ↓↑ ↓↑ ↓ ↓
| || || | \-- Kopf gets the state "d"=s0+p1+p2+p3, sees the annotations, goes idle.
| || || \-- Kopf gets the state "c"=s0+p1+p2, sees the annotations, goes idle.
| || ||
| || |\-- Kopf reacts, executes handlers (2ND TIME), adds annotations with a patch (p3)
| || \-- Kopf gets the state "b"=s0+p1 with NO annotations of "p2" yet.
| || !BUG!: "c"=s0+p1+p2 is not seen yet, though "c"/"p2" exists by now!
| ||
| |\-- Kopf reacts, executes handlers (1ST TIME), adds annotations with a patch (p2)
| \-- Kopf gets a watch-event (state a)
\-- Kopf starts watching the resource
A fix with consistency tracking (with an artificial latency of 3 seconds):
/-- kubectl creates an object (state a=s0)
| ... sleep 1s
| /-- kubectl patches the spec.field with the patch "p1", creates state "b"=s0+p1
| | /-- Kopf patches with annotations (state c=s0+p1+p2)
↓ ↓ |
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
↓ ↓ ↑↓
| | |\----3s----\
| \----3s+---\ |
\----3s----\| | |
↓↑ ↓ ↓
----+-//------------aaaaabbbbbbbbcccccc-> which state is seen by the operator
↓ ↓↑⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶↓ Kopf's own patch "p2" enables the consistency expectation for 5s OR version "c"
| || | |
| || | \-- Kopf gets a consistent state "c"=s0+p1+p2 as expected, thus goes idle.
| || |
| || \-- Kopf executes ONLY the low-level handlers over the state "b"=s0+p1.
| || \~~~~~~⨳ inconsistency mode: wait until a new event (then discard it) OR timeout (then process it)
| ||
| |\-- Kopf reacts, executes handlers, adds annotations with a patch (p2)
| \-- Kopf gets a watch-event (state a)
\-- Kopf starts watching the resource
"""
3 changes: 3 additions & 0 deletions kopf/_core/reactor/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ async def process_resource_causes(
# If the wait exceeds its time and no new consistent events arrive, then fake the consistency.
# However, if a patch is accumulated by now, skip waiting and apply it instantly (by exiting).
# In that case, we are guaranteed to be inconsistent, so also skip the state-dependent handlers.
unslept: Optional[float] = None
consistency_is_required = changing_cause is not None
consistency_is_achieved = consistency_time is None # i.e. preexisting consistency
if consistency_is_required and not consistency_is_achieved and not patch and consistency_time:
Expand All @@ -316,6 +317,8 @@ async def process_resource_causes(
consistency_is_achieved = unslept is None # "woke up" vs. "timed out"
if consistency_is_required and not consistency_is_achieved:
changing_cause = None # exit to PATCHing and/or re-iterating over new events.
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
local_logger.debug(f'>>> {rv=} {consistency_is_required=} {consistency_is_achieved=} {unslept=} {changing_cause=}')

# Now, the consistency is either pre-proven (by receiving or not expecting any resource version)
# or implied (by exceeding the allowed consistency-waiting timeout while getting no new events).
Expand Down
27 changes: 23 additions & 4 deletions kopf/_core/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def exception_handler(exc: BaseException) -> None:
exception_handler=exception_handler)
streams: Streams = {}

loop = asyncio.get_running_loop()
try:
# Either use the existing object's queue, or create a new one together with the per-object job.
# "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
Expand All @@ -192,12 +193,21 @@ def exception_handler(exc: BaseException) -> None:
if isinstance(raw_event, watching.Bookmark):
continue

# TODO: REMOVE: only for debugging!
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
fld = raw_event.get('object', {}).get('spec', {}).get('field')
knd = raw_event.get('object', {}).get('kind')
nam = raw_event.get('object', {}).get('metadata', {}).get('name')
logger.debug(f'STREAM GOT {knd=} {nam=} {rv=} // {fld=} ')

# Multiplex the raw events to per-resource workers/queues. Start the new ones if needed.
key: ObjectRef = (resource, get_uid(raw_event))
try:
# Feed the worker, as fast as possible, no extra activities.
streams[key].pressure.set() # interrupt current sleeps, if any.
await streams[key].backlog.put(raw_event)
loop.call_later(3.0, streams[key].pressure.set)
loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
# streams[key].pressure.set() # interrupt current sleeps, if any.
# await streams[key].backlog.put(raw_event)
except KeyError:

# Block the operator's readiness for individual resource's index handlers.
Expand All @@ -211,8 +221,10 @@ def exception_handler(exc: BaseException) -> None:

# Start the worker, and feed it initially. Starting can be moderately slow.
streams[key] = Stream(backlog=asyncio.Queue(), pressure=asyncio.Event())
streams[key].pressure.set() # interrupt current sleeps, if any.
await streams[key].backlog.put(raw_event)
# streams[key].pressure.set() # interrupt current sleeps, if any.
# await streams[key].backlog.put(raw_event)
loop.call_later(3.0, streams[key].pressure.set)
loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
await scheduler.spawn(
name=f'worker for {key}',
coro=worker(
Expand Down Expand Up @@ -316,6 +328,13 @@ async def worker(
if isinstance(raw_event, EOS):
break # out of the worker.

# TODO: REMOVE: only for debugging!
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
fld = raw_event.get('object', {}).get('spec', {}).get('field')
knd = raw_event.get('object', {}).get('kind')
nam = raw_event.get('object', {}).get('metadata', {}).get('name')
logger.debug(f'QUEUED GOT {knd=} {nam=} {rv=} exp={expected_version!r} // {fld=} ')

# Keep track of the resource's consistency for high-level (state-dependent) handlers.
# See `settings.persistence.consistency_timeout` for the explanation of consistency.
if expected_version is not None and expected_version == get_version(raw_event):
Expand Down

0 comments on commit 051496e

Please sign in to comment.