Skip to content

Commit

Permalink
Address CL comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Nov 30, 2023
1 parent b6362e4 commit 2d82662
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public final class GrpcDirectGetWorkStream
.build())
.build();

private final AtomicReference<GetWorkBudget> inFlightBudget;
private final AtomicReference<GetWorkBudget> nextBudgetAdjustment;
private final AtomicReference<GetWorkBudget> pendingResponseBudget;
private final GetWorkRequest request;
private final WorkItemProcessor workItemProcessorFn;
private final ThrottleTimer getWorkThrottleTimer;
Expand All @@ -84,10 +87,6 @@ public final class GrpcDirectGetWorkStream
*/
private final ConcurrentMap<Long, WorkItemBuffer> workItemBuffers;

private final AtomicReference<GetWorkBudget> inflightBudget;
private final AtomicReference<GetWorkBudget> nextBudgetAdjustment;
private final AtomicReference<GetWorkBudget> pendingResponseBudget;

private GrpcDirectGetWorkStream(
Function<
StreamObserver<StreamingGetWorkResponseChunk>,
Expand All @@ -112,8 +111,7 @@ private GrpcDirectGetWorkStream(
// stream.
this.getDataStream = Suppliers.memoize(getDataStream::get);
this.commitWorkStream = Suppliers.memoize(commitWorkStream::get);

this.inflightBudget = new AtomicReference<>(GetWorkBudget.noBudget());
this.inFlightBudget = new AtomicReference<>(GetWorkBudget.noBudget());
this.nextBudgetAdjustment = new AtomicReference<>(GetWorkBudget.noBudget());
this.pendingResponseBudget = new AtomicReference<>(GetWorkBudget.noBudget());
}
Expand Down Expand Up @@ -178,10 +176,11 @@ private void sendRequestExtension() {
@Override
protected synchronized void onNewStream() {
workItemBuffers.clear();
// Add the current inflight budget to the next adjustment. Only positive values are allowed here
// Add the current in-flight budget to the next adjustment. Only positive values are allowed
// here
// with negatives defaulting to 0, since GetWorkBudgets cannot be created with negative values.
GetWorkBudget budgetAdjustment = nextBudgetAdjustment.get().apply(inflightBudget.get());
inflightBudget.set(budgetAdjustment);
GetWorkBudget budgetAdjustment = nextBudgetAdjustment.get().apply(inFlightBudget.get());
inFlightBudget.set(budgetAdjustment);
send(
StreamingGetWorkRequest.newBuilder()
.setRequest(
Expand All @@ -205,7 +204,7 @@ public void appendSpecificHtml(PrintWriter writer) {
// Number of buffers is same as distinct workers that sent work on this stream.
writer.format(
"GetWorkStream: %d buffers, %s inflight budget allowed.",
workItemBuffers.size(), inflightBudget.get());
workItemBuffers.size(), inFlightBudget.get());
}

@Override
Expand All @@ -224,7 +223,7 @@ protected void onResponse(StreamingGetWorkResponseChunk chunk) {
if (chunk.getRemainingBytesForWorkItem() == 0) {
workItemBuffer.runAndReset();
// Record the fact that there are now fewer outstanding messages and bytes on the stream.
inflightBudget.updateAndGet(budget -> budget.subtract(1, workItemBuffer.bufferedSize()));
inFlightBudget.updateAndGet(budget -> budget.subtract(1, workItemBuffer.bufferedSize()));
}
}

Expand All @@ -244,18 +243,11 @@ public GetWorkBudget remainingBudget() {
// Snapshot the current budgets.
GetWorkBudget currentPendingResponseBudget = pendingResponseBudget.get();
GetWorkBudget currentNextBudgetAdjustment = nextBudgetAdjustment.get();
GetWorkBudget currentInflightBudget = inflightBudget.get();
GetWorkBudget currentInflightBudget = inFlightBudget.get();

return GetWorkBudget.builder()
.setItems(
currentNextBudgetAdjustment.items()
+ currentPendingResponseBudget.items()
+ currentInflightBudget.items())
.setBytes(
currentNextBudgetAdjustment.bytes()
+ currentPendingResponseBudget.bytes()
+ currentInflightBudget.bytes())
.build();
return currentPendingResponseBudget
.apply(currentNextBudgetAdjustment)
.apply(currentInflightBudget);
}

private synchronized void updatePendingResponseBudget(long itemsDelta, long bytesDelta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ public void testStreamsStartCorrectly() throws InterruptedException {
String workerToken = "workerToken1";
String workerToken2 = "workerToken2";

Thread streamingEngineClientThread = new Thread(this::waitForFirstWorkerMetadata);
WorkerMetadataResponse firstWorkerMetadata =
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(1)
Expand All @@ -205,7 +204,6 @@ public void testStreamsStartCorrectly() throws InterruptedException {
.putAllGlobalDataEndpoints(DEFAULT)
.build();

streamingEngineClientThread.start();
getWorkerMetadataReady.await();
fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
StreamingEngineConnectionState currentConnections = waitForWorkerMetadataToBeConsumed(1);
Expand Down Expand Up @@ -240,9 +238,6 @@ public void testScheduledBudgetRefresh() throws InterruptedException {
newStreamingEngineClient(
GetWorkBudget.builder().setItems(1L).setBytes(1L).build(), noOpProcessWorkItemFn());

Thread streamingEngineClientThread = new Thread(this::waitForFirstWorkerMetadata);

streamingEngineClientThread.start();
getWorkerMetadataReady.await();
fakeGetWorkerMetadataStub.injectWorkerMetadata(
WorkerMetadataResponse.newBuilder()
Expand All @@ -266,7 +261,6 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
String workerToken2 = "workerToken2";
String workerToken3 = "workerToken3";

Thread streamingEngineClientThread = new Thread(this::waitForFirstWorkerMetadata);
WorkerMetadataResponse firstWorkerMetadata =
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(1)
Expand All @@ -284,7 +278,6 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
.putAllGlobalDataEndpoints(DEFAULT)
.build();

streamingEngineClientThread.start();
getWorkerMetadataReady.await();
fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata);
Expand Down Expand Up @@ -314,7 +307,6 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
String workerToken2 = "workerToken2";
String workerToken3 = "workerToken3";

Thread streamingEngineClientThread = new Thread(this::waitForFirstWorkerMetadata);
WorkerMetadataResponse firstWorkerMetadata =
WorkerMetadataResponse.newBuilder()
.setMetadataVersion(1)
Expand All @@ -337,7 +329,6 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
.putAllGlobalDataEndpoints(DEFAULT)
.build();

streamingEngineClientThread.start();
getWorkerMetadataReady.await();
fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
Thread.sleep(50);
Expand Down Expand Up @@ -366,10 +357,6 @@ private StreamingEngineConnectionState waitForWorkerMetadataToBeConsumed(
return connections.get();
}

private void waitForFirstWorkerMetadata() {
while (!Preconditions.checkNotNull(streamingEngineClient).isWorkerMetadataReady()) {}
}

private static class GetWorkerMetadataTestStub
extends CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase {
private static final WorkerMetadataResponse CLOSE_ALL_STREAMS =
Expand Down

0 comments on commit 2d82662

Please sign in to comment.