Skip to content

Commit

Permalink
feat: added instrumentation to all rpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Jan 26, 2024
1 parent 2c57f22 commit 2955411
Show file tree
Hide file tree
Showing 16 changed files with 1,172 additions and 592 deletions.
67 changes: 47 additions & 20 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import backoff_generator

# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
Expand All @@ -35,6 +36,7 @@
)
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._async.client import TableAsync
from google.cloud.bigtable.data._metrics import ActiveOperationMetric


@dataclass
Expand Down Expand Up @@ -65,6 +67,7 @@ def __init__(
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
attempt_timeout: float | None,
metrics: ActiveOperationMetric,
retryable_exceptions: Sequence[type[Exception]] = (),
):
"""
Expand All @@ -75,6 +78,8 @@ def __init__(
- operation_timeout: the timeout to use for the entire operation, in seconds.
- attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
- metrics: the metrics object to use for tracking the operation
- retryable_exceptions: a list of exceptions that should be retried
"""
# check that mutations are within limits
total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
Expand All @@ -100,7 +105,7 @@ def __init__(
# Entry level errors
bt_exceptions._MutateRowsIncomplete,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
sleep_generator = backoff_generator(0.01, 2, 60)
self._operation = retries.retry_target_async(
self._run_attempt,
self.is_retryable,
Expand All @@ -115,6 +120,9 @@ def __init__(
self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries]
self.remaining_indices = list(range(len(self.mutations)))
self.errors: dict[int, list[Exception]] = {}
# set up metrics
metrics.backoff_generator = sleep_generator
self._operation_metrics = metrics

async def start(self):
"""
Expand Down Expand Up @@ -148,9 +156,13 @@ async def start(self):
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
)
if all_errors:
raise bt_exceptions.MutationsExceptionGroup(
combined_exc = bt_exceptions.MutationsExceptionGroup(
all_errors, len(self.mutations)
)
self._operation_metrics.end_with_status(combined_exc)
raise combined_exc
else:
self._operation_metrics.end_with_success()

async def _run_attempt(self):
"""
Expand All @@ -161,6 +173,8 @@ async def _run_attempt(self):
retry after the attempt is complete
- GoogleAPICallError: if the gapic rpc fails
"""
# register attempt start
self._operation_metrics.start_attempt()
request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
# track mutations in this request that have not been finalized yet
active_request_indices = {
Expand All @@ -177,34 +191,47 @@ async def _run_attempt(self):
entries=request_entries,
retry=None,
)
async for result_list in result_generator:
for result in result_list.entries:
# convert sub-request index to global index
orig_idx = active_request_indices[result.index]
entry_error = core_exceptions.from_grpc_status(
result.status.code,
result.status.message,
details=result.status.details,
)
if result.status.code != 0:
# mutation failed; update error list (and remaining_indices if retryable)
self._handle_entry_error(orig_idx, entry_error)
elif orig_idx in self.errors:
# mutation succeeded; remove from error list
del self.errors[orig_idx]
# remove processed entry from active list
del active_request_indices[result.index]
try:
async for result_list in result_generator:
for result in result_list.entries:
# convert sub-request index to global index
orig_idx = active_request_indices[result.index]
entry_error = core_exceptions.from_grpc_status(
result.status.code,
result.status.message,
details=result.status.details,
)
if result.status.code != 0:
# mutation failed; update error list (and remaining_indices if retryable)
self._handle_entry_error(orig_idx, entry_error)
elif orig_idx in self.errors:
# mutation succeeded; remove from error list
del self.errors[orig_idx]
# remove processed entry from active list
del active_request_indices[result.index]
finally:
# send trailing metadata to metrics
result_generator.cancel()
metadata = (
await result_generator.trailing_metadata()
+ await result_generator.initial_metadata()
)
self._operation_metrics.add_response_metadata(metadata)
except Exception as exc:
# add this exception to list for each mutation that wasn't
# already handled, and update remaining_indices if mutation is retryable
for idx in active_request_indices.values():
self._handle_entry_error(idx, exc)
# record attempt failure metric
self._operation_metrics.end_attempt_with_status(exc)
# bubble up exception to be handled by retry wrapper
raise
# check if attempt succeeded, or needs to be retried
if self.remaining_indices:
# unfinished work; raise exception to trigger retry
raise bt_exceptions._MutateRowsIncomplete
last_exc = self.errors[self.remaining_indices[-1]][-1]
self._operation_metrics.end_attempt_with_status(last_exc)
raise bt_exceptions._MutateRowsIncomplete()

def _handle_entry_error(self, idx: int, exc: Exception):
"""
Expand Down
123 changes: 79 additions & 44 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
from typing import (
TYPE_CHECKING,
AsyncGenerator,
AsyncIterable,
Awaitable,
Sequence,
)
import time

from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
Expand All @@ -34,13 +34,16 @@
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import backoff_generator

from google.api_core.grpc_helpers_async import GrpcAsyncStream
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
from google.cloud.bigtable.data._metrics import ActiveOperationMetric


class _ResetRow(Exception):
Expand Down Expand Up @@ -70,6 +73,7 @@ class _ReadRowsOperationAsync:
"_metadata",
"_last_yielded_row_key",
"_remaining_count",
"_operation_metrics",
)

def __init__(
Expand All @@ -78,6 +82,7 @@ def __init__(
table: "TableAsync",
operation_timeout: float,
attempt_timeout: float,
metrics: ActiveOperationMetric,
retryable_exceptions: Sequence[type[Exception]] = (),
):
self.attempt_timeout_gen = _attempt_timeout_generator(
Expand All @@ -100,15 +105,21 @@ def __init__(
)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None
self._operation_metrics = metrics

def start_operation(self) -> AsyncGenerator[Row, None]:
"""
Start the read_rows operation, retrying on retryable errors.
"""
self._operation_metrics.start()

sleep_generator = backoff_generator()
self._operation_metrics.backoff_generator = sleep_generator

return retries.retry_target_stream_async(
self._read_rows_attempt,
self._predicate,
exponential_sleep_generator(0.01, 60, multiplier=2),
self._operation_metrics.build_wrapped_predicate(self._predicate),
sleep_generator,
self.operation_timeout,
exception_factory=_retry_exception_factory,
)
Expand All @@ -120,6 +131,8 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
which will call this function until it succeeds or
a non-retryable error is raised.
"""
# register metric start
self._operation_metrics.start_attempt()
# revise request keys and ranges between attempts
if self._last_yielded_row_key is not None:
# if this is a retry, try to trim down the request to avoid ones we've already processed
Expand All @@ -130,12 +143,12 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
)
except _RowSetComplete:
# if we've already seen all the rows, we're done
return self.merge_rows(None)
return self.merge_rows(None, self._operation_metrics)
# revise the limit based on number of rows already yielded
if self._remaining_count is not None:
self.request.rows_limit = self._remaining_count
if self._remaining_count == 0:
return self.merge_rows(None)
return self.merge_rows(None, self._operation_metrics)
# create and return a new row merger
gapic_stream = self.table.client._gapic_client.read_rows(
self.request,
Expand All @@ -144,70 +157,82 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
retry=None,
)
chunked_stream = self.chunk_stream(gapic_stream)
return self.merge_rows(chunked_stream)
return self.merge_rows(chunked_stream, self._operation_metrics)

async def chunk_stream(
self, stream: Awaitable[AsyncIterable[ReadRowsResponsePB]]
self, stream: Awaitable[GrpcAsyncStream[ReadRowsResponsePB]]
) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]:
"""
process chunks out of raw read_rows stream
"""
async for resp in await stream:
# extract proto from proto-plus wrapper
resp = resp._pb
call = await stream
try:
async for resp in call:
# extract proto from proto-plus wrapper
resp = resp._pb

# handle last_scanned_row_key packets, sent when server
# has scanned past the end of the row range
if resp.last_scanned_row_key:
if (
self._last_yielded_row_key is not None
and resp.last_scanned_row_key <= self._last_yielded_row_key
):
raise InvalidChunk("last scanned out of order")
self._last_yielded_row_key = resp.last_scanned_row_key

# handle last_scanned_row_key packets, sent when server
# has scanned past the end of the row range
if resp.last_scanned_row_key:
if (
self._last_yielded_row_key is not None
and resp.last_scanned_row_key <= self._last_yielded_row_key
):
raise InvalidChunk("last scanned out of order")
self._last_yielded_row_key = resp.last_scanned_row_key

current_key = None
# process each chunk in the response
for c in resp.chunks:
if current_key is None:
current_key = c.row_key
current_key = None
# process each chunk in the response
for c in resp.chunks:
if current_key is None:
raise InvalidChunk("first chunk is missing a row key")
elif (
self._last_yielded_row_key
and current_key <= self._last_yielded_row_key
):
raise InvalidChunk("row keys should be strictly increasing")
current_key = c.row_key
if current_key is None:
raise InvalidChunk("first chunk is missing a row key")
elif (
self._last_yielded_row_key
and current_key <= self._last_yielded_row_key
):
raise InvalidChunk("row keys should be strictly increasing")

yield c
yield c

if c.reset_row:
current_key = None
elif c.commit_row:
# update row state after each commit
self._last_yielded_row_key = current_key
if self._remaining_count is not None:
self._remaining_count -= 1
if self._remaining_count < 0:
raise InvalidChunk("emit count exceeds row limit")
current_key = None
if c.reset_row:
current_key = None
elif c.commit_row:
# update row state after each commit
self._last_yielded_row_key = current_key
if self._remaining_count is not None:
self._remaining_count -= 1
if self._remaining_count < 0:
raise InvalidChunk("emit count exceeds row limit")
current_key = None
finally:
# ensure stream is closed
call.cancel()
# send trailing metadata to metrics
metadata = await call.trailing_metadata() + await call.initial_metadata()
self._operation_metrics.add_response_metadata(metadata)

@staticmethod
async def merge_rows(
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None,
operation: ActiveOperationMetric,
):
"""
Merge chunks into rows
"""
if chunks is None:
operation.end_with_success()
return
it = chunks.__aiter__()
is_first_row = True
# For each row
while True:
try:
c = await it.__anext__()
except StopAsyncIteration:
# stream complete
operation.end_with_success()
return
row_key = c.row_key

Expand Down Expand Up @@ -284,7 +309,17 @@ async def merge_rows(
Cell(value, row_key, family, qualifier, ts, list(labels))
)
if c.commit_row:
if is_first_row:
# record first row latency in metrics
is_first_row = False
operation.attempt_first_response()
block_time = time.monotonic()
yield Row(row_key, cells)
# most metric operations use setters, but this one updates
# the value directly to avoid extra overhead
operation.active_attempt.application_blocking_time += ( # type: ignore
time.monotonic() - block_time
)
break
c = await it.__anext__()
except _ResetRow as e:
Expand Down
Loading

0 comments on commit 2955411

Please sign in to comment.