Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4686][Protocol-2] Implement internal presence #410

Merged
merged 7 commits into from
Jul 3, 2024
19 changes: 5 additions & 14 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def detach(error, previous_state)
# Channel is attached, notify presence if sync is expected
def attached(attached_protocol_message)
# If no attached ProtocolMessage then this attached request was triggered by the client
# library, such as returning to attached whne detach has failed
# library, such as returning to attached when detach has failed
if attached_protocol_message
update_presence_sync_state_following_attached attached_protocol_message
channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
channel.options.set_modes_from_flags(attached_protocol_message.flags)
channel.options.set_params(attached_protocol_message.params)
Expand All @@ -60,6 +60,7 @@ def request_reattach(options = {})
end

def duplicate_attached_received(protocol_message)
logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
if protocol_message.error
channel.set_channel_error_reason protocol_message.error
log_channel_error protocol_message.error
Expand All @@ -68,17 +69,15 @@ def duplicate_attached_received(protocol_message)
channel.properties.set_attach_serial(protocol_message.channel_serial)
channel.options.set_modes_from_flags(protocol_message.flags)

if protocol_message.has_channel_resumed_flag?
logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
else
unless protocol_message.has_channel_resumed_flag?
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: protocol_message.error,
resumed: false,
)
update_presence_sync_state_following_attached protocol_message
channel.presence.manager.on_attach protocol_message.has_presence_flag?
end
end

Expand Down Expand Up @@ -254,14 +253,6 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio
)
end

def update_presence_sync_state_following_attached(attached_protocol_message)
if attached_protocol_message.has_presence_flag?
channel.presence.manager.sync_expected
else
channel.presence.manager.sync_not_expected
end
end

def logger
connection.logger
end
Expand Down
22 changes: 0 additions & 22 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -532,24 +532,6 @@ def set_connection_details(connection_details)
@details = connection_details
end

# Executes registered callbacks for a successful connection resume event
# @api private
def trigger_resumed
resume_callbacks.each(&:call)
end

# Provides a simple hook to inject a callback when a connection is successfully resumed
# @api private
def on_resume(&callback)
resume_callbacks << callback
end

# Remove a registered connection resume callback
# @api private
def off_resume(&callback)
resume_callbacks.delete(callback)
end

# Returns false if messages cannot be published as a result of message queueing being disabled
# @api private
def can_publish_messages?
Expand Down Expand Up @@ -620,10 +602,6 @@ def client_msg_serial
@client_msg_serial
end

def resume_callbacks
@resume_callbacks ||= []
end

def create_pub_sub_message_bus
Ably::Util::PubSub.new(
coerce_into: lambda do |event|
Expand Down
1 change: 0 additions & 1 deletion lib/ably/realtime/connection/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def connected(protocol_message)
if connection.key
if protocol_message.connection_id == connection.id
logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
EventMachine.next_tick { connection.trigger_resumed }
resend_pending_message_ack_queue
else
nack_messages_on_all_channels protocol_message.error
Expand Down
21 changes: 15 additions & 6 deletions lib/ably/realtime/presence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ def enter_client(client_id, data = nil, &success_block)
send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, &success_block)
end

# @api private
def enter_client_with_id(id, client_id, data = nil, &success_block)
ensure_supported_client_id client_id
ensure_supported_payload data

send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, id, &success_block)
end

# Leave this client from this channel. This client will be removed from the presence
# set and presence subscribers will see a leave message for this client.
#
Expand Down Expand Up @@ -338,8 +346,8 @@ def sync_complete?

private
# @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added
def send_presence_protocol_message(presence_action, client_id, data)
presence_message = create_presence_message(presence_action, client_id, data)
def send_presence_protocol_message(presence_action, client_id, data, id = nil)
presence_message = create_presence_message(presence_action, client_id, data, id)
unless presence_message.client_id
raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, Ably::Exceptions::Codes::UNABLE_TO_ENTER_PRESENCE_CHANNEL_NO_CLIENTID)
end
Expand All @@ -355,12 +363,13 @@ def send_presence_protocol_message(presence_action, client_id, data)
presence_message
end

def create_presence_message(action, client_id, data)
def create_presence_message(action, client_id, data, id = nil)
model = {
action: Ably::Models::PresenceMessage.ACTION(action).to_i,
clientId: client_id,
data: data
data: data,
}
model[:id] = id unless id.nil?

Ably::Models::PresenceMessage.new(model, logger: logger).tap do |presence_message|
presence_message.encode(client.encoders, channel.options) do |encode_error, error_message|
Expand Down Expand Up @@ -433,13 +442,13 @@ def deferrable_fail(deferrable, *args, &block)
deferrable
end

def send_presence_action_for_client(action, client_id, data, &success_block)
def send_presence_action_for_client(action, client_id, data, id = nil, &success_block)
requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable
return requirements_failed_deferrable if requirements_failed_deferrable

deferrable = create_deferrable
ensure_channel_attached(deferrable) do
send_presence_protocol_message(action, client_id, data).tap do |protocol_message|
send_presence_protocol_message(action, client_id, data, id).tap do |protocol_message|
protocol_message.callback { |message| deferrable_succeed deferrable, &success_block }
protocol_message.errback { |error| deferrable_fail deferrable, error }
end
Expand Down
135 changes: 43 additions & 92 deletions lib/ably/realtime/presence/members_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MembersMap
:sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably
:sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel
:finalizing_sync,
:in_sync,
:sync_complete, # Indicates completion of server initiated sync
:failed
)
include Ably::Modules::StateEmitter
Expand All @@ -49,16 +49,6 @@ def initialize(presence)
setup_event_handlers
end

# When attaching to a channel that has members present, the server
# initiates a sync automatically so that the client has a complete list of members.
#
# Until this sync is complete, this method returns false
#
# @return [Boolean]
def sync_complete?
in_sync?
end

# Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed.
# If the serial is nil, or the part after the first : is empty, then the SYNC is complete
#
Expand Down Expand Up @@ -110,27 +100,27 @@ def get(options = {}, &block)
# Must be defined before subsequent procs reference this callback
reset_callbacks = nil

in_sync_callback = lambda do
sync_complete_callback = lambda do
reset_callbacks.call if reset_callbacks
result_block.call
end

failed_callback = lambda do |error|
sync_failed_callback = lambda do |error|
reset_callbacks.call if reset_callbacks
deferrable.fail error
end

reset_callbacks = lambda do
off(&in_sync_callback)
off(&failed_callback)
channel.off(&failed_callback)
off(&sync_complete_callback)
off(&sync_failed_callback)
channel.off(&sync_failed_callback)
end

unsafe_once(:in_sync, &in_sync_callback)
unsafe_once(:failed, &failed_callback)
unsafe_once(:sync_complete, &sync_complete_callback)
unsafe_once(:failed, &sync_failed_callback)

channel.unsafe_once(:detaching, :detached, :failed) do |error_reason|
failed_callback.call error_reason
sync_failed_callback.call error_reason
end
end

Expand All @@ -156,12 +146,35 @@ def each(&block)
# and thus the responsibility of this library to re-enter on the channel automatically if the
# channel loses continuity
#
# @return [Array<PresenceMessage>]
# @return [Hash<String, PresenceMessage>]
# @api private
def local_members
@local_members
end

def enter_local_members
local_members.values.each do |member|
local_client_id = member.client_id || client.auth.client_id
logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{member.data}" }
presence.enter_client_with_id(member.id, local_client_id, member.data).tap do |deferrable|
deferrable.errback do |error|
presence_message_client_id = member.client_id || client.auth.client_id
re_enter_error = Ably::Models::ErrorInfo.new(
message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
)
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: re_enter_error,
resumed: true
)
end
end
end
end

private
attr_reader :sync_session_id

Expand Down Expand Up @@ -213,23 +226,14 @@ def setup_event_handlers
update_members_and_emit_events presence_message
end

# RTP5a
channel.unsafe_on(:failed, :detached) do
reset_members
reset_local_members
end

resume_sync_proc = method(:resume_sync).to_proc

unsafe_on(:sync_starting) do
@sync_session_id += 1

channel.unsafe_once(:attached) do
connection.on_resume(&resume_sync_proc)
end

unsafe_once(:in_sync, :failed) do
connection.off_resume(&resume_sync_proc)
end
end

unsafe_on(:sync_none) do
Expand All @@ -240,61 +244,9 @@ def setup_event_handlers

unsafe_on(:finalizing_sync) do
clean_up_absent_members
clean_up_members_not_present_in_sync
change_state :in_sync
clean_up_members_not_present_after_sync
change_state :sync_complete
end

unsafe_on(:in_sync) do
update_local_member_state
end
end

# Listen for events that change the PresenceMap state and thus
# need to be replicated to the local member set
def update_local_member_state
new_local_members = members.select do |member_key, member|
member.fetch(:message).connection_id == connection.id
end.each_with_object({}) do |(member_key, member), hash_object|
hash_object[member_key] = member.fetch(:message)
end

@local_members.reject do |member_key, message|
new_local_members.keys.include?(member_key)
end.each do |member_key, message|
re_enter_local_member_missing_from_presence_map message
end

@local_members = new_local_members
end

def re_enter_local_member_missing_from_presence_map(presence_message)
local_client_id = presence_message.client_id || client.auth.client_id
logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" }
presence.enter_client(local_client_id, presence_message.data).tap do |deferrable|
deferrable.errback do |error|
presence_message_client_id = presence_message.client_id || client.auth.client_id
re_enter_error = Ably::Models::ErrorInfo.new(
message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'",
code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL
)
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: re_enter_error,
resumed: true
)
end
end
end

# Trigger a manual SYNC operation to resume member synchronisation from last known cursor position
def resume_sync
connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i,
channel: channel.name,
channel_serial: sync_serial
) if channel.attached?
end

def update_members_and_emit_events(presence_message)
Expand Down Expand Up @@ -375,7 +327,7 @@ def add_presence_member(presence_message)
def remove_presence_member(presence_message)
logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" }

if in_sync?
if sync_complete?
member_set_delete presence_message
else
member_set_upsert presence_message, false
Expand All @@ -394,17 +346,16 @@ def touch_presence_member(presence_message)
def member_set_upsert(presence_message, present)
members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id }
if presence_message.connection_id == connection.id
local_members[presence_message.member_key] = presence_message
logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" }
local_members[presence_message.client_id] = presence_message
logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" }
end
end

def member_set_delete(presence_message)
members.delete presence_message.member_key
if in_sync?
# If not in SYNC, then local members missing may need to be re-entered
# Let #update_local_member_state handle missing members
local_members.delete presence_message.member_key
if sync_complete? and presence_message.connection_id == connection.id
local_members.delete presence_message.client_id
logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" }
end
end

Expand All @@ -431,7 +382,7 @@ def clean_up_absent_members
end
end

def clean_up_members_not_present_in_sync
def clean_up_members_not_present_after_sync
members.select do |member_key, member|
member.fetch(:sync_session_id) != sync_session_id
end.each do |member_key, member|
Expand Down
Loading
Loading