diff --git a/solana/solana-data/src/rpc/data-source.ts b/solana/solana-data/src/rpc/data-source.ts index d61a91b4b..c9fb5d144 100644 --- a/solana/solana-data/src/rpc/data-source.ts +++ b/solana/solana-data/src/rpc/data-source.ts @@ -17,11 +17,18 @@ export interface RpcDataSourceOptions { } +interface HeightAndSlot { + height: number + slot: number +} + + export class RpcDataSource { private rpc: Rpc private headPollInterval: number private strideSize: number private strideConcurrency: number + private slotTips: HeightAndSlot[] = [] constructor(options: RpcDataSourceOptions) { this.rpc = new Rpc(options.rpc) @@ -31,11 +38,27 @@ export class RpcDataSource { assert(this.strideSize >= 1) } + addSlotTip(tip: HeightAndSlot): void { + this.slotTips.push(tip) + } + async getFinalizedHeight(): Promise { let top = await getFinalizedTop(this.rpc) return top.height } + private getTopSlot(bottom: HeightAndSlot, top: number): HeightAndSlot { + if (bottom.height == top) return bottom + assert(bottom.height < top) + for (let tip of this.slotTips) { + if (tip.height == top) return tip + if (bottom.height < tip.height && tip.height < top) { + bottom = tip + } + } + return bottom + } + async *getFinalizedBlocks( requests: RangeRequest[], stopOnHead?: boolean @@ -43,6 +66,7 @@ export class RpcDataSource { let head = new Throttler(() => getFinalizedTop(this.rpc), this.headPollInterval) let rpc = this.rpc let strideSize = this.strideSize + let self = this async function* splits(): AsyncIterable<{ slots: FiniteRange @@ -56,6 +80,8 @@ export class RpcDataSource { let beg = req.range.from let end = req.range.to ?? Infinity + bottom = self.getTopSlot(bottom, beg) + while (beg <= end) { if (top.height < beg) { top = await head.get() diff --git a/solana/solana-dump/src/dumper.ts b/solana/solana-dump/src/dumper.ts index fb72d7d43..fc9efa7f9 100644 --- a/solana/solana-dump/src/dumper.ts +++ b/solana/solana-dump/src/dumper.ts @@ -1,11 +1,20 @@ import {Block, RpcDataSource} from '@subsquid/solana-data/lib/rpc' import {def} from '@subsquid/util-internal' -import {Command, Dumper, DumperOptions, positiveInt, Range, removeOption} from '@subsquid/util-internal-dump-cli' +import { + Command, + Dumper, + DumperOptions, + ErrorMessage, + positiveInt, + Range, + removeOption +} from '@subsquid/util-internal-dump-cli' interface Options extends DumperOptions { strideConcurrency: number strideSize: number + slotTip?: string[] } @@ -14,6 +23,7 @@ export class SolanaDumper extends Dumper { program.description('Data archiving tool for Solana') removeOption(program, 'endpointMaxBatchCallSize') removeOption(program, 'endpointCapacity') + program.option('--slot-tip ', 'BLOCK:SLOT pair to help to locate required blocks') program.option('--stride-size ', 'Maximum size of getBlock batch call', positiveInt, 10) program.option('--stride-concurrency ', 'Maximum number of pending getBlock batch calls', positiveInt, 5) } @@ -25,6 +35,17 @@ export class SolanaDumper extends Dumper { return options } + private getSlotTips(): {height: number, slot: number}[] { + return this.options().slotTip?.map(tip => { + let m = /^(\d+):(\d+)$/.exec(tip) + if (!m) throw new ErrorMessage(`invalid slot tip: ${tip}`) + return { + height: parseInt(m[1]), + slot: parseInt(m[2]) + } + }) ?? [] + } + protected fixUnsafeIntegers(): boolean { return true } @@ -47,12 +68,14 @@ export class SolanaDumper extends Dumper { @def private getDataSource(): RpcDataSource { - return new RpcDataSource({ + let src = new RpcDataSource({ rpc: this.rpc(), headPollInterval: 10_000, strideSize: this.options().strideSize, strideConcurrency: this.options().strideConcurrency }) + this.getSlotTips().forEach(tip => src.addSlotTip(tip)) + return src } protected async* getBlocks(range: Range): AsyncIterable {