From 33a04f7afbf8d3c10029a8e36a2c1612096a1105 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Mon, 16 Dec 2024 15:56:51 +0530 Subject: [PATCH 1/8] sink-mongo: add mongodb sink --- ...-0e59869f-bf56-417c-b432-622a2c1bb1da.json | 7 + packages/sink-mongo/README.md | 7 + packages/sink-mongo/build.config.ts | 11 ++ packages/sink-mongo/package.json | 39 ++++ packages/sink-mongo/src/collection.ts | 184 ++++++++++++++++++ packages/sink-mongo/src/index.ts | 3 + packages/sink-mongo/src/mongo.ts | 102 ++++++++++ packages/sink-mongo/src/transaction.ts | 24 +++ packages/sink-mongo/tsconfig.json | 11 ++ pnpm-lock.yaml | 123 ++++++++++++ 10 files changed, 511 insertions(+) create mode 100644 change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json create mode 100644 packages/sink-mongo/README.md create mode 100644 packages/sink-mongo/build.config.ts create mode 100644 packages/sink-mongo/package.json create mode 100644 packages/sink-mongo/src/collection.ts create mode 100644 packages/sink-mongo/src/index.ts create mode 100644 packages/sink-mongo/src/mongo.ts create mode 100644 packages/sink-mongo/src/transaction.ts create mode 100644 packages/sink-mongo/tsconfig.json diff --git a/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json b/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json new file mode 100644 index 0000000..09bc3e7 --- /dev/null +++ b/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "sink-mongo: add mongodb sink", + "packageName": "@apibara/sink-mongo", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/packages/sink-mongo/README.md b/packages/sink-mongo/README.md new file mode 100644 index 0000000..c2f2690 --- /dev/null +++ b/packages/sink-mongo/README.md @@ -0,0 +1,7 @@ +# `@apibara/sink-mongo` + +TODO + +## Installation + +TODO diff --git a/packages/sink-mongo/build.config.ts b/packages/sink-mongo/build.config.ts new file mode 100644 index 0000000..9aaddef --- /dev/null +++ b/packages/sink-mongo/build.config.ts @@ -0,0 +1,11 @@ +import { defineBuildConfig } from "unbuild"; + +export default defineBuildConfig({ + entries: ["./src/index.ts"], + clean: true, + outDir: "./dist", + declaration: true, + rollup: { + emitCJS: true, + }, +}); diff --git a/packages/sink-mongo/package.json b/packages/sink-mongo/package.json new file mode 100644 index 0000000..d229b16 --- /dev/null +++ b/packages/sink-mongo/package.json @@ -0,0 +1,39 @@ +{ + "name": "@apibara/sink-mongo", + "version": "2.0.0-beta.26", + "type": "module", + "files": [ + "dist", + "src", + "README.md" + ], + "main": "./dist/index.mjs", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.mjs", + "require": "./dist/index.cjs", + "default": "./dist/index.mjs" + } + }, + "scripts": { + "build": "unbuild", + "typecheck": "tsc --noEmit", + "lint": "biome check .", + "lint:fix": "pnpm lint --write" + }, + "devDependencies": { + "@types/node": "^20.14.0", + "mongodb": "^6.12.0", + "unbuild": "^2.0.0", + "vitest": "^1.6.0" + }, + "peerDependencies": { + "mongodb": "^6.12.0" + }, + "dependencies": { + "@apibara/indexer": "workspace:*", + "@apibara/protocol": "workspace:*" + } +} diff --git a/packages/sink-mongo/src/collection.ts b/packages/sink-mongo/src/collection.ts new file mode 100644 index 0000000..78d3f22 --- /dev/null +++ b/packages/sink-mongo/src/collection.ts @@ -0,0 +1,184 @@ +import type { Cursor } from "@apibara/protocol"; +import type { + BulkWriteOptions, + ClientSession, + Collection, + Condition, + DeleteOptions, + Document, + Filter, + FindCursor, + FindOptions, + InsertManyResult, + InsertOneOptions, + InsertOneResult, + MatchKeysAndValues, + OptionalUnlessRequiredId, + UpdateFilter, + UpdateOptions, + UpdateResult, + WithId, +} from "mongodb"; + +export type MongoCursor = { + from: number | null; + to: number | null; +}; + +export type CursoredSchema = TSchema & { + _cursor: MongoCursor; +}; + +export class MongoSinkCollection { + constructor( + private session: ClientSession, + private collection: Collection, + private endCursor?: Cursor, + ) {} + + async insertOne( + doc: OptionalUnlessRequiredId, + options?: InsertOneOptions, + ): Promise> { + return await this.collection.insertOne( + { + ...doc, + _cursor: { + from: Number(this.endCursor?.orderKey), + to: null, + } as MongoCursor, + }, + { ...options, session: this.session }, + ); + } + + async insertMany( + docs: ReadonlyArray>, + options?: BulkWriteOptions, + ): Promise> { + return await this.collection.insertMany( + docs.map((doc) => ({ + ...doc, + _cursor: { + from: Number(this.endCursor?.orderKey), + to: null, + } as MongoCursor, + })), + { ...options, session: this.session }, + ); + } + + async updateOne( + filter: Filter, + update: UpdateFilter, + options?: UpdateOptions, + ): Promise> { + return await this.collection.updateOne( + { + ...filter, + _cursor: { + to: null, + } as Condition, + }, + { + ...update, + $set: { + ...update.$set, + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async updateMany( + filter: Filter, + update: UpdateFilter, + options?: UpdateOptions, + ): Promise> { + return await this.collection.updateMany( + { + ...filter, + _cursor: { to: null }, + }, + { + ...update, + $set: { + ...update.$set, + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async deleteOne( + filter?: Filter, + options?: DeleteOptions, + ): Promise> { + return await this.collection.updateOne( + { + ...((filter ?? {}) as Filter), + _cursor: { + to: null, + } as Condition, + }, + { + $set: { + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async deleteMany( + filter?: Filter, + options?: DeleteOptions, + ): Promise> { + return await this.collection.updateMany( + { + ...((filter ?? {}) as Filter), + _cursor: { + to: null, + } as Condition, + }, + { + $set: { + "_cursor.to": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + } + + async findOne( + filter: Filter, + options: Omit, + ): Promise | null> { + return await this.collection.findOne( + { + ...filter, + _cursor: { + to: null, + } as Condition, + }, + { ...options, session: this.session }, + ); + } + + find( + filter: Filter, + options?: FindOptions, + ): FindCursor> { + return this.collection.find( + { + ...filter, + _cursor: { + to: null, + } as Condition, + }, + { ...options, session: this.session }, + ); + } +} diff --git a/packages/sink-mongo/src/index.ts b/packages/sink-mongo/src/index.ts new file mode 100644 index 0000000..69f0b58 --- /dev/null +++ b/packages/sink-mongo/src/index.ts @@ -0,0 +1,3 @@ +export * from "./mongo"; +export * from "./transaction"; +export * from "./collection"; diff --git a/packages/sink-mongo/src/mongo.ts b/packages/sink-mongo/src/mongo.ts new file mode 100644 index 0000000..90221c6 --- /dev/null +++ b/packages/sink-mongo/src/mongo.ts @@ -0,0 +1,102 @@ +import { Sink, type SinkCursorParams } from "@apibara/indexer"; +import type { Cursor } from "@apibara/protocol"; +import type { ClientSession, DbOptions, MongoClient } from "mongodb"; +import { MongoSinkTransactionDb } from "./transaction"; + +export interface MongoSinkOptions { + client: MongoClient; + dbName: string; + dbOptions?: DbOptions; + collections: string[]; +} + +export class MongoSink extends Sink { + constructor( + private client: MongoClient, + private config: Omit, + ) { + super(); + } + + async transaction( + { cursor, endCursor, finality }: SinkCursorParams, + cb: (params: { + db: MongoSinkTransactionDb; + session: ClientSession; + }) => Promise, + ): Promise { + await this.client.withSession(async (session) => + session.withTransaction(async (session) => { + const db = this.client.db(this.config.dbName, this.config.dbOptions); + await cb({ + db: new MongoSinkTransactionDb(db, session, endCursor), + session, + }); + return "Transaction committed."; + }), + ); + } + + async finalize(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + + await this.client.withSession(async (session) => + session.withTransaction(async (session) => { + const db = this.client.db(this.config.dbName, this.config.dbOptions); + const orderKeyValue = Number(cursor.orderKey); + + for (const collection of this.config.collections) { + // Delete documents where the upper bound of _cursor is less than the finalize cursor + await db.collection(collection).deleteMany( + { + "_cursor.to": { $lt: orderKeyValue }, + }, + { session }, + ); + } + }), + ); + } + + async invalidate(cursor?: Cursor) { + if (cursor?.orderKey === undefined) return; + + this.client.withSession(async (session) => + session.withTransaction(async (session) => { + const db = this.client.db(this.config.dbName, this.config.dbOptions); + const orderKeyValue = Number(cursor.orderKey); + for (const collection of this.config.collections) { + // Delete documents where the lower bound of _cursor is greater than the invalidate cursor + await db.collection(collection).deleteMany( + { + "cursor.from": { + $gt: orderKeyValue, + }, + }, + { session }, + ); + + // Update documents where the upper bound of _cursor is greater than the invalidate cursor + await db.collection(collection).updateMany( + { "_cursor.to": { $gt: orderKeyValue } }, + { + $set: { + "_cursor.to": null, + }, + }, + { session }, + ); + } + }), + ); + } + + async invalidateOnRestart(cursor?: Cursor) { + await this.invalidate(cursor); + } +} + +export const mongo = (args: MongoSinkOptions) => { + const { client, ...rest } = args; + return new MongoSink(client, rest); +}; diff --git a/packages/sink-mongo/src/transaction.ts b/packages/sink-mongo/src/transaction.ts new file mode 100644 index 0000000..a15ea0e --- /dev/null +++ b/packages/sink-mongo/src/transaction.ts @@ -0,0 +1,24 @@ +import type { Cursor } from "@apibara/protocol"; +import type { ClientSession, CollectionOptions, Db } from "mongodb"; +import { MongoSinkCollection } from "./collection"; + +export class MongoSinkTransactionDb { + constructor( + private db: Db, + private session: ClientSession, + private endCursor?: Cursor, + ) {} + + collection( + name: string, + options?: CollectionOptions, + ) { + const collection = this.db.collection(name, options); + + return new MongoSinkCollection( + this.session, + collection, + this.endCursor, + ); + } +} diff --git a/packages/sink-mongo/tsconfig.json b/packages/sink-mongo/tsconfig.json new file mode 100644 index 0000000..9d05974 --- /dev/null +++ b/packages/sink-mongo/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "declarationDir": "dist", + "noEmit": false, + "rootDir": "src", + "types": ["node"] + }, + "include": ["src/"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 42b75cf..1fe9a55 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -585,6 +585,28 @@ importers: specifier: ^1.6.0 version: 1.6.0(@types/node@20.12.13) + packages/sink-mongo: + dependencies: + '@apibara/indexer': + specifier: workspace:* + version: link:../indexer + '@apibara/protocol': + specifier: workspace:* + version: link:../protocol + devDependencies: + '@types/node': + specifier: ^20.14.0 + version: 20.14.0 + mongodb: + specifier: ^6.12.0 + version: 6.12.0 + unbuild: + specifier: ^2.0.0 + version: 2.0.0(typescript@5.6.2) + vitest: + specifier: ^1.6.0 + version: 1.6.0(@types/node@20.14.0) + packages/starknet: dependencies: '@apibara/protocol': @@ -1484,6 +1506,9 @@ packages: '@jridgewell/trace-mapping@0.3.25': resolution: {integrity: sha512-vNk6aEwybGtawWmy/PzwnGDOjCkLWSD2wqvjGGAgOAwCGWySYXfYoxt00IJkTF+8Lb57DwOb3Aa0o9CApepiYQ==} + '@mongodb-js/saslprep@1.1.9': + resolution: {integrity: sha512-tVkljjeEaAhCqTzajSdgbQ6gE6f3oneVwa3iXR6csiEwXXOFsiC6Uh9iAjAhXPtqa/XMDHWjjeNH/77m/Yq2dw==} + '@noble/curves@1.2.0': resolution: {integrity: sha512-oYclrNgRaM9SsBUBVbb8M6DTV7ZHRTKugureoYEncY5c65HOmRzvSiTE3y5CYaPYJA/GVkrhXEoF0M3Ya9PMnw==} @@ -1913,6 +1938,12 @@ packages: '@types/shimmer@1.0.5': resolution: {integrity: sha512-9Hp0ObzwwO57DpLFF0InUjUm/II8GmKAvzbefxQTihCb7KI6yc9yzf0nLc4mVdby5N4DRCgQM2wCup9KTieeww==} + '@types/webidl-conversions@7.0.3': + resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==} + + '@types/whatwg-url@11.0.5': + resolution: {integrity: sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==} + '@vitest/expect@1.6.0': resolution: {integrity: sha512-ixEvFVQjycy/oNgHjqsL6AZCDduC+tflRluaHIzKIsdbzkLn2U/iBnVeJwB6HsIjQBdfMR8Z0tRxKUsvFJEeWQ==} @@ -2064,6 +2095,10 @@ packages: engines: {node: ^6 || ^7 || ^8 || ^9 || ^10 || ^11 || ^12 || >=13.7} hasBin: true + bson@6.10.1: + resolution: {integrity: sha512-P92xmHDQjSKPLHqFxefqMxASNq/aWJMEZugpCjf+AF/pgcUpMMQCg7t7+ewko0/u8AapvF3luf/FoehddEK+sA==} + engines: {node: '>=16.20.1'} + buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} @@ -2825,6 +2860,9 @@ packages: mdn-data@2.0.30: resolution: {integrity: sha512-GaqWWShW4kv/G9IEucWScBx9G1/vsFZZJUO+tD26M8J8z3Kw5RDQjaoZe03YAClgeS/SWPOcb4nkFBTEi5DUEA==} + memory-pager@1.5.0: + resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==} + merge-stream@2.0.0: resolution: {integrity: sha512-abv/qOcuPfk3URPfDzmZU1LKmuw8kT+0nIHvKrKgFrwifol/doWcdA4ZqsWQ8ENrFKkd67Mfpo/LovbIUsbt3w==} @@ -2910,6 +2948,36 @@ packages: module-details-from-path@1.0.3: resolution: {integrity: sha512-ySViT69/76t8VhE1xXHK6Ch4NcDd26gx0MzKXLO+F7NOtnqH68d9zF94nT8ZWSxXh8ELOERsnJO/sWt1xZYw5A==} + mongodb-connection-string-url@3.0.1: + resolution: {integrity: sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==} + + mongodb@6.12.0: + resolution: {integrity: sha512-RM7AHlvYfS7jv7+BXund/kR64DryVI+cHbVAy9P61fnb1RcWZqOW1/Wj2YhqMCx+MuYhqTRGv7AwHBzmsCKBfA==} + engines: {node: '>=16.20.1'} + peerDependencies: + '@aws-sdk/credential-providers': ^3.188.0 + '@mongodb-js/zstd': ^1.1.0 || ^2.0.0 + gcp-metadata: ^5.2.0 + kerberos: ^2.0.1 + mongodb-client-encryption: '>=6.0.0 <7' + snappy: ^7.2.2 + socks: ^2.7.1 + peerDependenciesMeta: + '@aws-sdk/credential-providers': + optional: true + '@mongodb-js/zstd': + optional: true + gcp-metadata: + optional: true + kerberos: + optional: true + mongodb-client-encryption: + optional: true + snappy: + optional: true + socks: + optional: true + mri@1.2.0: resolution: {integrity: sha512-tzzskb3bG8LvYGFF/mDTpq3jpI6Q9wc3LEmBaghu+DdCssd1FakN7Bc0hVNmEyGq1bq3RgfkCb3cmQLpNPOroA==} engines: {node: '>=4'} @@ -3524,6 +3592,9 @@ packages: resolution: {integrity: sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==} engines: {node: '>=0.10.0'} + sparse-bitfield@3.0.3: + resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==} + split2@4.2.0: resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==} engines: {node: '>= 10.x'} @@ -3633,6 +3704,10 @@ packages: tr46@0.0.3: resolution: {integrity: sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==} + tr46@4.1.1: + resolution: {integrity: sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==} + engines: {node: '>=14'} + ts-error@1.0.6: resolution: {integrity: sha512-tLJxacIQUM82IR7JO1UUkKlYuUTmoY9HBJAmNWFzheSlDS5SPMcNIepejHJa4BpPQLAcbRhRf3GDJzyj6rbKvA==} @@ -3859,6 +3934,10 @@ packages: webidl-conversions@3.0.1: resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==} + webidl-conversions@7.0.0: + resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==} + engines: {node: '>=12'} + webpack-sources@3.2.3: resolution: {integrity: sha512-/DyMEOrDgLKKIG0fmvtz+4dUX/3Ghozwgm6iPp8KRhvn+eQf9+Q7GWxVNMk3+uCPWfdXYC4ExGBckIXdFEfH1w==} engines: {node: '>=10.13.0'} @@ -3869,6 +3948,10 @@ packages: whatwg-fetch@3.6.20: resolution: {integrity: sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg==} + whatwg-url@13.0.0: + resolution: {integrity: sha512-9WWbymnqj57+XEuqADHrCJ2eSXzn8WXIW/YSGaZtb2WKAInQ6CHfaUUcTyyver0p8BDg5StLQq8h1vtZuwmOig==} + engines: {node: '>=16'} + whatwg-url@5.0.0: resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==} @@ -4531,6 +4614,10 @@ snapshots: '@jridgewell/resolve-uri': 3.1.1 '@jridgewell/sourcemap-codec': 1.4.15 + '@mongodb-js/saslprep@1.1.9': + dependencies: + sparse-bitfield: 3.0.3 + '@noble/curves@1.2.0': dependencies: '@noble/hashes': 1.3.2 @@ -4975,6 +5062,12 @@ snapshots: '@types/shimmer@1.0.5': {} + '@types/webidl-conversions@7.0.3': {} + + '@types/whatwg-url@11.0.5': + dependencies: + '@types/webidl-conversions': 7.0.3 + '@vitest/expect@1.6.0': dependencies: '@vitest/spy': 1.6.0 @@ -5132,6 +5225,8 @@ snapshots: node-releases: 2.0.14 update-browserslist-db: 1.0.16(browserslist@4.23.0) + bson@6.10.1: {} + buffer-from@1.1.2: {} buffer@5.7.1: @@ -5914,6 +6009,8 @@ snapshots: mdn-data@2.0.30: {} + memory-pager@1.5.0: {} + merge-stream@2.0.0: {} merge2@1.4.1: {} @@ -5996,6 +6093,17 @@ snapshots: module-details-from-path@1.0.3: {} + mongodb-connection-string-url@3.0.1: + dependencies: + '@types/whatwg-url': 11.0.5 + whatwg-url: 13.0.0 + + mongodb@6.12.0: + dependencies: + '@mongodb-js/saslprep': 1.1.9 + bson: 6.10.1 + mongodb-connection-string-url: 3.0.1 + mri@1.2.0: {} ms@2.1.2: {} @@ -6608,6 +6716,10 @@ snapshots: source-map@0.6.1: {} + sparse-bitfield@3.0.3: + dependencies: + memory-pager: 1.5.0 + split2@4.2.0: {} sqlite@5.1.1: {} @@ -6737,6 +6849,10 @@ snapshots: tr46@0.0.3: {} + tr46@4.1.1: + dependencies: + punycode: 2.3.1 + ts-error@1.0.6: {} ts-mixer@6.0.4: {} @@ -7132,12 +7248,19 @@ snapshots: webidl-conversions@3.0.1: {} + webidl-conversions@7.0.0: {} + webpack-sources@3.2.3: {} webpack-virtual-modules@0.6.1: {} whatwg-fetch@3.6.20: {} + whatwg-url@13.0.0: + dependencies: + tr46: 4.1.1 + webidl-conversions: 7.0.0 + whatwg-url@5.0.0: dependencies: tr46: 0.0.3 From 436e6cdfca650bdab0745ddede3e631ee2ed50b9 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Wed, 18 Dec 2024 20:17:21 +0530 Subject: [PATCH 2/8] sink-mongo: update queries in mongo transaction --- packages/sink-mongo/src/collection.ts | 76 ++++++++++++++++++++++++--- packages/sink-mongo/src/mongo.ts | 2 +- 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/packages/sink-mongo/src/collection.ts b/packages/sink-mongo/src/collection.ts index 78d3f22..31bbf43 100644 --- a/packages/sink-mongo/src/collection.ts +++ b/packages/sink-mongo/src/collection.ts @@ -8,6 +8,7 @@ import type { Document, Filter, FindCursor, + FindOneAndUpdateOptions, FindOptions, InsertManyResult, InsertOneOptions, @@ -73,7 +74,8 @@ export class MongoSinkCollection { update: UpdateFilter, options?: UpdateOptions, ): Promise> { - return await this.collection.updateOne( + // 1. Find and update the document, getting the old version + const oldDoc = await this.collection.findOneAndUpdate( { ...filter, _cursor: { @@ -84,11 +86,39 @@ export class MongoSinkCollection { ...update, $set: { ...update.$set, - "_cursor.to": Number(this.endCursor?.orderKey), + "_cursor.from": Number(this.endCursor?.orderKey), } as unknown as MatchKeysAndValues, }, - { ...options, session: this.session }, + { + ...options, + session: this.session, + returnDocument: "before", + } as FindOneAndUpdateOptions, ); + + // 2. If we found and updated a document, insert its old version + if (oldDoc) { + const { _id, ...doc } = oldDoc; + await this.collection.insertOne( + { + ...doc, + _cursor: { + ...oldDoc._cursor, + to: Number(this.endCursor?.orderKey), + }, + } as unknown as OptionalUnlessRequiredId, + { session: this.session }, + ); + } + + // 3. Return an UpdateResult-compatible object + return { + acknowledged: true, + modifiedCount: oldDoc ? 1 : 0, + upsertedId: null, + upsertedCount: 0, + matchedCount: oldDoc ? 1 : 0, + }; } async updateMany( @@ -96,7 +126,20 @@ export class MongoSinkCollection { update: UpdateFilter, options?: UpdateOptions, ): Promise> { - return await this.collection.updateMany( + // 1. Find all documents matching the filter that are latest (to: null) + const oldDocs = await this.collection + .find( + { + ...filter, + _cursor: { to: null }, + }, + { session: this.session }, + ) + .toArray(); + + // 2. Update to the new values with updateMany + // (setting _cursor.from to endCursor, leaving _cursor.to unchanged) + const updateResult = await this.collection.updateMany( { ...filter, _cursor: { to: null }, @@ -105,20 +148,39 @@ export class MongoSinkCollection { ...update, $set: { ...update.$set, - "_cursor.to": Number(this.endCursor?.orderKey), + "_cursor.from": Number(this.endCursor?.orderKey), } as unknown as MatchKeysAndValues, }, { ...options, session: this.session }, ); + + // 3. Adjust the cursor.to of the old values + const oldDocsWithUpdatedCursor = oldDocs.map(({ _id, ...doc }) => ({ + ...doc, + _cursor: { + ...doc._cursor, + to: Number(this.endCursor?.orderKey), + }, + })); + + // 4. Insert the old values back into the db + if (oldDocsWithUpdatedCursor.length > 0) { + await this.collection.insertMany( + oldDocsWithUpdatedCursor as unknown as OptionalUnlessRequiredId[], + { session: this.session }, + ); + } + + return updateResult; } async deleteOne( - filter?: Filter, + filter: Filter, options?: DeleteOptions, ): Promise> { return await this.collection.updateOne( { - ...((filter ?? {}) as Filter), + ...filter, _cursor: { to: null, } as Condition, diff --git a/packages/sink-mongo/src/mongo.ts b/packages/sink-mongo/src/mongo.ts index 90221c6..e56d922 100644 --- a/packages/sink-mongo/src/mongo.ts +++ b/packages/sink-mongo/src/mongo.ts @@ -96,7 +96,7 @@ export class MongoSink extends Sink { } } -export const mongo = (args: MongoSinkOptions) => { +export const mongoSink = (args: MongoSinkOptions) => { const { client, ...rest } = args; return new MongoSink(client, rest); }; From 456cb67a2ca5fd53bbbd8095ada67053a6fa011d Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Thu, 19 Dec 2024 20:12:24 +0530 Subject: [PATCH 3/8] sink-mongo: fix logical issues in mongo sink methods --- packages/sink-mongo/src/collection.ts | 27 ++++++++------------------ packages/sink-mongo/src/mongo.ts | 9 +++++++-- packages/sink-mongo/src/transaction.ts | 2 +- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/packages/sink-mongo/src/collection.ts b/packages/sink-mongo/src/collection.ts index 31bbf43..f0dc396 100644 --- a/packages/sink-mongo/src/collection.ts +++ b/packages/sink-mongo/src/collection.ts @@ -3,7 +3,6 @@ import type { BulkWriteOptions, ClientSession, Collection, - Condition, DeleteOptions, Document, Filter, @@ -78,9 +77,7 @@ export class MongoSinkCollection { const oldDoc = await this.collection.findOneAndUpdate( { ...filter, - _cursor: { - to: null, - } as Condition, + "_cursor.to": null, }, { ...update, @@ -131,7 +128,7 @@ export class MongoSinkCollection { .find( { ...filter, - _cursor: { to: null }, + "_cursor.to": null, }, { session: this.session }, ) @@ -142,7 +139,7 @@ export class MongoSinkCollection { const updateResult = await this.collection.updateMany( { ...filter, - _cursor: { to: null }, + "_cursor.to": null, }, { ...update, @@ -181,9 +178,7 @@ export class MongoSinkCollection { return await this.collection.updateOne( { ...filter, - _cursor: { - to: null, - } as Condition, + "_cursor.to": null, }, { $set: { @@ -201,9 +196,7 @@ export class MongoSinkCollection { return await this.collection.updateMany( { ...((filter ?? {}) as Filter), - _cursor: { - to: null, - } as Condition, + "_cursor.to": null, }, { $set: { @@ -216,14 +209,12 @@ export class MongoSinkCollection { async findOne( filter: Filter, - options: Omit, + options?: Omit, ): Promise | null> { return await this.collection.findOne( { ...filter, - _cursor: { - to: null, - } as Condition, + "_cursor.to": null, }, { ...options, session: this.session }, ); @@ -236,9 +227,7 @@ export class MongoSinkCollection { return this.collection.find( { ...filter, - _cursor: { - to: null, - } as Condition, + "_cursor.to": null, }, { ...options, session: this.session }, ); diff --git a/packages/sink-mongo/src/mongo.ts b/packages/sink-mongo/src/mongo.ts index e56d922..a5c563f 100644 --- a/packages/sink-mongo/src/mongo.ts +++ b/packages/sink-mongo/src/mongo.ts @@ -54,6 +54,8 @@ export class MongoSink extends Sink { { session }, ); } + + return "Transaction committed."; }), ); } @@ -61,15 +63,16 @@ export class MongoSink extends Sink { async invalidate(cursor?: Cursor) { if (cursor?.orderKey === undefined) return; - this.client.withSession(async (session) => + await this.client.withSession(async (session) => session.withTransaction(async (session) => { const db = this.client.db(this.config.dbName, this.config.dbOptions); const orderKeyValue = Number(cursor.orderKey); + for (const collection of this.config.collections) { // Delete documents where the lower bound of _cursor is greater than the invalidate cursor await db.collection(collection).deleteMany( { - "cursor.from": { + "_cursor.from": { $gt: orderKeyValue, }, }, @@ -87,6 +90,8 @@ export class MongoSink extends Sink { { session }, ); } + + return "Transaction committed."; }), ); } diff --git a/packages/sink-mongo/src/transaction.ts b/packages/sink-mongo/src/transaction.ts index a15ea0e..e385fb7 100644 --- a/packages/sink-mongo/src/transaction.ts +++ b/packages/sink-mongo/src/transaction.ts @@ -1,5 +1,5 @@ import type { Cursor } from "@apibara/protocol"; -import type { ClientSession, CollectionOptions, Db } from "mongodb"; +import type { ClientSession, CollectionOptions, Db, Document } from "mongodb"; import { MongoSinkCollection } from "./collection"; export class MongoSinkTransactionDb { From d600561e710100eb32b2a82d9343eb39405997a6 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Thu, 19 Dec 2024 20:14:09 +0530 Subject: [PATCH 4/8] sink-mongo: add test cases for mongo sink --- .../sink-mongo/docker-compose.orbstack.yaml | 26 ++ packages/sink-mongo/docker-compose.yaml | 26 ++ packages/sink-mongo/package.json | 3 +- packages/sink-mongo/src/mongo.test.ts | 279 ++++++++++++++++++ 4 files changed, 333 insertions(+), 1 deletion(-) create mode 100644 packages/sink-mongo/docker-compose.orbstack.yaml create mode 100644 packages/sink-mongo/docker-compose.yaml create mode 100644 packages/sink-mongo/src/mongo.test.ts diff --git a/packages/sink-mongo/docker-compose.orbstack.yaml b/packages/sink-mongo/docker-compose.orbstack.yaml new file mode 100644 index 0000000..0fd930e --- /dev/null +++ b/packages/sink-mongo/docker-compose.orbstack.yaml @@ -0,0 +1,26 @@ +# Reference: https://medium.com/workleap/the-only-local-mongodb-replica-set-with-docker-compose-guide-youll-ever-need-2f0b74dd8384 + +version: "3.8" + +services: + mongo1: + image: mongo:7.0 + command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"] + ports: + - 27017:27017 + extra_hosts: + - "localhost:host-gateway" + healthcheck: + test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --port 27017 --quiet + interval: 5s + timeout: 30s + start_period: 0s + start_interval: 1s + retries: 30 + volumes: + - "mongo1_data:/data/db" + - "mongo1_config:/data/configdb" + +volumes: + mongo1_data: + mongo1_config: diff --git a/packages/sink-mongo/docker-compose.yaml b/packages/sink-mongo/docker-compose.yaml new file mode 100644 index 0000000..d2e07aa --- /dev/null +++ b/packages/sink-mongo/docker-compose.yaml @@ -0,0 +1,26 @@ +# Reference: https://medium.com/workleap/the-only-local-mongodb-replica-set-with-docker-compose-guide-youll-ever-need-2f0b74dd8384 + +version: "3.8" + +services: + mongo1: + image: mongo:7.0 + command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"] + ports: + - 27017:27017 + extra_hosts: + - "host.docker.internal:host-gateway" + healthcheck: + test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'host.docker.internal:27017'}]}) }" | mongosh --port 27017 --quiet + interval: 5s + timeout: 30s + start_period: 0s + start_interval: 1s + retries: 30 + volumes: + - "mongo1_data:/data/db" + - "mongo1_config:/data/configdb" + +volumes: + mongo1_data: + mongo1_config: diff --git a/packages/sink-mongo/package.json b/packages/sink-mongo/package.json index d229b16..062d81a 100644 --- a/packages/sink-mongo/package.json +++ b/packages/sink-mongo/package.json @@ -21,7 +21,8 @@ "build": "unbuild", "typecheck": "tsc --noEmit", "lint": "biome check .", - "lint:fix": "pnpm lint --write" + "lint:fix": "pnpm lint --write", + "test": "vitest" }, "devDependencies": { "@types/node": "^20.14.0", diff --git a/packages/sink-mongo/src/mongo.test.ts b/packages/sink-mongo/src/mongo.test.ts new file mode 100644 index 0000000..21555ce --- /dev/null +++ b/packages/sink-mongo/src/mongo.test.ts @@ -0,0 +1,279 @@ +import { run, useSink } from "@apibara/indexer"; +import { + generateMockMessages, + getMockIndexer, +} from "@apibara/indexer/internal"; +import type { Cursor } from "@apibara/protocol"; +import { + type MockBlock, + MockClient, + type MockFilter, +} from "@apibara/protocol/testing"; +import { MongoClient, type WithId } from "mongodb"; +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { mongoSink } from "./mongo"; + +const TEST_COLLECTION = "test_collection"; +const mongoClient = new MongoClient( + "mongodb://localhost:27017/?replicaSet=rs0", +); + +interface Schema { + blockNumber: number; + data: string | undefined; + _cursor?: { + from: number; + to: number | null; + }; +} + +const db = mongoClient.db("test"); + +describe("MongoDB Sink Test", () => { + beforeAll(async () => { + await mongoClient.connect(); + }); + + beforeEach(async () => { + await db.collection(TEST_COLLECTION).deleteMany({}); + }); + + it("should insert data", async () => { + const client = new MockClient((request, options) => { + return generateMockMessages(5); + }); + + const sink = mongoSink({ + client: mongoClient, + dbName: "test", + collections: [TEST_COLLECTION], + }); + + const indexer = getMockIndexer({ + sink, + override: { + transform: async ({ context, endCursor, block: { data } }) => { + const { db } = useSink({ context }); + await db.collection(TEST_COLLECTION).insertOne({ + blockNumber: Number(endCursor?.orderKey), + data, + }); + }, + }, + }); + + await run(client, indexer); + + const result = await db + .collection(TEST_COLLECTION) + .find({}, { sort: { blockNumber: 1 } }) + .toArray(); + + expect(result).toHaveLength(5); + expect(result[0].data).toBe("5000000"); + expect(result[2].data).toBe("5000002"); + }); + + it("should update data", async () => { + const client = new MockClient((request, options) => { + return generateMockMessages(5); + }); + + const sink = mongoSink({ + client: mongoClient, + dbName: "test", + collections: [TEST_COLLECTION], + }); + + const indexer = getMockIndexer({ + sink, + override: { + transform: async ({ context, endCursor, block: { data } }) => { + const { db } = useSink({ context }); + + // insert data for each message in db + await db.collection(TEST_COLLECTION).insertOne({ + blockNumber: Number(endCursor?.orderKey), + data, + }); + + // update data for id 5000002 when orderKey is 5000004 + // this is to test if the update query is working + if (endCursor?.orderKey === 5000004n) { + // Find the document and update it, creating a new version + await db + .collection(TEST_COLLECTION) + .updateOne( + { blockNumber: 5000002 }, + { $set: { data: "0000000" } }, + ); + } + }, + }, + }); + + await run(client, indexer); + + const result = await db + .collection(TEST_COLLECTION) + .find() + .sort({ blockNumber: 1 }) + .toArray(); + + expect(result).toHaveLength(6); + expect( + result.find((r) => r.blockNumber === 5000002 && r._cursor?.to === null) + ?.data, + ).toBe("0000000"); + }); + + it("should handle soft deletes", async () => { + const client = new MockClient((request, options) => { + return generateMockMessages(5); + }); + + const sink = mongoSink({ + client: mongoClient, + dbName: "test", + collections: [TEST_COLLECTION], + }); + + const indexer = getMockIndexer({ + sink, + override: { + transform: async ({ context, endCursor, block: { data } }) => { + const { db } = useSink({ context }); + + // insert data for each message in db + await db.collection(TEST_COLLECTION).insertOne({ + blockNumber: Number(endCursor?.orderKey), + data, + }); + + // delete data for id 5000002 when orderKey is 5000004 + // this is to test if the delete query is working + if (endCursor?.orderKey === 5000004n) { + await db + .collection(TEST_COLLECTION) + .deleteOne({ blockNumber: 5000002 }); + } + }, + }, + }); + + await run(client, indexer); + + const result = await db + .collection(TEST_COLLECTION) + .find() + .sort({ blockNumber: 1 }) + .toArray(); + + expect(result).toHaveLength(5); + + // as when you run delete query on a data, it isnt literally deleted from the db, + // instead, we just update the upper bound of that row to the current cursor + // check if the cursor upper bound has been set correctly + expect(result.find((r) => r.blockNumber === 5000002)?._cursor?.to).toBe( + 5000004, + ); + }); + + it("should select data", async () => { + const client = new MockClient((request, options) => { + return generateMockMessages(5); + }); + + const sink = mongoSink({ + client: mongoClient, + dbName: "test", + collections: [TEST_COLLECTION], + }); + + let result: WithId[] = []; + + const indexer = getMockIndexer({ + sink, + override: { + transform: async ({ context, endCursor, block: { data } }) => { + const { db } = useSink({ context }); + + // insert data for each message in db + await db.collection(TEST_COLLECTION).insertOne({ + blockNumber: Number(endCursor?.orderKey), + data, + }); + + // delete data for id 5000002 when orderKey is 5000004 + // this will update the upper bound of the row with id 5000002 from null to 5000004 + // so when we select all rows, row with id 5000002 will not be included + // as when we run select query it should only return rows with upper bound null + if (endCursor?.orderKey === 5000003n) { + await db + .collection(TEST_COLLECTION) + .deleteOne({ blockNumber: 5000002 }); + } + + // when on last message of mock stream, select all rows from db + if (endCursor?.orderKey === 5000004n) { + result = await db + .collection(TEST_COLLECTION) + .find({}) + .sort({ blockNumber: 1 }) + .toArray(); + } + }, + }, + }); + + await run(client, indexer); + + expect(result).toHaveLength(4); + expect(result.find((r) => r.blockNumber === 5000002)).toBeUndefined(); + // check if all rows are still in db + const allRows = await db + .collection(TEST_COLLECTION) + .find() + .toArray(); + expect(allRows).toHaveLength(5); + }); + + it("should invalidate data correctly", async () => { + const sink = mongoSink({ + client: mongoClient, + dbName: "test", + collections: [TEST_COLLECTION], + }); + + // Insert test data + await db.collection(TEST_COLLECTION).insertMany([ + { blockNumber: 1, data: "data1", _cursor: { from: 1, to: 5 } }, + { blockNumber: 2, data: "data2", _cursor: { from: 2, to: 5 } }, + { blockNumber: 3, data: "data3", _cursor: { from: 3, to: null } }, + { blockNumber: 4, data: "data4", _cursor: { from: 4, to: null } }, + { blockNumber: 5, data: "data5", _cursor: { from: 5, to: null } }, + ]); + + // Create a cursor at position 3 + const cursor: Cursor = { orderKey: 3n }; + + // Invalidate data + await sink.invalidate(cursor); + + // Check the results + const result = await db + .collection(TEST_COLLECTION) + .find() + .sort({ blockNumber: 1 }) + .toArray(); + + expect(result).toHaveLength(3); + expect(result[0]._cursor?.to).toBe(null); + expect(result[1]._cursor?.to).toBe(null); + expect(result[2]._cursor?.to).toBe(null); + }); + + afterAll(async () => { + await mongoClient.close(); + }); +}); From 01e37cf456467b54d1e7d86d0c619368ca7a44d9 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Thu, 19 Dec 2024 20:15:39 +0530 Subject: [PATCH 5/8] indexer: rename drizzle sink factory from drizzle to drizzleSink --- packages/indexer/build.config.ts | 1 + packages/indexer/docker-compose.yaml | 10 ++++++++++ packages/indexer/package.json | 6 ++++++ packages/indexer/src/sinks/drizzle/drizzle.ts | 2 +- 4 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 packages/indexer/docker-compose.yaml diff --git a/packages/indexer/build.config.ts b/packages/indexer/build.config.ts index 964198d..01fb35c 100644 --- a/packages/indexer/build.config.ts +++ b/packages/indexer/build.config.ts @@ -13,6 +13,7 @@ export default defineBuildConfig({ "./src/plugins/logger.ts", "./src/plugins/persistence.ts", "./src/plugins/drizzle-persistence.ts", + "./src/internal/testing.ts", ], clean: true, outDir: "./dist", diff --git a/packages/indexer/docker-compose.yaml b/packages/indexer/docker-compose.yaml new file mode 100644 index 0000000..5246aca --- /dev/null +++ b/packages/indexer/docker-compose.yaml @@ -0,0 +1,10 @@ +version: "3.8" + +services: + postgres: + image: postgres:16 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 diff --git a/packages/indexer/package.json b/packages/indexer/package.json index 5bebbbe..435fbd0 100644 --- a/packages/indexer/package.json +++ b/packages/indexer/package.json @@ -75,6 +75,12 @@ "import": "./dist/plugins/drizzle-persistence.mjs", "require": "./dist/plugins/drizzle-persistence.cjs", "default": "./dist/plugins/drizzle-persistence.mjs" + }, + "./internal": { + "types": "./dist/internal/testing.d.ts", + "import": "./dist/internal/testing.mjs", + "require": "./dist/internal/testing.cjs", + "default": "./dist/internal/testing.mjs" } }, "scripts": { diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts index 03e1597..79cc8bf 100644 --- a/packages/indexer/src/sinks/drizzle/drizzle.ts +++ b/packages/indexer/src/sinks/drizzle/drizzle.ts @@ -121,7 +121,7 @@ export class DrizzleSink< } } -export const drizzle = < +export const drizzleSink = < TQueryResult extends PgQueryResultHKT, TFullSchema extends Record = Record, TSchema extends From bf9c528e9b0bb1ffc48e9c64f134b63b091e5bfb Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Thu, 19 Dec 2024 20:16:06 +0530 Subject: [PATCH 6/8] indexer: fix logical issue in update method of drizzle sink --- .../indexer/src/sinks/drizzle/drizzle.test.ts | 13 ++++--- packages/indexer/src/sinks/drizzle/update.ts | 35 +++++++++++++++---- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/packages/indexer/src/sinks/drizzle/drizzle.test.ts b/packages/indexer/src/sinks/drizzle/drizzle.test.ts index 39a8acb..f2c2ff6 100644 --- a/packages/indexer/src/sinks/drizzle/drizzle.test.ts +++ b/packages/indexer/src/sinks/drizzle/drizzle.test.ts @@ -13,7 +13,7 @@ import { run } from "../../indexer"; import { generateMockMessages, getMockIndexer } from "../../internal/testing"; import { useSink } from "../../sink"; import type { Int8Range } from "./Int8Range"; -import { drizzle as drizzleSink } from "./drizzle"; +import { drizzleSink } from "./drizzle"; import { getDrizzleCursor, pgIndexerTable } from "./utils"; const testTable = pgIndexerTable("test_table", { @@ -35,7 +35,7 @@ describe("Drizzle Test", () => { await db.execute(sql`DROP TABLE IF EXISTS test_table`); // create test_table with db await db.execute( - sql`CREATE TABLE test_table (id SERIAL PRIMARY KEY, data TEXT, _cursor INT8RANGE)`, + sql`CREATE TABLE test_table (id SERIAL, data TEXT, _cursor INT8RANGE)`, ); }); @@ -108,11 +108,14 @@ describe("Drizzle Test", () => { const result = await db.select().from(testTable).orderBy(asc(testTable.id)); - expect(result).toHaveLength(5); - expect(result[2].data).toBe("0000000"); + expect(result).toHaveLength(6); + expect( + result.find((r) => r.id === 5000002 && r._cursor?.range.upper === null) + ?.data, + ).toBe("0000000"); }); - it("should delete data", async () => { + it("should soft delete data", async () => { const client = new MockClient((request, options) => { return generateMockMessages(5); }); diff --git a/packages/indexer/src/sinks/drizzle/update.ts b/packages/indexer/src/sinks/drizzle/update.ts index 24537fc..3cea116 100644 --- a/packages/indexer/src/sinks/drizzle/update.ts +++ b/packages/indexer/src/sinks/drizzle/update.ts @@ -12,6 +12,8 @@ import type { PgUpdateBase, PgUpdateSetSource, } from "drizzle-orm/pg-core"; +import type { Int8Range } from "./Int8Range"; +import { getDrizzleCursor } from "./utils"; export class DrizzleSinkUpdate< TTable extends PgTable, @@ -32,15 +34,36 @@ export class DrizzleSinkUpdate< return { ...originalSet, where: async (where: SQL | undefined) => { - await this.db - .update(this.table) - .set({ - _cursor: sql`int8range(lower(_cursor), ${Number(this.endCursor?.orderKey!)}, '[)')`, - } as PgUpdateSetSource) + // 1. Find and store old versions of matching records + const oldRecords = await this.db + .select() + .from(this.table) .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`) .execute(); - return originalSet.where(where); + // 2. Insert old versions with updated upperbound cursor + if (oldRecords.length > 0) { + const oldRecordsWithNewCursor = oldRecords.map((record) => ({ + ...record, + _cursor: getDrizzleCursor([ + BigInt((record._cursor as Int8Range).range.lower!), + this.endCursor?.orderKey, + ]), + })); + + await this.db + .insert(this.table) + .values(oldRecordsWithNewCursor) + .execute(); + } + + // 3. Update matching records with new values and new 'lowerbound' cursor + return originalUpdate + .set({ + ...values, + _cursor: sql`int8range(${Number(this.endCursor?.orderKey!)}, NULL, '[)')`, + } as PgUpdateSetSource) + .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`); }, } as PgUpdateBase; } From 7d040adca67e622ce6bf3f5dd777f61df9fac5e1 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Thu, 19 Dec 2024 20:16:36 +0530 Subject: [PATCH 7/8] examples: fix drizzle sink import --- examples/cli/indexers/1-evm.indexer.ts | 2 +- examples/cli/indexers/2-starknet.indexer.ts | 2 +- examples/indexer/src/indexer.ts | 2 +- examples/starknet-indexer/src/indexer.ts | 5 +---- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/examples/cli/indexers/1-evm.indexer.ts b/examples/cli/indexers/1-evm.indexer.ts index 19042c0..9d06460 100644 --- a/examples/cli/indexers/1-evm.indexer.ts +++ b/examples/cli/indexers/1-evm.indexer.ts @@ -2,7 +2,7 @@ import { EvmStream } from "@apibara/evm"; import { defineIndexer, useSink } from "@apibara/indexer"; import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence"; import { useLogger } from "@apibara/indexer/plugins/logger"; -import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle"; +import { drizzleSink } from "@apibara/indexer/sinks/drizzle"; import type { ApibaraRuntimeConfig } from "apibara/types"; import type { diff --git a/examples/cli/indexers/2-starknet.indexer.ts b/examples/cli/indexers/2-starknet.indexer.ts index b6f052d..28dc385 100644 --- a/examples/cli/indexers/2-starknet.indexer.ts +++ b/examples/cli/indexers/2-starknet.indexer.ts @@ -1,7 +1,7 @@ import { defineIndexer, useSink } from "@apibara/indexer"; import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence"; import { useLogger } from "@apibara/indexer/plugins/logger"; -import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle"; +import { drizzleSink } from "@apibara/indexer/sinks/drizzle"; import { StarknetStream } from "@apibara/starknet"; import type { ApibaraRuntimeConfig } from "apibara/types"; diff --git a/examples/indexer/src/indexer.ts b/examples/indexer/src/indexer.ts index 274c894..1ac23b4 100644 --- a/examples/indexer/src/indexer.ts +++ b/examples/indexer/src/indexer.ts @@ -1,6 +1,6 @@ import { EvmStream } from "@apibara/evm"; import { defineIndexer, useSink } from "@apibara/indexer"; -import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle"; +import { drizzleSink } from "@apibara/indexer/sinks/drizzle"; import consola from "consola"; import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core"; import { drizzle } from "drizzle-orm/postgres-js"; diff --git a/examples/starknet-indexer/src/indexer.ts b/examples/starknet-indexer/src/indexer.ts index f1ebc77..f48a0c3 100644 --- a/examples/starknet-indexer/src/indexer.ts +++ b/examples/starknet-indexer/src/indexer.ts @@ -1,8 +1,5 @@ import { defineIndexer, useSink } from "@apibara/indexer"; -import { - drizzle as drizzleSink, - pgIndexerTable, -} from "@apibara/indexer/sinks/drizzle"; +import { drizzleSink, pgIndexerTable } from "@apibara/indexer/sinks/drizzle"; import { StarknetStream } from "@apibara/starknet"; import consola from "consola"; import { bigint } from "drizzle-orm/pg-core"; From 3b81140a933c3bf5b21304fe3551644abd0c345c Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 20 Dec 2024 01:00:30 +0530 Subject: [PATCH 8/8] actions: add mongodb service --- .github/workflows/build.yml | 6 ++++++ ...ibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json | 7 +++++++ packages/sink-mongo/package.json | 3 ++- packages/sink-mongo/src/mongo.ts | 5 ----- 4 files changed, 15 insertions(+), 6 deletions(-) create mode 100644 change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3575284..14bc762 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,6 +33,12 @@ jobs: uses: arduino/setup-protoc@v1 with: repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Start MongoDB + uses: supercharge/mongodb-github-action@1.11.0 + with: + mongodb-version: '7.0' + mongodb-replica-set: rs0 + mongodb-port: 27017 - name: Install dependencies run: pnpm install --strict-peer-dependencies=false - name: Run lint diff --git a/change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json b/change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json new file mode 100644 index 0000000..c865174 --- /dev/null +++ b/change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "indexer: rename drizzle sink factory and fix update method", + "packageName": "@apibara/indexer", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/packages/sink-mongo/package.json b/packages/sink-mongo/package.json index 062d81a..eed2f22 100644 --- a/packages/sink-mongo/package.json +++ b/packages/sink-mongo/package.json @@ -22,7 +22,8 @@ "typecheck": "tsc --noEmit", "lint": "biome check .", "lint:fix": "pnpm lint --write", - "test": "vitest" + "test": "vitest", + "test:ci": "vitest run" }, "devDependencies": { "@types/node": "^20.14.0", diff --git a/packages/sink-mongo/src/mongo.ts b/packages/sink-mongo/src/mongo.ts index a5c563f..4a56dda 100644 --- a/packages/sink-mongo/src/mongo.ts +++ b/packages/sink-mongo/src/mongo.ts @@ -32,7 +32,6 @@ export class MongoSink extends Sink { db: new MongoSinkTransactionDb(db, session, endCursor), session, }); - return "Transaction committed."; }), ); } @@ -54,8 +53,6 @@ export class MongoSink extends Sink { { session }, ); } - - return "Transaction committed."; }), ); } @@ -90,8 +87,6 @@ export class MongoSink extends Sink { { session }, ); } - - return "Transaction committed."; }), ); }