From f952ed4eeef97cd5beb131b483447ac2e92a0195 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 29 Oct 2024 18:28:57 +0100 Subject: [PATCH] SolaceIO.Read: Make sure the messageReceiver is always initialized in a case of a random connection drop --- .../io/solace/read/UnboundedSolaceReader.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index c18a9d110b2a..cd06e25b83a2 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -71,36 +71,39 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { @Override public boolean start() { - populateSession(); - populateMessageConsumer(); + getOrInitializeSessionService(); + getOrInitializeMessageReceiver(); return advance(); } - public void populateSession() { + public SessionService getOrInitializeSessionService() { if (sessionService == null) { sessionService = getCurrentSource().getSessionServiceFactory().create(); } - if (sessionService.isClosed()) { - checkNotNull(sessionService).connect(); + SessionService service = checkNotNull(sessionService); + if (service.isClosed()) { + service.connect(); } + return service; } - private void populateMessageConsumer() { + private MessageReceiver getOrInitializeMessageReceiver() { if (messageReceiver == null) { - messageReceiver = checkNotNull(sessionService).createReceiver(); + messageReceiver = getOrInitializeSessionService().createReceiver(); messageReceiver.start(); } MessageReceiver receiver = checkNotNull(messageReceiver); if (receiver.isClosed()) { receiver.start(); } + return receiver; } @Override public boolean advance() { BytesXMLMessage receivedXmlMessage; try { - receivedXmlMessage = checkNotNull(messageReceiver).receive(); + receivedXmlMessage = getOrInitializeMessageReceiver().receive(); } catch (IOException e) { LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e); return false; @@ -119,13 +122,15 @@ public boolean advance() { @Override public void close() { active.set(false); - checkNotNull(sessionService).close(); + if (sessionService != null && !sessionService.isClosed()) { + checkNotNull(sessionService).close(); + } } @Override public Instant getWatermark() { - // should be only used by a test receiver - if (checkNotNull(messageReceiver).isEOF()) { + // should be only used by a test receiver in unit/integration tests + if (getOrInitializeMessageReceiver().isEOF()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } return watermarkPolicy.getWatermark();