From 645a5d58c86e6a25112a13493eede39b1850f873 Mon Sep 17 00:00:00 2001 From: koalasat Date: Mon, 28 Oct 2024 22:36:31 +0100 Subject: [PATCH 1/2] Fix channel recovery and add events queue --- .../pokey/service/NotificationsService.kt | 164 ++++++++++-------- 1 file changed, 91 insertions(+), 73 deletions(-) diff --git a/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt b/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt index 929ecf3..0b399db 100644 --- a/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt +++ b/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt @@ -37,12 +37,36 @@ import com.vitorpamplona.quartz.events.EventInterface import java.time.Instant import java.util.Timer import java.util.TimerTask +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingQueue +import kotlin.concurrent.thread import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.delay import kotlinx.coroutines.launch +class QueueSystem(private val onProcess: (Event) -> Unit) { + private val queue = LinkedBlockingQueue() + private val processedEvents = ConcurrentHashMap.newKeySet() + + init { + thread(start = true) { + while (true) { + val event = queue.take() + // Check for duplication + if (processedEvents.add(event.id)) { + onProcess(event) + } + } + } + } + + fun add(event: Event) { + queue.offer(event) + } +} + class NotificationsService : Service() { private var channelRelaysId = "RelaysConnections" private var channelNotificationsId = "Notifications" @@ -51,16 +75,6 @@ class NotificationsService : Service() { private var subscriptionInboxId = "inboxRelays" private var subscriptionReadId = "readRelays" - private var receivedEventsCache = mutableSetOf() - private var defaultRelayUrls = listOf( - "wss://relay.damus.io", - "wss://offchain.pub", - "wss://relay.snort.social", - "wss://nos.lol", - "wss://relay.nsec.app", - "wss://relay.0xchat.com", - ) - private val timer = Timer() private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) @@ -91,12 +105,8 @@ class NotificationsService : Service() { relay: Relay, afterEOSE: Boolean, ) { - if (receivedEventsCache.contains(event.id)) return Log.d("Pokey", "Relay Event: ${relay.url} - $subscriptionId - ${event.toJson()}") - receivedEventsCache.add(event.id) - if (subscriptionId == subscriptionNotificationId) { - createNoteNotification(event) - } + eventsQueue.add(event) } override fun onNotify(relay: Relay, description: String) { @@ -156,6 +166,16 @@ class NotificationsService : Service() { } } + private var defaultRelayUrls = listOf( + "wss://relay.damus.io", + "wss://offchain.pub", + "wss://relay.snort.social", + "wss://nos.lol", + "wss://relay.nsec.app", + "wss://relay.0xchat.com", + ) + private var eventsQueue = QueueSystem { createNoteNotification(it) } + override fun onBind(intent: Intent): IBinder { return null!! } @@ -170,9 +190,13 @@ class NotificationsService : Service() { override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { Log.d("Pokey", "Starting foreground service...") + RelayPool.getAll().forEach { RelayPool.removeRelay(it) } startForeground(1, createNotification()) - startSubscription() - keepAlive() + CoroutineScope(Dispatchers.IO).launch { + connectRelays() + startSubscription() + keepAlive() + } val connectivityManager = (getSystemService(ConnectivityManager::class.java) as ConnectivityManager) @@ -183,8 +207,7 @@ class NotificationsService : Service() { override fun onDestroy() { timer.cancel() - stopSubscription() - RelayPool.disconnect() + RelayPool.getAll().forEach { RelayPool.removeRelay(it) } try { val connectivityManager = @@ -201,53 +224,50 @@ class NotificationsService : Service() { val hexKey = Pokey.getInstance().getHexKey() if (hexKey.isEmpty()) return - CoroutineScope(Dispatchers.IO).launch { - if (!Client.isSubscribed(clientListener)) Client.subscribe(clientListener) + if (!Client.isSubscribed(clientListener)) Client.subscribe(clientListener) - val dao = AppDatabase.getDatabase(this@NotificationsService, hexKey).applicationDao() - var latestNotification = dao.getLatestNotification() - if (latestNotification == null) latestNotification = Instant.now().toEpochMilli() / 1000 + val dao = AppDatabase.getDatabase(this@NotificationsService, hexKey).applicationDao() + var latestNotification = dao.getLatestNotification() + if (latestNotification == null) latestNotification = Instant.now().toEpochMilli() / 1000 - connectRelays() - Client.sendFilter( - subscriptionNotificationId, - listOf( - TypedFilter( - types = COMMON_FEED_TYPES, - filter = SincePerRelayFilter( - tags = mapOf("p" to listOf(hexKey)), - since = RelayPool.getAll().associate { it.url to EOSETime(latestNotification) }, - ), + Client.sendFilter( + subscriptionNotificationId, + listOf( + TypedFilter( + types = COMMON_FEED_TYPES, + filter = SincePerRelayFilter( + tags = mapOf("p" to listOf(hexKey)), + since = RelayPool.getAll().associate { it.url to EOSETime(latestNotification) }, ), ), - ) - Client.sendFilterAndStopOnFirstResponse( - subscriptionReadId, - listOf( - TypedFilter( - types = EVENT_FINDER_TYPES, - filter = SincePerRelayFilter( - kinds = listOf(10002), - authors = listOf(hexKey), - ), + ), + ) + Client.sendFilterAndStopOnFirstResponse( + subscriptionReadId, + listOf( + TypedFilter( + types = EVENT_FINDER_TYPES, + filter = SincePerRelayFilter( + kinds = listOf(10002), + authors = listOf(hexKey), ), ), - onResponse = { manageInboxRelays(it) }, - ) - Client.sendFilterAndStopOnFirstResponse( - subscriptionInboxId, - listOf( - TypedFilter( - types = EVENT_FINDER_TYPES, - filter = SincePerRelayFilter( - kinds = listOf(10050), - authors = listOf(hexKey), - ), + ), + onResponse = { manageInboxRelays(it) }, + ) + Client.sendFilterAndStopOnFirstResponse( + subscriptionInboxId, + listOf( + TypedFilter( + types = EVENT_FINDER_TYPES, + filter = SincePerRelayFilter( + kinds = listOf(10050), + authors = listOf(hexKey), ), ), - onResponse = { manageInboxRelays(it) }, - ) - } + ), + onResponse = { manageInboxRelays(it) }, + ) } private fun stopSubscription() { @@ -258,24 +278,21 @@ class NotificationsService : Service() { timer.schedule( object : TimerTask() { override fun run() { - receivedEventsCache.clear() - CoroutineScope(Dispatchers.IO).launch { - if (RelayPool.getAll().isEmpty()) { - connectRelays() - } - RelayPool.getAll().forEach { - if (!it.isConnected()) { - Log.d( - "Pokey", - "Relay ${it.url} is not connected, reconnecting...", - ) - it.connectAndSendFiltersIfDisconnected() - } + if (RelayPool.getAll().isEmpty()) { + connectRelays() + } + RelayPool.getAll().forEach { + if (!it.isConnected()) { + Log.d( + "Pokey", + "Relay ${it.url} is not connected, reconnecting...", + ) + it.connectAndSendFiltersIfDisconnected() } } } }, - 5000, + 0, 61000, ) } @@ -427,8 +444,9 @@ class NotificationsService : Service() { ) .setContentTitle(title) .setContentText(text) + .setWhen(event.createdAt) .setSmallIcon(R.drawable.ic_launcher_foreground) - .setPriority(NotificationCompat.PRIORITY_HIGH) + .setPriority(NotificationCompat.PRIORITY_DEFAULT) .setContentIntent(pendingIntent) .setAutoCancel(true) From d3e1e4827a2ea692ad725d3e8587aa16c4b0ea49 Mon Sep 17 00:00:00 2001 From: koalasat Date: Mon, 28 Oct 2024 22:55:43 +0100 Subject: [PATCH 2/2] Better queue --- .../pokey/service/NotificationsService.kt | 31 +++---------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt b/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt index 0b399db..6e90de0 100644 --- a/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt +++ b/app/src/main/java/com/koalasat/pokey/service/NotificationsService.kt @@ -38,35 +38,12 @@ import java.time.Instant import java.util.Timer import java.util.TimerTask import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.LinkedBlockingQueue -import kotlin.concurrent.thread import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.delay import kotlinx.coroutines.launch -class QueueSystem(private val onProcess: (Event) -> Unit) { - private val queue = LinkedBlockingQueue() - private val processedEvents = ConcurrentHashMap.newKeySet() - - init { - thread(start = true) { - while (true) { - val event = queue.take() - // Check for duplication - if (processedEvents.add(event.id)) { - onProcess(event) - } - } - } - } - - fun add(event: Event) { - queue.offer(event) - } -} - class NotificationsService : Service() { private var channelRelaysId = "RelaysConnections" private var channelNotificationsId = "Notifications" @@ -77,6 +54,7 @@ class NotificationsService : Service() { private val timer = Timer() private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private val processedEvents = ConcurrentHashMap() private val clientListener = object : Client.Listener { @@ -105,8 +83,10 @@ class NotificationsService : Service() { relay: Relay, afterEOSE: Boolean, ) { - Log.d("Pokey", "Relay Event: ${relay.url} - $subscriptionId - ${event.toJson()}") - eventsQueue.add(event) + if (processedEvents.putIfAbsent(event.id, true) == null) { + Log.d("Pokey", "Relay Event: ${relay.url} - $subscriptionId - ${event.toJson()}") + createNoteNotification(event) + } } override fun onNotify(relay: Relay, description: String) { @@ -174,7 +154,6 @@ class NotificationsService : Service() { "wss://relay.nsec.app", "wss://relay.0xchat.com", ) - private var eventsQueue = QueueSystem { createNoteNotification(it) } override fun onBind(intent: Intent): IBinder { return null!!