-
Notifications
You must be signed in to change notification settings - Fork 0
/
agent.py
248 lines (198 loc) · 10.8 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import os
from dotenv import load_dotenv
from haystack_experimental.dataclasses import ChatMessage, ToolCallResult, ToolCall
from haystack_experimental.components.generators.chat import OpenAIChatGenerator
from haystack_experimental.core import AsyncPipeline
from haystack.utils import Secret
from custom_components.chat_tool_invoker import ChatToolInvoker
from custom_components.openai_agent import OpenAIAgent
from custom_components.agent_visualizer import AgentVisualizer
from tools import get_tools
from typing import AsyncGenerator
import asyncio
from asyncio import Queue
from haystack.dataclasses import StreamingChunk
_pipeline = None
_tools = None
_car_simulation_agent = None
_car_simulation_tools = None
class ChunkCollector:
"""
Collector that stores chunks in an async queue.
"""
def __init__(self):
self.queue = Queue()
async def generator(self) -> AsyncGenerator[str, None]:
"""
Generate chunks from the queue.
:returns: AsyncGenerator yielding string chunks
"""
while True:
chunk = await self.queue.get()
if chunk is None:
break
yield chunk
async def collect_chunk(queue: Queue, chunk: StreamingChunk):
"""
Collect chunks and store them in the queue.
:param queue: Queue to store the chunks
:param chunk: StreamingChunk to be collected
"""
await queue.put(chunk.content)
def initialize_pipeline():
load_dotenv()
tools = get_tools()
tool_invoker = ChatToolInvoker(tools=tools)
generator = OpenAIChatGenerator(api_key=Secret.from_token(os.getenv("OPENAI_API_KEY")), model="gpt-4o")
llm = OpenAIAgent(generator=generator)
# Use AsyncPipeline instead of Pipeline
pipeline = AsyncPipeline()
pipeline.add_component("llm", llm)
pipeline.add_component("tool_invoker", tool_invoker)
pipeline.add_component("agent_visualizer", AgentVisualizer())
# Very simple agent loop
pipeline.connect("llm.tool_reply", "tool_invoker.messages")
pipeline.connect("tool_invoker.tool_messages", "llm.followup_messages")
pipeline.connect("llm.chat_history", "agent_visualizer.messages")
return pipeline, tools
def get_pipeline():
global _pipeline, _tools
if _pipeline is None or _tools is None:
_pipeline, _tools = initialize_pipeline()
return _pipeline, _tools
def convert_to_chat_message_objects(messages):
chat_message_objects = []
for msg in messages:
role = msg["role"]
content = msg["content"]
adapted_message = {
"_role": role,
"_content": [{"text": content}]
}
chat_message = ChatMessage.from_dict(adapted_message)
chat_message_objects.append(chat_message)
return chat_message_objects
async def query_pipeline(messages) -> AsyncGenerator[str, None]:
"""
Asynchronously query the pipeline and stream the response.
"""
request_collector = ChunkCollector()
pipeline, tools = get_pipeline()
# System message
system_message = """
Du bist ein agentisches RAG-System. Bearbeite Benutzerfragen in 3 Schritten:
1. **Umformulierung:** Nutze "umformulieren_anfrage" genau einmal zu Beginn, um die originale Frage des Benutzers an interne Begriffe oder Abkürzungen anzupassen.
2. **Zerlegungsregeln (nur wenn nötig):** Zerlege die umformulierte Frage ausschließlich, wenn:
- **Vergleich:** Ein expliziter Vergleich enthalten ist (z. B. „Was ist der Unterschied zwischen A und B?“).
- **Mehrere Themen:** Die Frage mehrere klar abgegrenzte Themen oder Aspekte umfasst, die unabhängig voneinander beantwortet werden können.
- **Keine Zerlegung:** Wenn die umformulierte Frage bereits präzise und semantisch sinnvoll gestellt ist (z. B. „Was ist die Geschichte von Brot?“), führe **keine Zerlegung** durch und nutze die gesamte Frage für die Suche.
**Beispiele für Zerlegung in verschiedene Suchanfragen des Tools "suche_interne_kenntnisse":**
- Ursprünglich: „Unterschied zwischen Zubereitung von Brot heute und früher?“
- Teilfragen:
1. „Wie wurde Brot früher zubereitet?“
2. „Wie wird Brot heute zubereitet?“
- Ursprünglich: „Wie ist die Geschichte von Brot und wie wird es heute industriell hergestellt?“
- Teilfragen:
1. „Was ist die Geschichte von Brot?“
2. „Wie wird Brot heute industriell hergestellt?“
3. **Suche:** Führe gezielte Suchanfragen mit "suche_interne_kenntnisse" auf Basis der Zerlegungsregeln durch:
- Ergänze keine neuen Informationen oder Aspekte, die nicht explizit in der umformulierten Frage stehen.
- Die Suchanfragen sollen präzise Teilfragen der ursprünglichen Frage sein, ohne Vermutungen oder zusätzliche Inhalte.
- Verwende 6 relevante Textfragmente pro Benutzerfrage (top_k). Nutze diese clever aus, so dass du sie geschickt aufteilst, falls du mehrere Suchanfrage auslöst.
- Wenn keine Ergebnisse gefunden werden, passe die Suchanfrage maximal zweimal an (Synonyme/Alternativen).
- Informiere den Benutzer bei ausbleibenden Ergebnissen: „Es konnten keine relevanten Informationen zu Ihrer Anfrage gefunden werden.“
- Führe also für eine zerlegte Suche iterativ einen Tool Call aus.
**Regeln:**
- Ergänze keine neuen Inhalte oder Details in den Suchanfragen. Die Suchanfragen basieren nur auf der ursprünglichen Frage oder deren Teilfragen.
- Zerlege die Frage nur, wenn ein Vergleich oder mehrere klar abgegrenzte Themen vorhanden sind.
- Vermeide redundante, generische oder irrelevante Teilfragen.
- Keine erfundenen Fakten, nutze nur Informationen aus der Frage.
"""
system_message = [ChatMessage.from_system(system_message)]
messages = convert_to_chat_message_objects(messages)
messages = system_message + messages
async def callback(chunk: StreamingChunk):
if isinstance(chunk.content, ChatMessage) and chunk.content._content:
for item in chunk.content._content:
if isinstance(item, ToolCallResult):
origin_call = item.origin
tool_name = origin_call.tool_name
arguments = origin_call.arguments
result = item.result
arg_table_rows = "\n".join(
f" | {key} | {value} |" for key, value in arguments.items()
)
md = (
f"**{tool_name}**\n"
"<details>\n"
" <summary>Click to expand</summary>\n\n"
" | Parameter | Value |\n"
" |-----------|-------|\n"
f"{arg_table_rows}\n\n"
f"{result}\n"
"</details>"
)
await request_collector.queue.put(md)
return
# Falls kein ToolCallResult gefunden wurde, normaler Ablauf
await collect_chunk(request_collector.queue, chunk)
input_data = {
"llm": {"messages": messages, "tools": tools, "streaming_callback": callback},
"agent_visualizer": {"tools": tools}
}
async def pipeline_runner():
async for content in pipeline.run(
data={
"llm": {"messages": messages, "tools": tools, "streaming_callback": callback},
"agent_visualizer": {"tools": tools},
},
):
print(f"chunk: {chunk}")# pass
await request_collector.queue.put(None)
asyncio.create_task(pipeline_runner())
async for chunk in request_collector.generator():
yield chunk
async def run_pipeline(messages):
pipeline, tools = get_pipeline()
system_message = """
Du bist ein agentisches RAG-System. Bearbeite Benutzerfragen in 3 Schritten:
1. **Umformulierung:** Nutze "umformulieren_anfrage" genau einmal zu Beginn, um die originale Frage des Benutzers an interne Begriffe oder Abkürzungen anzupassen.
2. **Zerlegung (nur wenn nötig):** Zerlege die umformulierte Frage ausschließlich, wenn:
- **Vergleich:** Ein expliziter Vergleich enthalten ist (z. B. „Was ist der Unterschied zwischen A und B?“).
- **Mehrere Themen:** Die Frage mehrere klar abgegrenzte Themen oder Aspekte umfasst, die unabhängig voneinander beantwortet werden können.
- **Keine Zerlegung:** Wenn die umformulierte Frage bereits präzise und semantisch sinnvoll gestellt ist (z. B. „Was ist die Geschichte von Brot?“), führe **keine Zerlegung** durch und nutze die gesamte Frage für die Suche.
**Beispiele für Zerlegung:**
- Ursprünglich: „Unterschied zwischen Zubereitung von Brot heute und früher?“
- Teilfragen:
1. „Wie wurde Brot früher zubereitet?“
2. „Wie wird Brot heute zubereitet?“
- Ursprünglich: „Wie ist die Geschichte von Brot und wie wird es heute industriell hergestellt?“
- Teilfragen:
1. „Was ist die Geschichte von Brot?“
2. „Wie wird Brot heute industriell hergestellt?“
3. **Suche:** Führe gezielte Suchanfragen mit "suche_interne_kenntnisse" durch:
- Ergänze keine neuen Informationen oder Aspekte, die nicht explizit in der umformulierten Frage stehen.
- Die Suchanfragen sollen präzise Teilfragen der ursprünglichen Frage sein, ohne Vermutungen oder zusätzliche Inhalte.
- Plane maximal 6 relevante Textfragmente pro Benutzerfrage.
- Wenn keine Ergebnisse gefunden werden, passe die Suchanfrage maximal zweimal an (Synonyme/Alternativen).
- Informiere den Benutzer bei ausbleibenden Ergebnissen: „Es konnten keine relevanten Informationen zu Ihrer Anfrage gefunden werden.“
**Regeln:**
- Ergänze keine neuen Inhalte oder Details in den Suchanfragen. Die Suchanfragen basieren nur auf der ursprünglichen Frage oder deren Teilfragen.
- Zerlege die Frage nur, wenn ein Vergleich oder mehrere klar abgegrenzte Themen vorhanden sind.
- Vermeide redundante, generische oder irrelevante Teilfragen.
- Keine erfundenen Fakten, nutze nur Informationen aus der Frage.
"""
system_message = [ChatMessage.from_system(system_message)]
messages = convert_to_chat_message_objects(messages)
messages = system_message + messages
final_result = None
async for result in pipeline.run(
data={
"llm": {"messages": messages, "tools": tools},
"agent_visualizer": {"tools": tools},
},
# include_outputs_from=["llm", "tool_invoker"]
):
final_result = result
output = final_result["agent_visualizer"]["output"]
return output