Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Deprecate max_loops_allowed in favour of new argument max_runs_per_component #8354

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions haystack/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
#
# SPDX-License-Identifier: Apache-2.0

import warnings

# TODO: Remove this when PipelineMaxLoops is removed
warnings.filterwarnings("default", category=DeprecationWarning, module=__name__)


class PipelineError(Exception):
pass
Expand All @@ -24,6 +29,15 @@ class PipelineDrawingError(PipelineError):


class PipelineMaxLoops(PipelineError):
# NOTE: This is shown also when importing PipelineMaxComponentRuns, I can't find an easy
# way to fix this, so I will ignore that case.
warnings.warn(
"PipelineMaxLoops is deprecated and will be remove in version '2.7.0'; use PipelineMaxComponentRuns instead.",
DeprecationWarning,
)


class PipelineMaxComponentRuns(PipelineMaxLoops):
pass


Expand Down
64 changes: 55 additions & 9 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import importlib
import itertools
import warnings
from collections import defaultdict
from copy import copy, deepcopy
from datetime import datetime
Expand Down Expand Up @@ -41,6 +42,11 @@

logger = logging.getLogger(__name__)

_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE = (
"'max_loops_allowed' argument is deprecated and will be removed in version '2.7.0'. "
"Use 'max_runs_per_component' instead."
)


class PipelineBase:
"""
Expand All @@ -52,29 +58,63 @@ class PipelineBase:
def __init__(
self,
metadata: Optional[Dict[str, Any]] = None,
max_loops_allowed: int = 100,
max_loops_allowed: Optional[int] = None,
debug_path: Union[Path, str] = Path(".haystack_debug/"),
max_runs_per_component: int = 100,
):
"""
Creates the Pipeline.

:param metadata:
Arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in
this dictionary can be serialized and deserialized if you wish to save this pipeline to file with
`save_pipelines()/load_pipelines()`.
Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
:param max_loops_allowed:
How many times the pipeline can run the same node before throwing an exception.
How many times the `Pipeline` can run the same node before throwing an exception.
This is deprecated and will be removed in version 2.7.0, use `max_runs_per_component` instead.
:param debug_path:
When debug is enabled in `run()`, where to save the debug data.
:param max_runs_per_component:
How many times the `Pipeline` can run the same Component.
If this limit is reached a `PipelineMaxComponentRuns` exception is raised.
If not set defaults to 100 runs per Component.
"""
self._telemetry_runs = 0
self._last_telemetry_sent: Optional[datetime] = None
self.metadata = metadata or {}
self.max_loops_allowed = max_loops_allowed
self.graph = networkx.MultiDiGraph()
self._debug: Dict[int, Dict[str, Any]] = {}
self._debug_path = Path(debug_path)

if max_loops_allowed is not None:
warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning)
self._max_runs_per_component = max_loops_allowed
else:
self._max_runs_per_component = max_runs_per_component

@property
def max_loops_allowed(self) -> int:
"""
Returns the maximum number of runs per Component allowed in this Pipeline.

This is a deprecated field, use `max_runs_per_component` instead.

:return: Maximum number of runs per Component
"""
warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning)
return self._max_runs_per_component

@max_loops_allowed.setter
def max_loops_allowed(self, value: int):
"""
Sets the maximum number of runs per Component allowed in this Pipeline.

This is a deprecated property, use `max_runs_per_component` instead.

:param value: Maximum number of runs per Component
"""
warnings.warn(_MAX_LOOPS_ALLOWED_DEPRECATION_MESSAGE, DeprecationWarning)
self._max_runs_per_component = value

def __eq__(self, other) -> bool:
"""
Pipeline equality is defined by their type and the equality of their serialized form.
Expand Down Expand Up @@ -128,7 +168,7 @@ def to_dict(self) -> Dict[str, Any]:
connections.append({"sender": f"{sender}.{sender_socket}", "receiver": f"{receiver}.{receiver_socket}"})
return {
"metadata": self.metadata,
"max_loops_allowed": self.max_loops_allowed,
"max_runs_per_component": self._max_runs_per_component,
"components": components,
"connections": connections,
}
Expand All @@ -152,9 +192,15 @@ def from_dict(
"""
data_copy = deepcopy(data) # to prevent modification of original data
metadata = data_copy.get("metadata", {})
max_loops_allowed = data_copy.get("max_loops_allowed", 100)
max_runs_per_component = data_copy.get("max_runs_per_component", 100)
max_loops_allowed = data_copy.get("max_loops_allowed", None)
debug_path = Path(data_copy.get("debug_path", ".haystack_debug/"))
pipe = cls(metadata=metadata, max_loops_allowed=max_loops_allowed, debug_path=debug_path)
pipe = cls(
metadata=metadata,
max_loops_allowed=max_loops_allowed,
max_runs_per_component=max_runs_per_component,
debug_path=debug_path,
)
components_to_reuse = kwargs.get("components", {})
for name, component_data in data_copy.get("components", {}).items():
if name in components_to_reuse:
Expand Down
11 changes: 6 additions & 5 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineMaxLoops, PipelineRuntimeError
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack.core.pipeline.base import (
_dequeue_component,
_dequeue_waiting_component,
Expand Down Expand Up @@ -205,7 +205,8 @@ def run(self, word: str):
"haystack.pipeline.input_data": data,
"haystack.pipeline.output_data": final_outputs,
"haystack.pipeline.metadata": self.metadata,
"haystack.pipeline.max_loops_allowed": self.max_loops_allowed,
"haystack.pipeline.max_loops_allowed": self._max_runs_per_component,
"haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
},
):
# Cache for extra outputs, if enabled.
Expand All @@ -221,9 +222,9 @@ def run(self, word: str):
continue

if self._component_has_enough_inputs_to_run(name, components_inputs):
if self.graph.nodes[name]["visits"] > self.max_loops_allowed:
msg = f"Maximum loops count ({self.max_loops_allowed}) exceeded for component '{name}'"
raise PipelineMaxLoops(msg)
if self.graph.nodes[name]["visits"] > self._max_runs_per_component:
msg = f"Maximum run count {self._max_runs_per_component} reached for component '{name}'"
raise PipelineMaxComponentRuns(msg)

res: Dict[str, Any] = self._run_component(name, components_inputs[name])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
enhancements:
- |
Add new `Pipeline` init argument `max_runs_per_component`, this has the same identical
behaviour as the existing `max_loops_allowed` argument but is more descriptive of its actual effects.
- |
Add new `PipelineMaxLoops` to reflect new `max_runs_per_component` init argument
deprecations:
- |
`Pipeline` init argument `max_loops_allowed` is deprecated and will be remove in version `2.7.0`. Use `max_runs_per_component` instead.
- |
`PipelineMaxLoops` exception is deprecated and will be remove in version `2.7.0`. Use `PipelineMaxComponentRuns` instead.
2 changes: 1 addition & 1 deletion test/core/pipeline/features/pipeline_run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ Feature: Pipeline running

Examples:
| kind | exception |
| that has an infinite loop | PipelineMaxLoops |
| that has an infinite loop | PipelineMaxComponentRuns |
| that has a component that doesn't return a dictionary | PipelineRuntimeError |
20 changes: 10 additions & 10 deletions test/core/pipeline/features/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def custom_init(self):
component.set_output_types(self, a=int, b=int)

FakeComponent = component_class("FakeComponent", output={"a": 1, "b": 1}, extra_fields={"__init__": custom_init})
pipe = Pipeline(max_loops_allowed=1)
pipe = Pipeline(max_runs_per_component=1)
pipe.add_component("first", FakeComponent())
pipe.add_component("second", FakeComponent())
pipe.connect("first.a", "second.x")
Expand All @@ -83,7 +83,7 @@ def custom_init(self):

@given("a pipeline that is really complex with lots of components, forks, and loops", target_fixture="pipeline_data")
def pipeline_complex():
pipeline = Pipeline(max_loops_allowed=2)
pipeline = Pipeline(max_runs_per_component=2)
pipeline.add_component("greet_first", Greet(message="Hello, the value is {value}."))
pipeline.add_component("accumulate_1", Accumulate())
pipeline.add_component("add_two", AddFixedValue(add=2))
Expand Down Expand Up @@ -205,7 +205,7 @@ def run(self, a: int, b: int = 2):

@given("a pipeline that has two loops of identical lengths", target_fixture="pipeline_data")
def pipeline_that_has_two_loops_of_identical_lengths():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("remainder", Remainder(divisor=3))
pipeline.add_component("add_one", AddFixedValue(add=1))
Expand Down Expand Up @@ -250,7 +250,7 @@ def pipeline_that_has_two_loops_of_identical_lengths():

@given("a pipeline that has two loops of different lengths", target_fixture="pipeline_data")
def pipeline_that_has_two_loops_of_different_lengths():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("remainder", Remainder(divisor=3))
pipeline.add_component("add_one", AddFixedValue(add=1))
Expand Down Expand Up @@ -306,7 +306,7 @@ def pipeline_that_has_two_loops_of_different_lengths():
@given("a pipeline that has a single loop with two conditional branches", target_fixture="pipeline_data")
def pipeline_that_has_a_single_loop_with_two_conditional_branches():
accumulator = Accumulate()
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)

pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
Expand Down Expand Up @@ -503,7 +503,7 @@ def pipeline_that_has_different_combinations_of_branches_that_merge_and_do_not_m

@given("a pipeline that has two branches, one of which loops back", target_fixture="pipeline_data")
def pipeline_that_has_two_branches_one_of_which_loops_back():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_zero", AddFixedValue(add=0))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("sum", Sum())
Expand Down Expand Up @@ -704,7 +704,7 @@ def pipeline_that_has_a_component_that_doesnt_return_a_dictionary():
output=1, # type:ignore
)

pipe = Pipeline(max_loops_allowed=10)
pipe = Pipeline(max_runs_per_component=10)
pipe.add_component("comp", BrokenComponent())
return pipe, [PipelineRunData({"comp": {"a": 1}})]

Expand Down Expand Up @@ -916,7 +916,7 @@ def fake_generator_run(self, prompt: str, generation_kwargs: Optional[Dict[str,
target_fixture="pipeline_data",
)
def pipeline_that_has_a_single_component_that_send_one_of_outputs_to_itself():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("self_loop", SelfLoop())
pipeline.connect("self_loop.current_value", "self_loop.values")

Expand All @@ -934,7 +934,7 @@ def pipeline_that_has_a_single_component_that_send_one_of_outputs_to_itself():

@given("a pipeline that has a component that sends one of its outputs to itself", target_fixture="pipeline_data")
def pipeline_that_has_a_component_that_sends_one_of_its_outputs_to_itself():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_1", AddFixedValue())
pipeline.add_component("self_loop", SelfLoop())
pipeline.add_component("add_2", AddFixedValue())
Expand Down Expand Up @@ -1068,7 +1068,7 @@ def pipeline_that_is_linear_and_returns_intermediate_outputs():

@given("a pipeline that has a loop and returns intermediate outputs from it", target_fixture="pipeline_data")
def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
pipeline = Pipeline(max_loops_allowed=10)
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("branch_joiner", BranchJoiner(type_=int))
pipeline.add_component("below_10", Threshold(threshold=10))
Expand Down
Loading