Skip to content

Commit

Permalink
Merge pull request #1013 from ably/ECO-4820/fix-imidiate-closing
Browse files Browse the repository at this point in the history
[ECO-4820] fix(ConnectionManager): update the connection close implementation to follow RTN12f
  • Loading branch information
ttypic authored Jun 17, 2024
2 parents 93adeff + 478e524 commit 7166263
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
46 changes: 35 additions & 11 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,9 @@ StateIndication onTimeout() {
@Override
void enact(StateIndication stateIndication, ConnectionStateChange change) {
super.enact(stateIndication, change);
boolean closed = closeImpl();
if(closed) {
boolean shouldAwaitConnection = change.previous == ConnectionState.connecting;
boolean closed = closeImpl(shouldAwaitConnection);
if (closed) {
addAction(new AsynchronousStateChangeAction(ConnectionState.closed));
}
}
Expand Down Expand Up @@ -1160,7 +1161,13 @@ public void onMessage(ITransport transport, ProtocolMessage message) throws Ably
}
break;
case connected:
onConnected(message);
if (currentState.state == ConnectionState.closing) {
// Based on RTN12f, if a connected protocol message comes while in the closing state,
// send a close protocol message.
if (!trySendCloseProtocolMessage()) requestState(ConnectionState.closed);
} else {
onConnected(message);
}
break;
case disconnect:
case disconnected:
Expand Down Expand Up @@ -1452,6 +1459,12 @@ public synchronized void onTransportUnavailable(ITransport transport, ErrorInfo
setSuspendTime();
}

// Do not fallback for closing
if (currentState.state == ConnectionState.closing) {
requestState(ConnectionState.closed);
return;
}

/* if this is a failure of a pending connection attempt, decide whether or not to attempt a fallback host */
StateIndication fallbackAttempt = checkFallback(reason);
if(fallbackAttempt != null) {
Expand Down Expand Up @@ -1524,27 +1537,38 @@ private void connectImpl(StateIndication request) {

/**
* Close any existing transport
* @param shouldAwaitConnection true if `CONNECTING` state, moves immediately to `CLOSING`
* @return closed if true, otherwise awaiting closed indication
*/
private boolean closeImpl() {
if(transport == null) {
private boolean closeImpl(boolean shouldAwaitConnection) {
if (transport == null) {
return true;
}

// Based on RTN12f we need to wait until connected protocol message come
if (shouldAwaitConnection) {
return false;
}

return !trySendCloseProtocolMessage();
}

/**
* @return true if we successfully send `close` protocol message, false otherwise
*/
private boolean trySendCloseProtocolMessage() {
try {
Log.v(TAG, "Requesting connection close");
transport.send(new ProtocolMessage(ProtocolMessage.Action.close));
return false;
return true;
} catch (AblyException e) {
/* we're closing, and the attempt to send the CLOSE message failed;
* continue, because we're not going to reinstate the transport
* just to send a CLOSE message */
Log.v(TAG, "Closing incomplete transport");
clearTransport();
return false;
}

/* just close the transport */
Log.v(TAG, "Closing incomplete transport");
clearTransport();
return true;
}

private void clearTransport() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,25 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
assertEquals("Verify cm thread has exited", cmThreadState, Thread.State.TERMINATED);
}

/**
* (RTN12f) Close while in connecting state
*/
@Test
public void connectionmanager_close_while_connecting() throws AblyException {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
final AblyRealtime ably = new AblyRealtime(opts);
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
ConnectionManager connectionManager = ably.connection.connectionManager;
ably.close();

connectionWaiter.waitFor(ConnectionState.closed);
assertEquals("Previous state was closing", ConnectionState.closing, connectionWaiter.lastStateChange().previous);
assertEquals(1 , connectionWaiter.getCount(ConnectionState.connecting));
assertEquals(0 , connectionWaiter.getCount(ConnectionState.connected));
assertEquals("Verify closed state is reached", ConnectionState.closed, ably.connection.state);
assertThat("fallback hasn't been invoked", connectionManager.getHost(), is(equalTo(opts.environment + "-realtime.ably.io")));
}

/**
* Connect, and then perform a close();
* verify that the closed state is reached, and immediately
Expand Down

0 comments on commit 7166263

Please sign in to comment.