From 3930973448fa870b56516cbdb1d2a07a1939b3e6 Mon Sep 17 00:00:00 2001 From: Enrique Gonzalez Martinez Date: Tue, 29 Aug 2023 09:41:13 +0200 Subject: [PATCH] [kie-issues-509] Handle SLA timers during process instance migration --- .../impl/migration/MigrationManager.java | 130 ++++++++++-------- 1 file changed, 74 insertions(+), 56 deletions(-) diff --git a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/migration/MigrationManager.java b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/migration/MigrationManager.java index 9e131161b5..a91e3fad14 100644 --- a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/migration/MigrationManager.java +++ b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/migration/MigrationManager.java @@ -16,8 +16,10 @@ package org.jbpm.runtime.manager.impl.migration; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; + import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -25,6 +27,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; @@ -142,12 +148,18 @@ public MigrationReport migrate(Map nodeMapping) { boolean migrateExecutorJobs = ((SimpleRuntimeEnvironment)currentManager.getEnvironment()).getEnvironmentTemplate().get("ExecutorService") != null; validate(migrateExecutorJobs); Map> timerMigrated = null; + Map> stateBasedTimer = null; + Map> slaTimerMigrated = null; + Map> humanTaskSuspended = null; try { // collect and cancel any active timers before migration - timerMigrated = cancelActiveTimersBeforeMigration(currentManager); + timerMigrated = cancelActiveTimersBeforeMigration(currentManager, TimerNodeInstance.class, active -> asList(active.getTimerId())); + stateBasedTimer = cancelActiveTimersBeforeMigration(currentManager, StateBasedNodeInstance.class, active -> active.getTimerInstances()); + slaTimerMigrated = cancelActiveTimersBeforeMigration(currentManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, active -> asList(active.getSlaTimerId())); + humanTaskSuspended = cancelActiveTimersBeforeMigration(currentManager, HumanTaskNodeInstance.class, active -> asList((active.getSuspendUntilTimerId()))); - // start transaction to secure consistency of the migration + // start transaction to secure consistency of the migration txm = TransactionManagerFactory.get().newTransactionManager(currentManager.getEnvironment().getEnvironment()); transactionOwner = txm.begin(); @@ -254,9 +266,14 @@ public MigrationReport migrate(Map nodeMapping) { tobe = toBeManager.getEnvironment().getKieBase().newKieSession(); upgradeProcessInstance(current, tobe, migrationSpec.getProcessInstanceId(), migrationSpec.getToProcessId(), nodeMapping, em, toBeManager.getIdentifier()); - if (!timerMigrated.isEmpty()) { - rescheduleTimersAfterMigration(toBeManager, timerMigrated); - } + + // reschedule timers + rescheduleTimersAfterMigration(toBeManager, TimerNodeInstance.class, timerMigrated, (active, timers) -> active.internalSetTimerId(toSingletonTimerId(timers))); + rescheduleTimersAfterMigration(toBeManager, StateBasedNodeInstance.class, stateBasedTimer, (active, timers) -> active.internalSetTimerInstances(timers.stream().map(TimerInstance::getId).collect(Collectors.toList()))); + rescheduleTimersAfterMigration(toBeManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, slaTimerMigrated, (active, timers) -> active.internalSetSlaTimerId(toSingletonTimerId(timers))); + rescheduleTimersAfterMigration(toBeManager, HumanTaskNodeInstance.class, humanTaskSuspended, (active, timers) -> active.setSuspendUntilTimerId(toSingletonTimerId(timers))); + + em.flush(); } finally { em.clear(); @@ -270,10 +287,12 @@ public MigrationReport migrate(Map nodeMapping) { } catch (Throwable e) { txm.rollback(transactionOwner); logger.error("Unexpected error during migration", e); - // put back timers (if there are any) in case of rollback - if (timerMigrated != null && !timerMigrated.isEmpty()) { - rescheduleTimersAfterMigration(currentManager, timerMigrated); - } + + rescheduleTimersAfterMigration(toBeManager, TimerNodeInstance.class, timerMigrated, (active, timers) -> active.internalSetTimerId(toSingletonTimerId(timers))); + rescheduleTimersAfterMigration(toBeManager, StateBasedNodeInstance.class, stateBasedTimer, (active, timers) -> active.internalSetTimerInstances(timers.stream().map(TimerInstance::getId).collect(Collectors.toList()))); + rescheduleTimersAfterMigration(toBeManager, org.jbpm.workflow.instance.impl.NodeInstanceImpl.class, slaTimerMigrated, (active, timers) -> active.internalSetSlaTimerId(toSingletonTimerId(timers))); + rescheduleTimersAfterMigration(toBeManager, HumanTaskNodeInstance.class, humanTaskSuspended, (active, timers) -> active.setSuspendUntilTimerId(toSingletonTimerId(timers))); + report.addEntry(Type.ERROR, "Migration of process instance (" + migrationSpec.getProcessInstanceId() + ") failed due to " + e.getMessage()); } finally { @@ -297,6 +316,10 @@ public MigrationReport migrate(Map nodeMapping) { return report; } + + private Long toSingletonTimerId(List timerInstances) { + return (timerInstances.isEmpty()) ? -1 : timerInstances.get(0).getId(); + } private void validate(boolean migrateExecutorJobs) { if (migrationSpec == null) { @@ -356,7 +379,7 @@ private void validate(boolean migrateExecutorJobs) { if (migrateExecutorJobs) { List executorJobs = (List) em.createQuery("select id FROM RequestInfo ri WHERE ri.processInstanceId = :processInstanceId and ri.status in (:statuses)") .setParameter("processInstanceId", migrationSpec.getProcessInstanceId()) - .setParameter("statuses", Arrays.asList(STATUS.QUEUED, STATUS.RETRYING, STATUS.RUNNING)) + .setParameter("statuses", asList(STATUS.QUEUED, STATUS.RETRYING, STATUS.RUNNING)) .getResultList(); if (!executorJobs.isEmpty()) { @@ -569,7 +592,7 @@ protected TimerManager getTimerManager(KieSession ksession) { return ((InternalProcessRuntime) ((StatefulKnowledgeSessionImpl) internal).getProcessRuntime()).getTimerManager(); } - protected Map> cancelActiveTimersBeforeMigration(RuntimeManager manager) { + protected Map> cancelActiveTimersBeforeMigration(RuntimeManager manager, Class type, Function> getTimerInstances ) { RuntimeEngine engineBefore = manager.getRuntimeEngine(ProcessInstanceIdContext.get(migrationSpec.getProcessInstanceId())); try { Map> timerMigrated = engineBefore.getKieSession().execute(new ExecutableCommand>>() { @@ -589,28 +612,26 @@ public Map> execute(Context context) { Collection activeInstances = processInstance.getNodeInstances(true); for (org.jbpm.workflow.instance.NodeInstance active : activeInstances) { - if (active instanceof TimerNodeInstance) { - TimerInstance timerInstance = timerManager.getTimerMap().get(((TimerNodeInstance) active).getTimerId()); - - timerManager.cancelTimer(processInstance.getId(), timerInstance.getId()); - result.put(active.getId(), Arrays.asList(timerInstance)); - } else if (active instanceof StateBasedNodeInstance) { - List timers = ((StateBasedNodeInstance) active).getTimerInstances(); - - if (timers != null && !timers.isEmpty()) { - List collected = new ArrayList<>(); - for (Long timerId : timers) { - TimerInstance timerInstance = timerManager.getTimerMap().get(timerId); - if (timerInstance==null) { - report.addEntry(Type.WARN, "Could not find timer instance with id "+timerId+" to cancel."); - continue; - } - timerManager.cancelTimer(processInstance.getId(), timerInstance.getId()); - collected.add(timerInstance); - } - result.put(active.getId(), collected); + if (!type.isAssignableFrom(active.getClass())) { + continue; + } + + List collected = new ArrayList<>(); + List timers = getTimerInstances.apply(type.cast(active)); + for (Long timerId : timers) { + if (timerId == -1) { + continue; } + + TimerInstance timerInstance = timerManager.getTimerMap().get(timerId); + if (timerInstance == null) { + report.addEntry(Type.WARN, "Could not find timer instance with id " + timerId + " to cancel."); + continue; + } + timerManager.cancelTimer(processInstance.getId(), timerInstance.getId()); + collected.add(timerInstance); } + result.put(active.getId(), collected); } return result; @@ -623,12 +644,19 @@ public Map> execute(Context context) { } } - protected void rescheduleTimersAfterMigration(RuntimeManager manager, Map> timerMigrated) { + + + + protected void rescheduleTimersAfterMigration(RuntimeManager manager, Class nodeType, Map> timersToMigrate, BiConsumer> timerMigrated) { + if(timersToMigrate.isEmpty()) { + return; + } + RuntimeEngine engine = manager.getRuntimeEngine(ProcessInstanceIdContext.get(migrationSpec.getProcessInstanceId())); try { engine.getKieSession().execute(new ExecutableCommand() { - private static final long serialVersionUID = 7144657913971146080L; + private static final long serialVersionUID = 7144357923971146089L; @Override public Void execute(Context context) { @@ -637,35 +665,25 @@ public Void execute(Context context) { WorkflowProcessInstanceImpl processInstance = (WorkflowProcessInstanceImpl) kieSession.getProcessInstance(migrationSpec.getProcessInstanceId()); - for (Entry> entry : timerMigrated.entrySet()) { + for (Entry> entry : timersToMigrate.entrySet()) { org.jbpm.workflow.instance.NodeInstance active = processInstance.getNodeInstance(entry.getKey(), true); - if (active instanceof TimerNodeInstance) { - TimerInstance timerInstance = entry.getValue().get(0); - + if(!nodeType.isAssignableFrom(active.getClass())) { + continue; + } + + List timerInstances = entry.getValue(); + for (TimerInstance timerInstance : timerInstances) { long delay = timerInstance.getDelay() - (System.currentTimeMillis() - timerInstance.getActivated().getTime()); timerInstance.setDelay(delay); updateBasedOnTrigger(timerInstance); - - timerManager.registerTimer(timerInstance, processInstance); - ((TimerNodeInstance) active).internalSetTimerId(timerInstance.getId()); - } else if (active instanceof StateBasedNodeInstance) { - - List timerInstances = entry.getValue(); - List timers = new ArrayList<>(); - for (TimerInstance timerInstance : timerInstances) { - long delay = timerInstance.getDelay() - (System.currentTimeMillis() - timerInstance.getActivated().getTime()); - timerInstance.setDelay(delay); - - updateBasedOnTrigger(timerInstance); - - timerManager.registerTimer(timerInstance, processInstance); - timers.add(timerInstance.getId()); - } - ((StateBasedNodeInstance) active).internalSetTimerInstances(timers); + timerManager.registerTimer(timerInstance, processInstance); } + + timerMigrated.accept(nodeType.cast(active), timerInstances); + } return null; @@ -697,4 +715,4 @@ protected void updateBasedOnTrigger(TimerInstance timerInstance) { } } -} +} \ No newline at end of file