
Commit

Merge pull request #32 from ucbepic/shreyashankar/dataset
 Add Dataset Class and Parsing Tools
shreyashankar authored Oct 1, 2024
2 parents e8e54cb + 9a82565 commit ab7e87a
Showing 21 changed files with 1,744 additions and 97 deletions.
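For orientation before the per-file diffs: parsing support is configured in two places, a per-dataset "parsing" list and an optional top-level "parsing_tools" list. The sketch below shows a config dict of the shape that the updated Pipeline._update_from_dict (docetl/api.py, below) reads. The dataset keys (type, source, path, parsing) come straight from that method; the fields inside the parsing_tools entry are assumptions, since the ParsingTool model itself is not part of this excerpt.

# Sketch only: dataset keys mirror _update_from_dict(); parsing_tools fields are assumed.
config = {
    "datasets": {
        "input": {
            "type": "file",
            "source": "local",  # key is read verbatim by _update_from_dict; the value here is illustrative
            "path": "input.json",
            "parsing": [
                {"name": "txt_to_string", "input_key": "text", "output_key": "content"}
            ],
        }
    },
    "parsing_tools": [
        # Hypothetical user-defined tool; the field names are an assumption.
        {"name": "my_parser", "function_code": "def my_parser(value):\n    return [value]"}
    ],
    # ...plus the existing "operations", "pipeline", and "default_model" keys.
}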
5 changes: 1 addition & 4 deletions Makefile
@@ -4,10 +4,7 @@ tests:
 	poetry run pytest
 
 tests-basic:
-	poetry run pytest tests/basic/test_basic_map.py
-	poetry run pytest tests/basic/test_basic_reduce_resolve.py
-	poetry run pytest tests/basic/test_basic_parallel_map.py
-	poetry run pytest tests/basic/test_basic_filter_split_gather.py
+	poetry run pytest tests/basic
 
 lint:
 	poetry run ruff check docetl/* --fix
32 changes: 28 additions & 4 deletions docetl/api.py
@@ -5,7 +5,7 @@
 The module provides a high-level API for defining, optimizing, and running document processing pipelines.
 
 Classes:
-    Dataset: Represents a dataset with a type and path.
+    Dataset: Represents a dataset with a type, path, and optional parsing tools.
     BaseOp: Base class for all operation types.
    MapOp: Represents a map operation in the pipeline.
    ResolveOp: Represents a resolve operation for entity resolution.
@@ -27,7 +27,13 @@
     from docetl.api import Pipeline, Dataset, MapOp, ReduceOp
 
     pipeline = Pipeline(
-        datasets={"input": Dataset(type="file", path="input.json")},
+        datasets={
+            "input": Dataset(
+                type="file",
+                path="input.json",
+                parsing=[{"name": "txt_to_string", "input_key": "text", "output_key": "content"}]
+            )
+        },
         operations=[
             MapOp(name="process", type="map", prompt="Process the document"),
             ReduceOp(name="summarize", type="reduce", reduce_key="content")
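To make the new parsing entry above concrete: each entry names a parsing tool, the record key it reads (input_key), and the key it writes (output_key). The implementation of txt_to_string is not part of this diff, so the exact behavior sketched below is an assumption.

# Hypothetical effect of the parsing step above on a single input record, assuming
# txt_to_string loads the referenced text file and stores its contents under output_key.
record_before = {"text": "docs/report.txt"}
record_after = {"text": "docs/report.txt", "content": "full text of report.txt ..."}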
@@ -44,7 +50,7 @@
 """
 
 import os
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
 
 import yaml
 from rich import print
@@ -60,6 +66,7 @@
     ResolveOp,
     SplitOp,
     UnnestOp,
+    ParsingTool,
 )
 
 
@@ -103,6 +110,7 @@ def optimize(
             steps=self.steps,
             output=self.output,
             default_model=self.default_model,
+            parsing_tools=self.parsing_tools,
         )
         updated_pipeline._update_from_dict(optimized_config)
         return updated_pipeline
@@ -161,6 +169,11 @@ def _to_dict(self) -> Dict[str, Any]:
                 "output": self.output.dict(),
             },
             "default_model": self.default_model,
+            "parsing_tools": (
+                [tool.dict() for tool in self.parsing_tools]
+                if self.parsing_tools
+                else None
+            ),
         }
 
     def _update_from_dict(self, config: Dict[str, Any]):
@@ -171,7 +184,13 @@ def _update_from_dict(self, config: Dict[str, Any]):
             config (Dict[str, Any]): Dictionary representation of the Pipeline.
         """
         self.datasets = {
-            name: Dataset(**dataset) for name, dataset in config["datasets"].items()
+            name: Dataset(
+                type=dataset["type"],
+                source=dataset["source"],
+                path=dataset["path"],
+                parsing=dataset.get("parsing"),
+            )
+            for name, dataset in config["datasets"].items()
         }
         self.operations = []
         for op in config["operations"]:
@@ -197,3 +216,8 @@ def _update_from_dict(self, config: Dict[str, Any]):
         self.steps = [PipelineStep(**step) for step in config["pipeline"]["steps"]]
         self.output = PipelineOutput(**config["pipeline"]["output"])
         self.default_model = config.get("default_model")
+        self.parsing_tools = (
+            [ParsingTool(**tool) for tool in config.get("parsing_tools", [])]
+            if config.get("parsing_tools")
+            else []
+        )
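The two hunks above thread ParsingTool through serialization (_to_dict) and deserialization (_update_from_dict), but the model's own fields never appear in this excerpt. A hedged sketch of declaring a custom tool and referencing it from a dataset, assuming ParsingTool carries a name plus the tool's source code:

from docetl.api import Dataset, ParsingTool  # ParsingTool is imported into docetl.api by the hunk above

# Hypothetical custom tool; the field names (name, function_code) are assumptions.
ocr_tool = ParsingTool(
    name="ocr_page",
    function_code=(
        "def ocr_page(path):\n"
        "    # run OCR on the file at `path` and return the extracted text\n"
        "    return ['...extracted text...']\n"
    ),
)

dataset = Dataset(
    type="file",
    path="scans.json",
    # Datasets reference tools by name, mirroring the docstring example above.
    parsing=[{"name": "ocr_page", "input_key": "image_path", "output_key": "page_text"}],
)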
22 changes: 15 additions & 7 deletions docetl/builder.py
@@ -8,6 +8,7 @@
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 import yaml
+from docetl.dataset import Dataset, create_parsing_tool_map
 from rich.console import Console
 from rich.status import Status
 from rich.traceback import install
@@ -139,6 +140,11 @@ def __init__(
         self.samples_taken = defaultdict(dict)
         self.resume = resume
 
+        # create parsing tool map
+        self.parsing_tool_map = create_parsing_tool_map(
+            self.config.get("parsing_tools", None)
+        )
+
         home_dir = os.path.expanduser("~")
         cache_dir = os.path.join(home_dir, f".docetl/cache/{yaml_file_suffix}")
         os.makedirs(cache_dir, exist_ok=True)
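create_parsing_tool_map is imported from docetl.dataset, but its body is outside this excerpt. Judging only from how it is called here (with the raw parsing_tools config, possibly None) and consumed later as user_defined_parsing_tool_map, a plausible minimal shape is a name-to-tool index; treat the sketch below as an assumption, not the actual implementation.

from typing import Any, Dict, List, Optional

def create_parsing_tool_map(parsing_tools: Optional[List[Any]]) -> Dict[str, Any]:
    """Assumed behavior: index user-defined parsing tools by name for later lookup."""
    if not parsing_tools:
        return {}
    return {
        # Entries may arrive as plain config dicts or as ParsingTool objects.
        (tool["name"] if isinstance(tool, dict) else tool.name): tool
        for tool in parsing_tools
    }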
@@ -955,16 +961,18 @@ def _get_sample_data(
         if name_hash and name_hash in self.datasets:
             data = self.datasets[name_hash]
         else:
-            dataset = self.config["datasets"].get(dataset_name)
-            if dataset is None:
+            dataset_config = self.config["datasets"].get(dataset_name)
+            if dataset_config is None:
                 raise ValueError(
                     f"Dataset '{dataset_name}' not found in config or previous steps."
                 )
-            if dataset["type"] == "file":
-                with open(dataset["path"], "r") as f:
-                    data = json.load(f)
-            else:
-                raise ValueError(f"Unsupported dataset type: {dataset['type']}")
+            dataset = Dataset(
+                type=dataset_config["type"],
+                path_or_data=dataset_config["path"],
+                parsing=dataset_config.get("parsing", []),
+                user_defined_parsing_tool_map=self.parsing_tool_map,
+            )
+            data = dataset.load()
 
         if sample_size == float("inf"):
             return data
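The net effect of the hunk above: sampling no longer special-cases type == "file" with a bare json.load; every dataset, including ones with parsing configured, now goes through Dataset.load(). A standalone sketch using only the argument names visible in the call above (the file name and parsing entry are illustrative):

from docetl.dataset import Dataset, create_parsing_tool_map

tool_map = create_parsing_tool_map(None)  # no user-defined tools in this sketch
dataset = Dataset(
    type="file",
    path_or_data="input.json",
    parsing=[{"name": "txt_to_string", "input_key": "text", "output_key": "content"}],
    user_defined_parsing_tool_map=tool_map,
)
records = dataset.load()  # list of dicts, with parsing already applied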
(Diffs for the remaining changed files are not shown in this excerpt.)