Added support for Google Cloud Document AI Layout Parser (langchain-a…
RajeshThallam authored Aug 30, 2024
1 parent fc632d9 commit ed5c457
Showing 3 changed files with 160 additions and 58 deletions.
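
The diff below wires Layout Parser support through DocAIParser: the constructor inspects the processor type, and online/batch processing forwards chunking options and yields chunk-level Documents. A minimal usage sketch follows; it is illustrative only (not part of the commit), and the processor name and GCS paths are placeholder values.

from langchain_core.document_loaders.blob_loaders import Blob
from langchain_google_community.docai import DocAIParser

# Placeholder Layout Parser processor; the parser reads the processor type at
# construction time and enables chunk-based output for layout parsers.
parser = DocAIParser(
    processor_name="projects/my-project/locations/us/processors/my-layout-parser",
    location="us",
)

blob = Blob(data=None, path="gs://my-bucket/report.pdf")

# Keyword arguments are forwarded to _prepare_process_options; with a layout
# parser processor they control chunking rather than OCR.
docs = list(
    parser.online_process(blob=blob, chunk_size=500, include_ancestor_headings=True)
)
for doc in docs:
    print(doc.metadata["chunk_id"], doc.metadata["source"])
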
192 changes: 136 additions & 56 deletions libs/community/langchain_google_community/docai.py
@@ -9,7 +9,7 @@
import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence
from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Sequence

from langchain_core.document_loaders import BaseBlobParser
from langchain_core.document_loaders.blob_loaders import Blob
@@ -23,7 +23,7 @@
from google.cloud.documentai import ( # type: ignore[import]
DocumentProcessorServiceClient,
)

from google.cloud.documentai_v1.types import ProcessOptions

logger = logging.getLogger(__name__)

@@ -47,6 +47,7 @@ def __init__(
self,
*,
client: Optional["DocumentProcessorServiceClient"] = None,
project_id: Optional[str] = None,
location: Optional[str] = None,
gcs_output_path: Optional[str] = None,
processor_name: Optional[str] = None,
@@ -95,12 +96,19 @@ def __init__(
"`pip install langchain-google-community[docai]`"
) from exc
options = ClientOptions(
api_endpoint=f"{location}-documentai.googleapis.com"
quota_project_id=project_id,
api_endpoint=f"{location}-documentai.googleapis.com",
)
self._client = DocumentProcessorServiceClient(
client_options=options,
client_info=get_client_info(module="document-ai"),
)
# get processor type
self._processor_type = self._client.get_processor(name=processor_name).type
if self._processor_type == "LAYOUT_PARSER_PROCESSOR":
self._use_layout_parser = True
else:
self._use_layout_parser = False

def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""Parses a blob lazily.
@@ -113,31 +121,83 @@ def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""
yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)

def _prepare_process_options(
self,
enable_native_pdf_parsing: Optional[bool] = True,
page_range: Optional[List[int]] = None,
chunk_size: Optional[int] = 500,
include_ancestor_headings: Optional[bool] = True,
) -> "ProcessOptions":
"""Prepare process options for DocAI process request
Args:
enable_native_pdf_parsing: enable pdf embedded text extraction
page_range: list of page numbers to parse. If `None`,
entire document will be parsed.
chunk_size: maximum number of characters per chunk (supported
only with Document AI Layout Parser processor).
include_ancestor_headings: whether or not to include ancestor
headings when splitting (supported only
with Document AI Layout Parser processor).
"""
try:
from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
except ImportError as exc:
raise ImportError(
"documentai package not found, please install it with "
"`pip install langchain-google-community[docai]`"
) from exc
if self._use_layout_parser:
layout_config = ProcessOptions.LayoutConfig(
chunking_config=ProcessOptions.LayoutConfig.ChunkingConfig(
chunk_size=chunk_size,
include_ancestor_headings=include_ancestor_headings,
)
)
individual_page_selector = (
ProcessOptions.IndividualPageSelector(pages=page_range)
if page_range
else None
)
process_options = ProcessOptions(
layout_config=layout_config,
individual_page_selector=individual_page_selector,
)
else:
ocr_config = (
OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
if enable_native_pdf_parsing
else None
)
individual_page_selector = (
ProcessOptions.IndividualPageSelector(pages=page_range)
if page_range
else None
)
process_options = ProcessOptions(
ocr_config=ocr_config, individual_page_selector=individual_page_selector
)

return process_options

def online_process(
self,
blob: Blob,
enable_native_pdf_parsing: bool = True,
field_mask: Optional[str] = None,
page_range: Optional[List[int]] = None,
**process_options_kwargs: Any,
) -> Iterator[Document]:
"""Parses a blob lazily using online processing.
Args:
blob: a blob to parse.
enable_native_pdf_parsing: enable pdf embedded text extraction
field_mask: a comma-separated list of which fields to include in the
Document AI response.
suggested: "text,pages.pageNumber,pages.layout"
page_range: list of page numbers to parse. If `None`,
entire document will be parsed.
process_options_kwargs: optional parameters to pass to the Document
AI processors
"""
try:
from google.cloud import documentai
from google.cloud.documentai_v1.types import ( # type: ignore[import, attr-defined]
IndividualPageSelector,
OcrConfig,
ProcessOptions,
)
except ImportError as exc:
raise ImportError(
"Could not import google-cloud-documentai python package. "
@@ -153,14 +213,9 @@ def online_process(
"documentai_toolbox package not found, please install it with "
"`pip install langchain-google-community[docai]`"
) from exc
ocr_config = (
OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
if enable_native_pdf_parsing
else None
)
individual_page_selector = (
IndividualPageSelector(pages=page_range) if page_range else None
)

# prepare process options
process_options = self._prepare_process_options(**process_options_kwargs)

response = self._client.process_document(
documentai.ProcessRequest(
@@ -169,31 +224,42 @@ def online_process(
gcs_uri=blob.path,
mime_type=blob.mimetype or "application/pdf",
),
process_options=ProcessOptions(
ocr_config=ocr_config,
individual_page_selector=individual_page_selector,
),
process_options=process_options,
skip_human_review=True,
field_mask=field_mask,
)
)
yield from (
Document(
page_content=_text_from_layout(page.layout, response.document.text),
metadata={
"page": page.page_number,
"source": blob.path,
},

if self._use_layout_parser:
yield from (
Document(
page_content=chunk.content,
metadata={
"chunk_id": chunk.chunk_id,
"source": blob.path,
},
)
for chunk in response.document.chunked_document.chunks
)
else:
yield from (
Document(
page_content=_text_from_layout(page.layout, response.document.text),
metadata={
"page": page.page_number,
"source": blob.path,
},
)
for page in response.document.pages
)
for page in response.document.pages
)

def batch_parse(
self,
blobs: Sequence[Blob],
gcs_output_path: Optional[str] = None,
timeout_sec: int = 3600,
check_in_interval_sec: int = 60,
**process_options_kwargs: Any,
) -> Iterator[Document]:
"""Parses a list of blobs lazily.
@@ -202,7 +268,10 @@ def batch_parse(
gcs_output_path: a path on Google Cloud Storage to store parsing results.
timeout_sec: a timeout to wait for Document AI to complete, in seconds.
check_in_interval_sec: an interval to wait until next check
whether parsing operations have been completed, in seconds
whether parsing operations have been completed, in seconds.
process_options_kwargs: optional parameters to pass to the Document
AI processors
This is a long-running operation. A recommended way is to decouple
parsing from creating LangChain Documents:
>>> operations = parser.docai_parse(blobs, gcs_path)
@@ -219,7 +288,9 @@ def batch_parse(
raise ValueError(
"An output path on Google Cloud Storage should be provided."
)
operations = self.docai_parse(blobs, gcs_output_path=output_path)
operations = self.docai_parse(
blobs, gcs_output_path=output_path, **process_options_kwargs
)
operation_names = [op.operation.name for op in operations]
logger.debug(
"Started parsing with Document AI, submitted operations %s", operation_names
@@ -255,15 +326,31 @@ def parse_from_results(
) from exc
for result in results:
gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path)
shards = _get_shards(gcs_bucket_name, gcs_prefix)
yield from (
Document(
page_content=_text_from_layout(page.layout, shard.text),
metadata={"page": page.page_number, "source": result.source_path},
shards = _get_shards(gcs_bucket_name, gcs_prefix + "/")
if self._use_layout_parser:
yield from (
Document(
page_content=chunk.content,
metadata={
"chunk_id": chunk.chunk_id,
"source": result.source_path,
},
)
for shard in shards
for chunk in shard.chunked_document.chunks
)
else:
yield from (
Document(
page_content=_text_from_layout(page.layout, shard.text),
metadata={
"page": page.page_number,
"source": result.source_path,
},
)
for shard in shards
for page in shard.pages
)
for shard in shards
for page in shard.pages
)

def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
"""Initializes Long-Running Operations from their names."""
@@ -292,8 +379,8 @@ def docai_parse(
gcs_output_path: Optional[str] = None,
processor_name: Optional[str] = None,
batch_size: int = 1000,
enable_native_pdf_parsing: bool = True,
field_mask: Optional[str] = None,
**process_options_kwargs: Any,
) -> List["Operation"]:
"""Runs Google Document AI PDF Batch Processing on a list of blobs.
@@ -302,10 +389,11 @@
gcs_output_path: a path (folder) on GCS to store results
processor_name: name of a Document AI processor.
batch_size: amount of documents per batch
enable_native_pdf_parsing: a config option for the parser
field_mask: a comma-separated list of which fields to include in the
Document AI response.
suggested: "text,pages.pageNumber,pages.layout"
process_options_kwargs: optional parameters to pass to the Document
AI processors
Document AI has a 1000 file limit per batch, so batches larger than that need
to be split into multiple requests.
@@ -314,7 +402,6 @@
"""
try:
from google.cloud import documentai
from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
except ImportError as exc:
raise ImportError(
"documentai package not found, please install it with "
@@ -350,15 +437,8 @@
)
)

process_options = (
ProcessOptions(
ocr_config=OcrConfig(
enable_native_pdf_parsing=enable_native_pdf_parsing
)
)
if enable_native_pdf_parsing
else None
)
process_options = self._prepare_process_options(**process_options_kwargs)

operations.append(
self._client.batch_process_documents(
documentai.BatchProcessRequest(
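
For batch processing, the same keyword arguments now flow from batch_parse through docai_parse into _prepare_process_options. A hedged sketch, reusing the parser from the example above; the blob path and output bucket are placeholders:

# Illustrative only; paths are hypothetical.
blobs = [Blob(data=None, path="gs://my-bucket/contracts/agreement.pdf")]
docs = list(
    parser.batch_parse(
        blobs,
        gcs_output_path="gs://my-bucket/docai-output/",
        chunk_size=500,
        include_ancestor_headings=False,
    )
)
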
5 changes: 3 additions & 2 deletions libs/community/tests/integration_tests/terraform/main.tf
@@ -5,8 +5,9 @@ module "cloudbuild" {
project_id = ""
cloudbuildv2_repository_id = ""
cloudbuild_env_vars = {
DATA_STORE_ID = ""
IMAGE_GCS_PATH = "gs://cloud-samples-data/vision/label/wakeupcat.jpg"
DATA_STORE_ID = "",
IMAGE_GCS_PATH = "gs://cloud-samples-data/vision/label/wakeupcat.jpg",
PROCESSOR_NAME = ""
}
cloudbuild_secret_vars = {
GOOGLE_API_KEY = ""
21 changes: 21 additions & 0 deletions libs/community/tests/integration_tests/test_docai.py
@@ -0,0 +1,21 @@
"""Integration tests for the Google Cloud DocAI parser."""

import os

import pytest
from langchain_core.document_loaders.blob_loaders import Blob

from langchain_google_community.docai import DocAIParser


@pytest.mark.extended
def test_docai_layout_parser() -> None:
processor_name = os.environ["PROCESSOR_NAME"]
parser = DocAIParser(processor_name=processor_name, location="us")
assert parser._use_layout_parser is True
blob = Blob(
data=None,
path="gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf",
)
docs = list(parser.online_process(blob=blob))
assert len(docs) == 11
