diff --git a/python/langsmith/_expect.py b/python/langsmith/_expect.py index fcdeeebee..db914c31e 100644 --- a/python/langsmith/_expect.py +++ b/python/langsmith/_expect.py @@ -356,7 +356,7 @@ def score( key: The key to use for logging the score. Defaults to "score". Examples: - >>> expect.score(0.8) # doctest: +ELLIPSIS + >>> expect.score(0.8) # doctest: +ELLIPSIS >>> expect.score(0.8, key="similarity").to_be_greater_than(0.7) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 186ca9990..41aeca557 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3439,6 +3439,8 @@ def create_feedback( feedback_config: Optional[ls_schemas.FeedbackConfig] = None, stop_after_attempt: int = 10, project_id: Optional[ID_TYPE] = None, + comparative_experiment_id: Optional[ID_TYPE] = None, + feedback_group_id: Optional[ID_TYPE] = None, **kwargs: Any, ) -> ls_schemas.Feedback: """Create a feedback in the LangSmith API. @@ -3478,6 +3480,12 @@ def create_feedback( project_id : str or UUID The ID of the project_id to provide feedback on. One - and only one - of this and run_id must be provided. + comparative_experiment_id : str or UUID + If this feedback was logged as a part of a comparative experiment, this + associates the feedback with that experiment. + feedback_group_id : str or UUID + When logging preferences, ranking runs, or other comparative feedback, + this is used to group feedback together. """ if run_id is None and project_id is None: raise ValueError("One of run_id and project_id must be provided") @@ -3531,6 +3539,8 @@ def create_feedback( modified_at=datetime.datetime.now(datetime.timezone.utc), feedback_config=feedback_config, session_id=project_id, + comparative_experiment_id=comparative_experiment_id, + feedback_group_id=feedback_group_id, ) self.request_with_retries( "POST", diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 80807033f..e1f4b9ad8 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -38,9 +38,11 @@ from langsmith import run_trees, schemas from langsmith import utils as ls_utils from langsmith.evaluation.evaluator import ( + ComparisonEvaluationResult, EvaluationResult, EvaluationResults, RunEvaluator, + comparison_evaluator, run_evaluator, ) from langsmith.evaluation.integrations import LangChainStringEvaluator @@ -329,112 +331,6 @@ def evaluate_existing( ) -# Row-level evaluator -COMPARATIVE_EVALUATOR_T = Callable[[List[schemas.Run], Optional[schemas.Example]], EvaluationResult] - -def evaluate_pairwise( - experiments: Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]], - /, - evaluators: Optional[Sequence[COMPARATIVE_EVALUATOR_T]] = None, - metadata: Optional[dict] = None, - max_concurrency: Optional[int] = None, - client: Optional[langsmith.Client] = None, - load_nested: bool = False, - blocking: bool = True, -) -> ExperimentResults: - r"""Evaluate existing experiment runs against each other. - - Args: - experiments (Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]]): The identifier of the experiment to evaluate. - data (DATA_T): The data to use for evaluation. - evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.= - metadata (Optional[dict]): Optional metadata to include in the evaluation results. - max_concurrency (Optional[int]): Optional maximum number of concurrent evaluations. - client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation. 
- load_nested: Whether to load all child runs for the experiment. - Default is to only load the top-level root runs. - blocking (bool): Whether to block until evaluation is complete. - - Returns: - ExperimentResults: The evaluation results. - - Environment: - - LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save time and - cost during testing. Recommended to commit the cache files to your repository - for faster CI/CD runs. - Requires the 'langsmith[vcr]' package to be installed. - - Examples: - >>> from langsmith.evaluation import evaluate, evaluate_existing - >>> dataset_name = "Evaluate Examples" - >>> def predict(inputs: dict) -> dict: - ... # This can be any function or just an API call to your app. - ... return {"output": "Yes"} - >>> # First run inference on the dataset - ... results = evaluate( - ... predict, - ... data=dataset_name, - ... ) # doctest: +ELLIPSIS - View the evaluation results for experiment:... - >>> # Then apply evaluators to the experiment - ... def accuracy(run: Run, example: Example): - ... # Row-level evaluator for accuracy. - ... pred = run.outputs["output"] - ... expected = example.outputs["answer"] - ... return {"score": expected.lower() == pred.lower()} - >>> def precision(runs: Sequence[Run], examples: Sequence[Example]): - ... # Experiment-level evaluator for precision. - ... # TP / (TP + FP) - ... predictions = [run.outputs["output"].lower() for run in runs] - ... expected = [example.outputs["answer"].lower() for example in examples] - ... # yes and no are the only possible answers - ... tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"]) - ... fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)]) - ... return {"score": tp / (tp + fp)} - >>> experiment_name = ( - ... results.experiment_name - ... ) # Can use the returned experiment name - >>> experiment_name = "My Experiment:64e6e91" # Or manually specify - >>> results = evaluate_existing( - ... experiment_name, - ... summary_evaluators=[precision], - ... ) # doctest: +ELLIPSIS - View the evaluation results for experiment:... 
- """ # noqa: E501 - if len(experiments) != 2: - raise ValueError("Pairwise evaluation requires exactly two experiments.") - client = client or langsmith.Client() - projects = [_load_experiment(experiment, client) for experiment in experiments] - if projects[0].reference_dataset_id != projects[1].reference_dataset_id: - raise ValueError("Experiments must have the same reference dataset.") - runs = [_load_traces(experiment, client, load_nested=load_nested) for experiment in experiments] - data = list( - client.list_examples( - dataset_id=projects[0].reference_dataset_id, - as_of=projects[0].metadata.get("dataset_version"), - ) - ) - runs = [sorted(single_exp_runs, key=lambda r: str(r.reference_example_id)) for single_exp_runs in runs] - data = sorted(data, key=lambda d: str(d.id)) - - runs_dict = collections.defaultdict(list) - for runs_list in runs: - for run in runs_list: - runs_dict[run.reference_example_id].append(run) - - - return _evaluate_multi( - runs, - data=data, - evaluators=evaluators, - summary_evaluators=[], - metadata=metadata, - max_concurrency=max_concurrency, - client=client, - blocking=blocking, - ) - - class ExperimentResultRow(TypedDict): run: schemas.Run example: schemas.Example @@ -504,6 +400,92 @@ def wait(self) -> None: self._thread.join() +## Public API for Comparison Experiments + +# Row-level evaluator +COMPARATIVE_EVALUATOR_T = Callable[ + [List[schemas.Run], Optional[schemas.Example]], ComparisonEvaluationResult +] + + +def evaluate_comparative( + experiments: Tuple[Union[str, uuid.UUID], Union[str, uuid.UUID]], + /, + evaluators: Sequence[COMPARATIVE_EVALUATOR_T] = None, + max_concurrency: Optional[int] = None, # TODO: Add max_concurrency + client: Optional[langsmith.Client] = None, + load_nested: bool = False, +) -> ComparativeExperimentResults: + r"""Evaluate existing experiment runs against each other.""" # noqa: E501 + if len(experiments) != 2: + raise ValueError("Pairwise evaluation requires exactly two experiments.") + client = client or langsmith.Client() + # TODO: Add information about comparison experiments + comparative_experiment_id = uuid.uuid4() + projects = [_load_experiment(experiment, client) for experiment in experiments] + if projects[0].reference_dataset_id != projects[1].reference_dataset_id: + raise ValueError("Experiments must have the same reference dataset.") + runs = [ + _load_traces(experiment, client, load_nested=load_nested) + for experiment in experiments + ] + # TODO: Warn if different dataset versions, etc. 
are used + data = { + e.id: e + for e in client.list_examples( + dataset_id=projects[0].reference_dataset_id, + as_of=projects[0].metadata.get("dataset_version"), + ) + } + runs_dict: Dict[uuid.UUID, List[schemas.Run]] = collections.defaultdict(list) + for runs_list in runs: + for run in runs_list: + runs_dict[cast(uuid.UUID, run.reference_example_id)].append(run) + + comparators = [comparison_evaluator(evaluator) for evaluator in evaluators] + results: dict = {} + for example_id, runs_list in runs_dict.items(): + results[example_id] = { + "runs": runs_list, + } + for comparator in comparators: + feedback_group_id = uuid.uuid4() + result = comparator.compare_runs(runs_list, data[example_id]) + results[example_id][f"feedback.{result.key}"] = result + for run_id, score in result.scores.items(): + client.create_feedback( + run_id=run_id, + key=result.key, + score=score, + comparative_experiment_id=comparative_experiment_id, + source_run_id=result.source_run_id, + feedback_group_id=feedback_group_id, + ) + + return ComparativeExperimentResults(results) + + +class ComparativeExperimentResults: + """Represents the results of an evaluate_comparative() call. + + This class provides an iterator interface to iterate over the experiment results + as they become available. It also provides methods to access the experiment name, + the number of results, and to wait for the results to be processed. + + Methods: + experiment_name() -> str: Returns the name of the experiment. + wait() -> None: Waits for the experiment data to be processed. + """ + + def __init__( + self, + results: dict, + ): + self._results = results + # TODO Add comparative experiment ID, etc. + # this isn't useful rn. + + ## Private API @@ -568,49 +550,6 @@ def _evaluate( return results -def _evaluate_multi( - target: List[Iterable[schemas.Run]], - /, - data: DATA_T, - evaluators: Optional[Sequence[EVALUATOR_T]] = None, - metadata: Optional[dict] = None, - experiment_prefix: Optional[str] = None, - max_concurrency: Optional[int] = None, - client: Optional[langsmith.Client] = None, - blocking: bool = True, - experiment: Optional[schemas.TracerSession] = None, -) -> ExperimentResults: - # Initialize the experiment manager. - client = client or langsmith.Client() - runs = cast(Iterable[schemas.Run], target[0]) - - manager = _ExperimentManager( - data, - client=client, - metadata=metadata, - experiment=experiment_ or experiment_prefix, - # If provided, we don't need to create a new experiment. - runs=runs, - # Create or resolve the experiment. - ).start() - cache_dir = ls_utils.get_cache_dir(None) - cache_path = ( - pathlib.Path(cache_dir) / f"{manager.dataset_id}.yaml" if cache_dir else None - ) - with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]): - if evaluators: - # Apply evaluators to the predictions. - manager = manager.with_evaluators( - evaluators, max_concurrency=max_concurrency - ) - # Start consuming the results. - results = ExperimentResults(manager) - if blocking: - # Wait for the evaluation to complete. 
- results.wait() - return results - - def _is_uuid(value: str) -> bool: try: uuid.UUID(value) @@ -1127,466 +1066,6 @@ def _end(self) -> None: ) -class _ComparativeExperimentManagerMixin: - def __init__( - self, - /, - experiment: Optional[Union[schemas.TracerSession, str]], - metadata: Optional[dict] = None, - client: Optional[langsmith.Client] = None, - ): - self.client = client or langsmith.Client() - self._experiment: Optional[schemas.TracerSession] = None - if experiment is None: - self._experiment_name = _get_random_name() - elif isinstance(experiment, str): - self._experiment_name = experiment + "-" + str(uuid.uuid4().hex[:8]) - else: - self._experiment_name = cast(str, experiment.name) - self._experiment = experiment - - metadata = metadata or {} - if not metadata.get("revision_id"): - metadata = { - "revision_id": ls_env.get_langchain_env_var_metadata().get( - "revision_id" - ), - **metadata, - } - self._metadata = metadata or {} - - @property - def experiment_name(self) -> str: - if self._experiment_name is not None: - return self._experiment_name - raise ValueError( - "Experiment name not provided, and experiment not yet started." - ) - - def _get_experiment(self) -> schemas.TracerSession: - if self._experiment is None: - raise ValueError("Experiment not started yet.") - return self._experiment - - def _get_experiment_metadata(self): - project_metadata = self._metadata or {} - git_info = ls_env.get_git_info() - if git_info: - project_metadata = { - **project_metadata, - "git": git_info, - } - if self._experiment: - project_metadata = { - **self._experiment.metadata, - **project_metadata, - } - return project_metadata - - def _get_project(self, first_example: schemas.Example) -> schemas.TracerSession: - if self._experiment is None: - try: - project_metadata = self._get_experiment_metadata() - project = self.client.create_project( - self.experiment_name, - reference_dataset_id=first_example.dataset_id, - metadata=project_metadata, - ) - except (HTTPError, ValueError, ls_utils.LangSmithError) as e: - if "already exists " not in str(e): - raise e - raise ValueError( - # TODO: Better error - f"Experiment {self.experiment_name} already exists." - " Please use a different name." - ) - else: - project = self._experiment - return project - - def _print_experiment_start( - self, project: schemas.TracerSession, first_example: schemas.Example - ) -> None: - if project.url: - # TODO: Make this a public API - project_url = project.url.split("?")[0] - dataset_id = first_example.dataset_id - base_url = project_url.split("/projects/p/")[0] - comparison_url = ( - f"{base_url}/datasets/{dataset_id}/compare?" - f"selectedSessions={project.id}" - ) - print( # noqa: T201 - f"View the evaluation results for experiment: '{self.experiment_name}'" - f" at:\n{comparison_url}\n\n" - ) - else: - # HACKHACK - print("Starting evaluation of experiment: %s", self.experiment_name) - - -class _ComparativeExperimentManager(_ExperimentManagerMixin): - """Manage the execution of experiments. - - Supports lazily running predictions and evaluations in parallel to facilitate - result streaming and early debugging. - - Args: - data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR - a generator of examples. - runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment - predictions. - experiment (Optional[schemas.TracerSession]): The tracer session - associated with the experiment. - experiment_prefix (Optional[str]): The prefix for the experiment name. 
- metadata (Optional[dict]): Additional metadata for the experiment. - client (Optional[langsmith.Client]): The Langsmith client used for - the experiment. - evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation - sresults for the experiment. - summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results - for the experiment. - """ - - def __init__( - self, - data: DATA_T, - /, - experiment: Optional[Union[schemas.TracerSession, str]], - metadata: Optional[dict] = None, - client: Optional[langsmith.Client] = None, - runs: Optional[Iterable[schemas.Run]] = None, - evaluation_results: Optional[Iterable[EvaluationResults]] = None, - summary_results: Optional[Iterable[EvaluationResults]] = None, - ): - super().__init__( - experiment=experiment, - metadata=metadata, - client=client, - ) - self._data = data - self._examples: Optional[Iterable[schemas.Example]] = None - self._runs = runs - self._evaluation_results = evaluation_results - self._summary_results = summary_results - - @property - def examples(self) -> Iterable[schemas.Example]: - if self._examples is None: - self._examples = _resolve_data(self._data, client=self.client) - self._examples, examples_iter = itertools.tee(self._examples) - return examples_iter - - @property - def dataset_id(self) -> str: - if self._experiment is None or not getattr( - self._experiment, "reference_dataset_id", None - ): - example = next(iter(self.examples)) - return str(example.dataset_id) - return str( - cast(schemas.TracerSessionResult, self._experiment).reference_dataset_id - ) - - @property - def evaluation_results(self) -> Iterable[EvaluationResults]: - if self._evaluation_results is None: - return [{"results": []} for _ in self.examples] - return self._evaluation_results - - @property - def runs(self) -> Iterable[schemas.Run]: - if self._runs is None: - raise ValueError( - "Runs not provided in this experiment." " Please predict first." - ) - self._runs, runs_iter = itertools.tee(self._runs) - return runs_iter - - def start(self) -> _ExperimentManager: - first_example = next(itertools.islice(self.examples, 1)) - project = self._get_project(first_example) - self._print_experiment_start(project, first_example) - return self.__class__( - self.examples, - experiment=project, - metadata=self._metadata, - client=self.client, - runs=self._runs, - evaluation_results=self._evaluation_results, - ) - - def with_predictions( - self, - target: TARGET_T, - /, - max_concurrency: Optional[int] = None, - ) -> _ExperimentManager: - """Lazily apply the target function to the experiment.""" - context = copy_context() - _experiment_results = context.run( - self._predict, target, max_concurrency=max_concurrency - ) - r1, r2 = itertools.tee(_experiment_results, 2) - return _ExperimentManager( - (pred["example"] for pred in r1), - experiment=self._experiment, - metadata=self._metadata, - client=self.client, - runs=(pred["run"] for pred in r2), - # TODO: Can't do multiple prediction rounds rn. - ) - - def with_evaluators( - self, - evaluators: Sequence[ - Union[ - EVALUATOR_T, - RunEvaluator, - ] - ], - *, - max_concurrency: Optional[int] = None, - ) -> _ExperimentManager: - """Lazily apply the provided evaluators to the experiment.""" - evaluators = _resolve_evaluators(evaluators) - context = copy_context() - experiment_results = context.run( - self._score, evaluators, max_concurrency=max_concurrency - ) - # Split the generator into three so the manager - # can consume each value individually. 
- r1, r2, r3 = itertools.tee(experiment_results, 3) - return _ExperimentManager( - (result["example"] for result in r1), - experiment=self._experiment, - metadata=self._metadata, - client=self.client, - runs=(result["run"] for result in r2), - evaluation_results=(result["evaluation_results"] for result in r3), - summary_results=self._summary_results, - ) - - def with_summary_evaluators( - self, - summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], - ) -> _ExperimentManager: - """Lazily apply the provided summary evaluators to the experiment.""" - wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators) - context = copy_context() - aggregate_feedback_gen = context.run( - self._apply_summary_evaluators, wrapped_evaluators - ) - return _ExperimentManager( - self.examples, - experiment=self._experiment, - metadata=self._metadata, - client=self.client, - runs=self.runs, - evaluation_results=self._evaluation_results, - summary_results=aggregate_feedback_gen, - ) - - def get_results(self) -> Iterable[ExperimentResultRow]: - """Return the traces, evaluation results, and associated examples.""" - for run, example, evaluation_results in zip( - self.runs, self.examples, self.evaluation_results - ): - yield ExperimentResultRow( - run=run, - example=example, - evaluation_results=evaluation_results, - ) - - def get_summary_scores(self) -> Dict[str, List[dict]]: - """If summary_evaluators were applied, consume and return the results.""" - if self._summary_results is None: - return {"results": []} - # Consume the generator - return { - "results": [ - res # type: ignore[misc] - for results in self._summary_results - for res in results["results"] - ] - } - - # Private methods - - def _predict( - self, target: TARGET_T, /, max_concurrency: Optional[int] = None - ) -> Generator[_ForwardResults, None, None]: - """Run the target function on the examples.""" - fn = _ensure_traceable(target) - if max_concurrency == 0: - for example in self.examples: - yield _forward( - fn, example, self.experiment_name, self._metadata, self.client - ) - - else: - with cf.ThreadPoolExecutor(max_concurrency) as executor: - futures = [ - executor.submit( - _forward, - fn, - example, - self.experiment_name, - self._metadata, - self.client, - ) - for example in self.examples - ] - for future in cf.as_completed(futures): - yield future.result() - # Close out the project. 
- self._end() - - def _run_evaluators( - self, - evaluators: Sequence[RunEvaluator], - current_results: ExperimentResultRow, - ) -> ExperimentResultRow: - current_context = rh.get_tracing_context() - metadata = { - **(current_context["metadata"] or {}), - **{ - "experiment": self.experiment_name, - "reference_example_id": current_results["example"].id, - "reference_run_id": current_results["run"].id, - }, - } - with rh.tracing_context( - **{**current_context, "project_name": "evaluators", "metadata": metadata} - ): - run = current_results["run"] - example = current_results["example"] - eval_results = current_results["evaluation_results"] - for evaluator in evaluators: - try: - evaluator_response = evaluator.evaluate_run( - run=run, - example=example, - ) - eval_results["results"].extend( - # TODO: This is a hack - self.client._log_evaluation_feedback( - evaluator_response, - run=run, - ) - ) - except Exception as e: - logger.error( - f"Error running evaluator {repr(evaluator)} on" - f" run {run.id}: {repr(e)}", - exc_info=True, - ) - return ExperimentResultRow( - run=run, - example=example, - evaluation_results=eval_results, - ) - - def _score( - self, - evaluators: Sequence[RunEvaluator], - max_concurrency: Optional[int] = None, - ) -> Iterable[ExperimentResultRow]: - """Run the evaluators on the prediction stream. - - Expects runs to be available in the manager. - (e.g. from a previous prediction step) - """ - if max_concurrency == 0: - for current_results in self.get_results(): - yield self._run_evaluators(evaluators, current_results) - else: - with cf.ThreadPoolExecutor(max_workers=max_concurrency) as executor: - futures = [] - for current_results in self.get_results(): - futures.append( - executor.submit( - self._run_evaluators, - evaluators, - current_results, - ) - ) - for future in cf.as_completed(futures): - result = future.result() - yield result - - def _apply_summary_evaluators( - self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] - ) -> Generator[EvaluationResults, None, None]: - runs, examples = [], [] - for run, example in zip(self.runs, self.examples): - runs.append(run) - examples.append(example) - aggregate_feedback = [] - with cf.ThreadPoolExecutor() as executor: - project_id = self._get_experiment().id - current_context = rh.get_tracing_context() - metadata = { - **(current_context["metadata"] or {}), - **{ - "experiment": self.experiment_name, - "experiment_id": project_id, - }, - } - with rh.tracing_context( - **{ - **current_context, - "project_name": "evaluators", - "metadata": metadata, - } - ): - for evaluator in summary_evaluators: - try: - summary_eval_result = evaluator(runs, examples) - # TODO: Expose public API for this. 
- flattened_results = self.client._select_eval_results( - summary_eval_result, - fn_name=evaluator.__name__, - ) - aggregate_feedback.extend(flattened_results) - for result in flattened_results: - feedback = result.dict(exclude={"target_run_id"}) - evaluator_info = feedback.pop("evaluator_info", None) - executor.submit( - self.client.create_feedback, - **feedback, - run_id=None, - project_id=project_id, - source_info=evaluator_info, - ) - except Exception as e: - logger.error( - f"Error running summary evaluator {repr(evaluator)}: {e}" - ) - yield {"results": aggregate_feedback} - - def _get_dataset_version(self) -> Optional[str]: - examples = list(self.examples) - modified_at = [ex.modified_at for ex in examples if ex.modified_at] - # Should always be defined in practice when fetched, - # but the typing permits None - max_modified_at = max(modified_at) if modified_at else None - return max_modified_at.isoformat() if max_modified_at else None - - def _end(self) -> None: - experiment = self._experiment - if experiment is None: - raise ValueError("Experiment not started yet.") - - project_metadata = self._get_experiment_metadata() - project_metadata["dataset_version"] = self._get_dataset_version() - self.client.update_project( - experiment.id, - end_time=datetime.datetime.now(datetime.timezone.utc), - metadata=project_metadata, - ) - - def _resolve_evaluators( evaluators: Sequence[Union[EVALUATOR_T, RunEvaluator, AEVALUATOR_T]], ) -> Sequence[RunEvaluator]: diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index 7b6e4bd1f..db85ed4c8 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -4,7 +4,18 @@ import inspect import uuid from abc import abstractmethod -from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Union, cast +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Literal, + Optional, + Sequence, + Union, + cast, +) from typing_extensions import TypedDict @@ -105,6 +116,24 @@ async def aevaluate_run( _RUNNABLE_OUTPUT = Union[EvaluationResult, EvaluationResults, dict] +class ComparisonEvaluationResult(BaseModel): + """Feedback scores for the results of comparative evaluations. + + These are generated by functions that compare two or more runs, + returning a ranking or other feedback. + """ + + key: str + """The aspect, metric name, or label for this evaluation.""" + scores: Dict[Union[uuid.UUID, str], SCORE_TYPE] + """The scores for each run in the comparison.""" + source_run_id: Optional[Union[uuid.UUID, str]] = None + """The ID of the trace of the evaluator itself.""" + + +_COMPARISON_OUTPUT = Union[ComparisonEvaluationResult, dict] + + class DynamicRunEvaluator(RunEvaluator): """A dynamic evaluator that wraps a function and transforms it into a `RunEvaluator`. @@ -120,13 +149,13 @@ def __init__( self, func: Callable[ [Run, Optional[Example]], - Union[_RUNNABLE_OUTPUT, Awaitable[_RUNNABLE_OUTPUT]], + Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]], ], # Async function to be used for async evaluation. Optional afunc: Optional[ Callable[ [Run, Optional[Example]], - Awaitable[_RUNNABLE_OUTPUT], + Awaitable[_COMPARISON_OUTPUT], ] ] = None, ): @@ -134,7 +163,7 @@ def __init__( Args: func (Callable): A function that takes a `Run` and an optional `Example` as - arguments, and returns an `EvaluationResult` or `EvaluationResults`. + arguments, and returns a dict or `ComparisonEvaluationResult`. 
""" wraps(func)(self) from langsmith import run_helpers # type: ignore @@ -313,3 +342,217 @@ def run_evaluator( Decorator that transforms a function into a `RunEvaluator`. """ return DynamicRunEvaluator(func) + + +class DynamicComparisonRunEvaluator: + """Compare predictions (as traces) from 2 or more runs.""" + + def __init__( + self, + func: Callable[ + [Sequence[Run], Optional[Example]], + Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]], + ], + # Async function to be used for async evaluation. Optional + afunc: Optional[ + Callable[ + [Sequence[Run], Optional[Example]], + Awaitable[_COMPARISON_OUTPUT], + ] + ] = None, + ): + """Initialize the DynamicRunEvaluator with a given function. + + Args: + func (Callable): A function that takes a `Run` and an optional `Example` as + arguments, and returns an `EvaluationResult` or `EvaluationResults`. + """ + wraps(func)(self) + from langsmith import run_helpers # type: ignore + + if afunc is not None: + self.afunc = run_helpers.ensure_traceable(afunc) + self._name = getattr(afunc, "__name__", "DynamicRunEvaluator") + if inspect.iscoroutinefunction(func): + if afunc is not None: + raise TypeError( + "Func was provided as a coroutine function, but afunc was " + "also provided. If providing both, func should be a regular " + "function to avoid ambiguity." + ) + self.afunc = run_helpers.ensure_traceable(func) + self._name = getattr(func, "__name__", "DynamicRunEvaluator") + else: + self.func = cast( + run_helpers.SupportsLangsmithExtra[_RUNNABLE_OUTPUT], + run_helpers.ensure_traceable(func), + ) + self._name = getattr(func, "__name__", "DynamicRunEvaluator") + + @property + def is_async(self) -> bool: + """Check if the evaluator function is asynchronous. + + Returns: + bool: True if the evaluator function is asynchronous, False otherwise. + """ + return hasattr(self, "afunc") + + def compare_runs( + self, runs: Sequence[Run], example: Optional[Example] = None + ) -> ComparisonEvaluationResult: + """Compare runs to score preferences. + + Args: + runs: A list of runs to compare. + example: An optional example to be used in the evaluation. + + """ # noqa: E501 + if not hasattr(self, "func"): + running_loop = asyncio.get_event_loop() + if running_loop.is_running(): + raise RuntimeError( + "Cannot call `evaluate_run` on an async run evaluator from" + " within an running event loop. Use `aevaluate_run` instead." + ) + else: + return running_loop.run_until_complete( + self.acompare_runs(runs, example) + ) + source_run_id = uuid.uuid4() + tags = self._get_tags(runs) + # TODO: Add metadata for the "comparison experiment" here + result = self.func( + runs, + example, + langsmith_extra={"run_id": source_run_id, "tags": tags}, + ) + return self._format_result(result, source_run_id) + + async def acompare_runs( + self, runs: Sequence[Run], example: Optional[Example] = None + ) -> ComparisonEvaluationResult: + """Evaluate a run asynchronously using the wrapped async function. + + This method directly invokes the wrapped async function with the + provided arguments. + + Args: + run (Run): The run to be evaluated. + example (Optional[Example]): An optional example to be used + in the evaluation. + + Returns: + Union[EvaluationResult, EvaluationResults]: The result of the evaluation. 
+ """ + if not hasattr(self, "afunc"): + return await super().aevaluate_run(runs, example) + source_run_id = uuid.uuid4() + tags = self._get_tags(runs) + # TODO: Add metadata for the "comparison experiment" here + result = await self.afunc( + runs, + example, + langsmith_extra={"run_id": source_run_id, "tags": tags}, + ) + return self._format_result(result, source_run_id) + + def __call__( + self, runs: Sequence[Run], example: Optional[Example] = None + ) -> Union[EvaluationResult, EvaluationResults]: + """Make the evaluator callable, allowing it to be used like a function. + + This method enables the evaluator instance to be called directly, forwarding the + call to `evaluate_run`. + + Args: + run (Run): The run to be evaluated. + example (Optional[Example]): An optional example to be used in the evaluation. + + Returns: + Union[EvaluationResult, EvaluationResults]: The result of the evaluation. + """ # noqa: E501 + return self.evaluate_run(runs, example) + + def __repr__(self) -> str: + """Represent the DynamicRunEvaluator object.""" + return f"" + + @staticmethod + def _get_tags(runs: Sequence[Run]) -> List[str]: + """Extract tags from runs.""" + # Add tags to support filtering + tags = [] + for run in runs: + tags.append("run:" + str(run.id)) + if getattr(run, "session_id", None): + tags.append("experiment:" + str(run.session_id)) + return tags + + def _coerce_evaluation_result( + self, + result: Union[EvaluationResult, dict], + source_run_id: uuid.UUID, + allow_no_key: bool = False, + ) -> EvaluationResult: + if isinstance(result, EvaluationResult): + if not result.source_run_id: + result.source_run_id = source_run_id + return result + try: + if "key" not in result: + if allow_no_key: + result["key"] = self._name + return EvaluationResult(**{"source_run_id": source_run_id, **result}) + except ValidationError as e: + raise ValueError( + "Expected an EvaluationResult object, or dict with a metric" + f" 'key' and optional 'score'; got {result}" + ) from e + + def _coerce_evaluation_results( + self, + results: Union[dict, EvaluationResults], + source_run_id: uuid.UUID, + ) -> Union[EvaluationResult, EvaluationResults]: + if "results" in results: + cp = results.copy() + cp["results"] = [ + self._coerce_evaluation_result(r, source_run_id=source_run_id) + for r in results["results"] + ] + return EvaluationResults(**cp) + + return self._coerce_evaluation_result( + cast(dict, results), allow_no_key=True, source_run_id=source_run_id + ) + + def _format_result( + self, + result: Union[dict, ComparisonEvaluationResult], + source_run_id: uuid.UUID, + ) -> ComparisonEvaluationResult: + if isinstance(result, ComparisonEvaluationResult): + if not result.source_run_id: + result.source_run_id = source_run_id + return result + try: + return ComparisonEvaluationResult( + **{"source_run_id": source_run_id, **result} + ) + except ValidationError as e: + raise ValueError( + f"Expected a dictionary with a 'key' and dictionary of scores mapping" + "run IDs to numeric scores, or ComparisonEvaluationResult object," + f" got {result}" + ) from e + + +def comparison_evaluator( + func: Callable[ + [Sequence[Run], Optional[Example]], + Union[_COMPARISON_OUTPUT, Awaitable[_COMPARISON_OUTPUT]], + ], +) -> DynamicComparisonRunEvaluator: + """Create a comaprison evaluator from a function.""" + return DynamicComparisonRunEvaluator(func)