Skip to content

Commit

Permalink
working
Browse files (browse the repository at this point in the history)
  • Loading branch information
angus-langchain committed Dec 10, 2024
1 parent 1f1fa6d commit 58a8a13
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
16 changes: 13 additions & 3 deletions python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,15 @@ def _tracing_thread_drain_compressed_buffer(
current_size = client.compressed_runs_buffer.tell()

# Check if we should send now
print("current_size", current_size)
print("run_count", client._run_count)
if not (client._run_count >= size_limit or current_size >= size_limit_bytes):
print("not sending")
return None

# Write final boundary and close compression stream
client.compressor_writer.write(f'--{client.boundary}--\r\n'.encode())
client.compressor_writer.flush()
client.compressor_writer.close()

filled_buffer = client.compressed_runs_buffer
Expand All @@ -128,27 +132,31 @@ def _tracing_thread_drain_compressed_buffer(

# Attempt decompression only if we have data
try:
decompressor = zstd.ZstdDecompressor()
decompressed_data = decompressor.decompress(filled_data)
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(filled_data) as reader:
decompressed_data = reader.read()

except zstd.ZstdError as e:
logger.warning(
"Error decompressing final drain data: %s. Possibly empty or incomplete frame.",
e
)
# Reinitialize for next batch and return
client.compressed_runs_buffer = io.BytesIO()
client.compressor_writer = zstd.ZstdCompressor(level=3).stream_writer(
client.compressor_writer = zstd.ZstdCompressor(level=3, write_checksum=True, write_content_size=True).stream_writer(
client.compressed_runs_buffer, closefd=False
)
client._run_count = 0
return []

# Parse the multipart form-data if decompression succeeded
boundary = client.boundary
print(decompressed_data)
content_type = f"multipart/form-data; boundary={boundary}"
decoder = MultipartDecoder(decompressed_data, content_type)

# Iterate over the parts and write them to files
print("decoder.parts", decoder.parts)
for part in decoder.parts:
content_disp = part.headers.get(b'Content-Disposition', b'').decode('utf-8', errors='replace')
name_match = re.search(r'name="([^"]+)"', content_disp)
Expand Down Expand Up @@ -308,6 +316,7 @@ def keep_thread_active() -> bool:

while keep_thread_active():
try:
print("drain_compressed_buffer")
data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes)
if data_stream is not None:
Expand All @@ -321,6 +330,7 @@ def keep_thread_active() -> bool:

# Drain the buffer on exit
try:
print("final_data_stream")
final_data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1) # Force final drain
if final_data_stream is not None:
Expand Down
3 changes: 3 additions & 0 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,14 @@ def compress_multipart_parts_and_context(
part_header += f'{header_name}: {header_value}\r\n'

part_header += '\r\n'
print(part_header)
compressor_writer.write(part_header.encode())

if isinstance(data, (bytes, bytearray)):
print(data)
compressor_writer.write(data)
else:
print(str(data))
compressor_writer.write(str(data).encode())

# Write part terminator
Expand Down
4 changes: 3 additions & 1 deletion python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ def create_run(
multipart_form = serialized_run_operation_to_multipart_parts_and_context(
serialized_op)
with self._buffer_lock:
print("create_run")
print(multipart_form.parts)
compress_multipart_parts_and_context(
multipart_form, self.compressor_writer, self.boundary)
self._run_count += 1
Expand Down Expand Up @@ -1804,9 +1806,9 @@ def update_run(
if self.compressed_runs_buffer is not None:
multipart_form = serialized_run_operation_to_multipart_parts_and_context(serialized_op)
with self._buffer_lock:
print("update_run")
compress_multipart_parts_and_context(
multipart_form, self.compressor_writer, self.boundary)
self._run_count += 1
elif self.tracing_queue is not None:
self.tracing_queue.put(
TracingQueueItem(data["dotted_order"], serialized_op)
Expand Down

0 comments on commit 58a8a13

Please sign in to comment.