Skip to content

Commit

Permalink
[ECO-5163] fix: duplicated messages because of duplicated attach message
Browse files Browse the repository at this point in the history
  • Loading branch information
ttypic committed Dec 6, 2024
1 parent 6c0ffcf commit 6a44cf9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());
}

// (RTL4i)
if (connectionManager.getConnectionState().state == ConnectionState.connecting
|| connectionManager.getConnectionState().state == ConnectionState.disconnected) {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
}
setState(ChannelState.attaching, null);
return;
}

/* send attach request and pending state */
Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request");
ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.MessageAction;
import io.ably.lib.types.MessageExtras;
import io.ably.lib.types.Param;
Expand Down Expand Up @@ -1010,4 +1014,41 @@ public void should_have_serial_action_createdAt() throws AblyException {
assertNull(msgComplete.waitFor(1, 10_000));
}
}

@Test
public void should_not_duplicate_messages() throws Exception {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
String testChannelName = "my-channel" + System.currentTimeMillis();
try (AblyRest rest = new AblyRest(opts)) {
final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName);
List<String> testMessagesData = List.of("message 1", "message 2", "message 3");
List<Message> messages = testMessagesData
.stream()
.map(data -> new Message("name", data))
.collect(Collectors.toList());
channel.publish(messages.toArray(Message[]::new));
}

try (AblyRealtime realtime = new AblyRealtime(opts)) {
final ChannelOptions options = new ChannelOptions();
options.params = Map.of("rewind", "10");
final Channel channel = realtime.channels.get(testChannelName, options);
final CompletionWaiter completionWaiter = new CompletionWaiter();
final AtomicInteger counter = new AtomicInteger();

channel.subscribe(message -> {
int value = counter.incrementAndGet();
if (value == 3) completionWaiter.onSuccess();
});

completionWaiter.waitFor();

assertEquals("Should be exactly 3 messages", 3, counter.get());

Thread.sleep(1500);

assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get());
}
}

}

0 comments on commit 6a44cf9

Please sign in to comment.