feat: support create sink into table in sql backend (#14780)
Signed-off-by: Shanicky Chen <[email protected]>
shanicky authored Feb 18, 2024
1 parent 5f8ce34 commit 8323b17
Showing 10 changed files with 291 additions and 54 deletions.
11 changes: 11 additions & 0 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
@@ -571,6 +571,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::Description).string())
.col(ColumnDef::new(Table::Version).json())
.col(ColumnDef::new(Table::RetentionSeconds).integer())
+.col(ColumnDef::new(Table::IncomingSinks).json().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_table_object_id")
@@ -628,6 +629,7 @@
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json())
+.col(ColumnDef::new(Sink::TargetTable).integer())
.foreign_key(
&mut ForeignKey::create()
.name("FK_sink_object_id")
@@ -643,6 +645,13 @@
.to(Connection::Table, Connection::ConnectionId)
.to_owned(),
)
+.foreign_key(
+&mut ForeignKey::create()
+.name("FK_sink_target_table_id")
+.from(Sink::Table, Sink::TargetTable)
+.to(Table::Table, Table::TableId)
+.to_owned(),
+)
.to_owned(),
)
.await?;
@@ -1034,6 +1043,7 @@ enum Table {
Description,
Version,
RetentionSeconds,
+IncomingSinks,
}

#[derive(DeriveIden)]
@@ -1069,6 +1079,7 @@ enum Sink {
DbName,
SinkFromName,
SinkFormatDesc,
+TargetTable,
}

#[derive(DeriveIden)]
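Note that the new columns land in the existing init migration rather than in a follow-up migration. For comparison, a standalone SeaORM migration adding the same two columns and the foreign key could look roughly like the sketch below. This is a minimal sketch, not the commit's code: it assumes string-based Alias idens and a hypothetical migration struct, and on an already-populated deployment the NOT NULL column would additionally need a default.

use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Track the sinks writing into each table as a JSON-encoded i32 array.
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("table"))
                    .add_column(ColumnDef::new(Alias::new("incoming_sinks")).json().not_null())
                    .to_owned(),
            )
            .await?;
        // Record the optional target table of a "sink into table" job.
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("sink"))
                    .add_column(ColumnDef::new(Alias::new("target_table")).integer())
                    .to_owned(),
            )
            .await?;
        // Enforce referential integrity between sink.target_table and table.table_id.
        manager
            .create_foreign_key(
                ForeignKey::create()
                    .name("FK_sink_target_table_id")
                    .from(Alias::new("sink"), Alias::new("target_table"))
                    .to(Alias::new("table"), Alias::new("table_id"))
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_foreign_key(
                ForeignKey::drop()
                    .name("FK_sink_target_table_id")
                    .table(Alias::new("sink"))
                    .to_owned(),
            )
            .await?;
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("sink"))
                    .drop_column(Alias::new("target_table"))
                    .to_owned(),
            )
            .await?;
        manager
            .alter_table(
                Table::alter()
                    .table(Alias::new("table"))
                    .drop_column(Alias::new("incoming_sinks"))
                    .to_owned(),
            )
            .await
    }
}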
3 changes: 3 additions & 0 deletions src/meta/model_v2/src/sink.rs
@@ -18,6 +18,7 @@ use sea_orm::ActiveValue::Set;

use crate::{
ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId,
+TableId,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
@@ -69,6 +70,7 @@ pub struct Model {
pub db_name: String,
pub sink_from_name: String,
pub sink_format_desc: Option<SinkFormatDesc>,
+pub target_table: Option<TableId>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -123,6 +125,7 @@ impl From<PbSink> for ActiveModel {
db_name: Set(pb_sink.db_name),
sink_from_name: Set(pb_sink.sink_from_name),
sink_format_desc: Set(pb_sink.format_desc.map(|x| x.into())),
+target_table: Set(pb_sink.target_table.map(|x| x as _)),
}
}
}
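The `map(|x| x as _)` above converts the protobuf's u32 id into the model's i32 TableId, with the cast target inferred from the destination field. A self-contained illustration of the pattern, using stand-in types rather than the real catalog structs:

// Stand-ins: protobuf ids are u32, SeaORM id columns are i32.
struct PbSinkStandIn { target_table: Option<u32> }
struct ModelStandIn { target_table: Option<i32> }

fn to_model(pb: PbSinkStandIn) -> ModelStandIn {
    ModelStandIn {
        // `as _` lets the compiler pick i32 from the destination field,
        // so the conversion tracks the model type if it ever changes.
        target_table: pb.target_table.map(|x| x as _),
    }
}

fn main() {
    let m = to_model(PbSinkStandIn { target_table: Some(42) });
    assert_eq!(m.target_table, Some(42));
}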
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/table.rs
@@ -123,6 +123,7 @@ pub struct Model {
pub description: Option<String>,
pub version: Option<TableVersion>,
pub retention_seconds: Option<i32>,
+pub incoming_sinks: I32Array,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -232,6 +233,7 @@ impl From<PbTable> for ActiveModel {
description: Set(pb_table.description),
version: Set(pb_table.version.map(|v| v.into())),
retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
+incoming_sinks: Set(pb_table.incoming_sinks.into()),
}
}
}
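`Set(pb_table.incoming_sinks.into())` works because the I32Array column wrapper converts from the protobuf's Vec<u32>. A rough stand-alone model of such a wrapper follows; the type here is a hypothetical stand-in, while the real one lives in src/meta/model_v2 and is serialized as JSON:

// Hypothetical stand-in for the I32Array JSON-column wrapper.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
struct I32Array(Vec<i32>);

impl From<Vec<u32>> for I32Array {
    fn from(ids: Vec<u32>) -> Self {
        Self(ids.into_iter().map(|id| id as i32).collect())
    }
}

impl I32Array {
    // Mirrors the into_u32_array helper used when mapping back to protobuf.
    fn into_u32_array(self) -> Vec<u32> {
        self.0.into_iter().map(|id| id as u32).collect()
    }
}

fn main() {
    let arr: I32Array = vec![3u32, 5].into();
    assert_eq!(arr.into_u32_array(), vec![3, 5]);
}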
1 change: 1 addition & 0 deletions src/meta/node/src/server.rs
@@ -186,6 +186,7 @@ pub async fn rpc_serve(
let mut options = sea_orm::ConnectOptions::new(endpoint);
options
.max_connections(max_connection)
+.sqlx_logging(false)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
let conn = sea_orm::Database::connect(options).await?;
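Disabling sqlx_logging silences sqlx's per-statement query logging, which would otherwise be emitted for every catalog query against the SQL backend. A minimal stand-alone sketch of the same connection setup, assuming a placeholder endpoint and sea-orm built with a database driver feature plus tokio:

use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), sea_orm::DbErr> {
    let endpoint = "postgres://user:pass@localhost:5432/meta".to_owned();
    let mut options = sea_orm::ConnectOptions::new(endpoint);
    options
        .max_connections(10)
        // Suppress sqlx's statement-level query logs.
        .sqlx_logging(false)
        .connect_timeout(Duration::from_secs(10))
        .idle_timeout(Duration::from_secs(30));
    let _conn = sea_orm::Database::connect(options).await?;
    Ok(())
}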
21 changes: 21 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -928,6 +928,27 @@ impl CommandContext {
init_split_assignment,
)
.await?;

+if let Some(ReplaceTablePlan {
+new_table_fragments,
+dispatchers,
+init_split_assignment,
+old_table_fragments,
+..
+}) = replace_table
+{
+// Tell compute nodes to drop actors.
+self.clean_up(old_table_fragments.actor_ids()).await?;
+
+mgr.catalog_controller
+.post_collect_table_fragments(
+new_table_fragments.table_id().table_id as _,
+new_table_fragments.actor_ids(),
+dispatchers.clone(),
+init_split_assignment,
+)
+.await?;
+}
}
}

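In outline, when the barrier for a sink-into-table job is collected, the accompanying ReplaceTablePlan is finalized in two steps: drop the old table job's actors on the compute nodes, then persist the new fragments. A condensed sketch of that order of operations, with hypothetical stand-in types rather than the real meta-node APIs:

struct TableFragments { actors: Vec<u32> }

impl TableFragments {
    fn actor_ids(&self) -> Vec<u32> { self.actors.clone() }
}

struct ReplaceTablePlan {
    old_table_fragments: TableFragments,
    new_table_fragments: TableFragments,
}

// Stand-in for asking compute nodes to drop the old actors.
async fn clean_up(_actor_ids: Vec<u32>) -> Result<(), String> { Ok(()) }

// Stand-in for persisting new fragments, dispatchers, and split assignments.
async fn post_collect_table_fragments(_new: &TableFragments) -> Result<(), String> { Ok(()) }

async fn on_collected(replace_table: Option<ReplaceTablePlan>) -> Result<(), String> {
    if let Some(plan) = replace_table {
        // Old actors must be gone before the new table fragments are committed.
        clean_up(plan.old_table_fragments.actor_ids()).await?;
        post_collect_table_fragments(&plan.new_table_fragments).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    on_collected(None).await.unwrap();
}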
6 changes: 2 additions & 4 deletions src/meta/src/controller/mod.rs
@@ -134,8 +134,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
.optional_associated_source_id
.map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
description: value.0.description,
-// TODO: fix it for model v2.
-incoming_sinks: vec![],
+incoming_sinks: value.0.incoming_sinks.into_u32_array(),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
retention_seconds: value.0.retention_seconds.map(|id| id as u32),
@@ -204,8 +203,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
sink_from_name: value.0.sink_from_name,
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
format_desc: value.0.sink_format_desc.map(|desc| desc.0),
-// todo: fix this for model v2
-target_table: None,
+target_table: value.0.target_table.map(|id| id as _),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
}
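The `value.0` / `value.1` pair reflects the ObjectModel layout: a type-specific catalog row joined with the shared object row. With the new columns, the stubbed `incoming_sinks: vec![]` and `target_table: None` placeholders give way to real data. A rough, self-contained model of the conversion shape, where every type is a simplified stand-in:

// Shared metadata row common to all catalog objects.
struct Object { created_at_cluster_version: Option<String> }
// Type-specific row, now carrying the commit's new column.
struct SinkRow { name: String, target_table: Option<i32> }
// Pairing of the two, as fetched by a join.
struct ObjectModel<M>(M, Object);

struct PbSink {
    name: String,
    target_table: Option<u32>,
    created_at_cluster_version: Option<String>,
}

impl From<ObjectModel<SinkRow>> for PbSink {
    fn from(value: ObjectModel<SinkRow>) -> Self {
        PbSink {
            name: value.0.name,
            // Previously hard-coded to None; now read from the sink row.
            target_table: value.0.target_table.map(|id| id as _),
            created_at_cluster_version: value.1.created_at_cluster_version,
        }
    }
}

fn main() {
    let pb = PbSink::from(ObjectModel(
        SinkRow { name: "s1".into(), target_table: Some(7) },
        Object { created_at_cluster_version: None },
    ));
    assert_eq!(pb.target_table, Some(7));
}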
21 changes: 18 additions & 3 deletions src/meta/src/controller/streaming_job.rs
@@ -552,8 +552,8 @@ impl CatalogController {
streaming_job: StreamingJob,
merge_updates: Vec<PbMergeUpdate>,
table_col_index_mapping: Option<ColIndexMapping>,
-_creating_sink_id: Option<SinkId>,
-_dropping_sink_id: Option<SinkId>,
+creating_sink_id: Option<SinkId>,
+dropping_sink_id: Option<SinkId>,
) -> MetaResult<NotificationVersion> {
// Question: The source catalog should be remain unchanged?
let StreamingJob::Table(_, table, ..) = streaming_job else {
@@ -564,7 +564,22 @@
let txn = inner.db.begin().await?;
let job_id = table.id as ObjectId;

-let table = table::ActiveModel::from(table).update(&txn).await?;
+let mut table = table::ActiveModel::from(table);
+let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
+if let Some(sink_id) = creating_sink_id {
+debug_assert!(!incoming_sinks.contains(&(sink_id as i32)));
+incoming_sinks.push(sink_id as _);
+}
+
+if let Some(sink_id) = dropping_sink_id {
+let drained = incoming_sinks
+.extract_if(|id| *id == sink_id as i32)
+.collect_vec();
+debug_assert_eq!(drained, vec![sink_id as i32]);
+}
+
+table.incoming_sinks = Set(incoming_sinks.into());
+let table = table.update(&txn).await?;

// let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?;
// 1. replace old fragments/actors with new ones.
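Vec::extract_if was a nightly-only API when this commit landed (RisingWave builds on nightly). On stable Rust the same incoming_sinks bookkeeping can be written with retain; a minimal equivalent sketch:

fn update_incoming_sinks(
    mut incoming_sinks: Vec<i32>,
    creating_sink_id: Option<i32>,
    dropping_sink_id: Option<i32>,
) -> Vec<i32> {
    if let Some(sink_id) = creating_sink_id {
        // A sink may be registered on its target table only once.
        debug_assert!(!incoming_sinks.contains(&sink_id));
        incoming_sinks.push(sink_id);
    }
    if let Some(sink_id) = dropping_sink_id {
        let before = incoming_sinks.len();
        incoming_sinks.retain(|id| *id != sink_id);
        // Exactly one entry should have been removed.
        debug_assert_eq!(before - incoming_sinks.len(), 1);
    }
    incoming_sinks
}

fn main() {
    assert_eq!(update_incoming_sinks(vec![1, 2], Some(3), Some(1)), vec![2, 3]);
}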
87 changes: 56 additions & 31 deletions src/meta/src/rpc/ddl_controller.rs
@@ -389,8 +389,13 @@ impl DdlController {
.await
}
MetadataManager::V2(_) => {
-self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade)
-.await
+self.drop_object(
+ObjectType::Database,
+database_id as _,
+DropMode::Cascade,
+None,
+)
+.await
}
}
}
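Every drop_object call site gains a trailing Option argument; it stays None except when a sink that writes into a table is dropped, in which case it carries the plan for rewiring the target table. A stand-in sketch of the widened signature, with simplified types rather than the real meta-node API:

enum ObjectType { Schema, Sink }
enum DropMode { Restrict, Cascade }
// Plan describing how to replace the target table when its feeding sink goes away.
struct ReplaceTableInfo;

async fn drop_object(
    _object_type: ObjectType,
    _object_id: i32,
    _drop_mode: DropMode,
    // Some(..) only for "drop sink into table"; None for every other object.
    _target_replace_info: Option<ReplaceTableInfo>,
) -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    drop_object(ObjectType::Schema, 1, DropMode::Restrict, None)
        .await
        .unwrap();
}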
@@ -409,7 +414,7 @@
match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.catalog_manager.drop_schema(schema_id).await,
MetadataManager::V2(_) => {
self.drop_object(ObjectType::Schema, schema_id as _, DropMode::Restrict)
self.drop_object(ObjectType::Schema, schema_id as _, DropMode::Restrict, None)
.await
}
}
@@ -453,7 +458,7 @@
) -> MetaResult<NotificationVersion> {
let MetadataManager::V1(mgr) = &self.metadata_manager else {
return self
.drop_object(ObjectType::Source, source_id as _, drop_mode)
.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
.await;
};
// 1. Drop source in catalog.
@@ -523,7 +528,7 @@
) -> MetaResult<NotificationVersion> {
let MetadataManager::V1(mgr) = &self.metadata_manager else {
return self
.drop_object(ObjectType::View, view_id as _, drop_mode)
.drop_object(ObjectType::View, view_id as _, drop_mode, None)
.await;
};
let (version, streaming_job_ids) = mgr
@@ -569,6 +574,7 @@ impl DdlController {
ObjectType::Connection,
connection_id as _,
DropMode::Restrict,
None,
)
.await
}
@@ -616,7 +622,7 @@
) -> MetaResult<NotificationVersion> {
let MetadataManager::V1(mgr) = &self.metadata_manager else {
return self
.create_streaming_job_v2(stream_job, fragment_graph)
.create_streaming_job_v2(stream_job, fragment_graph, affected_table_replace_info)
.await;
};
let id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
@@ -843,22 +849,17 @@
// Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream.
// The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function.
// Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
-async fn inject_replace_table_job_for_table_sink(
+pub(crate) async fn inject_replace_table_job_for_table_sink(
&self,
-mgr: &MetadataManagerV1,
+dummy_id: u32,
+mgr: &MetadataManager,
stream_ctx: StreamContext,
sink: Option<&Sink>,
creating_sink_table_fragments: Option<&TableFragments>,
dropping_sink_id: Option<SinkId>,
streaming_job: &StreamingJob,
fragment_graph: StreamFragmentGraph,
) -> MetaResult<(ReplaceTableContext, TableFragments)> {
-let dummy_id = self
-.env
-.id_gen_manager()
-.generate::<{ IdCategory::Table }>()
-.await? as u32;

let (mut replace_table_ctx, mut table_fragments) = self
.build_replace_table(stream_ctx, streaming_job, fragment_graph, None, dummy_id)
.await?;
@@ -901,28 +902,24 @@
}

let [table_catalog]: [_; 1] = mgr
-.catalog_manager
-.get_tables(&[table.id])
-.await
+.get_table_catalog_by_ids(vec![table.id])
+.await?
.try_into()
.expect("Target table should exist in sink into table");

assert_eq!(table_catalog.incoming_sinks, table.incoming_sinks);

-{
-let guard = mgr.fragment_manager.get_fragment_read_guard().await;

for sink_id in &table_catalog.incoming_sinks {
if let Some(dropping_sink_id) = dropping_sink_id
&& *sink_id == dropping_sink_id
{
continue;
};

-let sink_table_fragments = guard
-.table_fragments()
-.get(&risingwave_common::catalog::TableId::new(*sink_id))
-.unwrap();
+let sink_table_fragments = mgr
+.get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
+.await?;

let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
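The loop above walks every sink recorded in the table's incoming_sinks, rebuilding their connections into the new table fragments, and skips only the sink currently being dropped. The filtering logic in isolation, with stand-in ids instead of catalog objects:

fn sinks_to_rewire(incoming_sinks: &[u32], dropping_sink_id: Option<u32>) -> Vec<u32> {
    let mut to_rewire = Vec::new();
    for sink_id in incoming_sinks {
        // The sink being dropped must not be wired into the new table job.
        if Some(*sink_id) == dropping_sink_id {
            continue;
        }
        to_rewire.push(*sink_id);
    }
    to_rewire
}

fn main() {
    assert_eq!(sinks_to_rewire(&[1, 2, 3], Some(2)), vec![1, 3]);
}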

@@ -1133,16 +1130,18 @@
.await
}
MetadataManager::V2(_) => {
-if target_replace_info.is_some() {
-unimplemented!("support replace table for drop in v2");
-}
let (object_id, object_type) = match job_id {
StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
};
-self.drop_object(object_type, object_id, drop_mode).await
+
+let version = self
+.drop_object(object_type, object_id, drop_mode, target_replace_info)
+.await?;
+
+Ok(version)
}
}
}
@@ -1218,9 +1217,17 @@

let result: MetaResult<()> = try {
tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");

+let dummy_id = self
+.env
+.id_gen_manager()
+.generate::<{ IdCategory::Table }>()
+.await? as u32;

let (context, table_fragments) = self
.inject_replace_table_job_for_table_sink(
-mgr,
+dummy_id,
+&self.metadata_manager,
stream_ctx,
None,
None,
@@ -1387,13 +1394,31 @@
let StreamingJob::Sink(s, _) = stream_job else {
bail!("additional replace table event only occurs when sinking into table");
};
-let MetadataManager::V1(mgr) = &self.metadata_manager else {
-unimplemented!("support create sink into table in v2");
-};

+let dummy_id = match &self.metadata_manager {
+MetadataManager::V1(_) => {
+self.env
+.id_gen_manager()
+.generate::<{ IdCategory::Table }>()
+.await? as u32
+}
+MetadataManager::V2(mgr) => {
+let table = streaming_job.table().unwrap();
+mgr.catalog_controller
+.create_job_catalog_for_replace(
+&streaming_job,
+&stream_ctx,
+table.get_version()?,
+&fragment_graph.default_parallelism(),
+)
+.await? as u32
+}
+};

let (context, table_fragments) = self
.inject_replace_table_job_for_table_sink(
-mgr,
+dummy_id,
+&self.metadata_manager,
stream_ctx,
Some(s),
Some(&table_fragments),
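Both code paths need a fresh job id for the temporary replacement table job before fragments can be built: V1 draws one from the global id generator, while the SQL backend allocates it by inserting a catalog row for the replacement job (create_job_catalog_for_replace). A condensed sketch of the branch, with stand-in functions for the two backends:

enum MetadataManager { V1, V2 }

// Stand-in for the V1 etcd-backed id generator.
async fn generate_table_id() -> u32 { 1001 }

// Stand-in for the V2 SQL path, where the id comes from the inserted
// catalog row of the replacement job.
async fn create_job_catalog_for_replace() -> u32 { 2002 }

async fn dummy_replace_job_id(mgr: &MetadataManager) -> u32 {
    match mgr {
        MetadataManager::V1 => generate_table_id().await,
        MetadataManager::V2 => create_job_catalog_for_replace().await,
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(dummy_replace_job_id(&MetadataManager::V1).await, 1001);
    assert_eq!(dummy_replace_job_id(&MetadataManager::V2).await, 2002);
}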
(Diff truncated: the remaining two changed files are not shown.)