Skip to content

Commit

Permalink
started data model tests
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Nov 21, 2023
1 parent a7e4874 commit 5509c5e
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 16 deletions.
32 changes: 18 additions & 14 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@

SERVER_TIMING_REGEX = re.compile(r"gfet4t7; dur=(\d+)")

OPERATION_COMPLETE_ERROR = "Operation already completed"
NO_ATTEMPT_ERROR = "No active attempt"
HAS_ATTEMPT_ERROR = "Attempt already in progress"


class OperationType(Enum):
"""Enum for the type of operation being performed."""
Expand Down Expand Up @@ -98,14 +102,14 @@ class ActiveOperationMetric:
"""

op_type: OperationType
start_time: float
start_time: float = field(default_factory=time.monotonic)
op_id: UUID = field(default_factory=uuid4)
active_attempt: ActiveAttemptMetric | None = None
cluster_id: str | None = None
zone: str | None = None
completed_attempts: list[CompletedAttemptMetric] = field(default_factory=list)
was_completed: bool = False
_handlers: list[MetricsHandler] = field(default_factory=list)
handlers: list[MetricsHandler] = field(default_factory=list)
is_streaming: bool = False # only True for read_rows operations

def start(self) -> None:
Expand All @@ -117,9 +121,9 @@ def start(self) -> None:
exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.was_completed:
return self._handle_error("Operation cannot be reset after completion")
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.completed_attempts or self.active_attempt:
return self._handle_error("Cannot restart operation with active attempts")
return self._handle_error(HAS_ATTEMPT_ERROR)
self.start_time = time.monotonic()

def start_attempt(self) -> None:
Expand All @@ -130,9 +134,9 @@ def start_attempt(self) -> None:
will raise an exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.was_completed:
return self._handle_error("Operation already completed")
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is not None:
return self._handle_error("Incomplete attempt already exists")
return self._handle_error(HAS_ATTEMPT_ERROR)

self.active_attempt = ActiveAttemptMetric()

Expand All @@ -149,9 +153,9 @@ def add_call_metadata(self, metadata: dict[str, bytes | str]) -> None:
- metadata: the metadata as extracted from the grpc call
"""
if self.was_completed:
return self._handle_error("Operation already completed")
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is None:
return self._handle_error("No active attempt")
return self._handle_error(NO_ATTEMPT_ERROR)

if self.cluster_id is None or self.zone is None:
bigtable_metadata = metadata.get(BIGTABLE_METADATA_KEY)
Expand Down Expand Up @@ -182,9 +186,9 @@ def attempt_first_response(self) -> None:
exception or warning based on the value of ALLOW_METRIC_EXCEPTIONS.
"""
if self.was_completed:
return self._handle_error("Operation already completed")
return self._handle_error(OPERATION_COMPLETE_ERROR)
elif self.active_attempt is None:
return self._handle_error("No active attempt")
return self._handle_error(NO_ATTEMPT_ERROR)
elif self.active_attempt.first_response_latency is not None:
return self._handle_error("Attempt already received first response")
self.active_attempt.first_response_latency = (
Expand All @@ -202,9 +206,9 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
- status: The status of the attempt.
"""
if self.was_completed:
return self._handle_error("Operation already completed")
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is None:
return self._handle_error("No active attempt")
return self._handle_error(NO_ATTEMPT_ERROR)

new_attempt = CompletedAttemptMetric(
start_time=self.active_attempt.start_time,
Expand Down Expand Up @@ -232,7 +236,7 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
- status: The status of the operation.
"""
if self.was_completed:
return self._handle_error("Operation already completed")
return self._handle_error(OPERATION_COMPLETE_ERROR)
if self.active_attempt is not None:
self.end_attempt_with_status(status)
self.was_completed = True
Expand All @@ -249,7 +253,7 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
zone=self.zone or "global",
is_streaming=self.is_streaming,
)
for handler in self._handlers:
for handler in self.handlers:
handler.on_operation_complete(finalized)

def end_with_success(self):
Expand Down
2 changes: 0 additions & 2 deletions google/cloud/bigtable/data/_metrics/metrics_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ def create_operation(
- is_streaming: Whether the operation is a streaming operation. Should only be
True for ReadRows operations.
"""
start_time = time.monotonic()
new_op = ActiveOperationMetric(
op_type=op_type,
start_time=start_time,
_handlers=self.handlers,
is_streaming=is_streaming,
)
Expand Down
152 changes: 152 additions & 0 deletions tests/unit/data/_metrics/test_data_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import pytest
import mock
from uuid import UUID


class TestActiveOperationMetric:

def _make_one(self, *args, **kwargs):
from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
return ActiveOperationMetric(*args, **kwargs)

def test_ctor_defaults(self):
"""
create an instance with default values
"""
mock_type = mock.Mock()
metric = self._make_one(mock_type)
assert metric.op_type == mock_type
assert metric.start_time - time.monotonic() < 0.1
assert metric.op_id is not None
assert isinstance(metric.op_id, UUID)
assert metric.active_attempt is None
assert metric.cluster_id is None
assert metric.zone is None
assert len(metric.completed_attempts) == 0
assert metric.was_completed is False
assert len(metric.handlers) == 0
assert metric.is_streaming is False

def test_ctor_explicit(self):
"""
test with explicit arguments
"""
expected_type = mock.Mock()
expected_start_time = 12
expected_op_id = 5
expected_active_attempt = mock.Mock()
expected_cluster_id = "cluster"
expected_zone = "zone"
expected_completed_attempts = [mock.Mock()]
expected_was_completed = True
expected_handlers = [mock.Mock()]
expected_is_streaming = True
metric = self._make_one(
op_type=expected_type,
start_time=expected_start_time,
op_id=expected_op_id,
active_attempt=expected_active_attempt,
cluster_id=expected_cluster_id,
zone=expected_zone,
completed_attempts=expected_completed_attempts,
was_completed=expected_was_completed,
handlers=expected_handlers,
is_streaming=expected_is_streaming,
)
assert metric.op_type == expected_type
assert metric.start_time == expected_start_time
assert metric.op_id == expected_op_id
assert metric.active_attempt == expected_active_attempt
assert metric.cluster_id == expected_cluster_id
assert metric.zone == expected_zone
assert metric.completed_attempts == expected_completed_attempts
assert metric.was_completed == expected_was_completed
assert metric.handlers == expected_handlers
assert metric.is_streaming == expected_is_streaming

@pytest.mark.parametrize("method,args", [
("start", ()),
("start_attempt", ()),
("add_call_metadata", ({},)),
("attempt_first_response", ()),
("end_attempt_with_status", (mock.Mock(),)),
("end_with_status", (mock.Mock(),)),
("end_with_success", ()),
])
def test_error_completed_operation(self, method, args):
"""
calling any method on a completed operation should call _handle_error
to log or raise an error
"""
cls = type(self._make_one(mock.Mock()))
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
mock_handle_error.return_value = None
metric = self._make_one(mock.Mock(), was_completed=True)
return_obj = getattr(metric, method)(*args)
assert return_obj is None
assert mock_handle_error.call_count == 1
assert mock_handle_error.call_args[0][0] == "Operation already completed"

@pytest.mark.parametrize("method,args", [
("add_call_metadata", ({},)),
("attempt_first_response", ()),
("end_attempt_with_status", (mock.Mock(),)),
])
def test_error_no_active_attempt(self, method, args):
"""
If a method on an attempt is called with no active attempt, call _handle_error
to log or raise an error
"""
cls = type(self._make_one(mock.Mock()))
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
mock_handle_error.return_value = None
metric = self._make_one(mock.Mock())
return_obj = getattr(metric, method)(*args)
assert return_obj is None
assert mock_handle_error.call_count == 1
assert mock_handle_error.call_args[0][0] == "No active attempt"

@pytest.mark.parametrize("method,args", [
("start", ({},)),
("start_attempt", ()),
])
def test_error_has_active_attempt(self, method, args):
"""
If a method starting attempt is called with an active attempt, call _handle_error
to log or raise an error
"""
cls = type(self._make_one(mock.Mock()))
with mock.patch.object(cls, "_handle_error") as mock_handle_error:
mock_handle_error.return_value = None
metric = self._make_one(mock.Mock(), active_attempt=mock.Mock())
return_obj = getattr(metric, method)(*args)
assert return_obj is None
assert mock_handle_error.call_count == 1
assert mock_handle_error.call_args[0][0] == "Attempt already in progress"

def test_start(self):
"""
calling start op operation should reset start_time
"""
orig_time = 0
metric = self._make_one(mock.Mock(), start_time=orig_time)
assert abs(metric.start_time - time.monotonic()) > 5
metric.start()
assert metric.start_time != orig_time
assert metric.start_time - time.monotonic() < 0.1

1 change: 1 addition & 0 deletions tests/unit/data/test__helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 5509c5e

Please sign in to comment.