Skip to content

Commit

Permalink
Presence & Subscribe Event Engine
Browse files Browse the repository at this point in the history
* PubNubConfiguration for both Subscribe & Presence EE
* Fixes for Presence's contract tests
  • Loading branch information
jguz-pubnub committed Nov 27, 2023
1 parent 46cd8c9 commit b4f324e
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 46 deletions.
4 changes: 2 additions & 2 deletions Sources/PubNub/EventEngine/Core/EventEngineFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ typealias PresenceEngine = EventEngine<(any PresenceState), Presence.Event, Pres

class EventEngineFactory {
func subscribeEngine(
with configuration: SubscriptionConfiguration,
with configuration: PubNubConfiguration,
dispatcher: some Dispatcher<Subscribe.Invocation, Subscribe.Event, Subscribe.EngineInput>,
transition: some TransitionProtocol<any SubscribeState, Subscribe.Event, Subscribe.Invocation>
) -> SubscribeEngine {
Expand All @@ -45,7 +45,7 @@ class EventEngineFactory {
}

func presenceEngine(
with configuration: SubscriptionConfiguration,
with configuration: PubNubConfiguration,
dispatcher: some Dispatcher<Presence.Invocation, Presence.Event, Presence.EngineInput>,
transition: some TransitionProtocol<any PresenceState, Presence.Event, Presence.Invocation>
) -> PresenceEngine {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class HeartbeatEffect: EffectHandler {
}

func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) {
guard request.configuration.heartbeatInterval > 0 else {
completionBlock([]); return
}
request.execute() { result in
switch result {
case .success(_):
Expand Down
3 changes: 3 additions & 0 deletions Sources/PubNub/EventEngine/Presence/Effects/LeaveEffect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class LeaveEffect: EffectHandler {
}

func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) {
guard !request.configuration.supressLeaveEvents else {
completionBlock([]); return
}
request.execute() { result in
switch result {
case .success(_):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Foundation
class PresenceHeartbeatRequest {
let channels: [String]
let groups: [String]
let configuration: SubscriptionConfiguration
let configuration: PubNubConfiguration

private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
Expand All @@ -39,7 +39,7 @@ class PresenceHeartbeatRequest {
init(
channels: [String],
groups: [String],
configuration: SubscriptionConfiguration,
configuration: PubNubConfiguration,
session: SessionReplaceable,
sessionResponseQueue: DispatchQueue
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import Foundation
class PresenceLeaveRequest {
let channels: [String]
let groups: [String]
let configuration: PubNubConfiguration

private let configuration: SubscriptionConfiguration
private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
private var request: RequestReplaceable?

init(
channels: [String],
groups: [String],
configuration: SubscriptionConfiguration,
configuration: PubNubConfiguration,
session: SessionReplaceable,
sessionResponseQueue: DispatchQueue
) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/PubNub/EventEngine/Presence/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ extension Presence {

extension Presence {
struct EngineInput {
let configuration: SubscriptionConfiguration
let configuration: PubNubConfiguration
}
}

Expand Down
30 changes: 14 additions & 16 deletions Sources/PubNub/EventEngine/Presence/PresenceTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class PresenceTransition: TransitionProtocol {
func canTransition(from state: State, dueTo event: Event) -> Bool {
switch event {
case .joined(_,_):
return configuration.heartbeatInterval > 0
return true
case .left(_,_):
return !(state is Presence.HeartbeatInactive) && !configuration.supressLeaveEvents
return !(state is Presence.HeartbeatInactive)
case .heartbeatSuccess:
return state is Presence.Heartbeating || state is Presence.HeartbeatReconnecting
case .heartbeatFailed(_):
Expand All @@ -65,13 +65,13 @@ class PresenceTransition: TransitionProtocol {
switch state {
case is Presence.Heartbeating:
return [.regular(.heartbeat(channels: state.channels, groups: state.input.groups))]
case is Presence.HeartbeatCooldown:
return [.managed(.wait)]
case let state as Presence.HeartbeatReconnecting:
return [.managed(.delayedHeartbeat(
channels: state.channels, groups: state.groups,
retryAttempt: state.retryAttempt, error: state.error
))]
case is Presence.HeartbeatCooldown:
return [.managed(.wait)]
default:
return []
}
Expand All @@ -93,9 +93,9 @@ class PresenceTransition: TransitionProtocol {

switch event {
case .joined(let channels, let groups):
results = heartbeatingTransition(from: state, joined: channels, and: groups)
results = heartbeatingTransition(from: state, joining: (channels: channels, groups: groups))
case .left(let channels, let groups):
results = heartbeatingTransition(from: state, left: channels, and: groups)
results = heartbeatingTransition(from: state, leaving: (channels: channels, groups: groups))
case .heartbeatSuccess:
results = heartbeatSuccessTransition(from: state)
case .heartbeatFailed(let error):
Expand All @@ -122,12 +122,11 @@ class PresenceTransition: TransitionProtocol {
fileprivate extension PresenceTransition {
func heartbeatingTransition(
from state: State,
joined channels: [String],
and groups: [String]
joining: (channels: [String], groups: [String])
) -> TransitionResult<State, Invocation> {
let newInput = state.input + PresenceInput(
channels: channels,
groups: groups
channels: joining.channels,
groups: joining.groups
)
if state is Presence.HeartbeatStopped {
return TransitionResult(state: Presence.HeartbeatStopped(input: newInput))
Expand All @@ -140,12 +139,11 @@ fileprivate extension PresenceTransition {
fileprivate extension PresenceTransition {
func heartbeatingTransition(
from state: State,
left channels: [String],
and groups: [String]
leaving: (channels: [String], groups: [String])
) -> TransitionResult<State, Invocation> {
let newInput = state.input - PresenceInput(
channels: channels,
groups: groups
channels: leaving.channels,
groups: leaving.groups
)
if state is Presence.HeartbeatStopped {
return TransitionResult(
Expand All @@ -155,12 +153,12 @@ fileprivate extension PresenceTransition {
} else if newInput.isEmpty {
return TransitionResult(
state: Presence.HeartbeatInactive(),
invocations: [.regular(.leave(channels: channels, groups: groups))]
invocations: [.regular(.leave(channels: leaving.channels, groups: leaving.groups))]
)
} else {
return TransitionResult(
state: Presence.Heartbeating(input: newInput),
invocations: [.regular(.leave(channels: channels, groups: groups))]
invocations: [.regular(.leave(channels: leaving.channels, groups: leaving.groups))]
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/PubNub/EventEngine/Subscribe/Subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ extension Subscribe {

extension Subscribe {
struct EngineInput {
let configuration: SubscriptionConfiguration
let configuration: PubNubConfiguration
let listeners: [BaseSubscriptionListener]

init(configuration: SubscriptionConfiguration, listeners: [BaseSubscriptionListener] = []) {
init(configuration: PubNubConfiguration, listeners: [BaseSubscriptionListener] = []) {
self.configuration = configuration
self.listeners = listeners
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {
let uuid = UUID()

var privateListeners: WeakSet<ListenerType> = WeakSet([])
var configuration: SubscriptionConfiguration
var configuration: PubNubConfiguration
var subscribeEngine: SubscribeEngine
var presenceEngine: PresenceEngine
var previousTokenResponse: SubscribeCursor?

internal init(
configuration: SubscriptionConfiguration,
configuration: PubNubConfiguration,
subscribeEngine: SubscribeEngine,
presenceEngine: PresenceEngine
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy {
let sessionStream: SessionListener
let responseQueue: DispatchQueue

var configuration: SubscriptionConfiguration
var configuration: PubNubConfiguration
var privateListeners: WeakSet<ListenerType> = WeakSet([])
var filterExpression: String?
var messageCache = [SubscribeMessagePayload?].init(repeating: nil, count: 100)
Expand Down Expand Up @@ -85,7 +85,7 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy {
var internalState = Atomic<SubscriptionState>(SubscriptionState())

internal init(
configuration: SubscriptionConfiguration,
configuration: PubNubConfiguration,
network subscribeSession: SessionReplaceable,
presenceSession: SessionReplaceable
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import Foundation

protocol SubscriptionSessionStrategy: EventStreamEmitter where ListenerType == BaseSubscriptionListener {
var uuid: UUID { get }
var configuration: SubscriptionConfiguration { get set }
var configuration: PubNubConfiguration { get set }
var subscribedChannels: [String] { get }
var subscribedChannelGroups: [String] { get }
var subscriptionCount: Int { get }
Expand Down
2 changes: 1 addition & 1 deletion Sources/PubNub/Subscription/SubscriptionSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SubscriptionSession {
strategy.previousTokenResponse
}

var configuration: SubscriptionConfiguration {
var configuration: PubNubConfiguration {
get {
strategy.configuration
} set {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
wrappedInstance: PresenceTransition(configuration: configuration)
)

let configuration = self.configuration
let factory = EventEngineFactory()
let subscriptionSession = SubscriptionSession(
strategy: EventEngineSubscriptionSessionStrategy(
configuration: self.configuration,
configuration: configuration,
subscribeEngine: factory.subscribeEngine(
with: self.configuration,
with: configuration,
dispatcher: EffectDispatcher(factory: SubscribeEffectFactory(session: HTTPSession(
configuration: URLSessionConfiguration.subscription,
sessionQueue: DispatchQueue(label: "Subscribe Response Queue"),
Expand Down Expand Up @@ -173,6 +174,10 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
XCTAssertNotNil(self.waitForPresenceChanges(self.client, count: 3))
}

Then("^I wait '([0-9]+)' seconds$") { args, _ in
self.waitFor(delay: TimeInterval(args!.first!)!)
}

Then("^I wait for getting Presence left events$") { args, _ in
XCTAssertNotNil(self.waitForPresenceChanges(self.client, count: 2))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,27 @@ class PubNubSubscribeEngineContractTestsSteps: PubNubEventEngineContractTestsSte

Given("a linear reconnection policy with 3 retries") { args, _ in
self.replacePubNubConfiguration(with: PubNubConfiguration(
publishKey: defaultPublishKey,
subscribeKey: defaultSubscribeKey,
userId: UUID().uuidString,
useSecureConnections: false,
origin: mockServerAddress,
publishKey: self.configuration.publishKey,
subscribeKey: self.configuration.subscribeKey,
userId: self.configuration.userId,
useSecureConnections: self.configuration.useSecureConnections,
origin: self.configuration.origin,
automaticRetry: AutomaticRetry(retryLimit: 3, policy: .linear(delay: 0.5)),
heartbeatInterval: 0,
supressLeaveEvents: true,
enableEventEngine: true
))
}

Given("the demo keyset with event engine enabled") { _, _ in
self.replacePubNubConfiguration(with: PubNubConfiguration(
publishKey: defaultPublishKey,
subscribeKey: defaultSubscribeKey,
userId: UUID().uuidString,
useSecureConnections: false,
origin: mockServerAddress,
publishKey: self.configuration.publishKey,
subscribeKey: self.configuration.subscribeKey,
userId: self.configuration.userId,
useSecureConnections: self.configuration.useSecureConnections,
origin: self.configuration.origin,
heartbeatInterval: 0,
supressLeaveEvents: true,
enableEventEngine: true
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class DelayedHeartbeatEffectTests: XCTestCase {
super.tearDown()
}

func test_DelayedHeartbeatEffectFiresImmediatelyForFirstAttempt() {
func test_DelayedHeartbeatEffectForFirstAttempt() {
let expectation = XCTestExpectation()
expectation.expectationDescription = "Effect Completion Expectation"
expectation.assertForOverFulfill = true
Expand Down Expand Up @@ -160,7 +160,7 @@ fileprivate extension DelayedHeartbeatEffectTests {
factory.effect(
for: .delayedHeartbeat(
channels: ["channel-1", "channel-2"], groups: ["group-1", "group-2"],
retryAttempt: attempt, error: PubNubError(.unknown)
retryAttempt: attempt, error: error
),
with: EventEngineCustomInput(value: Presence.EngineInput(
configuration: PubNubConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class HeartbeatEffectTests: XCTestCase {
private let config = PubNubConfiguration(
publishKey: "pubKey",
subscribeKey: "subKey",
userId: "userId"
userId: "userId",
heartbeatInterval: 30
)

override func setUp() {
Expand Down
24 changes: 24 additions & 0 deletions Tests/PubNubTests/EventEngine/Presence/LeaveEffectTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ class LeaveEffectTests: XCTestCase {
}
wait(for: [expectation], timeout: 0.5)
}

func test_LeaveEffectForFailedRequest() {
let expectation = XCTestExpectation()
expectation.expectationDescription = "Effect Completion Expectation"
expectation.assertForOverFulfill = true

mockResponse(GenericServicePayloadResponse(status: 500))

let config = PubNubConfiguration(
publishKey: "pubKey",
subscribeKey: "subKey",
userId: "userId",
heartbeatInterval: 2
)
let effect = factory.effect(
for: .leave(channels: ["c1", "c2"], groups: ["g1", "g2"]),
with: EventEngineCustomInput(value: Presence.EngineInput(configuration: config))
)
effect.performTask { returnedEvents in
XCTAssertTrue(returnedEvents.isEmpty)
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.5)
}
}

fileprivate extension LeaveEffectTests {
Expand Down

0 comments on commit b4f324e

Please sign in to comment.