From 15b95115c7a0110331b02e4976d324b06062b159 Mon Sep 17 00:00:00 2001 From: digitallysavvy Date: Wed, 2 Oct 2024 20:04:56 -0400 Subject: [PATCH] updated agent.py guide --- shared/open-ai-integration/quickstart.mdx | 164 +++++++++++----------- 1 file changed, 83 insertions(+), 81 deletions(-) diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx index a5f1bc2ad..1d33dc781 100644 --- a/shared/open-ai-integration/quickstart.mdx +++ b/shared/open-ai-integration/quickstart.mdx @@ -202,6 +202,8 @@ from .realtimeapi.struct import ( ResponseOutputItemAdded, ResponseOutputItemDone, ServerVADUpdateParams, + SessionUpdate, + SessionUpdateParams, SessionUpdated, Voices, to_json @@ -327,7 +329,7 @@ The `setup_and_run_agent` method connects to an Agora channel using `RtcEngine`, try: async with RealtimeApiClient( - base_uri=os.getenv("REALTIME_API_BASE_URI", "wss://api.openai.com"), + base_uri="wss://api.openai.com", api_key=os.getenv("OPENAI_API_KEY"), verbose=False, ) as client: @@ -363,7 +365,7 @@ The `setup_and_run_agent` method connects to an Agora channel using `RtcEngine`, finally: await channel.disconnect() - await client.shutdown() + await client.shutdown() ``` ### Initialize the RealtimeKitAgent @@ -374,11 +376,11 @@ The constructor for `RealtimeKitAgent` sets up the OpenAI client, optional tools def __init__( self, *, - client: RealtimeApiClient, + connection: RealtimeApiConnection, tools: ToolContext | None, channel: Channel, ) -> None: - self.client = client + self.connection = connection self.tools = tools self._client_tool_futures = {} self.channel = channel @@ -485,7 +487,7 @@ Replace the `rtc_to_model` placeholder with the following implementation in `age async for audio_frame in audio_frames: # Process received audio (send to model) _monitor_queue_size(self.audio_queue, "audio_queue") - await self.client.send_audio_data(audio_frame.data) + await self.connection.send_audio_data(audio_frame.data) # Write PCM data if 
enabled await pcm_writer.write(audio_frame.data) @@ -525,15 +527,25 @@ Replace the `model_to_rtc` placeholder with the following implementation: # Write PCM data if enabled await pcm_writer.write(frame) + + except asyncio.CancelledError: + # Write any remaining PCM data before exiting + await pcm_writer.flush() + raise # Re-raise the cancelled exception to properly exit the task ``` #### Process model messages -In addition to handling audio streaming, the agent must process messages received from the OpenAI model. The `_process_model_messages` method listens for these messages and takes appropriate actions based on the type of message, such as audio responses, transcripts, and various model-generated outputs. +In addition to handling audio streaming, the agent must process messages received from the OpenAI model. Message processing in `RealtimeKitAgent` is central to how the agent interacts with OpenAI’s model and the Agora channel. Messages received from the model can include audio data, text transcripts, or other responses, and the agent needs to process these accordingly to ensure smooth real-time communication. -The code implements the following key features: +The `_process_model_messages` method listens for incoming messages and handles them according to their type, ensuring the appropriate action is taken, such as playing back audio, sending text transcripts, or invoking tools. -- **Message handling**: The method listens for various message types, including audio data, text transcripts, and other outputs, and processes them accordingly. +Key features implemented by `_process_model_messages`: + +- **Listening for messages**: The agent continuously listens for incoming messages from OpenAI’s model. +- **Handling audio data**: If the message contains audio data, it is placed in a queue for playback to the Agora channel. +- **Handling transcripts**: If the message contains partial or final text transcripts, they are processed and sent to the Agora chat. 
+- **Handling other responses**: Additional message types, such as tool invocations and other outputs are processed as needed. ```python async def _process_model_messages(self) -> None: @@ -543,121 +555,123 @@ The code implements the following key features: # Handle different message types ``` -- **Queue management**: For audio messages, the data is decoded and placed in the audio queue for playback. -- **Real-time response**: Text messages and other outputs are immediately sent back to the Agora chat. - ### Audio and message flow -The agent manages real-time audio and message flow between Agora and OpenAI as follows: - -- `rtc_to_model`: Continuously captures audio from the Agora channel and streams it to OpenAI. -- `model_to_rtc`: Retrieves audio responses from OpenAI and plays them back in real-time. -- `_process_model_messages`: Listens for and processes various message types, such as audio and transcripts and ensures timely delivery to the Agora channel. +The first case in our `_process_model_messages` method is `InputAudioBufferSpeechStarted`. When this event is triggered, the system clears the sender’s audio buffer on the Agora channel and empties the local audio queue to ensure no prior audio interferes with the new input. It also logs the event for tracking purposes, allowing the agent to effectively manage and process incoming audio streams. -### Message processing - -The message processing logic in `RealtimeKitAgent` is central to how the agent interacts with OpenAI’s model and the Agora channel. Messages received from the model can include audio data, text transcripts, or other responses, and the agent needs to process these accordingly to ensure smooth real-time communication. - -The `_process_model_messages` method listens for incoming messages and handles them according to their type, ensuring the appropriate action is taken, such as playing back audio, sending text transcripts, or invoking tools. 
- -The code implements the following key features: - -- **Listening for messages**: The agent continuously listens for incoming messages from OpenAI’s model. -- **Handling audio data**: If the message contains audio data, it is placed in a queue for playback to the Agora channel. -- **Handling transcripts**: If the message contains partial or final text transcripts, they are processed and sent to the Agora chat. -- **Handling other responses**: Additional message types, such as tool invocations and other outputs are processed as needed. +``` +case InputAudioBufferSpeechStarted(): + await self.channel.clear_sender_audio_buffer() + # clear the audio queue so audio stops playing + while not self.audio_queue.empty(): + self.audio_queue.get_nowait() + logger.info(f"TMS:InputAudioBufferSpeechStarted: item_id: {message.item_id}") -#### Handling Text Transcripts +``` -The agent receives partial or completed text transcripts. These are identified and handled by their message types: +#### Response Messages -- `ResponseAudioTranscriptDelta`: Represents partial transcripts. -- `ResponseAudioTranscriptDone`: Indicates a completed transcript. +The `_process_model_messages` method is also responsible for handling both audio and text responses in real time. It processes various message types by managing audio data and sending text messages to the Agora chat channel. -For both types, the agent sends the transcript to the Agora chat as a message. +When an audio delta message is received, the system decodes the audio and adds it to the local audio queue for playback, while also logging the event for reference. For transcript updates, the agent sends the corresponding text message to the chat asynchronously, ensuring that message handling does not block other processes. Finally, when the transcript is complete, the system logs the event and sends the final message to the Agora chat. 
```python -case messages.ResponseAudioTranscriptDelta(): - logger.info(f"Received text message {message=}") +case ResponseAudioDelta(): + # logger.info("Received audio message") + self.audio_queue.put_nowait(base64.b64decode(message.delta)) + # loop.call_soon_threadsafe(self.audio_queue.put_nowait, base64.b64decode(message.delta)) + logger.info(f"TMS:ResponseAudioDelta: response_id:{message.response_id},item_id: {message.item_id}") + +case ResponseAudioTranscriptDelta(): + # logger.info(f"Received text message {message=}") asyncio.create_task(self.channel.chat.send_message( ChatMessage( - message=message.model_dump_json(), msg_id=message.item_id + message=to_json(message), msg_id=message.item_id ) )) -case messages.ResponseAudioTranscriptDone(): +case ResponseAudioTranscriptDone(): logger.info(f"Text message done: {message=}") asyncio.create_task(self.channel.chat.send_message( ChatMessage( - message=message.model_dump_json(), msg_id=message.item_id + message=to_json(message), msg_id=message.item_id ) )) ``` -#### Handling Other Responses +#### Handling Message Responses -The agent handles a variety of other message types from OpenAI’s model. These include tool calls, errors, or other output from the model. In the event of an unhandled message type, the agent logs a warning for further investigation. +Below is the full implementation, where the examples from previous sections. In the full implementation, audio input events are handled by clearing audio buffers and queues when speech starts or stops. Audio deltas are decoded and placed into the local queue, while transcript messages are sent asynchronously to the Agora chat. -Replace the `_process_model_messages` placeholder with the following implementation: +The agent can also be extended to support a variety of other message types, including tool calls, errors, and other outputs from OpenAI’s model. 
If the agent encounters an unhandled message type, it logs a warning to notify developers for further investigation. ```python async def _process_model_messages(self) -> None: - async for message in self.client.listen(): + async for message in self.connection.listen(): # logger.info(f"Received message {message=}") match message: - case messages.ResponseAudioDelta(): + case InputAudioBufferSpeechStarted(): + await self.channel.clear_sender_audio_buffer() + # clear the audio queue so audio stops playing + while not self.audio_queue.empty(): + self.audio_queue.get_nowait() + logger.info(f"TMS:InputAudioBufferSpeechStarted: item_id: {message.item_id}") + + case InputAudioBufferSpeechStopped(): + logger.info(f"TMS:InputAudioBufferSpeechStopped: item_id: {message.item_id}") + pass + + case ResponseAudioDelta(): # logger.info("Received audio message") self.audio_queue.put_nowait(base64.b64decode(message.delta)) # loop.call_soon_threadsafe(self.audio_queue.put_nowait, base64.b64decode(message.delta)) logger.info(f"TMS:ResponseAudioDelta: response_id:{message.response_id},item_id: {message.item_id}") - case messages.ResponseAudioTranscriptDelta(): - logger.info(f"Received text message {message=}") + + case ResponseAudioTranscriptDelta(): + # logger.info(f"Received text message {message=}") asyncio.create_task(self.channel.chat.send_message( ChatMessage( - message=message.model_dump_json(), msg_id=message.item_id + message=to_json(message), msg_id=message.item_id ) )) - case messages.ResponseAudioTranscriptDone(): + + case ResponseAudioTranscriptDone(): logger.info(f"Text message done: {message=}") asyncio.create_task(self.channel.chat.send_message( ChatMessage( - message=message.model_dump_json(), msg_id=message.item_id + message=to_json(message), msg_id=message.item_id ) )) - case messages.InputAudioBufferSpeechStarted(): - await self.channel.clear_sender_audio_buffer() - # clear the audio queue so audio stops playing - while not self.audio_queue.empty(): - 
self.audio_queue.get_nowait() - logger.info(f"TMS:InputAudioBufferSpeechStarted: item_id: {message.item_id}") - case messages.InputAudioBufferSpeechStopped(): - pass + # InputAudioBufferCommitted - case messages.InputAudioBufferCommitted(): + case InputAudioBufferCommitted(): pass - # ItemCreated - case messages.ItemCreated(): + case ItemCreated(): pass # ResponseCreated - case messages.ResponseCreated(): + case ResponseCreated(): pass # ResponseDone - case messages.ResponseDone(): + case ResponseDone(): pass # ResponseOutputItemAdded - case messages.ResponseOutputItemAdded(): + case ResponseOutputItemAdded(): pass - # ResponseContentPartAdded - case messages.ResponseContentPartAdded(): + # ResponseContentPartAdded + case ResponseContentPartAdded(): pass # ResponseAudioDone - case messages.ResponseAudioDone(): + case ResponseAudioDone(): pass # ResponseContentPartDone - case messages.ResponseContentPartDone(): + case ResponseContentPartDone(): pass # ResponseOutputItemDone - case messages.ResponseOutputItemDone(): + case ResponseOutputItemDone(): + pass + case SessionUpdated(): + pass + case RateLimitsUpdated(): pass case _: logger.warning(f"Unhandled message {message=}") @@ -688,7 +702,7 @@ async def wait_for_remote_user(channel: Channel) -> int: channel.once("user_joined", lambda conn, user_id: future.set_result(user_id)) try: - # Wait for the remote user with a timeout + # Wait for the remote user with a timeout of 15 seconds remote_user = await asyncio.wait_for(future, timeout=15.0) return remote_user except KeyboardInterrupt: @@ -699,21 +713,9 @@ async def wait_for_remote_user(channel: Channel) -> int: raise ``` -### Add model to RealtimeApiClient - -Modify the `realtimeapi/client.py` file to include the model name as part of the `self.url`. This ensures the appropriate model is used when interacting with the OpenAI API. 
- -Update the `self.url` definition as follows: - -```python -self.url = f"{base_uri}{path}?model={os.environ.get('OPENAI_MODEL')}" -``` - -This adjustment ensures that the model specified in your environment variables `OPENAI_MODEL` is included in the API requests made by the `RealtimeApiClient`. - ### Tool Management -The tool management system extends the agents functionality by allowing OpenAI’s model to invoke specific tools. These tools can either run locally or pass data back to the model for further processing. By registering tools and executing them based on incoming messages, the agent adds the capability and flexibility to handling a variety of tasks. +Every agent needs a tool management system to extend the agent's functionality by allowing OpenAI’s model to invoke specific tools. These tools can either run locally or pass data back to the model for further processing. By registering tools and executing them based on incoming messages, the agent adds the capability and flexibility to handle a variety of tasks. Tool management implements the following key features: