diff --git a/assets/code/open-ai-integration/rtc-py.mdx b/assets/code/open-ai-integration/rtc-py.mdx new file mode 100644 index 000000000..f51312f1f --- /dev/null +++ b/assets/code/open-ai-integration/rtc-py.mdx @@ -0,0 +1,463 @@ +import CodeBlock from '@theme/CodeBlock'; + + +{`import asyncio +import json +import logging +import os +from typing import Any, AsyncIterator + +from agora.rtc.agora_base import ( + ChannelProfileType, + ClientRoleType, +) +from agora.rtc.agora_service import ( + AgoraService, + AgoraServiceConfig, + RTCConnConfig, +) +from agora.rtc.audio_frame_observer import AudioFrame, IAudioFrameObserver +from agora.rtc.audio_pcm_data_sender import PcmAudioFrame +from agora.rtc.local_user import LocalUser +from agora.rtc.local_user_observer import IRTCLocalUserObserver +from agora.rtc.rtc_connection import RTCConnection, RTCConnInfo +from agora.rtc.rtc_connection_observer import IRTCConnectionObserver +from pyee.asyncio import AsyncIOEventEmitter + +from ..realtimeapi.util import CHANNELS, SAMPLE_RATE + +logger = logging.getLogger(__name__) + +class AudioStream: + def __init__(self) -> None: + self.queue: asyncio.Queue = asyncio.Queue() + + def __aiter__(self) -> AsyncIterator[PcmAudioFrame]: + return self + + async def __anext__(self) -> PcmAudioFrame: + item = await self.queue.get() + if item is None: + raise StopAsyncIteration + + return item + +class ChannelEventObserver(IRTCConnectionObserver, IRTCLocalUserObserver, IAudioFrameObserver): + def __init__(self, event_emitter: AsyncIOEventEmitter) -> None: + self.loop = asyncio.get_event_loop() + self.emitter = event_emitter + self.audio_stream = AudioStream() + + + + def emit_event(self, event_name: str, *args): + """Helper function to emit events.""" + self.emitter.emit(event_name, *args) + + def on_connected( + self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason + ): + logger.info(f"Connected to RTC: {agora_rtc_conn} {conn_info} {reason}") + self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) + + def on_disconnected( + self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason + ): + logger.info(f"Disconnected from RTC: {agora_rtc_conn} {conn_info} {reason}") + self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) + + def on_connecting( + self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason + ): + logger.info(f"Connecting to RTC: {agora_rtc_conn} {conn_info} {reason}") + self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) + + def on_connection_failure(self, agora_rtc_conn, conn_info, reason): + logger.error("Connection failure: {agora_rtc_conn} {conn_info} {reason}") + self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) + + def on_user_joined(self, agora_rtc_conn: RTCConnection, user_id): + logger.info(f"User joined: {agora_rtc_conn} {user_id}") + self.emit_event("user_joined", agora_rtc_conn, user_id) + + def on_user_left(self, agora_rtc_conn: RTCConnection, user_id, reason): + logger.info(f"User left: {agora_rtc_conn} {user_id} {reason}") + self.emit_event("user_left", agora_rtc_conn, user_id, reason) + + + def handle_received_chunk(self, json_chunk): + chunk = json.loads(json_chunk) + msg_id = chunk["msg_id"] + part_idx = chunk["part_idx"] + total_parts = chunk["total_parts"] + if msg_id not in self.received_chunks: + self.received_chunks[msg_id] = {"parts": {}, "total_parts": total_parts} + if ( + part_idx not in self.received_chunks[msg_id]["parts"] + and 0 <= part_idx < total_parts + ): + self.received_chunks[msg_id]["parts"][part_idx] = chunk + if len(self.received_chunks[msg_id]["parts"]) == total_parts: + # all parts received, now recomposing original message and get rid it from dict + sorted_parts = sorted( + self.received_chunks[msg_id]["parts"].values(), + key=lambda c: c["part_idx"], + ) + full_message = "".join(part["content"] for part in sorted_parts) + del self.received_chunks[msg_id] + return full_message, msg_id + return (None, None) + + def on_stream_message( + self, agora_local_user: LocalUser, user_id, stream_id, data, length + ): + # logger.info(f"Stream message", agora_local_user, user_id, stream_id, length) + (reassembled_message, msg_id) = self.handle_received_chunk(data.decode("utf-8")) + if reassembled_message is not None: + logger.info(f"Reassembled message: {msg_id} {reassembled_message}") + + + def on_audio_subscribe_state_changed( + self, + agora_local_user, + channel, + user_id, + old_state, + new_state, + elapse_since_last_state, + ): + logger.info(f'on_audio_subscribe_state_changed: {channel} {user_id} {old_state} {new_state} {elapse_since_last_state}') + self.emit_event("audio_subscribe_state_changed", channel, user_id, old_state, new_state) + + def on_playback_audio_frame_before_mixing( + self, agora_local_user: LocalUser, channelId, uid, frame: AudioFrame + ): + audio_frame = PcmAudioFrame() + audio_frame.samples_per_channel = frame.samples_per_channel + audio_frame.bytes_per_sample = frame.bytes_per_sample + audio_frame.number_of_channels = frame.channels + audio_frame.sample_rate = SAMPLE_RATE + audio_frame.data = frame.buffer + + self.loop.call_soon_threadsafe(self.audio_stream.queue.put_nowait, audio_frame) + return 0 + +class Channel(): + def __init__( + self, rtc: "RtcEngine", channelId: str, uid: str + ) -> None: + self.loop = asyncio.get_event_loop() + + # Create the event emitter + self.emitter = AsyncIOEventEmitter() + + self.rtc = rtc + self.chat = Chat(self) + self.channelId = channelId + self.uid = uid + conn_config = RTCConnConfig( + client_role_type=ClientRoleType.CLIENT_ROLE_BROADCASTER, + channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING, + ) + self.connection = self.rtc.agora_service.create_rtc_connection(conn_config) + + self.channel_event_observer = ChannelEventObserver(self.emitter) + self.connection.register_observer(self.channel_event_observer) + self.connection.connect("", self.channelId, self.uid) + + self.local_user = self.connection.get_local_user() + self.local_user.set_playback_audio_frame_before_mixing_parameters( + CHANNELS, SAMPLE_RATE + ) + self.local_user.register_local_user_observer(self.channel_event_observer) + self.local_user.register_audio_frame_observer(self.channel_event_observer) + self.local_user.subscribe_all_audio() + + self.media_node_factory = self.rtc.agora_service.create_media_node_factory() + self.audio_pcm_data_sender = ( + self.media_node_factory.create_audio_pcm_data_sender() + ) + self.audio_track = self.rtc.agora_service.create_custom_audio_track_pcm( + self.audio_pcm_data_sender + ) + self.audio_track.set_enabled(1) + self.local_user.publish_audio(self.audio_track) + + self.stream_id = self.connection.create_data_stream(False, False) + self.received_chunks = {} + self.waiting_message = None + self.msg_id = "" + self.msg_index = "" + + async def disconnect(self) -> None: + """ + Disconnects the channel. + """ + disconnected_future = asyncio.Future[None]() + def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): + self.off("connection_state_changed", callback) + if conn_info.state == 1: + disconnected_future.set_result(None) + self.on("connection_state_changed", callback) + self.connection.disconnect() + await disconnected_future + + def get_audio_frames(self) -> AudioStream: + """ + Returns the audio frames from the channel. + + Returns: + AudioStream: The audio stream. + """ + return self.channel_event_observer.audio_stream + + async def push_audio_frame(self, frame: bytes) -> None: + """ + Pushes an audio frame to the channel. + + Parameters: + frame: The audio frame to push. + """ + audio_frame = PcmAudioFrame() + audio_frame.data = bytearray(frame) + audio_frame.timestamp = 0 + audio_frame.bytes_per_sample = 2 + audio_frame.number_of_channels = CHANNELS + audio_frame.sample_rate = SAMPLE_RATE + audio_frame.samples_per_channel = int( + len(frame) / audio_frame.bytes_per_sample / audio_frame.number_of_channels + ) + + self.audio_pcm_data_sender.send_audio_pcm_data(audio_frame) + + async def subscribe_audio(self, uid: int) -> None: + """ + Subscribes to the audio of a user. + + Parameters: + uid: The user ID to subscribe to. + """ + loop = asyncio.get_event_loop() + future = loop.create_future() + + def callback( + agora_local_user, + channel, + user_id, + old_state, + new_state, + elapse_since_last_state, + ): + if new_state == 3: + loop.call_soon_threadsafe(future.set_result, None) + else: + loop.call_soon_threadsafe( + future.set_exception, + Exception( + f"subscribe {user_id} audio state from {old_state} to {new_state}" + ), + ) + + self.once("audio_subscribe_state_changed", callback) + self.local_user.subscribe_audio(uid) + + await future + + async def unsubscribe_audio(self, uid: int) -> None: + """ + Unsubscribes from the audio of a user. + + Parameters: + uid: The user ID to unsubscribe from. + """ + loop = asyncio.get_event_loop() + future = loop.create_future() + + def callback( + agora_local_user, + channel, + user_id, + old_state, + new_state, + elapse_since_last_state, + ): + if new_state == 3: + loop.call_soon_threadsafe(future.set_result, None) + else: + loop.call_soon_threadsafe( + future.set_exception, + Exception( + f"subscribe {user_id} audio state from {old_state} to {new_state}" + ), + ) + + self.once("audio_subscribe_state_changed", callback) + self.local_user.unsubscribe_audio(uid) + + await future + + def _split_string_into_chunks(self, long_string, msg_id, chunk_size=300) -> list[dict[str: Any]]: + """ + Splits a long string into chunks of a given size. + + Parameters: + long_string: The string to split. + msg_id: The message ID. + chunk_size: The size of each chunk. + + Returns: + list[dict[str: Any]]: The list of chunks. + + """ + total_parts = (len(long_string) + chunk_size - 1) // chunk_size + json_chunks = [] + for idx in range(total_parts): + start = idx * chunk_size + end = min(start + chunk_size, len(long_string)) + chunk = { + 'msg_id': msg_id, + 'part_idx': idx, + 'total_parts': total_parts, + 'content': long_string[start:end] + } + json_chunk = json.dumps(chunk, ensure_ascii=False) + json_chunks.append(json_chunk) + return json_chunks + + async def send_stream_message(self, data: str, msg_id: str) -> None: + """ + Sends a stream message to the channel. + + Parameters: + data: The data to send. + msg_id: The message ID. + """ + + chunks = self._split_string_into_chunks(data, msg_id) + for chunk in chunks: + self.connection.send_stream_message(self.stream_id, chunk) + + def on(self, event_name: str, callback): + """ + Allows external components to subscribe to events. + + Parameters: + event_name: The name of the event to subscribe to. + callback: The callback to call when the event is emitted. + + """ + self.emitter.on(event_name, callback) + + def once(self, event_name: str, callback): + """ + Allows external components to subscribe to events once. + + Parameters: + event_name: The name of the event to subscribe to. + callback: The callback to call when the event is emitted. + """ + self.emitter.once(event_name, callback) + + def off(self, event_name: str, callback): + """ + Allows external components to unsubscribe from events. + + Parameters: + event_name: The name of the event to unsubscribe from. + callback: The callback to remove from the event. + """ + self.emitter.remove_listener(event_name, callback) + +class ChatMessage(): + def __init__(self, message: str, msg_id: str, done: bool = False) -> None: + self.message = message + self.msg_id = msg_id + self.done = done + +class Chat(): + def __init__(self, channel: Channel) -> None: + self.channel = channel + self.loop = self.channel.loop + self.queue = asyncio.Queue() + + def log_exception(t: asyncio.Task[Any]) -> None: + if not t.cancelled() and t.exception(): + logger.error( + "unhandled exception", + exc_info=t.exception(), + ) + asyncio.create_task(self._process_message()).add_done_callback(log_exception) + + async def send_message(self, item: ChatMessage) -> None: + """ + Sends a message to the channel. + + Parameters: + item: The message to send. + """ + await self.queue.put(item) + # await self.queue.put_nowait(item) + + async def _process_message(self) -> None: + """ + Processes messages in the queue. + """ + + while True: + item: ChatMessage = await self.queue.get() + await self.channel.send_stream_message(item.message, item.msg_id) + self.queue.task_done() + # await asyncio.sleep(0) + +class RtcEngine: + def __init__(self, appid: str): + self.appid = appid + config = AgoraServiceConfig() + config.appid = appid + config.log_path = os.path.join( + os.path.dirname( + os.path.dirname( + os.path.dirname(os.path.join(os.path.abspath(__file__))) + ) + ), + "agorasdk.log", + ) + self.agora_service = AgoraService() + self.agora_service.initialize(config) + + async def connect(self, channelId: str, uid: str) -> Channel: + """ + Connects to a channel. + + Parameters: + channelId: The channel ID. + uid: The user ID. + + Returns: + Channel: The channel. + """ + + loop = asyncio.get_event_loop() + future = loop.create_future() + + def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): + channel.off("connection_state_changed", callback) + if conn_info.state == 3: + future.set_result(channel) + else: + future.set_exception( + Exception(f"connection state:{conn_info.state}"), + ) + + channel = Channel(self, channelId, uid) + channel.on("connection_state_changed", callback) + + await future + return channel + + def destroy(self) -> None: + """ + Destroys the RTC engine. + """ + self.agora_service.release()`} + \ No newline at end of file diff --git a/open-ai-integration/get-started/_category_.json b/open-ai-integration/get-started/_category_.json new file mode 100644 index 000000000..8d46f797b --- /dev/null +++ b/open-ai-integration/get-started/_category_.json @@ -0,0 +1,6 @@ +{ + "position": 2, + "label": "Get started", + "collapsible": true, + "link": null +} diff --git a/open-ai-integration/get-started/manage-agora-account.mdx b/open-ai-integration/get-started/manage-agora-account.mdx new file mode 100644 index 000000000..e866721be --- /dev/null +++ b/open-ai-integration/get-started/manage-agora-account.mdx @@ -0,0 +1,14 @@ +--- +title: 'Agora account management' +sidebar_position: 2 +type: docs +platform_selector: false +description: > + Create, manage and update your Agora account. +--- + +import ManageAccount from '@docs/shared/common/_manage-agora-account.mdx'; + +export const toc = [{}]; + + diff --git a/open-ai-integration/get-started/quickstart.mdx b/open-ai-integration/get-started/quickstart.mdx new file mode 100644 index 000000000..c0a3d0b0d --- /dev/null +++ b/open-ai-integration/get-started/quickstart.mdx @@ -0,0 +1,15 @@ +--- +title: 'Integration quickstart' +sidebar_position: 1 +type: docs +platform_selector: false +description: > + Integrate Agora's real-time audio communication capabilities with OpenAI's Large Language Models (LLMs) +--- + +import OpenAIIntegration from '@docs/shared/open-ai-integration//quickstart.mdx'; + +export const toc = [{}]; + + + diff --git a/open-ai-integration/overview/_category_.json b/open-ai-integration/overview/_category_.json new file mode 100644 index 000000000..9874239b6 --- /dev/null +++ b/open-ai-integration/overview/_category_.json @@ -0,0 +1,6 @@ +{ + "position": 1, + "label": "Overview", + "collapsible": true, + "link": null +} diff --git a/open-ai-integration/overview/core-concepts.mdx b/open-ai-integration/overview/core-concepts.mdx new file mode 100644 index 000000000..be22c1aa9 --- /dev/null +++ b/open-ai-integration/overview/core-concepts.mdx @@ -0,0 +1,15 @@ +--- +title: 'Core concepts' +sidebar_position: 2 +type: docs +platform_selector: false +description: > + Ideas that are central to developing with Agora. +--- + +import CoreConcepts from '@docs/shared/common/core-concepts/real-time-stt.mdx'; + +export const toc = [{}]; + + + diff --git a/open-ai-integration/overview/product-overview.mdx b/open-ai-integration/overview/product-overview.mdx new file mode 100644 index 000000000..8fcd6b56c --- /dev/null +++ b/open-ai-integration/overview/product-overview.mdx @@ -0,0 +1,50 @@ +--- +title: 'Product overview' +sidebar_position: 1 +platform_selector: false +description: > + Integrate real-time audio with OpenAI LLM. +--- + + + +Integrating Agora's real-time audio communication with OpenAI's Large Language Models (LLM) opens the door to powerful, interactive voice-based applications. Create seamless voice-enabled experiences, such as voice-controlled AI assistants, or interactive dialogue systems by combining Agora's robust real-time audio streaming capabilities with the conversational intelligence of OpenAI's LLMs. This integration allows for dynamic, responsive audio interactions, enhancing user engagement across a wide range of use cases—from customer support bots to collaborative voice-driven applications. + + \ No newline at end of file diff --git a/open-ai-integration/reference/_category_.json b/open-ai-integration/reference/_category_.json new file mode 100644 index 000000000..d540fd6db --- /dev/null +++ b/open-ai-integration/reference/_category_.json @@ -0,0 +1,6 @@ +{ + "position": 4, + "label": "Reference", + "collapsible": true, + "link": null +} diff --git a/open-ai-integration/reference/firewall.mdx b/open-ai-integration/reference/firewall.mdx new file mode 100644 index 000000000..54a9d7789 --- /dev/null +++ b/open-ai-integration/reference/firewall.mdx @@ -0,0 +1,14 @@ +--- +title: 'Firewall requirements' +sidebar_position: 4 +type: docs +platform_selector: false +description: > + use Agora products in environments with restricted network access +--- + +import Firewall from '@docs/shared/common/_firewall.mdx'; + +export const toc = [{}]; + + \ No newline at end of file diff --git a/open-ai-integration/reference/glossary.mdx b/open-ai-integration/reference/glossary.mdx new file mode 100644 index 000000000..fbdc8f368 --- /dev/null +++ b/open-ai-integration/reference/glossary.mdx @@ -0,0 +1,14 @@ +--- +title: 'Glossary' +sidebar_position: 5 +type: docs +platform_selector: false +description: > + A list of terms used in Agora documentation. +--- + +import Glossary from '@docs/shared/common/_glossary.mdx'; + +export const toc = [{}]; + + \ No newline at end of file diff --git a/open-ai-integration/reference/security.mdx b/open-ai-integration/reference/security.mdx new file mode 100644 index 000000000..85eeb6bb7 --- /dev/null +++ b/open-ai-integration/reference/security.mdx @@ -0,0 +1,14 @@ +--- +title: 'Security' +sidebar_position: 3 +type: docs +platform_selector: false +description: > + How Agora handles security. +--- + +import Security from '@docs/shared/common/_security.mdx'; + +export const toc = [{}]; + + \ No newline at end of file diff --git a/shared/common/prerequisites/python.mdx b/shared/common/prerequisites/python.mdx index ca69bfa34..73315ccb0 100644 --- a/shared/common/prerequisites/python.mdx +++ b/shared/common/prerequisites/python.mdx @@ -1,10 +1,9 @@ -- [Python](https://www.python.org/downloads/) 3.8 or higher +- [Python](https://www.python.org/downloads/) 3.12 or higher -- A supported OS: +- A supported Linux distribution: - Ubuntu 18.04 LTS or higher - CentOS 7.0 or higher - - MacOS 13 or higher \ No newline at end of file diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx new file mode 100644 index 000000000..ada8d65bb --- /dev/null +++ b/shared/open-ai-integration/quickstart.mdx @@ -0,0 +1,666 @@ +import CodeBlock from '@theme/CodeBlock'; +import CodeRtcPy from '@docs/assets/code/open-ai-integration/rtc-py.mdx' +import Prerequisites from '@docs/shared/common/prerequisites/index.mdx'; + +Integrating Agora's real-time audio communication capabilities with OpenAI's language models enables dynamic, conversational AI experiences. This guide shows you how to set up a Python project that combines Agora's voice SDK with OpenAI's API, creating an interactive, voice-driven assistant. + +## Understand the tech + +The `RealtimeKitAgent` class manages the integration by connecting to an Agora channel for real-time audio streaming and to OpenAI's API for processing audio input and generating AI-driven responses. Audio frames captured from an Agora channel are streamed to OpenAI's API where the AI processes the input. The API responses, which include transcribed text and synthesized voice output, are then delivered back to the Agora channel. + +The code sets up tools that can be executed locally or passed through the API. This allows the AI to perform specific tasks, such as retrieving data from external sources. The agent processes various message types from OpenAI, such as audio responses, transcription updates, and error messages, and sends them to users through the Agora audio channel, facilitating continuous interaction. + +## Prerequisites + + + +## Set up your project + +Follow these steps to set up your Python integration project: + +1. Download the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package and unzip it. + +1. Create the following folder structure for your project: + + ``` + /realtime-agent + │ + ├── agent.py + ├── .env + ├── requirements.txt + │ + ├── agora/ + │ ├── __init__.py + │ ├── rtc.py + │ + ├── realtimekit/ + │ ├── __init__.py + │ ├── realtimeapi/ + │ ├── __init__.py + │ ├── client.py + │ ├── messages.py + │ └── util.py + ``` + + - `agent.py`: This is he main script that runs the `RealtimeKitAgent`. + + It imports Agora functionality from the `agora/rtc.py` module and the OpenAI capabilities from the `realtimekit/realtimeapi` package. + - `agora/rtc.py`: Contains the wrapper around the Agora Python SDK. + - `realtimekit/realtimeapi/`: Contains the classes and methods that interact with OpenAI’s Realtime API. + + The [Complete code](#complete-integration-code) code for `agent.py` and `rtc.py` is provided on this page. The files in the `realtimekit/realtimeapi` folder are copied from the downloaded OpenAI package. + +1. Add the following keys to your `.env` file: + + ```python + # Agora RTC app ID + AGORA_APP_ID=your_agora_app_id + + # OpenAI API key for authentication + OPENAI_API_KEY=your_openai_api_key_here + + # API base URI for the Realtime API + REALTIME_API_BASE_URI=wss://api.openai.com + ``` + +1. Install the dependencies: + + ```bash + pip install -r requirements.txt + ``` + +## Implementation + +The `RealtimeKitAgent` class integrates Agora's audio communication capabilities with OpenAI's AI services. This class manages audio streams, handles communication with the OpenAI API, and processes AI-generated responses, providing a seamless conversational AI experience. + +### Connect to Agora and OpenAI + +The `setup_and_run_agent` method sets up the `RealtimeKitAgent` by connecting to an Agora channel and initializing a session with the OpenAI Realtime API client. It sends configuration messages to set up the session and conversation parameters before starting the agent's operations. The method ensures the connection is properly handled and cleaned up after use. + +``` python +@classmethod +async def setup_and_run_agent( + cls, + *, + engine: RtcEngine, + inference_config: InferenceConfig, + tools: ToolContext | None, +) -> None: + channel = await engine.connect(channelId="realtimekit_agora", uid="123") + + try: + async with RealtimeApiClient( + base_uri=os.getenv("REALTIME_API_BASE_URI", "wss://api.openai.com"), + api_key=os.getenv("OPENAI_API_KEY"), + verbose=False, + ) as client: + await client.send_message( + messages.UpdateSessionConfig( + session=messages.SessionResource(), + # turn_detection=inference_config.turn_detection, + # transcribe_input=False, + # input_audio_format=messages.AudioFormats.PCM16, + # vads=messages.VADConfig(), + ) + ) + + [start_session_message, _] = await asyncio.gather( + *[ + anext(client.listen()), + client.send_message( + messages.UpdateConversationConfig( + system_message=inference_config.system_message, + output_audio_format=messages.AudioFormats.PCM16, + voice=inference_config.voice, + tools=tools.model_description() if tools else None, + transcribe_input=False, + ) + ), + ] + ) + assert isinstance(start_session_message, messages.StartSession) + print( + f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}" + ) + + agent = cls( + client=client, + tools=tools, + channel=channel, + ) + await agent.run() + + finally: + await engine.disconnect() + await shutdown(asyncio.get_event_loop()) +``` + +### Initialize the RealtimeKitAgent + +The `RealtimeKitAgent` class constructor accepts an OpenAI `RealtimeApiClient`, an optional `ToolContext` for function registration, and an Agora Channel for audio communication. This setup prepares the agent for processing audio streams and interacting with the AI model. + +```python +def __init__( + self, + *, + client: RealtimeApiClient, + tools: ToolContext | None, + channel: Channel, +) -> None: + self.client = client + self.tools = tools + self._client_tool_futures = {} + self.channel = channel +``` + +### Launch the agent + +The `entry_point` method is the primary entry point for launching the agent. It invokes `setup_and_run_agent` with the relevant parameters, initializing the agent and triggering its functionalities. + +```python +@classmethod +async def entry_point( + cls, + *, + engine: RtcEngine, + inference_config: InferenceConfig, + tools: ToolContext | None = None, +) -> None: + await cls.setup_and_run_agent( + engine=engine, inference_config=inference_config, tools=tools + ) +``` + +The asynchronous `run` method orchestrates the main operations of the `RealtimeKitAgent`. It handles audio streaming, manages tasks for processing audio input, output, and model messages, and sets up exception handling. + +```python +async def run(self) -> None: + def log_exception(t: asyncio.Task[Any]) -> None: + if not t.cancelled() and t.exception(): + logger.error( + "unhandled exception", + exc_info=t.exception(), + ) + + disconnected_future = asyncio.Future[None]() + + def _on_disconnected() -> None: + if not disconnected_future.done(): + disconnected_future.set_result(None) + + # self.room.on("disconnected", _on_disconnected) + + asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( + log_exception + ) + asyncio.create_task( + self._stream_audio_queue_to_audio_output() + ).add_done_callback(log_exception) + + asyncio.create_task(self._process_model_messages()).add_done_callback( + log_exception + ) + + await disconnected_future + logger.info("Agent finished running") +``` + +### Stream input audio to the AI model + +The asynchronous method `_stream_input_audio_to_model` captures audio frames from the Agora channel and sends them to the OpenAI API client for processing. It listens for incoming audio frames and forwards them for real-time audio analysis by the AI model. + +```python +async def _stream_input_audio_to_model(self) -> None: + audio_frames = self.channel.get_audio_frames() + async for audio_frame in audio_frames: + # send the frame to the model via the API client + await self.client.send_audio_data(audio_frame.data) +``` + +### Stream audio from the AI model to the user + +The asynchronous method `_stream_audio_queue_to_audio_output` manages the transmission of processed audio data from the AI model back to the end-user. It retrieves audio frames from a queue and sends them to the Agora channel, allowing users to hear the AI-generated responses in real-time. + +```python +async def _stream_audio_queue_to_audio_output(self) -> None: + while True: + # audio queue contains audio data from the model, send it the end-user via our local audio source + frame = await self.audio_queue.get() + await self.channel.push_audio_frame(frame) + await asyncio.sleep(0) # allow other tasks to run +``` + +The `_process_model_messages` asynchronous method listens for incoming messages from the OpenAI API client and processes them based on their type. It handles various message types, such as audio deltas, transcriptions, and errors, ensuring appropriate actions for each. This includes updating the user chat with transcribed text and managing audio playback. + +```python +async def _process_model_messages(self) -> None: + async for message in self.client.listen(): + match message: + case messages.ResonseAudioDelta(): + # logger.info("Received audio message") + await self.audio_queue.put(base64.b64decode(message.delta)) + + case messages.ResonseAudioTranscriptionDelta(): + logger.info(f"Received text message {message=}") + await self.channel.chat.send_message(ChatMessage(message=message.delta, msg_id=message.output_item_id)) + + case messages.ResonseAudioTranscriptionDone(): + logger.info(f"Text message done: {message=}") + await self.channel.chat.send_message(ChatMessage(message=message.value, msg_id=message.output_item_id, done=True)) + + case messages.MessageAdded(): + pass + case messages.ServerAddMessage(): + pass + + case messages.VADSpeechStarted(): + pass + case messages.VADSpeechStopped(): + pass + + case messages.GenerationCanceled(): + logger.info(f"Server turn canceled: {message=}") + + case messages.GenerationFinished(): + # TODO this is where we mark no longer appending text + logger.info(f"Server turn finished: {message=}") + # await self.channel.generation_finished() + + case messages.AddContent(type=messages.AddContentType.TOOL_CALL): + # TODO implement streaming tool calls + logger.info(f"Received tool call buffer add {message=}") + + case messages.RealtimeError(error=error): + # TODO do we have to stop the session here? + logger.error(f"Received error message {error=}") + + case _: + logger.warning(f"Unhandled message {message=}") +``` + + +### Complete integration code + +The `agent.py` script integrates the code components presented in this section into reusable Python classes that you can extend for your own applications. + +
+Complete code for `agent.py` + +{`import abc +import asyncio +import base64 +import json +import logging +import os +from builtins import anext +from typing import Any, Callable, assert_never + +from attr import dataclass +from dotenv import load_dotenv +from pydantic import BaseModel + +from realtimekit.realtimeapi import messages +from realtimekit.realtimeapi.client import RealtimeApiClient + +from .agora.rtc import Channel, Chat, ChatMessage, RtcEngine + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True, kw_only=True) +class InferenceConfig: + system_message: str | None = None + turn_detection: messages.TurnDetectionTypes | None = None + voice: messages.Voices | None = None + + +@dataclass(frozen=True, kw_only=True) +class LocalFunctionToolDeclaration: + """Declaration of a tool that can be called by the model, and runs a function locally on the tool context.""" + + name: str + description: str + parameters: dict[str, Any] + function: Callable[..., Any] + + def model_description(self) -> dict[str, Any]: + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters, + }, + } + + +@dataclass(frozen=True, kw_only=True) +class PassThroughFunctionToolDeclaration: + """Declaration of a tool that can be called by the model, and is passed through the LiveKit client.""" + + name: str + description: str + parameters: dict[str, Any] + + def model_description(self) -> dict[str, Any]: + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": self.parameters, + }, + } + + +ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclaration + + +@dataclass(frozen=True, kw_only=True) +class LocalToolCallExecuted: + json_encoded_output: str + + +@dataclass(frozen=True, kw_only=True) +class ShouldPassThroughToolCall: + decoded_function_args: dict[str, Any] + + +ExecuteToolCallResult = LocalToolCallExecuted | ShouldPassThroughToolCall + + +class ToolContext(abc.ABC): + _tool_declarations: dict[str, ToolDeclaration] + + def __init__(self) -> None: + # TODO should be an ordered dict + self._tool_declarations = {} + + def register_function( + self, + *, + name: str, + description: str = "", + parameters: dict[str, Any], + fn: Callable[..., Any], + ) -> None: + self._tool_declarations[name] = LocalFunctionToolDeclaration( + name=name, description=description, parameters=parameters, function=fn + ) + + def register_client_function( + self, + *, + name: str, + description: str = "", + parameters: dict[str, Any], + ) -> None: + self._tool_declarations[name] = PassThroughFunctionToolDeclaration( + name=name, description=description, parameters=parameters + ) + + async def execute_tool( + self, tool_name: str, encoded_function_args: str + ) -> ExecuteToolCallResult | None: + tool = self._tool_declarations.get(tool_name) + if not tool: + return None + + args = json.loads(encoded_function_args) + assert isinstance(args, dict) + + if isinstance(tool, LocalFunctionToolDeclaration): + logger.info(f"Executing tool {tool_name} with args {args}") + result = await tool.function(**args) + logger.info(f"Tool {tool_name} executed with result {result}") + return LocalToolCallExecuted(json_encoded_output=json.dumps(result)) + + if isinstance(tool, PassThroughFunctionToolDeclaration): + return ShouldPassThroughToolCall(decoded_function_args=args) + + assert_never(tool) + + def model_description(self) -> list[dict[str, Any]]: + return [v.model_description() for v in self._tool_declarations.values()] + + +class ClientToolCallResponse(BaseModel): + tool_call_id: str + result: dict[str, Any] | str | float | int | bool | None = None + + +class RealtimeKitAgent: + engine: RtcEngine + channel: Channel + client: RealtimeApiClient + audio_queue: asyncio.Queue[bytes] = asyncio.Queue() + message_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDelta] = asyncio.Queue() + message_done_queue: asyncio.Queue[messages.ResonseAudioTranscriptionDone] = asyncio.Queue() + tools: ToolContext | None = None + + _client_tool_futures: dict[str, asyncio.Future[ClientToolCallResponse]] + + @classmethod + async def setup_and_run_agent( + cls, + *, + engine: RtcEngine, + inference_config: InferenceConfig, + tools: ToolContext | None, + ) -> None: + channel = await engine.connect(channelId="realtimekit_agora", uid="123") + + try: + async with RealtimeApiClient( + base_uri=os.getenv("REALTIME_API_BASE_URI", "wss://api.openai.com"), + api_key=os.getenv("OPENAI_API_KEY"), + verbose=False, + ) as client: + await client.send_message( + messages.UpdateSessionConfig( + session=messages.SessionResource(), + # turn_detection=inference_config.turn_detection, + # transcribe_input=False, + # input_audio_format=messages.AudioFormats.PCM16, + # vads=messages.VADConfig(), + ) + ) + + [start_session_message, _] = await asyncio.gather( + *[ + anext(client.listen()), + client.send_message( + messages.UpdateConversationConfig( + system_message=inference_config.system_message, + output_audio_format=messages.AudioFormats.PCM16, + voice=inference_config.voice, + tools=tools.model_description() if tools else None, + transcribe_input=False, + ) + ), + ] + ) + assert isinstance(start_session_message, messages.StartSession) + print( + f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}" + ) + + agent = cls( + client=client, + tools=tools, + channel=channel, + ) + await agent.run() + + finally: + await engine.disconnect() + await shutdown(asyncio.get_event_loop()) + + @classmethod + async def entry_point( + cls, + *, + engine: RtcEngine, + inference_config: InferenceConfig, + tools: ToolContext | None = None, + ) -> None: + await cls.setup_and_run_agent( + engine=engine, inference_config=inference_config, tools=tools + ) + + def __init__( + self, + *, + client: RealtimeApiClient, + tools: ToolContext | None, + channel: Channel, + ) -> None: + self.client = client + self.tools = tools + self._client_tool_futures = {} + self.channel = channel + + async def run(self) -> None: + def log_exception(t: asyncio.Task[Any]) -> None: + if not t.cancelled() and t.exception(): + logger.error( + "unhandled exception", + exc_info=t.exception(), + ) + + disconnected_future = asyncio.Future[None]() + + def _on_disconnected() -> None: + if not disconnected_future.done(): + disconnected_future.set_result(None) + + # self.room.on("disconnected", _on_disconnected) + + asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( + log_exception + ) + asyncio.create_task( + self._stream_audio_queue_to_audio_output() + ).add_done_callback(log_exception) + + asyncio.create_task(self._process_model_messages()).add_done_callback( + log_exception + ) + + await disconnected_future + logger.info("Agent finished running") + + async def _stream_input_audio_to_model(self) -> None: + audio_frames = self.channel.get_audio_frames() + async for audio_frame in audio_frames: + # send the frame to the model via the API client + await self.client.send_audio_data(audio_frame.data) + + async def _stream_audio_queue_to_audio_output(self) -> None: + while True: + # audio queue contains audio data from the model, send it the end-user via our local audio source + frame = await self.audio_queue.get() + await self.channel.push_audio_frame(frame) + await asyncio.sleep(0) # allow other tasks to run + + + async def _process_model_messages(self) -> None: + async for message in self.client.listen(): + match message: + case messages.ResonseAudioDelta(): + # logger.info("Received audio message") + await self.audio_queue.put(base64.b64decode(message.delta)) + + case messages.ResonseAudioTranscriptionDelta(): + logger.info(f"Received text message {message=}") + await self.channel.chat.send_message(ChatMessage(message=message.delta, msg_id=message.output_item_id)) + + case messages.ResonseAudioTranscriptionDone(): + logger.info(f"Text message done: {message=}") + await self.channel.chat.send_message(ChatMessage(message=message.value, msg_id=message.output_item_id, done=True)) + + case messages.MessageAdded(): + pass + case messages.ServerAddMessage(): + pass + + case messages.VADSpeechStarted(): + pass + case messages.VADSpeechStopped(): + pass + + case messages.GenerationCanceled(): + logger.info(f"Server turn canceled: {message=}") + + case messages.GenerationFinished(): + # TODO this is where we mark no longer appending text + logger.info(f"Server turn finished: {message=}") + # await self.channel.generation_finished() + + case messages.AddContent(type=messages.AddContentType.TOOL_CALL): + # TODO implement streaming tool calls + logger.info(f"Received tool call buffer add {message=}") + + case messages.RealtimeError(error=error): + # TODO do we have to stop the session here? + logger.error(f"Received error message {error=}") + + case _: + logger.warning(f"Unhandled message {message=}") + +async def shutdown(loop, signal=None): + """Gracefully shut down the application.""" + if signal: + print(f"Received exit signal {signal.name}...") + + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + print(f"Cancelling {len(tasks)} outstanding tasks") + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == "__main__": + load_dotenv() + asyncio.run( + RealtimeKitAgent.entry_point( + engine=RtcEngine(appid="aab8b8f5a8cd4469a63042fcfafe7063"), + inference_config=InferenceConfig( + system_message="""\ +You are a helpful assistant. If asked about the weather make sure to use the provided tool to get that information. \ +If you are asked a question that requires a tool, say something like "working on that" and dont provide a concrete response \ +until you have received the response to the tool call.\ +""", + voice=messages.Voices.Alloy, + turn_detection=messages.TurnDetectionTypes.SERVER_VAD, + ), + ) + )`} + +
+ +The `agent.py` imports key classes from `rtc.py`, a wrapper around the Agora Python Voice SDK. For SDK setup and dependencies, refer to [Voice calling quickstart](/voice-calling/get-started/get-started-sdk?platform=python). +Following is the complete code for `rtc.py`. + +
+Complete code for `rtc.py` + +
+ +## Test your code + +1. Update the values for `AGORA_APP_ID` and ` OPENAI_API_KEY` in the project's `.env` file. + +2. Execute the following command to run your app: + + ```bash + python3 agent.py + ``` + +## Reference + +This section contains content that completes the information on this page, or points you to documentation that explains other aspects to this product. + +- [Voice calling quickstart (Python)](/voice-calling/get-started/get-started-sdk?platform=python) \ No newline at end of file diff --git a/shared/video-sdk/get-started/get-started-sdk/project-implementation/python.mdx b/shared/video-sdk/get-started/get-started-sdk/project-implementation/python.mdx index ea279f418..27fa285a6 100644 --- a/shared/video-sdk/get-started/get-started-sdk/project-implementation/python.mdx +++ b/shared/video-sdk/get-started/get-started-sdk/project-implementation/python.mdx @@ -1,4 +1,5 @@ import CodeBlock from '@theme/CodeBlock'; +import CodeRtcPy from '@docs/assets/code/open-ai-integration/rtc-py.mdx' @@ -256,468 +257,7 @@ The `rtc.py` script integrates the code components presented in this section int
Complete code for `rtc.py` - - -{`import asyncio -import json -import logging -import os -from typing import Any, AsyncIterator - -from agora.rtc.agora_base import ( - ChannelProfileType, - ClientRoleType, -) -from agora.rtc.agora_service import ( - AgoraService, - AgoraServiceConfig, - RTCConnConfig, -) -from agora.rtc.audio_frame_observer import AudioFrame, IAudioFrameObserver -from agora.rtc.audio_pcm_data_sender import PcmAudioFrame -from agora.rtc.local_user import LocalUser -from agora.rtc.local_user_observer import IRTCLocalUserObserver -from agora.rtc.rtc_connection import RTCConnection, RTCConnInfo -from agora.rtc.rtc_connection_observer import IRTCConnectionObserver -from pyee.asyncio import AsyncIOEventEmitter - -from ..realtimeapi.util import CHANNELS, SAMPLE_RATE - -logger = logging.getLogger(__name__) - -class AudioStream: - def __init__(self) -> None: - self.queue: asyncio.Queue = asyncio.Queue() - - def __aiter__(self) -> AsyncIterator[PcmAudioFrame]: - return self - - async def __anext__(self) -> PcmAudioFrame: - item = await self.queue.get() - if item is None: - raise StopAsyncIteration - - return item - -class ChannelEventObserver(IRTCConnectionObserver, IRTCLocalUserObserver, IAudioFrameObserver): - def __init__(self, event_emitter: AsyncIOEventEmitter) -> None: - self.loop = asyncio.get_event_loop() - self.emitter = event_emitter - self.audio_stream = AudioStream() - - - - def emit_event(self, event_name: str, *args): - """Helper function to emit events.""" - self.emitter.emit(event_name, *args) - - def on_connected( - self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason - ): - logger.info(f"Connected to RTC: {agora_rtc_conn} {conn_info} {reason}") - self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) - - def on_disconnected( - self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason - ): - logger.info(f"Disconnected from RTC: {agora_rtc_conn} {conn_info} {reason}") - self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) - - def on_connecting( - self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason - ): - logger.info(f"Connecting to RTC: {agora_rtc_conn} {conn_info} {reason}") - self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) - - def on_connection_failure(self, agora_rtc_conn, conn_info, reason): - logger.error("Connection failure: {agora_rtc_conn} {conn_info} {reason}") - self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason) - - def on_user_joined(self, agora_rtc_conn: RTCConnection, user_id): - logger.info(f"User joined: {agora_rtc_conn} {user_id}") - self.emit_event("user_joined", agora_rtc_conn, user_id) - - def on_user_left(self, agora_rtc_conn: RTCConnection, user_id, reason): - logger.info(f"User left: {agora_rtc_conn} {user_id} {reason}") - self.emit_event("user_left", agora_rtc_conn, user_id, reason) - - - def handle_received_chunk(self, json_chunk): - chunk = json.loads(json_chunk) - msg_id = chunk["msg_id"] - part_idx = chunk["part_idx"] - total_parts = chunk["total_parts"] - if msg_id not in self.received_chunks: - self.received_chunks[msg_id] = {"parts": {}, "total_parts": total_parts} - if ( - part_idx not in self.received_chunks[msg_id]["parts"] - and 0 <= part_idx < total_parts - ): - self.received_chunks[msg_id]["parts"][part_idx] = chunk - if len(self.received_chunks[msg_id]["parts"]) == total_parts: - # all parts received, now recomposing original message and get rid it from dict - sorted_parts = sorted( - self.received_chunks[msg_id]["parts"].values(), - key=lambda c: c["part_idx"], - ) - full_message = "".join(part["content"] for part in sorted_parts) - del self.received_chunks[msg_id] - return full_message, msg_id - return (None, None) - - def on_stream_message( - self, agora_local_user: LocalUser, user_id, stream_id, data, length - ): - # logger.info(f"Stream message", agora_local_user, user_id, stream_id, length) - (reassembled_message, msg_id) = self.handle_received_chunk(data.decode("utf-8")) - if reassembled_message is not None: - logger.info(f"Reassembled message: {msg_id} {reassembled_message}") - - - def on_audio_subscribe_state_changed( - self, - agora_local_user, - channel, - user_id, - old_state, - new_state, - elapse_since_last_state, - ): - logger.info(f'on_audio_subscribe_state_changed: {channel} {user_id} {old_state} {new_state} {elapse_since_last_state}') - self.emit_event("audio_subscribe_state_changed", channel, user_id, old_state, new_state) - - def on_playback_audio_frame_before_mixing( - self, agora_local_user: LocalUser, channelId, uid, frame: AudioFrame - ): - audio_frame = PcmAudioFrame() - audio_frame.samples_per_channel = frame.samples_per_channel - audio_frame.bytes_per_sample = frame.bytes_per_sample - audio_frame.number_of_channels = frame.channels - audio_frame.sample_rate = SAMPLE_RATE - audio_frame.data = frame.buffer - - self.loop.call_soon_threadsafe(self.audio_stream.queue.put_nowait, audio_frame) - return 0 - -class Channel(): - def __init__( - self, rtc: "RtcEngine", channelId: str, uid: str - ) -> None: - self.loop = asyncio.get_event_loop() - - # Create the event emitter - self.emitter = AsyncIOEventEmitter() - - self.rtc = rtc - self.chat = Chat(self) - self.channelId = channelId - self.uid = uid - conn_config = RTCConnConfig( - client_role_type=ClientRoleType.CLIENT_ROLE_BROADCASTER, - channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING, - ) - self.connection = self.rtc.agora_service.create_rtc_connection(conn_config) - - self.channel_event_observer = ChannelEventObserver(self.emitter) - self.connection.register_observer(self.channel_event_observer) - self.connection.connect("", self.channelId, self.uid) - - self.local_user = self.connection.get_local_user() - self.local_user.set_playback_audio_frame_before_mixing_parameters( - CHANNELS, SAMPLE_RATE - ) - self.local_user.register_local_user_observer(self.channel_event_observer) - self.local_user.register_audio_frame_observer(self.channel_event_observer) - self.local_user.subscribe_all_audio() - - self.media_node_factory = self.rtc.agora_service.create_media_node_factory() - self.audio_pcm_data_sender = ( - self.media_node_factory.create_audio_pcm_data_sender() - ) - self.audio_track = self.rtc.agora_service.create_custom_audio_track_pcm( - self.audio_pcm_data_sender - ) - self.audio_track.set_enabled(1) - self.local_user.publish_audio(self.audio_track) - - self.stream_id = self.connection.create_data_stream(False, False) - self.received_chunks = {} - self.waiting_message = None - self.msg_id = "" - self.msg_index = "" - - async def disconnect(self) -> None: - """ - Disconnects the channel. - """ - disconnected_future = asyncio.Future[None]() - def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): - self.off("connection_state_changed", callback) - if conn_info.state == 1: - disconnected_future.set_result(None) - self.on("connection_state_changed", callback) - self.connection.disconnect() - await disconnected_future - - def get_audio_frames(self) -> AudioStream: - """ - Returns the audio frames from the channel. - - Returns: - AudioStream: The audio stream. - """ - return self.channel_event_observer.audio_stream - - async def push_audio_frame(self, frame: bytes) -> None: - """ - Pushes an audio frame to the channel. - - Parameters: - frame: The audio frame to push. - """ - audio_frame = PcmAudioFrame() - audio_frame.data = bytearray(frame) - audio_frame.timestamp = 0 - audio_frame.bytes_per_sample = 2 - audio_frame.number_of_channels = CHANNELS - audio_frame.sample_rate = SAMPLE_RATE - audio_frame.samples_per_channel = int( - len(frame) / audio_frame.bytes_per_sample / audio_frame.number_of_channels - ) - - self.audio_pcm_data_sender.send_audio_pcm_data(audio_frame) - - async def subscribe_audio(self, uid: int) -> None: - """ - Subscribes to the audio of a user. - - Parameters: - uid: The user ID to subscribe to. - """ - loop = asyncio.get_event_loop() - future = loop.create_future() - - def callback( - agora_local_user, - channel, - user_id, - old_state, - new_state, - elapse_since_last_state, - ): - if new_state == 3: - loop.call_soon_threadsafe(future.set_result, None) - else: - loop.call_soon_threadsafe( - future.set_exception, - Exception( - f"subscribe {user_id} audio state from {old_state} to {new_state}" - ), - ) - - self.once("audio_subscribe_state_changed", callback) - self.local_user.subscribe_audio(uid) - - await future - - async def unsubscribe_audio(self, uid: int) -> None: - """ - Unsubscribes from the audio of a user. - - Parameters: - uid: The user ID to unsubscribe from. - """ - loop = asyncio.get_event_loop() - future = loop.create_future() - - def callback( - agora_local_user, - channel, - user_id, - old_state, - new_state, - elapse_since_last_state, - ): - if new_state == 3: - loop.call_soon_threadsafe(future.set_result, None) - else: - loop.call_soon_threadsafe( - future.set_exception, - Exception( - f"subscribe {user_id} audio state from {old_state} to {new_state}" - ), - ) - - self.once("audio_subscribe_state_changed", callback) - self.local_user.unsubscribe_audio(uid) - - await future - - def _split_string_into_chunks(self, long_string, msg_id, chunk_size=300) -> list[dict[str: Any]]: - """ - Splits a long string into chunks of a given size. - - Parameters: - long_string: The string to split. - msg_id: The message ID. - chunk_size: The size of each chunk. - - Returns: - list[dict[str: Any]]: The list of chunks. - - """ - total_parts = (len(long_string) + chunk_size - 1) // chunk_size - json_chunks = [] - for idx in range(total_parts): - start = idx * chunk_size - end = min(start + chunk_size, len(long_string)) - chunk = { - 'msg_id': msg_id, - 'part_idx': idx, - 'total_parts': total_parts, - 'content': long_string[start:end] - } - json_chunk = json.dumps(chunk, ensure_ascii=False) - json_chunks.append(json_chunk) - return json_chunks - - async def send_stream_message(self, data: str, msg_id: str) -> None: - """ - Sends a stream message to the channel. - - Parameters: - data: The data to send. - msg_id: The message ID. - """ - - chunks = self._split_string_into_chunks(data, msg_id) - for chunk in chunks: - self.connection.send_stream_message(self.stream_id, chunk) - - def on(self, event_name: str, callback): - """ - Allows external components to subscribe to events. - - Parameters: - event_name: The name of the event to subscribe to. - callback: The callback to call when the event is emitted. - - """ - self.emitter.on(event_name, callback) - - def once(self, event_name: str, callback): - """ - Allows external components to subscribe to events once. - - Parameters: - event_name: The name of the event to subscribe to. - callback: The callback to call when the event is emitted. - """ - self.emitter.once(event_name, callback) - - def off(self, event_name: str, callback): - """ - Allows external components to unsubscribe from events. - - Parameters: - event_name: The name of the event to unsubscribe from. - callback: The callback to remove from the event. - """ - self.emitter.remove_listener(event_name, callback) - -class ChatMessage(): - def __init__(self, message: str, msg_id: str, done: bool = False) -> None: - self.message = message - self.msg_id = msg_id - self.done = done - -class Chat(): - def __init__(self, channel: Channel) -> None: - self.channel = channel - self.loop = self.channel.loop - self.queue = asyncio.Queue() - - def log_exception(t: asyncio.Task[Any]) -> None: - if not t.cancelled() and t.exception(): - logger.error( - "unhandled exception", - exc_info=t.exception(), - ) - asyncio.create_task(self._process_message()).add_done_callback(log_exception) - - async def send_message(self, item: ChatMessage) -> None: - """ - Sends a message to the channel. - - Parameters: - item: The message to send. - """ - await self.queue.put(item) - # await self.queue.put_nowait(item) - - async def _process_message(self) -> None: - """ - Processes messages in the queue. - """ - - while True: - item: ChatMessage = await self.queue.get() - await self.channel.send_stream_message(item.message, item.msg_id) - self.queue.task_done() - # await asyncio.sleep(0) - -class RtcEngine: - def __init__(self, appid: str): - self.appid = appid - config = AgoraServiceConfig() - config.appid = appid - config.log_path = os.path.join( - os.path.dirname( - os.path.dirname( - os.path.dirname(os.path.join(os.path.abspath(__file__))) - ) - ), - "agorasdk.log", - ) - self.agora_service = AgoraService() - self.agora_service.initialize(config) - - async def connect(self, channelId: str, uid: str) -> Channel: - """ - Connects to a channel. - - Parameters: - channelId: The channel ID. - uid: The user ID. - - Returns: - Channel: The channel. - """ - - loop = asyncio.get_event_loop() - future = loop.create_future() - - def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): - channel.off("connection_state_changed", callback) - if conn_info.state == 3: - future.set_result(channel) - else: - future.set_exception( - Exception(f"connection state:{conn_info.state}"), - ) - - channel = Channel(self, channelId, uid) - channel.on("connection_state_changed", callback) - - await future - return channel - - def destroy(self) -> None: - """ - Destroys the RTC engine. - """ - self.agora_service.release()`} - +
\ No newline at end of file diff --git a/shared/voice-sdk/get-started/get-started-sdk/index.mdx b/shared/voice-sdk/get-started/get-started-sdk/index.mdx index d6d639959..5ebd9228d 100644 --- a/shared/voice-sdk/get-started/get-started-sdk/index.mdx +++ b/shared/voice-sdk/get-started/get-started-sdk/index.mdx @@ -27,10 +27,12 @@ The following figure shows the basic workflow you implement to integrate this fe - The following information from [](https://console.agora.io/v2): - **App ID**: A unique string generated by Agora that identifies your project. + - **A temporary token**: A dynamic key that authenticates a user when the client joins a channel. Note down the channel name you use to generate the token. The maximum validity period of a temporary token is 24 hours. - + + Please refer to [Agora account management](../get-started/manage-agora-account) for details. ## Set up your project