Skip to content

Commit

Permalink
fix: allow nullable for target table columns for normal sink (#18251)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Aug 26, 2024
1 parent ce7a5af commit 860a936
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ macro_rules! derive_array_from_blob {
Self(vec![])
}
}

impl sea_orm::sea_query::Nullable for $struct_name {
fn null() -> Value {
Value::Bytes(None)
}
}
};
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta/model_v2/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct Model {
pub target_table: Option<TableId>,
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
pub original_target_columns: ColumnCatalogArray,
pub original_target_columns: Option<ColumnCatalogArray>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -131,7 +131,7 @@ impl From<PbSink> for ActiveModel {
sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
target_table: Set(pb_sink.target_table.map(|x| x as _)),
secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
original_target_columns: Set(pb_sink.original_target_columns.into()),
original_target_columns: Set(Some(pb_sink.original_target_columns.into())),
}
}
}
6 changes: 5 additions & 1 deletion src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,11 @@ impl From<ObjectModel<sink::Model>> for PbSink {
created_at_cluster_version: value.1.created_at_cluster_version,
create_type: PbCreateType::Foreground as _,
secret_refs: secret_ref_map,
original_target_columns: value.0.original_target_columns.to_protobuf(),
original_target_columns: value
.0
.original_target_columns
.map(|cols| cols.to_protobuf())
.unwrap_or_default(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ impl CatalogController {
for sink_id in updated_sink_catalogs {
sink::ActiveModel {
sink_id: Set(sink_id as _),
original_target_columns: Set(original_table_catalogs.clone()),
original_target_columns: Set(Some(original_table_catalogs.clone())),
..Default::default()
}
.update(txn)
Expand Down

0 comments on commit 860a936

Please sign in to comment.