Skip to content

Commit

Permalink
Refactor account processing implementation to be more efficient (#553)
Browse files Browse the repository at this point in the history
* fix proccess accounts function to be more effecient

* add change log

* fix lints
  • Loading branch information
nullpointer0x00 authored Oct 2, 2024
1 parent fc1d8b6 commit db73ab3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* Removes the logic for calculating `running_count` and `total_count` in the `missed_blocks` [#548](https://github.com/provenance-io/explorer-service/pull/548)
* Remove update block latency procedure call [#550](https://github.com/provenance-io/explorer-service/pull/550)
* Docker images at tag `latest` for main branch merges [#551](https://github.com/provenance-io/explorer-service/pull/551)
* Refactor account processing implementation to be more efficient [#552](https://github.com/provenance-io/explorer-service/issues/552)

## [v5.11.0](https://github.com/provenance-io/explorer-service/releases/tag/v5.11.0) - 2024-08-27

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ import io.provenance.explorer.service.PricingService
import io.provenance.explorer.service.TokenService
import io.provenance.explorer.service.ValidatorService
import io.provenance.explorer.service.getBlock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList
Expand Down Expand Up @@ -445,32 +439,23 @@ class ScheduledTaskService(
}

@Scheduled(initialDelay = 5000L, fixedDelay = 5000L)
fun startAccountProcess() = runBlocking {
ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT)
val producer = startAccountProcess()
repeat(5) { accountProcessor(producer) }
fun startAccountProcess() {
processAccountRecords()
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.startAccountProcess() = produce {
while (true) {
ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT).firstOrNull()?.let {
try {
transaction { it.apply { this.processing = true } }
send(it.processValue)
} catch (_: Exception) {
}
fun processAccountRecords() {
ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT)
val records = ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT)
for (record in records) {
try {
transaction { record.apply { this.processing = true } }
runBlocking { accountService.updateTokenCounts(record.processValue) }
ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, record.processValue)
} catch (_: Exception) {
}
}
}

fun CoroutineScope.accountProcessor(channel: ReceiveChannel<String>) = launch(Dispatchers.IO) {
for (msg in channel) {
accountService.updateTokenCounts(msg)
ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, msg)
}
}

@Scheduled(cron = "0 0 0 * * *") // Every beginning of every day
fun calculateValidatorMetrics() {
val (year, quarter) = DateTime.now().minusMinutes(5).let { it.year to it.monthOfYear.monthToQuarter() }
Expand Down

0 comments on commit db73ab3

Please sign in to comment.