feat: add invalidation and finalize method implementations for sinks #118
📝 Walkthrough
This pull request introduces several changes to the
Changes
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (8)
packages/indexer/src/sink.ts (1)
19-19: Add JSDoc documentation for the new abstract method.
The new invalidateOnRestart method would benefit from documentation explaining its purpose, behavior, and the significance of the optional cursor parameter.
Add documentation like this:
+ /**
+  * Invalidates data after an indexer restart.
+  * @param cursor - Optional cursor indicating the point after which data should be invalidated
+  */
  abstract invalidateOnRestart(cursor?: Cursor): Promise<void>;
packages/indexer/src/internal/testing.ts (1)
94-96: Enhance the documentation for mock implementation.
While having no implementation is appropriate for a mock class, the comment should be more descriptive about why no implementation is required and what this method is expected to do in real implementations.
  async invalidateOnRestart(cursor?: Cursor) {
-   // No Implementation required
+   // No implementation required in mock sink.
+   // In real sinks, this method is called during indexer restart to handle data invalidation
+   // based on the provided cursor.
  }
packages/indexer/src/sinks/csv.ts (2)
106-108: Consider implementing reorg support for data consistency
While CSV files are append-only by nature, throwing an error for reorgs could lead to data inconsistency. Consider:
- Implementing reorg support by maintaining a separate file for each block range
- Adding documentation explaining the limitations and potential data consistency issues
102-112: Consider architectural improvements for data integrity
The current implementation has several potential issues:
- No atomic write guarantees
- No handling of partial writes or failures
- No cleanup of incomplete data on restart
Consider implementing:
- Write to temporary files and atomic rename on completion
- Transaction log for recovery
- Proper error handling and cleanup in the write path
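The "write to temporary files and atomic rename" idea above can be sketched roughly as follows. This is a minimal illustration and not the CsvSink implementation: `writeFileAtomic`, the `.tmp-` suffix, and the sample paths are hypothetical, and the sketch assumes the temporary file sits in the same directory (and therefore the same filesystem) as the target, so the rename replaces the target in a single step.

```typescript
import { mkdtempSync, readFileSync, readdirSync, renameSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hypothetical helper: write the complete contents to a sibling temp file,
// then rename it over the target so readers never observe a partial file.
function writeFileAtomic(targetPath: string, contents: string): void {
  const tempPath = `${targetPath}.tmp-${Date.now()}`;
  writeFileSync(tempPath, contents, "utf8");
  renameSync(tempPath, targetPath);
}

// Usage: two successive snapshots written to the same target path.
const dir = mkdtempSync(join(tmpdir(), "csv-sink-"));
const target = join(dir, "out.csv");
writeFileAtomic(target, "block,value\n1,a\n");
writeFileAtomic(target, "block,value\n1,a\n2,b\n");

const finalContents = readFileSync(target, "utf8");
// Any file other than out.csv would be a temp file that was not renamed.
const leftovers = readdirSync(dir).filter((name) => name !== "out.csv");
```

If the process stops before the rename, the previous `out.csv` is left untouched and only a stray temp file remains, which a restart routine could delete.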
packages/indexer/src/sinks/sqlite.ts (1)
115-116: Add documentation explaining why finalize is not implemented
While it's valid that no implementation is required, it would be helpful to add JSDoc comments explaining why this is the case for future maintainers.
Consider adding documentation:
+ /**
+  * Finalize is not required for SQLite sink as all operations are immediately
+  * persisted to disk and no cleanup is needed.
+  * @param cursor Optional cursor parameter (unused)
+  */
  async finalize(cursor?: Cursor) {
    // No Implementation required
  }
packages/indexer/src/indexer.ts (1)
181-182: LGTM! Consider adding error handling and debug logging.
The placement and implementation of invalidateOnRestart is correct for preventing duplicate data processing during restarts.
Consider these improvements:
- Add error handling:
 // avoid having duplicate data if it was inserted before the persistence commited the state
-await sink.invalidateOnRestart(request.startingCursor);
+try {
+  await sink.invalidateOnRestart(request.startingCursor);
+} catch (error) {
+  consola.error('Failed to invalidate on restart:', error);
+  throw error;
+}
- Add debug logging for better observability:
+if (indexer.options.debug) {
+  consola.debug('Invalidating data from cursor:', request.startingCursor);
+}
 await sink.invalidateOnRestart(request.startingCursor);
packages/indexer/src/sinks/drizzle/drizzle.ts (2)
86-88: Add method documentation for invalidateOnRestart
Consider adding a JSDoc comment to the newly added invalidateOnRestart method to explain its purpose and usage within the class. This enhances code readability and aids future maintainers.
111-112: Add method documentation for finalize
Since the finalize method now contains implemented logic, consider adding a JSDoc comment to explain its purpose and how it affects the data. This improves code clarity and assists future developers.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
- change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json (1 hunks)
- packages/indexer/src/indexer.ts (1 hunks)
- packages/indexer/src/internal/testing.ts (1 hunks)
- packages/indexer/src/sink.ts (2 hunks)
- packages/indexer/src/sinks/csv.ts (1 hunks)
- packages/indexer/src/sinks/drizzle/drizzle.ts (2 hunks)
- packages/indexer/src/sinks/sqlite.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- change/@apibara-indexer-002ca4e8-77c3-4c7c-a51d-d95504a20094.json
🔇 Additional comments (8)
packages/indexer/src/sink.ts (1)
32-36: LGTM! Default implementation provides good base for specific sinks.
The implementation follows the pattern of other default methods in this class, providing a basic logging implementation that concrete sink classes can override with specific invalidation logic.
Let's verify that concrete sink implementations properly override this method:
✅ Verification successful
All sink implementations handle invalidateOnRestart appropriately
The verification shows that all concrete sink implementations handle the method according to their specific needs:
- SqliteSink: Properly implements invalidation by delegating to its invalidate method
- CsvSink: Explicitly marks it as not required since CSV is append-only
- MockSink: Marks it as not required since it's a test implementation
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for sink classes that might need to implement this method
ast-grep --pattern 'class $_ extends Sink {
$$$
}'
Length of output: 16612
packages/indexer/src/internal/testing.ts (1)
103-105: Enhance documentation and verify implementation across sinks.
While removing the error-throwing behavior is appropriate, the documentation should be more descriptive. Additionally, we should verify that other sink implementations properly implement this method.
async finalize(cursor?: Cursor) {
- // No Implementation required
+ // No implementation required in mock sink.
+ // In real sinks, this method is called to perform cleanup or final processing
+ // when the indexer completes its operation.
}
✅ Verification successful
Based on the results, I can now generate the final response as I have sufficient information about the finalize method implementations across different sinks:
Documentation improvement is sufficient, no implementation issues found
The empty implementation in MockSink is correct as verified by checking other sink implementations:
- DefaultSink: Has a basic logging implementation
- SqliteSink: Also uses "No Implementation required" comment
- CsvSink: Also uses "No Implementation required" comment
The suggested documentation improvement would indeed be helpful for clarity.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check finalize method implementation in other sink classes
# Expected: Find implementations of finalize method in real sink classes
# Search for finalize method implementations
ast-grep --pattern 'class $_ extends Sink {
$$$
async finalize($_) {
$$$
}
$$$
}'
# Search for any TODO or unimplemented finalize methods
rg -A 2 'async finalize.*\{' | rg -A 2 '(TODO|throw new Error|Not implemented)'
Length of output: 16735
packages/indexer/src/sinks/sqlite.ts (2)
102-104: LGTM: Clean implementation of invalidateOnRestart
The method correctly implements the standard pattern of delegating to invalidate, maintaining consistency with other sink implementations.
102-116: Verify consistency with other sink implementations
Let's ensure this implementation aligns with other sinks and is properly integrated with the indexer.
✅ Verification successful
Implementation is consistent across sinks and properly integrated
The verification shows that:
- The base Sink class has a default implementation in DefaultSink that logs invalidation
- SqliteSink implements invalidation by deleting rows with cursor greater than the given cursor
- CsvSink explicitly has no implementation as noted in comments
- MockSink (testing) has no implementation as noted in comments
- The indexer correctly calls invalidateOnRestart at the start of processing with the starting cursor
This is a consistent pattern where:
- Each sink decides how to handle invalidation based on its storage mechanism
- SQLite needs active cleanup due to its persistent nature
- CSV and Mock sinks don't require cleanup as they handle data differently
- The indexer properly integrates this by calling it at the right time
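The pattern described above, where each sink decides how to handle restart invalidation, can be illustrated with a small sketch. Everything here is hypothetical and simplified: `TableSink` and `AppendOnlySink` are stand-ins rather than the real sink classes, the `Cursor` type is reduced to a numeric `orderKey`, and the methods are synchronous for brevity even though the real abstract method returns `Promise<void>`.

```typescript
// Simplified stand-in for the real Cursor type (assumption).
type Cursor = { orderKey: number };

abstract class Sink {
  abstract invalidate(cursor?: Cursor): void;
  abstract invalidateOnRestart(cursor?: Cursor): void;
}

// A sink backed by persistent rows: restart invalidation reuses the reorg path.
class TableSink extends Sink {
  rows: { cursor: number }[] = [];

  invalidate(cursor?: Cursor): void {
    if (cursor?.orderKey === undefined) return;
    const key = cursor.orderKey;
    // Keep only rows written at or before the cursor.
    this.rows = this.rows.filter((row) => row.cursor <= key);
  }

  invalidateOnRestart(cursor?: Cursor): void {
    this.invalidate(cursor);
  }
}

// An append-only sink: reorgs are unsupported, restart invalidation is a no-op.
class AppendOnlySink extends Sink {
  invalidate(_cursor?: Cursor): void {
    throw new Error("Reorg not supported by append-only sink");
  }

  invalidateOnRestart(_cursor?: Cursor): void {
    // Nothing to clean up.
  }
}

const tableSink = new TableSink();
tableSink.rows = [{ cursor: 1 }, { cursor: 2 }, { cursor: 3 }];
tableSink.invalidateOnRestart({ orderKey: 2 }); // drops the row with cursor 3
```

Whether a no-op is the right choice for an append-only sink is exactly the question the CSV review comment raises later in this conversation.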
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check other sink implementations for consistency
echo "Checking other sink implementations..."
ast-grep --pattern 'class $SINK extends Sink {
$$$
async invalidateOnRestart(cursor?: Cursor) {
$$$
}
$$$
}'
# Verify indexer usage
echo "Checking indexer usage..."
rg -A 3 "sink\.invalidateOnRestart"
Length of output: 17136
packages/indexer/src/indexer.ts (1)
181-182: Verify sink implementations and transaction handling.
Let's verify the implementation of invalidateOnRestart across sink classes and its integration with the transaction system.
✅ Verification successful
Let me gather more information about the sink implementations since the previous results don't show the complete picture.
Based on the gathered information, I can now provide the final response:
Implementation of invalidateOnRestart is consistent and properly handled
The verification shows that:
- The invalidateOnRestart method is properly defined in the Sink interface and implemented across all sink classes
- Both DrizzleSink and SQLiteSink handle it by delegating to invalidate() with proper transaction support
- CSV and Testing sinks have no-op implementations as expected
- DrizzleSink specifically implements proper transaction handling for data consistency
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check sink implementations and transaction handling
# Check for invalidateOnRestart implementation in sink classes
echo "Checking sink implementations..."
ast-grep --pattern 'class $_ implements Sink {
$$$
invalidateOnRestart($_) {
$$$
}
$$$
}'
# Check error handling patterns in similar sink operations
echo "Checking error handling patterns..."
rg -A 5 'sink\.(invalidate|finalize)' --type ts
# Check transaction integration
echo "Checking transaction integration..."
ast-grep --pattern 'sink.transaction($_, async ($txn) => {
$$$
})'
Length of output: 2138
Script:
#!/bin/bash
# Let's check the sink interface and implementations more thoroughly
# Check Sink interface definition
echo "Checking Sink interface definition..."
ast-grep --pattern 'interface Sink {
$$$
}'
# Find all sink implementations
echo "Finding sink class implementations..."
rg "class.*implements.*Sink" -A 10
# Check invalidateOnRestart implementations specifically
echo "Checking invalidateOnRestart implementations..."
rg "invalidateOnRestart" -A 5 --type ts
# Look for transaction-related code in sink implementations
echo "Checking transaction patterns..."
rg "transaction.*async" -A 5 --type ts
Length of output: 6180
packages/indexer/src/sinks/drizzle/drizzle.ts (3)
91-92: Ensure consistent null checks for cursor?.orderKey
The null check if (cursor?.orderKey === undefined) return; is appropriate and prevents unnecessary processing when cursor.orderKey is undefined. Ensure that similar methods use consistent null checking for clarity and maintainability.
98-98: Verify the necessity of Number() conversion
In the .where() clause:
.where(gt(sql`lower(_cursor)`, sql`${Number(cursor?.orderKey)}`));
Ensure that the conversion to Number is necessary. If cursor.orderKey is already a number, the explicit conversion might be redundant. Confirm that cursor.orderKey is of a type that requires conversion to Number.
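One reason the conversion deserves a second look: if orderKey happens to be a bigint (an assumption here, since this conversation does not state its type), Number() silently rounds values above Number.MAX_SAFE_INTEGER. The sketch below uses a hypothetical helper, `orderKeyToNumber`, that refuses to convert such values instead of rounding them.

```typescript
// Assumption: orderKey is a bigint. The helper name is hypothetical.
const MAX_SAFE = BigInt(Number.MAX_SAFE_INTEGER);

function orderKeyToNumber(orderKey: bigint): number {
  if (orderKey > MAX_SAFE) {
    throw new RangeError(`orderKey ${orderKey} cannot be represented exactly as a number`);
  }
  return Number(orderKey);
}

// Small keys convert exactly.
const exact = orderKeyToNumber(BigInt(1000));

// 2^53 + 1 is not representable as a double; the plain conversion rounds it to 2^53.
const rounded = Number(BigInt("9007199254740993"));
```

Block numbers are far below this limit in practice, so the guard is mostly a safety net; it matters only if order keys can grow arbitrarily large.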
111-120: Implement method logic for finalize
The new implementation of the finalize method adds essential functionality by deleting rows where the upper bound of _cursor is less than the provided cursor's orderKey. This ensures proper cleanup of outdated data.
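To make the two range conditions concrete, here is an in-memory sketch of the semantics as described in this review: invalidation drops rows whose lower bound is greater than the cursor, and finalization drops rows whose upper bound is less than the cursor. The row shape, the function names, and the use of `null` for an open upper bound are assumptions for illustration; this is not the DrizzleSink code.

```typescript
type Cursor = { orderKey: number };

// Hypothetical row shape: the row is valid for the block range [lower, upper).
// upper === null means the row is still the current version.
type RangedRow = { id: string; lower: number; upper: number | null };

// Invalidate: remove rows first written after the cursor.
function invalidateRows(rows: RangedRow[], cursor?: Cursor): RangedRow[] {
  if (cursor?.orderKey === undefined) return rows;
  const key = cursor.orderKey;
  return rows.filter((row) => row.lower <= key);
}

// Finalize: remove historical rows whose range closed before the cursor.
function finalizeRows(rows: RangedRow[], cursor?: Cursor): RangedRow[] {
  if (cursor?.orderKey === undefined) return rows;
  const key = cursor.orderKey;
  return rows.filter((row) => row.upper === null || row.upper >= key);
}

const rows: RangedRow[] = [
  { id: "a", lower: 1, upper: 5 }, // old version of "a"
  { id: "a", lower: 5, upper: null }, // current version of "a"
  { id: "b", lower: 12, upper: null }, // written after block 10
];

// Finalizing at block 10 drops the closed range [1, 5).
const afterFinalize = finalizeRows(rows, { orderKey: 10 });

// Invalidating at block 10 drops the row first written at block 12.
const afterInvalidate = invalidateRows(rows, { orderKey: 10 });
```

The two operations are complementary: invalidation trims data that is too new to trust, while finalization trims history that can no longer be reorganized.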
  async finalize(cursor?: Cursor) {
-   // TODO: Implement
-   throw new Error("Not implemented");
+   // No Implementation required
  }
Implement proper finalization for resource cleanup
The finalize method should ensure:
- All buffered data is written to the CSV file
- The CSV stringifier and file streams are properly closed
Here's a suggested implementation:
- async finalize(cursor?: Cursor) {
- // No Implementation required
- }
+ async finalize(cursor?: Cursor) {
+ return new Promise<void>((resolve, reject) => {
+ this._stringifier.end(() => {
+ resolve();
+ });
+ });
+ }
Committable suggestion skipped: line range outside the PR's diff.
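Note that the suggestion above never calls reject, so a write error during shutdown would not surface to the caller. A hedged alternative, assuming the output is a standard Node.js Writable (this conversation does not confirm what type the sink's stringifier is), is to wait on `finished` from `node:stream/promises`, which rejects if the stream errors. The helper name `endAndWait` and the in-memory stream are hypothetical.

```typescript
import { Writable } from "node:stream";
import { finished } from "node:stream/promises";

// Hypothetical helper: end the stream and wait until all buffered data is flushed.
// The returned promise rejects if the stream emits an error before finishing.
async function endAndWait(stream: Writable): Promise<void> {
  const done = finished(stream); // subscribe before ending so no event is missed
  stream.end();
  await done;
}

// Usage with an in-memory Writable standing in for the CSV output.
const chunks: string[] = [];
const output = new Writable({
  write(chunk, _encoding, callback) {
    chunks.push(chunk.toString());
    callback();
  },
});

output.write("block,value\n");
output.write("1,a\n");
const closed = endAndWait(output);
```

For a Transform stream that is piped into a file, it is usually the final destination stream that should be awaited, since that is where the data actually lands.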
  async invalidateOnRestart(cursor?: Cursor) {
    // No Implementation required
  }
💡 Codebase verification
Empty implementation of required abstract method needs attention
invalidateOnRestart is an abstract method in the base Sink class that must be properly implemented by all subclasses. Looking at other sink implementations (SQLite, Drizzle), they handle restart invalidation by delegating to their invalidate method. The current CSV sink's empty implementation with "No Implementation required" comment is incorrect since:
- It's a required abstract method from the base class
- The CSV sink already throws "not implemented" for invalidate, suggesting reorg/invalidation isn't supported
- Other sinks properly implement this for data consistency
Either:
- Implement proper invalidation support in the CSV sink
- Or throw the same "not implemented" error as the invalidate method to be consistent
🔗 Analysis chain
Implementation needed for data integrity during restarts
The comment suggests no implementation is required, but this seems inconsistent with:
- The PR's objective to add invalidation
- Other sinks implementing this method
- The method being called from the indexer's run function with a starting cursor
Consider implementing proper invalidation logic to ensure data consistency when the indexer restarts.
Let's verify how other sinks implement this method:
Let me verify the base Sink class implementation to understand if this is an optional method:
Let me try a different approach to find the base class:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for invalidateOnRestart implementations in other sinks
rg -A 5 "invalidateOnRestart.*\{" packages/indexer/src/sinks/
Length of output: 1303
Script:
#!/bin/bash
# Search for the Sink base class definition
ast-grep --pattern 'abstract class Sink {
$$$
}'
Length of output: 52
Script:
#!/bin/bash
# Search for Sink class definition using ripgrep
rg -A 10 "^(export )?(abstract )?class Sink" packages/indexer/src/
Length of output: 760
if (cursor?.orderKey === undefined) return;

const cursorValue = Number(cursor.orderKey);

const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`;
this._db.prepare(sql).run(cursorValue);
🛠️ Refactor suggestion
Several improvements needed for robustness
While the basic logic is sound, there are several areas that could be improved for better safety and reliability:
- SQL Injection risk: Table and column names are directly interpolated into the query
- Missing error handling for SQL execution
- Potential type conversion issues with Number(cursor.orderKey)
- Edge case handling when cursorColumn is undefined needs verification
Consider applying these improvements:
async invalidate(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;
- const cursorValue = Number(cursor.orderKey);
+ // Validate orderKey is numeric
+ const cursorValue = Number(cursor.orderKey);
+ if (isNaN(cursorValue)) {
+ throw new Error('Invalid cursor orderKey: must be numeric');
+ }
+
+ try {
+ // Use parameterized query for table/column names
+ const columnName = this._config.cursorColumn ?? "_cursor";
+ // Validate table/column names to prevent SQL injection
+ if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this._config.tableName) ||
+ !/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(columnName)) {
+ throw new Error('Invalid table or column name');
+ }
+
+ const sql = `DELETE FROM ${this._config.tableName} WHERE ${columnName} > ?`;
+ this._db.prepare(sql).run(cursorValue);
+ } catch (error) {
+ throw new Error(`Failed to invalidate data: ${error.message}`);
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- if (cursor?.orderKey === undefined) return;
- const cursorValue = Number(cursor.orderKey);
- const sql = `DELETE FROM ${this._config.tableName} WHERE ${this._config.cursorColumn ?? "_cursor"} > ?`;
- this._db.prepare(sql).run(cursorValue);
+ if (cursor?.orderKey === undefined) return;
+ // Validate orderKey is numeric
+ const cursorValue = Number(cursor.orderKey);
+ if (isNaN(cursorValue)) {
+   throw new Error('Invalid cursor orderKey: must be numeric');
+ }
+ try {
+   // Use parameterized query for table/column names
+   const columnName = this._config.cursorColumn ?? "_cursor";
+   // Validate table/column names to prevent SQL injection
+   if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this._config.tableName) ||
+       !/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(columnName)) {
+     throw new Error('Invalid table or column name');
+   }
+   const sql = `DELETE FROM ${this._config.tableName} WHERE ${columnName} > ?`;
+   this._db.prepare(sql).run(cursorValue);
+ } catch (error) {
+   throw new Error(`Failed to invalidate data: ${error.message}`);
+ }
No description provided.