-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tron: add hot blocks support #345
Changes from 3 commits
46930d7
77f14c6
2b541f1
e0daafe
fd79a53
73c187c
57508d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
import {Batch, coldIngest} from '@subsquid/util-internal-ingest-tools' | ||
import {RangeRequest, SplitRequest} from '@subsquid/util-internal-range' | ||
import {assertNotNull} from '@subsquid/util-internal' | ||
import {Batch, BlockConsistencyError, BlockRef, coldIngest, HotProcessor, HotUpdate, isDataConsistencyError, trimInvalid} from '@subsquid/util-internal-ingest-tools' | ||
import {getRequestAt, mapRangeRequestList, rangeEnd, RangeRequest, splitRange, splitRangeByRequest, SplitRequest} from '@subsquid/util-internal-range' | ||
import {assertNotNull, AsyncQueue, last, maybeLast, Throttler, wait} from '@subsquid/util-internal' | ||
import assert from 'assert' | ||
import {BlockData, TransactionInfo} from './data' | ||
import {HttpApi} from './http' | ||
|
@@ -45,9 +45,14 @@ export class HttpDataSource { | |
} | ||
|
||
async getFinalizedHeight(): Promise<number> { | ||
let height = await this.getHeight() | ||
return height - this.finalityConfirmation | ||
} | ||
|
||
async getHeight(): Promise<number> { | ||
let block = await this.httpApi.getNowBlock() | ||
let number = assertNotNull(block.block_header.raw_data.number) | ||
return number - this.finalityConfirmation | ||
return number | ||
} | ||
|
||
getFinalizedBlocks( | ||
|
@@ -56,7 +61,7 @@ export class HttpDataSource { | |
): AsyncIterable<Batch<BlockData>> { | ||
return coldIngest({ | ||
getFinalizedHeight: () => this.getFinalizedHeight(), | ||
getSplit: (req) => this.getSplit(req), | ||
getSplit: (req) => this.getColdSplit(req), | ||
requests, | ||
concurrency: this.strideConcurrency, | ||
splitSize: this.strideSize, | ||
|
@@ -65,12 +70,110 @@ export class HttpDataSource { | |
}) | ||
} | ||
|
||
async *getHotBlocks( | ||
requests: RangeRequest<DataRequest>[], | ||
): AsyncIterable<HotUpdate<BlockData>> { | ||
if (requests.length == 0) return | ||
|
||
let self = this | ||
|
||
let from = requests[0].range.from - 1 | ||
let block = await this.getBlockHeader(from) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we start from zero? |
||
|
||
let queue: HotUpdate<BlockData>[] = [] | ||
|
||
let proc = new HotProcessor<BlockData>( | ||
{ | ||
height: from, | ||
hash: block.blockID, | ||
top: [], | ||
}, | ||
{ | ||
process: async (update) => { queue.push(update) }, | ||
getBlock: async (ref) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just realised, that For example, consider the case, when storage request was made to a block that is already gone. Currently we don't handle this in the SDK, but prev callback based interface for hot blocks had ability to do that. |
||
let req = getRequestAt(requests, ref.height) || {} | ||
let block = await this.getBlock(ref.height, !!req.transactionsInfo) | ||
if (block.block.blockID !== ref.hash) { | ||
throw new BlockConsistencyError({hash: ref.hash}) | ||
} | ||
await this.addRequestedData([block], req) | ||
if (block._isInvalid) { | ||
throw new BlockConsistencyError(block, block._errorMessage) | ||
} | ||
return block | ||
}, | ||
async *getBlockRange(from: number, to: BlockRef): AsyncIterable<BlockData[]> { | ||
assert(to.height != null) | ||
if (from > to.height) { | ||
from = to.height | ||
} | ||
for (let split of splitRangeByRequest(requests, {from, to: to.height})) { | ||
let request = split.request || {} | ||
for (let range of splitRange(10, split.range)) { | ||
let blocks = await self.getHotSplit({ | ||
range, | ||
request, | ||
finalizedHeight: proc.getFinalizedHeight(), | ||
}) | ||
let lastBlock = maybeLast(blocks)?.height ?? range.from - 1 | ||
yield blocks | ||
if (lastBlock < range.to) { | ||
throw new BlockConsistencyError({height: lastBlock + 1}) | ||
} | ||
} | ||
} | ||
}, | ||
getHeader(block) { | ||
return { | ||
height: block.height, | ||
hash: block.block.blockID, | ||
parentHash: block.block.block_header.raw_data.parentHash, | ||
} | ||
}, | ||
} | ||
) | ||
|
||
let isEnd = () => proc.getFinalizedHeight() >= rangeEnd(last(requests).range) | ||
|
||
let prev = -1 | ||
let height = new Throttler(() => this.getHeight(), this.headPollInterval) | ||
while (!isEnd()) { | ||
let next = await height.call() | ||
if (next <= prev) continue | ||
prev = next | ||
for (let i = 0; i < 100; i++) { | ||
try { | ||
await proc.goto({ | ||
best: {height: next}, | ||
finalized: { | ||
height: Math.max(next - this.finalityConfirmation, 0) | ||
} | ||
}) | ||
|
||
let update = queue.shift() | ||
while (update) { | ||
yield update | ||
update = queue.shift() | ||
} | ||
|
||
break | ||
} catch(err: any) { | ||
if (isDataConsistencyError(err)) { | ||
await wait(100) | ||
} else { | ||
throw err | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
private async getBlock(num: number, detail: boolean): Promise<BlockData> { | ||
let block = await this.httpApi.getBlock(num, detail) | ||
return { | ||
block, | ||
height: block.block_header.raw_data.number || 0, | ||
hash: getBlockHash(block.blockID) | ||
hash: getBlockHash(block.blockID), | ||
} | ||
} | ||
|
||
|
@@ -105,11 +208,38 @@ export class HttpDataSource { | |
await Promise.all(promises) | ||
} | ||
|
||
private async getSplit(req: SplitRequest<DataRequest>): Promise<BlockData[]> { | ||
private async getColdSplit(req: SplitRequest<DataRequest>): Promise<BlockData[]> { | ||
let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions) | ||
if (req.request.transactionsInfo) { | ||
this.addTransactionsInfo(blocks) | ||
belopash marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
return blocks | ||
} | ||
|
||
private async getHotSplit(req: SplitRequest<DataRequest> & {finalizedHeight: number}): Promise<BlockData[]> { | ||
let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions) | ||
|
||
let chain: BlockData[] = [] | ||
|
||
for (let i = 0; i < blocks.length; i++) { | ||
let block = blocks[i] | ||
if (block == null) break | ||
if (i > 0 && chain[i - 1].block.blockID !== block.block.block_header.raw_data.parentHash) break | ||
chain.push(block) | ||
} | ||
|
||
await this.addRequestedData(chain, req.request) | ||
|
||
return trimInvalid(chain) | ||
} | ||
|
||
private async addRequestedData(blocks: BlockData[], req: DataRequest): Promise<void> { | ||
if (blocks.length == 0) return | ||
|
||
let subtasks = [] | ||
|
||
if (req.transactionsInfo) { | ||
subtasks.push(this.addTransactionsInfo(blocks)) | ||
} | ||
|
||
await Promise.all(subtasks) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ import {array, cast} from '@subsquid/util-internal-validation' | |
import assert from 'assert' | ||
import {DataRequest} from '../data/data-request' | ||
import {getDataSchema} from './data-schema' | ||
import {PartialBlock} from '../data/data-partial' | ||
import {BlocksData, PartialBlock} from '../data/data-partial' | ||
|
||
|
||
export class TronGateway { | ||
|
@@ -56,7 +56,7 @@ export class TronGateway { | |
async *getBlockStream( | ||
requests: RangeRequestList<DataRequest>, | ||
stopOnHead?: boolean | ||
): AsyncIterable<PartialBlock[]> { | ||
): AsyncIterable<BlocksData<PartialBlock>> { | ||
let archiveRequests = mapRangeRequestList(requests, req => { | ||
let {fields, includeAllBlocks, ...items} = req | ||
let archiveItems: any = {} | ||
|
@@ -79,6 +79,10 @@ export class TronGateway { | |
})) { | ||
let req = getRequestAt(requests, batch.blocks[0].header.number) | ||
|
||
// FIXME: needs to be done during batch ingestion | ||
let finalizedHeight = await this.getFinalizedHeight() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need for this. Simply take the last block in the batch as finalized. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, now I understand what you mean in this comment, but this is some odd behavior. Finalized head should never decrease |
||
let finalizedHead = await this.getBlockHeader(finalizedHeight) | ||
|
||
let blocks = cast( | ||
array(getDataSchema(assertNotNull(req?.fields))), | ||
batch.blocks | ||
|
@@ -90,7 +94,7 @@ export class TronGateway { | |
} | ||
}) | ||
|
||
yield blocks as any | ||
yield {finalizedHead, blocks: blocks as PartialBlock[]} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,14 @@ | ||
import {mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range' | ||
import {applyRangeBound, mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range' | ||
import { | ||
Block, | ||
DataRequest as RawDataRequest, | ||
HttpDataSource as RawHttpDataSource | ||
} from '@subsquid/tron-data' | ||
import {mapBlock} from '@subsquid/tron-normalization' | ||
import {BlockHeader, mapBlock, mapBlockHeader} from '@subsquid/tron-normalization' | ||
import {DataRequest} from '../data/data-request' | ||
import {PartialBlock} from '../data/data-partial' | ||
import {BlocksData, PartialBlock} from '../data/data-partial' | ||
import {filterBlockBatch} from './filter' | ||
import assert from 'assert' | ||
import {last} from '@subsquid/util-internal' | ||
|
||
|
||
export class HttpDataSource { | ||
|
@@ -18,25 +19,55 @@ export class HttpDataSource { | |
} | ||
|
||
async getBlockHash(height: number): Promise<string | undefined> { | ||
let header = await this.baseDataSource.getBlockHeader(height) | ||
return header.blockID | ||
let header = await this.getBlockHeader(height) | ||
return header?.hash | ||
} | ||
|
||
getBlockHeader(height: number): Promise<Block | undefined> { | ||
return this.baseDataSource.getBlockHeader(height) | ||
async getBlockHeader(height: number): Promise<BlockHeader | undefined> { | ||
let header = await this.baseDataSource.getBlockHeader(height) | ||
return header ? mapBlockHeader(header) : undefined | ||
} | ||
|
||
async *getBlockStream( | ||
async *getBlockStream(opts: { | ||
requests: RangeRequestList<DataRequest>, | ||
stopOnHead?: boolean | ||
): AsyncIterable<PartialBlock[]> { | ||
for await (let batch of this.baseDataSource.getFinalizedBlocks( | ||
mapRangeRequestList(requests, toRawDataRequest), | ||
stopOnHead | ||
)) { | ||
let blocks = batch.blocks.map(mapBlock) | ||
filterBlockBatch(requests, blocks) | ||
yield blocks | ||
supportHotBlocks?: boolean | ||
}): AsyncIterable<BlocksData<PartialBlock>> { | ||
let requests = opts.requests | ||
let from = requests[0].range.from | ||
|
||
while (true) { | ||
requests = applyRangeBound(requests, {from}) | ||
|
||
for await (let batch of this.baseDataSource.getFinalizedBlocks( | ||
mapRangeRequestList(requests, toRawDataRequest), | ||
!!opts.supportHotBlocks || opts.stopOnHead | ||
)) { | ||
// FIXME: needs to be done during batch ingestion | ||
let finalizedHeight = await this.getFinalizedHeight() | ||
let finalizedHead = await this.getBlockHeader(finalizedHeight) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need for this. Simply take the last block in the batch. |
||
assert(finalizedHead != null) | ||
|
||
let blocks = batch.blocks.map(mapBlock) | ||
filterBlockBatch(requests, blocks) | ||
yield {finalizedHead, blocks: blocks as PartialBlock[]} | ||
from = last(blocks).header.height + 1 | ||
} | ||
|
||
if (opts.supportHotBlocks) { | ||
requests = applyRangeBound(requests, {from}) | ||
|
||
for await (let data of this.baseDataSource.getHotBlocks( | ||
mapRangeRequestList(requests, toRawDataRequest), | ||
)) { | ||
let blocks = data.blocks.map(mapBlock) | ||
filterBlockBatch(requests, blocks) | ||
yield {finalizedHead: data.finalizedHead, blocks: blocks as PartialBlock[]} | ||
from = Math.min(last(blocks).header.height, data.finalizedHead.height) + 1 | ||
} | ||
} | ||
|
||
if (opts.stopOnHead) break | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should never be negative.