Skip to content

Commit

Permalink
ENH: Make input and output values and manifest optional
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Update all your services to this version.
  • Loading branch information
cortadocodes committed Nov 16, 2023
1 parent 8118bbb commit 250dcd1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
8 changes: 4 additions & 4 deletions octue/cloud/pub_sub/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,9 @@ def _handle_result(self, message):
"""
logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid)

if message["output_manifest"] is None:
output_manifest = None
else:
if message.get("output_manifest"):
output_manifest = Manifest.deserialise(message["output_manifest"])
else:
output_manifest = None

return {"output_values": message["output_values"], "output_manifest": output_manifest}
return {"output_values": message.get("output_values"), "output_manifest": output_manifest}
37 changes: 22 additions & 15 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
:return None:
"""
(
data,
question,
question_uuid,
forward_logs,
parent_sdk_version,
Expand Down Expand Up @@ -224,9 +224,9 @@ def answer(self, question, heartbeat_interval=120, timeout=30):

analysis = self.run_function(
analysis_id=question_uuid,
input_values=data["input_values"],
input_manifest=data["input_manifest"],
children=data.get("children"),
input_values=question.get("input_values"),
input_manifest=question.get("input_manifest"),
children=question.get("children"),
analysis_log_handler=analysis_log_handler,
handle_monitor_message=functools.partial(
self._send_monitor_message,
Expand All @@ -236,17 +236,16 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
allow_save_diagnostics_data_on_crash=allow_save_diagnostics_data_on_crash,
)

if analysis.output_manifest is None:
serialised_output_manifest = None
else:
serialised_output_manifest = analysis.output_manifest.to_primitive()
result = {"type": "result"}

if analysis.output_values is not None:
result["output_values"] = analysis.output_values

if analysis.output_manifest is not None:
result["output_manifest"] = analysis.output_manifest.to_primitive()

self._send_message(
{
"type": "result",
"output_values": analysis.output_values,
"output_manifest": serialised_output_manifest,
},
message=result,
topic=topic,
attributes={"question_uuid": question_uuid, "is_question": False},
timeout=timeout,
Expand Down Expand Up @@ -334,12 +333,20 @@ def ask(
)
answer_subscription.create(allow_existing=False)

question = {"type": "question"}

if input_values is not None:
question["input_values"] = input_values

if input_manifest is not None:
input_manifest.use_signed_urls_for_datasets()
input_manifest = input_manifest.to_primitive()
question["input_manifest"] = input_manifest.to_primitive()

if children is not None:
question["children"] = children

self._send_message(
{"type": "question", "input_values": input_values, "input_manifest": input_manifest, "children": children},
message=question,
topic=topic,
attributes={
"question_uuid": question_uuid,
Expand Down
2 changes: 1 addition & 1 deletion tests/cloud/pub_sub/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ def test_child_messages_can_be_recorded_by_parent(self):

self.assertEqual(
parent.received_messages[4],
{"type": "result", "output_values": "Hello! It worked!", "output_manifest": None},
{"type": "result", "output_values": "Hello! It worked!"},
)

def test_child_exception_message_can_be_recorded_by_parent(self):
Expand Down

0 comments on commit 250dcd1

Please sign in to comment.