Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into 1-add-documentation-c…
Browse files Browse the repository at this point in the history
…omments

# Conflicts:
#	Sources/AblyChat/Messages.swift
#	Sources/AblyChat/Presence.swift
  • Loading branch information
maratal committed Dec 9, 2024
2 parents 9935fb0 + 76984aa commit b4a89f4
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 79 deletions.
8 changes: 3 additions & 5 deletions Sources/AblyChat/Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public struct SendMessageParams: Sendable {
* Options for querying messages in a chat room.
*/
public struct QueryOptions: Sendable {
public enum ResultOrder: Sendable {
public enum OrderBy: Sendable {
case oldestFirst
case newestFirst
}
Expand Down Expand Up @@ -143,15 +143,13 @@ public struct QueryOptions: Sendable {
* The direction to query messages in.
* If ``ResultOrder/oldestFirst``, the response will include messages from the start of the time window to the end.
* If ``ResultOrder/newestFirst``, the response will include messages from the end of the time window to the start.
*
* Defaults to `oldestFirst`.
*/
public var orderBy: ResultOrder?
public var orderBy: OrderBy?

// (CHA-M5g) The subscribers subscription point must be additionally specified (internally, by us) in the fromSerial query parameter.
internal var fromSerial: String?

public init(start: Date? = nil, end: Date? = nil, limit: Int? = nil, orderBy: QueryOptions.ResultOrder? = nil) {
public init(start: Date? = nil, end: Date? = nil, limit: Int? = nil, orderBy: QueryOptions.OrderBy? = nil) {
self.start = start
self.end = end
self.limit = limit
Expand Down
39 changes: 34 additions & 5 deletions Sources/AblyChat/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,6 @@ public protocol Presence: AnyObject, Sendable, EmitsDiscontinuities {
*/
func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent>

/// Same as calling ``subscribe(event:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent>

/**
* Subscribes a given listener to different presense events in the chat room.
*
Expand All @@ -171,12 +166,46 @@ public protocol Presence: AnyObject, Sendable, EmitsDiscontinuities {
*/
func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent>

/// Same as calling ``enter(data:)`` with `nil`.
///
/// The `Presence` protocol provides a default implementation of this method.
func enter() async throws

/// Same as calling ``update(data:)`` with `nil`.
///
/// The `Presence` protocol provides a default implementation of this method.
func update() async throws

/// Same as calling ``leave(data:)`` with `nil`.
///
/// The `Presence` protocol provides a default implementation of this method.
func leave() async throws

/// Same as calling ``subscribe(event:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent>

/// Same as calling ``subscribe(events:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(events: [PresenceEventType]) async -> Subscription<PresenceEvent>
}

public extension Presence {
func enter() async throws {
try await enter(data: nil)
}

func update() async throws {
try await update(data: nil)
}

func leave() async throws {
try await leave(data: nil)
}
}

public extension Presence {
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent> {
await subscribe(event: event, bufferingPolicy: .unbounded)
Expand Down
101 changes: 76 additions & 25 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// `retryOperationTask` is exposed so that tests can wait for the triggered RETRY operation to complete.
case suspendedAwaitingStartOfRetryOperation(retryOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case suspended(retryOperationID: UUID, error: ARTErrorInfo)
// `rundownOperationTask` is exposed so that tests can wait for the triggered RUNDOWN operation to complete.
case failedAwaitingStartOfRundownOperation(rundownOperationTask: Task<Void, Never>, error: ARTErrorInfo)
case failedAndPerformingRundownOperation(rundownOperationID: UUID, error: ARTErrorInfo)
case failed(error: ARTErrorInfo)
case releasing(releaseOperationID: UUID)
case released
Expand All @@ -216,6 +219,10 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
.suspended(error: error)
case let .suspended(_, error):
.suspended(error: error)
case let .failedAwaitingStartOfRundownOperation(_, error):
.failed(error: error)
case let .failedAndPerformingRundownOperation(_, error):
.failed(error: error)
case let .failed(error):
.failed(error: error)
case .releasing:
Expand All @@ -239,9 +246,12 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
releaseOperationID
case let .suspended(retryOperationID, _):
retryOperationID
case let .failedAndPerformingRundownOperation(rundownOperationID, _):
rundownOperationID
case .initialized,
.attached,
.detached,
.failedAwaitingStartOfRundownOperation,
.failed,
.released,
.attachingDueToContributorStateChange,
Expand Down Expand Up @@ -355,7 +365,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
let previous = status
status = new

// Avoid a double-emit of room status when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
// Avoid a double-emit of room status when changing between `Status` values that map to the same `RoomStatus`; e.g. when changing from `.suspendedAwaitingStartOfRetryOperation` to `.suspended`.
if new.toRoomStatus != previous.toRoomStatus {
let statusChange = RoomStatusChange(current: status.toRoomStatus, previous: previous.toRoomStatus)
emitRoomStatusChange(statusChange)
Expand Down Expand Up @@ -741,6 +751,8 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
private enum OperationKind {
/// The RETRY operation.
case retry(triggeringContributor: Contributor, errorForSuspendedStatus: ARTErrorInfo)
/// The RUNDOWN operation.
case rundown(errorForFailedStatus: ARTErrorInfo)
}

/// Requests that a room lifecycle operation be performed asynchronously.
Expand All @@ -754,6 +766,10 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
triggeredByContributor: triggeringContributor,
errorForSuspendedStatus: errorForSuspendedStatus
)
case let .rundown(errorForFailedStatus):
await performRundownOperation(
errorForFailedStatus: errorForFailedStatus
)
}
}
}
Expand Down Expand Up @@ -789,7 +805,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .released:
// CHA-RL1c
throw ARTErrorInfo(chatError: .roomIsReleased)
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detached, .detachedDueToRetryOperation, .detaching, .failed:
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detached, .detachedDueToRetryOperation, .detaching, .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
break
}

Expand Down Expand Up @@ -832,14 +848,18 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
changeStatus(to: .suspendedAwaitingStartOfRetryOperation(retryOperationTask: retryOperationTask, error: error))
throw error
case .failed:
// CHA-RL1h4
let error = ARTErrorInfo(chatError: .attachmentFailed(feature: contributor.feature, underlyingError: contributorAttachError))
changeStatus(to: .failed(error: error))

// CHA-RL1h5
// TODO: Implement the "asynchronously with respect to CHA-RL1h4" part of CHA-RL1h5 (https://github.com/ably-labs/ably-chat-swift/issues/50)
await detachNonFailedContributors()
// My understanding is that, since this task is being created inside an actor’s synchronous code, the two .failed* statuses will always come in the right order; i.e. first .failedAwaitingStartOfRundownOperation and then .failedAndPerformingRundownOperation.
let rundownOperationTask = scheduleAnOperation(
kind: .rundown(
errorForFailedStatus: error
)
)

// CHA-RL1h4
changeStatus(to: .failedAwaitingStartOfRundownOperation(rundownOperationTask: rundownOperationTask, error: error))
throw error
default:
// TODO: The spec assumes the channel will be in one of the above states, but working in a multi-threaded environment means it might not be (https://github.com/ably-labs/ably-chat-swift/issues/49)
Expand Down Expand Up @@ -873,23 +893,6 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
contributorAnnotations.clearPendingDiscontinuityEvents()
}

/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
private func detachNonFailedContributors() async {
for contributor in contributors where await (contributor.channel.state) != .failed {
// CHA-RL1h6: Retry until detach succeeds
while true {
do {
logger.log(message: "Detaching non-failed contributor \(contributor)", level: .info)
try await contributor.channel.detach()
break
} catch {
logger.log(message: "Failed to detach non-failed contributor \(contributor), error \(error). Retrying.", level: .info)
// Loop repeats
}
}
}
}

// MARK: - DETACH operation

internal func performDetachOperation() async throws {
Expand Down Expand Up @@ -921,7 +924,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
case .released:
// CHA-RL2c
throw ARTErrorInfo(chatError: .roomIsReleased)
case .failed:
case .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
// CHA-RL2d
throw ARTErrorInfo(chatError: .roomInFailedState)
case .initialized, .suspendedAwaitingStartOfRetryOperation, .suspended, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .attached, .detaching:
Expand Down Expand Up @@ -1056,7 +1059,7 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
// See note on waitForCompletionOfOperationWithID for the current need for this force try
// swiftlint:disable:next force_try
return try! await waitForCompletionOfOperationWithID(releaseOperationID, requester: .anotherOperation(operationID: operationID))
case .attached, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detaching, .suspendedAwaitingStartOfRetryOperation, .suspended, .failed:
case .attached, .attachingDueToAttachOperation, .attachingDueToRetryOperation, .attachingDueToContributorStateChange, .detaching, .suspendedAwaitingStartOfRetryOperation, .suspended, .failed, .failedAwaitingStartOfRundownOperation, .failedAndPerformingRundownOperation:
break
}

Expand Down Expand Up @@ -1197,6 +1200,54 @@ internal actor DefaultRoomLifecycleManager<Contributor: RoomLifecycleContributor
}
}

// MARK: - RUNDOWN operation

/// Implements the RUNDOWN operation.
///
/// This operation is not currently in the specification, but it comes from my suggestion in https://github.com/ably/specification/issues/253 for how to handle the fact that the spec, as currently written, does not guarantee that the CHA-RL1h5 detach behaviour is performed atomically with respect to room lifecycle operations. TODO bring in line with spec once spec updated.
///
/// - Parameters:
/// - forcedOperationID: Allows tests to force the operation to have a given ID. In combination with the ``testsOnly_subscribeToOperationWaitEvents`` API, this allows tests to verify that one test-initiated operation is waiting for another test-initiated operation.
internal func performRundownOperation(testsOnly_forcingOperationID forcedOperationID: UUID? = nil, errorForFailedStatus: ARTErrorInfo) async {
// See note on performAnOperation for the current need for this force try
// swiftlint:disable:next force_try
try! await performAnOperation(forcingOperationID: forcedOperationID) { operationID in
await bodyOfRundownOperation(
operationID: operationID,
errorForFailedStatus: errorForFailedStatus
)
}
}

private func bodyOfRundownOperation(
operationID: UUID,
errorForFailedStatus: ARTErrorInfo
) async {
changeStatus(to: .failedAndPerformingRundownOperation(rundownOperationID: operationID, error: errorForFailedStatus))

// CHA-RL1h5
await detachNonFailedContributors()

changeStatus(to: .failed(error: errorForFailedStatus))
}

/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
private func detachNonFailedContributors() async {
for contributor in contributors where await (contributor.channel.state) != .failed {
// CHA-RL1h6: Retry until detach succeeds
while true {
do {
logger.log(message: "Detaching non-failed contributor \(contributor)", level: .info)
try await contributor.channel.detach()
break
} catch {
logger.log(message: "Failed to detach non-failed contributor \(contributor), error \(error). Retrying.", level: .info)
// Loop repeats
}
}
}
}

// MARK: - Waiting to be able to perform presence operations

internal func waitToBeAbleToPerformPresenceOperations(requestedByFeature requester: RoomFeature) async throws(ARTErrorInfo) {
Expand Down
Loading

0 comments on commit b4a89f4

Please sign in to comment.