
[3.5 February 2014] Scatter Gather

Mariano Gonzalez edited this page Jan 8, 2014 · 5 revisions

[DESIGN CLOSED]

Motivation / Context

Mule implements multicasting through the <all> router. This router holds a list of routes and sends a separate copy of the current Mule message through each one of them. Routes are processed sequentially, which is fine for use cases in which:

  • route (n) depends on side effects generated by route (n-1)
  • route (n+1) should be skipped if an exception is thrown by route (n)

The downside of sequential processing is poor efficiency when neither of the above conditions applies. In that case, the best approach is to execute all routes concurrently.

A new element needs to be added so that all routes can be executed in parallel.

Use cases

  • As a developer, I want to be able to multicast a message in parallel. The thread executing the flow that owns the router should wait until all routes finish.
  • As a developer, I want to be able to configure a timeout so that an exception is thrown if a route does not complete within a certain amount of time.
  • As a developer, I want exceptions to be grouped when several routes fail.

Behaviour

The router will execute all routes concurrently. The thread executing the flow that owns the router will wait until all routes complete or time out.
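The scatter-and-wait behaviour can be sketched with plain java.util.concurrent. This is a minimal illustration, not Mule's implementation: routes are modelled as hypothetical String-to-String functions, and ParallelMulticastSketch is an illustrative name. Each route runs as its own task on a pool, and the calling thread blocks until every task has finished:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch of the scatter/wait pattern; routes are plain functions, not Mule MPs.
class ParallelMulticastSketch {

    // Runs every route concurrently on the given pool and blocks the caller
    // until all of them complete. Results keep the order of the routes.
    static List<String> multicast(String payload,
                                  List<Function<String, String>> routes,
                                  ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Function<String, String> route : routes) {
            // Each route receives the same input; none sees another route's output.
            futures.add(CompletableFuture.supplyAsync(() -> route.apply(payload), pool));
        }
        // The calling (flow) thread waits here until every route has finished.
        // If a route throws, join() rethrows it wrapped in a CompletionException;
        // aggregating failures is a separate concern not handled in this sketch.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join()); // already complete, so this does not block
        }
        return results;
    }
}
```

For example, with routes p -> p + "-A", p -> p + "-B" and p -> p.toUpperCase() and input "msg", the result is [msg-A, msg-B, MSG], in route order regardless of which task finishes first.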

Exception handling

If no route fails, the results are aggregated into a MessageCollection. A failure in one route will not stop the other routes from executing, so several routes may fail. In that case, the resulting messages will still be aggregated into a MessageCollection as usual, with each aggregated message's exception payload set accordingly. After this MessageCollection is set as the payload, a CompositeRoutingException will be thrown.

CompositeRoutingException will be a new type of exception extending MessagingException. It will map each exception to a route by using a sequential route id.
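The aggregation rule above can be sketched as follows. RouteResult and CompositeRoutingExceptionSketch are hypothetical stand-ins for an aggregated message and for CompositeRoutingException; they are not Mule classes. Routes run sequentially here purely for brevity, since the point is that a failure does not stop the remaining routes and that each failure is keyed by its sequential route id:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for one aggregated message: either a payload, or the
// exception the route raised (playing the role of the exception payload).
final class RouteResult {
    final String payload;
    final Exception exception;

    RouteResult(String payload, Exception exception) {
        this.payload = payload;
        this.exception = exception;
    }

    boolean failed() { return exception != null; }
}

// Hypothetical stand-in for CompositeRoutingException: maps each failure to
// its route id (position) and keeps every route's result for inspection.
class CompositeRoutingExceptionSketch extends Exception {
    final Map<Integer, Exception> exceptionsByRoute;
    final List<RouteResult> results;

    CompositeRoutingExceptionSketch(Map<Integer, Exception> exceptionsByRoute, List<RouteResult> results) {
        super("Routes failed: " + exceptionsByRoute.keySet());
        this.exceptionsByRoute = Collections.unmodifiableMap(exceptionsByRoute);
        this.results = Collections.unmodifiableList(results);
    }
}

class GatherSketch {
    // Runs every route; a failure in one route does not stop the others.
    static List<RouteResult> gather(String payload, List<Function<String, String>> routes)
            throws CompositeRoutingExceptionSketch {
        List<RouteResult> results = new ArrayList<>();
        Map<Integer, Exception> failures = new LinkedHashMap<>();
        for (int id = 0; id < routes.size(); id++) {
            try {
                results.add(new RouteResult(routes.get(id).apply(payload), null));
            } catch (RuntimeException e) {
                results.add(new RouteResult(null, e)); // keep a slot for the failed route
                failures.put(id, e);
            }
        }
        if (!failures.isEmpty()) {
            // The caller still sees every route's output through the exception.
            throw new CompositeRoutingExceptionSketch(failures, results);
        }
        return results;
    }
}
```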

Timeout

For each scattered event, a timeout will be used to ensure that no route takes longer than a configured number of milliseconds to complete. If the timeout is exceeded, a ResponseTimeoutException will be attached to that route.
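One way to sketch the timeout is ExecutorService.invokeAll with a time limit, which waits until all tasks finish or the timeout expires and cancels any task still running. This is an assumption about how it could be done, not the actual design: java.util.concurrent.TimeoutException stands in for Mule's ResponseTimeoutException, TimeoutGatherSketch is a hypothetical name, and values of zero or less are treated as "no timeout", as the timeout attribute is described later in this spec:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: each outcome is either the route's value or the
// exception to attach to that route.
class TimeoutGatherSketch {

    static List<Object> gather(List<Callable<String>> routes, long timeoutMillis, ExecutorService pool)
            throws InterruptedException {
        // timeoutMillis <= 0 means "no timeout": wait for every route.
        List<Future<String>> futures = timeoutMillis > 0
                ? pool.invokeAll(routes, timeoutMillis, TimeUnit.MILLISECONDS)
                : pool.invokeAll(routes);

        List<Object> outcomes = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            Future<String> f = futures.get(i);
            try {
                // invokeAll has already waited, so get() returns immediately.
                outcomes.add(f.get());
            } catch (CancellationException e) {
                // invokeAll cancels tasks still running when the timeout expires.
                outcomes.add(new TimeoutException("Route " + i + " exceeded " + timeoutMillis + " ms"));
            } catch (ExecutionException e) {
                outcomes.add(e.getCause()); // the route itself failed
            }
        }
        return outcomes;
    }
}
```

Because all routes start together, a single deadline measured from submission bounds each route's running time, provided the pool has enough threads to start every route immediately.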

Custom Gathering strategies

There are some scenarios in which a custom gathering strategy might be needed, for example:

  • To merge message properties added by different routes.
  • To discard failed messages without throwing an exception.
  • To select only one response.
  • Etc.

To do this, the following interface will be provided:

import java.util.List;

import org.mule.api.MuleEvent;
import org.mule.api.MuleException;

public interface AggregationStrategy {

    public MuleEvent aggregate(MuleEvent originalEvent, List<MuleEvent> events) throws MuleException;
}
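As an illustration of the "select only one response" scenario, here is a sketch of a strategy that returns the first successful route result. To keep it compilable without Mule on the classpath, Event and SimpleAggregationStrategy are hypothetical, simplified stand-ins for MuleEvent and AggregationStrategy (and Exception stands in for MuleException):

```java
import java.util.List;

// Hypothetical stand-in for MuleEvent: a payload plus an optional failure.
final class Event {
    final Object payload;
    final Exception exception; // null when the route succeeded

    Event(Object payload, Exception exception) {
        this.payload = payload;
        this.exception = exception;
    }

    boolean failed() { return exception != null; }
}

// Hypothetical stand-in mirroring the AggregationStrategy signature above.
interface SimpleAggregationStrategy {
    Event aggregate(Event originalEvent, List<Event> events) throws Exception;
}

// Picks the first successful response; list position == route order.
class FirstSuccessfulResponseStrategy implements SimpleAggregationStrategy {
    @Override
    public Event aggregate(Event originalEvent, List<Event> events) throws Exception {
        for (Event event : events) {
            if (!event.failed()) {
                return event;
            }
        }
        throw new Exception("No route completed successfully");
    }
}
```

Because the list is ordered by route, "first" here means the earliest-declared route that succeeded, not the one that finished first.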

The aggregate method will receive the original event that was scattered, alongside an ordered list in which each event's position matches the position of its corresponding route.

Additionally, you might want to specify a custom way to handle exceptions. For those cases, this other interface will be provided:

import java.util.List;

import org.mule.api.MuleEvent;
import org.mule.api.MuleException;

public interface AggregationErrorHandler {

    public MuleEvent onException(MuleEvent originalEvent, List<MuleEvent> successfulResponses, List<MuleEvent> failedResponses) throws MuleException;
}
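For the "discard failed messages without throwing an exception" scenario, an error handler could simply ignore the failed responses and return the successful ones. As before, Event and SimpleAggregationErrorHandler are hypothetical stand-ins for MuleEvent and AggregationErrorHandler, and collecting the payloads into a List is an illustrative choice:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MuleEvent.
final class Event {
    final Object payload;

    Event(Object payload) { this.payload = payload; }
}

// Hypothetical stand-in mirroring the AggregationErrorHandler signature above.
interface SimpleAggregationErrorHandler {
    Event onException(Event originalEvent, List<Event> successfulResponses, List<Event> failedResponses)
            throws Exception;
}

// Returns an event whose payload is the list of successful payloads; never throws.
class DiscardFailuresErrorHandler implements SimpleAggregationErrorHandler {
    @Override
    public Event onException(Event originalEvent, List<Event> successfulResponses, List<Event> failedResponses) {
        List<Object> payloads = new ArrayList<>();
        for (Event e : successfulResponses) {
            payloads.add(e.payload);
        }
        // failedResponses are intentionally ignored (a real handler might log them).
        return new Event(payloads);
    }
}
```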

NOTE: In case you're wondering, YES! In the future, we will extend the use of these interfaces to other aggregation components.

Differences between scatter-gather and <all>

At the beginning of this spec we discussed the difference between sequential and parallel multicasting. The question you might be asking is: "why add a new element instead of a parallel='true' attribute on the <all> router?"

The short answer is behaviour. Parallel processing would force the <all> router to drastically change its behaviour and responses depending on whether it runs in sequential or parallel mode. We don't believe that's good practice, since it means that:

  • Behaviour is not isolated: changing the router mode would also affect any other message processor (MP) or exception strategy that expects the router's output.
  • Backwards compatibility: maintaining compatibility and feature parity between two modes of a single component is far more difficult than maintaining two separate elements, which might become a limitation in the future.

The following is a comprehensive list of behaviour differences between <all> and <scatter-gather>:

Payload side effect

  • When using <all>, changes to the payload performed in route n are visible in route (n+1).
  • When using <scatter-gather>, each route receives its own shallow copy of the original event.
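The difference can be illustrated with a small sketch using immutable String payloads. SideEffectSketch and its methods are hypothetical; the sketch only models which input each route sees (routes are run one after another in both methods), not threading or Mule's actual event copying:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical illustration of payload visibility between routes.
class SideEffectSketch {

    // <all>-style: route n+1 receives the payload produced by route n.
    static List<String> sequential(String payload, List<UnaryOperator<String>> routes) {
        List<String> results = new ArrayList<>();
        String current = payload;
        for (UnaryOperator<String> route : routes) {
            current = route.apply(current);
            results.add(current);
        }
        return results;
    }

    // <scatter-gather>-style: every route starts from the original payload.
    static List<String> scattered(String payload, List<UnaryOperator<String>> routes) {
        List<String> results = new ArrayList<>();
        for (UnaryOperator<String> route : routes) {
            results.add(route.apply(payload));
        }
        return results;
    }
}
```

With routes p -> p + "1" and p -> p + "2" and input "x", sequential returns [x1, x12] while scattered returns [x1, x2]. Since the payloads here are immutable Strings, the sketch does not show what happens when a route mutates a shared mutable payload object in place.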

Exception handling

  • <all> throws CouldNotRouteOutboundMessageException upon route failure and stops processing. When catching the exception, you'll have no information about the results of any prior routes.
  • <scatter-gather> will process all routes regardless of failures. It will aggregate the results of all routes into a MuleMessageCollection in which each entry has its ExceptionPayload set accordingly, and then throw a CompositeRoutingException, which gives you visibility into the output of the other routes.

Syntax

Minimum syntax

<scatter-gather>
    <flow-ref name="A" />
    <vm:outbound-endpoint path="foo" />
    <sfdc:upsert ..... />
</scatter-gather>

This syntax assumes the default threading profile (the one returned by muleContext.getDefaultThreadingProfile()) and no timeout.

Complete syntax

<scatter-gather timeout="5000">
    <flow-ref name="A" />
    <vm:outbound-endpoint path="foo" />
    <sfdc:upsert ..... />

    <custom-aggregation-strategy [class="org.my.CustomAggregationStrategy" | ref="beanReference"] />
    <custom-aggregation-error-handler [class="org.my.CustomAggregationErrorHandler" | ref="beanReference"] />
    <threading-profile maxThreadsActive="5" />
</scatter-gather>

timeout

This attribute is in milliseconds and defaults to zero. Any value less than or equal to zero means no timeout.

threading-profile

This optional element allows configuring the underlying thread pool.

custom-aggregation-strategy

This optional element allows setting a custom gathering strategy. You can set either of these attributes:

  • class: a String with the canonical name of a class that implements AggregationStrategy. That class needs to have a default constructor.
  • ref: the name of a registered bean that implements AggregationStrategy.

custom-aggregation-error-handler

This optional element allows setting a custom exception handler. You can set either of these attributes:

  • class: a String with the canonical name of a class that implements AggregationErrorHandler. That class needs to have a default constructor.
  • ref: the name of a registered bean that implements AggregationErrorHandler.

You can't set class and ref at the same time. Doing so will result in an exception when starting the application.
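The class/ref rule can be sketched as a resolution step run at startup. This is a hypothetical illustration: StrategyResolverSketch is not a Mule class, and the registry is modelled as a plain Map from bean name to bean rather than Mule's registry API:

```java
import java.util.Map;

// Hypothetical sketch of resolving custom-aggregation-strategy or
// custom-aggregation-error-handler from its class/ref attributes.
class StrategyResolverSketch {

    static <T> T resolve(String className, String ref, Map<String, Object> registry, Class<T> type)
            throws ReflectiveOperationException {
        if (className != null && ref != null) {
            // Setting both attributes is a configuration error at startup.
            throw new IllegalArgumentException("Set either 'class' or 'ref', not both");
        }
        if (className != null) {
            // The class must implement the interface and have a default constructor.
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            return type.cast(instance);
        }
        if (ref != null) {
            Object bean = registry.get(ref);
            if (bean == null) {
                throw new IllegalArgumentException("No registered bean named '" + ref + "'");
            }
            return type.cast(bean); // ClassCastException if the bean has the wrong type
        }
        return null; // neither attribute set: nothing to resolve in this sketch
    }
}
```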

Risks

User confusion regarding the differences between <all> and <scatter-gather> and when to use each.

Impact

Studio Impact

Studio needs to update its editors to support this new feature.

DevKit Impact

No impact

MMC Impact

No impact

CH Impact

No impact

Service Registry Impact

No impact

Migration Impact

No impact, since this is a new feature.

Documentation Impact

  • Update docs reflecting this new feature
  • Update training documents