Skip to content

Commit

Permalink
indexer: refactor indexer and test
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Jul 10, 2024
1 parent 5a156eb commit 1ea8c42
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
23 changes: 17 additions & 6 deletions packages/indexer/src/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ import {
import { klona } from "klona/full";
import { open } from "sqlite";
import sqlite3 from "sqlite3";
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { run } from "./indexer";
import { SqlitePersistence, sqlitePersistence } from "./plugins/persistence";
import { generateMockMessages, vcr } from "./testing";
import { type MockRet, getMockIndexer } from "./testing/indexer";

describe("Run Test", () => {
afterEach(async () => {
async function cleanup() {
try {
await fs.unlink("file:memdb_indexer?mode=memory&cache=shared");
} catch {}
}

beforeEach(async () => {
await cleanup();
});

afterEach(async () => {
await cleanup();
});

it("should stream messages", async () => {
Expand Down Expand Up @@ -136,7 +144,7 @@ describe("Run Test", () => {

it("factory mode: indexer should merge filters and restart when needed", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
const [, mainFilter] = request.filter;
const [_factoryFilter, mainFilter] = request.filter;

if (Object.keys(mainFilter).length === 0) {
expect(request.startingCursor?.orderKey).toEqual(100n);
Expand Down Expand Up @@ -357,7 +365,7 @@ describe("Run Test", () => {

it("factory mode: last cursor should persist when error is thrown in indexer", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
const [, mainFilter] = request.filter;
const [_factoryFilter, mainFilter] = request.filter;

if (Object.keys(mainFilter).length === 0) {
expect(request.startingCursor?.orderKey).toEqual(100n);
Expand Down Expand Up @@ -390,14 +398,15 @@ describe("Run Test", () => {
data: [{ data: "B" }, null],
},
},
Error("this error should not occurr!"),
];
}

if (mainFilter.filter === "B") {
expect(request.startingCursor?.orderKey).toEqual(102n);

return [
Error("Some error occurred!"),
Error("this error should occurr!"),
{
_tag: "data",
data: {
Expand Down Expand Up @@ -453,7 +462,9 @@ describe("Run Test", () => {

const sink = vcr<MockRet>();

await expect(() => run(client, indexer, sink)).rejects.toThrowError();
await expect(() => run(client, indexer, sink)).rejects.toThrowError(
"this error should occurr!",
);

// open same db again to check last cursor
const db = await open({
Expand Down
9 changes: 2 additions & 7 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ export async function run<TFilter, TBlock, TRet>(
};

while (true) {
let restartStream = false;

for await (const message of stream) {
await indexer.hooks.callHook("message", { message });

Expand Down Expand Up @@ -233,9 +231,6 @@ export async function run<TFilter, TBlock, TRet>(
data,
};

// restart stream
restartStream = true;

return;
}
}
Expand Down Expand Up @@ -309,13 +304,13 @@ export async function run<TFilter, TBlock, TRet>(

// if stream needs a restart
// break out of the current stream iterator
if (restartStream) {
if (state._tag !== "normal") {
break;
}
}

// when restarting stream we continue while loop again
if (restartStream) {
if (state._tag !== "normal") {
continue;
}

Expand Down

0 comments on commit 1ea8c42

Please sign in to comment.