diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index afbe6c77596432..7bde9323e94617 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -148,7 +148,6 @@ class Pipeline : public std::enable_shared_from_this { std::vector> _children; PipelineId _pipeline_id; - int _previous_schedule_id = -1; // pipline id + operator names. init when: // build_operators(), if pipeline; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 1a31e5954f479c..4a8909615b42c5 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -67,17 +67,16 @@ class PipelineTask { QueryContext* query_context(); - int get_previous_core_id() const { - return _previous_schedule_id != -1 ? _previous_schedule_id - : _pipeline->_previous_schedule_id; + int get_core_id() const { + return _core_id; } - void set_previous_core_id(int id) { - if (id != _previous_schedule_id) { - if (_previous_schedule_id != -1) { + void set_core_id(int id) { + if (id != _core_id) { + if (_core_id != -1) { COUNTER_UPDATE(_core_change_times, 1); } - _previous_schedule_id = id; + _core_id = id; } } @@ -175,10 +174,6 @@ class PipelineTask { void update_queue_level(int queue_level) { this->_queue_level = queue_level; } int get_queue_level() const { return this->_queue_level; } - // 1.3 priority queue's core id - void set_core_id(int core_id) { this->_core_id = core_id; } - int get_core_id() const { return this->_core_id; } - /** * Return true if: * 1. `enable_force_spill` is true which forces this task to spill data. @@ -254,7 +249,7 @@ class PipelineTask { bool _has_exceed_timeout = false; bool _opened; RuntimeState* _state = nullptr; - int _previous_schedule_id = -1; + int _core_id = -1; uint32_t _schedule_time = 0; std::unique_ptr _block; PipelineFragmentContext* _fragment_context = nullptr; @@ -269,7 +264,6 @@ class PipelineTask { // 2 exe task // 3 update task statistics(update _queue_level/_core_id) int _queue_level = 0; - int _core_id = 0; RuntimeProfile* _parent_profile = nullptr; std::unique_ptr _task_profile; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index ea812ca9b12dd6..cb9f4a6dd8f6bb 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) { << " _core_size: " << _core_size << " _next_core: " << _next_core.load(); task = _prio_task_queues[core_id].try_take(false); if (task) { - task->set_core_id(core_id); break; } task = _steal_take(core_id); @@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) { } task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); if (task) { - task->set_core_id(core_id); break; } } @@ -183,7 +181,6 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) { DCHECK(next_id < _core_size); auto task = _prio_task_queues[next_id].try_take(true); if (task) { - task->set_core_id(next_id); return task; } } @@ -191,7 +188,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) { } Status MultiCoreTaskQueue::push_back(PipelineTask* task) { - int core_id = task->get_previous_core_id(); + int core_id = task->get_core_id(); if (core_id < 0) { core_id = _next_core.fetch_add(1) % _core_size; } diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 45898e764175b2..a84996266c4beb 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -33,7 +33,6 @@ #include "common/logging.h" #include "pipeline/pipeline_task.h" -#include "pipeline/task_queue.h" #include "pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" @@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) { if (!task) { continue; } + // The task is already running, maybe block in now dependency wake up by other thread + // but the block thread still hold the task, so put it back to the queue, until the hold + // thread set task->set_running(false) if (task->is_running()) { static_cast(_task_queue.push_back(task, index)); continue; @@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) { // task exec bool eos = false; auto status = Status::OK(); + task->set_core_id(index); -#ifdef __APPLE__ - uint32_t core_id = 0; -#else - uint32_t core_id = sched_getcpu(); -#endif ASSIGN_STATUS_IF_CATCH_EXCEPTION( //TODO: use a better enclose to abstracting these if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) { @@ -149,12 +147,10 @@ void TaskScheduler::_do_work(int index) { uint64_t end_time = MonotonicMicros(); ExecEnv::GetInstance()->pipeline_tracer_context()->record( - {query_id, task_name, core_id, thread_id, start_time, end_time}); + {query_id, task_name, static_cast(index), thread_id, start_time, end_time}); } else { status = task->execute(&eos); }, status); - task->set_previous_core_id(index); - if (!status.ok()) { // Print detail informations below when you debugging here. // @@ -173,14 +169,11 @@ void TaskScheduler::_do_work(int index) { if (eos) { // is pending finish will add the task to dependency's blocking queue, and then the task will be // added to running queue when dependency is ready. - if (task->is_pending_finish()) { - // Only meet eos, should set task to PENDING_FINISH state - task->set_running(false); - } else { + if (!task->is_pending_finish()) { Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); _close_task(task, exec_status); + continue; } - continue; } task->set_running(false); diff --git a/be/src/vec/runtime/vdatetime_value.cpp b/be/src/vec/runtime/vdatetime_value.cpp index 026648319d4be4..e4306f7fa3ef11 100644 --- a/be/src/vec/runtime/vdatetime_value.cpp +++ b/be/src/vec/runtime/vdatetime_value.cpp @@ -2845,40 +2845,6 @@ int date_day_offset_dict::daynr(int year, int month, int day) const { return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1]; } -template -uint32_t DateV2Value::set_date_uint32(uint32_t int_val) { - union DateV2UInt32Union { - DateV2Value dt; - uint32_t ui32; - ~DateV2UInt32Union() {} - }; - DateV2UInt32Union conv = {.ui32 = int_val}; - if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0)) { - return 0; - } - this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0); - - return int_val; -} - -template -uint64_t DateV2Value::set_datetime_uint64(uint64_t int_val) { - union DateTimeV2UInt64Union { - DateV2Value dt; - uint64_t ui64; - ~DateTimeV2UInt64Union() {} - }; - DateTimeV2UInt64Union conv = {.ui64 = int_val}; - if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), conv.dt.minute(), - conv.dt.second(), conv.dt.microsecond())) { - return 0; - } - this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), - conv.dt.minute(), conv.dt.second(), conv.dt.microsecond()); - - return int_val; -} - template uint8_t DateV2Value::week(uint8_t mode) const { uint16_t year = 0; @@ -3707,26 +3673,6 @@ bool DateV2Value::to_format_string_conservative(const char* format, size_t le return true; } -template -bool DateV2Value::from_date(uint32_t value) { - DCHECK(!is_datetime); - if (value < MIN_DATE_V2 || value > MAX_DATE_V2) { - return false; - } - - return set_date_uint32(value); -} - -template -bool DateV2Value::from_datetime(uint64_t value) { - DCHECK(is_datetime); - if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) { - return false; - } - - return set_datetime_uint64(value); -} - template int64_t DateV2Value::standardize_timevalue(int64_t value) { if (value <= 0) { diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h index cfe9a368e83d4b..78b01d0ed00061 100644 --- a/be/src/vec/runtime/vdatetime_value.h +++ b/be/src/vec/runtime/vdatetime_value.h @@ -1169,12 +1169,7 @@ class DateV2Value { underlying_value to_date_int_val() const { return int_val_; } - bool from_date(uint32_t value); - bool from_datetime(uint64_t value); - bool from_date_int64(int64_t value); - uint32_t set_date_uint32(uint32_t int_val); - uint64_t set_datetime_uint64(uint64_t int_val); bool get_date_from_daynr(uint64_t);