A Platform-Independent Thread Pool Using C++14

Introduction

One of the major benefits provided by the new generation of graphics APIs is much better support for multithreaded command list generation and submission.  It’s not uncommon for computers nowadays to contain 2, 4, 8, or even 16 core processors.  The goal of the solution in this post is to ensure we can use the power our CPU provides, not just for generating graphics command lists, but for any task that can be easily parallelized.

At its simplest, a thread pool is a collection of threads that run continuously waiting to take on a task to complete.  If there’s no task available, they yield or sleep for some amount of time, wake back up, and check again.  When a task is available, one of the waiting threads claims it, runs it, and returns to the waiting state.

The reason we would want to use a thread pool instead of creating a new thread for each task we want to run on a separate thread is to save on the time it would otherwise take to construct a thread, submit work to it, and destroy it when it’s done running.  With a small collection of threads continuously running and waiting on tasks, we’re only left with the middle step - work submission.

Implementation

The thread pool presented here is based on the implementation provided in [1].  It has been updated to use variadic templates for added flexibility when submitting jobs.

A Thread-Safe Queue

Before we build the pool itself, we need a means of submitting work in a thread-safe manner.  Jobs should be picked up in the same order they are submitted to the pool, which means a queue is a good candidate.  Jobs are pushed to the back of the queue, and popped from the front.

/**
 * The ThreadSafeQueue class.
 * Provides a wrapper around a basic queue to provide thread safety.
 */
#pragma once

#ifndef THREADSAFEQUEUE_HPP
#define THREADSAFEQUEUE_HPP

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

namespace MyNamespace
{
    template <typename T>
    class ThreadSafeQueue
    {
    public:
        /**
         * Destructor.
         */
        ~ThreadSafeQueue(void)
        {
            invalidate();
        }

        /**
         * Attempt to get the first value in the queue.
         * Returns true if a value was successfully written to the out parameter, false otherwise.
         */
        bool tryPop(T& out)
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            if(m_queue.empty() || !m_valid)
            {
                return false;
            }
            out = std::move(m_queue.front());
            m_queue.pop();
            return true;
        }

        /**
         * Get the first value in the queue.
         * Will block until a value is available unless clear is called or the instance is destructed.
         * Returns true if a value was successfully written to the out parameter, false otherwise.
         */
        bool waitPop(T& out)
        {
            std::unique_lock<std::mutex> lock{m_mutex};
            m_condition.wait(lock, [this]()
            {
                return !m_queue.empty() || !m_valid;
            });
            /*
             * Using the condition in the predicate ensures that spurious wakeups with a valid
             * but empty queue will not proceed, so only need to check for validity before proceeding.
             */
            if(!m_valid)
            {
                return false;
            }
            out = std::move(m_queue.front());
            m_queue.pop();
            return true;
        }

        /**
         * Push a new value onto the queue.
         */
        void push(T value)
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_queue.push(std::move(value));
            m_condition.notify_one();
        }

        /**
         * Check whether or not the queue is empty.
         */
        bool empty(void) const
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            return m_queue.empty();
        }

        /**
         * Clear all items from the queue.
         */
        void clear(void)
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            while(!m_queue.empty())
            {
                m_queue.pop();
            }
            m_condition.notify_all();
        }

        /**
         * Invalidate the queue.
         * Used to ensure no conditions are being waited on in waitPop when
         * a thread or the application is trying to exit.
         * The queue is invalid after calling this method and it is an error
         * to continue using a queue after this method has been called.
         */
        void invalidate(void)
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_valid = false;
            m_condition.notify_all();
        }

        /**
         * Returns whether or not this queue is valid.
         */
        bool isValid(void) const
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            return m_valid;
        }

    private:
        std::atomic_bool m_valid{true};
        mutable std::mutex m_mutex;
        std::queue<T> m_queue;
        std::condition_variable m_condition;
    };
}

#endif

Most of this is pretty standard fare for designing a thread-safe class.  We lock a mutex any time we need to read or write data and provide a simplified interface over a std::queue where pops are checked for validity before being performed.  This is why tryPop and waitPop return bools for success and write to the provided out parameter in successful cases.

Any time push is called with a new task, it calls notify_one() on the condition variable which will wake one thread blocked on the condition.  The mutex is locked, the predicate is checked, and if all conditions are met (the queue is not empty and the queue is still valid), a task is popped and returned from the queue.

Because this queue provides a blocking method, waitPop, that depends on a condition variable being set to continue, it also needs a way to signal to anything waiting on the condition in the case that the queue needs to be deconstructed while there are threads still blocked on the condition.  This is accomplished through the invalidate() method that first sets the m_valid member to false and then calls notify_all() on the condition variable.  This will wake up every thread blocked on the condition and waitPop will return with a value of false, indicating to the call site that no work is being returned.

Another nicety the condition variable gives us is protection from spurious wakeups [3].  If a spurious wakeup does occur and the entire predicate isn’t met, the thread goes back to waiting.

The Thread Pool

The implementation of the thread pool is shown below.

/**
 * The ThreadPool class.
 * Keeps a set of threads constantly waiting to execute incoming jobs.
 */
#pragma once

#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP

#include "ThreadSafeQueue.hpp"

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

namespace MyNamespace
{
    class ThreadPool
    {
    private:
        class IThreadTask
        {
        public:
            IThreadTask(void) = default;
            virtual ~IThreadTask(void) = default;
            IThreadTask(const IThreadTask& rhs) = delete;
            IThreadTask& operator=(const IThreadTask& rhs) = delete;
            IThreadTask(IThreadTask&& other) = default;
            IThreadTask& operator=(IThreadTask&& other) = default;

            /**
             * Run the task.
             */
            virtual void execute() = 0;
        };

        template <typename Func>
        class ThreadTask: public IThreadTask
        {
        public:
            ThreadTask(Func&& func)
                :m_func{std::move(func)}
            {
            }

            ~ThreadTask(void) override = default;
            ThreadTask(const ThreadTask& rhs) = delete;
            ThreadTask& operator=(const ThreadTask& rhs) = delete;
            ThreadTask(ThreadTask&& other) = default;
            ThreadTask& operator=(ThreadTask&& other) = default;

            /**
             * Run the task.
             */
            void execute() override
            {
                m_func();
            }

        private:
            Func m_func;
        };

    public:
        /**
         * A wrapper around a std::future that adds the behavior of futures returned from std::async.
         * Specifically, this object will block and wait for execution to finish before going out of scope.
         */
        template <typename T>
        class TaskFuture
        {
        public:
            TaskFuture(std::future<T>&& future)
                :m_future{std::move(future)}
            {
            }

            TaskFuture(const TaskFuture& rhs) = delete;
            TaskFuture& operator=(const TaskFuture& rhs) = delete;
            TaskFuture(TaskFuture&& other) = default;
            TaskFuture& operator=(TaskFuture&& other) = default;
            ~TaskFuture(void)
            {
                if(m_future.valid())
                {
                    m_future.get();
                }
            }

            auto get(void)
            {
                return m_future.get();
            }


        private:
            std::future<T> m_future;
        };

    public:
        /**
         * Constructor.
         */
        ThreadPool(void)
            :ThreadPool{std::max(std::thread::hardware_concurrency(), 2u) - 1u}
        {
            /*
             * Always create at least one thread.  If hardware_concurrency() returns 0,
             * subtracting one would turn it to UINT_MAX, so get the maximum of
             * hardware_concurrency() and 2 before subtracting 1.
             */
        }

        /**
         * Constructor.
         */
        explicit ThreadPool(const std::uint32_t numThreads)
            :m_done{false},
            m_workQueue{},
            m_threads{}
        {
            try
            {
                for(std::uint32_t i = 0u; i < numThreads; ++i)
                {
                    m_threads.emplace_back(&ThreadPool::worker, this);
                }
            }
            catch(...)
            {
                destroy();
                throw;
            }
        }

        /**
         * Non-copyable.
         */
        ThreadPool(const ThreadPool& rhs) = delete;

        /**
         * Non-assignable.
         */
        ThreadPool& operator=(const ThreadPool& rhs) = delete;

        /**
         * Destructor.
         */
        ~ThreadPool(void)
        {
            destroy();
        }

        /**
         * Submit a job to be run by the thread pool.
         */
        template <typename Func, typename... Args>
        auto submit(Func&& func, Args&&... args)
        {
            auto boundTask = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
            using ResultType = std::result_of_t<decltype(boundTask)()>;
            using PackagedTask = std::packaged_task<ResultType()>;
            using TaskType = ThreadTask<PackagedTask>;
            
            PackagedTask task{std::move(boundTask)};
            TaskFuture<ResultType> result{task.get_future()};
            m_workQueue.push(std::make_unique<TaskType>(std::move(task)));
            return result;
        }

    private:
        /**
         * Constantly running function each thread uses to acquire work items from the queue.
         */
        void worker(void)
        {
            while(!m_done)
            {
                std::unique_ptr<IThreadTask> pTask{nullptr};
                if(m_workQueue.waitPop(pTask))
                {
                    pTask->execute();
                }
            }
        }

        /**
         * Invalidates the queue and joins all running threads.
         */
        void destroy(void)
        {
            m_done = true;
            m_workQueue.invalidate();
            for(auto& thread : m_threads)
            {
                if(thread.joinable())
                {
                    thread.join();
                }
            }
        }

    private:
        std::atomic_bool m_done;
        ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_workQueue;
        std::vector<std::thread> m_threads;
    };

    namespace DefaultThreadPool
    {
        /**
         * Get the default thread pool for the application.
         * This pool is created with std::thread::hardware_concurrency() - 1 threads.
         */
        inline ThreadPool& getThreadPool(void)
        {
            static ThreadPool defaultPool;
            return defaultPool;
        }

        /**
         * Submit a job to the default thread pool.
         */
        template <typename Func, typename... Args>
        inline auto submitJob(Func&& func, Args&&... args)
        {
            return getThreadPool().submit(std::forward<Func>(func), std::forward<Args>(args)...);
        }
    }
}

#endif

There are a few pieces to touch on here.  First, we have an IThreadTask interface that defines an execute() pure virtual function.  The reason for this interface is simply so we can maintain a collection of them in one container type (the ThreadSafeQueue<T>).  ThreadTask<T> implements IThreadTask and takes a callable type T for its template parameter.

When constructing the thread pool, we attempt to read the number of hardware threads available to the system by using std::thread::hardware_concurrency().  We always ensure the pool is started with at least one thread running, and ideally started with hardware_concurrency - 1 threads running.  The reason for the minus one will be discussed later.  For each thread available, we construct a std::thread object that runs the private member function worker().

The worker function’s only job is to endlessly check the queue to see if there is work to be done and execute the task if there is.  Since we’ve taken care to design the queue in a thread-safe manner, we don’t need to do any additional synchronization here.  The thread will enter the loop, get to waitPop, and either pop and execute a queued task, or wait on a task to become available via the submit function.  If waitPop returns true, we know pTask has been written to and can immediately execute it.  If it returns false, it most likely means that the queue has been invalidated.

The submit function is the public-facing interface of the thread pool.  It starts by creating a few handy type definitions that make the actual implementation easier to follow.  First, the provided function and its arguments are bound into a callable object with no parameters using std::bind.  We need this so our ThreadTask<T> class can call execute on its functor without having to know the arguments that came with the original function.  We then create a std::packaged_task from the bound task and extract the std::future from it before pushing the task onto the queue.  Here again, we do not need to do any additional synchronization due to the thread-safe implementation of the queue.

You’ll notice the std::future returned from the std::packaged_task is wrapped in a class called TaskFuture<T>.  This was a design decision driven by the way I intend to use the pool in my specific application.  I wanted the futures to mimic the way std::async futures work, specifically that they block until their work is complete when they go out of scope and are destructed.  Futures from std::packaged_task don’t do this out of the box, so we give them a simple wrapper to emulate the behavior [2].  Like std::future, TaskFuture is move-only, so the synchronization does not have to occur in the same method as the call site as long as the future is passed along from the method.

The queue’s invalidate method is called from the thread pool’s destroy() method, which runs from the destructor or when an exception is thrown while creating the threads in the constructor.  destroy() first sets the thread pool’s done marker to true, then invalidates the queue, and finally joins the threads.  The order is important to ensure that the threads know to exit their worker functions instead of re-attempting to obtain more work from the invalidated queue.  Due to the way the predicate is set up on the queue’s condition variable, it is not an error to re-enter waitPop on an invalidated queue since it will just return false, but it is a waste of time.

An optional nicety I decided to throw in is the DefaultThreadPool namespace.  This creates a thread pool with the maximum number of threads as discussed previously and is accessible from anywhere in the application that includes the thread pool header.  I prefer using this to having each subsystem own its own thread pool, but there’s nothing wrong with creating thread pool instances through the constructors, either.

Submitting Work to the Thread Pool

With the above in place, submitting work is as simple as including the thread pool’s header file and calling its submit function with a callable object and, optionally, arguments to be forwarded to it.

auto taskFuture = MyNamespace::DefaultThreadPool::submitJob([]()
{
    lengthyProcess();
});

auto taskFuture2 = MyNamespace::DefaultThreadPool::submitJob([](int a, float b)
{
    lengthyProcessWithArguments(a, b);
}, 5, 10.0f);

If submitting a reference for an argument, it is important to remember to wrap it with std::ref or std::cref.

MyObject obj;
auto taskFuture = MyNamespace::DefaultThreadPool::submitJob([](const MyObject& object)
{
    lengthyProcessThatNeedsToReadObject(object);
}, std::cref(obj));

Does It Work?

To ensure the thread pool and backing queue work not only in ideal cases, but also in the case where work is being submitted faster than the threads can take it on, we can write a little program that submits a bunch of jobs that sleep for a while and then synchronizes on them.  My machine reports eight as the result of std::thread::hardware_concurrency(), so I create a thread pool with seven threads.  The task I’m running is just to sleep whatever thread is executing for one second and finish.  I’ll submit twenty-one of these jobs to the pool.  We know that this would take about twenty-one seconds if executed serially, and since we’re running a thread pool with seven threads, we know that if everything is working well the jobs should all complete in about three seconds.

using namespace MyNamespace;
Timer saturationTimer;
const auto startTime = saturationTimer.tick();
std::vector<ThreadPool::TaskFuture<void>> v;
for(std::uint32_t i = 0u; i < 21u; ++i)
{
    v.push_back(DefaultThreadPool::submitJob([]()
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }));
}
for(auto& item: v)
{
    item.get();
}
const auto dt = saturationTimer.tick() - startTime;

Running the above code on my machine, the result is just about what would be expected, averaging around 3.005 seconds over a dozen runs.

About the Number of Pooled Threads

Earlier I mentioned that I start the thread pool with std::thread::hardware_concurrency() - 1 threads.  The reason for this is simple.  The thread that’s calling the thread pool is a perfectly valid thread to do work on while you’re waiting for the results of submitted tasks to become available.  Despite the example from the Does It Work? section, submitting a bunch of jobs and then just waiting on them to complete is hardly optimal, so it makes sense to have the thread pool executing up to NumThreads - 1 jobs and the main thread doing whatever work it can accomplish in the meantime.  Splitting the workload up evenly across all available threads is usually the best approach with a task-based setup like this.

Conclusion

This post has discussed what a thread pool is, why they’re useful, and how to get started implementing one.  There are very likely ways to make the provided thread pool more performant by specializing it more to avoid memory allocations on job submissions, but for my use cases I typically ensure the jobs being submitted are large enough that they make up for the time lost to allocating and deallocating memory with the time gained by running them in parallel with other large tasks.  Your mileage may vary, but at the very least you should have a solid start to customizing a thread pool to fit your exact needs.

Thank You

A big thank you to the members of /r/cpp who helped with code review and provided excellent feedback!

References

[1] Williams, Anthony.  C++ Concurrency in Action:  Practical Multithreading.  ISBN:  9781933988771

[2] http://scottmeyers.blogspot.com/2013/03/stdfutures-from-stdasync-arent-special.html

[3] http://en.cppreference.com/w/cpp/thread/condition_variable/wait
