diff --git a/proto/catalog.proto b/proto/catalog.proto index 5b4f5ae40ff4b..c9eedf93f41b4 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -83,8 +83,8 @@ message StreamSourceInfo { // Options specified by user in the FORMAT ENCODE clause. map format_encode_options = 14; - // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id. - map secret_ref = 16; + // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type. + map secret_ref = 16; } message Source { @@ -180,8 +180,8 @@ message Sink { // Whether it should use background ddl or block until backfill finishes. CreateType create_type = 24; - // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id. - map secret_ref = 25; + // Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type. + map secret_ref = 25; } message Subscription { @@ -450,3 +450,14 @@ message Secret { uint32 owner = 5; uint32 schema_id = 6; } + +message SecretRef { + enum RefAsType { + UNSPECIFIED = 0; + TEXT = 1; + // AS FILE + FILE = 2; + } + uint32 secret_id = 1; + RefAsType ref_as = 2; +} diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 87ccc7b96caf7..380cce6a8ebe3 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -19,6 +19,7 @@ use risingwave_common::catalog::{ ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId, }; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_pb::catalog::PbSecretRef; use risingwave_pb::stream_plan::PbSinkDesc; use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType}; @@ -83,7 +84,7 @@ impl SinkDesc { owner: UserId, connection_id: Option, dependent_relations: Vec, - secret_ref: BTreeMap, + secret_ref: BTreeMap, ) -> SinkCatalog { SinkCatalog { id: self.id, diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index f02eb2cdcf9e9..206236970d91d 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -25,7 +25,7 @@ use risingwave_common::catalog::{ use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::{ - PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus, + PbCreateType, PbSecretRef, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus, }; use super::{ @@ -339,7 +339,7 @@ pub struct SinkCatalog { pub create_type: CreateType, /// The secret reference for the sink, mapping from property name to secret id. - pub secret_ref: BTreeMap, + pub secret_ref: BTreeMap, } impl SinkCatalog { diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index a7f8395271943..93be066d4e727 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -555,12 +555,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an *id = *connection_rewrite.get(id).unwrap(); } for secret_id in s.secret_ref.values_mut() { - *secret_id = *secret_rewrite.get(secret_id).unwrap(); + secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap(); } object_dependencies.extend(s.secret_ref.values().map(|id| { object_dependency::ActiveModel { id: NotSet, - oid: Set(*id as _), + oid: Set(id.secret_id as _), used_by: Set(s.id as _), } })); diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 01993fd24108a..8f372b58b17ea 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -19,6 +19,7 @@ use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY, }; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::PbSecretRef; use risingwave_sqlparser::ast::{ CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, CreateSubscriptionStatement, SqlOption, Statement, Value, @@ -119,7 +120,7 @@ impl WithOptions { pub(crate) fn resolve_secret_in_with_options( _with_options: &mut WithOptions, _session: &SessionImpl, -) -> RwResult> { +) -> RwResult> { // todo: implement the function and take `resolve_privatelink_in_with_option` as reference Ok(BTreeMap::new()) diff --git a/src/meta/model_v2/migration/src/m20240525_090457_secret.rs b/src/meta/model_v2/migration/src/m20240525_090457_secret.rs index f16bfca5ec035..ed23085c66574 100644 --- a/src/meta/model_v2/migration/src/m20240525_090457_secret.rs +++ b/src/meta/model_v2/migration/src/m20240525_090457_secret.rs @@ -37,12 +37,22 @@ impl MigrationTrait for Migration { ) .await?; - // Add a new column to the table + // Add a new column to the `sink` table manager .alter_table( MigrationTable::alter() .table(Sink::Table) - .add_column(ColumnDef::new(Sink::SecretRef).json_binary()) + .add_column(ColumnDef::new(Sink::SecretRef).binary()) + .to_owned(), + ) + .await?; + + // Add a new column to the `source` table + manager + .alter_table( + MigrationTable::alter() + .table(Source::Table) + .add_column(ColumnDef::new(Source::SecretRef).binary()) .to_owned(), ) .await?; @@ -60,6 +70,14 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; + manager + .alter_table( + MigrationTable::alter() + .table(Source::Table) + .drop_column(Source::SecretRef) + .to_owned(), + ) + .await?; Ok(()) } } @@ -77,3 +95,9 @@ enum Sink { Table, SecretRef, } + +#[derive(DeriveIden)] +enum Source { + Table, + SecretRef, +} diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 11c5209bdc562..116cb66cab1dc 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -14,7 +14,7 @@ use std::collections::BTreeMap; -use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus}; +use risingwave_pb::catalog::{PbCreateType, PbSecretRef, PbStreamJobStatus}; use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState; use risingwave_pb::stream_plan::PbStreamNode; use sea_orm::entity::prelude::*; @@ -258,6 +258,55 @@ macro_rules! derive_array_from_blob { }; } +macro_rules! derive_btreemap_from_blob { + ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => { + #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)] + pub struct $struct_name(#[sea_orm] Vec); + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct $field_type { + #[prost(btree_map = "string, message")] + inner: BTreeMap<$key_type, $value_type>, + } + impl Eq for $field_type {} + + impl $struct_name { + pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> { + let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap(); + data.inner + } + + fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self { + Self(prost::Message::encode_to_vec(&$field_type { inner: val })) + } + } + + impl From> for $struct_name { + fn from(value: BTreeMap<$key_type, $value_type>) -> Self { + Self::from_protobuf(value) + } + } + + impl std::fmt::Debug for $struct_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.to_protobuf().fmt(f) + } + } + + impl Default for $struct_name { + fn default() -> Self { + Self(vec![]) + } + } + + impl sea_orm::sea_query::Nullable for $struct_name { + fn null() -> Value { + Value::Bytes(None) + } + } + }; +} + pub(crate) use {derive_array_from_blob, derive_from_blob}; derive_from_json_struct!(I32Array, Vec); @@ -286,7 +335,7 @@ impl From>> for ActorUpstreamActors { } } -derive_from_json_struct!(SecretRef, BTreeMap); +derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap); derive_from_blob!(StreamNode, PbStreamNode); derive_from_blob!(DataType, risingwave_pb::data::PbDataType); diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index 78d0806f98a5e..25d6293b0b120 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -72,7 +72,7 @@ pub struct Model { pub sink_from_name: String, pub sink_format_desc: Option, pub target_table: Option, - // `secret_ref` stores a json string, mapping from property name to secret id. + // `secret_ref` stores the mapping info mapping from property name to secret id and type. pub secret_ref: Option, } diff --git a/src/meta/model_v2/src/source.rs b/src/meta/model_v2/src/source.rs index be2d2f7110cab..a90f399e4b8cc 100644 --- a/src/meta/model_v2/src/source.rs +++ b/src/meta/model_v2/src/source.rs @@ -19,8 +19,8 @@ use sea_orm::ActiveValue::Set; use serde::{Deserialize, Serialize}; use crate::{ - ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId, - WatermarkDescArray, + ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo, + TableId, WatermarkDescArray, }; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] @@ -39,6 +39,8 @@ pub struct Model { pub optional_associated_table_id: Option, pub connection_id: Option, pub version: i64, + // `secret_ref` stores the mapping info mapping from property name to secret id and type. + pub secret_ref: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -101,6 +103,7 @@ impl From for ActiveModel { optional_associated_table_id: Set(optional_associated_table_id), connection_id: Set(source.connection_id.map(|id| id as _)), version: Set(source.version as _), + secret_ref: Set(None), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 4fb5d086a060d..b6997240f7723 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -201,9 +201,9 @@ impl From> for PbSource { impl From> for PbSink { fn from(value: ObjectModel) -> Self { - let mut secret_ref_map: BTreeMap = BTreeMap::new(); + let mut secret_ref_map = BTreeMap::new(); if let Some(secret_ref) = value.0.secret_ref { - secret_ref_map = secret_ref.into_inner(); + secret_ref_map = secret_ref.to_protobuf(); } Self { id: value.0.sink_id as _, diff --git a/src/prost/build.rs b/src/prost/build.rs index 961dbe196944f..4e939f46abb63 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -66,6 +66,7 @@ fn main() -> Result<(), Box> { ".plan_common.ExternalTableDesc", ".hummock.CompactTask", ".catalog.StreamSourceInfo", + ".catalog.SecretRef", ".catalog.Source", ".catalog.Sink", ".catalog.View", @@ -111,6 +112,7 @@ fn main() -> Result<(), Box> { // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("catalog.SecretRef", "#[derive(Eq, Hash)]") .type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") .type_attribute("data.DataType", "#[derive(Eq, Hash)]")