diff --git a/libs/text-splitters/langchain_text_splitters/html.py b/libs/text-splitters/langchain_text_splitters/html.py index 212a9abaa7b95..99767fda860ef 100644 --- a/libs/text-splitters/langchain_text_splitters/html.py +++ b/libs/text-splitters/langchain_text_splitters/html.py @@ -3,7 +3,7 @@ import copy import pathlib import re -from io import BytesIO, StringIO +from io import StringIO from typing import ( Any, Callable, @@ -34,148 +34,291 @@ class ElementType(TypedDict): class HTMLHeaderTextSplitter: - """Splitting HTML files based on specified headers. + """Split HTML content into structured Documents based on specified headers. - Requires lxml package. + Splits HTML content by detecting specified header tags (e.g.,
+Welcome to the introduction section.
+Some background details here.
+Final thoughts.
+ + + \"\"\" + + documents = splitter.split_text(html_content) + + # 'documents' now contains Document objects reflecting the hierarchy: + # - Document with metadata={"Main Topic": "Introduction"} and + # content="Introduction" + # - Document with metadata={"Main Topic": "Introduction"} and + # content="Welcome to the introduction section." + # - Document with metadata={"Main Topic": "Introduction", + # "Sub Topic": "Background"} and content="Background" + # - Document with metadata={"Main Topic": "Introduction", + # "Sub Topic": "Background"} and content="Some background details here." + # - Document with metadata={"Main Topic": "Conclusion"} and + # content="Conclusion" + # - Document with metadata={"Main Topic": "Conclusion"} and + # content="Final thoughts." """ def __init__( self, headers_to_split_on: List[Tuple[str, str]], return_each_element: bool = False, - ): - """Create a new HTMLHeaderTextSplitter. + ) -> None: + """Initialize with headers to split on. Args: - headers_to_split_on: list of tuples of headers we want to track mapped to - (arbitrary) keys for metadata. Allowed header values: h1, h2, h3, h4, - h5, h6 e.g. [("h1", "Header 1"), ("h2", "Header 2)]. - return_each_element: Return each element w/ associated headers. + headers_to_split_on: A list of tuples where + each tuple contains a header tag and its corresponding value. + return_each_element: Whether to return each HTML + element as a separate Document. Defaults to False. """ - # Output element-by-element or aggregated into chunks w/ common headers + self.headers_to_split_on = sorted( + headers_to_split_on, key=lambda x: int(x[0][1]) + ) + self.header_mapping = dict(self.headers_to_split_on) + self.header_tags = [tag for tag, _ in self.headers_to_split_on] self.return_each_element = return_each_element - self.headers_to_split_on = sorted(headers_to_split_on) - def aggregate_elements_to_chunks( - self, elements: List[ElementType] - ) -> List[Document]: - """Combine elements with common metadata into chunks. + def split_text(self, text: str) -> List[Document]: + """Split the given text into a list of Document objects. Args: - elements: HTML element content with associated identifying info and metadata + text: The HTML text to split. + + Returns: + A list of split Document objects. """ - aggregated_chunks: List[ElementType] = [] + return self.split_text_from_file(StringIO(text)) - for element in elements: - if ( - aggregated_chunks - and aggregated_chunks[-1]["metadata"] == element["metadata"] - ): - # If the last element in the aggregated list - # has the same metadata as the current element, - # append the current content to the last element's content - aggregated_chunks[-1]["content"] += " \n" + element["content"] - else: - # Otherwise, append the current element to the aggregated list - aggregated_chunks.append(element) + def split_text_from_url( + self, url: str, timeout: int = 10, **kwargs: Any + ) -> List[Document]: + """Fetch text content from a URL and split it into documents. - return [ - Document(page_content=chunk["content"], metadata=chunk["metadata"]) - for chunk in aggregated_chunks - ] + Args: + url: The URL to fetch content from. + timeout: Timeout for the request. Defaults to 10. + **kwargs: Additional keyword arguments for the request. - def split_text_from_url(self, url: str, **kwargs: Any) -> List[Document]: - """Split HTML from web URL. + Returns: + A list of split Document objects. - Args: - url: web URL - **kwargs: Arbitrary additional keyword arguments. 
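The new `__init__` above normalizes the header configuration: it sorts `headers_to_split_on` by heading level (the digit in the tag name), builds a tag-to-metadata-key `header_mapping`, and keeps the bare tag names in `header_tags`. A small illustrative sketch of that normalization, using hypothetical metadata keys:

.. code-block:: python

    headers_to_split_on = [("h2", "Sub Topic"), ("h1", "Main Topic")]

    # Sort by the numeric part of the tag name ("h1" -> 1, "h2" -> 2, ...),
    # so the configuration order does not matter.
    ordered = sorted(headers_to_split_on, key=lambda x: int(x[0][1]))
    header_mapping = dict(ordered)             # {"h1": "Main Topic", "h2": "Sub Topic"}
    header_tags = [tag for tag, _ in ordered]  # ["h1", "h2"]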
These are usually passed - to the fetch url content request. + Raises: + requests.RequestException: If the HTTP request fails. """ - r = requests.get(url, **kwargs) - return self.split_text_from_file(BytesIO(r.content)) - - def split_text(self, text: str) -> List[Document]: - """Split HTML text string. + kwargs.setdefault("timeout", timeout) + response = requests.get(url, **kwargs) + response.raise_for_status() + return self.split_text(response.text) + + def _header_level(self, tag_name: str) -> int: + """Determine the heading level of a tag.""" + if tag_name.lower() in ["h1", "h2", "h3", "h4", "h5", "h6"]: + return int(tag_name[1]) + # Returns high level if it isn't a header + return 9999 + + def _dom_depth(self, element: Any) -> int: + """Determine the DOM depth of an element by counting its parents.""" + depth = 0 + for _ in element.parents: + depth += 1 + return depth + + def _get_elements(self, html_content: str) -> List[Any]: + """Parse HTML content and return a list of BeautifulSoup elements. + + This helper function takes HTML content as input, + parses it using BeautifulSoup4, and returns all HTML elements + found in the document body. If no body tag exists, + it returns all elements in the full document. Args: - text: HTML text - """ - return self.split_text_from_file(StringIO(text)) + html_content: Raw HTML content to be parsed. - def split_text_from_file(self, file: Any) -> List[Document]: - """Split HTML file. + Returns: + List[Any]: A list of BeautifulSoup elements found in the HTML document. - Args: - file: HTML file + Raises: + ImportError: If the BeautifulSoup4 package is not installed. """ try: - from lxml import etree + from bs4 import BeautifulSoup # type: ignore[import-untyped] except ImportError as e: raise ImportError( - "Unable to import lxml, please install with `pip install lxml`." + "Unable to import BeautifulSoup/PageElement, \ + please install with `pip install \ + bs4`." ) from e - # use lxml library to parse html document and return xml ElementTree - # Explicitly encoding in utf-8 allows non-English - # html files to be processed without garbled characters - parser = etree.HTMLParser(encoding="utf-8") - tree = etree.parse(file, parser) - - # document transformation for "structure-aware" chunking is handled with xsl. - # see comments in html_chunks_with_headers.xslt for more detailed information. - xslt_path = pathlib.Path(__file__).parent / "xsl/html_chunks_with_headers.xslt" - xslt_tree = etree.parse(xslt_path) - transform = etree.XSLT(xslt_tree) - result = transform(tree) - result_dom = etree.fromstring(str(result)) - - # create filter and mapping for header metadata - header_filter = [header[0] for header in self.headers_to_split_on] - header_mapping = dict(self.headers_to_split_on) - - # map xhtml namespace prefix - ns_map = {"h": "http://www.w3.org/1999/xhtml"} - - # build list of elements from DOM - elements = [] - for element in result_dom.findall("*//*", ns_map): - if element.findall("*[@class='headers']") or element.findall( - "*[@class='chunk']" - ): - elements.append( - ElementType( - url=file, - xpath="".join( - [ - node.text or "" - for node in element.findall("*[@class='xpath']", ns_map) - ] - ), - content="".join( - [ - node.text or "" - for node in element.findall("*[@class='chunk']", ns_map) - ] - ), - metadata={ - # Add text of specified headers to metadata using header - # mapping. 
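The reworked `split_text_from_url` above forwards per-request options to `requests.get`, fills in the `timeout` keyword only when the caller has not already supplied one, and calls `raise_for_status()` before handing the response body to `split_text`. A minimal usage sketch (the URL is hypothetical; assumes `requests` and `beautifulsoup4` are installed):

.. code-block:: python

    from langchain_text_splitters import HTMLHeaderTextSplitter

    splitter = HTMLHeaderTextSplitter(
        headers_to_split_on=[("h1", "Main Topic"), ("h2", "Sub Topic")]
    )

    # Keyword arguments are passed straight through to requests.get(); because
    # the method uses kwargs.setdefault("timeout", timeout), a timeout supplied
    # in kwargs takes precedence over the parameter's default of 10 seconds.
    docs = splitter.split_text_from_url("https://example.com", timeout=5)
    for doc in docs:
        print(doc.metadata, doc.page_content[:60])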
- header_mapping[node.tag]: node.text or "" - for node in filter( - lambda x: x.tag in header_filter, - element.findall("*[@class='headers']/*", ns_map), - ) - }, - ) + soup = BeautifulSoup(html_content, "html.parser") + body = soup.body if soup.body else soup + return body.find_all() + + def split_text_from_file(self, file: Any) -> List[Document]: + """Split HTML content from a file into a list of Document objects. + + Args: + file: A file path or a file-like object containing HTML content. + + Returns: + A list of split Document objects. + """ + if isinstance(file, str): + with open(file, "r", encoding="utf-8") as f: + html_content = f.read() + else: + html_content = file.read() + elements = self._get_elements(html_content) + documents: List[Document] = [] + active_headers: Dict[str, Tuple[str, int, int]] = {} + current_chunk: List[str] = [] + chunk_dom_depth = 0 + + def finalize_chunk() -> None: + if current_chunk: + final_meta = { + key: content + for key, (content, level, dom_depth) in active_headers.items() + if chunk_dom_depth >= dom_depth + } + combined_text = " \n".join( + line for line in current_chunk if line.strip() ) + if combined_text.strip(): + documents.append( + Document(page_content=combined_text, metadata=final_meta) + ) + current_chunk.clear() + + for element in elements: + tag = element.name + if not tag: + continue + text = " ".join( + t + for t in element.find_all(string=True, recursive=False) + if isinstance(t, str) + ).strip() + if not text: + continue + + level = self._header_level(tag) + dom_depth = self._dom_depth(element) + + if tag in self.header_tags: + if not self.return_each_element: + finalize_chunk() + + # Remove headers at same or deeper level + headers_to_remove = [ + key for key, (_, lvl, _) in active_headers.items() if lvl >= level + ] + for key in headers_to_remove: + del active_headers[key] + + header_key = self.header_mapping[tag] + active_headers[header_key] = (text, level, dom_depth) + + # Produce a document for the header itself + header_meta = { + key: content + for key, (content, lvl, dd) in active_headers.items() + if dom_depth >= dd + } + documents.append(Document(page_content=text, metadata=header_meta)) + # After encountering a header, + # no immediate content goes to current_chunk + # (if return_each_element is False, we wait for next content) + # (if return_each_element is True, we create docs per element anyway) + else: + # Non-header element logic + # Remove headers that don't apply if dom_depth < their dom_depth + headers_to_remove = [ + key for key, (_, _, dd) in active_headers.items() if dom_depth < dd + ] + for key in headers_to_remove: + del active_headers[key] + + if self.return_each_element: + # Produce a doc for this element immediately + element_meta = { + key: content + for key, (content, lvl, dd) in active_headers.items() + if dom_depth >= dd + } + if text.strip(): + documents.append( + Document(page_content=text, metadata=element_meta) + ) + else: + # Accumulate content in current_chunk + if text.strip(): + current_chunk.append(text) + chunk_dom_depth = max(chunk_dom_depth, dom_depth) if not self.return_each_element: - return self.aggregate_elements_to_chunks(elements) - else: - return [ - Document(page_content=chunk["content"], metadata=chunk["metadata"]) - for chunk in elements - ] + # finalize any remaining chunk + finalize_chunk() + + # If no headers were found at all and return_each_element=False, behavior is: + # The entire content should be in one document. 
+ # The logic above naturally handles it: + # If no recognized headers, we never split; we ended up just accumulating text + # in current_chunk and finalizing once at the end. + + return documents class HTMLSectionSplitter: @@ -269,7 +412,10 @@ def split_html_by_headers(self, html_doc: str) -> List[Dict[str, Optional[str]]] - 'tag_name': The name of the header tag (e.g., "h1", "h2"). """ try: - from bs4 import BeautifulSoup, PageElement # type: ignore[import-untyped] + from bs4 import ( + BeautifulSoup, # type: ignore[import-untyped] + PageElement, + ) except ImportError as e: raise ImportError( "Unable to import BeautifulSoup/PageElement, \ @@ -343,10 +489,13 @@ def convert_possible_tags_to_header(self, html_content: str) -> str: return str(result) def split_text_from_file(self, file: Any) -> List[Document]: - """Split HTML file. + """Split HTML content from a file into a list of Document objects. Args: - file: HTML file + file: A file path or a file-like object containing HTML content. + + Returns: + A list of split Document objects. """ file_content = file.getvalue() file_content = self.convert_possible_tags_to_header(file_content) @@ -844,3 +993,6 @@ def _reinsert_preserved_elements( for placeholder, preserved_content in preserved_elements.items(): content = content.replace(placeholder, preserved_content.strip()) return content + + +# %% diff --git a/libs/text-splitters/tests/unit_tests/test_text_splitters.py b/libs/text-splitters/tests/unit_tests/test_text_splitters.py index dd6d0023d1027..4e5c0bca4c106 100644 --- a/libs/text-splitters/tests/unit_tests/test_text_splitters.py +++ b/libs/text-splitters/tests/unit_tests/test_text_splitters.py @@ -4,7 +4,7 @@ import re import string from pathlib import Path -from typing import Any, List +from typing import Any, Callable, List, Tuple import pytest from langchain_core.documents import Document @@ -2039,49 +2039,476 @@ def test_haskell_code_splitter() -> None: assert chunks == expected_chunks -@pytest.mark.requires("lxml") -def test_html_header_text_splitter(tmp_path: Path) -> None: - splitter = HTMLHeaderTextSplitter( - headers_to_split_on=[("h1", "Header 1"), ("h2", "Header 2")] - ) +@pytest.fixture +@pytest.mark.requires("bs4") +def html_header_splitter_splitter_factory() -> ( + Callable[[List[Tuple[str, str]]], HTMLHeaderTextSplitter] +): + """ + Fixture to create an HTMLHeaderTextSplitter instance with given headers. + This factory allows dynamic creation of splitters with different headers. + """ - content = """ -Reference content.
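As the comments above note, when none of the configured header tags occur in the input and `return_each_element` is False, element texts simply accumulate and are finalized once at the end, so the whole page comes back as a single Document with empty metadata and texts joined with " \n" — which is what the new no-header test cases below assert. A short sketch of that documented behavior:

.. code-block:: python

    from langchain_text_splitters import HTMLHeaderTextSplitter

    splitter = HTMLHeaderTextSplitter(headers_to_split_on=[("h1", "Header 1")])
    docs = splitter.split_text(
        "<html><body><p>Paragraph one.</p><p>Paragraph two.</p></body></html>"
    )
    # One aggregated Document is expected: metadata == {} and
    # page_content == "Paragraph one. \nParagraph two."
    assert len(docs) == 1 and docs[0].metadata == {}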
+ def _create_splitter( + headers_to_split_on: List[Tuple[str, str]], + ) -> HTMLHeaderTextSplitter: + return HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on) -Some text
-Some more text
-This is the introduction.
+Background information.
+Final thoughts.
+ + + """, + [ + Document( + page_content="Introduction", metadata={"Header 1": "Introduction"} + ), + Document( + page_content="This is the introduction.", + metadata={"Header 1": "Introduction"}, + ), + Document( + page_content="Background", + metadata={"Header 1": "Introduction", "Header 2": "Background"}, + ), + Document( + page_content="Background information.", + metadata={"Header 1": "Introduction", "Header 2": "Background"}, + ), + Document( + page_content="Conclusion", metadata={"Header 1": "Conclusion"} + ), + Document( + page_content="Final thoughts.", metadata={"Header 1": "Conclusion"} + ), + ], + "Simple headers and paragraphs", + ), + ( + # Test Case 2: Nested headers with h1, h2, and h3 + [("h1", "Header 1"), ("h2", "Header 2"), ("h3", "Header 3")], + """ + + +Details of subsection.
+More details.
+Content under another main title.
+ + + """, + [ + Document( + page_content="Main Title", metadata={"Header 1": "Main Title"} + ), + Document( + page_content="Subsection", + metadata={"Header 1": "Main Title", "Header 2": "Subsection"}, + ), + Document( + page_content="Details of subsection.", + metadata={"Header 1": "Main Title", "Header 2": "Subsection"}, + ), + Document( + page_content="Sub-subsection", + metadata={ + "Header 1": "Main Title", + "Header 2": "Subsection", + "Header 3": "Sub-subsection", + }, + ), + Document( + page_content="More details.", + metadata={ + "Header 1": "Main Title", + "Header 2": "Subsection", + "Header 3": "Sub-subsection", + }, + ), + Document( + page_content="Another Main Title", + metadata={"Header 1": "Another Main Title"}, + ), + Document( + page_content="Content under another main title.", + metadata={"Header 1": "Another Main Title"}, + ), + ], + "Nested headers with h1, h2, and h3", + ), + ( + # Test Case 3: No headers + [("h1", "Header 1")], + """ + + +Paragraph one.
+Paragraph two.
+Paragraph three.
+Content of chapter 1.
+Content of chapter 2.
+Content of chapter 3.
+ + + """, + [ + Document(page_content="Chapter 1", metadata={"Header 1": "Chapter 1"}), + Document( + page_content="Content of chapter 1.", + metadata={"Header 1": "Chapter 1"}, + ), + Document(page_content="Chapter 2", metadata={"Header 1": "Chapter 2"}), + Document( + page_content="Content of chapter 2.", + metadata={"Header 1": "Chapter 2"}, + ), + Document(page_content="Chapter 3", metadata={"Header 1": "Chapter 3"}), + Document( + page_content="Content of chapter 3.", + metadata={"Header 1": "Chapter 3"}, + ), + ], + "Multiple headers of the same level", + ), + ( + # Test Case 5: Headers with no content + [("h1", "Header 1"), ("h2", "Header 2")], + """ + + +Some intro text about Foo.
+Some intro text about Bar.
+Some text about the first subtopic of Bar.
+Some text about the second subtopic of Bar.
+Some text about Baz
+Some concluding text about Foo
+Paragraph one.
+Paragraph two.
+Paragraph three.
+ + + """, + [ + Document( + metadata={}, + page_content="Paragraph one. \nParagraph two. \nParagraph three.", + ) + ], + "Test Case B: Split on h1 only without any headers", ), - ] - assert docs == expected + ], +) +@pytest.mark.requires("bs4") +def test_additional_html_header_text_splitter( + html_header_splitter_splitter_factory: Any, + headers_to_split_on: List[Tuple[str, str]], + html_content: str, + expected_output: List[Document], + test_case: str, +) -> None: + """ + Test the HTML header text splitter. + + Args: + html_header_splitter_splitter_factory (Any): Factory function to create + the HTML header splitter. + headers_to_split_on (List[Tuple[str, str]]): List of headers to split on. + html_content (str): HTML content to be split. + expected_output (List[Document]): Expected list of Document objects. + test_case (str): Description of the test case. + + Raises: + AssertionError: If the number of documents or their content/metadata + does not match the expected output. + """ + splitter = html_header_splitter_splitter_factory( + headers_to_split_on=headers_to_split_on + ) + docs = splitter.split_text(html_content) + + assert len(docs) == len(expected_output), ( + f"{test_case} Failed: Number of documents mismatch. " + f"Expected {len(expected_output)}, got {len(docs)}." + ) + for idx, (doc, expected) in enumerate(zip(docs, expected_output), start=1): + assert doc.page_content == expected.page_content, ( + f"{test_case} Failed at Document {idx}: " + f"Content mismatch.\nExpected: {expected.page_content}\n" + "Got: {doc.page_content}" + ) + assert doc.metadata == expected.metadata, ( + f"{test_case} Failed at Document {idx}: " + f"Metadata mismatch.\nExpected: {expected.metadata}\nGot: {doc.metadata}" + ) - with open(tmp_path / "doc.html", "w") as tmp: - tmp.write(content) - docs_from_file = splitter.split_text_from_file(tmp_path / "doc.html") - assert docs_from_file == expected +@pytest.mark.parametrize( + "headers_to_split_on, html_content, expected_output, test_case", + [ + ( + # Test Case C: Split on h1, h2, and h3 with no headers present + [("h1", "Header 1"), ("h2", "Header 2"), ("h3", "Header 3")], + """ + + +Just some random text without headers.
+This is some long text that should be split into multiple chunks due to the +
This is some long text that should be split into multiple chunks due to the small chunk size.
""" splitter = HTMLSemanticPreservingSplitter(