From 46930d7b66268fcecdfcdbb631cb63650110f3c0 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Mon, 30 Sep 2024 05:31:38 +0700 Subject: [PATCH 1/6] update cancel_unfreezeV2_amount definition --- tron/tron-data/src/data.ts | 5 ++++- tron/tron-normalization/src/mapping.ts | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tron/tron-data/src/data.ts b/tron/tron-data/src/data.ts index d41c0465..f509272d 100644 --- a/tron/tron-data/src/data.ts +++ b/tron/tron-data/src/data.ts @@ -101,7 +101,10 @@ export const TransactionInfo = object({ unfreeze_amount: option(ANY_NAT), internal_transactions: option(array(InternalTransaction)), withdraw_expire_amount: option(ANY_NAT), - cancel_unfreezeV2_amount: option(record(STRING, ANY_NAT)), + cancel_unfreezeV2_amount: option(array(object({ + key: STRING, + value: ANY_NAT + }))), }) diff --git a/tron/tron-normalization/src/mapping.ts b/tron/tron-normalization/src/mapping.ts index 9306bf83..539e928f 100644 --- a/tron/tron-normalization/src/mapping.ts +++ b/tron/tron-normalization/src/mapping.ts @@ -81,8 +81,8 @@ function mapTransaction(src: raw.Transaction, transactionIndex: number, info?: r } if (info?.cancel_unfreezeV2_amount) { tx.cancelUnfreezeV2Amount = {} - for (let key in info.cancel_unfreezeV2_amount) { - tx.cancelUnfreezeV2Amount[key] = BigInt(info.cancel_unfreezeV2_amount[key]) + for (let obj of info?.cancel_unfreezeV2_amount) { + tx.cancelUnfreezeV2Amount[obj.key] = BigInt(obj.value) } } From 77f14c6fcf9c630f8167bb6dfb7942e6bfc7b3d1 Mon Sep 17 00:00:00 2001 From: belopash Date: Sun, 29 Sep 2024 20:36:35 +0500 Subject: [PATCH 2/6] add hot blocks to tron --- tron/tron-data/src/data-source.ts | 147 ++++++++++++++++++++-- tron/tron-data/src/data.ts | 2 + tron/tron-normalization/src/mapping.ts | 2 +- tron/tron-stream/src/data/data-partial.ts | 7 ++ tron/tron-stream/src/gateway/source.ts | 10 +- tron/tron-stream/src/http/source.ts | 42 +++++-- tron/tron-stream/src/source.ts | 30 +++-- 7 files changed, 203 insertions(+), 37 deletions(-) diff --git a/tron/tron-data/src/data-source.ts b/tron/tron-data/src/data-source.ts index 61894e81..d51f6a98 100644 --- a/tron/tron-data/src/data-source.ts +++ b/tron/tron-data/src/data-source.ts @@ -1,6 +1,6 @@ -import {Batch, coldIngest} from '@subsquid/util-internal-ingest-tools' -import {RangeRequest, SplitRequest} from '@subsquid/util-internal-range' -import {assertNotNull} from '@subsquid/util-internal' +import {Batch, BlockConsistencyError, BlockRef, coldIngest, HotProcessor, HotUpdate, isDataConsistencyError, trimInvalid} from '@subsquid/util-internal-ingest-tools' +import {getRequestAt, mapRangeRequestList, rangeEnd, RangeRequest, splitRange, splitRangeByRequest, SplitRequest} from '@subsquid/util-internal-range' +import {assertNotNull, AsyncQueue, last, maybeLast, Throttler, wait} from '@subsquid/util-internal' import assert from 'assert' import {BlockData, TransactionInfo} from './data' import {HttpApi} from './http' @@ -45,9 +45,14 @@ export class HttpDataSource { } async getFinalizedHeight(): Promise { + let height = await this.getHeight() + return height - this.finalityConfirmation + } + + async getHeight(): Promise { let block = await this.httpApi.getNowBlock() let number = assertNotNull(block.block_header.raw_data.number) - return number - this.finalityConfirmation + return number } getFinalizedBlocks( @@ -56,7 +61,7 @@ export class HttpDataSource { ): AsyncIterable> { return coldIngest({ getFinalizedHeight: () => this.getFinalizedHeight(), - getSplit: (req) => this.getSplit(req), + getSplit: (req) => this.getColdSplit(req), requests, concurrency: this.strideConcurrency, splitSize: this.strideSize, @@ -65,6 +70,103 @@ export class HttpDataSource { }) } + async *getHotBlocks( + requests: RangeRequest[], + stopOnHead?: boolean + ): AsyncIterable> { + if (requests.length == 0) return + + let self = this + + let from = requests[0].range.from + let block = await this.getBlockHeader(from) + + let queue = new AsyncQueue>(1) + + let proc = new HotProcessor( + { + height: from, + hash: block.blockID, + top: [], + }, + { + process: async (update) => { await queue.put(update)}, + getBlock: async (ref) => { + let req = getRequestAt(requests, ref.height) || {} + let block = await this.getBlock(ref.height, !!req.transactionsInfo) + if (block.hash !== ref.hash) { + throw new BlockConsistencyError({hash: ref.hash}) + } + await this.addRequestedData([block], req) + if (block._isInvalid) { + throw new BlockConsistencyError(block, block._errorMessage) + } + return block + }, + async *getBlockRange(from: number, to: BlockRef): AsyncIterable { + assert(to.height != null) + if (from > to.height) { + from = to.height + } + for (let split of splitRangeByRequest(requests, {from, to: to.height})) { + let request = split.request || {} + for (let range of splitRange(10, split.range)) { + let blocks = await self.getHotSplit({ + range, + request, + finalizedHeight: proc.getFinalizedHeight(), + }) + let lastBlock = maybeLast(blocks)?.height ?? range.from - 1 + yield blocks + if (lastBlock < range.to) { + throw new BlockConsistencyError({height: lastBlock + 1}) + } + } + } + }, + getHeader(block) { + return { + hash: block.hash, + height: block.height, + parentHash: block.block.block_header.raw_data.parentHash, + } + }, + } + ) + + let isEnd = () => proc.getFinalizedHeight() >= rangeEnd(last(requests).range) + + let prev = -1 + let height = new Throttler(() => this.getHeight(), this.headPollInterval) + while (!isEnd()) { + let next = await height.call() + if (next <= prev) continue + prev = next + for (let i = 0; i < 100; i++) { + try { + await proc.goto({ + best: {height: next}, + finalized: { + height: Math.max(next - this.finalityConfirmation, 0) + } + }) + + for await (let update of queue.iterate()) { + yield update + } + + break + } catch(err: any) { + if (isDataConsistencyError(err)) { + await wait(100) + } else { + throw err + } + } + } + } + } + private async getBlock(num: number, detail: boolean): Promise { let block = await this.httpApi.getBlock(num, detail) return { @@ -105,11 +207,38 @@ export class HttpDataSource { await Promise.all(promises) } - private async getSplit(req: SplitRequest): Promise { + private async getColdSplit(req: SplitRequest): Promise { let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions) - if (req.request.transactionsInfo) { - this.addTransactionsInfo(blocks) - } + return blocks } + + private async getHotSplit(req: SplitRequest & {finalizedHeight: number}): Promise { + let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions) + + let chain: BlockData[] = [] + + for (let i = 0; i < blocks.length; i++) { + let block = blocks[i] + if (block == null) break + if (i > 0 && chain[i - 1].hash !== block.block.block_header.raw_data.parentHash) break + chain.push(block) + } + + await this.addRequestedData(chain, req.request) + + return trimInvalid(chain) + } + + private async addRequestedData(blocks: BlockData[], req: DataRequest): Promise { + if (blocks.length == 0) return + + let subtasks = [] + + if (req.transactionsInfo) { + subtasks.push(this.addTransactionsInfo(blocks)) + } + + await Promise.all(subtasks) + } } diff --git a/tron/tron-data/src/data.ts b/tron/tron-data/src/data.ts index d41c0465..53700646 100644 --- a/tron/tron-data/src/data.ts +++ b/tron/tron-data/src/data.ts @@ -165,4 +165,6 @@ export interface BlockData { hash: string block: Block transactionsInfo?: TransactionInfo[] + _isInvalid?: boolean + _errorMessage?: string } diff --git a/tron/tron-normalization/src/mapping.ts b/tron/tron-normalization/src/mapping.ts index 9306bf83..f3e4c0bb 100644 --- a/tron/tron-normalization/src/mapping.ts +++ b/tron/tron-normalization/src/mapping.ts @@ -3,7 +3,7 @@ import assert from 'assert' import {Block, BlockHeader, CallValueInfo, InternalTransaction, Log, Transaction} from './data' -function mapBlockHeader(src: raw.Block): BlockHeader { +export function mapBlockHeader(src: raw.Block): BlockHeader { return { hash: src.blockID, height: src.block_header.raw_data.number || 0, diff --git a/tron/tron-stream/src/data/data-partial.ts b/tron/tron-stream/src/data/data-partial.ts index 28c641d7..5fc7a1b0 100644 --- a/tron/tron-stream/src/data/data-partial.ts +++ b/tron/tron-stream/src/data/data-partial.ts @@ -1,5 +1,6 @@ import type * as data from '@subsquid/tron-normalization' import type {MakePartial} from './util' +import {HashAndHeight} from '@subsquid/util-internal-ingest-tools' export type BlockRequiredFields = 'height' | 'hash' @@ -20,3 +21,9 @@ export interface PartialBlock { logs?: PartialLog[] internalTransactions?: PartialInternalTransaction[] } + + +export interface BlocksData { + finalizedHead: HashAndHeight + blocks: B[] +} \ No newline at end of file diff --git a/tron/tron-stream/src/gateway/source.ts b/tron/tron-stream/src/gateway/source.ts index d58de9ff..349aa4b3 100644 --- a/tron/tron-stream/src/gateway/source.ts +++ b/tron/tron-stream/src/gateway/source.ts @@ -7,7 +7,7 @@ import {array, cast} from '@subsquid/util-internal-validation' import assert from 'assert' import {DataRequest} from '../data/data-request' import {getDataSchema} from './data-schema' -import {PartialBlock} from '../data/data-partial' +import {BlocksData, PartialBlock} from '../data/data-partial' export class TronGateway { @@ -56,7 +56,7 @@ export class TronGateway { async *getBlockStream( requests: RangeRequestList, stopOnHead?: boolean - ): AsyncIterable { + ): AsyncIterable> { let archiveRequests = mapRangeRequestList(requests, req => { let {fields, includeAllBlocks, ...items} = req let archiveItems: any = {} @@ -79,6 +79,10 @@ export class TronGateway { })) { let req = getRequestAt(requests, batch.blocks[0].header.number) + // FIXME: needs to be done during batch ingestion + let finalizedHeight = await this.getFinalizedHeight() + let finalizedHead = await this.getBlockHeader(finalizedHeight) + let blocks = cast( array(getDataSchema(assertNotNull(req?.fields))), batch.blocks @@ -90,7 +94,7 @@ export class TronGateway { } }) - yield blocks as any + yield {finalizedHead, blocks: blocks as PartialBlock[]} } } } diff --git a/tron/tron-stream/src/http/source.ts b/tron/tron-stream/src/http/source.ts index 3a8afd7f..dac33714 100644 --- a/tron/tron-stream/src/http/source.ts +++ b/tron/tron-stream/src/http/source.ts @@ -1,13 +1,13 @@ import {mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range' import { - Block, DataRequest as RawDataRequest, HttpDataSource as RawHttpDataSource } from '@subsquid/tron-data' -import {mapBlock} from '@subsquid/tron-normalization' +import {BlockHeader, mapBlock, mapBlockHeader} from '@subsquid/tron-normalization' import {DataRequest} from '../data/data-request' -import {PartialBlock} from '../data/data-partial' +import {BlocksData, PartialBlock} from '../data/data-partial' import {filterBlockBatch} from './filter' +import assert from 'assert' export class HttpDataSource { @@ -18,25 +18,41 @@ export class HttpDataSource { } async getBlockHash(height: number): Promise { - let header = await this.baseDataSource.getBlockHeader(height) - return header.blockID + let header = await this.getBlockHeader(height) + return header?.hash } - getBlockHeader(height: number): Promise { - return this.baseDataSource.getBlockHeader(height) + async getBlockHeader(height: number): Promise { + let header = await this.baseDataSource.getBlockHeader(height) + return header ? mapBlockHeader(header) : undefined } - async *getBlockStream( + async *getBlockStream(opts: { requests: RangeRequestList, stopOnHead?: boolean - ): AsyncIterable { + supportHotBlocks?: boolean + }): AsyncIterable> { for await (let batch of this.baseDataSource.getFinalizedBlocks( - mapRangeRequestList(requests, toRawDataRequest), - stopOnHead + mapRangeRequestList(opts.requests, toRawDataRequest), + !opts.supportHotBlocks || opts.stopOnHead )) { + // FIXME: needs to be done during batch ingestion + let finalizedHeight = await this.getFinalizedHeight() + let finalizedHead = await this.getBlockHeader(finalizedHeight) + assert(finalizedHead != null) + let blocks = batch.blocks.map(mapBlock) - filterBlockBatch(requests, blocks) - yield blocks + filterBlockBatch(opts.requests, blocks) + yield {finalizedHead, blocks: blocks as PartialBlock[]} + } + + for await (let data of this.baseDataSource.getHotBlocks( + mapRangeRequestList(opts.requests, toRawDataRequest), + opts.stopOnHead + )) { + let blocks = data.blocks.map(mapBlock) + filterBlockBatch(opts.requests, blocks) + yield {finalizedHead: data.finalizedHead, blocks: blocks as PartialBlock[]} } } } diff --git a/tron/tron-stream/src/source.ts b/tron/tron-stream/src/source.ts index bf323b35..b53a9050 100644 --- a/tron/tron-stream/src/source.ts +++ b/tron/tron-stream/src/source.ts @@ -23,7 +23,7 @@ import {HttpDataSource} from './http/source' import {TronGateway} from './gateway/source' import {Block, FieldSelection} from './data/model' import {getFields} from './data/fields' -import {PartialBlock} from './data/data-partial' +import {BlocksData, PartialBlock} from './data/data-partial' import { DataRequest, LogRequest, @@ -33,6 +33,7 @@ import { TriggerSmartContractTransactionRequest, InternalTransactionRequest } from './data/data-request' +import {HashAndHeight} from '@subsquid/util-internal-ingest-tools' export interface HttpApiSettings { @@ -254,11 +255,17 @@ export class DataSourceBuilder { } +export interface BlockStreamOptions { + supportHotBlocks?: boolean + range?: Range +} + + export interface DataSource { getFinalizedHeight(): Promise getBlockHash(height: number): Promise getBlocksCountInRange(range: FiniteRange): number - getBlockStream(fromBlockHeight?: number): AsyncIterable + getBlockStream(opts?: BlockStreamOptions): AsyncIterable> } @@ -323,26 +330,27 @@ class TronDataSource implements DataSource { private async performConsistencyCheck(): Promise<{ gatewayBlock: BlockHeader - httpApiBlock: RawBlock | null + httpApiBlock: BlockHeader | null } | undefined> { let gateway = this.createGateway() let height = await gateway.getFinalizedHeight() let gatewayBlock = await gateway.getBlockHeader(height) let httpApiBlock = await this.httpApi!.getBlockHeader(gatewayBlock.height) - if (httpApiBlock?.blockID === gatewayBlock.hash && httpApiBlock.block_header.raw_data.number === gatewayBlock.height) return + if (httpApiBlock?.hash === gatewayBlock.hash && httpApiBlock.height === gatewayBlock.height) return return {gatewayBlock, httpApiBlock: httpApiBlock || null} } + getBlocksCountInRange(range: FiniteRange): number { return getSize(this.ranges, range) } - async *getBlockStream(fromBlockHeight?: number): AsyncIterable { + async *getBlockStream(opts: BlockStreamOptions = {}): AsyncIterable> { await this.assertConsistency() - let requests = fromBlockHeight == null + let requests = opts.range == null ? this.requests - : applyRangeBound(this.requests, {from: fromBlockHeight}) + : applyRangeBound(this.requests, opts.range) if (requests.length == 0) return @@ -353,9 +361,9 @@ class TronDataSource implements DataSource { let height = await gateway.getFinalizedHeight() let from = requests[0].range.from if (height > from || !this.httpApi) { - for await (let batch of gateway.getBlockStream(requests, !!this.httpApi)) { - yield batch - from = last(batch).header.height + 1 + for await (let data of gateway.getBlockStream(requests, !!this.httpApi)) { + yield data + from = last(data.blocks).header.height + 1 } requests = applyRangeBound(requests, {from}) } @@ -368,7 +376,7 @@ class TronDataSource implements DataSource { assert(this.httpApi) - yield* this.httpApi.getBlockStream(requests) + yield* this.httpApi.getBlockStream({requests, supportHotBlocks: opts.supportHotBlocks}) } private createGateway(agent?: HttpAgent): TronGateway { From 2b541f173f11aa3dfe578c8955b270606c22aca1 Mon Sep 17 00:00:00 2001 From: belopash Date: Sun, 29 Sep 2024 22:47:43 +0500 Subject: [PATCH 3/6] save --- test/tron-usdt/src/main.ts | 3 +- tron/tron-data/src/data-source.ts | 18 +++++----- tron/tron-stream/src/http/source.ts | 55 ++++++++++++++++++----------- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/test/tron-usdt/src/main.ts b/test/tron-usdt/src/main.ts index cd127323..937b7a04 100644 --- a/test/tron-usdt/src/main.ts +++ b/test/tron-usdt/src/main.ts @@ -13,7 +13,8 @@ const TOPIC0 = 'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef const dataSource = new DataSourceBuilder() .setGateway('https://v2.archive.subsquid.io/network/tron-mainnet') - .setBlockRange({from: 11322942, to: 11323358}) + .setHttpApi({url: 'https://rpc.ankr.com/http/tron'}) + .setBlockRange({from: 65659767}) .addLog({ where: { address: [CONTRACT], diff --git a/tron/tron-data/src/data-source.ts b/tron/tron-data/src/data-source.ts index d51f6a98..20b404a3 100644 --- a/tron/tron-data/src/data-source.ts +++ b/tron/tron-data/src/data-source.ts @@ -72,16 +72,15 @@ export class HttpDataSource { async *getHotBlocks( requests: RangeRequest[], - stopOnHead?: boolean ): AsyncIterable> { if (requests.length == 0) return let self = this - let from = requests[0].range.from + let from = requests[0].range.from - 1 let block = await this.getBlockHeader(from) - let queue = new AsyncQueue>(1) + let queue: HotUpdate[] = [] let proc = new HotProcessor( { @@ -90,11 +89,11 @@ export class HttpDataSource { top: [], }, { - process: async (update) => { await queue.put(update)}, + process: async (update) => { queue.push(update) }, getBlock: async (ref) => { let req = getRequestAt(requests, ref.height) || {} let block = await this.getBlock(ref.height, !!req.transactionsInfo) - if (block.hash !== ref.hash) { + if (block.block.blockID !== ref.hash) { throw new BlockConsistencyError({hash: ref.hash}) } await this.addRequestedData([block], req) @@ -126,8 +125,8 @@ export class HttpDataSource { }, getHeader(block) { return { - hash: block.hash, height: block.height, + hash: block.block.blockID, parentHash: block.block.block_header.raw_data.parentHash, } }, @@ -151,8 +150,9 @@ export class HttpDataSource { } }) - for await (let update of queue.iterate()) { + for await (let update of queue) { yield update + queue.shift() } break @@ -172,7 +172,7 @@ export class HttpDataSource { return { block, height: block.block_header.raw_data.number || 0, - hash: getBlockHash(block.blockID) + hash: getBlockHash(block.blockID), } } @@ -221,7 +221,7 @@ export class HttpDataSource { for (let i = 0; i < blocks.length; i++) { let block = blocks[i] if (block == null) break - if (i > 0 && chain[i - 1].hash !== block.block.block_header.raw_data.parentHash) break + if (i > 0 && chain[i - 1].block.blockID !== block.block.block_header.raw_data.parentHash) break chain.push(block) } diff --git a/tron/tron-stream/src/http/source.ts b/tron/tron-stream/src/http/source.ts index dac33714..432f65a0 100644 --- a/tron/tron-stream/src/http/source.ts +++ b/tron/tron-stream/src/http/source.ts @@ -1,4 +1,4 @@ -import {mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range' +import {applyRangeBound, mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range' import { DataRequest as RawDataRequest, HttpDataSource as RawHttpDataSource @@ -8,6 +8,7 @@ import {DataRequest} from '../data/data-request' import {BlocksData, PartialBlock} from '../data/data-partial' import {filterBlockBatch} from './filter' import assert from 'assert' +import {last} from '@subsquid/util-internal' export class HttpDataSource { @@ -32,27 +33,41 @@ export class HttpDataSource { stopOnHead?: boolean supportHotBlocks?: boolean }): AsyncIterable> { - for await (let batch of this.baseDataSource.getFinalizedBlocks( - mapRangeRequestList(opts.requests, toRawDataRequest), - !opts.supportHotBlocks || opts.stopOnHead - )) { - // FIXME: needs to be done during batch ingestion - let finalizedHeight = await this.getFinalizedHeight() - let finalizedHead = await this.getBlockHeader(finalizedHeight) - assert(finalizedHead != null) + let requests = opts.requests + let from = requests[0].range.from - let blocks = batch.blocks.map(mapBlock) - filterBlockBatch(opts.requests, blocks) - yield {finalizedHead, blocks: blocks as PartialBlock[]} - } + while (true) { + requests = applyRangeBound(requests, {from}) + + for await (let batch of this.baseDataSource.getFinalizedBlocks( + mapRangeRequestList(requests, toRawDataRequest), + !!opts.supportHotBlocks || opts.stopOnHead + )) { + // FIXME: needs to be done during batch ingestion + let finalizedHeight = await this.getFinalizedHeight() + let finalizedHead = await this.getBlockHeader(finalizedHeight) + assert(finalizedHead != null) + + let blocks = batch.blocks.map(mapBlock) + filterBlockBatch(requests, blocks) + yield {finalizedHead, blocks: blocks as PartialBlock[]} + from = last(blocks).header.height + 1 + } + + if (opts.supportHotBlocks) { + requests = applyRangeBound(requests, {from}) + + for await (let data of this.baseDataSource.getHotBlocks( + mapRangeRequestList(requests, toRawDataRequest), + )) { + let blocks = data.blocks.map(mapBlock) + filterBlockBatch(requests, blocks) + yield {finalizedHead: data.finalizedHead, blocks: blocks as PartialBlock[]} + from = Math.min(last(blocks).header.height, data.finalizedHead.height) + 1 + } + } - for await (let data of this.baseDataSource.getHotBlocks( - mapRangeRequestList(opts.requests, toRawDataRequest), - opts.stopOnHead - )) { - let blocks = data.blocks.map(mapBlock) - filterBlockBatch(opts.requests, blocks) - yield {finalizedHead: data.finalizedHead, blocks: blocks as PartialBlock[]} + if (opts.stopOnHead) break } } } From e0daafe7f79cbc6ef480aae332d37151350a5dd9 Mon Sep 17 00:00:00 2001 From: belopash Date: Mon, 30 Sep 2024 12:20:42 +0500 Subject: [PATCH 4/6] save --- test/tron-usdt/src/main.ts | 2 +- tron/tron-data/src/data-source.ts | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/test/tron-usdt/src/main.ts b/test/tron-usdt/src/main.ts index 937b7a04..7ee13ed7 100644 --- a/test/tron-usdt/src/main.ts +++ b/test/tron-usdt/src/main.ts @@ -14,7 +14,7 @@ const TOPIC0 = 'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef const dataSource = new DataSourceBuilder() .setGateway('https://v2.archive.subsquid.io/network/tron-mainnet') .setHttpApi({url: 'https://rpc.ankr.com/http/tron'}) - .setBlockRange({from: 65659767}) + .setBlockRange({from: 65677134}) .addLog({ where: { address: [CONTRACT], diff --git a/tron/tron-data/src/data-source.ts b/tron/tron-data/src/data-source.ts index 20b404a3..c899be1f 100644 --- a/tron/tron-data/src/data-source.ts +++ b/tron/tron-data/src/data-source.ts @@ -150,9 +150,10 @@ export class HttpDataSource { } }) - for await (let update of queue) { + let update = queue.shift() + while (update) { yield update - queue.shift() + update = queue.shift() } break From fd79a533bb426640cc44768ffc006d808013f7ae Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Mon, 30 Sep 2024 15:53:37 +0700 Subject: [PATCH 5/6] handle empty block responses --- tron/tron-data/src/data-source.ts | 1 + tron/tron-data/src/http.ts | 20 ++++++++++++++++---- tron/tron-normalization/src/mapping.ts | 2 +- tron/tron-stream/src/http/source.ts | 2 +- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/tron/tron-data/src/data-source.ts b/tron/tron-data/src/data-source.ts index 61894e81..f6078d29 100644 --- a/tron/tron-data/src/data-source.ts +++ b/tron/tron-data/src/data-source.ts @@ -67,6 +67,7 @@ export class HttpDataSource { private async getBlock(num: number, detail: boolean): Promise { let block = await this.httpApi.getBlock(num, detail) + assert(block) return { block, height: block.block_header.raw_data.number || 0, diff --git a/tron/tron-data/src/http.ts b/tron/tron-data/src/http.ts index 35702bc3..4ea7edec 100644 --- a/tron/tron-data/src/http.ts +++ b/tron/tron-data/src/http.ts @@ -15,19 +15,31 @@ function getResultValidator(validator: V): (result: unknown } +function isEmpty(value: unknown): boolean { + return value != null && typeof value == 'object' && Object.keys(value).length == 0 +} + + export class HttpApi { constructor( private readonly http: HttpClient, private readonly options: RequestOptions = {} ) {} - async getBlock(num: number, detail: boolean): Promise { - return this.post('wallet/getblock', { + async getBlock(hashOrHeight: number | string, detail: boolean): Promise { + let block = await this.post('wallet/getblock', { json: { - id_or_num: String(num), + id_or_num: String(hashOrHeight), detail } - }, getResultValidator(Block)) + }) + + if (isEmpty(block)) { + return undefined + } + + let validateResult = getResultValidator(Block) + return validateResult(block) } async getTransactionInfo(num: number): Promise { diff --git a/tron/tron-normalization/src/mapping.ts b/tron/tron-normalization/src/mapping.ts index 539e928f..3035394e 100644 --- a/tron/tron-normalization/src/mapping.ts +++ b/tron/tron-normalization/src/mapping.ts @@ -81,7 +81,7 @@ function mapTransaction(src: raw.Transaction, transactionIndex: number, info?: r } if (info?.cancel_unfreezeV2_amount) { tx.cancelUnfreezeV2Amount = {} - for (let obj of info?.cancel_unfreezeV2_amount) { + for (let obj of info.cancel_unfreezeV2_amount) { tx.cancelUnfreezeV2Amount[obj.key] = BigInt(obj.value) } } diff --git a/tron/tron-stream/src/http/source.ts b/tron/tron-stream/src/http/source.ts index 3a8afd7f..403354fd 100644 --- a/tron/tron-stream/src/http/source.ts +++ b/tron/tron-stream/src/http/source.ts @@ -19,7 +19,7 @@ export class HttpDataSource { async getBlockHash(height: number): Promise { let header = await this.baseDataSource.getBlockHeader(height) - return header.blockID + return header?.blockID } getBlockHeader(height: number): Promise { From 57508d6f4bc6b6b9a44fbfa1f94a49081582f3d8 Mon Sep 17 00:00:00 2001 From: belopash Date: Mon, 30 Sep 2024 16:14:25 +0500 Subject: [PATCH 6/6] fix --- tron/tron-data/src/data-source.ts | 64 +++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/tron/tron-data/src/data-source.ts b/tron/tron-data/src/data-source.ts index 262a57c1..26b76e5c 100644 --- a/tron/tron-data/src/data-source.ts +++ b/tron/tron-data/src/data-source.ts @@ -1,5 +1,5 @@ import {Batch, BlockConsistencyError, BlockRef, coldIngest, HotProcessor, HotUpdate, isDataConsistencyError, trimInvalid} from '@subsquid/util-internal-ingest-tools' -import {getRequestAt, mapRangeRequestList, rangeEnd, RangeRequest, splitRange, splitRangeByRequest, SplitRequest} from '@subsquid/util-internal-range' +import {getRequestAt, mapRangeRequestList, rangeEnd, RangeRequest, rangeToArray, splitRange, splitRangeByRequest, SplitRequest} from '@subsquid/util-internal-range' import {assertNotNull, AsyncQueue, last, maybeLast, Throttler, wait} from '@subsquid/util-internal' import assert from 'assert' import {BlockData, TransactionInfo} from './data' @@ -40,8 +40,9 @@ export class HttpDataSource { this.finalityConfirmation = 20 } - getBlockHeader(height: number) { - return this.httpApi.getBlock(height, false) + async getBlockHeader(height: number) { + let block = await this.httpApi.getBlock(height, false) + return assertNotNull(block) } async getFinalizedHeight(): Promise { @@ -92,10 +93,8 @@ export class HttpDataSource { process: async (update) => { queue.push(update) }, getBlock: async (ref) => { let req = getRequestAt(requests, ref.height) || {} - let block = await this.getBlock(ref.height, !!req.transactionsInfo) - if (block.block.blockID !== ref.hash) { - throw new BlockConsistencyError({hash: ref.hash}) - } + let block = await this.getBlock(ref.hash, !!req.transactionsInfo) + if (block == null) throw new BlockConsistencyError(ref) await this.addRequestedData([block], req) if (block._isInvalid) { throw new BlockConsistencyError(block, block._errorMessage) @@ -168,25 +167,48 @@ export class HttpDataSource { } } - private async getBlock(num: number, detail: boolean): Promise { - let block = await this.httpApi.getBlock(num, detail) - assert(block) - return { + private async getBlock(numOrHash: number | string, detail: boolean): Promise { + let block = await this.httpApi.getBlock(numOrHash, detail) + return block ? { block, height: block.block_header.raw_data.number || 0, hash: getBlockHash(block.blockID), - } + } : undefined } - private async getBlocks(from: number, to: number, detail: boolean): Promise { - let promises = [] - for (let num = from; num <= to; num++) { - let promise = this.getBlock(num, detail) - promises.push(promise) - } + private async getBlocks(numbers: number[], detail: boolean): Promise<(BlockData | undefined)[]> { + let promises = numbers.map(n => this.getBlock(n, detail)) return Promise.all(promises) } + private async getColdBlocks(numbers: number[], withTransactions: boolean, depth: number = 0): Promise { + let result = await this.getBlocks(numbers, withTransactions) + let missing: number[] = [] + for (let i = 0; i < result.length; i++) { + if (result[i] == null) { + missing.push(i) + } + } + + if (missing.length == 0) return result as BlockData[] + + if (depth > 9) throw new BlockConsistencyError({ + height: numbers[missing[0]] + }, `failed to get finalized block after ${depth} attempts`) + + let missed = await this.getColdBlocks( + missing.map(i => numbers[i]), + withTransactions, + depth + 1 + ) + + for (let i = 0; i < missing.length; i++) { + result[missing[i]] = missed[i] + } + + return result as BlockData[] + } + private async addTransactionsInfo(blocks: BlockData[]) { let promises = [] for (let block of blocks) { @@ -210,13 +232,15 @@ export class HttpDataSource { } private async getColdSplit(req: SplitRequest): Promise { - let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions) + let blocks = await this.getColdBlocks(rangeToArray(req.range), !!req.request.transactions) + + await this.addRequestedData(blocks, req.request) return blocks } private async getHotSplit(req: SplitRequest & {finalizedHeight: number}): Promise { - let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions) + let blocks = await this.getBlocks(rangeToArray(req.range), !!req.request.transactions) let chain: BlockData[] = []