diff --git a/CHANGELOG.md b/CHANGELOG.md index 03033ba3..0750fef1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * Removes the logic for calculating `running_count` and `total_count` in the `missed_blocks` [#548](https://github.com/provenance-io/explorer-service/pull/548) * Remove update block latency procedure call [#550](https://github.com/provenance-io/explorer-service/pull/550) * Docker images at tag `latest` for main branch merges [#551](https://github.com/provenance-io/explorer-service/pull/551) +* Refactor account processing implementation to be more efficient [#552](https://github.com/provenance-io/explorer-service/issues/552) ## [v5.11.0](https://github.com/provenance-io/explorer-service/releases/tag/v5.11.0) - 2024-08-27 diff --git a/service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt b/service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt index 25f46998..334e3382 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt @@ -54,12 +54,6 @@ import io.provenance.explorer.service.PricingService import io.provenance.explorer.service.TokenService import io.provenance.explorer.service.ValidatorService import io.provenance.explorer.service.getBlock -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList @@ -445,32 +439,23 @@ class ScheduledTaskService( } @Scheduled(initialDelay = 5000L, fixedDelay = 5000L) - fun startAccountProcess() = runBlocking { - ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT) - val producer = startAccountProcess() - repeat(5) { accountProcessor(producer) } + fun startAccountProcess() { + processAccountRecords() } - @OptIn(ExperimentalCoroutinesApi::class) - fun CoroutineScope.startAccountProcess() = produce { - while (true) { - ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT).firstOrNull()?.let { - try { - transaction { it.apply { this.processing = true } } - send(it.processValue) - } catch (_: Exception) { - } + fun processAccountRecords() { + ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT) + val records = ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT) + for (record in records) { + try { + transaction { record.apply { this.processing = true } } + runBlocking { accountService.updateTokenCounts(record.processValue) } + ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, record.processValue) + } catch (_: Exception) { } } } - fun CoroutineScope.accountProcessor(channel: ReceiveChannel) = launch(Dispatchers.IO) { - for (msg in channel) { - accountService.updateTokenCounts(msg) - ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, msg) - } - } - @Scheduled(cron = "0 0 0 * * *") // Every beginning of every day fun calculateValidatorMetrics() { val (year, quarter) = DateTime.now().minusMinutes(5).let { it.year to it.monthOfYear.monthToQuarter() }