diff --git a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/TreeNodeProcessor.java b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/TreeNodeProcessor.java index 6d4465123..96b3054cf 100644 --- a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/TreeNodeProcessor.java +++ b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/TreeNodeProcessor.java @@ -107,18 +107,19 @@ private void saveNewRelations(TreeNodeResponse treeNodeResponse) { private TreeNodeRecord getNextTreeNode() { TreeNodeRecord treeNodeRecord = treeNodeRecordRepository .getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS) - .orElseGet(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.NOT_VISITED) - .orElseGet(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.MUTABLE_AND_ACTIVE) - .orElseThrow(() -> { - clientStatusConsumer.accept(COMPLETED); - return new EndOfLdesException("No fragments to mutable or new fragments to process -> LDES ends."); - }))); - - if (Objects.requireNonNull(treeNodeRecord.getTreeNodeStatus()) == TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS || - treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS || - treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.NOT_VISITED) { + .or(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.NOT_VISITED)) + .or(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.MUTABLE_AND_ACTIVE)) + .orElseThrow(() -> { + clientStatusConsumer.accept(COMPLETED); + return new EndOfLdesException("No fragments to mutable or new fragments to process -> LDES ends."); + }); + + TreeNodeStatus treeNodeStatus = Objects.requireNonNull(treeNodeRecord.getTreeNodeStatus()); + if (treeNodeStatus == TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS || + treeNodeStatus == TreeNodeStatus.IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS || + treeNodeStatus == TreeNodeStatus.NOT_VISITED) { clientStatusConsumer.accept(REPLICATING); - } else if (treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.MUTABLE_AND_ACTIVE) { + } else if (treeNodeStatus == TreeNodeStatus.MUTABLE_AND_ACTIVE) { clientStatusConsumer.accept(SYNCHRONISING); } diff --git a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/valueobject/TreeNodeStatus.java b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/valueobject/TreeNodeStatus.java index 8bfceea0a..ba13a8623 100644 --- a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/valueobject/TreeNodeStatus.java +++ b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/valueobject/TreeNodeStatus.java @@ -4,5 +4,8 @@ * Representation how much of a TreeNode has been processed */ public enum TreeNodeStatus { - NOT_VISITED, MUTABLE_AND_ACTIVE, IMMUTABLE_WITH_UNPROCESSED_MEMBERS, IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS + NOT_VISITED, + MUTABLE_AND_ACTIVE, + IMMUTABLE_WITH_UNPROCESSED_MEMBERS, + IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS } diff --git a/ldi-orchestrator/ldio-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/pipeline/creation/LdioObserver.java b/ldi-orchestrator/ldio-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/pipeline/creation/LdioObserver.java index 1dc4d88d2..57acf27a6 100644 --- a/ldi-orchestrator/ldio-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/pipeline/creation/LdioObserver.java +++ b/ldi-orchestrator/ldio-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/pipeline/creation/LdioObserver.java @@ -17,6 +17,7 @@ */ public class LdioObserver { private static final String LDIO_DATA_IN = "ldio_data_in"; + private static final String LDIO_DATA_OUT = "ldio_data_out"; private static final String LDIO_COMPONENT_NAME = "ldio_type"; private static final Logger log = LoggerFactory.getLogger(LdioObserver.class); private final String componentName; @@ -69,6 +70,13 @@ public void increment() { Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment(); } + public boolean hasProcessedAllData() { + final double dataIn = Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).count(); + final double dataOut = Metrics.counter(LDIO_DATA_OUT, PIPELINE_NAME, pipelineName).count(); + log.atDebug().log("Received data: {} - Sent data: {}", dataIn, dataOut); + return dataOut >= dataIn; + } + /** * Registers a pipeline and the component and initializes the metrics * diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClient.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClient.java index fc78f6826..2a88cdeb3 100644 --- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClient.java +++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClient.java @@ -13,18 +13,22 @@ import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; +import java.time.Duration; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import static java.util.concurrent.Executors.newSingleThreadExecutor; public class LdioLdesClient extends LdioInput { public static final String NAME = "Ldio:LdesClient"; + public static final String LDIO_SHUTDOWN_THREAD_NAME = "ldio-ldes-client-shutdown"; private final Logger log = LoggerFactory.getLogger(LdioLdesClient.class); private final MemberSupplier memberSupplier; private boolean threadRunning = true; + private final Supplier canGracefullyShutdownChecker; private boolean paused = false; private final boolean keepState; private final String pipelineName; @@ -37,10 +41,11 @@ public LdioLdesClient(ComponentExecutor componentExecutor, boolean keepState, ClientStatusConsumer clientStatusConsumer) { super(componentExecutor, null, ldioObserver, applicationEventPublisher); this.pipelineName = ldioObserver.getPipelineName(); + this.canGracefullyShutdownChecker = ldioObserver::hasProcessedAllData; this.memberSupplier = memberSupplier; - this.keepState = keepState; + this.keepState = keepState; this.clientStatusConsumer = clientStatusConsumer; - } + } @Override public void start() { @@ -77,7 +82,7 @@ private synchronized void checkPause() { while (paused) { try { this.wait(); - } catch (InterruptedException e) { + } catch (InterruptedException e) { log.error("Thread interrupted: {}", e.getMessage()); Thread.currentThread().interrupt(); } @@ -104,7 +109,17 @@ protected void pause() { } private void shutdownPipeline() { - log.info("SHUTTING DOWN pipeline {} because end of LDES has been reached", pipelineName); - applicationEventPublisher.publishEvent(new PipelineShutdownEvent(pipelineName)); + Thread.ofVirtual().name(LDIO_SHUTDOWN_THREAD_NAME).start(() -> { + while (Boolean.FALSE.equals(canGracefullyShutdownChecker.get())) { + try { + Thread.sleep(Duration.ofSeconds(30)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + log.info("SHUTTING DOWN pipeline {} because end of LDES has been reached", pipelineName); + updateStatus(PipelineStatusTrigger.HALT); + applicationEventPublisher.publishEvent(new PipelineShutdownEvent(pipelineName)); + }); } } diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientTest.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientTest.java index 1aae66552..d400a2f54 100644 --- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientTest.java +++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientTest.java @@ -13,6 +13,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.context.ApplicationEventPublisher; @@ -25,60 +26,78 @@ @ExtendWith(MockitoExtension.class) class LdioLdesClientTest { - @Mock - private ComponentExecutor componentExecutor; + @Mock + private ComponentExecutor componentExecutor; - @Mock - private LdioObserver observer; + @Mock + private LdioObserver observer; - @Mock - private MemberSupplier supplier; + @Mock + private MemberSupplier supplier; - @Mock - private ApplicationEventPublisher eventPublisher; + @Mock + private ApplicationEventPublisher eventPublisher; - @Mock - private ClientStatusConsumer clientStatusConsumer; + @Mock + private ClientStatusConsumer clientStatusConsumer; - private LdioLdesClient client; - private final String pipelineName = "pipeline"; + private LdioLdesClient client; + private final String pipelineName = "pipeline"; - @BeforeEach - void setUp() { - when(observer.getPipelineName()).thenReturn(pipelineName); - client = new LdioLdesClient( - componentExecutor, - observer, - supplier, - eventPublisher, - false, - clientStatusConsumer); - } + @BeforeEach + void setUp() { + when(observer.getPipelineName()).thenReturn(pipelineName); + client = new LdioLdesClient( + componentExecutor, + observer, + supplier, + eventPublisher, + false, + clientStatusConsumer); + } - @AfterEach - void tearDown() { - client.shutdown(); - } + @AfterEach + void tearDown() { + client.shutdown(); + } - @Test - void when_EndOfLdesException_ShutdownPipeline() { - when(supplier.get()).thenThrow(EndOfLdesException.class); + @Test + void when_EndOfLdesException_And_AllDataProcessed_ShutdownPipeline() { + when(observer.hasProcessedAllData()).thenReturn(true); + when(supplier.get()).thenThrow(EndOfLdesException.class); - client.start(); + client.start(); - verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName)); - } + InOrder inOrder = inOrder(eventPublisher); + inOrder.verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL)); + inOrder.verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL)); + inOrder.verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName)); + } - @Test - void when_RuntimeException_StopPipeline() { - doThrow(RuntimeException.class).when(supplier).init(); + @Test + void when_EndOfLdesException_ShutdownPipeline() { + when(observer.hasProcessedAllData()).thenReturn(false).thenReturn(true); + when(supplier.get()).thenThrow(EndOfLdesException.class); - client.start(); + client.start(); + verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL)); + + await().atMost(Duration.ofSeconds(40)).untilAsserted(() -> { + verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL)); + verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName)); + }); + } + + @Test + void when_RuntimeException_StopPipeline() { + doThrow(RuntimeException.class).when(supplier).init(); + + client.start(); await().atMost(Duration.ofSeconds(20)).untilAsserted(() -> { verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL)); verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL)); }); - } + } } \ No newline at end of file