From 1b3faceba250d174a030e1a7e42b0e0ee26bc318 Mon Sep 17 00:00:00 2001 From: alperkaya Date: Thu, 12 Sep 2024 14:39:23 +0200 Subject: [PATCH 1/5] add bytestream into unstructured --- .../converters/unstructured/converter.py | 110 ++++++++++++------ .../unstructured/tests/test_converter.py | 41 ++++++- 2 files changed, 111 insertions(+), 40 deletions(-) diff --git a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py index 637c0840f..d3ccd0df1 100644 --- a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py +++ b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py @@ -6,10 +6,11 @@ import os from collections import defaultdict from pathlib import Path -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Union, Tuple from haystack import Document, component, default_from_dict, default_to_dict from haystack.components.converters.utils import normalize_metadata +from haystack.dataclasses.byte_stream import ByteStream from haystack.utils import Secret, deserialize_secrets_inplace from tqdm import tqdm @@ -123,31 +124,25 @@ def from_dict(cls, data: Dict[str, Any]) -> "UnstructuredFileConverter": @component.output_types(documents=List[Document]) def run( self, - paths: Union[List[str], List[os.PathLike]], + sources: Union[List[Union[str, os.PathLike, ByteStream]]], meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, ): """ - Convert files to Haystack Documents using the Unstructured API. + Convert files or byte streams to Haystack Documents using the Unstructured API. - :param paths: List of paths to convert. Paths can be files or directories. - If a path is a directory, all files in the directory are converted. Subdirectories are ignored. + :param sources: List of file paths or byte streams to convert. + Paths can be files or directories. Byte streams are also supported. :param meta: Optional metadata to attach to the Documents. - This value can be either a list of dictionaries or a single dictionary. - If it's a single dictionary, its content is added to the metadata of all produced Documents. - If it's a list, the length of the list must match the number of paths, because the two lists will be zipped. - Please note that if the paths contain directories, `meta` can only be a single dictionary - (same metadata for all files). - + This value can be a single dictionary or a list of dictionaries, matching the number of sources. :returns: A dictionary with the following key: - `documents`: List of Haystack Documents. - - :raises ValueError: If `meta` is a list and `paths` contains directories. + :raises ValueError: If `meta` is a list and `sources` contains directories. """ - paths_obj = [Path(path) for path in paths] - filepaths = [path for path in paths_obj if path.is_file()] - filepaths_in_directories = [ - filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() - ] + + # Separate file paths and byte streams + filepaths, filepaths_in_directories, byte_streams = self._get_sources(sources) + + if filepaths_in_directories and isinstance(meta, list): error = """"If providing directories in the `paths` parameter, `meta` can only be a dictionary (metadata applied to every file), @@ -155,15 +150,16 @@ def run( provide an explicit list of direct paths instead.""" raise ValueError(error) + # Combine file paths and directories for processing all_filepaths = filepaths + filepaths_in_directories - # currently, the files are converted sequentially to gently handle API failures documents = [] - meta_list = normalize_metadata(meta, sources_count=len(all_filepaths)) + meta_list = normalize_metadata(meta, sources_count=len(all_filepaths) + len(byte_streams)) + # Process file paths for filepath, metadata in tqdm( - zip(all_filepaths, meta_list), desc="Converting files to Haystack Documents", disable=not self.progress_bar + zip(all_filepaths, meta_list[:len(all_filepaths)]), desc="Converting files to Haystack Documents" ): - elements = self._partition_file_into_elements(filepath=filepath) + elements = self._partition_source_into_elements(source=filepath) docs_for_file = self._create_documents( filepath=filepath, elements=elements, @@ -172,15 +168,47 @@ def run( meta=metadata, ) documents.extend(docs_for_file) + + # Process byte streams + for bytestream in byte_streams: + elements = self._partition_source_into_elements(source=bytestream) + docs_for_stream = self._create_documents( + elements=elements, + document_creation_mode=self.document_creation_mode, + separator=self.separator, + meta=bytestream.meta, + ) + documents.extend(docs_for_stream) + return {"documents": documents} + def _get_sources( + self, + sources: Union[List[str], List[os.PathLike], List[ByteStream]] + ) -> Tuple[List[Path], List[Path], List[ByteStream]]: + """ + Helper function to process and return file paths, directories, and byte streams separately. + """ + paths_obj = [Path(source) for source in sources if isinstance(source, (str, os.PathLike))] + byte_streams = [source for source in sources if isinstance(source, ByteStream)] + + # Separate files and directories + filepaths = [path for path in paths_obj if path.is_file()] + directories = [path for path in paths_obj if path.is_dir()] + + filepaths_in_directories = [ + filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() + ] + + return filepaths, filepaths_in_directories, byte_streams + @staticmethod def _create_documents( - filepath: Path, elements: List[Element], document_creation_mode: Literal["one-doc-per-file", "one-doc-per-page", "one-doc-per-element"], separator: str, meta: Dict[str, Any], + filepath: Optional[Path] = None, ) -> List[Document]: """ Create Haystack Documents from the elements returned by Unstructured. @@ -190,7 +218,8 @@ def _create_documents( if document_creation_mode == "one-doc-per-file": text = separator.join([str(el) for el in elements]) metadata = copy.deepcopy(meta) - metadata["file_path"] = str(filepath) + if filepath: + metadata["file_path"] = str(filepath) # Only include file path if provided docs = [Document(content=text, meta=metadata)] elif document_creation_mode == "one-doc-per-page": @@ -198,7 +227,8 @@ def _create_documents( meta_per_page: defaultdict[int, dict] = defaultdict(dict) for el in elements: metadata = copy.deepcopy(meta) - metadata["file_path"] = str(filepath) + if filepath: + metadata["file_path"] = str(filepath) if hasattr(el, "metadata"): metadata.update(el.metadata.to_dict()) page_number = int(metadata.get("page_number", 1)) @@ -211,7 +241,8 @@ def _create_documents( elif document_creation_mode == "one-doc-per-element": for index, el in enumerate(elements): metadata = copy.deepcopy(meta) - metadata["file_path"] = str(filepath) + if filepath: + metadata["file_path"] = str(filepath) metadata["element_index"] = index if hasattr(el, "metadata"): metadata.update(el.metadata.to_dict()) @@ -219,20 +250,31 @@ def _create_documents( metadata["category"] = el.category doc = Document(content=str(el), meta=metadata) docs.append(doc) + return docs - def _partition_file_into_elements(self, filepath: Path) -> List[Element]: + + def _partition_source_into_elements(self, source: Union[Path, ByteStream]) -> List[Element]: """ Partition a file into elements using the Unstructured API. """ elements = [] try: - elements = partition_via_api( - filename=str(filepath), - api_url=self.api_url, - api_key=self.api_key.resolve_value() if self.api_key else None, - **self.unstructured_kwargs, - ) + if isinstance(source, Path): + elements = partition_via_api( + filename=str(source), + api_url=self.api_url, + api_key=self.api_key.resolve_value() if self.api_key else None, + **self.unstructured_kwargs, + ) + else: + elements = partition_via_api( + file=source.data, + metadata_filename=str(source.meta), + api_url=self.api_url, + api_key=self.api_key.resolve_value() if self.api_key else None, + **self.unstructured_kwargs, + ) except Exception as e: - logger.warning(f"Unstructured could not process file {filepath}. Error: {e}") + logger.warning(f"Unstructured could not process source {source}. Error: {e}") return elements diff --git a/integrations/unstructured/tests/test_converter.py b/integrations/unstructured/tests/test_converter.py index 5d1a6c091..3021f4d3a 100644 --- a/integrations/unstructured/tests/test_converter.py +++ b/integrations/unstructured/tests/test_converter.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import pytest from haystack_integrations.components.converters.unstructured import UnstructuredFileConverter +from haystack.dataclasses.byte_stream import ByteStream class TestUnstructuredFileConverter: @@ -86,6 +87,34 @@ def test_run_one_doc_per_file(self, samples_path): assert len(documents) == 1 assert documents[0].meta == {"file_path": str(pdf_path)} + @pytest.mark.integration + def test_run_one_doc_per_file_bytestream(self, samples_path): + pdf_path = samples_path / "sample_pdf.pdf" + pdf_stream = ByteStream.from_file_path(pdf_path) + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file" + ) + + documents = local_converter.run([pdf_stream])["documents"] + + assert len(documents) == 1 + + @pytest.mark.integration + def test_run_one_doc_per_page_bytestream(self, samples_path): + pdf_path = samples_path / "sample_pdf.pdf" + pdf_stream = ByteStream.from_file_path(pdf_path) + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" + ) + + documents = local_converter.run([pdf_stream])["documents"] + + assert len(documents) == 4 + for i, doc in enumerate(documents, start=1): + assert doc.meta["page_number"] == i + @pytest.mark.integration def test_run_one_doc_per_page(self, samples_path): pdf_path = samples_path / "sample_pdf.pdf" @@ -127,7 +156,7 @@ def test_run_one_doc_per_file_with_meta(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-file" ) - documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"] + documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"] assert len(documents) == 1 assert documents[0].meta["file_path"] == str(pdf_path) @@ -143,7 +172,7 @@ def test_run_one_doc_per_page_with_meta(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" ) - documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"] + documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"] assert len(documents) == 4 for i, doc in enumerate(documents, start=1): assert doc.meta["file_path"] == str(pdf_path) @@ -159,7 +188,7 @@ def test_run_one_doc_per_element_with_meta(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) - documents = local_converter.run(paths=[pdf_path], meta=meta)["documents"] + documents = local_converter.run(sources=[pdf_path], meta=meta)["documents"] assert len(documents) > 4 first_element_index = 0 @@ -185,7 +214,7 @@ def test_run_one_doc_per_element_with_meta_list_two_files(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) - documents = local_converter.run(paths=pdf_path, meta=meta)["documents"] + documents = local_converter.run(sources=pdf_path, meta=meta)["documents"] assert len(documents) > 4 for doc in documents: @@ -205,7 +234,7 @@ def test_run_one_doc_per_element_with_meta_list_folder_fail(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) with pytest.raises(ValueError): - local_converter.run(paths=pdf_path, meta=meta)["documents"] + local_converter.run(sources=pdf_path, meta=meta)["documents"] @pytest.mark.integration def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): @@ -216,7 +245,7 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-element" ) - documents = local_converter.run(paths=pdf_path, meta=meta)["documents"] + documents = local_converter.run(sources=pdf_path, meta=meta)["documents"] assert len(documents) > 4 for doc in documents: From eac82a2931deccdecead92ec93a37411bff02236 Mon Sep 17 00:00:00 2001 From: alperkaya Date: Thu, 12 Sep 2024 14:55:09 +0200 Subject: [PATCH 2/5] fix lint --- .../components/converters/unstructured/converter.py | 7 +++---- integrations/unstructured/tests/test_converter.py | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py index 932d9b75b..7c1ece82d 100644 --- a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py +++ b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py @@ -6,7 +6,7 @@ import os from collections import defaultdict from pathlib import Path -from typing import Any, Dict, List, Literal, Optional, Union, Tuple +from typing import Any, Dict, List, Literal, Optional, Tuple, Union from haystack import Document, component, default_from_dict, default_to_dict from haystack.components.converters.utils import normalize_metadata @@ -124,7 +124,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "UnstructuredFileConverter": @component.output_types(documents=List[Document]) def run( self, - sources: Union[List[Union[str, os.PathLike, ByteStream]]], + sources: Union[List[Union[str, os.PathLike, ByteStream]]], meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, ): """ @@ -183,7 +183,7 @@ def run( return {"documents": documents} def _get_sources( - self, + self, sources: Union[List[str], List[os.PathLike], List[ByteStream]] ) -> Tuple[List[Path], List[Path], List[ByteStream]]: """ @@ -194,7 +194,6 @@ def _get_sources( # Separate files and directories filepaths = [path for path in paths_obj if path.is_file()] - directories = [path for path in paths_obj if path.is_dir()] filepaths_in_directories = [ filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() diff --git a/integrations/unstructured/tests/test_converter.py b/integrations/unstructured/tests/test_converter.py index 3021f4d3a..48cd8e89e 100644 --- a/integrations/unstructured/tests/test_converter.py +++ b/integrations/unstructured/tests/test_converter.py @@ -2,9 +2,10 @@ # # SPDX-License-Identifier: Apache-2.0 import pytest -from haystack_integrations.components.converters.unstructured import UnstructuredFileConverter from haystack.dataclasses.byte_stream import ByteStream +from haystack_integrations.components.converters.unstructured import UnstructuredFileConverter + class TestUnstructuredFileConverter: @pytest.mark.usefixtures("set_env_variables") From 2686357687b99cb908c3fecdd5c151fc7cefee17 Mon Sep 17 00:00:00 2001 From: alperkaya Date: Fri, 13 Sep 2024 11:46:36 +0200 Subject: [PATCH 3/5] support all sources --- .../converters/unstructured/converter.py | 117 ++++++++++-------- .../unstructured/tests/test_converter.py | 41 ++++++ 2 files changed, 108 insertions(+), 50 deletions(-) diff --git a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py index 7c1ece82d..aa22f4bc9 100644 --- a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py +++ b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py @@ -9,7 +9,7 @@ from typing import Any, Dict, List, Literal, Optional, Tuple, Union from haystack import Document, component, default_from_dict, default_to_dict -from haystack.components.converters.utils import normalize_metadata +from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata from haystack.dataclasses.byte_stream import ByteStream from haystack.utils import Secret, deserialize_secrets_inplace from tqdm import tqdm @@ -139,68 +139,85 @@ def run( :raises ValueError: If `meta` is a list and `sources` contains directories. """ - # Separate file paths and byte streams - filepaths, filepaths_in_directories, byte_streams = self._get_sources(sources) + # Process sources and maintain meta list alignment + # paths_obj = [] + # byte_streams = [] + + # for path in sources: + # if isinstance(path, (str, os.PathLike)): + # paths_obj.append(Path(path)) + # elif isinstance(path, ByteStream): + # byte_streams.append(path) + + # Filter out only file paths + # filepaths = [path for path in paths_obj if path.is_file()] + + # # For each directory, find all files and add them to the list (if there were any directories) + # filepaths_in_directories = [ + # filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() + # ] + + # if filepaths_in_directories and isinstance(meta, list): + # error = """"If providing directories in the `paths` parameter, + # `meta` can only be a dictionary (metadata applied to every file), + # and not a list. To specify different metadata for each file, + # provide an explicit list of direct paths instead.""" + # raise ValueError(error) + + all_sources = [] + meta_sources = normalize_metadata(meta, len(sources)) # Use normalize_metadata here + + # Iterate over the sources + for i, source in enumerate(sources): + if isinstance(source, (str, os.PathLike)): + path_obj = Path(source) + + if path_obj.is_file(): + # Add individual file + all_sources.append(path_obj) + + elif path_obj.is_dir(): + # Ensure meta is a dict when directories are provided + if not isinstance(meta, dict): + error = """"If providing directories in the `paths` parameter, + `meta` can only be a dictionary (metadata applied to every file), + and not a list. To specify different metadata for each file, + provide an explicit list of direct paths instead.""" + raise ValueError(error) + + # If the source is a directory, add all files in the directory + for file in path_obj.glob("*.*"): + if file.is_file(): + all_sources.append(file) # Add each file in the directory + + elif isinstance(source, ByteStream): + # Handle ByteStream + all_sources.append(source) + documents = [] + #logger.info(f"meta_sources: {meta_sources}") - if filepaths_in_directories and isinstance(meta, list): - error = """"If providing directories in the `paths` parameter, - `meta` can only be a dictionary (metadata applied to every file), - and not a list. To specify different metadata for each file, - provide an explicit list of direct paths instead.""" - raise ValueError(error) + for source, metadata in zip(all_sources, meta_sources): + try: + bytestream = get_bytestream_from_source(source=source) + except Exception as e: + logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e) + continue - # Combine file paths and directories for processing - all_filepaths = filepaths + filepaths_in_directories - documents = [] - meta_list = normalize_metadata(meta, sources_count=len(all_filepaths) + len(byte_streams)) - - # Process file paths - for filepath, metadata in tqdm( - zip(all_filepaths, meta_list[:len(all_filepaths)]), desc="Converting files to Haystack Documents" - ): - elements = self._partition_source_into_elements(source=filepath) - docs_for_file = self._create_documents( - filepath=filepath, - elements=elements, - document_creation_mode=self.document_creation_mode, - separator=self.separator, - meta=metadata, - ) - documents.extend(docs_for_file) + elements = self._partition_source_into_elements(source=source) + merged_metadata = {**bytestream.meta, **metadata} + logger.info(f"merged_data : {merged_metadata}\n") - # Process byte streams - for bytestream in byte_streams: - elements = self._partition_source_into_elements(source=bytestream) docs_for_stream = self._create_documents( elements=elements, document_creation_mode=self.document_creation_mode, separator=self.separator, - meta=bytestream.meta, + meta=merged_metadata, ) documents.extend(docs_for_stream) return {"documents": documents} - def _get_sources( - self, - sources: Union[List[str], List[os.PathLike], List[ByteStream]] - ) -> Tuple[List[Path], List[Path], List[ByteStream]]: - """ - Helper function to process and return file paths, directories, and byte streams separately. - """ - paths_obj = [Path(source) for source in sources if isinstance(source, (str, os.PathLike))] - byte_streams = [source for source in sources if isinstance(source, ByteStream)] - - # Separate files and directories - filepaths = [path for path in paths_obj if path.is_file()] - - filepaths_in_directories = [ - filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() - ] - - return filepaths, filepaths_in_directories, byte_streams - @staticmethod def _create_documents( elements: List[Element], diff --git a/integrations/unstructured/tests/test_converter.py b/integrations/unstructured/tests/test_converter.py index 48cd8e89e..855808f5b 100644 --- a/integrations/unstructured/tests/test_converter.py +++ b/integrations/unstructured/tests/test_converter.py @@ -256,3 +256,44 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): assert "category" in doc.meta assert "common_meta" in doc.meta assert doc.meta["common_meta"] == "common" + + @pytest.mark.integration + def test_run_one_doc_per_element_with_meta_list_multiple_sources(self, samples_path): + from pathlib import Path + + sources = [ + ByteStream(data=b"content", meta={"file_path": "some_file.md"}), + "README.md", + ByteStream(data=b"content", meta={"file_path": "yet_another_file.md"}), + Path(__file__), + ByteStream(data=b"content", meta={"file_path": "my_file.md"}), + ] + + meta = [ + {"type": "ByteStream"}, + {"type": "str"}, + {"type": "ByteStream"}, + {"type": "Path"}, + {"type": "ByteStream"}, + ] + + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" + ) + + documents = local_converter.run(sources=sources, meta=meta)["documents"] + + assert len(documents) == 5 + assert documents[0].meta["type"] == "ByteStream" + assert documents[0].meta["file_path"] == "some_file.md" + assert documents[1].meta["type"] == "str" + assert documents[1].meta["file_path"] == "README.md" + assert documents[2].meta["type"] == "ByteStream" + assert documents[2].meta["file_path"] == "yet_another_file.md" + assert documents[3].meta["type"] == "Path" + assert documents[4].meta["type"] == "ByteStream" + assert documents[4].meta["file_path"] == "my_file.md" + + + + From 2203bbbcbd61c300e4772c70f7eb551fd4f0a084 Mon Sep 17 00:00:00 2001 From: alperkaya Date: Fri, 13 Sep 2024 12:11:38 +0200 Subject: [PATCH 4/5] clean up debug logs --- .../converters/unstructured/converter.py | 41 +++------------ .../unstructured/tests/test_converter.py | 51 ++++++++++++------- 2 files changed, 40 insertions(+), 52 deletions(-) diff --git a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py index aa22f4bc9..b0928cc93 100644 --- a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py +++ b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py @@ -6,13 +6,12 @@ import os from collections import defaultdict from pathlib import Path -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Union from haystack import Document, component, default_from_dict, default_to_dict from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata from haystack.dataclasses.byte_stream import ByteStream from haystack.utils import Secret, deserialize_secrets_inplace -from tqdm import tqdm from unstructured.documents.elements import Element from unstructured.partition.api import partition_via_api @@ -131,7 +130,7 @@ def run( Convert files or byte streams to Haystack Documents using the Unstructured API. :param sources: List of file paths or byte streams to convert. - Paths can be files or directories. Byte streams are also supported. + Paths can be files or directories. ByteStream is also supported. :param meta: Optional metadata to attach to the Documents. This value can be a single dictionary or a list of dictionaries, matching the number of sources. :returns: A dictionary with the following key: @@ -139,36 +138,12 @@ def run( :raises ValueError: If `meta` is a list and `sources` contains directories. """ - # Process sources and maintain meta list alignment - # paths_obj = [] - # byte_streams = [] - - # for path in sources: - # if isinstance(path, (str, os.PathLike)): - # paths_obj.append(Path(path)) - # elif isinstance(path, ByteStream): - # byte_streams.append(path) - - # Filter out only file paths - # filepaths = [path for path in paths_obj if path.is_file()] - - # # For each directory, find all files and add them to the list (if there were any directories) - # filepaths_in_directories = [ - # filepath for path in paths_obj if path.is_dir() for filepath in path.glob("*.*") if filepath.is_file() - # ] - - # if filepaths_in_directories and isinstance(meta, list): - # error = """"If providing directories in the `paths` parameter, - # `meta` can only be a dictionary (metadata applied to every file), - # and not a list. To specify different metadata for each file, - # provide an explicit list of direct paths instead.""" - # raise ValueError(error) - + documents = [] all_sources = [] - meta_sources = normalize_metadata(meta, len(sources)) # Use normalize_metadata here + meta_sources = normalize_metadata(meta, len(sources)) # Iterate over the sources - for i, source in enumerate(sources): + for source in sources: if isinstance(source, (str, os.PathLike)): path_obj = Path(source) @@ -184,7 +159,7 @@ def run( and not a list. To specify different metadata for each file, provide an explicit list of direct paths instead.""" raise ValueError(error) - + # If the source is a directory, add all files in the directory for file in path_obj.glob("*.*"): if file.is_file(): @@ -194,9 +169,6 @@ def run( # Handle ByteStream all_sources.append(source) - documents = [] - #logger.info(f"meta_sources: {meta_sources}") - for source, metadata in zip(all_sources, meta_sources): try: bytestream = get_bytestream_from_source(source=source) @@ -206,7 +178,6 @@ def run( elements = self._partition_source_into_elements(source=source) merged_metadata = {**bytestream.meta, **metadata} - logger.info(f"merged_data : {merged_metadata}\n") docs_for_stream = self._create_documents( elements=elements, diff --git a/integrations/unstructured/tests/test_converter.py b/integrations/unstructured/tests/test_converter.py index 855808f5b..7dea5cbd8 100644 --- a/integrations/unstructured/tests/test_converter.py +++ b/integrations/unstructured/tests/test_converter.py @@ -1,6 +1,8 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 +from pathlib import Path + import pytest from haystack.dataclasses.byte_stream import ByteStream @@ -259,22 +261,20 @@ def test_run_one_doc_per_element_with_meta_list_folder(self, samples_path): @pytest.mark.integration def test_run_one_doc_per_element_with_meta_list_multiple_sources(self, samples_path): - from pathlib import Path - - sources = [ - ByteStream(data=b"content", meta={"file_path": "some_file.md"}), - "README.md", - ByteStream(data=b"content", meta={"file_path": "yet_another_file.md"}), - Path(__file__), - ByteStream(data=b"content", meta={"file_path": "my_file.md"}), + sources = [ + ByteStream(data=b"content", meta={"file_path": "some_file.md"}), + "README.md", + ByteStream(data=b"content", meta={"file_path": "yet_another_file.md"}), + Path(__file__), + ByteStream(data=b"content", meta={"file_path": "my_file.md"}), ] - - meta = [ - {"type": "ByteStream"}, - {"type": "str"}, - {"type": "ByteStream"}, - {"type": "Path"}, - {"type": "ByteStream"}, + + meta = [ + {"type": "ByteStream"}, + {"type": "str"}, + {"type": "ByteStream"}, + {"type": "Path"}, + {"type": "ByteStream"}, ] local_converter = UnstructuredFileConverter( @@ -293,7 +293,24 @@ def test_run_one_doc_per_element_with_meta_list_multiple_sources(self, samples_p assert documents[3].meta["type"] == "Path" assert documents[4].meta["type"] == "ByteStream" assert documents[4].meta["file_path"] == "my_file.md" - + + @pytest.mark.integration + def test_run_one_doc_per_element_with_meta_list_multiple_sources_directory(self, samples_path): + sources = [ + "README.md", + ByteStream(data=b"Some content", meta={"file_path": "some_file.md"}), + samples_path, + Path(__file__), + ] + + meta = {"common_meta": "applies_to_all"} - + local_converter = UnstructuredFileConverter( + api_url="http://localhost:8000/general/v0/general", document_creation_mode="one-doc-per-page" + ) + + documents = local_converter.run(sources=sources, meta=meta)["documents"] + + for doc in documents: + assert doc.meta["common_meta"] == "applies_to_all" From 968f5b7ec0c94776bbccccddbd607c3e9894e507 Mon Sep 17 00:00:00 2001 From: alperkaya Date: Fri, 13 Sep 2024 12:13:12 +0200 Subject: [PATCH 5/5] delete extra line --- .../components/converters/unstructured/converter.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py index b0928cc93..e65f91989 100644 --- a/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py +++ b/integrations/unstructured/src/haystack_integrations/components/converters/unstructured/converter.py @@ -240,7 +240,6 @@ def _create_documents( return docs - def _partition_source_into_elements(self, source: Union[Path, ByteStream]) -> List[Element]: """ Partition a file into elements using the Unstructured API.