From 111ad3a0573afe6acac484b40bc2f9ba1b4d5d43 Mon Sep 17 00:00:00 2001
From: Niel Teng Hu
Date: Sat, 30 Dec 2023 11:24:17 -0500
Subject: [PATCH] add logging for azure (#110)

---
 actionweaver/llms/azure/chat.py               | 304 ++++++++----------
 actionweaver/llms/azure/tokens.py             |  21 +-
 docs/source/notebooks/cookbooks/logging.ipynb |  40 ++-
 3 files changed, 168 insertions(+), 197 deletions(-)

diff --git a/actionweaver/llms/azure/chat.py b/actionweaver/llms/azure/chat.py
index 301173a..afab68e 100644
--- a/actionweaver/llms/azure/chat.py
+++ b/actionweaver/llms/azure/chat.py
@@ -6,7 +6,7 @@ import time
 import uuid
 from itertools import chain
-from typing import List, Union
+from typing import List, Optional, Union
 
 from openai import AsyncAzureOpenAI, AzureOpenAI, Stream
 from openai.types.chat.chat_completion_message import (
@@ -17,6 +17,7 @@
 from actionweaver.actions.action import Action, ActionHandlers
 from actionweaver.llms.azure.functions import Functions
 from actionweaver.llms.azure.tokens import TokenUsageTracker
+from actionweaver.telemetry import traceable
 from actionweaver.utils import DEFAULT_ACTION_SCOPE
 from actionweaver.utils.stream import get_first_element_and_iterator, merge_dicts
@@ -24,7 +25,16 @@
 
 
 class ChatCompletionException(Exception):
-    pass
+    def __init__(self, message="", extra_info=None):
+        super().__init__(message)
+        self.extra_info = extra_info or {}
+
+    def __str__(self):
+        # Customize the string representation to include extra_info
+        extra_info_str = ", ".join(
+            f"{key}: {value}" for key, value in self.extra_info.items()
+        )
+        return f"{super().__str__()} | Additional Info: [{extra_info_str}]"
 
 
 class ChatCompletion:
@@ -52,12 +62,10 @@ def __init__(
             api_version=api_version,
             azure_deployment=azure_deployment,
         )
-        self.token_usage_tracker = token_usage_tracker or TokenUsageTracker(
-            logger=logger
-        )
+        self.token_usage_tracker = token_usage_tracker or TokenUsageTracker()
 
     @staticmethod
-    def _handle_stream_response(api_response, logger, call_id):
+    def _handle_stream_response(api_response):
         _, iterator = get_first_element_and_iterator(api_response)
         stream_function_calls = None
         for chunk in iterator:
@@ -83,15 +91,13 @@
                 # if it has content return as generator right away
                 return chain([chunk], iterator)
             else:
-                if logger:
-                    logger.debug(
-                        {
-                            "message": "Unsupported streaming response",
-                            "chunk": str(chunk),
-                            "timestamp": time.time(),
-                            "call_id": call_id,
-                        }
-                    )
+                raise ChatCompletionException(
+                    "Unsupported streaming response",
+                    extra_info={
+                        "chunk": str(chunk),
+                        "timestamp": time.time(),
+                    },
+                )
         return stream_function_calls
 
     @staticmethod
@@ -119,7 +125,6 @@ def build_orch(actions: List[Action] = None, orch=None):
 
     @staticmethod
     def _invoke_function(
-        call_id,
         messages,
         model,
         function_call,
@@ -148,17 +153,15 @@
         try:
             arguments = json.loads(function_call["arguments"])
         except json.decoder.JSONDecodeError as e:
-            logger.error(
-                {
+            raise ChatCompletionException(
+                e,
+                extra_info={
                     "message": "Parsing function call arguments from OpenAI response ",
                     "arguments": function_call["arguments"],
                     "timestamp": time.time(),
                     "model": model,
-                    "call_id": call_id,
                 },
-                exc_info=True,
-            )
-            raise ChatCompletionException(e) from e
+            ) from e
 
         # Invoke action
         function_response = action_handler[name](**arguments)
@@ -171,18 +174,6 @@
             }
         ]
 
-        logger.debug(
-            {
-                "message": "Action invoked and response received",
-                "action_name": name,
-                "action_arguments": 
arguments, - "action_response": function_response, - "timestamp": time.time(), - "call_id": call_id, - "stop": stop, - } - ) - # use tools in orch[DEFAULT_ACTION_SCOPE] if expr is DEFAULT_ACTION_SCOPE expr = ( orch[name] @@ -203,16 +194,6 @@ def _invoke_function( "content": unavailable_function_msg, } ] - logger.debug( - { - "message": "Unavailable action", - "action_name": name, - "action_arguments": arguments, - "action_response": unavailable_function_msg, - "timestamp": time.time(), - "call_id": call_id, - } - ) return functions, (False, None) def create( @@ -243,149 +224,148 @@ def validate_orch(orch): if not isinstance(key, str): raise ChatCompletionException( f"Orch keys must be action name (str), found {type(key)}" - ) + ) @staticmethod def wrap_chat_completion_create(original_create_method): - def new_create( - actions: List[Action] = [], - orch=None, - logger=logging.getLogger(__name__), - token_usage_tracker=None, + def wrapper_for_logging( *args, + logger: Optional[logging.Logger] = None, + logging_name: Optional[str] = None, + logging_metadata: Optional[dict] = None, + logging_level=logging.INFO, **kwargs, ): - ChatCompletion.validate_orch(orch) - - # Todo: pass call_id to the decorated method - call_id = str(uuid.uuid4()) - if token_usage_tracker is None: - token_usage_tracker = TokenUsageTracker(logger=logger) + DEFAULT_LOGGING_NAME = "actionweaver_initial_chat_completion" + + def new_create( + actions: List[Action] = [], + orch=None, + token_usage_tracker=None, + *args, + **kwargs, + ): + ChatCompletion.validate_orch(orch) + + chat_completion_create_method = original_create_method + if logger: + chat_completion_create_method = traceable( + name=(logging_name or DEFAULT_LOGGING_NAME) + + ".chat.completions.create", + logger=logger, + metadata=logging_metadata, + level=logging_level, + )(original_create_method) + + if token_usage_tracker is None: + token_usage_tracker = TokenUsageTracker() + + messages = kwargs.get("messages") + if messages is None: + raise ChatCompletionException( + "messages keyword argument is required for chat completion" + ) + model = kwargs.get("model") + if model is None: + raise ChatCompletionException( + "model keyword argument is required for chat completion" + ) - messages = kwargs.get("messages") - if messages is None: - raise ChatCompletionException( - "messages keyword argument is required for chat completion" - ) - model = kwargs.get("model") - if model is None: - raise ChatCompletionException( - "model keyword argument is required for chat completion" - ) + action_handler, orch = ChatCompletion.build_orch(actions, orch) - action_handler, orch = ChatCompletion.build_orch(actions, orch) + response = None + functions = Functions.from_expr(orch[DEFAULT_ACTION_SCOPE]) - logger.debug( - { - "message": "Creating new chat completion", - "timestamp": time.time(), - "call_id": call_id, - } - ) - - response = None - functions = Functions.from_expr(orch[DEFAULT_ACTION_SCOPE]) - - while True: - logger.debug( - { - "message": "Calling OpenAI API", - "call_id": call_id, - "timestamp": time.time(), - **functions.to_arguments(), - } - ) - - function_argument = functions.to_arguments() - if function_argument["functions"]: - api_response = original_create_method( - *args, - **kwargs, - **function_argument, - ) - else: - api_response = original_create_method( - *args, - **kwargs, - ) + while True: + function_argument = functions.to_arguments() + if function_argument["functions"]: + api_response = chat_completion_create_method( + *args, + **kwargs, + 
**function_argument,
                        )
                    else:
                        api_response = chat_completion_create_method(
                            *args,
                            **kwargs,
                        )

                    # logic to handle streaming API response
                    processed_stream_response = None
                    if isinstance(api_response, Stream):
                        processed_stream_response = (
                            ChatCompletion._handle_stream_response(api_response)
                        )

                    if isinstance(processed_stream_response, itertools.chain):
                        # if it's a chain of streamed chunks, return it right away
                        return processed_stream_response
                    else:
                        token_usage_tracker.track_usage(api_response.usage)

                    if processed_stream_response is not None:
                        functions, (
                            stop,
                            resp,
                        ) = ChatCompletion._invoke_function(
                            messages,
                            model,
                            processed_stream_response,
                            functions,
                            orch,
                            action_handler,
                        )
                        if stop:
                            return resp
                    else:
                        choice = api_response.choices[0]
                        message = choice.message

                        if message.function_call:
                            functions, (
                                stop,
                                resp,
                            ) = ChatCompletion._invoke_function(
                                messages,
                                model,
                                message.function_call,
                                functions,
                                orch,
                                action_handler,
                            )
                            if stop:
                                return resp
                        elif message.content is not None:
                            response = api_response

                            # ignore last message in the function loop
                            # messages += [{"role": "assistant", "content": message["content"]}]
                            if choice.finish_reason == "stop":
                                """
                                Stop Reasons:

                                - Occurs when the API returns a message that is complete or is concluded by one of the stop sequences defined via the 'stop' parameter.

                                See https://platform.openai.com/docs/guides/gpt/chat-completions-api for details. 
+ """ + break + else: + raise ChatCompletionException( + f"Unsupported response from OpenAI api: {api_response}" + ) + return response + + if logger: + return traceable( + name=logging_name or DEFAULT_LOGGING_NAME, + logger=logger, + metadata=logging_metadata, + level=logging_level, + )(new_create)(*args, **kwargs) + else: + return new_create(*args, **kwargs) + + return wrapper_for_logging @staticmethod def patch(client: Union[AzureOpenAI, AsyncAzureOpenAI]): diff --git a/actionweaver/llms/azure/tokens.py b/actionweaver/llms/azure/tokens.py index ea7f632..063e9ba 100644 --- a/actionweaver/llms/azure/tokens.py +++ b/actionweaver/llms/azure/tokens.py @@ -9,8 +9,7 @@ class TokenUsageTrackerException(Exception): class TokenUsageTracker: - def __init__(self, budget=None, logger=None): - self.logger = logger or logging.getLogger(__name__) + def __init__(self, budget=None): self.tracker = collections.Counter() self.budget = budget @@ -21,25 +20,7 @@ def clear(self): def track_usage(self, usage: Dict): self.tracker = self.tracker + collections.Counter(usage) - self.logger.debug( - { - "message": "token usage updated", - "usage": usage, - "total_usage": dict(self.tracker), - "timestamp": time.time(), - "budget": self.budget, - }, - ) if self.budget is not None and self.tracker["total_tokens"] > self.budget: - self.logger.error( - { - "message": "Token budget exceeded", - "usage": usage, - "total_usage": dict(self.tracker), - "budget": self.budget, - }, - exc_info=True, - ) raise TokenUsageTrackerException( f"Token budget exceeded. Budget: {self.budget}, Usage: {dict(self.tracker)}" ) diff --git a/docs/source/notebooks/cookbooks/logging.ipynb b/docs/source/notebooks/cookbooks/logging.ipynb index 7c71370..25edce8 100644 --- a/docs/source/notebooks/cookbooks/logging.ipynb +++ b/docs/source/notebooks/cookbooks/logging.ipynb @@ -12,16 +12,23 @@ }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 8, "id": "a2df6e35-6e58-4232-bfa9-9e61022059e1", "metadata": {}, "outputs": [], "source": [ - "from actionweaver.llms.openai.tools.tokens import TokenUsageTracker\n", + "import os\n", + "from openai import AzureOpenAI\n", "from actionweaver.llms import patch\n", - "from openai import OpenAI\n", + "from actionweaver.llms.azure.tokens import TokenUsageTracker\n", "\n", - "openai_client = patch(OpenAI())" + "\n", + "\n", + "client = patch(AzureOpenAI(\n", + " azure_endpoint = os.getenv(\"AZURE_OPENAI_ENDPOINT\"), \n", + " api_key=os.getenv(\"AZURE_OPENAI_KEY\"), \n", + " api_version=\"2023-10-01-preview\"\n", + "))" ] }, { @@ -32,7 +39,7 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": 9, "id": "373908f0-87cc-458a-a153-7e6fef571cf9", "metadata": {}, "outputs": [], @@ -67,7 +74,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 10, "id": "4f36d771-ece1-4596-9b4c-e5bb879e0d74", "metadata": {}, "outputs": [], @@ -106,7 +113,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 11, "id": "191eb720-a9fa-4d57-90f2-a7a2b595a056", "metadata": {}, "outputs": [ @@ -114,6 +121,7 @@ "name": "stdout", "output_type": "stream", "text": [ + "Getting current time...\n", "Getting current weather\n", "Getting current weather\n" ] @@ -121,10 +129,10 @@ { "data": { "text/plain": [ - "ChatCompletion(id='chatcmpl-8bI3vh2qNhRg6y4pgWvsM797lrB0Y', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='The current weather in San Francisco is 72°F and in Tokyo is 10°C.', role='assistant', 
function_call=None, tool_calls=None))], created=1703898259, model='gpt-3.5-turbo-0613', object='chat.completion', system_fingerprint=None, usage=CompletionUsage(completion_tokens=19, prompt_tokens=162, total_tokens=181))"
+       "ChatCompletion(id='chatcmpl-8bIPaEOsveApjS9dnOKVqRCLHNC3F', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='The current time is 20:26:42.135028. \\n\\nIn San Francisco, the weather is currently 72°F.\\n\\nIn Tokyo, the weather is currently 10°C.', role='assistant', function_call=None, tool_calls=None), content_filter_results={'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}})], created=1703899602, model='gpt-35-turbo-16k', object='chat.completion', system_fingerprint=None, usage=CompletionUsage(completion_tokens=39, prompt_tokens=240, total_tokens=279), prompt_filter_results=[{'prompt_index': 0, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}])"
      ]
     },
-     "execution_count": 37,
+     "execution_count": 11,
      "metadata": {},
      "output_type": "execute_result"
     }
    ],
@@ -132,14 +140,14 @@
    "source": [
     "messages = [\n",
     "    {\"role\": \"system\", \"content\": \"You are a helpful assistant.\"},\n",
-    "    {\"role\": \"user\", \"content\": \"what's the weather in San Francisco and Tokyo ?\"}\n",
+    "    {\"role\": \"user\", \"content\": \"what time is it and what's the weather in San Francisco and Tokyo ?\"}\n",
     "    ]\n",
     "\n",
     "\n",
-    "response = openai_client.chat.completions.create(\n",
-    "    model=\"gpt-3.5-turbo\",\n",
+    "response = client.chat.completions.create(\n",
+    "    model=\"gpt-35-turbo-0613-16k\",\n",
     "    messages=messages,\n",
-    "    actions = [get_current_weather],\n",
+    "    actions = [get_current_time, get_current_weather],\n",
     "    stream=False, \n",
     "    token_usage_tracker = TokenUsageTracker(5000),\n",
     "    logger=logger,\n",
@@ -169,7 +177,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 38,
+   "execution_count": 12,
    "id": "933fffe7-7ad7-452e-9786-1f993be3b071",
    "metadata": {},
    "outputs": [],
@@ -242,7 +250,9 @@
   "id": "1a185812-f553-435f-b43e-3f15056c7e16",
   "metadata": {},
   "outputs": [],
-   "source": []
+   "source": [
+    "# explain what initial_conversation means"
+   ]
  },
  {
   "cell_type": "code",