Replies: 2 comments 1 reply
-
Hi @dpetican , how are you? Thanks for getting in touch.
For simple state machines, one can consider the queue as an implementation detail. If you don't have concurrent calls and don't have nested event triggers while within a callback, there's nothing new to consider. The real gain with the queue is that if we receive an event trigger while processing a previous trigger, the new event will be put on a queue instead of starting to execute. |
Beta Was this translation helpful? Give feedback.
-
I think that RTC is always the best choice, I even consider removing the non-RTC option, as it sounds simpler at first, but can hurt if the callback handlers are non-trivial. Can you elaborate on a POC of your expected state machine? If you don't want to worry about input data changing (like info from the sensors) while processing an event, consider passing the data with the event call. Exampleimport random
from statemachine import State
from statemachine import StateMachine
def sensor_temperature_reader(seed: int, lower: float = 15, higher: float = 35):
random.seed(seed)
while True:
yield random.randint(lower, higher)
class AirConditioner(StateMachine):
"""
>>> sensor = sensor_temperature_reader(123456)
>>> sm = AirConditioner()
>>> for i in range(20):
... temperature = next(sensor)
... sm.send("sensor_updated", temperature=temperature)
Running sensor_updated from Off to Off: {'temperature': 24}
Running sensor_updated from Off to Off: {'temperature': 15}
Running sensor_updated from Off to Off: {'temperature': 20}
Running sensor_updated from Off to Off: {'temperature': 15}
Running sensor_updated from Off to Off: {'temperature': 17}
Running sensor_updated from Off to Off: {'temperature': 16}
Running sensor_updated from Off to Off: {'temperature': 23}
Running sensor_updated from Off to Off: {'temperature': 15}
Running sensor_updated from Off to Off: {'temperature': 18}
Running sensor_updated from Off to Off: {'temperature': 22}
Running sensor_updated from Off to Cooling: {'temperature': 35}
Running sensor_updated from Cooling to Cooling: {'temperature': 30}
Running sensor_updated from Cooling to Cooling: {'temperature': 20}
Running sensor_updated from Cooling to Standby: {'temperature': 15}
Running sensor_updated from Standby to Cooling: {'temperature': 31}
Running sensor_updated from Cooling to Standby: {'temperature': 19}
Running sensor_updated from Standby to Cooling: {'temperature': 27}
Running sensor_updated from Cooling to Cooling: {'temperature': 35}
Running sensor_updated from Cooling to Cooling: {'temperature': 26}
Running sensor_updated from Cooling to Standby: {'temperature': 16}
"""
off = State(initial=True)
cooling = State()
standby = State()
sensor_updated = (
off.to(cooling, cond="is_hot")
| cooling.to(standby, cond="is_good")
| standby.to(cooling, cond="is_hot")
| standby.to(off, cond="is_cool")
| off.to.itself(internal=True)
| cooling.to.itself(internal=True)
| standby.to.itself(internal=True)
)
def is_hot(self, temperature: float):
return temperature > 25
def is_good(self, temperature: float):
return temperature < 20
def is_cool(self, temperature: float):
return temperature < 18
def after_transition(self, event: str, source: State, target: State, event_data):
print(f"Running {event} from {source!s} to {target!s}: {event_data.trigger_data.kwargs!r}") |
Beta Was this translation helpful? Give feedback.
-
I've read the docs and have a few questions about events and queues. I couldn't find any more information about queues than was mentioned in the processing model section on RTC v non-RTC.
I'm trying to figure out how to use this library to transition a project from hard embedded C to Python running on Linux. In the current project I do not have a queue per se. I read the sensor data and process that data into transitions though conditions and guards. Each condition/guard combination is unique so only one (or no) transition is possible per iteration of the decision structure. I don't care if the sensor data changes while the state machine is processing.
This suggests that for real-time data I could run the Python state machine in imperative mode with RTC processing where each transition looks at the sensor data parameters as conditions, possibly with guards. Or It could be event based and evaluate the conditions in an if-elseif-else decision structure (similar to current approach) and call the state machine with the resulting event. Can I (should I) mix the two methods?
I like the event based approach because it abstracts the technical details of conditions away from the state machine. For example, all the state machine would have to know is when it receives a start event to transition to the running state from stopped. What causes start/stop events? The state machine should not care. This might make it easier to debug the state machine as I could provide an alternative decision structure that uses synthetic data.
BTW I really like the observers. Makes it easy to implement logging as in your example and also to do things like control a led indicator based on the current machine state. Domain models are also nice for abstraction. I will have to experiment of course, but I'm just trying to get a basic approach first. Thanks.
Beta Was this translation helpful? Give feedback.
All reactions