Skip to content

Commit

Permalink
Merge pull request #416 from ably/fix/duplicate-attach-msg-send
Browse files Browse the repository at this point in the history
[ECO-4687][Protocol-2] Fix duplicate outgoing ATTACH msg
  • Loading branch information
sacOO7 authored Jul 4, 2024
2 parents 7be9e8c + 47c55d7 commit 9981082
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 45 deletions.
4 changes: 2 additions & 2 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ def __incoming_msgbus__
# @return [Ably::Models::ChannelOptions]
def set_options(channel_options)
@options = Ably::Models::ChannelOptions(channel_options)

manager.request_reattach if need_reattach?
# RTL4i
manager.request_reattach if (need_reattach? and connection.state?(:connected))
end
alias options= set_options

Expand Down
81 changes: 46 additions & 35 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(channel, connection)
def attach
if can_transition_to?(:attached)
connect_if_connection_initialized
send_attach_protocol_message
send_attach_protocol_message if connection.state?(:connected) # RTL4i
end
end

Expand Down Expand Up @@ -49,14 +49,12 @@ def log_channel_error(error)
end

# Request channel to be reattached by sending an attach protocol message
# @param [Hash] options
# @option options [Ably::Models::ErrorInfo] :reason
def request_reattach(options = {})
reason = options[:reason]
send_attach_protocol_message
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
# @param [Ably::Models::ErrorInfo] reason
def request_reattach(reason = nil)
channel.set_channel_error_reason(reason) if reason
channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
send_attach_protocol_message
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end

def duplicate_attached_received(protocol_message)
Expand Down Expand Up @@ -169,6 +167,12 @@ def start_attach_from_suspended_timer
end
end

# RTL13c
def notify_state_change
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
end

private
attr_reader :pending_state_change_timer

Expand Down Expand Up @@ -209,48 +213,55 @@ def send_attach_protocol_message
end

message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
end

def send_detach_protocol_message(previous_state)
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
end

def send_state_change_protocol_message(new_state, state_if_failed, message_options = {})
state_at_time_of_request = channel.state
attach_action = Ably::Models::ProtocolMessage::ACTION.Attach
# RTL4f
@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)")
channel.transition_state_machine state_if_failed, reason: error
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)")
channel.transition_state_machine :suspended, reason: error # return to suspended state if failed
end
end
# Shouldn't queue attach message as per RTL4i, so message is added top of the queue
# to be sent immediately while processing next message
connection.send_protocol_message_immediately(
action: attach_action.to_i,
channel: channel.name,
**message_options.to_h
)
end

channel.once_state_changed do
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
def send_detach_protocol_message(previous_state)
state_at_time_of_request = channel.state
detach_action = Ably::Models::ProtocolMessage::ACTION.Detach

@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)")
channel.transition_state_machine previous_state, reason: error # return to previous state if failed
end
end

resend_if_disconnected_and_connected = lambda do
on_disconnected_and_connected = lambda do
connection.unsafe_once(:disconnected) do
next unless pending_state_change_timer
connection.unsafe_once(:connected) do
next unless pending_state_change_timer
connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
resend_if_disconnected_and_connected.call
end
yield if pending_state_change_timer
end if pending_state_change_timer
end
end
resend_if_disconnected_and_connected.call

connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
send_detach_message = lambda do
on_disconnected_and_connected.call do
send_detach_message.call
end
connection.send_protocol_message(
action: detach_action.to_i,
channel: channel.name
)
end

send_detach_message.call
end

def logger
Expand Down
1 change: 1 addition & 0 deletions lib/ably/realtime/channel/channel_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ChannelStateMachine
transition :from => :failed, :to => [:attaching, :initialized]

after_transition do |channel, transition|
channel.manager.notify_state_change # RTL13c
channel.synchronize_state_with_statemachine
end

Expand Down
25 changes: 18 additions & 7 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -431,17 +431,28 @@ def logger
# @api private
def send_protocol_message(protocol_message)
add_message_serial_if_ack_required_to(protocol_message) do
Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
add_message_to_outgoing_queue message
notify_message_dispatcher_of_new_message message
logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
end
message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger)
add_message_to_outgoing_queue(message)
notify_message_dispatcher_of_new_message message
end
end

def send_protocol_message_immediately(protocol_message)
message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger)
add_message_to_outgoing_queue(message, true)
notify_message_dispatcher_of_new_message message
end

# @api private
def add_message_to_outgoing_queue(protocol_message)
__outgoing_message_queue__ << protocol_message
def add_message_to_outgoing_queue(protocol_message, send_immediately = false)
if send_immediately
# Adding msg at the top of the queue to get processed immediately while connection is CONNECTED
__outgoing_message_queue__.prepend(protocol_message)
logger.debug { "Connection: protocol msg pushed at the top =>: #{message.action} #{message}" }
else
__outgoing_message_queue__ << protocol_message
logger.debug { "Connection: protocol msg queued =>: #{message.action} #{message}" }
end
end

# @api private
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/realtime/connection/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ def force_reattach_on_channels(error)
channels.select do |channel|
channel.attached? || channel.attaching? || channel.suspended?
end.each do |channel|
channel.manager.request_reattach reason: error
channel.manager.request_reattach error
end
end

Expand Down
28 changes: 28 additions & 0 deletions spec/acceptance/realtime/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -430,16 +430,43 @@ def disconnect_transport
end

context 'with connection state' do

sent_attach_messages = []
received_attached_messages = []
before(:each) do
sent_attach_messages = []
received_attached_messages = []
client.connection.__outgoing_protocol_msgbus__.subscribe do |message|
if message.action == :attach
sent_attach_messages << message
end
end
client.connection.__incoming_protocol_msgbus__.subscribe do |message|
if message.action == :attached
received_attached_messages << message
end
end
end

# Should send/receive attach/attached message only once
# No duplicates should be sent or received
let(:check_for_attach_messages) do
expect(sent_attach_messages.size).to eq(1)
expect(received_attached_messages.size).to eq(1)
end

it 'is initialized (#RTL4i)' do
expect(connection).to be_initialized
channel.attach do
check_for_attach_messages
stop_reactor
end
end

it 'is connecting (#RTL4i)' do
connection.once(:connecting) do
channel.attach do
check_for_attach_messages
stop_reactor
end
end
Expand All @@ -449,6 +476,7 @@ def disconnect_transport
connection.once(:connected) do
connection.once(:disconnected) do
channel.attach do
check_for_attach_messages
stop_reactor
end
end
Expand Down

0 comments on commit 9981082

Please sign in to comment.