diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index 7d1144553..99b9944cd 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -56,6 +56,14 @@ class _MutateRowsOperationAsync: Errors are exposed as a MutationsExceptionGroup, which contains a list of exceptions organized by the related failed mutation entries. + + Args: + gapic_client: the client to use for the mutate_rows call + table: the table associated with the request + mutation_entries: a list of RowMutationEntry objects to send to the server + operation_timeout: the timeout to use for the entire operation, in seconds. + attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds. + If not specified, the request will run until operation_timeout is reached. """ def __init__( @@ -67,15 +75,6 @@ def __init__( attempt_timeout: float | None, retryable_exceptions: Sequence[type[Exception]] = (), ): - """ - Args: - - gapic_client: the client to use for the mutate_rows call - - table: the table associated with the request - - mutation_entries: a list of RowMutationEntry objects to send to the server - - operation_timeout: the timeout to use for the entire operation, in seconds. - - attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds. - If not specified, the request will run until operation_timeout is reached. - """ # check that mutations are within limits total_mutations = sum(len(entry.mutations) for entry in mutation_entries) if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT: @@ -121,7 +120,7 @@ async def start(self): Start the operation, and run until completion Raises: - - MutationsExceptionGroup: if any mutations failed + MutationsExceptionGroup: if any mutations failed """ try: # trigger mutate_rows @@ -157,9 +156,9 @@ async def _run_attempt(self): Run a single attempt of the mutate_rows rpc. Raises: - - _MutateRowsIncomplete: if there are failed mutations eligible for - retry after the attempt is complete - - GoogleAPICallError: if the gapic rpc fails + _MutateRowsIncomplete: if there are failed mutations eligible for + retry after the attempt is complete + GoogleAPICallError: if the gapic rpc fails """ request_entries = [self.mutations[idx].proto for idx in self.remaining_indices] # track mutations in this request that have not been finalized yet @@ -213,8 +212,8 @@ def _handle_entry_error(self, idx: int, exc: Exception): retryable. Args: - - idx: the index of the mutation that failed - - exc: the exception to add to the list + idx: the index of the mutation that failed + exc: the exception to add to the list """ entry = self.mutations[idx].entry self.errors.setdefault(idx, []).append(exc) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 9e0fd78e1..7f6e8e507 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -59,6 +59,13 @@ class _ReadRowsOperationAsync: ReadRowsOperation(request, client) handles row merging logic end-to-end, including performing retries on stream errors. 
+ + Args: + query: The query to execute + table: The table to send the request to + operation_timeout: The total time to allow for the operation, in seconds + attempt_timeout: The time to allow for each individual attempt, in seconds + retryable_exceptions: A list of exceptions that should trigger a retry """ __slots__ = ( @@ -104,6 +111,9 @@ def __init__( def start_operation(self) -> AsyncGenerator[Row, None]: """ Start the read_rows operation, retrying on retryable errors. + + Yields: + Row: The next row in the stream """ return retries.retry_target_stream_async( self._read_rows_attempt, @@ -119,6 +129,9 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]: This function is intended to be wrapped by retry logic, which will call this function until it succeeds or a non-retryable error is raised. + + Yields: + Row: The next row in the stream """ # revise request keys and ranges between attempts if self._last_yielded_row_key is not None: @@ -151,6 +164,11 @@ async def chunk_stream( ) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]: """ process chunks out of raw read_rows stream + + Args: + stream: the raw read_rows stream from the gapic client + Yields: + ReadRowsResponsePB.CellChunk: the next chunk in the stream """ async for resp in await stream: # extract proto from proto-plus wrapper @@ -195,9 +213,14 @@ async def chunk_stream( @staticmethod async def merge_rows( chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None - ): + ) -> AsyncGenerator[Row, None]: """ Merge chunks into rows + + Args: + chunks: the chunk stream to merge + Yields: + Row: the next row in the stream """ if chunks is None: return @@ -311,10 +334,12 @@ def _revise_request_rowset( Revise the rows in the request to avoid ones we've already processed. Args: - - row_set: the row set from the request - - last_seen_row_key: the last row key encountered + row_set: the row set from the request + last_seen_row_key: the last row key encountered + Returns: + RowSetPB: the new rowset after adjusting for the last seen key Raises: - - _RowSetComplete: if there are no rows left to process after the revision + _RowSetComplete: if there are no rows left to process after the revision """ # if user is doing a whole table scan, start a new one with the last seen key if row_set is None or (not row_set.row_ranges and row_set.row_keys is not None): diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index e385ecde7..7d75fab00 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -116,8 +116,8 @@ def __init__( Client options used to set user options on the client. API Endpoint should be set through client_options.
Raises: - - RuntimeError if called outside of an async context (no running event loop) - - ValueError if pool_size is less than 1 + RuntimeError: if called outside of an async context (no running event loop) + ValueError: if pool_size is less than 1 """ # set up transport in registry transport_str = f"pooled_grpc_asyncio_{pool_size}" @@ -199,8 +199,9 @@ def _client_version() -> str: def _start_background_channel_refresh(self) -> None: """ Starts a background task to ping and warm each channel in the pool + Raises: - - RuntimeError if not called in an asyncio event loop + RuntimeError: if not called in an asyncio event loop """ if not self._channel_refresh_tasks and not self._emulator_host: # raise RuntimeError if there is no event loop @@ -234,10 +235,10 @@ async def _ping_and_warm_instances( Pings each Bigtable instance registered in `_active_instances` on the client Args: - - channel: grpc channel to warm - - instance_key: if provided, only warm the instance associated with the key + channel: grpc channel to warm + instance_key: if provided, only warm the instance associated with the key Returns: - - sequence of results or exceptions from the ping requests + list[BaseException | None]: sequence of results or exceptions from the ping requests """ instance_list = ( [instance_key] if instance_key is not None else self._active_instances @@ -323,10 +324,10 @@ async def _register_instance(self, instance_id: str, owner: TableAsync) -> None: Channels will not be refreshed unless at least one instance is registered Args: - - instance_id: id of the instance to register. - - owner: table that owns the instance. Owners will be tracked in - _instance_owners, and instances will only be unregistered when all - owners call _remove_instance_registration + instance_id: id of the instance to register. + owner: table that owns the instance. Owners will be tracked in + _instance_owners, and instances will only be unregistered when all + owners call _remove_instance_registration """ instance_name = self._gapic_client.instance_path(self.project, instance_id) instance_key = _WarmedInstanceKey( @@ -354,12 +355,12 @@ async def _remove_instance_registration( If instance_id is not registered, or is still in use by other tables, returns False Args: - - instance_id: id of the instance to remove - - owner: table that owns the instance. Owners will be tracked in + instance_id: id of the instance to remove + owner: table that owns the instance. Owners will be tracked in _instance_owners, and instances will only be unregistered when all owners call _remove_instance_registration Returns: - - True if instance was removed + bool: True if instance was removed, else False """ instance_name = self._gapic_client.instance_path(self.project, instance_id) instance_key = _WarmedInstanceKey( @@ -408,6 +409,10 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAs default_retryable_errors: a list of errors that will be retried if encountered during all other operations. Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) + Returns: + TableAsync: a table instance for making data API requests + Raises: + RuntimeError: if called outside of an async context (no running event loop) """ return TableAsync(self, instance_id, table_id, *args, **kwargs) @@ -490,7 +495,7 @@ def __init__( encountered during all other operations. 
Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable) Raises: - - RuntimeError if called outside of an async context (no running event loop) + RuntimeError: if called outside of an async context (no running event loop) """ # NOTE: any changes to the signature of this method should also be reflected # in client.get_table() @@ -564,24 +569,24 @@ async def read_rows_stream( retryable_errors list until operation_timeout is reached. Args: - - query: contains details about which rows to return - - operation_timeout: the time budget for the entire operation, in seconds. + query: contains details about which rows to return + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors Returns: - - an asynchronous iterator that yields rows returned by the query + AsyncIterable[Row]: an asynchronous iterator that yields rows returned by the query Raises: - - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -615,26 +620,26 @@ async def read_rows( retryable_errors list until operation_timeout is reached. Args: - - query: contains details about which rows to return - - operation_timeout: the time budget for the entire operation, in seconds. + query: contains details about which rows to return + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout if that is also None. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. 
Returns: - - a list of Rows returned by the query + list[Row]: a list of Rows returned by the query Raises: - - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ row_generator = await self.read_rows_stream( query, @@ -661,24 +666,24 @@ async def read_row( retryable_errors list until operation_timeout is reached. Args: - - query: contains details about which rows to return - - operation_timeout: the time budget for the entire operation, in seconds. + query: contains details about which rows to return + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - - a Row object if the row exists, otherwise None + Row | None: a Row object if the row exists, otherwise None Raises: - - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -716,20 +721,22 @@ async def read_rows_sharded( ``` Args: - - sharded_query: a sharded query to execute - - operation_timeout: the time budget for the entire operation, in seconds. + sharded_query: a sharded query to execute + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. 
+ Returns: + list[Row]: a list of Rows returned by the query Raises: - - ShardedReadRowsExceptionGroup: if any of the queries failed - - ValueError: if the query_list is empty + ShardedReadRowsExceptionGroup: if any of the queries failed + ValueError: if the query_list is empty """ if not sharded_query: raise ValueError("empty sharded_query") @@ -796,24 +803,24 @@ async def row_exists( uses the filters: chain(limit cells per row = 1, strip value) Args: - - row_key: the key of the row to check - - operation_timeout: the time budget for the entire operation, in seconds. + row_key: the key of the row to check + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_read_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_read_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_read_rows_retryable_errors. Returns: - - a bool indicating whether the row exists + bool: a bool indicating whether the row exists Raises: - - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -847,26 +854,26 @@ async def sample_row_keys( requests will call sample_row_keys internally for this purpose when sharding is enabled RowKeySamples is simply a type alias for list[tuple[bytes, int]]; a list of - row_keys, along with offset positions in the table + row_keys, along with offset positions in the table Args: - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget.i Defaults to the Table's default_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_retryable_errors. 
Returns: - - a set of RowKeySamples the delimit contiguous sections of the table + RowKeySamples: a set of RowKeySamples that delimit contiguous sections of the table Raises: - - DeadlineExceeded: raised after operation timeout + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised if the request encounters an unrecoverable error + google.api_core.exceptions.GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts operation_timeout, attempt_timeout = _get_timeouts( @@ -922,22 +929,22 @@ def mutations_batcher( to avoid excess network calls Args: - - flush_interval: Automatically flush every flush_interval seconds. If None, + flush_interval: Automatically flush every flush_interval seconds. If None, a table default will be used - - flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count + flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count mutations are added across all entries. If None, this limit is ignored. - - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. - - flow_control_max_mutation_count: Maximum number of inflight mutations. - - flow_control_max_bytes: Maximum number of inflight bytes. - - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. + flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. + flow_control_max_mutation_count: Maximum number of inflight mutations. + flow_control_max_bytes: Maximum number of inflight bytes. + batch_operation_timeout: timeout for each mutate_rows operation, in seconds. Defaults to the Table's default_mutate_rows_operation_timeout - - batch_attempt_timeout: timeout for each individual request, in seconds. + batch_attempt_timeout: timeout for each individual request, in seconds. Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to batch_operation_timeout. - - batch_retryable_errors: a list of errors that will be retried if encountered. + batch_retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors. Returns: - - a MutationsBatcherAsync context manager that can batch requests + MutationsBatcherAsync: a MutationsBatcherAsync context manager that can batch requests """ return MutationsBatcherAsync( self, @@ -971,26 +978,26 @@ async def mutate_row( retried on server failure. Non-idempotent operations will not. Args: - - row_key: the row to apply mutations to - - mutations: the set of mutations to apply to the row - - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will be retried within the budget. - Defaults to the Table's default_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. - If it takes longer than this time to complete, the request will be cancelled with - a DeadlineExceeded exception, and a retry will be attempted. - Defaults to the Table's default_attempt_timeout. - If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. - Only idempotent mutations will be retried. Defaults to the Table's - default_retryable_errors.
+ row_key: the row to apply mutations to + mutations: the set of mutations to apply to the row + operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + Defaults to the Table's default_operation_timeout + attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + Defaults to the Table's default_attempt_timeout. + If None, defaults to operation_timeout. + retryable_errors: a list of errors that will be retried if encountered. + Only idempotent mutations will be retried. Defaults to the Table's + default_retryable_errors. Raises: - - DeadlineExceeded: raised after operation timeout - will be chained with a RetryExceptionGroup containing all - GoogleAPIError exceptions from any retries that failed - - GoogleAPIError: raised on non-idempotent operations that cannot be - safely retried. - - ValueError if invalid arguments are provided + google.api_core.exceptions.DeadlineExceeded: raised after operation timeout + will be chained with a RetryExceptionGroup containing all + GoogleAPIError exceptions from any retries that failed + google.api_core.exceptions.GoogleAPIError: raised on non-idempotent operations that cannot be + safely retried. + ValueError: if invalid arguments are provided """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -1051,23 +1058,23 @@ async def bulk_mutate_rows( raised exception group Args: - - mutation_entries: the batches of mutations to apply + mutation_entries: the batches of mutations to apply Each entry will be applied atomically, but entries will be applied in arbitrary order - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. Defaults to the Table's default_mutate_rows_operation_timeout - - attempt_timeout: the time budget for an individual network request, in seconds. + attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. Defaults to the Table's default_mutate_rows_attempt_timeout. If None, defaults to operation_timeout. - - retryable_errors: a list of errors that will be retried if encountered. + retryable_errors: a list of errors that will be retried if encountered. Defaults to the Table's default_mutate_rows_retryable_errors Raises: - - MutationsExceptionGroup if one or more mutations fails + MutationsExceptionGroup: if one or more mutations fails Contains details about any failed entries in .exceptions - - ValueError if invalid arguments are provided + ValueError: if invalid arguments are provided """ operation_timeout, attempt_timeout = _get_timeouts( operation_timeout, attempt_timeout, self @@ -1099,31 +1106,31 @@ async def check_and_mutate_row( Non-idempotent operation: will not be retried Args: - - row_key: the key of the row to mutate - - predicate: the filter to be applied to the contents of the specified row. + row_key: the key of the row to mutate + predicate: the filter to be applied to the contents of the specified row. Depending on whether or not any results are yielded, either true_case_mutations or false_case_mutations will be executed. 
If None, checks that the row contains any values at all. - - true_case_mutations: + true_case_mutations: Changes to be atomically applied to the specified row if predicate yields at least one cell when applied to row_key. Entries are applied in order, meaning that earlier mutations can be masked by later ones. Must contain at least one entry if false_case_mutations is empty, and at most 100000. - - false_case_mutations: + false_case_mutations: Changes to be atomically applied to the specified row if predicate_filter does not yield any cells when applied to row_key. Entries are applied in order, meaning that earlier mutations can be masked by later ones. Must contain at least one entry if `true_case_mutations` is empty, and at most 100000. - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will not be retried. Defaults to the Table's default_operation_timeout Returns: - - bool indicating whether the predicate was true or false + bool indicating whether the predicate was true or false Raises: - - GoogleAPIError exceptions from grpc call + google.api_core.exceptions.GoogleAPIError: exceptions from grpc call """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if true_case_mutations is not None and not isinstance( @@ -1167,19 +1174,18 @@ async def read_modify_write_row( Non-idempotent operation: will not be retried Args: - - row_key: the key of the row to apply read/modify/write rules to - - rules: A rule or set of rules to apply to the row. + row_key: the key of the row to apply read/modify/write rules to + rules: A rule or set of rules to apply to the row. Rules are applied in order, meaning that earlier rules will affect the results of later ones. - - operation_timeout: the time budget for the entire operation, in seconds. + operation_timeout: the time budget for the entire operation, in seconds. Failed requests will not be retried. Defaults to the Table's default_operation_timeout. Returns: - - Row: containing cell data that was modified as part of the - operation + Row: a Row containing cell data that was modified as part of the operation Raises: - - GoogleAPIError exceptions from grpc call - - ValueError if invalid arguments are provided + google.api_core.exceptions.GoogleAPIError: exceptions from grpc call + ValueError: if invalid arguments are provided """ operation_timeout, _ = _get_timeouts(operation_timeout, None, self) if operation_timeout <= 0: diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 5d5dd535e..76d13f00b 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -50,6 +50,13 @@ class _FlowControlAsync: Flow limits are not hard limits. If a single mutation exceeds the configured limits, it will be allowed as a single batch when the capacity is available. + + Args: + max_mutation_count: maximum number of mutations to send in a single rpc. + This corresponds to individual mutations in a single RowMutationEntry. + max_mutation_bytes: maximum number of bytes to send in a single rpc. + Raises: + ValueError: if max_mutation_count or max_mutation_bytes is less than 0 """ def __init__( @@ -57,12 +64,6 @@ def __init__( max_mutation_count: int, max_mutation_bytes: int, ): - """ - Args: - - max_mutation_count: maximum number of mutations to send in a single rpc. 
This corresponds to individual mutations in a single RowMutationEntry. - - max_mutation_bytes: maximum number of bytes to send in a single rpc. - """ self._max_mutation_count = max_mutation_count self._max_mutation_bytes = max_mutation_bytes if self._max_mutation_count < 1: @@ -82,10 +83,10 @@ def _has_capacity(self, additional_count: int, additional_size: int) -> bool: previous batches have completed. Args: - - additional_count: number of mutations in the pending entry - - additional_size: size of the pending entry + additional_count: number of mutations in the pending entry + additional_size: size of the pending entry Returns: - - True if there is capacity to send the pending entry, False otherwise + bool: True if there is capacity to send the pending entry, False otherwise """ # adjust limits to allow overly large mutations acceptable_size = max(self._max_mutation_bytes, additional_size) @@ -104,7 +105,7 @@ async def remove_from_flow( operation is complete. Args: - - mutations: mutation or list of mutations to remove from flow control + mutations: mutation or list of mutations to remove from flow control """ if not isinstance(mutations, list): mutations = [mutations] @@ -124,10 +125,11 @@ async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry] will block until there is capacity available. Args: - - mutations: list mutations to break up into batches + mutations: list of mutations to break up into batches Yields: - - list of mutations that have reserved space in the flow control. - Each batch contains at least one mutation. + list[RowMutationEntry]: + list of mutations that have reserved space in the flow control. + Each batch contains at least one mutation. """ if not isinstance(mutations, list): mutations = [mutations] @@ -171,15 +173,28 @@ class MutationsBatcherAsync: Runs mutate_row, mutate_rows, and check_and_mutate_row internally, combining to use as few network requests as required - Flushes: - - every flush_interval seconds - - after queue reaches flush_count in quantity - - after queue reaches flush_size_bytes in storage size - - when batcher is closed or destroyed - - async with table.mutations_batcher() as batcher: - for i in range(10): - batcher.add(row, mut) + Will automatically flush the batcher: + - every flush_interval seconds + - after queue size reaches flush_limit_mutation_count + - after queue reaches flush_limit_bytes + - when batcher is closed or destroyed + + Args: + table: Table to perform rpc calls + flush_interval: Automatically flush every flush_interval seconds. + If None, no time-based flushing is performed. + flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count + mutations are added across all entries. If None, this limit is ignored. + flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. + flow_control_max_mutation_count: Maximum number of inflight mutations. + flow_control_max_bytes: Maximum number of inflight bytes. + batch_operation_timeout: timeout for each mutate_rows operation, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. + batch_attempt_timeout: timeout for each individual request, in seconds. + If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. + If None, defaults to batch_operation_timeout. + batch_retryable_errors: a list of errors that will be retried if encountered. + Defaults to the Table's default_mutate_rows_retryable_errors.
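+
+    A minimal usage sketch (illustrative only; assumes ``table`` is an open
+    TableAsync and ``entry`` is a RowMutationEntry built by the caller):
+
+    ```
+    async with table.mutations_batcher() as batcher:
+        # entries are buffered and flushed in the background
+        await batcher.append(entry)
+    ```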
""" def __init__( @@ -196,24 +211,6 @@ def __init__( batch_retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, ): - """ - Args: - - table: Table to preform rpc calls - - flush_interval: Automatically flush every flush_interval seconds. - If None, no time-based flushing is performed. - - flush_limit_mutation_count: Flush immediately after flush_limit_mutation_count - mutations are added across all entries. If None, this limit is ignored. - - flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added. - - flow_control_max_mutation_count: Maximum number of inflight mutations. - - flow_control_max_bytes: Maximum number of inflight bytes. - - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout. - - batch_attempt_timeout: timeout for each individual request, in seconds. - If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout. - If None, defaults to batch_operation_timeout. - - batch_retryable_errors: a list of errors that will be retried if encountered. - Defaults to the Table's default_mutate_rows_retryable_errors. - """ self._operation_timeout, self._attempt_timeout = _get_timeouts( batch_operation_timeout, batch_attempt_timeout, table ) @@ -255,10 +252,10 @@ def _start_flush_timer(self, interval: float | None) -> asyncio.Future[None]: If interval is None, an empty future is returned Args: - - flush_interval: Automatically flush every flush_interval seconds. - If None, no time-based flushing is performed. + flush_interval: Automatically flush every flush_interval seconds. + If None, no time-based flushing is performed. Returns: - - asyncio.Future that represents the background task + asyncio.Future[None]: future representing the background task """ if interval is None or self.closed: empty_future: asyncio.Future[None] = asyncio.Future() @@ -282,14 +279,13 @@ async def append(self, mutation_entry: RowMutationEntry): """ Add a new set of mutations to the internal queue - TODO: return a future to track completion of this entry - Args: - - mutation_entry: new entry to add to flush queue + mutation_entry: new entry to add to flush queue Raises: - - RuntimeError if batcher is closed - - ValueError if an invalid mutation type is added + RuntimeError: if batcher is closed + ValueError: if an invalid mutation type is added """ + # TODO: return a future to track completion of this entry if self.closed: raise RuntimeError("Cannot append to closed MutationsBatcher") if isinstance(mutation_entry, Mutation): # type: ignore @@ -309,7 +305,13 @@ async def append(self, mutation_entry: RowMutationEntry): await asyncio.sleep(0) def _schedule_flush(self) -> asyncio.Future[None] | None: - """Update the flush task to include the latest staged entries""" + """ + Update the flush task to include the latest staged entries + + Returns: + asyncio.Future[None] | None: + future representing the background task, if started + """ if self._staged_entries: entries, self._staged_entries = self._staged_entries, [] self._staged_count, self._staged_bytes = 0, 0 @@ -324,7 +326,7 @@ async def _flush_internal(self, new_entries: list[RowMutationEntry]): Flushes a set of mutations to the server, and updates internal state Args: - - new_entries: list of RowMutationEntry objects to flush + new_entries list of RowMutationEntry objects to flush """ # flush new entries in_process_requests: list[asyncio.Future[list[FailedMutationEntryError]]] = [] @@ -344,12 
+346,13 @@ async def _execute_mutate_rows( Helper to execute mutation operation on a batch Args: - - batch: list of RowMutationEntry objects to send to server - - timeout: timeout in seconds. Used as operation_timeout and attempt_timeout. - If not given, will use table defaults + batch: list of RowMutationEntry objects to send to server + timeout: timeout in seconds. Used as operation_timeout and attempt_timeout. + If not given, will use table defaults Returns: - - list of FailedMutationEntryError objects for mutations that failed. - FailedMutationEntryError objects will not contain index information + list[FailedMutationEntryError]: + list of FailedMutationEntryError objects for mutations that failed. + FailedMutationEntryError objects will not contain index information """ try: operation = _MutateRowsOperationAsync( @@ -376,6 +379,9 @@ def _add_exceptions(self, excs: list[Exception]): Add new list of exceptions to internal store. To avoid unbounded memory, the batcher will store the first and last _exception_list_limit exceptions, and discard any in between. + + Args: + excs: list of exceptions to add to the internal store """ self._exceptions_since_last_raise += len(excs) if excs and len(self._oldest_exceptions) < self._exception_list_limit: @@ -392,7 +398,7 @@ def _raise_exceptions(self): Raise any unreported exceptions from background flush operations Raises: - - MutationsExceptionGroup with all unreported exceptions + MutationsExceptionGroup: exception group with all unreported exceptions """ if self._oldest_exceptions or self._newest_exceptions: oldest, self._oldest_exceptions = self._oldest_exceptions, [] @@ -414,11 +420,15 @@ def _raise_exceptions(self): ) async def __aenter__(self): - """For context manager API""" + """Allow use of context manager API""" return self async def __aexit__(self, exc_type, exc, tb): - """For context manager API""" + """ + Allow use of context manager API. + + Flushes the batcher and cleans up resources. + """ await self.close() async def close(self): @@ -457,11 +467,11 @@ def _create_bg_task(func, *args, **kwargs) -> asyncio.Future[Any]: with different concurrency models. Args: - - func: function to execute in background task - - *args: positional arguments to pass to func - - **kwargs: keyword arguments to pass to func + func: function to execute in background task + *args: positional arguments to pass to func + **kwargs: keyword arguments to pass to func Returns: - - Future object representing the background task + asyncio.Future: Future object representing the background task """ return asyncio.create_task(func(*args, **kwargs)) @@ -474,12 +484,13 @@ async def _wait_for_batch_results( waits for them to complete, and returns a list of errors encountered. Args: - - *tasks: futures representing _execute_mutate_rows or _flush_internal tasks + *tasks: futures representing _execute_mutate_rows or _flush_internal tasks Returns: - - list of Exceptions encountered by any of the tasks. Errors are expected - to be FailedMutationEntryError, representing a failed mutation operation. - If a task fails with a different exception, it will be included in the - output list. Successful tasks will not be represented in the output list. + list[Exception]: + list of Exceptions encountered by any of the tasks. Errors are expected + to be FailedMutationEntryError, representing a failed mutation operation. + If a task fails with a different exception, it will be included in the + output list. Successful tasks will not be represented in the output list. 
""" if not tasks: return [] diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index a0b13cbaf..a8fba9ef1 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -83,11 +83,11 @@ def _attempt_timeout_generator( at which point it will return the remaining time in the operation_timeout. Args: - - per_request_timeout: The timeout value to use for each request, in seconds. + per_request_timeout: The timeout value to use for each request, in seconds. If None, the operation_timeout will be used for each request. - - operation_timeout: The timeout value to use for the entire operationm in seconds. + operation_timeout: The timeout value to use for the entire operationm in seconds. Yields: - - The timeout value to use for the next request, in seonds + float: The timeout value to use for the next request, in seonds """ per_request_timeout = ( per_request_timeout if per_request_timeout is not None else operation_timeout @@ -106,12 +106,13 @@ def _retry_exception_factory( Build retry error based on exceptions encountered during operation Args: - - exc_list: list of exceptions encountered during operation - - is_timeout: whether the operation failed due to timeout - - timeout_val: the operation timeout value in seconds, for constructing + exc_list: list of exceptions encountered during operation + is_timeout: whether the operation failed due to timeout + timeout_val: the operation timeout value in seconds, for constructing the error message Returns: - - tuple of the exception to raise, and a cause exception if applicable + tuple[Exception, Exception|None]: + tuple of the exception to raise, and a cause exception if applicable """ if reason == RetryFailureReason.TIMEOUT: timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else "" @@ -144,11 +145,11 @@ def _get_timeouts( resulting timeouts are invalid. Args: - - operation: The timeout value to use for the entire operation, in seconds. - - attempt: The timeout value to use for each attempt, in seconds. - - table: The table to use for default values. + operation: The timeout value to use for the entire operation, in seconds. + attempt: The timeout value to use for each attempt, in seconds. + table: The table to use for default values. Returns: - - A tuple of (operation_timeout, attempt_timeout) + typle[float, float]: A tuple of (operation_timeout, attempt_timeout) """ # load table defaults if necessary if operation == TABLE_DEFAULT.DEFAULT: @@ -185,11 +186,11 @@ def _validate_timeouts( an exception if they are not. Args: - - operation_timeout: The timeout value to use for the entire operation, in seconds. - - attempt_timeout: The timeout value to use for each attempt, in seconds. - - allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception. + operation_timeout: The timeout value to use for the entire operation, in seconds. + attempt_timeout: The timeout value to use for each attempt, in seconds. + allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception. Raises: - - ValueError if operation_timeout or attempt_timeout are invalid. + ValueError: if operation_timeout or attempt_timeout are invalid. 
""" if operation_timeout is None: raise ValueError("operation_timeout cannot be None") @@ -206,6 +207,16 @@ def _get_retryable_errors( call_codes: Sequence["grpc.StatusCode" | int | type[Exception]] | TABLE_DEFAULT, table: "TableAsync", ) -> list[type[Exception]]: + """ + Convert passed in retryable error codes to a list of exception types. + + Args: + call_codes: The error codes to convert. Can be a list of grpc.StatusCode values, + int values, or Exception types, or a TABLE_DEFAULT value. + table: The table to use for default values. + Returns: + list[type[Exception]]: A list of exception types to retry on. + """ # load table defaults if necessary if call_codes == TABLE_DEFAULT.DEFAULT: call_codes = table.default_retryable_errors diff --git a/google/cloud/bigtable/data/exceptions.py b/google/cloud/bigtable/data/exceptions.py index 3c73ec4e9..8d97640aa 100644 --- a/google/cloud/bigtable/data/exceptions.py +++ b/google/cloud/bigtable/data/exceptions.py @@ -142,10 +142,12 @@ def _format_message( Format a message for the exception group Args: - - excs: the exceptions in the group - - total_entries: the total number of entries attempted, successful or not - - exc_count: the number of exceptions associated with the request - if None, this will be len(excs) + excs: the exceptions in the group + total_entries: the total number of entries attempted, successful or not + exc_count: the number of exceptions associated with the request + if None, this will be len(excs) + Returns: + str: the formatted message """ exc_count = exc_count if exc_count is not None else len(excs) entry_str = "entry" if exc_count == 1 else "entries" @@ -156,10 +158,10 @@ def __init__( ): """ Args: - - excs: the exceptions in the group - - total_entries: the total number of entries attempted, successful or not - - message: the message for the exception group. If None, a default message - will be generated + excs: the exceptions in the group + total_entries: the total number of entries attempted, successful or not + message: the message for the exception group. If None, a default message + will be generated """ message = ( message @@ -174,9 +176,11 @@ def __new__( ): """ Args: - - excs: the exceptions in the group - - total_entries: the total number of entries attempted, successful or not - - message: the message for the exception group. If None, a default message + excs: the exceptions in the group + total_entries: the total number of entries attempted, successful or not + message: the message for the exception group. If None, a default message + Returns: + MutationsExceptionGroup: the new instance """ message = ( message if message is not None else cls._format_message(excs, total_entries) @@ -200,12 +204,14 @@ def from_truncated_lists( describe the number of exceptions that were truncated. 
Args: - - first_list: the set of oldest exceptions to add to the ExceptionGroup - - last_list: the set of newest exceptions to add to the ExceptionGroup - - total_excs: the total number of exceptions associated with the request - Should be len(first_list) + len(last_list) + number of dropped exceptions - in the middle - - entry_count: the total number of entries attempted, successful or not + first_list: the set of oldest exceptions to add to the ExceptionGroup + last_list: the set of newest exceptions to add to the ExceptionGroup + total_excs: the total number of exceptions associated with the request + Should be len(first_list) + len(last_list) + number of dropped exceptions + in the middle + entry_count: the total number of entries attempted, successful or not + Returns: + MutationsExceptionGroup: the new instance """ first_count, last_count = len(first_list), len(last_list) if first_count + last_count >= total_excs: diff --git a/google/cloud/bigtable/data/mutations.py b/google/cloud/bigtable/data/mutations.py index b5729d25e..fd9b2c24e 100644 --- a/google/cloud/bigtable/data/mutations.py +++ b/google/cloud/bigtable/data/mutations.py @@ -33,36 +33,75 @@ class Mutation(ABC): - """Model class for mutations""" + """ + Abstract base class for mutations. + + This class defines the interface for different types of mutations that can be + applied to Bigtable rows. + """ @abstractmethod def _to_dict(self) -> dict[str, Any]: + """ + Convert the mutation to a dictionary representation. + + Returns: + dict[str, Any]: A dictionary representation of the mutation. + """ raise NotImplementedError def _to_pb(self) -> data_pb.Mutation: """ - Convert the mutation to protobuf + Convert the mutation to a protobuf representation. + + Returns: + Mutation: A protobuf representation of the mutation. """ return data_pb.Mutation(**self._to_dict()) def is_idempotent(self) -> bool: """ Check if the mutation is idempotent - If false, the mutation will not be retried + + Idempotent mutations can be safely retried on failure. + + Returns: + bool: True if the mutation is idempotent, False otherwise. """ return True def __str__(self) -> str: + """ + Return a string representation of the mutation. + + Returns: + str: A string representation of the mutation. + """ return str(self._to_dict()) def size(self) -> int: """ Get the size of the mutation in bytes + + Returns: + int: The size of the mutation in bytes. """ return getsizeof(self._to_dict()) @classmethod def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation: + """ + Create a `Mutation` instance from a dictionary representation. + + Args: + input_dict (dict[str, Any]): A dictionary representation of the mutation. + + Returns: + Mutation: A Mutation instance created from the dictionary. + + Raises: + ValueError: If the input dictionary is invalid or does not represent a valid mutation type. + """ instance: Mutation | None = None try: if "set_cell" in input_dict: @@ -96,6 +135,25 @@ def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation: class SetCell(Mutation): + """ + Mutation to set the value of a cell. + + Args: + family (str): The name of the column family to which the new cell belongs. + qualifier (bytes | str): The column qualifier of the new cell. + new_value (bytes | str | int): The value of the new cell. + timestamp_micros (int | None): The timestamp of the new cell. If `None`, + the current timestamp will be used. Timestamps will be sent with + millisecond precision. Extra precision will be truncated. If -1, the + server will assign a timestamp. 
Note that `SetCell` mutations with + server-side timestamps are non-idempotent operations and will not be retried. + + Raises: + TypeError: If `qualifier` is not `bytes` or `str`. + TypeError: If `new_value` is not `bytes`, `str`, or `int`. + ValueError: If `timestamp_micros` is less than `_SERVER_SIDE_TIMESTAMP`. + """ + def __init__( self, family: str, @@ -103,18 +161,6 @@ def __init__( new_value: bytes | str | int, timestamp_micros: int | None = None, ): - """ - Mutation to set the value of a cell - - Args: - - family: The name of the column family to which the new cell belongs. - - qualifier: The column qualifier of the new cell. - - new_value: The value of the new cell. str or int input will be converted to bytes - - timestamp_micros: The timestamp of the new cell. If None, the current timestamp will be used. - Timestamps will be sent with milisecond-percision. Extra precision will be truncated. - If -1, the server will assign a timestamp. Note that SetCell mutations with server-side - timestamps are non-idempotent operations and will not be retried. - """ qualifier = qualifier.encode() if isinstance(qualifier, str) else qualifier if not isinstance(qualifier, bytes): raise TypeError("qualifier must be bytes or str") @@ -142,7 +188,6 @@ def __init__( self.timestamp_micros = timestamp_micros def _to_dict(self) -> dict[str, Any]: - """Convert the mutation to a dictionary representation""" return { "set_cell": { "family_name": self.family, @@ -153,12 +198,26 @@ def _to_dict(self) -> dict[str, Any]: } def is_idempotent(self) -> bool: - """Check if the mutation is idempotent""" return self.timestamp_micros != _SERVER_SIDE_TIMESTAMP @dataclass class DeleteRangeFromColumn(Mutation): + """ + Mutation to delete a range of cells from a column. + + Args: + family (str): The name of the column family. + qualifier (bytes): The column qualifier. + start_timestamp_micros (int | None): The start timestamp of the range to + delete. `None` represents 0. Defaults to `None`. + end_timestamp_micros (int | None): The end timestamp of the range to + delete. `None` represents infinity. Defaults to `None`. + + Raises: + ValueError: If `start_timestamp_micros` is greater than `end_timestamp_micros`. + """ + family: str qualifier: bytes # None represents 0 @@ -191,6 +250,13 @@ def _to_dict(self) -> dict[str, Any]: @dataclass class DeleteAllFromFamily(Mutation): + """ + Mutation to delete all cells from a column family. + + Args: + family_to_delete (str): The name of the column family to delete. + """ + family_to_delete: str def _to_dict(self) -> dict[str, Any]: @@ -203,6 +269,10 @@ def _to_dict(self) -> dict[str, Any]: @dataclass class DeleteAllFromRow(Mutation): + """ + Mutation to delete all cells from a row. + """ + def _to_dict(self) -> dict[str, Any]: return { "delete_from_row": {}, @@ -210,6 +280,22 @@ def _to_dict(self) -> dict[str, Any]: class RowMutationEntry: + """ + A single entry in a `MutateRows` request. + + This class represents a set of mutations to apply to a specific row in a + Bigtable table. + + Args: + row_key (bytes | str): The key of the row to mutate. + mutations (Mutation | list[Mutation]): The mutation or list of mutations to apply + to the row. + + Raises: + ValueError: If `mutations` is empty or contains more than + `_MUTATE_ROWS_REQUEST_MUTATION_LIMIT` mutations. 
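+
+    Example (a hedged sketch, illustrative only; assumes ``table`` is an existing TableAsync):
+
+    ```
+    entry = RowMutationEntry(b"row-key", [SetCell("family", b"qualifier", b"value")])
+    await table.bulk_mutate_rows([entry])
+    ```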
+ """ + def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]): if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -225,29 +311,58 @@ def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]): self.mutations = tuple(mutations) def _to_dict(self) -> dict[str, Any]: + """ + Convert the mutation entry to a dictionary representation. + + Returns: + dict[str, Any]: A dictionary representation of the mutation entry + """ return { "row_key": self.row_key, "mutations": [mutation._to_dict() for mutation in self.mutations], } def _to_pb(self) -> types_pb.MutateRowsRequest.Entry: + """ + Convert the mutation entry to a protobuf representation. + + Returns: + MutateRowsRequest.Entry: A protobuf representation of the mutation entry. + """ return types_pb.MutateRowsRequest.Entry( row_key=self.row_key, mutations=[mutation._to_pb() for mutation in self.mutations], ) def is_idempotent(self) -> bool: - """Check if the mutation is idempotent""" + """ + Check if all mutations in the entry are idempotent. + + Returns: + bool: True if all mutations in the entry are idempotent, False otherwise. + """ return all(mutation.is_idempotent() for mutation in self.mutations) def size(self) -> int: """ - Get the size of the mutation in bytes + Get the size of the mutation entry in bytes. + + Returns: + int: The size of the mutation entry in bytes. """ return getsizeof(self._to_dict()) @classmethod def _from_dict(cls, input_dict: dict[str, Any]) -> RowMutationEntry: + """ + Create a `RowMutationEntry` instance from a dictionary representation. + + Args: + input_dict (dict[str, Any]): A dictionary representation of the mutation entry. + + Returns: + RowMutationEntry: A RowMutationEntry instance created from the dictionary. + """ return RowMutationEntry( row_key=input_dict["row_key"], mutations=[ diff --git a/google/cloud/bigtable/data/read_modify_write_rules.py b/google/cloud/bigtable/data/read_modify_write_rules.py index f43dbe79f..e2d3b9f4f 100644 --- a/google/cloud/bigtable/data/read_modify_write_rules.py +++ b/google/cloud/bigtable/data/read_modify_write_rules.py @@ -23,6 +23,10 @@ class ReadModifyWriteRule(abc.ABC): + """ + Abstract base class for read-modify-write rules. + """ + def __init__(self, family: str, qualifier: bytes | str): qualifier = ( qualifier if isinstance(qualifier, bytes) else qualifier.encode("utf-8") @@ -39,6 +43,23 @@ def _to_pb(self) -> data_pb.ReadModifyWriteRule: class IncrementRule(ReadModifyWriteRule): + """ + Rule to increment a cell's value. + + Args: + family (str): + The family name of the cell to increment. + qualifier (bytes | str): + The qualifier of the cell to increment. + increment_amount (int): + The amount to increment the cell's value. Must be between -2**63 and 2**63 (64-bit signed int). + Raises: + TypeError: + If increment_amount is not an integer. + ValueError: + If increment_amount is not between -2**63 and 2**63 (64-bit signed int). + """ + def __init__(self, family: str, qualifier: bytes | str, increment_amount: int = 1): if not isinstance(increment_amount, int): raise TypeError("increment_amount must be an integer") @@ -58,6 +79,20 @@ def _to_dict(self) -> dict[str, str | bytes | int]: class AppendValueRule(ReadModifyWriteRule): + """ + Rule to append a value to a cell's value. + + Args: + family (str): + The family name of the cell to append to. + qualifier (bytes | str): + The qualifier of the cell to append to. + append_value (bytes | str): + The value to append to the cell's value. 
+ Raises: + TypeError: If append_value is not bytes or str. + """ + def __init__(self, family: str, qualifier: bytes | str, append_value: bytes | str): append_value = ( append_value.encode("utf-8") diff --git a/google/cloud/bigtable/data/read_rows_query.py b/google/cloud/bigtable/data/read_rows_query.py index 362f54c3e..5e414391c 100644 --- a/google/cloud/bigtable/data/read_rows_query.py +++ b/google/cloud/bigtable/data/read_rows_query.py @@ -44,15 +44,15 @@ def __init__( ): """ Args: - - start_key: The start key of the range. If empty, the range is unbounded on the left. - - end_key: The end key of the range. If empty, the range is unbounded on the right. - - start_is_inclusive: Whether the start key is inclusive. If None, the start key is + start_key: The start key of the range. If empty, the range is unbounded on the left. + end_key: The end key of the range. If empty, the range is unbounded on the right. + start_is_inclusive: Whether the start key is inclusive. If None, the start key is inclusive. - - end_is_inclusive: Whether the end key is inclusive. If None, the end key is not inclusive. + end_is_inclusive: Whether the end key is inclusive. If None, the end key is not inclusive. Raises: - - ValueError: if start_key is greater than end_key, or start_is_inclusive, - or end_is_inclusive is set when the corresponding key is None, - or start_key or end_key is not a string or bytes. + ValueError: if start_key is greater than end_key + ValueError: if start_is_inclusive or end_is_inclusive is set when the corresponding key is None + ValueError: if start_key or end_key is not a string or bytes. """ # convert empty key inputs to None for consistency start_key = None if not start_key else start_key @@ -100,39 +100,69 @@ def start_key(self) -> bytes | None: def end_key(self) -> bytes | None: """ Returns the end key of the range. If None, the range is unbounded on the right. + + Returns: + bytes | None: The end key of the range, or None if the range is unbounded on the right. """ return self._pb.end_key_closed or self._pb.end_key_open or None @property def start_is_inclusive(self) -> bool: """ - Returns whether the range is inclusive of the start key. - Returns True if the range is unbounded on the left. + Indicates if the range is inclusive of the start key. + + If the range is unbounded on the left, this will return True. + + Returns: + bool: Whether the range is inclusive of the start key. """ return not bool(self._pb.start_key_open) @property def end_is_inclusive(self) -> bool: """ - Returns whether the range is inclusive of the end key. - Returns True if the range is unbounded on the right. + Indicates if the range is inclusive of the end key. + + If the range is unbounded on the right, this will return True. + + Returns: + bool: Whether the range is inclusive of the end key.
""" return not bool(self._pb.end_key_open) def _to_pb(self) -> RowRangePB: - """Converts this object to a protobuf""" + """ + Converts this object to a protobuf + + Returns: + RowRangePB: The protobuf representation of this object + """ return self._pb @classmethod def _from_pb(cls, data: RowRangePB) -> RowRange: - """Creates a RowRange from a protobuf""" + """ + Creates a RowRange from a protobuf + + Args: + data (RowRangePB): The protobuf to convert + Returns: + RowRange: The converted RowRange + """ instance = cls() instance._pb = data return instance @classmethod def _from_dict(cls, data: dict[str, bytes | str]) -> RowRange: - """Creates a RowRange from a protobuf""" + """ + Creates a RowRange from a protobuf + + Args: + data (dict[str, bytes | str]): The dictionary to convert + Returns: + RowRange: The converted RowRange + """ formatted_data = { k: v.encode() if isinstance(v, str) else v for k, v in data.items() } @@ -144,6 +174,9 @@ def __bool__(self) -> bool: """ Empty RowRanges (representing a full table scan) are falsy, because they can be substituted with None. Non-empty RowRanges are truthy. + + Returns: + bool: True if the RowRange is not empty, False otherwise """ return bool( self._pb.start_key_closed @@ -160,7 +193,11 @@ def __eq__(self, other: Any) -> bool: def __str__(self) -> str: """ Represent range as a string, e.g. "[b'a', b'z)" + Unbounded start or end keys are represented as "-inf" or "+inf" + + Returns: + str: The string representation of the range """ left = "[" if self.start_is_inclusive else "(" right = "]" if self.end_is_inclusive else ")" @@ -199,12 +236,12 @@ def __init__( Create a new ReadRowsQuery Args: - - row_keys: row keys to include in the query + row_keys: row keys to include in the query a query can contain multiple keys, but ranges should be preferred - - row_ranges: ranges of rows to include in the query - - limit: the maximum number of rows to return. None or 0 means no limit + row_ranges: ranges of rows to include in the query + limit: the maximum number of rows to return. 
None or 0 means no limit default: None (no limit) - - row_filter: a RowFilter to apply to the query + row_filter: a RowFilter to apply to the query """ if row_keys is None: row_keys = [] @@ -223,14 +260,34 @@ def __init__( @property def row_keys(self) -> list[bytes]: + """ + Return the row keys in this query + + Returns: + list[bytes]: the row keys in this query + """ return list(self._row_set.row_keys) @property def row_ranges(self) -> list[RowRange]: + """ + Return the row ranges in this query + + Returns: + list[RowRange]: the row ranges in this query + """ return [RowRange._from_pb(r) for r in self._row_set.row_ranges] @property def limit(self) -> int | None: + """ + Return the maximum number of rows to return by this query + + None or 0 means no limit + + Returns: + int | None: the maximum number of rows to return by this query + """ return self._limit or None @limit.setter @@ -241,11 +298,9 @@ def limit(self, new_limit: int | None): None or 0 means no limit Args: - - new_limit: the new limit to apply to this query - Returns: - - a reference to this query for chaining + new_limit: the new limit to apply to this query Raises: - - ValueError if new_limit is < 0 + ValueError: if new_limit is < 0 """ if new_limit is not None and new_limit < 0: raise ValueError("limit must be >= 0") @@ -253,6 +308,12 @@ def limit(self, new_limit: int | None): @property def filter(self) -> RowFilter | None: + """ + Return the RowFilter applied to this query + + Returns: + RowFilter | None: the RowFilter applied to this query + """ return self._filter @filter.setter @@ -261,9 +322,7 @@ def filter(self, row_filter: RowFilter | None): Set a RowFilter to apply to this query Args: - - row_filter: a RowFilter to apply to this query - Returns: - - a reference to this query for chaining + row_filter: a RowFilter to apply to this query """ self._filter = row_filter @@ -274,11 +333,9 @@ def add_key(self, row_key: str | bytes): A query can contain multiple keys, but ranges should be preferred Args: - - row_key: a key to add to this query - Returns: - - a reference to this query for chaining + row_key: a key to add to this query Raises: - - ValueError if an input is not a string or bytes + ValueError: if an input is not a string or bytes """ if isinstance(row_key, str): row_key = row_key.encode() @@ -295,7 +352,7 @@ def add_range( Add a range of row keys to this query. Args: - - row_range: a range of row keys to add to this query + row_range: a range of row keys to add to this query """ if row_range not in self.row_ranges: self._row_set.row_ranges.append(row_range._pb) @@ -305,10 +362,12 @@ def shard(self, shard_keys: RowKeySamples) -> ShardedQuery: Split this query into multiple queries that can be evenly distributed across nodes and run in parallel + Args: + shard_keys: a list of row keys that define the boundaries of segments. Returns: - - a ShardedQuery that can be used in sharded_read_rows calls + ShardedQuery: a ShardedQuery that can be used in sharded_read_rows calls Raises: - - AttributeError if the query contains a limit + AttributeError: if the query contains a limit """ if self.limit is not None: raise AttributeError("Cannot shard query with a limit") @@ -357,11 +416,11 @@ def _shard_range( segments of the key-space, determined by split_points Args: - - orig_range: a row range to split - - split_points: a list of row keys that define the boundaries of segments. + orig_range: a row range to split + split_points: a list of row keys that define the boundaries of segments. 
each point represents the inclusive end of a segment Returns: - - a list of tuples, containing a segment index and a new sub-range. + list[tuple[int, RowRange]]: a list of tuples, containing a segment index and a new sub-range. """ # 1. find the index of the segment the start key belongs to if orig_range.start_key is None: @@ -446,6 +505,11 @@ def __eq__(self, other): RowRanges are equal if they have the same row keys, row ranges, filter and limit, or if they both represent a full scan with the same filter and limit + + Args: + other: the object to compare to + Returns: + bool: True if the objects are equal, False otherwise """ if not isinstance(other, ReadRowsQuery): return False diff --git a/google/cloud/bigtable/data/row.py b/google/cloud/bigtable/data/row.py index 13019cbdd..28f0260a9 100644 --- a/google/cloud/bigtable/data/row.py +++ b/google/cloud/bigtable/data/row.py @@ -49,6 +49,10 @@ def __init__( Row objects are not intended to be created by users. They are returned by the Bigtable backend. + + Args: + key (bytes): Row key + cells (list[Cell]): List of cells in the row """ self.row_key = key self.cells: list[Cell] = cells @@ -65,6 +69,9 @@ def _index( Returns an index of cells associated with each family and qualifier. The index is lazily created when needed + + Returns: + OrderedDict: Index of cells """ if self._index_data is None: self._index_data = OrderedDict() @@ -81,6 +88,11 @@ def _from_pb(cls, row_pb: RowPB) -> Row: Row objects are not intended to be created by users. They are returned by the Bigtable backend. + + Args: + row_pb (RowPB): Protobuf representation of the row + Returns: + Row: Row object created from the protobuf representation """ row_key: bytes = row_pb.key cell_list: list[Cell] = [] @@ -112,6 +124,14 @@ def get_cells( Can also be accessed through indexing: cells = row["family", "qualifier"] cells = row["family"] + + Args: + family: family to filter cells by + qualifier: qualifier to filter cells by + Returns: + list[Cell]: List of cells in the row matching the filter + Raises: + ValueError: If family or qualifier is not found in the row """ if family is None: if qualifier is not None: @@ -137,6 +157,13 @@ def get_cells( def _get_all_from_family(self, family: str) -> Generator[Cell, None, None]: """ Returns all cells in the row for the family_id + + Args: + family: family to filter cells by + Yields: + Cell: cells in the row for the family_id + Raises: + ValueError: If family is not found in the row """ if family not in self._index: raise ValueError(f"Family '{family}' not found in row '{self.row_key!r}'") @@ -153,6 +180,9 @@ def __str__(self) -> str: (family='fam', qualifier=b'col'): [b'value', (+1 more),], (family='fam', qualifier=b'col2'): [b'other'], } + + Returns: + str: Human-readable string representation of the row """ output = ["{"] for family, qualifier in self._get_column_components(): @@ -201,6 +231,9 @@ def _to_dict(self) -> dict[str, Any]: def __iter__(self): """ Allow iterating over all cells in the row + + Returns: + Iterator: Iterator over the cells in the row """ return iter(self.cells) @@ -210,6 +243,11 @@ def __contains__(self, item): Works for both cells in the internal list, and `family` or `(family, qualifier)` pairs associated with the cells + + Args: + item: item to check for in the row + Returns: + bool: True if item is in the row, False otherwise """ if isinstance(item, _family_type): return item in self._index @@ -266,7 +304,10 @@ def __getitem__(self, index): def __len__(self): """ - Implements `len()` operator + Returns the 
number of cells in the row + + Returns: + int: Number of cells in the row """ return len(self.cells) @@ -275,12 +316,18 @@ def _get_column_components(self) -> list[tuple[str, bytes]]: Returns a list of (family, qualifier) pairs associated with the cells Pairs can be used for indexing + + Returns: + list[tuple[str, bytes]]: List of (family, qualifier) pairs """ return [(f, q) for f in self._index for q in self._index[f]] def __eq__(self, other): """ Implements `==` operator + + Returns: + bool: True if rows are equal, False otherwise """ # for performance reasons, check row metadata # before checking individual cells @@ -307,6 +354,9 @@ def __eq__(self, other): def __ne__(self, other) -> bool: """ Implements `!=` operator + + Returns: + bool: True if rows are not equal, False otherwise """ return not self == other @@ -319,6 +369,14 @@ class Cell: Does not represent all data contained in the cell, only data returned by a query. Expected to be read-only to users, and written by backend + + Args: + value: the byte string value of the cell + row_key: the row key of the cell + family: the family associated with the cell + qualifier: the column qualifier associated with the cell + timestamp_micros: the timestamp of the cell in microseconds + labels: the list of labels associated with the cell """ __slots__ = ( @@ -339,12 +397,8 @@ def __init__( timestamp_micros: int, labels: list[str] | None = None, ): - """ - Cell constructor - - Cell objects are not intended to be constructed by users. - They are returned by the Bigtable backend. - """ + # Cell objects are not intended to be constructed by users. + # They are returned by the Bigtable backend. self.value = value self.row_key = row_key self.family = family @@ -359,6 +413,9 @@ def __int__(self) -> int: Allows casting cell to int Interprets value as a 64-bit big-endian signed integer, as expected by ReadModifyWrite increment rule + + Returns: + int: Value of the cell as a 64-bit big-endian signed integer """ return int.from_bytes(self.value, byteorder="big", signed=True) @@ -368,6 +425,9 @@ def _to_dict(self) -> dict[str, Any]: proto format https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#cell + + Returns: + dict: Dictionary representation of the cell """ cell_dict: dict[str, Any] = { "value": self.value, @@ -381,12 +441,18 @@ def __str__(self) -> str: """ Allows casting cell to str Prints encoded byte string, same as printing value directly. 
+ + Returns: + str: Encoded byte string of the value """ return str(self.value) def __repr__(self): """ Returns a string representation of the cell + + Returns: + str: String representation of the cell """ return f"Cell(value={self.value!r}, row_key={self.row_key!r}, family='{self.family}', qualifier={self.qualifier!r}, timestamp_micros={self.timestamp_micros}, labels={self.labels})" @@ -395,9 +461,16 @@ def __repr__(self): def __lt__(self, other) -> bool: """ Implements `<` operator + + Args: + other: Cell to compare with + Returns: + bool: True if this cell is less than the other cell, False otherwise + Raises: + NotImplementedError: If other is not a Cell """ if not isinstance(other, Cell): - return NotImplemented + raise NotImplementedError this_ordering = ( self.family, self.qualifier, @@ -417,9 +490,14 @@ def __lt__(self, other) -> bool: def __eq__(self, other) -> bool: """ Implements `==` operator + + Args: + other: Cell to compare with + Returns: + bool: True if cells are equal, False otherwise """ if not isinstance(other, Cell): - return NotImplemented + return False return ( self.row_key == other.row_key and self.family == other.family @@ -433,12 +511,20 @@ def __eq__(self, other) -> bool: def __ne__(self, other) -> bool: """ Implements `!=` operator + + Args: + other: Cell to compare with + Returns: + bool: True if cells are not equal, False otherwise """ return not self == other def __hash__(self): """ Implements `hash()` function to fingerprint cell + + Returns: + int: hash value of the cell """ return hash( ( diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index a0019947d..7593572d8 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1498,7 +1498,6 @@ async def test_read_rows_timeout(self, operation_timeout): "per_request_t, operation_t, expected_num", [ (0.05, 0.08, 2), - (0.05, 0.54, 11), (0.05, 0.14, 3), (0.05, 0.24, 5), ],
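
A minimal usage sketch of the RowMutationEntry surface documented above. This is illustrative only and not part of the patch; it assumes the public async data client (BigtableDataClientAsync, Table.bulk_mutate_rows), and the project, instance, table, and column family names are placeholders.

import asyncio

from google.cloud.bigtable.data import BigtableDataClientAsync, RowMutationEntry, SetCell
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup


async def write_batch() -> None:
    # placeholder project / instance / table ids
    client = BigtableDataClientAsync(project="my-project")
    table = client.get_table("my-instance", "my-table")
    try:
        # each entry groups the mutations applied atomically to a single row
        entries = [
            RowMutationEntry(
                row_key=f"user#{i:04d}".encode(),
                mutations=[SetCell("profile", "name", f"user-{i}")],
            )
            for i in range(3)
        ]
        # entries that ultimately fail surface as a MutationsExceptionGroup,
        # organized by the failed entry
        await table.bulk_mutate_rows(entries, operation_timeout=60)
    except MutationsExceptionGroup as group:
        for exc in group.exceptions:
            print("failed entry:", exc)
    finally:
        await client.close()


asyncio.run(write_batch())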
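
In the same spirit, a sketch of the IncrementRule / AppendValueRule classes documented above, assuming Table.read_modify_write_row from the async data client; the table handle, family, and qualifiers are placeholders.

from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule, IncrementRule


async def bump_view_counter(table) -> int:
    rules = [
        # increment_amount must fit in a 64-bit signed integer
        IncrementRule("stats", "views", increment_amount=1),
        AppendValueRule("stats", "history", append_value=b"|viewed"),
    ]
    # the server applies the rules atomically and returns the updated row
    row = await table.read_modify_write_row(b"user#0001", rules)
    counter_cell = row.get_cells(family="stats", qualifier="views")[0]
    # Cell.__int__ decodes the value as a 64-bit big-endian signed integer
    return int(counter_cell)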
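
Finally, a sketch of the ReadRowsQuery / RowRange query surface and the Row / Cell accessors documented above, again assuming the async data client; the keys, family, and table handle are placeholders.

from google.cloud.bigtable.data import ReadRowsQuery, RowRange


async def scan_users(table) -> None:
    query = ReadRowsQuery(
        row_ranges=[RowRange(start_key=b"user#0000", end_key=b"user#9999")],
        limit=10,
    )
    # individual keys can be mixed with ranges, though ranges are preferred
    query.add_key(b"admin#root")
    rows = await table.read_rows(query, operation_timeout=30)
    for row in rows:
        # Row is iterable over its cells; cells can also be looked up by
        # family / qualifier via row.get_cells(...) or row["family", "qualifier"]
        for cell in row:
            print(row.row_key, cell.family, cell.qualifier, cell.value)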