diff --git a/libs/text-splitters/langchain_text_splitters/html.py b/libs/text-splitters/langchain_text_splitters/html.py
index 5c934a3125ab3..074a3825e35e6 100644
--- a/libs/text-splitters/langchain_text_splitters/html.py
+++ b/libs/text-splitters/langchain_text_splitters/html.py
@@ -3,8 +3,7 @@
 import copy
 import pathlib
 import re
-from dataclasses import dataclass, field
-from io import BytesIO, StringIO
+from io import StringIO
 from typing import (
     Any,
     Callable,
@@ -34,27 +33,6 @@ class ElementType(TypedDict):
     metadata: Dict[str, str]
 
 
-@dataclass
-class Node:
-    """Represents a node in a hierarchical structure.
-
-    Attributes:
-        name: The name of the node.
-        tag_type: The type of the node.
-        content: The content of the node.
-        level: The level of the node in the hierarchy.
-        dom_depth: The depth of the node in the DOM structure.
-        parent: The parent node. Defaults to None.
-    """
-
-    name: str
-    tag_type: str
-    content: str
-    level: int
-    dom_depth: int
-    parent: Optional[Node] = field(default=None)
-
-
 class HTMLHeaderTextSplitter:
     """Split HTML content into structured Documents based on specified headers.
@@ -151,58 +129,8 @@ def __init__(
         )
         self.header_mapping = dict(self.headers_to_split_on)
         self.header_tags = [tag for tag, _ in self.headers_to_split_on]
-        self.elements_tree: Dict[int, Tuple[str, str, int, int]] = {}
         self.return_each_element = return_each_element
 
-    def _header_level(self, element: Any) -> int:
-        """Determine the heading level of an element.
-
-        Args:
-            element: A BeautifulSoup element.
-
-        Returns:
-            The heading level (1-6) if a heading, else a large number.
-        """
-        tag_name = element.name.lower() if hasattr(element, "name") else ""
-        if tag_name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
-            return int(tag_name[1])
-        return 9999
-
-    def _dom_depth(self, element: Any) -> int:
-        """Compute the DOM depth of an element.
-
-        Args:
-            element: A BeautifulSoup element.
-
-        Returns:
-            The depth of the element in the DOM tree.
-        """
-        depth = 0
-        for _ in element.parents:
-            depth += 1
-        return depth
-
-    def _build_tree(self, elements: Any) -> None:
-        """Build a tree structure from a list of HTML elements.
-
-        Args:
-            elements: A list of BeautifulSoup elements.
-        """
-        for idx, element in enumerate(elements):
-            text = " ".join(
-                t
-                for t in element.find_all(string=True, recursive=False)
-                if isinstance(t, str)
-            ).strip()
-
-            if not text:
-                continue
-
-            level = self._header_level(element)
-            dom_depth = self._dom_depth(element)
-
-            self.elements_tree[idx] = (element.name, text, level, dom_depth)
-
     def split_text(self, text: str) -> List[Document]:
         """Split the given text into a list of Document objects.
@@ -230,111 +158,40 @@ def split_text_from_url(
         Raises:
             requests.RequestException: If the HTTP request fails.
""" - try: - kwargs.setdefault("timeout", timeout) - response = requests.get(url, **kwargs) # noqa: E501 - response.raise_for_status() - except requests.RequestException as e: - msg = f"Error fetching URL {url}: {e}" - raise requests.RequestException(msg) from e - return self.split_text_from_file(BytesIO(response.content)) - - def _finalize_chunk( - self, - current_chunk: List[str], - active_headers: Dict[str, Tuple[str, int, int]], - documents: List[Document], - chunk_dom_depth: int, - ) -> None: - if current_chunk: - final_meta: Dict[str, str] = { - key: content - for key, (content, level, dom_depth) in active_headers.items() - if chunk_dom_depth >= dom_depth - } - combined_text = " \n".join(line for line in current_chunk if line.strip()) - documents.append(Document(page_content=combined_text, metadata=final_meta)) - current_chunk.clear() - chunk_dom_depth = 0 - - def _generate_documents(self, nodes: Dict[int, Node]) -> List[Document]: - """Generate a list of Document objects from a node structure. - - Args: - nodes: A dictionary of nodes indexed by their position. - - Returns: - A list of generated Document objects. - """ - documents: List[Document] = [] - active_headers: Dict[str, Tuple[str, int, int]] = {} - current_chunk: List[str] = [] - chunk_dom_depth = 0 - - def process_node(node: Node) -> None: - """Process a node and update chunk, headers, and documents accordingly. - - Updates current chunk, active headers, and documents based on the - node's type and content. - - Args: - node: The node to be processed. It should have attributes - 'tag_type', 'content', 'level', and 'dom_depth'. - """ - nonlocal chunk_dom_depth - node_type = node.tag_type # type: ignore[attr-defined] - node_content = node.content # type: ignore[attr-defined] - node_level = node.level # type: ignore[attr-defined] - node_dom_depth = node.dom_depth # type: ignore[attr-defined] - - if node_type in self.header_tags: - self._finalize_chunk( - current_chunk, active_headers, documents, chunk_dom_depth - ) - headers_to_remove = [ - key - for key, (_, lvl, _) in active_headers.items() - if lvl >= node_level - ] - for key in headers_to_remove: - del active_headers[key] - header_key = self.header_mapping[node_type] # type: ignore[attr-defined] - active_headers[header_key] = (node_content, node_level, node_dom_depth) - header_meta: Dict[str, str] = { - key: content - for key, (content, _, dd) in active_headers.items() - if node_dom_depth >= dd - } - documents.append( - Document(page_content=node_content, metadata=header_meta) - ) - else: - headers_to_remove = [ - key - for key, (_, _, dd) in active_headers.items() - if node_dom_depth < dd - ] - for key in headers_to_remove: - del active_headers[key] - if node_content.strip(): - current_chunk.append(node_content) - chunk_dom_depth = max(chunk_dom_depth, node_dom_depth) + kwargs.setdefault("timeout", timeout) + response = requests.get(url, **kwargs) + response.raise_for_status() + return self.split_text(response.text) + + def _header_level(self, tag_name: str) -> int: + """Determine the heading level of a tag.""" + if tag_name.lower() in ["h1", "h2", "h3", "h4", "h5", "h6"]: + return int(tag_name[1]) + # Returns high level if it isn't a header + return 9999 - sorted_nodes = sorted(nodes.items()) - for _, node in sorted_nodes: - process_node(node) + def _dom_depth(self, element: Any) -> int: + """Determine the DOM depth of an element by counting its parents.""" + depth = 0 + for _ in element.parents: + depth += 1 + return depth - self._finalize_chunk(current_chunk, 
-        return documents
 
+    def _get_elements(self, html_content: str) -> List[Any]:
+        """Parse HTML content and return a list of BeautifulSoup elements.
-    def split_text_from_file(self, file: Any) -> List[Document]:
-        """Split HTML content from a file into a list of Document objects.
+        This helper parses the HTML content with BeautifulSoup4 and returns
+        all elements found in the document body. If no body tag exists, all
+        elements in the full document are returned.
 
         Args:
-            file: A file path or a file-like object containing HTML content.
+            html_content (str): Raw HTML content to be parsed.
 
         Returns:
-            A list of split Document objects.
+            List[Any]: A list of BeautifulSoup elements found in the HTML document.
+
+        Raises:
+            ImportError: If the BeautifulSoup4 package is not installed.
         """
         try:
             from bs4 import BeautifulSoup  # type: ignore[import-untyped]
@@ -344,131 +201,120 @@ def split_text_from_file(self, file: Any) -> List[Document]:
                 please install with `pip install \
                 bs4`."
             ) from e
-        if isinstance(file, str):
-            with open(file, "r", encoding="utf-8") as f:
-                html_content = f.read()
-        else:
-            html_content = file.read()
-
         soup = BeautifulSoup(html_content, "html.parser")
         body = soup.body if soup.body else soup
+        return body.find_all()
 
-        elements = body.find_all()
-        self._build_tree(elements)
-
-        if not self.elements_tree:
-            return []
-
-        min_level = min(level for (_, _, level, _) in self.elements_tree.values())
-        root = Node(
-            "root", tag_type="root", content="", level=min_level - 1, dom_depth=0
-        )
-
-        nodes = {
-            idx: Node(
-                f"{tag}_{idx}",
-                tag_type=tag,
-                content=text,
-                level=level,
-                dom_depth=dom_depth,
-            )
-            for idx, (tag, text, level, dom_depth) in self.elements_tree.items()
-        }
-
-        stack: List[Node] = []
-        for idx in sorted(nodes):
-            node = nodes[idx]
-            while stack and (
-                stack[-1].level >= node.level or stack[-1].dom_depth >= node.dom_depth
-            ):
-                stack.pop()
-            if stack:
-                node.parent = stack[-1]
-            else:
-                node.parent = root
-            stack.append(node)
-
-        if not self.return_each_element:
-            return self._aggregate_documents(nodes)
-
-        return self._generate_individual_documents(nodes)
-
-    def _aggregate_documents(self, nodes: Dict[int, Node]) -> List[Document]:
-        """Generate documents from a list of nodes.
-
-        Args:
-            nodes: List of Node objects representing the HTML structure.
-
-        Returns:
-            List of CoreDocument objects containing the processed text chunks.
-        """
-        return self._generate_documents(nodes)
-
-    def _generate_individual_documents(self, nodes: Dict[int, Node]) -> List[Document]:
-        """Generate individual Document objects for each element.
+    def split_text_from_file(self, file: Any) -> List[Document]:
+        """Split HTML content from a file into a list of Document objects.
 
         Args:
-            nodes: A dictionary of nodes indexed by their position.
+            file: A file path or a file-like object containing HTML content.
 
         Returns:
-            A list of individual Document objects.
+            A list of split Document objects.
""" + if isinstance(file, str): + with open(file, "r", encoding="utf-8") as f: + html_content = f.read() + else: + html_content = file.read() + elements = self._get_elements(html_content) documents: List[Document] = [] active_headers: Dict[str, Tuple[str, int, int]] = {} + current_chunk: List[str] = [] + chunk_dom_depth = 0 - sorted_nodes = sorted(nodes.items()) + def finalize_chunk(): + if current_chunk: + final_meta = { + key: content + for key, (content, level, dom_depth) in active_headers.items() + if chunk_dom_depth >= dom_depth + } + combined_text = " \n".join( + line for line in current_chunk if line.strip() + ) + if combined_text.strip(): + documents.append( + Document(page_content=combined_text, metadata=final_meta) + ) + current_chunk.clear() - def process_node(node: Node) -> None: - """Process a single node to create Document objects based on header tags. + for element in elements: + tag = element.name + if not tag: + continue + text = " ".join( + t + for t in element.find_all(string=True, recursive=False) + if isinstance(t, str) + ).strip() + if not text: + continue - Args: - node: The node to process. - """ - node_type = node.type # type: ignore[attr-defined] - node_content = node.content # type: ignore[attr-defined] - node_level = node.level # type: ignore[attr-defined] - node_dom_depth = node.dom_depth # type: ignore[attr-defined] - header_meta: Dict[str, str] - if node_type in self.header_tags: - # Remove headers of the same or lower level + level = self._header_level(tag) + dom_depth = self._dom_depth(element) + + if tag in self.header_tags: + if not self.return_each_element: + finalize_chunk() + + # Remove headers at same or deeper level headers_to_remove = [ - key - for key, (_, lvl, _) in active_headers.items() - if lvl >= node_level + key for key, (_, lvl, _) in active_headers.items() if lvl >= level ] for key in headers_to_remove: del active_headers[key] - # Update active headers with the current header - header_key = self.header_mapping[node_type] # type: ignore[attr-defined] - active_headers[header_key] = (node_content, node_level, node_dom_depth) + header_key = self.header_mapping[tag] + active_headers[header_key] = (text, level, dom_depth) - # Create metadata based on active headers + # Produce a document for the header itself header_meta = { key: content for key, (content, lvl, dd) in active_headers.items() - if node_dom_depth >= dd + if dom_depth >= dd } - - # Create a Document for the header element - documents.append( - Document(page_content=node_content, metadata=header_meta) - ) + documents.append(Document(page_content=text, metadata=header_meta)) + # After encountering a header, no immediate content goes to current_chunk + # (if return_each_element is False, we wait for next content) + # (if return_each_element is True, we create docs per element anyway) else: - # For non-header elements, associate with current headers - if node_content.strip(): - header_meta = { + # Non-header element logic + # Remove headers that don't apply if dom_depth < their dom_depth + headers_to_remove = [ + key for key, (_, _, dd) in active_headers.items() if dom_depth < dd + ] + for key in headers_to_remove: + del active_headers[key] + + if self.return_each_element: + # Produce a doc for this element immediately + element_meta = { key: content for key, (content, lvl, dd) in active_headers.items() - if node_dom_depth >= dd + if dom_depth >= dd } - documents.append( - Document(page_content=node_content, metadata=header_meta) - ) + if text.strip(): + documents.append( + 
+                            Document(page_content=text, metadata=element_meta)
+                        )
+                else:
+                    # Accumulate content in current_chunk
+                    if text.strip():
+                        current_chunk.append(text)
+                        chunk_dom_depth = max(chunk_dom_depth, dom_depth)
 
-        # Process each node using the inner process_node function
-        for _, node in sorted_nodes:
-            process_node(node)
+        if not self.return_each_element:
+            # Finalize any remaining chunk
+            finalize_chunk()
+
+        # If no recognized headers were found and return_each_element is
+        # False, nothing above ever splits: all text simply accumulates in
+        # current_chunk, and the final finalize_chunk() call emits the whole
+        # content as a single document.
 
         return documents
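
Usage sketch (not part of the patch): a minimal example of how the rewritten
splitter behaves after this change. The sample HTML is illustrative; the
printed results follow from the element walk above, where each matched header
yields its own Document and intervening text accumulates under the currently
active headers.

from langchain_text_splitters import HTMLHeaderTextSplitter

html = (
    "<html><body>"
    "<h1>Intro</h1><p>Welcome.</p>"
    "<h2>Details</h2><p>More text.</p>"
    "</body></html>"
)

splitter = HTMLHeaderTextSplitter(
    headers_to_split_on=[("h1", "Header 1"), ("h2", "Header 2")],
)
for doc in splitter.split_text(html):
    print(doc.metadata, "->", doc.page_content)

# Expected, given the chunking logic in this diff:
# {'Header 1': 'Intro'} -> Intro
# {'Header 1': 'Intro'} -> Welcome.
# {'Header 1': 'Intro', 'Header 2': 'Details'} -> Details
# {'Header 1': 'Intro', 'Header 2': 'Details'} -> More text.

With return_each_element=True the same input yields the same documents here,
since each paragraph already sits alone between headers; the flag matters when
several sibling elements would otherwise be merged into one chunk.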