From 250dcd12c3fb5f5b6691a0d982ce0d5cc022cda4 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Wed, 15 Nov 2023 17:24:00 +0000 Subject: [PATCH] ENH: Make input and output values and manifest optional BREAKING CHANGE: Update all your services to this version. --- octue/cloud/pub_sub/message_handler.py | 8 +++--- octue/cloud/pub_sub/service.py | 37 +++++++++++++++----------- tests/cloud/pub_sub/test_service.py | 2 +- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/octue/cloud/pub_sub/message_handler.py b/octue/cloud/pub_sub/message_handler.py index 60124e64d..98114c3a8 100644 --- a/octue/cloud/pub_sub/message_handler.py +++ b/octue/cloud/pub_sub/message_handler.py @@ -414,9 +414,9 @@ def _handle_result(self, message): """ logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid) - if message["output_manifest"] is None: - output_manifest = None - else: + if message.get("output_manifest"): output_manifest = Manifest.deserialise(message["output_manifest"]) + else: + output_manifest = None - return {"output_values": message["output_values"], "output_manifest": output_manifest} + return {"output_values": message.get("output_values"), "output_manifest": output_manifest} diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index accebf880..c8612e721 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -191,7 +191,7 @@ def answer(self, question, heartbeat_interval=120, timeout=30): :return None: """ ( - data, + question, question_uuid, forward_logs, parent_sdk_version, @@ -224,9 +224,9 @@ def answer(self, question, heartbeat_interval=120, timeout=30): analysis = self.run_function( analysis_id=question_uuid, - input_values=data["input_values"], - input_manifest=data["input_manifest"], - children=data.get("children"), + input_values=question.get("input_values"), + input_manifest=question.get("input_manifest"), + children=question.get("children"), analysis_log_handler=analysis_log_handler, handle_monitor_message=functools.partial( self._send_monitor_message, @@ -236,17 +236,16 @@ def answer(self, question, heartbeat_interval=120, timeout=30): allow_save_diagnostics_data_on_crash=allow_save_diagnostics_data_on_crash, ) - if analysis.output_manifest is None: - serialised_output_manifest = None - else: - serialised_output_manifest = analysis.output_manifest.to_primitive() + result = {"type": "result"} + + if analysis.output_values is not None: + result["output_values"] = analysis.output_values + + if analysis.output_manifest is not None: + result["output_manifest"] = analysis.output_manifest.to_primitive() self._send_message( - { - "type": "result", - "output_values": analysis.output_values, - "output_manifest": serialised_output_manifest, - }, + message=result, topic=topic, attributes={"question_uuid": question_uuid, "is_question": False}, timeout=timeout, @@ -334,12 +333,20 @@ def ask( ) answer_subscription.create(allow_existing=False) + question = {"type": "question"} + + if input_values is not None: + question["input_values"] = input_values + if input_manifest is not None: input_manifest.use_signed_urls_for_datasets() - input_manifest = input_manifest.to_primitive() + question["input_manifest"] = input_manifest.to_primitive() + + if children is not None: + question["children"] = children self._send_message( - {"type": "question", "input_values": input_values, "input_manifest": input_manifest, "children": children}, + message=question, topic=topic, attributes={ "question_uuid": question_uuid, diff --git a/tests/cloud/pub_sub/test_service.py b/tests/cloud/pub_sub/test_service.py index 56e29c622..215337570 100644 --- a/tests/cloud/pub_sub/test_service.py +++ b/tests/cloud/pub_sub/test_service.py @@ -722,7 +722,7 @@ def test_child_messages_can_be_recorded_by_parent(self): self.assertEqual( parent.received_messages[4], - {"type": "result", "output_values": "Hello! It worked!", "output_manifest": None}, + {"type": "result", "output_values": "Hello! It worked!"}, ) def test_child_exception_message_can_be_recorded_by_parent(self):