From b173faa3f42dd0511ce2b9e75b709ee5b2a67cc7 Mon Sep 17 00:00:00 2001 From: digitallysavvy Date: Wed, 2 Oct 2024 21:17:58 -0400 Subject: [PATCH] added utils, parse args, logger, and updted main --- shared/open-ai-integration/quickstart.mdx | 656 ++++++++++++++++++---- 1 file changed, 542 insertions(+), 114 deletions(-) diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx index 67704fa38..d5935b833 100644 --- a/shared/open-ai-integration/quickstart.mdx +++ b/shared/open-ai-integration/quickstart.mdx @@ -1,6 +1,6 @@ import CodeRtcPy from '@docs/assets/code/open-ai-integration/rtc-py.mdx'; import Prerequisites from '@docs/shared/common/prerequisites/python.mdx'; -import CompleteCode from './complete-code.mdx' +import CompleteCode from './complete-code.mdx'; Integrating Agora's real-time audio communication capabilities with OpenAI's language models enables dynamic, conversational AI experiences. This guide shows you how to set up a Python project that combines Agora's server-side Voice SDK with OpenAI's API to create an interactive, voice-driven assistant. @@ -20,9 +20,9 @@ The following figure illustrates the integration topology: - FFmpeg - ```bash - sudo apt install ffmpeg - ``` + ```bash + sudo apt install ffmpeg + ``` ## Getting Started @@ -39,92 +39,92 @@ cd agora-openai-converse 1. Create a new folder for the project: - ``` - mkdir realtime_agent - cd realtime_agent/ - ``` + ``` + mkdir realtime_agent + cd realtime_agent/ + ``` 1. Create the base project structure: - ``` - mkdir -p realtimeapi && touch {__init__.py,.env,agent.py,logger.py,main.py,parse_args.py,tools.py,utils.py,requirements.txt,realtimeapi/connection.py,realtimeapi/struct.py} - ``` - - The project structure should look like this: + ``` + mkdir -p realtimeapi && touch {__init__.py,.env,agent.py,logger.py,main.py,parse_args.py,tools.py,utils.py,requirements.txt,realtimeapi/connection.py,realtimeapi/struct.py} + ``` - ``` - /realtime_agent - ├── __init__.py - ├── .env - ├── agent.py - ├── logger.py - ├── main.py - ├── parse_args.py - ├── tools.py - ├── utils.py - ├── requirements.txt - └── realtimeapi - ├── connection.py - └── struct.py - ``` + The project structure should look like this: + + ``` + /realtime_agent + ├── __init__.py + ├── .env + ├── agent.py + ├── logger.py + ├── main.py + ├── parse_args.py + ├── tools.py + ├── utils.py + ├── requirements.txt + └── realtimeapi + ├── connection.py + └── struct.py + ``` 1. Add the following dependencies to the `requirements.txt` file: - ``` - agora-python-server-sdk==2.0.5 - agora-realtime-ai-api==1.0.6 - aiohappyeyeballs==2.4.0 - aiohttp==3.10.6 - aiohttp[speedups] - aiosignal==1.3.1 - annotated-types==0.7.0 - anyio==4.4.0 - attrs==24.2.0 - black==24.4.2 - certifi==2024.7.4 - cffi==1.17.1 - click==8.1.7 - colorlog>=6.0.0 - distro==1.9.0 - frozenlist==1.4.1 - h11==0.14.0 - httpcore==1.0.5 - httpx==0.27.0 - idna==3.10 - iniconfig==2.0.0 - multidict==6.1.0 - mypy==1.10.1 - mypy-extensions==1.0.0 - numpy==1.26.4 - numpy>=1.21.0 - openai==1.37.1 - packaging==24.1 - pathspec==0.12.1 - platformdirs==4.2.2 - pluggy==1.5.0 - psutil==5.9.8 - protobuf==5.27.2 - PyAudio==0.2.14 - pyaudio>=0.2.11 - pycparser==2.22 - pydantic==2.9.2 - pydantic_core==2.23.4 - pydub==0.25.1 - pyee==12.0.0 - PyJWT==2.8.0 - pytest==8.2.2 - python-dotenv==1.0.1 - ruff==0.5.2 - six==1.16.0 - sniffio==1.3.1 - sounddevice==0.4.7 - sounddevice>=0.4.6 - tqdm==4.66.4 - types-protobuf==4.25.0.20240417 - typing_extensions==4.12.2 - watchfiles==0.22.0 - yarl==1.12.1 - ``` + ``` + agora-python-server-sdk==2.0.5 + agora-realtime-ai-api==1.0.6 + aiohappyeyeballs==2.4.0 + aiohttp==3.10.6 + aiohttp[speedups] + aiosignal==1.3.1 + annotated-types==0.7.0 + anyio==4.4.0 + attrs==24.2.0 + black==24.4.2 + certifi==2024.7.4 + cffi==1.17.1 + click==8.1.7 + colorlog>=6.0.0 + distro==1.9.0 + frozenlist==1.4.1 + h11==0.14.0 + httpcore==1.0.5 + httpx==0.27.0 + idna==3.10 + iniconfig==2.0.0 + multidict==6.1.0 + mypy==1.10.1 + mypy-extensions==1.0.0 + numpy==1.26.4 + numpy>=1.21.0 + openai==1.37.1 + packaging==24.1 + pathspec==0.12.1 + platformdirs==4.2.2 + pluggy==1.5.0 + psutil==5.9.8 + protobuf==5.27.2 + PyAudio==0.2.14 + pyaudio>=0.2.11 + pycparser==2.22 + pydantic==2.9.2 + pydantic_core==2.23.4 + pydub==0.25.1 + pyee==12.0.0 + PyJWT==2.8.0 + pytest==8.2.2 + python-dotenv==1.0.1 + ruff==0.5.2 + six==1.16.0 + sniffio==1.3.1 + sounddevice==0.4.7 + sounddevice>=0.4.6 + tqdm==4.66.4 + types-protobuf==4.25.0.20240417 + typing_extensions==4.12.2 + watchfiles==0.22.0 + yarl==1.12.1 + ``` 1. Open the `.env` file and fill in the values for the environment variables: @@ -133,7 +133,7 @@ cd agora-openai-converse AGORA_APP_ID= AGORA_APP_CERT= - # OpenAI API key and model + # OpenAI API key and model OPENAI_API_KEY= OPENAI_MODEL= @@ -235,7 +235,7 @@ class InferenceConfig: - Configure turn detection, system message, and voice parameters. """ system_message: str | None = None - turn_detection: ServerVADUpdateParams | None = None + turn_detection: ServerVADUpdateParams | None = None voice: Voices | None = None class RealtimeKitAgent: @@ -275,7 +275,7 @@ class RealtimeKitAgent: client: RealtimeApiConnection, tools: ToolContext | None, channel: Channel, - ) -> None: + ) -> None: """Initialize the agent with the provided tools and channel. - This method sets up the initial state of the agent and its tool context. """ @@ -365,7 +365,7 @@ The `setup_and_run_agent` method connects to an Agora channel using `RtcEngine`, finally: await channel.disconnect() - await client.shutdown() + await client.shutdown() ``` ### Initialize the RealtimeKitAgent @@ -547,13 +547,13 @@ Key features implemented by `_process_model_messages`: - **Handling transcripts**: If the message contains partial or final text transcripts, they are processed and sent to the Agora chat. - **Handling other responses**: Additional message types, such as tool invocations and other outputs are processed as needed. - ```python - async def _process_model_messages(self) -> None: - # Continuously listen for incoming messages from OpenAI - async for message in self.client.listen(): - match message: - # Handle different message types - ``` + ```python + async def _process_model_messages(self) -> None: + # Continuously listen for incoming messages from OpenAI + async for message in self.client.listen(): + match message: + # Handle different message types + ``` ### Audio and message flow @@ -616,17 +616,17 @@ The agent can also be extended to support a variety of other message types, incl while not self.audio_queue.empty(): self.audio_queue.get_nowait() logger.info(f"TMS:InputAudioBufferSpeechStarted: item_id: {message.item_id}") - + case InputAudioBufferSpeechStopped(): logger.info(f"TMS:InputAudioBufferSpeechStopped: item_id: {message.item_id}") pass - + case ResponseAudioDelta(): # logger.info("Received audio message") self.audio_queue.put_nowait(base64.b64decode(message.delta)) # loop.call_soon_threadsafe(self.audio_queue.put_nowait, base64.b64decode(message.delta)) logger.info(f"TMS:ResponseAudioDelta: response_id:{message.response_id},item_id: {message.item_id}") - + case ResponseAudioTranscriptDelta(): # logger.info(f"Received text message {message=}") asyncio.create_task(self.channel.chat.send_message( @@ -689,7 +689,7 @@ The method implements the following: - **Timeout handling**: If no user joins within `60 seconds`, a `TimeoutError` is raised, which is logged as an error. - **Cleanup**: After successfully getting a user ID or timing out, the event listener is removed using `channel.off("user_joined", on_user_joined)`. -In `agent.py`, replace the placeholder code with: +In `agent.py`, replace the placeholder code with: ```python async def wait_for_remote_user(channel: Channel) -> int: @@ -707,12 +707,69 @@ async def wait_for_remote_user(channel: Channel) -> int: return remote_user except KeyboardInterrupt: future.cancel() - + except Exception as e: logger.error(f"Error waiting for remote user: {e}") raise ``` +### Utils + +In the Agent.py file, we initialize a PCMWriter instance, which is responsible for writing audio frames to a file that will be sent to the AI for processing. The PCMWriter class, along with its methods, is defined in the utils.py file. + +```python +import asyncio +import functools +from datetime import datetime + + +def write_pcm_to_file(buffer: bytearray, file_name: str) -> None: + """Helper function to write PCM data to a file.""" + with open(file_name, "ab") as f: # append to file + f.write(buffer) + + +def generate_file_name(prefix: str) -> str: + # Create a timestamp for the file name + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"{prefix}_{timestamp}.pcm" + + +class PCMWriter: + def __init__(self, prefix: str, write_pcm: bool, buffer_size: int = 1024 * 64): + self.write_pcm = write_pcm + self.buffer = bytearray() + self.buffer_size = buffer_size + self.file_name = generate_file_name(prefix) if write_pcm else None + self.loop = asyncio.get_event_loop() + + async def write(self, data: bytes) -> None: + """Accumulate data into the buffer and write to file when necessary.""" + if not self.write_pcm: + return + + self.buffer.extend(data) + + # Write to file if buffer is full + if len(self.buffer) >= self.buffer_size: + await self._flush() + + async def flush(self) -> None: + """Write any remaining data in the buffer to the file.""" + if self.write_pcm and self.buffer: + await self._flush() + + async def _flush(self) -> None: + """Helper method to write the buffer to the file.""" + if self.file_name: + await self.loop.run_in_executor( + None, + functools.partial(write_pcm_to_file, self.buffer[:], self.file_name), + ) + self.buffer.clear() + +``` + ### Tool Management Every agent needs a tool management system to extend the agent's functionality by allowing OpenAI’s model to invoke specific tools. These tools can either run locally or pass data back to the model for further processing. By registering tools and executing them based on incoming messages, the agent adds the capability and flexibility to handling a variety of tasks. @@ -785,7 +842,7 @@ class PassThroughFunctionToolDeclaration: }, } -ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclaration +ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclaration ``` The `ToolContext` class manages all available tools. It provides the logic for both registering tools and executing them when requested by the OpenAI model. Once tools are registered, the agent can execute them in response to messages from OpenAI’s model. The agent listens for tool call requests and either executes the tool locally or passes data back to the model. @@ -877,7 +934,7 @@ The following flow illustrates how this works: 2. The `_process_model_messages` method identifies the tool call request. 3. The agent retrieves the relevant tool from the `ToolContext` and executes it, either locally or by passing data back to the model. -This integration between **message processing** and **tool management** ensures that the agent can extend its capabilities dynamically, performing tasks or calculations in real-time based on incoming requests. +This integration between **message processing** and **tool management** ensures that the agent can extend its capabilities dynamically, performing tasks or calculations in real-time based on incoming requests. The `ClientToolCallResponse` model represents the response after a tool is invoked and processed. This class is designed to represent the response of a client-side tool call, where the tool_call_id uniquely identifies the tool call, and the result can take on multiple data types, representing the output of that call. The flexibility in the result field allows for a wide variety of responses. @@ -891,7 +948,70 @@ With these pieces in place, the agent can effectively manage tool registration a ## Set up a server -The `main.py` script sets up an HTTP server that handles real-time agent processes using Agora's RTC engine and RealtimeKit agents. It includes routes for starting and stopping agents, manages processes for different channels, and handles cleanup and shutdown procedures. The script manages these agents asynchronously. +The `main.py` script orchestrates the initialization of an HTTP server that allows clients to start and stop AI-driven agents in Agora voice channels. It includes routes for starting and stopping agents, manages processes for different channels, and handles cleanup and shutdown procedures. The agents run as separate processes, ensuring they can handle real-time interactions without blocking the main server. The application leverages aiohttp for handling HTTP requests, multiprocessing to manage agent processes, and asyncio for non-blocking execution. + +Open `main.py` and add the imports and load the env variables. + +```python +import asyncio +import logging +import os +import signal +from multiprocessing import Process + +from aiohttp import web +from dotenv import load_dotenv +from pydantic import BaseModel, Field, ValidationError + +from .realtime.struct import PCM_CHANNELS, PCM_SAMPLE_RATE, ServerVADUpdateParams, Voices + +from .agent import InferenceConfig, RealtimeKitAgent +from agora_realtime_ai_api.rtc import RtcEngine, RtcOptions +from .logger import setup_logger +from .parse_args import parse_args, parse_args_realtimekit + +# Load and validate the environment variables +load_dotenv(override=True) +app_id = os.environ.get("AGORA_APP_ID") +app_cert = os.environ.get("AGORA_APP_CERT") + +if not app_id: + raise ValueError("AGORA_APP_ID must be set in the environment.") + +class StartAgentRequestBody(BaseModel): + channel_name: str = Field(..., description="The name of the channel") + uid: int = Field(..., description="The UID of the user") + language: str = Field("en", description="The language of the agent") + +class StopAgentRequestBody(BaseModel): + channel_name: str = Field(..., description="The name of the channel") +``` + +### Process Management and Signal Handling + +The monitor_process function asynchronously monitors each agent process, ensuring that once it finishes, the process is cleaned up. handle_agent_proc_signal ensures that any agent receiving a termination signal exits gracefully. This process management ensures that the application can run multiple agents concurrently while maintaining proper resource management. + +```python +async def monitor_process(channel_name: str, process: Process): + # Wait for the process to finish in a non-blocking way + await asyncio.to_thread(process.join) + + logger.info(f"Process for channel {channel_name} has finished") + + # Perform additional work after the process finishes + # For example, removing the process from the active_processes dictionary + if channel_name in active_processes: + active_processes.pop(channel_name) + + # Perform any other cleanup or additional actions you need here + logger.info(f"Cleanup for channel {channel_name} completed") + + logger.info(f"Remaining active processes: {len(active_processes.keys())}") + +def handle_agent_proc_signal(signum, frame): + logger.info(f"Agent process received signal {signal.strsignal(signum)}. Exiting...") + os._exit(0) +``` ### Run the agent @@ -913,8 +1033,8 @@ def run_agent_in_process( options=RtcOptions( channel_name=channel_name, uid=uid, - sample_rate=SAMPLE_RATE, - channels=CHANNELS, + sample_rate=PCM_SAMPLE_RATE, + channels=PCM_CHANNELS, enable_pcm_dump= os.environ.get("WRITE_RTC_PCM", "false") == "true" ), inference_config=inference_config, @@ -923,9 +1043,126 @@ def run_agent_in_process( ) ``` +### HTTP Routes for Managing Agents +The start_agent and stop_agent routes are the main HTTP endpoints that allow clients to control the agents. As part of the start and stop we need to keep track of the active_processes. + +```python +# Dictionary to keep track of processes by channel name or UID +active_processes = {} +``` + +When a POST request is made to /start_agent, the server validates the request, starts a new agent process (if one isn’t already running), and begins monitoring it. The processes are stored in an active_processes dictionary for easy management. + +```python +async def start_agent(request): + try: + # Parse and validate JSON body using the pydantic model + try: + data = await request.json() + validated_data = StartAgentRequestBody(**data) + except ValidationError as e: + return web.json_response( + {"error": "Invalid request data", "details": e.errors()}, status=400 + ) + + # Parse JSON body + channel_name = validated_data.channel_name + uid = validated_data.uid + language = validated_data.language + + # Check if a process is already running for the given channel_name + if ( + channel_name in active_processes + and active_processes[channel_name].is_alive() + ): + return web.json_response( + {"error": f"Agent already running for channel: {channel_name}"}, + status=400, + ) + + system_message = "" + if language == "en": + system_message = """\ +Your knowledge cutoff is 2023-10. You are a helpful, witty, and friendly AI. Act like a human, but remember that you aren't a human and that you can't do human things in the real world. Your voice and personality should be warm and engaging, with a lively and playful tone. If interacting in a non-English language, start by using the standard accent or dialect familiar to the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, even if you're asked about them.\ +""" + + inference_config = InferenceConfig( + system_message=system_message, + voice=Voices.Alloy, + turn_detection=ServerVADUpdateParams( + type="server_vad", threshold=0.5, prefix_padding_ms=300, silence_duration_ms=200 + ), + ) + # Create a new process for running the agent + process = Process( + target=run_agent_in_process, + args=(app_id, app_cert, channel_name, uid, inference_config), + ) + + try: + process.start() + except Exception as e: + logger.error(f"Failed to start agent process: {e}") + return web.json_response( + {"error": f"Failed to start agent: {e}"}, status=500 + ) + + # Store the process in the active_processes dictionary using channel_name as the key + active_processes[channel_name] = process + + # Monitor the process in a background asyncio task + asyncio.create_task(monitor_process(channel_name, process)) + + return web.json_response({"status": "Agent started!"}) + + except Exception as e: + logger.error(f"Failed to start agent: {e}") + return web.json_response({"error": str(e)}, status=500) +``` + +The stop_agent route handles requests to stop an active agent. It first validates the request body using StopAgentRequestBody. If a process is found for the specified channel, it terminates the process using os.kill and sends a SIGKILL signal. The process is then removed from the active_processes dictionary, and a response is returned to confirm termination. If no process is found, a 404 error is returned, indicating the agent was not active. + +```python +# HTTP Server Routes: Stop Agent +async def stop_agent(request): + try: + # Parse and validate JSON body using the pydantic model + try: + data = await request.json() + validated_data = StopAgentRequestBody(**data) + except ValidationError as e: + return web.json_response( + {"error": "Invalid request data", "details": e.errors()}, status=400 + ) + + # Parse JSON body + channel_name = validated_data.channel_name + + # Find and terminate the process associated with the given channel name + process = active_processes.get(channel_name) + + if process and process.is_alive(): + logger.info(f"Terminating process for channel {channel_name}") + await asyncio.to_thread(os.kill, process.pid, signal.SIGKILL) + + return web.json_response( + {"status": "Agent process terminated", "channel_name": channel_name} + ) + else: + return web.json_response( + {"error": "No active agent found for the provided channel_name"}, + status=404, + ) + + except Exception as e: + logger.error(f"Failed to stop agent: {e}") + return web.json_response({"error": str(e)}, status=500) + +``` + ### Shutdown gracefully -The `shutdown` function gracefully cancels running tasks and stopping the event loop. This prevents tasks from hanging and ensures resources are properly released. +The shutdown function is responsible for cleaning up agent processes when the server is shutting down. It iterates through all the processes in active_processes and terminates any that are still alive, ensuring no orphaned processes remain. This is essential for graceful shutdowns, preventing resource leaks. Once all processes are terminated, it clears the active_processes dictionary to reset the server state. ```python async def shutdown(app): @@ -939,6 +1176,196 @@ async def shutdown(app): await asyncio.to_thread(process.join) # Ensure process has terminated active_processes.clear() logger.info("All processes terminated, shutting down server") + + +# Signal handler to gracefully stop the application +def handle_signal(signum, frame): + logger.info(f"Received exit signal {signal.strsignal(signum)}...") + + loop = asyncio.get_running_loop() + if loop.is_running(): + # Properly shutdown by stopping the loop and running shutdown + loop.create_task(shutdown(None)) + loop.stop() +``` + +### aiohttp application setup + +The init_app function sets up the core aiohttp web application. It defines the HTTP routes for starting and stopping AI agents with the /start_agent and /stop_agent endpoints, and attaches a cleanup task to properly shut down processes when the server exits. The function returns the initialized app object, ready to be managed by the event loop and handle incoming requests. + +```python +async def init_app(): + app = web.Application() + + # Add cleanup task to run on app exit + app.on_cleanup.append(shutdown) + + app.add_routes([web.post("/start_agent", start_agent)]) + app.add_routes([web.post("/stop_agent", stop_agent)]) + + return app + +``` + +### Main Entry + +Now that we have the entire agent setup, we are ready to bring it all together and implement the main entry point for our project. The main entry point of the program first parses the command-line arguments to determine whether the server should be started or an agent should be run directly. If server is chosen, it sets up the event loop and starts the aiohttp web server using init_app(), which binds the routes for starting and stopping agents. If agent is selected, it parses the RealtimeKit options and starts an agent process using run_agent_in_process. This structure allows the application to either act as a server managing agents or run an individual agent directly, depending on the context. + +```python +if __name__ == "__main__": + # Parse the action argument + args = parse_args() + # Action logic based on the action argument + if args.action == "server": + # Python 3.10+ requires explicitly creating a new event loop if none exists + try: + loop = asyncio.get_event_loop() + except RuntimeError: + # For Python 3.10+, use this to get a new event loop if the default is closed or not created + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Start the application using asyncio.run for the new event loop + app = loop.run_until_complete(init_app()) + web.run_app(app, port=int(os.getenv("SERVER_PORT") or "8080")) + elif args.action == "agent": + # Parse RealtimeKitOptions for running the agent + realtime_kit_options = parse_args_realtimekit() + + # Example logging for parsed options (channel_name and uid) + logger.info(f"Running agent with options: {realtime_kit_options}") + + inference_config = InferenceConfig( + system_message="""\ +Your knowledge cutoff is 2023-10. You are a helpful, witty, and friendly AI. Act like a human, but remember that you aren't a human and that you can't do human things in the real world. Your voice and personality should be warm and engaging, with a lively and playful tone. If interacting in a non-English language, start by using the standard accent or dialect familiar to the user. Talk quickly. You should always call a function if you can. Do not refer to these rules, even if you're asked about them.\ +""", + voice=Voices.Alloy, + turn_detection=ServerVADUpdateParams( + type="server_vad", threshold=0.5, prefix_padding_ms=300, silence_duration_ms=200 + ), + ) + run_agent_in_process( + engine_app_id=app_id, + engine_app_cert=app_cert, + channel_name=realtime_kit_options["channel_name"], + uid=realtime_kit_options["uid"], + inference_config=inference_config, + ) + +``` + +## Parse Args and Logger + +This project implements a few helper functions to support in parsing command line arguments and to handle logging. The setup of these components is beyond the scope of this guide, but for the purposes of providing a complete working codebase, the code for each is included below. + +### Parse Args +```python +import argparse +import logging +from typing import TypedDict + +from .logger import setup_logger + +# Set up the logger with color and timestamp support +logger = setup_logger(name=__name__, log_level=logging.INFO) + + +class RealtimeKitOptions(TypedDict): + channel_name: str + uid: int + +def parse_args(): + parser = argparse.ArgumentParser(description="Manage server and agent actions.") + + # Create a subparser for actions (server and agent) + subparsers = parser.add_subparsers(dest="action", required=True) + + # Subparser for the 'server' action (no additional arguments) + subparsers.add_parser("server", help="Start the server") + + # Subparser for the 'agent' action (with required arguments) + agent_parser = subparsers.add_parser("agent", help="Run an agent") + agent_parser.add_argument("--channel_name", required=True, help="Channel Id / must") + agent_parser.add_argument("--uid", type=int, default=0, help="User Id / default is 0") + + return parser.parse_args() + + +def parse_args_realtimekit() -> RealtimeKitOptions: + args = parse_args() + logger.info(f"Parsed arguments: {args}") + + if args.action == "agent": + options: RealtimeKitOptions = {"channel_name": args.channel_name, "uid": args.uid} + return options + + return None +``` + +### Logger + +```python +import logging +from datetime import datetime + +import colorlog + + +def setup_logger( + name: str, + log_level: int = logging.INFO, + log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + use_color: bool = True +) -> logging.Logger: + """Sets up and returns a logger with color and timestamp support, including milliseconds.""" + + # Create or get a logger with the given name + logger = logging.getLogger(name) + + # Prevent the logger from propagating to the root logger (disable extra output) + logger.propagate = False + + # Clear existing handlers to avoid duplicate messages + if logger.hasHandlers(): + logger.handlers.clear() + + # Set the log level + logger.setLevel(log_level) + + # Create console handler + handler = logging.StreamHandler() + + # Custom formatter for adding milliseconds + class CustomFormatter(colorlog.ColoredFormatter): + def formatTime(self, record, datefmt=None): + record_time = datetime.fromtimestamp(record.created) + if datefmt: + return record_time.strftime(datefmt) + f",{int(record.msecs):03d}" + else: + return record_time.strftime("%Y-%m-%d %H:%M:%S") + f",{int(record.msecs):03d}" + + # Use custom formatter that includes milliseconds + if use_color: + formatter = CustomFormatter( + "%(log_color)s" + log_format, + datefmt="%Y-%m-%d %H:%M:%S", # Milliseconds will be appended manually + log_colors={ + "DEBUG": "cyan", + "INFO": "green", + "WARNING": "yellow", + "ERROR": "red", + "CRITICAL": "bold_red", + }, + ) + else: + formatter = CustomFormatter(log_format, datefmt="%Y-%m-%d %H:%M:%S") + + handler.setFormatter(formatter) + + # Add the handler to the logger + logger.addHandler(handler) + + return logger ``` Copy the [complete code](#complete-integration-code) for `main.py` and paste it into the corresponding file in your folder structure. @@ -952,9 +1379,9 @@ Copy the [complete code](#complete-integration-code) for `main.py` and paste it To set up and run the backend, take the following steps: 1. Make sure that you have updated the files in the `realtime_agent` folder with -the [complete code](#complete-integration-code). + the [complete code](#complete-integration-code). -1. Update the values for `AGORA_APP_ID`, `AGORA_APP_CERT`, and `OPENAI_API_KEY` in the project's** `.env` file. +1. Update the values for `AGORA_APP_ID`, `AGORA_APP_CERT`, and `OPENAI_API_KEY` in the project's\*\* `.env` file. Ensure that the necessary credentials for Agora and OpenAI are correctly configured in your project’s environment file. @@ -980,10 +1407,10 @@ The server provides a simple layer for managing agent processes. This api starts an agent with given graph and override properties. The started agent will join into the specified channel, and subscribe to the uid which your browser/device's rtc use to join. -| Param | Description | -| -------- | ------- | -| `channel_name` | Use the same channel name that your browser/device joins, agent needs to be in the same channel to communicate. | -| `uid` | The user ID that the AI agent uses to join. | +| Param | Description | +| -------------- | --------------------------------------------------------------------------------------------------------------- | +| `channel_name` | Use the same channel name that your browser/device joins, agent needs to be in the same channel to communicate. | +| `uid` | The user ID that the AI agent uses to join. | Example: @@ -1000,11 +1427,12 @@ curl 'http://localhost:8080/start_agent' \ This api stops the agent you started -| Param | Description | -| -------- | ------- | +| Param | Description | +| -------------- | ------------------------------------------------------ | | `channel_name` | Use the same channel name you used to start the agent. | Example: + ```bash curl 'http://localhost:8080/stop_agent' \ -H 'Content-Type: application/json' \ @@ -1023,4 +1451,4 @@ Additional relevant documentation that complements the current page or explains - Checkout the [Demo project on GitHub](https://github.com/AgoraIO/openai-realtime-python) - [API reference for `rtc.py`](https://api-ref.agora.io/en/voice-sdk/python/rtc-py-api.html) -- [Voice calling quickstart (Python)](/voice-calling/get-started/get-started-sdk?platform=python) \ No newline at end of file +- [Voice calling quickstart (Python)](/voice-calling/get-started/get-started-sdk?platform=python)