From 59829abdbc0af24b9ce8dc6f13e8ceaab9403b9e Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Sun, 3 Mar 2024 11:47:24 -0500 Subject: [PATCH] improve CLI arguments & add plaintext --- README.md | 26 ++++++++++++++++------- example.js | 9 ++++++-- package.json | 2 +- src/commander.ts | 53 +++++++++++++++++++++++++++++------------------ src/config.ts | 4 ---- src/http.ts | 5 ++--- src/list.ts | 4 +--- src/prometheus.ts | 2 -- src/setup.ts | 25 +++++++++++----------- 9 files changed, 75 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 10a858e..510b2fe 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ Usage: substreams-sink run [options] Substreams Sink Options: + -v, --version version for substreams-sink -e --substreams-endpoint Substreams gRPC endpoint to stream data from (env: SUBSTREAMS_ENDPOINT) --manifest URL of Substreams package (env: MANIFEST) --module-name Name of the output module (declared in the manifest) (env: MODULE_NAME) @@ -52,15 +53,16 @@ Options: --substreams-api-key API key for the Substream endpoint (env: SUBSTREAMS_API_KEY) --delay-before-start Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START) --cursor Cursor to stream from. Leave blank for no cursor - --production-mode Enable production mode, allows cached Substreams data if available (default: "false", env: PRODUCTION_MODE) + --production-mode Enable production mode, allows cached Substreams data if available (choices: "true", "false", default: false, env: PRODUCTION_MODE) + --final-blocks-only Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (choices: "true", "false", default: false, env: FINAL_BLOCKS_ONLY) --inactivity-seconds If set, the sink will stop when inactive for over a certain amount of seconds (default: 300, env: INACTIVITY_SECONDS) + --headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS) + --plaintext Establish GRPC connection in plaintext (choices: "true", "false", default: false, env: PLAIN_TEXT) + --verbose Enable verbose logging (choices: "true", "false", default: false, env: VERBOSE) --hostname The process will listen on this hostname for any HTTP and Prometheus metrics requests (default: "localhost", env: HOSTNAME) --port The process will listen on this port for any HTTP and Prometheus metrics requests (default: 9102, env: PORT) --metrics-labels [string...] To apply generic labels to all default metrics (ex: --labels foo=bar) (default: {}, env: METRICS_LABELS) - --collect-default-metrics Collect default metrics (default: "false", env: COLLECT_DEFAULT_METRICS) - --headers [string...] Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA) (default: {}, env: HEADERS) - --final-blocks-only Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD (default: "false", env: FINAL_BLOCKS_ONLY) - --verbose Enable verbose logging (default: "false", env: VERBOSE) + --collect-default-metrics Collect default metrics (choices: "true", "false", default: false, env: COLLECT_DEFAULT_METRICS) -h, --help display help for command ``` @@ -84,12 +86,22 @@ STOP_BLOCK=1000020 **example.js** ```js -import pkg from "./package.json" assert { type: "json" }; import { commander, setup, prometheus, http, logger, fileCursor } from "substreams-sink"; +const pkg = { + name: "substreams-sink", + version: "0.0.1", + description: "Substreams Sink long description", +} + +// Setup CLI using Commander +const program = commander.program(pkg); +const command = commander.addRunOptions(program); +logger.setName(pkg.name); + // Setup CLI using Commander const program = commander.program(pkg); -const command = commander.run(program, pkg); +const command = commander.addRunOptions(program); logger.setName(pkg.name); // Custom Prometheus Counters diff --git a/example.js b/example.js index 111ed8b..113fb58 100644 --- a/example.js +++ b/example.js @@ -1,9 +1,14 @@ -import pkg from "./package.json" assert { type: "json" }; import { commander, setup, prometheus, http, logger, fileCursor } from "./dist/index.js"; +const pkg = { + name: "substreams-sink", + version: "0.0.1", + description: "Substreams Sink long description", +} + // Setup CLI using Commander const program = commander.program(pkg); -const command = commander.run(program, pkg); +const command = commander.addRunOptions(program); logger.setName(pkg.name); // Custom Prometheus Counters diff --git a/package.json b/package.json index 9267fab..420fed2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "substreams-sink", - "version": "0.15.2", + "version": "0.16.0", "description": "Substreams Sink", "type": "module", "exports": "./dist/index.js", diff --git a/src/commander.ts b/src/commander.ts index 1af5a1a..f4dd75f 100644 --- a/src/commander.ts +++ b/src/commander.ts @@ -1,11 +1,11 @@ import "dotenv/config"; import { Command, Option } from "commander"; -import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE, DEFAULT_FINAL_BLOCKS_ONLY } from "./config.js"; +import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS } from "./config.js"; export interface Package { - name: string; - version: string; - description: string; + name?: string; + version?: string; + description?: string; } export interface RunOptions { @@ -13,26 +13,30 @@ export interface RunOptions { manifest: string; moduleName: string; params: string[]; - startBlock: string; - stopBlock: string; + startBlock: number | bigint | undefined; + stopBlock: number | bigint | `+${number}` | undefined; substreamsApiKey: string; substreamsApiToken: string; // Deprecated delayBeforeStart: number; cursor: string; - productionMode: string; + productionMode: boolean; inactivitySeconds: number; hostname: string; port: number; metricsLabels: string[]; - collectDefaultMetrics: string; + collectDefaultMetrics: boolean; headers: Headers; - verbose: string; - finalBlocksOnly: string; + verbose: boolean; + finalBlocksOnly: boolean; + plaintext: boolean; } -export function program(pkg: Package) { +export function program(pkg?: Package) { const program = new Command(); - program.name(pkg.name).version(pkg.version, "-v, --version", `version for ${pkg.name}`); + const name = pkg?.name ?? "substreams-sink"; + program.name(name); + if ( pkg?.version ) program.version(pkg.version, "-v, --version", `version for ${name}`); + if ( pkg?.description ) program.description(pkg.description); program.command("completion").description("Generate the autocompletion script for the specified shell"); program.command("help").description("Display help for command"); program.showHelpAfterError(); @@ -60,8 +64,8 @@ function handleHeaders(value: string, previous: Headers) { return headers; } -export function run(program: Command, pkg: Package, options: AddRunOptions = {}) { - return addRunOptions(program.command("run"), pkg, options); +export function run(program: Command, options: AddRunOptions = {}) { + return addRunOptions(program.command("run"), options); } export function list(program: Command) { @@ -82,10 +86,18 @@ interface AddRunOptions { metrics?: boolean; } -export function addRunOptions(program: Command, pkg: Package, options: AddRunOptions = {}) { +function parseBoolean(value?: string) { + if ( value !== undefined ) return value.toLocaleLowerCase() === "true"; + return false; +} + +function addBoolean(flags: string, description: string, env: string) { + return new Option(flags, description).default(false).env(env).choices(["true", "false"]).argParser(parseBoolean); +} + +export function addRunOptions(program: Command, options: AddRunOptions = {}) { const command = program .showHelpAfterError() - .description(pkg.description) .addOption(new Option("-e --substreams-endpoint ", "Substreams gRPC endpoint to stream data from").makeOptionMandatory().env("SUBSTREAMS_ENDPOINT")) .addOption(new Option("--manifest ", "URL of Substreams package").makeOptionMandatory().env("MANIFEST")) .addOption(new Option("--module-name ", "Name of the output module (declared in the manifest)").makeOptionMandatory().env("MODULE_NAME")) @@ -96,11 +108,12 @@ export function addRunOptions(program: Command, pkg: Package, options: AddRunOpt .addOption(new Option("--substreams-api-token ", "(DEPRECATED) API token for the Substream endpoint").hideHelp(true).env("SUBSTREAMS_API_TOKEN")) .addOption(new Option("--delay-before-start ", "Delay (ms) before starting Substreams").default(DEFAULT_DELAY_BEFORE_START).env("DELAY_BEFORE_START")) .addOption(new Option("--cursor ", "Cursor to stream from. Leave blank for no cursor")) - .addOption(new Option("--production-mode ", "Enable production mode, allows cached Substreams data if available").default(DEFAULT_PRODUCTION_MODE).env("PRODUCTION_MODE")) + .addOption(addBoolean("--production-mode ", "Enable production mode, allows cached Substreams data if available", "PRODUCTION_MODE")) + .addOption(addBoolean("--final-blocks-only ", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD", "FINAL_BLOCKS_ONLY")) .addOption(new Option("--inactivity-seconds ", "If set, the sink will stop when inactive for over a certain amount of seconds").default(DEFAULT_INACTIVITY_SECONDS).env("INACTIVITY_SECONDS")) .addOption(new Option("--headers [string...]", "Set headers that will be sent on every requests (ex: --headers X-HEADER=headerA)").default(DEFAULT_HEADERS).env("HEADERS").argParser(handleHeaders)) - .addOption(new Option("--final-blocks-only ", "Only process blocks that have pass finality, to prevent any reorg and undo signal by staying further away from the chain HEAD").default(DEFAULT_FINAL_BLOCKS_ONLY).env("FINAL_BLOCKS_ONLY")) - .addOption(new Option("--verbose ", "Enable verbose logging").default(DEFAULT_VERBOSE).env("VERBOSE")) + .addOption(addBoolean("--plaintext ", "Establish GRPC connection in plaintext", "PLAIN_TEXT")) + .addOption(addBoolean("--verbose ", "Enable verbose logging", "VERBOSE")); // HTTP and Prometheus metrics options if ( options.http !== false ) { @@ -111,7 +124,7 @@ export function addRunOptions(program: Command, pkg: Package, options: AddRunOpt if ( options.metrics !== false ) { command .addOption(new Option("--metrics-labels [string...]", "To apply generic labels to all default metrics (ex: --labels foo=bar)").default(DEFAULT_METRICS_LABELS).env("METRICS_LABELS").argParser(handleMetricsLabels)) - .addOption(new Option("--collect-default-metrics ", "Collect default metrics").default(DEFAULT_COLLECT_DEFAULT_METRICS).env("COLLECT_DEFAULT_METRICS")) + .addOption(addBoolean("--collect-default-metrics ", "Collect default metrics", "COLLECT_DEFAULT_METRICS")); } return command; } diff --git a/src/config.ts b/src/config.ts index 5e52f19..05f6ba5 100644 --- a/src/config.ts +++ b/src/config.ts @@ -2,13 +2,9 @@ export const DEFAULT_HOSTNAME = "localhost"; export const DEFAULT_PORT = 9102; export const DEFAULT_CURSOR_PATH = "cursor.lock"; -export const DEFAULT_VERBOSE = "false"; export const DEFAULT_INACTIVITY_SECONDS = 300; -export const DEFAULT_PRODUCTION_MODE = "false"; export const DEFAULT_DELAY_BEFORE_START = 0; export const DEFAULT_METRICS_LABELS = {}; -export const DEFAULT_COLLECT_DEFAULT_METRICS = "false"; export const DEFAULT_START_BLOCK = "-1"; export const DEFAULT_PARAMS = []; export const DEFAULT_HEADERS = new Headers(); -export const DEFAULT_FINAL_BLOCKS_ONLY = "false"; \ No newline at end of file diff --git a/src/http.ts b/src/http.ts index 2de4ce2..1185900 100644 --- a/src/http.ts +++ b/src/http.ts @@ -14,13 +14,12 @@ export const server = http.createServer(async (req, res) => { } }); -export async function listen(options: RunOptions) { +export async function listen(options: RunOptions): Promise { const hostname = options.hostname; const port = options.port; return new Promise(resolve => { server.listen(port, hostname, () => { - logger.info("prometheus server", { hostname, port }); - resolve(true); + resolve(server); }); }) } \ No newline at end of file diff --git a/src/list.ts b/src/list.ts index b0583f7..a8dd5c6 100644 --- a/src/list.ts +++ b/src/list.ts @@ -1,15 +1,13 @@ import { readPackage } from "@substreams/manifest"; import { getModules } from "@substreams/core"; -import { logger } from "./logger.js"; - export async function list(url: string) { const spkg = await readPackage(url) const compatible = [] for (const { name, output } of getModules(spkg)) { if (!output) continue; - logger.info('module', { name, output }) + console.log('module', { name, output }) compatible.push(name) } diff --git a/src/prometheus.ts b/src/prometheus.ts index ad41a2a..8bf32d4 100644 --- a/src/prometheus.ts +++ b/src/prometheus.ts @@ -94,7 +94,6 @@ export function onPrometheusMetrics(emitter: BlockEmitter) { } export function handleSession(session: SessionInit) { - logger.info("session", { traceId: String(session.traceId), resolvedStartBlock: String(session.resolvedStartBlock), linearHandoffBlock: String(session.linearHandoffBlock), maxParallelWorkers: String(session.maxParallelWorkers) }); const labelNames = ["trace_id", "resolved_start_block", "linear_handoff_block", "max_parallel_workers"]; const gauge = registerGauge("session", "Substreams Session", labelNames) as Gauge; gauge.labels({ @@ -106,7 +105,6 @@ export function handleSession(session: SessionInit) { } export function handleManifest(emitter: BlockEmitter, moduleHash: string, options: RunOptions) { - logger.info("manifest", { moduleHash, manifest: options.manifest, substreamsEndpoint: options.substreamsEndpoint, finalBlocksOnly: options.finalBlocksOnly, productionMode: options.productionMode }); const labelNames = ["module_hash", "manifest", "output_module", "substreams_endpoint", "start_block_num", "stop_block_num", "production_mode", "final_blocks_only"]; const gauge = registerGauge("manifest", "Substreams manifest and sha256 hash of map module", labelNames) as Gauge; gauge.labels({ diff --git a/src/setup.ts b/src/setup.ts index 789e6ed..400a823 100644 --- a/src/setup.ts +++ b/src/setup.ts @@ -12,7 +12,7 @@ import { health } from "./health.js"; export async function setup(options: RunOptions) { // Configure logging with TSLog - if (String(options.verbose) === "true") logger.enable(); + if (options.verbose) logger.enable(); // Download Substream package const manifest = options.manifest; @@ -24,20 +24,19 @@ export async function setup(options: RunOptions) { const token = options.substreamsApiKey ?? options.substreamsApiToken; if ( token?.includes(".")) throw new Error("JWT token is not longer supported, please use Substreams API key instead"); - // append https if not present - if (baseUrl.match(/http/) === null) { - baseUrl = `https://${baseUrl}`; - } - // User parameters - const outputModule = options.moduleName; const startBlockNum = options.startBlock as any; - const stopBlockNum = options.stopBlock as any; - const params = options.params; - const headers = options.headers; - const startCursor = options.cursor; - const productionMode = String(options.productionMode) === "true"; - const finalBlocksOnly = String(options.finalBlocksOnly) === "true"; + const stopBlockNum = options.stopBlock; + const { moduleName: outputModule, cursor: startCursor } = options; // renamed otions + const { params, headers, productionMode, finalBlocksOnly, plaintext } = options; + + // append https/http if not present + if (!baseUrl.startsWith("http")) { + baseUrl = `${plaintext ? "http" : "https"}://${baseUrl}`; + } + if ( plaintext && baseUrl.startsWith("https")) { + throw new Error("--plaintext mode is not supported with https"); + } // Adding default headers headers.set("X-User-Agent", "substreams-sink");