Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4687]Feature/resume recover #402

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1d0f959
Added a separate class for recoveryKeyContext
sacOO7 May 3, 2024
07c053f
Added test for recovery key context
sacOO7 May 3, 2024
2bdf984
Added methods to encode and decode recovery key context
sacOO7 May 7, 2024
631b1f8
Updated recovery key context tests for decoding recovery key
sacOO7 May 7, 2024
5d82e87
Refactored lib version usage across sdk
sacOO7 May 10, 2024
ab10caa
Added channel_serial attribute to channel properties
sacOO7 May 10, 2024
ffda7c6
Added method to set channel serial to given channel
sacOO7 May 10, 2024
ed6e32b
Added a method to get channel serial
sacOO7 May 10, 2024
34bfde6
bumped up protocol version to 2
sacOO7 May 13, 2024
c729a5a
refactored and renamed provided clientOptions as a part of client
sacOO7 May 15, 2024
373e723
refactored code for recovery key encode / decode
sacOO7 May 15, 2024
ec41c0d
refactored code for retrieving recovery key
sacOO7 May 16, 2024
2883118
Merge branch 'fix/tests' into feature/resume-recover
sacOO7 May 16, 2024
718a0c7
refactored comments on the helper methods
sacOO7 May 16, 2024
d654665
Added test for invalid recovery key
sacOO7 May 16, 2024
148ba15
removed all unnecessary references related to connection serial
sacOO7 May 16, 2024
7df5ef5
Added separate method for creating a recovery key
sacOO7 May 16, 2024
0ad4f8c
Added deprecation warning for recovery key, updated method for
sacOO7 May 16, 2024
aae4a71
updated tests and recovery key related code base
sacOO7 May 17, 2024
ad66ec6
Supplied logger as a param while decoding recovery key
sacOO7 May 17, 2024
997256c
Setting connection resume/recover based on message received
sacOO7 May 17, 2024
cd96769
refactored string check for either nil or empty string
sacOO7 May 20, 2024
13fee33
Setting channel serial as per changed state to detached, failed or su…
sacOO7 May 21, 2024
ce45cbc
Added documentation for channel serial as a part of channel properties
sacOO7 May 21, 2024
37ed012
Setting channel serial from received protocolmessage
sacOO7 May 21, 2024
d36aafb
Updated implementation for has_message_serial
sacOO7 May 21, 2024
e8e7ec7
Added spec comment and logging message while setting channel serial
sacOO7 May 21, 2024
fceb558
Added code to set channel serial as per spec
sacOO7 May 22, 2024
fffad06
refactored code for rest clientOptions
sacOO7 May 22, 2024
79de8ed
update method to retrive channel serial of only attached channels
sacOO7 May 31, 2024
3cb643f
Added right spec annotations to implemented code, fixed protocol vers…
sacOO7 May 31, 2024
1b67120
Added channel serial specific changes marked with corresponding spec ids
sacOO7 Jun 1, 2024
51661d0
updated tests for idempotent rest publishing
sacOO7 Jun 1, 2024
599b840
Updated rest test for checking agent headers
sacOO7 Jun 1, 2024
9b25716
Reset connection key and id when it closes or fails (RTN8c and RTN9c)
sacOO7 Jun 3, 2024
e4b51b7
implemented reattach on channels irrespective of resume success or fa…
sacOO7 Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ably.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require 'ably/version'

Gem::Specification.new do |spec|
spec.name = 'ably'
spec.version = Ably::VERSION
spec.version = Ably::LIB_VERSION
spec.authors = ['Lewis Marshall', "Matthew O'Riordan"]
spec.email = ['[email protected]', '[email protected]']
spec.description = %q{A Ruby client library for ably.io realtime messaging}
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/agent.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Ably
AGENT = "ably-ruby/#{Ably::VERSION} ruby/#{RUBY_VERSION}"
AGENT = "ably-ruby/#{Ably::LIB_VERSION} ruby/#{RUBY_VERSION}"
end
31 changes: 3 additions & 28 deletions lib/ably/models/protocol_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ module Ably::Models
# @return [String] Contains a serial number for a message on the current channel
# @!attribute [r] connection_id
# @return [String] Contains a string private connection key used to recover this connection
# @!attribute [r] connection_serial
# @return [Bignum] Contains a serial number for a message sent from the server to the client
# @!attribute [r] message_serial
# @return [Bignum] Contains a serial number for a message sent from the client to the server
# @!attribute [r] timestamp
Expand Down Expand Up @@ -129,41 +127,18 @@ def message_serial
raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage"
end

def connection_serial
Integer(attributes[:connection_serial])
rescue TypeError
raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage"
end

def count
[1, attributes[:count].to_i].max
end

# @api private
def has_message_serial?
message_serial && true
rescue TypeError
false
end

# @api private
def has_connection_serial?
connection_serial && true
rescue TypeError
false
end

def serial
if has_connection_serial?
connection_serial
else
message_serial
end
not Ably::Util::String::is_null_or_empty(message_serial)
end

# @api private
def has_serial?
has_connection_serial? || has_message_serial?
has_message_serial?
end

def messages
Expand Down Expand Up @@ -271,7 +246,7 @@ def attributes
# Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys
def as_json(*args)
raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action
raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial?
raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial?

attributes.dup.tap do |hash_object|
hash_object['action'] = action.to_i
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/modules/http_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def encode64(text)
end

def user_agent
"Ably Ruby client #{Ably::VERSION} (https://www.ably.io)"
"Ably Ruby client #{Ably::LIB_VERSION} (https://www.ably.io)"
end

def setup_outgoing_middleware(builder)
Expand Down
4 changes: 2 additions & 2 deletions lib/ably/modules/safe_deferrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def errback(&block)
end
end

# Mark the Deferrable as succeeded and trigger all callbacks.
# Mark the Deferrable as succeeded and trigger all success callbacks.
# See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#succeed-instance_method
#
# @return [void]
Expand All @@ -48,7 +48,7 @@ def succeed(*args)
super(*args)
end

# Mark the Deferrable as failed and trigger all callbacks.
# Mark the Deferrable as failed and trigger all error callbacks.
# See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#fail-instance_method
#
# @return [void]
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/modules/state_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Ably::Modules
# the instance variable @state is used exclusively, the {Enum} STATE is defined prior to inclusion of this
# module, and the class is an {EventEmitter}. It then emits state changes.
#
# It also ensures the EventEmitter is configured to retrict permitted events to the
# It also ensures the EventEmitter is configured to restrict permitted events to the
# the available STATEs or EVENTs if defined i.e. if EVENTs includes an additional type such as
# :update, then it will support all EVENTs being emitted. EVENTs must be a superset of STATEs
#
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Channel
#
# @spec RTL2b
#
# The permited states for this channel
# The permitted states for this channel
STATE = ruby_enum('STATE',
:initialized,
:attaching,
Expand Down
1 change: 1 addition & 0 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def send_attach_protocol_message
message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume]
end

message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
end

Expand Down
9 changes: 9 additions & 0 deletions lib/ably/realtime/channel/channel_properties.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ class ChannelProperties
#
attr_reader :attach_serial

# ChannelSerial contains the channelSerial from latest ProtocolMessage of action type
# Message/PresenceMessage received on the channel.
#
# @spec CP2b, RTL15b
#
# @return [String]
#
attr_accessor :channel_serial

def initialize(channel)
@channel = channel
end
Expand Down
1 change: 1 addition & 0 deletions lib/ably/realtime/channel/channel_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ChannelStateMachine
end

after_transition(to: [:detached, :failed, :suspended]) do |channel, current_transition|
channel.properties.channel_serial = nil # RTP5a1
err = error_from_state_change(current_transition)
channel.manager.fail_queued_messages(err) if channel.failed? or channel.suspended? #RTL11
channel.manager.log_channel_error err if err
Expand Down
16 changes: 16 additions & 0 deletions lib/ably/realtime/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ def release(channel)
@channels.delete(channel)
end if @channels.has_key?(channel)
end

# @param [Hash] serials - map of channel name to respective serial
def set_channel_serials(serials)
serials.each do |channel_name, channel_serial|
channels[channel_name].properties.channel_serial = channel_serial
end
end

def get_channel_serials
channel_serials = {}
self.each do |channel|
channel_serials[channel.name] = channel.properties.channel_serial if channel.state == :attached
end
channel_serials
end

end
end
end
27 changes: 13 additions & 14 deletions lib/ably/realtime/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Client

# When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery
# @return [String,Nil]
attr_reader :recover
attr_accessor :recover
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated at #409


# Additional parameters to be sent in the querystring when initiating a realtime connection
# @return [Hash]
Expand Down Expand Up @@ -120,17 +120,23 @@ def initialize(options)
acc[key.to_s] = value.to_s
end
@rest_client = Ably::Rest::Client.new(options.merge(realtime_client: self))
@echo_messages = rest_client.options.fetch(:echo_messages, true) == false ? false : true
@queue_messages = rest_client.options.fetch(:queue_messages, true) == false ? false : true
@echo_messages = rest_client.options.fetch(:echo_messages, true)
@queue_messages = rest_client.options.fetch(:queue_messages, true)
@custom_realtime_host = rest_client.options[:realtime_host] || rest_client.options[:ws_host]
@auto_connect = rest_client.options.fetch(:auto_connect, true) == false ? false : true
@recover = rest_client.options[:recover]

raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX)
@auto_connect = rest_client.options.fetch(:auto_connect, true)
@recover = rest_client.options.fetch(:recover, '')

@auth = Ably::Realtime::Auth.new(self)
@channels = Ably::Realtime::Channels.new(self)
@connection = Ably::Realtime::Connection.new(self, options)

unless @recover.empty?
recovery_context = RecoveryKeyContext.from_json(@recover, logger)
unless recovery_context.nil?
@channels.set_channel_serials recovery_context.channel_serials # RTN16j
@connection.message_serial = recovery_context.msg_serial # RTN16f
end
end
end

# Return a {Ably::Realtime::Channel Realtime Channel} for the given name
Expand Down Expand Up @@ -302,13 +308,6 @@ def logger
@logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
end

# Disable connection recovery, typically used after a connection has been recovered
# @return [void]
# @api private
def disable_automatic_connection_recovery
@recover = nil
end

# @!attribute [r] fallback_endpoint
# @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
# @api private
Expand Down
29 changes: 16 additions & 13 deletions lib/ably/realtime/client/incoming_message_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ def logger

def dispatch_protocol_message(*args)
protocol_message = args.first
# RTL15b
unless protocol_message.nil?
if protocol_message.has_message_serial? &&
(
protocol_message.action == :message ||
protocol_message.action == :presence ||
protocol_message.action == :attached
)

logger.info "Setting channel serial for #{channel.name}"
logger.info "Previous serial #{channel.name}, new serial #{protocol_message.channel_serial}"
get_channel(protocol_message.channel).tap do |channel|
channel.properties.channel_serial = protocol_message.channel_serial
end
end
end

unless protocol_message.kind_of?(Ably::Models::ProtocolMessage)
raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}"
Expand All @@ -47,15 +63,6 @@ def dispatch_protocol_message(*args)
logger.debug { "#{protocol_message.action} received: #{protocol_message}" }
end

if protocol_message.action.match_any?(:sync, :presence, :message)
if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial
error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}"
logger.error error_message
return
end
end

update_connection_recovery_info protocol_message
connection.set_connection_confirmed_alive

case protocol_message.action
Expand Down Expand Up @@ -172,10 +179,6 @@ def process_connected_update_message(protocol_message)
end
end

def update_connection_recovery_info(protocol_message)
connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial?
end

def ack_pending_queue_for_message_serial(ack_protocol_message)
drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message|
ack_messages protocol_message.messages
Expand Down
Loading
Loading