From 370d4446025388d2c35df11bdb1ca772785c16c3 Mon Sep 17 00:00:00 2001 From: Umair Date: Thu, 21 Nov 2024 02:14:03 +0000 Subject: [PATCH] Runs room checks prior to performing presence operations via `featureChannel.waitToBeAbleToPerformPresenceOperations` which was implemented as part of [1]. It also became necessary to clean up the example app as this uncovered race conditions between concurrently running Tasks. --- Example/AblyChatExample/ContentView.swift | 104 +++++++++++++-------- Sources/AblyChat/DefaultPresence.swift | 52 +++++++++++ Tests/AblyChatTests/IntegrationTests.swift | 33 ++++--- 3 files changed, 135 insertions(+), 54 deletions(-) diff --git a/Example/AblyChatExample/ContentView.swift b/Example/AblyChatExample/ContentView.swift index 46a062ad..158e6209 100644 --- a/Example/AblyChatExample/ContentView.swift +++ b/Example/AblyChatExample/ContentView.swift @@ -134,12 +134,14 @@ struct ContentView: View { } } } - .tryTask { try await setDefaultTitle() } - .tryTask { try await attachRoom() } - .tryTask { try await showMessages() } - .tryTask { try await showReactions() } - .tryTask { try await showPresence() } - .tryTask { try await showOccupancy() } + .tryTask { + try await setDefaultTitle() + try await attachRoom() + try await showMessages() + try await showReactions() + try await showPresence() + try await showOccupancy() + } .tryTask { // NOTE: As we implement more features, move them out of the `if Environment.current == .mock` block and into the main block just above. if Environment.current == .mock { @@ -179,18 +181,25 @@ struct ContentView: View { } } - for await message in messagesSubscription { - withAnimation { - messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0) + // Continue listening for messages on a background task so this function can return + Task { + for await message in messagesSubscription { + withAnimation { + messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0) + } } } } func showReactions() async throws { let reactionSubscription = try await room().reactions.subscribe(bufferingPolicy: .unbounded) - for await reaction in reactionSubscription { - withAnimation { - showReaction(reaction.displayedText) + + // Continue listening for reactions on a background task so this function can return + Task { + for await reaction in reactionSubscription { + withAnimation { + showReaction(reaction.displayedText) + } } } } @@ -198,25 +207,31 @@ struct ContentView: View { func showPresence() async throws { try await room().presence.enter(data: .init(userCustomData: ["status": .string("📱 Online")])) - for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) { - withAnimation { - let status = event.data?.userCustomData?["status"]?.value as? String - let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)" - let presenceMessage = status != nil ? "\(clientPresenceChangeMessage) with status: \(status!)" : clientPresenceChangeMessage + // Continue listening for new presence events on a background task so this function can return + Task { + for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) { + withAnimation { + let status = event.data?.userCustomData?["status"]?.value as? String + let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)" + let presenceMessage = status != nil ? "\(clientPresenceChangeMessage) with status: \(status!)" : clientPresenceChangeMessage - messages.insert(BasicListItem(id: UUID().uuidString, title: "System", text: presenceMessage), at: 0) + messages.insert(BasicListItem(id: UUID().uuidString, title: "System", text: presenceMessage), at: 0) + } } } } func showTypings() async throws { - for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) { - withAnimation { - typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..." - Task { - try? await Task.sleep(nanoseconds: 1 * 1_000_000_000) - withAnimation { - typingInfo = "" + // Continue listening for typing events on a background task so this function can return + Task { + for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) { + withAnimation { + typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..." + Task { + try? await Task.sleep(nanoseconds: 1 * 1_000_000_000) + withAnimation { + typingInfo = "" + } } } } @@ -224,25 +239,36 @@ struct ContentView: View { } func showOccupancy() async throws { - for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) { - withAnimation { - occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))" + // Continue listening for occupancy events on a background task so this function can return + let currentOccupancy = try await room().occupancy.get() + withAnimation { + occupancyInfo = "Connections: \(currentOccupancy.presenceMembers) (\(currentOccupancy.connections))" + } + + Task { + for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) { + withAnimation { + occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))" + } } } } func showRoomStatus() async throws { - for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) { - withAnimation { - if status.current.isAttaching { - statusInfo = "\(status.current)...".capitalized - } else { - statusInfo = "\(status.current)".capitalized - if status.current == .attached { - Task { - try? await Task.sleep(nanoseconds: 1 * 1_000_000_000) - withAnimation { - statusInfo = "" + // Continue listening for status change events on a background task so this function can return + Task { + for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) { + withAnimation { + if status.current.isAttaching { + statusInfo = "\(status.current)...".capitalized + } else { + statusInfo = "\(status.current)".capitalized + if status.current == .attached { + Task { + try? await Task.sleep(nanoseconds: 1 * 1_000_000_000) + withAnimation { + statusInfo = "" + } } } } diff --git a/Sources/AblyChat/DefaultPresence.swift b/Sources/AblyChat/DefaultPresence.swift index 789c2a16..fbeca98d 100644 --- a/Sources/AblyChat/DefaultPresence.swift +++ b/Sources/AblyChat/DefaultPresence.swift @@ -21,6 +21,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { // (CHA-PR6) It must be possible to retrieve all the @Members of the presence set. The behaviour depends on the current room status, as presence operations in a Realtime Client cause implicit attaches. internal func get() async throws -> [PresenceMember] { logger.log(message: "Getting presence", level: .debug) + + // CHA-PR6b to CHA-PR6f + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error) + throw error + } + return try await withCheckedThrowingContinuation { continuation in channel.presence.get { [processPresenceGet] members, error in do { @@ -36,6 +45,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { internal func get(params: PresenceQuery) async throws -> [PresenceMember] { logger.log(message: "Getting presence with params: \(params)", level: .debug) + + // CHA-PR6b to CHA-PR6f + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error) + throw error + } + return try await withCheckedThrowingContinuation { continuation in channel.presence.get(params.asARTRealtimePresenceQuery()) { [processPresenceGet] members, error in do { @@ -52,6 +70,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { // (CHA-PR5) It must be possible to query if a given clientId is in the presence set. internal func isUserPresent(clientID: String) async throws -> Bool { logger.log(message: "Checking if user is present with clientID: \(clientID)", level: .debug) + + // CHA-PR6b to CHA-PR6f + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error) + throw error + } + return try await withCheckedThrowingContinuation { continuation in channel.presence.get(ARTRealtimePresenceQuery(clientId: clientID, connectionId: nil)) { [logger] members, error in guard let members else { @@ -68,6 +95,14 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { // (CHA-PR3a) Users may choose to enter presence, optionally providing custom data to enter with. The overall presence data must retain the format specified in CHA-PR2. internal func enter(data: PresenceData? = nil) async throws { logger.log(message: "Entering presence", level: .debug) + + // CHA-PR3c to CHA-PR3g + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence enter operation: \(error)", level: .error) + throw error + } return try await withCheckedThrowingContinuation { continuation in channel.presence.enterClient(clientID, data: data?.asQueryItems()) { [logger] error in if let error { @@ -83,6 +118,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { // (CHA-PR10a) Users may choose to update their presence data, optionally providing custom data to update with. The overall presence data must retain the format specified in CHA-PR2. internal func update(data: PresenceData? = nil) async throws { logger.log(message: "Updating presence", level: .debug) + + // CHA-PR10c to CHA-PR10g + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence update operation: \(error)", level: .error) + throw error + } + return try await withCheckedThrowingContinuation { continuation in channel.presence.update(data?.asQueryItems()) { [logger] error in if let error { @@ -98,6 +142,14 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { // (CHA-PR4a) Users may choose to leave presence, which results in them being removed from the Realtime presence set. internal func leave(data: PresenceData? = nil) async throws { logger.log(message: "Leaving presence", level: .debug) + + // CHA-PR6b to CHA-PR6f + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence leave operation: \(error)", level: .error) + throw error + } return try await withCheckedThrowingContinuation { continuation in channel.presence.leave(data?.asQueryItems()) { [logger] error in if let error { diff --git a/Tests/AblyChatTests/IntegrationTests.swift b/Tests/AblyChatTests/IntegrationTests.swift index 436d3791..3c83799f 100644 --- a/Tests/AblyChatTests/IntegrationTests.swift +++ b/Tests/AblyChatTests/IntegrationTests.swift @@ -109,17 +109,20 @@ struct IntegrationTests { // (13) Subscribe to occupancy let rxOccupancySubscription = await rxRoom.occupancy.subscribe(bufferingPolicy: .unbounded) - // (14) Enter presence on the other client and check that we receive the updated occupancy on the subscription + // (14) Attach the room so we can perform presence operations + try await txRoom.attach() + + // (15) Enter presence on the other client and check that we receive the updated occupancy on the subscription try await txRoom.presence.enter(data: nil) // It can take a moment for the occupancy to update from the clients entering presence above, so we’ll wait 2 seconds here. try await Task.sleep(nanoseconds: 2_000_000_000) - // (15) Check that we received an updated presence count when getting the occupancy + // (16) Check that we received an updated presence count when getting the occupancy let updatedCurrentOccupancy = try await rxRoom.occupancy.get() #expect(updatedCurrentOccupancy.presenceMembers == 1) // 1 for txClient entering presence - // (16) Check that we received an updated presence count on the subscription + // (17) Check that we received an updated presence count on the subscription let rxOccupancyEventFromSubscription = try #require(await rxOccupancySubscription.first { _ in true }) #expect(rxOccupancyEventFromSubscription.presenceMembers == 1) // 1 for txClient entering presence @@ -131,40 +134,40 @@ struct IntegrationTests { // MARK: - Presence - // (17) Subscribe to presence + // (18) Subscribe to presence let rxPresenceSubscription = await rxRoom.presence.subscribe(events: [.enter, .leave, .update]) - // (18) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription + // (19) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceEnterTxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceEnterTxEvent.action == .enter) #expect(rxPresenceEnterTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (19) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription + // (20) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceUpdateTxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceUpdateTxEvent.action == .update) #expect(rxPresenceUpdateTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (20) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription + // (21) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceLeaveTxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceLeaveTxEvent.action == .leave) #expect(rxPresenceLeaveTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (21) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription + // (22) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceEnterRxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceEnterRxEvent.action == .enter) #expect(rxPresenceEnterRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (22) Send `.update` presence event with custom data on our client and check that we receive it on the subscription + // (23) Send `.update` presence event with custom data on our client and check that we receive it on the subscription try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceUpdateRxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceUpdateRxEvent.action == .update) #expect(rxPresenceUpdateRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (23) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription + // (24) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceLeaveRxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceLeaveRxEvent.action == .leave) @@ -172,23 +175,23 @@ struct IntegrationTests { // MARK: - Detach - // (24) Detach the room + // (25) Detach the room try await rxRoom.detach() - // (25) Check that we received a DETACHED status change as a result of detaching the room + // (26) Check that we received a DETACHED status change as a result of detaching the room _ = try #require(await rxRoomStatusSubscription.first { $0.current == .detached }) #expect(await rxRoom.status == .detached) // MARK: - Release - // (26) Release the room + // (27) Release the room try await rxClient.rooms.release(roomID: roomID) - // (27) Check that we received a RELEASED status change as a result of releasing the room + // (28) Check that we received a RELEASED status change as a result of releasing the room _ = try #require(await rxRoomStatusSubscription.first { $0.current == .released }) #expect(await rxRoom.status == .released) - // (28) Fetch the room we just released and check it’s a new object + // (29) Fetch the room we just released and check it’s a new object let postReleaseRxRoom = try await rxClient.rooms.get(roomID: roomID, options: .init()) #expect(postReleaseRxRoom !== rxRoom) }