Skip to content

Commit

Permalink
REF: Use events' actual attributes instead of first event's
Browse files Browse the repository at this point in the history
BREAKING CHANGE: `Service.received_events`, `AbstractEventHandler.handled_events`, and `Child.received_events` now include event attributes instead of just the event. These attributes/properties now return a list of dictionaries with the keys {"event", "attributes"}, where what was previously returned is now mapped to the "event" key.
  • Loading branch information
cortadocodes committed Jun 26, 2024
1 parent bf8cdca commit 39a2303
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 24 deletions.
33 changes: 11 additions & 22 deletions octue/cloud/events/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ def __init__(
self.schema = schema
self.only_handle_result = only_handle_result

# These are set when the first event is received.
self.question_uuid = None
self.child_sruid = None
self.child_sdk_version = None

self.waiting_events = None
self.handled_events = []
self._previous_event_number = -1
Expand Down Expand Up @@ -160,21 +155,15 @@ def _extract_and_enqueue_event(self, container):
):
return

# Get the child's SRUID and Octue SDK version from the first event.
if not self.child_sdk_version:
self.question_uuid = attributes["question_uuid"]
self.child_sruid = attributes["sender"]
self.child_sdk_version = attributes["sender_sdk_version"]

logger.debug("%r: Received an event related to question %r.", self.recipient, self.question_uuid)
logger.debug("%r: Received an event related to question %r.", self.recipient, attributes["question_uuid"])
order = attributes["order"]

if order in self.waiting_events:
logger.warning(
"%r: Event with duplicate order %d received for question %r - overwriting original event.",
self.recipient,
order,
self.question_uuid,
attributes["question_uuid"],
)

self.waiting_events[order] = (event, attributes)
Expand Down Expand Up @@ -242,7 +231,7 @@ def _skip_to_earliest_waiting_event(self):
"(event %d).",
self.recipient,
number_of_missing_events,
self.question_uuid,
attributes["question_uuid"],
self.skip_missing_events_after,
self._earliest_waiting_event_number,
)
Expand All @@ -259,7 +248,7 @@ def _handle_event(self, event, attributes):
self._previous_event_number += 1

if self.record_events:
self.handled_events.append(event)
self.handled_events.append({"event": event, "attributes": attributes})

if self.only_handle_result and event["kind"] != "result":
return
Expand Down Expand Up @@ -288,8 +277,8 @@ def _handle_heartbeat(self, event, attributes):
logger.info(
"%r: Received a heartbeat from service %r for question %r.",
self.recipient,
self.child_sruid,
self.question_uuid,
attributes["sender"],
attributes["question_uuid"],
)

def _handle_monitor_message(self, event, attributes):
Expand All @@ -302,8 +291,8 @@ def _handle_monitor_message(self, event, attributes):
logger.debug(
"%r: Received a monitor message from service %r for question %r.",
self.recipient,
self.child_sruid,
self.question_uuid,
attributes["sender"],
attributes["question_uuid"],
)

if self.handle_monitor_message is not None:
Expand All @@ -323,7 +312,7 @@ def _handle_log_message(self, event, attributes):
# Add information about the immediate child sending the event and colour it with the first colour in the
# colour palette.
immediate_child_analysis_section = colourise(
f"[{self.child_sruid} | {self.question_uuid}]",
f"[{attributes['sender']} | {attributes['question_uuid']}]",
text_colour=self._log_message_colours[0],
)

Expand Down Expand Up @@ -351,7 +340,7 @@ def _handle_exception(self, event, attributes):
exception_message = "\n\n".join(
(
event["exception_message"],
f"The following traceback was captured from the remote service {self.child_sruid!r}:",
f"The following traceback was captured from the remote service {attributes['sender']!r}:",
"".join(event["exception_traceback"]),
)
)
Expand All @@ -372,7 +361,7 @@ def _handle_result(self, event, attributes):
:param dict attributes: the event's attributes
:return dict:
"""
logger.info("%r: Received an answer to question %r.", self.recipient, self.question_uuid)
logger.info("%r: Received an answer to question %r.", self.recipient, attributes["question_uuid"])

if event.get("output_manifest"):
output_manifest = Manifest.deserialise(event["output_manifest"])
Expand Down
12 changes: 10 additions & 2 deletions octue/cloud/pub_sub/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,17 @@ def handle_events(self, timeout=60, maximum_heartbeat_interval=300):
self._heartbeat_checker.cancel()
self._subscriber.close()

if self.handled_events:
last_event = self.handled_events[-1]
sender = last_event["attributes"]["sender"]
question_uuid = last_event["attributes"]["question_uuid"]
else:
sender = "UNKNOWN"
question_uuid = "UNKNOWN"

raise TimeoutError(
f"No heartbeat has been received from {self.child_sruid!r} for question {self.question_uuid} within the "
f"maximum allowed interval of {maximum_heartbeat_interval}s."
f"No heartbeat has been received from {sender!r} for question {question_uuid} within the maximum allowed "
f"interval of {maximum_heartbeat_interval}s."
)

def _monitor_heartbeat(self, maximum_heartbeat_interval):
Expand Down

0 comments on commit 39a2303

Please sign in to comment.