diff --git a/google/cloud/bigtable/data/_metrics/data_model.py b/google/cloud/bigtable/data/_metrics/data_model.py
index 079e8b6d7..0eab5a2b3 100644
--- a/google/cloud/bigtable/data/_metrics/data_model.py
+++ b/google/cloud/bigtable/data/_metrics/data_model.py
@@ -41,9 +41,7 @@
 SERVER_TIMING_REGEX = re.compile(r"gfet4t7; dur=(\d+)")
 
-OPERATION_COMPLETE_ERROR = "Operation already completed"
-NO_ATTEMPT_ERROR = "No active attempt"
-HAS_ATTEMPT_ERROR = "Attempt already in progress"
+INVALID_STATE_ERROR = "Invalid state for {}: {}"
 
 
 class OperationType(Enum):
@@ -57,6 +55,14 @@ class OperationType(Enum):
     READ_MODIFY_WRITE = "Bigtable.ReadModifyWriteRow"
 
 
+class OperationState(Enum):
+    """Enum for the state of the active operation."""
+    CREATED = 0
+    ACTIVE_ATTEMPT = 1
+    BETWEEN_ATTEMPTS = 2
+    COMPLETED = 3
+
+
 @dataclass(frozen=True)
 class CompletedAttemptMetric:
     """
@@ -90,7 +96,10 @@ class CompletedOperationMetric:
 @dataclass
 class ActiveAttemptMetric:
     start_time: float = field(default_factory=time.monotonic)
+    # the time it takes to receive the first response from the server
+    # currently only tracked for ReadRows
     first_response_latency: float | None = None
+    # the time taken by the backend, taken from the server-timing response header
     gfe_latency: float | None = None
 
 
@@ -108,9 +117,21 @@ class ActiveOperationMetric:
     cluster_id: str | None = None
     zone: str | None = None
     completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
-    was_completed: bool = False
     handlers: list[MetricsHandler] = field(default_factory=list)
     is_streaming: bool = False  # only True for read_rows operations
+    was_completed: bool = False
+
+    @property
+    def state(self) -> OperationState:
+        if self.was_completed:
+            return OperationState.COMPLETED
+        elif self.active_attempt is None:
+            if self.completed_attempts:
+                return OperationState.BETWEEN_ATTEMPTS
+            else:
+                return OperationState.CREATED
+        else:
+            return OperationState.ACTIVE_ATTEMPT
 
     def start(self) -> None:
         """
@@ -120,10 +141,8 @@ def start(self) -> None:
         If the operation was completed or has active attempts, will raise an
         exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
         """
-        if self.was_completed:
-            return self._handle_error(OPERATION_COMPLETE_ERROR)
-        if self.completed_attempts or self.active_attempt:
-            return self._handle_error(HAS_ATTEMPT_ERROR)
+        if self.state != OperationState.CREATED:
+            return self._handle_error(INVALID_STATE_ERROR.format("start", self.state))
         self.start_time = time.monotonic()
 
     def start_attempt(self) -> None:
         """
@@ -133,10 +152,8 @@ def start_attempt(self) -> None:
         If the operation was completed or there is already an active attempt,
         will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
""" - if self.was_completed: - return self._handle_error(OPERATION_COMPLETE_ERROR) - if self.active_attempt is not None: - return self._handle_error(HAS_ATTEMPT_ERROR) + if self.state != OperationState.BETWEEN_ATTEMPTS and self.state != OperationState.CREATED: + return self._handle_error(INVALID_STATE_ERROR.format("start_attempt", self.state)) self.active_attempt = ActiveAttemptMetric() @@ -152,25 +169,24 @@ def add_call_metadata(self, metadata: dict[str, bytes | str]) -> None: Args: - metadata: the metadata as extracted from the grpc call """ - if self.was_completed: - return self._handle_error(OPERATION_COMPLETE_ERROR) - if self.active_attempt is None: - return self._handle_error(NO_ATTEMPT_ERROR) + if self.state != OperationState.ACTIVE_ATTEMPT: + return self._handle_error(INVALID_STATE_ERROR.format("add_call_metadata", self.state)) if self.cluster_id is None or self.zone is None: bigtable_metadata = metadata.get(BIGTABLE_METADATA_KEY) - if bigtable_metadata and isinstance(bigtable_metadata, bytes): - decoded = "".join( - c if c.isprintable() else " " - for c in bigtable_metadata.decode("utf-8") - ) - cluster_id, zone = decoded.split() - if cluster_id: - self.cluster_id = cluster_id - if zone: - self.zone = zone + if bigtable_metadata: + try: + decoded = "".join( + c if c.isprintable() else " " + for c in bigtable_metadata.decode("utf-8") + ) + split_data = decoded.split() + self.cluster_id = split_data[0] + self.zone = split_data[1] + except (AttributeError, IndexError): + self._handle_error(f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {bigtable_metadata}") timing_header = metadata.get(SERVER_TIMING_METADATA_KEY) - if timing_header and isinstance(timing_header, str): + if timing_header: timing_data = SERVER_TIMING_REGEX.match(timing_header) if timing_data: # convert from milliseconds to seconds @@ -185,10 +201,8 @@ def attempt_first_response(self) -> None: active attempt already has a first response time, will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS. """ - if self.was_completed: - return self._handle_error(OPERATION_COMPLETE_ERROR) - elif self.active_attempt is None: - return self._handle_error(NO_ATTEMPT_ERROR) + if self.state != OperationState.ACTIVE_ATTEMPT: + return self._handle_error(INVALID_STATE_ERROR.format("attempt_first_response", self.state)) elif self.active_attempt.first_response_latency is not None: return self._handle_error("Attempt already received first response") self.active_attempt.first_response_latency = ( @@ -205,10 +219,8 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None: Args: - status: The status of the attempt. """ - if self.was_completed: - return self._handle_error(OPERATION_COMPLETE_ERROR) - if self.active_attempt is None: - return self._handle_error(NO_ATTEMPT_ERROR) + if self.state != OperationState.ACTIVE_ATTEMPT: + return self._handle_error(INVALID_STATE_ERROR.format("end_attempt_with_status", self.state)) new_attempt = CompletedAttemptMetric( start_time=self.active_attempt.start_time, @@ -235,9 +247,9 @@ def end_with_status(self, status: StatusCode | Exception) -> None: Args: - status: The status of the operation. 
""" - if self.was_completed: - return self._handle_error(OPERATION_COMPLETE_ERROR) - if self.active_attempt is not None: + if self.state == OperationState.COMPLETED: + return self._handle_error(INVALID_STATE_ERROR.format("end_with_status", self.state)) + elif self.state == OperationState.ACTIVE_ATTEMPT: self.end_attempt_with_status(status) self.was_completed = True finalized = CompletedOperationMetric( diff --git a/tests/unit/data/_metrics/test_data_model.py b/tests/unit/data/_metrics/test_data_model.py index 0d00855d3..a41d6ba76 100644 --- a/tests/unit/data/_metrics/test_data_model.py +++ b/tests/unit/data/_metrics/test_data_model.py @@ -17,6 +17,9 @@ import mock from uuid import UUID +from google.cloud.bigtable.data._metrics.data_model import OperationType +from google.cloud.bigtable.data._metrics.data_model import OperationState as State + class TestActiveOperationMetric: @@ -79,65 +82,76 @@ def test_ctor_explicit(self): assert metric.handlers == expected_handlers assert metric.is_streaming == expected_is_streaming - @pytest.mark.parametrize("method,args", [ - ("start", ()), - ("start_attempt", ()), - ("add_call_metadata", ({},)), - ("attempt_first_response", ()), - ("end_attempt_with_status", (mock.Mock(),)), - ("end_with_status", (mock.Mock(),)), - ("end_with_success", ()), - ]) - def test_error_completed_operation(self, method, args): + def test_state_machine_w_methods(self): """ - calling any method on a completed operation should call _handle_error - to log or raise an error + Exercise the state machine by calling methods to move between states """ - cls = type(self._make_one(mock.Mock())) - with mock.patch.object(cls, "_handle_error") as mock_handle_error: - mock_handle_error.return_value = None - metric = self._make_one(mock.Mock(), was_completed=True) - return_obj = getattr(metric, method)(*args) - assert return_obj is None - assert mock_handle_error.call_count == 1 - assert mock_handle_error.call_args[0][0] == "Operation already completed" + metric = self._make_one(mock.Mock()) + assert metric.state == State.CREATED + metric.start() + assert metric.state == State.CREATED + metric.start_attempt() + assert metric.state == State.ACTIVE_ATTEMPT + metric.end_attempt_with_status(Exception()) + assert metric.state == State.BETWEEN_ATTEMPTS + metric.start_attempt() + assert metric.state == State.ACTIVE_ATTEMPT + metric.end_with_success() + assert metric.state == State.COMPLETED - @pytest.mark.parametrize("method,args", [ - ("add_call_metadata", ({},)), - ("attempt_first_response", ()), - ("end_attempt_with_status", (mock.Mock(),)), - ]) - def test_error_no_active_attempt(self, method, args): + def test_state_machine_w_state(self): """ - If a method on an attempt is called with no active attempt, call _handle_error - to log or raise an error + Exercise state machine by directly manupulating state variables + + relevant variables are: active_attempt, completed_attempts, was_completed """ - cls = type(self._make_one(mock.Mock())) - with mock.patch.object(cls, "_handle_error") as mock_handle_error: - mock_handle_error.return_value = None - metric = self._make_one(mock.Mock()) - return_obj = getattr(metric, method)(*args) - assert return_obj is None - assert mock_handle_error.call_count == 1 - assert mock_handle_error.call_args[0][0] == "No active attempt" + metric = self._make_one(mock.Mock()) + for was_completed_value in [False, True]: + metric.was_completed = was_completed_value + for active_operation_value in [None, mock.Mock()]: + metric.active_attempt = active_operation_value + for 
completed_attempts_value in [[], [mock.Mock()]]: + metric.completed_attempts = completed_attempts_value + if was_completed_value: + assert metric.state == State.COMPLETED + elif active_operation_value is not None: + assert metric.state == State.ACTIVE_ATTEMPT + elif completed_attempts_value: + assert metric.state == State.BETWEEN_ATTEMPTS + else: + assert metric.state == State.CREATED - @pytest.mark.parametrize("method,args", [ - ("start", ({},)), - ("start_attempt", ()), - ]) - def test_error_has_active_attempt(self, method, args): + + @pytest.mark.parametrize("method,args,valid_states,error_method_name", [ + ("start", (), (State.CREATED,), None), + ("start_attempt", (), (State.CREATED, State.BETWEEN_ATTEMPTS), None), + ("add_call_metadata", ({},), (State.ACTIVE_ATTEMPT,), None), + ("attempt_first_response", (), (State.ACTIVE_ATTEMPT,), None), + ("end_attempt_with_status", (mock.Mock(),), (State.ACTIVE_ATTEMPT,), None), + ("end_with_status", (mock.Mock(),), (State.CREATED, State.ACTIVE_ATTEMPT,State.BETWEEN_ATTEMPTS,), None), + ("end_with_success", (), (State.CREATED, State.ACTIVE_ATTEMPT,State.BETWEEN_ATTEMPTS,), "end_with_status"), + ], ids=lambda x: x if isinstance(x, str) else "") + def test_error_invalid_states(self, method, args, valid_states, error_method_name): """ - If a method starting attempt is called with an active attempt, call _handle_error - to log or raise an error + each method only works for certain states. Make sure _handle_error is called for invalid states """ cls = type(self._make_one(mock.Mock())) - with mock.patch.object(cls, "_handle_error") as mock_handle_error: - mock_handle_error.return_value = None - metric = self._make_one(mock.Mock(), active_attempt=mock.Mock()) - return_obj = getattr(metric, method)(*args) - assert return_obj is None - assert mock_handle_error.call_count == 1 - assert mock_handle_error.call_args[0][0] == "Attempt already in progress" + invalid_states = set(State) - set(valid_states) + error_method_name = error_method_name or method + for state in invalid_states: + with mock.patch.object(cls, "_handle_error") as mock_handle_error: + mock_handle_error.return_value = None + metric = self._make_one(mock.Mock()) + if state == State.ACTIVE_ATTEMPT: + metric.active_attempt = mock.Mock() + elif state == State.BETWEEN_ATTEMPTS: + metric.completed_attempts.append(mock.Mock()) + elif state == State.COMPLETED: + metric.was_completed = True + return_obj = getattr(metric, method)(*args) + assert return_obj is None + assert mock_handle_error.call_count == 1 + assert mock_handle_error.call_args[0][0] == f"Invalid state for {error_method_name}: {state}" def test_start(self): """ @@ -149,4 +163,123 @@ def test_start(self): metric.start() assert metric.start_time != orig_time assert metric.start_time - time.monotonic() < 0.1 + # should remain in CREATED state after completing + assert metric.state == State.CREATED + + def test_start_attempt(self): + """ + calling start_attempt should create a new emptu atempt metric + """ + from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric + metric = self._make_one(mock.Mock()) + assert metric.active_attempt is None + metric.start_attempt() + assert isinstance(metric.active_attempt, ActiveAttemptMetric) + # make sure it was initialized with the correct values + assert time.monotonic() - metric.active_attempt.start_time < 0.1 + assert metric.active_attempt.first_response_latency is None + assert metric.active_attempt.gfe_latency is None + # should be in ACTIVE_ATTEMPT state after completing + 
+        assert metric.state == State.ACTIVE_ATTEMPT
+
+
+    @pytest.mark.parametrize("start_cluster,start_zone,metadata_field,end_cluster,end_zone", [
+        (None, None, None, None, None),
+        ("orig_cluster", "orig_zone", None, "orig_cluster", "orig_zone"),
+        (None, None, b"cluster zone", "cluster", "zone"),
+        (None, None, b'\n\rus-central1-b\x12\x0ctest-cluster', "us-central1-b", "test-cluster"),
+        ("orig_cluster", "orig_zone", b"new_new", "orig_cluster", "orig_zone"),
+        (None, None, b"", None, None),
+        (None, None, b"cluster zone future", "cluster", "zone"),
+        (None, "filled", b"cluster zone", "cluster", "zone"),
+        ("filled", None, b"cluster zone", "cluster", "zone"),
+    ])
+    def test_add_call_metadata_cbt_header(self, start_cluster, start_zone, metadata_field, end_cluster, end_zone):
+        """
+        calling add_call_metadata should update fields based on grpc response metadata
+        The x-goog-ext-425905942-bin field contains cluster and zone info
+        """
+        import grpc
+        cls = type(self._make_one(mock.Mock()))
+        with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+            metric = self._make_one(mock.Mock(), cluster_id=start_cluster, zone=start_zone)
+            metric.active_attempt = mock.Mock()
+            metric.active_attempt.gfe_latency = None
+            metadata = grpc.aio.Metadata()
+            if metadata_field:
+                metadata['x-goog-ext-425905942-bin'] = metadata_field
+            metric.add_call_metadata(metadata)
+            assert metric.cluster_id == end_cluster
+            assert metric.zone == end_zone
+            # should remain in ACTIVE_ATTEMPT state after completing
+            assert metric.state == State.ACTIVE_ATTEMPT
+            # no errors encountered
+            assert mock_handle_error.call_count == 0
+            # gfe latency should not be touched
+            assert metric.active_attempt.gfe_latency is None
+
+    @pytest.mark.parametrize("metadata_field", [
+        b"cluster",
+        "cluster zone",  # expect bytes
+    ])
+    def test_add_call_metadata_cbt_header_w_error(self, metadata_field):
+        """
+        If the x-goog-ext-425905942-bin field is present, but not structured properly,
+        _handle_error should be called
+
+        Extra fields should not result in parsing error
+        """
+        import grpc
+        cls = type(self._make_one(mock.Mock()))
+        with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+            metric = self._make_one(mock.Mock())
+            metric.cluster_id = None
+            metric.zone = None
+            metric.active_attempt = mock.Mock()
+            metadata = grpc.aio.Metadata()
+            metadata['x-goog-ext-425905942-bin'] = metadata_field
+            metric.add_call_metadata(metadata)
+            # should remain in ACTIVE_ATTEMPT state after completing
+            assert metric.state == State.ACTIVE_ATTEMPT
+            # one error should have been handled
+            assert mock_handle_error.call_count == 1
+            assert mock_handle_error.call_args[0][0] == f"Failed to decode x-goog-ext-425905942-bin metadata: {metadata_field}"
+
+    @pytest.mark.parametrize("metadata_field,expected_latency", [
+        (None, None),
+        ("gfet4t7; dur=1000", 1),
+        ("gfet4t7; dur=1000.0", 1),
+        ("gfet4t7; dur=1000.1", 1.0001),
+        ("gfet4t7; dur=1000 dur=2000", 2),
+        ("gfet4t7; dur=0", 0),
+        ("gfet4t7; dur=empty", None),
+        ("gfet4t7;", None),
+        ("", None),
+    ])
+    def test_add_call_metadata_server_timing_header(self, metadata_field, expected_latency):
+        """
+        calling add_call_metadata should update fields based on grpc response metadata
+        The server-timing field contains gfe latency info
+        """
+        import grpc
+        cls = type(self._make_one(mock.Mock()))
+        with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+            metric = self._make_one(mock.Mock())
+            metric.active_attempt = mock.Mock()
+            metric.active_attempt.gfe_latency = None
+            metadata = grpc.aio.Metadata()
+            if metadata_field:
+                metadata['server-timing'] = metadata_field
+            metric.add_call_metadata(metadata)
+            if metric.active_attempt.gfe_latency is None:
+                assert expected_latency is None
+            else:
+                assert (metric.active_attempt.gfe_latency - expected_latency) < 0.0001
+            # should remain in ACTIVE_ATTEMPT state after completing
+            assert metric.state == State.ACTIVE_ATTEMPT
+            # no errors encountered
+            assert mock_handle_error.call_count == 0
+            # cluster and zone should not be touched
+            assert metric.cluster_id is None
+            assert metric.zone is None
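
A minimal usage sketch of the new state machine, mirroring test_state_machine_w_methods above. It assumes the first field of ActiveOperationMetric is the operation type (the tests construct it with a mock in that position) and only exercises behavior demonstrated by the new tests; treat it as illustrative rather than documented API.

    # Illustrative only: walks the OperationState transitions added in data_model.py.
    from google.cloud.bigtable.data._metrics.data_model import (
        ActiveOperationMetric,
        OperationState,
        OperationType,
    )

    # assumption: the first dataclass field is the operation type
    metric = ActiveOperationMetric(OperationType.READ_MODIFY_WRITE)
    assert metric.state == OperationState.CREATED           # no attempt started yet
    metric.start_attempt()
    assert metric.state == OperationState.ACTIVE_ATTEMPT    # attempt in flight
    metric.end_attempt_with_status(Exception())             # record a failed attempt
    assert metric.state == OperationState.BETWEEN_ATTEMPTS  # ready for a retry
    metric.start_attempt()
    metric.end_with_success()                               # finalizes the operation
    assert metric.state == OperationState.COMPLETED         # further calls go to _handle_error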