diff --git a/langgraph/src/pregel/algo.ts b/langgraph/src/pregel/algo.ts index 5a8948f9..96ed0855 100644 --- a/langgraph/src/pregel/algo.ts +++ b/langgraph/src/pregel/algo.ts @@ -337,7 +337,7 @@ export function _prepareNextTasks< // Check if any processes should be run in next step // If so, prepare the values to be passed to them for (const [name, proc] of Object.entries(processes)) { - const hasUpdatedChannels = proc.triggers + const updatedChannels = proc.triggers .filter((chan) => { try { readChannel(channels, chan, false); @@ -346,11 +346,13 @@ export function _prepareNextTasks< return false; } }) - .some( + .filter( (chan) => getChannelVersion(newCheckpoint, chan) > getVersionSeen(newCheckpoint, name, chan) ); + + const hasUpdatedChannels = updatedChannels.length > 0; // If any of the channels read by this process were updated if (hasUpdatedChannels) { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -421,7 +423,7 @@ export function _prepareNextTasks< const metadata = { langgraph_step: extra.step, langgraph_node: name, - langgraph_triggers: proc.triggers, + langgraph_triggers: updatedChannels, langgraph_task_idx: tasks.length, }; const checkpointNamespace = @@ -438,7 +440,7 @@ export function _prepareNextTasks< input: val, proc: node, writes, - triggers: proc.triggers, + triggers: updatedChannels, config: patchConfig(mergeConfigs(proc.config, { metadata }), { runName: name, configurable: { diff --git a/langgraph/src/tests/pregel.test.ts b/langgraph/src/tests/pregel.test.ts index a526aee3..b95e2d58 100644 --- a/langgraph/src/tests/pregel.test.ts +++ b/langgraph/src/tests/pregel.test.ts @@ -757,7 +757,7 @@ describe("_prepareNextTasks", () => { input: 100, proc: new RunnablePassthrough(), writes: [], - triggers: ["channel1", "channel2"], + triggers: ["channel1"], config: { tags: [], configurable: expect.any(Object), @@ -765,7 +765,7 @@ describe("_prepareNextTasks", () => { langgraph_node: "node2", langgraph_step: -1, langgraph_task_idx: 2, - langgraph_triggers: ["channel1", "channel2"], + langgraph_triggers: ["channel1"], }, recursionLimit: 25, runId: undefined, @@ -2922,10 +2922,10 @@ it("checkpoint events", async () => { timestamp: expect.any(String), step: 3, payload: { - id: "9b41e438-8b1c-5f01-b1cb-cc158ed8bd57", + id: "d082881c-b51f-5f07-a0f3-3f4581048aeb", name: "finish", input: { my_key: "value prepared slow", market: "DE" }, - triggers: ["tool_two_fast", "tool_two_slow"], + triggers: ["tool_two_slow"], }, }, { @@ -2933,7 +2933,7 @@ it("checkpoint events", async () => { timestamp: expect.any(String), step: 3, payload: { - id: "9b41e438-8b1c-5f01-b1cb-cc158ed8bd57", + id: "d082881c-b51f-5f07-a0f3-3f4581048aeb", name: "finish", result: [["my_key", " finished"]], },