Skip to content

Commit

Permalink
Stream data from the buffer in chunks instead of reading it all at once
Browse files Browse the repository at this point in the history
  • Loading branch information
angus-langchain committed Dec 10, 2024
1 parent a5cba8d commit c32cb0c
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from queue import Empty, Queue
from typing import (
TYPE_CHECKING,
Iterable,
List,
Optional,
Union,
Expand Down Expand Up @@ -94,10 +95,10 @@ def _tracing_thread_drain_queue(


def _tracing_thread_drain_compressed_buffer(
client: Client,
client: "Client",
size_limit: int = 100,
size_limit_bytes: int = 50 * 1024 * 1024
) -> Optional[bytes]:
) -> Optional[Iterable[bytes]]:
with client._buffer_lock:
current_size = client.tracing_queue.tell()

Expand All @@ -111,15 +112,23 @@ def _tracing_thread_drain_compressed_buffer(
client.compressor_writer.close()

client.tracing_queue.seek(0)
data = client.tracing_queue.getvalue()

def data_stream(source: io.BytesIO = client.tracing_queue) -> Iterable[bytes]:
    """Yield the drained buffer's contents in fixed-size chunks.

    ``source`` is bound to the current ``client.tracing_queue`` when this
    function is *defined*, not when the generator is iterated. Generator
    bodies run lazily, and the enclosing code replaces
    ``client.tracing_queue`` with a fresh, empty ``io.BytesIO`` before the
    caller consumes the stream. Looking the attribute up inside the loop
    would therefore read from the new buffer and lose the drained batch.

    Args:
        source: Buffer to read from. Callers should rely on the default;
            the parameter exists only to capture the filled buffer early.

    Yields:
        Consecutive non-empty ``bytes`` chunks of at most 64 KiB each.
    """
    chunk_size = 65536
    # read() returns b"" at end of buffer, which ends the iteration.
    yield from iter(lambda: source.read(chunk_size), b"")

# Reinitialize for next batch
client.tracing_queue = io.BytesIO()
client.compressor = zstd.ZstdCompressor()
client.compressor_writer = client.compressor.stream_writer(
client.tracing_queue, closefd=False)
client._run_count = 0
return data

return data_stream()

def _tracing_thread_handle_batch(
client: Client,
Expand Down Expand Up @@ -254,11 +263,11 @@ def keep_thread_active() -> bool:

while keep_thread_active():
try:
result = _tracing_thread_drain_compressed_buffer(
data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes)
if result is not None:
# Simulating backend call
time.sleep(0.25)
if data_stream is not None:
for chunk in data_stream:
time.sleep(0.150) # Backend call simulation
else:
time.sleep(0.05)
except Exception:
Expand All @@ -267,10 +276,11 @@ def keep_thread_active() -> bool:

# Drain the buffer on exit
try:
final_result = _tracing_thread_drain_compressed_buffer(
final_data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1) # Force final drain
if final_result is not None:
time.sleep(0.25) # backend call simulation
if final_data_stream is not None:
for chunk in final_data_stream:
time.sleep(0.150) # Final backend calls
except Exception:
logger.error("Error in final buffer drain", exc_info=True)

Expand Down

0 comments on commit c32cb0c

Please sign in to comment.