From 569596e2a8eed0ea5bc073ef4296a8d7ff573366 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Mon, 22 Jul 2024 07:44:09 +0700 Subject: [PATCH] add finalityConfirmation to substrate processor --- .../substrate-finality_2024-07-22-00-43.json | 10 +++ .../substrate-finality_2024-07-22-00-43.json | 10 +++ .../substrate-finality_2024-07-22-00-43.json | 10 +++ .../substrate-data-raw/src/datasource.ts | 89 +++++++++++++------ substrate/substrate-data/src/datasource.ts | 4 +- substrate/substrate-processor/src/ds-rpc.ts | 1 + .../substrate-processor/src/processor.ts | 17 +++- 7 files changed, 113 insertions(+), 28 deletions(-) create mode 100644 common/changes/@subsquid/substrate-data-raw/substrate-finality_2024-07-22-00-43.json create mode 100644 common/changes/@subsquid/substrate-data/substrate-finality_2024-07-22-00-43.json create mode 100644 common/changes/@subsquid/substrate-processor/substrate-finality_2024-07-22-00-43.json diff --git a/common/changes/@subsquid/substrate-data-raw/substrate-finality_2024-07-22-00-43.json b/common/changes/@subsquid/substrate-data-raw/substrate-finality_2024-07-22-00-43.json new file mode 100644 index 000000000..5d143c0d2 --- /dev/null +++ b/common/changes/@subsquid/substrate-data-raw/substrate-finality_2024-07-22-00-43.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/substrate-data-raw", + "comment": "allow to use finalityConfirmation to determine finalized blocks", + "type": "minor" + } + ], + "packageName": "@subsquid/substrate-data-raw" +} \ No newline at end of file diff --git a/common/changes/@subsquid/substrate-data/substrate-finality_2024-07-22-00-43.json b/common/changes/@subsquid/substrate-data/substrate-finality_2024-07-22-00-43.json new file mode 100644 index 000000000..3d82db5d9 --- /dev/null +++ b/common/changes/@subsquid/substrate-data/substrate-finality_2024-07-22-00-43.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/substrate-data", + "comment": "accept finalityConfirmation in `RpcDataSourceOptions`", + "type": "minor" + } + ], + "packageName": "@subsquid/substrate-data" +} \ No newline at end of file diff --git a/common/changes/@subsquid/substrate-processor/substrate-finality_2024-07-22-00-43.json b/common/changes/@subsquid/substrate-processor/substrate-finality_2024-07-22-00-43.json new file mode 100644 index 000000000..60d8ae085 --- /dev/null +++ b/common/changes/@subsquid/substrate-processor/substrate-finality_2024-07-22-00-43.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/substrate-processor", + "comment": "allow to use finalityConfirmation if `chain_getFinalizedHead` is not an option", + "type": "minor" + } + ], + "packageName": "@subsquid/substrate-processor" +} \ No newline at end of file diff --git a/substrate/substrate-data-raw/src/datasource.ts b/substrate/substrate-data-raw/src/datasource.ts index a5f013163..d195794c6 100644 --- a/substrate/substrate-data-raw/src/datasource.ts +++ b/substrate/substrate-data-raw/src/datasource.ts @@ -1,5 +1,5 @@ import {Logger, LogLevel} from '@subsquid/logger' -import {RpcClient} from '@subsquid/rpc-client' +import {RpcClient, SubscriptionHandle} from '@subsquid/rpc-client' import {AsyncQueue, ensureError, maybeLast, partitionBy, Throttler, wait} from '@subsquid/util-internal' import { assertIsValid, @@ -7,7 +7,6 @@ import { BlockConsistencyError, BlockRef, ChainHeads, - DataConsistencyError, HashAndHeight, HotProcessor, HotState, @@ -34,6 +33,7 @@ export interface RpcDataSourceOptions { rpc: RpcClient headPollInterval?: number newHeadTimeout?: number + finalityConfirmation?: number log?: Logger } @@ -42,20 +42,50 @@ export class RpcDataSource { public readonly rpc: Rpc private headPollInterval: number private newHeadTimeout: number + private finalityConfirmation?: number private log?: Logger constructor(options: RpcDataSourceOptions) { this.rpc = new Rpc(options.rpc) this.headPollInterval = options.headPollInterval ?? 5000 this.newHeadTimeout = options.newHeadTimeout ?? 0 + this.finalityConfirmation = options.finalityConfirmation this.log = options.log } async getFinalizedHeight(): Promise { - let head = await this.rpc.getFinalizedHead() - let header = await this.rpc.getBlockHeader(head) - assert(header, 'finalized blocks must be always available') - return qty2Int(header.number) + if (this.finalityConfirmation == null) { + let head = await this.rpc.getFinalizedHead() + let header = await this.rpc.getBlockHeader(head) + assert(header, 'finalized blocks must be always available') + return qty2Int(header.number) + } else { + let retries = 0 + while (retries < 5) { + let head = await this.rpc.getHead() + let header = await this.rpc.getBlockHeader(head) + if (header == null) { + this.log?.debug(`"${head}" block has no corresponding header. will retry`) + retries += 1 + continue + } + return Math.max(0, qty2Int(header.number) - this.finalityConfirmation) + } + throw new Error('Cannot determine head of the chain') + } + } + + async getFinalizedHead(best: string): Promise { + if (this.finalityConfirmation == null) { + return this.rpc.getFinalizedHead() + } else { + let header = await this.rpc.getBlockHeader(best) + assert(header) + let height = qty2Int(header.number) - this.finalityConfirmation + let hash = await this.rpc.getBlockHash(height) + assert(hash) + return hash + } } async *getFinalizedBlocks(requests: RangeRequestList, stopOnHead?: boolean): AsyncIterable> { @@ -129,7 +159,7 @@ export class RpcDataSource { for (let split of splitRangeByRequest(requests, {from, to: top})) { for (let range of splitRange(10, split.range)) { let blocks = await fetch.getHotSplit( - from, + range.from, range.to === headBlock?.height ? headBlock : range.to, split.request || {} ) @@ -172,7 +202,7 @@ export class RpcDataSource { while (!isEnd()) { let head = await headSrc.call() if (head === prev) continue - let finalizedHead = await this.rpc.getFinalizedHead() + let finalizedHead = await this.getFinalizedHead(head) await this.handleNewHeads({ best: {hash: head}, finalized: {hash: finalizedHead} @@ -181,27 +211,31 @@ export class RpcDataSource { } private async subscription(cb: (heads: ChainHeads) => Promise, isEnd: () => boolean): Promise { + let finalityConfirmation = this.finalityConfirmation let queue = new AsyncQueue(1) let finalizedHeight = 0 let prevHeight = 0 - let finalizedHeadsHandle = this.rpc.client.subscribe({ - method: 'chain_subscribeFinalizedHeads', - unsubscribe: 'chain_unsubscribeFinalizedHeads', - notification: 'chain_finalizedHead', - onMessage(head: BlockHeader) { - try { - let height = qty2Int(head.number) - finalizedHeight = Math.max(finalizedHeight, height) - } catch(err: any) { - close(err) - } - }, - onError(err: Error) { - close(ensureError(err)) - }, - resubscribeOnConnectionLoss: true - }) + let finalizedHeadsHandle: SubscriptionHandle | undefined + if (finalityConfirmation == null) { + finalizedHeadsHandle = this.rpc.client.subscribe({ + method: 'chain_subscribeFinalizedHeads', + unsubscribe: 'chain_unsubscribeFinalizedHeads', + notification: 'chain_finalizedHead', + onMessage(head: BlockHeader) { + try { + let height = qty2Int(head.number) + finalizedHeight = Math.max(finalizedHeight, height) + } catch(err: any) { + close(err) + } + }, + onError(err: Error) { + close(ensureError(err)) + }, + resubscribeOnConnectionLoss: true + }) + } let newHeadsHandle = this.rpc.client.subscribe({ method: 'chain_subscribeNewHeads', @@ -212,6 +246,9 @@ export class RpcDataSource { let height = qty2Int(head.number) if (height >= prevHeight) { prevHeight = height + if (finalityConfirmation != null) { + finalizedHeight = Math.max(0, height - finalityConfirmation) + } queue.forcePut(height) } } catch(err: any) { @@ -226,7 +263,7 @@ export class RpcDataSource { function close(err?: Error) { newHeadsHandle.close() - finalizedHeadsHandle.close() + finalizedHeadsHandle?.close() if (err) { queue.forcePut(err) } diff --git a/substrate/substrate-data/src/datasource.ts b/substrate/substrate-data/src/datasource.ts index 984711af5..6724dfd03 100644 --- a/substrate/substrate-data/src/datasource.ts +++ b/substrate/substrate-data/src/datasource.ts @@ -12,6 +12,7 @@ export interface RpcDataSourceOptions { headPollInterval?: number newHeadTimeout?: number typesBundle?: OldTypesBundle | OldSpecsBundle + finalityConfirmation?: number } @@ -23,7 +24,8 @@ export class RpcDataSource { this.rawDataSource = new raw.RpcDataSource({ rpc: options.rpc, headPollInterval: options.headPollInterval, - newHeadTimeout: options.newHeadTimeout + newHeadTimeout: options.newHeadTimeout, + finalityConfirmation: options.finalityConfirmation }) this.typesBundle = options.typesBundle } diff --git a/substrate/substrate-processor/src/ds-rpc.ts b/substrate/substrate-processor/src/ds-rpc.ts index 25f4f77c0..de61565b2 100644 --- a/substrate/substrate-processor/src/ds-rpc.ts +++ b/substrate/substrate-processor/src/ds-rpc.ts @@ -15,6 +15,7 @@ export interface RpcDataSourceOptions { headPollInterval?: number newHeadTimeout?: number typesBundle?: OldTypesBundle | OldSpecsBundle + finalityConfirmation?: number } diff --git a/substrate/substrate-processor/src/processor.ts b/substrate/substrate-processor/src/processor.ts index ce3394770..1349eb08a 100644 --- a/substrate/substrate-processor/src/processor.ts +++ b/substrate/substrate-processor/src/processor.ts @@ -159,6 +159,7 @@ export class SubstrateBatchProcessor { private requests: RangeRequest[] = [] private fields?: FieldSelection private blockRange?: Range + private finalityConfirmation?: number private archive?: GatewaySettings private rpcEndpoint?: RpcEndpointSettings private rpcIngestSettings?: RpcDataIngestionSettings @@ -285,6 +286,18 @@ export class SubstrateBatchProcessor { return this } + /** + * Distance from the head block behind which all blocks are considered to be finalized. + * + * By default, the processor will track finalized blocks via `chain_getFinalizedHead`. + * Configure it only if `chain_getFinalizedHead` doesn’t return the expected info. + */ + setFinalityConfirmation(nBlocks: number): this { + this.assertNotRunning() + this.finalityConfirmation = nBlocks + return this + } + /** * Configure a set of fetched fields */ @@ -456,7 +469,8 @@ export class SubstrateBatchProcessor { rpc: this.getChainRpcClient(), headPollInterval: this.rpcIngestSettings?.headPollInterval, newHeadTimeout: this.rpcIngestSettings?.newHeadTimeout, - typesBundle: this.typesBundle + typesBundle: this.typesBundle, + finalityConfirmation: this.finalityConfirmation }) } @@ -586,6 +600,7 @@ export class SubstrateBatchProcessor { requests: this.getBatchRequests(), archive: this.archive == null ? undefined : this.getArchiveDataSource(), hotDataSource: this.rpcIngestSettings?.disabled ? undefined : this.getRpcDataSource(), + allBlocksAreFinal: this.finalityConfirmation === 0, process: (s, b) => this.processBatch(s, b as any, handler), prometheus: this.prometheus, log