Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4687][Protocol-2] Fix duplicate outgoing ATTACH msg #416

Merged
33 changes: 26 additions & 7 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def log_channel_error(error)
# @option options [Ably::Models::ErrorInfo] :reason
def request_reattach(options = {})
reason = options[:reason]
send_attach_protocol_message
send_attach_protocol_message(options[:forced_attach])
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
channel.set_channel_error_reason(reason) if reason
channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
Expand Down Expand Up @@ -201,8 +201,9 @@ def channel_retry_timeout
connection.defaults.fetch(:channel_retry_timeout)
end

def send_attach_protocol_message
def send_attach_protocol_message(forced_attach = false)
message_options = {}
message_options[:forced_attach] = forced_attach
message_options[:params] = channel.options.params if channel.options.params.any?
message_options[:flags] = channel.options.modes_to_flags if channel.options.modes
if channel.attach_resume
Expand All @@ -217,6 +218,11 @@ def send_detach_protocol_message(previous_state)
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
end

def notify_state_change
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
end

def send_state_change_protocol_message(new_state, state_if_failed, message_options = {})
state_at_time_of_request = channel.state
@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
Expand All @@ -226,11 +232,6 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio
end
end

channel.once_state_changed do
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
end

resend_if_disconnected_and_connected = lambda do
connection.unsafe_once(:disconnected) do
next unless pending_state_change_timer
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -245,6 +246,24 @@ def send_state_change_protocol_message(new_state, state_if_failed, message_optio
end
end
end

# Attach is sent on every connected msg received as per RTN15c6, RTN15c7
# So, no need to introduce logic that sends attach on disconnect and connect
if new_state == Ably::Models::ProtocolMessage::ACTION.Attach
# Sends attach message only if it's forced_attach/connected_msg_received or
# connection state is connected.
if message_options.delete(:forced_attach) || connection.state?(:connected)
# Shouldn't queue attach message as per RTL4i, so message is added top of the queue
# to be sent immediately while processing next message
connection.send_protocol_message_immediately(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
end
return
end

resend_if_disconnected_and_connected.call

connection.send_protocol_message(
Expand Down
1 change: 1 addition & 0 deletions lib/ably/realtime/channel/channel_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ChannelStateMachine
transition :from => :failed, :to => [:attaching, :initialized]

after_transition do |channel, transition|
channel.manager.send(:notify_state_change)
channel.synchronize_state_with_statemachine
end

Expand Down
17 changes: 15 additions & 2 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,22 @@ def send_protocol_message(protocol_message)
end
end

def send_protocol_message_immediately(protocol_message)
Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
add_message_to_outgoing_queue(message, true)
notify_message_dispatcher_of_new_message message
logger.debug { "Connection: Prot msg pushed at the top =>: #{message.action} #{message}" }
end
end

# @api private
def add_message_to_outgoing_queue(protocol_message)
__outgoing_message_queue__ << protocol_message
def add_message_to_outgoing_queue(protocol_message, send_immediately = false)
if send_immediately
# Adding msg at the top of the queue to get processed immediately while connection is CONNECTED
__outgoing_message_queue__.prepend(protocol_message)
else
__outgoing_message_queue__ << protocol_message
end
end

# @api private
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/realtime/connection/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ def force_reattach_on_channels(error)
channels.select do |channel|
channel.attached? || channel.attaching? || channel.suspended?
end.each do |channel|
channel.manager.request_reattach reason: error
channel.manager.request_reattach reason: error, forced_attach: true
end
end

Expand Down
Loading