Skip to content

Commit

Permalink
fix stream table limit error
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Dec 19, 2023
1 parent 37e04b9 commit 739adcc
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 76 deletions.
15 changes: 10 additions & 5 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub trait Table: Sync + Send {
self.get_table_info().engine()
}

fn support_internal_column_id(&self, _column_id: ColumnId) -> bool {
false
}

fn schema(&self) -> Arc<TableSchema> {
self.get_table_info().schema()
}
Expand Down Expand Up @@ -125,6 +129,12 @@ pub trait Table: Sync + Send {
})
}

async fn source_table(&self, ctx: Arc<dyn TableContext>) -> Result<Option<Arc<dyn Table>>> {
let _ = ctx;

Ok(None)
}

/// Whether the table engine supports prewhere optimization.
/// only Fuse Engine supports this.
fn support_prewhere(&self) -> bool {
Expand All @@ -140,11 +150,6 @@ pub trait Table: Sync + Send {
false
}

/// Whether the table engine supports virtual column `_row_id`.
fn support_row_id_column(&self) -> bool {
false
}

#[async_backtrace::framed]
async fn alter_table_cluster_keys(
&self,
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/common/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn build_update_stream_meta_seq(
for table in tables.into_iter() {
let stream = StreamTable::try_from_table(table.as_ref())?;
let stream_info = stream.get_table_info();
let source_table = stream.source_table(ctx.clone()).await?;
let source_table = stream.source_table(ctx.clone()).await?.unwrap();
let inner_fuse = FuseTable::try_from_table(source_table.as_ref())?;

let table_version = inner_fuse.get_table_info().ident.seq;
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::DataBlock;
use databend_common_expression::ROW_ID_COLUMN_ID;
use databend_common_expression::ROW_ID_COL_NAME;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::CatalogInfo;
Expand Down Expand Up @@ -118,7 +119,7 @@ impl Interpreter for DeleteInterpreter {
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_row_id_column();
let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID);
if !support_row_id {
return Err(ErrorCode::from_string(
"table doesn't support row_id, so it can't use delete with subquery"
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::FieldIndex;
use databend_common_expression::RemoteExpr;
use databend_common_expression::ROW_ID_COLUMN_ID;
use databend_common_expression::ROW_ID_COL_NAME;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_license::license::Feature::ComputedColumn;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl Interpreter for UpdateInterpreter {
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_row_id_column();
let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID);
if !support_row_id {
return Err(ErrorCode::from_string(
"table doesn't support row_id, so it can't use delete with subquery"
Expand Down
14 changes: 3 additions & 11 deletions src/query/service/src/pipelines/builders/builder_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Mutex;
use std::time::Instant;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::ProcessorPtr;
Expand Down Expand Up @@ -72,16 +71,9 @@ impl PipelineBuilder {

// Fill internal columns if needed.
if let Some(internal_columns) = &scan.internal_column {
if table.support_row_id_column() {
self.main_pipeline.add_transform(|input, output| {
TransformAddInternalColumns::try_create(input, output, internal_columns.clone())
})?;
} else {
return Err(ErrorCode::TableEngineNotSupported(format!(
"Table engine `{}` does not support virtual column _row_id",
table.engine()
)));
}
self.main_pipeline.add_transform(|input, output| {
TransformAddInternalColumns::try_create(input, output, internal_columns.clone())
})?;
}

let schema = scan.source.schema();
Expand Down
7 changes: 6 additions & 1 deletion src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use databend_common_exception::Result;
use databend_common_exception::Span;
use databend_common_expression::type_check::common_super_type;
use databend_common_expression::types::DataType;
use databend_common_expression::ROW_ID_COLUMN_ID;
use databend_common_expression::ROW_ID_COL_NAME;
use databend_common_functions::BUILTIN_FUNCTIONS;
use log::warn;
Expand Down Expand Up @@ -842,7 +843,11 @@ impl Binder {
return Ok(());
}

if !metadata.table(0).table().support_row_id_column() {
if !metadata
.table(0)
.table()
.support_internal_column_id(ROW_ID_COLUMN_ID)
{
return Ok(());
}

Expand Down
7 changes: 6 additions & 1 deletion src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,12 @@ impl Binder {
let table = self.metadata.read().table(table_index).clone();
let table_name = table.name();
let table = table.table();
let statistics_provider = table.column_statistics_provider().await?;
let source = table.source_table(self.ctx.clone()).await?;
let statistics_provider = if let Some(source) = source {
source.column_statistics_provider().await?
} else {
table.column_statistics_provider().await?
};
let table_version = if table.engine() == "STREAM" {
let options = table.options();
let table_version = options
Expand Down
10 changes: 6 additions & 4 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::ColumnId;
use databend_common_expression::RemoteExpr;
use databend_common_expression::ORIGIN_BLOCK_ID_COL_NAME;
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COL_NAME;
use databend_common_expression::ORIGIN_VERSION_COL_NAME;
use databend_common_expression::SNAPSHOT_NAME_COLUMN_ID;
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
use databend_common_meta_app::schema::DatabaseType;
Expand Down Expand Up @@ -436,6 +438,10 @@ impl Table for FuseTable {
Some(self.data_metrics.clone())
}

fn support_internal_column_id(&self, column_id: ColumnId) -> bool {
column_id >= SNAPSHOT_NAME_COLUMN_ID
}

fn support_column_projection(&self) -> bool {
true
}
Expand Down Expand Up @@ -813,10 +819,6 @@ impl Table for FuseTable {
true
}

fn support_row_id_column(&self) -> bool {
true
}

fn result_can_be_cached(&self) -> bool {
true
}
Expand Down
32 changes: 16 additions & 16 deletions src/query/storages/stream/src/stream_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ impl StreamPruner {
#[async_backtrace::framed]
pub async fn pruning(
self: &Arc<Self>,
mut block_metas: Vec<Arc<BlockMeta>>,
block_metas: Vec<Arc<BlockMeta>>,
) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
let mut remain = block_metas.len() % self.max_concurrency;
let batch_size = block_metas.len() / self.max_concurrency;
let mut works = Vec::with_capacity(self.max_concurrency);
let mut block_metas = block_metas.into_iter().enumerate().collect::<Vec<_>>();

while !block_metas.is_empty() {
let gap_size = std::cmp::min(1, remain);
Expand Down Expand Up @@ -194,7 +195,7 @@ impl StreamPruner {
pub async fn block_pruning(
&self,
bloom_pruner: &Arc<dyn BloomPruner + Send + Sync>,
block_metas: Vec<Arc<BlockMeta>>,
block_metas: Vec<(usize, Arc<BlockMeta>)>,
) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
let pruning_stats = self.pruning_ctx.pruning_stats.clone();
let pruning_runtime = &self.pruning_ctx.pruning_runtime;
Expand All @@ -203,15 +204,18 @@ impl StreamPruner {
let range_pruner = self.pruning_ctx.range_pruner.clone();
let page_pruner = self.pruning_ctx.page_pruner.clone();

let mut blocks = block_metas.iter().enumerate();
let mut blocks = block_metas.into_iter();
let pruning_tasks = std::iter::from_fn(|| {
// check limit speculatively
if limit_pruner.exceeded() {
return None;
}

type BlockPruningFutureReturn =
Pin<Box<dyn Future<Output = (usize, bool, Option<Range<usize>>, String)> + Send>>;
type BlockPruningFutureReturn = Pin<
Box<
dyn Future<Output = (usize, bool, Option<Range<usize>>, Arc<BlockMeta>)> + Send,
>,
>;
type BlockPruningFuture =
Box<dyn FnOnce(OwnedSemaphorePermit) -> BlockPruningFutureReturn + Send + 'static>;

Expand Down Expand Up @@ -273,9 +277,9 @@ impl StreamPruner {

let (keep, range) =
page_pruner.should_keep(&block_meta.cluster_stats);
(block_idx, keep, range, block_meta.location.0.clone())
(block_idx, keep, range, block_meta)
} else {
(block_idx, keep, None, block_meta.location.0.clone())
(block_idx, keep, None, block_meta)
}
})
});
Expand All @@ -284,7 +288,7 @@ impl StreamPruner {
let v: BlockPruningFuture = Box::new(move |permit: OwnedSemaphorePermit| {
Box::pin(async move {
let _permit = permit;
(block_idx, false, None, block_meta.location.0.clone())
(block_idx, false, None, block_meta)
})
});
v
Expand All @@ -304,20 +308,16 @@ impl StreamPruner {

let mut result = Vec::with_capacity(joint.len());
for item in joint {
let (block_idx, keep, range, block_location) = item;
let (block_idx, keep, range, block) = item;
if keep {
let block = block_metas[block_idx].clone();

debug_assert_eq!(block_location, block.location.0);

result.push((
BlockMetaIndex {
segment_idx: 0,
block_idx,
range,
page_size: block.page_size() as usize,
block_id: 0,
block_location: block_location.clone(),
block_location: block.location.0.clone(),
segment_location: "".to_string(),
snapshot_location: None,
},
Expand All @@ -336,7 +336,7 @@ impl StreamPruner {

fn block_pruning_sync(
&self,
block_metas: Vec<Arc<BlockMeta>>,
block_metas: Vec<(usize, Arc<BlockMeta>)>,
) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
let pruning_stats = self.pruning_ctx.pruning_stats.clone();
let limit_pruner = self.pruning_ctx.limit_pruner.clone();
Expand All @@ -346,7 +346,7 @@ impl StreamPruner {
let start = Instant::now();

let mut result = Vec::with_capacity(block_metas.len());
for (block_idx, block_meta) in block_metas.into_iter().enumerate() {
for (block_idx, block_meta) in block_metas.into_iter() {
// Perf.
{
metrics_inc_blocks_range_pruning_before(1);
Expand Down
Loading

0 comments on commit 739adcc

Please sign in to comment.