From 5f79840b565c409956e84ed1c28ad0494d6c5c38 Mon Sep 17 00:00:00 2001 From: jguz-pubnub Date: Fri, 12 Jan 2024 17:49:35 +0100 Subject: [PATCH] Subscribe & Presence Event Engine * Providing full backward compatibility with old subscription loop * Always using PubNubError for any kind of errors --- .../MasterDetailTableViewController.swift | 20 +- PubNub.xcodeproj/project.pbxproj | 8 +- .../EventEngine/Core/EffectHandler.swift | 26 +- .../Effects/DelayedHeartbeatEffect.swift | 63 +- .../Effects/PresenceEffectFactory.swift | 3 +- .../Presence/Effects/WaitEffect.swift | 32 +- .../Helpers/PresenceHeartbeatRequest.swift | 25 + .../Subscribe/Effects/EmitStatusEffect.swift | 2 +- .../Effects/SubscribeEffectFactory.swift | 8 +- .../Subscribe/Effects/SubscribeEffects.swift | 222 ++- .../Subscribe/Helpers/SubscribeError.swift | 25 - .../Subscribe/Helpers/SubscribeInput.swift | 134 +- .../Subscribe/Helpers/SubscribeRequest.swift | 30 +- .../EventEngine/Subscribe/Subscribe.swift | 26 +- .../Subscribe/SubscribeTransition.swift | 24 +- .../Subscription/SubscriptionStream.swift | 53 +- .../Request/Operators/AutomaticRetry.swift | 118 +- .../Subscription/ConnectionStatus.swift | 34 +- ...entEngineSubscriptionSessionStrategy.swift | 115 +- ...SubscriptionSessionStrategy+Presence.swift | 20 +- .../LegacySubscriptionSessionStrategy.swift | 77 +- .../SubscribeSessionFactory.swift | 4 +- .../Subscription/SubscriptionSession.swift | 4 +- .../PubNubContractTestCase.swift | 2 +- .../Subscribe/EmitStatusTests.swift | 2 +- .../Subscribe/SubscribeEffectsTests.swift | 70 +- .../Subscribe/SubscribeInputTests.swift | 118 +- .../Subscribe/SubscribeRequestTests.swift | 10 +- .../Subscribe/SubscribeTransitionTests.swift | 144 +- Tests/PubNubTests/Helpers/PAMTokenTests.swift | 41 +- .../SubscriptionIntegrationTests.swift | 35 +- .../Operators/AutomaticRetryTests.swift | 193 +- .../Routers/SubscribeRouterTests.swift | 1745 +++++++++-------- .../SubscriptionSessionTests.swift | 164 +- 34 files changed, 1998 insertions(+), 1599 deletions(-) delete mode 100644 Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeError.swift diff --git a/Examples/Sources/MasterDetailTableViewController.swift b/Examples/Sources/MasterDetailTableViewController.swift index e4b5cd1e..bded2ba0 100644 --- a/Examples/Sources/MasterDetailTableViewController.swift +++ b/Examples/Sources/MasterDetailTableViewController.swift @@ -250,14 +250,28 @@ class MasterDetailTableViewController: UITableViewController { print("The signal is \(signal.payload) and was sent by \(signal.publisher ?? "")") case let .connectionStatusChanged(connectionChange): switch connectionChange { + case .connecting: + print("Status connecting...") case .connected: print("Status connected!") - case .connectionError: - print("Error while attempting to initialize connection") + case .reconnecting: + print("Status reconnecting...") case .disconnected: print("Status disconnected") case .disconnectedUnexpectedly: - print("Disconnected unexpectedly") + print("Status disconnected unexpectedly!") + case .connectionError: + print("Cannot establish initial conection to the remote system") + } + case let .subscriptionChanged(subscribeChange): + switch subscribeChange { + case let .subscribed(channels, groups): + print("\(channels) and \(groups) were added to subscription") + case let .responseHeader(channels, groups, previous, next): + print("\(channels) and \(groups) recevied a response at \(previous?.timetoken ?? 0)") + print("\(next?.timetoken ?? 0) will be used as the new timetoken") + case let .unsubscribed(channels, groups): + print("\(channels) and \(groups) were removed from subscription") } case let .presenceChanged(presenceChange): print("The channel \(presenceChange.channel) has an updated occupancy of \(presenceChange.occupancy)") diff --git a/PubNub.xcodeproj/project.pbxproj b/PubNub.xcodeproj/project.pbxproj index 4690cb80..5de7a138 100644 --- a/PubNub.xcodeproj/project.pbxproj +++ b/PubNub.xcodeproj/project.pbxproj @@ -383,7 +383,6 @@ 3D389FE72B35AF4A006928E7 /* EmitStatusEffect.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCB2B35AF4A006928E7 /* EmitStatusEffect.swift */; }; 3D389FE82B35AF4A006928E7 /* SubscribeEffects.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCC2B35AF4A006928E7 /* SubscribeEffects.swift */; }; 3D389FE92B35AF4A006928E7 /* SubscribeEffectFactory.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCD2B35AF4A006928E7 /* SubscribeEffectFactory.swift */; }; - 3D389FEA2B35AF4A006928E7 /* SubscribeError.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCF2B35AF4A006928E7 /* SubscribeError.swift */; }; 3D389FEB2B35AF4A006928E7 /* SubscribeInput.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FD02B35AF4A006928E7 /* SubscribeInput.swift */; }; 3D389FEC2B35AF4A006928E7 /* SubscribeRequest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FD12B35AF4A006928E7 /* SubscribeRequest.swift */; }; 3D389FED2B35AF4A006928E7 /* Subscribe.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FD22B35AF4A006928E7 /* Subscribe.swift */; }; @@ -425,6 +424,7 @@ 3D38A02D2B35B087006928E7 /* SubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D38A0292B35B087006928E7 /* SubscriptionSessionStrategy.swift */; }; 3D38A02E2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D38A02A2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift */; }; 3D38A0302B35B208006928E7 /* subscription_handshake_success.json in Resources */ = {isa = PBXBuildFile; fileRef = 3D38A02F2B35B208006928E7 /* subscription_handshake_success.json */; }; + 3D4ED42F2B519FC500FE58C7 /* SubscriptionSessionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D4ED42E2B519FC500FE58C7 /* SubscriptionSessionTests.swift */; }; 3D6265D72ABCA79100FDD5E6 /* CryptorUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D6265D62ABCA79100FDD5E6 /* CryptorUtils.swift */; }; 3D758DBF2AAA1C49005D2B36 /* CryptoModule.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D758DBE2AAA1C49005D2B36 /* CryptoModule.swift */; }; 3D758DC82AB06A12005D2B36 /* CryptoInputStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D758DC62AB06A12005D2B36 /* CryptoInputStream.swift */; }; @@ -969,7 +969,6 @@ 3D389FCB2B35AF4A006928E7 /* EmitStatusEffect.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EmitStatusEffect.swift; sourceTree = ""; }; 3D389FCC2B35AF4A006928E7 /* SubscribeEffects.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeEffects.swift; sourceTree = ""; }; 3D389FCD2B35AF4A006928E7 /* SubscribeEffectFactory.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeEffectFactory.swift; sourceTree = ""; }; - 3D389FCF2B35AF4A006928E7 /* SubscribeError.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeError.swift; sourceTree = ""; }; 3D389FD02B35AF4A006928E7 /* SubscribeInput.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeInput.swift; sourceTree = ""; }; 3D389FD12B35AF4A006928E7 /* SubscribeRequest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeRequest.swift; sourceTree = ""; }; 3D389FD22B35AF4A006928E7 /* Subscribe.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscribe.swift; sourceTree = ""; }; @@ -1007,6 +1006,7 @@ 3D38A0292B35B087006928E7 /* SubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionSessionStrategy.swift; sourceTree = ""; }; 3D38A02A2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = LegacySubscriptionSessionStrategy.swift; sourceTree = ""; }; 3D38A02F2B35B208006928E7 /* subscription_handshake_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = subscription_handshake_success.json; sourceTree = ""; }; + 3D4ED42E2B519FC500FE58C7 /* SubscriptionSessionTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionSessionTests.swift; sourceTree = ""; }; 3D6265D62ABCA79100FDD5E6 /* CryptorUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CryptorUtils.swift; sourceTree = ""; }; 3D758DBE2AAA1C49005D2B36 /* CryptoModule.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CryptoModule.swift; sourceTree = ""; }; 3D758DC62AB06A12005D2B36 /* CryptoInputStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CryptoInputStream.swift; sourceTree = ""; }; @@ -1241,6 +1241,7 @@ 35458BA1230CB32F0085B502 /* Subscription */ = { isa = PBXGroup; children = ( + 3D4ED42E2B519FC500FE58C7 /* SubscriptionSessionTests.swift */, 35458BA2230CB3570085B502 /* SubscribeSessionFactoryTests.swift */, ); path = Subscription; @@ -2056,7 +2057,6 @@ 3D389FCE2B35AF4A006928E7 /* Helpers */ = { isa = PBXGroup; children = ( - 3D389FCF2B35AF4A006928E7 /* SubscribeError.swift */, 3D389FD02B35AF4A006928E7 /* SubscribeInput.swift */, 3D389FD12B35AF4A006928E7 /* SubscribeRequest.swift */, ); @@ -3395,7 +3395,6 @@ 35B6FBAF22F226F4005EE490 /* NSNumber+PubNub.swift in Sources */, 3D38A02E2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift in Sources */, 357024BF283C07C900567EE8 /* Objects+PubNub.swift in Sources */, - 3D389FEA2B35AF4A006928E7 /* SubscribeError.swift in Sources */, 35B0ACE3252BE36D00537A18 /* File+PubNub.swift in Sources */, 3D758DD52AB48A6A005D2B36 /* CryptorHeader.swift in Sources */, 35CF549C248ABE8B0099FE81 /* PubNubObjectMetadataPatcher.swift in Sources */, @@ -3511,6 +3510,7 @@ 35CDFEC022E7B48000F3B9F2 /* ImportTestResource.swift in Sources */, 3D38A00C2B35AF6A006928E7 /* SubscribeInputTests.swift in Sources */, 3D38A0132B35AF6B006928E7 /* HeartbeatEffectTests.swift in Sources */, + 3D4ED42F2B519FC500FE58C7 /* SubscriptionSessionTests.swift in Sources */, 35403F8A253617A8004B978E /* XMLCodingTests.swift in Sources */, 3557CDF8237F4611004BBACC /* MessageActionsRouterTests.swift in Sources */, 35CDFEAD22E7655700F3B9F2 /* URL+PubNubTests.swift in Sources */, diff --git a/Sources/PubNub/EventEngine/Core/EffectHandler.swift b/Sources/PubNub/EventEngine/Core/EffectHandler.swift index fa1866d5..a968aaf5 100644 --- a/Sources/PubNub/EventEngine/Core/EffectHandler.swift +++ b/Sources/PubNub/EventEngine/Core/EffectHandler.swift @@ -41,20 +41,30 @@ protocol DelayedEffectHandler: AnyObject, EffectHandler { var workItem: DispatchWorkItem? { get set } func delayInterval() -> TimeInterval? - func onEarlyExit(notify completionBlock: @escaping ([Event]) -> Void) + func onEmptyInterval(notify completionBlock: @escaping ([Event]) -> Void) func onDelayExpired(notify completionBlock: @escaping ([Event]) -> Void) } -extension DelayedEffectHandler { - func performTask(completionBlock: @escaping ([Event]) -> Void) { - guard let delay = delayInterval() else { - onEarlyExit(notify: completionBlock); return +// MARK: - TimerEffect + +class TimerEffect: EffectHandler { + private let interval: TimeInterval + private var workItem: DispatchWorkItem? + + init?(interval: TimeInterval?) { + if let interval = interval { + self.interval = interval + } else { + return nil } - let workItem = DispatchWorkItem() { [weak self] in - self?.onDelayExpired(notify: completionBlock) + } + + func performTask(completionBlock: @escaping ([Void]) -> Void) { + let workItem = DispatchWorkItem() { + completionBlock([]) } DispatchQueue.global(qos: .default).asyncAfter( - deadline: .now() + delay, + deadline: .now() + interval, execute: workItem ) self.workItem = workItem diff --git a/Sources/PubNub/EventEngine/Presence/Effects/DelayedHeartbeatEffect.swift b/Sources/PubNub/EventEngine/Presence/Effects/DelayedHeartbeatEffect.swift index bfa5e204..3c38a8a1 100644 --- a/Sources/PubNub/EventEngine/Presence/Effects/DelayedHeartbeatEffect.swift +++ b/Sources/PubNub/EventEngine/Presence/Effects/DelayedHeartbeatEffect.swift @@ -10,70 +10,39 @@ import Foundation -class DelayedHeartbeatEffect: DelayedEffectHandler { - typealias Event = Presence.Event - +class DelayedHeartbeatEffect: EffectHandler { private let request: PresenceHeartbeatRequest - private let configuration: SubscriptionConfiguration - private let retryAttempt: Int private let reason: PubNubError - - var workItem: DispatchWorkItem? + private let timerEffect: TimerEffect? init( request: PresenceHeartbeatRequest, retryAttempt: Int, - reason: PubNubError, - configuration: SubscriptionConfiguration + reason: PubNubError ) { self.request = request - self.retryAttempt = retryAttempt self.reason = reason - self.configuration = configuration + self.timerEffect = TimerEffect(interval: request.reconnectionDelay(dueTo: reason, retryAttempt: retryAttempt)) } - func delayInterval() -> TimeInterval? { - guard let automaticRetry = configuration.automaticRetry else { - return nil - } - guard automaticRetry[.presence] != nil else { - return nil - } - guard automaticRetry.retryLimit > retryAttempt else { - return nil - } - guard let underlyingError = reason.underlying else { - return automaticRetry.policy.delay(for: retryAttempt) - } - guard let urlResponse = reason.affected.findFirst(by: PubNubError.AffectedValue.response) else { - return nil + func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) { + guard let timerEffect = timerEffect else { + completionBlock([.heartbeatGiveUp(error: reason)]); return } - - let shouldRetry = automaticRetry.shouldRetry( - response: urlResponse, - error: underlyingError - ) - - return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil - } - - func onEarlyExit(notify completionBlock: @escaping ([Presence.Event]) -> Void) { - completionBlock([.heartbeatGiveUp(error: reason)]) - } - - func onDelayExpired(notify completionBlock: @escaping ([Presence.Event]) -> Void) { - request.execute() { result in - switch result { - case .success(_): - completionBlock([.heartbeatSuccess]) - case .failure(let error): - completionBlock([.heartbeatFailed(error: error)]) + timerEffect.performTask { [weak self] _ in + self?.request.execute() { result in + switch result { + case .success(_): + completionBlock([.heartbeatSuccess]) + case .failure(let error): + completionBlock([.heartbeatFailed(error: error)]) + } } } } func cancelTask() { - workItem?.cancel() + timerEffect?.cancelTask() request.cancel() } diff --git a/Sources/PubNub/EventEngine/Presence/Effects/PresenceEffectFactory.swift b/Sources/PubNub/EventEngine/Presence/Effects/PresenceEffectFactory.swift index bbec2c93..1726172a 100644 --- a/Sources/PubNub/EventEngine/Presence/Effects/PresenceEffectFactory.swift +++ b/Sources/PubNub/EventEngine/Presence/Effects/PresenceEffectFactory.swift @@ -52,8 +52,7 @@ class PresenceEffectFactory: EffectHandlerFactory { sessionResponseQueue: sessionResponseQueue ), retryAttempt: retryAttempt, - reason: reason, - configuration: dependencies.value.configuration + reason: reason ) case .leave(let channels, let groups): return LeaveEffect( diff --git a/Sources/PubNub/EventEngine/Presence/Effects/WaitEffect.swift b/Sources/PubNub/EventEngine/Presence/Effects/WaitEffect.swift index 979c4700..b1103395 100644 --- a/Sources/PubNub/EventEngine/Presence/Effects/WaitEffect.swift +++ b/Sources/PubNub/EventEngine/Presence/Effects/WaitEffect.swift @@ -10,29 +10,27 @@ import Foundation -class WaitEffect: DelayedEffectHandler { - typealias Event = Presence.Event - - private let configuration: SubscriptionConfiguration - var workItem: DispatchWorkItem? +class WaitEffect: EffectHandler { + private let timerEffect: TimerEffect? init(configuration: SubscriptionConfiguration) { - self.configuration = configuration - } - - func delayInterval() -> TimeInterval? { - configuration.heartbeatInterval > 0 ? TimeInterval(configuration.heartbeatInterval) : nil - } - - func onEarlyExit(notify completionBlock: @escaping ([Presence.Event]) -> Void) { - completionBlock([]) + if configuration.heartbeatInterval > 0 { + self.timerEffect = TimerEffect(interval: TimeInterval(configuration.heartbeatInterval)) + } else { + self.timerEffect = nil + } } - func onDelayExpired(notify completionBlock: @escaping ([Presence.Event]) -> Void) { - completionBlock([.timesUp]) + func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) { + guard let timerEffect = timerEffect else { + completionBlock([]); return + } + timerEffect.performTask(completionBlock: { _ in + completionBlock([.timesUp]) + }) } func cancelTask() { - workItem?.cancel() + timerEffect?.cancelTask() } } diff --git a/Sources/PubNub/EventEngine/Presence/Helpers/PresenceHeartbeatRequest.swift b/Sources/PubNub/EventEngine/Presence/Helpers/PresenceHeartbeatRequest.swift index 46310960..461cf56a 100644 --- a/Sources/PubNub/EventEngine/Presence/Helpers/PresenceHeartbeatRequest.swift +++ b/Sources/PubNub/EventEngine/Presence/Helpers/PresenceHeartbeatRequest.swift @@ -120,4 +120,29 @@ class PresenceHeartbeatRequest { func cancel() { request?.cancel(PubNubError(.clientCancelled)) } + + func reconnectionDelay(dueTo error: PubNubError, retryAttempt: Int) -> TimeInterval? { + guard let automaticRetry = configuration.automaticRetry else { + return nil + } + guard automaticRetry[.presence] != nil else { + return nil + } + guard automaticRetry.retryLimit > retryAttempt else { + return nil + } + guard let underlyingError = error.underlying else { + return automaticRetry.policy.delay(for: retryAttempt) + } + guard let urlResponse = error.affected.findFirst(by: PubNubError.AffectedValue.response) else { + return nil + } + + let shouldRetry = automaticRetry.shouldRetry( + response: urlResponse, + error: underlyingError + ) + + return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil + } } diff --git a/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift b/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift index 9197a992..3fcdba57 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift @@ -16,7 +16,7 @@ struct EmitStatusEffect: EffectHandler { func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { if let error = statusChange.error { listeners.forEach { - $0.emit(subscribe: .errorReceived(error.underlying)) + $0.emit(subscribe: .errorReceived(error)) } } listeners.forEach { diff --git a/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffectFactory.swift b/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffectFactory.swift index 977ec10a..3324d0dd 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffectFactory.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffectFactory.swift @@ -43,7 +43,7 @@ class SubscribeEffectFactory: EffectHandlerFactory { timetoken: 0, session: session, sessionResponseQueue: sessionResponseQueue - ) + ), listeners: dependencies.value.listeners ) case .handshakeReconnect(let channels, let groups, let retryAttempt, let reason): return HandshakeReconnectEffect( @@ -55,7 +55,7 @@ class SubscribeEffectFactory: EffectHandlerFactory { timetoken: 0, session: session, sessionResponseQueue: sessionResponseQueue - ), + ), listeners: dependencies.value.listeners, error: reason, retryAttempt: retryAttempt ) @@ -70,7 +70,7 @@ class SubscribeEffectFactory: EffectHandlerFactory { region: cursor.region, session: session, sessionResponseQueue: sessionResponseQueue - ) + ), listeners: dependencies.value.listeners ) case .receiveReconnect(let channels, let groups, let cursor, let retryAttempt, let reason): return ReceiveReconnectEffect( @@ -83,7 +83,7 @@ class SubscribeEffectFactory: EffectHandlerFactory { region: cursor.region, session: session, sessionResponseQueue: sessionResponseQueue - ), + ), listeners: dependencies.value.listeners, error: reason, retryAttempt: retryAttempt ) diff --git a/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift b/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift index 2892ebca..b9472fe1 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift @@ -10,29 +10,27 @@ import Foundation -// MARK: - Handshake Effect +// MARK: - HandshakeEffect class HandshakeEffect: EffectHandler { - let request: SubscribeRequest + private let subscribeEffect: SubscribeEffect - init(request: SubscribeRequest) { - self.request = request + init(request: SubscribeRequest, listeners: [BaseSubscriptionListener]) { + self.subscribeEffect = SubscribeEffect( + request: request, + listeners: listeners, + onResponseReceived: { .handshakeSuccess(cursor: $0.cursor) }, + onErrorReceived: { .handshakeFailure(error: $0) } + ) } func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { - request.execute(onCompletion: { [weak self] in - guard let _ = self else { return } - switch $0 { - case .success(let response): - completionBlock([.handshakeSuccess(cursor: response.cursor)]) - case .failure(let error): - completionBlock([.handshakeFailure(error: error)]) - } - }) + subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connecting)) } + subscribeEffect.performTask(completionBlock: completionBlock) } func cancelTask() { - request.cancel() + subscribeEffect.cancelTask() } deinit { @@ -40,29 +38,26 @@ class HandshakeEffect: EffectHandler { } } -// MARK: - Receiving Effect +// MARK: - ReceivingEffect class ReceivingEffect: EffectHandler { - let request: SubscribeRequest - - init(request: SubscribeRequest) { - self.request = request + private let subscribeEffect: SubscribeEffect + + init(request: SubscribeRequest, listeners: [BaseSubscriptionListener]) { + self.subscribeEffect = SubscribeEffect( + request: request, + listeners: listeners, + onResponseReceived: { .receiveSuccess(cursor: $0.cursor, messages: $0.messages) }, + onErrorReceived: { .receiveFailure(error: $0) } + ) } func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { - request.execute(onCompletion: { [weak self] in - guard let _ = self else { return } - switch $0 { - case .success(let response): - completionBlock([.receiveSuccess(cursor: response.cursor, messages: response.messages)]) - case .failure(let error): - completionBlock([.receiveFailure(error: error)]) - } - }) + subscribeEffect.performTask(completionBlock: completionBlock) } func cancelTask() { - request.cancel() + subscribeEffect.cancelTask() } deinit { @@ -70,45 +65,107 @@ class ReceivingEffect: EffectHandler { } } -// MARK: - Handshake Reconnect Effect +// MARK: - HandshakeReconnectEffect -class HandshakeReconnectEffect: DelayedEffectHandler { - typealias Event = Subscribe.Event - - let request: SubscribeRequest - let retryAttempt: Int - let error: SubscribeError - var workItem: DispatchWorkItem? - - init(request: SubscribeRequest, error: SubscribeError, retryAttempt: Int) { - self.request = request +class HandshakeReconnectEffect: EffectHandler { + private let subscribeEffect: SubscribeEffect + private let timerEffect: TimerEffect? + private let error: PubNubError + + init( + request: SubscribeRequest, + listeners: [BaseSubscriptionListener], + error: PubNubError, + retryAttempt: Int + ) { + self.timerEffect = TimerEffect(interval: request.reconnectionDelay( + dueTo: error, + retryAttempt: retryAttempt + )) + self.subscribeEffect = SubscribeEffect( + request: request, + listeners: listeners, + onResponseReceived: { .handshakeReconnectSuccess(cursor: $0.cursor) }, + onErrorReceived: { .handshakeReconnectFailure(error: $0) } + ) self.error = error - self.retryAttempt = retryAttempt } - func delayInterval() -> TimeInterval? { - request.reconnectionDelay(dueTo: error, with: retryAttempt) + func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { + subscribeEffect.listeners.forEach { + $0.emit(subscribe: .connectionChanged(.reconnecting)) + } + guard let timerEffect = timerEffect else { + completionBlock([.handshakeReconnectGiveUp(error: error)]); return + } + subscribeEffect.request.onAuthChallengeReceived = { [weak self] in + // Delay time for server to process connection after TLS handshake + DispatchQueue.global(qos: .default).asyncAfter(deadline: DispatchTime.now() + 0.05) { + self?.subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connected)) } + } + } + timerEffect.performTask { [weak self] _ in + self?.subscribeEffect.performTask(completionBlock: completionBlock) + } + } + + func cancelTask() { + timerEffect?.cancelTask() + subscribeEffect.cancelTask() } - func onEarlyExit(notify completionBlock: @escaping ([Subscribe.Event]) -> Void) { - completionBlock([.handshakeReconnectGiveUp(error: error)]) + deinit { + cancelTask() + } +} + +// MARK: - ReceiveReconnectEffect + +class ReceiveReconnectEffect: EffectHandler { + private let subscribeEffect: SubscribeEffect + private let timerEffect: TimerEffect? + private let error: PubNubError + + init( + request: SubscribeRequest, + listeners: [BaseSubscriptionListener], + error: PubNubError, + retryAttempt: Int + ) { + self.timerEffect = TimerEffect(interval: request.reconnectionDelay( + dueTo: error, + retryAttempt: retryAttempt + )) + self.subscribeEffect = SubscribeEffect( + request: request, + listeners: listeners, + onResponseReceived: { .receiveReconnectSuccess(cursor: $0.cursor, messages: $0.messages) }, + onErrorReceived: { .receiveReconnectFailure(error: $0) } + ) + self.error = error } - func onDelayExpired(notify completionBlock: @escaping ([Subscribe.Event]) -> Void) { - request.execute(onCompletion: { [weak self] in - guard let _ = self else { return } - switch $0 { - case .success(let response): - completionBlock([.handshakeReconnectSuccess(cursor: response.cursor)]) - case .failure(let error): - completionBlock([.handshakeReconnectFailure(error: error)]) + func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { + subscribeEffect.listeners.forEach { + $0.emit(subscribe: .connectionChanged(.reconnecting)) + } + guard let timerEffect = timerEffect else { + completionBlock([.receiveReconnectGiveUp(error: error)]); return + } + subscribeEffect.request.onAuthChallengeReceived = { [weak self] in + // Delay time for server to process connection after TLS handshake + DispatchQueue.global(qos: .default).asyncAfter(deadline: DispatchTime.now() + 0.05) { + self?.subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connected)) } } - }) + } + timerEffect.performTask { [weak self] _ in + self?.subscribeEffect.performTask(completionBlock: completionBlock) + } } func cancelTask() { - request.cancel() - workItem?.cancel() + timerEffect?.cancelTask() + subscribeEffect.cancelTask() } deinit { @@ -116,45 +173,50 @@ class HandshakeReconnectEffect: DelayedEffectHandler { } } -// MARK: - Receiving Reconnect Effect +// MARK: - SubscribeEffect -class ReceiveReconnectEffect: DelayedEffectHandler { - typealias Event = Subscribe.Event - +fileprivate class SubscribeEffect: EffectHandler { let request: SubscribeRequest - let retryAttempt: Int - let error: SubscribeError - var workItem: DispatchWorkItem? - - init(request: SubscribeRequest, error: SubscribeError, retryAttempt: Int) { + let listeners: [BaseSubscriptionListener] + let onResponseReceived: (SubscribeResponse) -> Subscribe.Event + let onErrorReceived: (PubNubError) -> Subscribe.Event + + init( + request: SubscribeRequest, + listeners: [BaseSubscriptionListener], + onResponseReceived: @escaping ((SubscribeResponse) -> Subscribe.Event), + onErrorReceived: @escaping ((PubNubError) -> Subscribe.Event) + ) { self.request = request - self.error = error - self.retryAttempt = retryAttempt - } - - func delayInterval() -> TimeInterval? { - request.reconnectionDelay(dueTo: error, with: retryAttempt) + self.listeners = listeners + self.onResponseReceived = onResponseReceived + self.onErrorReceived = onErrorReceived } - func onEarlyExit(notify completionBlock: @escaping ([Subscribe.Event]) -> Void) { - completionBlock([.receiveReconnectGiveUp(error: error)]) - } - - func onDelayExpired(notify completionBlock: @escaping ([Subscribe.Event]) -> Void) { + func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { request.execute(onCompletion: { [weak self] in - guard let _ = self else { return } + guard let selfRef = self else { return } switch $0 { case .success(let response): - completionBlock([.receiveReconnectSuccess(cursor: response.cursor, messages: response.messages)]) + selfRef.listeners.forEach { + $0.emit(subscribe: .responseReceived( + SubscribeResponseHeader( + channels: selfRef.request.channels.map { PubNubChannel(channel: $0)}, + groups: selfRef.request.groups.map { PubNubChannel(channel: $0)}, + previous: SubscribeCursor(timetoken: selfRef.request.timetoken, region: selfRef.request.region), + next: response.cursor + )) + ) + } + completionBlock([selfRef.onResponseReceived(response)]) case .failure(let error): - completionBlock([.receiveReconnectFailure(error: error)]) + completionBlock([selfRef.onErrorReceived(error)]) } }) } func cancelTask() { request.cancel() - workItem?.cancel() } deinit { diff --git a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeError.swift b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeError.swift deleted file mode 100644 index 7ea4c052..00000000 --- a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeError.swift +++ /dev/null @@ -1,25 +0,0 @@ -// -// SubscribeError.swift -// -// Copyright (c) PubNub Inc. -// All rights reserved. -// -// This source code is licensed under the license found in the -// LICENSE file in the root directory of this source tree. -// - -import Foundation - -struct SubscribeError: Error, Equatable { - let underlying: PubNubError - let urlResponse: HTTPURLResponse? - - init(underlying: PubNubError, urlResponse: HTTPURLResponse? = nil) { - self.underlying = underlying - self.urlResponse = urlResponse - } - - static func == (lhs: SubscribeError, rhs: SubscribeError) -> Bool { - lhs.underlying == rhs.underlying - } -} diff --git a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift index e48057a9..6d6604fc 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift @@ -11,33 +11,41 @@ import Foundation struct SubscribeInput: Equatable { - private let channels: [String: PubNubChannel] - private let groups: [String: PubNubChannel] + private let channelEntries: [String: PubNubChannel] + private let groupEntries: [String: PubNubChannel] init(channels: [PubNubChannel] = [], groups: [PubNubChannel] = []) { - self.channels = channels.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) } - self.groups = groups.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) } + self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) } + self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) } } private init(channels: [String: PubNubChannel], groups: [String: PubNubChannel]) { - self.channels = channels - self.groups = groups + self.channelEntries = channels + self.groupEntries = groups } var isEmpty: Bool { - channels.isEmpty && groups.isEmpty + channelEntries.isEmpty && groupEntries.isEmpty } - var subscribedChannels: [String] { - channels.map { $0.key } + var channels: [PubNubChannel] { + Array(channelEntries.values) } - var subscribedGroups: [String] { - groups.map { $0.key } + var groups: [PubNubChannel] { + Array(groupEntries.values) } - var allSubscribedChannels: [String] { - channels.reduce(into: [String]()) { result, entry in + var subscribedChannelNames: [String] { + channelEntries.map { $0.key } + } + + var subscribedGroupNames: [String] { + groupEntries.map { $0.key } + } + + var allSubscribedChannelNames: [String] { + channelEntries.reduce(into: [String]()) { result, entry in result.append(entry.value.id) if entry.value.isPresenceSubscribed { result.append(entry.value.presenceId) @@ -45,8 +53,8 @@ struct SubscribeInput: Equatable { } } - var allSubscribedGroups: [String] { - groups.reduce(into: [String]()) { result, entry in + var allSubscribedGroupNames: [String] { + groupEntries.reduce(into: [String]()) { result, entry in result.append(entry.value.id) if entry.value.isPresenceSubscribed { result.append(entry.value.presenceId) @@ -54,8 +62,8 @@ struct SubscribeInput: Equatable { } } - var presenceSubscribedChannels: [String] { - channels.compactMap { + var presenceSubscribedChannelNames: [String] { + channelEntries.compactMap { if $0.value.isPresenceSubscribed { return $0.value.id } else { @@ -64,8 +72,8 @@ struct SubscribeInput: Equatable { } } - var presenceSubscribedGroups: [String] { - groups.compactMap { + var presenceSubscribedGroupNames: [String] { + groupEntries.compactMap { if $0.value.isPresenceSubscribed { return $0.value.id } else { @@ -75,49 +83,67 @@ struct SubscribeInput: Equatable { } var totalSubscribedCount: Int { - channels.count + groups.count + channelEntries.count + groupEntries.count } - - static func +(lhs: SubscribeInput, rhs: SubscribeInput) -> SubscribeInput { - var currentChannels = lhs.channels - var currentGroups = rhs.groups - rhs.channels.values.forEach { _ = currentChannels.insert($0) } - lhs.groups.values.forEach { _ = currentGroups.insert($0) } + func adding( + channels: [PubNubChannel], + and groups: [PubNubChannel] + ) -> ( + newInput: SubscribeInput, + insertedChannels: [PubNubChannel], + insertedGroups: [PubNubChannel] + ) { + var currentChannels = channelEntries + var currentGroups = groupEntries + + let insertedChannels = channels.filter { currentChannels.insert($0) } + let insertedGroups = groups.filter { currentGroups.insert($0) } - return SubscribeInput( - channels: currentChannels, - groups: currentGroups + return ( + newInput: SubscribeInput(channels: currentChannels, groups: currentGroups), + insertedChannels: insertedChannels, + insertedGroups: insertedGroups ) } - static func -(lhs: SubscribeInput, rhs: (channels: [String], groups: [String])) -> SubscribeInput { - var currentChannels = lhs.channels - var currentGroups = lhs.groups + func removing( + channels: [String], + and groups: [String] + ) -> ( + newInput: SubscribeInput, + removedChannels: [PubNubChannel], + removedGroups: [PubNubChannel] + ) { + var currentChannels = channelEntries + var currentGroups = groupEntries - rhs.channels.forEach { + let removedChannels = channels.compactMap { if $0.isPresenceChannelName { - currentChannels.unsubscribePresence($0.trimmingPresenceChannelSuffix) + return currentChannels.unsubscribePresence($0.trimmingPresenceChannelSuffix) } else { - currentChannels.removeValue(forKey: $0) + return currentChannels.removeValue(forKey: $0) } } - rhs.groups.forEach { + + let removedGroups = groups.compactMap { if $0.isPresenceChannelName { - currentGroups.unsubscribePresence($0.trimmingPresenceChannelSuffix) + return currentGroups.unsubscribePresence($0.trimmingPresenceChannelSuffix) } else { - currentGroups.removeValue(forKey: $0) + return currentGroups.removeValue(forKey: $0) } } - return SubscribeInput( - channels: currentChannels, - groups: currentGroups + + return ( + newInput: SubscribeInput(channels: currentChannels, groups: currentGroups), + removedChannels: removedChannels, + removedGroups: removedGroups ) } static func ==(lhs: SubscribeInput, rhs: SubscribeInput) -> Bool { - let equalChannels = lhs.allSubscribedChannels.sorted(by: <) == rhs.allSubscribedChannels.sorted(by: <) - let equalGroups = lhs.allSubscribedGroups.sorted(by: <) == rhs.allSubscribedGroups.sorted(by: <) + let equalChannels = lhs.allSubscribedChannelNames.sorted(by: <) == rhs.allSubscribedChannelNames.sorted(by: <) + let equalGroups = lhs.allSubscribedGroupNames.sorted(by: <) == rhs.allSubscribedGroupNames.sorted(by: <) return equalChannels && equalGroups } @@ -132,6 +158,28 @@ extension Dictionary where Key == String, Value == PubNubChannel { self[channel.id] = channel return true } + + func difference(_ dict: [Key:Value]) -> [Key: Value] { + let entriesInSelfAndNotInDict = filter { + dict[$0.0] != self[$0.0] + } + return entriesInSelfAndNotInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in + var res = res + res[entry.0] = entry.1 + return res + } + } + + func intersection(_ dict: [Key:Value]) -> [Key: Value] { + let entriesInSelfAndInDict = filter { + dict[$0.0] == self[$0.0] + } + return entriesInSelfAndInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in + var res = res + res[entry.0] = entry.1 + return res + } + } // Updates current Dictionary with the new channel value unsubscribed from Presence. // Returns the updated value if the corresponding entry matching the passed `id:` was found, otherwise `nil` diff --git a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeRequest.swift b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeRequest.swift index 8b1935bd..7b35ad65 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeRequest.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeRequest.swift @@ -23,9 +23,8 @@ class SubscribeRequest { private var request: RequestReplaceable? - var retryLimit: UInt { - configuration.automaticRetry?.retryLimit ?? 0 - } + var retryLimit: UInt { configuration.automaticRetry?.retryLimit ?? 0 } + var onAuthChallengeReceived: (() -> Void)? init( configuration: SubscriptionConfiguration, @@ -45,9 +44,15 @@ class SubscribeRequest { self.region = region self.session = session self.sessionResponseQueue = sessionResponseQueue + + if let sessionListener = session.sessionStream as? SessionListener { + sessionListener.sessionDidReceiveChallenge = { [weak self] _, _ in + self?.onAuthChallengeReceived?() + } + } } - func reconnectionDelay(dueTo error: SubscribeError, with retryAttempt: Int) -> TimeInterval? { + func reconnectionDelay(dueTo error: PubNubError, retryAttempt: Int) -> TimeInterval? { guard let automaticRetry = configuration.automaticRetry else { return nil } @@ -57,17 +62,20 @@ class SubscribeRequest { guard automaticRetry.retryLimit > retryAttempt else { return nil } - guard let underlyingError = error.underlying.underlying else { + guard let underlyingError = error.underlying else { return automaticRetry.policy.delay(for: retryAttempt) } + guard let urlResponse = error.affected.findFirst(by: PubNubError.AffectedValue.response) else { + return nil + } let shouldRetry = automaticRetry.shouldRetry( - response: error.urlResponse, + response: urlResponse, error: underlyingError ) return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil } - func execute(onCompletion: @escaping (Result) -> Void) { + func execute(onCompletion: @escaping (Result) -> Void) { let router = SubscribeRouter( .subscribe( channels: channels, @@ -87,21 +95,19 @@ class SubscribeRequest { request?.validate().response( on: sessionResponseQueue, decoder: SubscribeDecoder(), - completion: { [weak self] result in + completion: { result in switch result { case .success(let response): onCompletion(.success(response.payload)) case .failure(let error): - onCompletion(.failure(SubscribeError( - underlying: error as? PubNubError ?? PubNubError(.unknown, underlying: error), - urlResponse: self?.request?.urlResponse - ))) + onCompletion(.failure(error as? PubNubError ?? PubNubError(.unknown, underlying: error))) } } ) } func cancel() { + onAuthChallengeReceived = nil request?.cancel(PubNubError(.clientCancelled)) } diff --git a/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift b/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift index bf02446b..faa225e2 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift @@ -48,14 +48,14 @@ extension Subscribe { let input: SubscribeInput let cursor: SubscribeCursor let retryAttempt: Int - let reason: SubscribeError + let reason: PubNubError let connectionStatus = ConnectionStatus.disconnected } struct HandshakeFailedState: SubscribeState { let input: SubscribeInput let cursor: SubscribeCursor - let error: SubscribeError + let error: PubNubError let connectionStatus = ConnectionStatus.disconnected } @@ -69,7 +69,7 @@ extension Subscribe { let input: SubscribeInput let cursor: SubscribeCursor let retryAttempt: Int - let reason: SubscribeError + let reason: PubNubError let connectionStatus = ConnectionStatus.connected } @@ -82,7 +82,7 @@ extension Subscribe { struct ReceiveFailedState: SubscribeState { let input: SubscribeInput let cursor: SubscribeCursor - let error: SubscribeError + let error: PubNubError let connectionStatus = ConnectionStatus.disconnected } @@ -100,15 +100,15 @@ extension Subscribe { case subscriptionChanged(channels: [String], groups: [String]) case subscriptionRestored(channels: [String], groups: [String], cursor: SubscribeCursor) case handshakeSuccess(cursor: SubscribeCursor) - case handshakeFailure(error: SubscribeError) + case handshakeFailure(error: PubNubError) case handshakeReconnectSuccess(cursor: SubscribeCursor) - case handshakeReconnectFailure(error: SubscribeError) - case handshakeReconnectGiveUp(error: SubscribeError) + case handshakeReconnectFailure(error: PubNubError) + case handshakeReconnectGiveUp(error: PubNubError) case receiveSuccess(cursor: SubscribeCursor, messages: [SubscribeMessagePayload]) - case receiveFailure(error: SubscribeError) + case receiveFailure(error: PubNubError) case receiveReconnectSuccess(cursor: SubscribeCursor, messages: [SubscribeMessagePayload]) - case receiveReconnectFailure(error: SubscribeError) - case receiveReconnectGiveUp(error: SubscribeError) + case receiveReconnectFailure(error: PubNubError) + case receiveReconnectGiveUp(error: PubNubError) case disconnect case reconnect case unsubscribeAll @@ -119,7 +119,7 @@ extension Subscribe { struct ConnectionStatusChange: Equatable { let oldStatus: ConnectionStatus let newStatus: ConnectionStatus - let error: SubscribeError? + let error: PubNubError? } } @@ -140,9 +140,9 @@ extension Subscribe { extension Subscribe { enum Invocation: AnyEffectInvocation { case handshakeRequest(channels: [String], groups: [String]) - case handshakeReconnect(channels: [String], groups: [String], retryAttempt: Int, reason: SubscribeError) + case handshakeReconnect(channels: [String], groups: [String], retryAttempt: Int, reason: PubNubError) case receiveMessages(channels: [String], groups: [String], cursor: SubscribeCursor) - case receiveReconnect(channels: [String], groups: [String], cursor: SubscribeCursor, retryAttempt: Int, reason: SubscribeError) + case receiveReconnect(channels: [String], groups: [String], cursor: SubscribeCursor, retryAttempt: Int, reason: PubNubError) case emitStatus(change: Subscribe.ConnectionStatusChange) case emitMessages(events: [SubscribeMessagePayload], forCursor: SubscribeCursor) diff --git a/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift b/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift index e9bf10dd..b0d16514 100644 --- a/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift +++ b/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift @@ -77,8 +77,8 @@ class SubscribeTransition: TransitionProtocol { return [ .managed( .handshakeRequest( - channels: state.input.allSubscribedChannels, - groups: state.input.allSubscribedGroups + channels: state.input.allSubscribedChannelNames, + groups: state.input.allSubscribedGroupNames ) ) ] @@ -86,8 +86,8 @@ class SubscribeTransition: TransitionProtocol { return [ .managed( .handshakeReconnect( - channels: state.input.allSubscribedChannels, - groups: state.input.allSubscribedGroups, + channels: state.input.allSubscribedChannelNames, + groups: state.input.allSubscribedGroupNames, retryAttempt: state.retryAttempt, reason: state.reason ) @@ -97,8 +97,8 @@ class SubscribeTransition: TransitionProtocol { return [ .managed( .receiveMessages( - channels: state.input.allSubscribedChannels, - groups: state.input.allSubscribedGroups, + channels: state.input.allSubscribedChannelNames, + groups: state.input.allSubscribedGroupNames, cursor: state.cursor ) ) @@ -107,8 +107,8 @@ class SubscribeTransition: TransitionProtocol { return [ .managed( .receiveReconnect( - channels: state.input.allSubscribedChannels, - groups: state.input.allSubscribedGroups, + channels: state.input.allSubscribedChannelNames, + groups: state.input.allSubscribedGroupNames, cursor: state.cursor, retryAttempt: state.retryAttempt, reason: state.reason @@ -226,7 +226,7 @@ fileprivate extension SubscribeTransition { fileprivate extension SubscribeTransition { func setHandshakeReconnectingState( from state: State, - error: SubscribeError + error: PubNubError ) -> TransitionResult { return TransitionResult( state: Subscribe.HandshakeReconnectingState( @@ -242,7 +242,7 @@ fileprivate extension SubscribeTransition { fileprivate extension SubscribeTransition { func setHandshakeFailedState( from state: State, - error: SubscribeError + error: PubNubError ) -> TransitionResult { return TransitionResult( state: Subscribe.HandshakeFailedState( @@ -291,7 +291,7 @@ fileprivate extension SubscribeTransition { fileprivate extension SubscribeTransition { func setReceiveReconnectingState( from state: State, - error: SubscribeError + error: PubNubError ) -> TransitionResult { return TransitionResult( state: Subscribe.ReceiveReconnectingState( @@ -307,7 +307,7 @@ fileprivate extension SubscribeTransition { fileprivate extension SubscribeTransition { func setReceiveFailedState( from state: State, - error: SubscribeError + error: PubNubError ) -> TransitionResult { guard let state = state as? Subscribe.ReceiveReconnectingState else { return TransitionResult(state: state) diff --git a/Sources/PubNub/Events/Subscription/SubscriptionStream.swift b/Sources/PubNub/Events/Subscription/SubscriptionStream.swift index b2310374..11277b0b 100644 --- a/Sources/PubNub/Events/Subscription/SubscriptionStream.swift +++ b/Sources/PubNub/Events/Subscription/SubscriptionStream.swift @@ -11,13 +11,12 @@ import Foundation /// A channel or group that has successfully been subscribed or unsubscribed +@available(*, deprecated, message: "This enumeration will be removed in future versions") public enum SubscriptionChangeEvent { /// The channels or groups that have successfully been subscribed case subscribed(channels: [PubNubChannel], groups: [PubNubChannel]) /// The response header for one or more subscription events - case responseHeader( - channels: [PubNubChannel], groups: [PubNubChannel], previous: SubscribeCursor?, next: SubscribeCursor? - ) + case responseHeader(channels: [PubNubChannel], groups: [PubNubChannel], previous: SubscribeCursor?, next: SubscribeCursor?) /// The channels or groups that have successfully been unsubscribed case unsubscribed(channels: [PubNubChannel], groups: [PubNubChannel]) @@ -34,8 +33,39 @@ public enum SubscriptionChangeEvent { } } +/// The header of a PubNub subscribe response for zero or more events +@available(*, deprecated, message: "This struct will be removed in future versions") +public struct SubscribeResponseHeader { + /// The channels that are actively subscribed + public let channels: [PubNubChannel] + /// The groups that are actively subscribed + public let groups: [PubNubChannel] + /// The most recent successful Timetoken used in subscriptionstatus + public let previous: SubscribeCursor? + /// Timetoken that will be used on the next subscription cycle + public let next: SubscribeCursor? + + public init( + channels: [PubNubChannel], + groups: [PubNubChannel], + previous: SubscribeCursor?, + next: SubscribeCursor? + ) { + self.channels = channels + self.groups = groups + self.previous = previous + self.next = next + } +} + /// Local events emitted from the Subscribe method public enum PubNubSubscribeEvent { + /// A change in the Channel or Group state occured + @available(*, deprecated, message: "This case will be removed in future versions") + case subscriptionChanged(SubscriptionChangeEvent) + /// A subscribe response was received + @available(*, deprecated, message: "This case will be removed in future versions") + case responseReceived(SubscribeResponseHeader) /// The connection status of the PubNub subscription was changed case connectionChanged(ConnectionStatus) /// An error was received @@ -53,6 +83,8 @@ public enum PubNubCoreEvent { case signalReceived(PubNubMessage) /// A change in the subscription connection has occurred case connectionStatusChanged(ConnectionStatus) + /// A change in the subscribed channels or groups has occurred + case subscriptionChanged(SubscriptionChangeEvent) /// A presence change has been received case presenceChanged(PubNubPresenceChange) /// A User object has been updated @@ -125,6 +157,8 @@ public final class CoreListener: BaseSubscriptionListener { public var didReceiveBatchSubscription: (([SubscriptionEvent]) -> Void)? /// Receiver for all subscription events public var didReceiveSubscription: ((SubscriptionEvent) -> Void)? + /// Receiver for changes in the subscribe/unsubscribe status of channels/groups + public var didReceiveSubscriptionChange: ((SubscriptionChangeEvent) -> Void)? /// Receiver for status (Connection & Error) events public var didReceiveStatus: ((StatusEvent) -> Void)? /// Receiver for presence events @@ -144,6 +178,17 @@ public final class CoreListener: BaseSubscriptionListener { override public func emit(subscribe event: PubNubSubscribeEvent) { switch event { + case let .subscriptionChanged(changeEvent): + emitDidReceive(subscription: [.subscriptionChanged(changeEvent)]) + case let .responseReceived(header): + emitDidReceive(subscription: [.subscriptionChanged( + .responseHeader( + channels: header.channels, + groups: header.groups, + previous: header.previous, + next: header.next + ) + )]) case let .connectionChanged(status): emitDidReceive(subscription: [.connectionStatusChanged(status)]) case let .errorReceived(error): @@ -223,6 +268,8 @@ public final class CoreListener: BaseSubscriptionListener { self?.didReceiveSignal?(signal) case let .connectionStatusChanged(status): self?.didReceiveStatus?(.success(status)) + case let .subscriptionChanged(change): + self?.didReceiveSubscriptionChange?(change) case let .presenceChanged(presence): self?.didReceivePresence?(presence) case let .uuidMetadataSet(metadata): diff --git a/Sources/PubNub/Networking/Request/Operators/AutomaticRetry.swift b/Sources/PubNub/Networking/Request/Operators/AutomaticRetry.swift index 8fca00a0..65e7d756 100644 --- a/Sources/PubNub/Networking/Request/Operators/AutomaticRetry.swift +++ b/Sources/PubNub/Networking/Request/Operators/AutomaticRetry.swift @@ -30,16 +30,22 @@ public struct AutomaticRetry: RequestOperator, Hashable { static let minDelay: UInt = 2 /// Provides the action taken when a retry is to be performed - public enum ReconnectionPolicy: Hashable { + public enum ReconnectionPolicy: Hashable, Equatable { /// Exponential backoff with base/scale factor of 2, and a 150s max delay - public static let defaultExponential: ReconnectionPolicy = .exponential(minDelay: minDelay, maxDelay: 150) - /// Linear reconnect every 2 seconds - public static let defaultLinear: ReconnectionPolicy = .linear(delay: Double(minDelay)) + public static let defaultExponential: ReconnectionPolicy = .legacyExponential(base: 2, scale: 2, maxDelay: 300) + /// Linear reconnect every 3 seconds + public static let defaultLinear: ReconnectionPolicy = .linear(delay: Double(3)) + /// Reconnect with an exponential backoff + @available(*, unavailable, renamed: "legacyExponential(base:scale:maxDelay:)") + case exponential(base: UInt, scale: Double, maxDelay: UInt) /// Reconnect with an exponential backoff case exponential(minDelay: UInt, maxDelay: UInt) /// Attempt to reconnect every X seconds case linear(delay: Double) + /// Reconnect with an exponential backoff + @available(*, deprecated, message: "Use exponential(minDelay:maxDelay:) instead") + case legacyExponential(base: UInt, scale: Double, maxDelay: UInt) func delay(for retryAttempt: Int) -> TimeInterval { /// Generates a random interval that's added to the final value @@ -47,28 +53,30 @@ public struct AutomaticRetry: RequestOperator, Hashable { let randomDelay = Double.random(in: 0...1) switch self { + case let .legacyExponential(base, scale, maxDelay): + return legacyExponentialBackoffDelay(for: base, scale: scale, maxDelay: maxDelay, current: retryAttempt) + randomDelay case let .exponential(minDelay, maxDelay): - return exponentialBackoffDelay(minDelay: minDelay, maxDelay: maxDelay, current: retryAttempt) + randomDelay + return min(Double(maxDelay), Double(minDelay) * pow(2, Double(retryAttempt))) + randomDelay case let .linear(delay): return delay + randomDelay } } - func exponentialBackoffDelay(minDelay: UInt, maxDelay: UInt, current retryCount: Int) -> Double { - return min(Double(maxDelay), Double(minDelay) * pow(2, Double(retryCount))) + func legacyExponentialBackoffDelay(for base: UInt, scale: Double, maxDelay: UInt, current retryCount: Int) -> Double { + max(min(pow(Double(base), Double(retryCount)) * scale, Double(maxDelay)), Double(AutomaticRetry.minDelay)) } } - /// List of known endpoint groups (by context) + /// List of known endpoint groups (by context) possible to retry public enum Endpoint { /// Sending a message case messageSend - /// Subscribing to channels and channel groups to receive realtime updates + /// Subscribing to channels and channel groups case subscribe - /// Groups Presence related methods + /// Presence related methods case presence - /// Groups Files related methods - /// - Important: Downloading and uploading a File isn't included + /// List Files, publish a File, remove a File + /// - Important: File download and upload aren't part of retrying. case files /// History related methods case messageStorage @@ -126,41 +134,54 @@ public struct AutomaticRetry: RequestOperator, Hashable { .messageActions ] ) { + self.retryLimit = Self.validate( + value: UInt(retryLimit), + using: retryLimit < 10, + replaceOnFailure: UInt(10), + warningMessage: "The `retryLimit` must be less than or equal 10" + ) + switch policy { case let .exponential(minDelay, maxDelay): - var finalMinDelay: UInt = minDelay - var finalMaxDelay: UInt = maxDelay - var finalRetryLimit: UInt = retryLimit - - if finalRetryLimit > 10 { - PubNub.log.warn("The `retryLimit` for exponential policy must be less than or equal 10") - finalRetryLimit = 10 - } - if finalMinDelay < Self.minDelay { - PubNub.log.warn("The `minDelay` must be a minimum of \(Self.minDelay)") - finalMinDelay = Self.minDelay - } - if finalMinDelay > finalMaxDelay { - PubNub.log.warn("The `minDelay` \"\(minDelay)\" must be greater or equal `maxDelay` \"\(maxDelay)\"") - finalMaxDelay = minDelay - } - self.retryLimit = finalRetryLimit - self.policy = .exponential(minDelay: finalMinDelay, maxDelay: finalMaxDelay) - + let validatedMinDelay = Self.validate( + value: minDelay, + using: minDelay > Self.minDelay, + replaceOnFailure: Self.minDelay, + warningMessage: "The `minDelay` must be a minimum of \(Self.minDelay)" + ) + let validatedMaxDelay = Self.validate( + value: maxDelay, + using: maxDelay >= minDelay, + replaceOnFailure: Self.minDelay, + warningMessage: "The `maxDelay` must be greater than or equal \(Self.minDelay)" + ) + self.policy = .exponential( + minDelay: validatedMinDelay, + maxDelay: validatedMaxDelay + ) case let .linear(delay): - var finalRetryLimit = retryLimit - var finalDelay = delay - - if finalRetryLimit > 10 { - PubNub.log.warn("The `retryLimit` for linear policy must be less than or equal 10") - finalRetryLimit = 10 - } - if finalDelay < 0 || UInt(finalDelay) < Self.minDelay { - PubNub.log.warn("The `linear.delay` must be greater than or equal \(Self.minDelay).") - finalDelay = Double(Self.minDelay) - } - self.retryLimit = finalRetryLimit - self.policy = .linear(delay: finalDelay) + self.policy = .linear(delay: Self.validate( + value: delay, + using: delay >= Double(Self.minDelay), + replaceOnFailure: Double(Self.minDelay), + warningMessage: "The `linear.delay` must be greater than or equal \(Self.minDelay)." + )) + case let .legacyExponential(base, scale, maxDelay): + self.policy = .legacyExponential( + base: Self.validate( + value: base, + using: base >= 2, + replaceOnFailure: 2, + warningMessage: "The `exponential.base` must be a minimum of 2." + ), + scale: Self.validate( + value: scale, + using: scale > 0, + replaceOnFailure: 0, + warningMessage: "The `exponential.scale` must be a positive value." + ), + maxDelay: maxDelay + ) } self.retryableHTTPStatusCodes = retryableHTTPStatusCodes @@ -204,3 +225,12 @@ public struct AutomaticRetry: RequestOperator, Hashable { return false } } + +private extension AutomaticRetry { + static func validate(value: T, using condition: Bool, replaceOnFailure: T, warningMessage message: String) -> T { + guard condition else { + PubNub.log.warn(message); return replaceOnFailure + } + return value + } +} diff --git a/Sources/PubNub/Subscription/ConnectionStatus.swift b/Sources/PubNub/Subscription/ConnectionStatus.swift index a10f4ce0..7ed607d1 100644 --- a/Sources/PubNub/Subscription/ConnectionStatus.swift +++ b/Sources/PubNub/Subscription/ConnectionStatus.swift @@ -12,19 +12,25 @@ import Foundation /// Status of a connection to a remote system public enum ConnectionStatus: Equatable { + /// Attempting to connect to a remote system + @available(*, deprecated, message: "This case will be removed in future versions") + case connecting /// Successfully connected to a remote system case connected /// Explicit disconnect from a remote system case disconnected + /// Attempting to reconnect to a remote system + @available(*, deprecated, message: "This case will be removed in future versions") + case reconnecting /// Unexpected disconnect from a remote system case disconnectedUnexpectedly - /// Unable to establish initial connection + /// Unable to establish initial connection. Applies if `enableEventEngine` in `PubNubConfiguration` is true. case connectionError /// If the connection is connected or attempting to connect public var isActive: Bool { switch self { - case .connected: + case .connecting, .connected, .reconnecting: return true default: return false @@ -42,15 +48,29 @@ public enum ConnectionStatus: Equatable { func canTransition(to state: ConnectionStatus) -> Bool { switch (self, state) { - case (.connected, .disconnected): + case (.connecting, .reconnecting): + return false + case (.connecting, _): return true - case (.disconnected, .connected): + case (.connected, .connecting): + return false + case (.connected, _): return true - case (.connected, .disconnectedUnexpectedly): + case (.reconnecting, .connecting): + return false + case (.reconnecting, _): return true - case (.disconnected, .connectionError): + case (.disconnected, .connecting): return true - default: + case (.disconnected, _): + return false + case (.disconnectedUnexpectedly, .connecting): + return true + case (.disconnectedUnexpectedly, _): + return false + case (.connectionError, .connecting): + return true + case (.connectionError, _): return false } } diff --git a/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift index 3e8488a0..38163d92 100644 --- a/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift @@ -40,11 +40,11 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { } var subscribedChannels: [String] { - subscribeEngine.state.input.subscribedChannels + subscribeEngine.state.input.subscribedChannelNames } var subscribedChannelGroups: [String] { - subscribeEngine.state.input.subscribedGroups + subscribeEngine.state.input.subscribedGroupNames } var subscriptionCount: Int { @@ -98,8 +98,8 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { private func onFilterExpressionChanged() { let currentState = subscribeEngine.state - let channels = currentState.input.allSubscribedChannels - let groups = currentState.input.allSubscribedGroups + let channels = currentState.input.allSubscribedChannelNames + let groups = currentState.input.allSubscribedGroupNames sendSubscribeEvent(event: .subscriptionChanged(channels: channels, groups: groups)) } @@ -112,32 +112,45 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { at cursor: SubscribeCursor?, withPresence: Bool ) { - let newInput = subscribeEngine.state.input + SubscribeInput( - channels: channels.map { PubNubChannel(id: $0, withPresence: withPresence) }, - groups: groups.map { PubNubChannel(id: $0, withPresence: withPresence) } - ) - if let cursor = cursor, cursor.timetoken != 0 { - sendSubscribeEvent(event: .subscriptionRestored( - channels: newInput.allSubscribedChannels, - groups: newInput.allSubscribedGroups, - cursor: cursor - )) - } else { - sendSubscribeEvent(event: .subscriptionChanged( - channels: newInput.allSubscribedChannels, - groups: newInput.allSubscribedGroups + let currentInput = subscribeEngine.state.input + let newChannels = channels.map { PubNubChannel(id: $0, withPresence: withPresence) } + let newGroups = groups.map { PubNubChannel(id: $0, withPresence: withPresence) } + let addingResult = currentInput.adding(channels: newChannels, and: newGroups) + let newInput = addingResult.newInput + + if newInput != currentInput { + if let cursor = cursor, cursor.timetoken != 0 { + sendSubscribeEvent(event: .subscriptionRestored( + channels: newInput.allSubscribedChannelNames, + groups: newInput.allSubscribedGroupNames, + cursor: cursor + )) + } else { + sendSubscribeEvent(event: .subscriptionChanged( + channels: newInput.allSubscribedChannelNames, + groups: newInput.allSubscribedGroupNames + )) + } + sendPresenceEvent(event: .joined( + channels: newInput.subscribedChannelNames, + groups: newInput.subscribedGroupNames )) + + notify { + $0.emit(subscribe: .subscriptionChanged( + .subscribed( + channels: addingResult.insertedChannels, + groups: addingResult.insertedGroups + )) + ) + } } - sendPresenceEvent(event: .joined( - channels: newInput.subscribedChannels, - groups: newInput.subscribedGroups - )) } func reconnect(at cursor: SubscribeCursor?) { let input = subscribeEngine.state.input - let channels = input.allSubscribedChannels - let groups = input.allSubscribedGroups + let channels = input.allSubscribedChannelNames + let groups = input.allSubscribedGroupNames if let cursor = cursor { sendSubscribeEvent(event: .subscriptionRestored( @@ -158,27 +171,51 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { // MARK: - Unsubscribe func unsubscribe(from channels: [String], and groups: [String], presenceOnly: Bool) { - let newInput = subscribeEngine.state.input - ( - channels: channels.map { presenceOnly ? $0.presenceChannelName : $0 }, - groups: groups.map { presenceOnly ? $0.presenceChannelName : $0 } - ) - - presenceStateContainer.removeState(forChannels: channels) - presenceStateContainer.removeState(forGroups: groups) + let unsubscribedChannels = channels.map { presenceOnly ? $0.presenceChannelName : $0 } + let unsubscribedGroups = groups.map { presenceOnly ? $0.presenceChannelName : $0 } + let currentInput = subscribeEngine.state.input + let removingRes = subscribeEngine.state.input.removing(channels: unsubscribedChannels, and: unsubscribedGroups) + let newInput = removingRes.newInput - sendSubscribeEvent(event: .subscriptionChanged( - channels: newInput.allSubscribedChannels, - groups: newInput.allSubscribedGroups - )) - sendPresenceEvent(event: .left( - channels: channels, - groups: groups - )) + if newInput != currentInput { + if configuration.maintainPresenceState { + presenceStateContainer.removeState(forChannels: channels) + presenceStateContainer.removeState(forGroups: groups) + } + sendSubscribeEvent(event: .subscriptionChanged( + channels: newInput.allSubscribedChannelNames, + groups: newInput.allSubscribedGroupNames + )) + sendPresenceEvent(event: .left( + channels: channels, + groups: groups + )) + + notify { + $0.emit(subscribe: .subscriptionChanged( + .unsubscribed( + channels: removingRes.removedChannels, + groups: removingRes.removedGroups + )) + ) + } + } } func unsubscribeAll() { + let currentInput = subscribeEngine.state.input + sendSubscribeEvent(event: .unsubscribeAll) sendPresenceEvent(event: .leftAll) + + notify { + $0.emit(subscribe: .subscriptionChanged( + .unsubscribed( + channels: currentInput.channels, + groups: currentInput.groups + ) + )) + } } } diff --git a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy+Presence.swift b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy+Presence.swift index 23e3b9f3..bc8b02bf 100644 --- a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy+Presence.swift +++ b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy+Presence.swift @@ -21,14 +21,18 @@ extension LegacySubscriptionSessionStrategy { return } - let timer = Timer(fireAt: Date(timeIntervalSinceNow: Double(configuration.heartbeatInterval)), - interval: 0.0, - target: self, - selector: #selector(peformHeartbeatLoop), - userInfo: nil, - repeats: false) - - RunLoop.main.add(timer, forMode: .common) + let timer = Timer( + fireAt: Date(timeIntervalSinceNow: Double(configuration.heartbeatInterval)), + interval: 0.0, + target: self, + selector: #selector(peformHeartbeatLoop), + userInfo: nil, + repeats: false + ) + RunLoop.main.add( + timer, + forMode: .common + ) presenceTimer = timer } diff --git a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift index a7c0fecd..a34717cb 100644 --- a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift @@ -76,14 +76,27 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { var mutableSession = subscribeSession filterExpression = configuration.filterExpression - nonSubscribeSession = presenceSession responseQueue = DispatchQueue(label: "com.pubnub.subscription.response", qos: .default) sessionStream = SessionListener(queue: responseQueue) + // Add listener to session mutableSession.sessionStream = sessionStream longPollingSession = mutableSession + + sessionStream.didRetryRequest = { [weak self] _ in + self?.connectionStatus = .reconnecting + } + + sessionStream.sessionDidReceiveChallenge = { [weak self] _, _ in + if self?.connectionStatus == .reconnecting { + // Delay time for server to process connection after TLS handshake + DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 0.05) { + self?.connectionStatus = .connected + } + } + } } deinit { @@ -119,7 +132,7 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } if subscribeChange.didChange { - // notify { $0.emit(subscribe: .subscriptionChanged(subscribeChange)) } + notify { $0.emit(subscribe: .subscriptionChanged(subscribeChange)) } } if subscribeChange.didChange || !connectionStatus.isActive { @@ -127,11 +140,13 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } } - func reconnect(at cursor: SubscribeCursor?) { + /// Reconnect a disconnected subscription stream + /// - parameter timetoken: The timetoken to subscribe with + func reconnect(at cursor: SubscribeCursor? = nil) { if !connectionStatus.isActive { + connectionStatus = .connecting // Start subscribe loop performSubscribeLoop(at: cursor) - // Start presence heartbeat registerHeartbeatTimer() } else { @@ -150,7 +165,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { func stopSubscribeLoop(_ reason: PubNubError.Reason) -> Bool { // Cancel subscription requests request?.cancel(PubNubError(reason, router: request?.router)) - return connectionStatus.isActive } @@ -159,28 +173,28 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { let (channels, groups) = internalState.lockedWrite { state -> ([String], [String]) in (state.allSubscribedChannels, state.allSubscribedGroups) } - + // Don't start subscription if there no channels/groups if channels.isEmpty, groups.isEmpty { return } - + // Create Endpoing let router = SubscribeRouter( .subscribe( - channels: channels, groups: groups, channelStates: [:], timetoken: cursor?.timetoken, - region: cursor?.region.description, heartbeat: configuration.durationUntilTimeout, - filter: filterExpression - ), configuration: configuration + channels: channels, groups: groups, channelStates: [:], + timetoken: cursor?.timetoken, region: cursor?.region.description, + heartbeat: configuration.durationUntilTimeout, filter: filterExpression + ),configuration: configuration ) - + // Cancel previous request before starting new one stopSubscribeLoop(.longPollingRestart) - - // Will compre this in the error response to see if we need to restart - let nextSubscribe = longPollingSession - .request(with: router, requestOperator: configuration.automaticRetry) + + // Will compare this in the error response to see if we need to restart + let nextSubscribe = longPollingSession.request(with: router, requestOperator: configuration.automaticRetry) let currentSubscribeID = nextSubscribe.requestID + request = nextSubscribe request? @@ -219,6 +233,15 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { pubnubGroups[$0] = PubNubChannel(channel: $0) } } + + listener.emit(subscribe: .responseReceived( + SubscribeResponseHeader( + channels: pubnubChannels.values.map { $0 }, + groups: pubnubGroups.values.map { $0 }, + previous: cursor, + next: response.payload.cursor + )) + ) } // Attempt to detect missed messages due to queue overflow @@ -250,7 +273,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } self?.notify { $0.emit(batch: events) } - self?.previousTokenResponse = response.payload.cursor // Repeat the request @@ -258,12 +280,12 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { case let .failure(error): self?.notify { [unowned self] in $0.emit(subscribe: - .errorReceived(PubNubError.event(error, router: self?.request?.router)) + .errorReceived(PubNubError.event(error, router: self?.request?.router)) ) } - + if error.pubNubError?.reason == .clientCancelled || error.pubNubError?.reason == .longPollingRestart || - error.pubNubError?.reason == .longPollingReset { + error.pubNubError?.reason == .longPollingReset { if self?.subscriptionCount == 0 { self?.connectionStatus = .disconnected } else if self?.request?.requestID == currentSubscribeID { @@ -301,6 +323,9 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } if subscribeChange.didChange { + notify { + $0.emit(subscribe: .subscriptionChanged(subscribeChange)) + } // Call unsubscribe to cleanup remaining state items unsubscribeCleanup(subscribeChange: subscribeChange) } @@ -321,6 +346,9 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } if subscribeChange.didChange { + notify { + $0.emit(subscribe: .subscriptionChanged(subscribeChange)) + } // Cancel previous subscribe request. stopSubscribeLoop(.longPollingReset) // Call unsubscribe to cleanup remaining state items @@ -333,9 +361,11 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { if !configuration.supressLeaveEvents { switch subscribeChange { case let .unsubscribed(channels, groups): - presenceLeave(for: configuration.uuid, - on: channels.map { $0.id }, - and: groups.map { $0.id }) { [weak self] result in + presenceLeave( + for: configuration.uuid, + on: channels.map { $0.id }, + and: groups.map { $0.id } + ) { [weak self] result in switch result { case .success: if !channels.isEmpty { @@ -375,7 +405,6 @@ extension LegacySubscriptionSessionStrategy: EventStreamEmitter { func add(_ listener: ListenerType) { // Ensure that we cancel the previously attached token listener.token?.cancel() - // Add new token to the listener listener.token = ListenerToken { [weak self, weak listener] in if let listener = listener { diff --git a/Sources/PubNub/Subscription/SubscribeSessionFactory.swift b/Sources/PubNub/Subscription/SubscribeSessionFactory.swift index 14837edb..caad6a3a 100644 --- a/Sources/PubNub/Subscription/SubscribeSessionFactory.swift +++ b/Sources/PubNub/Subscription/SubscribeSessionFactory.swift @@ -22,7 +22,7 @@ import Foundation /// /// - Important: Having multiple `SubscriptionSession` instances will result in /// increase network usage and battery drain. -@available(*, deprecated, message: "Use methods from PubNub object to subscribe/unsubscribe") +@available(*, deprecated, message: "Use methods from a PubNub object to subscribe/unsubscribe") public class SubscribeSessionFactory { private typealias SessionMap = [Int: WeakBox] @@ -134,7 +134,7 @@ public class SubscribeSessionFactory { // MARK: - SubscriptionConfiguration /// The configuration used to determine the uniqueness of a `SubscriptionSession` -@available(*, deprecated, message: "Use PubNub object with PubNubConfiguration that matches the parameters below") +@available(*, deprecated, message: "Use a PubNub object with PubNubConfiguration that matches the parameters below") public protocol SubscriptionConfiguration: RouterConfiguration { /// Reconnection policy which will be used if/when a request fails var automaticRetry: AutomaticRetry? { get } diff --git a/Sources/PubNub/Subscription/SubscriptionSession.swift b/Sources/PubNub/Subscription/SubscriptionSession.swift index 6bd95a18..f1c57737 100644 --- a/Sources/PubNub/Subscription/SubscriptionSession.swift +++ b/Sources/PubNub/Subscription/SubscriptionSession.swift @@ -10,7 +10,7 @@ import Foundation -@available(*, deprecated, message: "Subscribe and unsubscribe using methods from PubNub object") +@available(*, deprecated, message: "Subscribe and unsubscribe using methods from a PubNub object") public class SubscriptionSession { /// An unique identifier for subscription session public var uuid: UUID { @@ -18,7 +18,7 @@ public class SubscriptionSession { } /// PSV2 feature to subscribe with a custom filter expression. - @available(*, deprecated, message: "Use `subscribeFilterExpression` from PubNub object") + @available(*, deprecated, message: "Use `subscribeFilterExpression` from a PubNub object") public var filterExpression: String? { get { strategy.filterExpression diff --git a/Tests/PubNubContractTest/PubNubContractTestCase.swift b/Tests/PubNubContractTest/PubNubContractTestCase.swift index 628aefec..f0883b3d 100644 --- a/Tests/PubNubContractTest/PubNubContractTestCase.swift +++ b/Tests/PubNubContractTest/PubNubContractTestCase.swift @@ -348,7 +348,7 @@ let defaultPublishKey = "demo-36" @discardableResult public func waitForPresenceChanges(_: PubNub, count: Int) -> [PubNubPresenceChange]? { if receivedPresenceChanges.count < count { - let receivedPresenceChangeExpectation = expectation(description: "Subscribe messages") + let receivedPresenceChangeExpectation = expectation(description: "Presence Events") receivedPresenceChangeExpectation.assertForOverFulfill = false presenceChangeReceivedHandler = { _, presenceChanges in if presenceChanges.count >= count { diff --git a/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift index 44d6a3d7..07b4b0c5 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift @@ -78,7 +78,7 @@ class EmitStatusTests: XCTestCase { statusChange: Subscribe.ConnectionStatusChange( oldStatus: .disconnected, newStatus: .connected, - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), listeners: listeners ) diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeEffectsTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeEffectsTests.swift index c11fc3e9..b66624a2 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeEffectsTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeEffectsTests.swift @@ -90,12 +90,9 @@ extension SubscribeEffectsTests { channels: ["channel1", "channel1-pnpres", "channel2"], groups: ["g1", "g2", "g2-pnpres"] ), expectedOutput: [ - .handshakeFailure(error: SubscribeError( - underlying: PubNubError( - .nameResolutionFailure, - underlying: URLError(.cannotFindHost) - ) - )) + .handshakeFailure( + error: PubNubError(.nameResolutionFailure, underlying: URLError(.cannotFindHost)) + ) ] ) } @@ -137,12 +134,7 @@ extension SubscribeEffectsTests { cursor: SubscribeCursor(timetoken: 111, region: 1) ), expectedOutput: [ .receiveFailure( - error: SubscribeError( - underlying: PubNubError( - .nameResolutionFailure, - underlying: URLError(.cannotFindHost) - ) - ) + error: PubNubError(.nameResolutionFailure, underlying: URLError(.cannotFindHost)) ) ]) } @@ -154,6 +146,8 @@ extension SubscribeEffectsTests { func test_HandshakeReconnectingSuccess() { let delayRange = 2.0...3.0 let urlError = URLError(.badServerResponse) + let httpUrlResponse = HTTPURLResponse(statusCode: 500)! + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError, affected: [.response(httpUrlResponse)]) mockResponse(subscribeResponse: SubscribeResponse( cursor: SubscribeCursor(timetoken: 12345, region: 1), @@ -165,7 +159,7 @@ extension SubscribeEffectsTests { channels: ["channel1", "channel1-pnpres", "channel2"], groups: ["g1", "g2", "g2-pnpres"], retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: pubNubError ), timeout: 2 * delayRange.upperBound, expectedOutput: [ @@ -180,6 +174,8 @@ extension SubscribeEffectsTests { func test_HandshakeReconnectingFailed() { let delayRange = 2.0...3.0 let urlError = URLError(.badServerResponse) + let httpUrlResponse = HTTPURLResponse(statusCode: 500)! + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError, affected: [.response(httpUrlResponse)]) mockResponse( errorIfAny: URLError(.cannotFindHost), @@ -191,16 +187,14 @@ extension SubscribeEffectsTests { channels: ["channel1", "channel1-pnpres", "channel2"], groups: ["g1", "g2", "g2-pnpres"], retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: pubNubError ), timeout: 2 * delayRange.upperBound, expectedOutput: [ .handshakeReconnectFailure( - error: SubscribeError( - underlying: PubNubError( - .nameResolutionFailure, - underlying: URLError(.cannotFindHost) - ) + error: PubNubError( + .nameResolutionFailure, + underlying: URLError(.cannotFindHost) ) ) ] @@ -217,19 +211,22 @@ extension SubscribeEffectsTests { channels: ["channel1", "channel1-pnpres", "channel2"], groups: ["g1", "g2", "g2-pnpres"], retryAttempt: 3, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: PubNubError(urlError.pubnubReason!, underlying: urlError) ), expectedOutput: [ .handshakeReconnectGiveUp( - error: SubscribeError(underlying: PubNubError(.badServerResponse)) + error: PubNubError(.badServerResponse) ) ] ) } func test_HandshakeReconnectIsDelayed() { - let delayRange = 2.0...3.0 let urlError = URLError(.badServerResponse) + let httpUrlResponse = HTTPURLResponse(statusCode: 500)! + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError, affected: [.response(httpUrlResponse)]) + + let delayRange = 2.0...3.0 let startDate = Date() mockResponse(subscribeResponse: SubscribeResponse( @@ -242,9 +239,9 @@ extension SubscribeEffectsTests { channels: ["channel1", "channel1-pnpres", "channel2"], groups: ["g1", "g2", "g2-pnpres"], retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: pubNubError ), - timeout: 2 * delayRange.upperBound, + timeout: 2.5 * delayRange.upperBound, expectedOutput: [ .handshakeReconnectSuccess( cursor: SubscribeCursor(timetoken: 12345, region: 1) @@ -263,9 +260,11 @@ extension SubscribeEffectsTests { extension SubscribeEffectsTests { func test_ReceiveReconnectingSuccess() { - let delayRange = 2.0...3.0 let urlError = URLError(.badServerResponse) - + let httpUrlResponse = HTTPURLResponse(statusCode: 500)! + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError, affected: [.response(httpUrlResponse)]) + let delayRange = 2.0...3.0 + mockResponse(subscribeResponse: SubscribeResponse( cursor: SubscribeCursor(timetoken: 12345, region: 1), messages: [firstMessage, secondMessage] @@ -277,7 +276,7 @@ extension SubscribeEffectsTests { groups: ["g1", "g2", "g2-pnpres"], cursor: SubscribeCursor(timetoken: 1111, region: 1), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: pubNubError ), timeout: 2 * delayRange.upperBound, expectedOutput: [ @@ -292,6 +291,8 @@ extension SubscribeEffectsTests { func test_ReceiveReconnectingFailure() { let delayRange = 2.0...3.0 let urlError = URLError(.badServerResponse) + let httpUrlResponse = HTTPURLResponse(statusCode: 500)! + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError, affected: [.response(httpUrlResponse)]) mockResponse( errorIfAny: URLError(.cannotFindHost), @@ -304,12 +305,12 @@ extension SubscribeEffectsTests { groups: ["g1", "g2", "g2-pnpres"], cursor: SubscribeCursor(timetoken: 1111, region: 1), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: pubNubError ), timeout: 2 * delayRange.upperBound, expectedOutput: [ .receiveReconnectFailure( - error: SubscribeError(underlying: PubNubError(.nameResolutionFailure)) + error: PubNubError(.nameResolutionFailure) ) ] ) @@ -330,19 +331,22 @@ extension SubscribeEffectsTests { groups: ["g1", "g2", "g2-pnpres"], cursor: SubscribeCursor(timetoken: 1111, region: 1), retryAttempt: 3, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: PubNubError(urlError.pubnubReason!, underlying: urlError) ), expectedOutput: [ .receiveReconnectGiveUp( - error: SubscribeError(underlying: PubNubError(.badServerResponse)) + error: PubNubError(.badServerResponse) ) ] ) } func test_ReceiveReconnectingIsDelayed() { - let delayRange = 2.0...3.0 let urlError = URLError(.badServerResponse) + let httpUrlResponse = HTTPURLResponse(statusCode: 500)! + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError, affected: [.response(httpUrlResponse)]) + + let delayRange = 2.0...3.0 let startDate = Date() mockResponse(subscribeResponse: SubscribeResponse( @@ -356,7 +360,7 @@ extension SubscribeEffectsTests { groups: ["g1", "g2", "g2-pnpres"], cursor: SubscribeCursor(timetoken: 1111, region: 1), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + reason: pubNubError ), timeout: 2 * delayRange.upperBound, expectedOutput: [ diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift index f482cd5f..6927f953 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift @@ -19,13 +19,13 @@ class SubscribeInputTests: XCTestCase { PubNubChannel(id: "second-channel") ]) - let expectedAllSubscribedChannels = ["first-channel", "second-channel"] - let expectedSubscribedChannels = ["first-channel", "second-channel"] + let expAllSubscribedChannelNames = ["first-channel", "second-channel"] + let expSubscribedChannelNames = ["first-channel", "second-channel"] - XCTAssertTrue(input.subscribedChannels.sorted(by: <).elementsEqual(expectedSubscribedChannels)) - XCTAssertTrue(input.allSubscribedChannels.sorted(by: <).elementsEqual(expectedAllSubscribedChannels)) - XCTAssertTrue(input.subscribedGroups.isEmpty) - XCTAssertTrue(input.allSubscribedGroups.isEmpty) + XCTAssertTrue(input.subscribedChannelNames.sorted(by: <).elementsEqual(expSubscribedChannelNames)) + XCTAssertTrue(input.allSubscribedChannelNames.sorted(by: <).elementsEqual(expAllSubscribedChannelNames)) + XCTAssertTrue(input.subscribedGroupNames.isEmpty) + XCTAssertTrue(input.allSubscribedGroupNames.isEmpty) } func test_ChannelsWithPresence() { @@ -34,13 +34,13 @@ class SubscribeInputTests: XCTestCase { PubNubChannel(id: "second-channel") ]) - let expectedAllSubscribedChannels = ["first-channel", "first-channel-pnpres", "second-channel"] - let expectedSubscribedChannels = ["first-channel", "second-channel"] + let expAllSubscribedChannelNames = ["first-channel", "first-channel-pnpres", "second-channel"] + let expSubscribedChannelNames = ["first-channel", "second-channel"] - XCTAssertTrue(input.subscribedChannels.sorted(by: <).elementsEqual(expectedSubscribedChannels)) - XCTAssertTrue(input.allSubscribedChannels.sorted(by: <).elementsEqual(expectedAllSubscribedChannels)) - XCTAssertTrue(input.subscribedGroups.isEmpty) - XCTAssertTrue(input.allSubscribedGroups.isEmpty) + XCTAssertTrue(input.subscribedChannelNames.sorted(by: <).elementsEqual(expSubscribedChannelNames)) + XCTAssertTrue(input.allSubscribedChannelNames.sorted(by: <).elementsEqual(expAllSubscribedChannelNames)) + XCTAssertTrue(input.subscribedGroupNames.isEmpty) + XCTAssertTrue(input.allSubscribedGroupNames.isEmpty) } func test_ChannelGroups() { @@ -55,15 +55,15 @@ class SubscribeInputTests: XCTestCase { ] ) - let expectedAllSubscribedChannels = ["first-channel", "second-channel"] - let expectedSubscribedChannels = ["first-channel", "second-channel"] - let expectedAllSubscribedGroups = ["group-1", "group-2"] - let expectedSubscribedGroups = ["group-1", "group-2"] + let expAllSubscribedChannelNames = ["first-channel", "second-channel"] + let expSubscribedChannelNames = ["first-channel", "second-channel"] + let expAllSubscribedGroupNames = ["group-1", "group-2"] + let expSubscribedGroupNames = ["group-1", "group-2"] - XCTAssertTrue(input.subscribedChannels.sorted(by: <).elementsEqual(expectedSubscribedChannels)) - XCTAssertTrue(input.allSubscribedChannels.sorted(by: <).elementsEqual(expectedAllSubscribedChannels)) - XCTAssertTrue(input.subscribedGroups.sorted(by: <).elementsEqual(expectedSubscribedGroups)) - XCTAssertTrue(input.allSubscribedGroups.sorted(by: <).elementsEqual(expectedAllSubscribedGroups)) + XCTAssertTrue(input.subscribedChannelNames.sorted(by: <).elementsEqual(expSubscribedChannelNames)) + XCTAssertTrue(input.allSubscribedChannelNames.sorted(by: <).elementsEqual(expAllSubscribedChannelNames)) + XCTAssertTrue(input.subscribedGroupNames.sorted(by: <).elementsEqual(expSubscribedGroupNames)) + XCTAssertTrue(input.allSubscribedGroupNames.sorted(by: <).elementsEqual(expAllSubscribedGroupNames)) } func test_addingInputContainsNoDuplicates() { @@ -77,23 +77,26 @@ class SubscribeInputTests: XCTestCase { PubNubChannel(id: "g2") ] ) - let result = input1 + SubscribeInput(channels: [ + let result = input1.adding(channels: [ PubNubChannel(id: "c1"), PubNubChannel(id: "c3", withPresence: true) - ], groups: [ + ], and: [ PubNubChannel(id: "g1"), PubNubChannel(id: "g3") ]) - let expectedAllSubscribedChannels = ["c1", "c2", "c2-pnpres", "c3", "c3-pnpres"] - let expectedSubscribedChannels = ["c1", "c2", "c3"] - let expectedAllSubscribedGroups = ["g1", "g2", "g3"] - let expectedSubscribedGroups = ["g1", "g2", "g3"] + let newInput = result.newInput + let expAllSubscribedChannelNames = ["c1", "c2", "c2-pnpres", "c3", "c3-pnpres"] + let expSubscribedChannelNames = ["c1", "c2", "c3"] + let expAllSubscribedGroupNames = ["g1", "g2", "g3"] + let expSubscribedGroupNames = ["g1", "g2", "g3"] - XCTAssertTrue(result.allSubscribedChannels.sorted(by: <).elementsEqual(expectedAllSubscribedChannels)) - XCTAssertTrue(result.subscribedChannels.sorted(by: <).elementsEqual(expectedSubscribedChannels)) - XCTAssertTrue(result.subscribedGroups.sorted(by: <).elementsEqual(expectedSubscribedGroups)) - XCTAssertTrue(result.allSubscribedGroups.sorted(by: <).elementsEqual(expectedAllSubscribedGroups)) + XCTAssertTrue(newInput.allSubscribedChannelNames.sorted(by: <).elementsEqual(expAllSubscribedChannelNames)) + XCTAssertTrue(newInput.subscribedChannelNames.sorted(by: <).elementsEqual(expSubscribedChannelNames)) + XCTAssertTrue(newInput.subscribedGroupNames.sorted(by: <).elementsEqual(expSubscribedGroupNames)) + XCTAssertTrue(newInput.allSubscribedGroupNames.sorted(by: <).elementsEqual(expAllSubscribedGroupNames)) + XCTAssertTrue(result.insertedChannels == [PubNubChannel(id: "c3", withPresence: true)]) + XCTAssertTrue(result.insertedGroups == [PubNubChannel(id: "g3")]) } func test_RemovingInput() { @@ -110,16 +113,28 @@ class SubscribeInputTests: XCTestCase { ] ) - let result = input1 - (channels: ["c1", "c3"], groups: ["g1", "g3"]) - let expectedAllSubscribedChannels = ["c2", "c2-pnpres"] - let expectedSubscribedChannels = ["c2"] - let expectedAllSubscribedGroups = ["g2"] - let expectedSubscribedGroups = ["g2"] - - XCTAssertTrue(result.allSubscribedChannels.sorted(by: <).elementsEqual(expectedAllSubscribedChannels)) - XCTAssertTrue(result.subscribedChannels.sorted(by: <).elementsEqual(expectedSubscribedChannels)) - XCTAssertTrue(result.subscribedGroups.sorted(by: <).elementsEqual(expectedSubscribedGroups)) - XCTAssertTrue(result.allSubscribedGroups.sorted(by: <).elementsEqual(expectedAllSubscribedGroups)) + let result = input1.removing(channels: ["c1", "c3"], and: ["g1", "g3"]) + let newInput = result.newInput + let expAllSubscribedChannelNames = ["c2", "c2-pnpres"] + let expSubscribedChannelNames = ["c2"] + let expAllSubscribedGroupNames = ["g2"] + let expSubscribedGroupNames = ["g2"] + + let expRemovedChannels = [ + PubNubChannel(id: "c1", withPresence: true), + PubNubChannel(id: "c3", withPresence: true) + ] + let expRemovedGroups = [ + PubNubChannel(id: "g1"), + PubNubChannel(id: "g3") + ] + + XCTAssertTrue(newInput.allSubscribedChannelNames.sorted(by: <).elementsEqual(expAllSubscribedChannelNames)) + XCTAssertTrue(newInput.subscribedChannelNames.sorted(by: <).elementsEqual(expSubscribedChannelNames)) + XCTAssertTrue(newInput.subscribedGroupNames.sorted(by: <).elementsEqual(expSubscribedGroupNames)) + XCTAssertTrue(newInput.allSubscribedGroupNames.sorted(by: <).elementsEqual(expAllSubscribedGroupNames)) + XCTAssertTrue(result.removedChannels == expRemovedChannels) + XCTAssertTrue(result.removedGroups == expRemovedGroups) } func test_RemovingInputWithPresenceOnly() { @@ -136,19 +151,20 @@ class SubscribeInputTests: XCTestCase { ] ) - let result = input1 - ( + let result = input1.removing( channels: ["c1".presenceChannelName, "c2".presenceChannelName, "c3".presenceChannelName], - groups: ["g1".presenceChannelName, "g3".presenceChannelName] + and: ["g1".presenceChannelName, "g3".presenceChannelName] ) - let expectedAllSubscribedChannels = ["c1", "c2", "c3"] - let expectedSubscribedChannels = ["c1", "c2", "c3"] - let expectedAllSubscribedGroups = ["g1", "g2", "g2-pnpres", "g3"] - let expectedSubscribedGroups = ["g1", "g2", "g3"] - - XCTAssertTrue(result.allSubscribedChannels.sorted(by: <).elementsEqual(expectedAllSubscribedChannels)) - XCTAssertTrue(result.subscribedChannels.sorted(by: <).elementsEqual(expectedSubscribedChannels)) - XCTAssertTrue(result.subscribedGroups.sorted(by: <).elementsEqual(expectedSubscribedGroups)) - XCTAssertTrue(result.allSubscribedGroups.sorted(by: <).elementsEqual(expectedAllSubscribedGroups)) + let newInput = result.newInput + let expAllSubscribedChannelNames = ["c1", "c2", "c3"] + let expSubscribedChannelNames = ["c1", "c2", "c3"] + let expAllSubscribedGroupNames = ["g1", "g2", "g2-pnpres", "g3"] + let expSubscribedGroupNames = ["g1", "g2", "g3"] + + XCTAssertTrue(newInput.allSubscribedChannelNames.sorted(by: <).elementsEqual(expAllSubscribedChannelNames)) + XCTAssertTrue(newInput.subscribedChannelNames.sorted(by: <).elementsEqual(expSubscribedChannelNames)) + XCTAssertTrue(newInput.subscribedGroupNames.sorted(by: <).elementsEqual(expSubscribedGroupNames)) + XCTAssertTrue(newInput.allSubscribedGroupNames.sorted(by: <).elementsEqual(expAllSubscribedGroupNames)) } } diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeRequestTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeRequestTests.swift index b03bab6b..a65cd15e 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeRequestTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeRequestTests.swift @@ -29,10 +29,10 @@ class SubscribeRequestTests: XCTestCase { sessionResponseQueue: .main ) - let urlResponse = HTTPURLResponse(statusCode: 500) - let error = SubscribeError(underlying: PubNubError(.connectionFailure), urlResponse: urlResponse) + let urlResponse = HTTPURLResponse(statusCode: 500)! + let error = PubNubError(.connectionFailure, affected: [.response(urlResponse)]) - XCTAssertNil(request.reconnectionDelay(dueTo: error, with: 0)) + XCTAssertNil(request.reconnectionDelay(dueTo: error, retryAttempt: 0)) } func test_SubscribeRequestDoesNotRetryForNonSupportedCode() { @@ -57,8 +57,8 @@ class SubscribeRequestTests: XCTestCase { ) let urlError = URLError(.cannotFindHost) - let subscribeError = SubscribeError(underlying: PubNubError(urlError.pubnubReason!, underlying: urlError)) + let pubNubError = PubNubError(urlError.pubnubReason!, underlying: urlError) - XCTAssertNil(request.reconnectionDelay(dueTo: subscribeError, with: 0)) + XCTAssertNil(request.reconnectionDelay(dueTo: pubNubError, retryAttempt: 0)) } } diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift index d32c57b4..5d0069e2 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift @@ -122,7 +122,7 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.HandshakeFailedState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), event: .subscriptionChanged( channels: ["c1", "c1", "c1-pnpres", "c2"], @@ -180,14 +180,13 @@ class SubscribeTransitionTests: XCTestCase { } func test_SubscriptionChangedForHandshakeReconnectingState() throws { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - retryAttempt: 1, reason: reason + retryAttempt: 1, + reason: reason ), event: .subscriptionChanged( channels: ["c1", "c1", "c1-pnpres", "c2"], @@ -296,7 +295,7 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.ReceiveFailedState( input: input, cursor: SubscribeCursor(timetoken: 500100900, region: 11), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), event: .subscriptionChanged( channels: ["c1", "c1", "c1-pnpres", "c2"], @@ -367,7 +366,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 500100900, region: 11), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) ), event: .subscriptionChanged( channels: ["c1", "c1", "c1-pnpres", "c2"], @@ -454,7 +453,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 1500100900, region: 41), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) ), event: .subscriptionRestored( channels: ["c1", "c1-pnpres", "c2", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], @@ -499,7 +498,7 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.ReceiveFailedState( input: input, cursor: SubscribeCursor(timetoken: 1500100900, region: 41), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), event: .subscriptionRestored( channels: ["c1", "c1-pnpres", "c2", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], @@ -613,14 +612,13 @@ class SubscribeTransitionTests: XCTestCase { } func test_SubscriptionRestoredForHandshakeReconnectingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - retryAttempt: 1, reason: reason + retryAttempt: 1, + reason: reason ), event: .subscriptionRestored( channels: ["c1", "c1-pnpres", "c2", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], @@ -664,7 +662,7 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.HandshakeFailedState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), event: .subscriptionRestored( channels: ["c1", "c1-pnpres", "c2", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], @@ -752,8 +750,8 @@ class SubscribeTransitionTests: XCTestCase { newStatus: .connected, error: nil ))), - .managed(.receiveMessages(channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups, + .managed(.receiveMessages(channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames, cursor: cursor )) ] @@ -771,22 +769,22 @@ class SubscribeTransitionTests: XCTestCase { func test_HandshakeFailureForHandshakingState() { let results = transition.transition( from: Subscribe.HandshakingState(input: input, cursor: SubscribeCursor(timetoken: 0, region: 0)), - event: .handshakeFailure(error: SubscribeError(underlying: PubNubError(.unknown))) + event: .handshakeFailure(error: PubNubError(.unknown)) ) let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeRequest), .managed(.handshakeReconnect( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups, + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames, retryAttempt: 0, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) )) ] let expectedState = Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 0, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -796,9 +794,7 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Handshake Reconnect Success func test_HandshakeReconnectSuccessForReconnectingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let cursor = SubscribeCursor( timetoken: 200400600, region: 45 @@ -807,7 +803,8 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - retryAttempt: 1, reason: reason + retryAttempt: 1, + reason: reason ), event: .handshakeReconnectSuccess(cursor: cursor) ) @@ -819,8 +816,8 @@ class SubscribeTransitionTests: XCTestCase { error: nil ))), .managed(.receiveMessages( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups, + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames, cursor: SubscribeCursor(timetoken: 200400600, region: 45) )) ] @@ -836,9 +833,7 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Handshake Reconnect Failure func test_HandshakeReconnectFailedForReconnectingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, @@ -846,13 +841,13 @@ class SubscribeTransitionTests: XCTestCase { retryAttempt: 0, reason: reason ), - event: .handshakeReconnectFailure(error: SubscribeError(underlying: PubNubError(.unknown))) + event: .handshakeReconnectFailure(error: PubNubError(.unknown)) ) let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeReconnect), .managed(.handshakeReconnect( - channels: input.allSubscribedChannels, groups: input.allSubscribedGroups, - retryAttempt: 1, reason: SubscribeError(underlying: PubNubError(.unknown)) + channels: input.allSubscribedChannelNames, groups: input.allSubscribedGroupNames, + retryAttempt: 1, reason: PubNubError(.unknown) )) ] let expectedState = Subscribe.HandshakeReconnectingState( @@ -869,9 +864,7 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Handshake Give Up func test_HandshakeGiveUpForReconnectingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, @@ -879,20 +872,20 @@ class SubscribeTransitionTests: XCTestCase { retryAttempt: 3, reason: reason ), - event: .handshakeReconnectGiveUp(error: SubscribeError(underlying: PubNubError(.unknown))) + event: .handshakeReconnectGiveUp(error: PubNubError(.unknown)) ) let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeReconnect), .managed(.emitStatus(change: Subscribe.ConnectionStatusChange( oldStatus: .disconnected, newStatus: .connectionError, - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ))) ] let expectedState = Subscribe.HandshakeFailedState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -902,28 +895,27 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Receive Give Up func test_ReceiveGiveUpForReconnectingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.ReceiveReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 18001000, region: 123), - retryAttempt: 3, reason: reason + retryAttempt: 3, + reason: reason ), - event: .receiveReconnectGiveUp(error: SubscribeError(underlying: PubNubError(.unknown))) + event: .receiveReconnectGiveUp(error: PubNubError(.unknown)) ) let expectedInvocations: [EffectInvocation] = [ .cancel(.receiveReconnect), .managed(.emitStatus(change: Subscribe.ConnectionStatusChange( oldStatus: .connected, newStatus: .disconnectedUnexpectedly, - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ))) ] let expectedState = Subscribe.ReceiveFailedState( input: input, cursor: SubscribeCursor(timetoken: 18001000, region: 123), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -950,8 +942,8 @@ class SubscribeTransitionTests: XCTestCase { forCursor: SubscribeCursor(timetoken: 18002000, region: 123) )), .managed(.receiveMessages( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups, + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames, cursor: SubscribeCursor(timetoken: 18002000, region: 123) )) ] @@ -967,24 +959,22 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Receive Failed func test_ReceiveFailedForReceivingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.ReceivingState( input: input, cursor: SubscribeCursor(timetoken: 100500900, region: 11) ), - event: .receiveFailure(error: SubscribeError(underlying: PubNubError(.unknown))) + event: .receiveFailure(error: PubNubError(.unknown)) ) let expectedInvocations: [EffectInvocation] = [ .cancel(.receiveMessages), .managed(.receiveReconnect( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups, + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames, cursor: SubscribeCursor(timetoken: 100500900, region: 11), retryAttempt: 0, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) )) ] let expectedState = Subscribe.ReceiveReconnectingState( @@ -999,9 +989,7 @@ class SubscribeTransitionTests: XCTestCase { } func test_ReceiveReconnectFailedForReconnectingState() { - let reason = SubscribeError( - underlying: PubNubError(.unknown) - ) + let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.ReceiveReconnectingState( input: input, @@ -1009,16 +997,16 @@ class SubscribeTransitionTests: XCTestCase { retryAttempt: 1, reason: reason ), - event: .receiveReconnectFailure(error: SubscribeError(underlying: PubNubError(.unknown))) + event: .receiveReconnectFailure(error: PubNubError(.unknown)) ) let expectedInvocations: [EffectInvocation] = [ .cancel(.receiveReconnect), .managed(.receiveReconnect( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups, + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames, cursor: SubscribeCursor(timetoken: 100500900, region: 11), retryAttempt: 2, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) )) ] let expectedState = Subscribe.ReceiveReconnectingState( @@ -1041,8 +1029,8 @@ class SubscribeTransitionTests: XCTestCase { ) let expectedInvocations: [EffectInvocation] = [ .managed(.handshakeRequest( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups) + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames) ) ] let expectedState = Subscribe.HandshakingState( @@ -1058,14 +1046,14 @@ class SubscribeTransitionTests: XCTestCase { let results = transition.transition( from: Subscribe.HandshakeFailedState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), event: .reconnect ) let expectedInvocations: [EffectInvocation] = [ .managed(.handshakeRequest( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames )) ] let expectedState = Subscribe.HandshakingState( @@ -1087,8 +1075,8 @@ class SubscribeTransitionTests: XCTestCase { ) let expectedInvocations: [EffectInvocation] = [ .managed(.handshakeRequest( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames )) ] let expectedState = Subscribe.HandshakingState( @@ -1105,14 +1093,14 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.ReceiveFailedState( input: input, cursor: SubscribeCursor(timetoken: 123, region: 456), - error: SubscribeError(underlying: PubNubError(.unknown)) + error: PubNubError(.unknown) ), event: .reconnect ) let expectedInvocations: [EffectInvocation] = [ .managed(.handshakeRequest( - channels: input.allSubscribedChannels, - groups: input.allSubscribedGroups + channels: input.allSubscribedChannelNames, + groups: input.allSubscribedGroupNames )) ] let expectedState = Subscribe.HandshakingState( @@ -1154,7 +1142,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) ), event: .disconnect ) @@ -1206,7 +1194,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 123, region: 456), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(.unknown)) + reason: PubNubError(.unknown) ), event: .disconnect ) @@ -1254,7 +1242,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(.badRequest)) + reason: PubNubError(.badRequest) ), event: .unsubscribeAll ) @@ -1276,7 +1264,7 @@ class SubscribeTransitionTests: XCTestCase { let results = transition.transition( from: Subscribe.HandshakeFailedState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), - error: SubscribeError(underlying: PubNubError(.badRequest)) + error: PubNubError(.badRequest) ), event: .unsubscribeAll ) @@ -1339,7 +1327,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 123, region: 456), retryAttempt: 1, - reason: SubscribeError(underlying: PubNubError(.badRequest)) + reason: PubNubError(.badRequest) ), event: .unsubscribeAll ) @@ -1362,7 +1350,7 @@ class SubscribeTransitionTests: XCTestCase { from: Subscribe.ReceiveFailedState( input: input, cursor: SubscribeCursor(timetoken: 123, region: 456), - error: SubscribeError(underlying: PubNubError(.badRequest)) + error: PubNubError(.badRequest) ), event: .unsubscribeAll ) diff --git a/Tests/PubNubTests/Helpers/PAMTokenTests.swift b/Tests/PubNubTests/Helpers/PAMTokenTests.swift index 0a0cf4ef..e11bb583 100644 --- a/Tests/PubNubTests/Helpers/PAMTokenTests.swift +++ b/Tests/PubNubTests/Helpers/PAMTokenTests.swift @@ -19,13 +19,12 @@ class PAMTokenTests: XCTestCase { subscribeKey: "", userId: "tester" ) - let eventEngineEnabledConfig = PubNubConfiguration( + let eeEnabledConfig = PubNubConfiguration( publishKey: "", subscribeKey: "", userId: "tester", enableEventEngine: true ) - static let allPermissionsToken = "qEF2AkF0GmEI03xDdHRsGDxDcmVzpURjaGFuoWljaGFubmVsLTEY70NncnChb2NoYW5uZWxfZ3JvdXAtMQVDdXNyoENzcGOgRHV1aWShZnV1aWQtMRhoQ3BhdKVEY2hhbqFtXmNoYW5uZWwtXFMqJBjvQ2dycKF0XjpjaGFubmVsX2dyb3VwLVxTKiQFQ3VzcqBDc3BjoER1dWlkoWpedXVpZC1cUyokGGhEbWV0YaBEdXVpZHR0ZXN0LWF1dGhvcml6ZWQtdXVpZENzaWdYIPpU-vCe9rkpYs87YUrFNWkyNq8CVvmKwEjVinnDrJJc" } @@ -60,30 +59,28 @@ extension PAMTokenTests { } func testSetToken() { - testSetToken(config: config) - testSetToken(config: eventEngineEnabledConfig) - } + for config in [config, eeEnabledConfig] { + XCTContext.runActivity(named: "Testing with enableEventEngine=\(config.enableEventEngine)") { _ in + let pubnub = PubNub(configuration: config) + pubnub.set(token: "access-token") - func testChangeToken() { - testChangeToken(config: config) - testChangeToken(config: eventEngineEnabledConfig) + XCTAssertEqual(pubnub.configuration.authToken, "access-token") + XCTAssertEqual(pubnub.subscription.configuration.authToken, "access-token") + } + } } - - private func testSetToken(config: PubNubConfiguration) { - let pubnub = PubNub(configuration: config) - pubnub.set(token: "access-token") - XCTAssertEqual(pubnub.configuration.authToken, "access-token") - XCTAssertEqual(pubnub.subscription.configuration.authToken, "access-token") - } - - private func testChangeToken(config: PubNubConfiguration) { - let pubnub = PubNub(configuration: config) - pubnub.set(token: "access-token") - pubnub.set(token: "access-token-updated") + func testChangeToken() { + for config in [config, eeEnabledConfig] { + XCTContext.runActivity(named: "Testing with enableEventEngine=\(config.enableEventEngine)") { _ in + let pubnub = PubNub(configuration: config) + pubnub.set(token: "access-token") + pubnub.set(token: "access-token-updated") - XCTAssertEqual(pubnub.configuration.authToken, "access-token-updated") - XCTAssertEqual(pubnub.subscription.configuration.authToken, "access-token-updated") + XCTAssertEqual(pubnub.configuration.authToken, "access-token-updated") + XCTAssertEqual(pubnub.subscription.configuration.authToken, "access-token-updated") + } + } } // swiftlint:enable line_length diff --git a/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift b/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift index d621e493..00c325b8 100644 --- a/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift +++ b/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift @@ -13,11 +13,12 @@ import XCTest class SubscriptionIntegrationTests: XCTestCase { let testsBundle = Bundle(for: SubscriptionIntegrationTests.self) - let testChannel = "SwiftSubscriptionITestsChannel" + let configuration = PubNubConfiguration(publishKey: "", subscribeKey: "", userId: UUID().uuidString) func testSubscribeError() { let subscribeExpect = expectation(description: "Subscribe Expectation") + let connectingExpect = expectation(description: "Connecting Expectation") let disconnectedExpect = expectation(description: "Disconnected Expectation") // Should return subscription key error @@ -29,10 +30,12 @@ class SubscriptionIntegrationTests: XCTestCase { switch event { case let .connectionStatusChanged(status): switch status { - case .disconnected: + case .connecting: + connectingExpect.fulfill() + case .disconnectedUnexpectedly: disconnectedExpect.fulfill() default: - XCTFail("Only should emit disconnected") + XCTFail("Only should emit these two states") } case .subscribeError: subscribeExpect.fulfill() // 8E988B17-C0AA-42F1-A6F9-1461BF51C82C @@ -40,11 +43,11 @@ class SubscriptionIntegrationTests: XCTestCase { break } } + pubnub.add(listener) - pubnub.subscribe(to: [testChannel]) - wait(for: [subscribeExpect, disconnectedExpect], timeout: 10.0) + wait(for: [subscribeExpect, connectingExpect, disconnectedExpect], timeout: 10.0) } // swiftlint:disable:next function_body_length cyclomatic_complexity @@ -70,6 +73,20 @@ class SubscriptionIntegrationTests: XCTestCase { let listener = SubscriptionListener() listener.didReceiveSubscription = { [unowned self] event in switch event { + case let .subscriptionChanged(status): + switch status { + case let .subscribed(channels, _): + XCTAssertTrue(channels.contains(where: { $0.id == self.testChannel })) + XCTAssertTrue(pubnub.subscribedChannels.contains(self.testChannel)) + subscribeExpect.fulfill() + case let .responseHeader(channels, _, _, next): + XCTAssertTrue(channels.contains(where: { $0.id == self.testChannel })) + XCTAssertEqual(pubnub.previousTimetoken, next?.timetoken) + case let .unsubscribed(channels, _): + XCTAssertTrue(channels.contains(where: { $0.id == self.testChannel })) + XCTAssertFalse(pubnub.subscribedChannels.contains(self.testChannel)) + unsubscribeExpect.fulfill() + } case .messageReceived: pubnub.unsubscribe(from: [self.testChannel]) publishExpect.fulfill() @@ -79,16 +96,14 @@ class SubscriptionIntegrationTests: XCTestCase { pubnub.publish(channel: self.testChannel, message: "Test") { _ in } connectedCount += 1 connectedExpect.fulfill() - case .connectionError: - XCTFail("An error was returned") case .disconnected: // Stop reconneced after N attempts if connectedCount < totalLoops { pubnub.subscribe(to: [self.testChannel]) } disconnectedExpect.fulfill() - case .disconnectedUnexpectedly: - XCTFail("An error was returned") + default: + break } case let .subscribeError(error): XCTFail("An error was returned: \(error)") @@ -96,8 +111,8 @@ class SubscriptionIntegrationTests: XCTestCase { break } } + pubnub.add(listener) - pubnub.subscribe(to: [testChannel]) wait(for: [subscribeExpect, unsubscribeExpect, publishExpect, connectedExpect, disconnectedExpect], timeout: 20.0) diff --git a/Tests/PubNubTests/Networking/Operators/AutomaticRetryTests.swift b/Tests/PubNubTests/Networking/Operators/AutomaticRetryTests.swift index b83da655..dd1d8577 100644 --- a/Tests/PubNubTests/Networking/Operators/AutomaticRetryTests.swift +++ b/Tests/PubNubTests/Networking/Operators/AutomaticRetryTests.swift @@ -18,7 +18,7 @@ class AutomaticRetryTests: XCTestCase { func testReconnectionPolicy_DefaultLinearPolicy() { switch defaultLinearPolicy { case let .linear(delay): - XCTAssertEqual(delay, 2) + XCTAssertEqual(delay, 3) default: XCTFail("Default Linear Policy should only match to linear case") } @@ -26,9 +26,10 @@ class AutomaticRetryTests: XCTestCase { func testReconnectionPolicy_DefaultExponentialPolicy() { switch defaultExpoentialPolicy { - case let .exponential(minDelay, maxDelay): - XCTAssertEqual(minDelay, 2) - XCTAssertEqual(maxDelay, 150) + case let .legacyExponential(base, scale, max): + XCTAssertEqual(base, 2) + XCTAssertEqual(scale, 2) + XCTAssertEqual(max, 300) default: XCTFail("Default Exponential Policy should only match to linear case") } @@ -38,101 +39,88 @@ class AutomaticRetryTests: XCTestCase { func testEquatable_Init_Valid_() { let testPolicy = AutomaticRetry.default - let automaticRetry = AutomaticRetry() + let policy = AutomaticRetry() - XCTAssertEqual(testPolicy, automaticRetry) + XCTAssertEqual(testPolicy, policy) } - func testEquatable_Init_Exponential_InvalidMinDelay() { - let invalidBasePolicy = AutomaticRetry.ReconnectionPolicy.exponential(minDelay: 0, maxDelay: 30) - let validBasePolicy = AutomaticRetry.ReconnectionPolicy.exponential(minDelay: 2, maxDelay: 30) - let automaticRetry = AutomaticRetry( - retryLimit: 2, - policy: invalidBasePolicy, - retryableHTTPStatusCodes: [], - retryableURLErrorCodes: [] + func testEquatable_Init_Exponential_InvalidBase() { + let invalidBasePolicy = AutomaticRetry.ReconnectionPolicy.legacyExponential( + base: 0, + scale: 3.0, + maxDelay: 1 + ) + let validBasePolicy = AutomaticRetry.ReconnectionPolicy.legacyExponential( + base: 2, scale: 3.0, maxDelay: 1 + ) + let testPolicy = AutomaticRetry( + retryLimit: 2, policy: invalidBasePolicy, retryableHTTPStatusCodes: [], retryableURLErrorCodes: [] ) - XCTAssertNotEqual(automaticRetry.policy, invalidBasePolicy) - XCTAssertEqual(automaticRetry.policy, validBasePolicy) + XCTAssertNotEqual(testPolicy.policy, invalidBasePolicy) + XCTAssertEqual(testPolicy.policy, validBasePolicy) } - - func testEquatable_Init_Exponential_MinDelayGreaterThanMaxDelay() { - let invalidBasePolicy = AutomaticRetry.ReconnectionPolicy.exponential(minDelay: 10, maxDelay: 5) - let validBasePolicy = AutomaticRetry.ReconnectionPolicy.exponential(minDelay: 10, maxDelay: 10) - let automaticRetry = AutomaticRetry( + + func testEquatable_Init_Exponential_InvalidScale() { + let invalidBasePolicy = AutomaticRetry.ReconnectionPolicy.legacyExponential( + base: 2, scale: -1.0, maxDelay: 1 + ) + let validBasePolicy = AutomaticRetry.ReconnectionPolicy.legacyExponential( + base: 2, scale: 0.0, maxDelay: 1 + ) + let testPolicy = AutomaticRetry( retryLimit: 2, policy: invalidBasePolicy, retryableHTTPStatusCodes: [], retryableURLErrorCodes: [] ) - XCTAssertNotEqual(automaticRetry.policy, invalidBasePolicy) - XCTAssertEqual(automaticRetry.policy, validBasePolicy) + XCTAssertNotEqual(testPolicy.policy, invalidBasePolicy) + XCTAssertEqual(testPolicy.policy, validBasePolicy) } - - func testEquatable_Init_Exponential_TooHighRetryLimit() { - let policy = AutomaticRetry.ReconnectionPolicy.exponential(minDelay: 5, maxDelay: 60) - let automaticRetry = AutomaticRetry( - retryLimit: 12, - policy: policy, + + func testEquatable_Init_Exponential_InvalidBaseAndScale() { + let invalidBasePolicy = AutomaticRetry.ReconnectionPolicy.legacyExponential( + base: 0, scale: -1.0, maxDelay: 1 + ) + let validBasePolicy = AutomaticRetry.ReconnectionPolicy.legacyExponential( + base: 2, scale: 0.0, maxDelay: 1 + ) + let testPolicy = AutomaticRetry( + retryLimit: 2, + policy: invalidBasePolicy, retryableHTTPStatusCodes: [], retryableURLErrorCodes: [] ) - XCTAssertEqual(automaticRetry.policy, policy) - XCTAssertEqual(automaticRetry.retryLimit, 10) + XCTAssertNotEqual(testPolicy.policy, invalidBasePolicy) + XCTAssertEqual(testPolicy.policy, validBasePolicy) } func testEquatable_Init_Linear_InvalidDelay() { let invalidBasePolicy = AutomaticRetry.ReconnectionPolicy.linear(delay: -1.0) let validBasePolicy = AutomaticRetry.ReconnectionPolicy.linear(delay: 2.0) - let automaticRetry = AutomaticRetry( + let testPolicy = AutomaticRetry( retryLimit: 2, policy: invalidBasePolicy, retryableHTTPStatusCodes: [], retryableURLErrorCodes: [] ) - XCTAssertNotEqual(automaticRetry.policy, invalidBasePolicy) - XCTAssertEqual(automaticRetry.policy, validBasePolicy) - } - - func testEquatable_Init_Linear_TooHighRetryLimit() { - let policy = AutomaticRetry.ReconnectionPolicy.linear(delay: 3.0) - let automaticRetry = AutomaticRetry( - retryLimit: 12, - policy: policy, - retryableHTTPStatusCodes: [], - retryableURLErrorCodes: [] - ) - - XCTAssertEqual(automaticRetry.policy, policy) - XCTAssertEqual(automaticRetry.retryLimit, 10) + XCTAssertNotEqual(testPolicy.policy, invalidBasePolicy) + XCTAssertEqual(testPolicy.policy, validBasePolicy) } func testEquatable_Init_Linear_Valid() { - let validLinearPolicy = AutomaticRetry.ReconnectionPolicy.linear(delay: 3.0) - let automaticRetry = AutomaticRetry( + let validLinearPolicy = AutomaticRetry.ReconnectionPolicy.linear(delay: 2.0) + let testPolicy = AutomaticRetry( retryLimit: 2, policy: validLinearPolicy, retryableHTTPStatusCodes: [], retryableURLErrorCodes: [] ) - XCTAssertEqual(automaticRetry.policy, validLinearPolicy) - } - - func testEquatable_Init_Other() { - let linearPolicy = AutomaticRetry.ReconnectionPolicy.linear(delay: 3.0) - let automaticRetry = AutomaticRetry( - retryLimit: 2, - policy: linearPolicy, - retryableHTTPStatusCodes: [], - retryableURLErrorCodes: [] - ) - - XCTAssertEqual(automaticRetry.policy, linearPolicy) + XCTAssertEqual(testPolicy.policy, validLinearPolicy) } // MARK: - retry(:session:for:dueTo:completion:) @@ -157,11 +145,11 @@ class AutomaticRetryTests: XCTestCase { guard let url = URL(string: "http://example.com") else { return XCTFail("Could not create URL") } + let testStatusCode = 500 - let testPolicy = AutomaticRetry( retryLimit: 2, - policy: .linear(delay: 3.0), + policy: .linear(delay: 2.0), retryableHTTPStatusCodes: [testStatusCode], retryableURLErrorCodes: [] ) @@ -180,7 +168,7 @@ class AutomaticRetryTests: XCTestCase { let testError = URLError(testURLErrorCode) let testPolicy = AutomaticRetry( retryLimit: 2, - policy: .linear(delay: 3.0), + policy: .linear(delay: 2.0), retryableHTTPStatusCodes: [], retryableURLErrorCodes: [testURLErrorCode] ) @@ -192,7 +180,7 @@ class AutomaticRetryTests: XCTestCase { let testError = URLError(.timedOut) let testPolicy = AutomaticRetry( retryLimit: 2, - policy: .linear(delay: 3.0), + policy: .linear(delay: 2.0), retryableHTTPStatusCodes: [], retryableURLErrorCodes: [] ) @@ -200,34 +188,71 @@ class AutomaticRetryTests: XCTestCase { XCTAssertFalse(testPolicy.shouldRetry(response: nil, error: testError)) } - // MARK: - exponentialBackoffDelay(for:scale:current:) + // MARK: - legacyExponential(base:scale:maxDelay:) - func testExponentialBackoffDelay_DefaultScale() { + func testLegacyExponentialBackoffDelay_DefaultScale() { let maxRetryCount = 5 + let scale = 2.0 + let base: UInt = 3 let maxDelay = UInt.max - // Usage of Range due to random delay (0...1) that's always added to the final value - let delayForRetry: [ClosedRange] = [2.0...3.0, 4.0...5.0, 8.0...9.0, 16.0...17.0, 32.0...33.0] - + let delayForRetry = [2.0...3.0, 6.0...7.0, 18.0...19.0, 54.0...55.0, 162.0...163.0] + for count in 0..] = [2.0...3.0, 3.0...4.0, 3.0...4.0, 3.0...4.0, 3.0...4.0] + func testLegacyExponentialBackoffDelay_MaxDelayHit() { let maxRetryCount = 5 + let scale = 2.0 + let base: UInt = 2 + let maxDelay: UInt = 0 + let delayForRetry = [2.0...3.0, 2.0...3.0, 2.0...3.0, 2.0...3.0, 2.0...3.0] for count in 0..