diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index 7717032d94e4..a913fd6133ea 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,10 +18,8 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; -import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; -import java.util.function.Consumer; +import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -40,8 +38,7 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient Map safeToAck; - private transient Consumer confirmAckCallback; + private transient Queue safeToAck; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -49,35 +46,24 @@ private SolaceCheckpointMark() {} /** * Creates a new {@link SolaceCheckpointMark}. * - * @param markAsAckedFn {@link Consumer} a reference to a method in the {@link - * UnboundedSolaceReader} that will mark the message as acknowledged. - * @param safeToAck {@link Map} of {@link BytesXMLMessage} to be - * acknowledged. + * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(Consumer markAsAckedFn, Map safeToAck) { - this.confirmAckCallback = markAsAckedFn; + SolaceCheckpointMark(Queue safeToAck) { this.safeToAck = safeToAck; } @Override public void finalizeCheckpoint() { - if (safeToAck == null) { - return; - } - - for (Entry entry : safeToAck.entrySet()) { - BytesXMLMessage msg = entry.getValue(); - if (msg != null) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={} .", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - } - confirmAckCallback.accept(entry.getKey()); + BytesXMLMessage msg; + while ((msg = safeToAck.poll()) != null) { + try { + msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); } } } @@ -94,12 +80,11 @@ public boolean equals(@Nullable Object o) { return false; } SolaceCheckpointMark that = (SolaceCheckpointMark) o; - return Objects.equals(safeToAck, that.safeToAck) - && Objects.equals(confirmAckCallback, that.confirmAckCallback); + return Objects.equals(safeToAck, that.safeToAck); } @Override public int hashCode() { - return Objects.hash(safeToAck, confirmAckCallback); + return Objects.hash(safeToAck); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index afc46a8ce355..dc84e0a07017 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -23,8 +23,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayDeque; import java.util.NoSuchElementException; import java.util.Queue; import java.util.UUID; @@ -43,7 +42,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -63,23 +61,16 @@ class UnboundedSolaceReader extends UnboundedReader { private @Nullable T solaceMappedRecord; /** - * List of successfully ACKed message (surrogate) ids which need to be pruned from the above. - * CAUTION: Accessed by both reader and checkpointing threads. + * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: + * Accessed by both reader and checkpointing threads. */ - private final Queue ackedMessageIds; + private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); /** - * Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a - * non-concurrent object, should only be accessed by the reader thread. + * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a + * {@link SolaceCheckpointMark}. */ - private final Map safeToAckMessages; - - /** - * Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged - * ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link - * UnboundedSolaceReader#ackedMessageIds}). - */ - private Long surrogateId = 0L; + private final Queue receivedMessages = new ArrayDeque<>(); private static final Cache sessionServiceCache; private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); @@ -117,8 +108,6 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); this.sessionServiceFactory = currentSource.getSessionServiceFactory(); this.sempClient = currentSource.getSempClientFactory().create(); - this.safeToAckMessages = new HashMap<>(); - this.ackedMessageIds = new ConcurrentLinkedQueue<>(); this.readerUuid = UUID.randomUUID(); } @@ -147,8 +136,7 @@ public boolean start() { @Override public boolean advance() { - // Retire state associated with ACKed messages. - retire(); + finalizeReadyMessages(); BytesXMLMessage receivedXmlMessage; try { @@ -163,17 +151,34 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - safeToAckMessages.put(surrogateId, receivedXmlMessage); - surrogateId++; + receivedMessages.add(receivedXmlMessage); return true; } @Override public void close() { + finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); } + public void finalizeReadyMessages() { + BytesXMLMessage msg; + while ((msg = safeToAckMessages.poll()) != null) { + try { + msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + safeToAckMessages.add(msg); // In case the error was transient, might succeed later + break; // Commit is only best effort + } + } + } + @Override public Instant getWatermark() { // should be only used by a test receiver @@ -185,10 +190,9 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - // It's possible for a checkpoint to be taken but never finalized. - // So we simply copy whatever safeToAckIds we currently have. - Map snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages); - return new SolaceCheckpointMark(this::markAsAcked, snapshotSafeToAckMessages); + safeToAckMessages.addAll(receivedMessages); + receivedMessages.clear(); + return new SolaceCheckpointMark(safeToAckMessages); } @Override @@ -238,21 +242,4 @@ public long getTotalBacklogBytes() { return BACKLOG_UNKNOWN; } } - - public void markAsAcked(Long messageSurrogateId) { - ackedMessageIds.add(messageSurrogateId); - } - - /** - * Messages which have been ACKed (via the checkpoint finalize) can be safely removed from the - * list of messages to acknowledge. - */ - private void retire() { - while (!ackedMessageIds.isEmpty()) { - Long ackMessageId = ackedMessageIds.poll(); - if (ackMessageId != null) { - safeToAckMessages.remove(ackMessageId); - } - } - } }