diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index b2196dbf1067..d4c9a3ec6210 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -102,10 +102,7 @@ public void close() { if (messageReceiver != null) { messageReceiver.close(); } - if (messageProducer != null) { - messageProducer.close(); - } - if (!isClosed()) { + if (jcsmpSession != null) { checkStateNotNull(jcsmpSession).closeSession(); } return 0; @@ -119,8 +116,9 @@ public MessageReceiver getReceiver() { this.messageReceiver = retryCallableManager.retryCallable( this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + this.messageReceiver.start(); } - return this.messageReceiver; + return checkStateNotNull(this.messageReceiver); } @Override @@ -138,15 +136,10 @@ public java.util.Queue getPublishedResultsQueue() { return publishedResultsQueue; } - @Override - public boolean isClosed() { - return jcsmpSession == null || jcsmpSession.isClosed(); - } - private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode) throws JCSMPException, IOException { - if (isClosed()) { + if (jcsmpSession == null) { connectWriteSession(submissionMode); } @@ -165,9 +158,6 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode) } private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { - if (isClosed()) { - connectSession(); - } Queue queue = JCSMPFactory.onlyInstance() diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java index 95f989bd1be9..017a63260678 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -35,13 +35,6 @@ public interface MessageReceiver { */ void start(); - /** - * Returns {@literal true} if the message receiver is closed, {@literal false} otherwise. - * - *

A message receiver is closed when it is no longer able to receive messages. - */ - boolean isClosed(); - /** * Receives a message from the broker. * diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index 84a876a9d0bc..6dcd0b652616 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -120,13 +120,6 @@ public abstract class SessionService implements Serializable { /** Gracefully closes the connection to the service. */ public abstract void close(); - /** - * Checks whether the connection to the service is currently closed. This method is called when an - * `UnboundedSolaceReader` is starting to read messages - a session will be created if this - * returns true. - */ - public abstract boolean isClosed(); - /** * Returns a MessageReceiver object for receiving messages from Solace. If it is the first time * this method is used, the receiver is created from the session instance, otherwise it returns diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java index d548d2049a5b..d74f3cae89fe 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java @@ -24,12 +24,8 @@ import java.io.IOException; import org.apache.beam.sdk.io.solace.RetryCallableManager; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SolaceMessageReceiver implements MessageReceiver { - private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class); - public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100; private final FlowReceiver flowReceiver; private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); @@ -52,19 +48,14 @@ private void startFlowReceiver() { ImmutableSet.of(JCSMPException.class)); } - @Override - public boolean isClosed() { - return flowReceiver == null || flowReceiver.isClosed(); - } - @Override public BytesXMLMessage receive() throws IOException { try { return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS); } catch (StaleSessionException e) { - LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver."); startFlowReceiver(); - throw new IOException(e); + throw new IOException( + "SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.", e); } catch (JCSMPException e) { throw new IOException(e); } @@ -72,8 +63,6 @@ public BytesXMLMessage receive() throws IOException { @Override public void close() { - if (!isClosed()) { - this.flowReceiver.close(); - } + flowReceiver.close(); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index 77f6eed8f62c..a913fd6133ea 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,17 +18,16 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be @@ -38,10 +37,8 @@ @Internal @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { - private transient AtomicBoolean activeReader; - // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry - // these messages here. We relay on Solace's retry mechanism. - private transient ArrayDeque ackQueue; + private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); + private transient Queue safeToAck; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -49,25 +46,24 @@ private SolaceCheckpointMark() {} /** * Creates a new {@link SolaceCheckpointMark}. * - * @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The - * reader creating the messages has to be active to acknowledge the messages. - * @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged. + * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(AtomicBoolean activeReader, List ackQueue) { - this.activeReader = activeReader; - this.ackQueue = new ArrayDeque<>(ackQueue); + SolaceCheckpointMark(Queue safeToAck) { + this.safeToAck = safeToAck; } @Override public void finalizeCheckpoint() { - if (activeReader == null || !activeReader.get() || ackQueue == null) { - return; - } - - while (!ackQueue.isEmpty()) { - BytesXMLMessage msg = ackQueue.poll(); - if (msg != null) { + BytesXMLMessage msg; + while ((msg = safeToAck.poll()) != null) { + try { msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); } } } @@ -84,15 +80,11 @@ public boolean equals(@Nullable Object o) { return false; } SolaceCheckpointMark that = (SolaceCheckpointMark) o; - // Needed to convert to ArrayList because ArrayDeque.equals checks only for reference, not - // content. - ArrayList ackList = new ArrayList<>(ackQueue); - ArrayList thatAckList = new ArrayList<>(that.ackQueue); - return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList); + return Objects.equals(safeToAck, that.safeToAck); } @Override public int hashCode() { - return Objects.hash(activeReader, ackQueue); + return Objects.hash(safeToAck); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index a421970370da..dc84e0a07017 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -22,17 +22,26 @@ import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; import java.util.NoSuchElementException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.solace.broker.SempClient; import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -46,48 +55,92 @@ class UnboundedSolaceReader extends UnboundedReader { private final UnboundedSolaceSource currentSource; private final WatermarkPolicy watermarkPolicy; private final SempClient sempClient; + private final UUID readerUuid; + private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; - private @Nullable SessionService sessionService; - AtomicBoolean active = new AtomicBoolean(true); /** - * Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent - * queue, should only be accessed by the reader thread A given {@link UnboundedReader} object will - * only be accessed by a single thread at once. + * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: + * Accessed by both reader and checkpointing threads. */ - private final java.util.Queue elementsToCheckpoint = new ArrayDeque<>(); + private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + + /** + * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a + * {@link SolaceCheckpointMark}. + */ + private final Queue receivedMessages = new ArrayDeque<>(); + + private static final Cache sessionServiceCache; + private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); + + static { + Duration cacheExpirationTimeout = Duration.ofMinutes(1); + sessionServiceCache = + CacheBuilder.newBuilder() + .expireAfterAccess(cacheExpirationTimeout) + .removalListener( + (RemovalNotification notification) -> { + LOG.info( + "SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.", + notification.getKey(), + cacheExpirationTimeout); + SessionService sessionService = notification.getValue(); + if (sessionService != null) { + sessionService.close(); + } + }) + .build(); + + startCleanUpThread(); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private static void startCleanUpThread() { + cleanUpThread.scheduleAtFixedRate(sessionServiceCache::cleanUp, 1, 1, TimeUnit.MINUTES); + } public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { this.currentSource = currentSource; this.watermarkPolicy = WatermarkPolicy.create( currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); - this.sessionService = currentSource.getSessionServiceFactory().create(); + this.sessionServiceFactory = currentSource.getSessionServiceFactory(); this.sempClient = currentSource.getSempClientFactory().create(); + this.readerUuid = UUID.randomUUID(); + } + + private SessionService getSessionService() { + try { + return sessionServiceCache.get( + readerUuid, + () -> { + LOG.info("SolaceIO.Read: creating a new session for reader with uuid {}.", readerUuid); + SessionService sessionService = sessionServiceFactory.create(); + sessionService.connect(); + sessionService.getReceiver().start(); + return sessionService; + }); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } @Override public boolean start() { - populateSession(); - checkNotNull(sessionService).getReceiver().start(); + // Create and initialize SessionService with Receiver + getSessionService(); return advance(); } - public void populateSession() { - if (sessionService == null) { - sessionService = getCurrentSource().getSessionServiceFactory().create(); - } - if (sessionService.isClosed()) { - checkNotNull(sessionService).connect(); - } - } - @Override public boolean advance() { + finalizeReadyMessages(); + BytesXMLMessage receivedXmlMessage; try { - receivedXmlMessage = checkNotNull(sessionService).getReceiver().receive(); + receivedXmlMessage = getSessionService().getReceiver().receive(); } catch (IOException e) { LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e); return false; @@ -96,23 +149,40 @@ public boolean advance() { if (receivedXmlMessage == null) { return false; } - elementsToCheckpoint.add(receivedXmlMessage); solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - watermarkPolicy.update(solaceMappedRecord); + receivedMessages.add(receivedXmlMessage); + return true; } @Override public void close() { - active.set(false); - checkNotNull(sessionService).close(); + finalizeReadyMessages(); + sessionServiceCache.invalidate(readerUuid); + } + + public void finalizeReadyMessages() { + BytesXMLMessage msg; + while ((msg = safeToAckMessages.poll()) != null) { + try { + msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + safeToAckMessages.add(msg); // In case the error was transient, might succeed later + break; // Commit is only best effort + } + } } @Override public Instant getWatermark() { // should be only used by a test receiver - if (checkNotNull(sessionService).getReceiver().isEOF()) { + if (getSessionService().getReceiver().isEOF()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } return watermarkPolicy.getWatermark(); @@ -120,14 +190,9 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - List ackQueue = new ArrayList<>(); - while (!elementsToCheckpoint.isEmpty()) { - BytesXMLMessage msg = elementsToCheckpoint.poll(); - if (msg != null) { - ackQueue.add(msg); - } - } - return new SolaceCheckpointMark(active, ackQueue); + safeToAckMessages.addAll(receivedMessages); + receivedMessages.clear(); + return new SolaceCheckpointMark(safeToAckMessages); } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index 38b4953a5984..7631d32f63cc 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -40,11 +40,6 @@ public void close() { throw new UnsupportedOperationException(exceptionMessage); } - @Override - public boolean isClosed() { - throw new UnsupportedOperationException(exceptionMessage); - } - @Override public MessageReceiver getReceiver() { throw new UnsupportedOperationException(exceptionMessage); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index bd52dee7ea86..6d28bcefc84c 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -77,11 +77,6 @@ public abstract Builder mockProducerFn( @Override public void close() {} - @Override - public boolean isClosed() { - return false; - } - @Override public MessageReceiver getReceiver() { if (messageReceiver == null) { @@ -131,11 +126,6 @@ public MockReceiver( @Override public void start() {} - @Override - public boolean isClosed() { - return false; - } - @Override public BytesXMLMessage receive() throws IOException { return getRecordFn.apply(counter.getAndIncrement()); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index c718c55e1b48..a1f80932eddf 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -447,25 +447,29 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // start the reader and move to the first record assertTrue(reader.start()); - // consume 3 messages (NB: start already consumed the first message) + // consume 3 messages (NB: #start() already consumed the first message) for (int i = 0; i < 3; i++) { assertTrue(String.format("Failed at %d-th message", i), reader.advance()); } - // create checkpoint but don't finalize yet + // #advance() was called, but the messages were not ready to be acknowledged. + assertEquals(0, countAckMessages.get()); + + // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 2 more messages - reader.advance(); + // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. reader.advance(); + assertEquals(4, countAckMessages.get()); - // check if messages are still not acknowledged - assertEquals(0, countAckMessages.get()); + // consume 1 more message. No change in the acknowledged messages. + reader.advance(); + assertEquals(4, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); - - // only messages from the first checkpoint are acknowledged + // No change in the acknowledged messages, because they were acknowledged in the #advance() + // method. assertEquals(4, countAckMessages.get()); }