Skip to content

Commit

Permalink
sink-mongo: add mongodb sink
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Dec 16, 2024
1 parent c294f4d commit 33a04f7
Show file tree
Hide file tree
Showing 10 changed files with 511 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "sink-mongo: add mongodb sink",
"packageName": "@apibara/sink-mongo",
"email": "[email protected]",
"dependentChangeType": "patch"
}
7 changes: 7 additions & 0 deletions packages/sink-mongo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# `@apibara/sink-mongo`

TODO

## Installation

TODO
11 changes: 11 additions & 0 deletions packages/sink-mongo/build.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { defineBuildConfig } from "unbuild";

export default defineBuildConfig({
entries: ["./src/index.ts"],
clean: true,
outDir: "./dist",
declaration: true,
rollup: {
emitCJS: true,
},
});
39 changes: 39 additions & 0 deletions packages/sink-mongo/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "@apibara/sink-mongo",
"version": "2.0.0-beta.26",
"type": "module",
"files": [
"dist",
"src",
"README.md"
],
"main": "./dist/index.mjs",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.mjs",
"require": "./dist/index.cjs",
"default": "./dist/index.mjs"
}
},
"scripts": {
"build": "unbuild",
"typecheck": "tsc --noEmit",
"lint": "biome check .",
"lint:fix": "pnpm lint --write"
},
"devDependencies": {
"@types/node": "^20.14.0",
"mongodb": "^6.12.0",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
"peerDependencies": {
"mongodb": "^6.12.0"
},
"dependencies": {
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*"
}
}
184 changes: 184 additions & 0 deletions packages/sink-mongo/src/collection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import type { Cursor } from "@apibara/protocol";
import type {
BulkWriteOptions,
ClientSession,
Collection,
Condition,
DeleteOptions,
Document,
Filter,
FindCursor,
FindOptions,
InsertManyResult,
InsertOneOptions,
InsertOneResult,
MatchKeysAndValues,
OptionalUnlessRequiredId,
UpdateFilter,
UpdateOptions,
UpdateResult,
WithId,
} from "mongodb";

export type MongoCursor = {
from: number | null;
to: number | null;
};

export type CursoredSchema<TSchema extends Document> = TSchema & {
_cursor: MongoCursor;
};

export class MongoSinkCollection<TSchema extends Document> {
constructor(
private session: ClientSession,
private collection: Collection<TSchema>,
private endCursor?: Cursor,
) {}

async insertOne(
doc: OptionalUnlessRequiredId<TSchema>,
options?: InsertOneOptions,
): Promise<InsertOneResult<TSchema>> {
return await this.collection.insertOne(
{
...doc,
_cursor: {
from: Number(this.endCursor?.orderKey),
to: null,
} as MongoCursor,
},
{ ...options, session: this.session },
);
}

async insertMany(
docs: ReadonlyArray<OptionalUnlessRequiredId<TSchema>>,
options?: BulkWriteOptions,
): Promise<InsertManyResult<TSchema>> {
return await this.collection.insertMany(
docs.map((doc) => ({
...doc,
_cursor: {
from: Number(this.endCursor?.orderKey),
to: null,
} as MongoCursor,
})),
{ ...options, session: this.session },
);
}

async updateOne(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: UpdateOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateOne(
{
...filter,
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{
...update,
$set: {
...update.$set,
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}

async updateMany(
filter: Filter<TSchema>,
update: UpdateFilter<TSchema>,
options?: UpdateOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateMany(
{
...filter,
_cursor: { to: null },
},
{
...update,
$set: {
...update.$set,
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}

async deleteOne(
filter?: Filter<TSchema>,
options?: DeleteOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateOne(
{
...((filter ?? {}) as Filter<TSchema>),
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{
$set: {
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}

async deleteMany(
filter?: Filter<TSchema>,
options?: DeleteOptions,
): Promise<UpdateResult<TSchema>> {
return await this.collection.updateMany(
{
...((filter ?? {}) as Filter<TSchema>),
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{
$set: {
"_cursor.to": Number(this.endCursor?.orderKey),
} as unknown as MatchKeysAndValues<TSchema>,
},
{ ...options, session: this.session },
);
}

async findOne(
filter: Filter<TSchema>,
options: Omit<FindOptions, "timeoutMode">,
): Promise<WithId<TSchema> | null> {
return await this.collection.findOne(
{
...filter,
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{ ...options, session: this.session },
);
}

find(
filter: Filter<TSchema>,
options?: FindOptions,
): FindCursor<WithId<TSchema>> {
return this.collection.find(
{
...filter,
_cursor: {
to: null,
} as Condition<MongoCursor | null>,
},
{ ...options, session: this.session },
);
}
}
3 changes: 3 additions & 0 deletions packages/sink-mongo/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./mongo";
export * from "./transaction";
export * from "./collection";
102 changes: 102 additions & 0 deletions packages/sink-mongo/src/mongo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { Sink, type SinkCursorParams } from "@apibara/indexer";
import type { Cursor } from "@apibara/protocol";
import type { ClientSession, DbOptions, MongoClient } from "mongodb";
import { MongoSinkTransactionDb } from "./transaction";

export interface MongoSinkOptions {
client: MongoClient;
dbName: string;
dbOptions?: DbOptions;
collections: string[];
}

export class MongoSink extends Sink {
constructor(
private client: MongoClient,
private config: Omit<MongoSinkOptions, "client">,
) {
super();
}

async transaction(
{ cursor, endCursor, finality }: SinkCursorParams,
cb: (params: {
db: MongoSinkTransactionDb;
session: ClientSession;
}) => Promise<void>,
): Promise<void> {
await this.client.withSession(async (session) =>
session.withTransaction(async (session) => {
const db = this.client.db(this.config.dbName, this.config.dbOptions);
await cb({
db: new MongoSinkTransactionDb(db, session, endCursor),
session,
});
return "Transaction committed.";
}),
);
}

async finalize(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;

await this.client.withSession(async (session) =>
session.withTransaction(async (session) => {
const db = this.client.db(this.config.dbName, this.config.dbOptions);
const orderKeyValue = Number(cursor.orderKey);

for (const collection of this.config.collections) {
// Delete documents where the upper bound of _cursor is less than the finalize cursor
await db.collection(collection).deleteMany(
{
"_cursor.to": { $lt: orderKeyValue },
},
{ session },
);
}
}),
);
}

async invalidate(cursor?: Cursor) {
if (cursor?.orderKey === undefined) return;

this.client.withSession(async (session) =>
session.withTransaction(async (session) => {
const db = this.client.db(this.config.dbName, this.config.dbOptions);
const orderKeyValue = Number(cursor.orderKey);
for (const collection of this.config.collections) {
// Delete documents where the lower bound of _cursor is greater than the invalidate cursor
await db.collection(collection).deleteMany(
{
"cursor.from": {
$gt: orderKeyValue,
},
},
{ session },
);

// Update documents where the upper bound of _cursor is greater than the invalidate cursor
await db.collection(collection).updateMany(
{ "_cursor.to": { $gt: orderKeyValue } },
{
$set: {
"_cursor.to": null,
},
},
{ session },
);
}
}),
);
}

async invalidateOnRestart(cursor?: Cursor) {
await this.invalidate(cursor);
}
}

export const mongo = (args: MongoSinkOptions) => {
const { client, ...rest } = args;
return new MongoSink(client, rest);
};
24 changes: 24 additions & 0 deletions packages/sink-mongo/src/transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { Cursor } from "@apibara/protocol";
import type { ClientSession, CollectionOptions, Db } from "mongodb";
import { MongoSinkCollection } from "./collection";

export class MongoSinkTransactionDb {
constructor(
private db: Db,
private session: ClientSession,
private endCursor?: Cursor,
) {}

collection<TSchema extends Document = Document>(
name: string,
options?: CollectionOptions,
) {
const collection = this.db.collection<TSchema>(name, options);

return new MongoSinkCollection<TSchema>(
this.session,
collection,
this.endCursor,
);
}
}
11 changes: 11 additions & 0 deletions packages/sink-mongo/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"declarationDir": "dist",
"noEmit": false,
"rootDir": "src",
"types": ["node"]
},
"include": ["src/"]
}
Loading

0 comments on commit 33a04f7

Please sign in to comment.