From c32cb0cecb417f81e18372af23b6dfcb2ebf8375 Mon Sep 17 00:00:00 2001
From: Angus Jelinek
Date: Mon, 9 Dec 2024 19:16:09 -0800
Subject: [PATCH] stream data from the buffer instead of reading it all at once

---
 .../langsmith/_internal/_background_thread.py | 35 ++++++++++++++-------
 1 file changed, 24 insertions(+), 11 deletions(-)

diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py
index 0805859e6..d0a385f5a 100644
--- a/python/langsmith/_internal/_background_thread.py
+++ b/python/langsmith/_internal/_background_thread.py
@@ -10,6 +10,7 @@
 from queue import Empty, Queue
 from typing import (
     TYPE_CHECKING,
+    Iterable,
     List,
     Optional,
     Union,
@@ -94,10 +95,10 @@ def _tracing_thread_drain_queue(


 def _tracing_thread_drain_compressed_buffer(
-    client: Client,
+    client: "Client",
     size_limit: int = 100,
     size_limit_bytes: int = 50 * 1024 * 1024
-) -> Optional[bytes]:
+) -> Optional[Iterable[bytes]]:
     with client._buffer_lock:
         current_size = client.tracing_queue.tell()

@@ -111,7 +112,17 @@
         client.compressor_writer.close()

         client.tracing_queue.seek(0)
-        data = client.tracing_queue.getvalue()
+
+        # Capture the filled buffer; client.tracing_queue is replaced below.
+        filled_buffer = client.tracing_queue
+
+        def data_stream() -> Iterable[bytes]:
+            chunk_size = 65536
+            while True:
+                chunk = filled_buffer.read(chunk_size)
+                if not chunk:
+                    break
+                yield chunk

         # Reinitialize for next batch
         client.tracing_queue = io.BytesIO()
@@ -119,7 +130,8 @@
         client.compressor_writer = client.compressor.stream_writer(
             client.tracing_queue, closefd=False)
         client._run_count = 0
-        return data
+
+        return data_stream()

 def _tracing_thread_handle_batch(
     client: Client,
@@ -254,11 +266,11 @@ def keep_thread_active() -> bool:

     while keep_thread_active():
         try:
-            result = _tracing_thread_drain_compressed_buffer(
+            data_stream = _tracing_thread_drain_compressed_buffer(
                 client, size_limit, size_limit_bytes)
-            if result is not None:
-                # Simulating backend call
-                time.sleep(0.25)
+            if data_stream is not None:
+                for chunk in data_stream:
+                    time.sleep(0.150)  # Backend call simulation
             else:
                 time.sleep(0.05)
         except Exception:
@@ -267,10 +279,11 @@

     # Drain the buffer on exit
     try:
-        final_result = _tracing_thread_drain_compressed_buffer(
+        final_data_stream = _tracing_thread_drain_compressed_buffer(
             client, size_limit=1, size_limit_bytes=1)  # Force final drain
-        if final_result is not None:
-            time.sleep(0.25)  # backend call simulation
+        if final_data_stream is not None:
+            for chunk in final_data_stream:
+                time.sleep(0.150)  # Final backend calls
     except Exception:
         logger.error("Error in final buffer drain", exc_info=True)
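
Note: the sketch below is a self-contained illustration of the drain-and-stream
pattern this patch introduces, runnable outside the client. It assumes the
`zstandard` package (consistent with the compressor.stream_writer(..., closefd=False)
calls above); DrainState, add_run and drain_compressed_buffer are illustrative
stand-ins, not LangSmith APIs. It also shows why the filled buffer must be
captured before the BytesIO is reinitialized: the generator body only runs once
the caller iterates it, after the swap has already happened.

import io
from typing import Iterable, Optional

import zstandard as zstd


class DrainState:
    """Minimal stand-in for the client fields the patch touches."""

    def __init__(self) -> None:
        self.buffer = io.BytesIO()
        self.compressor = zstd.ZstdCompressor()
        self.writer = self.compressor.stream_writer(self.buffer, closefd=False)
        self.run_count = 0

    def add_run(self, payload: bytes) -> None:
        self.writer.write(payload)
        self.run_count += 1


def drain_compressed_buffer(state: DrainState) -> Optional[Iterable[bytes]]:
    if state.run_count == 0:
        return None

    # Finalize the zstd frame and rewind the buffer for reading.
    state.writer.close()
    state.buffer.seek(0)

    # Capture the filled buffer *before* it is swapped out below; the
    # generator reads lazily, after the reinitialization has happened.
    filled = state.buffer

    def data_stream() -> Iterable[bytes]:
        while True:
            chunk = filled.read(65536)
            if not chunk:
                break
            yield chunk

    # Reinitialize for the next batch, then hand back the lazy stream.
    state.buffer = io.BytesIO()
    state.writer = state.compressor.stream_writer(state.buffer, closefd=False)
    state.run_count = 0
    return data_stream()


if __name__ == "__main__":
    state = DrainState()
    for i in range(3):
        state.add_run(f'{{"run": {i}}}\n'.encode())

    stream = drain_compressed_buffer(state)
    compressed = b"".join(stream or [])
    restored = zstd.ZstdDecompressor().decompressobj().decompress(compressed)
    print(restored.decode())  # three JSON lines round-tripped through zstd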
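
Likewise, the consumer loop above only simulates the backend call with
time.sleep(). For reference, an iterator of bytes like the one returned here
can be handed straight to an HTTP client that supports streaming request
bodies, so the compressed batch is never materialized as a single bytes object.
A minimal sketch, assuming the requests library; the endpoint path, header
names, and the send_compressed_batch helper are hypothetical and not part of
this patch:

from typing import Iterable

import requests


def send_compressed_batch(
    data_stream: Iterable[bytes], api_url: str, api_key: str
) -> None:
    # requests accepts an iterator of bytes as `data` and sends it with
    # chunked transfer encoding, streaming the compressed buffer as it is read.
    response = requests.post(
        f"{api_url}/runs/batch",          # hypothetical endpoint
        data=data_stream,
        headers={
            "Content-Encoding": "zstd",   # assumed; matches the zstd writer
            "x-api-key": api_key,         # assumed header name
        },
        timeout=30,
    )
    response.raise_for_status()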