Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Nov 25, 2024
1 parent 1c895f5 commit a93bae5
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerHarness;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter;
import org.apache.beam.runners.dataflow.worker.streaming.harness.ThrottledTimeTracker;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient;
Expand Down Expand Up @@ -347,14 +348,13 @@ private StreamingDataflowWorker(
.setComputationStateFetcher(this.computationStateCache::get)
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime)
.setThrottleTimeTracker(windmillServer::getAndResetThrottleTime)
.setGetWorkSender(getWorkSender)
.build();
}

this.workerStatusReporter =
streamingWorkerStatusReporterFactory.createStatusReporter(
streamingWorkerHarness::getAndResetThrottleTime);
streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness);
this.activeWorkRefresher =
new ActiveWorkRefresher(
clock,
Expand Down Expand Up @@ -559,41 +559,41 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
.setWindmillServer(
GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient))
.build();
} else {
// Build with local Windmill client.
if (options.getWindmillServiceEndpoint() != null
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
GrpcWindmillStreamFactory windmillStreamFactory =
windmillStreamFactoryBuilder
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
GrpcWindmillServer windmillServer =
GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient);
ComputationConfig.Fetcher configFetcher =
createApplianceComputationConfigFetcher(windmillServer);
return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
.setWindmillDispatcherClient(dispatcherClient)
.setWindmillServer(windmillServer)
.setWindmillStreamFactory(windmillStreamFactory)
.setConfigFetcher(configFetcher)
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
.build();
}
}

WindmillServerStub windmillServer =
new JniWindmillApplianceServer(options.getLocalWindmillHostport());
// Build with local Windmill client.
if (options.getWindmillServiceEndpoint() != null
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
GrpcWindmillStreamFactory windmillStreamFactory =
windmillStreamFactoryBuilder
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
GrpcWindmillServer windmillServer =
GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient);
ComputationConfig.Fetcher configFetcher =
createApplianceComputationConfigFetcher(windmillServer);
return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
.setWindmillStreamFactory(windmillStreamFactoryBuilder.build())
.setWindmillDispatcherClient(dispatcherClient)
.setWindmillServer(windmillServer)
.setWindmillStreamFactory(windmillStreamFactory)
.setConfigFetcher(configFetcher)
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
.build();
}

WindmillServerStub windmillServer =
new JniWindmillApplianceServer(options.getLocalWindmillHostport());
ComputationConfig.Fetcher configFetcher =
createApplianceComputationConfigFetcher(windmillServer);
return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
.setWindmillStreamFactory(windmillStreamFactoryBuilder.build())
.setWindmillServer(windmillServer)
.setConfigFetcher(configFetcher)
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
.build();
}

private static StreamingApplianceComputationConfigFetcher createApplianceComputationConfigFetcher(
Expand Down Expand Up @@ -948,7 +948,7 @@ private void onCompleteCommit(CompleteCommit completeCommit) {

@FunctionalInterface
private interface StreamingWorkerStatusReporterFactory {
StreamingWorkerStatusReporter createStatusReporter(Supplier<Long> throttleTimeSupplier);
StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker throttledTimeTracker);
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
private final Function<String, Optional<ComputationState>> computationStateFetcher;
private final ExecutorService workProviderExecutor;
private final GetWorkSender getWorkSender;
private final Supplier<Long> throttleTimeSupplier;
private final ThrottledTimeTracker throttleTimeTracker;

SingleSourceWorkerHarness(
WorkCommitter workCommitter,
Expand All @@ -76,7 +76,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
Runnable waitForResources,
Function<String, Optional<ComputationState>> computationStateFetcher,
GetWorkSender getWorkSender,
Supplier<Long> throttleTimeSupplier) {
ThrottledTimeTracker throttleTimeTracker) {
this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
this.heartbeatSender = heartbeatSender;
Expand All @@ -92,7 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
.build());
this.isRunning = new AtomicBoolean(false);
this.getWorkSender = getWorkSender;
this.throttleTimeSupplier = throttleTimeSupplier;
this.throttleTimeTracker = throttleTimeTracker;
}

public static SingleSourceWorkerHarness.Builder builder() {
Expand Down Expand Up @@ -149,7 +149,7 @@ public void shutdown() {

@Override
public long getAndResetThrottleTime() {
return throttleTimeSupplier.get();
return throttleTimeTracker.getAndResetThrottleTime();
}

private void streamingEngineDispatchLoop(
Expand Down Expand Up @@ -262,7 +262,7 @@ Builder setComputationStateFetcher(

Builder setGetWorkSender(GetWorkSender getWorkSender);

Builder setThrottleTimeSupplier(Supplier<Long> throttleTimeSupplier);
Builder setThrottleTimeTracker(ThrottledTimeTracker throttleTimeTracker);

SingleSourceWorkerHarness build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@

/** Provides an interface to start streaming worker processing. */
@Internal
public interface StreamingWorkerHarness {
public interface StreamingWorkerHarness extends ThrottledTimeTracker {
void start();

void shutdown();

long getAndResetThrottleTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final class StreamingWorkerStatusReporter {
private final int initialMaxThreadCount;
private final int initialMaxBundlesOutstanding;
private final WorkUnitClient dataflowServiceClient;
private final Supplier<Long> windmillQuotaThrottleTime;
private final ThrottledTimeTracker windmillQuotaThrottleTime;
private final Supplier<Collection<StageInfo>> allStageInfo;
private final FailureTracker failureTracker;
private final StreamingCounters streamingCounters;
Expand All @@ -101,7 +101,7 @@ public final class StreamingWorkerStatusReporter {
StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Supplier<Long> windmillQuotaThrottleTime,
ThrottledTimeTracker windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
Expand Down Expand Up @@ -253,7 +253,9 @@ private void reportHarnessStartup() {
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
// Throttle time is tracked by the windmillServer but is reported to DFE here.
streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get());
streamingCounters
.windmillQuotaThrottling()
.addValue(windmillQuotaThrottleTime.getAndResetThrottleTime());
if (memoryMonitor.isThrashing()) {
streamingCounters.memoryThrashing().addValue(1);
}
Expand Down Expand Up @@ -460,7 +462,7 @@ public interface Builder {

Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient);

Builder setWindmillQuotaThrottleTime(Supplier<Long> windmillQuotaThrottleTime);
Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottleTime);

Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming.harness;

import org.apache.beam.sdk.annotations.Internal;

/**
* Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned
* from gRPC calls.
*
* <p>This is a functional interface whose single abstract method is {@link
* #getAndResetThrottleTime()}, so it can be implemented with a lambda or a method reference.
*/
@Internal
@FunctionalInterface
public interface ThrottledTimeTracker {

/**
* Returns the combined total of all throttle times and resets those times to 0.
*
* <p>NOTE(review): the time unit of the returned value is not stated here; confirm it against the
* implementations before converting or aggregating the result.
*
* @return the combined total of all tracked throttle times
*/
long getAndResetThrottleTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private SingleSourceWorkerHarness createWorkerHarness(
.setStreamingWorkScheduler(streamingWorkScheduler)
.setComputationStateFetcher(computationStateFetcher)
// no-op throttle time tracker.
.setThrottleTimeSupplier(() -> 0L)
.setThrottleTimeTracker(() -> 0L)
.setGetWorkSender(getWorkSender)
.build();
}
Expand Down

0 comments on commit a93bae5

Please sign in to comment.