Commit

Merge remote-tracking branch 'origin/branch-3.0' into sessionvar_4_cooldown_3.0

# Conflicts:
#	fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
cjj2010 committed Nov 7, 2024
2 parents 984f71a + 37efa96 commit e7de6a5
Showing 792 changed files with 25,988 additions and 9,480 deletions.
50 changes: 50 additions & 0 deletions .github/workflows/auto-cherry-pick.yml
@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
name: Auto Cherry-Pick to Branch

on:
  pull_request:
    types:
      - closed
    branches:
      - master

jobs:
  auto_cherry_pick:
    runs-on: ubuntu-latest
    if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') && github.event.pull_request.merged == true }}
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.x'

      - name: Install dependencies
        run: |
          pip install PyGithub
      - name: Auto cherry-pick
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          REPO_NAME: ${{ github.repository }}
          CONFLICT_LABEL: cherry-pick-conflict-in-3.0
        run: |
          python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0
8 changes: 7 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -39,6 +39,7 @@
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
@@ -410,6 +411,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
// Use the schema dict on the backend side
if (config::variant_use_cloud_schema_dict) {
req.set_schema_op(GetRowsetRequest::RETURN_DICT);
}
VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();

stub->get_rowset(&cntl, &req, &resp, nullptr);
@@ -524,7 +529,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
continue; // Same rowset, skip it
}
RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(
cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(meta_pb);
RowsetSharedPtr rowset;
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -558,14 +558,16 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task
} else if (config::enable_parallel_cumu_compaction) {
filter_out = [&tablet_preparing_cumu_compaction](CloudTablet* t) {
return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
(t->tablet_state() != TABLET_RUNNING && t->alter_version() == -1);
(t->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
};
} else {
filter_out = [&tablet_preparing_cumu_compaction,
&submitted_cumu_compactions](CloudTablet* t) {
return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
submitted_cumu_compactions.contains(t->tablet_id()) ||
(t->tablet_state() != TABLET_RUNNING && t->alter_version() == -1);
(t->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction || t->alter_version() == -1));
};
}

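The widened condition above is dense; here is a minimal standalone sketch of the predicate the two lambdas share (a hypothetical helper, not part of the commit), assuming the same meaning for the config flag and alter_version:

// Hypothetical helper (illustration only, not committed code).
// A tablet is skipped as a cumulative-compaction candidate when another
// compaction is already preparing or submitted for it, or when it is not
// in TABLET_RUNNING state, unless enable_new_tablet_do_compaction is on
// and the tablet belongs to an in-progress schema change (alter_version != -1).
bool filter_out_for_cumu_compaction(bool preparing, bool submitted, bool is_running,
                                    bool enable_new_tablet_do_compaction,
                                    int64_t alter_version) {
    if (preparing || submitted) return true; // already being compacted elsewhere
    if (is_running) return false;            // running tablets always qualify
    return !enable_new_tablet_do_compaction || alter_version == -1;
}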
45 changes: 35 additions & 10 deletions be/src/cloud/cloud_tablet.cpp
@@ -108,6 +108,36 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

Status CloudTablet::merge_rowsets_schema() {
// Find the rowset whose schema has the max schema version
auto max_version_rowset =
std::max_element(
_rs_version_map.begin(), _rs_version_map.end(),
[](const auto& a, const auto& b) {
return !a.second->tablet_schema()
? true
: (!b.second->tablet_schema()
? false
: a.second->tablet_schema()->schema_version() <
b.second->tablet_schema()
->schema_version());
})
->second;
TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
// If the schema has variant columns, perform a merge to create a wide tablet schema
if (max_version_schema->num_variant_columns() > 0) {
std::vector<TabletSchemaSPtr> schemas;
std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
[](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
// Merge the collected schemas to obtain the least common schema
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
max_version_schema));
VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
_merged_tablet_schema = max_version_schema;
}
return Status::OK();
}

// There are only two tablet states in cloud mode: RUNNING and NOT_READY
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
@@ -133,6 +163,10 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}

// Merge all rowset schemas within a CloudTablet
RETURN_IF_ERROR(merge_rowsets_schema());

return st;
}

@@ -188,16 +222,7 @@ Status CloudTablet::sync_if_not_running() {
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rdlock(_meta_lock);
TabletSchemaSPtr target_schema;
std::vector<TabletSchemaSPtr> schemas;
for (const auto& [_, rowset] : _rs_version_map) {
schemas.push_back(rowset->tablet_schema());
}
// get the max version schema and merge all schema
static_cast<void>(
vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema));
return target_schema;
return _merged_tablet_schema;
}

void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
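The nested ternary inside merge_rowsets_schema() encodes a "missing schema sorts lowest, otherwise compare schema_version" ordering for std::max_element. A self-contained sketch of the same selection, with the comparator tightened so two missing schemas compare equal (the types are simplified stand-ins, not Doris classes):

#include <algorithm>
#include <map>
#include <memory>

struct SchemaStub { int schema_version = 0; };  // stand-in for TabletSchema
using SchemaStubPtr = std::shared_ptr<SchemaStub>;

// Pick the schema with the highest schema_version among rowsets; rowsets
// without a schema lose to any rowset that has one.
SchemaStubPtr pick_max_version_schema(const std::map<int, SchemaStubPtr>& rs_map) {
    if (rs_map.empty()) return nullptr;
    auto it = std::max_element(rs_map.begin(), rs_map.end(),
                               [](const auto& a, const auto& b) {
                                   if (!a.second) return static_cast<bool>(b.second);
                                   if (!b.second) return false;
                                   return a.second->schema_version <
                                          b.second->schema_version;
                               });
    return it->second;
}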
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.h
@@ -208,6 +208,9 @@ class CloudTablet final : public BaseTablet {

Status sync_if_not_running();

// Merge all rowset schemas within a CloudTablet
Status merge_rowsets_schema();

CloudStorageEngine& _engine;

// this mutex MUST ONLY be used when sync meta
@@ -246,6 +249,9 @@
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
mutable std::mutex _rowset_update_lock;

// Schema will be merged from all rowsets when sync_rowsets
TabletSchemaSPtr _merged_tablet_schema;
};

using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
9 changes: 9 additions & 0 deletions be/src/cloud/injection_point_action.cpp
@@ -108,6 +108,15 @@ void register_suites() {
sp->set_call_back("VOlapTableSink::close",
[](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); });
});
// curl be_ip:http_port/api/injection_point/apply_suite?name=test_ttl_lru_evict
suite_map.emplace("test_ttl_lru_evict", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("BlockFileCache::change_limit1", [](auto&& args) {
LOG(INFO) << "BlockFileCache::change_limit1";
auto* limit = try_any_cast<size_t*>(args[0]);
*limit = 1;
});
});
suite_map.emplace("test_file_segment_cache_corruption", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Segment::open:corruption", [](auto&& args) {
55 changes: 49 additions & 6 deletions be/src/cloud/pb_convert.cpp
@@ -17,6 +17,7 @@

#include "cloud/pb_convert.h"

#include <common/logging.h>
#include <gen_cpp/olap_file.pb.h>

#include <utility>
@@ -138,19 +139,54 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
static void fill_schema_with_dict(const RowsetMetaCloudPB& in, RowsetMetaPB* out,
const SchemaCloudDictionary& dict) {
std::unordered_map<int32_t, ColumnPB*> unique_id_map;
// init map: unique_id -> ColumnPB
for (ColumnPB& column : *out->mutable_tablet_schema()->mutable_column()) {
unique_id_map[column.unique_id()] = &column;
}
// column info
for (size_t i = 0; i < in.schema_dict_key_list().column_dict_key_list_size(); ++i) {
int dict_key = in.schema_dict_key_list().column_dict_key_list(i);
const ColumnPB& dict_val = dict.column_dict().at(dict_key);
ColumnPB& to_add = *out->mutable_tablet_schema()->add_column();
to_add = dict_val;
VLOG_DEBUG << "fill dict column " << dict_val.ShortDebugString();
}

// index info
for (size_t i = 0; i < in.schema_dict_key_list().index_info_dict_key_list_size(); ++i) {
int dict_key = in.schema_dict_key_list().index_info_dict_key_list(i);
const TabletIndexPB& dict_val = dict.index_dict().at(dict_key);
*out->mutable_tablet_schema()->add_index() = dict_val;
VLOG_DEBUG << "fill dict index " << dict_val.ShortDebugString();
}

// sparse column info
for (size_t i = 0; i < in.schema_dict_key_list().sparse_column_dict_key_list_size(); ++i) {
int dict_key = in.schema_dict_key_list().sparse_column_dict_key_list(i);
const ColumnPB& dict_val = dict.column_dict().at(dict_key);
*unique_id_map.at(dict_val.parent_unique_id())->add_sparse_columns() = dict_val;
VLOG_DEBUG << "fill dict sparse column" << dict_val.ShortDebugString();
}
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in,
const SchemaCloudDictionary* dict) {
RowsetMetaPB out;
cloud_rowset_meta_to_doris(&out, in);
cloud_rowset_meta_to_doris(&out, in, dict);
return out;
}

RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&& in) {
RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&& in, const SchemaCloudDictionary* dict) {
RowsetMetaPB out;
cloud_rowset_meta_to_doris(&out, std::move(in));
cloud_rowset_meta_to_doris(&out, std::move(in), dict);
return out;
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) {
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
const SchemaCloudDictionary* dict) {
// ATTN: please keep the set order aligned with the definition of proto `TabletSchemaCloudPB`.
out->set_rowset_id(in.rowset_id());
out->set_partition_id(in.partition_id());
@@ -185,6 +221,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
if (in.has_tablet_schema()) {
cloud_tablet_schema_to_doris(out->mutable_tablet_schema(), in.tablet_schema());
}
if (dict != nullptr) {
fill_schema_with_dict(in, out, *dict);
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
@@ -198,7 +237,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
const SchemaCloudDictionary* dict) {
// ATTN: please keep the set order aligned with the definition of proto `TabletSchemaCloudPB`.
out->set_rowset_id(in.rowset_id());
out->set_partition_id(in.partition_id());
@@ -234,6 +274,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
cloud_tablet_schema_to_doris(out->mutable_tablet_schema(),
std::move(*in.mutable_tablet_schema()));
}
if (dict != nullptr) {
fill_schema_with_dict(in, out, *dict);
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
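For context, fill_schema_with_dict is the decompression half of a simple dictionary scheme: the meta-service replaces repeated column/index messages in each rowset meta with integer keys into a shared SchemaCloudDictionary, and the BE re-inflates them. A minimal sketch of that expansion with plain containers standing in for the protobuf types (all names here are illustrative, not the real API):

#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins for SchemaCloudDictionary and the dict-key lists
// carried by a compressed rowset meta (not the real protobuf types).
struct DictStub {
    std::map<int32_t, std::string> column_dict; // key -> full ColumnPB payload
    std::map<int32_t, std::string> index_dict;  // key -> full TabletIndexPB payload
};

struct CompressedMetaStub {
    std::vector<int32_t> column_keys;
    std::vector<int32_t> index_keys;
};

// Re-inflate: look up every stored key in the shared dictionary and append
// the full entry back onto the reconstructed schema.
std::pair<std::vector<std::string>, std::vector<std::string>> expand_schema(
        const CompressedMetaStub& in, const DictStub& dict) {
    std::vector<std::string> columns, indexes;
    for (int32_t k : in.column_keys) columns.push_back(dict.column_dict.at(k));
    for (int32_t k : in.index_keys) indexes.push_back(dict.index_dict.at(k));
    return {columns, indexes};
}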
12 changes: 8 additions & 4 deletions be/src/cloud/pb_convert.h
@@ -24,10 +24,14 @@ RowsetMetaCloudPB doris_rowset_meta_to_cloud(const RowsetMetaPB&);
RowsetMetaCloudPB doris_rowset_meta_to_cloud(RowsetMetaPB&&);
void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in);
void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in);
RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB&);
RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&&);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in);
RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB&,
const SchemaCloudDictionary* dict = nullptr);
RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&&,
const SchemaCloudDictionary* dict = nullptr);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
const SchemaCloudDictionary* dict = nullptr);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
const SchemaCloudDictionary* dict = nullptr);

// TabletSchemaPB <=> TabletSchemaCloudPB
TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB&);
16 changes: 10 additions & 6 deletions be/src/common/config.cpp
@@ -1003,6 +1003,8 @@ DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");

// Whether to use the schema dict on the backend side instead of the MetaService side (cloud mode)
DEFINE_mBool(variant_use_cloud_schema_dict, "true");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
@@ -1030,7 +1032,7 @@ DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache when full.
@@ -1048,7 +1050,7 @@ DEFINE_String(inverted_index_searcher_cache_limit, "10%");
// set `true` to enable insert searcher into cache when write inverted index data
DEFINE_Bool(enable_write_index_searcher_cache, "true");
DEFINE_Bool(enable_inverted_index_cache_check_timestamp, "true");
DEFINE_Int32(inverted_index_fd_number_limit_percent, "40"); // 40%
DEFINE_Int32(inverted_index_fd_number_limit_percent, "20"); // 20%
DEFINE_Int32(inverted_index_query_cache_shards, "256");

// inverted index match bitmap cache size
@@ -1097,9 +1099,9 @@ DEFINE_mInt32(schema_cache_sweep_time_sec, "100");

// max number of segment cache, default -1 for backward compatibility fd_number*2/5
DEFINE_Int32(segment_cache_capacity, "-1");
DEFINE_Int32(segment_cache_fd_percentage, "40");
DEFINE_mInt32(estimated_mem_per_column_reader, "1024");
DEFINE_Int32(segment_cache_memory_percentage, "2");
DEFINE_Int32(segment_cache_fd_percentage, "20");
DEFINE_mInt32(estimated_mem_per_column_reader, "512");
DEFINE_Int32(segment_cache_memory_percentage, "5");

// enable feature binlog, default false
DEFINE_Bool(enable_feature_binlog, "false");
@@ -1313,7 +1315,7 @@ DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
// The max ratio for ttl cache's size
DEFINE_mInt64(max_ttl_cache_ratio, "90");
DEFINE_mInt64(max_ttl_cache_ratio, "50");
// The maximum jvm heap usage ratio for hdfs write workload
DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
@@ -1374,6 +1376,8 @@ DEFINE_mInt32(lz4_compression_block_size, "262144");

DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

DEFINE_Int32(query_cache_size, "512");

// clang-format off
#ifdef BE_TEST
// test s3
4 changes: 4 additions & 0 deletions be/src/common/config.h
@@ -1199,6 +1199,7 @@ DECLARE_mInt64(LZ4_HC_compression_level);
// Threshold of a column as sparse column
// Notice: TEST ONLY
DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
DECLARE_mBool(variant_use_cloud_schema_dict);
// Threshold to estimate a column is sparsed
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
@@ -1454,6 +1455,9 @@ DECLARE_mInt32(lz4_compression_block_size);

DECLARE_mBool(enable_pipeline_task_leakage_detect);

// query cache size in MB
DECLARE_Int32(query_cache_size);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
(diff truncated; showing 10 of 792 changed files)