Skip to content

Commit

Permalink
removed all unnecessary references related to connection serial
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed May 16, 2024
1 parent d654665 commit 148ba15
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 268 deletions.
27 changes: 2 additions & 25 deletions lib/ably/models/protocol_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ module Ably::Models
# @return [String] Contains a serial number for a message on the current channel
# @!attribute [r] connection_id
# @return [String] Contains a string private connection key used to recover this connection
# @!attribute [r] connection_serial
# @return [Bignum] Contains a serial number for a message sent from the server to the client
# @!attribute [r] message_serial
# @return [Bignum] Contains a serial number for a message sent from the client to the server
# @!attribute [r] timestamp
Expand Down Expand Up @@ -129,12 +127,6 @@ def message_serial
raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage"
end

def connection_serial
Integer(attributes[:connection_serial])
rescue TypeError
raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage"
end

def count
[1, attributes[:count].to_i].max
end
Expand All @@ -146,24 +138,9 @@ def has_message_serial?
false
end

# @api private
def has_connection_serial?
connection_serial && true
rescue TypeError
false
end

def serial
if has_connection_serial?
connection_serial
else
message_serial
end
end

# @api private
def has_serial?
has_connection_serial? || has_message_serial?
has_message_serial?
end

def messages
Expand Down Expand Up @@ -271,7 +248,7 @@ def attributes
# Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys
def as_json(*args)
raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action
raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial?
raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial?

attributes.dup.tap do |hash_object|
hash_object['action'] = action.to_i
Expand Down
9 changes: 1 addition & 8 deletions lib/ably/realtime/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def initialize(options)
recovery_context = RecoveryKeyContext.from_json(@recover)
unless recovery_context.nil?
@channels.set_channel_serials recovery_context.channel_serials
@connection.client_msg_serial = recovery_context.msg_serial # RTN16f
@connection.message_serial = recovery_context.msg_serial # RTN16f
end
end
end
Expand Down Expand Up @@ -308,13 +308,6 @@ def logger
@logger ||= Ably::Logger.new(self, log_level, rest_client.logger.custom_logger)
end

# Disable connection recovery, typically used after a connection has been recovered
# @return [void]
# @api private
def disable_automatic_connection_recovery
@recover = nil
end

# @!attribute [r] fallback_endpoint
# @return [URI::Generic] Fallback endpoint used to connect to the realtime Ably service. Note, after each connection attempt, a new random {Ably::FALLBACK_HOSTS fallback host} or provided fallback hosts are used
# @api private
Expand Down
13 changes: 0 additions & 13 deletions lib/ably/realtime/client/incoming_message_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ def dispatch_protocol_message(*args)
logger.debug { "#{protocol_message.action} received: #{protocol_message}" }
end

if protocol_message.action.match_any?(:sync, :presence, :message)
if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial
error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}"
logger.error error_message
return
end
end

update_connection_recovery_info protocol_message
connection.set_connection_confirmed_alive

case protocol_message.action
Expand Down Expand Up @@ -172,10 +163,6 @@ def process_connected_update_message(protocol_message)
end
end

def update_connection_recovery_info(protocol_message)
connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial?
end

def ack_pending_queue_for_message_serial(ack_protocol_message)
drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message|
ack_messages protocol_message.messages
Expand Down
86 changes: 18 additions & 68 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ class Connection
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'

# Expected format for a connection recover key
RECOVER_REGEX = /^(?<recover>[^:]+):(?<connection_serial>[^:]+):(?<msg_serial>\-?\d+)$/

# Defaults for automatic connection recovery and timeouts
DEFAULTS = {
channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED
Expand Down Expand Up @@ -121,7 +118,7 @@ class Connection
#
# @return [Integer]
#
attr_reader :serial
attr_reader :message_serial

# An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs.
#
Expand Down Expand Up @@ -177,17 +174,6 @@ def initialize(client, options)
end if options.kind_of?(Hash)
@defaults.freeze

# If a recover client options is provided, then we need to ensure that the msgSerial matches the
# recover serial immediately at client library instantiation. This is done immediately so that any queued
# publishes use the correct serial number for these queued messages as well.
# There is no harm if the msgSerial is higher than expected if the recover fails.
recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i
if recovery_msg_serial
@client_msg_serial = recovery_msg_serial
else
reset_client_msg_serial
end

Client::IncomingMessageDispatcher.new client, self
Client::OutgoingMessageDispatcher.new client, self

Expand Down Expand Up @@ -354,34 +340,24 @@ def internet_up?
# @return [String]
#
def recovery_key
"#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
"#{key}:#{serial}:#{message_serial}" if connection_resumable?
end

# Following a new connection being made, the connection ID, connection key
# and connection serial need to match the details provided by the server.
# need to match the details provided by the server.
#
# @return [void]
# @api private
def configure_new(connection_id, connection_key, connection_serial)
def configure_new(connection_id, connection_key)
@id = connection_id
@key = connection_key

update_connection_serial connection_serial
end

# Store last received connection serial so that the connection can be resumed from the last known point-in-time
# @return [void]
# @api private
def update_connection_serial(connection_serial)
@serial = connection_serial
end

# Disable automatic resume of a connection
# @return [void]
# @api private
def reset_resume_info
@key = nil
@serial = nil
end

# @!attribute [r] __outgoing_protocol_msgbus__
Expand Down Expand Up @@ -486,14 +462,15 @@ def create_websocket_transport
url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?
url_params.merge!(client.transport_params)

if connection_resumable?
url_params.merge! resume: key, connection_serial: serial
logger.debug { "Resuming connection key #{key} with serial #{serial}" }
elsif connection_recoverable?
url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
logger.debug { "Recovering connection with key #{client.recover}" }
unsafe_once(:connected, :closed, :failed) do
client.disable_automatic_connection_recovery
if not (key.nil? || key.empty?)
url_params.merge! resume: key
logger.debug { "Resuming connection with key #{key}" }
elsif not (client.recover.nil? || client.recover.empty?)
recovery_context = RecoveryKeyContext.from_json client.recover
unless recovery_context.nil?
key = recovery_context.connection_key
logger.debug { "Recovering connection with key #{key}" }
url_params.merge! resume: key
end
end

Expand Down Expand Up @@ -593,13 +570,6 @@ def heartbeat_interval
defaults.fetch(:realtime_request_timeout)
end

# Resets the client message serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
# (see #client_msg_serial)
# @api private
def reset_client_msg_serial
@client_msg_serial = -1
end

# When a hearbeat or any other message from Ably is received
# we know it's alive, see #RTN23
# @api private
Expand All @@ -617,21 +587,18 @@ def time_since_connection_confirmed_alive?
# #transition_state_machine must be used instead
private :change_state

def client_msg_serial=(serial)
@client_msg_serial = serial
def message_serial=(serial)
@message_serial = serial
end

private

# The client message serial (msgSerial) is incremented for every message that is published that requires an ACK.
# Note that this is different to the connection serial that contains the last known serial number
# received from the server.
#
# A message serial number does not guarantee a message has been received, only sent.
# A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes.
# @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
def client_msg_serial
@client_msg_serial
def message_serial
@message_serial
end

def resume_callbacks
Expand All @@ -656,23 +623,14 @@ def add_message_serial_if_ack_required_to(protocol_message)
end

def add_message_serial_to(protocol_message)
@client_msg_serial += 1
protocol_message[:msgSerial] = client_msg_serial
yield
rescue StandardError => e
@client_msg_serial -= 1
raise e

end

# Simply wait until the next EventMachine tick to ensure Connection initialization is complete
def when_initialized
EventMachine.next_tick { yield }
end

def connection_resumable?
!key.nil? && !serial.nil? && connection_state_available?
end

def connection_state_available?
return true if connected?

Expand All @@ -686,14 +644,6 @@ def connection_state_available?
end
end

def connection_recoverable?
connection_recover_parts
end

def connection_recover_parts
client.recover.to_s.match(RECOVER_REGEX)
end

def production?
client.environment.nil? || client.environment == :production
end
Expand Down
4 changes: 2 additions & 2 deletions lib/ably/realtime/connection/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def connected(protocol_message)

reattach_suspended_channels protocol_message.error

connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial
connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key
end

# When connection is CONNECTED and receives an update
Expand All @@ -139,7 +139,7 @@ def connected_update(protocol_message)
# Update the connection details and any associated defaults
connection.set_connection_details protocol_message.connection_details

connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial
connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key

state_change = Ably::Models::ConnectionStateChange.new(
current: connection.state,
Expand Down
Loading

0 comments on commit 148ba15

Please sign in to comment.