Skip to content

Commit

Permalink
prepare data and set global tracing dispatch
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 26, 2023
1 parent 51074ee commit 1ca0a80
Showing 1 changed file with 37 additions and 29 deletions.
66 changes: 37 additions & 29 deletions src/connector/benches/nexmark_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lazy_cell)]

use std::sync::LazyLock;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::{FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;
Expand All @@ -27,10 +31,9 @@ use risingwave_connector::source::{
use tracing::Level;
use tracing_subscriber::prelude::*;

/// The length of `Vec<SourceMessage>` to be generated by the data stream.
const BATCH_SIZE: usize = 1024;
const BATCH: LazyLock<Vec<SourceMessage>> = LazyLock::new(make_batch);

fn make_data_stream() -> BoxSourceStream {
fn make_batch() -> Vec<SourceMessage> {
let mut generator = nexmark::EventGenerator::default()
.with_type_filter(nexmark::event::EventType::Bid)
.map(|e| match e {
Expand All @@ -47,24 +50,24 @@ fn make_data_stream() -> BoxSourceStream {
meta: SourceMeta::Empty,
};

let data_stream = futures::stream::repeat_with(move || {
let messages = generator
.by_ref()
.take(BATCH_SIZE)
.map(|(i, e)| {
let payload = serde_json::to_vec(&e).unwrap();
SourceMessage {
payload: Some(payload),
offset: i.to_string(),
..message_base.clone()
}
})
.collect_vec();

Ok(messages)
});
generator
.by_ref()
.take(1024)
.map(|(i, e)| {
let payload = serde_json::to_vec(&e).unwrap();
SourceMessage {
payload: Some(payload),
offset: i.to_string(),
..message_base.clone()
}
})
.collect_vec()
}

data_stream.boxed()
fn make_data_stream() -> BoxSourceStream {
futures::future::ready(Ok(BATCH.clone()))
.into_stream()
.boxed()
}

fn make_parser() -> impl ByteStreamSourceParser {
Expand Down Expand Up @@ -111,7 +114,13 @@ fn bench(c: &mut Criterion) {
)
.into();

// Tracing disabled
// Enable tracing globally.
//
// TODO: we should use `tracing::with_default` to set the dispatch in the scope,
// so that we can compare the performance with/without tracing side by side.
// However, why the global dispatch performs much worse than the scoped one?
dispatch.init();

c.bench_function("parse_nexmark", |b| {
b.iter_batched(
make_stream_iter,
Expand All @@ -120,14 +129,13 @@ fn bench(c: &mut Criterion) {
)
});

// Tracing enabled, like production
c.bench_function("parse_nexmark_with_tracing", |b| {
b.iter_batched(
make_stream_iter,
|mut iter| tracing::dispatcher::with_default(&dispatch, || iter.next().unwrap()),
BatchSize::SmallInput,
)
});
// c.bench_function("parse_nexmark_with_tracing_scoped", |b| {
// b.iter_batched(
// make_stream_iter,
// |mut iter| tracing::with_default(&dispatch, || iter.next().unwrap()),
// BatchSize::SmallInput,
// )
// });
}

criterion_group!(benches, bench);
Expand Down

0 comments on commit 1ca0a80

Please sign in to comment.