Skip to content

Commit

Permalink
fix: a rare race condition in the row merger
Browse files Browse the repository at this point in the history
This would manifest as a hang when iterating over a ServerStream from ReadRows

Change-Id: I74533c6714b40a68ec0ef81dadac747e10bee39d
  • Loading branch information
igorbernstein2 committed Sep 28, 2023
1 parent 15cd486 commit 8f16146
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private void deliverUnsafe() {
// Optimization: the inner loop will eager process any accumulated state, so reset the lock
// for just this iteration. (If another event occurs during processing, it can increment the
// lock to enqueue another iteration).
lock.lazySet(1);
lock.set(1);

// Process the upstream message if one exists.
pollUpstream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package com.google.cloud.bigtable.gaxx.reframing;

import static com.google.common.truth.Truth.assertWithMessage;

import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable;
Expand All @@ -27,9 +30,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -431,6 +438,119 @@ public String pop() {
Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError);
}

/** Test race between a request() and onComplete (b/295866356).
* This will stress the concurrency primitives in deliver() by running a many iterations across many threads.
* Some race conditions are very subtle and are very rare, so bugs in the implementation would present themselves as
* flakes in this test. All flakes of this test should be investigated as a failure.
*/
@Test
public void testRequestAndCompleteRaceCondition() throws Throwable {
int concurrency = 20;
int iterations = 20_000;

ExecutorService executor = Executors.newFixedThreadPool(concurrency);

List<Future<?>> results = new ArrayList<>();

for (int i = 0; i < concurrency; i++) {
Future<?> result =
executor.submit(
(Callable<Void>)
() -> {
for (int j = 0; j < iterations; j++) {
requestAndCompleteRaceConditionIteration();
}
return null;
});
results.add(result);
}

executor.shutdown();

for (Future<?> result : results) {
try {
result.get();
} catch (ExecutionException e) {
throw e.getCause();
}
}
}

private static void requestAndCompleteRaceConditionIteration()
throws InterruptedException, ExecutionException {
MockStreamingApi.MockResponseObserver<String> observer =
new MockStreamingApi.MockResponseObserver<>(false);
ReframingResponseObserver<String, String> underTest =
new ReframingResponseObserver<>(
observer, new ReframingResponseObserverTest.DasherizingReframer(1));

// This is intentionally not a Phaser, the Phaser seems to drastically reduce the reproduction
// rate of the
// original race condition.
CountDownLatch readySignal = new CountDownLatch(2);
CompletableFuture<Void> startSignal = new CompletableFuture<>();

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<Void> f1 =
executor.submit(
() -> {
// no setup, tell controller thread we are ready and wait for the start signal
readySignal.countDown();
startSignal.get();

// Race start
underTest.onComplete();
// Race end

return null;
});

Future<Void> f2 =
executor.submit(
() -> {
// Setup before race - simulate that the ServerStream iterator got one row and is now
// checking if there
// is another. This is the lead up to the race with grpc's onComplete
underTest.onStart(
new StreamController() {
@Override
public void cancel() {}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void request(int count) {}
});
observer.getController().request(1);
underTest.onResponse("moo");

// Setup complete, tell controller thread we are ready and wait for the start signal
readySignal.countDown();
startSignal.get();

// Race start
observer.getController().request(1);
// Race end

return null;
});
executor.shutdown();

// Wait for worker setup
readySignal.await();
// Tell workers to race
startSignal.complete(null);

// Wait workers to finish
f1.get();
f2.get();

// the outer observer should be told of the completion of rpc
assertWithMessage("outer observer should not hang").that(observer.isDone()).isTrue();
}

/**
* A simple implementation of a {@link Reframer}. The input string is split by dash, and the
* output is concatenated by dashes. The test can verify M:N behavior by adjusting the
Expand Down

0 comments on commit 8f16146

Please sign in to comment.