Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql-backend): change json column type to blob to allow proto field rename #16090

Merged
merged 14 commits into from
Apr 12, 2024
13 changes: 7 additions & 6 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_meta::stream::TableRevision;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_model_v2::catalog_version::VersionCategory;
use risingwave_meta_model_v2::compaction_status::LevelHandlers;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::hummock_sequence::{
COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
};
Expand Down Expand Up @@ -435,7 +434,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap();
});
let mut fragment = fragment.into_active_model();
fragment.stream_node = Set(StreamNode::from_protobuf(&stream_node));
fragment.stream_node = Set((&stream_node).into());
Fragment::insert(fragment)
.exec(&meta_store_sql.conn)
.await?;
Expand Down Expand Up @@ -683,7 +682,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
max_committed_epoch: Set(vd.max_committed_epoch as _),
safe_epoch: Set(vd.safe_epoch as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set(vd.to_protobuf().into()),
full_version_delta: Set((&vd.to_protobuf()).into()),
})
.collect_vec(),
)
Expand Down Expand Up @@ -716,7 +715,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.into_iter()
.map(|cg| compaction_config::ActiveModel {
compaction_group_id: Set(cg.group_id as _),
config: Set((*cg.compaction_config).clone().into()),
config: Set((&*cg.compaction_config).into()),
})
.collect_vec(),
)
Expand All @@ -733,7 +732,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.into_iter()
.map(|cs| compaction_status::ActiveModel {
compaction_group_id: Set(cs.compaction_group_id as _),
status: Set(LevelHandlers(cs.level_handlers.iter().map_into().collect())),
status: Set(LevelHandlers::from(
cs.level_handlers.iter().map_into().collect_vec(),
)),
})
.collect_vec(),
)
Expand All @@ -751,7 +752,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
compaction_task::ActiveModel {
id: Set(task.task_id as _),
context_id: Set(context_id as _),
task: Set(task.into()),
task: Set((&task).into()),
}
}))
.exec(&meta_store_sql.conn)
Expand Down
11 changes: 10 additions & 1 deletion src/meta/model_v2/migration/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
# Running Migrator CLI

> **WARNING:** Migration files are used to define schema changes for the database. Each migration file contains an up and down function,
> which are used to define upgrade and downgrade operations for the schema.
>
> When you need to make schema changes to the system catalog, you need to generate a new migration file and then apply it to the database.
> Note that each migration file can only be applied once and will be recorded in a system table, so for new schema changes, you need to
> generate a new migration file. Unless you are sure the modification of the migration file has not been included in any released version yet,
> **DO NOT** modify already published migration files.

## How to run the migrator CLI
- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
- Apply all pending migrations for test purposes, `DATABASE_URL` required.
```sh
cargo run
```
Expand Down
57 changes: 23 additions & 34 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
.col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
.col(ColumnDef::new(User::CanLogin).boolean().not_null())
.col(ColumnDef::new(User::AuthInfo).json_binary())
.col(ColumnDef::new(User::AuthInfo).binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -381,11 +381,7 @@ impl MigrationTrait for Migration {
.blob(BlobSize::Long)
.not_null(),
)
.col(
ColumnDef::new(Fragment::VnodeMapping)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Fragment::VnodeMapping).binary().not_null())
.col(ColumnDef::new(Fragment::StateTableIds).json_binary())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
.foreign_key(
Expand All @@ -411,12 +407,12 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Actor::FragmentId).integer().not_null())
.col(ColumnDef::new(Actor::Status).string().not_null())
.col(ColumnDef::new(Actor::Splits).json_binary())
.col(ColumnDef::new(Actor::Splits).binary())
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::WorkerId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
.col(ColumnDef::new(Actor::VnodeBitmap).json_binary())
.col(ColumnDef::new(Actor::ExprContext).json_binary().not_null())
.col(ColumnDef::new(Actor::VnodeBitmap).binary())
.col(ColumnDef::new(Actor::ExprContext).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_actor_fragment_id")
Expand Down Expand Up @@ -458,7 +454,7 @@ impl MigrationTrait for Migration {
.json_binary()
.not_null(),
)
.col(ColumnDef::new(ActorDispatcher::HashMapping).json_binary())
.col(ColumnDef::new(ActorDispatcher::HashMapping).binary())
.col(
ColumnDef::new(ActorDispatcher::DispatcherId)
.integer()
Expand Down Expand Up @@ -499,7 +495,7 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Connection::Name).string().not_null())
.col(ColumnDef::new(Connection::Info).json_binary().not_null())
.col(ColumnDef::new(Connection::Info).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_connection_object_id")
Expand All @@ -518,20 +514,16 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Source::SourceId).integer().primary_key())
.col(ColumnDef::new(Source::Name).string().not_null())
.col(ColumnDef::new(Source::RowIdIndex).integer())
.col(ColumnDef::new(Source::Columns).json_binary().not_null())
.col(ColumnDef::new(Source::Columns).binary().not_null())
.col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
.col(
ColumnDef::new(Source::WithProperties)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::Definition).text().not_null())
.col(ColumnDef::new(Source::SourceInfo).json_binary())
.col(
ColumnDef::new(Source::WatermarkDescs)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::SourceInfo).binary())
.col(ColumnDef::new(Source::WatermarkDescs).binary().not_null())
.col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
.col(ColumnDef::new(Source::ConnectionId).integer())
.col(ColumnDef::new(Source::Version).big_integer().not_null())
Expand Down Expand Up @@ -570,8 +562,8 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
.col(ColumnDef::new(Table::TableType).string().not_null())
.col(ColumnDef::new(Table::BelongsToJobId).integer())
.col(ColumnDef::new(Table::Columns).json_binary().not_null())
.col(ColumnDef::new(Table::Pk).json_binary().not_null())
.col(ColumnDef::new(Table::Columns).binary().not_null())
.col(ColumnDef::new(Table::Pk).binary().not_null())
.col(
ColumnDef::new(Table::DistributionKey)
.json_binary()
Expand Down Expand Up @@ -601,14 +593,14 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
.col(ColumnDef::new(Table::DmlFragmentId).integer())
.col(ColumnDef::new(Table::Cardinality).json_binary())
.col(ColumnDef::new(Table::Cardinality).binary())
.col(
ColumnDef::new(Table::CleanedByWatermark)
.boolean()
.not_null(),
)
.col(ColumnDef::new(Table::Description).string())
.col(ColumnDef::new(Table::Version).json_binary())
.col(ColumnDef::new(Table::Version).binary())
.col(ColumnDef::new(Table::RetentionSeconds).integer())
.col(
ColumnDef::new(Table::IncomingSinks)
Expand Down Expand Up @@ -650,7 +642,8 @@ impl MigrationTrait for Migration {
&mut ForeignKey::create()
.name("FK_table_optional_associated_source_id")
.from(Table::Table, Table::OptionalAssociatedSourceId)
.to(Source::Table, Source::SourceId)
.to(Object::Table, Object::Oid)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.to_owned(),
Expand All @@ -662,8 +655,8 @@ impl MigrationTrait for Migration {
.table(Sink::Table)
.col(ColumnDef::new(Sink::SinkId).integer().primary_key())
.col(ColumnDef::new(Sink::Name).string().not_null())
.col(ColumnDef::new(Sink::Columns).json_binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).json_binary().not_null())
.col(ColumnDef::new(Sink::Columns).binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).binary().not_null())
.col(
ColumnDef::new(Sink::DistributionKey)
.json_binary()
Expand All @@ -676,7 +669,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Sink::ConnectionId).integer())
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json_binary())
.col(ColumnDef::new(Sink::SinkFormatDesc).binary())
.col(ColumnDef::new(Sink::TargetTable).integer())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -711,7 +704,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(View::Name).string().not_null())
.col(ColumnDef::new(View::Properties).json_binary().not_null())
.col(ColumnDef::new(View::Definition).text().not_null())
.col(ColumnDef::new(View::Columns).json_binary().not_null())
.col(ColumnDef::new(View::Columns).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_view_object_id")
Expand All @@ -731,7 +724,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::Name).string().not_null())
.col(ColumnDef::new(Index::IndexTableId).integer().not_null())
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
.col(ColumnDef::new(Index::IndexItems).json_binary().not_null())
.col(ColumnDef::new(Index::IndexItems).binary().not_null())
.col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -767,12 +760,8 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Function::FunctionId).integer().primary_key())
.col(ColumnDef::new(Function::Name).string().not_null())
.col(ColumnDef::new(Function::ArgNames).string().not_null())
.col(ColumnDef::new(Function::ArgTypes).json_binary().not_null())
.col(
ColumnDef::new(Function::ReturnType)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Function::ArgTypes).binary().not_null())
.col(ColumnDef::new(Function::ReturnType).binary().not_null())
.col(ColumnDef::new(Function::Language).string().not_null())
.col(ColumnDef::new(Function::Link).string())
.col(ColumnDef::new(Function::Identifier).string())
Expand Down
12 changes: 4 additions & 8 deletions src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(CompactionTask::Task)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(CompactionTask::Task).binary().not_null())
.col(
ColumnDef::new(CompactionTask::ContextId)
.integer()
Expand All @@ -54,7 +50,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(ColumnDef::new(CompactionConfig::Config).json_binary())
.col(ColumnDef::new(CompactionConfig::Config).binary())
.to_owned(),
)
.await?;
Expand All @@ -69,7 +65,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(ColumnDef::new(CompactionStatus::Status).json_binary())
.col(ColumnDef::new(CompactionStatus::Status).binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -142,7 +138,7 @@ impl MigrationTrait for Migration {
.boolean()
.not_null(),
)
.col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).json_binary())
.col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).binary())
.to_owned(),
)
.await?;
Expand Down
12 changes: 2 additions & 10 deletions src/meta/model_v2/migration/src/m20240304_074901_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,8 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Subscription::Name).string().not_null())
.col(
ColumnDef::new(Subscription::Columns)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(Subscription::PlanPk)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Subscription::Columns).binary().not_null())
.col(ColumnDef::new(Subscription::PlanPk).binary().not_null())
.col(
ColumnDef::new(Subscription::DistributionKey)
.json_binary()
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model_v2/src/actor_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl From<(u32, PbDispatcher)> for Model {
dispatcher_type: dispatcher.r#type().into(),
dist_key_indices: dispatcher.dist_key_indices.into(),
output_indices: dispatcher.output_indices.into(),
hash_mapping: dispatcher.hash_mapping.map(ActorMapping),
hash_mapping: dispatcher.hash_mapping.as_ref().map(ActorMapping::from),
dispatcher_id: dispatcher.dispatcher_id as _,
downstream_actor_ids: dispatcher.downstream_actor_id.into(),
}
Expand All @@ -74,7 +74,7 @@ impl From<Model> for PbDispatcher {
r#type: PbDispatcherType::from(model.dispatcher_type) as _,
dist_key_indices: model.dist_key_indices.into_u32_array(),
output_indices: model.output_indices.into_u32_array(),
hash_mapping: model.hash_mapping.map(|mapping| mapping.into_inner()),
hash_mapping: model.hash_mapping.map(|mapping| mapping.to_protobuf()),
dispatcher_id: model.dispatcher_id as _,
downstream_actor_id: model.downstream_actor_ids.into_u32_array(),
}
Expand Down
3 changes: 1 addition & 2 deletions src/meta/model_v2/src/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::CompactionConfig as PbCompactionConfig;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::CompactionGroupId;
Expand All @@ -32,4 +31,4 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(CompactionConfig, PbCompactionConfig);
crate::derive_from_blob!(CompactionConfig, PbCompactionConfig);
4 changes: 1 addition & 3 deletions src/meta/model_v2/src/compaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

use risingwave_pb::hummock::LevelHandler as PbLevelHandler;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::CompactionGroupId;

Expand All @@ -32,4 +30,4 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(LevelHandlers, Vec<PbLevelHandler>);
crate::derive_array_from_blob!(LevelHandlers, PbLevelHandler, PbLevelHandlerArray);
5 changes: 2 additions & 3 deletions src/meta/model_v2/src/compaction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::{CompactTask as PbCompactTask, CompactTaskAssignment};
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::{CompactionTaskId, WorkerId};
Expand All @@ -33,12 +32,12 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(CompactionTask, PbCompactTask);
crate::derive_from_blob!(CompactionTask, PbCompactTask);

impl From<Model> for CompactTaskAssignment {
fn from(value: Model) -> Self {
Self {
compact_task: Some(value.task.0),
compact_task: Some(value.task.to_protobuf()),
context_id: value.context_id as _,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl From<PbConnection> for ActiveModel {
Self {
connection_id: Set(conn.id as _),
name: Set(conn.name),
info: Set(PrivateLinkService(private_link_srv)),
info: Set(PrivateLinkService::from(&private_link_srv)),
}
}
}
Loading
Loading