-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persistent streams #347
Persistent streams #347
Conversation
private final int segments; | ||
private final String sequencingPolicyName; | ||
private final List<String> sequencingPolicyParameters; | ||
private final int initialPosition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Position should be a long
src/main/java/io/axoniq/axonserver/connector/event/EventChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java
Outdated
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java
Outdated
Show resolved
Hide resolved
* or to only sent progress information after a number of processed events. | ||
* @param token the last processed token for this segment | ||
*/ | ||
void acknowledge(long token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when the client cannot send the ack to the server, for example when the connection is lost? Should we return a CompletableFuture to... ehhh... ack the ack?
|
||
import java.util.List; | ||
|
||
public class PersistedStreamProperties { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in the classname. Should be PersistentStreamProperties
* created with the properties specified in {@code creationProperties}. | ||
* @param streamId the unique identification of a persistent stream | ||
* @param creationProperties properties to initialize the persistent stream if it does not exist yet | ||
* @return a PersistedStream streaming events per segment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PersistedStream -> PersistentStream
|
||
|
||
/** | ||
* Deletes a persistent stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps explicitly mention that this command will also return normally if the stream does not exist (if that's the way it actually behaves, of course)
* @param segments the requested number of segments for the persistent stream | ||
* @return a CompletableFuture that completes when the persistent stream is updated | ||
*/ | ||
CompletableFuture<Void> setPersistentStreamSegments(String streamId, Integer segments); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this an Integer
, rather than an int
? Is null
an option somehow?
* Returns a list of persistent streams with the connected client per segment. | ||
* @return a list of persistent streams | ||
*/ | ||
CompletableFuture<List<StreamConnections>> persistentStreamConnections(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this become potentially very long? What about returning the connections for a given streamId? Then the persistentStreams()
call can be used to retrieve all ID's, and the details can be fetched for those that are relevant.
private final long initialPosition; | ||
private final String filter; | ||
|
||
public PersistedStreamProperties(String streamName, int segments, String sequencingPolicyName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc is missing on this class.
Is 0
a valid value for segments
? If not, would be good to validate that.
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.LongConsumer; | ||
|
||
public class BufferedPersistentStreamSegment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a specific reason this class doesn't extend the AbstractBufferedStream
?
|
||
@BeforeEach | ||
void setup() throws ExecutionException, InterruptedException { | ||
AxonServerConnectionFactory connectionFactory = AxonServerConnectionFactory.forClient("demo", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test seems to assume AxonServer is running on port 8124 to run. Could we use testcontainers instead?
.build()).get(1, TimeUnit.SECONDS); | ||
|
||
long last = eventChannel.getLastToken().get(); | ||
Thread.sleep(2000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an assertWithin
in the AssertUtils
class which would prevent the need for arbitrary sleeps.
There don't seem to be any assertions in this test either.
let client control flow of events for a persistent stream.
let client control flow of events for a persistent stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bunch of remarks that would clean up the new code before we merge it.
|
||
/** | ||
* Opens the persistent stream identified by {@code streamId}. Fails if the stream does not exist. | ||
* @param streamId the unique identification of a persistent stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all parameters are named in the JavaDoc.
src/main/java/io/axoniq/axonserver/connector/event/EventChannel.java
Outdated
Show resolved
Hide resolved
src/test/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImplTest.java
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/event/impl/EventChannelImpl.java
Outdated
Show resolved
Hide resolved
return InitializationProperties.newBuilder() | ||
.setInitialPosition(creationProperties.initialPosition()) | ||
.setSegments(creationProperties.segments()) | ||
.setStreamName(nonNullOrDefault(creationProperties.streamName(),"")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An empty String
as the Stream name seems rather useless to me. Wouldn't a shortened UUID better fit the bill if nothing's provided?
src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java
Outdated
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java
Outdated
Show resolved
Hide resolved
# Conflicts: # pom.xml
update callback signature to provide the PersistentStreamSegment instead of just the segment number.
…fter a close request wait for segments to complete the active batch on stopping the application
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concerns have been addressed, hence I'm approving this pull request.
No description provided.