Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add topic existing validation #32465

Merged
merged 10 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Check that the Pub/Sub topic exists before Read/Write (Java) ([#32465](https://github.com/apache/beam/pull/32465)).
proost marked this conversation as resolved.
Show resolved Hide resolved

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline(
/** Return a list of topics for {@code project}. */
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;

/**
 * Return {@literal true} if {@code topic} exists.
 *
 * @param topic path of the topic to look up
 * @return {@code true} if the topic exists, {@code false} otherwise
 * @throws IOException declared so that implementations can report a failed lookup
 */
public abstract boolean isTopicExists(TopicPath topic) throws IOException;

/** Create {@code subscription} to {@code topic}. */
public abstract void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand Down Expand Up @@ -372,6 +373,21 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
try {
publisherStub().getTopic(request);
return true;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
return false;
}

throw e;
Copy link
Contributor

@Abacn Abacn Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. Pipeline construction environment may not have access to the PubSub. Here it should either be fail safe or not enabled by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Abacn
Oh, I missed that point. How about disabled as default? Checking topic existence is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about disabled as default?

sounds good

}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -861,6 +862,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

/** Whether {@code validate(PipelineOptions)} performs the topic-existence check. */
abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
Expand All @@ -872,6 +875,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
builder.setNeedsOrderingKey(false);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
proost marked this conversation as resolved.
Show resolved Hide resolved
return builder;
}

Expand Down Expand Up @@ -919,6 +923,8 @@ abstract static class Builder<T> {
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

/** Sets the validation flag; {@code false} is what {@code withoutValidation()} uses. */
abstract Builder<T> setValidate(boolean validation);

abstract Read<T> build();
}

Expand All @@ -944,6 +950,7 @@ public Read<T> fromSubscription(ValueProvider<String> subscription) {
return toBuilder()
.setSubscriptionProvider(
NestedValueProvider.of(subscription, PubsubSubscription::fromPath))
.setValidate(true)
.build();
}

Expand All @@ -967,6 +974,7 @@ public Read<T> fromTopic(ValueProvider<String> topic) {
validateTopic(topic);
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
.setValidate(true)
.build();
}

Expand Down Expand Up @@ -1010,6 +1018,7 @@ public Read<T> withDeadLetterTopic(ValueProvider<String> deadLetterTopic) {
return toBuilder()
.setDeadLetterTopicProvider(
NestedValueProvider.of(deadLetterTopic, PubsubTopic::fromPath))
.setValidate(true)
.build();
}

Expand All @@ -1027,7 +1036,7 @@ private static void validateTopic(ValueProvider<String> topic) {
* PubsubGrpcClientFactory}.
*/
public Read<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setPubsubClientFactory(factory).build();
}

/**
Expand Down Expand Up @@ -1059,7 +1068,7 @@ public Read<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
* @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
*/
public Read<T> withTimestampAttribute(String timestampAttribute) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setTimestampAttribute(timestampAttribute).build();
}

/**
Expand All @@ -1072,7 +1081,7 @@ public Read<T> withTimestampAttribute(String timestampAttribute) {
* delivered, and deduplication of the stream will be strictly best effort.
*/
public Read<T> withIdAttribute(String idAttribute) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setIdAttribute(idAttribute).build();
}

/**
Expand All @@ -1082,7 +1091,7 @@ public Read<T> withIdAttribute(String idAttribute) {
* PCollection#setCoder(Coder)}.
*/
public Read<T> withCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setCoder(coder).setParseFn(parseFn).build();
}

/**
Expand All @@ -1095,9 +1104,15 @@ public Read<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler
return toBuilder()
.setBadRecordErrorHandler(badRecordErrorHandler)
.setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
.setValidate(true)
.build();
}

/** Turns off the check that the configured topic exists. */
public Read<T> withoutValidation() {
  Builder<T> unvalidated = toBuilder().setValidate(false);
  return unvalidated.build();
}

@VisibleForTesting
/**
* Sets the internal Clock.
Expand Down Expand Up @@ -1258,6 +1273,35 @@ public void process() {
return read.setCoder(getCoder());
}

/**
 * Checks at pipeline construction time that the configured topic exists.
 *
 * <p>Does nothing when validation is disabled, when no topic is configured, or when the topic is
 * a {@link ValueProvider} whose value is not accessible yet (for example a runtime parameter).
 *
 * @param options pipeline options, viewed as {@link PubsubOptions} to create the Pub/Sub client
 * @throws IllegalArgumentException if the topic does not exist
 * @throws RuntimeException if the existence check itself fails; the original failure is the cause
 */
@Override
public void validate(PipelineOptions options) {
  if (!getValidate()) {
    return;
  }

  // A topic that is only known at runtime cannot be resolved here: calling get() on an
  // inaccessible provider would fail, so the existence check is limited to accessible values.
  if (getTopicProvider() == null || !getTopicProvider().isAccessible()) {
    return;
  }

  PubsubOptions psOptions = options.as(PubsubOptions.class);
  PubsubTopic topic = getTopicProvider().get();

  boolean topicExists;
  try (PubsubClient pubsubClient =
      getPubsubClientFactory().newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
    topicExists =
        pubsubClient.isTopicExists(PubsubClient.topicPathFromName(topic.project, topic.topic));
  } catch (Exception e) {
    // Keep the cause, and say which topic was being checked.
    throw new RuntimeException(
        String.format("Failed to check whether Pubsub topic '%s' exists.", topic), e);
  }

  // Thrown outside the try block so it is not re-wrapped by the catch above.
  if (!topicExists) {
    throw new IllegalArgumentException(
        String.format("Pubsub topic '%s' does not exist.", topic));
  }
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Expand Down Expand Up @@ -1337,6 +1381,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

/** Whether {@code validate(PipelineOptions)} performs the topic-existence check. */
abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(
Expand All @@ -1346,6 +1392,7 @@ static <T> Builder<T> newBuilder(
builder.setFormatFn(formatFn);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
proost marked this conversation as resolved.
Show resolved Hide resolved
return builder;
}

Expand Down Expand Up @@ -1382,6 +1429,8 @@ abstract Builder<T> setFormatFn(
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

/** Sets the validation flag; {@code false} is what {@code withoutValidation()} uses. */
abstract Builder<T> setValidate(boolean validation);

abstract Write<T> build();
}

Expand All @@ -1392,15 +1441,19 @@ abstract Builder<T> setBadRecordErrorHandler(
* {@code topic} string.
*/
public Write<T> to(String topic) {
  // to(ValueProvider) validates the topic path whenever the value is accessible, and a static
  // provider always is, so the string is still checked eagerly without validating it twice.
  return to(StaticValueProvider.of(topic));
}

/**
 * Like {@link #to(String)} but with a {@link ValueProvider}.
 *
 * <p>The topic path is checked immediately when the provider's value is accessible. Selecting a
 * fixed topic clears any topic function and turns dynamic destinations off.
 */
public Write<T> to(ValueProvider<String> topic) {
  validateTopic(topic);
  // The validation flag is not reset here: the builder starts from this transform's current
  // settings, and forcing it to true would undo an earlier withoutValidation() call.
  return toBuilder()
      .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
      .setTopicFunction(null)
      .setDynamicDestinations(false)
      .build();
}

Expand All @@ -1414,17 +1467,25 @@ public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> topicFun
.setTopicProvider(null)
.setTopicFunction(v -> PubsubTopic.fromPath(topicFunction.apply(v)))
.setDynamicDestinations(true)
.setValidate(true)
.build();
}

/**
 * Parses the topic path eagerly when its value is already accessible; the parsed result is
 * discarded. Providers whose value is not accessible yet are skipped.
 */
private static void validateTopic(ValueProvider<String> topic) {
  if (!topic.isAccessible()) {
    return;
  }
  String topicPath = topic.get();
  PubsubTopic.fromPath(topicPath);
}

/**
* The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link
* PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client
* by providing another {@link PubsubClient.PubsubClientFactory} like the {@link
* PubsubGrpcClientFactory}.
*/
public Write<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setPubsubClientFactory(factory).build();
}

/**
Expand All @@ -1439,15 +1500,15 @@ public Write<T> withClientFactory(PubsubClient.PubsubClientFactory factory) {
* hit.
*/
public Write<T> withMaxBatchSize(int batchSize) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setMaxBatchSize(batchSize).build();
}

/**
* Writes to Pub/Sub are limited by 10mb in general. This attribute controls the maximum allowed
* bytes to be sent to Pub/Sub in a single batched message.
*/
public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
}

/**
Expand All @@ -1461,7 +1522,7 @@ public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
* these timestamps from the appropriate attribute.
*/
public Write<T> withTimestampAttribute(String timestampAttribute) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setTimestampAttribute(timestampAttribute).build();
}

/**
Expand All @@ -1473,11 +1534,11 @@ public Write<T> withTimestampAttribute(String timestampAttribute) {
* these unique identifiers from the appropriate attribute.
*/
public Write<T> withIdAttribute(String idAttribute) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setIdAttribute(idAttribute).build();
}

/** Returns a copy of this transform with the given Pub/Sub root URL set. */
public Write<T> withPubsubRootUrl(String pubsubRootUrl) {
  // The builder starts from this transform's current settings, so the validation flag is left
  // untouched: forcing it to true here would silently undo an earlier withoutValidation() call.
  return toBuilder().setPubsubRootUrl(pubsubRootUrl).build();
}

/**
Expand All @@ -1490,9 +1551,18 @@ public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandle
return toBuilder()
.setBadRecordErrorHandler(badRecordErrorHandler)
.setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
.setValidate(true)
.build();
}

/**
 * Turns off the check that the topic exists. That check only applies when the topic is set
 * statically; a dynamically chosen topic is never checked.
 */
public Write<T> withoutValidation() {
  Builder<T> unvalidated = toBuilder().setValidate(false);
  return unvalidated.build();
}

@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null && !getDynamicDestinations()) {
Expand Down Expand Up @@ -1569,6 +1639,35 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
}

/**
 * Checks at pipeline construction time that the statically configured topic exists.
 *
 * <p>Does nothing when validation is disabled, when no topic provider is set, or when the topic
 * is a {@link ValueProvider} whose value is not accessible yet (for example a runtime parameter).
 *
 * @param options pipeline options, viewed as {@link PubsubOptions} to create the Pub/Sub client
 * @throws IllegalArgumentException if the topic does not exist
 * @throws RuntimeException if the existence check itself fails; the original failure is the cause
 */
@Override
public void validate(PipelineOptions options) {
  if (!getValidate()) {
    return;
  }

  // A topic that is only known at runtime cannot be resolved here: calling get() on an
  // inaccessible provider would fail, so the existence check is limited to accessible values.
  if (getTopicProvider() == null || !getTopicProvider().isAccessible()) {
    return;
  }

  PubsubOptions psOptions = options.as(PubsubOptions.class);
  PubsubTopic topic = getTopicProvider().get();

  boolean topicExists;
  try (PubsubClient pubsubClient =
      getPubsubClientFactory().newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
    topicExists =
        pubsubClient.isTopicExists(PubsubClient.topicPathFromName(topic.project, topic.topic));
  } catch (Exception e) {
    // Keep the cause, and say which topic was being checked.
    throw new RuntimeException(
        String.format("Failed to check whether Pubsub topic '%s' exists.", topic), e);
  }

  // Thrown outside the try block so it is not re-wrapped by the catch above.
  if (!topicExists) {
    throw new IllegalArgumentException(
        String.format("Pubsub topic '%s' does not exist.", topic));
  }
}

/**
* Writer to Pubsub which batches messages from bounded collections.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
Expand Down Expand Up @@ -310,6 +311,19 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

/**
 * Looks the topic up with a get request. A 404 response is reported as {@code false}; any other
 * {@link GoogleJsonResponseException} is rethrown unchanged.
 */
@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
  try {
    pubsub.projects().topics().get(topic.getPath()).execute();
  } catch (GoogleJsonResponseException notFoundOrOther) {
    if (notFoundOrOther.getStatusCode() != 404) {
      throw notFoundOrOther;
    }
    return false;
  }
  // The request completed without an error response, so the topic is there.
  return true;
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Loading
Loading