Skip to content

Commit

Permalink
[Refactor](exec) Remove unless code and add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Jan 7, 2025
1 parent 4c40b4e commit f9b4747
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 91 deletions.
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::vector<std::shared_ptr<Pipeline>> _children;

PipelineId _pipeline_id;
int _previous_schedule_id = -1;

// pipline id + operator names. init when:
// build_operators(), if pipeline;
Expand Down
20 changes: 7 additions & 13 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,16 @@ class PipelineTask {

QueryContext* query_context();

int get_previous_core_id() const {
return _previous_schedule_id != -1 ? _previous_schedule_id
: _pipeline->_previous_schedule_id;
int get_core_id() const {
return _core_id;
}

void set_previous_core_id(int id) {
if (id != _previous_schedule_id) {
if (_previous_schedule_id != -1) {
void set_core_id(int id) {
if (id != _core_id) {
if (_core_id != -1) {
COUNTER_UPDATE(_core_change_times, 1);
}
_previous_schedule_id = id;
_core_id = id;
}
}

Expand Down Expand Up @@ -175,10 +174,6 @@ class PipelineTask {
void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
int get_queue_level() const { return this->_queue_level; }

// 1.3 priority queue's core id
void set_core_id(int core_id) { this->_core_id = core_id; }
int get_core_id() const { return this->_core_id; }

/**
* Return true if:
* 1. `enable_force_spill` is true which forces this task to spill data.
Expand Down Expand Up @@ -254,7 +249,7 @@ class PipelineTask {
bool _has_exceed_timeout = false;
bool _opened;
RuntimeState* _state = nullptr;
int _previous_schedule_id = -1;
int _core_id = -1;
uint32_t _schedule_time = 0;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;
Expand All @@ -269,7 +264,6 @@ class PipelineTask {
// 2 exe task
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
int _core_id = 0;

RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
Expand Down
5 changes: 1 addition & 4 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = _prio_task_queues[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
Expand All @@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
}
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
}
}
Expand All @@ -183,15 +181,14 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(next_id < _core_size);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
}
}
return nullptr;
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
int core_id = task->get_previous_core_id();
int core_id = task->get_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
Expand Down
21 changes: 7 additions & 14 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
Expand Down Expand Up @@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) {
if (!task) {
continue;
}
// The task is already running, maybe block in now dependency wake up by other thread
// but the block thread still hold the task, so put it back to the queue, until the hold
// thread set task->set_running(false)
if (task->is_running()) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
Expand All @@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) {
// task exec
bool eos = false;
auto status = Status::OK();
task->set_core_id(index);

#ifdef __APPLE__
uint32_t core_id = 0;
#else
uint32_t core_id = sched_getcpu();
#endif
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
Expand All @@ -149,12 +147,10 @@ void TaskScheduler::_do_work(int index) {

uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time});
{query_id, task_name, static_cast<uint32_t>(index), thread_id, start_time, end_time});
} else { status = task->execute(&eos); },
status);

task->set_previous_core_id(index);

if (!status.ok()) {
// Print detail informations below when you debugging here.
//
Expand All @@ -173,14 +169,11 @@ void TaskScheduler::_do_work(int index) {
if (eos) {
// is pending finish will add the task to dependency's blocking queue, and then the task will be
// added to running queue when dependency is ready.
if (task->is_pending_finish()) {
// Only meet eos, should set task to PENDING_FINISH state
task->set_running(false);
} else {
if (!task->is_pending_finish()) {
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
_close_task(task, exec_status);
continue;
}
continue;
}

task->set_running(false);
Expand Down
54 changes: 0 additions & 54 deletions be/src/vec/runtime/vdatetime_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2845,40 +2845,6 @@ int date_day_offset_dict::daynr(int year, int month, int day) const {
return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1];
}

template <typename T>
uint32_t DateV2Value<T>::set_date_uint32(uint32_t int_val) {
union DateV2UInt32Union {
DateV2Value<T> dt;
uint32_t ui32;
~DateV2UInt32Union() {}
};
DateV2UInt32Union conv = {.ui32 = int_val};
if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0)) {
return 0;
}
this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0);

return int_val;
}

template <typename T>
uint64_t DateV2Value<T>::set_datetime_uint64(uint64_t int_val) {
union DateTimeV2UInt64Union {
DateV2Value<T> dt;
uint64_t ui64;
~DateTimeV2UInt64Union() {}
};
DateTimeV2UInt64Union conv = {.ui64 = int_val};
if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), conv.dt.minute(),
conv.dt.second(), conv.dt.microsecond())) {
return 0;
}
this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(),
conv.dt.minute(), conv.dt.second(), conv.dt.microsecond());

return int_val;
}

template <typename T>
uint8_t DateV2Value<T>::week(uint8_t mode) const {
uint16_t year = 0;
Expand Down Expand Up @@ -3707,26 +3673,6 @@ bool DateV2Value<T>::to_format_string_conservative(const char* format, size_t le
return true;
}

template <typename T>
bool DateV2Value<T>::from_date(uint32_t value) {
DCHECK(!is_datetime);
if (value < MIN_DATE_V2 || value > MAX_DATE_V2) {
return false;
}

return set_date_uint32(value);
}

template <typename T>
bool DateV2Value<T>::from_datetime(uint64_t value) {
DCHECK(is_datetime);
if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) {
return false;
}

return set_datetime_uint64(value);
}

template <typename T>
int64_t DateV2Value<T>::standardize_timevalue(int64_t value) {
if (value <= 0) {
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/runtime/vdatetime_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -1169,12 +1169,7 @@ class DateV2Value {

underlying_value to_date_int_val() const { return int_val_; }

bool from_date(uint32_t value);
bool from_datetime(uint64_t value);

bool from_date_int64(int64_t value);
uint32_t set_date_uint32(uint32_t int_val);
uint64_t set_datetime_uint64(uint64_t int_val);

bool get_date_from_daynr(uint64_t);

Expand Down

0 comments on commit f9b4747

Please sign in to comment.