diff --git a/packages/indexer/src/indexer.test.ts b/packages/indexer/src/indexer.test.ts index 02e1641..df2a655 100644 --- a/packages/indexer/src/indexer.test.ts +++ b/packages/indexer/src/indexer.test.ts @@ -1,16 +1,15 @@ -import type { StreamDataResponse } from "@apibara/protocol"; -import { type MockBlock, MockClient } from "@apibara/protocol/testing"; +import { MockClient } from "@apibara/protocol/testing"; import { describe, expect, it } from "vitest"; import { run } from "./indexer"; -import { vcr } from "./testing"; -import { type MockRet, mockIndexer } from "./testing/indexer"; +import { generateMockMessages, vcr } from "./testing"; +import { type MockRet, getMockIndexer } from "./testing/indexer"; describe("Run Test", () => { const client = new MockClient(messages, [{}]); it("should stream messages", async () => { const sink = vcr(); - await run(client, mockIndexer, sink); + await run(client, getMockIndexer(), sink); expect(sink.result).toMatchInlineSnapshot(` [ @@ -119,13 +118,4 @@ describe("Run Test", () => { }); }); -const messages: StreamDataResponse[] = [...Array(10)].map( - (_, i) => ({ - _tag: "data", - data: { - finality: "accepted", - data: [{ blockNumber: BigInt(5_000_000 + i) }], - endCursor: { orderKey: BigInt(5_000_000 + i) }, - }, - }), -); +const messages = generateMockMessages(); diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index 8e644c6..b9608f0 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -44,7 +44,10 @@ export interface IndexerHooks { "handler:after": ({ output }: { output: TRet[] }) => void; "handler:exception": ({ error }: { error: Error }) => void; "sink:write": ({ data }: { data: TRet[] }) => void; - "sink:flush": () => void; + "sink:flush": ({ + endCursor, + finality, + }: { endCursor?: Cursor; finality: DataFinality }) => void; message: ({ message }: { message: StreamDataResponse }) => void; } @@ -123,8 +126,8 @@ export async function run( sink.hook("write", async ({ data }) => { await indexer.hooks.callHook("sink:write", { data }); }); - sink.hook("flush", async () => { - await indexer.hooks.callHook("sink:flush"); + sink.hook("flush", async ({ endCursor, finality }) => { + await indexer.hooks.callHook("sink:flush", { endCursor, finality }); }); const request = indexer.streamConfig.Request.make({ diff --git a/packages/indexer/src/plugins/persistence.test.ts b/packages/indexer/src/plugins/persistence.test.ts new file mode 100644 index 0000000..e10d6f6 --- /dev/null +++ b/packages/indexer/src/plugins/persistence.test.ts @@ -0,0 +1,121 @@ +import type { Cursor } from "@apibara/protocol"; +import { type MockBlock, MockClient } from "@apibara/protocol/testing"; +import { klona } from "klona/full"; +import { open } from "sqlite"; +import sqlite3 from "sqlite3"; +import { describe, expect, it } from "vitest"; +import { run } from "../indexer"; +import { generateMockMessages } from "../testing"; +import { type MockRet, getMockIndexer } from "../testing/indexer"; +import { SqlitePersistence, sqlitePersistence } from "./persistence"; + +describe("Persistence", () => { + const initDB = async () => { + const db = await open({ driver: sqlite3.Database, filename: ":memory:" }); + await SqlitePersistence.initialize(db); + return db; + }; + + it("should handle storing and updating a cursor", async () => { + const db = await initDB(); + const store = new SqlitePersistence(db); + + // Assert there's no data + let latest = await store.get(); + expect(latest).toBeUndefined(); + + // Insert value + const cursor: Cursor = { + orderKey: 5_000_000n, + }; + await store.put(cursor); + + // Check that value was stored + latest = await store.get(); + expect(latest).toEqual({ + orderKey: 5_000_000n, + uniqueKey: null, + }); + + // Update value + const updatedCursor: Cursor = { + orderKey: 5_000_010n, + uniqueKey: "0x1234567890", + }; + await store.put(updatedCursor); + + // Check that value was updated + latest = await store.get(); + expect(latest).toEqual({ + orderKey: 5_000_010n, + uniqueKey: "0x1234567890", + }); + + await db.close(); + }); + + it("should handle storing and deleting a cursor", async () => { + const db = await initDB(); + const store = new SqlitePersistence(db); + + // Assert there's no data + let latest = await store.get(); + expect(latest).toBeUndefined(); + + // Insert value + const cursor: Cursor = { + orderKey: 5_000_000n, + }; + await store.put(cursor); + + // Check that value was stored + latest = await store.get(); + expect(latest).toEqual({ + orderKey: 5_000_000n, + uniqueKey: null, + }); + + // Delete value + await store.del(); + + // Check there's no data + latest = await store.get(); + expect(latest).toBeUndefined(); + + await db.close(); + }); + + it("should work with indexer and store cursor of last message", async () => { + const client = new MockClient(messages, [{}]); + + // biome-ignore lint/complexity/noBannedTypes: + const persistence = sqlitePersistence<{}, MockBlock, MockRet>({ + driver: sqlite3.Database, + filename: "file:memdb1?mode=memory&cache=shared", + }); + + // create mock indexer with persistence plugin + const indexer = klona(getMockIndexer([persistence])); + + await run(client, indexer); + + // open same db again to check last cursor + const db = await open({ + driver: sqlite3.Database, + filename: "file:memdb1?mode=memory&cache=shared", + }); + + const store = new SqlitePersistence(db); + + const latest = await store.get(); + + expect(latest).toMatchInlineSnapshot(` + { + "orderKey": 5000009n, + "uniqueKey": null, + } + `); + }); +}); + +const messages = generateMockMessages(); diff --git a/packages/indexer/src/plugins/persistence.ts b/packages/indexer/src/plugins/persistence.ts new file mode 100644 index 0000000..36d6e8d --- /dev/null +++ b/packages/indexer/src/plugins/persistence.ts @@ -0,0 +1,91 @@ +import type { Cursor } from "@apibara/protocol"; +import { type Database, type ISqlite, open } from "sqlite"; +import { defineIndexerPlugin } from "./config"; + +type SqliteArgs = ISqlite.Config; + +export function sqlitePersistence(args: SqliteArgs) { + return defineIndexerPlugin((indexer) => { + let db: Database; + let store: SqlitePersistence; + + indexer.hooks.hook("run:before", async () => { + db = await open(args); + + await SqlitePersistence.initialize(db); + + store = new SqlitePersistence(db); + }); + + indexer.hooks.hook("connect:before", async ({ request }) => { + const lastCursor = await store.get(); + + if (lastCursor) { + request.startingCursor = lastCursor; + } + }); + + indexer.hooks.hook("sink:flush", async ({ endCursor }) => { + if (endCursor) { + await store.put(endCursor); + } + }); + }); +} + +export class SqlitePersistence { + constructor(private _db: Database) {} + + static async initialize(db: Database) { + await db.exec(` + CREATE TABLE IF NOT EXISTS checkpoints ( + id TEXT NOT NULL PRIMARY KEY, + order_key INTEGER NOT NULL, + unique_key TEXT + ); + `); + } + async get(): Promise { + const row = await this._db.get( + ` + SELECT * + FROM checkpoints + WHERE id = ? + `, + ["default"], + ); + + if (!row) return undefined; + + return { orderKey: BigInt(row.order_key), uniqueKey: row.unique_key }; + } + + async put(cursor: Cursor) { + await this._db.run( + ` + INSERT INTO checkpoints (id, order_key, unique_key) + VALUES (?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + order_key = excluded.order_key, + unique_key = excluded.unique_key + `, + ["default", Number(cursor.orderKey), cursor.uniqueKey], + ); + } + + async del() { + await this._db.run( + ` + DELETE FROM checkpoints + WHERE id = ? + `, + ["default"], + ); + } +} + +export type CheckpointRow = { + id: string; + order_key: number; + unique_key?: `0x${string}`; +}; diff --git a/packages/indexer/src/sink.ts b/packages/indexer/src/sink.ts index b2e2e42..3b77868 100644 --- a/packages/indexer/src/sink.ts +++ b/packages/indexer/src/sink.ts @@ -3,7 +3,10 @@ import { Hookable } from "hookable"; export interface SinkEvents { write({ data }: { data: TData[] }): void; - flush(): void; + flush({ + endCursor, + finality, + }: { endCursor?: Cursor; finality: DataFinality }): void; } export type SinkWriteArgs = { @@ -23,9 +26,9 @@ export abstract class Sink extends Hookable> { } export class DefaultSink extends Sink { - async write({ data }: SinkWriteArgs) { + async write({ data, endCursor, finality }: SinkWriteArgs) { await this.callHook("write", { data }); - await this.callHook("flush"); + await this.callHook("flush", { endCursor, finality }); } } diff --git a/packages/indexer/src/sinks/csv.ts b/packages/indexer/src/sinks/csv.ts index 510201f..25aca49 100644 --- a/packages/indexer/src/sinks/csv.ts +++ b/packages/indexer/src/sinks/csv.ts @@ -33,14 +33,14 @@ export class CsvSink< super(); } - async write({ data, endCursor }: SinkWriteArgs) { + async write({ data, endCursor, finality }: SinkWriteArgs) { await this.callHook("write", { data }); // adds a "_cursor" property if "cursorColumn" is not specified by user data = this.processCursorColumn(data, endCursor); // Insert the data into csv await this.insertToCSV(data); - await this.callHook("flush"); + await this.callHook("flush", { endCursor, finality }); } private async insertToCSV(data: TData[]) { diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index f1e7930..94869b0 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -34,13 +34,13 @@ export class SqliteSink< this._db = db; } - async write({ data, endCursor }: SinkWriteArgs) { + async write({ data, endCursor, finality }: SinkWriteArgs) { await this.callHook("write", { data }); data = this.processCursorColumn(data, endCursor); await this.insertJsonArray(data); - await this.callHook("flush"); + await this.callHook("flush", { endCursor, finality }); } private async insertJsonArray(data: TData[]) { diff --git a/packages/indexer/src/testing/helper.ts b/packages/indexer/src/testing/helper.ts new file mode 100644 index 0000000..05ad061 --- /dev/null +++ b/packages/indexer/src/testing/helper.ts @@ -0,0 +1,15 @@ +import type { StreamDataResponse } from "@apibara/protocol"; +import type { MockBlock } from "@apibara/protocol/testing"; + +export function generateMockMessages( + count = 10, +): StreamDataResponse[] { + return [...Array(count)].map((_, i) => ({ + _tag: "data", + data: { + finality: "accepted", + data: [{ blockNumber: BigInt(5_000_000 + i) }], + endCursor: { orderKey: BigInt(5_000_000 + i) }, + }, + })); +} diff --git a/packages/indexer/src/testing/index.ts b/packages/indexer/src/testing/index.ts index 1d8f5aa..9dececf 100644 --- a/packages/indexer/src/testing/index.ts +++ b/packages/indexer/src/testing/index.ts @@ -1,2 +1,3 @@ export * from "./setup"; export * from "./vcr"; +export * from "./helper"; diff --git a/packages/indexer/src/testing/indexer.ts b/packages/indexer/src/testing/indexer.ts index fc00e22..f92d163 100644 --- a/packages/indexer/src/testing/indexer.ts +++ b/packages/indexer/src/testing/indexer.ts @@ -1,18 +1,24 @@ -import { MockStream } from "@apibara/protocol/testing"; +import { type MockBlock, MockStream } from "@apibara/protocol/testing"; import { createIndexer, defineIndexer } from "../indexer"; +import type { IndexerPlugin } from "../plugins"; -export const mockIndexer = createIndexer( - defineIndexer(MockStream)({ - streamUrl: "https://sepolia.ethereum.a5a.ch", - finality: "accepted", - filter: {}, - transform({ block: { blockNumber } }) { - if (!blockNumber) return []; +export const getMockIndexer = ( + // biome-ignore lint/complexity/noBannedTypes: + plugins: ReadonlyArray> = [], +) => + createIndexer( + defineIndexer(MockStream)({ + streamUrl: "https://sepolia.ethereum.a5a.ch", + finality: "accepted", + filter: {}, + transform({ block: { blockNumber } }) { + if (!blockNumber) return []; - return [{ blockNumber: Number(blockNumber) }]; - }, - }), -); + return [{ blockNumber: Number(blockNumber) }]; + }, + plugins, + }), + ); export type MockRet = { blockNumber: number; diff --git a/packages/indexer/src/testing/vcr.ts b/packages/indexer/src/testing/vcr.ts index 1f718c8..48c212f 100644 --- a/packages/indexer/src/testing/vcr.ts +++ b/packages/indexer/src/testing/vcr.ts @@ -4,10 +4,10 @@ import type { VcrReplayResult } from "../vcr"; export class VcrSink extends Sink { public result: VcrReplayResult["outputs"] = []; - async write({ data, endCursor }: SinkWriteArgs) { + async write({ data, endCursor, finality }: SinkWriteArgs) { await this.callHook("write", { data }); this.result.push({ data, endCursor }); - await this.callHook("flush"); + await this.callHook("flush", { endCursor, finality }); } }