Skip to content

Commit

Permalink
fix: unexpected end of ldes behavior (#687)
Browse files Browse the repository at this point in the history
  • Loading branch information
jobulcke authored Oct 3, 2024
1 parent 0240955 commit 735753d
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,19 @@ private void saveNewRelations(TreeNodeResponse treeNodeResponse) {
private TreeNodeRecord getNextTreeNode() {
TreeNodeRecord treeNodeRecord = treeNodeRecordRepository
.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS)
.orElseGet(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.NOT_VISITED)
.orElseGet(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.MUTABLE_AND_ACTIVE)
.orElseThrow(() -> {
clientStatusConsumer.accept(COMPLETED);
return new EndOfLdesException("No fragments to mutable or new fragments to process -> LDES ends.");
})));

if (Objects.requireNonNull(treeNodeRecord.getTreeNodeStatus()) == TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS ||
treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS ||
treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.NOT_VISITED) {
.or(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.NOT_VISITED))
.or(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.MUTABLE_AND_ACTIVE))
.orElseThrow(() -> {
clientStatusConsumer.accept(COMPLETED);
return new EndOfLdesException("No fragments to mutable or new fragments to process -> LDES ends.");
});

TreeNodeStatus treeNodeStatus = Objects.requireNonNull(treeNodeRecord.getTreeNodeStatus());
if (treeNodeStatus == TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS ||
treeNodeStatus == TreeNodeStatus.IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS ||
treeNodeStatus == TreeNodeStatus.NOT_VISITED) {
clientStatusConsumer.accept(REPLICATING);
} else if (treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.MUTABLE_AND_ACTIVE) {
} else if (treeNodeStatus == TreeNodeStatus.MUTABLE_AND_ACTIVE) {
clientStatusConsumer.accept(SYNCHRONISING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
* Representation how much of a TreeNode has been processed
*/
public enum TreeNodeStatus {
NOT_VISITED, MUTABLE_AND_ACTIVE, IMMUTABLE_WITH_UNPROCESSED_MEMBERS, IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS
NOT_VISITED,
MUTABLE_AND_ACTIVE,
IMMUTABLE_WITH_UNPROCESSED_MEMBERS,
IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
public class LdioObserver {
private static final String LDIO_DATA_IN = "ldio_data_in";
private static final String LDIO_DATA_OUT = "ldio_data_out";
private static final String LDIO_COMPONENT_NAME = "ldio_type";
private static final Logger log = LoggerFactory.getLogger(LdioObserver.class);
private final String componentName;
Expand Down Expand Up @@ -69,6 +70,13 @@ public void increment() {
Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment();
}

public boolean hasProcessedAllData() {
final double dataIn = Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).count();
final double dataOut = Metrics.counter(LDIO_DATA_OUT, PIPELINE_NAME, pipelineName).count();
log.atDebug().log("Received data: {} - Sent data: {}", dataIn, dataOut);
return dataOut >= dataIn;
}

/**
* Registers a pipeline and the component and initializes the metrics
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import static java.util.concurrent.Executors.newSingleThreadExecutor;

public class LdioLdesClient extends LdioInput {

public static final String NAME = "Ldio:LdesClient";
public static final String LDIO_SHUTDOWN_THREAD_NAME = "ldio-ldes-client-shutdown";

private final Logger log = LoggerFactory.getLogger(LdioLdesClient.class);

private final MemberSupplier memberSupplier;
private boolean threadRunning = true;
private final Supplier<Boolean> canGracefullyShutdownChecker;
private boolean paused = false;
private final boolean keepState;
private final String pipelineName;
Expand All @@ -37,10 +41,11 @@ public LdioLdesClient(ComponentExecutor componentExecutor,
boolean keepState, ClientStatusConsumer clientStatusConsumer) {
super(componentExecutor, null, ldioObserver, applicationEventPublisher);
this.pipelineName = ldioObserver.getPipelineName();
this.canGracefullyShutdownChecker = ldioObserver::hasProcessedAllData;
this.memberSupplier = memberSupplier;
this.keepState = keepState;
this.keepState = keepState;
this.clientStatusConsumer = clientStatusConsumer;
}
}

@Override
public void start() {
Expand Down Expand Up @@ -77,7 +82,7 @@ private synchronized void checkPause() {
while (paused) {
try {
this.wait();
} catch (InterruptedException e) {
} catch (InterruptedException e) {
log.error("Thread interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
}
Expand All @@ -104,7 +109,17 @@ protected void pause() {
}

private void shutdownPipeline() {
log.info("SHUTTING DOWN pipeline {} because end of LDES has been reached", pipelineName);
applicationEventPublisher.publishEvent(new PipelineShutdownEvent(pipelineName));
Thread.ofVirtual().name(LDIO_SHUTDOWN_THREAD_NAME).start(() -> {
while (Boolean.FALSE.equals(canGracefullyShutdownChecker.get())) {
try {
Thread.sleep(Duration.ofSeconds(30));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("SHUTTING DOWN pipeline {} because end of LDES has been reached", pipelineName);
updateStatus(PipelineStatusTrigger.HALT);
applicationEventPublisher.publishEvent(new PipelineShutdownEvent(pipelineName));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.ApplicationEventPublisher;
Expand All @@ -25,60 +26,78 @@
@ExtendWith(MockitoExtension.class)
class LdioLdesClientTest {

@Mock
private ComponentExecutor componentExecutor;
@Mock
private ComponentExecutor componentExecutor;

@Mock
private LdioObserver observer;
@Mock
private LdioObserver observer;

@Mock
private MemberSupplier supplier;
@Mock
private MemberSupplier supplier;

@Mock
private ApplicationEventPublisher eventPublisher;
@Mock
private ApplicationEventPublisher eventPublisher;

@Mock
private ClientStatusConsumer clientStatusConsumer;
@Mock
private ClientStatusConsumer clientStatusConsumer;

private LdioLdesClient client;
private final String pipelineName = "pipeline";
private LdioLdesClient client;
private final String pipelineName = "pipeline";

@BeforeEach
void setUp() {
when(observer.getPipelineName()).thenReturn(pipelineName);
client = new LdioLdesClient(
componentExecutor,
observer,
supplier,
eventPublisher,
false,
clientStatusConsumer);
}
@BeforeEach
void setUp() {
when(observer.getPipelineName()).thenReturn(pipelineName);
client = new LdioLdesClient(
componentExecutor,
observer,
supplier,
eventPublisher,
false,
clientStatusConsumer);
}

@AfterEach
void tearDown() {
client.shutdown();
}
@AfterEach
void tearDown() {
client.shutdown();
}

@Test
void when_EndOfLdesException_ShutdownPipeline() {
when(supplier.get()).thenThrow(EndOfLdesException.class);
@Test
void when_EndOfLdesException_And_AllDataProcessed_ShutdownPipeline() {
when(observer.hasProcessedAllData()).thenReturn(true);
when(supplier.get()).thenThrow(EndOfLdesException.class);

client.start();
client.start();

verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName));
}
InOrder inOrder = inOrder(eventPublisher);
inOrder.verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL));
inOrder.verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL));
inOrder.verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName));
}

@Test
void when_RuntimeException_StopPipeline() {
doThrow(RuntimeException.class).when(supplier).init();
@Test
void when_EndOfLdesException_ShutdownPipeline() {
when(observer.hasProcessedAllData()).thenReturn(false).thenReturn(true);
when(supplier.get()).thenThrow(EndOfLdesException.class);

client.start();
client.start();
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL));

await().atMost(Duration.ofSeconds(40)).untilAsserted(() -> {
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL));
verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName));
});
}

@Test
void when_RuntimeException_StopPipeline() {
doThrow(RuntimeException.class).when(supplier).init();

client.start();

await().atMost(Duration.ofSeconds(20)).untilAsserted(() -> {
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL));
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL));
});

}
}
}

0 comments on commit 735753d

Please sign in to comment.