From 006b333fe0fdff8d399ae22c67d00abd04d0bbf1 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Tue, 24 Sep 2024 16:55:07 -0300 Subject: [PATCH] Implement (some of) Room Lifecycle Monitoring spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements points CHA-RL4* from same spec as referenced in e70ee44. TODO btw i haven't put action descriptions in all of the test names because they might end up really long In addition to the TODOs added in the code (all of which refer either to existing GitHub issues or questions on the spec, for which we have #66 as a catch-all issue), I’ve also not done: - CHA-RL4a2 — I don’t understand the meaning of “has not yet successfully managed to attach its Realtime Channel”, asked about it in [1] - CHA-RL4b2 — seems redundant, asked about it in [2] - CHA-RL4b3, CHA-RL4b4 — seem redundant, asked about it in [3] - CHA-RL4b5, CHA-RL4b6, CHA-RL4b7 — these relate to transient disconnect timeouts, so will do them in #48 Part of #53. [1] https://github.com/ably/specification/pull/200/files#r1775552624 [2] https://github.com/ably/specification/pull/200/files#r1777212960 [3] https://github.com/ably/specification/pull/200/files#r1777365677 --- Package.swift | 4 + Sources/AblyChat/RoomLifecycleManager.swift | 169 +++++++++++- .../MockRoomLifecycleContributorChannel.swift | 21 +- .../RoomLifecycleManagerTests.swift | 243 +++++++++++++++++- 4 files changed, 434 insertions(+), 3 deletions(-) diff --git a/Package.swift b/Package.swift index fc233cd2..73e5cf0a 100644 --- a/Package.swift +++ b/Package.swift @@ -39,6 +39,10 @@ let package = Package( name: "Ably", package: "ably-cocoa" ), + .product( + name: "AsyncAlgorithms", + package: "swift-async-algorithms" + ), ] ), .testTarget( diff --git a/Sources/AblyChat/RoomLifecycleManager.swift b/Sources/AblyChat/RoomLifecycleManager.swift index 4c010ccc..5c741c47 100644 --- a/Sources/AblyChat/RoomLifecycleManager.swift +++ b/Sources/AblyChat/RoomLifecycleManager.swift @@ -1,4 +1,5 @@ -import Ably +@preconcurrency import Ably +import AsyncAlgorithms /// The interface that the lifecycle manager expects its contributing realtime channels to conform to. /// @@ -11,6 +12,13 @@ internal protocol RoomLifecycleContributorChannel: Sendable { var state: ARTRealtimeChannelState { get async } var errorReason: ARTErrorInfo? { get async } + + // TODO: consider the consequences of this async (right now, it's just to make it easy to write a mock using an actor), but from a semantics point of view does it make sense in the same way as the above ones? + func subscribeToState() async -> Subscription + + // TODO: this really isn't the right place for this to go, move elsewhere + // TODO: again, this `async` is a bit dodgy, it's just there so we can use an actor to manage some subscription state + func emitDiscontinuity(_ error: ARTErrorInfo) async } internal actor RoomLifecycleManager { @@ -22,8 +30,23 @@ internal actor RoomLifecycleManager { internal var channel: Channel } + // TODO: something intelligent to say about this beyond that it's a term used in the spec. Exposed for tests + internal struct PendingDiscontinuityEvent { + internal var error: ARTErrorInfo + } + + /// Stores manager state relating to a given contributor. + private struct ContributorAnnotation { + // TODO: Not clear whether there can be multiple or just one (asked in https://github.com/ably/specification/pull/200/files#r1781927850) + var pendingDiscontinuityEvents: [PendingDiscontinuityEvent] = [] + } + internal private(set) var current: RoomLifecycle internal private(set) var error: ARTErrorInfo? + // TODO: link this to what the manager is actually doing + private var hasOperationInProgress: Bool + /// The annotation at a given index belongs to the element of ``contributors`` at the same index. + private var contributorAnnotations: [ContributorAnnotation] private let logger: InternalLogger private let clock: SimpleClock @@ -36,6 +59,7 @@ internal actor RoomLifecycleManager { ) async { await self.init( current: nil, + hasOperationInProgress: nil, contributors: contributors, logger: logger, clock: clock @@ -45,12 +69,14 @@ internal actor RoomLifecycleManager { #if DEBUG internal init( testsOnly_current current: RoomLifecycle? = nil, + testsOnly_hasOperationInProgress hasOperationInProgress: Bool? = nil, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock ) async { await self.init( current: current, + hasOperationInProgress: hasOperationInProgress, contributors: contributors, logger: logger, clock: clock @@ -60,16 +86,53 @@ internal actor RoomLifecycleManager { private init( current: RoomLifecycle?, + hasOperationInProgress: Bool?, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock ) async { self.current = current ?? .initialized + self.hasOperationInProgress = hasOperationInProgress ?? false self.contributors = contributors + contributorAnnotations = Array(repeating: .init(), count: contributors.count) self.logger = logger self.clock = clock + + // The idea here is to make sure that, before the initializer completes, we are already listening for state changes, so that e.g. tests don’t miss a state change. + let subscriptions = await withTaskGroup(of: Subscription.self) { group in + for contributor in contributors { + group.addTask { + await contributor.channel.subscribeToState() + } + } + + return await Array(group) + } + + // TODO: who owns this task? how does it get cancelled? how do we know when it's started and that the manager is "ready to go"? Do we need an async initializer? + Task { + await withTaskGroup(of: Void.self) { group in + for (index, subscription) in subscriptions.enumerated() { + // TODO: this capture + // TODO: is await what we want? is there a way to make the manager's initializer isolated to the manager? + // TODO: this @Sendable was to make a mysterious compiler error go away + group.addTask { @Sendable in + for await stateChange in subscription { + // TODO: why does this not inherit the actor isolation? (I mean, now that I have @Sendable, I get it) + await self.didReceiveStateChange(stateChange, forContributorAt: index) + } + } + } + } + } } + #if DEBUG + internal func testsOnly_pendingDiscontinuityEventsForContributor(at index: Int) -> [PendingDiscontinuityEvent] { + contributorAnnotations[index].pendingDiscontinuityEvents + } + #endif + // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) private var subscriptions: [Subscription] = [] @@ -79,6 +142,110 @@ internal actor RoomLifecycleManager { return subscription } + #if DEBUG + // TODO: explain — these are to let the tests know that it's handled a state change + private var stateChangeHandledSubscriptions: [Subscription] = [] + + internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription { + let subscription = Subscription(bufferingPolicy: .unbounded) + stateChangeHandledSubscriptions.append(subscription) + return subscription + } + #endif + + // TODO: this is only async because it needs to call `emitDiscontinuity`; that's not great. update: now it's also calling detach on channels but that probably shouldn't be inline, need clarification) + /// Implements CHA-RL4b’s contributor state change handling. + private func didReceiveStateChange(_ stateChange: ARTChannelStateChange, forContributorAt index: Int) async { + logger.log(message: "Got state change \(stateChange) for contributor at index \(index)", level: .info) + + switch stateChange.event { + case .update: + // CHA-RL4a1 — if RESUMED then no-op + guard !stateChange.resumed else { + break + } + + guard let reason = stateChange.reason else { + // TODO: is this OK? would be good if ably-cocoa could communicate this in types + preconditionFailure("State change event with resumed == false should have a reason") + } + + if hasOperationInProgress { + // CHA-RL4a3 + logger.log(message: "Recording pending discontinuity event for contributor at index \(index)", level: .info) + + contributorAnnotations[index].pendingDiscontinuityEvents.append( + .init(error: reason) + ) + } else { + // CHA-RL4a4 + logger.log(message: "Emitting discontinuity event for contributor at index \(index)", level: .info) + + let contributor = contributors[index] + await contributor.channel.emitDiscontinuity(reason) + } + case .attached: + if hasOperationInProgress { + if !stateChange.resumed { + // CHA-RL4b1 + logger.log(message: "Recording pending discontinuity event for contributor at index \(index)", level: .info) + + guard let reason = stateChange.reason else { + // TODO: same question as above about whether this is OK + preconditionFailure("State change event with resumed == false should have a reason") + } + + contributorAnnotations[index].pendingDiscontinuityEvents.append( + .init(error: reason) + ) + } + } else if current != .attached { + if await (contributors.async.map { await $0.channel.state }.allSatisfy { $0 == .attached }) { + // CHA-RL4b8 + logger.log(message: "Now that all contributors are ATTACHED, transitioning room to ATTACHED", level: .info) + changeStatus(to: .attached) + } + } + case .failed: + if !hasOperationInProgress { + // CHA-RL4b5 + guard let reason = stateChange.reason else { + // TODO: same question as above about whether this is OK + preconditionFailure("FAILED state change event should have a reason") + } + + changeStatus(to: .failed, error: reason) + + // TODO: CHA-RL4b5 is a bit unclear about how to handle failure, and whether they can be detached concurrently (asked in https://github.com/ably/specification/pull/200/files#r1777471810) + for contributor in contributors { + do { + try await contributor.channel.detach() + } catch { + logger.log(message: "Failed to detach contributor \(contributor), error \(error)", level: .info) + } + } + } + case .suspended: + if !hasOperationInProgress { + // CHA-RL4b9 + guard let reason = stateChange.reason else { + // TODO: same question as above about whether this is OK + preconditionFailure("SUSPENDED state change event should have a reason") + } + + changeStatus(to: .suspended, error: reason) + } + default: + break + } + + #if DEBUG + for subscription in stateChangeHandledSubscriptions { + subscription.emit(stateChange) + } + #endif + } + /// Updates ``current`` and ``error`` and emits a status change event. private func changeStatus(to new: RoomLifecycle, error: ARTErrorInfo? = nil) { logger.log(message: "Transitioning from \(current) to \(new), error \(String(describing: error))", level: .info) diff --git a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift index d23539af..c13b046b 100644 --- a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift @@ -1,4 +1,4 @@ -import Ably +@preconcurrency import Ably @testable import AblyChat final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel { @@ -7,9 +7,12 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel var state: ARTRealtimeChannelState var errorReason: ARTErrorInfo? + // TODO: clean up + private var subscriptions: [Subscription] = [] private(set) var attachCallCount = 0 private(set) var detachCallCount = 0 + private(set) var emitDiscontinuityArguments: [ARTErrorInfo] = [] init( initialState: ARTRealtimeChannelState, @@ -92,4 +95,20 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel throw error } } + + func subscribeToState() -> Subscription { + let subscription = Subscription(bufferingPolicy: .unbounded) + subscriptions.append(subscription) + return subscription + } + + func emitStateChange(_ stateChange: ARTChannelStateChange) { + for subscription in subscriptions { + subscription.emit(stateChange) + } + } + + func emitDiscontinuity(_ error: ARTErrorInfo) async { + emitDiscontinuityArguments.append(error) + } } diff --git a/Tests/AblyChatTests/RoomLifecycleManagerTests.swift b/Tests/AblyChatTests/RoomLifecycleManagerTests.swift index 17a752bc..f33617ec 100644 --- a/Tests/AblyChatTests/RoomLifecycleManagerTests.swift +++ b/Tests/AblyChatTests/RoomLifecycleManagerTests.swift @@ -1,4 +1,4 @@ -import Ably +@preconcurrency import Ably @testable import AblyChat import Testing @@ -29,11 +29,13 @@ struct RoomLifecycleManagerTests { private func createManager( forTestingWhatHappensWhenCurrentlyIn current: RoomLifecycle? = nil, + forTestingWhatHappensWhenHasOperationInProgress hasOperationInProgress: Bool? = nil, contributors: [RoomLifecycleManager.Contributor] = [], clock: SimpleClock = MockSimpleClock() ) async -> RoomLifecycleManager { await .init( testsOnly_current: current, + testsOnly_hasOperationInProgress: hasOperationInProgress, contributors: contributors, logger: TestLogger(), clock: clock @@ -674,4 +676,243 @@ struct RoomLifecycleManagerTests { #expect(await manager.current == .released) } + + // MARK: - Handling contributor UPDATE events + + // @spec CHA-RL4a1 + @Test + func contributorUpdate_withResumedTrue_doesNothing() async throws { + // Given: A RoomLifecycleManager + let contributor = createContributor() + let manager = await createManager(contributors: [contributor]) + + // When: A contributor emits an UPDATE event with `resumed` flag set to true + let contributorStateChange = ARTChannelStateChange( + current: .attached, // arbitrary + previous: .attached, // arbitrary + event: .update, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: true + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager does not record a pending discontinuity event for this contributor, nor does it call `emitDiscontinuity` on the contributor (this is my interpretation of "no action should be taken" in CHA-RL4a1; i.e. that the actions described in CHA-RL4a2 and CHA-RL4a3 shouldn’t happen) (TODO: get clarification; have asked in https://github.com/ably/specification/pull/200#discussion_r1777385499) + #expect(await manager.testsOnly_pendingDiscontinuityEventsForContributor(at: 0).isEmpty) + #expect(await contributor.channel.emitDiscontinuityArguments.isEmpty) + } + + // @spec CHA-RL4a3 + @Test + func contributorUpdate_withResumedFalse_withOperationInProgress_recordsPendingDiscontinuityEvent() async throws { + // Given: A RoomLifecycleManager, with a room lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: true, contributors: [contributor]) + + // When: A contributor emits an UPDATE event with `resumed` flag set to false + let contributorStateChange = ARTChannelStateChange( + current: .attached, // arbitrary + previous: .attached, // arbitrary + event: .update, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager records a pending discontinuity event for this contributor, and this discontinuity event has error equal to the contributor UPDATE event’s `reason` + let pendingDiscontinuityEvents = await manager.testsOnly_pendingDiscontinuityEventsForContributor(at: 0) + try #require(pendingDiscontinuityEvents.count == 1) + + let pendingDiscontinuityEvent = pendingDiscontinuityEvents[0] + #expect(pendingDiscontinuityEvent.error === contributorStateChange.reason) + } + + // @spec CHA-RL4a4 + @Test + func contributorUpdate_withResumedTrue_withNoOperationInProgress_emitsDiscontinuityEvent() async throws { + // Given: A RoomLifecycleManager, with no room lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: false, contributors: [contributor]) + + // When: A contributor emits an UPDATE event with `resumed` flag set to false + let contributorStateChange = ARTChannelStateChange( + current: .attached, // arbitrary + previous: .attached, // arbitrary + event: .update, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager calls `emitDiscontinuity` on the contributor, with error equal to the contributor UPDATE event’s `reason` + let emitDiscontinuityArguments = await contributor.channel.emitDiscontinuityArguments + try #require(emitDiscontinuityArguments.count == 1) + + let discontinuity = emitDiscontinuityArguments[0] + #expect(discontinuity === contributorStateChange.reason) + } + + // @specPartial CHA-RL4b1 - I don’t know the meaning of "and the particular contributor has been attached previously" so haven’t implemented that part of the spec point (TODO: asked in https://github.com/ably/specification/pull/200/files#r1775552624) + @Test + func contributorAttachEvent_withResumeFalse_withOperationInProgress_recordsPendingDiscontinuityEvent() async throws { + // Given: A RoomLifecycleManager, with a room lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: true, contributors: [contributor]) + + // When: A contributor emits an ATTACHED event with `resumed` flag set to false + let contributorStateChange = ARTChannelStateChange( + current: .attached, + previous: .attaching, // arbitrary + event: .attached, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false + ) + // TODO: (threading) — read this from the event or the contributor? + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributor.channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: The manager records a pending discontinuity event for this contributor, and this discontinuity event has error equal to the contributor ATTACHED event’s `reason` + let pendingDiscontinuityEvents = await manager.testsOnly_pendingDiscontinuityEventsForContributor(at: 0) + try #require(pendingDiscontinuityEvents.count == 1) + + let pendingDiscontinuityEvent = pendingDiscontinuityEvents[0] + #expect(pendingDiscontinuityEvent.error === contributorStateChange.reason) + } + + // @specPartial CHA-RL4b5 - Haven’t implemented the part that refers to "transient disconnect timeouts"; TODO do this (https://github.com/ably-labs/ably-chat-swift/issues/48) + @Test + func contributorFailedEvent_withNoOperationInProgress() async throws { + // Given: A RoomLifecycleManager, with no room lifecycle operation in progress + let contributors = [ + // TODO: I think success is fine, the spec doesn't say what to do in response anyway + createContributor(detachBehavior: .success), + createContributor(detachBehavior: .success), + ] + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: false, contributors: contributors) + + let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded) + async let failedStatusChange = roomStatusSubscription.first { $0.current == .failed } + + // When: A contributor emits an FAILED event + let contributorStateChange = ARTChannelStateChange( + current: .failed, + previous: .attached, // arbitrary + event: .failed, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false // arbitrary + ) + + // TODO: explain + let handledContributorStateChangesSubscription = await manager.testsOnly_subscribeToHandledContributorStateChanges() + async let handledContributorStateChangeSignal = handledContributorStateChangesSubscription.first { $0 === contributorStateChange } + + await contributors[0].channel.emitStateChange(contributorStateChange) + + // TODO: explain + _ = try #require(await handledContributorStateChangeSignal) + + // Then: + // - the room status transitions to failed, with the error of the status change being the `reason` of the contributor FAILED event + // - and it calls `detach` on all contributors + _ = try #require(await failedStatusChange) + #expect(await manager.current == .failed) + + for contributor in contributors { + #expect(await contributor.channel.detachCallCount == 1) + } + } + + // @spec CHA-RL4b8 + // TODO: should I a test for the case where not all attached? but again you have the business of "how do you know it hasn't" + @Test + func contributorAttachedEvent_withNoOperationInProgress_roomNotAttached_allContributorsAttached() async throws { + // Given: A RoomLifecycleManager, not in the ATTACHED state, all of whose contributors are in the ATTACHED state (to satisfy the condition of CHA-RL4b8; for the purposes of this test I don’t care that they’re in this state even _before_ the state change of the When) + let contributors = [ + createContributor(initialState: .attached), + createContributor(initialState: .attached), + ] + + let manager = await createManager( + forTestingWhatHappensWhenCurrentlyIn: .initialized, // arbitrary non-ATTACHED + forTestingWhatHappensWhenHasOperationInProgress: false, + contributors: contributors + ) + + let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded) + async let maybeAttachedRoomStatusChange = roomStatusSubscription.first { $0.current == .attached } + + // When: A contributor emits a state change to ATTACHED + let contributorStateChange = ARTChannelStateChange( + current: .attached, + previous: .attaching, // arbitrary + event: .attached, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false // arbitrary + ) + + await contributors[0].channel.emitStateChange(contributorStateChange) + + // Then: The room status transitions to ATTACHED + _ = try #require(await maybeAttachedRoomStatusChange) + #expect(await manager.current == .attached) + } + + // @specPartial CHA-RL4b9 - Haven’t implemented the part that refers to "transient disconnect timeouts"; TODO do this (https://github.com/ably-labs/ably-chat-swift/issues/48). Nor have I implemented "the room enters the RETRY loop"; TODO do this (https://github.com/ably-labs/ably-chat-swift/issues/51) + @Test + func contributorSuspendedEvent_withNoOperationInProgress() async throws { + // Given: A RoomLifecycleManager with no lifecycle operation in progress + let contributor = createContributor() + let manager = await createManager(forTestingWhatHappensWhenHasOperationInProgress: false, contributors: [contributor]) + + let roomStatusSubscription = await manager.onChange(bufferingPolicy: .unbounded) + async let maybeSuspendedRoomStatusChange = roomStatusSubscription.first { $0.current == .suspended } + + // When: A contributor emits a state change to SUSPENDED + let contributorStateChange = ARTChannelStateChange( + current: .suspended, + previous: .attached, // arbitrary + event: .suspended, + reason: ARTErrorInfo(domain: "SomeDomain", code: 123), // arbitrary + resumed: false // arbitrary + ) + + await contributor.channel.emitStateChange(contributorStateChange) + + // Then: The room transitions to SUSPENDED, and this state change has error equal to the contributor state change’s `reason` + let suspendedRoomStatusChange = try #require(await maybeSuspendedRoomStatusChange) + #expect(suspendedRoomStatusChange.error === contributorStateChange.reason) + + #expect(await manager.current == .suspended) + #expect(await manager.error === contributorStateChange.reason) + } }