Skip to content

Commit

Permalink
Implement “asynchronously” part of CHA-RL1h5
Browse files Browse the repository at this point in the history
This implementation reflects my suggested spec changes [1], which aim to
preserve lifecycle operation atomicity by introducing a new RUNDOWN
operation:

> The `ATTACH` operation ends in CHA-RL1h4, implying that the CHA-RL1h5
> asynchronous detach happens _outside of any room lifecycle operation_.
> This means that another room lifecycle operation could run at the same
> time as this detach operation, which doesn't seem intentional.
>
> I looked at the JS implementation [2] and it seems that it keeps the
> lifecycle manager’s mutex locked during this "rundown" (as it calls the
> CHA-RL1h5 detach operation). But this is not implied in the spec. I
> think that to translate this behaviour to the spec, which implements
> mutual exclusion through lifecycle operations, we should do something
> analogous to the `RETRY` operation; that is, define a new internal-only
> room lifecycle operation (I’ll call it `RUNDOWN` for want of a better
> term), which is scheduled by CHA-RL1h5 and which:
>
> - performs the detach-all-non-`FAILED`-contributors behaviour of CHA-RL1h5
> - implements the retry behaviour of CHA-RL1h6

Resolves #119.

[1] ably/specification#253
[2] https://github.com/ably/ably-chat-js/blob/e8380583424a83f7151405cc0716e01302295eb6/src/core/room-lifecycle-manager.ts#L506-L509
  • Loading branch information
lawrence-forooghian committed Dec 4, 2024
1 parent db9d4b5 commit 77d0791
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 69 deletions.
101 changes: 76 additions & 25 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// `retryOperationTask` is exposed so that tests can wait for the triggered RETRY operation to complete.
case suspendedAwaitingStartOfRetryOperation(retryOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case suspended(retryOperationID: UUID, error: ARTErrorInfo)
// `rundownOperationTask` is exposed so that tests can wait for the triggered RUNDOWN operation to complete.
case failedAwaitingStartOfRundownOperation(rundownOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case failedAndPerformingRundownOperation(rundownOperationID: UUID, error: ARTErrorInfo)
case failed(error: ARTErrorInfo)
case releasing(releaseOperationID: UUID)
case released
Expand All @@ -216,6 +219,10 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
.suspended(error: error)
case let .suspended(_, error):
.suspended(error: error)
case let .failedAwaitingStartOfRundownOperation(_, error):
.failed(error: error)
case let .failedAndPerformingRundownOperation(_, error):
.failed(error: error)
case let .failed(error):
.failed(error: error)
case .releasing:
Expand All @@ -239,9 +246,12 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
releaseOperationID
case let .suspended(retryOperationID, _):
retryOperationID
case let .failedAndPerformingRundownOperation(rundownOperationID, _):
rundownOperationID
case .initialized,
.attached,
.detached,
.failedAwaitingStartOfRundownOperation,
.failed,
.released,
.attachingDueToContributorStateChange,
Expand Down Expand Up @@ -355,7 +365,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
let previous = status
status = new

// Avoid a double-emit of room status when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
// Avoid a double-emit of room status when changing between `Status` values that map to the same `RoomStatus`; e.g. when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
if new.toRoomStatus != previous.toRoomStatus {
let statusChange = RoomStatusChange(current: status.toRoomStatus, previous: previous.toRoomStatus)
emitRoomStatusChange(statusChange)
Expand Down Expand Up @@ -741,6 +751,8 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
/// The kinds of room lifecycle operation that can be requested through `scheduleAnOperation(kind:)`.
///
/// Each case carries the arguments that are handed to the corresponding operation when it is performed.
private enum OperationKind {
/// The RETRY operation.
///
/// `triggeringContributor` and `errorForSuspendedStatus` are passed on as the operation's `triggeredByContributor` and `errorForSuspendedStatus` arguments.
case retry(triggeringContributor: Contributor, errorForSuspendedStatus: ARTErrorInfo)
/// The RUNDOWN operation.
///
/// `errorForFailedStatus` is passed on to `performRundownOperation(errorForFailedStatus:)`.
case rundown(errorForFailedStatus: ARTErrorInfo)
}

/// Requests that a room lifecycle operation be performed asynchronously.
Expand All @@ -754,6 +766,10 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
triggeredByContributor: triggeringContributor,
errorForSuspendedStatus: errorForSuspendedStatus
)
case let .rundown(errorForFailedStatus):
await performRundownOperation(
errorForFailedStatus: errorForFailedStatus
)
}
}
}
Expand Down Expand Up @@ -789,7 +805,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .released:
// CHA-RL1c
throw ARTErrorInfo(chatError: .roomIsReleased)
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detached, .detachedDueToRetryOperation, .detaching, .failed:
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detached, .detachedDueToRetryOperation, .detaching, .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
break
}

Expand Down Expand Up @@ -832,14 +848,18 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
changeStatus(to: .suspendedAwaitingStartOfRetryOperation(retryOperationTask: retryOperationTask, error: error))
throw error
case .failed:
// CHA-RL1h4
let error = ARTErrorInfo(chatError: .attachmentFailed(feature: contributor.feature, underlyingError: contributorAttachError))
changeStatus(to: .failed(error: error))

// CHA-RL1h5
// TODO: Implement the "asynchronously with respect to CHA-RL1h4" part of CHA-RL1h5 (https://github.com/ably-labs/ably-chat-swift/issues/50)
await detachNonFailedContributors()
// My understanding is that, since this task is being created inside an actor’s synchronous code, the two .failed* statuses will always come in the right order; i.e. first .failedAwaitingStartOfRundownOperation and then .failedAndPerformingRundownOperation.
let rundownOperationTask = scheduleAnOperation(
kind: .rundown(
errorForFailedStatus: error
)
)

// CHA-RL1h4
changeStatus(to: .failedAwaitingStartOfRundownOperation(rundownOperationTask: rundownOperationTask, error: error))
throw error
default:
// TODO: The spec assumes the channel will be in one of the above states, but working in a multi-threaded environment means it might not be (https://github.com/ably-labs/ably-chat-swift/issues/49)
Expand Down Expand Up @@ -873,23 +893,6 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
contributorAnnotations.clearPendingDiscontinuityEvents()
}

/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
///
/// Contributors are processed one at a time, in the order of `contributors`. A contributor whose channel state is `.failed` at the moment it is checked is skipped. For every other contributor, `detach()` is called repeatedly until a call succeeds (CHA-RL1h6), so this method does not return until each of those detaches has succeeded.
///
/// NOTE(review): a failed detach is retried immediately, with no pause between attempts.
private func detachNonFailedContributors() async {
for contributor in contributors where await (contributor.channel.state) != .failed {
// CHA-RL1h6: Retry until detach succeeds
while true {
do {
logger.log(message: "Detaching non-failed contributor \(contributor)", level: .info)
try await contributor.channel.detach()
// Detach succeeded; move on to the next contributor.
break
} catch {
logger.log(message: "Failed to detach non-failed contributor \(contributor), error \(error). Retrying.", level: .info)
// Loop repeats
}
}
}
}

// MARK: - DETACH operation

internal func performDetachOperation() async throws {
Expand Down Expand Up @@ -921,7 +924,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .released:
// CHA-RL2c
throw ARTErrorInfo(chatError: .roomIsReleased)
case .failed:
case .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
// CHA-RL2d
throw ARTErrorInfo(chatError: .roomInFailedState)
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .attached, .detaching:
Expand Down Expand Up @@ -1056,7 +1059,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// See note on waitForCompletionOfOperationWithID for the current need for this force try
// swiftlint:disable:next force_try
return try! await waitForCompletionOfOperationWithID(releaseOperationID, requester: .anotherOperation(operationID: operationID))
case .attached, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detaching, .suspendedAwaitingStartOfRetryOperation, .suspended, .failed:
case .attached, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detaching, .suspendedAwaitingStartOfRetryOperation, .suspended, .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
break
}

Expand Down Expand Up @@ -1197,6 +1200,54 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
}
}

// MARK: - RUNDOWN operation

/// Performs the RUNDOWN operation.
///
/// RUNDOWN is not (yet) part of the specification. It originates from the proposal in https://github.com/ably/specification/issues/253, which addresses the fact that the spec, as currently written, does not guarantee that the CHA-RL1h5 detach behaviour happens atomically with respect to the other room lifecycle operations. TODO: bring this in line with the spec once the spec has been updated.
///
/// The operation's work is done by `bodyOfRundownOperation(operationID:errorForFailedStatus:)`, which is run via `performAnOperation`.
///
/// - Parameters:
///   - forcedOperationID: Allows tests to force the operation to have a given ID. In combination with the ``testsOnly_subscribeToOperationWaitEvents`` API, this allows tests to verify that one test-initiated operation is waiting for another test-initiated operation.
///   - errorForFailedStatus: The error to attach to the failed statuses that the operation moves the manager through.
internal func performRundownOperation(testsOnly_forcingOperationID forcedOperationID: UUID? = nil, errorForFailedStatus: ARTErrorInfo) async {
    // The force try below is explained by the note on performAnOperation.
    // swiftlint:disable:next force_try
    try! await performAnOperation(forcingOperationID: forcedOperationID) { rundownOperationID in
        await bodyOfRundownOperation(operationID: rundownOperationID, errorForFailedStatus: errorForFailedStatus)
    }
}

/// The body of the RUNDOWN operation.
///
/// Moves the manager to `.failedAndPerformingRundownOperation`, detaches all contributors whose channel is not FAILED (CHA-RL1h5), and then moves the manager to `.failed`. Both of these statuses carry `errorForFailedStatus`.
///
/// - Parameters:
///   - operationID: The ID of this RUNDOWN operation; recorded in the `.failedAndPerformingRundownOperation` status.
///   - errorForFailedStatus: The error to associate with the statuses set by this operation.
private func bodyOfRundownOperation(
operationID: UUID,
errorForFailedStatus: ARTErrorInfo
) async {
// Record that the RUNDOWN operation is now in progress, together with its ID.
changeStatus(to: .failedAndPerformingRundownOperation(rundownOperationID: operationID, error: errorForFailedStatus))

// CHA-RL1h5
await detachNonFailedContributors()

// The detaches have all completed; settle in the plain `.failed` status.
changeStatus(to: .failed(error: errorForFailedStatus))
}

/// Detaches every contributor whose channel is not in the FAILED state, as required by CHA-RL1h5.
///
/// Contributors are handled one after another. A detach that throws is logged and attempted again until it succeeds (CHA-RL1h6), so this method only returns once each of these detaches has succeeded.
private func detachNonFailedContributors() async {
    for contributor in contributors {
        // Contributors whose channel is already FAILED are left untouched.
        guard await contributor.channel.state != .failed else {
            continue
        }

        // CHA-RL1h6: keep trying until this contributor's detach succeeds
        var hasDetached = false
        while !hasDetached {
            do {
                logger.log(message: "Detaching non-failed contributor \(contributor)", level: .info)
                try await contributor.channel.detach()
                hasDetached = true
            } catch {
                // Leave the flag unset so that the loop makes another attempt.
                logger.log(message: "Failed to detach non-failed contributor \(contributor), error \(error). Retrying.", level: .info)
            }
        }
    }
}

// MARK: - Waiting to be able to perform presence operations

internal func waitToBeAbleToPerformPresenceOperations(requestedByFeature requester: RoomFeature) async throws(ARTErrorInfo) {
Expand Down
Loading

0 comments on commit 77d0791

Please sign in to comment.