Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tron: add hot blocks support #345

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion test/tron-usdt/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const TOPIC0 = 'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef

const dataSource = new DataSourceBuilder()
.setGateway('https://v2.archive.subsquid.io/network/tron-mainnet')
.setBlockRange({from: 11322942, to: 11323358})
.setHttpApi({url: 'https://rpc.ankr.com/http/tron'})
.setBlockRange({from: 65677134})
.addLog({
where: {
address: [CONTRACT],
Expand Down
201 changes: 178 additions & 23 deletions tron/tron-data/src/data-source.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Batch, coldIngest} from '@subsquid/util-internal-ingest-tools'
import {RangeRequest, SplitRequest} from '@subsquid/util-internal-range'
import {assertNotNull} from '@subsquid/util-internal'
import {Batch, BlockConsistencyError, BlockRef, coldIngest, HotProcessor, HotUpdate, isDataConsistencyError, trimInvalid} from '@subsquid/util-internal-ingest-tools'
import {getRequestAt, mapRangeRequestList, rangeEnd, RangeRequest, rangeToArray, splitRange, splitRangeByRequest, SplitRequest} from '@subsquid/util-internal-range'
import {assertNotNull, AsyncQueue, last, maybeLast, Throttler, wait} from '@subsquid/util-internal'
import assert from 'assert'
import {BlockData, TransactionInfo} from './data'
import {HttpApi} from './http'
Expand Down Expand Up @@ -40,14 +40,20 @@ export class HttpDataSource {
this.finalityConfirmation = 20
}

getBlockHeader(height: number) {
return this.httpApi.getBlock(height, false)
async getBlockHeader(height: number) {
let block = await this.httpApi.getBlock(height, false)
return assertNotNull(block)
}

async getFinalizedHeight(): Promise<number> {
let height = await this.getHeight()
return height - this.finalityConfirmation
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should never be negative.


async getHeight(): Promise<number> {
let block = await this.httpApi.getNowBlock()
let number = assertNotNull(block.block_header.raw_data.number)
return number - this.finalityConfirmation
return number
}

getFinalizedBlocks(
Expand All @@ -56,7 +62,7 @@ export class HttpDataSource {
): AsyncIterable<Batch<BlockData>> {
return coldIngest({
getFinalizedHeight: () => this.getFinalizedHeight(),
getSplit: (req) => this.getSplit(req),
getSplit: (req) => this.getColdSplit(req),
requests,
concurrency: this.strideConcurrency,
splitSize: this.strideSize,
Expand All @@ -65,24 +71,144 @@ export class HttpDataSource {
})
}

private async getBlock(num: number, detail: boolean): Promise<BlockData> {
let block = await this.httpApi.getBlock(num, detail)
return {
async *getHotBlocks(
requests: RangeRequest<DataRequest>[],
): AsyncIterable<HotUpdate<BlockData>> {
if (requests.length == 0) return

let self = this

let from = requests[0].range.from - 1
let block = await this.getBlockHeader(from)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we start from zero?


let queue: HotUpdate<BlockData>[] = []

let proc = new HotProcessor<BlockData>(
{
height: from,
hash: block.blockID,
top: [],
},
{
process: async (update) => { queue.push(update) },
getBlock: async (ref) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realised, that src.getBlockStream() interface for hot blocks requires handling data consistency errors on a processor side as well.

For example, consider the case, when storage request was made to a block that is already gone.

Currently we don't handle this in the SDK, but prev callback based interface for hot blocks had ability to do that.

let req = getRequestAt(requests, ref.height) || {}
let block = await this.getBlock(ref.hash, !!req.transactionsInfo)
if (block == null) throw new BlockConsistencyError(ref)
await this.addRequestedData([block], req)
if (block._isInvalid) {
throw new BlockConsistencyError(block, block._errorMessage)
}
return block
},
async *getBlockRange(from: number, to: BlockRef): AsyncIterable<BlockData[]> {
assert(to.height != null)
if (from > to.height) {
from = to.height
}
for (let split of splitRangeByRequest(requests, {from, to: to.height})) {
let request = split.request || {}
for (let range of splitRange(10, split.range)) {
let blocks = await self.getHotSplit({
range,
request,
finalizedHeight: proc.getFinalizedHeight(),
})
let lastBlock = maybeLast(blocks)?.height ?? range.from - 1
yield blocks
if (lastBlock < range.to) {
throw new BlockConsistencyError({height: lastBlock + 1})
}
}
}
},
getHeader(block) {
return {
height: block.height,
hash: block.block.blockID,
parentHash: block.block.block_header.raw_data.parentHash,
}
},
}
)

let isEnd = () => proc.getFinalizedHeight() >= rangeEnd(last(requests).range)

let prev = -1
let height = new Throttler(() => this.getHeight(), this.headPollInterval)
while (!isEnd()) {
let next = await height.call()
if (next <= prev) continue
prev = next
for (let i = 0; i < 100; i++) {
try {
await proc.goto({
best: {height: next},
finalized: {
height: Math.max(next - this.finalityConfirmation, 0)
}
})

let update = queue.shift()
while (update) {
yield update
update = queue.shift()
}

break
} catch(err: any) {
if (isDataConsistencyError(err)) {
await wait(100)
} else {
throw err
}
}
}
}
}

private async getBlock(numOrHash: number | string, detail: boolean): Promise<BlockData | undefined> {
let block = await this.httpApi.getBlock(numOrHash, detail)
return block ? {
block,
height: block.block_header.raw_data.number || 0,
hash: getBlockHash(block.blockID)
}
hash: getBlockHash(block.blockID),
} : undefined
}

private async getBlocks(from: number, to: number, detail: boolean): Promise<BlockData[]> {
let promises = []
for (let num = from; num <= to; num++) {
let promise = this.getBlock(num, detail)
promises.push(promise)
}
private async getBlocks(numbers: number[], detail: boolean): Promise<(BlockData | undefined)[]> {
let promises = numbers.map(n => this.getBlock(n, detail))
return Promise.all(promises)
}

private async getColdBlocks(numbers: number[], withTransactions: boolean, depth: number = 0): Promise<BlockData[]> {
let result = await this.getBlocks(numbers, withTransactions)
let missing: number[] = []
for (let i = 0; i < result.length; i++) {
if (result[i] == null) {
missing.push(i)
}
}

if (missing.length == 0) return result as BlockData[]

if (depth > 9) throw new BlockConsistencyError({
height: numbers[missing[0]]
}, `failed to get finalized block after ${depth} attempts`)

let missed = await this.getColdBlocks(
missing.map(i => numbers[i]),
withTransactions,
depth + 1
)

for (let i = 0; i < missing.length; i++) {
result[missing[i]] = missed[i]
}

return result as BlockData[]
}

private async addTransactionsInfo(blocks: BlockData[]) {
let promises = []
for (let block of blocks) {
Expand All @@ -105,11 +231,40 @@ export class HttpDataSource {
await Promise.all(promises)
}

private async getSplit(req: SplitRequest<DataRequest>): Promise<BlockData[]> {
let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions)
if (req.request.transactionsInfo) {
this.addTransactionsInfo(blocks)
belopash marked this conversation as resolved.
Show resolved Hide resolved
}
private async getColdSplit(req: SplitRequest<DataRequest>): Promise<BlockData[]> {
let blocks = await this.getColdBlocks(rangeToArray(req.range), !!req.request.transactions)

await this.addRequestedData(blocks, req.request)

return blocks
}

private async getHotSplit(req: SplitRequest<DataRequest> & {finalizedHeight: number}): Promise<BlockData[]> {
let blocks = await this.getBlocks(rangeToArray(req.range), !!req.request.transactions)

let chain: BlockData[] = []

for (let i = 0; i < blocks.length; i++) {
let block = blocks[i]
if (block == null) break
if (i > 0 && chain[i - 1].block.blockID !== block.block.block_header.raw_data.parentHash) break
chain.push(block)
}

await this.addRequestedData(chain, req.request)

return trimInvalid(chain)
}

private async addRequestedData(blocks: BlockData[], req: DataRequest): Promise<void> {
if (blocks.length == 0) return

let subtasks = []

if (req.transactionsInfo) {
subtasks.push(this.addTransactionsInfo(blocks))
}

await Promise.all(subtasks)
}
}
7 changes: 6 additions & 1 deletion tron/tron-data/src/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ export const TransactionInfo = object({
unfreeze_amount: option(ANY_NAT),
internal_transactions: option(array(InternalTransaction)),
withdraw_expire_amount: option(ANY_NAT),
cancel_unfreezeV2_amount: option(record(STRING, ANY_NAT)),
cancel_unfreezeV2_amount: option(array(object({
key: STRING,
value: ANY_NAT
}))),
})


Expand Down Expand Up @@ -165,4 +168,6 @@ export interface BlockData {
hash: string
block: Block
transactionsInfo?: TransactionInfo[]
_isInvalid?: boolean
_errorMessage?: string
}
20 changes: 16 additions & 4 deletions tron/tron-data/src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,31 @@ function getResultValidator<V extends Validator>(validator: V): (result: unknown
}


function isEmpty(value: unknown): boolean {
return value != null && typeof value == 'object' && Object.keys(value).length == 0
}


export class HttpApi {
constructor(
private readonly http: HttpClient,
private readonly options: RequestOptions = {}
) {}

async getBlock(num: number, detail: boolean): Promise<Block> {
return this.post('wallet/getblock', {
async getBlock(hashOrHeight: number | string, detail: boolean): Promise<Block | undefined> {
let block = await this.post('wallet/getblock', {
json: {
id_or_num: String(num),
id_or_num: String(hashOrHeight),
detail
}
}, getResultValidator(Block))
})

if (isEmpty(block)) {
return undefined
}

let validateResult = getResultValidator(Block)
return validateResult(block)
}

async getTransactionInfo(num: number): Promise<TransactionInfo[]> {
Expand Down
6 changes: 3 additions & 3 deletions tron/tron-normalization/src/mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import assert from 'assert'
import {Block, BlockHeader, CallValueInfo, InternalTransaction, Log, Transaction} from './data'


function mapBlockHeader(src: raw.Block): BlockHeader {
export function mapBlockHeader(src: raw.Block): BlockHeader {
return {
hash: src.blockID,
height: src.block_header.raw_data.number || 0,
Expand Down Expand Up @@ -81,8 +81,8 @@ function mapTransaction(src: raw.Transaction, transactionIndex: number, info?: r
}
if (info?.cancel_unfreezeV2_amount) {
tx.cancelUnfreezeV2Amount = {}
for (let key in info.cancel_unfreezeV2_amount) {
tx.cancelUnfreezeV2Amount[key] = BigInt(info.cancel_unfreezeV2_amount[key])
for (let obj of info.cancel_unfreezeV2_amount) {
tx.cancelUnfreezeV2Amount[obj.key] = BigInt(obj.value)
}
}

Expand Down
7 changes: 7 additions & 0 deletions tron/tron-stream/src/data/data-partial.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type * as data from '@subsquid/tron-normalization'
import type {MakePartial} from './util'
import {HashAndHeight} from '@subsquid/util-internal-ingest-tools'


export type BlockRequiredFields = 'height' | 'hash'
Expand All @@ -20,3 +21,9 @@ export interface PartialBlock {
logs?: PartialLog[]
internalTransactions?: PartialInternalTransaction[]
}


export interface BlocksData<B> {
finalizedHead: HashAndHeight
blocks: B[]
}
10 changes: 7 additions & 3 deletions tron/tron-stream/src/gateway/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {array, cast} from '@subsquid/util-internal-validation'
import assert from 'assert'
import {DataRequest} from '../data/data-request'
import {getDataSchema} from './data-schema'
import {PartialBlock} from '../data/data-partial'
import {BlocksData, PartialBlock} from '../data/data-partial'


export class TronGateway {
Expand Down Expand Up @@ -56,7 +56,7 @@ export class TronGateway {
async *getBlockStream(
requests: RangeRequestList<DataRequest>,
stopOnHead?: boolean
): AsyncIterable<PartialBlock[]> {
): AsyncIterable<BlocksData<PartialBlock>> {
let archiveRequests = mapRangeRequestList(requests, req => {
let {fields, includeAllBlocks, ...items} = req
let archiveItems: any = {}
Expand All @@ -79,6 +79,10 @@ export class TronGateway {
})) {
let req = getRequestAt(requests, batch.blocks[0].header.number)

// FIXME: needs to be done during batch ingestion
let finalizedHeight = await this.getFinalizedHeight()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for this. Simply take the last block in the batch as finalized.

Copy link
Contributor Author

@belopash belopash Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I understand what you mean in this comment, but this is some odd behavior. Finalized head should never decrease
If you write stream data directly to a message queue, you won't expect a such trap

let finalizedHead = await this.getBlockHeader(finalizedHeight)

let blocks = cast(
array(getDataSchema(assertNotNull(req?.fields))),
batch.blocks
Expand All @@ -90,7 +94,7 @@ export class TronGateway {
}
})

yield blocks as any
yield {finalizedHead, blocks: blocks as PartialBlock[]}
}
}
}
Loading
Loading