chore(docs): improve docstrings in async classes #964

Merged: 17 commits, May 29, 2024
29 changes: 14 additions & 15 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -56,6 +56,14 @@ class _MutateRowsOperationAsync:

Errors are exposed as a MutationsExceptionGroup, which contains a list of
exceptions organized by the related failed mutation entries.

Args:
gapic_client: the client to use for the mutate_rows call
table: the table associated with the request
mutation_entries: a list of RowMutationEntry objects to send to the server
operation_timeout: the timeout to use for the entire operation, in seconds.
attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
"""

def __init__(
@@ -67,15 +75,6 @@ def __init__(
attempt_timeout: float | None,
retryable_exceptions: Sequence[type[Exception]] = (),
):
"""
Args:
- gapic_client: the client to use for the mutate_rows call
- table: the table associated with the request
- mutation_entries: a list of RowMutationEntry objects to send to the server
- operation_timeout: the timeout to use for the entire operation, in seconds.
- attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
"""
# check that mutations are within limits
total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
@@ -121,7 +120,7 @@ async def start(self):
Start the operation, and run until completion

Raises:
- MutationsExceptionGroup: if any mutations failed
MutationsExceptionGroup: if any mutations failed
"""
try:
# trigger mutate_rows
@@ -157,9 +156,9 @@ async def _run_attempt(self):
Run a single attempt of the mutate_rows rpc.

Raises:
- _MutateRowsIncomplete: if there are failed mutations eligible for
retry after the attempt is complete
- GoogleAPICallError: if the gapic rpc fails
_MutateRowsIncomplete: if there are failed mutations eligible for
retry after the attempt is complete
GoogleAPICallError: if the gapic rpc fails
"""
request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
# track mutations in this request that have not been finalized yet
@@ -213,8 +212,8 @@ def _handle_entry_error(self, idx: int, exc: Exception):
retryable.

Args:
- idx: the index of the mutation that failed
- exc: the exception to add to the list
idx: the index of the mutation that failed
exc: the exception to add to the list
"""
entry = self.mutations[idx].entry
self.errors.setdefault(idx, []).append(exc)
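With the Args now documented on the class itself, the constructor-to-`start()` flow is easier to follow. A minimal usage sketch under stated assumptions: the mutation and exception imports are the package's public helpers, `gapic_client` and `table` are placeholders for the application's existing async data client setup, and in application code this operation is typically reached through the table's higher-level bulk-mutation method rather than built by hand.

```python
# Sketch only: write_entries is a hypothetical helper, not part of this PR.
from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync


async def write_entries(gapic_client, table):
    entries = [
        RowMutationEntry(b"row-1", [SetCell("family", "qualifier", b"value-1")]),
        RowMutationEntry(b"row-2", [SetCell("family", "qualifier", b"value-2")]),
    ]
    operation = _MutateRowsOperationAsync(
        gapic_client=gapic_client,
        table=table,
        mutation_entries=entries,
        operation_timeout=60,  # total budget for the whole operation, in seconds
        attempt_timeout=20,    # per-attempt budget; None means run until operation_timeout
    )
    try:
        await operation.start()
    except MutationsExceptionGroup as group:
        # each sub-exception corresponds to a failed mutation entry
        for exc in group.exceptions:
            print("failed entry:", exc)
```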
33 changes: 29 additions & 4 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -59,6 +59,13 @@ class _ReadRowsOperationAsync:

ReadRowsOperation(request, client) handles row merging logic end-to-end, including
performing retries on stream errors.

Args:
query: The query to execute
table: The table to send the request to
operation_timeout: The total time to allow for the operation, in seconds
attempt_timeout: The time to allow for each individual attempt, in seconds
retryable_exceptions: A list of exceptions that should trigger a retry
"""

__slots__ = (
@@ -104,6 +111,9 @@ def __init__(
def start_operation(self) -> AsyncGenerator[Row, None]:
"""
Start the read_rows operation, retrying on retryable errors.

Yields:
Row: The next row in the stream
"""
return retries.retry_target_stream_async(
self._read_rows_attempt,
@@ -119,6 +129,9 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
This function is intended to be wrapped by retry logic,
which will call this function until it succeeds or
a non-retryable error is raised.

Yields:
Row: The next row in the stream
"""
# revise request keys and ranges between attempts
if self._last_yielded_row_key is not None:
@@ -151,6 +164,11 @@ async def chunk_stream(
) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]:
"""
process chunks out of raw read_rows stream

Args:
stream: the raw read_rows stream from the gapic client
Yields:
ReadRowsResponsePB.CellChunk: the next chunk in the stream
"""
async for resp in await stream:
# extract proto from proto-plus wrapper
@@ -195,9 +213,14 @@ async def chunk_stream(
@staticmethod
async def merge_rows(
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None
):
) -> AsyncGenerator[Row, None]:
"""
Merge chunks into rows

Args:
chunks: the chunk stream to merge
Yields:
Row: the next row in the stream
"""
if chunks is None:
return
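For orientation, `chunk_stream` and `merge_rows` form a two-stage pipeline: the first normalizes raw gapic responses into `CellChunk`s, the second assembles those chunks into `Row`s. A sketch of how they compose; the `operation` and `gapic_stream` names are placeholders for illustration, not part of this diff.

```python
# Sketch of the chunk-to-row pipeline documented above.
async def attempt_sketch(operation, gapic_stream):
    # gapic_stream: the awaitable stream returned by the gapic read_rows call
    chunks = operation.chunk_stream(gapic_stream)   # ReadRowsResponse -> CellChunk
    async for row in operation.merge_rows(chunks):  # CellChunk -> Row
        yield row
```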
@@ -311,10 +334,12 @@ def _revise_request_rowset(
Revise the rows in the request to avoid ones we've already processed.

Args:
- row_set: the row set from the request
- last_seen_row_key: the last row key encountered
row_set: the row set from the request
last_seen_row_key: the last row key encountered
Returns:
RowSetPB: the new rowset after adjusting for the last seen key
Raises:
- _RowSetComplete: if there are no rows left to process after the revision
_RowSetComplete: if there are no rows left to process after the revision
"""
# if user is doing a whole table scan, start a new one with the last seen key
if row_set is None or (not row_set.row_ranges and row_set.row_keys is not None):
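Similarly, the relocated `_ReadRowsOperationAsync` docstring now covers the full constructor-to-stream flow. A minimal consumption sketch, assuming the package's public `ReadRowsQuery` helper and a `table` obtained from the application's async data client; in application code this path is normally reached through the table's read_rows methods rather than by constructing the operation directly.

```python
# Sketch only: scan_rows is a hypothetical helper, not part of this PR.
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync


async def scan_rows(table):
    query = ReadRowsQuery(limit=10)
    operation = _ReadRowsOperationAsync(
        query=query,
        table=table,
        operation_timeout=60,     # total budget in seconds
        attempt_timeout=20,       # per-attempt budget in seconds
        retryable_exceptions=(),  # a real caller passes the retryable error types here
    )
    # start_operation wraps _read_rows_attempt in retry logic and yields merged rows
    async for row in operation.start_operation():
        print(row.row_key)
```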