Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-5122] Implement the "asynchronously" part of CHA-RL1h5 #174

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 76 additions & 25 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// `retryOperationTask` is exposed so that tests can wait for the triggered RETRY operation to complete.
case suspendedAwaitingStartOfRetryOperation(retryOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case suspended(retryOperationID: UUID, error: ARTErrorInfo)
// `rundownOperationTask` is exposed so that tests can wait for the triggered RUNDOWN operation to complete.
case failedAwaitingStartOfRundownOperation(rundownOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case failedAndPerformingRundownOperation(rundownOperationID: UUID, error: ARTErrorInfo)
case failed(error: ARTErrorInfo)
case releasing(releaseOperationID: UUID)
case released
Expand All @@ -216,6 +219,10 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
.suspended(error: error)
case let .suspended(_, error):
.suspended(error: error)
case let .failedAwaitingStartOfRundownOperation(_, error):
.failed(error: error)
case let .failedAndPerformingRundownOperation(_, error):
.failed(error: error)
maratal marked this conversation as resolved.
Show resolved Hide resolved
case let .failed(error):
.failed(error: error)
case .releasing:
Expand All @@ -239,9 +246,12 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
releaseOperationID
case let .suspended(retryOperationID, _):
retryOperationID
case let .failedAndPerformingRundownOperation(rundownOperationID, _):
rundownOperationID
case .initialized,
.attached,
.detached,
.failedAwaitingStartOfRundownOperation,
.failed,
.released,
.attachingDueToContributorStateChange,
Expand Down Expand Up @@ -355,7 +365,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
let previous = status
status = new

// Avoid a double-emit of room status when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
// Avoid a double-emit of room status when changing between `Status` values that map to the same `RoomStatus`; e.g. when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
if new.toRoomStatus != previous.toRoomStatus {
let statusChange = RoomStatusChange(current: status.toRoomStatus, previous: previous.toRoomStatus)
emitRoomStatusChange(statusChange)
Expand Down Expand Up @@ -741,6 +751,8 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
private enum OperationKind {
/// The RETRY operation.
case retry(triggeringContributor: Contributor, errorForSuspendedStatus: ARTErrorInfo)
/// The RUNDOWN operation.
case rundown(errorForFailedStatus: ARTErrorInfo)
}

/// Requests that a room lifecycle operation be performed asynchronously.
Expand All @@ -754,6 +766,10 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
triggeredByContributor: triggeringContributor,
errorForSuspendedStatus: errorForSuspendedStatus
)
case let .rundown(errorForFailedStatus):
await performRundownOperation(
errorForFailedStatus: errorForFailedStatus
)
}
}
}
Expand Down Expand Up @@ -789,7 +805,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .released:
// CHA-RL1c
throw ARTErrorInfo(chatError: .roomIsReleased)
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detached, .detachedDueToRetryOperation, .detaching, .failed:
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detached, .detachedDueToRetryOperation, .detaching, .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
break
}

Expand Down Expand Up @@ -832,14 +848,18 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
changeStatus(to: .suspendedAwaitingStartOfRetryOperation(retryOperationTask: retryOperationTask, error: error))
throw error
case .failed:
// CHA-RL1h4
let error = ARTErrorInfo(chatError: .attachmentFailed(feature: contributor.feature, underlyingError: contributorAttachError))
changeStatus(to: .failed(error: error))

// CHA-RL1h5
// TODO: Implement the "asynchronously with respect to CHA-RL1h4" part of CHA-RL1h5 (https://github.com/ably-labs/ably-chat-swift/issues/50)
await detachNonFailedContributors()
// My understanding is that, since this task is being created inside an actor’s synchronous code, the two .failed* statuses will always come in the right order; i.e. first .failedAwaitingStartOfRundownOperation and then .failedAndPerformingRundownOperation.
let rundownOperationTask = scheduleAnOperation(
kind: .rundown(
errorForFailedStatus: error
)
)

// CHA-RL1h4
changeStatus(to: .failedAwaitingStartOfRundownOperation(rundownOperationTask: rundownOperationTask, error: error))
throw error
default:
// TODO: The spec assumes the channel will be in one of the above states, but working in a multi-threaded environment means it might not be (https://github.com/ably-labs/ably-chat-swift/issues/49)
Expand Down Expand Up @@ -873,23 +893,6 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
contributorAnnotations.clearPendingDiscontinuityEvents()
}

/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
private func detachNonFailedContributors() async {
for contributor in contributors where await (contributor.channel.state) != .failed {
// CHA-RL1h6: Retry until detach succeeds
while true {
do {
logger.log(message: "Detaching non-failed contributor \(contributor)", level: .info)
try await contributor.channel.detach()
break
} catch {
logger.log(message: "Failed to detach non-failed contributor \(contributor), error \(error). Retrying.", level: .info)
// Loop repeats
}
}
}
}

// MARK: - DETACH operation

internal func performDetachOperation() async throws {
Expand Down Expand Up @@ -921,7 +924,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .released:
// CHA-RL2c
throw ARTErrorInfo(chatError: .roomIsReleased)
case .failed:
case .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
// CHA-RL2d
throw ARTErrorInfo(chatError: .roomInFailedState)
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .attached, .detaching:
Expand Down Expand Up @@ -1056,7 +1059,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// See note on waitForCompletionOfOperationWithID for the current need for this force try
// swiftlint:disable:next force_try
return try! await waitForCompletionOfOperationWithID(releaseOperationID, requester: .anotherOperation(operationID: operationID))
case .attached, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detaching, .suspendedAwaitingStartOfRetryOperation, .suspended, .failed:
case .attached, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detaching, .suspendedAwaitingStartOfRetryOperation, .suspended, .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
break
}

Expand Down Expand Up @@ -1197,6 +1200,54 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
}
}

// MARK: - RUNDOWN operation

/// Implements the RUNDOWN operation.
///
/// This operation is not currently in the specification, but it comes from my suggestion in https://github.com/ably/specification/issues/253 for how to handle the fact that the spec, as currently written, does not guarantee that the CHA-RL1h5 detach behaviour is performed atomically with respect to room lifecycle operations. TODO bring in line with spec once spec updated.
///
/// - Parameters:
/// - forcedOperationID: Allows tests to force the operation to have a given ID. In combination with the ``testsOnly_subscribeToOperationWaitEvents`` API, this allows tests to verify that one test-initiated operation is waiting for another test-initiated operation.
internal func performRundownOperation(testsOnly_forcingOperationID forcedOperationID: UUID? = nil, errorForFailedStatus: ARTErrorInfo) async {
// See note on performAnOperation for the current need for this force try
// swiftlint:disable:next force_try
try! await performAnOperation(forcingOperationID: forcedOperationID) { operationID in
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
await bodyOfRundownOperation(
operationID: operationID,
errorForFailedStatus: errorForFailedStatus
)
}
}

private func bodyOfRundownOperation(
operationID: UUID,
errorForFailedStatus: ARTErrorInfo
) async {
changeStatus(to: .failedAndPerformingRundownOperation(rundownOperationID: operationID, error: errorForFailedStatus))

// CHA-RL1h5
await detachNonFailedContributors()

changeStatus(to: .failed(error: errorForFailedStatus))
}

/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
private func detachNonFailedContributors() async {
for contributor in contributors where await (contributor.channel.state) != .failed {
// CHA-RL1h6: Retry until detach succeeds
while true {
do {
logger.log(message: "Detaching non-failed contributor \(contributor)", level: .info)
try await contributor.channel.detach()
break
} catch {
logger.log(message: "Failed to detach non-failed contributor \(contributor), error \(error). Retrying.", level: .info)
// Loop repeats
}
}
}
}

// MARK: - Waiting to be able to perform presence operations

internal func waitToBeAbleToPerformPresenceOperations(requestedByFeature requester: RoomFeature) async throws(ARTErrorInfo) {
Expand Down
Loading
Loading