Skip to content

Commit

Permalink
Merge pull request #1722 from Agenta-AI/fix-obs-pr-2
Browse files Browse the repository at this point in the history
Improvements to observability PR
  • Loading branch information
aybruhm authored May 30, 2024
2 parents 514f2dc + 623197e commit 46539a2
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 128 deletions.
2 changes: 0 additions & 2 deletions agenta-backend/agenta_backend/services/app_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ async def start_variant(
env_vars = {} if env_vars is None else env_vars
env_vars.update(
{
"AGENTA_VARIANT_NAME": db_app_variant.variant_name,
"AGENTA_VARIANT_ID": str(db_app_variant.id),
"AGENTA_BASE_ID": str(db_app_variant.base.id),
"AGENTA_APP_ID": str(db_app_variant.app.id),
"AGENTA_HOST": domain_name,
Expand Down
2 changes: 1 addition & 1 deletion agenta-cli/agenta/docker/docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

DEBUG = True
DEBUG = False


def create_dockerfile(out_folder: Path) -> Path:
Expand Down
157 changes: 81 additions & 76 deletions agenta-cli/agenta/sdk/agenta_init.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import logging
import toml
from typing import Optional

from agenta.sdk.utils.globals import set_global
Expand All @@ -12,19 +13,6 @@
logger.setLevel(logging.DEBUG)


BACKEND_URL_SUFFIX = os.environ.get("BACKEND_URL_SUFFIX", "api")
CLIENT_API_KEY = os.environ.get("AGENTA_API_KEY")
CLIENT_HOST = os.environ.get("AGENTA_HOST", "http://localhost")


# initialize the client with the backend url and api key
backend_url = f"{CLIENT_HOST}/{BACKEND_URL_SUFFIX}"
client = AgentaApi(
base_url=backend_url,
api_key=CLIENT_API_KEY if CLIENT_API_KEY else "",
)


class AgentaSingleton:
"""Singleton class to save all the "global variables" for the sdk."""

Expand Down Expand Up @@ -53,73 +41,69 @@ def init(
app_id: Optional[str] = None,
host: Optional[str] = None,
api_key: Optional[str] = None,
config_fname: Optional[str] = None,
) -> None:
"""Main function to initialize the singleton.
Initializes the singleton with the given `app_name`, `base_name`, and `host`. If any of these arguments are not provided,
the function will look for them in environment variables.
Initializes the singleton with the given `app_id`, `host`, and `api_key`. The order of precedence for these variables is:
1. Explicit argument provided in the function call.
2. Value from the configuration file specified by `config_fname`.
3. Environment variables.
Examples:
ag.init(app_id="xxxx", api_key="xxx")
ag.init(config_fname="config.toml")
ag.init() #assuming env vars are set
Args:
app_id (Optional[str]): ID of the Agenta application. Defaults to None. If not provided, will look for "AGENTA_APP_NAME" in environment variables.
host (Optional[str]): Host name of the backend server. Defaults to None. If not provided, will look for "AGENTA_HOST" in environment variables.
api_key (Optional[str]): API Key to use with the host of the backend server.
kwargs (Any): Additional keyword arguments.
app_id (Optional[str]): ID of the Agenta application. Defaults to None. If not provided, will look for "app_id" in the config file, then "AGENTA_APP_ID" in environment variables.
host (Optional[str]): Host name of the backend server. Defaults to None. If not provided, will look for "backend_host" in the config file, then "AGENTA_HOST" in environment variables.
api_key (Optional[str]): API Key to use with the host of the backend server. Defaults to None. If not provided, will look for "api_key" in the config file, then "AGENTA_API_KEY" in environment variables.
config_fname (Optional[str]): Path to the configuration file (relative or absolute). Defaults to None.
Raises:
ValueError: If `app_name`, `base_name`, or `host` are not specified either as arguments or in the environment variables.
ValueError: If `app_id` is not specified either as an argument, in the config file, or in the environment variables.
"""

app_id = app_id or os.environ.get("AGENTA_APP_ID")
if not app_id:
raise ValueError("App ID must be specified.")

base_id = os.environ.get("AGENTA_BASE_ID")
base_name = os.environ.get("AGENTA_BASE_NAME")
if base_id is None and (app_id is None or base_name is None):
config = {}
if config_fname:
config = toml.load(config_fname)

self.app_id = app_id or config.get("app_id") or os.environ.get("AGENTA_APP_ID")
self.host = (
host
or config.get("backend_host")
or os.environ.get("AGENTA_HOST", "https://cloud.agenta.ai")
)
self.api_key = (
api_key or config.get("api_key") or os.environ.get("AGENTA_API_KEY")
)

if not self.app_id:
raise ValueError(
"App ID must be specified. You can provide it in one of the following ways:\n"
"1. As an argument when calling ag.init(app_id='your_app_id').\n"
"2. In the configuration file specified by config_fname.\n"
"3. As an environment variable 'AGENTA_APP_ID'."
)
self.base_id = os.environ.get("AGENTA_BASE_ID")
if self.base_id is None:
print(
f"Warning: Your configuration will not be saved permanently since app_name and base_name are not provided."
"Warning: Your configuration will not be saved permanently since base_id is not provided."
)
else:
try:
base_id = self.get_app_base(app_id, base_name) # type: ignore
except Exception as ex:
raise APIRequestError(
f"Failed to get base id and/or app_id from the server with error: {ex}"
)

self.app_id = app_id
self.base_id = base_id
self.host = host
self.api_key = api_key or ""
self.variant_id = os.environ.get("AGENTA_VARIANT_ID")
self.variant_name = os.environ.get("AGENTA_VARIANT_NAME")
self.config = Config(base_id=self.base_id, host=self.host) # type: ignore

def get_app_base(self, app_id: str, base_name: str) -> str:
bases = client.bases.list_bases(app_id=app_id, base_name=base_name)
if len(bases) == 0:
raise APIRequestError(f"No base was found for the app {app_id}")
return bases[0].base_id

def get_current_config(self):
"""
Retrieves the current active configuration
"""

if self._config_data is None:
raise RuntimeError("AgentaSingleton has not been initialized")
return self._config_data


class Config:
def __init__(self, base_id: str, host: str):
def __init__(self, base_id: str, host: str, api_key: str = ""):
self.base_id = base_id
self.host = host

if base_id is None or host is None:
self.persist = False
else:
self.persist = True
self.client = AgentaApi(base_url=self.host + "/api", api_key=api_key)

def register_default(self, overwrite=False, **kwargs):
"""alias for default"""
Expand All @@ -138,7 +122,7 @@ def default(self, overwrite=False, **kwargs):
self.push(config_name="default", overwrite=overwrite, **kwargs)
except Exception as ex:
logger.warning(
"Unable to push the default configuration to the server." + str(ex)
"Unable to push the default configuration to the server. %s", str(ex)
)

def push(self, config_name: str, overwrite=True, **kwargs):
Expand All @@ -151,15 +135,15 @@ def push(self, config_name: str, overwrite=True, **kwargs):
if not self.persist:
return
try:
client.configs.save_config(
self.client.configs.save_config(
base_id=self.base_id,
config_name=config_name,
parameters=kwargs,
overwrite=overwrite,
)
except Exception as ex:
logger.warning(
"Failed to push the configuration to the server with error: " + str(ex)
"Failed to push the configuration to the server with error: %s", ex
)

def pull(
Expand All @@ -169,38 +153,46 @@ def pull(
if not self.persist and (
config_name != "default" or environment_name is not None
):
raise Exception(
raise ValueError(
"Cannot pull the configuration from the server since the app_name and base_name are not provided."
)
if self.persist:
try:
if environment_name:
config = client.configs.get_config(
config = self.client.configs.get_config(
base_id=self.base_id, environment_name=environment_name
)

else:
config = client.configs.get_config(
config = self.client.configs.get_config(
base_id=self.base_id,
config_name=config_name,
)
except Exception as ex:
logger.warning(
"Failed to pull the configuration from the server with error: "
+ str(ex)
"Failed to pull the configuration from the server with error: %s",
str(ex),
)
try:
self.set(**{"current_version": config.current_version, **config.parameters})
except Exception as ex:
logger.warning("Failed to set the configuration with error: " + str(ex))
logger.warning("Failed to set the configuration with error: %s", str(ex))

def all(self):
"""Returns all the parameters for the app variant"""
return {
k: v
for k, v in self.__dict__.items()
if k
not in ["app_name", "base_name", "host", "base_id", "api_key", "persist"]
not in [
"app_name",
"base_name",
"host",
"base_id",
"api_key",
"persist",
"client",
]
}

# function to set the parameters for the app variant
Expand All @@ -227,25 +219,38 @@ def init(
app_id: Optional[str] = None,
host: Optional[str] = None,
api_key: Optional[str] = None,
config_fname: Optional[str] = None,
max_workers: Optional[int] = None,
):
"""Main function to be called by the user to initialize the sdk.
"""Main function to initialize the agenta sdk.
Initializes agenta with the given `app_id`, `host`, and `api_key`. The order of precedence for these variables is:
1. Explicit argument provided in the function call.
2. Value from the configuration file specified by `config_fname`.
3. Environment variables.
- `app_id` is a required parameter (to be specified in one of the above ways)
- `host` is optional and defaults to "https://cloud.agenta.ai"
- `api_key` is optional and defaults to "". It is required only when using cloud or enterprise version of agenta.
Args:
app_id (str): The Id of the app.
host (str): The host of the backend server.
api_key (str): The API key to use for the backend server.
app_id (Optional[str]): ID of the Agenta application. Defaults to None. If not provided, will look for "app_id" in the config file, then "AGENTA_APP_ID" in environment variables.
host (Optional[str]): Host name of the backend server. Defaults to None. If not provided, will look for "backend_host" in the config file, then "AGENTA_HOST" in environment variables.
api_key (Optional[str]): API Key to use with the host of the backend server. Defaults to None. If not provided, will look for "api_key" in the config file, then "AGENTA_API_KEY" in environment variables.
config_fname (Optional[str]): Path to the configuration file. Defaults to None.
Raises:
ValueError: If `app_id` is not specified either as an argument, in the config file, or in the environment variables.
"""

singleton = AgentaSingleton()

singleton.init(app_id=app_id, host=host, api_key=api_key)
singleton.init(app_id=app_id, host=host, api_key=api_key, config_fname=config_fname)
tracing = Tracing(
host=singleton.host, # type: ignore
app_id=singleton.app_id, # type: ignore
variant_id=singleton.variant_id, # type: ignore
variant_name=singleton.variant_name,
api_key=api_key,
api_key=singleton.api_key,
max_workers=max_workers,
)
set_global(setup=singleton.setup, config=singleton.config, tracing=tracing)
28 changes: 12 additions & 16 deletions agenta-cli/agenta/sdk/decorators/llm_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,18 @@
class entrypoint(BaseDecorator):
"""Decorator class to wrap a function for HTTP POST, terminal exposure and enable tracing.
Args:
BaseDecorator (object): base decorator class
Example:
```python
import agenta as ag
@ag.entrypoint(enable_tracing=True) # Defaults to False
@ag.entrypoint
async def chain_of_prompts_llm(prompt: str):
return ...
```
"""

def __call__(self, func: Callable[..., Any]):
def __init__(self, func: Callable[..., Any]):
endpoint_name = "generate"
func_signature = inspect.signature(func)
config_params = agenta.config.all()
Expand Down Expand Up @@ -134,13 +132,12 @@ async def wrapper_deployed(*args, **kwargs) -> Any:
)

if self.is_main_script(func):
result = self.handle_terminal_run(
self.handle_terminal_run(
func,
func_signature.parameters, # type: ignore
config_params,
ingestible_files,
)
return result

def extract_ingestible_files(
self,
Expand Down Expand Up @@ -205,9 +202,16 @@ async def execute_function(self, func: Callable[..., Any], *args, **func_params)
return FuncResponse(**result, latency=round(latency, 4))
if isinstance(result, str):
return FuncResponse(message=result, latency=round(latency, 4)) # type: ignore
if isinstance(result, int) or isinstance(result, float):
return FuncResponse(message=str(result), latency=round(latency, 4))
if result is None:
return FuncResponse(
message="Function executed successfully, but did return None. \n Are you sure you did not forget to return a value?",
latency=round(latency, 4),
)
except Exception as e:
self.handle_exception(e)
return FuncResponse(message="Unexpected error occurred", latency=0) # type: ignore
return FuncResponse(message="Unexpected error occurred when calling the @entrypoing decorated function", latency=0) # type: ignore

def handle_exception(self, e: Exception):
"""Handle exceptions."""
Expand Down Expand Up @@ -322,14 +326,7 @@ def is_main_script(self, func: Callable) -> bool:
if is_main_script(my_function):
print("This is the main script.")
"""

# the function that gets passed to entrypoint will always be called from sdk/decorators/tracing.py
return os.path.splitext(os.path.basename(inspect.getfile(func)))[
0
] == "tracing" and os.path.splitext(os.path.basename(sys.argv[0]))[0] not in [
"bootstrap",
"main",
] # ensures that the script is called not from mangum or docker
return func.__module__ == "__main__"

def handle_terminal_run(
self,
Expand Down Expand Up @@ -401,7 +398,6 @@ def handle_terminal_run(
print(
f"\n========== Result ==========\n\nMessage: {result.message}\nCost: {result.cost}\nToken Usage: {result.usage}"
)
return result

def override_schema(
self, openapi_schema: dict, func_name: str, endpoint: str, params: dict
Expand Down
12 changes: 10 additions & 2 deletions agenta-cli/agenta/sdk/decorators/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ def __call__(self, func: Callable[..., Any]):
@wraps(func)
async def async_wrapper(*args, **kwargs):
result = None
func_args = inspect.getfullargspec(func).args
input_dict = {name: value for name, value in zip(func_args, args)}
input_dict.update(kwargs)

span = self.tracing.start_span(
name=func.__name__,
input=kwargs,
input=input_dict,
spankind=self.spankind,
config=self.config,
)
Expand All @@ -67,9 +71,13 @@ async def async_wrapper(*args, **kwargs):
@wraps(func)
def sync_wrapper(*args, **kwargs):
result = None
func_args = inspect.getfullargspec(func).args
input_dict = {name: value for name, value in zip(func_args, args)}
input_dict.update(kwargs)

span = self.tracing.start_span(
name=func.__name__,
input=kwargs,
input=input_dict,
spankind=self.spankind,
config=self.config,
)
Expand Down
Loading

0 comments on commit 46539a2

Please sign in to comment.