diff --git a/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json b/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json new file mode 100644 index 0000000..09bc3e7 --- /dev/null +++ b/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "sink-mongo: add mongodb sink", + "packageName": "@apibara/sink-mongo", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/packages/sink-mongo/README.md b/packages/sink-mongo/README.md new file mode 100644 index 0000000..c2f2690 --- /dev/null +++ b/packages/sink-mongo/README.md @@ -0,0 +1,7 @@ +# `@apibara/sink-mongo` + +TODO + +## Installation + +TODO diff --git a/packages/sink-mongo/build.config.ts b/packages/sink-mongo/build.config.ts new file mode 100644 index 0000000..9aaddef --- /dev/null +++ b/packages/sink-mongo/build.config.ts @@ -0,0 +1,11 @@ +import { defineBuildConfig } from "unbuild"; + +export default defineBuildConfig({ + entries: ["./src/index.ts"], + clean: true, + outDir: "./dist", + declaration: true, + rollup: { + emitCJS: true, + }, +}); diff --git a/packages/sink-mongo/package.json b/packages/sink-mongo/package.json new file mode 100644 index 0000000..d229b16 --- /dev/null +++ b/packages/sink-mongo/package.json @@ -0,0 +1,39 @@ +{ + "name": "@apibara/sink-mongo", + "version": "2.0.0-beta.26", + "type": "module", + "files": [ + "dist", + "src", + "README.md" + ], + "main": "./dist/index.mjs", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.mjs", + "require": "./dist/index.cjs", + "default": "./dist/index.mjs" + } + }, + "scripts": { + "build": "unbuild", + "typecheck": "tsc --noEmit", + "lint": "biome check .", + "lint:fix": "pnpm lint --write" + }, + "devDependencies": { + "@types/node": "^20.14.0", + "mongodb": "^6.12.0", + "unbuild": "^2.0.0", + "vitest": "^1.6.0" + }, + "peerDependencies": { + "mongodb": "^6.12.0" + }, + "dependencies": { + "@apibara/indexer": "workspace:*", + "@apibara/protocol": "workspace:*" + } +} diff --git a/packages/sink-mongo/src/collection.ts b/packages/sink-mongo/src/collection.ts new file mode 100644 index 0000000..78d3f22 --- /dev/null +++ b/packages/sink-mongo/src/collection.ts @@ -0,0 +1,184 @@ +import type { Cursor } from "@apibara/protocol"; +import type { + BulkWriteOptions, + ClientSession, + Collection, + Condition, + DeleteOptions, + Document, + Filter, + FindCursor, + FindOptions, + InsertManyResult, + InsertOneOptions, + InsertOneResult, + MatchKeysAndValues, + OptionalUnlessRequiredId, + UpdateFilter, + UpdateOptions, + UpdateResult, + WithId, +} from "mongodb"; + +export type MongoCursor = { + from: number | null; + to: number | null; +}; + +export type CursoredSchema = TSchema & { + _cursor: MongoCursor; +}; + +export class MongoSinkCollection { + constructor( + private session: ClientSession, + private collection: Collection, + private endCursor?: Cursor, + ) {} + + async insertOne( + doc: OptionalUnlessRequiredId, + options?: InsertOneOptions, + ): Promise> { + return await this.collection.insertOne( + { + ...doc, + _cursor: { + from: Number(this.endCursor?.orderKey), + to: null, + } as MongoCursor, + }, + { ...options, session: this.session }, + ); + } + + async insertMany( + docs: ReadonlyArray>, + options?: BulkWriteOptions, + ): Promise> { + return await this.collection.insertMany( + docs.map((doc) => ({ + ...doc, + _cursor: { + from: Number(this.endCursor?.orderKey), + to: null, + } as MongoCursor, + })), + { ...options, session: this.session }, + ); + } + + async updateOne( + filter: Filter, + update: UpdateFilter, + options?: UpdateOptions, + ): Promise> { + return await this.collection.updateOne( + { + ...filter, + _cursor: { + to: null, + } as Condition, + }, + { + ...update, + $set: { + ...update.$set, + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async updateMany( + filter: Filter, + update: UpdateFilter, + options?: UpdateOptions, + ): Promise> { + return await this.collection.updateMany( + { + ...filter, + _cursor: { to: null }, + }, + { + ...update, + $set: { + ...update.$set, + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async deleteOne( + filter?: Filter, + options?: DeleteOptions, + ): Promise> { + return await this.collection.updateOne( + { + ...((filter ?? {}) as Filter), + _cursor: { + to: null, + } as Condition, + }, + { + $set: { + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async deleteMany( + filter?: Filter, + options?: DeleteOptions, + ): Promise> { + return await this.collection.updateMany( + { + ...((filter ?? {}) as Filter), + _cursor: { + to: null, + } as Condition, + }, + { + $set: { + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async findOne( + filter: Filter, + options: Omit, + ): Promise | null> { + return await this.collection.findOne( + { + ...filter, + _cursor: { + to: null, + } as Condition, + }, + { ...options, session: this.session }, + ); + } + + find( + filter: Filter, + options?: FindOptions, + ): FindCursor> { + return this.collection.find( + { + ...filter, + _cursor: { + to: null, + } as Condition, + }, + { ...options, session: this.session }, + ); + } +} diff --git a/packages/sink-mongo/src/index.ts b/packages/sink-mongo/src/index.ts new file mode 100644 index 0000000..69f0b58 --- /dev/null +++ b/packages/sink-mongo/src/index.ts @@ -0,0 +1,3 @@ +export * from "./mongo"; +export * from "./transaction"; +export * from "./collection"; diff --git a/packages/sink-mongo/src/mongo.ts b/packages/sink-mongo/src/mongo.ts new file mode 100644 index 0000000..90221c6 --- /dev/null +++ b/packages/sink-mongo/src/mongo.ts @@ -0,0 +1,102 @@ +import { Sink, type SinkCursorParams } from "@apibara/indexer"; +import type { Cursor } from "@apibara/protocol"; +import type { ClientSession, DbOptions, MongoClient } from "mongodb"; +import { MongoSinkTransactionDb } from "./transaction"; + +export interface MongoSinkOptions { + client: MongoClient; + dbName: string; + dbOptions?: DbOptions; + collections: string[]; +} + +export class MongoSink extends Sink { + constructor( + private client: MongoClient, + private config: Omit, + ) { + super(); + } + + async transaction( + { cursor, endCursor, finality }: SinkCursorParams, + cb: (params: { + db: MongoSinkTransactionDb; + session: ClientSession; + }) => Promise, + ): Promise { + await this.client.withSession(async (session) => + session.withTransaction(async (session) => { + const db = this.client.db(this.config.dbName, this.config.dbOptions); + await cb({ + db: new MongoSinkTransactionDb(db, session, endCursor), + session, + }); + return "Transaction committed."; + }), + ); + } + + async finalize(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + + await this.client.withSession(async (session) => + session.withTransaction(async (session) => { + const db = this.client.db(this.config.dbName, this.config.dbOptions); + const orderKeyValue = Number(cursor.orderKey); + + for (const collection of this.config.collections) { + // Delete documents where the upper bound of _cursor is less than the finalize cursor + await db.collection(collection).deleteMany( + { + "_cursor.to": { $lt: orderKeyValue }, + }, + { session }, + ); + } + }), + ); + } + + async invalidate(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + + this.client.withSession(async (session) => + session.withTransaction(async (session) => { + const db = this.client.db(this.config.dbName, this.config.dbOptions); + const orderKeyValue = Number(cursor.orderKey); + for (const collection of this.config.collections) { + // Delete documents where the lower bound of _cursor is greater than the invalidate cursor + await db.collection(collection).deleteMany( + { + "cursor.from": { + $gt: orderKeyValue, + }, + }, + { session }, + ); + + // Update documents where the upper bound of _cursor is greater than the invalidate cursor + await db.collection(collection).updateMany( + { "_cursor.to": { $gt: orderKeyValue } }, + { + $set: { + "_cursor.to": null, + }, + }, + { session }, + ); + } + }), + ); + } + + async invalidateOnRestart(cursor?: Cursor) { + await this.invalidate(cursor); + } +} + +export const mongo = (args: MongoSinkOptions) => { + const { client, ...rest } = args; + return new MongoSink(client, rest); +}; diff --git a/packages/sink-mongo/src/transaction.ts b/packages/sink-mongo/src/transaction.ts new file mode 100644 index 0000000..a15ea0e --- /dev/null +++ b/packages/sink-mongo/src/transaction.ts @@ -0,0 +1,24 @@ +import type { Cursor } from "@apibara/protocol"; +import type { ClientSession, CollectionOptions, Db } from "mongodb"; +import { MongoSinkCollection } from "./collection"; + +export class MongoSinkTransactionDb { + constructor( + private db: Db, + private session: ClientSession, + private endCursor?: Cursor, + ) {} + + collection( + name: string, + options?: CollectionOptions, + ) { + const collection = this.db.collection(name, options); + + return new MongoSinkCollection( + this.session, + collection, + this.endCursor, + ); + } +} diff --git a/packages/sink-mongo/tsconfig.json b/packages/sink-mongo/tsconfig.json new file mode 100644 index 0000000..9d05974 --- /dev/null +++ b/packages/sink-mongo/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "declarationDir": "dist", + "noEmit": false, + "rootDir": "src", + "types": ["node"] + }, + "include": ["src/"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 42b75cf..1fe9a55 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -585,6 +585,28 @@ importers: specifier: ^1.6.0 version: 1.6.0(@types/node@20.12.13) + packages/sink-mongo: + dependencies: + '@apibara/indexer': + specifier: workspace:* + version: link:../indexer + '@apibara/protocol': + specifier: workspace:* + version: link:../protocol + devDependencies: + '@types/node': + specifier: ^20.14.0 + version: 20.14.0 + mongodb: + specifier: ^6.12.0 + version: 6.12.0 + unbuild: + specifier: ^2.0.0 + version: 2.0.0(typescript@5.6.2) + vitest: + specifier: ^1.6.0 + version: 1.6.0(@types/node@20.14.0) + packages/starknet: dependencies: '@apibara/protocol': @@ -1484,6 +1506,9 @@ packages: '@jridgewell/trace-mapping@0.3.25': resolution: {integrity: sha512-vNk6aEwybGtawWmy/PzwnGDOjCkLWSD2wqvjGGAgOAwCGWySYXfYoxt00IJkTF+8Lb57DwOb3Aa0o9CApepiYQ==} + '@mongodb-js/saslprep@1.1.9': + resolution: {integrity: sha512-tVkljjeEaAhCqTzajSdgbQ6gE6f3oneVwa3iXR6csiEwXXOFsiC6Uh9iAjAhXPtqa/XMDHWjjeNH/77m/Yq2dw==} + '@noble/curves@1.2.0': resolution: {integrity: sha512-oYclrNgRaM9SsBUBVbb8M6DTV7ZHRTKugureoYEncY5c65HOmRzvSiTE3y5CYaPYJA/GVkrhXEoF0M3Ya9PMnw==} @@ -1913,6 +1938,12 @@ packages: '@types/shimmer@1.0.5': resolution: {integrity: sha512-9Hp0ObzwwO57DpLFF0InUjUm/II8GmKAvzbefxQTihCb7KI6yc9yzf0nLc4mVdby5N4DRCgQM2wCup9KTieeww==} + '@types/webidl-conversions@7.0.3': + resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==} + + '@types/whatwg-url@11.0.5': + resolution: {integrity: sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==} + '@vitest/expect@1.6.0': resolution: {integrity: sha512-ixEvFVQjycy/oNgHjqsL6AZCDduC+tflRluaHIzKIsdbzkLn2U/iBnVeJwB6HsIjQBdfMR8Z0tRxKUsvFJEeWQ==} @@ -2064,6 +2095,10 @@ packages: engines: {node: ^6 || ^7 || ^8 || ^9 || ^10 || ^11 || ^12 || >=13.7} hasBin: true + bson@6.10.1: + resolution: {integrity: sha512-P92xmHDQjSKPLHqFxefqMxASNq/aWJMEZugpCjf+AF/pgcUpMMQCg7t7+ewko0/u8AapvF3luf/FoehddEK+sA==} + engines: {node: '>=16.20.1'} + buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} @@ -2825,6 +2860,9 @@ packages: mdn-data@2.0.30: resolution: {integrity: sha512-GaqWWShW4kv/G9IEucWScBx9G1/vsFZZJUO+tD26M8J8z3Kw5RDQjaoZe03YAClgeS/SWPOcb4nkFBTEi5DUEA==} + memory-pager@1.5.0: + resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==} + merge-stream@2.0.0: resolution: {integrity: sha512-abv/qOcuPfk3URPfDzmZU1LKmuw8kT+0nIHvKrKgFrwifol/doWcdA4ZqsWQ8ENrFKkd67Mfpo/LovbIUsbt3w==} @@ -2910,6 +2948,36 @@ packages: module-details-from-path@1.0.3: resolution: {integrity: sha512-ySViT69/76t8VhE1xXHK6Ch4NcDd26gx0MzKXLO+F7NOtnqH68d9zF94nT8ZWSxXh8ELOERsnJO/sWt1xZYw5A==} + mongodb-connection-string-url@3.0.1: + resolution: {integrity: sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==} + + mongodb@6.12.0: + resolution: {integrity: sha512-RM7AHlvYfS7jv7+BXund/kR64DryVI+cHbVAy9P61fnb1RcWZqOW1/Wj2YhqMCx+MuYhqTRGv7AwHBzmsCKBfA==} + engines: {node: '>=16.20.1'} + peerDependencies: + '@aws-sdk/credential-providers': ^3.188.0 + '@mongodb-js/zstd': ^1.1.0 || ^2.0.0 + gcp-metadata: ^5.2.0 + kerberos: ^2.0.1 + mongodb-client-encryption: '>=6.0.0 <7' + snappy: ^7.2.2 + socks: ^2.7.1 + peerDependenciesMeta: + '@aws-sdk/credential-providers': + optional: true + '@mongodb-js/zstd': + optional: true + gcp-metadata: + optional: true + kerberos: + optional: true + mongodb-client-encryption: + optional: true + snappy: + optional: true + socks: + optional: true + mri@1.2.0: resolution: {integrity: sha512-tzzskb3bG8LvYGFF/mDTpq3jpI6Q9wc3LEmBaghu+DdCssd1FakN7Bc0hVNmEyGq1bq3RgfkCb3cmQLpNPOroA==} engines: {node: '>=4'} @@ -3524,6 +3592,9 @@ packages: resolution: {integrity: sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==} engines: {node: '>=0.10.0'} + sparse-bitfield@3.0.3: + resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==} + split2@4.2.0: resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==} engines: {node: '>= 10.x'} @@ -3633,6 +3704,10 @@ packages: tr46@0.0.3: resolution: {integrity: sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==} + tr46@4.1.1: + resolution: {integrity: sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==} + engines: {node: '>=14'} + ts-error@1.0.6: resolution: {integrity: sha512-tLJxacIQUM82IR7JO1UUkKlYuUTmoY9HBJAmNWFzheSlDS5SPMcNIepejHJa4BpPQLAcbRhRf3GDJzyj6rbKvA==} @@ -3859,6 +3934,10 @@ packages: webidl-conversions@3.0.1: resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==} + webidl-conversions@7.0.0: + resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==} + engines: {node: '>=12'} + webpack-sources@3.2.3: resolution: {integrity: sha512-/DyMEOrDgLKKIG0fmvtz+4dUX/3Ghozwgm6iPp8KRhvn+eQf9+Q7GWxVNMk3+uCPWfdXYC4ExGBckIXdFEfH1w==} engines: {node: '>=10.13.0'} @@ -3869,6 +3948,10 @@ packages: whatwg-fetch@3.6.20: resolution: {integrity: sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg==} + whatwg-url@13.0.0: + resolution: {integrity: sha512-9WWbymnqj57+XEuqADHrCJ2eSXzn8WXIW/YSGaZtb2WKAInQ6CHfaUUcTyyver0p8BDg5StLQq8h1vtZuwmOig==} + engines: {node: '>=16'} + whatwg-url@5.0.0: resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==} @@ -4531,6 +4614,10 @@ snapshots: '@jridgewell/resolve-uri': 3.1.1 '@jridgewell/sourcemap-codec': 1.4.15 + '@mongodb-js/saslprep@1.1.9': + dependencies: + sparse-bitfield: 3.0.3 + '@noble/curves@1.2.0': dependencies: '@noble/hashes': 1.3.2 @@ -4975,6 +5062,12 @@ snapshots: '@types/shimmer@1.0.5': {} + '@types/webidl-conversions@7.0.3': {} + + '@types/whatwg-url@11.0.5': + dependencies: + '@types/webidl-conversions': 7.0.3 + '@vitest/expect@1.6.0': dependencies: '@vitest/spy': 1.6.0 @@ -5132,6 +5225,8 @@ snapshots: node-releases: 2.0.14 update-browserslist-db: 1.0.16(browserslist@4.23.0) + bson@6.10.1: {} + buffer-from@1.1.2: {} buffer@5.7.1: @@ -5914,6 +6009,8 @@ snapshots: mdn-data@2.0.30: {} + memory-pager@1.5.0: {} + merge-stream@2.0.0: {} merge2@1.4.1: {} @@ -5996,6 +6093,17 @@ snapshots: module-details-from-path@1.0.3: {} + mongodb-connection-string-url@3.0.1: + dependencies: + '@types/whatwg-url': 11.0.5 + whatwg-url: 13.0.0 + + mongodb@6.12.0: + dependencies: + '@mongodb-js/saslprep': 1.1.9 + bson: 6.10.1 + mongodb-connection-string-url: 3.0.1 + mri@1.2.0: {} ms@2.1.2: {} @@ -6608,6 +6716,10 @@ snapshots: source-map@0.6.1: {} + sparse-bitfield@3.0.3: + dependencies: + memory-pager: 1.5.0 + split2@4.2.0: {} sqlite@5.1.1: {} @@ -6737,6 +6849,10 @@ snapshots: tr46@0.0.3: {} + tr46@4.1.1: + dependencies: + punycode: 2.3.1 + ts-error@1.0.6: {} ts-mixer@6.0.4: {} @@ -7132,12 +7248,19 @@ snapshots: webidl-conversions@3.0.1: {} + webidl-conversions@7.0.0: {} + webpack-sources@3.2.3: {} webpack-virtual-modules@0.6.1: {} whatwg-fetch@3.6.20: {} + whatwg-url@13.0.0: + dependencies: + tr46: 4.1.1 + webidl-conversions: 7.0.0 + whatwg-url@5.0.0: dependencies: tr46: 0.0.3