Adding support for .txt files in canopy upsert (#157)
* commit txt file support

* fix linting

* fix CLI line too long

* fix docstrings that contained typehints

* fix: run non-schematic text upsert only if files were found

* [cli] Bug fix in new text loader

A DataFrame was created in every iteration

* [tests] Added unit test for loading text files

* Update src/canopy_cli/data_loader/data_loader.py

Removed redundant line

---------

Co-authored-by: ilai <[email protected]>
Co-authored-by: igiloh-pinecone <[email protected]>
3 people authored Nov 8, 2023
1 parent 4d01155 commit 6751828
Showing 5 changed files with 147 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -163,4 +163,5 @@ cython_debug/
 **/.DS_Store
 
 datafiles/*
-canopy-api-docs.html
+canopy-api-docs.html
+.vscode/
9 changes: 8 additions & 1 deletion README.md
@@ -116,9 +116,14 @@ canopy upsert /path/to/data_directory/file.parquet
 
 # or
 canopy upsert /path/to/data_directory/file.jsonl
 
+# or
+canopy upsert /path/to/directory_of_txt_files/
+
 # ...
 ```
 
-Canopy supports files in `jsonl` or `parquet` formats. The documents should have the following schema:
+Canopy supports files in `jsonl`, `parquet` and `csv` formats. The documents should have the following schema:
 
 ```
 +----------+--------------+--------------+---------------+
@@ -130,6 +135,8 @@
 ```
 > [This notebook](https://colab.research.google.com/github/pinecone-io/examples/blob/master/learn/generation/canopy/00-canopy-data-prep.ipynb) shows how you create a dataset in this format.
+Additionally, you can load plaintext data files in `.txt` format. In this case, each file will be treated as a single document. The document id will be the filename, and the source will be the full path of the file.
+
 Follow the instructions in the CLI to upload your data.
 
 ### 3. Start the Canopy server
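For illustration, here is a sketch of the document a single plaintext file would map to under the rules described above (hypothetical path and contents; this assumes the `Document` model is importable from `canopy.models.data_models`):

```python
from canopy.models.data_models import Document

# Hypothetical file: /path/to/directory_of_txt_files/notes.txt
# containing the text "hello canopy".
# Per the README text above, the .txt loader would produce:
doc = Document(
    id="notes",                                          # filename without the .txt suffix
    text="hello canopy",                                 # the entire file contents
    source="/path/to/directory_of_txt_files/notes.txt",  # the full file path
)
# metadata is left empty for plaintext files
```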
3 changes: 2 additions & 1 deletion src/canopy_cli/cli.py
@@ -310,7 +310,8 @@ def upsert(index_name: str,
     except Exception:
         msg = (
             f"An unexpected error occurred while loading the data from files in {data_path}. "
-            "Please make sure the data is in valid `jsonl` or `parquet` format."
+            "Please make sure the data is in valid `jsonl`, `parquet`, `csv` format"
+            " or plaintext `.txt` files."
         )
         raise CLIError(msg)
     pd.options.display.max_colwidth = 20
104 changes: 96 additions & 8 deletions src/canopy_cli/data_loader/data_loader.py
@@ -1,6 +1,7 @@
 import json
 import os
 import glob
+from enum import Enum
 from collections.abc import Iterable
 from typing import List
 from textwrap import dedent
@@ -21,6 +22,10 @@ class DocumentsValidationError(ValueError):
     pass
 
 
+class NonSchematicFilesTypes(Enum):
+    TEXT = "txt"
+
+
 def format_multiline(msg):
     return dedent(msg).strip()
 
@@ -67,30 +72,113 @@ def _df_to_documents(df: pd.DataFrame) -> List[Document]:
     return documents
 
 
-def _load_single_file_by_suffix(file_path: str) -> List[Document]:
+def _load_multiple_txt_files(file_paths: List[str]) -> pd.DataFrame:
+    """Load multiple text files into a single dataframe
+    Args:
+        file_paths: List of file paths to load
+    Returns:
+        pd.DataFrame: Dataframe with columns `id`, `text` and `source`
+    Note: metadata will be empty
+    """
+    if not isinstance(file_paths, list):
+        raise ValueError("file_paths must be a list of strings")
+    if len(file_paths) == 0:
+        raise ValueError("file_paths must not be empty")
+
+    rows = []
+    for file_path in file_paths:
+        with open(file_path, "r") as f:
+            text = f.read()
+        rows.append(
+            {
+                "id": os.path.basename(file_path).replace(".txt", ""),
+                "text": text,
+                "source": file_path
+            }
+        )
+    df = pd.DataFrame(rows, columns=["id", "text", "source"])
+    return df


+def _load_single_schematic_file_by_suffix(file_path: str) -> List[Document]:
     if file_path.endswith(".parquet"):
         df = pd.read_parquet(file_path)
     elif file_path.endswith(".csv"):
         df = pd.read_csv(file_path)
     elif file_path.endswith(".jsonl"):
         df = pd.read_json(file_path, lines=True)
     else:
-        raise ValueError("Only .parquet and .jsonl files are supported")
+        raise ValueError(
+            "Only [.parquet, .jsonl, .csv, .txt] files are supported"
+        )
     return _df_to_documents(df)


+def _load_multiple_non_schematic_files(
+        file_paths: List[str],
+        type: NonSchematicFilesTypes
+) -> List[Document]:
+    if not isinstance(file_paths, list):
+        raise ValueError("file_paths must be a list of strings")
+    if len(file_paths) == 0:
+        raise ValueError("file_paths must not be empty")
+
+    if type == NonSchematicFilesTypes.TEXT:
+        df = _load_multiple_txt_files(file_paths)
+    else:
+        raise ValueError(f"Unsupported file type: {type}")
+
+    return _df_to_documents(df)


 def load_from_path(path: str) -> List[Document]:
     """
     Load documents from a file or directory
     Args:
         path: Path to file or directory
     Returns:
         List[Document]: List of documents
     """
     if os.path.isdir(path):
-        all_files = [f for ext in ['*.jsonl', '*.parquet', '*.csv']
-                     for f in glob.glob(os.path.join(path, ext))]
-        if len(all_files) == 0:
+        # List all files in directory
+        all_files_schematic = []
+        all_files_non_schematic_txt = []
+        for file in glob.glob(os.path.join(path, "*")):
+            if not os.path.isfile(file):
+                continue
+            if file.endswith(".txt"):
+                all_files_non_schematic_txt.append(file)
+            elif (file.endswith(".jsonl") or
+                  file.endswith(".csv") or
+                  file.endswith(".parquet")):
+                all_files_schematic.append(file)
+        if len(all_files_schematic) + len(all_files_non_schematic_txt) == 0:
             raise ValueError("No files found in directory")
 
         documents: List[Document] = []
-        for f in all_files:
-            documents.extend(_load_single_file_by_suffix(f))
+        # Load all schematic files
+        for f in all_files_schematic:
+            documents.extend(_load_single_schematic_file_by_suffix(f))
+
+        # Load all non-schematic files
+        if len(all_files_non_schematic_txt) > 0:
+            documents.extend(
+                _load_multiple_non_schematic_files(
+                    all_files_non_schematic_txt,
+                    NonSchematicFilesTypes.TEXT))
+
+    # Load single file
     elif os.path.isfile(path):
-        documents = _load_single_file_by_suffix(path)
+        if path.endswith(".txt"):
+            documents = _load_multiple_non_schematic_files(
+                [path],
+                NonSchematicFilesTypes.TEXT)
+        else:
+            documents = _load_single_schematic_file_by_suffix(path)
     else:
         raise ValueError(f"Could not find file or directory at {path}")
     return documents
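Taken together, a minimal usage sketch of the updated loader (hypothetical directory contents; the behavior follows the diff above):

```python
from canopy_cli.data_loader.data_loader import load_from_path

# Hypothetical directory:
#   data/docs.jsonl  -> schematic file: one document per JSON line
#   data/readme.txt  -> non-schematic file: becomes a single document
#   data/notes.txt   -> non-schematic file: becomes a single document
docs = load_from_path("data")

# Schematic rows keep the ids given in the file; each .txt file becomes
# one document whose id is the filename stem and whose source is its path.
for doc in docs:
    print(doc.id, doc.source)
```

Note that schematic files are loaded one by one, while all `.txt` files are batched into a single dataframe first, so the relative order of the returned documents is not guaranteed; this is why the updated tests below compare results sorted by `id`.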
43 changes: 39 additions & 4 deletions tests/unit/cli/test_data_loader.py
@@ -1,3 +1,6 @@
+import os
+import random
+
 import numpy as np
 import pytest
 import json
@@ -8,8 +11,9 @@
     IDsNotUniqueError,
     DocumentsValidationError,
     load_from_path,
-    _load_single_file_by_suffix, _df_to_documents,
+    _load_single_schematic_file_by_suffix, _df_to_documents,
 )
+from tests.unit import random_words


good_df_minimal = (
@@ -253,7 +257,7 @@ def test_load_single_file_jsonl(tmpdir, dict_rows_input, expected_documents):
     path = tmpdir.join("test.jsonl")
     path.write("\n".join([json.dumps(row) for row in dict_rows_input]))
 
-    docs = _load_single_file_by_suffix(str(path))
+    docs = _load_single_schematic_file_by_suffix(str(path))
     assert docs == expected_documents


@@ -263,7 +267,7 @@ def test_load_single_file_parquet(tmpdir, dict_rows_input, expected_documents):
     path = tmpdir.join("test.parquet")
     pd.DataFrame(data).to_parquet(str(path))
 
-    docs = _load_single_file_by_suffix(str(path))
+    docs = _load_single_schematic_file_by_suffix(str(path))
     assert docs == expected_documents


Expand Down Expand Up @@ -292,4 +296,35 @@ def test_load_multiple_files(tmpdir, dict_rows_input, expected_documents):
     pd.DataFrame(data2).to_parquet(str(path2))
 
     docs = load_from_path(str(base_path))
-    assert docs == expected
+    assert sorted(docs, key=lambda x: x.id) == sorted(expected, key=lambda x: x.id)


+def _generate_text(num_words: int, num_rows: int) -> str:
+    return "\n".join([" ".join(random.choices(random_words, k=num_words)) for _ in range(num_rows)])  # noqa: E501
+
+
+def test_load_text_files(tmpdir, dict_rows_input, expected_documents):
+    tmpdir.mkdir("test_text_files")
+    base_path = tmpdir.join("test_text_files")
+    path1 = base_path.join("test1.jsonl")
+    path1.write("\n".join([json.dumps(row) for row in dict_rows_input]))
+    path2 = base_path.join("test2.txt")
+    path_2_text = _generate_text(10, 3)
+    path2.write(path_2_text)
+    path3 = base_path.join("test3.txt")
+    path_3_text = _generate_text(10, 3)
+    path3.write(path_3_text)
+
+    expected = expected_documents + [
+        Document(text=path_2_text,
+                 id="test2",
+                 source=os.path.join(str(base_path), "test2.txt")
+                 ),
+        Document(text=path_3_text,
+                 id="test3",
+                 source=os.path.join(str(base_path), "test3.txt")
+                 ),
+    ]
+
+    docs = load_from_path(str(base_path))
+    assert sorted(docs, key=lambda x: x.id) == sorted(expected, key=lambda x: x.id)
