Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4686][Protocol-2] Implement internal presence #410

Merged
merged 7 commits into from
Jul 3, 2024
19 changes: 5 additions & 14 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def detach(error, previous_state)
# Channel is attached, notify presence if sync is expected
def attached(attached_protocol_message)
# If no attached ProtocolMessage then this attached request was triggered by the client
# library, such as returning to attached whne detach has failed
# library, such as returning to attached when detach has failed
if attached_protocol_message
update_presence_sync_state_following_attached attached_protocol_message
channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
channel.options.set_modes_from_flags(attached_protocol_message.flags)
channel.options.set_params(attached_protocol_message.params)
Expand All @@ -60,6 +60,7 @@ def request_reattach(options = {})
end

def duplicate_attached_received(protocol_message)
logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
if protocol_message.error
channel.set_channel_error_reason protocol_message.error
log_channel_error protocol_message.error
Expand All @@ -68,17 +69,15 @@ def duplicate_attached_received(protocol_message)
channel.properties.set_attach_serial(protocol_message.channel_serial)
channel.options.set_modes_from_flags(protocol_message.flags)

if protocol_message.has_channel_resumed_flag?
logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
else
unless protocol_message.has_channel_resumed_flag?
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: protocol_message.error,
resumed: false,
)
update_presence_sync_state_following_attached protocol_message
channel.presence.manager.on_attach protocol_message.has_presence_flag?
end
end

Expand Down Expand Up @@ -254,14 +253,6 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio
)
end

def update_presence_sync_state_following_attached(attached_protocol_message)
if attached_protocol_message.has_presence_flag?
channel.presence.manager.sync_expected
else
channel.presence.manager.sync_not_expected
end
end

def logger
connection.logger
end
Expand Down
17 changes: 14 additions & 3 deletions lib/ably/realtime/presence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ def enter_client(client_id, data = nil, &success_block)
send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, &success_block)
end

# @api private
def enter_client_with_id(id, client_id, data = nil, &success_block)
ensure_supported_client_id client_id
ensure_supported_payload data

send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, id, &success_block)
end

# Leave this client from this channel. This client will be removed from the presence
# set and presence subscribers will see a leave message for this client.
#
Expand Down Expand Up @@ -338,8 +346,11 @@ def sync_complete?

private
# @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added
def send_presence_protocol_message(presence_action, client_id, data)
def send_presence_protocol_message(presence_action, client_id, data, id = nil)
presence_message = create_presence_message(presence_action, client_id, data)
unless id.nil?
presence_message.id = id
end
unless presence_message.client_id
raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, Ably::Exceptions::Codes::UNABLE_TO_ENTER_PRESENCE_CHANNEL_NO_CLIENTID)
end
Expand Down Expand Up @@ -433,13 +444,13 @@ def deferrable_fail(deferrable, *args, &block)
deferrable
end

def send_presence_action_for_client(action, client_id, data, &success_block)
def send_presence_action_for_client(action, client_id, data, id = nil, &success_block)
requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
return requirements_failed_deferrable if requirements_failed_deferrable

deferrable = create_deferrable
ensure_channel_attached(deferrable) do
send_presence_protocol_message(action, client_id, data).tap do |protocol_message|
send_presence_protocol_message(action, client_id, data, id).tap do |protocol_message|
protocol_message.callback { |message| deferrable_succeed deferrable, &success_block }
protocol_message.errback { |error| deferrable_fail deferrable, error }
end
Expand Down
127 changes: 39 additions & 88 deletions lib/ably/realtime/presence/members_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MembersMap
:sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably
:sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel
:finalizing_sync,
:in_sync,
:sync_complete, # Indicates completion of server initiated sync
:failed
)
include Ably::Modules::StateEmitter
Expand All @@ -49,16 +49,6 @@ def initialize(presence)
setup_event_handlers
end

# When attaching to a channel that has members present, the server
# initiates a sync automatically so that the client has a complete list of members.
#
# Until this sync is complete, this method returns false
#
# @return [Boolean]
def sync_complete?
in_sync?
end

# Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed.
# If the serial is nil, or the part after the first : is empty, then the SYNC is complete
#
Expand Down Expand Up @@ -110,27 +100,27 @@ def get(options = {}, &block)
# Must be defined before subsequent procs reference this callback
reset_callbacks = nil

in_sync_callback = lambda do
sync_complete_callback = lambda do
reset_callbacks.call if reset_callbacks
result_block.call
end

failed_callback = lambda do |error|
sync_failed_callback = lambda do |error|
reset_callbacks.call if reset_callbacks
deferrable.fail error
end

reset_callbacks = lambda do
off(&in_sync_callback)
off(&failed_callback)
channel.off(&failed_callback)
off(&sync_complete_callback)
off(&sync_failed_callback)
channel.off(&sync_failed_callback)
end

unsafe_once(:in_sync, &in_sync_callback)
unsafe_once(:failed, &failed_callback)
unsafe_once(:sync_complete, &sync_complete_callback)
unsafe_once(:failed, &sync_failed_callback)

channel.unsafe_once(:detaching, :detached, :failed) do |error_reason|
failed_callback.call error_reason
sync_failed_callback.call error_reason
end
end

Expand All @@ -156,7 +146,7 @@ def each(&block)
# and thus the responsibility of this library to re-enter on the channel automatically if the
# channel loses continuity
#
# @return [Array<PresenceMessage>]
# @return [Hash<String, PresenceMessage>]
# @api private
def local_members
@local_members
Expand Down Expand Up @@ -213,23 +203,14 @@ def setup_event_handlers
update_members_and_emit_events presence_message
end

# RTP5a
channel.unsafe_on(:failed, :detached) do
reset_members
reset_local_members
end

resume_sync_proc = method(:resume_sync).to_proc

unsafe_on(:sync_starting) do
@sync_session_id += 1

channel.unsafe_once(:attached) do
connection.on_resume(&resume_sync_proc)
end

unsafe_once(:in_sync, :failed) do
connection.off_resume(&resume_sync_proc)
end
end

unsafe_on(:sync_none) do
Expand All @@ -240,63 +221,34 @@ def setup_event_handlers

unsafe_on(:finalizing_sync) do
clean_up_absent_members
clean_up_members_not_present_in_sync
change_state :in_sync
end

unsafe_on(:in_sync) do
update_local_member_state
end
end

# Listen for events that change the PresenceMap state and thus
# need to be replicated to the local member set
def update_local_member_state
new_local_members = members.select do |member_key, member|
member.fetch(:message).connection_id == connection.id
end.each_with_object({}) do |(member_key, member), hash_object|
hash_object[member_key] = member.fetch(:message)
end

@local_members.reject do |member_key, message|
new_local_members.keys.include?(member_key)
end.each do |member_key, message|
re_enter_local_member_missing_from_presence_map message
clean_up_members_not_present_after_sync
change_state :sync_complete
end

@local_members = new_local_members
end

def re_enter_local_member_missing_from_presence_map(presence_message)
local_client_id = presence_message.client_id || client.auth.client_id
logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" }
presence.enter_client(local_client_id, presence_message.data).tap do |deferrable|
deferrable.errback do |error|
presence_message_client_id = presence_message.client_id || client.auth.client_id
re_enter_error = Ably::Models::ErrorInfo.new(
message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
)
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: re_enter_error,
resumed: true
)
def enter_local_members
local_members.values.each do |member|
local_client_id = member.client_id || client.auth.client_id
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{member.data}" }
presence.enter_client_with_id(member.id, local_client_id, member.data).tap do |deferrable|
deferrable.errback do |error|
presence_message_client_id = member.client_id || client.auth.client_id
re_enter_error = Ably::Models::ErrorInfo.new(
message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
)
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: re_enter_error,
resumed: true
)
end
end
end
end

# Trigger a manual SYNC operation to resume member synchronisation from last known cursor position
def resume_sync
connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i,
channel: channel.name,
channel_serial: sync_serial
) if channel.attached?
end

def update_members_and_emit_events(presence_message)
return unless ensure_presence_message_is_valid(presence_message)

Expand Down Expand Up @@ -375,7 +327,7 @@ def add_presence_member(presence_message)
def remove_presence_member(presence_message)
logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" }

if in_sync?
if sync_complete?
member_set_delete presence_message
else
member_set_upsert presence_message, false
Expand All @@ -394,17 +346,16 @@ def touch_presence_member(presence_message)
def member_set_upsert(presence_message, present)
members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
if presence_message.connection_id == connection.id
local_members[presence_message.member_key] = presence_message
logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" }
local_members[presence_message.client_id] = presence_message
logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" }
end
end

def member_set_delete(presence_message)
members.delete presence_message.member_key
if in_sync?
# If not in SYNC, then local members missing may need to be re-entered
# Let #update_local_member_state handle missing members
local_members.delete presence_message.member_key
if sync_complete? and presence_message.connection_id == connection.id
local_members.delete presence_message.client_id
logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" }
end
end

Expand All @@ -431,7 +382,7 @@ def clean_up_absent_members
end
end

def clean_up_members_not_present_in_sync
def clean_up_members_not_present_after_sync
members.select do |member_key, member|
member.fetch(:sync_session_id) != sync_session_id
end.each do |member_key, member|
Expand Down
28 changes: 11 additions & 17 deletions lib/ably/realtime/presence/presence_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ def initialize(presence)
setup_channel_event_handlers
end

# Expect SYNC ProtocolMessages from the server with a list of current members on this channel
#
# @return [void]
#
# @api private
def sync_expected
presence.members.change_state :sync_starting
def on_attach(has_presence_flag)
if has_presence_flag
# Expect SYNC ProtocolMessages from the server with a list of current members on this channel
presence.members.change_state :sync_starting
else
# There server has indicated that there are no SYNC ProtocolMessages to come because
# there are no members on this channel
logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" }
presence.members.change_state :sync_none
end
presence.members.send(:enter_local_members)
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
end

# Process presence messages from SYNC messages. Sync can be server-initiated or triggered following ATTACH
Expand All @@ -47,17 +52,6 @@ def sync_process_messages(serial, presence_messages)
presence.members.change_state :finalizing_sync if presence.members.sync_serial_cursor_at_end?
end

# There server has indicated that there are no SYNC ProtocolMessages to come because
# there are no members on this channel
#
# @return [void]
#
# @api private
def sync_not_expected
logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" }
presence.members.change_state :sync_none
end

private
def_delegators :presence, :members, :channel

Expand Down
Loading
Loading