Skip to content

Commit

Permalink
feat(retry): moving retry logic away from EventEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
jguz-pubnub committed Apr 8, 2024
1 parent f744f43 commit d24c0f6
Show file tree
Hide file tree
Showing 22 changed files with 85 additions and 1,616 deletions.
72 changes: 53 additions & 19 deletions PubNub.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,6 @@ class PresenceEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
)
)
case let .delayedHeartbeat(channels, groups, retryAttempt, reason):
return DelayedHeartbeatEffect(
request: PresenceHeartbeatRequest(
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
),
retryAttempt: retryAttempt,
reason: reason
)
case let .leave(channels, groups):
return LeaveEffect(
request: PresenceLeaveRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PresenceHeartbeatRequest {
)
request = session.request(
with: PresenceRouter(endpoint, configuration: configuration),
requestOperator: nil
requestOperator: configuration.automaticRetry?.retryOperator(for: .presence)
)
request?.validate().response(on: sessionResponseQueue, decoder: GenericServiceResponseDecoder()) { result in
switch result {
Expand All @@ -60,29 +60,4 @@ class PresenceHeartbeatRequest {
func cancel() {
request?.cancel(PubNubError(.clientCancelled))
}

func reconnectionDelay(dueTo error: PubNubError, retryAttempt: Int) -> TimeInterval? {
guard let automaticRetry = configuration.automaticRetry else {
return nil
}
guard automaticRetry.retryOperator(for: .presence) != nil else {
return nil
}
guard automaticRetry.retryLimit > retryAttempt else {
return nil
}
guard let underlyingError = error.underlying else {
return automaticRetry.policy.delay(for: retryAttempt)
}
guard let urlResponse = error.affected.findFirst(by: PubNubError.AffectedValue.response) else {
return nil
}

let shouldRetry = automaticRetry.shouldRetry(
response: urlResponse,
error: underlyingError
)

return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil
}
}
13 changes: 0 additions & 13 deletions Sources/PubNub/EventEngine/Presence/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ extension Presence {
let input: PresenceInput
}

struct HeartbeatReconnecting: PresenceState {
let input: PresenceInput
let retryAttempt: Int
let error: PubNubError
}

struct HeartbeatFailed: PresenceState {
let input: PresenceInput
let error: PubNubError
Expand All @@ -74,7 +68,6 @@ extension Presence {
case timesUp
case heartbeatSuccess
case heartbeatFailed(error: PubNubError)
case heartbeatGiveUp(error: PubNubError)
}
}

Expand All @@ -90,20 +83,16 @@ extension Presence {
enum Invocation: AnyEffectInvocation {
case heartbeat(channels: [String], groups: [String])
case leave(channels: [String], groups: [String])
case delayedHeartbeat(channels: [String], groups: [String], retryAttempt: Int, error: PubNubError)
case wait

// swiftlint:disable:next nesting
enum Cancellable: AnyCancellableInvocation {
case wait
case delayedHeartbeat

var id: String {
switch self {
case .wait:
return "Presence.ScheduleNextHeartbeat"
case .delayedHeartbeat:
return "Presence.HeartbeatReconnect"
}
}
}
Expand All @@ -114,8 +103,6 @@ extension Presence {
return "Presence.Heartbeat"
case .wait:
return Cancellable.wait.id
case .delayedHeartbeat:
return Cancellable.delayedHeartbeat.id
case .leave:
return "Presence.Leave"
}
Expand Down
34 changes: 4 additions & 30 deletions Sources/PubNub/EventEngine/Presence/PresenceTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ class PresenceTransition: TransitionProtocol {
case .left:
return !(state is Presence.HeartbeatInactive)
case .heartbeatSuccess:
return state is Presence.Heartbeating || state is Presence.HeartbeatReconnecting
return state is Presence.Heartbeating
case .heartbeatFailed:
return state is Presence.Heartbeating || state is Presence.HeartbeatReconnecting
case .heartbeatGiveUp:
return state is Presence.HeartbeatReconnecting
return state is Presence.Heartbeating
case .timesUp:
return state is Presence.HeartbeatCooldown
case .leftAll:
Expand All @@ -51,11 +49,6 @@ class PresenceTransition: TransitionProtocol {
channels: state.channels,
groups: state.input.groups
))]
case let state as Presence.HeartbeatReconnecting:
return [.managed(.delayedHeartbeat(
channels: state.channels, groups: state.groups,
retryAttempt: state.retryAttempt, error: state.error
))]
case is Presence.HeartbeatCooldown:
return [.managed(.wait)]
default:
Expand All @@ -67,8 +60,6 @@ class PresenceTransition: TransitionProtocol {
switch state {
case is Presence.HeartbeatCooldown:
return [.cancel(.wait)]
case is Presence.HeartbeatReconnecting:
return [.cancel(.delayedHeartbeat)]
default:
return []
}
Expand All @@ -85,9 +76,7 @@ class PresenceTransition: TransitionProtocol {
case .heartbeatSuccess:
results = heartbeatSuccessTransition(from: state)
case let .heartbeatFailed(error):
results = heartbeatReconnectingTransition(from: state, dueTo: error)
case let .heartbeatGiveUp(error):
results = heartbeatReconnectingGiveUpTransition(from: state, dueTo: error)
results = heartbeatFailedTransition(from: state, dueTo: error)
case .timesUp:
results = heartbeatingTransition(from: state)
case .leftAll:
Expand Down Expand Up @@ -156,22 +145,7 @@ fileprivate extension PresenceTransition {
}

fileprivate extension PresenceTransition {
func heartbeatReconnectingTransition(
from state: State,
dueTo error: PubNubError
) -> TransitionResult<State, Invocation> {
return TransitionResult(
state: Presence.HeartbeatReconnecting(
input: state.input,
retryAttempt: ((state as? Presence.HeartbeatReconnecting)?.retryAttempt ?? -1) + 1,
error: error
)
)
}
}

fileprivate extension PresenceTransition {
func heartbeatReconnectingGiveUpTransition(
func heartbeatFailedTransition(
from state: State,
dueTo error: PubNubError
) -> TransitionResult<State, Invocation> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,6 @@ class SubscribeEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners
)
case let .handshakeReconnect(channels, groups, retryAttempt, reason):
return HandshakeReconnectEffect(
request: SubscribeRequest(
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
timetoken: 0,
session: session,
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners,
error: reason,
retryAttempt: retryAttempt
)
case let .receiveMessages(channels, groups, cursor):
return ReceivingEffect(
request: SubscribeRequest(
Expand All @@ -72,21 +58,6 @@ class SubscribeEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners
)
case let .receiveReconnect(channels, groups, cursor, retryAttempt, reason):
return ReceiveReconnectEffect(
request: SubscribeRequest(
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: [:],
timetoken: cursor.timetoken,
region: cursor.region,
session: session,
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners,
error: reason,
retryAttempt: retryAttempt
)
case let .emitMessages(messages, cursor):
return EmitMessagesEffect(
messages: messages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,105 +65,6 @@ class ReceivingEffect: EffectHandler {
}
}

// MARK: - HandshakeReconnectEffect

class HandshakeReconnectEffect: EffectHandler {
private let subscribeEffect: SubscribeEffect
private let timerEffect: TimerEffect?
private let error: PubNubError

init(
request: SubscribeRequest,
listeners: [BaseSubscriptionListener],
error: PubNubError,
retryAttempt: Int
) {
self.timerEffect = TimerEffect(interval: request.reconnectionDelay(
dueTo: error,
retryAttempt: retryAttempt
))
self.subscribeEffect = SubscribeEffect(
request: request,
listeners: listeners,
onResponseReceived: { .handshakeReconnectSuccess(cursor: $0.cursor) },
onErrorReceived: { .handshakeReconnectFailure(error: $0) }
)
self.error = error
}

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
guard let timerEffect = timerEffect else {
completionBlock([.handshakeReconnectGiveUp(error: error)]); return
}
timerEffect.performTask { [weak self] _ in
self?.subscribeEffect.performTask(completionBlock: completionBlock)
}
}

func cancelTask() {
timerEffect?.cancelTask()
subscribeEffect.cancelTask()
}

deinit {
cancelTask()
}
}

// MARK: - ReceiveReconnectEffect

class ReceiveReconnectEffect: EffectHandler {
private let subscribeEffect: SubscribeEffect
private let timerEffect: TimerEffect?
private let error: PubNubError

init(
request: SubscribeRequest,
listeners: [BaseSubscriptionListener],
error: PubNubError,
retryAttempt: Int
) {
self.timerEffect = TimerEffect(interval: request.reconnectionDelay(
dueTo: error,
retryAttempt: retryAttempt
))
self.subscribeEffect = SubscribeEffect(
request: request,
listeners: listeners,
onResponseReceived: { .receiveReconnectSuccess(cursor: $0.cursor, messages: $0.messages) },
onErrorReceived: { .receiveReconnectFailure(error: $0) }
)
self.error = error
}

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
subscribeEffect.listeners.forEach {
$0.emit(subscribe: .connectionChanged(.reconnecting))
}
guard let timerEffect = timerEffect else {
completionBlock([.receiveReconnectGiveUp(error: error)]); return
}
subscribeEffect.request.onAuthChallengeReceived = { [weak self] in
// Delay time for server to process connection after TLS handshake
DispatchQueue.global(qos: .default).asyncAfter(deadline: DispatchTime.now() + 0.05) {
self?.subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connected)) }
}
}
timerEffect.performTask { [weak self] _ in
self?.subscribeEffect.performTask(completionBlock: completionBlock)
}
}

func cancelTask() {
timerEffect?.cancelTask()
subscribeEffect.cancelTask()
}

deinit {
cancelTask()
}
}

// MARK: - SubscribeEffect

private class SubscribeEffect: EffectHandler {
Expand Down
Loading

0 comments on commit d24c0f6

Please sign in to comment.