
Commit

Merge branch 'ucbepic:main' into main
staru09 authored Nov 14, 2024
2 parents fc4f372 + 8d9935f commit 8a4a47a
Showing 24 changed files with 1,259 additions and 205 deletions.
92 changes: 91 additions & 1 deletion docetl/builder.py
@@ -475,6 +475,96 @@ def _load_optimized_ops(self):
else:
self.console.log("[yellow]No optimized operations found[/yellow]")

def should_optimize(self, step_name: str, op_name: str) -> Tuple[str, float]:
"""
Determine whether the given operation should be optimized.
We do this by running the pipeline on a sample of the input data up to the target operation and asking the type-specific optimizer to assess its output. Returns the assessment string (empty if no optimization is needed) plus the cost of the sample run.
"""
self.console.rule("[bold cyan]Beginning Pipeline Optimization[/bold cyan]")
self.syntax_check()

self._insert_empty_resolve_operations()

for step in self.config["pipeline"]["steps"]:
self.captured_output.set_step(step.get("name"))
# Go through each operation in the step until we find the one we want to optimize
ops_run = []
op_name_to_object = {name: self.find_operation(name) for name in step["operations"]}
for op_idx, operation in enumerate(step["operations"]):
if isinstance(operation, dict):
operation_name = list(operation.keys())[0]
operation_config = operation[operation_name]
else:
operation_name = operation
operation_config = {}

op_object = self.find_operation(operation_name).copy()
op_object.update(operation_config)
op_object["name"] = operation_name

# Run the pipeline
sample_size = self.compute_sample_size(
step.get("name"), step.get("operations"), op_object
)
input_data = self._run_partial_step(
step, ops_run, sample_size, op_name_to_object
)

# If this is not the operation we want to optimize (and not a synthesized empty resolve), just execute it and record its selectivity
if f"{step.get('name')}/{operation_name}" != f"{step_name}/{op_name}" and not op_object.get("empty", False):
output_data = self._run_operation(op_object, input_data, is_build=True)
self.selectivities[step.get("name")][operation_name] = len(output_data) / len(input_data)
ops_run.append(operation_name)

# if this is the operation we want to optimize, invoke the optimizer's should_optimize method
else:
if op_object.get("type") == "map" or op_object.get("type") == "filter":
# Create instance of map optimizer
map_optimizer = MapOptimizer(
self,
self.config,
self.console,
self.llm_client,
self.max_threads,
self._run_operation,
timeout=self.timeout,
is_filter=op_object.get("type") == "filter",
)
should_optimize_output = map_optimizer.should_optimize(op_object, input_data)
elif op_object.get("type") == "reduce":
reduce_optimizer = ReduceOptimizer(
self.runner,
self.config,
self.console,
self.llm_client,
self.max_threads,
self._run_operation,
)
should_optimize_output = reduce_optimizer.should_optimize(op_object, input_data)
elif op_object.get("type") == "resolve":
resolve_optimizer = JoinOptimizer(
self.runner,
self.config,
op_object,
self.console,
self.llm_client,
self.max_threads,
target_recall=self.config.get("optimizer_config", {})
.get("resolve", {})
.get("target_recall", 0.95),
)
_, should_optimize_output = resolve_optimizer.should_optimize(input_data)

# If should_optimize_output is empty, nothing here needs optimization; move on (e.g., from a synthesized resolve to its reduce operation)
if should_optimize_output == "":
continue

# Return the string and operation cost
return should_optimize_output, self.operations_cost + self.llm_client.total_cost

# Should not get here
raise ValueError("No operation to optimize found")

def optimize(self) -> float:
"""
Optimize the entire pipeline defined in the configuration.
@@ -706,6 +796,7 @@ def _optimize_step(
Raises:
ValueError: If an unsupported operation type is encountered.
"""
self.captured_output.set_step(step.get("name"))
optimized_operations = {}
optimized_operation_names = []
replacement_operations = {} # List from old op name to new ops
@@ -1267,7 +1358,6 @@ def _optimize_map(
Returns:
List[Dict[str, Any]]: The optimized operation configuration.
"""
self.captured_output.set_step(op_config["name"])

map_optimizer = MapOptimizer(
self,
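Taken together, the builder.py changes add a should_optimize(step_name, op_name) entry point that replays the pipeline on a sample up to the target operation and delegates the should-optimize decision to the type-specific optimizer. A rough caller sketch follows; it is not part of this commit, and the Optimizer class name, constructor arguments, and the step/operation names are assumptions.

from docetl.builder import Optimizer  # class name assumed from the module path

# Constructor arguments are illustrative; the real signature may differ.
optimizer = Optimizer(yaml_file="pipeline.yaml", max_threads=4)

# Returns the assessment string (empty if no optimization is needed) and the cost of the sample run.
reasons, cost = optimizer.should_optimize(step_name="summarize", op_name="extract_themes")

if reasons:
    print(f"Optimization recommended (sample run cost ${cost:.4f}):\n{reasons}")
else:
    print("Sample output looks fine; no optimization needed.")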
50 changes: 35 additions & 15 deletions docetl/optimizers/join_optimizer.py
@@ -48,7 +48,7 @@ def __init__(
# f"[yellow]Using estimated selectivity of {self.estimated_selectivity}[/yellow]"
# )

def _analyze_map_prompt_categorization(self, map_prompt: str) -> bool:
def _analyze_map_prompt_categorization(self, map_prompt: str) -> Tuple[bool, str]:
"""
Analyze the map prompt to determine if it's explicitly categorical.
@@ -102,14 +102,14 @@ def _analyze_map_prompt_categorization(self, map_prompt: str) -> bool:
self.console.log(f"Is Categorical: {analysis['is_categorical']}")
self.console.log(f"Explanation: {analysis['explanation']}")

return analysis["is_categorical"].lower() == "yes"
return analysis["is_categorical"].lower() == "yes", analysis["explanation"]

def _determine_duplicate_keys(
self,
input_data: List[Dict[str, Any]],
reduce_key: List[str],
map_prompt: Optional[str] = None,
) -> bool:
) -> Tuple[bool, str]:
# Prepare a sample of the input data for analysis
sample_size = min(10, len(input_data))
data_sample = random.sample(
@@ -156,8 +156,8 @@ def _determine_duplicate_keys(
self.console.log(
"[yellow]Duplicates are likely. Consider using a deduplication strategy in the resolution step.[/yellow]"
)
return True
return False
return True, analysis["explanation"]
return False, ""

def _sample_random_pairs(
self, input_data: List[Dict[str, Any]], n: int
@@ -181,7 +181,7 @@ def _check_duplicates_with_llm(
pairs: List[Tuple[int, int]],
reduce_key: List[str],
map_prompt: Optional[str],
) -> bool:
) -> Tuple[bool, str]:
"""Use LLM to check if any pairs are duplicates."""

content = "Analyze the following pairs of entries and determine if any of them are likely duplicates. Respond with 'Yes' if you find any likely duplicates, or 'No' if none of the pairs seem to be duplicates. Provide a brief explanation for your decision.\n\n"
@@ -222,7 +222,7 @@ def _check_duplicates_with_llm(
f"[bold]Explanation:[/bold] {response['explanation']}"
)

return response["duplicates_found"].lower() == "yes"
return response["duplicates_found"].lower() == "yes", response["explanation"]

def synthesize_compare_prompt(
self, map_prompt: Optional[str], reduce_key: List[str]
@@ -376,13 +376,17 @@ def synthesize_resolution_prompt(
)

return resolution_prompt

def optimize_resolve(
self, input_data: List[Dict[str, Any]]
) -> Tuple[Dict[str, Any], float]:

def should_optimize(self, input_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
"""
Determine if the given operation configuration should be optimized.
"""
# If blocking conditions or the blocking threshold are missing, this resolve should be optimized so they can be synthesized
if not self.op_config.get("blocking_conditions") or not self.op_config.get("blocking_threshold"):
return True, ""

# Check if the operation is marked as empty
if self.op_config.get("empty", False):
elif self.op_config.get("empty", False):
# Extract the map prompt from the intermediates
map_prompt = self.op_config["_intermediates"]["map_prompt"]
reduce_key = self.op_config["_intermediates"]["reduce_key"]
@@ -393,10 +397,11 @@ def optimize_resolve(
)

dedup = True
explanation = "There is a reduce operation that does not follow a resolve operation. Consider adding a resolve operation to deduplicate the data."

if map_prompt:
# Analyze the map prompt
analysis = self._analyze_map_prompt_categorization(map_prompt)
analysis, explanation = self._analyze_map_prompt_categorization(map_prompt)

if analysis:
dedup = False
@@ -410,7 +415,7 @@ def optimize_resolve(
map_prompt = "N/A"

if dedup is False:
dedup = self._determine_duplicate_keys(
dedup, explanation = self._determine_duplicate_keys(
input_data, reduce_key, map_prompt
)

@@ -420,12 +425,27 @@ def optimize_resolve(
sampled_pairs = self._sample_random_pairs(input_data, 20)

# Use LLM to check for duplicates
duplicates_found = self._check_duplicates_with_llm(
duplicates_found, explanation = self._check_duplicates_with_llm(
input_data, sampled_pairs, reduce_key, map_prompt
)

if duplicates_found:
dedup = True

return dedup, explanation

return False, ""

def optimize_resolve(
self, input_data: List[Dict[str, Any]]
) -> Tuple[Dict[str, Any], float]:

# Check if the operation is marked as empty
if self.op_config.get("empty", False):
# Reuse should_optimize to decide whether deduplication is needed
dedup, _ = self.should_optimize(input_data)
reduce_key = self.op_config["_intermediates"]["reduce_key"]
map_prompt = self.op_config["_intermediates"]["map_prompt"]

if dedup is False:
# If no deduplication is needed, return the same config with 0 cost
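The join_optimizer.py changes convert the duplicate-detection helpers to return (decision, explanation) tuples and split the empty/duplicate analysis out of optimize_resolve into a standalone should_optimize. A minimal sketch of that contract, assuming a constructed JoinOptimizer instance and sampled input data (neither shown in this diff):

# `join_optimizer` and `sample_data` are assumed to exist; only the two method calls
# below come from this diff.
needs_dedup, explanation = join_optimizer.should_optimize(sample_data)

if needs_dedup:
    # The explanation surfaces why duplicates are likely (non-categorical map prompt,
    # duplicate reduce keys, or LLM-confirmed duplicate pairs).
    print(f"Resolve/deduplication recommended: {explanation}")
    optimized_config, cost = join_optimizer.optimize_resolve(sample_data)
else:
    print("No deduplication needed; leaving the resolve configuration unchanged.")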
113 changes: 69 additions & 44 deletions docetl/optimizers/map_optimizer/optimizer.py
@@ -86,52 +86,25 @@ def __init__(
runner, llm_client, console, config, max_threads, is_filter
)

def optimize(
self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]:
def should_optimize(self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]]) -> str:
"""
Optimize the given operation configuration for the input data.
This method analyzes the operation and input data, generates various
optimization plans, evaluates them, and returns the best plan along
with its output. A key part of this process is creating a custom
validator prompt for evaluation. The validator prompt is generated
based on the specific task, input data, and output data. It serves
as a critical tool for assessing the quality and correctness of
each optimization plan's output. This custom prompt ensures that
the evaluation is tailored to the unique requirements and nuances
of the given operation. The types of optimization plans include:
1. Improved Prompt Plan: Enhances the original prompt based on evaluation, aiming to improve output quality.
2. Chunk Size Plan: Splits input data into chunks of different sizes,
processes each chunk separately, and then combines the results. This
can improve performance for large inputs.
3. Gleaning Plans: Implements an iterative refinement process where the
output is validated and improved over multiple rounds, enhancing accuracy.
4. Chain Decomposition Plan: Breaks down complex operations into a series
of simpler sub-operations, potentially improving overall performance
and interpretability.
5. Parallel Map Plan: Decomposes the task into subtasks that can be
executed in parallel, potentially speeding up processing for
independent operations.
The method generates these plans, evaluates their performance using
a custom validator, and selects the best performing plan based on
output quality and execution time.
Args:
op_config (Dict[str, Any]): The configuration of the operation to optimize.
input_data (List[Dict[str, Any]]): The input data for the operation.
Returns:
Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]: A tuple containing
the best optimization plan and its output. The plan is a list of
operation configurations that achieve the best performance.
The cost is the cost of the optimizer (from possibly synthesizing resolves).
Determine if the given operation configuration should be optimized.
"""
input_data, output_data, _, _, validator_prompt, assessment, data_exceeds_limit = self._should_optimize_helper(op_config, input_data)
if data_exceeds_limit or assessment.get("needs_improvement", True):
assessment_str = "\n".join(assessment.get("reasons", [])) + "\n\nHere are some improvements that may help:\n" + "\n".join(assessment.get("improvements", []))
if data_exceeds_limit:
assessment_str += "\nAlso, the input data exceeds the token limit."
return assessment_str
else:
return ""


def _should_optimize_helper(self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], int, float, str, Dict[str, Any], bool]:
"""
Determine if the given operation configuration should be optimized.
Create a custom validator prompt and assess the operation's performance
using the validator.
"""
self.console.post_optimizer_status(StageType.SAMPLE_RUN)
input_data = copy.deepcopy(input_data)
@@ -229,6 +202,58 @@ def optimize(
validator_prompt,
)

return input_data, output_data, model_input_context_length, no_change_runtime, validator_prompt, assessment, data_exceeds_limit


def optimize(
self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]:
"""
Optimize the given operation configuration for the input data.
This method analyzes the operation and input data, generates various
optimization plans, evaluates them, and returns the best plan along
with its output. A key part of this process is creating a custom
validator prompt for evaluation. The validator prompt is generated
based on the specific task, input data, and output data. It serves
as a critical tool for assessing the quality and correctness of
each optimization plan's output. This custom prompt ensures that
the evaluation is tailored to the unique requirements and nuances
of the given operation. The types of optimization plans include:
1. Improved Prompt Plan: Enhances the original prompt based on evaluation, aiming to improve output quality.
2. Chunk Size Plan: Splits input data into chunks of different sizes,
processes each chunk separately, and then combines the results. This
can improve performance for large inputs.
3. Gleaning Plans: Implements an iterative refinement process where the
output is validated and improved over multiple rounds, enhancing accuracy.
4. Chain Decomposition Plan: Breaks down complex operations into a series
of simpler sub-operations, potentially improving overall performance
and interpretability.
5. Parallel Map Plan: Decomposes the task into subtasks that can be
executed in parallel, potentially speeding up processing for
independent operations.
The method generates these plans, evaluates their performance using
a custom validator, and selects the best performing plan based on
output quality and execution time.
Args:
op_config (Dict[str, Any]): The configuration of the operation to optimize.
input_data (List[Dict[str, Any]]): The input data for the operation.
Returns:
Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]: A tuple containing
the best optimization plan and its output. The plan is a list of
operation configurations that achieve the best performance.
The cost is the cost of the optimizer (from possibly synthesizing resolves).
"""
input_data, output_data, model_input_context_length, no_change_runtime, validator_prompt, assessment, data_exceeds_limit = self._should_optimize_helper(op_config, input_data)

# Check if improvement is needed based on the assessment
if not data_exceeds_limit and not assessment.get("needs_improvement", True):
self.console.log(
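In map_optimizer/optimizer.py the sample run and validator assessment move into _should_optimize_helper, so the new should_optimize can return just the assessment text while optimize reuses the same helper before searching for plans. A rough sketch of how the two entry points relate, assuming a constructed MapOptimizer instance and sample data:

# `map_optimizer`, `op_config`, and `sample_data` are assumed; the calls and return
# shapes follow the signatures in this diff.
assessment = map_optimizer.should_optimize(op_config, sample_data)

if assessment:
    # A non-empty assessment means the validator found problems (or the input exceeds
    # the model's token limit), so it is worth searching for a better plan.
    best_ops, best_output, cost = map_optimizer.optimize(op_config, sample_data)
else:
    best_ops, best_output, cost = [op_config], None, 0.0  # keep the original operation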
4 changes: 2 additions & 2 deletions docetl/optimizers/map_optimizer/prompt_generators.py
@@ -72,8 +72,8 @@ def _generate_validator_prompt(
Task Prompt: {op_config.get('prompt', 'N/A')}
Based on this information, create a custom validator prompt that will assess how well the original task was performed. The prompt should ask 2 or 3 specific questions about the quality and completeness of the output, such as:
1. Are there any instances of the target information missed?
2. Would the output improve if the input was analyzed more carefully?
1. Recall-oriented; if the prompt asks for all instances of a target information, the validator prompt should ask if all instances were found?
2. Would the output significantly improve if the input was analyzed more carefully?
3. Is the output format correct and consistent?
4. Are there any errors or inconsistencies in the extracted information?
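For illustration only (not part of this commit), a validator prompt generated under the revised guidance might look like the following; the task and expected output format are invented:

# Hypothetical validator prompt; the task ("extract every medication name") is made up.
validator_prompt = """
You are assessing the output of a task that extracts every medication name from a clinical note.
1. Were ALL medication names present in the note extracted, or were any instances missed?
2. Would the extracted list significantly improve if the note were analyzed more carefully?
3. Is the output a consistent JSON list of strings, as required?
"""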
