From 6685aa331f2010c645f2dbe50f2fe4f7a6ed8b8a Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 5 Nov 2024 16:48:04 -0500 Subject: [PATCH] chore: remodel unary callables as server streaming callables with an adapter at the end (#2403) * chore: remodel unary callables as server streaming callables with an adapter at the end Change-Id: I8708dff0e192d7647ef2cb361fc0992e1ddd2b24 * test + fixes Change-Id: Id4c56656a829f5f4c7ab1170f5f980cf3cc3760c * chore: generate libraries at Mon Nov 4 22:30:01 UTC 2024 * oops Change-Id: I1bd8c318b3272925cd6b81601d7b1d7c772a853f * more tests Change-Id: I1c45f2058cadc1acb9c6abd87222be9eb233778c * avoid multiple cancels Change-Id: I4e05efaac6ae60f5827c6d666c3c6f6cebebaa54 * chore: generate libraries at Tue Nov 5 00:23:44 UTC 2024 * fix fallback Change-Id: I654e70f0b34f5d4c3071ba3c2fed64ea183a865e * chore: generate libraries at Tue Nov 5 00:42:37 UTC 2024 * proper fallback Change-Id: Ic0106f3c6983edbb032aeba6e107e4324952397d * Use transforming callable Change-Id: I8d8474050e40cd819d3be2a5b251448f6eb8c94f * fix npe Change-Id: Ib589ca063369e26ef214eb89099e459981dafe83 * clean up logic Change-Id: I4504c47143000d97554a96469d5f3fd368d08ef1 * oops, messed up splitting commits, this should've been part of this pr not the next Change-Id: I16a35e19c50b7b7b855f4299cf41f0607b3e90bd * typo Change-Id: I8202e935975e1a55606265c502fe7573b8a4acb0 * disable watchdog for the new ReadRow callable chain Change-Id: I4522719a65f24d27fb9dccde031c3b1cc04042c2 --------- Co-authored-by: cloud-java-bot --- .../stub/BigtableUnaryOperationCallable.java | 189 ++++++++++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 135 +++++++++++-- .../v2/stub/EnhancedBigtableStubSettings.java | 6 + .../TransformingServerStreamingCallable.java | 72 +++++++ .../BigtableUnaryOperationCallableTest.java | 166 +++++++++++++++ ...ansformingServerStreamingCallableTest.java | 74 +++++++ 6 files changed, 620 insertions(+), 22 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallableTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java new file mode 100644 index 0000000000..19f7a5224c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -0,0 +1,189 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.AbstractApiFuture; +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.common.base.Preconditions; +import io.grpc.Status; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link + * UnaryCallable}. It is intended to be the outermost callable of a chain. + * + *

Responsibilities: + * + *

*/ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - ServerStreamingCallable readRowsCallable = - createReadRowsBaseCallable( - ServerStreamingCallSettings.newBuilder() - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) - .setRetrySettings(settings.readRowSettings().getRetrySettings()) - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) - .build(), - rowAdapter); - - ReadRowsUserCallable readRowCallable = - new ReadRowsUserCallable<>(readRowsCallable, requestContext); - - ReadRowsFirstCallable firstRow = new ReadRowsFirstCallable<>(readRowCallable); - - UnaryCallable traced = - new TracedUnaryCallable<>( - firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); - - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable( + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter); + + ReadRowsUserCallable readRowCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + ReadRowsFirstCallable firstRow = new ReadRowsFirstCallable<>(readRowCallable); + UnaryCallable traced = + new TracedUnaryCallable<>( + firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + } else { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable( + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeoutDuration(Duration.ZERO) + .setWaitTimeoutDuration(Duration.ZERO) + .build(), + rowAdapter, + new SimpleStreamResumptionStrategy<>()); + ServerStreamingCallable readRowCallable = + new TransformingServerStreamingCallable<>( + readRowsCallable, + (query) -> query.limit(1).toProto(requestContext), + Functions.identity()); + + return new BigtableUnaryOperationCallable<>( + readRowCallable, + clientContext.getDefaultCallContext(), + clientContext.getTracerFactory(), + getSpanName("ReadRow"), + /*allowNoResponses=*/ true); + } } + private ServerStreamingCallable createReadRowsBaseCallable( + ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { + return createReadRowsBaseCallable( + readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy(rowAdapter)); + } /** * Creates a callable chain to handle ReadRows RPCs. The chain will: * @@ -596,8 +627,9 @@ public UnaryCallable createReadRowCallable(RowAdapter *

NOTE: the caller is responsible for adding tracing & metrics. */ private ServerStreamingCallable createReadRowsBaseCallable( - ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { - + ServerStreamingCallSettings readRowsSettings, + RowAdapter rowAdapter, + StreamResumptionStrategy resumptionStrategy) { ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() @@ -625,7 +657,7 @@ private ServerStreamingCallable createReadRo // ReadRowsRequest -> ReadRowsResponse callable). ServerStreamingCallSettings innerSettings = ServerStreamingCallSettings.newBuilder() - .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) + .setResumptionStrategy(resumptionStrategy) .setRetryableCodes(readRowsSettings.getRetryableCodes()) .setRetrySettings(readRowsSettings.getRetrySettings()) .setIdleTimeout(readRowsSettings.getIdleTimeout()) @@ -1264,6 +1296,21 @@ private UnaryCallable createUnar UnaryCallSettings callSettings, Function requestTransformer, Function responseTranformer) { + if (EnhancedBigtableStubSettings.SKIP_TRAILERS) { + return createUnaryCallableNew( + methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); + } else { + return createUnaryCallableOld( + methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); + } + } + + private UnaryCallable createUnaryCallableOld( + MethodDescriptor methodDescriptor, + RequestParamsExtractor headerParamsFn, + UnaryCallSettings callSettings, + Function requestTransformer, + Function responseTranformer) { UnaryCallable base = GrpcRawCallableFactory.createUnaryCallable( @@ -1300,6 +1347,50 @@ public ApiFuture futureCall(ReqT reqT, ApiCallContext apiCallContext) { return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } + private UnaryCallable createUnaryCallableNew( + MethodDescriptor methodDescriptor, + RequestParamsExtractor headerParamsFn, + UnaryCallSettings callSettings, + Function requestTransformer, + Function responseTranformer) { + + ServerStreamingCallable base = + GrpcRawCallableFactory.createServerStreamingCallable( + GrpcCallSettings.newBuilder() + .setMethodDescriptor(methodDescriptor) + .setParamsExtractor(headerParamsFn) + .build(), + callSettings.getRetryableCodes()); + + base = new StatsHeadersServerStreamingCallable<>(base); + + base = new BigtableTracerStreamingCallable<>(base); + + base = withRetries(base, convertUnaryToServerStreamingSettings(callSettings)); + + ServerStreamingCallable transformed = + new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer); + + return new BigtableUnaryOperationCallable<>( + transformed, + clientContext.getDefaultCallContext(), + clientContext.getTracerFactory(), + getSpanName(methodDescriptor.getBareMethodName()), + /* allowNoResponse= */ false); + } + + private static + ServerStreamingCallSettings convertUnaryToServerStreamingSettings( + UnaryCallSettings unarySettings) { + return ServerStreamingCallSettings.newBuilder() + .setResumptionStrategy(new SimpleStreamResumptionStrategy<>()) + .setRetryableCodes(unarySettings.getRetryableCodes()) + .setRetrySettings(unarySettings.getRetrySettings()) + .setIdleTimeoutDuration(Duration.ZERO) + .setWaitTimeoutDuration(Duration.ZERO) + .build(); + } + private UnaryCallable createPingAndWarmCallable() { UnaryCallable pingAndWarm = GrpcRawCallableFactory.createUnaryCallable( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 5e5dc64fd9..863389166f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -62,6 +62,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.logging.Logger; import javax.annotation.Nonnull; @@ -108,6 +109,11 @@ public class EnhancedBigtableStubSettings extends StubSettings IDEMPOTENT_RETRY_CODES = ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java new file mode 100644 index 0000000000..576d8257d7 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import java.util.function.Function; + +/** Callable to help crossing api boundary lines between models and protos */ +class TransformingServerStreamingCallable + extends ServerStreamingCallable { + private final ServerStreamingCallable inner; + private final Function requestTransformer; + private final Function responseTransformer; + + public TransformingServerStreamingCallable( + ServerStreamingCallable inner, + Function requestTransformer, + Function responseTransformer) { + this.inner = inner; + this.requestTransformer = requestTransformer; + this.responseTransformer = responseTransformer; + } + + @Override + public void call( + OuterReqT outerReqT, + ResponseObserver outerObserver, + ApiCallContext apiCallContext) { + InnerReqT innerReq = requestTransformer.apply(outerReqT); + + inner.call( + innerReq, + new ResponseObserver() { + @Override + public void onStart(StreamController streamController) { + outerObserver.onStart(streamController); + } + + @Override + public void onResponse(InnerRespT innerResp) { + outerObserver.onResponse(responseTransformer.apply(innerResp)); + } + + @Override + public void onError(Throwable throwable) { + outerObserver.onError(throwable); + } + + @Override + public void onComplete() { + outerObserver.onComplete(); + } + }, + apiCallContext); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java new file mode 100644 index 0000000000..b6f1a24b70 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java @@ -0,0 +1,166 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; +import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; +import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall; +import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable; +import com.google.common.collect.ImmutableList; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class BigtableUnaryOperationCallableTest { + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock private ApiTracerFactory tracerFactory; + @Mock private BigtableTracer tracer; + + @Before + public void setUp() throws Exception { + Mockito.when(tracerFactory.newTracer(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(tracer); + } + + @Test + public void testFutureResolve() throws Exception { + BigtableUnaryOperationCallable callable = + new BigtableUnaryOperationCallable<>( + new FakeStreamingApi.ServerStreamingStashCallable<>(ImmutableList.of("value")), + GrpcCallContext.createDefault(), + tracerFactory, + SpanName.of("Fake", "method"), + false); + + ApiFuture f = callable.futureCall("fake"); + assertThat(f.get()).isEqualTo("value"); + } + + @Test + public void testMultipleResponses() throws Exception { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + + BigtableUnaryOperationCallable callable = + new BigtableUnaryOperationCallable<>( + inner, + GrpcCallContext.createDefault(), + tracerFactory, + SpanName.of("Fake", "method"), + false); + callable.logger = Mockito.mock(Logger.class); + + ApiFuture f = callable.futureCall("fake"); + MockServerStreamingCall call = inner.popLastCall(); + call.getController().getObserver().onResponse("first"); + call.getController().getObserver().onResponse("second"); + + Throwable e = Assert.assertThrows(ExecutionException.class, f::get).getCause(); + assertThat(e).isInstanceOf(InternalException.class); + assertThat(e) + .hasMessageThat() + .contains( + "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); + + ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(String.class); + verify(callable.logger).log(Mockito.any(), msgCaptor.capture()); + assertThat(msgCaptor.getValue()) + .isEqualTo( + "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); + + assertThat(call.getController().isCancelled()).isTrue(); + } + + @Test + public void testCancel() { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + BigtableUnaryOperationCallable callable = + new BigtableUnaryOperationCallable<>( + inner, + GrpcCallContext.createDefault(), + tracerFactory, + SpanName.of("Fake", "method"), + false); + ApiFuture f = callable.futureCall("req"); + f.cancel(true); + + MockServerStreamingCall call = inner.popLastCall(); + assertThat(call.getController().isCancelled()).isTrue(); + } + + @Test + public void testMissingResponse() { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + BigtableUnaryOperationCallable callable = + new BigtableUnaryOperationCallable<>( + inner, + GrpcCallContext.createDefault(), + tracerFactory, + SpanName.of("Fake", "method"), + false); + ApiFuture f = callable.futureCall("req"); + MockServerStreamingCall call = inner.popLastCall(); + call.getController().getObserver().onComplete(); + + Throwable cause = Assert.assertThrows(ExecutionException.class, f::get).getCause(); + assertThat(cause) + .hasMessageThat() + .isEqualTo("Fake.method unary operation completed without a response message"); + } + + @Test + public void testTracing() throws Exception { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + BigtableUnaryOperationCallable callable = + new BigtableUnaryOperationCallable<>( + inner, + GrpcCallContext.createDefault(), + tracerFactory, + SpanName.of("Fake", "method"), + false); + ApiFuture f = callable.futureCall("req"); + MockServerStreamingCall call = inner.popLastCall(); + call.getController().getObserver().onResponse("value"); + call.getController().getObserver().onComplete(); + + f.get(); + verify(tracer).responseReceived(); + verify(tracer).operationSucceeded(); + + // afterResponse is the responsibility of BigtableTracerStreamingCallable + verify(tracer, never()).afterResponse(Mockito.anyLong()); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallableTest.java new file mode 100644 index 0000000000..856d732f5c --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallableTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver; +import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall; +import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable; +import com.google.common.base.Functions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TransformingServerStreamingCallableTest { + @Test + public void testReqTransform() { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + TransformingServerStreamingCallable xform = + new TransformingServerStreamingCallable<>(inner, Object::toString, Functions.identity()); + + MockResponseObserver responseObserver = new MockResponseObserver<>(true); + xform.call(37, responseObserver); + + MockServerStreamingCall call = inner.popLastCall(); + assertThat(call.getRequest()).isEqualTo("37"); + } + + @Test + public void testRespTransform() { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + TransformingServerStreamingCallable xform = + new TransformingServerStreamingCallable<>(inner, Functions.identity(), Integer::parseInt); + + MockResponseObserver outerObserver = new MockResponseObserver<>(true); + xform.call("req", outerObserver); + + MockServerStreamingCall call = inner.popLastCall(); + call.getController().getObserver().onResponse("37"); + + assertThat(outerObserver.popNextResponse()).isEqualTo(37); + } + + @Test + public void testError() { + MockServerStreamingCallable inner = new MockServerStreamingCallable<>(); + TransformingServerStreamingCallable xform = + new TransformingServerStreamingCallable<>( + inner, Functions.identity(), Functions.identity()); + + MockResponseObserver outerObserver = new MockResponseObserver<>(true); + xform.call("req", outerObserver); + + MockServerStreamingCall call = inner.popLastCall(); + RuntimeException e = new RuntimeException("fake error"); + call.getController().getObserver().onError(e); + + assertThat(outerObserver.getFinalError()).isEqualTo(e); + } +}