Skip to content

Commit

Permalink
fix: a rare race condition in the row merger (#1939) (#1988)
Browse files Browse the repository at this point in the history
* fix: a rare race condition in the row merger (#1939)

* fix: a rare race condition in the row merger

This would manifest as a hang when iterating over a ServerStream from ReadRows

Change-Id: I74533c6714b40a68ec0ef81dadac747e10bee39d

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Igor Bernstein <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 6, 2023
1 parent 0285425 commit 31b084a
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 5 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.16.0')
implementation platform('com.google.cloud:libraries-bom:26.26.0')
implementation 'com.google.cloud:google-cloud-bigtable'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigtable:2.23.2'
implementation 'com.google.cloud:google-cloud-bigtable:2.29.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.23.2"
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.29.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -609,7 +609,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigtable/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigtable.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.23.2
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.29.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private void deliverUnsafe() {
// Optimization: the inner loop will eager process any accumulated state, so reset the lock
// for just this iteration. (If another event occurs during processing, it can increment the
// lock to enqueue another iteration).
lock.lazySet(1);
lock.set(1);

// Process the upstream message if one exists.
pollUpstream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package com.google.cloud.bigtable.gaxx.reframing;

import static com.google.common.truth.Truth.assertWithMessage;

import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockResponseObserver;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable;
Expand All @@ -27,9 +30,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -431,6 +438,120 @@ public String pop() {
Truth.assertThat(finalError.getSuppressed()[0].getCause()).isSameInstanceAs(fakeCancelError);
}

/**
* Test race between a request() and onComplete (b/295866356). This will stress the concurrency
* primitives in deliver() by running a many iterations across many threads. Some race conditions
* are very subtle and are very rare, so bugs in the implementation would present themselves as
* flakes in this test. All flakes of this test should be investigated as a failure.
*/
@Test
public void testRequestAndCompleteRaceCondition() throws Throwable {
int concurrency = 20;
int iterations = 20_000;

ExecutorService executor = Executors.newFixedThreadPool(concurrency);

List<Future<?>> results = new ArrayList<>();

for (int i = 0; i < concurrency; i++) {
Future<?> result =
executor.submit(
(Callable<Void>)
() -> {
for (int j = 0; j < iterations; j++) {
requestAndCompleteRaceConditionIteration();
}
return null;
});
results.add(result);
}

executor.shutdown();

for (Future<?> result : results) {
try {
result.get();
} catch (ExecutionException e) {
throw e.getCause();
}
}
}

private static void requestAndCompleteRaceConditionIteration()
throws InterruptedException, ExecutionException {
MockStreamingApi.MockResponseObserver<String> observer =
new MockStreamingApi.MockResponseObserver<>(false);
ReframingResponseObserver<String, String> underTest =
new ReframingResponseObserver<>(
observer, new ReframingResponseObserverTest.DasherizingReframer(1));

// This is intentionally not a Phaser, the Phaser seems to drastically reduce the reproduction
// rate of the
// original race condition.
CountDownLatch readySignal = new CountDownLatch(2);
CompletableFuture<Void> startSignal = new CompletableFuture<>();

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<Void> f1 =
executor.submit(
() -> {
// no setup, tell controller thread we are ready and wait for the start signal
readySignal.countDown();
startSignal.get();

// Race start
underTest.onComplete();
// Race end

return null;
});

Future<Void> f2 =
executor.submit(
() -> {
// Setup before race - simulate that the ServerStream iterator got one row and is now
// checking if there
// is another. This is the lead up to the race with grpc's onComplete
underTest.onStart(
new StreamController() {
@Override
public void cancel() {}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void request(int count) {}
});
observer.getController().request(1);
underTest.onResponse("moo");

// Setup complete, tell controller thread we are ready and wait for the start signal
readySignal.countDown();
startSignal.get();

// Race start
observer.getController().request(1);
// Race end

return null;
});
executor.shutdown();

// Wait for worker setup
readySignal.await();
// Tell workers to race
startSignal.complete(null);

// Wait workers to finish
f1.get();
f2.get();

// the outer observer should be told of the completion of rpc
assertWithMessage("outer observer should not hang").that(observer.isDone()).isTrue();
}

/**
* A simple implementation of a {@link Reframer}. The input string is split by dash, and the
* output is concatenated by dashes. The test can verify M:N behavior by adjusting the
Expand Down

0 comments on commit 31b084a

Please sign in to comment.