Skip to content

Commit

Permalink
indexer: add better-sqlite3 (#92)
Browse files Browse the repository at this point in the history
- remove sqlite & sqlite3 and replace it with better-sqlite3
- add test for csv and sqlite sink
  • Loading branch information
fracek authored Jul 17, 2024
2 parents 02ccc95 + a86370b commit 316f5b7
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 978 deletions.
3 changes: 2 additions & 1 deletion examples/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-base": "^1.25.0",
"@opentelemetry/semantic-conventions": "^1.25.0",
"better-sqlite3": "^11.1.2",
"citty": "^0.1.6",
"consola": "^3.2.3",
"csv-stringify": "^6.5.0",
"sqlite": "^5.1.1",
"viem": "^2.12.4"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.11",
"@types/node": "^20.12.12",
"jiti": "^1.21.0"
}
Expand Down
6 changes: 3 additions & 3 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@
"format": "biome format . --write"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.11",
"@types/node": "^20.14.0",
"better-sqlite3": "^11.1.2",
"csv-stringify": "^6.5.0",
"sqlite": "^5.1.1",
"sqlite3": "^5.1.7",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
Expand All @@ -65,8 +65,8 @@
"unctx": "^2.3.1"
},
"peerDependencies": {
"better-sqlite3": "^11.1.2",
"csv-stringify": "^6.5.0",
"sqlite": "^5.1.1",
"vitest": "^1.6.0"
}
}
50 changes: 14 additions & 36 deletions packages/indexer/src/indexer.test.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,17 @@
import fs from "node:fs/promises";
import {
type MockBlock,
MockClient,
type MockFilter,
} from "@apibara/protocol/testing";
import Database from "better-sqlite3";
import { klona } from "klona/full";
import { open } from "sqlite";
import sqlite3 from "sqlite3";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { describe, expect, it } from "vitest";
import { run } from "./indexer";
import { SqlitePersistence, sqlitePersistence } from "./plugins/persistence";
import { generateMockMessages, vcr } from "./testing";
import { type MockRet, getMockIndexer } from "./testing/indexer";

describe("Run Test", () => {
async function cleanup() {
try {
await fs.unlink("file:memdb_indexer?mode=memory&cache=shared");
} catch {}
}

beforeEach(async () => {
await cleanup();
});

afterEach(async () => {
await cleanup();
});

it("should stream messages", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
return generateMockMessages();
Expand Down Expand Up @@ -260,9 +244,10 @@ describe("Run Test", () => {
return [];
});

const db = Database(":memory:");

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
database: db,
});

// create mock indexer with persistence plugin
Expand All @@ -284,15 +269,9 @@ describe("Run Test", () => {

await run(client, indexer, sink);

// open same db again to check last cursor
const db = await open({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
});

const store = new SqlitePersistence<MockFilter>(db);

const latest = await store.get();
const latest = store.get();

expect(latest.cursor?.orderKey).toEqual(108n);
expect(latest.filter?.filter).toEqual("BC");
Expand Down Expand Up @@ -361,6 +340,8 @@ describe("Run Test", () => {
},
]
`);

db.close();
});

it("factory mode: last cursor should persist when error is thrown in indexer", async () => {
Expand Down Expand Up @@ -440,9 +421,10 @@ describe("Run Test", () => {
return [];
});

const db = Database(":memory:");

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
database: db,
});

// create mock indexer with persistence plugin
Expand All @@ -466,19 +448,15 @@ describe("Run Test", () => {
"this error should occurr!",
);

// open same db again to check last cursor
const db = await open({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
});

const store = new SqlitePersistence<MockFilter>(db);

const latest = await store.get();
const latest = store.get();

expect(latest.cursor?.orderKey).toEqual(103n);
expect(latest.filter?.filter).toEqual("B");

expect(sink.result).toMatchInlineSnapshot("[]");

db.close();
});
});
62 changes: 35 additions & 27 deletions packages/indexer/src/plugins/kv.test.ts
Original file line number Diff line number Diff line change
@@ -1,82 +1,89 @@
import { type Database, open } from "sqlite";
import sqlite3 from "sqlite3";
import Database, { type Database as SqliteDatabase } from "better-sqlite3";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { KVStore } from "./kv";

type ValueType = { data: bigint };

type DatabaseRowType = {
from_block: number;
k: string;
to_block: number;
v: unknown;
};

describe("KVStore", () => {
let db: Database<sqlite3.Database, sqlite3.Statement>;
let db: SqliteDatabase;
let store: KVStore;
const key = "test_key";

beforeAll(async () => {
db = await open({ driver: sqlite3.Database, filename: ":memory:" });
await KVStore.initialize(db);
beforeAll(() => {
db = new Database(":memory:");
KVStore.initialize(db);
store = new KVStore(db, "finalized", { orderKey: 5_000_000n });
});

afterAll(async () => {
await db.close();
db.close();
});

it("should begin transaction", async () => {
await store.beginTransaction();
it("should begin transaction", () => {
store.beginTransaction();
});

it("should put and get a value", async () => {
const value = { data: 0n };

await store.put<ValueType>(key, value);
const result = await store.get<ValueType>(key);
store.put<ValueType>(key, value);
const result = store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should commit transaction", async () => {
await store.commitTransaction();
store.commitTransaction();

const value = { data: 0n };

const result = await store.get<ValueType>(key);
const result = store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should return undefined for non-existing key", async () => {
const result = await store.get<ValueType>("non_existent_key");
const result = store.get<ValueType>("non_existent_key");
expect(result).toBeUndefined();
});

it("should begin transaction", async () => {
await store.beginTransaction();
store.beginTransaction();
});

it("should update an existing value", async () => {
store = new KVStore(db, "finalized", { orderKey: 5_000_020n });

const value = { data: 50n };

await store.put<ValueType>(key, value);
const result = await store.get<ValueType>(key);
store.put<ValueType>(key, value);
const result = store.get<ValueType>(key);

expect(result).toEqual(value);
});

it("should delete a value", async () => {
await store.del(key);
const result = await store.get<ValueType>(key);
store.del(key);
const result = store.get<ValueType>(key);

expect(result).toBeUndefined();

const rows = await db.all(
`
const rows = db
.prepare<string, DatabaseRowType>(
`
SELECT from_block, to_block, k, v
FROM kvs
WHERE k = ?
`,
[key],
);
)
.all(key);

// Check that the old is correctly marked with to_block
expect(rows[0].to_block).toBe(Number(5_000_020n));
Expand All @@ -87,14 +94,15 @@ describe("KVStore", () => {
});

it("should revert the changes to last commit", async () => {
const rows = await db.all(
`
const rows = db
.prepare(
`
SELECT from_block, to_block, k, v
FROM kvs
WHERE k = ?
`,
[key],
);
)
.all([key]);

expect(rows).toMatchInlineSnapshot(`
[
Expand Down
Loading

0 comments on commit 316f5b7

Please sign in to comment.