diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 5a386f1ecac3..e0db07f4709c 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -260,6 +260,12 @@ macro_rules! derive_array_from_blob { Self(vec![]) } } + + impl sea_orm::sea_query::Nullable for $struct_name { + fn null() -> Value { + Value::Bytes(None) + } + } }; } diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index 21aea7b696f4..c5a72fbb748c 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -74,7 +74,7 @@ pub struct Model { pub target_table: Option, // `secret_ref` stores the mapping info mapping from property name to secret id and type. pub secret_ref: Option, - pub original_target_columns: ColumnCatalogArray, + pub original_target_columns: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -131,7 +131,7 @@ impl From for ActiveModel { sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())), target_table: Set(pb_sink.target_table.map(|x| x as _)), secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))), - original_target_columns: Set(pb_sink.original_target_columns.into()), + original_target_columns: Set(Some(pb_sink.original_target_columns.into())), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index c1881ab1a65a..a97666a6d903 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -243,7 +243,11 @@ impl From> for PbSink { created_at_cluster_version: value.1.created_at_cluster_version, create_type: PbCreateType::Foreground as _, secret_refs: secret_ref_map, - original_target_columns: value.0.original_target_columns.to_protobuf(), + original_target_columns: value + .0 + .original_target_columns + .map(|cols| cols.to_protobuf()) + .unwrap_or_default(), } } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 4756a9ded458..fd12630fd164 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -989,7 +989,7 @@ impl CatalogController { for sink_id in updated_sink_catalogs { sink::ActiveModel { sink_id: Set(sink_id as _), - original_target_columns: Set(original_table_catalogs.clone()), + original_target_columns: Set(Some(original_table_catalogs.clone())), ..Default::default() } .update(txn)