diff --git a/rust/openvasd/src/main.rs b/rust/openvasd/src/main.rs index 01190a5ea..5606a7640 100644 --- a/rust/openvasd/src/main.rs +++ b/rust/openvasd/src/main.rs @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box> { let config = config::Config::load(); let filter = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) - .parse_lossy(format!("{},rustls=info", &config.log.level)); + .parse_lossy(format!("{},rustls=info,h2=info", &config.log.level)); tracing::debug!("config: {:?}", config); tracing_subscriber::fmt().with_env_filter(filter).init(); if !config.ospd.socket.exists() { diff --git a/rust/openvasd/src/response.rs b/rust/openvasd/src/response.rs index efc55a340..e02ee86c4 100644 --- a/rust/openvasd/src/response.rs +++ b/rust/openvasd/src/response.rs @@ -176,20 +176,38 @@ impl Response { where T: Iterator> + Send + 'static, { - let (tx, rx) = std::sync::mpsc::sync_channel::(0); + // buffer one extra for fast clients + let (tx, rx) = std::sync::mpsc::sync_channel::(2); // unfortunately we cannot use tokio::spawn as we don't know // if we are running in something that uses TokioExecutor (e.g. http2) // or not (e.g. tests or http1) this deep down. // Therefore we enforce a thread via the OS. thread::spawn(move || { - tx.send(SendState::Start).unwrap(); + let send = |s| match tx.send(s) { + Ok(_) => false, + Err(e) => { + tracing::trace!(%e, "retrieve is not available anymore, ignoring."); + true + } + }; + let span = tracing::debug_span!("ok_byte_stream"); + + let _enter = span.enter(); + tracing::debug!("starting to send values"); + if send(SendState::Start) { + return; + } if let Some(v) = value.next() { - tx.send(SendState::Bytes(true, v)).unwrap(); + if send(SendState::Bytes(true, v)) { + return; + }; } - for v in value { - tx.send(SendState::Bytes(false, v)).unwrap(); + if value.map(|v| send(SendState::Bytes(false, v))).any(|x| x) { + return; } - tx.send(SendState::End).unwrap(); + + send(SendState::End); + tracing::debug!("end send values"); drop(tx); }); self.ok_json_response(BodyKind::BinaryStream(rx))