Skip to content

Commit

Permalink
chore(query): refresh hook no longer returns an error (#14194)
Browse files Browse the repository at this point in the history
* fix(query): fix hook refresh virtual columns without license

* fix

* fix

* fix

* fix
  • Loading branch information
b41sh authored Jan 2, 2024
1 parent 6b8fd7f commit 98935c1
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 78 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 20 additions & 20 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,29 @@ async fn do_hook_compact(
if ctx.get_settings().get_enable_compact_after_write()? {
{
pipeline.set_on_finished(move |err| {
if !ctx.get_need_compact_after_write() {
return Ok(());
}

let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);

let compact_start_at = Instant::now();
if err.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, need_lock)
}) {
Ok(_) => {
info!("execute {op_name} finished successfully. table optimization job finished.");
}
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e) }
if !ctx.get_need_compact_after_write() {
return Ok(());
}

let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);

let compact_start_at = Instant::now();
if err.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, need_lock)
}) {
Ok(_) => {
info!("execute {op_name} finished successfully. table optimization job finished.");
}
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e) }
}
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
}
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);

Ok(())
});
Ok(())
});
}
}
Ok(())
Expand Down
68 changes: 40 additions & 28 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::IndexMeta;
use databend_common_meta_app::schema::ListIndexesByIdReq;
use databend_common_meta_app::schema::ListVirtualColumnsReq;
use databend_common_meta_types::MetaId;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::plans::Plan;
Expand All @@ -31,7 +32,6 @@ use databend_common_sql::Binder;
use databend_common_sql::Metadata;
use databend_common_sql::NameResolutionContext;
use databend_storages_common_table_meta::meta::Location;
use log::error;
use log::info;
use parking_lot::RwLock;

Expand All @@ -50,42 +50,37 @@ pub struct RefreshDesc {

/// Hook refresh action with an on-finished callback.
/// Errors (if any) are ignored.
pub async fn hook_refresh(
ctx: Arc<QueryContext>,
pipeline: &mut Pipeline,
desc: RefreshDesc,
) -> Result<()> {
pub async fn hook_refresh(ctx: Arc<QueryContext>, pipeline: &mut Pipeline, desc: RefreshDesc) {
if pipeline.is_empty() {
return Ok(());
return;
}

let refresh_virtual_column = ctx
.get_settings()
.get_enable_refresh_virtual_column_after_write()?;
.get_enable_refresh_virtual_column_after_write()
.unwrap_or(false);

pipeline.set_on_finished(move |may_error| match may_error {
Ok(_) => {
pipeline.set_on_finished(move |err| {
if err.is_ok() {
info!("execute pipeline finished successfully, starting run refresh job.");
GlobalIORuntime::instance().block_on(async move {
let result = do_hook_refresh(ctx, desc, refresh_virtual_column).await;
match result {
Ok(_) => Ok(()),
Err(e) if e.code() == ErrorCode::LICENSE_KEY_INVALID => {
error!("license key invalid: {}", e.message());
Ok(())
}
Err(e) => Err(e),
match GlobalIORuntime::instance().block_on(execute_refresh_job(
ctx,
desc,
refresh_virtual_column,
)) {
Ok(_) => {
info!("execute refresh job successfully.");
}
})
Err(e) => {
info!("execute refresh job failed. {:?}", e);
}
}
}
Err(e) => Err(e.clone()),
Ok(())
});

Ok(())
}

/// Execute the refresh job: build and run the refresh plans (aggregating
/// indexes and, when enabled, virtual columns) for the written table.
async fn do_hook_refresh(
async fn execute_refresh_job(
ctx: Arc<QueryContext>,
desc: RefreshDesc,
refresh_virtual_column: bool,
Expand All @@ -102,7 +97,9 @@ async fn do_hook_refresh(

if refresh_virtual_column {
let virtual_column_plan = generate_refresh_virtual_column_plan(ctx.clone(), &desc).await?;
plans.push(virtual_column_plan);
if let Some(virtual_column_plan) = virtual_column_plan {
plans.push(virtual_column_plan);
}
}

let mut tasks = Vec::with_capacity(std::cmp::min(
Expand Down Expand Up @@ -231,15 +228,30 @@ async fn build_refresh_index_plan(
async fn generate_refresh_virtual_column_plan(
ctx: Arc<QueryContext>,
desc: &RefreshDesc,
) -> Result<Plan> {
) -> Result<Option<Plan>> {
let segment_locs = ctx.get_segment_locations()?;

let table_info = ctx
.get_table(&desc.catalog, &desc.database, &desc.table)
.await?;
let catalog = ctx.get_catalog(&desc.catalog).await?;
let res = catalog
.list_virtual_columns(ListVirtualColumnsReq {
tenant: ctx.get_tenant(),
table_id: Some(table_info.get_id()),
})
.await?;

if res.is_empty() || res[0].virtual_columns.is_empty() {
return Ok(None);
}
let plan = RefreshVirtualColumnPlan {
catalog: desc.catalog.clone(),
database: desc.database.clone(),
table: desc.table.clone(),
virtual_columns: res[0].virtual_columns.clone(),
segment_locs: Some(segment_locs),
};

Ok(Plan::RefreshVirtualColumn(Box::new(plan)))
Ok(Some(Plan::RefreshVirtualColumn(Box::new(plan))))
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl Interpreter for CopyIntoTableInterpreter {
table: self.plan.table_name.clone(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await?;
hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
}

Ok(build_res)
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl Interpreter for InsertInterpreter {
table: self.plan.table.clone(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await?;
hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;

return Ok(build_res);
}
Expand Down Expand Up @@ -333,7 +333,7 @@ impl Interpreter for InsertInterpreter {
table: self.plan.table.clone(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await?;
hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;

Ok(build_res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl Interpreter for MergeIntoInterpreter {
table: self.plan.table.clone(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await?;
hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
}

Ok(build_res)
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Interpreter for ReplaceInterpreter {
table: self.plan.table.clone(),
};

hook_refresh(self.ctx.clone(), &mut pipeline.main_pipeline, refresh_desc).await?;
hook_refresh(self.ctx.clone(), &mut pipeline.main_pipeline, refresh_desc).await;
}

Ok(pipeline)
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl Interpreter for UpdateInterpreter {
table: tbl_name.to_string(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await?;
hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_license::license::Feature::VirtualColumn;
use databend_common_license::license_manager::get_license_manager;
use databend_common_meta_app::schema::ListVirtualColumnsReq;
use databend_common_sql::plans::RefreshVirtualColumnPlan;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_virtual_column::get_virtual_column_handler;
Expand Down Expand Up @@ -47,7 +46,6 @@ impl Interpreter for RefreshVirtualColumnInterpreter {

#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let tenant = self.ctx.get_tenant();
let license_manager = get_license_manager();
license_manager
.manager
Expand All @@ -64,25 +62,11 @@ impl Interpreter for RefreshVirtualColumnInterpreter {
// check mutability
table.check_mutable()?;

let catalog = self.ctx.get_catalog(&catalog_name).await?;

let list_virtual_columns_req = ListVirtualColumnsReq {
tenant,
table_id: Some(table.get_id()),
};

let handler = get_virtual_column_handler();
let res = handler
.do_list_virtual_columns(catalog, list_virtual_columns_req)
.await?;

if res.is_empty() {
return Ok(PipelineBuildResult::create());
}
let virtual_columns = res[0].virtual_columns.clone();
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let virtual_columns = self.plan.virtual_columns.clone();
let segment_locs = self.plan.segment_locs.clone();

let handler = get_virtual_column_handler();
let _ = handler
.do_refresh_virtual_column(fuse_table, self.ctx.clone(), virtual_columns, segment_locs)
.await?;
Expand Down
21 changes: 16 additions & 5 deletions src/query/sql/src/planner/binder/ddl/virtual_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::TableDataType;
use databend_common_expression::TableSchemaRef;
use databend_common_meta_app::schema::ListVirtualColumnsReq;
use log::debug;

use crate::binder::Binder;
Expand Down Expand Up @@ -163,17 +164,27 @@ impl Binder {
self.normalize_object_identifier_triple(catalog, database, table);

let table_info = self.ctx.get_table(&catalog, &database, &table).await?;
if table_info.engine() != "FUSE" {
return Err(ErrorCode::SemanticError(
"Virtual Column only support FUSE engine",
));
}

let catalog_info = self.ctx.get_catalog(&catalog).await?;
let res = catalog_info
.list_virtual_columns(ListVirtualColumnsReq {
tenant: self.ctx.get_tenant(),
table_id: Some(table_info.get_id()),
})
.await?;

let virtual_columns = if res.is_empty() {
vec![]
} else {
res[0].virtual_columns.clone()
};

Ok(Plan::RefreshVirtualColumn(Box::new(
RefreshVirtualColumnPlan {
catalog,
database,
table,
virtual_columns,
segment_locs: None,
},
)))
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/ddl/virtual_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct RefreshVirtualColumnPlan {
pub catalog: String,
pub database: String,
pub table: String,
pub virtual_columns: Vec<String>,
pub segment_locs: Option<Vec<Location>>,
}

Expand Down

0 comments on commit 98935c1

Please sign in to comment.