From 82dba308595870876eb9fab6465d9782f78ba255 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 27 Feb 2024 18:47:05 +0800 Subject: [PATCH] fix(sql-backend): fix some function column types and change json column to jsonb (#15299) --- .../migration/src/m20230908_072257_init.rs | 120 ++++++++++++------ src/meta/src/controller/catalog.rs | 10 +- 2 files changed, 84 insertions(+), 46 deletions(-) diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index de19649cb95c..b6a3f53de9f8 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -85,7 +85,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(WorkerProperty::ParallelUnitIds) - .json() + .json_binary() .not_null(), ) .col( @@ -129,7 +129,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(User::CanCreateDb).boolean().not_null()) .col(ColumnDef::new(User::CanCreateUser).boolean().not_null()) .col(ColumnDef::new(User::CanLogin).boolean().not_null()) - .col(ColumnDef::new(User::AuthInfo).json()) + .col(ColumnDef::new(User::AuthInfo).json_binary()) .to_owned(), ) .await?; @@ -328,7 +328,11 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null()) .col(ColumnDef::new(StreamingJob::CreateType).string().not_null()) .col(ColumnDef::new(StreamingJob::Timezone).string()) - .col(ColumnDef::new(StreamingJob::Parallelism).json().not_null()) + .col( + ColumnDef::new(StreamingJob::Parallelism) + .json_binary() + .not_null(), + ) .foreign_key( &mut ForeignKey::create() .name("FK_streaming_job_object_id") @@ -362,9 +366,13 @@ impl MigrationTrait for Migration { .not_null(), ) .col(ColumnDef::new(Fragment::StreamNode).binary().not_null()) - .col(ColumnDef::new(Fragment::VnodeMapping).json().not_null()) - .col(ColumnDef::new(Fragment::StateTableIds).json()) - .col(ColumnDef::new(Fragment::UpstreamFragmentId).json()) + .col( + ColumnDef::new(Fragment::VnodeMapping) + .json_binary() + .not_null(), + ) + .col(ColumnDef::new(Fragment::StateTableIds).json_binary()) + .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary()) .foreign_key( &mut ForeignKey::create() .name("FK_fragment_table_id") @@ -388,12 +396,12 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Actor::FragmentId).integer().not_null()) .col(ColumnDef::new(Actor::Status).string().not_null()) - .col(ColumnDef::new(Actor::Splits).json()) + .col(ColumnDef::new(Actor::Splits).json_binary()) .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null()) .col(ColumnDef::new(Actor::WorkerId).integer().not_null()) - .col(ColumnDef::new(Actor::UpstreamActorIds).json()) - .col(ColumnDef::new(Actor::VnodeBitmap).json()) - .col(ColumnDef::new(Actor::ExprContext).json().not_null()) + .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary()) + .col(ColumnDef::new(Actor::VnodeBitmap).json_binary()) + .col(ColumnDef::new(Actor::ExprContext).json_binary().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_actor_fragment_id") @@ -427,15 +435,15 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(ActorDispatcher::DistKeyIndices) - .json() + .json_binary() .not_null(), ) .col( ColumnDef::new(ActorDispatcher::OutputIndices) - .json() + .json_binary() .not_null(), ) - .col(ColumnDef::new(ActorDispatcher::HashMapping).json()) + .col(ColumnDef::new(ActorDispatcher::HashMapping).json_binary()) .col( ColumnDef::new(ActorDispatcher::DispatcherId) .integer() @@ -443,7 +451,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(ActorDispatcher::DownstreamActorIds) - .json() + .json_binary() .not_null(), ) .col(ColumnDef::new(ActorDispatcher::DownstreamTableName).string()) @@ -476,7 +484,7 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Connection::Name).string().not_null()) - .col(ColumnDef::new(Connection::Info).json().not_null()) + .col(ColumnDef::new(Connection::Info).json_binary().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_connection_object_id") @@ -495,12 +503,20 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Source::SourceId).integer().primary_key()) .col(ColumnDef::new(Source::Name).string().not_null()) .col(ColumnDef::new(Source::RowIdIndex).integer()) - .col(ColumnDef::new(Source::Columns).json().not_null()) - .col(ColumnDef::new(Source::PkColumnIds).json().not_null()) - .col(ColumnDef::new(Source::WithProperties).json().not_null()) + .col(ColumnDef::new(Source::Columns).json_binary().not_null()) + .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null()) + .col( + ColumnDef::new(Source::WithProperties) + .json_binary() + .not_null(), + ) .col(ColumnDef::new(Source::Definition).string().not_null()) - .col(ColumnDef::new(Source::SourceInfo).json()) - .col(ColumnDef::new(Source::WatermarkDescs).json().not_null()) + .col(ColumnDef::new(Source::SourceInfo).json_binary()) + .col( + ColumnDef::new(Source::WatermarkDescs) + .json_binary() + .not_null(), + ) .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer()) .col(ColumnDef::new(Source::ConnectionId).integer()) .col(ColumnDef::new(Source::Version).big_integer().not_null()) @@ -539,15 +555,19 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer()) .col(ColumnDef::new(Table::TableType).string().not_null()) .col(ColumnDef::new(Table::BelongsToJobId).integer()) - .col(ColumnDef::new(Table::Columns).json().not_null()) - .col(ColumnDef::new(Table::Pk).json().not_null()) - .col(ColumnDef::new(Table::DistributionKey).json().not_null()) - .col(ColumnDef::new(Table::StreamKey).json().not_null()) + .col(ColumnDef::new(Table::Columns).json_binary().not_null()) + .col(ColumnDef::new(Table::Pk).json_binary().not_null()) + .col( + ColumnDef::new(Table::DistributionKey) + .json_binary() + .not_null(), + ) + .col(ColumnDef::new(Table::StreamKey).json_binary().not_null()) .col(ColumnDef::new(Table::AppendOnly).boolean().not_null()) .col(ColumnDef::new(Table::FragmentId).integer()) .col(ColumnDef::new(Table::VnodeColIndex).integer()) .col(ColumnDef::new(Table::RowIdIndex).integer()) - .col(ColumnDef::new(Table::ValueIndices).json().not_null()) + .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null()) .col(ColumnDef::new(Table::Definition).string().not_null()) .col( ColumnDef::new(Table::HandlePkConflictBehavior) @@ -559,19 +579,27 @@ impl MigrationTrait for Migration { .integer() .not_null(), ) - .col(ColumnDef::new(Table::WatermarkIndices).json().not_null()) - .col(ColumnDef::new(Table::DistKeyInPk).json().not_null()) + .col( + ColumnDef::new(Table::WatermarkIndices) + .json_binary() + .not_null(), + ) + .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null()) .col(ColumnDef::new(Table::DmlFragmentId).integer()) - .col(ColumnDef::new(Table::Cardinality).json()) + .col(ColumnDef::new(Table::Cardinality).json_binary()) .col( ColumnDef::new(Table::CleanedByWatermark) .boolean() .not_null(), ) .col(ColumnDef::new(Table::Description).string()) - .col(ColumnDef::new(Table::Version).json()) + .col(ColumnDef::new(Table::Version).json_binary()) .col(ColumnDef::new(Table::RetentionSeconds).integer()) - .col(ColumnDef::new(Table::IncomingSinks).json().not_null()) + .col( + ColumnDef::new(Table::IncomingSinks) + .json_binary() + .not_null(), + ) .foreign_key( &mut ForeignKey::create() .name("FK_table_object_id") @@ -618,17 +646,21 @@ impl MigrationTrait for Migration { .table(Sink::Table) .col(ColumnDef::new(Sink::SinkId).integer().primary_key()) .col(ColumnDef::new(Sink::Name).string().not_null()) - .col(ColumnDef::new(Sink::Columns).json().not_null()) - .col(ColumnDef::new(Sink::PlanPk).json().not_null()) - .col(ColumnDef::new(Sink::DistributionKey).json().not_null()) - .col(ColumnDef::new(Sink::DownstreamPk).json().not_null()) + .col(ColumnDef::new(Sink::Columns).json_binary().not_null()) + .col(ColumnDef::new(Sink::PlanPk).json_binary().not_null()) + .col( + ColumnDef::new(Sink::DistributionKey) + .json_binary() + .not_null(), + ) + .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null()) .col(ColumnDef::new(Sink::SinkType).string().not_null()) - .col(ColumnDef::new(Sink::Properties).json().not_null()) + .col(ColumnDef::new(Sink::Properties).json_binary().not_null()) .col(ColumnDef::new(Sink::Definition).string().not_null()) .col(ColumnDef::new(Sink::ConnectionId).integer()) .col(ColumnDef::new(Sink::DbName).string().not_null()) .col(ColumnDef::new(Sink::SinkFromName).string().not_null()) - .col(ColumnDef::new(Sink::SinkFormatDesc).json()) + .col(ColumnDef::new(Sink::SinkFormatDesc).json_binary()) .col(ColumnDef::new(Sink::TargetTable).integer()) .foreign_key( &mut ForeignKey::create() @@ -661,9 +693,9 @@ impl MigrationTrait for Migration { .table(View::Table) .col(ColumnDef::new(View::ViewId).integer().primary_key()) .col(ColumnDef::new(View::Name).string().not_null()) - .col(ColumnDef::new(View::Properties).json().not_null()) + .col(ColumnDef::new(View::Properties).json_binary().not_null()) .col(ColumnDef::new(View::Definition).string().not_null()) - .col(ColumnDef::new(View::Columns).json().not_null()) + .col(ColumnDef::new(View::Columns).json_binary().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_view_object_id") @@ -683,7 +715,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Index::Name).string().not_null()) .col(ColumnDef::new(Index::IndexTableId).integer().not_null()) .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null()) - .col(ColumnDef::new(Index::IndexItems).json().not_null()) + .col(ColumnDef::new(Index::IndexItems).json_binary().not_null()) .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null()) .foreign_key( &mut ForeignKey::create() @@ -716,9 +748,13 @@ impl MigrationTrait for Migration { .table(Function::Table) .col(ColumnDef::new(Function::FunctionId).integer().primary_key()) .col(ColumnDef::new(Function::Name).string().not_null()) - .col(ColumnDef::new(Function::ArgNames).json().not_null()) - .col(ColumnDef::new(Function::ArgTypes).json().not_null()) - .col(ColumnDef::new(Function::ReturnType).json().not_null()) + .col(ColumnDef::new(Function::ArgNames).string().not_null()) + .col(ColumnDef::new(Function::ArgTypes).json_binary().not_null()) + .col( + ColumnDef::new(Function::ReturnType) + .json_binary() + .not_null(), + ) .col(ColumnDef::new(Function::Language).string().not_null()) .col(ColumnDef::new(Function::Link).string()) .col(ColumnDef::new(Function::Identifier).string()) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 41b94a4d73fa..1cd068d1fd57 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -2897,17 +2897,18 @@ mod tests { #[tokio::test] async fn test_create_function() -> MetaResult<()> { let mgr = CatalogController::new(MetaSrvEnv::for_test().await)?; - let return_type = risingwave_pb::data::DataType { + let test_data_type = risingwave_pb::data::DataType { type_name: risingwave_pb::data::data_type::TypeName::Int32 as _, ..Default::default() }; + let arg_types = vec![test_data_type.clone()]; let pb_function = PbFunction { schema_id: TEST_SCHEMA_ID as _, database_id: TEST_DATABASE_ID as _, name: "test_function".to_string(), owner: TEST_OWNER_ID as _, - arg_types: vec![], - return_type: Some(return_type.clone()), + arg_types, + return_type: Some(test_data_type.clone()), language: "python".to_string(), kind: Some(risingwave_pb::catalog::function::Kind::Scalar( Default::default(), @@ -2928,7 +2929,8 @@ mod tests { .one(&mgr.inner.read().await.db) .await? .unwrap(); - assert_eq!(function.return_type.0, return_type); + assert_eq!(function.return_type.0, test_data_type); + assert_eq!(function.arg_types.into_inner().len(), 1); assert_eq!(function.language, "python"); mgr.drop_function(function.function_id).await?;