Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection serial tests #984

Merged
merged 25 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
32e0687
Fixed failing tests for recovery key
sacOO7 Dec 20, 2023
6ea5c2b
Refactored test file for no connection serial
sacOO7 Jan 8, 2024
cfc7cb3
Merge branch 'feature/no-connection-serial' into fix/no-connection-se…
sacOO7 Jan 23, 2024
75a8032
updated realtime channel test for resume flag
sacOO7 Jan 23, 2024
ea65bff
Added todo that checks for exact error
sacOO7 Jan 24, 2024
cd355b6
Fixed test for server injected attach
sacOO7 Jan 25, 2024
969d1d5
Refactored helper method to wait channel event
sacOO7 Jan 26, 2024
f84e07a
Added test for server initiated detached
sacOO7 Jan 26, 2024
da74762
Fixed connect reauth failure test
sacOO7 Jan 26, 2024
5297b40
Added a test for valid/invalid resume channel attach
sacOO7 Jan 26, 2024
e9790c0
Simplified channel attach/detach assertions
sacOO7 Jan 26, 2024
f7c5437
Added channelStateChange specific helpers with recorders
sacOO7 Jan 29, 2024
a72e990
Fixed tests for re-attaching channels on connection resume success/fa…
sacOO7 Jan 29, 2024
9a6a7f4
Refactored realtime delta decoder helpers
sacOO7 Jan 29, 2024
f92ab59
Merge branch 'main' into fix/no-connection-serial-tests
sacOO7 Jan 29, 2024
d13051f
Fixed deltadecode failure recovery test
sacOO7 Jan 30, 2024
5449a9b
Fixed test for resume publish reenter with right message size
sacOO7 Jan 30, 2024
98e1e8f
refactored test for resume publish re-enter
sacOO7 Jan 30, 2024
4fded69
refactored code, added a separate class for updating connectionmanage…
sacOO7 Jan 30, 2024
c93f89d
Fixed checkstyle issues for integration tests
sacOO7 Jan 31, 2024
e93f0ca
Moved mutableConnection manager under helpers
sacOO7 Jan 31, 2024
2436f67
Updating complex tests where reflection is used
sacOO7 Jan 31, 2024
e791f8f
refactored tests with easier test helper implementation
sacOO7 Jan 31, 2024
68a0944
Refactored ably-java tests, removed unnecessary callbacks
sacOO7 Feb 4, 2024
8e7b272
Refactored channel resume tests
sacOO7 Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,13 @@ public abstract class ChannelBase extends EventEmitter<ChannelEvent, ChannelStat
*/
public ChannelProperties properties = new ChannelProperties();

private int retryCount = 0;
private int retryAttempt = 0;

/**
* @see #markAsReleased()
*/
private boolean released = false;



/***
* internal
*
Expand Down Expand Up @@ -129,7 +127,7 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
}

if (newState != ChannelState.attaching && newState != ChannelState.suspended) {
this.retryCount = 0;
this.retryAttempt = 0;
}

// RTP5a1
Expand Down Expand Up @@ -253,10 +251,11 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
attachMessage.setFlags(options.getModeFlags());
}
}
if(this.decodeFailureRecoveryInProgress) {
Log.v(TAG, "attach(); message decode recovery in progress.");
}
attachMessage.channelSerial = properties.channelSerial; // RTL4c1
if(this.decodeFailureRecoveryInProgress) { // RTL18c
Log.v(TAG, "attach(); message decode recovery in progress, setting last message channelserial");
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
}
try {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
Expand Down Expand Up @@ -525,8 +524,8 @@ synchronized private void reattachAfterTimeout() {
}
reattachTimer = currentReattachTimer;

this.retryCount++;
int retryDelay = ReconnectionStrategy.getRetryTime(ably.options.channelRetryTimeout, retryCount);
this.retryAttempt++;
int retryDelay = ReconnectionStrategy.getRetryTime(ably.options.channelRetryTimeout, retryAttempt);

final Timer inProgressTimer = currentReattachTimer;
reattachTimer.schedule(new TimerTask() {
Expand Down Expand Up @@ -611,6 +610,7 @@ public void run() {

/* State changes provoked by ConnectionManager state changes. */
public void setConnected() {
// TODO - seems test is failing because of explicit attach after connect
if (state.isReattachable()){
attach(true,null); // RTN15c6, RTN15c7
}
Expand Down Expand Up @@ -839,6 +839,7 @@ private void onMessage(final ProtocolMessage protocolMessage) {
}

lastPayloadMessageId = lastMessage.id;
lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial;

for (final Message msg : messages) {
this.listeners.onMessage(msg);
Expand Down Expand Up @@ -1341,6 +1342,7 @@ public void once(ChannelState state, ChannelStateListener listener) {
*/
private Set<ChannelMode> modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
private final DecodingContext decodingContext;
}
118 changes: 104 additions & 14 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.ably.lib.test.common;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
Expand All @@ -18,6 +21,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
Expand All @@ -28,8 +32,10 @@
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.realtime.Channel.MessageListener;
import io.ably.lib.realtime.ChannelEvent;
import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.ChannelStateListener;
import io.ably.lib.realtime.CompletionListener;
Expand Down Expand Up @@ -60,6 +66,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class Helpers {

Expand Down Expand Up @@ -263,7 +270,7 @@ public MessageWaiter(Channel channel, String event) {
*/
public synchronized void waitFor(int count) {
while(receivedMessages.size() < count)
try { wait(); } catch(InterruptedException e) {}
try { wait(); } catch(InterruptedException ignored) {}
}

/**
Expand All @@ -274,7 +281,7 @@ public synchronized void waitFor(int count, long time) {
long targetTime = System.currentTimeMillis() + time;
long remaining = time;
while(receivedMessages.size() < count && remaining > 0) {
try { wait(remaining); } catch(InterruptedException e) {}
try { wait(remaining); } catch(InterruptedException ignored) {}
remaining = targetTime - System.currentTimeMillis();
}
}
Expand Down Expand Up @@ -401,6 +408,38 @@ public PresenceMessage contains(String clientId, String connectionId, PresenceMe
}
}

public static class MutableConnectionManager {
ConnectionManager connectionManager;

public MutableConnectionManager(AblyRealtime ablyRealtime) {
this.connectionManager = ablyRealtime.connection.connectionManager;
}

public void setField(String fieldName, long value) {
Field connectionStateField = null;
try {
connectionStateField = ConnectionManager.class.getDeclaredField(fieldName);
connectionStateField.setAccessible(true);
connectionStateField.setLong(connectionManager, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
fail("Unexpected exception in checking connectionStateTtl");
}
}

/**
* Suppress automatic retries by the connection manager and disconnect
*/
public void disconnectAndSuppressRetries() {
try {
Method method = ConnectionManager.class.getDeclaredMethod("disconnectAndSuppressRetries");
method.setAccessible(true);
method.invoke(connectionManager);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
fail("Unexpected exception in suppressing retries");
}
}
}

/**
* A class that listens for state change events on a connection.
* @author paddy
Expand Down Expand Up @@ -557,14 +596,14 @@ public ConnectionManagerWaiter(ConnectionManager connectionManager) {
*/
public synchronized ErrorInfo waitFor(ConnectionState state) {
while(connectionManager.getConnectionState().state != state)
try { wait(INTERVAL_POLLING); } catch(InterruptedException e) {}
try { wait(INTERVAL_POLLING); } catch(InterruptedException ignored) {}
return connectionManager.getConnectionState().defaultErrorInfo;
}

/**
* Internal
*/
private ConnectionManager connectionManager;
private final ConnectionManager connectionManager;
}

/**
Expand All @@ -577,7 +616,6 @@ public static class ChannelWaiter implements ChannelStateListener {

/**
* Public API
* @param channel
*/
public ChannelWaiter(Channel channel) {
this.channel = channel;
Expand All @@ -586,28 +624,80 @@ public ChannelWaiter(Channel channel) {

/**
* Wait for a given state to be reached.
* @param state
*/
public synchronized ErrorInfo waitFor(ChannelState state) {
Log.d(TAG, "waitFor(" + state + ")");
while(channel.state != state)
try { wait(); } catch(InterruptedException e) {}
Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")");
public synchronized ErrorInfo waitFor(ChannelState ... states) {
for (ChannelState state : states) {
Log.d(TAG, "waitFor(" + state + ")");
while(channel.state != state)
try { wait(); } catch(InterruptedException ignored) {}
Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")");
}
return channel.reason;
}

/**
* Wait for a given ChannelEvent to be reached.
*/
public synchronized ChannelStateChange waitFor(ChannelEvent channelEvent) {
Log.d(TAG, "waitFor(" + channelEvent + ")");
ChannelStateChange lastStateChange = getLastStateChange();
while(lastStateChange.event != channelEvent)
try { wait(); } catch(InterruptedException ignored) {}
Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")");
return lastStateChange;
}

/**
* ChannelStateListener interface
*/
@Override
public void onChannelStateChanged(ChannelStateListener.ChannelStateChange stateChange) {
synchronized(this) { notify(); }
public void onChannelStateChanged(ChannelStateChange stateChange) {
synchronized(this) {
recordedStates.add(stateChange);
notify();
}
}

private final List<ChannelStateChange> recordedStates = Collections.synchronizedList(new ArrayList<>());

public List<ChannelState> getRecordedStates() {
return recordedStates.stream().map(stateChange -> stateChange.current).collect(Collectors.toList());
}

public boolean hasFinalStates(ChannelState ... states) {
List<ChannelState> rstates = getRecordedStates();
List<ChannelState> vettedList = rstates.subList(rstates.size() - states.length, rstates.size());
return hasStates(vettedList, states);
}

public boolean hasStates(ChannelState ... states) {
return hasStates(getRecordedStates(), states);
}

private static boolean hasStates(List<ChannelState> stateList, ChannelState ... states) {
boolean foundStates = false;
int statesCounter = 0;
for (ChannelState recordedState : stateList) {
if (states[statesCounter] != recordedState) {
statesCounter = 0;
}
if (states[statesCounter] == recordedState) {
statesCounter++;
}
if (statesCounter == states.length) {
foundStates = true;
}
}
return foundStates;
}

public ChannelStateChange getLastStateChange() {
return recordedStates.get(recordedStates.size()-1);
}
/**
* Internal
*/
private Channel channel;
private final Channel channel;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,14 +618,7 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
connectionWaiter.waitFor(ConnectionState.connected);
final String firstConnectionId = ably.connection.id;

/* suppress automatic retries by the connection manager and disconnect */
try {
Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries");
method.setAccessible(true);
method.invoke(ably.connection.connectionManager);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
fail("Unexpected exception in suppressing retries");
}
new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries();
connectionWaiter.waitFor(ConnectionState.disconnected);
assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state);

Expand Down Expand Up @@ -726,14 +719,7 @@ public void onChannelStateChanged(ChannelStateChange stateChange) {
attachedChannel.attach();
attachedChannelWaiter.waitFor(ChannelState.attached);

/* suppress automatic retries by the connection manager and disconnect */
try {
Method method = ably.connection.connectionManager.getClass().getDeclaredMethod("disconnectAndSuppressRetries");
method.setAccessible(true);
method.invoke(ably.connection.connectionManager);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
fail("Unexpected exception in suppressing retries");
}
new Helpers.MutableConnectionManager(ably).disconnectAndSuppressRetries();
connectionWaiter.waitFor(ConnectionState.disconnected);
assertEquals("Disconnected state was not reached", ConnectionState.disconnected, ably.connection.state);

Expand Down
Loading
Loading