diff --git a/agenta-backend/agenta_backend/models/api/evaluation_model.py b/agenta-backend/agenta_backend/models/api/evaluation_model.py
index f1ace267b8..a1c3571fc6 100644
--- a/agenta-backend/agenta_backend/models/api/evaluation_model.py
+++ b/agenta-backend/agenta_backend/models/api/evaluation_model.py
@@ -69,6 +69,11 @@ class NewHumanEvaluation(BaseModel):
     status: str
 
 
+class AppOutput(BaseModel):
+    output: Any
+    status: str
+
+
 class Evaluation(BaseModel):
     id: str
     app_id: str
@@ -246,11 +251,19 @@ class EvaluationSettingsTemplate(BaseModel):
     description: str
 
 
+class LLMRunRateLimit(BaseModel):
+    batch_size: int
+    max_retries: int
+    retry_delay: int
+    delay_between_batches: int
+
+
 class NewEvaluation(BaseModel):
     app_id: str
     variant_ids: List[str]
     evaluators_configs: List[str]
     testset_id: str
+    rate_limit: LLMRunRateLimit
 
 
 class NewEvaluatorConfig(BaseModel):
diff --git a/agenta-backend/agenta_backend/routers/evaluation_router.py b/agenta-backend/agenta_backend/routers/evaluation_router.py
index caadbf6a04..fb4feb0041 100644
--- a/agenta-backend/agenta_backend/routers/evaluation_router.py
+++ b/agenta-backend/agenta_backend/routers/evaluation_router.py
@@ -69,6 +69,7 @@ async def create_evaluation(
             "variant_ids": [variant_id],  # Only this variant ID
             "evaluators_configs": payload.evaluators_configs,
             "testset_id": payload.testset_id,
+            "rate_limit": payload.rate_limit.dict(),
         }
 
         evaluation = await evaluation_service.create_new_evaluation(
@@ -78,7 +79,10 @@ async def create_evaluation(
         )
 
         evaluate.delay(
-            app_data, new_evaluation_data, evaluation.id, evaluation.testset_id
+            app_data,
+            new_evaluation_data,
+            evaluation.id,
+            evaluation.testset_id,
         )
         evaluations.append(evaluation)
 
diff --git a/agenta-backend/agenta_backend/services/llm_apps_service.py b/agenta-backend/agenta_backend/services/llm_apps_service.py
index d556f526f7..a749cbc418 100644
--- a/agenta-backend/agenta_backend/services/llm_apps_service.py
+++ b/agenta-backend/agenta_backend/services/llm_apps_service.py
@@ -1,15 +1,18 @@
-from typing import Any
+import asyncio
+import logging
+from typing import Any, List
+
+from agenta_backend.models.api.evaluation_model import AppOutput
 
 import httpx
-import backoff
 
 
-@backoff.on_exception(
-    backoff.expo,
-    (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError),
-    max_tries=2,
-)
-def get_llm_app_output(uri: str, input: Any) -> Any:
+# Set logger
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+
+async def get_llm_app_output(uri: str, input: Any) -> AppOutput:
     url = f"{uri}/generate"
 
     # TODO: adjust these hardcoded values in this payload
@@ -23,9 +26,72 @@
         "inputs": {"country": input},
     }
 
-    with httpx.Client() as client:
-        response = client.post(
+    async with httpx.AsyncClient() as client:
+        response = await client.post(
             url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5)
         )
         response.raise_for_status()
-        return response.json()
+        return AppOutput(output=response.json(), status="success")
+
+
+async def run_with_retry(
+    uri: str, input_data: Any, max_retry_count: int, retry_delay: int
+) -> AppOutput:
+    retries = 0
+    last_exception = None
+    while retries < max_retry_count:
+        try:
+            result = await get_llm_app_output(uri, input_data)
+            return result
+        except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e:
+            last_exception = e
+            print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e)
+            await asyncio.sleep(retry_delay)
+            retries += 1
+
+    # If max retries reached, return the last exception
+    return AppOutput(output=None, status=str(last_exception))
+
+
+async def batch_invoke(
+    uri: str, testset_data: List[dict], rate_limit_config: dict
+) -> List[AppOutput]:
+    batch_size = rate_limit_config[
+        "batch_size"
+    ]  # Number of testset to make in each batch
+    max_retries = rate_limit_config[
+        "max_retries"
+    ]  # Maximum number of times to retry the failed llm call
+    retry_delay = rate_limit_config[
+        "retry_delay"
+    ]  # Delay before retrying the failed llm call (in seconds)
+    delay_between_batches = rate_limit_config[
+        "delay_between_batches"
+    ]  # Delay between batches (in seconds)
+
+    list_of_app_outputs: List[AppOutput] = []  # Outputs after running all batches
+
+    async def run_batch(start_idx: int):
+        print(f"Preparing {start_idx} batch...")
+        end_idx = min(start_idx + batch_size, len(testset_data))
+        for index in range(start_idx, end_idx):
+            try:
+                batch_output: AppOutput = await run_with_retry(
+                    uri, testset_data[index], max_retries, retry_delay
+                )
+                list_of_app_outputs.append(batch_output)
+                print(f"Adding outputs to batch {start_idx}")
+            except Exception as exc:
+                logger.info(
+                    f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}"
+                )
+
+        # Schedule the next batch with a delay
+        next_batch_start_idx = end_idx
+        if next_batch_start_idx < len(testset_data):
+            await asyncio.sleep(delay_between_batches)
+            await run_batch(next_batch_start_idx)
+
+    # Start the first batch
+    await run_batch(0)
+    return list_of_app_outputs
diff --git a/agenta-backend/agenta_backend/tasks/evaluations.py b/agenta-backend/agenta_backend/tasks/evaluations.py
index acb145cb5f..28f5ca7c84 100644
--- a/agenta-backend/agenta_backend/tasks/evaluations.py
+++ b/agenta-backend/agenta_backend/tasks/evaluations.py
@@ -10,10 +10,10 @@
     fetch_app_variant_by_id,
     fetch_evaluator_config,
     get_deployment_by_objectid,
+    update_evaluation,
     fetch_testset_by_id,
     create_new_evaluation_scenario,
     fetch_evaluator_config_by_appId,
-    update_evaluation,
     update_evaluation_with_aggregated_results,
 )
 from agenta_backend.models.db_models import (
@@ -25,19 +25,21 @@
     Result,
 )
 from agenta_backend.services import evaluators_service
-from agenta_backend.models.api.evaluation_model import NewEvaluation
+from agenta_backend.models.api.evaluation_model import NewEvaluation, AppOutput
 
 
 @shared_task(queue="agenta_backend.tasks.evaluations.evaluate")
 def evaluate(
-    app_data: dict, new_evaluation_data: dict, evaluation_id: str, testset_id: str
+    app_data: dict,
+    new_evaluation_data: dict,
+    evaluation_id: str,
+    testset_id: str,
 ):
     loop = asyncio.get_event_loop()
 
+    app = AppDB(**app_data)
+    evaluation = NewEvaluation(**new_evaluation_data)
     try:
-        app = AppDB(**app_data)
-        evaluation = NewEvaluation(**new_evaluation_data)
-
         testset = loop.run_until_complete(fetch_testset_by_id(testset_id))
         new_evaluation_db = loop.run_until_complete(
             fetch_evaluation_by_id(evaluation_id)
@@ -51,8 +53,27 @@ def evaluate(
             get_deployment_by_objectid(app_variant_db.base.deployment)
         )
 
-        for data_point in testset.csvdata:
-            # 1. We prepare the inputs
+        #!NOTE: do not remove! this will be used in github workflow!
+        backend_environment = os.environ.get("ENVIRONMENT")
+        if backend_environment is not None and backend_environment == "github":
+            uri = f"http://{deployment.container_name}"
+        else:
+            uri = deployment.uri.replace(
+                "http://localhost", "http://host.docker.internal"
+            )
+
+        # 1. We get the output from the llm app
+        app_outputs: List[AppOutput] = loop.run_until_complete(
+            llm_apps_service.batch_invoke(
+                uri, testset.csvdata, evaluation.rate_limit.dict()
+            )
+        )
+        for data_point, app_output in zip(testset.csvdata, app_outputs):
+            if len(testset.csvdata) != len(app_outputs):
+                # TODO: properly handle error in the case where the length are not the same
+                break
+
+            # 2. We prepare the inputs
             raw_inputs = (
                 app_variant_db.parameters.get("inputs", [])
                 if app_variant_db.parameters
@@ -69,25 +90,6 @@ def evaluate(
                 for input_item in raw_inputs
             ]
 
-            #!NOTE: do not remove! this will be used in github workflow!
-            backend_environment = os.environ.get("ENVIRONMENT")
-            if backend_environment is not None and backend_environment == "github":
-                uri = f"http://{deployment.container_name}"
-            else:
-                uri = deployment.uri.replace(
-                    "http://localhost", "http://host.docker.internal"
-                )
-
-            # 2. We get the output from the llm app
-            try:
-                variant_output = llm_apps_service.get_llm_app_output(uri, data_point)
-            except Exception as e:
-                print(f"Error getting variant output: {e}")
-                loop.run_until_complete(
-                    update_evaluation(evaluation_id, {"status": "EVALUATION_FAILED"})
-                )
-                return
-
             # 3. We evaluate
             evaluators_results: [EvaluationScenarioResult] = []
             for evaluator_config_id in evaluation.evaluators_configs:
@@ -105,7 +107,7 @@ def evaluate(
                 )
                 result = evaluators_service.evaluate(
                     evaluator_config.evaluator_key,
-                    variant_output,
+                    app_output.output,
                     data_point["correct_answer"],
                     evaluator_config.settings_values,
                     **additional_kwargs,
@@ -120,40 +122,39 @@ def evaluate(
                     result
                 )
 
-            # 4. We create a new evaluation scenario
-            evaluation_scenario = loop.run_until_complete(
-                create_new_evaluation_scenario(
-                    user=app.user,
-                    organization=app.organization,
-                    evaluation=new_evaluation_db,
-                    variant_id=variant_id,
-                    evaluators_configs=new_evaluation_db.evaluators_configs,
-                    inputs=inputs,
-                    is_pinned=False,
-                    note="",
-                    correct_answer=data_point["correct_answer"],
-                    outputs=[
-                        EvaluationScenarioOutputDB(type="text", value=variant_output)
-                    ],
-                    results=evaluators_results,
-                )
-            )
-
-        aggregated_results = loop.run_until_complete(
-            aggregate_evaluator_results(app, evaluators_aggregated_data)
-        )
-        updated_evaluation = loop.run_until_complete(
-            update_evaluation_with_aggregated_results(
-                new_evaluation_db.id, aggregated_results
+            # 4.
We create a new evaluation scenario + evaluation_scenario = loop.run_until_complete( + create_new_evaluation_scenario( + user=app.user, + organization=app.organization, + evaluation=new_evaluation_db, + variant_id=variant_id, + evaluators_configs=new_evaluation_db.evaluators_configs, + inputs=inputs, + is_pinned=False, + note="", + correct_answer=data_point["correct_answer"], + outputs=[ + EvaluationScenarioOutputDB(type="text", value=app_output.output) + ], + results=evaluators_results, ) ) - except Exception as e: print(f"An error occurred during evaluation: {e}") loop.run_until_complete( update_evaluation(evaluation_id, {"status": "EVALUATION_FAILED"}) ) + aggregated_results = loop.run_until_complete( + aggregate_evaluator_results(app, evaluators_aggregated_data) + ) + updated_evaluation = loop.run_until_complete( + update_evaluation_with_aggregated_results( + new_evaluation_db.id, aggregated_results + ) + ) + async def aggregate_evaluator_results( app: AppDB, evaluators_aggregated_data: dict diff --git a/agenta-backend/agenta_backend/tests/variants_evaluators_router/test_evaluators_router.py b/agenta-backend/agenta_backend/tests/variants_evaluators_router/test_evaluators_router.py index b399e37fd4..c86201d146 100644 --- a/agenta-backend/agenta_backend/tests/variants_evaluators_router/test_evaluators_router.py +++ b/agenta-backend/agenta_backend/tests/variants_evaluators_router/test_evaluators_router.py @@ -161,6 +161,12 @@ async def test_create_evaluation(): "variant_ids": [str(app_variant.id)], "evaluators_configs": [], "testset_id": str(testset.id), + "rate_limit": { + "batch_size": 10, + "max_retries": 3, + "retry_delay": 3, + "delay_between_batches": 5, + }, } # Fetch evaluator configs diff --git a/agenta-web/src/components/AppSelector/modals/CreateAppStatusModal.tsx b/agenta-web/src/components/AppSelector/modals/CreateAppStatusModal.tsx index 3a7d655407..cb165d4b04 100644 --- a/agenta-web/src/components/AppSelector/modals/CreateAppStatusModal.tsx +++ b/agenta-web/src/components/AppSelector/modals/CreateAppStatusModal.tsx @@ -196,8 +196,8 @@ const CreateAppStatusModal: React.FC> type === "success" ? "success" : type === "error" - ? "danger" - : "secondary" + ? 
"danger" + : "secondary" } strong={Object.keys(messages)[ix] === "success"} > diff --git a/agenta-web/src/components/Evaluations/HumanEvaluationResult.tsx b/agenta-web/src/components/Evaluations/HumanEvaluationResult.tsx index dd4b7f1168..96a6063500 100644 --- a/agenta-web/src/components/Evaluations/HumanEvaluationResult.tsx +++ b/agenta-web/src/components/Evaluations/HumanEvaluationResult.tsx @@ -103,10 +103,12 @@ export default function HumanEvaluationResult() { } const fetchEvaluations = async () => { try { - loadEvaluations(app_id, true) + fetchData(`${getAgentaApiUrl()}/api/human-evaluations/?app_id=${app_id}`) .then((response) => { const fetchPromises = response.map((item: EvaluationResponseType) => { - return fetchEvaluationResults(item.id, true) + return fetchData( + `${getAgentaApiUrl()}/api/human-evaluations/${item.id}/results/`, + ) .then((results) => { if (item.evaluation_type === EvaluationType.human_a_b_testing) { if (Object.keys(results.votes_data).length > 0) { diff --git a/agenta-web/src/components/pages/evaluations/evaluationResults/NewEvaluationModal.tsx b/agenta-web/src/components/pages/evaluations/evaluationResults/NewEvaluationModal.tsx index 1b34e34c5a..e881685aff 100644 --- a/agenta-web/src/components/pages/evaluations/evaluationResults/NewEvaluationModal.tsx +++ b/agenta-web/src/components/pages/evaluations/evaluationResults/NewEvaluationModal.tsx @@ -1,10 +1,23 @@ import {useAppId} from "@/hooks/useAppId" -import {JSSTheme, Variant, testset} from "@/lib/Types" +import {JSSTheme, Variant, LLMRunRateLimit, testset} from "@/lib/Types" import {evaluatorConfigsAtom, evaluatorsAtom} from "@/lib/atoms/evaluation" import {fetchTestsets, fetchVariants} from "@/lib/services/api" import {CreateEvaluationData, createEvalutaiton} from "@/services/evaluations" -import {PlusOutlined} from "@ant-design/icons" -import {Divider, Form, Modal, Select, Spin, Tag, Typography} from "antd" +import {PlusOutlined, QuestionCircleOutlined} from "@ant-design/icons" +import { + Divider, + Form, + Modal, + Select, + Spin, + Tag, + Typography, + InputNumber, + Row, + Col, + Switch, + Tooltip, +} from "antd" import dayjs from "dayjs" import {useAtom} from "jotai" import Image from "next/image" @@ -61,6 +74,7 @@ const NewEvaluationModal: React.FC = ({onSuccess, ...props}) => { const [evaluatorConfigs] = useAtom(evaluatorConfigsAtom) const [evaluators] = useAtom(evaluatorsAtom) const [submitLoading, setSubmitLoading] = useState(false) + const [showRateLimitInputs, setShowRateLimitInputs] = useState(false) const [form] = Form.useForm() useEffect(() => { @@ -75,9 +89,23 @@ const NewEvaluationModal: React.FC = ({onSuccess, ...props}) => { .finally(() => setFetching(false)) }, [props.open, appId]) + const [rateLimitValues, setRateLimitValues] = useState({ + batch_size: 10, + max_retries: 3, + retry_delay: 3, + delay_between_batches: 5, + }) + const onRateLimitInputChange = (field: keyof LLMRunRateLimit, value: number) => { + setRateLimitValues((prevValues: any) => ({...prevValues, [field]: value})) + } + const onRateLimitSwitchChange = (checked: boolean) => { + setShowRateLimitInputs(checked) + } + const onSubmit = (values: CreateEvaluationData) => { setSubmitLoading(true) - createEvalutaiton(appId, values) + const EvaluationRateLimit: LLMRunRateLimit = rateLimitValues + createEvalutaiton(appId, {...values, rate_limit: EvaluationRateLimit}) .then(onSuccess) .catch(console.error) .finally(() => setSubmitLoading(false)) @@ -176,6 +204,122 @@ const NewEvaluationModal: React.FC = ({onSuccess, ...props}) => 
{ })} + + + + + {showRateLimitInputs && ( + + + + + + Batch Size  + + + + + } + name="batch_size" + style={{marginBottom: "0"}} + rules={[ + {required: true, message: "This field is required"}, + ]} + > + + onRateLimitInputChange("batch_size", value) + } + style={{width: "100%"}} + /> + + + + + Max Retries  + + + + + } + name="max_retries" + rules={[ + {required: true, message: "This field is required"}, + ]} + > + + onRateLimitInputChange("max_retries", value) + } + style={{width: "100%"}} + /> + + + + + Retry Delay  + + + + + } + style={{marginBottom: "0"}} + name="retry_delay" + rules={[ + {required: true, message: "This field is required"}, + ]} + > + + onRateLimitInputChange("retry_delay", value) + } + style={{width: "100%"}} + /> + + + + + Delay Between Batches  + + + + + } + name="delay_between_batches" + style={{marginBottom: "0"}} + rules={[ + {required: true, message: "This field is required"}, + ]} + > + + onRateLimitInputChange( + "delay_between_batches", + value, + ) + } + style={{width: "100%"}} + /> + + + + + )} diff --git a/agenta-web/src/lib/Types.ts b/agenta-web/src/lib/Types.ts index 12c424f3bf..ede9b0f05c 100644 --- a/agenta-web/src/lib/Types.ts +++ b/agenta-web/src/lib/Types.ts @@ -50,6 +50,13 @@ export interface PlaygroundTabsItem { closable: boolean } +export interface LLMRunRateLimit { + batch_size: number + max_retries: number + retry_delay: number + delay_between_batches: number +} + export interface Evaluation { id: string createdAt: string diff --git a/agenta-web/src/services/evaluations/index.ts b/agenta-web/src/services/evaluations/index.ts index c2044387fa..b2ee464db0 100644 --- a/agenta-web/src/services/evaluations/index.ts +++ b/agenta-web/src/services/evaluations/index.ts @@ -5,6 +5,8 @@ import { EvaluationStatus, Evaluator, EvaluatorConfig, + LLMRunRateLimit, + TypedValue, _Evaluation, _EvaluationScenario, } from "@/lib/Types" @@ -16,7 +18,7 @@ import regexImg from "@/media/programming.png" import webhookImg from "@/media/link.png" import aiImg from "@/media/artificial-intelligence.png" import codeImg from "@/media/browser.png" -import {calcEvalDuration} from "@/components/pages/evaluations/evaluationResults/EvaluationResults" +import dayjs from "dayjs" //Prefix convention: // - fetch: GET single entity from server @@ -71,31 +73,31 @@ export const deleteEvaluatorConfig = async (configId: string) => { } // Evaluations -const evaluationTransformer = (item: any) => { - const res = { - id: item.id, - appId: item.app_id, - created_at: item.created_at, - updated_at: item.updated_at, - status: item.status, - testset: { - id: item.testset_id, - name: item.testset_name, - }, - user: { - id: item.user_id, - username: item.user_username, - }, - variants: item.variant_ids.map((id: string, ix: number) => ({ - variantId: id, - variantName: item.variant_names[ix], - })), - aggregated_results: item.aggregated_results || [], - } - - ;(res as _Evaluation).duration = calcEvalDuration(res) - return res -} +const evaluationTransformer = (item: any) => ({ + id: item.id, + appId: item.app_id, + created_at: item.created_at, + updated_at: item.updated_at, + duration: dayjs( + [EvaluationStatus.STARTED, EvaluationStatus.INITIALIZED].includes(item.status) + ? 
Date.now()
+            : item.updated_at,
+    ).diff(dayjs(item.created_at), "milliseconds"),
+    status: item.status,
+    testset: {
+        id: item.testset_id,
+        name: item.testset_name,
+    },
+    user: {
+        id: item.user_id,
+        username: item.user_username,
+    },
+    variants: item.variant_ids.map((id: string, ix: number) => ({
+        variantId: id,
+        variantName: item.variant_names[ix],
+    })),
+    aggregated_results: item.aggregated_results || [],
+})
 
 export const fetchAllEvaluations = async (appId: string) => {
     const response = await axios.get(`/api/evaluations/`, {params: {app_id: appId}})
@@ -116,6 +118,7 @@ export type CreateEvaluationData = {
     testset_id: string
     variant_ids: string[]
     evaluators_configs: string[]
+    rate_limit: LLMRunRateLimit
 }
 export const createEvalutaiton = async (appId: string, evaluation: CreateEvaluationData) => {
     return axios.post(`/api/evaluations/`, {...evaluation, app_id: appId})
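
Note for reviewers trying the change by hand: the evaluation request body now carries a `rate_limit` object validated by the new `LLMRunRateLimit` model. A minimal sketch of creating an evaluation from a script, assuming the `/api/evaluations/` route the web app calls and placeholder base URL and IDs (the rate-limit values mirror the ones used in `test_create_evaluation`); any auth or session headers your setup requires are omitted here:

```python
import httpx

# Placeholder base URL and IDs -- substitute values from your local setup.
BASE_URL = "http://localhost/api"

payload = {
    "app_id": "<app-id>",
    "variant_ids": ["<variant-id>"],
    "evaluators_configs": [],
    "testset_id": "<testset-id>",
    # Same shape as the LLMRunRateLimit model added in evaluation_model.py
    "rate_limit": {
        "batch_size": 10,
        "max_retries": 3,
        "retry_delay": 3,
        "delay_between_batches": 5,
    },
}

response = httpx.post(f"{BASE_URL}/evaluations/", json=payload, timeout=10)
response.raise_for_status()
print(response.json())
```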
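The new batching entry point can also be exercised outside Celery. A usage sketch of `batch_invoke` as defined in `llm_apps_service.py`, assuming a reachable deployed app URI and a toy testset (both placeholders; the `country` key matches the hardcoded payload noted in the TODO of `get_llm_app_output`):

```python
import asyncio

from agenta_backend.services.llm_apps_service import batch_invoke

# Hypothetical URI of a deployed app container; replace with a real deployment URI.
APP_URI = "http://localhost:8080"

testset_data = [{"country": "France"}, {"country": "Japan"}, {"country": "Brazil"}]

rate_limit_config = {
    "batch_size": 2,  # two calls per batch
    "max_retries": 3,  # retry each failed call up to three times
    "retry_delay": 3,  # seconds between retries of the same call
    "delay_between_batches": 5,  # pause before the next batch is scheduled
}


async def main():
    outputs = await batch_invoke(APP_URI, testset_data, rate_limit_config)
    for row, out in zip(testset_data, outputs):
        print(row, "->", out.status, out.output)


asyncio.run(main())
```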
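Because `run_batch` awaits each call sequentially and only sleeps between batches, the config bounds the request rate rather than adding parallelism. A back-of-envelope sketch of the happy-path duration (no retries), under the simplifying assumption of a fixed per-call latency:

```python
import math


def estimate_duration_seconds(
    num_rows: int,
    avg_call_latency: float,
    batch_size: int,
    delay_between_batches: int,
) -> float:
    """Rough happy-path runtime: sequential calls plus the pause between batches."""
    num_batches = math.ceil(num_rows / batch_size)
    return num_rows * avg_call_latency + max(num_batches - 1, 0) * delay_between_batches


# Example: 100 testset rows, ~1.5 s per call, batches of 10 with a 5 s pause.
print(estimate_duration_seconds(100, 1.5, 10, 5))  # -> 195.0 seconds
```

Retries add up to `max_retries * retry_delay` (plus the timed-out call durations) per failing row on top of this estimate.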