Skip to content

Commit

Permalink
Merge branch 'main' into Fail-fast-ConsumerCOntext-wiht-push-consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Nov 1, 2024
2 parents 13911ec + 10db91d commit 18f3744
Show file tree
Hide file tree
Showing 37 changed files with 675 additions and 265 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,8 @@ You can however set the deliver policy which will be used to start the subscript
| JsConsumerCreate290NotAvailable | CON-90301 | Name field not valid when v2.9.0 consumer create api is not available. |
| JsConsumerNameDurableMismatch | CON-90302 | Name must match durable if both are supplied. |
| JsMultipleFilterSubjects210NotAvailable | CON-90303 | Multiple filter subjects not available until server version 2.10.0. |
| JsAllowDirectRequired | CON-90304 | Stream must have allow direct set. |
| JsDirectBatchGet211NotAvailable | CON-90305 | Batch direct get not available until server version 2.11.0. |
| OsObjectNotFound | OS-90201 | The object was not found. |
| OsObjectIsDeleted | OS-90202 | The object is deleted. |
| OsObjectAlreadyExists | OS-90203 | An object with that name already exists. |
Expand Down
8 changes: 7 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ task sourcesJar(type: Jar) {
from sourceSets.main.allSource
}

task testsJar(type: Jar) {
archiveClassifier.set('tests')
from sourceSets.test.allSource
}

// run build before running fat jar to get classes
task fatJar(type: Jar) {
archiveClassifier.set('fat')
Expand Down Expand Up @@ -168,7 +173,7 @@ jacocoTestReport {
}

artifacts {
archives javadocJar, sourcesJar, examplesJar
archives javadocJar, sourcesJar, examplesJar, testsJar
}

nexusPublishing {
Expand All @@ -187,6 +192,7 @@ publishing {
artifact sourcesJar
artifact examplesJar
artifact javadocJar
artifact testsJar
pom {
name = 'jnats'
packaging = 'jar'
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public class BaseConsumeOptions implements JsonSerializable {
protected BaseConsumeOptions(Builder b) {
bytes = b.bytes;
if (bytes > 0) {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
}
else {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
}

// validation handled in builder
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/ConnectionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* listener is configured in the {@link Options Options} at creation time.
*/
public interface ConnectionListener {
public enum Events {
enum Events {
/** The connection has successfully completed the handshake with the nats-server. */
CONNECTED(true, "opened"),
/** The connection is permanently closed, either by manual action or failed reconnects. */
Expand All @@ -29,7 +29,7 @@ public enum Events {
RECONNECTED(true, "reconnected"),
/** The connection was reconnected and the server has been notified of all subscriptions. */
RESUBSCRIBED(false, "subscriptions re-established"),
/** The connection was told about new servers from, from the current server. */
/** The connection was made aware of new servers from the current server connection. */
DISCOVERED_SERVERS(false, "discovered servers"),
/** Server Sent a lame duck mode. */
LAME_DUCK(false, "lame duck mode");
Expand Down Expand Up @@ -77,5 +77,5 @@ public String toString() {
* @param conn the connection associated with the error
* @param type the type of event that has occurred
*/
public void connectionEvent(Connection conn, Events type);
}
void connectionEvent(Connection conn, Events type);
}
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/FeatureOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public abstract class FeatureOptions {

private final JetStreamOptions jso;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected FeatureOptions(Builder b) {
protected FeatureOptions(Builder<?, ?> b) {
jso = b.jsoBuilder.build();
}

Expand Down
11 changes: 0 additions & 11 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,6 @@ public interface JetStreamManagement {
*/
MessageInfo getLastMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message of the subject.
* @param streamName the name of the stream.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/nats/client/MessageInfoHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

import io.nats.client.api.MessageInfo;

/**
* Handler for {@link MessageInfo}.
*/
public interface MessageInfoHandler {
/**
* Called to deliver a {@link MessageInfo} to the handler.
*
* @param messageInfo the received {@link MessageInfo}
* @throws InterruptedException if the thread for this handler is interrupted
*/
void onMessageInfo(MessageInfo messageInfo) throws InterruptedException;
}
36 changes: 33 additions & 3 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public class Options {
// NOTE TO DEVS!!! To add an option, you have to address:
// ----------------------------------------------------------------------------------------------------
// CONSTANTS * optionally add a default value constant
// ENVIRONMENT PROPERTIES * most of the time add an environment property, should always be in the form PFX +
// ENVIRONMENT PROPERTIES * always add an environment property. Constant always starts with PFX, but code accepts without
// PROTOCOL CONNECT OPTION CONSTANTS * not related to options, but here because Options code uses them
// CLASS VARIABLES * add a variable to the class
// BUILDER VARIABLES * add a variable in builder
// BUILDER COPY CONSTRUCTOR * update builder constructor to ensure new variables are set
// BUILD CONSTRUCTOR PROPS * update build props constructor to read new props
// BUILDER METHODS * add a chainable method in builder for new variable
// BUILD IMPL * update build() implementation if needed
// BUILDER COPY CONSTRUCTOR * update builder constructor to ensure new variables are set
// CONSTRUCTOR * update constructor to ensure new variables are set from builder
// GETTERS * update getter to be able to retrieve class variable value
// HELPER FUNCTIONS * just helpers
// ----------------------------------------------------------------------------------------------------
// README - if you add a property or change it's comment, add it to or update the readme
// README - if you add a property or change its comment, add it to or update the readme
// ----------------------------------------------------------------------------------------------------

// ----------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -492,6 +492,10 @@ public class Options {
* {@link Builder#useDispatcherWithExecutor()}.
*/
public static final String PROP_USE_DISPATCHER_WITH_EXECUTOR = PFX + "use.dispatcher.with.executor";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#forceFlushOnRequest() forceFlushOnRequest}.
*/
public static final String PROP_FORCE_FLUSH_ON_REQUEST = PFX + "force.flush.on.request";

// ----------------------------------------------------------------------------------------------------
// PROTOCOL CONNECT OPTION CONSTANTS
Expand Down Expand Up @@ -625,6 +629,7 @@ public class Options {
private final boolean tlsFirst;
private final boolean useTimeoutException;
private final boolean useDispatcherWithExecutor;
private final boolean forceFlushOnRequest;

private final AuthHandler authHandler;
private final ReconnectDelayHandler reconnectDelayHandler;
Expand Down Expand Up @@ -741,6 +746,7 @@ public static class Builder {
private boolean tlsFirst = false;
private boolean useTimeoutException = false;
private boolean useDispatcherWithExecutor = false;
private boolean forceFlushOnRequest = true; // true since it's the original b/w compatible way
private ServerPool serverPool = null;
private DispatcherFactory dispatcherFactory = null;

Expand Down Expand Up @@ -876,6 +882,7 @@ public Builder properties(Properties props) {
booleanProperty(props, PROP_TLS_FIRST, b -> this.tlsFirst = b);
booleanProperty(props, PROP_USE_TIMEOUT_EXCEPTION, b -> this.useTimeoutException = b);
booleanProperty(props, PROP_USE_DISPATCHER_WITH_EXECUTOR, b -> this.useDispatcherWithExecutor = b);
booleanProperty(props, PROP_FORCE_FLUSH_ON_REQUEST, b -> this.forceFlushOnRequest = b);

classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
Expand Down Expand Up @@ -1658,6 +1665,15 @@ public Builder useDispatcherWithExecutor() {
return this;
}

/**
* Instruct requests to turn off flush on requests.
* @return the Builder for chaining
*/
public Builder dontForceFlushOnRequest() {
this.forceFlushOnRequest = false;
return this;
}

/**
* Set the ServerPool implementation for connections to use instead of the default implementation
* @param serverPool the implementation
Expand Down Expand Up @@ -1905,6 +1921,7 @@ public Builder(Options o) {
this.tlsFirst = o.tlsFirst;
this.useTimeoutException = o.useTimeoutException;
this.useDispatcherWithExecutor = o.useDispatcherWithExecutor;
this.forceFlushOnRequest = o.forceFlushOnRequest;

this.serverPool = o.serverPool;
this.dispatcherFactory = o.dispatcherFactory;
Expand Down Expand Up @@ -1969,6 +1986,7 @@ private Options(Builder b) {
this.tlsFirst = b.tlsFirst;
this.useTimeoutException = b.useTimeoutException;
this.useDispatcherWithExecutor = b.useDispatcherWithExecutor;
this.forceFlushOnRequest = b.forceFlushOnRequest;

this.serverPool = b.serverPool;
this.dispatcherFactory = b.dispatcherFactory;
Expand Down Expand Up @@ -2405,8 +2423,20 @@ public boolean useTimeoutException() {
return useTimeoutException;
}

/**
* Whether the dispatcher should use an executor to async messages to handlers
* @return the flag
*/
public boolean useDispatcherWithExecutor() { return useDispatcherWithExecutor; }

/**
* Whether to flush on any user request
* @return the flag
*/
public boolean forceFlushOnRequest() {
return forceFlushOnRequest;
}

/**
* Get the ServerPool implementation. If null, a default implementation is used.
* @return the ServerPool implementation
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/io/nats/client/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,6 @@ public interface StreamContext {
*/
MessageInfo getLastMessage(String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message of the subject.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public abstract class SubscribeOptions {
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
protected final String name;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected SubscribeOptions(Builder builder, boolean isPull,
protected SubscribeOptions(Builder<?, ?> builder, boolean isPull,
String deliverSubject, String deliverGroup,
long pendingMessageLimit, long pendingByteLimit) {

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/nats/client/api/ApiResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public ApiResponse() {
type = NO_TYPE;
}

public ApiResponse(Error error) {
jv = null;
this.error = error;
type = NO_TYPE;
}

@SuppressWarnings("unchecked")
public T throwOnHasError() throws JetStreamApiException {
if (hasError()) {
Expand Down
Loading

0 comments on commit 18f3744

Please sign in to comment.