Commit

Merge branch 'experimental_v3' into merge_main

daniel-sanche authored Dec 15, 2023
2 parents ec6a6f7 + 9342e27 commit 45aa877
Showing 17 changed files with 136 additions and 289 deletions.
2 changes: 0 additions & 2 deletions google/cloud/bigtable/data/__init__.py
@@ -32,7 +32,6 @@
from google.cloud.bigtable.data.mutations import DeleteAllFromFamily
from google.cloud.bigtable.data.mutations import DeleteAllFromRow

from google.cloud.bigtable.data.exceptions import IdleTimeout
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
@@ -63,7 +62,6 @@
"DeleteAllFromRow",
"Row",
"Cell",
"IdleTimeout",
"InvalidChunk",
"FailedMutationEntryError",
"FailedQueryShardError",
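The change above simply drops IdleTimeout from the package's public exports. A quick sanity check of the new surface, as a sketch (the failure mode is assumed, not shown on this page):

    # Sketch: after this commit, the IdleTimeout export is gone.
    from google.cloud.bigtable.data import InvalidChunk  # still exported
    try:
        from google.cloud.bigtable.data import IdleTimeout
    except ImportError:
        print("IdleTimeout is no longer part of the public API")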
33 changes: 11 additions & 22 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -15,17 +15,16 @@
from __future__ import annotations

from typing import Sequence, TYPE_CHECKING
import asyncio
from dataclasses import dataclass
import functools

from google.api_core import exceptions as core_exceptions
from google.api_core import retry_async as retries
from google.api_core import retry as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory

# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
@@ -101,17 +100,13 @@ def __init__(
# Entry level errors
bt_exceptions._MutateRowsIncomplete,
)
# build retryable operation
retry = retries.AsyncRetry(
predicate=self.is_retryable,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
)
retry_wrapped = retry(self._run_attempt)
self._operation = _convert_retry_deadline(
retry_wrapped, operation_timeout, is_async=True
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
self._operation = retries.retry_target_async(
self._run_attempt,
self.is_retryable,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
# initialize state
self.timeout_generator = _attempt_timeout_generator(
@@ -130,7 +125,7 @@ async def start(self):
"""
try:
# trigger mutate_rows
await self._operation()
await self._operation
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
incomplete_indices = self.remaining_indices.copy()
@@ -180,6 +175,7 @@ async def _run_attempt(self):
result_generator = await self._gapic_fn(
timeout=next(self.timeout_generator),
entries=request_entries,
retry=None,
)
async for result_list in result_generator:
for result in result_list.entries:
@@ -195,13 +191,6 @@
self._handle_entry_error(orig_idx, entry_error)
# remove processed entry from active list
del active_request_indices[result.index]
except asyncio.CancelledError:
# when retry wrapper timeout expires, the operation is cancelled
# make sure incomplete indices are tracked,
# but don't record exception (it will be raised by wrapper)
# TODO: remove asyncio.wait_for in retry wrapper. Let grpc call handle expiration
self.remaining_indices.extend(active_request_indices.values())
raise
except Exception as exc:
# add this exception to list for each mutation that wasn't
# already handled, and update remaining_indices if mutation is retryable
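The _mutate_rows.py hunks above replace the retries.AsyncRetry wrapper (and the _convert_retry_deadline and asyncio.CancelledError plumbing around it) with a direct retries.retry_target_async call, delegating error shaping to the shared _retry_exception_factory. Note that retry_target_async(...) returns a coroutine, which is why start() now awaits self._operation without call parentheses. A minimal, runnable sketch of the new pattern, assuming google-api-core >= 2.16 (where the async retry helpers are exposed from google.api_core.retry); flaky_rpc is a made-up stand-in for self._run_attempt:

    import asyncio

    from google.api_core import exceptions as core_exceptions
    from google.api_core import retry as retries

    attempts = 0

    async def flaky_rpc():
        # Stand-in for _run_attempt: fails twice, then succeeds.
        global attempts
        attempts += 1
        if attempts < 3:
            raise core_exceptions.ServiceUnavailable("try again")
        return "done"

    async def main():
        return await retries.retry_target_async(
            flaky_rpc,  # re-invoked fresh on every attempt
            retries.if_exception_type(core_exceptions.ServiceUnavailable),
            retries.exponential_sleep_generator(0.01, maximum=60, multiplier=2),
            timeout=10,  # overall operation budget, in seconds
        )

    print(asyncio.run(main()))  # "done", after two retried attempts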
42 changes: 4 additions & 38 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -31,15 +31,13 @@
from google.cloud.bigtable.data.row import Row, Cell
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry_async as retries
from google.api_core.retry_streaming_async import retry_target_stream
from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator
from google.api_core import exceptions as core_exceptions

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
@@ -107,12 +105,12 @@ def start_operation(self) -> AsyncGenerator[Row, None]:
"""
Start the read_rows operation, retrying on retryable errors.
"""
return retry_target_stream(
return retries.retry_target_stream_async(
self._read_rows_attempt,
self._predicate,
exponential_sleep_generator(0.01, 60, multiplier=2),
self.operation_timeout,
exception_factory=self._build_exception,
exception_factory=_retry_exception_factory,
)

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
@@ -343,35 +341,3 @@ def _revise_request_rowset(
# this will avoid an unwanted full table scan
raise _RowSetComplete()
return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges)

@staticmethod
def _build_exception(
exc_list: list[Exception], is_timeout: bool, timeout_val: float
) -> tuple[Exception, Exception | None]:
"""
Build retry error based on exceptions encountered during operation
Args:
- exc_list: list of exceptions encountered during operation
- is_timeout: whether the operation failed due to timeout
- timeout_val: the operation timeout value in seconds, for constructing
the error message
Returns:
- tuple of the exception to raise, and a cause exception if applicable
"""
if is_timeout:
# if failed due to timeout, raise deadline exceeded as primary exception
source_exc: Exception = core_exceptions.DeadlineExceeded(
f"operation_timeout of {timeout_val} exceeded"
)
elif exc_list:
# otherwise, raise non-retryable error as primary exception
source_exc = exc_list.pop()
else:
source_exc = RuntimeError("failed with unspecified exception")
# use the retry exception group as the cause of the exception
cause_exc: Exception | None = (
RetryExceptionGroup(exc_list) if exc_list else None
)
source_exc.__cause__ = cause_exc
return source_exc, cause_exc
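The big deletion above is _read_rows.py's private _build_exception helper; both operation classes now share _retry_exception_factory from _helpers, a file this page does not show. A plausible reconstruction, inferred from the removed method and from api-core's exception_factory contract; the (exc_list, reason, timeout_val) signature and the RetryFailureReason import are assumptions:

    # Assumed shape of _retry_exception_factory in
    # google/cloud/bigtable/data/_helpers.py; mirrors the removed _build_exception.
    from google.api_core import exceptions as core_exceptions
    from google.api_core.retry import RetryFailureReason
    from google.cloud.bigtable.data.exceptions import RetryExceptionGroup

    def _retry_exception_factory(exc_list, reason, timeout_val):
        """Build the (exception, cause) pair raised when retries are exhausted."""
        if reason == RetryFailureReason.TIMEOUT:
            # the operation_timeout budget ran out
            source_exc = core_exceptions.DeadlineExceeded(
                f"operation_timeout of {timeout_val} exceeded"
            )
        elif exc_list:
            # otherwise, surface the final error directly
            source_exc = exc_list.pop()
        else:
            source_exc = RuntimeError("failed with unspecified exception")
        # earlier transient errors ride along as the cause
        cause_exc = RetryExceptionGroup(exc_list) if exc_list else None
        source_exc.__cause__ = cause_exc
        return source_exc, cause_exc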
71 changes: 25 additions & 46 deletions google/cloud/bigtable/data/_async/client.py
@@ -33,6 +33,7 @@
import random
import os

from functools import partial

from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
@@ -43,9 +44,8 @@
)
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
from google.cloud.client import ClientWithProject
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore
from google.api_core import retry_async as retries
from google.api_core import retry as retries
from google.api_core.exceptions import DeadlineExceeded
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import Aborted
@@ -65,7 +65,7 @@
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import _validate_timeouts
from google.cloud.bigtable.data._helpers import _get_retryable_errors
from google.cloud.bigtable.data._helpers import _get_timeouts
@@ -223,7 +223,7 @@ async def close(self, timeout: float = 2.0):

async def _ping_and_warm_instances(
self, channel: grpc.aio.Channel, instance_key: _WarmedInstanceKey | None = None
) -> list[GoogleAPICallError | None]:
) -> list[BaseException | None]:
"""
Prepares the backend for requests on a channel
@@ -578,7 +578,6 @@ async def read_rows_stream(
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
- IdleTimeout: if iterator was abandoned
"""
operation_timeout, attempt_timeout = _get_timeouts(
operation_timeout, attempt_timeout, self
@@ -761,6 +760,9 @@ async def read_rows_sharded(
for result in batch_result:
if isinstance(result, Exception):
error_dict[shard_idx] = result
elif isinstance(result, BaseException):
# BaseException not expected; raise immediately
raise result
else:
results_list.extend(result)
shard_idx += 1
@@ -872,22 +874,8 @@ async def sample_row_keys(
# prepare retryable
retryable_excs = _get_retryable_errors(retryable_errors, self)
predicate = retries.if_exception_type(*retryable_excs)
transient_errors = []

def on_error_fn(exc):
# add errors to list if retryable
if predicate(exc):
transient_errors.append(exc)

retry = retries.AsyncRetry(
predicate=predicate,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
on_error=on_error_fn,
is_stream=False,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

# prepare request
metadata = _make_metadata(self.table_name, self.app_profile_id)
@@ -902,10 +890,13 @@ async def execute_rpc():
)
return [(s.row_key, s.offset_bytes) async for s in results]

wrapped_fn = _convert_retry_deadline(
retry(execute_rpc), operation_timeout, transient_errors, is_async=True
return await retries.retry_target_async(
execute_rpc,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
return await wrapped_fn()

def mutations_batcher(
self,
@@ -1014,37 +1005,25 @@ async def mutate_row(
# mutations should not be retried
predicate = retries.if_exception_type()

transient_errors = []

def on_error_fn(exc):
if predicate(exc):
transient_errors.append(exc)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

retry = retries.AsyncRetry(
predicate=predicate,
on_error=on_error_fn,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
)
# wrap rpc in retry logic
retry_wrapped = retry(self.client._gapic_client.mutate_row)
# convert RetryErrors from retry wrapper into DeadlineExceeded errors
deadline_wrapped = _convert_retry_deadline(
retry_wrapped, operation_timeout, transient_errors, is_async=True
)
metadata = _make_metadata(self.table_name, self.app_profile_id)
# trigger rpc
await deadline_wrapped(
target = partial(
self.client._gapic_client.mutate_row,
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=attempt_timeout,
metadata=metadata,
metadata=_make_metadata(self.table_name, self.app_profile_id),
retry=None,
)
return await retries.retry_target_async(
target,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)

async def bulk_mutate_rows(
self,
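Two details in the mutate_row hunk are easy to miss. First, functools.partial freezes the rpc arguments once, so every retry attempt replays an identical zero-argument call, and retry=None switches off the GAPIC layer's own default retry so that retry_target_async is the only retry loop in play. Second, retries.if_exception_type() with no arguments builds a predicate that matches nothing, i.e. never retry, which fits the "mutations should not be retried" comment. A small sketch of both pieces (the api-core names are real; the values are illustrative):

    from functools import partial

    from google.api_core import exceptions as core_exceptions
    from google.api_core import retry as retries

    retryable = retries.if_exception_type(
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
    )
    never_retry = retries.if_exception_type()  # no types: always False

    print(retryable(core_exceptions.ServiceUnavailable("503")))    # True
    print(never_retry(core_exceptions.ServiceUnavailable("503")))  # False

    # partial() binds arguments up front; the retry loop then calls target().
    target = partial(print, "rpc goes out here")  # stand-in for the gapic call
    target()  # what each retry attempt invokes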
3 changes: 3 additions & 0 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -489,6 +489,9 @@ async def _wait_for_batch_results(
if isinstance(result, Exception):
# will receive direct Exception objects if request task fails
found_errors.append(result)
elif isinstance(result, BaseException):
# BaseException not expected from grpc calls. Raise immediately
raise result
elif result:
# completed requests will return a list of FailedMutationEntryError
for e in result:
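The new BaseException branch here (and its twin in read_rows_sharded above) exists because asyncio.gather(..., return_exceptions=True) can return BaseException subclasses (notably asyncio.CancelledError, which stopped inheriting from Exception in Python 3.8) that the isinstance(result, Exception) check alone would silently swallow. A runnable sketch of the rationale:

    import asyncio

    async def ok():
        return []

    async def fails():
        raise ValueError("request failed")

    async def gets_cancelled():
        raise asyncio.CancelledError()  # not an Exception on Python 3.8+

    async def wait_for_batch_results():
        results = await asyncio.gather(
            ok(), fails(), gets_cancelled(), return_exceptions=True
        )
        found_errors = []
        for result in results:
            if isinstance(result, Exception):
                found_errors.append(result)  # ordinary request failure
            elif isinstance(result, BaseException):
                raise result  # cancellation: propagate instead of recording
        return found_errors

    try:
        asyncio.run(wait_for_batch_results())
    except asyncio.CancelledError:
        print("cancellation propagated rather than being logged as an error")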