-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add invalidation and finalize method implementations for sinks #118
Changes from all commits
a36dc0f
bec3f50
6624508
7d5888e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"type": "prerelease", | ||
"comment": "add reorg aware implementations in sinks", | ||
"packageName": "@apibara/indexer", | ||
"email": "[email protected]", | ||
"dependentChangeType": "patch" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,14 +99,16 @@ export class CsvSink extends Sink { | |
await this.write({ data: context.buffer, endCursor }); | ||
} | ||
|
||
async invalidateOnRestart(cursor?: Cursor) { | ||
// No Implementation required | ||
} | ||
|
||
async invalidate(cursor?: Cursor) { | ||
// TODO: Implement | ||
throw new Error("Not implemented"); | ||
throw new Error("Reorg for CSV Sink is not implemented"); | ||
} | ||
|
||
async finalize(cursor?: Cursor) { | ||
// TODO: Implement | ||
throw new Error("Not implemented"); | ||
// No Implementation required | ||
} | ||
Comment on lines
110
to
112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implement proper finalization for resource cleanup The
Here's a suggested implementation: - async finalize(cursor?: Cursor) {
- // No Implementation required
- }
+ async finalize(cursor?: Cursor) {
+ return new Promise<void>((resolve, reject) => {
+ this._stringifier.end(() => {
+ resolve();
+ });
+ });
+ }
|
||
|
||
private async insertToCSV(data: SinkData[]) { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -99,14 +99,21 @@ export class SqliteSink extends Sink { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await this.write({ data: context.buffer, endCursor }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async invalidateOnRestart(cursor?: Cursor) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await this.invalidate(cursor); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async invalidate(cursor?: Cursor) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: Implement | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new Error("Not implemented"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (cursor?.orderKey === undefined) return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const cursorValue = Number(cursor.orderKey); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this._db.prepare(sql).run(cursorValue); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+107
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Several improvements needed for robustness While the basic logic is sound, there are several areas that could be improved for better safety and reliability:
Consider applying these improvements: async invalidate(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;
- const cursorValue = Number(cursor.orderKey);
+ // Validate orderKey is numeric
+ const cursorValue = Number(cursor.orderKey);
+ if (isNaN(cursorValue)) {
+ throw new Error('Invalid cursor orderKey: must be numeric');
+ }
+
+ try {
+ // Use parameterized query for table/column names
+ const columnName = this._config.cursorColumn ?? "_cursor";
+ // Validate table/column names to prevent SQL injection
+ if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this._config.tableName) ||
+ !/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(columnName)) {
+ throw new Error('Invalid table or column name');
+ }
+
+ const sql = `DELETE FROM ${this._config.tableName} WHERE ${columnName} > ?`;
+ this._db.prepare(sql).run(cursorValue);
+ } catch (error) {
+ throw new Error(`Failed to invalidate data: ${error.message}`);
+ }
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async finalize(cursor?: Cursor) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: Implement | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new Error("Not implemented"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// No Implementation required | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private async insertJsonArray(data: SinkData[]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Empty implementation of required abstract method needs attention
The
invalidateOnRestart
is an abstract method in the baseSink
class that must be properly implemented by all subclasses. Looking at other sink implementations (SQLite, Drizzle), they handle restart invalidation by delegating to theirinvalidate
method. The current CSV sink's empty implementation with "No Implementation required" comment is incorrect since:invalidate
, suggesting reorg/invalidation isn't supportedEither:
invalidate
method to be consistent🔗 Analysis chain
Implementation needed for data integrity during restarts
The comment suggests no implementation is required, but this seems inconsistent with:
run
function with a starting cursorConsider implementing proper invalidation logic to ensure data consistency when the indexer restarts.
Let's verify how other sinks implement this method:
Let me verify the base
Sink
class implementation to understand if this is an optional method:Let me try a different approach to find the base class:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 1303
Script:
Length of output: 52
Script:
Length of output: 760