Skip to content

Commit

Permalink
Wrap message acknowledgment in a try-catch block, remove active Atomi…
Browse files Browse the repository at this point in the history
…cBool from the reader.
  • Loading branch information
bzablocki committed Nov 19, 2024
1 parent da95027 commit 48520ea
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be
Expand All @@ -38,9 +39,9 @@
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient Map<Long, BytesXMLMessage> safeToAck;
private transient Consumer<Long> confirmAckCallback;
private transient AtomicBoolean activeReader;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
Expand All @@ -50,30 +51,32 @@ private SolaceCheckpointMark() {}
*
* @param markAsAckedFn {@link Consumer<Long>} a reference to a method in the {@link
* UnboundedSolaceReader} that will mark the message as acknowledged.
* @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The
* reader creating the messages has to be active to acknowledge the messages.
* @param safeToAck {@link Map<Long, BytesXMLMessage>} of {@link BytesXMLMessage} to be
* acknowledged.
*/
SolaceCheckpointMark(
Consumer<Long> markAsAckedFn,
AtomicBoolean activeReader,
Map<Long, BytesXMLMessage> safeToAck) {
SolaceCheckpointMark(Consumer<Long> markAsAckedFn, Map<Long, BytesXMLMessage> safeToAck) {
this.confirmAckCallback = markAsAckedFn;
this.activeReader = activeReader;
this.safeToAck = safeToAck;
}

@Override
public void finalizeCheckpoint() {
if (activeReader == null || !activeReader.get() || safeToAck == null) {
if (safeToAck == null) {
return;
}

for (Entry<Long, BytesXMLMessage> entry : safeToAck.entrySet()) {
BytesXMLMessage msg = entry.getValue();
if (msg != null) {
msg.ackMessage();
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={} .",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
confirmAckCallback.accept(entry.getKey());
}
}
Expand All @@ -92,12 +95,11 @@ public boolean equals(@Nullable Object o) {
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
return Objects.equals(safeToAck, that.safeToAck)
&& Objects.equals(confirmAckCallback, that.confirmAckCallback)
&& Objects.equals(activeReader, that.activeReader);
&& Objects.equals(confirmAckCallback, that.confirmAckCallback);
}

@Override
public int hashCode() {
return Objects.hash(safeToAck, confirmAckCallback, activeReader);
return Objects.hash(safeToAck, confirmAckCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.solace.broker.SempClient;
Expand Down Expand Up @@ -62,7 +61,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
AtomicBoolean active = new AtomicBoolean(true);

/**
* List of successfully ACKed message (surrogate) ids which need to be pruned from the above.
Expand Down Expand Up @@ -173,7 +171,6 @@ public boolean advance() {

@Override
public void close() {
active.set(false);
sessionServiceCache.invalidate(readerUuid);
}

Expand All @@ -191,7 +188,7 @@ public UnboundedSource.CheckpointMark getCheckpointMark() {
// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckIds we currently have.
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages);
return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages);
return new SolaceCheckpointMark(this::markAsAcked, snapshotSafeToAckMessages);
}

@Override
Expand Down

0 comments on commit 48520ea

Please sign in to comment.