diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py
index 6e22213d9..e0fad8559 100644
--- a/google/cloud/bigtable/data/_async/_mutate_rows.py
+++ b/google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -25,6 +25,7 @@
 from google.cloud.bigtable.data._helpers import _make_metadata
 from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
 from google.cloud.bigtable.data._helpers import _retry_exception_factory
+from google.cloud.bigtable.data._helpers import backoff_generator
 
 # mutate_rows requests are limited to this number of mutations
 from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
@@ -104,7 +105,7 @@ def __init__(
             # Entry level errors
             bt_exceptions._MutateRowsIncomplete,
         )
-        sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
+        sleep_generator = backoff_generator(0.01, 2, 60)
         self._operation = retries.retry_target_async(
             self._run_attempt,
             self.is_retryable,
@@ -120,6 +121,7 @@ def __init__(
         self.remaining_indices = list(range(len(self.mutations)))
         self.errors: dict[int, list[Exception]] = {}
         # set up metrics
+        metrics.backoff_generator = sleep_generator
         self._operation_metrics = metrics
 
     async def start(self):
diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py
index 6fa1d869b..884eadc4c 100644
--- a/google/cloud/bigtable/data/_async/client.py
+++ b/google/cloud/bigtable/data/_async/client.py
@@ -70,6 +70,7 @@
 from google.cloud.bigtable.data._helpers import _get_retryable_errors
 from google.cloud.bigtable.data._helpers import _get_timeouts
 from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
+from google.cloud.bigtable.data._helpers import backoff_generator
 from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync
 from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE
 from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
@@ -905,14 +906,14 @@ async def sample_row_keys(
         retryable_excs = _get_retryable_errors(retryable_errors, self)
         predicate = retries.if_exception_type(*retryable_excs)
 
-        sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
+        sleep_generator = backoff_generator(0.01, 2, 60)
 
         # prepare request
         metadata = _make_metadata(self.table_name, self.app_profile_id)
 
         # wrap rpc in retry and metric collection logic
         async with self._metrics.create_operation(
-            OperationType.SAMPLE_ROW_KEYS
+            OperationType.SAMPLE_ROW_KEYS, backoff_generator=sleep_generator
         ) as operation:
 
             async def execute_rpc():
@@ -1050,12 +1051,11 @@ async def mutate_row(
 
         # mutations should not be retried
         predicate = retries.if_exception_type()
 
-        sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
-
+        sleep_generator = backoff_generator(0.01, 2, 60)
         # wrap rpc in retry and metric collection logic
         async with self._metrics.create_operation(
-            OperationType.MUTATE_ROW
+            OperationType.MUTATE_ROW, backoff_generator=sleep_generator
         ) as operation:
             metric_wrapped = operation.wrap_attempt_fn(
                 self.client._gapic_client.mutate_row