Skip to content

Commit

Permalink
feat(connector): optionally limit kafka input throughput bytes (#7058)
Browse files Browse the repository at this point in the history
Add a configurable parameter to limit the source throughput in terms of bytes.

This parameter may or may not be intended for use by public users at the moment (so it will not go into the docs or release notes this time).

It is convenient for people to test resource utilization/performance for a fixed input throughput.(https://redpanda.com/blog/redpanda-vs-kafka-performance-benchmark#:~:text=3.2%20under%20workloads%20(-,up%20to%201GB/sec,-)%20that%20are%20common)
Otherwise, people would always have to generate events with a source generator on the fly into Kafka/Redpanda at exactly the desired throughput, which may not be achievable and depends on many other factors.
Instead, we can pre-generate a large volume of events and then consume them at a stable, limited source throughput.

Also, it may be a workaround solution for #5214.

Approved-By: tabVersion
  • Loading branch information
lmatz authored Dec 26, 2022
1 parent 569e4ff commit bcbe772
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
7 changes: 7 additions & 0 deletions src/connector/src/source/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ pub const KAFKA_CONNECTOR: &str = "kafka";

#[derive(Clone, Debug, Deserialize)]
pub struct KafkaProperties {
/// This parameter is not intended to be exposed to users.
/// Since kafka source has no concept of splits and its parallelism
/// is determined by the compute node's parameter, this parameter specifies
/// the bytes per second in total across all the parallelism.
#[serde(rename = "bytes.per.second", alias = "kafka.bytes.per.second")]
pub bytes_per_second: Option<String>,

#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

Expand Down
38 changes: 35 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::{SystemTime, UNIX_EPOCH};
use std::mem::swap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result};
use async_trait::async_trait;
Expand All @@ -29,6 +30,7 @@ use crate::source::{BoxSourceStream, Column, ConnectorState, SplitImpl};
/// Reads messages from the assigned Kafka partitions and yields them in
/// chunks of `SourceMessage`, optionally rate-limited by `bytes_per_second`.
pub struct KafkaSplitReader {
// Underlying rdkafka streaming consumer; partitions/offsets are assigned
// when the reader is constructed.
consumer: StreamConsumer<DefaultConsumerContext>,
// Exclusive upper bound: reading stops once the message at offset
// `stop_offset - 1` has been yielded. `None` means read indefinitely.
stop_offset: Option<i64>,
// Soft cap on payload bytes consumed per second, enforced by pausing on a
// 1-second tick once exceeded. Set to `usize::MAX` when the
// `bytes.per.second` property is absent (i.e. effectively unlimited).
bytes_per_second: usize,
}

#[async_trait]
Expand Down Expand Up @@ -94,9 +96,17 @@ impl SplitReader for KafkaSplitReader {
consumer.assign(&tpl)?;
}

let bytes_per_second = match properties.bytes_per_second {
None => usize::MAX,
Some(number) => number
.parse::<usize>()
.expect("bytes.per.second expect usize"),
};

Ok(Self {
consumer,
stop_offset,
bytes_per_second,
})
}

Expand All @@ -112,12 +122,19 @@ impl KafkaSplitReader {
yield Vec::new();
return Ok(());
}
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;
let mut bytes_current_second = 0;
let mut res = Vec::with_capacity(MAX_CHUNK_SIZE);
#[for_await]
'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
let mut res = Vec::with_capacity(msgs.len());
for msg in msgs {
let msg = msg?;
let cur_offset = msg.offset();
bytes_current_second += match &msg.payload() {
None => 0,
Some(payload) => payload.len(),
};
res.push(SourceMessage::from(msg));
if let Some(stop_offset) = self.stop_offset {
if cur_offset == stop_offset - 1 {
Expand All @@ -130,8 +147,23 @@ impl KafkaSplitReader {
break 'for_outer_loop;
}
}
// This judgement has to be put in the inner loop as `msgs` can be multiple ones.
if bytes_current_second > self.bytes_per_second {
// swap to make compiler happy
let mut cur = Vec::with_capacity(res.capacity());
swap(&mut cur, &mut res);
yield cur;
interval.tick().await;
bytes_current_second = 0;
res.clear();
}
}
yield res;
let mut cur = Vec::with_capacity(res.capacity());
swap(&mut cur, &mut res);
yield cur;
// don't clear `bytes_current_second` here as it is only related to `.tick()`.
// yield in the outer loop so that we can always guarantee that some messages are read
// every `MAX_CHUNK_SIZE`.
}
}
}

0 comments on commit bcbe772

Please sign in to comment.