Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

createConsumerContext fail fast on push consumer #1222

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/java/io/nats/client/JetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
* It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly.
* It is <b>recommened</b> to manage consumers explicitly through {@link StreamContext StreamContext} and {@link ConsumerContext ConsumerContext} or {@link JetStreamManagement JetStreamManagement}
*
*
* <h3>Recommended usage for creating streams, consumers, publish and listen on a stream</h3>
* <pre>
* io.nats.client.Connection nc = Nats.connect();
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/nats/client/api/Error.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ public class Error implements JsonSerializable {
static Error optionalInstance(JsonValue vError) {
return vError == null ? null : new Error(vError);
}


public static Error instance(int code, String description) {
return new Error(code, NOT_SET, description);
}


Error(JsonValue jv) {
this.jv = jv;
}
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
private final AtomicReference<Dispatcher> defaultDispatcher;
private final AtomicReference<NatsMessageConsumerBase> lastConsumer;

NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration orderedCc) {
/*
* Only called from the internal implementation.
* <p>
*/
NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration orderedCc) throws JetStreamApiException {
stateLock = new ReentrantLock();
streamCtx = sc;
cachedConsumerInfo = new AtomicReference<>();
Expand All @@ -56,6 +60,13 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
defaultDispatcher = new AtomicReference<>();
lastConsumer = new AtomicReference<>();
if (unorderedConsumerInfo != null) {
if (unorderedConsumerInfo != null) {
//Fail fast on push consumers as all operation on ConsumerContext expect a pull consumer.
//If we don't fail fast, calls to consume() or iterat() will hang with the server returning: code=409, message='Consumer is push based'
ConsumerConfiguration consumerConfig = unorderedConsumerInfo.getConsumerConfiguration();
if ( consumerConfig.getDeliverSubject() != null )
throw new JetStreamApiException(io.nats.client.api.Error.instance(409, "Consumer is push based. ConsumerContext only supports pull consumers."));
}
ordered = false;
originalOrderedCc = null;
subscribeSubject = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class NatsOrderedConsumerContext implements OrderedConsumerContext {
private final NatsConsumerContext impl;

NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) {
NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) throws JetStreamApiException {
impl = new NatsConsumerContext(streamContext, null, config);
}

Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,28 @@ public void testOrderedConsumerBuilder() throws IOException, ClassNotFoundExcept
check_values(roundTripSerialize(occ), zdt);
}


@Test
public void testConsumerContextRejectsPush() throws Exception {
jsServer.run(TestBase::atLeast2_9_1, nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();

TestingStreamContainer tsc = new TestingStreamContainer(jsm);
jsPublish(js, tsc.subject(), 4);

String name = name();

// Pre define a consumer - set delivery subject to make it a push consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(name).deliverSubject("delivery_"+name).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

// Consumer[Context]
assertThrows(JetStreamApiException.class, () -> js.getConsumerContext(tsc.stream, name)); // Push consumer not allowed

});
}

private static void check_default_values(OrderedConsumerConfiguration occ) {
assertEquals(">", occ.getFilterSubject());
assertNull(occ.getDeliverPolicy());
Expand Down
Loading