Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add invalidation and finalize method implementations for sinks #118

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "add reorg aware implementations in sinks",
"packageName": "@apibara/indexer",
"email": "[email protected]",
"dependentChangeType": "patch"
}
3 changes: 3 additions & 0 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ export async function run<TFilter, TBlock, TTxnParams>(

await indexer.hooks.callHook("connect:before", { request, options });

// avoid having duplicate data if it was inserted before the persistence commited the state
await sink.invalidateOnRestart(request.startingCursor);

// store main filter, so later it can be merged
let mainFilter: TFilter;
if (isFactoryMode) {
Expand Down
7 changes: 5 additions & 2 deletions packages/indexer/src/internal/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ export class MockSink extends Sink {
this.write({ data: context.buffer, endCursor });
}

async invalidateOnRestart(cursor?: Cursor) {
// No Implementation required
}

async invalidate(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
// No Implementation required
}
}

Expand Down
7 changes: 7 additions & 0 deletions packages/indexer/src/sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export abstract class Sink<TTxnParams = unknown> {
cb: (params: TTxnParams) => Promise<void>,
): Promise<void>;

abstract invalidateOnRestart(cursor?: Cursor): Promise<void>;
abstract invalidate(cursor?: Cursor): Promise<void>;
abstract finalize(cursor?: Cursor): Promise<void>;
}
Expand All @@ -28,6 +29,12 @@ export class DefaultSink extends Sink<unknown> {
await cb({});
}

async invalidateOnRestart(cursor?: Cursor) {
consola.info(
`Invalidating all rows with cursor > ${cursor?.orderKey} on restart`,
);
}

async invalidate(cursor?: Cursor) {
consola.info(`Invalidating cursor ${cursor?.orderKey}`);
}
Expand Down
10 changes: 6 additions & 4 deletions packages/indexer/src/sinks/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,16 @@ export class CsvSink extends Sink {
await this.write({ data: context.buffer, endCursor });
}

async invalidateOnRestart(cursor?: Cursor) {
// No Implementation required
}
Comment on lines +102 to +104
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Empty implementation of required abstract method needs attention

The invalidateOnRestart is an abstract method in the base Sink class that must be properly implemented by all subclasses. Looking at other sink implementations (SQLite, Drizzle), they handle restart invalidation by delegating to their invalidate method. The current CSV sink's empty implementation with "No Implementation required" comment is incorrect since:

  1. It's a required abstract method from the base class
  2. The CSV sink already throws "not implemented" for invalidate, suggesting reorg/invalidation isn't supported
  3. Other sinks properly implement this for data consistency

Either:

  • Implement proper invalidation support in the CSV sink
  • Or throw the same "not implemented" error as the invalidate method to be consistent
🔗 Analysis chain

Implementation needed for data integrity during restarts

The comment suggests no implementation is required, but this seems inconsistent with:

  1. The PR's objective to add invalidation
  2. Other sinks implementing this method
  3. The method being called from the indexer's run function with a starting cursor

Consider implementing proper invalidation logic to ensure data consistency when the indexer restarts.

Let's verify how other sinks implement this method:


Let me verify the base Sink class implementation to understand if this is an optional method:


Let me try a different approach to find the base class:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for invalidateOnRestart implementations in other sinks
rg -A 5 "invalidateOnRestart.*\{" packages/indexer/src/sinks/

Length of output: 1303


Script:

#!/bin/bash
# Search for the Sink base class definition
ast-grep --pattern 'abstract class Sink {
  $$$
}'

Length of output: 52


Script:

#!/bin/bash
# Search for Sink class definition using ripgrep
rg -A 10 "^(export )?(abstract )?class Sink" packages/indexer/src/

Length of output: 760


async invalidate(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
throw new Error("Reorg for CSV Sink is not implemented");
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
// No Implementation required
}
Comment on lines 110 to 112
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement proper finalization for resource cleanup

The finalize method should ensure:

  1. All buffered data is written to the CSV file
  2. The CSV stringifier and file streams are properly closed

Here's a suggested implementation:

-  async finalize(cursor?: Cursor) {
-    // No Implementation required
-  }
+  async finalize(cursor?: Cursor) {
+    return new Promise<void>((resolve, reject) => {
+      this._stringifier.end(() => {
+        resolve();
+      });
+    });
+  }

Committable suggestion skipped: line range outside the PR's diff.


private async insertToCSV(data: SinkData[]) {
Expand Down
21 changes: 17 additions & 4 deletions packages/indexer/src/sinks/drizzle/drizzle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,19 @@ export class DrizzleSink<
});
}

async invalidateOnRestart(cursor?: Cursor) {
await this.invalidate(cursor);
}

async invalidate(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;

await this._db.transaction(async (db) => {
for (const table of this._tables) {
// delete all rows whose lowerbound of "_cursor" (int8range) column is greater than the invalidate cursor
await db
.delete(table)
.where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`))
.returning();
.where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`));
// and for rows whose upperbound of "_cursor" (int8range) column is greater than the invalidate cursor, set the upperbound to infinity
await db
.update(table)
Expand All @@ -103,8 +108,16 @@ export class DrizzleSink<
}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
if (cursor?.orderKey === undefined) return;

await this._db.transaction(async (db) => {
for (const table of this._tables) {
// delete all rows where the upper bound of "_cursor" is less than the finalize cursor
await db
.delete(table)
.where(sql`upper(_cursor) < ${Number(cursor?.orderKey)}`);
}
});
}
}

Expand Down
15 changes: 11 additions & 4 deletions packages/indexer/src/sinks/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,21 @@ export class SqliteSink extends Sink {
await this.write({ data: context.buffer, endCursor });
}

async invalidateOnRestart(cursor?: Cursor) {
await this.invalidate(cursor);
}

async invalidate(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
if (cursor?.orderKey === undefined) return;

const cursorValue = Number(cursor.orderKey);

const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`;
this._db.prepare(sql).run(cursorValue);
Comment on lines +107 to +112
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Several improvements needed for robustness

While the basic logic is sound, there are several areas that could be improved for better safety and reliability:

  1. SQL Injection risk: Table and column names are directly interpolated into the query
  2. Missing error handling for SQL execution
  3. Potential type conversion issues with Number(cursor.orderKey)
  4. Edge case handling when cursorColumn is undefined needs verification

Consider applying these improvements:

 async invalidate(cursor?: Cursor) {
   if (cursor?.orderKey === undefined) return;

-  const cursorValue = Number(cursor.orderKey);
+  // Validate orderKey is numeric
+  const cursorValue = Number(cursor.orderKey);
+  if (isNaN(cursorValue)) {
+    throw new Error('Invalid cursor orderKey: must be numeric');
+  }
+
+  try {
+    // Use parameterized query for table/column names
+    const columnName = this._config.cursorColumn ?? "_cursor";
+    // Validate table/column names to prevent SQL injection
+    if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this._config.tableName) || 
+        !/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(columnName)) {
+      throw new Error('Invalid table or column name');
+    }
+
+    const sql = `DELETE FROM ${this._config.tableName} WHERE ${columnName} > ?`;
+    this._db.prepare(sql).run(cursorValue);
+  } catch (error) {
+    throw new Error(`Failed to invalidate data: ${error.message}`);
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (cursor?.orderKey === undefined) return;
const cursorValue = Number(cursor.orderKey);
const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`;
this._db.prepare(sql).run(cursorValue);
if (cursor?.orderKey === undefined) return;
// Validate orderKey is numeric
const cursorValue = Number(cursor.orderKey);
if (isNaN(cursorValue)) {
throw new Error('Invalid cursor orderKey: must be numeric');
}
try {
// Use parameterized query for table/column names
const columnName = this._config.cursorColumn ?? "_cursor";
// Validate table/column names to prevent SQL injection
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this._config.tableName) ||
!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(columnName)) {
throw new Error('Invalid table or column name');
}
const sql = `DELETE FROM ${this._config.tableName} WHERE ${columnName} > ?`;
this._db.prepare(sql).run(cursorValue);
} catch (error) {
throw new Error(`Failed to invalidate data: ${error.message}`);
}

}

async finalize(cursor?: Cursor) {
// TODO: Implement
throw new Error("Not implemented");
// No Implementation required
}

private async insertJsonArray(data: SinkData[]) {
Expand Down