diff --git a/ably.gemspec b/ably.gemspec index d301cc808..9bbdc9435 100644 --- a/ably.gemspec +++ b/ably.gemspec @@ -5,7 +5,7 @@ require 'ably/version' Gem::Specification.new do |spec| spec.name = 'ably' - spec.version = Ably::VERSION + spec.version = Ably::LIB_VERSION spec.authors = ['Lewis Marshall', "Matthew O'Riordan"] spec.email = ['lewis@lmars.net', 'matt@ably.io'] spec.description = %q{A Ruby client library for ably.io realtime messaging} diff --git a/lib/ably/agent.rb b/lib/ably/agent.rb index 39ea39e09..750027835 100644 --- a/lib/ably/agent.rb +++ b/lib/ably/agent.rb @@ -1,3 +1,3 @@ module Ably - AGENT = "ably-ruby/#{Ably::VERSION} ruby/#{RUBY_VERSION}" + AGENT = "ably-ruby/#{Ably::LIB_VERSION} ruby/#{RUBY_VERSION}" end diff --git a/lib/ably/models/protocol_message.rb b/lib/ably/models/protocol_message.rb index 8490ebc33..ab877e591 100644 --- a/lib/ably/models/protocol_message.rb +++ b/lib/ably/models/protocol_message.rb @@ -20,8 +20,6 @@ module Ably::Models # @return [String] Contains a serial number for a message on the current channel # @!attribute [r] connection_id # @return [String] Contains a string private connection key used to recover this connection - # @!attribute [r] connection_serial - # @return [Bignum] Contains a serial number for a message sent from the server to the client # @!attribute [r] message_serial # @return [Bignum] Contains a serial number for a message sent from the client to the server # @!attribute [r] timestamp @@ -129,41 +127,18 @@ def message_serial raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" end - def connection_serial - Integer(attributes[:connection_serial]) - rescue TypeError - raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" - end - def count [1, attributes[:count].to_i].max end # @api private def has_message_serial? - message_serial && true - rescue TypeError - false - end - - # @api private - def has_connection_serial? - connection_serial && true - rescue TypeError - false - end - - def serial - if has_connection_serial? - connection_serial - else - message_serial - end + not Ably::Util::String::is_null_or_empty(message_serial) end # @api private def has_serial? - has_connection_serial? || has_message_serial? + has_message_serial? end def messages @@ -271,7 +246,7 @@ def attributes # Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys def as_json(*args) raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action - raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial? + raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial? attributes.dup.tap do |hash_object| hash_object['action'] = action.to_i diff --git a/lib/ably/modules/http_helpers.rb b/lib/ably/modules/http_helpers.rb index 82a933b7b..6067d3440 100644 --- a/lib/ably/modules/http_helpers.rb +++ b/lib/ably/modules/http_helpers.rb @@ -18,7 +18,7 @@ def encode64(text) end def user_agent - "Ably Ruby client #{Ably::VERSION} (https://www.ably.io)" + "Ably Ruby client #{Ably::LIB_VERSION} (https://www.ably.io)" end def setup_outgoing_middleware(builder) diff --git a/lib/ably/modules/safe_deferrable.rb b/lib/ably/modules/safe_deferrable.rb index c011b4375..1849544db 100644 --- a/lib/ably/modules/safe_deferrable.rb +++ b/lib/ably/modules/safe_deferrable.rb @@ -39,7 +39,7 @@ def errback(&block) end end - # Mark the Deferrable as succeeded and trigger all callbacks. + # Mark the Deferrable as succeeded and trigger all success callbacks. # See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#succeed-instance_method # # @return [void] @@ -48,7 +48,7 @@ def succeed(*args) super(*args) end - # Mark the Deferrable as failed and trigger all callbacks. + # Mark the Deferrable as failed and trigger all error callbacks. # See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#fail-instance_method # # @return [void] diff --git a/lib/ably/modules/state_emitter.rb b/lib/ably/modules/state_emitter.rb index c638cb9a4..2a5f8b516 100644 --- a/lib/ably/modules/state_emitter.rb +++ b/lib/ably/modules/state_emitter.rb @@ -3,7 +3,7 @@ module Ably::Modules # the instance variable @state is used exclusively, the {Enum} STATE is defined prior to inclusion of this # module, and the class is an {EventEmitter}. It then emits state changes. # - # It also ensures the EventEmitter is configured to retrict permitted events to the + # It also ensures the EventEmitter is configured to restrict permitted events to the # the available STATEs or EVENTs if defined i.e. if EVENTs includes an additional type such as # :update, then it will support all EVENTs being emitted. EVENTs must be a superset of STATEs # diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index 5f6cba49c..65b0ddfc5 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -42,7 +42,7 @@ class Channel # # @spec RTL2b # - # The permited states for this channel + # The permitted states for this channel STATE = ruby_enum('STATE', :initialized, :attaching, diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 5b1761087..d10bc66ce 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -209,6 +209,7 @@ def send_attach_protocol_message message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume] end + message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1 send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options end diff --git a/lib/ably/realtime/channel/channel_properties.rb b/lib/ably/realtime/channel/channel_properties.rb index 26ddf2622..60f1911a8 100644 --- a/lib/ably/realtime/channel/channel_properties.rb +++ b/lib/ably/realtime/channel/channel_properties.rb @@ -18,6 +18,15 @@ class ChannelProperties # attr_reader :attach_serial + # ChannelSerial contains the channelSerial from latest ProtocolMessage of action type + # Message/PresenceMessage received on the channel. + # + # @spec CP2b, RTL15b + # + # @return [String] + # + attr_accessor :channel_serial + def initialize(channel) @channel = channel end diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index fb91deee1..a346fec28 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -55,6 +55,7 @@ class ChannelStateMachine end after_transition(to: [:detached, :failed, :suspended]) do |channel, current_transition| + channel.properties.channel_serial = nil # RTP5a1 err = error_from_state_change(current_transition) channel.manager.fail_queued_messages(err) if channel.failed? or channel.suspended? #RTL11 channel.manager.log_channel_error err if err diff --git a/lib/ably/realtime/channels.rb b/lib/ably/realtime/channels.rb index 9b814db12..c34d38815 100644 --- a/lib/ably/realtime/channels.rb +++ b/lib/ably/realtime/channels.rb @@ -46,6 +46,22 @@ def release(channel) @channels.delete(channel) end if @channels.has_key?(channel) end + + # @param [Hash] serials - map of channel name to respective serial + def set_channel_serials(serials) + serials.each do |channel_name, channel_serial| + channels[channel_name].properties.channel_serial = channel_serial + end + end + + def get_channel_serials + channel_serials = {} + self.each do |channel| + channel_serials[channel.name] = channel.properties.channel_serial if channel.state == :attached + end + channel_serials + end + end end end diff --git a/lib/ably/realtime/client.rb b/lib/ably/realtime/client.rb index bf04776c2..179353161 100644 --- a/lib/ably/realtime/client.rb +++ b/lib/ably/realtime/client.rb @@ -62,7 +62,7 @@ class Client # When a recover option is specified a connection inherits the state of a previous connection that may have existed under a different instance of the Realtime library, please refer to the API documentation for further information on connection state recovery # @return [String,Nil] - attr_reader :recover + attr_accessor :recover # Additional parameters to be sent in the querystring when initiating a realtime connection # @return [Hash] @@ -120,17 +120,23 @@ def initialize(options) acc[key.to_s] = value.to_s end @rest_client = Ably::Rest::Client.new(options.merge(realtime_client: self)) - @echo_messages = rest_client.options.fetch(:echo_messages, true) == false ? false : true - @queue_messages = rest_client.options.fetch(:queue_messages, true) == false ? false : true + @echo_messages = rest_client.options.fetch(:echo_messages, true) + @queue_messages = rest_client.options.fetch(:queue_messages, true) @custom_realtime_host = rest_client.options[:realtime_host] || rest_client.options[:ws_host] - @auto_connect = rest_client.options.fetch(:auto_connect, true) == false ? false : true - @recover = rest_client.options[:recover] - - raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX) + @auto_connect = rest_client.options.fetch(:auto_connect, true) + @recover = rest_client.options.fetch(:recover, '') @auth = Ably::Realtime::Auth.new(self) @channels = Ably::Realtime::Channels.new(self) @connection = Ably::Realtime::Connection.new(self, options) + + unless @recover.empty? + recovery_context = RecoveryKeyContext.from_json(@recover, logger) + unless recovery_context.nil? + @channels.set_channel_serials recovery_context.channel_serials # RTN16j + @connection.message_serial = recovery_context.msg_serial # RTN16f + end + end end # Return a {Ably::Realtime::Channel Realtime Channel} for the given name @@ -302,13 +308,6 @@ def logger @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger) end - # Disable connection recovery, typically used after a connection has been recovered - # @return [void] - # @api private - def disable_automatic_connection_recovery - @recover = nil - end - # @!attribute [r] fallback_endpoint # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used # @api private diff --git a/lib/ably/realtime/client/incoming_message_dispatcher.rb b/lib/ably/realtime/client/incoming_message_dispatcher.rb index 292a713b7..bbd603754 100644 --- a/lib/ably/realtime/client/incoming_message_dispatcher.rb +++ b/lib/ably/realtime/client/incoming_message_dispatcher.rb @@ -38,6 +38,22 @@ def logger def dispatch_protocol_message(*args) protocol_message = args.first + # RTL15b + unless protocol_message.nil? + if protocol_message.has_message_serial? && + ( + protocol_message.action == :message || + protocol_message.action == :presence || + protocol_message.action == :attached + ) + + logger.info "Setting channel serial for #{channel.name}" + logger.info "Previous serial #{channel.name}, new serial #{protocol_message.channel_serial}" + get_channel(protocol_message.channel).tap do |channel| + channel.properties.channel_serial = protocol_message.channel_serial + end + end + end unless protocol_message.kind_of?(Ably::Models::ProtocolMessage) raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}" @@ -47,15 +63,6 @@ def dispatch_protocol_message(*args) logger.debug { "#{protocol_message.action} received: #{protocol_message}" } end - if protocol_message.action.match_any?(:sync, :presence, :message) - if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial - error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}" - logger.error error_message - return - end - end - - update_connection_recovery_info protocol_message connection.set_connection_confirmed_alive case protocol_message.action @@ -172,10 +179,6 @@ def process_connected_update_message(protocol_message) end end - def update_connection_recovery_info(protocol_message) - connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial? - end - def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message| ack_messages protocol_message.messages diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index e07f33764..4f07ca48e 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -77,9 +77,6 @@ class Connection include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' - # Expected format for a connection recover key - RECOVER_REGEX = /^(?[^:]+):(?[^:]+):(?\-?\d+)$/ - # Defaults for automatic connection recovery and timeouts DEFAULTS = { channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED @@ -121,7 +118,7 @@ class Connection # # @return [Integer] # - attr_reader :serial + attr_reader :message_serial # An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs. # @@ -177,17 +174,6 @@ def initialize(client, options) end if options.kind_of?(Hash) @defaults.freeze - # If a recover client options is provided, then we need to ensure that the msgSerial matches the - # recover serial immediately at client library instantiation. This is done immediately so that any queued - # publishes use the correct serial number for these queued messages as well. - # There is no harm if the msgSerial is higher than expected if the recover fails. - recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i - if recovery_msg_serial - @client_msg_serial = recovery_msg_serial - else - reset_client_msg_serial - end - Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @@ -347,33 +333,41 @@ def internet_up? end end - # The recovery key string can be used by another client to recover this connection's state in the recover client options property. See connection state recover options for more information. + # The recovery key string can be used by another client to recover this connection's state in the + # recover client options property. See connection state recover options for more information. # # @spec RTN16b, RTN16c # # @return [String] + # @deprecated Use {#create_recovery_key} instead # def recovery_key - "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable? + logger.warn "[DEPRECATION] recovery_key is deprecated, use create_recovery_key method instead" + create_recovery_key + end + + # The recovery key string can be used by another client to recover this connection's state in the recover client + # options property. See connection state recover options for more information. + # + # @spec RTN16g, RTN16c + # + # @return [String] a json string which incorporates the @connectionKey@, the current @msgSerial@ and collection + # of pairs of channel @name@ and current @channelSerial@ for every currently attached channel + def create_recovery_key + if key.nil? || key.empty? || state == :closing || state == :closed || state == :failed || state == :suspended + return "" #RTN16g2 + end + Ably::Modules::RecoveryKeyContext.to_json(key, message_serial, client.channels.get_channel_serials) end # Following a new connection being made, the connection ID, connection key - # and connection serial need to match the details provided by the server. + # need to match the details provided by the server. # # @return [void] # @api private - def configure_new(connection_id, connection_key, connection_serial) + def configure_new(connection_id, connection_key) @id = connection_id @key = connection_key - - update_connection_serial connection_serial - end - - # Store last received connection serial so that the connection can be resumed from the last known point-in-time - # @return [void] - # @api private - def update_connection_serial(connection_serial) - @serial = connection_serial end # Disable automatic resume of a connection @@ -381,7 +375,7 @@ def update_connection_serial(connection_serial) # @api private def reset_resume_info @key = nil - @serial = nil + @id = nil end # @!attribute [r] __outgoing_protocol_msgbus__ @@ -472,7 +466,7 @@ def create_websocket_transport url_params = auth_params.merge( 'format' => client.protocol, 'echo' => client.echo_messages, - 'v' => Ably::PROTOCOL_VERSION, + 'v' => Ably::PROTOCOL_VERSION, # RSC7a 'agent' => client.rest_client.agent ) @@ -486,14 +480,15 @@ def create_websocket_transport url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? url_params.merge!(client.transport_params) - if connection_resumable? - url_params.merge! resume: key, connection_serial: serial - logger.debug { "Resuming connection key #{key} with serial #{serial}" } - elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] - logger.debug { "Recovering connection with key #{client.recover}" } - unsafe_once(:connected, :closed, :failed) do - client.disable_automatic_connection_recovery + if not Ably::Util::String.is_null_or_empty(key) + url_params.merge! resume: key + logger.debug { "Resuming connection with key #{key}" } + elsif not Ably::Util::String.is_null_or_empty(client.recover) + recovery_context = RecoveryKeyContext.from_json(client.recover, logger) + unless recovery_context.nil? + key = recovery_context.connection_key + logger.debug { "Recovering connection with key #{key}" } + url_params.merge! recover: key end end @@ -593,13 +588,6 @@ def heartbeat_interval defaults.fetch(:realtime_request_timeout) end - # Resets the client message serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage} - # (see #client_msg_serial) - # @api private - def reset_client_msg_serial - @client_msg_serial = -1 - end - # When a hearbeat or any other message from Ably is received # we know it's alive, see #RTN23 # @api private @@ -617,17 +605,18 @@ def time_since_connection_confirmed_alive? # #transition_state_machine must be used instead private :change_state + def message_serial=(serial) + @message_serial = serial + end + private # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK. - # Note that this is different to the connection serial that contains the last known serial number # received from the server. # # A message serial number does not guarantee a message has been received, only sent. - # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes. - # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent - def client_msg_serial - @client_msg_serial + def message_serial + @message_serial end def resume_callbacks @@ -652,12 +641,7 @@ def add_message_serial_if_ack_required_to(protocol_message) end def add_message_serial_to(protocol_message) - @client_msg_serial += 1 - protocol_message[:msgSerial] = client_msg_serial - yield - rescue StandardError => e - @client_msg_serial -= 1 - raise e + end # Simply wait until the next EventMachine tick to ensure Connection initialization is complete @@ -665,10 +649,6 @@ def when_initialized EventMachine.next_tick { yield } end - def connection_resumable? - !key.nil? && !serial.nil? && connection_state_available? - end - def connection_state_available? return true if connected? @@ -682,14 +662,6 @@ def connection_state_available? end end - def connection_recoverable? - connection_recover_parts - end - - def connection_recover_parts - client.recover.to_s.match(RECOVER_REGEX) - end - def production? client.environment.nil? || client.environment == :production end diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index 8d514b9f9..24fac1d38 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -19,7 +19,8 @@ def initialize(connection) @connection = connection @timers = Hash.new { |hash, key| hash[key] = [] } - connection.unsafe_on(:closed) do + # RTN8c, RTN9c + connection.unsafe_on(:closing, :closed, :suspended, :failed) do connection.reset_resume_info end @@ -111,23 +112,28 @@ def connected(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details + is_connection_resume_or_recover_attempt = !connection.key.nil? || !client.recover.nil? + # RTN15c7, RTN16d + failed_resume_or_recover = !protocol_message.connection_id == connection.id && !protocol_message.error.nil? + if is_connection_resume_or_recover_attempt and failed_resume_or_recover # RTN15c7 + connection.message_serial = 0 + end + client.recover = nil # RTN16k, explicitly setting null, so it won't be used for subsequent connection requests + if connection.key if protocol_message.connection_id == connection.id logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" } EventMachine.next_tick { connection.trigger_resumed } resend_pending_message_ack_queue else - logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" } nack_messages_on_all_channels protocol_message.error - force_reattach_on_channels protocol_message.error end else logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" } end - reattach_suspended_channels protocol_message.error - - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key + force_reattach_on_channels protocol_message.error # irrespective of connection success/failure, reattach channels end # When connection is CONNECTED and receives an update @@ -139,7 +145,7 @@ def connected_update(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key state_change = Ably::Models::ConnectionStateChange.new( current: connection.state, @@ -568,20 +574,12 @@ def currently_renewing_token? client.auth.authorization_in_flight? end - def reattach_suspended_channels(error) - channels.select do |channel| - channel.suspended? - end.each do |channel| - channel.transition_state_machine :attaching - end - end - - # When continuity on a connection is lost all messages - # Channels in the ATTACHED or ATTACHING state should explicitly be re-attached - # by sending a new ATTACH to Ably + # When reconnected if channel is in ATTACHING, ATTACHED or SUSPENDED state + # it should explicitly be re-attached by sending a new ATTACH to Ably + # Spec : RTN15c6, RTN15c7 def force_reattach_on_channels(error) channels.select do |channel| - channel.attached? || channel.attaching? + channel.attached? || channel.attaching? || channel.suspended? end.each do |channel| channel.manager.request_reattach reason: error end diff --git a/lib/ably/realtime/recovery_key_context.rb b/lib/ably/realtime/recovery_key_context.rb new file mode 100644 index 000000000..30ec3bc2e --- /dev/null +++ b/lib/ably/realtime/recovery_key_context.rb @@ -0,0 +1,36 @@ +require 'json' +# frozen_string_literal: true + +module Ably + module Realtime + class RecoveryKeyContext + attr_reader :connection_key + attr_reader :msg_serial + attr_reader :channel_serials + + def initialize(connection_key, msg_serial, channel_serials) + @connection_key = connection_key + @msg_serial = msg_serial + @channel_serials = channel_serials + if @channel_serials.nil? + @channel_serials = {} + end + end + + def to_json + { 'connection_key' => @connection_key, 'msg_serial' => @msg_serial, 'channel_serials' => @channel_serials }.to_json + end + + def self.from_json(obj, logger = nil) + begin + data = JSON.load obj + self.new data['connection_key'], data['msg_serial'], data['channel_serials'] + rescue => e + logger.warn "unable to decode recovery key, found error #{e}" unless logger.nil? + return nil + end + end + + end + end +end diff --git a/lib/ably/rest/client.rb b/lib/ably/rest/client.rb index 235bd81f0..9f3348090 100644 --- a/lib/ably/rest/client.rb +++ b/lib/ably/rest/client.rb @@ -186,7 +186,7 @@ def initialize(options) @agent = options.delete(:agent) || Ably::AGENT @realtime_client = options.delete(:realtime_client) - @tls = options.delete(:tls) == false ? false : true + @tls = options.fetch(:tls, true); options.delete(:tls) @environment = options.delete(:environment) # nil is production @environment = nil if [:production, 'production'].include?(@environment) @protocol = options.delete(:protocol) || :msgpack @@ -200,10 +200,7 @@ def initialize(options) @log_retries_as_info = options.delete(:log_retries_as_info) @max_message_size = options.delete(:max_message_size) || MAX_MESSAGE_SIZE @max_frame_size = options.delete(:max_frame_size) || MAX_FRAME_SIZE - - if (@idempotent_rest_publishing = options.delete(:idempotent_rest_publishing)).nil? - @idempotent_rest_publishing = Ably::PROTOCOL_VERSION.to_f > 1.1 - end + @idempotent_rest_publishing = options.fetch(:idempotent_rest_publishing, true); options.delete(:idempotent_rest_publishing) if options[:fallback_hosts_use_default] && options[:fallback_hosts] raise ArgumentError, "fallback_hosts_use_default cannot be set to try when fallback_hosts is also provided" diff --git a/lib/ably/util/safe_deferrable.rb b/lib/ably/util/safe_deferrable.rb index 05b13c1cd..36cdd1cf8 100644 --- a/lib/ably/util/safe_deferrable.rb +++ b/lib/ably/util/safe_deferrable.rb @@ -1,6 +1,6 @@ module Ably::Util # SafeDeferrable class provides a Deferrable that is safe to use for for public interfaces - # of this client library. Any exceptions raised in the success or failure callbacks are + # of this client library. Any exceptions raised in the success or failure callbacks are # caught and logged to the provided logger. # # An exception in a callback provided by a developer should not break this client library diff --git a/lib/ably/util/string.rb b/lib/ably/util/string.rb new file mode 100644 index 000000000..c68911de7 --- /dev/null +++ b/lib/ably/util/string.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module Ably::Util + module String + def self.is_null_or_empty(str) + str.nil? || str.empty? + end + end +end diff --git a/lib/ably/version.rb b/lib/ably/version.rb index c0265f5c9..743cede7b 100644 --- a/lib/ably/version.rb +++ b/lib/ably/version.rb @@ -1,9 +1,7 @@ module Ably - VERSION = '1.2.5' - PROTOCOL_VERSION = '1.2' - - # @api private - def self.major_minor_version_numeric - VERSION.gsub(/\.\d+$/, '').to_f - end + LIB_VERSION = '1.2.5' + # The level of compatibility with the Ably service that this SDK supports. + # Also referred to as the 'wire protocol version'. + # spec : CSV2 + PROTOCOL_VERSION = '2' end diff --git a/spec/acceptance/realtime/connection_spec.rb b/spec/acceptance/realtime/connection_spec.rb index 1619d6274..5fa0e6700 100644 --- a/spec/acceptance/realtime/connection_spec.rb +++ b/spec/acceptance/realtime/connection_spec.rb @@ -744,58 +744,6 @@ def expect_ordered_phases end end - describe '#serial connection serial' do - let(:channel) { client.channel(random_str) } - - it 'is set to -1 when a new connection is opened' do - connection.connect do - expect(connection.serial).to eql(-1) - stop_reactor - end - end - - context 'when a message is sent but the ACK has not yet been received' do - it 'the sent message msgSerial is 0 but the connection serial remains at -1' do - channel.attach do - connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - if protocol_message.action == :message - connection.__outgoing_protocol_msgbus__.unsubscribe - expect(protocol_message['msgSerial']).to eql(0) - expect(connection.serial).to eql(-1) - stop_reactor - end - end - channel.publish('event', 'data') - end - end - end - - it 'is set to 0 when a message is received back' do - channel.publish('event', 'data') - channel.subscribe do - expect(connection.serial).to eql(0) - stop_reactor - end - end - - it 'is set to 1 when the second message is received' do - channel.attach do - messages = [] - channel.subscribe do |message| - messages << message - if messages.length == 2 - expect(connection.serial).to eql(1) - stop_reactor - end - end - - channel.publish('event', 'data') do - channel.publish('event', 'data') - end - end - end - end - describe '#msgSerial' do context 'when messages are queued for publish before a connection is established' do let(:batches) { 6 } @@ -922,7 +870,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1232,7 +1179,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1403,34 +1349,6 @@ def self.available_states let(:states) { Hash.new } let(:channel) { client.channel(random_str) } - it 'is composed of connection key and serial that is kept up to date with each message ACK received' do - connection.on(:connected) do - expected_serial = -1 - expect(connection.key).to_not be_nil - expect(connection.serial).to eql(expected_serial) - - channel.attach do - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}") - stop_reactor - end - end - end - end - end - it "is available when connection is in one of the states: #{available_states.join(', ')}" do connection.once(:connected) do allow(client).to receive(:endpoint).and_return( @@ -1542,11 +1460,11 @@ def self.available_states msg_serial, recovery_key, connection_id = nil, nil, nil channel.attach do - expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet + expect(connection.send(:message_serial)).to eql(-1) # no messages published yet connection_id = client.connection.id connection.transport.__incoming_protocol_msgbus__ channel.publish('event', 'message') do - msg_serial = connection.send(:client_msg_serial) + msg_serial = connection.send(:message_serial) expect(msg_serial).to eql(0) recovery_key = client.connection.recovery_key connection.transition_state_machine! :failed @@ -1558,7 +1476,7 @@ def self.available_states recover_client_channel = recover_client.channel(channel_name) recover_client_channel.attach do expect(recover_client.connection.id).to eql(connection_id) - expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial) + expect(recover_client.connection.send(:message_serial)).to eql(msg_serial) stop_reactor end end @@ -1572,12 +1490,12 @@ def self.available_states msg_serial, recovery_key, connection_id = nil, nil, nil channel.attach do - expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet + expect(connection.send(:message_serial)).to eql(-1) # no messages published yet connection_id = client.connection.id connection.transport.__incoming_protocol_msgbus__ channel.subscribe('event') do |message| expect(message.data).to eql('message-1') - msg_serial = connection.send(:client_msg_serial) + msg_serial = connection.send(:message_serial) expect(msg_serial).to eql(0) recovery_key = client.connection.recovery_key connection.transition_state_machine! :failed @@ -1588,11 +1506,11 @@ def self.available_states connection.on(:failed) do recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key)) recover_client_channel = recover_client.channel(channel_name) - expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial) + expect(recover_client.connection.send(:message_serial)).to eql(msg_serial) recover_client.connection.once(:connecting) do recover_client_channel.publish('event', 'message-2') - expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial + 1) + expect(recover_client.connection.send(:message_serial)).to eql(msg_serial + 1) end recover_client_channel.attach do @@ -2001,7 +1919,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { client_id: 'bob', connection_key: connection_key, @@ -2037,7 +1954,6 @@ def self.available_states connection.once(:update) do |connection_state_change| expect(client.auth.client_id).to eql('bob') expect(connection.key).to eql(connection_key) - expect(connection.serial).to eql(55) expect(connection.connection_state_ttl).to eql(33) expect(connection.details.client_id).to eql('bob') @@ -2060,7 +1976,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 22, error: { code: 50000, message: 'Internal failure' }, } end @@ -2089,7 +2004,7 @@ def self.available_states it 'sends the protocol version param v (#G4, #RTN2f)' do expect(EventMachine).to receive(:connect) do |host, port, transport, object, url| uri = URI.parse(url) - expect(CGI::parse(uri.query)['v'][0]).to eql('1.2') + expect(CGI::parse(uri.query)['v'][0]).to eql('2') stop_reactor end client diff --git a/spec/acceptance/realtime/presence_spec.rb b/spec/acceptance/realtime/presence_spec.rb index 2250a14b1..8f3a5e54c 100644 --- a/spec/acceptance/realtime/presence_spec.rb +++ b/spec/acceptance/realtime/presence_spec.rb @@ -580,7 +580,6 @@ def presence_action(method_name, data) action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 20, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -633,7 +632,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 1, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -644,7 +642,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Sync sync_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 2, channel: channel_name, channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC presence: [], @@ -2243,7 +2240,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel_serial: 'sequenceid:cursor', channel: channel_name, presence: presence_sync_1, @@ -2253,7 +2249,6 @@ def connect_members_deferrables sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 11, channel_serial: 'sequenceid:', # indicates SYNC is complete channel: channel_name, presence: presence_sync_2, @@ -2294,7 +2289,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync, timestamp: Time.now.to_i * 1000 @@ -2348,7 +2342,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync_protocol_message, timestamp: Time.now.to_i * 1000 diff --git a/spec/acceptance/rest/client_spec.rb b/spec/acceptance/rest/client_spec.rb index db80bb65f..e1619c6ef 100644 --- a/spec/acceptance/rest/client_spec.rb +++ b/spec/acceptance/rest/client_spec.rb @@ -1095,9 +1095,16 @@ def encode64(text) end it 'sends a protocol version and lib version header (#G4, #RSC7a, #RSC7b)' do - client.channels.get('foo').publish("event") + response = client.channels.get('foo').publish("event") + expect(response).to eql true expect(publish_message_stub).to have_been_requested - expect(Ably::PROTOCOL_VERSION).to eql('1.2') + if agent.nil? + expect(publish_message_stub.to_s).to include("'Ably-Agent'=>'#{Ably::AGENT}'") + expect(publish_message_stub.to_s).to include("'X-Ably-Version'=>'2'") + else + expect(publish_message_stub.to_s).to include("'Ably-Agent'=>'ably-ruby/1.1.1 ruby/3.1.1'") + expect(publish_message_stub.to_s).to include("'X-Ably-Version'=>'2'") + end end end end diff --git a/spec/acceptance/rest/message_spec.rb b/spec/acceptance/rest/message_spec.rb index 4291d6aa6..89ec72059 100644 --- a/spec/acceptance/rest/message_spec.rb +++ b/spec/acceptance/rest/message_spec.rb @@ -204,20 +204,17 @@ end end - specify 'idempotent publishing is disabled by default with <= 1.1 (#TO3n)' do - stub_const 'Ably::PROTOCOL_VERSION', '1.0' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) - expect(client.idempotent_rest_publishing).to be_falsey - stub_const 'Ably::PROTOCOL_VERSION', '1.1' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) + specify 'idempotent publishing is set as per clientOptions' do + # set idempotent_rest_publishing to false + client = Ably::Rest::Client.new(key: api_key, protocol: protocol, idempotent_rest_publishing: false) expect(client.idempotent_rest_publishing).to be_falsey + + # set idempotent_rest_publishing to true + client = Ably::Rest::Client.new(key: api_key, protocol: protocol, idempotent_rest_publishing: true) + expect(client.idempotent_rest_publishing).to be_truthy end specify 'idempotent publishing is enabled by default with >= 1.2 (#TO3n)' do - stub_const 'Ably::PROTOCOL_VERSION', '1.2' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) - expect(client.idempotent_rest_publishing).to be_truthy - stub_const 'Ably::PROTOCOL_VERSION', '1.3' client = Ably::Rest::Client.new(key: api_key, protocol: protocol) expect(client.idempotent_rest_publishing).to be_truthy end diff --git a/spec/support/markdown_spec_formatter.rb b/spec/support/markdown_spec_formatter.rb index ba4ca4f9b..40f62ec4b 100644 --- a/spec/support/markdown_spec_formatter.rb +++ b/spec/support/markdown_spec_formatter.rb @@ -28,7 +28,7 @@ def start(notification) else 'REST' end - output.write "# Ably #{scope} Client Library #{Ably::VERSION} Specification\n" + output.write "# Ably #{scope} Client Library #{Ably::LIB_VERSION} Specification\n" end def close(notification) diff --git a/spec/unit/models/protocol_message_spec.rb b/spec/unit/models/protocol_message_spec.rb index ddfea5733..a6d7d6be4 100644 --- a/spec/unit/models/protocol_message_spec.rb +++ b/spec/unit/models/protocol_message_spec.rb @@ -127,14 +127,6 @@ def new_protocol_message(options) end end - context '#connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.connection_serial).to be_a(Integer) - expect(protocol_message.connection_serial).to eql(55) - end - end - context '#flags (#TR4i)' do context 'when nil' do let(:protocol_message) { new_protocol_message({}) } @@ -241,52 +233,18 @@ def new_protocol_message(options) end end - context '#has_connection_serial?' do - context 'without connection_serial' do - let(:protocol_message) { new_protocol_message({}) } - - it 'returns false' do - expect(protocol_message.has_connection_serial?).to eql(false) - end - end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_connection_serial?).to eql(true) - end - end - end - context '#serial' do context 'with underlying msg_serial' do let(:protocol_message) { new_protocol_message(msg_serial: "55") } it 'converts :msg_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial and msg_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "99", msg_serial: "11") } - it 'prefers connection_serial and converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(99) + expect(protocol_message.message_serial).to be_a(Integer) + expect(protocol_message.message_serial).to eql(55) end end end context '#has_serial?' do - context 'without msg_serial or connection_serial' do + context 'without msg_serial' do let(:protocol_message) { new_protocol_message({}) } it 'returns false' do @@ -301,14 +259,6 @@ def new_protocol_message(options) expect(protocol_message.has_serial?).to eql(true) end end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_serial?).to eql(true) - end - end end context '#error' do diff --git a/spec/unit/realtime/recovery_key_context_spec.rb b/spec/unit/realtime/recovery_key_context_spec.rb new file mode 100644 index 000000000..fd373a395 --- /dev/null +++ b/spec/unit/realtime/recovery_key_context_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' +require 'ably/realtime/recovery_key_context' + +describe Ably::Realtime::RecoveryKeyContext do + + context 'connection recovery key' do + + it 'should encode recovery key - RTN16i, RTN16f, RTN16j' do + connection_key = 'key' + msg_serial = 123 + channel_serials = { + 'channel1' => 'serial1', + 'channel2' => 'serial2' + } + recovery_context = Ably::Realtime::RecoveryKeyContext.new(connection_key, msg_serial, channel_serials) + encoded_recovery_key = recovery_context.to_json + expect(encoded_recovery_key).to eq "{\"connection_key\":\"key\",\"msg_serial\":123," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + end + + it 'should decode recovery key - RTN16i, RTN16f, RTN16j' do + encoded_recovery_key = "{\"connection_key\":\"key\",\"msg_serial\":123," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + decoded_recovery_key = Ably::Realtime::RecoveryKeyContext.from_json(encoded_recovery_key) + expect(decoded_recovery_key.connection_key).to eq("key") + expect(decoded_recovery_key.msg_serial).to eq(123) + end + + it 'should return nil for invalid recovery key - RTN16i, RTN16f, RTN16j' do + encoded_recovery_key = "{\"invalid key\"}" + decoded_recovery_key = Ably::Realtime::RecoveryKeyContext.from_json(encoded_recovery_key) + expect(decoded_recovery_key).to be_nil + end + + end +end