From b1914517bdc2b42d4c4d8071359d9ed70ac79785 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 1 Dec 2023 13:09:59 -0800 Subject: [PATCH 01/15] chore: optimize gapic calls (#863) --- .../services/bigtable/async_client.py | 98 ++++++++----------- .../bigtable/transports/grpc_asyncio.py | 62 ++++++++++++ tests/unit/data/_async/test_client.py | 2 +- 3 files changed, 105 insertions(+), 57 deletions(-) diff --git a/google/cloud/bigtable_v2/services/bigtable/async_client.py b/google/cloud/bigtable_v2/services/bigtable/async_client.py index a80be70af..d325564c0 100644 --- a/google/cloud/bigtable_v2/services/bigtable/async_client.py +++ b/google/cloud/bigtable_v2/services/bigtable/async_client.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import functools from collections import OrderedDict import functools import re @@ -272,7 +273,8 @@ def read_rows( "the individual field arguments should be set." ) - request = bigtable.ReadRowsRequest(request) + if not isinstance(request, bigtable.ReadRowsRequest): + request = bigtable.ReadRowsRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -283,12 +285,9 @@ def read_rows( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.read_rows, - default_timeout=43200.0, - client_info=DEFAULT_CLIENT_INFO, - ) - + rpc = self._client._transport._wrapped_methods[ + self._client._transport.read_rows + ] # Certain fields should be provided within the metadata header; # add these here. metadata = tuple(metadata) + ( @@ -367,7 +366,8 @@ def sample_row_keys( "the individual field arguments should be set." ) - request = bigtable.SampleRowKeysRequest(request) + if not isinstance(request, bigtable.SampleRowKeysRequest): + request = bigtable.SampleRowKeysRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -378,12 +378,9 @@ def sample_row_keys( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.sample_row_keys, - default_timeout=60.0, - client_info=DEFAULT_CLIENT_INFO, - ) - + rpc = self._client._transport._wrapped_methods[ + self._client._transport.sample_row_keys + ] # Certain fields should be provided within the metadata header; # add these here. metadata = tuple(metadata) + ( @@ -479,7 +476,8 @@ async def mutate_row( "the individual field arguments should be set." ) - request = bigtable.MutateRowRequest(request) + if not isinstance(request, bigtable.MutateRowRequest): + request = bigtable.MutateRowRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -494,21 +492,9 @@ async def mutate_row( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.mutate_row, - default_retry=retries.Retry( - initial=0.01, - maximum=60.0, - multiplier=2, - predicate=retries.if_exception_type( - core_exceptions.DeadlineExceeded, - core_exceptions.ServiceUnavailable, - ), - deadline=60.0, - ), - default_timeout=60.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.mutate_row + ] # Certain fields should be provided within the metadata header; # add these here. 
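The hunks above all apply one pattern: the per-call `gapic_v1.method_async.wrap_method(...)` construction is replaced by a lookup into a `_wrapped_methods` cache that the transport builds once (in `_prep_wrapped_messages`, added later in this patch), and new `isinstance` guards skip an unnecessary proto copy when the caller already supplies the right request type. A minimal, self-contained sketch of the caching idea, using a hypothetical `wrap_method` and `FakeTransport` as stand-ins for the real gapic machinery:

import functools


def wrap_method(fn, default_timeout=None):
    # Stand-in for gapic_v1.method_async.wrap_method: the real helper
    # layers retry, timeout, and metadata handling around an RPC, so
    # rebuilding it on every request adds avoidable per-call overhead.
    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        kwargs.setdefault("timeout", default_timeout)
        return fn(*args, **kwargs)

    return wrapped


class FakeTransport:
    def __init__(self):
        # Wrap each RPC exactly once, keyed by the bound method: the
        # same shape as the transport-level cache in this patch.
        self._wrapped_methods = {
            self.read_rows: wrap_method(self.read_rows, default_timeout=43200.0),
            self.mutate_row: wrap_method(self.mutate_row, default_timeout=60.0),
        }

    def read_rows(self, request, timeout=None):
        return [f"row from {request['table_name']} (timeout={timeout})"]

    def mutate_row(self, request, timeout=None):
        return "ok"


transport = FakeTransport()
# Hot path: a dict lookup replaces re-wrapping the method on every call.
rpc = transport._wrapped_methods[transport.read_rows]
print(rpc({"table_name": "projects/p/instances/i/tables/t"}))

Keying the cache by the bound method works because Python bound methods hash and compare by their underlying function and instance, so a fresh `transport.read_rows` reference retrieves the same cached entry on every call.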
@@ -601,7 +587,8 @@ def mutate_rows( "the individual field arguments should be set." ) - request = bigtable.MutateRowsRequest(request) + if not isinstance(request, bigtable.MutateRowsRequest): + request = bigtable.MutateRowsRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -614,11 +601,9 @@ def mutate_rows( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.mutate_rows, - default_timeout=600.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.mutate_rows + ] # Certain fields should be provided within the metadata header; # add these here. @@ -749,7 +734,8 @@ async def check_and_mutate_row( "the individual field arguments should be set." ) - request = bigtable.CheckAndMutateRowRequest(request) + if not isinstance(request, bigtable.CheckAndMutateRowRequest): + request = bigtable.CheckAndMutateRowRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -768,11 +754,9 @@ async def check_and_mutate_row( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.check_and_mutate_row, - default_timeout=20.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.check_and_mutate_row + ] # Certain fields should be provided within the metadata header; # add these here. @@ -851,7 +835,8 @@ async def ping_and_warm( "the individual field arguments should be set." ) - request = bigtable.PingAndWarmRequest(request) + if not isinstance(request, bigtable.PingAndWarmRequest): + request = bigtable.PingAndWarmRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -862,11 +847,9 @@ async def ping_and_warm( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.ping_and_warm, - default_timeout=None, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.ping_and_warm + ] # Certain fields should be provided within the metadata header; # add these here. @@ -968,7 +951,8 @@ async def read_modify_write_row( "the individual field arguments should be set." ) - request = bigtable.ReadModifyWriteRowRequest(request) + if not isinstance(request, bigtable.ReadModifyWriteRowRequest): + request = bigtable.ReadModifyWriteRowRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -983,11 +967,9 @@ async def read_modify_write_row( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.read_modify_write_row, - default_timeout=20.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.read_modify_write_row + ] # Certain fields should be provided within the metadata header; # add these here. @@ -1076,7 +1058,10 @@ def generate_initial_change_stream_partitions( "the individual field arguments should be set." 
) - request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request) + if not isinstance( + request, bigtable.GenerateInitialChangeStreamPartitionsRequest + ): + request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -1174,7 +1159,8 @@ def read_change_stream( "the individual field arguments should be set." ) - request = bigtable.ReadChangeStreamRequest(request) + if not isinstance(request, bigtable.ReadChangeStreamRequest): + request = bigtable.ReadChangeStreamRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. diff --git a/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py b/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py index 8bf02ce77..3450d4969 100644 --- a/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py +++ b/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py @@ -18,6 +18,8 @@ from google.api_core import gapic_v1 from google.api_core import grpc_helpers_async +from google.api_core import exceptions as core_exceptions +from google.api_core import retry as retries from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore @@ -512,6 +514,66 @@ def read_change_stream( ) return self._stubs["read_change_stream"] + def _prep_wrapped_messages(self, client_info): + # Precompute the wrapped methods. + self._wrapped_methods = { + self.read_rows: gapic_v1.method_async.wrap_method( + self.read_rows, + default_timeout=43200.0, + client_info=client_info, + ), + self.sample_row_keys: gapic_v1.method_async.wrap_method( + self.sample_row_keys, + default_timeout=60.0, + client_info=client_info, + ), + self.mutate_row: gapic_v1.method_async.wrap_method( + self.mutate_row, + default_retry=retries.Retry( + initial=0.01, + maximum=60.0, + multiplier=2, + predicate=retries.if_exception_type( + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ), + deadline=60.0, + ), + default_timeout=60.0, + client_info=client_info, + ), + self.mutate_rows: gapic_v1.method_async.wrap_method( + self.mutate_rows, + default_timeout=600.0, + client_info=client_info, + ), + self.check_and_mutate_row: gapic_v1.method_async.wrap_method( + self.check_and_mutate_row, + default_timeout=20.0, + client_info=client_info, + ), + self.ping_and_warm: gapic_v1.method_async.wrap_method( + self.ping_and_warm, + default_timeout=None, + client_info=client_info, + ), + self.read_modify_write_row: gapic_v1.method_async.wrap_method( + self.read_modify_write_row, + default_timeout=20.0, + client_info=client_info, + ), + self.generate_initial_change_stream_partitions: gapic_v1.method_async.wrap_method( + self.generate_initial_change_stream_partitions, + default_timeout=60.0, + client_info=client_info, + ), + self.read_change_stream: gapic_v1.method_async.wrap_method( + self.read_change_stream, + default_timeout=43200.0, + client_info=client_info, + ), + } + def close(self): return self.grpc_channel.close() diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 7afecc5b0..7718246fc 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -141,7 +141,7 @@ async def test_ctor_dict_options(self): async def test_veneer_grpc_headers(self): # client_info should be populated with headers to # detect as a veneer client - patch = 
mock.patch("google.api_core.gapic_v1.method.wrap_method") + patch = mock.patch("google.api_core.gapic_v1.method_async.wrap_method") with patch as gapic_mock: client = self._make_one(project="project-id") wrapped_call_list = gapic_mock.call_args_list From 285cdd3bf74a871e5fa8134029e3ac060d2622f5 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 1 Dec 2023 14:04:31 -0800 Subject: [PATCH 02/15] feat: expose retryable error codes to users (#879) --- .../bigtable/data/_async/_mutate_rows.py | 6 +- .../cloud/bigtable/data/_async/_read_rows.py | 15 +- google/cloud/bigtable/data/_async/client.py | 140 +++++-- .../bigtable/data/_async/mutations_batcher.py | 12 +- google/cloud/bigtable/data/_helpers.py | 31 +- tests/unit/data/_async/test__mutate_rows.py | 26 +- tests/unit/data/_async/test_client.py | 358 ++++++++++-------- .../data/_async/test_mutations_batcher.py | 82 +++- tests/unit/data/test__helpers.py | 48 +++ 9 files changed, 514 insertions(+), 204 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 5bf759151..5971a9894 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -14,7 +14,7 @@ # from __future__ import annotations -from typing import TYPE_CHECKING +from typing import Sequence, TYPE_CHECKING import asyncio from dataclasses import dataclass import functools @@ -66,6 +66,7 @@ def __init__( mutation_entries: list["RowMutationEntry"], operation_timeout: float, attempt_timeout: float | None, + retryable_exceptions: Sequence[type[Exception]] = (), ): """ Args: @@ -96,8 +97,7 @@ def __init__( # create predicate for determining which errors are retryable self.is_retryable = retries.if_exception_type( # RPC level errors - core_exceptions.DeadlineExceeded, - core_exceptions.ServiceUnavailable, + *retryable_exceptions, # Entry level errors bt_exceptions._MutateRowsIncomplete, ) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 90cc7e87c..ad1f7b84d 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -15,7 +15,13 @@ from __future__ import annotations -from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable +from typing import ( + TYPE_CHECKING, + AsyncGenerator, + AsyncIterable, + Awaitable, + Sequence, +) from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB @@ -74,6 +80,7 @@ def __init__( table: "TableAsync", operation_timeout: float, attempt_timeout: float, + retryable_exceptions: Sequence[type[Exception]] = (), ): self.attempt_timeout_gen = _attempt_timeout_generator( attempt_timeout, operation_timeout @@ -88,11 +95,7 @@ def __init__( else: self.request = query._to_pb(table) self.table = table - self._predicate = retries.if_exception_type( - core_exceptions.DeadlineExceeded, - core_exceptions.ServiceUnavailable, - core_exceptions.Aborted, - ) + self._predicate = retries.if_exception_type(*retryable_exceptions) self._metadata = _make_metadata( table.table_name, table.app_profile_id, diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index ab8cc48f8..a79ead7f8 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -21,6 +21,7 @@ AsyncIterable, Optional, Set, + Sequence, 
TYPE_CHECKING, ) @@ -45,7 +46,9 @@ from google.api_core.exceptions import GoogleAPICallError from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore from google.api_core import retry_async as retries -from google.api_core import exceptions as core_exceptions +from google.api_core.exceptions import DeadlineExceeded +from google.api_core.exceptions import ServiceUnavailable +from google.api_core.exceptions import Aborted from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync import google.auth.credentials @@ -64,6 +67,7 @@ from google.cloud.bigtable.data._helpers import _make_metadata from google.cloud.bigtable.data._helpers import _convert_retry_deadline from google.cloud.bigtable.data._helpers import _validate_timeouts +from google.cloud.bigtable.data._helpers import _get_retryable_errors from google.cloud.bigtable.data._helpers import _get_timeouts from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync @@ -366,21 +370,10 @@ async def _remove_instance_registration( except KeyError: return False - def get_table( - self, - instance_id: str, - table_id: str, - app_profile_id: str | None = None, - *, - default_read_rows_operation_timeout: float = 600, - default_read_rows_attempt_timeout: float | None = None, - default_mutate_rows_operation_timeout: float = 600, - default_mutate_rows_attempt_timeout: float | None = None, - default_operation_timeout: float = 60, - default_attempt_timeout: float | None = None, - ) -> TableAsync: + def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAsync: """ - Returns a table instance for making data API requests + Returns a table instance for making data API requests. All arguments are passed + directly to the TableAsync constructor. Args: instance_id: The Bigtable instance ID to associate with this client. @@ -402,15 +395,17 @@ def get_table( seconds. If not set, defaults to 60 seconds default_attempt_timeout: The default timeout for all other individual rpc requests, in seconds. If not set, defaults to 20 seconds + default_read_rows_retryable_errors: a list of errors that will be retried + if encountered during read_rows and related operations. + Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted) + default_mutate_rows_retryable_errors: a list of errors that will be retried + if encountered during mutate_rows and related operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + default_retryable_errors: a list of errors that will be retried if + encountered during all other operations. 
+ Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) """ - return TableAsync( - self, - instance_id, - table_id, - app_profile_id, - default_operation_timeout=default_operation_timeout, - default_attempt_timeout=default_attempt_timeout, - ) + return TableAsync(self, instance_id, table_id, *args, **kwargs) async def __aenter__(self): self._start_background_channel_refresh() @@ -442,6 +437,19 @@ def __init__( default_mutate_rows_attempt_timeout: float | None = 60, default_operation_timeout: float = 60, default_attempt_timeout: float | None = 20, + default_read_rows_retryable_errors: Sequence[type[Exception]] = ( + DeadlineExceeded, + ServiceUnavailable, + Aborted, + ), + default_mutate_rows_retryable_errors: Sequence[type[Exception]] = ( + DeadlineExceeded, + ServiceUnavailable, + ), + default_retryable_errors: Sequence[type[Exception]] = ( + DeadlineExceeded, + ServiceUnavailable, + ), ): """ Initialize a Table instance @@ -468,9 +476,20 @@ def __init__( seconds. If not set, defaults to 60 seconds default_attempt_timeout: The default timeout for all other individual rpc requests, in seconds. If not set, defaults to 20 seconds + default_read_rows_retryable_errors: a list of errors that will be retried + if encountered during read_rows and related operations. + Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted) + default_mutate_rows_retryable_errors: a list of errors that will be retried + if encountered during mutate_rows and related operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + default_retryable_errors: a list of errors that will be retried if + encountered during all other operations. + Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) Raises: - RuntimeError if called outside of an async context (no running event loop) """ + # NOTE: any changes to the signature of this method should also be reflected + # in client.get_table() # validate timeouts _validate_timeouts( default_operation_timeout, default_attempt_timeout, allow_none=True @@ -506,6 +525,14 @@ def __init__( ) self.default_mutate_rows_attempt_timeout = default_mutate_rows_attempt_timeout + self.default_read_rows_retryable_errors = ( + default_read_rows_retryable_errors or () + ) + self.default_mutate_rows_retryable_errors = ( + default_mutate_rows_retryable_errors or () + ) + self.default_retryable_errors = default_retryable_errors or () + # raises RuntimeError if called outside of an async context (no running event loop) try: self._register_instance_task = asyncio.create_task( @@ -522,12 +549,15 @@ async def read_rows_stream( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> AsyncIterable[Row]: """ Read a set of rows from the table, based on the specified query. Returns an iterator to asynchronously stream back row data. - Failed requests within operation_timeout will be retried. + Failed requests within operation_timeout will be retried based on the + retryable_errors list until operation_timeout is reached. Args: - query: contains details about which rows to return @@ -539,6 +569,8 @@ async def read_rows_stream( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. 
+ Defaults to the Table's default_read_rows_retryable_errors Returns: - an asynchronous iterator that yields rows returned by the query Raises: @@ -551,12 +583,14 @@ async def read_rows_stream( operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self ) + retryable_excs = _get_retryable_errors(retryable_errors, self) row_merger = _ReadRowsOperationAsync( query, self, operation_timeout=operation_timeout, attempt_timeout=attempt_timeout, + retryable_exceptions=retryable_excs, ) return row_merger.start_operation() @@ -566,13 +600,16 @@ async def read_rows( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> list[Row]: """ Read a set of rows from the table, based on the specified query. Retruns results as a list of Row objects when the request is complete. For streamed results, use read_rows_stream. - Failed requests within operation_timeout will be retried. + Failed requests within operation_timeout will be retried based on the + retryable_errors list until operation_timeout is reached. Args: - query: contains details about which rows to return @@ -584,6 +621,10 @@ async def read_rows( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. + If None, defaults to the Table's default_read_rows_attempt_timeout, + or the operation_timeout if that is also None. + - retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_read_rows_retryable_errors. Returns: - a list of Rows returned by the query Raises: @@ -596,6 +637,7 @@ async def read_rows( query, operation_timeout=operation_timeout, attempt_timeout=attempt_timeout, + retryable_errors=retryable_errors, ) return [row async for row in row_generator] @@ -606,11 +648,14 @@ async def read_row( row_filter: RowFilter | None = None, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> Row | None: """ Read a single row from the table, based on the specified key. - Failed requests within operation_timeout will be retried. + Failed requests within operation_timeout will be retried based on the + retryable_errors list until operation_timeout is reached. Args: - query: contains details about which rows to return @@ -622,6 +667,8 @@ async def read_row( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_read_rows_retryable_errors. 
Returns: - a Row object if the row exists, otherwise None Raises: @@ -637,6 +684,7 @@ async def read_row( query, operation_timeout=operation_timeout, attempt_timeout=attempt_timeout, + retryable_errors=retryable_errors, ) if len(results) == 0: return None @@ -648,6 +696,8 @@ async def read_rows_sharded( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> list[Row]: """ Runs a sharded query in parallel, then return the results in a single list. @@ -672,6 +722,8 @@ async def read_rows_sharded( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_read_rows_retryable_errors. Raises: - ShardedReadRowsExceptionGroup: if any of the queries failed - ValueError: if the query_list is empty @@ -701,6 +753,7 @@ async def read_rows_sharded( query, operation_timeout=batch_operation_timeout, attempt_timeout=min(attempt_timeout, batch_operation_timeout), + retryable_errors=retryable_errors, ) for query in batch ] @@ -729,10 +782,13 @@ async def row_exists( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> bool: """ Return a boolean indicating whether the specified row exists in the table. uses the filters: chain(limit cells per row = 1, strip value) + Args: - row_key: the key of the row to check - operation_timeout: the time budget for the entire operation, in seconds. @@ -743,6 +799,8 @@ async def row_exists( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_read_rows_retryable_errors. Returns: - a bool indicating whether the row exists Raises: @@ -762,6 +820,7 @@ async def row_exists( query, operation_timeout=operation_timeout, attempt_timeout=attempt_timeout, + retryable_errors=retryable_errors, ) return len(results) > 0 @@ -770,6 +829,8 @@ async def sample_row_keys( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, ) -> RowKeySamples: """ Return a set of RowKeySamples that delimit contiguous sections of the table of @@ -791,6 +852,8 @@ async def sample_row_keys( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_retryable_errors. 
Returns: - a set of RowKeySamples the delimit contiguous sections of the table Raises: @@ -807,10 +870,8 @@ async def sample_row_keys( attempt_timeout, operation_timeout ) # prepare retryable - predicate = retries.if_exception_type( - core_exceptions.DeadlineExceeded, - core_exceptions.ServiceUnavailable, - ) + retryable_excs = _get_retryable_errors(retryable_errors, self) + predicate = retries.if_exception_type(*retryable_excs) transient_errors = [] def on_error_fn(exc): @@ -856,6 +917,8 @@ def mutations_batcher( flow_control_max_bytes: int = 100 * _MB_SIZE, batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, + batch_retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. @@ -876,6 +939,8 @@ def mutations_batcher( - batch_attempt_timeout: timeout for each individual request, in seconds. Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. + - batch_retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_mutate_rows_retryable_errors. Returns: - a MutationsBatcherAsync context manager that can batch requests """ @@ -888,6 +953,7 @@ def mutations_batcher( flow_control_max_bytes=flow_control_max_bytes, batch_operation_timeout=batch_operation_timeout, batch_attempt_timeout=batch_attempt_timeout, + batch_retryable_errors=batch_retryable_errors, ) async def mutate_row( @@ -897,6 +963,8 @@ async def mutate_row( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, ): """ Mutates a row atomically. @@ -918,6 +986,9 @@ async def mutate_row( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. + Only idempotent mutations will be retried. Defaults to the Table's + default_retryable_errors. Raises: - DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing all @@ -937,8 +1008,7 @@ async def mutate_row( if all(mutation.is_idempotent() for mutation in mutations_list): # mutations are all idempotent and safe to retry predicate = retries.if_exception_type( - core_exceptions.DeadlineExceeded, - core_exceptions.ServiceUnavailable, + *_get_retryable_errors(retryable_errors, self) ) else: # mutations should not be retried @@ -982,6 +1052,8 @@ async def bulk_mutate_rows( *, operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, + retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ): """ Applies mutations for multiple rows in a single batched request. @@ -1007,6 +1079,8 @@ async def bulk_mutate_rows( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to operation_timeout. + - retryable_errors: a list of errors that will be retried if encountered. 
+ Defaults to the Table's default_mutate_rows_retryable_errors Raises: - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions @@ -1015,6 +1089,7 @@ async def bulk_mutate_rows( operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self ) + retryable_excs = _get_retryable_errors(retryable_errors, self) operation = _MutateRowsOperationAsync( self.client._gapic_client, @@ -1022,6 +1097,7 @@ async def bulk_mutate_rows( mutation_entries, operation_timeout, attempt_timeout, + retryable_exceptions=retryable_excs, ) await operation.start() diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 91d2b11e1..b2da30040 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -14,7 +14,7 @@ # from __future__ import annotations -from typing import Any, TYPE_CHECKING +from typing import Any, Sequence, TYPE_CHECKING import asyncio import atexit import warnings @@ -23,6 +23,7 @@ from google.cloud.bigtable.data.mutations import RowMutationEntry from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup from google.cloud.bigtable.data.exceptions import FailedMutationEntryError +from google.cloud.bigtable.data._helpers import _get_retryable_errors from google.cloud.bigtable.data._helpers import _get_timeouts from google.cloud.bigtable.data._helpers import TABLE_DEFAULT @@ -192,6 +193,8 @@ def __init__( flow_control_max_bytes: int = 100 * _MB_SIZE, batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, + batch_retryable_errors: Sequence[type[Exception]] + | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ): """ Args: @@ -208,10 +211,16 @@ def __init__( - batch_attempt_timeout: timeout for each individual request, in seconds. If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. + - batch_retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_mutate_rows_retryable_errors. """ self._operation_timeout, self._attempt_timeout = _get_timeouts( batch_operation_timeout, batch_attempt_timeout, table ) + self._retryable_errors: list[type[Exception]] = _get_retryable_errors( + batch_retryable_errors, table + ) + self.closed: bool = False self._table = table self._staged_entries: list[RowMutationEntry] = [] @@ -349,6 +358,7 @@ async def _execute_mutate_rows( batch, operation_timeout=self._operation_timeout, attempt_timeout=self._attempt_timeout, + retryable_exceptions=self._retryable_errors, ) await operation.start() except MutationsExceptionGroup as e: diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 1d56926ff..96ea1d1ce 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -11,9 +11,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # +""" +Helper functions used in various places in the library. 
+""" from __future__ import annotations -from typing import Callable, List, Tuple, Any +from typing import Callable, Sequence, List, Tuple, Any, TYPE_CHECKING import time import enum from collections import namedtuple @@ -22,6 +25,10 @@ from google.api_core import exceptions as core_exceptions from google.cloud.bigtable.data.exceptions import RetryExceptionGroup +if TYPE_CHECKING: + import grpc + from google.cloud.bigtable.data import TableAsync + """ Helper functions used in various places in the library. """ @@ -142,7 +149,9 @@ def wrapper(*args, **kwargs): def _get_timeouts( - operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, table + operation: float | TABLE_DEFAULT, + attempt: float | None | TABLE_DEFAULT, + table: "TableAsync", ) -> tuple[float, float]: """ Convert passed in timeout values to floats, using table defaults if necessary. @@ -209,3 +218,21 @@ def _validate_timeouts( elif attempt_timeout is not None: if attempt_timeout <= 0: raise ValueError("attempt_timeout must be greater than 0") + + +def _get_retryable_errors( + call_codes: Sequence["grpc.StatusCode" | int | type[Exception]] | TABLE_DEFAULT, + table: "TableAsync", +) -> list[type[Exception]]: + # load table defaults if necessary + if call_codes == TABLE_DEFAULT.DEFAULT: + call_codes = table.default_retryable_errors + elif call_codes == TABLE_DEFAULT.READ_ROWS: + call_codes = table.default_read_rows_retryable_errors + elif call_codes == TABLE_DEFAULT.MUTATE_ROWS: + call_codes = table.default_mutate_rows_retryable_errors + + return [ + e if isinstance(e, type) else type(core_exceptions.from_grpc_status(e, "")) + for e in call_codes + ] diff --git a/tests/unit/data/_async/test__mutate_rows.py b/tests/unit/data/_async/test__mutate_rows.py index 89a153af2..d41929518 100644 --- a/tests/unit/data/_async/test__mutate_rows.py +++ b/tests/unit/data/_async/test__mutate_rows.py @@ -46,9 +46,10 @@ def _make_one(self, *args, **kwargs): if not args: kwargs["gapic_client"] = kwargs.pop("gapic_client", mock.Mock()) kwargs["table"] = kwargs.pop("table", AsyncMock()) - kwargs["mutation_entries"] = kwargs.pop("mutation_entries", []) kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5) kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1) + kwargs["retryable_exceptions"] = kwargs.pop("retryable_exceptions", ()) + kwargs["mutation_entries"] = kwargs.pop("mutation_entries", []) return self._target_class()(*args, **kwargs) async def _mock_stream(self, mutation_list, error_dict): @@ -78,15 +79,21 @@ def test_ctor(self): from google.cloud.bigtable.data._async._mutate_rows import _EntryWithProto from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete from google.api_core.exceptions import DeadlineExceeded - from google.api_core.exceptions import ServiceUnavailable + from google.api_core.exceptions import Aborted client = mock.Mock() table = mock.Mock() entries = [_make_mutation(), _make_mutation()] operation_timeout = 0.05 attempt_timeout = 0.01 + retryable_exceptions = () instance = self._make_one( - client, table, entries, operation_timeout, attempt_timeout + client, + table, + entries, + operation_timeout, + attempt_timeout, + retryable_exceptions, ) # running gapic_fn should trigger a client call assert client.mutate_rows.call_count == 0 @@ -110,8 +117,8 @@ def test_ctor(self): assert next(instance.timeout_generator) == attempt_timeout # ensure predicate is set assert instance.is_retryable is not None - assert instance.is_retryable(DeadlineExceeded("")) is True - assert 
instance.is_retryable(ServiceUnavailable("")) is True + assert instance.is_retryable(DeadlineExceeded("")) is False + assert instance.is_retryable(Aborted("")) is False assert instance.is_retryable(_MutateRowsIncomplete("")) is True assert instance.is_retryable(RuntimeError("")) is False assert instance.remaining_indices == list(range(len(entries))) @@ -232,7 +239,7 @@ async def test_mutate_rows_exception(self, exc_type): @pytest.mark.parametrize( "exc_type", - [core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable], + [core_exceptions.DeadlineExceeded, RuntimeError], ) @pytest.mark.asyncio async def test_mutate_rows_exception_retryable_eventually_pass(self, exc_type): @@ -256,7 +263,12 @@ async def test_mutate_rows_exception_retryable_eventually_pass(self, exc_type): ) as attempt_mock: attempt_mock.side_effect = [expected_cause] * num_retries + [None] instance = self._make_one( - client, table, entries, operation_timeout, operation_timeout + client, + table, + entries, + operation_timeout, + operation_timeout, + retryable_exceptions=(exc_type,), ) await instance.start() assert attempt_mock.call_count == num_retries + 1 diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 7718246fc..54bbb6158 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -26,6 +26,8 @@ from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.api_core import exceptions as core_exceptions from google.cloud.bigtable.data.exceptions import InvalidChunk +from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.cloud.bigtable.data import TABLE_DEFAULT from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule @@ -841,6 +843,39 @@ async def test_get_table(self): assert client._instance_owners[instance_key] == {id(table)} await client.close() + @pytest.mark.asyncio + async def test_get_table_arg_passthrough(self): + """ + All arguments passed in get_table should be sent to constructor + """ + async with self._make_one(project="project-id") as client: + with mock.patch( + "google.cloud.bigtable.data._async.client.TableAsync.__init__", + ) as mock_constructor: + mock_constructor.return_value = None + assert not client._active_instances + expected_table_id = "table-id" + expected_instance_id = "instance-id" + expected_app_profile_id = "app-profile-id" + expected_args = (1, "test", {"test": 2}) + expected_kwargs = {"hello": "world", "test": 2} + + client.get_table( + expected_instance_id, + expected_table_id, + expected_app_profile_id, + *expected_args, + **expected_kwargs, + ) + mock_constructor.assert_called_once_with( + client, + expected_instance_id, + expected_table_id, + expected_app_profile_id, + *expected_args, + **expected_kwargs, + ) + @pytest.mark.asyncio async def test_get_table_context_manager(self): from google.cloud.bigtable.data._async.client import TableAsync @@ -1099,6 +1134,173 @@ def test_table_ctor_sync(self): TableAsync(client, "instance-id", "table-id") assert e.match("TableAsync must be created within an async event loop context.") + @pytest.mark.asyncio + # iterate over all retryable rpcs + @pytest.mark.parametrize( + "fn_name,fn_args,retry_fn_path,extra_retryables", + [ + ( + "read_rows_stream", + (ReadRowsQuery(),), + "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + (), + ), + ( + "read_rows", + (ReadRowsQuery(),), + 
"google.cloud.bigtable.data._async._read_rows.retry_target_stream", + (), + ), + ( + "read_row", + (b"row_key",), + "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + (), + ), + ( + "read_rows_sharded", + ([ReadRowsQuery()],), + "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + (), + ), + ( + "row_exists", + (b"row_key",), + "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + (), + ), + ("sample_row_keys", (), "google.api_core.retry_async.retry_target", ()), + ( + "mutate_row", + (b"row_key", []), + "google.api_core.retry_async.retry_target", + (), + ), + ( + "bulk_mutate_rows", + ([mutations.RowMutationEntry(b"key", [mock.Mock()])],), + "google.api_core.retry_async.retry_target", + (_MutateRowsIncomplete,), + ), + ], + ) + # test different inputs for retryable exceptions + @pytest.mark.parametrize( + "input_retryables,expected_retryables", + [ + ( + TABLE_DEFAULT.READ_ROWS, + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + core_exceptions.Aborted, + ], + ), + ( + TABLE_DEFAULT.DEFAULT, + [core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable], + ), + ( + TABLE_DEFAULT.MUTATE_ROWS, + [core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable], + ), + ([], []), + ([4], [core_exceptions.DeadlineExceeded]), + ], + ) + async def test_customizable_retryable_errors( + self, + input_retryables, + expected_retryables, + fn_name, + fn_args, + retry_fn_path, + extra_retryables, + ): + """ + Test that retryable functions support user-configurable arguments, and that the configured retryables are passed + down to the gapic layer. + """ + from google.cloud.bigtable.data import BigtableDataClientAsync + + with mock.patch( + "google.api_core.retry_async.if_exception_type" + ) as predicate_builder_mock: + with mock.patch(retry_fn_path) as retry_fn_mock: + async with BigtableDataClientAsync() as client: + table = client.get_table("instance-id", "table-id") + expected_predicate = lambda a: a in expected_retryables # noqa + predicate_builder_mock.return_value = expected_predicate + retry_fn_mock.side_effect = RuntimeError("stop early") + with pytest.raises(Exception): + # we expect an exception from attempting to call the mock + test_fn = table.__getattribute__(fn_name) + await test_fn(*fn_args, retryable_errors=input_retryables) + # passed in errors should be used to build the predicate + predicate_builder_mock.assert_called_once_with( + *expected_retryables, *extra_retryables + ) + retry_call_args = retry_fn_mock.call_args_list[0].args + # output of if_exception_type should be sent in to retry constructor + assert retry_call_args[1] is expected_predicate + + @pytest.mark.parametrize( + "fn_name,fn_args,gapic_fn", + [ + ("read_rows_stream", (ReadRowsQuery(),), "read_rows"), + ("read_rows", (ReadRowsQuery(),), "read_rows"), + ("read_row", (b"row_key",), "read_rows"), + ("read_rows_sharded", ([ReadRowsQuery()],), "read_rows"), + ("row_exists", (b"row_key",), "read_rows"), + ("sample_row_keys", (), "sample_row_keys"), + ("mutate_row", (b"row_key", []), "mutate_row"), + ( + "bulk_mutate_rows", + ([mutations.RowMutationEntry(b"key", [mock.Mock()])],), + "mutate_rows", + ), + ("check_and_mutate_row", (b"row_key", None), "check_and_mutate_row"), + ( + "read_modify_write_row", + (b"row_key", mock.Mock()), + "read_modify_write_row", + ), + ], + ) + @pytest.mark.parametrize("include_app_profile", [True, False]) + @pytest.mark.asyncio + async def test_call_metadata(self, include_app_profile, fn_name, 
fn_args, gapic_fn): + """check that all requests attach proper metadata headers""" + from google.cloud.bigtable.data import TableAsync + from google.cloud.bigtable.data import BigtableDataClientAsync + + profile = "profile" if include_app_profile else None + with mock.patch( + f"google.cloud.bigtable_v2.BigtableAsyncClient.{gapic_fn}", mock.AsyncMock() + ) as gapic_mock: + gapic_mock.side_effect = RuntimeError("stop early") + async with BigtableDataClientAsync() as client: + table = TableAsync(client, "instance-id", "table-id", profile) + try: + test_fn = table.__getattribute__(fn_name) + maybe_stream = await test_fn(*fn_args) + [i async for i in maybe_stream] + except Exception: + # we expect an exception from attempting to call the mock + pass + kwargs = gapic_mock.call_args_list[0].kwargs + metadata = kwargs["metadata"] + goog_metadata = None + for key, value in metadata: + if key == "x-goog-request-params": + goog_metadata = value + assert goog_metadata is not None, "x-goog-request-params not found" + assert "table_name=" + table.table_name in goog_metadata + if include_app_profile: + assert "app_profile_id=profile" in goog_metadata + else: + assert "app_profile_id=" not in goog_metadata + class TestReadRows: """ @@ -1608,28 +1810,6 @@ async def test_row_exists(self, return_value, expected_result): assert query.limit == 1 assert query.filter._to_dict() == expected_filter - @pytest.mark.parametrize("include_app_profile", [True, False]) - @pytest.mark.asyncio - async def test_read_rows_metadata(self, include_app_profile): - """request should attach metadata headers""" - profile = "profile" if include_app_profile else None - async with self._make_table(app_profile_id=profile) as table: - read_rows = table.client._gapic_client.read_rows - read_rows.return_value = self._make_gapic_stream([]) - await table.read_rows(ReadRowsQuery()) - kwargs = read_rows.call_args_list[0].kwargs - metadata = kwargs["metadata"] - goog_metadata = None - for key, value in metadata: - if key == "x-goog-request-params": - goog_metadata = value - assert goog_metadata is not None, "x-goog-request-params not found" - assert "table_name=" + table.table_name in goog_metadata - if include_app_profile: - assert "app_profile_id=profile" in goog_metadata - else: - assert "app_profile_id=" not in goog_metadata - class TestReadRowsSharded: def _make_client(self, *args, **kwargs): @@ -1735,30 +1915,6 @@ async def mock_call(*args, **kwargs): # if run in sequence, we would expect this to take 1 second assert call_time < 0.2 - @pytest.mark.parametrize("include_app_profile", [True, False]) - @pytest.mark.asyncio - async def test_read_rows_sharded_metadata(self, include_app_profile): - """request should attach metadata headers""" - profile = "profile" if include_app_profile else None - async with self._make_client() as client: - async with client.get_table("i", "t", app_profile_id=profile) as table: - with mock.patch.object( - client._gapic_client, "read_rows", AsyncMock() - ) as read_rows: - await table.read_rows_sharded([ReadRowsQuery()]) - kwargs = read_rows.call_args_list[0].kwargs - metadata = kwargs["metadata"] - goog_metadata = None - for key, value in metadata: - if key == "x-goog-request-params": - goog_metadata = value - assert goog_metadata is not None, "x-goog-request-params not found" - assert "table_name=" + table.table_name in goog_metadata - if include_app_profile: - assert "app_profile_id=profile" in goog_metadata - else: - assert "app_profile_id=" not in goog_metadata - @pytest.mark.asyncio async def 
test_read_rows_sharded_batching(self): """ @@ -1875,7 +2031,10 @@ async def test_sample_row_keys_default_timeout(self): expected_timeout = 99 async with self._make_client() as client: async with client.get_table( - "i", "t", default_operation_timeout=expected_timeout + "i", + "t", + default_operation_timeout=expected_timeout, + default_attempt_timeout=expected_timeout, ) as table: with mock.patch.object( table.client._gapic_client, "sample_row_keys", AsyncMock() @@ -1914,30 +2073,6 @@ async def test_sample_row_keys_gapic_params(self): assert kwargs["metadata"] is not None assert kwargs["retry"] is None - @pytest.mark.parametrize("include_app_profile", [True, False]) - @pytest.mark.asyncio - async def test_sample_row_keys_metadata(self, include_app_profile): - """request should attach metadata headers""" - profile = "profile" if include_app_profile else None - async with self._make_client() as client: - async with client.get_table("i", "t", app_profile_id=profile) as table: - with mock.patch.object( - client._gapic_client, "sample_row_keys", AsyncMock() - ) as read_rows: - await table.sample_row_keys() - kwargs = read_rows.call_args_list[0].kwargs - metadata = kwargs["metadata"] - goog_metadata = None - for key, value in metadata: - if key == "x-goog-request-params": - goog_metadata = value - assert goog_metadata is not None, "x-goog-request-params not found" - assert "table_name=" + table.table_name in goog_metadata - if include_app_profile: - assert "app_profile_id=profile" in goog_metadata - else: - assert "app_profile_id=" not in goog_metadata - @pytest.mark.parametrize( "retryable_exception", [ @@ -2525,39 +2660,6 @@ async def test_bulk_mutate_error_index(self): assert isinstance(cause.exceptions[1], DeadlineExceeded) assert isinstance(cause.exceptions[2], FailedPrecondition) - @pytest.mark.parametrize("include_app_profile", [True, False]) - @pytest.mark.asyncio - async def test_bulk_mutate_row_metadata(self, include_app_profile): - """request should attach metadata headers""" - profile = "profile" if include_app_profile else None - async with self._make_client() as client: - async with client.get_table("i", "t", app_profile_id=profile) as table: - with mock.patch.object( - client._gapic_client, "mutate_rows", AsyncMock() - ) as mutate_rows: - mutate_rows.side_effect = core_exceptions.Aborted("mock") - mutation = mock.Mock() - mutation.size.return_value = 1 - entry = mock.Mock() - entry.mutations = [mutation] - try: - await table.bulk_mutate_rows([entry]) - except Exception: - # exception used to end early - pass - kwargs = mutate_rows.call_args_list[0].kwargs - metadata = kwargs["metadata"] - goog_metadata = None - for key, value in metadata: - if key == "x-goog-request-params": - goog_metadata = value - assert goog_metadata is not None, "x-goog-request-params not found" - assert "table_name=" + table.table_name in goog_metadata - if include_app_profile: - assert "app_profile_id=profile" in goog_metadata - else: - assert "app_profile_id=" not in goog_metadata - class TestCheckAndMutateRow: def _make_client(self, *args, **kwargs): @@ -2727,30 +2829,6 @@ async def test_check_and_mutate_mutations_parsing(self): mutation._to_pb.call_count == 1 for mutation in mutations[:5] ) - @pytest.mark.parametrize("include_app_profile", [True, False]) - @pytest.mark.asyncio - async def test_check_and_mutate_metadata(self, include_app_profile): - """request should attach metadata headers""" - profile = "profile" if include_app_profile else None - async with self._make_client() as client: - async with 
client.get_table("i", "t", app_profile_id=profile) as table: - with mock.patch.object( - client._gapic_client, "check_and_mutate_row", AsyncMock() - ) as mock_gapic: - await table.check_and_mutate_row(b"key", mock.Mock()) - kwargs = mock_gapic.call_args_list[0].kwargs - metadata = kwargs["metadata"] - goog_metadata = None - for key, value in metadata: - if key == "x-goog-request-params": - goog_metadata = value - assert goog_metadata is not None, "x-goog-request-params not found" - assert "table_name=" + table.table_name in goog_metadata - if include_app_profile: - assert "app_profile_id=profile" in goog_metadata - else: - assert "app_profile_id=" not in goog_metadata - class TestReadModifyWriteRow: def _make_client(self, *args, **kwargs): @@ -2882,27 +2960,3 @@ async def test_read_modify_write_row_building(self): await table.read_modify_write_row("key", mock.Mock()) assert constructor_mock.call_count == 1 constructor_mock.assert_called_once_with(mock_response.row) - - @pytest.mark.parametrize("include_app_profile", [True, False]) - @pytest.mark.asyncio - async def test_read_modify_write_metadata(self, include_app_profile): - """request should attach metadata headers""" - profile = "profile" if include_app_profile else None - async with self._make_client() as client: - async with client.get_table("i", "t", app_profile_id=profile) as table: - with mock.patch.object( - client._gapic_client, "read_modify_write_row", AsyncMock() - ) as mock_gapic: - await table.read_modify_write_row("key", mock.Mock()) - kwargs = mock_gapic.call_args_list[0].kwargs - metadata = kwargs["metadata"] - goog_metadata = None - for key, value in metadata: - if key == "x-goog-request-params": - goog_metadata = value - assert goog_metadata is not None, "x-goog-request-params not found" - assert "table_name=" + table.table_name in goog_metadata - if include_app_profile: - assert "app_profile_id=profile" in goog_metadata - else: - assert "app_profile_id=" not in goog_metadata diff --git a/tests/unit/data/_async/test_mutations_batcher.py b/tests/unit/data/_async/test_mutations_batcher.py index f95b53271..17bd8d420 100644 --- a/tests/unit/data/_async/test_mutations_batcher.py +++ b/tests/unit/data/_async/test_mutations_batcher.py @@ -14,6 +14,9 @@ import pytest import asyncio +import google.api_core.exceptions as core_exceptions +from google.cloud.bigtable.data.exceptions import _MutateRowsIncomplete +from google.cloud.bigtable.data import TABLE_DEFAULT # try/except added for compatibility with python < 3.8 try: @@ -286,10 +289,17 @@ def _get_target_class(self): return MutationsBatcherAsync def _make_one(self, table=None, **kwargs): + from google.api_core.exceptions import DeadlineExceeded + from google.api_core.exceptions import ServiceUnavailable + if table is None: table = mock.Mock() table.default_mutate_rows_operation_timeout = 10 table.default_mutate_rows_attempt_timeout = 10 + table.default_mutate_rows_retryable_errors = ( + DeadlineExceeded, + ServiceUnavailable, + ) return self._get_target_class()(table, **kwargs) @@ -302,6 +312,7 @@ async def test_ctor_defaults(self, flush_timer_mock): table = mock.Mock() table.default_mutate_rows_operation_timeout = 10 table.default_mutate_rows_attempt_timeout = 8 + table.default_mutate_rows_retryable_errors = [Exception] async with self._make_one(table) as instance: assert instance._table == table assert instance.closed is False @@ -323,6 +334,9 @@ async def test_ctor_defaults(self, flush_timer_mock): assert ( instance._attempt_timeout == 
table.default_mutate_rows_attempt_timeout ) + assert ( + instance._retryable_errors == table.default_mutate_rows_retryable_errors + ) await asyncio.sleep(0) assert flush_timer_mock.call_count == 1 assert flush_timer_mock.call_args[0][0] == 5 @@ -343,6 +357,7 @@ async def test_ctor_explicit(self, flush_timer_mock): flow_control_max_bytes = 12 operation_timeout = 11 attempt_timeout = 2 + retryable_errors = [Exception] async with self._make_one( table, flush_interval=flush_interval, @@ -352,6 +367,7 @@ async def test_ctor_explicit(self, flush_timer_mock): flow_control_max_bytes=flow_control_max_bytes, batch_operation_timeout=operation_timeout, batch_attempt_timeout=attempt_timeout, + batch_retryable_errors=retryable_errors, ) as instance: assert instance._table == table assert instance.closed is False @@ -371,6 +387,7 @@ async def test_ctor_explicit(self, flush_timer_mock): assert instance._entries_processed_since_last_raise == 0 assert instance._operation_timeout == operation_timeout assert instance._attempt_timeout == attempt_timeout + assert instance._retryable_errors == retryable_errors await asyncio.sleep(0) assert flush_timer_mock.call_count == 1 assert flush_timer_mock.call_args[0][0] == flush_interval @@ -386,6 +403,7 @@ async def test_ctor_no_flush_limits(self, flush_timer_mock): table = mock.Mock() table.default_mutate_rows_operation_timeout = 10 table.default_mutate_rows_attempt_timeout = 8 + table.default_mutate_rows_retryable_errors = () flush_interval = None flush_limit_count = None flush_limit_bytes = None @@ -442,7 +460,7 @@ def test_default_argument_consistency(self): batcher_init_signature.pop("table") # both should have same number of arguments assert len(get_batcher_signature.keys()) == len(batcher_init_signature.keys()) - assert len(get_batcher_signature) == 7 # update if expected params change + assert len(get_batcher_signature) == 8 # update if expected params change # both should have same argument names assert set(get_batcher_signature.keys()) == set(batcher_init_signature.keys()) # both should have same default values @@ -882,6 +900,7 @@ async def test__execute_mutate_rows(self, mutate_rows): table.app_profile_id = "test-app-profile" table.default_mutate_rows_operation_timeout = 17 table.default_mutate_rows_attempt_timeout = 13 + table.default_mutate_rows_retryable_errors = () async with self._make_one(table) as instance: batch = [_make_mutation()] result = await instance._execute_mutate_rows(batch) @@ -911,6 +930,7 @@ async def test__execute_mutate_rows_returns_errors(self, mutate_rows): table = mock.Mock() table.default_mutate_rows_operation_timeout = 17 table.default_mutate_rows_attempt_timeout = 13 + table.default_mutate_rows_retryable_errors = () async with self._make_one(table) as instance: batch = [_make_mutation()] result = await instance._execute_mutate_rows(batch) @@ -1102,3 +1122,63 @@ def test__add_exceptions(self, limit, in_e, start_e, end_e): # then, the newest slots should be filled with the last items of the input list for i in range(1, newest_list_diff + 1): assert mock_batcher._newest_exceptions[-i] == input_list[-i] + + @pytest.mark.asyncio + # test different inputs for retryable exceptions + @pytest.mark.parametrize( + "input_retryables,expected_retryables", + [ + ( + TABLE_DEFAULT.READ_ROWS, + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + core_exceptions.Aborted, + ], + ), + ( + TABLE_DEFAULT.DEFAULT, + [core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable], + ), + ( + TABLE_DEFAULT.MUTATE_ROWS, + 
[core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable], + ), + ([], []), + ([4], [core_exceptions.DeadlineExceeded]), + ], + ) + async def test_customizable_retryable_errors( + self, input_retryables, expected_retryables + ): + """ + Test that retryable functions support user-configurable arguments, and that the configured retryables are passed + down to the gapic layer. + """ + from google.cloud.bigtable.data._async.client import TableAsync + + with mock.patch( + "google.api_core.retry_async.if_exception_type" + ) as predicate_builder_mock: + with mock.patch( + "google.api_core.retry_async.retry_target" + ) as retry_fn_mock: + table = None + with mock.patch("asyncio.create_task"): + table = TableAsync(mock.Mock(), "instance", "table") + async with self._make_one( + table, batch_retryable_errors=input_retryables + ) as instance: + assert instance._retryable_errors == expected_retryables + expected_predicate = lambda a: a in expected_retryables # noqa + predicate_builder_mock.return_value = expected_predicate + retry_fn_mock.side_effect = RuntimeError("stop early") + mutation = _make_mutation(count=1, size=1) + await instance._execute_mutate_rows([mutation]) + # passed in errors should be used to build the predicate + predicate_builder_mock.assert_called_once_with( + *expected_retryables, _MutateRowsIncomplete + ) + retry_call_args = retry_fn_mock.call_args_list[0].args + # output of if_exception_type should be sent in to retry constructor + assert retry_call_args[1] is expected_predicate diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 6c11fa86a..b9c1dc2bb 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -13,6 +13,8 @@ # import pytest +import grpc +from google.api_core import exceptions as core_exceptions import google.cloud.bigtable.data._helpers as _helpers from google.cloud.bigtable.data._helpers import TABLE_DEFAULT import google.cloud.bigtable.data.exceptions as bigtable_exceptions @@ -264,3 +266,49 @@ def test_get_timeouts_invalid(self, input_times, input_table): setattr(fake_table, f"default_{key}_timeout", input_table[key]) with pytest.raises(ValueError): _helpers._get_timeouts(input_times[0], input_times[1], fake_table) + + +class TestGetRetryableErrors: + @pytest.mark.parametrize( + "input_codes,input_table,expected", + [ + ((), {}, []), + ((Exception,), {}, [Exception]), + (TABLE_DEFAULT.DEFAULT, {"default": [Exception]}, [Exception]), + ( + TABLE_DEFAULT.READ_ROWS, + {"default_read_rows": (RuntimeError, ValueError)}, + [RuntimeError, ValueError], + ), + ( + TABLE_DEFAULT.MUTATE_ROWS, + {"default_mutate_rows": (ValueError,)}, + [ValueError], + ), + ((4,), {}, [core_exceptions.DeadlineExceeded]), + ( + [grpc.StatusCode.DEADLINE_EXCEEDED], + {}, + [core_exceptions.DeadlineExceeded], + ), + ( + (14, grpc.StatusCode.ABORTED, RuntimeError), + {}, + [ + core_exceptions.ServiceUnavailable, + core_exceptions.Aborted, + RuntimeError, + ], + ), + ], + ) + def test_get_retryable_errors(self, input_codes, input_table, expected): + """ + test input/output mappings for a variety of valid inputs + """ + fake_table = mock.Mock() + for key in input_table.keys(): + # set the default fields in our fake table mock + setattr(fake_table, f"{key}_retryable_errors", input_table[key]) + result = _helpers._get_retryable_errors(input_codes, fake_table) + assert result == expected From 1288727d4356a34e22690ce42d605a688f37f103 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 8 Dec 2023 17:46:45 -0800 Subject: 
[PATCH 03/15] use exception factory instead of conversion wrappers --- .../bigtable/data/_async/_mutate_rows.py | 22 +++--- .../cloud/bigtable/data/_async/_read_rows.py | 42 ++--------- google/cloud/bigtable/data/_async/client.py | 64 ++++++----------- google/cloud/bigtable/data/_helpers.py | 71 +++++++------------ noxfile.py | 2 +- python-api-core | 2 +- testing/constraints-3.8.txt | 2 +- tests/unit/data/_async/test_client.py | 34 ++++----- 8 files changed, 79 insertions(+), 160 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 5971a9894..a0fb02645 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -20,12 +20,12 @@ import functools from google.api_core import exceptions as core_exceptions -from google.api_core import retry_async as retries +from google.api_core import retry as retries import google.cloud.bigtable_v2.types.bigtable as types_pb import google.cloud.bigtable.data.exceptions as bt_exceptions from google.cloud.bigtable.data._helpers import _make_metadata -from google.cloud.bigtable.data._helpers import _convert_retry_deadline from google.cloud.bigtable.data._helpers import _attempt_timeout_generator +from google.cloud.bigtable.data._helpers import _retry_exception_factory # mutate_rows requests are limited to this number of mutations from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT @@ -101,17 +101,13 @@ def __init__( # Entry level errors bt_exceptions._MutateRowsIncomplete, ) - # build retryable operation - retry = retries.AsyncRetry( - predicate=self.is_retryable, - timeout=operation_timeout, - initial=0.01, - multiplier=2, - maximum=60, - ) - retry_wrapped = retry(self._run_attempt) - self._operation = _convert_retry_deadline( - retry_wrapped, operation_timeout, is_async=True + sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) + self._operation = retries.retry_target_async( + self._run_attempt, + self.is_retryable, + sleep_generator, + operation_timeout, + exception_factory=_retry_exception_factory ) # initialize state self.timeout_generator = _attempt_timeout_generator( diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index ad1f7b84d..9e0fd78e1 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -31,15 +31,13 @@ from google.cloud.bigtable.data.row import Row, Cell from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.cloud.bigtable.data.exceptions import InvalidChunk -from google.cloud.bigtable.data.exceptions import RetryExceptionGroup from google.cloud.bigtable.data.exceptions import _RowSetComplete from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _make_metadata +from google.cloud.bigtable.data._helpers import _retry_exception_factory -from google.api_core import retry_async as retries -from google.api_core.retry_streaming_async import retry_target_stream +from google.api_core import retry as retries from google.api_core.retry import exponential_sleep_generator -from google.api_core import exceptions as core_exceptions if TYPE_CHECKING: from google.cloud.bigtable.data._async.client import TableAsync @@ -107,12 +105,12 @@ def start_operation(self) -> AsyncGenerator[Row, None]: """ Start the read_rows operation, retrying on retryable errors. 
""" - return retry_target_stream( + return retries.retry_target_stream_async( self._read_rows_attempt, self._predicate, exponential_sleep_generator(0.01, 60, multiplier=2), self.operation_timeout, - exception_factory=self._build_exception, + exception_factory=_retry_exception_factory, ) def _read_rows_attempt(self) -> AsyncGenerator[Row, None]: @@ -343,35 +341,3 @@ def _revise_request_rowset( # this will avoid an unwanted full table scan raise _RowSetComplete() return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges) - - @staticmethod - def _build_exception( - exc_list: list[Exception], is_timeout: bool, timeout_val: float - ) -> tuple[Exception, Exception | None]: - """ - Build retry error based on exceptions encountered during operation - - Args: - - exc_list: list of exceptions encountered during operation - - is_timeout: whether the operation failed due to timeout - - timeout_val: the operation timeout value in seconds, for constructing - the error message - Returns: - - tuple of the exception to raise, and a cause exception if applicable - """ - if is_timeout: - # if failed due to timeout, raise deadline exceeded as primary exception - source_exc: Exception = core_exceptions.DeadlineExceeded( - f"operation_timeout of {timeout_val} exceeded" - ) - elif exc_list: - # otherwise, raise non-retryable error as primary exception - source_exc = exc_list.pop() - else: - source_exc = RuntimeError("failed with unspecified exception") - # use the retry exception group as the cause of the exception - cause_exc: Exception | None = ( - RetryExceptionGroup(exc_list) if exc_list else None - ) - source_exc.__cause__ = cause_exc - return source_exc, cause_exc diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index a79ead7f8..0d0334b34 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -33,6 +33,7 @@ import random import os +from functools import partial from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient @@ -45,7 +46,7 @@ from google.cloud.client import ClientWithProject from google.api_core.exceptions import GoogleAPICallError from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore -from google.api_core import retry_async as retries +from google.api_core import retry as retries from google.api_core.exceptions import DeadlineExceeded from google.api_core.exceptions import ServiceUnavailable from google.api_core.exceptions import Aborted @@ -65,7 +66,7 @@ from google.cloud.bigtable.data._helpers import _WarmedInstanceKey from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT from google.cloud.bigtable.data._helpers import _make_metadata -from google.cloud.bigtable.data._helpers import _convert_retry_deadline +from google.cloud.bigtable.data._helpers import _retry_exception_factory from google.cloud.bigtable.data._helpers import _validate_timeouts from google.cloud.bigtable.data._helpers import _get_retryable_errors from google.cloud.bigtable.data._helpers import _get_timeouts @@ -872,22 +873,8 @@ async def sample_row_keys( # prepare retryable retryable_excs = _get_retryable_errors(retryable_errors, self) predicate = retries.if_exception_type(*retryable_excs) - transient_errors = [] - def on_error_fn(exc): - # add errors to list if retryable - if predicate(exc): - transient_errors.append(exc) - - retry = retries.AsyncRetry( - 
predicate=predicate, - timeout=operation_timeout, - initial=0.01, - multiplier=2, - maximum=60, - on_error=on_error_fn, - is_stream=False, - ) + sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) # prepare request metadata = _make_metadata(self.table_name, self.app_profile_id) @@ -902,10 +889,13 @@ async def execute_rpc(): ) return [(s.row_key, s.offset_bytes) async for s in results] - wrapped_fn = _convert_retry_deadline( - retry(execute_rpc), operation_timeout, transient_errors, is_async=True + return await retries.retry_target_async( + execute_rpc, + predicate, + sleep_generator, + operation_timeout, + exception_factory=_retry_exception_factory, ) - return await wrapped_fn() def mutations_batcher( self, @@ -1014,37 +1004,25 @@ async def mutate_row( # mutations should not be retried predicate = retries.if_exception_type() - transient_errors = [] - - def on_error_fn(exc): - if predicate(exc): - transient_errors.append(exc) + sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) - retry = retries.AsyncRetry( - predicate=predicate, - on_error=on_error_fn, - timeout=operation_timeout, - initial=0.01, - multiplier=2, - maximum=60, - ) - # wrap rpc in retry logic - retry_wrapped = retry(self.client._gapic_client.mutate_row) - # convert RetryErrors from retry wrapper into DeadlineExceeded errors - deadline_wrapped = _convert_retry_deadline( - retry_wrapped, operation_timeout, transient_errors, is_async=True - ) - metadata = _make_metadata(self.table_name, self.app_profile_id) - # trigger rpc - await deadline_wrapped( + target = partial( + self.client._gapic_client.mutate_row, row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key, mutations=[mutation._to_pb() for mutation in mutations_list], table_name=self.table_name, app_profile_id=self.app_profile_id, timeout=attempt_timeout, - metadata=metadata, + metadata=_make_metadata(self.table_name, self.app_profile_id), retry=None, ) + return await retries.retry_target_async( + target, + predicate, + sleep_generator, + operation_timeout, + exception_factory=_retry_exception_factory, + ) async def bulk_mutate_rows( self, diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 96ea1d1ce..6c1e40d52 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -96,57 +96,36 @@ def _attempt_timeout_generator( yield max(0, min(per_request_timeout, deadline - time.monotonic())) -# TODO:replace this function with an exception_factory passed into the retry when -# feature is merged: -# https://github.com/googleapis/python-bigtable/blob/ea5b4f923e42516729c57113ddbe28096841b952/google/cloud/bigtable/data/_async/_read_rows.py#L130 -def _convert_retry_deadline( - func: Callable[..., Any], - timeout_value: float | None = None, - retry_errors: list[Exception] | None = None, - is_async: bool = False, -): +def _retry_exception_factory( + exc_list: list[Exception], is_timeout: bool, timeout_val: float +) -> tuple[Exception, Exception | None]: """ - Decorator to convert RetryErrors raised by api_core.retry into - DeadlineExceeded exceptions, indicating that the underlying retries have - exhaused the timeout value. - Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__, - detailing the failed exceptions associated with each retry. - - Supports both sync and async function wrapping. 
+ Build retry error based on exceptions encountered during operation Args: - - func: The function to decorate - - timeout_value: The timeout value to display in the DeadlineExceeded error message - - retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__ + - exc_list: list of exceptions encountered during operation + - is_timeout: whether the operation failed due to timeout + - timeout_val: the operation timeout value in seconds, for constructing + the error message + Returns: + - tuple of the exception to raise, and a cause exception if applicable """ - timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else "" - error_str = f"operation_timeout{timeout_str} exceeded" - - def handle_error(): - new_exc = core_exceptions.DeadlineExceeded( - error_str, + if is_timeout: + # if failed due to timeout, raise deadline exceeded as primary exception + source_exc: Exception = core_exceptions.DeadlineExceeded( + f"operation_timeout of {timeout_val} exceeded" ) - source_exc = None - if retry_errors: - source_exc = RetryExceptionGroup(retry_errors) - new_exc.__cause__ = source_exc - raise new_exc from source_exc - - # separate wrappers for async and sync functions - async def wrapper_async(*args, **kwargs): - try: - return await func(*args, **kwargs) - except core_exceptions.RetryError: - handle_error() - - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except core_exceptions.RetryError: - handle_error() - - return wrapper_async if is_async else wrapper - + elif exc_list: + # otherwise, raise non-retryable error as primary exception + source_exc = exc_list.pop() + else: + source_exc = RuntimeError("failed with unspecified exception") + # use the retry exception group as the cause of the exception + cause_exc: Exception | None = ( + RetryExceptionGroup(exc_list) if exc_list else None + ) + source_exc.__cause__ = cause_exc + return source_exc, cause_exc def _get_timeouts( operation: float | TABLE_DEFAULT, diff --git a/noxfile.py b/noxfile.py index e1d2f4acc..d6e1c666d 100644 --- a/noxfile.py +++ b/noxfile.py @@ -40,7 +40,7 @@ "pytest-asyncio", ] UNIT_TEST_EXTERNAL_DEPENDENCIES = [ - # "git+https://github.com/googleapis/python-api-core.git@retry_generators" + "git+https://github.com/googleapis/python-api-core.git@retry_generators" ] UNIT_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] diff --git a/python-api-core b/python-api-core index a8cfa66b8..71e58888b 160000 --- a/python-api-core +++ b/python-api-core @@ -1 +1 @@ -Subproject commit a8cfa66b8d6001da56823c6488b5da4957e5702b +Subproject commit 71e58888b1687bcc09b0c4c795d2bdf85f6a69c2 diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index 7045a2894..a2435078e 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -5,7 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -google-api-core==2.12.0.dev1 +# google-api-core==2.12.0.dev1 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 54bbb6158..a3cf9b8e6 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1142,44 +1142,44 @@ def test_table_ctor_sync(self): ( "read_rows_stream", (ReadRowsQuery(),), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "read_rows", 
(ReadRowsQuery(),), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "read_row", (b"row_key",), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "read_rows_sharded", ([ReadRowsQuery()],), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), ( "row_exists", (b"row_key",), - "google.cloud.bigtable.data._async._read_rows.retry_target_stream", + "google.api_core.retry.retry_target_stream_async", (), ), - ("sample_row_keys", (), "google.api_core.retry_async.retry_target", ()), + ("sample_row_keys", (), "google.api_core.retry.retry_target_async", ()), ( "mutate_row", (b"row_key", []), - "google.api_core.retry_async.retry_target", + "google.api_core.retry.retry_target_async", (), ), ( "bulk_mutate_rows", ([mutations.RowMutationEntry(b"key", [mock.Mock()])],), - "google.api_core.retry_async.retry_target", + "google.api_core.retry.retry_target_async", (_MutateRowsIncomplete,), ), ], @@ -1223,15 +1223,15 @@ async def test_customizable_retryable_errors( """ from google.cloud.bigtable.data import BigtableDataClientAsync - with mock.patch( - "google.api_core.retry_async.if_exception_type" - ) as predicate_builder_mock: - with mock.patch(retry_fn_path) as retry_fn_mock: - async with BigtableDataClientAsync() as client: - table = client.get_table("instance-id", "table-id") - expected_predicate = lambda a: a in expected_retryables # noqa + with mock.patch(retry_fn_path) as retry_fn_mock: + async with BigtableDataClientAsync() as client: + table = client.get_table("instance-id", "table-id") + expected_predicate = lambda a: a in expected_retryables # noqa + retry_fn_mock.side_effect = RuntimeError("stop early") + with mock.patch( + "google.api_core.retry.if_exception_type" + ) as predicate_builder_mock: predicate_builder_mock.return_value = expected_predicate - retry_fn_mock.side_effect = RuntimeError("stop early") with pytest.raises(Exception): # we expect an exception from attempting to call the mock test_fn = table.__getattribute__(fn_name) @@ -2204,7 +2204,7 @@ async def test_mutate_row_retryable_errors(self, retryable_exception): mutation = mutations.DeleteAllFromRow() assert mutation.is_idempotent() is True await table.mutate_row( - "row_key", mutation, operation_timeout=0.05 + "row_key", mutation, operation_timeout=0.01 ) cause = e.value.__cause__ assert isinstance(cause, RetryExceptionGroup) From e334b356f35d47a8fc135cccd7713ee0c5539b30 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 15:12:03 -0800 Subject: [PATCH 04/15] fixed mutate_rows await --- google/cloud/bigtable/data/_async/_mutate_rows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index a0fb02645..8d4f849e0 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -126,7 +126,7 @@ async def start(self): """ try: # trigger mutate_rows - await self._operation() + await self._operation except Exception as exc: # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations incomplete_indices = self.remaining_indices.copy() From 6f02348ed670e0f8072c92cbd4ad17c1de8f64ca Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 15:12:21 -0800 Subject: [PATCH 
05/15] fixed exception factory --- google/cloud/bigtable/data/_helpers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 6c1e40d52..fc0344e3c 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -23,6 +23,7 @@ from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.api_core import exceptions as core_exceptions +from google.api_core.retry import RetryFailureReason from google.cloud.bigtable.data.exceptions import RetryExceptionGroup if TYPE_CHECKING: @@ -97,7 +98,7 @@ def _attempt_timeout_generator( def _retry_exception_factory( - exc_list: list[Exception], is_timeout: bool, timeout_val: float + exc_list: list[Exception], reason: RetryFailureReason, timeout_val: float, **kwargs ) -> tuple[Exception, Exception | None]: """ Build retry error based on exceptions encountered during operation @@ -110,7 +111,7 @@ def _retry_exception_factory( Returns: - tuple of the exception to raise, and a cause exception if applicable """ - if is_timeout: + if reason == RetryFailureReason.TIMEOUT: # if failed due to timeout, raise deadline exceeded as primary exception source_exc: Exception = core_exceptions.DeadlineExceeded( f"operation_timeout of {timeout_val} exceeded" From cef177f94003d5d3e69d2bc0d364b8c05d9a5805 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 15:12:34 -0800 Subject: [PATCH 06/15] removed race condition workaround --- google/cloud/bigtable/data/_async/_mutate_rows.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 8d4f849e0..d6058cfdb 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -191,13 +191,6 @@ async def _run_attempt(self): self._handle_entry_error(orig_idx, entry_error) # remove processed entry from active list del active_request_indices[result.index] - except asyncio.CancelledError: - # when retry wrapper timeout expires, the operation is cancelled - # make sure incomplete indices are tracked, - # but don't record exception (it will be raised by wrapper) - # TODO: remove asyncio.wait_for in retry wrapper. 
Let grpc call handle expiration - self.remaining_indices.extend(active_request_indices.values()) - raise except Exception as exc: # add this exception to list for each mutation that wasn't # already handled, and update remaining_indices if mutation is retryable From 2846e8ba31adcfd0e77c7891e8a74e12fdf52788 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 15:35:02 -0800 Subject: [PATCH 07/15] fixed tests --- tests/unit/data/_async/test__mutate_rows.py | 10 +-- tests/unit/data/_async/test_client.py | 6 +- .../data/_async/test_mutations_batcher.py | 4 +- tests/unit/data/test__helpers.py | 65 ------------------- 4 files changed, 10 insertions(+), 75 deletions(-) diff --git a/tests/unit/data/_async/test__mutate_rows.py b/tests/unit/data/_async/test__mutate_rows.py index d41929518..1860c0be2 100644 --- a/tests/unit/data/_async/test__mutate_rows.py +++ b/tests/unit/data/_async/test__mutate_rows.py @@ -164,11 +164,11 @@ async def test_mutate_rows_operation(self): table = mock.Mock() entries = [_make_mutation(), _make_mutation()] operation_timeout = 0.05 - instance = self._make_one( - client, table, entries, operation_timeout, operation_timeout - ) - with mock.patch.object(instance, "_operation", AsyncMock()) as attempt_mock: - attempt_mock.return_value = None + cls = self._target_class() + with mock.patch(f"{cls.__module__}.{cls.__name__}._run_attempt", AsyncMock()) as attempt_mock: + instance = self._make_one( + client, table, entries, operation_timeout, operation_timeout + ) await instance.start() assert attempt_mock.call_count == 1 diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index a3cf9b8e6..60a305bcb 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1172,7 +1172,7 @@ def test_table_ctor_sync(self): ("sample_row_keys", (), "google.api_core.retry.retry_target_async", ()), ( "mutate_row", - (b"row_key", []), + (b"row_key", [mock.Mock()]), "google.api_core.retry.retry_target_async", (), ), @@ -1253,10 +1253,10 @@ async def test_customizable_retryable_errors( ("read_rows_sharded", ([ReadRowsQuery()],), "read_rows"), ("row_exists", (b"row_key",), "read_rows"), ("sample_row_keys", (), "sample_row_keys"), - ("mutate_row", (b"row_key", []), "mutate_row"), + ("mutate_row", (b"row_key", [mock.Mock()]), "mutate_row"), ( "bulk_mutate_rows", - ([mutations.RowMutationEntry(b"key", [mock.Mock()])],), + ([mutations.RowMutationEntry(b"key", [mutations.DeleteAllFromRow()])],), "mutate_rows", ), ("check_and_mutate_row", (b"row_key", None), "check_and_mutate_row"), diff --git a/tests/unit/data/_async/test_mutations_batcher.py b/tests/unit/data/_async/test_mutations_batcher.py index 17bd8d420..446cd822e 100644 --- a/tests/unit/data/_async/test_mutations_batcher.py +++ b/tests/unit/data/_async/test_mutations_batcher.py @@ -1158,10 +1158,10 @@ async def test_customizable_retryable_errors( from google.cloud.bigtable.data._async.client import TableAsync with mock.patch( - "google.api_core.retry_async.if_exception_type" + "google.api_core.retry.if_exception_type" ) as predicate_builder_mock: with mock.patch( - "google.api_core.retry_async.retry_target" + "google.api_core.retry.retry_target_async" ) as retry_fn_mock: table = None with mock.patch("asyncio.create_task"): diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index b9c1dc2bb..625cafde7 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -100,71 +100,6 @@ def 
test_attempt_timeout_w_sleeps(self): expected_value -= sleep_time -class TestConvertRetryDeadline: - """ - Test _convert_retry_deadline wrapper - """ - - @pytest.mark.asyncio - @pytest.mark.parametrize("is_async", [True, False]) - async def test_no_error(self, is_async): - def test_func(): - return 1 - - async def test_async(): - return test_func() - - func = test_async if is_async else test_func - wrapped = _helpers._convert_retry_deadline(func, 0.1, is_async) - result = await wrapped() if is_async else wrapped() - assert result == 1 - - @pytest.mark.asyncio - @pytest.mark.parametrize("timeout", [0.1, 2.0, 30.0]) - @pytest.mark.parametrize("is_async", [True, False]) - async def test_retry_error(self, timeout, is_async): - from google.api_core.exceptions import RetryError, DeadlineExceeded - - def test_func(): - raise RetryError("retry error", None) - - async def test_async(): - return test_func() - - func = test_async if is_async else test_func - wrapped = _helpers._convert_retry_deadline(func, timeout, is_async=is_async) - with pytest.raises(DeadlineExceeded) as e: - await wrapped() if is_async else wrapped() - assert e.value.__cause__ is None - assert f"operation_timeout of {timeout}s exceeded" in str(e.value) - - @pytest.mark.asyncio - @pytest.mark.parametrize("is_async", [True, False]) - async def test_with_retry_errors(self, is_async): - from google.api_core.exceptions import RetryError, DeadlineExceeded - - timeout = 10.0 - - def test_func(): - raise RetryError("retry error", None) - - async def test_async(): - return test_func() - - func = test_async if is_async else test_func - - associated_errors = [RuntimeError("error1"), ZeroDivisionError("other")] - wrapped = _helpers._convert_retry_deadline( - func, timeout, associated_errors, is_async - ) - with pytest.raises(DeadlineExceeded) as e: - await wrapped() - cause = e.value.__cause__ - assert isinstance(cause, bigtable_exceptions.RetryExceptionGroup) - assert cause.exceptions == tuple(associated_errors) - assert f"operation_timeout of {timeout}s exceeded" in str(e.value) - - class TestValidateTimeouts: def test_validate_timeouts_error_messages(self): with pytest.raises(ValueError) as e: From b042f83a81740a233804f7975f245c0396f60fea Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 12:02:29 -0800 Subject: [PATCH 08/15] updated api_core dependency --- noxfile.py | 2 -- setup.py | 2 +- testing/constraints-3.7.txt | 2 +- testing/constraints-3.8.txt | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/noxfile.py b/noxfile.py index d6e1c666d..2b2539e0f 100644 --- a/noxfile.py +++ b/noxfile.py @@ -40,7 +40,6 @@ "pytest-asyncio", ] UNIT_TEST_EXTERNAL_DEPENDENCIES = [ - "git+https://github.com/googleapis/python-api-core.git@retry_generators" ] UNIT_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] @@ -55,7 +54,6 @@ "google-cloud-testutils", ] SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [ - # "git+https://github.com/googleapis/python-api-core.git@retry_generators" ] SYSTEM_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] diff --git a/setup.py b/setup.py index e5efc9937..fe15c279e 100644 --- a/setup.py +++ b/setup.py @@ -37,7 +37,7 @@ # 'Development Status :: 5 - Production/Stable' release_status = "Development Status :: 5 - Production/Stable" dependencies = [ - "google-api-core[grpc] == 2.12.0.dev1", # TODO: change to >= after streaming retries is merged + "google-api-core[grpc] >= 2.16.0.rc0", "google-cloud-core >= 1.4.1, <3.0.0dev", "grpc-google-iam-v1 >= 0.12.4, <1.0.0dev", "proto-plus >= 1.22.0, 
<2.0.0dev", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 9f23121d1..4c5d40734 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,7 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -google-api-core==2.12.0.dev1 +google-api-core==2.16.0.rc0 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index a2435078e..288c73826 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -5,7 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -# google-api-core==2.12.0.dev1 +google-api-core==2.16.0.rc0 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 From bb8788ebff2c66c8047aad489cf88e14860e1beb Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 12:44:19 -0800 Subject: [PATCH 09/15] pinned pytest_asyncio version --- testing/constraints-3.8.txt | 1 + tests/system/data/test_system.py | 54 ++++++++++++++++---------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index 288c73826..cbc3b3e8a 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -11,3 +11,4 @@ grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 libcst==0.2.5 protobuf==3.19.5 +pytest-asyncio==0.21.1 diff --git a/tests/system/data/test_system.py b/tests/system/data/test_system.py index 6bd21f386..fb0d9eb82 100644 --- a/tests/system/data/test_system.py +++ b/tests/system/data/test_system.py @@ -92,14 +92,15 @@ async def add_row( self.rows.append(row_key) async def delete_rows(self): - request = { - "table_name": self.table.table_name, - "entries": [ - {"row_key": row, "mutations": [{"delete_from_row": {}}]} - for row in self.rows - ], - } - await self.table.client._gapic_client.mutate_rows(request) + if self.rows: + request = { + "table_name": self.table.table_name, + "entries": [ + {"row_key": row, "mutations": [{"delete_from_row": {}}]} + for row in self.rows + ], + } + await self.table.client._gapic_client.mutate_rows(request) @pytest.mark.usefixtures("table") @@ -147,7 +148,7 @@ async def temp_rows(table): @pytest.mark.usefixtures("table") @pytest.mark.usefixtures("client") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=10) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=10) @pytest.mark.asyncio async def test_ping_and_warm_gapic(client, table): """ @@ -160,7 +161,7 @@ async def test_ping_and_warm_gapic(client, table): @pytest.mark.usefixtures("table") @pytest.mark.usefixtures("client") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_ping_and_warm(client, table): """ @@ -176,9 +177,9 @@ async def test_ping_and_warm(client, table): assert results[0] is None -@pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio +@pytest.mark.usefixtures("table") +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) async def test_mutation_set_cell(table, temp_rows): """ Ensure cells can be set properly @@ -196,7 +197,7 @@ async def test_mutation_set_cell(table, temp_rows): 
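The Retry -> AsyncRetry swaps in the surrounding hunks are behavioral, not cosmetic: the sync retry.Retry wrapper calls an async test function without awaiting it, treats the returned coroutine object as an immediate success, and never retries the errors raised when that coroutine is finally awaited outside the retry loop. retry.AsyncRetry awaits each attempt and sleeps with asyncio.sleep between retryable failures. A minimal sketch of the difference, using a hypothetical flaky_coro rather than anything from this test suite:

import asyncio

from google.api_core import retry
from google.api_core.exceptions import ServiceUnavailable

attempts = 0

@retry.AsyncRetry(
    predicate=retry.if_exception_type(ServiceUnavailable),
    initial=0.01,
    maximum=0.1,
)
async def flaky_coro():
    # hypothetical stand-in for a system test body: fail twice with a
    # retryable error, then succeed on the third awaited attempt
    global attempts
    attempts += 1
    if attempts < 3:
        raise ServiceUnavailable("transient")
    return attempts

assert asyncio.run(flaky_coro()) == 3

Under the old sync decorator the two ServiceUnavailable raises would escape on the first await, so these system tests only looked retried.
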
@pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_sample_row_keys(client, table, temp_rows, column_split_config): """ @@ -239,7 +240,7 @@ async def test_bulk_mutations_set_cell(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_context_manager(client, table, temp_rows): """ @@ -267,7 +268,7 @@ async def test_mutations_batcher_context_manager(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_timer_flush(client, table, temp_rows): """ @@ -293,7 +294,7 @@ async def test_mutations_batcher_timer_flush(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_count_flush(client, table, temp_rows): """ @@ -329,7 +330,7 @@ async def test_mutations_batcher_count_flush(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_bytes_flush(client, table, temp_rows): """ @@ -366,7 +367,6 @@ async def test_mutations_batcher_bytes_flush(client, table, temp_rows): @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_mutations_batcher_no_flush(client, table, temp_rows): """ @@ -570,7 +570,7 @@ async def test_check_and_mutate( @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_stream(table, temp_rows): """ @@ -590,7 +590,7 @@ async def test_read_rows_stream(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows(table, temp_rows): """ @@ -606,7 +606,7 @@ async def test_read_rows(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_sharded_simple(table, temp_rows): """ @@ -629,7 +629,7 @@ async def test_read_rows_sharded_simple(table, temp_rows): 
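Stepping back from the system tests to the library change they exercise: PATCH 03 above drops the AsyncRetry wrapper plus _convert_retry_deadline pairing in favor of calling retries.retry_target_async directly, with an exception_factory hook that builds the exception to raise once retries are exhausted (PATCH 05 then changes the factory's second argument from an is_timeout flag to a RetryFailureReason enum). A rough, self-contained sketch of that control flow, with a hypothetical fetch_once coroutine standing in for the gapic call and a simplified factory in place of _retry_exception_factory:

import asyncio

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.api_core.retry import RetryFailureReason

async def fetch_once():
    # hypothetical stand-in for the wrapped gapic call; always fails
    raise core_exceptions.ServiceUnavailable("transient")

def exception_factory(exc_list, reason, timeout_val, **kwargs):
    # simplified _retry_exception_factory: surface DeadlineExceeded on
    # timeout, otherwise the last error, chaining earlier attempts
    if reason == RetryFailureReason.TIMEOUT:
        source: Exception = core_exceptions.DeadlineExceeded(
            f"operation_timeout of {timeout_val} exceeded"
        )
    else:
        source = exc_list.pop()
    cause = RuntimeError(f"{len(exc_list)} prior attempts failed") if exc_list else None
    source.__cause__ = cause
    return source, cause

async def main():
    return await retries.retry_target_async(
        fetch_once,
        retries.if_exception_type(core_exceptions.ServiceUnavailable),
        retries.exponential_sleep_generator(0.01, 60, multiplier=2),
        timeout=0.1,
        exception_factory=exception_factory,
    )

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except core_exceptions.DeadlineExceeded as exc:
        # the factory's chained cause records the retried attempts
        print(type(exc).__name__, "->", type(exc.__cause__).__name__)

Because the factory constructs the public exception directly, the old RetryError-to-DeadlineExceeded translation layer has nothing left to translate, which is why _convert_retry_deadline and its tests are deleted wholesale.
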
@pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_sharded_from_sample(table, temp_rows): """ @@ -654,7 +654,7 @@ async def test_read_rows_sharded_from_sample(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_sharded_filters_limits(table, temp_rows): """ @@ -683,7 +683,7 @@ async def test_read_rows_sharded_filters_limits(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_range_query(table, temp_rows): """ @@ -705,7 +705,7 @@ async def test_read_rows_range_query(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_single_key_query(table, temp_rows): """ @@ -726,7 +726,7 @@ async def test_read_rows_single_key_query(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_with_filter(table, temp_rows): """ @@ -842,7 +842,7 @@ async def test_row_exists(table, temp_rows): @pytest.mark.usefixtures("table") -@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.parametrize( "cell_value,filter_input,expect_match", [ From f74f260b9ed4481b9f5922e0e9a3753d3590c615 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 12:45:43 -0800 Subject: [PATCH 10/15] ran black --- google/cloud/bigtable/data/_async/_mutate_rows.py | 2 +- google/cloud/bigtable/data/_helpers.py | 5 ++--- noxfile.py | 6 ++---- tests/unit/data/_async/test__mutate_rows.py | 4 +++- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index d6058cfdb..5286e938f 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -107,7 +107,7 @@ def __init__( self.is_retryable, sleep_generator, operation_timeout, - exception_factory=_retry_exception_factory + exception_factory=_retry_exception_factory, ) # initialize state self.timeout_generator = _attempt_timeout_generator( diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index fc0344e3c..647284b6d 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -122,12 +122,11 @@ def _retry_exception_factory( else: source_exc = RuntimeError("failed with unspecified exception") # use the retry exception group as the cause of the exception - cause_exc: Exception | None = ( - RetryExceptionGroup(exc_list) if exc_list else None - ) + cause_exc: 
Exception | None = RetryExceptionGroup(exc_list) if exc_list else None source_exc.__cause__ = cause_exc return source_exc, cause_exc + def _get_timeouts( operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, diff --git a/noxfile.py b/noxfile.py index 2b2539e0f..0706573ce 100644 --- a/noxfile.py +++ b/noxfile.py @@ -39,8 +39,7 @@ "pytest-cov", "pytest-asyncio", ] -UNIT_TEST_EXTERNAL_DEPENDENCIES = [ -] +UNIT_TEST_EXTERNAL_DEPENDENCIES = [] UNIT_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] UNIT_TEST_EXTRAS = [] @@ -53,8 +52,7 @@ "pytest-asyncio", "google-cloud-testutils", ] -SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [ -] +SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [] SYSTEM_TEST_LOCAL_DEPENDENCIES = [] UNIT_TEST_DEPENDENCIES = [] SYSTEM_TEST_DEPENDENCIES = [] diff --git a/tests/unit/data/_async/test__mutate_rows.py b/tests/unit/data/_async/test__mutate_rows.py index 1860c0be2..e03028c45 100644 --- a/tests/unit/data/_async/test__mutate_rows.py +++ b/tests/unit/data/_async/test__mutate_rows.py @@ -165,7 +165,9 @@ async def test_mutate_rows_operation(self): entries = [_make_mutation(), _make_mutation()] operation_timeout = 0.05 cls = self._target_class() - with mock.patch(f"{cls.__module__}.{cls.__name__}._run_attempt", AsyncMock()) as attempt_mock: + with mock.patch( + f"{cls.__module__}.{cls.__name__}._run_attempt", AsyncMock() + ) as attempt_mock: instance = self._make_one( client, table, entries, operation_timeout, operation_timeout ) From e057cd7fe9a20cf81f9393b00382c4752924287e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 12:47:33 -0800 Subject: [PATCH 11/15] fixed lint issues --- google/cloud/bigtable/data/_async/_mutate_rows.py | 1 - google/cloud/bigtable/data/_helpers.py | 2 +- tests/unit/data/test__helpers.py | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 5286e938f..801b7b55c 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -15,7 +15,6 @@ from __future__ import annotations from typing import Sequence, TYPE_CHECKING -import asyncio from dataclasses import dataclass import functools diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 647284b6d..0a6720679 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -16,7 +16,7 @@ """ from __future__ import annotations -from typing import Callable, Sequence, List, Tuple, Any, TYPE_CHECKING +from typing import Sequence, List, Tuple, TYPE_CHECKING import time import enum from collections import namedtuple diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 625cafde7..5a9c500ed 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -17,7 +17,6 @@ from google.api_core import exceptions as core_exceptions import google.cloud.bigtable.data._helpers as _helpers from google.cloud.bigtable.data._helpers import TABLE_DEFAULT -import google.cloud.bigtable.data.exceptions as bigtable_exceptions import mock From 1b02b2cd75797415aac5db662548123349240510 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 13:55:02 -0800 Subject: [PATCH 12/15] pass empty retry --- google/cloud/bigtable/data/_async/_mutate_rows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py 
b/google/cloud/bigtable/data/_async/_mutate_rows.py index 801b7b55c..d4ffdee22 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -175,6 +175,7 @@ async def _run_attempt(self): result_generator = await self._gapic_fn( timeout=next(self.timeout_generator), entries=request_entries, + retry=None, ) async for result_list in result_generator: for result in result_list.entries: From d637269c95f80dc33a04a83e4a4751961c05660c Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 13:55:12 -0800 Subject: [PATCH 13/15] remove IdleTimeout --- google/cloud/bigtable/data/__init__.py | 2 -- google/cloud/bigtable/data/_async/client.py | 1 - google/cloud/bigtable/data/exceptions.py | 9 --------- 3 files changed, 12 deletions(-) diff --git a/google/cloud/bigtable/data/__init__.py b/google/cloud/bigtable/data/__init__.py index a68be5417..5229f8021 100644 --- a/google/cloud/bigtable/data/__init__.py +++ b/google/cloud/bigtable/data/__init__.py @@ -32,7 +32,6 @@ from google.cloud.bigtable.data.mutations import DeleteAllFromFamily from google.cloud.bigtable.data.mutations import DeleteAllFromRow -from google.cloud.bigtable.data.exceptions import IdleTimeout from google.cloud.bigtable.data.exceptions import InvalidChunk from google.cloud.bigtable.data.exceptions import FailedMutationEntryError from google.cloud.bigtable.data.exceptions import FailedQueryShardError @@ -63,7 +62,6 @@ "DeleteAllFromRow", "Row", "Cell", - "IdleTimeout", "InvalidChunk", "FailedMutationEntryError", "FailedQueryShardError", diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 0d0334b34..25b902142 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -579,7 +579,6 @@ async def read_rows_stream( will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised if the request encounters an unrecoverable error - - IdleTimeout: if iterator was abandoned """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self diff --git a/google/cloud/bigtable/data/exceptions.py b/google/cloud/bigtable/data/exceptions.py index 7344874df..3c73ec4e9 100644 --- a/google/cloud/bigtable/data/exceptions.py +++ b/google/cloud/bigtable/data/exceptions.py @@ -28,15 +28,6 @@ from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery -class IdleTimeout(core_exceptions.DeadlineExceeded): - """ - Exception raised by ReadRowsIterator when the generator - has been idle for longer than the internal idle_timeout. - """ - - pass - - class InvalidChunk(core_exceptions.GoogleAPICallError): """Exception raised to invalid chunk data from back-end.""" From 5b8e856026a6b9f49004401e04ff140429d08690 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 14:34:57 -0800 Subject: [PATCH 14/15] update pin --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 0706573ce..6fd41a29e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -456,7 +456,7 @@ def prerelease_deps(session): # Exclude version 1.52.0rc1 which has a known issue. 
See https://github.com/grpc/grpc/issues/32163 "grpcio!=1.52.0rc1", "grpcio-status", - "google-api-core==2.12.0.dev1", # TODO: remove this once streaming retries is merged + "google-api-core==2.16.0rc0", # TODO: remove pin once streaming retries is merged "proto-plus", "google-cloud-testutils", # dependencies of google-cloud-testutils" From c329474b73dfed8d48ae01a09036fac7e69e74da Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 13 Dec 2023 14:58:15 -0800 Subject: [PATCH 15/15] support native namespaces --- google/__init__.py | 6 ------ google/cloud/__init__.py | 6 ------ setup.py | 12 +++--------- testing/constraints-3.7.txt | 2 +- testing/constraints-3.8.txt | 2 +- 5 files changed, 5 insertions(+), 23 deletions(-) delete mode 100644 google/__init__.py delete mode 100644 google/cloud/__init__.py diff --git a/google/__init__.py b/google/__init__.py deleted file mode 100644 index a5ba80656..000000000 --- a/google/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -try: - import pkg_resources - - pkg_resources.declare_namespace(__name__) -except ImportError: - pass diff --git a/google/cloud/__init__.py b/google/cloud/__init__.py deleted file mode 100644 index a5ba80656..000000000 --- a/google/cloud/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -try: - import pkg_resources - - pkg_resources.declare_namespace(__name__) -except ImportError: - pass diff --git a/setup.py b/setup.py index fe15c279e..0bce3a5d6 100644 --- a/setup.py +++ b/setup.py @@ -37,8 +37,8 @@ # 'Development Status :: 5 - Production/Stable' release_status = "Development Status :: 5 - Production/Stable" dependencies = [ - "google-api-core[grpc] >= 2.16.0.rc0", - "google-cloud-core >= 1.4.1, <3.0.0dev", + "google-api-core[grpc] >= 2.16.0rc0", + "google-cloud-core >= 1.4.4, <3.0.0dev", "grpc-google-iam-v1 >= 0.12.4, <1.0.0dev", "proto-plus >= 1.22.0, <2.0.0dev", "proto-plus >= 1.22.2, <2.0.0dev; python_version>='3.11'", @@ -59,15 +59,10 @@ # benchmarks, etc. packages = [ package - for package in setuptools.PEP420PackageFinder.find() + for package in setuptools.find_namespace_packages() if package.startswith("google") ] -# Determine which namespaces are needed. -namespaces = ["google"] -if "google.cloud" in packages: - namespaces.append("google.cloud") - setuptools.setup( name=name, @@ -93,7 +88,6 @@ ], platforms="Posix; MacOS X; Windows", packages=packages, - namespace_packages=namespaces, install_requires=dependencies, extras_require=extras, scripts=[ diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 4c5d40734..83bfe4577 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,7 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -google-api-core==2.16.0.rc0 +google-api-core==2.16.0rc0 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0 diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index cbc3b3e8a..505ba9934 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -5,7 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -google-api-core==2.16.0.rc0 +google-api-core==2.16.0rc0 google-cloud-core==2.3.2 grpc-google-iam-v1==0.12.4 proto-plus==1.22.0
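Dropping google/__init__.py and google/cloud/__init__.py in this last commit moves the distribution from pkg_resources-style namespace packages (the deleted files called pkg_resources.declare_namespace) to PEP 420 native namespaces, where the interpreter assembles the shared google and google.cloud namespaces from every installed distribution without any __init__.py at those levels. The same commit also normalizes the api-core pre-release pin to its canonical PEP 440 spelling, 2.16.0rc0. A sketch of the layout and setup() call this implies, using a hypothetical distribution name:

# Native-namespace layout (PEP 420): no __init__.py until the real package.
#
#   google/                    <- namespace level, no __init__.py
#     cloud/                   <- namespace level, no __init__.py
#       bigtable/
#         __init__.py          <- regular package starts here
#
import setuptools

setuptools.setup(
    name="example-namespaced-pkg",  # hypothetical name, not this package's
    packages=[
        pkg
        for pkg in setuptools.find_namespace_packages()
        if pkg.startswith("google")
    ],
    # note there is no namespace_packages= argument: PEP 420 needs none,
    # and modern setuptools deprecates declare_namespace-style packaging
)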