Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mongo sink and some fixes for drizzle sink #127

Merged
merged 8 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "sink-mongo: add mongodb sink",
"packageName": "@apibara/sink-mongo",
"email": "[email protected]",
"dependentChangeType": "patch"
}
7 changes: 7 additions & 0 deletions packages/sink-mongo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# `@apibara/sink-mongo`

TODO

## Installation

TODO
11 changes: 11 additions & 0 deletions packages/sink-mongo/build.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { defineBuildConfig } from "unbuild";

export default defineBuildConfig({
entries: ["./src/index.ts"],
clean: true,
outDir: "./dist",
declaration: true,
rollup: {
emitCJS: true,
},
});
39 changes: 39 additions & 0 deletions packages/sink-mongo/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "@apibara/sink-mongo",
"version": "2.0.0-beta.26",
"type": "module",
"files": [
"dist",
"src",
"README.md"
],
"main": "./dist/index.mjs",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.mjs",
"require": "./dist/index.cjs",
"default": "./dist/index.mjs"
}
},
"scripts": {
"build": "unbuild",
"typecheck": "tsc --noEmit",
"lint": "biome check .",
"lint:fix": "pnpm lint --write"
},
"devDependencies": {
"@types/node": "^20.14.0",
"mongodb": "^6.12.0",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
"peerDependencies": {
"mongodb": "^6.12.0"
},
"dependencies": {
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*"
}
}
184 changes: 184 additions & 0 deletions packages/sink-mongo/src/collection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import type { Cursor } from "@apibara/protocol";
import type {
BulkWriteOptions,
ClientSession,
Collection,
Condition,
DeleteOptions,
Document,
Filter,
FindCursor,
FindOptions,
InsertManyResult,
InsertOneOptions,
InsertOneResult,
MatchKeysAndValues,
OptionalUnlessRequiredId,
UpdateFilter,
UpdateOptions,
UpdateResult,
WithId,
} from "mongodb";

export type MongoCursor = {
from: number | null;
to: number | null;
};

export type CursoredSchema<TSchema extends Document> = TSchema & {
_cursor: MongoCursor;
};

export class MongoSinkCollection<TSchema extends Document> {
constructor(
private session: ClientSession,
private collection: Collection<TSchema>,
private endCursor?: Cursor,
) {}

async insertOne(
doc: OptionalUnlessRequiredId<TSchema>,
options?: InsertOneOptions,
): Promise<InsertOneResult<TSchema>> {
return await this.collection.insertOne(
{
...doc,
_cursor: {
from: Number(this.endCursor?.orderKey),
to: null,
} as MongoCursor,
},
{ ...options, session: this.session },
);
}

async insertMany(
docs: ReadonlyArray<OptionalUnlessRequiredId<TSchema>>,
options?: BulkWriteOptions,
): Promise<InsertManyResult<TSchema>> {
return await this.collection.insertMany(
docs.map((doc) => ({
...doc,
_cursor: {
from: Number(this.endCursor?.orderKey),
to: null,
} as MongoCursor,
})),
{ ...options, session: this.session },
);
}

async updateOne(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: UpdateOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateOne(
{
...filter,
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{
...update,
$set: {
...update.$set,
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this one should also duplicate the previously existing value. The easiest way (double check to make sure I'm correct) is to use findOneAndUpdate asking to return the value before the update. Notice that in this case the update $set action should set the lower bound to the endCursor (since we are updating in place the old value to be the new value).

  • const updated = this.collection.findOneAndUpdate(..., ..., { returnDocument: "before" })
  • updated._cursor[to] = endCursor.orderKey
  • this.collection.insert(updated);

}

async updateMany(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: UpdateOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateMany(
{
...filter,
_cursor: { to: null },
},
{
...update,
$set: {
...update.$set,
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should:

  • select the old values with find
  • update to the new values with updateMany. Remember to change it to set the _cursor.from to endCursor and leaving _cursor.to unchanged.
  • adjust the cursor.to of the old values
  • insert the old values back into the db


async deleteOne(
filter?: Filter<TSchema>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the filter non nullable.

options?: DeleteOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateOne(
{
...((filter ?? {}) as Filter<TSchema>),
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{
$set: {
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}

async deleteMany(
filter?: Filter<TSchema>,
options?: DeleteOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateMany(
{
...((filter ?? {}) as Filter<TSchema>),
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{
$set: {
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}

async findOne(
filter: Filter<TSchema>,
options: Omit<FindOptions, "timeoutMode">,
): Promise<WithId<TSchema> | null> {
return await this.collection.findOne(
{
...filter,
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{ ...options, session: this.session },
);
}

find(
filter: Filter<TSchema>,
options?: FindOptions,
): FindCursor<WithId<TSchema>> {
return this.collection.find(
{
...filter,
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{ ...options, session: this.session },
);
}
}
3 changes: 3 additions & 0 deletions packages/sink-mongo/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./mongo";
export * from "./transaction";
export * from "./collection";
102 changes: 102 additions & 0 deletions packages/sink-mongo/src/mongo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { Sink, type SinkCursorParams } from "@apibara/indexer";
import type { Cursor } from "@apibara/protocol";
import type { ClientSession, DbOptions, MongoClient } from "mongodb";
import { MongoSinkTransactionDb } from "./transaction";

export interface MongoSinkOptions {
client: MongoClient;
dbName: string;
dbOptions?: DbOptions;
collections: string[];
}

export class MongoSink extends Sink {
constructor(
private client: MongoClient,
private config: Omit<MongoSinkOptions, "client">,
) {
super();
}

async transaction(
{ cursor, endCursor, finality }: SinkCursorParams,
cb: (params: {
db: MongoSinkTransactionDb;
session: ClientSession;
}) => Promise<void>,
): Promise<void> {
await this.client.withSession(async (session) =>
session.withTransaction(async (session) => {
const db = this.client.db(this.config.dbName, this.config.dbOptions);
await cb({
db: new MongoSinkTransactionDb(db, session, endCursor),
session,
});
return "Transaction committed.";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was just testing if adding this resolves some errors XD.
not required.

}),
);
}

async finalize(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;

await this.client.withSession(async (session) =>
session.withTransaction(async (session) => {
const db = this.client.db(this.config.dbName, this.config.dbOptions);
const orderKeyValue = Number(cursor.orderKey);

for (const collection of this.config.collections) {
// Delete documents where the upper bound of _cursor is less than the finalize cursor
await db.collection(collection).deleteMany(
{
"_cursor.to": { $lt: orderKeyValue },
},
{ session },
);
}
}),
);
}

async invalidate(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;

this.client.withSession(async (session) =>
session.withTransaction(async (session) => {
const db = this.client.db(this.config.dbName, this.config.dbOptions);
const orderKeyValue = Number(cursor.orderKey);
for (const collection of this.config.collections) {
// Delete documents where the lower bound of _cursor is greater than the invalidate cursor
await db.collection(collection).deleteMany(
{
"cursor.from": {
$gt: orderKeyValue,
},
},
{ session },
);

// Update documents where the upper bound of _cursor is greater than the invalidate cursor
await db.collection(collection).updateMany(
{ "_cursor.to": { $gt: orderKeyValue } },
{
$set: {
"_cursor.to": null,
},
},
{ session },
);
}
}),
);
}

async invalidateOnRestart(cursor?: Cursor) {
await this.invalidate(cursor);
}
}

export const mongo = (args: MongoSinkOptions) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should name it mongoSink or it's going to be annoying when we use it in a project.

const { client, ...rest } = args;
return new MongoSink(client, rest);
};
24 changes: 24 additions & 0 deletions packages/sink-mongo/src/transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { Cursor } from "@apibara/protocol";
import type { ClientSession, CollectionOptions, Db } from "mongodb";
import { MongoSinkCollection } from "./collection";

export class MongoSinkTransactionDb {
constructor(
private db: Db,
private session: ClientSession,
private endCursor?: Cursor,
) {}

collection<TSchema extends Document = Document>(
name: string,
options?: CollectionOptions,
) {
const collection = this.db.collection<TSchema>(name, options);

return new MongoSinkCollection<TSchema>(
this.session,
collection,
this.endCursor,
);
}
}
11 changes: 11 additions & 0 deletions packages/sink-mongo/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"declarationDir": "dist",
"noEmit": false,
"rootDir": "src",
"types": ["node"]
},
"include": ["src/"]
}
Loading
Loading