Skip to content

Commit

Permalink
Revert "SolaceIO.Read: handle occasional cases when finalizeCheckpoin…
Browse files Browse the repository at this point in the history
…t is not executed (apache#32962)"

This reverts commit e279e55.
  • Loading branch information
bzablocki committed Dec 2, 2024
1 parent 0ec76af commit 3a60f42
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public void close() {
if (messageReceiver != null) {
messageReceiver.close();
}
if (jcsmpSession != null) {
if (messageProducer != null) {
messageProducer.close();
}
if (!isClosed()) {
checkStateNotNull(jcsmpSession).closeSession();
}
return 0;
Expand All @@ -116,9 +119,8 @@ public MessageReceiver getReceiver() {
this.messageReceiver =
retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
this.messageReceiver.start();
}
return checkStateNotNull(this.messageReceiver);
return this.messageReceiver;
}

@Override
Expand All @@ -136,10 +138,15 @@ public java.util.Queue<PublishResult> getPublishedResultsQueue() {
return publishedResultsQueue;
}

@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
throws JCSMPException, IOException {

if (jcsmpSession == null) {
if (isClosed()) {
connectWriteSession(submissionMode);
}

Expand All @@ -158,6 +165,9 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}

Queue queue =
JCSMPFactory.onlyInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface MessageReceiver {
*/
void start();

/**
* Returns {@literal true} if the message receiver is closed, {@literal false} otherwise.
*
* <p>A message receiver is closed when it is no longer able to receive messages.
*/
boolean isClosed();

/**
* Receives a message from the broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ public abstract class SessionService implements Serializable {
/** Gracefully closes the connection to the service. */
public abstract void close();

/**
* Checks whether the connection to the service is currently closed. This method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be created if this
* returns true.
*/
public abstract boolean isClosed();

/**
* Returns a MessageReceiver object for receiving messages from Solace. If it is the first time
* this method is used, the receiver is created from the session instance, otherwise it returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import java.io.IOException;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolaceMessageReceiver implements MessageReceiver {
private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class);

public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
private final FlowReceiver flowReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
Expand All @@ -48,21 +52,28 @@ private void startFlowReceiver() {
ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return flowReceiver == null || flowReceiver.isClosed();
}

@Override
public BytesXMLMessage receive() throws IOException {
try {
return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
} catch (StaleSessionException e) {
LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.");
startFlowReceiver();
throw new IOException(
"SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.", e);
throw new IOException(e);
} catch (JCSMPException e) {
throw new IOException(e);
}
}

@Override
public void close() {
flowReceiver.close();
if (!isClosed()) {
this.flowReceiver.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be
Expand All @@ -37,33 +38,36 @@
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient Queue<BytesXMLMessage> safeToAck;
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We relay on Solace's retry mechanism.
private transient ArrayDeque<BytesXMLMessage> ackQueue;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}

/**
* Creates a new {@link SolaceCheckpointMark}.
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
* @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The
* reader creating the messages has to be active to acknowledge the messages.
* @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged.
*/
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> ackQueue) {
this.activeReader = activeReader;
this.ackQueue = new ArrayDeque<>(ackQueue);
}

@Override
public void finalizeCheckpoint() {
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
if (activeReader == null || !activeReader.get() || ackQueue == null) {
return;
}

while (!ackQueue.isEmpty()) {
BytesXMLMessage msg = ackQueue.poll();
if (msg != null) {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
}
}
Expand All @@ -80,11 +84,15 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
return Objects.equals(safeToAck, that.safeToAck);
// Needed to convert to ArrayList because ArrayDeque.equals checks only for reference, not
// content.
ArrayList<BytesXMLMessage> ackList = new ArrayList<>(ackQueue);
ArrayList<BytesXMLMessage> thatAckList = new ArrayList<>(that.ackQueue);
return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList);
}

@Override
public int hashCode() {
return Objects.hash(safeToAck);
return Objects.hash(activeReader, ackQueue);
}
}
Loading

0 comments on commit 3a60f42

Please sign in to comment.