I wanted to quickly create a thread pooler that could be passed lambdas. Turns out it’s super simple. Here are the main things I wanted to accomplish:
- pass lambdas as tasks to be executed as soon as possible.
- be able to wait for all tasks to execute
- create a bunch of threads at initialization and then have them wait around for tasks, since it can be costly to create a new thread on the fly.
- be able to create as many poolers as I want for different uses.
The first thing I needed was a way to store a bunch of lambdas, of any sort, in a single vector. I decided the easiest way to do this was to pass the task parameters through the lambda’s captures. This is what an addTask call could look like:
int x, y, z;
mThreadPool.addTask([x, y, z]() {
    //task on x, y, z ...
});
The task should execute on another thread as soon as possible.
So in a loop, I’d call mThreadPool.addTask a bunch of times, and then I’d wait on them like so:
mThreadPool.wait();
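For example, a loop that fans work out over some chunks of data might look roughly like this (splitWork and processChunk are just placeholders for whatever the real work is):

//rough usage sketch; splitWork/processChunk are placeholders
auto chunks = splitWork(data);
for (auto& chunk : chunks) {
    mThreadPool.addTask([&chunk]() {
        processChunk(chunk);
    });
}
mThreadPool.wait(); //block here until every chunk is done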
To store the task lambdas inside the ThreadPool, I used an std::function:
typedef std::function<void()> Task;
std::vector< Task > mTasks;
std::mutex mTaskMutex;
This works because each task gets its parameters from the lambda’s captures rather than from the call operator, so every task has the same void() signature. There has to be a mutex because multiple threads are going to access the mTasks vector.
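As a quick illustration (not part of the pool itself), lambdas with completely different captures all convert to the same Task type, since none of them take arguments:

int count = 3;
double scale = 0.5;
Task a = [count]() { /* work that uses count */ };
Task b = [scale]() { /* work that uses scale */ };
//both fit in the same std::vector< Task >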
I wanted to create the entire thread pool at one time, so there are initialize and shutdown functions:
std::vector<std::thread> mThreads;
std::atomic<bool> mWorking{ false };

void initialize() {
    if (mWorking) EXCEPT << "ThreadPool Already Running!";
    mThreads.resize(std::thread::hardware_concurrency());
    //create the threads
    mWorking = true;
    for (auto& thread : mThreads) {
        thread = std::thread([&]() { threadWork(); });
    }
}

void stop() {
    mWorking = false;
}

void shutdown() {
    stop();
    for (auto& thread : mThreads) {
        if (thread.joinable()) thread.join();
    }
}
The threadWork function is where threads wait for tasks to perform and execute them as they become available:
std::atomic<bool> mWorking{ false };
std::atomic<int> mTasksExecutingCount{ 0 };

void threadWork() {
    Task task = nullptr;
    while (mWorking) {
        //get a task
        {
            std::scoped_lock lock(mTaskMutex);
            if (mTasks.size()) {
                ++mTasksExecutingCount;
                task = std::move(mTasks.back());
                mTasks.pop_back();
            }
        }
        if (task) {
            task();
            --mTasksExecutingCount;
            task = nullptr;
        }
        else
            std::this_thread::yield();
    }
}
The number of tasks currently executing is tracked with mTasksExecutingCount. When the ThreadPool is waiting, it spins until both that count and mTasks.size() are zero. Putting it all together, the whole class looks like this:
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class ThreadPool {
    std::vector<std::thread> mThreads;

    typedef std::function<void()> Task;
    std::vector< Task > mTasks;
    std::mutex mTaskMutex;

    std::atomic<bool> mWorking{ false };
    std::atomic<int> mTasksExecutingCount{ 0 };

    void threadWork() {
        Task task = nullptr;
        while (mWorking) {
            //get a task
            {
                std::scoped_lock lock(mTaskMutex);
                if (mTasks.size()) {
                    ++mTasksExecutingCount;
                    task = std::move(mTasks.back());
                    mTasks.pop_back();
                }
            }
            if (task) {
                task();
                --mTasksExecutingCount;
                task = nullptr;
            }
            else
                std::this_thread::yield();
        }
    }

public:
    void initialize() {
        //EXCEPT is a project-specific error macro
        if (mWorking || mTasksExecutingCount != 0) EXCEPT << "ThreadPool Already Running!";
        mThreads.resize(std::thread::hardware_concurrency());
        //create the threads
        mWorking = true;
        for (auto& thread : mThreads) {
            thread = std::thread([&]() { threadWork(); });
        }
    }

    void addTask(Task task) {
        std::scoped_lock lock(mTaskMutex);
        mTasks.push_back(task);
    }

    void wait() {
        do {
            {
                std::scoped_lock lock(mTaskMutex);
                if (mTasks.size() == 0 && mTasksExecutingCount == 0) break;
            }
            std::this_thread::yield();
        } while (mWorking);
    }

    void stop() {
        mWorking = false;
    }

    void shutdown() {
        stop();
        for (auto& thread : mThreads) {
            if (thread.joinable()) thread.join();
        }
    }
};
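Finally, a minimal usage sketch assuming the class above (the squaring here is just filler to show the pattern):

ThreadPool pool;
pool.initialize();

std::vector<int> results(100);
for (int i = 0; i < 100; ++i) {
    pool.addTask([i, &results]() {
        results[i] = i * i; //each task writes its own slot, so no extra locking needed
    });
}

pool.wait();     //block until all 100 tasks have finished
pool.shutdown(); //stop the workers and join the threads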