Skip to content

Commit

Permalink
chore(query): add snapshot logs in read partitions (#16918)
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Nov 23, 2024
1 parent 62e5f24 commit d09ffa5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_catalog::plan::split_row_id;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
Expand Down Expand Up @@ -142,6 +143,19 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
})
.collect::<Vec<_>>();

// check if row index is in valid bounds cause we don't ensure rowid is valid
for (block_idx, row_idx, _) in indices.iter() {
if *block_idx as usize >= blocks.len()
|| *row_idx as usize >= blocks[*block_idx as usize].num_rows()
{
return Err(ErrorCode::Internal(format!(
"RowID is invalid, block idx {block_idx}, row idx {row_idx}, blocks len {}, block idx len {:?}",
blocks.len(),
blocks.get(*block_idx as usize).map(|b| b.num_rows()),
)));
}
}

Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
}

Expand Down
9 changes: 7 additions & 2 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use databend_storages_common_pruner::BlockMetaIndex;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::table::ChangeType;
use log::debug;
use log::info;
use sha2::Digest;
use sha2::Sha256;
Expand All @@ -62,7 +61,6 @@ impl FuseTable {
dry_run: bool,
) -> Result<(PartStatistics, Partitions)> {
let distributed_pruning = ctx.get_settings().get_enable_distributed_pruning()?;
debug!("fuse table do read partitions, push downs:{:?}", push_downs);
if let Some(changes_desc) = &self.changes_desc {
// For "ANALYZE TABLE" statement, we need set the default change type to "Insert".
let change_type = push_downs.as_ref().map_or(ChangeType::Insert, |v| {
Expand All @@ -74,6 +72,13 @@ impl FuseTable {
}

let snapshot = self.read_table_snapshot().await?;

info!(
"fuse table {} do read partitions, push downs:{:?}, snapshot id: {:?}",
self.name(),
push_downs,
snapshot.as_ref().map(|sn| sn.snapshot_id)
);
match snapshot {
Some(snapshot) => {
let snapshot_loc = self
Expand Down

0 comments on commit d09ffa5

Please sign in to comment.