Skip to content

Commit

Permalink
Emit discontinuities via Messages
Browse files Browse the repository at this point in the history
Based on same spec as 8daa191.

Resolves #47.
  • Loading branch information
lawrence-forooghian committed Nov 12, 2024
1 parent 8daa191 commit b1860c2
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 24 deletions.
16 changes: 10 additions & 6 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ private struct MessageSubscriptionWrapper {
@MainActor
internal final class DefaultMessages: Messages, EmitsDiscontinuities {
private let roomID: String
public nonisolated let channel: RealtimeChannelProtocol
public nonisolated let featureChannel: FeatureChannel
private let chatAPI: ChatAPI
private let clientID: String

// TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the timeserial of when it was attached or resumed.
private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:]

internal nonisolated init(channel: RealtimeChannelProtocol, chatAPI: ChatAPI, roomID: String, clientID: String) async {
self.channel = channel
internal nonisolated init(featureChannel: FeatureChannel, chatAPI: ChatAPI, roomID: String, clientID: String) async {
self.featureChannel = featureChannel
self.chatAPI = chatAPI
self.roomID = roomID
self.clientID = clientID
Expand All @@ -32,6 +32,10 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
await handleChannelEvents(roomId: roomID)
}

internal nonisolated var channel: any RealtimeChannelProtocol {
featureChannel.channel
}

// (CHA-M4) Messages can be received via a subscription in realtime.
internal func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription {
let uuid = UUID()
Expand Down Expand Up @@ -99,9 +103,9 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
try await chatAPI.sendMessage(roomId: roomID, params: params)
}

// TODO: (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle. - https://github.com/ably-labs/ably-chat-swift/issues/47
internal nonisolated func subscribeToDiscontinuities() -> Subscription<ARTErrorInfo> {
fatalError("not implemented")
// (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle.
internal func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
await featureChannel.subscribeToDiscontinuities()
}

private func getBeforeSubscriptionStart(_ uuid: UUID, params: QueryOptions) async throws -> any PaginatedResult<Message> {
Expand Down
16 changes: 13 additions & 3 deletions Sources/AblyChat/DefaultRoomLifecycleContributor.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import Ably

internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor {
internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsDiscontinuities {
internal let channel: DefaultRoomLifecycleContributorChannel
internal let feature: RoomFeature
private var discontinuitySubscriptions: [Subscription<ARTErrorInfo>] = []

internal init(channel: DefaultRoomLifecycleContributorChannel, feature: RoomFeature) {
self.channel = channel
Expand All @@ -11,8 +12,17 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor {

// MARK: - Discontinuities

internal func emitDiscontinuity(_: ARTErrorInfo) {
// TODO: https://github.com/ably-labs/ably-chat-swift/issues/47
internal func emitDiscontinuity(_ error: ARTErrorInfo) {
for subscription in discontinuitySubscriptions {
subscription.emit(error)
}
}

internal func subscribeToDiscontinuities() -> Subscription<ARTErrorInfo> {
let subscription = Subscription<ARTErrorInfo>(bufferingPolicy: .unbounded)
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
discontinuitySubscriptions.append(subscription)
return subscription
}
}

Expand Down
19 changes: 7 additions & 12 deletions Sources/AblyChat/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,32 @@ internal actor DefaultRoom<LifecycleManagerFactory: RoomLifecycleManagerFactory>
throw ARTErrorInfo.create(withCode: 40000, message: "Ensure your Realtime instance is initialized with a clientId.")
}

channels = Self.createChannels(roomID: roomID, realtime: realtime)
let contributors = Self.createContributors(channels: channels)
let featureChannels = Self.createFeatureChannels(roomID: roomID, realtime: realtime)
channels = featureChannels.mapValues(\.channel)
let contributors = featureChannels.values.map(\.contributor)

lifecycleManager = await lifecycleManagerFactory.createManager(
contributors: contributors,
logger: logger
)

messages = await DefaultMessages(
channel: channels[.messages]!,
featureChannel: featureChannels[.messages]!,
chatAPI: chatAPI,
roomID: roomID,
clientID: clientId
)
}

private static func createChannels(roomID: String, realtime: RealtimeClient) -> [RoomFeature: RealtimeChannelProtocol] {
private static func createFeatureChannels(roomID: String, realtime: RealtimeClient) -> [RoomFeature: DefaultFeatureChannel] {
.init(uniqueKeysWithValues: [RoomFeature.messages].map { feature in
let channel = realtime.getChannel(feature.channelNameForRoomID(roomID))
let contributor = DefaultRoomLifecycleContributor(channel: .init(underlyingChannel: channel), feature: feature)

return (feature, channel)
return (feature, .init(channel: channel, contributor: contributor))
})
}

private static func createContributors(channels: [RoomFeature: RealtimeChannelProtocol]) -> [DefaultRoomLifecycleContributor] {
channels.map { entry in
let (feature, channel) = entry
return .init(channel: .init(underlyingChannel: channel), feature: feature)
}
}

public nonisolated var presence: any Presence {
fatalError("Not yet implemented")
}
Expand Down
21 changes: 21 additions & 0 deletions Sources/AblyChat/RoomFeature.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Ably

/// The features offered by a chat room.
internal enum RoomFeature {
case messages
Expand All @@ -21,3 +23,22 @@ internal enum RoomFeature {
}
}
}

/// Provides all of the channel-related functionality that a room feature (e.g. an implementation of ``Messages``) needs.
///
/// This mishmash exists to give a room feature access to both:
///
/// - a `RealtimeChannelProtocol` object (this is the interface that our features are currently written against, as opposed to, say, `RoomLifecycleContributorChannel`)
/// - the discontinuities emitted by the room lifecycle
internal protocol FeatureChannel: Sendable, EmitsDiscontinuities {
var channel: RealtimeChannelProtocol { get }
}

internal struct DefaultFeatureChannel: FeatureChannel {
internal var channel: RealtimeChannelProtocol
internal var contributor: DefaultRoomLifecycleContributor

internal func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
await contributor.subscribeToDiscontinuities()
}
}
29 changes: 26 additions & 3 deletions Tests/AblyChatTests/DefaultMessagesTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ struct DefaultMessagesTests {
let realtime = MockRealtime.create()
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")

// Then
await #expect(throws: ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined"), performing: {
Expand All @@ -28,7 +29,8 @@ struct DefaultMessagesTests {
let realtime = MockRealtime.create { (MockHTTPPaginatedResponse.successGetMessagesWithNoItems, nil) }
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")

// Then
await #expect(throws: Never.self, performing: {
Expand All @@ -52,7 +54,8 @@ struct DefaultMessagesTests {
channelSerial: "001"
)
)
let defaultMessages = await DefaultMessages(channel: channel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let subscription = try await defaultMessages.subscribe(bufferingPolicy: .unbounded)
let expectedPaginatedResult = PaginatedResultWrapper<Message>(
paginatedResponse: MockHTTPPaginatedResponse.successGetMessagesWithNoItems,
Expand All @@ -65,4 +68,24 @@ struct DefaultMessagesTests {
// Then
#expect(previousMessages == expectedPaginatedResult)
}

// @spec CHA-M7
@Test
func subscribeToDiscontinuities() async throws {
// Given: A DefaultMessages instance
let realtime = MockRealtime.create()
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let featureChannel = MockFeatureChannel(channel: channel)
let messages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")

// When: The feature channel emits a discontinuity through `subscribeToDiscontinuities`
let featureChannelDiscontinuity = ARTErrorInfo.createUnknownError() // arbitrary
let messagesDiscontinuitySubscription = await messages.subscribeToDiscontinuities()
await featureChannel.emitDiscontinuity(featureChannelDiscontinuity)

// Then: The DefaultMessages instance emits this discontinuity through `subscribeToDiscontinuities`
let messagesDiscontinuity = try #require(await messagesDiscontinuitySubscription.first { _ in true })
#expect(messagesDiscontinuity === featureChannelDiscontinuity)
}
}
24 changes: 24 additions & 0 deletions Tests/AblyChatTests/Mocks/MockFeatureChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import Ably
@testable import AblyChat

final actor MockFeatureChannel: FeatureChannel {
let channel: RealtimeChannelProtocol
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var discontinuitySubscriptions: [Subscription<ARTErrorInfo>] = []

init(channel: RealtimeChannelProtocol) {
self.channel = channel
}

func subscribeToDiscontinuities() async -> Subscription<ARTErrorInfo> {
let subscription = Subscription<ARTErrorInfo>(bufferingPolicy: .unbounded)
discontinuitySubscriptions.append(subscription)
return subscription
}

func emitDiscontinuity(_ discontinuity: ARTErrorInfo) {
for subscription in discontinuitySubscriptions {
subscription.emit(discontinuity)
}
}
}

0 comments on commit b1860c2

Please sign in to comment.