diff --git a/docetl/operations/reduce.py b/docetl/operations/reduce.py index 36d0173e..254323a6 100644 --- a/docetl/operations/reduce.py +++ b/docetl/operations/reduce.py @@ -394,11 +394,13 @@ def process_group( ): # If the fold batch size is greater than or equal to the number of items in the group, # we can just run a single fold operation - result, prompts, cost = self._batch_reduce(key, group_list) + result, prompt, cost = self._batch_reduce(key, group_list) + prompts = [prompt] elif "fold_prompt" in self.config: result, prompts, cost = self._incremental_reduce(key, group_list) else: - result, prompts, cost = self._batch_reduce(key, group_list) + result, prompt, cost = self._batch_reduce(key, group_list) + prompts = [prompt] total_cost += cost diff --git a/docetl/operations/resolve.py b/docetl/operations/resolve.py index 489c3a21..bcfec266 100644 --- a/docetl/operations/resolve.py +++ b/docetl/operations/resolve.py @@ -236,6 +236,16 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: if len(input_data) == 0: return [], 0 + # Initialize observability data for all items at the start + if self.config.get("enable_observability", False): + observability_key = f"_observability_{self.config['name']}" + for item in input_data: + if observability_key not in item: + item[observability_key] = { + "comparison_prompts": [], + "resolution_prompt": None + } + blocking_keys = self.config.get("blocking_keys", []) blocking_threshold = self.config.get("blocking_threshold") blocking_conditions = self.config.get("blocking_conditions", []) @@ -340,7 +350,7 @@ def meets_blocking_conditions(pair: Tuple[int, int]) -> bool: is_match(input_data[i], input_data[j]) if blocking_conditions else False ) - blocked_pairs = list(filter(meets_blocking_conditions, comparison_pairs)) + blocked_pairs = list(filter(meets_blocking_conditions, comparison_pairs)) if blocking_conditions else comparison_pairs # Apply limit_comparisons to blocked pairs if limit_comparisons is not None and len(blocked_pairs) > limit_comparisons: @@ -641,6 +651,8 @@ def process_cluster(cluster): for output_key, compare_key in key_mapping.items(): if compare_key in input_data[list(cluster)[0]]: result[output_key] = input_data[list(cluster)[0]][compare_key] + elif output_key in input_data[list(cluster)[0]]: + result[output_key] = input_data[list(cluster)[0]][output_key] else: result[output_key] = None # or some default value diff --git a/website/src/components/ResizableDataTable.tsx b/website/src/components/ResizableDataTable.tsx index abd5b13f..bbfef1b6 100644 --- a/website/src/components/ResizableDataTable.tsx +++ b/website/src/components/ResizableDataTable.tsx @@ -828,7 +828,7 @@ const ObservabilityIndicator = React.memo( >

- LLM Call for {currentOperation} + LLM Call(s) for {currentOperation}

{observabilityEntries.map(([key, value]) => (