feat(nimtable): nimtable make drop table more robust #18422

Merged · 2 commits · Sep 7, 2024
Changes from 1 commit
27 changes: 13 additions & 14 deletions src/frontend/src/handler/create_table.rs
@@ -1312,24 +1312,22 @@ pub async fn handle_create_table(
.await?;
}
Engine::Iceberg => {
-// export AWS_REGION=your_region
-// export AWS_ACCESS_KEY_ID=your_access_key
-// export AWS_SECRET_ACCESS_KEY=your_secret_key
-// export RW_S3_ENDPOINT=your_endpoint
-// export META_STORE_URI=your_meta_store_uri
-// export META_STORE_USER=your_meta_store_user
-// export META_STORE_PASSWORD=your_meta_store_password
-
-let s3_endpoint = if let Ok(endpoint) = std::env::var("RW_S3_ENDPOINT") {
+let s3_region = if let Ok(region) = std::env::var("AWS_REGION") {
+    region
+} else {
+    "us-east-1".to_string()
+};
+
Collaborator: Should this be a required field without default value?

Contributor (author): You are right. Currently, the default value is used in local development. We can make all of them required.
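For context, making these settings required would mean returning an error instead of silently falling back to a development default. A minimal sketch of that pattern (the helper and error type are illustrative, not the actual RisingWave error handling):

    use std::env;

    /// Hypothetical helper: read a required environment variable, producing a
    /// descriptive error instead of falling back to a default value.
    fn required_env(name: &str) -> Result<String, String> {
        env::var(name).map_err(|_| format!("missing required environment variable: {name}"))
    }

    // Usage sketch: each Iceberg-related setting becomes a hard requirement.
    // let s3_region = required_env("AWS_REGION")?;
    // let s3_endpoint = required_env("AWS_ENDPOINT_URL")?;
    // let s3_bucket = required_env("AWS_BUCKET")?;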


+let s3_endpoint = if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
Collaborator: ditto
endpoint
} else {
"http://127.0.0.1:9301".to_string()
};

-let s3_region = if let Ok(region) = std::env::var("AWS_REGION") {
-    region
+let s3_bucket = if let Ok(bucket) = std::env::var("AWS_BUCKET") {
Collaborator: ditto
+    bucket
} else {
-    "us-east-1".to_string()
+    "hummock001".to_string()
};

let s3_ak = if let Ok(ak) = std::env::var("AWS_ACCESS_KEY_ID") {
@@ -1438,7 +1436,7 @@ pub async fn handle_create_table(
with.insert("primary_key".to_string(), pks.join(","));
with.insert("type".to_string(), "upsert".to_string());
with.insert("catalog.type".to_string(), "jdbc".to_string());
with.insert("warehouse.path".to_string(), "s3://hummock001".to_string());
with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket));
with.insert("s3.endpoint".to_string(), s3_endpoint.clone());
with.insert("s3.access.key".to_string(), s3_ak.clone());
with.insert("s3.secret.key".to_string(), s3_sk.clone());
@@ -1452,6 +1450,7 @@ pub async fn handle_create_table(
with.insert("catalog.name".to_string(), catalog_name.clone());
with.insert("database.name".to_string(), database_name.clone());
with.insert("table.name".to_string(), table_name.to_string());
with.insert("commit_checkpoint_interval".to_string(), "1".to_string());
with.insert("create_table_if_not_exists".to_string(), "true".to_string());
sink_handler_args.with_options = WithOptions::new_with_options(with);

@@ -1474,7 +1473,7 @@ pub async fn handle_create_table(
let mut with = BTreeMap::new();
with.insert("connector".to_string(), "iceberg".to_string());
with.insert("catalog.type".to_string(), "jdbc".to_string());
with.insert("warehouse.path".to_string(), "s3://hummock001".to_string());
with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket));
with.insert("s3.endpoint".to_string(), s3_endpoint.clone());
with.insert("s3.access.key".to_string(), s3_ak.clone());
with.insert("s3.secret.key".to_string(), s3_sk.clone());
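Taken together, the create path now resolves its S3 settings from the environment and threads them through every generated Iceberg WITH clause, so the warehouse path follows the configured bucket instead of being hard-coded. A condensed, self-contained sketch of that flow (the helper and the exact key set are illustrative, using the defaults visible in the diff):

    use std::collections::BTreeMap;
    use std::env;

    // Illustrative helper: environment variable with a local-development fallback.
    fn env_or(name: &str, default: &str) -> String {
        env::var(name).unwrap_or_else(|_| default.to_string())
    }

    fn iceberg_with_options() -> BTreeMap<String, String> {
        let s3_bucket = env_or("AWS_BUCKET", "hummock001");
        let mut with = BTreeMap::new();
        with.insert("connector".to_string(), "iceberg".to_string());
        // Derived from the bucket rather than hard-coded to s3://hummock001.
        with.insert("warehouse.path".to_string(), format!("s3://{}", s3_bucket));
        with.insert(
            "s3.endpoint".to_string(),
            env_or("AWS_ENDPOINT_URL", "http://127.0.0.1:9301"),
        );
        with
    }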
73 changes: 51 additions & 22 deletions src/frontend/src/handler/drop_table.rs
@@ -18,6 +18,7 @@ use risingwave_common::catalog::Engine;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::ConnectorProperties;
use risingwave_sqlparser::ast::{Ident, ObjectName};
+use tracing::warn;

use super::RwPgResponse;
use crate::binder::Binder;
@@ -66,7 +67,7 @@ pub async fn handle_drop_table(

match engine {
Engine::Iceberg => {
-let source = session
+let iceberg_config = if let Ok(source) = session
.env()
.catalog_reader()
.read_guard()
@@ -75,7 +76,31 @@
schema_path,
&(ICEBERG_SOURCE_PREFIX.to_string() + &table_name),
)
-    .map(|(source, _)| source.clone())?;
+    .map(|(source, _)| source.clone())
+{
+    let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
+    if let ConnectorProperties::Iceberg(iceberg_properties) = config {
+        Some(iceberg_properties.to_iceberg_config())
+    } else {
+        unreachable!("must be iceberg source");
+    }
+} else if let Ok(sink) = session
+    .env()
+    .catalog_reader()
+    .read_guard()
+    .get_sink_by_name(
+        db_name,
+        schema_path,
+        &(ICEBERG_SINK_PREFIX.to_string() + &table_name),
+    )
+    .map(|(sink, _)| sink.clone())
+{
+    // If iceberg source does not exist, use iceberg sink to load iceberg table
+    Some(IcebergConfig::from_btreemap(sink.properties.clone())?)
+} else {
+    None
+};
+
// TODO(nimtable): handle drop table failures in the middle.
// Drop sink
// Drop iceberg table
@@ -92,40 +117,44 @@
)
.await?;

-let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
-if let ConnectorProperties::Iceberg(iceberg_properties) = config {
-    let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config();
+if let Some(iceberg_config) = iceberg_config {
let iceberg_catalog = iceberg_config
.create_catalog_v2()
.await
.context("Unable to load iceberg catalog")?;
let table_id = iceberg_config
.full_table_name_v2()
.context("Unable to parse table name")?;
-let table = iceberg_catalog
+if let Ok(table) = iceberg_catalog
.load_table(&table_id)
.await
.context("failed to load iceberg table")?;
table
.file_io()
.remove_all(table.metadata().location())
.await
.context("failed to purge iceberg table")?;
.context("failed to load iceberg table")
{
table
.file_io()
.remove_all(table.metadata().location())
.await
.context("failed to purge iceberg table")?;
} else {
warn!("Table {} with iceberg engine, but failed to load iceberg table. It might be the warehouse path has been cleared but fail before drop iceberg source", table_name);
}
iceberg_catalog
.drop_table(&table_id)
.await
.context("failed to drop iceberg table")?;
}

-crate::handler::drop_source::handle_drop_source(
-    handler_args.clone(),
-    ObjectName::from(vec![Ident::from(
-        (ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(),
-    )]),
-    true,
-    false,
-)
-.await?;
+    crate::handler::drop_source::handle_drop_source(
+        handler_args.clone(),
+        ObjectName::from(vec![Ident::from(
+            (ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(),
+        )]),
+        true,
+        false,
+    )
+    .await?;
+} else {
+    warn!("Table {} with iceberg engine but with no source and sink. It might be created partially. Please check it with iceberg catalog", table_name);
+}
}
Engine::Hummock => {}
}
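In short, the drop path now degrades gracefully: the Iceberg config is recovered from the companion source when it exists, from the companion sink otherwise, and when neither survives the handler warns instead of failing the whole DROP. A condensed sketch of that control flow (types and bodies are simplified placeholders, not the real handler):

    // Placeholder for the real IcebergConfig type.
    struct IcebergConfig;

    // Prefer the config recovered from the iceberg source; fall back to the sink.
    fn resolve_config(
        from_source: Option<IcebergConfig>,
        from_sink: Option<IcebergConfig>,
    ) -> Option<IcebergConfig> {
        from_source.or(from_sink)
    }

    fn drop_iceberg_parts(config: Option<IcebergConfig>, table_name: &str) {
        match config {
            Some(_config) => {
                // Purge data files only if the table still loads (warn otherwise),
                // then drop the iceberg table and the companion source.
            }
            None => {
                // Partially created table: warn and let the catalog drop proceed.
                eprintln!("table {table_name} has no iceberg source or sink; it may have been created partially");
            }
        }
    }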