
Commit

attachment reader copies
isahers1 committed Dec 18, 2024
1 parent 977340a commit 028ad65
Showing 4 changed files with 50 additions and 27 deletions.
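The gist of the change: evaluators used to re-read a shared attachment reader that the harness rewound with seek(0); after this commit each evaluator gets its own io.BytesIO copy built from getvalue(). A minimal sketch of the copy semantics (names here are illustrative, not library API):

import io

# One consumer exhausting the shared reader no longer matters, because
# every other consumer works from an independent copy.
original = io.BytesIO(b"fake image data for testing")
original.read()  # exhaust the shared reader

def fresh_copy(reader: io.BytesIO) -> io.BytesIO:
    # getvalue() returns the whole buffer regardless of the current
    # position, so each copy starts at offset 0.
    return io.BytesIO(reader.getvalue())

copy_a, copy_b = fresh_copy(original), fresh_copy(original)
assert copy_a.read() == copy_b.read() == b"fake image data for testing"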
34 changes: 21 additions & 13 deletions python/langsmith/evaluation/_arunner.py
@@ -659,14 +659,15 @@ async def awith_predictions_and_evaluators(
max_concurrency: Optional[int] = None,
) -> _AsyncExperimentManager:
"""Run predictions and evaluations in a single pipeline.
This allows evaluators to process results as soon as they're available from
the target function, rather than waiting for all predictions to complete first.
"""
evaluators = _resolve_evaluators(evaluators)
-        if not hasattr(self, '_evaluator_executor'):
+
+        if not hasattr(self, "_evaluator_executor"):
self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)

async def process_examples():
async for pred in self._apredict(
target,
@@ -676,7 +677,11 @@ async def process_examples():
example, run = pred["example"], pred["run"]
result = self._arun_evaluators(
evaluators,
{"run": run, "example": example, "evaluation_results": {"results": []}},
{
"run": run,
"example": example,
"evaluation_results": {"results": []},
},
executor=self._evaluator_executor,
)
yield result
@@ -688,7 +693,7 @@
)

r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())

return _AsyncExperimentManager(
(result["example"] async for result in r1),
experiment=self._experiment,
@@ -856,13 +861,16 @@ async def _arun_evaluators(
example = current_results["example"]
eval_results = current_results["evaluation_results"]
lock = asyncio.Lock()

async def _run_single_evaluator(evaluator):
try:
evaluator_response = await evaluator.aevaluate_run(
run=run,
example=example,
)
-                selected_results = self.client._select_eval_results(evaluator_response)
+                selected_results = self.client._select_eval_results(
+                    evaluator_response
+                )
async with lock:
eval_results["results"].extend(selected_results)

@@ -885,7 +893,9 @@ async def _run_single_evaluator(evaluator):
for key in feedback_keys
]
)
-                selected_results = self.client._select_eval_results(error_response)
+                selected_results = self.client._select_eval_results(
+                    error_response
+                )
async with lock:
eval_results["results"].extend(selected_results)
if self._upload_results:
@@ -900,8 +910,10 @@ async def _run_single_evaluator(evaluator):
f" run {run.id}: {repr(e)}",
exc_info=True,
)

-        await asyncio.gather(*[_run_single_evaluator(evaluator) for evaluator in evaluators])
+        await asyncio.gather(
+            *[_run_single_evaluator(evaluator) for evaluator in evaluators]
+        )
return ExperimentResultRow(
run=run,
example=example,
@@ -1118,10 +1130,6 @@ def _get_run(r: run_trees.RunTree) -> None:
client=client,
),
)
-        if include_attachments and example.attachments is not None:
-            for attachment in example.attachments:
-                reader = example.attachments[attachment]["reader"]
-                reader.seek(0)
except Exception as e:
logger.error(
f"Error running target function: {e}", exc_info=True, stacklevel=1
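For context on the _arunner.py hunks above: awith_predictions_and_evaluators pipes each prediction straight into _arun_evaluators, which fans all evaluators out with asyncio.gather and serializes writes to the shared results list with an asyncio.Lock. A condensed sketch of that fan-out pattern, with illustrative names (run_all, run_one) and the error handling reduced to a comment:

import asyncio

async def run_all(evaluators, run, example):
    results = []
    lock = asyncio.Lock()

    async def run_one(evaluator):
        try:
            response = await evaluator.aevaluate_run(run=run, example=example)
            async with lock:
                # Mirrors eval_results["results"].extend(...) above.
                results.append(response)
        except Exception as e:
            # The real code logs the failure and records error feedback
            # for the evaluator's feedback keys instead of raising.
            print(f"evaluator failed: {e!r}")

    await asyncio.gather(*(run_one(e) for e in evaluators))
    return results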
11 changes: 10 additions & 1 deletion python/langsmith/evaluation/evaluator.py
@@ -4,6 +4,7 @@

import asyncio
import inspect
+import io
import uuid
from abc import abstractmethod
from typing import (
@@ -666,7 +667,15 @@ async def awrapper(
"example": example,
"inputs": example.inputs if example else {},
"outputs": run.outputs or {},
"attachments": example.attachments or {} if example else {},
"attachments": {
name: {
"presigned_url": value["presigned_url"],
"reader": io.BytesIO(value["reader"].getvalue()),
}
for name, value in (example.attachments or {}).items()
}
if example
else {},
"reference_outputs": example.outputs or {} if example else {},
}
args = (arg_map[arg] for arg in positional_args)
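What the awrapper change buys, in miniature: because every call builds fresh io.BytesIO copies via getvalue(), one evaluator exhausting its reader cannot starve the next. The attachment shape mirrors the diff; copy_attachments and the URL are illustrative, not library API:

import io

def copy_attachments(attachments):
    # Same dict-comprehension shape as the diff above.
    return {
        name: {
            "presigned_url": value["presigned_url"],
            "reader": io.BytesIO(value["reader"].getvalue()),
        }
        for name, value in (attachments or {}).items()
    }

shared = {
    "image": {
        "presigned_url": "https://example.invalid/image.png",
        "reader": io.BytesIO(b"fake image data for testing"),
    }
}
for _ in range(2):  # two evaluators, each with an independent view
    view = copy_attachments(shared)
    assert view["image"]["reader"].read() == b"fake image data for testing"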
8 changes: 4 additions & 4 deletions python/langsmith/schemas.py
@@ -76,14 +76,14 @@ def read(self, size: int = -1) -> bytes:
"""Read function."""
...

-    def write(self, b: bytes) -> int:
-        """Write function."""
-        ...
-
    def seek(self, offset: int, whence: int = 0) -> int:
        """Seek function."""
        ...

+    def getvalue(self) -> bytes:
+        """Get value function."""
+        ...


class ExampleBase(BaseModel):
"""Example base model."""
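The schemas.py change is what makes those copies type-check: the reader protocol now exposes getvalue(), and io.BytesIO satisfies it along with read() and seek(). A quick structural check, assuming io.BytesIO as the concrete reader:

import io

reader = io.BytesIO(b"abc")
assert reader.read(1) == b"a"       # read() advances the position
assert reader.seek(0) == 0          # seek() returns the new offset
assert reader.getvalue() == b"abc"  # getvalue() ignores the position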
24 changes: 15 additions & 9 deletions python/tests/integration_tests/test_client.py
@@ -1482,21 +1482,27 @@ def evaluator(run: Run, example: Example) -> Dict[str, Any]:
async def test_aevaluate_with_attachments(langchain_client: Client) -> None:
"""Test evaluating examples with attachments."""
dataset_name = "__test_aevaluate_attachments" + uuid4().hex[:4]
+    langchain_client = Client(
+        api_key="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"
+    )
dataset = langchain_client.create_dataset(
dataset_name,
description="Test dataset for evals with attachments",
data_type=DataType.kv,
)

-    example = ExampleUploadWithAttachments(
-        inputs={"question": "What is shown in the image?"},
-        outputs={"answer": "test image"},
-        attachments={
-            "image": ("image/png", b"fake image data for testing"),
-        },
-    )
+    examples = [
+        ExampleUploadWithAttachments(
+            inputs={"question": "What is shown in the image?"},
+            outputs={"answer": "test image"},
+            attachments={
+                "image": ("image/png", b"fake image data for testing"),
+            },
+        )
+        for i in range(10)
+    ]

-    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example])
+    langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=examples)

async def target(
inputs: Dict[str, Any], attachments: Dict[str, Any]
@@ -1542,7 +1548,7 @@ async def evaluator_2(
max_concurrency=3,
)

-    assert len(results) == 2
+    assert len(results) == 20
async for result in results:
assert result["evaluation_results"]["results"][0].score == 1.0
assert result["evaluation_results"]["results"][1].score == 1.0
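The expected row count moves from 2 to 20 because the test now uploads 10 copies of the example instead of 1, scaling the result count tenfold. A hedged sketch of an attachment-aware evaluator of the kind this test runs — the arg_map in evaluator.py above resolves an attachments parameter by name; check_image and its scoring are illustrative:

async def check_image(run, example, attachments):
    # Each evaluator receives its own reader copy (see evaluator.py
    # above), so read() here cannot interfere with the other evaluator.
    data = attachments["image"]["reader"].read()
    return {"score": float(data == b"fake image data for testing")}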
