Skip to content

Commit

Permalink
Merge pull request #58 from ably-labs/ECO-4946/typing-indicators
Browse files Browse the repository at this point in the history
[ECO-4946] feat: typing indicators
  • Loading branch information
ttypic authored Nov 21, 2024
2 parents c2563d4 + 76e3d5d commit 0067565
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 103 deletions.
1 change: 1 addition & 0 deletions chat-android/src/main/java/com/ably/chat/ChatClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ internal class DefaultChatClient(
chatApi = chatApi,
clientOptions = clientOptions,
clientId = clientId,
logger = logger,
)

override val connection: Connection
Expand Down
20 changes: 3 additions & 17 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ package com.ably.chat
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.realtime.Presence.GET_CLIENTID
import io.ably.lib.realtime.Presence.GET_CONNECTIONID
import io.ably.lib.realtime.Presence.GET_WAITFORSYNC
import io.ably.lib.types.Param
import io.ably.lib.types.PresenceMessage
import io.ably.lib.realtime.Presence as PubSubPresence
import io.ably.lib.realtime.Presence.PresenceListener as PubSubPresenceListener
Expand Down Expand Up @@ -143,9 +139,8 @@ internal class DefaultPresence(
private val presence: PubSubPresence,
) : Presence {

suspend fun get(params: List<Param>): List<PresenceMember> {
val usersOnPresence = presence.getCoroutine(params)
return usersOnPresence.map { user ->
override suspend fun get(waitForSync: Boolean, clientId: String?, connectionId: String?): List<PresenceMember> {
return presence.getCoroutine(waitForSync, clientId, connectionId).map { user ->
PresenceMember(
clientId = user.clientId,
action = user.action,
Expand All @@ -155,16 +150,7 @@ internal class DefaultPresence(
}
}

override suspend fun get(waitForSync: Boolean, clientId: String?, connectionId: String?): List<PresenceMember> {
val params = buildList {
if (waitForSync) add(Param(GET_WAITFORSYNC, true))
clientId?.let { add(Param(GET_CLIENTID, it)) }
connectionId?.let { add(Param(GET_CONNECTIONID, it)) }
}
return get(params)
}

override suspend fun isUserPresent(clientId: String): Boolean = presence.getCoroutine(Param(GET_CLIENTID, clientId)).isNotEmpty()
override suspend fun isUserPresent(clientId: String): Boolean = presence.getCoroutine(clientId = clientId).isNotEmpty()

override suspend fun enter(data: PresenceData?) {
presence.enterClientCoroutine(clientId, wrapInUserCustomData(data))
Expand Down
23 changes: 16 additions & 7 deletions chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ interface Room {
internal class DefaultRoom(
override val roomId: String,
override val options: RoomOptions,
val realtimeClient: RealtimeClient,
private val realtimeClient: RealtimeClient,
chatApi: ChatApi,
clientId: String,
private val logger: Logger,
) : Room {

private val _messages = DefaultMessages(
Expand All @@ -98,7 +99,19 @@ internal class DefaultRoom(
chatApi = chatApi,
)

override val messages: Messages = _messages
private val _typing: DefaultTyping = DefaultTyping(
roomId = roomId,
realtimeClient = realtimeClient,
options = options.typing,
clientId = clientId,
logger = logger.withContext(tag = "Typing"),
)

override val messages: Messages
get() = _messages

override val typing: Typing
get() = _typing

override val presence: Presence = DefaultPresence(
channel = messages.channel,
Expand All @@ -112,11 +125,6 @@ internal class DefaultRoom(
realtimeChannels = realtimeClient.channels,
)

override val typing: Typing = DefaultTyping(
roomId = roomId,
realtimeClient = realtimeClient,
)

override val occupancy: Occupancy = DefaultOccupancy(
messages = messages,
)
Expand All @@ -140,5 +148,6 @@ internal class DefaultRoom(

fun release() {
_messages.release()
_typing.release()
}
}
2 changes: 2 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Rooms.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal class DefaultRooms(
private val chatApi: ChatApi,
override val clientOptions: ClientOptions,
private val clientId: String,
private val logger: Logger,
) : Rooms {
private val roomIdToRoom: MutableMap<String, DefaultRoom> = mutableMapOf()

Expand All @@ -59,6 +60,7 @@ internal class DefaultRooms(
realtimeClient = realtimeClient,
chatApi = chatApi,
clientId = clientId,
logger = logger,
)
}

Expand Down
132 changes: 123 additions & 9 deletions chat-android/src/main/java/com/ably/chat/Typing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,30 @@
package com.ably.chat

import io.ably.lib.realtime.Channel
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.math.min
import kotlin.math.pow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

/**
* base retry interval, we double it each time
*/
const val PRESENCE_GET_RETRY_INTERVAL_MS = 1500
const val PRESENCE_GET_RETRY_INTERVAL_MS: Long = 1500

/**
* max retry interval
*/
const val PRESENCE_GET_RETRY_MAX_INTERVAL_MS = 30_000
const val PRESENCE_GET_RETRY_MAX_INTERVAL_MS: Long = 30_000

/**
* max num of retries
Expand Down Expand Up @@ -77,30 +91,130 @@ data class TypingEvent(val currentlyTyping: Set<String>)

internal class DefaultTyping(
roomId: String,
private val realtimeClient: RealtimeClient,
realtimeClient: RealtimeClient,
private val clientId: String,
private val options: TypingOptions?,
private val logger: Logger,
) : Typing {
private val typingIndicatorsChannelName = "$roomId::\$chat::\$typingIndicators"

override val channel: Channel
get() = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())
private val typingScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())

private val eventBus = MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val channel: Channel = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())

private var typingJob: Job? = null

private val listeners: MutableList<Typing.Listener> = CopyOnWriteArrayList()

private var lastTyping: Set<String> = setOf()

init {
typingScope.launch {
eventBus.collect {
processEvent()
}
}

channel.presence.subscribe {
if (it.clientId == null) {
logger.error("unable to handle typing event; no clientId", staticContext = mapOf("member" to it.toString()))
} else {
eventBus.tryEmit(Unit)
}
}
}

override fun subscribe(listener: Typing.Listener): Subscription {
TODO("Not yet implemented")
logger.trace("DefaultTyping.subscribe()")
listeners.add(listener)
return Subscription {
logger.trace("DefaultTyping.unsubscribe()")
listeners.remove(listener)
}
}

override suspend fun get(): Set<String> {
TODO("Not yet implemented")
logger.trace("DefaultTyping.get()")
return channel.presence.getCoroutine().map { it.clientId }.toSet()
}

override suspend fun start() {
TODO("Not yet implemented")
logger.trace("DefaultTyping.start()")

typingScope.launch {
// If the user is already typing, reset the timer
if (typingJob != null) {
logger.debug("DefaultTyping.start(); already typing, resetting timer")
typingJob?.cancel()
startTypingTimer()
} else {
startTypingTimer()
channel.presence.enterClientCoroutine(clientId)
}
}.join()
}

override suspend fun stop() {
TODO("Not yet implemented")
logger.trace("DefaultTyping.stop()")
typingScope.launch {
typingJob?.cancel()
channel.presence.leaveClientCoroutine(clientId)
}.join()
}

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
TODO("Not yet implemented")
}

fun release() {
typingScope.cancel()
}

private fun startTypingTimer() {
val timeout = options?.timeoutMs ?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Typing options hasn't been initialized",
ErrorCodes.BadRequest,
),
)
logger.trace("DefaultTyping.startTypingTimer()")
typingJob = typingScope.launch {
delay(timeout)
logger.debug("DefaultTyping.startTypingTimer(); timeout expired")
stop()
}
}

private suspend fun processEvent() {
var numRetries = 0
while (numRetries <= PRESENCE_GET_MAX_RETRIES) {
try {
val currentlyTyping = get()
emit(currentlyTyping)
return // Exit if successful
} catch (e: Exception) {
numRetries++
val delayDuration = min(
PRESENCE_GET_RETRY_MAX_INTERVAL_MS,
PRESENCE_GET_RETRY_INTERVAL_MS * 2.0.pow(numRetries).toLong(),
)
logger.debug("Retrying in $delayDuration ms... (Attempt $numRetries of $PRESENCE_GET_MAX_RETRIES)", e)
delay(delayDuration)
}
}
logger.error("Failed to get members after $PRESENCE_GET_MAX_RETRIES retries")
}

private fun emit(currentlyTyping: Set<String>) {
if (lastTyping == currentlyTyping) return
lastTyping = currentlyTyping
listeners.forEach {
it.onEvent(TypingEvent(currentlyTyping))
}
}
}
Loading

0 comments on commit 0067565

Please sign in to comment.