From c0f6ac980bae1071fa6a072d17325c5e736e0cb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Fri, 19 Jul 2024 16:47:56 +0200 Subject: [PATCH] PubsubMessageWithTopicCoder.of() is returning wrong coder (#31619) * PubsubMessageWithTopicCoder was never used * refactor --- .../java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 2 +- .../beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java | 4 ++-- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 6 +++--- .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 01848d92d928..6233cf669080 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1488,7 +1488,7 @@ public PDone expand(PCollection input) { .get(BAD_RECORD_TAG) .setCoder(BadRecord.getCoder(input.getPipeline()))); PCollection pubsubMessages = - pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(new PubsubMessageWithTopicCoder()); + pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(PubsubMessageWithTopicCoder.of()); switch (input.isBounded()) { case BOUNDED: pubsubMessages.apply( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java index d10b9a2f1066..768aebe54e65 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java @@ -45,8 +45,8 @@ public static Coder of(TypeDescriptor ignored) { return of(); } - public static PubsubMessageWithAttributesAndMessageIdCoder of() { - return new PubsubMessageWithAttributesAndMessageIdCoder(); + public static PubsubMessageWithTopicCoder of() { + return new PubsubMessageWithTopicCoder(); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index fe6338a501c4..3027db6aee9d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -732,7 +732,7 @@ public void testWriteMalformedMessagesWithErrorHandler() throws Exception { PCollection messages = pipeline.apply( Create.timestamped(ImmutableList.of(pubsubMsg, failingPubsubMsg)) - .withCoder(new PubsubMessageWithTopicCoder())); + .withCoder(PubsubMessageWithTopicCoder.of())); messages.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); ErrorHandler> badRecordErrorHandler = pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); @@ -882,7 +882,7 @@ public void testDynamicTopics(boolean isBounded) throws IOException { PCollection messages = pipeline.apply( - Create.timestamped(pubsubMessages).withCoder(new PubsubMessageWithTopicCoder())); + Create.timestamped(pubsubMessages).withCoder(PubsubMessageWithTopicCoder.of())); if (!isBounded) { messages = messages.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); } @@ -919,7 +919,7 @@ public void testBigMessageBounded() throws IOException { PCollection messages = pipeline.apply( Create.timestamped(ImmutableList.of(pubsubMsg)) - .withCoder(new PubsubMessageWithTopicCoder())); + .withCoder(PubsubMessageWithTopicCoder.of())); messages.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); messages.apply(PubsubIO.writeMessagesDynamic().withClientFactory(factory)); pipeline.run(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index c9b6bae45b98..be68083bb28c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -223,7 +223,7 @@ public void testDynamicTopics() throws IOException { Instant.ofEpochMilli(o.getTimestampMsSinceEpoch()))) .collect(Collectors.toList()); - p.apply(Create.timestamped(pubsubMessages).withCoder(new PubsubMessageWithTopicCoder())) + p.apply(Create.timestamped(pubsubMessages).withCoder(PubsubMessageWithTopicCoder.of())) .apply(sink); p.run(); }