Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Oct 15, 2024
1 parent b866d42 commit 496d1ed
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
private final ThrottlingGetDataMetricTracker getDataMetricTracker;
private final ExecutorService windmillStreamManager;
private final ExecutorService workerMetadataConsumer;
private final Object metadataLock;
private final Object metadataLock = new Object();

/** Writes are guarded by synchronization, reads are lock free. */
private final AtomicReference<StreamingEngineBackends> backends;
Expand Down Expand Up @@ -142,7 +142,6 @@ private FanOutStreamingEngineWorkerHarness(
this.totalGetWorkBudget = totalGetWorkBudget;
this.activeMetadataVersion = Long.MIN_VALUE;
this.workCommitterFactory = workCommitterFactory;
this.metadataLock = new Object();
this.getWorkerMetadataStream = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;

@Internal
@ThreadSafe
Expand All @@ -34,7 +35,8 @@ final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream>
private volatile boolean started;

GlobalDataStreamSender(Supplier<GetDataStream> delegate, Endpoint endpoint) {
this.delegate = delegate;
// Ensures that the Supplier is thread-safe
this.delegate = Suppliers.memoize(delegate::get);
this.started = false;
this.endpoint = endpoint;
}
Expand All @@ -44,12 +46,15 @@ public GetDataStream get() {
if (!started) {
started = true;
}

return delegate.get();
}

@Override
public void close() {
delegate.get().shutdown();
if (started) {
delegate.get().shutdown();
}
}

Endpoint endpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void close() {
public void setBudget(long items, long bytes) {
getWorkBudget.set(getWorkBudget.get().apply(items, bytes));
if (started.get()) {
getWorkStream.get().adjustBudget(items, bytes);
getWorkStream.get().setBudget(items, bytes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ public interface WindmillStream {
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
void adjustBudget(long itemsDelta, long bytesDelta);
void setBudget(long newItems, long newBytes);

/** Returns the remaining in-flight {@link GetWorkBudget}. */
GetWorkBudget remainingBudget();
default void setBudget(GetWorkBudget newBudget) {
setBudget(newBudget.items(), newBudget.bytes());
}
}

/** Interface for streaming GetDataRequests to Windmill. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private static Watermarks createWatermarks(
* which can deadlock since we send on the stream beneath the synchronization. {@link
* AbstractWindmillStream#send(Object)} is synchronized so the sends are already guarded.
*/
private void sendRequestExtension(GetWorkBudget extension) {
private void maybeSendRequestExtension(GetWorkBudget extension) {
if (extension.items() > 0 || extension.bytes() > 0) {
executeSafely(
() -> {
Expand Down Expand Up @@ -203,7 +203,8 @@ protected synchronized void onNewStream() {
StreamingGetWorkRequest request =
StreamingGetWorkRequest.newBuilder()
.setRequest(
requestHeader.toBuilder()
requestHeader
.toBuilder()
.setMaxItems(initialGetWorkBudget.items())
.setMaxBytes(initialGetWorkBudget.bytes())
.build())
Expand Down Expand Up @@ -262,7 +263,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
assembledWorkItem.latencyAttributions());
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
GetWorkBudget extension = budgetTracker.computeBudgetExtension(maxGetWorkBudget.get());
sendRequestExtension(extension);
maybeSendRequestExtension(extension);
}

private Work.ProcessingContext createProcessingContext(String computationId) {
Expand All @@ -276,17 +277,12 @@ protected void startThrottleTimer() {
}

@Override
public void adjustBudget(long newItems, long newBytes) {
public void setBudget(long newItems, long newBytes) {
GetWorkBudget currentMaxGetWorkBudget =
maxGetWorkBudget.updateAndGet(
ignored -> GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
GetWorkBudget extension = budgetTracker.computeBudgetExtension(currentMaxGetWorkBudget);
sendRequestExtension(extension);
}

@Override
public GetWorkBudget remainingBudget() {
return maxGetWorkBudget.get().subtract(budgetTracker.inFlightBudget());
maybeSendRequestExtension(extension);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;

Expand Down Expand Up @@ -194,15 +193,7 @@ protected void startThrottleTimer() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
public void setBudget(long newItems, long newBytes) {
// no-op
}

@Override
public GetWorkBudget remainingBudget() {
return GetWorkBudget.builder()
.setBytes(request.getMaxBytes() - inflightBytes.get())
.setItems(request.getMaxItems() - inflightMessages.get())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -245,18 +244,10 @@ public void halfClose() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
public void setBudget(long newItems, long newBytes) {
// no-op.
}

@Override
public GetWorkBudget remainingBudget() {
return GetWorkBudget.builder()
.setItems(request.getMaxItems())
.setBytes(request.getMaxBytes())
.build();
}

@Override
public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException {
while (done.getCount() > 0) {
Expand Down

0 comments on commit 496d1ed

Please sign in to comment.