From 32e06878cdff4488a84d7849fb6244adb2fe23b9 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 20 Dec 2023 18:19:48 +0530 Subject: [PATCH 01/23] Fixed failing tests for recovery key --- .../ably/lib/test/realtime/RealtimeConnectFailTest.java | 9 +++++---- .../ably/lib/test/realtime/RealtimeHttpHeaderTest.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index 55f95d814..eff2e652d 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -342,15 +342,16 @@ public void connect_unknown_recover_fail() { AblyRealtime ably = null; try { ClientOptions opts = createOptions(testVars.keys[0].keyStr); - String recoverConnectionId = "0123456789abcdef-99"; - opts.recover = recoverConnectionId + ":0"; + String recoveryKey = + "{\"connectionKey\":\"0123456789abcdef-99\",\"msgSerial\":5,\"channelSerials\":{\"channel1\":\"98\",\"channel2\":\"32\",\"channel3\":\"09\"}}"; + opts.recover = recoveryKey; ably = new AblyRealtime(opts); ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); ErrorInfo connectedError = connectionWaiter.waitFor(ConnectionState.connected); assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state); assertNotNull("Verify error is returned", connectedError); - assertEquals("Verify correct error code is given", 80008, connectedError.code); - assertFalse("Verify new connection id is assigned", recoverConnectionId.equals(ably.connection.key)); + assertEquals("Verify correct error code is given", 80018, connectedError.code); + assertFalse("Verify new connection id is assigned", "0123456789abcdef-99".equals(ably.connection.key)); } catch (AblyException e) { e.printStackTrace(); fail("init0: Unexpected exception instantiating library"); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java index 07e945629..ac90c7ac3 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java @@ -81,7 +81,7 @@ public void realtime_websocket_param_test() { * Defaults.ABLY_VERSION_PARAM, as ultimately the request param has been derived from those values. */ assertEquals("Verify correct version", requestParameters.get("v"), - Collections.singletonList("1.0")); + Collections.singletonList("2")); /* Spec RSC7d3 * This test should not directly validate version against Defaults.ABLY_AGENT_VERSION, nor From 6ea5c2b6ed679bba2c63a9f03436d296b9e3761c Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 8 Jan 2024 18:36:31 +0530 Subject: [PATCH 02/23] Refactored test file for no connection serial --- .../lib/test/realtime/RealtimeResumeTest.java | 112 ++++++++---------- 1 file changed, 47 insertions(+), 65 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 356dcf210..2895dc841 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -86,7 +86,7 @@ public void resume_none() { /* wait */ System.out.println("Got reconnection; waiting 2s"); - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} /* Check the channel is still attached. */ assertEquals("Verify channel still attached", channel.state, ChannelState.attached); @@ -140,12 +140,12 @@ public void resume_simple() { CompletionSet msgComplete1 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete1.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ ErrorInfo[] errors = msgComplete1.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -159,7 +159,7 @@ public void resume_simple() { ablyRx.connection.connectionManager.requestState(ConnectionState.disconnected); /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} /* reconnect the rx connection */ ablyRx.connection.connect(); @@ -168,12 +168,12 @@ public void resume_simple() { CompletionSet msgComplete2 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete2.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ errors = msgComplete2.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -231,12 +231,12 @@ public void resume_disconnected() { CompletionSet msgComplete1 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_disconnected) " + i, msgComplete1.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ ErrorInfo[] errors = msgComplete1.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -250,18 +250,18 @@ public void resume_disconnected() { ablyRx.connection.connectionManager.requestState(ConnectionState.disconnected); /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} /* publish next messages to the channel */ CompletionSet msgComplete2 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_disconnected) " + i, msgComplete2.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ errors = msgComplete2.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* reconnect the rx connection, and expect the messages to be delivered */ ablyRx.connection.connect(); @@ -327,12 +327,12 @@ public void resume_multiple_channel() { for(int i = 0; i < messageCount; i++) { channelTx1.publish("test_event1", "Test message (resume_multiple_channel) " + i, msgComplete1.add()); channelTx2.publish("test_event2", "Test message (resume_multiple_channel) " + i, msgComplete1.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ ErrorInfo[] errors = msgComplete1.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter1.waitFor(messageCount); @@ -349,19 +349,19 @@ public void resume_multiple_channel() { ablyRx.connection.connectionManager.requestState(ConnectionState.disconnected); /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} /* publish next messages to the channel */ CompletionSet msgComplete2 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx1.publish("test_event1", "Test message (resume_multiple_channel) " + i, msgComplete2.add()); channelTx2.publish("test_event2", "Test message (resume_multiple_channel) " + i, msgComplete2.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ errors = msgComplete2.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* reconnect the rx connection, and expect the messages to be delivered */ ablyRx.connection.connect(); @@ -420,12 +420,12 @@ public void resume_multiple_interval() { CompletionSet msgComplete1 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_multiple_interval) " + i, msgComplete1.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ ErrorInfo[] errors = msgComplete1.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -439,18 +439,18 @@ public void resume_multiple_interval() { ablyRx.connection.connectionManager.requestState(ConnectionState.disconnected); /* wait */ - try { Thread.sleep(20000L); } catch(InterruptedException e) {} + try { Thread.sleep(20000L); } catch(InterruptedException ignored) {} /* publish next messages to the channel */ CompletionSet msgComplete2 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_multiple_interval) " + i, msgComplete2.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ errors = msgComplete2.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* reconnect the rx connection, and expect the messages to be delivered */ ablyRx.connection.connect(); @@ -509,12 +509,12 @@ public void resume_verify_publish() { CompletionSet msgComplete1 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete1.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ ErrorInfo[] errors = msgComplete1.waitFor(); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -536,7 +536,7 @@ public void resume_verify_publish() { } /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} /* reconnect the tx connection */ System.out.println("*** about to reconnect tx connection"); @@ -547,7 +547,7 @@ public void resume_verify_publish() { CompletionSet msgComplete2 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete2.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called. This never finishes if @@ -556,7 +556,7 @@ public void resume_verify_publish() { System.out.println("*** published. About to wait for callbacks"); errors = msgComplete2.waitFor(); System.out.println("*** done"); - assertTrue("Verify success from all message callbacks", errors.length == 0); + assertEquals("Verify success from all message callbacks", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -619,14 +619,12 @@ public void resume_publish_queue() { CompletionSet msgComplete1 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { senderChannel.publish("test_event", "Test message (resume_publish_queue) " + i, msgComplete1.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* wait for the publish callback to be called */ ErrorInfo[] errors = msgComplete1.waitFor(); - assertTrue( - "First round of messages has errors", errors.length == 0 - ); + assertEquals("First round of messages has errors", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -643,7 +641,7 @@ public void resume_publish_queue() { sender.connection.connectionManager.requestState(ConnectionState.disconnected); /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} /* * publish further messages to the channel, which should be queued @@ -652,7 +650,7 @@ public void resume_publish_queue() { CompletionSet msgComplete2 = new CompletionSet(); for(int i = 0; i < messageCount; i++) { senderChannel.publish("queued_message_" + i, "Test queued message (resume_publish_queue) " + i, msgComplete2.add()); - try { Thread.sleep(delay); } catch(InterruptedException e){} + try { Thread.sleep(delay); } catch(InterruptedException ignored){} } /* reconnect the sender */ @@ -662,10 +660,7 @@ public void resume_publish_queue() { /* wait for the publish callback to be called.*/ errors = msgComplete2.waitFor(); - assertTrue( - "Second round of messages (queued) has errors", - errors.length == 0 - ); + assertEquals("Second round of messages (queued) has errors", 0, errors.length); /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); @@ -731,10 +726,7 @@ public void resume_publish_resend_pending_messages_when_resume_is_successful() { /* wait for the publish callback to be called.*/ ErrorInfo[] errors = senderCompletion.waitFor(); - assertTrue( - "First completion has errors", - errors.length == 0 - ); + assertEquals("First completion has errors", 0, errors.length); //assert that messages sent till now are sent with correct size and serials assertEquals("First round of messages has incorrect size", 3, transport.getPublishedMessages().size()); @@ -797,10 +789,7 @@ public void resume_publish_resend_pending_messages_when_resume_is_successful() { (new ChannelWaiter(senderChannel)).waitFor(ChannelState.attached); /* wait for the publish callback to be called.*/ ErrorInfo[] senderErrors = senderCompletion.waitFor(); - assertTrue( - "Second round of send has errors", - senderErrors.length == 0 - ); + assertEquals("Second round of send has errors", 0, senderErrors.length); assertEquals("Second round of messages has incorrect size", 6, transport.getPublishedMessages().size()); //make sure they were sent with correct serials @@ -917,7 +906,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { /* Wait for the connection to go stale, then reconnect */ try { Thread.sleep(waitInDisconnectedState); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } ably.connection.connect(); connectionWaiter.waitFor(ConnectionState.connected); @@ -933,10 +922,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { assertEquals("Connection has the same id", ChannelState.attached, senderChannel.state); ErrorInfo[] resendErrors = senderCompletion.waitFor(); - assertTrue( - "Second round of messages (queued) has errors", - resendErrors.length == 0 - ); + assertEquals("Second round of messages (queued) has errors", 0, resendErrors.length); assertEquals("Second round of messages has incorrect size", 6, transport.getPublishedMessages().size()); //make sure they were sent with reset serials @@ -956,8 +942,9 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException { final String channelName = "sender_channel"; final MockWebsocketFactory mockWebsocketFactory = new MockWebsocketFactory(); final DebugOptions options = createOptions(testVars.keys[0].keyStr); - final String[] clients = new String[]{"client1","client2","client3", - "client4","client5","client6","client7","client8","client9"}; + final String[] clients = new String[]{"client1", "client2", "client3", "client4", "client5", + "client6", "client7", "client8", "client9"}; + options.logLevel = Log.VERBOSE; options.realtimeRequestTimeout = 2000L; @@ -972,10 +959,12 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException { @Override public void onConnectionStateChanged(ConnectionStateChange state) { try { - Field connectionStateField = ably.connection.connectionManager.getClass().getDeclaredField("connectionStateTtl"); + Field connectionStateField = ably.connection.connectionManager.getClass(). + getDeclaredField("connectionStateTtl"); connectionStateField.setAccessible(true); connectionStateField.setLong(ably.connection.connectionManager, newTtl); - Field maxIdleField = ably.connection.connectionManager.getClass().getDeclaredField("maxIdleInterval"); + Field maxIdleField = ably.connection.connectionManager.getClass(). + getDeclaredField("maxIdleInterval"); maxIdleField.setAccessible(true); maxIdleField.setLong(ably.connection.connectionManager, newIdleInterval); } catch (NoSuchFieldException | IllegalAccessException e) { @@ -990,10 +979,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { final Channel senderChannel = ably.channels.get(channelName); senderChannel.attach(); (new ChannelWaiter(senderChannel)).waitFor(ChannelState.attached); - assertEquals( - "The sender's channel should be attached", - senderChannel.state, ChannelState.attached - ); + assertEquals("The sender's channel should be attached", senderChannel.state, ChannelState.attached); MockWebsocketFactory.MockWebsocketTransport transport = mockWebsocketFactory.getCreatedTransport(); CompletionSet presenceCompletion = new CompletionSet(); @@ -1043,7 +1029,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { /* Wait for the connection to go stale, then reconnect */ try { Thread.sleep(waitInDisconnectedState); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } //now let's unblock the ack nacks and reconnect @@ -1070,13 +1056,9 @@ public void onConnectionStateChanged(ConnectionStateChange state) { for (ErrorInfo resendError : resendErrors) { System.out.println("presence_resume_test: error "+resendError.message); } - assertTrue( - "Second round of messages (queued) has errors", - resendErrors.length == 0 - ); + assertEquals("Second round of messages (queued) has errors", 0, resendErrors.length); - for (PresenceMessage presenceMessage: - transport.getSentPresenceMessages()) { + for (PresenceMessage presenceMessage: transport.getSentPresenceMessages()) { System.out.println("presence_resume_test: sent message with client: "+presenceMessage.clientId +" " + " action:"+presenceMessage.action); } @@ -1087,7 +1069,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { sentPresenceMap.put(presenceMessage.clientId, presenceMessage); } for (String client : clients) { - assertTrue("Client id isn't there:"+client, sentPresenceMap.containsKey(client)); + assertTrue("Client id isn't there:" + client, sentPresenceMap.containsKey(client)); } } } From 75a8032457da6169c26e95d485ee2a0c1038255e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 23 Jan 2024 19:03:26 +0530 Subject: [PATCH 03/23] updated realtime channel test for resume flag --- .../test/realtime/RealtimeChannelTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 181dbc49c..111a9b269 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -1479,8 +1479,8 @@ public void channel_server_initiated_attached_detached() throws AblyException { channel.attach(); channelWaiter.waitFor(ChannelState.attached); - final int[] updateEventsEmitted = new int[]{0}; - final boolean[] resumedFlag = new boolean[]{true}; + final int[] updateEventsEmitted = {0}; + final boolean[] resumedFlag = {false}; channel.on(ChannelEvent.update, new ChannelStateListener() { @Override public void onChannelStateChanged(ChannelStateChange stateChange) { @@ -1497,19 +1497,19 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { }}; ably.connection.connectionManager.onMessage(null, attachedMessage); - /* Inject detached message as if from the server */ - ProtocolMessage detachedMessage = new ProtocolMessage() {{ - action = Action.detached; - channel = channelName; - }}; - ably.connection.connectionManager.onMessage(null, detachedMessage); +// /* Inject detached message as if from the server */ +// ProtocolMessage detachedMessage = new ProtocolMessage() {{ +// action = Action.detached; +// channel = channelName; +// }}; +// ably.connection.connectionManager.onMessage(null, detachedMessage); /* Channel should transition to attaching, then to attached */ - channelWaiter.waitFor(ChannelState.attaching); - channelWaiter.waitFor(ChannelState.attached); +// channelWaiter.waitFor(ChannelState.attaching); +// channelWaiter.waitFor(ChannelState.attached); /* Verify received UPDATE message on channel */ - assertEquals("Verify exactly one UPDATE event was emitted on the channel", updateEventsEmitted[0], 1); + assertEquals("Verify exactly one UPDATE event was emitted on the channel",1, updateEventsEmitted[0]); assertTrue("Verify resumed flag set in UPDATE event", resumedFlag[0]); } finally { if (ably != null) From ea65bffcc1332799609136b3e12a61bd93e773dc Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 24 Jan 2024 18:08:10 +0530 Subject: [PATCH 04/23] Added todo that checks for exact error --- lib/src/main/java/io/ably/lib/realtime/ChannelBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 4ae7b353a..e8b61383b 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -611,6 +611,7 @@ public void run() { /* State changes provoked by ConnectionManager state changes. */ public void setConnected() { + // TODO - seems test is failing because of explicit attach after connect if (state.isReattachable()){ attach(true,null); // RTN15c6, RTN15c7 } From cd355b6d83cbf082c713feb5b2b3cbc147bbbe72 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 25 Jan 2024 18:02:04 +0530 Subject: [PATCH 05/23] Fixed test for server injected attach --- .../io/ably/lib/realtime/ChannelBase.java | 10 +-- .../test/realtime/RealtimeChannelTest.java | 74 ++++++++++++++++--- 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index e8b61383b..7538789ad 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -82,15 +82,13 @@ public abstract class ChannelBase extends EventEmitter updateEventsEmitted = new ArrayList<>(); + channel.on(new ChannelStateListener() { + @Override + public void onChannelStateChanged(ChannelStateChange stateChange) { + updateEventsEmitted.add(stateChange); + } + }); + + /* Inject attached message as if received from the server */ + ProtocolMessage attachedMessage = new ProtocolMessage() {{ + action = Action.attached; + channel = channelName; + }}; + + ably.connection.connectionManager.onMessage(null, attachedMessage); + assertEquals(1, updateEventsEmitted.size()); + assertEquals(ChannelEvent.update, updateEventsEmitted.get(0).event); + assertFalse(updateEventsEmitted.get(0).resumed); + + } finally { + if (ably != null) + ably.close(); + Defaults.realtimeRequestTimeout = oldRealtimeTimeout; + } + } + /* * Establish connection, attach channel, simulate sending attached and detached messages * from the server, test correct behaviour @@ -1472,6 +1523,7 @@ public void channel_server_initiated_attached_detached() throws AblyException { opts.channelRetryTimeout = 1000; ably = new AblyRealtime(opts); + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); Channel channel = ably.channels.get(channelName); ChannelWaiter channelWaiter = new ChannelWaiter(channel); @@ -1490,12 +1542,12 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { }); /* Inject attached message as if received from the server */ - ProtocolMessage attachedMessage = new ProtocolMessage() {{ - action = Action.attached; - channel = channelName; - flags |= Flag.resumed.getMask(); - }}; - ably.connection.connectionManager.onMessage(null, attachedMessage); +// ProtocolMessage attachedMessage = new ProtocolMessage() {{ +// action = Action.attached; +// channel = channelName; +// flags |= Flag.resumed.getMask(); +// }}; +// ably.connection.connectionManager.onMessage(null, attachedMessage); // /* Inject detached message as if from the server */ // ProtocolMessage detachedMessage = new ProtocolMessage() {{ From 969d1d563d765f6b4d3cf32a9add85b6f580eea8 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 26 Jan 2024 11:16:47 +0530 Subject: [PATCH 06/23] Refactored helper method to wait channel event --- .../java/io/ably/lib/test/common/Helpers.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index 8a9a7e38b..b2af77cb8 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -28,14 +28,8 @@ import io.ably.lib.debug.DebugOptions.RawProtocolListener; import io.ably.lib.http.HttpCore; import io.ably.lib.http.HttpUtils; -import io.ably.lib.realtime.Channel; +import io.ably.lib.realtime.*; import io.ably.lib.realtime.Channel.MessageListener; -import io.ably.lib.realtime.ChannelState; -import io.ably.lib.realtime.ChannelStateListener; -import io.ably.lib.realtime.CompletionListener; -import io.ably.lib.realtime.Connection; -import io.ably.lib.realtime.ConnectionState; -import io.ably.lib.realtime.ConnectionStateListener; import io.ably.lib.realtime.Presence.PresenceListener; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; @@ -586,28 +580,42 @@ public ChannelWaiter(Channel channel) { /** * Wait for a given state to be reached. - * @param state */ public synchronized ErrorInfo waitFor(ChannelState state) { Log.d(TAG, "waitFor(" + state + ")"); while(channel.state != state) - try { wait(); } catch(InterruptedException e) {} + try { wait(); } catch(InterruptedException ignored) {} Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")"); return channel.reason; } + /** + * Wait for a given ChannelEvent to be reached. + */ + public synchronized ChannelStateChange waitFor(ChannelEvent channelEvent) { + Log.d(TAG, "waitFor(" + channelEvent + ")"); + while(this.channelStateChange.event != channelEvent) + try { wait(); } catch(InterruptedException ignored) {} + Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")"); + return this.channelStateChange; + } + /** * ChannelStateListener interface */ @Override public void onChannelStateChanged(ChannelStateListener.ChannelStateChange stateChange) { - synchronized(this) { notify(); } + synchronized(this) { + this.channelStateChange = stateChange; + notify(); + } } /** * Internal */ - private Channel channel; + private final Channel channel; + private ChannelStateChange channelStateChange; } /** From f84e07adddb985cb3fab25d290609c68a33417eb Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 26 Jan 2024 11:17:06 +0530 Subject: [PATCH 07/23] Added test for server initiated detached --- .../test/realtime/RealtimeChannelTest.java | 71 +++++-------------- 1 file changed, 19 insertions(+), 52 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index be7a5b18a..56c8441a1 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -37,14 +37,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class RealtimeChannelTest extends ParameterizedTest { @@ -1477,24 +1470,19 @@ public void channel_server_initiated_attached() throws AblyException { channel.attach(); channelWaiter.waitFor(ChannelState.attached); - List updateEventsEmitted = new ArrayList<>(); - channel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - updateEventsEmitted.add(stateChange); - } - }); - /* Inject attached message as if received from the server */ ProtocolMessage attachedMessage = new ProtocolMessage() {{ action = Action.attached; channel = channelName; }}; - ably.connection.connectionManager.onMessage(null, attachedMessage); - assertEquals(1, updateEventsEmitted.size()); - assertEquals(ChannelEvent.update, updateEventsEmitted.get(0).event); - assertFalse(updateEventsEmitted.get(0).resumed); + + ChannelStateListener.ChannelStateChange channelUpdateEvent = channelWaiter.waitFor(ChannelEvent.update); + assertEquals(ChannelEvent.update, channelUpdateEvent.event); + assertEquals(ChannelState.attached, channelUpdateEvent.previous); + assertEquals(ChannelState.attached, channelUpdateEvent.current); + assertFalse(channelUpdateEvent.resumed); + assertNull(channelUpdateEvent.reason); } finally { if (ably != null) @@ -1504,13 +1492,13 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { } /* - * Establish connection, attach channel, simulate sending attached and detached messages + * Establish connection, attach channel, simulate sending detached messages * from the server, test correct behaviour * - * Tests RTL12, RTL13a + * Tests RTL13a */ @Test - public void channel_server_initiated_attached_detached() throws AblyException { + public void channel_server_initiated_detached() throws AblyException { AblyRealtime ably = null; long oldRealtimeTimeout = Defaults.realtimeRequestTimeout; final String channelName = "channel_server_initiated_attach_detach"; @@ -1531,38 +1519,17 @@ public void channel_server_initiated_attached_detached() throws AblyException { channel.attach(); channelWaiter.waitFor(ChannelState.attached); - final int[] updateEventsEmitted = {0}; - final boolean[] resumedFlag = {false}; - channel.on(ChannelEvent.update, new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - updateEventsEmitted[0]++; - resumedFlag[0] = stateChange.resumed; - } - }); - - /* Inject attached message as if received from the server */ -// ProtocolMessage attachedMessage = new ProtocolMessage() {{ -// action = Action.attached; -// channel = channelName; -// flags |= Flag.resumed.getMask(); -// }}; -// ably.connection.connectionManager.onMessage(null, attachedMessage); - -// /* Inject detached message as if from the server */ -// ProtocolMessage detachedMessage = new ProtocolMessage() {{ -// action = Action.detached; -// channel = channelName; -// }}; -// ably.connection.connectionManager.onMessage(null, detachedMessage); + /* Inject detached message as if from the server */ + ProtocolMessage detachedMessage = new ProtocolMessage() {{ + action = Action.detached; + channel = channelName; + }}; + ably.connection.connectionManager.onMessage(null, detachedMessage); /* Channel should transition to attaching, then to attached */ -// channelWaiter.waitFor(ChannelState.attaching); -// channelWaiter.waitFor(ChannelState.attached); + channelWaiter.waitFor(ChannelState.attaching); + channelWaiter.waitFor(ChannelState.attached); - /* Verify received UPDATE message on channel */ - assertEquals("Verify exactly one UPDATE event was emitted on the channel",1, updateEventsEmitted[0]); - assertTrue("Verify resumed flag set in UPDATE event", resumedFlag[0]); } finally { if (ably != null) ably.close(); From da747626c0a343d946e2a3994e4d5445c36dbb4c Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 26 Jan 2024 13:33:48 +0530 Subject: [PATCH 08/23] Fixed connect reauth failure test --- .../realtime/RealtimeConnectFailTest.java | 27 +++---------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index eff2e652d..021d4ec22 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -418,13 +418,11 @@ public void onError(ErrorInfo reason) { */ @Test public void connect_reauth_failure_state_flow_test() { - try { - AblyRest ablyRest = null; ClientOptions opts = createOptions(testVars.keys[0].keyStr); - ablyRest = new AblyRest(opts); - final TokenDetails tokenDetails = ablyRest.auth.requestToken(new TokenParams() {{ ttl = 8000L; }}, null); + AblyRest ablyRest = new AblyRest(opts); + final TokenDetails tokenDetails = ablyRest.auth.requestToken(new TokenParams() {{ ttl = 2000L; }}, null); assertNotNull("Expected token value", tokenDetails.token); final ArrayList stateHistory = new ArrayList<>(); @@ -433,31 +431,14 @@ public void connect_reauth_failure_state_flow_test() { optsForRealtime.authCallback = new TokenCallback() { @Override public Object getTokenRequest(TokenParams params) throws AblyException { - // return already expired token + // always return same token return tokenDetails; } }; optsForRealtime.tokenDetails = tokenDetails; final AblyRealtime ablyRealtime = new AblyRealtime(optsForRealtime); - ablyRealtime.connection.on(ConnectionState.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - /* To go quicker into a disconnected state we use a - * smaller value for maxIdleInterval - */ - try { - Field field = ablyRealtime.connection.connectionManager.getClass().getDeclaredField("maxIdleInterval"); - field.setAccessible(true); - field.setLong(ablyRealtime.connection.connectionManager, 5000L); - } catch (NoSuchFieldException|IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - } - }); - (new ConnectionWaiter(ablyRealtime.connection)).waitFor(ConnectionState.connected); - // TODO: improve by collecting and testing also auth attempts final List correctHistory = Arrays.asList( ConnectionState.disconnected, ConnectionState.connecting, @@ -475,7 +456,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { if (state.current == ConnectionState.disconnected) { disconnections++; if (disconnections == maxDisconnections) { - assertTrue("Verifying state change history", stateHistory.equals(correctHistory)); + assertEquals(correctHistory, stateHistory); ablyRealtime.close(); } } From 5297b404f2bcd8db3e4b13b2ffbe027a6b3b2796 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 26 Jan 2024 17:33:20 +0530 Subject: [PATCH 09/23] Added a test for valid/invalid resume channel attach --- .../test/realtime/RealtimeChannelTest.java | 124 +++++++++++++++++- 1 file changed, 122 insertions(+), 2 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 56c8441a1..12a606df1 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -1541,10 +1541,130 @@ public void channel_server_initiated_detached() throws AblyException { * Establish connection, attach channel, disconnection and failed resume * verify that subsequent attaches are performed, and give rise to update events * - * Tests RTN15c3 + * Tests RTN15c6 */ @Test - public void channel_resume_lost_continuity() throws AblyException { + public void channel_valid_resume_reattach_channels() throws AblyException { + AblyRealtime ably = null; + final String attachedChannelName = "channel_resume_lost_continuity_attached"; + final String suspendedChannelName = "channel_resume_lost_continuity_suspended"; + + try { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + ably = new AblyRealtime(opts); + + /* prepare channels */ + Channel attachedChannel = ably.channels.get(attachedChannelName); + ChannelWaiter attachedChannelWaiter = new ChannelWaiter(attachedChannel); + attachedChannel.attach(); + attachedChannelWaiter.waitFor(ChannelState.attached); + + Channel suspendedChannel = ably.channels.get(suspendedChannelName); + suspendedChannel.state = ChannelState.suspended; + ChannelWaiter suspendedChannelWaiter = new ChannelWaiter(suspendedChannel); + + final boolean[] suspendedStateReached = new boolean[2]; + final boolean[] attachingStateReached = new boolean[2]; + final boolean[] attachedStateReached = new boolean[2]; + final boolean[] resumedFlag = new boolean[]{true, true}; + attachedChannel.on(new ChannelStateListener() { + @Override + public void onChannelStateChanged(ChannelStateChange stateChange) { + switch(stateChange.current) { + case suspended: + suspendedStateReached[0] = true; + break; + case attaching: + attachingStateReached[0] = true; + break; + case attached: + attachedStateReached[0] = true; + resumedFlag[0] = stateChange.resumed; + break; + default: + break; + } + } + }); + suspendedChannel.on(new ChannelStateListener() { + @Override + public void onChannelStateChanged(ChannelStateChange stateChange) { + switch(stateChange.current) { + case attaching: + attachingStateReached[1] = true; + break; + case attached: + attachedStateReached[1] = true; + resumedFlag[1] = stateChange.resumed; + break; + default: + break; + } + } + }); + + /* disconnect, and sabotage the resume */ + String originalConnectionId = ably.connection.id; + ably.connection.key = "_____!ably___test_fake-key____"; + ably.connection.id = "ably___tes"; + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + + /* suppress automatic retries by the connection manager */ + try { + Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); + method.setAccessible(true); + method.invoke(ably.connection.connectionManager); + } catch (NoSuchMethodException|IllegalAccessException|InvocationTargetException e) { + fail("Unexpected exception in suppressing retries"); + } + + connectionWaiter.waitFor(ConnectionState.disconnected); + assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state); + + /* wait */ + try { Thread.sleep(2000L); } catch(InterruptedException e) {} + + /* wait for connection to be reestablished */ + System.out.println("channel_resume_lost_continuity: initiating reconnection (resume)"); + ably.connection.connect(); + connectionWaiter.waitFor(ConnectionState.connected); + + /* verify a new connection was assigned */ + assertNotEquals("A new connection was created", originalConnectionId, ably.connection.id); + + /* previously suspended channel should transition to attaching, then to attached */ + suspendedChannelWaiter.waitFor(ChannelState.attached); + + /* previously attached channel should remain attached */ + attachedChannelWaiter.waitFor(ChannelState.attached); + + /* + * Verify each channel undergoes relevant events: + * - previously attached channel does attaching, attached, without visiting suspended; + * - previously suspended channel does attaching, attached + */ + assertEquals("Verify channel was not suspended", suspendedStateReached[0], false); + assertEquals("Verify channel was attaching", attachingStateReached[0], true); + assertEquals("Verify channel was attached", attachedStateReached[0], true); + assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[0]); + + assertEquals("Verify channel was attaching", attachingStateReached[1], true); + assertEquals("Verify channel was attached", attachedStateReached[1], true); + assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[1]); + } finally { + if (ably != null) + ably.close(); + } + } + + /* + * Establish connection, attach channel, disconnection and failed resume + * verify that subsequent attaches are performed, and give rise to update events + * + * Tests RTN15c7 + */ + @Test + public void channel_invalid_resume_reattach_channels() throws AblyException { AblyRealtime ably = null; final String attachedChannelName = "channel_resume_lost_continuity_attached"; final String suspendedChannelName = "channel_resume_lost_continuity_suspended"; From e9790c02b6fd4e5143d1306b17ff70e15914ef7a Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 26 Jan 2024 17:34:52 +0530 Subject: [PATCH 10/23] Simplified channel attach/detach assertions --- .../test/realtime/RealtimeChannelTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 12a606df1..58a862a3b 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -1643,13 +1643,13 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { * - previously attached channel does attaching, attached, without visiting suspended; * - previously suspended channel does attaching, attached */ - assertEquals("Verify channel was not suspended", suspendedStateReached[0], false); - assertEquals("Verify channel was attaching", attachingStateReached[0], true); - assertEquals("Verify channel was attached", attachedStateReached[0], true); + assertFalse("Verify channel was not suspended", suspendedStateReached[0]); + assertTrue("Verify channel was attaching", attachingStateReached[0]); + assertTrue("Verify channel was attached", attachedStateReached[0]); assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[0]); - assertEquals("Verify channel was attaching", attachingStateReached[1], true); - assertEquals("Verify channel was attached", attachedStateReached[1], true); + assertTrue("Verify channel was attaching", attachingStateReached[1]); + assertTrue("Verify channel was attached", attachedStateReached[1]); assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[1]); } finally { if (ably != null) @@ -1763,13 +1763,13 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { * - previously attached channel does attaching, attached, without visiting suspended; * - previously suspended channel does attaching, attached */ - assertEquals("Verify channel was not suspended", suspendedStateReached[0], false); - assertEquals("Verify channel was attaching", attachingStateReached[0], true); - assertEquals("Verify channel was attached", attachedStateReached[0], true); + assertFalse("Verify channel was not suspended", suspendedStateReached[0]); + assertTrue("Verify channel was attaching", attachingStateReached[0]); + assertTrue("Verify channel was attached", attachedStateReached[0]); assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[0]); - assertEquals("Verify channel was attaching", attachingStateReached[1], true); - assertEquals("Verify channel was attached", attachedStateReached[1], true); + assertTrue("Verify channel was attaching", attachingStateReached[1]); + assertTrue("Verify channel was attached", attachedStateReached[1]); assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[1]); } finally { if (ably != null) From f7c5437bbcc581f53df4546c2c7ce044ae28d9ba Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 29 Jan 2024 18:41:52 +0530 Subject: [PATCH 11/23] Added channelStateChange specific helpers with recorders --- .../java/io/ably/lib/test/common/Helpers.java | 64 +++++++++++++++---- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index b2af77cb8..729c80c2c 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -18,6 +18,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.google.gson.Gson; import com.google.gson.JsonArray; @@ -551,14 +552,14 @@ public ConnectionManagerWaiter(ConnectionManager connectionManager) { */ public synchronized ErrorInfo waitFor(ConnectionState state) { while(connectionManager.getConnectionState().state != state) - try { wait(INTERVAL_POLLING); } catch(InterruptedException e) {} + try { wait(INTERVAL_POLLING); } catch(InterruptedException ignored) {} return connectionManager.getConnectionState().defaultErrorInfo; } /** * Internal */ - private ConnectionManager connectionManager; + private final ConnectionManager connectionManager; } /** @@ -571,7 +572,6 @@ public static class ChannelWaiter implements ChannelStateListener { /** * Public API - * @param channel */ public ChannelWaiter(Channel channel) { this.channel = channel; @@ -581,11 +581,13 @@ public ChannelWaiter(Channel channel) { /** * Wait for a given state to be reached. */ - public synchronized ErrorInfo waitFor(ChannelState state) { - Log.d(TAG, "waitFor(" + state + ")"); - while(channel.state != state) - try { wait(); } catch(InterruptedException ignored) {} - Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")"); + public synchronized ErrorInfo waitFor(ChannelState ... states) { + for (ChannelState state : states) { + Log.d(TAG, "waitFor(" + state + ")"); + while(channel.state != state) + try { wait(); } catch(InterruptedException ignored) {} + Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")"); + } return channel.reason; } @@ -594,28 +596,64 @@ public synchronized ErrorInfo waitFor(ChannelState state) { */ public synchronized ChannelStateChange waitFor(ChannelEvent channelEvent) { Log.d(TAG, "waitFor(" + channelEvent + ")"); - while(this.channelStateChange.event != channelEvent) + ChannelStateChange lastStateChange = getLastStateChange(); + while(lastStateChange.event != channelEvent) try { wait(); } catch(InterruptedException ignored) {} Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")"); - return this.channelStateChange; + return lastStateChange; } /** * ChannelStateListener interface */ @Override - public void onChannelStateChanged(ChannelStateListener.ChannelStateChange stateChange) { + public void onChannelStateChanged(ChannelStateChange stateChange) { synchronized(this) { - this.channelStateChange = stateChange; + recordedStates.add(stateChange); notify(); } } + private final List recordedStates = Collections.synchronizedList(new ArrayList<>()); + + public List getRecordedStates() { + return recordedStates.stream().map(stateChange -> stateChange.current).collect(Collectors.toList()); + } + + public boolean hasFinalStates(ChannelState ... states) { + List rstates = getRecordedStates(); + List vettedList = rstates.subList(rstates.size() - states.length, rstates.size()); + return hasStates(vettedList, states); + } + + public boolean hasStates(ChannelState ... states) { + return hasStates(getRecordedStates(), states); + } + + private static boolean hasStates(List stateList, ChannelState ... states) { + boolean foundStates = false; + int statesCounter = 0; + for (ChannelState recordedState : stateList) { + if (states[statesCounter] != recordedState) { + statesCounter = 0; + } + if (states[statesCounter] == recordedState) { + statesCounter++; + } + if (statesCounter == states.length) { + foundStates = true; + } + } + return foundStates; + } + + public ChannelStateChange getLastStateChange() { + return recordedStates.get(recordedStates.size()-1); + } /** * Internal */ private final Channel channel; - private ChannelStateChange channelStateChange; } /** From a72e99052685ca303e796fadb64bffacb66f1970 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 29 Jan 2024 18:42:21 +0530 Subject: [PATCH 12/23] Fixed tests for re-attaching channels on connection resume success/failure --- .../test/realtime/RealtimeChannelTest.java | 204 ++++++------------ 1 file changed, 67 insertions(+), 137 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 58a862a3b..8341acd54 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -1546,69 +1546,32 @@ public void channel_server_initiated_detached() throws AblyException { @Test public void channel_valid_resume_reattach_channels() throws AblyException { AblyRealtime ably = null; - final String attachedChannelName = "channel_resume_lost_continuity_attached"; - final String suspendedChannelName = "channel_resume_lost_continuity_suspended"; try { ClientOptions opts = createOptions(testVars.keys[0].keyStr); ably = new AblyRealtime(opts); + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + ably.connect(); + connectionWaiter.waitFor(ConnectionState.connected); + String originalConnectionId = ably.connection.id; /* prepare channels */ - Channel attachedChannel = ably.channels.get(attachedChannelName); + Channel attachedChannel = ably.channels.get("attached_channel"); ChannelWaiter attachedChannelWaiter = new ChannelWaiter(attachedChannel); attachedChannel.attach(); attachedChannelWaiter.waitFor(ChannelState.attached); + attachedChannel.publish("chat", "message"); - Channel suspendedChannel = ably.channels.get(suspendedChannelName); - suspendedChannel.state = ChannelState.suspended; + Channel suspendedChannel = ably.channels.get("suspended_channel"); ChannelWaiter suspendedChannelWaiter = new ChannelWaiter(suspendedChannel); + suspendedChannel.attach(); + suspendedChannelWaiter.waitFor(ChannelState.attached); + suspendedChannel.setSuspended(null, true); + suspendedChannelWaiter.waitFor(ChannelState.suspended); - final boolean[] suspendedStateReached = new boolean[2]; - final boolean[] attachingStateReached = new boolean[2]; - final boolean[] attachedStateReached = new boolean[2]; - final boolean[] resumedFlag = new boolean[]{true, true}; - attachedChannel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - switch(stateChange.current) { - case suspended: - suspendedStateReached[0] = true; - break; - case attaching: - attachingStateReached[0] = true; - break; - case attached: - attachedStateReached[0] = true; - resumedFlag[0] = stateChange.resumed; - break; - default: - break; - } - } - }); - suspendedChannel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - switch(stateChange.current) { - case attaching: - attachingStateReached[1] = true; - break; - case attached: - attachedStateReached[1] = true; - resumedFlag[1] = stateChange.resumed; - break; - default: - break; - } - } - }); + assertEquals(ably.connection.connectionManager.msgSerial, 1); /* disconnect, and sabotage the resume */ - String originalConnectionId = ably.connection.id; - ably.connection.key = "_____!ably___test_fake-key____"; - ably.connection.id = "ably___tes"; - ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); - /* suppress automatic retries by the connection manager */ try { Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); @@ -1621,36 +1584,35 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state); - /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} - /* wait for connection to be reestablished */ System.out.println("channel_resume_lost_continuity: initiating reconnection (resume)"); ably.connection.connect(); - connectionWaiter.waitFor(ConnectionState.connected); - /* verify a new connection was assigned */ - assertNotEquals("A new connection was created", originalConnectionId, ably.connection.id); + ErrorInfo resumeError = connectionWaiter.waitFor(ConnectionState.connected); + assertNull(resumeError); + assertNull(ably.connection.connectionManager.getStateErrorInfo()); + assertEquals("Same connection is used", originalConnectionId, ably.connection.id); + assertEquals(ably.connection.connectionManager.msgSerial, 1); - /* previously suspended channel should transition to attaching, then to attached */ + attachedChannelWaiter.waitFor(ChannelState.attaching, ChannelState.attached); suspendedChannelWaiter.waitFor(ChannelState.attached); - /* previously attached channel should remain attached */ - attachedChannelWaiter.waitFor(ChannelState.attached); + assertFalse("Verify channel was not suspended", + attachedChannelWaiter.hasStates(ChannelState.suspended)); + assertTrue("Verify channel was attaching and attached", + attachedChannelWaiter.hasFinalStates(ChannelState.attaching, ChannelState.attached)); + + ChannelStateListener.ChannelStateChange stateChange = attachedChannelWaiter.getLastStateChange(); + assertEquals(ChannelState.attached, stateChange.current); + assertEquals(ChannelState.attaching, stateChange.previous); + + assertTrue("Verify channel was attaching", + suspendedChannelWaiter.hasFinalStates(ChannelState.attaching, ChannelState.attached)); + + stateChange = suspendedChannelWaiter.getLastStateChange(); + assertEquals(ChannelState.attached, stateChange.current); + assertEquals(ChannelState.attaching, stateChange.previous); - /* - * Verify each channel undergoes relevant events: - * - previously attached channel does attaching, attached, without visiting suspended; - * - previously suspended channel does attaching, attached - */ - assertFalse("Verify channel was not suspended", suspendedStateReached[0]); - assertTrue("Verify channel was attaching", attachingStateReached[0]); - assertTrue("Verify channel was attached", attachedStateReached[0]); - assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[0]); - - assertTrue("Verify channel was attaching", attachingStateReached[1]); - assertTrue("Verify channel was attached", attachedStateReached[1]); - assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[1]); } finally { if (ably != null) ably.close(); @@ -1672,62 +1634,26 @@ public void channel_invalid_resume_reattach_channels() throws AblyException { try { ClientOptions opts = createOptions(testVars.keys[0].keyStr); ably = new AblyRealtime(opts); + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + ably.connect(); + connectionWaiter.waitFor(ConnectionState.connected); + String originalConnectionId = ably.connection.id; /* prepare channels */ Channel attachedChannel = ably.channels.get(attachedChannelName); ChannelWaiter attachedChannelWaiter = new ChannelWaiter(attachedChannel); attachedChannel.attach(); attachedChannelWaiter.waitFor(ChannelState.attached); + attachedChannel.publish("chat", "message"); Channel suspendedChannel = ably.channels.get(suspendedChannelName); - suspendedChannel.state = ChannelState.suspended; ChannelWaiter suspendedChannelWaiter = new ChannelWaiter(suspendedChannel); + suspendedChannel.attach(); + suspendedChannelWaiter.waitFor(ChannelState.attached); + suspendedChannel.setSuspended(null, true); + suspendedChannelWaiter.waitFor(ChannelState.suspended); - final boolean[] suspendedStateReached = new boolean[2]; - final boolean[] attachingStateReached = new boolean[2]; - final boolean[] attachedStateReached = new boolean[2]; - final boolean[] resumedFlag = new boolean[]{true, true}; - attachedChannel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - switch(stateChange.current) { - case suspended: - suspendedStateReached[0] = true; - break; - case attaching: - attachingStateReached[0] = true; - break; - case attached: - attachedStateReached[0] = true; - resumedFlag[0] = stateChange.resumed; - break; - default: - break; - } - } - }); - suspendedChannel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - switch(stateChange.current) { - case attaching: - attachingStateReached[1] = true; - break; - case attached: - attachedStateReached[1] = true; - resumedFlag[1] = stateChange.resumed; - break; - default: - break; - } - } - }); - - /* disconnect, and sabotage the resume */ - String originalConnectionId = ably.connection.id; - ably.connection.key = "_____!ably___test_fake-key____"; - ably.connection.id = "ably___tes"; - ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + assertEquals(ably.connection.connectionManager.msgSerial, 1); /* suppress automatic retries by the connection manager */ try { @@ -1741,36 +1667,40 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state); - /* wait */ - try { Thread.sleep(2000L); } catch(InterruptedException e) {} + /* disconnect, and sabotage the resume */ + ably.connection.key = "_____!ably___test_fake-key____"; /* wait for connection to be reestablished */ System.out.println("channel_resume_lost_continuity: initiating reconnection (resume)"); ably.connection.connect(); - connectionWaiter.waitFor(ConnectionState.connected); - /* verify a new connection was assigned */ + + ErrorInfo resumeError = connectionWaiter.waitFor(ConnectionState.connected); + assertNotNull(resumeError); + assertTrue(resumeError.message.contains("Invalid connection key")); + assertSame(resumeError, ably.connection.connectionManager.getStateErrorInfo()); assertNotEquals("A new connection was created", originalConnectionId, ably.connection.id); + assertEquals(ably.connection.connectionManager.msgSerial, 0); - /* previously suspended channel should transition to attaching, then to attached */ + attachedChannelWaiter.waitFor(ChannelState.attaching, ChannelState.attached); suspendedChannelWaiter.waitFor(ChannelState.attached); - /* previously attached channel should remain attached */ - attachedChannelWaiter.waitFor(ChannelState.attached); + assertFalse("Verify channel was not suspended", + attachedChannelWaiter.hasStates(ChannelState.suspended)); + assertTrue("Verify channel was attaching and attached", + attachedChannelWaiter.hasFinalStates(ChannelState.attaching, ChannelState.attached)); + + ChannelStateListener.ChannelStateChange stateChange = attachedChannelWaiter.getLastStateChange(); + assertEquals(ChannelState.attached, stateChange.current); + assertEquals(ChannelState.attaching, stateChange.previous); + + assertTrue("Verify channel was attaching", + suspendedChannelWaiter.hasFinalStates(ChannelState.attaching, ChannelState.attached)); + + stateChange = suspendedChannelWaiter.getLastStateChange(); + assertEquals(ChannelState.attached, stateChange.current); + assertEquals(ChannelState.attaching, stateChange.previous); - /* - * Verify each channel undergoes relevant events: - * - previously attached channel does attaching, attached, without visiting suspended; - * - previously suspended channel does attaching, attached - */ - assertFalse("Verify channel was not suspended", suspendedStateReached[0]); - assertTrue("Verify channel was attaching", attachingStateReached[0]); - assertTrue("Verify channel was attached", attachedStateReached[0]); - assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[0]); - - assertTrue("Verify channel was attaching", attachingStateReached[1]); - assertTrue("Verify channel was attached", attachedStateReached[1]); - assertFalse("Verify resumed flag set false in ATTACHED event", resumedFlag[1]); } finally { if (ably != null) ably.close(); From 9a6a7f486f070d30cb0c8ee1d73a97339703b423 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 30 Jan 2024 00:17:06 +0530 Subject: [PATCH 13/23] Refactored realtime delta decoder helpers --- lib/src/test/java/io/ably/lib/test/common/Helpers.java | 4 ++-- .../io/ably/lib/test/realtime/RealtimeDeltaDecoderTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index 8a9a7e38b..61db9db86 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -263,7 +263,7 @@ public MessageWaiter(Channel channel, String event) { */ public synchronized void waitFor(int count) { while(receivedMessages.size() < count) - try { wait(); } catch(InterruptedException e) {} + try { wait(); } catch(InterruptedException ignored) {} } /** @@ -274,7 +274,7 @@ public synchronized void waitFor(int count, long time) { long targetTime = System.currentTimeMillis() + time; long remaining = time; while(receivedMessages.size() < count && remaining > 0) { - try { wait(remaining); } catch(InterruptedException e) {} + try { wait(remaining); } catch(InterruptedException ignored) {} remaining = targetTime - System.currentTimeMillis(); } } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeDeltaDecoderTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeDeltaDecoderTest.java index d0fe03531..78d2ea026 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeDeltaDecoderTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeDeltaDecoderTest.java @@ -62,7 +62,7 @@ public void simple_delta_codec() { Message message = messageWaiter.receivedMessages.get(i); int messageIndex = Integer.parseInt(message.name); assertEquals("Verify message order", i, messageIndex); - assertEquals("Verify message data", true, testData[messageIndex].equals(message.data)); + assertEquals("Verify message data", testData[messageIndex], message.data); } } catch(Exception e) { fail(testName + ": Unexpected exception " + e.getMessage()); From d13051f35733896e78dcff98fdcbe1bb538d286f Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 30 Jan 2024 16:59:11 +0530 Subject: [PATCH 14/23] Fixed deltadecode failure recovery test --- lib/src/main/java/io/ably/lib/realtime/ChannelBase.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 7538789ad..616ac98f4 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -251,10 +251,11 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li attachMessage.setFlags(options.getModeFlags()); } } - if(this.decodeFailureRecoveryInProgress) { - Log.v(TAG, "attach(); message decode recovery in progress."); - } attachMessage.channelSerial = properties.channelSerial; // RTL4c1 + if(this.decodeFailureRecoveryInProgress) { // RTL18c + Log.v(TAG, "attach(); message decode recovery in progress, setting last message channelserial"); + attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial; + } try { if (listener != null) { on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed)); @@ -838,6 +839,7 @@ private void onMessage(final ProtocolMessage protocolMessage) { } lastPayloadMessageId = lastMessage.id; + lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial; for (final Message msg : messages) { this.listeners.onMessage(msg); @@ -1340,6 +1342,7 @@ public void once(ChannelState state, ChannelStateListener listener) { */ private Set modes; private String lastPayloadMessageId; + private String lastPayloadProtocolMessageChannelSerial; private boolean decodeFailureRecoveryInProgress; private final DecodingContext decodingContext; } From 5449a9b7477e32396b9b8f527dd2bc5b47cc99be Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 01:36:13 +0530 Subject: [PATCH 15/23] Fixed test for resume publish reenter with right message size --- .../io/ably/lib/test/realtime/RealtimePresenceTest.java | 2 +- .../java/io/ably/lib/test/realtime/RealtimeResumeTest.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java index 0bd2e7b57..84591f86f 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java @@ -1717,7 +1717,7 @@ public void onPresenceMessage(PresenceMessage message) { !receivedMessageStack.get(receivedMessageStack.size()-1).data.equals("Dolor sit!")) receivedMessageStack.wait(); } - } catch(InterruptedException e) {} + } catch(InterruptedException ignored) {} /* Validate that, *- we received specific actions diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 2895dc841..7a2024a56 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -29,9 +29,11 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -1062,13 +1064,13 @@ public void onConnectionStateChanged(ConnectionStateChange state) { System.out.println("presence_resume_test: sent message with client: "+presenceMessage.clientId +" " + " action:"+presenceMessage.action); } - assertEquals("Second round of messages has incorrect size", 9, transport.getSentPresenceMessages().size()); + assertEquals("Second round of messages has incorrect size", 6, transport.getSentPresenceMessages().size()); //make sure they were sent with correct client ids final Map sentPresenceMap = new HashMap<>(); for (PresenceMessage presenceMessage: transport.getSentPresenceMessages()){ sentPresenceMap.put(presenceMessage.clientId, presenceMessage); } - for (String client : clients) { + for (String client : Arrays.stream(clients).skip(3).collect(Collectors.toList())) { assertTrue("Client id isn't there:" + client, sentPresenceMap.containsKey(client)); } } From 98e1e8f72c5dfcf90b72df5a2106b536c69a0f13 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 01:40:26 +0530 Subject: [PATCH 16/23] refactored test for resume publish re-enter --- .../ably/lib/test/realtime/RealtimeResumeTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 7a2024a56..39eea60fb 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -1006,8 +1006,8 @@ public void onConnectionStateChanged(ConnectionStateChange state) { message.action == ProtocolMessage.Action.nack); //enter next 3 clients - for (int i = 0; i < 3; i++) { - senderChannel.presence.enterClient(clients[i+3],null,presenceCompletion.add()); + for (int i = 3; i < 6; i++) { + senderChannel.presence.enterClient(clients[i],null,presenceCompletion.add()); } final String firstConnectionId = ably.connection.id; @@ -1024,8 +1024,8 @@ public void onConnectionStateChanged(ConnectionStateChange state) { assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); //enter last 3 clients while disconnected - for (int i = 0; i < 3; i++) { - senderChannel.presence.enterClient(clients[i+6],null,presenceCompletion.add()); + for (int i = 6; i < 9; i++) { + senderChannel.presence.enterClient(clients[i],null,presenceCompletion.add()); } /* Wait for the connection to go stale, then reconnect */ @@ -1070,8 +1070,10 @@ public void onConnectionStateChanged(ConnectionStateChange state) { for (PresenceMessage presenceMessage: transport.getSentPresenceMessages()){ sentPresenceMap.put(presenceMessage.clientId, presenceMessage); } - for (String client : Arrays.stream(clients).skip(3).collect(Collectors.toList())) { - assertTrue("Client id isn't there:" + client, sentPresenceMap.containsKey(client)); + + for (int i = 3; i < 9; i++) { + assertTrue("Client id isn't there:" + clients[i], sentPresenceMap.containsKey(clients[i])); + senderChannel.presence.enterClient(clients[i],null,presenceCompletion.add()); } } } From 4fded6928dadcc19a54830a1a56a73e4612bce06 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 02:07:19 +0530 Subject: [PATCH 17/23] refactored code, added a separate class for updating connectionmanager fields --- .../test/realtime/RealtimePresenceTest.java | 1 - .../lib/test/realtime/RealtimeResumeTest.java | 144 +++++++----------- 2 files changed, 58 insertions(+), 87 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java index 84591f86f..a11a8c339 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java @@ -1655,7 +1655,6 @@ public void onPresenceMessage(PresenceMessage message) { * state will have all messages sent once the channel attaches, and all listeners will be called. *

* - * @throws AblyException */ @Test public void realtime_presence_update_multiple_queued_messages() throws AblyException { diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 39eea60fb..4e1959874 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -13,6 +13,7 @@ import io.ably.lib.test.common.Helpers.MessageWaiter; import io.ably.lib.test.common.ParameterizedTest; import io.ably.lib.test.util.MockWebsocketFactory; +import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -29,11 +30,9 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -60,6 +59,8 @@ public void resume_none() { try { ClientOptions opts = createOptions(testVars.keys[0].keyStr); ably = new AblyRealtime(opts); + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + connectionWaiter.waitFor(ConnectionState.connected); /* create and attach channel */ final Channel channel = ably.channels.get(channelName); @@ -68,21 +69,12 @@ public void resume_none() { (new ChannelWaiter(channel)).waitFor(ChannelState.attached); assertEquals("Verify attached state reached", channel.state, ChannelState.attached); - /* disconnect the connection, without closing, - /* suppressing automatic retries by the connection manager */ - System.out.println("Simulating dropped transport"); - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException|IllegalAccessException| InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + new MutableConnectionManager(ably).disconnectAndSuppressRetries(); + connectionWaiter.waitFor(ConnectionState.disconnected); /* reconnect the rx connection */ ably.connection.connect(); System.out.println("Waiting for reconnection"); - ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); connectionWaiter.waitFor(ConnectionState.connected); assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state); @@ -528,14 +520,9 @@ public void resume_verify_publish() { * of the library, to simulate a dropped transport without * causing the connection itself to be disposed */ System.out.println("*** about to disconnect tx connection"); - /* suppress automatic retries by the connection manager */ - try { - Method method = ablyTx.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ablyTx.connection.connectionManager); - } catch (NoSuchMethodException|IllegalAccessException|InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + + new MutableConnectionManager(ablyTx).disconnectAndSuppressRetries(); + (new ConnectionWaiter(ablyTx.connection)).waitFor(ConnectionState.disconnected); /* wait */ try { Thread.sleep(2000L); } catch(InterruptedException ignored) {} @@ -754,15 +741,7 @@ public void resume_publish_resend_pending_messages_when_resume_is_successful() { final String connectionId = sender.connection.id; - /* suppress automatic retries by the connection manager and disconnect */ - try { - Method method = sender.connection.connectionManager.getClass().getDeclaredMethod( - "disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(sender.connection.connectionManager); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + new MutableConnectionManager(sender).disconnectAndSuppressRetries(); (new ConnectionWaiter(sender.connection)).waitFor(ConnectionState.disconnected); sender.connection.connectionManager.requestState(ConnectionState.disconnected); @@ -828,28 +807,14 @@ public void resume_publish_resend_pending_messages_when_resume_failed() throws A try(AblyRealtime ably = new AblyRealtime(options)) { final long newTtl = 1000L; final long newIdleInterval = 1000L; - /* We want this greater than newTtl + newIdleInterval */ - final long waitInDisconnectedState = 3000L; - - ably.connection.on(ConnectionEvent.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - try { - Field connectionStateField = ably.connection.connectionManager.getClass().getDeclaredField("connectionStateTtl"); - connectionStateField.setAccessible(true); - connectionStateField.setLong(ably.connection.connectionManager, newTtl); - Field maxIdleField = ably.connection.connectionManager.getClass().getDeclaredField("maxIdleInterval"); - maxIdleField.setAccessible(true); - maxIdleField.setLong(ably.connection.connectionManager, newIdleInterval); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - } - }); ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); connectionWaiter.waitFor(ConnectionState.connected); + MutableConnectionManager connectionManager = new MutableConnectionManager(ably); + connectionManager.setField("connectionStateTtl", newTtl); + connectionManager.setField("maxIdleInterval", newIdleInterval); + final Channel senderChannel = ably.channels.get(channelName); senderChannel.attach(); (new ChannelWaiter(senderChannel)).waitFor(ChannelState.attached); @@ -887,14 +852,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { final String firstConnectionId = ably.connection.id; - /* suppress automatic retries by the connection manager and disconnect */ - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + connectionManager.disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); @@ -905,6 +863,9 @@ public void onConnectionStateChanged(ConnectionStateChange state) { } //now let's unblock the ack nacks and reconnect mockWebsocketFactory.blockReceiveProcessing(message -> false); + + /* We want this greater than newTtl + newIdleInterval */ + final long waitInDisconnectedState = 3000L; /* Wait for the connection to go stale, then reconnect */ try { Thread.sleep(waitInDisconnectedState); @@ -935,6 +896,37 @@ public void onConnectionStateChanged(ConnectionStateChange state) { } } + static class MutableConnectionManager { + ConnectionManager connectionManager; + + public MutableConnectionManager(AblyRealtime ablyRealtime) { + this.connectionManager = ablyRealtime.connection.connectionManager; + } + + public void setField(String fieldName, long value) { + Field connectionStateField = null; + try { + connectionStateField = ConnectionManager.class.getDeclaredField(fieldName); + connectionStateField.setAccessible(true); + connectionStateField.setLong(connectionManager, value); + } catch (NoSuchFieldException | IllegalAccessException e) { + fail("Unexpected exception in checking connectionStateTtl"); + } + } + + /** + * Suppress automatic retries by the connection manager and disconnect + */ + public void disconnectAndSuppressRetries() { + try { + Method method = ConnectionManager.class.getDeclaredMethod("disconnectAndSuppressRetries"); + method.setAccessible(true); + method.invoke(connectionManager); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + fail("Unexpected exception in suppressing retries"); + } + } + } /** * In case of resume failure verify that presence messages are resent @@ -950,34 +942,19 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException { options.logLevel = Log.VERBOSE; options.realtimeRequestTimeout = 2000L; - /* We want this greater than newTtl + newIdleInterval */ - final long waitInDisconnectedState = 5000L; options.transportFactory = mockWebsocketFactory; try(AblyRealtime ably = new AblyRealtime(options)) { - final long newTtl = 1000L; - final long newIdleInterval = 1000L; - /* We want this greater than newTtl + newIdleInterval */ - ably.connection.on(ConnectionEvent.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - try { - Field connectionStateField = ably.connection.connectionManager.getClass(). - getDeclaredField("connectionStateTtl"); - connectionStateField.setAccessible(true); - connectionStateField.setLong(ably.connection.connectionManager, newTtl); - Field maxIdleField = ably.connection.connectionManager.getClass(). - getDeclaredField("maxIdleInterval"); - maxIdleField.setAccessible(true); - maxIdleField.setLong(ably.connection.connectionManager, newIdleInterval); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - } - }); ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); connectionWaiter.waitFor(ConnectionState.connected); + final long newTtl = 1000L; + final long newIdleInterval = 1000L; + + MutableConnectionManager connectionManager = new MutableConnectionManager(ably); + connectionManager.setField("connectionStateTtl", newTtl); + connectionManager.setField("maxIdleInterval", newIdleInterval); + final Channel senderChannel = ably.channels.get(channelName); senderChannel.attach(); (new ChannelWaiter(senderChannel)).waitFor(ChannelState.attached); @@ -1012,14 +989,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { final String firstConnectionId = ably.connection.id; - /* suppress automatic retries by the connection manager and disconnect */ - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + connectionManager.disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); @@ -1028,6 +998,8 @@ public void onConnectionStateChanged(ConnectionStateChange state) { senderChannel.presence.enterClient(clients[i],null,presenceCompletion.add()); } + /* We want this greater than newTtl + newIdleInterval */ + final long waitInDisconnectedState = 5000L; /* Wait for the connection to go stale, then reconnect */ try { Thread.sleep(waitInDisconnectedState); From c93f89d091aeaa71c27cbc3f771ea28c67add1a4 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 10:20:37 +0530 Subject: [PATCH 18/23] Fixed checkstyle issues for integration tests --- .../java/io/ably/lib/test/common/Helpers.java | 9 ++++++++- .../test/realtime/RealtimeChannelTest.java | 19 +++++++++++++++---- .../realtime/RealtimeConnectFailTest.java | 1 - .../lib/test/realtime/RealtimeResumeTest.java | 5 +---- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index 89a0a1bf7..c3862d1d2 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -29,8 +29,15 @@ import io.ably.lib.debug.DebugOptions.RawProtocolListener; import io.ably.lib.http.HttpCore; import io.ably.lib.http.HttpUtils; -import io.ably.lib.realtime.*; +import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.Channel.MessageListener; +import io.ably.lib.realtime.ChannelEvent; +import io.ably.lib.realtime.ChannelState; +import io.ably.lib.realtime.ChannelStateListener; +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.realtime.Connection; +import io.ably.lib.realtime.ConnectionState; +import io.ably.lib.realtime.ConnectionStateListener; import io.ably.lib.realtime.Presence.PresenceListener; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 8341acd54..447607571 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -25,19 +25,30 @@ import io.ably.lib.types.Message; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.util.Log; - -import io.ably.lib.util.StringUtils; import org.hamcrest.Matchers; import org.junit.Ignore; import org.junit.Test; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class RealtimeChannelTest extends ParameterizedTest { diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index 021d4ec22..02a1d07d5 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import org.junit.rules.Timeout; -import java.lang.reflect.Field; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 4e1959874..179106dad 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -4,9 +4,7 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.ChannelState; -import io.ably.lib.realtime.ConnectionEvent; import io.ably.lib.realtime.ConnectionState; -import io.ably.lib.realtime.ConnectionStateListener; import io.ably.lib.test.common.Helpers.ChannelWaiter; import io.ably.lib.test.common.Helpers.CompletionSet; import io.ably.lib.test.common.Helpers.ConnectionWaiter; @@ -899,7 +897,7 @@ public void resume_publish_resend_pending_messages_when_resume_failed() throws A static class MutableConnectionManager { ConnectionManager connectionManager; - public MutableConnectionManager(AblyRealtime ablyRealtime) { + MutableConnectionManager(AblyRealtime ablyRealtime) { this.connectionManager = ablyRealtime.connection.connectionManager; } @@ -1045,7 +1043,6 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException { for (int i = 3; i < 9; i++) { assertTrue("Client id isn't there:" + clients[i], sentPresenceMap.containsKey(clients[i])); - senderChannel.presence.enterClient(clients[i],null,presenceCompletion.add()); } } } From e93f0ca9a6b0f7b8bcd3219a08df21214a5767e2 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 16:05:23 +0530 Subject: [PATCH 19/23] Moved mutableConnection manager under helpers --- .../java/io/ably/lib/test/common/Helpers.java | 37 +++++++++++++++ .../test/realtime/ConnectionManagerTest.java | 18 +------ .../test/realtime/RealtimeChannelTest.java | 21 +-------- .../lib/test/realtime/RealtimeResumeTest.java | 47 +++---------------- 4 files changed, 47 insertions(+), 76 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index c3862d1d2..dd3adf10e 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -1,5 +1,8 @@ package io.ably.lib.test.common; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; @@ -29,6 +32,7 @@ import io.ably.lib.debug.DebugOptions.RawProtocolListener; import io.ably.lib.http.HttpCore; import io.ably.lib.http.HttpUtils; +import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.Channel.MessageListener; import io.ably.lib.realtime.ChannelEvent; @@ -62,6 +66,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class Helpers { @@ -403,6 +408,38 @@ public PresenceMessage contains(String clientId, String connectionId, PresenceMe } } + public static class MutableConnectionManager { + ConnectionManager connectionManager; + + public MutableConnectionManager(AblyRealtime ablyRealtime) { + this.connectionManager = ablyRealtime.connection.connectionManager; + } + + public void setField(String fieldName, long value) { + Field connectionStateField = null; + try { + connectionStateField = ConnectionManager.class.getDeclaredField(fieldName); + connectionStateField.setAccessible(true); + connectionStateField.setLong(connectionManager, value); + } catch (NoSuchFieldException | IllegalAccessException e) { + fail("Unexpected exception in checking connectionStateTtl"); + } + } + + /** + * Suppress automatic retries by the connection manager and disconnect + */ + public void disconnectAndSuppressRetries() { + try { + Method method = ConnectionManager.class.getDeclaredMethod("disconnectAndSuppressRetries"); + method.setAccessible(true); + method.invoke(connectionManager); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + fail("Unexpected exception in suppressing retries"); + } + } + } + /** * A class that listens for state change events on a connection. * @author paddy diff --git a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java index a164120a0..97910f0e8 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java @@ -618,14 +618,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { connectionWaiter.waitFor(ConnectionState.connected); final String firstConnectionId = ably.connection.id; - /* suppress automatic retries by the connection manager and disconnect */ - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); @@ -726,14 +719,7 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { attachedChannel.attach(); attachedChannelWaiter.waitFor(ChannelState.attached); - /* suppress automatic retries by the connection manager and disconnect */ - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } + new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index 447607571..ae3c08438 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -1582,16 +1582,7 @@ public void channel_valid_resume_reattach_channels() throws AblyException { assertEquals(ably.connection.connectionManager.msgSerial, 1); - /* disconnect, and sabotage the resume */ - /* suppress automatic retries by the connection manager */ - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException|IllegalAccessException|InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } - + new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state); @@ -1666,15 +1657,7 @@ public void channel_invalid_resume_reattach_channels() throws AblyException { assertEquals(ably.connection.connectionManager.msgSerial, 1); - /* suppress automatic retries by the connection manager */ - try { - Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(ably.connection.connectionManager); - } catch (NoSuchMethodException|IllegalAccessException|InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } - + new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 179106dad..3a882cf18 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -5,13 +5,13 @@ import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.ChannelState; import io.ably.lib.realtime.ConnectionState; +import io.ably.lib.test.common.Helpers; import io.ably.lib.test.common.Helpers.ChannelWaiter; import io.ably.lib.test.common.Helpers.CompletionSet; import io.ably.lib.test.common.Helpers.ConnectionWaiter; import io.ably.lib.test.common.Helpers.MessageWaiter; import io.ably.lib.test.common.ParameterizedTest; import io.ably.lib.test.util.MockWebsocketFactory; -import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -25,9 +25,6 @@ import org.junit.Test; import org.junit.rules.Timeout; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,7 +64,7 @@ public void resume_none() { (new ChannelWaiter(channel)).waitFor(ChannelState.attached); assertEquals("Verify attached state reached", channel.state, ChannelState.attached); - new MutableConnectionManager(ably).disconnectAndSuppressRetries(); + new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); /* reconnect the rx connection */ @@ -519,7 +516,7 @@ public void resume_verify_publish() { * causing the connection itself to be disposed */ System.out.println("*** about to disconnect tx connection"); - new MutableConnectionManager(ablyTx).disconnectAndSuppressRetries(); + new Helpers.MutableConnectionManager(ablyTx).disconnectAndSuppressRetries(); (new ConnectionWaiter(ablyTx.connection)).waitFor(ConnectionState.disconnected); /* wait */ @@ -739,7 +736,7 @@ public void resume_publish_resend_pending_messages_when_resume_is_successful() { final String connectionId = sender.connection.id; - new MutableConnectionManager(sender).disconnectAndSuppressRetries(); + new Helpers.MutableConnectionManager(sender).disconnectAndSuppressRetries(); (new ConnectionWaiter(sender.connection)).waitFor(ConnectionState.disconnected); sender.connection.connectionManager.requestState(ConnectionState.disconnected); @@ -809,7 +806,7 @@ public void resume_publish_resend_pending_messages_when_resume_failed() throws A ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); connectionWaiter.waitFor(ConnectionState.connected); - MutableConnectionManager connectionManager = new MutableConnectionManager(ably); + Helpers.MutableConnectionManager connectionManager = new Helpers.MutableConnectionManager(ably); connectionManager.setField("connectionStateTtl", newTtl); connectionManager.setField("maxIdleInterval", newIdleInterval); @@ -894,38 +891,6 @@ public void resume_publish_resend_pending_messages_when_resume_failed() throws A } } - static class MutableConnectionManager { - ConnectionManager connectionManager; - - MutableConnectionManager(AblyRealtime ablyRealtime) { - this.connectionManager = ablyRealtime.connection.connectionManager; - } - - public void setField(String fieldName, long value) { - Field connectionStateField = null; - try { - connectionStateField = ConnectionManager.class.getDeclaredField(fieldName); - connectionStateField.setAccessible(true); - connectionStateField.setLong(connectionManager, value); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - } - - /** - * Suppress automatic retries by the connection manager and disconnect - */ - public void disconnectAndSuppressRetries() { - try { - Method method = ConnectionManager.class.getDeclaredMethod("disconnectAndSuppressRetries"); - method.setAccessible(true); - method.invoke(connectionManager); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - fail("Unexpected exception in suppressing retries"); - } - } - } - /** * In case of resume failure verify that presence messages are resent * */ @@ -949,7 +914,7 @@ public void resume_publish_reenter_when_resume_failed() throws AblyException { final long newTtl = 1000L; final long newIdleInterval = 1000L; - MutableConnectionManager connectionManager = new MutableConnectionManager(ably); + Helpers.MutableConnectionManager connectionManager = new Helpers.MutableConnectionManager(ably); connectionManager.setField("connectionStateTtl", newTtl); connectionManager.setField("maxIdleInterval", newIdleInterval); From 2436f676b4d094483a5bd11beff30914060544ec Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 17:45:19 +0530 Subject: [PATCH 20/23] Updating complex tests where reflection is used --- .../java/io/ably/lib/test/common/Helpers.java | 16 +++++++-- .../test/realtime/ConnectionManagerTest.java | 36 ++++++++----------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index dd3adf10e..e4d24fd37 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -416,16 +416,26 @@ public MutableConnectionManager(AblyRealtime ablyRealtime) { } public void setField(String fieldName, long value) { - Field connectionStateField = null; try { - connectionStateField = ConnectionManager.class.getDeclaredField(fieldName); + Field connectionStateField = ConnectionManager.class.getDeclaredField(fieldName); connectionStateField.setAccessible(true); connectionStateField.setLong(connectionManager, value); } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); + fail("Error updating " + fieldName + " error occurred" + e); } } + public long getField(String fieldName) { + try { + Field connectionStateField = ConnectionManager.class.getDeclaredField(fieldName); + connectionStateField.setAccessible(true); + return connectionStateField.getLong(connectionManager); + } catch (NoSuchFieldException | IllegalAccessException e) { + fail("Error accessing " + fieldName + " error occurred" + e); + } + return 0; + } + /** * Suppress automatic retries by the connection manager and disconnect */ diff --git a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java index 97910f0e8..c1183b69d 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java @@ -529,29 +529,23 @@ public void run() { @Test public void connection_details_has_ttl() throws AblyException { ClientOptions opts = createOptions(testVars.keys[0].keyStr); + opts.autoConnect = false; try (AblyRealtime ably = new AblyRealtime(opts)) { - final boolean[] callbackWasRun = new boolean[1]; - ably.connection.on(ConnectionEvent.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - synchronized(callbackWasRun) { - callbackWasRun[0] = true; - try { - Field field = ably.connection.connectionManager.getClass().getDeclaredField("connectionStateTtl"); - field.setAccessible(true); - assertEquals("Verify connectionStateTtl has the default value", field.get(ably.connection.connectionManager), 120000L); - } catch (NoSuchFieldException|IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - callbackWasRun.notify(); - } - } - }); + Helpers.MutableConnectionManager connectionManager = new Helpers.MutableConnectionManager(ably); - synchronized (callbackWasRun) { - try { callbackWasRun.wait(); } catch(InterruptedException ie) {} - assertTrue("Connected callback was not run", callbackWasRun[0]); - } + // connStateTtl set to default value + long connStateTtl = connectionManager.getField("connectionStateTtl"); + assertEquals(Defaults.connectionStateTtl, connStateTtl); + + connectionManager.setField("connectionStateTtl", 8000L); + long oldConnStateTtl = connectionManager.getField("connectionStateTtl"); + assertEquals(8000L, oldConnStateTtl); + + ably.connect(); + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + long newConnStateTtl = connectionManager.getField("connectionStateTtl"); + // connStateTtl set by server to 120s + assertEquals(120000L, newConnStateTtl); } } From e791f8f273a36070130133d8a29a6b3ee388829e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 31 Jan 2024 18:37:15 +0530 Subject: [PATCH 21/23] refactored tests with easier test helper implementation --- .../java/io/ably/lib/test/common/Helpers.java | 4 +- .../test/realtime/ConnectionManagerTest.java | 106 ++++++------------ .../test/realtime/RealtimeChannelTest.java | 2 - 3 files changed, 34 insertions(+), 78 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index e4d24fd37..90473b4df 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -421,7 +421,7 @@ public void setField(String fieldName, long value) { connectionStateField.setAccessible(true); connectionStateField.setLong(connectionManager, value); } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Error updating " + fieldName + " error occurred" + e); + fail("Failed updating " + fieldName + " with error " + e); } } @@ -431,7 +431,7 @@ public long getField(String fieldName) { connectionStateField.setAccessible(true); return connectionStateField.getLong(connectionManager); } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Error accessing " + fieldName + " error occurred" + e); + fail("Failed accessing " + fieldName + " with error " + e); } return 0; } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java index c1183b69d..1a9cf7325 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java @@ -35,10 +35,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -46,6 +43,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -557,23 +555,17 @@ public void connection_is_closed_after_max_idle_interval() throws AblyException ClientOptions opts = createOptions(testVars.keys[0].keyStr); opts.realtimeRequestTimeout = 2000; try(AblyRealtime ably = new AblyRealtime(opts)) { - final long newIdleInterval = 500L; - - // When we connect, we set the max idle interval to be very small - ably.connection.on(ConnectionEvent.connected, state -> { - try { - Field maxIdleField = ably.connection.connectionManager.getClass().getDeclaredField("maxIdleInterval"); - maxIdleField.setAccessible(true); - maxIdleField.setLong(ably.connection.connectionManager, newIdleInterval); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - }); // The original max idle interval we receive from the server is 15s. // We should wait for this, plus a tiny bit extra (as we set the new idle interval to be very low // after connecting) to make sure that the connection is disconnected ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + connectionWaiter.waitFor(ConnectionState.connected); + + // When we connect, we set the max idle interval to be very small + Helpers.MutableConnectionManager connectionManager = new Helpers.MutableConnectionManager(ably); + connectionManager.setField("maxIdleInterval", 500L); + assertTrue(connectionWaiter.waitFor(ConnectionState.disconnected, 1, 25000)); } } @@ -587,39 +579,25 @@ public void connection_has_new_id_when_reconnecting_after_statettl_plus_idleinte ClientOptions opts = createOptions(testVars.keys[0].keyStr); opts.realtimeRequestTimeout = 2000L; try(AblyRealtime ably = new AblyRealtime(opts)) { - final long newTtl = 1000L; - final long newIdleInterval = 1000L; /* We want this greater than newTtl + newIdleInterval */ final long waitInDisconnectedState = 3000L; - ably.connection.on(ConnectionEvent.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - try { - Field connectionStateField = ably.connection.connectionManager.getClass().getDeclaredField("connectionStateTtl"); - connectionStateField.setAccessible(true); - connectionStateField.setLong(ably.connection.connectionManager, newTtl); - Field maxIdleField = ably.connection.connectionManager.getClass().getDeclaredField("maxIdleInterval"); - maxIdleField.setAccessible(true); - maxIdleField.setLong(ably.connection.connectionManager, newIdleInterval); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - } - }); - ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); connectionWaiter.waitFor(ConnectionState.connected); final String firstConnectionId = ably.connection.id; - new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); + Helpers.MutableConnectionManager connectionManager = new Helpers.MutableConnectionManager(ably); + connectionManager.setField("connectionStateTtl", 1000L); + connectionManager.setField("maxIdleInterval", 1000L); + + connectionManager.disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); /* Wait for the connection to go stale, then reconnect */ try { Thread.sleep(waitInDisconnectedState); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } ably.connection.connect(); connectionWaiter.waitFor(ConnectionState.connected); @@ -662,65 +640,42 @@ public void connection_has_same_id_when_reconnecting_before_statettl_plus_idlein public void channels_are_reattached_after_reconnecting_when_statettl_plus_idleinterval_has_passed() throws AblyException { ClientOptions opts = createOptions(testVars.keys[0].keyStr); try(AblyRealtime ably = new AblyRealtime(opts)) { - final long newTtl = 1000L; - final long newIdleInterval = 1000L; /* We want this greater than newTtl + newIdleInterval */ final long waitInDisconnectedState = 3000L; - final List attachedChannelHistory = new ArrayList(); - final List expectedAttachedChannelHistory = Arrays.asList("attaching", "attached", "attaching", "attached"); - final List suspendedChannelHistory = new ArrayList(); - final List expectedSuspendedChannelHistory = Arrays.asList("attaching", "attached"); - ably.connection.on(ConnectionEvent.connected, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange state) { - try { - Field connectionStateField = ably.connection.connectionManager.getClass().getDeclaredField("connectionStateTtl"); - connectionStateField.setAccessible(true); - connectionStateField.setLong(ably.connection.connectionManager, newTtl); - Field maxIdleField = ably.connection.connectionManager.getClass().getDeclaredField("maxIdleInterval"); - maxIdleField.setAccessible(true); - maxIdleField.setLong(ably.connection.connectionManager, newIdleInterval); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail("Unexpected exception in checking connectionStateTtl"); - } - } - }); + final ChannelState[] expectedAttachedChannelHistory = new ChannelState[]{ + ChannelState.attaching, ChannelState.attached, ChannelState.attaching, ChannelState.attached}; - ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); + final ChannelState[] expectedSuspendedChannelHistory = new ChannelState[]{ + ChannelState.attaching, ChannelState.attached}; + + ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection); connectionWaiter.waitFor(ConnectionState.connected); final String firstConnectionId = ably.connection.id; + Helpers.MutableConnectionManager connectionManager = new Helpers.MutableConnectionManager(ably); + connectionManager.setField("connectionStateTtl", 1000L); + connectionManager.setField("maxIdleInterval", 1000L); + /* Prepare channels */ final Channel attachedChannel = ably.channels.get("test-reattach-after-ttl" + testParams.name); ChannelWaiter attachedChannelWaiter = new Helpers.ChannelWaiter(attachedChannel); - attachedChannel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - attachedChannelHistory.add(stateChange.current.name()); - } - }); + final Channel suspendedChannel = ably.channels.get("test-reattach-suspended-after-ttl" + testParams.name); suspendedChannel.state = ChannelState.suspended; - suspendedChannel.on(new ChannelStateListener() { - @Override - public void onChannelStateChanged(ChannelStateChange stateChange) { - suspendedChannelHistory.add(stateChange.current.name()); - } - }); ChannelWaiter suspendedChannelWaiter = new Helpers.ChannelWaiter(suspendedChannel); /* attach first channel and wait for it to be attached */ attachedChannel.attach(); attachedChannelWaiter.waitFor(ChannelState.attached); - new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries(); + connectionManager.disconnectAndSuppressRetries(); connectionWaiter.waitFor(ConnectionState.disconnected); assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state); /* Wait for the connection to go stale, then reconnect */ try { Thread.sleep(waitInDisconnectedState); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } ably.connection.connect(); connectionWaiter.waitFor(ConnectionState.connected); @@ -734,15 +689,18 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { attachedChannel.once(ChannelEvent.attached, new ChannelStateListener() { @Override public void onChannelStateChanged(ChannelStateChange stateChange) { - assertEquals("Resumed is true and should be false", stateChange.resumed, false); + assertFalse("Resumed is true and should be false", stateChange.resumed); } }); /* Wait for both channels to reattach and verify state histories match the expected ones */ attachedChannelWaiter.waitFor(ChannelState.attached); suspendedChannelWaiter.waitFor(ChannelState.attached); - assertEquals("Attached channel histories do not match", attachedChannelHistory, expectedAttachedChannelHistory); - assertEquals("Suspended channel histories do not match", suspendedChannelHistory, expectedSuspendedChannelHistory); + assertTrue("Attached channel histories do not match", + attachedChannelWaiter.hasFinalStates(expectedAttachedChannelHistory)); + + assertTrue("Suspended channel histories do not match", + suspendedChannelWaiter.hasFinalStates(expectedSuspendedChannelHistory)); } } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index ae3c08438..a3a7b30cf 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -29,8 +29,6 @@ import org.junit.Ignore; import org.junit.Test; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; From 68a09449f299ce3d6f53af342a2799a113d618d4 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sun, 4 Feb 2024 20:14:10 +0530 Subject: [PATCH 22/23] Refactored ably-java tests, removed unnecessary callbacks --- .../java/io/ably/lib/test/common/Helpers.java | 24 ++++++---- .../lib/test/realtime/RealtimeAuthTest.java | 48 +++++++++---------- .../realtime/RealtimeChannelHistoryTest.java | 19 +++----- .../lib/test/realtime/RealtimeResumeTest.java | 37 ++++++-------- 4 files changed, 59 insertions(+), 69 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/common/Helpers.java b/lib/src/test/java/io/ably/lib/test/common/Helpers.java index 90473b4df..32033acc4 100644 --- a/lib/src/test/java/io/ably/lib/test/common/Helpers.java +++ b/lib/src/test/java/io/ably/lib/test/common/Helpers.java @@ -477,7 +477,7 @@ public synchronized ErrorInfo waitFor(ConnectionState state) { while (currentState() != state) { try { wait(); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } Log.d(TAG, "waitFor done: state=" + targetStateName + ")"); @@ -493,8 +493,8 @@ public synchronized void waitFor(ConnectionState state, int count) { Log.d(TAG, "waitFor(state=" + state.getConnectionEvent().name() + ", count=" + count + ")"); while(getStateCount(state) < count) - try { wait(); } catch(InterruptedException e) {} - Log.d(TAG, "waitFor done: state=" + latestChange.current.getConnectionEvent().name() + ", count=" + getStateCount(state) + ")"); + try { wait(); } catch(InterruptedException ignored) {} + Log.d(TAG, "waitFor done: state=" + lastStateChange().current.getConnectionEvent().name() + ", count=" + getStateCount(state) + ")"); } /** @@ -511,7 +511,7 @@ public synchronized boolean waitFor(ConnectionState state, int count, long time) long remaining = time; while(getStateCount(state) < count && remaining > 0) { Log.d(TAG, "waitFor(state=" + state.getConnectionEvent().name() + ", waiting for=" + remaining + ")"); - try { wait(remaining); } catch(InterruptedException e) {} + try { wait(remaining); } catch(InterruptedException ignored) {} remaining = targetTime - System.currentTimeMillis(); } int stateCount = getStateCount(state); @@ -552,7 +552,7 @@ public synchronized void reset() { @Override public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) { synchronized(this) { - latestChange = state; + stateChanges.add(state); reason = state.reason; Counter counter = stateCounts.get(state.current); if(counter == null) stateCounts.put(state.current, (counter = new Counter())); counter.incr(); @@ -573,15 +573,23 @@ private synchronized int getStateCount(ConnectionState state) { } private synchronized ConnectionState currentState() { - return latestChange == null ? connection.state : latestChange.current; + ConnectionStateChange stateChange = lastStateChange(); + return stateChange == null ? connection.state : stateChange.current; + } + + public synchronized ConnectionStateChange lastStateChange() { + if (stateChanges.size() == 0) { + return null; + } + return stateChanges.get(stateChanges.size() -1); } /** * Internal */ - private Connection connection; + private final Connection connection; private ErrorInfo reason; - private ConnectionStateChange latestChange; + private final List stateChanges = new ArrayList<>(); private Map stateCounts; private static final String TAG = ConnectionWaiter.class.getName(); } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java index c8950eed9..d50325d0b 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java @@ -4,7 +4,6 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.ChannelState; -import io.ably.lib.realtime.ConnectionEvent; import io.ably.lib.realtime.ConnectionState; import io.ably.lib.realtime.ConnectionStateListener; import io.ably.lib.rest.AblyRest; @@ -147,7 +146,8 @@ public void realtime_connection_with_auth_url_in_query_string_connects() { * Spec: RSA4d, RSA4d1 */ @Test - public void auth_client_fails_authorize_server_forbidden() { + public void auth_client_fails() { + AblyRealtime ablyRealtime = null; try { /* init ably for token */ ClientOptions optsForToken = createOptions(testVars.keys[0].keyStr); @@ -163,25 +163,13 @@ public void auth_client_fails_authorize_server_forbidden() { opts.authUrl = "https://echo.ably.io/respondwith"; opts.authParams = new Param[]{ new Param("status", 403)}; - final AblyRealtime ablyRealtime = new AblyRealtime(opts); + ablyRealtime = new AblyRealtime(opts); ablyRealtime.connection.connect(); /* wait for connected state */ Helpers.ConnectionWaiter connectionWaiter = new Helpers.ConnectionWaiter(ablyRealtime.connection); connectionWaiter.waitFor(ConnectionState.connected); - /* create listener for ConnectionEvent.failed */ - ablyRealtime.connection.once(ConnectionEvent.failed, new ConnectionStateListener() { - @Override - public void onConnectionStateChanged(ConnectionStateChange stateChange) { - /* assert that state changes correctly */ - assertEquals(ConnectionState.connected, stateChange.previous); - assertEquals(80019, stateChange.reason.code); - assertEquals(80019, ablyRealtime.connection.reason.code); - assertEquals(403, ablyRealtime.connection.reason.statusCode); - } - }); - try { opts.tokenDetails = null; /* try to authorize */ @@ -194,11 +182,21 @@ public void onConnectionStateChanged(ConnectionStateChange stateChange) { /* wait for failed state */ connectionWaiter.waitFor(ConnectionState.failed); + ConnectionStateListener.ConnectionStateChange lastStateChange = connectionWaiter.lastStateChange(); + assertEquals(ConnectionState.failed, lastStateChange.current); + assertEquals(80019, lastStateChange.reason.code); + assertEquals(403, lastStateChange.reason.statusCode); + assertEquals("Verify connected state has failed", ConnectionState.failed, ablyRealtime.connection.state); assertEquals("Check correct cause error code", 403, ablyRealtime.connection.reason.statusCode); + assertEquals(80019, ablyRealtime.connection.reason.code); + } catch (AblyException e) { e.printStackTrace(); fail(); + } finally { + assert ablyRealtime != null; + ablyRealtime.close(); } } @@ -350,7 +348,7 @@ public void auth_client_match_token_null_clientId() { assertEquals("Verify connected state is reached", ConnectionState.connected, ablyRealtime.connection.state); /* check expected clientId */ - assertEquals("Auth#clientId is expected to be null", null, ablyRealtime.auth.clientId); + assertNull("Auth#clientId is expected to be null", ablyRealtime.auth.clientId); ablyRealtime.close(); } catch (AblyException e) { @@ -383,7 +381,7 @@ public void auth_clientid_null_before_auth() { AblyRealtime ablyRealtime = new AblyRealtime(opts); /* check expected clientId */ - assertEquals("Auth#clientId is expected to be null", null, ablyRealtime.auth.clientId); + assertNull("Auth#clientId is expected to be null", ablyRealtime.auth.clientId); /* wait for connected state */ ablyRealtime.connection.connect(); @@ -688,7 +686,7 @@ public void auth_client_match_tokendetails_clientId_fail() { ClientOptions opts = createOptions(); opts.clientId = "options clientId"; opts.tokenDetails = tokenDetails; - AblyRealtime ablyRealtime = new AblyRealtime(opts); + new AblyRealtime(opts); } catch (AblyException e) { assertEquals("Verify error code indicates clientId mismatch", e.errorInfo.code, 40101); } @@ -773,7 +771,7 @@ public void auth_clientid_publish_implicit() { /* Get sent message */ Message messagePublished = protocolListener.sentMessages.get(0).messages[0]; - assertEquals("Sent message does not contain clientId", messagePublished.clientId, null); + assertNull("Sent message does not contain clientId", messagePublished.clientId); /* wait until message received on transport */ protocolListener.waitForRecv(1); @@ -819,7 +817,7 @@ public void auth_clientid_publish_implicit() { channel.publish(messageToPublish, pubComplete.add()); pubComplete.waitFor(); assertTrue("Verify publish callback called on completion", pubComplete.pending.isEmpty()); - assertTrue("Verify publish callback returns an error", pubComplete.errors.size() == 1); + assertEquals("Verify publish callback returns an error", 1, pubComplete.errors.size()); assertEquals("Verify publish callback error has expected error code", pubComplete.errors.iterator().next().code, 40012); /* verify no message sent or received on transport */ @@ -838,7 +836,7 @@ public void auth_clientid_publish_implicit() { /* Get sent message */ messagePublished = protocolListener.sentMessages.get(0).messages[0]; - assertEquals("Sent message does not contain clientId", messagePublished.clientId, null); + assertNull("Sent message does not contain clientId", messagePublished.clientId); /* wait until message received on transport */ protocolListener.waitForRecv(1); @@ -927,7 +925,7 @@ public void auth_clientid_publish_explicit_before_identified() { /* Get sent message */ messagePublished = protocolListener.sentMessages.get(0).messages[0]; - assertEquals("Sent message does not contain clientId", messagePublished.clientId, null); + assertNull("Sent message does not contain clientId", messagePublished.clientId); /* wait until message received on transport */ protocolListener.waitForRecv(1); @@ -996,7 +994,7 @@ public Object getTokenRequest(Auth.TokenParams params) { ably.connect(); try { opts.wait(); - } catch(InterruptedException ie) {} + } catch(InterruptedException ignored) {} ably.auth.renew(); } @@ -1066,7 +1064,7 @@ public Object getTokenRequest(Auth.TokenParams params) { ably.connect(); try { opts.wait(); - } catch(InterruptedException ie) {} + } catch(InterruptedException ignored) {} ably.auth.renewAuth((success, tokenDetails1, errorInfo) -> { //Ignore completion handling @@ -1183,7 +1181,7 @@ public void auth_expired_token_expire_before_connect_renew() { assertNotNull("Expected token value", tokenDetails.token); /* allow to expire */ - try { Thread.sleep(200L); } catch(InterruptedException ie) {} + try { Thread.sleep(200L); } catch(InterruptedException ignored) {} /* create Ably realtime instance with token and authCallback */ ClientOptions opts = createOptions(); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelHistoryTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelHistoryTest.java index 17dbe8b24..b11439ff4 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelHistoryTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelHistoryTest.java @@ -355,7 +355,7 @@ public void channelhistory_wait_b() { /* wait for the history to be persisted */ try { Thread.sleep(16000); - } catch(InterruptedException ie) {} + } catch(InterruptedException ignored) {} /* get the history for this channel */ PaginatedResult messages = channel.history(null); @@ -455,7 +455,7 @@ public void channelhistory_mixed_b() { /* wait for the history to be persisted */ try { Thread.sleep(16000); - } catch(InterruptedException ie) {} + } catch(InterruptedException ignored) {} /* publish to the channel */ msgComplete = new CompletionWaiter(); @@ -517,7 +517,7 @@ public void channelhistory_mixed_f() { /* wait for the history to be persisted */ try { Thread.sleep(16000); - } catch(InterruptedException ie) {} + } catch(InterruptedException ignored) {} /* publish to the channel */ msgComplete = new CompletionWaiter(); @@ -654,7 +654,6 @@ public void channelhistory_limit_b() { } catch (AblyException e) { e.printStackTrace(); fail("channelhistory_limit_b: Unexpected exception"); - return; } finally { if(ably != null) ably.close(); @@ -720,10 +719,7 @@ public void channelhistory_time_f() { for(int i = 20; i < 40; i++) expectedMessageHistory[i - 20] = messageContents.get("history" + i); Assert.assertArrayEquals("Expect messages in forward order", messages.items(), expectedMessageHistory); - } catch (AblyException e) { - e.printStackTrace(); - fail("channelhistory_time_f: Unexpected exception"); - } catch (InterruptedException e) { + } catch (AblyException | InterruptedException e) { e.printStackTrace(); fail("channelhistory_time_f: Unexpected exception"); } finally { @@ -791,10 +787,7 @@ public void channelhistory_time_b() { for(int i = 20; i < 40; i++) expectedMessageHistory[i - 20] = messageContents.get("history" + (59 - i)); Assert.assertArrayEquals("Expect messages in backwards order", messages.items(), expectedMessageHistory); - } catch (AblyException e) { - e.printStackTrace(); - fail("channelhistory_time_b: Unexpected exception"); - } catch (InterruptedException e) { + } catch (AblyException | InterruptedException e) { e.printStackTrace(); fail("channelhistory_time_b: Unexpected exception"); } finally { @@ -1205,7 +1198,7 @@ public void run() { /* wait 2 seconds */ try { Thread.sleep(2000L); - } catch(InterruptedException ie) {} + } catch(InterruptedException ignored) {} /* subscribe; this will trigger the attach */ MessageWaiter messageWaiter = new MessageWaiter(rxChannel); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 3a882cf18..461388f28 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -583,19 +583,16 @@ public void resume_publish_queue() { final Channel senderChannel = sender.channels.get(channelName); senderChannel.attach(); (new ChannelWaiter(senderChannel)).waitFor(ChannelState.attached); - assertEquals( - "The sender's channel should be attached", - senderChannel.state, ChannelState.attached - ); + assertEquals("The sender's channel should be attached", + senderChannel.state, ChannelState.attached); /* create and attach channel to recv on */ final Channel receiverChannel = receiver.channels.get(channelName); receiverChannel.attach(); (new ChannelWaiter(receiverChannel)).waitFor(ChannelState.attached); - assertEquals( - "The receiver's channel should be attached", - receiverChannel.state, ChannelState.attached - ); + assertEquals("The receiver's channel should be attached", + receiverChannel.state, ChannelState.attached); + /* subscribe */ MessageWaiter messageWaiter = new MessageWaiter(receiverChannel); @@ -612,10 +609,8 @@ public void resume_publish_queue() { /* wait for the subscription callback to be called */ messageWaiter.waitFor(messageCount); - assertEquals( - "Did not receive the entire first round of messages", - messageWaiter.receivedMessages.size(), messageCount - ); + assertEquals("Did not receive the entire first round of messages", + messageWaiter.receivedMessages.size(), messageCount); messageWaiter.reset(); /* disconnect the sender, without closing; @@ -641,7 +636,6 @@ public void resume_publish_queue() { sender.connection.connect(); (new ConnectionWaiter(sender.connection)).waitFor(ConnectionState.connected); - /* wait for the publish callback to be called.*/ errors = msgComplete2.waitFor(); assertEquals("Second round of messages (queued) has errors", 0, errors.length); @@ -655,10 +649,8 @@ public void resume_publish_queue() { received.size(), messageCount ); for(int i=0; i message.action == ProtocolMessage.Action.ack || message.action == ProtocolMessage.Action.nack); From 8e7b2724946a63ee037889c5d4f8a9b11ecaaabb Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 6 Feb 2024 18:03:58 +0530 Subject: [PATCH 23/23] Refactored channel resume tests --- .../io/ably/lib/realtime/ChannelStateListener.java | 2 +- .../lib/test/realtime/ConnectionManagerTest.java | 9 +++++---- .../ably/lib/test/realtime/RealtimeAuthTest.java | 2 +- .../ably/lib/test/realtime/RealtimeResumeTest.java | 14 ++++++-------- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelStateListener.java b/lib/src/main/java/io/ably/lib/realtime/ChannelStateListener.java index bf5582d4e..5dbb8ec48 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelStateListener.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelStateListener.java @@ -77,7 +77,7 @@ public void onChannelStateChanged(ChannelStateChange stateChange) { for (final ChannelStateListener member : getMembers()) try { member.onChannelStateChanged(stateChange); - } catch(Throwable t) {} + } catch(Throwable ignored) {} } } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java index 1a9cf7325..292c96049 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java @@ -46,6 +46,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -388,7 +389,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) { /* wait for cm thread to exit */ try { Thread.sleep(2000L); - } catch(InterruptedException e) {} + } catch(InterruptedException ignored) {} assertEquals("Verify closed state is reached", ConnectionState.closed, ably.connection.state); Thread.State cmThreadState = threadContainer[0].getState(); @@ -462,7 +463,7 @@ public void run() { connectionWaiter.waitFor(ConnectionState.connected); assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state); - assertTrue("Not expecting token auth", ably.auth.getAuthMethod() == AuthMethod.basic); + assertSame("Not expecting token auth", ably.auth.getAuthMethod(), AuthMethod.basic); ably.close(); connectionWaiter.waitFor(ConnectionState.closed); @@ -471,7 +472,7 @@ public void run() { /* wait for cm thread to exit */ try { Thread.sleep(2000L); - } catch(InterruptedException e) {} + } catch(InterruptedException ignored) {} Thread.State cmThreadState = threadContainer[0].getState(); assertEquals("Verify cm thread has exited", cmThreadState, Thread.State.TERMINATED); @@ -510,7 +511,7 @@ public void run() { /* wait for cm thread to exit */ try { Thread.sleep(2000L); - } catch(InterruptedException e) {} + } catch(InterruptedException ignored) {} Thread.State cmThreadState = threadContainer[0].getState(); assertEquals("Verify cm thread has exited", cmThreadState, Thread.State.TERMINATED); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java index d50325d0b..d46014e58 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAuthTest.java @@ -88,7 +88,7 @@ public void auth_client_match_tokendetails_null_clientId() { assertEquals("Verify connected state is reached", ConnectionState.connected, ablyRealtime.connection.state); /* check expected clientId */ - assertEquals("Auth#clientId is expected to be null", null, ablyRealtime.auth.clientId); + assertNull("Auth#clientId is expected to be null", ablyRealtime.auth.clientId); ablyRealtime.close(); } catch (AblyException e) { diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java index 461388f28..068054528 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeResumeTest.java @@ -37,8 +37,6 @@ public class RealtimeResumeTest extends ParameterizedTest { - private static final String TAG = RealtimeResumeTest.class.getName(); - @Rule public Timeout testTimeout = Timeout.seconds(60); @@ -1014,7 +1012,6 @@ public void resume_rewind_1 () String testName = "resume_rewind_1"; try { - ClientOptions common_opts = createOptions(testVars.keys[0].keyStr); sender = new AblyRealtime(common_opts); receiver1 = new AblyRealtime(common_opts); @@ -1036,21 +1033,22 @@ public void onRawMessageRecv(ProtocolMessage message) {} }; receiver2 = new AblyRealtime(receiver2_opts); - Channel recever1_channel = receiver1.channels.get("[?rewind=1]" + testName); - Channel recever2_channel = receiver2.channels.get("[?rewind=1]" + testName); - Channel sender_channel = sender.channels.get(testName); + Channel receiver1_channel = receiver1.channels.get("[?rewind=1]" + testName); + Channel receiver2_channel = receiver2.channels.get("[?rewind=1]" + testName); + + Channel sender_channel = sender.channels.get(testName); sender_channel.attach(); (new ChannelWaiter(sender_channel)).waitFor(ChannelState.attached); sender_channel.publish("0", testMessage); /* subscribe 1*/ - MessageWaiter messageWaiter_1 = new MessageWaiter(recever1_channel); + MessageWaiter messageWaiter_1 = new MessageWaiter(receiver1_channel); messageWaiter_1.waitFor(1); assertEquals("Verify rewound message", testMessage, messageWaiter_1.receivedMessages.get(0).data); /* subscribe 2*/ - MessageWaiter messageWaiter_2 = new MessageWaiter(recever2_channel); + MessageWaiter messageWaiter_2 = new MessageWaiter(receiver2_channel); messageWaiter_2.waitFor(1, 7000); assertEquals("Verify no message received on attach_rewind", 0, messageWaiter_2.receivedMessages.size());