
Commit

Merge branch 'client_side_metrics_data_model' into client_side_metrics_handlers
daniel-sanche committed Feb 1, 2024
2 parents a15a1bc + 24432c8 commit b38b1b5
Showing 7 changed files with 299 additions and 178 deletions.
20 changes: 20 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
@@ -23,6 +23,7 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery

from google.api_core import exceptions as core_exceptions
from google.api_core.retry import exponential_sleep_generator
from google.api_core.retry import RetryFailureReason
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup

@@ -97,6 +98,25 @@ def _attempt_timeout_generator(
yield max(0, min(per_request_timeout, deadline - time.monotonic()))


def backoff_generator(initial=0.01, multiplier=2, maximum=60):
"""
Build a generator for exponential backoff sleep times.
    This implementation builds on top of api_core.retry.exponential_sleep_generator,
adding the ability to retrieve previous values using the send(idx) method. This is
used by the Metrics class to track the sleep times used for each attempt.
"""
history = []
    subgenerator = exponential_sleep_generator(initial, maximum=maximum, multiplier=multiplier)
while True:
next_backoff = next(subgenerator)
history.append(next_backoff)
sent_idx = yield next_backoff
while sent_idx is not None:
# requesting from history
sent_idx = yield history[sent_idx]
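
For intuition, a minimal usage sketch of the send(idx) protocol (a hypothetical snippet; exponential_sleep_generator may apply jitter, so exact values vary):

    gen = backoff_generator(initial=0.01, multiplier=2, maximum=60)
    first = next(gen)       # advance: generates and records history[0]
    second = next(gen)      # advance: generates and records history[1]
    replayed = gen.send(0)  # replay: returns history[0] without advancing
    assert replayed == first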


def _retry_exception_factory(
exc_list: list[Exception],
reason: RetryFailureReason,
145 changes: 82 additions & 63 deletions google/cloud/bigtable/data/_metrics/data_model.py
@@ -28,31 +28,38 @@
from grpc import StatusCode

import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable_v2.types.response_params import ResponseParams
from google.protobuf.message import DecodeError

if TYPE_CHECKING:
from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler


# by default, exceptions in the metrics system are logged,
# but enabling this flag causes them to be raised instead
ALLOW_METRIC_EXCEPTIONS = os.getenv("BIGTABLE_METRICS_EXCEPTIONS", False)
LOGGER = (
logging.getLogger(__name__) if os.getenv("BIGTABLE_METRICS_LOGS", False) else None
)
LOGGER = logging.getLogger(__name__)

# default values for zone and cluster data, if not captured
DEFAULT_ZONE = "global"
DEFAULT_CLUSTER_ID = "unspecified"

# keys for parsing metadata blobs
BIGTABLE_METADATA_KEY = "x-goog-ext-425905942-bin"
SERVER_TIMING_METADATA_KEY = "server-timing"

SERVER_TIMING_REGEX = re.compile(r"gfet4t7; dur=(\d+)")

INVALID_STATE_ERROR = "Invalid state for {}: {}"


# create a named tuple that holds the clock time, and a more accurate monotonic timestamp
# this allows us to be resilient to clock changes, e.g. DST
@dataclass(frozen=True)
class TimeTuple:
"""
Tuple that holds both the utc timestamp to record with the metrics, and a
monotonic timestamp for calculating durations. The monotonic timestamp is
    preferred for calculations because it is resilient to clock changes, e.g. DST
"""

utc: datetime.datetime = field(
default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
)
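
A brief sketch of the intended usage (a hypothetical snippet; the monotonic field sits in the collapsed hunk above, but its use at call sites below implies a time.monotonic() counterpart to utc):

    import time

    t = TimeTuple()  # captures wall-clock and monotonic time together
    # ... attempt runs ...
    elapsed_ms = (time.monotonic() - t.monotonic) * 1000  # durations come from the monotonic clock
    record_timestamp = t.utc  # metric records use the wall-clock timestamp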
@@ -82,60 +89,67 @@ class OperationState(Enum):
@dataclass(frozen=True)
class CompletedAttemptMetric:
"""
A dataclass representing the data associated with a completed rpc attempt.
An immutable dataclass representing the data associated with a
completed rpc attempt.
"""

start_time: datetime.datetime
duration: float
duration_ms: float
end_status: StatusCode
first_response_latency: float | None = None
gfe_latency: float | None = None
application_blocking_time: float = 0.0
backoff_before_attempt: float = 0.0
grpc_throttling_time: float = 0.0
first_response_latency_ms: float | None = None
gfe_latency_ms: float | None = None
application_blocking_time_ms: float = 0.0
backoff_before_attempt_ms: float = 0.0
grpc_throttling_time_ms: float = 0.0


@dataclass(frozen=True)
class CompletedOperationMetric:
"""
A dataclass representing the data associated with a completed rpc operation.
An immutable dataclass representing the data associated with a
completed rpc operation.
"""

op_type: OperationType
start_time: datetime.datetime
duration: float
duration_ms: float
completed_attempts: list[CompletedAttemptMetric]
final_status: StatusCode
cluster_id: str
zone: str
is_streaming: bool
flow_throttling_time: float = 0.0
flow_throttling_time_ms: float = 0.0


@dataclass
class ActiveAttemptMetric:
"""
A dataclass representing the data associated with an rpc attempt that is
currently in progress. Fields are mutable and may be optional.
"""

# keep both clock time and monotonic timestamps for active attempts
start_time: TimeTuple = field(default_factory=TimeTuple)
    # the time it takes to receive the first response from the server
    # the time it takes to receive the first response from the server, in milliseconds
# currently only tracked for ReadRows
first_response_latency: float | None = None
# the time taken by the backend. Taken from response header
gfe_latency: float | None = None
# time waiting on user to process the response
first_response_latency_ms: float | None = None
# the time taken by the backend, in milliseconds. Taken from response header
gfe_latency_ms: float | None = None
# time waiting on user to process the response, in milliseconds
# currently only relevant for ReadRows
application_blocking_time: float = 0.0
# backoff time is added to application_blocking_time
backoff_before_attempt: float = 0.0
# time waiting on grpc channel
application_blocking_time_ms: float = 0.0
# backoff time is added to application_blocking_time_ms
backoff_before_attempt_ms: float = 0.0
# time waiting on grpc channel, in milliseconds
# TODO: capture grpc_throttling_time
grpc_throttling_time: float = 0.0
grpc_throttling_time_ms: float = 0.0


@dataclass
class ActiveOperationMetric:
"""
A dataclass representing the data associated with an rpc operation that is
currently in progress.
currently in progress. Fields are mutable and may be optional.
"""

op_type: OperationType
@@ -149,8 +163,8 @@ class ActiveOperationMetric:
is_streaming: bool = False # only True for read_rows operations
was_completed: bool = False
handlers: list[MetricsHandler] = field(default_factory=list)
# time waiting on flow control
flow_throttling_time: float = 0.0
# time waiting on flow control, in milliseconds
flow_throttling_time_ms: float = 0.0

@property
def state(self) -> OperationState:
@@ -194,11 +208,14 @@ def start_attempt(self) -> None:
# find backoff value
if self.backoff_generator and len(self.completed_attempts) > 0:
# find the attempt's backoff by sending attempt number to generator
backoff = self.backoff_generator.send(len(self.completed_attempts) - 1)
# generator will return the backoff time in seconds, so convert to ms
backoff_ms = (
self.backoff_generator.send(len(self.completed_attempts) - 1) * 1000
)
else:
backoff = 0
backoff_ms = 0

self.active_attempt = ActiveAttemptMetric(backoff_before_attempt=backoff)
self.active_attempt = ActiveAttemptMetric(backoff_before_attempt_ms=backoff_ms)

def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
"""
@@ -217,12 +234,16 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
INVALID_STATE_ERROR.format("add_response_metadata", self.state)
)
if self.cluster_id is None or self.zone is None:
# BIGTABLE_METADATA_KEY should give a binary string with cluster_id and zone
# BIGTABLE_METADATA_KEY should give a binary-encoded ResponseParams proto
blob = cast(bytes, metadata.get(BIGTABLE_METADATA_KEY))
if blob:
parse_result = self._parse_response_metadata_blob(blob)
if parse_result is not None:
self.zone, self.cluster_id = parse_result
cluster, zone = parse_result
if cluster:
self.cluster_id = cluster
if zone:
self.zone = zone
else:
self._handle_error(
f"Failed to decode {BIGTABLE_METADATA_KEY} metadata: {blob!r}"
@@ -232,31 +253,25 @@ def add_response_metadata(self, metadata: dict[str, bytes | str]) -> None:
if timing_header:
timing_data = SERVER_TIMING_REGEX.match(timing_header)
if timing_data and self.active_attempt:
# convert from milliseconds to seconds
self.active_attempt.gfe_latency = float(timing_data.group(1)) / 1000
self.active_attempt.gfe_latency_ms = float(timing_data.group(1))
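
For reference, a sketch of the header shape SERVER_TIMING_REGEX expects (duration value illustrative; per the change above it is already in milliseconds):

    match = SERVER_TIMING_REGEX.match("gfet4t7; dur=37")
    if match:
        gfe_latency_ms = float(match.group(1))  # 37.0; no unit conversion needed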

@staticmethod
@lru_cache(maxsize=32)
def _parse_response_metadata_blob(blob: bytes) -> Tuple[str, str] | None:
"""
Parse the response metadata blob and return a dictionary of key-value pairs.
Parse the response metadata blob and return a tuple of cluster and zone.
Function is cached to avoid parsing the same blob multiple times.
Args:
- blob: the metadata blob as extracted from the grpc call
Returns:
- a tuple of zone and cluster_id, or None if parsing failed
- a tuple of cluster_id and zone, or None if parsing failed
"""
try:
decoded = "".join(
c if c.isprintable() else " " for c in blob.decode("utf-8")
)
split_data = decoded.split()
zone = split_data[0]
cluster_id = split_data[1]
return zone, cluster_id
except (AttributeError, IndexError):
proto = ResponseParams.pb().FromString(blob)
return proto.cluster_id, proto.zone_id
except (DecodeError, TypeError):
# failed to parse metadata
return None
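
A minimal round-trip sketch of the new proto-based parsing (assumes ResponseParams is a proto-plus message with the cluster_id and zone_id fields used above):

    params = ResponseParams(cluster_id="my-cluster", zone_id="us-central1-b")
    blob = ResponseParams.serialize(params)  # wire bytes, as received in response metadata
    assert ActiveOperationMetric._parse_response_metadata_blob(blob) == ("my-cluster", "us-central1-b")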

@@ -273,11 +288,12 @@ def attempt_first_response(self) -> None:
return self._handle_error(
INVALID_STATE_ERROR.format("attempt_first_response", self.state)
)
if self.active_attempt.first_response_latency is not None:
if self.active_attempt.first_response_latency_ms is not None:
return self._handle_error("Attempt already received first response")
self.active_attempt.first_response_latency = (
# convert duration to milliseconds
self.active_attempt.first_response_latency_ms = (
time.monotonic() - self.active_attempt.start_time.monotonic
)
) * 1000

def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
"""
@@ -293,18 +309,18 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
return self._handle_error(
INVALID_STATE_ERROR.format("end_attempt_with_status", self.state)
)

if isinstance(status, Exception):
status = self._exc_to_status(status)
duration_seconds = time.monotonic() - self.active_attempt.start_time.monotonic
new_attempt = CompletedAttemptMetric(
start_time=self.active_attempt.start_time.utc,
first_response_latency=self.active_attempt.first_response_latency,
duration=time.monotonic() - self.active_attempt.start_time.monotonic,
end_status=self._exc_to_status(status)
if isinstance(status, Exception)
else status,
gfe_latency=self.active_attempt.gfe_latency,
application_blocking_time=self.active_attempt.application_blocking_time,
backoff_before_attempt=self.active_attempt.backoff_before_attempt,
grpc_throttling_time=self.active_attempt.grpc_throttling_time,
first_response_latency_ms=self.active_attempt.first_response_latency_ms,
duration_ms=duration_seconds * 1000,
end_status=status,
gfe_latency_ms=self.active_attempt.gfe_latency_ms,
application_blocking_time_ms=self.active_attempt.application_blocking_time_ms,
backoff_before_attempt_ms=self.active_attempt.backoff_before_attempt_ms,
grpc_throttling_time_ms=self.active_attempt.grpc_throttling_time_ms,
)
self.completed_attempts.append(new_attempt)
self.active_attempt = None
@@ -338,12 +354,12 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
op_type=self.op_type,
start_time=self.start_time.utc,
completed_attempts=self.completed_attempts,
duration=time.monotonic() - self.start_time.monotonic,
duration_ms=(time.monotonic() - self.start_time.monotonic) * 1000,
final_status=final_status,
cluster_id=self.cluster_id or DEFAULT_CLUSTER_ID,
zone=self.zone or DEFAULT_ZONE,
is_streaming=self.is_streaming,
flow_throttling_time=self.flow_throttling_time,
flow_throttling_time_ms=self.flow_throttling_time_ms,
)
for handler in self.handlers:
handler.on_operation_complete(finalized)
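
As an illustration of the handler interface invoked here, a hypothetical MetricsHandler subclass (method signature assumed from the call above):

    class PrintingHandler(MetricsHandler):
        def on_operation_complete(self, op: CompletedOperationMetric) -> None:
            print(f"{op.op_type}: {op.duration_ms:.1f}ms over {len(op.completed_attempts)} attempt(s)")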
@@ -417,12 +433,15 @@ def _handle_error(message: str) -> None:
full_message = f"Error in Bigtable Metrics: {message}"
if ALLOW_METRIC_EXCEPTIONS:
raise ValueError(full_message)
if LOGGER:
LOGGER.warning(full_message)
LOGGER.warning(full_message)

async def __aenter__(self):
"""
Implements the async context manager protocol for wrapping unary calls
        Using the operation's context manager ensures that the operation
        is always closed when complete, with the proper status code automatically
        detected when an exception is raised.
"""
return self._AsyncContextManager(self)
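
A hedged usage sketch (operation construction elided; assumes __aexit__ finalizes the operation as described above):

    async def run_unary(op: ActiveOperationMetric):
        async with op:  # on exit, the operation ends with OK or the status mapped from a raised exception
            op.start_attempt()
            ...  # make the rpc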

18 changes: 9 additions & 9 deletions google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py
@@ -175,7 +175,7 @@ def on_operation_complete(self, op: CompletedOperationMetric) -> None:
is_streaming = str(op.is_streaming)

self.otel.operation_latencies.record(
op.duration, {"streaming": is_streaming, **labels}
op.duration_ms, {"streaming": is_streaming, **labels}
)
# only record completed attempts if there were retries
if op.completed_attempts:
@@ -203,26 +203,26 @@ def on_attempt_complete(
is_streaming = str(op.is_streaming)

self.otel.attempt_latencies.record(
attempt.duration, {"streaming": is_streaming, "status": status, **labels}
attempt.duration_ms, {"streaming": is_streaming, "status": status, **labels}
)
combined_throttling = attempt.grpc_throttling_time
combined_throttling = attempt.grpc_throttling_time_ms
if not op.completed_attempts:
# add flow control latency to first attempt's throttling latency
combined_throttling += op.flow_throttling_time
combined_throttling += op.flow_throttling_time_ms
self.otel.throttling_latencies.record(combined_throttling, labels)
self.otel.application_latencies.record(
attempt.application_blocking_time + attempt.backoff_before_attempt, labels
attempt.application_blocking_time_ms + attempt.backoff_before_attempt_ms, labels
)
if (
op.op_type == OperationType.READ_ROWS
and attempt.first_response_latency is not None
and attempt.first_response_latency_ms is not None
):
self.otel.first_response_latencies.record(
attempt.first_response_latency, {"status": status, **labels}
attempt.first_response_latency_ms, {"status": status, **labels}
)
if attempt.gfe_latency is not None:
if attempt.gfe_latency_ms is not None:
self.otel.server_latencies.record(
attempt.gfe_latency,
attempt.gfe_latency_ms,
{"streaming": is_streaming, "status": status, **labels},
)
else:
1 change: 0 additions & 1 deletion noxfile.py
@@ -133,7 +133,6 @@ def format(session):
def mypy(session):
"""Verify type hints are mypy compatible."""
session.install("-e", ".")
session.install("-e", "python-api-core")
session.install(
"mypy",
"types-setuptools",