diff --git a/BUILD.bazel b/BUILD.bazel index 088646b226..1e2e67e7f5 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -39,6 +39,7 @@ expand_template( "@ILMTHREAD_NAMESPACE@": "IlmThread", "#cmakedefine01 ILMTHREAD_HAVE_POSIX_SEMAPHORES": "#define ILMTHREAD_HAVE_POSIX_SEMAPHORES 0", "#cmakedefine01 ILMTHREAD_THREADING_ENABLED": "#define ILMTHREAD_THREADING_ENABLED 1", + "#cmakedefine01 ILMTHREAD_USE_TBB": "#define ILMTHREAD_USE_TBB 0", }, template = "cmake/IlmThreadConfig.h.in", ) diff --git a/cmake/IlmThreadConfig.h.in b/cmake/IlmThreadConfig.h.in index 576f09a035..7c4339ca56 100644 --- a/cmake/IlmThreadConfig.h.in +++ b/cmake/IlmThreadConfig.h.in @@ -17,6 +17,7 @@ #cmakedefine01 ILMTHREAD_THREADING_ENABLED #cmakedefine01 ILMTHREAD_HAVE_POSIX_SEMAPHORES +#cmakedefine01 ILMTHREAD_USE_TBB // // Current internal library namespace name diff --git a/cmake/OpenEXRSetup.cmake b/cmake/OpenEXRSetup.cmake index a63acebc11..c3291bd2c8 100644 --- a/cmake/OpenEXRSetup.cmake +++ b/cmake/OpenEXRSetup.cmake @@ -42,6 +42,14 @@ option(OPENEXR_INSTALL_PKG_CONFIG "Install OpenEXR.pc file" ON) # Whether to enable threading. This can be disabled, although thread pool and tasks # are still used, just processed immediately option(OPENEXR_ENABLE_THREADING "Enables threaded processing of requests" ON) +# When set to ON, will change the thread pool to use TBB for the +# global thread pool by default. 
+# +# Regardless of this setting, if you create your own additional thread +# pools, those will NOT use TBB by default, as it can easily cause +# recursive mutex deadlocks as TBB shares a single thread pool with +# multiple arenas +option(OPENEXR_USE_TBB "Switch internals of IlmThreadPool to use TBB by default" OFF) option(OPENEXR_USE_DEFAULT_VISIBILITY "Makes the compile use default visibility (by default compiles tidy, hidden-by-default)" OFF) @@ -170,7 +178,14 @@ if(OPENEXR_ENABLE_THREADING) message(FATAL_ERROR "Unable to find a threading library, disable with OPENEXR_ENABLE_THREADING=OFF") endif() endif() + if(OPENEXR_USE_TBB) + find_package(TBB) + if(NOT TBB_FOUND) + message(FATAL_ERROR "Unable to find the OneTBB cmake library, disable with OPENEXR_USE_TBB=OFF or fix TBB install") + endif() + endif() endif() +set (ILMTHREAD_USE_TBB ${OPENEXR_USE_TBB}) option(OPENEXR_FORCE_INTERNAL_DEFLATE "Force using an internal libdeflate" OFF) set(OPENEXR_DEFLATE_REPO "https://github.com/ebiggers/libdeflate.git" CACHE STRING "Repo path for libdeflate source") diff --git a/src/lib/IlmThread/CMakeLists.txt b/src/lib/IlmThread/CMakeLists.txt index f2ebc95cc4..2eefd05aab 100644 --- a/src/lib/IlmThread/CMakeLists.txt +++ b/src/lib/IlmThread/CMakeLists.txt @@ -27,6 +27,9 @@ openexr_define_library(IlmThread ) if(OPENEXR_ENABLE_THREADING) + if (ILMTHREAD_USE_TBB) + target_link_libraries(IlmThread PUBLIC TBB::tbb) + endif() target_link_libraries(IlmThread PUBLIC Threads::Threads) endif() diff --git a/src/lib/IlmThread/IlmThreadPool.cpp b/src/lib/IlmThread/IlmThreadPool.cpp index 067490fea7..af7de81f9f 100644 --- a/src/lib/IlmThread/IlmThreadPool.cpp +++ b/src/lib/IlmThread/IlmThreadPool.cpp @@ -27,12 +27,16 @@ # include #endif -ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER - #if ILMTHREAD_THREADING_ENABLED # define ENABLE_THREADING +# if ILMTHREAD_USE_TBB +# include +using namespace oneapi; +# endif #endif +ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER + namespace { @@ -56,6 +60,7 @@ 
handleProcessTask (Task* task) } } +#ifdef ENABLE_THREADING struct DefaultThreadPoolData { Semaphore _taskSemaphore; // threads wait on this for ready tasks @@ -81,6 +86,7 @@ struct DefaultThreadPoolData _stopping = false; } }; +#endif } // namespace @@ -95,11 +101,11 @@ struct TaskGroup::Data Data (Data&&) = delete; Data& operator= (Data&&) = delete; + void waitForEmpty (); + void addTask (); void removeTask (); - void waitForEmpty (); - std::atomic numPending; std::atomic inFlight; Semaphore isEmpty; // used to signal that the taskgroup is empty @@ -110,10 +116,11 @@ struct ThreadPool::Data using ProviderPtr = std::shared_ptr; Data (); + Data (ThreadPoolProvider *p); ~Data (); Data (const Data&) = delete; Data& operator= (const Data&) = delete; - Data (Data&&) = delete; + Data (Data&&) = default; Data& operator= (Data&&) = delete; ProviderPtr getProvider () const { return std::atomic_load (&_provider); } @@ -130,6 +137,63 @@ struct ThreadPool::Data namespace { +#if ILMTHREAD_USE_TBB +class TBBThreadPoolProvider : public ThreadPoolProvider +{ +public: + TBBThreadPoolProvider (int count) { setNumThreads (count); } + TBBThreadPoolProvider (const TBBThreadPoolProvider&) = delete; + TBBThreadPoolProvider& + operator= (const TBBThreadPoolProvider&) = delete; + TBBThreadPoolProvider (TBBThreadPoolProvider&&) = delete; + TBBThreadPoolProvider& operator= (TBBThreadPoolProvider&&) = delete; + ~TBBThreadPoolProvider () noexcept override + { + finish (); + } + + int numThreads () const override + { + return _arena ? 
_arena->max_concurrency () : 1; + } + void setNumThreads (int count) override + { + if (_arena) + _arena->terminate (); + _arena.reset (); + + if (count > 1) + { + _arena = std::make_unique (count); + _arena->initialize (); + } + } + + void addTask (Task* task) override + { + if (_arena) + { + _arena->enqueue ([=] () + { + handleProcessTask (task); + }); + } + else + handleProcessTask (task); + } + + void finish () override + { + if (_arena) + _arena->terminate (); + _arena.reset(); + } +private: + + std::unique_ptr _arena; +}; +#endif + // // class DefaultThreadPoolProvider // @@ -331,7 +395,8 @@ DefaultThreadPoolProvider::threadLoop ( // struct TaskGroup::Data // -TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1) +TaskGroup::Data::Data () + : numPending (0), inFlight (0), isEmpty (1) {} TaskGroup::Data::~Data () @@ -402,6 +467,12 @@ ThreadPool::Data::Data () // empty } +ThreadPool::Data::Data (ThreadPoolProvider *p) + : _provider (p) +{ + // empty +} + ThreadPool::Data::~Data () { setProvider (nullptr); @@ -485,6 +556,17 @@ ThreadPool::ThreadPool (unsigned nthreads) #endif } +// private constructor to avoid multiple calls +ThreadPool::ThreadPool (Data&& d) + : +#ifdef ENABLE_THREADING + _data (new Data (std::move (d))) +#else + _data (nullptr) +#endif +{ +} + ThreadPool::~ThreadPool () { #ifdef ENABLE_THREADING @@ -580,9 +662,19 @@ ThreadPool::globalThreadPool () // // The global thread pool // - +#if ILMTHREAD_USE_TBB + // Use TBB for the global thread pool by default + // + // We do not (currently) use this as the default thread pool + // provider as it can easily cause recursive mutex deadlocks as + // TBB shares a single thread pool with multiple arenas + static ThreadPool gThreadPool ( + ThreadPool::Data ( + new TBBThreadPoolProvider ( + tbb::this_task_arena::max_concurrency ()))); +#else static ThreadPool gThreadPool (0); - +#endif return gThreadPool; } @@ -596,24 +688,28 @@ unsigned ThreadPool::estimateThreadCountForFileIO () { #ifdef 
ENABLE_THREADING +# if ILMTHREAD_USE_TBB + return tbb::this_task_arena::max_concurrency (); +# else unsigned rv = std::thread::hardware_concurrency (); // hardware concurrency is not required to work if (rv == 0 || rv > static_cast (std::numeric_limits::max ())) { rv = 1; -# if (defined(_WIN32) || defined(_WIN64)) +# if (defined(_WIN32) || defined(_WIN64)) SYSTEM_INFO si; GetNativeSystemInfo (&si); rv = si.dwNumberOfProcessors; -# else +# else // linux, bsd, and mac are fine with this // other *nix should be too, right? rv = sysconf (_SC_NPROCESSORS_ONLN); -# endif +# endif } return rv; +# endif #else return 0; #endif diff --git a/src/lib/IlmThread/IlmThreadPool.h b/src/lib/IlmThread/IlmThreadPool.h index 0d89742da3..f8472c30ac 100644 --- a/src/lib/IlmThread/IlmThreadPool.h +++ b/src/lib/IlmThread/IlmThreadPool.h @@ -150,6 +150,9 @@ class ILMTHREAD_EXPORT_TYPE ThreadPool protected: Data* _data; + +private: + ILMTHREAD_HIDDEN ThreadPool (Data&& d); }; class ILMTHREAD_EXPORT_TYPE Task diff --git a/src/test/OpenEXRTest/testMultiPartThreading.cpp b/src/test/OpenEXRTest/testMultiPartThreading.cpp index fc25bc21d3..cd3ca56892 100644 --- a/src/test/OpenEXRTest/testMultiPartThreading.cpp +++ b/src/test/OpenEXRTest/testMultiPartThreading.cpp @@ -620,17 +620,19 @@ generateRandomFile (int partCount, const std::string& fn) // // Writing tasks. 
// - TaskGroup taskGroup; ThreadPool* threadPool = new ThreadPool (32); - vector list; - for (int i = 0; i < taskListSize; i++) { - list.push_back (&taskList[i]); - if (i % 10 == 0 || i == taskListSize - 1) + TaskGroup taskGroup; + vector list; + for (int i = 0; i < taskListSize; i++) { - threadPool->addTask ( - (new WritingTask (&taskGroup, &file, list, tiledFrameBuffers))); - list.clear (); + list.push_back (&taskList[i]); + if (i % 10 == 0 || i == taskListSize - 1) + { + threadPool->addTask ( + (new WritingTask (&taskGroup, &file, list, tiledFrameBuffers))); + list.clear (); + } } } diff --git a/website/install.rst b/website/install.rst index 3730d7aa10..36d03f2614 100644 --- a/website/install.rst +++ b/website/install.rst @@ -89,6 +89,7 @@ Make sure these are installed on your system before building OpenEXR: * C++ compiler that supports C++11 * Imath (auto fetched by CMake if not found) (https://github.com/AcademySoftwareFoundation/openexr) * libdeflate source code (auto fetched by CMake if not found) (https://github.com/ebiggers/libdeflate) +* (optional) Intel's Threading Building Blocks library (TBB) The instructions that follow describe building OpenEXR with CMake. @@ -388,6 +389,30 @@ local filesystem via a ``file:`` url: cmake -DOPENEXR_IMAGES_REPO=file:///my/clone/of/openexr-images -DOPENEXR_IMAGES_TAG="" +TBB Dependency +~~~~~~~~~~~~~~ + +OpenEXR can optionally use the TBB library as the thread provider for +the default global thread pool. This allows applications which also +use TBB for other purposes to lower the number of active threads. With +high core count machines more prevalent, this can significantly lower +the number of active threads and so improve the available resources, +especially when compiling with a static library and using plugins +which use OpenEXR. + +This is disabled by default, but when turned on, assumes the OneAPI +version of TBB which provides cmake modules. 
This ONLY changes the +global thread pool as otherwise this can cause mutex deadlocks if you +create other ThreadPools thinking that they are separate threads (i.e. +the previous use case), but TBB shares actual threads and uses an +arena to control thread usage. + +To enable this, set the flag during config: + +.. code-block:: + + cmake -DOPENEXR_USE_TBB=ON ... + Namespace Options ~~~~~~~~~~~~~~~~~