From ca794e240a04d86f5d0f6bb6502ff55160b534f7 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Thu, 3 Oct 2024 18:20:51 -0400 Subject: [PATCH] wip Change-Id: Icd88367491120c32045ff11bf7b89cdcc0ea8b04 --- .../data/v2/stub/EnhancedBigtableStub.java | 57 ++++++++++++++----- .../data/v2/stub/metrics/BigtableTracer.java | 9 +++ .../v2/stub/metrics/BuiltinMetricsTracer.java | 23 +++++++- .../data/v2/stub/metrics/CompositeTracer.java | 19 +++++++ .../data/v2/stub/metrics/MetricsTracer.java | 17 +++++- .../stub/readrows/ReadRowsFirstCallable.java | 46 ++------------- .../data/v2/stub/CookiesHolderTest.java | 5 +- .../v2/stub/EnhancedBigtableStubTest.java | 34 +++++++---- .../stub/metrics/BuiltinMetricsTestUtils.java | 27 +++++---- .../metrics/BuiltinMetricsTracerTest.java | 8 ++- .../readrows/ReadRowsFirstCallableTest.java | 23 ++++---- 11 files changed, 170 insertions(+), 98 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d0022a1a46..8c35cf731a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -39,6 +39,7 @@ import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetryingExecutorWithContext; import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.SimpleStreamResumptionStrategy; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.RequestParamsExtractor; @@ -123,6 +124,8 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; +import com.google.cloud.bigtable.data.v2.stub.opt.UnaryOverStreamingCallable; +import com.google.cloud.bigtable.data.v2.stub.opt.UnaryOverStreamingTracerModCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable; @@ -154,6 +157,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -563,11 +567,13 @@ public UnaryCallable createReadRowCallable(RowAdapter ReadRowsFirstCallable firstRow = new ReadRowsFirstCallable<>(readRowCallable); - UnaryCallable traced = - new TracedUnaryCallable<>( - firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); - - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + UnaryOverStreamingTracerModCallable modCallable = + new UnaryOverStreamingTracerModCallable(firstRow); + TracedServerStreamingCallable traced = + new TracedServerStreamingCallable<>( + modCallable, clientContext.getTracerFactory(), getSpanName("ReadRow")); + UnaryOverStreamingCallable unaryAdapter = new UnaryOverStreamingCallable<>(traced); + return unaryAdapter.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -794,8 +800,8 @@ private UnaryCallable> createSampleRowKeysCallable() { */ private UnaryCallable createMutateRowCallable() { String methodName = "MutateRow"; - UnaryCallable base = - GrpcRawCallableFactory.createUnaryCallable( + ServerStreamingCallable base = + GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() .setMethodDescriptor(BigtableGrpc.getMutateRowMethod()) .setParamsExtractor( @@ -818,17 +824,26 @@ public Map extract(MutateRowRequest mutateRowRequest) { .build(), settings.mutateRowSettings().getRetryableCodes()); - UnaryCallable withStatsHeaders = - new StatsHeadersUnaryCallable<>(base); + ServerStreamingCallable withStatsHeaders = + new StatsHeadersServerStreamingCallable<>(base); - UnaryCallable withBigtableTracer = - new BigtableTracerUnaryCallable<>(withStatsHeaders); + ServerStreamingCallable withBigtableTracer = + new BigtableTracerStreamingCallable<>(withStatsHeaders); - UnaryCallable retrying = - withRetries(withBigtableTracer, settings.mutateRowSettings()); + ServerStreamingCallable retrying = + withRetries( + withBigtableTracer, + convertUnaryToServerStreamingSettings(settings.mutateRowSettings())); - return createUserFacingUnaryCallable( - methodName, new MutateRowCallable(retrying, requestContext)); + UnaryOverStreamingTracerModCallable modCallable = + new UnaryOverStreamingTracerModCallable<>(retrying); + TracedServerStreamingCallable traced = + new TracedServerStreamingCallable<>( + modCallable, clientContext.getTracerFactory(), getSpanName(methodName)); + UnaryOverStreamingCallable callable = + new UnaryOverStreamingCallable<>(traced); + MutateRowCallable mutateRowCallable = new MutateRowCallable(callable, requestContext); + return mutateRowCallable.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1512,6 +1527,18 @@ private SpanName getSpanName(String methodName) { return SpanName.of(CLIENT_NAME, methodName); } + private + ServerStreamingCallSettings convertUnaryToServerStreamingSettings( + UnaryCallSettings unarySettings) { + return ServerStreamingCallSettings.newBuilder() + .setResumptionStrategy(new SimpleStreamResumptionStrategy<>()) + .setRetryableCodes(unarySettings.getRetryableCodes()) + .setRetrySettings(unarySettings.getRetrySettings()) + .setIdleTimeoutDuration(Duration.ZERO) + .setWaitTimeoutDuration(Duration.ZERO) + .build(); + } + @Override public void close() { if (closeClientContext) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 3445514f7b..e5f4dc4cf9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -18,6 +18,7 @@ import com.google.api.core.BetaApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.BaseApiTracer; import javax.annotation.Nullable; @@ -86,4 +87,12 @@ public void setLocations(String zone, String cluster) { public void grpcChannelQueuedLatencies(long queuedTimeMs) { // noop } + + public void overrideOperationType(ApiTracerFactory.OperationType operationType) { + // noop + } + + public void operationFinishedEarly() { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index abd214d760..e33753a3d3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -47,7 +47,7 @@ class BuiltinMetricsTracer extends BigtableTracer { private static final String NAME = "java-bigtable/" + Version.VERSION; - private final OperationType operationType; + private volatile OperationType operationType; private final SpanName spanName; // Operation level metrics @@ -132,6 +132,21 @@ public void close() {} }; } + @Override + public void overrideOperationType(OperationType operationType) { + this.operationType = operationType; + } + + @Override + public void operationFinishedEarly() { + if (attemptTimer.isRunning()) { + attemptTimer.stop(); + } + if (operationTimer.isRunning()) { + operationTimer.stop(); + } + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -276,7 +291,11 @@ private void recordOperationCompletion(@Nullable Throwable status) { if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + + // timer might've been stopped early because due to #operationFinishedEarly() + if (operationTimer.isRunning()) { + operationTimer.stop(); + } boolean isStreaming = operationType == OperationType.ServerStreaming; String statusStr = Util.extractStatus(status); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 774c6d9f22..71ea860c89 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; @@ -62,6 +63,24 @@ public void close() { }; } + @Override + public void overrideOperationType(ApiTracerFactory.OperationType operationType) { + for (ApiTracer child : children) { + if (child instanceof BigtableTracer) { + ((BigtableTracer) child).overrideOperationType(operationType); + } + } + } + + @Override + public void operationFinishedEarly() { + for (ApiTracer child : children) { + if (child instanceof BigtableTracer) { + ((BigtableTracer) child).operationFinishedEarly(); + } + } + } + @Override public void operationSucceeded() { for (ApiTracer child : children) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 0ffabe2606..f76020e637 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -36,7 +36,7 @@ class MetricsTracer extends BigtableTracer { - private final OperationType operationType; + private volatile OperationType operationType; private final Tagger tagger; private final StatsRecorder stats; @@ -84,6 +84,17 @@ public void close() {} }; } + @Override + public void overrideOperationType(OperationType operationType) { + this.operationType = operationType; + } + + @Override + public void operationFinishedEarly() { + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -103,7 +114,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + if (operationTimer.isRunning()) { + operationTimer.stop(); + } long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java index 2ef26605b4..5c4aaed8b5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallable.java @@ -15,14 +15,10 @@ */ package com.google.cloud.bigtable.data.v2.stub.readrows; -import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; -import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.api.gax.rpc.StateCheckingResponseObserver; -import com.google.api.gax.rpc.StreamController; -import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.bigtable.data.v2.models.Query; /** @@ -30,7 +26,7 @@ * cancelling the RPC */ @InternalApi -public class ReadRowsFirstCallable extends UnaryCallable { +public class ReadRowsFirstCallable extends ServerStreamingCallable { private final ServerStreamingCallable inner; @@ -39,41 +35,7 @@ public ReadRowsFirstCallable(ServerStreamingCallable inner) { } @Override - public ApiFuture futureCall(Query query, ApiCallContext context) { - ReadRowsFirstResponseObserver observer = new ReadRowsFirstResponseObserver<>(); - this.inner.call(query.limit(1), observer, context); - return observer.getFuture(); - } - - private class ReadRowsFirstResponseObserver extends StateCheckingResponseObserver { - private StreamController innerController; - private RowT firstRow; - private SettableApiFuture settableFuture = SettableApiFuture.create(); - - @Override - protected void onStartImpl(StreamController streamController) { - this.innerController = streamController; - } - - @Override - protected void onResponseImpl(RowT response) { - if (firstRow == null) { - this.firstRow = response; - } - } - - @Override - protected void onErrorImpl(Throwable throwable) { - settableFuture.setException(throwable); - } - - @Override - protected void onCompleteImpl() { - settableFuture.set(firstRow); - } - - protected ApiFuture getFuture() { - return settableFuture; - } + public void call(Query request, ResponseObserver responseObserver, ApiCallContext context) { + inner.call(request.limit(1), responseObserver, context); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java index 95a807bf76..47895a34d0 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java @@ -246,8 +246,9 @@ public void testMutateRows() { } @Test - public void testMutateRow() { + public void testMutateRow() throws InterruptedException { client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")); + client.close(); assertThat(fakeService.count.get()).isGreaterThan(1); assertThat(serverMetadata).hasSize(fakeService.count.get()); @@ -682,6 +683,7 @@ public void readRows( @Override public void mutateRow( MutateRowRequest request, StreamObserver responseObserver) { + System.out.println("server: mutateRow " + count.get()); if (count.getAndIncrement() < 1) { Metadata trailers = new Metadata(); maybePopulateCookie(trailers, "mutateRow"); @@ -801,6 +803,7 @@ private void maybePopulateCookie(Metadata trailers, String label) { trailers.put(ROUTING_COOKIE_2, testCookie); trailers.put(BAD_KEY, "bad-key"); } + System.out.println("server trailers (" + returnCookie + "): " + trailers); } } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index 50d086b711..80aa7539e5 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -70,6 +70,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Queues; import com.google.common.io.BaseEncoding; +import com.google.common.truth.Correspondence; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; @@ -98,9 +99,11 @@ import java.security.KeyPair; import java.security.KeyPairGenerator; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.Base64; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -328,6 +331,7 @@ public void testUserAgent() throws InterruptedException { .containsMatch("bigtable-java/\\d+\\.\\d+\\.\\d+(?:-SNAPSHOT)?"); } + static final Correspondence SPAN_DATA_BY_NAME = Correspondence.transforming(SpanData::getName, "SpanData.getName()"); @Test public void testSpanAttributes() throws InterruptedException { final BlockingQueue spans = new ArrayBlockingQueue<>(100); @@ -347,6 +351,7 @@ public void export(Collection collection) { }); SpanData foundSpanData = null; + List allSpans = new ArrayList<>(); // Issue the rpc and grab the span try { try (Scope ignored = @@ -357,9 +362,14 @@ public void export(Collection collection) { enhancedBigtableStub.readRowCallable().call(Query.create("table-id").rowKey("row-key")); } - for (int i = 0; i < 100; i++) { - SpanData spanData = spans.poll(10, TimeUnit.SECONDS); - if ("Bigtable.ReadRow".equals(spanData.getName())) { + + for (int i = 0; i < 10; i++) { + SpanData spanData = spans.poll(1, TimeUnit.SECONDS); + if (spanData == null) { + continue; + } + allSpans.add(spanData); + if (SPAN_DATA_BY_NAME.compare(spanData, "Bigtable.ReadRow")) { foundSpanData = spanData; break; } @@ -369,18 +379,18 @@ public void export(Collection collection) { Tracing.getExportComponent().getSpanExporter().unregisterHandler(handlerName); } + // Examine the caught span + assertThat(allSpans).comparingElementsUsing(SPAN_DATA_BY_NAME).contains("Bigtable.ReadRow"); assertThat(foundSpanData).isNotNull(); assertThat(foundSpanData.getAttributes().getAttributeMap()) - .containsEntry("gapic", AttributeValue.stringAttributeValue(Version.VERSION)); - assertThat(foundSpanData.getAttributes().getAttributeMap()) - .containsEntry( - "grpc", - AttributeValue.stringAttributeValue( - GrpcUtil.getGrpcBuildVersion().getImplementationVersion())); - assertThat(foundSpanData.getAttributes().getAttributeMap()) - .containsEntry( - "gax", AttributeValue.stringAttributeValue(GaxGrpcProperties.getGaxGrpcVersion())); + .containsAtLeast( + "gapic", AttributeValue.stringAttributeValue(Version.VERSION), + "grpc", + AttributeValue.stringAttributeValue( + GrpcUtil.getGrpcBuildVersion().getImplementationVersion()), + "gax", AttributeValue.stringAttributeValue(GaxGrpcProperties.getGaxGrpcVersion()) + ); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java index 2ea4f99bdc..f55c391be9 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTestUtils.java @@ -26,6 +26,7 @@ import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.util.Collection; import java.util.Collections; @@ -38,6 +39,9 @@ public class BuiltinMetricsTestUtils { private static final Correspondence METRIC_DATA_BY_NAME = Correspondence.transforming(MetricData::getName, "MetricData name"); + private static final Correspondence POINT_DATA_BY_ATTRS = + Correspondence.transforming((pd) -> pd.getAttributes(), "PointData attributes"); + private BuiltinMetricsTestUtils() {} public static MetricData getMetricData(InMemoryMetricReader reader, String metricName) { @@ -78,6 +82,9 @@ public static MetricData getMetricData(InMemoryMetricReader reader, String metri public static long getAggregatedValue(MetricData metricData, Attributes attributes) { switch (metricData.getType()) { case HISTOGRAM: + assertThat(metricData.getHistogramData().getPoints()) + .comparingElementsUsing(POINT_DATA_BY_ATTRS) + .contains(attributes); HistogramPointData hd = metricData.getHistogramData().getPoints().stream() .filter(pd -> pd.getAttributes().equals(attributes)) @@ -85,6 +92,9 @@ public static long getAggregatedValue(MetricData metricData, Attributes attribut .get(0); return (long) hd.getSum() / hd.getCount(); case LONG_SUM: + assertThat(metricData.getLongSumData().getPoints()) + .comparingElementsUsing(POINT_DATA_BY_ATTRS) + .contains(attributes); LongPointData ld = metricData.getLongSumData().getPoints().stream() .filter(pd -> pd.getAttributes().equals(attributes)) @@ -120,18 +130,15 @@ public static Timestamp getStartTimeSeconds(MetricData metricData, Attributes at public static void verifyAttributes(MetricData metricData, Attributes attributes) { switch (metricData.getType()) { case HISTOGRAM: - List hd = - metricData.getHistogramData().getPoints().stream() - .filter(pd -> pd.getAttributes().equals(attributes)) - .collect(Collectors.toList()); - assertThat(hd).isNotEmpty(); + assertThat(metricData.getHistogramData().getPoints()) + .comparingElementsUsing(POINT_DATA_BY_ATTRS) + .contains(attributes); + break; case LONG_SUM: - List ld = - metricData.getLongSumData().getPoints().stream() - .filter(pd -> pd.getAttributes().equals(attributes)) - .collect(Collectors.toList()); - assertThat(ld).isNotEmpty(); + assertThat(metricData.getLongSumData().getPoints()) + .comparingElementsUsing(POINT_DATA_BY_ATTRS) + .contains(attributes); break; default: Assert.fail("Unexpected type"); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index d37a2562bf..ebc92ed0f3 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -492,12 +492,13 @@ public void testRetryCount() throws InterruptedException { .put(STATUS_KEY, "OK") .build(); + Thread.sleep(1000); long value = getAggregatedValue(metricData, expectedAttributes); assertThat(value).isEqualTo(fakeService.getAttemptCounter().get() - 1); } @Test - public void testMutateRowAttemptsTagValues() { + public void testMutateRowAttemptsTagValues() throws InterruptedException { stub.mutateRowCallable() .call(RowMutation.create(TABLE, "random-row").setCell("cf", "q", "value")); @@ -527,6 +528,8 @@ public void testMutateRowAttemptsTagValues() { .put(STREAMING_KEY, false) .build(); + stub.close(); + Thread.sleep(1000); verifyAttributes(metricData, expected1); verifyAttributes(metricData, expected2); } @@ -676,7 +679,7 @@ public void testQueuedOnChannelServerStreamLatencies() { } @Test - public void testQueuedOnChannelUnaryLatencies() { + public void testQueuedOnChannelUnaryLatencies() throws InterruptedException { stub.mutateRowCallable().call(RowMutation.create(TABLE, "a-key").setCell("f", "q", "v")); @@ -692,6 +695,7 @@ public void testQueuedOnChannelUnaryLatencies() { .put(CLIENT_NAME_KEY, CLIENT_NAME) .build(); + Thread.sleep(1000); long expected = CHANNEL_BLOCKING_LATENCY * 2 / 3; long actual = getAggregatedValue(clientLatency, attributes); assertThat(actual).isAtLeast(expected); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java index 07cf3478c1..5220fd23b2 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsFirstCallableTest.java @@ -15,16 +15,13 @@ */ package com.google.cloud.bigtable.data.v2.stub.readrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; import com.google.common.truth.Truth; import org.junit.Before; import org.junit.Rule; @@ -56,21 +53,23 @@ public void setUp() { @Test public void testLimitAdded() { + FakeStreamingApi.ServerStreamingStashCallable innerCallable = + new FakeStreamingApi.ServerStreamingStashCallable<>(); ReadRowsFirstCallable callable = new ReadRowsFirstCallable<>(innerCallable); - callable.futureCall(Query.create("fake-table"), GrpcCallContext.createDefault()); - verify(innerCallable) - .call(innerQuery.capture(), any(ResponseObserver.class), any(ApiCallContext.class)); - Truth.assertThat(innerQuery.getValue().toProto(REQUEST_CONTEXT)) + callable.call(Query.create("fake-table"), GrpcCallContext.createDefault()); + + Truth.assertThat(innerCallable.getActualRequest().toProto(REQUEST_CONTEXT)) .isEqualTo(Query.create("fake-table").limit(1).toProto(REQUEST_CONTEXT)); } @Test public void testLimitChanged() { + FakeStreamingApi.ServerStreamingStashCallable innerCallable = + new FakeStreamingApi.ServerStreamingStashCallable<>(); ReadRowsFirstCallable callable = new ReadRowsFirstCallable<>(innerCallable); - callable.futureCall(Query.create("fake-table").limit(10), GrpcCallContext.createDefault()); - verify(innerCallable) - .call(innerQuery.capture(), any(ResponseObserver.class), any(ApiCallContext.class)); - Truth.assertThat(innerQuery.getValue().toProto(REQUEST_CONTEXT)) + callable.call(Query.create("fake-table").limit(10), GrpcCallContext.createDefault()); + + Truth.assertThat(innerCallable.getActualRequest().toProto(REQUEST_CONTEXT)) .isEqualTo(Query.create("fake-table").limit(1).toProto(REQUEST_CONTEXT)); } }