Skip to content

Commit

Permalink
Pubsub dynamic topic destinations
Browse files Browse the repository at this point in the history
  • Loading branch information
TSultanov authored and reuvenlax committed Apr 22, 2023
1 parent 0924840 commit ca67490
Show file tree
Hide file tree
Showing 26 changed files with 1,472 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public class PTransformTranslation {
public static final String PUBSUB_READ = "beam:transform:pubsub_read:v1";
public static final String PUBSUB_WRITE = "beam:transform:pubsub_write:v1";

// URN of the v2 Pubsub write transform.
// NOTE(review): presumably identifies Pubsub writes whose topic is chosen per message (dynamic
// destinations), as opposed to PUBSUB_WRITE (v1) — confirm against the code that emits this URN.
public static final String PUBSUB_WRITE_DYNAMIC = "beam:transform:pubsub_write:v2";

// CombineComponents
public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
"beam:transform:combine_per_key_precombine:v1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1797,8 +1797,7 @@ public PubsubMessage apply(PubsubMessage input) {
* Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we can
* instead defer to Windmill's implementation.
*/
private static class StreamingPubsubIOWrite
extends PTransform<PCollection<PubsubMessage>, PDone> {
static class StreamingPubsubIOWrite extends PTransform<PCollection<PubsubMessage>, PDone> {

private final PubsubUnboundedSink transform;

Expand Down Expand Up @@ -1850,13 +1849,24 @@ private static void translate(
StepTranslationContext stepContext,
PCollection input) {
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider().isAccessible()) {
stepContext.addInput(
PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath());
if (overriddenTransform.getTopicProvider() != null) {
if (overriddenTransform.getTopicProvider().isAccessible()) {
stepContext.addInput(
PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath());
} else {
stepContext.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
} else {
stepContext.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
DataflowPipelineOptions options =
input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
if (options.getEnableDynamicPubsubDestinations()) {
stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true);
} else {
throw new RuntimeException(
"Dynamic Pubsub destinations not yet supported. Topic must be set.");
}
}
if (overriddenTransform.getTimestampAttribute() != null) {
stepContext.addInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,14 @@ public interface DataflowPipelineOptions
@Description("The customized dataflow worker jar")
String getDataflowWorkerJar();

/**
 * Sets the customized Dataflow worker jar.
 *
 * @param dataflowWorkerJar the value later returned by {@code getDataflowWorkerJar()}
 */
void setDataflowWorkerJar(String dataflowWorkerJar);

/**
 * Returns whether dynamic Pubsub destinations are allowed. Defaults to {@code false}.
 *
 * <p>Support is disabled for now until the Dataflow backend fully supports this option. This is
 * a temporary option and will be removed.
 */
@Description("Whether to allow dynamic pubsub destinations. Temporary option: will be removed.")
@Default.Boolean(false)
Boolean getEnableDynamicPubsubDestinations();

/**
 * Sets whether dynamic Pubsub destinations are allowed.
 *
 * @param enable {@code true} to allow dynamic Pubsub destinations
 */
void setEnableDynamicPubsubDestinations(Boolean enable);

/** Set of available Flexible Resource Scheduling goals. */
enum FlexResourceSchedulingGoal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ public class PropertyNames {
public static final String PARALLEL_INPUT = "parallel_input";
public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";

public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";

// Step property key marking a Pubsub write step as using dynamic destinations.
public static final String PUBSUB_DYNAMIC_DESTINATIONS = "pubsub_with_dynamic_destinations";

public static final String SCALAR_FIELD_NAME = "value";
public static final String SERIALIZED_FN = "serialized_fn";
public static final String SERIALIZED_TEST_STREAM = "serialized_test_stream";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
Expand Down Expand Up @@ -120,6 +121,8 @@
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
Expand Down Expand Up @@ -163,7 +166,10 @@
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
Expand Down Expand Up @@ -2341,6 +2347,74 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO
verifyGroupIntoBatchesOverrideBytes(p, true, true);
}

/**
 * Runs a streaming pipeline that writes to a fixed Pubsub topic and checks that, after the run,
 * the pipeline contains a primitive {@link DataflowRunner.StreamingPubsubIOWrite} transform.
 */
@Test
public void testPubsubSinkOverride() throws IOException {
  PipelineOptions pipelineOptions = buildPipelineOptions();
  DataflowPipelineOptions streamingOptions = pipelineOptions.as(DataflowPipelineOptions.class);
  streamingOptions.setExperiments(
      new ArrayList<>(
          ImmutableList.of(
              GcpOptions.STREAMING_ENGINE_EXPERIMENT,
              GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
              "use_runner_v2")));
  streamingOptions.setStreaming(true);
  Pipeline pipeline = Pipeline.create(pipelineOptions);

  // A single message, marked unbounded so that the streaming write path is exercised.
  PubsubMessage message =
      new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
  List<PubsubMessage> messages = Arrays.asList(message);
  PCollection<PubsubMessage> unboundedMessages =
      pipeline
          .apply("CreateValuesBytes", Create.of(messages))
          .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
  unboundedMessages.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
  pipeline.run();

  // Walk the pipeline and record whether the streaming Pubsub write override is present.
  AtomicBoolean overrideSeen = new AtomicBoolean(false);
  PipelineVisitor.Defaults overrideDetector =
      new PipelineVisitor.Defaults() {
        @Override
        public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) {
          if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
            overrideSeen.set(true);
          }
        }
      };
  pipeline.traverseTopologically(overrideDetector);
  assertTrue(overrideSeen.get());
}

/**
 * Runs a streaming pipeline that uses {@code PubsubIO.writeMessagesDynamic()} with dynamic Pubsub
 * destinations enabled and checks that, after the run, the pipeline contains a primitive {@link
 * DataflowRunner.StreamingPubsubIOWrite} transform.
 */
@Test
public void testPubsubSinkDynamicOverride() throws IOException {
  PipelineOptions pipelineOptions = buildPipelineOptions();
  DataflowPipelineOptions streamingOptions = pipelineOptions.as(DataflowPipelineOptions.class);
  streamingOptions.setStreaming(true);
  streamingOptions.setEnableDynamicPubsubDestinations(true);
  Pipeline pipeline = Pipeline.create(pipelineOptions);

  // A single message carrying its own (empty) topic, marked unbounded for the streaming path.
  PubsubMessage message =
      new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap())
          .withTopic("");
  List<PubsubMessage> messages = Arrays.asList(message);
  PCollection<PubsubMessage> unboundedMessages =
      pipeline
          .apply("CreateValuesBytes", Create.of(messages))
          .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
  unboundedMessages.apply(PubsubIO.writeMessagesDynamic());
  pipeline.run();

  // Walk the pipeline and record whether the streaming Pubsub write override is present.
  AtomicBoolean overrideSeen = new AtomicBoolean(false);
  PipelineVisitor.Defaults overrideDetector =
      new PipelineVisitor.Defaults() {
        @Override
        public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) {
          if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) {
            overrideSeen.set(true);
          }
        }
      };
  pipeline.traverseTopologically(overrideDetector);
  assertTrue(overrideSeen.get());
}

static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory {
ExpansionApi.ExpansionResponse response;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link Sink} of {@link PubsubMessage}s in which every message names its own destination
 * topic.
 *
 * <p>Messages are serialized and appended to one {@code Windmill.PubSubMessageBundle} per topic;
 * the bundles are created on the output builder of the {@link StreamingModeExecutionContext}
 * this sink was constructed with.
 */
@SuppressWarnings({
  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
  private final String timestampLabel;
  private final String idLabel;
  private final StreamingModeExecutionContext context;

  /**
   * @param timestampLabel value set as the timestamp label on every bundle this sink creates
   * @param idLabel value set as the id label on every bundle this sink creates
   * @param context execution context whose output builder receives the bundles
   */
  PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) {
    this.timestampLabel = timestampLabel;
    this.idLabel = idLabel;
    this.context = context;
  }

  /** A {@link SinkFactory.Registrar} for pubsub sinks. */
  @AutoService(SinkFactory.Registrar.class)
  public static class Registrar implements SinkFactory.Registrar {

    /** Registers one shared {@link Factory} under both the short and the fully-qualified name. */
    @Override
    public Map<String, SinkFactory> factories() {
      SinkFactory sharedFactory = new Factory();
      return ImmutableMap.of(
          "PubsubDynamicSink",
          sharedFactory,
          "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
          sharedFactory);
    }
  }

  /** Builds {@link PubsubDynamicSink}s from a {@link CloudObject} specification. */
  static class Factory implements SinkFactory {

    /**
     * Creates a sink from the timestamp and id attribute names found in {@code spec}; either
     * falls back to the empty string when absent.
     *
     * @param executionContext must be non-null and a {@link StreamingModeExecutionContext}
     */
    @Override
    public PubsubDynamicSink create(
        CloudObject spec,
        Coder<?> coder,
        @Nullable PipelineOptions options,
        @Nullable DataflowExecutionContext executionContext,
        DataflowOperationContext operationContext)
        throws Exception {
      String timestampAttribute = getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
      String idAttribute = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
      StreamingModeExecutionContext streamingContext =
          (StreamingModeExecutionContext) checkArgumentNotNull(executionContext);

      return new PubsubDynamicSink(timestampAttribute, idAttribute, streamingContext);
    }
  }

  /** Returns a fresh writer bound to this sink. */
  @Override
  public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
    return new PubsubWriter();
  }

  /** Writer that groups outgoing messages into one bundle builder per destination topic. */
  class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
    // Bundle builders created so far, keyed by topic.
    private final Map<String, Windmill.PubSubMessageBundle.Builder> bundlesByTopic =
        Maps.newHashMap();
    // Kept across adds for buffer reuse.
    private final ByteStringOutputStream scratchStream = new ByteStringOutputStream();

    PubsubWriter() {}

    /**
     * Serializes the payload and, when present, the attributes of {@code formatted} as a
     * {@code Pubsub.PubsubMessage} written through {@code stream}.
     *
     * @return the bytes written; {@code stream} is reset as part of producing them
     */
    public ByteString getDataFromMessage(PubsubMessage formatted, ByteStringOutputStream stream)
        throws IOException {
      Pubsub.PubsubMessage.Builder proto = Pubsub.PubsubMessage.newBuilder();
      proto.setData(ByteString.copyFrom(formatted.getPayload()));

      Map<String, String> attributes = formatted.getAttributeMap();
      if (attributes != null) {
        proto.putAllAttributes(attributes);
      }

      proto.build().writeTo(stream);
      return stream.toByteStringAndReset();
    }

    /** Adds {@code outputBuilder} to the context's output builder, then clears it. */
    public void close(Windmill.PubSubMessageBundle.Builder outputBuilder) throws IOException {
      context.getOutputBuilder().addPubsubMessages(outputBuilder);
      outputBuilder.clear();
    }

    /**
     * Appends one message to the bundle for the topic it carries, creating that bundle on first
     * use.
     *
     * @return the size in bytes of the serialized message
     * @throws IllegalArgumentException if the message has no topic or an empty topic
     */
    @Override
    public long add(WindowedValue<PubsubMessage> data) throws IOException {
      PubsubMessage message = data.getValue();
      String dataTopic =
          checkArgumentNotNull(
              message.getTopic(), "No topic set for message when using dynamic topics.");
      Preconditions.checkArgument(
          !dataTopic.isEmpty(), "No topic set for message when using dynamic topics.");

      ByteString serialized = getDataFromMessage(message, scratchStream);
      Windmill.PubSubMessageBundle.Builder bundle =
          bundlesByTopic.computeIfAbsent(dataTopic, this::startBundle);

      Windmill.Message windmillMessage =
          Windmill.Message.newBuilder()
              .setData(serialized)
              .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
              .build();
      bundle.addMessages(windmillMessage);
      return serialized.size();
    }

    /** Starts a new bundle for {@code topic} on the context's output builder. */
    private Windmill.PubSubMessageBundle.Builder startBundle(String topic) {
      return context
          .getOutputBuilder()
          .addPubsubMessagesBuilder()
          .setTopic(topic)
          .setTimestampLabel(timestampLabel)
          .setIdLabel(idLabel)
          .setWithAttributes(true);
    }

    /** Forgets every per-topic bundle builder tracked by this writer. */
    @Override
    public void close() throws IOException {
      bundlesByTopic.clear();
    }

    /** Aborting is handled exactly like {@link #close()}. */
    @Override
    public void abort() throws IOException {
      close();
    }
  }

  /** This sink reports that it supports restart. */
  @Override
  public boolean supportsRestart() {
    return true;
  }
}
Loading

0 comments on commit ca67490

Please sign in to comment.