Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer: add kv plugin #84

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"linter": {
"enabled": true,
"rules": {
"recommended": true
"recommended": true,
"correctness": {
"noUnusedImports": "error"
}
}
},
"formatter": {
Expand Down
2 changes: 1 addition & 1 deletion examples/evm-client/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const command = defineCommand({
abi,
eventName: "Transfer",
args: { from: null, to: null },
}),
}) as `0x${string}`[],
},
],
});
Expand Down
2 changes: 1 addition & 1 deletion examples/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createIndexerConfig(streamUrl: string) {
abi,
eventName: "Transfer",
args: { from: null, to: null },
}),
}) as `0x${string}`[],
},
],
},
Expand Down
2 changes: 1 addition & 1 deletion packages/evm/src/filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe("Filter", () => {
abi,
eventName: "Transfer",
args: { from: null, to: null },
}),
}) as `0x${string}`[],
},
],
});
Expand Down
2 changes: 2 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
"lint": "biome check .",
"typecheck": "tsc --noEmit",
"lint:fix": "pnpm lint --write",
"test": "vitest",
"format": "biome format . --write"
},
"devDependencies": {
"@types/node": "^20.14.0",
"csv-stringify": "^6.5.0",
"sqlite": "^5.1.1",
"sqlite3": "^5.1.7",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
Expand Down
1 change: 1 addition & 0 deletions packages/indexer/src/hooks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./useKVStore";
12 changes: 12 additions & 0 deletions packages/indexer/src/hooks/useKVStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { useIndexerContext } from "../context";
import type { KVStore } from "../plugins/kv";

export type UseKVStoreResult = InstanceType<typeof KVStore>;

export function useKVStore(): UseKVStoreResult {
const ctx = useIndexerContext();

if (!ctx?.kv) throw new Error("KV Plugin is not available in context!");

return ctx.kv;
}
32 changes: 28 additions & 4 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
createHooks,
} from "hookable";

import assert from "node:assert";
import { indexerAsyncContext } from "./context";
import { tracer } from "./otel";
import type { IndexerPlugin } from "./plugins";
Expand All @@ -31,8 +32,17 @@ export interface IndexerHooks<TFilter, TBlock, TRet> {
options: StreamDataOptions;
}) => void;
"connect:after": () => void;
"handler:before": ({ block }: { block: TBlock }) => void;
"handler:before": ({
block,
finality,
endCursor,
}: {
block: TBlock;
finality: DataFinality;
endCursor?: Cursor;
}) => void;
"handler:after": ({ output }: { output: TRet[] }) => void;
"handler:exception": ({ error }: { error: Error }) => void;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this handler called anywhere? I think we should catch the handler exception and then rethrow it after calling the hook.

"sink:write": ({ data }: { data: TRet[] }) => void;
"sink:flush": () => void;
message: ({ message }: { message: StreamDataResponse<TBlock> }) => void;
Expand Down Expand Up @@ -149,13 +159,27 @@ export async function run<TFilter, TBlock, TRet>(
const output = await tracer.startActiveSpan(
"handler",
async (span) => {
await indexer.hooks.callHook("handler:before", { block });
const output = await indexer.options.transform({
await indexer.hooks.callHook("handler:before", {
block,
cursor,
endCursor,
finality,
});

let output: TRet[];

try {
output = await indexer.options.transform({
block,
cursor,
endCursor,
finality,
});
} catch (error) {
assert(error instanceof Error);
await indexer.hooks.callHook("handler:exception", { error });
throw error;
}

await indexer.hooks.callHook("handler:after", { output });

span.end();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Indexer } from "./indexer";
import type { Indexer } from "../indexer";

export type IndexerPlugin<TFilter, TBlock, TRet> = (
indexer: Indexer<TFilter, TBlock, TRet>,
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/plugins/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "./kv";
export * from "./config";
112 changes: 112 additions & 0 deletions packages/indexer/src/plugins/kv.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { type Database, open } from "sqlite";
import sqlite3 from "sqlite3";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { KVStore } from "./kv";

type ValueType = { data: bigint };

describe("KVStore", () => {
let db: Database<sqlite3.Database, sqlite3.Statement>;
let store: KVStore;
const key = "test_key";

beforeAll(async () => {
db = await open({ driver: sqlite3.Database, filename: ":memory:" });
await KVStore.initialize(db);
store = new KVStore(db, "finalized", { orderKey: 5_000_000n });
});

afterAll(async () => {
await db.close();
});

it("should begin transaction", async () => {
await store.beginTransaction();
});

it("should put and get a value", async () => {
const value = { data: 0n };

await store.put<ValueType>(key, value);
const result = await store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should commit transaction", async () => {
await store.commitTransaction();

const value = { data: 0n };

const result = await store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should return undefined for non-existing key", async () => {
const result = await store.get<ValueType>("non_existent_key");
expect(result).toBeUndefined();
});

it("should begin transaction", async () => {
await store.beginTransaction();
});

it("should update an existing value", async () => {
store = new KVStore(db, "finalized", { orderKey: 5_000_020n });

const value = { data: 50n };

await store.put<ValueType>(key, value);
const result = await store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should delete a value", async () => {
await store.del(key);
const result = await store.get<ValueType>(key);

expect(result).toBeUndefined();

const rows = await db.all(
`
SELECT from_block, to_block, k, v
FROM kvs
WHERE k = ?
`,
[key],
);

// Check that the old is correctly marked with to_block
expect(rows[0].to_block).toBe(Number(5_000_020n));
});

it("should rollback transaction", async () => {
await store.rollbackTransaction();
});

it("should revert the changes to last commit", async () => {
const rows = await db.all(
`
SELECT from_block, to_block, k, v
FROM kvs
WHERE k = ?
`,
[key],
);

expect(rows).toMatchInlineSnapshot(`
[
{
"from_block": 5000000,
"k": "test_key",
"to_block": null,
"v": "{
"data": "0n"
}",
},
]
`);
});
});
120 changes: 116 additions & 4 deletions packages/indexer/src/plugins/kv.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,128 @@
import assert from "node:assert";
import type { Cursor, DataFinality } from "@apibara/protocol";
import { type Database, type ISqlite, open } from "sqlite";
import { useIndexerContext } from "../context";
import { defineIndexerPlugin } from "../plugins";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";

/** This plugin is a placeholder for a future key-value store plugin. */
export function kv<TFilter, TBlock, TRet>() {
type SqliteArgs = ISqlite.Config;

export function kv<TFilter, TBlock, TRet>(args: SqliteArgs) {
return defineIndexerPlugin<TFilter, TBlock, TRet>((indexer) => {
let db: Database;

indexer.hooks.hook("run:before", async () => {
db = await open(args);
await KVStore.initialize(db);
});

indexer.hooks.hook("handler:before", async ({ finality, endCursor }) => {
const ctx = useIndexerContext();

assert(endCursor, new Error("endCursor cannot be undefined"));

ctx.kv = new KVStore(db, finality, endCursor);

await ctx.kv.beginTransaction();
});

indexer.hooks.hook("handler:after", async () => {
const ctx = useIndexerContext();

await ctx.kv.commitTransaction();

ctx.kv = null;
});

indexer.hooks.hook("handler:exception", async () => {
const ctx = useIndexerContext();
ctx.kv = {};

await ctx.kv.rollbackTransaction();

ctx.kv = null;
});

indexer.hooks.hook("run:after", async () => {
console.log("kv: ", useIndexerContext().kv);
});
});
}

export class KVStore {
constructor(
private _db: Database,
private _finality: DataFinality,
private _endCursor: Cursor,
) {}

static async initialize(db: Database) {
await db.exec(`
CREATE TABLE IF NOT EXISTS kvs (
from_block INTEGER NOT NULL,
to_block INTEGER,
k TEXT NOT NULL,
v BLOB NOT NULL,
PRIMARY KEY (from_block, k)
);
`);
}

async beginTransaction() {
await this._db.exec("BEGIN TRANSACTION");
}

async commitTransaction() {
await this._db.exec("COMMIT TRANSACTION");
}

async rollbackTransaction() {
await this._db.exec("ROLLBACK TRANSACTION");
}

async get<T>(key: string): Promise<T> {
const row = await this._db.get<{ v: string }>(
`
SELECT v
FROM kvs
WHERE k = ? AND to_block IS NULL
`,
[key],
);

return row ? deserialize(row.v) : undefined;
}

async put<T>(key: string, value: T) {
await this._db.run(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
[Number(this._endCursor.orderKey), key],
);

await this._db.run(
`
INSERT INTO kvs (from_block, to_block, k, v)
VALUES (?, NULL, ?, ?)
`,
[
Number(this._endCursor.orderKey),
key,
serialize(value as Record<string, unknown>),
],
);
}

async del(key: string) {
await this._db.run(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
[Number(this._endCursor.orderKey), key],
);
}
}
Loading
Loading