Skip to content

Commit

Permalink
feat(sql-backend): change json column type to blob to allow proto fie…
Browse files Browse the repository at this point in the history
…ld rename (#16090)
  • Loading branch information
yezizp2012 committed Apr 12, 2024
1 parent 00cb2cc commit 47be1b6
Show file tree
Hide file tree
Showing 28 changed files with 305 additions and 238 deletions.
13 changes: 7 additions & 6 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_meta::stream::TableRevision;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_model_v2::catalog_version::VersionCategory;
use risingwave_meta_model_v2::compaction_status::LevelHandlers;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::hummock_sequence::{
COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
};
Expand Down Expand Up @@ -435,7 +434,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap();
});
let mut fragment = fragment.into_active_model();
fragment.stream_node = Set(StreamNode::from_protobuf(&stream_node));
fragment.stream_node = Set((&stream_node).into());
Fragment::insert(fragment)
.exec(&meta_store_sql.conn)
.await?;
Expand Down Expand Up @@ -683,7 +682,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
max_committed_epoch: Set(vd.max_committed_epoch as _),
safe_epoch: Set(vd.safe_epoch as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set(vd.to_protobuf().into()),
full_version_delta: Set((&vd.to_protobuf()).into()),
})
.collect_vec(),
)
Expand Down Expand Up @@ -716,7 +715,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.into_iter()
.map(|cg| compaction_config::ActiveModel {
compaction_group_id: Set(cg.group_id as _),
config: Set((*cg.compaction_config).clone().into()),
config: Set((&*cg.compaction_config).into()),
})
.collect_vec(),
)
Expand All @@ -733,7 +732,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.into_iter()
.map(|cs| compaction_status::ActiveModel {
compaction_group_id: Set(cs.compaction_group_id as _),
status: Set(LevelHandlers(cs.level_handlers.iter().map_into().collect())),
status: Set(LevelHandlers::from(
cs.level_handlers.iter().map_into().collect_vec(),
)),
})
.collect_vec(),
)
Expand All @@ -751,7 +752,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
compaction_task::ActiveModel {
id: Set(task.task_id as _),
context_id: Set(context_id as _),
task: Set(task.into()),
task: Set((&task).into()),
}
}))
.exec(&meta_store_sql.conn)
Expand Down
11 changes: 10 additions & 1 deletion src/meta/model_v2/migration/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
# Running Migrator CLI

> **WARNING:** Migration files are used to define schema changes for the database. Each migration file contains an up and down function,
> which are used to define upgrade and downgrade operations for the schema.
>
> When you need to make schema changes to the system catalog, you need to generate a new migration file and then apply it to the database.
> Note that each migration file can only be applied once and will be recorded in a system table, so for new schema changes, you need to
> generate a new migration file. Unless you are sure the modification of the migration file has not been included in any released version yet,
> **DO NOT** modify already published migration files.
## How to run the migrator CLI
- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
- Apply all pending migrations for test purposes, `DATABASE_URL` required.
```sh
cargo run
```
Expand Down
57 changes: 23 additions & 34 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
.col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
.col(ColumnDef::new(User::CanLogin).boolean().not_null())
.col(ColumnDef::new(User::AuthInfo).json_binary())
.col(ColumnDef::new(User::AuthInfo).binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -381,11 +381,7 @@ impl MigrationTrait for Migration {
.blob(BlobSize::Long)
.not_null(),
)
.col(
ColumnDef::new(Fragment::VnodeMapping)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Fragment::VnodeMapping).binary().not_null())
.col(ColumnDef::new(Fragment::StateTableIds).json_binary())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
.foreign_key(
Expand All @@ -411,12 +407,12 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Actor::FragmentId).integer().not_null())
.col(ColumnDef::new(Actor::Status).string().not_null())
.col(ColumnDef::new(Actor::Splits).json_binary())
.col(ColumnDef::new(Actor::Splits).binary())
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::WorkerId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
.col(ColumnDef::new(Actor::VnodeBitmap).json_binary())
.col(ColumnDef::new(Actor::ExprContext).json_binary().not_null())
.col(ColumnDef::new(Actor::VnodeBitmap).binary())
.col(ColumnDef::new(Actor::ExprContext).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_actor_fragment_id")
Expand Down Expand Up @@ -458,7 +454,7 @@ impl MigrationTrait for Migration {
.json_binary()
.not_null(),
)
.col(ColumnDef::new(ActorDispatcher::HashMapping).json_binary())
.col(ColumnDef::new(ActorDispatcher::HashMapping).binary())
.col(
ColumnDef::new(ActorDispatcher::DispatcherId)
.integer()
Expand Down Expand Up @@ -499,7 +495,7 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Connection::Name).string().not_null())
.col(ColumnDef::new(Connection::Info).json_binary().not_null())
.col(ColumnDef::new(Connection::Info).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_connection_object_id")
Expand All @@ -518,20 +514,16 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Source::SourceId).integer().primary_key())
.col(ColumnDef::new(Source::Name).string().not_null())
.col(ColumnDef::new(Source::RowIdIndex).integer())
.col(ColumnDef::new(Source::Columns).json_binary().not_null())
.col(ColumnDef::new(Source::Columns).binary().not_null())
.col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
.col(
ColumnDef::new(Source::WithProperties)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::Definition).text().not_null())
.col(ColumnDef::new(Source::SourceInfo).json_binary())
.col(
ColumnDef::new(Source::WatermarkDescs)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::SourceInfo).binary())
.col(ColumnDef::new(Source::WatermarkDescs).binary().not_null())
.col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
.col(ColumnDef::new(Source::ConnectionId).integer())
.col(ColumnDef::new(Source::Version).big_integer().not_null())
Expand Down Expand Up @@ -570,8 +562,8 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
.col(ColumnDef::new(Table::TableType).string().not_null())
.col(ColumnDef::new(Table::BelongsToJobId).integer())
.col(ColumnDef::new(Table::Columns).json_binary().not_null())
.col(ColumnDef::new(Table::Pk).json_binary().not_null())
.col(ColumnDef::new(Table::Columns).binary().not_null())
.col(ColumnDef::new(Table::Pk).binary().not_null())
.col(
ColumnDef::new(Table::DistributionKey)
.json_binary()
Expand Down Expand Up @@ -601,14 +593,14 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
.col(ColumnDef::new(Table::DmlFragmentId).integer())
.col(ColumnDef::new(Table::Cardinality).json_binary())
.col(ColumnDef::new(Table::Cardinality).binary())
.col(
ColumnDef::new(Table::CleanedByWatermark)
.boolean()
.not_null(),
)
.col(ColumnDef::new(Table::Description).string())
.col(ColumnDef::new(Table::Version).json_binary())
.col(ColumnDef::new(Table::Version).binary())
.col(ColumnDef::new(Table::RetentionSeconds).integer())
.col(
ColumnDef::new(Table::IncomingSinks)
Expand Down Expand Up @@ -650,7 +642,8 @@ impl MigrationTrait for Migration {
&mut ForeignKey::create()
.name("FK_table_optional_associated_source_id")
.from(Table::Table, Table::OptionalAssociatedSourceId)
.to(Source::Table, Source::SourceId)
.to(Object::Table, Object::Oid)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.to_owned(),
Expand All @@ -662,8 +655,8 @@ impl MigrationTrait for Migration {
.table(Sink::Table)
.col(ColumnDef::new(Sink::SinkId).integer().primary_key())
.col(ColumnDef::new(Sink::Name).string().not_null())
.col(ColumnDef::new(Sink::Columns).json_binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).json_binary().not_null())
.col(ColumnDef::new(Sink::Columns).binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).binary().not_null())
.col(
ColumnDef::new(Sink::DistributionKey)
.json_binary()
Expand All @@ -676,7 +669,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Sink::ConnectionId).integer())
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json_binary())
.col(ColumnDef::new(Sink::SinkFormatDesc).binary())
.col(ColumnDef::new(Sink::TargetTable).integer())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -711,7 +704,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(View::Name).string().not_null())
.col(ColumnDef::new(View::Properties).json_binary().not_null())
.col(ColumnDef::new(View::Definition).text().not_null())
.col(ColumnDef::new(View::Columns).json_binary().not_null())
.col(ColumnDef::new(View::Columns).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_view_object_id")
Expand All @@ -731,7 +724,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::Name).string().not_null())
.col(ColumnDef::new(Index::IndexTableId).integer().not_null())
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
.col(ColumnDef::new(Index::IndexItems).json_binary().not_null())
.col(ColumnDef::new(Index::IndexItems).binary().not_null())
.col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -767,12 +760,8 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Function::FunctionId).integer().primary_key())
.col(ColumnDef::new(Function::Name).string().not_null())
.col(ColumnDef::new(Function::ArgNames).string().not_null())
.col(ColumnDef::new(Function::ArgTypes).json_binary().not_null())
.col(
ColumnDef::new(Function::ReturnType)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Function::ArgTypes).binary().not_null())
.col(ColumnDef::new(Function::ReturnType).binary().not_null())
.col(ColumnDef::new(Function::Language).string().not_null())
.col(ColumnDef::new(Function::Link).string())
.col(ColumnDef::new(Function::Identifier).string())
Expand Down
12 changes: 4 additions & 8 deletions src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(CompactionTask::Task)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(CompactionTask::Task).binary().not_null())
.col(
ColumnDef::new(CompactionTask::ContextId)
.integer()
Expand All @@ -54,7 +50,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(ColumnDef::new(CompactionConfig::Config).json_binary())
.col(ColumnDef::new(CompactionConfig::Config).binary())
.to_owned(),
)
.await?;
Expand All @@ -69,7 +65,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(ColumnDef::new(CompactionStatus::Status).json_binary())
.col(ColumnDef::new(CompactionStatus::Status).binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -142,7 +138,7 @@ impl MigrationTrait for Migration {
.boolean()
.not_null(),
)
.col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).json_binary())
.col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).binary())
.to_owned(),
)
.await?;
Expand Down
12 changes: 2 additions & 10 deletions src/meta/model_v2/migration/src/m20240304_074901_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,8 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Subscription::Name).string().not_null())
.col(
ColumnDef::new(Subscription::Columns)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(Subscription::PlanPk)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Subscription::Columns).binary().not_null())
.col(ColumnDef::new(Subscription::PlanPk).binary().not_null())
.col(
ColumnDef::new(Subscription::DistributionKey)
.json_binary()
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model_v2/src/actor_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl From<(u32, PbDispatcher)> for Model {
dispatcher_type: dispatcher.r#type().into(),
dist_key_indices: dispatcher.dist_key_indices.into(),
output_indices: dispatcher.output_indices.into(),
hash_mapping: dispatcher.hash_mapping.map(ActorMapping),
hash_mapping: dispatcher.hash_mapping.as_ref().map(ActorMapping::from),
dispatcher_id: dispatcher.dispatcher_id as _,
downstream_actor_ids: dispatcher.downstream_actor_id.into(),
}
Expand All @@ -74,7 +74,7 @@ impl From<Model> for PbDispatcher {
r#type: PbDispatcherType::from(model.dispatcher_type) as _,
dist_key_indices: model.dist_key_indices.into_u32_array(),
output_indices: model.output_indices.into_u32_array(),
hash_mapping: model.hash_mapping.map(|mapping| mapping.into_inner()),
hash_mapping: model.hash_mapping.map(|mapping| mapping.to_protobuf()),
dispatcher_id: model.dispatcher_id as _,
downstream_actor_id: model.downstream_actor_ids.into_u32_array(),
}
Expand Down
3 changes: 1 addition & 2 deletions src/meta/model_v2/src/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::CompactionConfig as PbCompactionConfig;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::CompactionGroupId;
Expand All @@ -32,4 +31,4 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(CompactionConfig, PbCompactionConfig);
crate::derive_from_blob!(CompactionConfig, PbCompactionConfig);
4 changes: 1 addition & 3 deletions src/meta/model_v2/src/compaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

use risingwave_pb::hummock::LevelHandler as PbLevelHandler;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::CompactionGroupId;

Expand All @@ -32,4 +30,4 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(LevelHandlers, Vec<PbLevelHandler>);
crate::derive_array_from_blob!(LevelHandlers, PbLevelHandler, PbLevelHandlerArray);
5 changes: 2 additions & 3 deletions src/meta/model_v2/src/compaction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::{CompactTask as PbCompactTask, CompactTaskAssignment};
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::{CompactionTaskId, WorkerId};
Expand All @@ -33,12 +32,12 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(CompactionTask, PbCompactTask);
crate::derive_from_blob!(CompactionTask, PbCompactTask);

impl From<Model> for CompactTaskAssignment {
fn from(value: Model) -> Self {
Self {
compact_task: Some(value.task.0),
compact_task: Some(value.task.to_protobuf()),
context_id: value.context_id as _,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl From<PbConnection> for ActiveModel {
Self {
connection_id: Set(conn.id as _),
name: Set(conn.name),
info: Set(PrivateLinkService(private_link_srv)),
info: Set(PrivateLinkService::from(&private_link_srv)),
}
}
}
Loading

0 comments on commit 47be1b6

Please sign in to comment.