Skip to content

Commit

Permalink
ENH: Improve message validation methods
Browse files Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Nov 20, 2023
1 parent 5695f40 commit 29b2b5c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 22 deletions.
17 changes: 9 additions & 8 deletions octue/cloud/pub_sub/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from google.cloud.pubsub_v1 import SubscriberClient

from octue.cloud import EXCEPTIONS_MAPPING
from octue.cloud.pub_sub.validation import SERVICE_COMMUNICATION_SCHEMA, is_message_valid, log_invalid_message
from octue.cloud.pub_sub.validation import SERVICE_COMMUNICATION_SCHEMA, is_message_valid
from octue.definitions import GOOGLE_COMPUTE_PROVIDERS
from octue.log_handlers import COLOUR_PALETTE
from octue.resources.manifest import Manifest
Expand Down Expand Up @@ -235,13 +235,14 @@ def _pull_and_enqueue_message(self, timeout):

message = json.loads(answer.message.data.decode(), cls=OctueJSONDecoder)

if not is_message_valid(message=message, attributes=answer.message.attributes, schema=self.message_schema):
log_invalid_message(
message=message,
receiving_service=self.receiving_service,
parent_sdk_version=importlib.metadata.version("octue"),
child_sdk_version=self._child_sdk_version,
)
if not is_message_valid(
message=message,
attributes=dict(answer.message.attributes),
receiving_service=self.receiving_service,
parent_sdk_version=importlib.metadata.version("octue"),
child_sdk_version=self._child_sdk_version,
schema=self.message_schema,
):
return

message_number = int(message["message_number"])
Expand Down
16 changes: 6 additions & 10 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from octue.cloud.pub_sub import Subscription, Topic
from octue.cloud.pub_sub.logging import GooglePubSubHandler
from octue.cloud.pub_sub.message_handler import OrderedMessageHandler
from octue.cloud.pub_sub.validation import SERVICE_COMMUNICATION_SCHEMA, is_message_valid, log_invalid_message
from octue.cloud.pub_sub.validation import SERVICE_COMMUNICATION_SCHEMA, raise_if_message_is_invalid
from octue.cloud.service_id import (
convert_service_id_to_pub_sub_form,
create_sruid,
Expand Down Expand Up @@ -554,23 +554,19 @@ def _parse_question(self, question):
except AttributeError:
allow_save_diagnostics_data_on_crash = False

if not is_message_valid(
raise_if_message_is_invalid(
message=data,
attributes={
"question_uuid": question_uuid,
"forward_logs": forward_logs,
"parent_sdk_version": parent_sdk_version,
"allow_save_diagnostics_data_on_crash": allow_save_diagnostics_data_on_crash,
},
receiving_service=self,
parent_sdk_version=parent_sdk_version,
child_sdk_version=importlib.metadata.version("octue"),
schema={"$ref": SERVICE_COMMUNICATION_SCHEMA},
):
log_invalid_message(
message=data,
receiving_service=self,
parent_sdk_version=parent_sdk_version,
child_sdk_version=importlib.metadata.version("octue"),
)
raise jsonschema.ValidationError
)

logger.info("%r parsed the question successfully.", self)
return data, question_uuid, forward_logs, parent_sdk_version, allow_save_diagnostics_data_on_crash
57 changes: 53 additions & 4 deletions octue/cloud/pub_sub/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,67 @@
SERVICE_COMMUNICATION_SCHEMA_INFO_URL = "https://strands.octue.com/octue/service-communication"


def is_message_valid(message, attributes, schema=None):
if schema is None:
schema = {"$ref": SERVICE_COMMUNICATION_SCHEMA}
def is_message_valid(message, attributes, receiving_service, parent_sdk_version, child_sdk_version, schema=None):
"""Check if the message or its attributes are valid according to the schema.
:param dict message: the message to validate
:param dict attributes: the attributes of the message to validate
:param octue.cloud.pub_sub.service.Service receiving_service: the service that received the message and is validating it
:param str parent_sdk_version: the semantic version of Octue SDK running the parent
:param str child_sdk_version: the semantic version of Octue SDK running the child
:param dict|None schema: the schema to validate the message and its attributes against; if `None`, this defaults to the service communication schema used in this version of Octue SDK
:return bool: `True` if the message and its attributes are valid
"""
try:
jsonschema.validate({"event": message, "attributes": dict(attributes)}, schema)
raise_if_message_is_invalid(
message,
attributes,
receiving_service,
parent_sdk_version,
child_sdk_version,
schema=schema,
)
except jsonschema.ValidationError:
return False

return True


def raise_if_message_is_invalid(
message,
attributes,
receiving_service,
parent_sdk_version,
child_sdk_version,
schema=None,
):
"""Raise an error if the message or its attributes aren't valid according to the schema.
:param dict message: the message to validate
:param dict attributes: the attributes of the message to validate
:param octue.cloud.pub_sub.service.Service receiving_service: the service that received the message and is validating it
:param str parent_sdk_version: the semantic version of Octue SDK running the parent
:param str child_sdk_version: the semantic version of Octue SDK running the child
:param dict|None schema: the schema to validate the message and its attributes against; if `None`, this defaults to the service communication schema used in this version of Octue SDK
:raise jsonschema.ValidationError: if the message or its attributes are invalid
:return None:
"""
if schema is None:
schema = {"$ref": SERVICE_COMMUNICATION_SCHEMA}

try:
jsonschema.validate({"event": message, "attributes": dict(attributes)}, schema)
except jsonschema.ValidationError as error:
log_invalid_message(
message=message,
receiving_service=receiving_service,
parent_sdk_version=parent_sdk_version,
child_sdk_version=child_sdk_version,
)

raise error


def log_invalid_message(message, receiving_service, parent_sdk_version, child_sdk_version):
"""Log an invalid message and issue a warning if the parent and child SDK versions are incompatible.
Expand Down

0 comments on commit 29b2b5c

Please sign in to comment.