Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed (#32962)" #33259

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public void close() {
if (messageReceiver != null) {
messageReceiver.close();
}
if (jcsmpSession != null) {
if (messageProducer != null) {
messageProducer.close();
}
if (!isClosed()) {
checkStateNotNull(jcsmpSession).closeSession();
}
return 0;
Expand All @@ -116,9 +119,8 @@ public MessageReceiver getReceiver() {
this.messageReceiver =
retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
this.messageReceiver.start();
}
return checkStateNotNull(this.messageReceiver);
return this.messageReceiver;
}

@Override
Expand All @@ -136,10 +138,15 @@ public java.util.Queue<PublishResult> getPublishedResultsQueue() {
return publishedResultsQueue;
}

@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
throws JCSMPException, IOException {

if (jcsmpSession == null) {
if (isClosed()) {
connectWriteSession(submissionMode);
}

Expand All @@ -158,6 +165,9 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}

Queue queue =
JCSMPFactory.onlyInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface MessageReceiver {
*/
void start();

/**
* Returns {@literal true} if the message receiver is closed, {@literal false} otherwise.
*
* <p>A message receiver is closed when it is no longer able to receive messages.
*/
boolean isClosed();

/**
* Receives a message from the broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ public abstract class SessionService implements Serializable {
/** Gracefully closes the connection to the service. */
public abstract void close();

/**
* Checks whether the connection to the service is currently closed. This method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be created if this
* returns true.
*/
public abstract boolean isClosed();

/**
* Returns a MessageReceiver object for receiving messages from Solace. If it is the first time
* this method is used, the receiver is created from the session instance, otherwise it returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import java.io.IOException;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolaceMessageReceiver implements MessageReceiver {
private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class);

public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
private final FlowReceiver flowReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
Expand All @@ -48,21 +52,28 @@ private void startFlowReceiver() {
ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return flowReceiver == null || flowReceiver.isClosed();
}

@Override
public BytesXMLMessage receive() throws IOException {
try {
return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
} catch (StaleSessionException e) {
LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.");
startFlowReceiver();
throw new IOException(
"SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.", e);
throw new IOException(e);
} catch (JCSMPException e) {
throw new IOException(e);
}
}

@Override
public void close() {
flowReceiver.close();
if (!isClosed()) {
this.flowReceiver.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be
Expand All @@ -37,33 +38,36 @@
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient Queue<BytesXMLMessage> safeToAck;
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We relay on Solace's retry mechanism.
private transient ArrayDeque<BytesXMLMessage> ackQueue;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}

/**
* Creates a new {@link SolaceCheckpointMark}.
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
* @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The
* reader creating the messages has to be active to acknowledge the messages.
* @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged.
*/
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> ackQueue) {
this.activeReader = activeReader;
this.ackQueue = new ArrayDeque<>(ackQueue);
}

@Override
public void finalizeCheckpoint() {
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
if (activeReader == null || !activeReader.get() || ackQueue == null) {
return;
}

while (!ackQueue.isEmpty()) {
BytesXMLMessage msg = ackQueue.poll();
if (msg != null) {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
}
}
Expand All @@ -80,11 +84,15 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
return Objects.equals(safeToAck, that.safeToAck);
// Needed to convert to ArrayList because ArrayDeque.equals checks only for reference, not
// content.
ArrayList<BytesXMLMessage> ackList = new ArrayList<>(ackQueue);
ArrayList<BytesXMLMessage> thatAckList = new ArrayList<>(that.ackQueue);
return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList);
}

@Override
public int hashCode() {
return Objects.hash(safeToAck);
return Objects.hash(activeReader, ackQueue);
}
}
Loading
Loading