Skip to content

Commit

Permalink
indexer: add persistence plugin (#88)
Browse files Browse the repository at this point in the history
add persistence plugin to persist the indexer’s state between restarts.
  • Loading branch information
fracek authored Jul 1, 2024
2 parents 56b6490 + 9c93e23 commit b07fa77
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 39 deletions.
20 changes: 5 additions & 15 deletions packages/indexer/src/indexer.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import type { StreamDataResponse } from "@apibara/protocol";
import { type MockBlock, MockClient } from "@apibara/protocol/testing";
import { MockClient } from "@apibara/protocol/testing";
import { describe, expect, it } from "vitest";
import { run } from "./indexer";
import { vcr } from "./testing";
import { type MockRet, mockIndexer } from "./testing/indexer";
import { generateMockMessages, vcr } from "./testing";
import { type MockRet, getMockIndexer } from "./testing/indexer";

describe("Run Test", () => {
const client = new MockClient(messages, [{}]);

it("should stream messages", async () => {
const sink = vcr<MockRet>();
await run(client, mockIndexer, sink);
await run(client, getMockIndexer(), sink);

expect(sink.result).toMatchInlineSnapshot(`
[
Expand Down Expand Up @@ -119,13 +118,4 @@ describe("Run Test", () => {
});
});

const messages: StreamDataResponse<MockBlock>[] = [...Array(10)].map(
(_, i) => ({
_tag: "data",
data: {
finality: "accepted",
data: [{ blockNumber: BigInt(5_000_000 + i) }],
endCursor: { orderKey: BigInt(5_000_000 + i) },
},
}),
);
const messages = generateMockMessages();
9 changes: 6 additions & 3 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ export interface IndexerHooks<TFilter, TBlock, TRet> {
"handler:after": ({ output }: { output: TRet[] }) => void;
"handler:exception": ({ error }: { error: Error }) => void;
"sink:write": ({ data }: { data: TRet[] }) => void;
"sink:flush": () => void;
"sink:flush": ({
endCursor,
finality,
}: { endCursor?: Cursor; finality: DataFinality }) => void;
message: ({ message }: { message: StreamDataResponse<TBlock> }) => void;
}

Expand Down Expand Up @@ -123,8 +126,8 @@ export async function run<TFilter, TBlock, TRet>(
sink.hook("write", async ({ data }) => {
await indexer.hooks.callHook("sink:write", { data });
});
sink.hook("flush", async () => {
await indexer.hooks.callHook("sink:flush");
sink.hook("flush", async ({ endCursor, finality }) => {
await indexer.hooks.callHook("sink:flush", { endCursor, finality });
});

const request = indexer.streamConfig.Request.make({
Expand Down
121 changes: 121 additions & 0 deletions packages/indexer/src/plugins/persistence.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import type { Cursor } from "@apibara/protocol";
import { type MockBlock, MockClient } from "@apibara/protocol/testing";
import { klona } from "klona/full";
import { open } from "sqlite";
import sqlite3 from "sqlite3";
import { describe, expect, it } from "vitest";
import { run } from "../indexer";
import { generateMockMessages } from "../testing";
import { type MockRet, getMockIndexer } from "../testing/indexer";
import { SqlitePersistence, sqlitePersistence } from "./persistence";

describe("Persistence", () => {
const initDB = async () => {
const db = await open({ driver: sqlite3.Database, filename: ":memory:" });
await SqlitePersistence.initialize(db);
return db;
};

it("should handle storing and updating a cursor", async () => {
const db = await initDB();
const store = new SqlitePersistence(db);

// Assert there's no data
let latest = await store.get();
expect(latest).toBeUndefined();

// Insert value
const cursor: Cursor = {
orderKey: 5_000_000n,
};
await store.put(cursor);

// Check that value was stored
latest = await store.get();
expect(latest).toEqual({
orderKey: 5_000_000n,
uniqueKey: null,
});

// Update value
const updatedCursor: Cursor = {
orderKey: 5_000_010n,
uniqueKey: "0x1234567890",
};
await store.put(updatedCursor);

// Check that value was updated
latest = await store.get();
expect(latest).toEqual({
orderKey: 5_000_010n,
uniqueKey: "0x1234567890",
});

await db.close();
});

it("should handle storing and deleting a cursor", async () => {
const db = await initDB();
const store = new SqlitePersistence(db);

// Assert there's no data
let latest = await store.get();
expect(latest).toBeUndefined();

// Insert value
const cursor: Cursor = {
orderKey: 5_000_000n,
};
await store.put(cursor);

// Check that value was stored
latest = await store.get();
expect(latest).toEqual({
orderKey: 5_000_000n,
uniqueKey: null,
});

// Delete value
await store.del();

// Check there's no data
latest = await store.get();
expect(latest).toBeUndefined();

await db.close();
});

it("should work with indexer and store cursor of last message", async () => {
const client = new MockClient(messages, [{}]);

// biome-ignore lint/complexity/noBannedTypes: <explanation>
const persistence = sqlitePersistence<{}, MockBlock, MockRet>({
driver: sqlite3.Database,
filename: "file:memdb1?mode=memory&cache=shared",
});

// create mock indexer with persistence plugin
const indexer = klona(getMockIndexer([persistence]));

await run(client, indexer);

// open same db again to check last cursor
const db = await open({
driver: sqlite3.Database,
filename: "file:memdb1?mode=memory&cache=shared",
});

const store = new SqlitePersistence(db);

const latest = await store.get();

expect(latest).toMatchInlineSnapshot(`
{
"orderKey": 5000009n,
"uniqueKey": null,
}
`);
});
});

const messages = generateMockMessages();
91 changes: 91 additions & 0 deletions packages/indexer/src/plugins/persistence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import type { Cursor } from "@apibara/protocol";
import { type Database, type ISqlite, open } from "sqlite";
import { defineIndexerPlugin } from "./config";

type SqliteArgs = ISqlite.Config;

export function sqlitePersistence<TFilter, TBlock, TRet>(args: SqliteArgs) {
return defineIndexerPlugin<TFilter, TBlock, TRet>((indexer) => {
let db: Database;
let store: SqlitePersistence;

indexer.hooks.hook("run:before", async () => {
db = await open(args);

await SqlitePersistence.initialize(db);

store = new SqlitePersistence(db);
});

indexer.hooks.hook("connect:before", async ({ request }) => {
const lastCursor = await store.get();

if (lastCursor) {
request.startingCursor = lastCursor;
}
});

indexer.hooks.hook("sink:flush", async ({ endCursor }) => {
if (endCursor) {
await store.put(endCursor);
}
});
});
}

export class SqlitePersistence {
constructor(private _db: Database) {}

static async initialize(db: Database) {
await db.exec(`
CREATE TABLE IF NOT EXISTS checkpoints (
id TEXT NOT NULL PRIMARY KEY,
order_key INTEGER NOT NULL,
unique_key TEXT
);
`);
}
async get(): Promise<Cursor | undefined> {
const row = await this._db.get<CheckpointRow>(
`
SELECT *
FROM checkpoints
WHERE id = ?
`,
["default"],
);

if (!row) return undefined;

return { orderKey: BigInt(row.order_key), uniqueKey: row.unique_key };
}

async put(cursor: Cursor) {
await this._db.run(
`
INSERT INTO checkpoints (id, order_key, unique_key)
VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
order_key = excluded.order_key,
unique_key = excluded.unique_key
`,
["default", Number(cursor.orderKey), cursor.uniqueKey],
);
}

async del() {
await this._db.run(
`
DELETE FROM checkpoints
WHERE id = ?
`,
["default"],
);
}
}

export type CheckpointRow = {
id: string;
order_key: number;
unique_key?: `0x${string}`;
};
9 changes: 6 additions & 3 deletions packages/indexer/src/sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import { Hookable } from "hookable";

export interface SinkEvents<TData> {
write({ data }: { data: TData[] }): void;
flush(): void;
flush({
endCursor,
finality,
}: { endCursor?: Cursor; finality: DataFinality }): void;
}

export type SinkWriteArgs<TData> = {
Expand All @@ -23,9 +26,9 @@ export abstract class Sink<TData> extends Hookable<SinkEvents<TData>> {
}

export class DefaultSink<TData = unknown> extends Sink<TData> {
async write({ data }: SinkWriteArgs<TData>) {
async write({ data, endCursor, finality }: SinkWriteArgs<TData>) {
await this.callHook("write", { data });
await this.callHook("flush");
await this.callHook("flush", { endCursor, finality });
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/sinks/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ export class CsvSink<
super();
}

async write({ data, endCursor }: SinkWriteArgs<TData>) {
async write({ data, endCursor, finality }: SinkWriteArgs<TData>) {
await this.callHook("write", { data });
// adds a "_cursor" property if "cursorColumn" is not specified by user
data = this.processCursorColumn(data, endCursor);
// Insert the data into csv
await this.insertToCSV(data);

await this.callHook("flush");
await this.callHook("flush", { endCursor, finality });
}

private async insertToCSV(data: TData[]) {
Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/sinks/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ export class SqliteSink<
this._db = db;
}

async write({ data, endCursor }: SinkWriteArgs<TData>) {
async write({ data, endCursor, finality }: SinkWriteArgs<TData>) {
await this.callHook("write", { data });

data = this.processCursorColumn(data, endCursor);
await this.insertJsonArray(data);

await this.callHook("flush");
await this.callHook("flush", { endCursor, finality });
}

private async insertJsonArray(data: TData[]) {
Expand Down
15 changes: 15 additions & 0 deletions packages/indexer/src/testing/helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { StreamDataResponse } from "@apibara/protocol";
import type { MockBlock } from "@apibara/protocol/testing";

export function generateMockMessages(
count = 10,
): StreamDataResponse<MockBlock>[] {
return [...Array(count)].map((_, i) => ({
_tag: "data",
data: {
finality: "accepted",
data: [{ blockNumber: BigInt(5_000_000 + i) }],
endCursor: { orderKey: BigInt(5_000_000 + i) },
},
}));
}
1 change: 1 addition & 0 deletions packages/indexer/src/testing/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./setup";
export * from "./vcr";
export * from "./helper";
30 changes: 18 additions & 12 deletions packages/indexer/src/testing/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import { MockStream } from "@apibara/protocol/testing";
import { type MockBlock, MockStream } from "@apibara/protocol/testing";
import { createIndexer, defineIndexer } from "../indexer";
import type { IndexerPlugin } from "../plugins";

export const mockIndexer = createIndexer(
defineIndexer(MockStream)({
streamUrl: "https://sepolia.ethereum.a5a.ch",
finality: "accepted",
filter: {},
transform({ block: { blockNumber } }) {
if (!blockNumber) return [];
export const getMockIndexer = (
// biome-ignore lint/complexity/noBannedTypes: <explanation>
plugins: ReadonlyArray<IndexerPlugin<{}, MockBlock, MockRet>> = [],
) =>
createIndexer(
defineIndexer(MockStream)({
streamUrl: "https://sepolia.ethereum.a5a.ch",
finality: "accepted",
filter: {},
transform({ block: { blockNumber } }) {
if (!blockNumber) return [];

return [{ blockNumber: Number(blockNumber) }];
},
}),
);
return [{ blockNumber: Number(blockNumber) }];
},
plugins,
}),
);

export type MockRet = {
blockNumber: number;
Expand Down
4 changes: 2 additions & 2 deletions packages/indexer/src/testing/vcr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import type { VcrReplayResult } from "../vcr";
export class VcrSink<TData> extends Sink<TData> {
public result: VcrReplayResult<TData>["outputs"] = [];

async write({ data, endCursor }: SinkWriteArgs<TData>) {
async write({ data, endCursor, finality }: SinkWriteArgs<TData>) {
await this.callHook("write", { data });
this.result.push({ data, endCursor });
await this.callHook("flush");
await this.callHook("flush", { endCursor, finality });
}
}

Expand Down

0 comments on commit b07fa77

Please sign in to comment.