Skip to content

Commit

Permalink
fix(sql-backend): fix some function column types and change json colu…
Browse files Browse the repository at this point in the history
…mn to jsonb (#15299)
  • Loading branch information
yezizp2012 authored Feb 27, 2024
1 parent e0f9c68 commit 82dba30
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 46 deletions.
120 changes: 78 additions & 42 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl MigrationTrait for Migration {
)
.col(
ColumnDef::new(WorkerProperty::ParallelUnitIds)
.json()
.json_binary()
.not_null(),
)
.col(
Expand Down Expand Up @@ -129,7 +129,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
.col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
.col(ColumnDef::new(User::CanLogin).boolean().not_null())
.col(ColumnDef::new(User::AuthInfo).json())
.col(ColumnDef::new(User::AuthInfo).json_binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -328,7 +328,11 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
.col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
.col(ColumnDef::new(StreamingJob::Timezone).string())
.col(ColumnDef::new(StreamingJob::Parallelism).json().not_null())
.col(
ColumnDef::new(StreamingJob::Parallelism)
.json_binary()
.not_null(),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_streaming_job_object_id")
Expand Down Expand Up @@ -362,9 +366,13 @@ impl MigrationTrait for Migration {
.not_null(),
)
.col(ColumnDef::new(Fragment::StreamNode).binary().not_null())
.col(ColumnDef::new(Fragment::VnodeMapping).json().not_null())
.col(ColumnDef::new(Fragment::StateTableIds).json())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json())
.col(
ColumnDef::new(Fragment::VnodeMapping)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Fragment::StateTableIds).json_binary())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
.foreign_key(
&mut ForeignKey::create()
.name("FK_fragment_table_id")
Expand All @@ -388,12 +396,12 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Actor::FragmentId).integer().not_null())
.col(ColumnDef::new(Actor::Status).string().not_null())
.col(ColumnDef::new(Actor::Splits).json())
.col(ColumnDef::new(Actor::Splits).json_binary())
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::WorkerId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json())
.col(ColumnDef::new(Actor::VnodeBitmap).json())
.col(ColumnDef::new(Actor::ExprContext).json().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
.col(ColumnDef::new(Actor::VnodeBitmap).json_binary())
.col(ColumnDef::new(Actor::ExprContext).json_binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_actor_fragment_id")
Expand Down Expand Up @@ -427,23 +435,23 @@ impl MigrationTrait for Migration {
)
.col(
ColumnDef::new(ActorDispatcher::DistKeyIndices)
.json()
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(ActorDispatcher::OutputIndices)
.json()
.json_binary()
.not_null(),
)
.col(ColumnDef::new(ActorDispatcher::HashMapping).json())
.col(ColumnDef::new(ActorDispatcher::HashMapping).json_binary())
.col(
ColumnDef::new(ActorDispatcher::DispatcherId)
.integer()
.not_null(),
)
.col(
ColumnDef::new(ActorDispatcher::DownstreamActorIds)
.json()
.json_binary()
.not_null(),
)
.col(ColumnDef::new(ActorDispatcher::DownstreamTableName).string())
Expand Down Expand Up @@ -476,7 +484,7 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Connection::Name).string().not_null())
.col(ColumnDef::new(Connection::Info).json().not_null())
.col(ColumnDef::new(Connection::Info).json_binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_connection_object_id")
Expand All @@ -495,12 +503,20 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Source::SourceId).integer().primary_key())
.col(ColumnDef::new(Source::Name).string().not_null())
.col(ColumnDef::new(Source::RowIdIndex).integer())
.col(ColumnDef::new(Source::Columns).json().not_null())
.col(ColumnDef::new(Source::PkColumnIds).json().not_null())
.col(ColumnDef::new(Source::WithProperties).json().not_null())
.col(ColumnDef::new(Source::Columns).json_binary().not_null())
.col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
.col(
ColumnDef::new(Source::WithProperties)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::Definition).string().not_null())
.col(ColumnDef::new(Source::SourceInfo).json())
.col(ColumnDef::new(Source::WatermarkDescs).json().not_null())
.col(ColumnDef::new(Source::SourceInfo).json_binary())
.col(
ColumnDef::new(Source::WatermarkDescs)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
.col(ColumnDef::new(Source::ConnectionId).integer())
.col(ColumnDef::new(Source::Version).big_integer().not_null())
Expand Down Expand Up @@ -539,15 +555,19 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
.col(ColumnDef::new(Table::TableType).string().not_null())
.col(ColumnDef::new(Table::BelongsToJobId).integer())
.col(ColumnDef::new(Table::Columns).json().not_null())
.col(ColumnDef::new(Table::Pk).json().not_null())
.col(ColumnDef::new(Table::DistributionKey).json().not_null())
.col(ColumnDef::new(Table::StreamKey).json().not_null())
.col(ColumnDef::new(Table::Columns).json_binary().not_null())
.col(ColumnDef::new(Table::Pk).json_binary().not_null())
.col(
ColumnDef::new(Table::DistributionKey)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Table::StreamKey).json_binary().not_null())
.col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
.col(ColumnDef::new(Table::FragmentId).integer())
.col(ColumnDef::new(Table::VnodeColIndex).integer())
.col(ColumnDef::new(Table::RowIdIndex).integer())
.col(ColumnDef::new(Table::ValueIndices).json().not_null())
.col(ColumnDef::new(Table::ValueIndices).json_binary().not_null())
.col(ColumnDef::new(Table::Definition).string().not_null())
.col(
ColumnDef::new(Table::HandlePkConflictBehavior)
Expand All @@ -559,19 +579,27 @@ impl MigrationTrait for Migration {
.integer()
.not_null(),
)
.col(ColumnDef::new(Table::WatermarkIndices).json().not_null())
.col(ColumnDef::new(Table::DistKeyInPk).json().not_null())
.col(
ColumnDef::new(Table::WatermarkIndices)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
.col(ColumnDef::new(Table::DmlFragmentId).integer())
.col(ColumnDef::new(Table::Cardinality).json())
.col(ColumnDef::new(Table::Cardinality).json_binary())
.col(
ColumnDef::new(Table::CleanedByWatermark)
.boolean()
.not_null(),
)
.col(ColumnDef::new(Table::Description).string())
.col(ColumnDef::new(Table::Version).json())
.col(ColumnDef::new(Table::Version).json_binary())
.col(ColumnDef::new(Table::RetentionSeconds).integer())
.col(ColumnDef::new(Table::IncomingSinks).json().not_null())
.col(
ColumnDef::new(Table::IncomingSinks)
.json_binary()
.not_null(),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_table_object_id")
Expand Down Expand Up @@ -618,17 +646,21 @@ impl MigrationTrait for Migration {
.table(Sink::Table)
.col(ColumnDef::new(Sink::SinkId).integer().primary_key())
.col(ColumnDef::new(Sink::Name).string().not_null())
.col(ColumnDef::new(Sink::Columns).json().not_null())
.col(ColumnDef::new(Sink::PlanPk).json().not_null())
.col(ColumnDef::new(Sink::DistributionKey).json().not_null())
.col(ColumnDef::new(Sink::DownstreamPk).json().not_null())
.col(ColumnDef::new(Sink::Columns).json_binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).json_binary().not_null())
.col(
ColumnDef::new(Sink::DistributionKey)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null())
.col(ColumnDef::new(Sink::SinkType).string().not_null())
.col(ColumnDef::new(Sink::Properties).json().not_null())
.col(ColumnDef::new(Sink::Properties).json_binary().not_null())
.col(ColumnDef::new(Sink::Definition).string().not_null())
.col(ColumnDef::new(Sink::ConnectionId).integer())
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json())
.col(ColumnDef::new(Sink::SinkFormatDesc).json_binary())
.col(ColumnDef::new(Sink::TargetTable).integer())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -661,9 +693,9 @@ impl MigrationTrait for Migration {
.table(View::Table)
.col(ColumnDef::new(View::ViewId).integer().primary_key())
.col(ColumnDef::new(View::Name).string().not_null())
.col(ColumnDef::new(View::Properties).json().not_null())
.col(ColumnDef::new(View::Properties).json_binary().not_null())
.col(ColumnDef::new(View::Definition).string().not_null())
.col(ColumnDef::new(View::Columns).json().not_null())
.col(ColumnDef::new(View::Columns).json_binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_view_object_id")
Expand All @@ -683,7 +715,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::Name).string().not_null())
.col(ColumnDef::new(Index::IndexTableId).integer().not_null())
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
.col(ColumnDef::new(Index::IndexItems).json().not_null())
.col(ColumnDef::new(Index::IndexItems).json_binary().not_null())
.col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -716,9 +748,13 @@ impl MigrationTrait for Migration {
.table(Function::Table)
.col(ColumnDef::new(Function::FunctionId).integer().primary_key())
.col(ColumnDef::new(Function::Name).string().not_null())
.col(ColumnDef::new(Function::ArgNames).json().not_null())
.col(ColumnDef::new(Function::ArgTypes).json().not_null())
.col(ColumnDef::new(Function::ReturnType).json().not_null())
.col(ColumnDef::new(Function::ArgNames).string().not_null())
.col(ColumnDef::new(Function::ArgTypes).json_binary().not_null())
.col(
ColumnDef::new(Function::ReturnType)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Function::Language).string().not_null())
.col(ColumnDef::new(Function::Link).string())
.col(ColumnDef::new(Function::Identifier).string())
Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2897,17 +2897,18 @@ mod tests {
#[tokio::test]
async fn test_create_function() -> MetaResult<()> {
let mgr = CatalogController::new(MetaSrvEnv::for_test().await)?;
let return_type = risingwave_pb::data::DataType {
let test_data_type = risingwave_pb::data::DataType {
type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
..Default::default()
};
let arg_types = vec![test_data_type.clone()];
let pb_function = PbFunction {
schema_id: TEST_SCHEMA_ID as _,
database_id: TEST_DATABASE_ID as _,
name: "test_function".to_string(),
owner: TEST_OWNER_ID as _,
arg_types: vec![],
return_type: Some(return_type.clone()),
arg_types,
return_type: Some(test_data_type.clone()),
language: "python".to_string(),
kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
Default::default(),
Expand All @@ -2928,7 +2929,8 @@ mod tests {
.one(&mgr.inner.read().await.db)
.await?
.unwrap();
assert_eq!(function.return_type.0, return_type);
assert_eq!(function.return_type.0, test_data_type);
assert_eq!(function.arg_types.into_inner().len(), 1);
assert_eq!(function.language, "python");

mgr.drop_function(function.function_id).await?;
Expand Down

0 comments on commit 82dba30

Please sign in to comment.