Skip to content

Commit

Permalink
Store messages in a Queue that is referenced from the CheckpointMark
Browse files Browse the repository at this point in the history
  • Loading branch information
bzablocki committed Nov 21, 2024
1 parent 48520ea commit 036dd62
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
Expand All @@ -40,44 +38,32 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient Map<Long, BytesXMLMessage> safeToAck;
private transient Consumer<Long> confirmAckCallback;
private transient Queue<BytesXMLMessage> safeToAck;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}

/**
* Creates a new {@link SolaceCheckpointMark}.
*
* @param markAsAckedFn {@link Consumer<Long>} a reference to a method in the {@link
* UnboundedSolaceReader} that will mark the message as acknowledged.
* @param safeToAck {@link Map<Long, BytesXMLMessage>} of {@link BytesXMLMessage} to be
* acknowledged.
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
*/
SolaceCheckpointMark(Consumer<Long> markAsAckedFn, Map<Long, BytesXMLMessage> safeToAck) {
this.confirmAckCallback = markAsAckedFn;
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
}

@Override
public void finalizeCheckpoint() {
if (safeToAck == null) {
return;
}

for (Entry<Long, BytesXMLMessage> entry : safeToAck.entrySet()) {
BytesXMLMessage msg = entry.getValue();
if (msg != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={} .",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
confirmAckCallback.accept(entry.getKey());
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
}
}
Expand All @@ -94,12 +80,11 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
return Objects.equals(safeToAck, that.safeToAck)
&& Objects.equals(confirmAckCallback, that.confirmAckCallback);
return Objects.equals(safeToAck, that.safeToAck);
}

@Override
public int hashCode() {
return Objects.hash(safeToAck, confirmAckCallback);
return Objects.hash(safeToAck);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
Expand All @@ -43,7 +42,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -63,23 +61,16 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private @Nullable T solaceMappedRecord;

/**
* List of successfully ACKed message (surrogate) ids which need to be pruned from the above.
* CAUTION: Accessed by both reader and checkpointing threads.
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
* Accessed by both reader and checkpointing threads.
*/
private final Queue<Long> ackedMessageIds;
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();

/**
* Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a
* non-concurrent object, should only be accessed by the reader thread.
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
* {@link SolaceCheckpointMark}.
*/
private final Map<Long, BytesXMLMessage> safeToAckMessages;

/**
* Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged
* ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link
* UnboundedSolaceReader#ackedMessageIds}).
*/
private Long surrogateId = 0L;
private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();

private static final Cache<UUID, SessionService> sessionServiceCache;
private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1);
Expand Down Expand Up @@ -117,8 +108,6 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold());
this.sessionServiceFactory = currentSource.getSessionServiceFactory();
this.sempClient = currentSource.getSempClientFactory().create();
this.safeToAckMessages = new HashMap<>();
this.ackedMessageIds = new ConcurrentLinkedQueue<>();
this.readerUuid = UUID.randomUUID();
}

Expand Down Expand Up @@ -147,8 +136,7 @@ public boolean start() {

@Override
public boolean advance() {
// Retire state associated with ACKed messages.
retire();
finalizeReadyMessages();

BytesXMLMessage receivedXmlMessage;
try {
Expand All @@ -163,17 +151,34 @@ public boolean advance() {
}
solaceOriginalRecord = receivedXmlMessage;
solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
safeToAckMessages.put(surrogateId, receivedXmlMessage);
surrogateId++;
receivedMessages.add(receivedXmlMessage);

return true;
}

@Override
public void close() {
finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
}

public void finalizeReadyMessages() {
BytesXMLMessage msg;
while ((msg = safeToAckMessages.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
break; // Commit is only best effort
}
}
}

@Override
public Instant getWatermark() {
// should be only used by a test receiver
Expand All @@ -185,10 +190,9 @@ public Instant getWatermark() {

@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckIds we currently have.
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages);
return new SolaceCheckpointMark(this::markAsAcked, snapshotSafeToAckMessages);
safeToAckMessages.addAll(receivedMessages);
receivedMessages.clear();
return new SolaceCheckpointMark(safeToAckMessages);
}

@Override
Expand Down Expand Up @@ -238,21 +242,4 @@ public long getTotalBacklogBytes() {
return BACKLOG_UNKNOWN;
}
}

public void markAsAcked(Long messageSurrogateId) {
ackedMessageIds.add(messageSurrogateId);
}

/**
* Messages which have been ACKed (via the checkpoint finalize) can be safely removed from the
* list of messages to acknowledge.
*/
private void retire() {
while (!ackedMessageIds.isEmpty()) {
Long ackMessageId = ackedMessageIds.poll();
if (ackMessageId != null) {
safeToAckMessages.remove(ackMessageId);
}
}
}
}

0 comments on commit 036dd62

Please sign in to comment.