Skip to content

Commit

Permalink
Merge branch 'main' into isaac/multipartstuff
Browse files Browse the repository at this point in the history
  • Loading branch information
agola11 authored Dec 10, 2024
2 parents c841ec6 + 6fc9f3e commit eeeb375
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 42 deletions.
5 changes: 4 additions & 1 deletion python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,10 @@ def update_run(
data["events"] = events
if data["extra"]:
self._insert_runtime_env([data])
if use_multipart and self.tracing_queue is not None:

if self._pyo3_client is not None:
self._pyo3_client.update_run(data)
elif use_multipart and self.tracing_queue is not None:
# not collecting attachments currently, use empty dict
serialized_op = serialize_run_dict(operation="patch", payload=data)
self.tracing_queue.put(
Expand Down
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/crates/langsmith-pyo3/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "langsmith-pyo3"
version = "0.1.0-rc2"
version = "0.1.0-rc4"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
14 changes: 12 additions & 2 deletions rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ impl BlockingTracingClient {
Ok(Self { client: Arc::from(client) })
}

// N.B.: We use `Py<Self>` so that we don't hold the GIL while running this method.
// `slf.get()` below is only valid if the `Self` type is `Sync` and `pyclass(frozen)`,
// N.B.: `slf.get()` below is only valid if the `Self` type is `Sync` and `pyclass(frozen)`,
// which is enforced at compile-time.
pub fn create_run(
slf: &Bound<'_, Self>,
Expand All @@ -59,6 +58,17 @@ impl BlockingTracingClient {
.map_err(|e| into_py_err(slf.py(), e))
}

// N.B.: `slf.get()` below is only valid if the `Self` type is `Sync` and `pyclass(frozen)`,
// which is enforced at compile-time.
pub fn update_run(
    slf: &Bound<'_, Self>,
    run: super::py_run::RunUpdateExtended,
) -> PyResult<()> {
    // `Python<'_>` is `Copy`, so the token can be reused for the error conversion below.
    let py = slf.py();
    let this = slf.get();

    // Hand the update to the tracing client inside `allow_threads`, then translate
    // any client-side error into a Python exception once the closure has returned.
    let outcome = py.allow_threads(|| this.client.submit_run_update(run.into_inner()));
    outcome.map_err(|err| into_py_err(py, err))
}

pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> {
let unpacked = slf.get();
Python::allow_threads(slf.py(), || unpacked.client.drain())
Expand Down
121 changes: 90 additions & 31 deletions rust/crates/langsmith-pyo3/src/py_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,43 +46,43 @@ impl FromPyObject<'_> for RunCreateExtended {
}
}

fn extract_attachments(value: &Bound<'_, PyAny>) -> PyResult<Option<Vec<Attachment>>> {
if value.is_none() {
return Ok(None);
}

let mapping = value.downcast::<PyMapping>()?;
#[derive(Debug)]
pub struct RunUpdateExtended(langsmith_tracing_client::client::RunUpdateExtended);

let size = mapping.len()?;
if size == 0 {
return Ok(None);
impl RunUpdateExtended {
#[inline]
pub(crate) fn into_inner(self) -> langsmith_tracing_client::client::RunUpdateExtended {
self.0
}
}

let mut attachments = Vec::with_capacity(size);

for result in mapping.items()?.iter()? {
let key_value_pair = result?;

let key_item = key_value_pair.get_item(0)?;
let key = key_item.extract::<&str>()?;
impl FromPyObject<'_> for RunUpdateExtended {
fn extract_bound(value: &Bound<'_, PyAny>) -> PyResult<Self> {
let run_update = value.extract::<RunUpdate>()?.into_inner();

// TODO: attachments are WIP at the moment, ignore them here for now.
//
// let attachments = {
// if let Ok(attachments_value) = value.get_item(pyo3::intern!(value.py(), "attachments"))
// {
// extract_attachments(&attachments_value)?
// } else {
// None
// }
// };
let attachments = None;

// Each value in the attachments dict is a (mime_type, bytes) tuple.
let value = key_value_pair.get_item(1)?;
let value_tuple = value.downcast_exact::<PyTuple>()?;
let mime_type_value = value_tuple.get_item(0)?;
let bytes_value = value_tuple.get_item(1)?;
let io = RunIO {
inputs: serialize_optional_dict_value(value, pyo3::intern!(value.py(), "inputs"))?,
outputs: serialize_optional_dict_value(value, pyo3::intern!(value.py(), "outputs"))?,
};

attachments.push(Attachment {
// TODO: It's unclear whether the key in the attachments dict is
// the `filename`` or the `ref_name`, and where the other one is coming from.
ref_name: key.to_string(),
filename: key.to_string(),
data: bytes_value.extract()?,
content_type: mime_type_value.extract()?,
});
Ok(Self(langsmith_tracing_client::client::RunUpdateExtended {
run_update,
io,
attachments,
}))
}

Ok(Some(attachments))
}

#[derive(Debug)]
Expand Down Expand Up @@ -137,6 +137,26 @@ impl FromPyObject<'_> for RunCreate {
}
}

/// Newtype wrapper around `langsmith_tracing_client::client::RunUpdate`.
///
/// The wrapper exists so that this crate can implement `FromPyObject` for the
/// tracing client's type; use `into_inner` to get the wrapped value back.
#[derive(Debug)]
pub(crate) struct RunUpdate(langsmith_tracing_client::client::RunUpdate);

impl FromPyObject<'_> for RunUpdate {
    /// Builds a `RunUpdate` from a Python object by extracting the shared
    /// `RunCommon` fields first and then the `"end_time"` item.
    ///
    /// Any failure from the `RunCommon` extraction, the `"end_time"` lookup, or
    /// the time conversion is propagated as the returned `PyErr`.
    fn extract_bound(value: &Bound<'_, PyAny>) -> PyResult<Self> {
        let py = value.py();

        let common = RunCommon::extract_bound(value)?.into_inner();

        let end_time_obj = value.get_item(pyo3::intern!(py, "end_time"))?;
        let end_time = extract_time_value(&end_time_obj)?;

        let inner = langsmith_tracing_client::client::RunUpdate { common, end_time };
        Ok(Self(inner))
    }
}

impl RunUpdate {
#[inline]
pub(crate) fn into_inner(self) -> langsmith_tracing_client::client::RunUpdate {
self.0
}
}

#[derive(Debug)]
pub(crate) struct RunCommon(langsmith_tracing_client::client::RunCommon);

Expand Down Expand Up @@ -196,6 +216,45 @@ impl FromPyObject<'_> for RunCommon {
}
}

/// Converts a Python attachments value into an optional list of [`Attachment`]s.
///
/// Returns `Ok(None)` when `value` is Python `None` or an empty mapping. Otherwise
/// `value` must be a mapping whose keys are strings and whose values are exact
/// `tuple` instances laid out as `(mime_type, bytes)`; each entry becomes one
/// `Attachment`. Any downcast, lookup or extraction failure is returned as the error.
fn extract_attachments(value: &Bound<'_, PyAny>) -> PyResult<Option<Vec<Attachment>>> {
    if value.is_none() {
        return Ok(None);
    }

    let mapping = value.downcast::<PyMapping>()?;

    let entry_count = mapping.len()?;
    if entry_count == 0 {
        return Ok(None);
    }

    let mut collected = Vec::with_capacity(entry_count);

    for entry in mapping.items()?.iter()? {
        // Each item yielded here is a (key, value) pair from the mapping.
        let pair = entry?;

        let name_obj = pair.get_item(0)?;
        let name = name_obj.extract::<&str>()?;

        // The value side is a (mime_type, bytes) tuple; tuple subclasses are rejected.
        let payload = pair.get_item(1)?;
        let payload_tuple = payload.downcast_exact::<PyTuple>()?;
        let mime_obj = payload_tuple.get_item(0)?;
        let data_obj = payload_tuple.get_item(1)?;

        // Extract the bytes before the mime type, matching the field order below.
        let data = data_obj.extract()?;
        let content_type = mime_obj.extract()?;

        // TODO: It's unclear whether the key in the attachments dict is
        //       the `filename` or the `ref_name`, and where the other one is coming from.
        //       Until that is settled the key is used for both.
        collected.push(Attachment {
            ref_name: name.to_string(),
            filename: name.to_string(),
            data,
            content_type,
        });
    }

    Ok(Some(collected))
}

/// Get an optional string from a Python `None`, string, or string-like object such as a UUID value.
fn extract_string_like_or_none(value: Option<&Bound<'_, PyAny>>) -> PyResult<Option<String>> {
match value {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ async fn handle_request(body: Vec<u8>, content_type_str: String) -> Vec<Multipar
assert!(content_type_str.starts_with("multipart/form-data"));

let boundary = content_type_str.split("boundary=").nth(1).unwrap();
let stream = futures::stream::once(
async move { Ok::<_, Box<dyn Error + Send + Sync>>(multer::bytes::Bytes::copy_from_slice(body.as_slice())) }
);
let stream = futures::stream::once(async move {
Ok::<_, Box<dyn Error + Send + Sync>>(multer::bytes::Bytes::copy_from_slice(
body.as_slice(),
))
});
let mut mp = Multipart::new(stream, boundary);

let mut fields = Vec::new();
Expand All @@ -40,9 +42,9 @@ async fn handle_request(body: Vec<u8>, content_type_str: String) -> Vec<Multipar
let field_content_type = field.content_type().map(|ct| ct.to_string());
let field_filename = field.file_name().map(String::from);

let content = String::from_utf8(
field.bytes().await.expect("failed to read field bytes").into(),
).expect("failed to turn field data into string");
let content =
String::from_utf8(field.bytes().await.expect("failed to read field bytes").into())
.expect("failed to turn field data into string");

let multipart_field = MultipartField {
name: field_name,
Expand Down

0 comments on commit eeeb375

Please sign in to comment.