Skip to content

Commit

Permalink
added instrumebtation test for batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Dec 14, 2023
1 parent 22f66b9 commit 8a457ee
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions tests/unit/data/_metrics/test_rpcs_instrumented.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,68 @@ async def test_rpc_instrumented_multiple_attempts(
)
assert success.backoff_before_attempt > 0
assert failure.backoff_before_attempt == 0


@pytest.mark.asyncio
async def test_batcher_rpcs_instrumented():
"""check that all requests attach proper metadata headers"""
from google.cloud.bigtable.data import TableAsync
from google.cloud.bigtable.data import BigtableDataClientAsync

cluster_data = "my-cluster"
zone_data = "my-zone"
expected_gfe_latency = 123

with mock.patch(
"google.cloud.bigtable_v2.BigtableAsyncClient.mutate_rows"
) as gapic_mock:
# populate metadata fields
initial_metadata = Metadata(
(BIGTABLE_METADATA_KEY, f"{cluster_data} {zone_data}".encode("utf-8"))
)
trailing_metadata = Metadata(
(SERVER_TIMING_METADATA_KEY, f"gfet4t7; dur={expected_gfe_latency*1000}")
)
grpc_call = mock_grpc_call(
initial_metadata=initial_metadata, trailing_metadata=trailing_metadata
)
gapic_mock.return_value = grpc_call
async with BigtableDataClientAsync() as client:
table = TableAsync(client, "instance-id", "table-id")
# customize metrics handlers
mock_metric_handler = mock.Mock()
table._metrics.handlers = [mock_metric_handler]
async with table.mutations_batcher() as batcher:
await batcher.append(
mutations.RowMutationEntry(
b"row-key", [mutations.DeleteAllFromRow()]
)
)
# check for recorded metrics values
assert mock_metric_handler.on_operation_complete.call_count == 1
found_operation = mock_metric_handler.on_operation_complete.call_args[0][0]
# make sure expected fields were set properly
assert found_operation.op_type == OperationType.BULK_MUTATE_ROWS
now = datetime.datetime.now(datetime.timezone.utc)
assert found_operation.start_time - now < datetime.timedelta(seconds=1)
assert found_operation.duration < 0.1
assert found_operation.duration > 0
assert found_operation.final_status == StatusCode.OK
assert found_operation.cluster_id == cluster_data
assert found_operation.zone == zone_data
assert found_operation.is_streaming is False
# check attempts
assert len(found_operation.completed_attempts) == 1
found_attempt = found_operation.completed_attempts[0]
assert found_attempt.end_status == StatusCode.OK
assert found_attempt.start_time - now < datetime.timedelta(seconds=1)
assert found_attempt.duration < 0.1
assert found_attempt.duration > 0
assert found_attempt.start_time >= found_operation.start_time
assert found_attempt.duration <= found_operation.duration
assert found_attempt.gfe_latency == expected_gfe_latency
# first response latency not populated, because no real read_rows chunks processed
assert found_attempt.first_response_latency is None
# no application blocking time or backoff time expected
assert found_attempt.application_blocking_time == 0
assert found_attempt.backoff_before_attempt == 0

0 comments on commit 8a457ee

Please sign in to comment.