Merge pull request #1148 from Agenta-AI/evaluation-rate-limiting
Enhancement: RPM/TPM Rate Limiting for Evaluation
aakrem authored Jan 4, 2024
2 parents 29fa6da + 1584608 commit 3dfdf04
Showing 10 changed files with 346 additions and 100 deletions.
13 changes: 13 additions & 0 deletions agenta-backend/agenta_backend/models/api/evaluation_model.py
@@ -69,6 +69,11 @@ class NewHumanEvaluation(BaseModel):
status: str


class AppOutput(BaseModel):
output: Any
status: str


class Evaluation(BaseModel):
id: str
app_id: str
@@ -246,11 +251,19 @@ class EvaluationSettingsTemplate(BaseModel):
description: str


class LLMRunRateLimit(BaseModel):
batch_size: int
max_retries: int
retry_delay: int
delay_between_batches: int


class NewEvaluation(BaseModel):
app_id: str
variant_ids: List[str]
evaluators_configs: List[str]
testset_id: str
rate_limit: LLMRunRateLimit


class NewEvaluatorConfig(BaseModel):
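For reference, a request body exercising the new rate_limit field might look like the following sketch. The IDs are placeholders and the numeric values mirror the backend test further down; nothing here is a default defined by this commit.

# Illustrative payload for creating an evaluation with rate limiting.
new_evaluation_payload = {
    "app_id": "<app-id>",                 # placeholder
    "variant_ids": ["<variant-id>"],      # placeholder
    "evaluators_configs": ["<evaluator-config-id>"],
    "testset_id": "<testset-id>",
    "rate_limit": {
        "batch_size": 10,                 # LLM calls issued per batch
        "max_retries": 3,                 # retries for a failed call
        "retry_delay": 3,                 # seconds to wait before each retry
        "delay_between_batches": 5,       # seconds to wait between batches
    },
}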
6 changes: 5 additions & 1 deletion agenta-backend/agenta_backend/routers/evaluation_router.py
@@ -69,6 +69,7 @@ async def create_evaluation(
"variant_ids": [variant_id], # Only this variant ID
"evaluators_configs": payload.evaluators_configs,
"testset_id": payload.testset_id,
"rate_limit": payload.rate_limit.dict(),
}

evaluation = await evaluation_service.create_new_evaluation(
@@ -78,7 +79,10 @@
)

evaluate.delay(
app_data, new_evaluation_data, evaluation.id, evaluation.testset_id
app_data,
new_evaluation_data,
evaluation.id,
evaluation.testset_id,
)
evaluations.append(evaluation)

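A note on the serialization above: payload.rate_limit.dict() turns the Pydantic model into a plain dict so the Celery task arguments stay JSON-serializable, and the worker rebuilds the models from those dicts. A minimal round-trip sketch, assuming the Pydantic v1 .dict() API used elsewhere in this diff:

from agenta_backend.models.api.evaluation_model import LLMRunRateLimit

limit = LLMRunRateLimit(
    batch_size=10, max_retries=3, retry_delay=3, delay_between_batches=5
)
as_dict = limit.dict()                  # JSON-safe dict handed to Celery
restored = LLMRunRateLimit(**as_dict)   # reconstructed on the worker side
assert restored == limit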
88 changes: 77 additions & 11 deletions agenta-backend/agenta_backend/services/llm_apps_service.py
@@ -1,15 +1,18 @@
from typing import Any
import asyncio
import logging
from typing import Any, List

from agenta_backend.models.api.evaluation_model import AppOutput

import httpx
import backoff


@backoff.on_exception(
backoff.expo,
(httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError),
max_tries=2,
)
def get_llm_app_output(uri: str, input: Any) -> Any:
# Set logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


async def get_llm_app_output(uri: str, input: Any) -> AppOutput:
url = f"{uri}/generate"

# TODO: adjust these hardcoded values in this payload
@@ -23,9 +26,72 @@ def get_llm_app_output(uri: str, input: Any) -> Any:
"inputs": {"country": input},
}

with httpx.Client() as client:
response = client.post(
async with httpx.AsyncClient() as client:
response = await client.post(
url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5)
)
response.raise_for_status()
return response.json()
return AppOutput(output=response.json(), status="success")


async def run_with_retry(
uri: str, input_data: Any, max_retry_count: int, retry_delay: int
) -> AppOutput:
retries = 0
last_exception = None
while retries < max_retry_count:
try:
result = await get_llm_app_output(uri, input_data)
return result
except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e:
last_exception = e
print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e)
await asyncio.sleep(retry_delay)
retries += 1

# If max retries reached, return the last exception
return AppOutput(output=None, status=str(last_exception))


async def batch_invoke(
uri: str, testset_data: List[dict], rate_limit_config: dict
) -> List[AppOutput]:
batch_size = rate_limit_config[
"batch_size"
]  # Number of testset entries to process in each batch
max_retries = rate_limit_config[
"max_retries"
] # Maximum number of times to retry the failed llm call
retry_delay = rate_limit_config[
"retry_delay"
] # Delay before retrying the failed llm call (in seconds)
delay_between_batches = rate_limit_config[
"delay_between_batches"
] # Delay between batches (in seconds)

list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches

async def run_batch(start_idx: int):
print(f"Preparing {start_idx} batch...")
end_idx = min(start_idx + batch_size, len(testset_data))
for index in range(start_idx, end_idx):
try:
batch_output: AppOutput = await run_with_retry(
uri, testset_data[index], max_retries, retry_delay
)
list_of_app_outputs.append(batch_output)
print(f"Adding outputs to batch {start_idx}")
except Exception as exc:
logger.info(
f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}"
)

# Schedule the next batch with a delay
next_batch_start_idx = end_idx
if next_batch_start_idx < len(testset_data):
await asyncio.sleep(delay_between_batches)
await run_batch(next_batch_start_idx)

# Start the first batch
await run_batch(0)
return list_of_app_outputs
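
A standalone sketch of driving the new batch_invoke helper outside of Celery. The URI and testset rows are placeholders; note that failed rows come back as AppOutput objects whose status holds the exception text instead of "success".

import asyncio

from agenta_backend.services import llm_apps_service

rate_limit_config = {
    "batch_size": 10,            # rows processed per batch
    "max_retries": 3,            # retries per failed LLM call
    "retry_delay": 3,            # seconds between retries
    "delay_between_batches": 5,  # seconds between batches
}

testset_rows = [{"country": "France"}, {"country": "Japan"}]  # placeholder rows

app_outputs = asyncio.run(
    llm_apps_service.batch_invoke(
        "http://localhost:8001", testset_rows, rate_limit_config  # placeholder URI
    )
)
for row, app_output in zip(testset_rows, app_outputs):
    if app_output.status == "success":
        print(row, "->", app_output.output)
    else:
        print(row, "failed:", app_output.status)
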
109 changes: 55 additions & 54 deletions agenta-backend/agenta_backend/tasks/evaluations.py
@@ -10,10 +10,10 @@
fetch_app_variant_by_id,
fetch_evaluator_config,
get_deployment_by_objectid,
update_evaluation,
fetch_testset_by_id,
create_new_evaluation_scenario,
fetch_evaluator_config_by_appId,
update_evaluation,
update_evaluation_with_aggregated_results,
)
from agenta_backend.models.db_models import (
@@ -25,19 +25,21 @@
Result,
)
from agenta_backend.services import evaluators_service
from agenta_backend.models.api.evaluation_model import NewEvaluation
from agenta_backend.models.api.evaluation_model import NewEvaluation, AppOutput


@shared_task(queue="agenta_backend.tasks.evaluations.evaluate")
def evaluate(
app_data: dict, new_evaluation_data: dict, evaluation_id: str, testset_id: str
app_data: dict,
new_evaluation_data: dict,
evaluation_id: str,
testset_id: str,
):
loop = asyncio.get_event_loop()
app = AppDB(**app_data)
evaluation = NewEvaluation(**new_evaluation_data)

try:
app = AppDB(**app_data)
evaluation = NewEvaluation(**new_evaluation_data)

testset = loop.run_until_complete(fetch_testset_by_id(testset_id))
new_evaluation_db = loop.run_until_complete(
fetch_evaluation_by_id(evaluation_id)
@@ -51,8 +53,27 @@ def evaluate(
get_deployment_by_objectid(app_variant_db.base.deployment)
)

for data_point in testset.csvdata:
# 1. We prepare the inputs
#!NOTE: do not remove! this will be used in github workflow!
backend_environment = os.environ.get("ENVIRONMENT")
if backend_environment is not None and backend_environment == "github":
uri = f"http://{deployment.container_name}"
else:
uri = deployment.uri.replace(
"http://localhost", "http://host.docker.internal"
)

# 1. We get the output from the llm app
app_outputs: List[AppOutput] = loop.run_until_complete(
llm_apps_service.batch_invoke(
uri, testset.csvdata, evaluation.rate_limit.dict()
)
)
for data_point, app_output in zip(testset.csvdata, app_outputs):
if len(testset.csvdata) != len(app_outputs):
# TODO: properly handle error in the case where the length are not the same
break

# 2. We prepare the inputs
raw_inputs = (
app_variant_db.parameters.get("inputs", [])
if app_variant_db.parameters
@@ -69,25 +90,6 @@ def evaluate(
for input_item in raw_inputs
]

#!NOTE: do not remove! this will be used in github workflow!
backend_environment = os.environ.get("ENVIRONMENT")
if backend_environment is not None and backend_environment == "github":
uri = f"http://{deployment.container_name}"
else:
uri = deployment.uri.replace(
"http://localhost", "http://host.docker.internal"
)

# 2. We get the output from the llm app
try:
variant_output = llm_apps_service.get_llm_app_output(uri, data_point)
except Exception as e:
print(f"Error getting variant output: {e}")
loop.run_until_complete(
update_evaluation(evaluation_id, {"status": "EVALUATION_FAILED"})
)
return

# 3. We evaluate
evaluators_results: [EvaluationScenarioResult] = []
for evaluator_config_id in evaluation.evaluators_configs:
@@ -105,7 +107,7 @@ def evaluate(
)
result = evaluators_service.evaluate(
evaluator_config.evaluator_key,
variant_output,
app_output.output,
data_point["correct_answer"],
evaluator_config.settings_values,
**additional_kwargs,
@@ -120,40 +122,39 @@ def evaluate(
result
)

# 4. We create a new evaluation scenario
evaluation_scenario = loop.run_until_complete(
create_new_evaluation_scenario(
user=app.user,
organization=app.organization,
evaluation=new_evaluation_db,
variant_id=variant_id,
evaluators_configs=new_evaluation_db.evaluators_configs,
inputs=inputs,
is_pinned=False,
note="",
correct_answer=data_point["correct_answer"],
outputs=[
EvaluationScenarioOutputDB(type="text", value=variant_output)
],
results=evaluators_results,
)
)

aggregated_results = loop.run_until_complete(
aggregate_evaluator_results(app, evaluators_aggregated_data)
)
updated_evaluation = loop.run_until_complete(
update_evaluation_with_aggregated_results(
new_evaluation_db.id, aggregated_results
# 4. We create a new evaluation scenario
evaluation_scenario = loop.run_until_complete(
create_new_evaluation_scenario(
user=app.user,
organization=app.organization,
evaluation=new_evaluation_db,
variant_id=variant_id,
evaluators_configs=new_evaluation_db.evaluators_configs,
inputs=inputs,
is_pinned=False,
note="",
correct_answer=data_point["correct_answer"],
outputs=[
EvaluationScenarioOutputDB(type="text", value=app_output.output)
],
results=evaluators_results,
)
)

except Exception as e:
print(f"An error occurred during evaluation: {e}")
loop.run_until_complete(
update_evaluation(evaluation_id, {"status": "EVALUATION_FAILED"})
)

aggregated_results = loop.run_until_complete(
aggregate_evaluator_results(app, evaluators_aggregated_data)
)
updated_evaluation = loop.run_until_complete(
update_evaluation_with_aggregated_results(
new_evaluation_db.id, aggregated_results
)
)


async def aggregate_evaluator_results(
app: AppDB, evaluators_aggregated_data: dict
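Because batch_invoke reports failures through AppOutput.status instead of raising, the loop above receives one output object per testset row. A caller that wants to skip failed rows before scoring could do so along these lines; the skip logic is a suggestion for illustration, not part of this commit:

from agenta_backend.models.api.evaluation_model import AppOutput

# Placeholder pairing of testset rows with pre-fetched outputs.
testset_rows = [
    {"country": "France", "correct_answer": "Paris"},
    {"country": "Japan", "correct_answer": "Tokyo"},
]
app_outputs = [
    AppOutput(output="Paris", status="success"),
    AppOutput(output=None, status="ConnectTimeout('timed out')"),
]

for data_point, app_output in zip(testset_rows, app_outputs):
    if app_output.status != "success":
        # Failed rows carry the exception text in `status`; skip or log them.
        continue
    print(app_output.output == data_point["correct_answer"])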
@@ -161,6 +161,12 @@ async def test_create_evaluation():
"variant_ids": [str(app_variant.id)],
"evaluators_configs": [],
"testset_id": str(testset.id),
"rate_limit": {
"batch_size": 10,
"max_retries": 3,
"retry_delay": 3,
"delay_between_batches": 5,
},
}

# Fetch evaluator configs
@@ -196,8 +196,8 @@ const CreateAppStatusModal: React.FC<Props & React.ComponentProps<typeof Modal>>
type === "success"
? "success"
: type === "error"
? "danger"
: "secondary"
? "danger"
: "secondary"
}
strong={Object.keys(messages)[ix] === "success"}
>
@@ -103,10 +103,12 @@ export default function HumanEvaluationResult() {
}
const fetchEvaluations = async () => {
try {
loadEvaluations(app_id, true)
fetchData(`${getAgentaApiUrl()}/api/human-evaluations/?app_id=${app_id}`)
.then((response) => {
const fetchPromises = response.map((item: EvaluationResponseType) => {
return fetchEvaluationResults(item.id, true)
return fetchData(
`${getAgentaApiUrl()}/api/human-evaluations/${item.id}/results/`,
)
.then((results) => {
if (item.evaluation_type === EvaluationType.human_a_b_testing) {
if (Object.keys(results.votes_data).length > 0) {