Skip to content

Commit

Permalink
fixes per review
Browse files Browse the repository at this point in the history
  • Loading branch information
Igorgro committed Sep 27, 2023
1 parent af892c3 commit 65f0caf
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 35 deletions.
22 changes: 9 additions & 13 deletions substrate/substrate-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export interface DumperOptions {
lastBlock?: number
withTrace?: boolean | string
chunkSize: number
metricsPort?: number
metrics?: number
}


Expand Down Expand Up @@ -86,7 +86,7 @@ export class Dumper {

@def
prometheus() {
return new PrometheusServer(this.options.metricsPort ?? 3000);
return new PrometheusServer(this.options.metrics ?? 0, this.rpc())
}

ingest(range: Range): AsyncIterable<BlockBatch> {
Expand All @@ -99,11 +99,10 @@ export class Dumper {
: this.options.withTrace ? '' : undefined
}

const blocks = this.src().getFinalizedBlocks([{
return this.src().getFinalizedBlocks([{
range,
request
}])
return blocks;
}

private async *process(from?: number, prevHash?: string): AsyncIterable<BlockData[]> {
Expand All @@ -115,7 +114,7 @@ export class Dumper {

let height = new Throttler(() => this.src().getFinalizedHeight(), 300_000)
let chainHeight = await height.get()
this.prometheus().setChainHeight(chainHeight);
this.prometheus().setChainHeight(chainHeight)

let progress = new Progress({
initialValue: this.range().from,
Expand All @@ -141,9 +140,6 @@ export class Dumper {
)
}
}
const metrics = this.rpc().getMetrics();
this.prometheus().setSuccesfulRequestCount(metrics.requestsServed);
this.prometheus().setFailedRequestCount(metrics.connectionErrors);

yield batch.blocks

Expand Down Expand Up @@ -206,16 +202,16 @@ export class Dumper {
}
} else {
const archive = new ArchiveLayout(this.fs())
const prometheus = this.prometheus();
if (this.options.metricsPort) {
await prometheus.serve();
this.log().info(`prometheus metrics are available on port ${this.options.metricsPort}`)
const prometheus = this.prometheus()
if (this.options.metrics != null) {
const promServer = await prometheus.serve()
this.log().info(`prometheus metrics are available on port ${promServer.port}`)
}
await archive.appendRawBlocks({
blocks: (nextBlock, prevHash) => this.saveMetadata(this.process(nextBlock, prevHash)),
range: this.range(),
chunkSize: this.options.chunkSize * 1024 * 1024,
onSuccessWrite: ({ blockRange: { to } }) => { prometheus.setLastWrittenBlock(to.height); }
onSuccessWrite: ({ blockRange: { to } }) => { prometheus.setLastWrittenBlock(to.height) }
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/substrate-dump/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ runProgram(() => {
program.option('--last-block <number>', 'Height of the last block to dump', nat)
program.option('--with-trace [targets]', 'Fetch block trace')
program.option('--chunk-size <MB>', 'Data chunk size in megabytes', positiveInt, 32)
program.option('--metrics-port <port>', 'Port to serve metrics on', nat)
program.option('--metrics <port>', 'Enable prometheus metrics server', nat)

let args = program.parse().opts() as DumperOptions

Expand Down
26 changes: 12 additions & 14 deletions substrate/substrate-dump/src/prometheus.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { RpcClient } from '@subsquid/rpc-client';
import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server'
import promClient, { collectDefaultMetrics, Gauge, Registry } from 'prom-client';

Expand All @@ -9,7 +10,7 @@ export class PrometheusServer {
private lastWrittenBlockGauge: Gauge;
private rpcRequestsGauge: Gauge;

constructor(port: number) {
constructor(port: number, rpc: RpcClient) {
this.port = port;
this.chainHeightGauge = new Gauge({
name: 'sqd_dump_chain_height',
Expand All @@ -27,7 +28,16 @@ export class PrometheusServer {
name: 'sqd_rpc_requests_count',
help: 'Number of rpc requests of different kinds',
labelNames: ['kind'],
registers: [this.registry]
registers: [this.registry],
collect() {
const metrics = rpc.getMetrics();
this.set({
kind: 'successful'
}, metrics.requestsServed);
this.set({
kind: 'failed'
}, metrics.connectionErrors);
}
});

collectDefaultMetrics({register: this.registry})
Expand All @@ -41,18 +51,6 @@ export class PrometheusServer {
this.lastWrittenBlockGauge.set(block);
}

setSuccesfulRequestCount(requests: number) {
this.rpcRequestsGauge.set({
'kind': 'successful'
}, requests)
}

setFailedRequestCount(requests: number) {
this.rpcRequestsGauge.set({
'kind': 'failed'
}, requests)
}

serve(): Promise<ListeningServer> {
return createPrometheusServer(this.registry, this.port)
}
Expand Down
13 changes: 6 additions & 7 deletions util/util-internal-archive-layout/src/layout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,12 @@ export class ArchiveLayout {
const chunk = getNextChunk(blockRange.from, blockRange.to)
chunk.transactDir('.', async fs => {
let content = await out.end()
const writeResult = fs.write('blocks.jsonl.gz', content)
writeResult.then(() => args.onSuccessWrite ? args.onSuccessWrite({
chunk: chunk.abs(),
blockRange
}) : null)
return writeResult
})
return fs.write('blocks.jsonl.gz', content)
}).then(() => args.onSuccessWrite?.({
chunk: chunk.abs(),
blockRange
}))

firstBlock = undefined
lastBlock = undefined
out = new GzipBuffer()
Expand Down

0 comments on commit 65f0caf

Please sign in to comment.