Skip to content

Commit

Permalink
Merge pull request #11 from small-thinking/add-agent-sketch
Browse files Browse the repository at this point in the history
Add agent and thought process sketch
  • Loading branch information
yxjiang authored Mar 10, 2024
2 parents d461e9b + b52b498 commit bdcfa32
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 165 deletions.
2 changes: 1 addition & 1 deletion polymind/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .core.tool import BaseTool
from .core.message import Message
from .core.action import BaseAction, CompositeAction, SequentialAction
from .core.task import BaseTask, CompositeTask, SequentialTask
109 changes: 0 additions & 109 deletions polymind/core/action.py

This file was deleted.

82 changes: 82 additions & 0 deletions polymind/core/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from pydantic import BaseModel, Field
from polymind.core.message import Message
from polymind.core.tool import BaseTool
from abc import ABC, abstractmethod
from typing import Dict, Optional, TYPE_CHECKING


class ThoughtProcess(BaseModel, ABC):
"""The base class of the thought process.
In an agent system, a thought process is an object that can be used to perform a complex task.
It will breakdown a complex task into a series of simpler tasks and execute them.
And it will leverage tools (including LLM, data sources, code interpretor, etc.) to perform the tasks.
"""

thought_process_name: str
tools: Dict[str, BaseModel]

def __str__(self):
return self.thought_process_name

async def __call__(self, input: Message, agent: "Agent") -> Message:
"""Makes the instance callable, delegating to the execute method.
This allows the instance to be used as a callable object, simplifying the syntax for executing the thought process.
Args:
input (Message): The input message to the thought process.
Returns:
Message: The output message from the thought process.
"""
return await self._execute(input)

@abstractmethod
async def _execute(self, input: Message) -> Message:
"""Execute the thought process and return the result.
The derived class must implement this method to define the behavior of the thought process.
Args:
input (Message): The input to the thought process carried in a message.
Returns:
Message: The result of the thought process carried in a message.
"""
pass


class Agent(BaseModel):

agent_name: str
# Persona of the agent indicates the role of the agent.
persona: str
tools: Dict[str, BaseTool]
thought_process: Optional[ThoughtProcess] = None

def __str__(self):
return self.agent_name

def set_thought_process(self, thought_process: ThoughtProcess):
self.thought_process = thought_process

def _input_preprocess(self, input: Message) -> None:
"""Preprocess the input message before the agent starts working.
Now now the only thing to do is to add the persona to the input message.
"""
input.content["persona"] = self.persona

async def __call__(self, input: Message) -> Message:
"""Enable the agent to start working.
The actual processing is driven by the thought process.
Args:
input (Message): The input message to the agent.
Returns:
Message: The output message from the agent.
"""
if not self.thought_process:
raise ValueError(
"The thought process of the agent needs to be hooked first."
)
self._input_preprocess(input, self)
return await self.thought_process(input)
109 changes: 109 additions & 0 deletions polymind/core/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from pydantic import BaseModel, Field
from abc import ABC, abstractmethod
from polymind.core.message import Message
from polymind.core.tool import BaseTool
from typing import Dict, List


class BaseTask(BaseModel, ABC):
"""BaseTask is the base class of the task.
A Task is a stateful object that can leverage tools (an LLM is considered a tool) to perform a specific work.
In most cases, a Task is a logically unit of to fulfill an atomic work.
But sometimes, a complex task can be divided into multiple sub-tasks.
"""

task_name: str
tool: BaseTool

async def __call__(self, input: Message) -> Message:
"""Makes the instance callable, delegating to the execute method.
This allows the instance to be used as a callable object, simplifying the syntax for executing the task.
Args:
input (Message): The input message to the task.
Returns:
Message: The output message from the task.
"""
return await self._execute(input)

@abstractmethod
async def _execute(self, input: Message) -> Message:
"""Execute the task and return the result.
The derived class must implement this method to define the behavior of the task.
Args:
input (Message): The input to the task carried in a message.
Returns:
Message: The result of the task carried in a message.
"""
pass


class CompositeTask(BaseTask, ABC):
"""CompositeTask is a class that represents a composite task.
A composite task is a task that is composed of multiple sub-tasks.
"""

# Context is a message that is used to carry the state of the composite task.
context: Message = Field(default=Message(content={}))

@abstractmethod
def _get_next_task(self, input: Message) -> BaseTask:
"""Return the next sub-task to execute.
The derived class must implement this method to define the behavior of the composite task.
Args:
input (Message): The input to the composite task carried in a message.
context (Message): The context of the composite task carried in a message.
Returns:
BaseTask: The next sub-task to execute. None if there is no more sub-task to execute.
"""
pass

@abstractmethod
def _update_context(self) -> None:
"""Update the context of the composite task."""
pass

async def _execute(self, input: Message) -> Message:
"""Execute the composite task and return the result.
Args:
input (Message): The input to the composite task carried in a message.
Returns:
Message: The result of the composite task carried in a message.
"""
message = input
self._update_context()
task = self._get_next_task(message)
while task:
message = await task(message)
self._update_context()
task = self._get_next_task(message)
return message


class SequentialTask(CompositeTask):

tasks: List[BaseTask] = Field(default_factory=list)

def __init__(self, task_name: str, tool: BaseTool, tasks: List[BaseTask]):
super().__init__(task_name=task_name, tool=tool)
self.tasks = tasks

def _update_context(self) -> None:
if not bool(self.context.content):
self.context = Message(content={"idx": 0})
else:
self.context.content["idx"] += 1

def _get_next_task(self, input: Message) -> BaseTask:
if self.context.content["idx"] < len(self.tasks):
return self.tasks[self.context.content["idx"]]
else:
return None
55 changes: 0 additions & 55 deletions tests/polymind/core/test_action.py

This file was deleted.

Loading

0 comments on commit bdcfa32

Please sign in to comment.