Merge branch 'master' into dev

pull/67/head
Jeff Becker 6 years ago
commit f5ac1b5c0d

@@ -7,8 +7,8 @@
"name": "(lldb) Launch",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/lokinet",
"args": [],
"program": "${workspaceFolder}/build/testAll",
"args": ["--gtest_shuffle", "--gtest_filter=TestThreadPool*"],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],

@@ -1,80 +1,76 @@
{
"editor.formatOnSave": true,
"files.associations": {
"array": "cpp",
"limits": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"atomic": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"codecvt": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"exception": "cpp",
"filesystem": "cpp",
"string_view": "cpp",
"fstream": "cpp",
"functional": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"memory": "cpp",
"mutex": "cpp",
"optional": "cpp",
"ostream": "cpp",
"ratio": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"system_error": "cpp",
"thread": "cpp",
"cinttypes": "cpp",
"type_traits": "cpp",
"tuple": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"utility": "cpp",
"__config": "cpp",
"__nullptr": "cpp",
"algorithm": "cpp",
"io": "cpp",
"strstream": "cpp",
"numeric": "cpp",
"valarray": "cpp",
"*.ipp": "cpp",
"csignal": "cpp",
"future": "cpp",
"map": "cpp",
"vector": "cpp",
"new": "cpp",
"shared_mutex": "cpp",
"complex": "cpp",
"variant": "cpp",
"any": "cpp",
"tuntap.h": "c",
"hashtable": "cpp",
"crypto_stream_salsa20.h": "c",
"implementations.h": "c",
"stream_salsa20.h": "c",
"salsa20_xmm6int-sse2.h": "c",
"salsa20_ref.h": "c",
"salsa20_xmm6int-avx2.h": "c"
}
"editor.formatOnSave": true,
"files.associations": {
"array": "cpp",
"limits": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"atomic": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"codecvt": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"exception": "cpp",
"filesystem": "cpp",
"string_view": "cpp",
"fstream": "cpp",
"functional": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"memory": "cpp",
"mutex": "cpp",
"optional": "cpp",
"ostream": "cpp",
"ratio": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"system_error": "cpp",
"thread": "cpp",
"cinttypes": "cpp",
"type_traits": "cpp",
"tuple": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"utility": "cpp",
"__config": "cpp",
"__nullptr": "cpp",
"algorithm": "cpp",
"io": "cpp",
"strstream": "cpp",
"numeric": "cpp",
"valarray": "cpp",
"*.ipp": "cpp",
"csignal": "cpp",
"future": "cpp",
"map": "cpp",
"vector": "cpp",
"new": "cpp",
"shared_mutex": "cpp",
"complex": "cpp",
"variant": "cpp",
"any": "cpp",
"tuntap.h": "c",
"hashtable": "cpp",
"__mutex_base": "cpp",
"iterator": "cpp"
}
}

@@ -273,6 +273,8 @@ set(LIB_PLATFORM_SRC
# for logic
llarp/timer.cpp
# for threading
llarp/queue_manager.cpp
llarp/thread_pool.cpp
llarp/threadpool.cpp
# for android shim
${ANDROID_PLATFORM_SRC}
@@ -518,6 +520,9 @@ set(TEST_SRC
test/test_dns_unit.cpp
test/test_dnsc_unit.cpp
test/test_dnsd_unit.cpp
test/test_llarp_queue.cpp
test/test_llarp_queue_manager.cpp
test/test_llarp_thread_pool.cpp
)

@@ -87,6 +87,115 @@ namespace llarp
}
};
class Semaphore
{
private:
std::mutex m_mutex;
std::condition_variable m_cv;
size_t m_count;
public:
Semaphore(size_t count) : m_count(count)
{
}
void
notify()
{
std::unique_lock< std::mutex > lock(m_mutex);
m_count++;
m_cv.notify_one();
}
void
wait()
{
std::unique_lock< std::mutex > lock(m_mutex);
m_cv.wait(lock, [this]() { return this->m_count > 0; });
m_count--;
}
template < typename Rep, typename Period >
bool
waitFor(const std::chrono::duration< Rep, Period >& period)
{
std::unique_lock< std::mutex > lock(m_mutex);
if(m_cv.wait_for(lock, period, [this]() { return this->m_count > 0; }))
{
m_count--;
return true;
}
return false;
}
};
class Barrier
{
private:
std::mutex mutex;
std::condition_variable cv;
const size_t numThreads;
size_t numThreadsWaiting; // number of threads to be woken
size_t sigCount; // number of times the barrier has been signalled
size_t numPending; // number of threads that have been signalled, but
// haven't woken.
public:
Barrier(size_t threadCount)
: numThreads(threadCount)
, numThreadsWaiting(0)
, sigCount(0)
, numPending(0)
{
}
~Barrier()
{
for(;;)
{
{
std::unique_lock< std::mutex > lock(mutex);
if(numPending == 0)
{
break;
}
}
std::this_thread::yield();
}
assert(numThreadsWaiting == 0);
}
void
wait()
{
std::unique_lock< std::mutex > lock(mutex);
size_t signalCount = sigCount;
if(++numThreadsWaiting == numThreads)
{
++sigCount;
numPending += numThreads - 1;
numThreadsWaiting = 0;
cv.notify_all();
}
else
{
cv.wait(lock, [this, signalCount]() {
return this->sigCount != signalCount;
});
--numPending;
}
}
};
} // namespace util
} // namespace llarp
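
The Semaphore and Barrier added above are thin wrappers over std::mutex and std::condition_variable. A minimal usage sketch follows; it is not part of this commit, and main(), the thread count, and the one-second timeout are illustrative assumptions only:

#include <llarp/threading.hpp>
#include <chrono>
#include <thread>

int main()
{
  llarp::util::Semaphore sem(0);    // starts with no permits
  llarp::util::Barrier barrier(2);  // main thread + one worker

  std::thread worker([&]() {
    barrier.wait();  // rendezvous with main before doing work
    sem.notify();    // signal that the work item is done
  });

  barrier.wait();
  // Block for at most one second waiting for the worker's notify().
  bool ok = sem.waitFor(std::chrono::seconds(1));
  worker.join();
  return ok ? 0 : 1;
}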

@@ -0,0 +1,533 @@
#ifndef LLARP_QUEUE_HPP
#define LLARP_QUEUE_HPP
#include <llarp/threading.hpp>
#include <queue_manager.hpp>
#include <atomic>
#include <optional>
#include <tuple>
namespace llarp
{
namespace thread
{
template < typename Type >
class QueuePushGuard;
template < typename Type >
class QueuePopGuard;
template < typename Type >
class Queue
{
// This class provides a thread-safe, lock-free, fixed-size queue.
public:
static constexpr size_t Alignment = 64;
private:
Type *m_data;
const char m_dataPadding[Alignment - sizeof(Type *)];
QueueManager m_manager;
std::atomic_uint32_t m_waitingPoppers;
util::Semaphore m_popSemaphore;
const char
m_popSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
std::atomic_uint32_t m_waitingPushers;
util::Semaphore m_pushSemaphore;
const char
m_pushSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
friend QueuePopGuard< Type >;
friend QueuePushGuard< Type >;
public:
explicit Queue(size_t capacity);
~Queue();
Queue(const Queue &) = delete;
Queue &
operator=(const Queue &) = delete;
// Push back to the queue, blocking until space is available (if
// required). Will fail if the queue is disabled (or becomes disabled
// while waiting for space on the queue).
QueueReturn
pushBack(const Type &value);
QueueReturn
pushBack(Type &&value);
// Try to push back to the queue without blocking. Fails if the queue is
// full or disabled.
QueueReturn
tryPushBack(const Type &value);
QueueReturn
tryPushBack(Type &&value);
// Remove an element from the queue. Block until an element is available
Type
popFront();
std::optional< Type >
tryPopFront();
// Remove all elements from the queue. Note this is not atomic, and if
// other threads `pushBack` onto the queue during this call, the `size` of
// the queue is not guaranteed to be 0.
void
removeAll();
// Disable the queue. All push operations will fail "fast" (including
// blocked operations). Calling this method on a disabled queue has no
// effect.
void
disable();
// Enable the queue. Calling this method on an already-enabled queue has no
// effect.
void
enable();
size_t
capacity() const;
size_t
size() const;
bool
enabled() const;
bool
full() const;
bool
empty() const;
};
// A guard class providing exception safety when pushing to a queue.
// On destruction, unless the `release` method has been called, it will remove
// and destroy all elements from the queue, putting the queue into an empty
// state.
template < typename Type >
class QueuePushGuard
{
private:
Queue< Type > *m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePushGuard(Queue< Type > &queue, uint32_t generation, uint32_t index)
: m_queue(&queue), m_generation(generation), m_index(index)
{
}
~QueuePushGuard();
void
release();
};
// A guard class providing exception safety when popping from a queue.
// On destruction, this will pop the given element from the queue.
template < typename Type >
class QueuePopGuard
{
private:
Queue< Type > &m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePopGuard(Queue< Type > &queue, uint32_t generation, uint32_t index)
: m_queue(queue), m_generation(generation), m_index(index)
{
}
~QueuePopGuard();
};
template < typename Type >
Queue< Type >::Queue(size_t capacity)
: m_data(nullptr)
, m_dataPadding()
, m_manager(capacity)
, m_waitingPoppers(0)
, m_popSemaphore(0)
, m_popSemaphorePadding()
, m_waitingPushers(0)
, m_pushSemaphore(0)
, m_pushSemaphorePadding()
{
m_data = static_cast< Type * >(::operator new(capacity * sizeof(Type)));
}
template < typename Type >
Queue< Type >::~Queue()
{
removeAll();
// We have already destroyed the queue elements above; free the raw storage.
::operator delete(static_cast< void * >(m_data));
}
template < typename Type >
QueueReturn
Queue< Type >::tryPushBack(const Type &value)
{
uint32_t generation = 0;
uint32_t index = 0;
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read of the waiting poppers
// count below sees any poppers registered in popFront.
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
if(retVal != QueueReturn::Success)
{
return retVal;
}
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
QueuePushGuard< Type > pushGuard(*this, generation, index);
// Construct in place.
::new(&m_data[index]) Type(value);
pushGuard.release();
m_manager.commitPushIndex(generation, index);
if(m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
return QueueReturn::Success;
}
template < typename Type >
QueueReturn
Queue< Type >::tryPushBack(Type &&value)
{
uint32_t generation = 0;
uint32_t index = 0;
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read of the waiting poppers
// count below sees any poppers registered in popFront.
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
if(retVal != QueueReturn::Success)
{
return retVal;
}
// Move into the array. If the move (or copy) constructor throws, the
// pushGuard will roll the reservation back.
QueuePushGuard< Type > pushGuard(*this, generation, index);
Type &dummy = value;
// Construct in place.
::new(&m_data[index]) Type(std::move(dummy));
pushGuard.release();
m_manager.commitPushIndex(generation, index);
if(m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
return QueueReturn::Success;
}
template < typename Type >
std::optional< Type >
Queue< Type >::tryPopFront()
{
uint32_t generation;
uint32_t index;
// Sync Point C.
//
// The call to reservePopIndex writes with full *sequential* consistency,
// which guarantees that the relaxed read of the waiting pushers count (in
// the pop guard below) is synchronised with Sync Point B.
QueueReturn retVal = m_manager.reservePopIndex(generation, index);
if(retVal != QueueReturn::Success)
{
return {};
}
// Pop guard will (even if the move/copy constructor throws)
// - destroy the original object
// - update the queue
// - notify any waiting pushers
QueuePopGuard popGuard(*this, generation, index);
return std::optional< Type >(std::move(m_data[index]));
}
template < typename Type >
QueueReturn
Queue< Type >::pushBack(const Type &value)
{
for(;;)
{
QueueReturn retVal = tryPushBack(value);
switch(retVal)
{
// Queue disabled.
case QueueReturn::QueueDisabled:
// The value was pushed onto the queue.
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which makes the increment above visible to popping
// threads at Sync Point C.
if(full() && enabled())
{
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
}
}
template < typename Type >
QueueReturn
Queue< Type >::pushBack(Type &&value)
{
for(;;)
{
QueueReturn retVal = tryPushBack(std::move(value));
switch(retVal)
{
// Queue disabled.
case QueueReturn::QueueDisabled:
// The value was pushed onto the queue.
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which makes the increment above visible to popping
// threads at Sync Point C.
if(full() && enabled())
{
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
}
}
template < typename Type >
Type
Queue< Type >::popFront()
{
uint32_t generation = 0;
uint32_t index = 0;
while(m_manager.reservePopIndex(generation, index)
!= QueueReturn::Success)
{
m_waitingPoppers.fetch_add(1, std::memory_order_relaxed);
if(empty())
{
m_popSemaphore.wait();
}
m_waitingPoppers.fetch_sub(1, std::memory_order_relaxed);
}
QueuePopGuard popGuard(*this, generation, index);
return Type(std::move(m_data[index]));
}
template < typename Type >
void
Queue< Type >::removeAll()
{
size_t elemCount = size();
uint32_t poppedItems = 0;
while(poppedItems++ < elemCount)
{
uint32_t generation = 0;
uint32_t index = 0;
if(m_manager.reservePopIndex(generation, index) != QueueReturn::Success)
{
break;
}
m_data[index].~Type();
m_manager.commitPopIndex(generation, index);
}
size_t wakeups = std::min(poppedItems, m_waitingPushers.load());
while(wakeups--)
{
m_pushSemaphore.notify();
}
}
template < typename Type >
void
Queue< Type >::disable()
{
m_manager.disable();
uint32_t numWaiting = m_waitingPushers;
while(numWaiting--)
{
m_pushSemaphore.notify();
}
}
template < typename Type >
void
Queue< Type >::enable()
{
m_manager.enable();
}
template < typename Type >
size_t
Queue< Type >::capacity() const
{
return m_manager.capacity();
}
template < typename Type >
size_t
Queue< Type >::size() const
{
return m_manager.size();
}
template < typename Type >
bool
Queue< Type >::enabled() const
{
return m_manager.enabled();
}
template < typename Type >
bool
Queue< Type >::full() const
{
return (capacity() <= size());
}
template < typename Type >
bool
Queue< Type >::empty() const
{
return (0 >= size());
}
template < typename Type >
QueuePushGuard< Type >::~QueuePushGuard()
{
if(m_queue)
{
// Thread currently has the cell at index/generation. Dispose of it.
uint32_t generation = 0;
uint32_t index = 0;
// We should always have at least one item to pop.
size_t poppedItems = 1;
while(m_queue->m_manager.reservePopForClear(generation, index,
m_generation, m_index))
{
m_queue->m_data[index].~Type();
poppedItems++;
m_queue->m_manager.commitPopIndex(generation, index);
}
// And release
m_queue->m_manager.abortPushIndexReservation(m_generation, m_index);
while(poppedItems--)
{
m_queue->m_pushSemaphore.notify();
}
}
}
template < typename Type >
void
QueuePushGuard< Type >::release()
{
m_queue = nullptr;
}
template < typename Type >
QueuePopGuard< Type >::~QueuePopGuard()
{
m_queue.m_data[m_index].~Type();
m_queue.m_manager.commitPopIndex(m_generation, m_index);
// Notify a pusher
if(m_queue.m_waitingPushers > 0)
{
m_queue.m_pushSemaphore.notify();
}
}
} // namespace thread
} // namespace llarp
#endif
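
Taken together, the header above gives a bounded, blocking producer/consumer queue. A short usage sketch follows; it is not part of this diff, and the element type, the capacity of 8, and the -1 sentinel are assumptions for illustration:

#include <queue.hpp>  // llarp::thread::Queue, matching the include style in this commit
#include <iostream>
#include <thread>

int main()
{
  llarp::thread::Queue< int > queue(8);  // fixed capacity of 8 elements

  std::thread producer([&]() {
    for(int i = 0; i < 100; ++i)
      queue.pushBack(i);   // blocks while the queue is full
    queue.pushBack(-1);    // sentinel: tell the consumer to stop
  });

  for(;;)
  {
    int value = queue.popFront();  // blocks until an element is available
    if(value == -1)
      break;
    std::cout << value << "\n";
  }
  producer.join();
  return 0;
}

The non-blocking variants (tryPushBack/tryPopFront) return immediately instead, which is what the thread pool below uses on its idle path.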

@@ -0,0 +1,562 @@
#include "queue_manager.hpp"
#include <thread>
namespace llarp
{
namespace thread
{
// Turn an enum into its underlying value.
template < typename E >
constexpr auto
to_underlying(E e) noexcept
{
return static_cast< std::underlying_type_t< E > >(e);
}
static constexpr uint32_t GENERATION_COUNT_SHIFT = 0x2;
// Max number of generations which can be held in a uint32_t.
static constexpr size_t NUM_ELEMENT_GENERATIONS = 1
<< ((sizeof(uint32_t) * 8) - 2);
// mask for holding the element state from an element
static constexpr uint32_t ELEMENT_STATE_MASK = 0x3;
// mask for holding the disabled bit in the index.
static constexpr uint32_t DISABLED_STATE_MASK = 1
<< ((sizeof(uint32_t) * 8) - 1);
// Max number of combinations of index and generations.
static constexpr uint32_t NUM_COMBINED_INDEXES = DISABLED_STATE_MASK;
bool
isDisabledFlagSet(uint32_t encodedIndex)
{
return (encodedIndex & DISABLED_STATE_MASK);
}
uint32_t
discardDisabledFlag(uint32_t encodedIndex)
{
return (encodedIndex & ~DISABLED_STATE_MASK);
}
uint32_t
encodeElement(uint32_t generation, ElementState state)
{
return (generation << GENERATION_COUNT_SHIFT) | to_underlying(state);
}
uint32_t
decodeGenerationFromElementState(uint32_t state)
{
return state >> GENERATION_COUNT_SHIFT;
}
ElementState
decodeStateFromElementState(uint32_t state)
{
return ElementState(state & ELEMENT_STATE_MASK);
}
QueueManager::AtomicIndex&
QueueManager::pushIndex()
{
return m_pushIndex;
}
QueueManager::AtomicIndex&
QueueManager::popIndex()
{
return m_popIndex;
}
const QueueManager::AtomicIndex&
QueueManager::pushIndex() const
{
return m_pushIndex;
}
const QueueManager::AtomicIndex&
QueueManager::popIndex() const
{
return m_popIndex;
}
uint32_t
QueueManager::nextCombinedIndex(uint32_t index) const
{
if(m_maxCombinedIndex == index)
{
return 0;
}
return index + 1;
}
uint32_t
QueueManager::nextGeneration(uint32_t generation) const
{
if(m_maxGeneration == generation)
{
return 0;
}
return generation + 1;
}
size_t
QueueManager::capacity() const
{
return m_capacity;
}
int32_t
QueueManager::circularDifference(uint32_t startingValue,
uint32_t subtractValue, uint32_t modulo)
{
assert(modulo
<= (static_cast< uint32_t >(std::numeric_limits< int32_t >::max())
+ 1));
assert(startingValue < modulo);
assert(subtractValue < modulo);
int32_t difference = startingValue - subtractValue;
if(difference > static_cast< int32_t >(modulo / 2))
{
return difference - modulo;
}
else if(difference < -static_cast< int32_t >(modulo / 2))
{
return difference + modulo;
}
else
{
return difference;
}
}
uint32_t
QueueManager::numGenerations(size_t capacity)
{
assert(capacity != 0);
return static_cast< uint32_t >(
std::min(NUM_COMBINED_INDEXES / capacity, NUM_ELEMENT_GENERATIONS));
}
QueueManager::QueueManager(size_t capacity)
: m_pushIndex(0)
, m_popIndex(0)
, m_capacity(capacity)
, m_maxGeneration(numGenerations(capacity) - 1)
, m_maxCombinedIndex(
numGenerations(capacity) * static_cast< uint32_t >(capacity) - 1)
{
assert(0 < capacity);
assert(capacity <= MAX_CAPACITY);
(void)m_pushPadding;
(void)m_popPadding;
m_states = new std::atomic_uint32_t[capacity];
for(size_t i = 0; i < capacity; ++i)
{
m_states[i] = 0;
}
}
QueueManager::~QueueManager()
{
delete[] m_states;
}
QueueReturn
QueueManager::reservePushIndex(uint32_t& generation, uint32_t& index)
{
uint32_t loadedPushIndex = pushIndex().load(std::memory_order_relaxed);
uint32_t savedPushIndex = -1;
uint32_t combinedIndex = 0;
uint32_t currIdx = 0;
uint32_t currGen = 0;
// Use savedPushIndex to make us attempt to acquire an index at least twice
// before returning QueueFull; this avoids heavy contention when the queue
// has a capacity of 1.
for(;;)
{
if(isDisabledFlagSet(loadedPushIndex))
{
return QueueReturn::QueueDisabled;
}
combinedIndex = discardDisabledFlag(loadedPushIndex);
currGen = static_cast< uint32_t >(combinedIndex / m_capacity);
currIdx = static_cast< uint32_t >(combinedIndex % m_capacity);
uint32_t compare = encodeElement(currGen, ElementState::Empty);
const uint32_t swap = encodeElement(currGen, ElementState::Writing);
if(m_states[currIdx].compare_exchange_strong(compare, swap))
{
// We changed the state.
generation = currGen;
index = currIdx;
break;
}
// We failed to reserve the index. Use the result from cmp n swap to
// determine if the queue was full or not. Either:
// 1. The cell is from a previous generation (so the queue is full)
// 2. Another thread has reserved this cell for writing, but not committed
// yet
// 3. The push index has been changed between the load and the cmp.
uint32_t elemGen = decodeGenerationFromElementState(compare);
int32_t difference = static_cast< int32_t >(currGen - elemGen);
if(difference == 1
|| (difference == -static_cast< int32_t >(m_maxGeneration)))
{
// Queue is full.
assert(1
== circularDifference(currGen, elemGen, m_maxGeneration + 1));
ElementState state = decodeStateFromElementState(compare);
if(state == ElementState::Reading)
{
// Another thread is reading. Yield this thread
std::this_thread::yield();
loadedPushIndex = pushIndex().load(std::memory_order_relaxed);
continue;
}
assert(state != ElementState::Empty);
if(savedPushIndex != loadedPushIndex)
{
// Make another attempt to check the queue is full before failing
std::this_thread::yield();
savedPushIndex = loadedPushIndex;
loadedPushIndex = pushIndex().load(std::memory_order_relaxed);
continue;
}
return QueueReturn::QueueFull;
}
// Another thread has already acquired this cell, try to increment the
// push index and go again.
assert(0 >= circularDifference(currGen, elemGen, m_maxGeneration + 1));
const uint32_t next = nextCombinedIndex(combinedIndex);
pushIndex().compare_exchange_strong(combinedIndex, next);
loadedPushIndex = combinedIndex;
}
// We got the cell, increment the push index
const uint32_t next = nextCombinedIndex(combinedIndex);
pushIndex().compare_exchange_strong(combinedIndex, next);
return QueueReturn::Success;
}
void
QueueManager::commitPushIndex(uint32_t generation, uint32_t index)
{
assert(generation <= m_maxGeneration);
assert(index < m_capacity);
assert(ElementState::Writing
== decodeStateFromElementState(m_states[index]));
assert(generation == decodeGenerationFromElementState(m_states[index]));
m_states[index] = encodeElement(generation, ElementState::Full);
}
QueueReturn
QueueManager::reservePopIndex(uint32_t& generation, uint32_t& index)
{
uint32_t loadedPopIndex = popIndex().load();
uint32_t savedPopIndex = -1;
uint32_t currIdx = 0;
uint32_t currGen = 0;
for(;;)
{
currGen = static_cast< uint32_t >(loadedPopIndex / m_capacity);
currIdx = static_cast< uint32_t >(loadedPopIndex % m_capacity);
// Try to swap this state from full to reading.
uint32_t compare = encodeElement(currGen, ElementState::Full);
const uint32_t swap = encodeElement(currGen, ElementState::Reading);
if(m_states[currIdx].compare_exchange_strong(compare, swap))
{
generation = currGen;
index = currIdx;
break;
}
// We failed to reserve the index. Use the result from cmp n swap to
// determine if the queue was empty or not. Either:
// 1. The cell is from a previous generation (so the queue is empty)
// 2. The cell is from the current generation and empty (so the queue is
// empty)
// 3. The queue is being written to
// 4. The pop index has been changed between the load and the cmp.
uint32_t elemGen = decodeGenerationFromElementState(compare);
ElementState state = decodeStateFromElementState(compare);
int32_t difference = static_cast< int32_t >(currGen - elemGen);
if(difference == 1
|| (difference == -static_cast< int32_t >(m_maxGeneration)))
{
// Queue is empty.
assert(state == ElementState::Reading);
assert(1 == circularDifference(currGen, elemGen, m_maxGeneration + 1));
return QueueReturn::QueueEmpty;
}
if(difference == 0 && state == ElementState::Empty)
{
// The cell is empty in the current generation, so the queue is empty
if(savedPopIndex != loadedPopIndex)
{
std::this_thread::yield();
savedPopIndex = loadedPopIndex;
loadedPopIndex = popIndex().load(std::memory_order_relaxed);
continue;
}
return QueueReturn::QueueEmpty;
}
if(difference != 0 || state == ElementState::Writing)
{
// The cell is currently being written to, or the index is outdated.
// Yield and try again.
std::this_thread::yield();
loadedPopIndex = popIndex().load(std::memory_order_relaxed);
continue;
}
popIndex().compare_exchange_strong(loadedPopIndex,
nextCombinedIndex(loadedPopIndex));
}
popIndex().compare_exchange_strong(loadedPopIndex,
nextCombinedIndex(loadedPopIndex));
return QueueReturn::Success;
}
void
QueueManager::commitPopIndex(uint32_t generation, uint32_t index)
{
assert(generation <= m_maxGeneration);
assert(index < m_capacity);
assert(decodeStateFromElementState(m_states[index])
== ElementState::Reading);
assert(generation == decodeGenerationFromElementState(m_states[index]));
m_states[index] =
encodeElement(nextGeneration(generation), ElementState::Empty);
}
void
QueueManager::disable()
{
// Loop until we set the disabled bit
for(;;)
{
uint32_t index = pushIndex();
if(isDisabledFlagSet(index))
{
// Queue is already disabled(?!)
return;
}
if(pushIndex().compare_exchange_strong(index,
index | DISABLED_STATE_MASK))
{
// queue has been disabled
return;
}
}
}
void
QueueManager::enable()
{
for(;;)
{
uint32_t index = pushIndex();
if(!isDisabledFlagSet(index))
{
// queue is already enabled.
return;
}
if(pushIndex().compare_exchange_strong(index,
index & ~DISABLED_STATE_MASK))
{
// queue has been enabled
return;
}
}
}
bool
QueueManager::reservePopForClear(uint32_t& generation, uint32_t& index,
uint32_t endGeneration, uint32_t endIndex)
{
assert(endGeneration <= m_maxGeneration);
assert(endIndex < m_capacity);
uint32_t loadedCombinedIndex = popIndex().load(std::memory_order_relaxed);
for(;;)
{
uint32_t endCombinedIndex =
(endGeneration * static_cast< uint32_t >(m_capacity)) + endIndex;
if(circularDifference(endCombinedIndex, loadedCombinedIndex,
m_maxCombinedIndex + 1)
== 0)
{
return false;
}
assert(0 < circularDifference(endCombinedIndex, loadedCombinedIndex,
m_maxCombinedIndex + 1));
uint32_t currIdx =
static_cast< uint32_t >(loadedCombinedIndex % m_capacity);
uint32_t currGen =
static_cast< uint32_t >(loadedCombinedIndex / m_capacity);
// Try to swap this cell from Full to Reading.
// We only set this to Empty after trying to increment popIndex, so we
// don't race against another thread.
uint32_t compare = encodeElement(currGen, ElementState::Full);
const uint32_t swap = encodeElement(currGen, ElementState::Reading);
if(m_states[currIdx].compare_exchange_strong(compare, swap))
{
// We've dropped this index.
generation = currGen;
index = currIdx;
break;
}
ElementState state = decodeStateFromElementState(compare);
if(state == ElementState::Writing || state == ElementState::Full)
{
// Another thread is writing to this cell, or this thread has slept
// for too long.
std::this_thread::yield();
loadedCombinedIndex = popIndex().load(std::memory_order_relaxed);
continue;
}
const uint32_t next = nextCombinedIndex(loadedCombinedIndex);
popIndex().compare_exchange_strong(loadedCombinedIndex, next);
}
// Attempt to increment the index.
const uint32_t next = nextCombinedIndex(loadedCombinedIndex);
popIndex().compare_exchange_strong(loadedCombinedIndex, next);
return true;
}
void
QueueManager::abortPushIndexReservation(uint32_t generation, uint32_t index)
{
assert(generation <= m_maxGeneration);
assert(index < m_capacity);
assert(static_cast< uint32_t >((generation * m_capacity) + index)
== popIndex().load(std::memory_order_relaxed));
assert(decodeStateFromElementState(m_states[index])
== ElementState::Writing);
assert(generation == decodeGenerationFromElementState(m_states[index]));
uint32_t loadedPopIndex = popIndex().load(std::memory_order_relaxed);
assert(generation == loadedPopIndex / m_capacity);
assert(index == loadedPopIndex % m_capacity);
m_states[index] = encodeElement(generation, ElementState::Reading);
const uint32_t nextIndex = nextCombinedIndex(loadedPopIndex);
popIndex().compare_exchange_strong(loadedPopIndex, nextIndex);
m_states[index] =
encodeElement(nextGeneration(generation), ElementState::Empty);
}
size_t
QueueManager::size() const
{
// Note that we rely on these loads being sequentially consistent.
uint32_t combinedPushIndex = discardDisabledFlag(pushIndex());
uint32_t combinedPopIndex = popIndex();
int32_t difference = combinedPushIndex - combinedPopIndex;
if(difference >= 0)
{
if(difference > static_cast< int32_t >(m_capacity))
{
// We've raced between getting the push and pop indices; in this case the
// queue is empty.
assert(0 > circularDifference(combinedPushIndex, combinedPopIndex,
m_maxCombinedIndex + 1));
return 0;
}
return static_cast< size_t >(difference);
}
if(difference < -static_cast< int32_t >(m_maxCombinedIndex / 2))
{
assert(0 < circularDifference(combinedPushIndex, combinedPopIndex,
m_maxCombinedIndex + 1));
difference += m_maxCombinedIndex + 1;
return std::min(static_cast< size_t >(difference), m_capacity);
}
return 0;
}
bool
QueueManager::enabled() const
{
return !isDisabledFlagSet(pushIndex().load());
}
} // namespace thread
} // namespace llarp
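
The encoding helpers above are file-local, so they cannot be called from outside this translation unit. The snippet below simply re-derives the same "combined index" and element-state arithmetic for a capacity of 3, as a worked example; the capacity, values, and main() are illustrative assumptions:

#include <cassert>
#include <cstdint>

int main()
{
  // combined = generation * capacity + index, so combined 5 with capacity 3
  // decodes to generation 1, index 2, and the next combined index (6)
  // rolls over to generation 2, index 0.
  const uint32_t capacity = 3;
  uint32_t combined = 5;
  assert(combined / capacity == 1);  // generation
  assert(combined % capacity == 2);  // element index

  // Each element state word packs (generation << 2) | state, with the low
  // two bits holding Empty(0)/Writing(1)/Full(2)/Reading(3).
  uint32_t encoded = (1u << 2) | 2u;  // generation 1, Full
  assert((encoded >> 2) == 1u);
  assert((encoded & 0x3u) == 2u);
  return 0;
}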

@@ -0,0 +1,212 @@
#ifndef LLARP_QUEUE_MANAGER_HPP
#define LLARP_QUEUE_MANAGER_HPP
#include <algorithm>
#include <atomic>
#include <cassert>
#include <iostream>
#include <limits>
#include <string>
#include <type_traits>
namespace llarp
{
namespace thread
{
enum class ElementState : uint32_t
{
Empty = 0,
Writing = 1,
Full = 2,
Reading = 3
};
enum class QueueReturn
{
Success,
QueueDisabled,
QueueEmpty,
QueueFull
};
inline std::ostream&
operator<<(std::ostream& os, QueueReturn val)
{
switch(val)
{
case QueueReturn::Success:
os << "Success";
break;
case QueueReturn::QueueDisabled:
os << "QueueDisabled";
break;
case QueueReturn::QueueEmpty:
os << "QueueEmpty";
break;
case QueueReturn::QueueFull:
os << "QueueFull";
break;
}
return os;
}
class QueueManager
{
// This class provides thread-safe state management for a queue.
// Common terminology in this class:
// - "Combined Index": the combination of an index into the circular
// buffer and the generation count. Precisely:
//
// Combined Index = (Generation * Capacity) + Element Index
//
// The combined index has the useful property where incrementing the
// index when the element index is at the end of the buffer does two
// things:
// 1. Sets the element index back to 0
// 2. Increments the generation
public:
static constexpr size_t Alignment = 64;
using AtomicIndex = std::atomic_uint32_t;
private:
AtomicIndex m_pushIndex; // Index in the buffer that the next
// element will be added to.
char m_pushPadding[Alignment - sizeof(AtomicIndex)];
AtomicIndex m_popIndex; // Index in the buffer that the next
// element will be removed from.
char m_popPadding[Alignment - sizeof(AtomicIndex)];
const size_t m_capacity; // max size of the manager.
const uint32_t m_maxGeneration; // Maximum generation for this object.
const uint32_t m_maxCombinedIndex; // Maximum combined value of index and
// generation for this object.
std::atomic_uint32_t* m_states; // Array of index states.
AtomicIndex&
pushIndex();
AtomicIndex&
popIndex();
const AtomicIndex&
pushIndex() const;
const AtomicIndex&
popIndex() const;
// Return the next combined index
uint32_t
nextCombinedIndex(uint32_t index) const;
// Return the next generation
uint32_t
nextGeneration(uint32_t generation) const;
public:
// Return the difference between the startingValue and the subtractValue
// around a particular modulo.
static int32_t
circularDifference(uint32_t startingValue, uint32_t subtractValue,
uint32_t modulo);
// Return the number of possible generations a circular buffer can hold.
static uint32_t
numGenerations(size_t capacity);
// The max capacity of the queue manager.
// 2 bits are used for holding the disabled status and the number of
// generations is at least 2.
static constexpr size_t MAX_CAPACITY = 1 << ((sizeof(uint32_t) * 8) - 2);
explicit QueueManager(size_t capacity);
~QueueManager();
// Push operations
// Reserve the next available index to enqueue an element at. On success:
// - Load `index` with the next available index
// - Load `generation` with the current generation
//
// If this call succeeds, other threads may spin until `commitPushIndex`
// is called.
QueueReturn
reservePushIndex(uint32_t& generation, uint32_t& index);
// Mark the `index` in the given `generation` as in-use. This unblocks
// any other threads which were waiting on the index state.
void
commitPushIndex(uint32_t generation, uint32_t index);
// Pop operations
// Reserve the next available index to remove an element from. On success:
// - Load `index` with the next available index
// - Load `generation` with the current generation
//
// If this call succeeds, other threads may spin until `commitPopIndex`
// is called.
QueueReturn
reservePopIndex(uint32_t& generation, uint32_t& index);
// Mark the `index` in the given `generation` as available. This unblocks
// any other threads which were waiting on the index state.
void
commitPopIndex(uint32_t generation, uint32_t index);
// Disable the queue
void
disable();
// Enable the queue
void
enable();
// Exception safety
// If the next available index from which an element can be popped is
// before `endGeneration`/`endIndex`, reserve it into `generation` and
// `index`.
//
// Return true if an index was reserved and false otherwise.
//
// Behaviour is undefined if `endGeneration` and `endIndex` have not been
// acquired for writing.
//
// The intended usage of this method is to help remove all elements if an
// exception is thrown between reserving and committing an index.
// Workflow:
// 1. call reservePopForClear
// 2. call commitPopIndex, emptying all cells up to the reserved index
// 3. call abortPushIndexReservation on the index.
bool
reservePopForClear(uint32_t& generation, uint32_t& index,
uint32_t endGeneration, uint32_t endIndex);
void
abortPushIndexReservation(uint32_t generation, uint32_t index);
// Accessors
bool
enabled() const;
size_t
size() const;
size_t
capacity() const;
};
} // namespace thread
} // namespace llarp
#endif
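
The push/pop API above is a two-phase reserve-then-commit protocol: the manager only tracks slot states, while the caller owns the storage. The sketch below shows the single-threaded happy path; it is not part of this diff, and the capacity of 4, the int array, and the value 42 are assumptions for illustration:

#include <queue_manager.hpp>
#include <cstdint>

int main()
{
  using llarp::thread::QueueManager;
  using llarp::thread::QueueReturn;

  QueueManager manager(4);
  int storage[4];

  uint32_t generation = 0;
  uint32_t index      = 0;

  // Producer side: reserve a slot, write into it, then publish it.
  if(manager.reservePushIndex(generation, index) == QueueReturn::Success)
  {
    storage[index] = 42;
    manager.commitPushIndex(generation, index);
  }

  // Consumer side: reserve the slot for reading, consume it, then free it.
  if(manager.reservePopIndex(generation, index) == QueueReturn::Success)
  {
    int value = storage[index];
    (void)value;
    manager.commitPopIndex(generation, index);
  }
  return 0;
}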

@@ -0,0 +1,329 @@
#include <thread_pool.hpp>
namespace llarp
{
namespace thread
{
using LockGuard = std::unique_lock< std::mutex >;
void
ThreadPool::join()
{
for(auto& t : m_threads)
{
if(t.joinable())
{
t.join();
}
}
m_createdThreads = 0;
}
void
ThreadPool::runJobs()
{
while(m_status.load(std::memory_order_relaxed) == Status::Run)
{
auto functor = m_queue.tryPopFront();
if(functor.has_value())
{
functor.value()();
}
else
{
m_idleThreads++;
if(m_status == Status::Run && m_queue.empty())
{
m_semaphore.wait();
}
m_idleThreads.fetch_sub(1, std::memory_order_relaxed);
}
}
}
void
ThreadPool::drainQueue()
{
while(m_status.load(std::memory_order_relaxed) == Status::Drain)
{
auto functor = m_queue.tryPopFront();
if(!functor)
{
return;
}
functor.value()();
}
}
void
ThreadPool::waitThreads()
{
LockGuard lock(m_gateMutex);
m_threadsReadyCond.wait(
lock, [this]() { return m_numThreadsReady == m_threads.size(); });
}
void
ThreadPool::releaseThreads()
{
LockGuard lock(m_gateMutex);
m_numThreadsReady = 0;
++m_gateCount;
m_gateCond.notify_all();
}
void
ThreadPool::interrupt()
{
LockGuard lock(m_gateMutex);
size_t count = m_idleThreads;
for(size_t i = 0; i < count; ++i)
{
m_semaphore.notify();
}
}
void
ThreadPool::worker()
{
size_t gateCount = m_gateCount;
for(;;)
{
{
LockGuard lock(m_gateMutex);
++m_numThreadsReady;
m_threadsReadyCond.notify_one();
m_gateCond.wait(lock, [&]() { return gateCount != m_gateCount; });
gateCount = m_gateCount;
}
Status status = m_status.load(std::memory_order_relaxed);
// Can't use a switch here as we want to load and fall through.
if(status == Status::Run)
{
runJobs();
status = m_status;
}
if(status == Status::Drain)
{
drainQueue();
}
else if(status == Status::Suspend)
{
continue;
}
else
{
assert(status == Status::Stop);
return;
}
}
}
bool
ThreadPool::spawn()
{
try
{
m_threads.at(m_createdThreads) =
std::thread(std::bind(&ThreadPool::worker, this));
++m_createdThreads;
return true;
}
catch(const std::system_error&)
{
return false;
}
}
ThreadPool::ThreadPool(size_t numThreads, size_t maxJobs)
: m_queue(maxJobs)
, m_semaphore(0)
, m_idleThreads(0)
, m_status(Status::Stop)
, m_gateCount(0)
, m_numThreadsReady(0)
, m_threads(numThreads)
, m_createdThreads(0)
{
assert(numThreads != 0);
assert(maxJobs != 0);
disable();
}
ThreadPool::~ThreadPool()
{
shutdown();
}
bool
ThreadPool::addJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(job);
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::addJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.pushBack(std::move(job));
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(const Job& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(job);
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
bool
ThreadPool::tryAddJob(Job&& job)
{
assert(job);
QueueReturn ret = m_queue.tryPushBack(std::move(job));
if(ret == QueueReturn::Success && m_idleThreads > 0)
{
m_semaphore.notify();
}
return ret == QueueReturn::Success;
}
void
ThreadPool::drain()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_status = Status::Drain;
interrupt();
waitThreads();
m_status = Status::Run;
releaseThreads();
}
}
void
ThreadPool::shutdown()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Stop;
interrupt();
m_queue.removeAll();
join();
}
}
bool
ThreadPool::start()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) != Status::Stop)
{
return true;
}
for(auto it = (m_threads.begin() + m_createdThreads);
it != m_threads.end(); ++it)
{
if(!spawn())
{
releaseThreads();
join();
return false;
}
}
waitThreads();
m_queue.enable();
m_status = Status::Run;
// `releaseThreads` has a release barrier, so workers cannot return from
// the wait without seeing the above store.
releaseThreads();
return true;
}
void
ThreadPool::stop()
{
LockGuard lock(m_mutex);
if(m_status.load(std::memory_order_relaxed) == Status::Run)
{
m_queue.disable();
m_status = Status::Drain;
// `interrupt` has an acquire barrier (locks a mutex), so nothing will
// be executed before the above store to `status`.
interrupt();
waitThreads();
m_status = Status::Stop;
// `releaseThreads` has a release barrier, so workers cannot return from
// the wait without seeing the above store.
releaseThreads();
join();
}
}
} // namespace thread
} // namespace llarp
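
The worker/start handshake above is built on a "gate": each worker announces it is ready and then parks until the gate counter changes. The standalone sketch below illustrates the same pattern in isolation; the names, the worker count, and main() are illustrative assumptions, not code from this diff:

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
  constexpr size_t numWorkers = 4;
  std::mutex gateMutex;
  std::condition_variable readyCond;
  std::condition_variable gateCond;
  size_t numReady  = 0;
  size_t gateCount = 0;

  std::vector< std::thread > workers;
  for(size_t i = 0; i < numWorkers; ++i)
  {
    workers.emplace_back([&]() {
      std::unique_lock< std::mutex > lock(gateMutex);
      size_t seen = gateCount;
      ++numReady;              // announce that we reached the gate
      readyCond.notify_one();
      gateCond.wait(lock, [&]() { return gateCount != seen; });
      // ... a real worker would run jobs here ...
    });
  }

  {
    // waitThreads(): wait until every worker has parked at the gate.
    std::unique_lock< std::mutex > lock(gateMutex);
    readyCond.wait(lock, [&]() { return numReady == numWorkers; });
    // releaseThreads(): reset the counter and open the gate for everyone.
    numReady = 0;
    ++gateCount;
    gateCond.notify_all();
  }

  for(auto& t : workers)
    t.join();
  return 0;
}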

@@ -0,0 +1,210 @@
#ifndef LLARP_THREAD_POOL_HPP
#define LLARP_THREAD_POOL_HPP
#include <llarp/threading.hpp>
#include <queue.hpp>
#include <functional>
#include <thread>
#include <vector>
namespace llarp
{
namespace thread
{
class ThreadPool
{
// Provide an efficient fixed size threadpool. The following attributes
// of the threadpool are fixed at construction time:
// - the max number of pending jobs
// - the number of threads
public:
using Job = std::function< void() >;
using JobQueue = Queue< Job >;
enum class Status
{
Stop,
Run,
Suspend,
Drain
};
private:
JobQueue m_queue; // The job queue
util::Semaphore m_semaphore; // The semaphore for the queue.
std::atomic_size_t m_idleThreads; // Number of idle threads
std::mutex m_mutex;
std::atomic< Status > m_status;
size_t m_gateCount;
size_t m_numThreadsReady; // Threads ready to go through the gate.
std::mutex m_gateMutex;
std::condition_variable m_threadsReadyCond;
std::condition_variable m_gateCond;
std::vector< std::thread > m_threads;
size_t m_createdThreads;
void
join();
void
runJobs();
void
drainQueue();
void
waitThreads();
void
releaseThreads();
void
interrupt();
void
worker();
bool
spawn();
public:
ThreadPool(size_t numThreads, size_t maxJobs);
~ThreadPool();
// Disable the threadpool. Calls to `addJob` and `tryAddJob` will fail.
// Jobs currently in the pool will not be affected.
void
disable();
void
enable();
// Add a job to the pool. Note this call will block if the underlying
// queue is full.
// Returns false if the queue is currently disabled.
bool
addJob(const Job& job);
bool
addJob(Job&& job);
// Try to add a job to the pool. If the queue is full, or the queue is
// disabled, return false.
// This call will not block.
bool
tryAddJob(const Job& job);
bool
tryAddJob(Job&& job);
// Wait until all current jobs are complete.
// If any jobs are submitted during this time, they **may** or **may not**
// run.
void
drain();
// Disable this pool, and cancel all pending jobs. After all currently
// running jobs are complete, join with the threads in the pool.
void
shutdown();
// Start this threadpool by spawning `threadCount()` threads.
bool
start();
// Disable queueing on this threadpool and wait until all pending jobs
// have finished.
void
stop();
bool
enabled() const;
bool
started() const;
size_t
activeThreadCount() const;
// Current number of queued jobs
size_t
jobCount() const;
// Number of threads passed in the constructor
size_t
threadCount() const;
// Number of threads currently started in the threadpool
size_t
startedThreadCount() const;
// Max number of queued jobs
size_t
capacity() const;
};
inline void
ThreadPool::disable()
{
m_queue.disable();
}
inline void
ThreadPool::enable()
{
m_queue.enable();
}
inline bool
ThreadPool::enabled() const
{
return m_queue.enabled();
}
inline size_t
ThreadPool::activeThreadCount() const
{
if(m_threads.size() == m_createdThreads)
{
return m_threads.size() - m_idleThreads.load(std::memory_order_relaxed);
}
else
{
return 0;
}
}
inline size_t
ThreadPool::threadCount() const
{
return m_threads.size();
}
inline size_t
ThreadPool::startedThreadCount() const
{
return m_createdThreads;
}
inline size_t
ThreadPool::jobCount() const
{
return m_queue.size();
}
inline size_t
ThreadPool::capacity() const
{
return m_queue.capacity();
}
} // namespace thread
} // namespace llarp
#endif
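
A minimal end-to-end sketch of the pool API above; it is not part of this commit, and the thread/queue sizes and the counter job are illustrative assumptions:

#include <thread_pool.hpp>
#include <atomic>

int main()
{
  llarp::thread::ThreadPool pool(4, 128);  // 4 workers, up to 128 queued jobs
  std::atomic_size_t done{0};

  pool.start();                            // spawn workers and enable the queue
  for(size_t i = 0; i < 100; ++i)
  {
    // Blocks if the queue is full; returns false once the pool is disabled.
    pool.addJob([&done]() { ++done; });
  }
  pool.drain();                            // wait for the queued jobs to finish
  pool.stop();                             // disable, finish pending work, join
  return (done == 100) ? 0 : 1;
}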

@@ -0,0 +1,580 @@
#include <queue.hpp>
#include <llarp/threading.hpp>
#include <array>
#include <thread>
#include <gtest/gtest.h>
using namespace llarp;
using namespace llarp::thread;
using LockGuard = std::unique_lock< std::mutex >;
class Element
{
private:
double data;
bool shouldStop;
public:
Element(double d, bool _stop = false) : data(d), shouldStop(_stop)
{
}
double
val() const
{
return data;
}
bool
stop() const
{
return shouldStop;
}
};
bool
operator==(const Element& lhs, const Element& rhs)
{
return lhs.val() == rhs.val();
}
using ObjQueue = Queue< Element >;
class Args
{
public:
std::condition_variable startCond;
std::condition_variable runCond;
std::mutex mutex;
ObjQueue queue;
// Use volatile over atomic int in order to verify the thread safety.
// If we used atomics here, we would introduce new potential synchronisation
// points.
volatile size_t iterations;
volatile size_t count;
volatile size_t startSignal;
volatile size_t runSignal;
volatile size_t endSignal;
Args(size_t _iterations, size_t size = 20 * 1000)
: queue(size)
, iterations(_iterations)
, count(0)
, startSignal(0)
, runSignal(0)
, endSignal(0)
{
}
};
void
popFrontTester(Args& args)
{
{
LockGuard guard(args.mutex);
args.count++;
args.startCond.notify_one();
args.runCond.wait(guard, [&args]() { return args.runSignal; });
}
for(;;)
{
Element e = args.queue.popFront();
if(e.stop())
{
break;
}
}
}
void
pushBackTester(Args& args)
{
{
LockGuard guard(args.mutex);
args.count++;
args.startCond.notify_one();
args.runCond.wait(guard, [&args]() { return args.runSignal; });
}
for(size_t i = 0; i < args.iterations; ++i)
{
Element e{static_cast< double >(i)};
args.queue.pushBack(e);
}
args.queue.pushBack(Element{0, true});
}
void
abaThread(char* firstValue, char* lastValue, Queue< char* >& queue,
util::Barrier& barrier)
{
barrier.wait();
for(char* val = firstValue; val <= lastValue; ++val)
{
queue.pushBack(val);
}
}
struct Exception : public std::exception
{
};
struct ExceptionTester
{
static std::atomic< std::thread::id > throwFrom;
void
test()
{
if(throwFrom != std::thread::id()
&& std::this_thread::get_id() == throwFrom)
{
throw Exception();
}
}
ExceptionTester()
{
}
ExceptionTester(const ExceptionTester&)
{
test();
}
ExceptionTester&
operator=(const ExceptionTester&)
{
test();
return *this;
}
};
std::atomic< std::thread::id > ExceptionTester::throwFrom = std::thread::id();
void
sleepNWait(size_t microseconds, util::Barrier& barrier)
{
std::this_thread::sleep_for(
std::chrono::duration< double, std::micro >(microseconds));
barrier.wait();
}
void
exceptionProducer(Queue< ExceptionTester >& queue, util::Semaphore& semaphore,
std::atomic_size_t& caught)
{
static constexpr size_t iterations = 3;
for(size_t i = 0; i < iterations; ++i)
{
try
{
queue.pushBack(ExceptionTester());
}
catch(const Exception&)
{
++caught;
}
semaphore.notify();
}
}
struct MoveTester
{
bool moved;
size_t& moveCounter;
size_t value;
explicit MoveTester(size_t& counter, size_t val)
: moved(false), moveCounter(counter), value(val)
{
}
explicit MoveTester(const MoveTester& rhs) = delete;
MoveTester&
operator=(const MoveTester& rhs) = delete;
explicit MoveTester(MoveTester&& rhs)
: moved(false), moveCounter(rhs.moveCounter), value(rhs.value)
{
rhs.moved = true;
moveCounter++;
}
MoveTester&
operator=(MoveTester&& rhs)
{
value = rhs.value;
rhs.moved = true;
moveCounter = rhs.moveCounter;
moveCounter++;
return *this;
}
};
TEST(TestQueue, single)
{
ObjQueue queue(1u);
ASSERT_EQ(0u, queue.size());
ASSERT_EQ(1u, queue.capacity());
}
TEST(TestQueue, breathing)
{
static constexpr size_t DEFAULT_CAP = 10 * 1000;
ObjQueue queue(DEFAULT_CAP);
ASSERT_EQ(0u, queue.size());
ASSERT_EQ(DEFAULT_CAP, queue.capacity());
Element e1(1.0);
Element e2(2.0);
Element e3(3.0);
queue.pushBack(e1);
queue.pushBack(e2);
queue.pushBack(e3);
Element p1 = queue.popFront();
Element p2 = queue.popFront();
Element p3 = queue.popFront();
ASSERT_EQ(e1, p1);
ASSERT_EQ(e2, p2);
ASSERT_EQ(e3, p3);
}
TEST(TestQueue, singleProducerManyConsumer)
{
static constexpr size_t iterations = 100 * 1000;
static constexpr size_t numThreads = 5;
std::array< std::thread, numThreads > threads;
Args args{iterations};
LockGuard lock(args.mutex);
for(size_t i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
args.startCond.wait(lock, [&args, i]() { return args.count == (i + 1); });
}
args.runSignal++;
args.runCond.notify_all();
lock.unlock();
for(size_t i = 0; i < iterations; ++i)
{
Element e{static_cast< double >(i)};
args.queue.pushBack(e);
}
for(size_t i = 0; i < numThreads; ++i)
{
Element e{0.0, true};
args.queue.pushBack(e);
}
for(size_t i = 0; i < numThreads; ++i)
{
threads[i].join();
}
ASSERT_EQ(0u, args.queue.size());
}
TEST(TestQueue, manyProducerManyConsumer)
{
static constexpr size_t iterations = 100 * 1000;
static constexpr size_t numThreads = 5;
std::array< std::thread, numThreads * 2 > threads;
Args args{iterations};
LockGuard lock(args.mutex);
for(size_t i = 0; i < numThreads; ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
args.startCond.wait(lock, [&args, i]() { return args.count == (i + 1); });
}
for(size_t i = 0; i < numThreads; ++i)
{
threads[i + numThreads] =
std::thread(std::bind(&pushBackTester, std::ref(args)));
args.startCond.wait(
lock, [&args, i]() { return args.count == (numThreads + i + 1); });
}
args.runSignal++;
args.runCond.notify_all();
lock.unlock();
for(auto& thread : threads)
{
thread.join();
}
ASSERT_EQ(0u, args.queue.size());
}
TEST(TestQueue, ABAEmpty)
{
// Verify we avoid the ABA problem, where multiple threads try to push an
// object to the same "empty" position in the queue.
static constexpr size_t numThreads = 50;
static constexpr size_t numValues = 6;
static constexpr size_t numIterations = 1000;
static constexpr size_t numEntries = numThreads * numValues;
char block[numEntries];
for(size_t i = 0; i < numIterations; ++i)
{
util::Barrier barrier{numThreads + 1};
Queue< char* > queue{numEntries + 1};
std::array< std::thread, numThreads + 1 > threads;
char* nextValue[numThreads];
char* lastValue[numThreads];
for(size_t j = 0; j < numThreads; ++j)
{
nextValue[j] = block + (numValues * j);
lastValue[j] = block + (numValues * (j + 1)) - 1;
threads[j] = std::thread(std::bind(&abaThread, nextValue[j], lastValue[j],
std::ref(queue), std::ref(barrier)));
}
threads[numThreads] =
std::thread(std::bind(&sleepNWait, 100, std::ref(barrier)));
for(size_t j = 0; j < numEntries; ++j)
{
char* val = queue.popFront();
size_t k = 0;
for(k = 0; k < numThreads; ++k)
{
if(val == nextValue[k])
{
nextValue[k] += (val == lastValue[k] ? 0 : 1);
ASSERT_LE(nextValue[k], lastValue[k]);
break;
}
}
ASSERT_LT(k, numThreads);
}
for(auto& thread : threads)
{
thread.join();
}
ASSERT_EQ(0u, queue.size());
}
}
TEST(TestQueue, generationCount)
{
// Verify functionality after running through a full cycle (and invoking the
// generation rollover logic).
// For a queue of size 3, this is currently 508 cycles, implying we need to go
// through at least 3048 objects (3 * 508 * 2) to trigger this logic twice.
static constexpr size_t numThreads = 6;
static constexpr size_t queueSize = 3;
static constexpr size_t numEntries = 3060;
static constexpr size_t numValues = numEntries / numThreads;
char block[numEntries];
util::Barrier barrier{numThreads + 1};
Queue< char* > queue{queueSize};
std::array< std::thread, numThreads + 1 > threads;
char* nextValue[numThreads];
char* lastValue[numThreads];
for(size_t j = 0; j < numThreads; ++j)
{
nextValue[j] = block + (numValues * j);
lastValue[j] = block + (numValues * (j + 1)) - 1;
threads[j] = std::thread(std::bind(&abaThread, nextValue[j], lastValue[j],
std::ref(queue), std::ref(barrier)));
}
threads[numThreads] =
std::thread(std::bind(&sleepNWait, 100, std::ref(barrier)));
for(size_t j = 0; j < numEntries; ++j)
{
char* val = queue.popFront();
size_t k = 0;
for(k = 0; k < numThreads; ++k)
{
if(val == nextValue[k])
{
nextValue[k] += (val == lastValue[k] ? 0 : 1);
ASSERT_LE(nextValue[k], lastValue[k]);
break;
}
}
ASSERT_LT(k, numThreads);
}
for(auto& thread : threads)
{
thread.join();
}
ASSERT_EQ(0u, queue.size());
}
TEST(TestQueue, basicExceptionSafety)
{
ExceptionTester::throwFrom = std::this_thread::get_id();
Queue< ExceptionTester > queue{1};
ASSERT_THROW(queue.pushBack(ExceptionTester()), Exception);
ExceptionTester::throwFrom = std::thread::id();
}
TEST(TestQueue, exceptionSafety)
{
ExceptionTester::throwFrom = std::thread::id();
static constexpr size_t queueSize = 3;
Queue< ExceptionTester > queue{queueSize};
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_NE(QueueReturn::Success, queue.tryPushBack(ExceptionTester()));
util::Semaphore semaphore{0};
std::atomic_size_t caught = 0;
std::thread producer{std::bind(&exceptionProducer, std::ref(queue),
std::ref(semaphore), std::ref(caught))};
ExceptionTester::throwFrom = std::this_thread::get_id();
ASSERT_THROW({ (void)queue.popFront(); }, Exception);
// Now the queue is not full, and the producer thread can start adding items.
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_EQ(queueSize, queue.size());
ASSERT_THROW({ (void)queue.popFront(); }, Exception);
// Now the queue is not full, and the producer thread can start adding items.
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_EQ(queueSize, queue.size());
// Pushing into the queue with an exception empties the queue.
ExceptionTester::throwFrom = producer.get_id();
// pop an item to unblock the pusher
(void)queue.popFront();
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_EQ(1u, caught);
ASSERT_EQ(0u, queue.size());
ASSERT_TRUE(queue.empty());
// after throwing, the queue works fine.
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_NE(QueueReturn::Success, queue.tryPushBack(ExceptionTester()));
ExceptionTester::throwFrom = std::thread::id();
producer.join();
}
TEST(TestQueue, moveIt)
{
static constexpr size_t queueSize = 40;
Queue< MoveTester > queue{queueSize};
size_t counter = 0;
queue.pushBack(MoveTester{counter, 0});
ASSERT_EQ(1u, counter);
MoveTester tester2(counter, 2);
queue.pushBack(std::move(tester2));
ASSERT_TRUE(tester2.moved);
ASSERT_EQ(2u, counter);
ASSERT_EQ(QueueReturn::Success, queue.tryPushBack(MoveTester{counter, 3}));
ASSERT_EQ(3u, counter);
MoveTester tester4(counter, 4);
ASSERT_EQ(QueueReturn::Success, queue.tryPushBack(std::move(tester4)));
ASSERT_TRUE(tester4.moved);
ASSERT_EQ(4u, counter);
MoveTester popped = queue.popFront();
(void)popped;
ASSERT_EQ(5u, counter);
std::optional< MoveTester > optPopped = queue.tryPopFront();
ASSERT_TRUE(optPopped.has_value());
// Moved twice here to construct the optional.
ASSERT_EQ(6u, counter);
}

File diff suppressed because it is too large.

@@ -0,0 +1,451 @@
#include <thread_pool.hpp>
#include <llarp/threading.hpp>
#include <gtest/gtest.h>
using namespace llarp;
using namespace llarp::thread;
using LockGuard = std::unique_lock< std::mutex >;
class PoolArgs
{
public:
std::mutex& mutex;
std::condition_variable& start;
std::condition_variable& stop;
volatile size_t count;
volatile size_t startSignal;
volatile size_t stopSignal;
};
class BarrierArgs
{
public:
util::Barrier& startBarrier;
util::Barrier& stopBarrier;
std::atomic_size_t count;
};
class BasicWorkArgs
{
public:
std::atomic_size_t count;
};
void
simpleFunction(PoolArgs& args)
{
LockGuard lock(args.mutex);
++args.count;
++args.startSignal;
args.start.notify_one();
args.stop.wait(lock, [&]() { return args.stopSignal; });
}
void
incrementFunction(PoolArgs& args)
{
LockGuard lock(args.mutex);
++args.count;
++args.startSignal;
args.start.notify_one();
}
void
barrierFunction(BarrierArgs& args)
{
args.startBarrier.wait();
args.count++;
args.stopBarrier.wait();
}
void
basicWork(BasicWorkArgs& args)
{
args.count++;
}
void
recurse(util::Barrier& barrier, std::atomic_size_t& counter, ThreadPool& pool,
size_t depthLimit)
{
ASSERT_LE(0u, counter);
ASSERT_GT(depthLimit, counter);
if(++counter != depthLimit)
{
ASSERT_TRUE(
pool.addJob(std::bind(recurse, std::ref(barrier), std::ref(counter),
std::ref(pool), depthLimit)));
}
barrier.wait();
}
class DestructiveObject
{
private:
util::Barrier& barrier;
ThreadPool& pool;
public:
DestructiveObject(util::Barrier& b, ThreadPool& p) : barrier(b), pool(p)
{
}
~DestructiveObject()
{
auto job = std::bind(&util::Barrier::wait, &barrier);
pool.addJob(job);
}
};
void
destructiveJob(DestructiveObject* obj)
{
delete obj;
}
TEST(TestThreadPool, breathing)
{
static constexpr size_t threads = 10;
static constexpr size_t capacity = 50;
ThreadPool pool(threads, capacity);
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(capacity, pool.capacity());
ASSERT_EQ(0u, pool.jobCount());
ASSERT_TRUE(pool.start());
ASSERT_EQ(threads, pool.startedThreadCount());
ASSERT_EQ(capacity, pool.capacity());
ASSERT_EQ(0u, pool.jobCount());
pool.drain();
}
struct AccessorsData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, AccessorsData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class Accessors : public ::testing::TestWithParam< AccessorsData >
{
};
TEST_P(Accessors, acessors)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
}
static const AccessorsData accessorsData[] = {
{10, 50}, {1, 1}, {50, 100}, {2, 22}, {100, 200}};
INSTANTIATE_TEST_CASE_P(TestThreadPool, Accessors,
::testing::ValuesIn(accessorsData));
struct ClosingData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, ClosingData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class Closing : public ::testing::TestWithParam< ClosingData >
{
};
TEST_P(Closing, drain)
{
auto d = GetParam();
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ThreadPool pool(d.threads, d.capacity);
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.threads; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
start.wait(lock, [&]() { return args.startSignal; });
}
args.stopSignal++;
lock.unlock();
stop.notify_all();
pool.drain();
ASSERT_EQ(d.threads, pool.startedThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
TEST_P(Closing, stop)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.capacity; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
while(i < d.threads && !args.startSignal)
{
start.wait(lock);
}
}
args.stopSignal++;
lock.unlock();
stop.notify_all();
pool.stop();
ASSERT_EQ(d.capacity, args.count);
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(0u, pool.activeThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
TEST_P(Closing, shutdown)
{
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
std::mutex mutex;
std::condition_variable start;
std::condition_variable stop;
PoolArgs args{mutex, start, stop, 0, 0, 0};
ASSERT_EQ(d.threads, pool.threadCount());
ASSERT_EQ(d.capacity, pool.capacity());
ASSERT_EQ(0u, pool.startedThreadCount());
auto simpleJob = std::bind(simpleFunction, std::ref(args));
ASSERT_FALSE(pool.addJob(simpleJob));
ASSERT_TRUE(pool.start());
ASSERT_EQ(0u, pool.jobCount());
LockGuard lock(mutex);
for(size_t i = 0; i < d.capacity; ++i)
{
args.startSignal = 0;
args.stopSignal = 0;
ASSERT_TRUE(pool.addJob(simpleJob));
while(i < d.threads && !args.startSignal)
{
start.wait(lock);
}
}
ASSERT_EQ(d.threads, pool.startedThreadCount());
ASSERT_EQ(d.capacity - d.threads, pool.jobCount());
auto incrementJob = std::bind(incrementFunction, std::ref(args));
for(size_t i = 0; i < d.threads; ++i)
{
ASSERT_TRUE(pool.addJob(incrementJob));
}
args.stopSignal++;
stop.notify_all();
lock.unlock();
pool.shutdown();
ASSERT_EQ(0u, pool.startedThreadCount());
ASSERT_EQ(0u, pool.activeThreadCount());
ASSERT_EQ(0u, pool.jobCount());
}
ClosingData closingData[] = {{1, 1}, {2, 2}, {10, 10},
{10, 50}, {50, 75}, {25, 80}};
INSTANTIATE_TEST_CASE_P(TestThreadPool, Closing,
::testing::ValuesIn(closingData));
struct TryAddData
{
size_t threads;
size_t capacity;
};
std::ostream&
operator<<(std::ostream& os, TryAddData d)
{
os << "[ threads = " << d.threads << " capacity = " << d.capacity << " ]";
return os;
}
class TryAdd : public ::testing::TestWithParam< TryAddData >
{
};
TEST_P(TryAdd, noblocking)
{
// Verify that tryAdd does not block.
// Fill the queue, then verify `tryAddJob` does not block.
auto d = GetParam();
ThreadPool pool(d.threads, d.capacity);
util::Barrier startBarrier(d.threads + 1);
util::Barrier stopBarrier(d.threads + 1);
BarrierArgs args{startBarrier, stopBarrier, 0};
auto simpleJob = std::bind(barrierFunction, std::ref(args));
ASSERT_FALSE(pool.tryAddJob(simpleJob));
ASSERT_TRUE(pool.start());
for(size_t i = 0; i < d.threads; ++i)
{
ASSERT_TRUE(pool.tryAddJob(simpleJob));
}
// Wait for everything to start.
startBarrier.wait();
// and that we emptied the queue.
ASSERT_EQ(0u, pool.jobCount());
BasicWorkArgs basicWorkArgs = {0};
auto workJob = std::bind(basicWork, std::ref(basicWorkArgs));
for(size_t i = 0; i < d.capacity; ++i)
{
ASSERT_TRUE(pool.tryAddJob(workJob));
}
// queue should now be full
ASSERT_FALSE(pool.tryAddJob(workJob));
// and finish
stopBarrier.wait();
}
TEST(TestThreadPool, recurseJob)
{
// Verify we can enqueue a job onto the threadpool from a thread which is
// currently executing a threadpool job.
static constexpr size_t threads = 10;
static constexpr size_t depth = 10;
static constexpr size_t capacity = 100;
ThreadPool pool(threads, capacity);
util::Barrier barrier(threads + 1);
std::atomic_size_t counter = 0;
pool.start();
ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier),
std::ref(counter), std::ref(pool), depth)));
barrier.wait();
ASSERT_EQ(depth, counter);
}
TEST(TestThreadPool, destructors)
{
// Verify that functors have their destructors called outside of threadpool
// locks.
static constexpr size_t threads = 1;
static constexpr size_t capacity = 100;
ThreadPool pool(threads, capacity);
pool.start();
util::Barrier barrier(threads + 1);
{
DestructiveObject* obj = new DestructiveObject(barrier, pool);
ASSERT_TRUE(pool.addJob(std::bind(destructiveJob, obj)));
}
barrier.wait();
}