diff --git a/core/src/main/java/org/mule/runtime/core/privileged/exception/TemplateOnErrorHandler.java b/core/src/main/java/org/mule/runtime/core/privileged/exception/TemplateOnErrorHandler.java index 0c9fb0271f6..fbd44dc7699 100644 --- a/core/src/main/java/org/mule/runtime/core/privileged/exception/TemplateOnErrorHandler.java +++ b/core/src/main/java/org/mule/runtime/core/privileged/exception/TemplateOnErrorHandler.java @@ -10,19 +10,20 @@ import static org.mule.runtime.api.notification.EnrichedNotificationInfo.createInfo; import static org.mule.runtime.api.notification.ErrorHandlerNotification.PROCESS_END; import static org.mule.runtime.api.notification.ErrorHandlerNotification.PROCESS_START; +import static org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes.TX_ROLLBACK; import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.disposeIfNeeded; import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.initialiseIfNeeded; import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.startIfNeeded; import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.stopIfNeeded; import static org.mule.runtime.core.api.rx.Exceptions.unwrap; -import static org.mule.runtime.core.privileged.processor.MessageProcessors.buildNewChainWithListOfProcessors; -import static org.mule.runtime.core.privileged.processor.MessageProcessors.getDefaultProcessingStrategyFactory; -import static org.mule.runtime.core.privileged.processor.MessageProcessors.getProcessingStrategy; - +import static org.mule.runtime.core.api.transaction.TransactionUtils.profileTransactionAction; import static org.mule.runtime.core.internal.component.ComponentAnnotations.updateRootContainerName; import static org.mule.runtime.core.internal.exception.ErrorHandlerContextManager.addContext; import static org.mule.runtime.core.internal.util.LocationUtils.globalLocation; +import static org.mule.runtime.core.privileged.processor.MessageProcessors.buildNewChainWithListOfProcessors; +import static org.mule.runtime.core.privileged.processor.MessageProcessors.getDefaultProcessingStrategyFactory; +import static org.mule.runtime.core.privileged.processor.MessageProcessors.getProcessingStrategy; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; @@ -43,6 +44,7 @@ import org.mule.runtime.api.component.location.Location; import org.mule.runtime.api.exception.ErrorTypeRepository; import org.mule.runtime.api.exception.MuleException; +import org.mule.runtime.api.functional.Either; import org.mule.runtime.api.lifecycle.Disposable; import org.mule.runtime.api.lifecycle.InitialisationException; import org.mule.runtime.api.message.Error; @@ -50,6 +52,8 @@ import org.mule.runtime.api.message.error.matcher.ErrorTypeMatcher; import org.mule.runtime.api.notification.ErrorHandlerNotification; import org.mule.runtime.core.api.context.notification.FlowStackElement; +import org.mule.runtime.api.profiling.ProfilingDataProducer; +import org.mule.runtime.api.profiling.type.context.TransactionProfilingEventContext; import org.mule.runtime.core.api.el.ExpressionManager; import org.mule.runtime.core.api.event.CoreEvent; import org.mule.runtime.core.api.exception.NullExceptionHandler; @@ -67,6 +71,7 @@ import org.mule.runtime.core.internal.exception.ErrorHandlerContextManager.ErrorHandlerContext; import org.mule.runtime.core.internal.exception.ExceptionRouter; import org.mule.runtime.core.internal.exception.GlobalErrorHandler; +import org.mule.runtime.core.internal.profiling.InternalProfilingService; import org.mule.runtime.core.internal.rx.FluxSinkRecorder; import java.util.ArrayList; @@ -111,6 +116,10 @@ public abstract class TemplateOnErrorHandler extends AbstractDeclaredExceptionLi @Inject private ConfigurationProperties configurationProperties; + @Inject + private InternalProfilingService profilingService; + private ProfilingDataProducer rollbackProducer; + @Inject private ComponentTracerFactory componentTracerFactory; protected Optional flowLocation = empty(); @@ -213,6 +222,7 @@ public final CoreEvent handleException(Exception exception, CoreEvent event) { @Override public synchronized void initialise() throws InitialisationException { componentTracer = componentTracerFactory.fromComponent(TemplateOnErrorHandler.this); + this.rollbackProducer = profilingService.getProfilingDataProducer(TX_ROLLBACK); super.initialise(); } @@ -257,11 +267,21 @@ private Mono applyInternal(final Exception exception) { } private BiConsumer onRoutingError() { - return (me, event) -> { + return (me, obj) -> { try { logger.error("Exception during exception strategy execution"); getExceptionListener().resolveAndLogException(me); - if (isOwnedTransaction((CoreEvent) event, getException((CoreEvent) event))) { + boolean rollback = false; + if (obj instanceof CoreEvent) { + rollback = isOwnedTransaction((CoreEvent) obj, getException((CoreEvent) obj)); + } else if (obj instanceof Either) { + if (((Either) obj).getLeft() instanceof MessagingException) { + MessagingException exception = (MessagingException) ((Either) obj).getLeft(); + rollback = isOwnedTransaction(exception.getEvent(), exception); + } + } + if (rollback) { + profileTransactionAction(rollbackProducer, TX_ROLLBACK, getLocation()); TransactionCoordination.getInstance().rollbackCurrentTransaction(); } } catch (Exception ex) { diff --git a/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/DefaultSourceCallbackContext.java b/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/DefaultSourceCallbackContext.java index 7535a660694..a756e370345 100644 --- a/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/DefaultSourceCallbackContext.java +++ b/modules/extensions-support/src/main/java/org/mule/runtime/module/extension/internal/runtime/source/DefaultSourceCallbackContext.java @@ -6,13 +6,14 @@ */ package org.mule.runtime.module.extension.internal.runtime.source; -import static java.lang.String.format; -import static java.util.Optional.ofNullable; import static org.mule.runtime.api.i18n.I18nMessageFactory.createStaticMessage; import static org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes.TX_START; import static org.mule.runtime.api.util.Preconditions.checkArgument; import static org.mule.runtime.api.util.Preconditions.checkState; import static org.mule.runtime.core.api.transaction.TransactionUtils.profileTransactionAction; +import static org.slf4j.LoggerFactory.getLogger; +import static java.lang.String.format; +import static java.util.Optional.ofNullable; import org.mule.runtime.api.component.Component; import org.mule.runtime.api.connection.ConnectionException; @@ -37,6 +38,7 @@ import org.mule.sdk.api.runtime.source.SourceCallback; import org.mule.sdk.api.runtime.source.SourceCallbackContext; import org.mule.sdk.api.runtime.source.DistributedTraceContextManager; +import org.slf4j.Logger; import java.util.LinkedList; import java.util.List; @@ -52,6 +54,7 @@ class DefaultSourceCallbackContext implements SourceCallbackContextAdapter { private static final TransactionHandle NULL_TRANSACTION_HANDLE = new NullTransactionHandle(); private static final TransactionHandle DEFAULT_TRANSACTION_HANDLE = new DefaultTransactionHandle(); + private static final Logger LOGGER = getLogger(DefaultSourceCallbackContext.class); private final SourceCallbackAdapter sourceCallback; private final Map variables = new SmallMap<>(); @@ -108,6 +111,7 @@ public TransactionHandle bindConnection(Object connection) throws ConnectionExce transactionHandle = DEFAULT_TRANSACTION_HANDLE; } } catch (Exception e) { + LOGGER.warn("Connection could not be bound", e); releaseConnection(); throw e; }