From 842e12a21e7bd1d8efcbbb2bd6b18e594c7f033d Mon Sep 17 00:00:00 2001
From: William FH <13333726+hinthornw@users.noreply.github.com>
Date: Wed, 11 Sep 2024 11:36:11 -0700
Subject: [PATCH] [Python:fix] In aevaluate, ensure thread to submit feedback
 remains open (#995)

---
 python/langsmith/__init__.py               |  9 ++++
 python/langsmith/evaluation/_arunner.py    | 20 ++++----
 python/pyproject.toml                      |  2 +-
 python/tests/evaluation/test_evaluation.py | 53 +++++++++++++++++++++-
 4 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/python/langsmith/__init__.py b/python/langsmith/__init__.py
index a20f5b2f9..f3a1de90a 100644
--- a/python/langsmith/__init__.py
+++ b/python/langsmith/__init__.py
@@ -72,10 +72,19 @@ def __getattr__(name: str) -> Any:
         from langsmith.evaluation import evaluate
 
         return evaluate
+
+    elif name == "evaluate_existing":
+        from langsmith.evaluation import evaluate_existing
+
+        return evaluate_existing
     elif name == "aevaluate":
         from langsmith.evaluation import aevaluate
 
         return aevaluate
+    elif name == "aevaluate_existing":
+        from langsmith.evaluation import aevaluate_existing
+
+        return aevaluate_existing
     elif name == "tracing_context":
         from langsmith.run_helpers import tracing_context
 
diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index 79754261c..732ce94bd 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -102,8 +102,7 @@ async def aevaluate(
 
     Examples:
         >>> from typing import Sequence
-        >>> from langsmith import Client
-        >>> from langsmith.evaluation import evaluate
+        >>> from langsmith import Client, aevaluate
         >>> from langsmith.schemas import Example, Run
         >>> client = Client()
         >>> dataset = client.clone_public_dataset(
@@ -206,7 +205,7 @@ async def aevaluate(
 
         >>> async def helpfulness(run: Run, example: Example):
         ...     # Row-level evaluator for helpfulness.
-        ...     await asyncio.sleep(0.1)  # Replace with your LLM API call
+        ...     await asyncio.sleep(5)  # Replace with your LLM API call
        ...     return {"score": run.outputs["output"] == "Yes"}
 
        >>> results = asyncio.run(
@@ -286,7 +285,7 @@ async def aevaluate_existing(
 
        Load the experiment and run the evaluation.
 
-        >>> from langsmith.evaluation import aevaluate, aevaluate_existing
+        >>> from langsmith import aevaluate, aevaluate_existing
        >>> dataset_name = "Evaluate Examples"
        >>> async def apredict(inputs: dict) -> dict:
        ...     # This can be any async function or just an API call to your app.
@@ -603,18 +602,19 @@ async def _ascore(
         evaluators: Sequence[RunEvaluator],
         max_concurrency: Optional[int] = None,
     ) -> AsyncIterator[ExperimentResultRow]:
-        async def score_all():
-            with cf.ThreadPoolExecutor(max_workers=4) as executor:
+        with cf.ThreadPoolExecutor(max_workers=4) as executor:
+
+            async def score_all():
                 async for current_results in self.aget_results():
                     # Yield the coroutine to be awaited later in aiter_with_concurrency
                     yield self._arun_evaluators(
                         evaluators, current_results, executor=executor
                     )
 
-        async for result in aitertools.aiter_with_concurrency(
-            max_concurrency, score_all(), _eager_consumption_timeout=0.001
-        ):
-            yield result
+            async for result in aitertools.aiter_with_concurrency(
+                max_concurrency, score_all(), _eager_consumption_timeout=0.001
+            ):
+                yield result
 
     async def _arun_evaluators(
         self,
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 8902a3d37..bd48e58e4 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "langsmith"
-version = "0.1.117"
+version = "0.1.118"
 description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
 authors = ["LangChain <support@langchain.dev>"]
 license = "MIT"
diff --git a/python/tests/evaluation/test_evaluation.py b/python/tests/evaluation/test_evaluation.py
index c654a2b58..e2917afde 100644
--- a/python/tests/evaluation/test_evaluation.py
+++ b/python/tests/evaluation/test_evaluation.py
@@ -1,11 +1,36 @@
 import asyncio
-from typing import Sequence
+import time
+from typing import Callable, Sequence, Tuple, TypeVar
 
 import pytest
 
 from langsmith import Client, aevaluate, evaluate, expect, test
 from langsmith.schemas import Example, Run
 
+T = TypeVar("T")
+
+
+def wait_for(
+    condition: Callable[[], Tuple[T, bool]],
+    max_sleep_time: int = 120,
+    sleep_time: int = 3,
+) -> T:
+    """Wait for a condition to be true."""
+    start_time = time.time()
+    last_e = None
+    while time.time() - start_time < max_sleep_time:
+        try:
+            res, cond = condition()
+            if cond:
+                return res
+        except Exception as e:
+            last_e = e
+        time.sleep(sleep_time)
+    total_time = time.time() - start_time
+    if last_e is not None:
+        raise last_e
+    raise ValueError(f"Callable did not return within {total_time} seconds")
+
 
 def test_evaluate():
     client = Client()
@@ -59,6 +84,12 @@ def accuracy(run: Run, example: Example):
         expected = example.outputs["answer"]  # type: ignore
         return {"score": expected.lower() == pred.lower()}
 
+    async def slow_accuracy(run: Run, example: Example):
+        pred = run.outputs["output"]  # type: ignore
+        expected = example.outputs["answer"]  # type: ignore
+        await asyncio.sleep(5)
+        return {"score": expected.lower() == pred.lower()}
+
     def precision(runs: Sequence[Run], examples: Sequence[Example]):
         predictions = [run.outputs["output"].lower() for run in runs]  # type: ignore
         expected = [example.outputs["answer"].lower() for example in examples]  # type: ignore
@@ -73,7 +104,7 @@ async def apredict(inputs: dict) -> dict:
     results = await aevaluate(
         apredict,
         data=dataset_name,
-        evaluators=[accuracy],
+        evaluators=[accuracy, slow_accuracy],
         summary_evaluators=[precision],
         experiment_prefix="My Experiment",
         description="My Experiment Description",
@@ -86,12 +117,30 @@ async def apredict(inputs: dict) -> dict:
     assert len(results) == 20
     examples = client.list_examples(dataset_name=dataset_name)
     all_results = [r async for r in results]
+    all_examples = []
     for example in examples:
         count = 0
         for r in all_results:
             if r["run"].reference_example_id == example.id:
                 count += 1
         assert count == 2
+        all_examples.append(example)
+
+    # Wait for there to be 2x runs vs. examples
+    def check_run_count():
+        current_runs = list(
+            client.list_runs(project_name=results.experiment_name, is_root=True)
+        )
+        for r in current_runs:
+            assert "accuracy" in r.feedback_stats
+            assert "slow_accuracy" in r.feedback_stats
+        return current_runs, len(current_runs) == 2 * len(all_examples)
+
+    final_runs = wait_for(check_run_count, max_sleep_time=60, sleep_time=2)
+
+    assert len(final_runs) == 2 * len(
+        all_examples
+    ), f"Expected {2 * len(all_examples)} runs, but got {len(final_runs)}"
 
 
 @test
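
Editor's note: the functional change is the `_ascore` hunk above. Before the fix, the `cf.ThreadPoolExecutor` used to submit feedback lived inside `score_all()`, so it could be torn down before every evaluator's feedback had been submitted; the fix keeps the executor's `with` block open around the entire `aiter_with_concurrency` consumption. Below is a minimal, self-contained sketch of that pattern only. The names (`produce`, `score_stream`, `score_all`) and the doubled-integer "work" are illustrative stand-ins, not the LangSmith internals.

import asyncio
import concurrent.futures as cf
from typing import AsyncIterator


async def produce() -> AsyncIterator[int]:
    # Stand-in for the async stream of results being scored.
    for i in range(3):
        await asyncio.sleep(0.01)
        yield i


async def score_stream() -> AsyncIterator[int]:
    # Keep the executor alive for the *entire* consumption of the stream, so
    # any work handed to it (e.g. background feedback submission) still has a
    # live pool when each item is finally awaited.
    with cf.ThreadPoolExecutor(max_workers=4) as executor:

        async def score_all() -> AsyncIterator[int]:
            loop = asyncio.get_running_loop()
            async for item in produce():
                # Offload a blocking call to the shared executor.
                yield await loop.run_in_executor(executor, lambda x=item: x * 2)

        async for result in score_all():
            yield result


async def main() -> list:
    return [r async for r in score_stream()]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4]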