From da33310775b6848e2dc3cb685b5122015db0ed47 Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Sat, 21 Sep 2024 14:32:39 -0700 Subject: [PATCH 1/3] Create Python API --- docetl/api.py | 313 ++++++++++++++++++++++++++++++++++++++++++++++ docetl/builder.py | 37 ++++-- docetl/cli.py | 5 +- docetl/runner.py | 10 +- 4 files changed, 347 insertions(+), 18 deletions(-) create mode 100644 docetl/api.py diff --git a/docetl/api.py b/docetl/api.py new file mode 100644 index 00000000..8d135484 --- /dev/null +++ b/docetl/api.py @@ -0,0 +1,313 @@ +""" +This module defines the core data structures and classes for the DocETL pipeline. + +It includes dataclasses for various operation types, pipeline steps, and the main Pipeline class. +The module provides a high-level API for defining, optimizing, and running document processing pipelines. + +Classes: + Dataset: Represents a dataset with a type and path. + BaseOp: Base class for all operation types. + MapOp: Represents a map operation in the pipeline. + ResolveOp: Represents a resolve operation for entity resolution. + ReduceOp: Represents a reduce operation in the pipeline. + ParallelMapOp: Represents a parallel map operation. + FilterOp: Represents a filter operation in the pipeline. + EquijoinOp: Represents an equijoin operation for joining datasets. + SplitOp: Represents a split operation for dividing data. + GatherOp: Represents a gather operation for collecting data. + UnnestOp: Represents an unnest operation for flattening nested structures. + PipelineStep: Represents a step in the pipeline with input and operations. + PipelineOutput: Defines the output configuration for the pipeline. + Pipeline: Main class for defining and running a complete document processing pipeline. + +The Pipeline class provides methods for optimizing and running the defined pipeline, +as well as utility methods for converting between dictionary and object representations. 
+ +Usage: + from docetl.api import Pipeline, Dataset, MapOp, ReduceOp + + pipeline = Pipeline( + datasets={"input": Dataset(type="file", path="input.json")}, + operations=[ + MapOp(name="process", type="map", prompt="Process the document"), + ReduceOp(name="summarize", type="reduce", reduce_key="content") + ], + steps=[ + PipelineStep(name="process_step", input="input", operations=["process"]), + PipelineStep(name="summarize_step", input="process_step", operations=["summarize"]) + ], + output=PipelineOutput(type="file", path="output.json") + ) + + optimized_pipeline = pipeline.optimize() + result = optimized_pipeline.run() +""" + +from dataclasses import dataclass +from typing import List, Optional, Dict, Any, Union + +from docetl.builder import Optimizer +from docetl.runner import DSLRunner + + +@dataclass +class Dataset: + type: str + path: str + + +@dataclass +class BaseOp: + name: str + type: str + + +@dataclass +class MapOp(BaseOp): + output: Optional[Dict[str, Any]] = None + prompt: Optional[str] = None + model: Optional[str] = None + optimize: Optional[bool] = None + recursively_optimize: Optional[bool] = None + sample_size: Optional[int] = None + tools: Optional[List[Dict[str, Any]]] = None + validate: Optional[List[str]] = None + num_retries_on_validate_failure: Optional[int] = None + gleaning: Optional[Dict[str, Any]] = None + drop_keys: Optional[List[str]] = None + + +@dataclass +class ResolveOp(BaseOp): + comparison_prompt: str + resolution_prompt: str + output: Optional[Dict[str, Any]] = None + embedding_model: Optional[str] = None + resolution_model: Optional[str] = None + comparison_model: Optional[str] = None + blocking_keys: Optional[List[str]] = None + blocking_threshold: Optional[float] = None + blocking_conditions: Optional[List[str]] = None + input: Optional[Dict[str, Any]] = None + embedding_batch_size: Optional[int] = None + compare_batch_size: Optional[int] = None + limit_comparisons: Optional[int] = None + optimize: Optional[bool] = None + + +@dataclass +class ReduceOp(BaseOp): + reduce_key: Union[str, List[str]] + output: Optional[Dict[str, Any]] = None + prompt: Optional[str] = None + optimize: Optional[bool] = None + synthesize_resolve: Optional[bool] = None + model: Optional[str] = None + input: Optional[Dict[str, Any]] = None + pass_through: Optional[bool] = None + associative: Optional[bool] = None + fold_prompt: Optional[str] = None + fold_batch_size: Optional[int] = None + value_sampling: Optional[Dict[str, Any]] = None + verbose: Optional[bool] = None + + +@dataclass +class ParallelMapOp(BaseOp): + prompts: List[Dict[str, Any]] + output: Optional[Dict[str, Any]] = None + model: Optional[str] = None + optimize: Optional[bool] = None + recursively_optimize: Optional[bool] = None + sample_size: Optional[int] = None + drop_keys: Optional[List[str]] = None + + +@dataclass +class FilterOp(BaseOp): + output: Optional[Dict[str, Any]] = None + prompt: Optional[str] = None + model: Optional[str] = None + optimize: Optional[bool] = None + recursively_optimize: Optional[bool] = None + sample_size: Optional[int] = None + validate: Optional[List[str]] = None + num_retries_on_validate_failure: Optional[int] = None + + +@dataclass +class EquijoinOp(BaseOp): + join_key: Dict[str, Dict[str, str]] + comparison_prompt: str + output: Optional[Dict[str, Any]] = None + blocking_keys: Optional[List[str]] = None + blocking_threshold: Optional[float] = None + blocking_conditions: Optional[List[str]] = None + limits: Optional[Dict[str, int]] = None + model: Optional[str] = None + 
optimize: Optional[bool] = None + + +@dataclass +class SplitOp(BaseOp): + split_key: str + method: str + method_kwargs: Dict[str, Any] + model: Optional[str] = None + + +@dataclass +class GatherOp(BaseOp): + content_key: str + doc_id_key: str + order_key: str + peripheral_chunks: Dict[str, Any] + doc_header_key: Optional[str] = None + + +@dataclass +class UnnestOp(BaseOp): + unnest_key: str + keep_empty: Optional[bool] = None + expand_fields: Optional[List[str]] = None + recursive: Optional[bool] = None + depth: Optional[int] = None + + +OpType = Union[ + MapOp, + ResolveOp, + ReduceOp, + ParallelMapOp, + FilterOp, + EquijoinOp, + SplitOp, + GatherOp, + UnnestOp, +] + + +@dataclass +class PipelineStep: + name: str + input: str + operations: List[str] + + +@dataclass +class PipelineOutput: + type: str + path: str + intermediate_dir: Optional[str] = None + + +@dataclass +class Pipeline: + datasets: Dict[str, Dataset] + operations: List[OpType] + steps: List[PipelineStep] + output: PipelineOutput + default_model: Optional[str] = None + + def optimize( + self, + max_threads: Optional[int] = None, + model: str = "gpt-4o", + resume: bool = False, + timeout: int = 60, + ) -> "Pipeline": + """ + Optimize the pipeline using the Optimizer. + + Args: + max_threads (Optional[int]): Maximum number of threads to use for optimization. + model (str): The model to use for optimization. Defaults to "gpt-4o". + resume (bool): Whether to resume optimization from a previous state. Defaults to False. + timeout (int): Timeout for optimization in seconds. Defaults to 60. + + Returns: + Pipeline: An optimized version of the pipeline. + """ + config = self._to_dict() + optimizer = Optimizer( + config, + max_threads=max_threads, + model=model, + timeout=timeout, + resume=resume, + ) + optimizer.optimize() + optimized_config = optimizer.clean_optimized_config() + + updated_pipeline = Pipeline() + updated_pipeline._update_from_dict(optimized_config) + return updated_pipeline + + def run(self, max_threads: Optional[int] = None) -> float: + """ + Run the pipeline using the DSLRunner. + + Args: + max_threads (Optional[int]): Maximum number of threads to use for execution. + + Returns: + float: The total cost of running the pipeline. + """ + config = self._to_dict() + runner = DSLRunner(config, max_threads=max_threads) + result = runner.run() + return result + + def _to_dict(self) -> Dict[str, Any]: + """ + Convert the Pipeline object to a dictionary representation. + + Returns: + Dict[str, Any]: Dictionary representation of the Pipeline. + """ + return { + "datasets": { + name: dataset.__dict__ for name, dataset in self.datasets.items() + }, + "operations": [op.__dict__ for op in self.operations], + "pipeline": { + "steps": [step.__dict__ for step in self.steps], + "output": self.output.__dict__, + }, + "default_model": self.default_model, + } + + def _update_from_dict(self, config: Dict[str, Any]): + """ + Update the Pipeline object from a dictionary representation. + + Args: + config (Dict[str, Any]): Dictionary representation of the Pipeline. 
+ """ + self.datasets = { + name: Dataset(**dataset) for name, dataset in config["datasets"].items() + } + self.operations = [] + for op in config["operations"]: + op_type = op.pop("type") + if op_type == "map": + self.operations.append(MapOp(**op, type=op_type)) + elif op_type == "resolve": + self.operations.append(ResolveOp(**op, type=op_type)) + elif op_type == "reduce": + self.operations.append(ReduceOp(**op, type=op_type)) + elif op_type == "parallel_map": + self.operations.append(ParallelMapOp(**op, type=op_type)) + elif op_type == "filter": + self.operations.append(FilterOp(**op, type=op_type)) + elif op_type == "equijoin": + self.operations.append(EquijoinOp(**op, type=op_type)) + elif op_type == "split": + self.operations.append(SplitOp(**op, type=op_type)) + elif op_type == "gather": + self.operations.append(GatherOp(**op, type=op_type)) + elif op_type == "unnest": + self.operations.append(UnnestOp(**op, type=op_type)) + self.steps = [PipelineStep(**step) for step in config["pipeline"]["steps"]] + self.output = PipelineOutput(**config["pipeline"]["output"]) + self.default_model = config.get("default_model") diff --git a/docetl/builder.py b/docetl/builder.py index 553b1bd2..a36f4577 100644 --- a/docetl/builder.py +++ b/docetl/builder.py @@ -77,9 +77,18 @@ def items(self): class Optimizer: + @classmethod + def from_yaml(cls, yaml_file: str, **kwargs): + base_name = yaml_file.rsplit(".", 1)[0] + suffix = yaml_file.rsplit(".", 1)[1] + config = load_config(yaml_file) + return cls(config, base_name, suffix, **kwargs) + def __init__( self, - yaml_file: str, + config: Dict, + base_name: str, + yaml_file_suffix: str, max_threads: Optional[int] = None, model: str = "gpt-4o", resume: bool = False, @@ -119,8 +128,7 @@ def __init__( The method also calls print_optimizer_config() to display the initial configuration. """ - self.yaml_file_path = yaml_file - self.config = load_config(yaml_file) + self.config = config self.console = Console() self.optimized_config = copy.deepcopy(self.config) self.llm_client = LLMClient(model) @@ -132,12 +140,10 @@ def __init__( self.resume = resume home_dir = os.path.expanduser("~") - yaml_file_suffix = yaml_file.split("/")[-1].split(".")[0] cache_dir = os.path.join(home_dir, f".docetl/cache/{yaml_file_suffix}") os.makedirs(cache_dir, exist_ok=True) self.datasets = DatasetOnDisk(dir=cache_dir, console=self.console) self.optimized_ops_path = f"{cache_dir}/optimized_ops" - base_name = yaml_file.rsplit(".", 1)[0] self.optimized_config_path = f"{base_name}_opt.yaml" # Update sample size map @@ -200,7 +206,6 @@ def print_optimizer_config(self): separator lines to clearly delineate the configuration information. """ self.console.rule("[bold cyan]Optimizer Configuration[/bold cyan]") - self.console.log(f"[yellow]YAML File:[/yellow] {self.yaml_file_path}") self.console.log(f"[yellow]Sample Size:[/yellow] {self.sample_size_map}") self.console.log(f"[yellow]Max Threads:[/yellow] {self.max_threads}") self.console.log(f"[yellow]Model:[/yellow] {self.llm_client.model}") @@ -588,8 +593,6 @@ def optimize(self): else: self.datasets[step_hash] = copy.deepcopy(input_data) - self._save_optimized_config() - self.console.log( f"[bold]Total agent cost: ${self.llm_client.total_cost:.2f}[/bold]" ) @@ -1379,12 +1382,9 @@ def resolve_anchors(data): else: return data - def _save_optimized_config(self): + def clean_optimized_config(self): """ - Save the optimized configuration to a YAML file. 
- - This method creates a copy of the optimized configuration, resolves all anchors and aliases, - and saves it to a new YAML file. The new file name is based on the original file name with '_opt' appended. + Remove _intermediates from each operation in the optimized config """ # Create a copy of the optimized config to modify config_to_save = self.optimized_config.copy() @@ -1397,6 +1397,17 @@ def _save_optimized_config(self): if "_intermediates" in op_config: del op_config["_intermediates"] + return resolved_config + + def save_optimized_config(self): + """ + Save the optimized configuration to a YAML file. + + This method creates a copy of the optimized configuration, resolves all anchors and aliases, + and saves it to a new YAML file. The new file name is based on the original file name with '_opt' appended. + """ + resolved_config = self.clean_optimized_config() + with open(self.optimized_config_path, "w") as f: yaml.safe_dump(resolved_config, f, default_flow_style=False, width=80) self.console.log( diff --git a/docetl/cli.py b/docetl/cli.py index 7951cbab..f643adf2 100644 --- a/docetl/cli.py +++ b/docetl/cli.py @@ -36,7 +36,7 @@ def build( resume (bool): Whether to resume optimization from a previous run. Defaults to False. timeout (int): Timeout for optimization operations in seconds. Defaults to 60. """ - optimizer = Optimizer( + optimizer = Optimizer.from_yaml( str(yaml_file), max_threads=max_threads, model=model, @@ -44,6 +44,7 @@ def build( resume=resume, ) optimizer.optimize() + optimizer.save_optimized_config() @app.command() @@ -62,7 +63,7 @@ def run( yaml_file (Path): Path to the YAML file containing the pipeline configuration. max_threads (Optional[int]): Maximum number of threads to use for running operations. """ - runner = DSLRunner(str(yaml_file), max_threads=max_threads) + runner = DSLRunner.from_yaml(str(yaml_file), max_threads=max_threads) runner.run() diff --git a/docetl/runner.py b/docetl/runner.py index 68c76ec6..2c6ceb73 100644 --- a/docetl/runner.py +++ b/docetl/runner.py @@ -31,15 +31,14 @@ class DSLRunner: datasets (Dict): Storage for loaded datasets. """ - def __init__(self, yaml_file: str, max_threads: int = None): + def __init__(self, config: Dict, max_threads: int = None): """ Initialize the DSLRunner with a YAML configuration file. Args: - yaml_file (str): Path to the YAML configuration file. max_threads (int, optional): Maximum number of threads to use. Defaults to None. """ - self.config = load_config(yaml_file) + self.config = config self.default_model = self.config.get("default_model", "gpt-4o-mini") self.max_threads = max_threads or (os.cpu_count() or 1) * 4 self.console = Console() @@ -64,6 +63,11 @@ def __init__(self, yaml_file: str, max_threads: int = None): self.syntax_check() + @classmethod + def from_yaml(cls, yaml_file: str, **kwargs): + config = load_config(yaml_file) + return cls(config, **kwargs) + def syntax_check(self): """ Perform a syntax check on all operations defined in the configuration. 
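For reference, this first patch separates config loading (`from_yaml`) from construction, and splits the old save-on-optimize behavior into `clean_optimized_config()` / `save_optimized_config()`. Below is a minimal sketch of the resulting CLI-equivalent flow; the `pipeline.yaml` path is a hypothetical placeholder.

```python
# Sketch of the entry points introduced in this patch (paths are hypothetical).
from docetl.builder import Optimizer
from docetl.runner import DSLRunner

# from_yaml loads the YAML config and derives the cache key and the
# "<base>_opt.yaml" output name from the file path before delegating
# to the plain-config constructor.
optimizer = Optimizer.from_yaml("pipeline.yaml", model="gpt-4o", timeout=60)
optimizer.optimize()               # no longer writes the config implicitly
optimizer.save_optimized_config()  # explicitly writes pipeline_opt.yaml

# DSLRunner mirrors the same pattern: construct from a dict, or from YAML.
runner = DSLRunner.from_yaml("pipeline_opt.yaml", max_threads=4)
total_cost = runner.run()          # returns the total pipeline cost
print(f"Total cost: ${total_cost:.2f}")
```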
From 4a07d6214e299efd3c96dc241d36088637058383 Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Sat, 21 Sep 2024 22:30:30 -0700 Subject: [PATCH 2/3] feat: add python api --- docetl/api.py | 40 +++- docetl/builder.py | 2 +- docetl/ui.py | 75 ------- docs/api-reference/python.md | 127 +++++++++++ mkdocs.yml | 1 + tests/test_api.py | 405 +++++++++++++++++++++++++++++++++++ tests/test_config.py | 2 +- tests/test_synth_gather.py | 8 +- tests/test_synth_resolve.py | 5 +- 9 files changed, 575 insertions(+), 90 deletions(-) delete mode 100644 docetl/ui.py create mode 100644 docs/api-reference/python.md create mode 100644 tests/test_api.py diff --git a/docetl/api.py b/docetl/api.py index 8d135484..ccde6f64 100644 --- a/docetl/api.py +++ b/docetl/api.py @@ -44,6 +44,7 @@ """ from dataclasses import dataclass +import os from typing import List, Optional, Dict, Any, Union from docetl.builder import Optimizer @@ -137,15 +138,20 @@ class FilterOp(BaseOp): @dataclass class EquijoinOp(BaseOp): - join_key: Dict[str, Dict[str, str]] + left: str + right: str comparison_prompt: str output: Optional[Dict[str, Any]] = None - blocking_keys: Optional[List[str]] = None blocking_threshold: Optional[float] = None - blocking_conditions: Optional[List[str]] = None + blocking_conditions: Optional[Dict[str, List[str]]] = None limits: Optional[Dict[str, int]] = None - model: Optional[str] = None + comparison_model: Optional[str] = None optimize: Optional[bool] = None + embedding_model: Optional[str] = None + embedding_batch_size: Optional[int] = None + compare_batch_size: Optional[int] = None + limit_comparisons: Optional[int] = None + blocking_keys: Optional[Dict[str, List[str]]] = None @dataclass @@ -190,8 +196,8 @@ class UnnestOp(BaseOp): @dataclass class PipelineStep: name: str - input: str - operations: List[str] + operations: List[Union[Dict[str, Any], str]] + input: Optional[str] = None @dataclass @@ -203,6 +209,7 @@ class PipelineOutput: @dataclass class Pipeline: + name: str datasets: Dict[str, Dataset] operations: List[OpType] steps: List[PipelineStep] @@ -231,6 +238,8 @@ def optimize( config = self._to_dict() optimizer = Optimizer( config, + base_name=os.path.join(os.getcwd(), self.name), + yaml_file_suffix=self.name, max_threads=max_threads, model=model, timeout=timeout, @@ -239,7 +248,14 @@ def optimize( optimizer.optimize() optimized_config = optimizer.clean_optimized_config() - updated_pipeline = Pipeline() + updated_pipeline = Pipeline( + name=self.name, + datasets=self.datasets, + operations=self.operations, + steps=self.steps, + output=self.output, + default_model=self.default_model, + ) updated_pipeline._update_from_dict(optimized_config) return updated_pipeline @@ -269,9 +285,15 @@ def _to_dict(self) -> Dict[str, Any]: "datasets": { name: dataset.__dict__ for name, dataset in self.datasets.items() }, - "operations": [op.__dict__ for op in self.operations], + "operations": [ + {k: v for k, v in op.__dict__.items() if v is not None} + for op in self.operations + ], "pipeline": { - "steps": [step.__dict__ for step in self.steps], + "steps": [ + {k: v for k, v in step.__dict__.items() if v is not None} + for step in self.steps + ], "output": self.output.__dict__, }, "default_model": self.default_model, diff --git a/docetl/builder.py b/docetl/builder.py index a36f4577..fdd9bac9 100644 --- a/docetl/builder.py +++ b/docetl/builder.py @@ -80,7 +80,7 @@ class Optimizer: @classmethod def from_yaml(cls, yaml_file: str, **kwargs): base_name = yaml_file.rsplit(".", 1)[0] - suffix = yaml_file.rsplit(".", 1)[1] + 
suffix = yaml_file.split("/")[-1].split(".")[0] config = load_config(yaml_file) return cls(config, base_name, suffix, **kwargs) diff --git a/docetl/ui.py b/docetl/ui.py deleted file mode 100644 index 58db0dc6..00000000 --- a/docetl/ui.py +++ /dev/null @@ -1,75 +0,0 @@ -from textual.app import App, ComposeResult -from textual.containers import ScrollableContainer -from textual.widgets import Header, Footer, Input, Button, Label, Static -from textual.screen import Screen - - -class QueryInput(Static): - """A widget for query input.""" - - def compose(self) -> ComposeResult: - yield Label("Data Path:") - yield Input(placeholder="Enter the path to your data", id="data_path_input") - yield Label("Query Description:") - yield Input( - placeholder="Describe your query in high-level terms", id="query_desc_input" - ) - yield Button("Optimize Query", id="optimize_button", variant="primary") - - -class QueryResult(Static): - """A widget to display query optimization results.""" - - def compose(self) -> ComposeResult: - yield Static("Results will appear here.", id="result_area") - - -class QueryOptimizer(Screen): - def compose(self) -> ComposeResult: - yield Header() - yield ScrollableContainer(QueryInput(), QueryResult()) - yield Footer() - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "optimize_button": - self.optimize_query() - - def optimize_query(self) -> None: - data_path = self.query_one("#data_path_input").value - query_desc = self.query_one("#query_desc_input").value - - if not data_path or not query_desc: - self.query_one("#result_area").update("Please fill in both fields.") - return - - # Simulated query optimization result - result = f""" -Optimizing query... - -Data path: {data_path} -Query description: {query_desc} - -Optimized query plan: -1. Load data from specified path -2. Apply filters based on query description -3. Perform necessary joins -4. Aggregate results -5. 
Sort output - """ - self.query_one("#result_area").update(result) - - -class QueryOptimizerApp(App): - BINDINGS = [("d", "toggle_dark", "Toggle dark mode")] - - def on_mount(self) -> None: - self.push_screen(QueryOptimizer()) - - def action_toggle_dark(self) -> None: - """An action to toggle dark mode.""" - self.dark = not self.dark - - -if __name__ == "__main__": - app = QueryOptimizerApp() - app.run() diff --git a/docs/api-reference/python.md b/docs/api-reference/python.md new file mode 100644 index 00000000..aa4fcfdb --- /dev/null +++ b/docs/api-reference/python.md @@ -0,0 +1,127 @@ +# Python API + +::: docetl.api.Dataset + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.BaseOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.MapOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.ResolveOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.ReduceOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.ParallelMapOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.FilterOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.EquijoinOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.SplitOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.GatherOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.UnnestOp + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.PipelineStep + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.PipelineOutput + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true + +::: docetl.api.Pipeline + options: + show_root_heading: true + heading_level: 3 + show_if_no_docstring: false + docstring_options: + ignore_init_summary: false + trim_doctest_flags: true diff --git a/mkdocs.yml b/mkdocs.yml index c41c805f..7742e0a2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,6 +49,7 @@ nav: - docetl.cli: api-reference/cli.md - docetl.operations: api-reference/operations.md - docetl.optimizers: api-reference/optimizers.md + - Python API: 
api-reference/python.md - Examples: - Reporting on Themes from Presidential Debates: examples/presidential-debate-themes.md - Mining Product Reviews for Polarizing Features: examples/mining-product-reviews.md diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 00000000..55b2c6b7 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,405 @@ +import pytest +import json +import tempfile +import os +from docetl.api import ( + Pipeline, + Dataset, + MapOp, + ReduceOp, + ParallelMapOp, + FilterOp, + PipelineStep, + PipelineOutput, + ResolveOp, + EquijoinOp, +) +from dotenv import load_dotenv + +load_dotenv() + + +@pytest.fixture +def default_model(): + return "gpt-4o-mini" + + +@pytest.fixture +def max_threads(): + return 4 + + +@pytest.fixture +def temp_input_file(): + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp: + json.dump( + [ + {"text": "This is a positive sentence.", "group": "A"}, + {"text": "This is a negative sentence.", "group": "B"}, + {"text": "This is a neutral sentence.", "group": "A"}, + ], + tmp, + ) + yield tmp.name + os.unlink(tmp.name) + + +@pytest.fixture +def temp_output_file(): + with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp: + pass + yield tmp.name + os.unlink(tmp.name) + + +@pytest.fixture +def temp_intermediate_dir(): + with tempfile.TemporaryDirectory() as tmpdirname: + yield tmpdirname + + +@pytest.fixture +def map_config(): + return MapOp( + name="sentiment_analysis", + type="map", + prompt="Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral.", + output={"schema": {"sentiment": "string"}}, + model="gpt-4o-mini", + ) + + +@pytest.fixture +def reduce_config(): + return ReduceOp( + name="group_summary", + type="reduce", + reduce_key="group", + prompt="Summarize the following group of values: {{ inputs }} Provide a total and any other relevant statistics.", + output={"schema": {"total": "number", "avg": "number"}}, + model="gpt-4o-mini", + ) + + +@pytest.fixture +def parallel_map_config(): + return ParallelMapOp( + name="sentiment_and_word_count", + type="parallel_map", + prompts=[ + { + "name": "sentiment", + "prompt": "Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral.", + "output_keys": ["sentiment"], + "model": "gpt-4o-mini", + }, + { + "name": "word_count", + "prompt": "Count the number of words in the following text: '{{ input.text }}'. Return the count as an integer.", + "output_keys": ["word_count"], + "model": "gpt-4o-mini", + }, + ], + output={"schema": {"sentiment": "string", "word_count": "integer"}}, + ) + + +@pytest.fixture +def filter_config(): + return FilterOp( + name="positive_sentiment_filter", + type="filter", + prompt="Is the sentiment of the following text positive? '{{ input.text }}'. 
Return true if positive, false otherwise.", + model="gpt-4o-mini", + output={"schema": {"filtered": "boolean"}}, + ) + + +@pytest.fixture +def resolve_config(): + return ResolveOp( + name="name_email_resolver", + type="resolve", + blocking_keys=["name", "email"], + blocking_threshold=0.8, + comparison_prompt="Compare the following two entries and determine if they likely refer to the same person: Person 1: {{ input1 }} Person 2: {{ input2 }} Return true if they likely match, false otherwise.", + output={"schema": {"name": "string", "email": "string"}}, + embedding_model="text-embedding-3-small", + comparison_model="gpt-4o-mini", + resolution_model="gpt-4o-mini", + resolution_prompt="Given the following list of similar entries, determine one common name and email. {{ inputs }}", + ) + + +@pytest.fixture +def reduce_sample_data(temp_input_file): + data = [ + {"group": "A", "value": 10}, + {"group": "B", "value": 20}, + {"group": "A", "value": 15}, + {"group": "B", "value": 25}, + ] + with open(temp_input_file, "w") as f: + json.dump(data, f) + return temp_input_file + + +@pytest.fixture +def resolve_sample_data(temp_input_file): + data = [ + {"name": "John Doe"}, + {"name": "Jane Smith"}, + {"name": "Bob Johnson"}, + ] + with open(temp_input_file, "w") as f: + json.dump(data, f) + return temp_input_file + + +@pytest.fixture +def left_data(temp_input_file): + data = [ + {"user_id": "1", "name": "John Doe"}, + {"user_id": "2", "name": "Jane Smith"}, + {"user_id": "3", "name": "Bob Johnson"}, + ] + with open(temp_input_file, "w") as f: + json.dump(data, f) + return temp_input_file + + +@pytest.fixture +def right_data(temp_input_file): + data = [ + {"id": "1", "email": "john@example.com", "age": 30}, + {"id": "2", "email": "jane@example.com", "age": 28}, + {"id": "3", "email": "bob@example.com", "age": 35}, + ] + with open(temp_input_file, "w") as f: + json.dump(data, f) + return temp_input_file + + +def test_pipeline_creation( + map_config, reduce_config, temp_input_file, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[map_config, reduce_config], + steps=[ + PipelineStep( + name="map_step", input="test_input", operations=["sentiment_analysis"] + ), + PipelineStep( + name="reduce_step", input="map_step", operations=["group_summary"] + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + assert isinstance(pipeline, Pipeline) + assert len(pipeline.operations) == 2 + assert len(pipeline.steps) == 2 + + +def test_pipeline_optimization( + map_config, reduce_config, temp_input_file, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[map_config, reduce_config], + steps=[ + PipelineStep( + name="map_step", input="test_input", operations=["sentiment_analysis"] + ), + PipelineStep( + name="reduce_step", input="map_step", operations=["group_summary"] + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + optimized_pipeline = pipeline.optimize(max_threads=4, model="gpt-4o", timeout=10) + + assert isinstance(optimized_pipeline, Pipeline) + assert len(optimized_pipeline.operations) == len(pipeline.operations) + assert len(optimized_pipeline.steps) == 
len(pipeline.steps) + + +def test_pipeline_execution( + map_config, temp_input_file, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[map_config], + steps=[ + PipelineStep( + name="map_step", input="test_input", operations=["sentiment_analysis"] + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + cost = pipeline.run(max_threads=4) + + assert isinstance(cost, float) + assert cost > 0 + + +def test_parallel_map_pipeline( + parallel_map_config, temp_input_file, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[parallel_map_config], + steps=[ + PipelineStep( + name="parallel_map_step", + input="test_input", + operations=["sentiment_and_word_count"], + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + cost = pipeline.run(max_threads=4) + + assert isinstance(cost, float) + assert cost > 0 + + +def test_filter_pipeline( + filter_config, temp_input_file, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=temp_input_file)}, + operations=[filter_config], + steps=[ + PipelineStep( + name="filter_step", + input="test_input", + operations=["positive_sentiment_filter"], + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + cost = pipeline.run(max_threads=4) + + assert isinstance(cost, float) + assert cost > 0 + + +def test_reduce_pipeline( + reduce_config, reduce_sample_data, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=reduce_sample_data)}, + operations=[reduce_config], + steps=[ + PipelineStep( + name="reduce_step", input="test_input", operations=["group_summary"] + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + cost = pipeline.run(max_threads=4) + + assert isinstance(cost, float) + assert cost > 0 + + +def test_resolve_pipeline( + resolve_config, resolve_sample_data, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={"test_input": Dataset(type="file", path=resolve_sample_data)}, + operations=[resolve_config], + steps=[ + PipelineStep( + name="resolve_step", + input="test_input", + operations=["name_email_resolver"], + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + cost = pipeline.run(max_threads=4) + + assert isinstance(cost, float) + assert cost > 0 + + +def test_equijoin_pipeline( + left_data, right_data, temp_output_file, temp_intermediate_dir +): + pipeline = Pipeline( + name="test_pipeline", + datasets={ + "left": Dataset(type="file", path=left_data), + "right": Dataset(type="file", path=right_data), + }, + operations=[ + EquijoinOp( + name="user_data_join", + type="equijoin", + left="left", + right="right", + comparison_prompt="Compare the following two entries and determine if they are the same 
id: Left: {{ left.id }} Right: {{ right.user_id }}", + embedding_model="text-embedding-3-small", + comparison_model="gpt-4o-mini", + ) + ], + steps=[ + PipelineStep( + name="equijoin_step", + operations=[ + { + "user_data_join": { + "left": "left", + "right": "right", + } + } + ], + ), + ], + output=PipelineOutput( + type="file", path=temp_output_file, intermediate_dir=temp_intermediate_dir + ), + default_model="gpt-4o-mini", + ) + + cost = pipeline.run(max_threads=4) + + assert isinstance(cost, float) + assert cost > 0 diff --git a/tests/test_config.py b/tests/test_config.py index bac7ae29..65102948 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -86,7 +86,7 @@ def test_end_to_end_pipeline(config_file, sample_data_file, tmp_path): yaml.dump(config, f) # Create and run the DSLRunner - runner = DSLRunner(str(config_file)) + runner = DSLRunner.from_yaml(str(config_file)) total_cost = runner.run() # Check if the output file was created diff --git a/tests/test_synth_gather.py b/tests/test_synth_gather.py index 8168a903..277b6a85 100644 --- a/tests/test_synth_gather.py +++ b/tests/test_synth_gather.py @@ -102,7 +102,7 @@ def test_synth_gather(config_yaml): config_path, long_documents_path, output_path = config_yaml # Initialize the optimizer - optimizer = Optimizer(config_path) + optimizer = Optimizer.from_yaml(config_path) # Run the optimization optimizer.optimize() @@ -111,7 +111,11 @@ def test_synth_gather(config_yaml): synthesized_gather_found = False for step in optimizer.optimized_config["pipeline"]["steps"]: for op in step["operations"]: - synthesized_op = optimizer.optimized_config["operations"][op] + synthesized_op = [ + operation + for operation in optimizer.optimized_config["operations"] + if operation["name"] == op + ][0] if synthesized_op.get("type") == "gather": synthesized_gather_found = True diff --git a/tests/test_synth_resolve.py b/tests/test_synth_resolve.py index 98a27826..ecdd4321 100644 --- a/tests/test_synth_resolve.py +++ b/tests/test_synth_resolve.py @@ -76,10 +76,11 @@ def config_yaml(sample_data): def test_synth_resolve(config_yaml): # Initialize the optimizer - optimizer = Optimizer(config_yaml) + optimizer = Optimizer.from_yaml(config_yaml) # Run the optimization optimizer.optimize() + optimizer.save_optimized_config() # Check if a resolve operation was synthesized synthesized_resolve_found = False @@ -91,7 +92,7 @@ def test_synth_resolve(config_yaml): operation for operation in optimizer.optimized_config["operations"] if operation["name"] == op - ] + ][0] # Check if the synthesized operation has the correct properties assert synthesized_op["type"] == "resolve" From b981893d840ac42124b62234c8c5089087df9c7a Mon Sep 17 00:00:00 2001 From: Shreya Shankar Date: Sat, 21 Sep 2024 22:38:02 -0700 Subject: [PATCH 3/3] Add to docs --- docs/python-api.md | 85 ++++++++++++++++++++++++++++++++++++++++++++++ mkdocs.yml | 1 + 2 files changed, 86 insertions(+) create mode 100644 docs/python-api.md diff --git a/docs/python-api.md b/docs/python-api.md new file mode 100644 index 00000000..386346e4 --- /dev/null +++ b/docs/python-api.md @@ -0,0 +1,85 @@ +# Python API + +The DocETL Python API provides a programmatic way to define, optimize, and run document processing pipelines. This approach offers an alternative to the YAML configuration method, allowing for more dynamic and flexible pipeline construction. + +## Overview + +The Python API consists of several classes: + +- Pipeline: The main class for defining and running a complete document processing pipeline. 
+- Dataset: Represents a dataset with a type and path. +- Various operation classes (e.g., MapOp, ReduceOp, FilterOp) for different types of data processing steps. +- PipelineStep: Represents a step in the pipeline with input and operations. +- PipelineOutput: Defines the output configuration for the pipeline. + +## Example Usage + +Here's an example of how to use the Python API to create and run a simple document processing pipeline: + +```python +from docetl.api import Pipeline, Dataset, MapOp, ReduceOp, PipelineStep, PipelineOutput + +# Define datasets +datasets = { + "input": Dataset(type="file", path="input.json") +} + +# Define operations +operations = [ + MapOp( + name="process", + type="map", + prompt="Process the document", + output={"schema": {"processed_content": "string"}} + ), + ReduceOp( + name="summarize", + type="reduce", + reduce_key="processed_content", + prompt="Summarize the processed content", + output={"schema": {"summary": "string"}} + ) +] + +# Define pipeline steps +steps = [ + PipelineStep(name="process_step", input="input", operations=["process"]), + PipelineStep(name="summarize_step", input="process_step", operations=["summarize"]) +] + +# Define pipeline output +output = PipelineOutput(type="file", path="output.json") + +# Create the pipeline +pipeline = Pipeline( + name="example_pipeline", + datasets=datasets, + operations=operations, + steps=steps, + output=output, + default_model="gpt-4o-mini" +) + +# Optimize the pipeline +optimized_pipeline = pipeline.optimize() + +# Run the optimized pipeline +result = optimized_pipeline.run() # Saves the result to the output path + +print(f"Pipeline execution completed. Total cost: ${result:.2f}") +``` + +This example demonstrates how to create a simple pipeline that processes input documents and then summarizes the processed content. The pipeline is optimized before execution to improve performance. + +## API Reference + +For a complete reference of all available classes and their methods, please refer to the [Python API Reference](api-reference/python.md). + +The API Reference provides detailed information about each class, including: + +- Available parameters +- Method signatures +- Return types +- Usage examples + +By using the Python API, you can create more complex and dynamic pipelines that can adapt to your specific document processing needs. diff --git a/mkdocs.yml b/mkdocs.yml index 7742e0a2..34f79b8c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -40,6 +40,7 @@ nav: - Overview: optimization/overview.md - Example: optimization/example.md - Configuration: optimization/configuration.md + - Python API: python-api.md # - Advanced Usage: # - User-Defined Functions: advanced/custom-operators.md # - Extending Optimizer Agents: advanced/extending-agents.md
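Beyond the map/reduce example in the new docs page, patch 2's reworked `EquijoinOp` (explicit `left`/`right` dataset names) pairs with the now-optional `PipelineStep.input`: an equijoin step omits `input` and instead lists the operation as a dict that wires it to its two datasets, as in `test_equijoin_pipeline` above. A minimal sketch of that shape follows; the dataset names, file paths, and record fields are hypothetical.

```python
# Sketch of the equijoin wiring enabled by patch 2 (names/paths hypothetical).
from docetl.api import (
    Dataset,
    EquijoinOp,
    Pipeline,
    PipelineOutput,
    PipelineStep,
)

pipeline = Pipeline(
    name="join_example",
    datasets={
        "users": Dataset(type="file", path="users.json"),
        "profiles": Dataset(type="file", path="profiles.json"),
    },
    operations=[
        EquijoinOp(
            name="join_users",
            type="equijoin",
            left="users",
            right="profiles",
            comparison_prompt=(
                "Do these two records refer to the same person? "
                "Left: {{ left.name }} Right: {{ right.name }}"
            ),
        )
    ],
    steps=[
        # No single `input` here: the dict entry maps the op name to the
        # datasets feeding its left and right sides.
        PipelineStep(
            name="join_step",
            operations=[{"join_users": {"left": "users", "right": "profiles"}}],
        )
    ],
    output=PipelineOutput(type="file", path="joined.json"),
    default_model="gpt-4o-mini",
)

cost = pipeline.run(max_threads=4)  # returns the total LLM cost as a float
```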