Skip to content

Commit

Permalink
feat(sql-backend): support notification generator in sql backend (#14875
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yezizp2012 authored Jan 31, 2024
1 parent c022b24 commit 1f57fdd
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 113 deletions.
30 changes: 30 additions & 0 deletions src/meta/model_v2/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,33 @@ impl MigratorTrait for Migrator {
]
}
}

#[macro_export]
macro_rules! assert_not_has_tables {
($manager:expr, $( $table:ident ),+) => {
$(
assert!(
!$manager
.has_table($table::Table.to_string())
.await?
);
)+
};
}

#[macro_export]
macro_rules! drop_tables {
($manager:expr, $( $table:ident ),+) => {
$(
$manager
.drop_table(
sea_orm_migration::prelude::Table::drop()
.table($table::Table)
.if_exists()
.cascade()
.to_owned(),
)
.await?;
)+
};
}
102 changes: 53 additions & 49 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,38 @@
use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *};

use crate::{assert_not_has_tables, drop_tables};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// 1. check if the table exists.
assert!(!manager.has_table(Cluster::Table.to_string()).await?);
assert!(!manager.has_table(Worker::Table.to_string()).await?);
assert!(!manager.has_table(WorkerProperty::Table.to_string()).await?);
assert!(!manager.has_table(User::Table.to_string()).await?);
assert!(!manager.has_table(UserPrivilege::Table.to_string()).await?);
assert!(!manager.has_table(Database::Table.to_string()).await?);
assert!(!manager.has_table(Schema::Table.to_string()).await?);
assert!(!manager.has_table(StreamingJob::Table.to_string()).await?);
assert!(!manager.has_table(Fragment::Table.to_string()).await?);
assert!(!manager.has_table(Actor::Table.to_string()).await?);
assert!(
!manager
.has_table(ActorDispatcher::Table.to_string())
.await?
);
assert!(!manager.has_table(Table::Table.to_string()).await?);
assert!(!manager.has_table(Source::Table.to_string()).await?);
assert!(!manager.has_table(Sink::Table.to_string()).await?);
assert!(!manager.has_table(Connection::Table.to_string()).await?);
assert!(!manager.has_table(View::Table.to_string()).await?);
assert!(!manager.has_table(Index::Table.to_string()).await?);
assert!(!manager.has_table(Function::Table.to_string()).await?);
assert!(!manager.has_table(Object::Table.to_string()).await?);
assert!(
!manager
.has_table(ObjectDependency::Table.to_string())
.await?
);
assert!(
!manager
.has_table(SystemParameter::Table.to_string())
.await?
assert_not_has_tables!(
manager,
Cluster,
Worker,
WorkerProperty,
User,
UserPrivilege,
Database,
Schema,
StreamingJob,
Fragment,
Actor,
ActorDispatcher,
Table,
Source,
Sink,
Connection,
View,
Index,
Function,
Object,
ObjectDependency,
SystemParameter,
CatalogVersion
);

// 2. create tables.
Expand Down Expand Up @@ -742,6 +736,24 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.create_table(
crate::Table::create()
.table(CatalogVersion::Table)
.col(
ColumnDef::new(CatalogVersion::Name)
.string()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(CatalogVersion::Version)
.big_integer()
.not_null(),
)
.to_owned(),
)
.await?;

// 3. create indexes.
manager
Expand Down Expand Up @@ -836,22 +848,6 @@ impl MigrationTrait for Migration {
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
macro_rules! drop_tables {
($manager:expr, $( $table:ident ),+) => {
$(
$manager
.drop_table(
MigrationTable::drop()
.table($table::Table)
.if_exists()
.cascade()
.to_owned(),
)
.await?;
)+
};
}

// drop tables cascade.
drop_tables!(
manager,
Expand All @@ -875,7 +871,8 @@ impl MigrationTrait for Migration {
Function,
Object,
ObjectDependency,
SystemParameter
SystemParameter,
CatalogVersion
);
Ok(())
}
Expand Down Expand Up @@ -1136,3 +1133,10 @@ enum SystemParameter {
IsMutable,
Description,
}

#[derive(DeriveIden)]
enum CatalogVersion {
Table,
Name,
Version,
}
31 changes: 4 additions & 27 deletions src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
use sea_orm_migration::prelude::*;

use crate::{assert_not_has_tables, drop_tables};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
macro_rules! assert_not_has_tables {
($manager:expr, $( $table:ident ),+) => {
$(
assert!(
!$manager
.has_table($table::Table.to_string())
.await?
);
)+
};
}
assert_not_has_tables!(
manager,
CompactionTask,
Expand All @@ -25,7 +16,8 @@ impl MigrationTrait for Migration {
HummockPinnedVersion,
HummockPinnedSnapshot,
HummockVersionDelta,
HummockVersionStats
HummockVersionStats,
HummockSequence
);

manager
Expand Down Expand Up @@ -196,21 +188,6 @@ impl MigrationTrait for Migration {
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
macro_rules! drop_tables {
($manager:expr, $( $table:ident ),+) => {
$(
$manager
.drop_table(
Table::drop()
.table($table::Table)
.if_exists()
.cascade()
.to_owned(),
)
.await?;
)+
};
}
drop_tables!(
manager,
CompactionTask,
Expand Down
37 changes: 37 additions & 0 deletions src/meta/model_v2/src/catalog_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;

#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum VersionCategory {
#[sea_orm(string_value = "NOTIFICATION")]
Notification,
#[sea_orm(string_value = "TABLE_REVISION")]
TableRevision,
}

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "catalog_version")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub name: VersionCategory,
pub version: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod prelude;

pub mod actor;
pub mod actor_dispatcher;
pub mod catalog_version;
pub mod cluster;
pub mod compaction_config;
pub mod compaction_status;
Expand Down
1 change: 1 addition & 0 deletions src/meta/model_v2/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub use super::actor::Entity as Actor;
pub use super::actor_dispatcher::Entity as ActorDispatcher;
pub use super::catalog_version::Entity as CatalogVersion;
pub use super::cluster::Entity as Cluster;
pub use super::compaction_config::Entity as CompactionConfig;
pub use super::compaction_status::Entity as CompactionStatus;
Expand Down
17 changes: 7 additions & 10 deletions src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl SystemParamsController {
.await;

// Sync params to worker nodes.
self.notify_workers(&params).await;
self.notify_workers(&params);

Ok(params)
}
Expand All @@ -241,25 +241,22 @@ impl SystemParamsController {
}
}
system_params_controller
.notify_workers(&*system_params_controller.params.read().await)
.await;
.notify_workers(&*system_params_controller.params.read().await);
}
});

(join_handle, shutdown_tx)
}

// Notify workers of parameter change.
async fn notify_workers(&self, params: &PbSystemParams) {
// TODO: add system params into snapshot to avoid periodically sync.
fn notify_workers(&self, params: &PbSystemParams) {
self.notification_manager
.notify_frontend(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_frontend_without_version(Operation::Update, Info::SystemParams(params.clone()));
self.notification_manager
.notify_compute(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
self.notification_manager
.notify_compactor(Operation::Update, Info::SystemParams(params.clone()))
.await;
.notify_compute_without_version(Operation::Update, Info::SystemParams(params.clone()));
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ impl MetaSrvEnv {
// change to sync after refactor `IdGeneratorManager::new` sync.
let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
let stream_client_pool = Arc::new(StreamClientPool::default());
let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
let notification_manager =
Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await);
let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
let (mut cluster_id, cluster_first_launch) =
if let Some(id) = ClusterId::from_meta_store(&meta_store).await? {
Expand Down Expand Up @@ -463,7 +464,8 @@ impl MetaSrvEnv {
let meta_store_sql = Some(SqlMetaStore::for_test().await);

let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await);
let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
let notification_manager =
Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await);
let stream_client_pool = Arc::new(StreamClientPool::default());
let idle_manager = Arc::new(IdleManager::disabled());
let (cluster_id, cluster_first_launch) = (ClusterId::new(), true);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod id;
mod idle;
mod metadata;
mod notification;
mod notification_version;
pub mod sink_coordination;
mod streaming_job;
mod system_param;
Expand Down
Loading

0 comments on commit 1f57fdd

Please sign in to comment.