Skip to content

Commit

Permalink
feat: add invalidation and finalize method implementations for sinks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek authored Nov 29, 2024
2 parents e2ee1be + 7d5888e commit 2a7b470
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "add reorg aware implementations in sinks",
"packageName": "@apibara/indexer",
"email": "[email protected]",
"dependentChangeType": "patch"
}
3 changes: 3 additions & 0 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ export async function run<TFilter, TBlock, TTxnParams>(

await indexer.hooks.callHook("connect:before", { request, options });

// avoid having duplicate data if it was inserted before the persistence commited the state
await sink.invalidateOnRestart(request.startingCursor);

// store main filter, so later it can be merged
let mainFilter: TFilter;
if (isFactoryMode) {
Expand Down
7 changes: 5 additions & 2 deletions packages/indexer/src/internal/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ export class MockSink extends Sink {
this.write({ data: context.buffer, endCursor });
}

async invalidateOnRestart(cursor?: Cursor) {
// No Implementation required
}

async invalidate(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
// No Implementation required
}
}

Expand Down
7 changes: 7 additions & 0 deletions packages/indexer/src/sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export abstract class Sink<TTxnParams = unknown> {
cb: (params: TTxnParams) => Promise<void>,
): Promise<void>;

abstract invalidateOnRestart(cursor?: Cursor): Promise<void>;
abstract invalidate(cursor?: Cursor): Promise<void>;
abstract finalize(cursor?: Cursor): Promise<void>;
}
Expand All @@ -28,6 +29,12 @@ export class DefaultSink extends Sink<unknown> {
await cb({});
}

async invalidateOnRestart(cursor?: Cursor) {
consola.info(
`Invalidating all rows with cursor > ${cursor?.orderKey} on restart`,
);
}

async invalidate(cursor?: Cursor) {
consola.info(`Invalidating cursor ${cursor?.orderKey}`);
}
Expand Down
10 changes: 6 additions & 4 deletions packages/indexer/src/sinks/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,16 @@ export class CsvSink extends Sink {
await this.write({ data: context.buffer, endCursor });
}

async invalidateOnRestart(cursor?: Cursor) {
// No Implementation required
}

async invalidate(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
throw new Error("Reorg for CSV Sink is not implemented");
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
// No Implementation required
}

private async insertToCSV(data: SinkData[]) {
Expand Down
21 changes: 17 additions & 4 deletions packages/indexer/src/sinks/drizzle/drizzle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,19 @@ export class DrizzleSink<
});
}

async invalidateOnRestart(cursor?: Cursor) {
await this.invalidate(cursor);
}

async invalidate(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;

await this._db.transaction(async (db) => {
for (const table of this._tables) {
// delete all rows whose lowerbound of "_cursor" (int8range) column is greater than the invalidate cursor
await db
.delete(table)
.where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`))
.returning();
.where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`));
// and for rows whose upperbound of "_cursor" (int8range) column is greater than the invalidate cursor, set the upperbound to infinity
await db
.update(table)
Expand All @@ -103,8 +108,16 @@ export class DrizzleSink<
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
if (cursor?.orderKey === undefined) return;

await this._db.transaction(async (db) => {
for (const table of this._tables) {
// delete all rows where the upper bound of "_cursor" is less than the finalize cursor
await db
.delete(table)
.where(sql`upper(_cursor) < ${Number(cursor?.orderKey)}`);
}
});
}
}

Expand Down
15 changes: 11 additions & 4 deletions packages/indexer/src/sinks/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,21 @@ export class SqliteSink extends Sink {
await this.write({ data: context.buffer, endCursor });
}

async invalidateOnRestart(cursor?: Cursor) {
await this.invalidate(cursor);
}

async invalidate(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
if (cursor?.orderKey === undefined) return;

const cursorValue = Number(cursor.orderKey);

const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`;
this._db.prepare(sql).run(cursorValue);
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
// No Implementation required
}

private async insertJsonArray(data: SinkData[]) {
Expand Down

0 comments on commit 2a7b470

Please sign in to comment.