Skip to content

Commit

Permalink
Add and improve logging for Messages and Reactions
Browse files Browse the repository at this point in the history
  • Loading branch information
umair-ably committed Nov 18, 2024
1 parent 900386e commit c02abe8
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
24 changes: 21 additions & 3 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
public nonisolated let featureChannel: FeatureChannel
private let chatAPI: ChatAPI
private let clientID: String
private let logger: InternalLogger

// TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the timeserial of when it was attached or resumed.
private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:]

internal nonisolated init(featureChannel: FeatureChannel, chatAPI: ChatAPI, roomID: String, clientID: String) async {
internal nonisolated init(featureChannel: FeatureChannel, chatAPI: ChatAPI, roomID: String, clientID: String, logger: InternalLogger) async {
self.featureChannel = featureChannel
self.chatAPI = chatAPI
self.roomID = roomID
self.clientID = clientID
self.logger = logger

// Implicitly handles channel events and therefore listners within this class. Alternative is to explicitly call something like `DefaultMessages.start()` which makes the SDK more cumbersome to interact with. This class is useless without kicking off this flow so I think leaving it here is suitable.
// "Calls to instance method 'handleChannelEvents(roomId:)' from outside of its actor context are implicitly asynchronous" hence the `await` here.
Expand All @@ -38,6 +40,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {

// (CHA-M4) Messages can be received via a subscription in realtime.
internal func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription {
logger.log(message: "Subscribing to messages", level: .debug)
let uuid = UUID()
let timeserial = try await resolveSubscriptionStart()
let messageSubscription = MessageSubscription(
Expand Down Expand Up @@ -162,29 +165,37 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}

private func handleAttach(fromResume: Bool) async throws {
logger.log(message: "Handling attach", level: .debug)
// Do nothing if we have resumed as there is no discontinuity in the message stream
if fromResume {
logger.log(message: "Channel has resumed, no need to handle attach", level: .debug)
return
}

do {
let timeserialOnChannelAttach = try await timeserialOnChannelAttach()

for uuid in subscriptionPoints.keys {
logger.log(message: "Resetting subscription point for listener: \(uuid)", level: .debug)
subscriptionPoints[uuid]?.timeserial = timeserialOnChannelAttach
}
} catch {
logger.log(message: "Error handling attach: \(error)", level: .error)
throw ARTErrorInfo.create(from: error)
}
}

private func resolveSubscriptionStart() async throws -> TimeserialString {
logger.log(message: "Resolving subscription start", level: .debug)
// (CHA-M5a) If a subscription is added when the underlying realtime channel is ATTACHED, then the subscription point is the current channelSerial of the realtime channel.
if channel.state == .attached {
if let channelSerial = channel.properties.channelSerial {
logger.log(message: "Channel is attached, returning channelSerial: \(channelSerial)", level: .debug)
return channelSerial
} else {
throw ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined")
let error = ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined")
logger.log(message: "Error resolving subscription start: \(error)", level: .error)
throw error
}
}

Expand All @@ -194,12 +205,16 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {

// Always returns the attachSerial and not the channelSerial to also serve (CHA-M5c) - If a channel leaves the ATTACHED state and then re-enters ATTACHED with resumed=false, then it must be assumed that messages have been missed. The subscription point of any subscribers must be reset to the attachSerial.
private func timeserialOnChannelAttach() async throws -> TimeserialString {
logger.log(message: "Resolving timeserial on channel attach", level: .debug)
// If the state is already 'attached', return the attachSerial immediately
if channel.state == .attached {
if let attachSerial = channel.properties.attachSerial {
logger.log(message: "Channel is attached, returning attachSerial: \(attachSerial)", level: .debug)
return attachSerial
} else {
throw ARTErrorInfo.create(withCode: 40000, status: 400, message: "Channel is attached, but attachSerial is not defined")
let error = ARTErrorInfo.create(withCode: 40000, status: 400, message: "Channel is attached, but attachSerial is not defined")
logger.log(message: "Error resolving timeserial on channel attach: \(error)", level: .error)
throw error
}
}

Expand All @@ -217,13 +232,16 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
case .attached:
// Handle successful attachment
if let attachSerial = channel.properties.attachSerial {
logger.log(message: "Channel is attached, returning attachSerial: \(attachSerial)", level: .debug)
nillableContinuation?.resume(returning: attachSerial)
} else {
logger.log(message: "Channel is attached, but attachSerial is not defined", level: .error)
nillableContinuation?.resume(throwing: ARTErrorInfo.create(withCode: 40000, status: 400, message: "Channel is attached, but attachSerial is not defined"))
}
nillableContinuation = nil
case .failed, .suspended:
// TODO: Revisit as part of https://github.com/ably-labs/ably-chat-swift/issues/32
logger.log(message: "Channel failed to attach", level: .error)
nillableContinuation?.resume(
throwing: ARTErrorInfo.create(
withCode: ErrorCode.messagesAttachmentFailed.rawValue,
Expand Down
5 changes: 4 additions & 1 deletion Sources/AblyChat/DefaultRoomReactions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
// (CHA-ER3) Ephemeral room reactions are sent to Ably via the Realtime connection via a send method.
// (CHA-ER3a) Reactions are sent on the channel using a message in a particular format - see spec for format.
internal func send(params: SendReactionParams) async throws {
logger.log(message: "Sending reaction with params: \(params)", level: .debug)
let extras = ["headers": params.headers ?? [:]] as ARTJsonCompatible
channel.publish(RoomReactionEvents.reaction.rawValue, data: params.asQueryItems(), extras: extras)
}

// (CHA-ER4) A user may subscribe to reaction events in Realtime.
// (CHA-ER4a) A user may provide a listener to subscribe to reaction events. This operation must have no side-effects in relation to room or underlying status. When a realtime message with name roomReaction is received, this message is converted into a reaction object and emitted to subscribers.
internal func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<Reaction> {
logger.log(message: "Subscribing to reaction events", level: .debug)
let subscription = Subscription<Reaction>(bufferingPolicy: bufferingPolicy)

// (CHA-ER4c) Realtime events with an unknown name shall be silently discarded.
channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in
logger.log(message: "Received roomReaction message: \(message)", level: .debug)
Task {
do {
guard let data = message.data as? [String: Any],
Expand Down Expand Up @@ -65,7 +68,7 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
clientID: messageClientID,
isSelf: messageClientID == clientID
)

logger.log(message: "Emitting reaction: \(reaction)", level: .debug)
subscription.emit(reaction)
} catch {
logger.log(message: "Error processing incoming reaction message: \(error)", level: .error)
Expand Down
3 changes: 2 additions & 1 deletion Sources/AblyChat/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ internal actor DefaultRoom<LifecycleManagerFactory: RoomLifecycleManagerFactory>
featureChannel: featureChannels[.messages]!,
chatAPI: chatAPI,
roomID: roomID,
clientID: clientId
clientID: clientId,
logger: logger
)

_reactions = options.reactions != nil ? await DefaultRoomReactions(
Expand Down
8 changes: 4 additions & 4 deletions Tests/AblyChatTests/DefaultMessagesTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct DefaultMessagesTests {
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId", logger: TestLogger())

// Then
await #expect(throws: ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined"), performing: {
Expand All @@ -30,7 +30,7 @@ struct DefaultMessagesTests {
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId", logger: TestLogger())

// Then
await #expect(throws: Never.self, performing: {
Expand All @@ -55,7 +55,7 @@ struct DefaultMessagesTests {
)
)
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId", logger: TestLogger())
let subscription = try await defaultMessages.subscribe(bufferingPolicy: .unbounded)
let expectedPaginatedResult = PaginatedResultWrapper<Message>(
paginatedResponse: MockHTTPPaginatedResponse.successGetMessagesWithNoItems,
Expand All @@ -77,7 +77,7 @@ struct DefaultMessagesTests {
let chatAPI = ChatAPI(realtime: realtime)
let channel = MockRealtimeChannel()
let featureChannel = MockFeatureChannel(channel: channel)
let messages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId")
let messages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId", logger: TestLogger())

// When: The feature channel emits a discontinuity through `subscribeToDiscontinuities`
let featureChannelDiscontinuity = ARTErrorInfo.createUnknownError() // arbitrary
Expand Down

0 comments on commit c02abe8

Please sign in to comment.