Skip to content

Commit

Permalink
Presence & Subscribe EE
Browse files Browse the repository at this point in the history
* Added default static factory methods for Subscribe/PresenceEffectFactory
* Presence EE contract tests (finalizing)
* Fixes for DelayedHeartbeatEffect
  • Loading branch information
jguz-pubnub committed Dec 5, 2023
1 parent 2fb69f1 commit e3aecaa
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,25 @@ class DelayedHeartbeatEffect: DelayedEffectHandler {
}

func delayInterval() -> TimeInterval? {
switch retryAttempt {
case 0:
return 0
case 1:
return 0.5 * Double(configuration.durationUntilTimeout)
case 2:
return 0.5 * Double(configuration.durationUntilTimeout) - 1.0
default:
guard let automaticRetry = configuration.automaticRetry else {
return nil
}
guard automaticRetry.retryLimit > retryAttempt else {
return nil
}
guard let underlyingError = reason.underlying else {
return automaticRetry.policy.delay(for: retryAttempt)
}
guard let urlResponse = reason.affected.findFirst(by: PubNubError.AffectedValue.response) else {
return nil
}

let shouldRetry = automaticRetry.shouldRetry(
response: urlResponse,
error: underlyingError
)

return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil
}

func onEarlyExit(notify completionBlock: @escaping ([Presence.Event]) -> Void) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ class HeartbeatEffect: EffectHandler {
}

func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) {
guard request.configuration.heartbeatInterval > 0 else {
completionBlock([]); return
}
request.execute() { result in
switch result {
case .success(_):
Expand Down
3 changes: 0 additions & 3 deletions Sources/PubNub/EventEngine/Presence/Effects/LeaveEffect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ class LeaveEffect: EffectHandler {
}

func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) {
guard !request.configuration.supressLeaveEvents else {
completionBlock([]); return
}
request.execute() { result in
switch result {
case .success(_):
Expand Down
26 changes: 16 additions & 10 deletions Sources/PubNub/EventEngine/Presence/PresenceTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class PresenceTransition: TransitionProtocol {
func canTransition(from state: State, dueTo event: Event) -> Bool {
switch event {
case .joined(_,_):
return true
return configuration.heartbeatInterval > 0
case .left(_,_):
return !(state is Presence.HeartbeatInactive)
case .heartbeatSuccess:
Expand All @@ -64,7 +64,10 @@ class PresenceTransition: TransitionProtocol {
private func onEntry(to state: State) -> [EffectInvocation<Invocation>] {
switch state {
case is Presence.Heartbeating:
return [.regular(.heartbeat(channels: state.channels, groups: state.input.groups))]
return [.regular(.heartbeat(
channels: state.channels,
groups: state.input.groups
))]
case let state as Presence.HeartbeatReconnecting:
return [.managed(.delayedHeartbeat(
channels: state.channels, groups: state.groups,
Expand Down Expand Up @@ -150,15 +153,14 @@ fileprivate extension PresenceTransition {
state: Presence.HeartbeatStopped(input: newInput),
invocations: []
)
} else if newInput.isEmpty {
return TransitionResult(
state: Presence.HeartbeatInactive(),
invocations: [.regular(.leave(channels: leaving.channels, groups: leaving.groups))]
)
} else {
let leaveInvocation = EffectInvocation.regular(Presence.Invocation.leave(
channels: leaving.channels,
groups: leaving.groups
))
return TransitionResult(
state: Presence.Heartbeating(input: newInput),
invocations: [.regular(.leave(channels: leaving.channels, groups: leaving.groups))]
state: newInput.isEmpty ? Presence.HeartbeatInactive() : Presence.Heartbeating(input: newInput),
invocations: configuration.supressLeaveEvents ? [] : [leaveInvocation]
)
}
}
Expand Down Expand Up @@ -216,9 +218,13 @@ fileprivate extension PresenceTransition {

fileprivate extension PresenceTransition {
func heartbeatInactiveTransition(from state: State) -> TransitionResult<State, Invocation> {
let leaveInvocation = EffectInvocation.regular(Presence.Invocation.leave(
channels: state.input.channels,
groups: state.input.groups
))
return TransitionResult(
state: Presence.HeartbeatInactive(),
invocations: [.regular(.leave(channels: state.input.channels, groups: state.input.groups))]
invocations: configuration.supressLeaveEvents ? []: [leaveInvocation]
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {
))
}
sendPresenceEvent(event: .joined(
channels: newInput.presenceSubscribedChannels,
groups: newInput.presenceSubscribedGroups
channels: newInput.subscribedChannels,
groups: newInput.subscribedGroups
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ extension Presence.Event: ContractTestIdentifiable {
case .heartbeatSuccess:
return "HEARTBEAT_SUCCESS"
case .heartbeatFailed(_):
return "HEARTBEAT_FAILED"
return "HEARTBEAT_FAILURE"
case .heartbeatGiveUp(_):
return "HEARTBEAT_GIVE_UP"
return "HEARTBEAT_GIVEUP"
}
}
}
Expand Down Expand Up @@ -116,8 +116,8 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
)
)
return PubNub(
configuration: self.configuration,
session: HTTPSession(session: URLSession.shared, delegate: HTTPSessionDelegate(), sessionQueue: .global(qos: .default)),
configuration: configuration,
session: HTTPSession(configuration: configuration.urlSessionConfiguration),
fileSession: URLSession(configuration: .pubnubBackground),
subscriptionSession: subscriptionSession
)
Expand All @@ -137,6 +137,20 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
))
}

Given("a linear reconnection policy with 3 retries") { args, _ in
self.replacePubNubConfiguration(with: PubNubConfiguration(
publishKey: self.configuration.publishKey,
subscribeKey: self.configuration.subscribeKey,
userId: self.configuration.userId,
useSecureConnections: self.configuration.useSecureConnections,
origin: self.configuration.origin,
automaticRetry: AutomaticRetry(retryLimit: 3, policy: .linear(delay: 0.5)),
heartbeatInterval: 30,
supressLeaveEvents: true,
enableEventEngine: true
))
}

Given("^heartbeatInterval set to '([0-9]+)', timeout set to '([0-9]+)' and suppressLeaveEvents set to '(.*)'$") { args, _ in
self.replacePubNubConfiguration(with: PubNubConfiguration(
publishKey: self.configuration.publishKey,
Expand All @@ -156,6 +170,14 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
let secondChannel = args?[1] ?? ""
let thirdChannel = args?[2] ?? ""

self.subscribeSynchronously(self.client, to: [firstChannel, secondChannel, thirdChannel], with: false)
}

When("^I join '(.*)', '(.*)', '(.*)' channels with presence$") { args, _ in
let firstChannel = args?[0] ?? ""
let secondChannel = args?[1] ?? ""
let thirdChannel = args?[2] ?? ""

self.subscribeSynchronously(self.client, to: [firstChannel, secondChannel, thirdChannel], with: true)
}

Expand All @@ -171,13 +193,17 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
XCTAssertNotNil(self.waitForPresenceChanges(self.client, count: 2))
}

Then("^I leave '(.*)' and '(.*)' channels$") { args, _ in
Then("^I leave '(.*)' and '(.*)' channels with presence$") { args, _ in
let firstChannel = args?[0] ?? ""
let secondChannel = args?[1] ?? ""

self.client.unsubscribe(from: [firstChannel, secondChannel])
}

Then("^I receive an error in my heartbeat response$") { _, _ in
self.waitFor(delay: 3.0)
}

Match(["And"], "^The timeout expires$") { _, _ in
self.waitFor(delay: TimeInterval(self.configuration.durationUntilTimeout))
}
Expand All @@ -189,5 +215,10 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
XCTAssertTrue(recordedEvents.elementsEqual(self.extractExpectedResults(from: value).events))
XCTAssertTrue(recordedInvocations.elementsEqual(self.extractExpectedResults(from: value).invocations))
}

Then("^I don't observe any Events and Invocations of the Presence EE") { args, value in
XCTAssertTrue(self.transitionDecorator.recordedEvents.isEmpty)
XCTAssertTrue(self.dispatcherDecorator.recordedInvocations.isEmpty)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class PubNubSubscribeEngineContractTestsSteps: PubNubEventEngineContractTestsSte
)
return PubNub(
configuration: self.configuration,
session: HTTPSession(session: URLSession.shared, delegate: HTTPSessionDelegate(), sessionQueue: .global(qos: .default)),
session: HTTPSession(configuration: configuration.urlSessionConfiguration),
fileSession: URLSession(configuration: .pubnubBackground),
subscriptionSession: subscriptionSession
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ class DelayedHeartbeatEffectTests: XCTestCase {

mockResponse(GenericServicePayloadResponse(status: 200))

let timeout: UInt = 4
let effect = configureEffect(attempt: 0, durationUntilTimeout: timeout, error: PubNubError(.unknown))
let automaticRetry = AutomaticRetry(retryLimit: 3, policy: .linear(delay: 1.0))
let effect = configureEffect(attempt: 0, automaticRetry: automaticRetry, error: PubNubError(.unknown))
let startDate = Date()

effect.performTask { returnedEvents in
XCTAssertTrue(returnedEvents.elementsEqual([.heartbeatSuccess]))
XCTAssertEqual(Int(Date().timeIntervalSince(startDate)), 0)
XCTAssertEqual(Int(Date().timeIntervalSince(startDate)), 1)
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.5)
wait(for: [expectation], timeout: 2.5)
}

func test_DelayedHeartbeatEffectIsShiftedForSecondAttempt() {
Expand All @@ -78,13 +78,13 @@ class DelayedHeartbeatEffectTests: XCTestCase {

mockResponse(GenericServicePayloadResponse(status: 200))

let timeout: UInt = 4
let effect = configureEffect(attempt: 1, durationUntilTimeout: timeout, error: PubNubError(.unknown))
let automaticRetry = AutomaticRetry(retryLimit: 3, policy: .linear(delay: 1.0))
let effect = configureEffect(attempt: 1, automaticRetry: automaticRetry, error: PubNubError(.unknown))
let startDate = Date()

effect.performTask { returnedEvents in
XCTAssertTrue(returnedEvents.elementsEqual([.heartbeatSuccess]))
XCTAssertEqual(Int(Date().timeIntervalSince(startDate)), Int(timeout) / 2)
XCTAssertEqual(Int(Date().timeIntervalSince(startDate)), 1)
expectation.fulfill()
}
wait(for: [expectation], timeout: 2.5)
Expand All @@ -97,16 +97,16 @@ class DelayedHeartbeatEffectTests: XCTestCase {

mockResponse(GenericServicePayloadResponse(status: 200))

let timeout: UInt = 4
let effect = configureEffect(attempt: 2, durationUntilTimeout: timeout, error: PubNubError(.unknown))
let automaticRetry = AutomaticRetry(retryLimit: 3, policy: .linear(delay: 1.0))
let effect = configureEffect(attempt: 2, automaticRetry: automaticRetry, error: PubNubError(.unknown))
let startDate = Date()

effect.performTask { returnedEvents in
XCTAssertTrue(returnedEvents.elementsEqual([.heartbeatSuccess]))
XCTAssertEqual(Int(Date().timeIntervalSince(startDate)), Int(0.5 * Double(timeout)) - 1)
XCTAssertEqual(Int(Date().timeIntervalSince(startDate)), 1)
expectation.fulfill()
}
wait(for: [expectation], timeout: 3.5)
wait(for: [expectation], timeout: 2.5)
}

func test_DelayedHeartbeatEffectFailure() {
Expand All @@ -116,33 +116,33 @@ class DelayedHeartbeatEffectTests: XCTestCase {

mockResponse(GenericServicePayloadResponse(status: 500))

let timeout: UInt = 4
let automaticRetry = AutomaticRetry(retryLimit: 3, policy: .linear(delay: 1.0))
let error = PubNubError(.unknown)
let effect = configureEffect(attempt: 0, durationUntilTimeout: timeout, error: error)
let effect = configureEffect(attempt: 0, automaticRetry: automaticRetry, error: error)

effect.performTask { returnedEvents in
XCTAssertTrue(returnedEvents.elementsEqual([.heartbeatFailed(error: PubNubError(.internalServiceError))]))
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.5)
wait(for: [expectation], timeout: 2.5)
}

func test_DelayedHeartbeatEffectGiveUp() {
let expectation = XCTestExpectation()
expectation.expectationDescription = "Effect Completion Expectation"
expectation.assertForOverFulfill = true

let timeout: UInt = 4
let automaticRetry = AutomaticRetry(retryLimit: 3, policy: .linear(delay: 1.0))
let error = PubNubError(.unknown)
let effect = configureEffect(attempt: 3, durationUntilTimeout: timeout, error: error)
let effect = configureEffect(attempt: 3, automaticRetry: automaticRetry, error: error)

mockResponse(GenericServicePayloadResponse(status: 200))

effect.performTask { returnedEvents in
XCTAssertTrue(returnedEvents.elementsEqual([.heartbeatGiveUp(error: PubNubError(.unknown))]))
expectation.fulfill()
}
wait(for: [expectation], timeout: 3.5)
wait(for: [expectation], timeout: 2.5)
}
}

Expand All @@ -156,7 +156,10 @@ fileprivate extension DelayedHeartbeatEffectTests {
}
}

func configureEffect(attempt: Int, durationUntilTimeout: UInt, error: PubNubError) -> any EffectHandler<Presence.Event> {
func configureEffect(
attempt: Int, automaticRetry: AutomaticRetry?,
error: PubNubError
) -> any EffectHandler<Presence.Event> {
factory.effect(
for: .delayedHeartbeat(
channels: ["channel-1", "channel-2"], groups: ["group-1", "group-2"],
Expand All @@ -167,7 +170,7 @@ fileprivate extension DelayedHeartbeatEffectTests {
publishKey: "pubKey",
subscribeKey: "subKey",
userId: "userId",
durationUntilTimeout: durationUntilTimeout
automaticRetry: automaticRetry
))
)
)
Expand Down
Loading

0 comments on commit e3aecaa

Please sign in to comment.