From 1f621789281e1d993d358c421dbfedb522d9e15e Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Fri, 6 Sep 2024 09:56:53 -0400 Subject: [PATCH 1/2] Remove planning module --- docs/quickstart.mdx | 17 +++-- src/controlflow/planning/plan.py | 117 ------------------------------- src/controlflow/tasks/task.py | 18 ----- 3 files changed, 8 insertions(+), 144 deletions(-) delete mode 100644 src/controlflow/planning/plan.py diff --git a/docs/quickstart.mdx b/docs/quickstart.mdx index bc927898..5c5c5085 100644 --- a/docs/quickstart.mdx +++ b/docs/quickstart.mdx @@ -1,6 +1,6 @@ --- title: "Quickstart" -description: Build your first agentic workflow in less than 30 seconds. +description: Build your first agentic workflow in less than a minute. icon: rocket --- @@ -21,19 +21,23 @@ Install ControlFlow with pip: pip install controlflow ``` -Configure your preferred LLM provider. By default, ControlFlow uses OpenAI, so you'll need to set your API key: +Next, set up your LLM provider. By default, ControlFlow uses OpenAI, so you'll need to configure an OpenAI API key: ```bash export OPENAI_API_KEY="your-api-key" ``` -To use another provider, see the docs on [configuring LLMs](/guides/llms). Note that one of the agents in this quickstart is configured with OpenAI's GPT-4o-mini model; you can change the model name to whatever you prefer. +To use another provider, see the docs on [configuring LLMs](/guides/llms). -## Quickstart setup +## Create some data In this quickstart, we're going to build an email processing pipelines, so let's create some sample data to work with. Execute this code in your Python interpreter to set up the (very simple) example emails we'll use throughout the quickstart: + +Try changing the emails to your own content to see how ControlFlow works with different inputs. + + ```python emails = [ "Hello, I need an update on the project status.", @@ -41,9 +45,6 @@ emails = [ "Urgent: Project deadline moved up by one week.", ] ``` - -Try changing the emails to your own content to see how ControlFlow works with different inputs. - ## Running a single task Let's start with the basics. We're going to create a task that generates a reply to an email. @@ -105,8 +106,6 @@ You may have noticed that in the last example, we didn't assign an agent to the Agents are sort of like portable configurations for how to perform tasks, which could include specific LLMs, tools, instructions, and more. For our spam classifier, we'll create a new agent that uses a smaller, faster LLM and specialized instructions. - - In addition, note that the `result_type` of this task is a list of labels, indicating that the agent must choose one of the provided options. This is the simplest way to create a classification task, but you can require more complex [output formats](/concepts/tasks/task-results) as well. diff --git a/src/controlflow/planning/plan.py b/src/controlflow/planning/plan.py deleted file mode 100644 index 32d738b7..00000000 --- a/src/controlflow/planning/plan.py +++ /dev/null @@ -1,117 +0,0 @@ -from typing import Optional, TypeVar, Union - -from pydantic import Field - -from controlflow.agents import Agent -from controlflow.flows import Flow -from controlflow.tasks.task import Task -from controlflow.tools import Tool, as_tools -from controlflow.utilities.general import ControlFlowModel - -ToolLiteral = TypeVar("ToolLiteral", bound=str) - - -class PlanTask(ControlFlowModel): - id: int - objective: str - instructions: Optional[str] = Field( - None, - description="Any additional instructions for completing the task objective.", - ) - depends_on: list[int] = Field( - [], description="Tasks that must be completed before this task can be started." - ) - parent: Optional[int] = Field(None, description="The parent of this task (if any).") - agent: int = Field( - description="The agent assigned to the task. If empty, the default agent is used.", - ) - tools: list[int] = Field( - [], - description="The tools provided to complete the task. If empty, no tools are provided.", - ) - - -def create_plan( - objective: str, - instructions: Optional[str] = None, - planning_agent: Optional[Agent] = None, - agents: Optional[list[Agent]] = None, - tools: list[Union[callable, Tool]] = None, - context: dict = None, -) -> list[Task]: - """ - Given an objective and instructions for achieving it, generate a plan for - completing the objective. Each step of the plan will be turned into a task - objective. - """ - tools = as_tools(tools or []) - - agent_dict = dict(enumerate(agents or [])) - tool_dict = dict( - enumerate([t.dict(include={"name", "description"}) for t in tools]) - ) - - task = Task( - objective=""" - Create a plan consisting of multiple tasks to complete the provided objective. - """, - instructions=""" - Use your mark_successful tool to create the plan. Do not post a - message or talk out loud. - - Each task should be a discrete, actionable step that contributes to - the overall objective. Do not waste time on uneccessary or redundant - steps. Make sure to use your tools. - - When creating tasks, imagine that you had to complete the plan - yourself. What steps would you take? What tools would you use? What - information would you need? Remember that each task has a token cost - (both in its evaluation and needing to mark it complete), so try to - organize objectives by outcomes and dependencies, not by the actions - you'd need to take. - - - Use `depends_on` to indicate which tasks must be completed before - others can start. Tasks can only depend on tasks that come before - them in your plan. - - Use `parent` to indicate tasks that are subtasks of others. - - Don't create needless tasks like "document the findings" - - """, - context=dict( - plan_objective=objective, - plan_instructions=instructions, - plan_agents=agent_dict, - plan_tools=tool_dict, - ) - | context - or {}, - agents=[planning_agent] if planning_agent else None, - result_type=list[PlanTask], - infer_parent=False, - ) - - # create a new flow to avoid polluting the main flow's history - with Flow(): - task.run() - - plan: list[PlanTask] = task.result - - task_ids = {} - - for t in plan: - try: - task_tools = [tool_dict[i] for i in t.tools] - except KeyError: - task_tools = [] - - task_ids[t.id] = Task( - objective=t.objective, - instructions=t.instructions, - depends_on=[task_ids[i] for i in t.depends_on], - parent=task_ids[t.parent] if t.parent else None, - agent=agent_dict[task.agent] if task.agent else None, - tools=task_tools, - context=context, - ) - - return list(task_ids.values()) diff --git a/src/controlflow/tasks/task.py b/src/controlflow/tasks/task.py index 52564976..ec161c89 100644 --- a/src/controlflow/tasks/task.py +++ b/src/controlflow/tasks/task.py @@ -508,24 +508,6 @@ def mark_failed(self, reason: Optional[str] = None): def mark_skipped(self): self.set_status(TaskStatus.SKIPPED) - # def generate_subtasks(self, instructions: str = None, agents: list[Agent] = None): - # """ - # Generate subtasks for this task based on the provided instructions. - # Subtasks can reuse the same tools and agents as this task. - # """ - # from controlflow.planning.plan import create_plan - - # # enter a context to set the parent task - # with self: - # create_plan( - # self.objective, - # instructions=instructions, - # planning_agent=agents[0] if agents else self.agents[0], - # agents=agents or self.agents, - # tools=self.tools, - # context=self.context, - # ) - def create_success_tool(self) -> Tool: """ Create an agent-compatible tool for marking this task as successful. From 0bbd301e1e49dca3b7b50b01408d6033fa453e7e Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Sat, 7 Sep 2024 10:53:15 -0400 Subject: [PATCH 2/2] Update docs --- docs/concepts/tasks/running-tasks.mdx | 22 +++- docs/patterns/planning.mdx | 62 +++++----- src/controlflow/__init__.py | 2 +- src/controlflow/agents/agent.py | 54 ++++++--- src/controlflow/fns/__init__.py | 2 + src/controlflow/fns/plan.py | 156 ++++++++++++++++++++++++++ src/controlflow/{ => fns}/run.py | 16 ++- tests/{ => fns}/test_run.py | 13 ++- 8 files changed, 266 insertions(+), 61 deletions(-) create mode 100644 src/controlflow/fns/__init__.py create mode 100644 src/controlflow/fns/plan.py rename src/controlflow/{ => fns}/run.py (73%) rename tests/{ => fns}/test_run.py (91%) diff --git a/docs/concepts/tasks/running-tasks.mdx b/docs/concepts/tasks/running-tasks.mdx index a44fd505..951c389d 100644 --- a/docs/concepts/tasks/running-tasks.mdx +++ b/docs/concepts/tasks/running-tasks.mdx @@ -56,11 +56,31 @@ Crafting worlds and shaping dreams. -Note that this example is functionally equivalent to the previous one. +Note that this example is functionally equivalent to the previous one. There is also an async equivalent, `cf.run_async()`. This operation is so common that you'll see `cf.run()` used throughout the ControlFlow documentation. +## `cf.run_tasks()` +ControlFlow also provides a `run_tasks` function, which orchestrates one or more preexisting tasks to completion. Note that you need to access the task's `result` to see the output. + +```python +import controlflow as cf + +task_1 = cf.Task('Write a poem about AI') +task_2 = cf.Task('Critique the poem', depends_on=[task_1]) + +cf.run_tasks([task_1, task_2]) + +print(task_1.result) +print(task_2.result) +``` + +There is also an equivalent async function, `cf.run_tasks_async`. + + +When you run tasks as a batch, they share context because they are automatically run in a single flow. + ## `@task` diff --git a/docs/patterns/planning.mdx b/docs/patterns/planning.mdx index 2edba14f..ced240f0 100644 --- a/docs/patterns/planning.mdx +++ b/docs/patterns/planning.mdx @@ -1,57 +1,45 @@ --- -title: Planning +title: AI Planning +description: Use AI to generate new tasks. +icon: compass --- - -Automatically generate subtasks to break complex tasks into manageable steps. - +The `plan()` function in ControlFlow extends the capabilities of AI workflows by allowing dynamic generation of tasks. This feature allows you to leverage AI for creating structured, goal-oriented task sequences programmatically. -ControlFlow has many features that help you structure your AI workflows into small, well-defined tasks. However, sometimes it will be impractical or even impossible to identify all the necessary tasks upfront. In these cases, you can use ControlFlow's planning capabilities to automatically generate subtasks based on the main task's objective. +## Purpose of AI planning -By calling the `generate_subtasks()` method on a task, you can instruct an AI agent to come up with a plan for achieving the main task. The agent will generate a series of subtasks, each representing a step in the plan, and set up the necessary dependencies between them. +While ControlFlow allows manual creation of tasks for AI workflows, there are scenarios where automatically generating tasks can be beneficial: +1. **Dynamic Task Generation**: When the specific steps to achieve a goal aren't known in advance. +2. **Complex Problem Decomposition**: For objectives that require breaking down into subtasks based on context or intermediate results. +3. **Adaptive Workflows**: In processes that need to adjust based on changing conditions or new information. +## The `plan()` function + +The `plan()` function takes a high-level objective and generates a structured sequence of tasks to achieve that goal. Here's a basic example: + ```python import controlflow as cf -task = cf.Task( - objective="Compare the height of the tallest building " - "in North America to the tallest building in Europe", +tasks = cf.plan( + objective="Analyze customer feedback data", + n_tasks=3 # Optionally specify the number of tasks ) -task.generate_subtasks() - -print([f'{i+1}: {t.objective}' for i, t in enumerate(task.subtasks)]) +# Execute the generated plan +cf.run_tasks(tasks) ``` -Running the above code will print something like: - -```python -[ - "1: Identify the Tallest Building in North America", - "2: Identify the Tallest Building in Europe", - "3: Obtain Height of the Tallest Building in North America", - "4: Obtain Height of the Tallest Building in Europe", - "5: Compare the Heights", -] - ``` -If you investigate more closely, you'll see that the subtasks have proper dependencies. In the above example, #3 depends on #1, #4 depends on #2, and #5 depends on #3 and #4. And of course, the parent task depends on all of them. - -ControlFlow's orchestration engine will not allow the parent task to be considered complete until all of its subtasks have been successfully executed, which is why this is an effective way to structure complex workflows. - - -Subtask generation isn't magic: it's a ControlFlow flow! - - -## Customizing subtask generation -You can influence subtask generation in a few ways. +In this example, `plan()` will generate a list of 3 tasks that, when completed, should result in an analysis of customer feedback data. These tasks might include steps like "Load data", "Preprocess text", "Perform sentiment analysis", etc. -### Planning agent +## Advanced usage -By default, subtasks are generated by the first agent assigned to the parent task. You can customize this by passing an `agent` argument to `generate_subtasks()`. +The `plan` function can do more than just generate tasks that achieve an objective. +### Dependencies -### Instructions +If appropriate, `plan` can generate tasks that depend on each other or have parent/child relationships. You can influence this behavior by providing `instructions`. -You can provide natural language `instructions` to help the agent generate subtasks. This is especially useful when the task is ambiguous or requires domain-specific knowledge. +### Agents and tools +You can pass a list of agents or tools to the `plan` function. It will take these into account when generating tasks and assign agents or tools to tasks as needed. diff --git a/src/controlflow/__init__.py b/src/controlflow/__init__.py index dad19a11..bae31721 100644 --- a/src/controlflow/__init__.py +++ b/src/controlflow/__init__.py @@ -15,7 +15,7 @@ from .instructions import instructions from .decorators import flow, task from .tools import tool -from .run import run, run_async, run_tasks, run_tasks_async +from .fns import run, run_async, run_tasks, run_tasks_async, plan # --- Version --- diff --git a/src/controlflow/agents/agent.py b/src/controlflow/agents/agent.py index 438beb86..cf0c18b1 100644 --- a/src/controlflow/agents/agent.py +++ b/src/controlflow/agents/agent.py @@ -34,6 +34,7 @@ if TYPE_CHECKING: from controlflow.orchestration.turn_strategies import TurnStrategy + from controlflow.tasks import Task from controlflow.tools.tools import Tool logger = logging.getLogger(__name__) @@ -190,36 +191,61 @@ def __exit__(self, *exc_info): def run( self, objective: str, - *task_args, - agents: list["Agent"] = None, + *, turn_strategy: "TurnStrategy" = None, **task_kwargs, ): - agents = agents or [] + [self] - task = controlflow.Task( + return controlflow.run( objective=objective, - agents=agents, - *task_args, + agents=[self], + turn_strategy=turn_strategy, **task_kwargs, ) - return task.run(turn_strategy=turn_strategy) async def run_async( self, objective: str, - *task_args, - agents: list["Agent"] = None, + *, turn_strategy: "TurnStrategy" = None, **task_kwargs, ): - agents = agents or [] + [self] - task = controlflow.Task( + return await controlflow.run_async( objective=objective, - agents=agents, - *task_args, + agents=[self], + turn_strategy=turn_strategy, **task_kwargs, ) - return await task.run_async(turn_strategy=turn_strategy) + + def plan( + self, + objective: str, + instructions: Optional[str] = None, + agents: Optional[list["Agent"]] = None, + tools: Optional[list["Tool"]] = None, + context: Optional[dict] = None, + ) -> list["Task"]: + """ + Generate a list of tasks that represent a structured plan for achieving + the objective. + + Args: + objective (str): The objective to plan for. + instructions (Optional[str]): Optional instructions for the planner. + agents (Optional[list[Agent]]): Optional list of agents to include in the plan. If None, this agent is used. + tools (Optional[list[Tool]]): Optional list of tools to include in the plan. If None, this agent's tools are used. + context (Optional[dict]): Optional context to include in the plan. + + Returns: + list[Task]: A list of tasks that represent a structured plan for achieving the objective. + """ + return controlflow.tasks.plan( + objective=objective, + instructions=instructions, + agent=self, + agents=agents or [self], + tools=tools or [self.tools], + context=context, + ) def _run_model( self, diff --git a/src/controlflow/fns/__init__.py b/src/controlflow/fns/__init__.py new file mode 100644 index 00000000..bf3dea9e --- /dev/null +++ b/src/controlflow/fns/__init__.py @@ -0,0 +1,2 @@ +from .run import run, run_async, run_tasks, run_tasks_async +from .plan import plan diff --git a/src/controlflow/fns/plan.py b/src/controlflow/fns/plan.py new file mode 100644 index 00000000..9875e9f1 --- /dev/null +++ b/src/controlflow/fns/plan.py @@ -0,0 +1,156 @@ +from typing import Optional, TypeVar, Union + +from pydantic import Field + +import controlflow +from controlflow.agents import Agent +from controlflow.flows import Flow +from controlflow.tasks.task import Task +from controlflow.tools import Tool, as_tools +from controlflow.utilities.general import ControlFlowModel + +ToolLiteral = TypeVar("ToolLiteral", bound=str) + + +class PlanTask(ControlFlowModel): + id: int + objective: str = Field( + description="The objective of the task. This should be a concise statement of the task's purpose.", + ) + instructions: Optional[str] = Field( + None, + description="Any additional instructions for completing the task objective.", + ) + depends_on: list[int] = Field( + [], + description="Tasks that must be completed before this task can be started. Must be the id of one of the other tasks in the plan.", + ) + parent: Optional[int] = Field( + None, + description="The parent of this task (if any). Must be the id of one of the other tasks in the plan.", + ) + agents: list[int] = Field( + description="The agents assigned to the task. Must be the index of one of the agents provided in the plan_agents context variable.", + ) + tools: list[int] = Field( + [], + description="The tools provided to complete the task, if any. Must be the index of one of the tools provided in the plan_tools context variable.", + ) + + +def plan( + objective: str, + instructions: Optional[str] = None, + agent: Optional[Agent] = None, + agents: Optional[list[Agent]] = None, + tools: list[Union[callable, Tool]] = None, + context: Optional[dict] = None, + n_tasks: Optional[int] = None, +) -> list[Task]: + """ + Given an objective and instructions for achieving it, generate a plan for + completing the objective. Each step of the plan will be turned into a task + objective. + """ + tools = as_tools(tools or []) + + if agent is None: + agent = controlflow.defaults.agent + if not agents: + agents = [agent] + + agent_dict = dict(enumerate(agents)) + tool_dict = dict( + enumerate([t.dict(include={"name", "description"}) for t in tools]) + ) + + def validate_plan(plan: list[PlanTask]): + if n_tasks and len(plan) != n_tasks: + raise ValueError(f"Expected {n_tasks} tasks, got {len(plan)}") + for task in plan: + if any(a not in agent_dict for a in task.agents): + raise ValueError( + f"Not all agents in task {task.id} are valid: {task.agents}" + ) + if any(t not in tool_dict for t in task.tools): + raise ValueError( + f"Not all tools in task {task.id} are valid: {task.tools}" + ) + return plan + + plan_task = Task( + objective=""" + Create a plan consisting of ControlFlow tasks that will allow agents + to achieve the provided objective. + """, + instructions=""" + Use your mark_successful tool to create the plan. Do not post a + message or talk out loud. + + If specified, the task must include exactly `number_of_tasks` tasks; + otherwise, follow your judgement or any additional instructions. + + Each task should be a discrete, actionable step that contributes to + the overall objective. Do not waste time on uneccessary or redundant + steps. Take tools and agent capabilities into account when creating + a task. Do not create unachievable tasks, like "search for X" if + your agent or tools do not have a search capability. You may, + however, create tasks that serve as discrete or useful checkpoints + for completing the overall objective. Do not create tasks for + "verifying" results unless you have agents or tools to deploy that + will truly lead to a differentiated outcome. + + When creating tasks, imagine that you had to complete the plan + yourself. What steps would you take? What tools would you use? What + information would you need? Remember that each task has a token cost + (both in its evaluation and needing to mark it complete), so try to + organize objectives by outcomes and dependencies, not by the actions + you'd need to take. + + - Use `depends_on` to indicate which tasks must be completed before + others can start. Tasks can only depend on tasks that come before + them in your plan. + - Use `parent` to indicate tasks that are subtasks of others. + - Assign agents and tools to tasks to help manage the plan. Try not + to assign agents unless they are needed. + - Don't create needless tasks like "document the findings." Only + create tasks whose results are useful checkpoints for completing + the overall objective. + + """, + context=dict( + plan_objective=objective, + plan_instructions=instructions, + plan_agents=agent_dict, + plan_tools=tool_dict, + number_of_tasks=n_tasks, + ) + | (context or {}), + agents=[agent] if agent else None, + result_type=list[PlanTask], + result_validator=validate_plan, + ) + + # create a new flow to avoid polluting the main flow's history + with Flow(): + plan: list[PlanTask] = plan_task.run() + + task_ids = {} + + for t in plan: + try: + task_tools = [tool_dict[i] for i in t.tools] + except KeyError: + task_tools = [] + + task_ids[t.id] = Task( + objective=t.objective, + instructions=t.instructions, + depends_on=[task_ids[i] for i in t.depends_on], + parent=task_ids[t.parent] if t.parent else None, + agents=[agent_dict[a] for a in t.agents] if t.agents else None, + tools=task_tools, + context=context or {}, + ) + + return list(task_ids.values()) diff --git a/src/controlflow/run.py b/src/controlflow/fns/run.py similarity index 73% rename from src/controlflow/run.py rename to src/controlflow/fns/run.py index 66951a3d..8dee2adb 100644 --- a/src/controlflow/run.py +++ b/src/controlflow/fns/run.py @@ -45,6 +45,7 @@ def run_tasks( tasks: list[Task], flow: Flow = None, turn_strategy: TurnStrategy = None, + raise_on_error: bool = False, **run_kwargs, ): """ @@ -52,13 +53,19 @@ def run_tasks( """ flow = flow or get_flow() or Flow() orchestrator = Orchestrator(tasks=tasks, flow=flow, turn_strategy=turn_strategy) - return orchestrator.run(**run_kwargs) + orchestrator.run(**run_kwargs) + + if raise_on_error: + errors = [f"- {t.friendly_name()}: {t.result}" for t in tasks if t.is_failed()] + if errors: + raise ValueError(f"{len(errors)} task(s) failed: " + "\n".join(errors)) async def run_tasks_async( tasks: list[Task], flow: Flow = None, turn_strategy: TurnStrategy = None, + raise_on_error: bool = False, **run_kwargs, ): """ @@ -66,4 +73,9 @@ async def run_tasks_async( """ flow = flow or get_flow() or Flow() orchestrator = Orchestrator(tasks=tasks, flow=flow, turn_strategy=turn_strategy) - return await orchestrator.run_async(**run_kwargs) + await orchestrator.run_async(**run_kwargs) + + if raise_on_error: + errors = [f"- {t.friendly_name()}: {t.result}" for t in tasks if t.is_failed()] + if errors: + raise ValueError(f"{len(errors)} task(s) failed: " + "\n".join(errors)) diff --git a/tests/test_run.py b/tests/fns/test_run.py similarity index 91% rename from tests/test_run.py rename to tests/fns/test_run.py index 2f82a61c..1a0f69d1 100644 --- a/tests/test_run.py +++ b/tests/fns/test_run.py @@ -2,15 +2,16 @@ import controlflow from controlflow.agents.agent import Agent +from controlflow.fns.run import run, run_async, run_tasks, run_tasks_async def test_run(): - result = controlflow.run("what's 2 + 2", result_type=int) + result = run("what's 2 + 2", result_type=int) assert result == 4 async def test_run_async(): - result = await controlflow.run_async("what's 2 + 2", result_type=int) + result = await run_async("what's 2 + 2", result_type=int) assert result == 4 @@ -51,7 +52,7 @@ def test_run_with_limits( max_calls_per_turn, expected_calls, ): - controlflow.run( + run( "send messages", max_calls_per_turn=max_calls_per_turn, max_turns=max_turns, @@ -74,7 +75,7 @@ async def test_run_async_with_limits( max_calls_per_turn, expected_calls, ): - await controlflow.run_async( + await run_async( "send messages", max_calls_per_turn=max_calls_per_turn, max_turns=max_turns, @@ -97,7 +98,7 @@ def test_run_task_with_limits( max_calls_per_turn, expected_calls, ): - controlflow.run_tasks( + run_tasks( tasks=[ controlflow.Task("send messages"), controlflow.Task("send messages"), @@ -123,7 +124,7 @@ async def test_run_task_async_with_limits( max_calls_per_turn, expected_calls, ): - await controlflow.run_tasks_async( + await run_tasks_async( tasks=[ controlflow.Task("send messages"), controlflow.Task("send messages"),