Skip to content

Commit

Permalink
fixed lint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Dec 14, 2023
1 parent 0e7887e commit 22f66b9
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 93 deletions.
5 changes: 3 additions & 2 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from typing import (
TYPE_CHECKING,
AsyncGenerator,
AsyncIterable,
Awaitable,
Sequence,
)
Expand Down Expand Up @@ -318,7 +317,9 @@ async def merge_rows(
yield Row(row_key, cells)
# most metric operations use setters, but this one updates
# the value directly to avoid extra overhead
operation.active_attempt.application_blocking_time += (time.monotonic() - block_time)
operation.active_attempt.application_blocking_time += (
time.monotonic() - block_time
)
break
c = await it.__anext__()
except _ResetRow as e:
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,9 @@ async def mutate_row(
)
target = partial(
metric_wrapped,
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
table_name=self.table_name,
app_profile_id=self.app_profile_id,
Expand Down
17 changes: 13 additions & 4 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
# this allows us to be resistant to clock changes, e.g. DST
@dataclass(frozen=True)
class TimeTuple:
utc: datetime.datetime = field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc))
utc: datetime.datetime = field(
default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
)
monotonic: float = field(default_factory=time.monotonic)


Expand Down Expand Up @@ -249,7 +251,6 @@ def attempt_first_response(self) -> None:
time.monotonic() - self.active_attempt.start_time.monotonic
)


def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
"""
Called to mark the end of a failed attempt for the operation.
Expand Down Expand Up @@ -328,7 +329,9 @@ def end_with_success(self):
"""
return self.end_with_status(StatusCode.OK)

def build_wrapped_predicate(self, inner_predicate: Callable[[Exception], bool]) -> Callable[[Exception], bool]:
def build_wrapped_predicate(
self, inner_predicate: Callable[[Exception], bool]
) -> Callable[[Exception], bool]:
"""
Wraps a predicate to include metrics tracking. Any call to the resulting predicate
is assumed to be an rpc failure, and will either mark the end of the active attempt
Expand All @@ -337,13 +340,15 @@ def build_wrapped_predicate(self, inner_predicate: Callable[[Exception], bool])
Args:
- inner_predicate: The predicate to wrap.
"""

def wrapped_predicate(exc: Exception) -> bool:
inner_result = inner_predicate(exc)
if inner_result:
self.end_attempt_with_status(exc)
else:
self.end_with_status(exc)
return inner_result

return wrapped_predicate

@staticmethod
Expand All @@ -363,7 +368,11 @@ def _exc_to_status(exc: Exception) -> StatusCode:
exc = exc.exceptions[-1]
if hasattr(exc, "grpc_status_code") and exc.grpc_status_code is not None:
return exc.grpc_status_code
if exc.__cause__ and hasattr(exc.__cause__, "grpc_status_code") and exc.__cause__.grpc_status_code is not None:
if (
exc.__cause__
and hasattr(exc.__cause__, "grpc_status_code")
and exc.__cause__.grpc_status_code is not None
):
return exc.__cause__.grpc_status_code
return StatusCode.UNKNOWN

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,7 @@ def on_attempt_complete(
**self.shared_labels,
}

self.otel.attempt_latencies.record(
attempt.duration, labels
)
self.otel.attempt_latencies.record(attempt.duration, labels)
self.otel.application_blocking_latencies.record(
attempt.application_blocking_time + attempt.backoff_before_attempt, labels
)
Expand Down
7 changes: 6 additions & 1 deletion tests/unit/data/_async/test__mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ async def test_mutate_rows_operation(self):
f"{cls.__module__}.{cls.__name__}._run_attempt", AsyncMock()
) as attempt_mock:
instance = self._make_one(
client, table, entries, operation_timeout, operation_timeout, mock.Mock()
client,
table,
entries,
operation_timeout,
operation_timeout,
mock.Mock(),
)
await instance.start()
assert attempt_mock.call_count == 1
Expand Down
77 changes: 24 additions & 53 deletions tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ class mock_grpc_call:
Used for mocking the responses from grpc calls. Can simulate both unary and streaming calls.
"""

def __init__(self, unary_response=None, stream_response=(), sleep_time=0,
initial_metadata=grpc.aio.Metadata(), trailing_metadata=grpc.aio.Metadata()
def __init__(
self,
unary_response=None,
stream_response=(),
sleep_time=0,
initial_metadata=grpc.aio.Metadata(),
trailing_metadata=grpc.aio.Metadata(),
):
self.unary_response = unary_response
self.stream_response = stream_response
Expand Down Expand Up @@ -2214,9 +2219,7 @@ async def test_mutate_row(self, mutation_arg):
expected_attempt_timeout = 19
async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_row"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_row") as mock_gapic:
mock_gapic.return_value = mock_grpc_call()
await table.mutate_row(
"row_key",
Expand Down Expand Up @@ -2254,16 +2257,12 @@ async def test_mutate_row_retryable_errors(self, retryable_exception):

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_row"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_row") as mock_gapic:
mock_gapic.side_effect = retryable_exception("mock")
with pytest.raises(DeadlineExceeded) as e:
mutation = mutations.DeleteAllFromRow()
assert mutation.is_idempotent() is True
await table.mutate_row(
"row_key", mutation, operation_timeout=0.01
)
await table.mutate_row("row_key", mutation, operation_timeout=0.01)
cause = e.value.__cause__
assert isinstance(cause, RetryExceptionGroup)
assert isinstance(cause.exceptions[0], retryable_exception)
Expand All @@ -2284,18 +2283,12 @@ async def test_mutate_row_non_idempotent_retryable_errors(
"""
async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_row"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_row") as mock_gapic:
mock_gapic.side_effect = retryable_exception("mock")
with pytest.raises(retryable_exception):
mutation = mutations.SetCell(
"family", b"qualifier", b"value", -1
)
mutation = mutations.SetCell("family", b"qualifier", b"value", -1)
assert mutation.is_idempotent() is False
await table.mutate_row(
"row_key", mutation, operation_timeout=0.2
)
await table.mutate_row("row_key", mutation, operation_timeout=0.2)

@pytest.mark.parametrize(
"non_retryable_exception",
Expand All @@ -2312,9 +2305,7 @@ async def test_mutate_row_non_idempotent_retryable_errors(
async def test_mutate_row_non_retryable_errors(self, non_retryable_exception):
async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_row"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_row") as mock_gapic:
mock_gapic.side_effect = non_retryable_exception("mock")
with pytest.raises(non_retryable_exception):
mutation = mutations.SetCell(
Expand All @@ -2324,9 +2315,7 @@ async def test_mutate_row_non_retryable_errors(self, non_retryable_exception):
timestamp_micros=1234567890,
)
assert mutation.is_idempotent() is True
await table.mutate_row(
"row_key", mutation, operation_timeout=0.2
)
await table.mutate_row("row_key", mutation, operation_timeout=0.2)

@pytest.mark.parametrize("mutations", [[], None])
@pytest.mark.asyncio
Expand Down Expand Up @@ -2391,9 +2380,7 @@ async def test_bulk_mutate_rows(self, mutation_arg):
expected_attempt_timeout = 19
async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
mock_gapic.return_value = self._mock_response([None])
bulk_mutation = mutations.RowMutationEntry(b"row_key", mutation_arg)
await table.bulk_mutate_rows(
Expand Down Expand Up @@ -2502,9 +2489,7 @@ async def test_bulk_mutate_rows_idempotent_mutation_error_non_retryable(

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
mock_gapic.side_effect = lambda *a, **k: self._mock_response(
[exception("mock")]
)
Expand Down Expand Up @@ -2542,9 +2527,7 @@ async def test_bulk_mutate_idempotent_retryable_request_errors(

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
mock_gapic.side_effect = retryable_exception("mock")
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
Expand Down Expand Up @@ -2580,16 +2563,12 @@ async def test_bulk_mutate_rows_non_idempotent_retryable_errors(

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
mock_gapic.side_effect = lambda *a, **k: self._mock_response(
[retryable_exception("mock")]
)
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
"family", b"qualifier", b"value", -1
)
mutation = mutations.SetCell("family", b"qualifier", b"value", -1)
entry = mutations.RowMutationEntry(b"row_key", [mutation])
assert mutation.is_idempotent() is False
await table.bulk_mutate_rows([entry], operation_timeout=0.2)
Expand Down Expand Up @@ -2622,9 +2601,7 @@ async def test_bulk_mutate_rows_non_retryable_errors(self, non_retryable_excepti

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
mock_gapic.side_effect = non_retryable_exception("mock")
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
Expand Down Expand Up @@ -2700,9 +2677,7 @@ async def test_bulk_mutate_error_recovery(self):

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
# fail with a retryable error, then a non-retryable one
mock_gapic.side_effect = [
self._mock_response([DeadlineExceeded("mock")]),
Expand All @@ -2712,9 +2687,7 @@ async def test_bulk_mutate_error_recovery(self):
"family", b"qualifier", b"value", timestamp_micros=123
)
entries = [
mutations.RowMutationEntry(
(f"row_key_{i}").encode(), [mutation]
)
mutations.RowMutationEntry((f"row_key_{i}").encode(), [mutation])
for i in range(3)
]
await table.bulk_mutate_rows(entries, operation_timeout=1000)
Expand Down Expand Up @@ -2929,9 +2902,7 @@ async def test_read_modify_write_call_rule_args(self, call_rules, expected_rules
client._gapic_client,
"read_modify_write_row",
) as mock_gapic:
mock_gapic.return_value = mock_grpc_call(
ReadModifyWriteRowResponse()
)
mock_gapic.return_value = mock_grpc_call(ReadModifyWriteRowResponse())
await table.read_modify_write_row("key", call_rules)
assert mock_gapic.call_count == 1
found_kwargs = mock_gapic.call_args_list[0][1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ def test_attempt_update_connectivity_error_count(self):
assert add.call_count == 1
assert add.call_args[0][0] == 1

@pytest.mark.parametrize("app_blocking,backoff", [
(0, 10), (10, 0), (123, 456)
])
@pytest.mark.parametrize("app_blocking,backoff", [(0, 10), (10, 0), (123, 456)])
def test_attempt_update_application_blocking_latencies(self, app_blocking, backoff):
"""
update application_blocking_latencies on attempt completion
Expand Down
26 changes: 18 additions & 8 deletions tests/unit/data/_metrics/test_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def test_ctor_defaults(self):
metric = self._make_one(mock_type)
assert metric.op_type == mock_type
assert metric.start_time.monotonic - time.monotonic() < 0.1
assert (metric.start_time.utc - datetime.datetime.now(datetime.timezone.utc)).total_seconds() < 1
assert (
metric.start_time.utc - datetime.datetime.now(datetime.timezone.utc)
).total_seconds() < 1
assert metric.active_attempt is None
assert metric.cluster_id is None
assert metric.zone is None
Expand Down Expand Up @@ -184,7 +186,9 @@ def test_start(self):
assert metric.start_time != orig_time
assert metric.start_time.monotonic != orig_time.monotonic
assert metric.start_time.monotonic - time.monotonic() < 0.1
assert metric.start_time.utc - datetime.datetime.now(datetime.timezone.utc) < datetime.timedelta(seconds=0.1)
assert metric.start_time.utc - datetime.datetime.now(
datetime.timezone.utc
) < datetime.timedelta(seconds=0.1)
# should remain in CREATED state after completing
assert metric.state == State.CREATED

Expand All @@ -200,7 +204,9 @@ def test_start_attempt(self):
assert isinstance(metric.active_attempt, ActiveAttemptMetric)
# make sure it was initialized with the correct values
assert time.monotonic() - metric.active_attempt.start_time.monotonic < 0.1
assert metric.active_attempt.start_time.utc - datetime.datetime.now(datetime.timezone.utc) < datetime.timedelta(seconds=0.1)
assert metric.active_attempt.start_time.utc - datetime.datetime.now(
datetime.timezone.utc
) < datetime.timedelta(seconds=0.1)
assert metric.active_attempt.first_response_latency is None
assert metric.active_attempt.gfe_latency is None
# should be in ACTIVE_ATTEMPT state after completing
Expand All @@ -211,7 +217,6 @@ def test_start_attempt_with_backoff_generator(self):
If operation has a backoff generator, it should be used to attach backoff
times to attempts
"""
from google.cloud.bigtable.data._metrics.data_model import ActiveAttemptMetric

def mock_generator():
"""
Expand Down Expand Up @@ -315,8 +320,8 @@ def test_add_response_metadata_cbt_header_w_error(self, metadata_field):
# no errors encountered
assert mock_handle_error.call_count == 1
assert (
"Failed to decode x-goog-ext-425905942-bin metadata:" in
mock_handle_error.call_args[0][0]
"Failed to decode x-goog-ext-425905942-bin metadata:"
in mock_handle_error.call_args[0][0]
)
assert str(metadata_field) in mock_handle_error.call_args[0][0]

Expand Down Expand Up @@ -416,7 +421,10 @@ def test_end_attempt_with_status(self):
got_attempt = metric.completed_attempts[0]
assert got_attempt.start_time == expected_start_time.utc
assert got_attempt.first_response_latency == expected_latency
assert time.monotonic() - got_attempt.duration - expected_start_time.monotonic < 0.001
assert (
time.monotonic() - got_attempt.duration - expected_start_time.monotonic
< 0.001
)
assert got_attempt.end_status == expected_status
assert got_attempt.gfe_latency == expected_gfe_latency
assert got_attempt.application_blocking_time == expected_app_blocking
Expand Down Expand Up @@ -665,7 +673,9 @@ def test__handle_error(self):
assert logger_mock.warning.call_args[0][0] == expected_message
assert len(logger_mock.warning.call_args[0]) == 1
# otherwise, do nothing
with mock.patch("google.cloud.bigtable.data._metrics.data_model.LOGGER", None):
with mock.patch(
"google.cloud.bigtable.data._metrics.data_model.LOGGER", None
):
type(self._make_one(object()))._handle_error(input_message)

@pytest.mark.asyncio
Expand Down
Loading

0 comments on commit 22f66b9

Please sign in to comment.