Skip to content

Commit

Permalink
Refactored/simplified onSync and onPresence implementation for presence
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Dec 5, 2023
1 parent d003348 commit 0cab20f
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 91 deletions.
49 changes: 2 additions & 47 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,26 +349,6 @@ private void sendDetachMessage(CompletionListener listener) throws AblyException
}
}

public void sync() throws AblyException {
Log.v(TAG, "sync(); channel = " + name);
/* check preconditions */
switch(state) {
case initialized:
case detaching:
case detached:
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to sync to channel; not attached", 40000));
default:
}
ConnectionManager connectionManager = ably.connection.connectionManager;
if(!connectionManager.isActive())
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());

/* send sync request */
ProtocolMessage syncMessage = new ProtocolMessage(Action.sync, this.name);
syncMessage.channelSerial = syncChannelSerial;
connectionManager.send(syncMessage, true, null);
}

/***
* internal
*
Expand Down Expand Up @@ -888,30 +868,6 @@ public void onError(ErrorInfo reason) {
});
}

private void onPresence(ProtocolMessage message, String syncChannelSerial) {
Log.v(TAG, "onPresence(); channel = " + name + "; syncChannelSerial = " + syncChannelSerial);
PresenceMessage[] messages = message.presence;
for(int i = 0; i < messages.length; i++) {
PresenceMessage msg = messages[i];
try {
msg.decode(options);
} catch (MessageDecodeException e) {
Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, name));
}
/* populate fields derived from protocol message */
if(msg.connectionId == null) msg.connectionId = message.connectionId;
if(msg.timestamp == 0) msg.timestamp = message.timestamp;
if(msg.id == null) msg.id = message.id + ':' + i;
}
presence.setPresence(messages, true, syncChannelSerial);
}

private void onSync(ProtocolMessage message) {
Log.v(TAG, "onSync(); channel = " + name);
if(message.presence != null)
onPresence(message, (syncChannelSerial = message.channelSerial));
}

private MessageMulticaster listeners = new MessageMulticaster();
private HashMap<String, MessageMulticaster> eventListeners = new HashMap<String, MessageMulticaster>();

Expand Down Expand Up @@ -1337,10 +1293,10 @@ void onChannelMessage(ProtocolMessage msg) {
}
break;
case presence:
onPresence(msg, null);
presence.onPresence(msg);
break;
case sync:
onSync(msg);
presence.onSync(msg);
break;
case error:
setFailed(msg.error);
Expand Down Expand Up @@ -1375,7 +1331,6 @@ public void once(ChannelState state, ChannelStateListener listener) {
final AblyRealtime ably;
final String basePath;
ChannelOptions options;
String syncChannelSerial;
/**
* Optional <a href="https://ably.com/docs/realtime/channels/channel-parameters/overview">channel parameters</a>
* that configure the behavior of the channel.
Expand Down
124 changes: 80 additions & 44 deletions lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import io.ably.lib.types.AsyncPaginatedResult;
import io.ably.lib.types.Callback;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.MessageDecodeException;
import io.ably.lib.types.PaginatedResult;
import io.ably.lib.types.Param;
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.PresenceSerializer;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.util.Log;
import io.ably.lib.util.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
Expand Down Expand Up @@ -329,62 +332,95 @@ private void endSync() {
member.id = null;
member.timestamp = System.currentTimeMillis();
}
broadcastPresence(residualMembers.toArray(new PresenceMessage[residualMembers.size()]));
broadcastPresence(residualMembers);
}

void setPresence(PresenceMessage[] messages, boolean broadcast, String syncChannelSerial) {
Log.v(TAG, "setPresence(); channel = " + channel.name + "; broadcast = " + broadcast + "; syncChannelSerial = " + syncChannelSerial);
String syncCursor = null;
if(syncChannelSerial != null) {
int colonPos = syncChannelSerial.indexOf(':');
String serial = colonPos >= 0 ? syncChannelSerial.substring(0, colonPos) : syncChannelSerial;
/* Discard incomplete sync if serial has changed */
if (presence.syncInProgress && currentSyncChannelSerial != null && !currentSyncChannelSerial.equals(serial))
endSync();
syncCursor = syncChannelSerial.substring(colonPos);
if(syncCursor.length() > 1) {
presence.startSync();
currentSyncChannelSerial = serial;
private void updateInnerMessageFields(ProtocolMessage message) {
for(int i = 0; i < message.presence.length; i++) {
PresenceMessage msg = message.presence[i];
try {
msg.decode(channel.options);
} catch (MessageDecodeException e) {
Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, channel.name));
}
/* populate fields derived from protocol message */
if(msg.connectionId == null) msg.connectionId = message.connectionId;
if(msg.timestamp == 0) msg.timestamp = message.timestamp;
if(msg.id == null) msg.id = message.id + ':' + i;
}
for(PresenceMessage update : messages) {
boolean updateInternalPresence = update.connectionId.equals(channel.ably.connection.id);
boolean broadcastThisUpdate = broadcast;
PresenceMessage originalUpdate = update;

switch(update.action) {
case enter:
case update:
update = (PresenceMessage)update.clone();
update.action = PresenceMessage.Action.present;
case present:
broadcastThisUpdate &= presence.put(update);
if(updateInternalPresence)
internalPresence.put(update);
break;
case leave:
broadcastThisUpdate &= presence.remove(update);
if(updateInternalPresence)
internalPresence.remove(update);
break;
case absent:
}

void onSync(ProtocolMessage protocolMessage) {
String syncCursor = null;
String syncChannelSerial = protocolMessage.channelSerial;
// RTP18a
if(!StringUtils.isNullOrEmpty(syncChannelSerial)) {
String[] serials = syncChannelSerial.split(":");
String syncSequenceId = serials[0];
syncCursor = serials.length > 1 ? serials[1] : "";

/* If a new sequence identifier is sent from Ably, then the client library
* must consider that to be the start of a new sync sequence
* and any previous in-flight sync should be discarded. (part of RTP18)*/
if (presence.syncInProgress && !StringUtils.isNullOrEmpty(currentSyncChannelSerial)
&& !currentSyncChannelSerial.equals(syncSequenceId)) {
endSync();
}

/*
* RTP2g: Any incoming presence message that passes the newness check should be emitted on the
* Presence object, with an event name set to its original action.
*/
if (broadcastThisUpdate)
broadcastPresence(new PresenceMessage[]{originalUpdate});
presence.startSync();

if (!StringUtils.isNullOrEmpty(syncCursor))
{
currentSyncChannelSerial = syncSequenceId;
}
}

/* if this is the last message in a sequence of sync updates, end the sync */
if(syncChannelSerial == null || syncCursor.length() <= 1) {
onPresence(protocolMessage);

// RTP18b, RTP18c
if (StringUtils.isNullOrEmpty(syncChannelSerial) || StringUtils.isNullOrEmpty(syncCursor))
{
endSync();
currentSyncChannelSerial = null;
}
}

private void broadcastPresence(PresenceMessage[] messages) {
void onPresence(ProtocolMessage protocolMessage) {
updateInnerMessageFields(protocolMessage);
List<PresenceMessage> updatedPresenceMessages = new ArrayList<>();
for(PresenceMessage presenceMessage : protocolMessage.presence) {
boolean updateInternalPresence = presenceMessage.connectionId.equals(channel.ably.connection.id);
boolean memberUpdated = false;

switch(presenceMessage.action) {
case enter:
case update:
case present:
PresenceMessage shallowClone = (PresenceMessage)presenceMessage.clone();
shallowClone.action = PresenceMessage.Action.present;
memberUpdated = presence.put(shallowClone);
if(updateInternalPresence)
internalPresence.put(presenceMessage);
break;
case leave:
memberUpdated = presence.remove(presenceMessage);
if(updateInternalPresence)
internalPresence.remove(presenceMessage);
break;
case absent:
}
if (memberUpdated) {
updatedPresenceMessages.add(presenceMessage);
}
}
/*
* RTP2g: Any incoming presence message that passes the newness check should be emitted on the
* Presence object, with an event name set to its original action.
*/
broadcastPresence(updatedPresenceMessages);
}

private void broadcastPresence(List<PresenceMessage> messages) {
for(PresenceMessage message : messages) {
listeners.onPresenceMessage(message);

Expand Down

0 comments on commit 0cab20f

Please sign in to comment.