Skip to content

Commit

Permalink
[Python] Log content-length during connection errors (#972)
Browse files Browse the repository at this point in the history
Also:
- In serialization for the evaluators, truncate payloads to a maximum size by default.
  • Loading branch information
hinthornw authored Sep 4, 2024
1 parent 710c9fe commit e78f394
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 19 deletions.
88 changes: 80 additions & 8 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import sys
import threading
import time
import traceback
import typing
import uuid
import warnings
Expand Down Expand Up @@ -656,6 +657,23 @@ def _get_settings(self) -> ls_schemas.LangSmithSettings:

return self._settings

def _content_above_size(self, content_length: Optional[int]) -> Optional[str]:
if content_length is None or self._info is None:
return None
info = cast(ls_schemas.LangSmithInfo, self._info)
bic = info.batch_ingest_config
if not bic:
return None
size_limit = bic.get("size_limit_bytes")
if size_limit is None:
return None
if content_length > size_limit:
return (
f"The content length of {content_length} bytes exceeds the "
f"maximum size limit of {size_limit} bytes."
)
return None

def request_with_retries(
self,
/,
Expand All @@ -667,6 +685,7 @@ def request_with_retries(
retry_on: Optional[Sequence[Type[BaseException]]] = None,
to_ignore: Optional[Sequence[Type[BaseException]]] = None,
handle_response: Optional[Callable[[requests.Response, int], Any]] = None,
_context: str = "",
**kwargs: Any,
) -> requests.Response:
"""Send a request with retries.
Expand Down Expand Up @@ -739,7 +758,6 @@ def request_with_retries(
)
to_ignore_: Tuple[Type[BaseException], ...] = (*(to_ignore or ()),)
response = None

for idx in range(stop_after_attempt):
try:
try:
Expand Down Expand Up @@ -776,22 +794,26 @@ def request_with_retries(
f"Server error caused failure to {method}"
f" {pathname} in"
f" LangSmith API. {repr(e)}"
f"{_context}"
)
elif response.status_code == 429:
raise ls_utils.LangSmithRateLimitError(
f"Rate limit exceeded for {pathname}. {repr(e)}"
f"{_context}"
)
elif response.status_code == 401:
raise ls_utils.LangSmithAuthError(
f"Authentication failed for {pathname}. {repr(e)}"
f"{_context}"
)
elif response.status_code == 404:
raise ls_utils.LangSmithNotFoundError(
f"Resource not found for {pathname}. {repr(e)}"
f"{_context}"
)
elif response.status_code == 409:
raise ls_utils.LangSmithConflictError(
f"Conflict for {pathname}. {repr(e)}"
f"Conflict for {pathname}. {repr(e)}" f"{_context}"
)
else:
raise ls_utils.LangSmithError(
Expand All @@ -806,14 +828,36 @@ def request_with_retries(
)
except requests.ConnectionError as e:
recommendation = (
"Please confirm your LANGCHAIN_ENDPOINT"
"Please confirm your LANGCHAIN_ENDPOINT."
if self.api_url != "https://api.smith.langchain.com"
else "Please confirm your internet connection."
)
try:
content_length = int(
str(e.request.headers.get("Content-Length"))
if e.request
else ""
)
size_rec = self._content_above_size(content_length)
if size_rec:
recommendation = size_rec
except ValueError:
content_length = None

api_key = (
e.request.headers.get("x-api-key") or "" if e.request else ""
)
prefix, suffix = api_key[:5], api_key[-2:]
filler = "*" * (max(0, len(api_key) - 7))
masked_api_key = f"{prefix}{filler}{suffix}"

raise ls_utils.LangSmithConnectionError(
f"Connection error caused failure to {method} {pathname}"
f" in LangSmith API. {recommendation}."
f" in LangSmith API. {recommendation}"
f" {repr(e)}"
f"\nContent-Length: {content_length}"
f"\nAPI Key: {masked_api_key}"
f"{_context}"
) from e
except Exception as e:
args = list(e.args)
Expand All @@ -829,6 +873,7 @@ def request_with_retries(
emsg = msg
raise ls_utils.LangSmithError(
f"Failed to {method} {pathname} in LangSmith API. {emsg}"
f"{_context}"
) from e
except to_ignore_ as e:
if response is not None:
Expand Down Expand Up @@ -1338,21 +1383,42 @@ def batch_ingest_runs(
"post": [_dumps_json(run) for run in raw_body["post"]],
"patch": [_dumps_json(run) for run in raw_body["patch"]],
}
ids = {
"post": [
f"trace={run.get('trace_id')},id={run.get('id')}"
for run in raw_body["post"]
],
"patch": [
f"trace={run.get('trace_id')},id={run.get('id')}"
for run in raw_body["patch"]
],
}

body_chunks: DefaultDict[str, list] = collections.defaultdict(list)
context_ids: DefaultDict[str, list] = collections.defaultdict(list)
body_size = 0
for key in ["post", "patch"]:
body = collections.deque(partial_body[key])
ids_ = collections.deque(ids[key])
while body:
if body_size > 0 and body_size + len(body[0]) > size_limit_bytes:
self._post_batch_ingest_runs(orjson.dumps(body_chunks))
self._post_batch_ingest_runs(
orjson.dumps(body_chunks),
_context=f"\n{key}: {'; '.join(context_ids[key])}",
)
body_size = 0
body_chunks.clear()
context_ids.clear()
body_size += len(body[0])
body_chunks[key].append(orjson.Fragment(body.popleft()))
context_ids[key].append(ids_.popleft())
if body_size:
self._post_batch_ingest_runs(orjson.dumps(body_chunks))
context = "; ".join(f"{k}: {'; '.join(v)}" for k, v in context_ids.items())
self._post_batch_ingest_runs(
orjson.dumps(body_chunks), _context="\n" + context
)

def _post_batch_ingest_runs(self, body: bytes):
def _post_batch_ingest_runs(self, body: bytes, *, _context: str):
for api_url, api_key in self._write_api_urls.items():
try:
self.request_with_retries(
Expand All @@ -1367,9 +1433,15 @@ def _post_batch_ingest_runs(self, body: bytes):
},
to_ignore=(ls_utils.LangSmithConflictError,),
stop_after_attempt=3,
_context=_context,
)
except Exception as e:
logger.warning(f"Failed to batch ingest runs: {repr(e)}")
try:
exc_desc_lines = traceback.format_exception_only(type(e), e)
exc_desc = "".join(exc_desc_lines).rstrip()
logger.warning(f"Failed to batch ingest runs: {exc_desc}")
except Exception:
logger.warning(f"Failed to batch ingest runs: {repr(e)}")

def update_run(
self,
Expand Down
38 changes: 32 additions & 6 deletions python/langsmith/evaluation/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ def __init__(
from langsmith import run_helpers # type: ignore

if afunc is not None:
self.afunc = run_helpers.ensure_traceable(afunc)
self.afunc = run_helpers.ensure_traceable(
afunc, process_inputs=_serialize_inputs
)
self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
if inspect.iscoroutinefunction(func):
if afunc is not None:
Expand All @@ -205,11 +207,14 @@ def __init__(
"also provided. If providing both, func should be a regular "
"function to avoid ambiguity."
)
self.afunc = run_helpers.ensure_traceable(func)
self.afunc = run_helpers.ensure_traceable(
func, process_inputs=_serialize_inputs
)
self._name = getattr(func, "__name__", "DynamicRunEvaluator")
else:
self.func = run_helpers.ensure_traceable(
cast(Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT], func)
cast(Callable[[Run, Optional[Example]], _RUNNABLE_OUTPUT], func),
process_inputs=_serialize_inputs,
)
self._name = getattr(func, "__name__", "DynamicRunEvaluator")

Expand Down Expand Up @@ -387,6 +392,22 @@ def run_evaluator(
return DynamicRunEvaluator(func)


_MAXSIZE = 10_000


def _maxsize_repr(obj: Any):
s = repr(obj)
if len(s) > _MAXSIZE:
s = s[: _MAXSIZE - 4] + "...)"
return s


def _serialize_inputs(inputs: dict) -> dict:
run_truncated = _maxsize_repr(inputs.get("run"))
example_truncated = _maxsize_repr(inputs.get("example"))
return {"run": run_truncated, "example": example_truncated}


class DynamicComparisonRunEvaluator:
"""Compare predictions (as traces) from 2 or more runs."""

Expand Down Expand Up @@ -414,7 +435,9 @@ def __init__(
from langsmith import run_helpers # type: ignore

if afunc is not None:
self.afunc = run_helpers.ensure_traceable(afunc)
self.afunc = run_helpers.ensure_traceable(
afunc, process_inputs=_serialize_inputs
)
self._name = getattr(afunc, "__name__", "DynamicRunEvaluator")
if inspect.iscoroutinefunction(func):
if afunc is not None:
Expand All @@ -423,7 +446,9 @@ def __init__(
"also provided. If providing both, func should be a regular "
"function to avoid ambiguity."
)
self.afunc = run_helpers.ensure_traceable(func)
self.afunc = run_helpers.ensure_traceable(
func, process_inputs=_serialize_inputs
)
self._name = getattr(func, "__name__", "DynamicRunEvaluator")
else:
self.func = run_helpers.ensure_traceable(
Expand All @@ -433,7 +458,8 @@ def __init__(
_COMPARISON_OUTPUT,
],
func,
)
),
process_inputs=_serialize_inputs,
)
self._name = getattr(func, "__name__", "DynamicRunEvaluator")

Expand Down
24 changes: 22 additions & 2 deletions python/langsmith/run_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,31 @@ def is_traceable_function(
)


def ensure_traceable(
    func: Callable[P, R],
    *,
    name: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
    tags: Optional[List[str]] = None,
    client: Optional[ls_client.Client] = None,
    reduce_fn: Optional[Callable[[Sequence], dict]] = None,
    project_name: Optional[str] = None,
    process_inputs: Optional[Callable[[dict], dict]] = None,
    process_outputs: Optional[Callable[..., dict]] = None,
) -> SupportsLangsmithExtra[P, R]:
    """Ensure that a function is traceable.

    A function that is already traceable is returned unchanged — the keyword
    configuration is ignored in that case. Otherwise the function is wrapped
    with ``traceable`` using the supplied configuration.
    """
    if is_traceable_function(func):
        return func
    # Build the decorator explicitly, then apply it to the function.
    decorate = traceable(
        name=name,
        metadata=metadata,
        tags=tags,
        client=client,
        reduce_fn=reduce_fn,
        project_name=project_name,
        process_inputs=process_inputs,
        process_outputs=process_outputs,
    )
    return decorate(func)


def is_async(func: Callable) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langsmith"
version = "0.1.112"
version = "0.1.113"
description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
authors = ["LangChain <[email protected]>"]
license = "MIT"
Expand Down
3 changes: 2 additions & 1 deletion python/tests/integration_tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ async def wait_for(condition, timeout=10):

@pytest.fixture
async def async_client():
client = AsyncClient()
ls_utils.get_env_var.cache_clear()
client = AsyncClient(api_url="https://api.smith.langchain.com")
yield client
await client.aclose()

Expand Down
3 changes: 2 additions & 1 deletion python/tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def wait_for(

@pytest.fixture
def langchain_client() -> Client:
return Client()
get_env_var.cache_clear()
return Client(api_url="https://api.smith.langchain.com")


def test_datasets(langchain_client: Client) -> None:
Expand Down

0 comments on commit e78f394

Please sign in to comment.