Create ThreadPool component with test suite

pull/53/head
Michael 6 years ago
parent 8a52bf448e
commit 3c5e3e79f9
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -267,6 +267,7 @@ set(LIB_PLATFORM_SRC
llarp/timer.cpp
# for threading
llarp/queue_manager.cpp
llarp/thread_pool.cpp
llarp/threadpool.cpp
# for android shim
${ANDROID_PLATFORM_SRC}
@ -515,6 +516,7 @@ set(TEST_SRC
test/test_dnsd_unit.cpp
test/test_llarp_queue.cpp
test/test_llarp_queue_manager.cpp
test/test_llarp_thread_pool.cpp
)

@ -0,0 +1,329 @@
#include <thread_pool.hpp>
namespace llarp
{
namespace thread
{
using LockGuard = std::unique_lock< std::mutex >;
void
ThreadPool::join()
{
for(auto& t : m_threads)
{
if(t.joinable())
{
t.join();
}
}
m_createdThreads = 0;
}
void
ThreadPool::runJobs()
{
while(m_status.load(std::memory_order_relaxed) == Status::Run)
{
auto functor = m_queue.tryPopFront();
if(functor.has_value())
{
functor.value()();
}
else
{
m_idleThreads++;
if(m_status == Status::Run && m_queue.empty())
{
m_semaphore.wait();
}
m_idleThreads.fetch_sub(1, std::memory_order_relaxed);
}
}
}
void
ThreadPool::drainQueue()
{
while(m_status.load(std::memory_order_relaxed) == Status::Drain)
{
auto functor = m_queue.tryPopFront();
if(!functor)
{
return;
}
functor.value()();
}
}
void
ThreadPool::waitThreads()
{
LockGuard lock(m_gateMutex);
m_threadsReadyCond.wait(
lock, [this]() { return m_numThreadsReady == m_threads.size(); });
}
void
ThreadPool::releaseThreads()
{
LockGuard lock(m_gateMutex);
m_numThreadsReady = 0;
++m_gateCount;
m_gateCond.notify_all();
}
void
ThreadPool::interrupt()
{
LockGuard lock(m_gateMutex);
size_t count = m_idleThreads;
for(size_t i = 0; i < count; ++i)
{
m_semaphore.notify();
}
}
void
ThreadPool::worker()
{
size_t gateCount = m_gateCount;
for(;;)
{
{
LockGuard lock(m_gateMutex);
++m_numThreadsReady;
m_threadsReadyCond.notify_one();
m_gateCond.wait(lock, [&]() { return gateCount != m_gateCount; });
gateCount = m_gateCount;
}
Status status = m_status.load(std::memory_order_relaxed);
// Can't use a switch here as we want to load and fall through.
if(status == Status::Run)
{
runJobs();
status = m_status;
}
if(status == Status::Drain)
{
drainQueue();
}
else if(status == Status::Suspend)
{
continue;
}
else
{
assert(status == Status::Stop);
return;
}
}
}
bool
ThreadPool::spawn()
{
try
{
m_threads.at(m_createdThreads) =
std::thread(std::bind(&ThreadPool::worker, this));
++m_createdThreads;
return true;
}
catch(const std::system_error&)
{
return false;
}
}
ThreadPool::ThreadPool(size_t numThreads, size_t maxJobs)
: m_queue(maxJobs)
, m_semaphore(0)
, m_idleThreads(0)
, m_status(Status::Stop)
, m_gateCount(0)
, m_numThreadsReady(0)
, m_threads(numThreads)
, m_createdThreads(0)
{
assert(numThreads != 0);
assert(maxJobs != 0);
disable();
}
ThreadPool::~ThreadPool()
{
shutdown();
}
bool
ThreadPool::addJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(job);
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::addJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(std::move(job));
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(job);
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(std::move(job));
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
void
ThreadPool::drain()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_status = Status::Drain;
interrupt();
waitThreads();
m_status = Status::Run;
releaseThreads();
}
}
void
ThreadPool::shutdown()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Stop;
interrupt();
m_queue.removeAll();
join();
}
}
bool
ThreadPool::start()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) != Status::Stop)
{
return true;
}
for(auto it = (m_threads.begin() + m_createdThreads);
it != m_threads.end(); ++it)
{
if(!spawn())
{
releaseThreads();
join();
return false;
}
}
waitThreads();
m_queue.enable();
m_status = Status::Run;
// `releaseThreads` has a release barrier so workers don't return from
// wait and not see the above store.
releaseThreads();
return true;
}
void
ThreadPool::stop()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Drain;
// `interrupt` has an acquire barrier (locks a mutex), so nothing will
// be executed before the above store to `status`.
interrupt();
waitThreads();
m_status = Status::Stop;
// `releaseThreads` has a release barrier so workers don't return from
// wait and not see the above store.
releaseThreads();
join();
}
}
} // namespace thread
} // namespace llarp

@ -0,0 +1,210 @@
#ifndef LLARP_THREAD_POOL_HPP
#define LLARP_THREAD_POOL_HPP
#include <llarp/threading.hpp>
#include <queue.hpp>
#include <functional>
#include <thread>
#include <vector>
namespace llarp
{
namespace thread
{
class ThreadPool
{
// Provide an efficient fixed size threadpool. The following attributes
// of the threadpool are fixed at construction time:
// - the max number of pending jobs
// - the number of threads
public:
using Job = std::function< void() >;
using JobQueue = Queue< Job >;
enum class Status
{
Stop,
Run,
Suspend,
Drain
};
private:
JobQueue m_queue; // The job queue
util::Semaphore m_semaphore; // The semaphore for the queue.
std::atomic_size_t m_idleThreads; // Number of idle threads
std::mutex m_mutex;
std::atomic< Status > m_status;
size_t m_gateCount;
size_t m_numThreadsReady; // Threads ready to go through the gate.
std::mutex m_gateMutex;
std::condition_variable m_threadsReadyCond;
std::condition_variable m_gateCond;
std::vector< std::thread > m_threads;
size_t m_createdThreads;
void
join();
void
runJobs();
void
drainQueue();
void
waitThreads();
void
releaseThreads();
void
interrupt();
void
worker();
bool
spawn();
public:
ThreadPool(size_t numThreads, size_t maxJobs);
~ThreadPool();
// Disable the threadpool. Calls to `addJob` and `tryAddJob` will fail.
// Jobs currently in the pool will not be affected.
void
disable();
void
enable();
// Add a job to the bool. Note this call will block if the underlying
// queue is full.
// Returns false if the queue is currently disabled.
bool
addJob(const Job& job);
bool
addJob(Job&& job);
// Try to add a job to the pool. If the queue is full, or the queue is
// disabled, return false.
// This call will not block.
bool
tryAddJob(const Job& job);
bool
tryAddJob(Job&& job);
// Wait until all current jobs are complete.
// If any jobs are submitted during this time, they **may** or **may not**
// run.
void
drain();
// Disable this pool, and cancel all pending jobs. After all currently
// running jobs are complete, join with the threads in the pool.
void
shutdown();
// Start this threadpool by spawning `threadCount()` threads.
bool
start();
// Disable queueing on this threadpool and wait until all pending jobs
// have finished.
void
stop();
bool
enabled() const;
bool
started() const;
size_t
activeThreadCount() const;
// Current number of queued jobs
size_t
jobCount() const;
// Number of threads passed in the constructor
size_t
threadCount() const;
// Number of threads currently started in the threadpool
size_t
startedThreadCount() const;
// Max number of queued jobs
size_t
capacity() const;
};
inline void
ThreadPool::disable()
{
m_queue.disable();
}
inline void
ThreadPool::enable()
{
m_queue.enable();
}
inline bool
ThreadPool::enabled() const
{
return m_queue.enabled();
}
inline size_t
ThreadPool::activeThreadCount() const
{
if(m_threads.size() == m_createdThreads)
{
return m_threads.size() - m_idleThreads.load(std::memory_order_relaxed);
}
else
{
return 0;
}
}
inline size_t
ThreadPool::threadCount() const
{
return m_threads.size();
}
inline size_t
ThreadPool::startedThreadCount() const
{
return m_createdThreads;
}
inline size_t
ThreadPool::jobCount() const
{
return m_queue.size();
}
inline size_t
ThreadPool::capacity() const
{
return m_queue.capacity();
}
} // namespace thread
} // namespace llarp
#endif

@ -0,0 +1,451 @@
#include <thread_pool.hpp>
#include <llarp/threading.hpp>
#include <gtest/gtest.h>
using namespace llarp;
using namespace llarp::thread;
using LockGuard = std::unique_lock< std::mutex >;
class PoolArgs
{
public:
std::mutex& mutex;
std::condition_variable& start;
std::condition_variable& stop;
volatile size_t count;
volatile size_t startSignal;
volatile size_t stopSignal;
};
class BarrierArgs
{
public:
util::Barrier& startBarrier;
util::Barrier& stopBarrier;
std::atomic_size_t count;
};
class BasicWorkArgs
{
public:
std::atomic_size_t count;
};
void
simpleFunction(PoolArgs& args)
{
LockGuard lock(args.mutex);
++args.count;
++args.startSignal;
args.start.notify_one();
args.stop.wait(lock, [&]() { return args.stopSignal; });
}
void
incrementFunction(PoolArgs& args)
{
LockGuard lock(args.mutex);
++args.count;
++args.startSignal;
args.start.notify_one();
}
void
barrierFunction(BarrierArgs& args)
{
args.startBarrier.wait();
args.count++;
args.stopBarrier.wait();
}
void
basicWork(BasicWorkArgs& args)
{
args.count++;
}
void
recurse(util::Barrier& barrier, std::atomic_size_t& counter, ThreadPool& pool,
size_t depthLimit)
{
ASSERT_LE(0u, counter);
ASSERT_GT(depthLimit, counter);
if(++counter != depthLimit)
{
ASSERT_TRUE(
pool.addJob(std::bind(recurse, std::ref(barrier), std::ref(counter),
std::ref(pool), depthLimit)));
}
barrier.wait();
}
class DestructiveObject
{
private:
util::Barrier& barrier;
ThreadPool& pool;
public:
DestructiveObject(util::Barrier& b, ThreadPool& p) : barrier(b), pool(p)
{
}
~DestructiveObject()
{
auto job = std::bind(&util::Barrier::wait, &barrier);
pool.addJob(job);
}
};
void
destructiveJob(DestructiveObject* obj)
{
delete obj;
}
TEST(TestThreadPool, breathing)
{
static constexpr size_t threads = 10;
static constexpr size_t capacity = 50;
ThreadPool pool(threads, capacity);
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(capacity, pool.capacity());
ASSERT_EQ(0u, pool.jobCount());
ASSERT_TRUE(pool.start());
ASSERT_EQ(threads, pool.startedThreadCount());
ASSERT_EQ(capacity, pool.capacity());
ASSERT_EQ(0u, pool.jobCount());
pool.drain();
}
struct AccessorsData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, AccessorsData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class Accessors : public ::testing::TestWithParam< AccessorsData >
{
};
TEST_P(Accessors, acessors)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
}
static const AccessorsData accessorsData[] = {
{10, 50}, {1, 1}, {50, 100}, {2, 22}, {100, 200}};
INSTANTIATE_TEST_CASE_P(TestThreadPool, Accessors,
::testing::ValuesIn(accessorsData));
struct ClosingData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, ClosingData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class Closing : public ::testing::TestWithParam< ClosingData >
{
};
TEST_P(Closing, drain)
{
auto d = GetParam();
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ThreadPool pool(d.threads, d.capacity);
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.threads; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
start.wait(lock, [&]() { return args.startSignal; });
}
args.stopSignal++;
lock.unlock();
stop.notify_all();
pool.drain();
ASSERT_EQ(d.threads, pool.startedThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
TEST_P(Closing, stop)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.capacity; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
while(i < d.threads && !args.startSignal)
{
start.wait(lock);
}
}
args.stopSignal++;
lock.unlock();
stop.notify_all();
pool.stop();
ASSERT_EQ(d.capacity, args.count);
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(0u, pool.activeThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
TEST_P(Closing, shutdown)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.capacity; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
while(i < d.threads && !args.startSignal)
{
start.wait(lock);
}
}
ASSERT_EQ(d.threads, pool.startedThreadCount());
ASSERT_EQ(d.capacity - d.threads, pool.jobCount());
auto incrementJob = std::bind(incrementFunction, std::ref(args));
for(size_t i = 0; i < d.threads; ++i)
{
ASSERT_TRUE(pool.addJob(incrementJob));
}
args.stopSignal++;
stop.notify_all();
lock.unlock();
pool.shutdown();
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(0u, pool.activeThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
ClosingData closingData[] = {{1, 1}, {2, 2}, {10, 10},
{10, 50}, {50, 75}, {25, 80}};
INSTANTIATE_TEST_CASE_P(TestThreadPool, Closing,
::testing::ValuesIn(closingData));
struct TryAddData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, TryAddData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class TryAdd : public ::testing::TestWithParam< TryAddData >
{
};
TEST_P(TryAdd, noblocking)
{
// Verify that tryAdd does not block.
// Fill the queue, then verify `tryAddJob` does not block.
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
util::Barrier startBarrier(d.threads + 1);
util::Barrier stopBarrier(d.threads + 1);
BarrierArgs args{startBarrier, stopBarrier, 0};
auto simpleJob = std::bind(barrierFunction, std::ref(args));
ASSERT_FALSE(pool.tryAddJob(simpleJob));
ASSERT_TRUE(pool.start());
for(size_t i = 0; i < d.threads; ++i)
{
ASSERT_TRUE(pool.tryAddJob(simpleJob));
}
// Wait for everything to start.
startBarrier.wait();
// and that we emptied the queue.
ASSERT_EQ(0u, pool.jobCount());
BasicWorkArgs basicWorkArgs = {0};
auto workJob = std::bind(basicWork, std::ref(basicWorkArgs));
for(size_t i = 0; i < d.capacity; ++i)
{
ASSERT_TRUE(pool.tryAddJob(workJob));
}
// queue should now be full
ASSERT_FALSE(pool.tryAddJob(workJob));
// and finish
stopBarrier.wait();
}
TEST(TestThreadPool, recurseJob)
{
// Verify we can enqueue a job onto the threadpool from a thread which is
// currently executing a threadpool job.
static constexpr size_t threads = 10;
static constexpr size_t depth = 10;
static constexpr size_t capacity = 100;
ThreadPool pool(threads, capacity);
util::Barrier barrier(threads + 1);
std::atomic_size_t counter = 0;
pool.start();
ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier),
std::ref(counter), std::ref(pool), depth)));
barrier.wait();
ASSERT_EQ(depth, counter);
}
TEST(TestThreadPool, destructors)
{
// Verify that functors have their destructors called outside of threadpool
// locks.
static constexpr size_t threads = 1;
static constexpr size_t capacity = 100;
ThreadPool pool(threads, capacity);
pool.start();
util::Barrier barrier(threads + 1);
{
DestructiveObject* obj = new DestructiveObject(barrier, pool);
ASSERT_TRUE(pool.addJob(std::bind(destructiveJob, obj)));
}
barrier.wait();
}
Loading…
Cancel
Save