Skip to content

Commit

Permalink
feat: add drizzle persistence plugin (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek authored Nov 30, 2024
2 parents eeda49e + 76f1e94 commit 639ccdc
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: add drizzle persistence plugin",
"packageName": "@apibara/indexer",
"email": "[email protected]",
"dependentChangeType": "patch"
}
46 changes: 45 additions & 1 deletion examples/cli/indexers/2-starknet.indexer.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
import { useLogger } from "@apibara/indexer/plugins/logger";
import { sqlite } from "@apibara/indexer/sinks/sqlite";
import { StarknetStream } from "@apibara/starknet";
import type { ApibaraRuntimeConfig } from "apibara/types";
import Database from "better-sqlite3";
import { sql } from "drizzle-orm";
import { drizzle } from "drizzle-orm/node-postgres";
import { Client } from "pg";
import { hash } from "starknet";

export default function (runtimeConfig: ApibaraRuntimeConfig) {
console.log("--> Starknet Indexer Runtime Config: ", runtimeConfig);
const database = new Database(runtimeConfig.databasePath);

// Sink Database
const database = new Database(runtimeConfig.databasePath);
database.exec("DROP TABLE IF EXISTS test");
database.exec(
"CREATE TABLE IF NOT EXISTS test (number TEXT, hash TEXT, _cursor BIGINT)",
);

// Persistence Database
const client = new Client({
connectionString: "postgres://postgres:postgres@localhost:5432/postgres",
});
const persistDatabase = drizzle(client);

return defineIndexer(StarknetStream)({
streamUrl: "https://starknet.preview.apibara.org",
finality: "accepted",
startingCursor: {
orderKey: 800_000n,
},
plugins: [
drizzlePersistence({
database: persistDatabase,
indexerName: "2-starknet",
}),
],
sink: sqlite({ database, tableName: "test" }),
filter: {
events: [
Expand All @@ -42,5 +59,32 @@ export default function (runtimeConfig: ApibaraRuntimeConfig) {
// hash: header?.blockHash,
// }])
},
hooks: {
async "run:before"() {
await client.connect();

// Normally user will do migrations of both tables, which are defined in
// ```
// import { checkpoints, filters } from "@apibara/indexer/plugins/drizzle-persistence"
// ```,
// but just for quick testing and example we create them here directly

await persistDatabase.execute(sql`
CREATE TABLE IF NOT EXISTS checkpoints (
id TEXT NOT NULL PRIMARY KEY,
order_key INTEGER NOT NULL,
unique_key TEXT
);
CREATE TABLE IF NOT EXISTS filters (
id TEXT NOT NULL,
filter TEXT NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER,
PRIMARY KEY (id, from_block)
);
`);
},
},
});
}
4 changes: 4 additions & 0 deletions examples/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"devDependencies": {
"@types/better-sqlite3": "^7.6.11",
"@types/node": "^20.5.2",
"@types/pg": "^8.11.10",
"typescript": "^5.6.2",
"vitest": "^1.6.0"
},
Expand All @@ -25,8 +26,11 @@
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*",
"@apibara/starknet": "workspace:*",
"@electric-sql/pglite": "^0.2.14",
"apibara": "workspace:*",
"better-sqlite3": "^11.5.0",
"drizzle-orm": "^0.35.2",
"pg": "^8.12.0",
"starknet": "^6.11.0"
}
}
1 change: 1 addition & 0 deletions packages/indexer/build.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export default defineBuildConfig({
"./src/plugins/kv.ts",
"./src/plugins/logger.ts",
"./src/plugins/persistence.ts",
"./src/plugins/drizzle-persistence.ts",
],
clean: true,
outDir: "./dist",
Expand Down
8 changes: 8 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
"import": "./dist/plugins/persistence.mjs",
"require": "./dist/plugins/persistence.cjs",
"default": "./dist/plugins/persistence.mjs"
},
"./plugins/drizzle-persistence": {
"types": "./dist/plugins/drizzle-persistence.d.ts",
"import": "./dist/plugins/drizzle-persistence.mjs",
"require": "./dist/plugins/drizzle-persistence.cjs",
"default": "./dist/plugins/drizzle-persistence.mjs"
}
},
"scripts": {
Expand All @@ -80,6 +86,7 @@
"test:ci": "vitest run"
},
"devDependencies": {
"@electric-sql/pglite": "^0.2.14",
"@types/better-sqlite3": "^7.6.11",
"@types/node": "^20.14.0",
"@types/pg": "^8.11.10",
Expand All @@ -102,6 +109,7 @@
"unctx": "^2.3.1"
},
"peerDependencies": {
"@electric-sql/pglite": "^0.2.14",
"better-sqlite3": "^11.5.0",
"csv-stringify": "^6.5.0",
"drizzle-orm": "^0.35.2",
Expand Down
192 changes: 192 additions & 0 deletions packages/indexer/src/plugins/drizzle-persistence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import type { Cursor } from "@apibara/protocol";
import {
type ExtractTablesWithRelations,
type TablesRelationalConfig,
and,
eq,
isNull,
} from "drizzle-orm";
import {
type PgDatabase,
type PgQueryResultHKT,
integer,
pgTable,
primaryKey,
text,
} from "drizzle-orm/pg-core";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";

export const checkpoints = pgTable("checkpoints", {
id: text("id").notNull().primaryKey(),
orderKey: integer("order_key").notNull(),
uniqueKey: text("unique_key")
.$type<`0x${string}` | undefined>()
.notNull()
.default(undefined),
});

export const filters = pgTable(
"filters",
{
id: text("id").notNull(),
filter: text("filter").notNull(),
fromBlock: integer("from_block").notNull(),
toBlock: integer("to_block"),
},
(table) => ({
pk: primaryKey({ columns: [table.id, table.fromBlock] }),
}),
);

export function drizzlePersistence<
TFilter,
TBlock,
TTxnParams,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>({
database,
indexerName = "default",
}: {
database: PgDatabase<TQueryResult, TFullSchema, TSchema>;
indexerName?: string;
}) {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
let store: DrizzlePersistence<TFilter, TQueryResult, TFullSchema, TSchema>;

indexer.hooks.hook("run:before", async () => {
store = new DrizzlePersistence(database, indexerName);
// Tables are created by user via migrations in Drizzle
});

indexer.hooks.hook("connect:before", async ({ request }) => {
const { cursor, filter } = await store.get();

if (cursor) {
request.startingCursor = cursor;
}

if (filter) {
request.filter[1] = filter;
}
});

indexer.hooks.hook("transaction:commit", async ({ endCursor }) => {
if (endCursor) {
await store.put({ cursor: endCursor });
}
});

indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => {
if (request.filter[1]) {
await store.put({ cursor: endCursor, filter: request.filter[1] });
}
});
});
}

export class DrizzlePersistence<
TFilter,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
> {
constructor(
private _db: PgDatabase<TQueryResult, TFullSchema, TSchema>,
private _indexerName: string,
) {}

public async get(): Promise<{ cursor?: Cursor; filter?: TFilter }> {
const cursor = await this._getCheckpoint();
const filter = await this._getFilter();

return { cursor, filter };
}

public async put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) {
if (cursor) {
await this._putCheckpoint(cursor);

if (filter) {
await this._putFilter(filter, cursor);
}
}
}

// --- CHECKPOINTS TABLE METHODS ---

private async _getCheckpoint(): Promise<Cursor | undefined> {
const rows = await this._db
.select()
.from(checkpoints)
.where(eq(checkpoints.id, this._indexerName));

const row = rows[0];
if (!row) return undefined;

return {
orderKey: BigInt(row.orderKey),
uniqueKey: row.uniqueKey,
};
}

private async _putCheckpoint(cursor: Cursor) {
await this._db
.insert(checkpoints)
.values({
id: this._indexerName,
orderKey: Number(cursor.orderKey),
uniqueKey: cursor.uniqueKey,
})
.onConflictDoUpdate({
target: checkpoints.id,
set: {
orderKey: Number(cursor.orderKey),
uniqueKey: cursor.uniqueKey,
},
});
}

// --- FILTERS TABLE METHODS ---

private async _getFilter(): Promise<TFilter | undefined> {
const rows = await this._db
.select()
.from(filters)
.where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock)));

const row = rows[0];

if (!row) return undefined;

return deserialize(row.filter) as TFilter;
}

private async _putFilter(filter: TFilter, endCursor: Cursor) {
// Update existing filter's to_block
await this._db
.update(filters)
.set({ toBlock: Number(endCursor.orderKey) })
.where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock)));

// Insert new filter
await this._db
.insert(filters)
.values({
id: this._indexerName,
filter: serialize(filter as Record<string, unknown>),
fromBlock: Number(endCursor.orderKey),
})
.onConflictDoUpdate({
target: [filters.id, filters.fromBlock],
set: {
filter: serialize(filter as Record<string, unknown>),
fromBlock: Number(endCursor.orderKey),
},
});
}
}
Loading

0 comments on commit 639ccdc

Please sign in to comment.