Skip to content

Commit

Permalink
feature: add FINAL_BLOCKS_ONLY
Browse files Browse the repository at this point in the history
fixes: #19
  • Loading branch information
DenisCarriere committed Oct 13, 2023
1 parent efabc9b commit 3c055db
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ MANIFEST=https://github.com/pinax-network/subtivity-substreams/releases/download
MODULE_NAME=map_block_stats
START_BLOCK=100000
STOP_BLOCK=100010
VERBOSE=true
VERBOSE=true
FINAL_BLOCKS_ONLY=false
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Options:
--metrics-labels [string...] To apply generic labels to all default metrics (ex: --labels foo=bar) (default: {}, env: METRICS_LABELS)
--collect-default-metrics <boolean> Collect default metrics (default: false, env: COLLECT_DEFAULT_METRICS)
--headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--final-blocks-only <boolean> Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (default: "false", env: FINAL_BLOCKS_ONLY)
--verbose Enable verbose logging (default: false, env: VERBOSE)
-h, --help display help for command
```
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink",
"version": "0.9.9",
"version": "0.10.0",
"description": "Substreams Sink",
"type": "module",
"exports": "./dist/index.js",
Expand Down
12 changes: 7 additions & 5 deletions src/commander.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "dotenv/config";
import { Command, Option } from "commander";
import { DEFAULT_CURSOR_PATH, DEFAULT_RESTART_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_SUBSTREAMS_API_TOKEN, DEFAULT_AUTH_ISSUE_URL, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE } from "./config.js";
import { DEFAULT_CURSOR_PATH, DEFAULT_RESTART_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_SUBSTREAMS_API_TOKEN, DEFAULT_AUTH_ISSUE_URL, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE, DEFAULT_FINAL_BLOCKS_ONLY } from "./config.js";

import { list } from "./list.js";
import { logger } from "./logger.js";
Expand All @@ -22,14 +22,15 @@ export interface RunOptions {
authIssueUrl: string;
delayBeforeStart: number;
cursorPath: string;
productionMode: boolean;
productionMode: string;
restartInactivitySeconds: number;
hostname: string;
port: number;
metricsLabels: string[];
collectDefaultMetrics: boolean;
collectDefaultMetrics: string;
headers: Headers;
verbose: boolean;
verbose: string;
finalBlocksOnly: string;
}

export function program(pkg: Package) {
Expand Down Expand Up @@ -93,5 +94,6 @@ export function run(program: Command, pkg: Package) {
.addOption(new Option("--metrics-labels [string...]", "To apply generic labels to all default metrics (ex: --labels foo=bar)").default(DEFAULT_METRICS_LABELS).env("METRICS_LABELS").argParser(handleMetricsLabels))
.addOption(new Option("--collect-default-metrics <boolean>", "Collect default metrics").default(DEFAULT_COLLECT_DEFAULT_METRICS).env("COLLECT_DEFAULT_METRICS"))
.addOption(new Option("--headers [string...]", "Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA)").default(DEFAULT_HEADERS).env("HEADERS").argParser(handleHeaders))
.addOption(new Option("--verbose", "Enable verbose logging").default(DEFAULT_VERBOSE).env("VERBOSE"));
.addOption(new Option("--final-blocks-only <boolean>", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD").default(DEFAULT_FINAL_BLOCKS_ONLY).env("FINAL_BLOCKS_ONLY"))
.addOption(new Option("--verbose <boolean>", "Enable verbose logging").default(DEFAULT_VERBOSE).env("VERBOSE"));
}
7 changes: 4 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
export const DEFAULT_HOSTNAME = "localhost";
export const DEFAULT_PORT = 9102;
export const DEFAULT_CURSOR_PATH = "cursor.lock";
export const DEFAULT_VERBOSE = false;
export const DEFAULT_VERBOSE = "false";
export const DEFAULT_RESTART_INACTIVITY_SECONDS = 300;
export const DEFAULT_PRODUCTION_MODE = false;
export const DEFAULT_PRODUCTION_MODE = "false";
export const DEFAULT_DELAY_BEFORE_START = 0;
export const DEFAULT_METRICS_LABELS = {};
export const DEFAULT_COLLECT_DEFAULT_METRICS = false;
export const DEFAULT_COLLECT_DEFAULT_METRICS = "false";
export const DEFAULT_START_BLOCK = "-1";
export const DEFAULT_SUBSTREAMS_API_TOKEN = "";
export const DEFAULT_PARAMS = [];
export const DEFAULT_HEADERS = new Headers();
export const DEFAULT_AUTH_ISSUE_URL = "https://auth.pinax.network/v1/auth/issue";
export const DEFAULT_FINAL_BLOCKS_ONLY = "false";
7 changes: 4 additions & 3 deletions src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ export function handleSession(session: SessionInit) {
}

export function handleManifest(emitter: BlockEmitter, moduleHash: string, options: RunOptions) {
logger.info("manifest", { moduleHash, manifest: options.manifest, substreamsEndpoint: options.substreamsEndpoint });
const labelNames = ["module_hash", "manifest", "output_module", "substreams_endpoint", "start_block_num", "stop_block_num", "production_mode"];
logger.info("manifest", { moduleHash, manifest: options.manifest, substreamsEndpoint: options.substreamsEndpoint, finalBlocksOnly: options.finalBlocksOnly, production_mode: options.productionMode });
const labelNames = ["module_hash", "manifest", "output_module", "substreams_endpoint", "start_block_num", "stop_block_num", "production_mode", "final_blocks_only"];
const gauge = registerGauge("manifest", "Substreams manifest and sha256 hash of map module", labelNames) as Gauge;
gauge.labels({
module_hash: moduleHash,
Expand All @@ -112,6 +112,7 @@ export function handleManifest(emitter: BlockEmitter, moduleHash: string, option
substreams_endpoint: options.substreamsEndpoint,
start_block_num: String(emitter.request.startBlockNum),
stop_block_num: String(emitter.request.stopBlockNum),
production_mode: String(emitter.request.productionMode)
production_mode: String(emitter.request.productionMode),
final_blocks_only: String(emitter.request.finalBlocksOnly),
}).set(1);
}
9 changes: 5 additions & 4 deletions src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { health } from "./health.js";

export async function setup(options: RunOptions) {
// Configure logging with TSLog
if (options.verbose) logger.enable();
if (String(options.verbose) === "true") logger.enable();

// Download Substream package
const manifest = options.manifest;
Expand All @@ -33,7 +33,8 @@ export async function setup(options: RunOptions) {
const params = options.params;
const headers = options.headers;
const cursorPath = options.cursorPath;
const productionMode = options.productionMode;
const productionMode = String(options.productionMode) === "true";
const finalBlocksOnly = String(options.finalBlocksOnly) === "true";

// Adding default headers
headers.set("User-Agent", "substreams-sink");
Expand All @@ -51,7 +52,6 @@ export async function setup(options: RunOptions) {

// Connect Transport
const startCursor = await cursor.readCursor(cursorPath);
console.log({startBlockNum, stopBlockNum, startCursor});
const registry = createRegistry(substreamPackage);
const transport = createDefaultTransport(baseUrl, token, registry, headers);
const request = createRequest({
Expand All @@ -61,6 +61,7 @@ export async function setup(options: RunOptions) {
stopBlockNum,
productionMode,
startCursor,
finalBlocksOnly,
});

// Substreams Block Emitter
Expand All @@ -70,7 +71,7 @@ export async function setup(options: RunOptions) {
const moduleHash = await createModuleHashHex(substreamPackage.modules, outputModule);

// Handle Prometheus Metrics
if (options.collectDefaultMetrics) {
if (String(options.collectDefaultMetrics) === "true") {
prometheus.client.collectDefaultMetrics({ labels: options.metricsLabels });
}
prometheus.handleManifest(emitter, moduleHash, options);
Expand Down

0 comments on commit 3c055db

Please sign in to comment.