From a5702c3a7a057837522462b855e189d326e5b90c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 26 Oct 2023 16:30:25 +0800 Subject: [PATCH] prepare data and set global tracing dispatch Signed-off-by: Bugen Zhao --- src/connector/benches/nexmark_integration.rs | 66 +++++++++++--------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/src/connector/benches/nexmark_integration.rs b/src/connector/benches/nexmark_integration.rs index 01a19807753e0..4d470f2236e6e 100644 --- a/src/connector/benches/nexmark_integration.rs +++ b/src/connector/benches/nexmark_integration.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(lazy_cell)] + +use std::sync::LazyLock; + use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; @@ -27,10 +31,9 @@ use risingwave_connector::source::{ use tracing::Level; use tracing_subscriber::prelude::*; -/// The length of `Vec` to be generated by the data stream. -const BATCH_SIZE: usize = 1024; +const BATCH: LazyLock> = LazyLock::new(make_batch); -fn make_data_stream() -> BoxSourceStream { +fn make_batch() -> Vec { let mut generator = nexmark::EventGenerator::default() .with_type_filter(nexmark::event::EventType::Bid) .map(|e| match e { @@ -47,24 +50,24 @@ fn make_data_stream() -> BoxSourceStream { meta: SourceMeta::Empty, }; - let data_stream = futures::stream::repeat_with(move || { - let messages = generator - .by_ref() - .take(BATCH_SIZE) - .map(|(i, e)| { - let payload = serde_json::to_vec(&e).unwrap(); - SourceMessage { - payload: Some(payload), - offset: i.to_string(), - ..message_base.clone() - } - }) - .collect_vec(); - - Ok(messages) - }); + generator + .by_ref() + .take(1024) + .map(|(i, e)| { + let payload = serde_json::to_vec(&e).unwrap(); + SourceMessage { + payload: Some(payload), + offset: i.to_string(), + ..message_base.clone() + } + }) + .collect_vec() +} - data_stream.boxed() +fn make_data_stream() -> BoxSourceStream { + futures::future::ready(Ok(BATCH.clone())) + .into_stream() + .boxed() } fn make_parser() -> impl ByteStreamSourceParser { @@ -111,7 +114,13 @@ fn bench(c: &mut Criterion) { ) .into(); - // Tracing disabled + // Enable tracing globally. + // + // TODO: we should use `tracing::with_default` to set the dispatch in the scope, + // so that we can compare the performance with/without tracing side by side. + // However, why the global dispatch works much worse than the scoped one. + dispatch.init(); + c.bench_function("parse_nexmark", |b| { b.iter_batched( make_stream_iter, @@ -120,14 +129,13 @@ fn bench(c: &mut Criterion) { ) }); - // Tracing enabled, like production - c.bench_function("parse_nexmark_with_tracing", |b| { - b.iter_batched( - make_stream_iter, - |mut iter| tracing::dispatcher::with_default(&dispatch, || iter.next().unwrap()), - BatchSize::SmallInput, - ) - }); + // c.bench_function("parse_nexmark_with_tracing_scoped", |b| { + // b.iter_batched( + // make_stream_iter, + // |mut iter| tracing::with_default(&dispatch, || iter.next().unwrap()), + // BatchSize::SmallInput, + // ) + // }); } criterion_group!(benches, bench);