Skip to content

Commit

Permalink
plugin-mongo: port to middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek committed Dec 21, 2024
1 parent bf0ab99 commit b1cba90
Show file tree
Hide file tree
Showing 19 changed files with 428 additions and 439 deletions.
4 changes: 1 addition & 3 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ export interface IndexerHooks<TFilter, TBlock> {
request: StreamDataRequest<TFilter>;
options: StreamDataOptions;
}) => void;
"connect:after": ({
request,
}: { request: StreamDataRequest<TFilter> }) => void;
"connect:after": ({ request }: { request: StreamDataRequest<TFilter> }) => void;
"connect:factory": ({
request,
endCursor,
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
12 changes: 12 additions & 0 deletions packages/plugin-mongo/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
services:
mongo1:
image: mongo:7.0
command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
ports:
- 27017:27017
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --port 27017 --quiet
interval: 5s
timeout: 30s
start_period: 0s
retries: 30
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "@apibara/sink-mongo",
"name": "@apibara/plugin-mongo",
"version": "2.0.0-beta.27",
"type": "module",
"files": [
Expand Down
104 changes: 104 additions & 0 deletions packages/plugin-mongo/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { useIndexerContext } from "@apibara/indexer";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";
import type { DbOptions, MongoClient } from "mongodb";

import { finalize, invalidate } from "./mongo";
import { MongoStorage } from "./storage";
import { MongoStorageError, withTransaction } from "./utils";

export { MongoCollection, MongoStorage } from "./storage";

const MONGO_PROPERTY = "_mongo";

export function useMongoStorage(): MongoStorage {
const context = useIndexerContext();

if (!context[MONGO_PROPERTY]) {
throw new MongoStorageError(
"mongo storage is not available. Did you register the plugin?",
);
}

return context[MONGO_PROPERTY] as MongoStorage;
}

export interface MongoStorageOptions {
client: MongoClient;
dbName: string;
dbOptions?: DbOptions;
collections: string[];
persistState?: boolean;
}

export function mongoStorage<TFilter, TBlock>({
client,
dbName,
dbOptions,
collections,
persistState: enablePersistence = true,
}: MongoStorageOptions) {
return defineIndexerPlugin<TFilter, TBlock>((indexer) => {
indexer.hooks.hook("message:finalize", async ({ message }) => {
const { cursor } = message.finalize;

if (!cursor) {
throw new MongoStorageError("finalized cursor is undefined");
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await finalize(db, session, cursor, collections);
});
});

indexer.hooks.hook("message:invalidate", async ({ message }) => {
const { cursor } = message.invalidate;

if (!cursor) {
throw new MongoStorageError("invalidate cursor is undefined");
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await invalidate(db, session, cursor, collections);
});
});

indexer.hooks.hook("connect:after", async ({ request }) => {
// On restart, we need to invalidate data for blocks that were processed but not persisted.
const cursor = request.startingCursor;

if (!cursor) {
return;
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await invalidate(db, session, cursor, collections);
});
});

indexer.hooks.hook("handler:middleware", async ({ use }) => {
use(async (context, next) => {
const { endCursor } = context;

if (!endCursor) {
throw new MongoStorageError("end cursor is undefined");
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
context[MONGO_PROPERTY] = new MongoStorage(db, session, endCursor);

await next();

delete context[MONGO_PROPERTY];

if (enablePersistence) {
// TODO: persist state
}
});
});
});
});
}
51 changes: 51 additions & 0 deletions packages/plugin-mongo/src/mongo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import type { Cursor } from "@apibara/protocol";
import type { ClientSession, Db } from "mongodb";

export async function invalidate(
db: Db,
session: ClientSession,
cursor: Cursor,
collections: string[],
) {
const orderKeyValue = Number(cursor.orderKey);
for (const collection of collections) {
// Delete documents where the lower bound of _cursor is greater than the invalidate cursor
await db.collection(collection).deleteMany(
{
"_cursor.from": {
$gt: orderKeyValue,
},
},
{ session },
);

// Update documents where the upper bound of _cursor is greater than the invalidate cursor
await db.collection(collection).updateMany(
{ "_cursor.to": { $gt: orderKeyValue } },
{
$set: {
"_cursor.to": null,
},
},
{ session },
);
}
}

export async function finalize(
db: Db,
session: ClientSession,
cursor: Cursor,
collections: string[],
) {
const orderKeyValue = Number(cursor.orderKey);
for (const collection of collections) {
// Delete documents where the upper bound of _cursor is less than the finalize cursor
await db.collection(collection).deleteMany(
{
"_cursor.to": { $lt: orderKeyValue },
},
{ session },
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type {
BulkWriteOptions,
ClientSession,
Collection,
CollectionOptions,
Db,
DeleteOptions,
Document,
Filter,
Expand All @@ -20,6 +22,27 @@ import type {
WithId,
} from "mongodb";

export class MongoStorage {
constructor(
private db: Db,
private session: ClientSession,
private endCursor?: Cursor,
) {}

collection<TSchema extends Document = Document>(
name: string,
options?: CollectionOptions,
) {
const collection = this.db.collection<TSchema>(name, options);

return new MongoCollection<TSchema>(
this.session,
collection,
this.endCursor,
);
}
}

export type MongoCursor = {
from: number | null;
to: number | null;
Expand All @@ -29,7 +52,7 @@ export type CursoredSchema<TSchema extends Document> = TSchema & {
_cursor: MongoCursor;
};

export class MongoSinkCollection<TSchema extends Document> {
export class MongoCollection<TSchema extends Document> {
constructor(
private session: ClientSession,
private collection: Collection<TSchema>,
Expand Down
19 changes: 19 additions & 0 deletions packages/plugin-mongo/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { ClientSession, MongoClient } from "mongodb";

export class MongoStorageError extends Error {
constructor(message: string) {
super(message);
this.name = "MongoStorageError";
}
}

export async function withTransaction<T>(
client: MongoClient,
cb: (session: ClientSession) => Promise<T>,
) {
return await client.withSession(async (session) => {
return await session.withTransaction(async (session) => {
return await cb(session);
});
});
}
Loading

0 comments on commit b1cba90

Please sign in to comment.