Skip to content

Commit

Permalink
solana: enhance slot search to handle the trim horizon
Browse files Browse the repository at this point in the history
  • Loading branch information
eldargab committed Mar 30, 2024
1 parent 1a6456e commit f359138
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 146 deletions.
30 changes: 2 additions & 28 deletions solana/solana-data/src/rpc/data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import {Batch} from '@subsquid/util-internal-ingest-tools'
import {FiniteRange, RangeRequest, splitRange} from '@subsquid/util-internal-range'
import assert from 'assert'
import {Block, DataRequest} from './data'
import {findSlot, getData, getFinalizedTop, isConsistentChain} from './fetch'
import {getData, getFinalizedTop, isConsistentChain} from './fetch'
import {Rpc} from './rpc'
import {findSlot} from './slot-search'


export interface RpcDataSourceOptions {
Expand All @@ -17,18 +18,11 @@ export interface RpcDataSourceOptions {
}


interface HeightAndSlot {
height: number
slot: number
}


export class RpcDataSource {
private rpc: Rpc
private headPollInterval: number
private strideSize: number
private strideConcurrency: number
private slotTips: HeightAndSlot[] = []

constructor(options: RpcDataSourceOptions) {
this.rpc = new Rpc(options.rpc)
Expand All @@ -38,35 +32,18 @@ export class RpcDataSource {
assert(this.strideSize >= 1)
}

addSlotTip(tip: HeightAndSlot): void {
this.slotTips.push(tip)
}

async getFinalizedHeight(): Promise<number> {
let top = await getFinalizedTop(this.rpc)
return top.height
}

private getTopSlot(bottom: HeightAndSlot, top: number): HeightAndSlot {
if (bottom.height == top) return bottom
assert(bottom.height < top)
for (let tip of this.slotTips) {
if (tip.height == top) return tip
if (bottom.height < tip.height && tip.height < top) {
bottom = tip
}
}
return bottom
}

async *getFinalizedBlocks(
requests: RangeRequest<DataRequest>[],
stopOnHead?: boolean
): AsyncIterable<Batch<Block>> {
let head = new Throttler(() => getFinalizedTop(this.rpc), this.headPollInterval)
let rpc = this.rpc
let strideSize = this.strideSize
let self = this

async function* splits(): AsyncIterable<{
slots: FiniteRange
Expand All @@ -79,9 +56,6 @@ export class RpcDataSource {
for (let req of requests) {
let beg = req.range.from
let end = req.range.to ?? Infinity

bottom = self.getTopSlot(bottom, beg)

while (beg <= end) {
if (top.height < beg) {
top = await head.get()
Expand Down
96 changes: 2 additions & 94 deletions solana/solana-data/src/rpc/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import {createLogger} from '@subsquid/logger'
import {last, wait} from '@subsquid/util-internal'
import {wait} from '@subsquid/util-internal'
import {FiniteRange} from '@subsquid/util-internal-range'
import assert from 'assert'
import {Commitment} from '../base'
import {Block, DataRequest, GetBlock} from './data'
import {GetBlockOptions, Rpc} from './rpc'


const log = createLogger('sqd:solana-data')


interface HeightAndSlot {
export interface HeightAndSlot {
slot: number
height: number
}
Expand All @@ -36,94 +32,6 @@ export async function getFinalizedTop(rpc: Rpc): Promise<HeightAndSlot> {
}


export async function getSlot(rpc: Rpc, height: number): Promise<number> {
if (height == 0) return 0
let top = await getFinalizedTop(rpc)
if (top.height == height) return top.slot
if (top.height < height) throw new Error(`block height ${height} haven't been reached`)
return findSlot(rpc, height, {height: 0, slot: 0}, top)
}


export async function findSlot(rpc: Rpc, height: number, bottom: HeightAndSlot, top: HeightAndSlot): Promise<number> {
if (bottom.height == height) return bottom.slot
if (top.height == height) return top.slot
if (top.slot - bottom.slot == top.height - bottom.height) return bottom.slot + height - bottom.height

log.debug({
height,
bottom,
top,
distance: top.slot - bottom.slot
}, 'block search')

assert(bottom.height < height)
assert(height < top.height)
assert(top.slot - bottom.slot > top.height - bottom.height)

if (height - bottom.height < 100) {
let blocks = await rpc.getBlocksWithLimit('finalized', bottom.slot + 1, height - bottom.height)
assert(blocks.length == height - bottom.height)
return last(blocks)
}

let middle: number

if (height - bottom.height < top.height - height) {
middle = bottom.slot + Math.floor(
(top.slot - bottom.slot) * Math.max((height - bottom.height) / (top.height - bottom.height), 0.01)
)
if (middle - bottom.slot < 100) {
let end = Math.min(bottom.slot + 100, top.slot)
let blocks = await rpc.getBlocks('finalized', bottom.slot + 1, end)
if (blocks.length >= height - bottom.height) return blocks[height - bottom.height - 1]
return findSlot(
rpc,
height,
{
height: bottom.height + blocks.length,
slot: end
},
top
)
}
} else {
middle = top.slot - Math.floor(
(top.slot - bottom.slot) * Math.max((top.height - height) / (top.height - bottom.height), 0.01)
)
if (top.slot - middle < 100) {
let beg = Math.max(bottom.slot + 1, top.slot - 100)
let blocks = await rpc.getBlocks('finalized', beg, top.slot - 1)
if (blocks.length >= top.height - height) return blocks[height - (top.height - blocks.length)]
return findSlot(
rpc,
height,
bottom,
{
height: top.height - blocks.length,
slot: beg
}
)
}
}

let blocks = await rpc.getBlocksWithLimit('finalized', middle, 1)
assert(blocks.length == 1)
let slot = blocks[0]

let info = await rpc.getBlockInfo('finalized', slot)
assert(info)
assert(info.blockHeight != null, 'block search is not possible in this block range')

if (info.blockHeight == height) return slot
if (info.blockHeight > height) {
return findSlot(rpc, height, bottom, {height: info.blockHeight, slot: middle})
} else {
return findSlot(rpc, height, {height: info.blockHeight, slot}, top)
}
}


export async function getData(
rpc: Rpc,
commitment: Commitment,
Expand Down
144 changes: 144 additions & 0 deletions solana/solana-data/src/rpc/slot-search.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import {createLogger} from '@subsquid/logger'
import {last} from '@subsquid/util-internal'
import assert from 'assert'
import {HeightAndSlot} from './fetch'
import {Rpc} from './rpc'


const log = createLogger('sqd:solana-data')


export function findSlot(rpc: Rpc, height: number, bottom: HeightAndSlot, top: HeightAndSlot): Promise<number> {
return new SlotSearch(rpc, height).search(bottom, top)
}


class SlotSearch {
constructor(
private rpc: Rpc,
private height: number
) {}

async search(bottom: HeightAndSlot, top: HeightAndSlot): Promise<number> {
if (bottom.height == this.height) return bottom.slot
if (top.height == this.height) return top.slot
if (top.slot - bottom.slot == top.height - bottom.height) return bottom.slot + this.height - bottom.height

log.debug({
height: this.height,
bottom,
top,
distance: top.slot - bottom.slot
}, 'block search')

assert(bottom.height < this.height)
assert(this.height < top.height)
assert(top.slot - bottom.slot > top.height - bottom.height)

if (this.height - bottom.height < 100) {
let blocks = await this.rpc.getBlocksWithLimit('finalized', bottom.slot + 1, this.height - bottom.height)
assert(blocks.length == this.height - bottom.height)
return last(blocks)
}

let middle: number

if (this.height - bottom.height < top.height - this.height) {
middle = bottom.slot + Math.ceil(
(top.slot - bottom.slot) * Math.max((this.height - bottom.height) / (top.height - bottom.height), 0.01)
)
if (middle - bottom.slot < 100) {
let end = Math.min(bottom.slot + 100, top.slot)
let blocks = await this.rpc.getBlocks('finalized', bottom.slot + 1, end)
if (blocks.length >= this.height - bottom.height) return blocks[this.height - bottom.height - 1]
return this.search(
{
height: bottom.height + blocks.length,
slot: end
},
top
)
}
} else {
middle = top.slot - Math.ceil(
(top.slot - bottom.slot) * Math.max((top.height - this.height) / (top.height - bottom.height), 0.01)
)
if (top.slot - middle < 100) {
let beg = Math.max(bottom.slot + 1, top.slot - 100)
let blocks = await this.rpc.getBlocks('finalized', beg, top.slot - 1)
if (blocks.length >= top.height - this.height) return blocks[this.height - (top.height - blocks.length)]
return this.search(
bottom,
{
height: top.height - blocks.length,
slot: beg
}
)
}
}

let slot = await this.getFilledSlot(middle)
let height = await this.getBlockHeight(slot)

if (height == 'TRIMMED') {
return this.searchNearTrimHorizon(bottom, top, slot)
}

if (height == this.height) return slot
if (height > this.height) {
return this.search(bottom, {height, slot: middle})
} else {
return this.search({height, slot}, top)
}
}

private async searchNearTrimHorizon(
bottom: HeightAndSlot,
top: HeightAndSlot,
trimmedSlot: number
): Promise<number> {
if (top.slot - trimmedSlot <= top.height - this.height) throw new Error(
`Seems that block with height ${this.height} is not available on RPC node`
)

let middle = trimmedSlot + Math.ceil((top.slot - trimmedSlot) / 2)
assert(middle > trimmedSlot)

let slot = await this.getFilledSlot(middle)
let height = await this.getBlockHeight(slot)

if (height == 'TRIMMED') return this.searchNearTrimHorizon(bottom, top, slot)

if (height <= this.height) {
return this.search({height, slot}, top)
} else {
return this.search(bottom, {height, slot: middle})
}
}

private async getFilledSlot(startSlot: number): Promise<number> {
let blocks = await this.rpc.getBlocksWithLimit('finalized', startSlot, 1)
assert(blocks.length == 1)
return blocks[0]
}

private async getBlockHeight(slot: number): Promise<number | 'TRIMMED'> {
let info = await this.rpc.getBlockInfo('finalized', slot).catch((err: Error) => {
if (/first available block/i.test(err.message)) return 'TRIMMED' as const
throw err
})

if (info == 'TRIMMED') return 'TRIMMED'

if (info == null) throw new Error(
`Slot ${slot} should be already finalized and contain a valid block`
)

// We can hit this even when we are looking for a block with a valid `.blockHeight`
if (info.blockHeight == null) throw new Error(
`Search for block height ${this.height} is not possible, try to start with a higher block`
)

return info.blockHeight
}
}
26 changes: 2 additions & 24 deletions solana/solana-dump/src/dumper.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import {Block, RpcDataSource} from '@subsquid/solana-data/lib/rpc'
import {def} from '@subsquid/util-internal'
import {
Command,
Dumper,
DumperOptions,
ErrorMessage,
positiveInt,
Range,
removeOption
} from '@subsquid/util-internal-dump-cli'
import {Command, Dumper, DumperOptions, positiveInt, Range, removeOption} from '@subsquid/util-internal-dump-cli'


interface Options extends DumperOptions {
Expand All @@ -23,7 +15,6 @@ export class SolanaDumper extends Dumper<Block, Options> {
program.description('Data archiving tool for Solana')
removeOption(program, 'endpointMaxBatchCallSize')
removeOption(program, 'endpointCapacity')
program.option('--slot-tip <BLOCK:SLOT...>', 'BLOCK:SLOT pair to help to locate required blocks')
program.option('--stride-size <N>', 'Maximum size of getBlock batch call', positiveInt, 10)
program.option('--stride-concurrency <N>', 'Maximum number of pending getBlock batch calls', positiveInt, 5)
}
Expand All @@ -35,17 +26,6 @@ export class SolanaDumper extends Dumper<Block, Options> {
return options
}

private getSlotTips(): {height: number, slot: number}[] {
return this.options().slotTip?.map(tip => {
let m = /^(\d+):(\d+)$/.exec(tip)
if (!m) throw new ErrorMessage(`invalid slot tip: ${tip}`)
return {
height: parseInt(m[1]),
slot: parseInt(m[2])
}
}) ?? []
}

protected fixUnsafeIntegers(): boolean {
return true
}
Expand All @@ -68,14 +48,12 @@ export class SolanaDumper extends Dumper<Block, Options> {

@def
private getDataSource(): RpcDataSource {
let src = new RpcDataSource({
return new RpcDataSource({
rpc: this.rpc(),
headPollInterval: 10_000,
strideSize: this.options().strideSize,
strideConcurrency: this.options().strideConcurrency
})
this.getSlotTips().forEach(tip => src.addSlotTip(tip))
return src
}

protected async* getBlocks(range: Range): AsyncIterable<Block[]> {
Expand Down

0 comments on commit f359138

Please sign in to comment.