Skip to content

Commit

Permalink
solana-dump: save --slot-tip workaround attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
eldargab committed Mar 28, 2024
1 parent 4df786e commit 1a6456e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
26 changes: 26 additions & 0 deletions solana/solana-data/src/rpc/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@ export interface RpcDataSourceOptions {
}


interface HeightAndSlot {
height: number
slot: number
}


export class RpcDataSource {
private rpc: Rpc
private headPollInterval: number
private strideSize: number
private strideConcurrency: number
private slotTips: HeightAndSlot[] = []

constructor(options: RpcDataSourceOptions) {
this.rpc = new Rpc(options.rpc)
Expand All @@ -31,18 +38,35 @@ export class RpcDataSource {
assert(this.strideSize >= 1)
}

addSlotTip(tip: HeightAndSlot): void {
this.slotTips.push(tip)
}

async getFinalizedHeight(): Promise<number> {
let top = await getFinalizedTop(this.rpc)
return top.height
}

private getTopSlot(bottom: HeightAndSlot, top: number): HeightAndSlot {
if (bottom.height == top) return bottom
assert(bottom.height < top)
for (let tip of this.slotTips) {
if (tip.height == top) return tip
if (bottom.height < tip.height && tip.height < top) {
bottom = tip
}
}
return bottom
}

async *getFinalizedBlocks(
requests: RangeRequest<DataRequest>[],
stopOnHead?: boolean
): AsyncIterable<Batch<Block>> {
let head = new Throttler(() => getFinalizedTop(this.rpc), this.headPollInterval)
let rpc = this.rpc
let strideSize = this.strideSize
let self = this

async function* splits(): AsyncIterable<{
slots: FiniteRange
Expand All @@ -56,6 +80,8 @@ export class RpcDataSource {
let beg = req.range.from
let end = req.range.to ?? Infinity

bottom = self.getTopSlot(bottom, beg)

while (beg <= end) {
if (top.height < beg) {
top = await head.get()
Expand Down
27 changes: 25 additions & 2 deletions solana/solana-dump/src/dumper.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import {Block, RpcDataSource} from '@subsquid/solana-data/lib/rpc'
import {def} from '@subsquid/util-internal'
import {Command, Dumper, DumperOptions, positiveInt, Range, removeOption} from '@subsquid/util-internal-dump-cli'
import {
Command,
Dumper,
DumperOptions,
ErrorMessage,
positiveInt,
Range,
removeOption
} from '@subsquid/util-internal-dump-cli'


interface Options extends DumperOptions {
strideConcurrency: number
strideSize: number
slotTip?: string[]
}


Expand All @@ -14,6 +23,7 @@ export class SolanaDumper extends Dumper<Block, Options> {
program.description('Data archiving tool for Solana')
removeOption(program, 'endpointMaxBatchCallSize')
removeOption(program, 'endpointCapacity')
program.option('--slot-tip <BLOCK:SLOT...>', 'BLOCK:SLOT pair to help to locate required blocks')
program.option('--stride-size <N>', 'Maximum size of getBlock batch call', positiveInt, 10)
program.option('--stride-concurrency <N>', 'Maximum number of pending getBlock batch calls', positiveInt, 5)
}
Expand All @@ -25,6 +35,17 @@ export class SolanaDumper extends Dumper<Block, Options> {
return options
}

private getSlotTips(): {height: number, slot: number}[] {
return this.options().slotTip?.map(tip => {
let m = /^(\d+):(\d+)$/.exec(tip)
if (!m) throw new ErrorMessage(`invalid slot tip: ${tip}`)
return {
height: parseInt(m[1]),
slot: parseInt(m[2])
}
}) ?? []
}

protected fixUnsafeIntegers(): boolean {
return true
}
Expand All @@ -47,12 +68,14 @@ export class SolanaDumper extends Dumper<Block, Options> {

@def
private getDataSource(): RpcDataSource {
return new RpcDataSource({
let src = new RpcDataSource({
rpc: this.rpc(),
headPollInterval: 10_000,
strideSize: this.options().strideSize,
strideConcurrency: this.options().strideConcurrency
})
this.getSlotTips().forEach(tip => src.addSlotTip(tip))
return src
}

protected async* getBlocks(range: Range): AsyncIterable<Block[]> {
Expand Down

0 comments on commit 1a6456e

Please sign in to comment.