From 8f161464573f7883d793789bab946ba04aff57e9 Mon Sep 17 00:00:00 2001
From: Igor Berntein
Date: Thu, 28 Sep 2023 16:45:08 -0400
Subject: [PATCH] fix: a rare race condition in the row merger

This would manifest as a hang when iterating over a ServerStream from ReadRows

Change-Id: I74533c6714b40a68ec0ef81dadac747e10bee39d
---
 .../reframing/ReframingResponseObserver.java  |   2 +-
 .../ReframingResponseObserverTest.java        | 120 ++++++++++++++++++
 2 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java
index 6f2440fff7..3a5458836f 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserver.java
@@ -277,7 +277,7 @@ private void deliverUnsafe() {
       // Optimization: the inner loop will eager process any accumulated state, so reset the lock
       // for just this iteration. (If another event occurs during processing, it can increment the
       // lock to enqueue another iteration).
-      lock.lazySet(1);
+      lock.set(1);
 
       // Process the upstream message if one exists.
       pollUpstream();
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java
index 426c27f5a3..131c1bf970 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/reframing/ReframingResponseObserverTest.java
@@ -15,9 +15,12 @@
  */
 package com.google.cloud.bigtable.gaxx.reframing;
 
+import static com.google.common.truth.Truth.assertWithMessage;
+
 import com.google.api.gax.rpc.StreamController;
 import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
 import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash;
+import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi;
 import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver;
 import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall;
 import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable;
@@ -27,9 +30,13 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Queues;
 import com.google.common.truth.Truth;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -431,6 +438,119 @@ public String pop() {
     Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError);
   }
 
+  /** Test race between a request() and onComplete (b/295866356).
+   * This will stress the concurrency primitives in deliver() by running many iterations across many threads.
+   * Some race conditions are very subtle and are very rare, so bugs in the implementation would present themselves as
+   * flakes in this test. All flakes of this test should be investigated as a failure.
+   */
+  @Test
+  public void testRequestAndCompleteRaceCondition() throws Throwable {
+    int concurrency = 20;
+    int iterations = 20_000;
+
+    ExecutorService executor = Executors.newFixedThreadPool(concurrency);
+
+    List<Future<?>> results = new ArrayList<>();
+
+    for (int i = 0; i < concurrency; i++) {
+      Future<?> result =
+          executor.submit(
+              (Callable<Void>)
+                  () -> {
+                    for (int j = 0; j < iterations; j++) {
+                      requestAndCompleteRaceConditionIteration();
+                    }
+                    return null;
+                  });
+      results.add(result);
+    }
+
+    executor.shutdown();
+
+    for (Future<?> result : results) {
+      try {
+        result.get();
+      } catch (ExecutionException e) {
+        throw e.getCause();
+      }
+    }
+  }
+
+  private static void requestAndCompleteRaceConditionIteration()
+      throws InterruptedException, ExecutionException {
+    MockStreamingApi.MockResponseObserver<String> observer =
+        new MockStreamingApi.MockResponseObserver<>(false);
+    ReframingResponseObserver<String, String> underTest =
+        new ReframingResponseObserver<>(
+            observer, new ReframingResponseObserverTest.DasherizingReframer(1));
+
+    // This is intentionally not a Phaser, the Phaser seems to drastically reduce the reproduction
+    // rate of the
+    // original race condition.
+    CountDownLatch readySignal = new CountDownLatch(2);
+    CompletableFuture<Void> startSignal = new CompletableFuture<>();
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    Future<Void> f1 =
+        executor.submit(
+            () -> {
+              // no setup, tell controller thread we are ready and wait for the start signal
+              readySignal.countDown();
+              startSignal.get();
+
+              // Race start
+              underTest.onComplete();
+              // Race end
+
+              return null;
+            });
+
+    Future<Void> f2 =
+        executor.submit(
+            () -> {
+              // Setup before race - simulate that the ServerStream iterator got one row and is now
+              // checking if there
+              // is another. This is the lead up to the race with grpc's onComplete
+              underTest.onStart(
+                  new StreamController() {
+                    @Override
+                    public void cancel() {}
+
+                    @Override
+                    public void disableAutoInboundFlowControl() {}
+
+                    @Override
+                    public void request(int count) {}
+                  });
+              observer.getController().request(1);
+              underTest.onResponse("moo");
+
+              // Setup complete, tell controller thread we are ready and wait for the start signal
+              readySignal.countDown();
+              startSignal.get();
+
+              // Race start
+              observer.getController().request(1);
+              // Race end
+
+              return null;
+            });
+    executor.shutdown();
+
+    // Wait for worker setup
+    readySignal.await();
+    // Tell workers to race
+    startSignal.complete(null);
+
+    // Wait for workers to finish
+    f1.get();
+    f2.get();
+
+    // the outer observer should be told of the completion of rpc
+    assertWithMessage("outer observer should not hang").that(observer.isDone()).isTrue();
+  }
+
   /**
    * A simple implementation of a {@link Reframer}. The input string is split by dash, and the
    * output is concatenated by dashes. The test can verify M:N behavior by adjusting the