Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4982] Implement Messages discontinuities (CHA-M7) #108

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
@@ -13,16 +13,16 @@ private struct MessageSubscriptionWrapper {
@MainActor
internal final class DefaultMessages: Messages, EmitsDiscontinuities {
private let roomID: String
public nonisolated let channel: RealtimeChannelProtocol
public nonisolated let featureChannel: FeatureChannel
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
private let chatAPI: ChatAPI
private let clientID: String

// TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the timeserial of when it was attached or resumed.
private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:]

internal nonisolated init(channel: RealtimeChannelProtocol, chatAPI: ChatAPI, roomID: String, clientID: String) async {
self.channel = channel
internal nonisolated init(featureChannel: FeatureChannel, chatAPI: ChatAPI, roomID: String, clientID: String) async {
self.featureChannel = featureChannel
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved
self.chatAPI = chatAPI
self.roomID = roomID
self.clientID = clientID
@@ -32,6 +32,10 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
await handleChannelEvents(roomId: roomID)
}

internal nonisolated var channel: any RealtimeChannelProtocol {
featureChannel.channel
}
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved

// (CHA-M4) Messages can be received via a subscription in realtime.
internal func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription {
let uuid = UUID()
@@ -99,9 +103,9 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
try await chatAPI.sendMessage(roomId: roomID, params: params)
}

// TODO: (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle. - https://github.com/ably-labs/ably-chat-swift/issues/47
internal nonisolated func subscribeToDiscontinuities() -> Subscription<ARTErrorInfo> {
fatalError("not implemented")
// (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle.
internal func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
await featureChannel.subscribeToDiscontinuities()
}

private func getBeforeSubscriptionStart(_ uuid: UUID, params: QueryOptions) async throws -> any PaginatedResult<Message> {
16 changes: 13 additions & 3 deletions Sources/AblyChat/DefaultRoomLifecycleContributor.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import Ably

internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor {
internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsDiscontinuities {
internal let channel: DefaultRoomLifecycleContributorChannel
internal let feature: RoomFeature
private var discontinuitySubscriptions: [Subscription<ARTErrorInfo>] = []

internal init(channel: DefaultRoomLifecycleContributorChannel, feature: RoomFeature) {
self.channel = channel
@@ -11,8 +12,17 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor {

// MARK: - Discontinuities

internal func emitDiscontinuity(_: ARTErrorInfo) {
// TODO: https://github.com/ably-labs/ably-chat-swift/issues/47
internal func emitDiscontinuity(_ error: ARTErrorInfo) {
for subscription in discontinuitySubscriptions {
subscription.emit(error)
}
}

internal func subscribeToDiscontinuities() -> Subscription<ARTErrorInfo> {
let subscription = Subscription<ARTErrorInfo>(bufferingPolicy: .unbounded)
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
discontinuitySubscriptions.append(subscription)
return subscription
}
}

19 changes: 7 additions & 12 deletions Sources/AblyChat/Room.swift
Original file line number Diff line number Diff line change
@@ -81,37 +81,32 @@ internal actor DefaultRoom<LifecycleManagerFactory: RoomLifecycleManagerFactory>
throw ARTErrorInfo.create(withCode: 40000, message: "Ensure your Realtime instance is initialized with a clientId.")
}

channels = Self.createChannels(roomID: roomID, realtime: realtime)
let contributors = Self.createContributors(channels: channels)
let featureChannels = Self.createFeatureChannels(roomID: roomID, realtime: realtime)
channels = featureChannels.mapValues(\.channel)
let contributors = featureChannels.values.map(\.contributor)

lifecycleManager = await lifecycleManagerFactory.createManager(
contributors: contributors,
logger: logger
)

messages = await DefaultMessages(
channel: channels[.messages]!,
featureChannel: featureChannels[.messages]!,
chatAPI: chatAPI,
roomID: roomID,
clientID: clientId
)
}

private static func createChannels(roomID: String, realtime: RealtimeClient) -> [RoomFeature: RealtimeChannelProtocol] {
private static func createFeatureChannels(roomID: String, realtime: RealtimeClient) -> [RoomFeature: DefaultFeatureChannel] {
.init(uniqueKeysWithValues: [RoomFeature.messages].map { feature in
let channel = realtime.getChannel(feature.channelNameForRoomID(roomID))
let contributor = DefaultRoomLifecycleContributor(channel: .init(underlyingChannel: channel), feature: feature)

return (feature, channel)
return (feature, .init(channel: channel, contributor: contributor))
})
}

private static func createContributors(channels: [RoomFeature: RealtimeChannelProtocol]) -> [DefaultRoomLifecycleContributor] {
channels.map { entry in
let (feature, channel) = entry
return .init(channel: .init(underlyingChannel: channel), feature: feature)
}
}

public nonisolated var presence: any Presence {
fatalError("Not yet implemented")
}
21 changes: 21 additions & 0 deletions Sources/AblyChat/RoomFeature.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Ably

/// The features offered by a chat room.
internal enum RoomFeature {
case messages
@@ -21,3 +23,22 @@ internal enum RoomFeature {
}
}
}

/// Provides all of the channel-related functionality that a room feature (e.g. an implementation of ``Messages``) needs.
///
/// This mishmash exists to give a room feature access to both:
///
/// - a `RealtimeChannelProtocol` object (this is the interface that our features are currently written against, as opposed to, say, `RoomLifecycleContributorChannel`)
/// - the discontinuities emitted by the room lifecycle
internal protocol FeatureChannel: Sendable, EmitsDiscontinuities {
var channel: RealtimeChannelProtocol { get }
}

internal struct DefaultFeatureChannel: FeatureChannel {
internal var channel: RealtimeChannelProtocol
internal var contributor: DefaultRoomLifecycleContributor

internal func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
await contributor.subscribeToDiscontinuities()
}
}
29 changes: 26 additions & 3 deletions Tests/AblyChatTests/DefaultMessagesTests.swift
Original file line number Diff line number Diff line change
@@ -11,7 +11,8 @@ struct DefaultMessagesTests {
let realtime = MockRealtime.create()
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")

// Then
await #expect(throws: ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined"), performing: {
@@ -28,7 +29,8 @@ struct DefaultMessagesTests {
let realtime = MockRealtime.create { (MockHTTPPaginatedResponse.successGetMessagesWithNoItems, nil) }
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")

// Then
await #expect(throws: Never.self, performing: {
@@ -52,7 +54,8 @@ struct DefaultMessagesTests {
channelSerial: "001"
)
)
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let subscription = try await defaultMessages.subscribe(bufferingPolicy: .unbounded)
let expectedPaginatedResult = PaginatedResultWrapper<Message>(
paginatedResponse: MockHTTPPaginatedResponse.successGetMessagesWithNoItems,
@@ -65,4 +68,24 @@ struct DefaultMessagesTests {
// Then
#expect(previousMessages == expectedPaginatedResult)
}

// @spec CHA-M7
@Test
func subscribeToDiscontinuities() async throws {
// Given: A DefaultMessages instance
let realtime = MockRealtime.create()
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let featureChannel = MockFeatureChannel(channel: channel)
let messages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")

// When: The feature channel emits a discontinuity through `subscribeToDiscontinuities`
let featureChannelDiscontinuity = ARTErrorInfo.createUnknownError() // arbitrary
let messagesDiscontinuitySubscription = await messages.subscribeToDiscontinuities()
await featureChannel.emitDiscontinuity(featureChannelDiscontinuity)

// Then: The DefaultMessages instance emits this discontinuity through `subscribeToDiscontinuities`
let messagesDiscontinuity = try #require(await messagesDiscontinuitySubscription.first { _ in true })
maratal marked this conversation as resolved.
Show resolved Hide resolved
#expect(messagesDiscontinuity === featureChannelDiscontinuity)
}
}
24 changes: 24 additions & 0 deletions Tests/AblyChatTests/Mocks/MockFeatureChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import Ably
@testable import AblyChat

final actor MockFeatureChannel: FeatureChannel {
let channel: RealtimeChannelProtocol
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var discontinuitySubscriptions: [Subscription<ARTErrorInfo>] = []

init(channel: RealtimeChannelProtocol) {
self.channel = channel
}
lawrence-forooghian marked this conversation as resolved.
Show resolved Hide resolved

func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
let subscription = Subscription<ARTErrorInfo>(bufferingPolicy: .unbounded)
discontinuitySubscriptions.append(subscription)
return subscription
}

func emitDiscontinuity(_ discontinuity: ARTErrorInfo) {
for subscription in discontinuitySubscriptions {
subscription.emit(discontinuity)
}
}
}