Skip to content

Commit

Permalink
indexer: add factory mode tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Jul 6, 2024
1 parent dc6ec2e commit b4ce1ca
Show file tree
Hide file tree
Showing 2 changed files with 358 additions and 4 deletions.
2 changes: 1 addition & 1 deletion examples/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export function createIndexerConfig(streamUrl: string) {
},
],
},
transform({ block: { header, logs, transactions } }) {
async transform({ block: { header, logs, transactions } }) {
const ctx = useIndexerContext();
ctx.counter += 1;

Expand Down
360 changes: 357 additions & 3 deletions packages/indexer/src/indexer.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
import { MockClient } from "@apibara/protocol/testing";
import { describe, expect, it } from "vitest";
import fs from "node:fs/promises";
import {
type MockBlock,
MockClient,
type MockFilter,
} from "@apibara/protocol/testing";
import { klona } from "klona/full";
import { open } from "sqlite";
import sqlite3 from "sqlite3";
import { afterEach, describe, expect, it } from "vitest";
import { run } from "./indexer";
import { SqlitePersistence, sqlitePersistence } from "./plugins/persistence";
import { generateMockMessages, vcr } from "./testing";
import { type MockRet, getMockIndexer } from "./testing/indexer";

describe("Run Test", () => {
const client = new MockClient(generateMockMessages(), [{}]);
afterEach(async () => {
try {
await fs.unlink("file:memdb_indexer?mode=memory&cache=shared");
} catch {}
});

it("should stream messages", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
return generateMockMessages();
});

const sink = vcr<MockRet>();
await run(client, getMockIndexer(), sink);

Expand Down Expand Up @@ -116,4 +133,341 @@ describe("Run Test", () => {
]
`);
});

it("factory mode: indexer should merge filters and restart when needed", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
const [, mainFilter] = request.filter;

if (Object.keys(mainFilter).length === 0) {
expect(request.startingCursor?.orderKey).toEqual(100n);

return [
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 100n },
endCursor: { orderKey: 101n },
data: [null, null],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 101n },
endCursor: { orderKey: 102n },
data: [null, null],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 102n },
endCursor: { orderKey: 103n },
data: [{ data: "B" }, null],
},
},
];
}

if (mainFilter.filter === "B") {
expect(request.startingCursor?.orderKey).toEqual(102n);

return [
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 102n },
endCursor: { orderKey: 103n },
data: [{ data: "B" }, { data: "103B" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 103n },
endCursor: { orderKey: 104n },
data: [null, { data: "104B" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 104n },
endCursor: { orderKey: 105n },
data: [null, { data: "105B" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 105n },
endCursor: { orderKey: 106n },
data: [{ data: "C" }, { data: "106B" }],
},
},
];
}

if (mainFilter.filter === "BC") {
expect(request.startingCursor?.orderKey).toEqual(105n);

return [
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 105n },
endCursor: { orderKey: 106n },
data: [{ data: "C" }, { data: "106BC" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 106n },
endCursor: { orderKey: 107n },
data: [null, { data: "107BC" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 107n },
endCursor: { orderKey: 108n },
data: [null, { data: "108BC" }],
},
},
];
}

return [];
});

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
});

// create mock indexer with persistence plugin
const indexer = klona(getMockIndexer([persistence]));
indexer.options.startingCursor = { orderKey: 100n };
indexer.options.factory = async (block) => {
if (block.data === "B") {
return { filter: { filter: "B" } };
}

if (block.data === "C") {
return { filter: { filter: "C" } };
}

return {};
};

const sink = vcr<MockRet>();

await run(client, indexer, sink);

// open same db again to check last cursor
const db = await open({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
});

const store = new SqlitePersistence<MockFilter>(db);

const latest = await store.get();

expect(latest.cursor?.orderKey).toEqual(108n);
expect(latest.filter?.filter).toEqual("BC");

expect(sink.result).toMatchInlineSnapshot(`
[
{
"data": [
{
"data": "103B",
},
],
"endCursor": {
"orderKey": 103n,
},
},
{
"data": [
{
"data": "104B",
},
],
"endCursor": {
"orderKey": 104n,
},
},
{
"data": [
{
"data": "105B",
},
],
"endCursor": {
"orderKey": 105n,
},
},
{
"data": [
{
"data": "106BC",
},
],
"endCursor": {
"orderKey": 106n,
},
},
{
"data": [
{
"data": "107BC",
},
],
"endCursor": {
"orderKey": 107n,
},
},
{
"data": [
{
"data": "108BC",
},
],
"endCursor": {
"orderKey": 108n,
},
},
]
`);
});

it("factory mode: last cursor should persist when error is thrown in indexer", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
const [, mainFilter] = request.filter;

if (Object.keys(mainFilter).length === 0) {
expect(request.startingCursor?.orderKey).toEqual(100n);

return [
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 100n },
endCursor: { orderKey: 101n },
data: [null, null],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 101n },
endCursor: { orderKey: 102n },
data: [null, null],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 102n },
endCursor: { orderKey: 103n },
data: [{ data: "B" }, null],
},
},
];
}

if (mainFilter.filter === "B") {
expect(request.startingCursor?.orderKey).toEqual(102n);

return [
Error("Some error occurred!"),
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 103n },
endCursor: { orderKey: 104n },
data: [null, { data: "104B" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 104n },
endCursor: { orderKey: 105n },
data: [null, { data: "105B" }],
},
},
{
_tag: "data",
data: {
finality: "accepted",
cursor: { orderKey: 105n },
endCursor: { orderKey: 106n },
data: [{ data: "C" }, { data: "106B" }],
},
},
];
}

return [];
});

const persistence = sqlitePersistence<MockFilter, MockBlock, MockRet>({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
});

// create mock indexer with persistence plugin
const indexer = klona(getMockIndexer([persistence]));
indexer.options.startingCursor = { orderKey: 100n };
indexer.options.factory = async (block) => {
if (block.data === "B") {
return { filter: { filter: "B" } };
}

if (block.data === "C") {
return { filter: { filter: "C" } };
}

return {};
};

const sink = vcr<MockRet>();

await expect(() => run(client, indexer, sink)).rejects.toThrowError();

// open same db again to check last cursor
const db = await open({
driver: sqlite3.Database,
filename: "file:memdb_indexer?mode=memory&cache=shared",
});

const store = new SqlitePersistence<MockFilter>(db);

const latest = await store.get();

expect(latest.cursor?.orderKey).toEqual(103n);
expect(latest.filter?.filter).toEqual("B");

expect(sink.result).toMatchInlineSnapshot("[]");
});
});

0 comments on commit b4ce1ca

Please sign in to comment.