feat: add kafka backfill frontend
xxchan committed Feb 20, 2024
1 parent c6cfce2 commit 7708a13
Showing 42 changed files with 1,169 additions and 286 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -39,3 +39,6 @@ d70dba827c303373f3220c9733f7c7443e5c2d37
 
 # chore: cargo +nightly fmt (#13162) (format let-chains)
 c583e2c6c054764249acf484438c7bf7197765f4
+
+# chore: replace all ProstXxx with PbXxx (#8621)
+6fd8821f2e053957b183d648bea9c95b6703941f
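These entries let `git blame` skip the listed mechanical commits: run `git blame --ignore-revs-file .git-blame-ignore-revs <path>` (or set the `blame.ignoreRevsFile` config option) and the revisions above will not be reported as the last change to a line.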
69 changes: 35 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default.

13 changes: 10 additions & 3 deletions proto/catalog.proto
@@ -62,9 +62,16 @@ message StreamSourceInfo {
   SchemaRegistryNameStrategy name_strategy = 10;
   optional string key_message_name = 11;
   plan_common.ExternalTableDesc external_table = 12;
-  // Whether the stream source is a cdc source streaming job.
-  // We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
-  bool cdc_source_job = 13;
+  // Whether the stream source has a streaming job.
+  // This is related to [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
+  // Currently, the following sources have streaming jobs:
+  // - Direct CDC sources (mysql & postgresql)
+  // - MQ sources (Kafka, Pulsar, Kinesis, etc.)
+  bool has_streaming_job = 13;
+  // Only used when `has_streaming_job` is `true`.
+  // If `false`, `requires_singleton` will be set in the stream plan.
+  bool is_distributed = 15;
+  reserved "cdc_source_job"; // deprecated
   // Options specified by user in the FORMAT ENCODE clause.
   map<string, string> format_encode_options = 14;
 }
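To make the semantics of the two new flags concrete, here is a hypothetical sketch (all Rust names below are invented for illustration and are not RisingWave's actual planner code) of how a frontend could translate them into a plan decision:

```rust
/// Hypothetical mirror of the two new `StreamSourceInfo` flags.
struct SourceJobInfo {
    has_streaming_job: bool,
    is_distributed: bool,
}

/// Hypothetical planning outcome, for illustration only.
enum SourcePlan {
    /// No dedicated streaming job: each downstream job reads the source directly.
    Inline,
    /// A shared streaming job exists and can run distributed.
    SharedDistributed,
    /// A shared streaming job exists but `requires_singleton` would be set
    /// in the stream plan (i.e. `is_distributed == false`).
    SharedSingleton,
}

fn plan_source(info: &SourceJobInfo) -> SourcePlan {
    if !info.has_streaming_job {
        SourcePlan::Inline
    } else if info.is_distributed {
        SourcePlan::SharedDistributed
    } else {
        SourcePlan::SharedSingleton
    }
}
```

The point is that `is_distributed` only matters once `has_streaming_job` is set, matching the comment in the proto.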
3 changes: 2 additions & 1 deletion src/common/src/monitor/connection.rs
@@ -34,6 +34,7 @@ use prometheus::{
     register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter,
     IntCounterVec, IntGauge, IntGaugeVec, Registry,
 };
+use thiserror_ext::AsReport;
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tonic::transport::{Channel, Endpoint};
 use tracing::{debug, info, warn};
@@ -549,7 +550,7 @@ impl<L> tonic::transport::server::Router<L> {
             config.tcp_nodelay,
             config.keepalive_duration,
         )
-        .unwrap();
+        .unwrap_or_else(|err| panic!("failed to connect to {listen_addr}: {}", err.as_report()));
         let incoming = MonitoredConnection::new(
             incoming,
             MonitorNewConnectionImpl {
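The new `use thiserror_ext::AsReport` import above pairs with this change: `as_report()` renders an error together with its `source()` chain instead of only the outermost message. A minimal sketch of the difference, assuming the `thiserror` and `thiserror-ext` crates as dependencies (the `BindError` type is made up for illustration):

```rust
use std::io;

use thiserror::Error;
use thiserror_ext::AsReport;

// Hypothetical error type with a source, for illustration.
#[derive(Error, Debug)]
#[error("failed to bind listener")]
struct BindError {
    #[source]
    source: io::Error,
}

fn main() {
    let err = BindError {
        source: io::Error::new(io::ErrorKind::AddrInUse, "address already in use"),
    };
    // `Display` shows only the outermost message:
    println!("{err}"); // failed to bind listener
    // `as_report()` also walks the `source()` chain, producing something like
    // "failed to bind listener: address already in use".
    println!("{}", err.as_report());
}
```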
26 changes: 26 additions & 0 deletions src/common/src/util/iter_util.rs
@@ -54,3 +54,29 @@
 {
     a.into_iter().zip_eq_fast(b)
 }
+
+pub trait IntoIteratorExt
+where
+    for<'a> &'a Self: IntoIterator,
+{
+    /// Shorter version of `self.iter().map(f).collect()`.
+    fn map_collect<A, B, F, BCollection>(&self, f: F) -> BCollection
+    where
+        F: FnMut(&A) -> B,
+        for<'a> &'a Self: IntoIterator<Item = &'a A>,
+        BCollection: FromIterator<B>,
+    {
+        self.into_iter().map(f).collect()
+    }
+
+    /// Shorter version of `self.iter().map(f).collect_vec()`.
+    fn map_to_vec<A, B, F>(&self, f: F) -> Vec<B>
+    where
+        F: FnMut(&A) -> B,
+        for<'a> &'a Self: IntoIterator<Item = &'a A>,
+    {
+        self.map_collect(f)
+    }
+}
+
+impl<T> IntoIteratorExt for T where for<'a> &'a Self: IntoIterator {}
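A short usage sketch for the new helpers (a hypothetical call site; the import path is assumed, not taken from this diff):

```rust
// Assumes `IntoIteratorExt` from the module above is in scope, e.g.
// `use risingwave_common::util::iter_util::IntoIteratorExt;` (path assumed).
fn demo() {
    let words = vec!["a", "bb", "ccc"];

    // `map_to_vec` replaces `words.iter().map(|w| w.len()).collect::<Vec<_>>()`.
    let lens: Vec<usize> = words.map_to_vec(|w| w.len());
    assert_eq!(lens, vec![1, 2, 3]);

    // `map_collect` works with any `FromIterator` target, e.g. a `HashSet`.
    let distinct: std::collections::HashSet<usize> = words.map_collect(|w| w.len());
    assert_eq!(distinct.len(), 3);
}
```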
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
@@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner<F>(
                 always!(source.state_table, "FsFetch");
             }
         }
+        NodeBody::SourceBackfill(node) => {
+            always!(node.state_table, "SourceBackfill")
+        }
 
         // Sink
         NodeBody::Sink(node) => {
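For context, this visitor hands every internal state table of a stream node to a caller-supplied closure, and the new arm extends that to `SourceBackfill`. Below is a heavily simplified sketch of the pattern with invented stand-in types (not the actual RisingWave definitions or the `always!` macro):

```rust
// Hypothetical, trimmed-down stand-ins for the real catalog/plan types.
struct Table {
    name: String,
}

enum NodeBody {
    Source { state_table: Option<Table> },
    SourceBackfill { state_table: Option<Table> },
    Sink { log_store_table: Option<Table> },
}

/// Visit every internal state table of a node, tagging it with a name,
/// in the spirit of `visit_stream_node_tables_inner`.
fn visit_tables(body: &mut NodeBody, f: &mut impl FnMut(&mut Table, &str)) {
    match body {
        NodeBody::Source { state_table } => {
            if let Some(t) = state_table {
                f(t, "Source");
            }
        }
        // The new arm in this commit: SourceBackfill also owns a state table.
        NodeBody::SourceBackfill { state_table } => {
            if let Some(t) = state_table {
                f(t, "SourceBackfill");
            }
        }
        NodeBody::Sink { log_store_table } => {
            if let Some(t) = log_store_table {
                f(t, "Sink");
            }
        }
    }
}
```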