Skip to content

Commit

Permalink
Merge branch 'main' into isaac/listexamplesref
Browse files Browse the repository at this point in the history
  • Loading branch information
baskaryan authored Dec 23, 2024
2 parents 514bd9e + ab05ca6 commit 6752f9b
Show file tree
Hide file tree
Showing 8 changed files with 596 additions and 14 deletions.
27 changes: 27 additions & 0 deletions python/docs/templates/zstandard/COPYRIGHT.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Copyright (c) 2016, Gregory Szorc
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
117 changes: 117 additions & 0 deletions python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from __future__ import annotations

import concurrent.futures as cf
import functools
import io
import logging
import sys
import threading
import weakref
from multiprocessing import cpu_count
from queue import Empty, Queue
from typing import (
TYPE_CHECKING,
List,
Optional,
Union,
cast,
)
Expand All @@ -18,6 +22,7 @@
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
_AUTO_SCALE_UP_NTHREADS_LIMIT,
_AUTO_SCALE_UP_QSIZE_TRIGGER,
_BOUNDARY,
)
from langsmith._internal._operations import (
SerializedFeedbackOperation,
Expand All @@ -30,6 +35,8 @@

logger = logging.getLogger("langsmith.client")

# Module-level executor, created once at import time. The compressed tracing
# control thread submits `client._send_compressed_multipart_req` calls to it so
# HTTP uploads run off the control thread. Sized at three workers per CPU
# reported by `cpu_count()`.
HTTP_REQUEST_THREAD_POOL = cf.ThreadPoolExecutor(max_workers=cpu_count() * 3)


@functools.total_ordering
class TracingQueueItem:
Expand Down Expand Up @@ -88,6 +95,37 @@ def _tracing_thread_drain_queue(
return next_batch


def _tracing_thread_drain_compressed_buffer(
    client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> Optional[io.BytesIO]:
    """Hand over the client's compressed run buffer once a threshold is reached.

    Args:
        client: Client whose ``compressed_runs`` holds the buffer, its lock,
            the run counter and the compression writer.
        size_limit: Number of buffered runs that triggers a drain. ``None``
            disables the count-based trigger.
        size_limit_bytes: Write position of the output buffer, in bytes, that
            triggers a drain. ``None`` disables the size-based trigger.

    Returns:
        The filled buffer, finalized and rewound to offset 0, or ``None`` when
        neither threshold has been reached.

    Raises:
        ValueError: If ``size_limit`` is not positive or ``size_limit_bytes``
            is negative.
    """
    assert client.compressed_runs is not None
    with client.compressed_runs.lock:
        pending = client.compressed_runs
        # The size is measured as the write position of the output buffer.
        bytes_buffered = pending.buffer.tell()

        if size_limit is not None and size_limit <= 0:
            raise ValueError(f"size_limit must be positive; got {size_limit}")
        if size_limit_bytes is not None and size_limit_bytes < 0:
            raise ValueError(
                f"size_limit_bytes must be nonnegative; got {size_limit_bytes}"
            )

        bytes_limit_hit = (
            size_limit_bytes is not None and bytes_buffered >= size_limit_bytes
        )
        count_limit_hit = size_limit is not None and pending.run_count >= size_limit
        if not (bytes_limit_hit or count_limit_hit):
            return None

        # Terminate the multipart body, then finish the compression stream so
        # everything still held by the compressor lands in the buffer.
        closing_boundary = f"--{_BOUNDARY}--\r\n".encode()
        pending.compressor_writer.write(closing_boundary)
        pending.compressor_writer.close()

        # Keep a handle on the finished buffer before reset() installs a new one.
        filled_buffer = pending.buffer
        pending.reset()

    # Rewind so the consumer reads the payload from the start.
    filled_buffer.seek(0)
    return filled_buffer


def _tracing_thread_handle_batch(
client: Client,
tracing_queue: Queue,
Expand Down Expand Up @@ -200,6 +238,85 @@ def keep_thread_active() -> bool:
_tracing_thread_handle_batch(client, tracing_queue, next_batch, use_multipart)


def tracing_control_thread_func_compress_parallel(
    client_ref: weakref.ref[Client],
) -> None:
    """Run the control loop that uploads the client's compressed run buffer.

    The loop waits on ``client._data_available_event``; each time it fires, the
    compressed buffer is drained (if a size or count threshold is reached) and
    the resulting stream is sent via ``client._send_compressed_multipart_req``
    on ``HTTP_REQUEST_THREAD_POOL``. When the loop decides to stop, whatever is
    still buffered is drained and sent once more before returning.

    Args:
        client_ref: Weak reference to the client. It is dereferenced once at
            the start; if the client is already gone the function returns
            immediately.
    """
    client = client_ref()
    if client is None:
        return

    # Thresholds come from the server-provided ingest config, with a 20 MiB
    # fallback for the byte limit.
    batch_ingest_config = _ensure_ingest_config(client.info)
    size_limit: int = batch_ingest_config["size_limit"]
    size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
    # NOTE(review): baseline refcount expected when only this thread still
    # holds the client; the exact breakdown of the 3 references is not visible
    # here -- confirm before restructuring this function, since adding or
    # removing references to `client` changes the comparison below.
    num_known_refs = 3

    def keep_thread_active() -> bool:
        # if `client.cleanup()` was called, stop thread
        if not client or (
            hasattr(client, "_manual_cleanup") and client._manual_cleanup
        ):
            return False
        if not threading.main_thread().is_alive():
            # main thread is dead. should not be active
            return False
        if hasattr(sys, "getrefcount"):
            # check if client refs count indicates we're the only remaining
            # reference to the client

            # Count active threads
            # NOTE(review): `_threads` is a private attribute of
            # ThreadPoolExecutor; live workers are added to the baseline,
            # presumably because in-flight sends hold references to the client.
            thread_pool = HTTP_REQUEST_THREAD_POOL._threads
            active_count = sum(
                1 for thread in thread_pool if thread is not None and thread.is_alive()
            )

            return sys.getrefcount(client) > num_known_refs + active_count
        else:
            # in PyPy, there is no sys.getrefcount attribute
            # for now, keep thread alive
            return True

    while True:
        # Wake at least every 50 ms so the liveness check runs even when no
        # data arrives.
        triggered = client._data_available_event.wait(timeout=0.05)
        if not keep_thread_active():
            break
        if not triggered:
            continue
        client._data_available_event.clear()

        # Returns None while the buffer is below both thresholds.
        data_stream = _tracing_thread_drain_compressed_buffer(
            client, size_limit, size_limit_bytes
        )

        if data_stream is not None:
            try:
                future = HTTP_REQUEST_THREAD_POOL.submit(
                    client._send_compressed_multipart_req, data_stream
                )
                client._futures.add(future)
            except RuntimeError:
                # submit() raises RuntimeError once the executor can no longer
                # accept work; fall back to sending on this thread.
                client._send_compressed_multipart_req(data_stream)

    # Drain the buffer on exit
    try:
        # Limits of 1 run / 1 byte make any buffered content trigger the drain.
        final_data_stream = _tracing_thread_drain_compressed_buffer(
            client, size_limit=1, size_limit_bytes=1
        )  # Force final drain
        if final_data_stream is not None:
            try:
                # Block until the last upload finishes. cf.wait() only waits;
                # it does not re-raise an exception stored in the future.
                cf.wait(
                    [
                        HTTP_REQUEST_THREAD_POOL.submit(
                            client._send_compressed_multipart_req, final_data_stream
                        )
                    ]
                )
            except RuntimeError:
                client._send_compressed_multipart_req(final_data_stream)

    except Exception:
        # Best-effort cleanup: log and let the thread end normally.
        logger.error("Error in final cleanup", exc_info=True)


def _tracing_sub_thread_func(
client_ref: weakref.ref[Client],
use_multipart: bool,
Expand Down
40 changes: 40 additions & 0 deletions python/langsmith/_internal/_compressed_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import io
import threading

try:
from zstandard import ZstdCompressor # type: ignore[import]

HAVE_ZSTD = True
except ImportError:
HAVE_ZSTD = False

from langsmith import utils as ls_utils

# zstd compression level used for run uploads; falls back to 3 when
# RUN_COMPRESSION_LEVEL is not configured.
# NOTE(review): if `get_env_var` returns the raw environment string when the
# variable is set, this value is a `str` rather than an `int` -- confirm, and
# make sure it is coerced before it reaches `ZstdCompressor(level=...)`.
compression_level = ls_utils.get_env_var("RUN_COMPRESSION_LEVEL", 3)


class CompressedRuns:
    """In-memory zstd-compressed accumulator for serialized runs.

    Instance attributes:
        buffer: ``io.BytesIO`` that receives the compressed output.
        run_count: Counter starting at 0 for each new stream; this class never
            increments it, so callers are expected to maintain it.
        lock: ``threading.Lock`` for callers to serialize access. The methods
            of this class do not acquire it themselves.
        compressor_writer: zstd stream writer that compresses into ``buffer``.

    Raises:
        ImportError: On construction if the ``zstandard`` package is missing.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self._start_new_stream()

    def reset(self):
        """Replace the buffer and writer with fresh ones and zero the run count.

        The previous buffer object is left untouched, so a caller that kept a
        reference to it can still read the finished payload.
        """
        self._start_new_stream()

    def _start_new_stream(self):
        """Create a new buffer/writer pair and install it on the instance."""
        # Check the optional dependency before touching any state.
        if not HAVE_ZSTD:
            raise ImportError(
                "zstandard package required for compression. "
                "Install with 'pip install langsmith[compression]'"
            )
        new_buffer = io.BytesIO()
        # The configured level may arrive as a string when it is read from the
        # environment; the compressor needs an int. int() is a no-op for ints.
        # closefd=False keeps the BytesIO readable after the writer is closed.
        new_writer = ZstdCompressor(
            level=int(compression_level), threads=-1
        ).stream_writer(new_buffer, closefd=False)

        # Assign only after everything was built, so a failure above cannot
        # leave the instance with a new buffer but a stale writer.
        self.buffer = new_buffer
        self.run_count = 0
        self.compressor_writer = new_writer
3 changes: 3 additions & 0 deletions python/langsmith/_internal/_constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import uuid

# Default byte-size limit (20 * 1024 * 1024).
# NOTE(review): what this limit is applied to is not visible here -- confirm.
_SIZE_LIMIT_BYTES = 20_971_520  # 20MB by default
# NOTE(review): the three values below look like thresholds for scaling the
# number of background tracing threads up and down (queue size that triggers
# scale-up, thread cap, consecutive empty polls before scale-down) -- confirm
# against their call sites.
_AUTO_SCALE_UP_QSIZE_TRIGGER = 200
_AUTO_SCALE_UP_NTHREADS_LIMIT = 32
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER = 4
_BLOCKSIZE_BYTES = 1024 * 1024  # 1MB
# Multipart boundary token: a random hex string generated once per import, so
# every multipart body built by this process shares the same boundary.
_BOUNDARY = uuid.uuid4().hex
41 changes: 41 additions & 0 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
import uuid
from typing import Literal, Optional, Union, cast

# zstandard is optional: when it is not installed, define a placeholder with
# the same name so annotations that mention the writer type still resolve.
try:
    from zstandard import ZstdCompressionWriter  # type: ignore[import]
except ImportError:

    class ZstdCompressionWriter:  # type: ignore[no-redef]
        """Placeholder type used only in annotations when zstandard is absent."""


from langsmith import schemas as ls_schemas
from langsmith._internal import _orjson
from langsmith._internal._multipart import MultipartPart, MultipartPartsAndContext
Expand Down Expand Up @@ -271,3 +279,36 @@ def serialized_run_operation_to_multipart_parts_and_context(
acc_parts,
f"trace={op.trace_id},id={op.id}",
)


def compress_multipart_parts_and_context(
    parts_and_context: MultipartPartsAndContext,
    compressor_writer: ZstdCompressionWriter,
    boundary: str,
) -> None:
    """Write each multipart part, framed for form-data, into a compressor.

    For every part this emits the opening boundary line, a
    ``Content-Disposition`` header (with ``filename`` when one is given), the
    ``Content-Type`` header, any extra headers, a blank line, the payload and
    a trailing CRLF. The closing ``--boundary--`` line is NOT written here.

    Args:
        parts_and_context: Object whose ``parts`` is an iterable of
            ``(name, (filename, data, content_type, headers))`` tuples.
        compressor_writer: Destination exposing ``write(bytes)``.
        boundary: Multipart boundary token, without the leading dashes.
    """
    write = compressor_writer.write
    for part_name, (filename, data, content_type, headers) in parts_and_context.parts:
        disposition = f'Content-Disposition: form-data; name="{part_name}"'
        if filename:
            disposition += f'; filename="{filename}"'

        header_block = "".join(
            [
                f"--{boundary}\r\n",
                f"{disposition}\r\n",
                f"Content-Type: {content_type}\r\n",
                *(f"{k}: {v}\r\n" for k, v in headers.items()),
                "\r\n",
            ]
        )
        write(header_block.encode())

        # Binary payloads are written as-is. memoryview is included so a
        # zero-copy view is not stringified into its repr by the fallback.
        if isinstance(data, (bytes, bytearray, memoryview)):
            write(data)
        else:
            write(str(data).encode())

        # Write part terminator
        write(b"\r\n")
Loading

0 comments on commit 6752f9b

Please sign in to comment.