Riptide: Stream

Riptide: Stream lets you read arbitrary, potentially infinite JSON streams.

Example

http.get("/sales-orders")
    .dispatch(series(),
        on(SUCCESSFUL).call(streamOf(Order.class), forEach(this::process)));

Features

Dependencies

  • Java 8

Installation

Add the following dependency to your project:

<dependency>
    <groupId>org.zalando</groupId>
    <artifactId>riptide-stream</artifactId>
    <version>${riptide.version}</version>
</dependency>

Configuration

To enable streaming, you only need to register the StreamConverter. You are also strongly encouraged to use the RestAsyncClientHttpRequestFactory provided by Riptide: HTTP Client, because it fixes an issue in Spring's implementation, which tries to consume the entire (possibly infinite) stream when closing a connection.

Http.builder()
    .requestFactory(new RestAsyncClientHttpRequestFactory(client, executor))
    .converter(Streams.streamConverter(mapper))
    .build();

Usage

You can consume a stream using forEach(Consumer), as shown in the first example. Alternatively, you can capture and return a stream:

public Stream<Order> streamOrders() {
    Capture<Stream<Order>> capture = Capture.empty();
    
    return http.get("/sales-orders")
        .dispatch(series(),
            on(SUCCESSFUL).call(streamOf(Order.class), capture))
        .thenApply(capture)
        .join();
}

Beware: the returned stream must be closed properly, otherwise it may occupy a connection/socket forever. This is easy to miss, because most streams are backed by collections and don't need to be closed explicitly, as the Stream Javadoc points out:

Streams have a BaseStream.close() method and implement AutoCloseable, but nearly all stream instances do not actually need to be closed after use. Generally, only streams whose source is an IO channel (such as those returned by Files.lines(Path, Charset)) will require closing. Most streams are backed by collections, arrays, or generating functions, which require no special resource management. (If a stream does require closing, it can be declared as a resource in a try-with-resources statement.)

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
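
As the Javadoc suggests, one way to make sure such a stream is closed is to consume it in a try-with-resources statement. The following is a minimal sketch that uses only the JDK, not Riptide: openStream is a hypothetical stand-in for a method like streamOrders(), and its onClose handler merely marks the point where a connection-backed stream would release its connection/socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class StreamClosingExample {

    // Hypothetical stand-in for a connection-backed stream such as the one
    // returned by streamOrders(). The onClose handler flips a flag at the point
    // where a real stream would release its connection/socket.
    public static Stream<String> openStream(AtomicBoolean released) {
        return Stream.of("order-1", "order-2", "order-3")
                .onClose(() -> released.set(true));
    }

    // Consumes the stream inside try-with-resources, so close() is called
    // when the block exits, even if processing throws an exception.
    public static List<String> consume(AtomicBoolean released) {
        List<String> seen = new ArrayList<>();
        try (Stream<String> orders = openStream(released)) {
            orders.forEach(seen::add);
        }
        return seen;
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        List<String> seen = consume(released);
        System.out.println(seen + " released=" + released.get());
    }
}
```

Terminal operations such as forEach do not close a stream on their own, so without the try-with-resources block (or an explicit close() call) the onClose handler would never run.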

Getting Help

If you have questions, concerns, bug reports, etc., please file an issue in this repository's Issue Tracker.

Getting Involved/Contributing

To contribute, simply make a pull request and add a brief description (1-2 sentences) of your addition or change. For more details, check the contribution guidelines.