From 540e276001d1367c0157549eba3da7f41238a06a Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Thu, 19 Dec 2024 18:06:23 -0800 Subject: [PATCH 1/8] Add inline evaluation results --- python/langsmith/evaluation/_arunner.py | 29 ++-- python/langsmith/evaluation/_runner.py | 53 ++++--- python/langsmith/evaluation/evaluator.py | 138 ++++++------------ python/langsmith/schemas.py | 81 ++++++++++ python/tests/integration_tests/test_client.py | 64 ++++++++ 5 files changed, 232 insertions(+), 133 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 7b29241e6..8dea802cd 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -36,7 +36,6 @@ AEVALUATOR_T, DATA_T, EVALUATOR_T, - ExperimentResultRow, _evaluators_include_attachments, _ExperimentManagerMixin, _extract_feedback_keys, @@ -690,7 +689,9 @@ async def awith_summary_evaluators( summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], ) -> _AsyncExperimentManager: wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators) - aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators) + aggregate_feedback_gen = self._aapply_summary_evaluators( + wrapped_evaluators, [r async for r in self.aget_results()] + ) return _AsyncExperimentManager( await self.aget_examples(), experiment=self._experiment, @@ -703,11 +704,11 @@ async def awith_summary_evaluators( upload_results=self._upload_results, ) - async def aget_results(self) -> AsyncIterator[ExperimentResultRow]: + async def aget_results(self) -> AsyncIterator[schemas.ExperimentResultRow]: async for run, example, evaluation_results in aitertools.async_zip( self.aget_runs(), await self.aget_examples(), self.aget_evaluation_results() ): - yield ExperimentResultRow( + yield schemas.ExperimentResultRow( run=run, example=example, evaluation_results=evaluation_results, @@ -758,7 +759,7 @@ async def _ascore( self, evaluators: Sequence[RunEvaluator], max_concurrency: Optional[int] = None, - ) -> AsyncIterator[ExperimentResultRow]: + ) -> AsyncIterator[schemas.ExperimentResultRow]: with cf.ThreadPoolExecutor(max_workers=4) as executor: async def score_all(): @@ -776,9 +777,9 @@ async def score_all(): async def _arun_evaluators( self, evaluators: Sequence[RunEvaluator], - current_results: ExperimentResultRow, + current_results: schemas.ExperimentResultRow, executor: cf.ThreadPoolExecutor, - ) -> ExperimentResultRow: + ) -> schemas.ExperimentResultRow: current_context = rh.get_tracing_context() metadata = { **(current_context["metadata"] or {}), @@ -848,14 +849,16 @@ async def _arun_evaluators( for attachment in example.attachments: reader = example.attachments[attachment]["reader"] reader.seek(0) - return ExperimentResultRow( + return schemas.ExperimentResultRow( run=run, example=example, evaluation_results=eval_results, ) async def _aapply_summary_evaluators( - self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] + self, + summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], + evaluation_results: List[schemas.ExperimentResultRow], ) -> AsyncIterator[EvaluationResults]: runs, examples = [], [] async_examples = aitertools.ensure_async_iterator(await self.aget_examples()) @@ -885,7 +888,7 @@ async def _aapply_summary_evaluators( ): for evaluator in summary_evaluators: try: - summary_eval_result = evaluator(runs, examples) + summary_eval_result = evaluator(runs, examples, evaluation_results) flattened_results = self.client._select_eval_results( summary_eval_result, 
fn_name=evaluator.__name__, @@ -963,7 +966,7 @@ def __init__( experiment_manager: _AsyncExperimentManager, ): self._manager = experiment_manager - self._results: List[ExperimentResultRow] = [] + self._results: List[schemas.ExperimentResultRow] = [] self._lock = asyncio.Lock() self._task = asyncio.create_task(self._process_data(self._manager)) self._processed_count = 0 @@ -972,10 +975,10 @@ def __init__( def experiment_name(self) -> str: return self._manager.experiment_name - def __aiter__(self) -> AsyncIterator[ExperimentResultRow]: + def __aiter__(self) -> AsyncIterator[schemas.ExperimentResultRow]: return self - async def __anext__(self) -> ExperimentResultRow: + async def __anext__(self) -> schemas.ExperimentResultRow: async def _wait_until_index(index: int) -> None: while self._processed_count < index: await asyncio.sleep(0.05) diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index e755fe423..06ae64dab 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -534,12 +534,6 @@ def evaluate_existing( ) -class ExperimentResultRow(TypedDict): - run: schemas.Run - example: schemas.Example - evaluation_results: EvaluationResults - - class ExperimentResults: """Represents the results of an evaluate() call. @@ -554,8 +548,8 @@ class ExperimentResults: def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True): self._manager = experiment_manager - self._results: List[ExperimentResultRow] = [] - self._queue: queue.Queue[ExperimentResultRow] = queue.Queue() + self._results: List[schemas.ExperimentResultRow] = [] + self._queue: queue.Queue[schemas.ExperimentResultRow] = queue.Queue() self._processing_complete = threading.Event() if not blocking: self._thread: Optional[threading.Thread] = threading.Thread( @@ -570,7 +564,7 @@ def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True def experiment_name(self) -> str: return self._manager.experiment_name - def __iter__(self) -> Iterator[ExperimentResultRow]: + def __iter__(self) -> Iterator[schemas.ExperimentResultRow]: ix = 0 while ( not self._processing_complete.is_set() @@ -1439,6 +1433,7 @@ def with_evaluators( # Split the generator into three so the manager # can consume each value individually. 
r1, r2, r3 = itertools.tee(experiment_results, 3) + # print("FOOOO", [result["evaluation_results"] for result in r3]) return _ExperimentManager( (result["example"] for result in r1), experiment=self._experiment, @@ -1459,7 +1454,9 @@ def with_summary_evaluators( wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators) context = copy_context() aggregate_feedback_gen = context.run( - self._apply_summary_evaluators, wrapped_evaluators + self._apply_summary_evaluators, + wrapped_evaluators, + [r for r in self.get_results()], ) return _ExperimentManager( self.examples, @@ -1473,12 +1470,12 @@ def with_summary_evaluators( upload_results=self._upload_results, ) - def get_results(self) -> Iterable[ExperimentResultRow]: + def get_results(self) -> Iterable[schemas.ExperimentResultRow]: """Return the traces, evaluation results, and associated examples.""" for run, example, evaluation_results in zip( self.runs, self.examples, self.evaluation_results ): - yield ExperimentResultRow( + yield schemas.ExperimentResultRow( run=run, example=example, evaluation_results=evaluation_results, @@ -1544,9 +1541,9 @@ def _predict( def _run_evaluators( self, evaluators: Sequence[RunEvaluator], - current_results: ExperimentResultRow, + current_results: schemas.ExperimentResultRow, executor: cf.ThreadPoolExecutor, - ) -> ExperimentResultRow: + ) -> schemas.ExperimentResultRow: current_context = rh.get_tracing_context() metadata = { **(current_context["metadata"] or {}), @@ -1619,7 +1616,7 @@ def _run_evaluators( reader = example.attachments[attachment]["reader"] reader.seek(0) - return ExperimentResultRow( + return schemas.ExperimentResultRow( run=run, example=example, evaluation_results=eval_results, @@ -1629,7 +1626,7 @@ def _score( self, evaluators: Sequence[RunEvaluator], max_concurrency: Optional[int] = None, - ) -> Iterable[ExperimentResultRow]: + ) -> Iterable[schemas.ExperimentResultRow]: """Run the evaluators on the prediction stream. Expects runs to be available in the manager. @@ -1671,7 +1668,9 @@ def _score( yield result def _apply_summary_evaluators( - self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] + self, + summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], + evaluation_results: List[schemas.ExperimentResultRow], ) -> Generator[EvaluationResults, None, None]: runs, examples = [], [] for run, example in zip(self.runs, self.examples): @@ -1699,7 +1698,9 @@ def _apply_summary_evaluators( ): for evaluator in summary_evaluators: try: - summary_eval_result = evaluator(runs, examples) + summary_eval_result = evaluator( + runs, examples, evaluation_results + ) # TODO: Expose public API for this. 
flattened_results = self.client._select_eval_results( summary_eval_result, @@ -1793,16 +1794,20 @@ def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T: @functools.wraps(evaluator) def _wrapper_inner( - runs: Sequence[schemas.Run], examples: Sequence[schemas.Example] + runs: Sequence[schemas.Run], + examples: Sequence[schemas.Example], + evaluation_results: Sequence[schemas.ExperimentResultRow], ) -> Union[EvaluationResult, EvaluationResults]: @rh.traceable(name=eval_name) def _wrapper_super_inner( - runs_: str, examples_: str + runs_: str, examples_: str, evaluation_results_: str ) -> Union[EvaluationResult, EvaluationResults]: - return evaluator(list(runs), list(examples)) + return evaluator(list(runs), list(examples), list(evaluation_results)) return _wrapper_super_inner( - f"Runs[] (Length={len(runs)})", f"Examples[] (Length={len(examples)})" + f"Runs[] (Length={len(runs)})", + f"Examples[] (Length={len(examples)})", + f"EvaluationResults[] (Length={len(evaluation_results)})", ) return _wrapper_inner @@ -2173,7 +2178,7 @@ def extract_evaluation_results_keys(node, variables): def _to_pandas( - results: list[ExperimentResultRow], + results: list[schemas.ExperimentResultRow], start: Optional[int] = 0, end: Optional[int] = None, ): @@ -2190,7 +2195,7 @@ def _to_pandas( def _flatten_experiment_results( - results: list[ExperimentResultRow], + results: list[schemas.ExperimentResultRow], start: Optional[int] = 0, end: Optional[int] = None, ): diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index a1505699a..e62db709a 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -12,122 +12,40 @@ Callable, Dict, List, - Literal, Optional, Sequence, Union, cast, ) -from typing_extensions import TypedDict - from langsmith import schemas try: from pydantic.v1 import ( # type: ignore[import] BaseModel, - Field, ValidationError, - validator, ) except ImportError: from pydantic import ( # type: ignore[assignment] BaseModel, - Field, ValidationError, - validator, ) import logging from functools import wraps -from langsmith.schemas import SCORE_TYPE, VALUE_TYPE, Example, Run +from langsmith.schemas import ( + SCORE_TYPE, + EvaluationResult, + EvaluationResults, + Example, + ExperimentResultRow, + Run, +) logger = logging.getLogger(__name__) -class Category(TypedDict): - """A category for categorical feedback.""" - - value: Optional[Union[float, int]] - """The numeric score/ordinal corresponding to this category.""" - label: str - """The label for this category.""" - - -class FeedbackConfig(TypedDict, total=False): - """Configuration to define a type of feedback. - - Applied on on the first creation of a feedback_key. 
- """ - - type: Literal["continuous", "categorical", "freeform"] - """The type of feedback.""" - min: Optional[Union[float, int]] - """The minimum permitted value (if continuous type).""" - max: Optional[Union[float, int]] - """The maximum value permitted value (if continuous type).""" - categories: Optional[List[Union[Category, dict]]] - - -class EvaluationResult(BaseModel): - """Evaluation result.""" - - key: str - """The aspect, metric name, or label for this evaluation.""" - score: SCORE_TYPE = None - """The numeric score for this evaluation.""" - value: VALUE_TYPE = None - """The value for this evaluation, if not numeric.""" - comment: Optional[str] = None - """An explanation regarding the evaluation.""" - correction: Optional[Dict] = None - """What the correct value should be, if applicable.""" - evaluator_info: Dict = Field(default_factory=dict) - """Additional information about the evaluator.""" - feedback_config: Optional[Union[FeedbackConfig, dict]] = None - """The configuration used to generate this feedback.""" - source_run_id: Optional[Union[uuid.UUID, str]] = None - """The ID of the trace of the evaluator itself.""" - target_run_id: Optional[Union[uuid.UUID, str]] = None - """The ID of the trace this evaluation is applied to. - - If none provided, the evaluation feedback is applied to the - root trace being.""" - extra: Optional[Dict] = None - """Metadata for the evaluator run.""" - - class Config: - """Pydantic model configuration.""" - - allow_extra = False - - @validator("value", pre=True) - def check_value_non_numeric(cls, v, values): - """Check that the value is not numeric.""" - # If a score isn't provided and the value is numeric - # it's more likely the user intended use the score field - if "score" not in values or values["score"] is None: - if isinstance(v, (int, float)): - logger.warning( - "Numeric values should be provided in" - " the 'score' field, not 'value'." - f" Got: {v}" - ) - return v - - -class EvaluationResults(TypedDict, total=False): - """Batch evaluation results. - - This makes it easy for your evaluator to return multiple - metrics at once. - """ - - results: List[EvaluationResult] - """The evaluation results.""" - - class RunEvaluator: """Evaluator interface class.""" @@ -805,18 +723,29 @@ def _format_evaluator_result( SUMMARY_EVALUATOR_T = Union[ Callable[ - [Sequence[schemas.Run], Sequence[schemas.Example]], + [ + Sequence[schemas.Run], + Sequence[schemas.Example], + Sequence[ExperimentResultRow], + ], Union[EvaluationResult, EvaluationResults], ], Callable[ - [List[schemas.Run], List[schemas.Example]], + [List[schemas.Run], List[schemas.Example], List[ExperimentResultRow]], Union[EvaluationResult, EvaluationResults], ], ] def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T: - supported_args = ("runs", "examples", "inputs", "outputs", "reference_outputs") + supported_args = ( + "runs", + "examples", + "inputs", + "outputs", + "reference_outputs", + "evaluation_results", + ) sig = inspect.signature(func) positional_args = [ pname @@ -828,7 +757,7 @@ def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T: and len(positional_args) != 2 ): msg = ( - f"Invalid evaluator function. Must have at least one positional " + f"Invalid summary evaluator function. Must have at least one positional " f"argument. Supported positional arguments are {supported_args}." 
) if positional_args: @@ -839,11 +768,27 @@ def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T: elif not all( pname in supported_args for pname in positional_args ) or positional_args == ["runs", "examples"]: - return func + + def wrapper( + runs: Sequence[schemas.Run], + examples: Sequence[schemas.Example], + evaluation_results: Sequence[ExperimentResultRow], + ) -> Union[EvaluationResult, EvaluationResults]: + result = func(runs, examples) + if isinstance(result, EvaluationResult): + return result + return _format_evaluator_result(result) # type: ignore[return-value] + + wrapper.__name__ = ( + getattr(func, "__name__") if hasattr(func, "__name__") else wrapper.__name__ + ) + return wrapper # type: ignore[return-value] else: def wrapper( - runs: Sequence[schemas.Run], examples: Sequence[schemas.Example] + runs: Sequence[schemas.Run], + examples: Sequence[schemas.Example], + evaluation_results: Sequence[ExperimentResultRow], ) -> Union[EvaluationResult, EvaluationResults]: arg_map = { "runs": runs, @@ -851,6 +796,7 @@ def wrapper( "inputs": [example.inputs for example in examples], "outputs": [run.outputs or {} for run in runs], "reference_outputs": [example.outputs or {} for example in examples], + "evaluation_results": evaluation_results, } args = (arg_map[arg] for arg in positional_args) result = func(*args) diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index acedaf177..639b2bafd 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -28,6 +28,7 @@ StrictBool, StrictFloat, StrictInt, + validator, ) except ImportError: from pydantic import ( # type: ignore[assignment] @@ -37,10 +38,15 @@ StrictBool, StrictFloat, StrictInt, + validator, ) +import logging + from typing_extensions import Literal +logger = logging.getLogger(__name__) + SCORE_TYPE = Union[StrictBool, StrictInt, StrictFloat, None] VALUE_TYPE = Union[Dict, str, None] @@ -1086,3 +1092,78 @@ class UpsertExamplesResponse(TypedDict): """The number of examples that were upserted.""" example_ids: List[str] """The ids of the examples that were upserted.""" + + +class Category(TypedDict): + """A category for categorical feedback.""" + + value: Optional[Union[float, int]] + """The numeric score/ordinal corresponding to this category.""" + label: str + """The label for this category.""" + + +class EvaluationResult(BaseModel): + """Evaluation result.""" + + key: str + """The aspect, metric name, or label for this evaluation.""" + score: SCORE_TYPE = None + """The numeric score for this evaluation.""" + value: VALUE_TYPE = None + """The value for this evaluation, if not numeric.""" + comment: Optional[str] = None + """An explanation regarding the evaluation.""" + correction: Optional[Dict] = None + """What the correct value should be, if applicable.""" + evaluator_info: Dict = Field(default_factory=dict) + """Additional information about the evaluator.""" + feedback_config: Optional[Union[FeedbackConfig, dict]] = None + """The configuration used to generate this feedback.""" + source_run_id: Optional[Union[UUID, str]] = None + """The ID of the trace of the evaluator itself.""" + target_run_id: Optional[Union[UUID, str]] = None + """The ID of the trace this evaluation is applied to. 
+ + If none provided, the evaluation feedback is applied to the + root trace being.""" + extra: Optional[Dict] = None + """Metadata for the evaluator run.""" + + class Config: + """Pydantic model configuration.""" + + allow_extra = False + + @validator("value", pre=True) + def check_value_non_numeric(cls, v, values): + """Check that the value is not numeric.""" + # If a score isn't provided and the value is numeric + # it's more likely the user intended use the score field + if "score" not in values or values["score"] is None: + if isinstance(v, (int, float)): + logger.warning( + "Numeric values should be provided in" + " the 'score' field, not 'value'." + f" Got: {v}" + ) + return v + + +class EvaluationResults(TypedDict, total=False): + """Batch evaluation results. + + This makes it easy for your evaluator to return multiple + metrics at once. + """ + + results: List[EvaluationResult] + """The evaluation results.""" + + +class ExperimentResultRow(TypedDict): + """A row of experiment results.""" + + run: Run + example: Example + evaluation_results: EvaluationResults diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 3bcd9d04c..14d02aad3 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -28,6 +28,7 @@ ExampleUpdateWithAttachments, ExampleUploadWithAttachments, ExampleUpsertWithAttachments, + ExperimentResultRow, Run, ) from langsmith.utils import ( @@ -1254,6 +1255,69 @@ def test_list_examples_attachments_keys(langchain_client: Client) -> None: langchain_client.delete_dataset(dataset_id=dataset.id) +async def test_summary_evaluation_with_evaluator_results( + langchain_client: Client, +) -> None: + """Test summary evaluators receive evaluator results.""" + dataset_name = "__test_summary_evaluation_inline_eval" + uuid4().hex[:4] + langchain_client = Client( + api_key="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" + ) + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals with attachments", + data_type=DataType.kv, + ) + + example_id = uuid4() + langchain_client.create_example( + dataset_id=dataset.id, + inputs={"question": "What is 2+2?"}, + outputs={"answer": "4"}, + example_id=example_id, + ) + + def target(inputs: Dict[str, Any]) -> Dict[str, Any]: + return {"answer": "4"} + + async def target_async(inputs: Dict[str, Any]) -> Dict[str, Any]: + return {"answer": "4"} + + def evaluator(outputs: dict, reference_outputs: dict) -> dict: + return {"score": 1, "key": "foo"} + + def summary_evaluator(evaluation_results: list[ExperimentResultRow]) -> dict: + assert len(evaluation_results) == 1 + assert evaluation_results[0]["evaluation_results"]["results"][0].key == "foo" + assert evaluation_results[0]["evaluation_results"]["results"][0].score == 1 + assert evaluation_results[0]["example"].id == example_id + return 1 + + results = langchain_client.evaluate( + target, + data=dataset_name, + evaluators=[evaluator], + summary_evaluators=[summary_evaluator], + num_repetitions=1, + ) + assert len(results._summary_results["results"]) == 1 + assert results._summary_results["results"][0].score == 1 + assert results._summary_results["results"][0].key == "summary_evaluator" + + results = await langchain_client.aevaluate( + target_async, + data=dataset_name, + evaluators=[evaluator], + summary_evaluators=[summary_evaluator], + num_repetitions=1, + ) + assert len(results._summary_results["results"]) == 1 + assert 
results._summary_results["results"][0].score == 1 + assert results._summary_results["results"][0].key == "summary_evaluator" + + langchain_client.delete_dataset(dataset_id=dataset.id) + + def test_evaluate_with_attachments_multiple_evaluators( langchain_client: Client, ) -> None: From 6d17be1fa4912b32e3949f06aa7150f33f0a1b32 Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 18:43:34 -0500 Subject: [PATCH 2/8] fmt --- python/langsmith/evaluation/_arunner.py | 17 ++++++----------- python/langsmith/evaluation/_runner.py | 22 ++++++++++------------ python/langsmith/evaluation/evaluator.py | 14 +++++++++----- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 8dea802cd..80ba8db17 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -689,9 +689,7 @@ async def awith_summary_evaluators( summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], ) -> _AsyncExperimentManager: wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators) - aggregate_feedback_gen = self._aapply_summary_evaluators( - wrapped_evaluators, [r async for r in self.aget_results()] - ) + aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators) return _AsyncExperimentManager( await self.aget_examples(), experiment=self._experiment, @@ -858,15 +856,12 @@ async def _arun_evaluators( async def _aapply_summary_evaluators( self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], - evaluation_results: List[schemas.ExperimentResultRow], ) -> AsyncIterator[EvaluationResults]: - runs, examples = [], [] - async_examples = aitertools.ensure_async_iterator(await self.aget_examples()) - async for run, example in aitertools.async_zip( - self.aget_runs(), async_examples - ): - runs.append(run) - examples.append(example) + runs, examples, evaluation_results = [], [], [] + async for row in self.aget_results(): + runs.append(row["run"]) + examples.append(row["example"]) + evaluation_results.append(row["evaluation_results"]["results"]) aggregate_feedback = [] project_id = self._get_experiment().id if self._upload_results else None current_context = rh.get_tracing_context() diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 06ae64dab..4e9970a95 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1454,9 +1454,7 @@ def with_summary_evaluators( wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators) context = copy_context() aggregate_feedback_gen = context.run( - self._apply_summary_evaluators, - wrapped_evaluators, - [r for r in self.get_results()], + self._apply_summary_evaluators, wrapped_evaluators ) return _ExperimentManager( self.examples, @@ -1670,12 +1668,12 @@ def _score( def _apply_summary_evaluators( self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], - evaluation_results: List[schemas.ExperimentResultRow], ) -> Generator[EvaluationResults, None, None]: - runs, examples = [], [] - for run, example in zip(self.runs, self.examples): - runs.append(run) - examples.append(example) + runs, examples, evaluation_results = [], [], [] + for row in self.get_results(): + runs.append(row["run"]) + examples.append(row["example"]) + evaluation_results.append(row["evaluation_results"]["results"]) aggregate_feedback = [] with ls_utils.ContextThreadPoolExecutor() as executor: project_id = self._get_experiment().id if self._upload_results else None @@ -1794,15 +1792,15 @@ 
def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T: @functools.wraps(evaluator) def _wrapper_inner( - runs: Sequence[schemas.Run], - examples: Sequence[schemas.Example], - evaluation_results: Sequence[schemas.ExperimentResultRow], + runs: list[schemas.Run], + examples: list[schemas.Example], + evaluation_results: list[list[EvaluationResult]], ) -> Union[EvaluationResult, EvaluationResults]: @rh.traceable(name=eval_name) def _wrapper_super_inner( runs_: str, examples_: str, evaluation_results_: str ) -> Union[EvaluationResult, EvaluationResults]: - return evaluator(list(runs), list(examples), list(evaluation_results)) + return evaluator(runs, examples, evaluation_results) return _wrapper_super_inner( f"Runs[] (Length={len(runs)})", diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index e62db709a..e732c0dd7 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -726,12 +726,16 @@ def _format_evaluator_result( [ Sequence[schemas.Run], Sequence[schemas.Example], - Sequence[ExperimentResultRow], + Sequence[Sequence[schemas.EvaluationResult]], ], Union[EvaluationResult, EvaluationResults], ], Callable[ - [List[schemas.Run], List[schemas.Example], List[ExperimentResultRow]], + [ + List[schemas.Run], + List[schemas.Example], + List[List[schemas.EvaluationResult]], + ], Union[EvaluationResult, EvaluationResults], ], ] @@ -772,7 +776,7 @@ def _normalize_summary_evaluator(func: Callable) -> SUMMARY_EVALUATOR_T: def wrapper( runs: Sequence[schemas.Run], examples: Sequence[schemas.Example], - evaluation_results: Sequence[ExperimentResultRow], + _: Sequence[Sequence[ExperimentResultRow]], ) -> Union[EvaluationResult, EvaluationResults]: result = func(runs, examples) if isinstance(result, EvaluationResult): @@ -785,10 +789,10 @@ def wrapper( return wrapper # type: ignore[return-value] else: - def wrapper( + def wrapper( # type: ignore[misc] runs: Sequence[schemas.Run], examples: Sequence[schemas.Example], - evaluation_results: Sequence[ExperimentResultRow], + evaluation_results: Sequence[Sequence[EvaluationResult]], ) -> Union[EvaluationResult, EvaluationResults]: arg_map = { "runs": runs, From 9921653af89182ffbd6f2a00f1a4454fd7b38a84 Mon Sep 17 00:00:00 2001 From: Isaac Francisco <78627776+isahers1@users.noreply.github.com> Date: Mon, 23 Dec 2024 15:46:44 -0800 Subject: [PATCH 3/8] Update test_client.py --- python/tests/integration_tests/test_client.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 14d02aad3..c8146b1fd 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -1260,9 +1260,6 @@ async def test_summary_evaluation_with_evaluator_results( ) -> None: """Test summary evaluators receive evaluator results.""" dataset_name = "__test_summary_evaluation_inline_eval" + uuid4().hex[:4] - langchain_client = Client( - api_key="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" - ) dataset = langchain_client.create_dataset( dataset_name, description="Test dataset for evals with attachments", From ab84861e3bb10168573fc31334f1532f3615a86f Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 18:49:46 -0500 Subject: [PATCH 4/8] fmt --- python/langsmith/evaluation/_runner.py | 1 - python/tests/integration_tests/test_client.py | 14 +++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git 
a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 4e9970a95..f3901cc6a 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1433,7 +1433,6 @@ def with_evaluators( # Split the generator into three so the manager # can consume each value individually. r1, r2, r3 = itertools.tee(experiment_results, 3) - # print("FOOOO", [result["evaluation_results"] for result in r3]) return _ExperimentManager( (result["example"] for result in r1), experiment=self._experiment, diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 14d02aad3..25536149d 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -24,11 +24,11 @@ from langsmith.schemas import ( AttachmentsOperations, DataType, + EvaluationResult, Example, ExampleUpdateWithAttachments, ExampleUploadWithAttachments, ExampleUpsertWithAttachments, - ExperimentResultRow, Run, ) from langsmith.utils import ( @@ -1260,9 +1260,6 @@ async def test_summary_evaluation_with_evaluator_results( ) -> None: """Test summary evaluators receive evaluator results.""" dataset_name = "__test_summary_evaluation_inline_eval" + uuid4().hex[:4] - langchain_client = Client( - api_key="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" - ) dataset = langchain_client.create_dataset( dataset_name, description="Test dataset for evals with attachments", @@ -1286,12 +1283,11 @@ async def target_async(inputs: Dict[str, Any]) -> Dict[str, Any]: def evaluator(outputs: dict, reference_outputs: dict) -> dict: return {"score": 1, "key": "foo"} - def summary_evaluator(evaluation_results: list[ExperimentResultRow]) -> dict: + def summary_evaluator(evaluation_results: list[EvaluationResult]) -> bool: assert len(evaluation_results) == 1 - assert evaluation_results[0]["evaluation_results"]["results"][0].key == "foo" - assert evaluation_results[0]["evaluation_results"]["results"][0].score == 1 - assert evaluation_results[0]["example"].id == example_id - return 1 + assert evaluation_results[0][0].key == "foo" + assert evaluation_results[0][0].score == 1 + return True results = langchain_client.evaluate( target, From 197c037c7e24837a6f078c6e6be83dbab7d75f8c Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 19:03:42 -0500 Subject: [PATCH 5/8] fmt --- python/tests/integration_tests/test_client.py | 60 ------------------- .../unit_tests/evaluation/test_runner.py | 45 ++++++++------ 2 files changed, 27 insertions(+), 78 deletions(-) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 25536149d..3bcd9d04c 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -24,7 +24,6 @@ from langsmith.schemas import ( AttachmentsOperations, DataType, - EvaluationResult, Example, ExampleUpdateWithAttachments, ExampleUploadWithAttachments, @@ -1255,65 +1254,6 @@ def test_list_examples_attachments_keys(langchain_client: Client) -> None: langchain_client.delete_dataset(dataset_id=dataset.id) -async def test_summary_evaluation_with_evaluator_results( - langchain_client: Client, -) -> None: - """Test summary evaluators receive evaluator results.""" - dataset_name = "__test_summary_evaluation_inline_eval" + uuid4().hex[:4] - dataset = langchain_client.create_dataset( - dataset_name, - description="Test dataset for evals with attachments", - data_type=DataType.kv, - ) - - example_id = 
uuid4() - langchain_client.create_example( - dataset_id=dataset.id, - inputs={"question": "What is 2+2?"}, - outputs={"answer": "4"}, - example_id=example_id, - ) - - def target(inputs: Dict[str, Any]) -> Dict[str, Any]: - return {"answer": "4"} - - async def target_async(inputs: Dict[str, Any]) -> Dict[str, Any]: - return {"answer": "4"} - - def evaluator(outputs: dict, reference_outputs: dict) -> dict: - return {"score": 1, "key": "foo"} - - def summary_evaluator(evaluation_results: list[EvaluationResult]) -> bool: - assert len(evaluation_results) == 1 - assert evaluation_results[0][0].key == "foo" - assert evaluation_results[0][0].score == 1 - return True - - results = langchain_client.evaluate( - target, - data=dataset_name, - evaluators=[evaluator], - summary_evaluators=[summary_evaluator], - num_repetitions=1, - ) - assert len(results._summary_results["results"]) == 1 - assert results._summary_results["results"][0].score == 1 - assert results._summary_results["results"][0].key == "summary_evaluator" - - results = await langchain_client.aevaluate( - target_async, - data=dataset_name, - evaluators=[evaluator], - summary_evaluators=[summary_evaluator], - num_repetitions=1, - ) - assert len(results._summary_results["results"]) == 1 - assert results._summary_results["results"][0].score == 1 - assert results._summary_results["results"][0].key == "summary_evaluator" - - langchain_client.delete_dataset(dataset_id=dataset.id) - - def test_evaluate_with_attachments_multiple_evaluators( langchain_client: Client, ) -> None: diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py index e33d07fd5..9ad151bfc 100644 --- a/python/tests/unit_tests/evaluation/test_runner.py +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -265,15 +265,6 @@ def eval_list(run, example): {"score": 1, "key": "list_eval_int"}, ] - def summary_eval_runs_examples(runs_, examples_): - return {"score": len(runs_[0].dotted_order)} - - def summary_eval_inputs_outputs(inputs, outputs): - return [{"score": len([x["in"] for x in inputs])}] - - def summary_eval_outputs_reference(outputs, reference_outputs): - return len([x["answer"] for x in reference_outputs]) - evaluators = [ score_value_first, score_unpacked_inputs_outputs, @@ -285,10 +276,23 @@ def summary_eval_outputs_reference(outputs, reference_outputs): eval_list, ] + def summary_eval_runs_examples(runs_, examples_): + return {"score": len(runs_[0].dotted_order)} + + def summary_eval_inputs_outputs(inputs, outputs): + return [{"score": len([x["in"] for x in inputs])}] + + def summary_eval_outputs_reference(outputs, reference_outputs): + return len([x["answer"] for x in reference_outputs]) + + def summary_eval_evaluation_results(evaluation_results): + return all(len(r) == len(evaluators) + 1 for r in evaluation_results) + summary_evaluators = [ summary_eval_runs_examples, summary_eval_inputs_outputs, summary_eval_outputs_reference, + summary_eval_evaluation_results, ] results = evaluate( @@ -302,6 +306,7 @@ def summary_eval_outputs_reference(outputs, reference_outputs): upload_results=upload_results, max_concurrency=None, ) + if not blocking: deltas = [] last = None @@ -557,15 +562,6 @@ async def eval_list(run, example): {"score": 1, "key": "list_eval_int"}, ] - def summary_eval_runs_examples(runs_, examples_): - return {"score": len(runs_[0].dotted_order)} - - def summary_eval_inputs_outputs(inputs, outputs): - return {"score": len([x["in"] for x in inputs])} - - def summary_eval_outputs_reference(outputs, 
reference_outputs): - return {"score": len([x["answer"] for x in reference_outputs])} - evaluators = [ score_value_first, score_unpacked_inputs_outputs, @@ -577,10 +573,23 @@ def summary_eval_outputs_reference(outputs, reference_outputs): eval_list, ] + def summary_eval_runs_examples(runs_, examples_): + return {"score": len(runs_[0].dotted_order)} + + def summary_eval_inputs_outputs(inputs, outputs): + return {"score": len([x["in"] for x in inputs])} + + def summary_eval_outputs_reference(outputs, reference_outputs): + return {"score": len([x["answer"] for x in reference_outputs])} + + def summary_eval_evaluation_results(evaluation_results): + return all(len(r) == len(evaluators) + 1 for r in evaluation_results) + summary_evaluators = [ summary_eval_runs_examples, summary_eval_inputs_outputs, summary_eval_outputs_reference, + summary_eval_evaluation_results, ] results = await aevaluate( From dd0621a8a24686077229127dc33f27e44f846638 Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 19:13:03 -0500 Subject: [PATCH 6/8] fmt --- python/docs/create_api_rst.py | 4 +++- python/tests/unit_tests/evaluation/test_runner.py | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/python/docs/create_api_rst.py b/python/docs/create_api_rst.py index 2be0a1525..83fbc4757 100644 --- a/python/docs/create_api_rst.py +++ b/python/docs/create_api_rst.py @@ -105,7 +105,9 @@ def _load_module_members(module_path: str, namespace: str) -> ModuleMembers: else ( "enum" if issubclass(type_, Enum) - else "Pydantic" if issubclass(type_, BaseModel) else "Regular" + else "Pydantic" + if issubclass(type_, BaseModel) + else "Regular" ) ) # if hasattr(type_, "__slots__"): diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py index 9ad151bfc..eae858247 100644 --- a/python/tests/unit_tests/evaluation/test_runner.py +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -18,7 +18,7 @@ import pytest from langchain_core.runnables import chain as as_runnable -from langsmith import Client, aevaluate, evaluate +from langsmith import Client, EvaluationResult, aevaluate, evaluate from langsmith import schemas as ls_schemas from langsmith.evaluation._runner import _include_attachments from langsmith.evaluation.evaluator import ( @@ -978,6 +978,10 @@ def summary_eval_outputs_reference(outputs, reference_outputs): return min([len(x["response"]) for x in outputs]) +def summary_eval_outputs_reference(evaluation_results): + return len(evaluation_results) + + @pytest.mark.parametrize( "evaluator", [ @@ -1004,7 +1008,8 @@ def test__normalize_summary_evaluator(evaluator: Callable) -> None: inputs={"in": "b" * 12}, ) ] - assert normalized(runs, examples)["score"] == 12 + evaluation_results = [EvaluationResult(key="foo", score=1)] * 12 + assert normalized(runs, examples, evaluation_results)["score"] == 12 def summary_eval_kwargs(*, runs, examples): From f83a4a84418164122a970a91a8754b12a6a5dab0 Mon Sep 17 00:00:00 2001 From: Bagatur Date: Mon, 23 Dec 2024 19:47:28 -0500 Subject: [PATCH 7/8] fmt --- python/docs/create_api_rst.py | 4 +--- python/langsmith/evaluation/_runner.py | 4 ++-- python/tests/unit_tests/evaluation/test_runner.py | 3 ++- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/python/docs/create_api_rst.py b/python/docs/create_api_rst.py index 83fbc4757..2be0a1525 100644 --- a/python/docs/create_api_rst.py +++ b/python/docs/create_api_rst.py @@ -105,9 +105,7 @@ def _load_module_members(module_path: str, namespace: str) -> 
ModuleMembers: else ( "enum" if issubclass(type_, Enum) - else "Pydantic" - if issubclass(type_, BaseModel) - else "Regular" + else "Pydantic" if issubclass(type_, BaseModel) else "Regular" ) ) # if hasattr(type_, "__slots__"): diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index f3901cc6a..5148e99be 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1665,14 +1665,14 @@ def _score( yield result def _apply_summary_evaluators( - self, - summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], + self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] ) -> Generator[EvaluationResults, None, None]: runs, examples, evaluation_results = [], [], [] for row in self.get_results(): runs.append(row["run"]) examples.append(row["example"]) evaluation_results.append(row["evaluation_results"]["results"]) + aggregate_feedback = [] with ls_utils.ContextThreadPoolExecutor() as executor: project_id = self._get_experiment().id if self._upload_results else None diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py index eae858247..1b18b577c 100644 --- a/python/tests/unit_tests/evaluation/test_runner.py +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -978,7 +978,7 @@ def summary_eval_outputs_reference(outputs, reference_outputs): return min([len(x["response"]) for x in outputs]) -def summary_eval_outputs_reference(evaluation_results): +def summary_eval_evaluation_results(evaluation_results): return len(evaluation_results) @@ -988,6 +988,7 @@ def summary_eval_outputs_reference(evaluation_results): summary_eval_runs_examples, summary_eval_inputs_outputs, summary_eval_outputs_reference, + summary_eval_evaluation_results, ], ) def test__normalize_summary_evaluator(evaluator: Callable) -> None: From dd50503517bdb66e21844cf8be0bdb6248722cec Mon Sep 17 00:00:00 2001 From: isaac hershenson Date: Tue, 24 Dec 2024 08:26:35 -0800 Subject: [PATCH 8/8] fmt --- python/langsmith/evaluation/_arunner.py | 15 +++++++++++---- python/langsmith/evaluation/_runner.py | 18 ++++++++++-------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 80ba8db17..8a612b0b9 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -858,10 +858,17 @@ async def _aapply_summary_evaluators( summary_evaluators: Sequence[SUMMARY_EVALUATOR_T], ) -> AsyncIterator[EvaluationResults]: runs, examples, evaluation_results = [], [], [] - async for row in self.aget_results(): - runs.append(row["run"]) - examples.append(row["example"]) - evaluation_results.append(row["evaluation_results"]["results"]) + + async_examples = aitertools.ensure_async_iterator(await self.aget_examples()) + async for run, example in aitertools.async_zip( + self.aget_runs(), async_examples + ): + runs.append(run) + examples.append(example) + + async for evaluation_result in self.aget_evaluation_results(): + evaluation_results.append(evaluation_result["results"]) + aggregate_feedback = [] project_id = self._get_experiment().id if self._upload_results else None current_context = rh.get_tracing_context() diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 5148e99be..dbed77c2a 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1668,10 +1668,12 @@ def _apply_summary_evaluators( self, 
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] ) -> Generator[EvaluationResults, None, None]: runs, examples, evaluation_results = [], [], [] - for row in self.get_results(): - runs.append(row["run"]) - examples.append(row["example"]) - evaluation_results.append(row["evaluation_results"]["results"]) + for run, example in zip(self.runs, self.examples): + runs.append(run) + examples.append(example) + + for evaluation_result in self.evaluation_results: + evaluation_results.append(evaluation_result["results"]) aggregate_feedback = [] with ls_utils.ContextThreadPoolExecutor() as executor: @@ -1791,15 +1793,15 @@ def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T: @functools.wraps(evaluator) def _wrapper_inner( - runs: list[schemas.Run], - examples: list[schemas.Example], - evaluation_results: list[list[EvaluationResult]], + runs: Sequence[schemas.Run], + examples: Sequence[schemas.Example], + evaluation_results: Sequence[list[EvaluationResult]], ) -> Union[EvaluationResult, EvaluationResults]: @rh.traceable(name=eval_name) def _wrapper_super_inner( runs_: str, examples_: str, evaluation_results_: str ) -> Union[EvaluationResult, EvaluationResults]: - return evaluator(runs, examples, evaluation_results) + return evaluator(list(runs), list(examples), list(evaluation_results)) return _wrapper_super_inner( f"Runs[] (Length={len(runs)})",
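
Taken together, this series changes the summary-evaluator interface so that, in addition to `runs` and `examples`, a summary evaluator can request an `evaluation_results` argument: one list of `EvaluationResult` objects per example row, collected from the per-example evaluators (see `_apply_summary_evaluators`, `_aapply_summary_evaluators`, and the unit tests in patches 5-7). The sketch below illustrates how user code might consume the new argument; it is not part of the patches themselves, and the target function, the "correctness" feedback key, and the dataset name "my-dataset" are hypothetical placeholders. The imports mirror those used in the series' unit tests.

from langsmith import Client, EvaluationResult, evaluate

def target(inputs: dict) -> dict:
    # Hypothetical system under test.
    return {"answer": "4"}

def correctness(outputs: dict, reference_outputs: dict) -> dict:
    # Per-example evaluator; emits one feedback dict per row.
    return {
        "key": "correctness",
        "score": float(outputs["answer"] == reference_outputs["answer"]),
    }

def pass_rate(evaluation_results: list[list[EvaluationResult]]) -> dict:
    # New in this series: each element is the list of per-example feedback
    # produced for one row, so row-level results can be aggregated directly.
    scores = [
        result.score
        for row_results in evaluation_results
        for result in row_results
        if result.key == "correctness" and result.score is not None
    ]
    return {
        "key": "pass_rate",
        "score": sum(scores) / len(scores) if scores else 0.0,
    }

results = evaluate(
    target,
    data="my-dataset",
    evaluators=[correctness],
    summary_evaluators=[pass_rate],
    client=Client(),
)

Because `pass_rate` names only `evaluation_results` in its signature, `_normalize_summary_evaluator` maps that argument by name, just as it does for `inputs`, `outputs`, and `reference_outputs`; evaluators that still take the old `(runs, examples)` pair continue to work via the wrapper added in patch 1.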