Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve CLI arguments & add plaintext #31

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Usage: substreams-sink run [options]
Substreams Sink

Options:
-v, --version version for substreams-sink
-e --substreams-endpoint <string> Substreams gRPC endpoint to stream data from (env: SUBSTREAMS_ENDPOINT)
--manifest <string> URL of Substreams package (env: MANIFEST)
--module-name <string> Name of the output module (declared in the manifest) (env: MODULE_NAME)
Expand All @@ -52,15 +53,16 @@ Options:
--substreams-api-key <string> API key for the Substream endpoint (env: SUBSTREAMS_API_KEY)
--delay-before-start <int> Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
--cursor <string> Cursor to stream from. Leave blank for no cursor
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (default: "false", env: PRODUCTION_MODE)
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (choices: "true", "false", default: false, env: PRODUCTION_MODE)
--final-blocks-only <boolean> Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (choices: "true", "false", default: false, env: FINAL_BLOCKS_ONLY)
--inactivity-seconds <int> If set, the sink will stop when inactive for over a certain amount of seconds (default: 300, env: INACTIVITY_SECONDS)
--headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--plaintext <boolean> Establish GRPC connection in plaintext (choices: "true", "false", default: false, env: PLAIN_TEXT)
--verbose <boolean> Enable verbose logging (choices: "true", "false", default: false, env: VERBOSE)
--hostname <string> The process will listen on this hostname for any HTTP and Prometheus metrics requests (default: "localhost", env: HOSTNAME)
--port <int> The process will listen on this port for any HTTP and Prometheus metrics requests (default: 9102, env: PORT)
--metrics-labels [string...] To apply generic labels to all default metrics (ex: --labels foo=bar) (default: {}, env: METRICS_LABELS)
--collect-default-metrics <boolean> Collect default metrics (default: "false", env: COLLECT_DEFAULT_METRICS)
--headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS)
--final-blocks-only <boolean> Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (default: "false", env: FINAL_BLOCKS_ONLY)
--verbose <boolean> Enable verbose logging (default: "false", env: VERBOSE)
--collect-default-metrics <boolean> Collect default metrics (choices: "true", "false", default: false, env: COLLECT_DEFAULT_METRICS)
-h, --help display help for command
```

Expand All @@ -84,12 +86,22 @@ STOP_BLOCK=1000020

**example.js**
```js
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http, logger, fileCursor } from "substreams-sink";

const pkg = {
name: "substreams-sink",
version: "0.0.1",
description: "Substreams Sink long description",
}

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.run(program, pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Custom Prometheus Counters
Expand Down
9 changes: 7 additions & 2 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http, logger, fileCursor } from "./dist/index.js";

const pkg = {
name: "substreams-sink",
version: "0.0.1",
description: "Substreams Sink long description",
}

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.run(program, pkg);
const command = commander.addRunOptions(program);
logger.setName(pkg.name);

// Custom Prometheus Counters
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink",
"version": "0.15.2",
"version": "0.16.0",
"description": "Substreams Sink",
"type": "module",
"exports": "./dist/index.js",
Expand Down
53 changes: 33 additions & 20 deletions src/commander.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
import "dotenv/config";
import { Command, Option } from "commander";
import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE, DEFAULT_FINAL_BLOCKS_ONLY } from "./config.js";
import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS } from "./config.js";

export interface Package {
name: string;
version: string;
description: string;
name?: string;
version?: string;
description?: string;
}

export interface RunOptions {
substreamsEndpoint: string;
manifest: string;
moduleName: string;
params: string[];
startBlock: string;
stopBlock: string;
startBlock: number | bigint | undefined;
stopBlock: number | bigint | `+${number}` | undefined;
substreamsApiKey: string;
substreamsApiToken: string; // Deprecated
delayBeforeStart: number;
cursor: string;
productionMode: string;
productionMode: boolean;
inactivitySeconds: number;
hostname: string;
port: number;
metricsLabels: string[];
collectDefaultMetrics: string;
collectDefaultMetrics: boolean;
headers: Headers;
verbose: string;
finalBlocksOnly: string;
verbose: boolean;
finalBlocksOnly: boolean;
plaintext: boolean;
}

export function program(pkg: Package) {
export function program(pkg?: Package) {
const program = new Command();
program.name(pkg.name).version(pkg.version, "-v, --version", `version for ${pkg.name}`);
const name = pkg?.name ?? "substreams-sink";
program.name(name);
if ( pkg?.version ) program.version(pkg.version, "-v, --version", `version for ${name}`);
if ( pkg?.description ) program.description(pkg.description);
program.command("completion").description("Generate the autocompletion script for the specified shell");
program.command("help").description("Display help for command");
program.showHelpAfterError();
Expand Down Expand Up @@ -60,8 +64,8 @@ function handleHeaders(value: string, previous: Headers) {
return headers;
}

export function run(program: Command, pkg: Package, options: AddRunOptions = {}) {
return addRunOptions(program.command("run"), pkg, options);
export function run(program: Command, options: AddRunOptions = {}) {
return addRunOptions(program.command("run"), options);
}

export function list(program: Command) {
Expand All @@ -82,10 +86,18 @@ interface AddRunOptions {
metrics?: boolean;
}

export function addRunOptions(program: Command, pkg: Package, options: AddRunOptions = {}) {
function parseBoolean(value?: string) {
if ( value !== undefined ) return value.toLocaleLowerCase() === "true";
return false;
}

function addBoolean(flags: string, description: string, env: string) {
return new Option(flags, description).default(false).env(env).choices(["true", "false"]).argParser(parseBoolean);
}

export function addRunOptions(program: Command, options: AddRunOptions = {}) {
const command = program
.showHelpAfterError()
.description(pkg.description)
.addOption(new Option("-e --substreams-endpoint <string>", "Substreams gRPC endpoint to stream data from").makeOptionMandatory().env("SUBSTREAMS_ENDPOINT"))
.addOption(new Option("--manifest <string>", "URL of Substreams package").makeOptionMandatory().env("MANIFEST"))
.addOption(new Option("--module-name <string>", "Name of the output module (declared in the manifest)").makeOptionMandatory().env("MODULE_NAME"))
Expand All @@ -96,11 +108,12 @@ export function addRunOptions(program: Command, pkg: Package, options: AddRunOpt
.addOption(new Option("--substreams-api-token <string>", "(DEPRECATED) API token for the Substream endpoint").hideHelp(true).env("SUBSTREAMS_API_TOKEN"))
.addOption(new Option("--delay-before-start <int>", "Delay (ms) before starting Substreams").default(DEFAULT_DELAY_BEFORE_START).env("DELAY_BEFORE_START"))
.addOption(new Option("--cursor <string>", "Cursor to stream from. Leave blank for no cursor"))
.addOption(new Option("--production-mode <boolean>", "Enable production mode, allows cached Substreams data if available").default(DEFAULT_PRODUCTION_MODE).env("PRODUCTION_MODE"))
.addOption(addBoolean("--production-mode <boolean>", "Enable production mode, allows cached Substreams data if available", "PRODUCTION_MODE"))
.addOption(addBoolean("--final-blocks-only <boolean>", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD", "FINAL_BLOCKS_ONLY"))
.addOption(new Option("--inactivity-seconds <int>", "If set, the sink will stop when inactive for over a certain amount of seconds").default(DEFAULT_INACTIVITY_SECONDS).env("INACTIVITY_SECONDS"))
.addOption(new Option("--headers [string...]", "Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA)").default(DEFAULT_HEADERS).env("HEADERS").argParser(handleHeaders))
.addOption(new Option("--final-blocks-only <boolean>", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD").default(DEFAULT_FINAL_BLOCKS_ONLY).env("FINAL_BLOCKS_ONLY"))
.addOption(new Option("--verbose <boolean>", "Enable verbose logging").default(DEFAULT_VERBOSE).env("VERBOSE"))
.addOption(addBoolean("--plaintext <boolean>", "Establish GRPC connection in plaintext", "PLAIN_TEXT"))
.addOption(addBoolean("--verbose <boolean>", "Enable verbose logging", "VERBOSE"));

// HTTP and Prometheus metrics options
if ( options.http !== false ) {
Expand All @@ -111,7 +124,7 @@ export function addRunOptions(program: Command, pkg: Package, options: AddRunOpt
if ( options.metrics !== false ) {
command
.addOption(new Option("--metrics-labels [string...]", "To apply generic labels to all default metrics (ex: --labels foo=bar)").default(DEFAULT_METRICS_LABELS).env("METRICS_LABELS").argParser(handleMetricsLabels))
.addOption(new Option("--collect-default-metrics <boolean>", "Collect default metrics").default(DEFAULT_COLLECT_DEFAULT_METRICS).env("COLLECT_DEFAULT_METRICS"))
.addOption(addBoolean("--collect-default-metrics <boolean>", "Collect default metrics", "COLLECT_DEFAULT_METRICS"));
}
return command;
}
4 changes: 0 additions & 4 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@
export const DEFAULT_HOSTNAME = "localhost";
export const DEFAULT_PORT = 9102;
export const DEFAULT_CURSOR_PATH = "cursor.lock";
export const DEFAULT_VERBOSE = "false";
export const DEFAULT_INACTIVITY_SECONDS = 300;
export const DEFAULT_PRODUCTION_MODE = "false";
export const DEFAULT_DELAY_BEFORE_START = 0;
export const DEFAULT_METRICS_LABELS = {};
export const DEFAULT_COLLECT_DEFAULT_METRICS = "false";
export const DEFAULT_START_BLOCK = "-1";
export const DEFAULT_PARAMS = [];
export const DEFAULT_HEADERS = new Headers();
export const DEFAULT_FINAL_BLOCKS_ONLY = "false";
5 changes: 2 additions & 3 deletions src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ export const server = http.createServer(async (req, res) => {
}
});

export async function listen(options: RunOptions) {
export async function listen(options: RunOptions): Promise<http.Server> {
const hostname = options.hostname;
const port = options.port;
return new Promise(resolve => {
server.listen(port, hostname, () => {
logger.info("prometheus server", { hostname, port });
resolve(true);
resolve(server);
});
})
}
4 changes: 1 addition & 3 deletions src/list.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { readPackage } from "@substreams/manifest";
import { getModules } from "@substreams/core";

import { logger } from "./logger.js";

export async function list(url: string) {
const spkg = await readPackage(url)
const compatible = []

for (const { name, output } of getModules(spkg)) {
if (!output) continue;
logger.info('module', { name, output })
console.log('module', { name, output })
compatible.push(name)
}

Expand Down
2 changes: 0 additions & 2 deletions src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ export function onPrometheusMetrics(emitter: BlockEmitter) {
}

export function handleSession(session: SessionInit) {
logger.info("session", { traceId: String(session.traceId), resolvedStartBlock: String(session.resolvedStartBlock), linearHandoffBlock: String(session.linearHandoffBlock), maxParallelWorkers: String(session.maxParallelWorkers) });
const labelNames = ["trace_id", "resolved_start_block", "linear_handoff_block", "max_parallel_workers"];
const gauge = registerGauge("session", "Substreams Session", labelNames) as Gauge;
gauge.labels({
Expand All @@ -106,7 +105,6 @@ export function handleSession(session: SessionInit) {
}

export function handleManifest(emitter: BlockEmitter, moduleHash: string, options: RunOptions) {
logger.info("manifest", { moduleHash, manifest: options.manifest, substreamsEndpoint: options.substreamsEndpoint, finalBlocksOnly: options.finalBlocksOnly, productionMode: options.productionMode });
const labelNames = ["module_hash", "manifest", "output_module", "substreams_endpoint", "start_block_num", "stop_block_num", "production_mode", "final_blocks_only"];
const gauge = registerGauge("manifest", "Substreams manifest and sha256 hash of map module", labelNames) as Gauge;
gauge.labels({
Expand Down
25 changes: 12 additions & 13 deletions src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { health } from "./health.js";

export async function setup(options: RunOptions) {
// Configure logging with TSLog
if (String(options.verbose) === "true") logger.enable();
if (options.verbose) logger.enable();

// Download Substream package
const manifest = options.manifest;
Expand All @@ -24,20 +24,19 @@ export async function setup(options: RunOptions) {
const token = options.substreamsApiKey ?? options.substreamsApiToken;
if ( token?.includes(".")) throw new Error("JWT token is not longer supported, please use Substreams API key instead");

// append https if not present
if (baseUrl.match(/http/) === null) {
baseUrl = `https://${baseUrl}`;
}

// User parameters
const outputModule = options.moduleName;
const startBlockNum = options.startBlock as any;
const stopBlockNum = options.stopBlock as any;
const params = options.params;
const headers = options.headers;
const startCursor = options.cursor;
const productionMode = String(options.productionMode) === "true";
const finalBlocksOnly = String(options.finalBlocksOnly) === "true";
const stopBlockNum = options.stopBlock;
const { moduleName: outputModule, cursor: startCursor } = options; // renamed otions
const { params, headers, productionMode, finalBlocksOnly, plaintext } = options;

// append https/http if not present
if (!baseUrl.startsWith("http")) {
baseUrl = `${plaintext ? "http" : "https"}://${baseUrl}`;
}
if ( plaintext && baseUrl.startsWith("https")) {
throw new Error("--plaintext mode is not supported with https");
}

// Adding default headers
headers.set("X-User-Agent", "substreams-sink");
Expand Down
Loading