From f653d2dbc6648ce2661e0b0b275dc5f7da60b06e Mon Sep 17 00:00:00 2001 From: evgeny Date: Thu, 5 Dec 2024 23:04:15 +0000 Subject: [PATCH] [ECO-5163] fix: duplicated messages because of duplicated attach message --- .../io/ably/lib/realtime/ChannelBase.java | 10 ++++ .../ably/lib/transport/ConnectionManager.java | 9 +++- .../realtime/RealtimeConnectFailTest.java | 3 +- .../test/realtime/RealtimeMessageTest.java | 46 +++++++++++++++++++ 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b84ba7dc0..9e0c9974c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -243,6 +243,16 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo()); } + // (RTL4i) + if (connectionManager.getConnectionState().state == ConnectionState.connecting + || connectionManager.getConnectionState().state == ConnectionState.disconnected) { + if (listener != null) { + on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed)); + } + setState(ChannelState.attaching, null); + return; + } + /* send attach request and pending state */ Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request"); ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name); diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 4a10c7487..40763209d 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -1684,9 +1684,14 @@ private void sendImpl(QueuedMessage msg) throws AblyException { private void sendQueuedMessages() { synchronized(this) { - while(queuedMessages.size() > 0) { + while(!queuedMessages.isEmpty()) { try { - sendImpl(queuedMessages.get(0)); + QueuedMessage message = queuedMessages.get(0); + // Do not send attach message from queued messages to prevent duplication + // (we always send attach on connect event) + if (message.msg.action != ProtocolMessage.Action.attach) { + sendImpl(message); + } } catch (AblyException e) { Log.e(TAG, "sendQueuedMessages(): Unexpected error sending queued messages", e); } finally { diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index 02a1d07d5..d9c6d5e58 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -75,7 +75,8 @@ public void connect_fail_notfound_error() throws AblyException { public void connect_fail_authorized_error() throws AblyException { AblyRealtime ably = null; try { - ClientOptions opts = createOptions(testVars.appId + ".invalid_key_id:invalid_key_value"); + String keyId = testVars.keys[0].keyName.split("\\.")[1]; + ClientOptions opts = createOptions(testVars.appId + "." + keyId + ":invalid_key_value"); ably = new AblyRealtime(opts); ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java index 2d00524f1..1ff965b1d 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java @@ -13,12 +13,14 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.concurrent.atomic.AtomicInteger; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; +import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageExtras; import io.ably.lib.types.Param; @@ -995,6 +997,10 @@ public void should_have_serial_action_createdAt() throws AblyException { msgComplete.onSuccess(); }); + CompletionWaiter attachListener = new CompletionWaiter(); + channel.attach(attachListener); + assertNull(attachListener.waitFor(1, 10_000)); + /* publish to the channel */ JsonObject chatMessage = new JsonObject(); chatMessage.addProperty("text", "hello world!"); @@ -1010,4 +1016,44 @@ public void should_have_serial_action_createdAt() throws AblyException { assertNull(msgComplete.waitFor(1, 10_000)); } } + + @Test + public void should_not_duplicate_messages() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + String testChannelName = "my-channel" + System.currentTimeMillis(); + try (AblyRest rest = new AblyRest(opts)) { + final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName); + + Message[] messages = new Message[] { + new Message("name", "message 1"), + new Message("name", "message 2"), + new Message("name", "message 3"), + }; + + channel.publish(messages); + } + + try (AblyRealtime realtime = new AblyRealtime(opts)) { + final ChannelOptions options = new ChannelOptions(); + options.params = new HashMap<>(); + options.params.put("rewind", "10"); + final Channel channel = realtime.channels.get(testChannelName, options); + final CompletionWaiter completionWaiter = new CompletionWaiter(); + final AtomicInteger counter = new AtomicInteger(); + + channel.subscribe(message -> { + int value = counter.incrementAndGet(); + if (value == 3) completionWaiter.onSuccess(); + }); + + completionWaiter.waitFor(); + + assertEquals("Should be exactly 3 messages", 3, counter.get()); + + Thread.sleep(1500); + + assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get()); + } + } + }