Skip to content

Commit

Permalink
Merge pull request #984 from ably/fix/no-connection-serial-tests
Browse files Browse the repository at this point in the history
Connection serial tests
  • Loading branch information
sacOO7 authored Feb 6, 2024
2 parents 564a96a + 8e7b272 commit ada8758
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 501 deletions.
20 changes: 11 additions & 9 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,13 @@ public abstract class ChannelBase extends EventEmitter<ChannelEvent, ChannelStat
*/
public ChannelProperties properties = new ChannelProperties();

private int retryCount = 0;
private int retryAttempt = 0;

/**
* @see #markAsReleased()
*/
private boolean released = false;



/***
* internal
*
Expand Down Expand Up @@ -129,7 +127,7 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
}

if (newState != ChannelState.attaching && newState != ChannelState.suspended) {
this.retryCount = 0;
this.retryAttempt = 0;
}

// RTP5a1
Expand Down Expand Up @@ -253,10 +251,11 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
attachMessage.setFlags(options.getModeFlags());
}
}
if(this.decodeFailureRecoveryInProgress) {
Log.v(TAG, "attach(); message decode recovery in progress.");
}
attachMessage.channelSerial = properties.channelSerial; // RTL4c1
if(this.decodeFailureRecoveryInProgress) { // RTL18c
Log.v(TAG, "attach(); message decode recovery in progress, setting last message channelserial");
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
}
try {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
Expand Down Expand Up @@ -525,8 +524,8 @@ synchronized private void reattachAfterTimeout() {
}
reattachTimer = currentReattachTimer;

this.retryCount++;
int retryDelay = ReconnectionStrategy.getRetryTime(ably.options.channelRetryTimeout, retryCount);
this.retryAttempt++;
int retryDelay = ReconnectionStrategy.getRetryTime(ably.options.channelRetryTimeout, retryAttempt);

final Timer inProgressTimer = currentReattachTimer;
reattachTimer.schedule(new TimerTask() {
Expand Down Expand Up @@ -611,6 +610,7 @@ public void run() {

/* State changes provoked by ConnectionManager state changes. */
public void setConnected() {
// TODO - seems test is failing because of explicit attach after connect
if (state.isReattachable()){
attach(true,null); // RTN15c6, RTN15c7
}
Expand Down Expand Up @@ -839,6 +839,7 @@ private void onMessage(final ProtocolMessage protocolMessage) {
}

lastPayloadMessageId = lastMessage.id;
lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial;

for (final Message msg : messages) {
this.listeners.onMessage(msg);
Expand Down Expand Up @@ -1341,6 +1342,7 @@ public void once(ChannelState state, ChannelStateListener listener) {
*/
private Set<ChannelMode> modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
private final DecodingContext decodingContext;
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void onChannelStateChanged(ChannelStateChange stateChange) {
for (final ChannelStateListener member : getMembers())
try {
member.onChannelStateChanged(stateChange);
} catch(Throwable t) {}
} catch(Throwable ignored) {}
}
}

Expand Down
152 changes: 130 additions & 22 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.ably.lib.test.common;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
Expand All @@ -18,6 +21,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
Expand All @@ -28,8 +32,10 @@
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.realtime.Channel.MessageListener;
import io.ably.lib.realtime.ChannelEvent;
import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.ChannelStateListener;
import io.ably.lib.realtime.CompletionListener;
Expand Down Expand Up @@ -60,6 +66,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class Helpers {

Expand Down Expand Up @@ -263,7 +270,7 @@ public MessageWaiter(Channel channel, String event) {
*/
public synchronized void waitFor(int count) {
while(receivedMessages.size() < count)
try { wait(); } catch(InterruptedException e) {}
try { wait(); } catch(InterruptedException ignored) {}
}

/**
Expand All @@ -274,7 +281,7 @@ public synchronized void waitFor(int count, long time) {
long targetTime = System.currentTimeMillis() + time;
long remaining = time;
while(receivedMessages.size() < count && remaining > 0) {
try { wait(remaining); } catch(InterruptedException e) {}
try { wait(remaining); } catch(InterruptedException ignored) {}
remaining = targetTime - System.currentTimeMillis();
}
}
Expand Down Expand Up @@ -401,6 +408,48 @@ public PresenceMessage contains(String clientId, String connectionId, PresenceMe
}
}

public static class MutableConnectionManager {
ConnectionManager connectionManager;

public MutableConnectionManager(AblyRealtime ablyRealtime) {
this.connectionManager = ablyRealtime.connection.connectionManager;
}

public void setField(String fieldName, long value) {
try {
Field connectionStateField = ConnectionManager.class.getDeclaredField(fieldName);
connectionStateField.setAccessible(true);
connectionStateField.setLong(connectionManager, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
fail("Failed updating " + fieldName + " with error " + e);
}
}

public long getField(String fieldName) {
try {
Field connectionStateField = ConnectionManager.class.getDeclaredField(fieldName);
connectionStateField.setAccessible(true);
return connectionStateField.getLong(connectionManager);
} catch (NoSuchFieldException | IllegalAccessException e) {
fail("Failed accessing " + fieldName + " with error " + e);
}
return 0;
}

/**
* Suppress automatic retries by the connection manager and disconnect
*/
public void disconnectAndSuppressRetries() {
try {
Method method = ConnectionManager.class.getDeclaredMethod("disconnectAndSuppressRetries");
method.setAccessible(true);
method.invoke(connectionManager);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
fail("Unexpected exception in suppressing retries");
}
}
}

/**
* A class that listens for state change events on a connection.
* @author paddy
Expand Down Expand Up @@ -428,7 +477,7 @@ public synchronized ErrorInfo waitFor(ConnectionState state) {
while (currentState() != state) {
try {
wait();
} catch (InterruptedException e) {
} catch (InterruptedException ignored) {
}
}
Log.d(TAG, "waitFor done: state=" + targetStateName + ")");
Expand All @@ -444,8 +493,8 @@ public synchronized void waitFor(ConnectionState state, int count) {
Log.d(TAG, "waitFor(state=" + state.getConnectionEvent().name() + ", count=" + count + ")");

while(getStateCount(state) < count)
try { wait(); } catch(InterruptedException e) {}
Log.d(TAG, "waitFor done: state=" + latestChange.current.getConnectionEvent().name() + ", count=" + getStateCount(state) + ")");
try { wait(); } catch(InterruptedException ignored) {}
Log.d(TAG, "waitFor done: state=" + lastStateChange().current.getConnectionEvent().name() + ", count=" + getStateCount(state) + ")");
}

/**
Expand All @@ -462,7 +511,7 @@ public synchronized boolean waitFor(ConnectionState state, int count, long time)
long remaining = time;
while(getStateCount(state) < count && remaining > 0) {
Log.d(TAG, "waitFor(state=" + state.getConnectionEvent().name() + ", waiting for=" + remaining + ")");
try { wait(remaining); } catch(InterruptedException e) {}
try { wait(remaining); } catch(InterruptedException ignored) {}
remaining = targetTime - System.currentTimeMillis();
}
int stateCount = getStateCount(state);
Expand Down Expand Up @@ -503,7 +552,7 @@ public synchronized void reset() {
@Override
public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) {
synchronized(this) {
latestChange = state;
stateChanges.add(state);
reason = state.reason;
Counter counter = stateCounts.get(state.current); if(counter == null) stateCounts.put(state.current, (counter = new Counter()));
counter.incr();
Expand All @@ -524,15 +573,23 @@ private synchronized int getStateCount(ConnectionState state) {
}

private synchronized ConnectionState currentState() {
return latestChange == null ? connection.state : latestChange.current;
ConnectionStateChange stateChange = lastStateChange();
return stateChange == null ? connection.state : stateChange.current;
}

public synchronized ConnectionStateChange lastStateChange() {
if (stateChanges.size() == 0) {
return null;
}
return stateChanges.get(stateChanges.size() -1);
}

/**
* Internal
*/
private Connection connection;
private final Connection connection;
private ErrorInfo reason;
private ConnectionStateChange latestChange;
private final List<ConnectionStateChange> stateChanges = new ArrayList<>();
private Map<ConnectionState, Counter> stateCounts;
private static final String TAG = ConnectionWaiter.class.getName();
}
Expand All @@ -557,14 +614,14 @@ public ConnectionManagerWaiter(ConnectionManager connectionManager) {
*/
public synchronized ErrorInfo waitFor(ConnectionState state) {
while(connectionManager.getConnectionState().state != state)
try { wait(INTERVAL_POLLING); } catch(InterruptedException e) {}
try { wait(INTERVAL_POLLING); } catch(InterruptedException ignored) {}
return connectionManager.getConnectionState().defaultErrorInfo;
}

/**
* Internal
*/
private ConnectionManager connectionManager;
private final ConnectionManager connectionManager;
}

/**
Expand All @@ -577,7 +634,6 @@ public static class ChannelWaiter implements ChannelStateListener {

/**
* Public API
* @param channel
*/
public ChannelWaiter(Channel channel) {
this.channel = channel;
Expand All @@ -586,28 +642,80 @@ public ChannelWaiter(Channel channel) {

/**
* Wait for a given state to be reached.
* @param state
*/
public synchronized ErrorInfo waitFor(ChannelState state) {
Log.d(TAG, "waitFor(" + state + ")");
while(channel.state != state)
try { wait(); } catch(InterruptedException e) {}
Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")");
public synchronized ErrorInfo waitFor(ChannelState ... states) {
for (ChannelState state : states) {
Log.d(TAG, "waitFor(" + state + ")");
while(channel.state != state)
try { wait(); } catch(InterruptedException ignored) {}
Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")");
}
return channel.reason;
}

/**
* Wait for a given ChannelEvent to be reached.
*/
public synchronized ChannelStateChange waitFor(ChannelEvent channelEvent) {
Log.d(TAG, "waitFor(" + channelEvent + ")");
ChannelStateChange lastStateChange = getLastStateChange();
while(lastStateChange.event != channelEvent)
try { wait(); } catch(InterruptedException ignored) {}
Log.d(TAG, "waitFor done: " + channel.state + ", " + channel.reason + ")");
return lastStateChange;
}

/**
* ChannelStateListener interface
*/
@Override
public void onChannelStateChanged(ChannelStateListener.ChannelStateChange stateChange) {
synchronized(this) { notify(); }
public void onChannelStateChanged(ChannelStateChange stateChange) {
synchronized(this) {
recordedStates.add(stateChange);
notify();
}
}

private final List<ChannelStateChange> recordedStates = Collections.synchronizedList(new ArrayList<>());

public List<ChannelState> getRecordedStates() {
return recordedStates.stream().map(stateChange -> stateChange.current).collect(Collectors.toList());
}

public boolean hasFinalStates(ChannelState ... states) {
List<ChannelState> rstates = getRecordedStates();
List<ChannelState> vettedList = rstates.subList(rstates.size() - states.length, rstates.size());
return hasStates(vettedList, states);
}

public boolean hasStates(ChannelState ... states) {
return hasStates(getRecordedStates(), states);
}

private static boolean hasStates(List<ChannelState> stateList, ChannelState ... states) {
boolean foundStates = false;
int statesCounter = 0;
for (ChannelState recordedState : stateList) {
if (states[statesCounter] != recordedState) {
statesCounter = 0;
}
if (states[statesCounter] == recordedState) {
statesCounter++;
}
if (statesCounter == states.length) {
foundStates = true;
}
}
return foundStates;
}

public ChannelStateChange getLastStateChange() {
return recordedStates.get(recordedStates.size()-1);
}
/**
* Internal
*/
private Channel channel;
private final Channel channel;
}

/**
Expand Down
Loading

0 comments on commit ada8758

Please sign in to comment.