From 97b6b9f9f7aced9f2ea6e0640f9a0c097cf6bfba Mon Sep 17 00:00:00 2001 From: William FH <13333726+hinthornw@users.noreply.github.com> Date: Wed, 4 Sep 2024 18:42:17 -0700 Subject: [PATCH] [Python] Evals: submit feedback in thread (#977) This will be even better once we add a batch endpoint. --- python/langsmith/client.py | 16 ++++++++++++--- python/langsmith/evaluation/_arunner.py | 14 ++++++++----- python/langsmith/evaluation/_runner.py | 26 +++++++++++++++---------- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 3e398c808..b5df51267 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -3938,8 +3938,17 @@ def _log_evaluation_feedback( run: Optional[ls_schemas.Run] = None, source_info: Optional[Dict[str, Any]] = None, project_id: Optional[ID_TYPE] = None, + *, + _executor: Optional[cf.ThreadPoolExecutor] = None, ) -> List[ls_evaluator.EvaluationResult]: results = self._select_eval_results(evaluator_response) + + def _submit_feedback(**kwargs): + if _executor: + _executor.submit(self.create_feedback, **kwargs) + else: + self.create_feedback(**kwargs) + for res in results: source_info_ = source_info or {} if res.evaluator_info: @@ -3949,9 +3958,10 @@ def _log_evaluation_feedback( run_id_ = res.target_run_id elif run is not None: run_id_ = run.id - self.create_feedback( - run_id_, - res.key, + + _submit_feedback( + run_id=run_id_, + key=res.key, score=res.score, value=res.value, comment=res.comment, diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 20021480f..79754261c 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import concurrent.futures as cf import datetime import logging import pathlib @@ -603,9 +604,12 @@ async def _ascore( max_concurrency: Optional[int] = None, ) -> 
AsyncIterator[ExperimentResultRow]: async def score_all(): - async for current_results in self.aget_results(): - # Yield the coroutine to be awaited later in aiter_with_concurrency - yield self._arun_evaluators(evaluators, current_results) + with cf.ThreadPoolExecutor(max_workers=4) as executor: + async for current_results in self.aget_results(): + # Yield the coroutine to be awaited later in aiter_with_concurrency + yield self._arun_evaluators( + evaluators, current_results, executor=executor + ) async for result in aitertools.aiter_with_concurrency( max_concurrency, score_all(), _eager_consumption_timeout=0.001 @@ -616,6 +620,7 @@ async def _arun_evaluators( self, evaluators: Sequence[RunEvaluator], current_results: ExperimentResultRow, + executor: cf.ThreadPoolExecutor, ) -> ExperimentResultRow: current_context = rh.get_tracing_context() metadata = { @@ -642,8 +647,7 @@ async def _arun_evaluators( ) eval_results["results"].extend( self.client._log_evaluation_feedback( - evaluator_response, - run=run, + evaluator_response, run=run, _executor=executor ) ) except Exception as e: diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 430bb537c..4742a99b9 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1294,6 +1294,7 @@ def _run_evaluators( self, evaluators: Sequence[RunEvaluator], current_results: ExperimentResultRow, + executor: cf.ThreadPoolExecutor, ) -> ExperimentResultRow: current_context = rh.get_tracing_context() metadata = { @@ -1325,8 +1326,7 @@ def _run_evaluators( eval_results["results"].extend( # TODO: This is a hack self.client._log_evaluation_feedback( - evaluator_response, - run=run, + evaluator_response, run=run, _executor=executor ) ) except Exception as e: @@ -1351,14 +1351,19 @@ def _score( Expects runs to be available in the manager. (e.g. 
from a previous prediction step) """ - if max_concurrency == 0: - context = copy_context() - for current_results in self.get_results(): - yield context.run(self._run_evaluators, evaluators, current_results) - else: - with ls_utils.ContextThreadPoolExecutor( - max_workers=max_concurrency - ) as executor: + with ls_utils.ContextThreadPoolExecutor( + max_workers=max_concurrency + ) as executor: + if max_concurrency == 0: + context = copy_context() + for current_results in self.get_results(): + yield context.run( + self._run_evaluators, + evaluators, + current_results, + executor=executor, + ) + else: futures = set() for current_results in self.get_results(): futures.add( @@ -1366,6 +1371,7 @@ def _score( self._run_evaluators, evaluators, current_results, + executor=executor, ) ) try: