Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4988] Implement (some of) Room Lifecycle Monitoring spec #67

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ let package = Package(
name: "Ably",
package: "ably-cocoa"
),
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.testTarget(
Expand Down
259 changes: 246 additions & 13 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import Ably
@preconcurrency import Ably
umair-ably marked this conversation as resolved.
Show resolved Hide resolved
import AsyncAlgorithms

/// The interface that the lifecycle manager expects its contributing realtime channels to conform to.
///
/// We use this instead of the ``RealtimeChannel`` interface as its ``attach`` and ``detach`` methods are `async` instead of using callbacks. This makes it easier to write mocks for (since ``RealtimeChannel`` doesn’t express to the type system that the callbacks it receives need to be `Sendable`, it’s hard to, for example, create a mock that creates a `Task` and then calls the callback from inside this task).
/// We use this instead of the ``RealtimeChannelProtocol`` interface as:
///
/// - its ``attach`` and ``detach`` methods are `async` instead of using callbacks
/// - it uses `AsyncSequence` to emit state changes instead of using callbacks
///
/// This makes it easier to write mocks for (since ``RealtimeChannelProtocol`` doesn’t express to the type system that the callbacks it receives need to be `Sendable`, it’s hard to, for example, create a mock that creates a `Task` and then calls the callback from inside this task).
///
/// We choose to also mark the channel’s mutable state as `async`. This is a way of highlighting at the call site of accessing this state that, since `ARTRealtimeChannel` mutates this state on a separate thread, it’s possible for this state to have changed since the last time you checked it, or since the last time you performed an operation that might have mutated it, or since the last time you recieved an event informing you that it changed. To be clear, marking these as `async` doesn’t _solve_ these issues; it just makes them a bit more visible. We’ll decide how to address them in https://github.com/ably-labs/ably-chat-swift/issues/49.
internal protocol RoomLifecycleContributorChannel: Sendable {
Expand All @@ -11,31 +17,90 @@ internal protocol RoomLifecycleContributorChannel: Sendable {

var state: ARTRealtimeChannelState { get async }
var errorReason: ARTErrorInfo? { get async }

/// Equivalent to subscribing to a `RealtimeChannelProtocol` object’s state changes via its `on(_:)` method. The subscription should use the ``BufferingPolicy.unbounded`` buffering policy.
///
/// It is marked as `async` purely to make it easier to write mocks for this method (i.e. to use an actor as a mock).
func subscribeToState() async -> Subscription<ARTChannelStateChange>
maratal marked this conversation as resolved.
Show resolved Hide resolved
}

internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
/// A realtime channel that contributes to the room lifecycle.
internal struct Contributor {
/// The room feature that this contributor corresponds to. Used only for choosing which error to throw when a contributor operation fails.
internal var feature: RoomFeature
/// A realtime channel that contributes to the room lifecycle.
///
/// The identity implied by the `Identifiable` conformance must distinguish each of the contributors passed to a given ``RoomLifecycleManager`` instance.
internal protocol RoomLifecycleContributor: Identifiable, Sendable {
associatedtype Channel: RoomLifecycleContributorChannel

/// The room feature that this contributor corresponds to. Used only for choosing which error to throw when a contributor operation fails.
var feature: RoomFeature { get }
var channel: Channel { get }

/// Informs the contributor that there has been a break in channel continuity, which it should inform library users about.
///
/// It is marked as `async` purely to make it easier to write mocks for this method (i.e. to use an actor as a mock).
func emitDiscontinuity(_ error: ARTErrorInfo) async
}

internal var channel: Channel
internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
/// Stores manager state relating to a given contributor.
private struct ContributorAnnotation {
// TODO: Not clear whether there can be multiple or just one (asked in https://github.com/ably/specification/pull/200/files#r1781927850)
var pendingDiscontinuityEvents: [ARTErrorInfo] = []
maratal marked this conversation as resolved.
Show resolved Hide resolved
}

internal private(set) var current: RoomLifecycle
internal private(set) var error: ARTErrorInfo?
// TODO: This currently allows the the tests to inject a value in order to test the spec points that are predicated on whether “a channel lifecycle operation is in progress”. In https://github.com/ably-labs/ably-chat-swift/issues/52 we’ll set this property based on whether there actually is a lifecycle operation in progress.
private let hasOperationInProgress: Bool
/// Manager state that relates to individual contributors, keyed by contributors’ ``Contributor.id``. Stored separately from ``contributors`` so that the latter can be a `let`, to make it clear that the contributors remain fixed for the lifetime of the manager.
private var contributorAnnotations: ContributorAnnotations
maratal marked this conversation as resolved.
Show resolved Hide resolved

/// Provides a `Dictionary`-like interface for storing manager state about individual contributors.
private struct ContributorAnnotations {
private var storage: [Contributor.ID: ContributorAnnotation]

init(contributors: [Contributor], pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]) {
storage = contributors.reduce(into: [:]) { result, contributor in
result[contributor.id] = .init(pendingDiscontinuityEvents: pendingDiscontinuityEvents[contributor.id] ?? [])
}
}

/// It is a programmer error to call this subscript getter with a contributor that was not one of those passed to ``init(contributors:pendingDiscontinuityEvents)``.
subscript(_ contributor: Contributor) -> ContributorAnnotation {
get {
guard let annotation = storage[contributor.id] else {
preconditionFailure("Expected annotation for \(contributor)")
}
return annotation
}

set {
storage[contributor.id] = newValue
}
}

mutating func clearPendingDiscontinuityEvents() {
storage = storage.mapValues { annotation in
var newAnnotation = annotation
newAnnotation.pendingDiscontinuityEvents = []
return newAnnotation
}
}
}

private let logger: InternalLogger
private let clock: SimpleClock
private let contributors: [Contributor]
private var listenForStateChangesTask: Task<Void, Never>!

internal init(
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) {
self.init(
) async {
await self.init(
current: nil,
hasOperationInProgress: nil,
pendingDiscontinuityEvents: [:],
contributors: contributors,
logger: logger,
clock: clock
Expand All @@ -45,12 +110,16 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
#if DEBUG
internal init(
testsOnly_current current: RoomLifecycle? = nil,
testsOnly_hasOperationInProgress hasOperationInProgress: Bool? = nil,
testsOnly_pendingDiscontinuityEvents pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]? = nil,
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) {
self.init(
) async {
await self.init(
current: current,
hasOperationInProgress: hasOperationInProgress,
pendingDiscontinuityEvents: pendingDiscontinuityEvents,
contributors: contributors,
logger: logger,
clock: clock
Expand All @@ -60,16 +129,56 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {

private init(
current: RoomLifecycle?,
hasOperationInProgress: Bool?,
pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]?,
contributors: [Contributor],
logger: InternalLogger,
clock: SimpleClock
) {
) async {
self.current = current ?? .initialized
self.hasOperationInProgress = hasOperationInProgress ?? false
self.contributors = contributors
contributorAnnotations = .init(contributors: contributors, pendingDiscontinuityEvents: pendingDiscontinuityEvents ?? [:])
self.logger = logger
self.clock = clock

// The idea here is to make sure that, before the initializer completes, we are already listening for state changes, so that e.g. tests don’t miss a state change.
let subscriptions = await withTaskGroup(of: (contributor: Contributor, subscription: Subscription<ARTChannelStateChange>).self) { group in
for contributor in contributors {
group.addTask {
await (contributor: contributor, subscription: contributor.channel.subscribeToState())
}
}

maratal marked this conversation as resolved.
Show resolved Hide resolved
return await Array(group)
}

// CHA-RL4: listen for state changes from our contributors
// TODO: Understand what happens when this task gets cancelled by `deinit`; I’m not convinced that the for-await loops will exit (https://github.com/ably-labs/ably-chat-swift/issues/29)
listenForStateChangesTask = Task {
await withTaskGroup(of: Void.self) { group in
for (contributor, subscription) in subscriptions {
// This `@Sendable` is to make the compiler error "'self'-isolated value of type '() async -> Void' passed as a strongly transferred parameter; later accesses could race" go away. I don’t hugely understand what it means, but given the "'self'-isolated value" I guessed it was something vaguely to do with the fact that `async` actor initializers are actor-isolated and thought that marking it as `@Sendable` would sever this isolation and make the error go away, which it did 🤷. But there are almost certainly consequences that I am incapable of reasoning about with my current level of Swift concurrency knowledge.
group.addTask { @Sendable [weak self] in
for await stateChange in subscription {
await self?.didReceiveStateChange(stateChange, forContributor: contributor)
}
}
}
}
}
}

deinit {
listenForStateChangesTask.cancel()
}

#if DEBUG
internal func testsOnly_pendingDiscontinuityEvents(for contributor: Contributor) -> [ARTErrorInfo] {
contributorAnnotations[contributor].pendingDiscontinuityEvents
}
#endif

// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var subscriptions: [Subscription<RoomStatusChange>] = []

Expand All @@ -79,6 +188,113 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
return subscription
}

#if DEBUG
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
/// Supports the ``testsOnly_subscribeToHandledContributorStateChanges()`` method.
private var stateChangeHandledSubscriptions: [Subscription<ARTChannelStateChange>] = []

/// Returns a subscription which emits the contributor state changes that have been handled by the manager.
///
/// A contributor state change is considered handled once the manager has performed all of the side effects that it will perform as a result of receiving this state change. Specifically, once:
///
/// - the manager has recorded all pending discontinuity events provoked by the state change (you can retrieve these using ``testsOnly_pendingDiscontinuityEventsForContributor(at:)``)
/// - the manager has performed all status changes provoked by the state change
/// - the manager has performed all contributor actions provoked by the state change, namely calls to ``RoomLifecycleContributorChannel.detach()`` or ``RoomLifecycleContributor.emitDiscontinuity(_:)``
internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription<ARTChannelStateChange> {
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
stateChangeHandledSubscriptions.append(subscription)
return subscription
}
#endif

/// Implements CHA-RL4b’s contributor state change handling.
private func didReceiveStateChange(_ stateChange: ARTChannelStateChange, forContributor contributor: Contributor) async {
logger.log(message: "Got state change \(stateChange) for contributor \(contributor)", level: .info)

// TODO: The spec, which is written for a single-threaded environment, is presumably operating on the assumption that the channel is currently in the state given by `stateChange.current` (https://github.com/ably-labs/ably-chat-swift/issues/49)
switch stateChange.event {
case .update:
// CHA-RL4a1 — if RESUMED then no-op
guard !stateChange.resumed else {
break
}

guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("State change event with resumed == false should have a reason")
}

if hasOperationInProgress {
// CHA-RL4a3
logger.log(message: "Recording pending discontinuity event for contributor \(contributor)", level: .info)

contributorAnnotations[contributor].pendingDiscontinuityEvents.append(reason)
} else {
// CHA-RL4a4
logger.log(message: "Emitting discontinuity event for contributor \(contributor)", level: .info)

await contributor.emitDiscontinuity(reason)
}
case .attached:
if hasOperationInProgress {
if !stateChange.resumed {
// CHA-RL4b1
logger.log(message: "Recording pending discontinuity event for contributor \(contributor)", level: .info)

guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("State change event with resumed == false should have a reason")
}
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved

contributorAnnotations[contributor].pendingDiscontinuityEvents.append(reason)
}
} else if current != .attached {
if await (contributors.async.map { await $0.channel.state }.allSatisfy { $0 == .attached }) {
// CHA-RL4b8
logger.log(message: "Now that all contributors are ATTACHED, transitioning room to ATTACHED", level: .info)
changeStatus(to: .attached)
}
}
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
case .failed:
if !hasOperationInProgress {
// CHA-RL4b5
guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("FAILED state change event should have a reason")
}
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved

changeStatus(to: .failed, error: reason)

// TODO: CHA-RL4b5 is a bit unclear about how to handle failure, and whether they can be detached concurrently (asked in https://github.com/ably/specification/pull/200/files#r1777471810)
for contributor in contributors {
do {
try await contributor.channel.detach()
} catch {
logger.log(message: "Failed to detach contributor \(contributor), error \(error)", level: .info)
}
}
}
case .suspended:
if !hasOperationInProgress {
// CHA-RL4b9
guard let reason = stateChange.reason else {
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
preconditionFailure("SUSPENDED state change event should have a reason")
}

lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
changeStatus(to: .suspended, error: reason)
}
default:
break
}

#if DEBUG
for subscription in stateChangeHandledSubscriptions {
subscription.emit(stateChange)
maratal marked this conversation as resolved.
Show resolved Hide resolved
}
#endif
}

/// Updates ``current`` and ``error`` and emits a status change event.
private func changeStatus(to new: RoomLifecycle, error: ARTErrorInfo? = nil) {
logger.log(message: "Transitioning from \(current) to \(new), error \(String(describing: error))", level: .info)
Expand Down Expand Up @@ -150,6 +366,23 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {

// CHA-RL1g1
changeStatus(to: .attached)
maratal marked this conversation as resolved.
Show resolved Hide resolved

// CHA-RL1g2
await emitPendingDiscontinuityEvents()
}

/// Implements CHA-RL1g2’s emitting of pending discontinuity events.
private func emitPendingDiscontinuityEvents() async {
// Emit all pending discontinuity events
logger.log(message: "Emitting pending discontinuity events", level: .info)
for contributor in contributors {
for pendingDiscontinuityEvent in contributorAnnotations[contributor].pendingDiscontinuityEvents {
logger.log(message: "Emitting pending discontinuity event \(pendingDiscontinuityEvent) to contributor \(contributor)", level: .info)
await contributor.emitDiscontinuity(pendingDiscontinuityEvent)
}
}

contributorAnnotations.clearPendingDiscontinuityEvents()
}

/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
Expand Down
18 changes: 18 additions & 0 deletions Tests/AblyChatTests/Mocks/MockRoomLifecycleContributor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import Ably
@testable import AblyChat

actor MockRoomLifecycleContributor: RoomLifecycleContributor {
nonisolated let feature: RoomFeature
nonisolated let channel: MockRoomLifecycleContributorChannel

private(set) var emitDiscontinuityArguments: [ARTErrorInfo] = []

init(feature: RoomFeature, channel: MockRoomLifecycleContributorChannel) {
self.feature = feature
self.channel = channel
}

func emitDiscontinuity(_ error: ARTErrorInfo) async {
emitDiscontinuityArguments.append(error)
}
}
Loading