
Commit

Add docstrings, update README
rlancemartin committed Sep 17, 2024
1 parent 8b527b9 commit 038bb9d
Showing 6 changed files with 217 additions and 29 deletions.
61 changes: 48 additions & 13 deletions README.md
@@ -3,23 +3,21 @@
[![CI](https://github.com/langchain-ai/data-enrichment/actions/workflows/unit-tests.yml/badge.svg)](https://github.com/langchain-ai/data-enrichment/actions/workflows/unit-tests.yml)
[![Integration Tests](https://github.com/langchain-ai/data-enrichment/actions/workflows/integration-tests.yml/badge.svg)](https://github.com/langchain-ai/data-enrichment/actions/workflows/integration-tests.yml)

Producing structured results (e.g., to populate a database or spreadsheet) from open-ended research (e.g., web research) is a common use case that LLM-powered agents are well-suited to handle. Here, we provide a general template for this kind of "data enrichment agent" using [LangGraph](https://github.com/langchain-ai/langgraph) in [LangGraph Studio](https://github.com/langchain-ai/langgraph-studio). It contains an example graph, exported from `src/enrichment_agent/graph.py`, that implements a research assistant capable of automatically gathering information on various topics from the web and structuring the results into a user-defined JSON format.

![Overview of agent](./static/overview.png)

## What it does

The enrichment agent defined in `src/enrichment_agent/graph.py` performs the following steps:

1. Takes a research **topic** and requested **extraction_schema** as user input.
2. Uses an LLM with bound tools (defined in `tools.py`), via the `call_agent_model` graph node, to perform web search (using [Tavily](https://tavily.com/)) or web scraping.
3. Reads and extracts key details from websites
4. Organizes the findings into the requested structured format
5. Validates the gathered information for completeness and accuracy

![Graph view in LangGraph studio UI](./static/studio.png)

## Getting Started

@@ -87,19 +85,56 @@ OPENAI_API_KEY=your-api-key
End setup instructions
-->
3. Consider a research topic and desired extraction schema.
As an example, here is a research topic:
```
"Top 5 chip providers for LLM Training"
```
And here is a corresponding extraction schema:
```json
"extraction_schema": {
  "type": "object",
  "properties": {
    "companies": {
      "type": "string",
      "description": "Names of top chip providers for LLM training"
    },
    "technologies": {
      "type": "string",
      "description": "Brief summary of key chip technologies used for LLM training"
    },
    "market_share": {
      "type": "string",
      "description": "Overview of market share distribution among top providers"
    },
    "future_outlook": {
      "type": "string",
      "description": "Brief summary of future prospects and developments in the field"
    }
  },
  "required": ["companies", "technologies", "market_share", "future_outlook"]
}
```
4. Open the folder in LangGraph Studio, and input the `topic` and `extraction_schema`.

![Results In Studio](./static/studio_example.png)
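
Outside of Studio, you can also run the agent programmatically. Below is a minimal sketch, assuming the compiled graph is exported as `graph` from `src/enrichment_agent/graph.py` and that the required API keys are set in your environment:

```python
import asyncio

from enrichment_agent.graph import graph  # assumed export name for the compiled graph


async def main() -> None:
    # The graph takes the research topic and the JSON extraction schema as input.
    result = await graph.ainvoke(
        {
            "topic": "Top 5 chip providers for LLM Training",
            "extraction_schema": {
                "type": "object",
                "properties": {
                    "companies": {
                        "type": "string",
                        "description": "Names of top chip providers for LLM training",
                    }
                },
                "required": ["companies"],
            },
        }
    )
    # The structured findings are returned under the `info` key of the output state.
    print(result["info"])


asyncio.run(main())
```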

## How to customize

1. **Customize research targets**: Provide a custom JSON `extraction_schema` when calling the graph to gather different types of information.
2. **Select a different model**: We default to Anthropic's Claude 3.5 Sonnet. You can select a compatible chat model using `provider/model-name` via configuration. Example: `openai/gpt-4o-mini`.
3. **Customize the prompt**: We provide a default prompt in [prompts.py](./src/enrichment_agent/prompts.py). You can easily update this via configuration.

For quick prototyping, these configurations can be set in the studio UI.

![Config In Studio](./static/config.png)
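
The same options can also be passed programmatically through the `configurable` field of a `RunnableConfig`. A minimal sketch, where the `model` key is an assumption inferred from the `provider/model-name` format above:

```python
import asyncio

from enrichment_agent.graph import graph  # assumed export name for the compiled graph

schema = {
    "type": "object",
    "properties": {"companies": {"type": "string"}},
    "required": ["companies"],
}

# Hypothetical override: the key is assumed to match a field on the Configuration object.
config = {"configurable": {"model": "openai/gpt-4o-mini"}}

result = asyncio.run(
    graph.ainvoke(
        {"topic": "Top 5 chip providers for LLM Training", "extraction_schema": schema},
        config,
    )
)
print(result["info"])
```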

You can also quickly extend this template by:

- Adding new tools and API connections in [tools.py](./src/enrichment_agent/tools.py). These can be any Python functions; see the sketch after this list.
- Adding additional steps in [graph.py](./src/enrichment_agent/graph.py).
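
As an illustration, here is a minimal sketch of what an additional tool might look like. The function name and endpoint are hypothetical; the point is that a plain typed async Python function is enough for the agent model to call:

```python
from typing import Any, Optional

import aiohttp


async def lookup_wikipedia(query: str) -> Optional[dict[str, Any]]:
    """Hypothetical example tool: fetch a short Wikipedia summary for a query."""
    url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{query.replace(' ', '_')}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                return None
            data = await response.json()
    # Return only the fields the agent is likely to use.
    return {"title": data.get("title"), "extract": data.get("extract")}
```

To make the agent aware of it, bind it alongside the existing tools where the model is configured in `graph.py` (i.e., in the `bind_tools([...])` call).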

## Development

137 changes: 127 additions & 10 deletions src/enrichment_agent/graph.py
@@ -19,29 +19,62 @@
from enrichment_agent.utils import init_model

# Define the nodes


async def call_agent_model(
    state: State, *, config: Optional[RunnableConfig] = None
) -> Dict[str, Any]:
    """
    Call the primary Language Model (LLM) to decide on the next research action.

    This asynchronous function performs the following steps:
    1. Initializes configuration and sets up the 'Info' tool, which is the user-defined extraction schema.
    2. Prepares the prompt and message history for the LLM.
    3. Initializes and configures the LLM with available tools.
    4. Invokes the LLM and processes its response.
    5. Handles the LLM's decision to either continue research or submit final info.

    Args:
        state (State): The current state of the research process, including topic and extraction schema.
        config (Optional[RunnableConfig]): Configuration for the LLM, if provided.

    Returns:
        Dict[str, Any]: A dictionary containing:
            - 'messages': List of response messages from the LLM.
            - 'info': Extracted information if the LLM decided to submit final info, else None.
            - 'loop_step': Incremented step count for the research loop.

    Note:
        - The function uses three tools: scrape_website, search, and a dynamic 'Info' tool.
        - If the LLM calls the 'Info' tool, it's considered as submitting the final answer.
        - If the LLM doesn't call any tool, a prompt to use a tool is appended to the messages.
    """

    # Load configuration from the provided RunnableConfig
    configuration = Configuration.from_runnable_config(config)

    # Define the 'Info' tool, which is the user-defined extraction schema
    info_tool = {
        "name": "Info",
        "description": "Call this when you have gathered all the relevant info",
        "parameters": state.extraction_schema,
    }

    # Format the prompt defined in prompts.py with the extraction schema and topic
    p = configuration.prompt.format(
        info=json.dumps(state.extraction_schema, indent=2), topic=state.topic
    )

    # Create the messages list with the formatted prompt and the previous messages
    messages = [HumanMessage(content=p)] + state.messages

    # Initialize the raw model with the provided configuration and bind the tools
    raw_model = init_model(config)
    model = raw_model.bind_tools([scrape_website, search, info_tool], tool_choice="any")
    response = cast(AIMessage, await model.ainvoke(messages))

    # Initialize info to None
    info = None

    # Check if the response has tool calls
    if response.tool_calls:
        for tool_call in response.tool_calls:
            if tool_call["name"] == "Info":
@@ -65,7 +98,6 @@ async def call_agent_model(
"loop_step": 1,
}


class InfoIsSatisfactory(BaseModel):
"""Validate whether the current extracted info is satisfactory and complete."""

@@ -80,7 +112,35 @@ class InfoIsSatisfactory(BaseModel):
async def reflect(
    state: State, *, config: Optional[RunnableConfig] = None
) -> Dict[str, Any]:
    """
    Validate the quality of the data enrichment agent's output.

    This asynchronous function performs the following steps:
    1. Prepares the initial prompt using the main prompt template.
    2. Constructs a message history for the model.
    3. Prepares a checker prompt to evaluate the presumed info.
    4. Initializes and configures a language model with structured output.
    5. Invokes the model to assess the quality of the gathered information.
    6. Processes the model's response and determines if the info is satisfactory.

    Args:
        state (State): The current state of the research process, including topic,
            extraction schema, and gathered information.
        config (Optional[RunnableConfig]): Configuration for the language model, if provided.

    Returns:
        Dict[str, Any]: A dictionary containing either:
            - 'info': The presumed info if it's deemed satisfactory.
            - 'messages': A list with a ToolMessage indicating an error or unsatisfactory result.

    Raises:
        ValueError: If the last message in the state is not an AIMessage with tool calls.

    Note:
        - This function acts as a quality check for the information gathered by the agent.
        - It uses a separate language model invocation to critique the gathered info.
        - The function can either approve the gathered info or request further research.
    """
    p = prompts.MAIN_PROMPT.format(
        info=json.dumps(state.extraction_schema, indent=2), topic=state.topic
    )
@@ -135,21 +195,79 @@ async def reflect(
def route_after_agent(
    state: State,
) -> Literal["reflect", "tools", "call_agent_model", "__end__"]:
    """
    Schedule the next node after the agent's action.

    This function determines the next step in the research process based on the
    last message in the state. It handles three main scenarios:
    1. Error recovery: If the last message is unexpectedly not an AIMessage.
    2. Info submission: If the agent has called the "Info" tool to submit findings.
    3. Continued research: If the agent has called any other tool.

    Args:
        state (State): The current state of the research process, including
            the message history.

    Returns:
        Literal["reflect", "tools", "call_agent_model", "__end__"]:
            - "reflect": If the agent has submitted info for review.
            - "tools": If the agent has called a tool other than "Info".
            - "call_agent_model": If an unexpected message type is encountered.

    Note:
        - The function assumes that normally, the last message should be an AIMessage.
        - The "Info" tool call indicates that the agent believes it has gathered
          sufficient information to answer the query.
        - Any other tool call indicates that the agent needs to continue research.
        - The error recovery path (returning "call_agent_model" for non-AIMessages)
          serves as a safeguard against unexpected states.
    """
    last_message = state.messages[-1]

    # If for some reason the last message is not an AIMessage (due to a bug or unexpected
    # behavior elsewhere in the code), recover by calling the agent model again rather than crashing.
    if not isinstance(last_message, AIMessage):
        return "call_agent_model"
    # If the "Info" tool was called, the model provided its extraction output. Reflect on the result.
    if last_message.tool_calls and last_message.tool_calls[0]["name"] == "Info":
        return "reflect"
    # The last message is a tool call that is not "Info" (extraction output)
    else:
        return "tools"


def route_after_checker(
    state: State, config: RunnableConfig
) -> Literal["__end__", "call_agent_model"]:
    """
    Schedule the next node after the checker's evaluation.

    This function determines whether to continue the research process or end it
    based on the checker's evaluation and the current state of the research.

    Args:
        state (State): The current state of the research process, including
            the message history, info gathered, and loop step count.
        config (RunnableConfig): Configuration object containing settings like
            the maximum number of allowed loops.

    Returns:
        Literal["__end__", "call_agent_model"]:
            - "__end__": If the research process should terminate.
            - "call_agent_model": If further research is needed.

    The function makes decisions based on the following criteria:
    1. If the maximum number of loops has been reached, it ends the process.
    2. If no info has been gathered yet, it continues the research.
    3. If the last message indicates an error or unsatisfactory result, it continues the research.
    4. If none of the above conditions are met, it assumes the result is satisfactory and ends the process.

    Note:
        - The function relies on a Configuration object derived from the RunnableConfig.
        - It checks the loop_step against max_loops to prevent infinite loops.
        - The presence of info and its quality (determined by the checker) influence the decision.
        - An error status in the last message triggers additional research.
    """
    configurable = Configuration.from_runnable_config(config)
    last_message = state.messages[-1]

@@ -164,7 +282,6 @@ def route_after_checker(
    else:
        return "__end__"


# Create the graph
workflow = StateGraph(
    State, input=InputState, output=OutputState, config_schema=Configuration
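
The rest of the file is collapsed in this diff. For orientation, here is a minimal sketch of how the nodes and routing functions above are typically wired together; the node names and edges are assumptions based on the code shown, not the exact committed wiring:

```python
from langgraph.prebuilt import ToolNode

# Continuing from the `workflow = StateGraph(...)` constructed above (assumed wiring):
workflow.add_node(call_agent_model)  # primary agent step
workflow.add_node(reflect)  # quality check on submitted info
workflow.add_node("tools", ToolNode([search, scrape_website]))  # executes non-"Info" tool calls
workflow.add_edge("__start__", "call_agent_model")
workflow.add_conditional_edges("call_agent_model", route_after_agent)
workflow.add_conditional_edges("reflect", route_after_checker)
workflow.add_edge("tools", "call_agent_model")

graph = workflow.compile()
```

This mirrors the routing described in the docstrings: `route_after_agent` sends "Info" submissions to `reflect` and other tool calls to `tools`, while `route_after_checker` either ends the run or loops back for more research.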
48 changes: 42 additions & 6 deletions src/enrichment_agent/tools.py
@@ -23,11 +23,27 @@
async def search(
    query: str, *, config: Annotated[RunnableConfig, InjectedToolArg]
) -> Optional[list[dict[str, Any]]]:
    """
    Perform a general web search using the Tavily search engine.

    This asynchronous function executes the following steps:
    1. Extracts configuration from the provided RunnableConfig.
    2. Initializes a TavilySearchResults object with a maximum number of results.
    3. Invokes the Tavily search with the given query.
    4. Returns the search results as a list of dictionaries.

    Args:
        query (str): The search query string.
        config (RunnableConfig): Configuration object containing search parameters.

    Returns:
        Optional[list[dict[str, Any]]]: A list of search result dictionaries, or None if the search fails.
            Each dictionary typically contains information like title, url, content snippet, etc.

    Note:
        This function uses the Tavily search engine, which is designed for comprehensive
        and accurate results, particularly useful for current events and factual queries.
        The maximum number of results is determined by the configuration.
    """
    configuration = Configuration.from_runnable_config(config)
    wrapped = TavilySearchResults(max_results=configuration.max_search_results)
@@ -56,7 +72,27 @@ async def scrape_website(
    state: Annotated[State, InjectedState],
    config: Annotated[RunnableConfig, InjectedToolArg],
) -> str:
    """
    Scrape and summarize content from a given URL.

    This asynchronous function performs the following steps:
    1. Fetches the content of the specified URL.
    2. Formats a prompt using the fetched content and the extraction schema from the state.
    3. Initializes a language model using the provided configuration.
    4. Invokes the model with the formatted prompt to summarize the content.

    Args:
        url (str): The URL of the website to scrape.
        state (State): Injected state containing the extraction schema.
        config (RunnableConfig): Configuration for initializing the language model.

    Returns:
        str: A summary of the scraped content, tailored to the extraction schema.

    Note:
        The function uses aiohttp for asynchronous HTTP requests and assumes the
        existence of a _INFO_PROMPT template and an init_model function.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
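
A note on the annotations in these signatures: `InjectedState` and `InjectedToolArg` keep `state` and `config` out of the schema the model sees, so the model only supplies `query` or `url`, and the framework injects the rest at call time. A minimal sketch of the idea, assuming LangChain's standard injected-argument behavior:

```python
from typing import Annotated

from langchain_core.tools import InjectedToolArg, tool


@tool
def greet(name: str, greeting: Annotated[str, InjectedToolArg] = "Hello") -> str:
    """Greet a person. The model only sees (and fills in) `name`."""
    return f"{greeting}, {name}!"


# The injected argument is supplied by the caller at invocation time, not by the model:
print(greet.invoke({"name": "Ada", "greeting": "Hi"}))
```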
Binary file added static/config.png
Binary file added static/overview.png
Binary file added static/studio_example.png
