Skip to content

Commit

Permalink
docs: Refactor merge sort code example to use literalinclude (#6091)
Browse files Browse the repository at this point in the history
Signed-off-by: David (Ti-Wei) Lin <[email protected]>
  • Loading branch information
davidlin20dev authored Dec 6, 2024
1 parent c91fb69 commit 9028375
Showing 1 changed file with 3 additions and 57 deletions.
60 changes: 3 additions & 57 deletions docs/user_guide/advanced_composition/dynamic_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,63 +141,9 @@ resulting in less noticeable overhead.
Merge sort is a perfect example to showcase how to seamlessly achieve recursion using dynamic workflows.
Flyte imposes limitations on the depth of recursion to prevent misuse and potential impacts on the overall stability of the system.

```python
from typing import Tuple

from flytekit import conditional, dynamic, task, workflow


@task
def split(numbers: list[int]) -> Tuple[list[int], list[int], int, int]:
return (
numbers[0 : int(len(numbers) / 2)],
numbers[int(len(numbers) / 2) :],
int(len(numbers) / 2),
int(len(numbers)) - int(len(numbers) / 2),
)


@task
def merge(sorted_list1: list[int], sorted_list2: list[int]) -> list[int]:
result = []
while len(sorted_list1) > 0 and len(sorted_list2) > 0:
# Compare the current element of the first array with the current element of the second array.
# If the element in the first array is smaller, append it to the result and increment the first array index.
# Otherwise, do the same with the second array.
if sorted_list1[0] < sorted_list2[0]:
result.append(sorted_list1.pop(0))
else:
result.append(sorted_list2.pop(0))

# Extend the result with the remaining elements from both arrays
result.extend(sorted_list1)
result.extend(sorted_list2)

return result


@task
def sort_locally(numbers: list[int]) -> list[int]:
return sorted(numbers)


@dynamic
def merge_sort_remotely(numbers: list[int], run_local_at_count: int) -> list[int]:
split1, split2, new_count1, new_count2 = split(numbers=numbers)
sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count)
sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count)
return merge(sorted_list1=sorted1, sorted_list2=sorted2)


@workflow
def merge_sort(numbers: list[int], numbers_count: int, run_local_at_count: int = 5) -> list[int]:
return (
conditional("terminal_case")
.if_(numbers_count <= run_local_at_count)
.then(sort_locally(numbers=numbers))
.else_()
.then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count))
)
```{literalinclude} /examples/advanced_composition/advanced_composition/dynamic_workflow.py
:caption: advanced_composition/dynamic_workflow.py
:lines: 84-134
```

By simply adding the `@dynamic` annotation, the `merge_sort_remotely` function transforms into a plan of execution,
Expand Down

0 comments on commit 9028375

Please sign in to comment.