Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix implicit attach on subscribe #1028

Merged
merged 6 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,17 @@ public synchronized void unsubscribe() {
eventListeners.clear();
}

/**
* <p>
* Checks if {@link io.ably.lib.types.ChannelOptions#attachOnSubscribe} is true.
* </p>
* Defaults to {@code true} when {@link io.ably.lib.realtime.ChannelBase#options} is null.
* <p>Spec: TB4, RTL7g, RTL7gh, RTP6d, RTP6e</p>
*/
protected boolean attachOnSubscribeEnabled() {
return options == null || options.attachOnSubscribe;
}

/**
* Registers a listener for messages on this channel.
* The caller supplies a listener function, which is called each time one or more messages arrives on the channel.
Expand All @@ -704,7 +715,9 @@ public synchronized void unsubscribe() {
public synchronized void subscribe(MessageListener listener) throws AblyException {
Log.v(TAG, "subscribe(); channel = " + this.name);
listeners.add(listener);
attach();
if (attachOnSubscribeEnabled()) {
attach();
}
}

/**
Expand Down Expand Up @@ -739,7 +752,9 @@ public synchronized void unsubscribe(MessageListener listener) {
public synchronized void subscribe(String name, MessageListener listener) throws AblyException {
Log.v(TAG, "subscribe(); channel = " + this.name + "; event = " + name);
subscribeImpl(name, listener);
attach();
if (attachOnSubscribeEnabled()) {
attach();
}
}

/**
Expand Down Expand Up @@ -773,7 +788,9 @@ public synchronized void subscribe(String[] names, MessageListener listener) thr
Log.v(TAG, "subscribe(); channel = " + this.name + "; (multiple events)");
for(String name : names)
subscribeImpl(name, listener);
attach();
if (attachOnSubscribeEnabled()) {
attach();
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ public void unsubscribe() {
* @throws AblyException
*/
private void implicitAttachOnSubscribe(CompletionListener completionListener) throws AblyException {
if (!channel.attachOnSubscribeEnabled()) {
if (completionListener != null) {
completionListener.onSuccess();
}
return;
}
if (channel.state == ChannelState.failed) {
String errorString = String.format(Locale.ROOT, "Channel %s: subscribe in FAILED channel state", channel.name);
Log.v(TAG, errorString);
Expand Down
11 changes: 11 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ChannelOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public class ChannelOptions {
*/
public boolean encrypted;

/**
* <p>
* Determines whether calling {@link io.ably.lib.realtime.Channel#subscribe Channel.subscribe} or
* {@link io.ably.lib.realtime.Presence#subscribe Presence.subscribe} method
* should trigger an implicit attach.
* </p>
* <p>Defaults to {@code true}.</p>
* <p>Spec: TB4, RTL7g, RTL7gh, RTP6d, RTP6e</p>
*/
public boolean attachOnSubscribe = true;

public boolean hasModes() {
return null != modes && 0 != modes.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,71 @@ public void onMessage(Message message) {
}
}

/**
* <p>
* Validates a client can subscribe to messages without implicit channel attach
* Refer Spec TB4, RTL7g, RTL7gh
* </p>
* @throws AblyException
*/
@Test
public void subscribe_without_implicit_attach() {
String channelName = "subscribe_" + testParams.name;
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
ably = new AblyRealtime(opts);

/* create a channel and set attachOnSubscribe to false */
final Channel channel = ably.channels.get(channelName);
ChannelOptions chOpts = new ChannelOptions();
chOpts.attachOnSubscribe = false;
channel.setOptions(chOpts);

List<Boolean> receivedMsg = Collections.synchronizedList(new ArrayList<>());

/* Check for all subscriptions without ATTACHING state */
channel.subscribe(message -> receivedMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.subscribe("test_event", message -> receivedMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.subscribe(new String[]{"test_event1", "test_event2"}, message -> receivedMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.attach();
(new ChannelWaiter(channel)).waitFor(ChannelState.attached);

channel.publish("test_event", "hi there");
// Expecting two msg: one from the wildcard subscription and one from test_event subscription
Exception conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedMsg.size() == 2, 5000);
assertNull(conditionError);

receivedMsg.clear();
channel.publish("test_event1", "hi there");
// Expecting two msg: one from the wildcard subscription and one from test_event1 subscription
conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedMsg.size() == 2, 5000);
assertNull(conditionError);

receivedMsg.clear();
channel.publish("test_event2", "hi there");
// Expecting two msg: one from the wildcard subscription and one from test_event2 subscription
conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedMsg.size() == 2, 5000);
assertNull(conditionError);

} catch (AblyException e) {
e.printStackTrace();
fail("subscribe_without_implicit_attach: Unexpected exception");
} finally {
if(ably != null)
ably.close();
}
}

/**
* <p>
* Verifies that unsubscribe call with no argument removes all listeners,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,68 @@ public void onPresenceMessage(PresenceMessage message) {
}
}

/**
* <p>
* Validates a client can subscribe to presence without implicit channel attach
* Refer Spec TB4, RTP6d, RTP6e
* </p>
* @throws AblyException
*/
@Test
public void presence_subscribe_without_implicit_attach() {
String ablyChannel = "subscribe_" + testParams.name;
AblyRealtime ably = null;
try {
ClientOptions option1 = createOptions(testVars.keys[0].keyStr);
option1.clientId = "client1";
ably = new AblyRealtime(option1);

/* create a channel and set attachOnSubscribe to false */
final Channel channel = ably.channels.get(ablyChannel);
ChannelOptions chOpts = new ChannelOptions();
chOpts.attachOnSubscribe = false;
channel.setOptions(chOpts);

List<Boolean> receivedPresenceMsg = Collections.synchronizedList(new ArrayList<>());
CompletionWaiter completionWaiter = new CompletionWaiter();

/* Check for all subscriptions without ATTACHING state */
channel.presence.subscribe(m -> receivedPresenceMsg.add(true), completionWaiter);
assertEquals(1, completionWaiter.successCount);
assertEquals(ChannelState.initialized, channel.state);

channel.presence.subscribe(Action.enter, m -> receivedPresenceMsg.add(true), completionWaiter);
assertEquals(2, completionWaiter.successCount);
assertEquals(ChannelState.initialized, channel.state);

channel.presence.subscribe(EnumSet.of(Action.enter, Action.leave),m -> receivedPresenceMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.attach();
(new ChannelWaiter(channel)).waitFor(ChannelState.attached);

channel.presence.enter("enter client1", null);
// Expecting 3 msg: one from the wildcard subscription and two from specific event subscription
Exception conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedPresenceMsg.size() == 3, 5000);
assertNull(conditionError);

receivedPresenceMsg.clear();
channel.presence.leave(null);
// Expecting 2 msg: one from the wildcard subscription and one from specific event subscription
conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedPresenceMsg.size() == 2, 5000);
assertNull(conditionError);

} catch (AblyException e) {
e.printStackTrace();
fail("presence_subscribe_without_implicit_attach: Unexpected exception");
} finally {
if(ably != null)
ably.close();
}
}

/**
* <p>
* Validates a client sending multiple presence updates when the channel is in the attaching
Expand Down
Loading