Skip to content

Commit

Permalink
Merge branch 'master' into test_alter-table-partition
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Sep 29, 2024
2 parents 2ad5204 + b6b8e4f commit d245552
Show file tree
Hide file tree
Showing 818 changed files with 97,446 additions and 1,725 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1
docker/thirdparties/docker-compose/hive/scripts/tvf_data

fe_plugins/output
fe_plugins/**/.factorypath
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Status CloudBaseCompaction::modify_rowsets() {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
11 changes: 10 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,22 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
while (DebugPoints::instance()->is_enable(
"CloudCumulativeCompaction::modify_rowsets.block")) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
});

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
LOG_WARNING("tablet exceeds max version num limit")
.tag("limit", config::max_tablet_version_num)
.tag("tablet_id", tablet->tablet_id());
return Status::Error<TOO_MANY_VERSION>("too many versions, versions={} tablet={}",
config::max_tablet_version_num, tablet->tablet_id());
return Status::Error<TOO_MANY_VERSION>(
"too many versions, versions={} tablet={}. Please reduce the frequency of loading "
"data or adjust the max_tablet_version_num in be.conf to a larger value.",
config::max_tablet_version_num, tablet->tablet_id());
}

// check delete condition if push for delete
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ Status CloudRowsetBuilder::check_tablet_version_count() {
if (version_count > config::max_tablet_version_num) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. version count: {}, exceed limit: {}, "
"tablet: {}",
"tablet: {}. Please reduce the frequency of loading data or adjust the "
"max_tablet_version_num in be.conf to a larger value.",
version_count, config::max_tablet_version_num, _tablet->tablet_id());
}
return Status::OK();
Expand Down
28 changes: 22 additions & 6 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
*publish_status = iter->second.publish_status;
*previous_publish_info = iter->second.publish_info;
}
RETURN_IF_ERROR(
get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
return Status::OK();

auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);

if (st.is<ErrorCode::NOT_FOUND>()) {
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
if (delete_bitmap != nullptr) {
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
}
return Status::OK();
}
return st;
}

Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
Expand All @@ -95,6 +104,13 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
CacheKey key(key_str);
Cache::Handle* handle = lookup(key);

DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
handle = nullptr;
LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
"when get delete bitmap, txn_id:"
<< transaction_id << ", tablet_id: " << tablet_id;
});

DeleteBitmapCacheValue* val =
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
Expand All @@ -109,9 +125,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
LOG_INFO("cache missed when get delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
return Status::Error<ErrorCode::NOT_FOUND, false>(
"cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
return Status::OK();
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time millseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label)
_tablet(std::move(tablet)),
_is_vertical(config::enable_vertical_compaction),
_allow_delete_in_cumu_compaction(config::enable_delete_when_cumu_compaction) {
;
init_profile(label);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_rowid_conversion = std::make_unique<RowIdConversion>();
}

Compaction::~Compaction() {
Expand All @@ -130,6 +131,7 @@ Compaction::~Compaction() {
_input_rowsets.clear();
_output_rowset.reset();
_cur_tablet_schema.reset();
_rowid_conversion.reset();
}

void Compaction::init_profile(const std::string& label) {
Expand Down Expand Up @@ -176,7 +178,7 @@ Status Compaction::merge_input_rowsets() {
// the row ID conversion matrix needs to be used for inverted index compaction.
if (!ctx.skip_inverted_index.empty() || (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
_stats.rowid_conversion = &_rowid_conversion;
_stats.rowid_conversion = _rowid_conversion.get();
}

int64_t way_num = merge_way_num();
Expand Down Expand Up @@ -493,7 +495,7 @@ Status Compaction::do_inverted_index_compaction() {
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
// Convert the delete bitmap of the input rowsets to output rowset.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_input_rowsets, *_rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap);

if (!_allow_delete_in_cumu_compaction) {
Expand Down Expand Up @@ -947,7 +949,7 @@ Status CompactionMixin::modify_rowsets() {
// TODO(LiaoXin): check if there are duplicate keys
std::size_t missed_rows_size = 0;
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (missed_rows) {
Expand Down Expand Up @@ -1022,7 +1024,7 @@ Status CompactionMixin::modify_rowsets() {
}
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
_input_rowsets, *_rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
if (config::enable_merge_on_write_correctness_check) {
RowsetIdUnorderedSet rowsetids;
Expand All @@ -1042,7 +1044,7 @@ Status CompactionMixin::modify_rowsets() {
// Convert the delete bitmap of the input rowsets to output rowset for
// incremental data.
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
_input_rowsets, *_rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Compaction {
Version _output_version;

int64_t _newest_write_timestamp {-1};
RowIdConversion _rowid_conversion;
std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr;
TabletSchemaSPtr _cur_tablet_schema;

std::unique_ptr<RuntimeProfile> _profile;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
}
if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow &&
!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_partial_update) {
return Status::InternalError(
"Partial update for mow with cluster keys is not supported");
}
RETURN_IF_ERROR(_sort_by_cluster_keys());
}
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
// check if version number exceed limit
if (tablet->exceed_version_limit(config::max_tablet_version_num)) {
return Status::Status::Error<TOO_MANY_VERSION>(
"failed to push data. version count: {}, exceed limit: {}, tablet: {}",
"failed to push data. version count: {}, exceed limit: {}, tablet: {}. Please "
"reduce the frequency of loading data or adjust the max_tablet_version_num in "
"be.conf to a larger value.",
tablet->version_count(), config::max_tablet_version_num, tablet->tablet_id());
}

Expand All @@ -172,7 +174,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
config::tablet_meta_serialize_size_limit) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. meta serialize size : {}, exceed limit: {}, "
"tablet: {}",
"tablet: {}. Please reduce the frequency of loading data or adjust the "
"max_tablet_version_num in be.conf to a larger value.",
tablet->avg_rs_meta_serialize_size() * version_count,
config::tablet_meta_serialize_size_limit, tablet->tablet_id());
}
Expand Down
29 changes: 26 additions & 3 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_common.h"
#include "olap/utils.h"
#include "runtime/thread_context.h"

namespace doris {

Expand All @@ -33,17 +34,24 @@ namespace doris {
class RowIdConversion {
public:
RowIdConversion() = default;
~RowIdConversion() = default;
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }

// resize segment rowid map to its rows num
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
size_t delta_std_pair_cap = 0;
for (size_t i = 0; i < num_rows.size(); i++) {
uint32_t id = _segments_rowid_map.size();
_segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
_id_to_segment_map.emplace_back(src_rowset_id, i);
_segments_rowid_map.emplace_back(std::vector<std::pair<uint32_t, uint32_t>>(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX)));
std::vector<std::pair<uint32_t, uint32_t>> vec(
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
delta_std_pair_cap += vec.capacity();
_segments_rowid_map.emplace_back(std::move(vec));
}
//NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
// indexCompaction is a thridparty code, it's too complex to modify it.
// refer compact_column.
track_mem_usage(delta_std_pair_cap);
}

// set dst rowset id
Expand Down Expand Up @@ -109,12 +117,27 @@ class RowIdConversion {
return _segment_to_id_map.at(segment);
}

private:
void track_mem_usage(size_t delta_std_pair_cap) {
_std_pair_cap += delta_std_pair_cap;

size_t new_size =
_std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) +
_segments_rowid_map.capacity() * sizeof(std::vector<std::pair<uint32_t, uint32_t>>);

RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
CONSUME_THREAD_MEM_TRACKER(new_size);
_seg_rowid_map_mem_used = new_size;
}

private:
// the first level vector: index indicates src segment.
// the second level vector: index indicates row id of source segment,
// value indicates row id of destination segment.
// <UINT32_MAX, UINT32_MAX> indicates current row not exist.
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map;
size_t _seg_rowid_map_mem_used {0};
size_t _std_pair_cap {0};

// Map source segment to 0 to n
std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
Expand Down
19 changes: 19 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {
return Status::Error<END_OF_FILE>("BetaRowsetReader is empty");
}

RuntimeState* runtime_state = nullptr;
if (_read_context != nullptr) {
runtime_state = _read_context->runtime_state;
}

do {
auto s = _iterator->next_batch(block);
if (!s.ok()) {
Expand All @@ -359,6 +364,10 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {
}
return s;
}

if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
return runtime_state->cancel_reason();
}
} while (block->empty());

return Status::OK();
Expand All @@ -367,6 +376,12 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {
Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
RETURN_IF_ERROR(_init_iterator_once());

RuntimeState* runtime_state = nullptr;
if (_read_context != nullptr) {
runtime_state = _read_context->runtime_state;
}

do {
auto s = _iterator->next_block_view(block_view);
if (!s.ok()) {
Expand All @@ -375,6 +390,10 @@ Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
}
return s;
}

if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
return runtime_state->cancel_reason();
}
} while (block_view->empty());

return Status::OK();
Expand Down
14 changes: 12 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
return Status::OK();
}

Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
if (_segcompaction_worker) {
_segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id);
}
return Status::OK();
}

Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id) {
DCHECK(_rowset_meta->is_local());
Expand Down Expand Up @@ -917,7 +925,8 @@ Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
"_num_segment:{}, rowset_num_rows:{}",
"_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
"small or if the data is skewed.",
_context.tablet_id, _context.rowset_id.to_string(),
config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
}
Expand All @@ -930,7 +939,8 @@ Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}",
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
"the bucket number is too small or if the data is skewed.",
_context.tablet_id, _context.rowset_id.to_string(),
config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
_num_segcompacted, get_rowset_num_rows());
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {

Status build(RowsetSharedPtr& rowset) override;

Status init(const RowsetWriterContext& rowset_writer_context) override;

Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
TabletSchemaSPtr flush_schema) override;

Expand Down Expand Up @@ -318,7 +320,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
// already been segment compacted
std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction

std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;

// ensure only one inflight segcompaction task for each rowset
std::atomic<bool> _is_doing_segcompaction {false};
Expand Down
Loading

0 comments on commit d245552

Please sign in to comment.