# TeamForgeAI/agent_interactions.py
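"""Handles interactions between the user and a selected agent.

Covers skill dispatch (web search, diagrams, project-status skills), Stable
Diffusion image generation, streaming responses from the Ollama API, and the
Mixture-of-Agents (MoA) workflow.
"""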
import time
import json # Import the json module
import re # Import the re module
import streamlit as st
from api_utils import send_request_to_ollama_api
from file_utils import load_skills
from skills.fetch_web_content import fetch_web_content
from skills.generate_sd_images import generate_sd_images
from skills.update_project_status import update_checklists # Updated import
from skills.summarize_project_status import summarize_project_status
from ui.discussion import update_discussion_and_whiteboard # Corrected import
from ui.utils import extract_keywords # Import extract_keywords
from ollama_llm import OllamaLLM # Import OllamaLLM from ollama_llm.py
from skills.web_search import web_search # Import web_search directly
from agent_creation import create_autogen_agent # Import create_autogen_agent


def process_agent_interaction(agent_index: int) -> None:
    """Handles the interaction with a selected agent."""
print("Processing agent interaction...")
print(
"Session state:", st.session_state
) # Log the session state when this function is called
# --- Use st.session_state.agents_data to access the agents ---
if "agents_data" not in st.session_state:
st.session_state.agents_data = []
agent_data = st.session_state.agents_data[agent_index] # Get the agent data
# Create an instance of OllamaConversableAgent from the agent_instance dictionary
agent_instance = create_autogen_agent(agent_data)
available_skills = load_skills() # Load available skills
selected_skill = agent_data.get("skill", []) # Get the agent's selected skill from agent_data, default to an empty list
# --- Check if the image generation skill should be triggered ---
if st.session_state.get("generate_image_trigger", False):
discussion_history = st.session_state.get("discussion_history", "")
generate_and_display_images(discussion_history) # Call the new function to handle multiple images
st.session_state["generate_image_trigger"] = False # Reset the trigger
return # Exit early after image generation
# --- Otherwise, proceed with regular agent interaction or other skills ---
agent_name = agent_data["config"]["name"] # Access from agent_data
agent_emoji = agent_data.get("emoji", "") # Get the agent's emoji
description = agent_data["description"] # Access from agent_data
user_request = st.session_state.get("user_request", "")
user_input = st.session_state.get("user_input", "")
rephrased_request = st.session_state.get("rephrased_request", "")
reference_url = st.session_state.get("reference_url", "")
url_content = fetch_web_content(reference_url) if reference_url else ""
# --- Construct the request based on the selected skill ---
request = f"""Act as the {agent_name} who {description}.
Original request was: {user_request}.
You are helping a team work on satisfying {rephrased_request}.
Additional input: {user_input}.
Reference URL content: {url_content}.
The discussion so far has been {st.session_state.discussion_history[-50000:]}."""
# --- Prepare the query based on the skill ---
if selected_skill: # If a skill is selected for the agent
if selected_skill[0] == "web_search":
keywords = extract_keywords(rephrased_request) + extract_keywords(
st.session_state.get("user_request", "") # Use the original user request instead of the formatted discussion history
)
query = " ".join(keywords)
# Call the web_search function directly
skill_result = web_search(query, st.session_state.discussion_history, st.session_state.agents_data, agent_instance.teachability) # Use agent_instance.teachability
response_text = f"Skill '{selected_skill[0]}' result: {skill_result}"
update_discussion_and_whiteboard(agent_name, response_text, user_input)
return
elif selected_skill[0] == "plot_diagram": # Update query for plot_diagram
query = '{}' # Pass an empty JSON string as a placeholder
request += f"\nYou have been tasked to use the '{selected_skill[0]}' skill. Analyze the discussion history and determine if there is any data that can be visualized as a diagram. If so, extract the relevant data, interpret keywords, numerical values, and patterns to generate a JSON string with the appropriate parameters for the 'plot_diagram' skill, and then use the skill to create the diagram. If no relevant data is found, or if the user has provided specific instructions for the diagram, follow those instructions instead. Remember to always provide a valid JSON string as parameters for the 'plot_diagram' skill, even if it's an empty dictionary '{{}}'."
elif selected_skill[0] in ["generate_agent_instructions", "update_project_status", "summarize_project_status"]: # Handle new skills
query = "" # These skills don't require a query
else: # Handle other skills
query = user_input
request += f"\nYou have been tasked to use the '{selected_skill[0]}' skill with the following input: '{query}'."
# --- If a skill other than generate_sd_images is selected, execute it ---
if selected_skill and selected_skill[0] != "generate_sd_images":
skill_function = available_skills[selected_skill[0]]
if selected_skill[0] in ["web_search", "fetch_web_content"]:
skill_result = skill_function(query=query, discussion_history=st.session_state.discussion_history, teachability=agent_instance.teachability) # Pass teachability to skill_function
elif selected_skill[0] == "plot_diagram":
skill_result = skill_function(query=query, discussion_history=st.session_state.discussion_history) # Pass the query to the skill function
else:
skill_result = skill_function(query=query, agents_data=st.session_state.agents_data, discussion_history=st.session_state.discussion_history) # Pass the query to the skill function
# --- Handle plot_diagram skill result ---
if selected_skill[0] == "plot_diagram":
if skill_result.startswith("Error:"):
st.error(skill_result)
else:
st.session_state.chart_data = skill_result # Store chart data in session state
st.session_state.trigger_rerun = True # Trigger a rerun to display the chart
return # Exit after executing the skill
if isinstance(skill_result, list):
formatted_results = "\n".join(
[f"- {title}: {url} ({snippet})" for title, url, snippet in skill_result]
)
response_text = formatted_results
else:
response_text = f"Skill '{selected_skill[0]}' result: {skill_result}"
update_discussion_and_whiteboard(agent_name, response_text, user_input)
return
# --- Add user input to the discussion history BEFORE sending to LLM ---
if user_input:
user_input_text = f"\n\n\n\n{user_input}\n\n"
st.session_state.discussion_history += user_input_text
st.session_state["trigger_rerun"] = True # Trigger a rerun to display the update
# Reset the UI update flag
st.session_state["update_ui"] = False
# --- If no skill is selected, get the agent's response from the LLM ---
if agent_data.get("enable_moa", False): # Access from agent_data
full_response = execute_moa_workflow(request, st.session_state.agents_data, agent_data, agent_instance) # Pass agent_data and agent_instance
else:
# Check if memory is enabled
if agent_data.get("enable_memory", False):
# Store the user input in the agent's memory using add_message
agent_instance.add_message("User", user_input) # Call add_message on the agent instance
response_generator = send_request_to_ollama_api(agent_name, request, agent_data=agent_data) # Pass agent_data
full_response = ""
for response_chunk in response_generator:
if 'done' in response_chunk and response_chunk['done']: # Check if the response is complete
response_text = response_chunk.get("response", "")
full_response += response_text
# --- Enforce image request format before updating discussion history ---
full_response = enforce_image_request_format(full_response)
break # Exit the loop since the response is complete
response_text = response_chunk.get("response", "")
full_response += response_text
# Update discussion history AFTER the response is complete
update_discussion_and_whiteboard(f"{agent_emoji} {agent_name}", full_response, user_input) # Add emoji to agent name
st.session_state["accumulated_response"] = full_response
st.session_state["trigger_rerun"] = True # Set the flag to trigger a rerun
st.session_state["form_agent_name"] = agent_name
st.session_state["form_agent_description"] = description
st.session_state["selected_agent_index"] = agent_index
# --- Update checklists after agent interaction ---
if "current_project" in st.session_state:
update_checklists(st.session_state.discussion_history, st.session_state.current_project)
st.session_state.current_project = st.session_state.current_project # Update session state
# --- Set the flag to trigger a rerun ---
st.session_state["trigger_rerun"] = True


def generate_and_display_images(discussion_history: str) -> None:
    """Generates images using the generate_sd_images skill and displays them."""
    while True:  # Keep generating images until there are no more scenes
        image_paths = generate_sd_images(
            discussion_history=discussion_history  # Pass discussion_history to generate_sd_images
        )
        try:
            # generate_sd_images is expected to return a list of image file paths
            time.sleep(1)
            # Display the generated images
            if image_paths is not None:
                for image_path in image_paths:
                    with open(image_path, "rb") as file:
                        image_bytes = file.read()
                        st.image(image_bytes, caption=f"Generated Image: {image_path}")
            else:
                print("generate_sd_images did not return any images")
                break  # Exit the loop if no images were generated
        except Exception as error:
            print(f"Error displaying generated images: {error}")
            st.error(f"Error generating image: {error}")
            break  # Exit the loop if there was an error


def enforce_image_request_format(text: str) -> str:
    """
    Enforces the standardized image request format in the agent's response.

    :param text: The agent's response text.
    :return: The text with image requests formatted correctly.
    """
    image_request_pattern = r"(?:Image|Illustration|Visual):\s*(.*?)(?:\n|$)"
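    # Example: "Image: a robot reading a book" becomes "![Image Request](a robot reading a book)"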
    image_requests = re.findall(image_request_pattern, text)
    for image_request in image_requests:
        text = text.replace(f"Image: {image_request}", f"![Image Request]({image_request})")
        text = text.replace(f"Illustration: {image_request}", f"![Image Request]({image_request})")
        text = text.replace(f"Visual: {image_request}", f"![Image Request]({image_request})")
    return text


def execute_moa_workflow(request: str, agents_data: list, current_agent: dict, agent_instance) -> str:
    """Executes the Mixture-of-Agents workflow."""
    # Get the full discussion history
    discussion_history = st.session_state.get("discussion_history", "")

    # Separate proposers and aggregators
    proposers = [agent for agent in agents_data if agent.get("moa_role") == "proposer"]
    aggregators = [agent for agent in agents_data if agent.get("moa_role") == "aggregator"]

    # Layer 1: Proposers generate initial responses
    layer_1_outputs = []
    for proposer in proposers:
        proposer_emoji = proposer.get("emoji", "")  # Get the proposer's emoji
        print(f"🟢 Proposer: {proposer_emoji} {proposer['config']['name']}")  # Log the proposer's name with emoji
        # Create an OllamaConversableAgent instance from the proposer's agent data
        proposer_instance = create_autogen_agent(proposer)
        # Include discussion history and user request in the prompt
        proposer_prompt = f"""{discussion_history}\n{request}"""
        # Check if memory is enabled for the proposer
        if proposer.get("enable_memory", False):
            # Store the user input in the agent's memory using add_message
            proposer_instance.add_message("User", request)  # Call add_message on the agent instance
        response = proposer_instance.ollama_llm.generate_text(proposer_prompt)
        layer_1_outputs.append(response)
        print(f" Proposed Response: {response}")  # Log the proposed response

    # Subsequent layers: Aggregators refine responses
    current_responses = layer_1_outputs
    for i in range(2, 4):  # Adjust the number of layers as needed
        new_responses = []
        for aggregator in aggregators:
            aggregator_emoji = aggregator.get("emoji", "")  # Get the aggregator's emoji
            print(f"🟠 Aggregator (Layer {i}): {aggregator_emoji} {aggregator['config']['name']}")  # Log the aggregator's name and layer with emoji
            # Create an OllamaConversableAgent instance from the aggregator's agent data
            aggregator_instance = create_autogen_agent(aggregator)
            # Include discussion history, user request, and prior responses in the prompt
            aggregator_prompt = f"""{discussion_history}\n{request}\n\nResponses from models:\n{chr(10).join([f'{j+1}. {response}' for j, response in enumerate(current_responses)])}"""
            # Check if memory is enabled for the aggregator
            if aggregator.get("enable_memory", False):
                # Store the prompt in the agent's memory using add_message
                aggregator_instance.add_message("User", aggregator_prompt)  # Call add_message on the agent instance
            response = aggregator_instance.ollama_llm.generate_text(aggregator_prompt)
            new_responses.append(response)
            print(f" Aggregated Response (Layer {i}): {response}")  # Log the aggregated response
        current_responses = new_responses

    # Final output: Use the current agent as the final aggregator
    agent_emoji = current_agent.get("emoji", "")  # Get the agent's emoji
    print(f"🔴 Final Aggregator: {agent_emoji} {current_agent['config']['name']}")  # Log the final aggregator's name with emoji
    aggregate_prompt = f"""{discussion_history}\n{request}\n\nResponses from models:\n{chr(10).join([f'{j+1}. {response}' for j, response in enumerate(current_responses)])}"""
    moa_response = agent_instance.ollama_llm.generate_text(aggregate_prompt)
    print(f" Final MoA Response: {moa_response}")  # Log the final MoA response
    return moa_response