Skip to content

Commit

Permalink
fix(sql-backend): fix some corner cases in alter rename and schema in…
Browse files Browse the repository at this point in the history
… sql backend (#14982)

Co-authored-by: Shanicky Chen <[email protected]>
  • Loading branch information
yezizp2012 and shanicky authored Feb 4, 2024
1 parent 42cc7c8 commit fd2a899
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 44 deletions.
2 changes: 1 addition & 1 deletion e2e_test/batch/basic/logical_view.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ SELECT * FROM v3;
statement error
DROP TABLE t;

statement error other relation\(s\) depend on it
statement error Permission denied
DROP VIEW v2;

statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/ddl/alter_rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ DROP SCHEMA schema1;
statement ok
DROP SINK sink1;

statement error other relation\(s\) depend on it
statement error Permission denied
DROP VIEW v5;

statement ok
Expand Down
10 changes: 5 additions & 5 deletions e2e_test/ddl/dependency_check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ create index i_b1 on b(b1);
statement ok
create materialized view mv1 as select * from a join b on a.a1 = b.b1;

statement error other relation\(s\) depend on it
statement error Permission denied
drop index i_a1;

statement ok
Expand All @@ -25,7 +25,7 @@ drop materialized view mv1;
statement ok
create materialized view mv2 as with ctx as (select a1 from a) select b1 from b;

statement error other relation\(s\) depend on it
statement error Permission denied
drop table a;

statement ok
Expand All @@ -48,13 +48,13 @@ create view v2 as select * from v;
statement ok
create materialized view mv3 as select * from v2;

statement error other relation\(s\) depend on it
statement error Permission denied
drop source src;

statement error other relation\(s\) depend on it
statement error Permission denied
drop view v;

statement error other relation\(s\) depend on it
statement error Permission denied
drop view v2;

statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ create source s (
statement ok
create materialized view mv_1 as select * from s

statement error other relation\(s\) depend on it
statement error Permission denied
drop source s

statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/old_row_format_syntax/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ create source s (
statement ok
create materialized view mv_1 as select * from s

statement error other relation\(s\) depend on it
statement error Permission denied
drop source s

statement ok
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
SourceId, TableId, TableVersion,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[derive(Clone, Debug, PartialEq, Copy, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum TableType {
#[sea_orm(string_value = "TABLE")]
Expand Down
82 changes: 59 additions & 23 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1131,10 +1131,6 @@ impl CatalogController {
if obj.schema_id == Some(new_schema) {
return Ok(IGNORED_NOTIFICATION_VERSION);
}

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
let database_id = obj.database_id.unwrap();

let mut relations = vec![];
Expand All @@ -1145,9 +1141,16 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
let (associated_src_id, table_type) =
(table.optional_associated_source_id, table.table_type);

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into()));

// associated source.
if let Some(associated_source_id) = table.optional_associated_source_id {
if let Some(associated_source_id) = associated_src_id {
let src_obj = object::ActiveModel {
oid: Set(associated_source_id as _),
schema_id: Set(Some(new_schema)),
Expand All @@ -1168,7 +1171,7 @@ impl CatalogController {
let (index_ids, (index_names, mut table_ids)): (
Vec<IndexId>,
(Vec<String>, Vec<TableId>),
) = if table.table_type == TableType::Table {
) = if table_type == TableType::Table {
Index::find()
.select_only()
.columns([
Expand All @@ -1186,7 +1189,6 @@ impl CatalogController {
} else {
(vec![], (vec![], vec![]))
};
relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into()));

// internal tables.
let internal_tables: Vec<TableId> = Table::find()
Expand Down Expand Up @@ -1220,18 +1222,6 @@ impl CatalogController {
.await?;
}

if !index_ids.is_empty() {
let index_objs = Index::find()
.find_also_related(Object)
.filter(index::Column::IndexId.is_in(index_ids))
.all(&txn)
.await?;
for (index, index_obj) in index_objs {
relations.push(PbRelationInfo::Index(
ObjectModel(index, index_obj.unwrap()).into(),
));
}
}
if !table_ids.is_empty() {
let table_objs = Table::find()
.find_also_related(Object)
Expand All @@ -1244,13 +1234,29 @@ impl CatalogController {
));
}
}
if !index_ids.is_empty() {
let index_objs = Index::find()
.find_also_related(Object)
.filter(index::Column::IndexId.is_in(index_ids))
.all(&txn)
.await?;
for (index, index_obj) in index_objs {
relations.push(PbRelationInfo::Index(
ObjectModel(index, index_obj.unwrap()).into(),
));
}
}
}
ObjectType::Source => {
let source = Source::find_by_id(object_id)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into()));
}
ObjectType::Sink => {
Expand All @@ -1259,6 +1265,10 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into()));

// internal tables.
Expand Down Expand Up @@ -1298,16 +1308,30 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::View(ObjectModel(view, obj).into()));
}
ObjectType::Function => {
let function = Function::find_by_id(object_id)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
let pb_function: PbFunction = ObjectModel(function, obj).into();

let mut pb_function: PbFunction = ObjectModel(function, obj).into();
pb_function.schema_id = new_schema as _;
check_function_signature_duplicate(&pb_function, &txn).await?;

object::ActiveModel {
oid: Set(object_id),
schema_id: Set(Some(new_schema)),
..Default::default()
}
.update(&txn)
.await?;

txn.commit().await?;
let version = self
.notify_frontend(
Expand All @@ -1322,9 +1346,19 @@ impl CatalogController {
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
let pb_connection: PbConnection = ObjectModel(connection, obj).into();

let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
pb_connection.schema_id = new_schema as _;
check_connection_name_duplicate(&pb_connection, &txn).await?;

object::ActiveModel {
oid: Set(object_id),
schema_id: Set(Some(new_schema)),
..Default::default()
}
.update(&txn)
.await?;

txn.commit().await?;
let version = self
.notify_frontend(
Expand All @@ -1337,6 +1371,7 @@ impl CatalogController {
_ => unreachable!("not supported object type: {:?}", object_type),
}

txn.commit().await?;
let version = self
.notify_frontend(
Operation::Update,
Expand Down Expand Up @@ -1748,7 +1783,7 @@ impl CatalogController {
let obj = obj.unwrap();
let old_name = relation.name.clone();
relation.name = object_name.into();
if obj.obj_type != ObjectType::Index {
if obj.obj_type != ObjectType::View {
relation.definition = alter_relation_rename(&relation.definition, object_name);
}
let active_model = $table::ActiveModel {
Expand Down Expand Up @@ -1778,6 +1813,7 @@ impl CatalogController {
.unwrap();
index.name = object_name.into();
let index_table_id = index.index_table_id;
let old_name = rename_relation!(Table, table, table_id, index_table_id);

// the name of index and its associated table is the same.
let active_model = index::ActiveModel {
Expand All @@ -1791,7 +1827,7 @@ impl CatalogController {
ObjectModel(index, obj.unwrap()).into(),
)),
});
rename_relation!(Table, table, table_id, index_table_id)
old_name
}
_ => unreachable!("only relation name can be altered."),
};
Expand Down
20 changes: 9 additions & 11 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
Info as NotificationInfo, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{
Expand Down Expand Up @@ -543,7 +543,7 @@ impl CatalogController {

let table = table::ActiveModel::from(table).update(&txn).await?;

let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?;
// let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?;
// 1. replace old fragments/actors with new ones.
Fragment::delete_many()
.filter(fragment::Column::JobId.eq(job_id))
Expand Down Expand Up @@ -701,8 +701,11 @@ impl CatalogController {

txn.commit().await?;

self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
.await;
// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
// when they receive table catalog change.
// self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
// .await;
self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
.await;
let version = self
Expand Down Expand Up @@ -1200,13 +1203,8 @@ impl CatalogController {
}

txn.commit().await?;

for mapping in fragment_mapping_to_notify {
self.env
.notification_manager()
.notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping))
.await;
}
self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
.await;

Ok(())
}
Expand Down

0 comments on commit fd2a899

Please sign in to comment.