Added from_url method to fetch evaluation annotation from the api. #1795

Open · wants to merge 6 commits into base: main

114 changes: 102 additions & 12 deletions src/ragas/dataset_schema.py
@@ -9,15 +9,25 @@
from uuid import UUID

import numpy as np
import requests
from datasets import Dataset as HFDataset
from pydantic import BaseModel, field_validator

from ragas.callbacks import ChainRunEncoder, parse_run_traces
from ragas.cost import CostCallbackHandler
from ragas.exceptions import UploadException
from ragas.messages import AIMessage, HumanMessage, ToolCall, ToolMessage
from ragas.sdk import RAGAS_API_URL, RAGAS_APP_URL, upload_packet
from ragas.sdk import (
upload_packet,
RAGAS_API_SOURCE,
get_app_token,
check_api_response,
build_evaluation_app_url,
get_api_url,
get_app_url,
)
from ragas.utils import safe_nanmean
from ragas._version import __version__

if t.TYPE_CHECKING:
from pathlib import Path
@@ -508,7 +518,10 @@ def total_cost(
cost_per_input_token, cost_per_output_token, per_model_costs
)

def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
def upload(
self,
verbose: bool = True,
) -> str:
from datetime import datetime, timezone

timestamp = datetime.now(timezone.utc).isoformat()
@@ -526,18 +539,16 @@ def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
response = upload_packet(
path="/alignment/evaluation",
data_json_string=packet,
base_url=base_url,
)

# check status codes
evaluation_endpoint = (
f"{RAGAS_APP_URL}/dashboard/alignment/evaluation/{root_trace.run_id}"
)
app_url = get_app_url()
evaluation_app_url = build_evaluation_app_url(app_url, root_trace.run_id)
if response.status_code == 409:
            # this evaluation already exists
if verbose:
print(f"Evaluation run already exists. View at {evaluation_endpoint}")
return evaluation_endpoint
print(f"Evaluation run already exists. View at {evaluation_app_url}")
return evaluation_app_url
elif response.status_code != 200:
# any other error
raise UploadException(
Expand All @@ -546,8 +557,8 @@ def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
)

if verbose:
print(f"Evaluation results uploaded! View at {evaluation_endpoint}")
return evaluation_endpoint
print(f"Evaluation results uploaded! View at {evaluation_app_url}")
return evaluation_app_url


class PromptAnnotation(BaseModel):
@@ -577,8 +588,24 @@ def __getitem__(self, key):
return SingleMetricAnnotation(name=key, samples=self.root[key])

@classmethod
def from_json(cls, path, metric_name: t.Optional[str]) -> "MetricAnnotation":
dataset = json.load(open(path))
def _process_dataset(
cls, dataset: dict, metric_name: t.Optional[str]
) -> "MetricAnnotation":
"""
Process raw dataset into MetricAnnotation format

Parameters
----------
dataset : dict
Raw dataset to process
metric_name : str, optional
Name of the specific metric to filter

Returns
-------
MetricAnnotation
Processed annotation data
"""
if metric_name is not None and metric_name not in dataset:
raise ValueError(f"Split {metric_name} not found in the dataset.")

@@ -590,6 +617,69 @@ def from_json(cls, path, metric_name: t.Optional[str]) -> "MetricAnnotation":
}
)

@classmethod
def from_json(cls, path: str, metric_name: t.Optional[str]) -> "MetricAnnotation":
"""Load annotations from a JSON file"""
        with open(path) as f:
            dataset = json.load(f)
return cls._process_dataset(dataset, metric_name)

@classmethod
def from_app(
cls,
        run_id: t.Optional[str] = None,
metric_name: t.Optional[str] = None,
) -> "MetricAnnotation":
"""
        Fetch annotations for an evaluation run from the Ragas platform using its run ID

Parameters
----------
run_id : str
            Run ID of the evaluation to fetch annotations for
metric_name : str, optional
Name of the specific metric to filter

Returns
-------
MetricAnnotation
Annotation data from the API

Raises
------
ValueError
If run_id is not provided
"""
if run_id is None:
raise ValueError("run_id must be provided")

endpoint = f"/api/v1/alignment/evaluation/annotation/{run_id}"

app_token = get_app_token()
base_url = get_api_url()
app_url = get_app_url()

response = requests.get(
f"{base_url}{endpoint}",
headers={
"Content-Type": "application/json",
"x-app-token": app_token,
"x-source": RAGAS_API_SOURCE,
"x-app-version": __version__,
},
)

check_api_response(response)
dataset = response.json()["data"]

if not dataset:
evaluation_url = build_evaluation_app_url(app_url, run_id)
raise ValueError(
f"No annotations found. Please annotate the Evaluation first then run this method. "
f"\nNote: you can annotate the evaluations using the Ragas app by going to {evaluation_url}"
)

return cls._process_dataset(dataset, metric_name)

def __len__(self):
return sum(len(value) for value in self.root.values())

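A minimal usage sketch of the new MetricAnnotation.from_app classmethod above. The token, run ID, and metric name below are placeholders; the real prerequisite is that RAGAS_APP_TOKEN is set before the call, since get_app_token() reads it from the environment:

import os

from ragas.dataset_schema import MetricAnnotation

os.environ["RAGAS_APP_TOKEN"] = "your-app-token"  # placeholder token

# "my-run-id" is a hypothetical run ID taken from the dashboard URL of an
# annotated evaluation; metric_name optionally filters to a single metric.
annotation = MetricAnnotation.from_app(
    run_id="my-run-id",
    metric_name="answer_correctness",
)
print(len(annotation))  # total annotated samples across loaded metrics

If the run has no annotations yet, from_app raises a ValueError pointing at the evaluation page in the app rather than returning an empty dataset.
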
65 changes: 58 additions & 7 deletions src/ragas/metrics/base.py
@@ -14,7 +14,12 @@

from ragas._analytics import EvaluationEvent, _analytics_batcher
from ragas.callbacks import ChainType, new_group
from ragas.dataset_schema import MetricAnnotation, MultiTurnSample, SingleTurnSample
from ragas.dataset_schema import (
MetricAnnotation,
MultiTurnSample,
SingleTurnSample,
EvaluationResult,
)
from ragas.executor import is_event_loop_running
from ragas.losses import BinaryMetricLoss, MSELoss
from ragas.prompt import FewShotPydanticPrompt, PromptMixin
@@ -350,7 +355,8 @@ def _optimize_demonstration(

def train(
self,
path: str,
path: t.Optional[str] = None,
run_id: t.Optional[str] = None,
demonstration_config: t.Optional[DemonstrationConfig] = None,
instruction_config: t.Optional[InstructionConfig] = None,
callbacks: t.Optional[Callbacks] = None,
@@ -359,13 +365,59 @@ def train(
with_debugging_logs=False,
raise_exceptions: bool = True,
) -> None:
"""
        Train the metric using local JSON data or annotations from the Ragas platform

Parameters
----------
path : str, optional
Path to local JSON training data file
run_id : str, optional
            Run ID used to fetch annotations from the Ragas platform
demonstration_config : DemonstrationConfig, optional
Configuration for demonstration optimization
instruction_config : InstructionConfig, optional
Configuration for instruction optimization
callbacks : Callbacks, optional
List of callback functions
run_config : RunConfig, optional
Run configuration
batch_size : int, optional
Batch size for training
with_debugging_logs : bool, default=False
Enable debugging logs
raise_exceptions : bool, default=True
Whether to raise exceptions during training

Raises
------
ValueError
            If an invalid combination of path and run_id is provided
"""
# Validate input parameters
provided_inputs = sum(x is not None for x in [path, run_id])
if provided_inputs == 0:
raise ValueError(
"One of path or run_id must be provided"
)
if provided_inputs > 1:
raise ValueError(
"Only one of path or run_id should be provided"
)

run_config = run_config or RunConfig()
callbacks = callbacks or []

# load the dataset from path
if not path.endswith(".json"):
raise ValueError("Train data must be in json format")
dataset = MetricAnnotation.from_json(path, metric_name=self.name)
# Load the dataset based on input type
if path is not None:
if not path.endswith(".json"):
raise ValueError("Train data must be in json format")
dataset = MetricAnnotation.from_json(path, metric_name=self.name)
else:
dataset = MetricAnnotation.from_app(
run_id=run_id,
metric_name=self.name,
)

# only optimize the instruction if instruction_config is provided
if instruction_config is not None:
@@ -386,7 +438,6 @@
dataset=dataset,
)


@dataclass
class MetricWithEmbeddings(Metric):
embeddings: t.Optional[BaseRagasEmbeddings] = None
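A hedged sketch of the two mutually exclusive ways train() can now be fed; answer_correctness stands in here for any trainable metric, and in practice you would also pass an instruction_config or demonstration_config so the optimizers have something to do:

from ragas.metrics import answer_correctness

# Option 1: train from a local JSON annotation export.
answer_correctness.train(path="annotations.json")

# Option 2: fetch annotations for a platform run instead (hypothetical ID).
answer_correctness.train(run_id="my-run-id")

# Supplying both path and run_id, or neither, raises ValueError,
# per the validation at the top of train().
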
54 changes: 52 additions & 2 deletions src/ragas/sdk.py
@@ -24,8 +24,20 @@ def get_app_token() -> str:
return app_token


def upload_packet(path: str, data_json_string: str, base_url: str = RAGAS_API_URL):
@lru_cache(maxsize=1)
def get_api_url() -> str:
return os.environ.get("RAGAS_API_URL", RAGAS_API_URL)


@lru_cache(maxsize=1)
def get_app_url() -> str:
return os.environ.get("RAGAS_APP_URL", RAGAS_APP_URL)


def upload_packet(path: str, data_json_string: str):
app_token = get_app_token()
base_url = get_api_url()

response = requests.post(
f"{base_url}/api/v1{path}",
data=data_json_string,
@@ -36,9 +48,47 @@ def upload_packet(path: str, data_json_string: str, base_url: str = RAGAS_API_URL):
"x-app-version": __version__,
},
)
check_api_response(response)
return response


def check_api_response(response: requests.Response) -> None:
"""
Check API response status and raise appropriate exceptions

Parameters
----------
response : requests.Response
Response object from API request

Raises
------
UploadException
If authentication fails or other API errors occur
"""
if response.status_code == 403:
raise UploadException(
status_code=response.status_code,
message="AUTHENTICATION_ERROR: The app token is invalid. Please check your RAGAS_APP_TOKEN environment variable.",
)
return response

try:
response.raise_for_status()
except requests.exceptions.HTTPError:
error_msg = ""
try:
error_data = response.json()
if "message" in error_data:
error_msg += f"\nAPI Message: {error_data['message']}"
if "debug_error_info" in error_data:
error_msg += f"\nDebug Info: {error_data['debug_error_info']}"
        except Exception:
error_msg = f"\nStatus Code: {response.status_code}"

raise UploadException(
status_code=response.status_code, message=f"Request failed: {error_msg}"
)


def build_evaluation_app_url(app_url: str, run_id: str) -> str:
return f"{app_url}/dashboard/alignment/evaluation/{run_id}"
9 changes: 5 additions & 4 deletions src/ragas/testset/synthesizers/testset_schema.py
@@ -16,7 +16,7 @@
SingleTurnSample,
)
from ragas.exceptions import UploadException
from ragas.sdk import RAGAS_API_URL, RAGAS_APP_URL, upload_packet
from ragas.sdk import upload_packet, get_app_url


class TestsetSample(BaseSample):
@@ -136,14 +136,15 @@ def total_cost(
cost_per_output_token=cost_per_output_token,
)

def upload(self, base_url: str = RAGAS_API_URL, verbose: bool = True) -> str:
def upload(self, verbose: bool = True) -> str:
packet = TestsetPacket(samples_original=self.samples, run_id=self.run_id)
response = upload_packet(
path="/alignment/testset",
data_json_string=packet.model_dump_json(),
base_url=base_url,
)
testset_endpoint = f"{RAGAS_APP_URL}/dashboard/alignment/testset/{self.run_id}"
app_url = get_app_url()

testset_endpoint = f"{app_url}/dashboard/alignment/testset/{self.run_id}"
if response.status_code == 409:
# this testset already exists
if verbose:
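Finally, a sketch of the simplified Testset.upload call: the base_url parameter is gone and both hosts are resolved via get_api_url()/get_app_url(). The testset construction is elided; assume testset is a ragas Testset produced elsewhere:

# upload() prints the dashboard link when verbose=True and also returns it.
url = testset.upload()
print(url)  # <app_url>/dashboard/alignment/testset/<run_id>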