Skip to content

Commit

Permalink
WorkerThreadPool: Add safety point between languages finished and poo…
Browse files Browse the repository at this point in the history
…l termination
  • Loading branch information
RandomShaper committed Sep 16, 2024
1 parent 1bd039d commit 3e06192
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 29 deletions.
108 changes: 83 additions & 25 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void WorkerThreadPool::_thread_function(void *p_user) {
{
MutexLock lock(singleton->task_mutex);

bool exit = singleton->_handle_runlevel();
bool exit = singleton->_handle_runlevel(thread_data, lock);
if (exit) {
break;
}
Expand All @@ -198,7 +198,7 @@ void WorkerThreadPool::_thread_function(void *p_user) {
singleton->task_queue.remove(singleton->task_queue.first());
} else {
thread_data->cond_var.wait(lock);
DEV_ASSERT(singleton->_check_runlevel_needs_handling() || thread_data->signaled);
DEV_ASSERT(singleton->_check_runlevel_needs_handling(thread_data) || thread_data->signaled);
}
}

Expand All @@ -208,19 +208,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
}
}

void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
// Fall back to processing on the calling thread if there are no worker threads.
// Separated into its own variable to make it easier to extend this logic
// in custom builds.
bool process_on_calling_thread = threads.size() == 0;
if (process_on_calling_thread) {
task_mutex.unlock();
p_lock.temp_unlock();
for (uint32_t i = 0; i < p_count; i++) {
_process_task(p_tasks[i]);
}
p_lock.temp_relock();
return;
}

while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
control_cond_var.wait(p_lock);
}

uint32_t to_process = 0;
uint32_t to_promote = 0;

Expand All @@ -242,8 +247,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
}

_notify_threads(caller_pool_thread, to_process, to_promote);

task_mutex.unlock();
}

void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
Expand Down Expand Up @@ -327,7 +330,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
}

WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
task_mutex.lock();
MutexLock<BinaryMutex> lock(task_mutex);

// Get a free task
Task *task = task_allocator.alloc();
TaskID id = last_task++;
Expand All @@ -339,7 +343,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
task->template_userdata = p_template_userdata;
tasks.insert(id, task);

_post_tasks_and_unlock(&task, 1, p_high_priority);
_post_tasks(&task, 1, p_high_priority, lock);

return id;
}
Expand Down Expand Up @@ -458,7 +462,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false;

bool exit = _handle_runlevel();
bool exit = _handle_runlevel(p_caller_pool_thread, lock);
if (unlikely(exit)) {
break;
}
Expand Down Expand Up @@ -501,7 +505,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);

DEV_ASSERT(_check_runlevel_needs_handling() || _is_wait_over() || p_caller_pool_thread->signaled);
DEV_ASSERT(_check_runlevel_needs_handling(p_caller_pool_thread) || _is_wait_over() || p_caller_pool_thread->signaled);
p_caller_pool_thread->awaited_task = nullptr;
}
}
Expand All @@ -519,45 +523,78 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
DEV_ASSERT(p_runlevel > runlevel);
runlevel = p_runlevel;
memset(&runlevel_data, 0, sizeof(runlevel_data));
for (uint32_t i = 0; i < threads.size(); i++) {
threads[i].cond_var.notify_one();
}
control_cond_var.notify_all();
}

#ifdef DEV_ENABLED
// Returns whether _handle_runlevel() should be called. Important for correctness verification.
bool WorkerThreadPool::_check_runlevel_needs_handling() {
bool WorkerThreadPool::_check_runlevel_needs_handling(const ThreadData *p_thread_data) {
switch (runlevel) {
case RUNLEVEL_NORMAL:
return false;
case RUNLEVEL_PRE_EXIT_LANGUAGES:
return !p_thread_data->pre_exited_languages;
case RUNLEVEL_EXIT_LANGUAGES:
return !p_thread_data->exited_languages;
case RUNLEVEL_EXIT:
return true;
}
CRASH_NOW();
}
#endif
}

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor w/ Mono (target=editor)

expected declaration before '}' token

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Template w/ Mono (target=template_release)

expected declaration before '}' token

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Minimal template (target=template_release, everything disabled)

expected declaration before '}' token

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Editor (target=editor, tests=yes)

syntax error: '}'

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Editor (target=editor, tests=yes)

syntax error: missing ';' before '}'

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Template (target=template_release)

syntax error: '}'

Check failure on line 548 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Template (target=template_release)

syntax error: missing ';' before '}'

// Returns whether threads have to exit. This may perform the check about handling needed.
bool WorkerThreadPool::_handle_runlevel() {
bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {

Check failure on line 551 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Editor (target=editor, tests=yes)

syntax error: missing ';' before '{'

Check failure on line 551 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Editor (target=editor, tests=yes)

'{': missing function header (old-style formal list?)

Check failure on line 551 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Template (target=template_release)

syntax error: missing ';' before '{'

Check failure on line 551 in core/object/worker_thread_pool.cpp

View workflow job for this annotation

GitHub Actions / 🏁 Windows / Template (target=template_release)

'{': missing function header (old-style formal list?)
bool exit = false;
switch (runlevel) {
case RUNLEVEL_NORMAL:
return false;
case RUNLEVEL_EXIT:
return true;
case RUNLEVEL_NORMAL: {
} break;
case RUNLEVEL_PRE_EXIT_LANGUAGES: {
if (!p_thread_data->pre_exited_languages) {
if (!task_queue.first() && !low_priority_task_queue.first()) {
p_thread_data->pre_exited_languages = true;
runlevel_data.pre_exit_languages.num_idle_threads++;
control_cond_var.notify_all();
}
}
} break;
case RUNLEVEL_EXIT_LANGUAGES: {
if (!p_thread_data->exited_languages) {
p_lock.temp_unlock();
ScriptServer::thread_exit();
p_lock.temp_relock();
p_thread_data->exited_languages = true;
runlevel_data.exit_languages.num_exited_threads++;
control_cond_var.notify_all();
}
} break;
case RUNLEVEL_EXIT: {
exit = true;
} break;
}
CRASH_NOW();
return exit;
}

void WorkerThreadPool::yield() {
int th_index = get_thread_index();
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);

// If this long-lived task started before the scripting server was initialized,
// now is a good time to have scripting languages ready for the current thread.
// Otherwise, such a piece of setup won't happen unless another task has been
// run during the collaborative wait.
ScriptServer::thread_enter();
task_mutex.lock();
if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
// If this long-lived task started before the scripting server was initialized,
// now is a good time to have scripting languages ready for the current thread.
// Otherwise, such a piece of setup won't happen unless another task has been
// run during the collaborative wait.
task_mutex.unlock();
ScriptServer::thread_enter();
} else {
task_mutex.unlock();
}
}

void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
Expand Down Expand Up @@ -587,7 +624,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
p_tasks = MAX(1u, threads.size());
}

task_mutex.lock();
MutexLock<BinaryMutex> lock(task_mutex);

Group *group = group_allocator.alloc();
GroupID id = last_task++;
group->max = p_elements;
Expand Down Expand Up @@ -622,7 +660,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca

groups[id] = group;

_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);

return id;
}
Expand Down Expand Up @@ -745,6 +783,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
}
}

void WorkerThreadPool::exit_languages_threads() {
if (threads.size() == 0) {
return;
}

MutexLock lock(task_mutex);

// Wait until all threads are idle.
_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
control_cond_var.wait(lock);
}

// Wait until all threads have detached from scripting languages.
_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
control_cond_var.wait(lock);
}
}

void WorkerThreadPool::finish() {
if (threads.size() == 0) {
return;
Expand Down
23 changes: 19 additions & 4 deletions core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,34 @@ class WorkerThreadPool : public Object {
Thread thread;
bool signaled : 1;
bool yield_is_over : 1;
bool pre_exited_languages : 1;
bool exited_languages : 1;
Task *current_task = nullptr;
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
ConditionVariable cond_var;

ThreadData() :
signaled(false),
yield_is_over(false) {}
yield_is_over(false),
exited_languages(false) {}
};

TightLocalVector<ThreadData> threads;
enum Runlevel {
RUNLEVEL_NORMAL,
RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
RUNLEVEL_EXIT,
} runlevel = RUNLEVEL_NORMAL;
union { // Cleared on every runlevel change.
struct {
uint32_t num_idle_threads;
} pre_exit_languages;
struct {
uint32_t num_exited_threads;
} exit_languages;
} runlevel_data;
ConditionVariable control_cond_var;

HashMap<Thread::ID, int> thread_ids;
HashMap<
Expand Down Expand Up @@ -155,7 +169,7 @@ class WorkerThreadPool : public Object {

void _process_task(Task *task);

void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);

bool _try_promote_low_priority_task();
Expand Down Expand Up @@ -198,9 +212,9 @@ class WorkerThreadPool : public Object {

void _switch_runlevel(Runlevel p_runlevel);
#ifdef DEV_ENABLED
bool _check_runlevel_needs_handling();
bool _check_runlevel_needs_handling(const ThreadData *p_thread_data);
#endif
bool _handle_runlevel();
bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);

#ifdef THREADS_ENABLED
static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
Expand Down Expand Up @@ -265,6 +279,7 @@ class WorkerThreadPool : public Object {
#endif

void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
void exit_languages_threads();
void finish();
WorkerThreadPool();
~WorkerThreadPool();
Expand Down
2 changes: 2 additions & 0 deletions main/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4501,6 +4501,8 @@ void Main::cleanup(bool p_force) {
ResourceLoader::clear_translation_remaps();
ResourceLoader::clear_path_remaps();

WorkerThreadPool::get_singleton()->exit_languages_threads();

ScriptServer::finish_languages();

// Sync pending commands that may have been queued from a different thread during ScriptServer finalization
Expand Down

0 comments on commit 3e06192

Please sign in to comment.