From fdbb5e32231b8c6dd794bc9cdd83c0580bba486b Mon Sep 17 00:00:00 2001 From: Egil Date: Mon, 7 Oct 2024 22:32:32 +0200 Subject: [PATCH 01/10] Parsers can now return any number of fields, and can access the whole item --- docetl/dataset.py | 37 ++++++++++++-------------------- docetl/parsing_tools.py | 47 +++++++++++++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/docetl/dataset.py b/docetl/dataset.py index c3be07b5..9d53753c 100644 --- a/docetl/dataset.py +++ b/docetl/dataset.py @@ -148,20 +148,14 @@ def _validate_parsing( for tool in parsing_tools: if ( not isinstance(tool, dict) - or "input_key" not in tool or "function" not in tool - or "output_key" not in tool ): raise ValueError( - "Each parsing tool must be a dictionary with 'input_key', 'function', and 'output_key' keys" + "Each parsing tool must be a dictionary with a 'function' key and any arguments required by that function" ) - if ( - not isinstance(tool["input_key"], str) - or not isinstance(tool["function"], str) - or not isinstance(tool["output_key"], str) - ): + if not isinstance(tool["function"], str): raise ValueError( - "'input_key', 'function', and 'output_key' in parsing tools must be strings" + "'function' in parsing tools must be a string" ) if "function_kwargs" in tool and not isinstance( tool["function_kwargs"], dict @@ -213,19 +207,12 @@ def load(self) -> List[Dict]: def _process_item( self, item: Dict[str, Any], - input_key: str, - output_key: str, func: Callable, **function_kwargs: Dict[str, Any], ): - if input_key not in item: - raise ValueError(f"Input key {input_key} not found in item: {item}") - result = func(item[input_key], **function_kwargs) - if isinstance(result, list): - return [item.copy() | {output_key: res} for res in result] - else: - return [item | {output_key: result}] - + result = func(item, **function_kwargs) + return [item.copy() | res for res in result] + def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: """ Apply parsing tools to the data. @@ -240,7 +227,13 @@ def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: ValueError: If a parsing tool is not found or if an input key is missing from an item. """ for tool in self.parsing: - input_key = tool["input_key"] + function_kwargs = dict(tool) + function_kwargs.pop("function") + # FIXME: The following is just for backwards compatibility + # with the existing yaml format... + if "function_kwargs" in function_kwargs: + function_kwargs.update(function_kwargs.pop("function_kwargs")) + try: func = get_parser(tool["function"]) except KeyError: @@ -261,8 +254,6 @@ def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: f"Parsing tool {tool['function']} not found. Please define it or use one of our existing parsing tools: {get_parsing_tools()}" ) - output_key = tool["output_key"] - function_kwargs = tool.get("function_kwargs", {}) new_data = [] with ThreadPoolExecutor() as executor: @@ -270,8 +261,6 @@ def _apply_parsing_tools(self, data: List[Dict]) -> List[Dict]: executor.submit( self._process_item, item, - input_key, - output_key, func, **function_kwargs, ) diff --git a/docetl/parsing_tools.py b/docetl/parsing_tools.py index 208fa07c..86014abd 100644 --- a/docetl/parsing_tools.py +++ b/docetl/parsing_tools.py @@ -1,31 +1,49 @@ import importlib import io import os -from typing import Dict, List, Optional - - -def llama_index_simple_directory_reader(filename: str) -> List[str]: +from typing import Dict, List, Optional, Any + +def with_input_output_key(fn): + """Decorator that wraps a parser function that takes a single + string parameter and return list of strings and makes it a full + parser function that takes an item as a dictionary and return a + list of dictionaries.""" + def wrapper(item, input_key="text", output_key="text", **kw): + if input_key not in item: + raise ValueError(f"Input key {input_key} not found in item: {item}") + result = fn(item[input_key], **kw) + if not isinstance(result, list): + result = [result] + return [{output_key: res} for res in result] + return wrapper + +def llama_index_simple_directory_reader(item: dict[str, Any], input_key: str ="path") -> List[dict[str, Any]]: from llama_index.core import SimpleDirectoryReader - documents = SimpleDirectoryReader(filename).load_data() - # FIXME: What about doc.metadata? Would be good to include that too... - return [doc.text for doc in documents] + documents = SimpleDirectoryReader(item[input_key]).load_data() + return [{"text": doc.text, + "metadata": doc.metadata} + for doc in documents] -def llama_index_wikipedia_reader(filename: str) -> List[str]: +def llama_index_wikipedia_reader(item: dict[str, Any], input_key: str = "pages") -> List[dict[str, Any]]: from llama_index.readers.wikipedia import WikipediaReader loader = WikipediaReader() - pages = [filename] + pages = item[input_key] + if not isinstance(pages, list): + pages = [pages] documents = loader.load_data(pages=pages, auto_suggest=False) # The wikipedia reader does not include the page url in the metadata, which is impractical... for name, doc in zip(pages, documents): doc.metadata["source"] = "https://en.wikipedia.org/wiki/" + name - # FIXME: What about doc.metadata? Would be good to include that too... - return [doc.text for doc in documents] + return [{"text": doc.text, + "metadata": doc.metadata} + for doc in documents] +@with_input_output_key def whisper_speech_to_text(filename: str) -> List[str]: """ Transcribe speech from an audio file to text using Whisper model via litellm. @@ -72,6 +90,7 @@ def whisper_speech_to_text(filename: str) -> List[str]: return [response.text] +@with_input_output_key def xlsx_to_string( filename: str, orientation: str = "col", @@ -128,6 +147,7 @@ def process_sheet(sheet): return [process_sheet(wb.active)] +@with_input_output_key def txt_to_string(filename: str) -> List[str]: """ Read the content of a text file and return it as a list of strings (only one element). @@ -142,6 +162,7 @@ def txt_to_string(filename: str) -> List[str]: return [file.read()] +@with_input_output_key def docx_to_string(filename: str) -> List[str]: """ Extract text from a Word document. @@ -158,6 +179,7 @@ def docx_to_string(filename: str) -> List[str]: return ["\n".join([paragraph.text for paragraph in doc.paragraphs])] +@with_input_output_key def pptx_to_string(filename: str, doc_per_slide: bool = False) -> List[str]: """ Extract text from a PowerPoint presentation. @@ -195,6 +217,7 @@ def pptx_to_string(filename: str, doc_per_slide: bool = False) -> List[str]: return result +@with_input_output_key def azure_di_read( filename: str, use_url: bool = False, @@ -334,6 +357,7 @@ def azure_di_read( ] +@with_input_output_key def paddleocr_pdf_to_string( input_path: str, doc_per_page: bool = False, @@ -399,6 +423,7 @@ def paddleocr_pdf_to_string( return pdf_content +@with_input_output_key def gptpdf_to_string( input_path: str, gpt_model: str, From ae1b1fce2f3e0662a5e4ace0f632f51773c83e1a Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Mon, 7 Oct 2024 10:02:11 -0700 Subject: [PATCH 02/10] nit: change gpt-4o to gpt-4o-mini in tests --- docetl/optimizers/reduce_optimizer.py | 2 +- tests/test_api.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docetl/optimizers/reduce_optimizer.py b/docetl/optimizers/reduce_optimizer.py index 67727dec..84518aa1 100644 --- a/docetl/optimizers/reduce_optimizer.py +++ b/docetl/optimizers/reduce_optimizer.py @@ -1305,7 +1305,7 @@ def _calculate_compression_ratio( reduce_key = op_config["reduce_key"] input_schema = op_config.get("input", {}).get("schema", {}) output_schema = op_config["output"]["schema"] - model = op_config.get("model", "gpt-4o") + model = op_config.get("model", "gpt-4o-mini") compression_ratios = {} diff --git a/tests/test_api.py b/tests/test_api.py index 55b2c6b7..921c2de9 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -227,7 +227,9 @@ def test_pipeline_optimization( default_model="gpt-4o-mini", ) - optimized_pipeline = pipeline.optimize(max_threads=4, model="gpt-4o", timeout=10) + optimized_pipeline = pipeline.optimize( + max_threads=4, model="gpt-4o-mini", timeout=10 + ) assert isinstance(optimized_pipeline, Pipeline) assert len(optimized_pipeline.operations) == len(pipeline.operations) From 8de07c7b41ce3b00f315f95bde7b71181e29f100 Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Mon, 7 Oct 2024 11:53:10 -0700 Subject: [PATCH 03/10] feat: add verbose parameter for gleaning --- docetl/operations/reduce.py | 6 ++++++ docetl/operations/utils.py | 12 +++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/docetl/operations/reduce.py b/docetl/operations/reduce.py index 30d2373b..b4091865 100644 --- a/docetl/operations/reduce.py +++ b/docetl/operations/reduce.py @@ -273,6 +273,11 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: Returns: Tuple[List[Dict], float]: A tuple containing the processed results and the total cost of the operation. """ + if self.config.get("gleaning", {}).get("validation_prompt", None): + self.console.log( + f"Using gleaning with validation prompt: {self.config.get('gleaning', {}).get('validation_prompt', '')}" + ) + reduce_keys = self.config["reduce_key"] if isinstance(reduce_keys, str): reduce_keys = [reduce_keys] @@ -860,6 +865,7 @@ def _batch_reduce( console=self.console, timeout_seconds=self.config.get("timeout", 120), max_retries_per_timeout=self.config.get("max_retries_per_timeout", 2), + verbose=self.config.get("verbose", False), ) item_cost += gleaning_cost else: diff --git a/docetl/operations/utils.py b/docetl/operations/utils.py index 88553ea7..c3e2545a 100644 --- a/docetl/operations/utils.py +++ b/docetl/operations/utils.py @@ -720,6 +720,7 @@ def call_llm_with_gleaning( console: Console = Console(), timeout_seconds: int = 120, max_retries_per_timeout: int = 2, + verbose: bool = False, ) -> Tuple[str, float]: """ Call LLM with a gleaning process, including validation and improvement rounds. @@ -789,7 +790,7 @@ def call_llm_with_gleaning( # Call LLM for validation self.runner.rate_limiter.try_acquire("llm_call", weight=1) validator_response = completion( - model="gpt-4o-mini", + model=model, messages=truncate_messages( messages + [{"role": "user", "content": validator_prompt}], model ), @@ -817,9 +818,10 @@ def call_llm_with_gleaning( if not suggestion["should_refine"]: break - # console.log( - # f"Validator improvements (gleaning round {rnd + 1}): {suggestion['improvements']}" - # ) + if verbose: + console.log( + f"Validator improvements (gleaning round {rnd + 1}): {suggestion['improvements']}" + ) # Prompt for improvement improvement_prompt = f"""Based on the validation feedback: @@ -1166,4 +1168,4 @@ def rich_as_completed(futures, total=None, desc=None, leave=True, console=None): with RichLoopBar(total=total, desc=desc, leave=leave, console=console) as pbar: for future in as_completed(futures): yield future - pbar.update() \ No newline at end of file + pbar.update() From eab071666ba254e3d481ee59a9f21d9eacd21c0e Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Mon, 7 Oct 2024 11:56:26 -0700 Subject: [PATCH 04/10] feat: add verbose parameter for gleaning --- docetl/operations/map.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docetl/operations/map.py b/docetl/operations/map.py index ed9dd4e1..3f077713 100644 --- a/docetl/operations/map.py +++ b/docetl/operations/map.py @@ -171,6 +171,7 @@ def validation_fn(response: Dict[str, Any]): max_retries_per_timeout=self.config.get( "max_retries_per_timeout", 2 ), + verbose=self.config.get("verbose", False), ), validation_fn=validation_fn, val_rule=self.config.get("validate", []), From d0975c8ae6fa7f30e8174d8b4865ecdf96af7ddc Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Mon, 7 Oct 2024 19:16:18 -0700 Subject: [PATCH 05/10] fix: tokenizers should be wrapped in try catch --- docetl/operations/split.py | 12 +++++++++--- docetl/utils.py | 5 ++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/docetl/operations/split.py b/docetl/operations/split.py index 78444b95..8b9f26a3 100644 --- a/docetl/operations/split.py +++ b/docetl/operations/split.py @@ -51,9 +51,15 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: split_key = self.config["split_key"] method = self.config["method"] method_kwargs = self.config["method_kwargs"] - encoder = tiktoken.encoding_for_model( - self.config["method_kwargs"].get("model", self.default_model).split("/")[-1] - ) + try: + encoder = tiktoken.encoding_for_model( + self.config["method_kwargs"] + .get("model", self.default_model) + .split("/")[-1] + ) + except Exception: + encoder = tiktoken.encoding_for_model("gpt-4o") + results = [] cost = 0.0 diff --git a/docetl/utils.py b/docetl/utils.py index a1336b1d..61f95379 100644 --- a/docetl/utils.py +++ b/docetl/utils.py @@ -120,7 +120,10 @@ def truncate_sample_data( remaining_tokens = available_tokens - current_tokens # Encode the value - encoder = tiktoken.encoding_for_model(model) + try: + encoder = tiktoken.encoding_for_model(model) + except Exception: + encoder = tiktoken.encoding_for_model("gpt-4o") encoded_value = encoder.encode(str(data[key])) # Calculate how many tokens to keep From fab6641c20089f9010efd2abeef6ab055b0a7d14 Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Mon, 7 Oct 2024 21:06:07 -0700 Subject: [PATCH 06/10] fix: resort to eval if ast eval does not work --- docetl/operations/utils.py | 6 ++++- docs/concepts/operators.md | 2 +- tests/test_validation.py | 46 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 tests/test_validation.py diff --git a/docetl/operations/utils.py b/docetl/operations/utils.py index c3e2545a..f4ce3c48 100644 --- a/docetl/operations/utils.py +++ b/docetl/operations/utils.py @@ -359,7 +359,11 @@ def safe_eval(expression: str, output: Dict) -> bool: # Safely evaluate the expression return bool(aeval(expression)) except Exception: - return False + # try to evaluate with python eval + try: + return bool(eval(expression, locals={"output": output})) + except Exception: + return False class APIWrapper(object): diff --git a/docs/concepts/operators.md b/docs/concepts/operators.md index f81653c7..fc9070ef 100644 --- a/docs/concepts/operators.md +++ b/docs/concepts/operators.md @@ -135,7 +135,7 @@ validate: - all(len(insight["supporting_actions"]) >= 1 for insight in output["insights"]) ``` -Access variables using dictionary syntax: `input["field"]` or `output["field"]`. +Access variables using dictionary syntax: `output["field"]`. Note that you can't access `input` docs in validation, but the output docs should have all the fields from the input docs (for non-reduce operations), since fields pass through unchanged. The `num_retries_on_validate_failure` attribute specifies how many times to retry the LLM if any validation statements fail. diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 00000000..ed64af4c --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,46 @@ +import pytest +from docetl.operations.map import MapOperation +from tests.conftest import api_wrapper, default_model, max_threads + + +@pytest.fixture +def map_config_with_validation(): + return { + "name": "sentiment_analysis_with_validation", + "type": "map", + "prompt": "Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral.", + "output": {"schema": {"sentiment": "string", "confidence": "float"}}, + "model": "gpt-4o-mini", + "validate": [ + "output['sentiment'] in ['positive', 'negative', 'neutral']", + "0 <= output['confidence'] <= 1", + ], + "num_retries_on_validate_failure": 2, + } + + +@pytest.fixture +def sample_data(): + return [ + {"text": "I love this product! It's amazing."}, + {"text": "This is the worst experience ever."}, + {"text": "The weather is okay today."}, + ] + + +def test_map_operation_with_validation( + map_config_with_validation, sample_data, api_wrapper, default_model, max_threads +): + operation = MapOperation( + api_wrapper, map_config_with_validation, default_model, max_threads + ) + results, cost = operation.execute(sample_data) + + assert len(results) == len(sample_data) + assert cost > 0 + + for result in results: + assert "sentiment" in result + assert "confidence" in result + assert result["sentiment"] in ["positive", "negative", "neutral"] + assert 0 <= result["confidence"] <= 1 From 278da0bb2671f4c6a58c23ef6d264a6d35bad23e Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Mon, 7 Oct 2024 21:26:51 -0700 Subject: [PATCH 07/10] docs: update docs to reflect new custom parsing API Co-authored-by: redhog --- docs/examples/custom-parsing.md | 82 +++++++++++------------ tests/basic/test_pipeline_with_parsing.py | 10 ++- 2 files changed, 43 insertions(+), 49 deletions(-) diff --git a/docs/examples/custom-parsing.md b/docs/examples/custom-parsing.md index 5f62d1ae..a8959f97 100644 --- a/docs/examples/custom-parsing.md +++ b/docs/examples/custom-parsing.md @@ -35,10 +35,11 @@ To use custom parsing, you need to define parsing tools in your DocETL configura parsing_tools: - name: top_products_report function_code: | - def top_products_report(filename: str) -> List[str]: + def top_products_report(document: Dict) -> List[Dict]: import pandas as pd # Read the Excel file + filename = document["excel_path"] df = pd.read_excel(filename) # Calculate total sales @@ -61,7 +62,10 @@ parsing_tools: mom_growth.to_string() ] - return ["\n".join(report)] + # Return a list of dicts representing the output + # The input document will be merged into each output doc, + # so we can access all original fields from the input doc. + return [{"sales_analysis": "\n".join(report)}] datasets: sales_reports: @@ -69,9 +73,7 @@ datasets: source: local path: "sales_data/sales_paths.json" parsing: - - input_key: excel_path - function: top_products_report - output_key: sales_analysis + - function: top_products_report receipts: type: file @@ -81,9 +83,8 @@ datasets: - input_key: pdf_path function: paddleocr_pdf_to_string output_key: receipt_text - function_kwargs: - ocr_enabled: true - lang: "en" + ocr_enabled: true + lang: "en" ``` In this configuration: @@ -111,8 +112,6 @@ pipeline: This pipeline will use the parsed data from both Excel files and PDFs for further processing. - - ### How Data Gets Parsed and Formatted When you run your DocETL pipeline, the parsing tools you've specified in your configuration file are applied to the external files referenced in your dataset JSONs. Here's what happens: @@ -205,45 +204,45 @@ When you run this command: DocETL provides several built-in parsing tools to handle common file formats and data processing tasks. These tools can be used directly in your configuration by specifying their names in the `function` field of your parsing tools configuration. Here's an overview of the available built-in parsing tools: ::: docetl.parsing_tools.xlsx_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.txt_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.docx_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.whisper_speech_to_text - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.pptx_to_string - options: - show_root_heading: true - heading_level: 3 + options: + show_root_heading: true + heading_level: 3 ::: docetl.parsing_tools.azure_di_read - options: - heading_level: 3 - show_root_heading: true + options: + heading_level: 3 + show_root_heading: true ::: docetl.parsing_tools.paddleocr_pdf_to_string - options: - heading_level: 3 - show_root_heading: true + options: + heading_level: 3 + show_root_heading: true ### Using Function Arguments with Parsing Tools -When using parsing tools in your DocETL configuration, you can pass additional arguments to the parsing functions using the function_kwargs field. This allows you to customize the behavior of the parsing tools without modifying their implementation. +When using parsing tools in your DocETL configuration, you can pass additional arguments to the parsing functions. -For example, when using the xlsx_to_string parsing tool, you can specify options like the orientation of the data, the order of columns, or whether to process each sheet separately. Here's an example of how to use function_kwargs in your configuration: +For example, when using the xlsx_to_string parsing tool, you can specify options like the orientation of the data, the order of columns, or whether to process each sheet separately. Here's an example of how to use such kwargs in your configuration: ```yaml datasets: @@ -254,10 +253,9 @@ datasets: parsing_tools: - name: excel_parser function: xlsx_to_string - function_kwargs: - orientation: row - col_order: ["Date", "Product", "Quantity", "Price"] - doc_per_sheet: true + orientation: row + col_order: ["Date", "Product", "Quantity", "Price"] + doc_per_sheet: true ``` ## Contributing Built-in Parsing Tools @@ -285,7 +283,7 @@ While DocETL provides several built-in parsing tools, the community can always b If the built-in tools don't meet your needs, you can create your own custom parsing tools. Here's how: 1. Define your parsing function in the `parsing_tools` section of your configuration. -2. Ensure your function takes a filename as input and returns a list of strings. +2. Ensure your function takes a document (dict) as input and returns a list of documents (dicts). 3. Use your custom parser in the `parsing` section of your dataset configuration. For example: @@ -294,7 +292,7 @@ For example: parsing_tools: - name: my_custom_parser function_code: | - def my_custom_parser(filename: str) -> List[str]: + def my_custom_parser(document: Dict) -> List[Dict]: # Your custom parsing logic here return [processed_data] @@ -304,7 +302,5 @@ datasets: source: local path: "data/paths.json" parsing: - - input_key: file_path - function: my_custom_parser - output_key: processed_data -``` \ No newline at end of file + - function: my_custom_parser +``` diff --git a/tests/basic/test_pipeline_with_parsing.py b/tests/basic/test_pipeline_with_parsing.py index bd524998..03ad15e4 100644 --- a/tests/basic/test_pipeline_with_parsing.py +++ b/tests/basic/test_pipeline_with_parsing.py @@ -1,4 +1,4 @@ -from typing import List +from typing import Dict, List import pytest import json import os @@ -129,9 +129,9 @@ def test_pipeline_with_parsing(config_file): os.remove(sample_data_file.name) -def custom_exploder(text: str) -> List[str]: - - return [t for t in text] +def custom_exploder(doc: Dict) -> List[Dict]: + text = doc["text"] + return [{"text": t} for t in text] def test_pipeline_with_custom_parsing(): @@ -160,9 +160,7 @@ def test_pipeline_with_custom_parsing(): path=tmp_input.name, parsing=[ { - "input_key": "text", "function": "custom_exploder", - "output_key": "parsed_content", } ], ) From 9afd46835ceba403f26f207ee1eb5b9fee6aec2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Egil=20M=C3=B6ller?= Date: Wed, 9 Oct 2024 08:45:39 +0200 Subject: [PATCH 08/10] Clustering (#84) * nit: change gpt-4o to gpt-4o-mini in tests * feat: add verbose parameter for gleaning * feat: add verbose parameter for gleaning * fix: tokenizers should be wrapped in try catch * fix: resort to eval if ast eval does not work * Merge staging to main (after parsers refactor) (#82) * Parsers can now return any number of fields, and can access the whole item * nit: change gpt-4o to gpt-4o-mini in tests * feat: add verbose parameter for gleaning * feat: add verbose parameter for gleaning * fix: tokenizers should be wrapped in try catch * fix: resort to eval if ast eval does not work * docs: update docs to reflect new custom parsing API --------- Co-authored-by: Egil * Added new clustering operation * Reverse path * Added docs for cluster operator * Bugfix for docs formatting * docs: add sample parameter (#87) * Added new clustering operation * Reverse path * Added docs for cluster operator * Bugfix for docs formatting * add tests and link to doc --------- Co-authored-by: Shreya Shankar Co-authored-by: Egil --- docetl/operations/cluster.py | 206 +++++++++++++++++++++++++++++++ docs/api-reference/operations.md | 9 ++ docs/operators/cluster.md | 188 ++++++++++++++++++++++++++++ docs/operators/filter.md | 1 + docs/operators/gather.md | 1 + docs/operators/map.md | 4 +- docs/operators/parallel-map.md | 2 +- docs/operators/reduce.md | 1 + docs/operators/resolve.md | 2 +- docs/operators/split.md | 1 + docs/operators/unnest.md | 1 + mkdocs.yml | 1 + pyproject.toml | 1 + tests/basic/test_cluster.py | 117 ++++++++++++++++++ 14 files changed, 531 insertions(+), 4 deletions(-) create mode 100644 docetl/operations/cluster.py create mode 100644 docs/operators/cluster.md create mode 100644 tests/basic/test_cluster.py diff --git a/docetl/operations/cluster.py b/docetl/operations/cluster.py new file mode 100644 index 00000000..09144e77 --- /dev/null +++ b/docetl/operations/cluster.py @@ -0,0 +1,206 @@ +from jinja2 import Environment, Template +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, List, Optional, Tuple +from .base import BaseOperation +from .utils import RichLoopBar +from .clustering_utils import get_embeddings_for_clustering + + +class ClusterOperation(BaseOperation): + def __init__( + self, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.max_batch_size: int = self.config.get( + "max_batch_size", kwargs.get("max_batch_size", float("inf")) + ) + + def syntax_check(self) -> None: + """ + Checks the configuration of the ClusterOperation for required keys and valid structure. + + Raises: + ValueError: If required keys are missing or invalid in the configuration. + TypeError: If configuration values have incorrect types. + """ + required_keys = ["embedding_keys", "summary_schema", "summary_prompt"] + for key in required_keys: + if key not in self.config: + raise ValueError( + f"Missing required key '{key}' in ClusterOperation configuration" + ) + + if not isinstance(self.config["embedding_keys"], list): + raise TypeError("'embedding_keys' must be a list of strings") + + if "output_key" in self.config: + if not isinstance(self.config["output_key"], str): + raise TypeError("'output_key' must be a string") + + if not isinstance(self.config["summary_schema"], dict): + raise TypeError("'summary_schema' must be a dictionary") + + if not isinstance(self.config["summary_prompt"], str): + raise TypeError("'prompt' must be a string") + + # Check if the prompt is a valid Jinja2 template + try: + Template(self.config["summary_prompt"]) + except Exception as e: + raise ValueError(f"Invalid Jinja2 template in 'prompt': {str(e)}") + + # Check optional parameters + if "max_batch_size" in self.config: + if not isinstance(self.config["max_batch_size"], int): + raise TypeError("'max_batch_size' must be an integer") + + if "embedding_model" in self.config: + if not isinstance(self.config["embedding_model"], str): + raise TypeError("'embedding_model' must be a string") + + if "model" in self.config: + if not isinstance(self.config["model"], str): + raise TypeError("'model' must be a string") + + if "validate" in self.config: + if not isinstance(self.config["validate"], list): + raise TypeError("'validate' must be a list of strings") + for rule in self.config["validate"]: + if not isinstance(rule, str): + raise TypeError("Each validation rule must be a string") + + def execute( + self, input_data: List[Dict], is_build: bool = False + ) -> Tuple[List[Dict], float]: + """ + Executes the cluster operation on the input data. Modifies the + input data and returns it in place. + + Args: + input_data (List[Dict]): A list of dictionaries to process. + is_build (bool): Whether the operation is being executed + in the build phase. Defaults to False. + + Returns: + Tuple[List[Dict], float]: A tuple containing the clustered + list of dictionaries and the total cost of the operation. + """ + if not input_data: + return input_data, 0 + + if len(input_data) == 1: + input_data[0][self.config.get("output_key", "clusters")] = () + return input_data, 0 + + embeddings, cost = get_embeddings_for_clustering( + input_data, self.config, self.runner.api + ) + + tree = self.agglomerative_cluster_of_embeddings(input_data, embeddings) + + self.prompt_template = Template(self.config["summary_prompt"]) + cost += self.annotate_clustering_tree(tree) + self.annotate_leaves(tree) + + return input_data, cost + + def agglomerative_cluster_of_embeddings(self, input_data, embeddings): + import sklearn.cluster + + cl = sklearn.cluster.AgglomerativeClustering( + compute_full_tree=True, compute_distances=True + ) + cl.fit(embeddings) + + nsamples = len(embeddings) + + def build_tree(i): + if i < nsamples: + res = input_data[i] + # res["embedding"] = list(embeddings[i]) + return res + return { + "children": [ + build_tree(cl.children_[i - nsamples, 0]), + build_tree(cl.children_[i - nsamples, 1]), + ], + "distance": cl.distances_[i - nsamples], + } + + return build_tree(nsamples + len(cl.children_) - 1) + + def annotate_clustering_tree(self, t): + if "children" in t: + with ThreadPoolExecutor(max_workers=self.max_batch_size) as executor: + futures = [ + executor.submit(self.annotate_clustering_tree, child) + for child in t["children"] + ] + + total_cost = 0 + pbar = RichLoopBar( + range(len(futures)), + desc=f"Processing {self.config['name']} (map) on all documents", + console=self.console, + ) + for i in pbar: + total_cost += futures[i].result() + pbar.update(i) + + assert len(t["children"]) == 2, ( + "Agglomerative clustering is supposed to generate clusters with 2 children each, but this cluster has %s" + % len(t["children"]) + ) + prompt = self.prompt_template.render( + left=t["children"][0], right=t["children"][1] + ) + + def validation_fn(response: Dict[str, Any]): + output = self.runner.api.parse_llm_response( + response, + schema=self.config["summary_schema"], + manually_fix_errors=self.manually_fix_errors, + )[0] + if self.runner.api.validate_output(self.config, output, self.console): + return output, True + return output, False + + output, cost, success = self.runner.api.call_llm_with_validation( + [{"role": "user", "content": prompt}], + model=self.config.get("model", self.default_model), + operation_type="cluster", + schema=self.config["summary_schema"], + llm_call_fn=lambda messages: self.runner.api.call_llm( + self.config.get("model", self.default_model), + "cluster", + messages, + self.config["summary_schema"], + tools=self.config.get("tools", None), + console=self.console, + timeout_seconds=self.config.get("timeout", 120), + max_retries_per_timeout=self.config.get( + "max_retries_per_timeout", 2 + ), + ), + validation_fn=validation_fn, + val_rule=self.config.get("validate", []), + num_retries=self.num_retries_on_validate_failure, + console=self.console, + ) + total_cost += cost + + t.update(output) + + return total_cost + return 0 + + def annotate_leaves(self, tree, path=()): + if "children" in tree: + item = dict(tree) + item.pop("children") + for child in tree["children"]: + self.annotate_leaves(child, path=(item,) + path) + else: + tree[self.config.get("output_key", "clusters")] = path diff --git a/docs/api-reference/operations.md b/docs/api-reference/operations.md index 5d3487e7..423fc59a 100644 --- a/docs/api-reference/operations.md +++ b/docs/api-reference/operations.md @@ -54,6 +54,15 @@ ignore_init_summary: false trim_doctest_flags: true +::: docetl.operations.cluster.ClusterOperation + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + # Auxiliary Operators ::: docetl.operations.split.SplitOperation diff --git a/docs/operators/cluster.md b/docs/operators/cluster.md new file mode 100644 index 00000000..51ef82a1 --- /dev/null +++ b/docs/operators/cluster.md @@ -0,0 +1,188 @@ +# Cluster operation + +The Cluster operation in DocETL groups all items into a binary tree +using [agglomerative +clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering#Agglomerative_clustering_example) +of the embedding of some keys, and annotates each item with the path +through this tree down to the item (Note that the path is reversed, +starting with the most specific grouping, and ending in the root of +the tree, the cluster that encompasses all your input). + +Each cluster is summarized using an llm prompt, taking the summaries +of its children as inputs (or for the leaf nodes, the actual items). + +## 🚀 Example: Grouping concepts from a knowledge-graph + +```yaml +- name: cluster_concepts + type: cluster + max_batch_size: 5 + embedding_keys: + - concept + - description + output_key: categories # This is optional, and defaults to "clusters" + summary_schema: + concept: str + description: str + summary_prompt: | + The following describes two related concepts. What concept + encompasses both? Try not to be too broad; it might be that one of + these two concepts already encompasses the other; in that case, + you should just use that concept. + + {{left.concept}}: + {{left.description}} + + {{right.concept}}: + {{right.description}} + + Provide the title of the super-concept, and a description. +``` + +This cluster operation processes a set of concepts, each with a title +and a description, and groups them into a tree of categories. + +??? example "Sample Input and Output" + + Input: + ```json + [ + { + "concept": "Shed", + "description": "A shed is typically a simple, single-story roofed structure, often used for storage, for hobbies, or as a workshop, and typically serving as outbuilding, such as in a back garden or on an allotment. Sheds vary considerably in their size and complexity of construction, from simple open-sided ones designed to cover bicycles or garden items to large wood-framed structures with shingled roofs, windows, and electrical outlets. Sheds used on farms or in the industry can be large structures. The main types of shed construction are metal sheathing over a metal frame, plastic sheathing and frame, all-wood construction (the roof may be asphalt shingled or sheathed in tin), and vinyl-sided sheds built over a wooden frame. Small sheds may include a wooden or plastic floor, while more permanent ones may be built on a concrete pad or foundation. Sheds may be lockable to deter theft or entry by children, domestic animals, wildlife, etc." + }, + { + "concept": "Barn", + "description": "A barn is an agricultural building usually on farms and used for various purposes. In North America, a barn refers to structures that house livestock, including cattle and horses, as well as equipment and fodder, and often grain.[2] As a result, the term barn is often qualified e.g. tobacco barn, dairy barn, cow house, sheep barn, potato barn. In the British Isles, the term barn is restricted mainly to storage structures for unthreshed cereals and fodder, the terms byre or shippon being applied to cow shelters, whereas horses are kept in buildings known as stables.[2][3] In mainland Europe, however, barns were often part of integrated structures known as byre-dwellings (or housebarns in US literature). In addition, barns may be used for equipment storage, as a covered workplace, and for activities such as threshing." + }, + { + "concept": "Tree house", + "description": "A tree house, tree fort or treeshed, is a platform or building constructed around, next to or among the trunk or branches of one or more mature trees while above ground level. Tree houses can be used for recreation, work space, habitation, a hangout space and observation. People occasionally connect ladders or staircases to get up to the platforms." + }, + { + "concept": "Castle", + "description": "A castle is a type of fortified structure built during the Middle Ages predominantly by the nobility or royalty and by military orders. Scholars usually consider a castle to be the private fortified residence of a lord or noble. This is distinct from a mansion, palace, and villa, whose main purpose was exclusively for pleasance and are not primarily fortresses but may be fortified.[a] Use of the term has varied over time and, sometimes, has also been applied to structures such as hill forts and 19th- and 20th-century homes built to resemble castles. Over the Middle Ages, when genuine castles were built, they took on a great many forms with many different features, although some, such as curtain walls, arrowslits, and portcullises, were commonplace." + }, + { + "concept": "Fortress", + "description": "A fortification (also called a fort, fortress, fastness, or stronghold) is a military construction designed for the defense of territories in warfare, and is used to establish rule in a region during peacetime. The term is derived from Latin fortis ('strong') and facere ('to make'). From very early history to modern times, defensive walls have often been necessary for cities to survive in an ever-changing world of invasion and conquest. Some settlements in the Indus Valley Civilization were the first small cities to be fortified. In ancient Greece, large stone walls had been built in Mycenaean Greece, such as the ancient site of Mycenae (known for the huge stone blocks of its 'cyclopean' walls). A Greek phrourion was a fortified collection of buildings used as a military garrison, and is the equivalent of the Roman castellum or fortress. These constructions mainly served the purpose of a watch tower, to guard certain roads, passes, and borders. Though smaller than a real fortress, they acted as a border guard rather than a real strongpoint to watch and maintain the border." + } + ] + ``` + + Output: + ```json + [ + { + "concept": "Shed", + "description": "A shed is typically a simple, single-story roofed structure, often used for storage, for hobbies, or as a workshop, and typically serving as outbuilding, such as in a back garden or on an allotment. Sheds vary considerably in their size and complexity of construction, from simple open-sided ones designed to cover bicycles or garden items to large wood-framed structures with shingled roofs, windows, and electrical outlets. Sheds used on farms or in the industry can be large structures. The main types of shed construction are metal sheathing over a metal frame, plastic sheathing and frame, all-wood construction (the roof may be asphalt shingled or sheathed in tin), and vinyl-sided sheds built over a wooden frame. Small sheds may include a wooden or plastic floor, while more permanent ones may be built on a concrete pad or foundation. Sheds may be lockable to deter theft or entry by children, domestic animals, wildlife, etc.", + "categories": [ + { + "distance": 0.9907871670904073, + "concept": "Outbuildings", + "description": "Outbuildings are structures that are separate from a main building, typically located on a property for purposes such as storage, workshops, or housing animals and equipment. This category includes structures like sheds and barns, which serve specific functions like storing tools, equipment, or livestock." + }, + { + "distance": 1.148880974178631, + "concept": "Auxiliary Structures", + "description": "Auxiliary structures are secondary or additional buildings that serve various practical purposes related to a main dwelling or property. This category encompasses structures like tree houses and outbuildings, which provide functional, recreational, or storage spaces, often designed to enhance the usability of the property." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Barn", + "description": "A barn is an agricultural building usually on farms and used for various purposes. In North America, a barn refers to structures that house livestock, including cattle and horses, as well as equipment and fodder, and often grain.[2] As a result, the term barn is often qualified e.g. tobacco barn, dairy barn, cow house, sheep barn, potato barn. In the British Isles, the term barn is restricted mainly to storage structures for unthreshed cereals and fodder, the terms byre or shippon being applied to cow shelters, whereas horses are kept in buildings known as stables.[2][3] In mainland Europe, however, barns were often part of integrated structures known as byre-dwellings (or housebarns in US literature). In addition, barns may be used for equipment storage, as a covered workplace, and for activities such as threshing.", + "categories": [ + { + "distance": 0.9907871670904073, + "concept": "Outbuildings", + "description": "Outbuildings are structures that are separate from a main building, typically located on a property for purposes such as storage, workshops, or housing animals and equipment. This category includes structures like sheds and barns, which serve specific functions like storing tools, equipment, or livestock." + }, + { + "distance": 1.148880974178631, + "concept": "Auxiliary Structures", + "description": "Auxiliary structures are secondary or additional buildings that serve various practical purposes related to a main dwelling or property. This category encompasses structures like tree houses and outbuildings, which provide functional, recreational, or storage spaces, often designed to enhance the usability of the property." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Tree house", + "description": "A tree house, tree fort or treeshed, is a platform or building constructed around, next to or among the trunk or branches of one or more mature trees while above ground level. Tree houses can be used for recreation, work space, habitation, a hangout space and observation. People occasionally connect ladders or staircases to get up to the platforms.", + "categories": [ + { + "distance": 1.148880974178631, + "concept": "Auxiliary Structures", + "description": "Auxiliary structures are secondary or additional buildings that serve various practical purposes related to a main dwelling or property. This category encompasses structures like tree houses and outbuildings, which provide functional, recreational, or storage spaces, often designed to enhance the usability of the property." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Castle", + "description": "A castle is a type of fortified structure built during the Middle Ages predominantly by the nobility or royalty and by military orders. Scholars usually consider a castle to be the private fortified residence of a lord or noble. This is distinct from a mansion, palace, and villa, whose main purpose was exclusively for pleasance and are not primarily fortresses but may be fortified.[a] Use of the term has varied over time and, sometimes, has also been applied to structures such as hill forts and 19th- and 20th-century homes built to resemble castles. Over the Middle Ages, when genuine castles were built, they took on a great many forms with many different features, although some, such as curtain walls, arrowslits, and portcullises, were commonplace.", + "categories": [ + { + "distance": 0.9152435235428339, + "concept": "Fortified structures", + "description": "Fortified structures refer to buildings designed to protect from attacks and enhance defense. This category encompasses various forms of military architecture, including castles and fortresses. Castles serve as private residences for nobility or military orders with substantial fortification features, while fortresses are broader military constructions aimed at defending territories and establishing control. Both types share the common purpose of defense against invasion, though they serve different social and functional roles." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + }, + { + "concept": "Fortress", + "description": "A fortification (also called a fort, fortress, fastness, or stronghold) is a military construction designed for the defense of territories in warfare, and is used to establish rule in a region during peacetime. The term is derived from Latin fortis ('strong') and facere ('to make'). From very early history to modern times, defensive walls have often been necessary for cities to survive in an ever-changing world of invasion and conquest. Some settlements in the Indus Valley Civilization were the first small cities to be fortified. In ancient Greece, large stone walls had been built in Mycenaean Greece, such as the ancient site of Mycenae (known for the huge stone blocks of its 'cyclopean' walls). A Greek phrourion was a fortified collection of buildings used as a military garrison, and is the equivalent of the Roman castellum or fortress. These constructions mainly served the purpose of a watch tower, to guard certain roads, passes, and borders. Though smaller than a real fortress, they acted as a border guard rather than a real strongpoint to watch and maintain the border.", + "categories": [ + { + "distance": 0.9152435235428339, + "concept": "Fortified structures", + "description": "Fortified structures refer to buildings designed to protect from attacks and enhance defense. This category encompasses various forms of military architecture, including castles and fortresses. Castles serve as private residences for nobility or military orders with substantial fortification features, while fortresses are broader military constructions aimed at defending territories and establishing control. Both types share the common purpose of defense against invasion, though they serve different social and functional roles." + }, + { + "distance": 1.292957924480073, + "concept": "Military and Support Structures", + "description": "Military and support structures refer to various types of constructions designed for specific functions related to defense and utility. This concept encompasses fortified structures, such as castles and fortresses, built for protection and military purposes, as well as auxiliary structures that serve practical roles for main buildings, including storage, recreation, and additional facilities. Together, these structures enhance the safety, functionality, and usability of a property or territory." + } + ] + } + ] + ``` + +## Required Parameters + +- `name`: A unique name for the operation. +- `type`: Must be set to "cluster". +- `embedding_keys`: A list of keys to use for the embedding that is clustered on +- `summary_prompt`: The prompt used to summarize a cluster based on its children. Access input variables with `left.keyname` or `right.keyname`. +- `summary_schema`: The schema for the summary of each cluster. This is the output schema for the `summary_prompt` based llm call. + +## Optional Parameters + +| Parameter | Description | Default | +| ------------------------- | -------------------------------------------------------------------------------- | ----------------------------- | +| `output_key` | The name of the output key where the cluster path will be inserted in the items. | "clusters" | +| `model` | The language model to use | Falls back to `default_model` | +| `embedding_model` | The embedding model to use | "text-embedding-3-small" | +| `tools` | List of tool definitions for LLM use | None | +| `timeout` | Timeout for each LLM call in seconds | 120 | +| `max_retries_per_timeout` | Maximum number of retries per timeout | 2 | +| `validate` | List of Python expressions to validate the output | None | +| `sample` | Number of items to sample for this operation | None | diff --git a/docs/operators/filter.md b/docs/operators/filter.md index c3d51c31..1b370547 100644 --- a/docs/operators/filter.md +++ b/docs/operators/filter.md @@ -91,6 +91,7 @@ This example demonstrates how the Filter operation distinguishes between high-im | `num_retries_on_validate_failure` | Number of retry attempts on validation failure | 0 | | `timeout` | Timeout for each LLM call in seconds | 120 | | `max_retries_per_timeout` | Maximum number of retries per timeout | 2 | +| `sample` | Number of samples to use for the operation | None | !!! info "Validation" diff --git a/docs/operators/gather.md b/docs/operators/gather.md index ce030a86..f84ea8c0 100644 --- a/docs/operators/gather.md +++ b/docs/operators/gather.md @@ -170,6 +170,7 @@ The Gather operation includes several key components: - `content_key`: Indicates the field containing the chunk content - `peripheral_chunks`: Specifies how to include context from surrounding chunks - `doc_header_key` (optional): Denotes a field representing extracted headers for each chunk +- `sample` (optional): Number of samples to use for the operation ### Peripheral Chunks Configuration diff --git a/docs/operators/map.md b/docs/operators/map.md index e1c71101..1f1b7ada 100644 --- a/docs/operators/map.md +++ b/docs/operators/map.md @@ -136,7 +136,7 @@ This example demonstrates how the Map operation can transform long, unstructured | `model` | The language model to use | Falls back to `default_model` | | `optimize` | Flag to enable operation optimization | `True` | | `recursively_optimize` | Flag to enable recursive optimization of operators synthesized as part of rewrite rules | `false` | -| `sample_size` | Number of samples to use for the operation | Processes all data | +| `sample` | Number of samples to use for the operation | Processes all data | | `tools` | List of tool definitions for LLM use | None | | `validate` | List of Python expressions to validate the output | None | | `num_retries_on_validate_failure` | Number of retry attempts on validation failure | 0 | @@ -223,5 +223,5 @@ You can use a map operation to act as an LLM no-op, and just drop any key-value 1. **Clear Prompts**: Write clear, specific prompts that guide the LLM to produce the desired output. 2. **Robust Validation**: Use validation to ensure output quality and consistency. 3. **Appropriate Model Selection**: Choose the right model for your task, balancing performance and cost. -4. **Optimize for Scale**: For large datasets, consider using `sample_size` to test your operation before running on the full dataset. +4. **Optimize for Scale**: For large datasets, consider using `sample` to test your operation before running on the full dataset. 5. **Use Tools Wisely**: Leverage tools for complex calculations or operations that the LLM might struggle with. You can write any Python code in the tools, so you can even use tools to call other APIs or search the internet. diff --git a/docs/operators/parallel-map.md b/docs/operators/parallel-map.md index a4e0f6f1..85ae7682 100644 --- a/docs/operators/parallel-map.md +++ b/docs/operators/parallel-map.md @@ -34,7 +34,7 @@ Each prompt configuration in the `prompts` list should contain: | `model` | The default language model to use | Falls back to `default_model` | | `optimize` | Flag to enable operation optimization | True | | `recursively_optimize` | Flag to enable recursive optimization | false | -| `sample_size` | Number of samples to use for the operation | Processes all data | +| `sample` | Number of samples to use for the operation | Processes all data | | `timeout` | Timeout for each LLM call in seconds | 120 | | `max_retries_per_timeout` | Maximum number of retries per timeout | 2 | diff --git a/docs/operators/reduce.md b/docs/operators/reduce.md index 018f1cde..fd8d0b80 100644 --- a/docs/operators/reduce.md +++ b/docs/operators/reduce.md @@ -51,6 +51,7 @@ This Reduce operation processes customer feedback grouped by department: | Parameter | Description | Default | | ------------------------- | ------------------------------------------------------------------------------------------------------ | --------------------------- | +| `sample` | Number of samples to use for the operation | None | | `synthesize_resolve` | If false, won't synthesize a resolve operation between map and reduce | true | | `model` | The language model to use | Falls back to default_model | | `input` | Specifies the schema or keys to subselect from each item | All keys from input items | diff --git a/docs/operators/resolve.md b/docs/operators/resolve.md index ed69a6a6..f9d24965 100644 --- a/docs/operators/resolve.md +++ b/docs/operators/resolve.md @@ -126,7 +126,7 @@ After determining eligible pairs for comparison, the Resolve operation uses a Un | `limit_comparisons` | Maximum number of comparisons to perform | None | | `timeout` | Timeout for each LLM call in seconds | 120 | | `max_retries_per_timeout` | Maximum number of retries per timeout | 2 | - +| `sample` | Number of samples to use for the operation | None | ## Best Practices 1. **Anticipate Resolve Needs**: If you anticipate needing a Resolve operation and want to control the prompts, create it in your pipeline and let the optimizer find the appropriate blocking rules and thresholds. diff --git a/docs/operators/split.md b/docs/operators/split.md index d28ac996..b9597b3f 100644 --- a/docs/operators/split.md +++ b/docs/operators/split.md @@ -50,6 +50,7 @@ Note that chunks will not overlap in content. | --------------------- | ------------------------------------------------------------------------------- | ----------------------------- | | `model` | The language model's tokenizer to use | Falls back to `default_model` | | `num_splits_to_group` | Number of splits to group together into one chunk (only for "delimiter" method) | 1 | +| `sample` | Number of samples to use for the operation | None | ### Splitting Methods diff --git a/docs/operators/unnest.md b/docs/operators/unnest.md index 458af9c1..d45879b6 100644 --- a/docs/operators/unnest.md +++ b/docs/operators/unnest.md @@ -38,6 +38,7 @@ The Unnest operation is valuable in scenarios where you need to: | expand_fields | A list of fields to expand from the nested dictionary into the parent dictionary, if unnesting a dict | [] | | recursive | If true, the unnest operation will be applied recursively to nested arrays | false | | depth | The maximum depth for recursive unnesting (only applicable if recursive is true) | inf | +| sample | Number of samples to use for the operation | None | ## Output diff --git a/mkdocs.yml b/mkdocs.yml index dac2122a..e995670e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -30,6 +30,7 @@ nav: - Parallel Map: operators/parallel-map.md - Filter: operators/filter.md - Equijoin: operators/equijoin.md + - Cluster: operators/cluster.md - Auxiliary Operators: - Split: operators/split.md - Gather: operators/gather.md diff --git a/pyproject.toml b/pyproject.toml index 274d4650..e0cf9bb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,6 +87,7 @@ split = "docetl.operations.split:SplitOperation" reduce = "docetl.operations.reduce:ReduceOperation" resolve = "docetl.operations.resolve:ResolveOperation" gather = "docetl.operations.gather:GatherOperation" +cluster = "docetl.operations.cluster:ClusterOperation" [tool.poetry.plugins."docetl.parser"] llama_index_simple_directory_reader = "docetl.parsing_tools:llama_index_simple_directory_reader" diff --git a/tests/basic/test_cluster.py b/tests/basic/test_cluster.py new file mode 100644 index 00000000..27c68bf2 --- /dev/null +++ b/tests/basic/test_cluster.py @@ -0,0 +1,117 @@ +import pytest +from docetl.operations.cluster import ClusterOperation +from tests.conftest import api_wrapper, default_model, max_threads + + +@pytest.fixture +def cluster_config(): + return { + "name": "test_cluster", + "type": "cluster", + "embedding_keys": ["concept", "description"], + "output_key": "categories", + "output_schema": {"concept": "string", "description": "string"}, + "summary_prompt": """ + The following describes two related concepts. What concept + encompasses both? Try not to be too broad; it might be that one of + these two concepts already encompasses the other; in that case, + you should just use that concept. + + {{left.concept}}: + {{left.description}} + + {{right.concept}}: + {{right.description}} + + Provide the title of the super-concept, and a description. + """, + "model": "gpt-4o-mini", + } + + +@pytest.fixture +def sample_data(): + return [ + { + "concept": "Shed", + "description": "A simple, single-story roofed structure, often used for storage or as a workshop.", + }, + { + "concept": "Barn", + "description": "A large agricultural building used for storing farm products and sheltering livestock.", + }, + { + "concept": "Tree house", + "description": "A small house built among the branches of a tree for children to play in.", + }, + { + "concept": "Skyscraper", + "description": "A very tall building of many stories, typically found in urban areas.", + }, + { + "concept": "Castle", + "description": "A large fortified building or set of buildings from the medieval period.", + }, + { + "concept": "Igloo", + "description": "A dome-shaped dwelling made of blocks of solid snow, traditionally built by Inuit people.", + }, + { + "concept": "Lighthouse", + "description": "A tower with a bright light at the top, used to warn or guide ships at sea.", + }, + { + "concept": "Windmill", + "description": "A building with sails or vanes that turn in the wind and generate power to grind grain into flour.", + }, + ] + + +def test_cluster_operation( + cluster_config, sample_data, api_wrapper, default_model, max_threads +): + operation = ClusterOperation( + api_wrapper, cluster_config, default_model, max_threads + ) + results, cost = operation.execute(sample_data) + + assert len(results) == len(sample_data) + assert cost > 0 + + for result in results: + assert "categories" in result + assert isinstance(result["categories"], tuple) + assert len(result["categories"]) > 0 + + for category in result["categories"]: + assert "concept" in category + assert "description" in category + + +def test_cluster_operation_empty_input( + cluster_config, api_wrapper, default_model, max_threads +): + operation = ClusterOperation( + api_wrapper, cluster_config, default_model, max_threads + ) + results, cost = operation.execute([]) + + assert len(results) == 0 + assert cost == 0 + + +def test_cluster_operation_single_item( + cluster_config, api_wrapper, default_model, max_threads +): + single_item = [ + {"concept": "House", "description": "A building for human habitation."} + ] + operation = ClusterOperation( + api_wrapper, cluster_config, default_model, max_threads + ) + results, cost = operation.execute(single_item) + + assert len(results) == 1 + assert cost == 0 + assert "categories" in results[0] + assert isinstance(results[0]["categories"], tuple) From ffe0b18e014ce41425a49ad1c987a8ac5ed14ee0 Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Tue, 8 Oct 2024 23:49:01 -0700 Subject: [PATCH 09/10] fix: fixing params in test --- tests/basic/test_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/basic/test_cluster.py b/tests/basic/test_cluster.py index 27c68bf2..3db8424b 100644 --- a/tests/basic/test_cluster.py +++ b/tests/basic/test_cluster.py @@ -10,7 +10,7 @@ def cluster_config(): "type": "cluster", "embedding_keys": ["concept", "description"], "output_key": "categories", - "output_schema": {"concept": "string", "description": "string"}, + "summary_schema": {"concept": "string", "description": "string"}, "summary_prompt": """ The following describes two related concepts. What concept encompasses both? Try not to be too broad; it might be that one of From c6f94913cd0d3552c38b4132018294b0d1c7ad27 Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Wed, 9 Oct 2024 17:09:45 -0700 Subject: [PATCH 10/10] fix: handle azure gpt-4o-mini and output to csv --- docetl/operations/utils.py | 6 +++--- docetl/runner.py | 26 +++++++++++++++++++++----- poetry.lock | 14 +++++++------- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/docetl/operations/utils.py b/docetl/operations/utils.py index f4ce3c48..163015ed 100644 --- a/docetl/operations/utils.py +++ b/docetl/operations/utils.py @@ -615,7 +615,7 @@ def call_llm_with_cache( len(props) == 1 and list(props.values())[0].get("type") == "string" and scratchpad is None - and "ollama" in model + and ("ollama" in model or "azure/gpt-4o-mini" in model) ): use_tools = False @@ -635,7 +635,7 @@ def call_llm_with_cache( "type": "function", "function": { "name": "send_output", - "description": "Send structured output back to the user", + "description": "Send output back to the user", "strict": True, "parameters": parameters, "additionalProperties": False, @@ -858,7 +858,7 @@ def call_llm_with_gleaning( "type": "function", "function": { "name": "send_output", - "description": "Send structured output back to the user", + "description": "Send output back to the user", "strict": True, "parameters": parameters, "additionalProperties": False, diff --git a/docetl/runner.py b/docetl/runner.py index c81e13dd..6be5da4d 100644 --- a/docetl/runner.py +++ b/docetl/runner.py @@ -54,13 +54,16 @@ def __init__(self, config: Dict, max_threads: int = None): # Check if output path is correctly formatted as JSON output_path = self.config.get("pipeline", {}).get("output", {}).get("path") if output_path: - if not output_path.lower().endswith(".json"): + if not ( + output_path.lower().endswith(".json") + or output_path.lower().endswith(".csv") + ): raise ValueError( - f"Output path '{output_path}' is not a JSON file. Please provide a path ending with '.json'." + f"Output path '{output_path}' is not a JSON or CSV file. Please provide a path ending with '.json' or '.csv'." ) else: raise ValueError( - "No output path specified in the configuration. Please provide an output path ending with '.json' in the configuration." + "No output path specified in the configuration. Please provide an output path ending with '.json' or '.csv' in the configuration." ) self.syntax_check() @@ -77,6 +80,11 @@ def __init__(self, config: Dict, max_threads: int = None): all_ops_until_and_including_current = [ op_map[prev_op] for prev_op in step["operations"][:idx] ] + [op_map[op_name]] + # If there's no model in the op, add the default model + for op in all_ops_until_and_including_current: + if "model" not in op: + op["model"] = self.default_model + all_ops_str = json.dumps(all_ops_until_and_including_current) self.step_op_hashes[step["name"]][op_name] = hashlib.sha256( all_ops_str.encode() @@ -207,8 +215,16 @@ def save_output(self, data: List[Dict]): self.console.rule("[cyan]Saving Output[/cyan]") output_config = self.config["pipeline"]["output"] if output_config["type"] == "file": - with open(output_config["path"], "w") as file: - json.dump(data, file, indent=2) + if output_config["path"].lower().endswith(".json"): + with open(output_config["path"], "w") as file: + json.dump(data, file, indent=2) + else: # CSV + import csv + + with open(output_config["path"], "w", newline="") as file: + writer = csv.DictWriter(file, fieldnames=data[0].keys()) + writer.writeheader() + writer.writerows(data) self.console.print( f"[green italic]💾 Output saved to {output_config['path']}[/green italic]" ) diff --git a/poetry.lock b/poetry.lock index 8eedfc0d..a5ae584d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1534,13 +1534,13 @@ requests = ">=2.20" [[package]] name = "litellm" -version = "1.48.10" +version = "1.49.0" description = "Library to easily interface with LLM API providers" optional = false python-versions = "!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,>=3.8" files = [ - {file = "litellm-1.48.10-py3-none-any.whl", hash = "sha256:752efd59747a0895f4695d025c66f0b2258d80a61175f7cfa41dbe4894ef95e1"}, - {file = "litellm-1.48.10.tar.gz", hash = "sha256:0a4ff75da78e66baeae0658ad8de498298310a5efda74c3d840ce2b013e8401d"}, + {file = "litellm-1.49.0-py3-none-any.whl", hash = "sha256:53711018b730f8a4262c11461b702b771e46e0c974f9c0bcd5b384b027308dd5"}, + {file = "litellm-1.49.0.tar.gz", hash = "sha256:f5ef51b571b14de318fccdd6728f4e705aad68250f9ed374c7fe6c4e95d6c008"}, ] [package.dependencies] @@ -1549,7 +1549,7 @@ click = "*" importlib-metadata = ">=6.8.0" jinja2 = ">=3.1.2,<4.0.0" jsonschema = ">=4.22.0,<5.0.0" -openai = ">=1.45.0" +openai = ">=1.51.0" pydantic = ">=2.0.0,<3.0.0" python-dotenv = ">=0.2.0" requests = ">=2.31.0,<3.0.0" @@ -2330,13 +2330,13 @@ files = [ [[package]] name = "openai" -version = "1.50.2" +version = "1.51.2" description = "The official Python library for the openai API" optional = false python-versions = ">=3.7.1" files = [ - {file = "openai-1.50.2-py3-none-any.whl", hash = "sha256:822dd2051baa3393d0d5406990611975dd6f533020dc9375a34d4fe67e8b75f7"}, - {file = "openai-1.50.2.tar.gz", hash = "sha256:3987ae027152fc8bea745d60b02c8f4c4a76e1b5c70e73565fa556db6f78c9e6"}, + {file = "openai-1.51.2-py3-none-any.whl", hash = "sha256:5c5954711cba931423e471c37ff22ae0fd3892be9b083eee36459865fbbb83fa"}, + {file = "openai-1.51.2.tar.gz", hash = "sha256:c6a51fac62a1ca9df85a522e462918f6bb6bc51a8897032217e453a0730123a6"}, ] [package.dependencies]