Skip to content

Commit

Permalink
fix: refactor per review
Browse files Browse the repository at this point in the history
  • Loading branch information
Igorgro committed Sep 25, 2023
1 parent 5cf2abd commit 44ce082
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/substrate-dump",
"comment": "feat: implement prometheus metrics",
"type": "minor"
}
],
"packageName": "@subsquid/substrate-dump"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/util-internal-archive-layout",
"comment": "feat: implement onSuccessWrite callback",
"type": "minor"
}
],
"packageName": "@subsquid/util-internal-archive-layout"
}
4 changes: 2 additions & 2 deletions substrate/substrate-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export class Dumper {

let height = new Throttler(() => this.src().getFinalizedHeight(), 300_000)
let chainHeight = await height.get()
this.prometheus().setLastBlock(this.options.lastBlock ? this.options.lastBlock : chainHeight);
this.prometheus().setChainHeight(chainHeight);

let progress = new Progress({
initialValue: this.range().from,
Expand Down Expand Up @@ -211,7 +211,7 @@ export class Dumper {
blocks: (nextBlock, prevHash) => this.saveMetadata(this.process(nextBlock, prevHash)),
range: this.range(),
chunkSize: this.options.chunkSize * 1024 * 1024,
onSuccessWrite: (block) => { prometheus.setLastSavedBlock(block); }
onSuccessWrite: ({ blockRange: { to } }) => { prometheus.setLastWrittenBlock(to.height); }
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions substrate/substrate-dump/src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,32 @@ import promClient, { collectDefaultMetrics, Gauge, Registry } from 'prom-client'
export class PrometheusServer {
private registry = new Registry()
private port?: number | string
private lastBlockGauge: Gauge;
private lastSavedBlockGauge: Gauge;
private chainHeightGauge: Gauge;
private lastWrittenBlockGauge: Gauge;

constructor(port: number) {
this.port = port;
this.lastBlockGauge = new Gauge({
name: 'sqd_last_block_total',
this.chainHeightGauge = new Gauge({
name: 'sqd_dump_chain_height',
help: 'Last block available in the chain',
registers: [this.registry]
});

this.lastSavedBlockGauge = new Gauge({
name: 'sqd_last_saved_block_total',
this.lastWrittenBlockGauge = new Gauge({
name: 'sqd_dump_last_written_block',
help: 'Last saved block',
registers: [this.registry]
});

collectDefaultMetrics({register: this.registry})
}

setLastBlock(block: number) {
this.lastBlockGauge.set(block);
setChainHeight(height: number) {
this.chainHeightGauge.set(height);
}

setLastSavedBlock(block: number) {
this.lastSavedBlockGauge.set(block);
setLastWrittenBlock(block: number) {
this.lastWrittenBlockGauge.set(block);
}


Expand Down
15 changes: 9 additions & 6 deletions util/util-internal-archive-layout/src/layout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export class ArchiveLayout {
blocks: (nextBlock: number, prevHash?: string) => AsyncIterable<HashAndHeight[]>
range?: Range
chunkSize?: number,
onSuccessWrite?: (lastBlock: number) => void
onSuccessWrite?: (args: { chunk: string, blockRange: { from: HashAndHeight, to: HashAndHeight } }) => void
}
): Promise<void> {
return this.append(
Expand All @@ -186,13 +186,16 @@ export class ArchiveLayout {
let out = new GzipBuffer()

async function save(): Promise<void> {
await getNextChunk(
assertNotNull(firstBlock),
assertNotNull(lastBlock)
).transactDir('.', async fs => {
const fBlock = assertNotNull(firstBlock);
const lBlock = assertNotNull(firstBlock);
const chunk = getNextChunk(fBlock, lBlock)
chunk.transactDir('.', async fs => {
let content = await out.end()
const writeResult = fs.write('blocks.jsonl.gz', content)
writeResult.then(() => args.onSuccessWrite ? args.onSuccessWrite(lastBlock?.height || 0) : null)
writeResult.then(() => args.onSuccessWrite ? args.onSuccessWrite({
chunk: chunk.abs(),
blockRange: { from: fBlock, to: lBlock }
}) : null)
return writeResult
})
firstBlock = undefined
Expand Down

0 comments on commit 44ce082

Please sign in to comment.