Skip to content

Commit

Permalink
W-17430963: ClassCast exception when error within on-error-continue p…
Browse files Browse the repository at this point in the history
…revents from rolling back transaction (#14079) (#14094)
  • Loading branch information
mbuchwald authored Dec 18, 2024
1 parent 966975e commit 0f154e3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@
import static org.mule.runtime.api.notification.EnrichedNotificationInfo.createInfo;
import static org.mule.runtime.api.notification.ErrorHandlerNotification.PROCESS_END;
import static org.mule.runtime.api.notification.ErrorHandlerNotification.PROCESS_START;
import static org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes.TX_ROLLBACK;
import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.disposeIfNeeded;
import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.initialiseIfNeeded;
import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.startIfNeeded;
import static org.mule.runtime.core.api.lifecycle.LifecycleUtils.stopIfNeeded;
import static org.mule.runtime.core.api.rx.Exceptions.unwrap;

import static org.mule.runtime.core.privileged.processor.MessageProcessors.buildNewChainWithListOfProcessors;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.getDefaultProcessingStrategyFactory;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.getProcessingStrategy;

import static org.mule.runtime.core.api.transaction.TransactionUtils.profileTransactionAction;
import static org.mule.runtime.core.internal.component.ComponentAnnotations.updateRootContainerName;
import static org.mule.runtime.core.internal.exception.ErrorHandlerContextManager.addContext;
import static org.mule.runtime.core.internal.util.LocationUtils.globalLocation;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.buildNewChainWithListOfProcessors;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.getDefaultProcessingStrategyFactory;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.getProcessingStrategy;

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
Expand All @@ -43,13 +44,16 @@
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.ErrorTypeRepository;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.error.matcher.ErrorTypeMatcher;
import org.mule.runtime.api.notification.ErrorHandlerNotification;
import org.mule.runtime.core.api.context.notification.FlowStackElement;
import org.mule.runtime.api.profiling.ProfilingDataProducer;
import org.mule.runtime.api.profiling.type.context.TransactionProfilingEventContext;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.NullExceptionHandler;
Expand All @@ -67,6 +71,7 @@
import org.mule.runtime.core.internal.exception.ErrorHandlerContextManager.ErrorHandlerContext;
import org.mule.runtime.core.internal.exception.ExceptionRouter;
import org.mule.runtime.core.internal.exception.GlobalErrorHandler;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;

import java.util.ArrayList;
Expand Down Expand Up @@ -111,6 +116,10 @@ public abstract class TemplateOnErrorHandler extends AbstractDeclaredExceptionLi
@Inject
private ConfigurationProperties configurationProperties;

@Inject
private InternalProfilingService profilingService;
private ProfilingDataProducer<TransactionProfilingEventContext, Object> rollbackProducer;

@Inject
private ComponentTracerFactory<CoreEvent> componentTracerFactory;
protected Optional<String> flowLocation = empty();
Expand Down Expand Up @@ -213,6 +222,7 @@ public final CoreEvent handleException(Exception exception, CoreEvent event) {
@Override
public synchronized void initialise() throws InitialisationException {
componentTracer = componentTracerFactory.fromComponent(TemplateOnErrorHandler.this);
this.rollbackProducer = profilingService.getProfilingDataProducer(TX_ROLLBACK);
super.initialise();
}

Expand Down Expand Up @@ -257,11 +267,21 @@ private Mono<CoreEvent> applyInternal(final Exception exception) {
}

private BiConsumer<Throwable, Object> onRoutingError() {
return (me, event) -> {
return (me, obj) -> {
try {
logger.error("Exception during exception strategy execution");
getExceptionListener().resolveAndLogException(me);
if (isOwnedTransaction((CoreEvent) event, getException((CoreEvent) event))) {
boolean rollback = false;
if (obj instanceof CoreEvent) {
rollback = isOwnedTransaction((CoreEvent) obj, getException((CoreEvent) obj));
} else if (obj instanceof Either) {
if (((Either) obj).getLeft() instanceof MessagingException) {
MessagingException exception = (MessagingException) ((Either) obj).getLeft();
rollback = isOwnedTransaction(exception.getEvent(), exception);
}
}
if (rollback) {
profileTransactionAction(rollbackProducer, TX_ROLLBACK, getLocation());
TransactionCoordination.getInstance().rollbackCurrentTransaction();
}
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
*/
package org.mule.runtime.module.extension.internal.runtime.source;

import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static org.mule.runtime.api.i18n.I18nMessageFactory.createStaticMessage;
import static org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes.TX_START;
import static org.mule.runtime.api.util.Preconditions.checkArgument;
import static org.mule.runtime.api.util.Preconditions.checkState;
import static org.mule.runtime.core.api.transaction.TransactionUtils.profileTransactionAction;
import static org.slf4j.LoggerFactory.getLogger;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;

import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.connection.ConnectionException;
Expand All @@ -37,6 +38,7 @@
import org.mule.sdk.api.runtime.source.SourceCallback;
import org.mule.sdk.api.runtime.source.SourceCallbackContext;
import org.mule.sdk.api.runtime.source.DistributedTraceContextManager;
import org.slf4j.Logger;

import java.util.LinkedList;
import java.util.List;
Expand All @@ -52,6 +54,7 @@ class DefaultSourceCallbackContext implements SourceCallbackContextAdapter {

private static final TransactionHandle NULL_TRANSACTION_HANDLE = new NullTransactionHandle();
private static final TransactionHandle DEFAULT_TRANSACTION_HANDLE = new DefaultTransactionHandle();
private static final Logger LOGGER = getLogger(DefaultSourceCallbackContext.class);

private final SourceCallbackAdapter sourceCallback;
private final Map<String, Object> variables = new SmallMap<>();
Expand Down Expand Up @@ -108,6 +111,7 @@ public TransactionHandle bindConnection(Object connection) throws ConnectionExce
transactionHandle = DEFAULT_TRANSACTION_HANDLE;
}
} catch (Exception e) {
LOGGER.warn("Connection could not be bound", e);
releaseConnection();
throw e;
}
Expand Down

0 comments on commit 0f154e3

Please sign in to comment.