-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add Checkpointer base #282
base: langgraph
Are you sure you want to change the base?
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we want to proceed with this PR? There are a few needed fixes and I need to think about the schema as well. Once the comments besides the schema are fixed, we can merge this PR and create a new one for the next bit of work, how does that sound?
That sounds good to me! In the meantime, I'll create the PR for checkpointer base but for cloudSQL. |
By schema I mean the table schema. The interface puts CheckpointTuples and Writes to the database and gets CheckpointTuples from the database. I think we can design a table schema where we can read and write the CheckpointTuple object and writes without needing to have 4 different tables. Currently, I don't think we need the blobs or migration tables. That data is already stored in the checkpoint objects. |
before: Optional[RunnableConfig] = None, | ||
limit: Optional[int] = None | ||
) -> AsyncIterator[CheckpointTuple]: | ||
'''List checkpoints from AlloyDB ''' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make sure all methods have docstrings with the args and return values listed
/gcbrun |
AsyncAlloyDBSaver: A newly created instance of AsyncAlloyDBSaver. | ||
""" | ||
|
||
checkpoints_table_schema = await engine._aload_table_schema("checkpoints", schema_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use a variable for the table name "checkpoints" and "checkpoint_writes" in the engine class and import them here, so we make sure they stay consistent
"thread_id": thread_id, | ||
"checkpoint_ns": checkpoint_ns, | ||
"checkpoint_id": checkpoint_id, | ||
"parent_checkpoint_id": config.get("checkpoint_id"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to check on this value
"channel": write[0], | ||
"type": write[1], | ||
"blob": json.dumps(write[2]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is write but note to self to check closer
"checkpoint_ns": config["configurable"]["checkpoint_ns"], | ||
"checkpoint_id": config["configurable"]["checkpoint_id"], | ||
"task_id": task_id, | ||
"idx": idx, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idx is converted from writes[0] with the IDX Map, see https://github.com/langchain-ai/langgraph/blob/506539ac9df277a113423d40999518a0ad2bdb3f/libs/checkpoint-postgres/langgraph/checkpoint/postgres/aio.py#L299
"type": next_config["configurable"]["type"], | ||
"blob": json.dumps(next_config["configurable"]["blob"]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need type and blob here since that is capture in the writes.
"checkpoint": json.dumps(copy), | ||
"metadata": json.dumps(dict(metadata)), | ||
"channel": copy.pop("channel_values"), | ||
"version": new_versions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to ask how new_versions is handled.
Change Log:
[ ] Fixed issues from googleapis/langchain-google-cloud-sql-pg-python#235
[ ] Create a copy of langchain's engine.py and create init_checkpoint_table methods
[ ] Finish developing checkpoint.py