From 9d25f0a726f7c6de36fd573709ca37c37739a19d Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Wed, 6 Mar 2024 18:19:22 +0000 Subject: [PATCH] REF: Rename "message" to "event" in event handler classes --- octue/cloud/events/handler.py | 174 ++++++++++---------- octue/cloud/events/replayer.py | 18 +- octue/cloud/pub_sub/event_handler.py | 59 ++++--- octue/cloud/pub_sub/service.py | 4 +- tests/cloud/pub_sub/test_message_handler.py | 64 +++---- 5 files changed, 159 insertions(+), 160 deletions(-) diff --git a/octue/cloud/events/handler.py b/octue/cloud/events/handler.py index 103135dbf..1e23e0a80 100644 --- a/octue/cloud/events/handler.py +++ b/octue/cloud/events/handler.py @@ -34,28 +34,28 @@ def __init__( self, receiving_service, handle_monitor_message=None, - record_messages=True, + record_events=True, service_name="REMOTE", - message_handlers=None, + event_handlers=None, schema=SERVICE_COMMUNICATION_SCHEMA, - skip_missing_messages_after=10, + skip_missing_events_after=10, ): self.receiving_service = receiving_service self.handle_monitor_message = handle_monitor_message - self.record_messages = record_messages + self.record_events = record_events self.service_name = service_name self.schema = schema - self.waiting_messages = None - self.handled_messages = [] - self._previous_message_number = -1 + self.waiting_events = None + self.handled_events = [] + self._previous_event_number = -1 self._child_sdk_version = None - self.skip_missing_messages_after = skip_missing_messages_after - self._missing_message_detection_time = None - self._earliest_waiting_message_number = math.inf + self.skip_missing_events_after = skip_missing_events_after + self._missing_event_detection_time = None + self._earliest_waiting_event_number = math.inf - self._message_handlers = message_handlers or { + self._event_handlers = event_handlers or { "delivery_acknowledgement": self._handle_delivery_acknowledgement, "heartbeat": self._handle_heartbeat, "monitor_message": self._handle_monitor_message, @@ -67,16 +67,16 @@ def __init__( self._log_message_colours = [COLOUR_PALETTE[1], *COLOUR_PALETTE[3:]] @property - def time_since_missing_message(self): - """Get the amount of time elapsed since the last missing message was detected. If no missing messages have been + def time_since_missing_event(self): + """Get the amount of time elapsed since the last missing event was detected. If no missing events have been detected or they've already been skipped past, `None` is returned. :return float|None: """ - if self._missing_message_detection_time is None: + if self._missing_event_detection_time is None: return None - return time.perf_counter() - self._missing_message_detection_time + return time.perf_counter() - self._missing_event_detection_time @abc.abstractmethod def handle_events(self, *args, **kwargs): @@ -87,12 +87,12 @@ def _extract_event_and_attributes(self, event): pass def _extract_and_enqueue_event(self, event): - """Extract an event from the Pub/Sub message and add it to `self.waiting_messages`. + """Extract an event from the Pub/Sub message and add it to `self.waiting_events`. :param dict event: :return None: """ - logger.debug("%r received a message related to question %r.", self.receiving_service, self.question_uuid) + logger.debug("%r received an event related to question %r.", self.receiving_service, self.question_uuid) event, attributes = self._extract_event_and_attributes(event) if not is_event_valid( @@ -105,137 +105,137 @@ def _extract_and_enqueue_event(self, event): ): return - # Get the child's Octue SDK version from the first message. + # Get the child's Octue SDK version from the first event. if not self._child_sdk_version: self._child_sdk_version = attributes["version"] - message_number = attributes["message_number"] + event_number = attributes["message_number"] - if message_number in self.waiting_messages: + if event_number in self.waiting_events: logger.warning( - "%r: Message with duplicate message number %d received for question %s - overwriting original message.", + "%r: Event with duplicate event number %d received for question %s - overwriting original event.", self.receiving_service, - message_number, + event_number, self.question_uuid, ) - self.waiting_messages[message_number] = event + self.waiting_events[event_number] = event - def _attempt_to_handle_waiting_messages(self): - """Attempt to handle messages waiting in `self.waiting_messages`. If these messages aren't consecutive to the - last handled message (i.e. if messages have been received out of order and the next in-order message hasn't been - received yet), just return. After the missing message wait time has passed, if this set of missing messages - haven't arrived but subsequent ones have, skip to the earliest waiting message and continue from there. + def _attempt_to_handle_waiting_events(self): + """Attempt to handle events waiting in `self.waiting_events`. If these events aren't consecutive to the + last handled event (i.e. if events have been received out of order and the next in-order event hasn't been + received yet), just return. After the missing event wait time has passed, if this set of missing events + haven't arrived but subsequent ones have, skip to the earliest waiting event and continue from there. - :return any|None: either a non-`None` result from a message handler or `None` if nothing was returned by the message handlers or if the next in-order message hasn't been received yet + :return any|None: either a non-`None` result from a event handler or `None` if nothing was returned by the event handlers or if the next in-order event hasn't been received yet """ - while self.waiting_messages: + while self.waiting_events: try: - # If the next consecutive message has been received: - message = self.waiting_messages.pop(self._previous_message_number + 1) + # If the next consecutive event has been received: + event = self.waiting_events.pop(self._previous_event_number + 1) - # If the next consecutive message hasn't been received: + # If the next consecutive event hasn't been received: except KeyError: - # Start the missing message timer if it isn't already running. - if self._missing_message_detection_time is None: - self._missing_message_detection_time = time.perf_counter() + # Start the missing event timer if it isn't already running. + if self._missing_event_detection_time is None: + self._missing_event_detection_time = time.perf_counter() - if self.time_since_missing_message > self.skip_missing_messages_after: - message = self._skip_to_earliest_waiting_message() + if self.time_since_missing_event > self.skip_missing_events_after: + event = self._skip_to_earliest_waiting_event() - # Declare there are no more missing messages. - self._missing_message_detection_time = None + # Declare there are no more missing events. + self._missing_event_detection_time = None - if not message: + if not event: return else: return - result = self._handle_message(message) + result = self._handle_event(event) if result is not None: return result - def _skip_to_earliest_waiting_message(self): - """Get the earliest waiting message and set the message handler up to continue from it. + def _skip_to_earliest_waiting_event(self): + """Get the earliest waiting event and set the event handler up to continue from it. :return dict|None: """ try: - message = self.waiting_messages.pop(self._earliest_waiting_message_number) + event = self.waiting_events.pop(self._earliest_waiting_event_number) except KeyError: return - number_of_missing_messages = self._earliest_waiting_message_number - self._previous_message_number - 1 + number_of_missing_events = self._earliest_waiting_event_number - self._previous_event_number - 1 - # Let the message handler know it can handle the next earliest message. - self._previous_message_number = self._earliest_waiting_message_number - 1 + # Let the event handler know it can handle the next earliest event. + self._previous_event_number = self._earliest_waiting_event_number - 1 logger.warning( - "%r: %d consecutive messages missing for question %r after %ds - skipping to next earliest waiting message " - "(message %d).", + "%r: %d consecutive events missing for question %r after %ds - skipping to next earliest waiting event " + "(event %d).", self.receiving_service, - number_of_missing_messages, + number_of_missing_events, self.question_uuid, - self.skip_missing_messages_after, - self._earliest_waiting_message_number, + self.skip_missing_events_after, + self._earliest_waiting_event_number, ) - return message + return event - def _handle_message(self, message): - """Pass a message to its handler and update the previous message number. + def _handle_event(self, event): + """Pass an event to its handler and update the previous event number. - :param dict message: + :param dict event: :return dict|None: """ - self._previous_message_number += 1 + self._previous_event_number += 1 - if self.record_messages: - self.handled_messages.append(message) + if self.record_events: + self.handled_events.append(event) - handler = self._message_handlers[message["kind"]] - return handler(message) + handler = self._event_handlers[event["kind"]] + return handler(event) - def _handle_delivery_acknowledgement(self, message): + def _handle_delivery_acknowledgement(self, event): """Mark the question as delivered to prevent resending it. - :param dict message: + :param dict event: :return None: """ - logger.info("%r's question was delivered at %s.", self.receiving_service, message["datetime"]) + logger.info("%r's question was delivered at %s.", self.receiving_service, event["datetime"]) - def _handle_heartbeat(self, message): + def _handle_heartbeat(self, event): """Record the time the heartbeat was received. - :param dict message: + :param dict event: :return None: """ self._last_heartbeat = datetime.now() logger.info("Heartbeat received from service %r for question %r.", self.service_name, self.question_uuid) - def _handle_monitor_message(self, message): + def _handle_monitor_message(self, event): """Send a monitor message to the handler if one has been provided. - :param dict message: + :param dict event: :return None: """ logger.debug("%r received a monitor message.", self.receiving_service) if self.handle_monitor_message is not None: - self.handle_monitor_message(message["data"]) + self.handle_monitor_message(event["data"]) - def _handle_log_message(self, message): - """Deserialise the message into a log record and pass it to the local log handlers, adding [] to + def _handle_log_message(self, event): + """Deserialise the event into a log record and pass it to the local log handlers, adding [] to the start of the log message. - :param dict message: + :param dict event: :return None: """ - record = logging.makeLogRecord(message["log_record"]) + record = logging.makeLogRecord(event["log_record"]) - # Add information about the immediate child sending the message and colour it with the first colour in the + # Add information about the immediate child sending the event and colour it with the first colour in the # colour palette. immediate_child_analysis_section = colourise( f"[{self.service_name} | analysis-{self.question_uuid}]", @@ -255,41 +255,41 @@ def _handle_log_message(self, message): record.msg = " ".join([immediate_child_analysis_section, *subchild_analysis_sections, final_message]) logger.handle(record) - def _handle_exception(self, message): + def _handle_exception(self, event): """Raise the exception from the responding service that is serialised in `data`. - :param dict message: + :param dict event: :raise Exception: :return None: """ exception_message = "\n\n".join( ( - message["exception_message"], + event["exception_message"], f"The following traceback was captured from the remote service {self.service_name!r}:", - "".join(message["exception_traceback"]), + "".join(event["exception_traceback"]), ) ) try: - exception_type = EXCEPTIONS_MAPPING[message["exception_type"]] + exception_type = EXCEPTIONS_MAPPING[event["exception_type"]] # Allow unknown exception types to still be raised. except KeyError: - exception_type = type(message["exception_type"], (Exception,), {}) + exception_type = type(event["exception_type"], (Exception,), {}) raise exception_type(exception_message) - def _handle_result(self, message): - """Convert the result to the correct form, deserialising the output manifest if it is present in the message. + def _handle_result(self, event): + """Convert the result to the correct form, deserialising the output manifest if it is present in the event. - :param dict message: + :param dict event: :return dict: """ logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid) - if message.get("output_manifest"): - output_manifest = Manifest.deserialise(message["output_manifest"]) + if event.get("output_manifest"): + output_manifest = Manifest.deserialise(event["output_manifest"]) else: output_manifest = None - return {"output_values": message.get("output_values"), "output_manifest": output_manifest} + return {"output_values": event.get("output_values"), "output_manifest": output_manifest} diff --git a/octue/cloud/events/replayer.py b/octue/cloud/events/replayer.py index 1adf7ebf7..7d977d4d0 100644 --- a/octue/cloud/events/replayer.py +++ b/octue/cloud/events/replayer.py @@ -14,31 +14,31 @@ def __init__( self, receiving_service=None, handle_monitor_message=None, - record_messages=True, + record_events=True, service_name="REMOTE", - message_handlers=None, + event_handlers=None, schema=SERVICE_COMMUNICATION_SCHEMA, ): super().__init__( receiving_service or Service(backend=ServiceBackend(), service_id="local/local:local"), handle_monitor_message=handle_monitor_message, - record_messages=record_messages, + record_events=record_events, service_name=service_name, - message_handlers=message_handlers, + event_handlers=event_handlers, schema=schema, - skip_missing_messages_after=0, + skip_missing_events_after=0, ) def handle_events(self, events): self.question_uuid = events[0]["attributes"]["question_uuid"] - self.waiting_messages = {} - self._previous_message_number = -1 + self.waiting_events = {} + self._previous_event_number = -1 for event in events: self._extract_and_enqueue_event(event) - self._earliest_waiting_message_number = min(self.waiting_messages.keys()) - return self._attempt_to_handle_waiting_messages() + self._earliest_waiting_event_number = min(self.waiting_events.keys()) + return self._attempt_to_handle_waiting_events() def _extract_event_and_attributes(self, event): event["attributes"]["message_number"] = int(event["attributes"]["message_number"]) diff --git a/octue/cloud/pub_sub/event_handler.py b/octue/cloud/pub_sub/event_handler.py index 4bcac4119..7e9da7070 100644 --- a/octue/cloud/pub_sub/event_handler.py +++ b/octue/cloud/pub_sub/event_handler.py @@ -20,17 +20,16 @@ class PubSubEventHandler(AbstractEventHandler): - """A handler for Google Pub/Sub messages received via a pull subscription that ensures messages are handled in the - order they were sent. + """A handler for events received as Google Pub/Sub messages from a pull subscription. :param octue.cloud.pub_sub.subscription.Subscription subscription: the subscription messages are pulled from - :param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the messages + :param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events :param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive - :param bool record_messages: if `True`, record received messages in the `received_messages` attribute + :param bool record_events: if `True`, record received events in the `received_events` attribute :param str service_name: an arbitrary name to refer to the service subscribed to by (used for labelling its remote log messages) - :param dict|None message_handlers: a mapping of message type names to callables that handle each type of message. The handlers should not mutate the messages. - :param dict|str schema: the JSON schema (or URI of one) to validate messages against - :param int|float skip_missing_messages_after: the number of seconds after which to skip any messages if they haven't arrived but subsequent messages have + :param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events. + :param dict|str schema: the JSON schema (or URI of one) to validate events against + :param int|float skip_missing_events_after: the number of seconds after which to skip any events if they haven't arrived but subsequent events have :return None: """ @@ -39,26 +38,26 @@ def __init__( subscription, receiving_service, handle_monitor_message=None, - record_messages=True, + record_events=True, service_name="REMOTE", - message_handlers=None, + event_handlers=None, schema=SERVICE_COMMUNICATION_SCHEMA, - skip_missing_messages_after=10, + skip_missing_events_after=10, ): self.subscription = subscription super().__init__( receiving_service, handle_monitor_message=handle_monitor_message, - record_messages=record_messages, + record_events=record_events, service_name=service_name, - message_handlers=message_handlers, + event_handlers=event_handlers, schema=schema, - skip_missing_messages_after=skip_missing_messages_after, + skip_missing_events_after=skip_missing_events_after, ) self.question_uuid = self.subscription.path.split(".")[-1] - self.waiting_messages = None + self.waiting_events = None self._subscriber = SubscriberClient() self._heartbeat_checker = None self._last_heartbeat = None @@ -89,17 +88,17 @@ def _time_since_last_heartbeat(self): return datetime.now() - self._last_heartbeat def handle_events(self, timeout=60, maximum_heartbeat_interval=300): - """Pull messages and handle them in the order they were sent until a result is returned by a message handler, + """Pull events and handle them in the order they were sent until a result is returned by a event handler, then return that result. :param float|None timeout: how long to wait for an answer before raising a `TimeoutError` :param int|float maximum_heartbeat_interval: the maximum amount of time (in seconds) allowed between child heartbeats before an error is raised - :raise TimeoutError: if the timeout is exceeded before receiving the final message - :return dict: the first result returned by a message handler + :raise TimeoutError: if the timeout is exceeded before receiving the final event + :return dict: the first result returned by a event handler """ self._start_time = time.perf_counter() - self.waiting_messages = {} - self._previous_message_number = -1 + self.waiting_events = {} + self._previous_event_number = -1 self._heartbeat_checker = RepeatingTimer( interval=maximum_heartbeat_interval, @@ -113,8 +112,8 @@ def handle_events(self, timeout=60, maximum_heartbeat_interval=300): while self._alive: pull_timeout = self._check_timeout_and_get_pull_timeout(timeout) - self._pull_and_enqueue_available_messages(timeout=pull_timeout) - result = self._attempt_to_handle_waiting_messages() + self._pull_and_enqueue_available_events(timeout=pull_timeout) + result = self._attempt_to_handle_waiting_events() if result is not None: return result @@ -165,11 +164,11 @@ def _check_timeout_and_get_pull_timeout(self, timeout): return timeout - total_run_time - def _pull_and_enqueue_available_messages(self, timeout): - """Pull as many messages from the subscription as are available and enqueue them in `self.waiting_messages`, + def _pull_and_enqueue_available_events(self, timeout): + """Pull as many events from the subscription as are available and enqueue them in `self.waiting_events`, raising a `TimeoutError` if the timeout is exceeded before succeeding. - :param float|None timeout: how long to wait in seconds for the message before raising a `TimeoutError` + :param float|None timeout: how long to wait in seconds for the event before raising a `TimeoutError` :raise TimeoutError|concurrent.futures.TimeoutError: if the timeout is exceeded :return None: """ @@ -177,7 +176,7 @@ def _pull_and_enqueue_available_messages(self, timeout): attempt = 1 while self._alive: - logger.debug("Pulling messages from Google Pub/Sub: attempt %d.", attempt) + logger.debug("Pulling events from Google Pub/Sub: attempt %d.", attempt) pull_response = self._subscriber.pull( request={"subscription": self.subscription.path, "max_messages": MAX_SIMULTANEOUS_MESSAGES_PULL}, @@ -207,10 +206,10 @@ def _pull_and_enqueue_available_messages(self, timeout): } ) - for message in pull_response.received_messages: - self._extract_and_enqueue_event(message) + for event in pull_response.received_messages: + self._extract_and_enqueue_event(event) - self._earliest_waiting_message_number = min(self.waiting_messages.keys()) + self._earliest_waiting_event_number = min(self.waiting_events.keys()) - def _extract_event_and_attributes(self, message): - return extract_event_and_attributes_from_pub_sub(message.message) + def _extract_event_and_attributes(self, event): + return extract_event_and_attributes_from_pub_sub(event.message) diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index 06be5520f..b4dc1eea8 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -124,7 +124,7 @@ def received_messages(self): :return list(dict)|None: """ if self._event_handler: - return self._event_handler.handled_messages + return self._event_handler.handled_events return None def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow_existing=False, detach=False): @@ -396,7 +396,7 @@ def wait_for_answer( receiving_service=self, handle_monitor_message=handle_monitor_message, service_name=service_name, - record_messages=record_messages, + record_events=record_messages, ) try: diff --git a/tests/cloud/pub_sub/test_message_handler.py b/tests/cloud/pub_sub/test_message_handler.py index e88c3537b..e5ab4f7d4 100644 --- a/tests/cloud/pub_sub/test_message_handler.py +++ b/tests/cloud/pub_sub/test_message_handler.py @@ -43,7 +43,7 @@ def test_timeout(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: message}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: message}, schema={}, ) @@ -58,7 +58,7 @@ def test_in_order_messages_are_handled_in_order(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, ) @@ -78,7 +78,7 @@ def test_in_order_messages_are_handled_in_order(self): self.assertEqual(result, "This is the result.") self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [{"kind": "test"}, {"kind": "test"}, {"kind": "test"}, {"kind": "finish-test"}], ) @@ -90,7 +90,7 @@ def test_out_of_order_messages_are_handled_in_order(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, ) @@ -124,7 +124,7 @@ def test_out_of_order_messages_are_handled_in_order(self): self.assertEqual(result, "This is the result.") self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [ {"kind": "test", "order": 0}, {"kind": "test", "order": 1}, @@ -143,7 +143,7 @@ def test_out_of_order_messages_with_end_message_first_are_handled_in_order(self) message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, ) @@ -177,7 +177,7 @@ def test_out_of_order_messages_with_end_message_first_are_handled_in_order(self) self.assertEqual(result, "This is the result.") self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [ {"kind": "test", "order": 0}, {"kind": "test", "order": 1}, @@ -194,7 +194,7 @@ def test_no_timeout(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, ) @@ -222,7 +222,7 @@ def test_no_timeout(self): self.assertEqual(result, "This is the result.") self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [{"kind": "test", "order": 0}, {"kind": "test", "order": 1}, {"kind": "finish-test", "order": 2}], ) @@ -336,7 +336,7 @@ def test_time_since_missing_message_is_none_if_no_unhandled_missing_messages(sel """ question_uuid, _, mock_subscription = create_mock_topic_and_subscription() message_handler = PubSubEventHandler(subscription=mock_subscription, receiving_service=parent) - self.assertIsNone(message_handler.time_since_missing_message) + self.assertIsNone(message_handler.time_since_missing_event) def test_missing_messages_at_start_can_be_skipped(self): """Test that missing messages at the start of the event stream can be skipped if they aren't received after a @@ -348,9 +348,9 @@ def test_missing_messages_at_start_can_be_skipped(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, - skip_missing_messages_after=0, + skip_missing_events_after=0, ) # Simulate the first two messages not being received. @@ -384,7 +384,7 @@ def test_missing_messages_at_start_can_be_skipped(self): self.assertEqual(result, "This is the result.") self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [ {"kind": "test", "order": 2}, {"kind": "test", "order": 3}, @@ -401,9 +401,9 @@ def test_missing_messages_in_middle_can_skipped(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, - skip_missing_messages_after=0, + skip_missing_events_after=0, ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) @@ -441,7 +441,7 @@ def test_missing_messages_in_middle_can_skipped(self): # Check that all the non-missing messages were handled. self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [ {"kind": "test", "order": 0}, {"kind": "test", "order": 1}, @@ -458,9 +458,9 @@ def test_multiple_blocks_of_missing_messages_in_middle_can_skipped(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, - skip_missing_messages_after=0, + skip_missing_events_after=0, ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) @@ -524,7 +524,7 @@ def test_multiple_blocks_of_missing_messages_in_middle_can_skipped(self): # Check that all the non-missing messages were handled. self.assertEqual( - message_handler.handled_messages, + message_handler.handled_events, [ {"kind": "test", "order": 0}, {"kind": "test", "order": 1}, @@ -545,9 +545,9 @@ def test_all_messages_missing_apart_from_result(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, - skip_missing_messages_after=0, + skip_missing_events_after=0, ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) @@ -565,11 +565,11 @@ def test_all_messages_missing_apart_from_result(self): message_handler.handle_events() # Check that the result message was handled. - self.assertEqual(message_handler.handled_messages, [{"kind": "finish-test", "order": 1000}]) + self.assertEqual(message_handler.handled_events, [{"kind": "finish-test", "order": 1000}]) class TestPullAndEnqueueAvailableMessages(BaseTestCase): - def test_pull_and_enqueue_available_messages(self): + def test_pull_and_enqueue_available_events(self): """Test that pulling and enqueuing a message works.""" question_uuid, mock_topic, _ = create_mock_topic_and_subscription() @@ -582,12 +582,12 @@ def test_pull_and_enqueue_available_messages(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, schema={}, ) message_handler._child_sdk_version = "0.1.3" - message_handler.waiting_messages = {} + message_handler.waiting_events = {} # Enqueue a mock message for a mock subscription to receive. mock_message = {"kind": "test"} @@ -604,9 +604,9 @@ def test_pull_and_enqueue_available_messages(self): ) ] - message_handler._pull_and_enqueue_available_messages(timeout=10) - self.assertEqual(message_handler.waiting_messages, {0: mock_message}) - self.assertEqual(message_handler._earliest_waiting_message_number, 0) + message_handler._pull_and_enqueue_available_events(timeout=10) + self.assertEqual(message_handler.waiting_events, {0: mock_message}) + self.assertEqual(message_handler._earliest_waiting_event_number, 0) def test_timeout_error_raised_if_result_message_not_received_in_time(self): """Test that a timeout error is raised if a result message is not received in time.""" @@ -621,17 +621,17 @@ def test_timeout_error_raised_if_result_message_not_received_in_time(self): message_handler = PubSubEventHandler( subscription=mock_subscription, receiving_service=parent, - message_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, ) message_handler._child_sdk_version = "0.1.3" - message_handler.waiting_messages = {} + message_handler.waiting_events = {} message_handler._start_time = 0 # Create a mock subscription. SUBSCRIPTIONS[mock_subscription.name] = [] with self.assertRaises(TimeoutError): - message_handler._pull_and_enqueue_available_messages(timeout=1e-6) + message_handler._pull_and_enqueue_available_events(timeout=1e-6) - self.assertEqual(message_handler._earliest_waiting_message_number, math.inf) + self.assertEqual(message_handler._earliest_waiting_event_number, math.inf)