Skip to content

Commit

Permalink
Add stream-based implementation of core log consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Nov 1, 2023
1 parent 9630071 commit 83a9bb8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
50 changes: 49 additions & 1 deletion core/src/telemetry/log_export.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::Mutex;
use ringbuf::{Consumer, HeapRb, Producer};
use std::{collections::HashMap, fmt, sync::Arc, time::SystemTime};
Expand Down Expand Up @@ -141,6 +142,32 @@ impl CoreLogBuffer {
}
}

/// Core log consumer implementation backed by a mpsc channel.
pub struct CoreLogStreamConsumer {
tx: Sender<CoreLog>,
}

impl CoreLogStreamConsumer {
/// Create a stream consumer and stream of logs.
pub fn new(buffer: usize) -> (Self, Receiver<CoreLog>) {
let (tx, rx) = channel(buffer);
(Self { tx }, rx)
}
}

impl CoreLogConsumer for CoreLogStreamConsumer {
fn on_log(&self, log: CoreLog) {
// We will drop messages if we can't send
let _ = self.tx.clone().try_send(log);
}
}

impl fmt::Debug for CoreLogStreamConsumer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<stream consumer>")
}
}

struct JsonVisitor<'a>(&'a mut HashMap<String, serde_json::Value>);

impl<'a> tracing::field::Visit for JsonVisitor<'a> {
Expand Down Expand Up @@ -190,7 +217,10 @@ impl<'a> tracing::field::Visit for JsonVisitor<'a> {

#[cfg(test)]
mod tests {
use crate::{telemetry::construct_filter_string, telemetry_init};
use crate::{
telemetry::construct_filter_string, telemetry::CoreLogStreamConsumer, telemetry_init,
};
use futures::stream::StreamExt;
use std::fmt;
use std::sync::{Arc, Mutex};
use temporal_sdk_core_api::telemetry::{
Expand Down Expand Up @@ -272,4 +302,22 @@ mod tests {
write_logs();
assert_logs(consumer.0.lock().unwrap().drain(..).collect());
}

#[tokio::test]
async fn test_push_stream_output() {
let (consumer, stream) = CoreLogStreamConsumer::new(100);
let consumer = Arc::new(consumer);
let opts = TelemetryOptionsBuilder::default()
.logging(Logger::Push {
filter: construct_filter_string(Level::INFO, Level::WARN),
consumer: consumer.clone(),
})
.build()
.unwrap();
let instance = telemetry_init(opts).unwrap();
let _g = tracing::subscriber::set_default(instance.trace_subscriber().unwrap().clone());

write_logs();
assert_logs(stream.ready_chunks(100).next().await.unwrap());
}
}
2 changes: 1 addition & 1 deletion core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use metrics::{
MetricsCallBuffer,
};

pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer};
pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer};

use crate::telemetry::log_export::CoreLogConsumerLayer;
use crate::telemetry::metrics::PrefixedMetricsMeter;
Expand Down

0 comments on commit 83a9bb8

Please sign in to comment.