Skip to content

Commit

Permalink
Stop streams on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
fwcd committed Jun 1, 2024
1 parent 864eb64 commit ecd9e32
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions lighthouse-client/src/lighthouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ impl<S> Lighthouse<S>
self.perform(&Verb::Unlink, dest_path, src_path).await
}

/// Stops the given stream.
/// Stops the given stream. **Should generally not be called manually**,
/// since streams will automatically be stopped once dropped.
pub async fn stop(&self, path: &[&str]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Stop, path, ()).await
}
Expand All @@ -200,15 +201,29 @@ impl<S> Lighthouse<S>
}

/// Performs a STREAM request to the given path with the given payload.
/// Automatically sends a STOP once dropped.
#[tracing::instrument(skip(self, payload))]
pub async fn stream<P, R>(&self, path: &[&str], payload: P) -> Result<impl Stream<Item = Result<ServerMessage<R>>>>
where
P: Serialize,
R: for<'de> Deserialize<'de> {
let request_id = self.send_request(&Verb::Stream, path, payload).await?;
let stream = self.receive_streaming(request_id).await?;
// TODO: Send STOP once dropped
Ok(stream)
Ok(stream.guard({
// Stop the stream on drop
let this = (*self).clone();
let path: Vec<_> = path.into_iter().map(|s| s.to_string()).collect();
move || {
tokio::spawn(async move {
// TODO: Find a more elegant way to pass the path, ideally without
// converting back and forth between Vec<String>, Vec<&str> and &[&str]
let path: Vec<_> = path.iter().map(|s| &**s).collect();
if let Err(error) = this.stop(&path).await {
error! { ?path, %error, "Could not STOP stream" };
}
});
}
}))
}

/// Sends a request to the given path with the given payload.
Expand Down

0 comments on commit ecd9e32

Please sign in to comment.