From c7fb9a0d5da79018836c266355afbb33d92bb983 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 26 Sep 2024 02:39:33 -0700 Subject: [PATCH] [Dataflow Streaming] Use separate heartbeat streams based on job settings (#32511) --- .../DataflowStreamingPipelineOptions.java | 5 +- .../worker/StreamingDataflowWorker.java | 32 ++++- .../config/FakeGlobalConfigHandle.java | 52 +++++++ .../refresh/StreamPoolHeartbeatSender.java | 45 +++++- .../StreamPoolHeartbeatSenderTest.java | 132 ++++++++++++++++++ 5 files changed, 253 insertions(+), 13 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index a761d38de1ab..10df6e24f49a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -132,10 +132,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { @Description( "If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.") - @Default.Boolean(false) - boolean getUseSeparateWindmillHeartbeatStreams(); + Boolean getUseSeparateWindmillHeartbeatStreams(); - void setUseSeparateWindmillHeartbeatStreams(boolean value); + void setUseSeparateWindmillHeartbeatStreams(Boolean value); @Description("The number of streams to use for GetData 
requests.") @Default.Integer(1) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 0dedd4f34fd6..8b440c306f0e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -146,6 +146,8 @@ public final class StreamingDataflowWorker { private static final int DEFAULT_STATUS_PORT = 8081; private static final Random CLIENT_ID_GENERATOR = new Random(); private static final String CHANNELZ_PATH = "/channelz"; + public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = + "streaming_engine_use_job_settings_for_heartbeat_pool"; private final WindmillStateCache stateCache; private final StreamingWorkerStatusPages statusPages; @@ -253,12 +255,24 @@ private StreamingDataflowWorker( GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); - heartbeatSender = - new StreamPoolHeartbeatSender( - options.getUseSeparateWindmillHeartbeatStreams() - ? WindmillStreamPool.create( - 1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream) - : getDataStreamPool); + // Experiment gates the logic till backend changes are rollback safe + if (!DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL) + || options.getUseSeparateWindmillHeartbeatStreams() != null) { + heartbeatSender = + StreamPoolHeartbeatSender.Create( + Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams()) + ? 
separateHeartbeatPool(windmillServer) + : getDataStreamPool); + + } else { + heartbeatSender = + StreamPoolHeartbeatSender.Create( + separateHeartbeatPool(windmillServer), + getDataStreamPool, + configFetcher.getGlobalConfigHandle()); + } + stuckCommitDurationMillis = options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0; statusPagesBuilder @@ -326,6 +340,11 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private static WindmillStreamPool separateHeartbeatPool( + WindmillServerStub windmillServer) { + return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream); + } + public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { long clientId = CLIENT_ID_GENERATOR.nextLong(); MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options); @@ -834,6 +853,7 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create( */ @AutoValue abstract static class BackgroundMemoryMonitor { + private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) { return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor( memoryMonitor, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java new file mode 100644 index 000000000000..d4d73f5882b1 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +/* + * Fake StreamingGlobalConfigHandle used for Tests. Allows setting fake configs. + */ +public class FakeGlobalConfigHandle implements StreamingGlobalConfigHandle { + + private final StreamingGlobalConfigHandleImpl globalConfigHandle; + + public FakeGlobalConfigHandle(StreamingGlobalConfig config) { + this.globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + this.globalConfigHandle.setConfig(config); + } + + @Override + public StreamingGlobalConfig getConfig() { + return globalConfigHandle.getConfig(); + } + + public void setConfig(StreamingGlobalConfig config) { + globalConfigHandle.setConfig(config); + } + + @Override + public void registerConfigObserver(@Nonnull Consumer callback) { + globalConfigHandle.registerConfigObserver(callback); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java index e571f89f142c..fa36b11ffe55 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; @@ -27,19 +30,53 @@ /** StreamingEngine stream pool based implementation of {@link HeartbeatSender}. */ @Internal public final class StreamPoolHeartbeatSender implements HeartbeatSender { + private static final Logger LOG = LoggerFactory.getLogger(StreamPoolHeartbeatSender.class); - private final WindmillStreamPool heartbeatStreamPool; + @Nonnull + private final AtomicReference> + heartbeatStreamPool = new AtomicReference<>(); - public StreamPoolHeartbeatSender( + private StreamPoolHeartbeatSender( WindmillStreamPool heartbeatStreamPool) { - this.heartbeatStreamPool = heartbeatStreamPool; + this.heartbeatStreamPool.set(heartbeatStreamPool); + } + + public static StreamPoolHeartbeatSender Create( + @Nonnull WindmillStreamPool heartbeatStreamPool) { + return new StreamPoolHeartbeatSender(heartbeatStreamPool); + } + + /** + * Creates StreamPoolHeartbeatSender that switches between the passed in stream pools depending on + * global config. + * + * @param dedicatedHeartbeatPool stream to use when using separate streams for heartbeat is + * enabled. + * @param getDataPool stream to use when using separate streams for heartbeat is disabled. 
+ */ + public static StreamPoolHeartbeatSender Create( + @Nonnull WindmillStreamPool dedicatedHeartbeatPool, + @Nonnull WindmillStreamPool getDataPool, + @Nonnull StreamingGlobalConfigHandle configHandle) { + // Use getDataPool as the default, settings callback will + // switch to the separate pool if enabled before any elements are processed. + StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(getDataPool); + configHandle.registerConfigObserver( + streamingGlobalConfig -> + heartbeatSender.heartbeatStreamPool.set( + streamingGlobalConfig + .userWorkerJobSettings() + .getUseSeparateWindmillHeartbeatStreams() + ? dedicatedHeartbeatPool + : getDataPool)); + return heartbeatSender; } @Override public void sendHeartbeats(Heartbeats heartbeats) { try (CloseableStream closeableStream = - heartbeatStreamPool.getCloseableStream()) { + heartbeatStreamPool.get().getCloseableStream()) { closeableStream.stream().refreshActiveWork(heartbeats.heartbeatRequests().asMap()); } catch (Exception e) { LOG.warn("Error occurred sending heartbeats=[{}].", heartbeats, e); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java new file mode 100644 index 000000000000..ed915088d0a6 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; + +import static org.junit.Assert.assertEquals; + +import java.util.Optional; +import org.apache.beam.runners.dataflow.worker.FakeWindmillServer; +import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.rules.ErrorCollector; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class StreamPoolHeartbeatSenderTest { + + @Test + public void sendsHeartbeatsOnStream() { + FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + StreamPoolHeartbeatSender heartbeatSender = + StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream)); + Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); + heartbeatsBuilder + .heartbeatRequestsBuilder() + .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build()); + 
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(1, server.getGetDataRequests().size()); + } + + @Test + public void sendsHeartbeatsOnDedicatedStream() { + FakeWindmillServer dedicatedServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + FakeWindmillServer getDataServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + + FakeGlobalConfigHandle configHandle = + new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true)); + StreamPoolHeartbeatSender heartbeatSender = + StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), getDataServer::getDataStream), + configHandle); + Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); + heartbeatsBuilder + .heartbeatRequestsBuilder() + .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build()); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(1, dedicatedServer.getGetDataRequests().size()); + assertEquals(0, getDataServer.getGetDataRequests().size()); + + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(2, dedicatedServer.getGetDataRequests().size()); + assertEquals(0, getDataServer.getGetDataRequests().size()); + + // Turn off separate heartbeats + configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false)); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + // request to getDataServer increases and dedicatedServer remains same + assertEquals(2, dedicatedServer.getGetDataRequests().size()); + assertEquals(1, getDataServer.getGetDataRequests().size()); + } + + private static StreamingGlobalConfig getGlobalConfig(boolean useSeparateHeartbeatStreams) { + return StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + 
.setUseSeparateWindmillHeartbeatStreams(useSeparateHeartbeatStreams) + .build()) + .build(); + } + + @Test + public void sendsHeartbeatsOnGetDataStream() { + FakeWindmillServer dedicatedServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + FakeWindmillServer getDataServer = + new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty()); + + FakeGlobalConfigHandle configHandle = + new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false)); + StreamPoolHeartbeatSender heartbeatSender = + StreamPoolHeartbeatSender.Create( + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), dedicatedServer::getDataStream), + WindmillStreamPool.create( + 1, Duration.standardSeconds(10), getDataServer::getDataStream), + configHandle); + Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder(); + heartbeatsBuilder + .heartbeatRequestsBuilder() + .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build()); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(0, dedicatedServer.getGetDataRequests().size()); + assertEquals(1, getDataServer.getGetDataRequests().size()); + + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + assertEquals(0, dedicatedServer.getGetDataRequests().size()); + assertEquals(2, getDataServer.getGetDataRequests().size()); + + // Turn on separate heartbeats + configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true)); + heartbeatSender.sendHeartbeats(heartbeatsBuilder.build()); + // request to dedicatedServer increases and getDataServer remains same + assertEquals(1, dedicatedServer.getGetDataRequests().size()); + assertEquals(2, getDataServer.getGetDataRequests().size()); + } +}