diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 5661f71..e66e7b7 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -20,9 +20,9 @@ jobs:
         uses: actions/setup-node@v1
         with:
           node-version: 16
-      - uses: pnpm/action-setup@v2.2.2
+      - uses: pnpm/action-setup@v4
         with:
-          version: 7.1.7
+          version: 8.15.5
       - name: Install Protoc
         uses: arduino/setup-protoc@v1
         with:
@@ -38,4 +38,4 @@ jobs:
       - name: Run build
         run: pnpm build
       - name: Run test
-        run: CI=true pnpm test
+        run: pnpm test:ci
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 0796965..daab05a 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -15,9 +15,9 @@ jobs:
         uses: actions/setup-node@v1
         with:
           node-version: 16
-      - uses: pnpm/action-setup@v2.2.2
+      - uses: pnpm/action-setup@v4
         with:
-          version: 7.1.7
+          version: 8.15.5
       - name: Install Protoc
         uses: arduino/setup-protoc@v1
         with:
diff --git a/examples/indexer/src/indexer.ts b/examples/indexer/src/indexer.ts
index 82ed01b..32260fd 100644
--- a/examples/indexer/src/indexer.ts
+++ b/examples/indexer/src/indexer.ts
@@ -30,7 +30,7 @@ export function createIndexerConfig(streamUrl: string) {
         },
       ],
     },
-    transform({ block: { header, logs, transactions } }) {
+    async transform({ block: { header, logs, transactions } }) {
       const ctx = useIndexerContext();
 
       ctx.counter += 1;
diff --git a/package.json b/package.json
index 46d2c12..f27ad66 100644
--- a/package.json
+++ b/package.json
@@ -7,6 +7,7 @@
     "build": "turbo run build --parallel",
     "typecheck": "turbo run typecheck --parallel",
     "test": "turbo run test",
+    "test:ci": "turbo run test:ci",
     "lint": "turbo run lint --parallel",
     "lint:fix": "turbo run lint:fix --parallel",
     "format": "turbo run format --parallel",
diff --git a/packages/beaconchain/package.json b/packages/beaconchain/package.json
index e6910e0..9f2ec34 100644
--- a/packages/beaconchain/package.json
+++ b/packages/beaconchain/package.json
@@ -22,6 +22,7 @@
     "build:proto": "buf generate proto",
     "typecheck": "tsc --noEmit",
     "test": "vitest",
+    "test:ci": "vitest run",
     "lint": "biome check .",
     "lint:fix": "pnpm lint --write",
     "format": "biome format . --write"
diff --git a/packages/evm/package.json b/packages/evm/package.json
index 18635ba..5471435 100644
--- a/packages/evm/package.json
+++ b/packages/evm/package.json
@@ -22,6 +22,7 @@
     "build:proto": "buf generate proto",
     "typecheck": "tsc --noEmit",
     "test": "vitest",
+    "test:ci": "vitest run",
     "lint": "biome check .",
     "lint:fix": "pnpm lint --write",
     "format": "biome format . --write"
diff --git a/packages/indexer/package.json b/packages/indexer/package.json
index 7cf1c97..2a6e51a 100644
--- a/packages/indexer/package.json
+++ b/packages/indexer/package.json
@@ -44,6 +44,7 @@
     "typecheck": "tsc --noEmit",
     "lint:fix": "pnpm lint --write",
     "test": "vitest",
+    "test:ci": "vitest run",
     "format": "biome format . --write"
--write" }, "devDependencies": { diff --git a/packages/indexer/src/indexer.test.ts b/packages/indexer/src/indexer.test.ts index e9f0396..e532cd1 100644 --- a/packages/indexer/src/indexer.test.ts +++ b/packages/indexer/src/indexer.test.ts @@ -1,13 +1,38 @@ -import { MockClient } from "@apibara/protocol/testing"; -import { describe, expect, it } from "vitest"; +import fs from "node:fs/promises"; +import { + type MockBlock, + MockClient, + type MockFilter, +} from "@apibara/protocol/testing"; +import { klona } from "klona/full"; +import { open } from "sqlite"; +import sqlite3 from "sqlite3"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { run } from "./indexer"; +import { SqlitePersistence, sqlitePersistence } from "./plugins/persistence"; import { generateMockMessages, vcr } from "./testing"; import { type MockRet, getMockIndexer } from "./testing/indexer"; describe("Run Test", () => { - const client = new MockClient(generateMockMessages(), [{}]); + async function cleanup() { + try { + await fs.unlink("file:memdb_indexer?mode=memory&cache=shared"); + } catch {} + } + + beforeEach(async () => { + await cleanup(); + }); + + afterEach(async () => { + await cleanup(); + }); it("should stream messages", async () => { + const client = new MockClient((request, options) => { + return generateMockMessages(); + }); + const sink = vcr(); await run(client, getMockIndexer(), sink); @@ -116,4 +141,344 @@ describe("Run Test", () => { ] `); }); + + it("factory mode: indexer should merge filters and restart when needed", async () => { + const client = new MockClient((request, options) => { + const [_factoryFilter, mainFilter] = request.filter; + + if (Object.keys(mainFilter).length === 0) { + expect(request.startingCursor?.orderKey).toEqual(100n); + + return [ + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 100n }, + endCursor: { orderKey: 101n }, + data: [null, null], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 101n }, + endCursor: { orderKey: 102n }, + data: [null, null], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 102n }, + endCursor: { orderKey: 103n }, + data: [{ data: "B" }, null], + }, + }, + ]; + } + + if (mainFilter.filter === "B") { + expect(request.startingCursor?.orderKey).toEqual(102n); + + return [ + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 102n }, + endCursor: { orderKey: 103n }, + data: [{ data: "B" }, { data: "103B" }], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 103n }, + endCursor: { orderKey: 104n }, + data: [null, { data: "104B" }], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 104n }, + endCursor: { orderKey: 105n }, + data: [null, { data: "105B" }], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 105n }, + endCursor: { orderKey: 106n }, + data: [{ data: "C" }, { data: "106B" }], + }, + }, + ]; + } + + if (mainFilter.filter === "BC") { + expect(request.startingCursor?.orderKey).toEqual(105n); + + return [ + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 105n }, + endCursor: { orderKey: 106n }, + data: [{ data: "C" }, { data: "106BC" }], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: { orderKey: 106n }, + endCursor: { orderKey: 107n }, + data: [null, { data: "107BC" }], + }, + }, + { + _tag: "data", + data: { + finality: "accepted", + cursor: 
+              cursor: { orderKey: 107n },
+              endCursor: { orderKey: 108n },
+              data: [null, { data: "108BC" }],
+            },
+          },
+        ];
+      }
+
+      return [];
+    });
+
+    const persistence = sqlitePersistence({
+      driver: sqlite3.Database,
+      filename: "file:memdb_indexer?mode=memory&cache=shared",
+    });
+
+    // create mock indexer with persistence plugin
+    const indexer = klona(getMockIndexer([persistence]));
+    indexer.options.startingCursor = { orderKey: 100n };
+    indexer.options.factory = async (block) => {
+      if (block.data === "B") {
+        return { filter: { filter: "B" } };
+      }
+
+      if (block.data === "C") {
+        return { filter: { filter: "C" } };
+      }
+
+      return {};
+    };
+
+    const sink = vcr();
+
+    await run(client, indexer, sink);
+
+    // open same db again to check last cursor
+    const db = await open({
+      driver: sqlite3.Database,
+      filename: "file:memdb_indexer?mode=memory&cache=shared",
+    });
+
+    const store = new SqlitePersistence<MockFilter>(db);
+
+    const latest = await store.get();
+
+    expect(latest.cursor?.orderKey).toEqual(108n);
+    expect(latest.filter?.filter).toEqual("BC");
+
+    expect(sink.result).toMatchInlineSnapshot(`
+      [
+        {
+          "data": [
+            {
+              "data": "103B",
+            },
+          ],
+          "endCursor": {
+            "orderKey": 103n,
+          },
+        },
+        {
+          "data": [
+            {
+              "data": "104B",
+            },
+          ],
+          "endCursor": {
+            "orderKey": 104n,
+          },
+        },
+        {
+          "data": [
+            {
+              "data": "105B",
+            },
+          ],
+          "endCursor": {
+            "orderKey": 105n,
+          },
+        },
+        {
+          "data": [
+            {
+              "data": "106BC",
+            },
+          ],
+          "endCursor": {
+            "orderKey": 106n,
+          },
+        },
+        {
+          "data": [
+            {
+              "data": "107BC",
+            },
+          ],
+          "endCursor": {
+            "orderKey": 107n,
+          },
+        },
+        {
+          "data": [
+            {
+              "data": "108BC",
+            },
+          ],
+          "endCursor": {
+            "orderKey": 108n,
+          },
+        },
+      ]
+    `);
+  });
+
+  it("factory mode: last cursor should persist when error is thrown in indexer", async () => {
+    const client = new MockClient<MockFilter, MockBlock>((request, options) => {
+      const [_factoryFilter, mainFilter] = request.filter;
+
+      if (Object.keys(mainFilter).length === 0) {
+        expect(request.startingCursor?.orderKey).toEqual(100n);
+
+        return [
+          {
+            _tag: "data",
+            data: {
+              finality: "accepted",
+              cursor: { orderKey: 100n },
+              endCursor: { orderKey: 101n },
+              data: [null, null],
+            },
+          },
+          {
+            _tag: "data",
+            data: {
+              finality: "accepted",
+              cursor: { orderKey: 101n },
+              endCursor: { orderKey: 102n },
+              data: [null, null],
+            },
+          },
+          {
+            _tag: "data",
+            data: {
+              finality: "accepted",
+              cursor: { orderKey: 102n },
+              endCursor: { orderKey: 103n },
+              data: [{ data: "B" }, null],
+            },
+          },
+          Error("this error should not occur!"),
+        ];
+      }
+
+      if (mainFilter.filter === "B") {
+        expect(request.startingCursor?.orderKey).toEqual(102n);
+
+        return [
+          Error("this error should occur!"),
+          {
+            _tag: "data",
+            data: {
+              finality: "accepted",
+              cursor: { orderKey: 103n },
+              endCursor: { orderKey: 104n },
+              data: [null, { data: "104B" }],
+            },
+          },
+          {
+            _tag: "data",
+            data: {
+              finality: "accepted",
+              cursor: { orderKey: 104n },
+              endCursor: { orderKey: 105n },
+              data: [null, { data: "105B" }],
+            },
+          },
+          {
+            _tag: "data",
+            data: {
+              finality: "accepted",
+              cursor: { orderKey: 105n },
+              endCursor: { orderKey: 106n },
+              data: [{ data: "C" }, { data: "106B" }],
+            },
+          },
+        ];
+      }
+
+      return [];
+    });
+
+    const persistence = sqlitePersistence({
+      driver: sqlite3.Database,
+      filename: "file:memdb_indexer?mode=memory&cache=shared",
+    });
+
+    // create mock indexer with persistence plugin
+    const indexer = klona(getMockIndexer([persistence]));
+    indexer.options.startingCursor = { orderKey: 100n };
+    indexer.options.factory = async (block) => {
+      if (block.data === "B") {
"B") { + return { filter: { filter: "B" } }; + } + + if (block.data === "C") { + return { filter: { filter: "C" } }; + } + + return {}; + }; + + const sink = vcr(); + + await expect(() => run(client, indexer, sink)).rejects.toThrowError( + "this error should occurr!", + ); + + // open same db again to check last cursor + const db = await open({ + driver: sqlite3.Database, + filename: "file:memdb_indexer?mode=memory&cache=shared", + }); + + const store = new SqlitePersistence(db); + + const latest = await store.get(); + + expect(latest.cursor?.orderKey).toEqual(103n); + expect(latest.filter?.filter).toEqual("B"); + + expect(sink.result).toMatchInlineSnapshot("[]"); + }); }); diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index d7684e9..51d7770 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -32,6 +32,13 @@ export interface IndexerHooks { options: StreamDataOptions; }) => void; "connect:after": () => void; + "connect:factory": ({ + request, + endCursor, + }: { + request: StreamDataRequest; + endCursor?: Cursor; + }) => void; "handler:before": ({ block, finality, @@ -56,13 +63,13 @@ export interface IndexerConfig { filter: TFilter; finality?: DataFinality; startingCursor?: Cursor; - factory?: (block: TBlock) => { filter?: TFilter; data?: TRet[] }; + factory?: (block: TBlock) => Promise<{ filter?: TFilter; data?: TRet[] }>; transform: (args: { block: TBlock; cursor?: Cursor | undefined; endCursor?: Cursor | undefined; finality: DataFinality; - }) => TRet[]; + }) => Promise; hooks?: NestedHooks>; plugins?: ReadonlyArray>; debug?: boolean; @@ -130,85 +137,186 @@ export async function run( await indexer.hooks.callHook("sink:flush", { endCursor, finality }); }); + // Check if the it's factory mode or not + const isFactoryMode = indexer.options.factory !== undefined; + + // if factory mode we add a empty filter const request = indexer.streamConfig.Request.make({ - filter: [indexer.options.filter], + filter: isFactoryMode + ? [indexer.options.filter, {} as TFilter] + : [indexer.options.filter], finality: indexer.options.finality, startingCursor: indexer.options.startingCursor, }); const options: StreamDataOptions = {}; + // TODO persistence plugin filter await indexer.hooks.callHook("connect:before", { request, options }); - const stream = client.streamData(request, options); + // store main filter, so later it can be merged + let mainFilter: TFilter; + if (isFactoryMode) { + mainFilter = request.filter[1]; + } + + // create stream + let stream = client.streamData(request, options); await indexer.hooks.callHook("connect:after"); - for await (const message of stream) { - await indexer.hooks.callHook("message", { message }); - - switch (message._tag) { - case "data": { - await tracer.startActiveSpan("message data", async (span) => { - const blocks = message.data.data; - const { cursor, endCursor, finality } = message.data; - if (blocks.length !== 1) { - // Ask me about this. - throw new Error("expected exactly one block"); - } - - const block = blocks[0]; - // Until we implement factory mode, block should never be null. 
-            assert(block !== null);
-
-            const output = await tracer.startActiveSpan(
-              "handler",
-              async (span) => {
-                await indexer.hooks.callHook("handler:before", {
-                  block,
-                  endCursor,
-                  finality,
-                });
+    // state ->
+    // normal: iterate as usual
+    // recover: reconnect after updating the filter
+    let state: { _tag: "normal" } | { _tag: "recover"; data?: TRet[] } = {
+      _tag: "normal",
+    };
+
+    while (true) {
+      for await (const message of stream) {
+        await indexer.hooks.callHook("message", { message });
+
+        switch (message._tag) {
+          case "data": {
+            await tracer.startActiveSpan("message data", async (span) => {
+              const blocks = message.data.data;
+              const { cursor, endCursor, finality } = message.data;
+
+              let block: TBlock | null;
+
+              // combined output of the factory and transform functions
+              const output: TRet[] = [];
+
+              // when in factory mode
+              if (isFactoryMode) {
+                assert(indexer.options.factory !== undefined);
+
+                const [factoryBlock, mainBlock] = blocks;
+
+                block = mainBlock;
+
+                if (state._tag === "normal" && factoryBlock !== null) {
+                  const { data, filter } =
+                    await indexer.options.factory(factoryBlock);
+
+                  // write returned data from the factory function if filter is not defined
+                  if (!filter) {
+                    output.push(...(data ?? []));
+                  } else {
+                    // when filter is defined,
+                    // merge old and new filters
+                    mainFilter = indexer.streamConfig.mergeFilter(
+                      mainFilter,
+                      filter,
+                    );
+
+                    // create a request with the new filters
+                    const request = indexer.streamConfig.Request.make({
+                      filter: [indexer.options.filter, mainFilter],
+                      finality: indexer.options.finality,
+                      startingCursor: cursor,
+                    });
+
+                    await indexer.hooks.callHook("connect:factory", {
+                      request,
+                      endCursor,
+                    });
 
-                let output: TRet[];
+                    // create a new stream with the new request
+                    stream = client.streamData(request, options);
 
-                try {
-                  output = await indexer.options.transform({
+                    // change state to recover mode
+                    state = {
+                      _tag: "recover",
+                      data,
+                    };
+
+                    return;
+                  }
+                }
+                // after restart, when the state is in recover mode
+                else if (state._tag === "recover") {
+                  // we write the data to output
+                  output.push(...(state.data ?? []));
+                  // change state back to normal to avoid an infinite loop
+                  state = { _tag: "normal" };
+                }
+              } else {
+                // when not in factory mode
+                block = blocks[0];
+              }
+
+              // if block is not null
+              if (block) {
+                await tracer.startActiveSpan("handler", async (span) => {
+                  await indexer.hooks.callHook("handler:before", {
                     block,
-                    cursor,
                     endCursor,
                     finality,
                   });
-                } catch (error) {
-                  assert(error instanceof Error);
-                  await indexer.hooks.callHook("handler:exception", { error });
-                  throw error;
-                }
 
+                  try {
+                    const transformOutput = await indexer.options.transform({
+                      block,
+                      cursor,
+                      endCursor,
+                      finality,
+                    });
 
-                await indexer.hooks.callHook("handler:after", { output });
+                    // write transformed data to output
+                    output.push(...transformOutput);
 
-                span.end();
-                return output;
-              },
-            );
+                    await indexer.hooks.callHook("handler:after", { output });
+                  } catch (error) {
+                    assert(error instanceof Error);
+                    await indexer.hooks.callHook("handler:exception", {
+                      error,
+                    });
+                    throw error;
+                  }
+                  span.end();
+                });
+              }
 
-            await tracer.startActiveSpan("sink write", async (span) => {
-              await sink.write({ data: output, cursor, endCursor, finality });
+              // if output has data, write it to the sink
+              if (output.length > 0) {
+                await tracer.startActiveSpan("sink write", async (span) => {
+                  await sink.write({
+                    data: output,
+                    cursor,
+                    endCursor,
+                    finality,
+                  });
+
+                  span.end();
+                });
+              }
 
               span.end();
             });
+            break;
+          }
+          default: {
+            consola.warn("unexpected message", message);
+            throw new Error("not implemented");
+          }
+        }
 
-            span.end();
-          });
+        // if the stream needs a restart,
+        // break out of the current stream iterator
+        if (state._tag !== "normal") {
           break;
         }
-        default: {
-          consola.warn("unexpected message", message);
-          throw new Error("not implemented");
-        }
       }
-    }
 
-    await indexer.hooks.callHook("run:after");
+      // when restarting the stream, we continue the while loop
+      if (state._tag !== "normal") {
+        continue;
+      }
+
+      await indexer.hooks.callHook("run:after");
+
+      break;
+    }
   });
 }
diff --git a/packages/indexer/src/plugins/persistence.test.ts b/packages/indexer/src/plugins/persistence.test.ts
index f364064..5af2dfd 100644
--- a/packages/indexer/src/plugins/persistence.test.ts
+++ b/packages/indexer/src/plugins/persistence.test.ts
@@ -1,3 +1,4 @@
+import fs from "node:fs/promises";
 import type { Cursor } from "@apibara/protocol";
 import {
   type MockBlock,
@@ -7,7 +8,7 @@ import {
 import { klona } from "klona/full";
 import { open } from "sqlite";
 import sqlite3 from "sqlite3";
-import { describe, expect, it } from "vitest";
+import { afterEach, describe, expect, it } from "vitest";
 import { run } from "../indexer";
 import { generateMockMessages } from "../testing";
 import { type MockRet, getMockIndexer } from "../testing/indexer";
@@ -20,77 +21,104 @@ describe("Persistence", () => {
     return db;
   };
 
-  it("should handle storing and updating a cursor", async () => {
+  it("should handle storing and updating a cursor & filter", async () => {
     const db = await initDB();
-    const store = new SqlitePersistence(db);
+    const store = new SqlitePersistence<MockFilter>(db);
 
     // Assert there's no data
     let latest = await store.get();
-    expect(latest).toBeUndefined();
+
+    expect(latest.cursor).toBeUndefined();
+    expect(latest.filter).toBeUndefined();
 
     // Insert value
     const cursor: Cursor = {
       orderKey: 5_000_000n,
     };
-    await store.put(cursor);
+    const filter: MockFilter = {
+      filter: "X",
+    };
+    await store.put({ cursor, filter });
 
     // Check that value was stored
     latest = await store.get();
-    expect(latest).toEqual({
+
+    expect(latest.cursor).toEqual({
       orderKey: 5_000_000n,
       uniqueKey: null,
     });
+    expect(latest.filter).toEqual({
+      filter: "X",
+    });
 
     // Update value
     const updatedCursor: Cursor = {
       orderKey: 5_000_010n,
       uniqueKey: "0x1234567890",
     };
-    await store.put(updatedCursor);
+    const updatedFilter: MockFilter = {
+      filter: "Y",
+    };
+
+    await store.put({ cursor: updatedCursor, filter: updatedFilter });
 
     // Check that value was updated
     latest = await store.get();
-    expect(latest).toEqual({
+
+    expect(latest.cursor).toEqual({
       orderKey: 5_000_010n,
       uniqueKey: "0x1234567890",
     });
+    expect(latest.filter).toEqual({
+      filter: "Y",
+    });
 
     await db.close();
   });
 
-  it("should handle storing and deleting a cursor", async () => {
+  it("should handle storing and deleting a cursor & filter", async () => {
     const db = await initDB();
     const store = new SqlitePersistence(db);
 
     // Assert there's no data
     let latest = await store.get();
-    expect(latest).toBeUndefined();
+    expect(latest.cursor).toBeUndefined();
+    expect(latest.filter).toBeUndefined();
 
     // Insert value
     const cursor: Cursor = {
       orderKey: 5_000_000n,
     };
-    await store.put(cursor);
+    const filter: MockFilter = {
+      filter: "X",
+    };
+    await store.put({ cursor, filter });
 
     // Check that value was stored
     latest = await store.get();
-    expect(latest).toEqual({
+    expect(latest.cursor).toEqual({
       orderKey: 5_000_000n,
       uniqueKey: null,
     });
+    expect(latest.filter).toEqual({
+      filter: "X",
+    });
 
     // Delete value
     await store.del();
 
     // Check there's no data
     latest = await store.get();
-    expect(latest).toBeUndefined();
+    expect(latest.cursor).toBeUndefined();
+    expect(latest.filter).toBeUndefined();
 
     await db.close();
   });
 
   it("should work with indexer and store cursor of last message", async () => {
-    const client = new MockClient(messages, [{}]);
+    const client = new MockClient<MockFilter, MockBlock>((request, options) => {
+      return messages;
+    });
 
     const persistence = sqlitePersistence({
       driver: sqlite3.Database,
@@ -108,17 +136,24 @@ describe("Persistence", () => {
       filename: "file:memdb1?mode=memory&cache=shared",
     });
 
-    const store = new SqlitePersistence(db);
+    const store = new SqlitePersistence<MockFilter>(db);
 
     const latest = await store.get();
 
-    expect(latest).toMatchInlineSnapshot(`
+    expect(latest.cursor).toMatchInlineSnapshot(`
       {
         "orderKey": 5000009n,
         "uniqueKey": null,
       }
     `);
   });
+
+  // Cleanup
+  afterEach(async () => {
+    try {
+      await fs.unlink("file:memdb1?mode=memory&cache=shared");
+    } catch {}
+  });
 });
 
 const messages = generateMockMessages();
diff --git a/packages/indexer/src/plugins/persistence.ts b/packages/indexer/src/plugins/persistence.ts
index 36d6e8d..9f3c292 100644
--- a/packages/indexer/src/plugins/persistence.ts
+++ b/packages/indexer/src/plugins/persistence.ts
@@ -1,5 +1,6 @@
 import type { Cursor } from "@apibara/protocol";
 import { type Database, type ISqlite, open } from "sqlite";
+import { deserialize, serialize } from "../vcr";
 import { defineIndexerPlugin } from "./config";
 
 type SqliteArgs = ISqlite.Config;
@@ -7,7 +8,7 @@ type SqliteArgs = ISqlite.Config;
 export function sqlitePersistence<TFilter, TBlock, TRet>(args: SqliteArgs) {
   return defineIndexerPlugin<TFilter, TBlock, TRet>((indexer) => {
     let db: Database;
-    let store: SqlitePersistence;
+    let store: SqlitePersistence<TFilter>;
 
     indexer.hooks.hook("run:before", async () => {
       db = await open(args);
@@ -18,22 +19,32 @@ export function sqlitePersistence<TFilter, TBlock, TRet>(args: SqliteArgs) {
     });
 
     indexer.hooks.hook("connect:before", async ({ request }) => {
-      const lastCursor = await store.get();
+      const { cursor, filter } = await store.get();
 
-      if (lastCursor) {
-        request.startingCursor = lastCursor;
+      if (cursor) {
+        request.startingCursor = cursor;
+      }
+
+      if (filter) {
+        request.filter[1] = filter;
       }
     });
 
     indexer.hooks.hook("sink:flush", async ({ endCursor }) => {
       if (endCursor) {
-        await store.put(endCursor);
+        await store.put({ cursor: endCursor });
+      }
+    });
+
+    indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => {
+      if (request.filter[1]) {
+        await store.put({ cursor: endCursor, filter: request.filter[1] });
       }
     });
   });
 }
 
-export class SqlitePersistence {
+export class SqlitePersistence<TFilter> {
   constructor(private _db: Database) {}
 
   static async initialize(db: Database) {
@@ -43,9 +54,42 @@ export class SqlitePersistence {
       order_key INTEGER NOT NULL,
       unique_key TEXT
     );
+
+    CREATE TABLE IF NOT EXISTS filters (
+      id TEXT NOT NULL,
+      filter BLOB NOT NULL,
+      from_block INTEGER NOT NULL,
+      to_block INTEGER,
+      PRIMARY KEY (id, from_block)
+    );
     `);
   }
-  async get(): Promise<Cursor | undefined> {
+
+  public async get(): Promise<{ cursor?: Cursor; filter?: TFilter }> {
+    const cursor = await this._getCheckpoint();
+    const filter = await this._getFilter();
+
+    return { cursor, filter };
+  }
+
+  public async put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) {
+    if (cursor) {
+      await this._putCheckpoint(cursor);
+
+      if (filter) {
+        await this._putFilter(filter, cursor);
+      }
+    }
+  }
+
+  public async del() {
+    await this._delCheckpoint();
+    await this._delFilter();
+  }
+
+  // --- CHECKPOINTS TABLE METHODS ---
+
+  private async _getCheckpoint(): Promise<Cursor | undefined> {
     const row = await this._db.get<CheckpointRow>(
       `
       SELECT *
@@ -60,7 +104,7 @@ export class SqlitePersistence {
     return { orderKey: BigInt(row.order_key), uniqueKey: row.unique_key };
   }
 
-  async put(cursor: Cursor) {
+  private async _putCheckpoint(cursor: Cursor) {
     await this._db.run(
       `
       INSERT INTO checkpoints (id, order_key, unique_key)
@@ -73,7 +117,7 @@ export class SqlitePersistence {
     );
   }
 
-  async del() {
+  private async _delCheckpoint() {
     await this._db.run(
       `
       DELETE FROM checkpoints
@@ -82,6 +126,59 @@ export class SqlitePersistence {
       ["default"],
     );
   }
+
+  // --- FILTERS TABLE METHODS ---
+
+  private async _getFilter(): Promise<TFilter | undefined> {
+    const row = await this._db.get<FilterRow>(
+      `
+      SELECT *
+      FROM filters
+      WHERE id = ? AND to_block IS NULL
+      `,
+      ["default"],
+    );
+
+    if (!row) return undefined;
+
+    return deserialize(row.filter) as TFilter;
+  }
+
+  private async _putFilter(filter: TFilter, endCursor: Cursor) {
+    await this._db.run(
+      `
+      UPDATE filters
+      SET to_block = ?
+      WHERE id = ? AND to_block IS NULL
+      `,
+      [Number(endCursor.orderKey), "default"],
+    );
+
+    await this._db.run(
+      `
+      INSERT INTO filters (id, filter, from_block)
+      VALUES (?, ?, ?)
+      ON CONFLICT(id, from_block) DO UPDATE SET
+        filter = excluded.filter,
+        from_block = excluded.from_block
+      `,
+      [
+        "default",
+        serialize(filter as Record<string, unknown>),
+        Number(endCursor.orderKey),
+      ],
+    );
+  }
+
+  private async _delFilter() {
+    await this._db.run(
+      `
+      DELETE FROM filters
+      WHERE id = ?
+      `,
+      ["default"],
+    );
+  }
 }
 
 export type CheckpointRow = {
@@ -89,3 +186,10 @@ export type CheckpointRow = {
   order_key: number;
   unique_key?: `0x${string}`;
 };
+
+export type FilterRow = {
+  id: string;
+  filter: string;
+  from_block: number;
+  to_block?: number;
+};
diff --git a/packages/indexer/src/testing/helper.ts b/packages/indexer/src/testing/helper.ts
index 85e53a9..c5ce94d 100644
--- a/packages/indexer/src/testing/helper.ts
+++ b/packages/indexer/src/testing/helper.ts
@@ -4,6 +4,7 @@ export function generateMockMessages(count = 10): MockStreamResponse[] {
   return [...Array(count)].map((_, i) => ({
     _tag: "data",
     data: {
+      cursor: { orderKey: BigInt(5_000_000 - 1) },
       finality: "accepted",
       data: [{ data: `${5_000_000 + i}` }],
       endCursor: { orderKey: BigInt(5_000_000 + i) },
diff --git a/packages/indexer/src/testing/indexer.ts b/packages/indexer/src/testing/indexer.ts
index a1392f7..b2e8b84 100644
--- a/packages/indexer/src/testing/indexer.ts
+++ b/packages/indexer/src/testing/indexer.ts
@@ -14,7 +14,7 @@ export const getMockIndexer = (
     streamUrl: "https://sepolia.ethereum.a5a.ch",
     finality: "accepted",
     filter: {},
-    transform({ block: { data } }) {
+    async transform({ block: { data } }) {
       if (!data) return [];
 
       return [{ data }];
diff --git a/packages/indexer/src/vcr/replay.ts b/packages/indexer/src/vcr/replay.ts
index a5a569b..55071e7 100644
--- a/packages/indexer/src/vcr/replay.ts
+++ b/packages/indexer/src/vcr/replay.ts
@@ -1,3 +1,4 @@
+import assert from "node:assert";
 import fs from "node:fs";
 import path from "node:path";
 import type { Client, Cursor } from "@apibara/protocol";
@@ -38,5 +39,12 @@ export function loadCassette<TFilter, TBlock>(
 
   const { filter, messages } = cassetteData;
 
-  return new MockClient(messages, [filter]);
+  return new MockClient<TFilter, TBlock>((request, options) => {
+    assert.deepStrictEqual(
+      request.filter,
+      filter,
+      "Request and Cassette filter mismatch",
+    );
+    return messages;
+  });
 }
diff --git a/packages/protocol/package.json b/packages/protocol/package.json
index a69bdef..1965279 100644
--- a/packages/protocol/package.json
+++ b/packages/protocol/package.json
@@ -30,6 +30,7 @@
     "build:proto": "buf generate proto",
     "typecheck": "tsc --noEmit",
     "test": "vitest",
+    "test:ci": "vitest run",
     "lint": "biome check .",
     "lint:fix": "pnpm lint --write",
     "format": "biome format . --write"
--write" diff --git a/packages/protocol/src/proto/index.ts b/packages/protocol/src/proto/index.ts index 423146e..05dd6b8 100644 --- a/packages/protocol/src/proto/index.ts +++ b/packages/protocol/src/proto/index.ts @@ -1,3 +1,3 @@ export * as common from "./common"; export * as stream from "./stream"; -export * as testing from './testing' \ No newline at end of file +export * as testing from "./testing"; \ No newline at end of file diff --git a/packages/protocol/src/stream.ts b/packages/protocol/src/stream.ts index 9f32cb3..8d45f33 100644 --- a/packages/protocol/src/stream.ts +++ b/packages/protocol/src/stream.ts @@ -41,13 +41,13 @@ export const StreamDataRequest = ( Schema.Struct({ finality: Schema.optional(DataFinality), startingCursor: Schema.optional(Cursor), - filter: Schema.Array(filter), + filter: Schema.mutable(Schema.Array(filter)), }); export type StreamDataRequest = { finality?: DataFinality | undefined; startingCursor?: Cursor | undefined; - filter: readonly TA[]; + filter: TA[]; }; export const Invalidate = Schema.Struct({ diff --git a/packages/protocol/src/testing/client.test.ts b/packages/protocol/src/testing/client.test.ts index 663954a..a45c92e 100644 --- a/packages/protocol/src/testing/client.test.ts +++ b/packages/protocol/src/testing/client.test.ts @@ -1,17 +1,18 @@ import { describe, expect, it } from "vitest"; +import type { MockBlock } from "../proto/testing"; import { MockClient } from "./client"; +import type { MockFilter } from "./mock"; describe("MockClient", () => { it("returns a stream of messages", async () => { - const client = new MockClient( - [ + const client = new MockClient(() => { + return [ { _tag: "data", data: { finality: "finalized", data: [{ data: "hello" }] }, }, - ], - [], - ); + ]; + }); const output = []; for await (const m of client.streamData({ filter: [] })) { @@ -36,15 +37,14 @@ describe("MockClient", () => { }); it("supports factory messages", async () => { - const client = new MockClient( - [ + const client = new MockClient(() => { + return [ { _tag: "data", data: { finality: "finalized", data: [{ data: "hello" }, null] }, }, - ], - [], - ); + ]; + }); const output = []; for await (const m of client.streamData({ filter: [] })) { diff --git a/packages/protocol/src/testing/client.ts b/packages/protocol/src/testing/client.ts index a92188b..772c393 100644 --- a/packages/protocol/src/testing/client.ts +++ b/packages/protocol/src/testing/client.ts @@ -1,13 +1,13 @@ -import assert from "node:assert"; - import type { Client, ClientCallOptions, StreamDataOptions } from "../client"; import type { StatusRequest, StatusResponse } from "../status"; import type { StreamDataRequest, StreamDataResponse } from "../stream"; export class MockClient implements Client { constructor( - private messages: StreamDataResponse[], - private filter: TFilter[], + private messageFactory: ( + request: StreamDataRequest, + options?: StreamDataOptions, + ) => (StreamDataResponse | Error)[], ) {} async status( @@ -18,18 +18,14 @@ export class MockClient implements Client { } streamData(request: StreamDataRequest, options?: StreamDataOptions) { - assert.deepStrictEqual( - this.filter, - request.filter, - "Request and Cassette filter mismatch", - ); + const messages = this.messageFactory(request, options); - return new StreamDataIterable(this.messages); + return new StreamDataIterable(messages); } } export class StreamDataIterable { - constructor(private messages: StreamDataResponse[]) {} + constructor(private messages: (StreamDataResponse | Error)[]) {} 
 
   [Symbol.asyncIterator](): AsyncIterator<StreamDataResponse<TBlock>> {
     let index = 0;
 
@@ -42,6 +38,10 @@ export class StreamDataIterable<TBlock> {
         }
 
         const message = messages[index++];
+        if (message instanceof Error) {
+          throw message;
+        }
+
         return { done: false, value: message };
       },
     };
diff --git a/packages/starknet/package.json b/packages/starknet/package.json
index fe7f23a..da27083 100644
--- a/packages/starknet/package.json
+++ b/packages/starknet/package.json
@@ -22,6 +22,7 @@
     "build:proto": "buf generate proto",
     "typecheck": "tsc --noEmit",
     "test": "vitest",
+    "test:ci": "vitest run",
     "lint": "biome check .",
     "lint:fix": "pnpm lint --write",
     "format": "biome format . --write"
diff --git a/turbo.json b/turbo.json
index 24b2e45..a7458e6 100644
--- a/turbo.json
+++ b/turbo.json
@@ -2,15 +2,14 @@
   "$schema": "https://turborepo.org/schema.json",
   "tasks": {
     "build": {
-      "dependsOn": [
-        "^build"
-      ],
+      "dependsOn": ["^build"],
       "outputs": ["dist/**"]
     },
     "test": {
-      "dependsOn": [
-        "build"
-      ]
+      "dependsOn": ["build"]
+    },
+    "test:ci": {
+      "dependsOn": ["build"]
     },
     "typecheck": {
       "cache": false
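
Reviewer note: for anyone consuming this change downstream, here is a minimal sketch of what factory mode looks like from an indexer author's point of view. It is not part of the patch; the filter/block shapes, the plugin import path, and the database filename are hypothetical, while the `factory`/`transform` signatures (both async now) and the `sqlitePersistence` options mirror the types changed above.

```ts
// Hypothetical usage sketch (not part of this patch).
import sqlite3 from "sqlite3";
// assumed export path for the plugin
import { sqlitePersistence } from "@apibara/indexer/plugins/persistence";

// illustrative shapes, standing in for a real chain's filter/block/output types
type MyFilter = { filter?: string };
type MyBlock = { data?: string };
type MyRet = { data: string };

const config = {
  finality: "accepted",
  startingCursor: { orderKey: 100n },
  // request.filter[0]: blocks matching this filter are passed to `factory`
  filter: { filter: "A" } as MyFilter,
  // async factory: returning a filter merges it into the main (second) filter
  // and restarts the stream from the current cursor (the "recover" path above)
  async factory(block: MyBlock) {
    if (block.data === "B") {
      return { filter: { filter: "B" } };
    }
    return {};
  },
  // async transform: runs on blocks matched by the merged main filter
  async transform({ block }: { block: MyBlock }): Promise<MyRet[]> {
    return block.data ? [{ data: block.data }] : [];
  },
  plugins: [
    // persists both the last cursor and the merged filter (see the new
    // `filters` table), so a restarted indexer reconnects with the same
    // main filter instead of re-running the factory from scratch
    sqlitePersistence({
      driver: sqlite3.Database,
      filename: "indexer.sqlite",
    }),
  ],
};
```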