Skip to content

Commit

Permalink
Implemented create_recovery_key method on connection. Implemented
Browse files Browse the repository at this point in the history
1. Retrieving value using create_recovery_key and setting msgserial, channel
serials and recover key on connection
2. Handled resume/recover success/failure in connection manager.
3. Deprecated old recovery_key method on connection
  • Loading branch information
sacOO7 committed Jun 6, 2024
1 parent a0114b9 commit 4ba3917
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 33 deletions.
20 changes: 20 additions & 0 deletions lib/ably/realtime/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ def release(channel)
@channels.delete(channel)
end if @channels.has_key?(channel)
end

# Sets channel serial to each channel from given serials hashmap
# @param [Hash] serials - map of channel name to respective channel serial
# @api private
def set_channel_serials(serials)
serials.each do |channel_name, channel_serial|
channels[channel_name].properties.channel_serial = channel_serial
end
end

# @return [Hash] serials - map of channel name to respective channel serial
# @api private
def get_channel_serials
channel_serials = {}
self.each do |channel|
channel_serials[channel.name] = channel.properties.channel_serial if channel.state == :attached
end
channel_serials
end

end
end
end
16 changes: 12 additions & 4 deletions lib/ably/realtime/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,23 @@ def initialize(options)
acc[key.to_s] = value.to_s
end
@rest_client = Ably::Rest::Client.new(options.merge(realtime_client: self))
@echo_messages = rest_client.options.fetch(:echo_messages, true) == false ? false : true
@queue_messages = rest_client.options.fetch(:queue_messages, true) == false ? false : true
@echo_messages = rest_client.options.fetch(:echo_messages, true)
@queue_messages = rest_client.options.fetch(:queue_messages, true)
@custom_realtime_host = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
@auto_connect = rest_client.options.fetch(:auto_connect, true) == false ? false : true
@recover = rest_client.options[:recover]
@auto_connect = rest_client.options.fetch(:auto_connect, true)
@recover = rest_client.options.fetch(:recover, '')

@auth = Ably::Realtime::Auth.new(self)
@channels = Ably::Realtime::Channels.new(self)
@connection = Ably::Realtime::Connection.new(self, options)

unless @recover.empty?
recovery_context = RecoveryKeyContext.from_json(@recover, logger)
unless recovery_context.nil?
@channels.set_channel_serials recovery_context.channel_serials # RTN16j
@connection.set_msg_serial_from_recover = recovery_context.msg_serial # RTN16f
end
end
end

# Return a {Ably::Realtime::Channel Realtime Channel} for the given name
Expand Down
51 changes: 39 additions & 12 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ def initialize(client, options)
@manager = ConnectionManager.new(self)

@current_host = client.endpoint.host

reset_client_msg_serial
end

# Causes the connection to close, entering the {Ably::Realtime::Connection::STATE} CLOSING state.
Expand Down Expand Up @@ -323,18 +325,35 @@ def internet_up?
end
end

# The recovery key string can be used by another client to recover this connection's state in the recover client options property. See connection state recover options for more information.
# The recovery key string can be used by another client to recover this connection's state in the
# recover client options property. See connection state recover options for more information.
#
# @spec RTN16b, RTN16c
#
# @return [String]
# @deprecated Use {#create_recovery_key} instead
#
def recovery_key
"will be implemented"
logger.warn "[DEPRECATION] recovery_key is deprecated, use create_recovery_key method instead"
create_recovery_key
end

# Following a new connection being made, when connection key is sent
# connection id need to match with the details provided by the server.
# The recovery key string can be used by another client to recover this connection's state in the recover client
# options property. See connection state recover options for more information.
#
# @spec RTN16g, RTN16c
#
# @return [String] a json string which incorporates the @connectionKey@, the current @msgSerial@ and collection
# of pairs of channel @name@ and current @channelSerial@ for every currently attached channel
def create_recovery_key
if key.nil? || key.empty? || state == :closing || state == :closed || state == :failed || state == :suspended
return "" #RTN16g2
end
Ably::Modules::RecoveryKeyContext.to_json(key, message_serial, client.channels.get_channel_serials)
end

# Following a new connection being made, the connection ID, connection key
# need to match the details provided by the server.
#
# @return [void]
# @api private
Expand Down Expand Up @@ -439,7 +458,7 @@ def create_websocket_transport
url_params = auth_params.merge(
'format' => client.protocol,
'echo' => client.echo_messages,
'v' => Ably::PROTOCOL_VERSION,
'v' => Ably::PROTOCOL_VERSION, # RSC7a
'agent' => client.rest_client.agent
)

Expand All @@ -453,13 +472,15 @@ def create_websocket_transport
url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
url_params.merge!(client.transport_params)

if connection_resumable?
puts "this is will be implemented as per spec"
elsif connection_recoverable?
puts "this is will be implemented as per spec"
logger.debug { "Recovering connection with key #{client.recover}" }
unsafe_once(:connected, :closed, :failed) do
client.disable_automatic_connection_recovery
if not Ably::Util::String.is_null_or_empty(key)
url_params.merge! resume: key
logger.debug { "Resuming connection with key #{key}" }
elsif not Ably::Util::String.is_null_or_empty(client.recover)
recovery_context = RecoveryKeyContext.from_json(client.recover, logger)
unless recovery_context.nil?
key = recovery_context.connection_key
logger.debug { "Recovering connection with key #{key}" }
url_params.merge! recover: key
end
end

Expand Down Expand Up @@ -566,6 +587,12 @@ def reset_client_msg_serial
@client_msg_serial = -1
end

# Sets the client message serial from recover clientOption.
# @api private
def set_msg_serial_from_recover=(value)
@client_msg_serial = value
end

# When a hearbeat or any other message from Ably is received
# we know it's alive, see #RTN23
# @api private
Expand Down
34 changes: 17 additions & 17 deletions lib/ably/realtime/connection/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def initialize(connection)
@connection = connection
@timers = Hash.new { |hash, key| hash[key] = [] }

connection.unsafe_on(:closed) do
# RTN8c, RTN9c
connection.unsafe_on(:closing, :closed, :suspended, :failed) do
connection.reset_resume_info
end

Expand Down Expand Up @@ -111,23 +112,30 @@ def connected(protocol_message)
# Update the connection details and any associated defaults
connection.set_connection_details protocol_message.connection_details

is_connection_resume_or_recover_attempt = !Ably::Util::String.is_null_or_empty(connection.key) ||
!Ably::Util::String.is_null_or_empty(client.recover)

# RTN15c7, RTN16d
failed_resume_or_recover = !protocol_message.connection_id == connection.id && !protocol_message.error.nil?
if is_connection_resume_or_recover_attempt and failed_resume_or_recover # RTN15c7
connection.reset_client_msg_serial
end
client.disable_automatic_connection_recovery # RTN16k, explicitly setting null, so it won't be used for subsequent connection requests

if connection.key
if protocol_message.connection_id == connection.id
logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
EventMachine.next_tick { connection.trigger_resumed }
resend_pending_message_ack_queue
else
logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" }
nack_messages_on_all_channels protocol_message.error
force_reattach_on_channels protocol_message.error
end
else
logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" }
end

reattach_suspended_channels protocol_message.error

connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key
force_reattach_on_channels protocol_message.error # irrespective of connection success/failure, reattach channels
end

# When connection is CONNECTED and receives an update
Expand Down Expand Up @@ -568,20 +576,12 @@ def currently_renewing_token?
client.auth.authorization_in_flight?
end

def reattach_suspended_channels(error)
channels.select do |channel|
channel.suspended?
end.each do |channel|
channel.transition_state_machine :attaching
end
end

# When continuity on a connection is lost all messages
# Channels in the ATTACHED or ATTACHING state should explicitly be re-attached
# by sending a new ATTACH to Ably
# When reconnected if channel is in ATTACHING, ATTACHED or SUSPENDED state
# it should explicitly be re-attached by sending a new ATTACH to Ably
# Spec : RTN15c6, RTN15c7
def force_reattach_on_channels(error)
channels.select do |channel|
channel.attached? || channel.attaching?
channel.attached? || channel.attaching? || channel.suspended?
end.each do |channel|
channel.manager.request_reattach reason: error
end
Expand Down
9 changes: 9 additions & 0 deletions lib/ably/util/string.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Ably::Util
module String
def self.is_null_or_empty(str)
str.nil? || str.empty?
end
end
end

0 comments on commit 4ba3917

Please sign in to comment.