diff --git a/substrate/substrate-dump/src/dumper.ts b/substrate/substrate-dump/src/dumper.ts index 04eac4cf2..5eac02d6e 100644 --- a/substrate/substrate-dump/src/dumper.ts +++ b/substrate/substrate-dump/src/dumper.ts @@ -27,7 +27,7 @@ export interface DumperOptions { lastBlock?: number withTrace?: boolean | string chunkSize: number - metricsPort?: number + metrics?: number } @@ -86,7 +86,7 @@ export class Dumper { @def prometheus() { - return new PrometheusServer(this.options.metricsPort ?? 3000); + return new PrometheusServer(this.options.metrics ?? 0, this.rpc()) } ingest(range: Range): AsyncIterable { @@ -99,11 +99,10 @@ export class Dumper { : this.options.withTrace ? '' : undefined } - const blocks = this.src().getFinalizedBlocks([{ + return this.src().getFinalizedBlocks([{ range, request }]) - return blocks; } private async *process(from?: number, prevHash?: string): AsyncIterable { @@ -115,7 +114,7 @@ export class Dumper { let height = new Throttler(() => this.src().getFinalizedHeight(), 300_000) let chainHeight = await height.get() - this.prometheus().setChainHeight(chainHeight); + this.prometheus().setChainHeight(chainHeight) let progress = new Progress({ initialValue: this.range().from, @@ -141,9 +140,6 @@ export class Dumper { ) } } - const metrics = this.rpc().getMetrics(); - this.prometheus().setSuccesfulRequestCount(metrics.requestsServed); - this.prometheus().setFailedRequestCount(metrics.connectionErrors); yield batch.blocks @@ -206,16 +202,16 @@ export class Dumper { } } else { const archive = new ArchiveLayout(this.fs()) - const prometheus = this.prometheus(); - if (this.options.metricsPort) { - await prometheus.serve(); - this.log().info(`prometheus metrics are available on port ${this.options.metricsPort}`) + const prometheus = this.prometheus() + if (this.options.metrics != null) { + const promServer = await prometheus.serve() + this.log().info(`prometheus metrics are available on port ${promServer.port}`) } await archive.appendRawBlocks({ blocks: (nextBlock, prevHash) => this.saveMetadata(this.process(nextBlock, prevHash)), range: this.range(), chunkSize: this.options.chunkSize * 1024 * 1024, - onSuccessWrite: ({ blockRange: { to } }) => { prometheus.setLastWrittenBlock(to.height); } + onSuccessWrite: ({ blockRange: { to } }) => { prometheus.setLastWrittenBlock(to.height) } }) } } diff --git a/substrate/substrate-dump/src/main.ts b/substrate/substrate-dump/src/main.ts index 0a883c084..b1af928e1 100644 --- a/substrate/substrate-dump/src/main.ts +++ b/substrate/substrate-dump/src/main.ts @@ -22,7 +22,7 @@ runProgram(() => { program.option('--last-block ', 'Height of the last block to dump', nat) program.option('--with-trace [targets]', 'Fetch block trace') program.option('--chunk-size ', 'Data chunk size in megabytes', positiveInt, 32) - program.option('--metrics-port ', 'Port to serve metrics on', nat) + program.option('--metrics ', 'Enable prometheus metrics server', nat) let args = program.parse().opts() as DumperOptions diff --git a/substrate/substrate-dump/src/prometheus.ts b/substrate/substrate-dump/src/prometheus.ts index 4316781c4..a969bfc42 100644 --- a/substrate/substrate-dump/src/prometheus.ts +++ b/substrate/substrate-dump/src/prometheus.ts @@ -1,3 +1,4 @@ +import { RpcClient } from '@subsquid/rpc-client'; import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server' import promClient, { collectDefaultMetrics, Gauge, Registry } from 'prom-client'; @@ -9,7 +10,7 @@ export class PrometheusServer { private lastWrittenBlockGauge: Gauge; private rpcRequestsGauge: Gauge; - constructor(port: number) { + constructor(port: number, rpc: RpcClient) { this.port = port; this.chainHeightGauge = new Gauge({ name: 'sqd_dump_chain_height', @@ -27,7 +28,16 @@ export class PrometheusServer { name: 'sqd_rpc_requests_count', help: 'Number of rpc requests of different kinds', labelNames: ['kind'], - registers: [this.registry] + registers: [this.registry], + collect() { + const metrics = rpc.getMetrics(); + this.set({ + kind: 'successful' + }, metrics.requestsServed); + this.set({ + kind: 'failed' + }, metrics.connectionErrors); + } }); collectDefaultMetrics({register: this.registry}) @@ -41,18 +51,6 @@ export class PrometheusServer { this.lastWrittenBlockGauge.set(block); } - setSuccesfulRequestCount(requests: number) { - this.rpcRequestsGauge.set({ - 'kind': 'successful' - }, requests) - } - - setFailedRequestCount(requests: number) { - this.rpcRequestsGauge.set({ - 'kind': 'failed' - }, requests) - } - serve(): Promise { return createPrometheusServer(this.registry, this.port) } diff --git a/util/util-internal-archive-layout/src/layout.ts b/util/util-internal-archive-layout/src/layout.ts index 9ffdc217a..bf4285c1f 100644 --- a/util/util-internal-archive-layout/src/layout.ts +++ b/util/util-internal-archive-layout/src/layout.ts @@ -193,13 +193,12 @@ export class ArchiveLayout { const chunk = getNextChunk(blockRange.from, blockRange.to) chunk.transactDir('.', async fs => { let content = await out.end() - const writeResult = fs.write('blocks.jsonl.gz', content) - writeResult.then(() => args.onSuccessWrite ? args.onSuccessWrite({ - chunk: chunk.abs(), - blockRange - }) : null) - return writeResult - }) + return fs.write('blocks.jsonl.gz', content) + }).then(() => args.onSuccessWrite?.({ + chunk: chunk.abs(), + blockRange + })) + firstBlock = undefined lastBlock = undefined out = new GzipBuffer()