From ed5c45760b81cd322ad23c8648e1070510b2ae3f Mon Sep 17 00:00:00 2001
From: Rajesh Thallam
Date: Fri, 30 Aug 2024 12:04:22 -0700
Subject: [PATCH] Added support for Google Cloud Document AI Layout Parser
 (#294)

---
 .../langchain_google_community/docai.py       | 192 +++++++++++++-----
 .../tests/integration_tests/terraform/main.tf |   5 +-
 .../tests/integration_tests/test_docai.py     |  21 ++
 3 files changed, 160 insertions(+), 58 deletions(-)
 create mode 100644 libs/community/tests/integration_tests/test_docai.py

diff --git a/libs/community/langchain_google_community/docai.py b/libs/community/langchain_google_community/docai.py
index 5f6a059e..50af7556 100644
--- a/libs/community/langchain_google_community/docai.py
+++ b/libs/community/langchain_google_community/docai.py
@@ -9,7 +9,7 @@
 import re
 import time
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence
+from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Sequence
 
 from langchain_core.document_loaders import BaseBlobParser
 from langchain_core.document_loaders.blob_loaders import Blob
@@ -23,7 +23,7 @@
     from google.cloud.documentai import (  # type: ignore[import]
         DocumentProcessorServiceClient,
     )
-
+    from google.cloud.documentai_v1.types import ProcessOptions
 
 logger = logging.getLogger(__name__)
 
@@ -47,6 +47,7 @@ def __init__(
         self,
         *,
         client: Optional["DocumentProcessorServiceClient"] = None,
+        project_id: Optional[str] = None,
         location: Optional[str] = None,
         gcs_output_path: Optional[str] = None,
         processor_name: Optional[str] = None,
@@ -95,12 +96,19 @@ def __init__(
                 "`pip install langchain-google-community[docai]`"
             ) from exc
         options = ClientOptions(
-            api_endpoint=f"{location}-documentai.googleapis.com"
+            quota_project_id=project_id,
+            api_endpoint=f"{location}-documentai.googleapis.com",
        )
         self._client = DocumentProcessorServiceClient(
             client_options=options,
             client_info=get_client_info(module="document-ai"),
         )
+        # get the processor type to toggle between layout parsing and OCR
+        self._processor_type = self._client.get_processor(name=processor_name).type
+        if self._processor_type == "LAYOUT_PARSER_PROCESSOR":
+            self._use_layout_parser = True
+        else:
+            self._use_layout_parser = False
 
     def lazy_parse(self, blob: Blob) -> Iterator[Document]:
         """Parses a blob lazily.
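# Illustrative usage sketch, not part of the diff: with the change above, the
# parser probes the processor type at construction time and sets
# `_use_layout_parser` for processors of type "LAYOUT_PARSER_PROCESSOR".
# The project id and processor path below are placeholder assumptions.
from langchain_google_community.docai import DocAIParser

parser = DocAIParser(
    project_id="my-project",
    location="us",
    processor_name="projects/my-project/locations/us/processors/my-processor-id",
)
# True for a Layout Parser processor, False for OCR-style processors.
print(parser._use_layout_parser)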
@@ -113,31 +121,83 @@ def lazy_parse(self, blob: Blob) -> Iterator[Document]:
         """
         yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
 
+    def _prepare_process_options(
+        self,
+        enable_native_pdf_parsing: Optional[bool] = True,
+        page_range: Optional[List[int]] = None,
+        chunk_size: Optional[int] = 500,
+        include_ancestor_headings: Optional[bool] = True,
+    ) -> "ProcessOptions":
+        """Prepare process options for a Document AI process request.
+
+        Args:
+            enable_native_pdf_parsing: enable pdf embedded text extraction
+            page_range: list of page numbers to parse. If `None`,
+                entire document will be parsed.
+            chunk_size: maximum number of characters per chunk (supported
+                only with Document AI Layout Parser processor).
+            include_ancestor_headings: whether or not to include ancestor
+                headings when splitting (supported only
+                with Document AI Layout Parser processor).
+        """
+        try:
+            from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
+        except ImportError as exc:
+            raise ImportError(
+                "documentai package not found, please install it with "
+                "`pip install langchain-google-community[docai]`"
+            ) from exc
+        if self._use_layout_parser:
+            layout_config = ProcessOptions.LayoutConfig(
+                chunking_config=ProcessOptions.LayoutConfig.ChunkingConfig(
+                    chunk_size=chunk_size,
+                    include_ancestor_headings=include_ancestor_headings,
+                )
+            )
+            individual_page_selector = (
+                ProcessOptions.IndividualPageSelector(pages=page_range)
+                if page_range
+                else None
+            )
+            process_options = ProcessOptions(
+                layout_config=layout_config,
+                individual_page_selector=individual_page_selector,
+            )
+        else:
+            ocr_config = (
+                OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
+                if enable_native_pdf_parsing
+                else None
+            )
+            individual_page_selector = (
+                ProcessOptions.IndividualPageSelector(pages=page_range)
+                if page_range
+                else None
+            )
+            process_options = ProcessOptions(
+                ocr_config=ocr_config, individual_page_selector=individual_page_selector
+            )
+
+        return process_options
+
     def online_process(
         self,
         blob: Blob,
-        enable_native_pdf_parsing: bool = True,
         field_mask: Optional[str] = None,
-        page_range: Optional[List[int]] = None,
+        **process_options_kwargs: Any,
     ) -> Iterator[Document]:
         """Parses a blob lazily using online processing.
 
         Args:
             blob: a blob to parse.
-            enable_native_pdf_parsing: enable pdf embedded text extraction
             field_mask: a comma-separated list of which fields to include in the
                 Document AI response.
                 suggested: "text,pages.pageNumber,pages.layout"
-            page_range: list of page numbers to parse. If `None`,
-                entire document will be parsed.
+            process_options_kwargs: optional parameters to pass to the Document
+                AI processors
         """
         try:
             from google.cloud import documentai
-            from google.cloud.documentai_v1.types import (  # type: ignore[import, attr-defined]
-                IndividualPageSelector,
-                OcrConfig,
-                ProcessOptions,
-            )
         except ImportError as exc:
             raise ImportError(
                 "Could not import google-cloud-documentai python package. "
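# Illustrative sketch, not part of the diff: keyword arguments accepted by
# _prepare_process_options() can now be passed straight through
# online_process(). chunk_size and include_ancestor_headings only take effect
# with a Layout Parser processor; enable_native_pdf_parsing only with an OCR
# processor. The bucket path is a placeholder, and `parser` is assumed to be
# the DocAIParser built against a Layout Parser processor as sketched above.
from langchain_core.document_loaders.blob_loaders import Blob

blob = Blob(data=None, path="gs://my-bucket/sample.pdf")
docs = list(
    parser.online_process(
        blob=blob,
        chunk_size=350,
        include_ancestor_headings=True,
        page_range=[1, 2],
    )
)
# With a Layout Parser processor each Document is one chunk and carries a
# "chunk_id" in its metadata instead of a "page" number.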
" @@ -153,14 +213,9 @@ def online_process( "documentai_toolbox package not found, please install it with " "`pip install langchain-google-community[docai]`" ) from exc - ocr_config = ( - OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing) - if enable_native_pdf_parsing - else None - ) - individual_page_selector = ( - IndividualPageSelector(pages=page_range) if page_range else None - ) + + # prepare process options + process_options = self._prepare_process_options(**process_options_kwargs) response = self._client.process_document( documentai.ProcessRequest( @@ -169,24 +224,34 @@ def online_process( gcs_uri=blob.path, mime_type=blob.mimetype or "application/pdf", ), - process_options=ProcessOptions( - ocr_config=ocr_config, - individual_page_selector=individual_page_selector, - ), + process_options=process_options, skip_human_review=True, field_mask=field_mask, ) ) - yield from ( - Document( - page_content=_text_from_layout(page.layout, response.document.text), - metadata={ - "page": page.page_number, - "source": blob.path, - }, + + if self._use_layout_parser: + yield from ( + Document( + page_content=chunk.content, + metadata={ + "chunk_id": chunk.chunk_id, + "source": blob.path, + }, + ) + for chunk in response.document.chunked_document.chunks + ) + else: + yield from ( + Document( + page_content=_text_from_layout(page.layout, response.document.text), + metadata={ + "page": page.page_number, + "source": blob.path, + }, + ) + for page in response.document.pages ) - for page in response.document.pages - ) def batch_parse( self, @@ -194,6 +259,7 @@ def batch_parse( gcs_output_path: Optional[str] = None, timeout_sec: int = 3600, check_in_interval_sec: int = 60, + **process_options_kwargs: Any, ) -> Iterator[Document]: """Parses a list of blobs lazily. @@ -202,7 +268,10 @@ def batch_parse( gcs_output_path: a path on Google Cloud Storage to store parsing results. timeout_sec: a timeout to wait for Document AI to complete, in seconds. check_in_interval_sec: an interval to wait until next check - whether parsing operations have been completed, in seconds + whether parsing operations have been completed, in seconds. + process_options_kwargs: optional parameters to pass to the Document + AI processors + This is a long-running operation. A recommended way is to decouple parsing from creating LangChain Documents: >>> operations = parser.docai_parse(blobs, gcs_path) @@ -219,7 +288,9 @@ def batch_parse( raise ValueError( "An output path on Google Cloud Storage should be provided." 
@@ -255,15 +326,31 @@ def parse_from_results(
             ) from exc
         for result in results:
             gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path)
-            shards = _get_shards(gcs_bucket_name, gcs_prefix)
-            yield from (
-                Document(
-                    page_content=_text_from_layout(page.layout, shard.text),
-                    metadata={"page": page.page_number, "source": result.source_path},
+            shards = _get_shards(gcs_bucket_name, gcs_prefix + "/")
+            if self._use_layout_parser:
+                yield from (
+                    Document(
+                        page_content=chunk.content,
+                        metadata={
+                            "chunk_id": chunk.chunk_id,
+                            "source": result.source_path,
+                        },
+                    )
+                    for shard in shards
+                    for chunk in shard.chunked_document.chunks
+                )
+            else:
+                yield from (
+                    Document(
+                        page_content=_text_from_layout(page.layout, shard.text),
+                        metadata={
+                            "page": page.page_number,
+                            "source": result.source_path,
+                        },
+                    )
+                    for shard in shards
+                    for page in shard.pages
                 )
-                for shard in shards
-                for page in shard.pages
-            )
 
     def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
         """Initializes Long-Running Operations from their names."""
@@ -292,8 +379,8 @@ def docai_parse(
         gcs_output_path: Optional[str] = None,
         processor_name: Optional[str] = None,
         batch_size: int = 1000,
-        enable_native_pdf_parsing: bool = True,
         field_mask: Optional[str] = None,
+        **process_options_kwargs: Any,
     ) -> List["Operation"]:
         """Runs Google Document AI PDF Batch Processing on a list of blobs.
 
@@ -302,10 +389,11 @@ def docai_parse(
             gcs_output_path: a path (folder) on GCS to store results
             processor_name: name of a Document AI processor.
             batch_size: amount of documents per batch
-            enable_native_pdf_parsing: a config option for the parser
             field_mask: a comma-separated list of which fields to include in the
                 Document AI response.
                 suggested: "text,pages.pageNumber,pages.layout"
+            process_options_kwargs: optional parameters to pass to the Document
+                AI processors
 
         Document AI has a 1000 file limit per batch, so batches larger than that
         need to be split into multiple requests.
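# Illustrative sketch, not part of the diff: the decoupled flow from the
# batch_parse docstring also accepts the new kwargs, since docai_parse()
# builds its ProcessOptions the same way. `parser` and `blobs` are assumed to
# be set up as in the previous sketch; the polling interval is arbitrary.
import time

operations = parser.docai_parse(
    blobs, gcs_output_path="gs://my-bucket/docai-output/", chunk_size=500
)
while parser.is_running(operations):
    time.sleep(60)  # poll until the long-running operations complete
results = parser.get_results(operations)
docs = list(parser.parse_from_results(results))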
@@ -314,7 +402,6 @@ def docai_parse(
         """
         try:
             from google.cloud import documentai
-            from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
         except ImportError as exc:
             raise ImportError(
                 "documentai package not found, please install it with "
@@ -350,15 +437,8 @@ def docai_parse(
                 )
             )
 
-            process_options = (
-                ProcessOptions(
-                    ocr_config=OcrConfig(
-                        enable_native_pdf_parsing=enable_native_pdf_parsing
-                    )
-                )
-                if enable_native_pdf_parsing
-                else None
-            )
+            process_options = self._prepare_process_options(**process_options_kwargs)
+
             operations.append(
                 self._client.batch_process_documents(
                     documentai.BatchProcessRequest(
diff --git a/libs/community/tests/integration_tests/terraform/main.tf b/libs/community/tests/integration_tests/terraform/main.tf
index adb1c25e..532f405e 100644
--- a/libs/community/tests/integration_tests/terraform/main.tf
+++ b/libs/community/tests/integration_tests/terraform/main.tf
@@ -5,8 +5,9 @@ module "cloudbuild" {
   project_id                 = ""
   cloudbuildv2_repository_id = ""
   cloudbuild_env_vars = {
-    DATA_STORE_ID  = ""
-    IMAGE_GCS_PATH = "gs://cloud-samples-data/vision/label/wakeupcat.jpg"
+    DATA_STORE_ID  = "",
+    IMAGE_GCS_PATH = "gs://cloud-samples-data/vision/label/wakeupcat.jpg",
+    PROCESSOR_NAME = ""
   }
   cloudbuild_secret_vars = {
     GOOGLE_API_KEY = ""
diff --git a/libs/community/tests/integration_tests/test_docai.py b/libs/community/tests/integration_tests/test_docai.py
new file mode 100644
index 00000000..8badd39c
--- /dev/null
+++ b/libs/community/tests/integration_tests/test_docai.py
@@ -0,0 +1,21 @@
+"""Integration tests for the Google Cloud DocAI parser."""
+
+import os
+
+import pytest
+from langchain_core.document_loaders.blob_loaders import Blob
+
+from langchain_google_community.docai import DocAIParser
+
+
+@pytest.mark.extended
+def test_docai_layout_parser() -> None:
+    processor_name = os.environ["PROCESSOR_NAME"]
+    parser = DocAIParser(processor_name=processor_name, location="us")
+    assert parser._use_layout_parser is True
+    blob = Blob(
+        data=None,
+        path="gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf",
+    )
+    docs = list(parser.online_process(blob=blob))
+    assert len(docs) == 11
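# Illustrative sketch, not part of the diff: the new integration test reads
# PROCESSOR_NAME from the environment and expects it to reference an existing
# Layout Parser processor. One assumed way to create such a processor with the
# google-cloud-documentai client (project, location, and display name are
# placeholders):
from google.api_core.client_options import ClientOptions
from google.cloud import documentai

client = documentai.DocumentProcessorServiceClient(
    client_options=ClientOptions(api_endpoint="us-documentai.googleapis.com")
)
processor = client.create_processor(
    parent="projects/my-project/locations/us",
    processor=documentai.Processor(
        type_="LAYOUT_PARSER_PROCESSOR",
        display_name="layout-parser-integration-tests",
    ),
)
print(processor.name)  # export this value as PROCESSOR_NAME before running pytest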