From fb080ea765696f03df15399cdc4c75b0a24e1a73 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 18 Sep 2024 19:53:19 -0700 Subject: [PATCH] Use separate heartbeat streams based on job settings --- .../DataflowStreamingPipelineOptions.java | 5 +- .../worker/StreamingDataflowWorker.java | 37 +++++- .../refresh/StreamPoolHeartbeatSender.java | 44 ++++++- .../StreamPoolHeartbeatSenderTest.java | 113 ++++++++++++++++++ 4 files changed, 186 insertions(+), 13 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index a761d38de1ab..10df6e24f49a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -132,10 +132,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { @Description( "If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.") - @Default.Boolean(false) - boolean getUseSeparateWindmillHeartbeatStreams(); + Boolean getUseSeparateWindmillHeartbeatStreams(); - void setUseSeparateWindmillHeartbeatStreams(boolean value); + void setUseSeparateWindmillHeartbeatStreams(Boolean value); @Description("The number of streams to use for GetData requests.") @Default.Integer(1) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 0dedd4f34fd6..81868729d851 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -146,6 +146,8 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; + public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -253,12 +255,29 @@ private StreamingDataflowWorker( GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - heartbeatSender = - new StreamPoolHeartbeatSender( - options.getUseSeparateWindmillHeartbeatStreams() - ? WindmillStreamPool.create( - 1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream) - : getDataStreamPool); + // Experiment gates the logic till backend changes are rollback safe + if (DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)) { + heartbeatSender = + // If the setting is explicitly passed in via PipelineOptions use it, + // else rely on the global config + options.getUseSeparateWindmillHeartbeatStreams() != null + ? StreamPoolHeartbeatSender.Create( + options.getUseSeparateWindmillHeartbeatStreams() + ? separateHeartbeatPool(windmillServer) + : getDataStreamPool) + : StreamPoolHeartbeatSender.Create( + separateHeartbeatPool(windmillServer), + getDataStreamPool, + configFetcher.getGlobalConfigHandle()); + } else { + heartbeatSender = + StreamPoolHeartbeatSender.Create( + Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) + ? separateHeartbeatPool(windmillServer) + : getDataStreamPool); + } + stuckCommitDurationMillis = options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0; statusPagesBuilder @@ -326,6 +345,11 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private static WindmillStreamPool separateHeartbeatPool( + WindmillServerStub windmillServer) { + return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); + } + public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { long clientId = CLIENT_ID_GENERATOR.nextLong(); MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options); @@ -834,6 +858,7 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create( */ @AutoValue abstract static class BackgroundMemoryMonitor { + private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) { return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor( memoryMonitor, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java index e571f89f142c..b632db3c6d47 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; @@ -27,19 +30,52 @@ /** StreamingEngine stream pool based implementation of {@link HeartbeatSender}. */ @Internal public final class StreamPoolHeartbeatSender implements HeartbeatSender { + private static final Logger LOG = LoggerFactory.getLogger(StreamPoolHeartbeatSender.class); - private final WindmillStreamPool heartbeatStreamPool; + @Nonnull + private final AtomicReference> + heartbeatStreamPool = new AtomicReference<>(); - public StreamPoolHeartbeatSender( + private StreamPoolHeartbeatSender( WindmillStreamPool heartbeatStreamPool) { - this.heartbeatStreamPool = heartbeatStreamPool; + this.heartbeatStreamPool.set(heartbeatStreamPool); + } + + public static StreamPoolHeartbeatSender Create( + @Nonnull WindmillStreamPool heartbeatStreamPool) { + return new StreamPoolHeartbeatSender(heartbeatStreamPool); + } + + /** + * Creates StreamPoolHeartbeatSender that switches between the passed in stream pools depending on + * global config. + * + * @param heartbeatStreamPool stream to use when using separate streams for heartbeat is enabled. + * @param getDataPool stream to use when using separate streams for heartbeat is disabled. + */ + public static StreamPoolHeartbeatSender Create( + @Nonnull WindmillStreamPool heartbeatStreamPool, + @Nonnull WindmillStreamPool getDataPool, + @Nonnull StreamingGlobalConfigHandle configHandle) { + // Use getDataPool as the default, settings callback will + // switch to the separate pool if enabled before processing any elements are processed. + StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(heartbeatStreamPool); + configHandle.registerConfigObserver( + streamingGlobalConfig -> + heartbeatSender.heartbeatStreamPool.set( + streamingGlobalConfig + .userWorkerJobSettings() + .getUseSeparateWindmillHeartbeatStreams() + ? heartbeatStreamPool + : getDataPool)); + return heartbeatSender; } @Override public void sendHeartbeats(Heartbeats heartbeats) { try (CloseableStream closeableStream = - heartbeatStreamPool.getCloseableStream()) { + heartbeatStreamPool.get().getCloseableStream()) { closeableStream.stream().refreshActiveWork(heartbeats.heartbeatRequests().asMap()); } catch (Exception e) { LOG.warn("Error occurred sending heartbeats=[{}].", heartbeats, e); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java new file mode 100644 index 000000000000..b4ba49317950 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; + +import static org.junit.Assert.assertEquals; + +import java.util.Optional; +import org.apache.beam.runners.dataflow.worker.FakeWindmillServer; +import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.rules.ErrorCollector; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class StreamPoolHeartbeatSenderTest { + + @Test + public void sendsHeartbeatsOnStream() { + FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + StreamPoolHeartbeatSender heartbeatSender = + StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream)); + Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); + heartbeatsBuilder + .heartbeatRequestsBuilder() + .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build()); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(1, server.getGetDataRequests().size()); + } + + @Test + public void sendsHeartbeatsOnDedicatedStream() { + FakeWindmillServer dedicatedServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + FakeWindmillServer getDataServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + + FixedGlobalConfigHandle configHandle = + new FixedGlobalConfigHandle( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(true) + .build()) + .build()); + StreamPoolHeartbeatSender heartbeatSender = + StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), getDataServer::getDataStream), + configHandle); + Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); + heartbeatsBuilder + .heartbeatRequestsBuilder() + .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build()); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(1, dedicatedServer.getGetDataRequests().size()); + assertEquals(0, getDataServer.getGetDataRequests().size()); + } + + @Test + public void sendsHeartbeatsOnGetDataStream() { + FakeWindmillServer dedicatedServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + FakeWindmillServer getDataServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + + FixedGlobalConfigHandle configHandle = + new FixedGlobalConfigHandle( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build()); + StreamPoolHeartbeatSender heartbeatSender = + StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), getDataServer::getDataStream), + configHandle); + Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); + heartbeatsBuilder + .heartbeatRequestsBuilder() + .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build()); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(0, dedicatedServer.getGetDataRequests().size()); + assertEquals(1, getDataServer.getGetDataRequests().size()); + } +}