From d5f5d0591482361d8bf1116281abe5dfa9bc453c Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 2 Dec 2024 10:22:53 +0800 Subject: [PATCH] refactor(meta): unify DdlType and StreamingJobType Signed-off-by: xxchan --- src/meta/src/barrier/command.rs | 4 +-- src/meta/src/barrier/progress.rs | 10 +++---- src/meta/src/controller/streaming_job.rs | 2 +- src/meta/src/manager/streaming_job.rs | 23 ++++++++-------- src/meta/src/rpc/ddl_controller.rs | 18 ++++++------- src/meta/src/stream/stream_graph/fragment.rs | 28 +++++++++++--------- src/meta/src/stream/stream_manager.rs | 10 ++++--- 7 files changed, 49 insertions(+), 46 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 4e7a659c5029d..8e1f20eb63e08 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -48,7 +48,7 @@ use crate::barrier::utils::collect_resp_info; use crate::barrier::InflightSubscriptionInfo; use crate::controller::fragment::InflightFragmentInfo; use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo}; -use crate::manager::{DdlType, StreamingJob}; +use crate::manager::{StreamingJob, StreamingJobType}; use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; @@ -157,7 +157,7 @@ pub struct CreateStreamingJobCommandInfo { pub dispatchers: HashMap>, pub init_split_assignment: SplitAssignment, pub definition: String, - pub ddl_type: DdlType, + pub job_type: StreamingJobType, pub create_type: CreateType, pub streaming_job: StreamingJob, pub internal_tables: Vec, diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index aa676c01b3bdb..d59e7fad0ec95 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -29,7 +29,7 @@ use crate::barrier::info::BarrierInfo; use crate::barrier::{ Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan, }; -use crate::manager::{DdlType, MetadataManager}; +use crate::manager::{MetadataManager, StreamingJobType}; use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments}; use crate::MetaResult; @@ -522,14 +522,14 @@ impl CreateMviewProgressTracker { upstream_root_actors, dispatchers, definition, - ddl_type, + job_type, create_type, .. } = &info; let creating_mv_id = table_fragments.stream_job_id(); - let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = { + let (upstream_mv_count, upstream_total_key_count, job_type, create_type) = { // Keep track of how many times each upstream MV appears. let mut upstream_mv_count = HashMap::new(); for (table_id, actors) in upstream_root_actors { @@ -547,7 +547,7 @@ impl CreateMviewProgressTracker { ( upstream_mv_count, upstream_total_key_count, - ddl_type, + job_type, create_type, ) }; @@ -562,7 +562,7 @@ impl CreateMviewProgressTracker { upstream_total_key_count, definition.clone(), ); - if *ddl_type == DdlType::Sink && *create_type == CreateType::Background { + if *job_type == StreamingJobType::Sink && *create_type == CreateType::Background { // We return the original tracking job immediately. // This is because sink can be decoupled with backfill progress. // We don't need to wait for sink to finish backfill. diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 8b6e46dd9ec2f..eb8f79f80e642 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1209,7 +1209,7 @@ impl CatalogController { // 4. update catalogs and notify. let mut relations = vec![]; match job_type { - StreamingJobType::Table => { + StreamingJobType::Table(_) => { let (table, table_obj) = Table::find_by_id(original_job_id) .find_also_related(Object) .one(txn) diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 6d808814796c7..3dbf1fc7bf053 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -24,7 +24,7 @@ use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; use sea_orm::entity::prelude::*; use sea_orm::{DatabaseTransaction, QuerySelect}; -use strum::{EnumDiscriminants, EnumIs}; +use strum::EnumIs; use super::{ get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source, @@ -35,8 +35,7 @@ use crate::{MetaError, MetaResult}; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. -#[derive(Debug, Clone, EnumDiscriminants, EnumIs)] -#[strum_discriminants(name(StreamingJobType))] +#[derive(Debug, Clone, EnumIs)] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), @@ -46,7 +45,7 @@ pub enum StreamingJob { } #[derive(Debug, Clone, Copy, PartialEq)] -pub enum DdlType { +pub enum StreamingJobType { MaterializedView, Sink, Table(TableJobType), @@ -54,25 +53,25 @@ pub enum DdlType { Source, } -impl From<&StreamingJob> for DdlType { +impl From<&StreamingJob> for StreamingJobType { fn from(job: &StreamingJob) -> Self { match job { - StreamingJob::MaterializedView(_) => DdlType::MaterializedView, - StreamingJob::Sink(_, _) => DdlType::Sink, - StreamingJob::Table(_, _, ty) => DdlType::Table(*ty), - StreamingJob::Index(_, _) => DdlType::Index, - StreamingJob::Source(_) => DdlType::Source, + StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView, + StreamingJob::Sink(_, _) => StreamingJobType::Sink, + StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty), + StreamingJob::Index(_, _) => StreamingJobType::Index, + StreamingJob::Source(_) => StreamingJobType::Source, } } } #[cfg(test)] #[allow(clippy::derivable_impls)] -impl Default for DdlType { +impl Default for StreamingJobType { fn default() -> Self { // This should not be used by mock services, // so we can just pick an arbitrary default variant. - DdlType::MaterializedView + StreamingJobType::MaterializedView } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 7faf8aa2eab99..e56598839bf25 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -70,8 +70,8 @@ use crate::controller::cluster::StreamingClusterInfo; use crate::controller::streaming_job::SinkIntoTableContext; use crate::error::{bail_invalid_parameter, bail_unavailable}; use crate::manager::{ - DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, - IGNORED_NOTIFICATION_VERSION, + LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, + StreamingJobType, IGNORED_NOTIFICATION_VERSION, }; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::{ @@ -1709,7 +1709,7 @@ impl DdlController { definition: stream_job.definition(), mv_table_id: stream_job.mv_table(), create_type: stream_job.create_type(), - ddl_type: (&stream_job).into(), + job_type: (&stream_job).into(), streaming_job: stream_job, replace_table_job_info, option: CreateStreamingJobOption {}, @@ -1753,11 +1753,11 @@ impl DdlController { .mview_fragment() .expect("mview fragment not found"); - let ddl_type = DdlType::from(stream_job); - let DdlType::Table(table_job_type) = &ddl_type else { + let job_type = StreamingJobType::from(stream_job); + let StreamingJobType::Table(table_job_type) = &job_type else { bail!( - "only support replacing table streaming job, ddl_type: {:?}", - ddl_type + "only support replacing table streaming job, job_type: {:?}", + job_type ) }; @@ -1789,7 +1789,7 @@ impl DdlController { original_table_fragment.fragment_id, downstream_fragments, downstream_actor_location, - ddl_type, + job_type, )?, TableJobType::SharedCdcSource => { @@ -1806,7 +1806,7 @@ impl DdlController { original_table_fragment.fragment_id, downstream_fragments, downstream_actor_location, - ddl_type, + job_type, )? } TableJobType::Unspecified => { diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 210c9c157cdde..2b8a10eed79f3 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -42,7 +42,7 @@ use risingwave_pb::stream_plan::{ }; use crate::barrier::SnapshotBackfillInfo; -use crate::manager::{DdlType, MetaSrvEnv, StreamingJob}; +use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType}; use crate::model::{ActorId, FragmentId}; use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen}; use crate::stream::stream_graph::schedule::Distribution; @@ -714,7 +714,7 @@ impl CompleteStreamFragmentGraph { graph: StreamFragmentGraph, upstream_root_fragments: HashMap, existing_actor_location: HashMap, - ddl_type: DdlType, + job_type: StreamingJobType, ) -> MetaResult { Self::build_helper( graph, @@ -723,7 +723,7 @@ impl CompleteStreamFragmentGraph { upstream_actor_location: existing_actor_location, }), None, - ddl_type, + job_type, ) } @@ -734,7 +734,7 @@ impl CompleteStreamFragmentGraph { original_table_fragment_id: FragmentId, downstream_fragments: Vec<(DispatchStrategy, Fragment)>, existing_actor_location: HashMap, - ddl_type: DdlType, + job_type: StreamingJobType, ) -> MetaResult { Self::build_helper( graph, @@ -744,7 +744,7 @@ impl CompleteStreamFragmentGraph { downstream_fragments, downstream_actor_location: existing_actor_location, }), - ddl_type, + job_type, ) } @@ -756,7 +756,7 @@ impl CompleteStreamFragmentGraph { original_table_fragment_id: FragmentId, downstream_fragments: Vec<(DispatchStrategy, Fragment)>, downstream_actor_location: HashMap, - ddl_type: DdlType, + job_type: StreamingJobType, ) -> MetaResult { Self::build_helper( graph, @@ -769,7 +769,7 @@ impl CompleteStreamFragmentGraph { downstream_fragments, downstream_actor_location, }), - ddl_type, + job_type, ) } @@ -778,7 +778,7 @@ impl CompleteStreamFragmentGraph { mut graph: StreamFragmentGraph, upstream_ctx: Option, downstream_ctx: Option, - ddl_type: DdlType, + job_type: StreamingJobType, ) -> MetaResult { let mut extra_downstreams = HashMap::new(); let mut extra_upstreams = HashMap::new(); @@ -794,8 +794,8 @@ impl CompleteStreamFragmentGraph { for (&id, fragment) in &mut graph.fragments { let uses_shuffled_backfill = fragment.has_shuffled_backfill(); for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns { - let (up_fragment_id, edge) = match ddl_type { - DdlType::Table(TableJobType::SharedCdcSource) => { + let (up_fragment_id, edge) = match job_type { + StreamingJobType::Table(TableJobType::SharedCdcSource) => { let source_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream source fragment not found")?; @@ -831,7 +831,9 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - DdlType::MaterializedView | DdlType::Sink | DdlType::Index => { + StreamingJobType::MaterializedView + | StreamingJobType::Sink + | StreamingJobType::Index => { // handle MV on MV/Source // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` @@ -927,8 +929,8 @@ impl CompleteStreamFragmentGraph { bail!("the upstream fragment should be a MView or Source, got fragment type: {:b}", upstream_fragment.fragment_type_mask) } } - DdlType::Source | DdlType::Table(_) => { - bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type) + StreamingJobType::Source | StreamingJobType::Table(_) => { + bail!("the streaming job shouldn't have an upstream fragment, job_type: {:?}", job_type) } }; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 3a0abf9bc665c..5f52e1a79a708 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -34,7 +34,9 @@ use crate::barrier::{ ReplaceStreamJobPlan, SnapshotBackfillInfo, }; use crate::error::bail_invalid_parameter; -use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; +use crate::manager::{ + MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType, +}; use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -74,7 +76,7 @@ pub struct CreateStreamingJobContext { pub create_type: CreateType, - pub ddl_type: DdlType, + pub job_type: StreamingJobType, /// Context provided for potential replace table, typically used when sinking into a table. pub replace_table_job_info: Option<(StreamingJob, ReplaceStreamJobContext, StreamJobFragments)>, @@ -334,7 +336,7 @@ impl GlobalStreamManager { upstream_root_actors, definition, create_type, - ddl_type, + job_type, replace_table_job_info, internal_tables, snapshot_backfill_info, @@ -394,7 +396,7 @@ impl GlobalStreamManager { definition: definition.to_string(), streaming_job: streaming_job.clone(), internal_tables: internal_tables.into_values().collect_vec(), - ddl_type, + job_type, create_type, };