Skip to content

Commit

Permalink
feat: Delayed events; Support for SCXML <send> tag
Browse files Browse the repository at this point in the history
  • Loading branch information
fgmacedo committed Nov 27, 2024
1 parent ed35933 commit c46e932
Show file tree
Hide file tree
Showing 25 changed files with 773 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
"tests/examples/**.py" = ["B018"]

[tool.ruff.lint.mccabe]
max-complexity = 6
max-complexity = 10

[tool.ruff.lint.isort]
force-single-line = true
Expand Down
10 changes: 9 additions & 1 deletion statemachine/engines/async_.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio
import heapq
from time import time
from typing import TYPE_CHECKING

from ..event_data import EventData
Expand Down Expand Up @@ -61,7 +64,12 @@ async def processing_loop(self):
try:
# Execute the triggers in the queue in FIFO order until the queue is empty
while self._external_queue:
trigger_data = self._external_queue.popleft()
trigger_data = heapq.heappop(self._external_queue)
current_time = time()
if trigger_data.execution_time > current_time:
self.put(trigger_data)
await asyncio.sleep(0.001)
continue

Check warning on line 72 in statemachine/engines/async_.py

View check run for this annotation

Codecov / codecov/patch

statemachine/engines/async_.py#L70-L72

Added lines #L70 - L72 were not covered by tests
try:
result = await self._trigger(trigger_data)
if first_result is self._sentinel:
Expand Down
7 changes: 4 additions & 3 deletions statemachine/engines/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import deque
import heapq
from threading import Lock
from typing import TYPE_CHECKING
from weakref import proxy
Expand All @@ -17,7 +17,7 @@
class BaseEngine:
def __init__(self, sm: "StateMachine", rtc: bool = True):
self.sm: StateMachine = proxy(sm)
self._external_queue: deque = deque()
self._external_queue: list = []
self._sentinel = object()
self._rtc = rtc
self._processing = Lock()
Expand All @@ -27,7 +27,8 @@ def put(self, trigger_data: TriggerData):
"""Put the trigger on the queue without blocking the caller."""
if not self._running and not self.sm.allow_event_without_transition:
raise TransitionNotAllowed(trigger_data.event, self.sm.current_state)
self._external_queue.append(trigger_data)

heapq.heappush(self._external_queue, trigger_data)

def start(self):
if self.sm.current_state_value is not None:
Expand Down
12 changes: 10 additions & 2 deletions statemachine/engines/sync.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import heapq
from time import sleep
from time import time
from typing import TYPE_CHECKING

from ..event_data import EventData
Expand Down Expand Up @@ -47,7 +50,7 @@ def processing_loop(self):
"""
if not self._rtc:
# The machine is in "synchronous" mode
trigger_data = self._external_queue.popleft()
trigger_data = heapq.heappop(self._external_queue)
return self._trigger(trigger_data)

# We make sure that only the first event enters the processing critical section,
Expand All @@ -62,7 +65,12 @@ def processing_loop(self):
try:
# Execute the triggers in the queue in FIFO order until the queue is empty
while self._running and self._external_queue:
trigger_data = self._external_queue.popleft()
trigger_data = heapq.heappop(self._external_queue)
current_time = time()
if trigger_data.execution_time > current_time:
self.put(trigger_data)
sleep(0.001)
continue
try:
result = self._trigger(trigger_data)
if first_result is self._sentinel:
Expand Down
36 changes: 21 additions & 15 deletions statemachine/event.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from inspect import isawaitable
from typing import TYPE_CHECKING
from typing import List
from uuid import uuid4

from statemachine.utils import run_async_from_sync

from .event_data import TriggerData
from .i18n import _

Expand Down Expand Up @@ -42,6 +39,9 @@ class Event(str):
name: str
"""The event name."""

delay: float = 0
"""The delay in milliseconds before the event is triggered. Default is 0."""

_sm: "StateMachine | None" = None
"""The state machine instance."""

Expand All @@ -53,6 +53,7 @@ def __new__(
transitions: "str | TransitionList | None" = None,
id: "str | None" = None,
name: "str | None" = None,
delay: float = 0,
_sm: "StateMachine | None" = None,
):
if isinstance(transitions, str):
Expand All @@ -64,6 +65,7 @@ def __new__(

instance = super().__new__(cls, id)
instance.id = id
instance.delay = delay
if name:
instance.name = name
elif _has_real_id:
Expand Down Expand Up @@ -92,19 +94,13 @@ def __get__(self, instance, owner):
"""
if instance is None:
return self
return BoundEvent(id=self.id, name=self.name, _sm=instance)

def __call__(self, *args, **kwargs):
"""Send this event to the current state machine.
return BoundEvent(id=self.id, name=self.name, delay=self.delay, _sm=instance)

Triggering an event on a state machine means invoking or sending a signal, initiating the
process that may result in executing a transition.
"""
def put(self, *args, machine: "StateMachine", **kwargs):
# The `__call__` is declared here to help IDEs knowing that an `Event`
# can be called as a method. But it is not meant to be called without
# an SM instance. Such SM instance is provided by `__get__` method when
# used as a property descriptor.
machine = self._sm
if machine is None:
raise RuntimeError(_("Event {} cannot be called without a SM instance").format(self))

Expand All @@ -116,10 +112,20 @@ def __call__(self, *args, **kwargs):
kwargs=kwargs,
)
machine._put_nonblocking(trigger_data)
result = machine._processing_loop()
if not isawaitable(result):
return result
return run_async_from_sync(result)

def __call__(self, *args, **kwargs):
"""Send this event to the current state machine.
Triggering an event on a state machine means invoking or sending a signal, initiating the
process that may result in executing a transition.
"""
# The `__call__` is declared here to help IDEs knowing that an `Event`
# can be called as a method. But it is not meant to be called without
# an SM instance. Such SM instance is provided by `__get__` method when
# used as a property descriptor.
machine = self._sm
self.put(*args, machine=machine, **kwargs)
return machine._processing_loop()

def split( # type: ignore[override]
self, sep: "str | None" = None, maxsplit: int = -1
Expand Down
34 changes: 29 additions & 5 deletions statemachine/event_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from dataclasses import field
from time import time
from typing import TYPE_CHECKING
from typing import Any

Expand All @@ -11,23 +12,36 @@


@dataclass
class _Data:
kwargs: dict

def __getattr__(self, name):
return self.kwargs.get(name, None)


@dataclass(order=True)
class TriggerData:
machine: "StateMachine"
machine: "StateMachine" = field(compare=False)

event: "Event | None"
event: "Event | None" = field(compare=False)
"""The Event that was triggered."""

model: Any = field(init=False)
execution_time: float = field(default=0.0)
"""The time at which the :ref:`Event` should run."""

model: Any = field(init=False, compare=False)
"""A reference to the underlying model that holds the current :ref:`State`."""

args: tuple = field(default_factory=tuple)
args: tuple = field(default_factory=tuple, compare=False)
"""All positional arguments provided on the :ref:`Event`."""

kwargs: dict = field(default_factory=dict)
kwargs: dict = field(default_factory=dict, compare=False)
"""All keyword arguments provided on the :ref:`Event`."""

def __post_init__(self):
self.model = self.machine.model
delay = self.event.delay if self.event and self.event.delay else 0
self.execution_time = time() + (delay / 1000)


@dataclass
Expand Down Expand Up @@ -77,3 +91,13 @@ def extended_kwargs(self):
kwargs["source"] = self.source
kwargs["target"] = self.target
return kwargs

@property
def data(self):
"Property used by the SCXML namespace"
if self.trigger_data.kwargs:
return _Data(self.trigger_data.kwargs)
elif self.trigger_data.args and len(self.trigger_data.args) == 1:
return self.trigger_data.args[0]
else:
return self.trigger_data.args

Check warning on line 103 in statemachine/event_data.py

View check run for this annotation

Codecov / codecov/patch

statemachine/event_data.py#L103

Added line #L103 was not covered by tests
Loading

0 comments on commit c46e932

Please sign in to comment.