Add inline evaluation results for summary evaluators #1347

Open · wants to merge 9 commits into base: main
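For context while reading the diff: based on the signature change to `_wrapper_inner` and the calls in `_apply_summary_evaluators` below, summary evaluators would now receive the per-example evaluation results as a third argument alongside `runs` and `examples`. A rough, hypothetical sketch of an evaluator written against that signature; the dataset name, target function, and the `"correctness"` feedback key are placeholders, not part of this PR:

```python
from langsmith import evaluate, schemas
from langsmith.evaluation import EvaluationResult


def correctness(run: schemas.Run, example: schemas.Example) -> dict:
    # Placeholder run-level evaluator: compares the predicted answer to the reference.
    predicted = (run.outputs or {}).get("answer")
    expected = (example.outputs or {}).get("answer")
    return {"key": "correctness", "score": float(predicted == expected)}


def correctness_pass_rate(
    runs: list[schemas.Run],
    examples: list[schemas.Example],
    evaluation_results: list[list[EvaluationResult]],
) -> dict:
    # With this PR, evaluation_results[i] would hold the run-level results for
    # runs[i]/examples[i], so the summary metric can be computed inline instead
    # of re-fetching feedback from the API.
    scores = [
        r.score
        for row in evaluation_results
        for r in row
        if r.key == "correctness" and r.score is not None
    ]
    return {
        "key": "correctness_pass_rate",
        "score": sum(scores) / len(scores) if scores else 0.0,
    }


results = evaluate(
    lambda inputs: {"answer": inputs.get("question", "")},  # placeholder target
    data="my-dataset",                                      # placeholder dataset name
    evaluators=[correctness],
    summary_evaluators=[correctness_pass_rate],
)
```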
31 changes: 18 additions & 13 deletions python/langsmith/evaluation/_arunner.py
@@ -1,4 +1,4 @@
"""V2 Evaluation Interface."""

Check notice (GitHub Actions / benchmark) on line 1 in python/langsmith/evaluation/_arunner.py: Benchmark results

- create_5_000_run_trees: 688 ms +- 101 ms (warning: std dev is 15% of the mean; result may be unstable)
- create_10_000_run_trees: 1.45 sec +- 0.23 sec (warning: std dev is 16% of the mean; result may be unstable)
- create_20_000_run_trees: 1.36 sec +- 0.18 sec (warning: std dev is 13% of the mean; result may be unstable)
- dumps_class_nested_py_branch_and_leaf_200x400: 691 us +- 7 us
- dumps_class_nested_py_leaf_50x100: 25.2 ms +- 0.2 ms
- dumps_class_nested_py_leaf_100x200: 106 ms +- 3 ms
- dumps_dataclass_nested_50x100: 25.5 ms +- 0.3 ms
- dumps_pydantic_nested_50x100: 71.6 ms +- 16.4 ms (warning: std dev is 23% of the mean; result may be unstable)
- dumps_pydanticv1_nested_50x100: 199 ms +- 2 ms

For the unstable results, pyperf suggests rerunning with more runs/values/loops or running `python -m pyperf system tune` to reduce system jitter.

Check notice (GitHub Actions / benchmark) on line 1 in python/langsmith/evaluation/_arunner.py: Comparison against main

| Benchmark                                     | main     | changes                |
|-----------------------------------------------|----------|------------------------|
| dumps_pydanticv1_nested_50x100                | 217 ms   | 199 ms: 1.09x faster   |
| create_5_000_run_trees                        | 717 ms   | 688 ms: 1.04x faster   |
| create_20_000_run_trees                       | 1.40 sec | 1.36 sec: 1.03x faster |
| dumps_class_nested_py_branch_and_leaf_200x400 | 691 us   | 691 us: 1.00x slower   |
| dumps_class_nested_py_leaf_50x100             | 25.2 ms  | 25.2 ms: 1.00x slower  |
| dumps_dataclass_nested_50x100                 | 25.5 ms  | 25.5 ms: 1.00x slower  |
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 106 ms: 1.02x slower   |
| create_10_000_run_trees                       | 1.38 sec | 1.45 sec: 1.05x slower |
| dumps_pydantic_nested_50x100                  | 66.1 ms  | 71.6 ms: 1.08x slower  |
| Geometric mean                                | (ref)    | 1.00x faster           |

from __future__ import annotations

@@ -36,7 +36,6 @@
AEVALUATOR_T,
DATA_T,
EVALUATOR_T,
ExperimentResultRow,
_evaluators_include_attachments,
_ExperimentManagerMixin,
_extract_feedback_keys,
@@ -703,11 +702,11 @@
upload_results=self._upload_results,
)

async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
async def aget_results(self) -> AsyncIterator[schemas.ExperimentResultRow]:
async for run, example, evaluation_results in aitertools.async_zip(
self.aget_runs(), await self.aget_examples(), self.aget_evaluation_results()
):
yield ExperimentResultRow(
yield schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=evaluation_results,
@@ -758,7 +757,7 @@
self,
evaluators: Sequence[RunEvaluator],
max_concurrency: Optional[int] = None,
) -> AsyncIterator[ExperimentResultRow]:
) -> AsyncIterator[schemas.ExperimentResultRow]:
with cf.ThreadPoolExecutor(max_workers=4) as executor:

async def score_all():
@@ -776,9 +775,9 @@
async def _arun_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
current_results: schemas.ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
) -> schemas.ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
**(current_context["metadata"] or {}),
@@ -848,22 +847,28 @@
for attachment in example.attachments:
reader = example.attachments[attachment]["reader"]
reader.seek(0)
return ExperimentResultRow(
return schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=eval_results,
)

async def _aapply_summary_evaluators(
self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
self,
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
) -> AsyncIterator[EvaluationResults]:
runs, examples = [], []
runs, examples, evaluation_results = [], [], []

async_examples = aitertools.ensure_async_iterator(await self.aget_examples())
async for run, example in aitertools.async_zip(
self.aget_runs(), async_examples
):
runs.append(run)
examples.append(example)

async for evaluation_result in self.aget_evaluation_results():
evaluation_results.append(evaluation_result["results"])

aggregate_feedback = []
project_id = self._get_experiment().id if self._upload_results else None
current_context = rh.get_tracing_context()
@@ -885,7 +890,7 @@
):
for evaluator in summary_evaluators:
try:
summary_eval_result = evaluator(runs, examples)
summary_eval_result = evaluator(runs, examples, evaluation_results)
flattened_results = self.client._select_eval_results(
summary_eval_result,
fn_name=evaluator.__name__,
@@ -963,7 +968,7 @@
experiment_manager: _AsyncExperimentManager,
):
self._manager = experiment_manager
self._results: List[ExperimentResultRow] = []
self._results: List[schemas.ExperimentResultRow] = []
self._lock = asyncio.Lock()
self._task = asyncio.create_task(self._process_data(self._manager))
self._processed_count = 0
@@ -972,10 +977,10 @@
def experiment_name(self) -> str:
return self._manager.experiment_name

def __aiter__(self) -> AsyncIterator[ExperimentResultRow]:
def __aiter__(self) -> AsyncIterator[schemas.ExperimentResultRow]:
return self

async def __anext__(self) -> ExperimentResultRow:
async def __anext__(self) -> schemas.ExperimentResultRow:
async def _wait_until_index(index: int) -> None:
while self._processed_count < index:
await asyncio.sleep(0.05)
50 changes: 27 additions & 23 deletions python/langsmith/evaluation/_runner.py
@@ -534,12 +534,6 @@ def evaluate_existing(
)


class ExperimentResultRow(TypedDict):
run: schemas.Run
example: schemas.Example
evaluation_results: EvaluationResults


class ExperimentResults:
"""Represents the results of an evaluate() call.

@@ -554,8 +548,8 @@ class ExperimentResults:

def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True):
self._manager = experiment_manager
self._results: List[ExperimentResultRow] = []
self._queue: queue.Queue[ExperimentResultRow] = queue.Queue()
self._results: List[schemas.ExperimentResultRow] = []
self._queue: queue.Queue[schemas.ExperimentResultRow] = queue.Queue()
self._processing_complete = threading.Event()
if not blocking:
self._thread: Optional[threading.Thread] = threading.Thread(
@@ -570,7 +564,7 @@ def __init__(self, experiment_manager: _ExperimentManager, blocking: bool = True
def experiment_name(self) -> str:
return self._manager.experiment_name

def __iter__(self) -> Iterator[ExperimentResultRow]:
def __iter__(self) -> Iterator[schemas.ExperimentResultRow]:
ix = 0
while (
not self._processing_complete.is_set()
@@ -1473,12 +1467,12 @@ def with_summary_evaluators(
upload_results=self._upload_results,
)

def get_results(self) -> Iterable[ExperimentResultRow]:
def get_results(self) -> Iterable[schemas.ExperimentResultRow]:
"""Return the traces, evaluation results, and associated examples."""
for run, example, evaluation_results in zip(
self.runs, self.examples, self.evaluation_results
):
yield ExperimentResultRow(
yield schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=evaluation_results,
@@ -1544,9 +1538,9 @@ def _predict(
def _run_evaluators(
self,
evaluators: Sequence[RunEvaluator],
current_results: ExperimentResultRow,
current_results: schemas.ExperimentResultRow,
executor: cf.ThreadPoolExecutor,
) -> ExperimentResultRow:
) -> schemas.ExperimentResultRow:
current_context = rh.get_tracing_context()
metadata = {
**(current_context["metadata"] or {}),
@@ -1619,7 +1613,7 @@ def _run_evaluators(
reader = example.attachments[attachment]["reader"]
reader.seek(0)

return ExperimentResultRow(
return schemas.ExperimentResultRow(
run=run,
example=example,
evaluation_results=eval_results,
@@ -1629,7 +1623,7 @@ def _score(
self,
evaluators: Sequence[RunEvaluator],
max_concurrency: Optional[int] = None,
) -> Iterable[ExperimentResultRow]:
) -> Iterable[schemas.ExperimentResultRow]:
"""Run the evaluators on the prediction stream.

Expects runs to be available in the manager.
@@ -1673,10 +1667,14 @@ def _score(
def _apply_summary_evaluators(
self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
) -> Generator[EvaluationResults, None, None]:
runs, examples = [], []
runs, examples, evaluation_results = [], [], []
for run, example in zip(self.runs, self.examples):
runs.append(run)
examples.append(example)

for evaluation_result in self.evaluation_results:
evaluation_results.append(evaluation_result["results"])

aggregate_feedback = []
with ls_utils.ContextThreadPoolExecutor() as executor:
project_id = self._get_experiment().id if self._upload_results else None
@@ -1699,7 +1697,9 @@ def _apply_summary_evaluators(
):
for evaluator in summary_evaluators:
try:
summary_eval_result = evaluator(runs, examples)
summary_eval_result = evaluator(
runs, examples, evaluation_results
)
# TODO: Expose public API for this.
flattened_results = self.client._select_eval_results(
summary_eval_result,
@@ -1793,16 +1793,20 @@ def _wrap(evaluator: SUMMARY_EVALUATOR_T) -> SUMMARY_EVALUATOR_T:

@functools.wraps(evaluator)
def _wrapper_inner(
runs: Sequence[schemas.Run], examples: Sequence[schemas.Example]
runs: Sequence[schemas.Run],
examples: Sequence[schemas.Example],
evaluation_results: Sequence[list[EvaluationResult]],
) -> Union[EvaluationResult, EvaluationResults]:
@rh.traceable(name=eval_name)
def _wrapper_super_inner(
runs_: str, examples_: str
runs_: str, examples_: str, evaluation_results_: str
) -> Union[EvaluationResult, EvaluationResults]:
return evaluator(list(runs), list(examples))
return evaluator(list(runs), list(examples), list(evaluation_results))

return _wrapper_super_inner(
f"Runs[] (Length={len(runs)})", f"Examples[] (Length={len(examples)})"
f"Runs[] (Length={len(runs)})",
f"Examples[] (Length={len(examples)})",
f"EvaluationResults[] (Length={len(evaluation_results)})",
)

return _wrapper_inner
@@ -2173,7 +2177,7 @@ def extract_evaluation_results_keys(node, variables):


def _to_pandas(
results: list[ExperimentResultRow],
results: list[schemas.ExperimentResultRow],
start: Optional[int] = 0,
end: Optional[int] = None,
):
@@ -2190,7 +2194,7 @@ def _flatten_experiment_results(


def _flatten_experiment_results(
results: list[ExperimentResultRow],
results: list[schemas.ExperimentResultRow],
start: Optional[int] = 0,
end: Optional[int] = None,
):
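One more reviewer note on the data shape: `_apply_summary_evaluators` and `_aapply_summary_evaluators` build the third argument by appending each row's `evaluation_result["results"]`, so entry i of `evaluation_results` appears intended to stay index-aligned with `runs[i]` and `examples[i]`. A minimal sketch that leans on that alignment, assuming the three streams iterate in the same order; the `"helpfulness"` key and the 0.5 threshold are hypothetical:

```python
from langsmith import schemas
from langsmith.evaluation import EvaluationResult


def low_helpfulness_examples(
    runs: list[schemas.Run],
    examples: list[schemas.Example],
    evaluation_results: list[list[EvaluationResult]],
) -> dict:
    # zip() relies on the alignment described above: evaluation_results[i] was
    # produced while evaluating runs[i] against examples[i].
    flagged = [
        str(example.id)
        for example, results in zip(examples, evaluation_results)
        if any(r.key == "helpfulness" and (r.score or 0) < 0.5 for r in results)
    ]
    return {
        "key": "num_low_helpfulness_examples",
        "score": len(flagged),
        "comment": ", ".join(flagged) or "none",
    }
```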