From 1b701f29468ca665f4959781ead46719eed002af Mon Sep 17 00:00:00 2001 From: Jacob Lee Date: Tue, 20 Aug 2024 09:57:57 -0700 Subject: [PATCH] Refactor to use PregelLoop (#330) * Refactor checkpoint * Fix lint and format * Fix tests, fix UUIDv5 issue * Fix build * Refactor _applyWrites * Finish initial loop implementation * Fix build * Continued refactor * Fix test --- langgraph/package.json | 2 + langgraph/src/channels/base.ts | 9 + langgraph/src/checkpoint/base.ts | 36 +- langgraph/src/checkpoint/index.ts | 2 +- langgraph/src/checkpoint/memory.ts | 13 +- langgraph/src/checkpoint/sqlite.ts | 301 ++++++----- langgraph/src/checkpoint/types.ts | 32 ++ langgraph/src/constants.ts | 14 +- langgraph/src/errors.ts | 22 + langgraph/src/graph/graph.ts | 9 +- langgraph/src/graph/state.ts | 10 +- langgraph/src/pregel/algo.ts | 213 +++++--- langgraph/src/pregel/debug.ts | 202 ++++++- langgraph/src/pregel/index.ts | 685 ++++++++++-------------- langgraph/src/pregel/io.ts | 65 +-- langgraph/src/pregel/loop.ts | 496 +++++++++++++++++ langgraph/src/pregel/types.ts | 86 ++- langgraph/src/pregel/utils.ts | 38 ++ langgraph/src/pregel/write.ts | 5 +- langgraph/src/tests/checkpoints.test.ts | 2 + langgraph/src/tests/pregel.test.ts | 358 +++++++++---- langgraph/src/tests/tracing.test.ts | 133 +++-- langgraph/src/tests/utils.ts | 12 +- langgraph/src/utils.ts | 46 ++ langgraph/src/web.ts | 2 +- yarn.lock | 16 + 26 files changed, 1911 insertions(+), 898 deletions(-) create mode 100644 langgraph/src/checkpoint/types.ts create mode 100644 langgraph/src/pregel/loop.ts create mode 100644 langgraph/src/pregel/utils.ts diff --git a/langgraph/package.json b/langgraph/package.json index 011568c73..667b4e5d8 100644 --- a/langgraph/package.json +++ b/langgraph/package.json @@ -31,6 +31,7 @@ "license": "MIT", "dependencies": { "@langchain/core": ">=0.2.20 <0.3.0", + "double-ended-queue": "^2.1.0-0", "uuid": "^10.0.0", "zod": "^3.23.8" }, @@ -44,6 +45,7 @@ "@swc/jest": "^0.2.29", "@tsconfig/recommended": "^1.0.3", "@types/better-sqlite3": "^7.6.9", + "@types/double-ended-queue": "^2", "@types/uuid": "^10", "@typescript-eslint/eslint-plugin": "^6.12.0", "@typescript-eslint/parser": "^6.12.0", diff --git a/langgraph/src/channels/base.ts b/langgraph/src/channels/base.ts index 2822bd4c9..3406dadf4 100644 --- a/langgraph/src/channels/base.ts +++ b/langgraph/src/channels/base.ts @@ -52,6 +52,15 @@ export abstract class BaseChannel< * @returns {CheckpointType | undefined} */ abstract checkpoint(): CheckpointType | undefined; + + /** + * Mark the current value of the channel as consumed. By default, no-op. + * This is called by Pregel before the start of the next step, for all + * channels that triggered a node. If the channel was updated, return true. 
+ */ + consume(): boolean { + return true; + } } export function emptyChannels>( diff --git a/langgraph/src/checkpoint/base.ts b/langgraph/src/checkpoint/base.ts index a7fffe48f..3f8f0b421 100644 --- a/langgraph/src/checkpoint/base.ts +++ b/langgraph/src/checkpoint/base.ts @@ -2,13 +2,13 @@ import { RunnableConfig } from "@langchain/core/runnables"; import { DefaultSerializer, SerializerProtocol } from "../serde/base.js"; import { uuid6 } from "./id.js"; import { SendInterface } from "../constants.js"; -import { - CheckpointMetadata, - CheckpointPendingWrite, +import type { PendingWrite, -} from "../pregel/types.js"; + CheckpointPendingWrite, + CheckpointMetadata, +} from "./types.js"; -export type { CheckpointMetadata }; +export type ChannelVersions = Record; export interface Checkpoint< N extends string = string, @@ -118,6 +118,13 @@ export interface CheckpointTuple { pendingWrites?: CheckpointPendingWrite[]; } +export type CheckpointListOptions = { + limit?: number; + before?: RunnableConfig; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + filter?: Record; +}; + export abstract class BaseCheckpointSaver { serde: SerializerProtocol = DefaultSerializer; @@ -136,14 +143,14 @@ export abstract class BaseCheckpointSaver { abstract list( config: RunnableConfig, - limit?: number, - before?: RunnableConfig + options?: CheckpointListOptions ): AsyncGenerator; abstract put( config: RunnableConfig, checkpoint: Checkpoint, - metadata: CheckpointMetadata + metadata: CheckpointMetadata, + newVersions: ChannelVersions ): Promise; /** @@ -154,4 +161,17 @@ export abstract class BaseCheckpointSaver { writes: PendingWrite[], taskId: string ): Promise; + + /** + * Generate the next version ID for a channel. + * + * Default is to use integer versions, incrementing by 1. If you override, you can use str/int/float versions, + * as long as they are monotonically increasing. + * + * TODO: Fix type + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + getNextVersion(current: number | undefined, _channel: any) { + return current !== undefined ? current + 1 : 1; + } } diff --git a/langgraph/src/checkpoint/index.ts b/langgraph/src/checkpoint/index.ts index 066d5ccd7..7965fd726 100644 --- a/langgraph/src/checkpoint/index.ts +++ b/langgraph/src/checkpoint/index.ts @@ -1,8 +1,8 @@ export { MemorySaver } from "./memory.js"; export { type Checkpoint, - type CheckpointMetadata, copyCheckpoint, emptyCheckpoint, BaseCheckpointSaver, } from "./base.js"; +export { type CheckpointMetadata } from "./types.js"; diff --git a/langgraph/src/checkpoint/memory.ts b/langgraph/src/checkpoint/memory.ts index 0eeedad23..07e73ae11 100644 --- a/langgraph/src/checkpoint/memory.ts +++ b/langgraph/src/checkpoint/memory.ts @@ -2,11 +2,15 @@ import { RunnableConfig } from "@langchain/core/runnables"; import { BaseCheckpointSaver, Checkpoint, - CheckpointMetadata, + CheckpointListOptions, CheckpointTuple, } from "./base.js"; import { SerializerProtocol } from "../serde/base.js"; -import { CheckpointPendingWrite, PendingWrite } from "../pregel/types.js"; +import { + CheckpointMetadata, + CheckpointPendingWrite, + PendingWrite, +} from "../checkpoint/types.js"; function _generateKey( threadId: string, @@ -111,9 +115,10 @@ export class MemorySaver extends BaseCheckpointSaver { async *list( config: RunnableConfig, - limit?: number, - before?: RunnableConfig + options?: CheckpointListOptions ): AsyncGenerator { + // eslint-disable-next-line prefer-const + let { before, limit } = options ?? 
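As a caller-side sketch of the reworked checkpointer surface above (the options-object `list()` signature and the default `getNextVersion()`), using `MemorySaver` from this package; the thread id and limit are illustrative:

```ts
import { MemorySaver } from "./checkpoint/memory.js";

const checkpointer = new MemorySaver();
const config = { configurable: { thread_id: "example-thread" } };

// list() now takes a single options object instead of positional limit/before args.
for await (const tuple of checkpointer.list(config, { limit: 3 })) {
  console.log(tuple.checkpoint.id, tuple.metadata?.step);
}

// Default channel versioning: integers starting at 1, incremented by 1.
checkpointer.getNextVersion(undefined, null); // -> 1
checkpointer.getNextVersion(1, null); // -> 2
```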
{}; const threadIds = config.configurable?.thread_id ? [config.configurable?.thread_id] : Object.keys(this.storage); diff --git a/langgraph/src/checkpoint/sqlite.ts b/langgraph/src/checkpoint/sqlite.ts index 2a25f7111..37c24f127 100644 --- a/langgraph/src/checkpoint/sqlite.ts +++ b/langgraph/src/checkpoint/sqlite.ts @@ -3,19 +3,30 @@ import { RunnableConfig } from "@langchain/core/runnables"; import { BaseCheckpointSaver, Checkpoint, - CheckpointMetadata, + CheckpointListOptions, CheckpointTuple, } from "./base.js"; import { SerializerProtocol } from "../serde/base.js"; -import { PendingWrite } from "../pregel/types.js"; +import type { PendingWrite, CheckpointMetadata } from "../checkpoint/types.js"; -// snake_case is used to match Python implementation -interface Row { +interface CheckpointRow { checkpoint: string; metadata: string; - parent_id?: string; + parent_checkpoint_id?: string; thread_id: string; checkpoint_id: string; + checkpoint_ns?: string; +} + +interface WritesRow { + thread_id: string; + checkpoint_ns: string; + checkpoint_id: string; + task_id: string; + idx: number; + channel: string; + type?: string; + value?: string; } export class SqliteSaver extends BaseCheckpointSaver { @@ -33,106 +44,123 @@ export class SqliteSaver extends BaseCheckpointSaver { return new SqliteSaver(new Database(connStringOrLocalPath)); } - private setup(): void { + protected setup(): void { if (this.isSetup) { return; } - try { - this.db.pragma("journal_mode=WAL"); - this.db.exec(` + this.db.pragma("journal_mode=WAL"); + this.db.exec(` CREATE TABLE IF NOT EXISTS checkpoints ( thread_id TEXT NOT NULL, + checkpoint_ns TEXT NOT NULL DEFAULT '', checkpoint_id TEXT NOT NULL, - parent_id TEXT, + parent_checkpoint_id TEXT, + type TEXT, checkpoint BLOB, metadata BLOB, - PRIMARY KEY (thread_id, checkpoint_id) + PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id) +);`); + this.db.exec(` +CREATE TABLE IF NOT EXISTS writes ( + thread_id TEXT NOT NULL, + checkpoint_ns TEXT NOT NULL DEFAULT '', + checkpoint_id TEXT NOT NULL, + task_id TEXT NOT NULL, + idx INTEGER NOT NULL, + channel TEXT NOT NULL, + type TEXT, + value BLOB, + PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx) );`); - } catch (error) { - console.log("Error creating checkpoints table", error); - throw error; - } this.isSetup = true; } async getTuple(config: RunnableConfig): Promise { this.setup(); - const thread_id = config.configurable?.thread_id; - const checkpoint_id = config.configurable?.checkpoint_id; - + const { + thread_id, + checkpoint_ns = "", + checkpoint_id, + } = config.configurable ?? {}; + let row: CheckpointRow; if (checkpoint_id) { - try { - const row: Row = this.db - .prepare( - `SELECT checkpoint, parent_id, metadata FROM checkpoints WHERE thread_id = ? AND checkpoint_id = ?` - ) - .get(thread_id, checkpoint_id) as Row; - - if (row) { - return { - config, - checkpoint: (await this.serde.parse(row.checkpoint)) as Checkpoint, - metadata: (await this.serde.parse( - row.metadata - )) as CheckpointMetadata, - parentConfig: row.parent_id - ? { - configurable: { - thread_id, - checkpoint_id: row.parent_id, - }, - } - : undefined, - }; - } - } catch (error) { - console.log("Error retrieving checkpoint", error); - throw error; - } + row = this.db + .prepare( + `SELECT thread_id, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? 
AND checkpoint_id = ?` + ) + .get(thread_id, checkpoint_ns, checkpoint_id) as CheckpointRow; } else { - const row: Row = this.db + row = this.db .prepare( - `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM checkpoints WHERE thread_id = ? ORDER BY checkpoint_id DESC LIMIT 1` + `SELECT thread_id, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ? ORDER BY checkpoint_id DESC LIMIT 1` ) - .get(thread_id) as Row; - - if (row) { - return { - config: { + .get(thread_id, checkpoint_ns) as CheckpointRow; + } + if (row === undefined) { + return undefined; + } + let finalConfig = config; + if (!checkpoint_id) { + finalConfig = { + configurable: { + thread_id: row.thread_id, + checkpoint_ns, + checkpoint_id: row.checkpoint_id, + }, + }; + } + if ( + finalConfig.configurable?.thread_id === undefined || + finalConfig.configurable?.checkpoint_id === undefined + ) { + throw new Error("Missing thread_id or checkpoint_id"); + } + // find any pending writes + const pendingWritesRows = this.db + .prepare( + `SELECT task_id, channel, type, value FROM writes WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ?` + ) + .all( + finalConfig.configurable.thread_id.toString(), + checkpoint_ns, + finalConfig.configurable.checkpoint_id.toString() + ) as WritesRow[]; + const pendingWrites = await Promise.all( + pendingWritesRows.map(async (row) => { + return [ + row.task_id, + row.channel, + await this.serde.parse(row.value ?? ""), + ] as [string, string, unknown]; + }) + ); + return { + config: finalConfig, + checkpoint: (await this.serde.parse(row.checkpoint)) as Checkpoint, + metadata: (await this.serde.parse(row.metadata)) as CheckpointMetadata, + parentConfig: row.parent_checkpoint_id + ? { configurable: { thread_id: row.thread_id, - checkpoint_id: row.checkpoint_id, + checkpoint_ns, + checkpoint_id: row.parent_checkpoint_id, }, - }, - checkpoint: (await this.serde.parse(row.checkpoint)) as Checkpoint, - metadata: (await this.serde.parse( - row.metadata - )) as CheckpointMetadata, - parentConfig: row.parent_id - ? { - configurable: { - thread_id: row.thread_id, - checkpoint_id: row.parent_id, - }, - } - : undefined, - }; - } - } - - return undefined; + } + : undefined, + pendingWrites, + }; } async *list( config: RunnableConfig, - limit?: number, - before?: RunnableConfig + options?: CheckpointListOptions ): AsyncGenerator { + const { limit, before } = options ?? {}; this.setup(); const thread_id = config.configurable?.thread_id; - let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM checkpoints WHERE thread_id = ? ${ + let sql = `SELECT thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata FROM checkpoints WHERE thread_id = ? ${ before ? "AND checkpoint_id < ?" 
: "" } ORDER BY checkpoint_id DESC`; if (limit) { @@ -142,36 +170,35 @@ CREATE TABLE IF NOT EXISTS checkpoints ( Boolean ); - try { - const rows: Row[] = this.db.prepare(sql).all(...args) as Row[]; - - if (rows) { - for (const row of rows) { - yield { - config: { - configurable: { - thread_id: row.thread_id, - checkpoint_id: row.checkpoint_id, - }, + const rows: CheckpointRow[] = this.db + .prepare(sql) + .all(...args) as CheckpointRow[]; + + if (rows) { + for (const row of rows) { + yield { + config: { + configurable: { + thread_id: row.thread_id, + checkpoint_ns: row.checkpoint_ns, + checkpoint_id: row.checkpoint_id, }, - checkpoint: (await this.serde.parse(row.checkpoint)) as Checkpoint, - metadata: (await this.serde.parse( - row.metadata - )) as CheckpointMetadata, - parentConfig: row.parent_id - ? { - configurable: { - thread_id: row.thread_id, - checkpoint_id: row.parent_id, - }, - } - : undefined, - }; - } + }, + checkpoint: (await this.serde.parse(row.checkpoint)) as Checkpoint, + metadata: (await this.serde.parse( + row.metadata + )) as CheckpointMetadata, + parentConfig: row.parent_checkpoint_id + ? { + configurable: { + thread_id: row.thread_id, + checkpoint_ns: row.checkpoint_ns, + checkpoint_id: row.parent_checkpoint_id, + }, + } + : undefined, + }; } - } catch (error) { - console.log("Error listing checkpoints", error); - throw error; } } @@ -182,39 +209,61 @@ CREATE TABLE IF NOT EXISTS checkpoints ( ): Promise { this.setup(); - try { - const row = [ - config.configurable?.thread_id, - checkpoint.id, - config.configurable?.checkpoint_id, - this.serde.stringify(checkpoint), - this.serde.stringify(metadata), - ]; + const row = [ + config.configurable?.thread_id?.toString(), + config.configurable?.checkpoint_ns, + checkpoint.id, + config.configurable?.checkpoint_id, + "Checkpoint", + this.serde.stringify(checkpoint), + this.serde.stringify(metadata), + ]; - this.db - .prepare( - `INSERT OR REPLACE INTO checkpoints (thread_id, checkpoint_id, parent_id, checkpoint, metadata) VALUES (?, ?, ?, ?, ?)` - ) - .run(...row); - } catch (error) { - console.log("Error saving checkpoint", error); - throw error; - } + this.db + .prepare( + `INSERT OR REPLACE INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)` + ) + .run(...row); return { configurable: { thread_id: config.configurable?.thread_id, + checkpoint_ns: config.configurable?.checkpoint_ns, checkpoint_id: checkpoint.id, }, }; } - // TODO: Implement - putWrites( - _config: RunnableConfig, - _writes: PendingWrite[], - _taskId: string + async putWrites( + config: RunnableConfig, + writes: PendingWrite[], + taskId: string ): Promise { - throw new Error("Not implemented"); + this.setup(); + + const stmt = this.db.prepare(` + INSERT OR REPLACE INTO writes + (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ `); + + const transaction = this.db.transaction((rows) => { + for (const row of rows) { + stmt.run(...row); + } + }); + + const rows = writes.map((write, idx) => [ + config.configurable?.thread_id, + config.configurable?.checkpoint_ns, + config.configurable?.checkpoint_id, + taskId, + idx, + write[0], + "Checkpoint", + this.serde.stringify(write[1]), + ]); + + transaction(rows); } } diff --git a/langgraph/src/checkpoint/types.ts b/langgraph/src/checkpoint/types.ts new file mode 100644 index 000000000..4e9e32898 --- /dev/null +++ b/langgraph/src/checkpoint/types.ts @@ -0,0 +1,32 @@ +export type All = "*"; + +export type PendingWriteValue = unknown; + +export type PendingWrite = [Channel, PendingWriteValue]; + +export type CheckpointPendingWrite = [ + TaskId, + ...PendingWrite +]; + +export interface CheckpointMetadata { + /** + * The source of the checkpoint. + * - "input": The checkpoint was created from an input to invoke/stream/batch. + * - "loop": The checkpoint was created from inside the pregel loop. + * - "update": The checkpoint was created from a manual state update. + */ + source: "input" | "loop" | "update"; + /** + * The step number of the checkpoint. + * -1 for the first "input" checkpoint. + * 0 for the first "loop" checkpoint. + * ... for the nth checkpoint afterwards. + */ + step: number; + /** + * The writes that were made between the previous checkpoint and this one. + * Mapping from node name to writes emitted by that node. + */ + writes: Record | null; +} diff --git a/langgraph/src/constants.ts b/langgraph/src/constants.ts index 9a9ffe523..76f341eea 100644 --- a/langgraph/src/constants.ts +++ b/langgraph/src/constants.ts @@ -1,6 +1,8 @@ +export const INPUT = "__input__"; export const CONFIG_KEY_SEND = "__pregel_send"; export const CONFIG_KEY_READ = "__pregel_read"; - +export const CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer"; +export const CONFIG_KEY_RESUMING = "__pregel_resuming"; export const INTERRUPT = "__interrupt__"; export const TAG_HIDDEN = "langsmith:hidden"; @@ -8,6 +10,16 @@ export const TAG_HIDDEN = "langsmith:hidden"; export const TASKS = "__pregel_tasks"; export const TASK_NAMESPACE = "6ba7b831-9dad-11d1-80b4-00c04fd430c8"; +export const RESERVED = [ + INTERRUPT, + TASKS, + CONFIG_KEY_SEND, + CONFIG_KEY_READ, + CONFIG_KEY_CHECKPOINTER, + CONFIG_KEY_RESUMING, + INPUT, +]; + export const CHECKPOINT_NAMESPACE_SEPARATOR = "|"; export interface SendInterface { diff --git a/langgraph/src/errors.ts b/langgraph/src/errors.ts index 4f492c51e..2d75ef125 100644 --- a/langgraph/src/errors.ts +++ b/langgraph/src/errors.ts @@ -20,6 +20,28 @@ export class GraphValueError extends Error { } } +export class GraphInterrupt extends Error { + constructor(message?: string) { + super(message); + this.name = "GraphInterrupt"; + } + + static get unminifiable_name() { + return "GraphInterrupt"; + } +} + +export class EmptyInputError extends Error { + constructor(message?: string) { + super(message); + this.name = "EmptyInputError"; + } + + static get unminifiable_name() { + return "EmptyInputError"; + } +} + export class EmptyChannelError extends Error { constructor(message?: string) { super(message); diff --git a/langgraph/src/graph/graph.ts b/langgraph/src/graph/graph.ts index c8a31c797..fdfcb7c15 100644 --- a/langgraph/src/graph/graph.ts +++ b/langgraph/src/graph/graph.ts @@ -11,7 +11,8 @@ import { } from "@langchain/core/runnables/graph"; import { z } from "zod"; import { PregelNode } from "../pregel/read.js"; -import { Channel, Pregel, PregelInterface } from 
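For reference, a minimal sketch of the shapes defined in the new `checkpoint/types.ts` above; the concrete values are made up:

```ts
import type {
  CheckpointMetadata,
  CheckpointPendingWrite,
  PendingWrite,
} from "./checkpoint/types.js";

// A write is a [channel, value] pair; a pending write also carries the task id.
const write: PendingWrite = ["messages", "hello"];
const pendingWrite: CheckpointPendingWrite = ["task-1", "messages", "hello"];

// Metadata stored alongside every checkpoint.
const metadata: CheckpointMetadata = {
  source: "loop", // "input" | "loop" | "update"
  step: 0, // -1 for the first "input" checkpoint, 0 for the first "loop" one
  writes: { agent: { messages: ["hello"] } }, // node name -> writes it emitted
};
```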
"../pregel/index.js"; +import { Channel, Pregel } from "../pregel/index.js"; +import type { PregelParams } from "../pregel/types.js"; import { BaseCheckpointSaver } from "../checkpoint/base.js"; import { BaseChannel } from "../channels/base.js"; import { EphemeralValue } from "../channels/ephemeral_value.js"; @@ -266,8 +267,8 @@ export class Graph< [START]: new EphemeralValue(), [END]: new EphemeralValue(), } as Record, - inputs: START, - outputs: END, + inputChannels: START, + outputChannels: END, streamChannels: [] as N[], streamMode: "values", }); @@ -372,7 +373,7 @@ export class CompiledGraph< constructor({ builder, ...rest - }: { builder: Graph } & PregelInterface< + }: { builder: Graph } & PregelParams< Record>, Record >) { diff --git a/langgraph/src/graph/state.ts b/langgraph/src/graph/state.ts index 865c86a7d..e7b51de9d 100644 --- a/langgraph/src/graph/state.ts +++ b/langgraph/src/graph/state.ts @@ -152,7 +152,7 @@ export class StateGraph< // prepare output channels const stateKeys = Object.keys(this.channels); - const outputs = + const outputChannels = stateKeys.length === 1 && stateKeys[0] === ROOT ? stateKeys[0] : stateKeys; @@ -169,9 +169,9 @@ export class StateGraph< ...this.channels, [START]: new EphemeralValue(), } as Record, - inputs: START, - outputs, - streamChannels: outputs, + inputChannels: START, + outputChannels, + streamChannels: outputChannels, streamMode: "updates", }); @@ -348,7 +348,7 @@ export class CompiledStateGraph< return new ChannelWrite(writes, [TAG_HIDDEN]); }, // reader - (config) => ChannelRead.doRead(config, this.outputs, true) + (config) => ChannelRead.doRead(config, this.outputChannels, true) ) ); diff --git a/langgraph/src/pregel/algo.ts b/langgraph/src/pregel/algo.ts index 5a8948f96..9257eb037 100644 --- a/langgraph/src/pregel/algo.ts +++ b/langgraph/src/pregel/algo.ts @@ -4,12 +4,14 @@ import { patchConfig, RunnableConfig, } from "@langchain/core/runnables"; +import { CallbackManagerForChainRun } from "@langchain/core/callbacks/manager"; import { BaseChannel, createCheckpoint, emptyChannels, } from "../channels/base.js"; import { + BaseCheckpointSaver, Checkpoint, ReadonlyCheckpoint, copyCheckpoint, @@ -22,20 +24,18 @@ import { _isSend, _isSendInterface, CHECKPOINT_NAMESPACE_SEPARATOR, + CONFIG_KEY_CHECKPOINTER, CONFIG_KEY_READ, + CONFIG_KEY_RESUMING, CONFIG_KEY_SEND, INTERRUPT, + RESERVED, Send, TAG_HIDDEN, TASKS, } from "../constants.js"; -import { - All, - PendingWrite, - PendingWriteValue, - PregelExecutableTask, - PregelTaskDescription, -} from "./types.js"; +import { All, PregelExecutableTask, PregelTaskDescription } from "./types.js"; +import { PendingWrite, PendingWriteValue } from "../checkpoint/types.js"; import { EmptyChannelError, InvalidUpdateError } from "../errors.js"; import { uuid5 } from "../checkpoint/id.js"; @@ -46,6 +46,12 @@ export type StrRecord = { [P in K]: T; }; +export type WritesProtocol = { + name: string; + writes: PendingWrite[]; + triggers: string[]; +}; + export async function executeTasks( tasks: Array<() => Promise>, stepTimeout?: number, @@ -69,6 +75,7 @@ export async function executeTasks( // Start all tasks const started = tasks.map((task) => task()); + let listener: () => void; // Wait for all tasks to settle // If any tasks fail, or signal is aborted, the promise will reject await Promise.all( @@ -76,36 +83,49 @@ export async function executeTasks( ? 
[ ...started, new Promise((_resolve, reject) => { - signal?.addEventListener("abort", () => reject(new Error("Abort"))); - }), + listener = () => reject(new Error("Abort")); + signal?.addEventListener("abort", listener); + }).finally(() => signal?.removeEventListener("abort", listener)), ] : started ); } -export function _shouldInterrupt( - checkpoint: ReadonlyCheckpoint, - interruptNodes: All | Array, - snapshotChannels: Array, - tasks: Array> +export function shouldInterrupt( + checkpoint: Checkpoint, + interruptNodes: All | N[], + tasks: PregelExecutableTask[] ): boolean { - const anySnapshotChannelUpdated = snapshotChannels.some( - (chan) => - getChannelVersion(checkpoint, chan as string) > - getVersionSeen(checkpoint, INTERRUPT, chan as string) + const versionValues = Object.values(checkpoint.channel_versions); + const versionType = + versionValues.length > 0 ? typeof versionValues[0] : undefined; + let nullVersion: number | string; + if (versionType === "number") { + nullVersion = 0; + } else if (versionType === "string") { + nullVersion = ""; + } + const seen = checkpoint.versions_seen[INTERRUPT] || {}; + + const anyChannelUpdated = Object.entries(checkpoint.channel_versions).some( + ([chan, version]) => { + return version > (seen[chan] ?? nullVersion); + } ); - const anyTaskNodeInInterruptNodes = tasks.some((task) => + + const anyTriggeredNodeInInterruptNodes = tasks.some((task) => interruptNodes === "*" ? !task.config?.tags?.includes(TAG_HIDDEN) : interruptNodes.includes(task.name) ); - return anySnapshotChannelUpdated && anyTaskNodeInInterruptNodes; + + return anyChannelUpdated && anyTriggeredNodeInInterruptNodes; } export function _localRead>( checkpoint: ReadonlyCheckpoint, channels: Cc, - writes: Array<[keyof Cc, unknown]>, + task: WritesProtocol, select: Array | keyof Cc, fresh: boolean = false ): Record | unknown { @@ -114,7 +134,7 @@ export function _localRead>( // create a new copy of channels const newChannels = emptyChannels(channels, newCheckpoint); // Note: _applyWrites contains side effects - _applyWrites(copyCheckpoint(newCheckpoint), newChannels, writes); + _applyWrites(copyCheckpoint(newCheckpoint), newChannels, [task]); return readChannels(newChannels, select); } else { return readChannels(channels, select); @@ -153,8 +173,47 @@ export function _localWrite( export function _applyWrites>( checkpoint: Checkpoint, channels: Cc, - pendingWrites: PendingWrite[] + pendingTasks: WritesProtocol[], + getNextVersion?: (current: number | undefined, channel: BaseChannel) => number ): void { + // Update seen versions + for (const task of pendingTasks) { + if (!checkpoint.versions_seen[task.name]) { + checkpoint.versions_seen[task.name] = {}; + } + for (const chan of task.triggers) { + if (chan in checkpoint.channel_versions) { + checkpoint.versions_seen[task.name][chan] = + checkpoint.channel_versions[chan]; + } + } + } + + // Find the highest version of all channels + let maxVersion = + Object.values(checkpoint.channel_versions).length > 0 + ? 
Math.max(...Object.values(checkpoint.channel_versions)) + : undefined; + + // Consume all channels that were read + const readChannels = new Set( + pendingTasks + .flatMap((task) => task.triggers) + .filter((chan) => !RESERVED.includes(chan)) + ); + + for (const chan of readChannels) { + if (channels[chan].consume()) { + if (getNextVersion !== undefined) { + checkpoint.channel_versions[chan] = getNextVersion( + maxVersion, + channels[chan] + ); + } + } + } + + // clear pending sends if (checkpoint.pending_sends) { checkpoint.pending_sends = []; } @@ -163,26 +222,28 @@ export function _applyWrites>( PendingWriteValue[] >; // Group writes by channel - for (const [chan, val] of pendingWrites) { - if (chan === TASKS) { - checkpoint.pending_sends.push({ - node: (val as Send).node, - args: (val as Send).args, - }); - } else { - if (chan in pendingWriteValuesByChannel) { - pendingWriteValuesByChannel[chan].push(val); + for (const pendingTask of pendingTasks) { + for (const [chan, val] of pendingTask.writes) { + if (chan === TASKS) { + checkpoint.pending_sends.push({ + node: (val as Send).node, + args: (val as Send).args, + }); } else { - pendingWriteValuesByChannel[chan] = [val]; + if (chan in pendingWriteValuesByChannel) { + pendingWriteValuesByChannel[chan].push(val); + } else { + pendingWriteValuesByChannel[chan] = [val]; + } } } } // find the highest version of all channels - let maxVersion = 0; - if (Object.keys(checkpoint.channel_versions).length > 0) { - maxVersion = Math.max(...Object.values(checkpoint.channel_versions)); - } + maxVersion = + Object.values(checkpoint.channel_versions).length > 0 + ? Math.max(...Object.values(checkpoint.channel_versions)) + : undefined; const updatedChannels: Set = new Set(); // Apply writes to channels @@ -201,7 +262,7 @@ export function _applyWrites>( } // side effect: update checkpoint channel versions - checkpoint.channel_versions[chan] = maxVersion + 1; + checkpoint.channel_versions[chan] = (maxVersion ?? 
0) + 1; updatedChannels.add(chan); } else { @@ -218,6 +279,13 @@ export function _applyWrites>( } } +export type NextTaskExtraFields = { + step: number; + isResuming?: boolean; + checkpointer?: BaseCheckpointSaver; + manager?: CallbackManagerForChainRun; +}; + export function _prepareNextTasks< Nn extends StrRecord, Cc extends StrRecord @@ -227,7 +295,7 @@ export function _prepareNextTasks< channels: Cc, config: RunnableConfig, forExecution: false, - extra: { step: number } + extra: NextTaskExtraFields ): [Checkpoint, Array]; export function _prepareNextTasks< @@ -239,7 +307,7 @@ export function _prepareNextTasks< channels: Cc, config: RunnableConfig, forExecution: true, - extra: { step: number } + extra: NextTaskExtraFields ): [Checkpoint, Array>]; export function _prepareNextTasks< @@ -251,7 +319,7 @@ export function _prepareNextTasks< channels: Cc, config: RunnableConfig, forExecution: boolean, - extra: { step: number } + extra: NextTaskExtraFields ): [ Checkpoint, PregelTaskDescription[] | PregelExecutableTask[] @@ -260,6 +328,7 @@ export function _prepareNextTasks< const newCheckpoint = copyCheckpoint(checkpoint); const tasks: Array> = []; const taskDescriptions: Array = []; + const { step, isResuming = false, checkpointer, manager } = extra; for (const packet of checkpoint.pending_sends) { if (!_isSendInterface(packet)) { @@ -280,7 +349,7 @@ export function _prepareNextTasks< if (node !== undefined) { const triggers = [TASKS]; const metadata = { - langgraph_step: extra.step, + langgraph_step: step, langgraph_node: packet.node, langgraph_triggers: triggers, langgraph_task_idx: tasks.length, @@ -301,12 +370,12 @@ export function _prepareNextTasks< writes, triggers, config: patchConfig( - mergeConfigs(proc.config, processes[packet.node].config, { + mergeConfigs(config, processes[packet.node].config, { metadata, }), { runName: packet.node, - // callbacks: + callbacks: manager?.getChild(`graph:step:${step}`), configurable: { [CONFIG_KEY_SEND]: _localWrite.bind( undefined, @@ -318,7 +387,11 @@ export function _prepareNextTasks< undefined, checkpoint, channels, - writes as Array<[string, unknown]> + { + name: packet.node, + writes: writes as Array<[string, unknown]>, + triggers, + } ), }, } @@ -419,7 +492,7 @@ export function _prepareNextTasks< const node = proc.getNode(); if (node !== undefined) { const metadata = { - langgraph_step: extra.step, + langgraph_step: step, langgraph_node: name, langgraph_triggers: proc.triggers, langgraph_task_idx: tasks.length, @@ -439,25 +512,35 @@ export function _prepareNextTasks< proc: node, writes, triggers: proc.triggers, - config: patchConfig(mergeConfigs(proc.config, { metadata }), { - runName: name, - configurable: { - [CONFIG_KEY_SEND]: _localWrite.bind( - undefined, - (items: [keyof Cc, unknown][]) => writes.push(...items), - processes, - channels - ), - [CONFIG_KEY_READ]: _localRead.bind( - undefined, - checkpoint, - channels, - writes as Array<[string, unknown]> - ), - checkpoint_id: checkpoint.id, - checkpoint_ns: checkpointNamespace, - }, - }), + config: patchConfig( + mergeConfigs(config, proc.config, { metadata }), + { + runName: name, + callbacks: manager?.getChild(`graph:step:${step}`), + configurable: { + [CONFIG_KEY_SEND]: _localWrite.bind( + undefined, + (items: [keyof Cc, unknown][]) => writes.push(...items), + processes, + channels + ), + [CONFIG_KEY_READ]: _localRead.bind( + undefined, + checkpoint, + channels, + { + name, + writes: writes as Array<[string, unknown]>, + triggers: proc.triggers, + } + ), + [CONFIG_KEY_CHECKPOINTER]: 
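`_applyWrites` now takes writes grouped per task (the `WritesProtocol` shape defined near the top of this file) rather than one flat list of pending writes. A rough sketch of a single task payload; the node and channel names are made up:

```ts
// Grouped writes for one task, as _applyWrites now expects them.
const taskWrites = {
  name: "agent", // node that produced the writes
  triggers: ["messages"], // channels that triggered this node
  writes: [["messages", "a new message"]], // [channel, value] pairs
};
// _applyWrites(checkpoint, channels, [taskWrites], getNextVersion) then
// 1. records versions_seen["agent"]["messages"],
// 2. consume()s the triggering channels and bumps their versions, and
// 3. groups the writes by channel and applies them with channel.update().
```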
checkpointer, + [CONFIG_KEY_RESUMING]: isResuming, + checkpoint_id: checkpoint.id, + checkpoint_ns: checkpointNamespace, + }, + } + ), id: taskId, }); } diff --git a/langgraph/src/pregel/debug.ts b/langgraph/src/pregel/debug.ts index 42c8ed23a..145b5ff49 100644 --- a/langgraph/src/pregel/debug.ts +++ b/langgraph/src/pregel/debug.ts @@ -1,6 +1,11 @@ +import { RunnableConfig } from "@langchain/core/runnables"; import { BaseChannel } from "../channels/base.js"; +import { CheckpointMetadata } from "../checkpoint/types.js"; +import { uuid5 } from "../checkpoint/id.js"; +import { TAG_HIDDEN, TASK_NAMESPACE } from "../constants.js"; import { EmptyChannelError } from "../errors.js"; import { PregelExecutableTask } from "./types.js"; +import { readChannels } from "./io.js"; type ConsoleColors = { start: string; @@ -16,6 +21,14 @@ const COLORS_MAP: ConsoleColorMap = { start: "\x1b[34m", end: "\x1b[0m", }, + green: { + start: "\x1b[32m", + end: "\x1b[0m", + }, + yellow: { + start: "\x1b[33;1m", + end: "\x1b[0m", + }, }; /** @@ -24,36 +37,20 @@ const COLORS_MAP: ConsoleColorMap = { const wrap = (color: ConsoleColors, text: string): string => `${color.start}${text}${color.end}`; -export function printStepStart( - step: number, - nextTasks: readonly PregelExecutableTask[] -): void { - const nTasks = nextTasks.length; - console.log( - `${wrap(COLORS_MAP.blue, "[langgraph/step]")}`, - `Starting step ${step} with ${nTasks} task${ - nTasks === 1 ? "" : "s" - }. Next tasks:\n`, - `\n${nextTasks - .map( - (task) => `${String(task.name)}(${JSON.stringify(task.input, null, 2)})` - ) - .join("\n")}` - ); -} - export function printCheckpoint( step: number, channels: Record> ) { console.log( - `${wrap(COLORS_MAP.blue, "[langgraph/checkpoint]")}`, - `Finishing step ${step}. Channel values:\n`, - `\n${JSON.stringify( - Object.fromEntries(_readChannels(channels)), - null, - 2 - )}` + [ + `${wrap(COLORS_MAP.blue, "[langgraph/checkpoint]")}`, + `Finishing step ${step}. 
Channel values:\n`, + `\n${JSON.stringify( + Object.fromEntries(_readChannels(channels)), + null, + 2 + )}`, + ].join("") ); } @@ -75,3 +72,158 @@ function* _readChannels( } } } + +export function* mapDebugTasks( + step: number, + tasks: readonly PregelExecutableTask[] +) { + const ts = new Date().toISOString(); + for (const { name, input, config, triggers } of tasks) { + if (config?.tags?.includes(TAG_HIDDEN)) continue; + + const metadata = { ...config?.metadata }; + delete metadata.checkpoint_id; + + yield { + type: "task", + timestamp: ts, + step, + payload: { + id: uuid5(JSON.stringify([name, step, metadata]), TASK_NAMESPACE), + name, + input, + triggers, + }, + }; + } +} + +export function* mapDebugTaskResults< + N extends PropertyKey, + C extends PropertyKey +>( + step: number, + tasks: readonly PregelExecutableTask[], + streamChannelsList: Array +) { + const ts = new Date().toISOString(); + for (const { name, writes, config } of tasks) { + if (config?.tags?.includes(TAG_HIDDEN)) continue; + + const metadata = { ...config?.metadata }; + delete metadata.checkpoint_id; + + yield { + type: "task_result", + timestamp: ts, + step, + payload: { + id: uuid5(JSON.stringify([name, step, metadata]), TASK_NAMESPACE), + name, + result: writes.filter(([channel]) => + streamChannelsList.includes(channel) + ), + }, + }; + } +} + +export function* mapDebugCheckpoint( + step: number, + config: RunnableConfig, + channels: Record, + streamChannels: string | string[], + metadata: CheckpointMetadata +) { + function getCurrentUTC() { + const now = new Date(); + return new Date(now.getTime() - now.getTimezoneOffset() * 60 * 1000); + } + + const ts = getCurrentUTC().toISOString(); + yield { + type: "checkpoint", + timestamp: ts, + step, + payload: { + config, + values: readChannels(channels, streamChannels), + metadata, + }, + }; +} + +export function printStepCheckpoint( + step: number, + channels: Record>, + whitelist: string[] +): void { + console.log( + [ + `${wrap(COLORS_MAP.blue, `[${step}:checkpoint]`)}`, + `\x1b[1m State at the end of step ${step}:\x1b[0m\n`, + JSON.stringify(readChannels(channels, whitelist), null, 2), + ].join("") + ); +} + +export function printStepTasks( + step: number, + nextTasks: readonly PregelExecutableTask[] +): void { + const nTasks = nextTasks.length; + console.log( + [ + `${wrap(COLORS_MAP.blue, `[${step}:tasks]`)}`, + `\x1b[1m Starting step ${step} with ${nTasks} task${ + nTasks === 1 ? "" : "s" + }:\x1b[0m\n`, + nextTasks + .map( + (task) => + `- ${wrap(COLORS_MAP.green, String(task.name))} -> ${JSON.stringify( + task.input, + null, + 2 + )}` + ) + .join("\n"), + ].join("") + ); +} + +export function printStepWrites( + step: number, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + writes: Array<[string, any]>, + whitelist: string[] +): void { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const byChannel: Record = {}; + + for (const [channel, value] of writes) { + if (whitelist.includes(channel)) { + if (!byChannel[channel]) { + byChannel[channel] = []; + } + byChannel[channel].push(value); + } + } + + console.log( + [ + `${wrap(COLORS_MAP.blue, `[${step}:writes]`)}`, + `\x1b[1m Finished step ${step} with writes to ${ + Object.keys(byChannel).length + } channel${Object.keys(byChannel).length !== 1 ? 
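The "debug" stream events produced by `mapDebugTasks` and `mapDebugTaskResults` above look roughly like the following; the ids, node name, and payload values are illustrative:

```ts
const debugTaskEvent = {
  type: "task",
  timestamp: "2024-08-20T16:57:57.000Z",
  step: 1,
  payload: {
    id: "6ba7b831-...", // uuid5 of [name, step, metadata]
    name: "agent",
    input: { messages: [] },
    triggers: ["start:agent"],
  },
};

const debugTaskResultEvent = {
  type: "task_result",
  timestamp: "2024-08-20T16:57:57.000Z",
  step: 1,
  payload: {
    id: "6ba7b831-...",
    name: "agent",
    result: [["messages", ["an AI reply"]]], // writes to streamed channels only
  },
};
```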
"s" : ""}:\x1b[0m\n`, + Object.entries(byChannel) + .map( + ([name, vals]) => + `- ${wrap(COLORS_MAP.yellow, name)} -> ${vals + .map((v) => JSON.stringify(v)) + .join(", ")}` + ) + .join("\n"), + ].join("") + ); +} diff --git a/langgraph/src/pregel/index.ts b/langgraph/src/pregel/index.ts index 132b0fb47..a3db7f611 100644 --- a/langgraph/src/pregel/index.ts +++ b/langgraph/src/pregel/index.ts @@ -7,11 +7,9 @@ import { RunnableSequence, _coerceToRunnable, ensureConfig, - mergeConfigs, + getCallbackManagerForConfig, patchConfig, } from "@langchain/core/runnables"; -import { CallbackManagerForChainRun } from "@langchain/core/callbacks/manager"; -import { IterableReadableStream } from "@langchain/core/utils/stream"; import { BaseChannel, createCheckpoint, @@ -19,32 +17,33 @@ import { } from "../channels/base.js"; import { BaseCheckpointSaver, + CheckpointListOptions, copyCheckpoint, emptyCheckpoint, } from "../checkpoint/base.js"; import { PregelNode } from "./read.js"; import { validateGraph, validateKeys } from "./validate.js"; +import { mapOutputUpdates, readChannels } from "./io.js"; import { - mapInput, - mapOutputUpdates, - mapOutputValues, - mapDebugTasks, - readChannels, - single, mapDebugTaskResults, -} from "./io.js"; + printStepCheckpoint, + printStepTasks, + printStepWrites, +} from "./debug.js"; import { ChannelWrite, ChannelWriteEntry, PASSTHROUGH } from "./write.js"; import { + CONFIG_KEY_CHECKPOINTER, CONFIG_KEY_READ, CONFIG_KEY_SEND, INTERRUPT, - TASKS, } from "../constants.js"; import { All, - PendingWrite, PregelExecutableTask, + PregelInterface, + PregelParams, StateSnapshot, + StreamMode, } from "./types.js"; import { GraphRecursionError, @@ -54,13 +53,14 @@ import { import { executeTasks, _prepareNextTasks, - _shouldInterrupt, _localRead, _applyWrites, + StrRecord, } from "./algo.js"; import { uuid5 } from "../checkpoint/id.js"; - -const DEFAULT_LOOP_LIMIT = 25; +import { prefixGenerator } from "../utils.js"; +import { _coerceToDict, getNewChannelVersions } from "./utils.js"; +import { PregelLoop } from "./loop.js"; type WriteValue = Runnable | RunnableFunc | unknown; @@ -68,14 +68,6 @@ function isString(value: unknown): value is string { return typeof value === "string"; } -function* prefixGenerator( - generator: Generator, - prefix: string | undefined -) { - if (!prefix) yield* generator; - for (const value of generator) yield [prefix, value]; -} - export class Channel { static subscribeTo( channels: string, @@ -164,65 +156,23 @@ export class Channel { } } -export type StreamMode = "values" | "updates" | "debug"; - /** - * Construct a type with a set of properties K of type T + * Config for executing the graph. */ -type StrRecord = { - [P in K]: T; -}; - -export interface PregelInterface< - Nn extends StrRecord, - Cc extends StrRecord -> { - nodes: Nn; - - channels: Cc; - - inputs: keyof Cc | Array; - - outputs: keyof Cc | Array; - /** - * @default true - */ - autoValidate?: boolean; - /** - * @default "values" - */ - streamMode?: StreamMode | StreamMode[]; - - streamChannels?: keyof Cc | Array; - /** - * @default [] - */ - interruptAfter?: Array | All; - /** - * @default [] - */ - interruptBefore?: Array | All; - /** - * @default undefined - */ - stepTimeout?: number; - /** - * @default false - */ - debug?: boolean; - - checkpointer?: BaseCheckpointSaver; -} - export interface PregelOptions< Nn extends StrRecord, Cc extends StrRecord > extends RunnableConfig { + /** The stream mode for the graph run. Default is ["values"]. 
*/ streamMode?: StreamMode | StreamMode[]; inputKeys?: keyof Cc | Array; + /** The output keys to retrieve from the graph run. */ outputKeys?: keyof Cc | Array; + /** The nodes to interrupt the graph run before. */ interruptBefore?: All | Array; + /** The nodes to interrupt the graph run after. */ interruptAfter?: All | Array; + /** Enable debug mode for the graph run. */ debug?: boolean; } @@ -250,9 +200,9 @@ export class Pregel< channels: Cc; - inputs: keyof Cc | Array; + inputChannels: keyof Cc | Array; - outputs: keyof Cc | Array; + outputChannels: keyof Cc | Array; autoValidate: boolean = true; @@ -270,7 +220,7 @@ export class Pregel< checkpointer?: BaseCheckpointSaver; - constructor(fields: PregelInterface) { + constructor(fields: PregelParams) { super(fields); let { streamMode } = fields; @@ -282,18 +232,15 @@ export class Pregel< this.channels = fields.channels; this.autoValidate = fields.autoValidate ?? this.autoValidate; this.streamMode = streamMode ?? this.streamMode; - this.outputs = fields.outputs; + this.inputChannels = fields.inputChannels; + this.outputChannels = fields.outputChannels; this.streamChannels = fields.streamChannels ?? this.streamChannels; this.interruptAfter = fields.interruptAfter; this.interruptBefore = fields.interruptBefore; - this.inputs = fields.inputs; this.stepTimeout = fields.stepTimeout ?? this.stepTimeout; this.debug = fields.debug ?? this.debug; this.checkpointer = fields.checkpointer; - // Bind the method to the instance - this._transform = this._transform.bind(this); - if (this.autoValidate) { this.validate(); } @@ -303,8 +250,8 @@ export class Pregel< validateGraph({ nodes: this.nodes, channels: this.channels, - outputChannels: this.outputs, - inputChannels: this.inputs, + outputChannels: this.outputChannels, + inputChannels: this.inputChannels, streamChannels: this.streamChannels, interruptAfterNodes: this.interruptAfter, interruptBeforeNodes: this.interruptBefore, @@ -339,8 +286,7 @@ export class Pregel< const saved = await this.checkpointer.getTuple(config); const checkpoint = saved ? saved.checkpoint : emptyCheckpoint(); const channels = emptyChannels(this.channels, checkpoint); - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const [_, nextTasks] = _prepareNextTasks( + const [, nextTasks] = _prepareNextTasks( checkpoint, this.nodes, channels, @@ -360,16 +306,14 @@ export class Pregel< async *getStateHistory( config: RunnableConfig, - limit?: number, - before?: RunnableConfig + options?: CheckpointListOptions ): AsyncIterableIterator { if (!this.checkpointer) { throw new GraphValueError("No checkpointer set"); } - for await (const saved of this.checkpointer.list(config, limit, before)) { + for await (const saved of this.checkpointer.list(config, options)) { const channels = emptyChannels(this.channels, saved.checkpoint); - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const [_, nextTasks] = _prepareNextTasks( + const [, nextTasks] = _prepareNextTasks( saved.checkpoint, this.nodes, channels, @@ -402,41 +346,88 @@ export class Pregel< const checkpoint = saved ? copyCheckpoint(saved.checkpoint) : emptyCheckpoint(); - // Find last that updated the state, if not provided - const maxSeens = Object.entries(checkpoint.versions_seen).reduce( - (acc, [node, versions]) => { - const maxSeen = Math.max(...Object.values(versions)); - if (maxSeen) { - if (!acc[maxSeen]) { - acc[maxSeen] = []; - } - acc[maxSeen].push(node); - } - return acc; + const checkpointPreviousVersions = saved?.checkpoint.channel_versions ?? 
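Caller-side, the renamed fields and updated signatures above might be used like this; `graph` stands in for any compiled `Pregel` instance and the thread id is illustrative:

```ts
// getStateHistory() now takes a CheckpointListOptions object.
const history = graph.getStateHistory(
  { configurable: { thread_id: "example-thread" } },
  { limit: 5 }
);
for await (const snapshot of history) {
  console.log(snapshot.metadata?.step, snapshot.next);
}

// Multiple stream modes can be requested at once; each chunk is then
// yielded as a [mode, payload] pair.
const stream = await graph.stream(
  { messages: ["hi"] },
  {
    configurable: { thread_id: "example-thread" },
    streamMode: ["updates", "debug"],
  }
);
for await (const chunk of stream) {
  console.log(chunk);
}
```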
{}; + const step = saved?.metadata?.step ?? -1; + + // merge configurable fields with previous checkpoint config + const checkpointConfig = { + ...config, + configurable: { + ...config.configurable, + // TODO: add proper support for updating nested subgraph state + checkpoint_ns: "", + ...saved?.config.configurable, }, - {} as Record - ); - if (!asNode && !Object.keys(maxSeens).length) { - if (!Array.isArray(this.inputs) && this.inputs in this.nodes) { - asNode = this.inputs as keyof Nn; + }; + + // Find last node that updated the state, if not provided + if (values === undefined && asNode === undefined) { + return await this.checkpointer.put( + checkpointConfig, + createCheckpoint(checkpoint, {}, step), + { + source: "update", + step, + writes: {}, + }, + {} + ); + } + + const nonNullVersion = Object.values(checkpoint.versions_seen) + .map((seenVersions) => { + return Object.values(seenVersions); + }) + .flat() + .find((v) => !!v); + if (asNode === undefined && !nonNullVersion) { + if ( + typeof this.inputChannels === "string" && + this.nodes[this.inputChannels] !== undefined + ) { + asNode = this.inputChannels; } - } else if (!asNode) { - const maxSeen = Math.max(...Object.keys(maxSeens).map(Number)); - const nodes = maxSeens[maxSeen]; - if (nodes.length === 1) { - asNode = nodes[0] as keyof Nn; + } else if (asNode === undefined) { + // TODO: Double check + const lastSeenByNode = Object.entries(checkpoint.versions_seen) + .map(([n, seen]) => { + return Object.values(seen).map((v) => { + return [v, n] as const; + }); + }) + .flat() + .sort(([aNumber], [bNumber]) => { + return aNumber - bNumber; + }); + // if two nodes updated the state at the same time, it's ambiguous + if (lastSeenByNode) { + if (lastSeenByNode.length === 1) { + // eslint-disable-next-line prefer-destructuring + asNode = lastSeenByNode[0][1]; + } else if ( + lastSeenByNode[lastSeenByNode.length - 1][0] !== + lastSeenByNode[lastSeenByNode.length - 2][0] + ) { + // eslint-disable-next-line prefer-destructuring + asNode = lastSeenByNode[lastSeenByNode.length - 1][1]; + } } } - if (!asNode) { - throw new InvalidUpdateError("Ambiguous update, specify as_node"); + if (asNode === undefined) { + throw new InvalidUpdateError(`Ambiguous update, specify "asNode"`); + } + if (this.nodes[asNode] === undefined) { + throw new InvalidUpdateError( + `Node "${asNode.toString()}" does not exist` + ); } // update channels const channels = emptyChannels(this.channels, checkpoint); - // create task to run all writers of the chosen node + // run all writers of the chosen node const writers = this.nodes[asNode].getWriters(); if (!writers.length) { throw new InvalidUpdateError( - `No writers found for node ${asNode as string}` + `No writers found for node "${asNode.toString()}"` ); } const task: PregelExecutableTask = { @@ -450,6 +441,7 @@ export class Pregel< config: undefined, id: uuid5(INTERRUPT, checkpoint.id), }; + // execute task await task.proc.invoke( task.input, @@ -462,49 +454,47 @@ export class Pregel< undefined, checkpoint, channels, - task.writes as Array<[string, unknown]> + // TODO: Why does keyof StrRecord allow number and symbol? + task as PregelExecutableTask ), }, }) ); + // apply to checkpoint and save - _applyWrites(checkpoint, channels, task.writes); - const step = (saved?.metadata?.step ?? -2) + 1; - let checkpointConfig: RunnableConfig = { - ...config, - configurable: { - ...(config?.configurable ?? 
{}), - // TODO: add proper support for updating nested subgraph state - checkpoint_ns: "", - }, - }; - if (saved !== undefined) { - checkpointConfig = { - configurable: { - ...(config?.configurable ?? {}), - ...saved.config.configurable, - }, - }; - } + // TODO: Why does keyof StrRecord allow number and symbol? + _applyWrites( + checkpoint, + channels, + [task as PregelExecutableTask], + this.checkpointer.getNextVersion.bind(this.checkpointer) + ); + + const newVersions = getNewChannelVersions( + checkpointPreviousVersions, + checkpoint.channel_versions + ); return await this.checkpointer.put( checkpointConfig, - createCheckpoint(checkpoint, channels, step), + createCheckpoint(checkpoint, channels, step + 1), { source: "update", - step, + step: step + 1, writes: { [asNode]: values }, - } + }, + newVersions ); } _defaults(config: PregelOptions): [ boolean, // debug StreamMode[], // stream mode - keyof Cc | Array, // input keys - keyof Cc | Array, // output keys + string | string[], // input keys + string | string[], // output keys RunnableConfig, // config without pregel keys - All | Array, // interrupt before - All | Array // interrupt after, + All | string[], // interrupt before + All | string[], // interrupt after + BaseCheckpointSaver | undefined ] { const { debug, @@ -526,7 +516,7 @@ export class Pregel< let defaultInputKeys = inputKeys; if (defaultInputKeys === undefined) { - defaultInputKeys = this.inputs; + defaultInputKeys = this.inputChannels; } else { validateKeys(defaultInputKeys, this.channels); } @@ -543,230 +533,131 @@ export class Pregel< defaultStreamMode = this.streamMode; } + let defaultCheckpointer: BaseCheckpointSaver | undefined; if ( config.configurable !== undefined && config.configurable[CONFIG_KEY_READ] !== undefined ) { defaultStreamMode = ["values"]; } + if ( + config !== undefined && + config.configurable?.[CONFIG_KEY_CHECKPOINTER] !== undefined && + (defaultInterruptAfter.length > 0 || defaultInterruptBefore.length > 0) + ) { + defaultCheckpointer = config.configurable[CONFIG_KEY_CHECKPOINTER]; + } else { + defaultCheckpointer = this.checkpointer; + } return [ defaultDebug, defaultStreamMode, - defaultInputKeys, - defaultOutputKeys, + defaultInputKeys as string | string[], + defaultOutputKeys as string | string[], rest, - defaultInterruptBefore, - defaultInterruptAfter, + defaultInterruptBefore as All | string[], + defaultInterruptAfter as All | string[], + defaultCheckpointer, ]; } - async *_transform( - input: AsyncGenerator, - runManager?: CallbackManagerForChainRun, - config: PregelOptions = {} + async *_streamIterator( + input: PregelInputType, + options?: Partial> ): AsyncGenerator { - const bg: Promise[] = []; + const inputConfig = ensureConfig(options); + if ( + inputConfig.recursionLimit === undefined || + inputConfig.recursionLimit < 1 + ) { + throw new Error(`Passed "recursionLimit" must be at least 1.`); + } + if ( + this.checkpointer !== undefined && + inputConfig.configurable === undefined + ) { + throw new Error( + `Checkpointer requires one or more of the following "configurable" keys: "thread_id", "checkpoint_ns", "checkpoint_id"` + ); + } + const callbackManager = await getCallbackManagerForConfig(inputConfig); + const runManager = await callbackManager?.handleChainStart( + this.toJSON(), + _coerceToDict(input, "input"), + inputConfig.runId, + undefined, + undefined, + undefined, + inputConfig?.runName ?? 
this.getName() + ); + delete inputConfig.runId; + // assign defaults + const [ + debug, + streamMode, + , + outputKeys, + config, + interruptBefore, + interruptAfter, + checkpointer, + ] = this._defaults(inputConfig); + let loop; try { - if (config.recursionLimit && config.recursionLimit < 1) { - throw new GraphValueError( - `Recursion limit must be greater than 0, got ${config.recursionLimit}` - ); - } - if (this.checkpointer && !config.configurable) { - throw new GraphValueError( - `Checkpointer requires one or more of the following 'configurable' keys: thread_id, checkpoint_ns, checkpoint_id` - ); - } - // assign defaults - const [ - debug, - streamMode, - inputKeys, - outputKeys, - restConfig, - interruptBefore, - interruptAfter, - ] = this._defaults(config); - // copy nodes to ignore mutations during execution - const processes = { ...this.nodes }; - // get checkpoint, or create an empty one - const saved = this.checkpointer - ? await this.checkpointer.getTuple(config) - : null; - let checkpoint = saved ? saved.checkpoint : emptyCheckpoint(); - let checkpointConfig = saved ? saved.config : config; - if ( - this.checkpointer && - checkpointConfig.configurable?.checkpoint_ns === undefined + loop = await PregelLoop.initialize({ + input, + config, + checkpointer, + graph: this, + }); + while ( + await loop.tick({ + outputKeys, + interruptAfter, + interruptBefore, + manager: runManager, + }) ) { - checkpointConfig.configurable = { - ...checkpointConfig.configurable, - checkpoint_ns: "", - }; - } - let start = (saved?.metadata?.step ?? -2) + 1; - // create channels from checkpoint - const channels = emptyChannels(this.channels, checkpoint); - // map inputs to channel updates - const inputPendingWrites: PendingWrite[] = []; - for await (const c of input) { - for (const value of mapInput(inputKeys, c)) { - inputPendingWrites.push(value); - } - } - if (inputPendingWrites.length) { - // discard any unfinished tasks from previous checkpoint - const discarded = _prepareNextTasks( - checkpoint, - processes, - channels, - config, - true, - { step: -1 } - ); - checkpoint = discarded[0]; // eslint-disable-line prefer-destructuring - // apply input writes - _applyWrites(checkpoint, channels, inputPendingWrites); - // save input checkpoint - if (this.checkpointer) { - checkpoint = createCheckpoint(checkpoint, channels, start); - bg.push( - this.checkpointer.put(checkpointConfig, checkpoint, { - source: "input", - step: start, - writes: Object.fromEntries(inputPendingWrites), - }) + if (debug) { + printStepCheckpoint( + loop.checkpointMetadata.step, + loop.channels, + this.streamChannelsList as string[] ); - checkpointConfig = { - configurable: { - ...checkpointConfig.configurable, - checkpoint_id: checkpoint.id, - }, - }; } - // increment start to 0 - start += 1; - } else { - checkpoint = copyCheckpoint(checkpoint); - for (const k of this.streamChannelsList) { - const version = checkpoint.channel_versions[k as string] ?? 
0; - if (!checkpoint.versions_seen[INTERRUPT]) { - checkpoint.versions_seen[INTERRUPT] = {}; + while (loop.stream.length > 0) { + const nextItem = loop.stream.shift(); + if (nextItem === undefined) { + throw new Error("Data structure error."); + } + if (streamMode.includes(nextItem[0])) { + if (streamMode.length === 1) { + yield nextItem[1]; + } else { + yield nextItem; + } } - checkpoint.versions_seen[INTERRUPT][k as string] = version; - } - } - - // Similarly to Bulk Synchronous Parallel / Pregel model - // computation proceeds in steps, while there are channel updates - // channel updates from step N are only visible in step N+1 - // channels are guaranteed to be immutable for the duration of the step, - // with channel updates applied only at the transition between steps - const stop = start + (config.recursionLimit ?? DEFAULT_LOOP_LIMIT); - for (let step = start; step < stop + 1; step += 1) { - const [nextCheckpoint, nextTasks] = _prepareNextTasks( - checkpoint, - processes, - channels, - config, - true, - { step } - ); - - // if no more tasks, we're done - if (nextTasks.length === 0 && step === start) { - throw new GraphValueError(`No tasks to run in graph.`); - } else if (nextTasks.length === 0) { - break; - } else if (step === stop) { - throw new GraphRecursionError( - `Recursion limit of ${config.recursionLimit} reached without hitting a stop condition. You can increase the limit by setting the "recursionLimit" config key.` - ); - } - - // before execution, check if we should interrupt - if ( - _shouldInterrupt( - checkpoint, - interruptBefore, - this.streamChannelsList, - nextTasks - ) - ) { - break; - } else { - checkpoint = nextCheckpoint; - } - - // produce debug stream mode event - if (streamMode.includes("debug")) { - yield* prefixGenerator( - mapDebugTasks(step, nextTasks), - streamMode.length > 1 ? "debug" : undefined - ); } - if (debug) { - console.log(nextTasks); + printStepTasks(loop.step, loop.tasks); } - - const tasksWithConfig = nextTasks.map( - // eslint-disable-next-line no-loop-func - (task, i) => - [ - task.proc, - task.input, - patchConfig( - mergeConfigs(restConfig, processes[task.name].config, { - metadata: { - langgraph_step: step, - langgraph_node: task.name, - langgraph_triggers: [TASKS], - langgraph_task_idx: i, - }, - }), - { - callbacks: runManager?.getChild(`graph:step:${step}`), - runName: task.name as string, - configurable: { - ...config.configurable, - [CONFIG_KEY_SEND]: (items: [keyof Cc, unknown][]) => - task.writes.push(...items), - [CONFIG_KEY_READ]: _localRead.bind( - undefined, - checkpoint, - channels, - task.writes as Array<[string, unknown]> - ), - }, - } - ), - ] as const - ); - // execute tasks, and wait for one to fail or all to finish. 
// each task is independent from all other concurrent tasks - const tasks = tasksWithConfig.map( - ([proc, input, updatedConfig]) => - () => - proc.invoke(input, updatedConfig) - ); + // yield updates/debug output as each task finishes + const tasks = loop.tasks.map((pregelTask) => () => { + return pregelTask.proc.invoke(pregelTask.input, pregelTask.config); + }); await executeTasks(tasks, this.stepTimeout, config.signal); - // combine pending writes from all tasks - const pendingWrites: PendingWrite[] = []; - for (const task of nextTasks) { - pendingWrites.push(...task.writes); + for (const task of loop.tasks) { + loop.putWrites(task.id, task.writes); } - // apply writes to channels - _applyWrites(checkpoint, channels, pendingWrites); - if (streamMode.includes("updates")) { // TODO: Refactor - for await (const task of nextTasks) { + for await (const task of loop.tasks) { yield* prefixGenerator( mapOutputUpdates(outputKeys, [task]), streamMode.length > 1 ? "updates" : undefined @@ -774,103 +665,75 @@ export class Pregel< } } - // yield current value and checkpoint view - if (streamMode.includes("values")) { - yield* prefixGenerator( - mapOutputValues(outputKeys, pendingWrites, channels), - streamMode.length > 1 ? "values" : undefined - ); - } - if (streamMode.includes("debug")) { yield* prefixGenerator( - mapDebugTaskResults(step, nextTasks, this.streamChannelsList), + mapDebugTaskResults(loop.step, loop.tasks, this.streamChannelsList), streamMode.length > 1 ? "debug" : undefined ); } - // save end of step checkpoint - if (this.checkpointer) { - checkpoint = createCheckpoint(checkpoint, channels, step); - bg.push( - this.checkpointer.put(checkpointConfig, checkpoint, { - source: "loop", - step, - writes: single( - this.streamMode.includes("values") - ? mapOutputValues(outputKeys, pendingWrites, channels) - : mapOutputUpdates(outputKeys, nextTasks) - ), - }) + if (debug) { + printStepWrites( + loop.step, + loop.tasks.map((task) => task.writes).flat(), + this.streamChannelsList as string[] ); - checkpointConfig = { - configurable: { - ...checkpointConfig.configurable, - checkpoint_id: checkpoint.id, - }, - }; } - - if ( - _shouldInterrupt( - checkpoint, - interruptAfter, - this.streamChannelsList, - nextTasks - ) - ) { - break; + } + while (loop.stream.length > 0) { + const nextItem = loop.stream.shift(); + if (nextItem === undefined) { + throw new Error("Data structure error."); + } + if (streamMode.includes(nextItem[0])) { + if (streamMode.length === 1) { + yield nextItem[1]; + } else { + yield nextItem; + } } } + if (loop.status === "out_of_steps") { + throw new GraphRecursionError( + [ + `Recursion limit of ${config.recursionLimit} reached`, + "without hitting a stop condition. You can increase the", + `limit by setting the "recursionLimit" config key.`, + ].join(" ") + ); + } + await runManager?.handleChainEnd(readChannels(loop.channels, outputKeys)); + } catch (e) { + await runManager?.handleChainError(e); + throw e; } finally { - await Promise.all(bg); + await loop?.backgroundTasksPromise; } } + /** + * Run the graph with a single input and config. 
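When the loop exhausts its step budget it ends with status `"out_of_steps"`, which the iterator above surfaces as a `GraphRecursionError`. A hedged usage sketch; `graph` and the thread id are illustrative:

```ts
import { GraphRecursionError } from "./errors.js";

try {
  await graph.invoke(
    { messages: ["hi"] },
    { configurable: { thread_id: "example-thread" }, recursionLimit: 5 }
  );
} catch (e) {
  if (e instanceof GraphRecursionError) {
    // The loop ran 5 steps without reaching a stop condition;
    // raise recursionLimit or adjust the graph's termination logic.
  }
}
```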
+ * @param input + * @param options + */ async invoke( input: PregelInputType, - options?: PregelOptions + options?: Partial> ): Promise { - const config = ensureConfig(options); - if (!config?.outputKeys) { - config.outputKeys = this.outputs; - } - if (!config?.streamMode) { - config.streamMode = "values"; - } - - let latest: PregelOutputType | undefined; - for await (const chunk of await this.stream(input, config)) { - latest = chunk; - } - if (!latest) { - return undefined as PregelOutputType; + const streamMode = options?.streamMode ?? "values"; + const config = { + ...ensureConfig(options), + outputKeys: options?.outputKeys ?? this.outputChannels, + streamMode, + }; + const chunks = []; + const stream = await this.stream(input, config); + for await (const chunk of stream) { + chunks.push(chunk); } - return latest; - } - - async stream( - input: PregelInputType, - config?: PregelOptions - ): Promise> { - const inputIterator: AsyncGenerator = (async function* () { - yield input; - })(); - return IterableReadableStream.fromAsyncGenerator( - this.transform(inputIterator, config) - ); - } - - async *transform( - generator: AsyncGenerator, - config?: PregelOptions - ): AsyncGenerator { - for await (const chunk of this._transformStreamWithConfig( - generator, - this._transform, - config - )) { - yield chunk; + if (streamMode === "values") { + return chunks[chunks.length - 1]; } + return chunks; } } diff --git a/langgraph/src/pregel/io.ts b/langgraph/src/pregel/io.ts index 7b9beab94..8c951cd43 100644 --- a/langgraph/src/pregel/io.ts +++ b/langgraph/src/pregel/io.ts @@ -1,13 +1,13 @@ import { BaseChannel } from "../channels/base.js"; -import type { PendingWrite, PregelExecutableTask } from "./types.js"; -import { TAG_HIDDEN, TASK_NAMESPACE } from "../constants.js"; +import type { PregelExecutableTask } from "./types.js"; +import type { PendingWrite } from "../checkpoint/types.js"; +import { TAG_HIDDEN } from "../constants.js"; import { EmptyChannelError } from "../errors.js"; -import { uuid5 } from "../checkpoint/id.js"; export function readChannel( channels: Record, chan: C, - catch_: boolean = true, + catchErrors: boolean = true, returnException: boolean = false ): unknown | null { try { @@ -17,7 +17,7 @@ export function readChannel( if (e.name === EmptyChannelError.unminifiable_name) { if (returnException) { return e; - } else if (catch_) { + } else if (catchErrors) { return null; } } @@ -80,61 +80,6 @@ export function* mapInput( } } -export function* mapDebugTasks( - step: number, - tasks: readonly PregelExecutableTask[] -) { - const ts = new Date().toISOString(); - for (const { name, input, config, triggers } of tasks) { - if (config?.tags?.includes(TAG_HIDDEN)) continue; - - const metadata = { ...config?.metadata }; - delete metadata.checkpoint_id; - - yield { - type: "task", - timestamp: ts, - step, - payload: { - id: uuid5(JSON.stringify([name, step, metadata]), TASK_NAMESPACE), - name, - input, - triggers, - }, - }; - } -} - -export function* mapDebugTaskResults< - N extends PropertyKey, - C extends PropertyKey ->( - step: number, - tasks: readonly PregelExecutableTask[], - streamChannelsList: Array -) { - const ts = new Date().toISOString(); - for (const { name, writes, config } of tasks) { - if (config?.tags?.includes(TAG_HIDDEN)) continue; - - const metadata = { ...config?.metadata }; - delete metadata.checkpoint_id; - - yield { - type: "task_result", - timestamp: ts, - step, - payload: { - id: uuid5(JSON.stringify([name, step, metadata]), TASK_NAMESPACE), - name, - result: 
writes.filter(([channel]) => - streamChannelsList.includes(channel) - ), - }, - }; - } -} - /** * Map pending writes (a sequence of tuples (channel, value)) to output chunk. */ diff --git a/langgraph/src/pregel/loop.ts b/langgraph/src/pregel/loop.ts new file mode 100644 index 000000000..f79f1d5a3 --- /dev/null +++ b/langgraph/src/pregel/loop.ts @@ -0,0 +1,496 @@ +import Deque from "double-ended-queue"; +import type { RunnableConfig } from "@langchain/core/runnables"; +import type { CallbackManagerForChainRun } from "@langchain/core/callbacks/manager"; +import { + BaseCheckpointSaver, + Checkpoint, + CheckpointTuple, + copyCheckpoint, + emptyCheckpoint, +} from "../checkpoint/base.js"; +import { + BaseChannel, + createCheckpoint, + emptyChannels, +} from "../channels/base.js"; +import { PregelExecutableTask, PregelInterface, StreamMode } from "./types.js"; +import { + PendingWrite, + CheckpointPendingWrite, + CheckpointMetadata, + All, +} from "../checkpoint/types.js"; +import { + CONFIG_KEY_READ, + CONFIG_KEY_RESUMING, + INPUT, + INTERRUPT, +} from "../constants.js"; +import { + _applyWrites, + _prepareNextTasks, + shouldInterrupt, + WritesProtocol, +} from "./algo.js"; +import { gatherIterator, prefixGenerator } from "../utils.js"; +import { mapInput, mapOutputUpdates, mapOutputValues } from "./io.js"; +import { EmptyInputError, GraphInterrupt } from "../errors.js"; +import { getNewChannelVersions } from "./utils.js"; +import { mapDebugTasks, mapDebugCheckpoint } from "./debug.js"; + +const INPUT_DONE = Symbol.for("INPUT_DONE"); +const INPUT_RESUMING = Symbol.for("INPUT_RESUMING"); +const DEFAULT_LOOP_LIMIT = 25; + +export type PregelLoopInitializeParams = { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + input?: any; + config: RunnableConfig; + checkpointer?: BaseCheckpointSaver; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + graph: PregelInterface; +}; + +type PregelLoopParams = { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + input?: any; + config: RunnableConfig; + checkpointer?: BaseCheckpointSaver; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + graph: PregelInterface; + checkpoint: Checkpoint; + checkpointMetadata: CheckpointMetadata; + checkpointPreviousVersions: Record; + checkpointPendingWrites: CheckpointPendingWrite[]; + checkpointConfig: RunnableConfig; + channels: Record; + step: number; + stop: number; +}; + +export class PregelLoop { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected input?: any; + + config: RunnableConfig; + + protected checkpointer?: BaseCheckpointSaver; + + protected checkpointerGetNextVersion: ( + current: number | undefined, + channel: BaseChannel + ) => number; + + // TODO: Fix typing + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected graph: PregelInterface; + + channels: Record; + + protected checkpoint: Checkpoint; + + protected checkpointConfig: RunnableConfig; + + checkpointMetadata: CheckpointMetadata; + + protected checkpointPendingWrites: CheckpointPendingWrite[] = []; + + protected checkpointPreviousVersions: Record; + + step: number; + + protected stop: number; + + status: + | "pending" + | "done" + | "interrupt_before" + | "interrupt_after" + | "out_of_steps" = "pending"; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + tasks: PregelExecutableTask[] = []; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + stream: Deque<[StreamMode, any]> = new Deque(); + + protected isNested: 
boolean; + + protected _putCheckpointPromise: Promise = Promise.resolve(); + + get backgroundTasksPromise() { + return this._putCheckpointPromise; + } + + constructor(params: PregelLoopParams) { + this.input = params.input; + this.config = params.config; + this.checkpointer = params.checkpointer; + // TODO: if managed values no longer needs graph we can replace with + // managed_specs, channel_specs + if (this.checkpointer !== undefined) { + this.checkpointerGetNextVersion = this.checkpointer.getNextVersion.bind( + this.checkpointer + ); + } else { + this.checkpointerGetNextVersion = (current) => { + return current !== undefined ? current + 1 : 1; + }; + } + this.graph = params.graph; + this.checkpoint = params.checkpoint; + this.checkpointConfig = params.checkpointConfig; + this.checkpointMetadata = params.checkpointMetadata; + this.checkpointPreviousVersions = params.checkpointPreviousVersions; + this.channels = params.channels; + this.checkpointPendingWrites = params.checkpointPendingWrites; + this.step = params.step; + this.stop = params.stop; + this.isNested = CONFIG_KEY_READ in (this.config.configurable ?? {}); + } + + static async initialize(params: PregelLoopInitializeParams) { + const saved: CheckpointTuple = (await params.checkpointer?.getTuple( + params.config + )) ?? { + config: params.config, + checkpoint: emptyCheckpoint(), + metadata: { + source: "input", + step: -2, + writes: null, + }, + pendingWrites: [], + }; + const checkpointConfig = { + ...params.config, + ...saved.config, + configurable: { + ...params.config.configurable, + ...saved.config.configurable, + }, + }; + const checkpoint = copyCheckpoint(saved.checkpoint); + const checkpointMetadata = { ...saved.metadata } as CheckpointMetadata; + const checkpointPendingWrites = saved.pendingWrites ?? []; + + const channels = emptyChannels(params.graph.channels, checkpoint); + + const step = (checkpointMetadata.step ?? 0) + 1; + const stop = + step + (params.config.recursionLimit ?? DEFAULT_LOOP_LIMIT) + 1; + const checkpointPreviousVersions = { ...checkpoint.channel_versions }; + return new PregelLoop({ + input: params.input, + config: params.config, + checkpointer: params.checkpointer, + graph: params.graph, + checkpoint, + checkpointMetadata, + checkpointConfig, + channels, + step, + stop, + checkpointPreviousVersions, + checkpointPendingWrites, + }); + } + + protected async _checkpointerPutAfterPrevious(input: { + config: RunnableConfig; + checkpoint: Checkpoint; + metadata: CheckpointMetadata; + newVersions: Record; + }) { + try { + await this._putCheckpointPromise; + } finally { + this._putCheckpointPromise = + this.checkpointer?.put( + input.config, + input.checkpoint, + input.metadata, + input.newVersions + ) ?? Promise.resolve(); + } + } + + /** + * Put writes for a task, to be read by the next tick. + * @param taskId + * @param writes + */ + putWrites(taskId: string, writes: PendingWrite[]) { + const pendingWrites: CheckpointPendingWrite[] = writes.map( + ([key, value]) => { + return [taskId, key, value]; + } + ); + this.checkpointPendingWrites.push(...pendingWrites); + if (this.checkpointer !== undefined) { + void this.checkpointer.putWrites( + { + ...this.checkpointConfig, + configurable: { + ...this.checkpointConfig.configurable, + checkpoint_ns: this.config.configurable?.checkpoint_ns ?? "", + checkpoint_id: this.checkpoint.id, + }, + }, + writes, + taskId + ); + } + } + + /** + * Execute a single iteration of the Pregel loop. + * Returns true if more iterations are needed. 
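+   * Applies writes from completed tasks to the channels, pushes "values"/"debug" chunks onto the stream deque, checkpoints progress, and prepares the next set of tasks.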
+ * @param params + */ + async tick(params: { + outputKeys: string | string[]; + interruptAfter: string[] | All; + interruptBefore: string[] | All; + manager?: CallbackManagerForChainRun; + }): Promise { + const { + outputKeys = [], + interruptAfter = [], + interruptBefore = [], + manager, + } = params; + if (this.status !== "pending") { + throw new Error( + `Cannot tick when status is no longer "pending". Current status: "${this.status}"` + ); + } + if (![INPUT_DONE, INPUT_RESUMING].includes(this.input)) { + await this._first(); + } else if (this.tasks.every((task) => task.writes.length > 0)) { + const writes = this.tasks.flatMap((t) => t.writes); + // All tasks have finished + _applyWrites(this.checkpoint, this.channels, this.tasks); + // produce values output + const valuesOutput = await gatherIterator( + prefixGenerator( + mapOutputValues(outputKeys, writes, this.channels), + "values" + ) + ); + this.stream.push(...valuesOutput); + // clear pending writes + this.checkpointPendingWrites = []; + const updatesOnly = + this.graph.streamMode?.length === 1 && + this.graph.streamMode?.includes("updates"); + const metadataWrites = updatesOnly + ? mapOutputUpdates(outputKeys, this.tasks).next().value + : mapOutputValues(outputKeys, writes, this.channels).next().value; + await this._putCheckpoint({ + source: "loop", + writes: metadataWrites, + }); + // after execution, check if we should interrupt + if (shouldInterrupt(this.checkpoint, interruptAfter, this.tasks)) { + this.status = "interrupt_after"; + if (this.isNested) { + throw new GraphInterrupt(); + } else { + return false; + } + } + } else { + return false; + } + if (this.step > this.stop) { + this.status = "out_of_steps"; + return false; + } + + const [, nextTasks] = _prepareNextTasks( + this.checkpoint, + this.graph.nodes, + this.channels, + this.config, + true, + { + step: this.step, + checkpointer: this.checkpointer, + isResuming: this.input === INPUT_RESUMING, + manager, + } + ); + this.tasks = nextTasks; + if (this.tasks.length === 0) { + this.status = "done"; + return false; + } + // if there are pending writes from a previous loop, apply them + if (this.checkpointPendingWrites.length > 0) { + for (const [tid, k, v] of this.checkpointPendingWrites) { + const task = this.tasks.find((t) => t.id === tid); + if (task) { + task.writes.push([k, v]); + } + } + } + // if all tasks have finished, re-tick + if (this.tasks.every((task) => task.writes.length > 0)) { + return this.tick({ + outputKeys, + interruptAfter, + interruptBefore, + manager, + }); + } + + // Before execution, check if we should interrupt + if (shouldInterrupt(this.checkpoint, interruptBefore, this.tasks)) { + this.status = "interrupt_before"; + if (this.isNested) { + throw new GraphInterrupt(); + } else { + return false; + } + } + // Produce debug output + const debugOutput = await gatherIterator( + prefixGenerator(mapDebugTasks(this.step, this.tasks), "debug") + ); + this.stream.push(...debugOutput); + + return true; + } + + /** + * Resuming from previous checkpoint requires + * - finding a previous checkpoint + * - receiving None input (outer graph) or RESUMING flag (subgraph) + */ + protected async _first() { + const isResuming = + (Object.keys(this.checkpoint.channel_versions).length !== 0 && + this.config.configurable?.[CONFIG_KEY_RESUMING] !== undefined) || + this.input === null; + if (isResuming) { + for (const channelName of Object.keys(this.channels)) { + if (this.checkpoint.channel_versions[channelName] !== undefined) { + const version = 
this.checkpoint.channel_versions[channelName]; + this.checkpoint.versions_seen[INTERRUPT] = { + ...this.checkpoint.versions_seen[INTERRUPT], + [channelName]: version, + }; + } + } + // map inputs to channel updates + } else { + const inputWrites = await gatherIterator( + mapInput(this.graph.inputChannels, this.input) + ); + if (inputWrites.length === 0) { + throw new EmptyInputError( + `Received no input writes for ${JSON.stringify( + this.graph.inputChannels, + null, + 2 + )}` + ); + } + const [, discardTasks] = _prepareNextTasks( + this.checkpoint, + this.graph.nodes, + this.channels, + this.config, + true, + { step: this.step } + ); + _applyWrites( + this.checkpoint, + this.channels, + (discardTasks as WritesProtocol[]).concat([ + { + name: INPUT, + writes: inputWrites as PendingWrite[], + triggers: [], + }, + ]), + this.checkpointerGetNextVersion + ); + // save input checkpoint + await this._putCheckpoint({ source: "input", writes: this.input }); + } + // done with input + this.input = isResuming ? INPUT_RESUMING : INPUT_DONE; + } + + protected async _putCheckpoint( + inputMetadata: Omit + ) { + // Assign step + const metadata = { + ...inputMetadata, + step: this.step, + }; + // Bail if no checkpointer + if (this.checkpointer !== undefined) { + // create new checkpoint + this.checkpointMetadata = metadata; + // child graphs keep at most one checkpoint per parent checkpoint + // this is achieved by writing child checkpoints as progress is made + // (so that error recovery / resuming from interrupt don't lose work) + // but doing so always with an id equal to that of the parent checkpoint + this.checkpoint = createCheckpoint( + this.checkpoint, + this.channels, + this.step + // id: this.isNested ? this.config.configurable?.checkpoint_id : undefined, + ); + this.checkpointConfig = { + ...this.checkpointConfig, + configurable: { + ...this.checkpointConfig.configurable, + checkpoint_ns: this.config.configurable?.checkpoint_ns ?? 
"", + }, + }; + const channelVersions = { ...this.checkpoint.channel_versions }; + const newVersions = getNewChannelVersions( + this.checkpointPreviousVersions, + channelVersions + ); + this.checkpointPreviousVersions = channelVersions; + // save it, without blocking + // if there's a previous checkpoint save in progress, wait for it + // ensuring checkpointers receive checkpoints in order + void this._checkpointerPutAfterPrevious({ + config: { ...this.checkpointConfig }, + checkpoint: copyCheckpoint(this.checkpoint), + metadata: { ...this.checkpointMetadata }, + newVersions, + }); + this.checkpointConfig = { + ...this.checkpointConfig, + configurable: { + ...this.checkpointConfig.configurable, + checkpoint_id: this.checkpoint.id, + }, + }; + + // Produce debug output + const debugOutput = await gatherIterator( + prefixGenerator( + mapDebugCheckpoint( + this.step, + this.checkpointConfig, + this.channels, + this.graph.streamChannelsAsIs as string[], + this.checkpointMetadata + ), + "debug" + ) + ); + this.stream.push(...debugOutput); + } + this.step += 1; + } +} diff --git a/langgraph/src/pregel/types.ts b/langgraph/src/pregel/types.ts index 29e813d2b..af0f646e4 100644 --- a/langgraph/src/pregel/types.ts +++ b/langgraph/src/pregel/types.ts @@ -1,27 +1,72 @@ -import { Runnable, RunnableConfig } from "@langchain/core/runnables"; +import type { Runnable, RunnableConfig } from "@langchain/core/runnables"; +import type { PendingWrite, CheckpointMetadata } from "../checkpoint/types.js"; +import type { BaseCheckpointSaver } from "../checkpoint/base.js"; +import type { BaseChannel } from "../channels/base.js"; +import type { PregelNode } from "./read.js"; + +export type StreamMode = "values" | "updates" | "debug"; + +/** + * Construct a type with a set of properties K of type T + */ +type StrRecord = { + [P in K]: T; +}; + +export interface PregelInterface< + Nn extends StrRecord, + Cc extends StrRecord +> { + nodes: Nn; + + channels: Cc; + + /** + * @default true + */ + autoValidate?: boolean; + + /** + * @default "values" + */ + streamMode?: StreamMode | StreamMode[]; + + inputChannels: keyof Cc | Array; + + outputChannels: keyof Cc | Array; -export interface CheckpointMetadata { /** - * The source of the checkpoint. - * - "input": The checkpoint was created from an input to invoke/stream/batch. - * - "loop": The checkpoint was created from inside the pregel loop. - * - "update": The checkpoint was created from a manual state update. + * @default [] */ - source: "input" | "loop" | "update"; + interruptAfter?: Array | All; + /** - * The step number of the checkpoint. - * -1 for the first "input" checkpoint. - * 0 for the first "loop" checkpoint. - * ... for the nth checkpoint afterwards. + * @default [] */ - step: number; + interruptBefore?: Array | All; + + streamChannels?: keyof Cc | Array; + + get streamChannelsAsIs(): keyof Cc | Array; + /** - * The writes that were made between the previous checkpoint and this one. - * Mapping from node name to writes emitted by that node. 
+ * @default undefined */ - writes: Record | null; + stepTimeout?: number; + + /** + * @default false + */ + debug?: boolean; + + checkpointer?: BaseCheckpointSaver; } +export type PregelParams< + Nn extends StrRecord, + Cc extends StrRecord +> = Omit, "streamChannelsAsIs">; + export interface PregelTaskDescription { readonly name: string; readonly input: unknown; @@ -34,7 +79,7 @@ export interface PregelExecutableTask< readonly name: N; readonly input: unknown; readonly proc: Runnable; - readonly writes: Array<[C, unknown]>; + readonly writes: PendingWrite[]; readonly config: RunnableConfig | undefined; readonly triggers: Array; readonly retry_policy?: string; @@ -71,12 +116,3 @@ export interface StateSnapshot { } export type All = "*"; - -export type PendingWriteValue = unknown; - -export type PendingWrite = [Channel, PendingWriteValue]; - -export type CheckpointPendingWrite = [ - TaskId, - ...PendingWrite -]; diff --git a/langgraph/src/pregel/utils.ts b/langgraph/src/pregel/utils.ts new file mode 100644 index 000000000..13c03a285 --- /dev/null +++ b/langgraph/src/pregel/utils.ts @@ -0,0 +1,38 @@ +import type { ChannelVersions } from "../checkpoint/base.js"; + +export function getNewChannelVersions( + previousVersions: ChannelVersions, + currentVersions: ChannelVersions +): ChannelVersions { + // Get new channel versions + if (Object.keys(previousVersions).length > 0) { + const versionValues = Object.values(currentVersions); + const versionType = + versionValues.length > 0 ? typeof versionValues[0] : undefined; + let nullVersion: number | string; + if (versionType === "number") { + nullVersion = 0; + } else if (versionType === "string") { + nullVersion = ""; + } + + return Object.fromEntries( + Object.entries(currentVersions).filter( + ([k, v]) => v > (previousVersions[k] ?? nullVersion) + ) + ); + } else { + return currentVersions; + } +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function _coerceToDict(value: any, defaultKey: string) { + return value && + !Array.isArray(value) && + // eslint-disable-next-line no-instanceof/no-instanceof + !(value instanceof Date) && + typeof value === "object" + ? value + : { [defaultKey]: value }; +} diff --git a/langgraph/src/pregel/write.ts b/langgraph/src/pregel/write.ts index 5e90d9ae3..9c2764436 100644 --- a/langgraph/src/pregel/write.ts +++ b/langgraph/src/pregel/write.ts @@ -53,8 +53,9 @@ export class ChannelWrite< .join(",")}>`; super({ ...{ writes, name, tags }, - func: async (input: RunInput, config?: RunnableConfig) => - this._write(input, config ?? {}), + func: async (input: RunInput, config?: RunnableConfig) => { + return this._write(input, config ?? 
{}); + }, }); this.writes = writes; diff --git a/langgraph/src/tests/checkpoints.test.ts b/langgraph/src/tests/checkpoints.test.ts index 931a86a9a..6b4b3fe36 100644 --- a/langgraph/src/tests/checkpoints.test.ts +++ b/langgraph/src/tests/checkpoints.test.ts @@ -168,6 +168,7 @@ describe("SqliteSaver", () => { expect(firstCheckpointTuple?.config).toEqual({ configurable: { thread_id: "1", + checkpoint_ns: "", checkpoint_id: checkpoint1.id, }, }); @@ -193,6 +194,7 @@ describe("SqliteSaver", () => { expect(secondCheckpointTuple?.parentConfig).toEqual({ configurable: { thread_id: "1", + checkpoint_ns: "", checkpoint_id: "2024-04-18T17:19:07.952Z", }, }); diff --git a/langgraph/src/tests/pregel.test.ts b/langgraph/src/tests/pregel.test.ts index d7fe6a330..f65f78e11 100644 --- a/langgraph/src/tests/pregel.test.ts +++ b/langgraph/src/tests/pregel.test.ts @@ -1,6 +1,6 @@ /* eslint-disable no-process-env */ /* eslint-disable no-promise-executor-return */ -import { it, expect, jest, beforeAll, describe } from "@jest/globals"; +import { it, expect, jest, describe } from "@jest/globals"; import { RunnableConfig, RunnableLambda, @@ -19,11 +19,8 @@ import { ToolMessage, } from "@langchain/core/messages"; import { ToolCall } from "@langchain/core/messages/tool"; -import { - gatherIterator, - FakeChatModel, - MemorySaverAssertImmutable, -} from "./utils.js"; +import { FakeChatModel, MemorySaverAssertImmutable } from "./utils.js"; +import { gatherIterator } from "../utils.js"; import { LastValue } from "../channels/last_value.js"; import { Annotation, @@ -43,7 +40,7 @@ import { _applyWrites, _localRead, _prepareNextTasks, - _shouldInterrupt, + shouldInterrupt, } from "../pregel/algo.js"; import { ToolExecutor, createAgentExecutor } from "../prebuilt/index.js"; import { MessageGraph, messagesStateReducer } from "../graph/message.js"; @@ -52,16 +49,7 @@ import { Checkpoint } from "../checkpoint/base.js"; import { GraphRecursionError, InvalidUpdateError } from "../errors.js"; import { SqliteSaver } from "../checkpoint/sqlite.js"; import { uuid5, uuid6 } from "../checkpoint/id.js"; -import { Send, TASKS } from "../constants.js"; - -// Tracing slows down the tests -beforeAll(() => { - process.env.LANGCHAIN_TRACING_V2 = "false"; - process.env.LANGCHAIN_ENDPOINT = ""; - process.env.LANGCHAIN_ENDPOINT = ""; - process.env.LANGCHAIN_API_KEY = ""; - process.env.LANGCHAIN_PROJECT = ""; -}); +import { INTERRUPT, Send, TASKS } from "../constants.js"; describe("Channel", () => { describe("writeTo", () => { @@ -120,8 +108,8 @@ describe("Pregel", () => { input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", streamChannels: "output", }); const pregel2 = new Pregel({ @@ -130,8 +118,8 @@ describe("Pregel", () => { input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", streamChannels: ["input", "output"], }); const pregel3 = new Pregel({ @@ -140,8 +128,8 @@ describe("Pregel", () => { input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); // call method / assertions @@ -186,17 +174,18 @@ describe("Pregel", () => { tags: ["hello"], }; + const checkpointer = new MemorySaver(); // create Pregel class const pregel = new Pregel({ nodes, debug: false, - inputs: "outputKey", - outputs: "outputKey", + inputChannels: "outputKey", + outputChannels: "outputKey", 
interruptBefore: ["one"], interruptAfter: ["one"], streamMode: "values", channels, - checkpointer: new MemorySaver(), + checkpointer, }); // call method / assertions @@ -208,6 +197,7 @@ describe("Pregel", () => { {}, ["one"], // interrupt before ["one"], // interrupt after + checkpointer, ]; const expectedDefaults2 = [ @@ -218,6 +208,7 @@ describe("Pregel", () => { { tags: ["hello"] }, "*", // interrupt before ["one"], // interrupt after + checkpointer, ]; expect(pregel._defaults(config1)).toEqual(expectedDefaults1); @@ -226,7 +217,7 @@ describe("Pregel", () => { }); }); -describe("_shouldInterrupt", () => { +describe("shouldInterrupt", () => { it("should return true if any snapshot channel has been updated since last interrupt and any channel written to is in interrupt nodes list", () => { // set up test const checkpoint: Checkpoint = { @@ -240,7 +231,7 @@ describe("_shouldInterrupt", () => { channel1: 2, // current channel version is greater than last version seen }, versions_seen: { - __interrupt__: { + [INTERRUPT]: { channel1: 1, }, }, @@ -248,11 +239,10 @@ describe("_shouldInterrupt", () => { }; const interruptNodes = ["node1"]; - const snapshotChannels = ["channel1"]; // call method / assertions expect( - _shouldInterrupt(checkpoint, interruptNodes, snapshotChannels, [ + shouldInterrupt(checkpoint, interruptNodes, [ { name: "node1", input: undefined, @@ -283,11 +273,10 @@ describe("_shouldInterrupt", () => { }; const interruptNodes = ["node1"]; - const snapshotChannels = ["channel1"]; // call method / assertions expect( - _shouldInterrupt(checkpoint, interruptNodes, snapshotChannels, [ + shouldInterrupt(checkpoint, interruptNodes, [ { name: "node1", input: undefined, @@ -322,11 +311,10 @@ describe("_shouldInterrupt", () => { }; const interruptNodes = ["node1"]; - const snapshotChannels = ["channel1"]; // call method / assertions expect( - _shouldInterrupt(checkpoint, interruptNodes, snapshotChannels, [ + shouldInterrupt(checkpoint, interruptNodes, [ { name: "node1", input: undefined, @@ -361,11 +349,10 @@ describe("_shouldInterrupt", () => { }; const interruptNodes = ["node1"]; - const snapshotChannels = ["channel1"]; // call method / assertions expect( - _shouldInterrupt(checkpoint, interruptNodes, snapshotChannels, [ + shouldInterrupt(checkpoint, interruptNodes, [ { name: "node2", // node2 is not in interrupt nodes input: undefined, @@ -407,9 +394,23 @@ describe("_localRead", () => { const writes: Array<[string, any]> = []; // call method / assertions - expect(_localRead(checkpoint, channels, writes, "channel1", false)).toBe(1); expect( - _localRead(checkpoint, channels, writes, ["channel1", "channel2"], false) + _localRead( + checkpoint, + channels, + { name: "test", writes, triggers: [] }, + "channel1", + false + ) + ).toBe(1); + expect( + _localRead( + checkpoint, + channels, + { name: "test", writes, triggers: [] }, + ["channel1", "channel2"], + false + ) ).toEqual({ channel1: 1, channel2: 2 }); }); @@ -442,11 +443,23 @@ describe("_localRead", () => { ]; // call method / assertions - expect(_localRead(checkpoint, channels, writes, "channel1", true)).toBe( - 100 - ); expect( - _localRead(checkpoint, channels, writes, ["channel1", "channel2"], true) + _localRead( + checkpoint, + channels, + { name: "test", writes, triggers: [] }, + "channel1", + true + ) + ).toBe(100); + expect( + _localRead( + checkpoint, + channels, + { name: "test", writes, triggers: [] }, + ["channel1", "channel2"], + true + ) ).toEqual({ channel1: 100, channel2: 200 }); }); }); @@ -492,7 +505,9 @@ 
describe("_applyWrites", () => { expect(channels.channel2.get()).toBe("channel2value"); expect(checkpoint.channel_versions.channel1).toBe(2); - _applyWrites(checkpoint, channels, pendingWrites); // contains side effects + _applyWrites(checkpoint, channels, [ + { name: "foo", writes: pendingWrites, triggers: [] }, + ]); // contains side effects expect(channels.channel1.get()).toBe("channel1valueUpdated!"); expect(channels.channel2.get()).toBe("channel2value"); @@ -534,7 +549,9 @@ describe("_applyWrites", () => { // call method / assertions expect(() => { - _applyWrites(checkpoint, channels, pendingWrites); // contains side effects + _applyWrites(checkpoint, channels, [ + { name: "foo", writes: pendingWrites, triggers: [] }, + ]); // contains side effects }).toThrow(InvalidUpdateError); }); }); @@ -719,12 +736,12 @@ describe("_prepareNextTasks", () => { config: { tags: [], configurable: expect.any(Object), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "node1", langgraph_step: -1, langgraph_task_idx: 0, langgraph_triggers: [TASKS], - }, + }), recursionLimit: 25, runId: undefined, runName: "node1", @@ -740,12 +757,12 @@ describe("_prepareNextTasks", () => { config: { tags: [], configurable: expect.any(Object), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "node1", langgraph_step: -1, langgraph_task_idx: 1, langgraph_triggers: ["channel1"], - }, + }), recursionLimit: 25, runId: undefined, runName: "node1", @@ -761,12 +778,12 @@ describe("_prepareNextTasks", () => { config: { tags: [], configurable: expect.any(Object), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "node2", langgraph_step: -1, langgraph_task_idx: 2, langgraph_triggers: ["channel1", "channel2"], - }, + }), recursionLimit: 25, runId: undefined, runName: "node2", @@ -794,8 +811,8 @@ it("can invoke pregel with a single process", async () => { input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); expect(await app.invoke(2)).toBe(3); @@ -831,8 +848,8 @@ it("should process input and produce output with implicit channels", async () => input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); expect(await app.invoke(2)).toBe(3); @@ -860,8 +877,8 @@ it("should process input and write kwargs correctly", async () => { fixed: new LastValue(), outputPlusOne: new LastValue(), }, - outputs: ["output", "fixed", "outputPlusOne"], - inputs: "input", + outputChannels: ["output", "fixed", "outputPlusOne"], + inputChannels: "input", }); expect(await app.invoke(2)).toEqual({ @@ -885,8 +902,8 @@ it("should invoke single process in out objects", async () => { input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: ["output"], + inputChannels: "input", + outputChannels: ["output"], }); expect(await app.invoke(2)).toEqual({ output: 3 }); @@ -904,8 +921,8 @@ it("should process input and output as objects", async () => { input: new LastValue(), output: new LastValue(), }, - inputs: ["input"], - outputs: ["output"], + inputChannels: ["input"], + outputChannels: ["output"], }); expect(await app.invoke({ input: 2 })).toEqual({ output: 3 }); @@ -928,8 +945,8 @@ it("should invoke two processes and get correct output", async () => { output: new LastValue(), input: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", 
streamChannels: ["inbox", "output"], }); @@ -968,9 +985,9 @@ it("should process two processes with object input and output", async () => { input: new LastValue(), output: new LastValue(), }, - inputs: ["input", "inbox"], + inputChannels: ["input", "inbox"], streamChannels: ["output", "inbox"], - outputs: "output", + outputChannels: "output", }); expect( @@ -1025,7 +1042,7 @@ it("should process two processes with object input and output", async () => { timestamp: expect.any(String), step: 0, payload: { - id: "1726020d-12ca-56e2-a3d3-5b5752b526cf", + id: "240c2924-b25b-573d-a0b1-b3aee1241331", name: "one", result: [["inbox", 3]], }, @@ -1035,7 +1052,7 @@ it("should process two processes with object input and output", async () => { timestamp: expect.any(String), step: 0, payload: { - id: "ad0a1023-e379-52e7-be4c-5a2c1433aba0", + id: "7f2c3a63-782c-58c7-ba9e-7c2e4ceafdaa", name: "two", result: [["output", 13]], }, @@ -1056,7 +1073,7 @@ it("should process two processes with object input and output", async () => { timestamp: expect.any(String), step: 1, payload: { - id: "92ce7404-7c07-5383-b528-6933ac523e6a", + id: "f812355e-0e5c-5b76-9c43-f7fce750d1a0", name: "two", result: [["output", 4]], }, @@ -1086,8 +1103,8 @@ it("should process batch with two processes and delays", async () => { output: new LastValue(), input: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); expect(await app.batch([3, 2, 1, 3, 5])).toEqual([5, 4, 3, 5, 7]); @@ -1147,8 +1164,8 @@ it("should batch many processes with input and output", async () => { const app = new Pregel({ nodes, channels, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); for (let i = 0; i < 3; i += 1) { @@ -1180,8 +1197,8 @@ it("should raise InvalidUpdateError when the same LastValue channel is updated t output: new LastValue(), input: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); await expect(app.invoke(2)).rejects.toThrow(InvalidUpdateError); @@ -1204,8 +1221,8 @@ it("should process two inputs to two outputs validly", async () => { input: new LastValue(), output2: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); // An Inbox channel accumulates updates into a sequence @@ -1261,8 +1278,8 @@ it("should handle checkpoints correctly", async () => { input: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", checkpointer: memory, }); @@ -1330,8 +1347,8 @@ it("should process two inputs joined into one topic and produce two outputs", as output: new LastValue(), input: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); // Invoke app and check results @@ -1364,8 +1381,8 @@ it("should invoke join then call other app", async () => { output: new LastValue(), input: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); const one = Channel.subscribeTo("input") @@ -1393,8 +1410,8 @@ it("should invoke join then call other app", async () => { output: new LastValue(), input: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); // Run the test 10 times sequentially @@ -1434,8 +1451,8 @@ it("should handle two processes with one input and two 
outputs", async () => { output: new LastValue(), between: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", streamChannels: ["output", "between"], }); @@ -1462,8 +1479,8 @@ it("should finish executing without output", async () => { between: new LastValue(), output: new LastValue(), }, - inputs: "input", - outputs: "output", + inputChannels: "input", + outputChannels: "output", }); // It finishes executing (once no more messages being published) @@ -2182,7 +2199,9 @@ describe("StateGraph", () => { createdAt: (await appWithInterrupt.checkpointer?.getTuple(config)) ?.checkpoint.ts, parentConfig: ( - await gatherIterator(appWithInterrupt.checkpointer!.list(config, 2)) + await gatherIterator( + appWithInterrupt.checkpointer!.list(config, { limit: 2 }) + ) ).slice(-1)[0].config, }); @@ -2240,7 +2259,9 @@ describe("StateGraph", () => { createdAt: (await appWithInterrupt.checkpointer?.getTuple(config)) ?.checkpoint.ts, parentConfig: ( - await gatherIterator(appWithInterrupt.checkpointer!.list(config, 2)) + await gatherIterator( + appWithInterrupt.checkpointer!.list(config, { limit: 2 }) + ) ).slice(-1)[0].config, }); @@ -2302,7 +2323,9 @@ describe("StateGraph", () => { ?.checkpoint.ts, config: (await appWithInterrupt.checkpointer?.getTuple(config))?.config, parentConfig: ( - await gatherIterator(appWithInterrupt.checkpointer!.list(config, 2)) + await gatherIterator( + appWithInterrupt.checkpointer!.list(config, { limit: 2 }) + ) ).slice(-1)[0].config, }); @@ -2361,7 +2384,9 @@ describe("StateGraph", () => { ?.checkpoint.ts, config: (await appWithInterrupt.checkpointer?.getTuple(config))?.config, parentConfig: ( - await gatherIterator(appWithInterrupt.checkpointer!.list(config, 2)) + await gatherIterator( + appWithInterrupt.checkpointer!.list(config, { limit: 2 }) + ) ).slice(-1)[0].config, }); }); @@ -2795,29 +2820,46 @@ it("StateGraph start branch then end", async () => { const thread1 = { configurable: { thread_id: "1" } }; expect( await toolTwoWithCheckpointer.invoke( - { my_key: "value", market: "DE" }, + { my_key: "value ⛰️", market: "DE" }, thread1 ) - ).toEqual({ my_key: "value", market: "DE" }); + ).toEqual({ my_key: "value ⛰️", market: "DE" }); + expect( + ( + await gatherIterator(toolTwoWithCheckpointer.checkpointer!.list(thread1)) + ).map((c) => c.metadata) + ).toEqual([ + { + source: "loop", + step: 0, + }, + { + source: "input", + step: -1, + writes: { my_key: "value ⛰️", market: "DE" }, + }, + ]); expect(await toolTwoWithCheckpointer.getState(thread1)).toEqual({ - values: { my_key: "value", market: "DE" }, + values: { my_key: "value ⛰️", market: "DE" }, next: ["tool_two_slow"], config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread1))! .config, createdAt: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread1))! .checkpoint.ts, - metadata: { source: "loop", step: 0, writes: null }, + metadata: { source: "loop", step: 0 }, parentConfig: ( - await last(toolTwoWithCheckpointer.checkpointer!.list(thread1, 2)) + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread1, { limit: 2 }) + ) ).config, }); expect(await toolTwoWithCheckpointer.invoke(null, thread1)).toEqual({ - my_key: "value slow", + my_key: "value ⛰️ slow", market: "DE", }); expect(await toolTwoWithCheckpointer.getState(thread1)).toEqual({ - values: { my_key: "value slow", market: "DE" }, + values: { my_key: "value ⛰️ slow", market: "DE" }, next: [], config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread1))! 
.config, @@ -2829,7 +2871,125 @@ it("StateGraph start branch then end", async () => { writes: { tool_two_slow: { my_key: " slow" } }, }, parentConfig: ( - await last(toolTwoWithCheckpointer.checkpointer!.list(thread1, 2)) + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread1, { limit: 2 }) + ) + ).config, + }); + const thread2 = { configurable: { thread_id: "2" } }; + // stop when about to enter node + expect( + await toolTwoWithCheckpointer.invoke( + { my_key: "value", market: "US" }, + thread2 + ) + ).toEqual({ + my_key: "value", + market: "US", + }); + expect(await toolTwoWithCheckpointer.getState(thread2)).toEqual({ + values: { my_key: "value", market: "US" }, + next: ["tool_two_fast"], + config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread2))! + .config, + createdAt: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread2))! + .checkpoint.ts, + metadata: { source: "loop", step: 0 }, + parentConfig: ( + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread2, { limit: 2 }) + ) + ).config, + }); + // resume, for same result as above + expect(await toolTwoWithCheckpointer.invoke(null, thread2)).toEqual({ + my_key: "value fast", + market: "US", + }); + expect(await toolTwoWithCheckpointer.getState(thread2)).toEqual({ + values: { my_key: "value fast", market: "US" }, + next: [], + config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread2))! + .config, + createdAt: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread2))! + .checkpoint.ts, + metadata: { + source: "loop", + step: 1, + writes: { tool_two_fast: { my_key: " fast" } }, + }, + parentConfig: ( + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread2, { limit: 2 }) + ) + ).config, + }); + const thread3 = { configurable: { thread_id: "3" } }; + // stop when about to enter node + expect( + await toolTwoWithCheckpointer.invoke( + { my_key: "value", market: "US" }, + thread3 + ) + ).toEqual({ + my_key: "value", + market: "US", + }); + expect(await toolTwoWithCheckpointer.getState(thread3)).toEqual({ + values: { my_key: "value", market: "US" }, + next: ["tool_two_fast"], + config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread3))! + .config, + createdAt: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread3))! + .checkpoint.ts, + metadata: { source: "loop", step: 0 }, + parentConfig: ( + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread3, { limit: 2 }) + ) + ).config, + }); + // update state + await toolTwoWithCheckpointer.updateState(thread3, { my_key: "key" }); // appends to my_key + expect(await toolTwoWithCheckpointer.getState(thread3)).toEqual({ + values: { my_key: "valuekey", market: "US" }, + next: ["tool_two_fast"], + config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread3))! + .config, + createdAt: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread3))! + .checkpoint.ts, + metadata: { + source: "update", + step: 1, + writes: { [START]: { my_key: "key" } }, + }, + parentConfig: ( + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread3, { limit: 2 }) + ) + ).config, + }); + // resume, for same result as above + expect(await toolTwoWithCheckpointer.invoke(null, thread3)).toEqual({ + my_key: "valuekey fast", + market: "US", + }); + expect(await toolTwoWithCheckpointer.getState(thread3)).toEqual({ + values: { my_key: "valuekey fast", market: "US" }, + next: [], + config: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread3))! 
+ .config, + createdAt: (await toolTwoWithCheckpointer.checkpointer!.getTuple(thread3))! + .checkpoint.ts, + metadata: { + source: "loop", + step: 2, + writes: { tool_two_fast: { my_key: " fast" } }, + }, + parentConfig: ( + await last( + toolTwoWithCheckpointer.checkpointer!.list(thread3, { limit: 2 }) + ) ).config, }); }); diff --git a/langgraph/src/tests/tracing.test.ts b/langgraph/src/tests/tracing.test.ts index 3376eeca2..d6632eb9f 100644 --- a/langgraph/src/tests/tracing.test.ts +++ b/langgraph/src/tests/tracing.test.ts @@ -3,8 +3,9 @@ import { AIMessage, BaseMessage, HumanMessage } from "@langchain/core/messages"; import { FakeToolCallingChatModel } from "./utils.js"; // Import from main `@langchain/langgraph` endpoint to turn on automatic config passing import { END, START, StateGraph } from "../index.js"; +import { gatherIterator } from "../utils.js"; -it("should pass config through if importing from the primary entrypoint", async () => { +it("stream events for a multi-node graph", async () => { const stateGraph = new StateGraph<{ messages: BaseMessage[]; }>({ @@ -30,10 +31,7 @@ it("should pass config through if importing from the primary entrypoint", async .compile(); const eventStream = graph.streamEvents({ messages: [] }, { version: "v2" }); - const events = []; - for await (const event of eventStream) { - events.push(event); - } + const events = await gatherIterator(eventStream); expect(events).toEqual([ { event: "on_chain_start", @@ -57,12 +55,12 @@ it("should pass config through if importing from the primary entrypoint", async name: "__start__", tags: ["graph:step:0", "langsmith:hidden"], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "__start__", langgraph_step: 0, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: ["__start__"], + }), }, { event: "on_chain_end", @@ -75,12 +73,12 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "__start__", tags: ["graph:step:0", "langsmith:hidden"], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "__start__", langgraph_step: 0, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: ["__start__"], + }), }, { event: "on_chain_start", @@ -92,12 +90,15 @@ it("should pass config through if importing from the primary entrypoint", async name: "testnode", tags: ["graph:step:1"], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chain_start", @@ -109,12 +110,15 @@ it("should pass config through if importing from the primary entrypoint", async name: "RunnableLambda", tags: ["seq:step:1"], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chat_model_start", @@ -126,14 +130,17 @@ it("should pass config through if importing from the primary entrypoint", async name: "model_call", tags: [], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, 
- langgraph_triggers: ["__pregel_tasks"], + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], ls_model_type: "chat", ls_stop: undefined, - }, + }), }, { event: "on_chat_model_end", @@ -146,15 +153,17 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "model_call", tags: [], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], ls_model_type: "chat", ls_stop: undefined, - }, + }), }, { event: "on_chain_end", @@ -169,12 +178,15 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "RunnableLambda", tags: ["seq:step:1"], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chain_start", @@ -186,12 +198,15 @@ it("should pass config through if importing from the primary entrypoint", async name: "ChannelWrite", tags: ["seq:step:2", "langsmith:hidden"], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chain_end", @@ -204,12 +219,15 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "ChannelWrite", tags: ["seq:step:2", "langsmith:hidden"], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chain_start", @@ -221,12 +239,15 @@ it("should pass config through if importing from the primary entrypoint", async name: "func", tags: ["seq:step:3"], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chat_model_start", @@ -238,14 +259,17 @@ it("should pass config through if importing from the primary entrypoint", async name: "conditional_edge_call", tags: [], run_id: expect.any(String), - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], ls_model_type: "chat", ls_stop: undefined, - }, + }), }, { event: "on_chat_model_end", @@ -258,14 +282,17 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "conditional_edge_call", tags: [], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", 
+ ], ls_model_type: "chat", ls_stop: undefined, - }, + }), }, { event: "on_chain_end", @@ -280,12 +307,15 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "func", tags: ["seq:step:3"], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chain_end", @@ -298,12 +328,15 @@ it("should pass config through if importing from the primary entrypoint", async run_id: expect.any(String), name: "testnode", tags: ["graph:step:1"], - metadata: { + metadata: expect.objectContaining({ langgraph_node: "testnode", langgraph_step: 1, langgraph_task_idx: 0, - langgraph_triggers: ["__pregel_tasks"], - }, + langgraph_triggers: [ + "start:testnode", + "branch:testnode:condition:testnode", + ], + }), }, { event: "on_chain_stream", @@ -323,9 +356,7 @@ it("should pass config through if importing from the primary entrypoint", async event: "on_chain_end", data: { output: { - testnode: { - messages: [new AIMessage("hey!")], - }, + messages: [new AIMessage("hey!")], }, }, run_id: expect.any(String), diff --git a/langgraph/src/tests/utils.ts b/langgraph/src/tests/utils.ts index fa2724cd1..2c8999517 100644 --- a/langgraph/src/tests/utils.ts +++ b/langgraph/src/tests/utils.ts @@ -11,7 +11,8 @@ import { RunnableConfig } from "@langchain/core/runnables"; import { Tool } from "@langchain/core/tools"; import { z } from "zod"; import { MemorySaver } from "../checkpoint/memory.js"; -import { Checkpoint, CheckpointMetadata } from "../checkpoint/base.js"; +import { Checkpoint } from "../checkpoint/base.js"; +import { CheckpointMetadata } from "../checkpoint/types.js"; export interface FakeChatModelArgs extends BaseChatModelParams { responses: BaseMessage[]; @@ -182,12 +183,3 @@ export class FakeSearchTool extends Tool { return `result for ${query}`; } } - -// https://github.com/tc39/proposal-array-from-async -export async function gatherIterator( - i: AsyncIterable | Promise> -): Promise> { - const out: T[] = []; - for await (const item of await i) out.push(item); - return out; -} diff --git a/langgraph/src/utils.ts b/langgraph/src/utils.ts index 9641cc6e4..fc632aca4 100644 --- a/langgraph/src/utils.ts +++ b/langgraph/src/utils.ts @@ -89,3 +89,49 @@ export class RunnableCallable extends Runnable { return returnValue; } } + +export function prefixGenerator( + generator: Generator, + prefix: Prefix +): Generator<[Prefix, T]>; +export function prefixGenerator( + generator: Generator, + prefix?: undefined +): Generator; +export function prefixGenerator< + T, + Prefix extends string | undefined = undefined +>( + generator: Generator, + prefix?: Prefix | undefined +): Generator; +export function* prefixGenerator< + T, + Prefix extends string | undefined = undefined +>( + generator: Generator, + prefix?: Prefix | undefined +): Generator { + if (prefix === undefined) { + yield* generator as Generator; + } else { + for (const value of generator) { + yield [prefix, value] as Prefix extends string ? 
[Prefix, T] : T; + } + } +} + +// https://github.com/tc39/proposal-array-from-async +export async function gatherIterator( + i: + | AsyncIterable + | Promise> + | Iterable + | Promise> +): Promise> { + const out: T[] = []; + for await (const item of await i) { + out.push(item); + } + return out; +} diff --git a/langgraph/src/web.ts b/langgraph/src/web.ts index 1716eb8d2..5d4d1d679 100644 --- a/langgraph/src/web.ts +++ b/langgraph/src/web.ts @@ -15,12 +15,12 @@ export { export { MemorySaver } from "./checkpoint/memory.js"; export { type Checkpoint, - type CheckpointMetadata, type CheckpointTuple, copyCheckpoint, emptyCheckpoint, BaseCheckpointSaver, } from "./checkpoint/base.js"; +export { type CheckpointMetadata } from "./checkpoint/types.js"; export { GraphRecursionError, GraphValueError, diff --git a/yarn.lock b/yarn.lock index 92ac77078..400b9f2f8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1886,6 +1886,7 @@ __metadata: "@swc/jest": ^0.2.29 "@tsconfig/recommended": ^1.0.3 "@types/better-sqlite3": ^7.6.9 + "@types/double-ended-queue": ^2 "@types/uuid": ^10 "@typescript-eslint/eslint-plugin": ^6.12.0 "@typescript-eslint/parser": ^6.12.0 @@ -1893,6 +1894,7 @@ __metadata: better-sqlite3: ^9.5.0 cheerio: 1.0.0-rc.12 dotenv: ^16.3.1 + double-ended-queue: ^2.1.0-0 dpdm: ^3.12.0 eslint: ^8.33.0 eslint-config-airbnb-base: ^15.0.0 @@ -3018,6 +3020,13 @@ __metadata: languageName: node linkType: hard +"@types/double-ended-queue@npm:^2": + version: 2.1.7 + resolution: "@types/double-ended-queue@npm:2.1.7" + checksum: 6a4a6e339c90048013cf762f07b284492e7e0fe3caf9d8e1d716ca6335d29a23993c21110f869b667a54d605280c072f84030eb18f37f47c1b36884030721d36 + languageName: node + linkType: hard + "@types/estree@npm:*, @types/estree@npm:1.0.5": version: 1.0.5 resolution: "@types/estree@npm:1.0.5" @@ -5182,6 +5191,13 @@ __metadata: languageName: node linkType: hard +"double-ended-queue@npm:^2.1.0-0": + version: 2.1.0-0 + resolution: "double-ended-queue@npm:2.1.0-0" + checksum: 3030cf9dcf6f8e7d8cb6ae5b7304890445d7c32233a614e400ba7b378086ad76f5822d0e501afd5ffe0af1de4bcb842fa23d4c79174d54f6566399435fafc271 + languageName: node + linkType: hard + "dpdm@npm:^3.12.0": version: 3.14.0 resolution: "dpdm@npm:3.14.0"
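
Not part of the patch: a minimal usage sketch of the refactored invoke() path, modeled on the single-process Pregel tests updated above. The relative import paths and the addOne lambda are illustrative assumptions; the constructor options (nodes, channels, inputChannels, outputChannels) are the renamed fields introduced by this change.

import { Channel, Pregel } from "../pregel/index.js"; // paths as used in pregel.test.ts (illustrative)
import { LastValue } from "../channels/last_value.js";

// A single node: read "input", add one, write "output".
const addOne = (x: number): number => x + 1;
const one = Channel.subscribeTo("input")
  .pipe(addOne)
  .pipe(Channel.writeTo(["output"]));

const app = new Pregel({
  nodes: { one },
  channels: {
    input: new LastValue<number>(),
    output: new LastValue<number>(),
  },
  inputChannels: "input",
  outputChannels: "output",
});

// invoke() now drives PregelLoop.initialize()/tick() under the hood; with the
// default "values" stream mode it returns the final chunk.
console.log(await app.invoke(2)); // -> 3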