Skip to content

Commit

Permalink
[Python] Evals: submit feedback in thread (#977)
Browse files Browse the repository at this point in the history
And this will be even better once we add a batch endpoint.
  • Loading branch information
hinthornw authored Sep 5, 2024
1 parent daf3368 commit 97b6b9f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
16 changes: 13 additions & 3 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3938,8 +3938,17 @@ def _log_evaluation_feedback(
run: Optional[ls_schemas.Run] = None,
source_info: Optional[Dict[str, Any]] = None,
project_id: Optional[ID_TYPE] = None,
*,
_executor: Optional[cf.ThreadPoolExecutor] = None,
) -> List[ls_evaluator.EvaluationResult]:
results = self._select_eval_results(evaluator_response)

def _submit_feedback(**kwargs):
if _executor:
_executor.submit(self.create_feedback, **kwargs)
else:
self.create_feedback(**kwargs)

for res in results:
source_info_ = source_info or {}
if res.evaluator_info:
Expand All @@ -3949,9 +3958,10 @@ def _log_evaluation_feedback(
run_id_ = res.target_run_id
elif run is not None:
run_id_ = run.id
self.create_feedback(
run_id_,
res.key,

_submit_feedback(
run_id=run_id_,
key=res.key,
score=res.score,
value=res.value,
comment=res.comment,
Expand Down
14 changes: 9 additions & 5 deletions python/langsmith/evaluation/_arunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
import concurrent.futures as cf
import datetime
import logging
import pathlib
Expand Down Expand Up @@ -603,9 +604,12 @@ async def _ascore(
max_concurrency: Optional[int] = None,
) -> AsyncIterator[ExperimentResultRow]:
async def score_all():
async for current_results in self.aget_results():
# Yield the coroutine to be awaited later in aiter_with_concurrency
yield self._arun_evaluators(evaluators, current_results)
with cf.ThreadPoolExecutor(max_workers=4) as executor:
async for current_results in self.aget_results():
# Yield the coroutine to be awaited later in aiter_with_concurrency
yield self._arun_evaluators(
evaluators, current_results, executor=executor
)

async for result in aitertools.aiter_with_concurrency(
max_concurrency, score_all(), _eager_consumption_timeout=0.001
Expand All @@ -616,6 +620,7 @@ async def _arun_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
Expand All @@ -642,8 +647,7 @@ async def _arun_evaluators(
)
eval_results["results"].extend(
self.client._log_evaluation_feedback(
evaluator_response,
run=run,
evaluator_response, run=run, _executor=executor
)
)
except Exception as e:
Expand Down
26 changes: 16 additions & 10 deletions python/langsmith/evaluation/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,7 @@ def _run_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
Expand Down Expand Up @@ -1325,8 +1326,7 @@ def _run_evaluators(
eval_results["results"].extend(
# TODO: This is a hack
self.client._log_evaluation_feedback(
evaluator_response,
run=run,
evaluator_response, run=run, _executor=executor
)
)
except Exception as e:
Expand All @@ -1351,21 +1351,27 @@ def _score(
Expects runs to be available in the manager.
(e.g. from a previous prediction step)
"""
if max_concurrency == 0:
context = copy_context()
for current_results in self.get_results():
yield context.run(self._run_evaluators, evaluators, current_results)
else:
with ls_utils.ContextThreadPoolExecutor(
max_workers=max_concurrency
) as executor:
with ls_utils.ContextThreadPoolExecutor(
max_workers=max_concurrency
) as executor:
if max_concurrency == 0:
context = copy_context()
for current_results in self.get_results():
yield context.run(
self._run_evaluators,
evaluators,
current_results,
executor=executor,
)
else:
futures = set()
for current_results in self.get_results():
futures.add(
executor.submit(
self._run_evaluators,
evaluators,
current_results,
executor=executor,
)
)
try:
Expand Down

0 comments on commit 97b6b9f

Please sign in to comment.