Skip to content

Commit

Permalink
Merge pull request #60 from ably-labs/ECO-4949/add-occupancy
Browse files Browse the repository at this point in the history
[ECO-4949] feat: add occupancy feature
  • Loading branch information
ttypic authored Nov 22, 2024
2 parents 77a892a + 3ee0088 commit 45c7156
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 18 deletions.
8 changes: 8 additions & 0 deletions chat-android/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.gradle.api.tasks.testing.logging.TestExceptionFormat

plugins {
alias(libs.plugins.android.library)
alias(libs.plugins.android.kotlin)
Expand Down Expand Up @@ -57,3 +59,9 @@ dependencies {
androidTestImplementation(libs.androidx.test.runner)
androidTestImplementation(libs.androidx.junit)
}

tasks.withType<Test>().configureEach {
testLogging {
exceptionFormat = TestExceptionFormat.FULL
}
}
148 changes: 143 additions & 5 deletions chat-android/src/main/java/com/ably/chat/Occupancy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@

package com.ably.chat

import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive
import io.ably.lib.realtime.AblyRealtime
import io.ably.lib.realtime.Channel
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

/**
* This interface is used to interact with occupancy in a chat room: subscribing to occupancy updates and
Expand Down Expand Up @@ -46,6 +57,8 @@ interface Occupancy : EmitsDiscontinuities {

/**
* Represents the occupancy of a chat room.
*
* (CHA-O2)
*/
data class OccupancyEvent(
/**
Expand All @@ -60,20 +73,145 @@ data class OccupancyEvent(
)

internal class DefaultOccupancy(
private val messages: Messages,
realtimeChannels: AblyRealtime.Channels,
private val chatApi: ChatApi,
private val roomId: String,
private val logger: Logger,
) : Occupancy {
override val channel: Channel
get() = messages.channel
// (CHA-O1)
private val messagesChannelName = "$roomId::\$chat::\$chatMessages"

override val channel: Channel = realtimeChannels.get(
messagesChannelName,
ChatChannelOptions {
params = mapOf(
"occupancy" to "metrics",
)
},
)

private val listeners: MutableList<Occupancy.Listener> = CopyOnWriteArrayList()

private val eventBus = MutableSharedFlow<OccupancyEvent>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

private val occupancyScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
private val occupancySubscription: Subscription

init {
occupancyScope.launch {
eventBus.collect { occupancyEvent ->
listeners.forEach {
it.onEvent(occupancyEvent)
}
}
}

val occupancyListener = PubSubMessageListener {
internalChannelListener(it)
}

channel.subscribe(occupancyListener)

occupancySubscription = Subscription {
channel.unsubscribe(occupancyListener)
}
}

// (CHA-O4)
override fun subscribe(listener: Occupancy.Listener): Subscription {
TODO("Not yet implemented")
logger.trace("Occupancy.subscribe()")
listeners.add(listener)

return Subscription {
logger.trace("Occupancy.unsubscribe()")
// (CHA-04b)
listeners.remove(listener)
}
}

// (CHA-O3)
override suspend fun get(): OccupancyEvent {
TODO("Not yet implemented")
logger.trace("Occupancy.get()")
return chatApi.getOccupancy(roomId)
}

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
// (CHA-O5)
TODO("Not yet implemented")
}

fun release() {
occupancySubscription.unsubscribe()
occupancyScope.cancel()
}

/**
* An internal listener that listens for occupancy events from the underlying channel and translates them into
* occupancy events for the public API.
*/
@Suppress("ReturnCount")
private fun internalChannelListener(message: PubSubMessage) {
val data = message.data as? JsonObject

if (data == null) {
logger.error(
"invalid occupancy event received; data is not an object",
staticContext = mapOf(
"message" to message.toString(),
),
)
// (CHA-04d)
return
}

val metrics = data.get("metrics") as? JsonObject

if (metrics == null) {
logger.error(
"invalid occupancy event received; metrics is missing",
staticContext = mapOf(
"data" to data.toString(),
),
)
// (CHA-04d)
return
}

val connections = metrics.get("connections") as? JsonPrimitive

if (connections == null) {
logger.error(
"invalid occupancy event received; connections is missing",
staticContext = mapOf(
"data" to data.toString(),
),
)
// (CHA-04d)
return
}

val presenceMembers = metrics.get("presenceMembers") as? JsonPrimitive

if (presenceMembers == null) {
logger.error(
"invalid occupancy event received; presenceMembers is missing",
staticContext = mapOf(
"data" to data.toString(),
),
)
// (CHA-04d)
return
}

eventBus.tryEmit(
// (CHA-04c)
OccupancyEvent(
connections = connections.asInt,
presenceMembers = presenceMembers.asInt,
),
)
}
}
15 changes: 11 additions & 4 deletions chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,22 @@ internal class DefaultRoom(
logger = logger.withContext(tag = "Typing"),
)

private val _occupancy = DefaultOccupancy(
roomId = roomId,
realtimeChannels = realtimeClient.channels,
chatApi = chatApi,
logger = logger.withContext(tag = "Occupancy"),
)

override val messages: Messages
get() = _messages

override val typing: Typing
get() = _typing

override val occupancy: Occupancy
get() = _occupancy

override val presence: Presence = DefaultPresence(
channel = messages.channel,
clientId = clientId,
Expand All @@ -125,10 +135,6 @@ internal class DefaultRoom(
realtimeChannels = realtimeClient.channels,
)

override val occupancy: Occupancy = DefaultOccupancy(
messages = messages,
)

override val status: RoomStatus
get() {
TODO("Not yet implemented")
Expand All @@ -149,5 +155,6 @@ internal class DefaultRoom(
fun release() {
_messages.release()
_typing.release()
_occupancy.release()
}
}
12 changes: 11 additions & 1 deletion chat-android/src/main/java/com/ably/chat/Typing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.ably.chat

import io.ably.lib.realtime.Channel
import io.ably.lib.realtime.Presence.PresenceListener
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import java.util.concurrent.CopyOnWriteArrayList
Expand Down Expand Up @@ -113,20 +114,28 @@ internal class DefaultTyping(

private var lastTyping: Set<String> = setOf()

private var presenceSubscription: Subscription

init {
typingScope.launch {
eventBus.collect {
processEvent()
}
}

channel.presence.subscribe {
val presenceListener = PresenceListener {
if (it.clientId == null) {
logger.error("unable to handle typing event; no clientId", staticContext = mapOf("member" to it.toString()))
} else {
eventBus.tryEmit(Unit)
}
}

channel.presence.subscribe(presenceListener)

presenceSubscription = Subscription {
channel.presence.unsubscribe(presenceListener)
}
}

override fun subscribe(listener: Typing.Listener): Subscription {
Expand Down Expand Up @@ -172,6 +181,7 @@ internal class DefaultTyping(
}

fun release() {
presenceSubscription.unsubscribe()
typingScope.cancel()
}

Expand Down
Loading

0 comments on commit 45c7156

Please sign in to comment.