
Commit

use ResponseParams proto to parse blob
daniel-sanche committed Feb 1, 2024
1 parent f6831bb commit 3ffc7c0
Showing 2 changed files with 48 additions and 24 deletions.
25 changes: 13 additions & 12 deletions google/cloud/bigtable/data/_metrics/data_model.py
@@ -28,6 +28,8 @@
from grpc import StatusCode

import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable_v2.types.response_params import ResponseParams
from google.protobuf.message import DecodeError

if TYPE_CHECKING:
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler
@@ -229,12 +231,16 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
INVALID_STATE_ERROR.format("add_response_metadata", self.state)
)
if self.cluster_id is None or self.zone is None:
# BIGTABLE_METADATA_KEY should give a binary string with cluster_id and zone
# BIGTABLE_METADATA_KEY should give a binary-encoded ResponseParams proto
blob = cast(bytes, metadata.get(BIGTABLE_METADATA_KEY))
if blob:
parse_result = self._parse_response_metadata_blob(blob)
if parse_result is not None:
self.zone, self.cluster_id = parse_result
cluster, zone = parse_result
if cluster:
self.cluster_id = cluster
if zone:
self.zone = zone
else:
self._handle_error(
f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {blob!r}"
@@ -251,24 +257,19 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
@lru_cache(maxsize=32)
def _parse_response_metadata_blob(blob: bytes) -> Tuple[str, str] | None:
"""
Parse the response metadata blob and return a dictionary of key-value pairs.
Parse the response metadata blob and return a tuple of cluster and zone.
Function is cached to avoid parsing the same blob multiple times.
Args:
- blob: the metadata blob as extracted from the grpc call
Returns:
- a tuple of zone and cluster_id, or None if parsing failed
- a tuple of cluster_id and zone, or None if parsing failed
"""
try:
decoded = "".join(
c if c.isprintable() else " " for c in blob.decode("utf-8")
)
split_data = decoded.split()
zone = split_data[0]
cluster_id = split_data[1]
return zone, cluster_id
except (AttributeError, IndexError):
proto = ResponseParams.pb().FromString(blob)
return proto.cluster_id, proto.zone_id
except (DecodeError, TypeError):
# failed to parse metadata
return None

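For context, a minimal round-trip sketch of the new parsing path (not part of the diff; the cluster and zone values here are hypothetical). The server attaches a serialized ResponseParams proto under the x-goog-ext-425905942-bin response-metadata key, and _parse_response_metadata_blob now decodes it via the underlying protobuf class:

# Sketch only: serialize a ResponseParams message to bytes, then parse it back
# the same way the new _parse_response_metadata_blob does.
from google.cloud.bigtable_v2.types.response_params import ResponseParams

blob = ResponseParams.serialize(
    ResponseParams(cluster_id="test-cluster", zone_id="us-central1-b")
)
proto = ResponseParams.pb().FromString(blob)
assert (proto.cluster_id, proto.zone_id) == ("test-cluster", "us-central1-b")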
47 changes: 35 additions & 12 deletions tests/unit/data/_metrics/test_data_model.py
@@ -19,6 +19,7 @@

from google.cloud.bigtable.data._metrics.data_model import OperationState as State
from google.cloud.bigtable.data._metrics.data_model import TimeTuple
from google.cloud.bigtable_v2.types import ResponseParams


class TestActiveOperationMetric:
@@ -247,27 +248,47 @@ def mock_generator():
assert metric.active_attempt.backoff_before_attempt == i

@pytest.mark.parametrize(
"start_cluster,start_zone,metadata_field,end_cluster,end_zone",
"start_cluster,start_zone,metadata_proto,end_cluster,end_zone",
[
(None, None, None, None, None),
("orig_cluster", "orig_zone", None, "orig_cluster", "orig_zone"),
(None, None, b"zone cluster", "cluster", "zone"),
(None, None, ResponseParams(), None, None),
(
"orig_cluster",
"orig_zone",
ResponseParams(),
"orig_cluster",
"orig_zone",
),
(
None,
None,
b"\n\rtest-cluster\x12\x0cus-central1-b",
"us-central1-b",
ResponseParams(cluster_id="test-cluster", zone_id="us-central1-b"),
"test-cluster",
"us-central1-b",
),
(
None,
"filled",
ResponseParams(cluster_id="cluster", zone_id="zone"),
"cluster",
"zone",
),
(None, "filled", ResponseParams(cluster_id="cluster"), "cluster", "filled"),
(None, "filled", ResponseParams(zone_id="zone"), None, "zone"),
(
"filled",
None,
ResponseParams(cluster_id="cluster", zone_id="zone"),
"cluster",
"zone",
),
("orig_cluster", "orig_zone", b"new_new", "orig_cluster", "orig_zone"),
(None, None, b"", None, None),
(None, None, b"zone cluster future", "cluster", "zone"),
(None, "filled", b"zone cluster", "cluster", "zone"),
("filled", None, b"zone cluster", "cluster", "zone"),
("filled", None, ResponseParams(cluster_id="cluster"), "cluster", None),
("filled", None, ResponseParams(zone_id="zone"), "filled", "zone"),
],
)
def test_add_response_metadata_cbt_header(
self, start_cluster, start_zone, metadata_field, end_cluster, end_zone
self, start_cluster, start_zone, metadata_proto, end_cluster, end_zone
):
"""
calling add_response_metadata should update fields based on grpc response metadata
@@ -283,8 +304,10 @@ def test_add_response_metadata_cbt_header(
metric.active_attempt = mock.Mock()
metric.active_attempt.gfe_latency = None
metadata = grpc.aio.Metadata()
if metadata_field:
metadata["x-goog-ext-425905942-bin"] = metadata_field
if metadata_proto is not None:
metadata["x-goog-ext-425905942-bin"] = ResponseParams.serialize(
metadata_proto
)
metric.add_response_metadata(metadata)
assert metric.cluster_id == end_cluster
assert metric.zone == end_zone
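One behavior the new parametrized cases pin down (sketch below, assuming standard proto3 semantics): a string field that was never set deserializes to an empty string, which is why add_response_metadata only overwrites cluster_id or zone when the parsed value is non-empty, so pre-filled values survive a partial ResponseParams:

# Sketch: an unset zone_id comes back as "" (falsy), so an existing zone is kept.
from google.cloud.bigtable_v2.types import ResponseParams

blob = ResponseParams.serialize(ResponseParams(cluster_id="cluster"))  # zone_id not set
proto = ResponseParams.pb().FromString(blob)
assert proto.cluster_id == "cluster"
assert proto.zone_id == ""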
