Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): refactor compact source #16527

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 135 additions & 6 deletions src/query/service/src/pipelines/builders/builder_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_base::runtime::Runtime;
use databend_common_catalog::plan::PartInfoType;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_pipeline_sources::EmptySource;
use databend_common_sql::executor::physical_plans::CompactSource;
use databend_common_pipeline_sources::PrefetchAsyncSourcer;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::CompactSource as PhysicalCompactSource;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::StreamContext;
use databend_common_storages_fuse::operations::BlockCompactMutator;
use databend_common_storages_fuse::operations::CompactLazyPartInfo;
use databend_common_storages_fuse::operations::CompactSource;
use databend_common_storages_fuse::operations::CompactTransform;
use databend_common_storages_fuse::operations::TableMutationAggregator;
use databend_common_storages_fuse::operations::TransformSerializeBlock;
use databend_common_storages_fuse::FuseTable;

use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
pub(crate) fn build_compact_source(&mut self, compact_block: &CompactSource) -> Result<()> {
pub(crate) fn build_compact_source(
&mut self,
compact_block: &PhysicalCompactSource,
) -> Result<()> {
let table = self
.ctx
.build_table_by_table_info(&compact_block.table_info, None)?;
Expand All @@ -30,11 +50,120 @@ impl PipelineBuilder {
return self.main_pipeline.add_source(EmptySource::create, 1);
}

table.build_compact_source(
let is_lazy = compact_block.parts.partitions_type() == PartInfoType::LazyLevel;
let thresholds = table.get_block_thresholds();
let cluster_key_id = table.cluster_key_id();
let mut max_threads = self.ctx.get_settings().get_max_threads()? as usize;

if is_lazy {
let query_ctx = self.ctx.clone();

let lazy_parts = compact_block
.parts
.partitions
.iter()
.map(|v| {
v.as_any()
.downcast_ref::<CompactLazyPartInfo>()
.unwrap()
.clone()
})
.collect::<Vec<_>>();

let column_ids = compact_block.column_ids.clone();
self.main_pipeline.set_on_init(move || {
let ctx = query_ctx.clone();
let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
let partitions = BlockCompactMutator::build_compact_tasks(
ctx.clone(),
column_ids.clone(),
cluster_key_id,
thresholds,
lazy_parts,
)
.await?;

Result::<_>::Ok(partitions)
})?;

let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
query_ctx.set_partitions(partitions)?;
Ok(())
});
} else {
max_threads = max_threads.min(compact_block.parts.len()).max(1);
self.ctx.set_partitions(compact_block.parts.clone())?;
}

let block_reader = table.create_block_reader(
self.ctx.clone(),
Projection::Columns(table.all_column_indices()),
false,
table.change_tracking_enabled(),
false,
)?;
let stream_ctx = if table.change_tracking_enabled() {
Some(StreamContext::try_create(
self.ctx.get_function_context()?,
table.schema_with_stream(),
table.get_table_info().ident.seq,
false,
false,
)?)
} else {
None
};
// Add source pipe.
self.main_pipeline.add_source(
|output| {
let source = CompactSource::create(self.ctx.clone(), block_reader.clone(), 1);
PrefetchAsyncSourcer::create(self.ctx.clone(), output, source)
},
max_threads,
)?;
let storage_format = table.get_storage_format();
self.main_pipeline.add_block_meta_transformer(|| {
CompactTransform::create(
self.ctx.clone(),
block_reader.clone(),
storage_format,
stream_ctx.clone(),
)
});

// sort
let cluster_stats_gen = table.cluster_gen_for_append(
self.ctx.clone(),
compact_block.parts.clone(),
compact_block.column_ids.clone(),
&mut self.main_pipeline,
)
thresholds,
None,
)?;
self.main_pipeline.add_transform(|input, output| {
let proc = TransformSerializeBlock::try_create(
self.ctx.clone(),
input,
output,
table,
cluster_stats_gen.clone(),
MutationKind::Compact,
)?;
proc.into_processor()
})?;

if is_lazy {
self.main_pipeline.try_resize(1)?;
self.main_pipeline.add_async_accumulating_transformer(|| {
TableMutationAggregator::create(
table,
self.ctx.clone(),
vec![],
vec![],
vec![],
Default::default(),
MutationKind::Compact,
)
});
}
Ok(())
}
}
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ impl FuseTable {
};
Ok(retention_period)
}

pub fn get_storage_format(&self) -> FuseStorageFormat {
self.storage_format
}
}

#[async_trait::async_trait]
Expand Down
134 changes: 0 additions & 134 deletions src/query/storages/fuse/src/operations/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use databend_common_base::runtime::Runtime;
use databend_common_catalog::plan::PartInfoType;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::CompactionLimits;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_expression::ComputedExpr;
use databend_common_expression::FieldIndex;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::StreamContext;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;

use crate::operations::common::TableMutationAggregator;
use crate::operations::common::TransformSerializeBlock;
use crate::operations::mutation::BlockCompactMutator;
use crate::operations::mutation::CompactLazyPartInfo;
use crate::operations::mutation::CompactSource;
use crate::operations::mutation::SegmentCompactMutator;
use crate::FuseTable;
use crate::Table;
Expand Down Expand Up @@ -119,125 +104,6 @@ impl FuseTable {
)))
}

pub fn build_compact_source(
&self,
ctx: Arc<dyn TableContext>,
parts: Partitions,
column_ids: HashSet<ColumnId>,
pipeline: &mut Pipeline,
) -> Result<()> {
let is_lazy = parts.partitions_type() == PartInfoType::LazyLevel;
let thresholds = self.get_block_thresholds();
let cluster_key_id = self.cluster_key_id();
let mut max_threads = ctx.get_settings().get_max_threads()? as usize;

if is_lazy {
let query_ctx = ctx.clone();

let lazy_parts = parts
.partitions
.into_iter()
.map(|v| {
v.as_any()
.downcast_ref::<CompactLazyPartInfo>()
.unwrap()
.clone()
})
.collect::<Vec<_>>();

pipeline.set_on_init(move || {
let ctx = query_ctx.clone();
let column_ids = column_ids.clone();
let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
let partitions = BlockCompactMutator::build_compact_tasks(
ctx.clone(),
column_ids,
cluster_key_id,
thresholds,
lazy_parts,
)
.await?;

Result::<_>::Ok(partitions)
})?;

let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
query_ctx.set_partitions(partitions)?;
Ok(())
});
} else {
max_threads = max_threads.min(parts.len()).max(1);
ctx.set_partitions(parts)?;
}

let all_column_indices = self.all_column_indices();
let projection = Projection::Columns(all_column_indices);
let block_reader = self.create_block_reader(
ctx.clone(),
projection,
false,
self.change_tracking_enabled(),
false,
)?;
let stream_ctx = if self.change_tracking_enabled() {
Some(StreamContext::try_create(
ctx.get_function_context()?,
self.schema_with_stream(),
self.get_table_info().ident.seq,
false,
false,
)?)
} else {
None
};
// Add source pipe.
pipeline.add_source(
|output| {
CompactSource::try_create(
ctx.clone(),
self.storage_format,
block_reader.clone(),
stream_ctx.clone(),
output,
)
},
max_threads,
)?;

// sort
let cluster_stats_gen =
self.cluster_gen_for_append(ctx.clone(), pipeline, thresholds, None)?;
pipeline.add_transform(
|input: Arc<databend_common_pipeline_core::processors::InputPort>, output| {
let proc = TransformSerializeBlock::try_create(
ctx.clone(),
input,
output,
self,
cluster_stats_gen.clone(),
MutationKind::Compact,
)?;
proc.into_processor()
},
)?;

if is_lazy {
pipeline.try_resize(1)?;
pipeline.add_async_accumulating_transformer(|| {
TableMutationAggregator::create(
self,
ctx.clone(),
vec![],
vec![],
vec![],
Statistics::default(),
MutationKind::Compact,
)
});
}
Ok(())
}

async fn compact_options_with_segment_limit(
&self,
num_segment_limit: Option<usize>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use compact_part::CompactExtraInfo;
pub use compact_part::CompactLazyPartInfo;
pub use compact_part::CompactTaskInfo;
pub use mutation_meta::ClusterStatsGenType;
pub use mutation_meta::CompactSourceMeta;
pub use mutation_meta::SerializeBlock;
pub use mutation_meta::SerializeDataMeta;
pub use mutation_part::DeletedSegmentInfo;
Expand Down
Loading
Loading