Skip to content

Commit

Permalink
Update code to C++17
Browse files Browse the repository at this point in the history
  • Loading branch information
nschlia committed Aug 7, 2024
1 parent 22110e4 commit beaecfb
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
22 changes: 13 additions & 9 deletions src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void thread_pool::loop_function()

while (true)
{
THREADINFO info;
FunctionPointer info;
{
std::unique_lock<std::mutex> lock_queue_mutex(m_queue_mutex);
m_queue_cond.wait(lock_queue_mutex, [this]{ return (!m_thread_queue.empty() || m_queue_shutdown); });
Expand All @@ -73,13 +73,13 @@ void thread_pool::loop_function()
m_thread_queue.pop();
}

int ret = info.m_thread_func(info.m_opaque);
int ret = info();

Logging::trace(nullptr, "The job using pool thread no. %1 with id 0x%<" FFMPEGFS_FORMAT_PTHREAD_T ">2 has exited with return code %3.", thread_no, pthread_self(), ret);
}
}

bool thread_pool::schedule_thread(int (*thread_func)(void *), void *opaque)
bool thread_pool::schedule_thread(FunctionPointer &&func)
{
if (!m_queue_shutdown)
{
Expand All @@ -88,11 +88,7 @@ bool thread_pool::schedule_thread(int (*thread_func)(void *), void *opaque)
{
std::lock_guard<std::mutex> lock_queue_mutex(m_queue_mutex);

THREADINFO info;

info.m_thread_func = thread_func;
info.m_opaque = opaque;
m_thread_queue.push(info);
m_thread_queue.push(func);
}

m_queue_cond.notify_one();
Expand Down Expand Up @@ -122,8 +118,14 @@ unsigned int thread_pool::pool_size() const
return static_cast<unsigned int>(m_thread_pool.size());
}

void thread_pool::init(unsigned int num_threads /*= 0*/)
int thread_pool::init(unsigned int num_threads /*= 0*/)
{
if (!m_thread_pool.empty())
{
Logging::warning(nullptr, "The thread pool already initialised");
return 0;
}

if (num_threads)
{
m_num_threads = num_threads;
Expand All @@ -135,6 +137,8 @@ void thread_pool::init(unsigned int num_threads /*= 0*/)
{
m_thread_pool.emplace_back(std::thread(&thread_pool::loop_function_starter, std::ref(*this)));
}

return static_cast<int>(m_thread_pool.size());
}

void thread_pool::tear_down(bool silent)
Expand Down
16 changes: 8 additions & 8 deletions src/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@
#include <condition_variable>
#include <unistd.h>
#include <atomic>
#include <functional>

/**
* @brief The thread_pool class.
*/
class thread_pool
{
typedef struct THREADINFO /**< Thread info structure */
{
int (*m_thread_func)(void *); /**< Job function pointer */
void *m_opaque; /**< Parameter for job function */
} THREADINFO;
public:
typedef std::function<int(void)> FunctionPointer;

public:
/**
Expand All @@ -71,9 +69,11 @@ class thread_pool

/**
* @brief Initialise thread pool.
* Initialise the thread pool. Does nothing if called more than once.
* @param[in] num_threads - Optional: number of threads to create in pool. Defaults to Defaults to 4x number of CPU cores.
* @return Number of threads created on success; on error or if called more than once, returns 0.
*/
void init(unsigned int num_threads = 0);
int init(unsigned int num_threads = 0);
/**
* @brief Shut down the thread pool.
* @param[in] silent - If true, no log messages will be issued.
Expand All @@ -85,7 +85,7 @@ class thread_pool
* @param[in] opaque - Parameter passed to thread function.
* @return Returns true if thread was successfully scheduled, fals if not.
*/
bool schedule_thread(int (*thread_func)(void *), void *opaque);
bool schedule_thread(FunctionPointer && func);
/**
* @brief Get number of currently running threads.
* @return Returns number of currently running threads.
Expand Down Expand Up @@ -117,7 +117,7 @@ class thread_pool
std::vector<std::thread> m_thread_pool; /**< Thread pool */
std::mutex m_queue_mutex; /**< Mutex for critical section */
std::condition_variable m_queue_cond; /**< Condition for critical section */
std::queue<THREADINFO> m_thread_queue; /**< Thread queue parameters */
std::queue<FunctionPointer> m_thread_queue; /**< Thread queue parameters */
std::atomic_bool m_queue_shutdown; /**< If true all threads have been shut down */
unsigned int m_num_threads; /**< Max. number of threads. Defaults to 4x number of CPU cores. */
unsigned int m_cur_threads; /**< Current number of threads. */
Expand Down
9 changes: 4 additions & 5 deletions src/transcode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static Cache * cache; /**< @brief Global c
static std::atomic_bool thread_exit; /**< @brief Used for shutdown: if true, forcibly exit all threads */

static bool transcode(THREAD_DATA *thread_data, Cache_Entry *cache_entry, FFmpeg_Transcoder & transcoder, bool *timeout);
static int transcoder_thread(void *arg);
static int transcoder_thread(THREAD_DATA *thread_data);
static bool transcode_until(Cache_Entry* cache_entry, size_t offset, size_t len, uint32_t segment_no);
static int transcode_finish(Cache_Entry* cache_entry, FFmpeg_Transcoder & transcoder);

Expand Down Expand Up @@ -416,7 +416,7 @@ Cache_Entry* transcoder_new(LPVIRTUALFILE virtualfile, bool begin_transcode)
{
std::unique_lock<std::mutex> lock_thread_running_mutex(thread_data->m_thread_running_mutex);

tp->schedule_thread(&transcoder_thread, thread_data);
tp->schedule_thread(std::bind(&transcoder_thread, thread_data));

// Let decoder get into gear before returning from open
while (!thread_data->m_thread_running_lock_guard)
Expand Down Expand Up @@ -995,12 +995,11 @@ static bool transcode(THREAD_DATA *thread_data, Cache_Entry *cache_entry, FFmpeg

/**
* @brief Transcoding thread
* @param[in] arg - Corresponding Cache_Entry object.
* @param[in] thread_data - Corresponding Cache_Entry object.
* @returns Returns 0 on success; or errno code on error.
*/
static int transcoder_thread(void *arg)
static int transcoder_thread(THREAD_DATA *thread_data)
{
THREAD_DATA *thread_data = static_cast<THREAD_DATA*>(arg);
Cache_Entry *cache_entry = static_cast<Cache_Entry *>(thread_data->m_arg);
FFmpeg_Transcoder transcoder;
bool timeout = false;
Expand Down

0 comments on commit beaecfb

Please sign in to comment.