From 8f4303d3bb7113a2059954f016bd3ec45e44ae82 Mon Sep 17 00:00:00 2001 From: William FH <13333726+hinthornw@users.noreply.github.com> Date: Thu, 22 Aug 2024 19:02:40 -0700 Subject: [PATCH] [Python] Fix incremental streaming of eval steps (#944) --- python/langsmith/evaluation/_arunner.py | 3 + python/langsmith/evaluation/_runner.py | 5 +- python/langsmith/evaluation/evaluator.py | 2 +- python/langsmith/run_helpers.py | 16 +- python/pyproject.toml | 2 +- .../unit_tests/evaluation/test_runner.py | 319 ++++++++++++++++++ 6 files changed, 340 insertions(+), 7 deletions(-) create mode 100644 python/tests/unit_tests/evaluation/test_runner.py diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py index e155d3599..adaaf3061 100644 --- a/python/langsmith/evaluation/_arunner.py +++ b/python/langsmith/evaluation/_arunner.py @@ -329,6 +329,7 @@ async def aevaluate_existing( max_concurrency=max_concurrency, client=client, blocking=blocking, + experiment=project, ) @@ -627,6 +628,7 @@ async def _arun_evaluators( "project_name": "evaluators", "metadata": metadata, "enabled": True, + "client": self.client, } ): run = current_results["run"] @@ -682,6 +684,7 @@ async def _aapply_summary_evaluators( "project_name": "evaluators", "metadata": metadata, "enabled": True, + "client": self.client, } ): for evaluator in summary_evaluators: diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py index 000d516ed..3d229fb69 100644 --- a/python/langsmith/evaluation/_runner.py +++ b/python/langsmith/evaluation/_runner.py @@ -1084,7 +1084,7 @@ def dataset_id(self) -> str: @property def evaluation_results(self) -> Iterable[EvaluationResults]: if self._evaluation_results is None: - return [{"results": []} for _ in self.examples] + return ({"results": []} for _ in self.examples) return self._evaluation_results @property @@ -1256,6 +1256,7 @@ def _run_evaluators( "project_name": "evaluators", "metadata": metadata, "enabled": True, + "client": self.client, } ): run = current_results["run"] @@ -1340,6 +1341,8 @@ def _apply_summary_evaluators( **current_context, "project_name": "evaluators", "metadata": metadata, + "client": self.client, + "enabled": True, } ): for evaluator in summary_evaluators: diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py index 47797e646..d0c107842 100644 --- a/python/langsmith/evaluation/evaluator.py +++ b/python/langsmith/evaluation/evaluator.py @@ -328,7 +328,7 @@ def __call__( def __repr__(self) -> str: """Represent the DynamicRunEvaluator object.""" - return f"" + return f"" def run_evaluator( diff --git a/python/langsmith/run_helpers.py b/python/langsmith/run_helpers.py index 05f2534fe..9fd7978b6 100644 --- a/python/langsmith/run_helpers.py +++ b/python/langsmith/run_helpers.py @@ -58,12 +58,14 @@ _TRACING_ENABLED = contextvars.ContextVar[Optional[bool]]( "_TRACING_ENABLED", default=None ) +_CLIENT = contextvars.ContextVar[Optional[ls_client.Client]]("_CLIENT", default=None) _CONTEXT_KEYS: Dict[str, contextvars.ContextVar] = { "parent": _PARENT_RUN_TREE, "project_name": _PROJECT_NAME, "tags": _TAGS, "metadata": _METADATA, "enabled": _TRACING_ENABLED, + "client": _CLIENT, } @@ -83,6 +85,7 @@ def get_tracing_context( "tags": _TAGS.get(), "metadata": _METADATA.get(), "enabled": _TRACING_ENABLED.get(), + "client": _CLIENT.get(), } return {k: context.get(v) for k, v in _CONTEXT_KEYS.items()} @@ -102,6 +105,7 @@ def tracing_context( metadata: Optional[Dict[str, Any]] = None, 
parent: Optional[Union[run_trees.RunTree, Mapping, str]] = None, enabled: Optional[bool] = None, + client: Optional[ls_client.Client] = None, **kwargs: Any, ) -> Generator[None, None, None]: """Set the tracing context for a block of code. @@ -113,9 +117,11 @@ def tracing_context( parent: The parent run to use for the context. Can be a Run/RunTree object, request headers (for distributed tracing), or the dotted order string. Defaults to None. + client: The client to use for logging the run to LangSmith. Defaults to None, enabled: Whether tracing is enabled. Defaults to None, meaning it will use the current context value or environment variables. + """ if kwargs: # warn @@ -129,7 +135,6 @@ def tracing_context( tags = sorted(set(tags or []) | set(parent_run.tags or [])) metadata = {**parent_run.metadata, **(metadata or {})} enabled = enabled if enabled is not None else current_context.get("enabled") - _set_tracing_context( { "parent": parent_run, @@ -137,6 +142,7 @@ def tracing_context( "tags": tags, "metadata": metadata, "enabled": enabled, + "client": client, } ) try: @@ -829,11 +835,12 @@ def _setup(self) -> run_trees.RunTree: outer_tags = _TAGS.get() outer_metadata = _METADATA.get() + client_ = self.client or self.old_ctx.get("client") parent_run_ = _get_parent_run( { "parent": self.parent, "run_tree": self.run_tree, - "client": self.client, + "client": client_, } ) @@ -870,7 +877,7 @@ def _setup(self) -> run_trees.RunTree: project_name=project_name_ or "default", inputs=self.inputs or {}, tags=tags_, - client=self.client, # type: ignore[arg-type] + client=client_, # type: ignore ) if enabled: @@ -879,6 +886,7 @@ def _setup(self) -> run_trees.RunTree: _METADATA.set(metadata) _PARENT_RUN_TREE.set(self.new_run) _PROJECT_NAME.set(project_name_) + _CLIENT.set(client_) return self.new_run @@ -1248,7 +1256,7 @@ def _setup_run( outer_project = _PROJECT_NAME.get() langsmith_extra = langsmith_extra or LangSmithExtra() name = langsmith_extra.get("name") or container_input.get("name") - client_ = langsmith_extra.get("client", client) + client_ = langsmith_extra.get("client", client) or _CLIENT.get() parent_run_ = _get_parent_run( {**langsmith_extra, "client": client_}, kwargs.get("config") ) diff --git a/python/pyproject.toml b/python/pyproject.toml index 4f4ef8a5f..0547130e9 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "langsmith" -version = "0.1.103" +version = "0.1.104" description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform." 
authors = ["LangChain "] license = "MIT" diff --git a/python/tests/unit_tests/evaluation/test_runner.py b/python/tests/unit_tests/evaluation/test_runner.py new file mode 100644 index 000000000..d264e804e --- /dev/null +++ b/python/tests/unit_tests/evaluation/test_runner.py @@ -0,0 +1,319 @@ +"""Test the eval runner.""" + +import asyncio +import json +import random +import sys +import time +import uuid +from datetime import datetime, timezone +from threading import Lock +from typing import Callable, List +from unittest import mock +from unittest.mock import MagicMock + +import pytest + +from langsmith import evaluate +from langsmith import schemas as ls_schemas +from langsmith.client import Client +from langsmith.evaluation._arunner import aevaluate, aevaluate_existing +from langsmith.evaluation._runner import evaluate_existing + + +class FakeRequest: + def __init__(self, ds_id, ds_name, ds_examples, tenant_id): + self.created_session = None + self.runs = {} + self.should_fail = False + self.ds_id = ds_id + self.ds_name = ds_name + self.ds_examples = ds_examples + self.tenant_id = tenant_id + + def request(self, verb: str, endpoint: str, *args, **kwargs): + if verb == "GET": + if endpoint == "http://localhost:1984/datasets": + res = MagicMock() + res.json.return_value = { + "id": self.ds_id, + "created_at": "2021-09-01T00:00:00Z", + "name": self.ds_name, + } + return res + elif endpoint == "http://localhost:1984/examples": + res = MagicMock() + res.json.return_value = [e.dict() for e in self.ds_examples] + return res + elif endpoint == "http://localhost:1984/sessions": + res = {} # type: ignore + if kwargs["params"]["name"] == self.created_session["name"]: # type: ignore + res = self.created_session # type: ignore + response = MagicMock() + response.json.return_value = res + return response + + else: + self.should_fail = True + raise ValueError(f"Unknown endpoint: {endpoint}") + elif verb == "POST": + if endpoint == "http://localhost:1984/sessions": + self.created_session = json.loads(kwargs["data"]) | { + "tenant_id": self.tenant_id + } + response = MagicMock() + response.json.return_value = self.created_session + return response + elif endpoint == "http://localhost:1984/runs/batch": + loaded_runs = json.loads(kwargs["data"]) + posted = loaded_runs.get("post", []) + patched = loaded_runs.get("patch", []) + for p in posted: + self.runs[p["id"]] = p + for p in patched: + self.runs[p["id"]].update(p) + response = MagicMock() + return response + elif endpoint == "http://localhost:1984/runs/query": + res = MagicMock() + res.json.return_value = { + "runs": [ + r for r in self.runs.values() if "reference_example_id" in r + ] + } + return res + elif endpoint == "http://localhost:1984/feedback": + response = MagicMock() + response.json.return_value = {} + return response + + else: + raise ValueError(f"Unknown endpoint: {endpoint}") + elif verb == "PATCH": + if ( + endpoint + == f"http://localhost:1984/sessions/{self.created_session['id']}" + ): # type: ignore + updates = json.loads(kwargs["data"]) + self.created_session.update({k: v for k, v in updates.items() if v}) # type: ignore + response = MagicMock() + response.json.return_value = self.created_session + return response + else: + self.should_fail = True + raise ValueError(f"Unknown endpoint: {endpoint}") + else: + self.should_fail = True + raise ValueError(f"Unknown verb: {verb}, {endpoint}") + + +def _wait_until(condition: Callable, timeout: int = 5): + start = time.time() + while time.time() - start < timeout: + if condition(): + return + 
time.sleep(0.1) + raise TimeoutError("Condition not met") + + +@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9 or higher") +def test_evaluate_results() -> None: + session = mock.Mock() + ds_name = "my-dataset" + ds_id = "00886375-eb2a-4038-9032-efff60309896" + + def _create_example(idx: int) -> ls_schemas.Example: + return ls_schemas.Example( + id=uuid.uuid4(), + inputs={"in": idx}, + outputs={"answer": idx + 1}, + dataset_id=ds_id, + created_at=datetime.now(timezone.utc), + ) + + SPLIT_SIZE = 3 + NUM_REPETITIONS = 4 + ds_examples = [_create_example(i) for i in range(10)] + dev_split = random.sample(ds_examples, SPLIT_SIZE) + tenant_id = str(uuid.uuid4()) + fake_request = FakeRequest(ds_id, ds_name, ds_examples, tenant_id) + session.request = fake_request.request + client = Client( + api_url="http://localhost:1984", + api_key="123", + session=session, + info=ls_schemas.LangSmithInfo( + batch_ingest_config=ls_schemas.BatchIngestConfig( + size_limit_bytes=None, # Note this field is not used here + size_limit=100, + scale_up_nthreads_limit=16, + scale_up_qsize_trigger=1000, + scale_down_nempty_trigger=4, + ) + ), + ) + client._tenant_id = tenant_id # type: ignore + + ordering_of_stuff: List[str] = [] + locked = False + + lock = Lock() + slow_index = None + + def predict(inputs: dict) -> dict: + nonlocal locked + nonlocal slow_index + + if len(ordering_of_stuff) == 3 and not locked: + with lock: + if len(ordering_of_stuff) == 3 and not locked: + locked = True + time.sleep(1) + slow_index = len(ordering_of_stuff) + ordering_of_stuff.append("predict") + else: + ordering_of_stuff.append("predict") + + else: + ordering_of_stuff.append("predict") + return {"output": inputs["in"] + 1} + + def score_value_first(run, example): + ordering_of_stuff.append("evaluate") + return {"score": 0.3} + + evaluate( + predict, + client=client, + data=dev_split, + evaluators=[score_value_first], + num_repetitions=NUM_REPETITIONS, + ) + assert fake_request.created_session + _wait_until(lambda: fake_request.runs) + N_PREDS = SPLIT_SIZE * NUM_REPETITIONS + _wait_until(lambda: len(ordering_of_stuff) == N_PREDS * 2) + _wait_until(lambda: slow_index is not None) + # Want it to be interleaved + assert ordering_of_stuff != ["predict"] * N_PREDS + ["evaluate"] * N_PREDS + + # It's delayed, so it'll be the penultimate event + # Will run all other preds and evals, then this, then the last eval + assert slow_index == (N_PREDS * 2) - 2 + + def score_value(run, example): + return {"score": 0.7} + + ex_results = evaluate_existing( + fake_request.created_session["name"], evaluators=[score_value], client=client + ) + assert len(list(ex_results)) == SPLIT_SIZE * NUM_REPETITIONS + dev_xample_ids = [e.id for e in dev_split] + for r in ex_results: + assert r["example"].id in dev_xample_ids + assert r["evaluation_results"]["results"][0].score == 0.7 + assert r["run"].reference_example_id in dev_xample_ids + assert not fake_request.should_fail + + +@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires python3.9 or higher") +async def test_aevaluate_results() -> None: + session = mock.Mock() + ds_name = "my-dataset" + ds_id = "00886375-eb2a-4038-9032-efff60309896" + + def _create_example(idx: int) -> ls_schemas.Example: + return ls_schemas.Example( + id=uuid.uuid4(), + inputs={"in": idx}, + outputs={"answer": idx + 1}, + dataset_id=ds_id, + created_at=datetime.now(timezone.utc), + ) + + SPLIT_SIZE = 3 + NUM_REPETITIONS = 4 + ds_examples = [_create_example(i) for i in range(10)] + dev_split = 
random.sample(ds_examples, SPLIT_SIZE) + tenant_id = str(uuid.uuid4()) + fake_request = FakeRequest(ds_id, ds_name, ds_examples, tenant_id) + session.request = fake_request.request + client = Client( + api_url="http://localhost:1984", + api_key="123", + session=session, + info=ls_schemas.LangSmithInfo( + batch_ingest_config=ls_schemas.BatchIngestConfig( + size_limit_bytes=None, # Note this field is not used here + size_limit=100, + scale_up_nthreads_limit=16, + scale_up_qsize_trigger=1000, + scale_down_nempty_trigger=4, + ) + ), + ) + client._tenant_id = tenant_id # type: ignore + + ordering_of_stuff: List[str] = [] + locked = False + + lock = Lock() + slow_index = None + + async def predict(inputs: dict) -> dict: + nonlocal locked + nonlocal slow_index + + if len(ordering_of_stuff) == 3 and not locked: + with lock: + if len(ordering_of_stuff) == 3 and not locked: + locked = True + await asyncio.sleep(1) + slow_index = len(ordering_of_stuff) + ordering_of_stuff.append("predict") + else: + ordering_of_stuff.append("predict") + + else: + ordering_of_stuff.append("predict") + return {"output": inputs["in"] + 1} + + async def score_value_first(run, example): + ordering_of_stuff.append("evaluate") + return {"score": 0.3} + + await aevaluate( + predict, + client=client, + data=dev_split, + evaluators=[score_value_first], + num_repetitions=NUM_REPETITIONS, + ) + assert fake_request.created_session + _wait_until(lambda: fake_request.runs) + N_PREDS = SPLIT_SIZE * NUM_REPETITIONS + _wait_until(lambda: len(ordering_of_stuff) == N_PREDS * 2) + _wait_until(lambda: slow_index is not None) + # Want it to be interleaved + assert ordering_of_stuff != ["predict"] * N_PREDS + ["evaluate"] * N_PREDS + assert slow_index is not None + # It's delayed, so it'll be the penultimate event + # Will run all other preds and evals, then this, then the last eval + assert slow_index == (N_PREDS * 2) - 2 + + assert fake_request.created_session["name"] + + async def score_value(run, example): + return {"score": 0.7} + + ex_results = await aevaluate_existing( + fake_request.created_session["name"], evaluators=[score_value], client=client + ) + all_results = [r async for r in ex_results] + assert len(all_results) == SPLIT_SIZE * NUM_REPETITIONS + dev_xample_ids = [e.id for e in dev_split] + async for r in ex_results: + assert r["example"].id in dev_xample_ids + assert r["evaluation_results"]["results"][0].score == 0.7 + assert r["run"].reference_example_id in dev_xample_ids + assert not fake_request.should_fail
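
Usage note (not part of the patch): the run_helpers changes above add a _CLIENT contextvar, so a Client passed to tracing_context() is inherited by any traceable call made inside that context instead of each call constructing its own client — this is how the evaluator steps now log through self.client. A minimal sketch of that behavior, assuming LangSmith credentials are configured in the environment; the function name and score are hypothetical:

# Sketch only: demonstrates client propagation through tracing_context.
from langsmith import Client
from langsmith.run_helpers import get_tracing_context, traceable, tracing_context

@traceable
def grade(answer: str) -> dict:
    # Hypothetical traced function; any @traceable callable behaves the same way.
    return {"score": float(answer == "42")}

client = Client()  # assumes LANGSMITH_API_KEY / endpoint are set in the environment
with tracing_context(client=client, project_name="evaluators", enabled=True):
    # The context now carries the client alongside project/tags/metadata/enabled.
    assert get_tracing_context()["client"] is client
    grade("42")  # this run is logged via `client`, not a freshly constructed one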
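
Related note (also not part of the patch): switching evaluation_results to a generator, together with the interleaving checked by the new tests, means experiment rows can be consumed as predictions and evaluator feedback complete rather than only after the whole experiment finishes. A hedged sketch of consuming results incrementally, assuming a pre-existing dataset named "my-dataset" and hypothetical input/output keys:

# Sketch only: iterate experiment rows as they stream in.
from langsmith import Client, evaluate

def predict(inputs: dict) -> dict:
    return {"output": inputs.get("question")}  # hypothetical target function

def exact_match(run, example) -> dict:
    # Hypothetical evaluator comparing assumed output/answer keys.
    return {"score": float(run.outputs.get("output") == example.outputs.get("answer"))}

results = evaluate(
    predict,
    data="my-dataset",        # hypothetical dataset name; must already exist
    evaluators=[exact_match],
    client=Client(),          # assumes LangSmith credentials are configured
    blocking=False,           # return immediately instead of waiting for all rows
)
for row in results:           # rows yield incrementally, as in the new test's interleaving check
    print(row["example"].id, row["evaluation_results"]["results"])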