From ef532179d32805c2f26fdab3e81c8cc9e5f49de2 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 18 Sep 2024 10:57:22 -0400 Subject: [PATCH] Fix JmsIO NPE (#32489) --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 060e660b0847..610e76c78416 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -586,7 +587,7 @@ public boolean requiresDeduping() { } static class UnboundedJmsReader extends UnboundedReader { - + private static final byte[] EMPTY = new byte[0]; private UnboundedJmsSource source; @VisibleForTesting JmsCheckpointMark.Preparer checkpointMarkPreparer; private Connection connection; @@ -604,7 +605,7 @@ public UnboundedJmsReader(UnboundedJmsSource source, PipelineOptions options) this.source = source; this.checkpointMarkPreparer = JmsCheckpointMark.newPreparer(); this.currentMessage = null; - this.currentID = new byte[0]; + this.currentID = EMPTY; this.options = options; } @@ -684,18 +685,22 @@ public boolean advance() throws IOException { currentTimestamp = new Instant(message.getJMSTimestamp()); String messageID = message.getJMSMessageID(); - if (this.source.spec.isRequiresDeduping()) { - // per JMS specification, message ID has prefix "id:". The runner use it to dedup message. - // Empty or non-exist message id (possible for optimization configuration set) will induce - // data loss. - if (messageID.length() <= 3) { - throw new RuntimeException( - String.format( - "Invalid JMSMessageID %s while requiresDeduping is set. Data loss possible.", - messageID)); + if (messageID != null) { + if (this.source.spec.isRequiresDeduping()) { + // per JMS specification, message ID has prefix "id:". The runner use it to dedup + // message. Empty or non-exist message id (possible for optimization configuration set) + // will cause data loss. + if (messageID.length() <= 3) { + throw new RuntimeException( + String.format( + "Invalid JMSMessageID %s while requiresDeduping is set. Data loss possible.", + messageID)); + } } + currentID = messageID.getBytes(StandardCharsets.UTF_8); + } else { + currentID = EMPTY; } - currentID = messageID.getBytes(StandardCharsets.UTF_8); return true; } catch (Exception e) { @@ -728,6 +733,12 @@ public Instant getCurrentTimestamp() { public byte[] getCurrentRecordId() { if (currentMessage == null) { throw new NoSuchElementException(); + } else if (currentID == EMPTY && this.source.spec.isRequiresDeduping()) { + LOG.warn( + "Empty JMSRecordID received when requiresDeduping enabled, runner deduplication will" + + " not be effective"); + // Return a random UUID to ensure it won't get dedup + currentID = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); } return currentID; }