From 48520eaf53a760436df40c753058c6143e2efe9c Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 19 Nov 2024 17:02:39 +0100 Subject: [PATCH] Wrap message acknowledgment in a try-catch block, remove active AtomicBool from the reader. --- .../io/solace/read/SolaceCheckpointMark.java | 30 ++++++++++--------- .../io/solace/read/UnboundedSolaceReader.java | 5 +--- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index ba63f3f48f9e..7717032d94e4 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; @@ -29,6 +28,8 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be @@ -38,9 +39,9 @@ @Internal @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { + private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); private transient Map safeToAck; private transient Consumer confirmAckCallback; - private transient AtomicBoolean activeReader; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -50,30 +51,32 @@ private SolaceCheckpointMark() {} * * @param markAsAckedFn {@link Consumer} a reference to a method in the {@link * UnboundedSolaceReader} that will mark the message as acknowledged. - * @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The - * reader creating the messages has to be active to acknowledge the messages. * @param safeToAck {@link Map} of {@link BytesXMLMessage} to be * acknowledged. */ - SolaceCheckpointMark( - Consumer markAsAckedFn, - AtomicBoolean activeReader, - Map safeToAck) { + SolaceCheckpointMark(Consumer markAsAckedFn, Map safeToAck) { this.confirmAckCallback = markAsAckedFn; - this.activeReader = activeReader; this.safeToAck = safeToAck; } @Override public void finalizeCheckpoint() { - if (activeReader == null || !activeReader.get() || safeToAck == null) { + if (safeToAck == null) { return; } for (Entry entry : safeToAck.entrySet()) { BytesXMLMessage msg = entry.getValue(); if (msg != null) { - msg.ackMessage(); + try { + msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={} .", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + } confirmAckCallback.accept(entry.getKey()); } } @@ -92,12 +95,11 @@ public boolean equals(@Nullable Object o) { } SolaceCheckpointMark that = (SolaceCheckpointMark) o; return Objects.equals(safeToAck, that.safeToAck) - && Objects.equals(confirmAckCallback, that.confirmAckCallback) - && Objects.equals(activeReader, that.activeReader); + && Objects.equals(confirmAckCallback, that.confirmAckCallback); } @Override public int hashCode() { - return Objects.hash(safeToAck, confirmAckCallback, activeReader); + return Objects.hash(safeToAck, confirmAckCallback); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index c716f8638b5d..afc46a8ce355 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -33,7 +33,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.solace.broker.SempClient; @@ -62,7 +61,6 @@ class UnboundedSolaceReader extends UnboundedReader { private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; - AtomicBoolean active = new AtomicBoolean(true); /** * List of successfully ACKed message (surrogate) ids which need to be pruned from the above. @@ -173,7 +171,6 @@ public boolean advance() { @Override public void close() { - active.set(false); sessionServiceCache.invalidate(readerUuid); } @@ -191,7 +188,7 @@ public UnboundedSource.CheckpointMark getCheckpointMark() { // It's possible for a checkpoint to be taken but never finalized. // So we simply copy whatever safeToAckIds we currently have. Map snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages); - return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages); + return new SolaceCheckpointMark(this::markAsAcked, snapshotSafeToAckMessages); } @Override