From ebe6b4d4db939872011f0b31e96a834850f34328 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Mon, 11 Nov 2024 10:35:30 +0800 Subject: [PATCH 1/3] [Opt](Serde) optimize serialization to string on variant type (#43237) (#43342) (#43237) --- be/src/vec/columns/column_object.cpp | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 6eca8bcff44ade0..05f1bdb13a03864 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -1141,15 +1141,17 @@ rapidjson::Value* find_leaf_node_by_path(rapidjson::Value& json, const PathInDat Status find_and_set_leave_value(const IColumn* column, const PathInData& path, const DataTypeSerDeSPtr& type_serde, const DataTypePtr& type, - rapidjson::Value& root, + TypeIndex base_type_index, rapidjson::Value& root, rapidjson::Document::AllocatorType& allocator, Arena& mem_pool, int row) { +#ifndef NDEBUG // sanitize type and column if (column->get_name() != type->create_column()->get_name()) { return Status::InternalError( "failed to set value for path {}, expected type {}, but got {} at row {}", path.get_path(), type->get_name(), column->get_name(), row); } +#endif const auto* nullable = check_and_get_column(column); if (nullable != nullptr && (nullable->is_null_at(row) || (path.empty() && nullable->get_data_at(row).empty()))) { @@ -1272,11 +1274,12 @@ Status ColumnObject::serialize_one_row_to_json_format(int row, rapidjson::String VLOG_DEBUG << "dump structure " << JsonFunctions::print_json_value(*doc_structure); #endif for (const auto& subcolumn : subcolumns) { - RETURN_IF_ERROR(find_and_set_leave_value(subcolumn->data.get_finalized_column_ptr(), - subcolumn->path, - subcolumn->data.get_least_common_type_serde(), - subcolumn->data.get_least_common_type(), root, - doc_structure->GetAllocator(), mem_pool, row)); + RETURN_IF_ERROR(find_and_set_leave_value( + subcolumn->data.get_finalized_column_ptr(), subcolumn->path, + subcolumn->data.get_least_common_type_serde(), + subcolumn->data.get_least_common_type(), + subcolumn->data.least_common_type.get_base_type_id(), root, + doc_structure->GetAllocator(), mem_pool, row)); if (subcolumn->path.empty() && !root.IsObject()) { // root was modified, only handle root node break; @@ -1344,10 +1347,11 @@ Status ColumnObject::merge_sparse_to_root_column() { ++null_count; continue; } - bool succ = find_and_set_leave_value(column, subcolumn->path, - subcolumn->data.get_least_common_type_serde(), - subcolumn->data.get_least_common_type(), root, - doc_structure->GetAllocator(), mem_pool, i); + bool succ = find_and_set_leave_value( + column, subcolumn->path, subcolumn->data.get_least_common_type_serde(), + subcolumn->data.get_least_common_type(), + subcolumn->data.least_common_type.get_base_type_id(), root, + doc_structure->GetAllocator(), mem_pool, i); if (succ && subcolumn->path.empty() && !root.IsObject()) { // root was modified, only handle root node break; From e073b575cce1cc922554519f0a8852874502f1bf Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Mon, 11 Nov 2024 10:38:42 +0800 Subject: [PATCH 2/3] [Opt](TabletSchema) reuse TabletColumn info to reduce mem (#42448) (#43349) (#42448) --- .../segment_v2/inverted_index_writer.cpp | 17 ++++++ .../rowset/segment_v2/inverted_index_writer.h | 19 +----- be/src/olap/tablet_column_object_pool.cpp | 57 ++++++++++++++++++ be/src/olap/tablet_column_object_pool.h | 60 +++++++++++++++++++ be/src/olap/tablet_schema.cpp | 47 +++++++++------ be/src/olap/tablet_schema.h | 20 ++++++- be/src/olap/tablet_schema_cache.cpp | 21 ++++++- be/src/runtime/exec_env.h | 6 ++ be/src/runtime/exec_env_init.cpp | 4 ++ be/src/runtime/memory/cache_policy.h | 6 +- be/src/util/block_compression.cpp | 1 + be/test/testutil/run_all_tests.cpp | 4 ++ 12 files changed, 222 insertions(+), 40 deletions(-) create mode 100644 be/src/olap/tablet_column_object_pool.cpp create mode 100644 be/src/olap/tablet_column_object_pool.h diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 27bd9b72b3c609a..a50b34b5fb18724 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -72,6 +72,23 @@ const int32_t MAX_LEAF_COUNT = 1024; const float MAXMBSortInHeap = 512.0 * 8; const int DIMS = 1; +bool InvertedIndexColumnWriter::check_support_inverted_index(const TabletColumn& column) { + // bellow types are not supported in inverted index for extracted columns + static std::set invalid_types = { + FieldType::OLAP_FIELD_TYPE_DOUBLE, + FieldType::OLAP_FIELD_TYPE_JSONB, + FieldType::OLAP_FIELD_TYPE_ARRAY, + FieldType::OLAP_FIELD_TYPE_FLOAT, + }; + if (column.is_extracted_column() && (invalid_types.contains(column.type()))) { + return false; + } + if (column.is_variant_type()) { + return false; + } + return true; +} + template class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { public: diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_writer.h index c29bb8c0b9d8c12..134dc32287c309d 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h @@ -33,7 +33,6 @@ #include "io/fs/local_file_system.h" #include "olap/olap_common.h" #include "olap/options.h" -#include "olap/tablet_schema.h" namespace doris { class CollectionValue; @@ -41,6 +40,7 @@ class CollectionValue; class Field; class TabletIndex; +class TabletColumn; namespace segment_v2 { class InvertedIndexFileWriter; @@ -76,22 +76,7 @@ class InvertedIndexColumnWriter { // check if the column is valid for inverted index, some columns // are generated from variant, but not all of them are supported - static bool check_support_inverted_index(const TabletColumn& column) { - // bellow types are not supported in inverted index for extracted columns - static std::set invalid_types = { - FieldType::OLAP_FIELD_TYPE_DOUBLE, - FieldType::OLAP_FIELD_TYPE_JSONB, - FieldType::OLAP_FIELD_TYPE_ARRAY, - FieldType::OLAP_FIELD_TYPE_FLOAT, - }; - if (column.is_extracted_column() && (invalid_types.contains(column.type()))) { - return false; - } - if (column.is_variant_type()) { - return false; - } - return true; - } + static bool check_support_inverted_index(const TabletColumn& column); private: DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter); diff --git a/be/src/olap/tablet_column_object_pool.cpp b/be/src/olap/tablet_column_object_pool.cpp new file mode 100644 index 000000000000000..4ebc9bdd6ff44ab --- /dev/null +++ b/be/src/olap/tablet_column_object_pool.cpp @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/tablet_column_object_pool.h" + +#include +#include + +#include "olap/tablet_schema.h" + +namespace doris { + +bvar::Adder g_tablet_column_cache_count("tablet_column_cache_count"); +bvar::Adder g_tablet_column_cache_hit_count("tablet_column_cache_hit_count"); + +std::pair TabletColumnObjectPool::insert(const std::string& key) { + auto* lru_handle = lookup(key); + TabletColumnPtr tablet_column_ptr; + if (lru_handle) { + auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); + tablet_column_ptr = value->tablet_column; + VLOG_DEBUG << "reuse column "; + g_tablet_column_cache_hit_count << 1; + } else { + auto* value = new CacheValue; + tablet_column_ptr = std::make_shared(); + ColumnPB pb; + pb.ParseFromString(key); + tablet_column_ptr->init_from_pb(pb); + VLOG_DEBUG << "create column "; + value->tablet_column = tablet_column_ptr; + lru_handle = LRUCachePolicyTrackingManual::insert(key, value, 1, 0, CachePriority::NORMAL); + g_tablet_column_cache_count << 1; + } + DCHECK(lru_handle != nullptr); + return {lru_handle, tablet_column_ptr}; +} + +TabletColumnObjectPool::CacheValue::~CacheValue() { + g_tablet_column_cache_count << -1; +} + +} // namespace doris diff --git a/be/src/olap/tablet_column_object_pool.h b/be/src/olap/tablet_column_object_pool.h new file mode 100644 index 000000000000000..949906cad909895 --- /dev/null +++ b/be/src/olap/tablet_column_object_pool.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/tablet_fwd.h" +#include "olap/tablet_schema.h" +#include "runtime/exec_env.h" +#include "runtime/memory/lru_cache_policy.h" + +namespace doris { + +// TabletColumnObjectPool is a cache for TabletColumn objects. It is used to reduce memory consumption +// when there are a large number of identical TabletColumns in the cluster, which usually occurs +// when VARIANT type columns are modified and added, each Rowset has an individual TabletSchema. +// Excessive TabletSchemas can lead to significant memory overhead. Reusing memory for identical +// TabletColumns would greatly reduce this memory consumption. + +class TabletColumnObjectPool : public LRUCachePolicyTrackingManual { +public: + using LRUCachePolicyTrackingManual::insert; + TabletColumnObjectPool(size_t capacity) + : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_COLUMN_OBJECT_POOL, + capacity, LRUCacheType::NUMBER, + config::tablet_schema_cache_recycle_interval) {} + + static TabletColumnObjectPool* create_global_column_cache(size_t capacity) { + auto* res = new TabletColumnObjectPool(capacity); + return res; + } + + static TabletColumnObjectPool* instance() { + return ExecEnv::GetInstance()->get_tablet_column_object_pool(); + } + + std::pair insert(const std::string& key); + +private: + class CacheValue : public LRUCacheValueBase { + public: + ~CacheValue() override; + TabletColumnPtr tablet_column; + }; +}; + +} // namespace doris diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 655e6d2e1e4d7b2..f6146c7379857b4 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -38,8 +38,10 @@ #include "exec/tablet_info.h" #include "olap/inverted_index_parser.h" #include "olap/olap_define.h" +#include "olap/tablet_column_object_pool.h" #include "olap/types.h" #include "olap/utils.h" +#include "runtime/memory/lru_cache_policy.h" #include "runtime/thread_context.h" #include "tablet_meta.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" @@ -849,6 +851,7 @@ TabletSchema::TabletSchema() { TabletSchema::~TabletSchema() { g_total_tablet_schema_num << -1; + clear_column_cache_handlers(); } void TabletSchema::append_column(TabletColumn column, ColumnType col_type) { @@ -938,9 +941,18 @@ void TabletSchema::clear_columns() { _num_null_columns = 0; _num_key_columns = 0; _cols.clear(); + clear_column_cache_handlers(); } -void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns) { +void TabletSchema::clear_column_cache_handlers() { + for (auto* cache_handle : _column_cache_handlers) { + TabletColumnObjectPool::instance()->release(cache_handle); + } + _column_cache_handlers.clear(); +} + +void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns, + bool reuse_cache_column) { _keys_type = schema.keys_type(); _num_columns = 0; _num_variant_columns = 0; @@ -951,25 +963,34 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac _field_name_to_index.clear(); _field_id_to_index.clear(); _cluster_key_idxes.clear(); + clear_column_cache_handlers(); for (const auto& i : schema.cluster_key_idxes()) { _cluster_key_idxes.push_back(i); } for (auto& column_pb : schema.column()) { - TabletColumn column; - column.init_from_pb(column_pb); - if (ignore_extracted_columns && column.is_extracted_column()) { + TabletColumnPtr column; + if (reuse_cache_column) { + auto pair = TabletColumnObjectPool::instance()->insert( + deterministic_string_serialize(column_pb)); + column = pair.second; + _column_cache_handlers.push_back(pair.first); + } else { + column = std::make_shared(); + column->init_from_pb(column_pb); + } + if (ignore_extracted_columns && column->is_extracted_column()) { continue; } - if (column.is_key()) { + if (column->is_key()) { _num_key_columns++; } - if (column.is_nullable()) { + if (column->is_nullable()) { _num_null_columns++; } - if (column.is_variant_type()) { + if (column->is_variant_type()) { ++_num_variant_columns; } - _cols.emplace_back(std::make_shared(std::move(column))); + _cols.emplace_back(std::move(column)); _field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns); _field_id_to_index[_cols.back()->unique_id()] = _num_columns; _num_columns++; @@ -1077,6 +1098,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version _sequence_col_idx = -1; _version_col_idx = -1; _cluster_key_idxes.clear(); + clear_column_cache_handlers(); for (const auto& i : ori_tablet_schema._cluster_key_idxes) { _cluster_key_idxes.push_back(i); } @@ -1525,13 +1547,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b) { return !(a == b); } -std::string TabletSchema::deterministic_string_serialize(const TabletSchemaPB& schema_pb) { - std::string output; - google::protobuf::io::StringOutputStream string_output_stream(&output); - google::protobuf::io::CodedOutputStream output_stream(&string_output_stream); - output_stream.SetSerializationDeterministic(true); - schema_pb.SerializeToCodedStream(&output_stream); - return output; -} - } // namespace doris diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index ed7ab896107e360..159ecf78d731037 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -39,6 +39,7 @@ #include "olap/rowset/segment_v2/options.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" +#include "runtime/memory/lru_cache_policy.h" #include "util/string_util.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/string_ref.h" @@ -273,10 +274,22 @@ class TabletSchema { TabletSchema(); virtual ~TabletSchema(); - void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false); + // Init from pb + // ignore_extracted_columns: ignore the extracted columns from variant column + // reuse_cached_column: reuse the cached column in the schema if they are the same, to reduce memory usage + void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false, + bool reuse_cached_column = false); // Notice: Use deterministic way to serialize protobuf, // since serialize Map in protobuf may could lead to un-deterministic by default - static std::string deterministic_string_serialize(const TabletSchemaPB& schema_pb); + template + static std::string deterministic_string_serialize(const PbType& pb) { + std::string output; + google::protobuf::io::StringOutputStream string_output_stream(&output); + google::protobuf::io::CodedOutputStream output_stream(&string_output_stream); + output_stream.SetSerializationDeterministic(true); + pb.SerializeToCodedStream(&output_stream); + return output; + } void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const; void append_column(TabletColumn column, ColumnType col_type = ColumnType::NORMAL); void append_index(TabletIndex index); @@ -466,10 +479,13 @@ class TabletSchema { friend bool operator==(const TabletSchema& a, const TabletSchema& b); friend bool operator!=(const TabletSchema& a, const TabletSchema& b); + void clear_column_cache_handlers(); + KeysType _keys_type = DUP_KEYS; SortType _sort_type = SortType::LEXICAL; size_t _sort_col_num = 0; std::vector _cols; + std::vector _column_cache_handlers; std::vector _indexes; std::unordered_map _field_name_to_index; diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index 51618f590a7dd2e..18e190c191c16b4 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -18,30 +18,45 @@ #include "olap/tablet_schema_cache.h" #include +#include +#include #include "bvar/bvar.h" #include "olap/tablet_schema.h" +#include "util/sha.h" bvar::Adder g_tablet_schema_cache_count("tablet_schema_cache_count"); bvar::Adder g_tablet_schema_cache_columns_count("tablet_schema_cache_columns_count"); +bvar::Adder g_tablet_schema_cache_hit_count("tablet_schema_cache_hit_count"); namespace doris { +// to reduce the memory consumption of the serialized TabletSchema as key. +// use sha256 to prevent from hash collision +static std::string get_key_signature(const std::string& origin) { + SHA256Digest digest; + digest.reset(origin.data(), origin.length()); + return std::string {digest.digest().data(), digest.digest().length()}; +} + std::pair TabletSchemaCache::insert(const std::string& key) { - auto* lru_handle = lookup(key); + std::string key_signature = get_key_signature(key); + auto* lru_handle = lookup(key_signature); TabletSchemaSPtr tablet_schema_ptr; if (lru_handle) { auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle); tablet_schema_ptr = value->tablet_schema; + g_tablet_schema_cache_hit_count << 1; } else { auto* value = new CacheValue; tablet_schema_ptr = std::make_shared(); TabletSchemaPB pb; pb.ParseFromString(key); - tablet_schema_ptr->init_from_pb(pb); + // We should reuse the memory of the same TabletColumn object, set reuse_cached_column to true + tablet_schema_ptr->init_from_pb(pb, false, true); value->tablet_schema = tablet_schema_ptr; lru_handle = LRUCachePolicyTrackingManual::insert( - key, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL); + key_signature, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL); g_tablet_schema_cache_count << 1; g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns(); } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 3d4a122bda2bc34..d9485e83b76a78d 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -100,6 +100,7 @@ class FrontendServiceClient; class FileMetaCache; class GroupCommitMgr; class TabletSchemaCache; +class TabletColumnObjectPool; class UserFunctionCache; class SchemaCache; class StoragePageCache; @@ -272,6 +273,9 @@ class ExecEnv { void set_storage_engine(StorageEngine* se) { this->_storage_engine = se; } void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; } void set_tablet_schema_cache(TabletSchemaCache* c) { this->_tablet_schema_cache = c; } + void set_tablet_column_object_pool(TabletColumnObjectPool* c) { + this->_tablet_column_object_pool = c; + } void set_storage_page_cache(StoragePageCache* c) { this->_storage_page_cache = c; } void set_segment_loader(SegmentLoader* sl) { this->_segment_loader = sl; } void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) { @@ -298,6 +302,7 @@ class ExecEnv { TabletSchemaCache* get_tablet_schema_cache() { return _tablet_schema_cache; } StorageEngine* get_storage_engine() { return _storage_engine; } + TabletColumnObjectPool* get_tablet_column_object_pool() { return _tablet_column_object_pool; } SchemaCache* schema_cache() { return _schema_cache; } StoragePageCache* get_storage_page_cache() { return _storage_page_cache; } SegmentLoader* segment_loader() { return _segment_loader; } @@ -439,6 +444,7 @@ class ExecEnv { // So we choose to use raw pointer, please remember to delete these pointer in deconstructor. TabletSchemaCache* _tablet_schema_cache = nullptr; StorageEngine* _storage_engine = nullptr; + TabletColumnObjectPool* _tablet_column_object_pool = nullptr; SchemaCache* _schema_cache = nullptr; StoragePageCache* _storage_page_cache = nullptr; SegmentLoader* _segment_loader = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 12d0b3bf9994af9..e47e26e8f6b0349 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -45,6 +45,7 @@ #include "olap/schema_cache.h" #include "olap/segment_loader.h" #include "olap/storage_engine.h" +#include "olap/tablet_column_object_pool.h" #include "olap/tablet_schema_cache.h" #include "olap/wal/wal_manager.h" #include "pipeline/pipeline_tracing.h" @@ -279,6 +280,9 @@ Status ExecEnv::_init(const std::vector& store_paths, _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); + _tablet_column_object_pool = TabletColumnObjectPool::create_global_column_cache( + config::tablet_schema_cache_capacity); + // Storage engine doris::EngineOptions options; options.store_paths = store_paths; diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index 610142c2c807a52..1bbeea5ef0d9ff3 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -46,6 +46,7 @@ class CachePolicy { TABLET_SCHEMA_CACHE = 14, CREATE_TABLET_RR_IDX_CACHE = 15, NONE = 16, // not be used + TABLET_COLUMN_OBJECT_POOL = 21, }; static std::string type_string(CacheType type) { @@ -82,6 +83,8 @@ class CachePolicy { return "TabletSchemaCache"; case CacheType::CREATE_TABLET_RR_IDX_CACHE: return "CreateTabletRRIdxCache"; + case CacheType::TABLET_COLUMN_OBJECT_POOL: + return "TabletColumnObjectPool"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast(type); } @@ -105,7 +108,8 @@ class CachePolicy { {"CommonObjLRUCache", CacheType::COMMON_OBJ_LRU_CACHE}, {"ForUT", CacheType::FOR_UT}, {"TabletSchemaCache", CacheType::TABLET_SCHEMA_CACHE}, - {"CreateTabletRRIdxCache", CacheType::CREATE_TABLET_RR_IDX_CACHE}}; + {"CreateTabletRRIdxCache", CacheType::CREATE_TABLET_RR_IDX_CACHE}, + {"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL}}; static CacheType string_to_type(std::string type) { if (StringToType.contains(type)) { diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 8445ea177ab7066..d1516376b803ec3 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -45,6 +45,7 @@ #include #include #include +#include #include #include "common/config.h" diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 75afdacd87ba6a2..272179fcb8f1bce 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -26,6 +26,7 @@ #include "gtest/gtest_pred_impl.h" #include "olap/page_cache.h" #include "olap/segment_loader.h" +#include "olap/tablet_column_object_pool.h" #include "olap/tablet_schema_cache.h" #include "runtime/exec_env.h" #include "runtime/memory/cache_manager.h" @@ -56,6 +57,9 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_tablet_schema_cache( doris::TabletSchemaCache::create_global_schema_cache( doris::config::tablet_schema_cache_capacity)); + doris::ExecEnv::GetInstance()->set_tablet_column_object_pool( + doris::TabletColumnObjectPool::create_global_column_cache( + doris::config::tablet_schema_cache_capacity)); LOG(INFO) << "init config " << st; doris::init_glog("be-test"); From 5dda61b410cdde1dc7495dee9a95eeafe6f6fb25 Mon Sep 17 00:00:00 2001 From: walter Date: Mon, 11 Nov 2024 14:08:35 +0800 Subject: [PATCH 3/3] [fix](backup) Load backup meta and job info bytes from disk #43276 (#43519) cherry pick from #43276 --- .../org/apache/doris/backup/AbstractJob.java | 2 + .../apache/doris/backup/BackupHandler.java | 50 ++++++++++++++++--- .../org/apache/doris/backup/BackupJob.java | 45 +++++++++++------ .../org/apache/doris/backup/RestoreJob.java | 5 ++ .../doris/service/FrontendServiceImpl.java | 7 ++- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index 4e2c3fd1990c4b2..f22598dd86b4e78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) { public abstract boolean isCancelled(); + public abstract boolean isFinished(); + public abstract Status updateRepo(Repository repo); public static AbstractJob read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index d9b7659cfc18364..49190acce1a636d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -109,10 +109,10 @@ public class BackupHandler extends MasterDaemon implements Writable { private Env env; - // map to store backup info, key is label name, value is Pair, meta && info is bytes - // this map not present in persist && only in fe master memory + // map to store backup info, key is label name, value is the BackupJob + // this map not present in persist && only in fe memory // one table only keep one snapshot info, only keep last - private final Map localSnapshots = new HashMap<>(); + private final Map localSnapshots = new HashMap<>(); private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock(); public BackupHandler() { @@ -167,6 +167,7 @@ private boolean init() { return false; } } + isInit = true; return true; } @@ -544,11 +545,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { return; } + List removedLabels = Lists.newArrayList(); jobLock.lock(); try { Deque jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); while (jobs.size() >= Config.max_backup_restore_job_num_per_db) { - jobs.removeFirst(); + AbstractJob removedJob = jobs.removeFirst(); + if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) { + removedLabels.add(removedJob.getLabel()); + } } AbstractJob lastJob = jobs.peekLast(); @@ -561,6 +566,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { } finally { jobLock.unlock(); } + + if (job.isFinished() && job instanceof BackupJob) { + // Save snapshot to local repo, when reload backupHandler from image. + BackupJob backupJob = (BackupJob) job; + if (backupJob.isLocalSnapshot()) { + addSnapshot(backupJob.getLabel(), backupJob); + } + } + for (String label : removedLabels) { + removeSnapshot(label); + } } private List getAllCurrentJobs() { @@ -799,22 +815,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, return false; } - public void addSnapshot(String labelName, Snapshot snapshot) { + public void addSnapshot(String labelName, BackupJob backupJob) { + assert backupJob.isFinished(); + + LOG.info("add snapshot {} to local repo", labelName); localSnapshotsLock.writeLock().lock(); try { - localSnapshots.put(labelName, snapshot); + localSnapshots.put(labelName, backupJob); + } finally { + localSnapshotsLock.writeLock().unlock(); + } + } + + public void removeSnapshot(String labelName) { + LOG.info("remove snapshot {} from local repo", labelName); + localSnapshotsLock.writeLock().lock(); + try { + localSnapshots.remove(labelName); } finally { localSnapshotsLock.writeLock().unlock(); } } public Snapshot getSnapshot(String labelName) { + BackupJob backupJob; localSnapshotsLock.readLock().lock(); try { - return localSnapshots.get(labelName); + backupJob = localSnapshots.get(labelName); } finally { localSnapshotsLock.readLock().unlock(); } + + if (backupJob == null) { + return null; + } + + return backupJob.getSnapshot(); } public static BackupHandler read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index b24fb9fe7fd6d15..9e932d6f8fc729f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -123,9 +123,6 @@ public enum BackupJobState { // backup properties && table commit seq with table id private Map properties = Maps.newHashMap(); - private byte[] metaInfoBytes = null; - private byte[] jobInfoBytes = null; - public BackupJob() { super(JobType.BACKUP); } @@ -337,11 +334,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas @Override public synchronized void replayRun() { - LOG.info("replay run backup job: {}", this); - if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); - } + // nothing to do } @Override @@ -359,6 +352,11 @@ public boolean isCancelled() { return state == BackupJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == BackupJobState.FINISHED; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; @@ -844,8 +842,6 @@ private void saveMetaInfo() { } backupMeta.writeToFile(metaInfoFile); localMetaInfoFilePath = metaInfoFile.getAbsolutePath(); - // read meta info to metaInfoBytes - metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); // 3. save job info file Map tableCommitSeqMap = Maps.newHashMap(); @@ -872,8 +868,6 @@ private void saveMetaInfo() { } jobInfo.writeToFile(jobInfoFile); localJobInfoFilePath = jobInfoFile.getAbsolutePath(); - // read job info to jobInfoBytes - jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); } catch (Exception e) { status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage()); return; @@ -927,7 +921,6 @@ private void uploadMetaAndJobInfoFile() { } } - finishedTime = System.currentTimeMillis(); state = BackupJobState.FINISHED; @@ -936,8 +929,7 @@ private void uploadMetaAndJobInfoFile() { LOG.info("job is finished. {}", this); if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); + env.getBackupHandler().addSnapshot(label, this); return; } } @@ -1030,6 +1022,29 @@ private void cancelInternal() { LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this); } + public boolean isLocalSnapshot() { + return repoId == Repository.KEEP_ON_LOCAL_REPO_ID; + } + + // read meta and job info bytes from disk, and return the snapshot + public synchronized Snapshot getSnapshot() { + if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { + return null; + } + + try { + File metaInfoFile = new File(localMetaInfoFilePath); + File jobInfoFile = new File(localJobInfoFilePath); + byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); + byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); + return new Snapshot(label, metaInfoBytes, jobInfoBytes); + } catch (IOException e) { + LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", + localMetaInfoFilePath, localJobInfoFilePath, e); + return null; + } + } + public synchronized List getInfo() { List info = Lists.newArrayList(); info.add(String.valueOf(jobId)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 5a3b569a7cb7abe..1f13a2970d5f999 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -379,6 +379,11 @@ public boolean isCancelled() { return state == RestoreJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == RestoreJobState.FINISHED; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d37e54deba89658..d1aefdec5a49c2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3018,15 +3018,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c } // Step 3: get snapshot + String label = request.getLabelName(); TGetSnapshotResult result = new TGetSnapshotResult(); result.setStatus(new TStatus(TStatusCode.OK)); - Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName()); + Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label); if (snapshot == null) { result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); - result.getStatus().addToErrorMsgs("snapshot not exist"); + result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { result.setMeta(snapshot.getMeta()); result.setJobInfo(snapshot.getJobInfo()); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", + label, snapshot.getMeta().length, snapshot.getJobInfo().length); } return result;