diff --git a/change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json b/change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json new file mode 100644 index 0000000..71b4eba --- /dev/null +++ b/change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "add reorg aware implementations in sinks", + "packageName": "@apibara/indexer", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index d15dc3f..3026a6b 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -178,6 +178,9 @@ export async function run( await indexer.hooks.callHook("connect:before", { request, options }); + // avoid having duplicate data if it was inserted before the persistence commited the state + await sink.invalidateOnRestart(request.startingCursor); + // store main filter, so later it can be merged let mainFilter: TFilter; if (isFactoryMode) { diff --git a/packages/indexer/src/internal/testing.ts b/packages/indexer/src/internal/testing.ts index 4499336..cab31fa 100644 --- a/packages/indexer/src/internal/testing.ts +++ b/packages/indexer/src/internal/testing.ts @@ -91,14 +91,17 @@ export class MockSink extends Sink { this.write({ data: context.buffer, endCursor }); } + async invalidateOnRestart(cursor?: Cursor) { + // No Implementation required + } + async invalidate(cursor?: Cursor) { // TODO: Implement throw new Error("Not implemented"); } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } } diff --git a/packages/indexer/src/sink.ts b/packages/indexer/src/sink.ts index ca094b6..3edfc7d 100644 --- a/packages/indexer/src/sink.ts +++ b/packages/indexer/src/sink.ts @@ -16,6 +16,7 @@ export abstract class Sink { cb: (params: TTxnParams) => Promise, ): Promise; + abstract invalidateOnRestart(cursor?: Cursor): Promise; abstract invalidate(cursor?: Cursor): Promise; abstract finalize(cursor?: Cursor): Promise; } @@ -28,6 +29,12 @@ export class DefaultSink extends Sink { await cb({}); } + async invalidateOnRestart(cursor?: Cursor) { + consola.info( + `Invalidating all rows with cursor > ${cursor?.orderKey} on restart`, + ); + } + async invalidate(cursor?: Cursor) { consola.info(`Invalidating cursor ${cursor?.orderKey}`); } diff --git a/packages/indexer/src/sinks/csv.ts b/packages/indexer/src/sinks/csv.ts index f31893f..df64a7d 100644 --- a/packages/indexer/src/sinks/csv.ts +++ b/packages/indexer/src/sinks/csv.ts @@ -99,14 +99,16 @@ export class CsvSink extends Sink { await this.write({ data: context.buffer, endCursor }); } + async invalidateOnRestart(cursor?: Cursor) { + // No Implementation required + } + async invalidate(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + throw new Error("Reorg for CSV Sink is not implemented"); } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } private async insertToCSV(data: SinkData[]) { diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts index 4e160e8..03e1597 100644 --- a/packages/indexer/src/sinks/drizzle/drizzle.ts +++ b/packages/indexer/src/sinks/drizzle/drizzle.ts @@ -83,14 +83,19 @@ export class DrizzleSink< }); } + async invalidateOnRestart(cursor?: Cursor) { + await this.invalidate(cursor); + } + async invalidate(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + await this._db.transaction(async (db) => { for (const table of this._tables) { // delete all rows whose lowerbound of "_cursor" (int8range) column is greater than the invalidate cursor await db .delete(table) - .where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`)) - .returning(); + .where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`)); // and for rows whose upperbound of "_cursor" (int8range) column is greater than the invalidate cursor, set the upperbound to infinity await db .update(table) @@ -103,8 +108,16 @@ export class DrizzleSink< } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + if (cursor?.orderKey === undefined) return; + + await this._db.transaction(async (db) => { + for (const table of this._tables) { + // delete all rows where the upper bound of "_cursor" is less than the finalize cursor + await db + .delete(table) + .where(sql`upper(_cursor) < ${Number(cursor?.orderKey)}`); + } + }); } } diff --git a/packages/indexer/src/sinks/sqlite.ts b/packages/indexer/src/sinks/sqlite.ts index f735e9c..4cd4830 100644 --- a/packages/indexer/src/sinks/sqlite.ts +++ b/packages/indexer/src/sinks/sqlite.ts @@ -99,14 +99,21 @@ export class SqliteSink extends Sink { await this.write({ data: context.buffer, endCursor }); } + async invalidateOnRestart(cursor?: Cursor) { + await this.invalidate(cursor); + } + async invalidate(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + if (cursor?.orderKey === undefined) return; + + const cursorValue = Number(cursor.orderKey); + + const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`; + this._db.prepare(sql).run(cursorValue); } async finalize(cursor?: Cursor) { - // TODO: Implement - throw new Error("Not implemented"); + // No Implementation required } private async insertJsonArray(data: SinkData[]) {