Skip to content

Commit

Permalink
redesign
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Dec 9, 2023
1 parent ee430e8 commit 31861bb
Show file tree
Hide file tree
Showing 21 changed files with 139 additions and 55 deletions.
2 changes: 2 additions & 0 deletions src/query/catalog/src/plan/datasource/datasource_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct DataSourcePlan {

// data mask policy for `output_schema` columns
pub data_mask_policy: Option<BTreeMap<FieldIndex, RemoteExpr>>,

pub table_index: usize,
}

impl DataSourcePlan {
Expand Down
7 changes: 5 additions & 2 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use common_base::base::Progress;
use common_base::base::ProgressValues;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::{DataBlock, Expr};
use common_expression::DataBlock;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_io::prelude::FormatSettings;
use common_meta_app::principal::FileFormatParams;
Expand Down Expand Up @@ -228,5 +229,7 @@ pub trait TableContext: Send + Sync {
/// Get the license key from the context; returns an empty string if the license is not found or an error occurred.
fn get_license_key(&self) -> String;

// fn set_runtime_filter(&self, filters: HashMap<String, Expr<String>>)
fn set_runtime_filter(&self, filters: (usize, Vec<Expr<String>>));

fn get_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
}
10 changes: 0 additions & 10 deletions src/query/pipeline/sources/src/sync_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use common_base::base::Progress;
use common_base::base::ProgressValues;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::Expr;
use common_pipeline_core::processors::Event;
use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::Processor;
Expand All @@ -34,14 +32,6 @@ pub trait SyncSource: Send {
const NAME: &'static str;

fn generate(&mut self) -> Result<Option<DataBlock>>;

/// Default implementation: always returns `false`.
///
/// NOTE(review): presumably sources that can consume runtime filters override
/// this to return `true` — confirm against the implementors.
fn can_add_runtime_filter(&self) -> bool {
false
}

/// Default implementation: ignores `_filters` and returns `Ok(())`.
///
/// The map associates a `String` key with an `Expr<String>` per entry.
/// NOTE(review): the meaning of the key is not visible here — confirm with callers.
fn add_runtime_filters(&mut self, _filters: &HashMap<String, Expr<String>>) -> Result<()> {
Ok(())
}
}

// TODO: This can be refactored using proc macros
Expand Down
6 changes: 4 additions & 2 deletions src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,20 @@ impl PipelineBuilder {
}

/// Builds the pipeline for a hash join.
///
/// Creates the join state, expands the build-side pipeline with a clone of that
/// state, and finally builds the probe side with the same state. An error from
/// any step is propagated to the caller.
pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> {
let state = self.build_join_state(join)?;
// Table index reported by the probe side; it is handed to the join state below.
let id = join.probe.get_table_index();
let state = self.build_join_state(join, id)?;
// The build side receives its own handle (`clone`) to the join state.
self.expand_build_side_pipeline(&join.build, join, state.clone())?;
self.build_join_probe(join, state)
}

/// Creates the `HashJoinState` for `join` via `HashJoinState::try_create`.
///
/// The state is built from a clone of the builder's context, the build side's
/// output schema, the build projections, a `HashJoinDesc` created from `join`,
/// the `probe_to_build` mapping, and `id`. Errors from computing the output
/// schema, creating the descriptor, or `try_create` itself are propagated.
fn build_join_state(&mut self, join: &HashJoin) -> Result<Arc<HashJoinState>> {
fn build_join_state(&mut self, join: &HashJoin, id: IndexType) -> Result<Arc<HashJoinState>> {
HashJoinState::try_create(
self.ctx.clone(),
join.build.output_schema()?,
&join.build_projections,
HashJoinDesc::create(join)?,
&join.probe_to_build,
// Passed as the `table_index` argument of `try_create`.
id,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl PipelineBuilder {
base_block_ids: None,
update_stream_columns: table.change_tracking_enabled(),
data_mask_policy: None,
table_index: usize::MAX,
};

self.ctx.set_partitions(plan.parts.clone())?;
Expand Down
3 changes: 0 additions & 3 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::fmt::Formatter;
Expand All @@ -24,7 +23,6 @@ use common_base::runtime::TrackedFuture;
use common_base::runtime::TrySpawn;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::Expr;
use common_pipeline_core::processors::profile::Profile;
use common_pipeline_core::processors::EventCause;
use common_pipeline_core::Pipeline;
Expand All @@ -37,7 +35,6 @@ use petgraph::dot::Dot;
use petgraph::prelude::EdgeIndex;
use petgraph::prelude::NodeIndex;
use petgraph::prelude::StableGraph;
use petgraph::stable_graph::Neighbors;
use petgraph::Direction;

use crate::pipelines::executor::ExecutorTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::cell::SyncUnsafeCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI8;
Expand All @@ -28,7 +27,6 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataSchemaRef;
use common_expression::Expr;
use common_expression::HashMethodFixedKeys;
use common_expression::HashMethodSerializer;
use common_expression::HashMethodSingleString;
Expand All @@ -37,6 +35,7 @@ use common_hashtable::HashtableKeyable;
use common_hashtable::StringHashJoinHashMap;
use common_sql::plans::JoinType;
use common_sql::ColumnSet;
use common_sql::IndexType;
use ethnum::U256;
use parking_lot::RwLock;

Expand Down Expand Up @@ -121,10 +120,8 @@ pub struct HashJoinState {
/// If partition_id is -1, it means all partitions are spilled.
pub(crate) partition_id: AtomicI8,

/// Runtime filters
pub(crate) runtime_filters: RwLock<HashMap<String, Expr<String>>>,
/// If the join node generates runtime filters, the scan node will use them for pruning.
pub(crate) scan_node_id: AtomicUsize,
pub(crate) table_index: IndexType,
}

impl HashJoinState {
Expand All @@ -134,6 +131,7 @@ impl HashJoinState {
build_projections: &ColumnSet,
hash_join_desc: HashJoinDesc,
probe_to_build: &[(usize, (bool, bool))],
table_index: IndexType,
) -> Result<Arc<HashJoinState>> {
if matches!(
hash_join_desc.join_type,
Expand Down Expand Up @@ -162,8 +160,7 @@ impl HashJoinState {
continue_build_watcher,
_continue_build_dummy_receiver,
partition_id: AtomicI8::new(-2),
runtime_filters: Default::default(),
scan_node_id: Default::default(),
table_index,
}))
}

Expand Down Expand Up @@ -264,22 +261,17 @@ impl HashJoinState {
data_blocks.clear();
return Ok(());
}
let mut runtime_filters = self.runtime_filters.write();
for (build_key, probe_key) in self
.hash_join_desc
.build_keys
.iter()
.zip(self.hash_join_desc.probe_keys_rt.iter())
{
if let Some(filter) = inlist_filter(&func_ctx, build_key, probe_key, data_blocks)? {
runtime_filters.insert(filter.0, filter.1);
self.ctx.set_runtime_filter((self.table_index, filter))
}
}
data_blocks.clear();
Ok(())
}

/// Stores `id` into the `scan_node_id` atomic using `Ordering::Release`.
pub(crate) fn set_scan_node_id(&self, id: usize) {
self.scan_node_id.store(id, Ordering::Release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -22,7 +21,6 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_sql::optimizer::ColumnSet;
use common_sql::plans::JoinType;
Expand Down Expand Up @@ -73,8 +71,6 @@ pub struct TransformHashJoinProbe {
// If input data can't find proper partitions to spill,
// directly probe them with hashtable.
need_spill: bool,

runtime_filters: HashMap<String, Expr<String>>,
}

impl TransformHashJoinProbe {
Expand Down Expand Up @@ -114,7 +110,6 @@ impl TransformHashJoinProbe {
spill_state: probe_spill_state,
processor_id: id,
need_spill: true,
runtime_filters: Default::default(),
}))
}

Expand Down Expand Up @@ -405,12 +400,6 @@ impl Processor for TransformHashJoinProbe {
.hash_join_state
.wait_first_round_build_done()
.await?;
self.runtime_filters = self
.join_probe_state
.hash_join_state
.runtime_filters
.read()
.clone();
} else {
self.join_probe_state
.hash_join_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(crate) fn inlist_filter(
build_key: &Expr,
probe_key: &Expr<String>,
build_blocks: &[DataBlock],
) -> Result<Option<(String, Expr<String>)>> {
) -> Result<Option<Vec<Expr<String>>>> {
// Currently, only keys that are plain columns are supported; more forms will be supported later.
// Such as t1.a + 1 = t2.a, or t1.a + t1.b = t2.a (left side is probe side)
if let Expr::ColumnRef {
Expand Down Expand Up @@ -124,7 +124,7 @@ pub(crate) fn inlist_filter(
args,
};
let expr = type_check::check(&contain_func, &BUILTIN_FUNCTIONS)?;
return Ok(Some((id.to_string(), expr)));
return Ok(Some(vec![expr]));
}
Ok(None)
}
3 changes: 3 additions & 0 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ impl PhysicalPlanReplacer for Fragmenter {
.all(|fragment| !matches!(&fragment.exchange, Some(DataExchange::Merge(_)))),
)?;

let table_index = plan.get_table_index();

let mut source_fragment = PlanFragment {
plan,
fragment_type,
Expand All @@ -311,6 +313,7 @@ impl PhysicalPlanReplacer for Fragmenter {
query_id: self.query_id.clone(),

source_fragment_id,
table_index,
}))
}
}
29 changes: 29 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::date_helper::TzFactory;
use common_expression::DataBlock;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_io::prelude::FormatSettings;
use common_meta_app::principal::FileFormatParams;
Expand Down Expand Up @@ -850,6 +851,34 @@ impl TableContext for QueryContext {

queries_profile
}

/// Registers runtime filters for a table in the shared query context.
///
/// `filters.0` is the table index used as the map key and `filters.1` holds the
/// filter expressions. When the index has no entry yet, the expressions are
/// inserted as the initial list; otherwise they are appended to the list that
/// is already stored. Either case is logged at `info` level.
///
/// The write lock on `shared.runtime_filters` is held for the whole call.
fn set_runtime_filter(&self, filters: (IndexType, Vec<Expr<String>>)) {
    let (table_index, exprs) = filters;
    let mut registry = self.shared.runtime_filters.write();
    match registry.entry(table_index) {
        Entry::Occupied(mut slot) => {
            // Filters already exist for this table: append the new ones.
            info!(
                "add {:?} runtime filters for {:?}",
                exprs.len(),
                table_index
            );
            slot.get_mut().extend(exprs);
        }
        Entry::Vacant(slot) => {
            // First filters seen for this table: store them as-is.
            info!(
                "set {:?} runtime filters for {:?}",
                exprs.len(),
                table_index
            );
            slot.insert(exprs);
        }
    }
}

/// Returns a copy of the runtime filters stored for table index `id`.
///
/// The stored filters are cloned while the read lock on
/// `shared.runtime_filters` is held; the map itself is not modified.
fn get_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr<String>> {
    let registry = self.shared.runtime_filters.read();
    // If no runtime filters are found for this table, return an empty vector.
    match registry.get(&id) {
        Some(exprs) => exprs.clone(),
        None => Vec::new(),
    }
}
}

impl TrySpawn for QueryContext {
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ use common_catalog::table_context::MaterializedCtesBlocks;
use common_catalog::table_context::StageAttachment;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::Expr;
use common_meta_app::principal::OnErrorMode;
use common_meta_app::principal::RoleInfo;
use common_meta_app::principal::UserDefinedConnection;
use common_meta_app::principal::UserInfo;
use common_pipeline_core::InputError;
use common_settings::Settings;
use common_sql::IndexType;
use common_storage::CopyStatus;
use common_storage::DataOperator;
use common_storage::MergeStatus;
Expand Down Expand Up @@ -104,6 +106,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) user_agent: Arc<RwLock<String>>,
/// Key is (cte index, used_count), value contains cte's materialized blocks
pub(in crate::sessions) materialized_cte_tables: MaterializedCtesBlocks,
pub(in crate::sessions) runtime_filters: Arc<RwLock<HashMap<IndexType, Vec<Expr<String>>>>>,
}

impl QueryContextShared {
Expand Down Expand Up @@ -145,6 +148,7 @@ impl QueryContextShared {
join_spill_progress: Arc::new(Progress::create()),
agg_spill_progress: Arc::new(Progress::create()),
group_by_spill_progress: Arc::new(Progress::create()),
runtime_filters: Default::default(),
}))
}

Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_io::prelude::FormatSettings;
use common_meta_app::principal::FileFormatParams;
Expand Down Expand Up @@ -104,6 +105,7 @@ use common_meta_types::MetaId;
use common_pipeline_core::processors::profile::Profile;
use common_pipeline_core::InputError;
use common_settings::Settings;
use common_sql::IndexType;
use common_sql::Planner;
use common_storage::CopyStatus;
use common_storage::DataOperator;
Expand Down Expand Up @@ -706,6 +708,13 @@ impl TableContext for CtxDelegation {
fn get_merge_status(&self) -> Arc<RwLock<MergeStatus>> {
todo!()
}

/// Not implemented for this test context: the argument is ignored and any call
/// panics through `todo!()`.
fn set_runtime_filter(&self, _filters: (IndexType, Vec<Expr<String>>)) {
todo!()
}
/// Not implemented for this test context: the argument is ignored and any call
/// panics through `todo!()`.
fn get_runtime_filter_with_id(&self, _id: IndexType) -> Vec<Expr<String>> {
todo!()
}
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
Loading

0 comments on commit 31861bb

Please sign in to comment.