From 3537b11929b970a2a465f1a8e53b9064e4a2813c Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 15:59:20 -0700 Subject: [PATCH 01/19] added docstrings to mutations --- google/cloud/bigtable/data/mutations.py | 149 +++++++++++++++++++++--- 1 file changed, 130 insertions(+), 19 deletions(-) diff --git a/google/cloud/bigtable/data/mutations.py b/google/cloud/bigtable/data/mutations.py index b5729d25e..ae5edbde4 100644 --- a/google/cloud/bigtable/data/mutations.py +++ b/google/cloud/bigtable/data/mutations.py @@ -33,36 +33,75 @@ class Mutation(ABC): - """Model class for mutations""" + """ + Abstract base class for mutations. + + This class defines the interface for different types of mutations that can be + applied to Bigtable rows. + """ @abstractmethod def _to_dict(self) -> dict[str, Any]: + """ + Convert the mutation to a dictionary representation. + + Returns: + dict[str, Any]: A dictionary representation of the mutation. + """ raise NotImplementedError def _to_pb(self) -> data_pb.Mutation: """ - Convert the mutation to protobuf + Convert the mutation to a protobuf representation. + + Returns: + Mutation: A protobuf representation of the mutation. """ return data_pb.Mutation(**self._to_dict()) def is_idempotent(self) -> bool: """ Check if the mutation is idempotent - If false, the mutation will not be retried + + Idempotent mutations can be safely retried on failure. + + Returns: + bool: True if the mutation is idempotent, False otherwise. """ return True def __str__(self) -> str: + """ + Return a string representation of the mutation. + + Returns: + str: A string representation of the mutation. + """ return str(self._to_dict()) def size(self) -> int: """ Get the size of the mutation in bytes + + Returns: + int: The size of the mutation in bytes. """ return getsizeof(self._to_dict()) @classmethod def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation: + """ + Create a `Mutation` instance from a dictionary representation. + + Args: + input_dict (dict[str, Any]): A dictionary representation of the mutation. + + Returns: + Mutation: A Mutation instance created from the dictionary. + + Raises: + ValueError: If the input dictionary is invalid or does not represent a valid mutation type. + """ instance: Mutation | None = None try: if "set_cell" in input_dict: @@ -96,6 +135,24 @@ def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation: class SetCell(Mutation): + """ + Mutation to set the value of a cell. + + Args: + family (str): The name of the column family to which the new cell belongs. + qualifier (bytes | str): The column qualifier of the new cell. + new_value (bytes | str | int): The value of the new cell. + timestamp_micros (int | None): The timestamp of the new cell. If `None`, + the current timestamp will be used. Timestamps will be sent with + millisecond precision. Extra precision will be truncated. If -1, the + server will assign a timestamp. Note that `SetCell` mutations with + server-side timestamps are non-idempotent operations and will not be retried. + + Raises: + TypeError: If `qualifier` is not `bytes` or `str`. + TypeError: If `new_value` is not `bytes`, `str`, or `int`. + ValueError: If `timestamp_micros` is less than `_SERVER_SIDE_TIMESTAMP`. + """ def __init__( self, family: str, @@ -103,18 +160,6 @@ def __init__( new_value: bytes | str | int, timestamp_micros: int | None = None, ): - """ - Mutation to set the value of a cell - - Args: - - family: The name of the column family to which the new cell belongs. 
- - qualifier: The column qualifier of the new cell. - - new_value: The value of the new cell. str or int input will be converted to bytes - - timestamp_micros: The timestamp of the new cell. If None, the current timestamp will be used. - Timestamps will be sent with milisecond-percision. Extra precision will be truncated. - If -1, the server will assign a timestamp. Note that SetCell mutations with server-side - timestamps are non-idempotent operations and will not be retried. - """ qualifier = qualifier.encode() if isinstance(qualifier, str) else qualifier if not isinstance(qualifier, bytes): raise TypeError("qualifier must be bytes or str") @@ -142,7 +187,6 @@ def __init__( self.timestamp_micros = timestamp_micros def _to_dict(self) -> dict[str, Any]: - """Convert the mutation to a dictionary representation""" return { "set_cell": { "family_name": self.family, @@ -153,12 +197,25 @@ def _to_dict(self) -> dict[str, Any]: } def is_idempotent(self) -> bool: - """Check if the mutation is idempotent""" return self.timestamp_micros != _SERVER_SIDE_TIMESTAMP @dataclass class DeleteRangeFromColumn(Mutation): + """ + Mutation to delete a range of cells from a column. + + Args: + family (str): The name of the column family. + qualifier (bytes): The column qualifier. + start_timestamp_micros (int | None): The start timestamp of the range to + delete. `None` represents 0. Defaults to `None`. + end_timestamp_micros (int | None): The end timestamp of the range to + delete. `None` represents infinity. Defaults to `None`. + + Raises: + ValueError: If `start_timestamp_micros` is greater than `end_timestamp_micros`. + """ family: str qualifier: bytes # None represents 0 @@ -191,6 +248,12 @@ def _to_dict(self) -> dict[str, Any]: @dataclass class DeleteAllFromFamily(Mutation): + """ + Mutation to delete all cells from a column family. + + Args: + family_to_delete (str): The name of the column family to delete. + """ family_to_delete: str def _to_dict(self) -> dict[str, Any]: @@ -203,6 +266,9 @@ def _to_dict(self) -> dict[str, Any]: @dataclass class DeleteAllFromRow(Mutation): + """ + Mutation to delete all cells from a row. + """ def _to_dict(self) -> dict[str, Any]: return { "delete_from_row": {}, @@ -210,6 +276,22 @@ def _to_dict(self) -> dict[str, Any]: class RowMutationEntry: + """ + A single entry in a `MutateRows` request. + + This class represents a set of mutations to apply to a specific row in a + Bigtable table. + + Args: + row_key (bytes | str): The key of the row to mutate. + mutations (Mutation | list[Mutation]): The mutation or list of mutations to apply + to the row. + + Raises: + ValueError: If `mutations` is empty or contains more than + `_MUTATE_ROWS_REQUEST_MUTATION_LIMIT` mutations. + """ + def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]): if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -225,29 +307,58 @@ def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]): self.mutations = tuple(mutations) def _to_dict(self) -> dict[str, Any]: + """ + Convert the mutation entry to a dictionary representation. + + Returns: + dict[str, Any]: A dictionary representation of the mutation entry + """ return { "row_key": self.row_key, "mutations": [mutation._to_dict() for mutation in self.mutations], } def _to_pb(self) -> types_pb.MutateRowsRequest.Entry: + """ + Convert the mutation entry to a protobuf representation. + + Returns: + MutateRowsRequest.Entry: A protobuf representation of the mutation entry. 
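For illustration, a minimal sketch of how these mutation types are typically combined (the family, qualifier, and row key names here are invented, not taken from the library):
```
from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell

# a SetCell with an explicit client-side timestamp is idempotent and safe to retry
cell = SetCell(
    family="stats",            # hypothetical column family
    qualifier="views",         # str qualifiers are encoded to bytes
    new_value=1,               # str and int values are converted to bytes
    timestamp_micros=1715900000000000,
)

# bundle one or more mutations against a single row key
entry = RowMutationEntry(row_key="user#123", mutations=[cell])
assert entry.is_idempotent()
```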
+ """ return types_pb.MutateRowsRequest.Entry( row_key=self.row_key, mutations=[mutation._to_pb() for mutation in self.mutations], ) def is_idempotent(self) -> bool: - """Check if the mutation is idempotent""" + """ + Check if all mutations in the entry are idempotent. + + Returns: + bool: True if all mutations in the entry are idempotent, False otherwise. + """ return all(mutation.is_idempotent() for mutation in self.mutations) def size(self) -> int: """ - Get the size of the mutation in bytes + Get the size of the mutation entry in bytes. + + Returns: + int: The size of the mutation entry in bytes. """ return getsizeof(self._to_dict()) @classmethod def _from_dict(cls, input_dict: dict[str, Any]) -> RowMutationEntry: + """ + Create a `RowMutationEntry` instance from a dictionary representation. + + Args: + input_dict (dict[str, Any]): A dictionary representation of the mutation entry. + + Returns: + RowMutationEntry: A RowMutationEntry instance created from the dictionary. + """ return RowMutationEntry( row_key=input_dict["row_key"], mutations=[ From 377d3782fe296f6d22232f6f3ed31a5c0d21dbfd Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 16:10:39 -0700 Subject: [PATCH 02/19] removed - from docstrings in client --- google/cloud/bigtable/data/_async/client.py | 204 ++++++++++---------- 1 file changed, 102 insertions(+), 102 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index e385ecde7..bb8b4ec0c 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -116,8 +116,8 @@ def __init__( Client options used to set user options on the client. API Endpoint should be set through client_options. Raises: - - RuntimeError if called outside of an async context (no running event loop) - - ValueError if pool_size is less than 1 + RuntimeError if called outside of an async context (no running event loop) + ValueError if pool_size is less than 1 """ # set up transport in registry transport_str = f"pooled_grpc_asyncio_{pool_size}" @@ -200,7 +200,7 @@ def _start_background_channel_refresh(self) -> None: """ Starts a background task to ping and warm each channel in the pool Raises: - - RuntimeError if not called in an asyncio event loop + RuntimeError if not called in an asyncio event loop """ if not self._channel_refresh_tasks and not self._emulator_host: # raise RuntimeError if there is no event loop @@ -234,10 +234,10 @@ async def _ping_and_warm_instances( Pings each Bigtable instance registered in `_active_instances` on the client Args: - - channel: grpc channel to warm - - instance_key: if provided, only warm the instance associated with the key + channel: grpc channel to warm + instance_key: if provided, only warm the instance associated with the key Returns: - - sequence of results or exceptions from the ping requests + sequence of results or exceptions from the ping requests """ instance_list = ( [instance_key] if instance_key is not None else self._active_instances @@ -312,7 +312,7 @@ async def _manage_channel( ) # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) - next_sleep = next_refresh - (time.time() - start_timestamp) + next_sleep = next_refresh (time.time() - start_timestamp) async def _register_instance(self, instance_id: str, owner: TableAsync) -> None: """ @@ -323,10 +323,10 @@ async def _register_instance(self, instance_id: str, owner: TableAsync) -> None: 
Channels will not be refreshed unless at least one instance is registered Args: - - instance_id: id of the instance to register. - - owner: table that owns the instance. Owners will be tracked in - _instance_owners, and instances will only be unregistered when all - owners call _remove_instance_registration + instance_id: id of the instance to register. + owner: table that owns the instance. Owners will be tracked in + _instance_owners, and instances will only be unregistered when all + owners call _remove_instance_registration """ instance_name = self._gapic_client.instance_path(self.project, instance_id) instance_key = _WarmedInstanceKey( @@ -354,12 +354,12 @@ async def _remove_instance_registration( If instance_id is not registered, or is still in use by other tables, returns False Args: - - instance_id: id of the instance to remove - - owner: table that owns the instance. Owners will be tracked in + instance_id: id of the instance to remove + owner: table that owns the instance. Owners will be tracked in _instance_owners, and instances will only be unregistered when all owners call _remove_instance_registration Returns: - - True if instance was removed + True if instance was removed """ instance_name = self._gapic_client.instance_path(self.project, instance_id) instance_key = _WarmedInstanceKey( @@ -490,7 +490,7 @@ def __init__( encountered during all other operations. Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) Raises: - - RuntimeError if called outside of an async context (no running event loop) + RuntimeError if called outside of an async context (no running event loop) """ # NOTE: any changes to the signature of this method should also be reflected # in client.get_table() @@ -564,24 +564,24 @@ async def read_rows_stream( retryable_errors list until operation_timeout is reached. Args: - - query: contains details about which rows to return - - operation_timeout: the time budget for the entire operation, in seconds. + query: contains details about which rows to return + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors Returns: - - an asynchronous iterator that yields rows returned by the query + an asynchronous iterator that yields rows returned by the query Raises: - - DeadlineExceeded: raised after operation timeout + DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + GoogleAPIError: raised if the request encounters an unrecoverable error """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -615,26 +615,26 @@ async def read_rows( retryable_errors list until operation_timeout is reached. 
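As a concrete sketch of the call described above (assuming `table` is an open `TableAsync` and the row key is hypothetical):
```
from google.cloud.bigtable.data import ReadRowsQuery

query = ReadRowsQuery(row_keys=[b"user#123"], limit=10)
rows = await table.read_rows(query, operation_timeout=60)
for row in rows:
    print(row.row_key, len(row.cells))
```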
Args: - - query: contains details about which rows to return - - operation_timeout: the time budget for the entire operation, in seconds. + query: contains details about which rows to return + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout if that is also None. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - - a list of Rows returned by the query + a list of Rows returned by the query Raises: - - DeadlineExceeded: raised after operation timeout + DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + GoogleAPIError: raised if the request encounters an unrecoverable error """ row_generator = await self.read_rows_stream( query, @@ -661,24 +661,24 @@ async def read_row( retryable_errors list until operation_timeout is reached. Args: - - query: contains details about which rows to return - - operation_timeout: the time budget for the entire operation, in seconds. + query: contains details about which rows to return + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - - a Row object if the row exists, otherwise None + a Row object if the row exists, otherwise None Raises: - - DeadlineExceeded: raised after operation timeout + DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -716,20 +716,20 @@ async def read_rows_sharded( ``` Args: - - sharded_query: a sharded query to execute - - operation_timeout: the time budget for the entire operation, in seconds. 
+ sharded_query: a sharded query to execute + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Raises: - - ShardedReadRowsExceptionGroup: if any of the queries failed - - ValueError: if the query_list is empty + ShardedReadRowsExceptionGroup: if any of the queries failed + ValueError: if the query_list is empty """ if not sharded_query: raise ValueError("empty sharded_query") @@ -796,24 +796,24 @@ async def row_exists( uses the filters: chain(limit cells per row = 1, strip value) Args: - - row_key: the key of the row to check - - operation_timeout: the time budget for the entire operation, in seconds. + row_key: the key of the row to check + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - - a bool indicating whether the row exists + a bool indicating whether the row exists Raises: - - DeadlineExceeded: raised after operation timeout + DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -850,23 +850,23 @@ async def sample_row_keys( row_keys, along with offset positions in the table Args: - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget.i Defaults to the Table's default_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_attempt_timeout. If None, defaults to operation_timeout. 
- - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_retryable_errors. Returns: - - a set of RowKeySamples the delimit contiguous sections of the table + a set of RowKeySamples the delimit contiguous sections of the table Raises: - - DeadlineExceeded: raised after operation timeout + DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts operation_timeout, attempt_timeout = _get_timeouts( @@ -922,22 +922,22 @@ def mutations_batcher( to avoid excess network calls Args: - - flush_interval: Automatically flush every flush_interval seconds. If None, + flush_interval: Automatically flush every flush_interval seconds. If None, a table default will be used - - flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count + flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count mutations are added across all entries. If None, this limit is ignored. - - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. - - flow_control_max_mutation_count: Maximum number of inflight mutations. - - flow_control_max_bytes: Maximum number of inflight bytes. - - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. + flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. + flow_control_max_mutation_count: Maximum number of inflight mutations. + flow_control_max_bytes: Maximum number of inflight bytes. + batch_operation_timeout: timeout for each mutate_rows operation, in seconds. Defaults to the Table's default_mutate_rows_operation_timeout - - batch_attempt_timeout: timeout for each individual request, in seconds. + batch_attempt_timeout: timeout for each individual request, in seconds. Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. - - batch_retryable_errors: a list of errors that will be retried if encountered. + batch_retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors. Returns: - - a MutationsBatcherAsync context manager that can batch requests + a MutationsBatcherAsync context manager that can batch requests """ return MutationsBatcherAsync( self, @@ -971,26 +971,26 @@ async def mutate_row( retried on server failure. Non-idempotent operations will not. Args: - - row_key: the row to apply mutations to - - mutations: the set of mutations to apply to the row - - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will be retried within the budget. - Defaults to the Table's default_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. - If it takes longer than this time to complete, the request will be cancelled with - a DeadlineExceeded exception, and a retry will be attempted. - Defaults to the Table's default_attempt_timeout. - If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. - Only idempotent mutations will be retried. Defaults to the Table's - default_retryable_errors. 
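A short sketch of the resulting call shape, with illustrative names (assuming `table` is an open `TableAsync`):
```
# apply a single idempotent mutation directly to one row
await table.mutate_row(
    b"user#123",
    SetCell("stats", "views", 1),
    operation_timeout=60,
)
```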
+ row_key: the row to apply mutations to + mutations: the set of mutations to apply to the row + operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + Defaults to the Table's default_operation_timeout + attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + Defaults to the Table's default_attempt_timeout. + If None, defaults to operation_timeout. + retryable_errors: a list of errors that will be retried if encountered. + Only idempotent mutations will be retried. Defaults to the Table's + default_retryable_errors. Raises: - - DeadlineExceeded: raised after operation timeout - will be chained with a RetryExceptionGroup containing all - GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised on non-idempotent operations that cannot be - safely retried. - - ValueError if invalid arguments are provided + DeadlineExceeded: raised after operation timeout + will be chained with a RetryExceptionGroup containing all + GoogleAPIError exceptions from any retries that failed + GoogleAPIError: raised on non-idempotent operations that cannot be + safely retried. + ValueError if invalid arguments are provided """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -1051,23 +1051,23 @@ async def bulk_mutate_rows( raised exception group Args: - - mutation_entries: the batches of mutations to apply + mutation_entries: the batches of mutations to apply Each entry will be applied atomically, but entries will be applied in arbitrary order - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_mutate_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors Raises: - - MutationsExceptionGroup if one or more mutations fails + MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions - - ValueError if invalid arguments are provided + ValueError if invalid arguments are provided """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -1099,31 +1099,31 @@ async def check_and_mutate_row( Non-idempotent operation: will not be retried Args: - - row_key: the key of the row to mutate - - predicate: the filter to be applied to the contents of the specified row. + row_key: the key of the row to mutate + predicate: the filter to be applied to the contents of the specified row. Depending on whether or not any results are yielded, either true_case_mutations or false_case_mutations will be executed. If None, checks that the row contains any values at all. 
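A sketch of the `None`-predicate behavior described above (row key, family, and qualifier are illustrative):
```
# choose which mutations to apply based on whether the row contains any cells
row_had_data = await table.check_and_mutate_row(
    b"user#123",
    None,                                   # None matches rows containing any values
    true_case_mutations=[SetCell("stats", "flag", b"seen")],
    false_case_mutations=[SetCell("stats", "flag", b"new")],
)
```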
- - true_case_mutations: + true_case_mutations: Changes to be atomically applied to the specified row if predicate yields at least one cell when applied to row_key. Entries are applied in order, meaning that earlier mutations can be masked by later ones. Must contain at least one entry if false_case_mutations is empty, and at most 100000. - - false_case_mutations: + false_case_mutations: Changes to be atomically applied to the specified row if predicate_filter does not yield any cells when applied to row_key. Entries are applied in order, meaning that earlier mutations can be masked by later ones. Must contain at least one entry if `true_case_mutations` is empty, and at most 100000. - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will not be retried. Defaults to the Table's default_operation_timeout Returns: - - bool indicating whether the predicate was true or false + bool indicating whether the predicate was true or false Raises: - - GoogleAPIError exceptions from grpc call + GoogleAPIError exceptions from grpc call """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if true_case_mutations is not None and not isinstance( @@ -1167,19 +1167,19 @@ async def read_modify_write_row( Non-idempotent operation: will not be retried Args: - - row_key: the key of the row to apply read/modify/write rules to - - rules: A rule or set of rules to apply to the row. + row_key: the key of the row to apply read/modify/write rules to + rules: A rule or set of rules to apply to the row. Rules are applied in order, meaning that earlier rules will affect the results of later ones. - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will not be retried. Defaults to the Table's default_operation_timeout. Returns: - - Row: containing cell data that was modified as part of the + Row: containing cell data that was modified as part of the operation Raises: - - GoogleAPIError exceptions from grpc call - - ValueError if invalid arguments are provided + GoogleAPIError exceptions from grpc call + ValueError if invalid arguments are provided """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if operation_timeout <= 0: From 9df764387b766520b1ac3c2b43fac8cd1169ba6f Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 16:20:41 -0700 Subject: [PATCH 03/19] fix raises docstrings --- google/cloud/bigtable/data/_async/client.py | 28 ++++++++++++--------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index bb8b4ec0c..64affacfe 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -116,8 +116,8 @@ def __init__( Client options used to set user options on the client. API Endpoint should be set through client_options. 
Raises: - RuntimeError if called outside of an async context (no running event loop) - ValueError if pool_size is less than 1 + RuntimeError: if called outside of an async context (no running event loop) + ValueError: if pool_size is less than 1 """ # set up transport in registry transport_str = f"pooled_grpc_asyncio_{pool_size}" @@ -199,8 +199,9 @@ def _client_version() -> str: def _start_background_channel_refresh(self) -> None: """ Starts a background task to ping and warm each channel in the pool + Raises: - RuntimeError if not called in an asyncio event loop + RuntimeError: if not called in an asyncio event loop """ if not self._channel_refresh_tasks and not self._emulator_host: # raise RuntimeError if there is no event loop @@ -408,6 +409,10 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAs default_retryable_errors: a list of errors that will be retried if encountered during all other operations. Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + Returns: + TableAsync: a table instance for making data API requests + Raises + RuntimeError: if called outside of an async context (no running event loop) """ return TableAsync(self, instance_id, table_id, *args, **kwargs) @@ -490,7 +495,7 @@ def __init__( encountered during all other operations. Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) Raises: - RuntimeError if called outside of an async context (no running event loop) + RuntimeError: if called outside of an async context (no running event loop) """ # NOTE: any changes to the signature of this method should also be reflected # in client.get_table() @@ -990,7 +995,7 @@ async def mutate_row( GoogleAPIError exceptions from any retries that failed GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. - ValueError if invalid arguments are provided + ValueError: if invalid arguments are provided """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -1065,9 +1070,9 @@ async def bulk_mutate_rows( retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors Raises: - MutationsExceptionGroup if one or more mutations fails + MutationsExceptionGroup: if one or more mutations fails Contains details about any failed entries in .exceptions - ValueError if invalid arguments are provided + ValueError: if invalid arguments are provided """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -1123,7 +1128,7 @@ async def check_and_mutate_row( Returns: bool indicating whether the predicate was true or false Raises: - GoogleAPIError exceptions from grpc call + GoogleAPIError: exceptions from grpc call """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if true_case_mutations is not None and not isinstance( @@ -1175,11 +1180,10 @@ async def read_modify_write_row( Failed requests will not be retried. Defaults to the Table's default_operation_timeout. 
Returns: - Row: containing cell data that was modified as part of the - operation + Row: containing cell data that was modified as part of the operation Raises: - GoogleAPIError exceptions from grpc call - ValueError if invalid arguments are provided + GoogleAPIError: exceptions from grpc call + ValueError: if invalid arguments are provided """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if operation_timeout <= 0: From 3ca9c9834b7e1c751c8402d86677dda2e9810067 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 16:21:00 -0700 Subject: [PATCH 04/19] fixed indentation --- google/cloud/bigtable/data/_async/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 64affacfe..22ba1a9b2 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -986,7 +986,7 @@ async def mutate_row( a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_attempt_timeout. If None, defaults to operation_timeout. - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Only idempotent mutations will be retried. Defaults to the Table's default_retryable_errors. Raises: From 31914c44dbd6b20707ba015eaaf9627bf153f690 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 16:26:07 -0700 Subject: [PATCH 05/19] added missing : --- google/cloud/bigtable/data/_async/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 22ba1a9b2..c2363ac08 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -411,7 +411,7 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAs Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) Returns: TableAsync: a table instance for making data API requests - Raises + Raises: RuntimeError: if called outside of an async context (no running event loop) """ return TableAsync(self, instance_id, table_id, *args, **kwargs) From 687751c061d5552fec5106e929a3c06de3cc7af8 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 16:33:02 -0700 Subject: [PATCH 06/19] use full path for exceptions --- google/cloud/bigtable/data/_async/client.py | 28 ++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index c2363ac08..0ca41b55d 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -583,10 +583,10 @@ async def read_rows_stream( Returns: an asynchronous iterator that yields rows returned by the query Raises: - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -636,10 +636,10 @@ async def read_rows( Returns: a list of Rows returned by 
the query Raises: - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ row_generator = await self.read_rows_stream( query, @@ -680,10 +680,10 @@ async def read_row( Returns: a Row object if the row exists, otherwise None Raises: - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -815,10 +815,10 @@ async def row_exists( Returns: a bool indicating whether the row exists Raises: - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -868,10 +868,10 @@ async def sample_row_keys( Returns: a set of RowKeySamples the delimit contiguous sections of the table Raises: - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts operation_timeout, attempt_timeout = _get_timeouts( @@ -990,10 +990,10 @@ async def mutate_row( Only idempotent mutations will be retried. Defaults to the Table's default_retryable_errors. Raises: - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing all GoogleAPIError exceptions from any retries that failed - GoogleAPIError: raised on non-idempotent operations that cannot be + google.api_core.exceptions.GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. 
ValueError: if invalid arguments are provided """ @@ -1128,7 +1128,7 @@ async def check_and_mutate_row( Returns: bool indicating whether the predicate was true or false Raises: - GoogleAPIError: exceptions from grpc call + google.api_core.exceptions.GoogleAPIError: exceptions from grpc call """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if true_case_mutations is not None and not isinstance( @@ -1182,7 +1182,7 @@ async def read_modify_write_row( Returns: Row: containing cell data that was modified as part of the operation Raises: - GoogleAPIError: exceptions from grpc call + google.api_core.exceptions.GoogleAPIError: exceptions from grpc call ValueError: if invalid arguments are provided """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) From 1240ab2c23d8e453fc0a074246837f0624bd01cd Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 17:05:15 -0700 Subject: [PATCH 07/19] fixed return types in client --- google/cloud/bigtable/data/_async/client.py | 22 +++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 0ca41b55d..9461ee75a 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -238,7 +238,7 @@ async def _ping_and_warm_instances( channel: grpc channel to warm instance_key: if provided, only warm the instance associated with the key Returns: - sequence of results or exceptions from the ping requests + list[BaseException | None]: sequence of results or exceptions from the ping requests """ instance_list = ( [instance_key] if instance_key is not None else self._active_instances @@ -360,7 +360,7 @@ async def _remove_instance_registration( _instance_owners, and instances will only be unregistered when all owners call _remove_instance_registration Returns: - True if instance was removed + bool: True if instance was removed, else False """ instance_name = self._gapic_client.instance_path(self.project, instance_id) instance_key = _WarmedInstanceKey( @@ -581,7 +581,7 @@ async def read_rows_stream( retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors Returns: - an asynchronous iterator that yields rows returned by the query + AsyncIterable[Row]: an asynchronous iterator that yields rows returned by the query Raises: google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions @@ -634,7 +634,7 @@ async def read_rows( retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - a list of Rows returned by the query + list[Row]: a list of Rows returned by the query Raises: google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions @@ -678,7 +678,7 @@ async def read_row( retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. 
Returns: - a Row object if the row exists, otherwise None + Row | None: a Row object if the row exists, otherwise None Raises: google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions @@ -732,6 +732,8 @@ async def read_rows_sharded( If None, defaults to operation_timeout. retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. + Returns: + list[Row]: a list of Rows returned by the query Raises: ShardedReadRowsExceptionGroup: if any of the queries failed ValueError: if the query_list is empty @@ -813,7 +815,7 @@ async def row_exists( retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - a bool indicating whether the row exists + bool: a bool indicating whether the row exists Raises: google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions @@ -852,7 +854,7 @@ async def sample_row_keys( requests will call sample_row_keys internally for this purpose when sharding is enabled RowKeySamples is simply a type alias for list[tuple[bytes, int]]; a list of - row_keys, along with offset positions in the table + row_keys, along with offset positions in the table Args: operation_timeout: the time budget for the entire operation, in seconds. @@ -866,7 +868,7 @@ async def sample_row_keys( retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_retryable_errors. Returns: - a set of RowKeySamples the delimit contiguous sections of the table + RowKeySamples: a set of RowKeySamples the delimit contiguous sections of the table Raises: google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions @@ -942,7 +944,7 @@ def mutations_batcher( batch_retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors. Returns: - a MutationsBatcherAsync context manager that can batch requests + MutationsBatcherAsync: a MutationsBatcherAsync context manager that can batch requests """ return MutationsBatcherAsync( self, @@ -1180,7 +1182,7 @@ async def read_modify_write_row( Failed requests will not be retried. Defaults to the Table's default_operation_timeout. 
Returns: - Row: containing cell data that was modified as part of the operation + Row: a Row containing cell data that was modified as part of the operation Raises: google.api_core.exceptions.GoogleAPIError: exceptions from grpc call ValueError: if invalid arguments are provided From e9b2cfdcdb16b18f0246019572b5adeeb381c56e Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 17:23:41 -0700 Subject: [PATCH 08/19] improved exceptions docs --- google/cloud/bigtable/data/exceptions.py | 40 ++++++++++++++---------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/google/cloud/bigtable/data/exceptions.py b/google/cloud/bigtable/data/exceptions.py index 3c73ec4e9..8d97640aa 100644 --- a/google/cloud/bigtable/data/exceptions.py +++ b/google/cloud/bigtable/data/exceptions.py @@ -142,10 +142,12 @@ def _format_message( Format a message for the exception group Args: - - excs: the exceptions in the group - - total_entries: the total number of entries attempted, successful or not - - exc_count: the number of exceptions associated with the request - if None, this will be len(excs) + excs: the exceptions in the group + total_entries: the total number of entries attempted, successful or not + exc_count: the number of exceptions associated with the request + if None, this will be len(excs) + Returns: + str: the formatted message """ exc_count = exc_count if exc_count is not None else len(excs) entry_str = "entry" if exc_count == 1 else "entries" @@ -156,10 +158,10 @@ def __init__( ): """ Args: - - excs: the exceptions in the group - - total_entries: the total number of entries attempted, successful or not - - message: the message for the exception group. If None, a default message - will be generated + excs: the exceptions in the group + total_entries: the total number of entries attempted, successful or not + message: the message for the exception group. If None, a default message + will be generated """ message = ( message @@ -174,9 +176,11 @@ def __new__( ): """ Args: - - excs: the exceptions in the group - - total_entries: the total number of entries attempted, successful or not - - message: the message for the exception group. If None, a default message + excs: the exceptions in the group + total_entries: the total number of entries attempted, successful or not + message: the message for the exception group. If None, a default message + Returns: + MutationsExceptionGroup: the new instance """ message = ( message if message is not None else cls._format_message(excs, total_entries) @@ -200,12 +204,14 @@ def from_truncated_lists( describe the number of exceptions that were truncated. 
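A sketch of the error-handling pattern this exception group is meant to support (assuming `entries` is a list of `RowMutationEntry` and `table` is an open `TableAsync`):
```
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup

try:
    await table.bulk_mutate_rows(entries)
except MutationsExceptionGroup as group:
    # per-entry failures are collected on the group rather than raised one by one
    for failed in group.exceptions:
        print(failed)
```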
Args: - - first_list: the set of oldest exceptions to add to the ExceptionGroup - - last_list: the set of newest exceptions to add to the ExceptionGroup - - total_excs: the total number of exceptions associated with the request - Should be len(first_list) + len(last_list) + number of dropped exceptions - in the middle - - entry_count: the total number of entries attempted, successful or not + first_list: the set of oldest exceptions to add to the ExceptionGroup + last_list: the set of newest exceptions to add to the ExceptionGroup + total_excs: the total number of exceptions associated with the request + Should be len(first_list) + len(last_list) + number of dropped exceptions + in the middle + entry_count: the total number of entries attempted, successful or not + Returns: + MutationsExceptionGroup: the new instance """ first_count, last_count = len(first_list), len(last_list) if first_count + last_count >= total_excs: From 33635ec7c64fd846bd8cb11c7da17fe31c0c2e28 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 17:23:49 -0700 Subject: [PATCH 09/19] improved read modify write docs --- .../bigtable/data/read_modify_write_rules.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/google/cloud/bigtable/data/read_modify_write_rules.py b/google/cloud/bigtable/data/read_modify_write_rules.py index f43dbe79f..be87d69d0 100644 --- a/google/cloud/bigtable/data/read_modify_write_rules.py +++ b/google/cloud/bigtable/data/read_modify_write_rules.py @@ -23,6 +23,9 @@ class ReadModifyWriteRule(abc.ABC): + """ + Abstract base class for read-modify-write rules. + """ def __init__(self, family: str, qualifier: bytes | str): qualifier = ( qualifier if isinstance(qualifier, bytes) else qualifier.encode("utf-8") @@ -39,6 +42,22 @@ def _to_pb(self) -> data_pb.ReadModifyWriteRule: class IncrementRule(ReadModifyWriteRule): + """ + Rule to increment a cell's value. + + Args: + family (str): + The family name of the cell to increment. + qualifier (bytes | str): + The qualifier of the cell to increment. + increment_amount (int): + The amount to increment the cell's value. Must be between -2**63 and 2**63 (64-bit signed int). + Raises: + TypeError: + If increment_amount is not an integer. + ValueError: + If increment_amount is not between -2**63 and 2**63 (64-bit signed int). + """ def __init__(self, family: str, qualifier: bytes | str, increment_amount: int = 1): if not isinstance(increment_amount, int): raise TypeError("increment_amount must be an integer") @@ -58,6 +77,19 @@ def _to_dict(self) -> dict[str, str | bytes | int]: class AppendValueRule(ReadModifyWriteRule): + """ + Rule to append a value to a cell's value. + + Args: + family (str): + The family name of the cell to append to. + qualifier (bytes | str): + The qualifier of the cell to append to. + append_value (bytes | str): + The value to append to the cell's value. + Raises: + TypeError: If append_value is not bytes or str. 
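A sketch tying these rules to `read_modify_write_row` (family, qualifier, and row key names are illustrative):
```
from google.cloud.bigtable.data.read_modify_write_rules import (
    AppendValueRule,
    IncrementRule,
)

rules = [
    IncrementRule("stats", "views", increment_amount=1),       # 64-bit signed increment
    AppendValueRule("stats", "history", append_value=b"|hit"),
]
# rules are applied atomically and in order; the modified cells come back as a Row
result = await table.read_modify_write_row(b"user#123", rules)
```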
+ """ def __init__(self, family: str, qualifier: bytes | str, append_value: bytes | str): append_value = ( append_value.encode("utf-8") From 8136631643a93f4c902b62892032e04cafbd5a64 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 17:23:57 -0700 Subject: [PATCH 10/19] improved query docs --- google/cloud/bigtable/data/read_rows_query.py | 134 +++++++++++++----- 1 file changed, 99 insertions(+), 35 deletions(-) diff --git a/google/cloud/bigtable/data/read_rows_query.py b/google/cloud/bigtable/data/read_rows_query.py index 362f54c3e..5e414391c 100644 --- a/google/cloud/bigtable/data/read_rows_query.py +++ b/google/cloud/bigtable/data/read_rows_query.py @@ -44,15 +44,15 @@ def __init__( ): """ Args: - - start_key: The start key of the range. If empty, the range is unbounded on the left. - - end_key: The end key of the range. If empty, the range is unbounded on the right. - - start_is_inclusive: Whether the start key is inclusive. If None, the start key is + start_key: The start key of the range. If empty, the range is unbounded on the left. + end_key: The end key of the range. If empty, the range is unbounded on the right. + start_is_inclusive: Whether the start key is inclusive. If None, the start key is inclusive. - - end_is_inclusive: Whether the end key is inclusive. If None, the end key is not inclusive. + end_is_inclusive: Whether the end key is inclusive. If None, the end key is not inclusive. Raises: - - ValueError: if start_key is greater than end_key, or start_is_inclusive, - or end_is_inclusive is set when the corresponding key is None, - or start_key or end_key is not a string or bytes. + ValueError: if start_key is greater than end_key, or start_is_inclusive + ValueError: if end_is_inclusive is set when the corresponding key is None + ValueError: if start_key or end_key is not a string or bytes. """ # convert empty key inputs to None for consistency start_key = None if not start_key else start_key @@ -100,39 +100,69 @@ def start_key(self) -> bytes | None: def end_key(self) -> bytes | None: """ Returns the end key of the range. If None, the range is unbounded on the right. + + Returns: + bytes | None: The end key of the range, or None if the range is unbounded on the right. """ return self._pb.end_key_closed or self._pb.end_key_open or None @property def start_is_inclusive(self) -> bool: """ - Returns whether the range is inclusive of the start key. - Returns True if the range is unbounded on the left. + Indicates if the range is inclusive of the start key. + + If the range is unbounded on the left, this will return True. + + Returns: + bool: Whether the range is inclusive of the start key. """ return not bool(self._pb.start_key_open) @property def end_is_inclusive(self) -> bool: """ - Returns whether the range is inclusive of the end key. - Returns True if the range is unbounded on the right. + Indicates if the range is inclusive of the end key. + + If the range is unbounded on the right, this will return True. + + Returns: + bool: Whether the range is inclusive of the end key. 
""" return not bool(self._pb.end_key_open) def _to_pb(self) -> RowRangePB: - """Converts this object to a protobuf""" + """ + Converts this object to a protobuf + + Returns: + RowRangePB: The protobuf representation of this object + """ return self._pb @classmethod def _from_pb(cls, data: RowRangePB) -> RowRange: - """Creates a RowRange from a protobuf""" + """ + Creates a RowRange from a protobuf + + Args: + data (RowRangePB): The protobuf to convert + Returns: + RowRange: The converted RowRange + """ instance = cls() instance._pb = data return instance @classmethod def _from_dict(cls, data: dict[str, bytes | str]) -> RowRange: - """Creates a RowRange from a protobuf""" + """ + Creates a RowRange from a protobuf + + Args: + data (dict[str, bytes | str]): The dictionary to convert + Returns: + RowRange: The converted RowRange + """ formatted_data = { k: v.encode() if isinstance(v, str) else v for k, v in data.items() } @@ -144,6 +174,9 @@ def __bool__(self) -> bool: """ Empty RowRanges (representing a full table scan) are falsy, because they can be substituted with None. Non-empty RowRanges are truthy. + + Returns: + bool: True if the RowRange is not empty, False otherwise """ return bool( self._pb.start_key_closed @@ -160,7 +193,11 @@ def __eq__(self, other: Any) -> bool: def __str__(self) -> str: """ Represent range as a string, e.g. "[b'a', b'z)" + Unbounded start or end keys are represented as "-inf" or "+inf" + + Returns: + str: The string representation of the range """ left = "[" if self.start_is_inclusive else "(" right = "]" if self.end_is_inclusive else ")" @@ -199,12 +236,12 @@ def __init__( Create a new ReadRowsQuery Args: - - row_keys: row keys to include in the query + row_keys: row keys to include in the query a query can contain multiple keys, but ranges should be preferred - - row_ranges: ranges of rows to include in the query - - limit: the maximum number of rows to return. None or 0 means no limit + row_ranges: ranges of rows to include in the query + limit: the maximum number of rows to return. 
None or 0 means no limit default: None (no limit) - - row_filter: a RowFilter to apply to the query + row_filter: a RowFilter to apply to the query """ if row_keys is None: row_keys = [] @@ -223,14 +260,34 @@ def __init__( @property def row_keys(self) -> list[bytes]: + """ + Return the row keys in this query + + Returns: + list[bytes]: the row keys in this query + """ return list(self._row_set.row_keys) @property def row_ranges(self) -> list[RowRange]: + """ + Return the row ranges in this query + + Returns: + list[RowRange]: the row ranges in this query + """ return [RowRange._from_pb(r) for r in self._row_set.row_ranges] @property def limit(self) -> int | None: + """ + Return the maximum number of rows to return by this query + + None or 0 means no limit + + Returns: + int | None: the maximum number of rows to return by this query + """ return self._limit or None @limit.setter @@ -241,11 +298,9 @@ def limit(self, new_limit: int | None): None or 0 means no limit Args: - - new_limit: the new limit to apply to this query - Returns: - - a reference to this query for chaining + new_limit: the new limit to apply to this query Raises: - - ValueError if new_limit is < 0 + ValueError: if new_limit is < 0 """ if new_limit is not None and new_limit < 0: raise ValueError("limit must be >= 0") @@ -253,6 +308,12 @@ def limit(self, new_limit: int | None): @property def filter(self) -> RowFilter | None: + """ + Return the RowFilter applied to this query + + Returns: + RowFilter | None: the RowFilter applied to this query + """ return self._filter @filter.setter @@ -261,9 +322,7 @@ def filter(self, row_filter: RowFilter | None): Set a RowFilter to apply to this query Args: - - row_filter: a RowFilter to apply to this query - Returns: - - a reference to this query for chaining + row_filter: a RowFilter to apply to this query """ self._filter = row_filter @@ -274,11 +333,9 @@ def add_key(self, row_key: str | bytes): A query can contain multiple keys, but ranges should be preferred Args: - - row_key: a key to add to this query - Returns: - - a reference to this query for chaining + row_key: a key to add to this query Raises: - - ValueError if an input is not a string or bytes + ValueError: if an input is not a string or bytes """ if isinstance(row_key, str): row_key = row_key.encode() @@ -295,7 +352,7 @@ def add_range( Add a range of row keys to this query. Args: - - row_range: a range of row keys to add to this query + row_range: a range of row keys to add to this query """ if row_range not in self.row_ranges: self._row_set.row_ranges.append(row_range._pb) @@ -305,10 +362,12 @@ def shard(self, shard_keys: RowKeySamples) -> ShardedQuery: Split this query into multiple queries that can be evenly distributed across nodes and run in parallel + Args: + shard_keys: a list of row keys that define the boundaries of segments. Returns: - - a ShardedQuery that can be used in sharded_read_rows calls + ShardedQuery: a ShardedQuery that can be used in sharded_read_rows calls Raises: - - AttributeError if the query contains a limit + AttributeError: if the query contains a limit """ if self.limit is not None: raise AttributeError("Cannot shard query with a limit") @@ -357,11 +416,11 @@ def _shard_range( segments of the key-space, determined by split_points Args: - - orig_range: a row range to split - - split_points: a list of row keys that define the boundaries of segments. + orig_range: a row range to split + split_points: a list of row keys that define the boundaries of segments. 
each point represents the inclusive end of a segment Returns: - - a list of tuples, containing a segment index and a new sub-range. + list[tuple[int, RowRange]]: a list of tuples, containing a segment index and a new sub-range. """ # 1. find the index of the segment the start key belongs to if orig_range.start_key is None: @@ -446,6 +505,11 @@ def __eq__(self, other): RowRanges are equal if they have the same row keys, row ranges, filter and limit, or if they both represent a full scan with the same filter and limit + + Args: + other: the object to compare to + Returns: + bool: True if the objects are equal, False otherwise """ if not isinstance(other, ReadRowsQuery): return False From 6c78df0276eab6f7043278c143dbf03b4b4d2e7b Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 16 May 2024 17:38:54 -0700 Subject: [PATCH 11/19] improved row and cell docstrings --- google/cloud/bigtable/data/row.py | 108 +++++++++++++++++++++++++++--- 1 file changed, 99 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigtable/data/row.py b/google/cloud/bigtable/data/row.py index 13019cbdd..f8e4f9fac 100644 --- a/google/cloud/bigtable/data/row.py +++ b/google/cloud/bigtable/data/row.py @@ -49,6 +49,10 @@ def __init__( Row objects are not intended to be created by users. They are returned by the Bigtable backend. + + Args: + key (bytes): Row key + cells (list[Cell]): List of cells in the row """ self.row_key = key self.cells: list[Cell] = cells @@ -65,6 +69,9 @@ def _index( Returns an index of cells associated with each family and qualifier. The index is lazily created when needed + + Returns: + OrderedDict: Index of cells """ if self._index_data is None: self._index_data = OrderedDict() @@ -81,6 +88,11 @@ def _from_pb(cls, row_pb: RowPB) -> Row: Row objects are not intended to be created by users. They are returned by the Bigtable backend. 
+ + Args: + row_pb (RowPB): Protobuf representation of the row + Returns: + Row: Row object created from the protobuf representation """ row_key: bytes = row_pb.key cell_list: list[Cell] = [] @@ -112,6 +124,14 @@ def get_cells( Can also be accessed through indexing: cells = row["family", "qualifier"] cells = row["family"] + + Args: + family: family to filter cells by + qualifier: qualifier to filter cells by + Returns: + list[Cell]: List of cells in the row matching the filter + Raises: + ValueError: If family or qualifier is not found in the row """ if family is None: if qualifier is not None: @@ -137,6 +157,13 @@ def get_cells( def _get_all_from_family(self, family: str) -> Generator[Cell, None, None]: """ Returns all cells in the row for the family_id + + Args: + family: family to filter cells by + Yields: + Cell: cells in the row for the family_id + Raises: + ValueError: If family is not found in the row """ if family not in self._index: raise ValueError(f"Family '{family}' not found in row '{self.row_key!r}'") @@ -153,6 +180,9 @@ def __str__(self) -> str: (family='fam', qualifier=b'col'): [b'value', (+1 more),], (family='fam', qualifier=b'col2'): [b'other'], } + + Returns: + str: Human-readable string representation of the row """ output = ["{"] for family, qualifier in self._get_column_components(): @@ -201,6 +231,9 @@ def _to_dict(self) -> dict[str, Any]: def __iter__(self): """ Allow iterating over all cells in the row + + Returns: + Iterator: Iterator over the cells in the row """ return iter(self.cells) @@ -210,6 +243,11 @@ def __contains__(self, item): Works for both cells in the internal list, and `family` or `(family, qualifier)` pairs associated with the cells + + Args: + item: item to check for in the row + Returns: + bool: True if item is in the row, False otherwise """ if isinstance(item, _family_type): return item in self._index @@ -266,7 +304,10 @@ def __getitem__(self, index): def __len__(self): """ - Implements `len()` operator + Returns the number of cells in the row + + Returns: + int: Number of cells in the row """ return len(self.cells) @@ -275,12 +316,18 @@ def _get_column_components(self) -> list[tuple[str, bytes]]: Returns a list of (family, qualifier) pairs associated with the cells Pairs can be used for indexing + + Returns: + list[tuple[str, bytes]]: List of (family, qualifier) pairs """ return [(f, q) for f in self._index for q in self._index[f]] def __eq__(self, other): """ Implements `==` operator + + Returns: + bool: True if rows are equal, False otherwise """ # for performance reasons, check row metadata # before checking individual cells @@ -307,6 +354,9 @@ def __eq__(self, other): def __ne__(self, other) -> bool: """ Implements `!=` operator + + Returns: + bool: True if rows are not equal, False otherwise """ return not self == other @@ -319,6 +369,14 @@ class Cell: Does not represent all data contained in the cell, only data returned by a query. Expected to be read-only to users, and written by backend + + Args: + value: the byte string value of the cell + row_key: the row key of the cell + family: the family associated with the cell + qualifier: the column qualifier associated with the cell + timestamp_micros: the timestamp of the cell in microseconds + labels: the list of labels associated with the cell """ __slots__ = ( @@ -339,12 +397,8 @@ def __init__( timestamp_micros: int, labels: list[str] | None = None, ): - """ - Cell constructor - - Cell objects are not intended to be constructed by users. - They are returned by the Bigtable backend. 
- """ + # Cell objects are not intended to be constructed by users. + # They are returned by the Bigtable backend. self.value = value self.row_key = row_key self.family = family @@ -359,6 +413,9 @@ def __int__(self) -> int: Allows casting cell to int Interprets value as a 64-bit big-endian signed integer, as expected by ReadModifyWrite increment rule + + Returns: + int: Value of the cell as a 64-bit big-endian signed integer """ return int.from_bytes(self.value, byteorder="big", signed=True) @@ -368,6 +425,9 @@ def _to_dict(self) -> dict[str, Any]: proto format https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#cell + + Returns: + dict: Dictionary representation of the cell """ cell_dict: dict[str, Any] = { "value": self.value, @@ -381,12 +441,18 @@ def __str__(self) -> str: """ Allows casting cell to str Prints encoded byte string, same as printing value directly. + + Returns: + str: Encoded byte string of the value """ return str(self.value) def __repr__(self): """ Returns a string representation of the cell + + Returns: + str: String representation of the cell """ return f"Cell(value={self.value!r}, row_key={self.row_key!r}, family='{self.family}', qualifier={self.qualifier!r}, timestamp_micros={self.timestamp_micros}, labels={self.labels})" @@ -395,9 +461,16 @@ def __repr__(self): def __lt__(self, other) -> bool: """ Implements `<` operator + + Args: + other: Cell to compare with + Returns: + bool: True if this cell is less than the other cell, False otherwise + Raises: + NotImplemented: If other is not a Cell """ if not isinstance(other, Cell): - return NotImplemented + raise NotImplemented this_ordering = ( self.family, self.qualifier, @@ -417,9 +490,16 @@ def __lt__(self, other) -> bool: def __eq__(self, other) -> bool: """ Implements `==` operator + + Args: + other: Cell to compare with + Returns: + bool: True if cells are equal, False otherwise + Raises: + NotImplemented: If other is not a Cell """ if not isinstance(other, Cell): - return NotImplemented + raise NotImplemented return ( self.row_key == other.row_key and self.family == other.family @@ -433,12 +513,22 @@ def __eq__(self, other) -> bool: def __ne__(self, other) -> bool: """ Implements `!=` operator + + Args: + other: Cell to compare with + Returns: + bool: True if cells are not equal, False otherwise + Raises: + NotImplemented: If other is not a Cell """ return not self == other def __hash__(self): """ Implements `hash()` function to fingerprint cell + + Returns: + int: hash value of the cell """ return hash( ( From e339ccd10392a3d970ab88ee478481beffde8d62 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 17 May 2024 10:38:17 -0700 Subject: [PATCH 12/19] improved comments for batcher --- .../bigtable/data/_async/mutations_batcher.py | 146 ++++++++++-------- 1 file changed, 78 insertions(+), 68 deletions(-) diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 5d5dd535e..af5e72570 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -50,6 +50,13 @@ class _FlowControlAsync: Flow limits are not hard limits. If a single mutation exceeds the configured limits, it will be allowed as a single batch when the capacity is available. + + Args: + max_mutation_count: maximum number of mutations to send in a single rpc. + This corresponds to individual mutations in a single RowMutationEntry. 
+ max_mutation_bytes: maximum number of bytes to send in a single rpc. + Raises: + ValueError: if max_mutation_count or max_mutation_bytes is less than 0 """ def __init__( @@ -57,12 +64,6 @@ def __init__( max_mutation_count: int, max_mutation_bytes: int, ): - """ - Args: - - max_mutation_count: maximum number of mutations to send in a single rpc. - This corresponds to individual mutations in a single RowMutationEntry. - - max_mutation_bytes: maximum number of bytes to send in a single rpc. - """ self._max_mutation_count = max_mutation_count self._max_mutation_bytes = max_mutation_bytes if self._max_mutation_count < 1: @@ -82,10 +83,10 @@ def _has_capacity(self, additional_count: int, additional_size: int) -> bool: previous batches have completed. Args: - - additional_count: number of mutations in the pending entry - - additional_size: size of the pending entry + additional_count: number of mutations in the pending entry + additional_size: size of the pending entry Returns: - - True if there is capacity to send the pending entry, False otherwise + bool: True if there is capacity to send the pending entry, False otherwise """ # adjust limits to allow overly large mutations acceptable_size = max(self._max_mutation_bytes, additional_size) @@ -104,7 +105,7 @@ async def remove_from_flow( operation is complete. Args: - - mutations: mutation or list of mutations to remove from flow control + mutations: mutation or list of mutations to remove from flow control """ if not isinstance(mutations, list): mutations = [mutations] @@ -124,10 +125,11 @@ async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry] will block until there is capacity available. Args: - - mutations: list mutations to break up into batches + mutations: list mutations to break up into batches Yields: - - list of mutations that have reserved space in the flow control. - Each batch contains at least one mutation. + list[RowMutationEntry]: + list of mutations that have reserved space in the flow control. + Each batch contains at least one mutation. """ if not isinstance(mutations, list): mutations = [mutations] @@ -171,17 +173,29 @@ class MutationsBatcherAsync: Runs mutate_row, mutate_rows, and check_and_mutate_row internally, combining to use as few network requests as required - Flushes: - - every flush_interval seconds - - after queue reaches flush_count in quantity - - after queue reaches flush_size_bytes in storage size - - when batcher is closed or destroyed - - async with table.mutations_batcher() as batcher: - for i in range(10): - batcher.add(row, mut) + Will automatically flush the batcher: + - every flush_interval seconds + - after queue size reaches flush_limit_mutation_count + - after queue reaches flush_limit_bytes + - when batcher is closed or destroyed + + Args: + table: Table to preform rpc calls + flush_interval: Automatically flush every flush_interval seconds. + If None, no time-based flushing is performed. + flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count + mutations are added across all entries. If None, this limit is ignored. + flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. + flow_control_max_mutation_count: Maximum number of inflight mutations. + flow_control_max_bytes: Maximum number of inflight bytes. + batch_operation_timeout: timeout for each mutate_rows operation, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. 
+ batch_attempt_timeout: timeout for each individual request, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. + If None, defaults to batch_operation_timeout. + batch_retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_mutate_rows_retryable_errors. """ - def __init__( self, table: "TableAsync", @@ -196,24 +210,6 @@ def __init__( batch_retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ): - """ - Args: - - table: Table to preform rpc calls - - flush_interval: Automatically flush every flush_interval seconds. - If None, no time-based flushing is performed. - - flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count - mutations are added across all entries. If None, this limit is ignored. - - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. - - flow_control_max_mutation_count: Maximum number of inflight mutations. - - flow_control_max_bytes: Maximum number of inflight bytes. - - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. - - batch_attempt_timeout: timeout for each individual request, in seconds. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. - If None, defaults to batch_operation_timeout. - - batch_retryable_errors: a list of errors that will be retried if encountered. - Defaults to the Table's default_mutate_rows_retryable_errors. - """ self._operation_timeout, self._attempt_timeout = _get_timeouts( batch_operation_timeout, batch_attempt_timeout, table ) @@ -255,10 +251,10 @@ def _start_flush_timer(self, interval: float | None) -> asyncio.Future[None]: If interval is None, an empty future is returned Args: - - flush_interval: Automatically flush every flush_interval seconds. - If None, no time-based flushing is performed. + flush_interval: Automatically flush every flush_interval seconds. + If None, no time-based flushing is performed. 
Returns: - - asyncio.Future that represents the background task + asyncio.Future[None]: future representing the background task """ if interval is None or self.closed: empty_future: asyncio.Future[None] = asyncio.Future() @@ -282,14 +278,13 @@ async def append(self, mutation_entry: RowMutationEntry): """ Add a new set of mutations to the internal queue - TODO: return a future to track completion of this entry - Args: - - mutation_entry: new entry to add to flush queue + mutation_entry: new entry to add to flush queue Raises: - - RuntimeError if batcher is closed - - ValueError if an invalid mutation type is added + RuntimeError: if batcher is closed + ValueError: if an invalid mutation type is added """ + # TODO: return a future to track completion of this entry if self.closed: raise RuntimeError("Cannot append to closed MutationsBatcher") if isinstance(mutation_entry, Mutation): # type: ignore @@ -309,7 +304,13 @@ async def append(self, mutation_entry: RowMutationEntry): await asyncio.sleep(0) def _schedule_flush(self) -> asyncio.Future[None] | None: - """Update the flush task to include the latest staged entries""" + """ + Update the flush task to include the latest staged entries + + Returns: + asyncio.Future[None] | None: + future representing the background task, if started + """ if self._staged_entries: entries, self._staged_entries = self._staged_entries, [] self._staged_count, self._staged_bytes = 0, 0 @@ -324,7 +325,7 @@ async def _flush_internal(self, new_entries: list[RowMutationEntry]): Flushes a set of mutations to the server, and updates internal state Args: - - new_entries: list of RowMutationEntry objects to flush + new_entries list of RowMutationEntry objects to flush """ # flush new entries in_process_requests: list[asyncio.Future[list[FailedMutationEntryError]]] = [] @@ -344,12 +345,13 @@ async def _execute_mutate_rows( Helper to execute mutation operation on a batch Args: - - batch: list of RowMutationEntry objects to send to server - - timeout: timeout in seconds. Used as operation_timeout and attempt_timeout. - If not given, will use table defaults + batch: list of RowMutationEntry objects to send to server + timeout: timeout in seconds. Used as operation_timeout and attempt_timeout. + If not given, will use table defaults Returns: - - list of FailedMutationEntryError objects for mutations that failed. - FailedMutationEntryError objects will not contain index information + list[FailedMutationEntryError]: + list of FailedMutationEntryError objects for mutations that failed. + FailedMutationEntryError objects will not contain index information """ try: operation = _MutateRowsOperationAsync( @@ -376,6 +378,9 @@ def _add_exceptions(self, excs: list[Exception]): Add new list of exceptions to internal store. To avoid unbounded memory, the batcher will store the first and last _exception_list_limit exceptions, and discard any in between. 
+ + Args: + excs: list of exceptions to add to the internal store """ self._exceptions_since_last_raise += len(excs) if excs and len(self._oldest_exceptions) < self._exception_list_limit: @@ -392,7 +397,7 @@ def _raise_exceptions(self): Raise any unreported exceptions from background flush operations Raises: - - MutationsExceptionGroup with all unreported exceptions + MutationsExceptionGroup: exception group with all unreported exceptions """ if self._oldest_exceptions or self._newest_exceptions: oldest, self._oldest_exceptions = self._oldest_exceptions, [] @@ -414,11 +419,15 @@ def _raise_exceptions(self): ) async def __aenter__(self): - """For context manager API""" + """Allow use of context manager API""" return self async def __aexit__(self, exc_type, exc, tb): - """For context manager API""" + """ + Allow use of context manager API. + + Flushes the batcher and cleans up resources. + """ await self.close() async def close(self): @@ -457,11 +466,11 @@ def _create_bg_task(func, *args, **kwargs) -> asyncio.Future[Any]: with different concurrency models. Args: - - func: function to execute in background task - - *args: positional arguments to pass to func - - **kwargs: keyword arguments to pass to func + func: function to execute in background task + *args: positional arguments to pass to func + **kwargs: keyword arguments to pass to func Returns: - - Future object representing the background task + asyncio.Future: Future object representing the background task """ return asyncio.create_task(func(*args, **kwargs)) @@ -474,12 +483,13 @@ async def _wait_for_batch_results( waits for them to complete, and returns a list of errors encountered. Args: - - *tasks: futures representing _execute_mutate_rows or _flush_internal tasks + *tasks: futures representing _execute_mutate_rows or _flush_internal tasks Returns: - - list of Exceptions encountered by any of the tasks. Errors are expected - to be FailedMutationEntryError, representing a failed mutation operation. - If a task fails with a different exception, it will be included in the - output list. Successful tasks will not be represented in the output list. + list[Exception]: + list of Exceptions encountered by any of the tasks. Errors are expected + to be FailedMutationEntryError, representing a failed mutation operation. + If a task fails with a different exception, it will be included in the + output list. Successful tasks will not be represented in the output list. """ if not tasks: return [] From 436755e75a05ac4f25fb7466786022d34f135f24 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 17 May 2024 10:51:24 -0700 Subject: [PATCH 13/19] improved docstrings for _helpers --- .../bigtable/data/_async/_mutate_rows.py | 29 +++++++------ .../cloud/bigtable/data/_async/_read_rows.py | 33 +++++++++++++-- google/cloud/bigtable/data/_helpers.py | 41 ++++++++++++------- 3 files changed, 69 insertions(+), 34 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 7d1144553..99b9944cd 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -56,6 +56,14 @@ class _MutateRowsOperationAsync: Errors are exposed as a MutationsExceptionGroup, which contains a list of exceptions organized by the related failed mutation entries. 
+ + Args: + gapic_client: the client to use for the mutate_rows call + table: the table associated with the request + mutation_entries: a list of RowMutationEntry objects to send to the server + operation_timeout: the timeout to use for the entire operation, in seconds. + attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds. + If not specified, the request will run until operation_timeout is reached. """ def __init__( @@ -67,15 +75,6 @@ def __init__( attempt_timeout: float | None, retryable_exceptions: Sequence[type[Exception]] = (), ): - """ - Args: - - gapic_client: the client to use for the mutate_rows call - - table: the table associated with the request - - mutation_entries: a list of RowMutationEntry objects to send to the server - - operation_timeout: the timeout to use for the entire operation, in seconds. - - attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds. - If not specified, the request will run until operation_timeout is reached. - """ # check that mutations are within limits total_mutations = sum(len(entry.mutations) for entry in mutation_entries) if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT: @@ -121,7 +120,7 @@ async def start(self): Start the operation, and run until completion Raises: - - MutationsExceptionGroup: if any mutations failed + MutationsExceptionGroup: if any mutations failed """ try: # trigger mutate_rows @@ -157,9 +156,9 @@ async def _run_attempt(self): Run a single attempt of the mutate_rows rpc. Raises: - - _MutateRowsIncomplete: if there are failed mutations eligible for - retry after the attempt is complete - - GoogleAPICallError: if the gapic rpc fails + _MutateRowsIncomplete: if there are failed mutations eligible for + retry after the attempt is complete + GoogleAPICallError: if the gapic rpc fails """ request_entries = [self.mutations[idx].proto for idx in self.remaining_indices] # track mutations in this request that have not been finalized yet @@ -213,8 +212,8 @@ def _handle_entry_error(self, idx: int, exc: Exception): retryable. Args: - - idx: the index of the mutation that failed - - exc: the exception to add to the list + idx: the index of the mutation that failed + exc: the exception to add to the list """ entry = self.mutations[idx].entry self.errors.setdefault(idx, []).append(exc) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 9e0fd78e1..7f6e8e507 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -59,6 +59,13 @@ class _ReadRowsOperationAsync: ReadRowsOperation(request, client) handles row merging logic end-to-end, including performing retries on stream errors. + + Args: + query: The query to execute + table: The table to send the request to + operation_timeout: The total time to allow for the operation, in seconds + attempt_timeout: The time to allow for each individual attempt, in seconds + retryable_exceptions: A list of exceptions that should trigger a retry """ __slots__ = ( @@ -104,6 +111,9 @@ def __init__( def start_operation(self) -> AsyncGenerator[Row, None]: """ Start the read_rows operation, retrying on retryable errors. 
+ + Yields: + Row: The next row in the stream """ return retries.retry_target_stream_async( self._read_rows_attempt, @@ -119,6 +129,9 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]: This function is intended to be wrapped by retry logic, which will call this function until it succeeds or a non-retryable error is raised. + + Yields: + Row: The next row in the stream """ # revise request keys and ranges between attempts if self._last_yielded_row_key is not None: @@ -151,6 +164,11 @@ async def chunk_stream( ) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]: """ process chunks out of raw read_rows stream + + Args: + stream: the raw read_rows stream from the gapic client + Yields: + ReadRowsResponsePB.CellChunk: the next chunk in the stream """ async for resp in await stream: # extract proto from proto-plus wrapper @@ -195,9 +213,14 @@ async def chunk_stream( @staticmethod async def merge_rows( chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None - ): + ) -> AsyncGenerator[Row, None]: """ Merge chunks into rows + + Args: + chunks: the chunk stream to merge + Yields: + Row: the next row in the stream """ if chunks is None: return @@ -311,10 +334,12 @@ def _revise_request_rowset( Revise the rows in the request to avoid ones we've already processed. Args: - - row_set: the row set from the request - - last_seen_row_key: the last row key encountered + row_set: the row set from the request + last_seen_row_key: the last row key encountered + Returns: + RowSetPB: the new rowset after adusting for the last seen key Raises: - - _RowSetComplete: if there are no rows left to process after the revision + _RowSetComplete: if there are no rows left to process after the revision """ # if user is doing a whole table scan, start a new one with the last seen key if row_set is None or (not row_set.row_ranges and row_set.row_keys is not None): diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index a0b13cbaf..a8fba9ef1 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -83,11 +83,11 @@ def _attempt_timeout_generator( at which point it will return the remaining time in the operation_timeout. Args: - - per_request_timeout: The timeout value to use for each request, in seconds. + per_request_timeout: The timeout value to use for each request, in seconds. If None, the operation_timeout will be used for each request. - - operation_timeout: The timeout value to use for the entire operationm in seconds. + operation_timeout: The timeout value to use for the entire operationm in seconds. 
Yields:
-        The timeout value to use for the next request, in seonds
+        float: The timeout value to use for the next request, in seconds
     """
     per_request_timeout = (
         per_request_timeout if per_request_timeout is not None else operation_timeout
@@ -106,12 +106,13 @@ def _retry_exception_factory(
     Build retry error based on exceptions encountered during operation
 
     Args:
-      - exc_list: list of exceptions encountered during operation
-      - is_timeout: whether the operation failed due to timeout
-      - timeout_val: the operation timeout value in seconds, for constructing
+        exc_list: list of exceptions encountered during operation
+        is_timeout: whether the operation failed due to timeout
+        timeout_val: the operation timeout value in seconds, for constructing
             the error message
     Returns:
-      - tuple of the exception to raise, and a cause exception if applicable
+        tuple[Exception, Exception|None]:
+            tuple of the exception to raise, and a cause exception if applicable
     """
     if reason == RetryFailureReason.TIMEOUT:
         timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
@@ -144,11 +145,11 @@ def _get_timeouts(
     resulting timeouts are invalid.
 
     Args:
-      - operation: The timeout value to use for the entire operation, in seconds.
-      - attempt: The timeout value to use for each attempt, in seconds.
-      - table: The table to use for default values.
+        operation: The timeout value to use for the entire operation, in seconds.
+        attempt: The timeout value to use for each attempt, in seconds.
+        table: The table to use for default values.
     Returns:
-      - A tuple of (operation_timeout, attempt_timeout)
+        tuple[float, float]: A tuple of (operation_timeout, attempt_timeout)
     """
     # load table defaults if necessary
     if operation == TABLE_DEFAULT.DEFAULT:
@@ -185,11 +186,11 @@ def _validate_timeouts(
     an exception if they are not.
 
     Args:
-      - operation_timeout: The timeout value to use for the entire operation, in seconds.
-      - attempt_timeout: The timeout value to use for each attempt, in seconds.
-      - allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception.
+        operation_timeout: The timeout value to use for the entire operation, in seconds.
+        attempt_timeout: The timeout value to use for each attempt, in seconds.
+        allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception.
     Raises:
-      - ValueError if operation_timeout or attempt_timeout are invalid.
+        ValueError: if operation_timeout or attempt_timeout are invalid.
     """
     if operation_timeout is None:
         raise ValueError("operation_timeout cannot be None")
@@ -206,6 +207,16 @@ def _get_retryable_errors(
     call_codes: Sequence["grpc.StatusCode" | int | type[Exception]] | TABLE_DEFAULT,
     table: "TableAsync",
 ) -> list[type[Exception]]:
+    """
+    Convert passed in retryable error codes to a list of exception types.
+
+    Args:
+        call_codes: The error codes to convert. Can be a list of grpc.StatusCode values,
+            int values, or Exception types, or a TABLE_DEFAULT value.
+        table: The table to use for default values.
+    Returns:
+        list[type[Exception]]: A list of exception types to retry on.
+ """ # load table defaults if necessary if call_codes == TABLE_DEFAULT.DEFAULT: call_codes = table.default_retryable_errors From 2e6378022b17caa60ca3f4c30e481f5b07c706b0 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 17 May 2024 13:58:22 -0700 Subject: [PATCH 14/19] fixed lint --- google/cloud/bigtable/data/_async/client.py | 2 +- google/cloud/bigtable/data/_async/mutations_batcher.py | 1 + google/cloud/bigtable/data/mutations.py | 6 +++++- google/cloud/bigtable/data/read_modify_write_rules.py | 3 +++ google/cloud/bigtable/data/row.py | 10 +++++----- 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 9461ee75a..d90358383 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -313,7 +313,7 @@ async def _manage_channel( ) # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) - next_sleep = next_refresh (time.time() - start_timestamp) + next_sleep = next_refresh(time.time() - start_timestamp) async def _register_instance(self, instance_id: str, owner: TableAsync) -> None: """ diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index af5e72570..76d13f00b 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -196,6 +196,7 @@ class MutationsBatcherAsync: batch_retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors. """ + def __init__( self, table: "TableAsync", diff --git a/google/cloud/bigtable/data/mutations.py b/google/cloud/bigtable/data/mutations.py index ae5edbde4..fd9b2c24e 100644 --- a/google/cloud/bigtable/data/mutations.py +++ b/google/cloud/bigtable/data/mutations.py @@ -153,6 +153,7 @@ class SetCell(Mutation): TypeError: If `new_value` is not `bytes`, `str`, or `int`. ValueError: If `timestamp_micros` is less than `_SERVER_SIDE_TIMESTAMP`. """ + def __init__( self, family: str, @@ -216,6 +217,7 @@ class DeleteRangeFromColumn(Mutation): Raises: ValueError: If `start_timestamp_micros` is greater than `end_timestamp_micros`. """ + family: str qualifier: bytes # None represents 0 @@ -254,6 +256,7 @@ class DeleteAllFromFamily(Mutation): Args: family_to_delete (str): The name of the column family to delete. """ + family_to_delete: str def _to_dict(self) -> dict[str, Any]: @@ -269,6 +272,7 @@ class DeleteAllFromRow(Mutation): """ Mutation to delete all cells from a row. """ + def _to_dict(self) -> dict[str, Any]: return { "delete_from_row": {}, @@ -288,7 +292,7 @@ class RowMutationEntry: to the row. Raises: - ValueError: If `mutations` is empty or contains more than + ValueError: If `mutations` is empty or contains more than `_MUTATE_ROWS_REQUEST_MUTATION_LIMIT` mutations. """ diff --git a/google/cloud/bigtable/data/read_modify_write_rules.py b/google/cloud/bigtable/data/read_modify_write_rules.py index be87d69d0..e2d3b9f4f 100644 --- a/google/cloud/bigtable/data/read_modify_write_rules.py +++ b/google/cloud/bigtable/data/read_modify_write_rules.py @@ -26,6 +26,7 @@ class ReadModifyWriteRule(abc.ABC): """ Abstract base class for read-modify-write rules. 
""" + def __init__(self, family: str, qualifier: bytes | str): qualifier = ( qualifier if isinstance(qualifier, bytes) else qualifier.encode("utf-8") @@ -58,6 +59,7 @@ class IncrementRule(ReadModifyWriteRule): ValueError: If increment_amount is not between -2**63 and 2**63 (64-bit signed int). """ + def __init__(self, family: str, qualifier: bytes | str, increment_amount: int = 1): if not isinstance(increment_amount, int): raise TypeError("increment_amount must be an integer") @@ -90,6 +92,7 @@ class AppendValueRule(ReadModifyWriteRule): Raises: TypeError: If append_value is not bytes or str. """ + def __init__(self, family: str, qualifier: bytes | str, append_value: bytes | str): append_value = ( append_value.encode("utf-8") diff --git a/google/cloud/bigtable/data/row.py b/google/cloud/bigtable/data/row.py index f8e4f9fac..420806164 100644 --- a/google/cloud/bigtable/data/row.py +++ b/google/cloud/bigtable/data/row.py @@ -467,10 +467,10 @@ def __lt__(self, other) -> bool: Returns: bool: True if this cell is less than the other cell, False otherwise Raises: - NotImplemented: If other is not a Cell + NotImplementedError: If other is not a Cell """ if not isinstance(other, Cell): - raise NotImplemented + raise NotImplementedError this_ordering = ( self.family, self.qualifier, @@ -496,10 +496,10 @@ def __eq__(self, other) -> bool: Returns: bool: True if cells are equal, False otherwise Raises: - NotImplemented: If other is not a Cell + NotImplementedError: If other is not a Cell """ if not isinstance(other, Cell): - raise NotImplemented + raise NotImplementedError return ( self.row_key == other.row_key and self.family == other.family @@ -519,7 +519,7 @@ def __ne__(self, other) -> bool: Returns: bool: True if cells are not equal, False otherwise Raises: - NotImplemented: If other is not a Cell + NotImplementedError: If other is not a Cell """ return not self == other From 90440c985364889a0b49828be784482f6cb5ece2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 17 May 2024 14:00:26 -0700 Subject: [PATCH 15/19] fixed missing operator --- google/cloud/bigtable/data/_async/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index d90358383..7d75fab00 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -313,7 +313,7 @@ async def _manage_channel( ) # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) - next_sleep = next_refresh(time.time() - start_timestamp) + next_sleep = next_refresh - (time.time() - start_timestamp) async def _register_instance(self, instance_id: str, owner: TableAsync) -> None: """ From 456caddf3771eab35404d33507437eab82aa04a1 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 28 May 2024 16:00:08 -0700 Subject: [PATCH 16/19] remove NotImplementedError from equality checks --- google/cloud/bigtable/data/row.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/google/cloud/bigtable/data/row.py b/google/cloud/bigtable/data/row.py index 420806164..28f0260a9 100644 --- a/google/cloud/bigtable/data/row.py +++ b/google/cloud/bigtable/data/row.py @@ -495,11 +495,9 @@ def __eq__(self, other) -> bool: other: Cell to compare with Returns: bool: True if cells are equal, False otherwise - Raises: - NotImplementedError: If other is not a Cell """ if not isinstance(other, Cell): - raise NotImplementedError 
+ return False return ( self.row_key == other.row_key and self.family == other.family @@ -518,8 +516,6 @@ def __ne__(self, other) -> bool: other: Cell to compare with Returns: bool: True if cells are not equal, False otherwise - Raises: - NotImplementedError: If other is not a Cell """ return not self == other From ddd184591867e3d1f989b8ab06bb0fbac62c7f86 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 28 May 2024 16:12:47 -0700 Subject: [PATCH 17/19] adjusting tests --- tests/unit/data/_async/test_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index a0019947d..7593572d8 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1498,7 +1498,6 @@ async def test_read_rows_timeout(self, operation_timeout): "per_request_t, operation_t, expected_num", [ (0.05, 0.08, 2), - (0.05, 0.54, 11), (0.05, 0.14, 3), (0.05, 0.24, 5), ], From 5d76112815c7ae7abc48ea824c0eda25050aa67a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 7 Jun 2024 13:58:52 -0700 Subject: [PATCH 18/19] fixed lint --- samples/quickstart/main_async_test.py | 3 ++- samples/snippets/deletes/deletes_async_test.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/samples/quickstart/main_async_test.py b/samples/quickstart/main_async_test.py index 26a09b1f1..841cfc180 100644 --- a/samples/quickstart/main_async_test.py +++ b/samples/quickstart/main_async_test.py @@ -16,7 +16,8 @@ from typing import AsyncGenerator from google.cloud.bigtable.data import BigtableDataClientAsync, SetCell -import pytest, pytest_asyncio +import pytest +import pytest_asyncio from main_async import main diff --git a/samples/snippets/deletes/deletes_async_test.py b/samples/snippets/deletes/deletes_async_test.py index 8770733b4..0fad6cfa5 100644 --- a/samples/snippets/deletes/deletes_async_test.py +++ b/samples/snippets/deletes/deletes_async_test.py @@ -18,7 +18,8 @@ from typing import AsyncGenerator from google.cloud._helpers import _microseconds_from_datetime -import pytest, pytest_asyncio +import pytest +import pytest_asyncio import deletes_snippets_async From 4b193389b0e20e82fe93efc7ea31d0acb79c2444 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 7 Jun 2024 14:04:00 -0700 Subject: [PATCH 19/19] fix sample imports --- samples/quickstart/requirements-test.txt | 1 + samples/quickstart/requirements.txt | 2 +- samples/snippets/deletes/deletes_async_test.py | 2 +- samples/snippets/deletes/requirements-test.txt | 1 + samples/snippets/deletes/requirements.txt | 2 +- 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/samples/quickstart/requirements-test.txt b/samples/quickstart/requirements-test.txt index cb87efc0f..5cb431d92 100644 --- a/samples/quickstart/requirements-test.txt +++ b/samples/quickstart/requirements-test.txt @@ -1 +1,2 @@ pytest==7.4.4 +pytest-asyncio diff --git a/samples/quickstart/requirements.txt b/samples/quickstart/requirements.txt index 6dc985893..835e1bc78 100644 --- a/samples/quickstart/requirements.txt +++ b/samples/quickstart/requirements.txt @@ -1 +1 @@ -google-cloud-bigtable==2.22.0 +google-cloud-bigtable==2.23.0 diff --git a/samples/snippets/deletes/deletes_async_test.py b/samples/snippets/deletes/deletes_async_test.py index 0fad6cfa5..b708bd52e 100644 --- a/samples/snippets/deletes/deletes_async_test.py +++ b/samples/snippets/deletes/deletes_async_test.py @@ -73,7 +73,7 @@ async def _populate_table(table_id): timestamp = datetime.datetime(2019, 5, 1) 
timestamp_minus_hr = timestamp - datetime.timedelta(hours=1) - async with (BigtableDataClientAsync(project=PROJECT) as client): + async with BigtableDataClientAsync(project=PROJECT) as client: async with client.get_table(BIGTABLE_INSTANCE, table_id) as table: async with table.mutations_batcher() as batcher: await batcher.append( diff --git a/samples/snippets/deletes/requirements-test.txt b/samples/snippets/deletes/requirements-test.txt index cb87efc0f..5cb431d92 100644 --- a/samples/snippets/deletes/requirements-test.txt +++ b/samples/snippets/deletes/requirements-test.txt @@ -1 +1,2 @@ pytest==7.4.4 +pytest-asyncio diff --git a/samples/snippets/deletes/requirements.txt b/samples/snippets/deletes/requirements.txt index 6dc985893..835e1bc78 100644 --- a/samples/snippets/deletes/requirements.txt +++ b/samples/snippets/deletes/requirements.txt @@ -1 +1 @@ -google-cloud-bigtable==2.22.0 +google-cloud-bigtable==2.23.0
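For orientation, here is a minimal usage sketch of the async data client and batcher interfaces whose docstrings the patches above revise. This is an illustrative example only, not part of any patch: the project, instance, table, column family, and qualifier names are placeholders, and it uses only calls whose signatures appear in the diffs and sample tests above (BigtableDataClientAsync, get_table, mutations_batcher, append, RowMutationEntry, SetCell).

    import asyncio

    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell


    async def main():
        # Placeholder identifiers: substitute a real project, instance, table,
        # and an existing column family before running.
        async with BigtableDataClientAsync(project="my-project") as client:
            async with client.get_table("my-instance", "my-table") as table:
                # The batcher flushes on its count/byte/interval thresholds and
                # performs a final flush when the context manager exits.
                async with table.mutations_batcher() as batcher:
                    for i in range(10):
                        entry = RowMutationEntry(
                            row_key=f"row-{i}",
                            mutations=SetCell("family", "qualifier", f"value-{i}"),
                        )
                        await batcher.append(entry)


    asyncio.run(main())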