Skip to content

Commit

Permalink
Implement cache for storing the session service and implement an evic…
Browse files Browse the repository at this point in the history
…tion strategy and close services more eagerly.
  • Loading branch information
bzablocki committed Nov 19, 2024
1 parent ebce33f commit 42a0cbd
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ public void close() {
if (messageReceiver != null) {
messageReceiver.close();
}
if (messageProducer != null) {
messageProducer.close();
}
if (!isClosed()) {
if (jcsmpSession != null) {
checkStateNotNull(jcsmpSession).closeSession();
}
return 0;
Expand All @@ -119,8 +116,9 @@ public MessageReceiver getReceiver() {
this.messageReceiver =
retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
this.messageReceiver.start();
}
return this.messageReceiver;
return checkStateNotNull(this.messageReceiver);
}

@Override
Expand All @@ -138,15 +136,10 @@ public java.util.Queue<PublishResult> getPublishedResultsQueue() {
return publishedResultsQueue;
}

@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
throws JCSMPException, IOException {

if (isClosed()) {
if (jcsmpSession == null) {
connectWriteSession(submissionMode);
}

Expand All @@ -165,9 +158,6 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}

Queue queue =
JCSMPFactory.onlyInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ public interface MessageReceiver {
*/
void start();

/**
* Returns {@literal true} if the message receiver is closed, {@literal false} otherwise.
*
* <p>A message receiver is closed when it is no longer able to receive messages.
*/
boolean isClosed();

/**
* Receives a message from the broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ public abstract class SessionService implements Serializable {
/** Gracefully closes the connection to the service. */
public abstract void close();

/**
* Checks whether the connection to the service is currently closed. This method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be created if this
* returns true.
*/
public abstract boolean isClosed();

/**
* Returns a MessageReceiver object for receiving messages from Solace. If it is the first time
* this method is used, the receiver is created from the session instance, otherwise it returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,8 @@
import java.io.IOException;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolaceMessageReceiver implements MessageReceiver {
private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class);

public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
private final FlowReceiver flowReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
Expand All @@ -52,28 +48,21 @@ private void startFlowReceiver() {
ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return flowReceiver == null || flowReceiver.isClosed();
}

@Override
public BytesXMLMessage receive() throws IOException {
try {
return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
} catch (StaleSessionException e) {
LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.");
startFlowReceiver();
throw new IOException(e);
throw new IOException(
"SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.", e);
} catch (JCSMPException e) {
throw new IOException(e);
}
}

@Override
public void close() {
if (!isClosed()) {
this.flowReceiver.close();
}
flowReceiver.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,29 @@
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
Expand All @@ -48,9 +59,10 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final UnboundedSolaceSource<T> currentSource;
private final WatermarkPolicy<T> watermarkPolicy;
private final SempClient sempClient;
private final UUID readerUuid;
private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
private @Nullable SessionService sessionService;
AtomicBoolean active = new AtomicBoolean(true);

/**
Expand All @@ -72,41 +84,76 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
*/
private Long surrogateId = 0L;

private static final Cache<UUID, SessionService> sessionServiceCache;
private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1);

static {
Duration cacheExpirationTimeout = Duration.ofMinutes(1);
sessionServiceCache =
CacheBuilder.newBuilder()
.expireAfterAccess(cacheExpirationTimeout)
.removalListener(
(RemovalNotification<UUID, SessionService> notification) -> {
LOG.info(
"SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.",
notification.getKey(),
cacheExpirationTimeout);
SessionService sessionService = notification.getValue();
if (sessionService != null) {
sessionService.close();
}
})
.build();

ScheduledFuture<?> scheduledFuture =
cleanUpThread.scheduleAtFixedRate(sessionServiceCache::cleanUp, 1, 1, TimeUnit.MINUTES);

LOG.info("bzablockilog is scheduled future done? {}", scheduledFuture.isDone());
}

public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
this.currentSource = currentSource;
this.watermarkPolicy =
WatermarkPolicy.create(
currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold());
this.sessionService = currentSource.getSessionServiceFactory().create();
this.sessionServiceFactory = currentSource.getSessionServiceFactory();
this.sempClient = currentSource.getSempClientFactory().create();
this.safeToAckMessages = new HashMap<>();
this.ackedMessageIds = new ConcurrentLinkedQueue<>();
this.readerUuid = UUID.randomUUID();
}

private SessionService getSessionService() {
try {
return sessionServiceCache.get(
readerUuid,
() -> {
LOG.info("SolaceIO.Read: creating a new session for reader with uuid {}.", readerUuid);
SessionService sessionService = sessionServiceFactory.create();
sessionService.connect();
sessionService.getReceiver().start();
return sessionService;
});
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean start() {
populateSession();
checkNotNull(sessionService).getReceiver().start();
// Create and initialize SessionService with Receiver
getSessionService();
return advance();
}

public void populateSession() {
if (sessionService == null) {
sessionService = getCurrentSource().getSessionServiceFactory().create();
}
if (sessionService.isClosed()) {
checkNotNull(sessionService).connect();
}
}

@Override
public boolean advance() {
// Retire state associated with ACKed messages.
retire();

BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = checkNotNull(sessionService).getReceiver().receive();
receivedXmlMessage = getSessionService().getReceiver().receive();
} catch (IOException e) {
LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e);
return false;
Expand All @@ -126,13 +173,13 @@ public boolean advance() {
@Override
public void close() {
active.set(false);
checkNotNull(sessionService).close();
sessionServiceCache.invalidate(readerUuid);
}

@Override
public Instant getWatermark() {
// should be only used by a test receiver
if (checkNotNull(sessionService).getReceiver().isEOF()) {
if (getSessionService().getReceiver().isEOF()) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
return watermarkPolicy.getWatermark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ public void close() {
throw new UnsupportedOperationException(exceptionMessage);
}

@Override
public boolean isClosed() {
throw new UnsupportedOperationException(exceptionMessage);
}

@Override
public MessageReceiver getReceiver() {
throw new UnsupportedOperationException(exceptionMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ public abstract Builder mockProducerFn(
@Override
public void close() {}

@Override
public boolean isClosed() {
return false;
}

@Override
public MessageReceiver getReceiver() {
if (messageReceiver == null) {
Expand Down Expand Up @@ -131,11 +126,6 @@ public MockReceiver(
@Override
public void start() {}

@Override
public boolean isClosed() {
return false;
}

@Override
public BytesXMLMessage receive() throws IOException {
return getRecordFn.apply(counter.getAndIncrement());
Expand Down

0 comments on commit 42a0cbd

Please sign in to comment.