Skip to content

Commit

Permalink
added flow control time to throttling_latencies
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Dec 20, 2023
1 parent b109304 commit 7332a73
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 13 deletions.
1 change: 0 additions & 1 deletion google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ async def start(self):
"""
try:
# trigger mutate_rows
self._operation_metrics.start()
await self._operation
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
Expand Down
19 changes: 14 additions & 5 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Any, Sequence, TYPE_CHECKING
import asyncio
import atexit
import time
import warnings
from collections import deque

Expand All @@ -33,6 +34,7 @@
)
from google.cloud.bigtable.data.mutations import Mutation
from google.cloud.bigtable.data._metrics import OperationType
from google.cloud.bigtable.data._metrics import ActiveOperationMetric

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
Expand Down Expand Up @@ -329,17 +331,26 @@ async def _flush_internal(self, new_entries: list[RowMutationEntry]):
"""
# flush new entries
in_process_requests: list[asyncio.Future[list[FailedMutationEntryError]]] = []
metric = self._table._metrics.create_operation(OperationType.BULK_MUTATE_ROWS)
flow_start_time = time.monotonic()
async for batch in self._flow_control.add_to_flow(new_entries):
batch_task = self._create_bg_task(self._execute_mutate_rows, batch)
# add time waiting on flow control to throttling metric
metric.flow_throttling_time = time.monotonic() - flow_start_time
batch_task = self._create_bg_task(self._execute_mutate_rows, batch, metric)
in_process_requests.append(batch_task)
# start a new metric for next batch
metric = self._table._metrics.create_operation(
OperationType.BULK_MUTATE_ROWS
)
flow_start_time = time.monotonic()
# wait for all inflight requests to complete
found_exceptions = await self._wait_for_batch_results(*in_process_requests)
# update exception data to reflect any new errors
self._entries_processed_since_last_raise += len(new_entries)
self._add_exceptions(found_exceptions)

async def _execute_mutate_rows(
self, batch: list[RowMutationEntry]
self, batch: list[RowMutationEntry], metrics: ActiveOperationMetric
) -> list[FailedMutationEntryError]:
"""
Helper to execute mutation operation on a batch
Expand All @@ -359,9 +370,7 @@ async def _execute_mutate_rows(
batch,
operation_timeout=self._operation_timeout,
attempt_timeout=self._attempt_timeout,
metrics=self._table._metrics.create_operation(
OperationType.BULK_MUTATE_ROWS
),
metrics=metrics,
retryable_exceptions=self._retryable_errors,
)
await operation.start()
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class CompletedAttemptMetric:
gfe_latency: float | None = None
application_blocking_time: float = 0.0
backoff_before_attempt: float = 0.0
grpc_throttling_time: float = 0.0


@dataclass(frozen=True)
Expand All @@ -107,6 +108,7 @@ class CompletedOperationMetric:
cluster_id: str
zone: str
is_streaming: bool
flow_throttling_time: float = 0.0


@dataclass
Expand All @@ -121,7 +123,11 @@ class ActiveAttemptMetric:
# time waiting on user to process the response
# currently only relevant for ReadRows
application_blocking_time: float = 0.0
# backoff time is added to application_blocking_time
backoff_before_attempt: float = 0.0
# time waiting on grpc channel
# TODO: capture grpc_throttling_time
grpc_throttling_time: float = 0.0


@dataclass
Expand All @@ -142,6 +148,8 @@ class ActiveOperationMetric:
is_streaming: bool = False # only True for read_rows operations
was_completed: bool = False
handlers: list[MetricsHandler] = field(default_factory=list)
# time waiting on flow control
flow_throttling_time: float = 0.0

@property
def state(self) -> OperationState:
Expand Down Expand Up @@ -276,6 +284,7 @@ def end_attempt_with_status(self, status: StatusCode | Exception) -> None:
gfe_latency=self.active_attempt.gfe_latency,
application_blocking_time=self.active_attempt.application_blocking_time,
backoff_before_attempt=self.active_attempt.backoff_before_attempt,
grpc_throttling_time=self.active_attempt.grpc_throttling_time,
)
self.completed_attempts.append(new_attempt)
self.active_attempt = None
Expand Down Expand Up @@ -314,6 +323,7 @@ def end_with_status(self, status: StatusCode | Exception) -> None:
cluster_id=self.cluster_id or DEFAULT_CLUSTER_ID,
zone=self.zone or DEFAULT_ZONE,
is_streaming=self.is_streaming,
flow_throttling_time=self.flow_throttling_time,
)
for handler in self.handlers:
handler.on_operation_complete(finalized)
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/bigtable/data/_metrics/handlers/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def __init__(self):
description="A distribution of the total latency introduced by your application when Cloud Bigtable has available response data but your application has not consumed it.",
unit="ms",
)
self.throttling_latencies = meter.create_histogram(
name="throttling_latencies",
description="The latency introduced by the client by blocking on sending more requests to the server when there are too many pending requests in bulk operations.",
unit="ms",
)


class OpenTelemetryMetricsHandler(MetricsHandler):
Expand Down Expand Up @@ -136,6 +141,8 @@ def on_attempt_complete(
- first_response_latencies
- server_latencies
- connectivity_error_count
- application_blocking_latencies
- throttling_latencies
"""
labels = {
"method": op.op_type.value,
Expand All @@ -145,6 +152,11 @@ def on_attempt_complete(
}

self.otel.attempt_latencies.record(attempt.duration, labels)
combined_throttling = attempt.grpc_throttling_time
if not op.completed_attempts:
# add flow control latency to first attempt's throttling latency
combined_throttling += op.flow_throttling_time
self.otel.throttling_latencies.record(combined_throttling, labels)
self.otel.application_blocking_latencies.record(
attempt.application_blocking_time + attempt.backoff_before_attempt, labels
)
Expand Down
1 change: 0 additions & 1 deletion tests/unit/data/_async/test__mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def _make_one(self, *args, **kwargs):

async def _mock_stream(self, mutation_list, error_dict):
for idx, entry in enumerate(mutation_list):

code = error_dict.get(idx, 0)
yield MutateRowsResponse(
entries=[
Expand Down
44 changes: 40 additions & 4 deletions tests/unit/data/_async/test_mutations_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,14 +903,16 @@ async def test__execute_mutate_rows(self, mutate_rows):
table.default_mutate_rows_retryable_errors = ()
async with self._make_one(table) as instance:
batch = [_make_mutation()]
result = await instance._execute_mutate_rows(batch)
mock_metric = mock.Mock()
result = await instance._execute_mutate_rows(batch, mock_metric)
assert start_operation.call_count == 1
args, kwargs = mutate_rows.call_args
assert args[0] == table.client._gapic_client
assert args[1] == table
assert args[2] == batch
kwargs["operation_timeout"] == 17
kwargs["attempt_timeout"] == 13
kwargs["metrics"] == mock_metric
assert result == []

@pytest.mark.asyncio
Expand All @@ -933,7 +935,7 @@ async def test__execute_mutate_rows_returns_errors(self, mutate_rows):
table.default_mutate_rows_retryable_errors = ()
async with self._make_one(table) as instance:
batch = [_make_mutation()]
result = await instance._execute_mutate_rows(batch)
result = await instance._execute_mutate_rows(batch, mock.Mock())
assert len(result) == 2
assert result[0] == err1
assert result[1] == err2
Expand Down Expand Up @@ -1058,7 +1060,7 @@ async def test_timeout_args_passed(self, mutate_rows):
assert instance._operation_timeout == expected_operation_timeout
assert instance._attempt_timeout == expected_attempt_timeout
# make simulated gapic call
await instance._execute_mutate_rows([_make_mutation()])
await instance._execute_mutate_rows([_make_mutation()], mock.Mock())
assert mutate_rows.call_count == 1
kwargs = mutate_rows.call_args[1]
assert kwargs["operation_timeout"] == expected_operation_timeout
Expand Down Expand Up @@ -1174,11 +1176,45 @@ async def test_customizable_retryable_errors(
predicate_builder_mock.return_value = expected_predicate
retry_fn_mock.side_effect = RuntimeError("stop early")
mutation = _make_mutation(count=1, size=1)
await instance._execute_mutate_rows([mutation])
await instance._execute_mutate_rows([mutation], mock.Mock())
# passed in errors should be used to build the predicate
predicate_builder_mock.assert_called_once_with(
*expected_retryables, _MutateRowsIncomplete
)
retry_call_args = retry_fn_mock.call_args_list[0].args
# output of if_exception_type should be sent in to retry constructor
assert retry_call_args[1] is expected_predicate

@pytest.mark.asyncio
@pytest.mark.parametrize("sleep_time,flow_size", [(0, 10), (0.1, 1), (0.01, 10)])
async def test_flow_throttling_metric(self, sleep_time, flow_size):
    """
    Time spent blocked on flow control before a batch is released
    should show up in that batch's operation metric as flow_throttling_time
    """
    import time
    from google.cloud.bigtable.data._metrics import (
        ActiveOperationMetric,
        BigtableClientSideMetricsController,
    )

    async def delayed_batches():
        # block once up front to simulate waiting on flow control,
        # then hand out flow_size batches with no further delay
        time.sleep(sleep_time)
        for _ in range(flow_size):
            await asyncio.sleep(0)
            yield mock.Mock()

    # stand-in for the batcher instance passed as `self`
    batcher = mock.Mock()
    batcher._table._metrics = BigtableClientSideMetricsController([])
    batcher._flow_control.add_to_flow.return_value = delayed_batches()
    batcher._entries_processed_since_last_raise = 0
    batcher._wait_for_batch_results.return_value = asyncio.sleep(0)

    await self._get_target_class()._flush_internal(batcher, [])

    # the metric is the last positional argument of each _create_bg_task call
    bg_task_calls = batcher._create_bg_task.call_args_list
    captured = [bg_call.args[-1] for bg_call in bg_task_calls]
    # one operation metric should have been created per batch
    assert len(captured) == flow_size
    assert all(isinstance(metric, ActiveOperationMetric) for metric in captured)
    # only the first batch waited on the simulated delay
    timing_error = captured[0].flow_throttling_time - sleep_time
    assert abs(timing_error) < 0.002
24 changes: 22 additions & 2 deletions tests/unit/data/_metrics/handlers/test_handler_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _make_one(self, **kwargs):
("server_latencies", "histogram"),
("connectivity_error_count", "count"),
("application_blocking_latencies", "histogram"),
# ("throttling_latencies", "histogram"),
("throttling_latencies", "histogram"),
],
)
def test_ctor_creates_metrics(self, metric_name, kind):
Expand Down Expand Up @@ -148,7 +148,7 @@ def ctor_defaults(self):
("server_latencies", "histogram"),
("connectivity_error_count", "count"),
("application_blocking_latencies", "histogram"),
# ("throttling_latencies", "histogram"),
("throttling_latencies", "histogram"),
],
)
def test_attempt_update_labels(self, metric_name, kind):
Expand Down Expand Up @@ -326,6 +326,26 @@ def test_attempt_update_application_blocking_latencies(self, app_blocking, backo
assert record.call_count == 1
assert record.call_args[0][0] == expected_total_latency

@pytest.mark.parametrize("grpc,flow", [(0, 10), (10, 0), (123, 456)])
def test_attempt_update_throttling_latencies(self, grpc, flow):
    """
    throttling_latencies should record the attempt's grpc throttling time
    plus the operation's flow control time on attempt completion
    """
    handler = self._make_one()
    # the operation has no completed attempts, so the handler treats this
    # as the first attempt and includes the flow control time
    # NOTE(review): end_attempt_with_status appends the attempt to
    # completed_attempts; if handlers are notified after that append, this
    # first-attempt path may not be reached outside of this test - confirm
    operation = ActiveOperationMetric(mock.Mock(), flow_throttling_time=flow)
    completed = CompletedAttemptMetric(
        start_time=0,
        duration=1,
        end_status=mock.Mock(),
        grpc_throttling_time=grpc,
    )
    histogram = handler.otel.throttling_latencies
    with mock.patch.object(histogram, "record") as record:
        handler.on_attempt_complete(completed, operation)
        assert record.call_count == 1
        recorded_value = record.call_args[0][0]
        assert recorded_value == grpc + flow

def tyest_operation_update_latency(self):
"""
update op_latency on operation completion
Expand Down
11 changes: 11 additions & 0 deletions tests/unit/data/_metrics/test_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def test_ctor_defaults(self):
assert metric.was_completed is False
assert len(metric.handlers) == 0
assert metric.is_streaming is False
assert metric.flow_throttling_time == 0

def test_ctor_explicit(self):
"""
Expand All @@ -59,6 +60,7 @@ def test_ctor_explicit(self):
expected_was_completed = True
expected_handlers = [mock.Mock()]
expected_is_streaming = True
expected_flow_throttling = 12
metric = self._make_one(
op_type=expected_type,
start_time=expected_start_time,
Expand All @@ -69,6 +71,7 @@ def test_ctor_explicit(self):
was_completed=expected_was_completed,
handlers=expected_handlers,
is_streaming=expected_is_streaming,
flow_throttling_time=expected_flow_throttling,
)
assert metric.op_type == expected_type
assert metric.start_time == expected_start_time
Expand All @@ -79,6 +82,7 @@ def test_ctor_explicit(self):
assert metric.was_completed == expected_was_completed
assert metric.handlers == expected_handlers
assert metric.is_streaming == expected_is_streaming
assert metric.flow_throttling_time == expected_flow_throttling

def test_state_machine_w_methods(self):
"""
Expand Down Expand Up @@ -209,6 +213,7 @@ def test_start_attempt(self):
) < datetime.timedelta(seconds=0.1)
assert metric.active_attempt.first_response_latency is None
assert metric.active_attempt.gfe_latency is None
assert metric.active_attempt.grpc_throttling_time == 0
# should be in ACTIVE_ATTEMPT state after completing
assert metric.state == State.ACTIVE_ATTEMPT

Expand Down Expand Up @@ -406,6 +411,7 @@ def test_end_attempt_with_status(self):
expected_gfe_latency = 5
expected_app_blocking = 12
expected_backoff = 2
expected_grpc_throttle = 3

metric = self._make_one(mock.Mock())
assert metric.active_attempt is None
Expand All @@ -416,6 +422,7 @@ def test_end_attempt_with_status(self):
metric.active_attempt.first_response_latency = expected_latency
metric.active_attempt.application_blocking_time = expected_app_blocking
metric.active_attempt.backoff_before_attempt = expected_backoff
metric.active_attempt.grpc_throttling_time = expected_grpc_throttle
metric.end_attempt_with_status(expected_status)
assert len(metric.completed_attempts) == 1
got_attempt = metric.completed_attempts[0]
Expand All @@ -425,6 +432,7 @@ def test_end_attempt_with_status(self):
time.monotonic() - got_attempt.duration - expected_start_time.monotonic
< 0.001
)
assert got_attempt.grpc_throttling_time == expected_grpc_throttle
assert got_attempt.end_status == expected_status
assert got_attempt.gfe_latency == expected_gfe_latency
assert got_attempt.application_blocking_time == expected_app_blocking
Expand Down Expand Up @@ -461,6 +469,7 @@ def test_end_with_status(self):
expected_attempt_start_time = TimeTuple(10, 0)
expected_attempt_first_response_latency = 9
expected_attempt_gfe_latency = 5
expected_flow_time = 16

expected_status = object()
expected_type = object()
Expand All @@ -476,6 +485,7 @@ def test_end_with_status(self):
metric.cluster_id = expected_cluster
metric.zone = expected_zone
metric.is_streaming = is_streaming
metric.flow_throttling_time = expected_flow_time
attempt = ActiveAttemptMetric(
start_time=expected_attempt_start_time,
first_response_latency=expected_attempt_first_response_latency,
Expand All @@ -500,6 +510,7 @@ def test_end_with_status(self):
assert called_with.cluster_id == expected_cluster
assert called_with.zone == expected_zone
assert called_with.is_streaming == is_streaming
assert called_with.flow_throttling_time == expected_flow_time
# check the attempt
assert len(called_with.completed_attempts) == 1
final_attempt = called_with.completed_attempts[0]
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/data/_metrics/test_rpcs_instrumented.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ async def test_rpc_instrumented(fn_name, fn_args, gapic_fn, is_unary, expected_t
# no application blocking time or backoff time expected
assert found_attempt.application_blocking_time == 0
assert found_attempt.backoff_before_attempt == 0
# no throttling expected
assert found_attempt.grpc_throttling_time == 0
assert found_operation.flow_throttling_time == 0


@pytest.mark.parametrize(RPC_ARGS, RETRYABLE_RPCS)
Expand Down

0 comments on commit 7332a73

Please sign in to comment.