Skip to content

Commit

Permalink
[kie-issues-509] Handle SLA timers during process instance migration
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Aug 29, 2023
1 parent 591f13a commit 3930973
Showing 1 changed file with 74 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@

package org.jbpm.runtime.manager.impl.migration;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
Expand Down Expand Up @@ -142,12 +148,18 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {
boolean migrateExecutorJobs = ((SimpleRuntimeEnvironment)currentManager.getEnvironment()).getEnvironmentTemplate().get("ExecutorService") != null;
validate(migrateExecutorJobs);
Map<Long, List<TimerInstance>> timerMigrated = null;
Map<Long, List<TimerInstance>> stateBasedTimer = null;
Map<Long, List<TimerInstance>> slaTimerMigrated = null;
Map<Long, List<TimerInstance>> humanTaskSuspended = null;
try {

// collect and cancel any active timers before migration
timerMigrated = cancelActiveTimersBeforeMigration(currentManager);
timerMigrated = cancelActiveTimersBeforeMigration(currentManager, TimerNodeInstance.class, active -> asList(active.getTimerId()));
stateBasedTimer = cancelActiveTimersBeforeMigration(currentManager, StateBasedNodeInstance.class, active -> active.getTimerInstances());
slaTimerMigrated = cancelActiveTimersBeforeMigration(currentManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, active -> asList(active.getSlaTimerId()));
humanTaskSuspended = cancelActiveTimersBeforeMigration(currentManager, HumanTaskNodeInstance.class, active -> asList((active.getSuspendUntilTimerId())));

// start transaction to secure consistency of the migration
// start transaction to secure consistency of the migration
txm = TransactionManagerFactory.get().newTransactionManager(currentManager.getEnvironment().getEnvironment());
transactionOwner = txm.begin();

Expand Down Expand Up @@ -254,9 +266,14 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {
tobe = toBeManager.getEnvironment().getKieBase().newKieSession();
upgradeProcessInstance(current, tobe, migrationSpec.getProcessInstanceId(), migrationSpec.getToProcessId(), nodeMapping, em, toBeManager.getIdentifier());

if (!timerMigrated.isEmpty()) {
rescheduleTimersAfterMigration(toBeManager, timerMigrated);
}

// reschedule timers
rescheduleTimersAfterMigration(toBeManager, TimerNodeInstance.class, timerMigrated, (active, timers) -> active.internalSetTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(toBeManager, StateBasedNodeInstance.class, stateBasedTimer, (active, timers) -> active.internalSetTimerInstances(timers.stream().map(TimerInstance::getId).collect(Collectors.toList())));
rescheduleTimersAfterMigration(toBeManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, slaTimerMigrated, (active, timers) -> active.internalSetSlaTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(toBeManager, HumanTaskNodeInstance.class, humanTaskSuspended, (active, timers) -> active.setSuspendUntilTimerId(toSingletonTimerId(timers)));


em.flush();
} finally {
em.clear();
Expand All @@ -270,10 +287,12 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {
} catch (Throwable e) {
txm.rollback(transactionOwner);
logger.error("Unexpected error during migration", e);
// put back timers (if there are any) in case of rollback
if (timerMigrated != null && !timerMigrated.isEmpty()) {
rescheduleTimersAfterMigration(currentManager, timerMigrated);
}

rescheduleTimersAfterMigration(toBeManager, TimerNodeInstance.class, timerMigrated, (active, timers) -> active.internalSetTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(toBeManager, StateBasedNodeInstance.class, stateBasedTimer, (active, timers) -> active.internalSetTimerInstances(timers.stream().map(TimerInstance::getId).collect(Collectors.toList())));
rescheduleTimersAfterMigration(toBeManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, slaTimerMigrated, (active, timers) -> active.internalSetSlaTimerId(toSingletonTimerId(timers)));
rescheduleTimersAfterMigration(toBeManager, HumanTaskNodeInstance.class, humanTaskSuspended, (active, timers) -> active.setSuspendUntilTimerId(toSingletonTimerId(timers)));

report.addEntry(Type.ERROR, "Migration of process instance (" + migrationSpec.getProcessInstanceId() + ") failed due to " + e.getMessage());

} finally {
Expand All @@ -297,6 +316,10 @@ public MigrationReport migrate(Map<String, String> nodeMapping) {

return report;
}

private Long toSingletonTimerId(List<TimerInstance> timerInstances) {
return (timerInstances.isEmpty()) ? -1 : timerInstances.get(0).getId();
}

private void validate(boolean migrateExecutorJobs) {
if (migrationSpec == null) {
Expand Down Expand Up @@ -356,7 +379,7 @@ private void validate(boolean migrateExecutorJobs) {
if (migrateExecutorJobs) {
List<Long> executorJobs = (List<Long>) em.createQuery("select id FROM RequestInfo ri WHERE ri.processInstanceId = :processInstanceId and ri.status in (:statuses)")
.setParameter("processInstanceId", migrationSpec.getProcessInstanceId())
.setParameter("statuses", Arrays.asList(STATUS.QUEUED, STATUS.RETRYING, STATUS.RUNNING))
.setParameter("statuses", asList(STATUS.QUEUED, STATUS.RETRYING, STATUS.RUNNING))
.getResultList();

if (!executorJobs.isEmpty()) {
Expand Down Expand Up @@ -569,7 +592,7 @@ protected TimerManager getTimerManager(KieSession ksession) {
return ((InternalProcessRuntime) ((StatefulKnowledgeSessionImpl) internal).getProcessRuntime()).getTimerManager();
}

protected Map<Long, List<TimerInstance>> cancelActiveTimersBeforeMigration(RuntimeManager manager) {
protected <T extends NodeInstance> Map<Long, List<TimerInstance>> cancelActiveTimersBeforeMigration(RuntimeManager manager, Class<T> type, Function<T, List<Long>> getTimerInstances ) {
RuntimeEngine engineBefore = manager.getRuntimeEngine(ProcessInstanceIdContext.get(migrationSpec.getProcessInstanceId()));
try {
Map<Long, List<TimerInstance>> timerMigrated = engineBefore.getKieSession().execute(new ExecutableCommand<Map<Long, List<TimerInstance>>>() {
Expand All @@ -589,28 +612,26 @@ public Map<Long, List<TimerInstance>> execute(Context context) {
Collection<org.jbpm.workflow.instance.NodeInstance> activeInstances = processInstance.getNodeInstances(true);

for (org.jbpm.workflow.instance.NodeInstance active : activeInstances) {
if (active instanceof TimerNodeInstance) {
TimerInstance timerInstance = timerManager.getTimerMap().get(((TimerNodeInstance) active).getTimerId());

timerManager.cancelTimer(processInstance.getId(), timerInstance.getId());
result.put(active.getId(), Arrays.asList(timerInstance));
} else if (active instanceof StateBasedNodeInstance) {
List<Long> timers = ((StateBasedNodeInstance) active).getTimerInstances();

if (timers != null && !timers.isEmpty()) {
List<TimerInstance> collected = new ArrayList<>();
for (Long timerId : timers) {
TimerInstance timerInstance = timerManager.getTimerMap().get(timerId);
if (timerInstance==null) {
report.addEntry(Type.WARN, "Could not find timer instance with id "+timerId+" to cancel.");
continue;
}
timerManager.cancelTimer(processInstance.getId(), timerInstance.getId());
collected.add(timerInstance);
}
result.put(active.getId(), collected);
if (!type.isAssignableFrom(active.getClass())) {
continue;
}

List<TimerInstance> collected = new ArrayList<>();
List<Long> timers = getTimerInstances.apply(type.cast(active));
for (Long timerId : timers) {
if (timerId == -1) {
continue;
}

TimerInstance timerInstance = timerManager.getTimerMap().get(timerId);
if (timerInstance == null) {
report.addEntry(Type.WARN, "Could not find timer instance with id " + timerId + " to cancel.");
continue;
}
timerManager.cancelTimer(processInstance.getId(), timerInstance.getId());
collected.add(timerInstance);
}
result.put(active.getId(), collected);
}

return result;
Expand All @@ -623,12 +644,19 @@ public Map<Long, List<TimerInstance>> execute(Context context) {
}
}

protected void rescheduleTimersAfterMigration(RuntimeManager manager, Map<Long, List<TimerInstance>> timerMigrated) {



protected <T extends NodeInstance> void rescheduleTimersAfterMigration(RuntimeManager manager, Class<T> nodeType, Map<Long, List<TimerInstance>> timersToMigrate, BiConsumer<T, List<TimerInstance>> timerMigrated) {
if(timersToMigrate.isEmpty()) {
return;
}

RuntimeEngine engine = manager.getRuntimeEngine(ProcessInstanceIdContext.get(migrationSpec.getProcessInstanceId()));
try {
engine.getKieSession().execute(new ExecutableCommand<Void>() {

private static final long serialVersionUID = 7144657913971146080L;
private static final long serialVersionUID = 7144357923971146089L;

@Override
public Void execute(Context context) {
Expand All @@ -637,35 +665,25 @@ public Void execute(Context context) {

WorkflowProcessInstanceImpl processInstance = (WorkflowProcessInstanceImpl) kieSession.getProcessInstance(migrationSpec.getProcessInstanceId());

for (Entry<Long, List<TimerInstance>> entry : timerMigrated.entrySet()) {
for (Entry<Long, List<TimerInstance>> entry : timersToMigrate.entrySet()) {

org.jbpm.workflow.instance.NodeInstance active = processInstance.getNodeInstance(entry.getKey(), true);
if (active instanceof TimerNodeInstance) {
TimerInstance timerInstance = entry.getValue().get(0);

if(!nodeType.isAssignableFrom(active.getClass())) {
continue;
}

List<TimerInstance> timerInstances = entry.getValue();
for (TimerInstance timerInstance : timerInstances) {
long delay = timerInstance.getDelay() - (System.currentTimeMillis() - timerInstance.getActivated().getTime());
timerInstance.setDelay(delay);

updateBasedOnTrigger(timerInstance);

timerManager.registerTimer(timerInstance, processInstance);
((TimerNodeInstance) active).internalSetTimerId(timerInstance.getId());
} else if (active instanceof StateBasedNodeInstance) {

List<TimerInstance> timerInstances = entry.getValue();
List<Long> timers = new ArrayList<>();
for (TimerInstance timerInstance : timerInstances) {
long delay = timerInstance.getDelay() - (System.currentTimeMillis() - timerInstance.getActivated().getTime());
timerInstance.setDelay(delay);

updateBasedOnTrigger(timerInstance);

timerManager.registerTimer(timerInstance, processInstance);
timers.add(timerInstance.getId());
}
((StateBasedNodeInstance) active).internalSetTimerInstances(timers);

timerManager.registerTimer(timerInstance, processInstance);
}

timerMigrated.accept(nodeType.cast(active), timerInstances);

}

return null;
Expand Down Expand Up @@ -697,4 +715,4 @@ protected void updateBasedOnTrigger(TimerInstance timerInstance) {
}
}

}
}

0 comments on commit 3930973

Please sign in to comment.