feat: add basic llm call observability to the UI (#209)
* feat: add basic llm call observability to the UI

* test all observability for ops

* rebase with main

* edit readme
shreyashankar authored Nov 22, 2024
1 parent 7ab9485 commit 0a4bcc7
Showing 8 changed files with 224 additions and 84 deletions.
25 changes: 13 additions & 12 deletions README.md
@@ -22,6 +22,19 @@ DocETL is the ideal choice when you're looking to maximize correctness and outpu
- You're working with long documents that don't fit into a single prompt
- You have validation criteria and want tasks to automatically retry when validation fails

### Community Projects

- [Conversation Generator](https://github.com/PassionFruits-net/docetl-conversation)
- [Text-to-speech](https://github.com/PassionFruits-net/docetl-speaker)
- [YouTube Transcript Topic Extraction](https://github.com/rajib76/docetl_examples)

### Educational Resources

- [UI/UX Thoughts](https://x.com/sh_reya/status/1846235904664273201)
- [Using Gleaning to Improve Output Quality](https://x.com/sh_reya/status/1843354256335876262)
- [Deep Dive on Resolve Operator](https://x.com/sh_reya/status/1840796824636121288)


## Getting Started

There are two main ways to use DocETL:
@@ -161,15 +174,3 @@ make tests-basic # Runs basic test suite (costs < $0.01 with OpenAI)
```

For detailed documentation and tutorials, visit our [documentation](https://ucbepic.github.io/docetl).

## Community Projects

- [Conversation Generator](https://github.com/PassionFruits-net/docetl-conversation)
- [Text-to-speech](https://github.com/PassionFruits-net/docetl-speaker)
- [YouTube Transcript Topic Extraction](https://github.com/rajib76/docetl_examples)

## Educational Resources

- [UI/UX Thoughts](https://x.com/sh_reya/status/1846235904664273201)
- [Using Gleaning to Improve Output Quality](https://x.com/sh_reya/status/1843354256335876262)
- [Deep Dive on Resolve Operator](https://x.com/sh_reya/status/1840796824636121288)
14 changes: 12 additions & 2 deletions docetl/operations/map.py
@@ -46,6 +46,7 @@ class schema(BaseOperation.schema):
gleaning: Optional[Dict[str, Any]] = None
drop_keys: Optional[List[str]] = None
timeout: Optional[int] = None
enable_observability: bool = False
batch_size: Optional[int] = None
clustering_method: Optional[str] = None
batch_prompt: Optional[str] = None
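
The schema change above is the whole opt-in surface: `enable_observability` defaults to `False`, so existing pipelines keep their current output shape. A minimal sketch of a map operation config that turns it on follows; the operation name, prompt, and output schema here are hypothetical, not from this commit:

```python
# Hypothetical map operation config; enable_observability is the only new field.
map_config = {
    "name": "extract_topics",  # made-up operation name
    "type": "map",
    "prompt": "List the main topics in: {{ input.text }}",
    "output": {"schema": {"topics": "list[str]"}},
    "enable_observability": True,  # opt in to per-item prompt capture
}
```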
@@ -228,9 +229,12 @@ def validation_fn(response: Union[Dict[str, Any], ModelResponse]):
)[0]
else:
output = llm_result.response


# Augment the output with the original item
output = {**item, **output}
if self.config.get("enable_observability", False):
output[f"_observability_{self.config['name']}"] = {"prompt": prompt}
return output, llm_result.total_cost

return None, llm_result.total_cost
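
With the flag set, each output row is the original item merged with the LLM output plus one reserved key. A sketch of the resulting shape, assuming a map op named `extract_topics` (illustrative field names and values only):

```python
# Illustrative output row; field names and values are made up.
example_row = {
    "text": "original input fields pass through unchanged",
    "topics": ["pricing", "support"],  # keys from the LLM output schema
    "_observability_extract_topics": {
        # the fully rendered prompt that was sent to the LLM for this item
        "prompt": "List the main topics in: original input fields pass through unchanged",
    },
}
```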
@@ -317,6 +321,7 @@ class schema(BaseOperation.schema):
type: str = "parallel_map"
prompts: List[Dict[str, Any]]
output: Dict[str, Any]
enable_observability: bool = False

def __init__(
self,
@@ -471,7 +476,7 @@ def process_prompt(item, prompt_config):
tools=prompt_config.get("tools", None),
manually_fix_errors=self.manually_fix_errors,
)[0]
return output, response.total_cost
return output, prompt, response.total_cost

with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
if "prompts" in self.config:
@@ -488,7 +493,7 @@ def process_prompt(item, prompt_config):
desc="Processing parallel map items",
):
future = all_futures[i]
output, cost = future.result()
output, prompt, cost = future.result()
total_cost += cost

# Determine which item this future corresponds to
@@ -503,6 +508,11 @@ def process_prompt(item, prompt_config):
# Fetch the item_result
item_result = results[item_index]

if self.config.get("enable_observability", False):
if f"_observability_{self.config['name']}" not in item_result:
item_result[f"_observability_{self.config['name']}"] = {}
item_result[f"_observability_{self.config['name']}"].update({f"prompt_{prompt_index}": prompt})

# Update the item_result with the output
item_result.update(output)
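
For parallel maps, each prompt's rendered text is recorded under its index, so one observability dict accumulates entries across all of the operation's prompts. A sketch of a finished row, assuming a hypothetical parallel map named `annotate` with two prompts:

```python
# Illustrative row; the operation name, keys, and values are assumptions.
example_row = {
    "sentiment": "positive",  # produced by prompt 0
    "entities": ["DocETL"],   # produced by prompt 1
    "_observability_annotate": {
        "prompt_0": "Classify the sentiment of: ...",
        "prompt_1": "List the named entities in: ...",
    },
}
```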

72 changes: 43 additions & 29 deletions docetl/operations/reduce.py
@@ -53,6 +53,7 @@ class schema(BaseOperation.schema):
verbose: Optional[bool] = None
timeout: Optional[int] = None
litellm_completion_kwargs: Dict[str, Any] = Field(default_factory=dict)
enable_observability: bool = False

def __init__(self, *args, **kwargs):
"""
@@ -386,24 +387,32 @@ def process_group(

# Only execute merge-based plans if associative = True
if "merge_prompt" in self.config and self.config.get("associative", True):
result, cost = self._parallel_fold_and_merge(key, group_list)
result, prompts, cost = self._parallel_fold_and_merge(key, group_list)
elif (
self.config.get("fold_batch_size", None)
and self.config.get("fold_batch_size") >= len(group_list)
):
# If the fold batch size is greater than or equal to the number of items in the group,
# we can just run a single fold operation
result, cost = self._batch_reduce(key, group_list)
result, prompt, cost = self._batch_reduce(key, group_list)
prompts = [prompt]
elif "fold_prompt" in self.config:
result, cost = self._incremental_reduce(key, group_list)
result, prompts, cost = self._incremental_reduce(key, group_list)
else:
result, cost = self._batch_reduce(key, group_list)
result, prompt, cost = self._batch_reduce(key, group_list)
prompts = [prompt]

total_cost += cost

# Add the counts of items in the group to the result
result[f"_counts_prereduce_{self.config['name']}"] = len(group_elems)

if self.config.get("enable_observability", False):
# Add the _observability_{self.config['name']} key to the result
result[f"_observability_{self.config['name']}"] = {
"prompts": prompts
}

# Apply pass-through at the group level
if (
result is not None
@@ -548,7 +557,7 @@ def _parallel_fold_and_merge(
fold_batch_size = self.config["fold_batch_size"]
merge_batch_size = self.config["merge_batch_size"]
total_cost = 0

prompts = []
def calculate_num_parallel_folds():
fold_time, fold_default = self.get_fold_time()
merge_time, merge_default = self.get_merge_time()
@@ -589,8 +598,9 @@ def calculate_num_parallel_folds():

new_fold_results = []
for future in as_completed(fold_futures):
result, cost = future.result()
result, prompt, cost = future.result()
total_cost += cost
prompts.append(prompt)
if result is not None:
new_fold_results.append(result)
if self.config.get("persist_intermediates", False):
@@ -620,8 +630,9 @@ def calculate_num_parallel_folds():

new_results = []
for future in as_completed(merge_futures):
result, cost = future.result()
result, prompt, cost = future.result()
total_cost += cost
prompts.append(prompt)
if result is not None:
new_results.append(result)
if self.config.get("persist_intermediates", False):
@@ -660,8 +671,9 @@ def calculate_num_parallel_folds():

new_results = []
for future in as_completed(merge_futures):
result, cost = future.result()
result, prompt, cost = future.result()
total_cost += cost
prompts.append(prompt)
if result is not None:
new_results.append(result)
if self.config.get("persist_intermediates", False):
@@ -676,11 +688,11 @@ def calculate_num_parallel_folds():

fold_results = new_results

return (fold_results[0], total_cost) if fold_results else (None, total_cost)
return (fold_results[0], prompts, total_cost) if fold_results else (None, prompts, total_cost)

def _incremental_reduce(
self, key: Tuple, group_list: List[Dict]
) -> Tuple[Optional[Dict], float]:
) -> Tuple[Optional[Dict], List[str], float]:
"""
Perform an incremental reduce operation on a group of items.
@@ -691,12 +703,13 @@ def _incremental_reduce(
group_list (List[Dict]): The list of items in the group to be processed.
Returns:
Tuple[Optional[Dict], float]: A tuple containing the final reduced result (or None if processing failed)
and the total cost of the operation.
Tuple[Optional[Dict], List[str], float]: A tuple containing the final reduced result (or None if processing failed),
the list of prompts used, and the total cost of the operation.
"""
fold_batch_size = self.config["fold_batch_size"]
total_cost = 0
current_output = None
prompts = []

# Calculate and log the number of folds to be performed
num_folds = (len(group_list) + fold_batch_size - 1) // fold_batch_size
@@ -715,10 +728,11 @@ def _incremental_reduce(
)
batch = group_list[i : i + fold_batch_size]

folded_output, fold_cost = self._increment_fold(
folded_output, prompt, fold_cost = self._increment_fold(
key, batch, current_output, scratchpad
)
total_cost += fold_cost
prompts.append(prompt)

if folded_output is None:
continue
@@ -744,7 +758,7 @@ def _incremental_reduce(

current_output = folded_output

return current_output, total_cost
return current_output, prompts, total_cost

def validation_fn(self, response: Dict[str, Any]):
output = self.runner.api.parse_llm_response(
@@ -761,7 +775,7 @@ def _increment_fold(
batch: List[Dict],
current_output: Optional[Dict],
scratchpad: Optional[str] = None,
) -> Tuple[Optional[Dict], float]:
) -> Tuple[Optional[Dict], str, float]:
"""
Perform an incremental fold operation on a batch of items.
@@ -773,8 +787,8 @@ def _increment_fold(
current_output (Optional[Dict]): The current accumulated output, if any.
scratchpad (Optional[str]): The scratchpad to use for the fold operation.
Returns:
Tuple[Optional[Dict], float]: A tuple containing the folded output (or None if processing failed)
and the cost of the fold operation.
Tuple[Optional[Dict], str, float]: A tuple containing the folded output (or None if processing failed),
the prompt used, and the cost of the fold operation.
"""
if current_output is None:
return self._batch_reduce(key, batch, scratchpad)
@@ -822,13 +836,13 @@ def _increment_fold(
folded_output.update(dict(zip(self.config["reduce_key"], key)))
fold_cost = response.total_cost

return folded_output, fold_cost
return folded_output, fold_prompt, fold_cost

return None, fold_cost
return None, fold_prompt, fold_cost

def _merge_results(
self, key: Tuple, outputs: List[Dict]
) -> Tuple[Optional[Dict], float]:
) -> Tuple[Optional[Dict], str, float]:
"""
Merge multiple outputs into a single result.
@@ -839,8 +853,8 @@ def _merge_results(
outputs (List[Dict]): The list of outputs to be merged.
Returns:
Tuple[Optional[Dict], float]: A tuple containing the merged output (or None if processing failed)
and the cost of the merge operation.
Tuple[Optional[Dict], str, float]: A tuple containing the merged output (or None if processing failed),
the prompt used, and the cost of the merge operation.
"""
start_time = time.time()
merge_prompt_template = Template(self.config["merge_prompt"])
@@ -879,9 +893,9 @@ def _merge_results(
)[0]
merged_output.update(dict(zip(self.config["reduce_key"], key)))
merge_cost = response.total_cost
return merged_output, merge_cost
return merged_output, merge_prompt, merge_cost

return None, merge_cost
return None, merge_prompt, merge_cost

def get_fold_time(self) -> Tuple[float, bool]:
"""
@@ -935,7 +949,7 @@ def _update_merge_time(self, time: float) -> None:

def _batch_reduce(
self, key: Tuple, group_list: List[Dict], scratchpad: Optional[str] = None
) -> Tuple[Optional[Dict], float]:
) -> Tuple[Optional[Dict], str, float]:
"""
Perform a batch reduce operation on a group of items.
@@ -946,8 +960,8 @@ def _batch_reduce(
group_list (List[Dict]): The list of items to be reduced.
scratchpad (Optional[str]): The scratchpad to use for the reduce operation.
Returns:
Tuple[Optional[Dict], float]: A tuple containing the reduced output (or None if processing failed)
and the cost of the reduce operation.
Tuple[Optional[Dict], str, float]: A tuple containing the reduced output (or None if processing failed),
the prompt used, and the cost of the reduce operation.
"""
prompt_template = Template(self.config["prompt"])
prompt = prompt_template.render(
@@ -988,5 +1002,5 @@ def _batch_reduce(
)[0]
output.update(dict(zip(self.config["reduce_key"], key)))

return output, item_cost
return None, item_cost
return output, prompt, item_cost
return None, prompt, item_cost
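
Taken together, the reduce changes thread every rendered prompt (batch, fold, and merge) back to the caller, and `process_group` stores them under `_observability_<op name>` as a `prompts` list. A sketch of how a caller might inspect them after a run; the op name `summarize_group` and the `results` list are assumptions for illustration, not part of this commit:

```python
# Hypothetical post-run inspection: with enable_observability set, the
# recorded prompts ride along on each reduced row.
results = [
    {
        "summary": "...",
        "_observability_summarize_group": {
            "prompts": ["rendered fold prompt", "rendered merge prompt"],
        },
    },
]
for row in results:
    prompts = row.get("_observability_summarize_group", {}).get("prompts", [])
    for i, prompt in enumerate(prompts):
        print(f"prompt {i}: {prompt[:80]}")
```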