Skip to content

Commit

Permalink
indexer: make sink hookable to ensure ordering (#90)
Browse files Browse the repository at this point in the history
Previously, the `Sink` interface used eventemitter to signal when data
was written or flushed.
EventEmitter emits events in order, but doesn't ensure that the
listeners are completely done
before calling the listeners for the next events.

This caused issues because we expect listeners for a batch of data to be
completely done
before we call the listeners for the next batch of data.

Switch Sink to extend the `Hookable` class so that calls to `callHook`
await for all
callbacks to be finished before returning.
  • Loading branch information
jaipaljadeja authored Jul 1, 2024
2 parents 2fdfade + 3a4a236 commit 56b6490
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 20 deletions.
1 change: 0 additions & 1 deletion packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
"@apibara/protocol": "workspace:*",
"@opentelemetry/api": "^1.9.0",
"consola": "^3.2.3",
"eventemitter3": "^5.0.1",
"hookable": "^5.5.3",
"klona": "^2.0.6",
"nice-grpc": "^2.1.8",
Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ export async function run<TFilter, TBlock, TRet>(

const sink = sinkArg ?? defaultSink();

sink.on("write", async ({ data }) => {
sink.hook("write", async ({ data }) => {
await indexer.hooks.callHook("sink:write", { data });
});
sink.on("flush", async () => {
sink.hook("flush", async () => {
await indexer.hooks.callHook("sink:flush");
});

Expand Down
8 changes: 4 additions & 4 deletions packages/indexer/src/sink.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Cursor, DataFinality } from "@apibara/protocol";
import { EventEmitter } from "eventemitter3";
import { Hookable } from "hookable";

export interface SinkEvents<TData> {
write({ data }: { data: TData[] }): void;
Expand All @@ -13,7 +13,7 @@ export type SinkWriteArgs<TData> = {
finality: DataFinality;
};

export abstract class Sink<TData> extends EventEmitter<SinkEvents<TData>> {
export abstract class Sink<TData> extends Hookable<SinkEvents<TData>> {
abstract write({
data,
cursor,
Expand All @@ -24,8 +24,8 @@ export abstract class Sink<TData> extends EventEmitter<SinkEvents<TData>> {

export class DefaultSink<TData = unknown> extends Sink<TData> {
async write({ data }: SinkWriteArgs<TData>) {
this.emit("write", { data });
this.emit("flush");
await this.callHook("write", { data });
await this.callHook("flush");
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/sinks/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ export class CsvSink<
}

async write({ data, endCursor }: SinkWriteArgs<TData>) {
this.emit("write", { data });
await this.callHook("write", { data });
// adds a "_cursor" property if "cursorColumn" is not specified by user
data = this.processCursorColumn(data, endCursor);
// Insert the data into csv
await this.insertToCSV(data);

this.emit("flush");
await this.callHook("flush");
}

private async insertToCSV(data: TData[]) {
Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/sinks/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ export class SqliteSink<
}

async write({ data, endCursor }: SinkWriteArgs<TData>) {
this.emit("write", { data });
await this.callHook("write", { data });

data = this.processCursorColumn(data, endCursor);
await this.insertJsonArray(data);

this.emit("flush");
await this.callHook("flush");
}

private async insertJsonArray(data: TData[]) {
Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/testing/vcr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ export class VcrSink<TData> extends Sink<TData> {
public result: VcrReplayResult<TData>["outputs"] = [];

async write({ data, endCursor }: SinkWriteArgs<TData>) {
this.emit("write", { data });
await this.callHook("write", { data });
this.result.push({ data, endCursor });
this.emit("flush");
await this.callHook("flush");
}
}

Expand Down
7 changes: 0 additions & 7 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 56b6490

Please sign in to comment.