From 9342e2703cdabc174943dd67102b00c1119506b1 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 15 Dec 2023 12:30:54 -0800 Subject: [PATCH] chore: update api_core submodule (#897) --- google/__init__.py | 6 -- google/cloud/__init__.py | 6 -- google/cloud/bigtable/data/__init__.py | 2 - .../bigtable/data/_async/_mutate_rows.py | 33 +++------ .../cloud/bigtable/data/_async/_read_rows.py | 42 +---------- google/cloud/bigtable/data/_async/client.py | 71 +++++++----------- .../bigtable/data/_async/mutations_batcher.py | 3 + google/cloud/bigtable/data/_helpers.py | 74 +++++++------------ google/cloud/bigtable/data/exceptions.py | 9 --- mypy.ini | 2 +- noxfile.py | 13 ++-- owlbot.py | 3 +- python-api-core | 2 +- setup.py | 12 +-- testing/constraints-3.7.txt | 2 +- testing/constraints-3.8.txt | 3 +- tests/system/data/test_system.py | 54 +++++++------- tests/unit/data/_async/test__mutate_rows.py | 12 +-- tests/unit/data/_async/test_client.py | 40 +++++----- .../data/_async/test_mutations_batcher.py | 4 +- tests/unit/data/test__helpers.py | 66 ----------------- 21 files changed, 142 insertions(+), 317 deletions(-) delete mode 100644 google/__init__.py delete mode 100644 google/cloud/__init__.py diff --git a/google/__init__.py b/google/__init__.py deleted file mode 100644 index a5ba80656..000000000 --- a/google/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -try: - import pkg_resources - - pkg_resources.declare_namespace(__name__) -except ImportError: - pass diff --git a/google/cloud/__init__.py b/google/cloud/__init__.py deleted file mode 100644 index a5ba80656..000000000 --- a/google/cloud/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -try: - import pkg_resources - - pkg_resources.declare_namespace(__name__) -except ImportError: - pass diff --git a/google/cloud/bigtable/data/__init__.py b/google/cloud/bigtable/data/__init__.py index a68be5417..5229f8021 100644 --- a/google/cloud/bigtable/data/__init__.py +++ b/google/cloud/bigtable/data/__init__.py @@ -32,7 +32,6 @@ from google.cloud.bigtable.data.mutations import DeleteAllFromFamily from google.cloud.bigtable.data.mutations import DeleteAllFromRow -from google.cloud.bigtable.data.exceptions import IdleTimeout from google.cloud.bigtable.data.exceptions import InvalidChunk from google.cloud.bigtable.data.exceptions import FailedMutationEntryError from google.cloud.bigtable.data.exceptions import FailedQueryShardError @@ -63,7 +62,6 @@ "DeleteAllFromRow", "Row", "Cell", - "IdleTimeout", "InvalidChunk", "FailedMutationEntryError", "FailedQueryShardError", diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 5971a9894..d4ffdee22 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -15,17 +15,16 @@ from __future__ import annotations from typing import Sequence, TYPE_CHECKING -import asyncio from dataclasses import dataclass import functools from google.api_core import exceptions as core_exceptions -from google.api_core import retry_async as retries +from google.api_core import retry as retries import google.cloud.bigtable_v2.types.bigtable as types_pb import google.cloud.bigtable.data.exceptions as bt_exceptions from google.cloud.bigtable.data._helpers import _make_metadata -from google.cloud.bigtable.data._helpers import _convert_retry_deadline from google.cloud.bigtable.data._helpers import _attempt_timeout_generator +from google.cloud.bigtable.data._helpers import _retry_exception_factory # mutate_rows requests are 
limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT

@@ -101,17 +100,13 @@ def __init__(
             # Entry level errors
             bt_exceptions._MutateRowsIncomplete,
         )
-        # build retryable operation
-        retry = retries.AsyncRetry(
-            predicate=self.is_retryable,
-            timeout=operation_timeout,
-            initial=0.01,
-            multiplier=2,
-            maximum=60,
-        )
-        retry_wrapped = retry(self._run_attempt)
-        self._operation = _convert_retry_deadline(
-            retry_wrapped, operation_timeout, is_async=True
+        sleep_generator = retries.exponential_sleep_generator(0.01, 60, multiplier=2)
+        self._operation = retries.retry_target_async(
+            self._run_attempt,
+            self.is_retryable,
+            sleep_generator,
+            operation_timeout,
+            exception_factory=_retry_exception_factory,
         )
         # initialize state
         self.timeout_generator = _attempt_timeout_generator(
@@ -130,7 +125,7 @@ async def start(self):
         """
         try:
             # trigger mutate_rows
-            await self._operation()
+            await self._operation
         except Exception as exc:
             # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
             incomplete_indices = self.remaining_indices.copy()
@@ -180,6 +175,7 @@ async def _run_attempt(self):
             result_generator = await self._gapic_fn(
                 timeout=next(self.timeout_generator),
                 entries=request_entries,
+                retry=None,
             )
             async for result_list in result_generator:
                 for result in result_list.entries:
@@ -195,13 +191,6 @@ async def _run_attempt(self):
                     self._handle_entry_error(orig_idx, entry_error)
                 # remove processed entry from active list
                 del active_request_indices[result.index]
-        except asyncio.CancelledError:
-            # when retry wrapper timeout expires, the operation is cancelled
-            # make sure incomplete indices are tracked,
-            # but don't record exception (it will be raised by wrapper)
-            # TODO: remove asyncio.wait_for in retry wrapper. Let grpc call handle expiration
-            self.remaining_indices.extend(active_request_indices.values())
-            raise
         except Exception as exc:
             # add this exception to list for each mutation that wasn't
             # already handled, and update remaining_indices if mutation is retryable
diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py
index ad1f7b84d..9e0fd78e1 100644
--- a/google/cloud/bigtable/data/_async/_read_rows.py
+++ b/google/cloud/bigtable/data/_async/_read_rows.py
@@ -31,15 +31,13 @@
 from google.cloud.bigtable.data.row import Row, Cell
 from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
 from google.cloud.bigtable.data.exceptions import InvalidChunk
-from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
 from google.cloud.bigtable.data.exceptions import _RowSetComplete
 from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
 from google.cloud.bigtable.data._helpers import _make_metadata
+from google.cloud.bigtable.data._helpers import _retry_exception_factory

-from google.api_core import retry_async as retries
-from google.api_core.retry_streaming_async import retry_target_stream
+from google.api_core import retry as retries
 from google.api_core.retry import exponential_sleep_generator
-from google.api_core import exceptions as core_exceptions

 if TYPE_CHECKING:
     from google.cloud.bigtable.data._async.client import TableAsync
@@ -107,12 +105,12 @@ def start_operation(self) -> AsyncGenerator[Row, None]:
         """
         Start the read_rows operation, retrying on retryable errors.
""" - return retry_target_stream( + return retries.retry_target_stream_async( self._read_rows_attempt, self._predicate, exponential_sleep_generator(0.01, 60, multiplier=2), self.operation_timeout, - exception_factory=self._build_exception, + exception_factory=_retry_exception_factory, ) def _read_rows_attempt(self) -> AsyncGenerator[Row, None]: @@ -343,35 +341,3 @@ def _revise_request_rowset( # this will avoid an unwanted full table scan raise _RowSetComplete() return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges) - - @staticmethod - def _build_exception( - exc_list: list[Exception], is_timeout: bool, timeout_val: float - ) -> tuple[Exception, Exception | None]: - """ - Build retry error based on exceptions encountered during operation - - Args: - - exc_list: list of exceptions encountered during operation - - is_timeout: whether the operation failed due to timeout - - timeout_val: the operation timeout value in seconds, for constructing - the error message - Returns: - - tuple of the exception to raise, and a cause exception if applicable - """ - if is_timeout: - # if failed due to timeout, raise deadline exceeded as primary exception - source_exc: Exception = core_exceptions.DeadlineExceeded( - f"operation_timeout of {timeout_val} exceeded" - ) - elif exc_list: - # otherwise, raise non-retryable error as primary exception - source_exc = exc_list.pop() - else: - source_exc = RuntimeError("failed with unspecified exception") - # use the retry exception group as the cause of the exception - cause_exc: Exception | None = ( - RetryExceptionGroup(exc_list) if exc_list else None - ) - source_exc.__cause__ = cause_exc - return source_exc, cause_exc diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index a79ead7f8..d0578ff1a 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -33,6 +33,7 @@ import random import os +from functools import partial from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient @@ -43,9 +44,8 @@ ) from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest from google.cloud.client import ClientWithProject -from google.api_core.exceptions import GoogleAPICallError from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore -from google.api_core import retry_async as retries +from google.api_core import retry as retries from google.api_core.exceptions import DeadlineExceeded from google.api_core.exceptions import ServiceUnavailable from google.api_core.exceptions import Aborted @@ -65,7 +65,7 @@ from google.cloud.bigtable.data._helpers import _WarmedInstanceKey from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT from google.cloud.bigtable.data._helpers import _make_metadata -from google.cloud.bigtable.data._helpers import _convert_retry_deadline +from google.cloud.bigtable.data._helpers import _retry_exception_factory from google.cloud.bigtable.data._helpers import _validate_timeouts from google.cloud.bigtable.data._helpers import _get_retryable_errors from google.cloud.bigtable.data._helpers import _get_timeouts @@ -223,7 +223,7 @@ async def close(self, timeout: float = 2.0): async def _ping_and_warm_instances( self, channel: grpc.aio.Channel, instance_key: _WarmedInstanceKey | None = None - ) -> list[GoogleAPICallError | None]: + ) -> list[BaseException | None]: """ Prepares the backend for requests 
on a channel
@@ -578,7 +578,6 @@ async def read_rows_stream(
                 will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
                 from any retries that failed
             - GoogleAPIError: raised if the request encounters an unrecoverable error
-            - IdleTimeout: if iterator was abandoned
         """
         operation_timeout, attempt_timeout = _get_timeouts(
             operation_timeout, attempt_timeout, self
@@ -761,6 +760,9 @@ async def read_rows_sharded(
                 for result in batch_result:
                     if isinstance(result, Exception):
                         error_dict[shard_idx] = result
+                    elif isinstance(result, BaseException):
+                        # BaseException not expected; raise immediately
+                        raise result
                     else:
                         results_list.extend(result)
                     shard_idx += 1
@@ -872,22 +874,8 @@ async def sample_row_keys(
         # prepare retryable
         retryable_excs = _get_retryable_errors(retryable_errors, self)
         predicate = retries.if_exception_type(*retryable_excs)
-        transient_errors = []

-        def on_error_fn(exc):
-            # add errors to list if retryable
-            if predicate(exc):
-                transient_errors.append(exc)
-
-        retry = retries.AsyncRetry(
-            predicate=predicate,
-            timeout=operation_timeout,
-            initial=0.01,
-            multiplier=2,
-            maximum=60,
-            on_error=on_error_fn,
-            is_stream=False,
-        )
+        sleep_generator = retries.exponential_sleep_generator(0.01, 60, multiplier=2)

         # prepare request
         metadata = _make_metadata(self.table_name, self.app_profile_id)
@@ -902,10 +890,13 @@ async def execute_rpc():
             )
             return [(s.row_key, s.offset_bytes) async for s in results]

-        wrapped_fn = _convert_retry_deadline(
-            retry(execute_rpc), operation_timeout, transient_errors, is_async=True
+        return await retries.retry_target_async(
+            execute_rpc,
+            predicate,
+            sleep_generator,
+            operation_timeout,
+            exception_factory=_retry_exception_factory,
         )
-        return await wrapped_fn()

     def mutations_batcher(
         self,
@@ -1014,37 +1005,25 @@ async def mutate_row(
         # mutations should not be retried
         predicate = retries.if_exception_type()
-        transient_errors = []
-
-        def on_error_fn(exc):
-            if predicate(exc):
-                transient_errors.append(exc)
+        sleep_generator = retries.exponential_sleep_generator(0.01, 60, multiplier=2)

-        retry = retries.AsyncRetry(
-            predicate=predicate,
-            on_error=on_error_fn,
-            timeout=operation_timeout,
-            initial=0.01,
-            multiplier=2,
-            maximum=60,
-        )
-        # wrap rpc in retry logic
-        retry_wrapped = retry(self.client._gapic_client.mutate_row)
-        # convert RetryErrors from retry wrapper into DeadlineExceeded errors
-        deadline_wrapped = _convert_retry_deadline(
-            retry_wrapped, operation_timeout, transient_errors, is_async=True
-        )
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
-        # trigger rpc
-        await deadline_wrapped(
+        target = partial(
+            self.client._gapic_client.mutate_row,
             row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
             mutations=[mutation._to_pb() for mutation in mutations_list],
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
             timeout=attempt_timeout,
-            metadata=metadata,
+            metadata=_make_metadata(self.table_name, self.app_profile_id),
             retry=None,
         )
+        return await retries.retry_target_async(
+            target,
+            predicate,
+            sleep_generator,
+            operation_timeout,
+            exception_factory=_retry_exception_factory,
+        )

     async def bulk_mutate_rows(
         self,
diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py
index b2da30040..5d5dd535e 100644
--- a/google/cloud/bigtable/data/_async/mutations_batcher.py
+++ b/google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -489,6 +489,9 @@ async def _wait_for_batch_results(
             if isinstance(result, Exception):
                 # 
will receive direct Exception objects if request task fails found_errors.append(result) + elif isinstance(result, BaseException): + # BaseException not expected from grpc calls. Raise immediately + raise result elif result: # completed requests will return a list of FailedMutationEntryError for e in result: diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 96ea1d1ce..a0b13cbaf 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -16,13 +16,14 @@ """ from __future__ import annotations -from typing import Callable, Sequence, List, Tuple, Any, TYPE_CHECKING +from typing import Sequence, List, Tuple, TYPE_CHECKING import time import enum from collections import namedtuple from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.api_core import exceptions as core_exceptions +from google.api_core.retry import RetryFailureReason from google.cloud.bigtable.data.exceptions import RetryExceptionGroup if TYPE_CHECKING: @@ -96,56 +97,37 @@ def _attempt_timeout_generator( yield max(0, min(per_request_timeout, deadline - time.monotonic())) -# TODO:replace this function with an exception_factory passed into the retry when -# feature is merged: -# https://github.com/googleapis/python-bigtable/blob/ea5b4f923e42516729c57113ddbe28096841b952/google/cloud/bigtable/data/_async/_read_rows.py#L130 -def _convert_retry_deadline( - func: Callable[..., Any], - timeout_value: float | None = None, - retry_errors: list[Exception] | None = None, - is_async: bool = False, -): +def _retry_exception_factory( + exc_list: list[Exception], + reason: RetryFailureReason, + timeout_val: float | None, +) -> tuple[Exception, Exception | None]: """ - Decorator to convert RetryErrors raised by api_core.retry into - DeadlineExceeded exceptions, indicating that the underlying retries have - exhaused the timeout value. - Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__, - detailing the failed exceptions associated with each retry. - - Supports both sync and async function wrapping. 
+    Build retry error based on exceptions encountered during operation

     Args:
-      - func: The function to decorate
-      - timeout_value: The timeout value to display in the DeadlineExceeded error message
-      - retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__
+      - exc_list: list of exceptions encountered during operation
+      - reason: the reason for the operation failure (timeout or non-retryable error)
+      - timeout_val: the operation timeout value in seconds, for constructing
+          the error message
+    Returns:
+      - tuple of the exception to raise, and a cause exception if applicable
     """
-    timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else ""
-    error_str = f"operation_timeout{timeout_str} exceeded"
-
-    def handle_error():
-        new_exc = core_exceptions.DeadlineExceeded(
-            error_str,
+    if reason == RetryFailureReason.TIMEOUT:
+        timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
+        # if failed due to timeout, raise deadline exceeded as primary exception
+        source_exc: Exception = core_exceptions.DeadlineExceeded(
+            f"operation_timeout {timeout_val_str}exceeded"
         )
-        source_exc = None
-        if retry_errors:
-            source_exc = RetryExceptionGroup(retry_errors)
-        new_exc.__cause__ = source_exc
-        raise new_exc from source_exc
-
-    # separate wrappers for async and sync functions
-    async def wrapper_async(*args, **kwargs):
-        try:
-            return await func(*args, **kwargs)
-        except core_exceptions.RetryError:
-            handle_error()
-
-    def wrapper(*args, **kwargs):
-        try:
-            return func(*args, **kwargs)
-        except core_exceptions.RetryError:
-            handle_error()
-
-    return wrapper_async if is_async else wrapper
+    elif exc_list:
+        # otherwise, raise non-retryable error as primary exception
+        source_exc = exc_list.pop()
+    else:
+        source_exc = RuntimeError("failed with unspecified exception")
+    # use the retry exception group as the cause of the exception
+    cause_exc: Exception | None = RetryExceptionGroup(exc_list) if exc_list else None
+    source_exc.__cause__ = cause_exc
+    return source_exc, cause_exc


 def _get_timeouts(
diff --git a/google/cloud/bigtable/data/exceptions.py b/google/cloud/bigtable/data/exceptions.py
index 7344874df..3c73ec4e9 100644
--- a/google/cloud/bigtable/data/exceptions.py
+++ b/google/cloud/bigtable/data/exceptions.py
@@ -28,15 +28,6 @@
 from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery


-class IdleTimeout(core_exceptions.DeadlineExceeded):
-    """
-    Exception raised by ReadRowsIterator when the generator
-    has been idle for longer than the internal idle_timeout.
- """ - - pass - - class InvalidChunk(core_exceptions.GoogleAPICallError): """Exception raised to invalid chunk data from back-end.""" diff --git a/mypy.ini b/mypy.ini index f12ed46fc..3a17a37c6 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,5 +1,5 @@ [mypy] -python_version = 3.6 +python_version = 3.7 namespace_packages = True exclude = tests/unit/gapic/ diff --git a/noxfile.py b/noxfile.py index e1d2f4acc..2e053ffcf 100644 --- a/noxfile.py +++ b/noxfile.py @@ -39,9 +39,7 @@ "pytest-cov", "pytest-asyncio", ] -UNIT_TEST_EXTERNAL_DEPENDENCIES = [ - # "git+https://github.com/googleapis/python-api-core.git@retry_generators" -] +UNIT_TEST_EXTERNAL_DEPENDENCIES = [] UNIT_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] UNIT_TEST_EXTRAS = [] @@ -54,9 +52,7 @@ "pytest-asyncio", "google-cloud-testutils", ] -SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [ - # "git+https://github.com/googleapis/python-api-core.git@retry_generators" -] +SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [] SYSTEM_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] SYSTEM_TEST_DEPENDENCIES = [] @@ -138,7 +134,8 @@ def mypy(session): session.install("google-cloud-testutils") session.run( "mypy", - "google/cloud/bigtable/data", + "-p", + "google.cloud.bigtable.data", "--check-untyped-defs", "--warn-unreachable", "--disallow-any-generics", @@ -460,7 +457,7 @@ def prerelease_deps(session): # Exclude version 1.52.0rc1 which has a known issue. See https://github.com/grpc/grpc/issues/32163 "grpcio!=1.52.0rc1", "grpcio-status", - "google-api-core==2.12.0.dev1", # TODO: remove this once streaming retries is merged + "google-api-core==2.16.0rc0", # TODO: remove pin once streaming retries is merged "proto-plus", "google-cloud-testutils", # dependencies of google-cloud-testutils" diff --git a/owlbot.py b/owlbot.py index b542b3246..9ca859fb9 100644 --- a/owlbot.py +++ b/owlbot.py @@ -170,7 +170,8 @@ def mypy(session): session.install("google-cloud-testutils") session.run( "mypy", - "google/cloud/bigtable", + "-p", + "google.cloud.bigtable", "--check-untyped-defs", "--warn-unreachable", "--disallow-any-generics", diff --git a/python-api-core b/python-api-core index a8cfa66b8..17ff5f1d8 160000 --- a/python-api-core +++ b/python-api-core @@ -1 +1 @@ -Subproject commit a8cfa66b8d6001da56823c6488b5da4957e5702b +Subproject commit 17ff5f1d83a9a6f50a0226fb0e794634bd584f17 diff --git a/setup.py b/setup.py index e5efc9937..0bce3a5d6 100644 --- a/setup.py +++ b/setup.py @@ -37,8 +37,8 @@ # 'Development Status :: 5 - Production/Stable' release_status = "Development Status :: 5 - Production/Stable" dependencies = [ - "google-api-core[grpc] == 2.12.0.dev1", # TODO: change to >= after streaming retries is merged - "google-cloud-core >= 1.4.1, <3.0.0dev", + "google-api-core[grpc] >= 2.16.0rc0", + "google-cloud-core >= 1.4.4, <3.0.0dev", "grpc-google-iam-v1 >= 0.12.4, <1.0.0dev", "proto-plus >= 1.22.0, <2.0.0dev", "proto-plus >= 1.22.2, <2.0.0dev; python_version>='3.11'", @@ -59,15 +59,10 @@ # benchmarks, etc. packages = [ package - for package in setuptools.PEP420PackageFinder.find() + for package in setuptools.find_namespace_packages() if package.startswith("google") ] -# Determine which namespaces are needed. 
-namespaces = ["google"] -if "google.cloud" in packages: - namespaces.append("google.cloud") - setuptools.setup( name=name, @@ -93,7 +88,6 @@ ], platforms="Posix; MacOS X; Windows", packages=packages, - namespace_packages=namespaces, install_requires=dependencies, extras_require=extras, scripts=[ diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 9f23121d1..83bfe4577 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,7 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -google-api-core==2.12.0.dev1 +google-api-core==2.16.0rc0 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index 7045a2894..505ba9934 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -5,9 +5,10 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -google-api-core==2.12.0.dev1 +google-api-core==2.16.0rc0 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 libcst==0.2.5 protobuf==3.19.5 +pytest-asyncio==0.21.1 diff --git a/tests/system/data/test_system.py b/tests/system/data/test_system.py index 6bd21f386..fb0d9eb82 100644 --- a/tests/system/data/test_system.py +++ b/tests/system/data/test_system.py @@ -92,14 +92,15 @@ async def add_row( self.rows.append(row_key) async def delete_rows(self): - request = { - "table_name": self.table.table_name, - "entries": [ - {"row_key": row, "mutations": [{"delete_from_row": {}}]} - for row in self.rows - ], - } - await self.table.client._gapic_client.mutate_rows(request) + if self.rows: + request = { + "table_name": self.table.table_name, + "entries": [ + {"row_key": row, "mutations": [{"delete_from_row": {}}]} + for row in self.rows + ], + } + await self.table.client._gapic_client.mutate_rows(request) @pytest.mark.usefixtures("table") @@ -147,7 +148,7 @@ async def temp_rows(table): @pytest.mark.usefixtures("table") @pytest.mark.usefixtures("client") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=10) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=10) @pytest.mark.asyncio async def test_ping_and_warm_gapic(client, table): """ @@ -160,7 +161,7 @@ async def test_ping_and_warm_gapic(client, table): @pytest.mark.usefixtures("table") @pytest.mark.usefixtures("client") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_ping_and_warm(client, table): """ @@ -176,9 +177,9 @@ async def test_ping_and_warm(client, table): assert results[0] is None -@pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio +@pytest.mark.usefixtures("table") +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) async def test_mutation_set_cell(table, temp_rows): """ Ensure cells can be set properly @@ -196,7 +197,7 @@ async def test_mutation_set_cell(table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_sample_row_keys(client, table, temp_rows, 
column_split_config): """ @@ -239,7 +240,7 @@ async def test_bulk_mutations_set_cell(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_context_manager(client, table, temp_rows): """ @@ -267,7 +268,7 @@ async def test_mutations_batcher_context_manager(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_timer_flush(client, table, temp_rows): """ @@ -293,7 +294,7 @@ async def test_mutations_batcher_timer_flush(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_count_flush(client, table, temp_rows): """ @@ -329,7 +330,7 @@ async def test_mutations_batcher_count_flush(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_bytes_flush(client, table, temp_rows): """ @@ -366,7 +367,6 @@ async def test_mutations_batcher_bytes_flush(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_no_flush(client, table, temp_rows): """ @@ -570,7 +570,7 @@ async def test_check_and_mutate( @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_stream(table, temp_rows): """ @@ -590,7 +590,7 @@ async def test_read_rows_stream(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows(table, temp_rows): """ @@ -606,7 +606,7 @@ async def test_read_rows(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_sharded_simple(table, temp_rows): """ @@ -629,7 +629,7 @@ async def test_read_rows_sharded_simple(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_sharded_from_sample(table, temp_rows): """ @@ -654,7 +654,7 @@ async def 
test_read_rows_sharded_from_sample(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_sharded_filters_limits(table, temp_rows): """ @@ -683,7 +683,7 @@ async def test_read_rows_sharded_filters_limits(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_range_query(table, temp_rows): """ @@ -705,7 +705,7 @@ async def test_read_rows_range_query(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_single_key_query(table, temp_rows): """ @@ -726,7 +726,7 @@ async def test_read_rows_single_key_query(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_with_filter(table, temp_rows): """ @@ -842,7 +842,7 @@ async def test_row_exists(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.parametrize( "cell_value,filter_input,expect_match", [ diff --git a/tests/unit/data/_async/test__mutate_rows.py b/tests/unit/data/_async/test__mutate_rows.py index d41929518..e03028c45 100644 --- a/tests/unit/data/_async/test__mutate_rows.py +++ b/tests/unit/data/_async/test__mutate_rows.py @@ -164,11 +164,13 @@ async def test_mutate_rows_operation(self): table = mock.Mock() entries = [_make_mutation(), _make_mutation()] operation_timeout = 0.05 - instance = self._make_one( - client, table, entries, operation_timeout, operation_timeout - ) - with mock.patch.object(instance, "_operation", AsyncMock()) as attempt_mock: - attempt_mock.return_value = None + cls = self._target_class() + with mock.patch( + f"{cls.__module__}.{cls.__name__}._run_attempt", AsyncMock() + ) as attempt_mock: + instance = self._make_one( + client, table, entries, operation_timeout, operation_timeout + ) await instance.start() assert attempt_mock.call_count == 1 diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 54bbb6158..60a305bcb 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1142,44 +1142,44 @@ def test_table_ctor_sync(self): ( "read_rows_stream", (ReadRowsQuery(),), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "read_rows", (ReadRowsQuery(),), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "read_row", (b"row_key",), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "read_rows_sharded", ([ReadRowsQuery()],), - 
"google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "row_exists", (b"row_key",), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), - ("sample_row_keys", (), "google.api_core.retry_async.retry_target", ()), + ("sample_row_keys", (), "google.api_core.retry.retry_target_async", ()), ( "mutate_row", - (b"row_key", []), - "google.api_core.retry_async.retry_target", + (b"row_key", [mock.Mock()]), + "google.api_core.retry.retry_target_async", (), ), ( "bulk_mutate_rows", ([mutations.RowMutationEntry(b"key", [mock.Mock()])],), - "google.api_core.retry_async.retry_target", + "google.api_core.retry.retry_target_async", (_MutateRowsIncomplete,), ), ], @@ -1223,15 +1223,15 @@ async def test_customizable_retryable_errors( """ from google.cloud.bigtable.data import BigtableDataClientAsync - with mock.patch( - "google.api_core.retry_async.if_exception_type" - ) as predicate_builder_mock: - with mock.patch(retry_fn_path) as retry_fn_mock: - async with BigtableDataClientAsync() as client: - table = client.get_table("instance-id", "table-id") - expected_predicate = lambda a: a in expected_retryables # noqa + with mock.patch(retry_fn_path) as retry_fn_mock: + async with BigtableDataClientAsync() as client: + table = client.get_table("instance-id", "table-id") + expected_predicate = lambda a: a in expected_retryables # noqa + retry_fn_mock.side_effect = RuntimeError("stop early") + with mock.patch( + "google.api_core.retry.if_exception_type" + ) as predicate_builder_mock: predicate_builder_mock.return_value = expected_predicate - retry_fn_mock.side_effect = RuntimeError("stop early") with pytest.raises(Exception): # we expect an exception from attempting to call the mock test_fn = table.__getattribute__(fn_name) @@ -1253,10 +1253,10 @@ async def test_customizable_retryable_errors( ("read_rows_sharded", ([ReadRowsQuery()],), "read_rows"), ("row_exists", (b"row_key",), "read_rows"), ("sample_row_keys", (), "sample_row_keys"), - ("mutate_row", (b"row_key", []), "mutate_row"), + ("mutate_row", (b"row_key", [mock.Mock()]), "mutate_row"), ( "bulk_mutate_rows", - ([mutations.RowMutationEntry(b"key", [mock.Mock()])],), + ([mutations.RowMutationEntry(b"key", [mutations.DeleteAllFromRow()])],), "mutate_rows", ), ("check_and_mutate_row", (b"row_key", None), "check_and_mutate_row"), @@ -2204,7 +2204,7 @@ async def test_mutate_row_retryable_errors(self, retryable_exception): mutation = mutations.DeleteAllFromRow() assert mutation.is_idempotent() is True await table.mutate_row( - "row_key", mutation, operation_timeout=0.05 + "row_key", mutation, operation_timeout=0.01 ) cause = e.value.__cause__ assert isinstance(cause, RetryExceptionGroup) diff --git a/tests/unit/data/_async/test_mutations_batcher.py b/tests/unit/data/_async/test_mutations_batcher.py index 17bd8d420..446cd822e 100644 --- a/tests/unit/data/_async/test_mutations_batcher.py +++ b/tests/unit/data/_async/test_mutations_batcher.py @@ -1158,10 +1158,10 @@ async def test_customizable_retryable_errors( from google.cloud.bigtable.data._async.client import TableAsync with mock.patch( - "google.api_core.retry_async.if_exception_type" + "google.api_core.retry.if_exception_type" ) as predicate_builder_mock: with mock.patch( - "google.api_core.retry_async.retry_target" + "google.api_core.retry.retry_target_async" ) as retry_fn_mock: table = None with mock.patch("asyncio.create_task"): diff --git 
a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index b9c1dc2bb..5a9c500ed 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -17,7 +17,6 @@ from google.api_core import exceptions as core_exceptions import google.cloud.bigtable.data._helpers as _helpers from google.cloud.bigtable.data._helpers import TABLE_DEFAULT -import google.cloud.bigtable.data.exceptions as bigtable_exceptions import mock @@ -100,71 +99,6 @@ def test_attempt_timeout_w_sleeps(self): expected_value -= sleep_time -class TestConvertRetryDeadline: - """ - Test _convert_retry_deadline wrapper - """ - - @pytest.mark.asyncio - @pytest.mark.parametrize("is_async", [True, False]) - async def test_no_error(self, is_async): - def test_func(): - return 1 - - async def test_async(): - return test_func() - - func = test_async if is_async else test_func - wrapped = _helpers._convert_retry_deadline(func, 0.1, is_async) - result = await wrapped() if is_async else wrapped() - assert result == 1 - - @pytest.mark.asyncio - @pytest.mark.parametrize("timeout", [0.1, 2.0, 30.0]) - @pytest.mark.parametrize("is_async", [True, False]) - async def test_retry_error(self, timeout, is_async): - from google.api_core.exceptions import RetryError, DeadlineExceeded - - def test_func(): - raise RetryError("retry error", None) - - async def test_async(): - return test_func() - - func = test_async if is_async else test_func - wrapped = _helpers._convert_retry_deadline(func, timeout, is_async=is_async) - with pytest.raises(DeadlineExceeded) as e: - await wrapped() if is_async else wrapped() - assert e.value.__cause__ is None - assert f"operation_timeout of {timeout}s exceeded" in str(e.value) - - @pytest.mark.asyncio - @pytest.mark.parametrize("is_async", [True, False]) - async def test_with_retry_errors(self, is_async): - from google.api_core.exceptions import RetryError, DeadlineExceeded - - timeout = 10.0 - - def test_func(): - raise RetryError("retry error", None) - - async def test_async(): - return test_func() - - func = test_async if is_async else test_func - - associated_errors = [RuntimeError("error1"), ZeroDivisionError("other")] - wrapped = _helpers._convert_retry_deadline( - func, timeout, associated_errors, is_async - ) - with pytest.raises(DeadlineExceeded) as e: - await wrapped() - cause = e.value.__cause__ - assert isinstance(cause, bigtable_exceptions.RetryExceptionGroup) - assert cause.exceptions == tuple(associated_errors) - assert f"operation_timeout of {timeout}s exceeded" in str(e.value) - - class TestValidateTimeouts: def test_validate_timeouts_error_messages(self): with pytest.raises(ValueError) as e: