diff --git a/lib/ably/models/protocol_message.rb b/lib/ably/models/protocol_message.rb index 8490ebc33..9b97228ff 100644 --- a/lib/ably/models/protocol_message.rb +++ b/lib/ably/models/protocol_message.rb @@ -20,8 +20,6 @@ module Ably::Models # @return [String] Contains a serial number for a message on the current channel # @!attribute [r] connection_id # @return [String] Contains a string private connection key used to recover this connection - # @!attribute [r] connection_serial - # @return [Bignum] Contains a serial number for a message sent from the server to the client # @!attribute [r] message_serial # @return [Bignum] Contains a serial number for a message sent from the client to the server # @!attribute [r] timestamp @@ -129,12 +127,6 @@ def message_serial raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" end - def connection_serial - Integer(attributes[:connection_serial]) - rescue TypeError - raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" - end - def count [1, attributes[:count].to_i].max end @@ -146,24 +138,9 @@ def has_message_serial? false end - # @api private - def has_connection_serial? - connection_serial && true - rescue TypeError - false - end - - def serial - if has_connection_serial? - connection_serial - else - message_serial - end - end - # @api private def has_serial? - has_connection_serial? || has_message_serial? + has_message_serial? end def messages @@ -271,7 +248,7 @@ def attributes # Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys def as_json(*args) raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action - raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial? + raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial? attributes.dup.tap do |hash_object| hash_object['action'] = action.to_i diff --git a/lib/ably/realtime/client.rb b/lib/ably/realtime/client.rb index 1769c1e98..1f0e0bb1a 100644 --- a/lib/ably/realtime/client.rb +++ b/lib/ably/realtime/client.rb @@ -134,7 +134,7 @@ def initialize(options) recovery_context = RecoveryKeyContext.from_json(@recover) unless recovery_context.nil? @channels.set_channel_serials recovery_context.channel_serials - @connection.client_msg_serial = recovery_context.msg_serial # RTN16f + @connection.message_serial = recovery_context.msg_serial # RTN16f end end end @@ -308,13 +308,6 @@ def logger @logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger) end - # Disable connection recovery, typically used after a connection has been recovered - # @return [void] - # @api private - def disable_automatic_connection_recovery - @recover = nil - end - # @!attribute [r] fallback_endpoint # @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used # @api private diff --git a/lib/ably/realtime/client/incoming_message_dispatcher.rb b/lib/ably/realtime/client/incoming_message_dispatcher.rb index 292a713b7..0b1057b14 100644 --- a/lib/ably/realtime/client/incoming_message_dispatcher.rb +++ b/lib/ably/realtime/client/incoming_message_dispatcher.rb @@ -47,15 +47,6 @@ def dispatch_protocol_message(*args) logger.debug { "#{protocol_message.action} received: #{protocol_message}" } end - if protocol_message.action.match_any?(:sync, :presence, :message) - if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial - error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}" - logger.error error_message - return - end - end - - update_connection_recovery_info protocol_message connection.set_connection_confirmed_alive case protocol_message.action @@ -172,10 +163,6 @@ def process_connected_update_message(protocol_message) end end - def update_connection_recovery_info(protocol_message) - connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial? - end - def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message| ack_messages protocol_message.messages diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index 9e3477287..2188a735c 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -77,9 +77,6 @@ class Connection include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' - # Expected format for a connection recover key - RECOVER_REGEX = /^(?[^:]+):(?[^:]+):(?\-?\d+)$/ - # Defaults for automatic connection recovery and timeouts DEFAULTS = { channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED @@ -121,7 +118,7 @@ class Connection # # @return [Integer] # - attr_reader :serial + attr_reader :message_serial # An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs. # @@ -177,17 +174,6 @@ def initialize(client, options) end if options.kind_of?(Hash) @defaults.freeze - # If a recover client options is provided, then we need to ensure that the msgSerial matches the - # recover serial immediately at client library instantiation. This is done immediately so that any queued - # publishes use the correct serial number for these queued messages as well. - # There is no harm if the msgSerial is higher than expected if the recover fails. - recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i - if recovery_msg_serial - @client_msg_serial = recovery_msg_serial - else - reset_client_msg_serial - end - Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @@ -354,26 +340,17 @@ def internet_up? # @return [String] # def recovery_key - "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable? + "#{key}:#{serial}:#{message_serial}" if connection_resumable? end # Following a new connection being made, the connection ID, connection key - # and connection serial need to match the details provided by the server. + # need to match the details provided by the server. # # @return [void] # @api private - def configure_new(connection_id, connection_key, connection_serial) + def configure_new(connection_id, connection_key) @id = connection_id @key = connection_key - - update_connection_serial connection_serial - end - - # Store last received connection serial so that the connection can be resumed from the last known point-in-time - # @return [void] - # @api private - def update_connection_serial(connection_serial) - @serial = connection_serial end # Disable automatic resume of a connection @@ -381,7 +358,6 @@ def update_connection_serial(connection_serial) # @api private def reset_resume_info @key = nil - @serial = nil end # @!attribute [r] __outgoing_protocol_msgbus__ @@ -486,14 +462,15 @@ def create_websocket_transport url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? url_params.merge!(client.transport_params) - if connection_resumable? - url_params.merge! resume: key, connection_serial: serial - logger.debug { "Resuming connection key #{key} with serial #{serial}" } - elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] - logger.debug { "Recovering connection with key #{client.recover}" } - unsafe_once(:connected, :closed, :failed) do - client.disable_automatic_connection_recovery + if not (key.nil? || key.empty?) + url_params.merge! resume: key + logger.debug { "Resuming connection with key #{key}" } + elsif not (client.recover.nil? || client.recover.empty?) + recovery_context = RecoveryKeyContext.from_json client.recover + unless recovery_context.nil? + key = recovery_context.connection_key + logger.debug { "Recovering connection with key #{key}" } + url_params.merge! resume: key end end @@ -593,13 +570,6 @@ def heartbeat_interval defaults.fetch(:realtime_request_timeout) end - # Resets the client message serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage} - # (see #client_msg_serial) - # @api private - def reset_client_msg_serial - @client_msg_serial = -1 - end - # When a hearbeat or any other message from Ably is received # we know it's alive, see #RTN23 # @api private @@ -617,21 +587,18 @@ def time_since_connection_confirmed_alive? # #transition_state_machine must be used instead private :change_state - def client_msg_serial=(serial) - @client_msg_serial = serial + def message_serial=(serial) + @message_serial = serial end private # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK. - # Note that this is different to the connection serial that contains the last known serial number # received from the server. # # A message serial number does not guarantee a message has been received, only sent. - # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes. - # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent - def client_msg_serial - @client_msg_serial + def message_serial + @message_serial end def resume_callbacks @@ -656,12 +623,7 @@ def add_message_serial_if_ack_required_to(protocol_message) end def add_message_serial_to(protocol_message) - @client_msg_serial += 1 - protocol_message[:msgSerial] = client_msg_serial - yield - rescue StandardError => e - @client_msg_serial -= 1 - raise e + end # Simply wait until the next EventMachine tick to ensure Connection initialization is complete @@ -669,10 +631,6 @@ def when_initialized EventMachine.next_tick { yield } end - def connection_resumable? - !key.nil? && !serial.nil? && connection_state_available? - end - def connection_state_available? return true if connected? @@ -686,14 +644,6 @@ def connection_state_available? end end - def connection_recoverable? - connection_recover_parts - end - - def connection_recover_parts - client.recover.to_s.match(RECOVER_REGEX) - end - def production? client.environment.nil? || client.environment == :production end diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index 8d514b9f9..00e666226 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -127,7 +127,7 @@ def connected(protocol_message) reattach_suspended_channels protocol_message.error - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key end # When connection is CONNECTED and receives an update @@ -139,7 +139,7 @@ def connected_update(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key state_change = Ably::Models::ConnectionStateChange.new( current: connection.state, diff --git a/spec/acceptance/realtime/connection_spec.rb b/spec/acceptance/realtime/connection_spec.rb index 1619d6274..e5b670e27 100644 --- a/spec/acceptance/realtime/connection_spec.rb +++ b/spec/acceptance/realtime/connection_spec.rb @@ -744,58 +744,6 @@ def expect_ordered_phases end end - describe '#serial connection serial' do - let(:channel) { client.channel(random_str) } - - it 'is set to -1 when a new connection is opened' do - connection.connect do - expect(connection.serial).to eql(-1) - stop_reactor - end - end - - context 'when a message is sent but the ACK has not yet been received' do - it 'the sent message msgSerial is 0 but the connection serial remains at -1' do - channel.attach do - connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - if protocol_message.action == :message - connection.__outgoing_protocol_msgbus__.unsubscribe - expect(protocol_message['msgSerial']).to eql(0) - expect(connection.serial).to eql(-1) - stop_reactor - end - end - channel.publish('event', 'data') - end - end - end - - it 'is set to 0 when a message is received back' do - channel.publish('event', 'data') - channel.subscribe do - expect(connection.serial).to eql(0) - stop_reactor - end - end - - it 'is set to 1 when the second message is received' do - channel.attach do - messages = [] - channel.subscribe do |message| - messages << message - if messages.length == 2 - expect(connection.serial).to eql(1) - stop_reactor - end - end - - channel.publish('event', 'data') do - channel.publish('event', 'data') - end - end - end - end - describe '#msgSerial' do context 'when messages are queued for publish before a connection is established' do let(:batches) { 6 } @@ -922,7 +870,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1232,7 +1179,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1403,34 +1349,6 @@ def self.available_states let(:states) { Hash.new } let(:channel) { client.channel(random_str) } - it 'is composed of connection key and serial that is kept up to date with each message ACK received' do - connection.on(:connected) do - expected_serial = -1 - expect(connection.key).to_not be_nil - expect(connection.serial).to eql(expected_serial) - - channel.attach do - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}") - stop_reactor - end - end - end - end - end - it "is available when connection is in one of the states: #{available_states.join(', ')}" do connection.once(:connected) do allow(client).to receive(:endpoint).and_return( @@ -1542,11 +1460,11 @@ def self.available_states msg_serial, recovery_key, connection_id = nil, nil, nil channel.attach do - expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet + expect(connection.send(:message_serial)).to eql(-1) # no messages published yet connection_id = client.connection.id connection.transport.__incoming_protocol_msgbus__ channel.publish('event', 'message') do - msg_serial = connection.send(:client_msg_serial) + msg_serial = connection.send(:message_serial) expect(msg_serial).to eql(0) recovery_key = client.connection.recovery_key connection.transition_state_machine! :failed @@ -1558,7 +1476,7 @@ def self.available_states recover_client_channel = recover_client.channel(channel_name) recover_client_channel.attach do expect(recover_client.connection.id).to eql(connection_id) - expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial) + expect(recover_client.connection.send(:message_serial)).to eql(msg_serial) stop_reactor end end @@ -1572,12 +1490,12 @@ def self.available_states msg_serial, recovery_key, connection_id = nil, nil, nil channel.attach do - expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet + expect(connection.send(:message_serial)).to eql(-1) # no messages published yet connection_id = client.connection.id connection.transport.__incoming_protocol_msgbus__ channel.subscribe('event') do |message| expect(message.data).to eql('message-1') - msg_serial = connection.send(:client_msg_serial) + msg_serial = connection.send(:message_serial) expect(msg_serial).to eql(0) recovery_key = client.connection.recovery_key connection.transition_state_machine! :failed @@ -1588,11 +1506,11 @@ def self.available_states connection.on(:failed) do recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key)) recover_client_channel = recover_client.channel(channel_name) - expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial) + expect(recover_client.connection.send(:message_serial)).to eql(msg_serial) recover_client.connection.once(:connecting) do recover_client_channel.publish('event', 'message-2') - expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial + 1) + expect(recover_client.connection.send(:message_serial)).to eql(msg_serial + 1) end recover_client_channel.attach do @@ -2001,7 +1919,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { client_id: 'bob', connection_key: connection_key, @@ -2037,7 +1954,6 @@ def self.available_states connection.once(:update) do |connection_state_change| expect(client.auth.client_id).to eql('bob') expect(connection.key).to eql(connection_key) - expect(connection.serial).to eql(55) expect(connection.connection_state_ttl).to eql(33) expect(connection.details.client_id).to eql('bob') @@ -2060,7 +1976,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 22, error: { code: 50000, message: 'Internal failure' }, } end diff --git a/spec/acceptance/realtime/presence_spec.rb b/spec/acceptance/realtime/presence_spec.rb index 2250a14b1..8f3a5e54c 100644 --- a/spec/acceptance/realtime/presence_spec.rb +++ b/spec/acceptance/realtime/presence_spec.rb @@ -580,7 +580,6 @@ def presence_action(method_name, data) action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 20, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -633,7 +632,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 1, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -644,7 +642,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Sync sync_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 2, channel: channel_name, channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC presence: [], @@ -2243,7 +2240,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel_serial: 'sequenceid:cursor', channel: channel_name, presence: presence_sync_1, @@ -2253,7 +2249,6 @@ def connect_members_deferrables sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 11, channel_serial: 'sequenceid:', # indicates SYNC is complete channel: channel_name, presence: presence_sync_2, @@ -2294,7 +2289,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync, timestamp: Time.now.to_i * 1000 @@ -2348,7 +2342,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync_protocol_message, timestamp: Time.now.to_i * 1000 diff --git a/spec/unit/models/protocol_message_spec.rb b/spec/unit/models/protocol_message_spec.rb index ddfea5733..a6d7d6be4 100644 --- a/spec/unit/models/protocol_message_spec.rb +++ b/spec/unit/models/protocol_message_spec.rb @@ -127,14 +127,6 @@ def new_protocol_message(options) end end - context '#connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.connection_serial).to be_a(Integer) - expect(protocol_message.connection_serial).to eql(55) - end - end - context '#flags (#TR4i)' do context 'when nil' do let(:protocol_message) { new_protocol_message({}) } @@ -241,52 +233,18 @@ def new_protocol_message(options) end end - context '#has_connection_serial?' do - context 'without connection_serial' do - let(:protocol_message) { new_protocol_message({}) } - - it 'returns false' do - expect(protocol_message.has_connection_serial?).to eql(false) - end - end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_connection_serial?).to eql(true) - end - end - end - context '#serial' do context 'with underlying msg_serial' do let(:protocol_message) { new_protocol_message(msg_serial: "55") } it 'converts :msg_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial and msg_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "99", msg_serial: "11") } - it 'prefers connection_serial and converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(99) + expect(protocol_message.message_serial).to be_a(Integer) + expect(protocol_message.message_serial).to eql(55) end end end context '#has_serial?' do - context 'without msg_serial or connection_serial' do + context 'without msg_serial' do let(:protocol_message) { new_protocol_message({}) } it 'returns false' do @@ -301,14 +259,6 @@ def new_protocol_message(options) expect(protocol_message.has_serial?).to eql(true) end end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_serial?).to eql(true) - end - end end context '#error' do