Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent streams #347

Merged
merged 27 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e9ef61e
first implementation of persistent stream operations
MGathier Feb 6, 2024
aad728b
first implementation of persistent stream operations
MGathier Feb 7, 2024
9e978db
rename persisted streams to persistent streams
MGathier Feb 13, 2024
a52491f
rename classes in EventChannel interface
MGathier Feb 13, 2024
cc997e2
small cleanup in the persistent stream API
MGathier Feb 15, 2024
a38a8e8
more javadoc
MGathier Feb 15, 2024
fbd97e6
rename service and messages in persistent-streams.proto.
MGathier Feb 19, 2024
464eb9d
process review comments
MGathier Feb 27, 2024
378cafa
process review comments
MGathier Feb 27, 2024
9efbb31
remove invalid test
MGathier Feb 27, 2024
3157e5a
Merge remote-tracking branch 'origin/master' into persisted-streams
MGathier Feb 27, 2024
c78cd44
process review comments.
MGathier Mar 18, 2024
d290f4e
process review comments.
MGathier Mar 18, 2024
9ebfd91
fix compilation errors
MGathier Mar 18, 2024
d3dc847
Merge remote-tracking branch 'origin/master' into persisted-streams
MGathier Apr 26, 2024
aaf43c9
resolve review remarks
MGathier Apr 26, 2024
971bab5
cancel of segment must not cancel the gRPC request
MGathier May 14, 2024
2c0949b
add operation to notify server when a segment has an error.
MGathier May 17, 2024
1f6498d
add support for dead letter queues
MGathier Jun 4, 2024
69035f3
let segment send a marker when it is done processing the last batch a…
MGathier Jun 7, 2024
134a48a
code cleanup
MGathier Jun 7, 2024
56c7f33
reset persistent stream position
MGathier Jun 27, 2024
ebb0f0e
reset persistent stream position - small change in interface
MGathier Jun 27, 2024
34e08ee
reset persistent stream position - missing documentation
MGathier Jun 28, 2024
e503d2e
reset persistent stream position - change in API
MGathier Jun 28, 2024
75b8f52
remove wrapper from ResetStreamConfiguration
MGathier Jul 2, 2024
7ffc1c7
update version to 2024.1.0-SNAPSHOT as this is linked to API version …
MGathier Jul 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<groupId>io.axoniq</groupId>
<artifactId>axonserver-connector-java</artifactId>
<version>2023.2.1-SNAPSHOT</version>
<version>2024.0.0-SNAPSHOT</version>

<name>AxonServer Connector</name>
<description>
Expand All @@ -44,7 +44,7 @@
<sonar.organization>axoniq</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>

<axonserver.api.version>2023.0.1</axonserver.api.version>
<axonserver.api.version>2024.0.0-SNAPSHOT</axonserver.api.version>
MGathier marked this conversation as resolved.
Show resolved Hide resolved

<grpc.version>1.59.1</grpc.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.axoniq.axonserver.grpc.InstructionAck;
MGathier marked this conversation as resolved.
Show resolved Hide resolved
import io.axoniq.axonserver.grpc.event.Confirmation;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.streams.StreamConnections;
import io.axoniq.axonserver.grpc.streams.StreamStatus;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -387,4 +390,51 @@ default ResultStream<EventQueryResultEntry> querySnapshotEvents(String queryExpr
default ResultStream<EventQueryResultEntry> querySnapshotEvents(String queryExpression, boolean liveStream, String contextName) {
throw new UnsupportedOperationException();
}

/**
* Opens the persistent stream identified by {@code streamId}. Fails if the stream does not exist.
* @param streamId the unique identification of a persistent stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all parameters are named in the JavaDoc.

* @return a PersistedStream streaming events per segment
*/
PersistentStream openPersistentStream(String streamId);

/**
* Opens the persistent stream identified by {@code streamId}. If the stream does not exist it will be
* created with the properties specified in {@code creationProperties}.
* @param streamId the unique identification of a persistent stream
* @param creationProperties properties to initialize the persistent stream if it does not exist yet
* @return a PersistedStream streaming events per segment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PersistedStream -> PersistentStream

*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
PersistentStream openPersistentStream(String streamId, PersistedStreamProperties creationProperties);


MGathier marked this conversation as resolved.
Show resolved Hide resolved
/**
* Deletes a persistent stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps explicitly mention that this command will also return normally if the stream does not exist (if that's the way it actually behaves, of course)

* @param streamId the unique identification of a persistent stream
* @return a CompletableFuture that completes when the persistent stream is deleted
*/
CompletableFuture<Void> deletePersistentStream(String streamId);

/**
* Updates properties for a persistent stream. If {@code segments} parameter is set to a positive value, it changes
* the number of segments for the persistent stream.
* @param streamId the unique identification of a persistent stream
* @param segments the requested number of segments for the persistent stream
* @param streamName the new logical name for the stream
* @return a CompletableFuture that completes when the persistent stream is updated
*/
CompletableFuture<Void> updatePersistentStream(String streamId, Integer segments, String streamName);
abuijze marked this conversation as resolved.
Show resolved Hide resolved

/**
* Returns a list of persistent streams with the last confirmed token per segment.
* @return a list of persistent streams
*/
CompletableFuture<List<StreamStatus>> persistentStreams();

/**
* Returns a list of persistent streams with the connected client per segment.
* @return a list of persistent streams
*/
CompletableFuture<List<StreamConnections>> persistentStreamConnections();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this become potentially very long? What about returning the connections for a given streamId? Then the persistentStreams() call can be used to retrieve all ID's, and the details can be fetched for those that are relevant.


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.axoniq.axonserver.connector.event;

import java.util.List;

public class PersistedStreamProperties {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in the classname. Should be PersistentStreamProperties

private final String streamName;
private final int segments;
private final String sequencingPolicyName;
private final List<String> sequencingPolicyParameters;
private final int initialPosition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Position should be a long

private final String filter;

public PersistedStreamProperties(String streamName, int segments, String sequencingPolicyName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc is missing on this class.

Is 0 a valid value for segments? If not, would be good to validate that.

List<String> sequencingPolicyParameters, int initialPosition, String filter) {
this.streamName = streamName;
this.segments = segments;
this.sequencingPolicyName = sequencingPolicyName;
this.sequencingPolicyParameters = sequencingPolicyParameters;
this.initialPosition = initialPosition;
this.filter = filter;
}

public String streamName() {
return streamName;
}

public int segments() {
return segments;
}

public String sequencingPolicyName() {
return sequencingPolicyName;
}

public List<String> sequencingPolicyParameters() {
return sequencingPolicyParameters;
}

public int initialPosition() {
return initialPosition;
}

public String filter() {
return filter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.axoniq.axonserver.connector.event;

import java.util.function.Consumer;

/**
* A connection to a persistent stream. Axon Server can assign zero or more segments to this connection.
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
public interface PersistentStream {

/**
* Registers a callback to invoke when Axon Server assigns a segment to this connection.
* @param callback the callback to invoke when a segment is opened
*/
void onSegmentOpened(Consumer<PersistentStreamSegment> callback);

/**
* Closes the persistent stream.
*/
void close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, but wouldn't it be practical if the close() operation returned a CompletableFuture?


/**
* Registers a callback to invoke when Axon Server closes this connection (or the connection to Axon Server is lost).
* @param closedCallback the callback to invoke when the persistent stream is closed
*/
void onClosed(Consumer<Throwable> closedCallback);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.axoniq.axonserver.connector.event;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.grpc.event.EventWithToken;

/**
* An event stream producing events for one segment of a persistent stream.
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
public interface PersistentStreamSegment extends ResultStream<EventWithToken> {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't we want the operations of this interface to be asynchronous through the use of the CompletableFuture? I believe we do so throughout the rest of the connector, so wondering why we wouldn't do that here.

/**
* Registers a callback that will be invoked when Axon Server closes the segment within the persistent
* stream connection. This happens when the number of segments in the persistent stream has changed or when
* Axon Server assigns a segment to another client.
* @param callback the callback to register
*/
void onSegmentClosed(Runnable callback);

/**
* Sends the last processed token for this segment to Axon Server. Clients may choose to notify each processed event,
* or to only sent progress information after a number of processed events.
* @param token the last processed token for this segment
*/
void acknowledge(long token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the client cannot send the ack to the server, for example when the connection is lost? Should we return a CompletableFuture to... ehhh... ack the ack?


/**
* Returns the segment number of the stream.
* @return the segment number of this stream
*/
int segment();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.event.EventWithToken;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;

public class BufferedPersistentStreamSegment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason this class doesn't extend the AbstractBufferedStream?

implements PersistentStreamSegment {

private static final Runnable NO_OP = () -> {
};

private final Set<Runnable> onSegmentClosedCallbacks = new CopyOnWriteArraySet<>();
private final BlockingQueue<EventWithToken> buffer = new LinkedBlockingQueue<>();
private final AtomicReference<Throwable> errorResult = new AtomicReference<>();
private final AtomicBoolean closed = new AtomicBoolean();

private final AtomicReference<Runnable> onAvailableCallback = new AtomicReference<>(NO_OP);

private final int segment;
private final LongConsumer progressCallback;

public BufferedPersistentStreamSegment(int segment, LongConsumer progressCallback) {
this.segment = segment;
this.progressCallback = progressCallback;
}

@Override
public EventWithToken peek() {
checkClosed();
return buffer.peek();
}

@Override
public EventWithToken nextIfAvailable() {
checkClosed();
abuijze marked this conversation as resolved.
Show resolved Hide resolved
return buffer.poll();
}

@Override
public EventWithToken nextIfAvailable(long timeout, TimeUnit unit) throws InterruptedException {
checkClosed();
return buffer.poll(timeout, unit);
}

@Override
public EventWithToken next() throws InterruptedException {
checkClosed();
return buffer.take();
}

@Override
public void onAvailable(Runnable callback) {
checkClosed();
if (callback == null) {
onAvailableCallback.set(NO_OP);
} else {
onAvailableCallback.set(callback);
if (isClosed() || peek() != null) {
callback.run();
}
}
}

private void checkClosed() {
if (closed.get()) {
throw new StreamClosedException(errorResult.get());
}
}

@Override
public void close() {
// No-op
}

@Override
public boolean isClosed() {
return closed.get();
}

@Override
public Optional<Throwable> getError() {
return Optional.ofNullable(errorResult.get());
}

@Override
public void onSegmentClosed(Runnable callback) {
onSegmentClosedCallbacks.add(callback);
}

public void onNext(EventWithToken streamSignal) {
abuijze marked this conversation as resolved.
Show resolved Hide resolved
buffer.add(streamSignal);
onAvailableCallback.get().run();
}

public void onError(Throwable throwable) {
errorResult.set(throwable);
onCompleted();
}

public void onCompleted() {
closed.set(true);
onSegmentClosedCallbacks.forEach(Runnable::run);
onAvailableCallback.get().run();
}

@Override
public void acknowledge(long token) {
if (!closed.get()) {
progressCallback.accept(token);
}
}

@Override
public int segment() {
return segment;
}
}
Loading
Loading