Skip to content

Commit

Permalink
[Dataflow Streaming] Use separate heartbeat streams based on job sett…
Browse files Browse the repository at this point in the history
…ings (#32511)
  • Loading branch information
arunpandianp authored Sep 26, 2024
1 parent aabae27 commit c7fb9a0
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

@Description(
"If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.")
@Default.Boolean(false)
boolean getUseSeparateWindmillHeartbeatStreams();
Boolean getUseSeparateWindmillHeartbeatStreams();

void setUseSeparateWindmillHeartbeatStreams(boolean value);
void setUseSeparateWindmillHeartbeatStreams(Boolean value);

@Description("The number of streams to use for GetData requests.")
@Default.Integer(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
"streaming_engine_use_job_settings_for_heartbeat_pool";

private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
Expand Down Expand Up @@ -253,12 +255,24 @@ private StreamingDataflowWorker(
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
heartbeatSender =
new StreamPoolHeartbeatSender(
options.getUseSeparateWindmillHeartbeatStreams()
? WindmillStreamPool.create(
1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream)
: getDataStreamPool);
// Experiment gates the logic till backend changes are rollback safe
if (!DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
|| options.getUseSeparateWindmillHeartbeatStreams() != null) {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
? separateHeartbeatPool(windmillServer)
: getDataStreamPool);

} else {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
separateHeartbeatPool(windmillServer),
getDataStreamPool,
configFetcher.getGlobalConfigHandle());
}

stuckCommitDurationMillis =
options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0;
statusPagesBuilder
Expand Down Expand Up @@ -326,6 +340,11 @@ private StreamingDataflowWorker(
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}

/**
 * Builds a stream pool intended to carry heartbeats separately from the shared GetData pool.
 *
 * @param windmillServer server stub whose {@code getDataStream} supplies the pooled streams.
 * @return a pool holding a single GetData stream, using {@code GET_DATA_STREAM_TIMEOUT}.
 */
private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
    WindmillServerStub windmillServer) {
  // A single stream is created for the dedicated heartbeat pool.
  final int heartbeatStreamCount = 1;
  WindmillStreamPool<GetDataStream> heartbeatPool =
      WindmillStreamPool.create(
          heartbeatStreamCount, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream);
  return heartbeatPool;
}

public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
long clientId = CLIENT_ID_GENERATOR.nextLong();
MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
Expand Down Expand Up @@ -834,6 +853,7 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create(
*/
@AutoValue
abstract static class BackgroundMemoryMonitor {

private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) {
return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor(
memoryMonitor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming.config;

import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.annotations.Internal;

/**
 * Fake {@link StreamingGlobalConfigHandle} used for tests. Allows setting fake configs.
 *
 * <p>All calls are forwarded to an internal {@link StreamingGlobalConfigHandleImpl}.
 */
@Internal
@ThreadSafe
public class FakeGlobalConfigHandle implements StreamingGlobalConfigHandle {

  // Real handle implementation that stores the config and the registered observers.
  private final StreamingGlobalConfigHandleImpl delegate = new StreamingGlobalConfigHandleImpl();

  /**
   * Creates a fake handle that already holds {@code config}.
   *
   * @param config the initial config pushed into the underlying handle.
   */
  public FakeGlobalConfigHandle(StreamingGlobalConfig config) {
    delegate.setConfig(config);
  }

  /** Returns whatever config the underlying handle currently reports. */
  @Override
  public StreamingGlobalConfig getConfig() {
    return delegate.getConfig();
  }

  /**
   * Replaces the config held by the underlying handle.
   *
   * @param config the new config to forward to the underlying handle.
   */
  public void setConfig(StreamingGlobalConfig config) {
    delegate.setConfig(config);
  }

  /**
   * Registers {@code callback} with the underlying handle.
   *
   * @param callback observer forwarded to the underlying handle.
   */
  @Override
  public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback) {
    delegate.registerConfigObserver(callback);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
Expand All @@ -27,19 +30,53 @@
/** StreamingEngine stream pool based implementation of {@link HeartbeatSender}. */
@Internal
public final class StreamPoolHeartbeatSender implements HeartbeatSender {

private static final Logger LOG = LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);

private final WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool;
@Nonnull
private final AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
heartbeatStreamPool = new AtomicReference<>();

public StreamPoolHeartbeatSender(
private StreamPoolHeartbeatSender(
WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
this.heartbeatStreamPool = heartbeatStreamPool;
this.heartbeatStreamPool.set(heartbeatStreamPool);
}

/**
 * Creates a StreamPoolHeartbeatSender that always sends heartbeats on the given pool.
 *
 * @param heartbeatStreamPool pool whose streams are used for every heartbeat.
 */
public static StreamPoolHeartbeatSender Create(
    @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
  StreamPoolHeartbeatSender fixedPoolSender = new StreamPoolHeartbeatSender(heartbeatStreamPool);
  return fixedPoolSender;
}

/**
 * Creates a StreamPoolHeartbeatSender that switches between the two given stream pools
 * depending on the global config.
 *
 * <p>An observer is registered on {@code configHandle}; every time it is invoked, the pool used
 * for subsequent heartbeats is re-selected from the {@code useSeparateWindmillHeartbeatStreams}
 * flag of the user worker job settings.
 *
 * @param dedicatedHeartbeatPool pool to use when separate heartbeat streams are enabled.
 * @param getDataPool pool to use when separate heartbeat streams are disabled.
 * @param configHandle handle on which the config observer is registered.
 */
public static StreamPoolHeartbeatSender Create(
    @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
    @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
    @Nonnull StreamingGlobalConfigHandle configHandle) {
  // Start on getDataPool by default; the settings callback will switch to the
  // dedicated pool, if enabled, before any elements are processed.
  StreamPoolHeartbeatSender sender = new StreamPoolHeartbeatSender(getDataPool);
  configHandle.registerConfigObserver(
      config -> {
        boolean useDedicatedPool =
            config.userWorkerJobSettings().getUseSeparateWindmillHeartbeatStreams();
        sender.heartbeatStreamPool.set(useDedicatedPool ? dedicatedHeartbeatPool : getDataPool);
      });
  return sender;
}

@Override
public void sendHeartbeats(Heartbeats heartbeats) {
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
heartbeatStreamPool.getCloseableStream()) {
heartbeatStreamPool.get().getCloseableStream()) {
closeableStream.stream().refreshActiveWork(heartbeats.heartbeatRequests().asMap());
} catch (Exception e) {
LOG.warn("Error occurred sending heartbeats=[{}].", heartbeats, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import static org.junit.Assert.assertEquals;

import java.util.Optional;
import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link StreamPoolHeartbeatSender}: sending on a fixed pool, and switching between a
 * dedicated heartbeat pool and the GetData pool when the global config changes.
 */
@RunWith(JUnit4.class)
public class StreamPoolHeartbeatSenderTest {

  /** Creates a fake Windmill server whose requests can be counted afterwards. */
  private static FakeWindmillServer newFakeServer() {
    return new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty());
  }

  /** Builds a Heartbeats payload holding one heartbeat request under the key "key". */
  private static Heartbeats singleHeartbeat() {
    Heartbeats.Builder builder = Heartbeats.builder();
    HeartbeatRequest request = HeartbeatRequest.newBuilder().setWorkToken(123).build();
    builder.heartbeatRequestsBuilder().put("key", request);
    return builder.build();
  }

  /** Builds a global config whose job settings carry the given separate-heartbeat flag. */
  private static StreamingGlobalConfig getGlobalConfig(boolean useSeparateHeartbeatStreams) {
    UserWorkerRunnerV1Settings jobSettings =
        UserWorkerRunnerV1Settings.newBuilder()
            .setUseSeparateWindmillHeartbeatStreams(useSeparateHeartbeatStreams)
            .build();
    return StreamingGlobalConfig.builder().setUserWorkerJobSettings(jobSettings).build();
  }

  /** Asserts how many GetData requests each fake server has recorded so far. */
  private static void assertRequestCounts(
      FakeWindmillServer dedicatedServer,
      int expectedDedicated,
      FakeWindmillServer getDataServer,
      int expectedGetData) {
    assertEquals(expectedDedicated, dedicatedServer.getGetDataRequests().size());
    assertEquals(expectedGetData, getDataServer.getGetDataRequests().size());
  }

  @Test
  public void sendsHeartbeatsOnStream() {
    FakeWindmillServer server = newFakeServer();
    StreamPoolHeartbeatSender heartbeatSender =
        StreamPoolHeartbeatSender.Create(
            WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream));

    heartbeatSender.sendHeartbeats(singleHeartbeat());

    assertEquals(1, server.getGetDataRequests().size());
  }

  @Test
  public void sendsHeartbeatsOnDedicatedStream() {
    FakeWindmillServer dedicatedServer = newFakeServer();
    FakeWindmillServer getDataServer = newFakeServer();
    FakeGlobalConfigHandle configHandle =
        new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
    StreamPoolHeartbeatSender heartbeatSender =
        StreamPoolHeartbeatSender.Create(
            WindmillStreamPool.create(
                1, Duration.standardSeconds(10), dedicatedServer::getDataStream),
            WindmillStreamPool.create(
                1, Duration.standardSeconds(10), getDataServer::getDataStream),
            configHandle);

    // With separate heartbeats enabled, every send lands on the dedicated server.
    heartbeatSender.sendHeartbeats(singleHeartbeat());
    assertRequestCounts(dedicatedServer, 1, getDataServer, 0);

    heartbeatSender.sendHeartbeats(singleHeartbeat());
    assertRequestCounts(dedicatedServer, 2, getDataServer, 0);

    // Turn off separate heartbeats: the next request goes to getDataServer and the
    // dedicatedServer count stays the same.
    configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
    heartbeatSender.sendHeartbeats(singleHeartbeat());
    assertRequestCounts(dedicatedServer, 2, getDataServer, 1);
  }

  @Test
  public void sendsHeartbeatsOnGetDataStream() {
    FakeWindmillServer dedicatedServer = newFakeServer();
    FakeWindmillServer getDataServer = newFakeServer();
    FakeGlobalConfigHandle configHandle =
        new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
    StreamPoolHeartbeatSender heartbeatSender =
        StreamPoolHeartbeatSender.Create(
            WindmillStreamPool.create(
                1, Duration.standardSeconds(10), dedicatedServer::getDataStream),
            WindmillStreamPool.create(
                1, Duration.standardSeconds(10), getDataServer::getDataStream),
            configHandle);

    // With separate heartbeats disabled, every send lands on the GetData server.
    heartbeatSender.sendHeartbeats(singleHeartbeat());
    assertRequestCounts(dedicatedServer, 0, getDataServer, 1);

    heartbeatSender.sendHeartbeats(singleHeartbeat());
    assertRequestCounts(dedicatedServer, 0, getDataServer, 2);

    // Turn on separate heartbeats: the next request goes to dedicatedServer and the
    // getDataServer count stays the same.
    configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
    heartbeatSender.sendHeartbeats(singleHeartbeat());
    assertRequestCounts(dedicatedServer, 1, getDataServer, 2);
  }
}

0 comments on commit c7fb9a0

Please sign in to comment.