diff --git a/database/src/main/resources/db/migration/V1_91__Create_tx_processing_failures_table.sql b/database/src/main/resources/db/migration/V1_91__Create_tx_processing_failures_table.sql new file mode 100644 index 00000000..d09d275e --- /dev/null +++ b/database/src/main/resources/db/migration/V1_91__Create_tx_processing_failures_table.sql @@ -0,0 +1,13 @@ +SELECT 'Create tx_processing_failure table' AS comment; + +CREATE TABLE IF NOT EXISTS tx_processing_failure ( + id SERIAL PRIMARY KEY, + block_height INT NOT NULL, + tx_hash VARCHAR(128) NOT NULL, + process_type VARCHAR(64) NOT NULL, + failure_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + error_message TEXT DEFAULT NULL, + retried BOOLEAN NOT NULL DEFAULT FALSE, + success BOOLEAN NOT NULL DEFAULT FALSE, + UNIQUE (block_height, tx_hash, process_type) +); diff --git a/service/src/main/kotlin/io/provenance/explorer/domain/entities/Blocks.kt b/service/src/main/kotlin/io/provenance/explorer/domain/entities/Blocks.kt index e51ecf02..68219c6c 100644 --- a/service/src/main/kotlin/io/provenance/explorer/domain/entities/Blocks.kt +++ b/service/src/main/kotlin/io/provenance/explorer/domain/entities/Blocks.kt @@ -212,8 +212,7 @@ class BlockProposerRecord(id: EntityID) : IntEntity(id) { fun getRecordsForProposer(address: String, limit: Int) = transaction { BlockProposerRecord.find { - (BlockProposerTable.proposerOperatorAddress eq address) and - (BlockProposerTable.blockLatency.isNotNull()) + (BlockProposerTable.proposerOperatorAddress eq address) and (BlockProposerTable.blockLatency.isNotNull()) }.orderBy(Pair(BlockProposerTable.blockHeight, SortOrder.DESC)) .limit(limit) .toList() @@ -353,13 +352,14 @@ class BlockCacheHourlyTxCountsRecord(id: EntityID) : Entity( BlockCacheHourlyTxCountsTable.slice(txSum).selectAll().map { it[txSum] }.first()!! } - fun getTxCountsForParams(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) = transaction { - when (granularity) { - DAY, MONTH -> getGranularityCounts(fromDate, toDate, granularity) - HOUR -> getHourlyCounts(fromDate, toDate) - MINUTE -> emptyList() + fun getTxCountsForParams(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) = + transaction { + when (granularity) { + DAY, MONTH -> getGranularityCounts(fromDate, toDate, granularity) + HOUR -> getHourlyCounts(fromDate, toDate) + MINUTE -> emptyList() + } } - } fun getTxHeatmap(fromDate: DateTime? = null, toDate: DateTime? = null) = transaction { val blockTimestamp = BlockCacheHourlyTxCountsTable.blockTimestamp @@ -399,22 +399,23 @@ class BlockCacheHourlyTxCountsRecord(id: EntityID) : Entity( TxHeatmapRes(result, dayTotals, hourTotals) } - private fun getGranularityCounts(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) = transaction { - val dateTrunc = DateTrunc(granularity.name, BlockCacheHourlyTxCountsTable.blockTimestamp) - val txSum = BlockCacheHourlyTxCountsTable.txCount.sum() - BlockCacheHourlyTxCountsTable.slice(dateTrunc, txSum) - .select { - dateTrunc.between(fromDate.startOfDay(), toDate.startOfDay()) - } - .groupBy(dateTrunc) - .orderBy(dateTrunc, SortOrder.DESC) - .map { - TxHistory( - it[dateTrunc]!!.withZone(DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss"), - it[txSum]!! - ) - } - } + private fun getGranularityCounts(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) = + transaction { + val dateTrunc = DateTrunc(granularity.name, BlockCacheHourlyTxCountsTable.blockTimestamp) + val txSum = BlockCacheHourlyTxCountsTable.txCount.sum() + BlockCacheHourlyTxCountsTable.slice(dateTrunc, txSum) + .select { + dateTrunc.between(fromDate.startOfDay(), toDate.startOfDay()) + } + .groupBy(dateTrunc) + .orderBy(dateTrunc, SortOrder.DESC) + .map { + TxHistory( + it[dateTrunc]!!.withZone(DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss"), + it[txSum]!! + ) + } + } private fun getHourlyCounts(fromDate: DateTime, toDate: DateTime) = transaction { BlockCacheHourlyTxCountsRecord.find { @@ -534,18 +535,14 @@ class BlockTxRetryRecord(id: EntityID) : IntEntity(id) { it[this.height] = height it[this.retried] = true it[this.success] = false - it[this.errorBlock] = - "NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " + - e.stackTraceToString() + it[this.errorBlock] = "NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " + e.stackTraceToString() } } fun insertNonBlockingRetry(height: Int, e: Exception) = transaction { BlockTxRetryTable.insertIgnore { it[this.height] = height - it[this.errorBlock] = - "NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " + - e.stackTraceToString() + it[this.errorBlock] = "NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " + e.stackTraceToString() } } @@ -582,3 +579,70 @@ class BlockTxRetryRecord(id: EntityID) : IntEntity(id) { var success by BlockTxRetryTable.success var errorBlock by BlockTxRetryTable.errorBlock } + +object TxProcessingFailureTable : IdTable(name = "tx_processing_failure") { + val blockHeight = integer("block_height") + val txHash = varchar("tx_hash", 128) + val processType = varchar("process_type", 64) + val failureTime = datetime("failure_time") + val errorMessage = text("error_message").nullable() + val retried = bool("retried").default(false) + val success = bool("success").default(false) + + override val id = integer("id").entityId() + + init { + index(true, blockHeight, txHash, processType) + } +} + +class TxProcessingFailureRecord(id: EntityID) : IntEntity(id) { + companion object : IntEntityClass(TxProcessingFailureTable) { + + fun insertOrUpdate( + blockHeight: Int, + txHash: String, + processType: String, + errorMessage: String?, + success: Boolean + ) = transaction { + val existingRecord = TxProcessingFailureRecord.find { + (TxProcessingFailureTable.blockHeight eq blockHeight) and + (TxProcessingFailureTable.txHash eq txHash) and + (TxProcessingFailureTable.processType eq processType) + }.firstOrNull() + + if (existingRecord == null) { + TxProcessingFailureTable.insertIgnore { + it[this.blockHeight] = blockHeight + it[this.txHash] = txHash + it[this.processType] = processType + it[this.errorMessage] = errorMessage + it[this.success] = success + } + } else { + existingRecord.apply { + this.errorMessage = errorMessage + this.success = success + this.retried = true + this.failureTime = DateTime.now() + } + } + } + + fun deleteProcessedRecords() = transaction { + TxProcessingFailureTable.deleteWhere { + (TxProcessingFailureTable.retried eq true) and + (TxProcessingFailureTable.success eq true) + } + } + } + + var blockHeight by TxProcessingFailureTable.blockHeight + var txHash by TxProcessingFailureTable.txHash + var processType by TxProcessingFailureTable.processType + var failureTime by TxProcessingFailureTable.failureTime + var errorMessage by TxProcessingFailureTable.errorMessage + var retried by TxProcessingFailureTable.retried + var success by TxProcessingFailureTable.success +} diff --git a/service/src/main/kotlin/io/provenance/explorer/service/ExplorerService.kt b/service/src/main/kotlin/io/provenance/explorer/service/ExplorerService.kt index 72a8befe..db7d319c 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/ExplorerService.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/ExplorerService.kt @@ -69,7 +69,7 @@ import io.provenance.explorer.model.base.DateTruncGranularity import io.provenance.explorer.model.base.PREFIX_SCOPE import io.provenance.explorer.model.base.PagedResults import io.provenance.explorer.model.base.USD_UPPER -import io.provenance.explorer.service.async.AsyncCachingV2 +import io.provenance.explorer.service.async.BlockAndTxProcessor import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking @@ -88,7 +88,7 @@ class ExplorerService( private val blockService: BlockService, private val validatorService: ValidatorService, private val assetService: AssetService, - private val asyncV2: AsyncCachingV2, + private val asyncV2: BlockAndTxProcessor, private val govClient: GovGrpcClient, private val accountClient: AccountGrpcClient, private val ibcClient: IbcGrpcClient, diff --git a/service/src/main/kotlin/io/provenance/explorer/service/TransactionService.kt b/service/src/main/kotlin/io/provenance/explorer/service/TransactionService.kt index 8038e512..ac11f7e7 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/TransactionService.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/TransactionService.kt @@ -54,7 +54,7 @@ import io.provenance.explorer.model.base.isMAddress import io.provenance.explorer.model.base.toMAddress import io.provenance.explorer.model.base.toMAddressScope import io.provenance.explorer.model.download.TxHistoryChartData -import io.provenance.explorer.service.async.AsyncCachingV2 +import io.provenance.explorer.service.async.BlockAndTxProcessor import io.provenance.explorer.service.async.getAddressType import org.jetbrains.exposed.dao.id.EntityID import org.jetbrains.exposed.sql.SizedIterable @@ -68,7 +68,7 @@ import javax.servlet.ServletOutputStream @Service class TransactionService( private val protoPrinter: JsonFormat.Printer, - private val asyncV2: AsyncCachingV2, + private val asyncV2: BlockAndTxProcessor, private val nftService: NftService, private val valService: ValidatorService, private val ibcService: IbcService diff --git a/service/src/main/kotlin/io/provenance/explorer/service/async/AsyncCachingV2.kt b/service/src/main/kotlin/io/provenance/explorer/service/async/BlockAndTxProcessor.kt similarity index 97% rename from service/src/main/kotlin/io/provenance/explorer/service/async/AsyncCachingV2.kt rename to service/src/main/kotlin/io/provenance/explorer/service/async/BlockAndTxProcessor.kt index b632670c..f31ff861 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/async/AsyncCachingV2.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/async/BlockAndTxProcessor.kt @@ -35,6 +35,7 @@ import io.provenance.explorer.domain.entities.TxMessageRecord import io.provenance.explorer.domain.entities.TxMsgTypeSubtypeRecord import io.provenance.explorer.domain.entities.TxMsgTypeSubtypeTable import io.provenance.explorer.domain.entities.TxNftJoinRecord +import io.provenance.explorer.domain.entities.TxProcessingFailureRecord import io.provenance.explorer.domain.entities.TxSingleMessageCacheRecord import io.provenance.explorer.domain.entities.TxSmCodeRecord import io.provenance.explorer.domain.entities.TxSmContractRecord @@ -124,7 +125,7 @@ import org.joda.time.DateTime import org.springframework.stereotype.Service @Service -class AsyncCachingV2( +class BlockAndTxProcessor( private val txClient: TransactionGrpcClient, private val blockService: BlockService, private val validatorService: ValidatorService, @@ -139,7 +140,7 @@ class AsyncCachingV2( private val groupService: GroupService ) { - protected val logger = logger(AsyncCachingV2::class) + protected val logger = logger(BlockAndTxProcessor::class) protected var chainId: String = "" @@ -254,11 +255,11 @@ class AsyncCachingV2( if (pullFromDb) { transaction { TxCacheRecord.findByHeight(blockHeight) - .map { addTxToCacheWithTimestamp(it.txV2, blockTime, proposerRec) } + .map { processAndSaveTransactionData(it.txV2, blockTime.toDateTime(), proposerRec) } } } else { runBlocking { txClient.getTxsByHeight(blockHeight, txCount) } - .map { addTxToCacheWithTimestamp(it, blockTime, proposerRec) } + .map { processAndSaveTransactionData(it, blockTime.toDateTime(), proposerRec) } } } catch (e: Exception) { logger.error("Failed to retrieve transactions at block: $blockHeight error: ${e.message}", e) @@ -266,15 +267,7 @@ class AsyncCachingV2( listOf() } - fun addTxToCacheWithTimestamp( - res: ServiceOuterClass.GetTxResponse, - blockTime: Timestamp, - proposerRec: BlockProposer - ) = - addTxToCache(res, blockTime.toDateTime(), proposerRec) - - // Function that saves all the things under a transaction - fun addTxToCache( + fun processAndSaveTransactionData( res: ServiceOuterClass.GetTxResponse, blockTime: DateTime, proposerRec: BlockProposer @@ -282,17 +275,31 @@ class AsyncCachingV2( val tx = TxCacheRecord.buildInsert(res, blockTime) val txUpdate = TxUpdate(tx) val txInfo = TxData(proposerRec.blockHeight, null, res.txResponse.txhash, blockTime) + + // TODO: See: https://github.com/provenance-io/explorer-service/issues/538 saveMessages(txInfo, res, txUpdate) saveTxFees(res, txInfo, txUpdate, proposerRec) val addrs = saveAddresses(txInfo, res, txUpdate) val markers = saveMarkers(txInfo, res, txUpdate) saveNftData(txInfo, res, txUpdate) saveGovData(res, txInfo, txUpdate) - saveIbcChannelData(res, txInfo, txUpdate) + try { + saveIbcChannelData(res, txInfo, txUpdate) + } catch (e: Exception) { + logger.error("Failed to process IBC channel data for tx ${txInfo.txHash} at height ${txInfo.blockHeight}. Error: ${e.message}") + TxProcessingFailureRecord.insertOrUpdate( + txInfo.blockHeight, + txInfo.txHash, + "ibc_channel_data", + e.stackTraceToString(), + false + ) + } saveSmartContractData(res, txInfo, txUpdate) saveNameData(res, txInfo) groupService.saveGroups(res, txInfo, txUpdate) saveSignaturesTx(res, txInfo, txUpdate) + return TxUpdatedItems(addrs, markers, txUpdate) } diff --git a/service/src/main/kotlin/io/provenance/explorer/service/async/AsyncService.kt b/service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt similarity index 98% rename from service/src/main/kotlin/io/provenance/explorer/service/async/AsyncService.kt rename to service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt index cfb6c18e..845b7e08 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/async/AsyncService.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/async/ScheduledTaskService.kt @@ -88,12 +88,12 @@ import java.time.ZoneOffset import javax.annotation.PostConstruct @Service -class AsyncService( +class ScheduledTaskService( private val props: ExplorerProperties, private val blockService: BlockService, private val assetService: AssetService, private val govService: GovService, - private val asyncCache: AsyncCachingV2, + private val blockAndTxProcessor: BlockAndTxProcessor, private val explorerService: ExplorerService, private val cacheService: CacheService, private val tokenService: TokenService, @@ -103,7 +103,7 @@ class AsyncService( private val metricsService: MetricsService ) { - protected val logger = logger(AsyncService::class) + protected val logger = logger(ScheduledTaskService::class) protected var collectHistorical = true @PostConstruct @@ -135,7 +135,7 @@ class AsyncService( shouldContinue = false return } - asyncCache.saveBlockEtc(it) + blockAndTxProcessor.saveBlockEtc(it) indexHeight = it.block.height() - 1 } blockService.updateBlockMinHeightIndex(indexHeight + 1) @@ -144,7 +144,7 @@ class AsyncService( } else { while (indexHeight > index.first!!) { blockService.getBlockAtHeightFromChain(indexHeight)?.let { - asyncCache.saveBlockEtc(it) + blockAndTxProcessor.saveBlockEtc(it) indexHeight = it.block.height() - 1 } } @@ -246,7 +246,7 @@ class AsyncService( logger.info("Retrying block/tx record at $height.") var retryException: Exception? = null val block = try { - asyncCache.saveBlockEtc(blockService.getBlockAtHeightFromChain(height), Pair(true, false))!! + blockAndTxProcessor.saveBlockEtc(blockService.getBlockAtHeightFromChain(height), Pair(true, false))!! } catch (e: Exception) { retryException = e logger.error("Error saving block $height on retry.", e) @@ -409,7 +409,7 @@ class AsyncService( (startBlock.toInt()..minOf(props.oneElevenBugRange()!!.last, startBlock.toInt().plus(100))).toList() .let { BlockCacheRecord.getBlocksForRange(it.first(), it.last()) } .forEach { block -> - if (block.txCount > 0) asyncCache.saveBlockEtc(block.block, Pair(true, false)) + if (block.txCount > 0) blockAndTxProcessor.saveBlockEtc(block.block, Pair(true, false)) // Check if the last processed block equals the end of the fee bug range if (block.height == props.oneElevenBugRange()!!.last) { cacheService.updateCacheValue(CacheKeys.FEE_BUG_ONE_ELEVEN_START_BLOCK.key, done) diff --git a/service/src/main/kotlin/io/provenance/explorer/service/utility/MigrationService.kt b/service/src/main/kotlin/io/provenance/explorer/service/utility/MigrationService.kt index c6f4294e..75c0e807 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/utility/MigrationService.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/utility/MigrationService.kt @@ -6,14 +6,14 @@ import io.provenance.explorer.domain.entities.BlockCacheTable import io.provenance.explorer.service.AccountService import io.provenance.explorer.service.BlockService import io.provenance.explorer.service.ValidatorService -import io.provenance.explorer.service.async.AsyncCachingV2 +import io.provenance.explorer.service.async.BlockAndTxProcessor import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.transactions.transaction import org.springframework.stereotype.Service @Service class MigrationService( - private val asyncCaching: AsyncCachingV2, + private val asyncCaching: BlockAndTxProcessor, private val validatorService: ValidatorService, private val accountService: AccountService, private val blockService: BlockService diff --git a/service/src/main/kotlin/io/provenance/explorer/service/utility/UtilityService.kt b/service/src/main/kotlin/io/provenance/explorer/service/utility/UtilityService.kt index e4323bf9..6f0bc22f 100644 --- a/service/src/main/kotlin/io/provenance/explorer/service/utility/UtilityService.kt +++ b/service/src/main/kotlin/io/provenance/explorer/service/utility/UtilityService.kt @@ -18,7 +18,7 @@ import io.provenance.explorer.domain.models.explorer.BlockProposer import io.provenance.explorer.domain.models.explorer.getCategoryForType import io.provenance.explorer.grpc.v1.AccountGrpcClient import io.provenance.explorer.service.AssetService -import io.provenance.explorer.service.async.AsyncCachingV2 +import io.provenance.explorer.service.async.BlockAndTxProcessor import io.provenance.explorer.service.firstMatchLabel import kotlinx.coroutines.runBlocking import net.pearx.kasechange.toSnakeCase @@ -33,7 +33,7 @@ class UtilityService( private val protoParser: JsonFormat.Parser, private val accountClient: AccountGrpcClient, private val assetService: AssetService, - private val async: AsyncCachingV2 + private val async: BlockAndTxProcessor ) { protected val logger = logger(UtilityService::class) @@ -96,7 +96,7 @@ class UtilityService( fun parseRawTxJson(rawJson: String, blockHeight: Int = 1, timestamp: DateTime = DateTime.now()) = transaction { val builder = ServiceOuterClass.GetTxResponse.newBuilder() protoParser.ignoringUnknownFields().merge(rawJson, builder) - async.addTxToCache(builder.build(), DateTime.now(), BlockProposer(blockHeight, "", timestamp)) + async.processAndSaveTransactionData(builder.build(), DateTime.now(), BlockProposer(blockHeight, "", timestamp)) } fun saveRawTxJson(rawJson: String, blockHeight: Int = 1, timestamp: DateTime = DateTime.now()) = transaction { diff --git a/service/src/test/kotlin/io/provenance/explorer/domain/entities/TxProcessingFailuresTableTest.kt b/service/src/test/kotlin/io/provenance/explorer/domain/entities/TxProcessingFailuresTableTest.kt new file mode 100644 index 00000000..0d3d8092 --- /dev/null +++ b/service/src/test/kotlin/io/provenance/explorer/domain/entities/TxProcessingFailuresTableTest.kt @@ -0,0 +1,68 @@ +import io.provenance.explorer.domain.entities.TxProcessingFailureRecord +import io.provenance.explorer.domain.entities.TxProcessingFailureTable +import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.transaction +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test + +class TxProcessingFailureRecordTest { + + @Test + fun `test tx_processing_failures table insertOrUpdate`() { + Database.connect("jdbc:h2:mem:test;MODE=PostgreSQL;DB_CLOSE_DELAY=-1;", driver = "org.h2.Driver") + + transaction { + val sql = this::class.java.getResource("/db/migration/V1_91__Create_tx_processing_failures_table.sql")!! + .readText() + exec(sql) + } + + transaction { + TxProcessingFailureRecord.insertOrUpdate( + blockHeight = 100, + txHash = "testHash", + processType = "testProcess", + errorMessage = "testError", + success = false + ) + + var record = TxProcessingFailureRecord.find { + (TxProcessingFailureTable.blockHeight eq 100) and + (TxProcessingFailureTable.txHash eq "testHash") and + (TxProcessingFailureTable.processType eq "testProcess") + }.firstOrNull() + + assertNotNull(record, "Record should not be null") + assertEquals(100, record?.blockHeight) + assertEquals("testHash", record?.txHash) + assertEquals("testProcess", record?.processType) + assertEquals("testError", record?.errorMessage) + assertEquals(false, record?.success) + } + + transaction { + TxProcessingFailureRecord.insertOrUpdate( + blockHeight = 100, + txHash = "testHash", + processType = "testProcess", + errorMessage = "updatedError", + success = true + ) + + val record = TxProcessingFailureRecord.find { + (TxProcessingFailureTable.blockHeight eq 100) and + (TxProcessingFailureTable.txHash eq "testHash") and + (TxProcessingFailureTable.processType eq "testProcess") + }.firstOrNull() + + assertNotNull(record, "Record should not be null") + assertEquals(100, record?.blockHeight) + assertEquals("testHash", record?.txHash) + assertEquals("testProcess", record?.processType) + assertEquals("updatedError", record?.errorMessage) + assertEquals(true, record?.success) + } + } +}