feat: Rework Pipeline.run() to better handle cycles (#8431)
* draft

* Enhance

* Almost works

* Simplify some parts and handle intermediate outputs

* Handle connections with default

* Handle cycles with multiple connections from two components

* Update distributed outputs at the correct time

* Remove Component inputs after it runs

* Add agent pipeline test case

* Fix infinite loop test

* Handle some corner cases with loops checking and inputs deletion

* Fix tests

* Add new behavioral test

* Remove unused code in behavioural test

* Fix behavioural test

* Fix max run check

* Simplify outputs distribution

* Simplify subgraph run check

* Remove unused _init_run_queue function

* Remove commented code

* Add some missing type hints

* Simplify cycles breaking

* Fix _distribute_output test

* Fix _find_components_that_will_receive_no_input test

* Fix validation test

* Fix tracer losing Component inputs

* Fix some linting issues

* Remove ignore pylint rule

* Rename method that breaks cycles and make it raise

* Add docstring to _run_subgraph

* Update Pipeline.run() docstring

* Update comment to clarify cycles execution

* Remove SelfLoop sample Component

* Add behavioural test for unsupported cycles

* Rename behavioural test to be more specific

* Add new behavioural test

* Add release notes

* Remove commented out code and random pass

* Use more efficient function to find cycles

* Simplify _break_supported_cycles_in_graph by using defaultdict

* Stop breaking edges as soon as we make the graph acyclic

* Fix docstring and add some more comments

* Fix _distribute_output docstring

* Fix _find_receivers_from docstring

* More detailed release notes

* Minimize calls to networkx.is_directed_acyclic_graph

* Add some more info on edges keys

* Adjust components_in_cycles comment

* Add new Pipeline behavioural test

* Enhance _find_components_that_will_receive_no_input to cover more cases

* Explain why run_queue is reset after running a subgraph cycle

* Rename _init_inputs_state to _normalize_input_data

* Better explain the subgraph output distribution

* Remove for else

* Fix some comments and docstrings

* Fix linting

* Add missing return type

* Fix typo

* Rename _normalize_input_data to _normalize_varidiac_input_data and add more documentation

* Remove unused import

---------

Co-authored-by: Sebastian Husch Lee <[email protected]>
silvanocerza and sjrl authored Oct 29, 2024
1 parent d430833 commit 8205724
Showing 9 changed files with 1,193 additions and 211 deletions.
241 changes: 178 additions & 63 deletions haystack/core/pipeline/base.py

Large diffs are not rendered by default.

348 changes: 298 additions & 50 deletions haystack/core/pipeline/pipeline.py

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions haystack/testing/sample_components/__init__.py
@@ -13,7 +13,6 @@
 from haystack.testing.sample_components.parity import Parity
 from haystack.testing.sample_components.remainder import Remainder
 from haystack.testing.sample_components.repeat import Repeat
-from haystack.testing.sample_components.self_loop import SelfLoop
 from haystack.testing.sample_components.subtract import Subtract
 from haystack.testing.sample_components.sum import Sum
 from haystack.testing.sample_components.text_splitter import TextSplitter
@@ -35,6 +34,5 @@
     "Hello",
     "TextSplitter",
     "StringListJoiner",
-    "SelfLoop",
     "FString",
 ]
27 changes: 0 additions & 27 deletions haystack/testing/sample_components/self_loop.py

This file was deleted.

12 changes: 12 additions & 0 deletions releasenotes/notes/pipeline-run-rework-23a972d83b792db2.yaml
@@ -0,0 +1,12 @@
+---
+highlights: >
+  `Pipeline.run()` internal logic has been heavily reworked to be more robust and reliable
+  than before.
+  This new implementation makes it easier to run `Pipeline`s that have cycles in their graph.
+  It also fixes some corner cases in `Pipeline`s that don't have any cycle.
+features:
+  - |
+    Fundamentally rework the internal logic of `Pipeline.run()`.
+    The rework makes it more reliable and covers more use cases.
+    We fixed some issues that made `Pipeline`s with cycles unpredictable
+    and with unclear Components execution order.
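
The release note above says the rework makes cyclic graphs easier to run, and the commit message mentions breaking edges only until the graph becomes acyclic. The block below is a minimal standard-library sketch of that general idea, not the code from this commit: `find_cycle`, `break_cycles`, and the `breakable` set are hypothetical names, and the rule Haystack uses to decide which connections may be cut is not shown on this page.

```python
from collections import defaultdict


def find_cycle(edges):
    """Return one cycle as a list of (sender, receiver) edges, or None."""
    graph = defaultdict(list)
    for sender, receiver in edges:
        graph[sender].append(receiver)

    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on current path, finished
    color = defaultdict(int)
    path = []  # nodes on the current depth-first path

    def visit(node):
        color[node] = GREY
        path.append(node)
        for nxt in graph[node]:
            if color[nxt] == GREY:
                # Back edge: the cycle runs from nxt along the path and back.
                nodes = path[path.index(nxt):] + [nxt]
                return list(zip(nodes, nodes[1:]))
            if color[nxt] == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for node in list(graph):
        if color[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


def break_cycles(edges, breakable):
    """Remove breakable edges one at a time, stopping once no cycle is left.

    `breakable` is an assumed set of edges that are safe to cut. A cycle
    that contains none of them is reported as an error instead.
    """
    remaining = list(edges)
    removed = []
    while (cycle := find_cycle(remaining)) is not None:
        candidate = next((edge for edge in cycle if edge in breakable), None)
        if candidate is None:
            raise RuntimeError(f"cycle {cycle} has no breakable edge")
        remaining.remove(candidate)
        removed.append(candidate)
    return remaining, removed


edges = [("a", "b"), ("b", "c"), ("c", "a"), ("c", "d")]
acyclic, cut = break_cycles(edges, breakable={("c", "a")})
```

In this toy graph only the `("c", "a")` edge is cut, and the loop stops as soon as `find_cycle` returns `None`, so no further edges are removed. Passing an empty `breakable` set makes the same graph raise, which loosely mirrors the "cycle that would get it stuck" scenario.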
7 changes: 6 additions & 1 deletion test/core/pipeline/features/pipeline_run.feature
@@ -26,7 +26,7 @@ Feature: Pipeline running
| that has a greedy and variadic component after a component with default input |
| that has components added in a different order from the order of execution |
| that has a component with only default inputs |
-| that has a component with only default inputs as first to run |
+| that has a component with only default inputs as first to run and receives inputs from a loop |
| that has multiple branches that merge into a component with a single variadic input |
| that has multiple branches of different lengths that merge into a component with a single variadic input |
| that is linear and returns intermediate outputs |
@@ -37,8 +37,12 @@ Feature: Pipeline running
| that has a loop and a component with default inputs that doesn't receive anything from its sender but receives input from user |
| that has multiple components with only default inputs and are added in a different order from the order of execution |
| that is linear with conditional branching and multiple joins |
| that is a simple agent |
| that has a variadic component that receives partial inputs |
| that has an answer joiner variadic component |
| that is linear and a component in the middle receives optional input from other components and input from the user |
| that has a loop in the middle |
| that has variadic component that receives a conditional input |

Scenario Outline: Running a bad Pipeline
Given a pipeline <kind>
@@ -49,3 +53,4 @@ Feature: Pipeline running
| kind | exception |
| that has an infinite loop | PipelineMaxComponentRuns |
| that has a component that doesn't return a dictionary | PipelineRuntimeError |
+| that has a cycle that would get it stuck | PipelineRuntimeError |
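
The `that has an infinite loop` scenario above expects `PipelineMaxComponentRuns`. As a rough illustration of how a per-component run cap turns an endless loop into an error, here is a toy sketch. `run_with_limit`, `MaxComponentRunsError`, and the `max_runs` parameter are hypothetical and do not reproduce Haystack's implementation.

```python
from collections import Counter


class MaxComponentRunsError(RuntimeError):
    """Hypothetical stand-in for an error such as PipelineMaxComponentRuns."""


def run_with_limit(next_component, start, max_runs=100):
    """Follow `next_component` from `start`, capping runs per component.

    `next_component(name, run_count)` returns the next component name,
    or None when execution is finished.
    """
    runs = Counter()
    order = []
    current = start
    while current is not None:
        if runs[current] >= max_runs:
            raise MaxComponentRunsError(
                f"component {current!r} exceeded {max_runs} runs"
            )
        runs[current] += 1
        order.append(current)
        current = next_component(current, runs[current])
    return order


def finishing_loop(name, run_count):
    # "a" always hands over to "b"; "b" loops back until its third run.
    if name == "a":
        return "b"
    return "a" if run_count < 3 else None


def endless_loop(name, run_count):
    # Two components that keep sending to each other forever.
    return {"a": "b", "b": "a"}[name]


finished_order = run_with_limit(finishing_loop, "a")

try:
    run_with_limit(endless_loop, "a", max_runs=5)
    hit_limit = False
except MaxComponentRunsError:
    hit_limit = True
```

A loop with a working exit condition finishes normally, while the endless one is stopped once any component reaches the cap.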