diff --git a/octue/cli.py b/octue/cli.py index b2b2f52bd..b65476f36 100644 --- a/octue/cli.py +++ b/octue/cli.py @@ -409,7 +409,7 @@ def create_push_subscription( name=pub_sub_sruid, topic=topic, project_name=project_name, - filter='attributes.is_question = "1"', + filter='attributes.sender_type = "parent"', expiration_time=expiration_time, push_endpoint=push_endpoint, ) diff --git a/octue/cloud/emulators/_pub_sub.py b/octue/cloud/emulators/_pub_sub.py index 961839ae1..dad73eff1 100644 --- a/octue/cloud/emulators/_pub_sub.py +++ b/octue/cloud/emulators/_pub_sub.py @@ -358,7 +358,7 @@ def ask( MockMessage( data=json.dumps(question, cls=OctueJSONEncoder).encode(), attributes={ - "is_question": True, + "sender_type": "parent", "question_uuid": question_uuid, "forward_logs": subscribe_to_logs, "octue_sdk_version": parent_sdk_version, diff --git a/octue/cloud/pub_sub/logging.py b/octue/cloud/pub_sub/logging.py index ccc4e2bf2..b120373c4 100644 --- a/octue/cloud/pub_sub/logging.py +++ b/octue/cloud/pub_sub/logging.py @@ -35,7 +35,7 @@ def emit(self, record): "log_record": self._convert_log_record_to_primitives(record), }, topic=self.topic, - attributes={"question_uuid": self.question_uuid, "is_question": False}, + attributes={"question_uuid": self.question_uuid, "sender_type": "child"}, ) except Exception: # noqa diff --git a/octue/cloud/pub_sub/messages.py b/octue/cloud/pub_sub/messages.py index 183720e43..201e310c1 100644 --- a/octue/cloud/pub_sub/messages.py +++ b/octue/cloud/pub_sub/messages.py @@ -8,7 +8,7 @@ def extract_event_and_attributes_from_pub_sub(message): # Cast attributes to dict to avoid defaultdict behaviour. attributes = dict(getattr_or_subscribe(message, "attributes")) - is_question = bool(int(attributes["is_question"])) + sender_type = attributes["sender_type"] question_uuid = attributes["question_uuid"] message_number = int(attributes["message_number"]) octue_sdk_version = attributes["octue_sdk_version"] @@ -35,7 +35,7 @@ def extract_event_and_attributes_from_pub_sub(message): return ( event, { - "is_question": is_question, + "sender_type": sender_type, "question_uuid": question_uuid, "octue_sdk_version": octue_sdk_version, "message_number": message_number, diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index 677a13748..69fd47e8b 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -136,7 +136,7 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow name=self._pub_sub_id, topic=topic, project_name=self.backend.project_name, - filter='attributes.is_question = "1"', + filter='attributes.sender_type = "parent"', expiration_time=None, ) @@ -252,7 +252,7 @@ def answer(self, question, heartbeat_interval=120, timeout=30): self._send_message( message=result, topic=topic, - attributes={"question_uuid": question_uuid, "is_question": False}, + attributes={"question_uuid": question_uuid, "sender_type": "child"}, timeout=timeout, ) @@ -333,7 +333,7 @@ def ask( name=".".join((topic.name, ANSWERS_NAMESPACE, question_uuid)), topic=topic, project_name=self.backend.project_name, - filter=f'attributes.question_uuid = "{question_uuid}" AND attributes.is_question = "0"', + filter=f'attributes.question_uuid = "{question_uuid}" AND attributes.sender_type = "child"', push_endpoint=push_endpoint, ) answer_subscription.create(allow_existing=False) @@ -355,7 +355,7 @@ def ask( topic=topic, attributes={ "question_uuid": question_uuid, - "is_question": True, + "sender_type": "parent", "forward_logs": subscribe_to_logs, "allow_save_diagnostics_data_on_crash": allow_save_diagnostics_data_on_crash, }, @@ -428,7 +428,7 @@ def send_exception(self, topic, question_uuid, timeout=30): "traceback": exception["traceback"], }, topic=topic, - attributes={"question_uuid": question_uuid, "is_question": False}, + attributes={"question_uuid": question_uuid, "sender_type": "child"}, timeout=timeout, ) @@ -480,7 +480,7 @@ def _send_delivery_acknowledgment(self, topic, question_uuid, timeout=30): }, topic=topic, timeout=timeout, - attributes={"question_uuid": question_uuid, "is_question": False}, + attributes={"question_uuid": question_uuid, "sender_type": "child"}, ) logger.info("%r acknowledged receipt of question.", self) @@ -500,7 +500,7 @@ def _send_heartbeat(self, topic, question_uuid, timeout=30): }, topic=topic, timeout=timeout, - attributes={"question_uuid": question_uuid, "is_question": False}, + attributes={"question_uuid": question_uuid, "sender_type": "child"}, ) logger.debug("Heartbeat sent by %r.", self) @@ -521,7 +521,7 @@ def _send_monitor_message(self, data, topic, question_uuid, timeout=30): }, topic=topic, timeout=timeout, - attributes={"question_uuid": question_uuid, "is_question": False}, + attributes={"question_uuid": question_uuid, "sender_type": "child"}, ) logger.debug("Monitor message sent by %r.", self) diff --git a/octue/cloud/validation.py b/octue/cloud/validation.py index f0b510d0f..84e7e4088 100644 --- a/octue/cloud/validation.py +++ b/octue/cloud/validation.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) -SERVICE_COMMUNICATION_SCHEMA = "https://jsonschema.registry.octue.com/octue/service-communication/0.4.0.json" +SERVICE_COMMUNICATION_SCHEMA = "https://jsonschema.registry.octue.com/octue/service-communication/0.5.0.json" SERVICE_COMMUNICATION_SCHEMA_INFO_URL = "https://strands.octue.com/octue/service-communication" diff --git a/tests/cloud/pub_sub/test_message_handler.py b/tests/cloud/pub_sub/test_message_handler.py index 286be75ee..c5fb5a002 100644 --- a/tests/cloud/pub_sub/test_message_handler.py +++ b/tests/cloud/pub_sub/test_message_handler.py @@ -418,7 +418,7 @@ def test_pull_and_enqueue_message(self): MockMessage( data=json.dumps(mock_message).encode(), attributes={ - "is_question": False, + "sender_type": "child", "message_number": 0, "question_uuid": question_uuid, "octue_sdk_version": "0.50.0",