Skip to content

Commit

Permalink
REF: Rename is_question to sender_type and change type to enum
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Nov 21, 2023
1 parent 3e6a51b commit c94cccc
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 15 deletions.
2 changes: 1 addition & 1 deletion octue/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def create_push_subscription(
name=pub_sub_sruid,
topic=topic,
project_name=project_name,
filter='attributes.is_question = "1"',
filter='attributes.sender_type = "parent"',
expiration_time=expiration_time,
push_endpoint=push_endpoint,
)
Expand Down
2 changes: 1 addition & 1 deletion octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def ask(
MockMessage(
data=json.dumps(question, cls=OctueJSONEncoder).encode(),
attributes={
"is_question": True,
"sender_type": "parent",
"question_uuid": question_uuid,
"forward_logs": subscribe_to_logs,
"octue_sdk_version": parent_sdk_version,
Expand Down
2 changes: 1 addition & 1 deletion octue/cloud/pub_sub/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def emit(self, record):
"log_record": self._convert_log_record_to_primitives(record),
},
topic=self.topic,
attributes={"question_uuid": self.question_uuid, "is_question": False},
attributes={"question_uuid": self.question_uuid, "sender_type": "child"},
)

except Exception: # noqa
Expand Down
4 changes: 2 additions & 2 deletions octue/cloud/pub_sub/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
def extract_event_and_attributes_from_pub_sub(message):
# Cast attributes to dict to avoid defaultdict behaviour.
attributes = dict(getattr_or_subscribe(message, "attributes"))
is_question = bool(int(attributes["is_question"]))
sender_type = attributes["sender_type"]
question_uuid = attributes["question_uuid"]
message_number = int(attributes["message_number"])
octue_sdk_version = attributes["octue_sdk_version"]
Expand All @@ -35,7 +35,7 @@ def extract_event_and_attributes_from_pub_sub(message):
return (
event,
{
"is_question": is_question,
"sender_type": sender_type,
"question_uuid": question_uuid,
"octue_sdk_version": octue_sdk_version,
"message_number": message_number,
Expand Down
16 changes: 8 additions & 8 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow
name=self._pub_sub_id,
topic=topic,
project_name=self.backend.project_name,
filter='attributes.is_question = "1"',
filter='attributes.sender_type = "parent"',
expiration_time=None,
)

Expand Down Expand Up @@ -252,7 +252,7 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
self._send_message(
message=result,
topic=topic,
attributes={"question_uuid": question_uuid, "is_question": False},
attributes={"question_uuid": question_uuid, "sender_type": "child"},
timeout=timeout,
)

Expand Down Expand Up @@ -333,7 +333,7 @@ def ask(
name=".".join((topic.name, ANSWERS_NAMESPACE, question_uuid)),
topic=topic,
project_name=self.backend.project_name,
filter=f'attributes.question_uuid = "{question_uuid}" AND attributes.is_question = "0"',
filter=f'attributes.question_uuid = "{question_uuid}" AND attributes.sender_type = "child"',
push_endpoint=push_endpoint,
)
answer_subscription.create(allow_existing=False)
Expand All @@ -355,7 +355,7 @@ def ask(
topic=topic,
attributes={
"question_uuid": question_uuid,
"is_question": True,
"sender_type": "parent",
"forward_logs": subscribe_to_logs,
"allow_save_diagnostics_data_on_crash": allow_save_diagnostics_data_on_crash,
},
Expand Down Expand Up @@ -428,7 +428,7 @@ def send_exception(self, topic, question_uuid, timeout=30):
"traceback": exception["traceback"],
},
topic=topic,
attributes={"question_uuid": question_uuid, "is_question": False},
attributes={"question_uuid": question_uuid, "sender_type": "child"},
timeout=timeout,
)

Expand Down Expand Up @@ -480,7 +480,7 @@ def _send_delivery_acknowledgment(self, topic, question_uuid, timeout=30):
},
topic=topic,
timeout=timeout,
attributes={"question_uuid": question_uuid, "is_question": False},
attributes={"question_uuid": question_uuid, "sender_type": "child"},
)

logger.info("%r acknowledged receipt of question.", self)
Expand All @@ -500,7 +500,7 @@ def _send_heartbeat(self, topic, question_uuid, timeout=30):
},
topic=topic,
timeout=timeout,
attributes={"question_uuid": question_uuid, "is_question": False},
attributes={"question_uuid": question_uuid, "sender_type": "child"},
)

logger.debug("Heartbeat sent by %r.", self)
Expand All @@ -521,7 +521,7 @@ def _send_monitor_message(self, data, topic, question_uuid, timeout=30):
},
topic=topic,
timeout=timeout,
attributes={"question_uuid": question_uuid, "is_question": False},
attributes={"question_uuid": question_uuid, "sender_type": "child"},
)

logger.debug("Monitor message sent by %r.", self)
Expand Down
2 changes: 1 addition & 1 deletion octue/cloud/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

logger = logging.getLogger(__name__)

SERVICE_COMMUNICATION_SCHEMA = "https://jsonschema.registry.octue.com/octue/service-communication/0.4.0.json"
SERVICE_COMMUNICATION_SCHEMA = "https://jsonschema.registry.octue.com/octue/service-communication/0.5.0.json"
SERVICE_COMMUNICATION_SCHEMA_INFO_URL = "https://strands.octue.com/octue/service-communication"


Expand Down
2 changes: 1 addition & 1 deletion tests/cloud/pub_sub/test_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def test_pull_and_enqueue_message(self):
MockMessage(
data=json.dumps(mock_message).encode(),
attributes={
"is_question": False,
"sender_type": "child",
"message_number": 0,
"question_uuid": question_uuid,
"octue_sdk_version": "0.50.0",
Expand Down

0 comments on commit c94cccc

Please sign in to comment.