From 028ad6575f4a2b1cabade85e97f0d1b4eff2872b Mon Sep 17 00:00:00 2001
From: isaac hershenson
Date: Tue, 17 Dec 2024 18:26:22 -0800
Subject: [PATCH] attachment reader copies

---
 python/langsmith/evaluation/_arunner.py       | 34 ++++++++++++-------
 python/langsmith/evaluation/evaluator.py      | 11 +++++-
 python/langsmith/schemas.py                   |  8 ++---
 python/tests/integration_tests/test_client.py | 24 ++++++++-----
 4 files changed, 50 insertions(+), 27 deletions(-)

diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index c0f9822b8..dc9cb0bbd 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -659,14 +659,15 @@ async def awith_predictions_and_evaluators(
         max_concurrency: Optional[int] = None,
     ) -> _AsyncExperimentManager:
         """Run predictions and evaluations in a single pipeline.
-        
+
         This allows evaluators to process results as soon as they're available from
         the target function, rather than waiting for all predictions to complete first.
         """
         evaluators = _resolve_evaluators(evaluators)
-        
-        if not hasattr(self, '_evaluator_executor'):
+
+        if not hasattr(self, "_evaluator_executor"):
             self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
+
         async def process_examples():
             async for pred in self._apredict(
                 target,
@@ -676,7 +677,11 @@ async def process_examples():
                 example, run = pred["example"], pred["run"]
                 result = self._arun_evaluators(
                     evaluators,
-                    {"run": run, "example": example, "evaluation_results": {"results": []}},
+                    {
+                        "run": run,
+                        "example": example,
+                        "evaluation_results": {"results": []},
+                    },
                     executor=self._evaluator_executor,
                 )
                 yield result
@@ -688,7 +693,7 @@ async def process_examples():
         )
 
         r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
-        
+
         return _AsyncExperimentManager(
             (result["example"] async for result in r1),
             experiment=self._experiment,
@@ -856,13 +861,16 @@ async def _arun_evaluators(
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
             lock = asyncio.Lock()
+
             async def _run_single_evaluator(evaluator):
                 try:
                     evaluator_response = await evaluator.aevaluate_run(
                         run=run,
                         example=example,
                     )
-                    selected_results = self.client._select_eval_results(evaluator_response)
+                    selected_results = self.client._select_eval_results(
+                        evaluator_response
+                    )
                     async with lock:
                         eval_results["results"].extend(selected_results)
 
@@ -885,7 +893,9 @@ async def _run_single_evaluator(evaluator):
                             for key in feedback_keys
                         ]
                     )
-                    selected_results = self.client._select_eval_results(error_response)
+                    selected_results = self.client._select_eval_results(
+                        error_response
+                    )
                     async with lock:
                         eval_results["results"].extend(selected_results)
                     if self._upload_results:
@@ -900,8 +910,10 @@ async def _run_single_evaluator(evaluator):
                         f" run {run.id}: {repr(e)}",
                         exc_info=True,
                     )
-            
-            await asyncio.gather(*[_run_single_evaluator(evaluator) for evaluator in evaluators])
+
+            await asyncio.gather(
+                *[_run_single_evaluator(evaluator) for evaluator in evaluators]
+            )
             return ExperimentResultRow(
                 run=run,
                 example=example,
@@ -1118,10 +1130,6 @@ def _get_run(r: run_trees.RunTree) -> None:
                 client=client,
             ),
         )
-        if include_attachments and example.attachments is not None:
-            for attachment in example.attachments:
-                reader = example.attachments[attachment]["reader"]
-                reader.seek(0)
     except Exception as e:
         logger.error(
             f"Error running target function: {e}", exc_info=True, stacklevel=1
diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py
index a1505699a..fc1f07685 100644
--- a/python/langsmith/evaluation/evaluator.py
+++ b/python/langsmith/evaluation/evaluator.py
@@ -4,6 +4,7 @@
 
 import asyncio
 import inspect
+import io
 import uuid
 from abc import abstractmethod
 from typing import (
@@ -666,7 +667,15 @@ async def awrapper(
                 "example": example,
                 "inputs": example.inputs if example else {},
                 "outputs": run.outputs or {},
-                "attachments": example.attachments or {} if example else {},
+                "attachments": {
+                    name: {
+                        "presigned_url": value["presigned_url"],
+                        "reader": io.BytesIO(value["reader"].getvalue()),
+                    }
+                    for name, value in (example.attachments or {}).items()
+                }
+                if example
+                else {},
                 "reference_outputs": example.outputs or {} if example else {},
             }
             args = (arg_map[arg] for arg in positional_args)
diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py
index acedaf177..a6ca393b6 100644
--- a/python/langsmith/schemas.py
+++ b/python/langsmith/schemas.py
@@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes:
         """Read function."""
         ...
 
-    def write(self, b: bytes) -> int:
-        """Write function."""
-        ...
-
     def seek(self, offset: int, whence: int = 0) -> int:
         """Seek function."""
         ...
 
+    def getvalue(self) -> bytes:
+        """Get value function."""
+        ...
+
 
 class ExampleBase(BaseModel):
     """Example base model."""
diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py
index 3bcd9d04c..a87acc8bb 100644
--- a/python/tests/integration_tests/test_client.py
+++ b/python/tests/integration_tests/test_client.py
@@ -1482,21 +1482,27 @@ def evaluator(run: Run, example: Example) -> Dict[str, Any]:
 async def test_aevaluate_with_attachments(langchain_client: Client) -> None:
     """Test evaluating examples with attachments."""
     dataset_name = "__test_aevaluate_attachments" + uuid4().hex[:4]
+    langchain_client = Client(
+        api_key="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"
+    )
     dataset = langchain_client.create_dataset(
         dataset_name,
         description="Test dataset for evals with attachments",
         data_type=DataType.kv,
     )
 
-    example = ExampleUploadWithAttachments(
-        inputs={"question": "What is shown in the image?"},
-        outputs={"answer": "test image"},
-        attachments={
-            "image": ("image/png", b"fake image data for testing"),
-        },
-    )
+    examples = [
+        ExampleUploadWithAttachments(
+            inputs={"question": "What is shown in the image?"},
+            outputs={"answer": "test image"},
+            attachments={
+                "image": ("image/png", b"fake image data for testing"),
+            },
+        )
+        for i in range(10)
+    ]
 
-    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example])
+    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)
 
     async def target(
         inputs: Dict[str, Any], attachments: Dict[str, Any]
@@ -1542,7 +1548,7 @@ async def evaluator_2(
         max_concurrency=3,
     )
 
-    assert len(results) == 2
+    assert len(results) == 20
     async for result in results:
         assert result["evaluation_results"]["results"][0].score == 1.0
         assert result["evaluation_results"]["results"][1].score == 1.0
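
Note on the _arunner.py change: the sketch below is not part of the patch and uses made-up names (fake_predict, fake_evaluator, predict_and_evaluate); it only illustrates the pipelining shape that awith_predictions_and_evaluators describes in its docstring, where each prediction is handed to the evaluators as soon as it is produced and the evaluators for a single row are fanned out with asyncio.gather, instead of evaluating only after every prediction has finished.

```python
import asyncio
from typing import AsyncIterator, Dict, List

# Illustrative stand-ins, not LangSmith APIs.


async def fake_predict(inputs: Dict[str, str]) -> Dict[str, str]:
    await asyncio.sleep(0.01)  # simulate the target function
    return {"answer": inputs["question"].upper()}


async def fake_evaluator(outputs: Dict[str, str]) -> Dict[str, float]:
    await asyncio.sleep(0.01)  # simulate an evaluator call
    return {"score": 1.0 if outputs["answer"] else 0.0}


async def predictions(examples: List[Dict[str, str]]) -> AsyncIterator[Dict]:
    # Yield each prediction as soon as it is ready.
    for example in examples:
        yield {"example": example, "outputs": await fake_predict(example)}


async def predict_and_evaluate(examples: List[Dict[str, str]]) -> List[Dict]:
    rows = []
    async for pred in predictions(examples):
        # Evaluators run per prediction and are gathered concurrently,
        # mirroring how the patch gathers _run_single_evaluator calls.
        evaluator_results = await asyncio.gather(
            fake_evaluator(pred["outputs"]), fake_evaluator(pred["outputs"])
        )
        rows.append(
            {"example": pred["example"], "evaluation_results": list(evaluator_results)}
        )
    return rows


if __name__ == "__main__":
    examples = [{"question": f"question {i}"} for i in range(3)]
    print(asyncio.run(predict_and_evaluate(examples)))
```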
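Note on the evaluator.py and schemas.py changes: before this patch, the target and every evaluator shared the same attachment reader, so the runner had to rewind it with seek(0); the patch instead builds a fresh io.BytesIO from reader.getvalue() for each evaluator, which is why the Reader protocol gains getvalue(). The standalone example below is not code from the patch; it just demonstrates the shared-cursor problem and the copy pattern.

```python
import io

# Two consumers that share one BytesIO also share its read position, which is
# why the old code needed reader.seek(0) after the target function ran.
shared = io.BytesIO(b"fake image data for testing")
assert shared.read() == b"fake image data for testing"
assert shared.read() == b""  # the shared cursor is already exhausted

# getvalue() returns the full buffer regardless of the current position, so each
# consumer can be handed an independent copy -- the same pattern as the new
# io.BytesIO(value["reader"].getvalue()) mapping built per evaluator.
copy_for_evaluator_1 = io.BytesIO(shared.getvalue())
copy_for_evaluator_2 = io.BytesIO(shared.getvalue())

assert copy_for_evaluator_1.read() == b"fake image data for testing"
assert copy_for_evaluator_2.read() == b"fake image data for testing"
```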