From a402d6cfbcf1b3e7ac8fc534eee9fdd2aef9b84f Mon Sep 17 00:00:00 2001 From: Sky Fan <3374614481@qq.com> Date: Wed, 25 Sep 2024 19:18:47 +0800 Subject: [PATCH] refactor: unify path of write segment (#16517) * refactor: unify path of write segment * fix missing header --- .../storages/fuse/operations/vacuum_table.rs | 4 +- src/query/service/src/test_kits/fuse.rs | 9 +-- .../operations/mutation/recluster_mutator.rs | 9 ++- .../mutation/segments_compact_mutator.rs | 9 ++- src/query/storages/common/cache/src/caches.rs | 7 -- .../common/table_meta/src/meta/v4/segment.rs | 4 -- src/query/storages/fuse/src/io/mod.rs | 1 - src/query/storages/fuse/src/io/segments.rs | 18 ------ .../storages/fuse/src/io/write/meta_writer.rs | 18 +++--- src/query/storages/fuse/src/io/write/mod.rs | 2 - .../fuse/src/io/write/segment_writer.rs | 64 ------------------- .../transform_mutation_aggregator.rs | 12 ++-- src/query/storages/fuse/src/operations/gc.rs | 15 +++-- .../mutator/segment_compact_mutator.rs | 27 +++++--- 14 files changed, 55 insertions(+), 144 deletions(-) delete mode 100644 src/query/storages/fuse/src/io/write/segment_writer.rs diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table.rs index 9c5485104888..f8f8905daaf9 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table.rs @@ -29,7 +29,7 @@ use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::FuseTable; use databend_storages_common_cache::LoadParams; -use databend_storages_common_table_meta::meta::CompactSegmentInfo; +use databend_storages_common_table_meta::meta::SegmentInfo; use crate::storages::fuse::get_snapshot_referenced_segments; @@ -200,7 +200,7 @@ pub async fn do_gc_orphan_files( // 2.2 Delete all the orphan segment files to be purged let purged_file_num = segment_locations_to_be_purged.len(); fuse_table - .try_purge_location_files_and_cache::( + .try_purge_location_files_and_cache::( ctx.clone(), HashSet::from_iter(segment_locations_to_be_purged.into_iter()), ) diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs index 5f1ae981fe82..669a45328cf3 100644 --- a/src/query/service/src/test_kits/fuse.rs +++ b/src/query/service/src/test_kits/fuse.rs @@ -29,7 +29,6 @@ use databend_common_expression::SendableDataBlockStream; use databend_common_sql::optimizer::SExpr; use databend_common_storages_factory::Table; use databend_common_storages_fuse::io::MetaWriter; -use databend_common_storages_fuse::io::SegmentWriter; use databend_common_storages_fuse::statistics::gen_columns_statistics; use databend_common_storages_fuse::statistics::merge_statistics; use databend_common_storages_fuse::statistics::reducers::reduce_block_metas; @@ -132,9 +131,11 @@ pub async fn generate_segments( let block_metas = generate_blocks(fuse_table, blocks_per_segment).await?; let summary = reduce_block_metas(&block_metas, BlockThresholds::default(), None); let segment_info = SegmentInfo::new(block_metas, summary); - let segment_writer = SegmentWriter::new(dal, fuse_table.meta_location_generator()); - let segment_location = segment_writer.write_segment_no_cache(&segment_info).await?; - segs.push((segment_location, segment_info)) + let location = fuse_table + .meta_location_generator() + .gen_segment_info_location(); + segment_info.write_meta(dal, &location).await?; + segs.push(((location, SegmentInfo::VERSION), segment_info)) } Ok(segs) } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 7bb1773e2ccf..0044c201dd87 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -28,7 +28,7 @@ use databend_common_expression::DataBlock; use databend_common_expression::Scalar; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; -use databend_common_storages_fuse::io::SegmentWriter; +use databend_common_storages_fuse::io::MetaWriter; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::operations::ReclusterMode; use databend_common_storages_fuse::operations::ReclusterMutator; @@ -60,7 +60,6 @@ async fn test_recluster_mutator_block_select() -> Result<()> { let location_generator = TableMetaLocationGenerator::with_prefix("_prefix".to_owned()); let data_accessor = ctx.get_application_level_data_operator()?.operator(); - let seg_writer = SegmentWriter::new(&data_accessor, &location_generator); let cluster_key_id = 0; let gen_test_seg = |cluster_stats: Option| async { @@ -88,7 +87,11 @@ async fn test_recluster_mutator_block_select() -> Result<()> { ); let segment = SegmentInfo::new(vec![test_block_meta], statistics); - Ok::<_, ErrorCode>((seg_writer.write_segment(segment).await?, location)) + let segment_location = location_generator.gen_segment_info_location(); + segment + .write_meta(&data_accessor, &segment_location) + .await?; + Ok::<_, ErrorCode>(((segment_location, SegmentInfo::VERSION), location)) }; let mut test_segment_locations = vec![]; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index a9c6c307b118..efeba9caf134 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -36,7 +36,6 @@ use databend_common_storages_fuse::io::serialize_block; use databend_common_storages_fuse::io::CompactSegmentInfoReader; use databend_common_storages_fuse::io::MetaReaders; use databend_common_storages_fuse::io::MetaWriter; -use databend_common_storages_fuse::io::SegmentWriter; use databend_common_storages_fuse::io::SegmentsIO; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::io::WriteSettings; @@ -664,13 +663,13 @@ impl CompactSegmentTestFixture { let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema); let max_theads = self.ctx.get_settings().get_max_threads()? as usize; - let segment_writer = SegmentWriter::new(data_accessor, location_gen); let seg_acc = SegmentCompactor::new( block_per_seg, cluster_key_id, max_theads, &fuse_segment_io, - segment_writer.clone(), + data_accessor, + location_gen, ); let rows_per_block = vec![1; num_block_of_segments.len()]; @@ -974,7 +973,6 @@ async fn test_compact_segment_with_cluster() -> Result<()> { settings.set_max_threads(2)?; settings.set_max_storage_io_requests(4)?; - let segment_writer = SegmentWriter::new(&data_accessor, &location_gen); let compact_segment_reader = MetaReaders::segment_info_reader(data_accessor.clone(), schema.clone()); let fuse_segment_io = SegmentsIO::create(ctx.clone(), data_accessor.clone(), schema); @@ -1027,7 +1025,8 @@ async fn test_compact_segment_with_cluster() -> Result<()> { Some(cluster_key_id), chunk_size, &fuse_segment_io, - segment_writer.clone(), + &data_accessor, + &location_gen, ); let state = seg_acc .compact(locations, limit, |status| { diff --git a/src/query/storages/common/cache/src/caches.rs b/src/query/storages/common/cache/src/caches.rs index 3a4f900f9e8a..8d36618e42d2 100644 --- a/src/query/storages/common/cache/src/caches.rs +++ b/src/query/storages/common/cache/src/caches.rs @@ -73,13 +73,6 @@ pub trait CachedObject { fn cache() -> Option; } -impl CachedObject for CompactSegmentInfo { - type Cache = CompactSegmentInfoCache; - fn cache() -> Option { - CacheManager::instance().get_table_segment_cache() - } -} - impl CachedObject for SegmentInfo { type Cache = CompactSegmentInfoCache; fn cache() -> Option { diff --git a/src/query/storages/common/table_meta/src/meta/v4/segment.rs b/src/query/storages/common/table_meta/src/meta/v4/segment.rs index 8b9abf202839..81a8a524aa44 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/segment.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/segment.rs @@ -216,10 +216,6 @@ pub struct CompactSegmentInfo { } impl CompactSegmentInfo { - pub fn from_slice(bytes: &[u8]) -> Result { - Self::from_reader(Cursor::new(bytes)) - } - pub fn from_reader(mut r: impl Read) -> Result { let SegmentHeader { version, diff --git a/src/query/storages/fuse/src/io/mod.rs b/src/query/storages/fuse/src/io/mod.rs index 06bb6b8b3d4d..53b2809a3c01 100644 --- a/src/query/storages/fuse/src/io/mod.rs +++ b/src/query/storages/fuse/src/io/mod.rs @@ -52,5 +52,4 @@ pub use write::CachedMetaWriter; pub use write::InvertedIndexBuilder; pub use write::InvertedIndexWriter; pub use write::MetaWriter; -pub use write::SegmentWriter; pub use write::WriteSettings; diff --git a/src/query/storages/fuse/src/io/segments.rs b/src/query/storages/fuse/src/io/segments.rs index 93e49492440e..c1c18ee89d20 100644 --- a/src/query/storages/fuse/src/io/segments.rs +++ b/src/query/storages/fuse/src/io/segments.rs @@ -19,13 +19,10 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::TableSchemaRef; -use databend_storages_common_cache::CacheAccessor; -use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::LoadParams; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; -use databend_storages_common_table_meta::meta::Versioned; use fastrace::func_path; use fastrace::prelude::*; use opendal::Operator; @@ -116,19 +113,4 @@ impl SegmentsIO { ) .await } - - #[async_backtrace::framed] - pub async fn write_segment(dal: Operator, serialized_segment: SerializedSegment) -> Result<()> { - assert_eq!( - serialized_segment.segment.format_version, - SegmentInfo::VERSION - ); - let raw_bytes = serialized_segment.segment.to_bytes()?; - let compact_segment_info = CompactSegmentInfo::from_slice(&raw_bytes)?; - dal.write(&serialized_segment.path, raw_bytes).await?; - if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() { - segment_cache.insert(serialized_segment.path, compact_segment_info); - } - Ok(()) - } } diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index 9f31a3f84e27..f38dc6dcd236 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -15,7 +15,6 @@ use databend_common_exception::Result; use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; -use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; @@ -24,8 +23,6 @@ use opendal::Operator; #[async_trait::async_trait] pub trait MetaWriter { - /// If meta has a `to_bytes` function, such as `SegmentInfo` and `TableSnapshot` - /// We should not use `write_meta`. Instead, use `write_meta_data` async fn write_meta(&self, data_accessor: &Operator, location: &str) -> Result<()>; } @@ -42,24 +39,25 @@ where T: Marshal + Sync + Send #[async_trait::async_trait] pub trait CachedMetaWriter { - /// If meta has a `to_bytes` function, such as `SegmentInfo` and `TableSnapshot` - /// We should not use `write_meta_through_cache`. Instead, use `write_meta_data_through_cache` - async fn write_meta_through_cache(self, data_accessor: &Operator, location: &str) - -> Result<()>; + async fn write_meta_through_cache( + &self, + data_accessor: &Operator, + location: &str, + ) -> Result<()>; } #[async_trait::async_trait] impl CachedMetaWriter for SegmentInfo { #[async_backtrace::framed] async fn write_meta_through_cache( - self, + &self, data_accessor: &Operator, location: &str, ) -> Result<()> { let bytes = self.marshal()?; data_accessor.write(location, bytes.clone()).await?; - if let Some(cache) = CompactSegmentInfo::cache() { - cache.insert(location.to_owned(), CompactSegmentInfo::try_from(&self)?); + if let Some(cache) = SegmentInfo::cache() { + cache.insert(location.to_owned(), self.try_into()?); } Ok(()) } diff --git a/src/query/storages/fuse/src/io/write/mod.rs b/src/query/storages/fuse/src/io/write/mod.rs index 8549cb15d17b..4e785fc4188c 100644 --- a/src/query/storages/fuse/src/io/write/mod.rs +++ b/src/query/storages/fuse/src/io/write/mod.rs @@ -15,7 +15,6 @@ mod block_writer; mod inverted_index_writer; mod meta_writer; -mod segment_writer; mod write_settings; pub(crate) use block_writer::create_inverted_index_builders; @@ -33,5 +32,4 @@ pub(crate) use inverted_index_writer::create_tokenizer_manager; pub use inverted_index_writer::InvertedIndexWriter; pub use meta_writer::CachedMetaWriter; pub use meta_writer::MetaWriter; -pub use segment_writer::SegmentWriter; pub use write_settings::WriteSettings; diff --git a/src/query/storages/fuse/src/io/write/segment_writer.rs b/src/query/storages/fuse/src/io/write/segment_writer.rs deleted file mode 100644 index 6dae2c3e610b..000000000000 --- a/src/query/storages/fuse/src/io/write/segment_writer.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use databend_common_exception::Result; -use databend_storages_common_table_meta::meta::Location; -use databend_storages_common_table_meta::meta::SegmentInfo; -use databend_storages_common_table_meta::meta::Versioned; -use opendal::Operator; - -use super::meta_writer::MetaWriter; -use crate::io::CachedMetaWriter; -use crate::io::TableMetaLocationGenerator; - -#[derive(Clone)] -pub struct SegmentWriter<'a> { - location_generator: &'a TableMetaLocationGenerator, - data_accessor: &'a Operator, -} - -impl<'a> SegmentWriter<'a> { - pub fn new( - data_accessor: &'a Operator, - location_generator: &'a TableMetaLocationGenerator, - ) -> Self { - Self { - location_generator, - data_accessor, - } - } - - #[async_backtrace::framed] - pub async fn write_segment(&self, segment: SegmentInfo) -> Result { - let location = self.generate_location(); - segment - .write_meta_through_cache(self.data_accessor, &location.0) - .await?; - Ok(location) - } - - #[async_backtrace::framed] - pub async fn write_segment_no_cache(&self, segment: &SegmentInfo) -> Result { - let location = self.generate_location(); - segment - .write_meta(self.data_accessor, location.0.as_str()) - .await?; - Ok(location) - } - - fn generate_location(&self) -> Location { - let path = self.location_generator.gen_segment_info_location(); - (path, SegmentInfo::VERSION) - } -} diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 5e24a2eebead..3a42880d3b31 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -40,8 +40,8 @@ use log::info; use log::warn; use opendal::Operator; +use crate::io::CachedMetaWriter; use crate::io::SegmentsIO; -use crate::io::SerializedSegment; use crate::io::TableMetaLocationGenerator; use crate::operations::common::CommitMeta; use crate::operations::common::ConflictResolveContext; @@ -557,12 +557,8 @@ async fn write_segment( } // create new segment info let new_segment = SegmentInfo::new(blocks, new_summary.clone()); - - // write the segment info. - let serialized_segment = SerializedSegment { - path: location.clone(), - segment: Arc::new(new_segment), - }; - SegmentsIO::write_segment(dal, serialized_segment).await?; + new_segment + .write_meta_through_cache(&dal, &location) + .await?; Ok((location, new_summary)) } diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index 8ded8fb1902c..fc695810dd0b 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -32,6 +32,7 @@ use databend_storages_common_index::InvertedIndexMeta; use databend_storages_common_io::Files; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; use log::error; @@ -591,7 +592,7 @@ impl FuseTable { } } - self.try_purge_location_files_and_cache::( + self.try_purge_location_files_and_cache::( ctx.clone(), inverted_indexes_to_be_purged, ) @@ -602,7 +603,7 @@ impl FuseTable { let blooms_count = blooms_to_be_purged.len(); if blooms_count > 0 { counter.blooms += blooms_count; - self.try_purge_location_files_and_cache::( + self.try_purge_location_files_and_cache::( ctx.clone(), blooms_to_be_purged, ) @@ -613,7 +614,7 @@ impl FuseTable { let segments_count = segments_to_be_purged.len(); if segments_count > 0 { counter.segments += segments_count; - self.try_purge_location_files_and_cache::( + self.try_purge_location_files_and_cache::( ctx.clone(), segments_to_be_purged, ) @@ -633,7 +634,7 @@ impl FuseTable { let ts_count = ts_to_be_purged.len(); if ts_count > 0 { counter.table_statistics += ts_count; - self.try_purge_location_files_and_cache::( + self.try_purge_location_files_and_cache::( ctx.clone(), ts_to_be_purged, ) @@ -644,7 +645,7 @@ impl FuseTable { let snapshots_count = snapshots_to_be_purged.len(); if snapshots_count > 0 { counter.snapshots += snapshots_count; - self.try_purge_location_files_and_cache::( + self.try_purge_location_files_and_cache::( ctx.clone(), snapshots_to_be_purged, ) @@ -680,13 +681,13 @@ impl FuseTable { // Purge file by location chunks. #[async_backtrace::framed] - pub async fn try_purge_location_files_and_cache( + pub async fn try_purge_location_files_and_cache( &self, ctx: Arc, locations_to_be_purged: HashSet, ) -> Result<()> where - T: CachedObject, + T: CachedObject, { if let Some(cache) = T::cache() { for loc in locations_to_be_purged.iter() { diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs index 08ff8fd35370..3dbcf5856760 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs @@ -21,10 +21,11 @@ use databend_common_metrics::storage::metrics_set_compact_segments_select_durati use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::Versioned; use log::info; use opendal::Operator; -use crate::io::SegmentWriter; +use crate::io::CachedMetaWriter; use crate::io::SegmentsIO; use crate::io::TableMetaLocationGenerator; use crate::operations::CompactOptions; @@ -101,14 +102,14 @@ impl SegmentCompactMutator { let schema = Arc::new(self.compact_params.base_snapshot.schema.clone()); let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), self.data_accessor.clone(), schema); - let segment_writer = SegmentWriter::new(&self.data_accessor, &self.location_generator); let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4; let compactor = SegmentCompactor::new( self.compact_params.block_per_seg as u64, self.default_cluster_key_id, chunk_size, &fuse_segment_io, - segment_writer, + &self.data_accessor, + &self.location_generator, ); self.compaction = compactor @@ -167,7 +168,8 @@ pub struct SegmentCompactor<'a> { accumulated_num_blocks: u64, chunk_size: usize, segment_reader: &'a SegmentsIO, - segment_writer: SegmentWriter<'a>, + operator: &'a Operator, + location_generator: &'a TableMetaLocationGenerator, // accumulated compaction state compacted_state: SegmentCompactionState, } @@ -178,7 +180,8 @@ impl<'a> SegmentCompactor<'a> { default_cluster_key_id: Option, chunk_size: usize, segment_reader: &'a SegmentsIO, - segment_writer: SegmentWriter<'a>, + operator: &'a Operator, + location_generator: &'a TableMetaLocationGenerator, ) -> Self { Self { threshold, @@ -187,7 +190,8 @@ impl<'a> SegmentCompactor<'a> { fragmented_segments: vec![], chunk_size, segment_reader, - segment_writer, + operator, + location_generator, compacted_state: Default::default(), } } @@ -351,11 +355,16 @@ impl<'a> SegmentCompactor<'a> { // 2.2 write down new segment let new_segment = SegmentInfo::new(blocks, new_statistics); - let location = self.segment_writer.write_segment(new_segment).await?; + let location = self.location_generator.gen_segment_info_location(); + new_segment + .write_meta_through_cache(self.operator, &location) + .await?; self.compacted_state .new_segment_paths - .push(location.0.clone()); - self.compacted_state.segments_locations.push(location); + .push(location.clone()); + self.compacted_state + .segments_locations + .push((location, SegmentInfo::VERSION)); Ok(()) }