Skip to content

Commit

Permalink
Implemented onAttach channel presence
Browse files Browse the repository at this point in the history
1. Added private onAttach method to presence_manager
2. Depending on the has_presence_flag, sync is marked either as started or none
3. Entering local members when onAttach is called
4. Removed client initiated sync from presence code ( as per spec )
5. Removed entering local members at the end of the sync
  • Loading branch information
sacOO7 committed Jun 6, 2024
1 parent 9dbe92c commit 5b0a006
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 99 deletions.
19 changes: 5 additions & 14 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def detach(error, previous_state)
# Channel is attached, notify presence if sync is expected
def attached(attached_protocol_message)
# If no attached ProtocolMessage then this attached request was triggered by the client
# library, such as returning to attached whne detach has failed
# library, such as returning to attached when detach has failed
if attached_protocol_message
update_presence_sync_state_following_attached attached_protocol_message
channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
channel.options.set_modes_from_flags(attached_protocol_message.flags)
channel.options.set_params(attached_protocol_message.params)
Expand All @@ -60,6 +60,7 @@ def request_reattach(options = {})
end

def duplicate_attached_received(protocol_message)
logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
if protocol_message.error
channel.set_channel_error_reason protocol_message.error
log_channel_error protocol_message.error
Expand All @@ -68,17 +69,15 @@ def duplicate_attached_received(protocol_message)
channel.properties.set_attach_serial(protocol_message.channel_serial)
channel.options.set_modes_from_flags(protocol_message.flags)

if protocol_message.has_channel_resumed_flag?
logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
else
unless protocol_message.has_channel_resumed_flag?
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: protocol_message.error,
resumed: false,
)
update_presence_sync_state_following_attached protocol_message
channel.presence.manager.on_attach protocol_message.has_presence_flag?
end
end

Expand Down Expand Up @@ -254,14 +253,6 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio
)
end

def update_presence_sync_state_following_attached(attached_protocol_message)
if attached_protocol_message.has_presence_flag?
channel.presence.manager.sync_expected
else
channel.presence.manager.sync_not_expected
end
end

def logger
connection.logger
end
Expand Down
17 changes: 14 additions & 3 deletions lib/ably/realtime/presence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ def enter_client(client_id, data = nil, &success_block)
send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, &success_block)
end

# @api private
def enter_client_with_id(id, client_id, data = nil, &success_block)
ensure_supported_client_id client_id
ensure_supported_payload data

send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, id, &success_block)
end

# Leave this client from this channel. This client will be removed from the presence
# set and presence subscribers will see a leave message for this client.
#
Expand Down Expand Up @@ -338,8 +346,11 @@ def sync_complete?

private
# @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added
def send_presence_protocol_message(presence_action, client_id, data)
def send_presence_protocol_message(presence_action, client_id, data, id = nil)
presence_message = create_presence_message(presence_action, client_id, data)
unless id.nil?
presence_message.id = id
end
unless presence_message.client_id
raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, Ably::Exceptions::Codes::UNABLE_TO_ENTER_PRESENCE_CHANNEL_NO_CLIENTID)
end
Expand Down Expand Up @@ -433,13 +444,13 @@ def deferrable_fail(deferrable, *args, &block)
deferrable
end

def send_presence_action_for_client(action, client_id, data, &success_block)
def send_presence_action_for_client(action, client_id, data, id = nil, &success_block)
requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
return requirements_failed_deferrable if requirements_failed_deferrable

deferrable = create_deferrable
ensure_channel_attached(deferrable) do
send_presence_protocol_message(action, client_id, data).tap do |protocol_message|
send_presence_protocol_message(action, client_id, data, id).tap do |protocol_message|
protocol_message.callback { |message| deferrable_succeed deferrable, &success_block }
protocol_message.errback { |error| deferrable_fail deferrable, error }
end
Expand Down
91 changes: 26 additions & 65 deletions lib/ably/realtime/presence/members_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def each(&block)
# and thus the responsibility of this library to re-enter on the channel automatically if the
# channel loses continuity
#
# @return [Array<PresenceMessage>]
# @return [Hash<String, PresenceMessage>]
# @api private
def local_members
@local_members
Expand Down Expand Up @@ -203,23 +203,14 @@ def setup_event_handlers
update_members_and_emit_events presence_message
end

# RTP5a
channel.unsafe_on(:failed, :detached) do
reset_members
reset_local_members
end

resume_sync_proc = method(:resume_sync).to_proc

unsafe_on(:sync_starting) do
@sync_session_id += 1

channel.unsafe_once(:attached) do
connection.on_resume(&resume_sync_proc)
end

unsafe_once(:sync_complete, :failed) do
connection.off_resume(&resume_sync_proc)
end
end

unsafe_on(:sync_none) do
Expand All @@ -233,60 +224,31 @@ def setup_event_handlers
clean_up_members_not_present_after_sync
change_state :sync_complete
end

unsafe_on(:sync_complete) do
update_local_member_state
end
end

# Listen for events that change the PresenceMap state and thus
# need to be replicated to the local member set
def update_local_member_state
new_local_members = members.select do |member_key, member|
member.fetch(:message).connection_id == connection.id
end.each_with_object({}) do |(member_key, member), hash_object|
hash_object[member_key] = member.fetch(:message)
end

@local_members.reject do |member_key, message|
new_local_members.keys.include?(member_key)
end.each do |member_key, message|
re_enter_local_member_missing_from_presence_map message
end

@local_members = new_local_members
end

def re_enter_local_member_missing_from_presence_map(presence_message)
local_client_id = presence_message.client_id || client.auth.client_id
logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" }
presence.enter_client(local_client_id, presence_message.data).tap do |deferrable|
deferrable.errback do |error|
presence_message_client_id = presence_message.client_id || client.auth.client_id
re_enter_error = Ably::Models::ErrorInfo.new(
message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
)
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: re_enter_error,
resumed: true
)
def enter_local_members
local_members.values.each do |member|
local_client_id = member.client_id || client.auth.client_id
logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{member.data}" }
presence.enter_client_with_id(member.id, local_client_id, member.data).tap do |deferrable|
deferrable.errback do |error|
presence_message_client_id = member.client_id || client.auth.client_id
re_enter_error = Ably::Models::ErrorInfo.new(
message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
)
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: re_enter_error,
resumed: true
)
end
end
end
end

# Trigger a manual SYNC operation to resume member synchronisation from last known cursor position
def resume_sync
connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i,
channel: channel.name,
channel_serial: sync_serial
) if channel.attached?
end

def update_members_and_emit_events(presence_message)
return unless ensure_presence_message_is_valid(presence_message)

Expand Down Expand Up @@ -384,17 +346,16 @@ def touch_presence_member(presence_message)
def member_set_upsert(presence_message, present)
members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
if presence_message.connection_id == connection.id
local_members[presence_message.member_key] = presence_message
logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" }
local_members[presence_message.client_id] = presence_message
logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" }
end
end

def member_set_delete(presence_message)
members.delete presence_message.member_key
if sync_complete?
# If not in SYNC, then local members missing may need to be re-entered
# Let #update_local_member_state handle missing members
local_members.delete presence_message.member_key
if sync_complete? and presence_message.connection_id == connection.id
local_members.delete presence_message.client_id
logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" }
end
end

Expand Down
28 changes: 11 additions & 17 deletions lib/ably/realtime/presence/presence_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ def initialize(presence)
setup_channel_event_handlers
end

# Expect SYNC ProtocolMessages from the server with a list of current members on this channel
#
# @return [void]
#
# @api private
def sync_expected
presence.members.change_state :sync_starting
def on_attach(has_presence_flag)
if has_presence_flag
# Expect SYNC ProtocolMessages from the server with a list of current members on this channel
presence.members.change_state :sync_starting
else
# There server has indicated that there are no SYNC ProtocolMessages to come because
# there are no members on this channel
logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" }
presence.members.change_state :sync_none
end
presence.members.send(:enter_local_members)
end

# Process presence messages from SYNC messages. Sync can be server-initiated or triggered following ATTACH
Expand All @@ -47,17 +52,6 @@ def sync_process_messages(serial, presence_messages)
presence.members.change_state :finalizing_sync if presence.members.sync_serial_cursor_at_end?
end

# There server has indicated that there are no SYNC ProtocolMessages to come because
# there are no members on this channel
#
# @return [void]
#
# @api private
def sync_not_expected
logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" }
presence.members.change_state :sync_none
end

private
def_delegators :presence, :members, :channel

Expand Down

0 comments on commit 5b0a006

Please sign in to comment.