Skip to content

Commit

Permalink
ResourceLoader: Add thread-aware resource changed mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
RandomShaper committed Sep 5, 2024
1 parent fd426da commit d3ff1b8
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 11 deletions.
23 changes: 12 additions & 11 deletions core/io/resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
#include <stdio.h>

void Resource::emit_changed() {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the connection happen on the call queue, later, since signals are not thread-safe.
call_deferred("emit_signal", CoreStringName(changed));
} else {
emit_signal(CoreStringName(changed));
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
ResourceLoader::resource_changed_emit(this);
return;
}

emit_signal(CoreStringName(changed));
}

void Resource::_resource_path_changed() {
Expand Down Expand Up @@ -166,22 +166,23 @@ bool Resource::editor_can_reload_from_file() {
}

void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the check and connection happen on the call queue, later, since signals are not thread-safe.
callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags);
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
ERR_FAIL_COND_MSG(p_flags != 0, "Resource changed signaling doesn't support flags during load.");
ResourceLoader::resource_changed_connect(this, p_callable);
return;
}

if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) {
connect(CoreStringName(changed), p_callable, p_flags);
}
}

void Resource::disconnect_changed(const Callable &p_callable) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
// Let the check and disconnection happen on the call queue, later, since signals are not thread-safe.
callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable);
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
ResourceLoader::resource_changed_disconnect(this, p_callable);
return;
}

if (is_connected(CoreStringName(changed), p_callable)) {
disconnect(CoreStringName(changed), p_callable);
}
Expand Down
82 changes: 82 additions & 0 deletions core/io/resource_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "resource_loader.h"

#include "core/config/project_settings.h"
#include "core/core_bind.h"
#include "core/io/file_access.h"
#include "core/io/resource_importer.h"
#include "core/object/script_language.h"
Expand Down Expand Up @@ -323,6 +324,8 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
// This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame.
void ResourceLoader::_run_load_task(void *p_userdata) {
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
ThreadLoadTask *curr_load_task_backup = curr_load_task;
curr_load_task = &load_task;

thread_load_mutex.lock();
if (cleaning_tasks) {
Expand Down Expand Up @@ -459,6 +462,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
}
DEV_ASSERT(load_paths_stack.is_empty());
}

curr_load_task = curr_load_task_backup;
}

static String _validate_local_path(const String &p_path) {
Expand Down Expand Up @@ -814,6 +819,39 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
if (r_error) {
*r_error = load_task_ptr->error;
}

if (resource.is_valid()) {
if (curr_load_task) {
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
DEV_ASSERT(curr_load_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
curr_load_task->resource_changed_connections.push_back(rcc);
}
} else {
// A leaf task being awaited => Propagate the resource changed connections.
if (Thread::is_main_thread()) {
// On the main thread it's safe to migrate the connections to the standard signal mechanism.
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
if (rcc.callable.is_valid()) {
rcc.source->connect_changed(rcc.callable);
}
}
} else {
// On non-main threads, we have to queue and call it done when processed.
if (!load_task_ptr->resource_changed_connections.is_empty()) {
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
if (rcc.callable.is_valid()) {
MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable));
}
}
core_bind::Semaphore done;
MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post));
done.wait();
}
}
}
}

return resource;
}

Expand All @@ -828,6 +866,49 @@ bool ResourceLoader::_ensure_load_progress() {
return true;
}

void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));

Check failure on line 870 in core/io/resource_loader.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor with clang sanitizers (target=editor, tests=yes, dev_build=yes, use_asan=yes, use_ubsan=yes, use_llvm=yes, linker=lld)

expected ')'

Check failure on line 870 in core/io/resource_loader.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor with ThreadSanitizer (target=editor, tests=yes, dev_build=yes, use_tsan=yes, use_llvm=yes, linker=lld)

expected ')'

MutexLock lock(thread_load_mutex);

for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
return;
}
}

ThreadLoadTask::ResourceChangedConnection rcc;
rcc.source = p_source;
rcc.callable = p_callable;
curr_load_task->resource_changed_connections.push_back(rcc);
}

void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));

Check failure on line 887 in core/io/resource_loader.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor with clang sanitizers (target=editor, tests=yes, dev_build=yes, use_asan=yes, use_ubsan=yes, use_llvm=yes, linker=lld)

expected ')'

Check failure on line 887 in core/io/resource_loader.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor with ThreadSanitizer (target=editor, tests=yes, dev_build=yes, use_tsan=yes, use_llvm=yes, linker=lld)

expected ')'

MutexLock lock(thread_load_mutex);

for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) {
const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i];
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
curr_load_task->resource_changed_connections.remove_at_unordered(i);
return;
}
}
}

void ResourceLoader::resource_changed_emit(Resource *p_source) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class()));

Check failure on line 901 in core/io/resource_loader.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor with clang sanitizers (target=editor, tests=yes, dev_build=yes, use_asan=yes, use_ubsan=yes, use_llvm=yes, linker=lld)

expected ')'

Check failure on line 901 in core/io/resource_loader.cpp

View workflow job for this annotation

GitHub Actions / 🐧 Linux / Editor with ThreadSanitizer (target=editor, tests=yes, dev_build=yes, use_tsan=yes, use_llvm=yes, linker=lld)

expected ')'

MutexLock lock(thread_load_mutex);

for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
if (unlikely(rcc.source == p_source)) {
rcc.callable.call();
}
}
}

Ref<Resource> ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) {
ERR_FAIL_COND_V(load_nesting == 0, Ref<Resource>()); // It makes no sense to use this from nesting level 0.
const String &local_path = _validate_local_path(p_path);
Expand Down Expand Up @@ -1358,6 +1439,7 @@ bool ResourceLoader::timestamp_on_load = false;
thread_local int ResourceLoader::load_nesting = 0;
thread_local Vector<String> ResourceLoader::load_paths_stack;
thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;
thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr;

SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() {
return ResourceLoader::thread_load_mutex;
Expand Down
11 changes: 11 additions & 0 deletions core/io/resource_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,20 @@ class ResourceLoader {
Ref<Resource> resource;
bool use_sub_threads = false;
HashSet<String> sub_tasks;

struct ResourceChangedConnection {
Resource *source = nullptr;
Callable callable;
};
LocalVector<ResourceChangedConnection> resource_changed_connections;
};

static void _run_load_task(void *p_userdata);

static thread_local int load_nesting;
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
static thread_local Vector<String> load_paths_stack;
static thread_local ThreadLoadTask *curr_load_task;

static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex();
Expand All @@ -216,6 +223,10 @@ class ResourceLoader {

static bool is_within_load() { return load_nesting > 0; };

static void resource_changed_connect(Resource *p_source, const Callable &p_callable);
static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable);
static void resource_changed_emit(Resource *p_source);

static Ref<Resource> load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr);
static bool exists(const String &p_path, const String &p_type_hint = "");

Expand Down

0 comments on commit d3ff1b8

Please sign in to comment.