From 1ea8c427739d17b162e448347ef9c6593cad9ba8 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Wed, 10 Jul 2024 23:28:14 +0530 Subject: [PATCH] indexer: refactor indexer and test --- packages/indexer/src/indexer.test.ts | 23 +++++++++++++++++------ packages/indexer/src/indexer.ts | 9 ++------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/packages/indexer/src/indexer.test.ts b/packages/indexer/src/indexer.test.ts index 82e0982..e532cd1 100644 --- a/packages/indexer/src/indexer.test.ts +++ b/packages/indexer/src/indexer.test.ts @@ -7,17 +7,25 @@ import { import { klona } from "klona/full"; import { open } from "sqlite"; import sqlite3 from "sqlite3"; -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { run } from "./indexer"; import { SqlitePersistence, sqlitePersistence } from "./plugins/persistence"; import { generateMockMessages, vcr } from "./testing"; import { type MockRet, getMockIndexer } from "./testing/indexer"; describe("Run Test", () => { - afterEach(async () => { + async function cleanup() { try { await fs.unlink("file:memdb_indexer?mode=memory&cache=shared"); } catch {} + } + + beforeEach(async () => { + await cleanup(); + }); + + afterEach(async () => { + await cleanup(); }); it("should stream messages", async () => { @@ -136,7 +144,7 @@ describe("Run Test", () => { it("factory mode: indexer should merge filters and restart when needed", async () => { const client = new MockClient((request, options) => { - const [, mainFilter] = request.filter; + const [_factoryFilter, mainFilter] = request.filter; if (Object.keys(mainFilter).length === 0) { expect(request.startingCursor?.orderKey).toEqual(100n); @@ -357,7 +365,7 @@ describe("Run Test", () => { it("factory mode: last cursor should persist when error is thrown in indexer", async () => { const client = new MockClient((request, options) => { - const [, mainFilter] = request.filter; + const [_factoryFilter, mainFilter] = request.filter; if (Object.keys(mainFilter).length === 0) { expect(request.startingCursor?.orderKey).toEqual(100n); @@ -390,6 +398,7 @@ describe("Run Test", () => { data: [{ data: "B" }, null], }, }, + Error("this error should not occurr!"), ]; } @@ -397,7 +406,7 @@ describe("Run Test", () => { expect(request.startingCursor?.orderKey).toEqual(102n); return [ - Error("Some error occurred!"), + Error("this error should occurr!"), { _tag: "data", data: { @@ -453,7 +462,9 @@ describe("Run Test", () => { const sink = vcr(); - await expect(() => run(client, indexer, sink)).rejects.toThrowError(); + await expect(() => run(client, indexer, sink)).rejects.toThrowError( + "this error should occurr!", + ); // open same db again to check last cursor const db = await open({ diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index 8181447..51d7770 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -173,8 +173,6 @@ export async function run( }; while (true) { - let restartStream = false; - for await (const message of stream) { await indexer.hooks.callHook("message", { message }); @@ -233,9 +231,6 @@ export async function run( data, }; - // restart stream - restartStream = true; - return; } } @@ -309,13 +304,13 @@ export async function run( // if stream needs a restart // break out of the current stream iterator - if (restartStream) { + if (state._tag !== "normal") { break; } } // when restarting stream we continue while loop again - if (restartStream) { + if (state._tag !== "normal") { continue; }