diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index 162b153b..af1772c0 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -364,8 +364,8 @@ def __incoming_msgbus__ # @return [Ably::Models::ChannelOptions] def set_options(channel_options) @options = Ably::Models::ChannelOptions(channel_options) - - manager.request_reattach if need_reattach? + # RTL4i + manager.request_reattach if (need_reattach? and connection.state?(:connected)) end alias options= set_options diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 9c0fdc02..90f00b9e 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -18,7 +18,7 @@ def initialize(channel, connection) def attach if can_transition_to?(:attached) connect_if_connection_initialized - send_attach_protocol_message + send_attach_protocol_message if connection.state?(:connected) # RTL4i end end @@ -49,14 +49,12 @@ def log_channel_error(error) end # Request channel to be reattached by sending an attach protocol message - # @param [Hash] options - # @option options [Ably::Models::ErrorInfo] :reason - def request_reattach(options = {}) - reason = options[:reason] - send_attach_protocol_message - logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } + # @param [Ably::Models::ErrorInfo] reason + def request_reattach(reason = nil) channel.set_channel_error_reason(reason) if reason channel.transition_state_machine! :attaching, reason: reason unless channel.attaching? + send_attach_protocol_message + logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } end def duplicate_attached_received(protocol_message) @@ -169,6 +167,12 @@ def start_attach_from_suspended_timer end end + # RTL13c + def notify_state_change + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil + end + private attr_reader :pending_state_change_timer @@ -209,48 +213,55 @@ def send_attach_protocol_message end message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1 - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options - end - def send_detach_protocol_message(previous_state) - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed - end - - def send_state_change_protocol_message(new_state, state_if_failed, message_options = {}) state_at_time_of_request = channel.state + attach_action = Ably::Models::ProtocolMessage::ACTION.Attach + # RTL4f @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do if channel.state == state_at_time_of_request - error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)") - channel.transition_state_machine state_if_failed, reason: error + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)") + channel.transition_state_machine :suspended, reason: error # return to suspended state if failed end end + # Shouldn't queue attach message as per RTL4i, so message is added top of the queue + # to be sent immediately while processing next message + connection.send_protocol_message_immediately( + action: attach_action.to_i, + channel: channel.name, + **message_options.to_h + ) + end - channel.once_state_changed do - @pending_state_change_timer.cancel if @pending_state_change_timer - @pending_state_change_timer = nil + def send_detach_protocol_message(previous_state) + state_at_time_of_request = channel.state + detach_action = Ably::Models::ProtocolMessage::ACTION.Detach + + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do + if channel.state == state_at_time_of_request + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)") + channel.transition_state_machine previous_state, reason: error # return to previous state if failed + end end - resend_if_disconnected_and_connected = lambda do + on_disconnected_and_connected = lambda do connection.unsafe_once(:disconnected) do - next unless pending_state_change_timer connection.unsafe_once(:connected) do - next unless pending_state_change_timer - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - resend_if_disconnected_and_connected.call - end + yield if pending_state_change_timer + end if pending_state_change_timer end end - resend_if_disconnected_and_connected.call - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) + send_detach_message = lambda do + on_disconnected_and_connected.call do + send_detach_message.call + end + connection.send_protocol_message( + action: detach_action.to_i, + channel: channel.name + ) + end + + send_detach_message.call end def logger diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index a346fec2..e3cedd18 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -29,6 +29,7 @@ class ChannelStateMachine transition :from => :failed, :to => [:attaching, :initialized] after_transition do |channel, transition| + channel.manager.notify_state_change # RTL13c channel.synchronize_state_with_statemachine end diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index 008a2116..d4d11289 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -431,17 +431,28 @@ def logger # @api private def send_protocol_message(protocol_message) add_message_serial_if_ack_required_to(protocol_message) do - Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| - add_message_to_outgoing_queue message - notify_message_dispatcher_of_new_message message - logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" } - end + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message) + notify_message_dispatcher_of_new_message message end end + def send_protocol_message_immediately(protocol_message) + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message, true) + notify_message_dispatcher_of_new_message message + end + # @api private - def add_message_to_outgoing_queue(protocol_message) - __outgoing_message_queue__ << protocol_message + def add_message_to_outgoing_queue(protocol_message, send_immediately = false) + if send_immediately + # Adding msg at the top of the queue to get processed immediately while connection is CONNECTED + __outgoing_message_queue__.prepend(protocol_message) + logger.debug { "Connection: protocol msg pushed at the top =>: #{message.action} #{message}" } + else + __outgoing_message_queue__ << protocol_message + logger.debug { "Connection: protocol msg queued =>: #{message.action} #{message}" } + end end # @api private diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index cd62f637..5c2d2ab8 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -582,7 +582,7 @@ def force_reattach_on_channels(error) channels.select do |channel| channel.attached? || channel.attaching? || channel.suspended? end.each do |channel| - channel.manager.request_reattach reason: error + channel.manager.request_reattach error end end diff --git a/spec/acceptance/realtime/channel_spec.rb b/spec/acceptance/realtime/channel_spec.rb index d2c4250e..5c7bc06e 100644 --- a/spec/acceptance/realtime/channel_spec.rb +++ b/spec/acceptance/realtime/channel_spec.rb @@ -430,9 +430,35 @@ def disconnect_transport end context 'with connection state' do + + sent_attach_messages = [] + received_attached_messages = [] + before(:each) do + sent_attach_messages = [] + received_attached_messages = [] + client.connection.__outgoing_protocol_msgbus__.subscribe do |message| + if message.action == :attach + sent_attach_messages << message + end + end + client.connection.__incoming_protocol_msgbus__.subscribe do |message| + if message.action == :attached + received_attached_messages << message + end + end + end + + # Should send/receive attach/attached message only once + # No duplicates should be sent or received + let(:check_for_attach_messages) do + expect(sent_attach_messages.size).to eq(1) + expect(received_attached_messages.size).to eq(1) + end + it 'is initialized (#RTL4i)' do expect(connection).to be_initialized channel.attach do + check_for_attach_messages stop_reactor end end @@ -440,6 +466,7 @@ def disconnect_transport it 'is connecting (#RTL4i)' do connection.once(:connecting) do channel.attach do + check_for_attach_messages stop_reactor end end @@ -449,6 +476,7 @@ def disconnect_transport connection.once(:connected) do connection.once(:disconnected) do channel.attach do + check_for_attach_messages stop_reactor end end