Skip to content

Commit

Permalink
added ThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
zdzhaoyong committed Jun 26, 2018
1 parent 58ff2da commit 05500b4
Showing 1 changed file with 85 additions and 0 deletions.
85 changes: 85 additions & 0 deletions GSLAM/core/Mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ typedef pi::Event Event;
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>
#include <future>
#include <queue>
#include "Glog.h"

namespace GSLAM{
typedef std::mutex Mutex;
Expand Down Expand Up @@ -74,6 +78,87 @@ class Event
std::condition_variable _cond;
};

// A simple threadpool implementation.
class ThreadPool {
public:
// All the threads are created upon construction.
explicit ThreadPool(const int num_threads): stop(false) {
CHECK_GE(num_threads, 1)
<< "The number of threads specified to the ThreadPool is insufficient.";
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty()) return;
task = std::move(this->tasks.front());
this->tasks.pop();
}

task();
}
});
}
}
~ThreadPool(){
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
worker.join();
}


// Adds a task to the threadpool.
template <class F, class... Args>
auto Add(F&& f, Args&& ... args)
->std::future<typename std::result_of<F(Args...)>::type>;

private:
// Keep track of threads so we can join them
std::vector<std::thread> workers;
// The task queue
std::queue<std::function<void()> > tasks;

// Synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;

};

// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::Add(F&& f, Args&& ... args)
->std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

// don't allow enqueueing after stopping the pool
CHECK(!stop) << "The ThreadPool object has been destroyed! Cannot add more "
"tasks to the ThreadPool!";

tasks.emplace([task]() {
(*task)();
});
}
condition.notify_one();
return res;
}

}
#endif

Expand Down

0 comments on commit 05500b4

Please sign in to comment.