From 6ec1fb23ece81721806bdc1323ffb23fa7ce55a0 Mon Sep 17 00:00:00 2001 From: martin trieu Date: Fri, 28 Jun 2024 02:31:11 -0700 Subject: [PATCH] move heartbeat processor to where it is being used (#31298) --- .../worker/StreamingDataflowWorker.java | 172 +++++++++++------- .../client/grpc/GrpcWindmillServer.java | 53 +++--- .../grpc/GrpcWindmillStreamFactory.java | 101 ++++++---- .../client/grpc/StreamingEngineClient.java | 22 +-- .../client/grpc/WindmillStreamSender.java | 19 +- .../grpc/StreamingEngineClientTest.java | 5 +- .../client/grpc/WindmillStreamSenderTest.java | 14 +- .../EvenGetWorkBudgetDistributorTest.java | 3 +- 8 files changed, 210 insertions(+), 179 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 98015e2ea715..fc1be2cd1372 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -22,6 +22,7 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; +import com.google.auto.value.AutoValue; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -38,7 +39,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.metrics.MetricsLogger; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; @@ -103,7 +103,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; -import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -121,11 +120,6 @@ public class StreamingDataflowWorker { MetricName.named( "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl", "throttling-msecs"); - // Maximum number of threads for processing. Currently each thread processes one key at a time. - static final int MAX_PROCESSING_THREADS = 300; - static final long THREAD_EXPIRATION_TIME_SEC = 60; - static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3; - static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); /** * Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the amount of data sinked @@ -135,13 +129,20 @@ public class StreamingDataflowWorker { */ public static final int MAX_SINK_BYTES = 10_000_000; + // Maximum number of threads for processing. Currently, each thread processes one key at a time. + static final int MAX_PROCESSING_THREADS = 300; + static final long THREAD_EXPIRATION_TIME_SEC = 60; + static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3; + static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class); + /** The idGenerator to generate unique id globally. */ private static final IdGenerator ID_GENERATOR = IdGenerators.decrementingLongs(); private static final int DEFAULT_STATUS_PORT = 8081; // Maximum size of the result of a GetWork request. private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m + /** Maximum number of failure stacktraces to report in each update sent to backend. */ private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000; @@ -328,39 +329,27 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o threadName -> Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat(threadName).build()); - GrpcWindmillStreamFactory windmillStreamFactory = - createWindmillStreamFactory(options, clientId); - GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options)); - - // If ComputationConfig.Fetcher is the Streaming Appliance implementation, WindmillServerStub - // can be created without a heartbeat response processor, as appliance does not send heartbeats. - Pair> configFetcherAndWindmillClient = - createConfigFetcherAndWindmillClient( - options, - dataflowServiceClient, - dispatcherClient, - maxWorkItemCommitBytes, - windmillStreamFactory); + GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = + createGrpcwindmillStreamFactoryBuilder(options, clientId); + + ConfigFetcherComputationStateCacheAndWindmillClient + configFetcherComputationStateCacheAndWindmillClient = + createConfigFetcherComputationStateCacheAndWindmillClient( + options, + dataflowServiceClient, + maxWorkItemCommitBytes, + windmillStreamFactoryBuilder, + configFetcher -> + ComputationStateCache.create( + configFetcher, + workExecutor, + windmillStateCache::forComputation, + ID_GENERATOR)); ComputationStateCache computationStateCache = - ComputationStateCache.create( - configFetcherAndWindmillClient.getLeft(), - workExecutor, - windmillStateCache::forComputation, - ID_GENERATOR); - - // If WindmillServerStub is not present, it is a Streaming Engine job. We now have all the - // components created to initialize the GrpcWindmillServer. + configFetcherComputationStateCacheAndWindmillClient.computationStateCache(); WindmillServerStub windmillServer = - configFetcherAndWindmillClient - .getRight() - .orElseGet( - () -> - GrpcWindmillServer.create( - options, - windmillStreamFactory, - dispatcherClient, - new WorkHeartbeatResponseProcessor(computationStateCache::get))); + configFetcherComputationStateCacheAndWindmillClient.windmillServer(); FailureTracker failureTracker = options.isEnableStreamingEngine() @@ -393,7 +382,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o return new StreamingDataflowWorker( windmillServer, clientId, - configFetcherAndWindmillClient.getLeft(), + configFetcherComputationStateCacheAndWindmillClient.configFetcher(), computationStateCache, windmillStateCache, workExecutor, @@ -407,20 +396,29 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o streamingCounters, memoryMonitor, maxWorkItemCommitBytes, - windmillStreamFactory, + configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, stageInfo); } - private static Pair> - createConfigFetcherAndWindmillClient( + /** + * {@link ComputationConfig.Fetcher}, {@link ComputationStateCache}, and {@link + * WindmillServerStub} are constructed in different orders due to cyclic dependencies depending on + * the underlying implementation. This method simplifies creating them and returns an object with + * all of these dependencies initialized. + */ + private static ConfigFetcherComputationStateCacheAndWindmillClient + createConfigFetcherComputationStateCacheAndWindmillClient( DataflowWorkerHarnessOptions options, WorkUnitClient dataflowServiceClient, - GrpcDispatcherClient dispatcherClient, AtomicInteger maxWorkItemCommitBytes, - GrpcWindmillStreamFactory windmillStreamFactory) { + GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, + Function computationStateCacheFactory) { ComputationConfig.Fetcher configFetcher; - @Nullable WindmillServerStub windmillServer = null; + WindmillServerStub windmillServer; + ComputationStateCache computationStateCache; + GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options)); + GrpcWindmillStreamFactory windmillStreamFactory; if (options.isEnableStreamingEngine()) { configFetcher = StreamingEngineComputationConfigFetcher.create( @@ -431,13 +429,36 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o config, dispatcherClient::consumeWindmillDispatcherEndpoints, maxWorkItemCommitBytes)); + computationStateCache = computationStateCacheFactory.apply(configFetcher); + windmillStreamFactory = + windmillStreamFactoryBuilder + .setProcessHeartbeatResponses( + new WorkHeartbeatResponseProcessor(computationStateCache::get)) + .setHealthCheckIntervalMillis( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build(); + windmillServer = GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); } else { - windmillServer = - createWindmillServerStub(options, windmillStreamFactory, dispatcherClient, ignored -> {}); + if (options.getWindmillServiceEndpoint() != null + || options.getLocalWindmillHostport().startsWith("grpc:")) { + windmillStreamFactory = + windmillStreamFactoryBuilder + .setHealthCheckIntervalMillis( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build(); + windmillServer = + GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); + } else { + windmillStreamFactory = windmillStreamFactoryBuilder.build(); + windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); + } + configFetcher = new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); + computationStateCache = computationStateCacheFactory.apply(configFetcher); } - return Pair.of(configFetcher, Optional.ofNullable(windmillServer)); + return ConfigFetcherComputationStateCacheAndWindmillClient.create( + configFetcher, computationStateCache, windmillServer, windmillStreamFactory); } @VisibleForTesting @@ -516,6 +537,11 @@ static StreamingDataflowWorker forTesting( options.getWindmillHarnessUpdateReportingPeriod().getMillis(), options.getPerWorkerMetricsUpdateReportingPeriodMillis()); + GrpcWindmillStreamFactory.Builder windmillStreamFactory = + createGrpcwindmillStreamFactoryBuilder(options, 1) + .setProcessHeartbeatResponses( + new WorkHeartbeatResponseProcessor(computationStateCache::get)); + return new StreamingDataflowWorker( windmillServer, 1L, @@ -533,7 +559,12 @@ static StreamingDataflowWorker forTesting( streamingCounters, memoryMonitor, maxWorkItemCommitBytes, - createWindmillStreamFactory(options, 1), + options.isEnableStreamingEngine() + ? windmillStreamFactory + .setHealthCheckIntervalMillis( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build() + : windmillStreamFactory.build(), executorSupplier, stageInfo); } @@ -552,7 +583,7 @@ private static void onPipelineConfig( } } - private static GrpcWindmillStreamFactory createWindmillStreamFactory( + private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder( DataflowWorkerHarnessOptions options, long clientId) { Duration maxBackoff = !options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null @@ -569,7 +600,10 @@ private static GrpcWindmillStreamFactory createWindmillStreamFactory( .setMaxBackOffSupplier(() -> maxBackoff) .setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures()) .setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit()) - .build(); + .setSendKeyedGetDataRequests( + !options.isEnableStreamingEngine() + || !DataflowRunner.hasExperiment( + options, "streaming_engine_send_new_heartbeat_requests")); } private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) { @@ -619,23 +653,6 @@ public static void main(String[] args) throws Exception { worker.start(); } - private static WindmillServerStub createWindmillServerStub( - DataflowWorkerHarnessOptions options, - GrpcWindmillStreamFactory windmillStreamFactory, - GrpcDispatcherClient dispatcherClient, - Consumer> processHeartbeatResponses) { - if (options.getWindmillServiceEndpoint() != null - || options.isEnableStreamingEngine() - || options.getLocalWindmillHostport().startsWith("grpc:")) { - windmillStreamFactory.scheduleHealthChecks( - options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()); - return GrpcWindmillServer.create( - options, windmillStreamFactory, dispatcherClient, processHeartbeatResponses); - } else { - return new JniWindmillApplianceServer(options.getLocalWindmillHostport()); - } - } - private static ChannelCachingStubFactory createStubFactory( DataflowWorkerHarnessOptions workerOptions) { Function channelFactory = @@ -895,4 +912,25 @@ public Iterable buildCounters() { .pendingCumulativeCounters() .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); } + + @AutoValue + abstract static class ConfigFetcherComputationStateCacheAndWindmillClient { + + private static ConfigFetcherComputationStateCacheAndWindmillClient create( + ComputationConfig.Fetcher configFetcher, + ComputationStateCache computationStateCache, + WindmillServerStub windmillServer, + GrpcWindmillStreamFactory windmillStreamFactory) { + return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient( + configFetcher, computationStateCache, windmillServer, windmillStreamFactory); + } + + abstract ComputationConfig.Fetcher configFetcher(); + + abstract ComputationStateCache computationStateCache(); + + abstract WindmillServerStub windmillServer(); + + abstract GrpcWindmillStreamFactory windmillStreamFactory(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 69807c523edf..abf85d98548e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -28,7 +28,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -40,7 +39,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest; @@ -96,29 +94,19 @@ public final class GrpcWindmillServer extends WindmillServerStub { private final GrpcDispatcherClient dispatcherClient; private final DataflowWorkerHarnessOptions options; private final StreamingEngineThrottleTimers throttleTimers; + private final GrpcWindmillStreamFactory windmillStreamFactory; private Duration maxBackoff; private @Nullable WindmillApplianceGrpc.WindmillApplianceBlockingStub syncApplianceStub; - // If true, then active work refreshes will be sent as KeyedGetDataRequests. Otherwise, use the - // newer ComputationHeartbeatRequests. - private final boolean sendKeyedGetDataRequests; - private final Consumer> processHeartbeatResponses; - private final GrpcWindmillStreamFactory windmillStreamFactory; private GrpcWindmillServer( DataflowWorkerHarnessOptions options, GrpcWindmillStreamFactory grpcWindmillStreamFactory, - GrpcDispatcherClient grpcDispatcherClient, - Consumer> processHeartbeatResponses) { + GrpcDispatcherClient grpcDispatcherClient) { this.options = options; this.throttleTimers = StreamingEngineThrottleTimers.create(); this.maxBackoff = Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis()); this.dispatcherClient = grpcDispatcherClient; this.syncApplianceStub = null; - this.sendKeyedGetDataRequests = - !options.isEnableStreamingEngine() - || !DataflowRunner.hasExperiment( - options, "streaming_engine_send_new_heartbeat_requests"); - this.processHeartbeatResponses = processHeartbeatResponses; this.windmillStreamFactory = grpcWindmillStreamFactory; } @@ -148,11 +136,9 @@ private static DataflowWorkerHarnessOptions testOptions( public static GrpcWindmillServer create( DataflowWorkerHarnessOptions workerOptions, GrpcWindmillStreamFactory grpcWindmillStreamFactory, - GrpcDispatcherClient dispatcherClient, - Consumer> processHeartbeatResponses) { + GrpcDispatcherClient dispatcherClient) { GrpcWindmillServer grpcWindmillServer = - new GrpcWindmillServer( - workerOptions, grpcWindmillStreamFactory, dispatcherClient, processHeartbeatResponses); + new GrpcWindmillServer(workerOptions, grpcWindmillStreamFactory, dispatcherClient); if (workerOptions.getWindmillServiceEndpoint() != null) { grpcWindmillServer.configureWindmillServiceEndpoints(); } else if (!workerOptions.isEnableStreamingEngine() @@ -188,11 +174,18 @@ static GrpcWindmillServer newTestInstance( DataflowWorkerHarnessOptions testOptions = testOptions(/* enableStreamingEngine= */ true, experiments); + boolean sendKeyedGetDataRequests = + !testOptions.isEnableStreamingEngine() + || !DataflowRunner.hasExperiment( + testOptions, "streaming_engine_send_new_heartbeat_requests"); GrpcWindmillStreamFactory windmillStreamFactory = - GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId)).build(); - windmillStreamFactory.scheduleHealthChecks( - testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs()); - return new GrpcWindmillServer(testOptions, windmillStreamFactory, dispatcherClient, noop -> {}); + GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId)) + .setSendKeyedGetDataRequests(sendKeyedGetDataRequests) + .setHealthCheckIntervalMillis( + testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) + .build(); + + return new GrpcWindmillServer(testOptions, windmillStreamFactory, dispatcherClient); } @VisibleForTesting @@ -205,8 +198,7 @@ static GrpcWindmillServer newApplianceTestInstance( options, GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(), // No-op, Appliance does not use Dispatcher to call Streaming Engine. - GrpcDispatcherClient.create(windmillStubFactory), - noop -> {}); + GrpcDispatcherClient.create(windmillStubFactory)); testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel); return testServer; } @@ -253,13 +245,13 @@ private void configureLocalHost() { } @Override - public void setWindmillServiceEndpoints(Set endpoints) { - dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints)); + public ImmutableSet getWindmillServiceEndpoints() { + return dispatcherClient.getDispatcherEndpoints(); } @Override - public ImmutableSet getWindmillServiceEndpoints() { - return dispatcherClient.getDispatcherEndpoints(); + public void setWindmillServiceEndpoints(Set endpoints) { + dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints)); } @Override @@ -357,10 +349,7 @@ public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver rece @Override public GetDataStream getDataStream() { return windmillStreamFactory.createGetDataStream( - dispatcherClient.getWindmillServiceStub(), - throttleTimers.getDataThrottleTimer(), - sendKeyedGetDataRequests, - this.processHeartbeatResponses); + dispatcherClient.getWindmillServiceStub(), throttleTimers.getDataThrottleTimer()); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java index c652e98e5568..14866f3f586b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -68,6 +68,7 @@ public class GrpcWindmillStreamFactory implements StatusDataProvider { private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1; private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT = Integer.MAX_VALUE; private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS = 1; + private static final int NO_HEALTH_CHECKS = -1; private final JobHeader jobHeader; private final int logEveryNStreamFailures; @@ -76,12 +77,18 @@ public class GrpcWindmillStreamFactory implements StatusDataProvider { private final Supplier grpcBackOff; private final Set> streamRegistry; private final AtomicLong streamIdGenerator; + // If true, then active work refreshes will be sent as KeyedGetDataRequests. Otherwise, use the + // newer ComputationHeartbeatRequests. + private final boolean sendKeyedGetDataRequests; + private final Consumer> processHeartbeatResponses; - GrpcWindmillStreamFactory( + private GrpcWindmillStreamFactory( JobHeader jobHeader, int logEveryNStreamFailures, int streamingRpcBatchLimit, int windmillMessagesBetweenIsReadyChecks, + boolean sendKeyedGetDataRequests, + Consumer> processHeartbeatResponses, Supplier maxBackOffSupplier) { this.jobHeader = jobHeader; this.logEveryNStreamFailures = logEveryNStreamFailures; @@ -96,9 +103,53 @@ public class GrpcWindmillStreamFactory implements StatusDataProvider { .withMaxBackoff(maxBackOffSupplier.get()) .backoff()); this.streamRegistry = ConcurrentHashMap.newKeySet(); + this.sendKeyedGetDataRequests = sendKeyedGetDataRequests; + this.processHeartbeatResponses = processHeartbeatResponses; this.streamIdGenerator = new AtomicLong(); } + /** @implNote Used for {@link AutoBuilder} {@link Builder} class, do not call directly. */ + static GrpcWindmillStreamFactory create( + JobHeader jobHeader, + int logEveryNStreamFailures, + int streamingRpcBatchLimit, + int windmillMessagesBetweenIsReadyChecks, + boolean sendKeyedGetDataRequests, + Consumer> processHeartbeatResponses, + Supplier maxBackOffSupplier, + int healthCheckIntervalMillis) { + GrpcWindmillStreamFactory streamFactory = + new GrpcWindmillStreamFactory( + jobHeader, + logEveryNStreamFailures, + streamingRpcBatchLimit, + windmillMessagesBetweenIsReadyChecks, + sendKeyedGetDataRequests, + processHeartbeatResponses, + maxBackOffSupplier); + + if (healthCheckIntervalMillis >= 0) { + // Health checks are run on background daemon thread, which will only be cleaned up on JVM + // shutdown. + new Timer("WindmillHealthCheckTimer") + .schedule( + new TimerTask() { + @Override + public void run() { + Instant reportThreshold = + Instant.now().minus(Duration.millis(healthCheckIntervalMillis)); + for (AbstractWindmillStream stream : streamFactory.streamRegistry) { + stream.maybeSendHealthCheck(reportThreshold); + } + } + }, + 0, + healthCheckIntervalMillis); + } + + return streamFactory; + } + /** * Returns a new {@link Builder} for {@link GrpcWindmillStreamFactory} with default values set for * the given {@link JobHeader}. @@ -109,7 +160,10 @@ public static GrpcWindmillStreamFactory.Builder of(JobHeader jobHeader) { .setWindmillMessagesBetweenIsReadyChecks(DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS) .setMaxBackOffSupplier(() -> DEFAULT_MAX_BACKOFF) .setLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_STREAM_FAILURES) - .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT); + .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT) + .setHealthCheckIntervalMillis(NO_HEALTH_CHECKS) + .setSendKeyedGetDataRequests(true) + .setProcessHeartbeatResponses(ignored -> {}); } private static > T withDefaultDeadline(T stub) { @@ -156,10 +210,7 @@ public GetWorkStream createDirectGetWorkStream( } public GetDataStream createGetDataStream( - CloudWindmillServiceV1Alpha1Stub stub, - ThrottleTimer getDataThrottleTimer, - boolean sendKeyedGetDataRequests, - Consumer> processHeartbeatResponses) { + CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer getDataThrottleTimer) { return GrpcGetDataStream.create( responseObserver -> withDefaultDeadline(stub).getDataStream(responseObserver), grpcBackOff.get(), @@ -174,11 +225,6 @@ public GetDataStream createGetDataStream( processHeartbeatResponses); } - public GetDataStream createGetDataStream( - CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer getDataThrottleTimer) { - return createGetDataStream(stub, getDataThrottleTimer, false, (response) -> {}); - } - public CommitWorkStream createCommitWorkStream( CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer commitWorkThrottleTimer) { return GrpcCommitWorkStream.create( @@ -214,30 +260,6 @@ private StreamObserverFactory newStreamObserverFactory() { DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks); } - /** - * Schedules streaming RPC health checks to run on a background daemon thread, which will be - * cleaned up when the JVM shutdown. - */ - public void scheduleHealthChecks(int healthCheckInterval) { - if (healthCheckInterval < 0) { - return; - } - - new Timer("WindmillHealthCheckTimer") - .schedule( - new TimerTask() { - @Override - public void run() { - Instant reportThreshold = Instant.now().minus(Duration.millis(healthCheckInterval)); - for (AbstractWindmillStream stream : streamRegistry) { - stream.maybeSendHealthCheck(reportThreshold); - } - } - }, - 0, - healthCheckInterval); - } - @Override public void appendSummaryHtml(PrintWriter writer) { writer.write("Active Streams:
"); @@ -248,7 +270,7 @@ public void appendSummaryHtml(PrintWriter writer) { } @Internal - @AutoBuilder(ofClass = GrpcWindmillStreamFactory.class) + @AutoBuilder(callMethod = "create") public interface Builder { Builder setJobHeader(JobHeader jobHeader); @@ -260,6 +282,13 @@ public interface Builder { Builder setMaxBackOffSupplier(Supplier maxBackOff); + Builder setSendKeyedGetDataRequests(boolean sendKeyedGetDataRequests); + + Builder setProcessHeartbeatResponses( + Consumer> processHeartbeatResponses); + + Builder setHealthCheckIntervalMillis(int healthCheckIntervalMillis); + GrpcWindmillStreamFactory build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java index a9ca749ff1cd..4760062c5754 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java @@ -30,13 +30,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.CheckReturnValue; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection; @@ -93,7 +91,6 @@ public final class StreamingEngineClient { private final Supplier getWorkerMetadataStream; private final Queue newWindmillEndpoints; private final Function workCommitterFactory; - private final Consumer> heartbeatResponseProcessor; /** Writes are guarded by synchronization, reads are lock free. */ private final AtomicReference connections; @@ -110,8 +107,7 @@ private StreamingEngineClient( GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, long clientId, - Function workCommitterFactory, - Consumer> heartbeatResponseProcessor) { + Function workCommitterFactory) { this.jobHeader = jobHeader; this.started = false; this.streamFactory = streamFactory; @@ -147,7 +143,6 @@ private StreamingEngineClient( newWorkerMetadataPublisher.submit( () -> newWindmillEndpoints.add(endpoints)))); this.workCommitterFactory = workCommitterFactory; - this.heartbeatResponseProcessor = heartbeatResponseProcessor; } private static ExecutorService singleThreadedExecutorServiceOf(String threadName) { @@ -176,8 +171,7 @@ public static StreamingEngineClient create( ChannelCachingStubFactory channelCachingStubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, - Function workCommitterFactory, - Consumer> heartbeatProcessor) { + Function workCommitterFactory) { return new StreamingEngineClient( jobHeader, totalGetWorkBudget, @@ -187,8 +181,7 @@ public static StreamingEngineClient create( getWorkBudgetDistributor, dispatcherClient, /* clientId= */ new Random().nextLong(), - workCommitterFactory, - heartbeatProcessor); + workCommitterFactory); } @VisibleForTesting @@ -201,8 +194,7 @@ static StreamingEngineClient forTesting( GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, long clientId, - Function workCommitterFactory, - Consumer> heartbeatResponseProcessor) { + Function workCommitterFactory) { StreamingEngineClient streamingEngineClient = new StreamingEngineClient( jobHeader, @@ -213,8 +205,7 @@ static StreamingEngineClient forTesting( getWorkBudgetDistributor, dispatcherClient, clientId, - workCommitterFactory, - heartbeatResponseProcessor); + workCommitterFactory); streamingEngineClient.start(); return streamingEngineClient; } @@ -409,8 +400,7 @@ private WindmillStreamSender createAndStartWindmillStreamSenderFor( GetWorkBudget.noBudget(), streamFactory, workItemScheduler, - workCommitterFactory, - heartbeatResponseProcessor); + workCommitterFactory); windmillStreamSender.startStreams(); return windmillStreamSender; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java index ff9ddc00c3f0..e9f008eb522e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java @@ -17,15 +17,12 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; @@ -73,24 +70,20 @@ private WindmillStreamSender( AtomicReference getWorkBudget, GrpcWindmillStreamFactory streamingEngineStreamFactory, WorkItemScheduler workItemScheduler, - Function workCommitterFactory, - Consumer> heartbeatResponseProcessor) { + Function workCommitterFactory) { this.started = new AtomicBoolean(false); this.getWorkBudget = getWorkBudget; this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create(); // All streams are memoized/cached since they are expensive to create and some implementations // perform side effects on construction (i.e. sending initial requests to the stream server to - // initiate the streaming RPC connection). Stream instances connect/reconnect internally so we + // initiate the streaming RPC connection). Stream instances connect/reconnect internally, so we // can reuse the same instance through the entire lifecycle of WindmillStreamSender. this.getDataStream = Suppliers.memoize( () -> streamingEngineStreamFactory.createGetDataStream( - stub, - streamingEngineThrottleTimers.getDataThrottleTimer(), - false, - heartbeatResponseProcessor)); + stub, streamingEngineThrottleTimers.getDataThrottleTimer())); this.commitWorkStream = Suppliers.memoize( () -> @@ -116,16 +109,14 @@ public static WindmillStreamSender create( GetWorkBudget getWorkBudget, GrpcWindmillStreamFactory streamingEngineStreamFactory, WorkItemScheduler workItemScheduler, - Function workCommitterFactory, - Consumer> heartbeatResponseProcessor) { + Function workCommitterFactory) { return new WindmillStreamSender( stub, getWorkRequest, new AtomicReference<>(getWorkBudget), streamingEngineStreamFactory, workItemScheduler, - workCommitterFactory, - heartbeatResponseProcessor); + workCommitterFactory); } private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkBudget budget) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java index 9822daa91567..bc3afaff1b38 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java @@ -181,8 +181,7 @@ private StreamingEngineClient newStreamingEngineClient( getWorkBudgetDistributor, dispatcherClient, CLIENT_ID, - ignored -> mock(WorkCommitter.class), - ignored -> {}); + ignored -> mock(WorkCommitter.class)); } @Test @@ -238,7 +237,7 @@ public void testStreamsStartCorrectly() throws InterruptedException { .createDirectGetWorkStream( any(), eq(getWorkRequest(0, 0)), any(), any(), any(), eq(noOpProcessWorkItemFn())); - verify(streamFactory, times(2)).createGetDataStream(any(), any(), eq(false), any()); + verify(streamFactory, times(2)).createGetDataStream(any(), any()); verify(streamFactory, times(2)).createCommitWorkStream(any(), any()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java index 496f69dc52d8..162c69509ae1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java @@ -107,7 +107,7 @@ public void testStartStream_startsAllStreams() { any(), eq(workItemScheduler)); - verify(streamFactory).createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false), any()); + verify(streamFactory).createGetDataStream(eq(stub), any(ThrottleTimer.class)); verify(streamFactory).createCommitWorkStream(eq(stub), any(ThrottleTimer.class)); } @@ -138,8 +138,7 @@ public void testStartStream_onlyStartsStreamsOnce() { any(), eq(workItemScheduler)); - verify(streamFactory, times(1)) - .createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false), any()); + verify(streamFactory, times(1)).createGetDataStream(eq(stub), any(ThrottleTimer.class)); verify(streamFactory, times(1)).createCommitWorkStream(eq(stub), any(ThrottleTimer.class)); } @@ -173,8 +172,7 @@ public void testStartStream_onlyStartsStreamsOnceConcurrent() throws Interrupted any(), eq(workItemScheduler)); - verify(streamFactory, times(1)) - .createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false), any()); + verify(streamFactory, times(1)).createGetDataStream(eq(stub), any(ThrottleTimer.class)); verify(streamFactory, times(1)).createCommitWorkStream(eq(stub), any(ThrottleTimer.class)); } @@ -208,8 +206,7 @@ public void testCloseAllStreams_closesAllStreams() { eq(workItemScheduler))) .thenReturn(mockGetWorkStream); - when(mockStreamFactory.createGetDataStream( - eq(stub), any(ThrottleTimer.class), eq(false), any())) + when(mockStreamFactory.createGetDataStream(eq(stub), any(ThrottleTimer.class))) .thenReturn(mockGetDataStream); when(mockStreamFactory.createCommitWorkStream(eq(stub), any(ThrottleTimer.class))) .thenReturn(mockCommitWorkStream); @@ -239,7 +236,6 @@ private WindmillStreamSender newWindmillStreamSender( budget, streamFactory, workItemScheduler, - ignored -> mock(WorkCommitter.class), - ignored -> {}); + ignored -> mock(WorkCommitter.class)); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java index 4fa424412ee0..83ae8aa22ce3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java @@ -259,7 +259,6 @@ private WindmillStreamSender createWindmillStreamSender(GetWorkBudget getWorkBud .build()) .build(), (workItem, watermarks, processingContext, ackWorkItemQueued, getWorkStreamLatencies) -> {}, - ignored -> mock(WorkCommitter.class), - ignored -> {}); + ignored -> mock(WorkCommitter.class)); } }