add mongo sink and some fixes for drizzle sink #127
Conversation
📝 Walkthrough
This pull request introduces a comprehensive update to the Apibara indexer ecosystem, focusing on adding MongoDB sink functionality and updating various configuration files. The changes span multiple packages.
Force-pushed 076f2d0 to 75f76be (Compare)
Force-pushed 75f76be to 33a04f7 (Compare)
Looks good, only a couple of smaller changes.
If you try to run it locally with a dockerized mongodb it should fail because mongodb requires a cluster to support transactions. Is that the case?
```typescript
return await this.collection.updateOne(
  {
    ...filter,
    _cursor: {
      to: null,
    } as Condition<MongoCursor | null>,
  },
  {
    ...update,
    $set: {
      ...update.$set,
      "_cursor.to": Number(this.endCursor?.orderKey),
    } as unknown as MatchKeysAndValues<TSchema>,
  },
  { ...options, session: this.session },
);
```
I believe this one should also duplicate the previously existing value. The easiest way (double-check to make sure I'm correct) is to use findOneAndUpdate, asking it to return the value before the update. Notice that in this case the update's $set action should set the lower bound to the endCursor (since we are updating the old value in place to be the new value).

```typescript
const updated = await this.collection.findOneAndUpdate(..., ..., { returnDocument: "before" });
updated._cursor.to = endCursor.orderKey;
await this.collection.insertOne(updated);
```
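A fuller sketch of that pattern, runnable against a minimal in-memory stub (the `updateOneWithHistory` name and the `CollectionLike` interface are illustrative, not part of the PR):

```typescript
// Sketch of the suggested pattern: update the live document in place and
// re-insert its pre-update version with a closed validity range.
// `CollectionLike` is a hypothetical, minimal stand-in for the driver API.
type Cursor = { from: number; to: number | null };
type VersionedDoc = { _cursor: Cursor; [key: string]: unknown };

interface CollectionLike {
  findOneAndUpdate(
    filter: Record<string, unknown>,
    update: { $set?: Record<string, unknown> },
    options: { returnDocument: "before" },
  ): Promise<VersionedDoc | null>;
  insertOne(doc: VersionedDoc): Promise<void>;
}

async function updateOneWithHistory(
  collection: CollectionLike,
  filter: Record<string, unknown>,
  update: { $set?: Record<string, unknown> },
  endCursorOrderKey: number,
): Promise<void> {
  // Update the live document (to === null); the new version starts at endCursor.
  const previous = await collection.findOneAndUpdate(
    { ...filter, "_cursor.to": null },
    { ...update, $set: { ...update.$set, "_cursor.from": endCursorOrderKey } },
    { returnDocument: "before" },
  );
  if (previous) {
    // Close the old version's range at endCursor and keep it as history.
    await collection.insertOne({
      ...previous,
      _cursor: { ...previous._cursor, to: endCursorOrderKey },
    });
  }
}
```

The same shape applies to `updateMany`, with a `find` first to capture all pre-update documents.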
```typescript
async updateMany(
  filter: Filter<TSchema>,
  update: UpdateFilter<TSchema>,
  options?: UpdateOptions,
): Promise<UpdateResult<TSchema>> {
  return await this.collection.updateMany(
    {
      ...filter,
      _cursor: { to: null },
    },
    {
      ...update,
      $set: {
        ...update.$set,
        "_cursor.to": Number(this.endCursor?.orderKey),
      } as unknown as MatchKeysAndValues<TSchema>,
    },
    { ...options, session: this.session },
  );
}
```
This method should:
- select the old values with `find`
- update to the new values with `updateMany`. Remember to change it to set `_cursor.from` to endCursor, leaving `_cursor.to` unchanged.
- adjust the `_cursor.to` of the old values
- insert the old values back into the db
```typescript
}

async deleteOne(
  filter?: Filter<TSchema>,
```
Make the filter non-nullable.
packages/sink-mongo/src/mongo.ts (Outdated)
```typescript
  }
}

export const mongo = (args: MongoSinkOptions) => {
```
We should name it `mongoSink`, or it's going to be annoying when we use it in a project.
Force-pushed cae6fa6 to e914bb1 (Compare)
packages/sink-mongo/src/mongo.ts (Outdated)
```typescript
      db: new MongoSinkTransactionDb(db, session, endCursor),
      session,
    });
    return "Transaction committed.";
```
Is this needed?
I was just testing if adding this resolves some errors XD.
Not required.
Force-pushed 38ad34f to 5974cbd (Compare)
Actionable comments posted: 3
🧹 Nitpick comments (5)
packages/sink-mongo/src/mongo.ts (2)
`39-58`: Possible performance concerns with repeated deleteMany calls.
Looping over each collection and issuing a deleteMany for each might be expensive for large numbers of collections. Consider bulk operations or parallelizing where possible.
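One option is to issue the per-collection calls concurrently; a sketch against a hypothetical minimal interface (not the PR's actual types):

```typescript
// Fire the per-collection deleteMany calls concurrently instead of one by one.
// `DeletableCollection` is an assumed minimal surface of the driver API.
interface DeletableCollection {
  deleteMany(filter: Record<string, unknown>): Promise<{ deletedCount: number }>;
}

async function invalidateAll(
  collections: DeletableCollection[],
  filter: Record<string, unknown>,
): Promise<number> {
  // Promise.all lets the server process the deletes in parallel.
  const results = await Promise.all(collections.map((c) => c.deleteMany(filter)));
  return results.reduce((sum, r) => sum + r.deletedCount, 0);
}
```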
`60-92`: Edge case in invalidate logic.
For very large or negative orderKey values, ensure that the conversion with Number() is correctly handled in all conditions. If orderKey can be null, add a defensive check.

packages/sink-mongo/src/collection.ts (1)
`121-172`: Bulk update approach is solid.
You retrieve all documents first, then update them, and finally insert their old version with updated cursor ranges. This ensures an accurate audit trail. Keep an eye on memory usage if oldDocs is large.

packages/sink-mongo/docker-compose.orbstack.yaml (1)
`13-19`: Consider adding retry logic to health check command.
While the health check implementation is good, the MongoDB initialization might fail if the server isn't ready to accept connections.
Consider modifying the health check command to include retry logic:

```diff
-test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --port 27017 --quiet
+test: |
+  mongosh --port 27017 --quiet --eval "
+    for (let i = 0; i < 3; i++) {
+      try {
+        rs.status();
+        exit(0);
+      } catch (err) {
+        if (err.codeName === 'NotYetInitialized') {
+          rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]});
+          exit(0);
+        }
+        sleep(1000);
+      }
+    }
+    exit(1);
+  "
```

packages/sink-mongo/docker-compose.yaml (1)
`1-26`: Consider consolidating Docker Compose configurations.
Having two nearly identical Docker Compose files increases maintenance overhead. The only significant difference is the host resolution (`localhost` vs `host.docker.internal`).
Consider using a base compose file with environment-specific overrides:

```yaml
# docker-compose.base.yaml
version: "3.8"
services:
  mongo1:
    image: mongo:7.0
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
    ports:
      - 27017:27017
    volumes:
      - "mongo1_data:/data/db"
      - "mongo1_config:/data/configdb"
volumes:
  mongo1_data:
  mongo1_config:
```

Then use environment-specific files for overrides:

```yaml
# docker-compose.override.yaml
services:
  mongo1:
    extra_hosts:
      - "host.docker.internal:host-gateway"
    healthcheck:
      test: echo "..." # host.docker.internal version

# docker-compose.orbstack.yaml
services:
  mongo1:
    extra_hosts:
      - "localhost:host-gateway"
    healthcheck:
      test: echo "..." # localhost version
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (24)
- .github/workflows/build.yml (1 hunks)
- change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json (1 hunks)
- change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json (1 hunks)
- examples/cli/indexers/1-evm.indexer.ts (1 hunks)
- examples/cli/indexers/2-starknet.indexer.ts (1 hunks)
- examples/indexer/src/indexer.ts (1 hunks)
- examples/starknet-indexer/src/indexer.ts (1 hunks)
- packages/indexer/build.config.ts (1 hunks)
- packages/indexer/docker-compose.yaml (1 hunks)
- packages/indexer/package.json (1 hunks)
- packages/indexer/src/sinks/drizzle/drizzle.test.ts (3 hunks)
- packages/indexer/src/sinks/drizzle/drizzle.ts (1 hunks)
- packages/indexer/src/sinks/drizzle/update.ts (2 hunks)
- packages/sink-mongo/README.md (1 hunks)
- packages/sink-mongo/build.config.ts (1 hunks)
- packages/sink-mongo/docker-compose.orbstack.yaml (1 hunks)
- packages/sink-mongo/docker-compose.yaml (1 hunks)
- packages/sink-mongo/package.json (1 hunks)
- packages/sink-mongo/src/collection.ts (1 hunks)
- packages/sink-mongo/src/index.ts (1 hunks)
- packages/sink-mongo/src/mongo.test.ts (1 hunks)
- packages/sink-mongo/src/mongo.ts (1 hunks)
- packages/sink-mongo/src/transaction.ts (1 hunks)
- packages/sink-mongo/tsconfig.json (1 hunks)
✅ Files skipped from review due to trivial changes (12)
- packages/sink-mongo/README.md
- change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json
- packages/indexer/docker-compose.yaml
- examples/starknet-indexer/src/indexer.ts
- packages/sink-mongo/src/index.ts
- examples/indexer/src/indexer.ts
- packages/sink-mongo/tsconfig.json
- change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json
- examples/cli/indexers/2-starknet.indexer.ts
- examples/cli/indexers/1-evm.indexer.ts
- packages/sink-mongo/build.config.ts
- packages/sink-mongo/package.json
🧰 Additional context used
📓 Learnings (2)
packages/indexer/src/sinks/drizzle/drizzle.ts (1)
Learnt from: fracek
PR: apibara/typescript-sdk#109
File: packages/indexer/src/sinks/drizzle/drizzle.test.ts:16-18
Timestamp: 2024-11-12T11:46:58.595Z
Learning: In `packages/indexer/src/sinks/drizzle`, the import references are correct and up-to-date according to the new code structure.
packages/indexer/src/sinks/drizzle/drizzle.test.ts (1)
Learnt from: fracek
PR: apibara/typescript-sdk#109
File: packages/indexer/src/sinks/drizzle/drizzle.test.ts:16-18
Timestamp: 2024-11-12T11:46:58.595Z
Learning: In `packages/indexer/src/sinks/drizzle`, the import references are correct and up-to-date according to the new code structure.
🔇 Additional comments (30)
packages/indexer/build.config.ts (1)

`16-16`: LGTM! Verify the internal testing file exists.
The addition of the internal testing file to the build configuration aligns with the new export in package.json.

✅ Verification successful

File exists at the specified location.
The internal testing file exists at `packages/indexer/src/internal/testing.ts`, confirming that the build configuration entry is valid.
🏁 Scripts executed
The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Verify that the internal testing file exists
fd "testing.ts" "src/internal"
```

Length of output: 128

```shell
#!/bin/bash
# Let's first check if the file exists in the correct location relative to the repository root
fd "testing.ts" "packages/indexer/src/internal"
```

Length of output: 86
packages/indexer/package.json (2)

`79-83`: LGTM! Internal export configuration is consistent.
The new internal export follows the established pattern and correctly maps to the corresponding dist files.

Line range hint `1-1`: Verify mongo dependencies for the mongo sink.
The PR title mentions adding a mongo sink, but I don't see any MongoDB-related dependencies in the package.json. This might be because:
- The mongo dependencies are in a separate package
- The mongo dependencies are yet to be added
packages/indexer/src/sinks/drizzle/update.ts (4)

`15-15`: No issues with the added import.
Importing the Int8Range type here clarifies the expected shape of the _cursor data.

`16-16`: Check for usage consistency.
The helper function getDrizzleCursor is imported correctly. Ensure it's invoked consistently wherever the cursor is manipulated, to avoid mismatched cursor formats.

`37-40`: Consider potential concurrency implications.
The code correctly selects old records based on the upper_inf(_cursor) condition. However, under high-concurrency scenarios, concurrent writers might insert additional records matching this condition before the subsequent insert. Keep in mind transactional integrity and possible row lock usage to prevent race conditions.

`44-58`: Validate insertion of old record versions.
Refreshing old records with a new upper bound is a sound approach to maintaining a version history. Confirm that the table's unique constraints (e.g., primary keys) or index definitions are compatible with duplicating rows that only differ in _cursor range. Otherwise, safeguarding against primary key collisions may be needed.
packages/indexer/src/sinks/drizzle/drizzle.ts (1)

`124-124`: Exported factory function for DrizzleSink is good.
Exposing drizzleSink as a factory function is clear and consistent with typical sink usage patterns. No major issues noted.
packages/indexer/src/sinks/drizzle/drizzle.test.ts (3)

`16-16`: Import statement matches new naming convention.
Switching from "drizzle as drizzleSink" to "drizzleSink" preserves clarity and aligns with the rest of the codebase's updated references.

`111-115`: Ensure the updated length requirement is valid.
The expected row count has changed to 6, reflecting the new versioning approach where a row may be duplicated with different cursor bounds. Double-check that this accurately matches your domain logic.

`118-118`: Test renamed to "should soft delete data".
Switching from a hard- to soft-delete mechanism is consistent with your versioning approach. Ensure all references, including documentation or older test references, also reflect this behavior.
packages/sink-mongo/src/mongo.ts (3)

`1-3`: Confirm handling of BigInt orderKey.
Currently, orderKey is cast to a number using Number(). This may lead to precision loss if orderKey values exceed the 53-bit integer limit in JavaScript. Consider handling it as a string or using a library for BigInt-safe conversions.
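A defensive helper along these lines would make the failure mode explicit (the function name and error messages are illustrative):

```typescript
// Convert a bigint orderKey to a number only when it is exactly representable.
// Values above 2^53 - 1 would silently lose precision under a bare Number().
function orderKeyToNumber(orderKey: bigint | undefined): number {
  if (orderKey === undefined) {
    throw new Error("endCursor.orderKey is missing");
  }
  if (orderKey < 0n || orderKey > BigInt(Number.MAX_SAFE_INTEGER)) {
    throw new Error(`orderKey ${orderKey} is not exactly representable as a number`);
  }
  return Number(orderKey);
}
```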
`13-19`: Constructor usage is clear.
The constructor neatly separates the client from the config. The approach is straightforward and maintains clarity for future modifications.

`99-102`: mongoSink helper naming meets prior feedback.
This matches the earlier suggestion to name the sink method "mongoSink". Good job applying that recommendation.
packages/sink-mongo/src/collection.ts (5)

`32-38`: Class definition is well-structured.
Explicitly storing session and endCursor in the constructor fosters clarity in subsequent operations.

`39-53`: Ensure consistent cursor assignment.
When inserting documents, from is set to Number(this.endCursor?.orderKey). If endCursor is undefined or a large BigInt, this might behave unexpectedly.

`71-119`: Document versioning logic is thorough.
The approach of updating the existing document and inserting the old version with a new _cursor.to is a sound pattern for historical tracking. Just ensure performance remains acceptable for high write volumes.

`174-190`: Soft delete operation is consistent.
Marking documents with an updated _cursor.to instead of a full deletion aligns with the versioning approach. This ensures historical data remains accessible.

`210-235`: Consistent filtering on latest data.
Using `"_cursor.to": null` to filter for active documents is intuitive. This approach is easy to read and maintain.
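The convention can be modeled in a few lines (an in-memory sketch of the filter semantics, not the actual driver call):

```typescript
// A document is "live" iff its validity range is still open: _cursor.to === null.
type Cursor = { from: number; to: number | null };

function isLive(doc: { _cursor: Cursor }): boolean {
  return doc._cursor.to === null;
}

const versions = [
  { id: 1, data: "old", _cursor: { from: 0, to: 5 } },    // historical version
  { id: 1, data: "new", _cursor: { from: 5, to: null } }, // current version
];
const live = versions.filter(isLive);
```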
packages/sink-mongo/src/mongo.test.ts (7)

`1-5`: Thorough usage of mocking.
MockClient and generateMockMessages are utilized well for end-to-end style testing. This keeps the tests isolated from real external dependencies.

`42-75`: Good coverage of insert scenario.
The test ensures multiple items are inserted and validates the inserts with expected data. This coverage is crucial for guaranteeing correct sink behavior.

`77-128`: Update flow test is robust.
You verify that old data is retained, with a new version created upon updates. The final document count is six, confirming versioning.

`130-180`: Soft delete testing is well-defined.
It checks whether the row is "deleted" by updating _cursor.to and confirms the final length. This thoroughly covers the soft delete approach.

`182-239`: Selective retrieval test is thorough.
You confirm that only records with null upper cursor bounds are fetched, while older versions remain in the database. Excellent demonstration of the versioning logic.

`241-274`: Invalidate test is essential.
By verifying the final set of rows and their updated _cursor fields, you validate the effect of invalidation on the data set. Good coverage.

`276-279`: Test cleanup is correct.
Closing the MongoClient ensures no open handles remain. This helps avoid resource leaks.
packages/sink-mongo/src/transaction.ts (2)

`5-11`: Straightforward constructor.
Providing db, session, and endCursor is sufficient for transaction context. This helps keep transaction scope well-defined.

`12-23`: Separation of collection logic is neat.
Returning a MongoSinkCollection from this method is consistent with the approach in mongo.ts. The layering clarifies session usage.
packages/sink-mongo/docker-compose.orbstack.yaml (1)

`7-8`: LGTM: MongoDB configuration follows best practices.
The MongoDB 7.0 setup with replica set configuration and bind_ip_all is appropriate for development purposes.
.github/workflows/build.yml (1)

`36-41`: LGTM: MongoDB service configuration is consistent with development environment.
The MongoDB setup in CI matches the local development configuration, ensuring consistent behavior across environments. The placement after Protoc installation and before dependency installation is appropriate.
Let's verify the MongoDB service configuration is properly referenced in test files:

✅ Verification successful

MongoDB configuration in CI matches test environment setup.
The MongoDB service configuration in the GitHub Actions workflow is correctly aligned with the test environment. The test file (`packages/sink-mongo/src/mongo.test.ts`) uses the same connection string parameters:
- Port: 27017 (matches workflow)
- Replica set: rs0 (matches workflow)
- Host: localhost (appropriate for CI environment)
🏁 Scripts executed
The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Description: Check for MongoDB connection configuration in test files
# Look for MongoDB connection strings or configuration in test files
rg -l "mongodb://.*:27017" "packages/sink-mongo/src" "packages/sink-mongo/test"
# Check for replica set configuration references
rg -l "rs0" "packages/sink-mongo/src" "packages/sink-mongo/test"
```

Length of output: 344

```shell
#!/bin/bash
# Let's check the actual content of the test file to verify MongoDB configuration
rg -A 5 "mongodb://" "packages/sink-mongo/src/mongo.test.ts"
# Also check for any other MongoDB connection strings in the codebase
fd -t f -e ts -e js -x rg -l "mongodb://" {}
# Look for any environment variable references that might contain MongoDB configuration
rg -A 2 "process\.env\.MONGODB"
```

Length of output: 300
```typescript
// 3. Update matching records with new values and new 'lowerbound' cursor
return originalUpdate
  .set({
    ...values,
    _cursor: sql`int8range(${Number(this.endCursor?.orderKey!)}, NULL, '[)')`,
  } as PgUpdateSetSource<TTable>)
  .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`);
```
Ensure safe conversion from BigInt to Number.
Using Number(this.endCursor?.orderKey!) might risk losing precision for very large BigInt values. Verify that this range is acceptable for your domain.
```diff
@@ -35,7 +35,7 @@ describe("Drizzle Test", () => {
   await db.execute(sql`DROP TABLE IF EXISTS test_table`);
   // create test_table with db
   await db.execute(
-    sql`CREATE TABLE test_table (id SERIAL PRIMARY KEY, data TEXT, _cursor INT8RANGE)`,
+    sql`CREATE TABLE test_table (id SERIAL, data TEXT, _cursor INT8RANGE)`,
```
🛠️ Refactor suggestion
Removing PRIMARY KEY constraint could lead to duplicated ids.
The table definition lacks a primary key, potentially allowing multiple rows with the same id. Confirm this is intentional, or reintroduce a primary key to guard against duplicate data.
```typescript
async transaction(
  { cursor, endCursor, finality }: SinkCursorParams,
  cb: (params: {
    db: MongoSinkTransactionDb;
    session: ClientSession;
  }) => Promise<void>,
): Promise<void> {
  await this.client.withSession(async (session) =>
    session.withTransaction(async (session) => {
      const db = this.client.db(this.config.dbName, this.config.dbOptions);
      await cb({
        db: new MongoSinkTransactionDb(db, session, endCursor),
        session,
      });
    }),
  );
}
```
💡 Codebase verification
Based on the gathered information, I can now provide the final response:
MongoDB transaction retry logic should be implemented
The current implementation relies on MongoDB's default transaction behavior without explicit retry handling. While MongoDB has built-in retry mechanisms for transient errors, implementing application-level retry logic would provide better control and visibility over transaction failures. Consider wrapping the transaction in a try-catch block with a retry mechanism, similar to the retry pattern used in the indexer module.
- `packages/sink-mongo/src/mongo.ts`: Add retry logic around `session.withTransaction()` to handle transient failures
- Consider exposing retry configuration options through the `MongoSinkOptions` interface
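A sketch of such an application-level retry wrapper (the helper name, default attempt count, and `isTransient` predicate are assumptions, not part of the PR):

```typescript
// Retry a transactional callback a bounded number of times on transient errors.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  isTransient: (err: unknown) => boolean = () => true,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Give up immediately on non-transient errors or on the final attempt.
      if (!isTransient(err) || attempt === maxAttempts) throw err;
    }
  }
  throw lastError;
}
```

In the sink this could wrap the `session.withTransaction(...)` call, with `isTransient` checking for MongoDB's `TransientTransactionError` error label.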
🔗 Analysis chain
Verify transaction error handling.
Using session.withTransaction is clean, but consider handling retries if the transaction fails mid-operation. Otherwise, partial writes might occur before the transaction reverts.
Let me search for any retry-related configurations or error handling mechanisms in the MongoDB client setup.
Let me check if there are any MongoDB-specific error handling or transaction configurations in the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Searching for any existing error-handling or retry logic in the entire codebase
rg "withTransaction" -A 10 -B 5
```

Length of output: 3261

```shell
#!/bin/bash
# Search for MongoDB client configuration and initialization
ast-grep --pattern 'new MongoClient($$$)'
# Search for any retry-related configurations
rg -i "retry" -A 5 -B 5
```

Length of output: 3299

```shell
#!/bin/bash
# Search for MongoDB transaction options or configurations
rg "MongoClient|TransactionOptions" -A 5 -B 5
# Search for any error handling related to MongoDB transactions
ast-grep --pattern 'try {
  $$$
  withTransaction($$$)
  $$$
} catch'
```

Length of output: 2363
Force-pushed 5974cbd to 3b81140 (Compare)
No description provided.