Skip to content

Commit

Permalink
There have been reports of batcher.close() hanging every once in awhi…
Browse files Browse the repository at this point in the history
…le. Currently it is impossible to debug because we dont expose any internal state to analyze.

This PR adds 2 additional methods that should help in diagnosing issues:
1. close(timeout) will try to close the batcher, but if any of the underlying batch operations fail, the exception message will contain a wealth of information describing the underlying state of operations as provided by googleapis#3140
2. cancelOutstanding this allows for remediation for close(timeout) throwing an exception.

The intended usecase is dataflow connector's FinishBundle:

try {
  batcher.close(Duration.ofMinutes(1));
} catch(BatchingException e) {
   batcher.cancelOutstanding();
  batcher.close(Duration.ofMinutes(1));
}
  • Loading branch information
igorbernstein2 committed Aug 29, 2024
1 parent 979ac86 commit c889736
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiCallContext;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand Down Expand Up @@ -78,12 +80,26 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
void sendOutstanding();

/**
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
* elements.
* Cancels all outstanding batch RPCs.
*/
void cancelOutstanding();

/**
* Closes this Batcher by preventing new elements from being added, then flushing the existing
* elements and waiting for all the outstanding work to be resolved.
*/
@Override
void close() throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added, then flushing the existing
* elements and waiting for all the outstanding work to be resolved. If all of the outstanding
* work has not been resolved, then a {@link BatchingException} will be thrown with details of the
* remaining work. The batcher will remain in a closed state and will not allow additional
* elements to be added.
*/
void close(@Nullable Duration timeout) throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
* elements. The returned future will be resolved when the last element completes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
Expand All @@ -51,16 +52,20 @@
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
Expand All @@ -86,7 +91,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final BatcherReference currentBatcherReference;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final ConcurrentMap<Batch<ElementT, ElementResultT, RequestT, ResponseT>, Boolean> outstandingBatches = new ConcurrentHashMap<>();
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
Expand Down Expand Up @@ -297,8 +302,10 @@ public void sendOutstanding() {
} catch (Exception ex) {
batchResponse = ApiFutures.immediateFailedFuture(ex);
}
accumulatedBatch.setOperation(batchResponse);

outstandingBatches.put(accumulatedBatch, Boolean.TRUE);

numOfOutstandingBatches.incrementAndGet();
ApiFutures.addCallback(
batchResponse,
new ApiFutureCallback<ResponseT>() {
Expand All @@ -310,7 +317,7 @@ public void onSuccess(ResponseT response) {
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
onBatchCompletion(accumulatedBatch);
}
}

Expand All @@ -322,18 +329,19 @@ public void onFailure(Throwable throwable) {
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
onBatchCompletion(accumulatedBatch);
}
}
},
directExecutor());
}

private void onBatchCompletion() {
private void onBatchCompletion(Batch<ElementT, ElementResultT, RequestT, ResponseT> batch) {
boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
outstandingBatches.remove(batch);
if (outstandingBatches.isEmpty()) {
flushLock.notifyAll();
shouldClose = closeFuture != null;
}
Expand All @@ -349,22 +357,37 @@ private void onBatchCompletion() {
}

private void awaitAllOutstandingBatches() throws InterruptedException {
while (numOfOutstandingBatches.get() > 0) {
while (!outstandingBatches.isEmpty()) {
synchronized (flushLock) {
// Check again under lock to avoid racing with onBatchCompletion
if (numOfOutstandingBatches.get() == 0) {
if (outstandingBatches.isEmpty()) {
break;
}
flushLock.wait();
}
}
}

@Override
public void cancelOutstanding() {
for (Batch<?,?,?,?> batch : outstandingBatches.keySet()) {
batch.cancel();
}
}
/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
close(null);
}

@Override
public void close(@Nullable Duration timeout) throws InterruptedException {
try {
closeAsync().get();
if (timeout != null) {
closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} else {
closeAsync().get();
}
} catch (ExecutionException e) {
// Original stacktrace of a batching exception is not useful, so rethrow the error with
// the caller stacktrace
Expand All @@ -374,6 +397,16 @@ public void close() throws InterruptedException {
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
}
} catch (TimeoutException e) {
StringJoiner batchesStr = new StringJoiner(",");
for (Batch<ElementT, ElementResultT, RequestT, ResponseT> batch : outstandingBatches.keySet()) {
batchesStr.add(batch.toString());
}
String msg = "Timed out trying to close batcher after " + timeout + ".";
msg += " Batch request prototype: " + prototype + ".";
msg += " Outstanding batches: " + batchesStr;

throw new BatchingException(msg);
}
}

Expand All @@ -393,7 +426,7 @@ public ApiFuture<Void> closeAsync() {
// prevent admission of new elements
closeFuture = SettableApiFuture.create();
// check if we can close immediately
closeImmediately = numOfOutstandingBatches.get() == 0;
closeImmediately = outstandingBatches.isEmpty();
}

// Clean up accounting
Expand Down Expand Up @@ -435,6 +468,8 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private long totalThrottledTimeMs = 0;
private BatchResource resource;

private ApiFuture<ResponseT> operation;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
Expand All @@ -457,6 +492,18 @@ void add(
totalThrottledTimeMs += throttledTimeMs;
}

void setOperation(@Nonnull ApiFuture<ResponseT> operation) {
Preconditions.checkNotNull(operation);
this.operation = operation;
}

void cancel() {
if (this.operation != null) {
this.operation.cancel(true);
}
}


void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, entries);
Expand All @@ -480,6 +527,22 @@ void onBatchFailure(Throwable throwable) {
boolean isEmpty() {
return resource.getElementCount() == 0;
}

@Override
public String toString() {
StringJoiner elementsStr = new StringJoiner(",");
for (BatchEntry<ElementT, ElementResultT> entry : entries) {
elementsStr.add(
Optional.ofNullable(entry.getElement())
.map(Object::toString)
.orElse("null")
);
}
return MoreObjects.toStringHelper(this)
.add("operation", operation)
.add("elements", elementsStr)
.toString();
}
}

/**
Expand Down

0 comments on commit c889736

Please sign in to comment.