[3.5 M1][CORE] Connector Auto Paging

Mariano Gonzalez edited this page Aug 29, 2013 · 9 revisions

Jira: http://www.mulesoft.org/jira/browse/MULE-6844

Forum discussion: http://forum.mulesoft.org/mulesoft/topics/_3_5_m2_core_connector_auto_paging?rfm=1

Motivation / Context

Mule has always lacked consistency when executing queries through a Cloud Connector. Since the underlying APIs usually differ a great deal from each other, the usability of the Cloud Connector is always different too.

The DataSense Query Language (Dsql) aims to solve that problem from the perspective of the user building the query. However, there is a set of paging-related problems that has yet to be tackled. If you have to get the result set in pages (either because it's too big to fit into memory or because the API returns pages in the first place), you again find yourself with an ad hoc user experience. While APIs like Box use a limit/offset pair, Salesforce uses server-side cursors and Google uses a page token.

The goal of this story is to enable auto paging on Cloud Connectors, so that the whole result set can be streamed automatically, hiding from the user the complexities of dealing with large result sets and the mechanics of the connector's paging API.

Use cases

  • As a SaaS developer I want to be able to stream a list of objects without worrying about the underlying paging mechanism
  • As a SaaS developer I want to be able to stream a large list of objects without worrying about those fitting into memory
  • As a SaaS developer I want to be able to control the page size
  • As a SaaS developer I want to be able to consume a paginated query but still be able to determine the total number of items when the underlying API allows it.

Design

A clear and decoupled solution needs to separate the following concerns:

  • Knowing how to consume the client paging API
  • Knowing how to iterate through those results hiding the paging/streaming mechanics

The details of the client paging API will be handled by Devkit. The iteration/streaming mechanics will be under the scope of the Mule core.

Paging in Mule

From the ESB perspective, supporting this feature requires:

  • Being able to iterate through the results in a transparent way so that other message processors can make use of them without being aware of what's really happening
  • Making efficient use of resources: not holding them longer than necessary, while making a best effort not to involve the user in managing them

Closeable

The first thing we'll need is the ability to identify resources that need to be closed:

package org.mule.api;

/**
 * Marker interface to identify resources that need to be closed in order to
 * release them.
 */
public interface Closeable
{

    /**
     * Closes the resource. Calling this method is mandatory for any component using
     * this instance once it finishes using it. This method should not throw an
     * exception if invoked on an instance that has already been closed.
     * 
     * @throws MuleException if an exception occurs closing the resource
     */
    public void close() throws MuleException;

}
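To make the contract concrete, here is a minimal sketch of how a component might honor it. It is illustrative only: Closeable is redeclared locally as a stand-in for org.mule.api.Closeable, a plain Exception stands in for MuleException, and TrackedResource and useAndClose are hypothetical names that are not part of this proposal.

```java
// Sketch only: Closeable is a local stand-in for org.mule.api.Closeable,
// and TrackedResource is a hypothetical resource used for illustration.
class CloseableSketch {

    interface Closeable {
        void close() throws Exception; // Exception stands in for MuleException
    }

    static class TrackedResource implements Closeable {
        private boolean closed = false;
        int releaseCount = 0; // how many times the resource was really released

        @Override
        public void close() {
            if (closed) {
                return; // closing an already closed instance must not fail
            }
            closed = true;
            releaseCount++;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Does some work with the resource and closes it even if the work fails. */
    static void useAndClose(TrackedResource resource, boolean fail) {
        try {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
        } finally {
            resource.close();
        }
    }
}
```

The try/finally block covers the "mandatory close" half of the contract, and the guard inside close() covers the "safe to call twice" half.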

Iterating

The classes below are meant to support the use cases in this story and any other similar use case. However, using them will not be mandatory for implementing similar use cases. Leverage what you can from them, but if they do not quite suit your use case, feel free to implement this in your own way.

Iterating through lazy, paged or streaming collections is becoming more and more common in Mule. Therefore we want to introduce a set of tools for these kinds of scenarios.

The main design principle here will be to expose the paged resource as an Iterator. This iterator will at the same time be decoupled from the actual data source by acting as an adapter of a Consumer:

package org.mule.streaming;

import org.mule.api.Closeable;

import java.util.NoSuchElementException;

public interface Consumer<T> extends Closeable
{

    /**
     * Retrieves the next available item.
     * 
     * @return an object of type T if available
     * @throws NoSuchElementException if no more items are available
     */
    public T consume() throws NoSuchElementException;

    /**
     * Returns <code>true</code> if no more items are available. When the resource
     * has been fully consumed and this method returns <code>true</code>,
     * implementors of this interface are required to invoke the
     * {@link org.mule.api.Closeable#close()} method before returning in order to
     * release resources as quickly as possible. Users of this component are still
     * required to invoke the same close method when they're finished with it. This
     * accounts for the case in which a consumer needs to be released before being
     * fully consumed.
     * 
     * @return <code>true</code> if no more items are available. <code>false</code>
     *         otherwise
     */
    public boolean isConsumed();


    /**
     * Returns the total number of items available for consumption.
     * In some scenarios, it might not be possible/convenient to actually retrieve this value.
     * -1 is returned in such a case.
     */
    public int totalAvailable();

}

This consumer will work paired with a producer:

package org.mule.streaming;

import org.mule.api.Closeable;

import java.util.List;

public interface Producer<T> extends Closeable
{

    public List<T> produce();
    
}

Finally, this is what the Iterator would look like:

package org.mule.streaming;

import org.mule.api.Closeable;
import org.mule.api.MuleException;

import java.util.Iterator;

public class ConsumerIterator<T> implements Iterator<T>, Closeable
{

    private Consumer<T> consumer;

    public ConsumerIterator(Consumer<T> consumer)
    {
        this.consumer = consumer;
    }

    /**
     * Closes the underlying consumer
     */
    @Override
    public void close() throws MuleException
    {
        this.consumer.close();
    }

    /**
     * Returns true as long as the underlying consumer is not fully consumed nor
     * closed
     */
    @Override
    public boolean hasNext()
    {
        return !this.consumer.isConsumed();
    }

    /**
     * Gets an item from the consumer and returns it
     */
    @Override
    public T next()
    {
        return this.consumer.consume();
    }

    /**
     * Not allowed on this implementation
     * 
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove()
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the total number of items available for consumption. In some
     * scenarios, it might not be possible/convenient to actually retrieve this value
     * or it might not be available at this point. -1 is returned in such a case.
     */
    public int size()
    {
        return this.consumer.totalAvailable();
    }
}

As you can see, the iterator above can be used for multiple purposes since the actual complexity is hidden behind the Consumer instance.
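As an illustration of that decoupling, the sketch below drives the iterator with a trivial in-memory consumer. The Consumer and ConsumerIterator types are trimmed local stand-ins for the ones described above (close() is simplified to throw nothing), and ListConsumer and drain are hypothetical helpers invented for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Sketch only: trimmed local stand-ins for the Consumer and ConsumerIterator
// types described above; ListConsumer is a hypothetical in-memory consumer.
class ConsumerIteratorSketch {

    interface Consumer<T> {
        T consume() throws NoSuchElementException;
        boolean isConsumed();
        int totalAvailable();
        void close();
    }

    static class ListConsumer<T> implements Consumer<T> {
        private final List<T> items;
        private int index = 0;
        private boolean closed = false;

        ListConsumer(List<T> items) {
            this.items = items;
        }

        @Override
        public T consume() {
            if (isConsumed()) {
                throw new NoSuchElementException();
            }
            return items.get(index++);
        }

        @Override
        public boolean isConsumed() {
            boolean consumed = closed || index >= items.size();
            if (consumed) {
                close(); // release early once nothing is left
            }
            return consumed;
        }

        @Override
        public int totalAvailable() {
            return items.size();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static class ConsumerIterator<T> implements Iterator<T> {
        private final Consumer<T> consumer;

        ConsumerIterator(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public boolean hasNext() {
            return !consumer.isConsumed();
        }

        @Override
        public T next() {
            return consumer.consume();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        int size() {
            return consumer.totalAvailable();
        }

        void close() {
            consumer.close();
        }
    }

    /** Drains the iterator the way a downstream component might, then closes it. */
    static List<String> drain(ConsumerIterator<String> iterator) {
        List<String> out = new ArrayList<String>();
        try {
            while (iterator.hasNext()) {
                out.add(iterator.next());
            }
        } finally {
            iterator.close();
        }
        return out;
    }
}
```

The caller only ever sees hasNext(), next(), size() and close(); swapping ListConsumer for a consumer backed by a remote API would not change the drain method at all.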

For this story, two Consumers will be implemented:

  • An element based consumer that retrieves pages but returns the elements one by one
  • A page based consumer that retrieves and returns whole pages
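One possible shape for these two consumers is sketched below. This is not the planned implementation: Producer is a trimmed local stand-in, and ListProducer, PagedConsumer, ElementBasedConsumer and PageBasedConsumer are hypothetical names chosen for the example. Both consumers share the same fetch logic and differ only in what consume() hands back.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

// Sketch only: Producer is a trimmed local stand-in for the interface above.
// ListProducer and the consumer classes are hypothetical illustrations.
class ConsumerKindsSketch {

    interface Producer<T> {
        List<T> produce();
        void close();
    }

    /** Hypothetical producer that serves an in-memory list in fixed-size pages. */
    static class ListProducer<T> implements Producer<T> {
        private final List<T> source;
        private final int pageSize;
        private int offset = 0;

        ListProducer(List<T> source, int pageSize) {
            this.source = source;
            this.pageSize = pageSize;
        }

        @Override
        public List<T> produce() {
            int end = Math.min(offset + pageSize, source.size());
            List<T> page = new ArrayList<T>(source.subList(offset, end));
            offset = end;
            return page; // empty once the source is exhausted
        }

        @Override
        public void close() {
            // nothing to release for an in-memory list
        }
    }

    /** Shared logic: pull a page when needed, close as soon as nothing is left. */
    static abstract class PagedConsumer<T> {
        private final Producer<T> producer;
        protected final LinkedList<T> buffer = new LinkedList<T>();
        private boolean closed = false;

        PagedConsumer(Producer<T> producer) {
            this.producer = producer;
        }

        boolean isConsumed() {
            if (!closed && buffer.isEmpty()) {
                List<T> page = producer.produce();
                if (page == null || page.isEmpty()) {
                    close();
                } else {
                    buffer.addAll(page);
                }
            }
            return closed;
        }

        void close() {
            if (!closed) {
                closed = true;
                producer.close();
            }
        }
    }

    /** Retrieves pages but returns the elements one by one. */
    static class ElementBasedConsumer<T> extends PagedConsumer<T> {
        ElementBasedConsumer(Producer<T> producer) {
            super(producer);
        }

        T consume() {
            if (isConsumed()) {
                throw new NoSuchElementException();
            }
            return buffer.removeFirst();
        }
    }

    /** Retrieves and returns whole pages. */
    static class PageBasedConsumer<T> extends PagedConsumer<T> {
        PageBasedConsumer(Producer<T> producer) {
            super(producer);
        }

        List<T> consume() {
            if (isConsumed()) {
                throw new NoSuchElementException();
            }
            List<T> page = new ArrayList<T>(buffer);
            buffer.clear();
            return page;
        }
    }
}
```

Because the buffer is only refilled when it is empty, the page based variant always holds exactly one page at a time, which keeps memory use bounded by the page size in both cases.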

The last part of the puzzle comes from actually producing the results. To interface these components with any given Cloud Connector (or data source for that matter) the delegate pattern will be implemented through the following class:

package org.mule.streaming;

import org.mule.api.Closeable;

import java.util.List;

/**
 * A PagingDelegate is a {@link org.mule.api.Closeable} capable of consuming a data
 * feed in pages. Implementing this class does not guarantee thread safety. Check
 * each particular implementation for information about that.
 */
public abstract class PagingDelegate<T> implements Closeable
{

    /**
     * Returns the next page of items. If the return value is <code>null</code> or an
     * empty list, then it means no more items are available
     * 
     * @return a populated list of elements, or <code>null</code> or an empty list
     *         if no more items are available
     */
    public abstract List<T> getPage();

    /**
     * Returns the total number of items in the unpaged result set. In some scenarios,
     * it might not be possible/convenient to actually retrieve this value. -1 is
     * returned in such a case.
     */
    public abstract int getTotalResults();

}

A producer for this delegate would look like this (pseudo-code!):

package org.mule.streaming;

import org.mule.api.MuleException;

import java.util.List;

public class PagingDelegateProducer<T> implements Producer<T>
{

    private PagingDelegate<T> delegate;

    public PagingDelegateProducer(PagingDelegate<T> delegate)
    {
        this.delegate = delegate;
    }

    @Override
    public List<T> produce()
    {
        return this.delegate.getPage();
    }

    @Override
    public void close() throws MuleException
    {
        this.delegate.close();
    }
}
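To show how a delegate and its producer could cooperate end to end, here is a small self-contained sketch. PagingDelegate and PagingDelegateProducer are trimmed local stand-ins (no MuleException, package-private methods), while OffsetPagingDelegate and readAll are hypothetical: they simulate a limit/offset style API over an in-memory list rather than any real connector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: PagingDelegate and PagingDelegateProducer are trimmed local
// stand-ins for the classes above; OffsetPagingDelegate is hypothetical.
class PagingDelegateSketch {

    static abstract class PagingDelegate<T> {
        abstract List<T> getPage();
        abstract int getTotalResults();
        abstract void close();
    }

    /** Hypothetical delegate that pages through a list with a limit/offset pair. */
    static class OffsetPagingDelegate<T> extends PagingDelegate<T> {
        private final List<T> backend;
        private final int limit;
        private int offset = 0;
        private boolean closed = false;

        OffsetPagingDelegate(List<T> backend, int limit) {
            this.backend = backend;
            this.limit = limit;
        }

        @Override
        List<T> getPage() {
            if (closed || offset >= backend.size()) {
                return Collections.emptyList(); // signals "no more items"
            }
            int end = Math.min(offset + limit, backend.size());
            List<T> page = new ArrayList<T>(backend.subList(offset, end));
            offset = end;
            return page;
        }

        @Override
        int getTotalResults() {
            return backend.size(); // a real API might return -1 if unknown
        }

        @Override
        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    static class PagingDelegateProducer<T> {
        private final PagingDelegate<T> delegate;

        PagingDelegateProducer(PagingDelegate<T> delegate) {
            this.delegate = delegate;
        }

        List<T> produce() {
            return delegate.getPage();
        }

        void close() {
            delegate.close();
        }
    }

    /** Reads every page until the delegate reports that nothing is left. */
    static <T> List<T> readAll(PagingDelegateProducer<T> producer) {
        List<T> all = new ArrayList<T>();
        try {
            List<T> page = producer.produce();
            while (page != null && !page.isEmpty()) {
                all.addAll(page);
                page = producer.produce();
            }
        } finally {
            producer.close();
        }
        return all;
    }
}
```

Note that the loop treats both null and an empty list as the end of the feed, matching the getPage() contract, and that closing the producer propagates to the delegate.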

Managing Resources

Closing resources early

As specified in the Javadoc for the Consumer interface, a consumer will be required to close itself as soon as it is fully consumed.

Closing resources in case of exception

The class DefaultStreamCloserService will be updated so that it also handles instances of Closeable.
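The kind of check such a service could perform is sketched below. This does not reflect the actual DefaultStreamCloserService API: Closeable is a local stand-in for org.mule.api.Closeable, and closeIfCloseable and Flag are hypothetical names used only to illustrate the idea of closing a payload defensively when an exception interrupts the flow.

```java
// Sketch only: Closeable is a local stand-in for org.mule.api.Closeable;
// closeIfCloseable and Flag are hypothetical and not part of Mule.
class CloserSketch {

    interface Closeable {
        void close() throws Exception; // Exception stands in for MuleException
    }

    /** Tiny closeable used to observe whether close() was invoked. */
    static class Flag implements Closeable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Closes the payload if it is a Closeable. Returns true when close() was
     * invoked, false when the payload is of any other type.
     */
    static boolean closeIfCloseable(Object payload) {
        if (!(payload instanceof Closeable)) {
            return false;
        }
        try {
            ((Closeable) payload).close();
        } catch (Exception e) {
            // Swallowed on purpose in this sketch: a failure while closing
            // should not mask the exception that triggered the cleanup.
        }
        return true;
    }
}
```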

Considering a Closeable as a Consumable

DefaultMuleMessage needs to be updated so that any instance of Closeable is considered a consumable.

Paging in Devkit

Now that all the mechanics are set in Mule, let's take a look at how this works on the connector's side.

@Paged

A new annotation called @Paged will be introduced in Devkit. This annotation will be applicable at the method level alongside the @Processor annotation, and it means that the processor supports paging. A @Paged method will be required to return an instance of the previously introduced PagingDelegate. If a @Paged processor returns any other type, then Devkit will throw an exception at generation time.

A @Paged method is also required to receive as a parameter an instance of org.mule.streaming.PagingConfiguration. This object is a simple immutable POJO carrying pagination parameters. We're grouping those into this object to keep the signature consistent if we decide to add or drop parameters. This class is part of Mule core and looks like this:

package org.mule.streaming;

/**
 * Immutable POJO to carry pagination parameters
 */
public class PagingConfiguration
{

    /**
     * The number of items to fetch on each invocation to the data source
     */
    private final int fetchSize;
    
    public PagingConfiguration(int fetchSize)
    {
        this.fetchSize = fetchSize;
    }

    public int getFetchSize()
    {
        return fetchSize;
    }

}

@Paged looks like this:

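import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * This annotation marks a method inside a {@link Module} as an operation that will return
 * a paged result set. Methods marked with this annotation must also be annotated with
 * {@link Processor} and must return an instance of {@link org.mule.streaming.PagingDelegate}.
 *
 * Parameters on this method will be featured as an attribute on the Mule XML invocation.
 */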
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.SOURCE)
@Documented
public @interface Paged {

    /**
     * Specifies the amount of elements to be retrieved on each invocation
     * to the data source. Defaults to 100.
     */
    int defaultFetchSize() default 100;
}

Message Processors

Generated message processors will extend the class AbstractDevkitBasedPageableMessageProcessor, which will extend the DevkitMessageProcessor class and will exist in the devkit-support module. This base message processor will be in charge of:

  • Transforming the PagingDelegate into an iterator
  • Handling the case in which the returned PagingDelegate is null

Upon invocation, these message processors will set the payload to an instance of ConsumerIterator. The user can extract the total number of available records by using the following MEL expression: #[payload.size()]. Iteration can be aborted by invoking #[payload.close()].

Configuration

For any @Paged message processor, the following attributes will be automatically added:

<cc:query
   fetchSize="100"
   firstPage="0"
   lastPage="-1"
   outputUnit="[ELEMENT|PAGE]" />

This is an example of what the search method from the Box connector would look like in a pageable version:

    @Processor
    @OAuthProtected
    @OAuthInvalidateAccessTokenOn(exception = BoxTokenExpiredException.class)
    @Paged
    public PagingDelegate<Item> getFolderItems(@Optional @Default("0") final String folderId, final PagingConfiguration pagingConfiguration) {
        return new PagingDelegate<Item>() {
            
            private GetItemsResponse cachedResponse;
            
            @Override
            public List<Item> getPage() {
                if (this.cachedResponse != null) {
                    List<Item> items = this.cachedResponse.getEntries();
                    this.cachedResponse = null;
                    
                    return items;
                }
                
                List<Item> items = this.query(folderId).getEntries();
                
                return items;
            }

            private GetItemsResponse query(final String folderId)
            {
                WebResource resource = apiResource.path("folders").path(folderId).path("items");
                return jerseyUtil.get(resource, GetItemsResponse.class, 200);
            }
            
            @Override
            public int getTotalResults() {
                if (this.cachedResponse == null) {
                    this.cachedResponse = this.query(folderId);
                }
                
                return this.cachedResponse.getTotalCount();
            }
            
            @Override
            public void close() throws MuleException {
                this.cachedResponse = null;
            }
        };
        
    }

Risks

  • There's a chance of resource leakage when a paging delegate is neither fully consumed nor properly closed before the flow ends. This is a weakness that Mule has had all along.
  • Bad implementations of the getTotalResults() method in the PagingDelegate might lead to performance issues.
  • Unforeseen issues may appear while implementing streaming capabilities.

Mule Studio Impact

  • New editor required for @Paged message processors

DevKit Impact

  • Devkit needs to implement support for methods annotated with @Paged
  • Devkit needs to update the documentation it generates

MMC Impact

Initially no impact. May require changes to add monitoring if we want to adapt this approach for batch processing

CH Impact

Initially no impact. Analyze compatibility with event tracking.

Service Registry Impact

No impact

Migration Impact

Paging will be disabled by default, but if it is enabled then the semantics of a flow using a query processor will change.

Documentation Impact

  • Generated documentation of all Cloud Connectors using this new functionality
  • Since this functionality will cut across all components, Janet and her team will decide whether to create a global feature page.