Skip to content

Commit

Permalink
Allow multiple write URLS for runs in SDK (python) (#519)
Browse files Browse the repository at this point in the history
* Allow user to set `LANGSMITH_ENDPOINT` and `LANGSMITH_API_KEY` (in
addition to `LANGCHAIN_ENDPOINT` and `LANGCHAIN_API_KEY`). This priority
will be former > latter > passed-in parameter.
* Allow user to set `LANGSMITH_RUNS_ENDPOINTS` environment variable or
pass in `api_urls` which are a map of api urls to api keys. The first
element will be used for all requests, whereas all elements will be used
for writing (Runs only)

---------

Co-authored-by: William Fu-Hinthorn <[email protected]>
  • Loading branch information
agola11 and hinthornw authored Mar 14, 2024
1 parent d52ce1e commit d1c95c4
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 65 deletions.
179 changes: 124 additions & 55 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
logger = logging.getLogger(__name__)
_urllib3_logger = logging.getLogger("urllib3.connectionpool")

X_API_KEY = "x-api-key"


def _is_localhost(url: str) -> bool:
"""Check if the URL is localhost.
Expand Down Expand Up @@ -287,18 +289,30 @@ def _get_tracing_sampling_rate() -> float | None:
return sampling_rate


def _get_env(var_names: Sequence[str], default: Optional[str] = None) -> Optional[str]:
for var_name in var_names:
var = os.getenv(var_name)
if var is not None:
return var
return default


def _get_api_key(api_key: Optional[str]) -> Optional[str]:
api_key = api_key or os.getenv("LANGSMITH_API_KEY", os.getenv("LANGCHAIN_API_KEY"))
if api_key is None or not api_key.strip():
api_key_ = (
api_key
if api_key is not None
else _get_env(("LANGSMITH_API_KEY", "LANGCHAIN_API_KEY"))
)
if api_key_ is None or not api_key_.strip():
return None
return api_key.strip().strip('"').strip("'")
return api_key_.strip().strip('"').strip("'")


def _get_api_url(api_url: Optional[str]) -> str:
_api_url = api_url or os.getenv(
"LANGSMITH_ENDPOINT",
os.getenv(
"LANGCHAIN_ENDPOINT",
_api_url = api_url or cast(
str,
_get_env(
("LANGSMITH_ENDPOINT", "LANGCHAIN_ENDPOINT"),
"https://api.smith.langchain.com",
),
)
Expand All @@ -307,6 +321,25 @@ def _get_api_url(api_url: Optional[str]) -> str:
return _api_url.strip().strip('"').strip("'").rstrip("/")


def _get_write_api_urls(_write_api_urls: Optional[Dict[str, str]]) -> Dict[str, str]:
_write_api_urls = _write_api_urls or json.loads(
os.getenv("LANGSMITH_RUNS_ENDPOINTS", "{}")
)
processed_write_api_urls = {}
for url, api_key in _write_api_urls.items():
processed_url = url.strip()
if not processed_url:
raise ls_utils.LangSmithUserError(
"LangSmith runs API URL within LANGSMITH_RUNS_ENDPOINTS cannot be empty"
)
processed_url = processed_url.strip().strip('"').strip("'").rstrip("/")
processed_api_key = api_key.strip().strip('"').strip("'")
_validate_api_key_if_hosted(processed_url, processed_api_key)
processed_write_api_urls[processed_url] = processed_api_key

return processed_write_api_urls


def _as_uuid(value: ID_TYPE, var: Optional[str] = None) -> uuid.UUID:
try:
return uuid.UUID(value) if not isinstance(value, uuid.UUID) else value
Expand Down Expand Up @@ -358,6 +391,7 @@ class Client:
"_hide_inputs",
"_hide_outputs",
"_info",
"_write_api_urls",
]

def __init__(
Expand All @@ -373,6 +407,7 @@ def __init__(
hide_inputs: Optional[Union[Callable[[dict], dict], bool]] = None,
hide_outputs: Optional[Union[Callable[[dict], dict], bool]] = None,
info: Optional[Union[dict, ls_schemas.LangSmithInfo]] = None,
api_urls: Optional[Dict[str, str]] = None,
) -> None:
"""Initialize a Client instance.
Expand Down Expand Up @@ -403,17 +438,45 @@ def __init__(
info: Optional[ls_schemas.LangSmithInfo]
The information about the LangSmith API. If not provided, it will
be fetched from the API.
api_urls: Optional[Dict[str, str]]
A dictionary of write API URLs and their corresponding API keys.
Useful for multi-tenant setups. Data is only read from the first
URL in the dictionary. However, ONLY Runs are written (POST and PATCH)
to all URLs in the dictionary. Feedback, sessions, datasets, examples,
annotation queues and evaluation results are only written to the first.
Raises:
------
LangSmithUserError
If the API key is not provided when using the hosted service.
If both api_url and api_urls are provided.
"""
if api_url and api_urls:
raise ls_utils.LangSmithUserError(
"You cannot provide both api_url and api_urls."
)

if (
os.getenv("LANGSMITH_ENDPOINT") or os.getenv("LANGCHAIN_ENDPOINT")
) and os.getenv("LANGSMITH_RUNS_ENDPOINTS"):
raise ls_utils.LangSmithUserError(
"You cannot provide both LANGSMITH_ENDPOINT / LANGCHAIN_ENDPOINT "
"and LANGSMITH_RUNS_ENDPOINTS."
)

self.tracing_sample_rate = _get_tracing_sampling_rate()
self._sampled_post_uuids: set[uuid.UUID] = set()
self.api_key = _get_api_key(api_key)
self.api_url = _get_api_url(api_url)
_validate_api_key_if_hosted(self.api_url, self.api_key)
self._write_api_urls: Mapping[str, Optional[str]] = _get_write_api_urls(
api_urls
)
if self._write_api_urls:
self.api_url = next(iter(self._write_api_urls))
self.api_key: Optional[str] = self._write_api_urls[self.api_url]
else:
self.api_url = _get_api_url(api_url)
self.api_key = _get_api_key(api_key)
_validate_api_key_if_hosted(self.api_url, self.api_key)
self._write_api_urls = {self.api_url: self.api_key}
self.retry_config = retry_config or _default_retry_config()
self.timeout_ms = timeout_ms or 10000
self._web_url = web_url
Expand Down Expand Up @@ -514,7 +577,7 @@ def _headers(self) -> Dict[str, str]:
"""
headers = {"User-Agent": f"langsmith-py/{langsmith.__version__}"}
if self.api_key:
headers["x-api-key"] = self.api_key
headers[X_API_KEY] = self.api_key
return headers

@property
Expand Down Expand Up @@ -1036,21 +1099,23 @@ def create_run(
self._create_run(run_create)

def _create_run(self, run_create: dict):
headers = {
**self._headers,
"Accept": "application/json",
"Content-Type": "application/json",
}
self.request_with_retries(
"post",
f"{self.api_url}/runs",
request_kwargs={
"data": _dumps_json(run_create),
"headers": headers,
"timeout": self.timeout_ms / 1000,
},
to_ignore=(ls_utils.LangSmithConflictError,),
)
for api_url, api_key in self._write_api_urls.items():
headers = {
**self._headers,
"Accept": "application/json",
"Content-Type": "application/json",
X_API_KEY: api_key,
}
self.request_with_retries(
"post",
f"{api_url}/runs",
request_kwargs={
"data": _dumps_json(run_create),
"headers": headers,
"timeout": self.timeout_ms / 1000,
},
to_ignore=(ls_utils.LangSmithConflictError,),
)

def _hide_run_inputs(self, inputs: dict):
if self._hide_inputs is False:
Expand Down Expand Up @@ -1186,22 +1251,24 @@ def handle_429(response: requests.Response, attempt: int) -> bool:
return False

try:
self.request_with_retries(
"post",
f"{self.api_url}/runs/batch",
request_kwargs={
"data": body,
"timeout": self.timeout_ms / 1000,
"headers": {
**self._headers,
"Accept": "application/json",
"Content-Type": "application/json",
for api_url, api_key in self._write_api_urls.items():
self.request_with_retries(
"post",
f"{api_url}/runs/batch",
request_kwargs={
"data": body,
"timeout": self.timeout_ms / 1000,
"headers": {
**self._headers,
"Accept": "application/json",
"Content-Type": "application/json",
X_API_KEY: api_key,
},
},
},
to_ignore=(ls_utils.LangSmithConflictError,),
stop_after_attempt=3,
handle_response=handle_429,
)
to_ignore=(ls_utils.LangSmithConflictError,),
stop_after_attempt=3,
handle_response=handle_429,
)
except Exception as e:
logger.warning(f"Failed to batch ingest runs: {repr(e)}")

Expand Down Expand Up @@ -1275,21 +1342,23 @@ def update_run(
return self._update_run(data)

def _update_run(self, run_update: dict) -> None:
headers = {
**self._headers,
"Accept": "application/json",
"Content-Type": "application/json",
}
for api_url, api_key in self._write_api_urls.items():
headers = {
**self._headers,
"Accept": "application/json",
"Content-Type": "application/json",
X_API_KEY: api_key,
}

self.request_with_retries(
"patch",
f"{self.api_url}/runs/{run_update['id']}",
request_kwargs={
"data": _dumps_json(run_update),
"headers": headers,
"timeout": self.timeout_ms / 1000,
},
)
self.request_with_retries(
"patch",
f"{api_url}/runs/{run_update['id']}",
request_kwargs={
"data": _dumps_json(run_update),
"headers": headers,
"timeout": self.timeout_ms / 1000,
},
)

def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run:
"""Load child runs for a given run.
Expand Down
5 changes: 3 additions & 2 deletions python/langsmith/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class RunBase(BaseModel):
"""List of events associated with the run, like
start and end events."""

inputs: dict
inputs: dict = Field(default_factory=dict)
"""Inputs used for the run."""

outputs: Optional[dict] = None
Expand Down Expand Up @@ -301,7 +301,8 @@ def __init__(self, _host_url: Optional[str] = None, **kwargs: Any) -> None:
"""Initialize a Run object."""
if not kwargs.get("trace_id"):
kwargs = {"trace_id": kwargs.get("id"), **kwargs}
super().__init__(**kwargs)
inputs = kwargs.pop("inputs", None) or {}
super().__init__(**kwargs, inputs=inputs)
self._host_url = _host_url
if not self.dotted_order.strip() and not self.parent_run_id:
self.dotted_order = f"{self.start_time.isoformat()}{self.id}"
Expand Down
46 changes: 38 additions & 8 deletions python/tests/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ def test_validate_api_url(monkeypatch: pytest.MonkeyPatch) -> None:

def test_validate_api_key(monkeypatch: pytest.MonkeyPatch) -> None:
# Scenario 1: Both LANGCHAIN_API_KEY and LANGSMITH_API_KEY are set,
# but api_key is not
# but api_key is not
monkeypatch.setenv("LANGCHAIN_API_KEY", "env_langchain_api_key")
monkeypatch.setenv("LANGSMITH_API_KEY", "env_langsmith_api_key")

client = Client()
assert client.api_key == "env_langsmith_api_key"

# Scenario 2: Both LANGCHAIN_API_KEY and LANGSMITH_API_KEY are set,
# and api_key is set
# and api_key is set
monkeypatch.setenv("LANGCHAIN_API_KEY", "env_langchain_api_key")
monkeypatch.setenv("LANGSMITH_API_KEY", "env_langsmith_api_key")

Expand All @@ -129,6 +129,36 @@ def test_validate_api_key(monkeypatch: pytest.MonkeyPatch) -> None:
assert client.api_key == "env_langsmith_api_key"


def test_validate_multiple_urls(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain-endpoint.com")
monkeypatch.setenv("LANGSMITH_ENDPOINT", "https://api.smith.langsmith-endpoint.com")
monkeypatch.setenv("LANGSMITH_RUNS_ENDPOINTS", "{}")

with pytest.raises(ls_utils.LangSmithUserError):
Client()

monkeypatch.undo()
with pytest.raises(ls_utils.LangSmithUserError):
Client(
api_url="https://api.smith.langchain.com",
api_key="123",
api_urls={"https://api.smith.langchain.com": "123"},
)

data = {
"https://api.smith.langsmith-endpoint_1.com": "123",
"https://api.smith.langsmith-endpoint_2.com": "456",
"https://api.smith.langsmith-endpoint_3.com": "789",
}
monkeypatch.delenv("LANGCHAIN_ENDPOINT", raising=False)
monkeypatch.delenv("LANGSMITH_ENDPOINT", raising=False)
monkeypatch.setenv("LANGSMITH_RUNS_ENDPOINTS", json.dumps(data))
client = Client()
assert client._write_api_urls == data
assert client.api_url == "https://api.smith.langsmith-endpoint_1.com"
assert client.api_key == "123"


def test_headers(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("LANGCHAIN_API_KEY", raising=False)
client = Client(api_url="http://localhost:1984", api_key="123")
Expand Down Expand Up @@ -238,22 +268,22 @@ def test_get_api_key() -> None:


def test_get_api_url() -> None:
assert _get_api_url("http://provided.url", "api_key") == "http://provided.url"
assert _get_api_url("http://provided.url") == "http://provided.url"

with patch.dict(os.environ, {"LANGCHAIN_ENDPOINT": "http://env.url"}):
assert _get_api_url(None, "api_key") == "http://env.url"
assert _get_api_url(None) == "http://env.url"

with patch.dict(os.environ, {}, clear=True):
assert _get_api_url(None, "api_key") == "https://api.smith.langchain.com"
assert _get_api_url(None) == "https://api.smith.langchain.com"

with patch.dict(os.environ, {}, clear=True):
assert _get_api_url(None, None) == "https://api.smith.langchain.com"
assert _get_api_url(None) == "https://api.smith.langchain.com"

with patch.dict(os.environ, {"LANGCHAIN_ENDPOINT": "http://env.url"}):
assert _get_api_url(None, None) == "http://env.url"
assert _get_api_url(None) == "http://env.url"

with pytest.raises(ls_utils.LangSmithUserError):
_get_api_url(" ", "api_key")
_get_api_url(" ")


def test_create_run_unicode() -> None:
Expand Down

0 comments on commit d1c95c4

Please sign in to comment.