Skip to content

Commit

Permalink
Added strategies to handle subscription loop #2
Browse files Browse the repository at this point in the history
  • Loading branch information
jguz-pubnub committed Oct 27, 2023
1 parent 04ba22d commit 2f0091e
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 326 deletions.
12 changes: 8 additions & 4 deletions PubNub.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -3580,6 +3580,7 @@
"@loader_path/../Frameworks",
"@loader_path/Frameworks",
);
MACOSX_DEPLOYMENT_TARGET = 10.13;
OTHER_CFLAGS = "$(inherited)";
OTHER_LDFLAGS = "$(inherited)";
OTHER_SWIFT_FLAGS = "$(inherited)";
Expand Down Expand Up @@ -3613,6 +3614,7 @@
"@loader_path/../Frameworks",
"@loader_path/Frameworks",
);
MACOSX_DEPLOYMENT_TARGET = 10.13;
OTHER_CFLAGS = "$(inherited)";
OTHER_LDFLAGS = "$(inherited)";
OTHER_SWIFT_FLAGS = "$(inherited)";
Expand Down Expand Up @@ -4307,7 +4309,7 @@
);
HEADER_SEARCH_PATHS = "$(inherited)";
INFOPLIST_FILE = PubNub.xcodeproj/PubNubContractTests_Info.plist;
IPHONEOS_DEPLOYMENT_TARGET = 9.0;
IPHONEOS_DEPLOYMENT_TARGET = 11.0;
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@loader_path/../Frameworks",
Expand Down Expand Up @@ -4349,7 +4351,7 @@
);
HEADER_SEARCH_PATHS = "$(inherited)";
INFOPLIST_FILE = PubNub.xcodeproj/PubNubContractTests_Info.plist;
IPHONEOS_DEPLOYMENT_TARGET = 9.0;
IPHONEOS_DEPLOYMENT_TARGET = 11.0;
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@loader_path/../Frameworks",
Expand Down Expand Up @@ -4391,7 +4393,7 @@
);
HEADER_SEARCH_PATHS = "$(inherited)";
INFOPLIST_FILE = PubNub.xcodeproj/PubNubContractTests_Info.plist;
IPHONEOS_DEPLOYMENT_TARGET = 9.0;
IPHONEOS_DEPLOYMENT_TARGET = 11.0;
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@loader_path/../Frameworks",
Expand Down Expand Up @@ -4434,7 +4436,7 @@
);
HEADER_SEARCH_PATHS = "$(inherited)";
INFOPLIST_FILE = PubNub.xcodeproj/PubNubContractTests_Info.plist;
IPHONEOS_DEPLOYMENT_TARGET = 9.0;
IPHONEOS_DEPLOYMENT_TARGET = 11.0;
LD_RUNPATH_SEARCH_PATHS = (
"$(inherited)",
"@loader_path/../Frameworks",
Expand Down Expand Up @@ -4670,6 +4672,7 @@
"@loader_path/../Frameworks",
"@loader_path/Frameworks",
);
MACOSX_DEPLOYMENT_TARGET = 10.13;
OTHER_CFLAGS = "$(inherited)";
OTHER_LDFLAGS = "$(inherited)";
OTHER_SWIFT_FLAGS = "$(inherited)";
Expand Down Expand Up @@ -4704,6 +4707,7 @@
"@loader_path/../Frameworks",
"@loader_path/Frameworks",
);
MACOSX_DEPLOYMENT_TARGET = 10.13;
OTHER_CFLAGS = "$(inherited)";
OTHER_LDFLAGS = "$(inherited)";
OTHER_SWIFT_FLAGS = "$(inherited)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PresenceHeartbeatRequest {
func execute(completionBlock: @escaping (Result<Void, PubNubError>) -> Void) {
request = session.request(with: PresenceRouter(
.heartbeat(channels: channels, groups: groups, presenceTimeout: configuration.durationUntilTimeout),
configuration: configuration), requestOperator: nil
configuration: configuration, eventEngineEnabled: true), requestOperator: nil
)
request?.validate().response(on: sessionResponseQueue, decoder: GenericServiceResponseDecoder()) { result in
switch result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PresenceLeaveRequest {
func execute(completionBlock: @escaping (Result<Void, PubNubError>) -> Void) {
request = session.request(with: PresenceRouter(
.leave(channels: channels, groups: groups),
configuration: configuration), requestOperator: nil
configuration: configuration, eventEngineEnabled: true), requestOperator: nil
)
request?.validate().response(on: sessionResponseQueue, decoder: GenericServiceResponseDecoder()) { result in
switch result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class SubscribeRequest {
timetoken: timetoken,
region: region?.description ?? nil,
heartbeat: configuration.durationUntilTimeout,
filter: configuration.filterExpression
filter: configuration.filterExpression,
eventEngineEnabled: true
), configuration: configuration
), requestOperator: nil
)
Expand Down
36 changes: 0 additions & 36 deletions Sources/PubNub/Events/Subscription/SubscriptionStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,38 +51,8 @@ public enum SubscriptionChangeEvent {
}
}

/// The header of a PubNub subscribe response for zero or more events
public struct SubscribeResponseHeader {
/// The channels that are actively subscribed
public let channels: [PubNubChannel]
/// The groups that are actively subscribed
public let groups: [PubNubChannel]
/// The most recent successful Timetoken used in subscriptionstatus
public let previous: SubscribeCursor?
/// Timetoken that will be used on the next subscription cycle
public let next: SubscribeCursor?

public init(
channels: [PubNubChannel],
groups: [PubNubChannel],
previous: SubscribeCursor?,
next: SubscribeCursor?
) {
self.channels = channels
self.groups = groups
self.previous = previous
self.next = next
}
}

/// Local events emitted from the Subscribe method
public enum PubNubSubscribeEvent {
/// A change in the Channel or Group state occured
@available(*, unavailable)
case subscriptionChanged(SubscriptionChangeEvent)
/// A subscribe response was received
@available(*, unavailable)
case responseReceived(SubscribeResponseHeader)
/// The connection status of the PubNub subscription was changed
case connectionChanged(ConnectionStatus)
/// An error was received
Expand All @@ -100,9 +70,6 @@ public enum PubNubCoreEvent {
case signalReceived(PubNubMessage)
/// A change in the subscription connection has occurred
case connectionStatusChanged(ConnectionStatus)
/// A change in the subscribed channels or groups has occurred
@available(*, unavailable)
case subscriptionChanged(SubscriptionChangeEvent)
/// A presence change has been received
case presenceChanged(PubNubPresenceChange)
/// A User object has been updated
Expand Down Expand Up @@ -175,9 +142,6 @@ public final class CoreListener: BaseSubscriptionListener {
public var didReceiveBatchSubscription: (([SubscriptionEvent]) -> Void)?
/// Receiver for all subscription events
public var didReceiveSubscription: ((SubscriptionEvent) -> Void)?
/// Receiver for changes in the subscribe/unsubscribe status of channels/groups
@available(*, unavailable)
public var didReceiveSubscriptionChange: ((SubscriptionChangeEvent) -> Void)?
/// Receiver for status (Connection & Error) events
public var didReceiveStatus: ((StatusEvent) -> Void)?
/// Receiver for presence events
Expand Down
1 change: 1 addition & 0 deletions Sources/PubNub/Networking/HTTPRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum QueryKey: String {
case filter
case sort
case descending = "desc"
case eventEngine = "ee"
}

/// The PubNub Key requirement for a given Endpoint
Expand Down
10 changes: 9 additions & 1 deletion Sources/PubNub/Networking/Routers/PresenceRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ struct PresenceRouter: HTTPRouter {
}

// Init
init(_ endpoint: Endpoint, configuration: RouterConfiguration) {
init(_ endpoint: Endpoint, configuration: RouterConfiguration, eventEngineEnabled: Bool = false) {
self.endpoint = endpoint
self.configuration = configuration
self.eventEngineEnabled = eventEngineEnabled
}

var endpoint: Endpoint
var configuration: RouterConfiguration
var eventEngineEnabled: Bool

// Protocol Properties
var service: PubNubService {
Expand Down Expand Up @@ -143,8 +145,14 @@ struct PresenceRouter: HTTPRouter {
case let .heartbeat(_, groups, presenceTimeout):
query.appendIfNotEmpty(key: .channelGroup, value: groups)
query.appendIfPresent(key: .heartbeat, value: presenceTimeout?.description)
if eventEngineEnabled {
query.append(URLQueryItem(key: .eventEngine, value: nil))
}
case let .leave(_, groups):
query.appendIfNotEmpty(key: .channelGroup, value: groups)
if eventEngineEnabled {
query.append(URLQueryItem(key: .eventEngine, value: nil))
}
case let .hereNow(_, groups, includeUUIDs, includeState):
query.appendIfNotEmpty(key: .channelGroup, value: groups)
query.append(URLQueryItem(key: .disableUUIDs, value: (!includeUUIDs).stringNumber))
Expand Down
18 changes: 12 additions & 6 deletions Sources/PubNub/Networking/Routers/SubscribeRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import Foundation
struct SubscribeRouter: HTTPRouter {
// Nested Endpoint
enum Endpoint: CaseAccessible, CustomStringConvertible {
case subscribe(channels: [String], groups: [String],
timetoken: Timetoken?, region: String?,
heartbeat: UInt?, filter: String?)
case subscribe(
channels: [String], groups: [String],
timetoken: Timetoken?, region: String?,
heartbeat: UInt?, filter: String?, eventEngineEnabled: Bool
)

var description: String {
switch self {
Expand Down Expand Up @@ -66,7 +68,7 @@ struct SubscribeRouter: HTTPRouter {
let path: String

switch endpoint {
case let .subscribe(channels, _, _, _, _, _):
case let .subscribe(channels, _, _, _, _, _, _):
path = "/v2/subscribe/\(subscribeKey)/\(channels.commaOrCSVString.urlEncodeSlash)/0"
}

Expand All @@ -77,12 +79,16 @@ struct SubscribeRouter: HTTPRouter {
var query = defaultQueryItems

switch endpoint {
case let .subscribe(_, groups, timetoken, region, heartbeat, filter):
case let .subscribe(_, groups, timetoken, region, heartbeat, filter, eventEngineEnabled):
query.appendIfNotEmpty(key: .channelGroup, value: groups)
query.appendIfPresent(key: .timetokenShort, value: timetoken?.description)
query.appendIfPresent(key: .regionShort, value: region?.description)
query.appendIfPresent(key: .filterExpr, value: filter)
query.appendIfPresent(key: .heartbeat, value: heartbeat?.description)

if eventEngineEnabled {
query.append(URLQueryItem(key: .eventEngine, value: nil))
}
}

return .success(query)
Expand All @@ -91,7 +97,7 @@ struct SubscribeRouter: HTTPRouter {
// Validated
var validationErrorDetail: String? {
switch endpoint {
case let .subscribe(channels, groups, _, _, _, _):
case let .subscribe(channels, groups, _, _, _, _, _):
return isInvalidForReason(
(channels.isEmpty && groups.isEmpty, ErrorDescription.missingChannelsAnyGroups))
}
Expand Down
6 changes: 4 additions & 2 deletions Sources/PubNub/PubNubConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public struct PubNubConfiguration: Hashable {
heartbeatInterval: UInt = 0,
supressLeaveEvents: Bool = false,
requestMessageCountThreshold: UInt = 100,
filterExpression: String? = nil
filterExpression: String? = nil,
enableEventEngine: Bool = false
) {
guard userId.trimmingCharacters(in: .whitespacesAndNewlines).count > 0 else {
preconditionFailure("UserId should not be empty.")
Expand All @@ -127,6 +128,7 @@ public struct PubNubConfiguration: Hashable {
self.supressLeaveEvents = supressLeaveEvents
self.requestMessageCountThreshold = requestMessageCountThreshold
self.filterExpression = filterExpression
self.enableEventEngine = enableEventEngine
}

// swiftlint:disable:next line_length
Expand Down Expand Up @@ -220,7 +222,7 @@ public struct PubNubConfiguration: Hashable {
public var useInstanceId: Bool
/// Whether a request identifier should be included on outgoing requests
public var useRequestId: Bool
/// A flag describing whether to enable new strategy for handling subscription loop
/// A flag describing whether to enable the new strategy for handling subscription loop
public var enableEventEngine: Bool = false
/// Reconnection policy which will be used if/when a request fails
public var automaticRetry: AutomaticRetry?
Expand Down
30 changes: 6 additions & 24 deletions Sources/PubNub/Subscription/ConnectionStatus.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ import Foundation

/// Status of a connection to a remote system
public enum ConnectionStatus: Equatable {
/// Attempting to connect to a remote system
@available(*, unavailable)
case connecting
/// Attempting to reconnect to a remote system
@available(*, unavailable)
case reconnecting
/// Successfully connected to a remote system
case connected
/// Explicit disconnect from a remote system
Expand All @@ -47,7 +41,7 @@ public enum ConnectionStatus: Equatable {
/// If the connection is connected or attempting to connect
public var isActive: Bool {
switch self {
case .connecting, .connected, .reconnecting:
case .connected:
return true
default:
return false
Expand All @@ -65,27 +59,15 @@ public enum ConnectionStatus: Equatable {

func canTransition(to state: ConnectionStatus) -> Bool {
switch (self, state) {
case (.connecting, .reconnecting):
return false
case (.connecting, _):
case (.connected, .disconnected):
return true
case (.connected, .connecting):
return false
case (.connected, _):
case (.disconnected, .connected):
return true
case (.reconnecting, .connecting):
return false
case (.reconnecting, _):
case (.connected, .disconnectedUnexpectedly):
return true
case (.disconnected, .connecting):
case (.disconnected, .connectionError):
return true
case (.disconnected, _):
return false
case (.disconnectedUnexpectedly, .connecting):
return true
case (.disconnectedUnexpectedly, _):
return false
case (.connectionError, _):
default:
return false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {

func subscribe(
to channels: [String],
and groups: [String] = [],
at cursor: SubscribeCursor? = nil,
withPresence: Bool = false
and groups: [String],
at cursor: SubscribeCursor?,
withPresence: Bool
) {
let newInput = subscribeEngine.state.input + SubscribeInput(
channels: channels.map { PubNubChannel(id: $0, withPresence: withPresence) },
Expand All @@ -134,7 +134,7 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {
))
}

func reconnect(at cursor: SubscribeCursor? = nil) {
func reconnect(at cursor: SubscribeCursor?) {
let input = subscribeEngine.state.input
let channels = input.allSubscribedChannels
let groups = input.allSubscribedGroups
Expand All @@ -153,7 +153,7 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {

// MARK: - Unsubscribe

func unsubscribe(from channels: [String], and groups: [String] = [], presenceOnly: Bool = false) {
func unsubscribe(from channels: [String], and groups: [String], presenceOnly: Bool) {
let newInput = subscribeEngine.state.input - (
channels: channels.map { presenceOnly ? $0.presenceChannelName : $0 },
groups: groups.map { presenceOnly ? $0.presenceChannelName : $0 }
Expand Down
Loading

0 comments on commit 2f0091e

Please sign in to comment.