Skip to content

Commit

Permalink
Load and save aggregate as part of an active transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanpelikan committed Jun 18, 2024
1 parent cbdc54c commit b74b350
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@ public static class TaskHandlerActions {
public Supplier<Map.Entry<Runnable, Supplier<String>>> handlerCompletedCommand;
}

public static class RunDeferredInTransaction {
public RunDeferredInTransactionSupplier[] argsSupplier;
public Runnable saveAggregateAfterWorkflowTask;
}

public interface RunDeferredInTransactionSupplier extends Supplier<Object> { }

public static final ThreadLocal<TaskHandlerActions> actions = ThreadLocal.withInitial(TaskHandlerActions::new);

public static final ThreadLocal<RunDeferredInTransaction> runDeferredInTransaction = ThreadLocal.withInitial(RunDeferredInTransaction::new);

private final ApplicationEventPublisher publisher;

public Camunda8TransactionAspect(
Expand All @@ -36,20 +45,44 @@ public Camunda8TransactionAspect(

}

public static void registerDeferredInTransaction(
final RunDeferredInTransactionSupplier[] argsSupplier,
final Runnable saveAggregateAfterWorkflowTask) {

runDeferredInTransaction.get().argsSupplier = argsSupplier;
runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = saveAggregateAfterWorkflowTask;

}

public static void unregisterDeferredInTransaction() {

runDeferredInTransaction.get().argsSupplier = null;
runDeferredInTransaction.get().saveAggregateAfterWorkflowTask = null;

}

private void saveWorkflowAggregate() {

runDeferredInTransaction.get().saveAggregateAfterWorkflowTask.run();

}

@Around("@annotation(io.vanillabp.spi.service.WorkflowTask)")
private Object checkForTransaction(
final ProceedingJoinPoint pjp) throws Throwable {

final var methodSignature = pjp.getSignature().toLongString();

if (!TransactionSynchronizationManager.isActualTransactionActive()) {
clearCallbacks();
logger.trace("Disable TX callbacks for {}: No TX active", methodSignature);
}

final var isTxActive = TransactionSynchronizationManager.isActualTransactionActive();

try {

final var value = pjp.proceed();
if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
final var newArgs = runDeferredInTransactionArgsSupplier(pjp.getArgs());
final var value = pjp.proceed(newArgs); // run @WorkflowTask annotated method
saveWorkflowAggregate();

if (isTxActive
&& (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) {
final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
if (handlerTestCommand != null) {
publisher.publishEvent(
Expand All @@ -62,18 +95,37 @@ private Object checkForTransaction(
if (actions.get().handlerCompletedCommand != null) {
final var handlerCompletedCommand = actions.get().handlerCompletedCommand.get();
if (handlerCompletedCommand != null) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
handlerCompletedCommand.getKey(),
handlerCompletedCommand.getValue()));
if (isTxActive) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
handlerCompletedCommand.getKey(),
handlerCompletedCommand.getValue()));
} else {
try {
handlerCompletedCommand.getKey().run();
} catch (Exception e) {
final var description = handlerCompletedCommand.getValue();
if (description != null) {
logger.error(
"Could not execute '{}'! Manual action required!",
description.get(),
e);
} else {
logger.error(
"Manual action required due to:",
e);
}
}
}
}
}
return value;

} catch (TaskException taskError) {

if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
if (isTxActive
&& (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) {
final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
if (handlerTestCommand != null) {
publisher.publishEvent(
Expand All @@ -86,17 +138,35 @@ private Object checkForTransaction(
if (actions.get().bpmnErrorCommand != null) {
final var runnable = actions.get().bpmnErrorCommand.getKey();
final var description = actions.get().bpmnErrorCommand.getValue();
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
() -> runnable.accept(taskError),
() -> description.apply(taskError)));
if (isTxActive) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
() -> runnable.accept(taskError),
() -> description.apply(taskError)));
} else {
try {
runnable.accept(taskError);
} catch (Exception e) {
if (description != null) {
logger.error(
"Could not execute '{}'! Manual action required!",
description.apply(taskError),
e);
} else {
logger.error(
"Manual action required due to:",
e);
}
}
}
}
return null;

} catch (Exception e) {

if (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null) {
if (isTxActive
&& (actions.get().testForTaskAlreadyCompletedOrCancelledCommand != null)) {
final var handlerTestCommand = actions.get().testForTaskAlreadyCompletedOrCancelledCommand.get();
if (handlerTestCommand != null) {
publisher.publishEvent(
Expand All @@ -109,18 +179,31 @@ private Object checkForTransaction(
if (actions.get().handlerFailedCommand != null) {
final var runnable = actions.get().handlerFailedCommand.getKey();
final var description = actions.get().handlerFailedCommand.getValue();
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
() -> runnable.accept(e),
() -> description.apply(e)));
if (isTxActive) {
publisher.publishEvent(
new Camunda8TransactionProcessor.Camunda8CommandAfterTx(
methodSignature,
() -> runnable.accept(e),
() -> description.apply(e)));
} else {
try {
runnable.accept(e);
} catch (Exception ie) {
if (description != null) {
logger.error(
"Could not execute '{}'! Manual action required!",
description.apply(e),
ie);
} else {
logger.error(
"Manual action required due to:",
ie);
}
}
}
}
throw e;

} finally {

clearCallbacks();

}

}
Expand All @@ -134,4 +217,25 @@ public static void clearCallbacks() {

}

private Object[] runDeferredInTransactionArgsSupplier(
final Object[] originalArgs) {

if (originalArgs == null) {
return null;
}

final var newArgs = new Object[ originalArgs.length ];
for (var i = 0; i < originalArgs.length; ++i) {
final var supplier = runDeferredInTransaction.get().argsSupplier[i];
if (supplier != null) {
newArgs[i] = supplier.get();
} else {
newArgs[i] = originalArgs[i];
}
}

return newArgs;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ public static Map.Entry<Runnable, Supplier<String>> handlerCompletedCommandCallb

public static void unregisterCallbacks() {

final var actions = Camunda8TransactionAspect.actions.get();
actions.testForTaskAlreadyCompletedOrCancelledCommand = null;
actions.bpmnErrorCommand = null;
actions.handlerFailedCommand = null;
actions.handlerCompletedCommand = null;
Camunda8TransactionAspect.clearCallbacks();

}

Expand Down
Loading

0 comments on commit b74b350

Please sign in to comment.