Skip to content

Commit

Permalink
remove cursor logic from sink
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 19, 2024
1 parent 067a058 commit f764976
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 38 deletions.
3 changes: 2 additions & 1 deletion .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ SUBSTREAMS_ENDPOINT=https://eth.substreams.pinax.network:443
# SPKG
MANIFEST=https://github.com/pinax-network/substreams/releases/download/blocks-v0.1.0/blocks-v0.1.0.spkg
MODULE_NAME=map_blocks
START_BLOCK=-10
START_BLOCK=1000000
STOP_BLOCK=1000020
26 changes: 16 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,10 @@ Options:
-s --start-block <int> Start block to stream from (defaults to -1, which means the initialBlock of the first module you are streaming) (default: "-1", env: START_BLOCK)
-t --stop-block <int> Stop block to end stream at, inclusively (env: STOP_BLOCK)
-p, --params <string...> Set a params for parameterizable modules. Can be specified multiple times. (ex: -p module1=valA -p module2=valX&valY) (default: [], env: PARAMS)
--substreams-api-token <string> API token for the substream endpoint or API key if '--auth-issue-url' is specified (default: "", env: SUBSTREAMS_API_TOKEN)
--auth-issue-url <string> URL used to issue a token (default: "https://auth.pinax.network/v1/auth/issue", env: AUTH_ISSUE_URL)
--substreams-api-key <string> API key for the Substream endpoint (env: SUBSTREAMS_API_KEY)
--delay-before-start <int> Delay (ms) before starting Substreams (default: 0, env: DELAY_BEFORE_START)
--cursor-path <string> File path or URL to cursor lock file (default: "cursor.lock", env: CURSOR_PATH)
--http-cursor-auth <string> Basic auth credentials for http cursor (ex: username:password) (env: HTTP_CURSOR_AUTH)
--production-mode <boolean> Enable production mode, allows cached substreams data if available (default: "false", env: PRODUCTION_MODE)
--cursor <string> Cursor to stream from. Leave blank for no cursor
--production-mode <boolean> Enable production mode, allows cached Substreams data if available (default: "false", env: PRODUCTION_MODE)
--inactivity-seconds <int> If set, the sink will stop when inactive for over a certain amount of seconds (default: 300, env: INACTIVITY_SECONDS)
--hostname <string> The process will listen on this hostname for any HTTP and Prometheus metrics requests (default: "localhost", env: HOSTNAME)
--port <int> The process will listen on this port for any HTTP and Prometheus metrics requests (default: 9102, env: PORT)
Expand All @@ -80,13 +78,14 @@ SUBSTREAMS_ENDPOINT=https://eth.substreams.pinax.network:443
# SPKG
MANIFEST=https://github.com/pinax-network/substreams/releases/download/blocks-v0.1.0/blocks-v0.1.0.spkg
MODULE_NAME=map_blocks
START_BLOCK=-10
START_BLOCK=1000000
STOP_BLOCK=1000020
```
**example.js**
```js
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http, logger } from "substreams-sink";
import { commander, setup, prometheus, http, logger, fileCursor } from "substreams-sink";

// Setup CLI using Commander
const program = commander.program(pkg);
Expand All @@ -97,8 +96,11 @@ logger.setName(pkg.name);
const customCounter = prometheus.registerCounter("custom_counter");

command.action(async options => {
// Get cursor from file
const cursor = fileCursor.readCursor("cursor.lock");

// Setup sink for Block Emitter
const { emitter } = await setup(options);
const { emitter } = await setup({...options, cursor});

emitter.on("session", (session) => {
console.log(session);
Expand All @@ -119,13 +121,17 @@ command.action(async options => {
// Setup HTTP server & Prometheus metrics
http.listen(options);

// Start the stream
emitter.start();
// Save new cursor on each new block emitted
fileCursor.onCursor(emitter, "cursor.lock");

// Close HTTP server on close
emitter.on("close", () => {
http.server.close();
console.log("✅ finished");
})

// Start the stream
emitter.start();
})
program.parse();
```
15 changes: 11 additions & 4 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http, logger } from "./dist/index.js";
import { commander, setup, prometheus, http, logger, fileCursor } from "./dist/index.js";

// Setup CLI using Commander
const program = commander.program(pkg);
Expand All @@ -10,8 +10,11 @@ logger.setName(pkg.name);
const customCounter = prometheus.registerCounter("custom_counter");

command.action(async options => {
// Get cursor from file
const cursor = fileCursor.readCursor("cursor.lock");

// Setup sink for Block Emitter
const { emitter } = await setup(options);
const { emitter } = await setup({...options, cursor});

emitter.on("session", (session) => {
console.log(session);
Expand All @@ -32,12 +35,16 @@ command.action(async options => {
// Setup HTTP server & Prometheus metrics
http.listen(options);

// Start the stream
emitter.start();
// Save new cursor on each new block emitted
fileCursor.onCursor(emitter, "cursor.lock");

// Close HTTP server on close
emitter.on("close", () => {
http.server.close();
console.log("✅ finished");
})

// Start the stream
emitter.start();
})
program.parse();
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink",
"version": "0.14.0",
"version": "0.15.0",
"description": "Substreams Sink",
"type": "module",
"exports": "./dist/index.js",
Expand Down
12 changes: 3 additions & 9 deletions src/commander.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "dotenv/config";
import { Command, Option } from "commander";
import { DEFAULT_CURSOR_PATH, DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE, DEFAULT_FINAL_BLOCKS_ONLY } from "./config.js";
import { DEFAULT_INACTIVITY_SECONDS, DEFAULT_PARAMS, DEFAULT_VERBOSE, DEFAULT_HOSTNAME, DEFAULT_PORT, DEFAULT_METRICS_LABELS, DEFAULT_COLLECT_DEFAULT_METRICS, DEFAULT_START_BLOCK, DEFAULT_DELAY_BEFORE_START, DEFAULT_HEADERS, DEFAULT_PRODUCTION_MODE, DEFAULT_FINAL_BLOCKS_ONLY } from "./config.js";

import { list } from "./list.js";
import { logger } from "./logger.js";
Expand All @@ -21,8 +21,7 @@ export interface RunOptions {
substreamsApiKey: string;
substreamsApiToken: string; // Deprecated
delayBeforeStart: number;
cursorPath: string;
httpCursorAuth: string;
cursor: string;
productionMode: string;
inactivitySeconds: number;
hostname: string;
Expand Down Expand Up @@ -74,10 +73,6 @@ function handleHeaders(value: string, previous: Headers) {
return headers;
}

function handleHttpCursorAuth(value: string) {
return btoa(value);
}

export function run(program: Command, pkg: Package) {
return program.command("run")
.showHelpAfterError()
Expand All @@ -91,8 +86,7 @@ export function run(program: Command, pkg: Package) {
.addOption(new Option("--substreams-api-key <string>", "API key for the Substream endpoint").env("SUBSTREAMS_API_KEY"))
.addOption(new Option("--substreams-api-token <string>", "(DEPRECATED) API token for the Substream endpoint").hideHelp(true).env("SUBSTREAMS_API_TOKEN"))
.addOption(new Option("--delay-before-start <int>", "Delay (ms) before starting Substreams").default(DEFAULT_DELAY_BEFORE_START).env("DELAY_BEFORE_START"))
.addOption(new Option("--cursor-path <string>", "File path or URL to cursor lock file").default(DEFAULT_CURSOR_PATH).env("CURSOR_PATH"))
.addOption(new Option("--http-cursor-auth <string>", "Basic auth credentials for http cursor (ex: username:password)").env("HTTP_CURSOR_AUTH").argParser(handleHttpCursorAuth))
.addOption(new Option("--cursor <string>", "Cursor to stream from. Leave blank for no cursor"))
.addOption(new Option("--production-mode <boolean>", "Enable production mode, allows cached Substreams data if available").default(DEFAULT_PRODUCTION_MODE).env("PRODUCTION_MODE"))
.addOption(new Option("--inactivity-seconds <int>", "If set, the sink will stop when inactive for over a certain amount of seconds").default(DEFAULT_INACTIVITY_SECONDS).env("INACTIVITY_SECONDS"))
.addOption(new Option("--hostname <string>", "The process will listen on this hostname for any HTTP and Prometheus metrics requests").default(DEFAULT_HOSTNAME).env("HOSTNAME"))
Expand Down
14 changes: 13 additions & 1 deletion src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { Logger, type ILogObj } from "tslog";

class SinkLogger extends Logger<ILogObj> {
constructor() {
super();
super({
prettyLogTemplate: "{{yyyy}}.{{mm}}.{{dd}} {{hh}}:{{MM}}:{{ss}}\t{{logLevelName}}\t{{name}}\t"
});
this.disable();
this.setName("substreams-sink");
}
Expand All @@ -20,6 +22,16 @@ class SinkLogger extends Logger<ILogObj> {
this.settings.type = "hidden";
this.settings.minLevel = 5;
}

public info(...info: unknown[]) {
const messages = info.map((i) => (typeof i === "string" ? i : JSON.stringify(i)));
return super.info(...messages);
}

public error(...err: unknown[]) {
const errors = err.map((e) => (typeof e === "string" ? e : JSON.stringify(e)));
return super.error(...errors);
}
}

export const logger = new SinkLogger();
14 changes: 2 additions & 12 deletions src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import { BlockEmitter } from "@substreams/node";
import { readPackage } from "@substreams/manifest";
import { setTimeout } from "timers/promises";
import type { RunOptions } from "./commander.js";
import * as fileCursor from "./cursor/fileCursor.js";
import * as httpCursor from "./cursor/httpCursor.js";
import * as prometheus from "./prometheus.js";
import { logger } from "./logger.js";
import { onInactivitySeconds } from "./inactivitySeconds.js";
Expand Down Expand Up @@ -36,8 +34,7 @@ export async function setup(options: RunOptions) {
const stopBlockNum = options.stopBlock as any;
const params = options.params;
const headers = options.headers;
const cursorPath = options.cursorPath;
const httpCursorAuth = options.httpCursorAuth;
const startCursor = options.cursor;
const productionMode = String(options.productionMode) === "true";
const finalBlocksOnly = String(options.finalBlocksOnly) === "true";

Expand All @@ -52,11 +49,7 @@ export async function setup(options: RunOptions) {
applyParams(params, substreamPackage.modules.modules);
}

// Cursor
const cursor = cursorPath.startsWith("http") ? httpCursor : fileCursor;

// Connect Transport
const startCursor = await cursor.readCursor(cursorPath, httpCursorAuth);
const registry = createRegistry(substreamPackage);
const transport = createNodeTransport(baseUrl, token, registry, headers);
const request = createRequest({
Expand All @@ -82,14 +75,11 @@ export async function setup(options: RunOptions) {
prometheus.handleManifest(emitter, moduleHash, options);
prometheus.onPrometheusMetrics(emitter);

// Save new cursor on each new block emitted
cursor.onCursor(emitter, cursorPath);

// Adds delay before using sink
await setTimeout(options.delayBeforeStart);

// Stop on inactivity
onInactivitySeconds(emitter, options.inactivitySeconds, stopBlockNum !== undefined);

return { emitter, substreamPackage, moduleHash, startCursor };
return { emitter, substreamPackage, moduleHash };
}

0 comments on commit f764976

Please sign in to comment.