diff --git a/google/cloud/bigtable/data/_metrics/data_model.py b/google/cloud/bigtable/data/_metrics/data_model.py
index 475073a8f..079e8b6d7 100644
--- a/google/cloud/bigtable/data/_metrics/data_model.py
+++ b/google/cloud/bigtable/data/_metrics/data_model.py
@@ -41,6 +41,10 @@
 
 SERVER_TIMING_REGEX = re.compile(r"gfet4t7; dur=(\d+)")
 
+OPERATION_COMPLETE_ERROR = "Operation already completed"
+NO_ATTEMPT_ERROR = "No active attempt"
+HAS_ATTEMPT_ERROR = "Attempt already in progress"
+
 
 class OperationType(Enum):
     """Enum for the type of operation being performed."""
@@ -98,14 +102,14 @@ class ActiveOperationMetric:
     """
 
     op_type: OperationType
-    start_time: float
+    start_time: float = field(default_factory=time.monotonic)
     op_id: UUID = field(default_factory=uuid4)
     active_attempt: ActiveAttemptMetric | None = None
     cluster_id: str | None = None
     zone: str | None = None
     completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
     was_completed: bool = False
-    _handlers: list[MetricsHandler] = field(default_factory=list)
+    handlers: list[MetricsHandler] = field(default_factory=list)
     is_streaming: bool = False  # only True for read_rows operations
 
     def start(self) -> None:
@@ -117,9 +121,9 @@ def start(self) -> None:
         exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
         """
         if self.was_completed:
-            return self._handle_error("Operation cannot be reset after completion")
+            return self._handle_error(OPERATION_COMPLETE_ERROR)
         if self.completed_attempts or self.active_attempt:
-            return self._handle_error("Cannot restart operation with active attempts")
+            return self._handle_error(HAS_ATTEMPT_ERROR)
         self.start_time = time.monotonic()
 
     def start_attempt(self) -> None:
@@ -130,9 +134,9 @@ def start_attempt(self) -> None:
         will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
         """
         if self.was_completed:
-            return self._handle_error("Operation already completed")
+            return self._handle_error(OPERATION_COMPLETE_ERROR)
         if self.active_attempt is not None:
-            return self._handle_error("Incomplete attempt already exists")
+            return self._handle_error(HAS_ATTEMPT_ERROR)
 
         self.active_attempt = ActiveAttemptMetric()
@@ -149,9 +153,9 @@ def add_call_metadata(self, metadata: dict[str, bytes | str]) -> None:
            - metadata: the metadata as extracted from the grpc call
         """
         if self.was_completed:
-            return self._handle_error("Operation already completed")
+            return self._handle_error(OPERATION_COMPLETE_ERROR)
         if self.active_attempt is None:
-            return self._handle_error("No active attempt")
+            return self._handle_error(NO_ATTEMPT_ERROR)
 
         if self.cluster_id is None or self.zone is None:
             bigtable_metadata = metadata.get(BIGTABLE_METADATA_KEY)
@@ -182,9 +186,9 @@ def attempt_first_response(self) -> None:
         exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
         """
         if self.was_completed:
-            return self._handle_error("Operation already completed")
+            return self._handle_error(OPERATION_COMPLETE_ERROR)
         elif self.active_attempt is None:
-            return self._handle_error("No active attempt")
+            return self._handle_error(NO_ATTEMPT_ERROR)
         elif self.active_attempt.first_response_latency is not None:
             return self._handle_error("Attempt already received first response")
         self.active_attempt.first_response_latency = (
@@ -202,9 +206,9 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
            - status: The status of the attempt.
         """
         if self.was_completed:
-            return self._handle_error("Operation already completed")
+            return self._handle_error(OPERATION_COMPLETE_ERROR)
         if self.active_attempt is None:
-            return self._handle_error("No active attempt")
+            return self._handle_error(NO_ATTEMPT_ERROR)
 
         new_attempt = CompletedAttemptMetric(
             start_time=self.active_attempt.start_time,
@@ -232,7 +236,7 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
            - status: The status of the operation.
         """
         if self.was_completed:
-            return self._handle_error("Operation already completed")
+            return self._handle_error(OPERATION_COMPLETE_ERROR)
         if self.active_attempt is not None:
             self.end_attempt_with_status(status)
         self.was_completed = True
@@ -249,7 +253,7 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
                 zone=self.zone or "global",
                 is_streaming=self.is_streaming,
             )
-        for handler in self._handlers:
+        for handler in self.handlers:
             handler.on_operation_complete(finalized)
 
     def end_with_success(self):
diff --git a/google/cloud/bigtable/data/_metrics/metrics_controller.py b/google/cloud/bigtable/data/_metrics/metrics_controller.py
index 8cafd943e..acef8f72f 100644
--- a/google/cloud/bigtable/data/_metrics/metrics_controller.py
+++ b/google/cloud/bigtable/data/_metrics/metrics_controller.py
@@ -75,10 +75,8 @@ def create_operation(
             - is_streaming: Whether the operation is a streaming operation. Should
                 only be True for ReadRows operations.
         """
-        start_time = time.monotonic()
         new_op = ActiveOperationMetric(
             op_type=op_type,
-            start_time=start_time,
-            _handlers=self.handlers,
+            handlers=self.handlers,
             is_streaming=is_streaming,
         )
diff --git a/tests/unit/data/_metrics/test_data_model.py b/tests/unit/data/_metrics/test_data_model.py
new file mode 100644
index 000000000..0d00855d3
--- /dev/null
+++ b/tests/unit/data/_metrics/test_data_model.py
@@ -0,0 +1,152 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import pytest
+import mock
+from uuid import UUID
+
+
+class TestActiveOperationMetric:
+
+    def _make_one(self, *args, **kwargs):
+        from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
+        return ActiveOperationMetric(*args, **kwargs)
+
+    def test_ctor_defaults(self):
+        """
+        create an instance with default values
+        """
+        mock_type = mock.Mock()
+        metric = self._make_one(mock_type)
+        assert metric.op_type == mock_type
+        assert abs(metric.start_time - time.monotonic()) < 0.1
+        assert metric.op_id is not None
+        assert isinstance(metric.op_id, UUID)
+        assert metric.active_attempt is None
+        assert metric.cluster_id is None
+        assert metric.zone is None
+        assert len(metric.completed_attempts) == 0
+        assert metric.was_completed is False
+        assert len(metric.handlers) == 0
+        assert metric.is_streaming is False
+
+    def test_ctor_explicit(self):
+        """
+        test with explicit arguments
+        """
+        expected_type = mock.Mock()
+        expected_start_time = 12
+        expected_op_id = 5
+        expected_active_attempt = mock.Mock()
+        expected_cluster_id = "cluster"
+        expected_zone = "zone"
+        expected_completed_attempts = [mock.Mock()]
+        expected_was_completed = True
+        expected_handlers = [mock.Mock()]
+        expected_is_streaming = True
+        metric = self._make_one(
+            op_type=expected_type,
+            start_time=expected_start_time,
+            op_id=expected_op_id,
+            active_attempt=expected_active_attempt,
+            cluster_id=expected_cluster_id,
+            zone=expected_zone,
+            completed_attempts=expected_completed_attempts,
+            was_completed=expected_was_completed,
+            handlers=expected_handlers,
+            is_streaming=expected_is_streaming,
+        )
+        assert metric.op_type == expected_type
+        assert metric.start_time == expected_start_time
+        assert metric.op_id == expected_op_id
+        assert metric.active_attempt == expected_active_attempt
+        assert metric.cluster_id == expected_cluster_id
+        assert metric.zone == expected_zone
+        assert metric.completed_attempts == expected_completed_attempts
+        assert metric.was_completed == expected_was_completed
+        assert metric.handlers == expected_handlers
+        assert metric.is_streaming == expected_is_streaming
+
+    @pytest.mark.parametrize("method,args", [
+        ("start", ()),
+        ("start_attempt", ()),
+        ("add_call_metadata", ({},)),
+        ("attempt_first_response", ()),
+        ("end_attempt_with_status", (mock.Mock(),)),
+        ("end_with_status", (mock.Mock(),)),
+        ("end_with_success", ()),
+    ])
+    def test_error_completed_operation(self, method, args):
+        """
+        calling any method on a completed operation should call _handle_error
+        to log or raise an error
+        """
+        cls = type(self._make_one(mock.Mock()))
+        with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+            mock_handle_error.return_value = None
+            metric = self._make_one(mock.Mock(), was_completed=True)
+            return_obj = getattr(metric, method)(*args)
+            assert return_obj is None
+            assert mock_handle_error.call_count == 1
+            assert mock_handle_error.call_args[0][0] == "Operation already completed"
+
+    @pytest.mark.parametrize("method,args", [
+        ("add_call_metadata", ({},)),
+        ("attempt_first_response", ()),
+        ("end_attempt_with_status", (mock.Mock(),)),
+    ])
+    def test_error_no_active_attempt(self, method, args):
+        """
+        If a method on an attempt is called with no active attempt, call _handle_error
+        to log or raise an error
+        """
+        cls = type(self._make_one(mock.Mock()))
+        with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+            mock_handle_error.return_value = None
+            metric = self._make_one(mock.Mock())
+            return_obj = getattr(metric, method)(*args)
+            assert return_obj is None
+            assert mock_handle_error.call_count == 1
+            assert mock_handle_error.call_args[0][0] == "No active attempt"
+
+    @pytest.mark.parametrize("method,args", [
+        ("start", ()),
+        ("start_attempt", ()),
+    ])
+    def test_error_has_active_attempt(self, method, args):
+        """
+        If a method that starts an attempt is called while an attempt is already
+        active, call _handle_error to log or raise an error
+        """
+        cls = type(self._make_one(mock.Mock()))
+        with mock.patch.object(cls, "_handle_error") as mock_handle_error:
+            mock_handle_error.return_value = None
+            metric = self._make_one(mock.Mock(), active_attempt=mock.Mock())
+            return_obj = getattr(metric, method)(*args)
+            assert return_obj is None
+            assert mock_handle_error.call_count == 1
+            assert mock_handle_error.call_args[0][0] == "Attempt already in progress"
+
+    def test_start(self):
+        """
+        calling start() on an operation should reset start_time
+        """
+        orig_time = 0
+        metric = self._make_one(mock.Mock(), start_time=orig_time)
+        assert abs(metric.start_time - time.monotonic()) > 5
+        metric.start()
+        assert metric.start_time != orig_time
+        assert abs(metric.start_time - time.monotonic()) < 0.1
+
diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py
index 6c11fa86a..1c0eefea7 100644
--- a/tests/unit/data/test__helpers.py
+++ b/tests/unit/data/test__helpers.py
@@ -1,3 +1,4 @@
+# Copyright 2023 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.