From d0ec8a37b7e59a09325b5fede7d6fd1637c7b60d Mon Sep 17 00:00:00 2001 From: jguz-pubnub Date: Tue, 13 Feb 2024 14:42:30 +0100 Subject: [PATCH] feat(listeners): adding a new Listeners API --- PubNub.xcodeproj/project.pbxproj | 206 +++++++--- .../Subscribe/Helpers/SubscribeInput.swift | 83 ++-- .../Events/New/Entities/EntityCreator.swift | 93 +++++ .../New/Entities/EntitySubscribable.swift | 47 +++ Sources/PubNub/Events/New/EventEmitter.swift | 129 +++++++ .../SubscribeMessagePayload+PubNubEvent.swift | 66 ++++ Sources/PubNub/Events/New/PubNubEvent.swift | 51 +++ Sources/PubNub/Events/New/Subscribable.swift | 164 ++++++++ Sources/PubNub/Events/New/Subscription.swift | 199 ++++++++++ .../PubNub/Events/New/SubscriptionSet.swift | 230 ++++++++++++ .../PubNub/Events/{ => Old}/EventStream.swift | 0 .../Subscription/PubNubEntityEvent.swift | 0 .../Subscription/SubscriptionStream.swift | 14 +- Sources/PubNub/PubNub.swift | 161 ++++++++ ...entEngineSubscriptionSessionStrategy.swift | 174 +++++---- .../LegacySubscriptionSessionStrategy.swift | 102 ++--- .../SubscriptionSessionStrategy.swift | 18 +- .../Subscription/SubscriptionSession.swift | 354 ++++++++++++++++-- .../Subscribe/SubscribeInputTests.swift | 24 +- .../Subscribe/SubscribeTransitionTests.swift | 28 +- .../Events/New/SubscriptionSetTests.swift | 128 +++++++ .../Events/New/SubscriptionTests.swift | 239 ++++++++++++ .../Events/{ => Old}/EventStreamTests.swift | 0 .../Events/Old/SessionStreamTests.swift | 102 +++++ .../{ => Old}/SubscriptionStreamTests.swift | 0 .../SubscribeMessagesGeneratorTests.swift | 136 +++++++ .../SubscriptionIntegrationTests.swift | 177 ++++++++- 27 files changed, 2609 insertions(+), 316 deletions(-) create mode 100644 Sources/PubNub/Events/New/Entities/EntityCreator.swift create mode 100644 Sources/PubNub/Events/New/Entities/EntitySubscribable.swift create mode 100644 Sources/PubNub/Events/New/EventEmitter.swift create mode 100644 Sources/PubNub/Events/New/Extensions/SubscribeMessagePayload+PubNubEvent.swift create mode 100644 Sources/PubNub/Events/New/PubNubEvent.swift create mode 100644 Sources/PubNub/Events/New/Subscribable.swift create mode 100644 Sources/PubNub/Events/New/Subscription.swift create mode 100644 Sources/PubNub/Events/New/SubscriptionSet.swift rename Sources/PubNub/Events/{ => Old}/EventStream.swift (100%) rename Sources/PubNub/Events/{ => Old}/Subscription/PubNubEntityEvent.swift (100%) rename Sources/PubNub/Events/{ => Old}/Subscription/SubscriptionStream.swift (97%) create mode 100644 Tests/PubNubTests/Events/New/SubscriptionSetTests.swift create mode 100644 Tests/PubNubTests/Events/New/SubscriptionTests.swift rename Tests/PubNubTests/Events/{ => Old}/EventStreamTests.swift (100%) create mode 100644 Tests/PubNubTests/Events/Old/SessionStreamTests.swift rename Tests/PubNubTests/Events/{ => Old}/SubscriptionStreamTests.swift (100%) create mode 100644 Tests/PubNubTests/Events/SubscribeMessagesGeneratorTests.swift diff --git a/PubNub.xcodeproj/project.pbxproj b/PubNub.xcodeproj/project.pbxproj index c77eebdd..fdd446dd 100644 --- a/PubNub.xcodeproj/project.pbxproj +++ b/PubNub.xcodeproj/project.pbxproj @@ -31,7 +31,6 @@ 35012EB82850049D00CF7E0A /* PubNubMembership.swift in Sources */ = {isa = PBXBuildFile; fileRef = 358B8977284D323300DB0F3D /* PubNubMembership.swift */; }; 35012EBA285004E300CF7E0A /* PubNubMembershipEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35012EB9285004E300CF7E0A /* PubNubMembershipEvent.swift */; }; 35012EBC2850052500CF7E0A /* PubNubSpaceEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35012EBB2850052500CF7E0A /* PubNubSpaceEvent.swift */; }; - 35012EC528500BA800CF7E0A /* PubNubEntityEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35012EC428500BA800CF7E0A /* PubNubEntityEvent.swift */; }; 35012EC82852741900CF7E0A /* Test+PubNubUser.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35012EC62852740700CF7E0A /* Test+PubNubUser.swift */; }; 35012ECB2852AE8800CF7E0A /* Test+PubNubUserEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35012EC92852AE6300CF7E0A /* Test+PubNubUserEvent.swift */; }; 35012ECD2852B55A00CF7E0A /* Test+PubNubUserPatcher.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35012ECC2852B55A00CF7E0A /* Test+PubNubUserPatcher.swift */; }; @@ -112,7 +111,6 @@ 35481BF6252275B5004E07B5 /* PubNubFile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35481BF5252275B5004E07B5 /* PubNubFile.swift */; }; 354ADA8822D909A30093EFFB /* Convertibles+PubNub.swift in Sources */ = {isa = PBXBuildFile; fileRef = 354ADA8722D909A30093EFFB /* Convertibles+PubNub.swift */; }; 354ADA8C22D923F20093EFFB /* Replaceables+PubNub.swift in Sources */ = {isa = PBXBuildFile; fileRef = 354ADA8B22D923F20093EFFB /* Replaceables+PubNub.swift */; }; - 354ADA8E22DA7F280093EFFB /* SessionStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 354ADA8D22DA7F280093EFFB /* SessionStream.swift */; }; 354ADA9022DA81650093EFFB /* DateFormatter+PubNub.swift in Sources */ = {isa = PBXBuildFile; fileRef = 354ADA8F22DA81650093EFFB /* DateFormatter+PubNub.swift */; }; 354ADA9422DCBC360093EFFB /* ResponseOperator.swift in Sources */ = {isa = PBXBuildFile; fileRef = 354ADA9322DCBC350093EFFB /* ResponseOperator.swift */; }; 354FC4C122D04D3600318932 /* DispatchQueue+PubNub.swift in Sources */ = {isa = PBXBuildFile; fileRef = 354FC4C022D04D3600318932 /* DispatchQueue+PubNub.swift */; }; @@ -131,7 +129,6 @@ 35580682230F3A34005CDD92 /* RequestIdOperator.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35580681230F3A34005CDD92 /* RequestIdOperator.swift */; }; 35580686230F47EA005CDD92 /* RequestIdOperatorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35580684230F4771005CDD92 /* RequestIdOperatorTests.swift */; }; 3558068A230F4C99005CDD92 /* InstanceIdOperatorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35580687230F4B75005CDD92 /* InstanceIdOperatorTests.swift */; }; - 3558069A2311F968005CDD92 /* SubscriptionStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 355806992311F968005CDD92 /* SubscriptionStreamTests.swift */; }; 3558069C231303D9005CDD92 /* AutomaticRetryTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3558069B231303D9005CDD92 /* AutomaticRetryTests.swift */; }; 355806DB23145749005CDD92 /* PubNub.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = "PubNub::PubNub::Product" /* PubNub.framework */; }; 3559977B23073D53000BCFD1 /* WeakBoxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3559977A23073D53000BCFD1 /* WeakBoxTests.swift */; }; @@ -154,8 +151,6 @@ 3562DBCB23450DA7006DFFBC /* objects_channel_remove_success.json in Resources */ = {isa = PBXBuildFile; fileRef = 3562DBCA23450DA7006DFFBC /* objects_channel_remove_success.json */; }; 3562DBCC23450FDE006DFFBC /* objects_channel_all_success_empty.json in Resources */ = {isa = PBXBuildFile; fileRef = 3562DBC623450D8C006DFFBC /* objects_channel_all_success_empty.json */; }; 3567434822E1E4F700BF2639 /* Collection+PubNub.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3567434722E1E4F700BF2639 /* Collection+PubNub.swift */; }; - 356D48B32360BD6B00C65C40 /* EventStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 355C47A122CBD6F2006C3EEE /* EventStream.swift */; }; - 356D48B42360BD7000C65C40 /* SubscriptionStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35A66A7D22F861BA00AC67A9 /* SubscriptionStream.swift */; }; 356D48B52360D9CE00C65C40 /* publish_message_too_large.json in Resources */ = {isa = PBXBuildFile; fileRef = 35EE2EC4235F69A5006183E9 /* publish_message_too_large.json */; }; 357024BF283C07C900567EE8 /* Objects+PubNub.swift in Sources */ = {isa = PBXBuildFile; fileRef = 357024BE283C07C900567EE8 /* Objects+PubNub.swift */; }; 35721576252FA675005A0144 /* XMLEncoder.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35CDA4CD2510032B00218137 /* XMLEncoder.swift */; }; @@ -371,8 +366,6 @@ 35FE941022EFA30A0051C455 /* malformedFilterExpression_StatusCode.json in Resources */ = {isa = PBXBuildFile; fileRef = 35FE940F22EFA30A0051C455 /* malformedFilterExpression_StatusCode.json */; }; 35FE941222EFB70B0051C455 /* unrecognizedEndpointError.json in Resources */ = {isa = PBXBuildFile; fileRef = 35FE941122EFB70B0051C455 /* unrecognizedEndpointError.json */; }; 35FE941422EFB7C10051C455 /* unknownEndpointError.json in Resources */ = {isa = PBXBuildFile; fileRef = 35FE941322EFB7C10051C455 /* unknownEndpointError.json */; }; - 35FE941822EFCB7F0051C455 /* SessionStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35FE941722EFCB7F0051C455 /* SessionStreamTests.swift */; }; - 35FE941B22EFE5400051C455 /* EventStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35FE941A22EFE5400051C455 /* EventStreamTests.swift */; }; 35FE941F22F0929A0051C455 /* RequestRetrierTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 35FE941E22F0929A0051C455 /* RequestRetrierTests.swift */; }; 3D389FE12B35AF4A006928E7 /* TransitionProtocol.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FC32B35AF4A006928E7 /* TransitionProtocol.swift */; }; 3D389FE22B35AF4A006928E7 /* Dispatcher.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FC42B35AF4A006928E7 /* Dispatcher.swift */; }; @@ -437,6 +430,24 @@ 3D758DD62AB48A6A005D2B36 /* CryptorHeaderWithinStreamFinder.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D758DD42AB48A6A005D2B36 /* CryptorHeaderWithinStreamFinder.swift */; }; 3D9134972A1216F7000A5124 /* PubNubPushTargetTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9134962A1216F7000A5124 /* PubNubPushTargetTests.swift */; }; 3DACC7F72AB88F8E00210B14 /* Data+CommonCrypto.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DACC7F62AB88F8E00210B14 /* Data+CommonCrypto.swift */; }; + 3DB9255C2B7A2B89001B7E90 /* SubscriptionStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */; }; + 3DB9255D2B7A2B89001B7E90 /* EventStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB9255A2B7A2B89001B7E90 /* EventStreamTests.swift */; }; + 3DB925602B7A2B9B001B7E90 /* SubscriptionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB9255F2B7A2B9B001B7E90 /* SubscriptionTests.swift */; }; + 3DB925622B7A2BCA001B7E90 /* SessionStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925612B7A2BCA001B7E90 /* SessionStreamTests.swift */; }; + 3DB925642B7A2BF5001B7E90 /* SubscriptionSetTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925632B7A2BF5001B7E90 /* SubscriptionSetTests.swift */; }; + 3DB925662B7A2C52001B7E90 /* SubscribeMessagesGeneratorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925652B7A2C52001B7E90 /* SubscribeMessagesGeneratorTests.swift */; }; + 3DB9257C2B7AA75F001B7E90 /* EventStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB9256B2B7AA75F001B7E90 /* EventStream.swift */; }; + 3DB9257D2B7AA75F001B7E90 /* SubscriptionStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB9256D2B7AA75F001B7E90 /* SubscriptionStream.swift */; }; + 3DB9257E2B7AA75F001B7E90 /* PubNubEntityEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB9256E2B7AA75F001B7E90 /* PubNubEntityEvent.swift */; }; + 3DB9257F2B7AA75F001B7E90 /* SubscriptionSet.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925702B7AA75F001B7E90 /* SubscriptionSet.swift */; }; + 3DB925802B7AA75F001B7E90 /* PubNubEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925712B7AA75F001B7E90 /* PubNubEvent.swift */; }; + 3DB925812B7AA75F001B7E90 /* SubscribeMessagePayload+PubNubEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925732B7AA75F001B7E90 /* SubscribeMessagePayload+PubNubEvent.swift */; }; + 3DB925822B7AA75F001B7E90 /* Subscribable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925742B7AA75F001B7E90 /* Subscribable.swift */; }; + 3DB925832B7AA75F001B7E90 /* Subscription.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925752B7AA75F001B7E90 /* Subscription.swift */; }; + 3DB925842B7AA75F001B7E90 /* EventEmitter.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925762B7AA75F001B7E90 /* EventEmitter.swift */; }; + 3DB925852B7AA75F001B7E90 /* EntityCreator.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925782B7AA75F001B7E90 /* EntityCreator.swift */; }; + 3DB925862B7AA75F001B7E90 /* EntitySubscribable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925792B7AA75F001B7E90 /* EntitySubscribable.swift */; }; + 3DB925872B7AA75F001B7E90 /* SessionStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB9257B2B7AA75F001B7E90 /* SessionStream.swift */; }; 3DBB2C212ABD8053008A100E /* PubNubCryptoModuleContractTestSteps.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DBB2C202ABD8053008A100E /* PubNubCryptoModuleContractTestSteps.swift */; }; 3DBB2C222ABD8053008A100E /* PubNubCryptoModuleContractTestSteps.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DBB2C202ABD8053008A100E /* PubNubCryptoModuleContractTestSteps.swift */; }; 3DD1FB992B5A7804005A14E3 /* PubNubPresenceStateContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DD1FB982B5A7804005A14E3 /* PubNubPresenceStateContainer.swift */; }; @@ -600,7 +611,6 @@ 35012EB4285003EC00CF7E0A /* PubNubUserEvent.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubUserEvent.swift; sourceTree = ""; }; 35012EB9285004E300CF7E0A /* PubNubMembershipEvent.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubMembershipEvent.swift; sourceTree = ""; }; 35012EBB2850052500CF7E0A /* PubNubSpaceEvent.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubSpaceEvent.swift; sourceTree = ""; }; - 35012EC428500BA800CF7E0A /* PubNubEntityEvent.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubEntityEvent.swift; sourceTree = ""; }; 35012EC62852740700CF7E0A /* Test+PubNubUser.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Test+PubNubUser.swift"; sourceTree = ""; }; 35012EC92852AE6300CF7E0A /* Test+PubNubUserEvent.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Test+PubNubUserEvent.swift"; sourceTree = ""; }; 35012ECC2852B55A00CF7E0A /* Test+PubNubUserPatcher.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Test+PubNubUserPatcher.swift"; sourceTree = ""; }; @@ -687,7 +697,6 @@ 35481BF5252275B5004E07B5 /* PubNubFile.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubFile.swift; sourceTree = ""; }; 354ADA8722D909A30093EFFB /* Convertibles+PubNub.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Convertibles+PubNub.swift"; sourceTree = ""; }; 354ADA8B22D923F20093EFFB /* Replaceables+PubNub.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Replaceables+PubNub.swift"; sourceTree = ""; }; - 354ADA8D22DA7F280093EFFB /* SessionStream.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionStream.swift; sourceTree = ""; }; 354ADA8F22DA81650093EFFB /* DateFormatter+PubNub.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "DateFormatter+PubNub.swift"; sourceTree = ""; }; 354ADA9322DCBC350093EFFB /* ResponseOperator.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ResponseOperator.swift; sourceTree = ""; }; 354D537028403B560043D61F /* Space+PubNub.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Space+PubNub.swift"; sourceTree = ""; }; @@ -703,7 +712,6 @@ 35580681230F3A34005CDD92 /* RequestIdOperator.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RequestIdOperator.swift; sourceTree = ""; }; 35580684230F4771005CDD92 /* RequestIdOperatorTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RequestIdOperatorTests.swift; sourceTree = ""; }; 35580687230F4B75005CDD92 /* InstanceIdOperatorTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = InstanceIdOperatorTests.swift; sourceTree = ""; }; - 355806992311F968005CDD92 /* SubscriptionStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscriptionStreamTests.swift; sourceTree = ""; }; 3558069B231303D9005CDD92 /* AutomaticRetryTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AutomaticRetryTests.swift; sourceTree = ""; }; 3558073723145749005CDD92 /* PubNubIntTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = PubNubIntTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; }; 3559977A23073D53000BCFD1 /* WeakBoxTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WeakBoxTests.swift; sourceTree = ""; }; @@ -716,7 +724,6 @@ 35599798230C5878000BCFD1 /* LogWriter.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = LogWriter.swift; sourceTree = ""; }; 355BE9FC22C2917C000EC334 /* AnyJSON.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AnyJSON.swift; sourceTree = ""; }; 355BE9FE22C2B74A000EC334 /* AnyJSONTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AnyJSONTests.swift; sourceTree = ""; }; - 355C47A122CBD6F2006C3EEE /* EventStream.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventStream.swift; sourceTree = ""; }; 355E1E66234563550094D3E0 /* objects_membership_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = objects_membership_success.json; sourceTree = ""; }; 355E1E6823458BC10094D3E0 /* objects_members_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = objects_members_success.json; sourceTree = ""; }; 355F213622DECFCD004DEFBF /* Typealias+PubNub.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Typealias+PubNub.swift"; sourceTree = ""; }; @@ -802,7 +809,6 @@ 35A66A7522F861BA00AC67A9 /* WeakBox.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WeakBox.swift; sourceTree = ""; }; 35A66A7722F861BA00AC67A9 /* AutomaticRetry.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AutomaticRetry.swift; sourceTree = ""; }; 35A66A7C22F861BA00AC67A9 /* PubNubMessage.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PubNubMessage.swift; sourceTree = ""; }; - 35A66A7D22F861BA00AC67A9 /* SubscriptionStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionStream.swift; sourceTree = ""; }; 35A66A8B22F9080A00AC67A9 /* getState_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = getState_success.json; sourceTree = ""; }; 35A66A8C22F9084000AC67A9 /* setState_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = setState_success.json; sourceTree = ""; }; 35A66A8D22F911DB00AC67A9 /* SubscribeSessionFactory.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscribeSessionFactory.swift; sourceTree = ""; }; @@ -905,7 +911,6 @@ 35DA9AB42335491F00867989 /* ObjectsUUIDRouterTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ObjectsUUIDRouterTests.swift; sourceTree = ""; }; 35DB0C4A2874768C001E1F76 /* FlatJSONCodable.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FlatJSONCodable.swift; sourceTree = ""; }; 35DB0C4C287476BF001E1F76 /* OptionalChange.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OptionalChange.swift; sourceTree = ""; }; - 35E381F623149B9000A17549 /* SubscriptionStreamTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionStreamTests.swift; sourceTree = ""; }; 35E381FC23149BA900A17549 /* AutomaticRetryTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AutomaticRetryTests.swift; sourceTree = ""; }; 35E46050234B8C44005D04AE /* PubNubError.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubError.swift; sourceTree = ""; }; 35E71C39249027120032A991 /* getState_single_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = getState_single_success.json; sourceTree = ""; }; @@ -958,8 +963,6 @@ 35FE940F22EFA30A0051C455 /* malformedFilterExpression_StatusCode.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = malformedFilterExpression_StatusCode.json; sourceTree = ""; }; 35FE941122EFB70B0051C455 /* unrecognizedEndpointError.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = unrecognizedEndpointError.json; sourceTree = ""; }; 35FE941322EFB7C10051C455 /* unknownEndpointError.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = unknownEndpointError.json; sourceTree = ""; }; - 35FE941722EFCB7F0051C455 /* SessionStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionStreamTests.swift; sourceTree = ""; }; - 35FE941A22EFE5400051C455 /* EventStreamTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventStreamTests.swift; sourceTree = ""; }; 35FE941E22F0929A0051C455 /* RequestRetrierTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RequestRetrierTests.swift; sourceTree = ""; }; 3D389FC32B35AF4A006928E7 /* TransitionProtocol.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TransitionProtocol.swift; sourceTree = ""; }; 3D389FC42B35AF4A006928E7 /* Dispatcher.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Dispatcher.swift; sourceTree = ""; }; @@ -1020,6 +1023,24 @@ 3D758DD42AB48A6A005D2B36 /* CryptorHeaderWithinStreamFinder.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CryptorHeaderWithinStreamFinder.swift; sourceTree = ""; }; 3D9134962A1216F7000A5124 /* PubNubPushTargetTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubPushTargetTests.swift; sourceTree = ""; }; 3DACC7F62AB88F8E00210B14 /* Data+CommonCrypto.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Data+CommonCrypto.swift"; sourceTree = ""; }; + 3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionStreamTests.swift; sourceTree = ""; }; + 3DB9255A2B7A2B89001B7E90 /* EventStreamTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EventStreamTests.swift; sourceTree = ""; }; + 3DB9255F2B7A2B9B001B7E90 /* SubscriptionTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionTests.swift; sourceTree = ""; }; + 3DB925612B7A2BCA001B7E90 /* SessionStreamTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SessionStreamTests.swift; sourceTree = ""; }; + 3DB925632B7A2BF5001B7E90 /* SubscriptionSetTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscriptionSetTests.swift; sourceTree = ""; }; + 3DB925652B7A2C52001B7E90 /* SubscribeMessagesGeneratorTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscribeMessagesGeneratorTests.swift; sourceTree = ""; }; + 3DB9256B2B7AA75F001B7E90 /* EventStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EventStream.swift; sourceTree = ""; }; + 3DB9256D2B7AA75F001B7E90 /* SubscriptionStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionStream.swift; sourceTree = ""; }; + 3DB9256E2B7AA75F001B7E90 /* PubNubEntityEvent.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PubNubEntityEvent.swift; sourceTree = ""; }; + 3DB925702B7AA75F001B7E90 /* SubscriptionSet.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionSet.swift; sourceTree = ""; }; + 3DB925712B7AA75F001B7E90 /* PubNubEvent.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PubNubEvent.swift; sourceTree = ""; }; + 3DB925732B7AA75F001B7E90 /* SubscribeMessagePayload+PubNubEvent.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SubscribeMessagePayload+PubNubEvent.swift"; sourceTree = ""; }; + 3DB925742B7AA75F001B7E90 /* Subscribable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscribable.swift; sourceTree = ""; }; + 3DB925752B7AA75F001B7E90 /* Subscription.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscription.swift; sourceTree = ""; }; + 3DB925762B7AA75F001B7E90 /* EventEmitter.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EventEmitter.swift; sourceTree = ""; }; + 3DB925782B7AA75F001B7E90 /* EntityCreator.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EntityCreator.swift; sourceTree = ""; }; + 3DB925792B7AA75F001B7E90 /* EntitySubscribable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EntitySubscribable.swift; sourceTree = ""; }; + 3DB9257B2B7AA75F001B7E90 /* SessionStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SessionStream.swift; sourceTree = ""; }; 3DBB2C202ABD8053008A100E /* PubNubCryptoModuleContractTestSteps.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PubNubCryptoModuleContractTestSteps.swift; sourceTree = ""; }; 3DD1FB982B5A7804005A14E3 /* PubNubPresenceStateContainer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PubNubPresenceStateContainer.swift; sourceTree = ""; }; 3DE632651BA8B2E27ACFC4AD /* Pods-PubNubContractTestsBeta.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-PubNubContractTestsBeta.release.xcconfig"; path = "Target Support Files/Pods-PubNubContractTestsBeta/Pods-PubNubContractTestsBeta.release.xcconfig"; sourceTree = ""; }; @@ -1269,16 +1290,6 @@ path = Response; sourceTree = ""; }; - 3556839322D28B8E0073A29C /* Events */ = { - isa = PBXGroup; - children = ( - 355C47A122CBD6F2006C3EEE /* EventStream.swift */, - 35A66A7B22F861BA00AC67A9 /* Subscription */, - 355F213322DE5C6F004DEFBF /* Session */, - ); - path = Events; - sourceTree = ""; - }; 3556E36E24802392004FDC25 /* Property Wrappers */ = { isa = PBXGroup; children = ( @@ -1368,14 +1379,6 @@ path = Responses; sourceTree = ""; }; - 355F213322DE5C6F004DEFBF /* Session */ = { - isa = PBXGroup; - children = ( - 354ADA8D22DA7F280093EFFB /* SessionStream.swift */, - ); - path = Session; - sourceTree = ""; - }; 3567434322E0203800BF2639 /* Operators */ = { isa = PBXGroup; children = ( @@ -1519,10 +1522,10 @@ 3580A5A622F14DB700B12E5E /* Events */ = { isa = PBXGroup; children = ( - 35E381F623149B9000A17549 /* SubscriptionStreamTests.swift */, - 35FE941A22EFE5400051C455 /* EventStreamTests.swift */, - 35FE941722EFCB7F0051C455 /* SessionStreamTests.swift */, - 355806992311F968005CDD92 /* SubscriptionStreamTests.swift */, + 3DB925652B7A2C52001B7E90 /* SubscribeMessagesGeneratorTests.swift */, + 3DB925612B7A2BCA001B7E90 /* SessionStreamTests.swift */, + 3DB925572B7A2B89001B7E90 /* Old */, + 3DB9255E2B7A2B9B001B7E90 /* New */, ); path = Events; sourceTree = ""; @@ -1680,15 +1683,6 @@ path = MessageHistory; sourceTree = ""; }; - 35A66A7B22F861BA00AC67A9 /* Subscription */ = { - isa = PBXGroup; - children = ( - 35A66A7D22F861BA00AC67A9 /* SubscriptionStream.swift */, - 35012EC428500BA800CF7E0A /* PubNubEntityEvent.swift */, - ); - path = Subscription; - sourceTree = ""; - }; 35A66A8522F8DB2E00AC67A9 /* Subscription */ = { isa = PBXGroup; children = ( @@ -2213,6 +2207,91 @@ path = Push; sourceTree = ""; }; + 3DB925572B7A2B89001B7E90 /* Old */ = { + isa = PBXGroup; + children = ( + 3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */, + 3DB9255A2B7A2B89001B7E90 /* EventStreamTests.swift */, + ); + path = Old; + sourceTree = ""; + }; + 3DB9255E2B7A2B9B001B7E90 /* New */ = { + isa = PBXGroup; + children = ( + 3DB9255F2B7A2B9B001B7E90 /* SubscriptionTests.swift */, + 3DB925632B7A2BF5001B7E90 /* SubscriptionSetTests.swift */, + ); + path = New; + sourceTree = ""; + }; + 3DB925692B7AA75F001B7E90 /* Events */ = { + isa = PBXGroup; + children = ( + 3DB9256A2B7AA75F001B7E90 /* Old */, + 3DB9256F2B7AA75F001B7E90 /* New */, + 3DB9257A2B7AA75F001B7E90 /* Session */, + ); + path = Events; + sourceTree = ""; + }; + 3DB9256A2B7AA75F001B7E90 /* Old */ = { + isa = PBXGroup; + children = ( + 3DB9256B2B7AA75F001B7E90 /* EventStream.swift */, + 3DB9256C2B7AA75F001B7E90 /* Subscription */, + ); + path = Old; + sourceTree = ""; + }; + 3DB9256C2B7AA75F001B7E90 /* Subscription */ = { + isa = PBXGroup; + children = ( + 3DB9256D2B7AA75F001B7E90 /* SubscriptionStream.swift */, + 3DB9256E2B7AA75F001B7E90 /* PubNubEntityEvent.swift */, + ); + path = Subscription; + sourceTree = ""; + }; + 3DB9256F2B7AA75F001B7E90 /* New */ = { + isa = PBXGroup; + children = ( + 3DB925712B7AA75F001B7E90 /* PubNubEvent.swift */, + 3DB925762B7AA75F001B7E90 /* EventEmitter.swift */, + 3DB925742B7AA75F001B7E90 /* Subscribable.swift */, + 3DB925752B7AA75F001B7E90 /* Subscription.swift */, + 3DB925702B7AA75F001B7E90 /* SubscriptionSet.swift */, + 3DB925772B7AA75F001B7E90 /* Entities */, + 3DB925722B7AA75F001B7E90 /* Extensions */, + ); + path = New; + sourceTree = ""; + }; + 3DB925722B7AA75F001B7E90 /* Extensions */ = { + isa = PBXGroup; + children = ( + 3DB925732B7AA75F001B7E90 /* SubscribeMessagePayload+PubNubEvent.swift */, + ); + path = Extensions; + sourceTree = ""; + }; + 3DB925772B7AA75F001B7E90 /* Entities */ = { + isa = PBXGroup; + children = ( + 3DB925782B7AA75F001B7E90 /* EntityCreator.swift */, + 3DB925792B7AA75F001B7E90 /* EntitySubscribable.swift */, + ); + path = Entities; + sourceTree = ""; + }; + 3DB9257A2B7AA75F001B7E90 /* Session */ = { + isa = PBXGroup; + children = ( + 3DB9257B2B7AA75F001B7E90 /* SessionStream.swift */, + ); + path = Session; + sourceTree = ""; + }; 3DBD7CDD58292DFFDF108B95 /* Pods */ = { isa = PBXGroup; children = ( @@ -2435,11 +2514,11 @@ 3D389FC12B35AF4A006928E7 /* EventEngine */, 35B0ACE4252BE37C00537A18 /* APIs */, 35DB0C49287475F9001E1F76 /* Core */, + 3DB925692B7AA75F001B7E90 /* Events */, 3580A59B22F128A300B12E5E /* Errors */, 35A66A8522F8DB2E00AC67A9 /* Subscription */, 35AC16312487177500A66030 /* Models */, 35F0259722BBF948007BD7D3 /* Networking */, - 3556839322D28B8E0073A29C /* Events */, 35F0259222BBE68B007BD7D3 /* Extensions */, 3556E36E24802392004FDC25 /* Property Wrappers */, 355BE9F922C28C29000EC334 /* Helpers */, @@ -3355,6 +3434,7 @@ 3D6265D72ABCA79100FDD5E6 /* CryptorUtils.swift in Sources */, 35D8D4C522EB4600001B07D9 /* AnyJSON.swift in Sources */, 35AC16332487179400A66030 /* PubNubPage.swift in Sources */, + 3DB925802B7AA75F001B7E90 /* PubNubEvent.swift in Sources */, 35AC162F2486C9A400A66030 /* PubNubMessageAction.swift in Sources */, 3585033B22CD545400A11D9A /* URLRequest+PubNub.swift in Sources */, 35C6B6E622F51A060054F242 /* AnyJSONType.swift in Sources */, @@ -3374,10 +3454,13 @@ 35A6C7A822FBCC8B00E97CC5 /* PushRouter.swift in Sources */, 3D389FF72B35AF4A006928E7 /* PresenceHeartbeatRequest.swift in Sources */, 3D758DCE2AB0A835005D2B36 /* LegacyCryptor.swift in Sources */, + 3DB925812B7AA75F001B7E90 /* SubscribeMessagePayload+PubNubEvent.swift in Sources */, 35A66A7E22F861BA00AC67A9 /* SubscriptionSession.swift in Sources */, + 3DB925872B7AA75F001B7E90 /* SessionStream.swift in Sources */, 3D389FE42B35AF4A006928E7 /* EventEngine.swift in Sources */, - 356D48B32360BD6B00C65C40 /* EventStream.swift in Sources */, 35C6B6E322F515760054F242 /* SubscribeRouter.swift in Sources */, + 3DB9257C2B7AA75F001B7E90 /* EventStream.swift in Sources */, + 3DB925832B7AA75F001B7E90 /* Subscription.swift in Sources */, 35C6B6DD22F501780054F242 /* Encodable+PubNub.swift in Sources */, 35580682230F3A34005CDD92 /* RequestIdOperator.swift in Sources */, 35A66A8322F861BA00AC67A9 /* PubNubMessage.swift in Sources */, @@ -3387,9 +3470,9 @@ 3585033222CD138300A11D9A /* Set+PubNub.swift in Sources */, 358C641C238C5232009CE354 /* FCMAndroidPayload.swift in Sources */, 35FAC1E72357C2AE0096E418 /* PubNubError.swift in Sources */, - 356D48B42360BD7000C65C40 /* SubscriptionStream.swift in Sources */, 35599799230C5878000BCFD1 /* LogWriter.swift in Sources */, 354ADA9422DCBC360093EFFB /* ResponseOperator.swift in Sources */, + 3DB9257F2B7AA75F001B7E90 /* SubscriptionSet.swift in Sources */, 3DACC7F72AB88F8E00210B14 /* Data+CommonCrypto.swift in Sources */, 357CA28E251D3D0C00BC40D3 /* HTTPFileTask.swift in Sources */, 3534D4E622C67CCA008E89FA /* HTTPRouter.swift in Sources */, @@ -3408,26 +3491,28 @@ 3D389FF52B35AF4A006928E7 /* DelayedHeartbeatEffect.swift in Sources */, 35599792230A3F11000BCFD1 /* Thread+PubNub.swift in Sources */, 3D389FEF2B35AF4A006928E7 /* Presence.swift in Sources */, + 3DB925862B7AA75F001B7E90 /* EntitySubscribable.swift in Sources */, 3D389FF82B35AF4A006928E7 /* PresenceInput.swift in Sources */, + 3DB9257D2B7AA75F001B7E90 /* SubscriptionStream.swift in Sources */, 3567434822E1E4F700BF2639 /* Collection+PubNub.swift in Sources */, 354FC4C122D04D3600318932 /* DispatchQueue+PubNub.swift in Sources */, 359512102301DCAB00C9D3AE /* Crypto.swift in Sources */, - 354ADA8E22DA7F280093EFFB /* SessionStream.swift in Sources */, 3534D4E822C67D0E008E89FA /* OperationQueue+PubNub.swift in Sources */, 3585A02423C63EE900FDA860 /* CBORSerialization.swift in Sources */, 359152A122BA9AA30048842D /* PubNubConfiguration.swift in Sources */, 3D389FE62B35AF4A006928E7 /* EmitMessagesEffect.swift in Sources */, 3D38A02C2B35B087006928E7 /* LegacySubscriptionSessionStrategy+Presence.swift in Sources */, + 3DB925852B7AA75F001B7E90 /* EntityCreator.swift in Sources */, 358C641F238C5FCA009CE354 /* FCMWebpushPayload.swift in Sources */, 352DBFEA237CCB9D00A0106E /* EndpointResponse.swift in Sources */, 350EFBE422C95FED00FA33AA /* Atomic.swift in Sources */, 35293A7A2368F9680049A71F /* MessageActionsRouter.swift in Sources */, 35AC162B2485B1DA00A66030 /* SubscribeMessageActionPayload.swift in Sources */, 35599796230B6FFA000BCFD1 /* FileManager+PubNub.swift in Sources */, - 35012EC528500BA800CF7E0A /* PubNubEntityEvent.swift in Sources */, 3D389FE92B35AF4A006928E7 /* SubscribeEffectFactory.swift in Sources */, 355F213722DECFCD004DEFBF /* Typealias+PubNub.swift in Sources */, 3585A02623C63F3900FDA860 /* DecodingError+PubNub.swift in Sources */, + 3DB925842B7AA75F001B7E90 /* EventEmitter.swift in Sources */, 3557CE0723886434004BBACC /* PubNubAPNSPayload.swift in Sources */, 3D389FED2B35AF4A006928E7 /* Subscribe.swift in Sources */, 3D389FE22B35AF4A006928E7 /* Dispatcher.swift in Sources */, @@ -3479,6 +3564,8 @@ 3534D4E422C57659008E89FA /* PublishRouter.swift in Sources */, 35EE358C22E26A4D00E3F081 /* HTTPURLResponse+PubNub.swift in Sources */, 3D389FEE2B35AF4A006928E7 /* SubscribeTransition.swift in Sources */, + 3DB925822B7AA75F001B7E90 /* Subscribable.swift in Sources */, + 3DB9257E2B7AA75F001B7E90 /* PubNubEntityEvent.swift in Sources */, 3D389FE32B35AF4A006928E7 /* EffectHandler.swift in Sources */, 350EFBE022C9573F00FA33AA /* NSLocking+PubNub.swift in Sources */, ); @@ -3496,7 +3583,6 @@ 35CDFEAB22E762E100F3B9F2 /* String+PubNubTests.swift in Sources */, 35CDFEA922E75DA800F3B9F2 /* Set+PubNubTests.swift in Sources */, 359152AB22BAA6730048842D /* PubNubConfigurationTests.swift in Sources */, - 35FE941B22EFE5400051C455 /* EventStreamTests.swift in Sources */, 3D38A00D2B35AF6A006928E7 /* EmitMessagesTests.swift in Sources */, 35FE93C322EF57FA0051C455 /* Session+URLErrorTests.swift in Sources */, 35FE940122EF983A0051C455 /* Session+EndpointErrorTests.swift in Sources */, @@ -3508,6 +3594,7 @@ 35D0615A2303A61500FDB2F9 /* ValidatedTests.swift in Sources */, 35CDFEBC22E789B200F3B9F2 /* ConstantsTests.swift in Sources */, 35CF54A0248D96320099FE81 /* SubscribeRouterTests.swift in Sources */, + 3DB9255C2B7A2B89001B7E90 /* SubscriptionStreamTests.swift in Sources */, 3D38A0122B35AF6B006928E7 /* WaitEffectTests.swift in Sources */, 35CF54A1248DA6430099FE81 /* ObjectsChannelRouterTests.swift in Sources */, 359C2C1422EBB56A009C3B4B /* Int+PubNubTests.swift in Sources */, @@ -3518,6 +3605,8 @@ 35403F8A253617A8004B978E /* XMLCodingTests.swift in Sources */, 3557CDF8237F4611004BBACC /* MessageActionsRouterTests.swift in Sources */, 35CDFEAD22E7655700F3B9F2 /* URL+PubNubTests.swift in Sources */, + 3DB925622B7A2BCA001B7E90 /* SessionStreamTests.swift in Sources */, + 3DB925602B7A2B9B001B7E90 /* SubscriptionTests.swift in Sources */, 35CDFEBA22E77E2B00F3B9F2 /* URLSessionConfiguration+PubNubTests.swift in Sources */, 35FE941F22F0929A0051C455 /* RequestRetrierTests.swift in Sources */, 35580686230F47EA005CDD92 /* RequestIdOperatorTests.swift in Sources */, @@ -3528,6 +3617,7 @@ 35A6C7BA22FC5BFB00E97CC5 /* Data+PubNubTests.swift in Sources */, 35AB218D22E7D72200BD3049 /* AnyJSON+CodableTests.swift in Sources */, 3557CDFC237F59F6004BBACC /* PublishRouterTests.swift in Sources */, + 3DB9255D2B7A2B89001B7E90 /* EventStreamTests.swift in Sources */, 3557CE00237F6380004BBACC /* TimeRouterTests.swift in Sources */, 3557CDF7237F1E17004BBACC /* HistoryRouterTests.swift in Sources */, 3559977B23073D53000BCFD1 /* WeakBoxTests.swift in Sources */, @@ -3543,9 +3633,9 @@ OBJ_49 /* PubNubTests.swift in Sources */, 3558068A230F4C99005CDD92 /* InstanceIdOperatorTests.swift in Sources */, 35CF549E248D913A0099FE81 /* ObjectsUUIDRouterTests.swift in Sources */, + 3DB925642B7A2BF5001B7E90 /* SubscriptionSetTests.swift in Sources */, 35458BA3230CB3570085B502 /* SubscribeSessionFactoryTests.swift in Sources */, 3580A5A222F13C6500B12E5E /* SessionStreamAwait.swift in Sources */, - 3558069A2311F968005CDD92 /* SubscriptionStreamTests.swift in Sources */, 357AEB8622E6979D00C18250 /* DateFormatter+PubNubTests.swift in Sources */, 357AEB8222E6949300C18250 /* Bool+PubNubTests.swift in Sources */, 35458BA5230D8E500085B502 /* TestLogWriter.swift in Sources */, @@ -3553,13 +3643,13 @@ 3580A5A822F1583900B12E5E /* MockRequestOperators.swift in Sources */, 35458BA7230D91BB0085B502 /* TestSetup.swift in Sources */, 357AEB8C22E6A12400C18250 /* HTTPURLResponse+PubNubTests.swift in Sources */, + 3DB925662B7A2C52001B7E90 /* SubscribeMessagesGeneratorTests.swift in Sources */, 3580A59422F0C74100B12E5E /* RequestMutatorTests.swift in Sources */, 3D38A0112B35AF6B006928E7 /* SubscribeTransitionTests.swift in Sources */, 3D38A00B2B35AF6A006928E7 /* DispatcherTests.swift in Sources */, 35721576252FA675005A0144 /* XMLEncoder.swift in Sources */, 3D38A0182B35AF6B006928E7 /* EventEngineTests.swift in Sources */, 3557CDF6237F189E004BBACC /* ChannelGroupEndpointTests.swift in Sources */, - 35FE941822EFCB7F0051C455 /* SessionStreamTests.swift in Sources */, 35CDFEB822E7776400F3B9F2 /* URLRequest+PubNubTests.swift in Sources */, ); runOnlyForDeploymentPostprocessing = 0; @@ -3660,7 +3750,7 @@ ); HEADER_SEARCH_PATHS = "$(inherited)"; INFOPLIST_FILE = PubNub.xcodeproj/PubNubTests_Info.plist; - IPHONEOS_DEPLOYMENT_TARGET = 9.0; + IPHONEOS_DEPLOYMENT_TARGET = 11.0; LD_RUNPATH_SEARCH_PATHS = ( "$(inherited)", "@loader_path/../Frameworks", @@ -3693,7 +3783,7 @@ ); HEADER_SEARCH_PATHS = "$(inherited)"; INFOPLIST_FILE = PubNub.xcodeproj/PubNubTests_Info.plist; - IPHONEOS_DEPLOYMENT_TARGET = 9.0; + IPHONEOS_DEPLOYMENT_TARGET = 11.0; LD_RUNPATH_SEARCH_PATHS = ( "$(inherited)", "@loader_path/../Frameworks", @@ -4750,7 +4840,7 @@ ); HEADER_SEARCH_PATHS = "$(inherited)"; INFOPLIST_FILE = PubNub.xcodeproj/PubNubTests_Info.plist; - IPHONEOS_DEPLOYMENT_TARGET = 9.0; + IPHONEOS_DEPLOYMENT_TARGET = 11.0; LD_RUNPATH_SEARCH_PATHS = ( "$(inherited)", "@loader_path/../Frameworks", @@ -4784,7 +4874,7 @@ ); HEADER_SEARCH_PATHS = "$(inherited)"; INFOPLIST_FILE = PubNub.xcodeproj/PubNubTests_Info.plist; - IPHONEOS_DEPLOYMENT_TARGET = 9.0; + IPHONEOS_DEPLOYMENT_TARGET = 11.0; LD_RUNPATH_SEARCH_PATHS = ( "$(inherited)", "@loader_path/../Frameworks", diff --git a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift index 6d6604fc..ed65f997 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeInput.swift @@ -14,9 +14,24 @@ struct SubscribeInput: Equatable { private let channelEntries: [String: PubNubChannel] private let groupEntries: [String: PubNubChannel] + typealias InsertingResult = ( + newInput: SubscribeInput, + insertedChannels: [PubNubChannel], + insertedGroups: [PubNubChannel] + ) + typealias RemovingResult = ( + newInput: SubscribeInput, + removedChannels: [PubNubChannel], + removedGroups: [PubNubChannel] + ) + init(channels: [PubNubChannel] = [], groups: [PubNubChannel] = []) { - self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) } - self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in _ = r.insert(channel) } + self.channelEntries = channels.reduce(into: [String: PubNubChannel]()) { r, channel in + _ = r.insert(channel) + } + self.groupEntries = groups.reduce(into: [String: PubNubChannel]()) { r, channel in + _ = r.insert(channel) + } } private init(channels: [String: PubNubChannel], groups: [String: PubNubChannel]) { @@ -89,18 +104,15 @@ struct SubscribeInput: Equatable { func adding( channels: [PubNubChannel], and groups: [PubNubChannel] - ) -> ( - newInput: SubscribeInput, - insertedChannels: [PubNubChannel], - insertedGroups: [PubNubChannel] - ) { + ) -> SubscribeInput.InsertingResult { + // Gets a copy of current channels and channel groups var currentChannels = channelEntries var currentGroups = groupEntries let insertedChannels = channels.filter { currentChannels.insert($0) } let insertedGroups = groups.filter { currentGroups.insert($0) } - return ( + return InsertingResult( newInput: SubscribeInput(channels: currentChannels, groups: currentGroups), insertedChannels: insertedChannels, insertedGroups: insertedGroups @@ -108,33 +120,26 @@ struct SubscribeInput: Equatable { } func removing( - channels: [String], - and groups: [String] - ) -> ( - newInput: SubscribeInput, - removedChannels: [PubNubChannel], - removedGroups: [PubNubChannel] - ) { + channels: [PubNubChannel], + presenceChannelsOnly: [PubNubChannel], + groups: [PubNubChannel], + presenceGroupsOnly: [PubNubChannel] + ) -> SubscribeInput.RemovingResult { + // Gets a copy of current channels and channel groups var currentChannels = channelEntries var currentGroups = groupEntries let removedChannels = channels.compactMap { - if $0.isPresenceChannelName { - return currentChannels.unsubscribePresence($0.trimmingPresenceChannelSuffix) - } else { - return currentChannels.removeValue(forKey: $0) - } + currentChannels.removeValue(forKey: $0.id) + } + presenceChannelsOnly.compactMap { + currentChannels.unsubscribePresence($0.id) } - let removedGroups = groups.compactMap { - if $0.isPresenceChannelName { - return currentGroups.unsubscribePresence($0.trimmingPresenceChannelSuffix) - } else { - return currentGroups.removeValue(forKey: $0) - } + currentGroups.removeValue(forKey: $0.id) + } + presenceGroupsOnly.compactMap { + currentGroups.unsubscribePresence($0.id) } - - return ( + return RemovingResult( newInput: SubscribeInput(channels: currentChannels, groups: currentGroups), removedChannels: removedChannels, removedGroups: removedGroups @@ -158,28 +163,6 @@ extension Dictionary where Key == String, Value == PubNubChannel { self[channel.id] = channel return true } - - func difference(_ dict: [Key:Value]) -> [Key: Value] { - let entriesInSelfAndNotInDict = filter { - dict[$0.0] != self[$0.0] - } - return entriesInSelfAndNotInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in - var res = res - res[entry.0] = entry.1 - return res - } - } - - func intersection(_ dict: [Key:Value]) -> [Key: Value] { - let entriesInSelfAndInDict = filter { - dict[$0.0] == self[$0.0] - } - return entriesInSelfAndInDict.reduce([Key:Value]()) { (res, entry) -> [Key:Value] in - var res = res - res[entry.0] = entry.1 - return res - } - } // Updates current Dictionary with the new channel value unsubscribed from Presence. // Returns the updated value if the corresponding entry matching the passed `id:` was found, otherwise `nil` diff --git a/Sources/PubNub/Events/New/Entities/EntityCreator.swift b/Sources/PubNub/Events/New/Entities/EntityCreator.swift new file mode 100644 index 00000000..d146c449 --- /dev/null +++ b/Sources/PubNub/Events/New/Entities/EntityCreator.swift @@ -0,0 +1,93 @@ +// +// PubNub+Subscribable.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +/// Protocol for types capable of creating references for entities to which the user can subscribe, +/// receiving real-time updates. +public protocol EntityCreator { + /// Creates a new channel entity the user can subscribe to. + /// + /// This method does not create any entity, either locally or remotely; it merely provides + /// a reference to a channel that can be subscribed to and unsubscribed from + /// + /// - Parameters: + /// - name: The unique identifier for the channel. + /// - Returns: A `ChannelRepresentation` object representing the channel. + func channel(_ name: String) -> ChannelRepresentation + + /// Creates a new channel group entity the user can subscribe to. + /// + /// - Parameters: + /// - name: The unique identifier for the channel group. + /// - Returns: A `ChannelGroupRepresentation` object representing the channel group. + func channelGroup(_ name: String) -> ChannelGroupRepresentation + + /// Creates user metadata entity the user can subscribe to. + /// + /// This method does not create any entity, either locally or remotely; it merely provides + /// a reference to a channel that can be subscribed to and unsubscribed from + /// + /// - Parameters: + /// - name: The unique identifier for the user metadata. + /// - Returns: A `UserMetadataRepresentation` object representing the user metadata. + func userMetadata(_ name: String) -> UserMetadataRepresentation + + /// Creates channel metadata entity the user can subscribe to. + /// + /// This method does not create any entity, either locally or remotely; it merely provides + /// a reference to a channel that can be subscribed to and unsubscribed from + /// + /// - Parameters: + /// - name: The unique identifier for the channel metadata. + /// - Returns: A `ChannelMetadataRepresentation` object representing the channel metadata. + func channelMetadata(_ name: String) -> ChannelMetadataRepresentation +} + +public extension EntityCreator { + /// Creates a `SubscriptionSet` object from the collection of `Subscribable` entites. + /// + /// Use this function to set up and manage subscriptions for a collection of `Subscribable` entities. + /// + /// - Parameters: + /// - queue: The dispatch queue on which the subscription events should be handled + /// - entities: A collection of `Subscribable` entities to subscribe to + /// - options: Additional options for configuring the subscription + /// - Returns: A `SubscriptionSet` instance for managing the specified entities. + func subscription( + queue: DispatchQueue = .main, + entities: any Collection, + options: SubscriptionOptions = SubscriptionOptions.empty() + ) -> SubscriptionSet { + SubscriptionSet( + queue: queue, + entities: entities, + options: options + ) + } +} + +// This internal protocol is designed for types capable of receiving an intent +// to Subscribe or Unsubscribe and invoking the PubNub service with computed channels +// and channel groups. +protocol SubscribeReceiver: AnyObject { + func registerAdapter(_ adapter: BaseSubscriptionListenerAdapter) + + func internalSubscribe( + with channels: [Subscription], + and groups: [Subscription], + at timetoken: Timetoken? + ) + func internalUnsubscribe( + from channels: [Subscription], + and groups: [Subscription], + presenceOnly: Bool + ) +} diff --git a/Sources/PubNub/Events/New/Entities/EntitySubscribable.swift b/Sources/PubNub/Events/New/Entities/EntitySubscribable.swift new file mode 100644 index 00000000..f00e4a1e --- /dev/null +++ b/Sources/PubNub/Events/New/Entities/EntitySubscribable.swift @@ -0,0 +1,47 @@ +// +// EntitySubscribable.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +// MARK: - PubNubChannelRepresentation + +/// Represents a channel that can be subscribed to and unsubscribed from using the PubNub service. +public class ChannelRepresentation: Subscribable { + init(name: String, receiver: SubscribeReceiver) { + super.init(name: name, subscriptionType: .channel, receiver: receiver) + } +} + +// MARK: - PubNubChannelGroupRepresentation + +/// Represents a channel group that can be subscribed to and unsubscribed from using the PubNub service. +public class ChannelGroupRepresentation: Subscribable { + init(name: String, receiver: SubscribeReceiver) { + super.init(name: name, subscriptionType: .channelGroup, receiver: receiver) + } +} + +// MARK: - PubNubUserMetadataRepresentation + +/// Represents user metadata that can be subscribed to and unsubscribed from using the PubNub service. +public class UserMetadataRepresentation: Subscribable { + init(name: String, receiver: SubscribeReceiver) { + super.init(name: name, subscriptionType: .channel, receiver: receiver) + } +} + +// MARK: - PubNubChannelMetadataRepresentation + +/// Represents channel metadata that can be subscribed to and unsubscribed from using the PubNub service. +public class ChannelMetadataRepresentation: Subscribable { + init(name: String, receiver: SubscribeReceiver) { + super.init(name: name, subscriptionType: .channel, receiver: receiver) + } +} diff --git a/Sources/PubNub/Events/New/EventEmitter.swift b/Sources/PubNub/Events/New/EventEmitter.swift new file mode 100644 index 00000000..f8273330 --- /dev/null +++ b/Sources/PubNub/Events/New/EventEmitter.swift @@ -0,0 +1,129 @@ +// +// EventEmitter.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +// MARK: - StatusEmitter + +/// A protocol for types that emit PubNub status events from the Subscribe loop. +public protocol StatusEmitter: AnyObject { + /// A closure to be called when the connection status changes. + var didReceiveConnectionStatusChange: ((ConnectionStatus) -> Void)? { get set } + /// A closure to be called when a subscription error occurs. + var didReceiveSubscribeError: ((PubNubError) -> Void)? { get set } +} + +// MARK: - EventEmitter + +/// A protocol for types that emit PubNub events. +/// +/// Utilize closures to receive notifications when specific types of PubNub events occur. +public protocol EventEmitter: AnyObject { + /// An underlying queue to dispatch events + var queue: DispatchQueue { get } + /// A unique emitter's identifier + var uuid: UUID { get } + /// Receiver for a single event + var eventStream: ((PubNubEvent) -> Void)? { get set } + /// Receiver for multiple events. This will also emit individual events to `eventStream:` + var eventsStream: (([PubNubEvent]) -> Void)? { get set } + /// Receiver for Message events + var messagesStream: ((PubNubMessage) -> Void)? { get set } + /// Receiver for Signal events + var signalsStream: ((PubNubMessage) -> Void)? { get set } + /// Receiver for Presence events + var presenceStream: ((PubNubPresenceChange) -> Void)? { get set } + /// Receiver for Message Action events + var messageActionsStream: ((PubNubMessageActionEvent) -> Void)? { get set } + /// Receiver for File Upload events + var filesStream: ((PubNubFileEvent) -> Void)? { get set } + /// Receiver for App Context events + var appContextStream: ((PubNubAppContextEvent) -> Void)? { get set } +} + +/// A protocol representing a type that can be used to dispose of subscriptions. +public protocol SubscriptionDisposable { + /// Determines whether current emitter is disposed + var isDisposed: Bool { get } + /// Stops listening to incoming events and disposes current emitter + func dispose() +} + +extension EventEmitter { + func emit(events: [PubNubEvent]) { + queue.async { [weak self] in + if !events.isEmpty { + self?.eventsStream?(events) + } + for event in events { + self?.eventStream?(event) + switch event { + case let .messageReceived(message): + self?.messagesStream?(message) + case let .signalReceived(signal): + self?.signalsStream?(signal) + case let .presenceChange(presence): + self?.presenceStream?(presence) + case let .appContextEvent(appContextEvent): + self?.appContextStream?(appContextEvent) + case let .messageActionEvent(messageActionEvent): + self?.messageActionsStream?(messageActionEvent) + case let .fileUploadEvent(fileEvent): + self?.filesStream?(fileEvent) + } + } + } + } +} + +extension EventEmitter { + func clearCallbacks() { + eventStream = nil + eventsStream = nil + messagesStream = nil + signalsStream = nil + presenceStream = nil + messageActionsStream = nil + filesStream = nil + appContextStream = nil + } +} + +// `SubscribeMessagesReceiver` is an internal protocol defining a receiver for subscription messages. +// Types that conform to this protocol are responsible for handling and processing these payloads +// into concrete events for the user. +protocol SubscribeMessagesReceiver: AnyObject { + // A dictionary representing the names of the underlying subscriptions + var subscriptionTopology: [SubscribableType : [String]] { get } + // This method should return an array of `PubNubEvent` instances, + // representing the concrete events for the user. + @discardableResult func onPayloadsReceived(payloads: [SubscribeMessagePayload]) -> [PubNubEvent] +} + +// An internal class that functions as a bridge between the legacy `BaseSubscriptionListener` +// and either `Subscription` or `SubscriptionSet`, forwarding the received payloads. +class BaseSubscriptionListenerAdapter: BaseSubscriptionListener { + private(set) weak var receiver: SubscribeMessagesReceiver? + + init(receiver: SubscribeMessagesReceiver, uuid: UUID, queue: DispatchQueue) { + self.receiver = receiver + super.init(queue: queue, uuid: uuid) + } + + override func emit(batch: [SubscribeMessagePayload]) { + if let receiver = receiver { + receiver.onPayloadsReceived(payloads: batch) + } + } + + deinit { + cancel() + } +} diff --git a/Sources/PubNub/Events/New/Extensions/SubscribeMessagePayload+PubNubEvent.swift b/Sources/PubNub/Events/New/Extensions/SubscribeMessagePayload+PubNubEvent.swift new file mode 100644 index 00000000..bdd6d738 --- /dev/null +++ b/Sources/PubNub/Events/New/Extensions/SubscribeMessagePayload+PubNubEvent.swift @@ -0,0 +1,66 @@ +// +// SubscribeMessagePayload+PubNubEvent.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +extension SubscribeMessagePayload { + func asPubNubEvent() -> PubNubEvent { + switch messageType { + case .message: + return .messageReceived(PubNubMessageBase(from: self)) + case .signal: + return .signalReceived(PubNubMessageBase(from: self)) + case .object: + guard let objectAction = try? payload.decode(SubscribeObjectMetadataPayload.self) else { + return .messageReceived(PubNubMessageBase(from: self)) + } + switch objectAction.subscribeEvent { + case .channelMetadataRemoved(let metadataId): + return .appContextEvent(.removedChannel(metadataId: metadataId)) + case .channelMetadataSet(let changes): + return .appContextEvent(.setChannel(changes)) + case .uuidMetadataSet(let changes): + return .appContextEvent(.setUUID(changes)) + case .uuidMetadataRemoved(let metadataId): + return .appContextEvent(.removedUUID(metadataId: metadataId)) + case .membershipMetadataSet(let metadata): + return .appContextEvent(.setMembership(metadata)) + case .membershipMetadataRemoved(let metadata): + return .appContextEvent(.removedMembership(metadata)) + default: + return .messageReceived(PubNubMessageBase(from: self)) + } + case .messageAction: + guard + let messageAction = PubNubMessageActionBase(from: self), + let actionEventString = payload[rawValue: "event"] as? String, + let actionEvent = SubscribeMessageActionPayload.Action(rawValue: actionEventString) + else { + return .messageReceived(PubNubMessageBase(from: self)) + } + switch actionEvent { + case .added: + return .messageActionEvent(.added(messageAction)) + case .removed: + return .messageActionEvent(.removed(messageAction)) + } + case .file: + guard let fileMessage = try? PubNubFileEventBase(from: self) else { + return .messageReceived(PubNubMessageBase(from: self)) + } + return .fileUploadEvent(fileMessage) + case .presence: + guard let presence = PubNubPresenceChangeBase(from: self) else { + return .messageReceived(PubNubMessageBase(from: self)) + } + return .presenceChange(presence) + } + } +} diff --git a/Sources/PubNub/Events/New/PubNubEvent.swift b/Sources/PubNub/Events/New/PubNubEvent.swift new file mode 100644 index 00000000..ec09a784 --- /dev/null +++ b/Sources/PubNub/Events/New/PubNubEvent.swift @@ -0,0 +1,51 @@ +// +// PubNubEvent.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +/// Possible events within the PubNub module +public enum PubNubEvent { + /// A message has been received + case messageReceived(PubNubMessage) + /// A signal has been received + case signalReceived(PubNubMessage) + /// A presence change has been received + case presenceChange(PubNubPresenceChange) + /// A MessageAction was added/removed to a published message + case messageActionEvent(PubNubMessageActionEvent) + /// A File was uploaded to storage + case fileUploadEvent(PubNubFileEvent) + /// A Membership object has been added/removed/updated + case appContextEvent(PubNubAppContextEvent) +} + +/// Possible subevents for Message Actions +public enum PubNubMessageActionEvent { + /// The Message Action was added to a message + case added(PubNubMessageAction) + /// The Message Action was removed from a message + case removed(PubNubMessageAction) +} + +/// Possible subevents for AppContext +public enum PubNubAppContextEvent { + /// The `PubNubUUIDMetadataChangeset` of the set Membership + case setUUID(PubNubUUIDMetadataChangeset) + /// The unique identifer of the UUID that was removed + case removedUUID(metadataId: String) + /// The changeset for the Channel object that changed + case setChannel(PubNubChannelMetadataChangeset) + /// The unique identifer of the Channel that was removed + case removedChannel(metadataId: String) + /// The `PubNubMembershipMetadata` of the set Membership + case setMembership(PubNubMembershipMetadata) + /// The `PubNubMembershipMetadata` of the removed Membership + case removedMembership(PubNubMembershipMetadata) +} diff --git a/Sources/PubNub/Events/New/Subscribable.swift b/Sources/PubNub/Events/New/Subscribable.swift new file mode 100644 index 00000000..50fb1f73 --- /dev/null +++ b/Sources/PubNub/Events/New/Subscribable.swift @@ -0,0 +1,164 @@ +// +// ListenersPOC.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +/// A protocol for types capable of initiating subscription-related actions. +public protocol SubscribeCapable { + /// Subscribes with the specified timetoken. + /// + /// - Parameter timetoken: The timetoken to use for subscribing. If `nil`, the `0` timetoken is used. + func subscribe(with timetoken: Timetoken?) + + /// Unsubscribes from, stopping the subscription. + func unsubscribe() +} + +public extension SubscribeCapable { + /// Subscribes with the `0` timetoken. + /// + /// Convenience method equivalent to calling `subscribe(with:)` with `nil`. + func subscribe() { + subscribe(with: nil) + } +} + +/// A base class for entities that can be subscribed to and unsubscribed from using the PubNub service. +public class Subscribable: Subscriber { + /// An entity name + public let name: String + /// The PubNub client associated with this channel. + weak var receiver: SubscribeReceiver? + /// An underlying subscription type + let subscriptionType: SubscribableType + + init(name: String, subscriptionType: SubscribableType, receiver: SubscribeReceiver) { + self.name = name + self.subscriptionType = subscriptionType + self.receiver = receiver + } +} + +enum SubscribableType { + case channel + case channelGroup +} + +/// Provides the ability to return a `Subscription` object for the underlying entity +/// +/// Subsequent calls to `.subscribe()` on the obtained `Subscription` instance will initiate the subscription. +/// Similarly, a subsequent call to `.unsubscribe()` will attempt to deregister the underlying entity from +/// the Subscribe loop if there are no active subscriptions matching the given entity. +public protocol Subscriber { + /// Creates a `Subscription` object with the specified queue and options. + /// + /// - Parameters: + /// - queue: The dispatch queue on which the subscription events should be handled. + /// - options: Additional options for configuring the subscription. + func subscription(queue: DispatchQueue, options: SubscriptionOptions) -> Subscription +} + +/// Provides a default subscription object for the conforming entity like `ChannelRepresentation`, +/// `ChannelGroupRepresentation`,`ChannelMetadataRepresentation`, and `UserMetadataRepresentation` +public extension Subscriber where Self: Subscribable { + /// Creates a `Subscription` object with default options for the conforming entity. + /// + /// - Parameters: + /// - queue: The dispatch queue on which the subscription events should be handled + /// - options: Additional options for configuring the subscription + func subscription( + queue: DispatchQueue = .main, + options: SubscriptionOptions = SubscriptionOptions.empty() + ) -> Subscription { + Subscription( + queue: queue, + entity: self, + options: options + ) + } +} + +/// A typealias representing an interface for PubNub subscriptions. +/// +/// This alias combines the conformance of `EventEmitter` and `SubscribeCapable`. +/// Thus, objects conforming to this type can both emit PubNub events and perform subscription-related actions. +public typealias SubscriptionInterface = EventEmitter & SubscriptionDisposable & SubscribeCapable + +/// A class representing subscription options for PubNub subscriptions. +/// +/// Use this class to define various subscription options that can be applied. +public class SubscriptionOptions { + let allOptions: [SubscriptionOptions] + + init(allOptions: [SubscriptionOptions] = []) { + self.allOptions = allOptions + } + + convenience init() { + self.init(allOptions: []) + } + + func filterCriteriaSatisfied(event: PubNubEvent) -> Bool { + allOptions.compactMap { + $0 as? FilterOption + }.reduce(into: true, { filteringResult, filter in + filteringResult = filteringResult && filter.predicate(event) + }) + } + + func hasPresenceOption() -> Bool { + !(allOptions.filter { $0 is ReceivePresenceEvents }.isEmpty) + } + + /// Provides an instance of `PubNubSubscriptionOptions` with no additional options. + public static func empty() -> SubscriptionOptions { + SubscriptionOptions(allOptions: []) + } + + /// Combines two instances of `PubNubSubscriptionOptions` using the `+` operator. + /// + /// - Parameters: + /// - lhs: The left-hand side instance. + /// - rhs: The right-hand side instance. + /// + /// - Returns: A new `SubscriptionOptions` instance combining the options from both instances. + public static func +( + lhs: SubscriptionOptions, + rhs: SubscriptionOptions + ) -> SubscriptionOptions { + var lhsOptions: [SubscriptionOptions] = lhs.allOptions + var rhsOptions: [SubscriptionOptions] = rhs.allOptions + + if lhs.allOptions.isEmpty { + lhsOptions = [lhs] + } + if rhsOptions.isEmpty { + rhsOptions = [rhs] + } + return SubscriptionOptions(allOptions: lhsOptions + rhsOptions) + } +} + +/// A class representing options for receiving presence events in subscriptions. +public class ReceivePresenceEvents: SubscriptionOptions { + public init() { + super.init(allOptions: []) + } +} + +/// A class representing a filter with a predicate for subscription options. +public class FilterOption: SubscriptionOptions { + public let predicate: ((PubNubEvent) -> Bool) + + public init(predicate: @escaping ((PubNubEvent) -> Bool)) { + self.predicate = predicate + super.init(allOptions: []) + } +} diff --git a/Sources/PubNub/Events/New/Subscription.swift b/Sources/PubNub/Events/New/Subscription.swift new file mode 100644 index 00000000..c7704ce0 --- /dev/null +++ b/Sources/PubNub/Events/New/Subscription.swift @@ -0,0 +1,199 @@ +// +// PubNubSubscription.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +/// A final class representing a PubNub subscription. +/// +/// Use this class to create and manage subscriptions for a specific `Subscribable` entity. +/// It conforms to `EventEmitter`, allowing the handling of subscription-related events. +public final class Subscription: EventEmitter, SubscriptionDisposable { + /// Initializes a `Subscription` object. + /// + /// - Parameters: + /// - queue: An underlying queue to dispatch events + /// - entity: An object that should be added to the Subscribe loop. + /// - options: Additional subscription options + public init( + queue: DispatchQueue = .main, + entity: Subscribable, + options: SubscriptionOptions = SubscriptionOptions.empty() + ) { + self.queue = queue + self.entity = entity + self.options = SubscriptionOptions.empty() + options + } + + public let queue: DispatchQueue + /// A unique identifier for `Subscription` + public let uuid: UUID = UUID() + /// An underlying entity that should be added to the Subscribe loop + public let entity: Subscribable + /// Attached options + public let options: SubscriptionOptions + /// Whether current subscription is disposed or not + public private(set) var isDisposed = false + // Stores the timetoken the user subscribed with + private(set) var timetoken: Timetoken? + + public var eventStream: ((PubNubEvent) -> Void)? + public var eventsStream: (([PubNubEvent]) -> Void)? + public var messagesStream: ((PubNubMessage) -> Void)? + public var signalsStream: ((PubNubMessage) -> Void)? + public var presenceStream: ((PubNubPresenceChange) -> Void)? + public var messageActionsStream: ((PubNubMessageActionEvent) -> Void)? + public var filesStream: ((PubNubFileEvent) -> Void)? + public var appContextStream: ((PubNubAppContextEvent) -> Void)? + + // Intercepts messages from the Subscribe loop and forwards them to the current `Subscription` + lazy var adapter = BaseSubscriptionListenerAdapter( + receiver: self, + uuid: uuid, + queue: queue + ) + + internal var receiver: SubscribeReceiver? { + entity.receiver + } + + internal var subscriptionType: SubscribableType { + entity.subscriptionType + } + + internal var subscriptionNames: [String] { + let hasPresenceOption = options.hasPresenceOption() + let name = entity.name + + switch entity { + case is ChannelRepresentation: + return hasPresenceOption ? [name, name.presenceChannelName] : [name] + case is ChannelGroupRepresentation: + return hasPresenceOption ? [name, name.presenceChannelName] : [name] + default: + return [entity.name] + } + } + + /// Creates a clone of the current instance of `Subscription`. + /// + /// Use this method to create a new instance with the same configuration as the current `Subscription`. + /// The clone is a separate instance that can be used independently. + public func clone() -> Subscription { + Subscription( + queue: queue, + entity: entity, + options: options + ) + } + + /// Disposes the current `Subscription`, ending the subscription. + /// + /// Use this method to gracefully end the subscription and release associated resources. + /// Once disposed, the subscription interface cannot be restarted. + public func dispose() { + clearCallbacks() + unsubscribe() + isDisposed = true + } + + deinit { + dispose() + } +} + +extension Subscription: SubscribeCapable { + /// Subscribes to the associated `entity` with the specified timetoken. + /// + /// - Parameter timetoken: The timetoken to use for subscribing. If `nil`, the `0` value is used. + public func subscribe(with timetoken: Timetoken?) { + guard let receiver = receiver, !isDisposed else { + return + } + let channels = subscriptionType == .channel ? [self] : [] + let channelGroups = subscriptionType == .channelGroup ? [self] : [] + + receiver.registerAdapter(adapter) + receiver.internalSubscribe(with: channels, and: channelGroups, at: timetoken) + } + + /// Unsubscribes from the associated entity, ending the PubNub subscription. + /// + /// Use this method to gracefully end the subscription and stop receiving messages for the associated entity. + /// If there are no remaining subscriptions that match the associated entity, the unsubscribe action will be performed, + /// and the entity will be deregistered from the Subscribe loop. After unsubscribing, the subscription interface + /// can be restarted if needed. + public func unsubscribe() { + guard let receiver = receiver, !isDisposed else { + return + } + let channels = subscriptionType == .channel ? [self] : [] + let groups = subscriptionType == .channelGroup ? [self] : [] + + receiver.internalUnsubscribe(from: channels, and: groups, presenceOnly: false) + } +} + +extension Subscription: Hashable { + public static func ==(lhs: Subscription, rhs: Subscription) -> Bool { + lhs.uuid == rhs.uuid + } + + public func hash(into hasher: inout Hasher) { + hasher.combine(uuid) + } +} + +// MARK: - SubscribeMessagePayloadReceiver + +extension Subscription: SubscribeMessagesReceiver { + var subscriptionTopology: [SubscribableType : [String]] { + [subscriptionType: subscriptionNames] + } + + @discardableResult func onPayloadsReceived(payloads: [SubscribeMessagePayload]) -> [PubNubEvent] { + let events = payloads.compactMap { event(from: $0) } + emit(events: events) + return events + } + + func event(from payload: SubscribeMessagePayload) -> PubNubEvent? { + let isNewerOrEqualToTimetoken = payload.publishTimetoken.timetoken >= timetoken ?? 0 + let receivedFromCurrentEntity: Bool + + if subscriptionType == .channel { + receivedFromCurrentEntity = entity.name.matches(string: payload.channel) + } else if subscriptionType == .channelGroup { + receivedFromCurrentEntity = entity.name.matches(string: payload.subscription ?? payload.channel) + } else { + receivedFromCurrentEntity = true + } + + if receivedFromCurrentEntity && isNewerOrEqualToTimetoken { + let event = payload.asPubNubEvent() + return options.filterCriteriaSatisfied(event: event) ? event : nil + } else { + return nil + } + } +} + +// MARK: - Helper String extension + +fileprivate extension String { + func matches(string: String) -> Bool { + guard hasSuffix(".*") else { + return self == string + } + let pattern = "^" + self + "$" + let predicate = NSPredicate(format: "SELF MATCHES %@", pattern) + + return predicate.evaluate(with: string) + } +} diff --git a/Sources/PubNub/Events/New/SubscriptionSet.swift b/Sources/PubNub/Events/New/SubscriptionSet.swift new file mode 100644 index 00000000..b9f6279b --- /dev/null +++ b/Sources/PubNub/Events/New/SubscriptionSet.swift @@ -0,0 +1,230 @@ +// +// PubNubSubscriptionSet.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation + +/// A final class representing a set of `Subscription`. +/// +/// Use this class to manage multiple `Subscription` concurrently. +public final class SubscriptionSet: EventEmitter, SubscriptionDisposable { + public var eventStream: ((PubNubEvent) -> Void)? + public var eventsStream: (([PubNubEvent]) -> Void)? + public var messagesStream: ((PubNubMessage) -> Void)? + public var signalsStream: ((PubNubMessage) -> Void)? + public var presenceStream: ((PubNubPresenceChange) -> Void)? + public var messageActionsStream: ((PubNubMessageActionEvent) -> Void)? + public var filesStream: ((PubNubFileEvent) -> Void)? + public var appContextStream: ((PubNubAppContextEvent) -> Void)? + + public let queue: DispatchQueue + /// Additional subscription options + public let options: SubscriptionOptions + /// A unique identifier for the current `SubscriptionSet` + public let uuid: UUID = UUID() + /// Whether current subscription is disposed or not + public private(set) var isDisposed = false + // Internally holds a collection of child subscriptions + private(set) var currentSubscriptions: Set + + // Internally intercepts messages from the Subscribe loop + // and forwards them to the current `SubscriptionSet` + lazy var adapter = BaseSubscriptionListenerAdapter( + receiver: self, + uuid: uuid, + queue: queue + ) + + /// Initializes `SubscriptionSet` object with the specified parameters. + /// + /// - Parameters: + /// - queue: The dispatch queue on which the subscription events should be handled + /// - entities: A collection of `Subscribable` entities to include in the Subscribe loop + /// - options: Additional subscription options + public init( + queue: DispatchQueue = .main, + entities: any Collection, + options: SubscriptionOptions = SubscriptionOptions.empty() + ) { + self.queue = queue + self.options = SubscriptionOptions.empty() + options + self.currentSubscriptions = Set(entities.map { + Subscription( + queue: queue, + entity: $0, + options: options + ) + }) + } + + /// Initializes `SubscriptionSet` object with the specified parameters. + /// + /// - Parameters: + /// - queue: The dispatch queue on which the subscription events should be handled + /// - subscriptions: A collection of existing `Subscription` instances to include in the Subscribe loop. + /// - options: Additional subscription options for configuring the subscription set /// + public init( + queue: DispatchQueue = .main, + subscriptions: any Collection, + options: SubscriptionOptions = SubscriptionOptions.empty() + ) { + self.queue = queue + self.options = options + self.currentSubscriptions = Set(subscriptions) + } + + /// Adds `Subscription` to the existing set of subscriptions. + /// + /// - Parameters: + /// - subscription: `Subscription` to add + public func add(subscription: Subscription) { + currentSubscriptions.insert(subscription) + } + + /// Adds a collection of `Subscription` to the existing set of subscriptions. + /// + /// - Parameters: + /// - subscriptions: List of `Subscription` to add + public func add(subscriptions: any Collection) { + subscriptions.forEach { + currentSubscriptions.insert($0) + } + } + + /// Removes `Subscription` from the existing set of subscriptions. + /// + /// - Parameters: + /// - subscription: `Subscription` to remove + public func remove(subscription: Subscription) { + currentSubscriptions.remove(subscription) + } + + /// Removes a collection of `Subscription` from the existing set of subscriptions. + /// + /// - Parameters: + /// - subscriptions: Collection of `Subscription` to remove + public func remove(subscriptions: any Collection) { + subscriptions.forEach { + currentSubscriptions.remove($0) + } + } + + /// Creates a clone of the current instance of `SubscriptionSet`. + /// + /// Use this method to create a new instance with the same configuration as the current `SubscriptionSet`. + /// The clone is a separate instance that can be used independently. + public func clone() -> SubscriptionSet { + SubscriptionSet( + queue: queue, + entities: currentSubscriptions.map { $0.entity }, + options: options + ) + } + + /// Disposes of the current instance of `SubscriptionSet`, ending all associated subscriptions. + /// + /// Use this method to gracefully end the subscription and release associated resources. + /// Once disposed, the subscription interface cannot be restarted. + public func dispose() { + clearCallbacks() + currentSubscriptions.forEach { $0.dispose() } + isDisposed = true + } + + deinit { + dispose() + } +} + +extension SubscriptionSet: SubscribeCapable { + /// Subscribes to all entities within the current `SubscriptionSet` with the specified timetoken. + /// + /// Use this method to initiate or resume subscriptions for all entities within the set. + /// If a timetoken is provided, it represents the starting point for the subscription. + /// Otherwise, the `0` timetoken is used. + /// + /// - Parameter timetoken: The timetoken to use for the subscriptions + public func subscribe(with timetoken: Timetoken?) { + guard let receiver = currentSubscriptions.first?.receiver, !isDisposed else { + return + } + receiver.registerAdapter(adapter) + currentSubscriptions.forEach { receiver.registerAdapter($0.adapter) } + + let channels = currentSubscriptions.filter { $0.subscriptionType == .channel }.allObjects + let groups = currentSubscriptions.filter { $0.subscriptionType == .channelGroup }.allObjects + + receiver.internalSubscribe( + with: channels, + and: groups, + at: timetoken + ) + } + + /// Unsubscribes from all entities within the current `SubscriptionSet`. If there are no remaining + /// subscriptions that match the associated entities, the unsubscribe action will be performed, + /// and the entities will be deregistered from the Subscribe loop. + /// + /// Use this method to gracefully end all subscriptions and stop receiving messages for all + /// associated entities. After unsubscribing, the subscription set can be restarted if needed. + public func unsubscribe() { + guard let receiver = currentSubscriptions.first?.receiver, !isDisposed else { + return + } + receiver.internalUnsubscribe( + from: currentSubscriptions.filter { $0.subscriptionType == .channel }, + and: currentSubscriptions.filter { $0.subscriptionType == .channelGroup }, + presenceOnly: false + ) + } +} + +// MARK: - SubscribeMessagePayloadReceiver + +extension SubscriptionSet: SubscribeMessagesReceiver { + var subscriptionTopology: [SubscribableType : [String]] { + var result: [SubscribableType: [String]] = [:] + result[.channel] = [] + result[.channelGroup] = [] + + return currentSubscriptions.reduce(into: result, { accumulatedRes, current in + let currentRes = current.subscriptionTopology + accumulatedRes[.channel]?.append(contentsOf: currentRes[.channel] ?? []) + accumulatedRes[.channelGroup]?.append(contentsOf: currentRes[.channelGroup] ?? []) + }) + } + + // Processes payloads according to the following rules: + // + // 1. Gets a subscription from the associated list of child subscriptions + // 2. Checks which payloads the currently iterated child subscription can map to events + // 3. Checks the events result received in the previous step against SubscriptionSet's options + // 4. Emits filtered events from SubscriptionSet + @discardableResult func onPayloadsReceived(payloads: [SubscribeMessagePayload]) -> [PubNubEvent] { + currentSubscriptions.reduce(into: [PubNubEvent]()) { accumulatedRes, childSubscription in + let events = payloads.compactMap { payload in + childSubscription.event(from: payload) + }.filter { + options.filterCriteriaSatisfied(event: $0) + } + accumulatedRes.append(contentsOf: events) + emit(events: events) + } + } +} + +extension SubscriptionSet: Hashable { + public func hash(into hasher: inout Hasher) { + hasher.combine(uuid) + } + + public static func ==(lhs: SubscriptionSet, rhs: SubscriptionSet) -> Bool { + lhs.uuid == rhs.uuid + } +} diff --git a/Sources/PubNub/Events/EventStream.swift b/Sources/PubNub/Events/Old/EventStream.swift similarity index 100% rename from Sources/PubNub/Events/EventStream.swift rename to Sources/PubNub/Events/Old/EventStream.swift diff --git a/Sources/PubNub/Events/Subscription/PubNubEntityEvent.swift b/Sources/PubNub/Events/Old/Subscription/PubNubEntityEvent.swift similarity index 100% rename from Sources/PubNub/Events/Subscription/PubNubEntityEvent.swift rename to Sources/PubNub/Events/Old/Subscription/PubNubEntityEvent.swift diff --git a/Sources/PubNub/Events/Subscription/SubscriptionStream.swift b/Sources/PubNub/Events/Old/Subscription/SubscriptionStream.swift similarity index 97% rename from Sources/PubNub/Events/Subscription/SubscriptionStream.swift rename to Sources/PubNub/Events/Old/Subscription/SubscriptionStream.swift index 11277b0b..62d805c6 100644 --- a/Sources/PubNub/Events/Subscription/SubscriptionStream.swift +++ b/Sources/PubNub/Events/Old/Subscription/SubscriptionStream.swift @@ -300,15 +300,22 @@ public final class CoreListener: BaseSubscriptionListener { /// Listener that will emit events related to PubNub subscription and presence APIs open class BaseSubscriptionListener: EventStreamReceiver, Hashable { // EventStream - public let uuid = UUID() + public let uuid: UUID public var queue: DispatchQueue /// Whether you would like to avoid receiving cancellation errors from this listener public var supressCancellationErrors: Bool = true + // Keeps a mechanism to cancel a listener var token: ListenerToken? public init(queue: DispatchQueue = .main) { self.queue = queue + self.uuid = UUID() + } + + init(queue: DispatchQueue = .main, uuid: UUID = UUID()) { + self.queue = queue + self.uuid = uuid } deinit { @@ -316,9 +323,12 @@ open class BaseSubscriptionListener: EventStreamReceiver, Hashable { } open func emit(batch _: [SubscribeMessagePayload]) {} - open func emit(subscribe _: PubNubSubscribeEvent) {} + public func hash(into hasher: inout Hasher) { + hasher.combine(uuid) + } + public static func == (lhs: BaseSubscriptionListener, rhs: BaseSubscriptionListener) -> Bool { return lhs.uuid == rhs.uuid } diff --git a/Sources/PubNub/PubNub.swift b/Sources/PubNub/PubNub.swift index 6a55a2ab..d61cf1be 100644 --- a/Sources/PubNub/PubNub.swift +++ b/Sources/PubNub/PubNub.swift @@ -461,6 +461,56 @@ public extension PubNub { } } +extension PubNub: SubscribeReceiver { + func registerAdapter(_ adapter: BaseSubscriptionListenerAdapter) { + subscription.registerAdapter(adapter) + } + + func internalSubscribe( + with channels: [Subscription], + and groups: [Subscription], + at timetoken: Timetoken? + ) { + subscription.internalSubscribe( + with: channels, + and: groups, + at: timetoken + ) + } + + func internalUnsubscribe( + from channels: [Subscription], + and groups: [Subscription], + presenceOnly: Bool + ) { + subscription.internalUnsubscribe( + from: channels, + and: groups, + presenceOnly: presenceOnly + ) + } +} + +// MARK: - EntityCreator + +extension PubNub: EntityCreator { + public func channel(_ name: String) -> ChannelRepresentation { + subscription.channel(name) + } + + public func channelGroup(_ name: String) -> ChannelGroupRepresentation { + subscription.channelGroup(name) + } + + public func userMetadata(_ name: String) -> UserMetadataRepresentation { + subscription.userMetadata(name) + } + + public func channelMetadata(_ name: String) -> ChannelMetadataRepresentation { + subscription.channelMetadata(name) + } +} + // MARK: - Presence Management public extension PubNub { @@ -1400,3 +1450,114 @@ public extension PubNub { } // swiftlint:disable:next file_length } + +// MARK: - Global EventEmitter + +/// An extension to the PubNub class, making it conform to the `EventEmitter` protocol and serving +/// as a global emitter for all entities. +/// +/// This extension enables `PubNub` instances to act as event emitters, allowing them to dispatch +/// various types of events for all registered entities in the Subscribe loop. +extension PubNub: EventEmitter { + public var queue: DispatchQueue { + subscription.queue + } + + public var uuid: UUID { + subscription.uuid + } + + public var eventStream: ((PubNubEvent) -> Void)? { + get { + subscription.eventStream + } + set { + subscription.eventStream = newValue + } + } + + public var eventsStream: (([PubNubEvent]) -> Void)? { + get { + subscription.eventsStream + } + set { + subscription.eventsStream = newValue + } + } + + public var messagesStream: ((PubNubMessage) -> Void)? { + get { + subscription.messagesStream + } + set { + subscription.messagesStream = newValue + } + } + + public var signalsStream: ((PubNubMessage) -> Void)? { + get { + subscription.signalsStream + } + set { + subscription.signalsStream = newValue + } + } + + public var presenceStream: ((PubNubPresenceChange) -> Void)? { + get { + subscription.presenceStream + } + set { + subscription.presenceStream = newValue + } + } + + public var messageActionsStream: ((PubNubMessageActionEvent) -> Void)? { + get { + subscription.messageActionsStream + } + set { + subscription.messageActionsStream = newValue + } + } + + public var filesStream: ((PubNubFileEvent) -> Void)? { + get { + subscription.filesStream + } + set { + subscription.filesStream = newValue + } + } + + public var appContextStream: ((PubNubAppContextEvent) -> Void)? { + get { + subscription.appContextStream + } + set { + subscription.appContextStream = newValue + } + } +} + +/// An extension to the `PubNub` class, making it conform to the `StatusEmitter` protocol and serving +/// as a global listener for connection changes and possible errors along the way. +extension PubNub: StatusEmitter { + public var didReceiveConnectionStatusChange: ((ConnectionStatus) -> Void)? { + get { + subscription.didReceiveConnectionStatusChange + } + set { + subscription.didReceiveConnectionStatusChange = newValue + } + } + + public var didReceiveSubscribeError: ((PubNubError) -> Void)? { + get { + subscription.didReceiveSubscribeError + } + set { + subscription.didReceiveSubscribeError = newValue + } + } +} diff --git a/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift index cfa66b5f..c7f3c20f 100644 --- a/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift @@ -16,7 +16,7 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { let presenceEngine: PresenceEngine let presenceStateContainer: PubNubPresenceStateContainer - var privateListeners: WeakSet = WeakSet([]) + var listeners: WeakSet = WeakSet([]) var configuration: SubscriptionConfiguration var previousTokenResponse: SubscribeCursor? var filterExpression: String? { @@ -73,7 +73,7 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { subscribeEngine.dependencies = EventEngineDependencies( value: Subscribe.Dependencies( configuration: configuration, - listeners: privateListeners.allObjects + listeners: listeners.allObjects ) ) } @@ -104,48 +104,93 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { sendSubscribeEvent(event: .subscriptionChanged(channels: channels, groups: groups)) } - // MARK: - Subscription Loop - func subscribe( - to channels: [String], - and groups: [String], - at cursor: SubscribeCursor?, - withPresence: Bool + to channels: [PubNubChannel], + and groups: [PubNubChannel], + at cursor: SubscribeCursor? ) { - let currentInput = subscribeEngine.state.input - let newChannels = channels.map { PubNubChannel(id: $0, withPresence: withPresence) } - let newGroups = groups.map { PubNubChannel(id: $0, withPresence: withPresence) } - let addingResult = currentInput.adding(channels: newChannels, and: newGroups) - let newInput = addingResult.newInput + let currentChannelsAndGroups = subscribeEngine.state.input + let insertionResult = currentChannelsAndGroups.adding(channels: channels, and: groups) + let newChannelsAndGroups = insertionResult.newInput - if newInput != currentInput { - if let cursor = cursor, cursor.timetoken != 0 { - sendSubscribeEvent(event: .subscriptionRestored( - channels: newInput.allSubscribedChannelNames, - groups: newInput.allSubscribedGroupNames, - cursor: cursor - )) - } else { - sendSubscribeEvent(event: .subscriptionChanged( - channels: newInput.allSubscribedChannelNames, - groups: newInput.allSubscribedGroupNames - )) - } + if let cursor = cursor, cursor.timetoken != 0 { + sendSubscribeEvent(event: .subscriptionRestored( + channels: newChannelsAndGroups.allSubscribedChannelNames, + groups: newChannelsAndGroups.allSubscribedGroupNames, + cursor: cursor + )) sendPresenceEvent(event: .joined( - channels: newInput.subscribedChannelNames, - groups: newInput.subscribedGroupNames + channels: newChannelsAndGroups.subscribedChannelNames, + groups: newChannelsAndGroups.subscribedGroupNames )) - + } else if newChannelsAndGroups != currentChannelsAndGroups { + sendSubscribeEvent(event: .subscriptionChanged( + channels: newChannelsAndGroups.allSubscribedChannelNames, + groups: newChannelsAndGroups.allSubscribedGroupNames + )) + sendPresenceEvent(event: .joined( + channels: newChannelsAndGroups.subscribedChannelNames, + groups: newChannelsAndGroups.subscribedGroupNames + )) + } else { + // No unique channels or channel groups were provided. + // There's no need to alter the Subscribe loop. + } + if !insertionResult.insertedChannels.isEmpty || !insertionResult.insertedGroups.isEmpty { notify { $0.emit(subscribe: .subscriptionChanged( .subscribed( - channels: addingResult.insertedChannels, - groups: addingResult.insertedGroups + channels: insertionResult.insertedChannels, + groups: insertionResult.insertedGroups )) ) } } } + + func unsubscribeFrom( + channels: [PubNubChannel], + presenceChannelsOnly: [PubNubChannel], + groups: [PubNubChannel], + presenceGroupsOnly: [PubNubChannel] + ) { + // Retrieve the current list of subscribed channels and channel groups + let currentChannelsAndGroups = subscribeEngine.state.input + // Provides the outcome after updating the list of channels and channel groups + let removingResult = currentChannelsAndGroups.removing( + channels: channels,presenceChannelsOnly: presenceChannelsOnly, + groups: groups, presenceGroupsOnly: presenceGroupsOnly + ) + + // Exits if there are no differences for channels or channel groups + guard removingResult.newInput != currentChannelsAndGroups else { + return + } + if configuration.maintainPresenceState { + presenceStateContainer.removeState(forChannels: removingResult.removedChannels.map { $0.id }) + } + // Dispatch local event first to guarantee the expected order of events. + // An event indicating unsubscribing from channels and channel groups + // should be emitted before an event related to disconnecting + // from the Subscribe loop, assuming you unsubscribed from all channels + // and channel groups + notify { + $0.emit(subscribe: .subscriptionChanged( + .unsubscribed( + channels: removingResult.removedChannels, + groups: removingResult.removedGroups + )) + ) + } + sendSubscribeEvent(event: .subscriptionChanged( + channels: removingResult.newInput.allSubscribedChannelNames, + groups: removingResult.newInput.allSubscribedGroupNames + )) + sendPresenceEvent(event: .left( + channels: removingResult.removedChannels.map { $0.id }, + groups: removingResult.removedGroups.map { $0.id } + )) + } func reconnect(at cursor: SubscribeCursor?) { sendSubscribeEvent(event: .reconnect(cursor: cursor)) @@ -156,43 +201,14 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { sendPresenceEvent(event: .disconnect) } - // MARK: - Unsubscribe - - func unsubscribe(from channels: [String], and groups: [String], presenceOnly: Bool) { - let unsubscribedChannels = channels.map { presenceOnly ? $0.presenceChannelName : $0 } - let unsubscribedGroups = groups.map { presenceOnly ? $0.presenceChannelName : $0 } - let currentInput = subscribeEngine.state.input - let removingRes = subscribeEngine.state.input.removing(channels: unsubscribedChannels, and: unsubscribedGroups) - let newInput = removingRes.newInput - - if newInput != currentInput { - if configuration.maintainPresenceState { - presenceStateContainer.removeState(forChannels: channels) - } - // Ensures that local event is emitted before receiving .disconnected connection status - notify { - $0.emit(subscribe: .subscriptionChanged( - .unsubscribed( - channels: removingRes.removedChannels, - groups: removingRes.removedGroups - )) - ) - } - sendSubscribeEvent(event: .subscriptionChanged( - channels: newInput.allSubscribedChannelNames, - groups: newInput.allSubscribedGroupNames - )) - sendPresenceEvent(event: .left( - channels: channels, - groups: groups - )) - } - } - func unsubscribeAll() { let currentInput = subscribeEngine.state.input - // Ensures that local event is emitted before receiving .disconnected connection status + // Dispatch local event first to guarantee the expected order of events. + // An event indicating unsubscribing from channels and channel groups + // should be emitted before an event related to disconnecting + // from the Subscribe loop, assuming you unsubscribed from all channels + // and channel groups notify { $0.emit(subscribe: .subscriptionChanged( .unsubscribed( @@ -205,30 +221,12 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { sendSubscribeEvent(event: .unsubscribeAll) sendPresenceEvent(event: .leftAll) } -} - -extension EventEngineSubscriptionSessionStrategy: EventStreamEmitter { - typealias ListenerType = BaseSubscriptionListener - - var listeners: [ListenerType] { - privateListeners.allObjects - } - - func add(_ listener: ListenerType) { - // Ensure that we cancel the previously attached token - listener.token?.cancel() - // Add new token to the listener - listener.token = ListenerToken { [weak self, weak listener] in - if let listener = listener { - self?.privateListeners.remove(listener) - self?.updateSubscribeEngineDependencies() - } - } - privateListeners.update(listener) + + func onListenerAdded(_ listener: BaseSubscriptionListener) { updateSubscribeEngineDependencies() } - - func notify(listeners closure: (ListenerType) -> Void) { - listeners.forEach { closure($0) } + + private func notify(listeners closure: (BaseSubscriptionListener) -> Void) { + listeners.allObjects.forEach { closure($0) } } } diff --git a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift index afec2440..e4af884f 100644 --- a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift @@ -18,7 +18,7 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { let responseQueue: DispatchQueue var configuration: SubscriptionConfiguration - var privateListeners: WeakSet = WeakSet([]) + var listeners: WeakSet = WeakSet([]) var filterExpression: String? var messageCache = [SubscribeMessagePayload?].init(repeating: nil, count: 100) var presenceTimer: Timer? @@ -110,36 +110,24 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { // MARK: - Subscription Loop func subscribe( - to channels: [String], - and groups: [String], - at cursor: SubscribeCursor?, - withPresence: Bool + to channels: [PubNubChannel], + and groups: [PubNubChannel], + at cursor: SubscribeCursor? ) { - if channels.isEmpty, groups.isEmpty { - return - } - - let channelObject = channels.map { PubNubChannel(id: $0, withPresence: withPresence) } - let groupObjects = groups.map { PubNubChannel(id: $0, withPresence: withPresence) } - - // Don't attempt to start subscription if there are no changes let subscribeChange = internalState.lockedWrite { state -> SubscriptionChangeEvent in - - let newChannels = channelObject.filter { state.channels.insert($0) } - let newGroups = groupObjects.filter { state.groups.insert($0) } - - return .subscribed(channels: newChannels, groups: newGroups) + .subscribed( + channels: channels.filter { state.channels.insert($0) }, + groups: groups.filter { state.groups.insert($0) } + ) } - if subscribeChange.didChange { notify { $0.emit(subscribe: .subscriptionChanged(subscribeChange)) } } - if subscribeChange.didChange || !connectionStatus.isActive { reconnect(at: cursor) } } - + /// Reconnect a disconnected subscription stream /// - parameter timetoken: The timetoken to subscribe with func reconnect(at cursor: SubscribeCursor? = nil) { @@ -173,12 +161,10 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { let (channels, groups) = internalState.lockedWrite { state -> ([String], [String]) in (state.allSubscribedChannels, state.allSubscribedGroups) } - // Don't start subscription if there no channels/groups if channels.isEmpty, groups.isEmpty { return } - // Create Endpoing let router = SubscribeRouter( .subscribe( @@ -199,7 +185,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { let currentSubscribeID = nextSubscribe.requestID request = nextSubscribe - request? .validate() .response(on: .main, decoder: SubscribeDecoder()) { [weak self] result in @@ -208,10 +193,8 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { guard let strongSelf = self else { return } - // Reset heartbeat timer self?.registerHeartbeatTimer() - // Ensure that we're connected now the response has been processed self?.connectionStatus = .connected @@ -263,15 +246,12 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { // Update Cache and notify if not a duplicate message if !strongSelf.messageCache.contains(message) { self?.messageCache.append(message) - // Remove oldest value if we're at max capacity if strongSelf.messageCache.count >= 100 { self?.messageCache.remove(at: 0) } - return true } - return false } @@ -286,7 +266,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { .errorReceived(PubNubError.event(error, router: self?.request?.router)) ) } - if error.pubNubError?.reason == .clientCancelled || error.pubNubError?.reason == .longPollingRestart || error.pubNubError?.reason == .longPollingReset { if self?.subscriptionCount == 0 { @@ -297,7 +276,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } } else if let cursor = error.pubNubError?.affected.findFirst(by: PubNubError.AffectedValue.subscribe) { self?.previousTokenResponse = cursor - // Repeat the request self?.performSubscribeLoop(at: cursor) } else { @@ -308,23 +286,27 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { } // MARK: - Unsubscribe - - func unsubscribe(from channels: [String], and groups: [String], presenceOnly: Bool) { - // Update Channel List + + func unsubscribeFrom( + channels: [PubNubChannel], + presenceChannelsOnly: [PubNubChannel], + groups: [PubNubChannel], + presenceGroupsOnly: [PubNubChannel] + ) { let subscribeChange = internalState.lockedWrite { state -> SubscriptionChangeEvent in - if presenceOnly { - let presenceChannelsRemoved = channels.compactMap { state.channels.unsubscribePresence($0) } - let presenceGroupsRemoved = groups.compactMap { state.groups.unsubscribePresence($0) } - - return .unsubscribed(channels: presenceChannelsRemoved, groups: presenceGroupsRemoved) - } else { - let removedChannels = channels.compactMap { state.channels.removeValue(forKey: $0) } - let removedGroups = groups.compactMap { state.groups.removeValue(forKey: $0) } - - return .unsubscribed(channels: removedChannels, groups: removedGroups) - } + .unsubscribed( + channels: channels.compactMap { + state.channels.removeValue(forKey: $0.id) + } + presenceChannelsOnly.compactMap { + state.channels.unsubscribePresence($0.id) + }, + groups: groups.compactMap { + state.groups.removeValue(forKey: $0.id) + } + presenceGroupsOnly.compactMap { + state.groups.unsubscribePresence($0.id) + } + ) } - if subscribeChange.didChange { notify { $0.emit(subscribe: .subscriptionChanged(subscribeChange)) @@ -396,28 +378,12 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { reconnect(at: previousTokenResponse) } } -} - -extension LegacySubscriptionSessionStrategy: EventStreamEmitter { - typealias ListenerType = BaseSubscriptionListener - - var listeners: [ListenerType] { - return privateListeners.allObjects - } - - func add(_ listener: ListenerType) { - // Ensure that we cancel the previously attached token - listener.token?.cancel() - // Add new token to the listener - listener.token = ListenerToken { [weak self, weak listener] in - if let listener = listener { - self?.privateListeners.remove(listener) - } - } - privateListeners.update(listener) + + func onListenerAdded(_ listener: BaseSubscriptionListener) { + } - - func notify(listeners closure: (ListenerType) -> Void) { - listeners.forEach { closure($0) } + + private func notify(listeners closure: (BaseSubscriptionListener) -> Void) { + listeners.allObjects.forEach { closure($0) } } } diff --git a/Sources/PubNub/Subscription/Strategy/SubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/SubscriptionSessionStrategy.swift index 566c560b..f2bb48f4 100644 --- a/Sources/PubNub/Subscription/Strategy/SubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/SubscriptionSessionStrategy.swift @@ -10,7 +10,7 @@ import Foundation -protocol SubscriptionSessionStrategy: EventStreamEmitter where ListenerType == BaseSubscriptionListener { +protocol SubscriptionSessionStrategy: AnyObject { var uuid: UUID { get } var configuration: SubscriptionConfiguration { get set } var subscribedChannels: [String] { get } @@ -19,9 +19,21 @@ protocol SubscriptionSessionStrategy: EventStreamEmitter where ListenerType == B var connectionStatus: ConnectionStatus { get } var previousTokenResponse: SubscribeCursor? { get set } var filterExpression: String? { get set } + var listeners: WeakSet { get set } + + func subscribe( + to channels: [PubNubChannel], + and groups: [PubNubChannel], + at cursor: SubscribeCursor? + ) + func unsubscribeFrom( + channels: [PubNubChannel], + presenceChannelsOnly: [PubNubChannel], + groups: [PubNubChannel], + presenceGroupsOnly: [PubNubChannel] + ) - func subscribe(to channels: [String], and groups: [String], at cursor: SubscribeCursor?, withPresence: Bool) - func unsubscribe(from channels: [String], and groups: [String], presenceOnly: Bool) + func onListenerAdded(_ listener: BaseSubscriptionListener) func reconnect(at cursor: SubscribeCursor?) func disconnect() func unsubscribeAll() diff --git a/Sources/PubNub/Subscription/SubscriptionSession.swift b/Sources/PubNub/Subscription/SubscriptionSession.swift index f1c57737..b485ca18 100644 --- a/Sources/PubNub/Subscription/SubscriptionSession.swift +++ b/Sources/PubNub/Subscription/SubscriptionSession.swift @@ -11,12 +11,15 @@ import Foundation @available(*, deprecated, message: "Subscribe and unsubscribe using methods from a PubNub object") -public class SubscriptionSession { - /// An unique identifier for subscription session +public class SubscriptionSession: EventEmitter { + /// A unique identifier for subscription session public var uuid: UUID { strategy.uuid } + /// An underlying queue to dispatch events + public let queue: DispatchQueue + /// PSV2 feature to subscribe with a custom filter expression. @available(*, deprecated, message: "Use `subscribeFilterExpression` from a PubNub object") public var filterExpression: String? { @@ -27,8 +30,18 @@ public class SubscriptionSession { } } - private let strategy: any SubscriptionSessionStrategy - + /// `EventEmitter` conformance + public var eventStream: ((PubNubEvent) -> Void)? + public var eventsStream: (([PubNubEvent]) -> Void)? + public var messagesStream: ((PubNubMessage) -> Void)? + public var signalsStream: ((PubNubMessage) -> Void)? + public var presenceStream: ((PubNubPresenceChange) -> Void)? + public var messageActionsStream: ((PubNubMessageActionEvent) -> Void)? + public var filesStream: ((PubNubFileEvent) -> Void)? + public var appContextStream: ((PubNubAppContextEvent) -> Void)? + public var didReceiveConnectionStatusChange: ((ConnectionStatus) -> Void)? + public var didReceiveSubscribeError: ((PubNubError) -> Void)? + var previousTokenResponse: SubscribeCursor? { strategy.previousTokenResponse } @@ -41,8 +54,42 @@ public class SubscriptionSession { } } - internal init(strategy: any SubscriptionSessionStrategy) { + private lazy var globalEventsListener: BaseSubscriptionListenerAdapter = { + BaseSubscriptionListenerAdapter( + receiver: self, + uuid: uuid, + queue: queue + ) + }() + + private lazy var globalStatusListener: BaseSubscriptionListener = { + // Creates legacy listener under the hood to capture status changes + let statusListener = SubscriptionListener(queue: queue) + // Detects status changes and forwards events to the current instance + // representing the Subscribe loop's status emitter + statusListener.didReceiveStatus = { [weak self] statusChange in + switch statusChange { + case .success(let newStatus): + self?.didReceiveConnectionStatusChange?(newStatus) + case .failure(let error): + self?.didReceiveSubscribeError?(error) + } + } + return statusListener + }() + + private var globalChannelSubscriptions: [String: Subscription] = [:] + private var globalGroupSubscriptions: [String: Subscription] = [:] + private let strategy: any SubscriptionSessionStrategy + + internal init( + strategy: any SubscriptionSessionStrategy, + eventsQueue queue: DispatchQueue = .main + ) { self.strategy = strategy + self.queue = queue + self.add(globalEventsListener) + self.add(globalStatusListener) } /// Names of all subscribed channels @@ -83,12 +130,33 @@ public class SubscriptionSession { at cursor: SubscribeCursor? = nil, withPresence: Bool = false ) { - strategy.subscribe( - to: channels, - and: groups, - at: cursor, - withPresence: withPresence + let channelSubscriptions = channels.map { + channel($0).subscription( + queue: queue, + options: withPresence ? ReceivePresenceEvents() : SubscriptionOptions.empty() + ) + } + let channelGroupSubscriptions = groups.map { + channelGroup($0).subscription( + queue: queue, + options: withPresence ? ReceivePresenceEvents() : SubscriptionOptions.empty() + ) + } + internalSubscribe( + with: channelSubscriptions, + and: channelGroupSubscriptions, + at: cursor?.timetoken ) + channelSubscriptions.forEach { subscription in + subscription.subscriptionNames.flatMap { $0 }.forEach { + globalChannelSubscriptions[$0] = subscription + } + } + channelGroupSubscriptions.forEach { subscription in + subscription.subscriptionNames.flatMap { $0 }.forEach { + globalGroupSubscriptions[$0] = subscription + } + } } /// Reconnect a disconnected subscription stream @@ -110,12 +178,26 @@ public class SubscriptionSession { /// - from: List of channels to unsubscribe from /// - and: List of channel groups to unsubscribe from /// - presenceOnly: If true, it only unsubscribes from presence events on the specified channels. - public func unsubscribe(from channels: [String], and groups: [String] = [], presenceOnly: Bool = false) { - strategy.unsubscribe( - from: channels, - and: groups, + public func unsubscribe( + from channels: [String], + and groups: [String] = [], + presenceOnly: Bool = false + ) { + internalUnsubscribe( + from: channels.map { Subscription(queue: queue, entity: channel($0)) }, + and: groups.map { Subscription(queue: queue, entity: channelGroup($0)) }, presenceOnly: presenceOnly ) + channels.flatMap { + presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName] + }.forEach { + globalChannelSubscriptions.removeValue(forKey: $0) + } + groups.flatMap { + presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName] + }.forEach { + globalGroupSubscriptions.removeValue(forKey: $0) + } } /// Unsubscribe from all channels and channel groups @@ -124,22 +206,228 @@ public class SubscriptionSession { } } -extension SubscriptionSession: EventStreamEmitter { - public typealias ListenerType = BaseSubscriptionListener +// MARK: - SubscribeIntentReceiver - public var listeners: [ListenerType] { - strategy.listeners +extension SubscriptionSession: SubscribeReceiver { + // Registers a subscription adapter to translate events from a legacy listener + // into the new Listeners API. + // + // The provided adapter is responsible for translating events received from a legacy listener + // into the new Listeners API, allowing seamless integration with both new and old codebases. + func registerAdapter(_ adapter: BaseSubscriptionListenerAdapter) { + add(adapter) + } + + // Maps the raw channel/channel group array to collections of PubNubChannel that should be subscribed to + // with and without Presence, respectively. + private typealias SubscribeRetrievalRes = ( + itemsWithPresenceIncluded: [PubNubChannel], + itemsWithoutPresence: [PubNubChannel] + ) + // Maps the raw channel/channel group array to collections of `PubNubChannel` that should be unsubscribed to. + private typealias UnsubscribeRetrievalRes = ( + presenceOnlyItems: [PubNubChannel], + items: [PubNubChannel] + ) + + // Composes final PubNubChannel lists the user should subscribe to + // according to provided raw input and forwards the result to the underlying Subscription strategy. + func internalSubscribe( + with channels: [Subscription], + and groups: [Subscription], + at timetoken: Timetoken? + ) { + if channels.isEmpty, groups.isEmpty { + return + } + + let extractingChannelsRes = retrieveItemsToSubscribe(from: channels) + let extractingGroupsRes = retrieveItemsToSubscribe(from: groups) + + channels.forEach { channelSubscription in + registerAdapter(channelSubscription.adapter) + } + groups.forEach { groupSubscription in + registerAdapter(groupSubscription.adapter) + } + strategy.subscribe( + to: extractingChannelsRes.itemsWithPresenceIncluded + extractingChannelsRes.itemsWithoutPresence, + and: extractingGroupsRes.itemsWithPresenceIncluded + extractingGroupsRes.itemsWithoutPresence, + at: SubscribeCursor(timetoken: timetoken) + ) + } + + private func retrieveItemsToSubscribe(from subscriptions: [Subscription]) -> SubscribeRetrievalRes { + // Detects all Presence channels from provided String array and maps them into PubNubChannel + // containing the main channel name and the flag indicating the resulting PubNubChannel is subscribed + // with Presence. Note that Presence channels are supplementary to the main data channels. + // Therefore, subscribing to a Presence channel alone without its corresponding main channel is not supported. + let channelsWithPresenceIncluded = Set(subscriptions.flatMap { + $0.subscriptionNames + }.filter { + $0.isPresenceChannelName + }).map { + PubNubChannel(channel: $0) + } + + // Detects remaining main channel names without Presence enabled from provided input and ensuring + // there are no duplicates with the result received from the previous step + let channelsWithoutPresence = Set(subscriptions.flatMap { + $0.subscriptionNames + }.map { + $0.trimmingPresenceChannelSuffix + }).symmetricDifference(channelsWithPresenceIncluded.map { + $0.id + }).map { + PubNubChannel(id: $0, withPresence: false) + } + + return SubscribeRetrievalRes( + itemsWithPresenceIncluded: channelsWithPresenceIncluded, + itemsWithoutPresence: channelsWithoutPresence + ) + } + + func internalUnsubscribe( + from channels: [Subscription], + and channelGroups: [Subscription], + presenceOnly: Bool + ) { + let extractingChannelsRes = extractItemsToUnsubscribe( + from: channels, + type: .channel, + presenceItemsOnly: presenceOnly + ) + let extractingGroupsRes = extractItemsToUnsubscribe( + from: channelGroups, + type: .channelGroup, + presenceItemsOnly: presenceOnly + ) + channels.forEach { channelSubscription in + remove(channelSubscription.adapter) + } + channelGroups.forEach { channelGroupSubscription in + remove(channelGroupSubscription.adapter) + } + strategy.unsubscribeFrom( + channels: extractingChannelsRes.items, + presenceChannelsOnly: extractingChannelsRes.presenceOnlyItems, + groups: extractingGroupsRes.items, + presenceGroupsOnly: extractingGroupsRes.presenceOnlyItems + ) + } + + private func subscriptionCount(for name: String, type: SubscribableType) -> Int { + subscriptionTopology[type]?.filter { $0 == name }.count ?? 0 } - public func add(_ listener: ListenerType) { - strategy.add(listener) + // Creates the final list of Presence channels/channel groups and main channels/channel groups + // the user should unsubscribe from according to the following rules: + // + // 1. Unsubscribes from the main channel happen if: + // * There are no references to its Presence equivalent from other subscriptions + // * There are no references to the main channel from other subscriptions + // 2. Unsubscribing from the Presence channel happens if: + // * There are no references to it from other subscriptions + private func extractItemsToUnsubscribe( + from subscriptions: [Subscription], + type: SubscribableType, + presenceItemsOnly: Bool + ) -> UnsubscribeRetrievalRes { + let presenceItems = Set(subscriptions.flatMap { + $0.subscriptionNames + }).filter { + $0.isPresenceChannelName + }.map { + PubNubChannel(channel: $0) + }.filter { + subscriptionCount(for: $0.presenceId, type: type) <= 1 + } + + let channels = presenceItemsOnly ? [] : Set(subscriptions.flatMap { + $0.subscriptionNames + }).symmetricDifference(presenceItems.map { + $0.presenceId + }).map { + PubNubChannel(id: $0, withPresence: false) + }.filter { + subscriptionCount( + for: $0.presenceId, + type: type + ) <= 1 && + subscriptionCount( + for: $0.id, + type: type + ) <= 1 + } + + return UnsubscribeRetrievalRes( + presenceOnlyItems: presenceItems, + items: channels + ) + } +} + +fileprivate extension WeakSet where Element == BaseSubscriptionListener { + func subscriptions(excluding uuid: UUID? = nil) -> [BaseSubscriptionListenerAdapter] { + compactMap { + if let listener = $0 as? BaseSubscriptionListenerAdapter { + return listener.uuid != uuid ? listener : nil + } else { + return nil + } + } + } +} + +// MARK: - EntityCreator + +extension SubscriptionSession: EntityCreator { + public func channel(_ name: String) -> ChannelRepresentation { + ChannelRepresentation(name: name, receiver: self) + } + + public func channelGroup(_ name: String) -> ChannelGroupRepresentation { + ChannelGroupRepresentation(name: name, receiver: self) + } + + public func userMetadata(_ name: String) -> UserMetadataRepresentation { + UserMetadataRepresentation(name: name, receiver: self) + } + + public func channelMetadata(_ name: String) -> ChannelMetadataRepresentation { + ChannelMetadataRepresentation(name: name, receiver: self) + } +} + +// MARK: - EventStreamEmitter + +extension SubscriptionSession: EventStreamEmitter { + public typealias ListenerType = BaseSubscriptionListener + + public var listeners: [ListenerType] { + strategy.listeners.allObjects } public func notify(listeners closure: (ListenerType) -> Void) { - strategy.notify(listeners: closure) + listeners.forEach { closure($0) } + } + + public func add(_ listener: ListenerType) { + // Ensure that we cancel the previously attached token + listener.token?.cancel() + // Add new token to the listener + listener.token = ListenerToken { [weak self, weak listener] in + if let listener = listener { + self?.strategy.listeners.remove(listener) + } + } + strategy.listeners.update(listener) } } +// MARK: - Hashable & CustomStringConvertible + extension SubscriptionSession: Hashable, CustomStringConvertible { public static func == (lhs: SubscriptionSession, rhs: SubscriptionSession) -> Bool { lhs.uuid == rhs.uuid @@ -153,3 +441,27 @@ extension SubscriptionSession: Hashable, CustomStringConvertible { uuid.uuidString } } + +// MARK: - SubscribeMessagePayloadReceiver + +extension SubscriptionSession: SubscribeMessagesReceiver { + var subscriptionTopology: [SubscribableType : [String]] { + var result: [SubscribableType: [String]] = [:] + result[.channel] = [] + result[.channelGroup] = [] + + return strategy.listeners.subscriptions( + excluding: globalEventsListener.uuid + ).reduce(into: result) { res, current in + let currentRes = current.receiver?.subscriptionTopology ?? [:] + res[.channel]?.append(contentsOf: currentRes[.channel] ?? []) + res[.channelGroup]?.append(contentsOf: currentRes[.channelGroup] ?? []) + } + } + + func onPayloadsReceived(payloads: [SubscribeMessagePayload]) -> [PubNubEvent] { + let events = payloads.map { $0.asPubNubEvent() } + emit(events: events) + return events + } +} diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift index 6927f953..9ca9e36d 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeInputTests.swift @@ -112,8 +112,13 @@ class SubscribeInputTests: XCTestCase { PubNubChannel(id: "g3") ] ) + let result = input1.removing( + channels: [PubNubChannel(id: "c1"), PubNubChannel(id: "c3")], + presenceChannelsOnly: [], + groups: [PubNubChannel(id: "g1"), PubNubChannel(id: "g3")], + presenceGroupsOnly: [] + ) - let result = input1.removing(channels: ["c1", "c3"], and: ["g1", "g3"]) let newInput = result.newInput let expAllSubscribedChannelNames = ["c2", "c2-pnpres"] let expSubscribedChannelNames = ["c2"] @@ -150,14 +155,23 @@ class SubscribeInputTests: XCTestCase { PubNubChannel(id: "g3", withPresence: true) ] ) - + let presenceChannelsToRemove = [ + PubNubChannel(id: "c1", withPresence: true), + PubNubChannel(id: "c3", withPresence: true) + ] + let presenceGroupsToRemove = [ + PubNubChannel(id: "g1"), + PubNubChannel(id: "g3") + ] let result = input1.removing( - channels: ["c1".presenceChannelName, "c2".presenceChannelName, "c3".presenceChannelName], - and: ["g1".presenceChannelName, "g3".presenceChannelName] + channels: [], + presenceChannelsOnly: presenceChannelsToRemove, + groups: [], + presenceGroupsOnly: presenceGroupsToRemove ) let newInput = result.newInput - let expAllSubscribedChannelNames = ["c1", "c2", "c3"] + let expAllSubscribedChannelNames = ["c1", "c2", "c2-pnpres", "c3"] let expSubscribedChannelNames = ["c1", "c2", "c3"] let expAllSubscribedGroupNames = ["g1", "g2", "g2-pnpres", "g3"] let expSubscribedGroupNames = ["g1", "g2", "g3"] diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift index 2483afb6..d3735da7 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift @@ -180,13 +180,12 @@ class SubscribeTransitionTests: XCTestCase { } func test_SubscriptionChangedForHandshakeReconnectingState() throws { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 1, - reason: reason + reason: PubNubError(.unknown) ), event: .subscriptionChanged( channels: ["c1", "c1", "c1-pnpres", "c2"], @@ -612,13 +611,12 @@ class SubscribeTransitionTests: XCTestCase { } func test_SubscriptionRestoredForHandshakeReconnectingState() { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 1, - reason: reason + reason: PubNubError(.unknown) ), event: .subscriptionRestored( channels: ["c1", "c1-pnpres", "c2", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], @@ -794,7 +792,6 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Handshake Reconnect Success func test_HandshakeReconnectSuccessForReconnectingState() { - let reason = PubNubError(.unknown) let cursor = SubscribeCursor( timetoken: 200400600, region: 45 @@ -804,7 +801,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 1, - reason: reason + reason: PubNubError(.unknown) ), event: .handshakeReconnectSuccess(cursor: cursor) ) @@ -833,13 +830,12 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Handshake Reconnect Failure func test_HandshakeReconnectFailedForReconnectingState() { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 0, - reason: reason + reason: PubNubError(.unknown) ), event: .handshakeReconnectFailure(error: PubNubError(.unknown)) ) @@ -854,7 +850,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 1, - reason: reason + reason: PubNubError(.unknown) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -864,13 +860,12 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Handshake Give Up func test_HandshakeGiveUpForReconnectingState() { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.HandshakeReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 0, region: 0), retryAttempt: 3, - reason: reason + reason: PubNubError(.unknown) ), event: .handshakeReconnectGiveUp(error: PubNubError(.unknown)) ) @@ -895,12 +890,11 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Receive Give Up func test_ReceiveGiveUpForReconnectingState() { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.ReceiveReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 18001000, region: 123), retryAttempt: 3, - reason: reason + reason: PubNubError(.unknown) ), event: .receiveReconnectGiveUp(error: PubNubError(.unknown)) ) @@ -959,7 +953,6 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Receive Failed func test_ReceiveFailedForReceivingState() { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.ReceivingState( input: input, @@ -981,7 +974,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 100500900, region: 11), retryAttempt: 0, - reason: reason + reason: PubNubError(.unknown) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -989,13 +982,12 @@ class SubscribeTransitionTests: XCTestCase { } func test_ReceiveReconnectFailedForReconnectingState() { - let reason = PubNubError(.unknown) let results = transition.transition( from: Subscribe.ReceiveReconnectingState( input: input, cursor: SubscribeCursor(timetoken: 100500900, region: 11), retryAttempt: 1, - reason: reason + reason: PubNubError(.unknown) ), event: .receiveReconnectFailure(error: PubNubError(.unknown)) ) @@ -1013,7 +1005,7 @@ class SubscribeTransitionTests: XCTestCase { input: input, cursor: SubscribeCursor(timetoken: 100500900, region: 11), retryAttempt: 2, - reason: reason + reason: PubNubError(.unknown) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) diff --git a/Tests/PubNubTests/Events/New/SubscriptionSetTests.swift b/Tests/PubNubTests/Events/New/SubscriptionSetTests.swift new file mode 100644 index 00000000..7001932d --- /dev/null +++ b/Tests/PubNubTests/Events/New/SubscriptionSetTests.swift @@ -0,0 +1,128 @@ +// +// SubscriptionSetTests.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import XCTest +@testable import PubNub + +class SubscriptionSetTests: XCTestCase { + private let pubnub = PubNub( + configuration: PubNubConfiguration( + publishKey: "pubKey", + subscribeKey: "subKey", + userId: "userId" + ) + ) + + func testSubscriptionSet_VariousPayloads() { + let messagesExpectation = XCTestExpectation(description: "Message") + messagesExpectation.assertForOverFulfill = true + messagesExpectation.expectedFulfillmentCount = 2 + + let signalExpectation = XCTestExpectation(description: "Signal") + signalExpectation.assertForOverFulfill = true + signalExpectation.expectedFulfillmentCount = 2 + + let messageAction = XCTestExpectation(description: "Message Action") + messageAction.assertForOverFulfill = true + messageAction.expectedFulfillmentCount = 2 + + let presenceChangeExpectation = XCTestExpectation(description: "Presence") + presenceChangeExpectation.assertForOverFulfill = true + presenceChangeExpectation.expectedFulfillmentCount = 2 + + let appContextExpectation = XCTestExpectation(description: "App Context") + appContextExpectation.assertForOverFulfill = true + appContextExpectation.expectedFulfillmentCount = 2 + + let fileExpectation = XCTestExpectation(description: "File") + fileExpectation.assertForOverFulfill = true + fileExpectation.expectedFulfillmentCount = 2 + + let allEventsExpectation = XCTestExpectation(description: "All Events") + allEventsExpectation.assertForOverFulfill = true + allEventsExpectation.expectedFulfillmentCount = 2 + + let singleEventExpectation = XCTestExpectation(description: "Single Event") + singleEventExpectation.expectedFulfillmentCount = 12 + singleEventExpectation.assertForOverFulfill = true + + let subscription = pubnub.subscription(entities: [ + pubnub.channel("c1"), + pubnub.channel("c2") + ]) + + subscription.eventsStream = { _ in + allEventsExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.messagesStream = { _ in + messagesExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.signalsStream = { _ in + signalExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.messageActionsStream = { _ in + messageAction.fulfill() + singleEventExpectation.fulfill() + } + subscription.presenceStream = { _ in + presenceChangeExpectation.fulfill() + } + subscription.appContextStream = { _ in + appContextExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.filesStream = { _ in + fileExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.onPayloadsReceived(payloads: [ + mockMessagePayload(channel: "c1"), mockMessagePayload(channel: "c1"), + mockSignalPayload(channel: "c1"), mockSignalPayload(channel: "c2"), + mockPresenceChangePayload(channel: "c1"), mockPresenceChangePayload(channel: "c2"), + mockAppContextPayload(channel: "c1"), mockAppContextPayload(channel: "c2"), + mockFilePayload(channel: "c1"), mockFilePayload(channel: "c2"), + mockMessageActionPayload(channel: "c1"), mockMessageActionPayload(channel: "c2") + ]) + + let allExpectations = [ + messagesExpectation, signalExpectation, presenceChangeExpectation, + messageAction, fileExpectation, appContextExpectation, + allEventsExpectation, singleEventExpectation + ] + + wait(for: allExpectations, timeout: 1.0) + } + + func testSubscriptionSetTopology() { + let subscriptionSet = pubnub.subscription( + entities: [ + pubnub.channel("c1"), + pubnub.channel("c2"), + pubnub.channelGroup("g1"), + ], options: ReceivePresenceEvents() + ) + let expectedTopology: [SubscribableType: [String]] = [ + .channel : ["c1", "c1-pnpres", "c2", "c2-pnpres"], + .channelGroup: ["g1", "g1-pnpres"] + ] + + XCTAssertEqual( + subscriptionSet.subscriptionTopology[.channel]!.sorted(by: <), + expectedTopology[.channel]!.sorted(by: <) + ) + XCTAssertEqual( + subscriptionSet.subscriptionTopology[.channelGroup]!.sorted(by: <), + expectedTopology[.channelGroup]!.sorted(by: <) + ) + } +} diff --git a/Tests/PubNubTests/Events/New/SubscriptionTests.swift b/Tests/PubNubTests/Events/New/SubscriptionTests.swift new file mode 100644 index 00000000..0ef559a3 --- /dev/null +++ b/Tests/PubNubTests/Events/New/SubscriptionTests.swift @@ -0,0 +1,239 @@ +// +// SubscriptionTests.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import XCTest +@testable import PubNub + +class SubscriptionTests: XCTestCase { + private let pubnub = PubNub( + configuration: PubNubConfiguration( + publishKey: "pubKey", + subscribeKey: "subKey", + userId: "userId" + ) + ) + + func testSubscription_VariousPayloads() { + let messagesExpectation = XCTestExpectation(description: "Message") + messagesExpectation.assertForOverFulfill = true + messagesExpectation.expectedFulfillmentCount = 1 + + let signalExpectation = XCTestExpectation(description: "Signal") + signalExpectation.assertForOverFulfill = true + signalExpectation.expectedFulfillmentCount = 1 + + let messageAction = XCTestExpectation(description: "Message Action") + messageAction.assertForOverFulfill = true + messageAction.expectedFulfillmentCount = 1 + + let presenceChangeExpectation = XCTestExpectation(description: "Presence") + presenceChangeExpectation.assertForOverFulfill = true + presenceChangeExpectation.expectedFulfillmentCount = 1 + + let appContextExpectation = XCTestExpectation(description: "App Context") + appContextExpectation.assertForOverFulfill = true + appContextExpectation.expectedFulfillmentCount = 1 + + let fileExpectation = XCTestExpectation(description: "File") + fileExpectation.assertForOverFulfill = true + fileExpectation.expectedFulfillmentCount = 1 + + let allEventsExpectation = XCTestExpectation(description: "All Events") + allEventsExpectation.assertForOverFulfill = true + allEventsExpectation.expectedFulfillmentCount = 1 + + let singleEventExpectation = XCTestExpectation(description: "Single Event") + singleEventExpectation.expectedFulfillmentCount = 6 + singleEventExpectation.assertForOverFulfill = true + + let channel = pubnub.channel("test-channel") + let subscription = channel.subscription() + + subscription.eventsStream = { _ in + allEventsExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.messagesStream = { _ in + messagesExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.signalsStream = { _ in + signalExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.messageActionsStream = { _ in + messageAction.fulfill() + singleEventExpectation.fulfill() + } + subscription.presenceStream = { _ in + presenceChangeExpectation.fulfill() + } + subscription.appContextStream = { _ in + appContextExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.filesStream = { _ in + fileExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.onPayloadsReceived(payloads: [ + mockMessagePayload(channel: channel.name), mockSignalPayload(channel: channel.name), + mockPresenceChangePayload(channel: channel.name), mockAppContextPayload(channel: channel.name), + mockFilePayload(channel: channel.name), mockMessageActionPayload(channel: channel.name) + ]) + + let allExpectations = [ + messagesExpectation, signalExpectation, presenceChangeExpectation, + messageAction, fileExpectation, appContextExpectation, + allEventsExpectation, singleEventExpectation + ] + + wait(for: allExpectations, timeout: 1.0) + } + + func testSubscription_PayloadsFromDifferentChannel() { + let messagesExpectation = XCTestExpectation(description: "Message") + messagesExpectation.isInverted = true + messagesExpectation.assertForOverFulfill = true + + let signalExpectation = XCTestExpectation(description: "Signal") + signalExpectation.isInverted = true + signalExpectation.assertForOverFulfill = true + + let messageAction = XCTestExpectation(description: "Message Action") + messageAction.isInverted = true + messageAction.assertForOverFulfill = true + + let presenceChangeExpectation = XCTestExpectation(description: "Presence") + presenceChangeExpectation.isInverted = true + presenceChangeExpectation.assertForOverFulfill = true + presenceChangeExpectation.expectedFulfillmentCount = 1 + + let appContextExpectation = XCTestExpectation(description: "App Context") + appContextExpectation.isInverted = true + appContextExpectation.assertForOverFulfill = true + + let fileExpectation = XCTestExpectation(description: "File") + fileExpectation.isInverted = true + fileExpectation.assertForOverFulfill = true + + let allEventsExpectation = XCTestExpectation(description: "All Events") + allEventsExpectation.isInverted = true + allEventsExpectation.assertForOverFulfill = true + + let singleEventExpectation = XCTestExpectation(description: "Single Event") + singleEventExpectation.isInverted = true + singleEventExpectation.expectedFulfillmentCount = 6 + + let channel = pubnub.channel("channel") + let subscription = channel.subscription() + + subscription.eventsStream = { _ in + allEventsExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.messagesStream = { _ in + messagesExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.signalsStream = { _ in + signalExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.messageActionsStream = { _ in + messageAction.fulfill() + singleEventExpectation.fulfill() + } + subscription.presenceStream = { _ in + presenceChangeExpectation.fulfill() + } + subscription.appContextStream = { _ in + appContextExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.filesStream = { _ in + fileExpectation.fulfill() + singleEventExpectation.fulfill() + } + subscription.onPayloadsReceived(payloads: [ + mockMessagePayload(channel: "diff"), mockSignalPayload(channel: "diff"), + mockPresenceChangePayload(channel: "diff"), mockAppContextPayload(channel: "diff"), + mockFilePayload(channel: "diff"), mockMessageActionPayload(channel: "diff") + ]) + + let allExpectations = [ + messagesExpectation, signalExpectation, presenceChangeExpectation, + messageAction, fileExpectation, appContextExpectation, + allEventsExpectation, singleEventExpectation + ] + + wait(for: allExpectations, timeout: 0.5) + } + + func testSubscription_WildcardSubscription() { + let expectation = XCTestExpectation(description: "Message") + expectation.assertForOverFulfill = true + expectation.expectedFulfillmentCount = 1 + + let channel = pubnub.channel("channel.item.*") + let subscription = channel.subscription() + + subscription.messagesStream = { message in + expectation.fulfill() + } + subscription.onPayloadsReceived( + payloads: [mockMessagePayload(channel: "channel.item.x")] + ) + wait(for: [expectation], timeout: 0.5) + } + + func testSubscription_WithFilterOption() { + let expectation = XCTestExpectation(description: "Message") + expectation.assertForOverFulfill = true + expectation.expectedFulfillmentCount = 1 + + let channel = pubnub.channel("channel") + let subscription = channel.subscription(options: FilterOption(predicate: { event in + guard case let .messageReceived(message) = event else { + return false + } + if message.payload.stringOptional == "Hey!" { + expectation.fulfill(); return true + } else { + return false + } + })) + + subscription.onPayloadsReceived(payloads: [ + mockMessagePayload(channel: channel.name, message: "This is a message"), + mockMessagePayload(channel: channel.name, message: "Hey!") + ]) + + wait(for: [expectation], timeout: 1.0) + } + + func testSubscription_ReceivePresenceEvents() { + let channel = pubnub.channel("c") + let subscription = channel.subscription(options: ReceivePresenceEvents()) + + XCTAssertEqual(subscription.subscriptionNames, ["c", "c-pnpres"]) + XCTAssertEqual(subscription.subscriptionType, .channel) + XCTAssertEqual(subscription.subscriptionTopology, [.channel: ["c", "c-pnpres"]]) + } + + func testSubscription_ReceivePresenceEventsForChannelGroup() { + let channel = pubnub.channelGroup("g") + let subscription = channel.subscription(options: ReceivePresenceEvents()) + + XCTAssertEqual(subscription.subscriptionNames, ["g", "g-pnpres"]) + XCTAssertEqual(subscription.subscriptionType, .channelGroup) + XCTAssertEqual(subscription.subscriptionTopology, [.channelGroup: ["g", "g-pnpres"]]) + } +} diff --git a/Tests/PubNubTests/Events/EventStreamTests.swift b/Tests/PubNubTests/Events/Old/EventStreamTests.swift similarity index 100% rename from Tests/PubNubTests/Events/EventStreamTests.swift rename to Tests/PubNubTests/Events/Old/EventStreamTests.swift diff --git a/Tests/PubNubTests/Events/Old/SessionStreamTests.swift b/Tests/PubNubTests/Events/Old/SessionStreamTests.swift new file mode 100644 index 00000000..138913f8 --- /dev/null +++ b/Tests/PubNubTests/Events/Old/SessionStreamTests.swift @@ -0,0 +1,102 @@ +// +// SessionStreamTests.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +@testable import PubNub +import XCTest + +class SessionStreamTests: XCTestCase { + var pubnub: PubNub! + let config = PubNubConfiguration(publishKey: "FakeTestString", subscribeKey: "FakeTestString", userId: UUID().uuidString) + + // swiftlint:disable:next function_body_length + func testSessionStream_Closure() { + var expectations = [XCTestExpectation]() + + let closureStream = SessionListener(queue: DispatchQueue(label: "Session Listener", + qos: .userInitiated, + attributes: .concurrent)) + let multiplexStream = MultiplexSessionStream([closureStream]) + + guard let sessions = try? MockURLSession.mockSession(for: ["time_success"], + with: multiplexStream) + else { + return XCTFail("Could not create mock url session") + } + + let sessionMultiplex = sessions.session?.sessionStream as? MultiplexSessionStream + let sessionListener = sessionMultiplex?.streams.first as? SessionListener + + XCTAssertEqual(sessionMultiplex, multiplexStream) + XCTAssertEqual(sessionListener, closureStream) + + let sessionExpector = SessionExpector(session: closureStream) + + sessionExpector.expectDidCreateURLRequest { request, urlRequest in + XCTAssertEqual(request.urlRequest, urlRequest) + } + + sessionExpector.expectDidCreateTask { request, task in + let mockTask = sessions.mockSession.tasks.first + XCTAssertEqual(request.urlRequest, mockTask?.originalRequest) + XCTAssertEqual(task.taskIdentifier, mockTask?.taskIdentifier) + } + + sessionExpector.expectDidResumeTask { request, task in + let mockTask = sessions.mockSession.tasks.first + XCTAssertEqual(request.urlRequest, mockTask?.originalRequest) + XCTAssertEqual(task.taskIdentifier, mockTask?.taskIdentifier) + } + + sessionExpector.expectDidCompleteTask { request, task in + let mockTask = sessions.mockSession.tasks.first + XCTAssertEqual(request.urlRequest, mockTask?.originalRequest) + XCTAssertEqual(task.taskIdentifier, mockTask?.taskIdentifier) + } + + sessionExpector.expectDidResumeRequest { request in + XCTAssertEqual(request.urlRequest, sessions.mockSession.tasks.first?.originalRequest) + } + + sessionExpector.expectDidFinishRequest { request in + XCTAssertEqual(request.urlRequest, sessions.mockSession.tasks.first?.originalRequest) + } + + sessionExpector.expectDidReceiveURLSessionData { urlSession, task, data in + let mockTask = sessions.mockSession.tasks.first as? MockURLSessionDataTask + XCTAssertEqual(urlSession.sessionDescription, sessions.mockSession.sessionDescription) + XCTAssertEqual(task.taskIdentifier, mockTask?.taskIdentifier) + XCTAssertEqual(data, mockTask?.mockData) + } + + sessionExpector.expectDidCompleteURLSessionTask { urlSession, task, error in + XCTAssertEqual(urlSession.sessionDescription, sessions.mockSession.sessionDescription) + XCTAssertEqual(task.taskIdentifier, sessions.mockSession.tasks.first?.taskIdentifier) + XCTAssertNil(error) + } + + let totalExpectation = expectation(description: "Time Response Received") + pubnub = PubNub(configuration: config, session: sessions.session) + pubnub.time { result in + switch result { + case let .success(timetoken): + XCTAssertEqual(timetoken, 15_643_405_135_132_358) + case let .failure(error): + XCTFail("Time request failed with error: \(error.localizedDescription)") + } + totalExpectation.fulfill() + } + expectations.append(totalExpectation) + + XCTAssertEqual(sessionExpector.expectations.count, 8) + expectations.append(contentsOf: sessionExpector.expectations) + + wait(for: expectations, timeout: 1.0) + } +} diff --git a/Tests/PubNubTests/Events/SubscriptionStreamTests.swift b/Tests/PubNubTests/Events/Old/SubscriptionStreamTests.swift similarity index 100% rename from Tests/PubNubTests/Events/SubscriptionStreamTests.swift rename to Tests/PubNubTests/Events/Old/SubscriptionStreamTests.swift diff --git a/Tests/PubNubTests/Events/SubscribeMessagesGeneratorTests.swift b/Tests/PubNubTests/Events/SubscribeMessagesGeneratorTests.swift new file mode 100644 index 00000000..1936d84f --- /dev/null +++ b/Tests/PubNubTests/Events/SubscribeMessagesGeneratorTests.swift @@ -0,0 +1,136 @@ +// +// SubscribeMessagesGeneratorTests.swift +// +// Copyright (c) PubNub Inc. +// All rights reserved. +// +// This source code is licensed under the license found in the +// LICENSE file in the root directory of this source tree. +// + +import Foundation +@testable import PubNub + +func mockMessagePayload( + channel: String = "channel", + message: String = "Hello, this is a message" +) -> SubscribeMessagePayload { + generateMessage( + with: .message, + channel: channel, + payload: AnyJSON(message) + ) +} + +func mockSignalPayload( + channel: String = "channel" +) -> SubscribeMessagePayload { + generateMessage( + with: .signal, + channel: channel, + payload: "Hello, this is a signal" + ) +} + +func mockAppContextPayload( + channel: String = "channel" +) -> SubscribeMessagePayload { + generateMessage( + with: .object, + channel: channel, + payload: AnyJSON( + SubscribeObjectMetadataPayload( + source: "123", + version: "456", + event: .delete, + type: .uuid, + subscribeEvent: .uuidMetadataRemoved(metadataId: "12345") + ) + ) + ) +} + +func mockMessageActionPayload( + channel: String = "channel" +) -> SubscribeMessagePayload { + generateMessage( + with: .messageAction, + channel: channel, + payload: AnyJSON( + [ + "event": "added", + "source": "actions", + "version": "1.0", + "data": [ + "messageTimetoken": "16844114408637596", + "type": "receipt", + "actionTimetoken": "16844114409339370", + "value": "read" + ] + ] as [String: Any] + ) + ) +} + +func mockFilePayload( + channel: String = "channel" +) -> SubscribeMessagePayload { + generateMessage( + with: .file, + channel: channel, + payload: AnyJSON(FilePublishPayload( + channel: channel, + fileId: "", + filename: "", + size: 54556, + contentType: "image/jpeg", + createdDate: nil, + additionalDetails: nil + )) + ) +} + +func mockPresenceChangePayload( + channel: String = "channel" +) -> SubscribeMessagePayload { + generateMessage( + with: .presence, + channel: channel, + payload: AnyJSON( + SubscribePresencePayload( + actionEvent: .join, + occupancy: 15, + uuid: nil, + timestamp: 123123, + refreshHereNow: false, + state: nil, + join: ["dsadf", "fdsa"], + leave: [], + timeout: [] + ) + ) + ) +} + +func generateMessage( + with type: SubscribeMessagePayload.Action, + subscription: String? = nil, + channel: String = "test-channel", + publishTimetoken: SubscribeCursor = SubscribeCursor(timetoken: 122412, region: 1), + payload: AnyJSON +) -> SubscribeMessagePayload { + SubscribeMessagePayload( + shard: "shard", + subscription: subscription, + channel: channel, + messageType: type, + payload: payload, + flags: 123, + publisher: "publisher", + subscribeKey: "FakeKey", + originTimetoken: nil, + publishTimetoken: publishTimetoken, + meta: nil, + error: nil + ) +} diff --git a/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift b/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift index bdb37e6c..64ebe6be 100644 --- a/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift +++ b/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift @@ -14,7 +14,6 @@ import XCTest class SubscriptionIntegrationTests: XCTestCase { let testsBundle = Bundle(for: SubscriptionIntegrationTests.self) let testChannel = "SwiftSubscriptionITestsChannel" - let configuration = PubNubConfiguration(publishKey: "", subscribeKey: "", userId: UUID().uuidString) func testSubscribeError() { let configuration = PubNubConfiguration( @@ -60,6 +59,7 @@ class SubscriptionIntegrationTests: XCTestCase { pubnub.add(listener) pubnub.subscribe(to: [testChannel]) + defer { pubnub.disconnect() } wait(for: [subscribeExpect, connectingExpect, disconnectedExpect], timeout: 10.0) } } @@ -95,7 +95,7 @@ class SubscriptionIntegrationTests: XCTestCase { var connectedCount = 0 let listener = SubscriptionListener() - listener.didReceiveSubscription = { [unowned self] event in + listener.didReceiveSubscription = { [unowned self, unowned pubnub] event in switch event { case let .subscriptionChanged(status): switch status { @@ -139,8 +139,179 @@ class SubscriptionIntegrationTests: XCTestCase { pubnub.add(listener) pubnub.subscribe(to: [testChannel]) - wait(for: [subscribeExpect, unsubscribeExpect, publishExpect, connectedExpect, disconnectedExpect], timeout: 333.0) + defer { pubnub.disconnect() } + wait(for: [subscribeExpect, unsubscribeExpect, publishExpect, connectedExpect, disconnectedExpect], timeout: 30.0) } } } + + func test_MixedSubscriptions() { + let configurationFromBundle = PubNubConfiguration( + from: testsBundle + ) + let configWithEventEngineEnabled = PubNubConfiguration( + publishKey: configurationFromBundle.publishKey, + subscribeKey: configurationFromBundle.subscribeKey, + userId: configurationFromBundle.userId, + enableEventEngine: true + ) + + for config in [configurationFromBundle, configWithEventEngineEnabled] { + XCTContext.runActivity(named: "Testing with enableEventEngine=\(config.enableEventEngine)") { _ in + let subscribedEventExpect = XCTestExpectation(description: "SubscribedEvent") + subscribedEventExpect.assertForOverFulfill = true + subscribedEventExpect.expectedFulfillmentCount = 1 + + let responseHeaderExpect = XCTestExpectation(description: "ResponseReceivedEvent") + responseHeaderExpect.assertForOverFulfill = true + responseHeaderExpect.expectedFulfillmentCount = 1 + + let usubscribeEventExpect = XCTestExpectation(description: "UnsubscribedEvent") + usubscribeEventExpect.assertForOverFulfill = true + usubscribeEventExpect.expectedFulfillmentCount = 1 + + let pubnub = PubNub(configuration: config) + let listener = SubscriptionListener() + var firstSubscription: Subscription? = pubnub.channel(testChannel).subscription() + var secondSubscription: Subscription? = pubnub.channel(testChannel).subscription() + var subscriptionSet: SubscriptionSet? = pubnub.subscription(entities: [pubnub.channel(testChannel)]) + + listener.didReceiveSubscription = { [unowned self, unowned pubnub] event in + switch event { + case let .subscriptionChanged(status): + switch status { + case let .subscribed(channels, _): + XCTAssertTrue(channels.contains(where: { $0.id == self.testChannel })) + XCTAssertTrue(pubnub.subscribedChannels.contains(self.testChannel)) + subscribedEventExpect.fulfill() + case let .responseHeader(channels, _, _, _): + XCTAssertTrue(channels.contains(where: { $0.id == self.testChannel })) + responseHeaderExpect.fulfill() + case let .unsubscribed(channels, _): + XCTAssertTrue(channels.contains(where: { $0.id == self.testChannel })) + XCTAssertFalse(pubnub.subscribedChannels.contains(self.testChannel)) + usubscribeEventExpect.fulfill() + } + case let .connectionStatusChanged(status): + switch status { + case .connected: + firstSubscription = nil + secondSubscription = nil + pubnub.unsubscribe(from: [self.testChannel]) + subscriptionSet?.unsubscribe() + subscriptionSet = nil + default: + break + } + case let .subscribeError(error): + XCTFail("An error was returned: \(error)") + default: + break + } + } + + pubnub.add(listener) + pubnub.subscribe(to: [testChannel]) + firstSubscription?.subscribe() + secondSubscription?.subscribe() + subscriptionSet?.subscribe() + + defer { pubnub.disconnect() } + wait(for: [subscribedEventExpect, responseHeaderExpect, usubscribeEventExpect], timeout: 30.0) + } + } + } + + func test_GlobalSubscription() { + let configurationFromBundle = PubNubConfiguration( + from: testsBundle + ) + let configWithEventEngineEnabled = PubNubConfiguration( + publishKey: configurationFromBundle.publishKey, + subscribeKey: configurationFromBundle.subscribeKey, + userId: configurationFromBundle.userId, + enableEventEngine: true + ) + + for config in [configurationFromBundle, configWithEventEngineEnabled] { + XCTContext.runActivity(named: "Testing with enableEventEngine=\(config.enableEventEngine)") { _ in + let messageExpect = XCTestExpectation(description: "Message") + messageExpect.assertForOverFulfill = true + messageExpect.expectedFulfillmentCount = 1 + + let statusExpect = XCTestExpectation(description: "StatusExpect") + statusExpect.assertForOverFulfill = true + statusExpect.expectedFulfillmentCount = 3 + + let pubnub = PubNub(configuration: config) + var statusCounter = 0 + + pubnub.messagesStream = { [unowned pubnub] message in + XCTAssertTrue(message.payload.stringOptional == "This is a message") + messageExpect.fulfill() + pubnub.unsubscribe(from: [self.testChannel]) + } + pubnub.didReceiveConnectionStatusChange = { [unowned pubnub] change in + if statusCounter == 0 { + XCTAssertTrue(change == .connecting) + } else if statusCounter == 1 { + XCTAssertTrue(change == .connected) + pubnub.publish(channel: self.testChannel, message: "This is a message", completion: nil) + } else if statusCounter == 2 { + XCTAssertTrue(change == .disconnected) + } else { + XCTFail("Unexpected condition") + } + statusCounter += 1 + statusExpect.fulfill() + } + pubnub.subscribe(to: [testChannel]) + + defer { pubnub.disconnect() } + wait(for: [statusExpect, messageExpect], timeout: 30.0) + } + } + } + + func test_SimultaneousSubscriptions() { + let expectation = XCTestExpectation(description: "Expectation") + expectation.assertForOverFulfill = true + expectation.expectedFulfillmentCount = 3 + + let publishExpectation = XCTestExpectation(description: "Publish") + publishExpectation.assertForOverFulfill = true + publishExpectation.expectedFulfillmentCount = 1 + + let configWithEventEngineEnabled = PubNubConfiguration( + publishKey: PubNubConfiguration(from: testsBundle).publishKey, + subscribeKey: PubNubConfiguration(from: testsBundle).subscribeKey, + userId: PubNubConfiguration(from: testsBundle).userId, + enableEventEngine: true + ) + + let pubnub = PubNub(configuration: configWithEventEngineEnabled) + let timetoken = Timetoken(Int(Date().timeIntervalSince1970 * 10000000)) + + pubnub.publish(channel: testChannel, message: "Message", completion: { [unowned pubnub, unowned self] _ in + pubnub.publish(channel: self.testChannel, message: "Second message", completion: { _ in + publishExpectation.fulfill() + }) + }) + wait(for: [publishExpectation], timeout: 1.5) + + let anotherChannel = testChannel.appending("2") + let listener = SubscriptionListener() + + listener.didReceiveMessage = { message in + expectation.fulfill() + } + + pubnub.add(listener) + pubnub.subscribe(to: [testChannel], at: timetoken) + pubnub.publish(channel: testChannel, message: "Third message", completion: nil) + pubnub.subscribe(to: [anotherChannel]) + + defer { pubnub.disconnect() } + wait(for: [expectation], timeout: 10) + } }