diff --git a/packages/indexer/src/indexer.test.ts b/packages/indexer/src/indexer.test.ts index 5250e70..114a9cb 100644 --- a/packages/indexer/src/indexer.test.ts +++ b/packages/indexer/src/indexer.test.ts @@ -16,8 +16,6 @@ describe("Run Test", () => { async function cleanup() { try { await fs.unlink("file:memdb_indexer?mode=memory&cache=shared"); - await fs.unlink("file:memdb_indexer?mode=memory&cache=shared-wal"); - await fs.unlink("file:memdb_indexer?mode=memory&cache=shared-shm"); } catch {} } @@ -261,9 +259,9 @@ describe("Run Test", () => { return []; }); - const persistence = sqlitePersistence({ - filename: "file:memdb_indexer?mode=memory&cache=shared", - }); + const db = Database("file:memdb_indexer?mode=memory&cache=shared"); + + const persistence = sqlitePersistence(db); // create mock indexer with persistence plugin const indexer = klona(getMockIndexer([persistence])); @@ -284,9 +282,6 @@ describe("Run Test", () => { await run(client, indexer, sink); - // open same db again to check last cursor - const db = Database("file:memdb_indexer?mode=memory&cache=shared"); - const store = new SqlitePersistence(db); const latest = store.get(); @@ -358,6 +353,8 @@ describe("Run Test", () => { }, ] `); + + db.close(); }); it("factory mode: last cursor should persist when error is thrown in indexer", async () => { @@ -437,9 +434,9 @@ describe("Run Test", () => { return []; }); - const persistence = sqlitePersistence({ - filename: "file:memdb_indexer?mode=memory&cache=shared", - }); + const db = Database("file:memdb_indexer?mode=memory&cache=shared"); + + const persistence = sqlitePersistence(db); // create mock indexer with persistence plugin const indexer = klona(getMockIndexer([persistence])); @@ -462,9 +459,6 @@ describe("Run Test", () => { "this error should occurr!", ); - // open same db again to check last cursor - const db = Database("file:memdb_indexer?mode=memory&cache=shared"); - const store = new SqlitePersistence(db); const latest = store.get(); @@ -473,5 +467,7 @@ describe("Run Test", () => { expect(latest.filter?.filter).toEqual("B"); expect(sink.result).toMatchInlineSnapshot("[]"); + + db.close(); }); }); diff --git a/packages/indexer/src/plugins/kv.ts b/packages/indexer/src/plugins/kv.ts index 89210ec..83334d1 100644 --- a/packages/indexer/src/plugins/kv.ts +++ b/packages/indexer/src/plugins/kv.ts @@ -1,22 +1,13 @@ import assert from "node:assert"; import type { Cursor, DataFinality } from "@apibara/protocol"; -import Database, { type Database as SqliteDatabase } from "better-sqlite3"; +import type { Database as SqliteDatabase, Statement } from "better-sqlite3"; import { useIndexerContext } from "../context"; import { deserialize, serialize } from "../vcr"; import { defineIndexerPlugin } from "./config"; -type SqliteArgs = Database.Options & { - filename: string | Buffer | undefined; -}; - -export function kv(args: SqliteArgs) { +export function kv(db: SqliteDatabase) { return defineIndexerPlugin((indexer) => { - let db: SqliteDatabase; - indexer.hooks.hook("run:before", () => { - const { filename, ...sqliteOptions } = args; - db = new Database(filename, sqliteOptions); - KVStore.initialize(db); }); @@ -45,93 +36,95 @@ export function kv(args: SqliteArgs) { ctx.kv = null; }); - - indexer.hooks.hook("run:after", () => { - console.log("kv: ", useIndexerContext().kv); - }); }); } export class KVStore { + /** Sqlite Queries Prepare Statements */ + private _beginTxnQuery: Statement; + private _commitTxnQuery: Statement; + private _rollbackTxnQuery: Statement; + private _getQuery: Statement; + private _updateToBlockQuery: Statement<[number, string]>; + private _insertIntoKvsQuery: Statement<[number, string, string]>; + private _delQuery: Statement<[number, string]>; + constructor( private _db: SqliteDatabase, private _finality: DataFinality, private _endCursor: Cursor, - ) {} + ) { + this._beginTxnQuery = this._db.prepare(statements.beginTxn); + this._commitTxnQuery = this._db.prepare(statements.commitTxn); + this._rollbackTxnQuery = this._db.prepare(statements.rollbackTxn); + this._getQuery = this._db.prepare(statements.get); + this._updateToBlockQuery = this._db.prepare(statements.updateToBlock); + this._insertIntoKvsQuery = this._db.prepare(statements.insertIntoKvs); + this._delQuery = this._db.prepare(statements.del); + } static initialize(db: SqliteDatabase) { - db.pragma("journal_mode = WAL"); - - db.prepare(` - CREATE TABLE IF NOT EXISTS kvs ( - from_block INTEGER NOT NULL, - to_block INTEGER, - k TEXT NOT NULL, - v BLOB NOT NULL, - PRIMARY KEY (from_block, k) - ); - `).run(); + db.prepare(statements.createTable).run(); } beginTransaction() { - this._db.prepare("BEGIN TRANSACTION").run(); + this._beginTxnQuery.run(); } commitTransaction() { - this._db.prepare("COMMIT TRANSACTION").run(); + this._commitTxnQuery.run(); } rollbackTransaction() { - this._db.prepare("ROLLBACK TRANSACTION").run(); + this._rollbackTxnQuery.run(); } get(key: string): T { - const row = this._db - .prepare( - ` - SELECT v - FROM kvs - WHERE k = ? AND to_block IS NULL`, - ) - .get(key); + const row = this._getQuery.get(key); return row ? deserialize(row.v) : undefined; } put(key: string, value: T) { - this._db - .prepare( - ` - UPDATE kvs - SET to_block = ? - WHERE k = ? AND to_block IS NULL - `, - ) - .run(Number(this._endCursor.orderKey), key); - - this._db - .prepare( - ` - INSERT INTO kvs (from_block, to_block, k, v) - VALUES (?, NULL, ?, ?) - `, - ) - .run( - Number(this._endCursor.orderKey), - key, - serialize(value as Record), - ); + this._updateToBlockQuery.run(Number(this._endCursor.orderKey), key); + + this._insertIntoKvsQuery.run( + Number(this._endCursor.orderKey), + key, + serialize(value as Record), + ); } del(key: string) { - this._db - .prepare( - ` - UPDATE kvs - SET to_block = ? - WHERE k = ? AND to_block IS NULL - `, - ) - .run(Number(this._endCursor.orderKey), key); + this._delQuery.run(Number(this._endCursor.orderKey), key); } } + +const statements = { + beginTxn: "BEGIN TRANSACTION", + commitTxn: "COMMIT TRANSACTION", + rollbackTxn: "ROLLBACK TRANSACTION", + createTable: ` + CREATE TABLE IF NOT EXISTS kvs ( + from_block INTEGER NOT NULL, + to_block INTEGER, + k TEXT NOT NULL, + v BLOB NOT NULL, + PRIMARY KEY (from_block, k) + );`, + get: ` + SELECT v + FROM kvs + WHERE k = ? AND to_block IS NULL`, + updateToBlock: ` + UPDATE kvs + SET to_block = ? + WHERE k = ? AND to_block IS NULL`, + insertIntoKvs: ` + INSERT INTO kvs (from_block, to_block, k, v) + VALUES (?, NULL, ?, ?)`, + del: ` + UPDATE kvs + SET to_block = ? + WHERE k = ? AND to_block IS NULL`, +}; diff --git a/packages/indexer/src/plugins/persistence.test.ts b/packages/indexer/src/plugins/persistence.test.ts index 653e45b..5d37bac 100644 --- a/packages/indexer/src/plugins/persistence.test.ts +++ b/packages/indexer/src/plugins/persistence.test.ts @@ -119,18 +119,15 @@ describe("Persistence", () => { return messages; }); - const persistence = sqlitePersistence({ - filename: "file:memdb1?mode=memory&cache=shared", - }); + const db = new Database("file:memdb1?mode=memory&cache=shared"); + + const persistence = sqlitePersistence(db); // create mock indexer with persistence plugin const indexer = klona(getMockIndexer([persistence])); await run(client, indexer); - // open same db again to check last cursor - const db = new Database("file:memdb1?mode=memory&cache=shared"); - const store = new SqlitePersistence(db); const latest = store.get(); @@ -141,14 +138,14 @@ describe("Persistence", () => { "uniqueKey": null, } `); + + db.close(); }); // Cleanup afterEach(async () => { try { await fs.unlink("file:memdb1?mode=memory&cache=shared"); - await fs.unlink("file:memdb1?mode=memory&cache=shared-shm"); - await fs.unlink("file:memdb1?mode=memory&cache=shared-wal"); } catch {} }); }); diff --git a/packages/indexer/src/plugins/persistence.ts b/packages/indexer/src/plugins/persistence.ts index aa47934..34934c0 100644 --- a/packages/indexer/src/plugins/persistence.ts +++ b/packages/indexer/src/plugins/persistence.ts @@ -1,21 +1,13 @@ import type { Cursor } from "@apibara/protocol"; -import Database, { type Database as SqliteDatabase } from "better-sqlite3"; +import type { Database as SqliteDatabase, Statement } from "better-sqlite3"; import { deserialize, serialize } from "../vcr"; import { defineIndexerPlugin } from "./config"; -type SqliteArgs = Database.Options & { - filename: string | Buffer | undefined; -}; - -export function sqlitePersistence(args: SqliteArgs) { +export function sqlitePersistence(db: SqliteDatabase) { return defineIndexerPlugin((indexer) => { - let db: SqliteDatabase; let store: SqlitePersistence; indexer.hooks.hook("run:before", () => { - const { filename, ...sqliteOptions } = args; - db = new Database(filename, sqliteOptions); - SqlitePersistence.initialize(db); store = new SqlitePersistence(db); @@ -48,28 +40,32 @@ export function sqlitePersistence(args: SqliteArgs) { } export class SqlitePersistence { - constructor(private _db: SqliteDatabase) {} + /** Sqlite Queries Prepare Statements */ + private _getCheckpointQuery: Statement; + private _putCheckpointQuery: Statement< + [string, number, `0x${string}` | undefined] + >; + private _delCheckpointQuery: Statement; + private _getFilterQuery: Statement; + private _updateFilterToBlockQuery: Statement<[number, string]>; + private _insertFilterQuery: Statement<[string, string, number]>; + private _delFilterQuery: Statement; + + constructor(private _db: SqliteDatabase) { + this._getCheckpointQuery = this._db.prepare(statements.getCheckpoint); + this._putCheckpointQuery = this._db.prepare(statements.putCheckpoint); + this._delCheckpointQuery = this._db.prepare(statements.delCheckpoint); + this._getFilterQuery = this._db.prepare(statements.getFilter); + this._updateFilterToBlockQuery = this._db.prepare( + statements.updateFilterToBlock, + ); + this._insertFilterQuery = this._db.prepare(statements.insertFilter); + this._delFilterQuery = this._db.prepare(statements.delFilter); + } static initialize(db: SqliteDatabase) { - db.pragma("journal_mode = WAL"); - - db.prepare(` - CREATE TABLE IF NOT EXISTS checkpoints ( - id TEXT NOT NULL PRIMARY KEY, - order_key INTEGER NOT NULL, - unique_key TEXT - ); - `).run(); - - db.prepare(` - CREATE TABLE IF NOT EXISTS filters ( - id TEXT NOT NULL, - filter BLOB NOT NULL, - from_block INTEGER NOT NULL, - to_block INTEGER, - PRIMARY KEY (id, from_block) - ); - `).run(); + db.prepare(statements.createCheckpointsTable).run(); + db.prepare(statements.createFiltersTable).run(); } public get(): { cursor?: Cursor; filter?: TFilter } { @@ -97,15 +93,7 @@ export class SqlitePersistence { // --- CHECKPOINTS TABLE METHODS --- private _getCheckpoint(): Cursor | undefined { - const row = this._db - .prepare( - ` - SELECT * - FROM checkpoints - WHERE id = ? - `, - ) - .get("default"); + const row = this._getCheckpointQuery.get("default"); if (!row) return undefined; @@ -113,42 +101,21 @@ export class SqlitePersistence { } private _putCheckpoint(cursor: Cursor) { - this._db - .prepare( - ` - INSERT INTO checkpoints (id, order_key, unique_key) - VALUES (?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - order_key = excluded.order_key, - unique_key = excluded.unique_key - `, - ) - .run("default", Number(cursor.orderKey), cursor.uniqueKey); + this._putCheckpointQuery.run( + "default", + Number(cursor.orderKey), + cursor.uniqueKey, + ); } private _delCheckpoint() { - this._db - .prepare( - ` - DELETE FROM checkpoints - WHERE id = ? - `, - ) - .run("default"); + this._delCheckpointQuery.run("default"); } // --- FILTERS TABLE METHODS --- private _getFilter(): TFilter | undefined { - const row = this._db - .prepare( - ` - SELECT * - FROM filters - WHERE id = ? AND to_block IS NULL - `, - ) - .get("default"); + const row = this._getFilterQuery.get("default"); if (!row) return undefined; @@ -156,45 +123,69 @@ export class SqlitePersistence { } private _putFilter(filter: TFilter, endCursor: Cursor) { - this._db - .prepare( - ` - UPDATE filters - SET to_block = ? - WHERE id = ? AND to_block IS NULL - `, - ) - .run(Number(endCursor.orderKey), "default"); - - this._db - .prepare( - ` - INSERT INTO filters (id, filter, from_block) - VALUES (?, ?, ?) - ON CONFLICT(id, from_block) DO UPDATE SET - filter = excluded.filter, - from_block = excluded.from_block - `, - ) - .run( - "default", - serialize(filter as Record), - Number(endCursor.orderKey), - ); + this._updateFilterToBlockQuery.run(Number(endCursor.orderKey), "default"); + this._insertFilterQuery.run( + "default", + serialize(filter as Record), + Number(endCursor.orderKey), + ); } private _delFilter() { - this._db - .prepare( - ` - DELETE FROM filters - WHERE id = ? - `, - ) - .run("default"); + this._delFilterQuery.run("default"); } } +const statements = { + beginTxn: "BEGIN TRANSACTION", + commitTxn: "COMMIT TRANSACTION", + rollbackTxn: "ROLLBACK TRANSACTION", + createCheckpointsTable: ` + CREATE TABLE IF NOT EXISTS checkpoints ( + id TEXT NOT NULL PRIMARY KEY, + order_key INTEGER NOT NULL, + unique_key TEXT + );`, + createFiltersTable: ` + CREATE TABLE IF NOT EXISTS filters ( + id TEXT NOT NULL, + filter BLOB NOT NULL, + from_block INTEGER NOT NULL, + to_block INTEGER, + PRIMARY KEY (id, from_block) + );`, + getCheckpoint: ` + SELECT * + FROM checkpoints + WHERE id = ?`, + putCheckpoint: ` + INSERT INTO checkpoints (id, order_key, unique_key) + VALUES (?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + order_key = excluded.order_key, + unique_key = excluded.unique_key`, + delCheckpoint: ` + DELETE FROM checkpoints + WHERE id = ?`, + getFilter: ` + SELECT * + FROM filters + WHERE id = ? AND to_block IS NULL`, + updateFilterToBlock: ` + UPDATE filters + SET to_block = ? + WHERE id = ? AND to_block IS NULL`, + insertFilter: ` + INSERT INTO filters (id, filter, from_block) + VALUES (?, ?, ?) + ON CONFLICT(id, from_block) DO UPDATE SET + filter = excluded.filter, + from_block = excluded.from_block`, + delFilter: ` + DELETE FROM filters + WHERE id = ?`, +}; + export type CheckpointRow = { id: string; order_key: number; diff --git a/packages/indexer/src/sinks/sqlite.test.ts b/packages/indexer/src/sinks/sqlite.test.ts index b4c730f..ace9a57 100644 --- a/packages/indexer/src/sinks/sqlite.test.ts +++ b/packages/indexer/src/sinks/sqlite.test.ts @@ -16,8 +16,6 @@ describe("Run Test", () => { async function cleanup() { try { await fs.unlink("file:memdb_sqlitesink?mode=memory&cache=shared"); - await fs.unlink("file:memdb_sqlitesink?mode=memory&cache=shared-wal"); - await fs.unlink("file:memdb_sqlitesink?mode=memory&cache=shared-shm"); } catch {} } @@ -47,7 +45,7 @@ describe("Run Test", () => { ).run(); const sink = sqlite({ - filename: "file:memdb_sqlitesink?mode=memory&cache=shared", + db, tableName: "test", }); await run(client, getMockIndexer(), sink); @@ -98,5 +96,7 @@ describe("Run Test", () => { }, ] `); + + db.close(); }); }); diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index 56e442c..026c3e3 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -1,12 +1,12 @@ import type { Cursor } from "@apibara/protocol"; -import Database, { type Database as SqliteDatabase } from "better-sqlite3"; +import type { Database as SqliteDatabase } from "better-sqlite3"; import { Sink, type SinkWriteArgs } from "../sink"; -export type SqliteArgs = Database.Options & { - filename: string | Buffer | undefined; -}; - export type SqliteSinkOptions = { + /** + * Database instance of better-sqlite3 + */ + db: SqliteDatabase; /** * The name of the table where data will be inserted. */ @@ -27,11 +27,12 @@ export type SqliteSinkOptions = { export class SqliteSink< TData extends Record, > extends Sink { - private _config: SqliteSinkOptions; + private _config: Omit; private _db: SqliteDatabase; - constructor(db: SqliteDatabase, config: SqliteSinkOptions) { + constructor(options: SqliteSinkOptions) { super(); + const { db, ...config } = options; this._config = config; this._db = db; } @@ -104,13 +105,7 @@ export class SqliteSink< } export const sqlite = >( - args: SqliteArgs & SqliteSinkOptions, + args: SqliteSinkOptions, ) => { - const { filename, cursorColumn, tableName, onConflict, ...sqliteOptions } = - args; - const db = new Database(filename, sqliteOptions); - // For performance reason: https://github.com/WiseLibs/better-sqlite3/blob/master/docs/performance.md - db.pragma("journal_mode = WAL"); - - return new SqliteSink(db, { tableName, cursorColumn, onConflict }); + return new SqliteSink(args); };