From a36dc0f799f9b671891e99b731519a82aeac6383 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 29 Nov 2024 18:32:47 +0530 Subject: [PATCH 1/4] indexer: add invalidateOnRestart in sinks --- packages/indexer/src/indexer.ts | 3 +++ packages/indexer/src/internal/testing.ts | 5 +++++ packages/indexer/src/sink.ts | 7 +++++++ packages/indexer/src/sinks/csv.ts | 5 +++++ packages/indexer/src/sinks/drizzle/drizzle.ts | 4 ++++ packages/indexer/src/sinks/sqlite.ts | 5 +++++ 6 files changed, 29 insertions(+) diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index d15dc3f..3026a6b 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -178,6 +178,9 @@ export async function run( await indexer.hooks.callHook("connect:before", { request, options }); + // avoid having duplicate data if it was inserted before the persistence commited the state + await sink.invalidateOnRestart(request.startingCursor); + // store main filter, so later it can be merged let mainFilter: TFilter; if (isFactoryMode) { diff --git a/packages/indexer/src/internal/testing.ts b/packages/indexer/src/internal/testing.ts index 4499336..38f8298 100644 --- a/packages/indexer/src/internal/testing.ts +++ b/packages/indexer/src/internal/testing.ts @@ -91,6 +91,11 @@ export class MockSink extends Sink { this.write({ data: context.buffer, endCursor }); } + async invalidateOnRestart(cursor?: Cursor) { + // TODO: Implement + throw new Error("Not implemented"); + } + async invalidate(cursor?: Cursor) { // TODO: Implement throw new Error("Not implemented"); diff --git a/packages/indexer/src/sink.ts b/packages/indexer/src/sink.ts index ca094b6..3edfc7d 100644 --- a/packages/indexer/src/sink.ts +++ b/packages/indexer/src/sink.ts @@ -16,6 +16,7 @@ export abstract class Sink { cb: (params: TTxnParams) => Promise, ): Promise; + abstract invalidateOnRestart(cursor?: Cursor): Promise; abstract invalidate(cursor?: Cursor): Promise; abstract finalize(cursor?: Cursor): Promise; } @@ -28,6 +29,12 @@ export class DefaultSink extends Sink { await cb({}); } + async invalidateOnRestart(cursor?: Cursor) { + consola.info( + `Invalidating all rows with cursor > ${cursor?.orderKey} on restart`, + ); + } + async invalidate(cursor?: Cursor) { consola.info(`Invalidating cursor ${cursor?.orderKey}`); } diff --git a/packages/indexer/src/sinks/csv.ts b/packages/indexer/src/sinks/csv.ts index f31893f..7016948 100644 --- a/packages/indexer/src/sinks/csv.ts +++ b/packages/indexer/src/sinks/csv.ts @@ -99,6 +99,11 @@ export class CsvSink extends Sink { await this.write({ data: context.buffer, endCursor }); } + async invalidateOnRestart(cursor?: Cursor) { + // TODO: Implement + throw new Error("Not implemented"); + } + async invalidate(cursor?: Cursor) { // TODO: Implement throw new Error("Not implemented"); diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts index 4e160e8..4fcdd0d 100644 --- a/packages/indexer/src/sinks/drizzle/drizzle.ts +++ b/packages/indexer/src/sinks/drizzle/drizzle.ts @@ -83,6 +83,10 @@ export class DrizzleSink< }); } + async invalidateOnRestart(cursor?: Cursor) { + await this.invalidate(cursor); + } + async invalidate(cursor?: Cursor) { await this._db.transaction(async (db) => { for (const table of this._tables) { diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index f735e9c..b213eeb 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -99,6 +99,11 @@ export class SqliteSink extends Sink { await this.write({ data: context.buffer, endCursor }); } + async invalidateOnRestart(cursor?: Cursor) { + // TODO: Implement + throw new Error("Not implemented"); + } + async invalidate(cursor?: Cursor) { // TODO: Implement throw new Error("Not implemented"); From bec3f50a29e2a1a43d5a33da124f4cf981553010 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 29 Nov 2024 19:03:05 +0530 Subject: [PATCH 2/4] indexer: add invalidation in sqlite sink --- packages/indexer/src/sinks/sqlite.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index b213eeb..0b625dc 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -100,13 +100,14 @@ export class SqliteSink extends Sink { } async invalidateOnRestart(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + await this.invalidate(cursor); } async invalidate(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + const cursorValue = Number(cursor?.orderKey); + + const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`; + this._db.prepare(sql).run(cursorValue); } async finalize(cursor?: Cursor) { From 6624508c09c0aeabb2056c33c0f08f4c6f4f7c7f Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 29 Nov 2024 19:24:39 +0530 Subject: [PATCH 3/4] indexer: add finalize method in drizzle sink --- packages/indexer/src/sinks/csv.ts | 9 +++------ packages/indexer/src/sinks/drizzle/drizzle.ts | 13 +++++++++---- packages/indexer/src/sinks/sqlite.ts | 3 +-- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/indexer/src/sinks/csv.ts b/packages/indexer/src/sinks/csv.ts index 7016948..df64a7d 100644 --- a/packages/indexer/src/sinks/csv.ts +++ b/packages/indexer/src/sinks/csv.ts @@ -100,18 +100,15 @@ export class CsvSink extends Sink { } async invalidateOnRestart(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } async invalidate(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + throw new Error("Reorg for CSV Sink is not implemented"); } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } private async insertToCSV(data: SinkData[]) { diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts index 4fcdd0d..969a306 100644 --- a/packages/indexer/src/sinks/drizzle/drizzle.ts +++ b/packages/indexer/src/sinks/drizzle/drizzle.ts @@ -93,8 +93,7 @@ export class DrizzleSink< // delete all rows whose lowerbound of "_cursor" (int8range) column is greater than the invalidate cursor await db .delete(table) - .where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`)) - .returning(); + .where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`)); // and for rows whose upperbound of "_cursor" (int8range) column is greater than the invalidate cursor, set the upperbound to infinity await db .update(table) @@ -107,8 +106,14 @@ export class DrizzleSink< } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + await this._db.transaction(async (db) => { + for (const table of this._tables) { + // delete all rows where the upper bound of "_cursor" is less than the finalize cursor + await db + .delete(table) + .where(sql`upper(_cursor) < ${Number(cursor?.orderKey)}`); + } + }); } } diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index 0b625dc..92e97a6 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -111,8 +111,7 @@ export class SqliteSink extends Sink { } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } private async insertJsonArray(data: SinkData[]) { From 7d5888eb00c1ffd9438431defeb039931893eec0 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 29 Nov 2024 19:41:38 +0530 Subject: [PATCH 4/4] indexer: fix edge cases in reorg methods in sinks --- ...ibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json | 7 +++++++ packages/indexer/src/internal/testing.ts | 6 ++---- packages/indexer/src/sinks/drizzle/drizzle.ts | 4 ++++ packages/indexer/src/sinks/sqlite.ts | 4 +++- 4 files changed, 16 insertions(+), 5 deletions(-) create mode 100644 change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json diff --git a/change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json b/change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json new file mode 100644 index 0000000..71b4eba --- /dev/null +++ b/change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "add reorg aware implementations in sinks", + "packageName": "@apibara/indexer", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/packages/indexer/src/internal/testing.ts b/packages/indexer/src/internal/testing.ts index 38f8298..cab31fa 100644 --- a/packages/indexer/src/internal/testing.ts +++ b/packages/indexer/src/internal/testing.ts @@ -92,8 +92,7 @@ export class MockSink extends Sink { } async invalidateOnRestart(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } async invalidate(cursor?: Cursor) { @@ -102,8 +101,7 @@ export class MockSink extends Sink { } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } } diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts index 969a306..03e1597 100644 --- a/packages/indexer/src/sinks/drizzle/drizzle.ts +++ b/packages/indexer/src/sinks/drizzle/drizzle.ts @@ -88,6 +88,8 @@ export class DrizzleSink< } async invalidate(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + await this._db.transaction(async (db) => { for (const table of this._tables) { // delete all rows whose lowerbound of "_cursor" (int8range) column is greater than the invalidate cursor @@ -106,6 +108,8 @@ export class DrizzleSink< } async finalize(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + await this._db.transaction(async (db) => { for (const table of this._tables) { // delete all rows where the upper bound of "_cursor" is less than the finalize cursor diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index 92e97a6..4cd4830 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -104,7 +104,9 @@ export class SqliteSink extends Sink { } async invalidate(cursor?: Cursor) { - const cursorValue = Number(cursor?.orderKey); + if (cursor?.orderKey === undefined) return; + + const cursorValue = Number(cursor.orderKey); const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`; this._db.prepare(sql).run(cursorValue);