Skip to content

Commit

Permalink
Try to improve portal ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Dec 10, 2024
1 parent d93ed96 commit 5cab364
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 67 deletions.
8 changes: 4 additions & 4 deletions evm/evm-processor/src/ds-archive/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
constructor(private client: PortalClient) {}

getFinalizedHeight(): Promise<number> {
return this.client.getHeight()
return this.client.getFinalizedHeight()
}

async getBlockHash(height: number): Promise<Bytes32> {
let query = makeQuery({
range: {from: height, to: height},
request: {includeAllBlocks: true},
})
let blocks = await this.client.query(query)
let blocks = await this.client.finalizedQuery(query)
assert(blocks.length == 1)
return blocks[0].header.hash
}
Expand All @@ -77,15 +77,15 @@ export class EvmPortal implements DataSource<Block, DataRequest> {
requests: RangeRequest<DataRequest>[],
stopOnHead?: boolean | undefined
): AsyncIterable<Batch<Block>> {
let height = new Throttler(() => this.client.getHeight(), 20_000)
let height = new Throttler(() => this.client.getFinalizedHeight(), 20_000)

let top = await height.call()
for (let req of requests) {
let lastBlock = req.range.from - 1
let endBlock = req.range.to || Infinity
let query = makeQuery(req)

for await (let batch of this.client.stream(query, stopOnHead)) {
for await (let batch of this.client.finalizedStream(query, stopOnHead)) {
assert(batch.length > 0, 'boundary blocks are expected to be included')
lastBlock = last(batch).header.number

Expand Down
8 changes: 4 additions & 4 deletions substrate/substrate-processor/src/ds-portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,23 @@ export class SubstratePortal implements DataSource<Block, DataRequest> {
}

getFinalizedHeight(): Promise<number> {
return this.client.getHeight()
return this.client.getFinalizedHeight()
}

async getBlockHash(height: number): Promise<string | null> {
let query = makeQuery({
range: {from: height, to: height},
request: {includeAllBlocks: true},
})
let blocks = await this.client.query(query)
let blocks = await this.client.finalizedQuery(query)
return blocks[0]?.header?.hash || null
}

async *getFinalizedBlocks(
requests: RangeRequest<DataRequest>[],
stopOnHead?: boolean | undefined
): AsyncIterable<Batch<Block>> {
let height = new Throttler(() => this.client.getHeight(), 20_000)
let height = new Throttler(() => this.client.getFinalizedHeight(), 20_000)

let runtimeTracker = new RuntimeTracker<ArchiveBlockHeader & WithRuntime>(
this.rpc,
Expand All @@ -101,7 +101,7 @@ export class SubstratePortal implements DataSource<Block, DataRequest> {
let endBlock = req.range.to || Infinity
let query = makeQuery(req)

for await (let batch of this.client.stream<ArchiveBlock>(query, stopOnHead)) {
for await (let batch of this.client.finalizedStream<ArchiveBlock>(query, stopOnHead)) {
assert(batch.length > 0, 'boundary blocks are expected to be included')
lastBlock = last(batch).header.number

Expand Down
8 changes: 6 additions & 2 deletions test/balances/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import {events} from './types'


const processor = new SubstrateBatchProcessor()
.setPortal('https://portal.sqd.dev/datasets/kusama')
.setPortal({
url: 'https://portal.sqd.dev/datasets/kusama',
bufferThreshold: 50 * 1024 * 1024,
})
.setRpcEndpoint(process.env.KUSAMA_NODE_WS || 'wss://kusama-rpc.polkadot.io')
.setRpcDataIngestionSettings({
// disabled: true,
Expand All @@ -20,11 +23,12 @@ const processor = new SubstrateBatchProcessor()
})
.setBlockRange({from: 0})
.addEvent({
name: [events.balances.transfer.name]
name: [events.balances.transfer.name],
})


processor.run(new TypeormDatabase(), async ctx => {

let transfers: Transfer[] = []

for (let block of ctx.blocks) {
Expand Down
152 changes: 95 additions & 57 deletions util/portal-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ export class PortalClient {
return u.toString()
}

async getHeight(): Promise<number> {
let res: string = await this.http.get(this.getDatasetUrl('finalized-stream/height'), {
retryAttempts: 3,
httpTimeout: 10_000,
})
let height = parseInt(res)
assert(Number.isSafeInteger(height))
return height
}

async getMetadata(): Promise<Metadata> {
let res: {real_time: boolean} = await this.http.get(this.getDatasetUrl('metadata'), {
retryAttempts: 3,
Expand All @@ -78,7 +68,18 @@ export class PortalClient {
}
}

query<B extends Block = Block, Q extends PortalQuery = PortalQuery>(query: Q): Promise<B[]> {
async getFinalizedHeight(): Promise<number> {
let res: string = await this.http.get(this.getDatasetUrl('finalized-stream/height'), {
retryAttempts: 3,
httpTimeout: 10_000,
})
let height = parseInt(res)
assert(Number.isSafeInteger(height))
return height
}

finalizedQuery<B extends Block = Block, Q extends PortalQuery = PortalQuery>(query: Q): Promise<B[]> {
// FIXME: is it needed or it is better to always use stream?
return this.http
.request<Buffer>('POST', this.getDatasetUrl(`finalized-stream`), {
json: query,
Expand All @@ -91,7 +92,6 @@ export class PortalClient {
})
)
.then((res) => {
// TODO: move the conversion to the server
let blocks = res.body
.toString('utf8')
.trimEnd()
Expand All @@ -101,14 +101,50 @@ export class PortalClient {
})
}

async *stream<B extends Block = Block, Q extends PortalQuery = PortalQuery>(
async *finalizedStream<B extends Block = Block, Q extends PortalQuery = PortalQuery>(
query: Q,
stopOnHead = false
): AsyncIterable<B[]> {
let queue = new AsyncQueue<B[] | Error>(1)
let bufferSize = 0
let isReady = false
let cache: B[] = []

const getBuffer = () => {
if (queue.isClosed()) return
let peeked = queue.peek()
// FIXME: is it a valid case?
if (peeked instanceof Error) return

// buffer has been consumed, we need to reset
if (isReady && !peeked) {
reset()
}

return peeked ?? cache
}

const reset = () => {
bufferSize = 0
isReady = false
cache.length = 0
}

const setReady = () => {
if (queue.isClosed()) return
if (isReady) return
queue.forcePut(cache)
isReady = true
cache = []
}

const waitForReset = async () => {
if (queue.isClosed()) return
await queue.wait()
reset()
}

const ingest = async () => {
let bufferSize = 0
let fromBlock = query.fromBlock
let toBlock = query.toBlock ?? Infinity

Expand All @@ -131,59 +167,61 @@ export class PortalClient {
// no blocks left
if (res.status == 204) {
if (stopOnHead) return

await wait(1000)
} else {
try {
let stream = splitLines(res.body)

while (true) {
let lines = await addTimeout(stream.next(), this.newBlockTimeout)
if (lines.done) break

let batch = queue.peek()
if (batch instanceof Error) return

if (!batch) {
bufferSize = 0
}

let blocks = lines.value.map((line) => {
bufferSize += line.length
return JSON.parse(line) as B
})

if (batch) {
// FIXME: won't it overflow stack?
batch.push(...blocks)
if (bufferSize > this.bufferThreshold) {
await queue.wait()
}
} else {
await queue.put(blocks)
}

fromBlock = last(blocks).header.number + 1
}
} catch (err) {
if (err instanceof TimeoutError) {
this.log?.warn(
`resetting stream, because we haven't seen a new blocks for ${this.newBlockTimeout} ms`
)
res.body.destroy()
} else {
throw err
continue
}

try {
let stream = splitLines(res.body)

while (true) {
let lines = await addTimeout(stream.next(), this.newBlockTimeout)
if (lines.done) break

let buffer = getBuffer()
if (buffer == null) break

let blocks = lines.value.map((line) => {
bufferSize += line.length
return JSON.parse(line) as B
})

// FIXME: won't it overflow stack?
buffer.push(...blocks)

fromBlock = last(blocks).header.number + 1

if (bufferSize > this.bufferThreshold) {
setReady()
await waitForReset()
}
}

if (bufferSize > 0) {
setReady()
}
} catch (err) {
if (err instanceof TimeoutError) {
this.log?.warn(
`resetting stream, because we haven't seen a new blocks for ${this.newBlockTimeout} ms`
)
} else {
throw err
}
} finally {
// FIXME: is it needed?
res.body.destroy()
}
}
}

ingest().then(
() => queue.close(),
(err) => {
if (!queue.isClosed()) {
queue.forcePut(ensureError(err))
}
if (queue.isClosed()) return
queue.forcePut(ensureError(err))
queue.close()
}
)

Expand Down

0 comments on commit 5cab364

Please sign in to comment.