diff --git a/chat-android/build.gradle.kts b/chat-android/build.gradle.kts index 3262af49..aa0cb5c1 100644 --- a/chat-android/build.gradle.kts +++ b/chat-android/build.gradle.kts @@ -1,3 +1,5 @@ +import org.gradle.api.tasks.testing.logging.TestExceptionFormat + plugins { alias(libs.plugins.android.library) alias(libs.plugins.android.kotlin) @@ -57,3 +59,9 @@ dependencies { androidTestImplementation(libs.androidx.test.runner) androidTestImplementation(libs.androidx.junit) } + +tasks.withType().configureEach { + testLogging { + exceptionFormat = TestExceptionFormat.FULL + } +} diff --git a/chat-android/src/main/java/com/ably/chat/Occupancy.kt b/chat-android/src/main/java/com/ably/chat/Occupancy.kt index a9e18ed1..1426cebd 100644 --- a/chat-android/src/main/java/com/ably/chat/Occupancy.kt +++ b/chat-android/src/main/java/com/ably/chat/Occupancy.kt @@ -2,7 +2,18 @@ package com.ably.chat +import com.google.gson.JsonObject +import com.google.gson.JsonPrimitive +import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel +import java.util.concurrent.CopyOnWriteArrayList +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.launch /** * This interface is used to interact with occupancy in a chat room: subscribing to occupancy updates and @@ -46,6 +57,8 @@ interface Occupancy : EmitsDiscontinuities { /** * Represents the occupancy of a chat room. + * + * (CHA-O2) */ data class OccupancyEvent( /** @@ -60,20 +73,145 @@ data class OccupancyEvent( ) internal class DefaultOccupancy( - private val messages: Messages, + realtimeChannels: AblyRealtime.Channels, + private val chatApi: ChatApi, + private val roomId: String, + private val logger: Logger, ) : Occupancy { - override val channel: Channel - get() = messages.channel + // (CHA-O1) + private val messagesChannelName = "$roomId::\$chat::\$chatMessages" + + override val channel: Channel = realtimeChannels.get( + messagesChannelName, + ChatChannelOptions { + params = mapOf( + "occupancy" to "metrics", + ) + }, + ) + + private val listeners: MutableList = CopyOnWriteArrayList() + + private val eventBus = MutableSharedFlow( + extraBufferCapacity = 1, + onBufferOverflow = BufferOverflow.DROP_OLDEST, + ) + + private val occupancyScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob()) + private val occupancySubscription: Subscription + + init { + occupancyScope.launch { + eventBus.collect { occupancyEvent -> + listeners.forEach { + it.onEvent(occupancyEvent) + } + } + } + + val occupancyListener = PubSubMessageListener { + internalChannelListener(it) + } + channel.subscribe(occupancyListener) + + occupancySubscription = Subscription { + channel.unsubscribe(occupancyListener) + } + } + + // (CHA-O4) override fun subscribe(listener: Occupancy.Listener): Subscription { - TODO("Not yet implemented") + logger.trace("Occupancy.subscribe()") + listeners.add(listener) + + return Subscription { + logger.trace("Occupancy.unsubscribe()") + // (CHA-04b) + listeners.remove(listener) + } } + // (CHA-O3) override suspend fun get(): OccupancyEvent { - TODO("Not yet implemented") + logger.trace("Occupancy.get()") + return chatApi.getOccupancy(roomId) } override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { + // (CHA-O5) TODO("Not yet implemented") } + + fun release() { + occupancySubscription.unsubscribe() + occupancyScope.cancel() + } + + /** + * An internal listener that listens for occupancy events from the underlying channel and translates them into + * occupancy events for the public API. + */ + @Suppress("ReturnCount") + private fun internalChannelListener(message: PubSubMessage) { + val data = message.data as? JsonObject + + if (data == null) { + logger.error( + "invalid occupancy event received; data is not an object", + staticContext = mapOf( + "message" to message.toString(), + ), + ) + // (CHA-04d) + return + } + + val metrics = data.get("metrics") as? JsonObject + + if (metrics == null) { + logger.error( + "invalid occupancy event received; metrics is missing", + staticContext = mapOf( + "data" to data.toString(), + ), + ) + // (CHA-04d) + return + } + + val connections = metrics.get("connections") as? JsonPrimitive + + if (connections == null) { + logger.error( + "invalid occupancy event received; connections is missing", + staticContext = mapOf( + "data" to data.toString(), + ), + ) + // (CHA-04d) + return + } + + val presenceMembers = metrics.get("presenceMembers") as? JsonPrimitive + + if (presenceMembers == null) { + logger.error( + "invalid occupancy event received; presenceMembers is missing", + staticContext = mapOf( + "data" to data.toString(), + ), + ) + // (CHA-04d) + return + } + + eventBus.tryEmit( + // (CHA-04c) + OccupancyEvent( + connections = connections.asInt, + presenceMembers = presenceMembers.asInt, + ), + ) + } } diff --git a/chat-android/src/main/java/com/ably/chat/Room.kt b/chat-android/src/main/java/com/ably/chat/Room.kt index 912c5776..fe70d055 100644 --- a/chat-android/src/main/java/com/ably/chat/Room.kt +++ b/chat-android/src/main/java/com/ably/chat/Room.kt @@ -107,12 +107,22 @@ internal class DefaultRoom( logger = logger.withContext(tag = "Typing"), ) + private val _occupancy = DefaultOccupancy( + roomId = roomId, + realtimeChannels = realtimeClient.channels, + chatApi = chatApi, + logger = logger.withContext(tag = "Occupancy"), + ) + override val messages: Messages get() = _messages override val typing: Typing get() = _typing + override val occupancy: Occupancy + get() = _occupancy + override val presence: Presence = DefaultPresence( channel = messages.channel, clientId = clientId, @@ -125,10 +135,6 @@ internal class DefaultRoom( realtimeChannels = realtimeClient.channels, ) - override val occupancy: Occupancy = DefaultOccupancy( - messages = messages, - ) - override val status: RoomStatus get() { TODO("Not yet implemented") @@ -149,5 +155,6 @@ internal class DefaultRoom( fun release() { _messages.release() _typing.release() + _occupancy.release() } } diff --git a/chat-android/src/main/java/com/ably/chat/Typing.kt b/chat-android/src/main/java/com/ably/chat/Typing.kt index f5e2a177..b4237a22 100644 --- a/chat-android/src/main/java/com/ably/chat/Typing.kt +++ b/chat-android/src/main/java/com/ably/chat/Typing.kt @@ -3,6 +3,7 @@ package com.ably.chat import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.Presence.PresenceListener import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo import java.util.concurrent.CopyOnWriteArrayList @@ -113,6 +114,8 @@ internal class DefaultTyping( private var lastTyping: Set = setOf() + private var presenceSubscription: Subscription + init { typingScope.launch { eventBus.collect { @@ -120,13 +123,19 @@ internal class DefaultTyping( } } - channel.presence.subscribe { + val presenceListener = PresenceListener { if (it.clientId == null) { logger.error("unable to handle typing event; no clientId", staticContext = mapOf("member" to it.toString())) } else { eventBus.tryEmit(Unit) } } + + channel.presence.subscribe(presenceListener) + + presenceSubscription = Subscription { + channel.presence.unsubscribe(presenceListener) + } } override fun subscribe(listener: Typing.Listener): Subscription { @@ -172,6 +181,7 @@ internal class DefaultTyping( } fun release() { + presenceSubscription.unsubscribe() typingScope.cancel() } diff --git a/chat-android/src/test/java/com/ably/chat/OccupancyTest.kt b/chat-android/src/test/java/com/ably/chat/OccupancyTest.kt new file mode 100644 index 00000000..bb64cb2c --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/OccupancyTest.kt @@ -0,0 +1,151 @@ +package com.ably.chat + +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime.Channels +import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.buildRealtimeChannel +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.spyk +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Before +import org.junit.Test + +class OccupancyTest { + + private val realtimeClient = mockk(relaxed = true) + private val realtimeChannels = mockk(relaxed = true) + private val realtimeChannel = spyk(buildRealtimeChannel()) + private val chatApi = spyk(ChatApi(realtimeClient, "clientId", EmptyLogger(LogContext(tag = "TEST")))) + private lateinit var occupancy: Occupancy + private val pubSubMessageListenerSlot = slot() + + @Before + fun setUp() { + every { realtimeChannels.get(any(), any()) } returns realtimeChannel + every { realtimeChannel.subscribe(capture(pubSubMessageListenerSlot)) } returns Unit + + occupancy = DefaultOccupancy( + roomId = "room1", + realtimeChannels = realtimeChannels, + chatApi = chatApi, + logger = EmptyLogger(LogContext(tag = "TEST")), + ) + } + + /** + * @spec CHA-O3 + */ + @Test + fun `user should be able to receive occupancy via #get()`() = runTest { + mockOccupancyApiResponse( + realtimeClient, + JsonObject().apply { + addProperty("connections", 2) + addProperty("presenceMembers", 1) + }, + roomId = "room1", + ) + + assertEquals(OccupancyEvent(connections = 2, presenceMembers = 1), occupancy.get()) + } + + /** + * @spec CHA-O4a + * @spec CHA-04c + */ + @Test + fun `user should be able to register a listener that receives occupancy events in realtime`() = runTest { + val occupancyEventMessage = PubSubMessage().apply { + data = JsonObject().apply { + add( + "metrics", + JsonObject().apply { + addProperty("connections", 2) + addProperty("presenceMembers", 1) + }, + ) + } + } + + val deferredEvent = CompletableDeferred() + occupancy.subscribe { + deferredEvent.complete(it) + } + + pubSubMessageListenerSlot.captured.onMessage(occupancyEventMessage) + + assertEquals(OccupancyEvent(connections = 2, presenceMembers = 1), deferredEvent.await()) + } + + /** + * @spec CHA-04d + */ + @Test + fun `invalid occupancy event should be dropped`() = runTest { + val validOccupancyEvent = PubSubMessage().apply { + data = JsonObject().apply { + add( + "metrics", + JsonObject().apply { + addProperty("connections", 1) + addProperty("presenceMembers", 1) + }, + ) + } + } + + val invalidOccupancyEvent = PubSubMessage().apply { + data = JsonObject().apply { + add("metrics", JsonObject()) + } + } + + val deferredEvent = CompletableDeferred() + occupancy.subscribe { + deferredEvent.complete(it) + } + + pubSubMessageListenerSlot.captured.onMessage(invalidOccupancyEvent) + pubSubMessageListenerSlot.captured.onMessage(validOccupancyEvent) + + assertEquals(OccupancyEvent(connections = 1, presenceMembers = 1), deferredEvent.await()) + } + + /** + * @spec CHA-04b + */ + @Test + fun `user should be able to remove a listener`() = runTest { + val subscription = occupancy.subscribe { + error("Should not be called") + } + subscription.unsubscribe() + + val fakeMessage = PubSubMessage().apply { + data = JsonObject().apply { + add( + "metrics", + JsonObject().apply { + addProperty("connections", 1) + addProperty("presenceMembers", 1) + }, + ) + } + } + + pubSubMessageListenerSlot.captured.onMessage(fakeMessage) + + val deferredEvent = CompletableDeferred() + occupancy.subscribe { + deferredEvent.complete(it) + } + + pubSubMessageListenerSlot.captured.onMessage(fakeMessage) + + assertEquals(OccupancyEvent(connections = 1, presenceMembers = 1), deferredEvent.await()) + } +} diff --git a/chat-android/src/test/java/com/ably/chat/SandboxTest.kt b/chat-android/src/test/java/com/ably/chat/SandboxTest.kt index 2140198f..9f15b197 100644 --- a/chat-android/src/test/java/com/ably/chat/SandboxTest.kt +++ b/chat-android/src/test/java/com/ably/chat/SandboxTest.kt @@ -4,18 +4,11 @@ import java.util.UUID import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals -import org.junit.Before +import org.junit.BeforeClass import org.junit.Test class SandboxTest { - private lateinit var sandbox: Sandbox - - @Before - fun setUp() = runTest { - sandbox = Sandbox.createInstance() - } - @Test fun `should return empty list of presence members if nobody is entered`() = runTest { val chatClient = sandbox.createSandboxChatClient() @@ -56,4 +49,32 @@ class SandboxTest { assertEquals(setOf("client1"), typingEvent.currentlyTyping) assertEquals(setOf("client1"), chatClient2Room.typing.get()) } + + @Test + fun `should return occupancy for the client`() = runTest { + val chatClient = sandbox.createSandboxChatClient("client1") + val roomId = UUID.randomUUID().toString() + val roomOptions = RoomOptions(occupancy = OccupancyOptions) + + val chatClientRoom = chatClient.rooms.get(roomId, roomOptions) + + val firstOccupancyEvent = CompletableDeferred() + chatClientRoom.occupancy.subscribeOnce { + firstOccupancyEvent.complete(it) + } + + chatClientRoom.attach() + assertEquals(OccupancyEvent(1, 0), firstOccupancyEvent.await()) + } + + companion object { + + private lateinit var sandbox: Sandbox + + @JvmStatic + @BeforeClass + fun setUp() = runTest { + sandbox = Sandbox.createInstance() + } + } } diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.kt b/chat-android/src/test/java/com/ably/chat/TestUtils.kt index e3449e2f..e7982fa8 100644 --- a/chat-android/src/test/java/com/ably/chat/TestUtils.kt +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.kt @@ -54,3 +54,11 @@ internal class EmptyLogger(override val context: LogContext) : Logger { override fun withContext(tag: String?, staticContext: Map, dynamicContext: Map String>): Logger = this override fun log(message: String, level: LogLevel, throwable: Throwable?, newTag: String?, newStaticContext: Map) = Unit } + +fun Occupancy.subscribeOnce(listener: Occupancy.Listener) { + lateinit var subscription: Subscription + subscription = subscribe { + listener.onEvent(it) + subscription.unsubscribe() + } +}