Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to use TBB as the global provider #1852

Merged
merged 5 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ expand_template(
"@ILMTHREAD_NAMESPACE@": "IlmThread",
"#cmakedefine01 ILMTHREAD_HAVE_POSIX_SEMAPHORES": "#define ILMTHREAD_HAVE_POSIX_SEMAPHORES 0",
"#cmakedefine01 ILMTHREAD_THREADING_ENABLED": "#define ILMTHREAD_THREADING_ENABLED 1",
"#cmakedefine01 ILMTHREAD_USE_TBB": "#define ILMTHREAD_USE_TBB 0",
},
template = "cmake/IlmThreadConfig.h.in",
)
Expand Down
1 change: 1 addition & 0 deletions cmake/IlmThreadConfig.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#cmakedefine01 ILMTHREAD_THREADING_ENABLED
#cmakedefine01 ILMTHREAD_HAVE_POSIX_SEMAPHORES
#cmakedefine01 ILMTHREAD_USE_TBB

//
// Current internal library namespace name
Expand Down
15 changes: 15 additions & 0 deletions cmake/OpenEXRSetup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ option(OPENEXR_INSTALL_PKG_CONFIG "Install OpenEXR.pc file" ON)
# Whether to enable threading. This can be disabled, although thread pool and tasks
# are still used, just processed immediately
option(OPENEXR_ENABLE_THREADING "Enables threaded processing of requests" ON)
# When set to ON, will change the thread pool to use TBB for the
# global thread pool by default.
#
# Regardless of this setting, if you create your own additional thread
# pools, those will NOT use TBB by default, as it can easily cause
# recursive mutex deadlocks as TBB shares a single thread pool with
# multiple arenas
option(OPENEXR_USE_TBB "Switch internals of IlmThreadPool to use TBB by default" OFF)

option(OPENEXR_USE_DEFAULT_VISIBILITY "Makes the compile use default visibility (by default compiles tidy, hidden-by-default)" OFF)

Expand Down Expand Up @@ -170,7 +178,14 @@ if(OPENEXR_ENABLE_THREADING)
message(FATAL_ERROR "Unable to find a threading library, disable with OPENEXR_ENABLE_THREADING=OFF")
endif()
endif()
if(OPENEXR_USE_TBB)
find_package(TBB)
if(NOT TBB_FOUND)
message(FATAL_ERROR "Unable to find the OneTBB cmake library, disable with ILMTHREAD_USE_TBB=OFF or fix TBB install")
endif()
endif()
endif()
set (ILMTHREAD_USE_TBB ${OPENEXR_USE_TBB})

option(OPENEXR_FORCE_INTERNAL_DEFLATE "Force using an internal libdeflate" OFF)
set(OPENEXR_DEFLATE_REPO "https://github.com/ebiggers/libdeflate.git" CACHE STRING "Repo path for libdeflate source")
Expand Down
3 changes: 3 additions & 0 deletions src/lib/IlmThread/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ openexr_define_library(IlmThread
)

if(OPENEXR_ENABLE_THREADING)
if (ILMTHREAD_USE_TBB)
target_link_libraries(IlmThread PUBLIC TBB::tbb)
endif()
target_link_libraries(IlmThread PUBLIC Threads::Threads)
endif()

118 changes: 107 additions & 11 deletions src/lib/IlmThread/IlmThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
# include <unistd.h>
#endif

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER

#if ILMTHREAD_THREADING_ENABLED
# define ENABLE_THREADING
# if ILMTHREAD_USE_TBB
# include <oneapi/tbb/task_arena.h>
using namespace oneapi;
# endif
#endif

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER

namespace
{

Expand All @@ -56,6 +60,7 @@ handleProcessTask (Task* task)
}
}

#ifdef ENABLE_THREADING
struct DefaultThreadPoolData
{
Semaphore _taskSemaphore; // threads wait on this for ready tasks
Expand All @@ -81,6 +86,7 @@ struct DefaultThreadPoolData
_stopping = false;
}
};
#endif

} // namespace

Expand All @@ -95,11 +101,11 @@ struct TaskGroup::Data
Data (Data&&) = delete;
Data& operator= (Data&&) = delete;

void waitForEmpty ();

void addTask ();
void removeTask ();

void waitForEmpty ();

std::atomic<int> numPending;
std::atomic<int> inFlight;
Semaphore isEmpty; // used to signal that the taskgroup is empty
Expand All @@ -110,10 +116,11 @@ struct ThreadPool::Data
using ProviderPtr = std::shared_ptr<ThreadPoolProvider>;

Data ();
Data (ThreadPoolProvider *p);
~Data ();
Data (const Data&) = delete;
Data& operator= (const Data&) = delete;
Data (Data&&) = delete;
Data (Data&&) = default;
Data& operator= (Data&&) = delete;

ProviderPtr getProvider () const { return std::atomic_load (&_provider); }
Expand All @@ -130,6 +137,63 @@ struct ThreadPool::Data
namespace
{

#if ILMTHREAD_USE_TBB
class TBBThreadPoolProvider : public ThreadPoolProvider
{
public:
TBBThreadPoolProvider (int count) { setNumThreads (count); }
TBBThreadPoolProvider (const TBBThreadPoolProvider&) = delete;
TBBThreadPoolProvider&
operator= (const TBBThreadPoolProvider&) = delete;
TBBThreadPoolProvider (TBBThreadPoolProvider&&) = delete;
TBBThreadPoolProvider& operator= (TBBThreadPoolProvider&&) = delete;
~TBBThreadPoolProvider () noexcept override
{
finish ();
}

int numThreads () const override
{
return _arena ? _arena->max_concurrency () : 1;
}
void setNumThreads (int count) override
{
if (_arena)
_arena->terminate ();
_arena.reset ();

if (count > 1)
{
_arena = std::make_unique<tbb::task_arena> (count);
_arena->initialize ();
}
}

void addTask (Task* task) override
{
if (_arena)
{
_arena->enqueue ([=] ()
{
handleProcessTask (task);
});
}
else
handleProcessTask (task);
}

void finish () override
{
if (_arena)
_arena->terminate ();
_arena.reset();
}
private:

std::unique_ptr<tbb::task_arena> _arena;
};
#endif

//
// class DefaultThreadPoolProvider
//
Expand Down Expand Up @@ -331,7 +395,8 @@ DefaultThreadPoolProvider::threadLoop (
// struct TaskGroup::Data
//

TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1)
TaskGroup::Data::Data ()
: numPending (0), inFlight (0), isEmpty (1)
{}

TaskGroup::Data::~Data ()
Expand Down Expand Up @@ -402,6 +467,12 @@ ThreadPool::Data::Data ()
// empty
}

ThreadPool::Data::Data (ThreadPoolProvider *p)
: _provider (p)
{
// empty
}

ThreadPool::Data::~Data ()
{
setProvider (nullptr);
Expand Down Expand Up @@ -485,6 +556,17 @@ ThreadPool::ThreadPool (unsigned nthreads)
#endif
}

// private constructor to avoid multiple calls
ThreadPool::ThreadPool (Data&& d)
:
#ifdef ENABLE_THREADING
_data (new Data (std::move (d)))
#else
_data (nullptr)
#endif
{
}

ThreadPool::~ThreadPool ()
{
#ifdef ENABLE_THREADING
Expand Down Expand Up @@ -580,9 +662,19 @@ ThreadPool::globalThreadPool ()
//
// The global thread pool
//

#if ILMTHREAD_USE_TBB
// Use TBB for the global thread pool by default
//
// We do not (currently) use this as the default thread pool
// provider as it can easily cause recursive mutex deadlocks as
// TBB shares a single thread pool with multiple arenas
static ThreadPool gThreadPool (
ThreadPool::Data (
new TBBThreadPoolProvider (
tbb::this_task_arena::max_concurrency ())));
#else
static ThreadPool gThreadPool (0);

#endif
return gThreadPool;
}

Expand All @@ -596,24 +688,28 @@ unsigned
ThreadPool::estimateThreadCountForFileIO ()
{
#ifdef ENABLE_THREADING
# if ILMTHREAD_USE_TBB
return tbb::this_task_arena::max_concurrency ();
# else
unsigned rv = std::thread::hardware_concurrency ();
// hardware concurrency is not required to work
if (rv == 0 ||
rv > static_cast<unsigned> (std::numeric_limits<int>::max ()))
{
rv = 1;
# if (defined(_WIN32) || defined(_WIN64))
# if (defined(_WIN32) || defined(_WIN64))
SYSTEM_INFO si;
GetNativeSystemInfo (&si);

rv = si.dwNumberOfProcessors;
# else
# else
// linux, bsd, and mac are fine with this
// other *nix should be too, right?
rv = sysconf (_SC_NPROCESSORS_ONLN);
# endif
# endif
}
return rv;
# endif
#else
return 0;
#endif
Expand Down
3 changes: 3 additions & 0 deletions src/lib/IlmThread/IlmThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ class ILMTHREAD_EXPORT_TYPE ThreadPool

protected:
Data* _data;

private:
ILMTHREAD_HIDDEN ThreadPool (Data&& d);
};

class ILMTHREAD_EXPORT_TYPE Task
Expand Down
18 changes: 10 additions & 8 deletions src/test/OpenEXRTest/testMultiPartThreading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,17 +620,19 @@ generateRandomFile (int partCount, const std::string& fn)
//
// Writing tasks.
//
TaskGroup taskGroup;
ThreadPool* threadPool = new ThreadPool (32);
vector<WritingTaskData*> list;
for (int i = 0; i < taskListSize; i++)
{
list.push_back (&taskList[i]);
if (i % 10 == 0 || i == taskListSize - 1)
TaskGroup taskGroup;
vector<WritingTaskData*> list;
for (int i = 0; i < taskListSize; i++)
{
threadPool->addTask (
(new WritingTask (&taskGroup, &file, list, tiledFrameBuffers)));
list.clear ();
list.push_back (&taskList[i]);
if (i % 10 == 0 || i == taskListSize - 1)
{
threadPool->addTask (
(new WritingTask (&taskGroup, &file, list, tiledFrameBuffers)));
list.clear ();
}
}
}

Expand Down
25 changes: 25 additions & 0 deletions website/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Make sure these are installed on your system before building OpenEXR:
* C++ compiler that supports C++11
* Imath (auto fetched by CMake if not found) (https://github.com/AcademySoftwareFoundation/openexr)
* libdeflate source code (auto fetched by CMake if not found) (https://github.com/ebiggers/libdeflate)
* (optional) Intel's Thread Building Blocks library (TBB)

The instructions that follow describe building OpenEXR with CMake.

Expand Down Expand Up @@ -388,6 +389,30 @@ local filesystem via a ``file:`` url:

cmake -DOPENEXR_IMAGES_REPO=file:///my/clone/of/openexr-images -DOPENEXR_IMAGES_TAG=""

TBB Dependency
~~~~~~~~~~~~~~

OpenEXR can optionally use the TBB library as the default global
thread pool as a thread provider. This allows applications which also
use TBB for other purposes to lower the number of active threads. With
high core count machines more prevalent, this can signficantly lower
the number of active threads and so the improve available resources
especially when compiling with a static library and using plugins
which use OpenEXR.

This is disabled by default, but when turned on, assumes the OneAPI
version of TBB which provides cmake modules. This ONLY changes the
global thread pool as otherwise this can cause mutex deadlocks if you
create other ThreadPools thinking that they are separate threads (i.e.
the previous use case), but TBB shares actual threads and uses an
arena to control thread usage.

To enable this, set the flag during config:

.. code-block::

cmake -DOPENEXR_USE_TBB=ON ...

Namespace Options
~~~~~~~~~~~~~~~~~

Expand Down
Loading