From e29e6c549992619b0081f84f3252983e7a17529e Mon Sep 17 00:00:00 2001 From: digitallysavvy Date: Thu, 26 Sep 2024 17:29:48 -0400 Subject: [PATCH 1/4] updated title --- open-ai-integration/overview/product-overview.mdx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/open-ai-integration/overview/product-overview.mdx b/open-ai-integration/overview/product-overview.mdx index 6115ab28c..29bd593d8 100644 --- a/open-ai-integration/overview/product-overview.mdx +++ b/open-ai-integration/overview/product-overview.mdx @@ -7,7 +7,7 @@ description: > --- Integrating Agora’s real-time audio communication with OpenAI’s Large Language Models (LLMs) unlocks the potential for powerful, interactive voice-based applications. By combining Agora’s robust real-time audio streaming capabilities with the conversational intelligence of OpenAI’s LLMs, you can create seamless voice-enabled experiences, such as voice-powered AI assistants or interactive dialogue systems. This integration enables dynamic, responsive audio interactions, enhancing user engagement across a broad range of use cases—from customer support bots to collaborative voice-driven applications. Most importantly, by combining the strengths of Agora and OpenAI, this integration enables the most natural form of language interaction, lowering the barrier for users to harness the power of AI and making advanced technologies more accessible than ever before. 
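The overview above pairs Agora's bidirectional audio transport with an OpenAI model that consumes and produces audio. Stripped of both SDKs, the data flow is an asyncio loop: capture frames, hand each one to the model, and queue the replies for playback. The sketch below shows only that shape — `capture_frames` and `model_reply` are illustrative stand-ins, not Agora or OpenAI APIs:

```python
import asyncio

async def capture_frames(n: int):
    # Stand-in for the incoming audio-frame stream.
    for i in range(n):
        yield f"frame-{i}".encode()

async def model_reply(frame: bytes) -> bytes:
    # Stand-in for the model turning user audio into reply audio.
    return b"reply-to-" + frame

async def run_pipeline(n: int) -> list[bytes]:
    playback: list[bytes] = []
    out_queue: asyncio.Queue[bytes] = asyncio.Queue()

    async def uplink() -> None:
        # User audio in -> model -> reply audio queued for playback.
        async for frame in capture_frames(n):
            await out_queue.put(await model_reply(frame))
        await out_queue.put(b"")  # empty frame as an end-of-stream sentinel

    async def downlink() -> None:
        # Drain the queue back to the "channel" until the sentinel arrives.
        while chunk := await out_queue.get():
            playback.append(chunk)

    await asyncio.gather(uplink(), downlink())
    return playback
```

A real agent replaces `capture_frames` with the SDK's frame iterator and `model_reply` with a Realtime API session, but the queue-between-two-tasks shape carries over.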
+ From eb9226506d02f1c65e518edb874d73bbe3ccec9b Mon Sep 17 00:00:00 2001 From: digitallysavvy Date: Thu, 26 Sep 2024 17:37:19 -0400 Subject: [PATCH 2/4] updated env vars --- shared/open-ai-integration/quickstart.mdx | 141 +++++++++++----------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx index 3ee35c773..0c3c7c34b 100644 --- a/shared/open-ai-integration/quickstart.mdx +++ b/shared/open-ai-integration/quickstart.mdx @@ -24,41 +24,42 @@ Follow these steps to set up your Python integration project: 1. Create a new folder for the project. - ```bash - mkdir realtime-agent - cd realtime-agent/ + ```bash + mkdir realtime-agent + cd realtime-agent/ - ``` + ``` 1. Create the following structure for your project: - ``` - /realtime-agent - ├── __init__.py - ├── .env - ├── agent.py - ├── agora - │   ├── __init__.py - │   ├── requirements.txt - │   └── rtc.py - └── realtimeapi - ├── __init__.py - ├── client.py - ├── messages.py - └── util.py - ``` - - - This project uses the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package.Download the project and unzip it into your `realtime-agent` folder. - - - The following descriptions provide an overview of the key files in the project: - - - `agent.py`: The primary script responsible for executing the `RealtimeKitAgent`. It integrates Agora's functionality from the `agora/rtc.py` module and OpenAI's capabilities from the `realtimeapi` package. - - `agora/rtc.py`: Contains an implementation of the server-side Agora Python Voice SDK. - - `realtimeapi/`: Contains the classes and methods that interact with OpenAI’s Realtime API. - - The [Complete code](#complete-integration-code) for `agent.py` and `rtc.py` is provided at the bottom of this page. 
+ ``` + /realtime-agent + ├── __init__.py + ├── .env + ├── agent.py + ├── agora + │   ├── __init__.py + │   ├── requirements.txt + │   └── rtc.py + └── realtimeapi + ├── __init__.py + ├── client.py + ├── messages.py + └── util.py + ``` + + + This project uses the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package.Download the project and unzip it into your + `realtime-agent` folder. + + + The following descriptions provide an overview of the key files in the project: + + - `agent.py`: The primary script responsible for executing the `RealtimeKitAgent`. It integrates Agora's functionality from the `agora/rtc.py` module and OpenAI's capabilities from the `realtimeapi` package. + - `agora/rtc.py`: Contains an implementation of the server-side Agora Python Voice SDK. + - `realtimeapi/`: Contains the classes and methods that interact with OpenAI’s Realtime API. + + The [Complete code](#complete-integration-code) for `agent.py` and `rtc.py` is provided at the bottom of this page. 1. Open your `.env` file and add the following keys: @@ -68,9 +69,6 @@ Follow these steps to set up your Python integration project: # OpenAI API key for authentication OPENAI_API_KEY=your_openai_api_key_here - - # API base URI for the Realtime API - REALTIME_API_BASE_URI=wss://api.openai.com ``` 1. Install the dependencies: @@ -88,7 +86,8 @@ The `RealtimeKitAgent` class integrates Agora's audio communication capabilities The `setup_and_run_agent` method sets up the `RealtimeKitAgent` by connecting to an Agora channel using the provided `RtcEngine` and initializing a session with the OpenAI Realtime API client. It sends configuration messages to set up the session and define conversation parameters, such as the system message and output audio format, before starting the agent's operations. The method uses asynchronous execution to handle both listening for the session start and sending conversation configuration updates concurrently. 
It ensures that the connection is properly managed and cleaned up after use, even in cases of exceptions, early exits, or shutdowns.

-UIDs in the Python SDK are set using a string value. Agora recommends using only numerical values for UID strings to ensure compatibility with all Agora products and extensions.
+ UIDs in the Python SDK are set using a string value. Agora recommends using only numerical values for UID strings to ensure compatibility
+ with all Agora products and extensions.

 ```python
@@ -363,14 +362,14 @@ logger = logging.getLogger(__name__)

 @dataclass(frozen=True, kw_only=True)
 class InferenceConfig:
-    """Configuration for the inference process."""
-    system_message: str | None = None
-    turn_detection: messages.TurnDetectionTypes | None = None
-    voice: messages.Voices | None = None
+"""Configuration for the inference process."""
+system_message: str | None = None
+turn_detection: messages.TurnDetectionTypes | None = None
+voice: messages.Voices | None = None

 @dataclass(frozen=True, kw_only=True)
 class LocalFunctionToolDeclaration:
-    """Declaration of a tool that can be called by the model, and runs a function locally on the tool context."""
+"""Declaration of a tool that can be called by the model, and runs a function locally on the tool context."""

     name: str
     description: str
@@ -389,7 +388,7 @@ class LocalFunctionToolDeclaration:

 @dataclass(frozen=True, kw_only=True)
 class PassThroughFunctionToolDeclaration:
-    """Declaration of a tool that can be called by the model, and is passed through the LiveKit client."""
+"""Declaration of a tool that can be called by the model, and is passed through the LiveKit client."""

     name: str
     description: str
@@ -411,19 +410,19 @@ ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclarat

 @dataclass(frozen=True, kw_only=True)
 class LocalToolCallExecuted:
-    json_encoded_output: str
+json_encoded_output: str

 @dataclass(frozen=True, kw_only=True)
 class ShouldPassThroughToolCall:
-    decoded_function_args: dict[str, Any]
+decoded_function_args: dict[str, Any]

 # Type alias for tool execution results
 ExecuteToolCallResult = LocalToolCallExecuted | ShouldPassThroughToolCall

 class ToolContext(abc.ABC):
-    """Abstract base class for managing tool declarations and executions."""
-    _tool_declarations: dict[str, ToolDeclaration]
+"""Abstract base class for managing tool declarations and executions."""
+_tool_declarations: dict[str, ToolDeclaration]

     def __init__(self) -> None:
         # TODO: This should be an ordered dict
@@ -481,18 +480,18 @@ class ToolContext(abc.ABC):
         return [v.model_description() for v in self._tool_declarations.values()]

 class ClientToolCallResponse(BaseModel):
-    tool_call_id: str
-    result: dict[str, Any] | str | float | int | bool | None = None
+tool_call_id: str
+result: dict[str, Any] | str | float | int | bool | None = None

 class RealtimeKitAgent:
-    """Main agent class for handling real-time communication and processing."""
-    engine: RtcEngine
-    channel: Channel
-    client: RealtimeApiClient
-    audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
-    message_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDelta] = asyncio.Queue()
-    message_done_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDone] = asyncio.Queue()
-    tools: ToolContext | None = None
+"""Main agent class for handling real-time communication and processing."""
+engine: RtcEngine
+channel: Channel
+client: RealtimeApiClient
+audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
+message_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDelta] = asyncio.Queue()
+message_done_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDone] = asyncio.Queue()
+tools: ToolContext | None = None

     _client_tool_futures: dict[str, asyncio.Future[ClientToolCallResponse]]
@@ -690,12 +689,12 @@ class RealtimeKitAgent:
             logger.warning(f"Unhandled message type: {message=}")

 async def shutdown(loop, signal=None):
-    """Gracefully shut down the application."""
-    if signal:
-        print(f"Received exit signal {signal.name}...")
+"""Gracefully shut down the application."""
+if signal:
+print(f"Received exit signal {signal.name}...")

     tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
-
+
     print(f"Cancelling {len(tasks)} outstanding tasks")
     for task in tasks:
         task.cancel()
@@ -703,22 +702,22 @@ async def shutdown(loop, signal=None):
     await asyncio.gather(*tasks, return_exceptions=True)
     loop.stop()

-if __name__ == "__main__": # Load environment variables and run the agent
-    load_dotenv()
-    asyncio.run(
-        RealtimeKitAgent.entry_point(
-            engine=RtcEngine(appid="aab8b8f5a8cd4469a63042fcfafe7063"),
-            inference_config=InferenceConfig(
-                system_message="""\\
+if __name__ == "__main__": # Load environment variables and run the agent
+load_dotenv()
+asyncio.run(
+RealtimeKitAgent.entry_point(
+engine=RtcEngine(appid="aab8b8f5a8cd4469a63042fcfafe7063"),
+inference_config=InferenceConfig(
+system_message="""\\
 You are a helpful assistant. If asked about the weather, make sure to use the provided tool to get that information. \\
 If you are asked a question that requires a tool, say something like "working on that" and don't provide a concrete response \\
 until you have received the response to the tool call.\\
 """,
-    voice=messages.Voices.Alloy,
-    turn_detection=messages.TurnDetectionTypes.SERVER_VAD,
-    ),
-    )
-    )
+voice=messages.Voices.Alloy,
+turn_detection=messages.TurnDetectionTypes.SERVER_VAD,
+),
+)
+)
 `}

From dfc6d35f65fb908a33738d6cfb093731fda8c6bc Mon Sep 17 00:00:00 2001
From: digitallysavvy
Date: Thu, 26 Sep 2024 17:46:13 -0400
Subject: [PATCH 3/4] use env var

---
 shared/open-ai-integration/quickstart.mdx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx
index 0c3c7c34b..7def74ef5 100644
--- a/shared/open-ai-integration/quickstart.mdx
+++ b/shared/open-ai-integration/quickstart.mdx
@@ -706,7 +706,7 @@ if __name__ == "__main__": # Load environment variables and run the agent
 load_dotenv()
 asyncio.run(
 RealtimeKitAgent.entry_point(
-engine=RtcEngine(appid="aab8b8f5a8cd4469a63042fcfafe7063"),
+engine=RtcEngine(appid=os.getenv("AGORA_APP_ID")),
 inference_config=InferenceConfig(
 system_message="""\\
 You are a helpful assistant. If asked about the weather, make sure to use the provided tool to get that information.
\\

From ab3ceefeb179958653043771f175fcf0031c1f23 Mon Sep 17 00:00:00 2001
From: digitallysavvy
Date: Thu, 26 Sep 2024 21:11:04 -0400
Subject: [PATCH 4/4] updated to match demo

---
 shared/open-ai-integration/quickstart.mdx | 814 +++++++---------------
 1 file changed, 235 insertions(+), 579 deletions(-)

diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx
index 7def74ef5..eb73fb0a7 100644
--- a/shared/open-ai-integration/quickstart.mdx
+++ b/shared/open-ai-integration/quickstart.mdx
@@ -2,7 +2,7 @@ import CodeBlock from '@theme/CodeBlock';
 import CodeRtcPy from '@docs/assets/code/open-ai-integration/rtc-py.mdx';
 import Prerequisites from '@docs/shared/common/prerequisites/python.mdx';

-Integrating Agora’s real-time audio communication capabilities with OpenAI’s language models enables dynamic, conversational AI experiences. This guide shows you how to set up a Python project that combines Agora’s server-side Voice SDK with OpenAI’s API to create an interactive, voice-driven assistant.
+Integrating Agora's real-time audio communication capabilities with OpenAI's language models enables dynamic, conversational AI experiences. This guide shows you how to set up a Python project that combines Agora's server-side Voice SDK with OpenAI's API to create an interactive, voice-driven assistant.

 ## Understand the tech

@@ -20,75 +20,85 @@ The following figure illustrates the integration topology:

 ## Set up the project

-Follow these steps to set up your Python integration project:
-
-1. Create a new folder for the project.
-
+This guide walks through the core elements of the [Agora Conversational AI Demo](https://github.com/AgoraIO/agora-openai-converse), showing how to integrate Agora's Python SDK with OpenAI's Realtime API:
+
+1. Download the [Agora Conversational AI Demo code](https://github.com/AgoraIO/agora-openai-converse).
+
+1.
The project is structured: + + ``` + /realtime_agent + ├── __init__.py + ├── agent.py + ├── agora + │   ├── __init__.py + │   ├── requirements.txt + │   ├── rtc.py + │   └── token_builder + │   ├── AccessToken2.py + │   ├── Packer.py + │   ├── RtcTokenBuilder2.py + │   └── realtimekit_token_builder.py + ├── parse_args.py + └── realtimeapi + ├── __init__.py + ├── call_tool.py + ├── client.py + ├── messages.py + ├── mic_to_websocket.py + ├── push_to_talk.py + ├── send_audio_to_websocket.py + └── util.py + ``` + + + This project uses the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package. Download the project and unzip it into your `realtime-agent` folder. + + + Overview of key files: + + - `agent.py`: The primary script responsible for executing the `RealtimeKitAgent`. It integrates Agora's functionality from the `rtc.py` module and OpenAI's capabilities from the `realtimeapi` package. + - `rtc.py`: Contains an implementation of the server-side Agora Python Voice SDK. + - `parse_args.py`: Handles command-line argument parsing for the application. + - `realtimeapi/`: Contains the classes and methods that interact with OpenAI's Realtime API. + +1. Create the `.env` file by copying the `.env.example` in the root of the repo ```bash - mkdir realtime-agent - cd realtime-agent/ - - ``` - -1. Create the following structure for your project: - + cp .env.example .env ``` - /realtime-agent - ├── __init__.py - ├── .env - ├── agent.py - ├── agora - │   ├── __init__.py - │   ├── requirements.txt - │   └── rtc.py - └── realtimeapi - ├── __init__.py - ├── client.py - ├── messages.py - └── util.py - ``` - - - This project uses the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package.Download the project and unzip it into your - `realtime-agent` folder. - - - The following descriptions provide an overview of the key files in the project: - - - `agent.py`: The primary script responsible for executing the `RealtimeKitAgent`. 
It integrates Agora's functionality from the `agora/rtc.py` module and OpenAI's capabilities from the `realtimeapi` package. - - `agora/rtc.py`: Contains an implementation of the server-side Agora Python Voice SDK. - - `realtimeapi/`: Contains the classes and methods that interact with OpenAI’s Realtime API. - - The [Complete code](#complete-integration-code) for `agent.py` and `rtc.py` is provided at the bottom of this page. - -1. Open your `.env` file and add the following keys: +1. Fill in the values for the environment variables: ```python # Agora RTC app ID - AGORA_APP_ID=your_agora_app_id + AGORA_APP_ID= + AGORA_APP_CERT= # OpenAI API key for authentication - OPENAI_API_KEY=your_openai_api_key_here + OPENAI_API_KEY= ``` -1. Install the dependencies: +1. Create a virtual environment and activate it: + ```bash + python3 -m venv venv && source venv/bin/activate + ``` +1. Install the required dependencies: ```bash pip install -r requirements.txt ``` +1. Run the demo server: + ```bash + python -m realtime_agent.agent --channel_name= --uid= + ``` + ## Implementation The `RealtimeKitAgent` class integrates Agora's audio communication capabilities with OpenAI's AI services. This class manages audio streams, handles communication with the OpenAI API, and processes AI-generated responses, providing a seamless conversational AI experience. ### Connect to Agora and OpenAI -The `setup_and_run_agent` method sets up the `RealtimeKitAgent` by connecting to an Agora channel using the provided `RtcEngine` and initializing a session with the OpenAI Realtime API client. It sends configuration messages to set up the session and define conversation parameters, such as the system message and output audio format, before starting the agent's operations. The method uses asynchronous execution to handle both listening for the session start and sending conversation configuration updates concurrently. 
It ensures that the connection is properly managed and cleaned up after use, even in cases of exceptions, early exits, or shutdowns. - - - UIDs in the Python SDK are set using a string value. Agora recommends using only numerical values for UID strings to ensure compatibility - with all Agora products and extensions. - +The `setup_and_run_agent` method sets up the `RealtimeKitAgent` by connecting to an Agora channel using the provided `RtcEngine` and initializing a session with the OpenAI Realtime API client. Here's the updated implementation with inline comments: ```python @classmethod @@ -96,27 +106,34 @@ async def setup_and_run_agent( cls, *, engine: RtcEngine, + options: RtcOptions, inference_config: InferenceConfig, tools: ToolContext | None, ) -> None: - # Connect to a channel using the provided RtcEngine - channel = await engine.connect(channelId="realtimekit_agora", uid="123") + # Create and connect to an Agora channel + channel = engine.create_channel(options) + await channel.connect() try: - # Create and enter a context manager for the RealtimeApiClient + # Initialize the OpenAI Realtime API client async with RealtimeApiClient( - base_uri=os.getenv("REALTIME_API_BASE_URI", "wss://api.openai.com"), + base_uri="wss://api.openai.com", api_key=os.getenv("OPENAI_API_KEY"), verbose=False, ) as client: - # Send a message to update the session configuration + # Update the session configuration await client.send_message( - messages.UpdateSessionConfig( - session=messages.SessionResource(), + messages.SessionUpdate( + session=messages.SessionUpdateParams( + turn_detection=inference_config.turn_detection, + tools=tools.model_description() if tools else None, + tool_choice="auto", + instructions=inference_config.system_message, + ) ) ) - # Concurrently wait for the start session message and send the conversation config + # Concurrently wait for the session to start and update the conversation config [start_session_message, _] = await asyncio.gather( *[ 
anext(client.listen()), @@ -131,29 +148,21 @@ async def setup_and_run_agent( ), ] ) - - # Ensure the received message is of the correct type - assert isinstance(start_session_message, messages.StartSession) - - # Print session information - print( + logger.info( f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}" ) - # Create an instance of the agent + # Create and run the RealtimeKitAgent agent = cls( client=client, tools=tools, channel=channel, ) - - # Run the agent await agent.run() finally: - # Ensure disconnection and shutdown occur, even if an exception is raised - await engine.disconnect() - await shutdown(asyncio.get_event_loop()) + # Ensure the Agora engine is destroyed, even if an exception occurs + engine.destroy() ``` ### Initialize the RealtimeKitAgent @@ -168,107 +177,111 @@ def __init__( tools: ToolContext | None, channel: Channel, ) -> None: - self.client = client - self.tools = tools - self._client_tool_futures = {} - self.channel = channel + self.client = client # OpenAI Realtime API client + self.tools = tools # Optional tool context for function registration + self._client_tool_futures = {} # For managing asynchronous tool calls + self.channel = channel # Agora channel for audio communication + self.subscribe_user = None # Will store the user ID we're subscribing to ``` ### Launch the Agent -The `entry_point` method serves as the primary entry for launching the agent. It calls `setup_and_run_agent` with the necessary parameters, initializing the agent and activating its core functionalities. 
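The launch flow described above follows a common asyncio pattern: spawn long-running tasks with a done callback that logs otherwise-silent failures, then park the main coroutine on a future that resolves when the session ends. A self-contained sketch of that pattern (illustrative names, not the SDK's API):

```python
import asyncio
import logging

logger = logging.getLogger("agent-sketch")

async def run_agent() -> str:
    disconnected: "asyncio.Future[None]" = asyncio.get_running_loop().create_future()

    def log_exception(t: asyncio.Task) -> None:
        # Without this callback, exceptions in background tasks vanish silently.
        if not t.cancelled() and t.exception():
            logger.error("unhandled exception", exc_info=t.exception())

    async def audio_worker() -> None:
        await asyncio.sleep(0.01)        # pretend to stream audio for a while
        disconnected.set_result(None)    # simulate the remote user leaving

    asyncio.create_task(audio_worker()).add_done_callback(log_exception)
    await disconnected                   # block here until "disconnect"
    return "finished"
```

Calling `asyncio.run(run_agent())` returns once the worker resolves the future, mirroring how the agent waits on its `disconnected_future`.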
- -```python -@classmethod -async def entry_point( - cls, - *, - engine: RtcEngine, # The Agora RTC engine instance for audio streaming - inference_config: InferenceConfig, # Configuration for the AI inference (e.g., system message, voice) - tools: ToolContext | None = None, # Optional tool context for registering functions -) -> None: - # Call the method to set up and run the agent, passing in the necessary parameters - await cls.setup_and_run_agent( - engine=engine, inference_config=inference_config, tools=tools - ) -``` - -The asynchronous `run` method orchestrates the main operations of the `RealtimeKitAgent`. It manages audio streaming, processes tasks related to audio input, output, and model messages, and ensures exception handling is in place. +The `run` method orchestrates the main operations of the `RealtimeKitAgent`. It manages audio streaming, processes tasks related to audio input, output, and model messages, and ensures exception handling is in place. ```python async def run(self) -> None: - # Log unhandled exceptions that occur in tasks - def log_exception(t: asyncio.Task[Any]) -> None: - if not t.cancelled() and t.exception(): - logger.error( - "Unhandled exception", - exc_info=t.exception(), - ) - - # Future used to detect when the agent is disconnected - disconnected_future = asyncio.Future[None]() - - # Set the result for the disconnected future when the agent is disconnected - def _on_disconnected() -> None: - if not disconnected_future.done(): - disconnected_future.set_result(None) - - # Event listener for disconnection (commented out for now) - # self.room.on("disconnected", _on_disconnected) + try: + # Helper function to log unhandled exceptions in tasks + def log_exception(t: asyncio.Task[Any]) -> None: + if not t.cancelled() and t.exception(): + logger.error( + "unhandled exception", + exc_info=t.exception(), + ) + + logger.info("Waiting for remote user to join") + # Wait for a remote user to join the channel + self.subscribe_user = await 
wait_for_remote_user(self.channel) + logger.info(f"Subscribing to user {self.subscribe_user}") + # Subscribe to the audio of the joined user + await self.channel.subscribe_audio(self.subscribe_user) + + # Handle user leaving the channel + async def on_user_left(agora_rtc_conn: RTCConnection, user_id: int, reason: int): + logger.info(f"User left: {user_id}") + if self.subscribe_user == user_id: + self.subscribe_user = None + logger.info("Subscribed user left, disconnecting") + await self.channel.disconnect() + + self.channel.on("user_left", on_user_left) + + # Set up a future to track when the agent should disconnect + disconnected_future = asyncio.Future[None]() - # Start streaming audio input to the AI model, with exception logging - asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( - log_exception - ) + # Handle connection state changes + def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): + logger.info(f"Connection state changed: {conn_info.state}") + if conn_info.state == 1: # Disconnected state + if not disconnected_future.done(): + disconnected_future.set_result(None) - # Start streaming audio output (synthesized responses) back to the users, with exception logging - asyncio.create_task( - self._stream_audio_queue_to_audio_output() - ).add_done_callback(log_exception) + self.channel.on("connection_state_changed", callback) - # Start processing model messages (e.g., transcriptions, updates), with exception logging - asyncio.create_task(self._process_model_messages()).add_done_callback( - log_exception - ) + # Start tasks for streaming audio and processing messages + asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( + log_exception + ) + asyncio.create_task( + self._stream_audio_queue_to_audio_output() + ).add_done_callback(log_exception) + asyncio.create_task(self._process_model_messages()).add_done_callback( + log_exception + ) - # Wait until the disconnection future is resolved, 
meaning the agent has disconnected - await disconnected_future - logger.info("Agent finished running") # Log that the agent has completed its operation + # Wait until the agent is disconnected + await disconnected_future + logger.info("Agent finished running") + except asyncio.CancelledError: + logger.info("Agent cancelled") ``` ### Stream input audio to the AI model -The asynchronous method `_stream_input_audio_to_model` captures audio frames from the Agora channel and sends them to the OpenAI API client for real-time processing by the AI model. +The `_stream_input_audio_to_model` method captures audio frames from the Agora channel and sends them to the OpenAI API client for real-time processing by the AI model. ```python async def _stream_input_audio_to_model(self) -> None: - # Retrieve audio frames from the Agora channel - audio_frames = self.channel.get_audio_frames() - - # Loop through each audio frame received from the channel + # Wait until we have a subscribed user + while self.subscribe_user is None: + await asyncio.sleep(0.1) + # Get the audio frame stream for the subscribed user + audio_frames = self.channel.get_audio_frames(self.subscribe_user) async for audio_frame in audio_frames: - # Send the audio frame's data to the AI model via the OpenAI API client - await self.client.send_audio_data(audio_frame.data) + try: + # Send the audio frame to the OpenAI model via the API client + await self.client.send_audio_data(audio_frame.data) + except Exception as e: + logger.error(f"Error sending audio data to model: {e}") ``` ### Stream audio from the AI model to the user -The asynchronous method `_stream_audio_queue_to_audio_output` handles the playback of processed audio data from the AI model. It retrieves audio frames from a queue and sends them to the Agora channel, allowing users to hear AI-generated responses in real-time. +The `_stream_audio_queue_to_audio_output` method handles the playback of processed audio data from the AI model. 
It retrieves audio frames from a queue and sends them to the Agora channel, allowing users to hear AI-generated responses in real-time. ```python async def _stream_audio_queue_to_audio_output(self) -> None: while True: - # Retrieve the next processed audio frame from the queue (AI model's response) + # Get the next audio frame from the queue (contains audio data from the model) frame = await self.audio_queue.get() - - # Send the audio frame to the Agora channel for playback to the user + # Send the frame to the Agora channel for playback to the user await self.channel.push_audio_frame(frame) - - # Yield control to allow other tasks to run, improving responsiveness - await asyncio.sleep(0) + await asyncio.sleep(0) # Allow other tasks to run ``` -The asynchronous method `_process_model_messages` listens for messages from the OpenAI API client and processes them based on their type. It handles a variety of message types, including audio deltas, transcriptions, and errors. The method updates the user chat with transcribed text, queues audio for playback, and manages other session-related events, such as tool calls and generation states. +### Process model messages + +The `_process_model_messages` method listens for messages from the OpenAI API client and processes them based on their type. It handles a variety of message types, including audio deltas, transcriptions, and errors. 
```python async def _process_model_messages(self) -> None: @@ -277,477 +290,120 @@ async def _process_model_messages(self) -> None: # Process each type of message received from the client match message: case messages.ResponseAudioDelta(): - # Handle audio response deltas by decoding and adding them to the audio queue + # Process incoming audio data from the model await self.audio_queue.put(base64.b64decode(message.delta)) - case messages.ResponseAudioTranscriptionDelta(): - # Log and send transcribed text updates to the Agora chat channel + case messages.ResponseAudioTranscriptDelta(): + # Handle incoming transcription updates logger.info(f"Received text message {message=}") - await self.channel.chat.send_message(ChatMessage(message=message.delta, msg_id=message.output_item_id)) + await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id)) - case messages.ResponseAudioTranscriptionDone(): - # Handle completion of transcription and send the final text message + case messages.ResponseAudioTranscriptDone(): + # Handle completed transcriptions logger.info(f"Text message done: {message=}") - await self.channel.chat.send_message(ChatMessage(message=message.value, msg_id=message.output_item_id, done=True)) + await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id)) - case messages.MessageAdded(): - # Placeholder for handling other message types (currently not used) + case messages.InputAudioBufferSpeechStarted(): + # Handle the start of speech in the input audio pass - - case messages.ServerAddMessage(): - # Placeholder for handling server-side messages (currently not used) + case messages.InputAudioBufferSpeechStopped(): + # Handle the end of speech in the input audio pass - - case messages.VADSpeechStarted(): - # Placeholder for handling voice activity detection start + case messages.InputAudioBufferCommitted(): + # Handle when an input audio buffer is committed pass - - 
case messages.VADSpeechStopped(): - # Placeholder for handling voice activity detection stop + case messages.ItemCreated(): + # Handle when a new item is created in the conversation + pass + case messages.ResponseCreated(): + # Handle when a new response is created + pass + case messages.ResponseOutputItemAdded(): + # Handle when a new output item is added to the response + pass + case messages.ResponseContenPartAdded(): + # Handle when a new content part is added to the response + pass + case messages.ResponseAudioDone(): + # Handle when the audio response is complete + pass + case messages.ResponseContentPartDone(): + # Handle when a content part of the response is complete + pass + case messages.ResponseOutputItemDone(): + # Handle when an output item in the response is complete pass - - case messages.GenerationCanceled(): - # Log when a generation process is canceled - logger.info(f"Server turn canceled: {message=}") - - case messages.GenerationFinished(): - # Log when the generation process is finished (e.g., no more text appending) - logger.info(f"Server turn finished: {message=}") - # TODO: Implement behavior to mark generation completion - # await self.channel.generation_finished() - - case messages.AddContent(type=messages.AddContentType.TOOL_CALL): - # TODO: Implement streaming tool calls when a tool call is added to the content - logger.info(f"Received tool call buffer add {message=}") - - case messages.RealtimeError(error=error): - # Log any errors received from the OpenAI client - logger.error(f"Received error message {error=}") case _: # Log any unhandled or unknown message types logger.warning(f"Unhandled message {message=}") ``` -### Complete integration code - -The `agent.py` script integrates the code components presented in this section into reusable Python classes that you can extend for your own applications. - -
-Complete code for `agent.py` - -{`import abc -import asyncio -import base64 -import json -import logging -import os -from builtins import anext -from typing import Any, Callable, assert_never - -from attr import dataclass -from dotenv import load_dotenv -from pydantic import BaseModel - -from realtimekit.realtimeapi import messages -from realtimekit.realtimeapi.client import RealtimeApiClient - -from .agora.rtc import Channel, Chat, ChatMessage, RtcEngine - -# Logger configuration - -logger = logging.getLogger(**name**) - -# Data classes for configuration and tool declarations - -@dataclass(frozen=True, kw_only=True) -class InferenceConfig: -"""Configuration for the inference process.""" -system_message: str | None = None -turn_detection: messages.TurnDetectionTypes | None = None -voice: messages.Voices | None = None - -@dataclass(frozen=True, kw_only=True) -class LocalFunctionToolDeclaration: -"""Declaration of a tool that can be called by the model, and runs a function locally on the tool context.""" - - name: str - description: str - parameters: dict[str, Any] - function: Callable[..., Any] - - def model_description(self) -> dict[str, Any]: - return { - "type": "function", - "function": { - "name": self.name, - "description": self.description, - "parameters": self.parameters, - }, - } - -@dataclass(frozen=True, kw_only=True) -class PassThroughFunctionToolDeclaration: -"""Declaration of a tool that can be called by the model, and is passed through the LiveKit client.""" - - name: str - description: str - parameters: dict[str, Any] - - def model_description(self) -> dict[str, Any]: - return { - "type": "function", - "function": { - "name": self.name, - "description": self.description, - "parameters": self.parameters, - }, - } - -# Type alias for tool declarations - -ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclaration - -@dataclass(frozen=True, kw_only=True) -class LocalToolCallExecuted: -json_encoded_output: str - 
-@dataclass(frozen=True, kw_only=True) -class ShouldPassThroughToolCall: -decoded_function_args: dict[str, Any] - -# Type alias for tool execution results - -ExecuteToolCallResult = LocalToolCallExecuted | ShouldPassThroughToolCall - -class ToolContext(abc.ABC): -"""Abstract base class for managing tool declarations and executions.""" -\_tool_declarations: dict[str, ToolDeclaration] - - def __init__(self) -> None: - # TODO: This should be an ordered dict - self._tool_declarations = {} - - def register_function( - self, - *, - name: str, - description: str = "", - parameters: dict[str, Any], - fn: Callable[..., Any], - ) -> None: - """Register a local function as a tool.""" - self._tool_declarations[name] = LocalFunctionToolDeclaration( - name=name, description=description, parameters=parameters, function=fn - ) - - def register_client_function( - self, - *, - name: str, - description: str = "", - parameters: dict[str, Any], - ) -> None: - """Register a client function as a tool.""" - self._tool_declarations[name] = PassThroughFunctionToolDeclaration( - name=name, description=description, parameters=parameters - ) - - async def execute_tool( - self, tool_name: str, encoded_function_args: str - ) -> ExecuteToolCallResult | None: - """Execute a tool based on its name and provided arguments.""" - tool = self._tool_declarations.get(tool_name) - if not tool: - return None - - args = json.loads(encoded_function_args) - assert isinstance(args, dict) - - if isinstance(tool, LocalFunctionToolDeclaration): - logger.info(f"Executing tool {tool_name} with args {args}") - result = await tool.function(**args) - logger.info(f"Tool {tool_name} executed with result {result}") - return LocalToolCallExecuted(json_encoded_output=json.dumps(result)) - - if isinstance(tool, PassThroughFunctionToolDeclaration): - return ShouldPassThroughToolCall(decoded_function_args=args) - - assert_never(tool) - - def model_description(self) -> list[dict[str, Any]]: - """Generate a description of all 
registered tools for the model.""" - return [v.model_description() for v in self._tool_declarations.values()] - -class ClientToolCallResponse(BaseModel): -tool_call_id: str -result: dict[str, Any] | str | float | int | bool | None = None - -class RealtimeKitAgent: -"""Main agent class for handling real-time communication and processing.""" -engine: RtcEngine -channel: Channel -client: RealtimeApiClient -audio_queue: asyncio.Queue[bytes] = asyncio.Queue() -message_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDelta] = asyncio.Queue() -message_done_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDone] = asyncio.Queue() -tools: ToolContext | None = None - - _client_tool_futures: dict[str, asyncio.Future[ClientToolCallResponse]] - - @classmethod - async def setup_and_run_agent( - cls, - *, - engine: RtcEngine, - inference_config: InferenceConfig, - tools: ToolContext | None, - ) -> None: - """Set up and run the agent with the provided configuration.""" - # Connect to a channel using the provided RtcEngine - channel = await engine.connect(channelId="realtimekit_agora", uid="123") - - try: - # Create and enter a context manager for the RealtimeApiClient - async with RealtimeApiClient( - base_uri=os.getenv("REALTIME_API_BASE_URI", "wss://api.openai.com"), - api_key=os.getenv("OPENAI_API_KEY"), - verbose=False, - ) as client: - # Send a message to update the session configuration - await client.send_message( - messages.UpdateSessionConfig( - session=messages.SessionResource(), - # turn_detection=inference_config.turn_detection, - # transcribe_input=False, - # input_audio_format=messages.AudioFormats.PCM16, - # vads=messages.VADConfig(), - ) - ) - - # Concurrently wait for the start session message and send the conversation config - [start_session_message, _] = await asyncio.gather( - *[ - anext(client.listen()), - client.send_message( - messages.UpdateConversationConfig( - system_message=inference_config.system_message, - 
output_audio_format=messages.AudioFormats.PCM16, - voice=inference_config.voice, - tools=tools.model_description() if tools else None, - transcribe_input=False, - ) - ), - ] - ) +### Main entry point - # Ensure the received message is of the correct type - assert isinstance(start_session_message, messages.StartSession) +The main entry point of the application sets up the Agora RTC engine, configures the options, and launches the RealtimeKitAgent. Here's the updated implementation with inline comments: - # Print session information - print( - f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}" - ) - - # Create an instance of the agent - agent = cls( - client=client, - tools=tools, - channel=channel, - ) - - # Run the agent - await agent.run() - - finally: - # Ensure disconnection and shutdown occur, even if an exception is raised - await engine.disconnect() - await shutdown(asyncio.get_event_loop()) - - @classmethod - async def entry_point( - cls, - *, - engine: RtcEngine, - inference_config: InferenceConfig, - tools: ToolContext | None = None, - ) -> None: - """Entry point for setting up and running the agent.""" - await cls.setup_and_run_agent( - engine=engine, inference_config=inference_config, tools=tools - ) - - def __init__( - self, - *, - client: RealtimeApiClient, - tools: ToolContext | None, - channel: Channel, - ) -> None: - """Initialize the RealtimeKitAgent.""" - self.client = client - self.tools = tools - self._client_tool_futures = {} - self.channel = channel - - async def run(self) -> None: - """Main loop for running the agent.""" - def log_exception(t: asyncio.Task[Any]) -> None: - """Log unhandled exceptions from tasks.""" - if not t.cancelled() and t.exception(): - logger.error( - "Unhandled exception", - exc_info=t.exception(), - ) - - disconnected_future = asyncio.Future[None]() - - def _on_disconnected() -> None: - """Callback for when the agent is disconnected.""" - if not 
disconnected_future.done(): - disconnected_future.set_result(None) - - # self.room.on("disconnected", _on_disconnected) - - # Create and monitor tasks for streaming audio and processing messages - asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( - log_exception - ) - asyncio.create_task(self._stream_audio_queue_to_audio_output()).add_done_callback( - log_exception) - - asyncio.create_task(self._process_model_messages()).add_done_callback( - log_exception - ) - - await disconnected_future - logger.info("Agent finished running") - - async def _stream_input_audio_to_model(self) -> None: - """Stream input audio frames to the model.""" - audio_frames = self.channel.get_audio_frames() - async for audio_frame in audio_frames: - # send the frame to the model via the API client - await self.client.send_audio_data(audio_frame.data) - - async def _stream_audio_queue_to_audio_output(self) -> None: - """Stream audio from the queue to the audio output.""" - while True: - # audio queue contains audio data from the model, send it the end-user via our local audio source - frame = await self.audio_queue.get() - await self.channel.push_audio_frame(frame) - await asyncio.sleep(0) # allow other tasks to run - - async def _process_model_messages(self) -> None: - """Process messages received from the model.""" - async for message in self.client.listen(): - match message: - case messages.ResonseAudioDelta(): - # Process incoming audio data - await self.audio_queue.put(base64.b64decode(message.delta)) - - case messages.ResonseAudioTranscriptionDelta(): - logger.info(f"Received text transcription delta: {message=}") - await self.channel.chat.send_message(ChatMessage(message=message.delta, msg_id=message.output_item_id)) - - case messages.ResonseAudioTranscriptionDone(): - logger.info(f"Text transcription completed: {message=}") - await self.channel.chat.send_message(ChatMessage(message=message.value, msg_id=message.output_item_id, done=True)) - - case 
messages.MessageAdded(): - # Handle message addition event - pass - - case messages.ServerAddMessage(): - # Handle server message addition event - pass - - case messages.VADSpeechStarted(): - # Handle Voice Activity Detection speech start event - pass - - case messages.VADSpeechStopped(): - # Handle Voice Activity Detection speech stop event - pass - - case messages.GenerationCanceled(): - logger.info(f"Server generation canceled: {message=}") - - case messages.GenerationFinished(): - logger.info(f"Server generation finished: {message=}") - # TODO: Implement logic to mark the end of text appending - - case messages.AddContent(type=messages.AddContentType.TOOL_CALL): - logger.info(f"Received tool call content: {message=}") - # TODO: Implement streaming tool calls - - case messages.RealtimeError(error=error): - logger.error(f"Received error message: {error=}") - # TODO: Determine if session termination is necessary - - case _: - logger.warning(f"Unhandled message type: {message=}") - -async def shutdown(loop, signal=None): -"""Gracefully shut down the application.""" -if signal: -print(f"Received exit signal {signal.name}...") - - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - - print(f"Cancelling {len(tasks)} outstanding tasks") - for task in tasks: - task.cancel() - - await asyncio.gather(*tasks, return_exceptions=True) - loop.stop() - -if **name** == "**main**": # Load environment variables and run the agent -load_dotenv() -asyncio.run( -RealtimeKitAgent.entry_point( -engine=RtcEngine(appid=os.getenv("AGORA_APP_ID")), -inference_config=InferenceConfig( -system_message="""\\ -You are a helpful assistant. If asked about the weather, make sure to use the provided tool to get that information. 
\\
-If you are asked a question that requires a tool, say something like "working on that" and don't provide a concrete response \\
-until you have received the response to the tool call.\\
+```python
+if __name__ == "__main__":
+    # Load environment variables from .env file
+    load_dotenv()
+
+    # Parse command line arguments
+    options = parse_args_realtimekit()
+    logger.info(f"channel_id: {options['channel_name']}, uid: {options['uid']}")
+
+    # Ensure the Agora App ID is set
+    if not os.environ.get("AGORA_APP_ID"):
+        raise ValueError("Need to set environment variable AGORA_APP_ID")
+
+    # Run the RealtimeKitAgent
+    asyncio.run(
+        RealtimeKitAgent.entry_point(
+            # Initialize the RtcEngine with Agora credentials
+            engine=RtcEngine(appid=os.environ.get("AGORA_APP_ID"), appcert=os.environ.get("AGORA_APP_CERT")),
+            # Configure RTC options
+            options=RtcOptions(
+                channel_name=options['channel_name'],
+                uid=options['uid'],
+                sample_rate=SAMPLE_RATE,
+                channels=CHANNELS
+            ),
+            # Configure inference settings
+            inference_config=InferenceConfig(
+                # Set up the AI assistant's behavior
+                system_message="""\
+You are a helpful assistant. If asked about the weather, make sure to use the provided tool to get that information. \
+If you are asked a question that requires a tool, say something like "working on that" and don't provide a concrete response \
+until you have received the response to the tool call.\
 """,
-voice=messages.Voices.Alloy,
-turn_detection=messages.TurnDetectionTypes.SERVER_VAD,
-),
-)
-)
-`}
- -The `agent.py` imports key classes from `rtc.py`, which implements the server-side Agora Python Voice SDK,, facilitating communication and managing audio streams. For SDK setup and dependencies, refer to [Voice calling quickstart](/voice-calling/get-started/get-started-sdk?platform=python). - -Below is the complete code for `rtc.py`. - -
- Complete code for `rtc.py` - -
+ voice=messages.Voices.Alloy, + # Configure voice activity detection + turn_detection=messages.ServerVAD( + threshold=0.5, + prefix_padding_ms=500, + suffix_padding_ms=200, + ), + ), + ) + ) +``` ## Test the code -1. **Update the values for** `AGORA_APP_ID` **and** `OPENAI_API_KEY` **in the project's** `.env` **file**. +1. **Update the values for** `AGORA_APP_ID`, `AGORA_APP_CERT`, **and** `OPENAI_API_KEY` **in the project's** `.env` **file**. This step ensures that the necessary credentials for Agora and OpenAI are correctly configured in your project. 2. **Execute the following command to run your app**: ```bash - python3 agent.py + python3 agent.py --channel_name=your_channel_name --uid=your_user_id ``` - This command launches the `agent.py` script, initializing the Agora channel and the OpenAI API connection. + This command launches the `agent.py` script, initializing the Agora channel and the OpenAI API connection. Replace `your_channel_name` with the desired channel name and `your_user_id` with a unique user ID. ## Reference This section contains additional information or links to relevant documentation that complements the current page or explains other aspects of the product. - [API reference for `rtc.py`](https://api-reference-git-python-voice-implementation-agora-gdxe.vercel.app/voice-sdk/python/rtc-py-api.html) -- [Voice calling quickstart (Python)](/voice-calling/get-started/get-started-sdk?platform=python) +- [Voice calling quickstart (Python)](/voice-calling/get-started/get-started-sdk?platform=python) \ No newline at end of file
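The run command above passes `--channel_name` and `--uid` to `agent.py`, which reads them through `parse_args_realtimekit()`. That helper is not shown on this page; a minimal `argparse`-based sketch (the flag names match the run command, but the function signature and defaults are assumptions) might look like:

```python
import argparse

def parse_args_realtimekit(argv=None) -> dict:
    """Hypothetical sketch of the quickstart's argument parser.

    The quickstart calls parse_args_realtimekit() without showing its
    implementation, so the defaults here are assumptions for illustration.
    """
    parser = argparse.ArgumentParser(description="Agora + OpenAI realtime agent")
    parser.add_argument("--channel_name", default="realtimekit_agora",
                        help="Name of the Agora channel to join")
    parser.add_argument("--uid", type=int, default=0,
                        help="Numeric user ID for the agent")
    args = parser.parse_args(argv)
    return {"channel_name": args.channel_name, "uid": args.uid}

# Example: parse the same flags shown in the run command above.
options = parse_args_realtimekit(["--channel_name", "my_channel", "--uid", "42"])
print(options)  # -> {'channel_name': 'my_channel', 'uid': 42}
```

Returning a plain dict keeps the call site (`options['channel_name']`, `options['uid']`) in the main entry point unchanged.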