Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into test_alter_table_partition_2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Nov 11, 2024
2 parents c04bf7d + 5dda61b commit 0fcbf7c
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 74 deletions.
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ const int32_t MAX_LEAF_COUNT = 1024;
const float MAXMBSortInHeap = 512.0 * 8;
const int DIMS = 1;

bool InvertedIndexColumnWriter::check_support_inverted_index(const TabletColumn& column) {
// bellow types are not supported in inverted index for extracted columns
static std::set<FieldType> invalid_types = {
FieldType::OLAP_FIELD_TYPE_DOUBLE,
FieldType::OLAP_FIELD_TYPE_JSONB,
FieldType::OLAP_FIELD_TYPE_ARRAY,
FieldType::OLAP_FIELD_TYPE_FLOAT,
};
if (column.is_extracted_column() && (invalid_types.contains(column.type()))) {
return false;
}
if (column.is_variant_type()) {
return false;
}
return true;
}

template <FieldType field_type>
class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
public:
Expand Down
19 changes: 2 additions & 17 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
#include "io/fs/local_file_system.h"
#include "olap/olap_common.h"
#include "olap/options.h"
#include "olap/tablet_schema.h"

namespace doris {
class CollectionValue;

class Field;

class TabletIndex;
class TabletColumn;

namespace segment_v2 {
class InvertedIndexFileWriter;
Expand Down Expand Up @@ -76,22 +76,7 @@ class InvertedIndexColumnWriter {

// check if the column is valid for inverted index, some columns
// are generated from variant, but not all of them are supported
static bool check_support_inverted_index(const TabletColumn& column) {
// bellow types are not supported in inverted index for extracted columns
static std::set<FieldType> invalid_types = {
FieldType::OLAP_FIELD_TYPE_DOUBLE,
FieldType::OLAP_FIELD_TYPE_JSONB,
FieldType::OLAP_FIELD_TYPE_ARRAY,
FieldType::OLAP_FIELD_TYPE_FLOAT,
};
if (column.is_extracted_column() && (invalid_types.contains(column.type()))) {
return false;
}
if (column.is_variant_type()) {
return false;
}
return true;
}
static bool check_support_inverted_index(const TabletColumn& column);

private:
DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
Expand Down
57 changes: 57 additions & 0 deletions be/src/olap/tablet_column_object_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/tablet_column_object_pool.h"

#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/olap_file.pb.h>

#include "olap/tablet_schema.h"

namespace doris {

bvar::Adder<int64_t> g_tablet_column_cache_count("tablet_column_cache_count");
bvar::Adder<int64_t> g_tablet_column_cache_hit_count("tablet_column_cache_hit_count");

std::pair<Cache::Handle*, TabletColumnPtr> TabletColumnObjectPool::insert(const std::string& key) {
auto* lru_handle = lookup(key);
TabletColumnPtr tablet_column_ptr;
if (lru_handle) {
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
tablet_column_ptr = value->tablet_column;
VLOG_DEBUG << "reuse column ";
g_tablet_column_cache_hit_count << 1;
} else {
auto* value = new CacheValue;
tablet_column_ptr = std::make_shared<TabletColumn>();
ColumnPB pb;
pb.ParseFromString(key);
tablet_column_ptr->init_from_pb(pb);
VLOG_DEBUG << "create column ";
value->tablet_column = tablet_column_ptr;
lru_handle = LRUCachePolicyTrackingManual::insert(key, value, 1, 0, CachePriority::NORMAL);
g_tablet_column_cache_count << 1;
}
DCHECK(lru_handle != nullptr);
return {lru_handle, tablet_column_ptr};
}

TabletColumnObjectPool::CacheValue::~CacheValue() {
g_tablet_column_cache_count << -1;
}

} // namespace doris
60 changes: 60 additions & 0 deletions be/src/olap/tablet_column_object_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"

namespace doris {

// TabletColumnObjectPool is a cache for TabletColumn objects. It is used to reduce memory consumption
// when there are a large number of identical TabletColumns in the cluster, which usually occurs
// when VARIANT type columns are modified and added, each Rowset has an individual TabletSchema.
// Excessive TabletSchemas can lead to significant memory overhead. Reusing memory for identical
// TabletColumns would greatly reduce this memory consumption.

class TabletColumnObjectPool : public LRUCachePolicyTrackingManual {
public:
using LRUCachePolicyTrackingManual::insert;
TabletColumnObjectPool(size_t capacity)
: LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_COLUMN_OBJECT_POOL,
capacity, LRUCacheType::NUMBER,
config::tablet_schema_cache_recycle_interval) {}

static TabletColumnObjectPool* create_global_column_cache(size_t capacity) {
auto* res = new TabletColumnObjectPool(capacity);
return res;
}

static TabletColumnObjectPool* instance() {
return ExecEnv::GetInstance()->get_tablet_column_object_pool();
}

std::pair<Cache::Handle*, TabletColumnPtr> insert(const std::string& key);

private:
class CacheValue : public LRUCacheValueBase {
public:
~CacheValue() override;
TabletColumnPtr tablet_column;
};
};

} // namespace doris
47 changes: 30 additions & 17 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
#include "exec/tablet_info.h"
#include "olap/inverted_index_parser.h"
#include "olap/olap_define.h"
#include "olap/tablet_column_object_pool.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/thread_context.h"
#include "tablet_meta.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
Expand Down Expand Up @@ -849,6 +851,7 @@ TabletSchema::TabletSchema() {

TabletSchema::~TabletSchema() {
g_total_tablet_schema_num << -1;
clear_column_cache_handlers();
}

void TabletSchema::append_column(TabletColumn column, ColumnType col_type) {
Expand Down Expand Up @@ -938,9 +941,18 @@ void TabletSchema::clear_columns() {
_num_null_columns = 0;
_num_key_columns = 0;
_cols.clear();
clear_column_cache_handlers();
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns) {
void TabletSchema::clear_column_cache_handlers() {
for (auto* cache_handle : _column_cache_handlers) {
TabletColumnObjectPool::instance()->release(cache_handle);
}
_column_cache_handlers.clear();
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns,
bool reuse_cache_column) {
_keys_type = schema.keys_type();
_num_columns = 0;
_num_variant_columns = 0;
Expand All @@ -951,25 +963,34 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
_field_name_to_index.clear();
_field_id_to_index.clear();
_cluster_key_idxes.clear();
clear_column_cache_handlers();
for (const auto& i : schema.cluster_key_idxes()) {
_cluster_key_idxes.push_back(i);
}
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
if (ignore_extracted_columns && column.is_extracted_column()) {
TabletColumnPtr column;
if (reuse_cache_column) {
auto pair = TabletColumnObjectPool::instance()->insert(
deterministic_string_serialize(column_pb));
column = pair.second;
_column_cache_handlers.push_back(pair.first);
} else {
column = std::make_shared<TabletColumn>();
column->init_from_pb(column_pb);
}
if (ignore_extracted_columns && column->is_extracted_column()) {
continue;
}
if (column.is_key()) {
if (column->is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
if (column->is_nullable()) {
_num_null_columns++;
}
if (column.is_variant_type()) {
if (column->is_variant_type()) {
++_num_variant_columns;
}
_cols.emplace_back(std::make_shared<TabletColumn>(std::move(column)));
_cols.emplace_back(std::move(column));
_field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns);
_field_id_to_index[_cols.back()->unique_id()] = _num_columns;
_num_columns++;
Expand Down Expand Up @@ -1077,6 +1098,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_sequence_col_idx = -1;
_version_col_idx = -1;
_cluster_key_idxes.clear();
clear_column_cache_handlers();
for (const auto& i : ori_tablet_schema._cluster_key_idxes) {
_cluster_key_idxes.push_back(i);
}
Expand Down Expand Up @@ -1525,13 +1547,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b) {
return !(a == b);
}

std::string TabletSchema::deterministic_string_serialize(const TabletSchemaPB& schema_pb) {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
schema_pb.SerializeToCodedStream(&output_stream);
return output;
}

} // namespace doris
20 changes: 18 additions & 2 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/segment_v2/options.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/memory/lru_cache_policy.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_ref.h"
Expand Down Expand Up @@ -273,10 +274,22 @@ class TabletSchema {
TabletSchema();
virtual ~TabletSchema();

void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false);
// Init from pb
// ignore_extracted_columns: ignore the extracted columns from variant column
// reuse_cached_column: reuse the cached column in the schema if they are the same, to reduce memory usage
void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false,
bool reuse_cached_column = false);
// Notice: Use deterministic way to serialize protobuf,
// since serialize Map in protobuf may could lead to un-deterministic by default
static std::string deterministic_string_serialize(const TabletSchemaPB& schema_pb);
template <class PbType>
static std::string deterministic_string_serialize(const PbType& pb) {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
pb.SerializeToCodedStream(&output_stream);
return output;
}
void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const;
void append_column(TabletColumn column, ColumnType col_type = ColumnType::NORMAL);
void append_index(TabletIndex index);
Expand Down Expand Up @@ -466,10 +479,13 @@ class TabletSchema {
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);

void clear_column_cache_handlers();

KeysType _keys_type = DUP_KEYS;
SortType _sort_type = SortType::LEXICAL;
size_t _sort_col_num = 0;
std::vector<TabletColumnPtr> _cols;
std::vector<Cache::Handle*> _column_cache_handlers;

std::vector<TabletIndex> _indexes;
std::unordered_map<StringRef, int32_t, StringRefHash> _field_name_to_index;
Expand Down
21 changes: 18 additions & 3 deletions be/src/olap/tablet_schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,45 @@
#include "olap/tablet_schema_cache.h"

#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <json2pb/pb_to_json.h>

#include "bvar/bvar.h"
#include "olap/tablet_schema.h"
#include "util/sha.h"

bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count");
bvar::Adder<int64_t> g_tablet_schema_cache_columns_count("tablet_schema_cache_columns_count");
bvar::Adder<int64_t> g_tablet_schema_cache_hit_count("tablet_schema_cache_hit_count");

namespace doris {

// to reduce the memory consumption of the serialized TabletSchema as key.
// use sha256 to prevent from hash collision
static std::string get_key_signature(const std::string& origin) {
SHA256Digest digest;
digest.reset(origin.data(), origin.length());
return std::string {digest.digest().data(), digest.digest().length()};
}

std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std::string& key) {
auto* lru_handle = lookup(key);
std::string key_signature = get_key_signature(key);
auto* lru_handle = lookup(key_signature);
TabletSchemaSPtr tablet_schema_ptr;
if (lru_handle) {
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
tablet_schema_ptr = value->tablet_schema;
g_tablet_schema_cache_hit_count << 1;
} else {
auto* value = new CacheValue;
tablet_schema_ptr = std::make_shared<TabletSchema>();
TabletSchemaPB pb;
pb.ParseFromString(key);
tablet_schema_ptr->init_from_pb(pb);
// We should reuse the memory of the same TabletColumn object, set reuse_cached_column to true
tablet_schema_ptr->init_from_pb(pb, false, true);
value->tablet_schema = tablet_schema_ptr;
lru_handle = LRUCachePolicyTrackingManual::insert(
key, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL);
key_signature, value, tablet_schema_ptr->num_columns(), 0, CachePriority::NORMAL);
g_tablet_schema_cache_count << 1;
g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns();
}
Expand Down
Loading

0 comments on commit 0fcbf7c

Please sign in to comment.