From 8c6eec5f256d17e7dd3f64e5b7cf691a61fbb49e Mon Sep 17 00:00:00 2001 From: Prashanth Rao <35005448+prrao87@users.noreply.github.com> Date: Mon, 9 Dec 2024 22:15:28 -0500 Subject: [PATCH] community: KuzuGraph needs allow_dangerous_requests, add graph documents via LLMGraphTransformer (#27949) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [x] **PR title**: "community: Kuzu - Add graph documents via LLMGraphTransformer" - This PR adds a new method `add_graph_documents` to use the `GraphDocument`s extracted by `LLMGraphTransformer` and store them in a Kùzu graph backend. - This allows users to transform unstructured text into a graph that uses Kùzu as the graph store. - [x] **Lint and test**: Run `make format`, `make lint` and `make test` from the root of the package(s) you've modified. See contribution guidelines for more: https://python.langchain.com/docs/contributing/ --------- Co-authored-by: pookam90 Co-authored-by: Pooja Kamath <60406274+Pookam90@users.noreply.github.com> Co-authored-by: hsm207 Co-authored-by: Erick Friis --- .../langchain_community/graphs/kuzu_graph.py | 174 +++++++++++++++++- 1 file changed, 170 insertions(+), 4 deletions(-) diff --git a/libs/community/langchain_community/graphs/kuzu_graph.py b/libs/community/langchain_community/graphs/kuzu_graph.py index 1f99f49fc9435..3fe3d60c283c2 100644 --- a/libs/community/langchain_community/graphs/kuzu_graph.py +++ b/libs/community/langchain_community/graphs/kuzu_graph.py @@ -1,4 +1,7 @@ -from typing import Any, Dict, List +from hashlib import md5 +from typing import Any, Dict, List, Tuple + +from langchain_community.graphs.graph_document import GraphDocument, Relationship class KuzuGraph: @@ -16,7 +19,19 @@ class KuzuGraph: See https://python.langchain.com/docs/security for more information. 
def _create_chunk_node_table(self) -> None:
    """Create the `Chunk` node table if it does not already exist.

    The table has three STRING columns (`id`, `text`, `type`) with `id` as
    the primary key.
    """
    self.conn.execute(
        """
        CREATE NODE TABLE IF NOT EXISTS Chunk (
            id STRING,
            text STRING,
            type STRING,
            PRIMARY KEY(id)
        );
        """
    )


def _create_entity_node_table(self, node_label: str) -> None:
    """Create a node table named `node_label` if it does not already exist.

    The table has two STRING columns (`id`, `type`) with `id` as the primary
    key.

    NOTE(review): `node_label` is interpolated into the statement verbatim
    (table names cannot be passed as query parameters), so it must be a
    valid, trusted identifier.
    """
    self.conn.execute(
        f"""
        CREATE NODE TABLE IF NOT EXISTS {node_label} (
            id STRING,
            type STRING,
            PRIMARY KEY(id)
        );
        """
    )


def _create_entity_relationship_table(self, rel: "Relationship") -> None:
    """Create the relationship table for `rel` if it does not already exist.

    The table is named after `rel.type` and connects the node table
    `rel.source.type` to the node table `rel.target.type`.

    NOTE(review): all three names are interpolated into the statement
    verbatim, so they must be valid, trusted identifiers.
    """
    self.conn.execute(
        f"""
        CREATE REL TABLE IF NOT EXISTS {rel.type} (
            FROM {rel.source.type} TO {rel.target.type}
        );
        """
    )


def add_graph_documents(
    self,
    graph_documents: "List[GraphDocument]",
    allowed_relationships: List[Tuple[str, str, str]],
    include_source: bool = False,
) -> None:
    """
    Adds a list of `GraphDocument` objects that represent nodes and relationships
    in a graph to a Kùzu backend.

    All node tables (and, when `include_source` is True, the `Chunk` table and
    the `MENTIONS` relationship table group) are created once up front, before
    any data is written. Relationship tables are created lazily, the first time
    a given (relationship type, source label, target label) combination is
    seen. An empty `graph_documents` list is a no-op.

    Parameters:
    - graph_documents (List[GraphDocument]): A list of `GraphDocument` objects
    that contain the nodes and relationships to be added to the graph. Each
    document must expose `nodes`, `relationships` and, when `include_source`
    is True, `source` (with `page_content` and `metadata`).

    - allowed_relationships (List[Tuple[str, str, str]]): Triples of
    (source node type, relationship type, target node type). The argument is
    part of the public signature, but this implementation does not read it:
    relationship tables are derived from each relationship's own `type` and
    the `type` of its source and target nodes.

    - include_source (bool): If True, stores the source document as a `Chunk`
    node and links it to every node extracted from it using the `MENTIONS`
    relationship. Source documents are merged on the `id` found in the source
    document metadata; when that is missing or empty, the MD5 hash of
    `page_content` is computed and written back into the metadata as `id`.
    Defaults to False.
    """
    if not graph_documents:
        return

    # Unique node labels across all documents. Sorted so that the generated
    # DDL text is deterministic from run to run.
    node_labels = sorted(
        {node.type for document in graph_documents for node in document.nodes}
    )

    # Schema setup does not depend on the individual document or node, so it
    # is executed once here instead of inside the loops below.
    if include_source:
        self._create_chunk_node_table()
    for node_label in node_labels:
        self._create_entity_node_table(node_label)
    if include_source and node_labels:
        # One FROM/TO pair per entity table, followed by the properties that
        # are common to every table in the group.
        from_to_pairs = ", ".join(f"FROM Chunk TO {label}" for label in node_labels)
        self.conn.execute(
            "CREATE REL TABLE GROUP IF NOT EXISTS MENTIONS ("
            + from_to_pairs
            + ", label STRING, triplet_source_id STRING)"
        )

    # (rel type, source label, target label) combinations whose table
    # creation statement has already been issued during this call.
    created_rel_tables = set()

    # NOTE(review): node labels and relationship types below are interpolated
    # into Cypher verbatim because identifiers cannot be parameterized; only
    # the property values go through query parameters.
    for document in graph_documents:
        if include_source:
            if not document.source.metadata.get("id"):
                # Add a unique id to each document chunk via an md5 hash
                document.source.metadata["id"] = md5(
                    document.source.page_content.encode("utf-8")
                ).hexdigest()

            self.conn.execute(
                """
                MERGE (c:Chunk {id: $id})
                    SET c.text = $text,
                        c.type = "text_chunk"
                """,
                parameters={
                    "id": document.source.metadata["id"],
                    "text": document.source.page_content,
                },
            )

        # Add entity nodes, plus the chunk -> entity link when requested
        for node in document.nodes:
            self.conn.execute(
                f"""
                MERGE (e:{node.type} {{id: $id}})
                    SET e.type = "entity"
                """,
                parameters={"id": node.id},
            )
            if include_source:
                self.conn.execute(
                    f"""
                    MATCH (c:Chunk {{id: $id}}),
                          (e:{node.type} {{id: $node_id}})
                    MERGE (c)-[m:MENTIONS]->(e)
                    SET m.triplet_source_id = $id
                    """,
                    parameters={
                        "id": document.source.metadata["id"],
                        "node_id": node.id,
                    },
                )

        # Add entity relationships
        for rel in document.relationships:
            rel_key = (rel.type, rel.source.type, rel.target.type)
            if rel_key not in created_rel_tables:
                self._create_entity_relationship_table(rel)
                created_rel_tables.add(rel_key)

            self.conn.execute(
                f"""
                MATCH (e1:{rel.source.type} {{id: $source_id}}),
                      (e2:{rel.target.type} {{id: $target_id}})
                MERGE (e1)-[:{rel.type}]->(e2)
                """,
                parameters={
                    "source_id": rel.source.id,
                    "target_id": rel.target.id,
                },
            )