[DRAFT] update evaluate to be concurrent #1340

Closed · wants to merge 3 commits
104 changes: 79 additions & 25 deletions python/langsmith/evaluation/_arunner.py
@@ -1,4 +1,4 @@
"""V2 Evaluation Interface."""

Check notice on line 1 in python/langsmith/evaluation/_arunner.py (GitHub Actions / benchmark)

Benchmark results:

  create_5_000_run_trees:                         669 ms +- 74 ms      (unstable: std dev is 11% of the mean)
  create_10_000_run_trees:                        1.38 sec +- 0.14 sec (unstable: std dev is 10% of the mean)
  create_20_000_run_trees:                        1.40 sec +- 0.23 sec (unstable: std dev is 17% of the mean)
  dumps_class_nested_py_branch_and_leaf_200x400:  690 us +- 23 us
  dumps_class_nested_py_leaf_50x100:              25.0 ms +- 0.5 ms
  dumps_class_nested_py_leaf_100x200:             104 ms +- 2 ms
  dumps_dataclass_nested_50x100:                  25.4 ms +- 0.2 ms
  dumps_pydantic_nested_50x100:                   70.6 ms +- 15.1 ms   (unstable: std dev is 21% of the mean)
  dumps_pydanticv1_nested_50x100:                 196 ms +- 2 ms

For the unstable results, pyperf suggests rerunning with more runs, values, and/or loops, running 'python -m pyperf system tune' to reduce system jitter, and using pyperf stats, dump, and hist to analyze the results.

Check notice on line 1 in python/langsmith/evaluation/_arunner.py (GitHub Actions / benchmark)

Comparison against main:

+-----------------------------------------------+----------+------------------------+
| Benchmark                                     | main     | changes                |
+===============================================+==========+========================+
| dumps_pydanticv1_nested_50x100                | 217 ms   | 196 ms: 1.10x faster   |
+-----------------------------------------------+----------+------------------------+
| create_5_000_run_trees                        | 721 ms   | 669 ms: 1.08x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_branch_and_leaf_200x400 | 695 us   | 690 us: 1.01x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_100x200            | 103 ms   | 104 ms: 1.00x slower   |
+-----------------------------------------------+----------+------------------------+
| create_10_000_run_trees                       | 1.37 sec | 1.38 sec: 1.01x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_50x100             | 24.8 ms  | 25.0 ms: 1.01x slower  |
+-----------------------------------------------+----------+------------------------+
| dumps_dataclass_nested_50x100                 | 25.2 ms  | 25.4 ms: 1.01x slower  |
+-----------------------------------------------+----------+------------------------+
| create_20_000_run_trees                       | 1.36 sec | 1.40 sec: 1.02x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_pydantic_nested_50x100                  | 64.9 ms  | 70.6 ms: 1.09x slower  |
+-----------------------------------------------+----------+------------------------+
| Geometric mean                                | (ref)    | 1.01x faster           |
+-----------------------------------------------+----------+------------------------+

from __future__ import annotations

@@ -491,15 +491,24 @@
         cache_path = None
     with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
         if is_async_target:
-            manager = await manager.awith_predictions(
-                cast(ATARGET_T, target), max_concurrency=max_concurrency
-            )
-        if evaluators:
-            manager = await manager.awith_evaluators(
-                evaluators, max_concurrency=max_concurrency
-            )
-        if summary_evaluators:
-            manager = await manager.awith_summary_evaluators(summary_evaluators)
+            if evaluators:
+                # Run predictions and evaluations in a single pipeline
+                manager = await manager.awith_predictions_and_evaluators(
+                    cast(ATARGET_T, target), evaluators, max_concurrency=max_concurrency
+                )
Comment on lines +496 to +498 (Contributor, Author):

    Summary evaluators are still run after all the predictions and evaluations have completed. That can change in the future, but I think it is much less of a bottleneck.

+            else:
+                manager = await manager.awith_predictions(
+                    cast(ATARGET_T, target), max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
+        else:
+            if evaluators:
+                manager = await manager.awith_evaluators(
+                    evaluators, max_concurrency=max_concurrency
+                )
+            if summary_evaluators:
+                manager = await manager.awith_summary_evaluators(summary_evaluators)
         results = AsyncExperimentResults(manager)
         if blocking:
             await results.wait()
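
For reference, here is a usage-level sketch (not part of the PR) of the call path this hunk changes: an async target evaluated with evaluators supplied, which now routes through the pipelined awith_predictions_and_evaluators instead of running all predictions before any evaluation. The dataset name, target, and evaluator below are made-up placeholders.

```python
import asyncio

from langsmith.evaluation import aevaluate  # public async evaluation entry point


async def target(inputs: dict) -> dict:
    # Hypothetical async target: echo the question back.
    return {"answer": f"echo: {inputs['question']}"}


async def exact_match(run, example) -> dict:
    # Hypothetical evaluator: compare the run's outputs to the reference outputs.
    return {"key": "exact_match", "score": run.outputs == example.outputs}


async def main() -> None:
    # With an async target *and* evaluators, the code above takes the
    # awith_predictions_and_evaluators branch, so each example's evaluators run
    # as soon as its prediction finishes rather than after all predictions.
    await aevaluate(
        target,
        data="my-dataset",          # placeholder dataset name
        evaluators=[exact_match],
        max_concurrency=4,
    )


asyncio.run(main())
```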
@@ -642,6 +651,56 @@
             upload_results=self._upload_results,
         )

+    async def awith_predictions_and_evaluators(
+        self,
+        target: ATARGET_T,
+        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
+        /,
+        max_concurrency: Optional[int] = None,
+    ) -> _AsyncExperimentManager:
+        """Run predictions and evaluations in a single pipeline.
+
+        This allows evaluators to process results as soon as they're available from
+        the target function, rather than waiting for all predictions to complete first.
+        """
+        evaluators = _resolve_evaluators(evaluators)
+
+        if not hasattr(self, '_evaluator_executor'):
+            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
Comment (Contributor, Author):

    I don't like doing this, but I couldn't figure out another way to pass an executor that doesn't trigger a raise RuntimeError('cannot schedule new futures after shutdown') from _log_evaluation_feedback. I also don't think this executor is ever shut down, and I'm not 100% sure where to do that.

+        async def process_examples():
+            async for pred in self._apredict(
+                target,
+                max_concurrency=max_concurrency,
+                include_attachments=_include_attachments(target),
+            ):
+                example, run = pred["example"], pred["run"]
+                result = self._arun_evaluators(
+                    evaluators,
+                    {"run": run, "example": example, "evaluation_results": {"results": []}},
+                    executor=self._evaluator_executor,
+                )
+                yield result
+
+        experiment_results = aitertools.aiter_with_concurrency(
+            max_concurrency,
+            process_examples(),
+            _eager_consumption_timeout=0.001,
+        )
+
+        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
+
+        return _AsyncExperimentManager(
+            (result["example"] async for result in r1),
+            experiment=self._experiment,
+            metadata=self._metadata,
+            client=self.client,
+            runs=(result["run"] async for result in r2),
+            evaluation_results=(result["evaluation_results"] async for result in r3),
+            summary_results=self._summary_results,
+            include_attachments=self._include_attachments,
+            upload_results=self._upload_results,
+        )
+
     async def awith_predictions(
         self,
         target: ATARGET_T,
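
On the executor-lifecycle question in the comment above: a minimal sketch (not the PR's implementation) of one way to own the lazily created ThreadPoolExecutor and shut it down once results are fully consumed. The class and the aclose hook are hypothetical names used for illustration.

```python
import concurrent.futures as cf
from typing import Optional


class ExecutorOwner:
    """Sketch: lazily create a shared evaluator executor and shut it down explicitly."""

    def __init__(self) -> None:
        self._evaluator_executor: Optional[cf.ThreadPoolExecutor] = None

    @property
    def evaluator_executor(self) -> cf.ThreadPoolExecutor:
        # Same lazy creation as the PR, but behind a property so that a single
        # place knows whether the executor exists yet.
        if self._evaluator_executor is None:
            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
        return self._evaluator_executor

    async def aclose(self) -> None:
        # Hypothetical cleanup hook: call it after the experiment results have
        # been fully consumed (e.g. at the end of AsyncExperimentResults.wait()).
        # wait=True lets queued feedback uploads finish first, which avoids the
        # "cannot schedule new futures after shutdown" error.
        if self._evaluator_executor is not None:
            self._evaluator_executor.shutdown(wait=True)
            self._evaluator_executor = None
```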
@@ -796,15 +855,17 @@
             run = current_results["run"]
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
-            for evaluator in evaluators:
+            lock = asyncio.Lock()
Comment (Contributor, Author):

    I don't think the lock is necessary; I can remove it if wanted.

Comment (Contributor):

    If it's not necessary, remove it. Is it only used for .extend()? I believe .extend is atomic:
    https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe

+            async def _run_single_evaluator(evaluator):
                 try:
                     evaluator_response = await evaluator.aevaluate_run(
                         run=run,
                         example=example,
                     )
-                    eval_results["results"].extend(
-                        self.client._select_eval_results(evaluator_response)
-                    )
+                    selected_results = self.client._select_eval_results(evaluator_response)
+                    async with lock:
+                        eval_results["results"].extend(selected_results)
+
                     if self._upload_results:
                         self.client._log_evaluation_feedback(
                             evaluator_response, run=run, _executor=executor
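
A small standalone sketch (not from the PR) of the reviewer's point about the lock: these coroutines all run on one event loop thread, and each list.extend call completes without an intervening await, so the extends cannot interleave even without a lock (and in the threaded case, CPython's list.extend is atomic per the linked FAQ).

```python
import asyncio


async def main() -> None:
    results: list[int] = []

    async def worker(i: int) -> None:
        # Simulate an evaluator finishing at an arbitrary time.
        await asyncio.sleep(0.001 * (i % 5))
        # A single .extend() call runs to completion before the event loop can
        # switch to another coroutine (there is no await inside it), so no
        # items are lost or torn even without a lock.
        results.extend([i, i, i])

    await asyncio.gather(*(worker(i) for i in range(100)))
    assert len(results) == 300
    print("all extends landed:", len(results))


asyncio.run(main())
```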
@@ -824,9 +885,9 @@
                                 for key in feedback_keys
                             ]
                         )
-                        eval_results["results"].extend(
-                            self.client._select_eval_results(error_response)
-                        )
+                        selected_results = self.client._select_eval_results(error_response)
+                        async with lock:
+                            eval_results["results"].extend(selected_results)
                         if self._upload_results:
                             self.client._log_evaluation_feedback(
                                 error_response, run=run, _executor=executor
@@ -839,15 +900,8 @@
f" run {run.id}: {repr(e)}",
exc_info=True,
)
logger.error(
f"Error running evaluator {repr(evaluator)} on"
f" run {run.id}: {repr(e)}",
exc_info=True,
)
if example.attachments is not None:
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)
Comment on lines -847 to -850 (Contributor, Author):

    This no longer works because the evaluators run in parallel. I don't know the ideal solution here and am open to any and all ideas.


+            await asyncio.gather(*[_run_single_evaluator(evaluator) for evaluator in evaluators])
             return ExperimentResultRow(
                 run=run,
                 example=example,
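
On the attachment comment above: one possible direction (a sketch, not the PR's solution) is to snapshot each attachment's bytes once before the evaluators start and hand every evaluator its own independent reader, so concurrently running evaluators never share a file position. Both helper names below are hypothetical, and the attachment dicts are simplified to just a "reader" key.

```python
import io
from typing import Dict


def snapshot_attachments(attachments: Dict[str, dict]) -> Dict[str, bytes]:
    """Hypothetical helper: read each attachment once, before any evaluator runs,
    so the shared readers are only ever touched sequentially."""
    data = {}
    for name, attachment in attachments.items():
        reader = attachment["reader"]
        reader.seek(0)
        data[name] = reader.read()
        reader.seek(0)  # leave the original reader rewound for other consumers
    return data


def fresh_attachment_readers(snapshot: Dict[str, bytes]) -> Dict[str, dict]:
    """Hypothetical helper: build an independent reader per evaluator from the
    snapshot, so parallel evaluators cannot disturb each other's position."""
    return {name: {"reader": io.BytesIO(blob)} for name, blob in snapshot.items()}
```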