Skip to content

Commit

Permalink
indexer: refactor sqlite operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Jul 16, 2024
1 parent c7539c2 commit d9e0600
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 209 deletions.
24 changes: 10 additions & 14 deletions packages/indexer/src/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ describe("Run Test", () => {
async function cleanup() {
try {
await fs.unlink("file:memdb_indexer?mode=memory&cache=shared");
await fs.unlink("file:memdb_indexer?mode=memory&cache=shared-wal");
await fs.unlink("file:memdb_indexer?mode=memory&cache=shared-shm");
} catch {}
}

Expand Down Expand Up @@ -261,9 +259,9 @@ describe("Run Test", () => {
return [];
});

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
filename: "file:memdb_indexer?mode=memory&cache=shared",
});
const db = Database("file:memdb_indexer?mode=memory&cache=shared");

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>(db);

// create mock indexer with persistence plugin
const indexer = klona(getMockIndexer([persistence]));
Expand All @@ -284,9 +282,6 @@ describe("Run Test", () => {

await run(client, indexer, sink);

// open same db again to check last cursor
const db = Database("file:memdb_indexer?mode=memory&cache=shared");

const store = new SqlitePersistence<MockFilter>(db);

const latest = store.get();
Expand Down Expand Up @@ -358,6 +353,8 @@ describe("Run Test", () => {
},
]
`);

db.close();
});

it("factory mode: last cursor should persist when error is thrown in indexer", async () => {
Expand Down Expand Up @@ -437,9 +434,9 @@ describe("Run Test", () => {
return [];
});

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
filename: "file:memdb_indexer?mode=memory&cache=shared",
});
const db = Database("file:memdb_indexer?mode=memory&cache=shared");

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>(db);

// create mock indexer with persistence plugin
const indexer = klona(getMockIndexer([persistence]));
Expand All @@ -462,9 +459,6 @@ describe("Run Test", () => {
"this error should occurr!",
);

// open same db again to check last cursor
const db = Database("file:memdb_indexer?mode=memory&cache=shared");

const store = new SqlitePersistence<MockFilter>(db);

const latest = store.get();
Expand All @@ -473,5 +467,7 @@ describe("Run Test", () => {
expect(latest.filter?.filter).toEqual("B");

expect(sink.result).toMatchInlineSnapshot("[]");

db.close();
});
});
131 changes: 62 additions & 69 deletions packages/indexer/src/plugins/kv.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
import assert from "node:assert";
import type { Cursor, DataFinality } from "@apibara/protocol";
import Database, { type Database as SqliteDatabase } from "better-sqlite3";
import type { Database as SqliteDatabase, Statement } from "better-sqlite3";
import { useIndexerContext } from "../context";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";

type SqliteArgs = Database.Options & {
filename: string | Buffer | undefined;
};

export function kv<TFilter, TBlock, TRet>(args: SqliteArgs) {
export function kv<TFilter, TBlock, TRet>(db: SqliteDatabase) {
return defineIndexerPlugin<TFilter, TBlock, TRet>((indexer) => {
let db: SqliteDatabase;

indexer.hooks.hook("run:before", () => {
const { filename, ...sqliteOptions } = args;
db = new Database(filename, sqliteOptions);

KVStore.initialize(db);
});

Expand Down Expand Up @@ -45,93 +36,95 @@ export function kv<TFilter, TBlock, TRet>(args: SqliteArgs) {

ctx.kv = null;
});

indexer.hooks.hook("run:after", () => {
console.log("kv: ", useIndexerContext().kv);
});
});
}

export class KVStore {
/** Sqlite Queries Prepare Statements */
private _beginTxnQuery: Statement;
private _commitTxnQuery: Statement;
private _rollbackTxnQuery: Statement;
private _getQuery: Statement<string, { v: string }>;
private _updateToBlockQuery: Statement<[number, string]>;
private _insertIntoKvsQuery: Statement<[number, string, string]>;
private _delQuery: Statement<[number, string]>;

constructor(
private _db: SqliteDatabase,
private _finality: DataFinality,
private _endCursor: Cursor,
) {}
) {
this._beginTxnQuery = this._db.prepare(statements.beginTxn);
this._commitTxnQuery = this._db.prepare(statements.commitTxn);
this._rollbackTxnQuery = this._db.prepare(statements.rollbackTxn);
this._getQuery = this._db.prepare(statements.get);
this._updateToBlockQuery = this._db.prepare(statements.updateToBlock);
this._insertIntoKvsQuery = this._db.prepare(statements.insertIntoKvs);
this._delQuery = this._db.prepare(statements.del);
}

static initialize(db: SqliteDatabase) {
db.pragma("journal_mode = WAL");

db.prepare(`
CREATE TABLE IF NOT EXISTS kvs (
from_block INTEGER NOT NULL,
to_block INTEGER,
k TEXT NOT NULL,
v BLOB NOT NULL,
PRIMARY KEY (from_block, k)
);
`).run();
db.prepare(statements.createTable).run();
}

beginTransaction() {
this._db.prepare("BEGIN TRANSACTION").run();
this._beginTxnQuery.run();
}

commitTransaction() {
this._db.prepare("COMMIT TRANSACTION").run();
this._commitTxnQuery.run();
}

rollbackTransaction() {
this._db.prepare("ROLLBACK TRANSACTION").run();
this._rollbackTxnQuery.run();
}

get<T>(key: string): T {
const row = this._db
.prepare<string, { v: string }>(
`
SELECT v
FROM kvs
WHERE k = ? AND to_block IS NULL`,
)
.get(key);
const row = this._getQuery.get(key);

return row ? deserialize(row.v) : undefined;
}

put<T>(key: string, value: T) {
this._db
.prepare(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
)
.run(Number(this._endCursor.orderKey), key);

this._db
.prepare(
`
INSERT INTO kvs (from_block, to_block, k, v)
VALUES (?, NULL, ?, ?)
`,
)
.run(
Number(this._endCursor.orderKey),
key,
serialize(value as Record<string, unknown>),
);
this._updateToBlockQuery.run(Number(this._endCursor.orderKey), key);

this._insertIntoKvsQuery.run(
Number(this._endCursor.orderKey),
key,
serialize(value as Record<string, unknown>),
);
}

del(key: string) {
this._db
.prepare(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
)
.run(Number(this._endCursor.orderKey), key);
this._delQuery.run(Number(this._endCursor.orderKey), key);
}
}

const statements = {
beginTxn: "BEGIN TRANSACTION",
commitTxn: "COMMIT TRANSACTION",
rollbackTxn: "ROLLBACK TRANSACTION",
createTable: `
CREATE TABLE IF NOT EXISTS kvs (
from_block INTEGER NOT NULL,
to_block INTEGER,
k TEXT NOT NULL,
v BLOB NOT NULL,
PRIMARY KEY (from_block, k)
);`,
get: `
SELECT v
FROM kvs
WHERE k = ? AND to_block IS NULL`,
updateToBlock: `
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL`,
insertIntoKvs: `
INSERT INTO kvs (from_block, to_block, k, v)
VALUES (?, NULL, ?, ?)`,
del: `
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL`,
};
13 changes: 5 additions & 8 deletions packages/indexer/src/plugins/persistence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,15 @@ describe("Persistence", () => {
return messages;
});

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
filename: "file:memdb1?mode=memory&cache=shared",
});
const db = new Database("file:memdb1?mode=memory&cache=shared");

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>(db);

// create mock indexer with persistence plugin
const indexer = klona(getMockIndexer([persistence]));

await run(client, indexer);

// open same db again to check last cursor
const db = new Database("file:memdb1?mode=memory&cache=shared");

const store = new SqlitePersistence<MockFilter>(db);

const latest = store.get();
Expand All @@ -141,14 +138,14 @@ describe("Persistence", () => {
"uniqueKey": null,
}
`);

db.close();
});

// Cleanup
afterEach(async () => {
try {
await fs.unlink("file:memdb1?mode=memory&cache=shared");
await fs.unlink("file:memdb1?mode=memory&cache=shared-shm");
await fs.unlink("file:memdb1?mode=memory&cache=shared-wal");
} catch {}
});
});
Expand Down
Loading

0 comments on commit d9e0600

Please sign in to comment.