Skip to content

Commit

Permalink
Subscribe & Presence Event Engine
Browse files Browse the repository at this point in the history
* Adding `.connnectionError` and removing `.connecting` and `.reconnecting` from `ConnectionStatus`
* Deprecating `SubscriptionSession` and `SubscribeSessionFactory`
* Limiting the scope of public structs/classes associated with the Subscribe loop that aren't part of any exposed public method/variable
* Introducing Strategy pattern to handle both old & new Subscribe loop implementation
* Removing local events emitted from Subscribe loop
* Maintaining `state` parameter for /v2/subscribe and /v2/presence/heartbeat
* Adding `enableEventEngine` and `maintainPresenceState` flags in `PubNubConfiguration`
* Improved AutomaticRetry and making possible to retry other requests
* Fixing unit & contract tests according to changes above
  • Loading branch information
jguz-pubnub committed Dec 22, 2023
1 parent b47b478 commit ce9360d
Show file tree
Hide file tree
Showing 77 changed files with 9,481 additions and 1,236 deletions.
18 changes: 3 additions & 15 deletions Examples/Sources/MasterDetailTableViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -250,26 +250,14 @@ class MasterDetailTableViewController: UITableViewController {
print("The signal is \(signal.payload) and was sent by \(signal.publisher ?? "")")
case let .connectionStatusChanged(connectionChange):
switch connectionChange {
case .connecting:
print("Status connecting...")
case .connected:
print("Status connected!")
case .reconnecting:
print("Status reconnecting...")
case .connectionError:
print("Error while attempting to initialize connection")
case .disconnected:
print("Status disconnected")
case .disconnectedUnexpectedly:
print("Status disconnected unexpectedly!")
}
case let .subscriptionChanged(subscribeChange):
switch subscribeChange {
case let .subscribed(channels, groups):
print("\(channels) and \(groups) were added to subscription")
case let .responseHeader(channels, groups, previous, next):
print("\(channels) and \(groups) recevied a response at \(previous?.timetoken ?? 0)")
print("\(next?.timetoken ?? 0) will be used as the new timetoken")
case let .unsubscribed(channels, groups):
print("\(channels) and \(groups) were removed from subscription")
print("Disconnected unexpectedly")
}
case let .presenceChanged(presenceChange):
print("The channel \(presenceChange.channel) has an updated occupancy of \(presenceChange.occupancy)")
Expand Down
2 changes: 1 addition & 1 deletion Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ SPEC CHECKSUMS:

PODFILE CHECKSUM: 61a40240486621bb01f596fdd5bc632504940fab

COCOAPODS: 1.12.1
COCOAPODS: 1.14.3
312 changes: 308 additions & 4 deletions PubNub.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion PubNubMembership/Sources/Membership+PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import PubNubUser
public protocol PubNubMembershipInterface {
/// A copy of the configuration object used for this session
var configuration: PubNubConfiguration { get }

/// Session used for performing request/response REST calls
var networkSession: SessionReplaceable { get }

Expand Down Expand Up @@ -268,6 +267,7 @@ public extension PubNubMembershipInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchMultipleValueResponseDecoder<PubNubMembership.PartialSpace>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -320,6 +320,7 @@ public extension PubNubMembershipInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchMultipleValueResponseDecoder<PubNubMembership.PartialUser>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -365,6 +366,7 @@ public extension PubNubMembershipInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchStatusResponseDecoder(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -401,6 +403,7 @@ public extension PubNubMembershipInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchStatusResponseDecoder(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -463,6 +466,7 @@ public extension PubNubMembershipInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchStatusResponseDecoder(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -499,6 +503,7 @@ public extension PubNubMembershipInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchStatusResponseDecoder(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down
5 changes: 4 additions & 1 deletion PubNubSpace/Sources/Space+PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import PubNub
public protocol PubNubSpaceInterface {
/// A copy of the configuration object used for this session
var configuration: PubNubConfiguration { get }

/// Session used for performing request/response REST calls
var networkSession: SessionReplaceable { get }

Expand Down Expand Up @@ -213,6 +212,7 @@ public extension PubNubSpaceInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchMultipleValueResponseDecoder<PubNubSpace>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand All @@ -237,6 +237,7 @@ public extension PubNubSpaceInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchSingleValueResponseDecoder<PubNubSpace>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -273,6 +274,7 @@ public extension PubNubSpaceInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchSingleValueResponseDecoder<PubNubSpace>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -317,6 +319,7 @@ public extension PubNubSpaceInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchStatusResponseDecoder(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down
6 changes: 4 additions & 2 deletions PubNubUser/Sources/User+PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
//

import Foundation

import PubNub

/// Protocol interface to manage `PubNubUser` entities using closures
public protocol PubNubUserInterface {
/// A copy of the configuration object used for this session
var configuration: PubNubConfiguration { get }

/// Session used for performing request/response REST calls
var networkSession: SessionReplaceable { get }

Expand Down Expand Up @@ -221,6 +219,7 @@ public extension PubNubUserInterface {
(requestConfig.customSession ?? networkSession)?
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchMultipleValueResponseDecoder<PubNubUser>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -248,6 +247,7 @@ public extension PubNubUserInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchSingleValueResponseDecoder<PubNubUser>(),
responseQueue: requestConfig.responseQueue
) {
Expand Down Expand Up @@ -288,6 +288,7 @@ public extension PubNubUserInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchSingleValueResponseDecoder<PubNubUser>(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down Expand Up @@ -336,6 +337,7 @@ public extension PubNubUserInterface {
(requestConfig.customSession ?? networkSession)
.route(
router,
requestOperator: configuration.automaticRetry?[.appContext],
responseDecoder: FetchStatusResponseDecoder(),
responseQueue: requestConfig.responseQueue
) { result in
Expand Down
12 changes: 9 additions & 3 deletions Sources/PubNub/APIs/File+PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public extension PubNub {
) {
route(
FileManagementRouter(.list(channel: channel, limit: limit, next: next), configuration: configuration),
requestOperator: configuration.automaticRetry?[.files],
responseDecoder: FileListResponseDecoder(),
custom: requestConfig
) { result in
Expand Down Expand Up @@ -60,6 +61,7 @@ public extension PubNub {
) {
route(
FileManagementRouter(.delete(channel: channel, fileId: fileId, filename: filename), configuration: configuration),
requestOperator: configuration.automaticRetry?[.files],
responseDecoder: FileGeneralSuccessResponseDecoder(),
custom: requestConfig
) { result in
Expand Down Expand Up @@ -137,6 +139,7 @@ public extension PubNub {
.generateURL(channel: channel, body: .init(name: remoteFilename)),
configuration: configuration
),
requestOperator: configuration.automaticRetry?[.files],
responseDecoder: FileGenerateResponseDecoder(),
custom: requestConfig
) { [configuration] result in
Expand Down Expand Up @@ -225,9 +228,12 @@ public extension PubNub {
configuration: configuration
)

route(router,
responseDecoder: PublishResponseDecoder(),
custom: request.customRequestConfig) { result in
route(
router,
requestOperator: configuration.automaticRetry?[.files],
responseDecoder: PublishResponseDecoder(),
custom: request.customRequestConfig
) { result in
completion?(result.map { $0.payload.timetoken })
}
}
Expand Down
113 changes: 113 additions & 0 deletions Sources/PubNub/EventEngine/Core/Dispatcher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//
// Dispatcher.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//

import Foundation

// MARK: - DispatcherListener

struct DispatcherListener<Event> {
let onAnyInvocationCompleted: (([Event]) -> Void)
}

// MARK: - Dispatcher

protocol Dispatcher<Invocation, Event, Dependencies> {
associatedtype Invocation: AnyEffectInvocation
associatedtype Event
associatedtype Dependencies

func dispatch(
invocations: [EffectInvocation<Invocation>],
with dependencies: EventEngineDependencies<Dependencies>,
notify listener: DispatcherListener<Event>
)
}

// MARK: - EffectDispatcher

class EffectDispatcher<Invocation: AnyEffectInvocation, Event, Dependencies>: Dispatcher {
private let factory: any EffectHandlerFactory<Invocation, Event, Dependencies>
private let effectsCache = EffectsCache<Event>()

init(factory: some EffectHandlerFactory<Invocation, Event, Dependencies>) {
self.factory = factory
}

func hasPendingInvocation(_ invocation: Invocation) -> Bool {
effectsCache.hasPendingEffect(with: invocation.id)
}

func dispatch(
invocations: [EffectInvocation<Invocation>],
with dependencies: EventEngineDependencies<Dependencies>,
notify listener: DispatcherListener<Event>
) {
invocations.forEach {
switch $0 {
case .managed(let invocation):
executeEffect(
effect: factory.effect(for: invocation, with: dependencies),
storageId: invocation.id,
notify: listener
)
case .regular(let invocation):
executeEffect(
effect: factory.effect(for: invocation, with: dependencies),
storageId: UUID().uuidString,
notify: listener
)
case .cancel(let cancelInvocation):
effectsCache.getEffect(with: cancelInvocation.id)?.cancelTask()
effectsCache.removeEffect(id: cancelInvocation.id)
}
}
}

private func executeEffect(
effect: some EffectHandler<Event>,
storageId id: String,
notify listener: DispatcherListener<Event>
) {
effectsCache.put(effect: effect, with: id)
effect.performTask { [weak effectsCache] results in
effectsCache?.removeEffect(id: id)
listener.onAnyInvocationCompleted(results)
}
}
}

// MARK: - EffectsCache

fileprivate class EffectsCache<Event> {
private var managedEffects: Atomic<[String: EffectWrapper<Event>]> = Atomic([:])

func hasPendingEffect(with id: String) -> Bool {
managedEffects.lockedRead { $0[id] } != nil
}

func put(effect: some EffectHandler<Event>, with id: String) {
managedEffects.lockedWrite { $0[id] = EffectWrapper<Event>(id: id, effect: effect) }
}

func getEffect(with id: String) -> (any EffectHandler<Event>)? {
managedEffects.lockedRead() { $0[id] }?.effect
}

func removeEffect(id: String) {
managedEffects.lockedWrite { $0[id] = nil }
}
}

// MARK: - EffectWrapper

fileprivate struct EffectWrapper<Action> {
let id: String
let effect: any EffectHandler<Action>
}
66 changes: 66 additions & 0 deletions Sources/PubNub/EventEngine/Core/EffectHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// EffectHandler.swift
//
// Copyright (c) PubNub Inc.
// All rights reserved.
//
// This source code is licensed under the license found in the
// LICENSE file in the root directory of this source tree.
//
import Foundation

// MARK: - EffectHandlerFactory

protocol EffectHandlerFactory<Invocation, Event, Dependencies> {
associatedtype Invocation
associatedtype Event
associatedtype Dependencies

func effect(
for invocation: Invocation,
with dependencies: EventEngineDependencies<Dependencies>
) -> any EffectHandler<Event>
}

// MARK: - EffectHandler

protocol EffectHandler<Event> {
associatedtype Event

func performTask(completionBlock: @escaping ([Event]) -> Void)
func cancelTask()
}

extension EffectHandler {
func cancelTask() {}
}

// MARK: - Delayed Effect Handler

protocol DelayedEffectHandler: AnyObject, EffectHandler {
var workItem: DispatchWorkItem? { get set }

func delayInterval() -> TimeInterval?
func onEarlyExit(notify completionBlock: @escaping ([Event]) -> Void)
func onDelayExpired(notify completionBlock: @escaping ([Event]) -> Void)
}

extension DelayedEffectHandler {
func performTask(completionBlock: @escaping ([Event]) -> Void) {
guard let delay = delayInterval() else {
onEarlyExit(notify: completionBlock); return
}
let workItem = DispatchWorkItem() { [weak self] in
self?.onDelayExpired(notify: completionBlock)
}
DispatchQueue.global(qos: .default).asyncAfter(
deadline: .now() + delay,
execute: workItem
)
self.workItem = workItem
}

func cancelTask() {
workItem?.cancel()
}
}
Loading

0 comments on commit ce9360d

Please sign in to comment.