Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent streams #347

Merged
merged 27 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e9ef61e
first implementation of persistent stream operations
MGathier Feb 6, 2024
aad728b
first implementation of persistent stream operations
MGathier Feb 7, 2024
9e978db
rename persisted streams to persistent streams
MGathier Feb 13, 2024
a52491f
rename classes in EventChannel interface
MGathier Feb 13, 2024
cc997e2
small cleanup in the persistent stream API
MGathier Feb 15, 2024
a38a8e8
more javadoc
MGathier Feb 15, 2024
fbd97e6
rename service and messages in persistent-streams.proto.
MGathier Feb 19, 2024
464eb9d
process review comments
MGathier Feb 27, 2024
378cafa
process review comments
MGathier Feb 27, 2024
9efbb31
remove invalid test
MGathier Feb 27, 2024
3157e5a
Merge remote-tracking branch 'origin/master' into persisted-streams
MGathier Feb 27, 2024
c78cd44
process review comments.
MGathier Mar 18, 2024
d290f4e
process review comments.
MGathier Mar 18, 2024
9ebfd91
fix compilation errors
MGathier Mar 18, 2024
d3dc847
Merge remote-tracking branch 'origin/master' into persisted-streams
MGathier Apr 26, 2024
aaf43c9
resolve review remarks
MGathier Apr 26, 2024
971bab5
cancel of segment must not cancel the gRPC request
MGathier May 14, 2024
2c0949b
add operation to notify server when a segment has an error.
MGathier May 17, 2024
1f6498d
add support for dead letter queues
MGathier Jun 4, 2024
69035f3
let segment send a marker when it is done processing the last batch a…
MGathier Jun 7, 2024
134a48a
code cleanup
MGathier Jun 7, 2024
56c7f33
reset persistent stream position
MGathier Jun 27, 2024
ebb0f0e
reset persistent stream position - small change in interface
MGathier Jun 27, 2024
34e08ee
reset persistent stream position - missing documentation
MGathier Jun 28, 2024
e503d2e
reset persistent stream position - change in API
MGathier Jun 28, 2024
75b8f52
remove wrapper from ResetStreamConfiguration
MGathier Jul 2, 2024
7ffc1c7
update version to 2024.1.0-SNAPSHOT as this is linked to API version …
MGathier Jul 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<groupId>io.axoniq</groupId>
<artifactId>axonserver-connector-java</artifactId>
<version>2023.2.1-SNAPSHOT</version>
<version>2024.0.0-SNAPSHOT</version>

<name>AxonServer Connector</name>
<description>
Expand All @@ -44,7 +44,7 @@
<sonar.organization>axoniq</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>

<axonserver.api.version>2023.0.1</axonserver.api.version>
<axonserver.api.version>2024.0.0-SNAPSHOT</axonserver.api.version>
MGathier marked this conversation as resolved.
Show resolved Hide resolved

<grpc.version>1.59.1</grpc.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.axoniq.axonserver.grpc.InstructionAck;
MGathier marked this conversation as resolved.
Show resolved Hide resolved
import io.axoniq.axonserver.grpc.event.Confirmation;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.streams.StreamStatus;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -387,4 +389,54 @@ default ResultStream<EventQueryResultEntry> querySnapshotEvents(String queryExpr
default ResultStream<EventQueryResultEntry> querySnapshotEvents(String queryExpression, boolean liveStream, String contextName) {
throw new UnsupportedOperationException();
}

/**
* Opens the persistent stream identified by {@code streamId}. Fails if the stream does not exist.
* @param streamId the unique identification of a persistent stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all parameters are named in the JavaDoc.

* @return a PersistedStream streaming events per segment
*/
PersistentStream openPersistentStream(String streamId, int bufferSize, int refillBatch,
PersistentStreamCallbacks callbacks);

/**
* Opens the persistent stream identified by {@code streamId}. If the stream does not exist it will be
* created with the properties specified in {@code creationProperties}.
* @param streamId the unique identification of a persistent stream
* @param creationProperties properties to initialize the persistent stream if it does not exist yet
* @return a PersistentStream streaming events per segment
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
PersistentStream openPersistentStream(String streamId, int bufferSize, int refillBatch,
PersistentStreamCallbacks callbacks,
PersistentStreamProperties creationProperties);


MGathier marked this conversation as resolved.
Show resolved Hide resolved
/**
* Deletes a persistent stream. If the stream does not exist the operation completes successfully.
* @param streamId the unique identification of a persistent stream
* @return a CompletableFuture that completes when the persistent stream is deleted
*/
CompletableFuture<Void> deletePersistentStream(String streamId);

/**
* Updates the name for a persistent stream.
* @param streamId the unique identification of a persistent stream
* @param streamName the new logical name for the stream
* @return a CompletableFuture that completes when the persistent stream is updated
*/
CompletableFuture<Void> updatePersistentStreamName(String streamId, String streamName);

/**
* Updates the number of segments for a persistent stream.
* @param streamId the unique identification of a persistent stream
* @param segments the requested number of segments for the persistent stream
* @return a CompletableFuture that completes when the persistent stream is updated
*/
CompletableFuture<Void> setPersistentStreamSegments(String streamId, int segments);

/**
* Returns a list of persistent streams with the last confirmed token per segment.
* @return a list of persistent streams
*/
CompletableFuture<List<StreamStatus>> persistentStreams();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2020-2024. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.axoniq.axonserver.connector.event;

/**
* A connection to a persistent stream. Axon Server can assign zero or more segments to this connection.
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
public interface PersistentStream {

/**
* Closes the persistent stream.
*/
void close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, but wouldn't it be practical if the close() operation returned a CompletableFuture?


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2020-2024. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.axoniq.axonserver.connector.event;

import java.util.function.Consumer;
import java.util.function.IntConsumer;

/**
* Definitions of the callbacks to be added to a persistent stream.
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
public class PersistentStreamCallbacks {

private final Consumer<PersistentStreamSegment> onSegmentOpened;

private final IntConsumer onSegmentClosed;
private final IntConsumer onAvailable;

private final Consumer<Throwable> onClosed;

/**
* Instantiates the callbacks object.
*
* @param onSegmentOpened callback that the connector invokes when a persistent stream segment is opened
* @param onSegmentClosed callback that the connector invokes when a persistent stream segment is closed
* @param onAvailable callback that the connector invokes when an event is available on a persistent stream
* segment
* @param onClosed callback that the connector invokes when the persistent stream is closed
*/
public PersistentStreamCallbacks(Consumer<PersistentStreamSegment> onSegmentOpened, IntConsumer onSegmentClosed,
IntConsumer onAvailable,
Consumer<Throwable> onClosed) {
this.onSegmentOpened = onSegmentOpened;
this.onSegmentClosed = onSegmentClosed;
this.onAvailable = onAvailable;
this.onClosed = onClosed;
}

/**
* Returns the callback that the connector invokes when the persistent stream is closed.
*
* @return callback that the connector invokes when the persistent stream is closed
*/
public Consumer<Throwable> onClosed() {
return onClosed;
}

/**
* Returns the callback that the connector invokes when a persistent stream segment is opened.
*
* @return callback that the connector invokes when a persistent stream segment is opened
*/
public Consumer<PersistentStreamSegment> onSegmentOpened() {
return onSegmentOpened;
}

/**
* Returns the callback that the connector invokes when a persistent stream segment is closed.
*
* @return callback that the connector invokes when a persistent stream segment is closed
*/
public IntConsumer onSegmentClosed() {
return onSegmentClosed;
}

/**
* Returns the callback that the connector invokes when an event is available on a persistent stream segment.
*
* @return callback that the connector invokes when an event is available on a persistent stream segment
*/
public IntConsumer onAvailable() {
return onAvailable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2020-2024. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.axoniq.axonserver.connector.event;

import io.axoniq.axonserver.connector.impl.AssertUtils;

import java.util.List;

/**
* Defines the properties for a new persistent stream.
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
public class PersistentStreamProperties {
private final String streamName;
private final int segments;
private final String sequencingPolicyName;
private final List<String> sequencingPolicyParameters;
private final long initialPosition;
private final String filter;

/**
* Instantiates the persistent stream properties
*
* @param streamName a logical name for the persistent stream
* @param segments the number of segments, must be larger than 0
* @param sequencingPolicyName the sequencing policy name, must be a name known in Axon Server
* @param sequencingPolicyParameters optional parameters for the sequencing policy
* @param initialPosition first token to read
* @param filter an optional filter to filter events on Axon Server side
*/
public PersistentStreamProperties(String streamName, int segments, String sequencingPolicyName,
List<String> sequencingPolicyParameters, long initialPosition, String filter) {
AssertUtils.assertParameter(segments > 0, "Segments must be > 0");
AssertUtils.assertParameter(initialPosition >= 0, "initialPosition must be >= 0");
AssertUtils.assertParameter(sequencingPolicyName != null,
"sequencingPolicyName must not be null");

this.streamName = streamName;
this.segments = segments;
this.sequencingPolicyName = sequencingPolicyName;
this.sequencingPolicyParameters = sequencingPolicyParameters;
this.initialPosition = initialPosition;
this.filter = filter;
}

/**
* Returns the logical name for the persistent stream.
*
* @return the logical name for the persistent stream
*/
public String streamName() {
return streamName;
}

/**
* Returns the number of segments.
* @return the number of segments
*/
public int segments() {
return segments;
}

/**
* Returns the sequencing policy name.
* @return the sequencing policy name
*/
public String sequencingPolicyName() {
return sequencingPolicyName;
}

/**
* Returns the parameters for the sequencing policy.
* @return parameters for the sequencing policy
*/
public List<String> sequencingPolicyParameters() {
return sequencingPolicyParameters;
}

/**
* Returns the first token to read.
* @return first token to read
*/
public long initialPosition() {
return initialPosition;
}

/**
* Returns the filter to filter events on Axon Server side.
* @return filter to filter events on Axon Server side
*/
public String filter() {
return filter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2020-2024. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.axoniq.axonserver.connector.event;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.grpc.event.EventWithToken;

/**
* An event stream producing events for one segment of a persistent stream.
*/
MGathier marked this conversation as resolved.
Show resolved Hide resolved
public interface PersistentStreamSegment extends ResultStream<EventWithToken> {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't we want the operations of this interface to be asynchronous through the use of the CompletableFuture? I believe we do so throughout the rest of the connector, so wondering why we wouldn't do that here.

/**
* Registers a callback that will be invoked when Axon Server closes the segment within the persistent
* stream connection. This happens when the number of segments in the persistent stream has changed or when
* Axon Server assigns a segment to another client.
* @param callback the callback to register
*/
void onSegmentClosed(Runnable callback);

/**
* Sends the last processed token for this segment to Axon Server. Clients may choose to notify each processed event,
* or to only sent progress information after a number of processed events.
* @param token the last processed token for this segment
*/
void acknowledge(long token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the client cannot send the ack to the server, for example when the connection is lost? Should we return a CompletableFuture to... ehhh... ack the ack?


/**
* Returns the segment number of the stream.
* @return the segment number of this stream
*/
int segment();
}
Loading
Loading