Merge branch 'update_api_core' into client_side_metrics3

daniel-sanche committed Dec 13, 2023
2 parents 7a337e6 + c329474, commit 9638d2a
Showing 21 changed files with 728 additions and 560 deletions.
6 changes: 0 additions & 6 deletions google/__init__.py

This file was deleted.

6 changes: 0 additions & 6 deletions google/cloud/__init__.py

This file was deleted.

2 changes: 0 additions & 2 deletions google/cloud/bigtable/data/__init__.py
@@ -32,7 +32,6 @@
 from google.cloud.bigtable.data.mutations import DeleteAllFromFamily
 from google.cloud.bigtable.data.mutations import DeleteAllFromRow
 
-from google.cloud.bigtable.data.exceptions import IdleTimeout
 from google.cloud.bigtable.data.exceptions import InvalidChunk
 from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
 from google.cloud.bigtable.data.exceptions import FailedQueryShardError
@@ -63,7 +62,6 @@
     "DeleteAllFromRow",
     "Row",
     "Cell",
-    "IdleTimeout",
     "InvalidChunk",
     "FailedMutationEntryError",
     "FailedQueryShardError",
42 changes: 16 additions & 26 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -14,18 +14,17 @@
 #
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
-import asyncio
+from typing import Sequence, TYPE_CHECKING
 from dataclasses import dataclass
 import functools
 
 from google.api_core import exceptions as core_exceptions
-from google.api_core import retry_async as retries
+from google.api_core import retry as retries
 import google.cloud.bigtable_v2.types.bigtable as types_pb
 import google.cloud.bigtable.data.exceptions as bt_exceptions
 from google.cloud.bigtable.data._helpers import _make_metadata
-from google.cloud.bigtable.data._helpers import _convert_retry_deadline
 from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
+from google.cloud.bigtable.data._helpers import _retry_exception_factory
 
 # mutate_rows requests are limited to this number of mutations
 from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
@@ -68,6 +67,7 @@ def __init__(
         operation_timeout: float,
         attempt_timeout: float | None,
         metrics: ActiveOperationMetric,
+        retryable_exceptions: Sequence[type[Exception]] = (),
     ):
         """
         Args:
@@ -77,6 +77,8 @@ def __init__(
             - operation_timeout: the timeout to use for the entire operation, in seconds.
             - attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
                 If not specified, the request will run until operation_timeout is reached.
+            - metrics: the metrics object to use for tracking the operation
+            - retryable_exceptions: a list of exceptions that should be retried
         """
         # check that mutations are within limits
         total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
@@ -98,23 +100,17 @@ def __init__(
         # create predicate for determining which errors are retryable
         self.is_retryable = retries.if_exception_type(
             # RPC level errors
-            core_exceptions.DeadlineExceeded,
-            core_exceptions.ServiceUnavailable,
+            *retryable_exceptions,
             # Entry level errors
             bt_exceptions._MutateRowsIncomplete,
         )
-        # build retryable operation
-        retry = retries.AsyncRetry(
-            predicate=self.is_retryable,
-            timeout=operation_timeout,
-            initial=0.01,
-            multiplier=2,
-            maximum=60,
-        )
-        # TODO: fix typing after streaming retries are finalized
-        retry_wrapped = retry(self._run_attempt)  # type: ignore
-        self._operation = _convert_retry_deadline(
-            retry_wrapped, operation_timeout, is_async=True
+        sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
+        self._operation = retries.retry_target_async(
+            self._run_attempt,
+            self.is_retryable,
+            sleep_generator,
+            operation_timeout,
+            exception_factory=_retry_exception_factory,
         )
         # initialize state
         self.timeout_generator = _attempt_timeout_generator(
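The hunk above is the heart of the migration: instead of building a retries.AsyncRetry wrapper and re-deriving deadline errors with _convert_retry_deadline, the operation now hands an attempt function, a predicate, a sleep generator, and a timeout directly to api_core's retry_target_async loop. A minimal runnable sketch of that pattern, assuming a google-api-core version that re-exports the async retry helpers from google.api_core.retry (2.16+); the flaky_rpc coroutine and its failure schedule are illustrative stand-ins, not library API:

import asyncio

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries

attempts = 0

async def flaky_rpc() -> str:
    # stand-in for a single attempt (like _run_attempt): fails twice, then succeeds
    global attempts
    attempts += 1
    if attempts < 3:
        raise core_exceptions.ServiceUnavailable("transient failure")
    return "ok"

async def main() -> None:
    result = await retries.retry_target_async(
        flaky_rpc,  # the attempt to (re)run
        retries.if_exception_type(core_exceptions.ServiceUnavailable),  # retry predicate
        retries.exponential_sleep_generator(initial=0.01, maximum=1.0),  # backoff delays
        timeout=5.0,  # overall operation timeout, in seconds
    )
    print(result, "after", attempts, "attempts")

asyncio.run(main())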
@@ -136,7 +132,7 @@ async def start(self):
         try:
             # trigger mutate_rows
             self._operation_metrics.start()
-            await self._operation()
+            await self._operation
         except Exception as exc:
             # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
             incomplete_indices = self.remaining_indices.copy()
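The call-site change above (await self._operation instead of await self._operation()) reflects a real type change: AsyncRetry wraps a function and returns a new callable, while retry_target_async returns a coroutine that already represents the running operation. A hedged side-by-side sketch, with a trivial coroutine standing in for _run_attempt:

import asyncio

from google.api_core import retry as retries

async def attempt() -> int:
    return 42

async def main() -> None:
    # old shape: AsyncRetry produces a wrapped callable, which must then be called
    wrapped = retries.AsyncRetry(predicate=lambda exc: False)(attempt)
    print(await wrapped())

    # new shape: retry_target_async(...) is the operation itself, awaited directly
    print(
        await retries.retry_target_async(
            attempt,
            lambda exc: False,  # nothing is retryable in this toy
            retries.exponential_sleep_generator(initial=0.01, maximum=1.0),
            timeout=5.0,
        )
    )

asyncio.run(main())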
@@ -192,6 +188,7 @@ async def _run_attempt(self):
         result_generator = await self._gapic_fn(
             timeout=next(self.timeout_generator),
             entries=request_entries,
+            retry=None,
         )
         try:
             async for result_list in result_generator:
@@ -216,13 +213,6 @@ async def _run_attempt(self):
                     + await result_generator.initial_metadata()
                 )
                 self._operation_metrics.add_response_metadata(metadata)
-        except asyncio.CancelledError:
-            # when retry wrapper timeout expires, the operation is cancelled
-            # make sure incomplete indices are tracked,
-            # but don't record exception (it will be raised by wrapper)
-            # TODO: remove asyncio.wait_for in retry wrapper. Let grpc call handle expiration
-            self.remaining_indices.extend(active_request_indices.values())
-            raise
         except Exception as exc:
             # add this exception to list for each mutation that wasn't
             # already handled, and update remaining_indices if mutation is retryable
59 changes: 15 additions & 44 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -15,8 +15,13 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, AsyncGenerator, Awaitable
-
+from typing import (
+    TYPE_CHECKING,
+    AsyncGenerator,
+    AsyncIterable,
+    Awaitable,
+    Sequence,
+)
 import time
 
 from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
@@ -27,17 +32,19 @@
 from google.cloud.bigtable.data.row import Row, Cell
 from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
 from google.cloud.bigtable.data.exceptions import InvalidChunk
-from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
 from google.cloud.bigtable.data.exceptions import _RowSetComplete
 from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
 from google.cloud.bigtable.data._helpers import _make_metadata
 from google.cloud.bigtable.data._helpers import backoff_generator
 
-from google.api_core import retry_async as retries
-from google.api_core.retry_streaming_async import retry_target_stream
-from google.api_core.retry import RetryFailureReason
 from google.api_core import exceptions as core_exceptions
-from google.api_core.grpc_helpers_async import GrpcAsyncStream
+from google.cloud.bigtable.data._helpers import _retry_exception_factory
+
+from google.api_core import retry as retries
+from google.api_core.retry import exponential_sleep_generator
 
 if TYPE_CHECKING:
     from google.cloud.bigtable.data._async.client import TableAsync
@@ -81,6 +88,7 @@ def __init__(
         operation_timeout: float,
         attempt_timeout: float,
         metrics: ActiveOperationMetric,
+        retryable_exceptions: Sequence[type[Exception]] = (),
     ):
         self.attempt_timeout_gen = _attempt_timeout_generator(
             attempt_timeout, operation_timeout
@@ -95,11 +103,7 @@ def __init__(
         else:
             self.request = query._to_pb(table)
         self.table = table
-        self._predicate = retries.if_exception_type(
-            core_exceptions.DeadlineExceeded,
-            core_exceptions.ServiceUnavailable,
-            core_exceptions.Aborted,
-        )
+        self._predicate = retries.if_exception_type(*retryable_exceptions)
         self._metadata = _make_metadata(
             table.table_name,
             table.app_profile_id,
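As in _mutate_rows.py, the hard-coded retryable set becomes a retryable_exceptions constructor argument, and the predicate is built from whatever the caller passes in. A small sketch of the same pattern; the three exception types shown are the ones the old code hard-coded:

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries

retryable_exceptions: tuple[type[Exception], ...] = (
    core_exceptions.DeadlineExceeded,
    core_exceptions.ServiceUnavailable,
    core_exceptions.Aborted,
)
predicate = retries.if_exception_type(*retryable_exceptions)

print(predicate(core_exceptions.Aborted("row conflict")))  # True: will be retried
print(predicate(ValueError("bad argument")))  # False: raised immediately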
@@ -117,12 +121,12 @@ def start_operation(self) -> AsyncGenerator[Row, None]:
         sleep_generator = backoff_generator()
         self._operation_metrics.backoff_generator = sleep_generator
 
-        return retry_target_stream(
+        return retries.retry_target_stream_async(
            self._read_rows_attempt,
            self._operation_metrics.build_wrapped_predicate(self._predicate),
            sleep_generator,
            self.operation_timeout,
-            exception_factory=self._build_exception,
+            exception_factory=_retry_exception_factory,
        )
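start_operation returns the retrying stream without awaiting it: retry_target_stream_async wraps a generator-producing attempt function and yields from a fresh generator after each retryable failure. A runnable toy of that shape, again assuming google-api-core 2.16+; row_stream is an illustrative stand-in for _read_rows_attempt:

import asyncio

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries

broken_once = {"pending": True}

async def row_stream():
    # stand-in for _read_rows_attempt: an async generator that fails mid-stream once
    yield "row-1"
    if broken_once["pending"]:
        broken_once["pending"] = False
        raise core_exceptions.ServiceUnavailable("stream dropped")
    yield "row-2"

async def main() -> None:
    stream = retries.retry_target_stream_async(
        row_stream,  # re-invoked to open a fresh stream after each retryable failure
        retries.if_exception_type(core_exceptions.ServiceUnavailable),
        retries.exponential_sleep_generator(initial=0.01, maximum=1.0),
        timeout=5.0,
    )
    async for row in stream:
        print(row)
    # this toy replays "row-1" after the retry; the real operation revises its
    # request between attempts (_revise_request_rowset) so rows are not re-read

asyncio.run(main())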

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
Expand Down Expand Up @@ -375,36 +379,3 @@ def _revise_request_rowset(
# this will avoid an unwanted full table scan
raise _RowSetComplete()
return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges)

@staticmethod
def _build_exception(
exc_list: list[Exception], reason: RetryFailureReason, timeout_val: float | None
) -> tuple[Exception, Exception | None]:
"""
Build retry error based on exceptions encountered during operation
Args:
- exc_list: list of exceptions encountered during operation
- is_timeout: whether the operation failed due to timeout
- timeout_val: the operation timeout value in seconds, for constructing
the error message
Returns:
- tuple of the exception to raise, and a cause exception if applicable
"""
if reason == RetryFailureReason.TIMEOUT:
# if failed due to timeout, raise deadline exceeded as primary exception
timeout_val_str = f"of {timeout_val:.1f}s " if timeout_val else ""
source_exc: Exception = core_exceptions.DeadlineExceeded(
f"operation_timeout {timeout_val_str}exceeded"
)
elif exc_list:
# otherwise, raise non-retryable error as primary exception
source_exc = exc_list.pop()
else:
source_exc = RuntimeError("failed with unspecified exception")
# use the retry exception group as the cause of the exception
cause_exc: Exception | None = (
RetryExceptionGroup(exc_list) if exc_list else None
)
source_exc.__cause__ = cause_exc
return source_exc, cause_exc
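The deleted _build_exception method is superseded by the shared _retry_exception_factory imported from data._helpers and plugged in through the exception_factory hook in both operations. A runnable sketch of that hook's contract (it is called with the attempt exceptions, a RetryFailureReason, and the timeout, and returns the exception to raise plus its cause); the simplified factory below mirrors the deleted logic and is not the shared helper itself:

import asyncio

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.api_core.retry import RetryFailureReason

def exception_factory(exc_list, reason, timeout_val):
    # contract: return (exception to raise, cause exception or None)
    if reason == RetryFailureReason.TIMEOUT:
        primary: Exception = core_exceptions.DeadlineExceeded(
            f"operation_timeout of {timeout_val}s exceeded"
        )
        cause = exc_list[-1] if exc_list else None
    elif exc_list:
        primary, cause = exc_list[-1], None
    else:
        primary, cause = RuntimeError("failed with unspecified exception"), None
    return primary, cause

async def always_unavailable():
    raise core_exceptions.ServiceUnavailable("transient")

async def main() -> None:
    try:
        await retries.retry_target_async(
            always_unavailable,
            retries.if_exception_type(core_exceptions.ServiceUnavailable),
            retries.exponential_sleep_generator(initial=0.01, maximum=0.05),
            timeout=0.2,
            exception_factory=exception_factory,
        )
    except core_exceptions.DeadlineExceeded as exc:
        print("operation failed:", exc, "caused by:", exc.__cause__)

asyncio.run(main())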