diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index 6b8a2b01c2d46..c021942bcbadd 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -16,3 +16,33 @@ impl MigratorTrait for Migrator { ] } } + +#[macro_export] +macro_rules! assert_not_has_tables { + ($manager:expr, $( $table:ident ),+) => { + $( + assert!( + !$manager + .has_table($table::Table.to_string()) + .await? + ); + )+ + }; +} + +#[macro_export] +macro_rules! drop_tables { + ($manager:expr, $( $table:ident ),+) => { + $( + $manager + .drop_table( + sea_orm_migration::prelude::Table::drop() + .table($table::Table) + .if_exists() + .cascade() + .to_owned(), + ) + .await?; + )+ + }; +} diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index bf8cb8c0fc1e1..bf96b8c6227a1 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -1,5 +1,7 @@ use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *}; +use crate::{assert_not_has_tables, drop_tables}; + #[derive(DeriveMigrationName)] pub struct Migration; @@ -7,38 +9,30 @@ pub struct Migration; impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { // 1. check if the table exists. - assert!(!manager.has_table(Cluster::Table.to_string()).await?); - assert!(!manager.has_table(Worker::Table.to_string()).await?); - assert!(!manager.has_table(WorkerProperty::Table.to_string()).await?); - assert!(!manager.has_table(User::Table.to_string()).await?); - assert!(!manager.has_table(UserPrivilege::Table.to_string()).await?); - assert!(!manager.has_table(Database::Table.to_string()).await?); - assert!(!manager.has_table(Schema::Table.to_string()).await?); - assert!(!manager.has_table(StreamingJob::Table.to_string()).await?); - assert!(!manager.has_table(Fragment::Table.to_string()).await?); - assert!(!manager.has_table(Actor::Table.to_string()).await?); - assert!( - !manager - .has_table(ActorDispatcher::Table.to_string()) - .await? - ); - assert!(!manager.has_table(Table::Table.to_string()).await?); - assert!(!manager.has_table(Source::Table.to_string()).await?); - assert!(!manager.has_table(Sink::Table.to_string()).await?); - assert!(!manager.has_table(Connection::Table.to_string()).await?); - assert!(!manager.has_table(View::Table.to_string()).await?); - assert!(!manager.has_table(Index::Table.to_string()).await?); - assert!(!manager.has_table(Function::Table.to_string()).await?); - assert!(!manager.has_table(Object::Table.to_string()).await?); - assert!( - !manager - .has_table(ObjectDependency::Table.to_string()) - .await? - ); - assert!( - !manager - .has_table(SystemParameter::Table.to_string()) - .await? + assert_not_has_tables!( + manager, + Cluster, + Worker, + WorkerProperty, + User, + UserPrivilege, + Database, + Schema, + StreamingJob, + Fragment, + Actor, + ActorDispatcher, + Table, + Source, + Sink, + Connection, + View, + Index, + Function, + Object, + ObjectDependency, + SystemParameter, + CatalogVersion ); // 2. create tables. @@ -742,6 +736,24 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; + manager + .create_table( + crate::Table::create() + .table(CatalogVersion::Table) + .col( + ColumnDef::new(CatalogVersion::Name) + .string() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(CatalogVersion::Version) + .big_integer() + .not_null(), + ) + .to_owned(), + ) + .await?; // 3. create indexes. manager @@ -836,22 +848,6 @@ impl MigrationTrait for Migration { } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - macro_rules! drop_tables { - ($manager:expr, $( $table:ident ),+) => { - $( - $manager - .drop_table( - MigrationTable::drop() - .table($table::Table) - .if_exists() - .cascade() - .to_owned(), - ) - .await?; - )+ - }; - } - // drop tables cascade. drop_tables!( manager, @@ -875,7 +871,8 @@ impl MigrationTrait for Migration { Function, Object, ObjectDependency, - SystemParameter + SystemParameter, + CatalogVersion ); Ok(()) } @@ -1136,3 +1133,10 @@ enum SystemParameter { IsMutable, Description, } + +#[derive(DeriveIden)] +enum CatalogVersion { + Table, + Name, + Version, +} diff --git a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs index 5f1f26b3e21db..b90e088da1f14 100644 --- a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs +++ b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs @@ -1,22 +1,13 @@ use sea_orm_migration::prelude::*; +use crate::{assert_not_has_tables, drop_tables}; + #[derive(DeriveMigrationName)] pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - macro_rules! assert_not_has_tables { - ($manager:expr, $( $table:ident ),+) => { - $( - assert!( - !$manager - .has_table($table::Table.to_string()) - .await? - ); - )+ - }; - } assert_not_has_tables!( manager, CompactionTask, @@ -25,7 +16,8 @@ impl MigrationTrait for Migration { HummockPinnedVersion, HummockPinnedSnapshot, HummockVersionDelta, - HummockVersionStats + HummockVersionStats, + HummockSequence ); manager @@ -196,21 +188,6 @@ impl MigrationTrait for Migration { } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - macro_rules! drop_tables { - ($manager:expr, $( $table:ident ),+) => { - $( - $manager - .drop_table( - Table::drop() - .table($table::Table) - .if_exists() - .cascade() - .to_owned(), - ) - .await?; - )+ - }; - } drop_tables!( manager, CompactionTask, diff --git a/src/meta/model_v2/src/catalog_version.rs b/src/meta/model_v2/src/catalog_version.rs new file mode 100644 index 0000000000000..53c6f9109635c --- /dev/null +++ b/src/meta/model_v2/src/catalog_version.rs @@ -0,0 +1,37 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum VersionCategory { + #[sea_orm(string_value = "NOTIFICATION")] + Notification, + #[sea_orm(string_value = "TABLE_REVISION")] + TableRevision, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "catalog_version")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub name: VersionCategory, + pub version: i64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 572e59c820356..1cec6b553c161 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -23,6 +23,7 @@ pub mod prelude; pub mod actor; pub mod actor_dispatcher; +pub mod catalog_version; pub mod cluster; pub mod compaction_config; pub mod compaction_status; diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model_v2/src/prelude.rs index b1cc5c54ff343..6a5316b0e422c 100644 --- a/src/meta/model_v2/src/prelude.rs +++ b/src/meta/model_v2/src/prelude.rs @@ -14,6 +14,7 @@ pub use super::actor::Entity as Actor; pub use super::actor_dispatcher::Entity as ActorDispatcher; +pub use super::catalog_version::Entity as CatalogVersion; pub use super::cluster::Entity as Cluster; pub use super::compaction_config::Entity as CompactionConfig; pub use super::compaction_status::Entity as CompactionStatus; diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index b97bf0804a955..6802ad17e1769 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -217,7 +217,7 @@ impl SystemParamsController { .await; // Sync params to worker nodes. - self.notify_workers(¶ms).await; + self.notify_workers(¶ms); Ok(params) } @@ -241,8 +241,7 @@ impl SystemParamsController { } } system_params_controller - .notify_workers(&*system_params_controller.params.read().await) - .await; + .notify_workers(&*system_params_controller.params.read().await); } }); @@ -250,16 +249,14 @@ impl SystemParamsController { } // Notify workers of parameter change. - async fn notify_workers(&self, params: &PbSystemParams) { + // TODO: add system params into snapshot to avoid periodically sync. + fn notify_workers(&self, params: &PbSystemParams) { self.notification_manager - .notify_frontend(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone())); self.notification_manager - .notify_compute(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone())); self.notification_manager - .notify_compactor(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone())); } } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 26601f6af0373..ee4287b1aec3b 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -286,7 +286,8 @@ impl MetaSrvEnv { // change to sync after refactor `IdGeneratorManager::new` sync. let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await); let stream_client_pool = Arc::new(StreamClientPool::default()); - let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await); + let notification_manager = + Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await); let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms)); let (mut cluster_id, cluster_first_launch) = if let Some(id) = ClusterId::from_meta_store(&meta_store).await? { @@ -463,7 +464,8 @@ impl MetaSrvEnv { let meta_store_sql = Some(SqlMetaStore::for_test().await); let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await); - let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await); + let notification_manager = + Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await); let stream_client_pool = Arc::new(StreamClientPool::default()); let idle_manager = Arc::new(IdleManager::disabled()); let (cluster_id, cluster_first_launch) = (ClusterId::new(), true); diff --git a/src/meta/src/manager/mod.rs b/src/meta/src/manager/mod.rs index 4ab91c388792d..24c9810084259 100644 --- a/src/meta/src/manager/mod.rs +++ b/src/meta/src/manager/mod.rs @@ -21,6 +21,7 @@ mod id; mod idle; mod metadata; mod notification; +mod notification_version; pub mod sink_coordination; mod streaming_job; mod system_param; diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 3132f662cd765..49f85a993bdd9 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -28,8 +28,10 @@ use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::Mutex; use tonic::Status; +use crate::controller::SqlMetaStore; use crate::manager::cluster::WorkerKey; -use crate::model::{FragmentId, NotificationVersion as Version}; +use crate::manager::notification_version::NotificationVersionGenerator; +use crate::model::FragmentId; use crate::storage::MetaStoreRef; pub type MessageStatus = Status; @@ -77,17 +79,19 @@ pub struct NotificationManager { core: Arc>, /// Sender used to add a notification into the waiting queue. task_tx: UnboundedSender, - /// The current notification version. - current_version: Mutex, - meta_store: MetaStoreRef, + /// The current notification version generator. + version_generator: Mutex, } impl NotificationManager { - pub async fn new(meta_store: MetaStoreRef) -> Self { + pub async fn new(meta_store: MetaStoreRef, meta_store_sql: Option) -> Self { // notification waiting queue. let (task_tx, mut task_rx) = mpsc::unbounded_channel::(); let core = Arc::new(Mutex::new(NotificationManagerCore::new())); let core_clone = core.clone(); + let version_generator = NotificationVersionGenerator::new(meta_store, meta_store_sql) + .await + .unwrap(); tokio::spawn(async move { while let Some(task) = task_rx.recv().await { @@ -105,8 +109,7 @@ impl NotificationManager { Self { core: core_clone, task_tx, - current_version: Mutex::new(Version::new(&meta_store).await), - meta_store, + version_generator: Mutex::new(version_generator), } } @@ -140,9 +143,9 @@ impl NotificationManager { operation: Operation, info: Info, ) -> NotificationVersion { - let mut version_guard = self.current_version.lock().await; - version_guard.increase_version(&self.meta_store).await; - let version = version_guard.version(); + let mut version_guard = self.version_generator.lock().await; + version_guard.increase_version().await; + let version = version_guard.current_version(); self.notify(target, operation, info, Some(version)); version } @@ -252,6 +255,10 @@ impl NotificationManager { self.notify_without_version(SubscribeType::Hummock.into(), operation, info) } + pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) { + self.notify_without_version(SubscribeType::Compactor.into(), operation, info) + } + #[cfg(any(test, feature = "test"))] pub fn notify_hummock_with_version( &self, @@ -320,8 +327,8 @@ impl NotificationManager { } pub async fn current_version(&self) -> NotificationVersion { - let version_guard = self.current_version.lock().await; - version_guard.version() + let version_guard = self.version_generator.lock().await; + version_guard.current_version() } } @@ -411,7 +418,7 @@ mod tests { #[tokio::test] async fn test_multiple_subscribers_one_worker() { - let mgr = NotificationManager::new(MemStore::new().into_ref()).await; + let mgr = NotificationManager::new(MemStore::new().into_ref(), None).await; let worker_key1 = WorkerKey(HostAddress { host: "a".to_string(), port: 1, diff --git a/src/meta/src/manager/notification_version.rs b/src/meta/src/manager/notification_version.rs new file mode 100644 index 0000000000000..ea461da976ac5 --- /dev/null +++ b/src/meta/src/manager/notification_version.rs @@ -0,0 +1,84 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_meta_model_v2::catalog_version; +use risingwave_meta_model_v2::catalog_version::VersionCategory; +use risingwave_meta_model_v2::prelude::CatalogVersion; +use sea_orm::ActiveValue::Set; +use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, TransactionTrait}; + +use crate::controller::SqlMetaStore; +use crate::model::NotificationVersion as NotificationModelV1; +use crate::storage::MetaStoreRef; +use crate::MetaResult; + +pub enum NotificationVersionGenerator { + KvGenerator(NotificationModelV1, MetaStoreRef), + SqlGenerator(u64, DatabaseConnection), +} + +// TODO: add pre-allocation if necessary +impl NotificationVersionGenerator { + pub async fn new( + meta_store: MetaStoreRef, + meta_store_sql: Option, + ) -> MetaResult { + if let Some(sql) = meta_store_sql { + let txn = sql.conn.begin().await?; + let model = CatalogVersion::find_by_id(VersionCategory::Notification) + .one(&txn) + .await?; + let current_version = model.as_ref().map(|m| m.version).unwrap_or(1) as u64; + if model.is_none() { + catalog_version::ActiveModel { + name: Set(VersionCategory::Notification), + version: Set(1), + } + .insert(&txn) + .await?; + txn.commit().await?; + } + + Ok(Self::SqlGenerator(current_version, sql.conn)) + } else { + let current_version = NotificationModelV1::new(&meta_store).await; + Ok(Self::KvGenerator(current_version, meta_store)) + } + } + + pub fn current_version(&self) -> u64 { + match self { + NotificationVersionGenerator::KvGenerator(v, _) => v.version(), + NotificationVersionGenerator::SqlGenerator(v, _) => *v, + } + } + + pub async fn increase_version(&mut self) { + match self { + NotificationVersionGenerator::KvGenerator(v, meta_store) => { + v.increase_version(meta_store).await + } + NotificationVersionGenerator::SqlGenerator(v, conn) => { + catalog_version::ActiveModel { + name: Set(VersionCategory::Notification), + version: Set((*v + 1) as i64), + } + .update(conn) + .await + .unwrap(); + *v += 1; + } + } + } +} diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index a7336a10e5de6..bdd54d3ed7b55 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -116,7 +116,7 @@ impl SystemParamsManager { .await; // Sync params to worker nodes. - self.notify_workers(params).await; + self.notify_workers(params); Ok(params.clone()) } @@ -142,9 +142,7 @@ impl SystemParamsManager { return; } } - system_params_manager - .notify_workers(&*system_params_manager.params.read().await) - .await; + system_params_manager.notify_workers(&*system_params_manager.params.read().await); } }); @@ -152,16 +150,16 @@ impl SystemParamsManager { } // Notify workers of parameter change. - async fn notify_workers(&self, params: &SystemParams) { + // TODO: add system params into snapshot to avoid periodically sync. + fn notify_workers(&self, params: &SystemParams) { self.notification_manager - .notify_frontend(Operation::Update, Info::SystemParams(params.clone())) - .await; - self.notification_manager - .notify_compute(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone())); self.notification_manager - .notify_compactor(Operation::Update, Info::SystemParams(params.clone())) - .await; + .notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone())); + self.notification_manager.notify_compactor_without_version( + Operation::Update, + Info::SystemParams(params.clone()), + ); } }