Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer: add kv plugin #84

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"linter": {
"enabled": true,
"rules": {
"recommended": true
"recommended": true,
"correctness": {
"noUnusedImports": "error"
}
}
},
"formatter": {
Expand Down
2 changes: 1 addition & 1 deletion examples/evm-client/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const command = defineCommand({
abi,
eventName: "Transfer",
args: { from: null, to: null },
}),
}) as `0x${string}`[],
},
],
});
Expand Down
2 changes: 1 addition & 1 deletion examples/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createIndexerConfig(streamUrl: string) {
abi,
eventName: "Transfer",
args: { from: null, to: null },
}),
}) as `0x${string}`[],
},
],
},
Expand Down
2 changes: 1 addition & 1 deletion packages/evm/src/filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe("Filter", () => {
abi,
eventName: "Transfer",
args: { from: null, to: null },
}),
}) as `0x${string}`[],
},
],
});
Expand Down
2 changes: 2 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
"lint": "biome check .",
"typecheck": "tsc --noEmit",
"lint:fix": "pnpm lint --write",
"test": "vitest",
"format": "biome format . --write"
},
"devDependencies": {
"@types/node": "^20.14.0",
"csv-stringify": "^6.5.0",
"sqlite": "^5.1.1",
"sqlite3": "^5.1.7",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
Expand Down
1 change: 1 addition & 0 deletions packages/indexer/src/hooks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./useKVStore";
12 changes: 12 additions & 0 deletions packages/indexer/src/hooks/useKVStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { useIndexerContext } from "../context";
import type { KVStore } from "../plugins/kv";

export type UseKVStoreResult = InstanceType<typeof KVStore>;

export function useKVStore(): UseKVStoreResult {
const ctx = useIndexerContext();

if (!ctx?.kv) throw new Error("KV Plugin is not available in context!");

return ctx.kv;
}
32 changes: 28 additions & 4 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
createHooks,
} from "hookable";

import assert from "node:assert";
import { indexerAsyncContext } from "./context";
import { tracer } from "./otel";
import type { IndexerPlugin } from "./plugins";
Expand All @@ -31,8 +32,17 @@ export interface IndexerHooks<TFilter, TBlock, TRet> {
options: StreamDataOptions;
}) => void;
"connect:after": () => void;
"handler:before": ({ block }: { block: TBlock }) => void;
"handler:before": ({
block,
finality,
endCursor,
}: {
block: TBlock;
finality: DataFinality;
endCursor?: Cursor;
}) => void;
"handler:after": ({ output }: { output: TRet[] }) => void;
"handler:exception": ({ error }: { error: Error }) => void;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this handler called anywhere? I think we should catch the handler exception and then rethrow it after calling the hook.

"sink:write": ({ data }: { data: TRet[] }) => void;
"sink:flush": () => void;
message: ({ message }: { message: StreamDataResponse<TBlock> }) => void;
Expand Down Expand Up @@ -149,13 +159,27 @@ export async function run<TFilter, TBlock, TRet>(
const output = await tracer.startActiveSpan(
"handler",
async (span) => {
await indexer.hooks.callHook("handler:before", { block });
const output = await indexer.options.transform({
await indexer.hooks.callHook("handler:before", {
block,
cursor,
endCursor,
finality,
});

let output: TRet[];

try {
output = await indexer.options.transform({
block,
cursor,
endCursor,
finality,
});
} catch (error) {
assert(error instanceof Error);
await indexer.hooks.callHook("handler:exception", { error });
throw error;
}

await indexer.hooks.callHook("handler:after", { output });

span.end();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Indexer } from "./indexer";
import type { Indexer } from "../indexer";

export type IndexerPlugin<TFilter, TBlock, TRet> = (
indexer: Indexer<TFilter, TBlock, TRet>,
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/plugins/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "./kv";
export * from "./config";
74 changes: 74 additions & 0 deletions packages/indexer/src/plugins/kv.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { type Database, open } from "sqlite";
import sqlite3 from "sqlite3";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { KVStore } from "./kv";

type ValueType = { data: bigint };

describe("KVStore", () => {
let db: Database<sqlite3.Database, sqlite3.Statement>;
let store: KVStore;
const key = "test_key";

beforeAll(async () => {
db = await open({ driver: sqlite3.Database, filename: ":memory:" });
await db.exec(`
CREATE TABLE IF NOT EXISTS kvs (
from_block INTEGER NOT NULL,
to_block INTEGER,
k TEXT NOT NULL,
v BLOB NOT NULL,
PRIMARY KEY (from_block, k)
);
`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put this logic in a static initialize(db) method on the KVStore? That way we can share the logic between tests and hooks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I should have caught it in the previous review round!

store = new KVStore(db, "finalized", { orderKey: 5_000_000n });
});

afterAll(async () => {
await db.close();
});

it("should put and get a value", async () => {
const value = { data: 0n };

await store.put<ValueType>(key, value);
const result = await store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should return undefined for non-existing key", async () => {
const result = await store.get<ValueType>("non_existent_key");
expect(result).toBeUndefined();
});

it("should update an existing value", async () => {
store = new KVStore(db, "finalized", { orderKey: 5_000_020n });

const value = { data: 50n };

await store.put<ValueType>(key, value);
const result = await store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should delete a value", async () => {
await store.del(key);
const result = await store.get<ValueType>(key);

expect(result).toBeUndefined();

const rows = await db.all(
`
SELECT from_block, to_block, k, v
FROM kvs
WHERE k = ?
`,
[key],
);

// Check that the old is correctly marked with to_block
expect(rows[0].to_block).toBe(Number(5_000_020n));
});
});
104 changes: 100 additions & 4 deletions packages/indexer/src/plugins/kv.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,112 @@
import assert from "node:assert";
import type { Cursor, DataFinality } from "@apibara/protocol";
import { type Database, type ISqlite, open } from "sqlite";
import { useIndexerContext } from "../context";
import { defineIndexerPlugin } from "../plugins";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";

/** This plugin is a placeholder for a future key-value store plugin. */
export function kv<TFilter, TBlock, TRet>() {
type SqliteArgs = ISqlite.Config;

export function kv<TFilter, TBlock, TRet>(args: SqliteArgs) {
return defineIndexerPlugin<TFilter, TBlock, TRet>((indexer) => {
let db: Database;

indexer.hooks.hook("run:before", async () => {
db = await open(args);
await db.exec(`
CREATE TABLE IF NOT EXISTS kvs (
from_block INTEGER NOT NULL,
to_block INTEGER,
k TEXT NOT NULL,
v BLOB NOT NULL,
PRIMARY KEY (from_block, k)
);
`);
});

indexer.hooks.hook("handler:before", async ({ finality, endCursor }) => {
const ctx = useIndexerContext();

assert(endCursor, new Error("endCursor cannot be undefined"));

ctx.kv = new KVStore(db, finality, endCursor);

await db.exec("BEGIN TRANSACTION");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My advice is to put this into a function on the kv so that it can be tested. You should test both the successfull flow (begin tx -> commit) and the exception flow (begin tx -> rollback)

Suggested change
ctx.kv = new KVStore(db, finality, endCursor);
await db.exec("BEGIN TRANSACTION");
ctx.kv = new KVStore(db, finality, endCursor);
await ctx.kv.beginTransaction();

});

indexer.hooks.hook("handler:after", async () => {
await db.exec("COMMIT TRANSACTION");

const ctx = useIndexerContext();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await db.exec("COMMIT TRANSACTION");
const ctx = useIndexerContext();
const ctx = useIndexerContext();
ctx.kv.commitTransaction()

ctx.kv = null;
});

indexer.hooks.hook("handler:exception", async () => {
await db.exec("ROLLBACK TRANSACTION");

const ctx = useIndexerContext();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await db.exec("ROLLBACK TRANSACTION");
const ctx = useIndexerContext();
const ctx = useIndexerContext();
ctx.kv.rollbackTransaction();

ctx.kv = {};

ctx.kv = null;
});

indexer.hooks.hook("run:after", async () => {
console.log("kv: ", useIndexerContext().kv);
});
});
}

export class KVStore {
constructor(
private _db: Database,
private _finality: DataFinality,
private _endCursor: Cursor,
) {}

async get<T>(key: string): Promise<T> {
const row = await this._db.get<{ v: string }>(
`
SELECT v
FROM kvs
WHERE k = ? AND to_block IS NULL
`,
[key],
);

return row ? deserialize(row.v) : undefined;
}

async put<T>(key: string, value: T) {
await this._db.run(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
[Number(this._endCursor.orderKey), key],
);

await this._db.run(
`
INSERT INTO kvs (from_block, to_block, k, v)
VALUES (?, NULL, ?, ?)
`,
[
Number(this._endCursor.orderKey),
key,
serialize(value as Record<string, unknown>),
],
);
}

async del(key: string) {
await this._db.run(
`
UPDATE kvs
SET to_block = ?
WHERE k = ? AND to_block IS NULL
`,
[Number(this._endCursor.orderKey), key],
);
}
}
Loading
Loading