Skip to content

Commit

Permalink
REF: Rename "message" to "event" in event handler classes
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Mar 6, 2024
1 parent 585e0a8 commit 9d25f0a
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 160 deletions.
174 changes: 87 additions & 87 deletions octue/cloud/events/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,28 @@ def __init__(
self,
receiving_service,
handle_monitor_message=None,
record_messages=True,
record_events=True,
service_name="REMOTE",
message_handlers=None,
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
skip_missing_messages_after=10,
skip_missing_events_after=10,
):
self.receiving_service = receiving_service
self.handle_monitor_message = handle_monitor_message
self.record_messages = record_messages
self.record_events = record_events
self.service_name = service_name
self.schema = schema

self.waiting_messages = None
self.handled_messages = []
self._previous_message_number = -1
self.waiting_events = None
self.handled_events = []
self._previous_event_number = -1
self._child_sdk_version = None

self.skip_missing_messages_after = skip_missing_messages_after
self._missing_message_detection_time = None
self._earliest_waiting_message_number = math.inf
self.skip_missing_events_after = skip_missing_events_after
self._missing_event_detection_time = None
self._earliest_waiting_event_number = math.inf

self._message_handlers = message_handlers or {
self._event_handlers = event_handlers or {
"delivery_acknowledgement": self._handle_delivery_acknowledgement,
"heartbeat": self._handle_heartbeat,
"monitor_message": self._handle_monitor_message,
Expand All @@ -67,16 +67,16 @@ def __init__(
self._log_message_colours = [COLOUR_PALETTE[1], *COLOUR_PALETTE[3:]]

@property
def time_since_missing_message(self):
"""Get the amount of time elapsed since the last missing message was detected. If no missing messages have been
def time_since_missing_event(self):
"""Get the amount of time elapsed since the last missing event was detected. If no missing events have been
detected or they've already been skipped past, `None` is returned.
:return float|None:
"""
if self._missing_message_detection_time is None:
if self._missing_event_detection_time is None:
return None

return time.perf_counter() - self._missing_message_detection_time
return time.perf_counter() - self._missing_event_detection_time

@abc.abstractmethod
def handle_events(self, *args, **kwargs):
Expand All @@ -87,12 +87,12 @@ def _extract_event_and_attributes(self, event):
pass

def _extract_and_enqueue_event(self, event):
"""Extract an event from the Pub/Sub message and add it to `self.waiting_messages`.
"""Extract an event from the Pub/Sub message and add it to `self.waiting_events`.
:param dict event:
:return None:
"""
logger.debug("%r received a message related to question %r.", self.receiving_service, self.question_uuid)
logger.debug("%r received an event related to question %r.", self.receiving_service, self.question_uuid)
event, attributes = self._extract_event_and_attributes(event)

if not is_event_valid(
Expand All @@ -105,137 +105,137 @@ def _extract_and_enqueue_event(self, event):
):
return

# Get the child's Octue SDK version from the first message.
# Get the child's Octue SDK version from the first event.
if not self._child_sdk_version:
self._child_sdk_version = attributes["version"]

message_number = attributes["message_number"]
event_number = attributes["message_number"]

if message_number in self.waiting_messages:
if event_number in self.waiting_events:
logger.warning(
"%r: Message with duplicate message number %d received for question %s - overwriting original message.",
"%r: Event with duplicate event number %d received for question %s - overwriting original event.",
self.receiving_service,
message_number,
event_number,
self.question_uuid,
)

self.waiting_messages[message_number] = event
self.waiting_events[event_number] = event

def _attempt_to_handle_waiting_messages(self):
"""Attempt to handle messages waiting in `self.waiting_messages`. If these messages aren't consecutive to the
last handled message (i.e. if messages have been received out of order and the next in-order message hasn't been
received yet), just return. After the missing message wait time has passed, if this set of missing messages
haven't arrived but subsequent ones have, skip to the earliest waiting message and continue from there.
def _attempt_to_handle_waiting_events(self):
"""Attempt to handle events waiting in `self.waiting_events`. If these events aren't consecutive to the
last handled event (i.e. if events have been received out of order and the next in-order event hasn't been
received yet), just return. After the missing event wait time has passed, if this set of missing events
haven't arrived but subsequent ones have, skip to the earliest waiting event and continue from there.
:return any|None: either a non-`None` result from a message handler or `None` if nothing was returned by the message handlers or if the next in-order message hasn't been received yet
:return any|None: either a non-`None` result from a event handler or `None` if nothing was returned by the event handlers or if the next in-order event hasn't been received yet
"""
while self.waiting_messages:
while self.waiting_events:
try:
# If the next consecutive message has been received:
message = self.waiting_messages.pop(self._previous_message_number + 1)
# If the next consecutive event has been received:
event = self.waiting_events.pop(self._previous_event_number + 1)

# If the next consecutive message hasn't been received:
# If the next consecutive event hasn't been received:
except KeyError:
# Start the missing message timer if it isn't already running.
if self._missing_message_detection_time is None:
self._missing_message_detection_time = time.perf_counter()
# Start the missing event timer if it isn't already running.
if self._missing_event_detection_time is None:
self._missing_event_detection_time = time.perf_counter()

if self.time_since_missing_message > self.skip_missing_messages_after:
message = self._skip_to_earliest_waiting_message()
if self.time_since_missing_event > self.skip_missing_events_after:
event = self._skip_to_earliest_waiting_event()

# Declare there are no more missing messages.
self._missing_message_detection_time = None
# Declare there are no more missing events.
self._missing_event_detection_time = None

if not message:
if not event:
return

else:
return

result = self._handle_message(message)
result = self._handle_event(event)

if result is not None:
return result

def _skip_to_earliest_waiting_message(self):
"""Get the earliest waiting message and set the message handler up to continue from it.
def _skip_to_earliest_waiting_event(self):
"""Get the earliest waiting event and set the event handler up to continue from it.
:return dict|None:
"""
try:
message = self.waiting_messages.pop(self._earliest_waiting_message_number)
event = self.waiting_events.pop(self._earliest_waiting_event_number)
except KeyError:
return

number_of_missing_messages = self._earliest_waiting_message_number - self._previous_message_number - 1
number_of_missing_events = self._earliest_waiting_event_number - self._previous_event_number - 1

# Let the message handler know it can handle the next earliest message.
self._previous_message_number = self._earliest_waiting_message_number - 1
# Let the event handler know it can handle the next earliest event.
self._previous_event_number = self._earliest_waiting_event_number - 1

logger.warning(
"%r: %d consecutive messages missing for question %r after %ds - skipping to next earliest waiting message "
"(message %d).",
"%r: %d consecutive events missing for question %r after %ds - skipping to next earliest waiting event "
"(event %d).",
self.receiving_service,
number_of_missing_messages,
number_of_missing_events,
self.question_uuid,
self.skip_missing_messages_after,
self._earliest_waiting_message_number,
self.skip_missing_events_after,
self._earliest_waiting_event_number,
)

return message
return event

def _handle_message(self, message):
"""Pass a message to its handler and update the previous message number.
def _handle_event(self, event):
"""Pass an event to its handler and update the previous event number.
:param dict message:
:param dict event:
:return dict|None:
"""
self._previous_message_number += 1
self._previous_event_number += 1

if self.record_messages:
self.handled_messages.append(message)
if self.record_events:
self.handled_events.append(event)

handler = self._message_handlers[message["kind"]]
return handler(message)
handler = self._event_handlers[event["kind"]]
return handler(event)

def _handle_delivery_acknowledgement(self, message):
def _handle_delivery_acknowledgement(self, event):
"""Mark the question as delivered to prevent resending it.
:param dict message:
:param dict event:
:return None:
"""
logger.info("%r's question was delivered at %s.", self.receiving_service, message["datetime"])
logger.info("%r's question was delivered at %s.", self.receiving_service, event["datetime"])

def _handle_heartbeat(self, message):
def _handle_heartbeat(self, event):
"""Record the time the heartbeat was received.
:param dict message:
:param dict event:
:return None:
"""
self._last_heartbeat = datetime.now()
logger.info("Heartbeat received from service %r for question %r.", self.service_name, self.question_uuid)

def _handle_monitor_message(self, message):
def _handle_monitor_message(self, event):
"""Send a monitor message to the handler if one has been provided.
:param dict message:
:param dict event:
:return None:
"""
logger.debug("%r received a monitor message.", self.receiving_service)

if self.handle_monitor_message is not None:
self.handle_monitor_message(message["data"])
self.handle_monitor_message(event["data"])

def _handle_log_message(self, message):
"""Deserialise the message into a log record and pass it to the local log handlers, adding [<service-name>] to
def _handle_log_message(self, event):
"""Deserialise the event into a log record and pass it to the local log handlers, adding [<service-name>] to
the start of the log message.
:param dict message:
:param dict event:
:return None:
"""
record = logging.makeLogRecord(message["log_record"])
record = logging.makeLogRecord(event["log_record"])

# Add information about the immediate child sending the message and colour it with the first colour in the
# Add information about the immediate child sending the event and colour it with the first colour in the
# colour palette.
immediate_child_analysis_section = colourise(
f"[{self.service_name} | analysis-{self.question_uuid}]",
Expand All @@ -255,41 +255,41 @@ def _handle_log_message(self, message):
record.msg = " ".join([immediate_child_analysis_section, *subchild_analysis_sections, final_message])
logger.handle(record)

def _handle_exception(self, message):
def _handle_exception(self, event):
"""Raise the exception from the responding service that is serialised in `data`.
:param dict message:
:param dict event:
:raise Exception:
:return None:
"""
exception_message = "\n\n".join(
(
message["exception_message"],
event["exception_message"],
f"The following traceback was captured from the remote service {self.service_name!r}:",
"".join(message["exception_traceback"]),
"".join(event["exception_traceback"]),
)
)

try:
exception_type = EXCEPTIONS_MAPPING[message["exception_type"]]
exception_type = EXCEPTIONS_MAPPING[event["exception_type"]]

# Allow unknown exception types to still be raised.
except KeyError:
exception_type = type(message["exception_type"], (Exception,), {})
exception_type = type(event["exception_type"], (Exception,), {})

raise exception_type(exception_message)

def _handle_result(self, message):
"""Convert the result to the correct form, deserialising the output manifest if it is present in the message.
def _handle_result(self, event):
"""Convert the result to the correct form, deserialising the output manifest if it is present in the event.
:param dict message:
:param dict event:
:return dict:
"""
logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid)

if message.get("output_manifest"):
output_manifest = Manifest.deserialise(message["output_manifest"])
if event.get("output_manifest"):
output_manifest = Manifest.deserialise(event["output_manifest"])
else:
output_manifest = None

return {"output_values": message.get("output_values"), "output_manifest": output_manifest}
return {"output_values": event.get("output_values"), "output_manifest": output_manifest}
18 changes: 9 additions & 9 deletions octue/cloud/events/replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@ def __init__(
self,
receiving_service=None,
handle_monitor_message=None,
record_messages=True,
record_events=True,
service_name="REMOTE",
message_handlers=None,
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
):
super().__init__(
receiving_service or Service(backend=ServiceBackend(), service_id="local/local:local"),
handle_monitor_message=handle_monitor_message,
record_messages=record_messages,
record_events=record_events,
service_name=service_name,
message_handlers=message_handlers,
event_handlers=event_handlers,
schema=schema,
skip_missing_messages_after=0,
skip_missing_events_after=0,
)

def handle_events(self, events):
self.question_uuid = events[0]["attributes"]["question_uuid"]
self.waiting_messages = {}
self._previous_message_number = -1
self.waiting_events = {}
self._previous_event_number = -1

for event in events:
self._extract_and_enqueue_event(event)

self._earliest_waiting_message_number = min(self.waiting_messages.keys())
return self._attempt_to_handle_waiting_messages()
self._earliest_waiting_event_number = min(self.waiting_events.keys())
return self._attempt_to_handle_waiting_events()

def _extract_event_and_attributes(self, event):
event["attributes"]["message_number"] = int(event["attributes"]["message_number"])
Expand Down
Loading

0 comments on commit 9d25f0a

Please sign in to comment.