From d9454fc0059f916fc10ece47369d57cae1bddbc8 Mon Sep 17 00:00:00 2001 From: Madeesh Kannan Date: Wed, 22 May 2024 11:49:47 +0200 Subject: [PATCH] feat: Implement `PipelinePair` primitive and related utility functions (#4) * feat: Implement `PipelinePair` primitive and related utility functions * Make `PipelinePair._split_input_output_path` a static method * Clarify what `_prepare_reqd_outputs_for_first_pipeline` returns as its output * Add license header * Disable superfluous lints * Fix lints * `mypy` fix --- .../evaluation/pipeline_pair.py | 209 ++++++++++++++++ haystack_experimental/evaluation/util.py | 98 ++++++++ haystack_experimental/testing/__init__.py | 3 + .../testing/sample_components.py | 40 +++ pyproject.toml | 27 +- test/evaluation/__init__.py | 3 + test/evaluation/test_pipeline_pair.py | 236 ++++++++++++++++++ test/evaluation/test_util.py | 54 ++++ 8 files changed, 654 insertions(+), 16 deletions(-) create mode 100644 haystack_experimental/evaluation/pipeline_pair.py create mode 100644 haystack_experimental/evaluation/util.py create mode 100644 haystack_experimental/testing/__init__.py create mode 100644 haystack_experimental/testing/sample_components.py create mode 100644 test/evaluation/__init__.py create mode 100644 test/evaluation/test_pipeline_pair.py create mode 100644 test/evaluation/test_util.py diff --git a/haystack_experimental/evaluation/pipeline_pair.py b/haystack_experimental/evaluation/pipeline_pair.py new file mode 100644 index 00000000..3a30c5ba --- /dev/null +++ b/haystack_experimental/evaluation/pipeline_pair.py @@ -0,0 +1,209 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Set, Tuple + +from haystack import Pipeline + + +@dataclass(frozen=True) +class PipelinePair: + """ + A pair of pipelines that are linked together and executed sequentially. + + :param first: + The first pipeline in the sequence. + :param second: + The second pipeline in the sequence. + :param outputs_to_inputs: + A mapping of the outputs of the first pipeline to the + inputs of the second pipeline in the following format: + `"name_of_component.name_of_output": "name_of_component.name_of_input`. + A single output can be mapped to multiple inputs. + :param map_first_outputs: + A function that post-processes the outputs of the first + pipeline, which it receives as its (only) argument. + :param included_first_outputs: + Names of components in the first pipeline whose outputs + should be included in the final outputs. + :param included_second_outputs: + Names of components in the second pipeline whose outputs + should be included in the final outputs. + """ + + first: Pipeline + second: Pipeline + outputs_to_inputs: Dict[str, List[str]] + map_first_outputs: Optional[Callable] = None + included_first_outputs: Optional[Set[str]] = None + included_second_outputs: Optional[Set[str]] = None + + def __post_init__(self): + first_outputs = self.first.outputs(include_components_with_connected_outputs=True) + second_inputs = self.second.inputs(include_components_with_connected_inputs=True) + seen_second_inputs = set() + + # Validate the mapping of outputs from the first pipeline + # to the inputs of the second pipeline. + for first_out, second_ins in self.outputs_to_inputs.items(): + first_comp_name, first_out_name = self._split_input_output_path(first_out) + if first_comp_name not in first_outputs: + raise ValueError(f"Output component '{first_comp_name}' not found in first pipeline.") + if first_out_name not in first_outputs[first_comp_name]: + raise ValueError( + f"Component '{first_comp_name}' in first pipeline does not have expected output '{first_out_name}'." + ) + + for second_in in second_ins: + if second_in in seen_second_inputs: + raise ValueError( + f"Input '{second_in}' in second pipeline is connected to multiple first pipeline outputs." + ) + + second_comp_name, second_input_name = self._split_input_output_path(second_in) + if second_comp_name not in second_inputs: + raise ValueError(f"Input component '{second_comp_name}' not found in second pipeline.") + if second_input_name not in second_inputs[second_comp_name]: + raise ValueError( + f"Component '{second_comp_name}' in second pipeline " + f"does not have expected input '{second_input_name}'." + ) + seen_second_inputs.add(second_in) + + def _validate_second_inputs(self, inputs: Dict[str, Dict[str, Any]]): + # Check if the connected input is also provided explicitly. + second_connected_inputs = [ + self._split_input_output_path(p_h) for p in self.outputs_to_inputs.values() for p_h in p + ] + for component_name, input_name in second_connected_inputs: + provided_input = inputs.get(component_name) + if provided_input is None: + continue + if input_name in provided_input: + raise ValueError( + f"Second pipeline input '{component_name}.{input_name}' cannot " + "be provided both explicitly and by the first pipeline." + ) + + @staticmethod + def _split_input_output_path(path: str) -> Tuple[str, str]: + # Split the input/output path into component name and input/output name. + pos = path.find(".") + if pos == -1: + raise ValueError( + f"Invalid pipeline i/o path specifier '{path}' - Must be " + "in the following format: ." + ) + return path[:pos], path[pos + 1 :] + + def _prepare_reqd_outputs_for_first_pipeline(self) -> Set[str]: + # To ensure that we have all the outputs from the first + # pipeline that are required by the second pipeline, we + # collect first collect all the keys in the first-to-second + # output-to-input mapping and then add the explicitly included + # first pipeline outputs. + first_components_with_outputs = {self._split_input_output_path(p)[0] for p in self.outputs_to_inputs.keys()} + if self.included_first_outputs is not None: + first_components_with_outputs = first_components_with_outputs.union(self.included_first_outputs) + return first_components_with_outputs + + def _map_first_second_pipeline_io( + self, first_outputs: Dict[str, Dict[str, Any]], second_inputs: Dict[str, Dict[str, Any]] + ) -> Dict[str, Dict[str, Any]]: + # Map the first pipeline outputs to the second pipeline inputs. + for first_output, second_input_candidates in self.outputs_to_inputs.items(): + first_component, first_output = self._split_input_output_path(first_output) + + # Each output from the first pipeline can be mapped to multiple inputs in the second pipeline. + for second_input in second_input_candidates: + second_component, second_input_socket = self._split_input_output_path(second_input) + + second_component_inputs = second_inputs.get(second_component) + if second_component_inputs is not None: + # Pre-condition should've been validated earlier. + assert second_input_socket not in second_component_inputs + # The first pipeline's output should also guaranteed at this point. + second_component_inputs[second_input_socket] = first_outputs[first_component][first_output] + else: + second_inputs[second_component] = { + second_input_socket: first_outputs[first_component][first_output] + } + + return second_inputs + + def run( + self, first_inputs: Dict[str, Dict[str, Any]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None + ) -> Dict[str, Dict[str, Any]]: + """ + Execute the pipeline pair in sequence. + + Invokes the first pipeline and then the second with the outputs + of the former. This assumes that both pipelines have the same input + modality, i.e., the shapes of the first pipeline's outputs match the + shapes of the second pipeline's inputs. + + :param first_inputs: + The inputs to the first pipeline. + :param second_inputs: + The inputs to the second pipeline. + :returns: + A dictionary with the following keys: + - `first` - The outputs of the first pipeline. + - `second` - The outputs of the second pipeline. + """ + second_inputs = second_inputs or {} + self._validate_second_inputs(second_inputs) + + first_outputs = self.first.run( + first_inputs, include_outputs_from=self._prepare_reqd_outputs_for_first_pipeline() + ) + if self.map_first_outputs is not None: + first_outputs = self.map_first_outputs(first_outputs) + second_inputs = self._map_first_second_pipeline_io(first_outputs, second_inputs) + second_outputs = self.second.run(second_inputs, include_outputs_from=self.included_second_outputs) + + return {"first": first_outputs, "second": second_outputs} + + def run_first_as_batch( + self, first_inputs: List[Dict[str, Dict[str, Any]]], second_inputs: Optional[Dict[str, Dict[str, Any]]] = None + ) -> Dict[str, Dict[str, Any]]: + """ + Execute the pipeline pair in sequence. + + Invokes the first pipeline iteratively over the list of inputs and + passing the cumulative outputs to the second pipeline. This is suitable + when the first pipeline has a single logical input-to-output mapping and the + second pipeline expects multiple logical inputs, e.g: a retrieval + pipeline that accepts a single query and returns a list of documents + and an evaluation pipeline that accepts multiple lists of documents + and multiple lists of ground truth data. + + :param first_inputs: + A batch of inputs to the first pipeline. A mapping + function must be provided to aggregate the outputs. + :param second_inputs: + The inputs to the second pipeline. + :returns: + A dictionary with the following keys: + - `first` - The (aggregate) outputs of the first pipeline. + - `second` - The outputs of the second pipeline. + """ + second_inputs = second_inputs or {} + self._validate_second_inputs(second_inputs) + + first_components_with_outputs = self._prepare_reqd_outputs_for_first_pipeline() + if self.map_first_outputs is None: + raise ValueError("Mapping function for first pipeline outputs must be provided for batch execution.") + + first_outputs: Dict[str, Dict[str, Any]] = self.map_first_outputs( + [self.first.run(i, include_outputs_from=first_components_with_outputs) for i in first_inputs] + ) + if not isinstance(first_outputs, dict): + raise ValueError("Mapping function must return an aggregate dictionary of outputs.") + + second_inputs = self._map_first_second_pipeline_io(first_outputs, second_inputs) + second_outputs = self.second.run(second_inputs, include_outputs_from=self.included_second_outputs) + + return {"first": first_outputs, "second": second_outputs} diff --git a/haystack_experimental/evaluation/util.py b/haystack_experimental/evaluation/util.py new file mode 100644 index 00000000..fe48516b --- /dev/null +++ b/haystack_experimental/evaluation/util.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from copy import deepcopy +from typing import Any, Dict, List + + +def aggregate_batched_pipeline_outputs(outputs: List[Dict[str, Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]: + """ + Combine the outputs of a pipeline that has been executed iteratively over a batch of inputs. + + Performs a transpose operation on the first and the third dimensions of the outputs. + + :param outputs: + A list of outputs from the pipeline, where each output + is a dictionary with the same keys and values with the + same keys. + :returns: + The combined outputs. + """ + # The pipeline is invoked iteratively over a batch of inputs, such + # that each element in the outputs corresponds to a single element in + # the batch input. + if len(outputs) == 0: + return {} + if len(outputs) == 1: + return outputs[0] + + # We'll use the first output as a sentinel to determine + # if the shape of the rest of the outputs are the same. + sentinel = outputs[0] + for output in outputs[1:]: + if output.keys() != sentinel.keys(): + raise ValueError(f"Expected components '{list(sentinel.keys())}' " f"but got '{list(output.keys())}'") + + for component_name, expected in sentinel.items(): + got = output[component_name] + if got.keys() != expected.keys(): + raise ValueError( + f"Expected outputs from component '{component_name}' to have " + f"keys '{list(expected.keys())}' but got '{list(got.keys())}'" + ) + + # The outputs are of the correct/same shape. Now to transpose + # the outermost list with the innermost dictionary. + transposed: Dict[str, Dict[str, Any]] = {} + for k, v in sentinel.items(): + transposed[k] = {k_h: [] for k_h in v.keys()} + + for output in outputs: + for component_name, component_outputs in output.items(): + dest = transposed[component_name] + for output_name, output_value in component_outputs.items(): + dest[output_name].append(output_value) + + return transposed + + +def deaggregate_batched_pipeline_inputs(inputs: Dict[str, Dict[str, List[Any]]]) -> List[Dict[str, Dict[str, Any]]]: + """ + Separate the inputs of a pipeline that has been batched along its innermost dimension. + + Performs a transpose operation on the first and the third dimensions of the inputs. + + :param inputs: + A dictionary of pipeline inputs that maps + component-input pairs to lists of values. + :returns: + The separated inputs. + """ + if len(inputs) == 0: + return [] + + # First component's inputs + sentinel = next(iter(inputs.values())) + # First component's first input's values + sentinel = next(iter(sentinel.values())) # type: ignore + + for component_name, component_inputs in inputs.items(): + for input_name, input_values in component_inputs.items(): + if len(input_values) != len(sentinel): + raise ValueError( + f"Expected input '{component_name}.{input_name}' to have " + f"{len(sentinel)} values but got {len(input_values)}" + ) + + proto = {k: {k_h: None for k_h in v.keys()} for k, v in inputs.items()} + transposed: List[Dict[str, Dict[str, Any]]] = [] + + for i in range(len(sentinel)): + new_dict = deepcopy(proto) + for component_name, component_inputs in inputs.items(): + for input_name, input_values in component_inputs.items(): + new_dict[component_name][input_name] = input_values[i] + transposed.append(new_dict) + + return transposed diff --git a/haystack_experimental/testing/__init__.py b/haystack_experimental/testing/__init__.py new file mode 100644 index 00000000..c1764a6e --- /dev/null +++ b/haystack_experimental/testing/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/haystack_experimental/testing/sample_components.py b/haystack_experimental/testing/sample_components.py new file mode 100644 index 00000000..60e8d33a --- /dev/null +++ b/haystack_experimental/testing/sample_components.py @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import List, Optional + +from haystack.core.component import component + + +@component +class AddFixedValueBatch: + """ + Adds two values together. + """ + + def __init__(self, add: int = 1): + self.add = add + + @component.output_types(result=List[int]) + def run(self, value: List[int], add: Optional[List[int]] = None): + """ + Adds two values together. + """ + if add is None: + add = [self.add] * len(value) + return {"result": [v + a for v, a in zip(value, add)]} + + +@component +class DoubleBatch: + """ + Doubles the input value. + """ + + @component.output_types(value=List[int]) + def run(self, value: List[int]): + """ + Doubles the input value. + """ + return {"value": [v * 2 for v in value]} diff --git a/pyproject.toml b/pyproject.toml index 547d69d7..01ee888b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "haystack-experimental" dynamic = ["version"] description = "Experimental components and features for the Haystack LLM framework." readme = "README.md" -license = {text = "Apache-2.0"} +license = { text = "Apache-2.0" } requires-python = ">=3.8" authors = [{ name = "deepset.ai", email = "info@deepset.ai" }] classifiers = [ @@ -25,9 +25,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", "Topic :: Scientific/Engineering :: Artificial Intelligence", ] -dependencies = [ - "haystack-ai", -] +dependencies = ["haystack-ai"] [project.urls] "CI: GitHub" = "https://github.com/deepset-ai/haystack-experimental/actions" "GitHub: issues" = "https://github.com/deepset-ai/haystack-experimental/issues" @@ -60,17 +58,11 @@ lint = [ "pylint -ry -j 0 {args:haystack_experimental}", ] test-cov = "coverage run -m pytest {args:test}" -cov-report = [ - "- coverage combine", - "coverage xml", -] -cov = [ - "test-cov", - "cov-report", -] +cov-report = ["- coverage combine", "coverage xml"] +cov = ["test-cov", "cov-report"] [tool.hatch.envs.readme] -detached = true # To avoid installing the dependencies from the default environment +detached = true # To avoid installing the dependencies from the default environment dependencies = ["haystack-pydoc-tools"] [tool.hatch.envs.readme.scripts] @@ -99,15 +91,18 @@ ignore-paths = [ "haystack_experimental/__init__.py", "haystack_experimental/version.py", ] + [tool.pylint.'MESSAGES CONTROL'] max-line-length = 120 +disable = [ + "C0114", # missing-module-docstring + "R0903", # too-few-public-methods +] [tool.pytest.ini_options] minversion = "6.0" addopts = "--strict-markers" -markers = [ - "integration: integration tests", -] +markers = ["integration: integration tests"] log_cli = true [tool.mypy] diff --git a/test/evaluation/__init__.py b/test/evaluation/__init__.py new file mode 100644 index 00000000..c1764a6e --- /dev/null +++ b/test/evaluation/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/test/evaluation/test_pipeline_pair.py b/test/evaluation/test_pipeline_pair.py new file mode 100644 index 00000000..16007f8a --- /dev/null +++ b/test/evaluation/test_pipeline_pair.py @@ -0,0 +1,236 @@ +import pytest + +from haystack import Pipeline +from haystack_experimental.evaluation.pipeline_pair import PipelinePair +from haystack_experimental.evaluation.util import aggregate_batched_pipeline_outputs + +from haystack.testing.sample_components import AddFixedValue, Double +from haystack_experimental.testing.sample_components import AddFixedValueBatch, DoubleBatch + + +@pytest.fixture +def first_pipeline(): + first = Pipeline() + first.add_component("first_addition", AddFixedValue(add=10)) + first.add_component("second_addition", AddFixedValue(add=100)) + first.add_component("double", Double()) + first.connect("first_addition", "double") + first.connect("double", "second_addition") + return first + + +@pytest.fixture +def second_pipeline(): + second = Pipeline() + second.add_component("first_addition", AddFixedValue(add=1)) + second.add_component("second_addition", AddFixedValue(add=2)) + second.add_component("double", Double()) + second.connect("first_addition", "double") + second.connect("double", "second_addition") + return second + + +@pytest.fixture +def second_pipeline_batched(): + second = Pipeline() + second.add_component("first_addition", AddFixedValueBatch(add=1)) + second.add_component("second_addition", AddFixedValueBatch(add=2)) + second.add_component("double", DoubleBatch()) + second.connect("first_addition", "double") + second.connect("double", "second_addition") + return second + + +def test_pipeline_pair_init(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + ) + + +def test_pipeline_pair_invalid_io_specifier(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Invalid pipeline i/o path specifier"): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"nonexistent": ["nonexistent"]} + ) + + +def test_pipeline_pair_invalid_first_component(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Output component .* not found in first pipeline."): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"nonexistent.out": ["nonexistent.in"]} + ) + + +def test_pipeline_pair_invalid_first_component_output(first_pipeline, second_pipeline): + + with pytest.raises(ValueError, match="Component .* in first pipeline does not have expected output"): + _ = PipelinePair( + first=first_pipeline, second=second_pipeline, outputs_to_inputs={"double.out": ["nonexistent.in"]} + ) + + +def test_pipeline_pair_invalid_second_component(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Input component .* not found in second pipeline."): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["nonexistent.in"]}, + ) + + +def test_pipeline_pair_invalid_second_component_input(first_pipeline, second_pipeline): + with pytest.raises(ValueError, match="Component .* in second pipeline does not have expected input"): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["second_addition.some_input"]}, + ) + + +def test_pipeline_pair_invalid_second_multiple_inputs(first_pipeline, second_pipeline): + with pytest.raises( + ValueError, match="Input .* in second pipeline is connected to multiple first pipeline outputs." + ): + _ = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={ + "first_addition.result": ["second_addition.value"], + "second_addition.result": ["second_addition.value"], + }, + ) + + +def test_pipeline_pair_run(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"double"}, + ) + + results = pair.run({"first_addition": {"value": 1}}) + assert results == { + "first": {"first_addition": {"result": 11}, "second_addition": {"result": 122}}, + "second": {"double": {"value": 24}, "second_addition": {"result": 26}}, + } + + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value", "first_addition.add"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + results = pair.run({"first_addition": {"value": 10}}) + assert results == { + "first": {"first_addition": {"result": 20}, "second_addition": {"result": 140}}, + "second": {"first_addition": {"result": 40}, "double": {"value": 80}, "second_addition": {"result": 82}}, + } + + +def test_pipeline_pair_run_second_extra_inputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + results = pair.run( + first_inputs={"first_addition": {"value": 1}}, + second_inputs={"first_addition": {"add": 10}, "second_addition": {"add": 100}}, + ) + assert results == { + "first": {"first_addition": {"result": 11}, "second_addition": {"result": 122}}, + "second": {"first_addition": {"result": 21}, "double": {"value": 42}, "second_addition": {"result": 142}}, + } + + +def test_pipeline_pair_run_invalid_second_extra_inputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + ) + + with pytest.raises( + ValueError, match="Second pipeline input .* cannot be provided both explicitly and by the first pipeline" + ): + results = pair.run( + first_inputs={"first_addition": {"value": 1}}, second_inputs={"first_addition": {"value": 10}} + ) + + +def test_pipeline_pair_run_map_first_outputs(first_pipeline, second_pipeline): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline, + outputs_to_inputs={"first_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"double"}, + map_first_outputs=lambda x: {"first_addition": {"result": 0}, "second_addition": {"result": 0}}, + ) + + results = pair.run({"first_addition": {"value": 1}}) + assert results == { + "first": {"first_addition": {"result": 0}, "second_addition": {"result": 0}}, + "second": {"double": {"value": 2}, "second_addition": {"result": 4}}, + } + + +def test_pipeline_pair_run_first_as_batch(first_pipeline, second_pipeline_batched): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline_batched, + outputs_to_inputs={"second_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + map_first_outputs=lambda x: aggregate_batched_pipeline_outputs(x), + ) + + results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)]) + assert results == { + "first": { + "first_addition": {"result": [10, 11, 12, 13, 14]}, + "second_addition": {"result": [120, 122, 124, 126, 128]}, + }, + "second": { + "first_addition": {"result": [121, 123, 125, 127, 129]}, + "double": {"value": [242, 246, 250, 254, 258]}, + "second_addition": {"result": [244, 248, 252, 256, 260]}, + }, + } + + +def test_pipeline_pair_run_first_as_batch_invalid_map_first_outputs(first_pipeline, second_pipeline_batched): + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline_batched, + outputs_to_inputs={"second_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + map_first_outputs=None, + ) + + with pytest.raises(ValueError, match="Mapping function for first pipeline outputs must be provided"): + results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)]) + + pair = PipelinePair( + first=first_pipeline, + second=second_pipeline_batched, + outputs_to_inputs={"second_addition.result": ["first_addition.value"]}, + included_first_outputs={"first_addition"}, + included_second_outputs={"first_addition", "double"}, + map_first_outputs=lambda x: x, + ) + + with pytest.raises(ValueError, match="Mapping function must return an aggregate dictionary"): + results = pair.run_first_as_batch([{"first_addition": {"value": i}} for i in range(5)]) diff --git a/test/evaluation/test_util.py b/test/evaluation/test_util.py new file mode 100644 index 00000000..e6d80d89 --- /dev/null +++ b/test/evaluation/test_util.py @@ -0,0 +1,54 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 +import pytest + +from haystack_experimental.evaluation.util import ( + aggregate_batched_pipeline_outputs, + deaggregate_batched_pipeline_inputs, +) + + +def test_aggregate_batched_pipeline_outputs_empty(): + assert aggregate_batched_pipeline_outputs([]) == {} + + +def test_aggregate_batched_pipeline_outputs_single(): + assert aggregate_batched_pipeline_outputs([{"a": {"b": [1, 2]}}]) == {"a": {"b": [1, 2]}} + + +def test_aggregate_batched_pipeline_outputs_multiple(): + outputs = [{"a": {"b": [1, 2], "c": [10, 20]}}, {"a": {"b": [3, 4], "c": [30, 40]}}] + assert aggregate_batched_pipeline_outputs(outputs) == {"a": {"b": [[1, 2], [3, 4]], "c": [[10, 20], [30, 40]]}} + + +def test_aggregate_batched_pipeline_outputs_mismatched_components(): + outputs = [{"a": {"b": [1, 2]}}, {"c": {"b": [3, 4]}}] + with pytest.raises(ValueError, match="Expected components .* but got"): + aggregate_batched_pipeline_outputs(outputs) + + +def test_aggregate_batched_pipeline_outputs_mismatched_component_outputs(): + outputs = [{"a": {"b": [1, 2]}}, {"a": {"b": [3, 4], "c": [5, 6]}}] + with pytest.raises(ValueError, match="Expected outputs from component .* to have keys .* but got"): + aggregate_batched_pipeline_outputs(outputs) + + +def test_deaggregate_batched_pipeline_inputs_empty(): + assert deaggregate_batched_pipeline_inputs({}) == [] + + +def test_deaggregate_batched_pipeline_inputs_single(): + inputs = {"a": {"b": [1, 2]}} + assert deaggregate_batched_pipeline_inputs(inputs) == [{"a": {"b": 1}}, {"a": {"b": 2}}] + + +def test_deaggregate_batched_pipeline_inputs_multiple(): + inputs = {"a": {"b": [1, 2], "c": [10, 20]}} + assert deaggregate_batched_pipeline_inputs(inputs) == [{"a": {"b": 1, "c": 10}}, {"a": {"b": 2, "c": 20}}] + + +def test_deaggregate_batched_pipeline_inputs_shape_mismatch(): + inputs = {"a": {"b": [1, 2]}, "c": {"b": [3]}} + with pytest.raises(ValueError, match="Expected input .* to have *. values but got"): + deaggregate_batched_pipeline_inputs(inputs)