Skip to content

Commit

Permalink
Handle explicit undefined values passed in an update with a warning
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 committed Aug 25, 2024
1 parent 70fa16a commit 3e9860d
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 50 deletions.
67 changes: 33 additions & 34 deletions libs/checkpoint/src/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,7 @@ export class MemorySaver extends BaseCheckpointSaver {
];
})
);
const parentConfig =
parentCheckpointId !== undefined
? {
configurable: {
thread_id,
checkpoint_ns,
checkpoint_id,
},
}
: undefined;
return {
const checkpointTuple: CheckpointTuple = {
config,
checkpoint: (await this.serde.loadsTyped(
"json",
Expand All @@ -75,8 +65,17 @@ export class MemorySaver extends BaseCheckpointSaver {
metadata
)) as CheckpointMetadata,
pendingWrites,
parentConfig,
};
if (parentCheckpointId !== undefined) {
checkpointTuple.parentConfig = {
configurable: {
thread_id,
checkpoint_ns,
checkpoint_id,
},
};
}
return checkpointTuple;
}
} else {
const checkpoints = this.storage[thread_id]?.[checkpoint_ns];
Expand All @@ -99,17 +98,7 @@ export class MemorySaver extends BaseCheckpointSaver {
];
})
);
const parentConfig =
parentCheckpointId !== undefined
? {
configurable: {
thread_id,
checkpoint_ns,
checkpoint_id: parentCheckpointId,
},
}
: undefined;
return {
const checkpointTuple: CheckpointTuple = {
config: {
configurable: {
thread_id,
Expand All @@ -126,8 +115,17 @@ export class MemorySaver extends BaseCheckpointSaver {
metadata
)) as CheckpointMetadata,
pendingWrites,
parentConfig,
};
if (parentCheckpointId !== undefined) {
checkpointTuple.parentConfig = {
configurable: {
thread_id,
checkpoint_ns,
checkpoint_id: parentCheckpointId,
},
};
}
return checkpointTuple;
}
}

Expand Down Expand Up @@ -191,7 +189,7 @@ export class MemorySaver extends BaseCheckpointSaver {
})
);

yield {
const checkpointTuple: CheckpointTuple = {
config: {
configurable: {
thread_id: threadId,
Expand All @@ -205,16 +203,17 @@ export class MemorySaver extends BaseCheckpointSaver {
)) as Checkpoint,
metadata,
pendingWrites,
parentConfig: parentCheckpointId
? {
configurable: {
thread_id: threadId,
checkpoint_ns: checkpointNamespace,
checkpoint_id: parentCheckpointId,
},
}
: undefined,
};
if (parentCheckpointId !== undefined) {
checkpointTuple.parentConfig = {
configurable: {
thread_id: threadId,
checkpoint_ns: checkpointNamespace,
checkpoint_id: parentCheckpointId,
},
};
}
yield checkpointTuple;
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions libs/checkpoint/src/serde/jsonplus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ function _encodeConstructorArgs(
lc: 2,
type: "constructor",
id: [constructor.name],
method,
method: method ?? null,
args: args ?? [],
kwargs: kwargs ?? {},
};
}

function _default(_key: string, obj: any): any {
if (obj instanceof Set || obj instanceof Map) {
if (obj === undefined) {
console.warn(`Serializer received an explicit "undefined" value. Converting to "null".`);
return null;
} else if (obj instanceof Set || obj instanceof Map) {
return _encodeConstructorArgs(obj.constructor, undefined, [
Array.from(obj),
]);
Expand Down
23 changes: 21 additions & 2 deletions libs/checkpoint/src/serde/tests/jsonplus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AIMessage, HumanMessage } from "@langchain/core/messages";
import { uuid6 } from "../../id.js";
import { JsonPlusSerializer } from "../jsonplus.js";

const value = {
const complexValue = {
number: 1,
id: uuid6(-1),
error: new Error("test error"),
Expand All @@ -18,6 +18,7 @@ const value = {
new Error("nestedfoo"),
5,
true,
null,
false,
{
a: "b",
Expand All @@ -26,12 +27,30 @@ const value = {
],
object: {
messages: [new HumanMessage("hey there"), new AIMessage("hi how are you")],
nestedNullVal: null,
emptyString: "",
},
emptyString: "",
nullVal: null,
};

it("should serialize and deserialize various data types", async () => {
const VALUES = [
["null", null],
["empty string", ""],
["simple string", "foobar"],
["various data types", complexValue],
] satisfies [string, unknown][];

it.each(VALUES)("should serialize and deserialize %s", async (_description, value) => {
const serde = new JsonPlusSerializer();
const [type, serialized] = serde.dumpsTyped(value);
const deserialized = await serde.loadsTyped(type, serialized);
expect(deserialized).toEqual(value);
});

it(`should serialize "undefined" as "null"`, async () => {
const serde = new JsonPlusSerializer();
const [type, serialized] = serde.dumpsTyped(undefined);
const deserialized = await serde.loadsTyped(type, serialized);
expect(deserialized).toEqual(null);
});
1 change: 0 additions & 1 deletion libs/langgraph/src/pregel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ export class Pregel<
writers.length > 1 ? RunnableSequence.from(writers as any) : writers[0],
writes: [],
triggers: [INTERRUPT],
config: undefined,
id: uuid5(INTERRUPT, checkpoint.id),
};

Expand Down
7 changes: 5 additions & 2 deletions libs/langgraph/src/pregel/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ export class PregelLoop {
: mapOutputValues(outputKeys, writes, this.channels).next().value;
await this._putCheckpoint({
source: "loop",
writes: metadataWrites,
writes: metadataWrites ?? null,
});
// after execution, check if we should interrupt
if (shouldInterrupt(this.checkpoint, interruptAfter, this.tasks)) {
Expand Down Expand Up @@ -456,7 +456,10 @@ export class PregelLoop {
this.checkpointerGetNextVersion
);
// save input checkpoint
await this._putCheckpoint({ source: "input", writes: this.input });
await this._putCheckpoint({
source: "input",
writes: this.input ?? null,
});
}
// done with input
this.input = isResuming ? INPUT_RESUMING : INPUT_DONE;
Expand Down
2 changes: 1 addition & 1 deletion libs/langgraph/src/pregel/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export interface PregelExecutableTask<
readonly input: unknown;
readonly proc: Runnable;
readonly writes: PendingWrite<C>[];
readonly config: RunnableConfig | undefined;
readonly config?: RunnableConfig;
readonly triggers: Array<string>;
readonly retry_policy?: string;
readonly id: string;
Expand Down
Loading

0 comments on commit 3e9860d

Please sign in to comment.