Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pubsub topic validation #32582

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))

## I/Os

* PubsubIO can validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465))

## New Features / Improvements

* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline(
/** Return a list of topics for {@code project}. */
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;

/**
 * Return {@literal true} if {@code topic} exists.
 *
 * @param topic path of the topic to look up
 * @return {@code true} if the topic was found, {@code false} if it was not
 * @throws IOException if an implementation fails to complete the lookup
 */
public abstract boolean isTopicExists(TopicPath topic) throws IOException;

/** Create {@code subscription} to {@code topic}. */
public abstract void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand Down Expand Up @@ -372,6 +373,21 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
  // A GetTopic call that returns normally means the topic is there.
  GetTopicRequest lookup = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
  try {
    publisherStub().getTopic(lookup);
  } catch (StatusRuntimeException failure) {
    boolean missing = failure.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND;
    if (!missing) {
      // Any status other than NOT_FOUND is passed on to the caller unchanged.
      throw failure;
    }
    return false;
  }
  return true;
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -860,6 +861,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

/** Whether {@code validate} should check the topic; {@code newBuilder} sets this to false. */
abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
Expand All @@ -871,6 +874,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
builder.setNeedsOrderingKey(false);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(false);
return builder;
}

Expand Down Expand Up @@ -918,6 +922,8 @@ abstract static class Builder<T> {
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

/** Sets the flag returned by {@code getValidate()}. */
abstract Builder<T> setValidate(boolean validation);

abstract Read<T> build();
}

Expand Down Expand Up @@ -1097,6 +1103,11 @@ public Read<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler
.build();
}

/**
 * Returns a copy of this read with validation switched on, so that {@code validate} checks the
 * configured Pub/Sub topic.
 */
public Read<T> withValidation() {
  Builder<T> validating = toBuilder().setValidate(true);
  return validating.build();
}

@VisibleForTesting
/**
* Sets the internal Clock.
Expand Down Expand Up @@ -1262,6 +1273,35 @@ public T apply(PubsubMessage input) {
return read.setCoder(getCoder());
}

/**
 * Checks that the configured Pub/Sub topic exists, if validation was enabled with {@code
 * withValidation()}.
 *
 * <p>Nothing is checked when validation is disabled, when no topic provider is set, or when the
 * topic provider is not accessible yet (its value is only supplied later).
 *
 * @param options pipeline options, viewed as {@code PubsubOptions} to create the client
 * @throws IllegalArgumentException if the topic does not exist
 * @throws RuntimeException wrapping any failure raised while creating, using or closing the
 *     client
 */
@Override
public void validate(PipelineOptions options) {
  if (!getValidate()) {
    return;
  }

  ValueProvider<PubsubTopic> topicProvider = getTopicProvider();
  // Same guard as the eager topic check on Write: a provider whose value is not available yet
  // cannot be resolved here, so the existence check is skipped instead of failing on get().
  if (topicProvider == null || !topicProvider.isAccessible()) {
    return;
  }

  PubsubTopic topic = topicProvider.get();
  PubsubOptions psOptions = options.as(PubsubOptions.class);

  // Validate the existence of the topic.
  boolean topicExists;
  try (PubsubClient pubsubClient =
      getPubsubClientFactory()
          .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
    topicExists =
        pubsubClient.isTopicExists(PubsubClient.topicPathFromName(topic.project, topic.topic));
  } catch (Exception e) {
    throw new RuntimeException(e);
  }

  if (!topicExists) {
    throw new IllegalArgumentException(
        String.format("Pubsub topic '%s' does not exist.", topic));
  }
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Expand Down Expand Up @@ -1341,6 +1381,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

/** Whether {@code validate} should check the topic; {@code newBuilder} sets this to false. */
abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(
Expand All @@ -1350,6 +1392,7 @@ static <T> Builder<T> newBuilder(
builder.setFormatFn(formatFn);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(false);
return builder;
}

Expand Down Expand Up @@ -1386,6 +1429,8 @@ abstract Builder<T> setFormatFn(
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

/** Sets the flag returned by {@code getValidate()}. */
abstract Builder<T> setValidate(boolean validation);

abstract Write<T> build();
}

Expand All @@ -1396,18 +1441,28 @@ abstract Builder<T> setBadRecordErrorHandler(
* {@code topic} string.
*/
public Write<T> to(String topic) {
  // Wrap the string once and delegate. The ValueProvider overload already checks the topic
  // string of an accessible provider, and a static provider is accessible, so the malformed
  // topic check still happens right here without building and validating a second provider.
  return to(StaticValueProvider.of(topic));
}

/**
 * Like {@link #to(String)}, but takes the topic as a {@link ValueProvider}. The topic string is
 * checked immediately when the provider's value is already accessible.
 */
public Write<T> to(ValueProvider<String> topic) {
  validateTopic(topic);
  // Parsing into a PubsubTopic is deferred to whenever the nested provider is read.
  ValueProvider<PubsubTopic> parsedTopic = NestedValueProvider.of(topic, PubsubTopic::fromPath);
  Builder<T> updated =
      toBuilder()
          .setTopicProvider(parsedTopic)
          .setTopicFunction(null)
          .setDynamicDestinations(false);
  return updated.build();
}

/**
 * Parses {@code topic} right away when its value is accessible, so that whatever {@code
 * PubsubTopic.fromPath} rejects surfaces at the call site; does nothing otherwise.
 */
private static void validateTopic(ValueProvider<String> topic) {
  if (!topic.isAccessible()) {
    // The value is not available yet, so there is nothing to parse.
    return;
  }
  // Only the parse attempt matters here; the parsed topic itself is discarded.
  PubsubTopic.fromPath(topic.get());
}

/**
* Provides a function to dynamically specify the target topic per message. Not compatible with
* any of the other to methods. If {@link #to} is called again specifying a topic, then this
Expand Down Expand Up @@ -1497,6 +1552,11 @@ public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandle
.build();
}

/**
 * Returns a copy of this write with validation switched on, so that {@code validate} checks the
 * configured Pub/Sub topic.
 */
public Write<T> withValidation() {
  Builder<T> validating = toBuilder().setValidate(true);
  return validating.build();
}

@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null && !getDynamicDestinations()) {
Expand Down Expand Up @@ -1566,6 +1626,35 @@ public PDone expand(PCollection<T> input) {
throw new RuntimeException(); // cases are exhaustive.
}

/**
 * Checks that the configured Pub/Sub topic exists, if validation was enabled with {@code
 * withValidation()}.
 *
 * <p>Nothing is checked when validation is disabled, when no topic provider is set, or when the
 * topic provider is not accessible yet (its value is only supplied later).
 *
 * @param options pipeline options, viewed as {@code PubsubOptions} to create the client
 * @throws IllegalArgumentException if the topic does not exist
 * @throws RuntimeException wrapping any failure raised while creating, using or closing the
 *     client
 */
@Override
public void validate(PipelineOptions options) {
  if (!getValidate()) {
    return;
  }

  ValueProvider<PubsubTopic> topicProvider = getTopicProvider();
  // Same guard as validateTopic: a provider whose value is not available yet cannot be
  // resolved here, so the existence check is skipped instead of failing on get().
  if (topicProvider == null || !topicProvider.isAccessible()) {
    return;
  }

  PubsubTopic topic = topicProvider.get();
  PubsubOptions psOptions = options.as(PubsubOptions.class);

  // Validate the existence of the topic.
  boolean topicExists;
  try (PubsubClient pubsubClient =
      getPubsubClientFactory()
          .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
    topicExists =
        pubsubClient.isTopicExists(PubsubClient.topicPathFromName(topic.project, topic.topic));
  } catch (Exception e) {
    throw new RuntimeException(e);
  }

  if (!topicExists) {
    throw new IllegalArgumentException(
        String.format("Pubsub topic '%s' does not exist.", topic));
  }
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
Expand Down Expand Up @@ -310,6 +311,19 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
  // Fetch the topic; a normal return means it exists.
  try {
    pubsub.projects().topics().get(topic.getPath()).execute();
  } catch (GoogleJsonResponseException failure) {
    boolean notFound = failure.getStatusCode() == 404;
    if (!notFound) {
      // Every status other than 404 is passed on to the caller.
      throw failure;
    }
    return false;
  }
  return true;
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
throw new UnsupportedOperationException();
}

/**
 * Test stand-in for the topic lookup: reports every topic as existing, whatever {@code topic}
 * is passed.
 */
@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
// Always return true for testing purposes.
return true;
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.pubsub.v1.Topic;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand Down Expand Up @@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver<Schema> responseO
server.shutdownNow();
}
}

/**
 * Verifies that {@code isTopicExists} reports {@code false} when the service answers NOT_FOUND
 * and {@code true} when the service returns the topic.
 */
@Test
public void isTopicExists() throws IOException {
  initializeClient(null, null);
  TopicPath topicDoesNotExist =
      PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist");
  TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist");

  PublisherImplBase publisherImplBase =
      new PublisherImplBase() {
        @Override
        public void getTopic(GetTopicRequest request, StreamObserver<Topic> responseObserver) {
          String topicPath = request.getTopic();
          if (topicPath.equals(topicExists.getPath())) {
            responseObserver.onNext(
                Topic.newBuilder()
                    .setName(topicPath)
                    .setSchemaSettings(
                        SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build())
                    .build());
            responseObserver.onCompleted();
          } else {
            // Every other topic is answered with NOT_FOUND so that no request is ever left
            // without a response; an unanswered call would stall the test instead of failing it.
            responseObserver.onError(
                new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND)));
          }
        }
      };
  Server server =
      InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
  try {
    assertEquals(false, client.isTopicExists(topicDoesNotExist));

    assertEquals(true, client.isTopicExists(topicExists));

  } finally {
    server.shutdownNow();
  }
}
}
Loading
Loading