diff --git a/python/bench/upload_example_with_large_file_attachment.py b/python/bench/upload_example_with_large_file_attachment.py new file mode 100644 index 000000000..8aaedd696 --- /dev/null +++ b/python/bench/upload_example_with_large_file_attachment.py @@ -0,0 +1,111 @@ +import os +import statistics +import time +from pathlib import Path +from typing import Dict + +from langsmith import Client +from langsmith.schemas import ExampleUpsertWithAttachments + +WRITE_BATCH = 10000 + + +def create_large_file(size: int, dir: str) -> str: + """Create a large file for benchmarking purposes.""" + filename = f"large_file_{size}.txt" + filepath = os.path.join(dir, filename) + + # delete the file if it exists + print("Deleting existing file...") + if os.path.exists(filepath): + os.remove(filepath) + + print("Creating big file...") + with open(filepath, "w") as f: + curr_size = 0 + while curr_size < size: + f.write("a" * (size - curr_size)) + curr_size += size - curr_size + + print("Done creating big file...") + return filepath + + +DATASET_NAME = "upsert_big_file_to_dataset" + + +def benchmark_big_file_upload( + size_bytes: int, num_examples: int, samples: int = 1 +) -> Dict: + """ + Benchmark run creation with specified parameters. + Returns timing statistics. + """ + multipart_timings = [] + + for _ in range(samples): + client = Client() + + if client.has_dataset(dataset_name=DATASET_NAME): + client.delete_dataset(dataset_name=DATASET_NAME) + + dataset = client.create_dataset( + DATASET_NAME, + description="Test dataset for big file upload", + ) + large_file = create_large_file(size_bytes, "/tmp") + examples = [ + ExampleUpsertWithAttachments( + dataset_id=dataset.id, + inputs={"a": 1}, + outputs={"b": 2}, + attachments={ + "bigfile": ("text/plain", Path(large_file)), + }, + ) + for _ in range(num_examples) + ] + + multipart_start = time.perf_counter() + client.upsert_examples_multipart(upserts=examples) + multipart_elapsed = time.perf_counter() - multipart_start + + multipart_timings.append(multipart_elapsed) + + return { + "mean": statistics.mean(multipart_timings), + "median": statistics.median(multipart_timings), + "stdev": ( + statistics.stdev(multipart_timings) if len(multipart_timings) > 1 else 0 + ), + "min": min(multipart_timings), + "max": max(multipart_timings), + } + + +size_bytes = 50000000 +num_examples = 10 + + +def main(size_bytes: int, num_examples: int = 1): + """ + Run benchmarks with different combinations of parameters and report results. + """ + results = benchmark_big_file_upload(size_bytes, num_examples) + + print(f"\nBenchmark Results for size {size_bytes} and {num_examples} examples:") + print("-" * 30) + print(f"{'Metric':<15} {'Result':>20}") + print("-" * 30) + + metrics = ["mean", "median", "stdev", "min", "max"] + for metric in metrics: + print(f"{results[metric]:>20.4f}") + + print("-" * 30) + print(f"{'Throughput':<15} {num_examples / results['mean']:>20.2f} ") + print("(examples/second)") + + +if __name__ == "__main__": + main(size_bytes, num_examples) diff --git a/python/bench/upload_examples_bench.py b/python/bench/upload_examples_bench.py new file mode 100644 index 000000000..5a22a731b --- /dev/null +++ b/python/bench/upload_examples_bench.py @@ -0,0 +1,143 @@ +import statistics +import time +from typing import Dict +from uuid import uuid4 + +from langsmith import Client +from langsmith.schemas import DataType, ExampleUpsertWithAttachments + + +def create_large_json(length: int) -> Dict: + """Create a large JSON object for benchmarking purposes.""" + large_array = [ + { + "index": i, + "data": f"This is element number {i}", + "nested": {"id": i, "value": f"Nested value for element {i}"}, + } + for i in range(length) + ] + + return { + "name": "Huge JSON" + str(uuid4()), + "description": "This is a very large JSON object for benchmarking purposes.", + "array": large_array, + "metadata": { + "created_at": "2024-10-22T19:00:00Z", + "author": "Python Program", + "version": 1.0, + }, + } + + +def create_example_data(dataset_id: str, json_size: int) -> Dict: + """Create a single example data object.""" + return ExampleUpsertWithAttachments( + **{ + "dataset_id": dataset_id, + "inputs": create_large_json(json_size), + "outputs": create_large_json(json_size), + } + ) + + +DATASET_NAME = "upsert_llm_evaluator_benchmark_dataset" + + +def benchmark_example_uploading( + num_examples: int, json_size: int, samples: int = 1 +) -> Dict: + """ + Benchmark run creation with specified parameters. + Returns timing statistics. + """ + multipart_timings, old_timings = [], [] + + for _ in range(samples): + client = Client() + + if client.has_dataset(dataset_name=DATASET_NAME): + client.delete_dataset(dataset_name=DATASET_NAME) + + dataset = client.create_dataset( + DATASET_NAME, + description="Test dataset for multipart example upload", + data_type=DataType.kv, + ) + examples = [ + create_example_data(dataset.id, json_size) for i in range(num_examples) + ] + + # Old method + old_start = time.perf_counter() + # inputs = [e.inputs for e in examples] + # outputs = [e.outputs for e in examples] + # # the create_examples endpoint fails above 20mb + # # so this will crash with json_size > ~100 + # client.create_examples(inputs=inputs, outputs=outputs, dataset_id=dataset.id) + old_elapsed = time.perf_counter() - old_start + + # New method + multipart_start = time.perf_counter() + client.upsert_examples_multipart(upserts=examples) + multipart_elapsed = time.perf_counter() - multipart_start + + multipart_timings.append(multipart_elapsed) + old_timings.append(old_elapsed) + + return { + "old": { + "mean": statistics.mean(old_timings), + "median": statistics.median(old_timings), + "stdev": statistics.stdev(old_timings) if len(old_timings) > 1 else 0, + "min": min(old_timings), + "max": max(old_timings), + }, + "new": { + "mean": statistics.mean(multipart_timings), + "median": statistics.median(multipart_timings), + "stdev": ( + statistics.stdev(multipart_timings) if len(multipart_timings) > 1 else 0 + ), + "min": min(multipart_timings), + "max": max(multipart_timings), + }, + } + + +json_size = 1000 +num_examples = 1000 + + +def main(json_size: int, num_examples: int): + """ + Run benchmarks with different combinations of parameters and report results. + """ + results = benchmark_example_uploading( + num_examples=num_examples, json_size=json_size + ) + + print( + f"\nBenchmark Results for {num_examples} examples with JSON size {json_size}:" + ) + print("-" * 60) + print(f"{'Metric':<15} {'Old Method':>20} {'New Method':>20}") + print("-" * 60) + + metrics = ["mean", "median", "stdev", "min", "max"] + for metric in metrics: + print( + f"{metric:<15} {results['old'][metric]:>20.4f} " + f"{results['new'][metric]:>20.4f}" + ) + + print("-" * 60) + print( + f"{'Throughput':<15} {num_examples / results['old']['mean']:>20.2f} " + f"{num_examples / results['new']['mean']:>20.2f}" + ) + print("(examples/second)") + + +if __name__ == "__main__": + main(json_size, num_examples) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index e422dbec1..a92a89659 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -34,6 +34,7 @@ import warnings import weakref from inspect import signature +from pathlib import Path from queue import PriorityQueue from typing import ( TYPE_CHECKING, @@ -83,6 +84,7 @@ _SIZE_LIMIT_BYTES, ) from langsmith._internal._multipart import ( + MultipartPart, MultipartPartsAndContext, join_multipart_parts_and_context, ) @@ -946,7 +948,6 @@ def _get_paginated_list( params=params_, ) items = response.json() - if not items: break yield from items @@ -1682,9 +1683,7 @@ def update_run( events: Optional[Sequence[dict]] = None, extra: Optional[Dict] = None, tags: Optional[List[str]] = None, - attachments: Optional[ - Dict[str, tuple[str, bytes] | ls_schemas.Attachment] - ] = None, + attachments: Optional[ls_schemas.Attachments] = None, **kwargs: Any, ) -> None: """Update a run in the LangSmith API. @@ -3463,6 +3462,262 @@ def create_example_from_run( created_at=created_at, ) + def _prepare_multipart_data( + self, + examples: Union[ + List[ls_schemas.ExampleUploadWithAttachments] + | List[ls_schemas.ExampleUpsertWithAttachments] + | List[ls_schemas.ExampleUpdateWithAttachments], + ], + include_dataset_id: bool = False, + ) -> Tuple[Any, bytes]: + parts: List[MultipartPart] = [] + if include_dataset_id: + if not isinstance(examples[0], ls_schemas.ExampleUpsertWithAttachments): + raise ValueError( + "The examples must be of type ExampleUpsertWithAttachments" + " if include_dataset_id is True" + ) + dataset_id = examples[0].dataset_id + + for example in examples: + if ( + not isinstance(example, ls_schemas.ExampleUploadWithAttachments) + and not isinstance(example, ls_schemas.ExampleUpsertWithAttachments) + and not isinstance(example, ls_schemas.ExampleUpdateWithAttachments) + ): + raise ValueError( + "The examples must be of type ExampleUploadWithAttachments" + " or ExampleUpsertWithAttachments" + " or ExampleUpdateWithAttachments" + ) + if example.id is not None: + example_id = str(example.id) + else: + example_id = str(uuid.uuid4()) + + if isinstance(example, ls_schemas.ExampleUpdateWithAttachments): + created_at = None + else: + created_at = example.created_at + + example_body = { + **({"dataset_id": dataset_id} if include_dataset_id else {}), + **({"created_at": created_at} if created_at is not None else {}), + } + if example.metadata is not None: + example_body["metadata"] = example.metadata + if example.split is not None: + example_body["split"] = example.split + valb = _dumps_json(example_body) + + parts.append( + ( + f"{example_id}", + ( + None, + valb, + "application/json", + {}, + ), + ) + ) + + inputsb = _dumps_json(example.inputs) + + parts.append( + ( + f"{example_id}.inputs", + ( + None, + inputsb, + "application/json", + {}, + ), + ) + ) + + if example.outputs: + outputsb = _dumps_json(example.outputs) + parts.append( + ( + f"{example_id}.outputs", + ( + None, + outputsb, + "application/json", + {}, + ), + ) + ) + + if example.attachments: + for name, attachment in example.attachments.items(): + if isinstance(attachment, tuple): + if isinstance(attachment[1], Path): + mime_type, file_path = attachment + file_size = os.path.getsize(file_path) + parts.append( + ( + f"{example_id}.attachment.{name}", + ( + None, + open(file_path, "rb"), # type: ignore[arg-type] + f"{mime_type}; length={file_size}", + {}, + ), + ) + ) + else: + mime_type, data = attachment + parts.append( + ( + f"{example_id}.attachment.{name}", + ( + None, + data, + f"{mime_type}; length={len(data)}", + {}, + ), + ) + ) + else: + parts.append( + ( + f"{example_id}.attachment.{name}", + ( + None, + attachment.data, + f"{attachment.mime_type}; length={len(attachment.data)}", + {}, + ), + ) + ) + + if ( + isinstance(example, ls_schemas.ExampleUpdateWithAttachments) + and example.attachments_operations + ): + attachments_operationsb = _dumps_json(example.attachments_operations) + parts.append( + ( + f"{example_id}.attachments_operations", + ( + None, + attachments_operationsb, + "application/json", + {}, + ), + ) + ) + + encoder = rqtb_multipart.MultipartEncoder(parts, boundary=BOUNDARY) + if encoder.len <= 20_000_000: # ~20 MB + data = encoder.to_string() + else: + data = encoder + + return encoder, data + + def update_examples_multipart( + self, + *, + dataset_id: ID_TYPE, + updates: Optional[List[ls_schemas.ExampleUpdateWithAttachments]] = None, + ) -> ls_schemas.UpsertExamplesResponse: + """Upload examples.""" + if not (self.info.instance_flags or {}).get( + "dataset_examples_multipart_enabled", False + ): + raise ValueError( + "Your LangSmith version does not allow using the multipart examples endpoint, please update to the latest version." + ) + if updates is None: + updates = [] + + encoder, data = self._prepare_multipart_data(updates, include_dataset_id=False) + + response = self.request_with_retries( + "PATCH", + f"/v1/platform/datasets/{dataset_id}/examples", + request_kwargs={ + "data": data, + "headers": { + **self._headers, + "Content-Type": encoder.content_type, + }, + }, + ) + ls_utils.raise_for_status_with_text(response) + return response.json() + + def upload_examples_multipart( + self, + *, + dataset_id: ID_TYPE, + uploads: Optional[List[ls_schemas.ExampleUploadWithAttachments]] = None, + ) -> ls_schemas.UpsertExamplesResponse: + """Upload examples.""" + if not (self.info.instance_flags or {}).get( + "dataset_examples_multipart_enabled", False + ): + raise ValueError( + "Your LangSmith version does not allow using the multipart examples endpoint, please update to the latest version." + ) + if uploads is None: + uploads = [] + encoder, data = self._prepare_multipart_data(uploads, include_dataset_id=False) + + response = self.request_with_retries( + "POST", + f"/v1/platform/datasets/{dataset_id}/examples", + request_kwargs={ + "data": data, + "headers": { + **self._headers, + "Content-Type": encoder.content_type, + }, + }, + ) + ls_utils.raise_for_status_with_text(response) + return response.json() + + def upsert_examples_multipart( + self, + *, + upserts: Optional[List[ls_schemas.ExampleUpsertWithAttachments]] = None, + ) -> ls_schemas.UpsertExamplesResponse: + """Upsert examples. + + .. deprecated:: 0.1.0 + This method is deprecated. Use :func:`langsmith.upload_examples_multipart` instead. + + """ # noqa: E501 + if not (self.info.instance_flags or {}).get( + "examples_multipart_enabled", False + ): + raise ValueError( + "Your LangSmith version does not allow using the multipart examples endpoint, please update to the latest version." + ) + if upserts is None: + upserts = [] + + encoder, data = self._prepare_multipart_data(upserts, include_dataset_id=True) + + response = self.request_with_retries( + "POST", + "/v1/platform/examples/multipart", + request_kwargs={ + "data": data, + "headers": { + **self._headers, + "Content-Type": encoder.content_type, + }, + }, + ) + ls_utils.raise_for_status_with_text(response) + return response.json() + def create_examples( self, *, @@ -3637,8 +3892,22 @@ def read_example( "as_of": as_of.isoformat() if as_of else None, }, ) + + example = response.json() + attachments = {} + if example["attachment_urls"]: + for key, value in example["attachment_urls"].items(): + response = requests.get(value["presigned_url"], stream=True) + response.raise_for_status() + reader = io.BytesIO(response.content) + attachments[key.split(".")[1]] = { + "presigned_url": value["presigned_url"], + "reader": reader, + } + return ls_schemas.Example( - **response.json(), + **{k: v for k, v in example.items() if k != "attachment_urls"}, + attachments=attachments, _host_url=self._host_url, _tenant_id=self._get_optional_tenant_id(), ) @@ -3656,6 +3925,7 @@ def list_examples( limit: Optional[int] = None, metadata: Optional[dict] = None, filter: Optional[str] = None, + include_attachments: bool = False, **kwargs: Any, ) -> Iterator[ls_schemas.Example]: """Retrieve the example rows of the specified dataset. @@ -3705,11 +3975,25 @@ def list_examples( params["dataset"] = dataset_id else: pass + if include_attachments: + params["select"] = ["attachment_urls", "outputs", "metadata"] for i, example in enumerate( self._get_paginated_list("/examples", params=params) ): + attachments = {} + if example["attachment_urls"]: + for key, value in example["attachment_urls"].items(): + response = requests.get(value["presigned_url"], stream=True) + response.raise_for_status() + reader = io.BytesIO(response.content) + attachments[key.split(".")[1]] = { + "presigned_url": value["presigned_url"], + "reader": reader, + } + yield ls_schemas.Example( - **example, + **{k: v for k, v in example.items() if k != "attachment_urls"}, + attachments=attachments, _host_url=self._host_url, _tenant_id=self._get_optional_tenant_id(), ) @@ -3849,6 +4133,7 @@ def update_example( metadata: Optional[Dict] = None, split: Optional[str | List[str]] = None, dataset_id: Optional[ID_TYPE] = None, + attachments_operations: Optional[ls_schemas.AttachmentsOperations] = None, ) -> Dict[str, Any]: """Update a specific example. @@ -3873,12 +4158,20 @@ def update_example( Dict[str, Any] The updated example. """ + if attachments_operations is not None: + if not (self.info.instance_flags or {}).get( + "dataset_examples_multipart_enabled", False + ): + raise ValueError( + "Your LangSmith version does not allow using the attachment operations, please update to the latest version." + ) example = dict( inputs=inputs, outputs=outputs, dataset_id=dataset_id, metadata=metadata, split=split, + attachments_operations=attachments_operations, ) response = self.request_with_retries( "PATCH", @@ -3898,6 +4191,9 @@ def update_examples( metadata: Optional[Sequence[Optional[Dict]]] = None, splits: Optional[Sequence[Optional[str | List[str]]]] = None, dataset_ids: Optional[Sequence[Optional[ID_TYPE]]] = None, + attachments_operations: Optional[ + Sequence[Optional[ls_schemas.AttachmentsOperations]] + ] = None, ) -> Dict[str, Any]: """Update multiple examples. @@ -3922,12 +4218,20 @@ def update_examples( Dict[str, Any] The response from the server (specifies the number of examples updated). """ + if attachments_operations is not None: + if not (self.info.instance_flags or {}).get( + "dataset_examples_multipart_enabled", False + ): + raise ValueError( + "Your LangSmith version does not allow using the attachment operations, please update to the latest version." + ) sequence_args = { "inputs": inputs, "outputs": outputs, "metadata": metadata, "splits": splits, "dataset_ids": dataset_ids, + "attachments_operations": attachments_operations, } # Since inputs are required, we will check against them examples_len = len(example_ids) @@ -3945,14 +4249,16 @@ def update_examples( "dataset_id": dataset_id_, "metadata": metadata_, "split": split_, + "attachments_operations": attachments_operations_, } - for id_, in_, out_, metadata_, split_, dataset_id_ in zip( + for id_, in_, out_, metadata_, split_, dataset_id_, attachments_operations_ in zip( example_ids, inputs or [None] * len(example_ids), outputs or [None] * len(example_ids), metadata or [None] * len(example_ids), splits or [None] * len(example_ids), dataset_ids or [None] * len(example_ids), + attachments_operations or [None] * len(example_ids), ) ] response = self.request_with_retries( diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index 4d5c063f6..25ea0d62a 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -37,14 +37,17 @@ DATA_T, EVALUATOR_T, ExperimentResultRow, + _evaluators_include_attachments, _ExperimentManagerMixin, _extract_feedback_keys, _ForwardResults, + _include_attachments, _is_langchain_runnable, _load_examples_map, _load_experiment, _load_tqdm, _load_traces, + _make_fresh_examples, _resolve_data, _resolve_evaluators, _resolve_experiment, @@ -68,7 +71,9 @@ logger = logging.getLogger(__name__) -ATARGET_T = Callable[[dict], Awaitable[dict]] +ATARGET_T = Union[ + Callable[[dict], Awaitable[dict]], Callable[[dict, dict], Awaitable[dict]] +] async def aevaluate( @@ -256,6 +261,7 @@ async def aevaluate( ... ) # doctest: +ELLIPSIS View the evaluation results for experiment:... + .. versionchanged:: 0.2.0 'max_concurrency' default updated from None (no limit on concurrency) @@ -473,6 +479,8 @@ async def _aevaluate( description=description, num_repetitions=num_repetitions, runs=runs, + include_attachments=_include_attachments(target) + or _evaluators_include_attachments(evaluators), upload_results=upload_results, ).astart() cache_dir = ls_utils.get_cache_dir(None) @@ -534,6 +542,7 @@ def __init__( summary_results: Optional[AsyncIterable[EvaluationResults]] = None, description: Optional[str] = None, num_repetitions: int = 1, + include_attachments: bool = False, upload_results: bool = True, ): super().__init__( @@ -550,14 +559,23 @@ def __init__( self._evaluation_results = evaluation_results self._summary_results = summary_results self._num_repetitions = num_repetitions + self._include_attachments = include_attachments self._upload_results = upload_results async def aget_examples(self) -> AsyncIterator[schemas.Example]: if self._examples is None: - self._examples = _aresolve_data(self._data, client=self.client) + self._examples = _aresolve_data( + self._data, + client=self.client, + include_attachments=self._include_attachments, + ) if self._num_repetitions > 1: + examples_list = [example async for example in self._examples] self._examples = async_chain_from_iterable( - aitertools.atee(self._examples, self._num_repetitions) + [ + async_iter_from_list(_make_fresh_examples(examples_list)) + for _ in range(self._num_repetitions) + ] ) self._examples, examples_iter = aitertools.atee( @@ -620,6 +638,7 @@ async def astart(self) -> _AsyncExperimentManager: client=self.client, runs=self._runs, evaluation_results=self._evaluation_results, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -629,7 +648,11 @@ async def awith_predictions( /, max_concurrency: Optional[int] = None, ) -> _AsyncExperimentManager: - _experiment_results = self._apredict(target, max_concurrency=max_concurrency) + _experiment_results = self._apredict( + target, + max_concurrency=max_concurrency, + include_attachments=_include_attachments(target), + ) r1, r2 = aitertools.atee(_experiment_results, 2, lock=asyncio.Lock()) return _AsyncExperimentManager( (pred["example"] async for pred in r1), @@ -637,6 +660,7 @@ async def awith_predictions( metadata=self._metadata, client=self.client, runs=(pred["run"] async for pred in r2), + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -657,6 +681,7 @@ async def awith_evaluators( runs=(result["run"] async for result in r2), evaluation_results=(result["evaluation_results"] async for result in r3), summary_results=self._summary_results, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -674,6 +699,7 @@ async def awith_summary_evaluators( runs=self.aget_runs(), evaluation_results=self._evaluation_results, summary_results=aggregate_feedback_gen, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -701,7 +727,11 @@ async def aget_summary_scores(self) -> Dict[str, List[dict]]: ## Private methods async def _apredict( - self, target: ATARGET_T, /, max_concurrency: Optional[int] = None + self, + target: ATARGET_T, + /, + max_concurrency: Optional[int] = None, + include_attachments: bool = False, ) -> AsyncIterator[_ForwardResults]: fn = _ensure_async_traceable(target) @@ -709,7 +739,12 @@ async def predict_all(): async for example in await self.aget_examples(): # Yield the coroutine to be awaited later yield _aforward( - fn, example, self.experiment_name, self._metadata, self.client + fn, + example, + self.experiment_name, + self._metadata, + self.client, + include_attachments, ) async for result in aitertools.aiter_with_concurrency( @@ -993,6 +1028,7 @@ async def _aforward( experiment_name: str, metadata: dict, client: langsmith.Client, + include_attachments: bool = False, ) -> _ForwardResults: run: Optional[schemas.RunBase] = None @@ -1002,8 +1038,13 @@ def _get_run(r: run_trees.RunTree) -> None: with rh.tracing_context(enabled=True): try: + args = ( + (example.inputs, example.attachments) + if include_attachments + else (example.inputs,) + ) await fn( - example.inputs, + *args, langsmith_extra=rh.LangSmithExtra( reference_example_id=example.id, on_end=_get_run, @@ -1019,6 +1060,10 @@ def _get_run(r: run_trees.RunTree) -> None: client=client, ), ) + if include_attachments and example.attachments is not None: + for attachment in example.attachments: + reader = example.attachments[attachment]["reader"] + reader.seek(0) except Exception as e: logger.error( f"Error running target function: {e}", exc_info=True, stacklevel=1 @@ -1055,17 +1100,22 @@ def _ensure_async_traceable( return target # type: ignore else: if _is_langchain_runnable(target): - target = target.ainvoke # type: ignore[attr-defined] - return rh.traceable(name="AsyncTarget")(target) + target = target.ainvoke # type: ignore[union-attr] + return rh.traceable(name="AsyncTarget")(target) # type: ignore[arg-type] def _aresolve_data( - data: Union[DATA_T, AsyncIterable[schemas.Example]], *, client: langsmith.Client + data: Union[DATA_T, AsyncIterable[schemas.Example]], + *, + client: langsmith.Client, + include_attachments: bool = False, ) -> AsyncIterator[schemas.Example]: """Return the examples for the given dataset.""" if isinstance(data, AsyncIterable): return aitertools.ensure_async_iterator(data) - return aitertools.ensure_async_iterator(_resolve_data(data, client=client)) + return aitertools.ensure_async_iterator( + _resolve_data(data, client=client, include_attachments=include_attachments) + ) T = TypeVar("T") @@ -1078,3 +1128,11 @@ async def async_chain_from_iterable( for sub_iterable in iterable: async for item in sub_iterable: yield item + + +async def async_iter_from_list( + examples: List[schemas.Example], +) -> AsyncIterable[schemas.Example]: + """Convert a list of examples to an async iterable.""" + for example in examples: + yield example diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index bf7505284..ea206b098 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -8,6 +8,7 @@ import datetime import functools import inspect +import io import itertools import logging import pathlib @@ -36,6 +37,7 @@ cast, ) +import requests from typing_extensions import TypedDict, overload import langsmith @@ -68,7 +70,7 @@ DataFrame = Any logger = logging.getLogger(__name__) -TARGET_T = Callable[[dict], dict] +TARGET_T = Union[Callable[[dict], dict], Callable[[dict, dict], dict]] # Data format: dataset-name, dataset_id, or examples DATA_T = Union[str, uuid.UUID, Iterable[schemas.Example], schemas.Dataset] # Summary evaluator runs over the whole dataset @@ -1064,6 +1066,8 @@ def _evaluate( # If provided, we don't need to create a new experiment. runs=runs, # Create or resolve the experiment. + include_attachments=_include_attachments(target) + or _evaluators_include_attachments(evaluators), upload_results=upload_results, ).start() cache_dir = ls_utils.get_cache_dir(None) @@ -1312,6 +1316,7 @@ def __init__( summary_results: Optional[Iterable[EvaluationResults]] = None, description: Optional[str] = None, num_repetitions: int = 1, + include_attachments: bool = False, upload_results: bool = True, ): super().__init__( @@ -1326,15 +1331,22 @@ def __init__( self._evaluation_results = evaluation_results self._summary_results = summary_results self._num_repetitions = num_repetitions + self._include_attachments = include_attachments self._upload_results = upload_results @property def examples(self) -> Iterable[schemas.Example]: if self._examples is None: - self._examples = _resolve_data(self._data, client=self.client) + self._examples = _resolve_data( + self._data, + client=self.client, + include_attachments=self._include_attachments, + ) if self._num_repetitions > 1: + examples_list = list(self._examples) self._examples = itertools.chain.from_iterable( - itertools.tee(self._examples, self._num_repetitions) + _make_fresh_examples(examples_list) + for _ in range(self._num_repetitions) ) self._examples, examples_iter = itertools.tee(self._examples) return examples_iter @@ -1377,6 +1389,7 @@ def start(self) -> _ExperimentManager: client=self.client, runs=self._runs, evaluation_results=self._evaluation_results, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -1389,7 +1402,10 @@ def with_predictions( """Lazily apply the target function to the experiment.""" context = copy_context() _experiment_results = context.run( - self._predict, target, max_concurrency=max_concurrency + self._predict, + target, + max_concurrency=max_concurrency, + include_attachments=_include_attachments(target), ) r1, r2 = itertools.tee(_experiment_results, 2) return _ExperimentManager( @@ -1400,6 +1416,7 @@ def with_predictions( runs=(pred["run"] for pred in r2), upload_results=self._upload_results, # TODO: Can't do multiple prediction rounds rn. + include_attachments=self._include_attachments, ) def with_evaluators( @@ -1430,6 +1447,7 @@ def with_evaluators( runs=(result["run"] for result in r2), evaluation_results=(result["evaluation_results"] for result in r3), summary_results=self._summary_results, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -1451,6 +1469,7 @@ def with_summary_evaluators( runs=self.runs, evaluation_results=self._evaluation_results, summary_results=aggregate_feedback_gen, + include_attachments=self._include_attachments, upload_results=self._upload_results, ) @@ -1481,10 +1500,15 @@ def get_summary_scores(self) -> Dict[str, List[dict]]: # Private methods def _predict( - self, target: TARGET_T, /, max_concurrency: Optional[int] = None + self, + target: TARGET_T, + /, + max_concurrency: Optional[int] = None, + include_attachments: bool = False, ) -> Generator[_ForwardResults, None, None]: """Run the target function on the examples.""" fn = _ensure_traceable(target) + if max_concurrency == 0: for example in self.examples: yield _forward( @@ -1494,6 +1518,7 @@ def _predict( self._metadata, self.client, self._upload_results, + include_attachments, ) else: @@ -1507,6 +1532,7 @@ def _predict( self._metadata, self.client, self._upload_results, + include_attachments, ) for example in self.examples ] @@ -1794,6 +1820,7 @@ def _forward( metadata: dict, client: langsmith.Client, upload_results: bool, + include_attachments: bool = False, ) -> _ForwardResults: run: Optional[schemas.RunBase] = None @@ -1815,7 +1842,19 @@ def _get_run(r: rt.RunTree) -> None: client=client, ) try: - fn(example.inputs, langsmith_extra=langsmith_extra) + args = ( + (example.inputs, example.attachments) + if include_attachments + else (example.inputs,) + ) + fn( + *args, + langsmith_extra=langsmith_extra, + ) + if include_attachments and example.attachments is not None: + for attachment in example.attachments: + reader = example.attachments[attachment]["reader"] + reader.seek(0) except Exception as e: logger.error( f"Error running target function: {e}", exc_info=True, stacklevel=1 @@ -1832,17 +1871,28 @@ def _is_valid_uuid(value: str) -> bool: def _resolve_data( - data: DATA_T, *, client: langsmith.Client + data: DATA_T, + *, + client: langsmith.Client, + include_attachments: bool = False, ) -> Iterable[schemas.Example]: """Return the examples for the given dataset.""" if isinstance(data, uuid.UUID): - return client.list_examples(dataset_id=data) + return client.list_examples( + dataset_id=data, include_attachments=include_attachments + ) elif isinstance(data, str) and _is_valid_uuid(data): - return client.list_examples(dataset_id=uuid.UUID(data)) + return client.list_examples( + dataset_id=uuid.UUID(data), include_attachments=include_attachments + ) elif isinstance(data, str): - return client.list_examples(dataset_name=data) + return client.list_examples( + dataset_name=data, include_attachments=include_attachments + ) elif isinstance(data, schemas.Dataset): - return client.list_examples(dataset_id=data.id) + return client.list_examples( + dataset_id=data.id, include_attachments=include_attachments + ) return data @@ -1862,6 +1912,7 @@ def _ensure_traceable( " ...\n" ")" ) + if rh.is_traceable_function(target): fn: rh.SupportsLangsmithExtra[[dict], dict] = target else: @@ -1871,6 +1922,60 @@ def _ensure_traceable( return fn +def _evaluators_include_attachments( + evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]], +) -> bool: + if evaluators is None: + return False + + def evaluator_has_attachments(evaluator: Any) -> bool: + if not callable(evaluator): + return False + sig = inspect.signature(evaluator) + params = list(sig.parameters.values()) + positional_params = [ + p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + ] + return any(p.name == "attachments" for p in positional_params) + + return any(evaluator_has_attachments(e) for e in evaluators) + + +def _include_attachments( + target: Any, +) -> bool: + """Whether the target function accepts attachments.""" + if _is_langchain_runnable(target) or not callable(target): + return False + # Check function signature + sig = inspect.signature(target) + params = list(sig.parameters.values()) + positional_params = [ + p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + ] + positional_no_default = [p for p in positional_params if p.default is p.empty] + + if len(positional_params) == 0: + raise ValueError( + "Target function must accept at least one positional argument (inputs)." + ) + elif len(positional_no_default) > 2: + raise ValueError( + "Target function must accept at most two " + "arguments without default values: (inputs, attachments)." + ) + elif len(positional_no_default) == 2: + if [p.name for p in positional_no_default] != ["inputs", "attachments"]: + raise ValueError( + "When passing 2 positional arguments, they must be named " + "'inputs' and 'attachments', respectively. Received: " + f"{[p.name for p in positional_no_default]}" + ) + return True + else: + return [p.name for p in positional_params[:2]] == ["inputs", "attachments"] + + def _resolve_experiment( experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]], runs: Optional[Iterable[schemas.Run]], @@ -2122,3 +2227,42 @@ def _import_langchain_runnable() -> Optional[type]: def _is_langchain_runnable(o: Any) -> bool: return bool((Runnable := _import_langchain_runnable()) and isinstance(o, Runnable)) + + +def _reset_example_attachments(example: schemas.Example) -> schemas.Example: + """Reset attachment readers for an example.""" + if not hasattr(example, "attachments") or not example.attachments: + return example + + new_attachments = {} + for key, attachment in example.attachments.items(): + response = requests.get(attachment["presigned_url"], stream=True) + response.raise_for_status() + reader = io.BytesIO(response.content) + new_attachments[key] = { + "presigned_url": attachment["presigned_url"], + "reader": reader, + } + + # Create a new Example instance with the updated attachments + return schemas.Example( + id=example.id, + created_at=example.created_at, + dataset_id=example.dataset_id, + inputs=example.inputs, + outputs=example.outputs, + metadata=example.metadata, + modified_at=example.modified_at, + runs=example.runs, + source_run_id=example.source_run_id, + attachments=new_attachments, + _host_url=example._host_url, + _tenant_id=example._tenant_id, + ) + + +def _make_fresh_examples( + _original_examples: List[schemas.Example], +) -> List[schemas.Example]: + """Create fresh copies of examples with reset readers.""" + return [_reset_example_attachments(example) for example in _original_examples] diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index 02fab3b71..a1505699a 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -624,7 +624,14 @@ def _normalize_evaluator_func( Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT], Callable[[Run, Optional[Example]], Awaitable[_RUNNABLE_OUTPUT]], ]: - supported_args = ("run", "example", "inputs", "outputs", "reference_outputs") + supported_args = ( + "run", + "example", + "inputs", + "outputs", + "reference_outputs", + "attachments", + ) sig = inspect.signature(func) positional_args = [ pname @@ -659,6 +666,7 @@ async def awrapper( "example": example, "inputs": example.inputs if example else {}, "outputs": run.outputs or {}, + "attachments": example.attachments or {} if example else {}, "reference_outputs": example.outputs or {} if example else {}, } args = (arg_map[arg] for arg in positional_args) @@ -679,6 +687,7 @@ def wrapper(run: Run, example: Example) -> _RUNNABLE_OUTPUT: "example": example, "inputs": example.inputs if example else {}, "outputs": run.outputs or {}, + "attachments": example.attachments or {}, "reference_outputs": example.outputs or {} if example else {}, } args = (arg_map[arg] for arg in positional_args) diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index 7caca8f7b..acedaf177 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -64,7 +64,25 @@ def my_function(bar: int, my_val: Attachment): Attachments = Dict[str, Union[Tuple[str, bytes], Attachment]] -"""Attachments associated with the run. Each entry is a tuple of (mime_type, bytes).""" +"""Attachments associated with the run. +Each entry is a tuple of (mime_type, bytes), or (mime_type, file_path)""" + + +@runtime_checkable +class BinaryIOLike(Protocol): + """Protocol for binary IO-like objects.""" + + def read(self, size: int = -1) -> bytes: + """Read function.""" + ... + + def write(self, b: bytes) -> int: + """Write function.""" + ... + + def seek(self, offset: int, whence: int = 0) -> int: + """Seek function.""" + ... class ExampleBase(BaseModel): @@ -79,6 +97,7 @@ class Config: """Configuration class for the schema.""" frozen = True + arbitrary_types_allowed = True class ExampleCreate(ExampleBase): @@ -89,6 +108,32 @@ class ExampleCreate(ExampleBase): split: Optional[Union[str, List[str]]] = None +class ExampleUploadWithAttachments(BaseModel): + """Example upload with attachments.""" + + id: Optional[UUID] + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + inputs: Dict[str, Any] = Field(default_factory=dict) + outputs: Optional[Dict[str, Any]] = Field(default=None) + metadata: Optional[Dict[str, Any]] = Field(default=None) + split: Optional[Union[str, List[str]]] = None + attachments: Optional[Attachments] = None + + +class ExampleUpsertWithAttachments(ExampleUploadWithAttachments): + """Example create with attachments.""" + + dataset_id: UUID + + +class AttachmentInfo(TypedDict): + """Info for an attachment.""" + + presigned_url: str + reader: BinaryIOLike + # TODO: add mime type + + class Example(ExampleBase): """Example model.""" @@ -100,6 +145,9 @@ class Example(ExampleBase): modified_at: Optional[datetime] = Field(default=None) runs: List[Run] = Field(default_factory=list) source_run_id: Optional[UUID] = None + attachments: Optional[Dict[str, AttachmentInfo]] = Field(default=None) + """Dictionary with attachment names as keys and a tuple of the S3 url + and a reader of the data for the file.""" _host_url: Optional[str] = PrivateAttr(default=None) _tenant_id: Optional[UUID] = PrivateAttr(default=None) @@ -135,12 +183,24 @@ class ExampleSearch(ExampleBase): id: UUID +class AttachmentsOperations(BaseModel): + """Operations to perform on attachments.""" + + rename: Dict[str, str] = Field( + default_factory=dict, description="Mapping of old attachment names to new names" + ) + retain: List[str] = Field( + default_factory=list, description="List of attachment names to keep" + ) + + class ExampleUpdate(BaseModel): """Update class for Example.""" dataset_id: Optional[UUID] = None inputs: Optional[Dict[str, Any]] = None outputs: Optional[Dict[str, Any]] = None + attachments_operations: Optional[AttachmentsOperations] = None metadata: Optional[Dict[str, Any]] = None split: Optional[Union[str, List[str]]] = None @@ -150,6 +210,18 @@ class Config: frozen = True +class ExampleUpdateWithAttachments(ExampleUpdate): + """Example update with attachments.""" + + id: UUID + inputs: Dict[str, Any] = Field(default_factory=dict) + outputs: Optional[Dict[str, Any]] = Field(default=None) + metadata: Optional[Dict[str, Any]] = Field(default=None) + split: Optional[Union[str, List[str]]] = None + attachments: Optional[Attachments] = None + attachments_operations: Optional[AttachmentsOperations] = None + + class DataType(str, Enum): """Enum for dataset data types.""" @@ -718,6 +790,8 @@ class LangSmithInfo(BaseModel): license_expiration_time: Optional[datetime] = None """The time the license will expire.""" batch_ingest_config: Optional[BatchIngestConfig] = None + """The instance flags.""" + instance_flags: Optional[Dict[str, Any]] = None Example.update_forward_refs() @@ -1003,3 +1077,12 @@ class UsageMetadata(TypedDict): Does *not* need to sum to full output token count. Does *not* need to have all keys. """ + + +class UpsertExamplesResponse(TypedDict): + """Response object returned from the upsert_examples_multipart method.""" + + count: int + """The number of examples that were upserted.""" + example_ids: List[str] + """The ids of the examples that were upserted.""" diff --git a/python/pyproject.toml b/python/pyproject.toml index a831ff0df..5b008c34d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "langsmith" -version = "0.2.1" +version = "0.2.2" description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform." authors = ["LangChain "] license = "MIT" diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 9bea700cd..f5f7ba878 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -20,10 +20,20 @@ from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor from langsmith.client import ID_TYPE, Client -from langsmith.schemas import DataType +from langsmith.evaluation import aevaluate, evaluate +from langsmith.schemas import ( + AttachmentsOperations, + DataType, + Example, + ExampleUpdateWithAttachments, + ExampleUploadWithAttachments, + ExampleUpsertWithAttachments, + Run, +) from langsmith.utils import ( LangSmithConnectionError, LangSmithError, + LangSmithNotFoundError, get_env_var, ) @@ -48,7 +58,14 @@ def wait_for( @pytest.fixture def langchain_client() -> Client: get_env_var.cache_clear() - return Client() + return Client( + info={ + "instance_flags": { + "dataset_examples_multipart_enabled": True, + "examples_multipart_enabled": True, + } + }, + ) def test_datasets(langchain_client: Client) -> None: @@ -369,6 +386,184 @@ def test_error_surfaced_invalid_uri(uri: str) -> None: client.create_run("My Run", inputs={"text": "hello world"}, run_type="llm") +def test_upload_examples_multipart(langchain_client: Client): + """Test uploading examples with attachments via multipart endpoint.""" + dataset_name = "__test_upload_examples_multipart" + uuid4().hex[:4] + if langchain_client.has_dataset(dataset_name=dataset_name): + langchain_client.delete_dataset(dataset_name=dataset_name) + + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for multipart example upload", + data_type=DataType.kv, + ) + + # Test example with all fields + example_id = uuid4() + example_1 = ExampleUploadWithAttachments( + id=example_id, + inputs={"text": "hello world"}, + attachments={ + "test_file": ("text/plain", b"test content"), + }, + ) + + # Test example with minimum required fields + example_2 = ExampleUploadWithAttachments( + inputs={"text": "minimal example"}, + ) + + # Test example with outputs and multiple attachments + example_3 = ExampleUploadWithAttachments( + inputs={"text": "example with outputs"}, + outputs={"response": "test response"}, + attachments={ + "file1": ("text/plain", b"content 1"), + "file2": ("text/plain", b"content 2"), + }, + ) + + # Test uploading multiple examples at once + created_examples = langchain_client.upload_examples_multipart( + dataset_id=dataset.id, uploads=[example_1, example_2, example_3] + ) + assert created_examples["count"] == 3 + + created_example_1 = langchain_client.read_example(example_id) + assert created_example_1.inputs["text"] == "hello world" + + # Verify the examples were created correctly + examples = [ + ex + for ex in langchain_client.list_examples( + dataset_id=dataset.id, + include_attachments=True, + ) + ] + assert len(examples) == 3 + + # Verify example with ID was created with correct ID + example_with_id = [ex for ex in examples if ex.id == example_id][0] + assert example_with_id.inputs["text"] == "hello world" + assert "test_file" in example_with_id.attachments + + # Verify example with outputs and multiple attachments + example_with_outputs = next( + ex + for ex in examples + if ex.outputs and ex.outputs.get("response") == "test response" + ) + assert len(example_with_outputs.attachments) == 2 + assert "file1" in example_with_outputs.attachments + assert "file2" in example_with_outputs.attachments + + # Test uploading to non-existent dataset fails + fake_id = uuid4() + with pytest.raises(LangSmithNotFoundError): + langchain_client.upload_examples_multipart( + dataset_id=fake_id, + uploads=[ + ExampleUploadWithAttachments( + inputs={"text": "should fail"}, + ) + ], + ) + + # Clean up + langchain_client.delete_dataset(dataset_name=dataset_name) + + +def test_upsert_examples_multipart(langchain_client: Client) -> None: + """Test upserting examples with attachments via the multipart endpoint.""" + dataset_name = "__test_upsert_examples_multipart" + uuid4().hex[:4] + if langchain_client.has_dataset(dataset_name=dataset_name): + langchain_client.delete_dataset(dataset_name=dataset_name) + + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for multipart example upload", + data_type=DataType.kv, + ) + + # Test example with all fields + example_id = uuid4() + example_1 = ExampleUpsertWithAttachments( + id=example_id, + dataset_id=dataset.id, + inputs={"text": "hello world"}, + # test without outputs + attachments={ + "test_file": ("text/plain", b"test content"), + }, + ) + # Test example without id + example_2 = ExampleUpsertWithAttachments( + dataset_id=dataset.id, + inputs={"text": "foo bar"}, + outputs={"response": "baz"}, + attachments={ + "my_file": ("text/plain", b"more test content"), + }, + ) + created_examples = langchain_client.upsert_examples_multipart( + upserts=[example_1, example_2] + ) + assert created_examples["count"] == 2 + + created_example_1 = langchain_client.read_example(example_id) + assert created_example_1.inputs["text"] == "hello world" + assert created_example_1.outputs is None + + created_example_2 = langchain_client.read_example( + [id_ for id_ in created_examples["example_ids"] if id_ != str(example_id)][0] + ) + assert created_example_2.inputs["text"] == "foo bar" + assert created_example_2.outputs["response"] == "baz" + + # make sure examples were sent to the correct dataset + all_examples_in_dataset = [ + example for example in langchain_client.list_examples(dataset_id=dataset.id) + ] + assert len(all_examples_in_dataset) == 2 + + example_1_update = ExampleUpsertWithAttachments( + id=example_id, + dataset_id=dataset.id, + inputs={"text": "bar baz"}, + outputs={"response": "foo"}, + attachments={ + "my_file": ("text/plain", b"more test content"), + }, + ) + updated_examples = langchain_client.upsert_examples_multipart( + upserts=[example_1_update] + ) + assert updated_examples["count"] == 0 + # Test that adding invalid example fails + # even if valid examples are added alongside + example_3 = ExampleUpsertWithAttachments( + dataset_id=uuid4(), # not a real dataset + inputs={"text": "foo bar"}, + outputs={"response": "baz"}, + attachments={ + "my_file": ("text/plain", b"more test content"), + }, + ) + + with pytest.raises(LangSmithNotFoundError): + langchain_client.upsert_examples_multipart(upserts=[example_3]) + + all_examples_in_dataset = [ + example for example in langchain_client.list_examples(dataset_id=dataset.id) + ] + assert len(all_examples_in_dataset) == 2 + + # Throw type errors when not passing ExampleUpsertWithAttachments + with pytest.raises(ValueError): + langchain_client.upsert_examples_multipart(upserts=[{"foo": "bar"}]) + langchain_client.delete_dataset(dataset_name=dataset_name) + + def test_create_dataset(langchain_client: Client) -> None: dataset_name = "__test_create_dataset" + uuid4().hex[:4] if langchain_client.has_dataset(dataset_name=dataset_name): @@ -1020,6 +1215,349 @@ def create_encoder(*args, **kwargs): assert not caplog.records +def test_list_examples_attachments_keys(langchain_client: Client) -> None: + """Test list_examples returns same keys with and without attachments.""" + dataset_name = "__test_list_examples_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset(dataset_name=dataset_name) + + langchain_client.upload_examples_multipart( + dataset_id=dataset.id, + uploads=[ + ExampleUploadWithAttachments( + inputs={"text": "hello world"}, + outputs={"response": "hi there"}, + attachments={ + "test_file": ("text/plain", b"test content"), + }, + ) + ], + ) + + # Get examples with attachments + with_attachments = next( + langchain_client.list_examples(dataset_id=dataset.id, include_attachments=True) + ) + + # Get examples without attachments + without_attachments = next( + langchain_client.list_examples(dataset_id=dataset.id, include_attachments=False) + ) + + with_keys = set(with_attachments.dict().keys()) + without_keys = set(without_attachments.dict().keys()) + assert with_keys == without_keys, ( + f"Keys differ when include_attachments=True vs False.\n" + f"Only in with_attachments: {with_keys - without_keys}\n" + f"Only in without_attachments: {without_keys - with_keys}" + ) + + langchain_client.delete_dataset(dataset_id=dataset.id) + + +def test_evaluate_with_attachments(langchain_client: Client) -> None: + """Test evaluating examples with attachments.""" + dataset_name = "__test_evaluate_attachments" + uuid4().hex[:4] + + # 1. Create dataset + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals with attachments", + data_type=DataType.kv, + ) + + # 2. Create example with attachments + example = ExampleUploadWithAttachments( + inputs={"question": "What is shown in the image?"}, + outputs={"answer": "test image"}, + attachments={ + "image": ("image/png", b"fake image data for testing"), + }, + ) + + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + def target(inputs: Dict[str, Any], attachments: Dict[str, Any]) -> Dict[str, Any]: + # Verify we receive the attachment data + assert "image" in attachments + assert "presigned_url" in attachments["image"] + image_data = attachments["image"]["reader"] + assert image_data.read() == b"fake image data for testing" + return {"answer": "test image"} + + def evaluator( + outputs: dict, reference_outputs: dict, attachments: dict + ) -> Dict[str, Any]: + assert "image" in attachments + assert "presigned_url" in attachments["image"] + image_data = attachments["image"]["reader"] + assert image_data.read() == b"fake image data for testing" + return { + "score": float( + reference_outputs.get("answer") == outputs.get("answer") # type: ignore + ) + } + + results = langchain_client.evaluate( + target, + data=dataset_name, + evaluators=[evaluator], + num_repetitions=2, + ) + + assert len(results) == 2 + for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + + langchain_client.delete_dataset(dataset_name=dataset_name) + + +def test_evaluate_with_attachments_not_in_target(langchain_client: Client) -> None: + """Test evaluating examples with attachments.""" + dataset_name = "__test_evaluate_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals with attachments", + data_type=DataType.kv, + ) + + example = ExampleUploadWithAttachments( + inputs={"question": "What is shown in the image?"}, + outputs={"answer": "test image"}, + attachments={ + "image": ("image/png", b"fake image data for testing"), + }, + ) + + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + def target(inputs: Dict[str, Any]) -> Dict[str, Any]: + return {"answer": "test image"} + + def evaluator( + outputs: dict, reference_outputs: dict, attachments: dict + ) -> Dict[str, Any]: + assert "image" in attachments + assert "presigned_url" in attachments["image"] + image_data = attachments["image"]["reader"] + assert image_data.read() == b"fake image data for testing" + return { + "score": float( + reference_outputs.get("answer") == outputs.get("answer") # type: ignore + ) + } + + results = langchain_client.evaluate( + target, + data=dataset_name, + evaluators=[evaluator], + num_repetitions=2, + ) + + assert len(results) == 2 + for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + + langchain_client.delete_dataset(dataset_name=dataset_name) + + +def test_evaluate_with_no_attachments(langchain_client: Client) -> None: + """Test evaluating examples without attachments using a target with attachments.""" + dataset_name = "__test_evaluate_no_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals without attachments", + data_type=DataType.kv, + ) + + # Create example using old way, attachments should be set to {} + langchain_client.create_example( + dataset_id=dataset.id, + inputs={"question": "What is 2+2?"}, + outputs={"answer": "4"}, + ) + + # Verify we can create example the new way without attachments + example = ExampleUploadWithAttachments( + inputs={"question": "What is 3+1?"}, + outputs={"answer": "4"}, + ) + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + def target(inputs: Dict[str, Any], attachments: Dict[str, Any]) -> Dict[str, Any]: + # Verify we receive an empty attachments dict + assert isinstance(attachments, dict) + assert len(attachments) == 0 + return {"answer": "4"} + + def evaluator(run: Run, example: Example) -> Dict[str, Any]: + return { + "score": float( + run.outputs.get("answer") == example.outputs.get("answer") # type: ignore + ) + } + + results = evaluate( + target, data=dataset_name, evaluators=[evaluator], client=langchain_client + ) + + assert len(results) == 2 + for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + + langchain_client.delete_dataset(dataset_name=dataset_name) + + +async def test_aevaluate_with_attachments(langchain_client: Client) -> None: + """Test evaluating examples with attachments.""" + dataset_name = "__test_aevaluate_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals with attachments", + data_type=DataType.kv, + ) + + example = ExampleUploadWithAttachments( + inputs={"question": "What is shown in the image?"}, + outputs={"answer": "test image"}, + attachments={ + "image": ("image/png", b"fake image data for testing"), + }, + ) + + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + async def target( + inputs: Dict[str, Any], attachments: Dict[str, Any] + ) -> Dict[str, Any]: + # Verify we receive the attachment data + assert "image" in attachments + assert "presigned_url" in attachments["image"] + image_data = attachments["image"]["reader"] + assert image_data.read() == b"fake image data for testing" + return {"answer": "test image"} + + async def evaluator( + outputs: dict, reference_outputs: dict, attachments: dict + ) -> Dict[str, Any]: + assert "image" in attachments + assert "presigned_url" in attachments["image"] + image_data = attachments["image"]["reader"] + assert image_data.read() == b"fake image data for testing" + return { + "score": float( + reference_outputs.get("answer") == outputs.get("answer") # type: ignore + ) + } + + results = await langchain_client.aevaluate( + target, data=dataset_name, evaluators=[evaluator], num_repetitions=2 + ) + + assert len(results) == 2 + async for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + + langchain_client.delete_dataset(dataset_name=dataset_name) + + +async def test_aevaluate_with_attachments_not_in_target( + langchain_client: Client, +) -> None: + """Test evaluating examples with attachments.""" + dataset_name = "__test_aevaluate_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals with attachments", + data_type=DataType.kv, + ) + + example = ExampleUploadWithAttachments( + inputs={"question": "What is shown in the image?"}, + outputs={"answer": "test image"}, + attachments={ + "image": ("image/png", b"fake image data for testing"), + }, + ) + + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + async def target(inputs: Dict[str, Any]) -> Dict[str, Any]: + # Verify we receive the attachment data + return {"answer": "test image"} + + async def evaluator( + outputs: dict, reference_outputs: dict, attachments: dict + ) -> Dict[str, Any]: + assert "image" in attachments + assert "presigned_url" in attachments["image"] + image_data = attachments["image"]["reader"] + assert image_data.read() == b"fake image data for testing" + return { + "score": float( + reference_outputs.get("answer") == outputs.get("answer") # type: ignore + ) + } + + results = await langchain_client.aevaluate( + target, data=dataset_name, evaluators=[evaluator], num_repetitions=2 + ) + + assert len(results) == 2 + async for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + + langchain_client.delete_dataset(dataset_name=dataset_name) + + +async def test_aevaluate_with_no_attachments(langchain_client: Client) -> None: + """Test evaluating examples without attachments using a target with attachments.""" + dataset_name = "__test_aevaluate_no_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for evals without attachments", + data_type=DataType.kv, + ) + + # Create example using old way, attachments should be set to {} + langchain_client.create_example( + dataset_id=dataset.id, + inputs={"question": "What is 2+2?"}, + outputs={"answer": "4"}, + ) + + # Verify we can create example the new way without attachments + example = ExampleUploadWithAttachments( + inputs={"question": "What is 3+1?"}, + outputs={"answer": "4"}, + ) + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + async def target( + inputs: Dict[str, Any], attachments: Dict[str, Any] + ) -> Dict[str, Any]: + # Verify we receive an empty attachments dict + assert isinstance(attachments, dict) + assert len(attachments) == 0 + return {"answer": "4"} + + async def evaluator(run: Run, example: Example) -> Dict[str, Any]: + return { + "score": float( + run.outputs.get("answer") == example.outputs.get("answer") # type: ignore + ) + } + + results = await aevaluate( + target, data=dataset_name, evaluators=[evaluator], client=langchain_client + ) + + assert len(results) == 2 + async for result in results: + assert result["evaluation_results"]["results"][0].score == 1.0 + + langchain_client.delete_dataset(dataset_name=dataset_name) + + def test_examples_length_validation(langchain_client: Client) -> None: """Test that mismatched lengths raise ValueError for create and update examples.""" dataset_name = "__test_examples_length_validation" + uuid4().hex[:4] @@ -1059,3 +1597,299 @@ def test_examples_length_validation(langchain_client: Client) -> None: # Clean up langchain_client.delete_dataset(dataset_id=dataset.id) + + +def test_update_example_with_attachments_operations(langchain_client: Client) -> None: + """Test updating an example with attachment operations.""" + dataset_name = "__test_update_example_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name=dataset_name, + description="Test dataset for updating example attachments", + ) + example_id = uuid4() + # Create example with attachments + example = ExampleUploadWithAttachments( + id=example_id, + inputs={"query": "What's in this image?"}, + outputs={"answer": "A test image"}, + attachments={ + "image1": ("image/png", b"fake image data 1"), + "image2": ("image/png", b"fake image data 2"), + }, + ) + langchain_client.upload_examples_multipart(dataset_id=dataset.id, uploads=[example]) + + # Update example with attachment operations to rename and retain attachments + attachments_operations = AttachmentsOperations( + rename={"image1": "renamed_image"}, + retain=["image2"], # Only keep the renamed image1, drop image2 + ) + + langchain_client.update_example( + example_id=example_id, + attachments_operations=attachments_operations, + ) + + # Verify the update + retrieved_example = langchain_client.read_example( + example_id=example_id, + ) + + # Check that only the renamed attachment exists + assert len(retrieved_example.attachments) == 2 + assert "renamed_image" in retrieved_example.attachments + assert "image2" in retrieved_example.attachments + assert "image1" not in retrieved_example.attachments + assert ( + retrieved_example.attachments["image2"]["reader"].read() == b"fake image data 2" + ) + assert ( + retrieved_example.attachments["renamed_image"]["reader"].read() + == b"fake image data 1" + ) + + # Clean up + langchain_client.delete_dataset(dataset_id=dataset.id) + + +def test_bulk_update_examples_with_attachments_operations( + langchain_client: Client, +) -> None: + """Test bulk updating examples with attachment operations.""" + dataset_name = "__test_bulk_update_attachments" + uuid4().hex[:4] + dataset = langchain_client.create_dataset( + dataset_name=dataset_name, + description="Test dataset for bulk updating example attachments", + ) + + example_id1, example_id2 = uuid4(), uuid4() + # Create two examples with attachments + example1 = ExampleUploadWithAttachments( + id=example_id1, + inputs={"query": "What's in this image?"}, + outputs={"answer": "A test image 1"}, + attachments={ + "image1": ("image/png", b"fake image data 1"), + "extra": ("text/plain", b"extra data"), + }, + ) + example2 = ExampleUploadWithAttachments( + id=example_id2, + inputs={"query": "What's in this image?"}, + outputs={"answer": "A test image 2"}, + attachments={ + "image2": ("image/png", b"fake image data 2"), + "extra": ("text/plain", b"extra data"), + }, + ) + + created_examples = langchain_client.upload_examples_multipart( + dataset_id=dataset.id, + uploads=[example1, example2], + ) + assert len(created_examples["example_ids"]) == 2 + assert str(example_id1) in created_examples["example_ids"] + assert str(example_id2) in created_examples["example_ids"] + + # Update both examples with different attachment operations + attachments_operations = [ + AttachmentsOperations( + rename={"image1": "renamed_image1"}, + ), + AttachmentsOperations(retain=["extra"]), + ] + + langchain_client.update_examples( + example_ids=[example_id1, example_id2], + attachments_operations=attachments_operations, + ) + + # Verify the updates + updated_examples = list( + langchain_client.list_examples( + dataset_id=dataset.id, + example_ids=[example_id1, example_id2], + include_attachments=True, + ) + ) + + updated_example_1 = next(ex for ex in updated_examples if ex.id == example_id1) + updated_example_2 = next(ex for ex in updated_examples if ex.id == example_id2) + # Check first example + assert len(updated_example_1.attachments) == 1 + assert "renamed_image1" in updated_example_1.attachments + assert "extra" not in updated_example_1.attachments + + # Check second example + assert len(updated_example_2.attachments) == 1 + assert "extra" in updated_example_2.attachments + assert "image2" not in updated_example_2.attachments + + # Check attachment data + assert ( + updated_example_1.attachments["renamed_image1"]["reader"].read() + == b"fake image data 1" + ) + assert updated_example_2.attachments["extra"]["reader"].read() == b"extra data" + + # Clean up + langchain_client.delete_dataset(dataset_id=dataset.id) + + +def test_update_examples_multipart(langchain_client: Client) -> None: + """Test updating examples with attachments via multipart endpoint.""" + dataset_name = "__test_update_examples_multipart" + uuid4().hex[:4] + if langchain_client.has_dataset(dataset_name=dataset_name): + langchain_client.delete_dataset(dataset_name=dataset_name) + + dataset = langchain_client.create_dataset( + dataset_name, + description="Test dataset for multipart example updates", + data_type=DataType.kv, + ) + example_ids = [uuid4() for _ in range(2)] + + # First create some examples with attachments + example_1 = ExampleUploadWithAttachments( + id=example_ids[0], + inputs={"text": "hello world"}, + attachments={ + "file1": ("text/plain", b"original content 1"), + "file2": ("text/plain", b"original content 2"), + }, + ) + + example_2 = ExampleUploadWithAttachments( + id=example_ids[1], + inputs={"text": "second example"}, + attachments={ + "file3": ("text/plain", b"original content 3"), + "file4": ("text/plain", b"original content 4"), + }, + ) + + created_examples = langchain_client.upload_examples_multipart( + dataset_id=dataset.id, uploads=[example_1, example_2] + ) + assert created_examples["count"] == 2 + + # Now create update operations + update_1 = ExampleUpdateWithAttachments( + id=example_ids[0], + inputs={"text": "updated hello world"}, + attachments={ + "new_file1": ("text/plain", b"new content 1"), + }, + attachments_operations=AttachmentsOperations( + retain=["file1"], + ), + ) + + update_2 = ExampleUpdateWithAttachments( + id=example_ids[1], + inputs={"text": "updated second example"}, + attachments={ + "new_file2": ("text/plain", b"new content 2"), + }, + attachments_operations=AttachmentsOperations(retain=["file3"]), + ) + + # Test updating multiple examples at once + updated_examples = langchain_client.update_examples_multipart( + dataset_id=dataset.id, updates=[update_1, update_2] + ) + assert updated_examples["count"] == 2 + + # Verify the updates + updated = list( + langchain_client.list_examples( + dataset_id=dataset.id, + include_attachments=True, + ) + ) + + # Verify first example updates + example_1_updated = next(ex for ex in updated if ex.id == example_ids[0]) + assert example_1_updated.inputs["text"] == "updated hello world" + assert "file1" in example_1_updated.attachments + assert "new_file1" in example_1_updated.attachments + assert "file2" not in example_1_updated.attachments + assert ( + example_1_updated.attachments["new_file1"]["reader"].read() == b"new content 1" + ) + assert ( + example_1_updated.attachments["file1"]["reader"].read() == b"original content 1" + ) + + # Verify second example updates + example_2_updated = next(ex for ex in updated if ex.id == example_ids[1]) + assert example_2_updated.inputs["text"] == "updated second example" + assert "file3" in example_2_updated.attachments + assert "new_file2" in example_2_updated.attachments + assert "file4" not in example_2_updated.attachments + assert "file3" in example_2_updated.attachments + assert "new_file2" in example_2_updated.attachments + assert "file4" not in example_2_updated.attachments + assert ( + example_2_updated.attachments["file3"]["reader"].read() == b"original content 3" + ) + assert ( + example_2_updated.attachments["new_file2"]["reader"].read() == b"new content 2" + ) + + # Test updating non-existent example doesn't do anything + response = langchain_client.update_examples_multipart( + dataset_id=dataset.id, + updates=[ + ExampleUpdateWithAttachments( + id=uuid4(), + inputs={"text": "should fail"}, + ) + ], + ) + assert response["count"] == 0 + + # Test new attachments have priority + response = langchain_client.update_examples_multipart( + dataset_id=dataset.id, + updates=[ + ExampleUpdateWithAttachments( + id=example_ids[0], + attachments={ + "renamed_file1": ("text/plain", b"new content 1"), + }, + attachments_operations=AttachmentsOperations( + retain=["renamed_file1"], + ), + ) + ], + ) + assert response["count"] == 1 + example_1_updated = langchain_client.read_example(example_ids[0]) + assert list(example_1_updated.attachments.keys()) == ["renamed_file1"] + assert ( + example_1_updated.attachments["renamed_file1"]["reader"].read() + == b"new content 1" + ) + + # Test new attachments have priority + response = langchain_client.update_examples_multipart( + dataset_id=dataset.id, + updates=[ + ExampleUpdateWithAttachments( + id=example_ids[0], + attachments={ + "foo": ("text/plain", b"new content 1"), + }, + attachments_operations=AttachmentsOperations( + rename={"renamed_file1": "foo"}, + ), + ) + ], + ) + assert response["count"] == 1 + example_1_updated = langchain_client.read_example(example_ids[0]) + assert list(example_1_updated.attachments.keys()) == ["foo"] + + # Clean up + langchain_client.delete_dataset(dataset_id=dataset.id) diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py index 132af656a..e33d07fd5 100644 --- a/python/tests/unit_tests/evaluation/test_runner.py +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -5,19 +5,22 @@ import itertools import json import random +import re import sys import time import uuid from datetime import datetime, timezone from threading import Lock -from typing import Callable, List +from typing import Any, Callable, Dict, List, Tuple from unittest import mock from unittest.mock import MagicMock import pytest +from langchain_core.runnables import chain as as_runnable from langsmith import Client, aevaluate, evaluate from langsmith import schemas as ls_schemas +from langsmith.evaluation._runner import _include_attachments from langsmith.evaluation.evaluator import ( _normalize_comparison_evaluator_func, _normalize_evaluator_func, @@ -47,7 +50,9 @@ def request(self, verb: str, endpoint: str, *args, **kwargs): return res elif endpoint == "http://localhost:1984/examples": res = MagicMock() - res.json.return_value = [e.dict() for e in self.ds_examples] + res.json.return_value = [ + e.dict() if not isinstance(e, dict) else e for e in self.ds_examples + ] return res elif endpoint == "http://localhost:1984/sessions": res = {} # type: ignore @@ -137,14 +142,23 @@ def _wait_until(condition: Callable, timeout: int = 8): raise TimeoutError("Condition not met") -def _create_example(idx: int) -> ls_schemas.Example: +def _create_example(idx: int) -> Tuple[ls_schemas.Example, Dict[str, Any]]: + _id = uuid.uuid4() + _created_at = datetime.now(timezone.utc) return ls_schemas.Example( - id=uuid.uuid4(), + id=_id, inputs={"in": idx}, outputs={"answer": idx + 1}, dataset_id="00886375-eb2a-4038-9032-efff60309896", - created_at=datetime.now(timezone.utc), - ) + created_at=_created_at, + ), { + "id": _id, + "dataset_id": "00886375-eb2a-4038-9032-efff60309896", + "created_at": _created_at, + "inputs": {"in": idx}, + "outputs": {"answer": idx + 1}, + "attachment_urls": None, + } @pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9 or higher") @@ -160,10 +174,13 @@ def test_evaluate_results( SPLIT_SIZE = 3 NUM_REPETITIONS = 4 - ds_examples = [_create_example(i) for i in range(10)] + ds_example_responses = [_create_example(i) for i in range(10)] + ds_examples = [e[0] for e in ds_example_responses] dev_split = random.sample(ds_examples, SPLIT_SIZE) tenant_id = str(uuid.uuid4()) - fake_request = FakeRequest(ds_id, ds_name, ds_examples, tenant_id) + fake_request = FakeRequest( + ds_id, ds_name, [e[1] for e in ds_example_responses], tenant_id + ) session.request = fake_request.request client = Client( api_url="http://localhost:1984", @@ -225,6 +242,14 @@ def score_unpacked_inputs_outputs_reference(inputs, outputs, reference_outputs): ordering_of_stuff.append("evaluate") return {"score": reference_outputs["answer"]} + def score_unpacked_inputs_outputs_attachments(inputs, outputs, attachments): + ordering_of_stuff.append("evaluate") + return {"score": outputs["output"]} + + def score_unpacked_outputs(outputs): + ordering_of_stuff.append("evaluate") + return {"score": outputs["output"]} + def eval_float(run, example): ordering_of_stuff.append("evaluate") return 0.2 @@ -253,6 +278,8 @@ def summary_eval_outputs_reference(outputs, reference_outputs): score_value_first, score_unpacked_inputs_outputs, score_unpacked_inputs_outputs_reference, + score_unpacked_inputs_outputs_attachments, + score_unpacked_outputs, eval_float, eval_str, eval_list, @@ -387,7 +414,12 @@ def eval2(x, y, inputs): _normalize_evaluator_func(eval_) with pytest.raises(ValueError, match="Invalid evaluator function."): - evaluate((lambda x: x), data=ds_examples, evaluators=[eval_], client=client) + evaluate( + (lambda inputs: inputs), + data=ds_examples, + evaluators=[eval_], + client=client, + ) def test_evaluate_raises_for_async(): @@ -431,10 +463,13 @@ async def test_aevaluate_results( SPLIT_SIZE = 3 NUM_REPETITIONS = 4 - ds_examples = [_create_example(i) for i in range(10)] + ds_example_responses = [_create_example(i) for i in range(10)] + ds_examples = [e[0] for e in ds_example_responses] dev_split = random.sample(ds_examples, SPLIT_SIZE) tenant_id = str(uuid.uuid4()) - fake_request = FakeRequest(ds_id, ds_name, ds_examples, tenant_id) + fake_request = FakeRequest( + ds_id, ds_name, [e[1] for e in ds_example_responses], tenant_id + ) session.request = fake_request.request client = Client( api_url="http://localhost:1984", @@ -499,6 +534,14 @@ async def score_unpacked_inputs_outputs_reference( ordering_of_stuff.append("evaluate") return {"score": reference_outputs["answer"]} + async def score_unpacked_inputs_outputs_attachments(inputs, outputs, attachments): + ordering_of_stuff.append("evaluate") + return {"score": outputs["output"]} + + async def score_unpacked_outputs(outputs): + ordering_of_stuff.append("evaluate") + return {"score": outputs["output"]} + async def eval_float(run, example): ordering_of_stuff.append("evaluate") return 0.2 @@ -527,6 +570,8 @@ def summary_eval_outputs_reference(outputs, reference_outputs): score_value_first, score_unpacked_inputs_outputs, score_unpacked_inputs_outputs_reference, + score_unpacked_inputs_outputs_attachments, + score_unpacked_outputs, eval_float, eval_str, eval_list, @@ -658,8 +703,8 @@ async def eval2(x, y, inputs): evaluators = [eval1, eval2] - async def atarget(x): - return x + async def atarget(inputs): + return inputs for eval_ in evaluators: with pytest.raises(ValueError, match="Invalid evaluator function."): @@ -676,6 +721,242 @@ async def atarget(x): ) +@as_runnable +def nested_predict(inputs): + return {"output": "Yes"} + + +@as_runnable +def lc_predict(inputs): + return nested_predict.invoke(inputs) + + +async def async_just_inputs(inputs): + return None + + +async def async_just_inputs_with_attachments(inputs, attachments): + return None + + +async def async_extra_args(inputs, attachments, foo="bar"): + return None + + +@pytest.mark.parametrize( + "target,expected,error_msg,is_async", + [ + # Valid cases + (lambda inputs: None, False, None, False), + (lambda inputs, attachments: None, True, None, False), + (async_just_inputs, False, None, True), + (async_just_inputs_with_attachments, True, None, True), + # Invalid parameter names + ( + lambda x, y: None, + None, + re.escape( + "When passing 2 positional arguments, they must be named 'inputs' and " + "'attachments', respectively. Received: ['x', 'y']" + ), + False, + ), + ( + lambda input, attachment: None, + None, + re.escape( + "When passing 2 positional arguments, they must be named 'inputs' and " + "'attachments', respectively. Received: ['input', 'attachment']" + ), + False, + ), + # Too many parameters + ( + lambda inputs, attachments, extra: None, + None, + re.escape( + "Target function must accept at most two arguments without " + "default values: (inputs, attachments)." + ), + False, + ), + # No positional parameters + ( + lambda *, foo="bar": None, + None, + re.escape( + "Target function must accept at least one positional argument (inputs)" + ), + False, + ), + # Mixed positional and keyword + (lambda inputs, *, optional=None: None, False, None, False), + (lambda inputs, attachments, *, optional=None: None, True, None, False), + # Non-callable + ("not_a_function", False, None, False), + # Runnable + (lc_predict.invoke, False, None, False), + # Positional args with defaults + (lambda inputs, attachments, foo="bar": None, True, None, False), + (async_extra_args, True, None, True), + ], +) +def test_include_attachments(target, expected, error_msg, is_async): + """Test the _include_attachments function with various input cases.""" + try: + from langchain_core.runnables import RunnableLambda + except ImportError: + if target == "runnable": + pytest.skip("langchain-core not installed") + return + + if target == "runnable": + target = RunnableLambda(lambda x: x) + expected = False + error_msg = None + + if error_msg is not None: + with pytest.raises(ValueError, match=error_msg): + _include_attachments(target) + else: + result = _include_attachments(target) + assert result == expected + + +def valid_single_supported(inputs, *, optional=None): + return {"score": 1} + + +async def valid_single_supported_async(inputs, *, optional=None): + return {"score": 1} + + +def valid_two_arbitrary(foo, bar, *, optional=None): + return {"score": 1} + + +async def valid_two_arbitrary_async(foo, bar, *, optional=None): + return {"score": 1} + + +def valid_multiple_supported(inputs, outputs, reference_outputs, *, optional=None): + return {"score": 1} + + +async def valid_multiple_supported_async( + inputs, outputs, reference_outputs, *, optional=None +): + return {"score": 1} + + +def invalid_single_unsupported(foo, *, optional=None): + return {"score": 1} + + +async def invalid_single_unsupported_async(foo, *, optional=None): + return {"score": 1} + + +def invalid_three_args(inputs, outputs, foo, *, optional=None): + return {"score": 1} + + +async def invalid_three_args_async(inputs, outputs, foo, *, optional=None): + return {"score": 1} + + +def invalid_no_positional(*, inputs, outputs, optional=None): + return {"score": 1} + + +async def invalid_no_positional_async(*, inputs, outputs, optional=None): + return {"score": 1} + + +# Test cases that should succeed +VALID_EVALUATOR_CASES = [ + (valid_single_supported, False), + (valid_single_supported_async, True), + (valid_two_arbitrary, False), + (valid_two_arbitrary_async, True), + (valid_multiple_supported, False), + (valid_multiple_supported_async, True), +] + +# Test cases that should raise ValueError +INVALID_EVALUATOR_CASES = [ + (invalid_single_unsupported, False), + (invalid_single_unsupported_async, True), + (invalid_three_args, False), + (invalid_three_args_async, True), + (invalid_no_positional, False), + (invalid_no_positional_async, True), +] + + +def target(inputs, attachments): + return {"foo": "bar"} + + +async def atarget(inputs, attachments): + return {"foo": "bar"} + + +@pytest.mark.parametrize("func,is_async", VALID_EVALUATOR_CASES) +def test_normalize_evaluator_func_valid(func, is_async): + """Test _normalize_evaluator_func succeeds.""" + func = _normalize_evaluator_func(func) + session = mock.Mock() + ds_name = "my-dataset" + ds_id = "00886375-eb2a-4038-9032-efff60309896" + + ds_example_responses = [_create_example(i) for i in range(10)] + ds_examples = [e[0] for e in ds_example_responses] + tenant_id = str(uuid.uuid4()) + fake_request = FakeRequest( + ds_id, ds_name, [e[1] for e in ds_example_responses], tenant_id + ) + session.request = fake_request.request + client = Client(api_url="http://localhost:1984", api_key="123", session=session) + client._tenant_id = tenant_id # type: ignore + + if is_async: + asyncio.run( + aevaluate(atarget, data=ds_examples, evaluators=[func], client=client) + ) + else: + evaluate(target, data=ds_examples, evaluators=[func], client=client) + + +@pytest.mark.parametrize("func,is_async", INVALID_EVALUATOR_CASES) +def test_normalize_evaluator_func_invalid(func, is_async): + """Test _normalize_evaluator_func fails correctly.""" + with pytest.raises(ValueError, match="Invalid evaluator function"): + _normalize_evaluator_func(func) + + session = mock.Mock() + ds_name = "my-dataset" + ds_id = "00886375-eb2a-4038-9032-efff60309896" + + ds_example_responses = [_create_example(i) for i in range(10)] + ds_examples = [e[0] for e in ds_example_responses] + tenant_id = str(uuid.uuid4()) + fake_request = FakeRequest( + ds_id, ds_name, [e[1] for e in ds_example_responses], tenant_id + ) + session.request = fake_request.request + client = Client(api_url="http://localhost:1984", api_key="123", session=session) + client._tenant_id = tenant_id # type: ignore + + with pytest.raises(ValueError, match="Invalid evaluator function"): + if is_async: + asyncio.run( + aevaluate(atarget, data=ds_examples, evaluators=[func], client=client) + ) + else: + evaluate(target, data=ds_examples, evaluators=[func], client=client) + + def summary_eval_runs_examples(runs_, examples_): return {"score": len(runs_[0].dotted_order)} diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 902f6f392..939aa9ad2 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -417,6 +417,97 @@ def test_create_run_mutate( assert outputs == {"messages": ["hi", "there"]} +@mock.patch("langsmith.client.requests.Session") +def test_upsert_examples_multipart(mock_session_cls: mock.Mock) -> None: + """Test that upsert_examples_multipart sends correct multipart data.""" + mock_session = MagicMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.request.return_value = mock_response + mock_session_cls.return_value = mock_session + + client = Client( + api_url="http://localhost:1984", + api_key="123", + info={"instance_flags": {"examples_multipart_enabled": True}}, + ) + + # Create test data + example_id = uuid.uuid4() + dataset_id = uuid.uuid4() + created_at = datetime(2015, 1, 1, 0, 0, 0) + + example = ls_schemas.ExampleUpsertWithAttachments( + id=example_id, + dataset_id=dataset_id, + created_at=created_at, + inputs={"input": "test input"}, + outputs={"output": "test output"}, + metadata={"meta": "data"}, + split="train", + attachments={ + "file1": ("text/plain", b"test data"), + "file2": ls_schemas.Attachment( + mime_type="application/json", data=b'{"key": "value"}' + ), + }, + ) + client.upsert_examples_multipart(upserts=[example]) + + # Verify the request + assert mock_session.request.call_count == 1 + call_args = mock_session.request.call_args + + assert call_args[0][0] == "POST" + assert call_args[0][1].endswith("/v1/platform/examples/multipart") + + # Parse the multipart data + request_data = call_args[1]["data"] + content_type = call_args[1]["headers"]["Content-Type"] + boundary = parse_options_header(content_type)[1]["boundary"] + + parser = MultipartParser( + io.BytesIO( + request_data + if isinstance(request_data, bytes) + else request_data.to_string() + ), + boundary, + ) + parts = list(parser.parts()) + + # Verify all expected parts are present + expected_parts = { + str(example_id): { + "dataset_id": str(dataset_id), + "created_at": created_at.isoformat(), + "metadata": {"meta": "data"}, + "split": "train", + }, + f"{example_id}.inputs": {"input": "test input"}, + f"{example_id}.outputs": {"output": "test output"}, + f"{example_id}.attachment.file1": "test data", + f"{example_id}.attachment.file2": '{"key": "value"}', + } + + assert len(parts) == len(expected_parts) + + for part in parts: + name = part.name + assert name in expected_parts, f"Unexpected part: {name}" + + if name.endswith(".attachment.file1"): + assert part.value == expected_parts[name] + assert part.headers["Content-Type"] == "text/plain; length=9" + elif name.endswith(".attachment.file2"): + assert part.value == expected_parts[name] + assert part.headers["Content-Type"] == "application/json; length=16" + else: + value = json.loads(part.value) + assert value == expected_parts[name] + assert part.headers["Content-Type"] == "application/json" + + class CallTracker: def __init__(self) -> None: self.counter = 0