Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support bytestream in Unstructured API #1082

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from typing import Any, Dict, List, Literal, Optional, Union

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.components.converters.utils import normalize_metadata
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses.byte_stream import ByteStream
from haystack.utils import Secret, deserialize_secrets_inplace
from tqdm import tqdm

from unstructured.documents.elements import Element
from unstructured.partition.api import partition_via_api
Expand Down Expand Up @@ -123,64 +123,79 @@ def from_dict(cls, data: Dict[str, Any]) -> "UnstructuredFileConverter":
@component.output_types(documents=List[Document])
def run(
self,
paths: Union[List[str], List[os.PathLike]],
sources: List[Union[str, os.PathLike, ByteStream]],
meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
"""
Convert files to Haystack Documents using the Unstructured API.
Convert files or byte streams to Haystack Documents using the Unstructured API.

:param paths: List of paths to convert. Paths can be files or directories.
If a path is a directory, all files in the directory are converted. Subdirectories are ignored.
:param sources: List of file paths or byte streams to convert.
Paths can be files or directories. ByteStream is also supported.
:param meta: Optional metadata to attach to the Documents.
This value can be either a list of dictionaries or a single dictionary.
If it's a single dictionary, its content is added to the metadata of all produced Documents.
If it's a list, the length of the list must match the number of paths, because the two lists will be zipped.
Please note that if the paths contain directories, `meta` can only be a single dictionary
(same metadata for all files).

This value can be a single dictionary or a list of dictionaries, matching the number of sources.
:returns: A dictionary with the following key:
- `documents`: List of Haystack Documents.

:raises ValueError: If `meta` is a list and `paths` contains directories.
:raises ValueError: If `meta` is a list and `sources` contains directories.
silvanocerza marked this conversation as resolved.
Show resolved Hide resolved
"""
paths_obj = [Path(path) for path in paths]
filepaths = [path for path in paths_obj if path.is_file()]
filepaths_in_directories = [
filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file()
]
if filepaths_in_directories and isinstance(meta, list):
error = """"If providing directories in the `paths` parameter,
`meta` can only be a dictionary (metadata applied to every file),
and not a list. To specify different metadata for each file,
provide an explicit list of direct paths instead."""
raise ValueError(error)

all_filepaths = filepaths + filepaths_in_directories
# currently, the files are converted sequentially to gently handle API failures

documents = []
meta_list = normalize_metadata(meta, sources_count=len(all_filepaths))

for filepath, metadata in tqdm(
zip(all_filepaths, meta_list), desc="Converting files to Haystack Documents", disable=not self.progress_bar
):
elements = self._partition_file_into_elements(filepath=filepath)
docs_for_file = self._create_documents(
filepath=filepath,
all_sources = []
meta_sources = normalize_metadata(meta, len(sources))

# Iterate over the sources
for source in sources:
if isinstance(source, (str, os.PathLike)):
path_obj = Path(source)

if path_obj.is_file():
# Add individual file
all_sources.append(path_obj)

elif path_obj.is_dir():
# Ensure meta is a dict when directories are provided
if not isinstance(meta, dict):
error = """"If providing directories in the `paths` parameter,
`meta` can only be a dictionary (metadata applied to every file),
and not a list. To specify different metadata for each file,
provide an explicit list of direct paths instead."""
raise ValueError(error)

# If the source is a directory, add all files in the directory
for file in path_obj.glob("*.*"):
if file.is_file():
all_sources.append(file) # Add each file in the directory

elif isinstance(source, ByteStream):
# Handle ByteStream
all_sources.append(source)

for source, metadata in zip(all_sources, meta_sources):
try:
bytestream = get_bytestream_from_source(source=source)
except Exception as e:
logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
continue

elements = self._partition_source_into_elements(source=source)
merged_metadata = {**bytestream.meta, **metadata}

docs_for_stream = self._create_documents(
elements=elements,
document_creation_mode=self.document_creation_mode,
separator=self.separator,
meta=metadata,
meta=merged_metadata,
)
documents.extend(docs_for_file)
documents.extend(docs_for_stream)

return {"documents": documents}

@staticmethod
def _create_documents(
filepath: Path,
elements: List[Element],
document_creation_mode: Literal["one-doc-per-file", "one-doc-per-page", "one-doc-per-element"],
separator: str,
meta: Dict[str, Any],
filepath: Optional[Path] = None,
) -> List[Document]:
"""
Create Haystack Documents from the elements returned by Unstructured.
Expand All @@ -190,15 +205,17 @@ def _create_documents(
if document_creation_mode == "one-doc-per-file":
text = separator.join([str(el) for el in elements])
metadata = copy.deepcopy(meta)
metadata["file_path"] = str(filepath)
if filepath:
metadata["file_path"] = str(filepath) # Only include file path if provided
docs = [Document(content=text, meta=metadata)]

elif document_creation_mode == "one-doc-per-page":
texts_per_page: defaultdict[int, str] = defaultdict(str)
meta_per_page: defaultdict[int, dict] = defaultdict(dict)
for el in elements:
metadata = copy.deepcopy(meta)
metadata["file_path"] = str(filepath)
if filepath:
metadata["file_path"] = str(filepath)
if hasattr(el, "metadata"):
metadata.update(el.metadata.to_dict())
page_number = int(metadata.get("page_number", 1))
Expand All @@ -211,28 +228,39 @@ def _create_documents(
elif document_creation_mode == "one-doc-per-element":
for index, el in enumerate(elements):
metadata = copy.deepcopy(meta)
metadata["file_path"] = str(filepath)
if filepath:
metadata["file_path"] = str(filepath)
metadata["element_index"] = index
if hasattr(el, "metadata"):
metadata.update(el.metadata.to_dict())
if hasattr(el, "category"):
metadata["category"] = el.category
doc = Document(content=str(el), meta=metadata)
docs.append(doc)

return docs

def _partition_file_into_elements(self, filepath: Path) -> List[Element]:
def _partition_source_into_elements(self, source: Union[Path, ByteStream]) -> List[Element]:
"""
Partition a file path or byte stream into elements using the Unstructured API.
"""
elements = []
try:
elements = partition_via_api(
filename=str(filepath),
api_url=self.api_url,
api_key=self.api_key.resolve_value() if self.api_key else None,
**self.unstructured_kwargs,
)
if isinstance(source, Path):
elements = partition_via_api(
filename=str(source),
api_url=self.api_url,
api_key=self.api_key.resolve_value() if self.api_key else None,
**self.unstructured_kwargs,
)
else:
elements = partition_via_api(
file=source.data,
metadata_filename=str(source.meta.get("file_path", "")),
api_url=self.api_url,
api_key=self.api_key.resolve_value() if self.api_key else None,
**self.unstructured_kwargs,
)
except Exception as e:
logger.warning(f"Unstructured could not process file {filepath}. Error: {e}")
logger.warning(f"Unstructured could not process source {source}. Error: {e}")
return elements
100 changes: 94 additions & 6 deletions integrations/unstructured/tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
from pathlib import Path

import pytest
from haystack.dataclasses.byte_stream import ByteStream

from haystack_integrations.components.converters.unstructured import UnstructuredFileConverter


Expand Down Expand Up @@ -86,6 +90,34 @@ def test_run_one_doc_per_file(self, samples_path):
assert len(documents) == 1
assert documents[0].meta == {"file_path": str(pdf_path)}

@pytest.mark.integration
def test_run_one_doc_per_file_bytestream(self, samples_path):
pdf_path = samples_path / "sample_pdf.pdf"
pdf_stream = ByteStream.from_file_path(pdf_path)

local_converter = UnstructuredFileConverter(
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file"
)

documents = local_converter.run([pdf_stream])["documents"]

assert len(documents) == 1

@pytest.mark.integration
def test_run_one_doc_per_page_bytestream(self, samples_path):
pdf_path = samples_path / "sample_pdf.pdf"
pdf_stream = ByteStream.from_file_path(pdf_path)

local_converter = UnstructuredFileConverter(
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page"
)

documents = local_converter.run([pdf_stream])["documents"]

assert len(documents) == 4
for i, doc in enumerate(documents, start=1):
assert doc.meta["page_number"] == i

@pytest.mark.integration
def test_run_one_doc_per_page(self, samples_path):
pdf_path = samples_path / "sample_pdf.pdf"
Expand Down Expand Up @@ -127,7 +159,7 @@ def test_run_one_doc_per_file_with_meta(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file"
)

documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"]
documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"]

assert len(documents) == 1
assert documents[0].meta["file_path"] == str(pdf_path)
Expand All @@ -143,7 +175,7 @@ def test_run_one_doc_per_page_with_meta(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page"
)

documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"]
documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"]
assert len(documents) == 4
for i, doc in enumerate(documents, start=1):
assert doc.meta["file_path"] == str(pdf_path)
Expand All @@ -159,7 +191,7 @@ def test_run_one_doc_per_element_with_meta(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)

documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"]
documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"]

assert len(documents) > 4
first_element_index = 0
Expand All @@ -185,7 +217,7 @@ def test_run_one_doc_per_element_with_meta_list_two_files(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)

documents = local_converter.run(paths=pdf_path, meta=meta)["documents"]
documents = local_converter.run(sources=pdf_path, meta=meta)["documents"]

assert len(documents) > 4
for doc in documents:
Expand All @@ -205,7 +237,7 @@ def test_run_one_doc_per_element_with_meta_list_folder_fail(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)
with pytest.raises(ValueError):
local_converter.run(paths=pdf_path, meta=meta)["documents"]
local_converter.run(sources=pdf_path, meta=meta)["documents"]

@pytest.mark.integration
def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path):
Expand All @@ -216,7 +248,7 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path):
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element"
)

documents = local_converter.run(paths=pdf_path, meta=meta)["documents"]
documents = local_converter.run(sources=pdf_path, meta=meta)["documents"]

assert len(documents) > 4
for doc in documents:
Expand All @@ -226,3 +258,59 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path):
assert "category" in doc.meta
assert "common_meta" in doc.meta
assert doc.meta["common_meta"] == "common"

@pytest.mark.integration
def test_run_one_doc_per_element_with_meta_list_multiple_sources(self, samples_path):
sources = [
ByteStream(data=b"content", meta={"file_path": "some_file.md"}),
"README.md",
ByteStream(data=b"content", meta={"file_path": "yet_another_file.md"}),
Path(__file__),
ByteStream(data=b"content", meta={"file_path": "my_file.md"}),
]

meta = [
{"type": "ByteStream"},
{"type": "str"},
{"type": "ByteStream"},
{"type": "Path"},
{"type": "ByteStream"},
]

local_converter = UnstructuredFileConverter(
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page"
)

documents = local_converter.run(sources=sources, meta=meta)["documents"]

assert len(documents) == 5
assert documents[0].meta["type"] == "ByteStream"
assert documents[0].meta["file_path"] == "some_file.md"
assert documents[1].meta["type"] == "str"
assert documents[1].meta["file_path"] == "README.md"
assert documents[2].meta["type"] == "ByteStream"
assert documents[2].meta["file_path"] == "yet_another_file.md"
assert documents[3].meta["type"] == "Path"
assert documents[4].meta["type"] == "ByteStream"
assert documents[4].meta["file_path"] == "my_file.md"

@pytest.mark.integration
def test_run_one_doc_per_element_with_meta_list_multiple_sources_directory(self, samples_path):
sources = [
"README.md",
ByteStream(data=b"Some content", meta={"file_path": "some_file.md"}),
samples_path,
Path(__file__),
]

meta = {"common_meta": "applies_to_all"}


local_converter = UnstructuredFileConverter(
api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page"
)

documents = local_converter.run(sources=sources, meta=meta)["documents"]

for doc in documents:
assert doc.meta["common_meta"] == "applies_to_all"
Loading