From 3a4a236949facb13eb898445b2c9ed80d4a51363 Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Mon, 1 Jul 2024 19:39:43 +0200 Subject: [PATCH] indexer: make sink hookable to ensure ordering --- packages/indexer/package.json | 1 - packages/indexer/src/indexer.ts | 4 ++-- packages/indexer/src/sink.ts | 8 ++++---- packages/indexer/src/sinks/csv.ts | 4 ++-- packages/indexer/src/sinks/sqlite.ts | 4 ++-- packages/indexer/src/testing/vcr.ts | 4 ++-- pnpm-lock.yaml | 7 ------- 7 files changed, 12 insertions(+), 20 deletions(-) diff --git a/packages/indexer/package.json b/packages/indexer/package.json index e481967..7cf1c97 100644 --- a/packages/indexer/package.json +++ b/packages/indexer/package.json @@ -58,7 +58,6 @@ "@apibara/protocol": "workspace:*", "@opentelemetry/api": "^1.9.0", "consola": "^3.2.3", - "eventemitter3": "^5.0.1", "hookable": "^5.5.3", "klona": "^2.0.6", "nice-grpc": "^2.1.8", diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index ccbb6f3..8e644c6 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -120,10 +120,10 @@ export async function run( const sink = sinkArg ?? defaultSink(); - sink.on("write", async ({ data }) => { + sink.hook("write", async ({ data }) => { await indexer.hooks.callHook("sink:write", { data }); }); - sink.on("flush", async () => { + sink.hook("flush", async () => { await indexer.hooks.callHook("sink:flush"); }); diff --git a/packages/indexer/src/sink.ts b/packages/indexer/src/sink.ts index 1e22be3..b2e2e42 100644 --- a/packages/indexer/src/sink.ts +++ b/packages/indexer/src/sink.ts @@ -1,5 +1,5 @@ import type { Cursor, DataFinality } from "@apibara/protocol"; -import { EventEmitter } from "eventemitter3"; +import { Hookable } from "hookable"; export interface SinkEvents { write({ data }: { data: TData[] }): void; @@ -13,7 +13,7 @@ export type SinkWriteArgs = { finality: DataFinality; }; -export abstract class Sink extends EventEmitter> { +export abstract class Sink extends Hookable> { abstract write({ data, cursor, @@ -24,8 +24,8 @@ export abstract class Sink extends EventEmitter> { export class DefaultSink extends Sink { async write({ data }: SinkWriteArgs) { - this.emit("write", { data }); - this.emit("flush"); + await this.callHook("write", { data }); + await this.callHook("flush"); } } diff --git a/packages/indexer/src/sinks/csv.ts b/packages/indexer/src/sinks/csv.ts index 8f31519..510201f 100644 --- a/packages/indexer/src/sinks/csv.ts +++ b/packages/indexer/src/sinks/csv.ts @@ -34,13 +34,13 @@ export class CsvSink< } async write({ data, endCursor }: SinkWriteArgs) { - this.emit("write", { data }); + await this.callHook("write", { data }); // adds a "_cursor" property if "cursorColumn" is not specified by user data = this.processCursorColumn(data, endCursor); // Insert the data into csv await this.insertToCSV(data); - this.emit("flush"); + await this.callHook("flush"); } private async insertToCSV(data: TData[]) { diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index 128b631..f1e7930 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -35,12 +35,12 @@ export class SqliteSink< } async write({ data, endCursor }: SinkWriteArgs) { - this.emit("write", { data }); + await this.callHook("write", { data }); data = this.processCursorColumn(data, endCursor); await this.insertJsonArray(data); - this.emit("flush"); + await this.callHook("flush"); } private async insertJsonArray(data: TData[]) { diff --git a/packages/indexer/src/testing/vcr.ts b/packages/indexer/src/testing/vcr.ts index aee71b8..1f718c8 100644 --- a/packages/indexer/src/testing/vcr.ts +++ b/packages/indexer/src/testing/vcr.ts @@ -5,9 +5,9 @@ export class VcrSink extends Sink { public result: VcrReplayResult["outputs"] = []; async write({ data, endCursor }: SinkWriteArgs) { - this.emit("write", { data }); + await this.callHook("write", { data }); this.result.push({ data, endCursor }); - this.emit("flush"); + await this.callHook("flush"); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 24bde22..9a49853 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -248,9 +248,6 @@ importers: consola: specifier: ^3.2.3 version: 3.2.3 - eventemitter3: - specifier: ^5.0.1 - version: 5.0.1 hookable: specifier: ^5.5.3 version: 5.5.3 @@ -3333,10 +3330,6 @@ packages: dependencies: '@types/estree': 1.0.5 - /eventemitter3@5.0.1: - resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} - dev: false - /execa@8.0.1: resolution: {integrity: sha512-VyhnebXciFV2DESc+p6B+y0LjSm0krU4OgJN44qFAhBY0TJ+1V61tYD2+wHusZ6F9n5K+vl8k0sTy7PEfV4qpg==} engines: {node: '>=16.17'}