Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/branch-2.1' into sessionvar_4_co…
Browse files Browse the repository at this point in the history
…oldown_2.1

# Conflicts:
#	fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
  • Loading branch information
cjj2010 committed Nov 7, 2024
2 parents a0da74d + bbfed50 commit bbde859
Show file tree
Hide file tree
Showing 255 changed files with 28,641 additions and 2,754 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/build-extension.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ name: Build Extensions

on:
pull_request:

workflow_dispatch:
issue_comment:
types: [ created ]
concurrency:
group: ${{ github.ref }} (Build Extensions)
cancel-in-progress: true
Expand All @@ -29,6 +31,12 @@ jobs:
changes:
name: Detect Changes
runs-on: ubuntu-latest
if: |
(github.event_name == 'pull_request') ||
(github.event_name == 'issue_comment' &&
github.event.comment.body == 'run buildall' &&
github.actor == 'doris-robot' &&
github.event.issue.user.login == 'github-actions[bot]')
outputs:
broker_changes: ${{ steps.filter.outputs.broker_changes }}
docs_changes: ${{ steps.filter.outputs.docs_changes }}
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/build-thirdparty.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ name: Build Third Party Libraries

on:
pull_request:
workflow_dispatch:
issue_comment:
types: [ created ]

concurrency:
group: ${{ github.ref }} (Build Third Party Libraries)
Expand All @@ -28,6 +31,12 @@ jobs:
changes:
name: Detect Changes
runs-on: ubuntu-latest
if: |
(github.event_name == 'pull_request') ||
(github.event_name == 'issue_comment' &&
github.event.comment.body == 'run buildall' &&
github.actor == 'doris-robot' &&
github.event.issue.user.login == 'github-actions[bot]')
outputs:
thirdparty_changes: ${{ steps.filter.outputs.thirdparty_changes }}
steps:
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/checkstyle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,20 @@ name: FE Code Style Checker

on:
pull_request:
workflow_dispatch:
issue_comment:
types: [ created ]

jobs:
java-checkstyle:
name: "CheckStyle"
runs-on: ubuntu-latest
if: |
(github.event_name == 'pull_request') ||
(github.event_name == 'issue_comment' &&
github.event.comment.body == 'run buildall' &&
github.actor == 'doris-robot' &&
github.event.issue.user.login == 'github-actions[bot]')
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
14 changes: 12 additions & 2 deletions .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@
---
name: BE Code Formatter

on: [push, pull_request_target]

on:
pull_request:
pull_request_target:
workflow_dispatch:
issue_comment:
types: [ created ]
jobs:
clang-format:
name: "Clang Formatter"
runs-on: ubuntu-latest
if: |
(github.event_name == 'pull_request') || (github.event_name == 'pull_request_target') ||
(github.event_name == 'issue_comment' &&
github.event.comment.body == 'run buildall' &&
github.actor == 'doris-robot' &&
github.event.issue.user.login == 'github-actions[bot]')
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
if: ${{ github.event_name != 'pull_request_target' }}
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/license-eyes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
name: License Check
on:
pull_request_target:
pull_request:
types: [opened, synchronize, labeled]
push:
branches:
- master

jobs:
license-check:
name: "License Check"
runs-on: ubuntu-latest
if: |
(github.event_name == 'pull_request_target') ||
(github.event_name == 'push' && github.ref == 'refs/heads/master') ||
(github.event.label.name == 'run-workflow' &&
github.event.issue.user.login == 'github-actions[bot]')
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
if: ${{ github.event_name != 'pull_request_target' }}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/cold_data_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/status.h"
#include "olap/compaction.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
Expand Down Expand Up @@ -85,7 +86,7 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
std::lock_guard wlock(_tablet->get_header_lock());
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`.
_tablet->delete_rowsets(_input_rowsets, false);
RETURN_IF_ERROR(_tablet->delete_rowsets(_input_rowsets, false));
_tablet->add_rowsets({_output_rowset});
// TODO(plat1ko): process primary key
_tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
}
std::vector<RowsetSharedPtr> empty_vec;
new_tablet->delete_rowsets(rowsets_to_delete, false);
RETURN_IF_ERROR(new_tablet->delete_rowsets(rowsets_to_delete, false));
// inherit cumulative_layer_point from base_tablet
// check if new_tablet.ce_point > base_tablet.ce_point?
new_tablet->set_cumulative_layer_point(-1);
Expand Down
21 changes: 12 additions & 9 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,
// error handling
if (!calc_bm_status.ok()) {
if (is_incremental_clone) {
delete_rowsets(to_add, false);
RETURN_IF_ERROR(delete_rowsets(to_add, false));
LOG(WARNING) << "incremental clone on tablet: " << tablet_id() << " failed due to "
<< calc_bm_status.msg() << ", revert " << to_add.size()
<< " rowsets added before.";
Expand All @@ -452,7 +452,7 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,

// full clone, calculate delete bitmap succeeded, update rowset
if (!is_incremental_clone) {
delete_rowsets(to_delete, false);
RETURN_IF_ERROR(delete_rowsets(to_delete, false));
add_rowsets(to_add);
// reconstruct from tablet meta
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());
Expand Down Expand Up @@ -638,28 +638,31 @@ void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
_tablet_meta->modify_rs_metas(rs_metas, {});
}

void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale) {
Status Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale) {
if (to_delete.empty()) {
return;
return Status::OK();
}
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_delete.size());
for (auto& rs : to_delete) {
for (const auto& rs : to_delete) {
rs_metas.push_back(rs->rowset_meta());
_rs_version_map.erase(rs->version());
}
_tablet_meta->modify_rs_metas({}, rs_metas, !move_to_stale);
if (move_to_stale) {
for (auto& rs : to_delete) {
for (const auto& rs : to_delete) {
_stale_rs_version_map[rs->version()] = rs;
}
_timestamped_version_tracker.add_stale_path_version(rs_metas);
} else {
for (auto& rs : to_delete) {
for (const auto& rs : to_delete) {
_timestamped_version_tracker.delete_version(rs->version());
_engine.add_unused_rowset(rs);
RETURN_IF_ERROR(RowsetMetaManager::remove(_data_dir->get_meta(), tablet_uid(),
rs->rowset_meta()->rowset_id()));
}
}
return Status::OK();
}

// snapshot manager may call this api to check if version exists, so that
Expand Down Expand Up @@ -2292,7 +2295,7 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
std::unique_lock meta_wlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
if (tablet_state() == TABLET_RUNNING) {
delete_rowsets({std::move(old_rowset)}, false);
RETURN_IF_ERROR(delete_rowsets({std::move(old_rowset)}, false));
add_rowsets({std::move(new_rowset)});
// TODO(plat1ko): process primary key
_tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
Expand Down Expand Up @@ -2506,7 +2509,7 @@ Status Tablet::_follow_cooldowned_data() {
to_add.push_back(std::move(rs));
}
// Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly.
delete_rowsets(to_delete, false);
RETURN_IF_ERROR(delete_rowsets(to_delete, false));
add_rowsets(to_add);
// TODO(plat1ko): process primary key
_tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class Tablet final : public BaseTablet {
// MUST hold EXCLUSIVE `_meta_lock`
void add_rowsets(const std::vector<RowsetSharedPtr>& to_add);
// MUST hold EXCLUSIVE `_meta_lock`
void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale);
Status delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale);

// MUST hold SHARED `_meta_lock`
const auto& rowset_map() const { return _rs_version_map; }
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "olap/file_header.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
#include "util/debug_points.h"
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ Status IndexBuilder::_add_nullable(const std::string& column_name,
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occured: {}", e.what());
}
// we should refresh nullmap for array
for (int row_id = 0; row_id < num_rows; row_id++) {
if (null_map && null_map[row_id] == 1) {
RETURN_IF_ERROR(
_inverted_index_builders[index_writer_sign]->add_array_nulls(row_id));
}
}
return Status::OK();
}

Expand Down
9 changes: 1 addition & 8 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,18 +480,11 @@ bool ExchangeSinkBuffer<Parent>::_is_receiver_eof(InstanceLoId id) {
}

template <typename Parent>
void ExchangeSinkBuffer<Parent>::_turn_off_channel(InstanceLoId id, bool cleanup) {
void ExchangeSinkBuffer<Parent>::_turn_off_channel(InstanceLoId id, bool /*cleanup*/) {
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
auto all_done = _busy_channels.fetch_sub(1) == 1;
_set_ready_to_finish(all_done);
if (cleanup && all_done) {
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
DCHECK(_parent);
_parent->set_reach_limit();
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.ignore_data_distribution());
_scan_dependency, p.ignore_data_distribution(), p.is_file_scan_operator());
return Status::OK();
}

Expand Down Expand Up @@ -1309,6 +1309,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
if (*eos) {
// reach limit, stop the scanners.
local_state._scanner_ctx->stop_scanners(state);
local_state._scanner_profile->add_info_string("EOS", "True");
}

return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,22 @@ Status PipelineTask::execute(bool* eos) {
}

auto status = Status::OK();

_task_profile->add_info_string("TaskState", "Runnable");
this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
_task_profile->add_info_string("TaskState", "BlockedBySource");
break;
}
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
_task_profile->add_info_string("TaskState", "BlockedBySink");
break;
}
if (time_spent > THREAD_TIME_SLICE) {
COUNTER_UPDATE(_yield_counts, 1);
_task_profile->add_info_string("TaskState", "Yield");
break;
}
// TODO llj: Pipeline entity should_yield
Expand All @@ -309,6 +312,7 @@ Status PipelineTask::execute(bool* eos) {
}
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
if (*eos) { // just return, the scheduler will do finish work
_task_profile->add_info_string("TaskState", "Finished");
break;
}
}
Expand Down
19 changes: 15 additions & 4 deletions be/src/util/jni-util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& pr
}

jobject JniUtil::convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map) {
//TODO: ADD EXCEPTION CHECK.
jclass hashmap_class = env->FindClass("java/util/HashMap");
jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V");
jobject hashmap_object = env->NewObject(hashmap_class, hashmap_constructor, map.size());
Expand Down Expand Up @@ -346,16 +347,26 @@ std::map<std::string, std::string> JniUtil::convert_to_cpp_map(JNIEnv* env, jobj

Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) {
*class_ref = NULL;
jclass local_cl = env->FindClass(class_str);
RETURN_ERROR_IF_EXC(env);
JNI_CALL_METHOD_CHECK_EXCEPTION_DELETE_REF(jclass, local_cl, env, FindClass(class_str));
RETURN_IF_ERROR(LocalToGlobalRef(env, local_cl, reinterpret_cast<jobject*>(class_ref)));
env->DeleteLocalRef(local_cl);
RETURN_ERROR_IF_EXC(env);
return Status::OK();
}

Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global_ref) {
*global_ref = env->NewGlobalRef(local_ref);
// NewGlobalRef:
// Returns a global reference to the given obj.
//
//May return NULL if:
// obj refers to null
// the system has run out of memory
// obj was a weak global reference and has already been garbage collected
if (*global_ref == NULL) {
return Status::InternalError(
"LocalToGlobalRef fail,global ref is NULL,maybe the system has run out of memory.");
}

//NewGlobalRef not throw exception,maybe we just need check NULL.
RETURN_ERROR_IF_EXC(env);
return Status::OK();
}
Expand Down
Loading

0 comments on commit bbde859

Please sign in to comment.