Skip to content

Commit

Permalink
Merge branch 'main' into fix-mod
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Dec 16, 2023
2 parents 6035d12 + a732f4a commit 56177eb
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 108 deletions.
5 changes: 5 additions & 0 deletions src/common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl Progress {
.fetch_add(progress_values.bytes, Ordering::Relaxed);
}

pub fn set(&self, progress_values: &ProgressValues) {
self.rows.store(progress_values.rows, Ordering::Relaxed);
self.bytes.store(progress_values.bytes, Ordering::Relaxed);
}

pub fn fetch(&self) -> ProgressValues {
let rows = self.rows.fetch_min(0, Ordering::SeqCst);
let bytes = self.bytes.fetch_min(0, Ordering::SeqCst);
Expand Down
5 changes: 0 additions & 5 deletions src/query/service/src/interpreters/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod compact_hook;
mod grant;
mod metrics;
mod query_log;
mod refresh_hook;
mod stream;
mod table;
mod task;
mod util;

pub use compact_hook::*;
pub use grant::validate_grant_object_exists;
pub use query_log::InterpreterQueryLog;
pub use refresh_hook::hook_refresh;
pub use refresh_hook::RefreshDesc;
pub use stream::build_update_stream_meta_seq;
pub use table::check_referenced_computed_columns;
pub use task::get_client_config;
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub async fn check_deduplicate_label(ctx: Arc<dyn TableContext>) -> Result<bool>
}
}

/// create push down filters
pub fn create_push_down_filters(scalar: &ScalarExpr) -> Result<Filters> {
let filter = cast_expr_to_non_null_boolean(
scalar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ pub struct CompactHookTraceCtx {
pub operation_name: String,
}

// only if target table have cluster keys defined, and auto-reclustering is enabled,
// we will hook the compact action with a on-finished callback.
//
// errors (if any) are ignored
/// Hook compact action with a on-finished callback.
/// only if target table have cluster keys defined, and auto-reclustering is enabled,
/// errors (if any) are ignored.
pub async fn hook_compact(
ctx: Arc<QueryContext>,
pipeline: &mut Pipeline,
Expand All @@ -59,6 +58,9 @@ pub async fn hook_compact(
}
}

/// hook the compact action with a on-finished callback.
/// only if target table have cluster keys defined, and auto-reclustering is enabled,
/// errors (if any) are ignored
async fn do_hook_compact(
ctx: Arc<QueryContext>,
pipeline: &mut Pipeline,
Expand Down Expand Up @@ -96,6 +98,9 @@ async fn do_hook_compact(
Ok(())
}

/// compact the target table
/// only if target table have cluster keys defined, and auto-reclustering is enabled,
/// errors (if any) are ignored
async fn compact_table(
ctx: Arc<QueryContext>,
compact_target: CompactTargetTableDescription,
Expand Down Expand Up @@ -135,10 +140,15 @@ async fn compact_table(

let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;

// keep the original progress value
let progress_value = ctx.get_write_progress_value();
// Clears previously generated segment locations to avoid duplicate data in the refresh phase
ctx.clear_segment_locations()?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;

// reset the progress value
ctx.get_write_progress().set(&progress_value);
}
Ok(())
}
20 changes: 20 additions & 0 deletions src/query/service/src/interpreters/hook/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compact_hook;
mod refresh_hook;

pub use compact_hook::*;
pub use refresh_hook::hook_refresh;
pub use refresh_hook::RefreshDesc;
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct RefreshDesc {
pub table: String,
}

/// Hook refresh action with a on-finished callback.
/// errors (if any) are ignored.
pub async fn hook_refresh(
ctx: Arc<QueryContext>,
pipeline: &mut Pipeline,
Expand All @@ -68,7 +70,7 @@ pub async fn hook_refresh(
pipeline.set_on_finished(move |err| {
if err.is_ok() {
info!("execute pipeline finished successfully, starting run refresh job.");
match GlobalIORuntime::instance().block_on(refresh(
match GlobalIORuntime::instance().block_on(do_hook_refresh(
ctx,
desc,
refresh_agg_index,
Expand All @@ -85,6 +87,86 @@ pub async fn hook_refresh(
Ok(())
}

/// Hook refresh action with a on-finished callback.
async fn do_hook_refresh(
ctx: Arc<QueryContext>,
desc: RefreshDesc,
refresh_agg_index: bool,
refresh_virtual_column: bool,
) -> Result<()> {
let table_id = ctx
.get_table(&desc.catalog, &desc.database, &desc.table)
.await?
.get_id();

let mut plans = Vec::new();
if refresh_agg_index {
let agg_index_plans =
generate_refresh_index_plan(ctx.clone(), &desc.catalog, table_id).await?;
plans.extend_from_slice(&agg_index_plans);
}
if refresh_virtual_column {
let virtual_column_plan = generate_refresh_virtual_column_plan(ctx.clone(), &desc).await?;
plans.push(virtual_column_plan);
}

let mut tasks = Vec::with_capacity(std::cmp::min(
ctx.get_settings().get_max_threads()? as usize,
plans.len(),
));

for plan in plans {
let ctx_cloned = ctx.clone();
tasks.push(async move {
match plan {
Plan::RefreshIndex(agg_index_plan) => {
let refresh_agg_index_interpreter =
RefreshIndexInterpreter::try_create(ctx_cloned.clone(), *agg_index_plan)?;
let mut build_res = refresh_agg_index_interpreter.execute2().await?;
if build_res.main_pipeline.is_empty() {
return Ok(());
}

let settings = ctx_cloned.get_settings();
let query_id = ctx_cloned.get_id();
build_res.set_max_threads(settings.get_max_threads()? as usize);
let settings = ExecutorSettings::try_create(&settings, query_id)?;

if build_res.main_pipeline.is_complete_pipeline()? {
let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
ctx_cloned.set_executor(complete_executor.get_inner())?;
complete_executor.execute()
} else {
Ok(())
}
}
Plan::RefreshVirtualColumn(virtual_column_plan) => {
let refresh_virtual_column_interpreter =
RefreshVirtualColumnInterpreter::try_create(
ctx_cloned.clone(),
*virtual_column_plan,
)?;
let build_res = refresh_virtual_column_interpreter.execute2().await?;
if !build_res.main_pipeline.is_empty() {
return Err(ErrorCode::Internal(
"Logical error, refresh virtual column is an empty pipeline.",
));
}
Ok(())
}
_ => unreachable!(),
}
});
}

let _ = futures::future::try_join_all(tasks).await?;
Ok(())
}

async fn generate_refresh_index_plan(
ctx: Arc<QueryContext>,
catalog: &str,
Expand Down Expand Up @@ -166,82 +248,3 @@ async fn generate_refresh_virtual_column_plan(

Ok(Plan::RefreshVirtualColumn(Box::new(plan)))
}

async fn refresh(
ctx: Arc<QueryContext>,
desc: RefreshDesc,
refresh_agg_index: bool,
refresh_virtual_column: bool,
) -> Result<()> {
let table_id = ctx
.get_table(&desc.catalog, &desc.database, &desc.table)
.await?
.get_id();

let mut plans = Vec::new();
if refresh_agg_index {
let agg_index_plans =
generate_refresh_index_plan(ctx.clone(), &desc.catalog, table_id).await?;
plans.extend_from_slice(&agg_index_plans);
}
if refresh_virtual_column {
let virtual_column_plan = generate_refresh_virtual_column_plan(ctx.clone(), &desc).await?;
plans.push(virtual_column_plan);
}

let mut tasks = Vec::with_capacity(std::cmp::min(
ctx.get_settings().get_max_threads()? as usize,
plans.len(),
));

for plan in plans {
let ctx_cloned = ctx.clone();
tasks.push(async move {
match plan {
Plan::RefreshIndex(agg_index_plan) => {
let refresh_agg_index_interpreter =
RefreshIndexInterpreter::try_create(ctx_cloned.clone(), *agg_index_plan)?;
let mut build_res = refresh_agg_index_interpreter.execute2().await?;
if build_res.main_pipeline.is_empty() {
return Ok(());
}

let settings = ctx_cloned.get_settings();
let query_id = ctx_cloned.get_id();
build_res.set_max_threads(settings.get_max_threads()? as usize);
let settings = ExecutorSettings::try_create(&settings, query_id)?;

if build_res.main_pipeline.is_complete_pipeline()? {
let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
ctx_cloned.set_executor(complete_executor.get_inner())?;
complete_executor.execute()
} else {
Ok(())
}
}
Plan::RefreshVirtualColumn(virtual_column_plan) => {
let refresh_virtual_column_interpreter =
RefreshVirtualColumnInterpreter::try_create(
ctx_cloned.clone(),
*virtual_column_plan,
)?;
let build_res = refresh_virtual_column_interpreter.execute2().await?;
if !build_res.main_pipeline.is_empty() {
return Err(ErrorCode::Internal(
"Logical error, refresh virtual column is an empty pipeline.",
));
}
Ok(())
}
_ => unreachable!(),
}
});
}

let _ = futures::future::try_join_all(tasks).await?;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ use log::info;

use crate::interpreters::common::build_update_stream_meta_seq;
use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::hook_compact;
use crate::interpreters::common::hook_refresh;
use crate::interpreters::common::CompactHookTraceCtx;
use crate::interpreters::common::CompactTargetTableDescription;
use crate::interpreters::common::RefreshDesc;
use crate::interpreters::hook::hook_compact;
use crate::interpreters::hook::hook_refresh;
use crate::interpreters::hook::CompactHookTraceCtx;
use crate::interpreters::hook::CompactTargetTableDescription;
use crate::interpreters::hook::RefreshDesc;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use databend_common_sql::NameResolutionContext;

use crate::interpreters::common::build_update_stream_meta_seq;
use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::hook_refresh;
use crate::interpreters::common::RefreshDesc;
use crate::interpreters::hook::hook_refresh;
use crate::interpreters::hook::RefreshDesc;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
Expand Down
10 changes: 5 additions & 5 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
use itertools::Itertools;

use crate::interpreters::common::build_update_stream_meta_seq;
use crate::interpreters::common::hook_compact;
use crate::interpreters::common::hook_refresh;
use crate::interpreters::common::CompactHookTraceCtx;
use crate::interpreters::common::CompactTargetTableDescription;
use crate::interpreters::common::RefreshDesc;
use crate::interpreters::hook::hook_compact;
use crate::interpreters::hook::hook_refresh;
use crate::interpreters::hook::CompactHookTraceCtx;
use crate::interpreters::hook::CompactTargetTableDescription;
use crate::interpreters::hook::RefreshDesc;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::PipelineBuildResult;
Expand Down
10 changes: 5 additions & 5 deletions src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ use parking_lot::RwLock;

use crate::interpreters::common::build_update_stream_meta_seq;
use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::hook_compact;
use crate::interpreters::common::hook_refresh;
use crate::interpreters::common::CompactHookTraceCtx;
use crate::interpreters::common::CompactTargetTableDescription;
use crate::interpreters::common::RefreshDesc;
use crate::interpreters::hook::hook_compact;
use crate::interpreters::hook::hook_refresh;
use crate::interpreters::hook::CompactHookTraceCtx;
use crate::interpreters::hook::CompactTargetTableDescription;
use crate::interpreters::hook::RefreshDesc;
use crate::interpreters::interpreter_copy_into_table::CopyIntoTableInterpreter;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use log::debug;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::create_push_down_filters;
use crate::interpreters::common::hook_refresh;
use crate::interpreters::common::RefreshDesc;
use crate::interpreters::hook::hook_refresh;
use crate::interpreters::hook::RefreshDesc;
use crate::interpreters::interpreter_delete::replace_subquery;
use crate::interpreters::interpreter_delete::subquery_filter;
use crate::interpreters::Interpreter;
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod access;
mod common;
mod hook;
mod interpreter;
mod interpreter_catalog_create;
mod interpreter_catalog_drop;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
expects .stats.write_progress.rows be 2
expects .error be null
2
null
Loading

0 comments on commit 56177eb

Please sign in to comment.