#21431 Pubsub dynamic topic destinations #26063

Merged · 3 commits · Apr 22, 2023

Changes from all commits

@@ -107,6 +107,8 @@ public class PTransformTranslation {
public static final String PUBSUB_READ = "beam:transform:pubsub_read:v1";
public static final String PUBSUB_WRITE = "beam:transform:pubsub_write:v1";

public static final String PUBSUB_WRITE_DYNAMIC = "beam:transform:pubsub_write:v2";

// CombineComponents
public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
"beam:transform:combine_per_key_precombine:v1";

@@ -1797,8 +1797,7 @@ public PubsubMessage apply(PubsubMessage input) {
* Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we can
* instead defer to Windmill's implementation.
*/
- private static class StreamingPubsubIOWrite
-     extends PTransform<PCollection<PubsubMessage>, PDone> {
+ static class StreamingPubsubIOWrite extends PTransform<PCollection<PubsubMessage>, PDone> {

private final PubsubUnboundedSink transform;


@@ -1850,13 +1849,24 @@ private static void translate(
StepTranslationContext stepContext,
PCollection input) {
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
- if (overriddenTransform.getTopicProvider().isAccessible()) {
-   stepContext.addInput(
-       PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath());
+ if (overriddenTransform.getTopicProvider() != null) {
+   if (overriddenTransform.getTopicProvider().isAccessible()) {
+     stepContext.addInput(
+         PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath());
+   } else {
+     stepContext.addInput(
+         PropertyNames.PUBSUB_TOPIC_OVERRIDE,
+         ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
+   }
} else {
-   stepContext.addInput(
-       PropertyNames.PUBSUB_TOPIC_OVERRIDE,
-       ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
+   DataflowPipelineOptions options =
+       input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
+   if (options.getEnableDynamicPubsubDestinations()) {
+     stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
+   } else {
+     throw new RuntimeException(
+         "Dynamic Pubsub destinations not yet supported. Topic must be set.");
+   }
}
if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(

@@ -153,7 +153,14 @@ public interface DataflowPipelineOptions
@Description("The customized dataflow worker jar")
String getDataflowWorkerJar();

- void setDataflowWorkerJar(String dataflowWorkerJar);
+ void setDataflowWorkerJar(String dataflowWorkerJafr);

// Disable this support for now until the Dataflow backend fully supports this option.
@Description("Whether to allow dynamic pubsub destinations. Temporary option: will be removed.")
@Default.Boolean(false)
Boolean getEnableDynamicPubsubDestinations();

void setEnableDynamicPubsubDestinations(Boolean enable);
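
A minimal end-to-end sketch of what this option enables (not code from this PR; the class name and the project/topic names are hypothetical, while `setEnableDynamicPubsubDestinations`, `writeMessagesDynamic()`, and `withTopic()` are the entry points added or exercised by this change):

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class DynamicPubsubTopicsSketch {
  public static void main(String[] args) {
    // Run with e.g. --runner=DataflowRunner --project=... --region=...
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    // Without this temporary flag, the translate() branch above throws
    // "Dynamic Pubsub destinations not yet supported. Topic must be set."
    options.setEnableDynamicPubsubDestinations(true);

    Pipeline p = Pipeline.create(options);
    // writeMessagesDynamic() configures no sink-level topic; each message
    // names its own destination via withTopic(). Topic name is hypothetical.
    PubsubMessage msg =
        new PubsubMessage("payload".getBytes(StandardCharsets.UTF_8), Collections.emptyMap())
            .withTopic("projects/my-project/topics/my-topic");
    p.apply(Create.of(msg)).apply(PubsubIO.writeMessagesDynamic());
    p.run();
  }
}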

/** Set of available Flexible Resource Scheduling goals. */
enum FlexResourceSchedulingGoal {

@@ -46,11 +46,15 @@ public class PropertyNames {
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";

public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";

public static final String PUBSUB_DYNAMIC_DESTINATIONS = "pubsub_with_dynamic_destinations";
Contributor: nit: pubsub_dynamic_destinations to match enum? ie remove "with"

Contributor Author: This has to match the string on the Dataflow backend, which is pubsub_with_dynamic_destinations.

public static final String SCALAR_FIELD_NAME = "value";
public static final String SERIALIZED_FN = "serialized_fn";
public static final String SERIALIZED_TEST_STREAM = "serialized_test_stream";

@@ -76,6 +76,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;

@@ -120,6 +121,8 @@
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;

@@ -163,7 +166,10 @@
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;

@@ -2341,6 +2347,74 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
verifyGroupIntoBatchesOverrideBytes(p, true, true);
}

@Test
public void testPubsubSinkOverride() throws IOException {
PipelineOptions options = buildPipelineOptions();
List<String> experiments =
new ArrayList<>(
ImmutableList.of(
GcpOptions.STREAMING_ENGINE_EXPERIMENT,
GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
"use_runner_v2"));
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setExperiments(experiments);
dataflowOptions.setStreaming(true);
Pipeline p = Pipeline.create(options);

List<PubsubMessage> testValues =
Arrays.asList(
new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap()));
PCollection<PubsubMessage> input =
p.apply("CreateValuesBytes", Create.of(testValues))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
p.run();

AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
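// Walk the expanded pipeline and verify the runner replaced PubsubIO's sink
// with its Dataflow-specific StreamingPubsubIOWrite primitive.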
p.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) {
if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
sawPubsubOverride.set(true);
}
}
});
assertTrue(sawPubsubOverride.get());
Member: We should never be using client-side overrides for v2, where we want it all to happen in the service. Is there necessary functionality in the override? It happens after the v2 pipeline is created anyhow I think...
}

@Test
public void testPubsubSinkDynamicOverride() throws IOException {
PipelineOptions options = buildPipelineOptions();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setStreaming(true);
dataflowOptions.setEnableDynamicPubsubDestinations(true);
Pipeline p = Pipeline.create(options);

List<PubsubMessage> testValues =
Arrays.asList(
new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap())
.withTopic(""));
PCollection<PubsubMessage> input =
p.apply("CreateValuesBytes", Create.of(testValues))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
input.apply(PubsubIO.writeMessagesDynamic());
p.run();

AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
p.traverseTopologically(
new PipelineVisitor.Defaults() {

@Override
public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) {
if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
sawPubsubOverride.set(true);
}
}
});
assertTrue(sawPubsubOverride.get());
}

static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory {
ExpansionApi.ExpansionResponse response;

@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({
Contributor: since this is new, can you avoid suppressing these?

Contributor Author: removed the nullness. Raw types suppression has to stay, as it's due to the base class.

"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
private final String timestampLabel;
private final String idLabel;
private final StreamingModeExecutionContext context;

PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) {
this.timestampLabel = timestampLabel;
this.idLabel = idLabel;
this.context = context;
}

/** A {@link SinkFactory.Registrar} for pubsub sinks. */
@AutoService(SinkFactory.Registrar.class)
public static class Registrar implements SinkFactory.Registrar {

@Override
public Map<String, SinkFactory> factories() {
PubsubDynamicSink.Factory factory = new Factory();
return ImmutableMap.of(
"PubsubDynamicSink",
factory,
"org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
factory);
}
}

static class Factory implements SinkFactory {
@Override
public PubsubDynamicSink create(
CloudObject spec,
Coder<?> coder,
@Nullable PipelineOptions options,
@Nullable DataflowExecutionContext executionContext,
DataflowOperationContext operationContext)
throws Exception {
String timestampLabel = getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");

return new PubsubDynamicSink(
timestampLabel,
idLabel,
(StreamingModeExecutionContext) checkArgumentNotNull(executionContext));
}
}

@Override
public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
return new PubsubDynamicSink.PubsubWriter();
}

class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
private final Map<String, Windmill.PubSubMessageBundle.Builder> outputBuilders;
private final ByteStringOutputStream stream; // Kept across adds for buffer reuse.

PubsubWriter() {
outputBuilders = Maps.newHashMap();
stream = new ByteStringOutputStream();
}

public ByteString getDataFromMessage(PubsubMessage formatted, ByteStringOutputStream stream)
throws IOException {
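// Re-encodes the Beam PubsubMessage as Windmill's Pubsub.PubsubMessage proto,
// writing into the shared stream and returning the accumulated bytes.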
Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
Contributor: any way to avoid this copy?

Contributor Author: Not sure - ByteString has copyFrom and readFrom. Don't see an obvious way to do this without a copy.

Contributor: I wonder if we should separately look into switching to ByteString within beam's PubsubMessage.
Map<String, String> attributeMap = formatted.getAttributeMap();
if (attributeMap != null) {
pubsubMessageBuilder.putAllAttributes(attributeMap);
}
pubsubMessageBuilder.build().writeTo(stream);
return stream.toByteStringAndReset();
}

public void close(Windmill.PubSubMessageBundle.Builder outputBuilder) throws IOException {
context.getOutputBuilder().addPubsubMessages(outputBuilder);
outputBuilder.clear();
}

@Override
public long add(WindowedValue<PubsubMessage> data) throws IOException {
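// Messages are bundled per destination topic: the first message seen for a
// topic creates a PubSubMessageBundle tagged with that topic (plus the
// timestamp/id attribute labels); later messages for the topic append to it.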
String dataTopic =
checkArgumentNotNull(
data.getValue().getTopic(), "No topic set for message when using dynamic topics.");
Preconditions.checkArgument(
!dataTopic.isEmpty(), "No topic set for message when using dynamic topics.");
ByteString byteString = getDataFromMessage(data.getValue(), stream);
Windmill.PubSubMessageBundle.Builder builder =
outputBuilders.computeIfAbsent(
dataTopic,
topic ->
context
.getOutputBuilder()
.addPubsubMessagesBuilder()
.setTopic(topic)
.setTimestampLabel(timestampLabel)
.setIdLabel(idLabel)
.setWithAttributes(true));
builder.addMessages(
Windmill.Message.newBuilder()
.setData(byteString)
.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
.build());
return byteString.size();
}

@Override
public void close() throws IOException {
outputBuilders.clear();
}

@Override
public void abort() throws IOException {
close();
}
}

@Override
public boolean supportsRestart() {
return true;
}
}