From 561a7a0689f1f347d38f85c1dcb7976018e885cb Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 20 Oct 2023 14:51:36 -0700 Subject: [PATCH 01/13] added option for TABLE_DEFAULT --- google/cloud/bigtable/data/_async/client.py | 86 ++++++++++--------- .../bigtable/data/_async/mutations_batcher.py | 16 ++-- 2 files changed, 53 insertions(+), 49 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index e5be1b2d3..6fbae4bd8 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -21,6 +21,7 @@ AsyncIterable, Optional, Set, + Literal, TYPE_CHECKING, ) @@ -84,6 +85,9 @@ "_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"] ) +# literal argument used to signal that the value set at the table should be used +TABLE_DEFAULT = "TABLE_DEFAULT" + class BigtableDataClientAsync(ClientWithProject): def __init__( @@ -525,8 +529,8 @@ async def read_rows_stream( self, query: ReadRowsQuery, *, - operation_timeout: float | None = None, - attempt_timeout: float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, ) -> AsyncIterable[Row]: """ Read a set of rows from the table, based on the specified query. @@ -538,12 +542,12 @@ async def read_rows_stream( - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_read_rows_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_read_rows_attempt_timeout, - or the operation_timeout if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout + If None, defaults to operation_timeout. Returns: - an asynchronous iterator that yields rows returned by the query Raises: @@ -575,8 +579,8 @@ async def read_rows( self, query: ReadRowsQuery, *, - operation_timeout: float | None = None, - attempt_timeout: float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, ) -> list[Row]: """ Read a set of rows from the table, based on the specified query. @@ -589,12 +593,12 @@ async def read_rows( - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_read_rows_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_read_rows_attempt_timeout, - or the operation_timeout if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + If None, defaults to operation_timeout. 
Returns: - a list of Rows returned by the query Raises: @@ -627,12 +631,12 @@ async def read_row( - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_read_rows_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout - if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + If None, defaults to operation_timeout. Returns: - a Row object if the row exists, otherwise None Raises: @@ -677,12 +681,12 @@ async def read_rows_sharded( - sharded_query: a sharded query to execute - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_read_rows_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout - if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + If None, defaults to operation_timeout. Raises: - ShardedReadRowsExceptionGroup: if any of the queries failed - ValueError: if the query_list is empty @@ -754,12 +758,12 @@ async def row_exists( - row_key: the key of the row to check - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_read_rows_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout - if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + If None, defaults to operation_timeout. Returns: - a bool indicating whether the row exists Raises: @@ -785,8 +789,8 @@ async def row_exists( async def sample_row_keys( self, *, - operation_timeout: float | None = None, - attempt_timeout: float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, ) -> RowKeySamples: """ Return a set of RowKeySamples that delimit contiguous sections of the table of @@ -802,12 +806,12 @@ async def sample_row_keys( Args: - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. 
- If None, defaults to the Table's default_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_attempt_timeout, or the operation_timeout - if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_attempt_timeout. + If None, defaults to operation_timeout. Returns: - a set of RowKeySamples the delimit contiguous sections of the table Raises: @@ -873,8 +877,8 @@ def mutations_batcher( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | None = None, - batch_attempt_timeout: float | None = None, + batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + batch_attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. @@ -890,11 +894,11 @@ def mutations_batcher( - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. - flow_control_max_mutation_count: Maximum number of inflight mutations. - flow_control_max_bytes: Maximum number of inflight bytes. - - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None, - table default_mutate_rows_operation_timeout will be used - - batch_attempt_timeout: timeout for each individual request, in seconds. If None, - table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout - if that is also None. + - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. + - batch_attempt_timeout: timeout for each individual request, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. + If None, defaults to batch_operation_timeout. Returns: - a MutationsBatcherAsync context manager that can batch requests """ @@ -914,8 +918,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: float | None = None, - attempt_timeout: float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, ): """ Mutates a row atomically. @@ -931,12 +935,12 @@ async def mutate_row( - mutations: the set of mutations to apply to the row - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_attempt_timeout, or the operation_timeout - if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_attempt_timeout. + If None, defaults to operation_timeout. 
Raises: - DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing all @@ -1000,8 +1004,8 @@ async def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], *, - operation_timeout: float | None = None, - attempt_timeout: float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, ): """ Applies mutations for multiple rows in a single batched request. @@ -1021,12 +1025,12 @@ async def bulk_mutate_rows( in arbitrary order - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If None, defaults to the Table's default_mutate_rows_operation_timeout + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_mutate_rows_attempt_timeout, - or the operation_timeout if that is also None. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout + If None, defaults to operation_timeout. Raises: - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 34e1bfb5d..01bac34d6 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -14,7 +14,7 @@ # from __future__ import annotations -from typing import Any, TYPE_CHECKING +from typing import Any, Literal, TYPE_CHECKING import asyncio import atexit import warnings @@ -189,8 +189,8 @@ def __init__( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | None = None, - batch_attempt_timeout: float | None = None, + batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + batch_attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ): """ Args: @@ -202,11 +202,11 @@ def __init__( - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. - flow_control_max_mutation_count: Maximum number of inflight mutations. - flow_control_max_bytes: Maximum number of inflight bytes. - - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None, - table default_mutate_rows_operation_timeout will be used - - batch_attempt_timeout: timeout for each individual request, in seconds. If None, - table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout - if that is also None. + - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. + - batch_attempt_timeout: timeout for each individual request, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. + If None, defaults to batch_operation_timeout. 
""" self._operation_timeout: float = ( batch_operation_timeout or table.default_mutate_rows_operation_timeout From 2c479c204469af089678cf03f23c590fb0ee2c79 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 20 Oct 2023 15:24:21 -0700 Subject: [PATCH 02/13] changed code to use TABLE_DEFAULT --- google/cloud/bigtable/data/_async/client.py | 93 ++++++------------- .../bigtable/data/_async/mutations_batcher.py | 12 +-- google/cloud/bigtable/data/_helpers.py | 15 ++- 3 files changed, 45 insertions(+), 75 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 6fbae4bd8..b00f97bac 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -63,6 +63,7 @@ from google.cloud.bigtable.data._helpers import _make_metadata from google.cloud.bigtable.data._helpers import _convert_retry_deadline from google.cloud.bigtable.data._helpers import _validate_timeouts +from google.cloud.bigtable.data._helpers import _get_timeouts from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE from google.cloud.bigtable.data._helpers import _attempt_timeout_generator @@ -85,9 +86,6 @@ "_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"] ) -# literal argument used to signal that the value set at the table should be used -TABLE_DEFAULT = "TABLE_DEFAULT" - class BigtableDataClientAsync(ClientWithProject): def __init__( @@ -529,8 +527,8 @@ async def read_rows_stream( self, query: ReadRowsQuery, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> AsyncIterable[Row]: """ Read a set of rows from the table, based on the specified query. @@ -557,15 +555,7 @@ async def read_rows_stream( - GoogleAPIError: raised if the request encounters an unrecoverable error - IdleTimeout: if iterator was abandoned """ - operation_timeout = ( - operation_timeout or self.default_read_rows_operation_timeout - ) - attempt_timeout = ( - attempt_timeout - or self.default_read_rows_attempt_timeout - or operation_timeout - ) - _validate_timeouts(operation_timeout, attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_read_rows_operation_timeout, self.default_read_rows_attempt_timeout) row_merger = _ReadRowsOperationAsync( query, @@ -579,8 +569,8 @@ async def read_rows( self, query: ReadRowsQuery, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> list[Row]: """ Read a set of rows from the table, based on the specified query. @@ -619,8 +609,8 @@ async def read_row( row_key: str | bytes, *, row_filter: RowFilter | None = None, - operation_timeout: int | float | None = None, - attempt_timeout: int | float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> Row | None: """ Read a single row from the table, based on the specified key. 
@@ -661,8 +651,8 @@ async def read_rows_sharded( self, sharded_query: ShardedQuery, *, - operation_timeout: int | float | None = None, - attempt_timeout: int | float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> list[Row]: """ Runs a sharded query in parallel, then return the results in a single list. @@ -694,15 +684,7 @@ async def read_rows_sharded( if not sharded_query: raise ValueError("empty sharded_query") # reduce operation_timeout between batches - operation_timeout = ( - operation_timeout or self.default_read_rows_operation_timeout - ) - attempt_timeout = ( - attempt_timeout - or self.default_read_rows_attempt_timeout - or operation_timeout - ) - _validate_timeouts(operation_timeout, attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_read_rows_operation_timeout, self.default_read_rows_attempt_timeout) timeout_generator = _attempt_timeout_generator( operation_timeout, operation_timeout ) @@ -748,8 +730,8 @@ async def row_exists( self, row_key: str | bytes, *, - operation_timeout: int | float | None = None, - attempt_timeout: int | float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> bool: """ Return a boolean indicating whether the specified row exists in the table. @@ -789,8 +771,8 @@ async def row_exists( async def sample_row_keys( self, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> RowKeySamples: """ Return a set of RowKeySamples that delimit contiguous sections of the table of @@ -821,12 +803,7 @@ async def sample_row_keys( - GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts - operation_timeout = operation_timeout or self.default_operation_timeout - attempt_timeout = ( - attempt_timeout or self.default_attempt_timeout or operation_timeout - ) - _validate_timeouts(operation_timeout, attempt_timeout) - + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_operation_timeout, self.default_attempt_timeout) attempt_timeout_gen = _attempt_timeout_generator( attempt_timeout, operation_timeout ) @@ -877,8 +854,8 @@ def mutations_batcher( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, - batch_attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + batch_attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. 
@@ -918,8 +895,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ): """ Mutates a row atomically. @@ -948,11 +925,7 @@ async def mutate_row( - GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. """ - operation_timeout = operation_timeout or self.default_operation_timeout - attempt_timeout = ( - attempt_timeout or self.default_attempt_timeout or operation_timeout - ) - _validate_timeouts(operation_timeout, attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_operation_timeout, self.default_attempt_timeout) if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -1004,8 +977,8 @@ async def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = TABLE_DEFAULT, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ): """ Applies mutations for multiple rows in a single batched request. @@ -1035,15 +1008,7 @@ async def bulk_mutate_rows( - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions """ - operation_timeout = ( - operation_timeout or self.default_mutate_rows_operation_timeout - ) - attempt_timeout = ( - attempt_timeout - or self.default_mutate_rows_attempt_timeout - or operation_timeout - ) - _validate_timeouts(operation_timeout, attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_mutate_rows_operation_timeout, self.default_mutate_rows_attempt_timeout) operation = _MutateRowsOperationAsync( self.client._gapic_client, @@ -1096,7 +1061,7 @@ async def check_and_mutate_row( Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout = operation_timeout or self.default_operation_timeout + operation_timeout = operation_timeout if isinstance(operation_timeout, float) else self.default_operation_timeout if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key @@ -1132,7 +1097,7 @@ async def read_modify_write_row( row_key: str | bytes, rules: ReadModifyWriteRule | list[ReadModifyWriteRule], *, - operation_timeout: int | float | None = None, + operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ) -> Row: """ Reads and modifies a row atomically according to input ReadModifyWriteRules, @@ -1149,15 +1114,15 @@ async def read_modify_write_row( Rules are applied in order, meaning that earlier rules will affect the results of later ones. - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will not be retried. Defaults to the Table's default_operation_timeout - if None. + Failed requests will not be retried. 
+ If TABLE_DEFAULT, defaults to the Table's default_operation_timeout Returns: - Row: containing cell data that was modified as part of the operation Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout = operation_timeout or self.default_operation_timeout + operation_timeout = operation_timeout if isinstance(operation_timeout, float) else self.default_operation_timeout row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 01bac34d6..3f600916e 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -23,7 +23,7 @@ from google.cloud.bigtable.data.mutations import RowMutationEntry from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup from google.cloud.bigtable.data.exceptions import FailedMutationEntryError -from google.cloud.bigtable.data._helpers import _validate_timeouts +from google.cloud.bigtable.data._helpers import _get_timeouts from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync from google.cloud.bigtable.data._async._mutate_rows import ( @@ -208,15 +208,7 @@ def __init__( If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. """ - self._operation_timeout: float = ( - batch_operation_timeout or table.default_mutate_rows_operation_timeout - ) - self._attempt_timeout: float = ( - batch_attempt_timeout - or table.default_mutate_rows_attempt_timeout - or self._operation_timeout - ) - _validate_timeouts(self._operation_timeout, self._attempt_timeout) + self._operation_timeout, self._attempt_timeout = _get_timeouts(batch_operation_timeout, batch_attempt_timeout, table.default_mutate_rows_operation_timeout, table.default_mutate_rows_attempt_timeout) self.closed: bool = False self._table = table self._staged_entries: list[RowMutationEntry] = [] diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 1f8a63d21..9b7375e29 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -13,7 +13,7 @@ # from __future__ import annotations -from typing import Callable, Any +from typing import Callable, Literal, Any import time from google.api_core import exceptions as core_exceptions @@ -114,6 +114,19 @@ def wrapper(*args, **kwargs): return wrapper_async if is_async else wrapper +def _get_timeouts(operation: float | Literal["TABLE_DEFAULT"], attempt: float | None | Literal["TABLE_DEFAULT"], table_operation: float, table_attempt: float | None) -> tuple[float, float]: + final_operation = operation if isinstance(operation, float) else table_operation + if attempt is None: + final_attempt = final_operation + elif attempt == "TABLE_DEFAULT": + final_attempt = table_attempt if table_attempt is not None else final_operation + else: + final_attempt = attempt + + _validate_timeouts(final_operation, final_attempt, allow_none=False) + return final_operation, final_attempt + + def _validate_timeouts( operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False ): From 2809d9d0327f45c83a39b2919d6c3e5664ec60a4 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 20 Oct 2023 15:30:47 -0700 Subject: [PATCH 03/13] clean up --- 
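Note on the _get_timeouts helper introduced in patch 02: the resolution order is explicit float first, then the "TABLE_DEFAULT" sentinel falling back to the table-level default, with None inheriting the resolved operation timeout. A minimal standalone sketch of that logic, omitting the _validate_timeouts check (the 60.0/20.0 table defaults below are illustrative values, not taken from the patch):

    def resolve(operation, attempt, table_operation, table_attempt):
        # an explicit float always wins over the sentinel
        final_operation = operation if isinstance(operation, float) else table_operation
        if attempt is None:
            # None means "inherit the operation budget"
            final_attempt = final_operation
        elif attempt == "TABLE_DEFAULT":
            # sentinel: use the table default, or the operation budget if unset
            final_attempt = table_attempt if table_attempt is not None else final_operation
        else:
            final_attempt = attempt
        return final_operation, final_attempt

    assert resolve("TABLE_DEFAULT", "TABLE_DEFAULT", 60.0, 20.0) == (60.0, 20.0)
    assert resolve(5.0, None, 60.0, 20.0) == (5.0, 5.0)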
google/cloud/bigtable/data/_async/client.py | 89 +++++++++++++------ .../bigtable/data/_async/mutations_batcher.py | 11 ++- google/cloud/bigtable/data/_helpers.py | 7 +- 3 files changed, 77 insertions(+), 30 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index b00f97bac..3a8d6656d 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -86,6 +86,8 @@ "_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"] ) +TABLE_DEFAULT = Literal["TABLE_DEFAULT"] + class BigtableDataClientAsync(ClientWithProject): def __init__( @@ -527,8 +529,8 @@ async def read_rows_stream( self, query: ReadRowsQuery, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> AsyncIterable[Row]: """ Read a set of rows from the table, based on the specified query. @@ -555,7 +557,12 @@ async def read_rows_stream( - GoogleAPIError: raised if the request encounters an unrecoverable error - IdleTimeout: if iterator was abandoned """ - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_read_rows_operation_timeout, self.default_read_rows_attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, + attempt_timeout, + self.default_read_rows_operation_timeout, + self.default_read_rows_attempt_timeout, + ) row_merger = _ReadRowsOperationAsync( query, @@ -569,8 +576,8 @@ async def read_rows( self, query: ReadRowsQuery, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> list[Row]: """ Read a set of rows from the table, based on the specified query. @@ -609,8 +616,8 @@ async def read_row( row_key: str | bytes, *, row_filter: RowFilter | None = None, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> Row | None: """ Read a single row from the table, based on the specified key. @@ -651,8 +658,8 @@ async def read_rows_sharded( self, sharded_query: ShardedQuery, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> list[Row]: """ Runs a sharded query in parallel, then return the results in a single list. 
@@ -684,7 +691,12 @@ async def read_rows_sharded( if not sharded_query: raise ValueError("empty sharded_query") # reduce operation_timeout between batches - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_read_rows_operation_timeout, self.default_read_rows_attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, + attempt_timeout, + self.default_read_rows_operation_timeout, + self.default_read_rows_attempt_timeout, + ) timeout_generator = _attempt_timeout_generator( operation_timeout, operation_timeout ) @@ -730,8 +742,8 @@ async def row_exists( self, row_key: str | bytes, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> bool: """ Return a boolean indicating whether the specified row exists in the table. @@ -771,8 +783,8 @@ async def row_exists( async def sample_row_keys( self, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> RowKeySamples: """ Return a set of RowKeySamples that delimit contiguous sections of the table of @@ -803,7 +815,12 @@ async def sample_row_keys( - GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_operation_timeout, self.default_attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, + attempt_timeout, + self.default_operation_timeout, + self.default_attempt_timeout, + ) attempt_timeout_gen = _attempt_timeout_generator( attempt_timeout, operation_timeout ) @@ -854,8 +871,8 @@ def mutations_batcher( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - batch_attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + batch_operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + batch_attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. @@ -895,8 +912,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ): """ Mutates a row atomically. @@ -925,7 +942,12 @@ async def mutate_row( - GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. 
""" - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_operation_timeout, self.default_attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, + attempt_timeout, + self.default_operation_timeout, + self.default_attempt_timeout, + ) if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -977,8 +999,8 @@ async def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", ): """ Applies mutations for multiple rows in a single batched request. @@ -1008,7 +1030,12 @@ async def bulk_mutate_rows( - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions """ - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self.default_mutate_rows_operation_timeout, self.default_mutate_rows_attempt_timeout) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, + attempt_timeout, + self.default_mutate_rows_operation_timeout, + self.default_mutate_rows_attempt_timeout, + ) operation = _MutateRowsOperationAsync( self.client._gapic_client, @@ -1061,7 +1088,11 @@ async def check_and_mutate_row( Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout = operation_timeout if isinstance(operation_timeout, float) else self.default_operation_timeout + operation_timeout = ( + operation_timeout + if isinstance(operation_timeout, float) + else self.default_operation_timeout + ) if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key @@ -1097,7 +1128,7 @@ async def read_modify_write_row( row_key: str | bytes, rules: ReadModifyWriteRule | list[ReadModifyWriteRule], *, - operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", ) -> Row: """ Reads and modifies a row atomically according to input ReadModifyWriteRules, @@ -1114,7 +1145,7 @@ async def read_modify_write_row( Rules are applied in order, meaning that earlier rules will affect the results of later ones. - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will not be retried. + Failed requests will not be retried. 
If TABLE_DEFAULT, defaults to the Table's default_operation_timeout Returns: - Row: containing cell data that was modified as part of the @@ -1122,7 +1153,11 @@ async def read_modify_write_row( Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout = operation_timeout if isinstance(operation_timeout, float) else self.default_operation_timeout + operation_timeout = ( + operation_timeout + if isinstance(operation_timeout, float) + else self.default_operation_timeout + ) row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 3f600916e..90279b9dd 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -190,7 +190,9 @@ def __init__( flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - batch_attempt_timeout: float | None | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + batch_attempt_timeout: float + | None + | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", ): """ Args: @@ -208,7 +210,12 @@ def __init__( If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. """ - self._operation_timeout, self._attempt_timeout = _get_timeouts(batch_operation_timeout, batch_attempt_timeout, table.default_mutate_rows_operation_timeout, table.default_mutate_rows_attempt_timeout) + self._operation_timeout, self._attempt_timeout = _get_timeouts( + batch_operation_timeout, + batch_attempt_timeout, + table.default_mutate_rows_operation_timeout, + table.default_mutate_rows_attempt_timeout, + ) self.closed: bool = False self._table = table self._staged_entries: list[RowMutationEntry] = [] diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 9b7375e29..1485087c1 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -114,7 +114,12 @@ def wrapper(*args, **kwargs): return wrapper_async if is_async else wrapper -def _get_timeouts(operation: float | Literal["TABLE_DEFAULT"], attempt: float | None | Literal["TABLE_DEFAULT"], table_operation: float, table_attempt: float | None) -> tuple[float, float]: +def _get_timeouts( + operation: float | Literal["TABLE_DEFAULT"], + attempt: float | None | Literal["TABLE_DEFAULT"], + table_operation: float, + table_attempt: float | None, +) -> tuple[float, float]: final_operation = operation if isinstance(operation, float) else table_operation if attempt is None: final_attempt = final_operation From c24601be59ce59f92ca020b186f4709282c9d2dd Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 20 Oct 2023 16:01:27 -0700 Subject: [PATCH 04/13] use enum for table defaults --- google/cloud/bigtable/data/_async/client.py | 137 +++++++----------- .../bigtable/data/_async/mutations_batcher.py | 13 +- google/cloud/bigtable/data/_helpers.py | 28 ++-- 3 files changed, 75 insertions(+), 103 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 3a8d6656d..a18e37a64 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -32,6 +32,7 @@ import sys import random import os +import enum 
from collections import namedtuple @@ -86,7 +87,11 @@ "_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"] ) -TABLE_DEFAULT = Literal["TABLE_DEFAULT"] + +class _TABLE_DEFAULT(enum.Enum): + DEFAULT = "DEFAULT" + READ_ROWS = "READ_ROWS_DEFAULT" + MUTATE_ROWS = "MUTATE_ROWS_DEFAULT" class BigtableDataClientAsync(ClientWithProject): @@ -529,8 +534,8 @@ async def read_rows_stream( self, query: ReadRowsQuery, *, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, ) -> AsyncIterable[Row]: """ Read a set of rows from the table, based on the specified query. @@ -542,11 +547,11 @@ async def read_rows_stream( - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout + Defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout + Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. Returns: - an asynchronous iterator that yields rows returned by the query @@ -557,12 +562,7 @@ async def read_rows_stream( - GoogleAPIError: raised if the request encounters an unrecoverable error - IdleTimeout: if iterator was abandoned """ - operation_timeout, attempt_timeout = _get_timeouts( - operation_timeout, - attempt_timeout, - self.default_read_rows_operation_timeout, - self.default_read_rows_attempt_timeout, - ) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) row_merger = _ReadRowsOperationAsync( query, @@ -576,8 +576,8 @@ async def read_rows( self, query: ReadRowsQuery, *, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, ) -> list[Row]: """ Read a set of rows from the table, based on the specified query. @@ -590,11 +590,11 @@ async def read_rows( - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout + Defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. 
Returns: - a list of Rows returned by the query @@ -616,8 +616,8 @@ async def read_row( row_key: str | bytes, *, row_filter: RowFilter | None = None, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, ) -> Row | None: """ Read a single row from the table, based on the specified key. @@ -628,11 +628,11 @@ async def read_row( - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout + Defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. Returns: - a Row object if the row exists, otherwise None @@ -658,8 +658,8 @@ async def read_rows_sharded( self, sharded_query: ShardedQuery, *, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, ) -> list[Row]: """ Runs a sharded query in parallel, then return the results in a single list. @@ -678,11 +678,11 @@ async def read_rows_sharded( - sharded_query: a sharded query to execute - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout + Defaults to the Table's default_read_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout. + Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. 
        Raises:
        - ShardedReadRowsExceptionGroup: if any of the queries failed
        - ValueError: if the query_list is empty
        """
        if not sharded_query:
            raise ValueError("empty sharded_query")
        # reduce operation_timeout between batches
-        operation_timeout, attempt_timeout = _get_timeouts(
-            operation_timeout,
-            attempt_timeout,
-            self.default_read_rows_operation_timeout,
-            self.default_read_rows_attempt_timeout,
-        )
+        operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self)
         timeout_generator = _attempt_timeout_generator(
             operation_timeout, operation_timeout
         )
@@ -742,8 +737,8 @@ async def row_exists(
         self,
         row_key: str | bytes,
         *,
-        operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT",
-        attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT",
+        operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS,
+        attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS,
     ) -> bool:
         """
         Return a boolean indicating whether the specified row exists in the table.

         Args:
         - row_key: the key of the row to check
         - operation_timeout: the time budget for the entire operation, in seconds.
             Failed requests will be retried within the budget.
-            If TABLE_DEFAULT, defaults to the Table's default_read_rows_operation_timeout
+            Defaults to the Table's default_read_rows_operation_timeout
         - attempt_timeout: the time budget for an individual network request, in seconds.
             If it takes longer than this time to complete, the request will be cancelled with
             a DeadlineExceeded exception, and a retry will be attempted.
-            If TABLE_DEFAULT, defaults to the Table's default_read_rows_attempt_timeout.
+            Defaults to the Table's default_read_rows_attempt_timeout.
             If None, defaults to operation_timeout.
         Returns:
         - a bool indicating whether the row exists
@@ -778,8 +773,8 @@ async def sample_row_keys(
         self,
         *,
-        operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT",
-        attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT",
+        operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT,
+        attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT,
     ) -> RowKeySamples:
         """
         Return a set of RowKeySamples that delimit contiguous sections of the table of
@@ -794,12 +789,12 @@ async def sample_row_keys(
         Args:
         - operation_timeout: the time budget for the entire operation, in seconds.
             Failed requests will be retried within the budget.
-            If TABLE_DEFAULT, defaults to the Table's default_operation_timeout
+            Defaults to the Table's default_operation_timeout
         - attempt_timeout: the time budget for an individual network request, in seconds.
             If it takes longer than this time to complete, the request will be cancelled with
             a DeadlineExceeded exception, and a retry will be attempted.
-            If TABLE_DEFAULT, defaults to the Table's default_attempt_timeout.
+            Defaults to the Table's default_attempt_timeout.
             If None, defaults to operation_timeout.
Returns: - a set of RowKeySamples the delimit contiguous sections of the table @@ -815,12 +810,7 @@ async def sample_row_keys( - GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts - operation_timeout, attempt_timeout = _get_timeouts( - operation_timeout, - attempt_timeout, - self.default_operation_timeout, - self.default_attempt_timeout, - ) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) attempt_timeout_gen = _attempt_timeout_generator( attempt_timeout, operation_timeout ) @@ -871,8 +861,8 @@ def mutations_batcher( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - batch_attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + batch_operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, + batch_attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. @@ -889,9 +879,9 @@ def mutations_batcher( - flow_control_max_mutation_count: Maximum number of inflight mutations. - flow_control_max_bytes: Maximum number of inflight bytes. - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. + Defaults to the Table's default_mutate_rows_operation_timeout - batch_attempt_timeout: timeout for each individual request, in seconds. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. + Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. Returns: - a MutationsBatcherAsync context manager that can batch requests @@ -912,8 +902,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, + attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, ): """ Mutates a row atomically. @@ -929,11 +919,11 @@ async def mutate_row( - mutations: the set of mutations to apply to the row - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If TABLE_DEFAULT, defaults to the Table's default_operation_timeout + Defaults to the Table's default_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If TABLE_DEFAULT, defaults to the Table's default_attempt_timeout. + Defaults to the Table's default_attempt_timeout. If None, defaults to operation_timeout. Raises: - DeadlineExceeded: raised after operation timeout @@ -942,12 +932,7 @@ async def mutate_row( - GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. 
""" - operation_timeout, attempt_timeout = _get_timeouts( - operation_timeout, - attempt_timeout, - self.default_operation_timeout, - self.default_attempt_timeout, - ) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -999,8 +984,8 @@ async def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], *, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", - attempt_timeout: float | None | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, + attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, ): """ Applies mutations for multiple rows in a single batched request. @@ -1020,22 +1005,17 @@ async def bulk_mutate_rows( in arbitrary order - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout + Defaults to the Table's default_mutate_rows_operation_timeout - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout + Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to operation_timeout. Raises: - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions """ - operation_timeout, attempt_timeout = _get_timeouts( - operation_timeout, - attempt_timeout, - self.default_mutate_rows_operation_timeout, - self.default_mutate_rows_attempt_timeout, - ) + operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) operation = _MutateRowsOperationAsync( self.client._gapic_client, @@ -1053,7 +1033,7 @@ async def check_and_mutate_row( *, true_case_mutations: Mutation | list[Mutation] | None = None, false_case_mutations: Mutation | list[Mutation] | None = None, - operation_timeout: int | float | None = None, + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, ) -> bool: """ Mutates a row atomically based on the output of a predicate filter @@ -1082,19 +1062,12 @@ async def check_and_mutate_row( `true_case_mutations is empty, and at most 100000. - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will not be retried. Defaults to the Table's default_operation_timeout - if None. Returns: - bool indicating whether the predicate was true or false Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout = ( - operation_timeout - if isinstance(operation_timeout, float) - else self.default_operation_timeout - ) - if operation_timeout <= 0: - raise ValueError("operation_timeout must be greater than 0") + operation_timeout. 
_ = _get_timeouts(operation_timeout, None, self) row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key if true_case_mutations is not None and not isinstance( true_case_mutations, list @@ -1128,7 +1101,7 @@ async def read_modify_write_row( row_key: str | bytes, rules: ReadModifyWriteRule | list[ReadModifyWriteRule], *, - operation_timeout: float | TABLE_DEFAULT = "TABLE_DEFAULT", + operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, ) -> Row: """ Reads and modifies a row atomically according to input ReadModifyWriteRules, @@ -1146,18 +1119,14 @@ async def read_modify_write_row( results of later ones. - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will not be retried. - If TABLE_DEFAULT, defaults to the Table's default_operation_timeout + Defaults to the Table's default_operation_timeout. Returns: - Row: containing cell data that was modified as part of the operation Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout = ( - operation_timeout - if isinstance(operation_timeout, float) - else self.default_operation_timeout - ) + operation_timeout. _ = _get_timeouts(operation_timeout, None, self) row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 90279b9dd..24d2b094a 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -189,10 +189,8 @@ def __init__( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", - batch_attempt_timeout: float - | None - | Literal["TABLE_DEFAULT"] = "TABLE_DEFAULT", + batch_operation_timeout: float | "_TABLE_DEFAULT" = "MUTATE_ROWS_DEFAULT", + batch_attempt_timeout: float | None | "_TABLE_DEFAULT" = "MUTATE_ROWS_DEFAULT", ): """ Args: @@ -210,12 +208,7 @@ def __init__( If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. 
""" - self._operation_timeout, self._attempt_timeout = _get_timeouts( - batch_operation_timeout, - batch_attempt_timeout, - table.default_mutate_rows_operation_timeout, - table.default_mutate_rows_attempt_timeout, - ) + self._operation_timeout, self._attempt_timeout = _get_timeouts(batch_operation_timeout, batch_attempt_timeout, table) self.closed: bool = False self._table = table self._staged_entries: list[RowMutationEntry] = [] diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 1485087c1..148df938b 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -114,17 +114,27 @@ def wrapper(*args, **kwargs): return wrapper_async if is_async else wrapper -def _get_timeouts( - operation: float | Literal["TABLE_DEFAULT"], - attempt: float | None | Literal["TABLE_DEFAULT"], - table_operation: float, - table_attempt: float | None, -) -> tuple[float, float]: - final_operation = operation if isinstance(operation, float) else table_operation +def _get_timeouts(operation: float | "_TABLE_DEFAULT", attempt: float | None | "_TABLE_DEFAULT", table) -> tuple[float, float]: + # TODO: docstring + # TODO: use enum for _TABLE_DEFAULT + if operation == "DEFAULT": + final_operation = table.default_operation_timeout + elif operation == "READ_ROWS_DEFAULT": + final_operation = table.default_read_rows_operation_timeout + elif operation == "MUTATE_ROWS_DEFAULT": + final_operation = table.default_mutate_rows_operation_timeout + else: + final_operation = operation + + if attempt == "DEFAULT": + attempt = table.default_attempt_timeout + elif attempt == "READ_ROWS_DEFAULT": + attempt = table.default_read_rows_attempt_timeout + elif attempt == "MUTATE_ROWS_DEFAULT": + attempt = table.default_mutate_rows_attempt_timeout + if attempt is None: final_attempt = final_operation - elif attempt == "TABLE_DEFAULT": - final_attempt = table_attempt if table_attempt is not None else final_operation else: final_attempt = attempt From b32d6bb33bf95755b379bfc570d5656a0f51412e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 25 Oct 2023 09:00:58 -0700 Subject: [PATCH 05/13] made TABLE_DEFAULTS public --- google/cloud/bigtable/data/_async/client.py | 66 +++++++++++-------- .../bigtable/data/_async/mutations_batcher.py | 4 +- google/cloud/bigtable/data/_helpers.py | 4 +- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index a18e37a64..f7a8ef2b6 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -88,7 +88,7 @@ ) -class _TABLE_DEFAULT(enum.Enum): +class TABLE_DEFAULT(enum.Enum): DEFAULT = "DEFAULT" READ_ROWS = "READ_ROWS_DEFAULT" MUTATE_ROWS = "MUTATE_ROWS_DEFAULT" @@ -534,8 +534,8 @@ async def read_rows_stream( self, query: ReadRowsQuery, *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> AsyncIterable[Row]: """ Read a set of rows from the table, based on the specified query. 
@@ -562,7 +562,9 @@ async def read_rows_stream( - GoogleAPIError: raised if the request encounters an unrecoverable error - IdleTimeout: if iterator was abandoned """ - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, attempt_timeout, self + ) row_merger = _ReadRowsOperationAsync( query, @@ -576,8 +578,8 @@ async def read_rows( self, query: ReadRowsQuery, *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> list[Row]: """ Read a set of rows from the table, based on the specified query. @@ -616,8 +618,8 @@ async def read_row( row_key: str | bytes, *, row_filter: RowFilter | None = None, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> Row | None: """ Read a single row from the table, based on the specified key. @@ -658,8 +660,8 @@ async def read_rows_sharded( self, sharded_query: ShardedQuery, *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> list[Row]: """ Runs a sharded query in parallel, then return the results in a single list. @@ -691,7 +693,9 @@ async def read_rows_sharded( if not sharded_query: raise ValueError("empty sharded_query") # reduce operation_timeout between batches - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, attempt_timeout, self + ) timeout_generator = _attempt_timeout_generator( operation_timeout, operation_timeout ) @@ -737,8 +741,8 @@ async def row_exists( self, row_key: str | bytes, *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.READ_ROWS, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.READ_ROWS, ) -> bool: """ Return a boolean indicating whether the specified row exists in the table. 
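
NOTE (editorial illustration, not part of the diff): `_attempt_timeout_generator`,
consumed by read_rows_sharded above and by sample_row_keys below, is never shown
in this series, so the following is only a plausible sketch of its contract;
every name in it is an assumption. The idea: yield a per-attempt budget on each
retry, capped by whatever remains of the overall operation budget.

    import time
    from typing import Generator, Optional

    def attempt_timeout_generator_sketch(
        per_attempt: Optional[float], operation: float
    ) -> Generator[float, None, None]:
        # hand out per-attempt budgets without ever exceeding the time
        # remaining in the overall operation
        deadline = time.monotonic() + operation
        per_attempt = operation if per_attempt is None else per_attempt
        while True:
            yield max(0.0, min(per_attempt, deadline - time.monotonic()))
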
@@ -778,8 +782,8 @@ async def row_exists( async def sample_row_keys( self, *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, ) -> RowKeySamples: """ Return a set of RowKeySamples that delimit contiguous sections of the table of @@ -810,7 +814,9 @@ async def sample_row_keys( - GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, attempt_timeout, self + ) attempt_timeout_gen = _attempt_timeout_generator( attempt_timeout, operation_timeout ) @@ -861,8 +867,8 @@ def mutations_batcher( flush_limit_bytes: int = 20 * _MB_SIZE, flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, - batch_operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, - batch_attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, + batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, + batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. @@ -902,8 +908,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, ): """ Mutates a row atomically. @@ -932,7 +938,9 @@ async def mutate_row( - GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. """ - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, attempt_timeout, self + ) if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -984,8 +992,8 @@ async def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, - attempt_timeout: float | None | _TABLE_DEFAULT = _TABLE_DEFAULT.MUTATE_ROWS, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, + attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ): """ Applies mutations for multiple rows in a single batched request. 
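
NOTE (editorial illustration, not part of the diff): with the MUTATE_ROWS
defaults in the mutations_batcher factory above, a batcher no longer needs
timeout arguments at construction. A minimal sketch, assuming `table` is a
Table instance and `entry` is a RowMutationEntry (both hypothetical here):

    async with table.mutations_batcher() as batcher:
        # inherits default_mutate_rows_operation_timeout / _attempt_timeout
        await batcher.append(entry)

    async with table.mutations_batcher(batch_operation_timeout=120.0) as batcher:
        await batcher.append(entry)  # explicit budget overrides the table default
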
@@ -1015,7 +1023,9 @@ async def bulk_mutate_rows( - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions """ - operation_timeout, attempt_timeout = _get_timeouts(operation_timeout, attempt_timeout, self) + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, attempt_timeout, self + ) operation = _MutateRowsOperationAsync( self.client._gapic_client, @@ -1033,7 +1043,7 @@ async def check_and_mutate_row( *, true_case_mutations: Mutation | list[Mutation] | None = None, false_case_mutations: Mutation | list[Mutation] | None = None, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, ) -> bool: """ Mutates a row atomically based on the output of a predicate filter @@ -1067,7 +1077,7 @@ async def check_and_mutate_row( Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout. _ = _get_timeouts(operation_timeout, None, self) + operation_timeout._ = _get_timeouts(operation_timeout, None, self) row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key if true_case_mutations is not None and not isinstance( true_case_mutations, list @@ -1101,7 +1111,7 @@ async def read_modify_write_row( row_key: str | bytes, rules: ReadModifyWriteRule | list[ReadModifyWriteRule], *, - operation_timeout: float | _TABLE_DEFAULT = _TABLE_DEFAULT.DEFAULT, + operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.DEFAULT, ) -> Row: """ Reads and modifies a row atomically according to input ReadModifyWriteRules, @@ -1126,7 +1136,7 @@ async def read_modify_write_row( Raises: - GoogleAPIError exceptions from grpc call """ - operation_timeout. _ = _get_timeouts(operation_timeout, None, self) + operation_timeout._ = _get_timeouts(operation_timeout, None, self) row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 24d2b094a..27daf6a27 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -208,7 +208,9 @@ def __init__( If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. 
""" - self._operation_timeout, self._attempt_timeout = _get_timeouts(batch_operation_timeout, batch_attempt_timeout, table) + self._operation_timeout, self._attempt_timeout = _get_timeouts( + batch_operation_timeout, batch_attempt_timeout, table + ) self.closed: bool = False self._table = table self._staged_entries: list[RowMutationEntry] = [] diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 148df938b..e2effdbf1 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -114,7 +114,9 @@ def wrapper(*args, **kwargs): return wrapper_async if is_async else wrapper -def _get_timeouts(operation: float | "_TABLE_DEFAULT", attempt: float | None | "_TABLE_DEFAULT", table) -> tuple[float, float]: +def _get_timeouts( + operation: float | "_TABLE_DEFAULT", attempt: float | None | "_TABLE_DEFAULT", table +) -> tuple[float, float]: # TODO: docstring # TODO: use enum for _TABLE_DEFAULT if operation == "DEFAULT": From e31d765fd03bf4495c7cbdbe57a8c04ca7339bb7 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 15:02:42 -0700 Subject: [PATCH 06/13] fixed typing issues --- google/cloud/bigtable/data/__init__.py | 12 +++--- google/cloud/bigtable/data/_async/client.py | 32 +++++----------- .../bigtable/data/_async/mutations_batcher.py | 7 ++-- google/cloud/bigtable/data/_helpers.py | 38 ++++++++++++++++--- 4 files changed, 50 insertions(+), 39 deletions(-) diff --git a/google/cloud/bigtable/data/__init__.py b/google/cloud/bigtable/data/__init__.py index 4b01d0e6b..a68be5417 100644 --- a/google/cloud/bigtable/data/__init__.py +++ b/google/cloud/bigtable/data/__init__.py @@ -13,9 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
#
-
-from typing import List, Tuple
-
 from google.cloud.bigtable import gapic_version as package_version
 
 from google.cloud.bigtable.data._async.client import BigtableDataClientAsync
@@ -44,10 +41,10 @@
 from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
 from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
 
-# Type alias for the output of sample_keys
-RowKeySamples = List[Tuple[bytes, int]]
-# type alias for the output of query.shard()
-ShardedQuery = List[ReadRowsQuery]
+from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
+from google.cloud.bigtable.data._helpers import RowKeySamples
+from google.cloud.bigtable.data._helpers import ShardedQuery
+
 
 __version__: str = package_version.__version__
 
@@ -74,4 +71,5 @@
     "MutationsExceptionGroup",
     "ShardedReadRowsExceptionGroup",
     "ShardedQuery",
+    "TABLE_DEFAULT",
 )
diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py
index f7a8ef2b6..bcdb85d12 100644
--- a/google/cloud/bigtable/data/_async/client.py
+++ b/google/cloud/bigtable/data/_async/client.py
@@ -21,7 +21,6 @@
     AsyncIterable,
     Optional,
     Set,
-    Literal,
     TYPE_CHECKING,
 )
 
@@ -32,9 +31,7 @@
 import sys
 import random
 import os
-import enum
 
-from collections import namedtuple
 from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
 from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
@@ -61,37 +58,26 @@
 from google.cloud.bigtable.data.mutations import Mutation, RowMutationEntry
 from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
+from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
+from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
+from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
 from google.cloud.bigtable.data._helpers import _make_metadata
 from google.cloud.bigtable.data._helpers import _convert_retry_deadline
 from google.cloud.bigtable.data._helpers import _validate_timeouts
 from google.cloud.bigtable.data._helpers import _get_timeouts
+from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
 from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync
 from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE
-from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
-
 from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
 from google.cloud.bigtable.data.row_filters import RowFilter
 from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
 from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
 from google.cloud.bigtable.data.row_filters import RowFilterChain
 
-if TYPE_CHECKING:
-    from google.cloud.bigtable.data import RowKeySamples
-    from google.cloud.bigtable.data import ShardedQuery
-
-# used by read_rows_sharded to limit how many requests are attempted in parallel
-_CONCURRENCY_LIMIT = 10
-# used to register instance data with the client for channel warming
-_WarmedInstanceKey = namedtuple(
-    "_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"]
-)
-
-
-class TABLE_DEFAULT(enum.Enum):
-    DEFAULT = "DEFAULT"
-    READ_ROWS = "READ_ROWS_DEFAULT"
-    MUTATE_ROWS = "MUTATE_ROWS_DEFAULT"
+
+if TYPE_CHECKING:
+    from google.cloud.bigtable.data._types import RowKeySamples
+    from google.cloud.bigtable.data._types import ShardedQuery
 
 
 class BigtableDataClientAsync(ClientWithProject):
@@ -1077,7 +1077,7 @@ async def check_and_mutate_row(
         Raises:
         - GoogleAPIError exceptions from grpc call
         """
-        operation_timeout. _ = _get_timeouts(operation_timeout, None, self)
+        operation_timeout._ = _get_timeouts(operation_timeout, None, self)
         row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key
         if true_case_mutations is not None and not isinstance(
             true_case_mutations, list
@@ -1136,7 +1122,7 @@ async def read_modify_write_row(
         Raises:
         - GoogleAPIError exceptions from grpc call
         """
-        operation_timeout. _ = _get_timeouts(operation_timeout, None, self)
+        operation_timeout._ = _get_timeouts(operation_timeout, None, self)
         row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key
         if operation_timeout <= 0:
             raise ValueError("operation_timeout must be greater than 0")
diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py
index 27daf6a27..7ff5f9a0b 100644
--- a/google/cloud/bigtable/data/_async/mutations_batcher.py
+++ b/google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -14,7 +14,7 @@
 #
 from __future__ import annotations
 
-from typing import Any, Literal, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
 import asyncio
 import atexit
 import warnings
@@ -24,6 +24,7 @@
 from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
 from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
 from google.cloud.bigtable.data._helpers import _get_timeouts
+from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
 
 from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
 from google.cloud.bigtable.data._async._mutate_rows import (
@@ -189,8 +190,8 @@ def __init__(
         flush_limit_bytes: int = 20 * _MB_SIZE,
         flow_control_max_mutation_count: int = 100_000,
         flow_control_max_bytes: int = 100 * _MB_SIZE,
-        batch_operation_timeout: float | "_TABLE_DEFAULT" = "MUTATE_ROWS_DEFAULT",
-        batch_attempt_timeout: float | None | "_TABLE_DEFAULT" = "MUTATE_ROWS_DEFAULT",
+        batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
+        batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
     ):
         """
         Args:
diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py
index e2effdbf1..4b426a830 100644
--- a/google/cloud/bigtable/data/_helpers.py
+++ b/google/cloud/bigtable/data/_helpers.py
@@ -13,8 +13,11 @@
 #
 from __future__ import annotations
 
-from typing import Callable, Literal, Any
+from typing import Callable, List, Tuple, Any
 import time
+import enum
+from collections import namedtuple
+from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
 
 from google.api_core import exceptions as core_exceptions
 from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
@@ -23,6 +26,30 @@
 Helper functions used in various places in the library.
""" +# Type alias for the output of sample_keys +RowKeySamples = List[Tuple[bytes, int]] + +# type alias for the output of query.shard() +ShardedQuery = List[ReadRowsQuery] + +# used by read_rows_sharded to limit how many requests are attempted in parallel +_CONCURRENCY_LIMIT = 10 + +# used to register instance data with the client for channel warming +_WarmedInstanceKey = namedtuple( + "_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"] +) + + +# enum used on method calls when table defaults should be used +class TABLE_DEFAULT(enum.Enum): + # default for mutate_row, sample_row_keys, check_and_mutate_row, and read_modify_write_row + DEFAULT = "DEFAULT" + # default for read_rows, read_rows_stream, read_rows_sharded, row_exists, and read_row + READ_ROWS = "READ_ROWS_DEFAULT" + # default for bulk_mutate_rows and mutations_batcher + MUTATE_ROWS = "MUTATE_ROWS_DEFAULT" + def _make_metadata( table_name: str, app_profile_id: str | None @@ -115,15 +142,14 @@ def wrapper(*args, **kwargs): def _get_timeouts( - operation: float | "_TABLE_DEFAULT", attempt: float | None | "_TABLE_DEFAULT", table + operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, table ) -> tuple[float, float]: # TODO: docstring - # TODO: use enum for _TABLE_DEFAULT - if operation == "DEFAULT": + if operation == TABLE_DEFAULT.DEFAULT: final_operation = table.default_operation_timeout - elif operation == "READ_ROWS_DEFAULT": + elif operation == TABLE_DEFAULT.READ_ROWS: final_operation = table.default_read_rows_operation_timeout - elif operation == "MUTATE_ROWS_DEFAULT": + elif operation == TABLE_DEFAULT.MUTATE_ROWS: final_operation = table.default_mutate_rows_operation_timeout else: final_operation = operation From b7164e54be5dc47706c65ca83543e513f5e3ec7d Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 15:04:43 -0700 Subject: [PATCH 07/13] added docstring --- google/cloud/bigtable/data/_helpers.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 4b426a830..4cc861278 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -144,7 +144,19 @@ def wrapper(*args, **kwargs): def _get_timeouts( operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, table ) -> tuple[float, float]: - # TODO: docstring + """ + Convert passed in timeout values to floats, using table defaults if necessary. + + Will call _validate_timeouts on the outputs, and raise ValueError if the + resulting timeouts are invalid. + + Args: + - operation: The timeout value to use for the entire operation, in seconds. + - attempt: The timeout value to use for each attempt, in seconds. + - table: The table to use for default values. 
+ Returns: + - A tuple of (operation_timeout, attempt_timeout) + """ if operation == TABLE_DEFAULT.DEFAULT: final_operation = table.default_operation_timeout elif operation == TABLE_DEFAULT.READ_ROWS: From e8141e3f871037586088e38853c1eb1b0704bce8 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 15:12:40 -0700 Subject: [PATCH 08/13] use enum values in attempt timeout calculation --- google/cloud/bigtable/data/_helpers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 4cc861278..b77ae34b0 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -166,11 +166,11 @@ def _get_timeouts( else: final_operation = operation - if attempt == "DEFAULT": + if attempt == TABLE_DEFAULT.DEFAULT: attempt = table.default_attempt_timeout - elif attempt == "READ_ROWS_DEFAULT": + elif attempt == TABLE_DEFAULT.READ_ROWS: attempt = table.default_read_rows_attempt_timeout - elif attempt == "MUTATE_ROWS_DEFAULT": + elif attempt == TABLE_DEFAULT.MUTATE_ROWS: attempt = table.default_mutate_rows_attempt_timeout if attempt is None: From abb1bbe362fc625367ac9bddd0d5342479f523c9 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 15:53:43 -0700 Subject: [PATCH 09/13] added tests --- google/cloud/bigtable/data/_helpers.py | 10 ++++-- tests/unit/data/test__helpers.py | 48 ++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index b77ae34b0..1d56926ff 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -147,6 +147,8 @@ def _get_timeouts( """ Convert passed in timeout values to floats, using table defaults if necessary. + attempt will use operation value if None, or if larger than operation. + Will call _validate_timeouts on the outputs, and raise ValueError if the resulting timeouts are invalid. @@ -157,6 +159,7 @@ def _get_timeouts( Returns: - A tuple of (operation_timeout, attempt_timeout) """ + # load table defaults if necessary if operation == TABLE_DEFAULT.DEFAULT: final_operation = table.default_operation_timeout elif operation == TABLE_DEFAULT.READ_ROWS: @@ -165,7 +168,6 @@ def _get_timeouts( final_operation = table.default_mutate_rows_operation_timeout else: final_operation = operation - if attempt == TABLE_DEFAULT.DEFAULT: attempt = table.default_attempt_timeout elif attempt == TABLE_DEFAULT.READ_ROWS: @@ -174,9 +176,11 @@ def _get_timeouts( attempt = table.default_mutate_rows_attempt_timeout if attempt is None: + # no timeout specified, use operation timeout for both final_attempt = final_operation else: - final_attempt = attempt + # cap attempt timeout at operation timeout + final_attempt = min(attempt, final_operation) if final_operation else attempt _validate_timeouts(final_operation, final_attempt, allow_none=False) return final_operation, final_attempt @@ -196,6 +200,8 @@ def _validate_timeouts( Raises: - ValueError if operation_timeout or attempt_timeout are invalid. 
""" + if operation_timeout is None: + raise ValueError("operation_timeout cannot be None") if operation_timeout <= 0: raise ValueError("operation_timeout must be greater than 0") if not allow_none and attempt_timeout is None: diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 08bc397c3..3b6867187 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -14,6 +14,7 @@ import pytest import google.cloud.bigtable.data._helpers as _helpers +from google.cloud.bigtable.data._helpers import TABLE_DEFAULT import google.cloud.bigtable.data.exceptions as bigtable_exceptions import mock @@ -199,3 +200,50 @@ def test_validate_with_inputs(self, args, expected): except ValueError: pass assert success == expected + +class TestGetTimeouts(): + @pytest.mark.parametrize("input_times,input_table,expected", + [ + ((2,1), {}, (2,1)), + ((2,4), {}, (2,2)), + ((2,None), {}, (2,2)), + ((TABLE_DEFAULT.DEFAULT, TABLE_DEFAULT.DEFAULT), {"operation": 3, "attempt": 2}, (3,2)), + ((TABLE_DEFAULT.READ_ROWS, TABLE_DEFAULT.READ_ROWS), {"read_rows_operation": 3, "read_rows_attempt": 2}, (3,2)), + ((TABLE_DEFAULT.MUTATE_ROWS, TABLE_DEFAULT.MUTATE_ROWS), {"mutate_rows_operation": 3, "mutate_rows_attempt": 2}, (3,2)), + ((10, TABLE_DEFAULT.DEFAULT), {"attempt":None}, (10, 10)), + ((10, TABLE_DEFAULT.DEFAULT), {"attempt": 5}, (10, 5)), + ((10, TABLE_DEFAULT.DEFAULT), {"attempt": 100}, (10, 10)), + ((TABLE_DEFAULT.DEFAULT, 10), {"operation":12}, (12, 10)), + ((TABLE_DEFAULT.DEFAULT, 10), {"operation":3}, (3, 3)), + ]) + def test_get_timeouts(self, input_times, input_table, expected): + """ + test input/output mappings for a variety of valid inputs + """ + fake_table = mock.Mock() + for key in input_table.keys(): + # set the default fields in our fake table mock + setattr(fake_table, f"default_{key}_timeout", input_table[key]) + t1, t2 = _helpers._get_timeouts(input_times[0], input_times[1], fake_table) + assert t1 == expected[0] + assert t2 == expected[1] + + @pytest.mark.parametrize("input_times,input_table", + [ + ([0, 1], {}), + ([1, 0], {}), + ([None, 1], {}), + ([TABLE_DEFAULT.DEFAULT, 1], {"operation": None}), + ([TABLE_DEFAULT.DEFAULT, 1], {"operation": 0}), + ([1, TABLE_DEFAULT.DEFAULT], {"attempt": 0}), + ]) + def test_get_timeouts_invalid(self, input_times, input_table): + """ + test with inputs that should raise error during validation step + """ + fake_table = mock.Mock() + for key in input_table.keys(): + # set the default fields in our fake table mock + setattr(fake_table, f"default_{key}_timeout", input_table[key]) + with pytest.raises(ValueError): + _helpers._get_timeouts(input_times[0], input_times[1], fake_table) From 140743e7a2bb529ee57f2c90cd6f49e82a836e44 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 15:54:32 -0700 Subject: [PATCH 10/13] ran blacken --- tests/unit/data/test__helpers.py | 53 +++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 3b6867187..6c11fa86a 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -201,21 +201,36 @@ def test_validate_with_inputs(self, args, expected): pass assert success == expected -class TestGetTimeouts(): - @pytest.mark.parametrize("input_times,input_table,expected", - [ - ((2,1), {}, (2,1)), - ((2,4), {}, (2,2)), - ((2,None), {}, (2,2)), - ((TABLE_DEFAULT.DEFAULT, TABLE_DEFAULT.DEFAULT), {"operation": 3, "attempt": 2}, (3,2)), - 
((TABLE_DEFAULT.READ_ROWS, TABLE_DEFAULT.READ_ROWS), {"read_rows_operation": 3, "read_rows_attempt": 2}, (3,2)), - ((TABLE_DEFAULT.MUTATE_ROWS, TABLE_DEFAULT.MUTATE_ROWS), {"mutate_rows_operation": 3, "mutate_rows_attempt": 2}, (3,2)), - ((10, TABLE_DEFAULT.DEFAULT), {"attempt":None}, (10, 10)), - ((10, TABLE_DEFAULT.DEFAULT), {"attempt": 5}, (10, 5)), - ((10, TABLE_DEFAULT.DEFAULT), {"attempt": 100}, (10, 10)), - ((TABLE_DEFAULT.DEFAULT, 10), {"operation":12}, (12, 10)), - ((TABLE_DEFAULT.DEFAULT, 10), {"operation":3}, (3, 3)), - ]) + +class TestGetTimeouts: + @pytest.mark.parametrize( + "input_times,input_table,expected", + [ + ((2, 1), {}, (2, 1)), + ((2, 4), {}, (2, 2)), + ((2, None), {}, (2, 2)), + ( + (TABLE_DEFAULT.DEFAULT, TABLE_DEFAULT.DEFAULT), + {"operation": 3, "attempt": 2}, + (3, 2), + ), + ( + (TABLE_DEFAULT.READ_ROWS, TABLE_DEFAULT.READ_ROWS), + {"read_rows_operation": 3, "read_rows_attempt": 2}, + (3, 2), + ), + ( + (TABLE_DEFAULT.MUTATE_ROWS, TABLE_DEFAULT.MUTATE_ROWS), + {"mutate_rows_operation": 3, "mutate_rows_attempt": 2}, + (3, 2), + ), + ((10, TABLE_DEFAULT.DEFAULT), {"attempt": None}, (10, 10)), + ((10, TABLE_DEFAULT.DEFAULT), {"attempt": 5}, (10, 5)), + ((10, TABLE_DEFAULT.DEFAULT), {"attempt": 100}, (10, 10)), + ((TABLE_DEFAULT.DEFAULT, 10), {"operation": 12}, (12, 10)), + ((TABLE_DEFAULT.DEFAULT, 10), {"operation": 3}, (3, 3)), + ], + ) def test_get_timeouts(self, input_times, input_table, expected): """ test input/output mappings for a variety of valid inputs @@ -228,15 +243,17 @@ def test_get_timeouts(self, input_times, input_table, expected): assert t1 == expected[0] assert t2 == expected[1] - @pytest.mark.parametrize("input_times,input_table", - [ + @pytest.mark.parametrize( + "input_times,input_table", + [ ([0, 1], {}), ([1, 0], {}), ([None, 1], {}), ([TABLE_DEFAULT.DEFAULT, 1], {"operation": None}), ([TABLE_DEFAULT.DEFAULT, 1], {"operation": 0}), ([1, TABLE_DEFAULT.DEFAULT], {"attempt": 0}), - ]) + ], + ) def test_get_timeouts_invalid(self, input_times, input_table): """ test with inputs that should raise error during validation step From bec98db42d36fdaad9382f492e6cd138999258d4 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 15:56:31 -0700 Subject: [PATCH 11/13] fixed mypy imports --- google/cloud/bigtable/data/_async/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index bcdb85d12..c6637581c 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -76,8 +76,8 @@ if TYPE_CHECKING: - from google.cloud.bigtable.data._types import RowKeySamples - from google.cloud.bigtable.data._types import ShardedQuery + from google.cloud.bigtable.data._helpers import RowKeySamples + from google.cloud.bigtable.data._helpers import ShardedQuery class BigtableDataClientAsync(ClientWithProject): From 7334680b8139807a1889d0ad8cdb9735a9365efc Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 16:04:10 -0700 Subject: [PATCH 12/13] pin api-core version in prerelease-deps --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 4b57e617f..c0da91d8a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -460,7 +460,7 @@ def prerelease_deps(session): # Exclude version 1.52.0rc1 which has a known issue. 
See https://github.com/grpc/grpc/issues/32163 "grpcio!=1.52.0rc1", "grpcio-status", - "google-api-core", + "google-api-core==2.12.0.dev1", # TODO: remove this once streaming retries is merged "proto-plus", "google-cloud-testutils", # dependencies of google-cloud-testutils" From 71d62741610df934b98367a604141a70b9c45f15 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 26 Oct 2023 16:19:12 -0700 Subject: [PATCH 13/13] ran black --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index c0da91d8a..e1d2f4acc 100644 --- a/noxfile.py +++ b/noxfile.py @@ -460,7 +460,7 @@ def prerelease_deps(session): # Exclude version 1.52.0rc1 which has a known issue. See https://github.com/grpc/grpc/issues/32163 "grpcio!=1.52.0rc1", "grpcio-status", - "google-api-core==2.12.0.dev1", # TODO: remove this once streaming retries is merged + "google-api-core==2.12.0.dev1", # TODO: remove this once streaming retries is merged "proto-plus", "google-cloud-testutils", # dependencies of google-cloud-testutils"
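
NOTE (editorial illustration, not part of the patches): taken together, the
series pins `_get_timeouts` to the semantics exercised by the tests added in
PATCH 09/13 above. Restated as a sketch, where `table` stands in for any
object carrying the `default_*_timeout` attributes used in those tests:

    from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _get_timeouts

    # with table.default_operation_timeout == 3 and table.default_attempt_timeout == 2:
    _get_timeouts(TABLE_DEFAULT.DEFAULT, TABLE_DEFAULT.DEFAULT, table)  # -> (3, 2)
    _get_timeouts(2, None, table)  # attempt falls back to operation   -> (2, 2)
    _get_timeouts(2, 4, table)     # attempt capped at operation       -> (2, 2)
    _get_timeouts(0, 1, table)     # raises ValueError: operation_timeout must be > 0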