diff --git a/src/thread_pool.cc b/src/thread_pool.cc index 35e773f6..4c7642f3 100644 --- a/src/thread_pool.cc +++ b/src/thread_pool.cc @@ -57,7 +57,7 @@ void thread_pool::loop_function() while (true) { - THREADINFO info; + FunctionPointer info; { std::unique_lock lock_queue_mutex(m_queue_mutex); m_queue_cond.wait(lock_queue_mutex, [this]{ return (!m_thread_queue.empty() || m_queue_shutdown); }); @@ -73,13 +73,13 @@ void thread_pool::loop_function() m_thread_queue.pop(); } - int ret = info.m_thread_func(info.m_opaque); + int ret = info(); Logging::trace(nullptr, "The job using pool thread no. %1 with id 0x%<" FFMPEGFS_FORMAT_PTHREAD_T ">2 has exited with return code %3.", thread_no, pthread_self(), ret); } } -bool thread_pool::schedule_thread(int (*thread_func)(void *), void *opaque) +bool thread_pool::schedule_thread(FunctionPointer &&func) { if (!m_queue_shutdown) { @@ -88,11 +88,7 @@ bool thread_pool::schedule_thread(int (*thread_func)(void *), void *opaque) { std::lock_guard lock_queue_mutex(m_queue_mutex); - THREADINFO info; - - info.m_thread_func = thread_func; - info.m_opaque = opaque; - m_thread_queue.push(info); + m_thread_queue.push(func); } m_queue_cond.notify_one(); @@ -122,8 +118,14 @@ unsigned int thread_pool::pool_size() const return static_cast(m_thread_pool.size()); } -void thread_pool::init(unsigned int num_threads /*= 0*/) +int thread_pool::init(unsigned int num_threads /*= 0*/) { + if (!m_thread_pool.empty()) + { + Logging::warning(nullptr, "The thread pool already initialised"); + return 0; + } + if (num_threads) { m_num_threads = num_threads; @@ -135,6 +137,8 @@ void thread_pool::init(unsigned int num_threads /*= 0*/) { m_thread_pool.emplace_back(std::thread(&thread_pool::loop_function_starter, std::ref(*this))); } + + return static_cast(m_thread_pool.size()); } void thread_pool::tear_down(bool silent) diff --git a/src/thread_pool.h b/src/thread_pool.h index ac743fbb..9076c94e 100644 --- a/src/thread_pool.h +++ b/src/thread_pool.h @@ -46,17 +46,15 @@ #include #include #include +#include /** * @brief The thread_pool class. */ class thread_pool { - typedef struct THREADINFO /**< Thread info structure */ - { - int (*m_thread_func)(void *); /**< Job function pointer */ - void *m_opaque; /**< Parameter for job function */ - } THREADINFO; +public: + typedef std::function FunctionPointer; public: /** @@ -71,9 +69,11 @@ class thread_pool /** * @brief Initialise thread pool. + * Initialise the thread pool. Does nothing if called more than once. * @param[in] num_threads - Optional: number of threads to create in pool. Defaults to Defaults to 4x number of CPU cores. + * @return Number of threads created on success; on error or if called more than once, returns 0. */ - void init(unsigned int num_threads = 0); + int init(unsigned int num_threads = 0); /** * @brief Shut down the thread pool. * @param[in] silent - If true, no log messages will be issued. @@ -85,7 +85,7 @@ class thread_pool * @param[in] opaque - Parameter passed to thread function. * @return Returns true if thread was successfully scheduled, fals if not. */ - bool schedule_thread(int (*thread_func)(void *), void *opaque); + bool schedule_thread(FunctionPointer && func); /** * @brief Get number of currently running threads. * @return Returns number of currently running threads. @@ -117,7 +117,7 @@ class thread_pool std::vector m_thread_pool; /**< Thread pool */ std::mutex m_queue_mutex; /**< Mutex for critical section */ std::condition_variable m_queue_cond; /**< Condition for critical section */ - std::queue m_thread_queue; /**< Thread queue parameters */ + std::queue m_thread_queue; /**< Thread queue parameters */ std::atomic_bool m_queue_shutdown; /**< If true all threads have been shut down */ unsigned int m_num_threads; /**< Max. number of threads. Defaults to 4x number of CPU cores. */ unsigned int m_cur_threads; /**< Current number of threads. */ diff --git a/src/transcode.cc b/src/transcode.cc index ea9ecab0..612b7ed0 100644 --- a/src/transcode.cc +++ b/src/transcode.cc @@ -64,7 +64,7 @@ static Cache * cache; /**< @brief Global c static std::atomic_bool thread_exit; /**< @brief Used for shutdown: if true, forcibly exit all threads */ static bool transcode(THREAD_DATA *thread_data, Cache_Entry *cache_entry, FFmpeg_Transcoder & transcoder, bool *timeout); -static int transcoder_thread(void *arg); +static int transcoder_thread(THREAD_DATA *thread_data); static bool transcode_until(Cache_Entry* cache_entry, size_t offset, size_t len, uint32_t segment_no); static int transcode_finish(Cache_Entry* cache_entry, FFmpeg_Transcoder & transcoder); @@ -416,7 +416,7 @@ Cache_Entry* transcoder_new(LPVIRTUALFILE virtualfile, bool begin_transcode) { std::unique_lock lock_thread_running_mutex(thread_data->m_thread_running_mutex); - tp->schedule_thread(&transcoder_thread, thread_data); + tp->schedule_thread(std::bind(&transcoder_thread, thread_data)); // Let decoder get into gear before returning from open while (!thread_data->m_thread_running_lock_guard) @@ -995,12 +995,11 @@ static bool transcode(THREAD_DATA *thread_data, Cache_Entry *cache_entry, FFmpeg /** * @brief Transcoding thread - * @param[in] arg - Corresponding Cache_Entry object. + * @param[in] thread_data - Corresponding Cache_Entry object. * @returns Returns 0 on success; or errno code on error. */ -static int transcoder_thread(void *arg) +static int transcoder_thread(THREAD_DATA *thread_data) { - THREAD_DATA *thread_data = static_cast(arg); Cache_Entry *cache_entry = static_cast(thread_data->m_arg); FFmpeg_Transcoder transcoder; bool timeout = false;