-
Notifications
You must be signed in to change notification settings - Fork 114
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(checkpoint-mongodb): apply filters correctly in list method
fixes #581
- Loading branch information
1 parent
cc6625c
commit b840061
Showing
12 changed files
with
563 additions
and
59 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
110 changes: 110 additions & 0 deletions
110
libs/checkpoint-mongodb/src/migrations/1_object_metadata.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import { Binary, ObjectId, Collection, Document, WithId } from "mongodb"; | ||
import { CheckpointMetadata } from "@langchain/langgraph-checkpoint"; | ||
import { Migration, MigrationParams } from "./base.js"; | ||
|
||
const BULK_WRITE_SIZE = 100; | ||
|
||
interface OldCheckpointDocument { | ||
parent_checkpoint_id: string | undefined; | ||
type: string; | ||
checkpoint: Binary; | ||
metadata: Binary; | ||
thread_id: string; | ||
checkpoint_ns: string | undefined; | ||
checkpoint_id: string; | ||
} | ||
|
||
interface NewCheckpointDocument { | ||
parent_checkpoint_id: string | undefined; | ||
type: string; | ||
checkpoint: Binary; | ||
metadata: CheckpointMetadata; | ||
thread_id: string; | ||
checkpoint_ns: string | undefined; | ||
checkpoint_id: string; | ||
} | ||
|
||
export class Migration1ObjectMetadata extends Migration { | ||
version = 1; | ||
|
||
constructor(params: MigrationParams) { | ||
super(params); | ||
} | ||
|
||
override async apply() { | ||
const db = this.client.db(this.dbName); | ||
const checkpointCollection = db.collection(this.checkpointCollectionName); | ||
const schemaVersionCollection = db.collection( | ||
this.schemaVersionCollectionName | ||
); | ||
|
||
// Fetch all documents from the checkpoints collection | ||
const cursor = checkpointCollection.find({}); | ||
|
||
let updateBatch: { | ||
id: string; | ||
newDoc: NewCheckpointDocument; | ||
}[] = []; | ||
|
||
for await (const doc of cursor) { | ||
// already migrated | ||
if (!(doc.metadata._bsontype && doc.metadata._bsontype === "Binary")) { | ||
continue; | ||
} | ||
|
||
const oldDoc = doc as WithId<OldCheckpointDocument>; | ||
|
||
const metadata: CheckpointMetadata = await this.serializer.loadsTyped( | ||
oldDoc.type, | ||
oldDoc.metadata.value() | ||
); | ||
|
||
const newDoc: NewCheckpointDocument = { | ||
...oldDoc, | ||
metadata, | ||
}; | ||
|
||
updateBatch.push({ | ||
id: doc._id.toString(), | ||
newDoc, | ||
}); | ||
|
||
if (updateBatch.length >= BULK_WRITE_SIZE) { | ||
await this.flushBatch(updateBatch, checkpointCollection); | ||
updateBatch = []; | ||
} | ||
} | ||
|
||
if (updateBatch.length > 0) { | ||
await this.flushBatch(updateBatch, checkpointCollection); | ||
} | ||
|
||
// Update schema version to 1 | ||
await schemaVersionCollection.updateOne( | ||
{}, | ||
{ $set: { version: 1 } }, | ||
{ upsert: true } | ||
); | ||
} | ||
|
||
private async flushBatch( | ||
updateBatch: { | ||
id: string; | ||
newDoc: NewCheckpointDocument; | ||
}[], | ||
checkpointCollection: Collection<Document> | ||
) { | ||
if (updateBatch.length === 0) { | ||
throw new Error("No updates to apply"); | ||
} | ||
|
||
const bulkOps = updateBatch.map(({ id, newDoc: newCheckpoint }) => ({ | ||
updateOne: { | ||
filter: { _id: new ObjectId(id) }, | ||
update: { $set: newCheckpoint }, | ||
}, | ||
})); | ||
|
||
await checkpointCollection.bulkWrite(bulkOps); | ||
} | ||
} |
Oops, something went wrong.