From 3ffc7c031fec234bd43e26d8e2e22b9fe811f35a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 1 Feb 2024 11:58:23 -0800 Subject: [PATCH] use ResponseParams proto to parse blob --- .../bigtable/data/_metrics/data_model.py | 25 +++++----- tests/unit/data/_metrics/test_data_model.py | 47 ++++++++++++++----- 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/google/cloud/bigtable/data/_metrics/data_model.py b/google/cloud/bigtable/data/_metrics/data_model.py index fecf041be..043a8e016 100644 --- a/google/cloud/bigtable/data/_metrics/data_model.py +++ b/google/cloud/bigtable/data/_metrics/data_model.py @@ -28,6 +28,8 @@ from grpc import StatusCode import google.cloud.bigtable.data.exceptions as bt_exceptions +from google.cloud.bigtable_v2.types.response_params import ResponseParams +from google.protobuf.message import DecodeError if TYPE_CHECKING: from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler @@ -229,12 +231,16 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None: INVALID_STATE_ERROR.format("add_response_metadata", self.state) ) if self.cluster_id is None or self.zone is None: - # BIGTABLE_METADATA_KEY should give a binary string with cluster_id and zone + # BIGTABLE_METADATA_KEY should give a binary-encoded ResponseParams proto blob = cast(bytes, metadata.get(BIGTABLE_METADATA_KEY)) if blob: parse_result = self._parse_response_metadata_blob(blob) if parse_result is not None: - self.zone, self.cluster_id = parse_result + cluster, zone = parse_result + if cluster: + self.cluster_id = cluster + if zone: + self.zone = zone else: self._handle_error( f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {blob!r}" @@ -251,24 +257,19 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None: @lru_cache(maxsize=32) def _parse_response_metadata_blob(blob: bytes) -> Tuple[str, str] | None: """ - Parse the response metadata blob and return a dictionary of key-value pairs. + Parse the response metadata blob and return a tuple of cluster and zone. Function is cached to avoid parsing the same blob multiple times. Args: - blob: the metadata blob as extracted from the grpc call Returns: - - a tuple of zone and cluster_id, or None if parsing failed + - a tuple of cluster_id and zone, or None if parsing failed """ try: - decoded = "".join( - c if c.isprintable() else " " for c in blob.decode("utf-8") - ) - split_data = decoded.split() - zone = split_data[0] - cluster_id = split_data[1] - return zone, cluster_id - except (AttributeError, IndexError): + proto = ResponseParams.pb().FromString(blob) + return proto.cluster_id, proto.zone_id + except (DecodeError, TypeError): # failed to parse metadata return None diff --git a/tests/unit/data/_metrics/test_data_model.py b/tests/unit/data/_metrics/test_data_model.py index 49a714a31..dfe70e990 100644 --- a/tests/unit/data/_metrics/test_data_model.py +++ b/tests/unit/data/_metrics/test_data_model.py @@ -19,6 +19,7 @@ from google.cloud.bigtable.data._metrics.data_model import OperationState as State from google.cloud.bigtable.data._metrics.data_model import TimeTuple +from google.cloud.bigtable_v2.types import ResponseParams class TestActiveOperationMetric: @@ -247,27 +248,47 @@ def mock_generator(): assert metric.active_attempt.backoff_before_attempt == i @pytest.mark.parametrize( - "start_cluster,start_zone,metadata_field,end_cluster,end_zone", + "start_cluster,start_zone,metadata_proto,end_cluster,end_zone", [ (None, None, None, None, None), ("orig_cluster", "orig_zone", None, "orig_cluster", "orig_zone"), - (None, None, b"zone cluster", "cluster", "zone"), + (None, None, ResponseParams(), None, None), + ( + "orig_cluster", + "orig_zone", + ResponseParams(), + "orig_cluster", + "orig_zone", + ), ( None, None, - b"\n\rtest-cluster\x12\x0cus-central1-b", - "us-central1-b", + ResponseParams(cluster_id="test-cluster", zone_id="us-central1-b"), "test-cluster", + "us-central1-b", + ), + ( + None, + "filled", + ResponseParams(cluster_id="cluster", zone_id="zone"), + "cluster", + "zone", + ), + (None, "filled", ResponseParams(cluster_id="cluster"), "cluster", "filled"), + (None, "filled", ResponseParams(zone_id="zone"), None, "zone"), + ( + "filled", + None, + ResponseParams(cluster_id="cluster", zone_id="zone"), + "cluster", + "zone", ), - ("orig_cluster", "orig_zone", b"new_new", "orig_cluster", "orig_zone"), - (None, None, b"", None, None), - (None, None, b"zone cluster future", "cluster", "zone"), - (None, "filled", b"zone cluster", "cluster", "zone"), - ("filled", None, b"zone cluster", "cluster", "zone"), + ("filled", None, ResponseParams(cluster_id="cluster"), "cluster", None), + ("filled", None, ResponseParams(zone_id="zone"), "filled", "zone"), ], ) def test_add_response_metadata_cbt_header( - self, start_cluster, start_zone, metadata_field, end_cluster, end_zone + self, start_cluster, start_zone, metadata_proto, end_cluster, end_zone ): """ calling add_response_metadata should update fields based on grpc response metadata @@ -283,8 +304,10 @@ def test_add_response_metadata_cbt_header( metric.active_attempt = mock.Mock() metric.active_attempt.gfe_latency = None metadata = grpc.aio.Metadata() - if metadata_field: - metadata["x-goog-ext-425905942-bin"] = metadata_field + if metadata_proto is not None: + metadata["x-goog-ext-425905942-bin"] = ResponseParams.serialize( + metadata_proto + ) metric.add_response_metadata(metadata) assert metric.cluster_id == end_cluster assert metric.zone == end_zone