diff --git a/src/common/base/src/base/progress.rs b/src/common/base/src/base/progress.rs
index 1c70375e25ab..a125a545d1a3 100644
--- a/src/common/base/src/base/progress.rs
+++ b/src/common/base/src/base/progress.rs
@@ -44,6 +44,11 @@ impl Progress {
             .fetch_add(progress_values.bytes, Ordering::Relaxed);
     }
 
+    pub fn set(&self, progress_values: &ProgressValues) {
+        self.rows.store(progress_values.rows, Ordering::Relaxed);
+        self.bytes.store(progress_values.bytes, Ordering::Relaxed);
+    }
+
     pub fn fetch(&self) -> ProgressValues {
         let rows = self.rows.fetch_min(0, Ordering::SeqCst);
         let bytes = self.bytes.fetch_min(0, Ordering::SeqCst);
diff --git a/src/query/service/src/interpreters/common/mod.rs b/src/query/service/src/interpreters/common/mod.rs
index be9bbc998c77..ff5a1a5ba22d 100644
--- a/src/query/service/src/interpreters/common/mod.rs
+++ b/src/query/service/src/interpreters/common/mod.rs
@@ -12,21 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod compact_hook;
 mod grant;
 mod metrics;
 mod query_log;
-mod refresh_hook;
 mod stream;
 mod table;
 mod task;
 mod util;
 
-pub use compact_hook::*;
 pub use grant::validate_grant_object_exists;
 pub use query_log::InterpreterQueryLog;
-pub use refresh_hook::hook_refresh;
-pub use refresh_hook::RefreshDesc;
 pub use stream::build_update_stream_meta_seq;
 pub use table::check_referenced_computed_columns;
 pub use task::get_client_config;
diff --git a/src/query/service/src/interpreters/common/util.rs b/src/query/service/src/interpreters/common/util.rs
index d9043d0da8d7..7cf33e617875 100644
--- a/src/query/service/src/interpreters/common/util.rs
+++ b/src/query/service/src/interpreters/common/util.rs
@@ -48,6 +48,7 @@ pub async fn check_deduplicate_label(
     }
 }
 
+/// Create push-down filters from a filter scalar expression.
 pub fn create_push_down_filters(scalar: &ScalarExpr) -> Result<Filters> {
     let filter = cast_expr_to_non_null_boolean(
         scalar
diff --git a/src/query/service/src/interpreters/common/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs
similarity index 87%
rename from src/query/service/src/interpreters/common/compact_hook.rs
rename to src/query/service/src/interpreters/hook/compact_hook.rs
index 2e84c4b5b06e..2f0bdd2dd41c 100644
--- a/src/query/service/src/interpreters/common/compact_hook.rs
+++ b/src/query/service/src/interpreters/hook/compact_hook.rs
@@ -42,10 +42,9 @@ pub struct CompactHookTraceCtx {
     pub operation_name: String,
 }
 
-// only if target table have cluster keys defined, and auto-reclustering is enabled,
-// we will hook the compact action with a on-finished callback.
-//
-// errors (if any) are ignored
+/// Hook the compact action with an on-finished callback.
+/// The hook only takes effect if the target table has cluster keys defined and auto-reclustering is enabled;
+/// errors (if any) are ignored.
 pub async fn hook_compact(
     ctx: Arc<QueryContext>,
     pipeline: &mut Pipeline,
@@ -59,6 +58,9 @@
     }
 }
 
+/// Register the on-finished callback that compacts the target table.
+/// The hook only takes effect if the target table has cluster keys defined and auto-reclustering is enabled;
+/// errors (if any) are ignored.
 async fn do_hook_compact(
     ctx: Arc<QueryContext>,
     pipeline: &mut Pipeline,
@@ -96,6 +98,9 @@
     Ok(())
 }
 
+/// Compact the target table.
+/// It is only triggered when the target table has cluster keys defined and auto-reclustering is enabled;
+/// errors (if any) are ignored.
 async fn compact_table(
     ctx: Arc<QueryContext>,
     compact_target: CompactTargetTableDescription,
@@ -135,10 +140,15 @@
 
         let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
 
+        // keep the original progress value
+        let progress_value = ctx.get_write_progress_value();
         // Clears previously generated segment locations to avoid duplicate data in the refresh phase
         ctx.clear_segment_locations()?;
         ctx.set_executor(complete_executor.get_inner())?;
         complete_executor.execute()?;
+
+        // reset the progress value
+        ctx.get_write_progress().set(&progress_value);
     }
     Ok(())
 }
diff --git a/src/query/service/src/interpreters/hook/mod.rs b/src/query/service/src/interpreters/hook/mod.rs
new file mode 100644
index 000000000000..44e14c7039e7
--- /dev/null
+++ b/src/query/service/src/interpreters/hook/mod.rs
@@ -0,0 +1,20 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod compact_hook;
+mod refresh_hook;
+
+pub use compact_hook::*;
+pub use refresh_hook::hook_refresh;
+pub use refresh_hook::RefreshDesc;
diff --git a/src/query/service/src/interpreters/common/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs
similarity index 97%
rename from src/query/service/src/interpreters/common/refresh_hook.rs
rename to src/query/service/src/interpreters/hook/refresh_hook.rs
index c9385df6a2c5..b151ed841325 100644
--- a/src/query/service/src/interpreters/common/refresh_hook.rs
+++ b/src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -47,6 +47,8 @@ pub struct RefreshDesc {
     pub table: String,
 }
 
+/// Hook the refresh action with an on-finished callback.
+/// Errors (if any) are ignored.
 pub async fn hook_refresh(
     ctx: Arc<QueryContext>,
     pipeline: &mut Pipeline,
@@ -68,7 +70,7 @@
     pipeline.set_on_finished(move |err| {
         if err.is_ok() {
             info!("execute pipeline finished successfully, starting run refresh job.");
-            match GlobalIORuntime::instance().block_on(refresh(
+            match GlobalIORuntime::instance().block_on(do_hook_refresh(
                 ctx,
                 desc,
                 refresh_agg_index,
@@ -85,6 +87,86 @@
     Ok(())
 }
 
+/// Refresh the aggregating indexes and virtual columns of the target table.
+async fn do_hook_refresh(
+    ctx: Arc<QueryContext>,
+    desc: RefreshDesc,
+    refresh_agg_index: bool,
+    refresh_virtual_column: bool,
+) -> Result<()> {
+    let table_id = ctx
+        .get_table(&desc.catalog, &desc.database, &desc.table)
+        .await?
+        .get_id();
+
+    let mut plans = Vec::new();
+    if refresh_agg_index {
+        let agg_index_plans =
+            generate_refresh_index_plan(ctx.clone(), &desc.catalog, table_id).await?;
+        plans.extend_from_slice(&agg_index_plans);
+    }
+    if refresh_virtual_column {
+        let virtual_column_plan = generate_refresh_virtual_column_plan(ctx.clone(), &desc).await?;
+        plans.push(virtual_column_plan);
+    }
+
+    let mut tasks = Vec::with_capacity(std::cmp::min(
+        ctx.get_settings().get_max_threads()? as usize,
+        plans.len(),
+    ));
+
+    for plan in plans {
+        let ctx_cloned = ctx.clone();
+        tasks.push(async move {
+            match plan {
+                Plan::RefreshIndex(agg_index_plan) => {
+                    let refresh_agg_index_interpreter =
+                        RefreshIndexInterpreter::try_create(ctx_cloned.clone(), *agg_index_plan)?;
+                    let mut build_res = refresh_agg_index_interpreter.execute2().await?;
+                    if build_res.main_pipeline.is_empty() {
+                        return Ok(());
+                    }
+
+                    let settings = ctx_cloned.get_settings();
+                    let query_id = ctx_cloned.get_id();
+                    build_res.set_max_threads(settings.get_max_threads()? as usize);
+                    let settings = ExecutorSettings::try_create(&settings, query_id)?;
+
+                    if build_res.main_pipeline.is_complete_pipeline()? {
+                        let mut pipelines = build_res.sources_pipelines;
+                        pipelines.push(build_res.main_pipeline);
+
+                        let complete_executor =
+                            PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
+                        ctx_cloned.set_executor(complete_executor.get_inner())?;
+                        complete_executor.execute()
+                    } else {
+                        Ok(())
+                    }
+                }
+                Plan::RefreshVirtualColumn(virtual_column_plan) => {
+                    let refresh_virtual_column_interpreter =
+                        RefreshVirtualColumnInterpreter::try_create(
+                            ctx_cloned.clone(),
+                            *virtual_column_plan,
+                        )?;
+                    let build_res = refresh_virtual_column_interpreter.execute2().await?;
+                    if !build_res.main_pipeline.is_empty() {
+                        return Err(ErrorCode::Internal(
+                            "Logical error, refresh virtual column is an empty pipeline.",
+                        ));
+                    }
+                    Ok(())
+                }
+                _ => unreachable!(),
+            }
+        });
+    }
+
+    let _ = futures::future::try_join_all(tasks).await?;
+    Ok(())
+}
+
 async fn generate_refresh_index_plan(
     ctx: Arc<QueryContext>,
     catalog: &str,
@@ -166,82 +248,3 @@ async fn generate_refresh_virtual_column_plan(
 
     Ok(Plan::RefreshVirtualColumn(Box::new(plan)))
 }
-
-async fn refresh(
-    ctx: Arc<QueryContext>,
-    desc: RefreshDesc,
-    refresh_agg_index: bool,
-    refresh_virtual_column: bool,
-) -> Result<()> {
-    let table_id = ctx
-        .get_table(&desc.catalog, &desc.database, &desc.table)
-        .await?
-        .get_id();
-
-    let mut plans = Vec::new();
-    if refresh_agg_index {
-        let agg_index_plans =
-            generate_refresh_index_plan(ctx.clone(), &desc.catalog, table_id).await?;
-        plans.extend_from_slice(&agg_index_plans);
-    }
-    if refresh_virtual_column {
-        let virtual_column_plan = generate_refresh_virtual_column_plan(ctx.clone(), &desc).await?;
-        plans.push(virtual_column_plan);
-    }
-
-    let mut tasks = Vec::with_capacity(std::cmp::min(
-        ctx.get_settings().get_max_threads()? as usize,
-        plans.len(),
-    ));
-
-    for plan in plans {
-        let ctx_cloned = ctx.clone();
-        tasks.push(async move {
-            match plan {
-                Plan::RefreshIndex(agg_index_plan) => {
-                    let refresh_agg_index_interpreter =
-                        RefreshIndexInterpreter::try_create(ctx_cloned.clone(), *agg_index_plan)?;
-                    let mut build_res = refresh_agg_index_interpreter.execute2().await?;
-                    if build_res.main_pipeline.is_empty() {
-                        return Ok(());
-                    }
-
-                    let settings = ctx_cloned.get_settings();
-                    let query_id = ctx_cloned.get_id();
-                    build_res.set_max_threads(settings.get_max_threads()? as usize);
-                    let settings = ExecutorSettings::try_create(&settings, query_id)?;
-
-                    if build_res.main_pipeline.is_complete_pipeline()? {
-                        let mut pipelines = build_res.sources_pipelines;
-                        pipelines.push(build_res.main_pipeline);
-
-                        let complete_executor =
-                            PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
-                        ctx_cloned.set_executor(complete_executor.get_inner())?;
-                        complete_executor.execute()
-                    } else {
-                        Ok(())
-                    }
-                }
-                Plan::RefreshVirtualColumn(virtual_column_plan) => {
-                    let refresh_virtual_column_interpreter =
-                        RefreshVirtualColumnInterpreter::try_create(
-                            ctx_cloned.clone(),
-                            *virtual_column_plan,
-                        )?;
-                    let build_res = refresh_virtual_column_interpreter.execute2().await?;
-                    if !build_res.main_pipeline.is_empty() {
-                        return Err(ErrorCode::Internal(
-                            "Logical error, refresh virtual column is an empty pipeline.",
-                        ));
-                    }
-                    Ok(())
-                }
-                _ => unreachable!(),
-            }
-        });
-    }
-
-    let _ = futures::future::try_join_all(tasks).await?;
-    Ok(())
-}
diff --git a/src/query/service/src/interpreters/interpreter_copy_into_table.rs b/src/query/service/src/interpreters/interpreter_copy_into_table.rs
index ed2fce8c6f8f..2d9acfee615c 100644
--- a/src/query/service/src/interpreters/interpreter_copy_into_table.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_into_table.rs
@@ -42,11 +42,11 @@ use log::info;
 
 use crate::interpreters::common::build_update_stream_meta_seq;
 use crate::interpreters::common::check_deduplicate_label;
-use crate::interpreters::common::hook_compact;
-use crate::interpreters::common::hook_refresh;
-use crate::interpreters::common::CompactHookTraceCtx;
-use crate::interpreters::common::CompactTargetTableDescription;
-use crate::interpreters::common::RefreshDesc;
+use crate::interpreters::hook::hook_compact;
+use crate::interpreters::hook::hook_refresh;
+use crate::interpreters::hook::CompactHookTraceCtx;
+use crate::interpreters::hook::CompactTargetTableDescription;
+use crate::interpreters::hook::RefreshDesc;
 use crate::interpreters::Interpreter;
 use crate::interpreters::SelectInterpreter;
 use crate::pipelines::PipelineBuildResult;
diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs
index 15374dcd4c51..700a121a937a 100644
--- a/src/query/service/src/interpreters/interpreter_insert.rs
+++ b/src/query/service/src/interpreters/interpreter_insert.rs
@@ -32,8 +32,8 @@ use databend_common_sql::NameResolutionContext;
 
 use crate::interpreters::common::build_update_stream_meta_seq;
 use crate::interpreters::common::check_deduplicate_label;
-use crate::interpreters::common::hook_refresh;
-use crate::interpreters::common::RefreshDesc;
+use crate::interpreters::hook::hook_refresh;
+use crate::interpreters::hook::RefreshDesc;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
 use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs
index 3739025c0fa1..aae4a1b114a9 100644
--- a/src/query/service/src/interpreters/interpreter_merge_into.rs
+++ b/src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -56,11 +56,11 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
 use itertools::Itertools;
 
 use crate::interpreters::common::build_update_stream_meta_seq;
-use crate::interpreters::common::hook_compact;
-use crate::interpreters::common::hook_refresh;
-use crate::interpreters::common::CompactHookTraceCtx;
-use crate::interpreters::common::CompactTargetTableDescription;
-use crate::interpreters::common::RefreshDesc;
+use crate::interpreters::hook::hook_compact;
+use crate::interpreters::hook::hook_refresh;
+use crate::interpreters::hook::CompactHookTraceCtx;
+use crate::interpreters::hook::CompactTargetTableDescription;
+use crate::interpreters::hook::RefreshDesc;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
 use crate::pipelines::PipelineBuildResult;
diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs
index 1b37aaa126c2..449ff5fd882d 100644
--- a/src/query/service/src/interpreters/interpreter_replace.rs
+++ b/src/query/service/src/interpreters/interpreter_replace.rs
@@ -49,11 +49,11 @@ use parking_lot::RwLock;
 
 use crate::interpreters::common::build_update_stream_meta_seq;
 use crate::interpreters::common::check_deduplicate_label;
-use crate::interpreters::common::hook_compact;
-use crate::interpreters::common::hook_refresh;
-use crate::interpreters::common::CompactHookTraceCtx;
-use crate::interpreters::common::CompactTargetTableDescription;
-use crate::interpreters::common::RefreshDesc;
+use crate::interpreters::hook::hook_compact;
+use crate::interpreters::hook::hook_refresh;
+use crate::interpreters::hook::CompactHookTraceCtx;
+use crate::interpreters::hook::CompactTargetTableDescription;
+use crate::interpreters::hook::RefreshDesc;
 use crate::interpreters::interpreter_copy_into_table::CopyIntoTableInterpreter;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs
index 346ddecbcd1e..6597baaa725b 100644
--- a/src/query/service/src/interpreters/interpreter_update.rs
+++ b/src/query/service/src/interpreters/interpreter_update.rs
@@ -45,8 +45,8 @@ use log::debug;
 
 use crate::interpreters::common::check_deduplicate_label;
 use crate::interpreters::common::create_push_down_filters;
-use crate::interpreters::common::hook_refresh;
-use crate::interpreters::common::RefreshDesc;
+use crate::interpreters::hook::hook_refresh;
+use crate::interpreters::hook::RefreshDesc;
 use crate::interpreters::interpreter_delete::replace_subquery;
 use crate::interpreters::interpreter_delete::subquery_filter;
 use crate::interpreters::Interpreter;
diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs
index 890267e79a89..bbe152bdbfd8 100644
--- a/src/query/service/src/interpreters/mod.rs
+++ b/src/query/service/src/interpreters/mod.rs
@@ -14,6 +14,7 @@
 
 mod access;
 mod common;
+mod hook;
 mod interpreter;
 mod interpreter_catalog_create;
 mod interpreter_catalog_drop;
diff --git a/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result
new file mode 100644
index 000000000000..2c43c9afdad1
--- /dev/null
+++ b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result
@@ -0,0 +1,4 @@
+expects .stats.write_progress.rows be 2
+expects .error be null
+2
+null
diff --git a/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh
new file mode 100755
index 000000000000..0bd510417b0c
--- /dev/null
+++ b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+
+# set up
+cat <
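Note on the change itself: the progress save/restore added to compact_hook.rs is the heart of this patch. The background compaction triggered by the hook writes into the same write-progress counters as the user's statement, so the hook snapshots the counters before running the compaction pipeline and puts them back afterwards; that is why the new test expects `.stats.write_progress.rows` to stay at 2. Below is a minimal standalone sketch of that pattern, using toy stand-ins for `Progress` and `ProgressValues` rather than the actual `databend_common_base` types:

use std::sync::atomic::{AtomicUsize, Ordering};

// Toy stand-ins for the ProgressValues / Progress types touched in the patch above.
#[derive(Clone, Copy, Debug)]
struct ProgressValues {
    rows: usize,
    bytes: usize,
}

#[derive(Default)]
struct Progress {
    rows: AtomicUsize,
    bytes: AtomicUsize,
}

impl Progress {
    fn incr(&self, v: &ProgressValues) {
        self.rows.fetch_add(v.rows, Ordering::Relaxed);
        self.bytes.fetch_add(v.bytes, Ordering::Relaxed);
    }

    // Mirrors the new `set`: overwrite the counters with a saved snapshot.
    fn set(&self, v: &ProgressValues) {
        self.rows.store(v.rows, Ordering::Relaxed);
        self.bytes.store(v.bytes, Ordering::Relaxed);
    }

    fn get_values(&self) -> ProgressValues {
        ProgressValues {
            rows: self.rows.load(Ordering::Relaxed),
            bytes: self.bytes.load(Ordering::Relaxed),
        }
    }
}

fn main() {
    let write_progress = Progress::default();

    // The user's statement wrote 2 rows.
    write_progress.incr(&ProgressValues { rows: 2, bytes: 64 });

    // Snapshot the counters before the on-finished compaction runs.
    let saved = write_progress.get_values();

    // The compaction pipeline reports its own writes into the same counters ...
    write_progress.incr(&ProgressValues { rows: 2, bytes: 64 });

    // ... so the hook restores the snapshot afterwards.
    write_progress.set(&saved);

    // The statement's stats reflect only the user's insert.
    assert_eq!(write_progress.get_values().rows, 2);
    println!("{:?}", write_progress.get_values());
}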