Skip to content

Commit

Permalink
feat(nimtable): nimtable make drop table more robust (#18422)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 20, 2024
1 parent 0b75269 commit 9852ac0
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 77 deletions.
120 changes: 65 additions & 55 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
INITIAL_TABLE_VERSION_ID, ROWID_PREFIX,
Expand All @@ -30,6 +29,7 @@ use risingwave_common::license::Feature;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
Expand Down Expand Up @@ -1308,56 +1308,58 @@ pub async fn handle_create_table(
.await?;
}
Engine::Iceberg => {
// export AWS_REGION=your_region
// export AWS_ACCESS_KEY_ID=your_access_key
// export AWS_SECRET_ACCESS_KEY=your_secret_key
// export RW_S3_ENDPOINT=your_endpoint
// export META_STORE_URI=your_meta_store_uri
// export META_STORE_USER=your_meta_store_user
// export META_STORE_PASSWORD=your_meta_store_password

let s3_endpoint = if let Ok(endpoint) = std::env::var("RW_S3_ENDPOINT") {
endpoint
let s3_endpoint = if let Ok(s3_endpoint) = std::env::var("AWS_ENDPOINT_URL") {
Some(s3_endpoint)
} else {
"http://127.0.0.1:9301".to_string()
None
};

let s3_region = if let Ok(region) = std::env::var("AWS_REGION") {
region
} else {
"us-east-1".to_string()
let Ok(s3_region) = std::env::var("AWS_REGION") else {
bail!("To create an iceberg engine table, AWS_REGION needed to be set");
};

let s3_ak = if let Ok(ak) = std::env::var("AWS_ACCESS_KEY_ID") {
ak
} else {
"hummockadmin".to_string()
let Ok(s3_bucket) = std::env::var("AWS_BUCKET") else {
bail!("To create an iceberg engine table, AWS_BUCKET needed to be set");
};

let s3_sk = if let Ok(sk) = std::env::var("AWS_SECRET_ACCESS_KEY") {
sk
} else {
"hummockadmin".to_string()
let Ok(s3_ak) = std::env::var("AWS_ACCESS_KEY_ID") else {
bail!("To create an iceberg engine table, AWS_ACCESS_KEY_ID needed to be set");
};

let meta_store_uri = if let Ok(uri) = std::env::var("META_STORE_URI") {
uri
} else {
"jdbc:sqlite:/tmp/sqlite/iceberg.db".to_string()
let Ok(s3_sk) = std::env::var("AWS_SECRET_ACCESS_KEY") else {
bail!("To create an iceberg engine table, AWS_SECRET_ACCESS_KEY needed to be set");
};

let meta_store_user = if let Ok(user) = std::env::var("META_STORE_USER") {
user
} else {
"xxx".to_string()
let Ok(meta_store_uri) = std::env::var("META_STORE_URI") else {
bail!("To create an iceberg engine table, META_STORE_URI needed to be set");
};

let meta_store_password = if let Ok(password) = std::env::var("META_STORE_PASSWORD") {
password
} else {
"xxx".to_string()
let Ok(meta_store_user) = std::env::var("META_STORE_USER") else {
bail!("To create an iceberg engine table, META_STORE_USER needed to be set");
};

let Ok(meta_store_password) = std::env::var("META_STORE_PASSWORD") else {
bail!("To create an iceberg engine table, META_STORE_PASSWORD needed to be set");
};

let rw_db_name = session
.env()
.catalog_reader()
.read_guard()
.get_database_by_id(&table.database_id)?
.name()
.to_string();
let rw_schema_name = session
.env()
.catalog_reader()
.read_guard()
.get_schema_by_id(&table.database_id, &table.schema_id)?
.name()
.to_string();
let iceberg_catalog_name = rw_db_name.to_string();
let iceberg_database_name = rw_schema_name.to_string();
let iceberg_table_name = table_name.0.last().unwrap().real_value();

// Iceberg sinks require a primary key, if none is provided, we will use the _row_id column
// Fetch primary key from columns
let mut pks = column_defs
Expand Down Expand Up @@ -1411,11 +1413,14 @@ pub async fn handle_create_table(
};

let with_properties = WithProperties(vec![]);
let mut sink_name = table_name.clone();
*sink_name.0.last_mut().unwrap() = Ident::from(
(ICEBERG_SINK_PREFIX.to_string() + &sink_name.0.last().unwrap().real_value())
.as_str(),
);
let create_sink_stmt = CreateSinkStatement {
if_not_exists: false,
sink_name: ObjectName::from(vec![Ident::from(
(ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(),
)]),
sink_name,
with_properties,
sink_from,
columns: vec![],
Expand All @@ -1424,18 +1429,17 @@ pub async fn handle_create_table(
into_table_name: None,
};

let catalog_name = "nimtable".to_string();
let database_name = "nimtable_db".to_string();

let mut sink_handler_args = handler_args.clone();
let mut with = BTreeMap::new();
with.insert("connector".to_string(), "iceberg".to_string());

with.insert("primary_key".to_string(), pks.join(","));
with.insert("type".to_string(), "upsert".to_string());
with.insert("catalog.type".to_string(), "jdbc".to_string());
with.insert("warehouse.path".to_string(), "s3://hummock001".to_string());
with.insert("s3.endpoint".to_string(), s3_endpoint.clone());
with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket));
if let Some(s3_endpoint) = s3_endpoint.clone() {
with.insert("s3.endpoint".to_string(), s3_endpoint);
}
with.insert("s3.access.key".to_string(), s3_ak.clone());
with.insert("s3.secret.key".to_string(), s3_sk.clone());
with.insert("s3.region".to_string(), s3_region.clone());
Expand All @@ -1445,19 +1449,23 @@ pub async fn handle_create_table(
"catalog.jdbc.password".to_string(),
meta_store_password.clone(),
);
with.insert("catalog.name".to_string(), catalog_name.clone());
with.insert("database.name".to_string(), database_name.clone());
with.insert("table.name".to_string(), table_name.to_string());
with.insert("catalog.name".to_string(), iceberg_catalog_name.clone());
with.insert("database.name".to_string(), iceberg_database_name.clone());
with.insert("table.name".to_string(), iceberg_table_name.to_string());
with.insert("commit_checkpoint_interval".to_string(), "1".to_string());
with.insert("create_table_if_not_exists".to_string(), "true".to_string());
sink_handler_args.with_options = WithOptions::new_with_options(with);

let mut source_name = table_name.clone();
*source_name.0.last_mut().unwrap() = Ident::from(
(ICEBERG_SOURCE_PREFIX.to_string() + &source_name.0.last().unwrap().real_value())
.as_str(),
);
let create_source_stmt = CreateSourceStatement {
temporary: false,
if_not_exists: false,
columns: vec![],
source_name: ObjectName::from(vec![Ident::from(
(ICEBERG_SOURCE_PREFIX.to_string() + &table_name.real_value()).as_str(),
)]),
source_name,
wildcard_idx: None,
constraints: vec![],
with_properties: WithProperties(vec![]),
Expand All @@ -1470,8 +1478,10 @@ pub async fn handle_create_table(
let mut with = BTreeMap::new();
with.insert("connector".to_string(), "iceberg".to_string());
with.insert("catalog.type".to_string(), "jdbc".to_string());
with.insert("warehouse.path".to_string(), "s3://hummock001".to_string());
with.insert("s3.endpoint".to_string(), s3_endpoint.clone());
with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket));
if let Some(s3_endpoint) = s3_endpoint {
with.insert("s3.endpoint".to_string(), s3_endpoint.clone());
}
with.insert("s3.access.key".to_string(), s3_ak.clone());
with.insert("s3.secret.key".to_string(), s3_sk.clone());
with.insert("s3.region".to_string(), s3_region.clone());
Expand All @@ -1481,9 +1491,9 @@ pub async fn handle_create_table(
"catalog.jdbc.password".to_string(),
meta_store_password.clone(),
);
with.insert("catalog.name".to_string(), catalog_name.clone());
with.insert("database.name".to_string(), database_name.clone());
with.insert("table.name".to_string(), table_name.to_string());
with.insert("catalog.name".to_string(), iceberg_catalog_name.clone());
with.insert("database.name".to_string(), iceberg_database_name.clone());
with.insert("table.name".to_string(), iceberg_table_name.to_string());
source_handler_args.with_options = WithOptions::new_with_options(with);

catalog_writer
Expand Down
73 changes: 51 additions & 22 deletions src/frontend/src/handler/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common::catalog::Engine;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::ConnectorProperties;
use risingwave_sqlparser::ast::{Ident, ObjectName};
use tracing::warn;

use super::RwPgResponse;
use crate::binder::Binder;
Expand Down Expand Up @@ -66,7 +67,7 @@ pub async fn handle_drop_table(

match engine {
Engine::Iceberg => {
let source = session
let iceberg_config = if let Ok(source) = session
.env()
.catalog_reader()
.read_guard()
Expand All @@ -75,7 +76,31 @@ pub async fn handle_drop_table(
schema_path,
&(ICEBERG_SOURCE_PREFIX.to_string() + &table_name),
)
.map(|(source, _)| source.clone())?;
.map(|(source, _)| source.clone())
{
let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
Some(iceberg_properties.to_iceberg_config())
} else {
unreachable!("must be iceberg source");
}
} else if let Ok(sink) = session
.env()
.catalog_reader()
.read_guard()
.get_sink_by_name(
db_name,
schema_path,
&(ICEBERG_SINK_PREFIX.to_string() + &table_name),
)
.map(|(sink, _)| sink.clone())
{
// If iceberg source does not exist, use iceberg sink to load iceberg table
Some(IcebergConfig::from_btreemap(sink.properties.clone())?)
} else {
None
};

// TODO(nimtable): handle drop table failures in the middle.
// Drop sink
// Drop iceberg table
Expand All @@ -92,40 +117,44 @@ pub async fn handle_drop_table(
)
.await?;

let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config();
if let Some(iceberg_config) = iceberg_config {
let iceberg_catalog = iceberg_config
.create_catalog_v2()
.await
.context("Unable to load iceberg catalog")?;
let table_id = iceberg_config
.full_table_name_v2()
.context("Unable to parse table name")?;
let table = iceberg_catalog
if let Ok(table) = iceberg_catalog
.load_table(&table_id)
.await
.context("failed to load iceberg table")?;
table
.file_io()
.remove_all(table.metadata().location())
.await
.context("failed to purge iceberg table")?;
.context("failed to load iceberg table")
{
table
.file_io()
.remove_all(table.metadata().location())
.await
.context("failed to purge iceberg table")?;
} else {
warn!("Table {} with iceberg engine, but failed to load iceberg table. It might be the warehouse path has been cleared but fail before drop iceberg source", table_name);
}
iceberg_catalog
.drop_table(&table_id)
.await
.context("failed to drop iceberg table")?;
}

crate::handler::drop_source::handle_drop_source(
handler_args.clone(),
ObjectName::from(vec![Ident::from(
(ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(),
)]),
true,
false,
)
.await?;
crate::handler::drop_source::handle_drop_source(
handler_args.clone(),
ObjectName::from(vec![Ident::from(
(ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(),
)]),
true,
false,
)
.await?;
} else {
warn!("Table {} with iceberg engine but with no source and sink. It might be created partially. Please check it with iceberg catalog", table_name);
}
}
Engine::Hummock => {}
}
Expand Down

0 comments on commit 9852ac0

Please sign in to comment.