diff --git a/core/src/telemetry/log_export.rs b/core/src/telemetry/log_export.rs index d46319615..62e794565 100644 --- a/core/src/telemetry/log_export.rs +++ b/core/src/telemetry/log_export.rs @@ -1,3 +1,4 @@ +use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::Mutex; use ringbuf::{Consumer, HeapRb, Producer}; use std::{collections::HashMap, fmt, sync::Arc, time::SystemTime}; @@ -141,6 +142,32 @@ impl CoreLogBuffer { } } +/// Core log consumer implementation backed by a mpsc channel. +pub struct CoreLogStreamConsumer { + tx: Sender, +} + +impl CoreLogStreamConsumer { + /// Create a stream consumer and stream of logs. + pub fn new(buffer: usize) -> (Self, Receiver) { + let (tx, rx) = channel(buffer); + (Self { tx }, rx) + } +} + +impl CoreLogConsumer for CoreLogStreamConsumer { + fn on_log(&self, log: CoreLog) { + // We will drop messages if we can't send + let _ = self.tx.clone().try_send(log); + } +} + +impl fmt::Debug for CoreLogStreamConsumer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } +} + struct JsonVisitor<'a>(&'a mut HashMap); impl<'a> tracing::field::Visit for JsonVisitor<'a> { @@ -190,7 +217,10 @@ impl<'a> tracing::field::Visit for JsonVisitor<'a> { #[cfg(test)] mod tests { - use crate::{telemetry::construct_filter_string, telemetry_init}; + use crate::{ + telemetry::construct_filter_string, telemetry::CoreLogStreamConsumer, telemetry_init, + }; + use futures::stream::StreamExt; use std::fmt; use std::sync::{Arc, Mutex}; use temporal_sdk_core_api::telemetry::{ @@ -272,4 +302,22 @@ mod tests { write_logs(); assert_logs(consumer.0.lock().unwrap().drain(..).collect()); } + + #[tokio::test] + async fn test_push_stream_output() { + let (consumer, stream) = CoreLogStreamConsumer::new(100); + let consumer = Arc::new(consumer); + let opts = TelemetryOptionsBuilder::default() + .logging(Logger::Push { + filter: construct_filter_string(Level::INFO, Level::WARN), + consumer: consumer.clone(), + }) + .build() + .unwrap(); + let instance = telemetry_init(opts).unwrap(); + let _g = tracing::subscriber::set_default(instance.trace_subscriber().unwrap().clone()); + + write_logs(); + assert_logs(stream.ready_chunks(100).next().await.unwrap()); + } } diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index aad4f38e9..6f3130acd 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -10,7 +10,7 @@ pub use metrics::{ MetricsCallBuffer, }; -pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer}; +pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer}; use crate::telemetry::log_export::CoreLogConsumerLayer; use crate::telemetry::metrics::PrefixedMetricsMeter;