Skip to content

Commit

Permalink
indexer: add kv plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Jun 21, 2024
1 parent da2e09a commit 751aa8a
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 5 deletions.
13 changes: 11 additions & 2 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ export interface IndexerHooks<TFilter, TBlock, TRet> {
options: StreamDataOptions;
}) => void;
"connect:after": () => void;
"handler:before": ({ block }: { block: TBlock }) => void;
"handler:before": ({
block,
finality,
endCursor,
}: { block: TBlock; finality: DataFinality; endCursor?: Cursor }) => void;
"handler:after": ({ output }: { output: TRet[] }) => void;
"handler:exception": ({ error }: { error: Error }) => void;
"sink:write": ({ data }: { data: TRet[] }) => void;
"sink:flush": () => void;
message: ({ message }: { message: StreamDataResponse<TBlock> }) => void;
Expand Down Expand Up @@ -149,7 +154,11 @@ export async function run<TFilter, TBlock, TRet>(
const output = await tracer.startActiveSpan(
"handler",
async (span) => {
await indexer.hooks.callHook("handler:before", { block });
await indexer.hooks.callHook("handler:before", {
block,
endCursor,
finality,
});
const output = await indexer.options.transform({
block,
cursor,
Expand Down
98 changes: 95 additions & 3 deletions packages/indexer/src/plugins/kv.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,108 @@
import assert from "node:assert";
import type { Cursor, DataFinality } from "@apibara/protocol";
import { type Database, type ISqlite, open } from "sqlite";
import { useIndexerContext } from "../context";
import { defineIndexerPlugin } from "../plugins";
import { deserialize, serialize } from "../vcr";

/** This plugin is a placeholder for a future key-value store plugin. */
export function kv<TFilter, TBlock, TRet>() {
type SqliteArgs = ISqlite.Config;

export function kv<TFilter, TBlock, TRet>(args: SqliteArgs) {
return defineIndexerPlugin<TFilter, TBlock, TRet>((indexer) => {
let db: Database;

indexer.hooks.hook("run:before", async () => {
db = await open(args);
await db.exec(`
CREATE TABLE IF NOT EXISTS kvs (
from_block INTEGER NOT NULL,
to_block INTEGER,
k TEXT NOT NULL,
v BLOB NOT NULL,
PRIMARY KEY (from_block, k)
);
`);
});

indexer.hooks.hook("handler:before", async ({ finality, endCursor }) => {
const ctx = useIndexerContext();

assert(endCursor, new Error("endCursor cannot be undefined"));

ctx.kv = new KVStore(db, finality, endCursor);

await db.exec("BEGIN TRANSACTION");
});

indexer.hooks.hook("handler:after", async () => {
await db.exec("COMMIT TRANSACTION");

const ctx = useIndexerContext();

ctx.kv = null;
});

indexer.hooks.hook("handler:exception", async () => {
await db.exec("COMMIT TRANSACTION");

const ctx = useIndexerContext();
ctx.kv = {};

ctx.kv = null;
});

indexer.hooks.hook("run:after", async () => {
console.log("kv: ", useIndexerContext().kv);
});
});
}

export class KVStore {
constructor(
private _db: Database,
private _finality: DataFinality,
private _endCursor: Cursor,
) {}

async get<T extends Record<string, unknown>>(key: string): Promise<T> {
const row = await this._db.get<{ v: string }>(
`
SELECT v
FROM kvs
WHERE k = ? AND to_block IS NULL
`,
[key],
);

return row ? deserialize(row.v) : undefined;
}

async put<T extends Record<string, unknown>>(key: string, value: T) {
await this._db.run(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
[Number(this._endCursor.orderKey), key],
);

await this._db.run(
`
INSERT INTO kvs (from_block, to_block, k, v)
VALUES (?, NULL, ?, ?)
`,
[Number(this._endCursor.orderKey), key, serialize(value)],
);
}

async del<T>(key: string) {
await this._db.run(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
[Number(this._endCursor.orderKey), key],
);
}
}

0 comments on commit 751aa8a

Please sign in to comment.