Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unexpected end of ldes behavior #687

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,19 @@ private void saveNewRelations(TreeNodeResponse treeNodeResponse) {
private TreeNodeRecord getNextTreeNode() {
TreeNodeRecord treeNodeRecord = treeNodeRecordRepository
.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS)
.orElseGet(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.NOT_VISITED)
.orElseGet(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.MUTABLE_AND_ACTIVE)
.orElseThrow(() -> {
clientStatusConsumer.accept(COMPLETED);
return new EndOfLdesException("No fragments to mutable or new fragments to process -> LDES ends.");
})));

if (Objects.requireNonNull(treeNodeRecord.getTreeNodeStatus()) == TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS ||
treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS ||
treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.NOT_VISITED) {
.or(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.NOT_VISITED))
.or(() -> treeNodeRecordRepository.getTreeNodeRecordWithStatusAndEarliestNextVisit(TreeNodeStatus.MUTABLE_AND_ACTIVE))
.orElseThrow(() -> {
clientStatusConsumer.accept(COMPLETED);
return new EndOfLdesException("No fragments to mutable or new fragments to process -> LDES ends.");
});

TreeNodeStatus treeNodeStatus = Objects.requireNonNull(treeNodeRecord.getTreeNodeStatus());
if (treeNodeStatus == TreeNodeStatus.IMMUTABLE_WITH_UNPROCESSED_MEMBERS ||
treeNodeStatus == TreeNodeStatus.IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS ||
treeNodeStatus == TreeNodeStatus.NOT_VISITED) {
clientStatusConsumer.accept(REPLICATING);
} else if (treeNodeRecord.getTreeNodeStatus() == TreeNodeStatus.MUTABLE_AND_ACTIVE) {
} else if (treeNodeStatus == TreeNodeStatus.MUTABLE_AND_ACTIVE) {
clientStatusConsumer.accept(SYNCHRONISING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
* Representation how much of a TreeNode has been processed
*/
public enum TreeNodeStatus {
NOT_VISITED, MUTABLE_AND_ACTIVE, IMMUTABLE_WITH_UNPROCESSED_MEMBERS, IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS
NOT_VISITED,
MUTABLE_AND_ACTIVE,
IMMUTABLE_WITH_UNPROCESSED_MEMBERS,
IMMUTABLE_WITHOUT_UNPROCESSED_MEMBERS
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
public class LdioObserver {
private static final String LDIO_DATA_IN = "ldio_data_in";
private static final String LDIO_DATA_OUT = "ldio_data_out";
private static final String LDIO_COMPONENT_NAME = "ldio_type";
private static final Logger log = LoggerFactory.getLogger(LdioObserver.class);
private final String componentName;
Expand Down Expand Up @@ -69,6 +70,13 @@ public void increment() {
Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment();
}

public boolean hasProcessedAllData() {
final double dataIn = Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).count();
final double dataOut = Metrics.counter(LDIO_DATA_OUT, PIPELINE_NAME, pipelineName).count();
log.atDebug().log("Received data: {} - Sent data: {}", dataIn, dataOut);
return dataOut >= dataIn;
}

/**
* Registers a pipeline and the component and initializes the metrics
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import static java.util.concurrent.Executors.newSingleThreadExecutor;

public class LdioLdesClient extends LdioInput {

public static final String NAME = "Ldio:LdesClient";
public static final String LDIO_SHUTDOWN_THREAD_NAME = "ldio-ldes-client-shutdown";

private final Logger log = LoggerFactory.getLogger(LdioLdesClient.class);

private final MemberSupplier memberSupplier;
private boolean threadRunning = true;
private final Supplier<Boolean> canGracefullyShutdownChecker;
private boolean paused = false;
private final boolean keepState;
private final String pipelineName;
Expand All @@ -37,10 +41,11 @@ public LdioLdesClient(ComponentExecutor componentExecutor,
boolean keepState, ClientStatusConsumer clientStatusConsumer) {
super(componentExecutor, null, ldioObserver, applicationEventPublisher);
this.pipelineName = ldioObserver.getPipelineName();
this.canGracefullyShutdownChecker = ldioObserver::hasProcessedAllData;
this.memberSupplier = memberSupplier;
this.keepState = keepState;
this.keepState = keepState;
this.clientStatusConsumer = clientStatusConsumer;
}
}

@Override
public void start() {
Expand Down Expand Up @@ -77,7 +82,7 @@ private synchronized void checkPause() {
while (paused) {
try {
this.wait();
} catch (InterruptedException e) {
} catch (InterruptedException e) {
log.error("Thread interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
}
Expand All @@ -104,7 +109,17 @@ protected void pause() {
}

private void shutdownPipeline() {
log.info("SHUTTING DOWN pipeline {} because end of LDES has been reached", pipelineName);
applicationEventPublisher.publishEvent(new PipelineShutdownEvent(pipelineName));
Thread.ofVirtual().name(LDIO_SHUTDOWN_THREAD_NAME).start(() -> {
while (Boolean.FALSE.equals(canGracefullyShutdownChecker.get())) {
try {
Thread.sleep(Duration.ofSeconds(30));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("SHUTTING DOWN pipeline {} because end of LDES has been reached", pipelineName);
updateStatus(PipelineStatusTrigger.HALT);
applicationEventPublisher.publishEvent(new PipelineShutdownEvent(pipelineName));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.ApplicationEventPublisher;
Expand All @@ -25,60 +26,78 @@
@ExtendWith(MockitoExtension.class)
class LdioLdesClientTest {

@Mock
private ComponentExecutor componentExecutor;
@Mock
private ComponentExecutor componentExecutor;

@Mock
private LdioObserver observer;
@Mock
private LdioObserver observer;

@Mock
private MemberSupplier supplier;
@Mock
private MemberSupplier supplier;

@Mock
private ApplicationEventPublisher eventPublisher;
@Mock
private ApplicationEventPublisher eventPublisher;

@Mock
private ClientStatusConsumer clientStatusConsumer;
@Mock
private ClientStatusConsumer clientStatusConsumer;

private LdioLdesClient client;
private final String pipelineName = "pipeline";
private LdioLdesClient client;
private final String pipelineName = "pipeline";

@BeforeEach
void setUp() {
when(observer.getPipelineName()).thenReturn(pipelineName);
client = new LdioLdesClient(
componentExecutor,
observer,
supplier,
eventPublisher,
false,
clientStatusConsumer);
}
@BeforeEach
void setUp() {
when(observer.getPipelineName()).thenReturn(pipelineName);
client = new LdioLdesClient(
componentExecutor,
observer,
supplier,
eventPublisher,
false,
clientStatusConsumer);
}

@AfterEach
void tearDown() {
client.shutdown();
}
@AfterEach
void tearDown() {
client.shutdown();
}

@Test
void when_EndOfLdesException_ShutdownPipeline() {
when(supplier.get()).thenThrow(EndOfLdesException.class);
@Test
void when_EndOfLdesException_And_AllDataProcessed_ShutdownPipeline() {
when(observer.hasProcessedAllData()).thenReturn(true);
when(supplier.get()).thenThrow(EndOfLdesException.class);

client.start();
client.start();

verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName));
}
InOrder inOrder = inOrder(eventPublisher);
inOrder.verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL));
inOrder.verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL));
inOrder.verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName));
}

@Test
void when_RuntimeException_StopPipeline() {
doThrow(RuntimeException.class).when(supplier).init();
@Test
void when_EndOfLdesException_ShutdownPipeline() {
when(observer.hasProcessedAllData()).thenReturn(false).thenReturn(true);
when(supplier.get()).thenThrow(EndOfLdesException.class);

client.start();
client.start();
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL));

await().atMost(Duration.ofSeconds(40)).untilAsserted(() -> {
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL));
verify(eventPublisher).publishEvent(new PipelineShutdownEvent(pipelineName));
});
}

@Test
void when_RuntimeException_StopPipeline() {
doThrow(RuntimeException.class).when(supplier).init();

client.start();

await().atMost(Duration.ofSeconds(20)).untilAsserted(() -> {
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.RUNNING, StatusChangeSource.MANUAL));
verify(eventPublisher).publishEvent(new PipelineStatusEvent(pipelineName, PipelineStatus.HALTED, StatusChangeSource.MANUAL));
});

}
}
}
Loading