diff --git a/change/@apibara-indexer-3ab2f0f6-4d43-4499-a0a3-55188fe8eb53.json b/change/@apibara-indexer-3ab2f0f6-4d43-4499-a0a3-55188fe8eb53.json new file mode 100644 index 0000000..82f5023 --- /dev/null +++ b/change/@apibara-indexer-3ab2f0f6-4d43-4499-a0a3-55188fe8eb53.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "indexer: add drizzle persistence plugin", + "packageName": "@apibara/indexer", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/examples/cli/indexers/2-starknet.indexer.ts b/examples/cli/indexers/2-starknet.indexer.ts index 386b7ad..339a9a5 100644 --- a/examples/cli/indexers/2-starknet.indexer.ts +++ b/examples/cli/indexers/2-starknet.indexer.ts @@ -1,26 +1,43 @@ import { defineIndexer, useSink } from "@apibara/indexer"; +import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence"; import { useLogger } from "@apibara/indexer/plugins/logger"; import { sqlite } from "@apibara/indexer/sinks/sqlite"; import { StarknetStream } from "@apibara/starknet"; import type { ApibaraRuntimeConfig } from "apibara/types"; import Database from "better-sqlite3"; +import { sql } from "drizzle-orm"; +import { drizzle } from "drizzle-orm/node-postgres"; +import { Client } from "pg"; import { hash } from "starknet"; export default function (runtimeConfig: ApibaraRuntimeConfig) { console.log("--> Starknet Indexer Runtime Config: ", runtimeConfig); - const database = new Database(runtimeConfig.databasePath); + // Sink Database + const database = new Database(runtimeConfig.databasePath); database.exec("DROP TABLE IF EXISTS test"); database.exec( "CREATE TABLE IF NOT EXISTS test (number TEXT, hash TEXT, _cursor BIGINT)", ); + // Persistence Database + const client = new Client({ + connectionString: "postgres://postgres:postgres@localhost:5432/postgres", + }); + const persistDatabase = drizzle(client); + return defineIndexer(StarknetStream)({ streamUrl: "https://starknet.preview.apibara.org", finality: "accepted", startingCursor: { orderKey: 800_000n, }, + plugins: [ + drizzlePersistence({ + database: persistDatabase, + indexerName: "2-starknet", + }), + ], sink: sqlite({ database, tableName: "test" }), filter: { events: [ @@ -42,5 +59,32 @@ export default function (runtimeConfig: ApibaraRuntimeConfig) { // hash: header?.blockHash, // }]) }, + hooks: { + async "run:before"() { + await client.connect(); + + // Normally user will do migrations of both tables, which are defined in + // ``` + // import { checkpoints, filters } from "@apibara/indexer/plugins/drizzle-persistence" + // ```, + // but just for quick testing and example we create them here directly + + await persistDatabase.execute(sql` + CREATE TABLE IF NOT EXISTS checkpoints ( + id TEXT NOT NULL PRIMARY KEY, + order_key INTEGER NOT NULL, + unique_key TEXT + ); + + CREATE TABLE IF NOT EXISTS filters ( + id TEXT NOT NULL, + filter TEXT NOT NULL, + from_block INTEGER NOT NULL, + to_block INTEGER, + PRIMARY KEY (id, from_block) + ); + `); + }, + }, }); } diff --git a/examples/cli/package.json b/examples/cli/package.json index 44b23f2..566df13 100644 --- a/examples/cli/package.json +++ b/examples/cli/package.json @@ -17,6 +17,7 @@ "devDependencies": { "@types/better-sqlite3": "^7.6.11", "@types/node": "^20.5.2", + "@types/pg": "^8.11.10", "typescript": "^5.6.2", "vitest": "^1.6.0" }, @@ -25,8 +26,11 @@ "@apibara/indexer": "workspace:*", "@apibara/protocol": "workspace:*", "@apibara/starknet": "workspace:*", + "@electric-sql/pglite": "^0.2.14", "apibara": "workspace:*", "better-sqlite3": "^11.5.0", + "drizzle-orm": "^0.35.2", + "pg": "^8.12.0", "starknet": "^6.11.0" } } diff --git a/packages/indexer/build.config.ts b/packages/indexer/build.config.ts index ebb537d..964198d 100644 --- a/packages/indexer/build.config.ts +++ b/packages/indexer/build.config.ts @@ -12,6 +12,7 @@ export default defineBuildConfig({ "./src/plugins/kv.ts", "./src/plugins/logger.ts", "./src/plugins/persistence.ts", + "./src/plugins/drizzle-persistence.ts", ], clean: true, outDir: "./dist", diff --git a/packages/indexer/package.json b/packages/indexer/package.json index aaacc76..6f3cb54 100644 --- a/packages/indexer/package.json +++ b/packages/indexer/package.json @@ -69,6 +69,12 @@ "import": "./dist/plugins/persistence.mjs", "require": "./dist/plugins/persistence.cjs", "default": "./dist/plugins/persistence.mjs" + }, + "./plugins/drizzle-persistence": { + "types": "./dist/plugins/drizzle-persistence.d.ts", + "import": "./dist/plugins/drizzle-persistence.mjs", + "require": "./dist/plugins/drizzle-persistence.cjs", + "default": "./dist/plugins/drizzle-persistence.mjs" } }, "scripts": { @@ -80,6 +86,7 @@ "test:ci": "vitest run" }, "devDependencies": { + "@electric-sql/pglite": "^0.2.14", "@types/better-sqlite3": "^7.6.11", "@types/node": "^20.14.0", "@types/pg": "^8.11.10", @@ -102,6 +109,7 @@ "unctx": "^2.3.1" }, "peerDependencies": { + "@electric-sql/pglite": "^0.2.14", "better-sqlite3": "^11.5.0", "csv-stringify": "^6.5.0", "drizzle-orm": "^0.35.2", diff --git a/packages/indexer/src/plugins/drizzle-persistence.ts b/packages/indexer/src/plugins/drizzle-persistence.ts new file mode 100644 index 0000000..3965f25 --- /dev/null +++ b/packages/indexer/src/plugins/drizzle-persistence.ts @@ -0,0 +1,192 @@ +import type { Cursor } from "@apibara/protocol"; +import { + type ExtractTablesWithRelations, + type TablesRelationalConfig, + and, + eq, + isNull, +} from "drizzle-orm"; +import { + type PgDatabase, + type PgQueryResultHKT, + integer, + pgTable, + primaryKey, + text, +} from "drizzle-orm/pg-core"; +import { deserialize, serialize } from "../vcr"; +import { defineIndexerPlugin } from "./config"; + +export const checkpoints = pgTable("checkpoints", { + id: text("id").notNull().primaryKey(), + orderKey: integer("order_key").notNull(), + uniqueKey: text("unique_key") + .$type<`0x${string}` | undefined>() + .notNull() + .default(undefined), +}); + +export const filters = pgTable( + "filters", + { + id: text("id").notNull(), + filter: text("filter").notNull(), + fromBlock: integer("from_block").notNull(), + toBlock: integer("to_block"), + }, + (table) => ({ + pk: primaryKey({ columns: [table.id, table.fromBlock] }), + }), +); + +export function drizzlePersistence< + TFilter, + TBlock, + TTxnParams, + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +>({ + database, + indexerName = "default", +}: { + database: PgDatabase; + indexerName?: string; +}) { + return defineIndexerPlugin((indexer) => { + let store: DrizzlePersistence; + + indexer.hooks.hook("run:before", async () => { + store = new DrizzlePersistence(database, indexerName); + // Tables are created by user via migrations in Drizzle + }); + + indexer.hooks.hook("connect:before", async ({ request }) => { + const { cursor, filter } = await store.get(); + + if (cursor) { + request.startingCursor = cursor; + } + + if (filter) { + request.filter[1] = filter; + } + }); + + indexer.hooks.hook("transaction:commit", async ({ endCursor }) => { + if (endCursor) { + await store.put({ cursor: endCursor }); + } + }); + + indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => { + if (request.filter[1]) { + await store.put({ cursor: endCursor, filter: request.filter[1] }); + } + }); + }); +} + +export class DrizzlePersistence< + TFilter, + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +> { + constructor( + private _db: PgDatabase, + private _indexerName: string, + ) {} + + public async get(): Promise<{ cursor?: Cursor; filter?: TFilter }> { + const cursor = await this._getCheckpoint(); + const filter = await this._getFilter(); + + return { cursor, filter }; + } + + public async put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) { + if (cursor) { + await this._putCheckpoint(cursor); + + if (filter) { + await this._putFilter(filter, cursor); + } + } + } + + // --- CHECKPOINTS TABLE METHODS --- + + private async _getCheckpoint(): Promise { + const rows = await this._db + .select() + .from(checkpoints) + .where(eq(checkpoints.id, this._indexerName)); + + const row = rows[0]; + if (!row) return undefined; + + return { + orderKey: BigInt(row.orderKey), + uniqueKey: row.uniqueKey, + }; + } + + private async _putCheckpoint(cursor: Cursor) { + await this._db + .insert(checkpoints) + .values({ + id: this._indexerName, + orderKey: Number(cursor.orderKey), + uniqueKey: cursor.uniqueKey, + }) + .onConflictDoUpdate({ + target: checkpoints.id, + set: { + orderKey: Number(cursor.orderKey), + uniqueKey: cursor.uniqueKey, + }, + }); + } + + // --- FILTERS TABLE METHODS --- + + private async _getFilter(): Promise { + const rows = await this._db + .select() + .from(filters) + .where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock))); + + const row = rows[0]; + + if (!row) return undefined; + + return deserialize(row.filter) as TFilter; + } + + private async _putFilter(filter: TFilter, endCursor: Cursor) { + // Update existing filter's to_block + await this._db + .update(filters) + .set({ toBlock: Number(endCursor.orderKey) }) + .where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock))); + + // Insert new filter + await this._db + .insert(filters) + .values({ + id: this._indexerName, + filter: serialize(filter as Record), + fromBlock: Number(endCursor.orderKey), + }) + .onConflictDoUpdate({ + target: [filters.id, filters.fromBlock], + set: { + filter: serialize(filter as Record), + fromBlock: Number(endCursor.orderKey), + }, + }); + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3f295ab..7f6d34e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -60,12 +60,21 @@ importers: '@apibara/starknet': specifier: workspace:* version: link:../../packages/starknet + '@electric-sql/pglite': + specifier: ^0.2.14 + version: 0.2.14 apibara: specifier: workspace:* version: link:../../packages/cli better-sqlite3: specifier: ^11.5.0 version: 11.5.0 + drizzle-orm: + specifier: ^0.35.2 + version: 0.35.2(@electric-sql/pglite@0.2.14)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) + pg: + specifier: ^8.12.0 + version: 8.13.0 starknet: specifier: ^6.11.0 version: 6.11.0 @@ -76,6 +85,9 @@ importers: '@types/node': specifier: ^20.5.2 version: 20.14.0 + '@types/pg': + specifier: ^8.11.10 + version: 8.11.10 typescript: specifier: ^5.6.2 version: 5.6.2 @@ -179,7 +191,7 @@ importers: version: 6.5.0 drizzle-orm: specifier: ^0.35.0 - version: 0.35.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) + version: 0.35.2(@electric-sql/pglite@0.2.14)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) postgres: specifier: ^3.4.4 version: 3.4.4 @@ -262,7 +274,7 @@ importers: version: 6.5.0 drizzle-orm: specifier: ^0.35.0 - version: 0.35.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) + version: 0.35.2(@electric-sql/pglite@0.2.14)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) postgres: specifier: ^3.4.4 version: 3.4.4 @@ -493,6 +505,9 @@ importers: specifier: ^2.3.1 version: 2.3.1 devDependencies: + '@electric-sql/pglite': + specifier: ^0.2.14 + version: 0.2.14 '@types/better-sqlite3': specifier: ^7.6.11 version: 7.6.11 @@ -510,7 +525,7 @@ importers: version: 6.5.0 drizzle-orm: specifier: ^0.35.2 - version: 0.35.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) + version: 0.35.2(@electric-sql/pglite@0.2.14)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4) pg: specifier: ^8.12.0 version: 8.13.0 @@ -856,6 +871,9 @@ packages: peerDependencies: effect: ^3.2.6 + '@electric-sql/pglite@0.2.14': + resolution: {integrity: sha512-ZMYZL/yFu5sCewYecdX4OjyOPcrI2OmQ6598e/tyke4Rpgeekd4+pINf9jjzJNJk1Kq5dtuB6buqZsBQf0sx8A==} + '@esbuild/aix-ppc64@0.19.11': resolution: {integrity: sha512-FnzU0LyE3ySQk7UntJO4+qIiQgI7KoODnZg5xzXIrFJlKd2P2gwHsHY4927xj9y5PJmJSzULiUCWmv7iWnNa7g==} engines: {node: '>=12'} @@ -3907,6 +3925,8 @@ snapshots: effect: 3.2.6 fast-check: 3.19.0 + '@electric-sql/pglite@0.2.14': {} + '@esbuild/aix-ppc64@0.19.11': optional: true @@ -4994,8 +5014,9 @@ snapshots: dotenv@16.4.5: {} - drizzle-orm@0.35.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4): + drizzle-orm@0.35.2(@electric-sql/pglite@0.2.14)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.0)(postgres@3.4.4): optionalDependencies: + '@electric-sql/pglite': 0.2.14 '@opentelemetry/api': 1.9.0 '@types/better-sqlite3': 7.6.11 '@types/pg': 8.11.10