Skip to content

Commit

Permalink
add stream statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Dec 20, 2023
1 parent 739adcc commit dcfd256
Show file tree
Hide file tree
Showing 20 changed files with 223 additions and 138 deletions.
23 changes: 14 additions & 9 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ pub trait Table: Sync + Send {
self.get_table_info().engine()
}

fn support_internal_column_id(&self, _column_id: ColumnId) -> bool {
/// Whether the table engine supports the given internal column.
fn supported_internal_column(&self, _column_id: ColumnId) -> bool {
false
}

Expand Down Expand Up @@ -129,12 +130,6 @@ pub trait Table: Sync + Send {
})
}

async fn source_table(&self, ctx: Arc<dyn TableContext>) -> Result<Option<Arc<dyn Table>>> {
let _ = ctx;

Ok(None)
}

/// Whether the table engine supports prewhere optimization.
/// only Fuse Engine supports this.
fn support_prewhere(&self) -> bool {
Expand Down Expand Up @@ -274,12 +269,22 @@ pub trait Table: Sync + Send {
Ok(())
}

async fn table_statistics(&self) -> Result<Option<TableStatistics>> {
async fn table_statistics(
&self,
ctx: Arc<dyn TableContext>,
) -> Result<Option<TableStatistics>> {
let _ = ctx;

Ok(None)
}

#[async_backtrace::framed]
async fn column_statistics_provider(&self) -> Result<Box<dyn ColumnStatisticsProvider>> {
async fn column_statistics_provider(
&self,
ctx: Arc<dyn TableContext>,
) -> Result<Box<dyn ColumnStatisticsProvider>> {
let _ = ctx;

Ok(Box::new(DummyColumnStatisticsProvider))
}

Expand Down
5 changes: 0 additions & 5 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ pub fn is_internal_column_id(column_id: ColumnId) -> bool {
column_id >= CHANGE_ROW_ID_COLUMN_ID
}

#[inline]
pub fn is_internal_stream_column_id(column_id: ColumnId) -> bool {
(CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID).contains(&column_id)
}

#[inline]
pub fn is_internal_column(column_name: &str) -> bool {
matches!(
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/common/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn build_update_stream_meta_seq(
for table in tables.into_iter() {
let stream = StreamTable::try_from_table(table.as_ref())?;
let stream_info = stream.get_table_info();
let source_table = stream.source_table(ctx.clone()).await?.unwrap();
let source_table = stream.source_table(ctx.clone()).await?;
let inner_fuse = FuseTable::try_from_table(source_table.as_ref())?;

let table_version = inner_fuse.get_table_info().ident.seq;
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Interpreter for DeleteInterpreter {
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID);
let support_row_id = tbl.supported_internal_column(ROW_ID_COLUMN_ID);
if !support_row_id {
return Err(ErrorCode::from_string(
"table doesn't support row_id, so it can't use delete with subquery"
Expand Down
6 changes: 5 additions & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,11 @@ impl ReplaceInterpreter {
.get_replace_into_bloom_pruning_max_column_number()?;
let bloom_filter_column_indexes = if !table.cluster_keys(self.ctx.clone()).is_empty() {
fuse_table
.choose_bloom_filter_columns(&on_conflicts, max_num_pruning_columns)
.choose_bloom_filter_columns(
self.ctx.clone(),
&on_conflicts,
max_num_pruning_columns,
)
.await?
} else {
vec![]
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Interpreter for UpdateInterpreter {
tbl.check_mutable()?;

let selection = if !self.plan.subquery_desc.is_empty() {
let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID);
let support_row_id = tbl.supported_internal_column(ROW_ID_COLUMN_ID);
if !support_row_id {
return Err(ErrorCode::from_string(
"table doesn't support row_id, so it can't use delete with subquery"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ impl Table for NumbersTable {
Ok(())
}

async fn table_statistics(&self) -> Result<Option<TableStatistics>> {
async fn table_statistics(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Option<TableStatistics>> {
Ok(Some(TableStatistics {
num_rows: Some(self.total),
data_size: Some(self.total * 8),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> {
let ctx = fixture.new_query_ctx().await?;
let latest = table.refresh(ctx.as_ref()).await?;
let latest_fuse_table = FuseTable::try_from_table(latest.as_ref())?;
let table_statistics = latest_fuse_table.table_statistics().await?.unwrap();
let table_statistics = latest_fuse_table
.table_statistics(ctx.clone())
.await?
.unwrap();

assert_eq!(table_statistics.num_rows.unwrap() as usize, num_inserts * 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> {
assert_eq!(num_inserts, query_count(stream).await? as usize);

let expected = HashMap::from([(0, num_inserts as u64)]);
check_column_ndv_statistics(table.clone(), expected.clone()).await?;
check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;

// append the same values again, and ndv does changed.
append_rows(ctx.clone(), num_inserts).await?;
Expand All @@ -73,7 +73,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> {
let stream = fixture.execute_query(count_qry).await?;
assert_eq!(num_inserts * 2, query_count(stream).await? as usize);

check_column_ndv_statistics(table.clone(), expected.clone()).await?;
check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;

// delete
ctx.evict_table_from_cache("default", "default", "t")?;
Expand All @@ -87,7 +87,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> {
fixture.execute_command(statistics_sql).await?;

// check count: delete not affect counts
check_column_ndv_statistics(table.clone(), expected).await?;
check_column_ndv_statistics(ctx, table.clone(), expected).await?;

Ok(())
}
Expand Down Expand Up @@ -165,10 +165,11 @@ async fn test_table_update_analyze_statistics() -> Result<()> {
}

async fn check_column_ndv_statistics(
ctx: Arc<dyn TableContext>,
table: Arc<dyn Table>,
expected: HashMap<u32, u64>,
) -> Result<()> {
let provider = table.column_statistics_provider().await?;
let provider = table.column_statistics_provider(ctx).await?;

for (i, num) in expected.iter() {
let stat = provider.column_statistics(*i);
Expand Down
3 changes: 1 addition & 2 deletions src/query/sql/src/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_common_catalog::plan::InternalColumn;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::Span;
use databend_common_expression::is_internal_stream_column_id;
use databend_common_expression::ColumnId;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
Expand Down Expand Up @@ -556,7 +555,7 @@ impl BindContext {

let metadata = metadata.read();
let table = metadata.table(table_index);
if table.table().engine() != "STREAM" && is_internal_stream_column_id(column_id) {
if !table.table().supported_internal_column(column_id) {
return Err(ErrorCode::SemanticError(format!(
"Internal column `{}` is not allowed in table `{}`",
column_binding.internal_column.column_name(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ impl Binder {
if !metadata
.table(0)
.table()
.support_internal_column_id(ROW_ID_COLUMN_ID)
.supported_internal_column(ROW_ID_COLUMN_ID)
{
return Ok(());
}
Expand Down
9 changes: 2 additions & 7 deletions src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,12 +1203,7 @@ impl Binder {
let table = self.metadata.read().table(table_index).clone();
let table_name = table.name();
let table = table.table();
let source = table.source_table(self.ctx.clone()).await?;
let statistics_provider = if let Some(source) = source {
source.column_statistics_provider().await?
} else {
table.column_statistics_provider().await?
};
let statistics_provider = table.column_statistics_provider(self.ctx.clone()).await?;
let table_version = if table.engine() == "STREAM" {
let options = table.options();
let table_version = options
Expand Down Expand Up @@ -1345,7 +1340,7 @@ impl Binder {
predicates.push(predicate);
}

let stat = table.table_statistics().await?;
let stat = table.table_statistics(self.ctx.clone()).await?;
let scan = SExpr::create_leaf(Arc::new(
Scan {
table_index,
Expand Down
12 changes: 9 additions & 3 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ impl Table for FuseTable {
Some(self.data_metrics.clone())
}

fn support_internal_column_id(&self, column_id: ColumnId) -> bool {
fn supported_internal_column(&self, column_id: ColumnId) -> bool {
column_id >= SNAPSHOT_NAME_COLUMN_ID
}

Expand Down Expand Up @@ -688,7 +688,10 @@ impl Table for FuseTable {
self.do_analyze(&ctx).await
}

async fn table_statistics(&self) -> Result<Option<TableStatistics>> {
async fn table_statistics(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Option<TableStatistics>> {
let stats = match self.table_type {
FuseTableType::AttachedReadOnly => {
let snapshot = self.read_table_snapshot().await?.ok_or_else(|| {
Expand Down Expand Up @@ -723,7 +726,10 @@ impl Table for FuseTable {
}

#[async_backtrace::framed]
async fn column_statistics_provider(&self) -> Result<Box<dyn ColumnStatisticsProvider>> {
async fn column_statistics_provider(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Box<dyn ColumnStatisticsProvider>> {
let provider = if let Some(snapshot) = self.read_table_snapshot().await? {
let stats = &snapshot.summary.col_stats;
let table_statistics = self.read_table_snapshot_statistics(Some(&snapshot)).await?;
Expand Down
3 changes: 2 additions & 1 deletion src/query/storages/fuse/src/operations/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ impl FuseTable {
// are supported by bloom index.
pub async fn choose_bloom_filter_columns(
&self,
ctx: Arc<dyn TableContext>,
on_conflicts: &[OnConflictField],
max_num_columns: u64,
) -> Result<Vec<FieldIndex>> {
let col_stats_provider = self.column_statistics_provider().await?;
let col_stats_provider = self.column_statistics_provider(ctx).await?;
let mut cols = on_conflicts
.iter()
.enumerate()
Expand Down
5 changes: 4 additions & 1 deletion src/query/storages/hive/hive/src/hive_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,10 @@ impl Table for HiveTable {
Ok(None)
}

async fn table_statistics(&self) -> Result<Option<TableStatistics>> {
async fn table_statistics(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Option<TableStatistics>> {
Ok(None)
}

Expand Down
10 changes: 8 additions & 2 deletions src/query/storages/parquet/src/parquet2/parquet_table/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ impl Table for Parquet2Table {
true
}

async fn table_statistics(&self) -> Result<Option<TableStatistics>> {
async fn table_statistics(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Option<TableStatistics>> {
let s = &self.table_info.meta.statistics;
Ok(Some(TableStatistics {
num_rows: Some(s.number_of_rows),
Expand All @@ -128,7 +131,10 @@ impl Table for Parquet2Table {
}

#[async_backtrace::framed]
async fn column_statistics_provider(&self) -> Result<Box<dyn ColumnStatisticsProvider>> {
async fn column_statistics_provider(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Box<dyn ColumnStatisticsProvider>> {
Ok(Box::new(self.column_statistics_provider.clone()))
}

Expand Down
10 changes: 8 additions & 2 deletions src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ impl Table for ParquetRSTable {
true
}

async fn column_statistics_provider(&self) -> Result<Box<dyn ColumnStatisticsProvider>> {
async fn column_statistics_provider(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Box<dyn ColumnStatisticsProvider>> {
if !self.need_stats_provider {
return Ok(Box::new(DummyColumnStatisticsProvider));
}
Expand Down Expand Up @@ -302,7 +305,10 @@ impl Table for ParquetRSTable {
Ok(Box::new(provider))
}

async fn table_statistics(&self) -> Result<Option<TableStatistics>> {
async fn table_statistics(
&self,
_ctx: Arc<dyn TableContext>,
) -> Result<Option<TableStatistics>> {
// Unwrap safety: no other thread will hold this lock.
let parquet_metas = self.parquet_metas.try_lock().unwrap();
if parquet_metas.is_empty() {
Expand Down
Loading

0 comments on commit dcfd256

Please sign in to comment.