Create Queue component with test suite

pull/53/head
Michael 6 years ago
parent 951a065867
commit 8a52bf448e
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -8,7 +8,7 @@
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/testAll",
"args": ["--gtest_shuffle", "--gtest_filter=-AbyssTest.TestClientAndServer"],
"args": ["--gtest_shuffle", "--gtest_filter=TestQueue*:-TestQueueManager*"],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],

@ -69,6 +69,7 @@
"variant": "cpp",
"any": "cpp",
"tuntap.h": "c",
"hashtable": "cpp"
"hashtable": "cpp",
"__mutex_base": "cpp"
}
}

@ -513,6 +513,7 @@ set(TEST_SRC
test/test_dns_unit.cpp
test/test_dnsc_unit.cpp
test/test_dnsd_unit.cpp
test/test_llarp_queue.cpp
test/test_llarp_queue_manager.cpp
)

@ -87,6 +87,115 @@ namespace llarp
}
};
class Semaphore
{
private:
std::mutex m_mutex;
std::condition_variable m_cv;
size_t m_count;
public:
Semaphore(size_t count) : m_count(count)
{
}
void
notify()
{
std::unique_lock< std::mutex > lock(m_mutex);
m_count++;
m_cv.notify_one();
}
void
wait()
{
std::unique_lock< std::mutex > lock(m_mutex);
m_cv.wait(lock, [this]() { return this->m_count > 0; });
m_count--;
}
template < typename Rep, typename Period >
bool
waitFor(const std::chrono::duration< Rep, Period >& period)
{
std::unique_lock< std::mutex > lock(m_mutex);
if(m_cv.wait_for(lock, period, [this]() { return this->m_count > 0; }))
{
m_count--;
return true;
}
return false;
}
};
class Barrier
{
private:
std::mutex mutex;
std::condition_variable cv;
const size_t numThreads;
size_t numThreadsWaiting; // number of threads to be woken
size_t sigCount; // number of times the barrier has been signalled
size_t numPending; // number of threads that have been signalled, but
// haven't woken.
public:
Barrier(size_t threadCount)
: numThreads(threadCount)
, numThreadsWaiting(0)
, sigCount(0)
, numPending(0)
{
}
~Barrier()
{
for(;;)
{
{
std::unique_lock< std::mutex > lock(mutex);
if(numPending == 0)
{
break;
}
}
std::this_thread::yield();
}
assert(numThreadsWaiting == 0);
}
void
wait()
{
std::unique_lock< std::mutex > lock(mutex);
size_t signalCount = sigCount;
if(++numThreadsWaiting == numThreads)
{
++sigCount;
numPending += numThreads - 1;
numThreadsWaiting = 0;
cv.notify_all();
}
else
{
cv.wait(lock, [this, signalCount]() {
return this->sigCount != signalCount;
});
--numPending;
}
}
};
} // namespace util
} // namespace llarp

@ -0,0 +1,533 @@
#ifndef LLARP_QUEUE_HPP
#define LLARP_QUEUE_HPP
#include <llarp/threading.hpp>
#include <queue_manager.hpp>
#include <atomic>
#include <optional>
#include <tuple>
namespace llarp
{
namespace thread
{
template < typename Type >
class QueuePushGuard;
template < typename Type >
class QueuePopGuard;
template < typename Type >
class Queue
{
// This class provides a thread-safe, lock-free, fixed-size queue.
public:
static constexpr size_t Alignment = 64;
private:
Type *m_data;
const char m_dataPadding[Alignment - sizeof(Type *)];
QueueManager m_manager;
std::atomic_uint32_t m_waitingPoppers;
util::Semaphore m_popSemaphore;
const char
m_popSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
std::atomic_uint32_t m_waitingPushers;
util::Semaphore m_pushSemaphore;
const char
m_pushSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
friend QueuePopGuard< Type >;
friend QueuePushGuard< Type >;
public:
explicit Queue(size_t capacity);
~Queue();
Queue(const Queue &) = delete;
Queue &
operator=(const Queue &) = delete;
// Push back to the queue, blocking until space is available (if
// required). Will fail if the queue is disabled (or becomes disabled
// while waiting for space on the queue).
QueueReturn
pushBack(const Type &value);
QueueReturn
pushBack(Type &&value);
// Try to push back to the queue. Return false if the queue is full or
// disabled.
QueueReturn
tryPushBack(const Type &value);
QueueReturn
tryPushBack(Type &&value);
// Remove an element from the queue. Block until an element is available
Type
popFront();
std::optional< Type >
tryPopFront();
// Remove all elements from the queue. Note this is not atomic, and if
// other threads `pushBack` onto the queue during this call, the `size` of
// the queue is not guaranteed to be 0.
void
removeAll();
// Disable the queue. All push operations will fail "fast" (including
// blocked operations). Calling this method on a disabled queue has no
// effect.
void
disable();
// Enable the queue. Calling this method on a disabled queue has no
// effect.
void
enable();
size_t
capacity() const;
size_t
size() const;
bool
enabled() const;
bool
full() const;
bool
empty() const;
};
// Provide a guard class to provide exception safety for pushing to a queue.
// On destruction, unless the `release` method has been called, will remove
// and destroy all elements from the queue, putting the queue into an empty
// state.
template < typename Type >
class QueuePushGuard
{
private:
Queue< Type > *m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePushGuard(Queue< Type > &queue, uint32_t generation, uint32_t index)
: m_queue(&queue), m_generation(generation), m_index(index)
{
}
~QueuePushGuard();
void
release();
};
// Provide a guard class to provide exception safety for popping from a
// queue. On destruction, this will pop the the given element from the
// queue.
template < typename Type >
class QueuePopGuard
{
private:
Queue< Type > &m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePopGuard(Queue< Type > &queue, uint32_t generation, uint32_t index)
: m_queue(queue), m_generation(generation), m_index(index)
{
}
~QueuePopGuard();
};
template < typename Type >
Queue< Type >::Queue(size_t capacity)
: m_data(nullptr)
, m_dataPadding()
, m_manager(capacity)
, m_waitingPoppers(0)
, m_popSemaphore(0)
, m_popSemaphorePadding()
, m_waitingPushers(0)
, m_pushSemaphore(0)
, m_pushSemaphorePadding()
{
m_data = static_cast< Type * >(::operator new(capacity * sizeof(Type)));
}
template < typename Type >
Queue< Type >::~Queue()
{
removeAll();
// We have already deleted the queue members above, free as (void *)
::operator delete(static_cast< void * >(m_data));
}
template < typename Type >
QueueReturn
Queue< Type >::tryPushBack(const Type &value)
{
uint32_t generation = 0;
uint32_t index = 0;
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read to the waiting poppers
// count sees any waiting poppers from Sync point B.
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
if(retVal != QueueReturn::Success)
{
return retVal;
}
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
QueuePushGuard< Type > pushGuard(*this, generation, index);
// Construct in place.
::new(&m_data[index]) Type(value);
pushGuard.release();
m_manager.commitPushIndex(generation, index);
if(m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
return QueueReturn::Success;
}
template < typename Type >
QueueReturn
Queue< Type >::tryPushBack(Type &&value)
{
uint32_t generation = 0;
uint32_t index = 0;
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read to the waiting poppers
// count sees any waiting poppers from Sync point B.
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
if(retVal != QueueReturn::Success)
{
return retVal;
}
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
QueuePushGuard< Type > pushGuard(*this, generation, index);
Type &dummy = value;
// Construct in place.
::new(&m_data[index]) Type(std::move(dummy));
pushGuard.release();
m_manager.commitPushIndex(generation, index);
if(m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
return QueueReturn::Success;
}
template < typename Type >
std::optional< Type >
Queue< Type >::tryPopFront()
{
uint32_t generation;
uint32_t index;
// Sync Point C.
//
// The call to reservePopIndex writes with full *sequential* consistency,
// which guarantees the relaxed read to waiting poppers is synchronized
// with Sync Point D.
QueueReturn retVal = m_manager.reservePopIndex(generation, index);
if(retVal != QueueReturn::Success)
{
return {};
}
// Pop guard will (even if the move/copy constructor throws)
// - destroy the original object
// - update the queue
// - notify any waiting pushers
QueuePopGuard popGuard(*this, generation, index);
return std::optional< Type >(std::move(m_data[index]));
}
template < typename Type >
QueueReturn
Queue< Type >::pushBack(const Type &value)
{
for(;;)
{
QueueReturn retVal = tryPushBack(value);
switch(retVal)
{
// Queue disabled.
case QueueReturn::QueueDisabled:
// We pushed the value back
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which gives visibility of the change above to
// waiting pushers in Synchronisation Point B.
if(full() && enabled())
{
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
}
}
template < typename Type >
QueueReturn
Queue< Type >::pushBack(Type &&value)
{
for(;;)
{
QueueReturn retVal = tryPushBack(std::move(value));
switch(retVal)
{
// Queue disabled.
case QueueReturn::QueueDisabled:
// We pushed the value back
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which gives visibility of the change above to
// waiting pushers in Synchronisation Point C.
if(full() && enabled())
{
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
}
}
template < typename Type >
Type
Queue< Type >::popFront()
{
uint32_t generation = 0;
uint32_t index = 0;
while(m_manager.reservePopIndex(generation, index)
!= QueueReturn::Success)
{
m_waitingPoppers.fetch_add(1, std::memory_order_relaxed);
if(empty())
{
m_popSemaphore.wait();
}
m_waitingPoppers.fetch_sub(1, std::memory_order_relaxed);
}
QueuePopGuard popGuard(*this, generation, index);
return Type(std::move(m_data[index]));
}
template < typename Type >
void
Queue< Type >::removeAll()
{
size_t elemCount = size();
uint32_t poppedItems = 0;
while(poppedItems++ < elemCount)
{
uint32_t generation = 0;
uint32_t index = 0;
if(m_manager.reservePopIndex(generation, index) != QueueReturn::Success)
{
break;
}
m_data[index].~Type();
m_manager.commitPopIndex(generation, index);
}
size_t wakeups = std::min(poppedItems, m_waitingPushers.load());
while(wakeups--)
{
m_pushSemaphore.notify();
}
}
template < typename Type >
void
Queue< Type >::disable()
{
m_manager.disable();
uint32_t numWaiting = m_waitingPushers;
while(numWaiting--)
{
m_pushSemaphore.notify();
}
}
template < typename Type >
void
Queue< Type >::enable()
{
m_manager.enable();
}
template < typename Type >
size_t
Queue< Type >::capacity() const
{
return m_manager.capacity();
}
template < typename Type >
size_t
Queue< Type >::size() const
{
return m_manager.size();
}
template < typename Type >
bool
Queue< Type >::enabled() const
{
return m_manager.enabled();
}
template < typename Type >
bool
Queue< Type >::full() const
{
return (capacity() <= size());
}
template < typename Type >
bool
Queue< Type >::empty() const
{
return (0 >= size());
}
template < typename Type >
QueuePushGuard< Type >::~QueuePushGuard()
{
if(m_queue)
{
// Thread currently has the cell at index/generation. Dispose of it.
uint32_t generation = 0;
uint32_t index = 0;
// We should always have at least one item to pop.
size_t poppedItems = 1;
while(m_queue->m_manager.reservePopForClear(generation, index,
m_generation, m_index))
{
m_queue->m_data[index].~Type();
poppedItems++;
m_queue->m_manager.commitPopIndex(generation, index);
}
// And release
m_queue->m_manager.abortPushIndexReservation(m_generation, m_index);
while(poppedItems--)
{
m_queue->m_pushSemaphore.notify();
}
}
}
template < typename Type >
void
QueuePushGuard< Type >::release()
{
m_queue = nullptr;
}
template < typename Type >
QueuePopGuard< Type >::~QueuePopGuard()
{
m_queue.m_data[m_index].~Type();
m_queue.m_manager.commitPopIndex(m_generation, m_index);
// Notify a pusher
if(m_queue.m_waitingPushers > 0)
{
m_queue.m_pushSemaphore.notify();
}
}
} // namespace thread
} // namespace llarp
#endif

@ -0,0 +1,580 @@
#include <queue.hpp>
#include <llarp/threading.hpp>
#include <array>
#include <thread>
#include <gtest/gtest.h>
using namespace llarp;
using namespace llarp::thread;
using LockGuard = std::unique_lock< std::mutex >;
class Element
{
private:
double data;
bool shouldStop;
public:
Element(double d, bool _stop = false) : data(d), shouldStop(_stop)
{
}
double
val() const
{
return data;
}
bool
stop() const
{
return shouldStop;
}
};
bool
operator==(const Element& lhs, const Element& rhs)
{
return lhs.val() == rhs.val();
}
using ObjQueue = Queue< Element >;
class Args
{
public:
std::condition_variable startCond;
std::condition_variable runCond;
std::mutex mutex;
ObjQueue queue;
// Use volatile over atomic int in order to verify the thread safety.
// If we used atomics here, we would introduce new potential synchronisation
// points.
volatile size_t iterations;
volatile size_t count;
volatile size_t startSignal;
volatile size_t runSignal;
volatile size_t endSignal;
Args(size_t _iterations, size_t size = 20 * 1000)
: queue(size)
, iterations(_iterations)
, count(0)
, startSignal(0)
, runSignal(0)
, endSignal(0)
{
}
};
void
popFrontTester(Args& args)
{
{
LockGuard guard(args.mutex);
args.count++;
args.startCond.notify_one();
args.runCond.wait(guard, [&args]() { return args.runSignal; });
}
for(;;)
{
Element e = args.queue.popFront();
if(e.stop())
{
break;
}
}
}
void
pushBackTester(Args& args)
{
{
LockGuard guard(args.mutex);
args.count++;
args.startCond.notify_one();
args.runCond.wait(guard, [&args]() { return args.runSignal; });
}
for(size_t i = 0; i < args.iterations; ++i)
{
Element e{static_cast< double >(i)};
args.queue.pushBack(e);
}
args.queue.pushBack(Element{0, true});
}
void
abaThread(char* firstValue, char* lastValue, Queue< char* >& queue,
util::Barrier& barrier)
{
barrier.wait();
for(char* val = firstValue; val <= lastValue; ++val)
{
queue.pushBack(val);
}
}
struct Exception : public std::exception
{
};
struct ExceptionTester
{
static std::atomic< std::thread::id > throwFrom;
void
test()
{
if(throwFrom != std::thread::id()
&& std::this_thread::get_id() == throwFrom)
{
throw Exception();
}
}
ExceptionTester()
{
}
ExceptionTester(const ExceptionTester&)
{
test();
}
ExceptionTester&
operator=(const ExceptionTester&)
{
test();
return *this;
}
};
std::atomic< std::thread::id > ExceptionTester::throwFrom = std::thread::id();
void
sleepNWait(size_t microseconds, util::Barrier& barrier)
{
std::this_thread::sleep_for(
std::chrono::duration< double, std::micro >(microseconds));
barrier.wait();
}
void
exceptionProducer(Queue< ExceptionTester >& queue, util::Semaphore& semaphore,
std::atomic_size_t& caught)
{
static constexpr size_t iterations = 3;
for(size_t i = 0; i < iterations; ++i)
{
try
{
queue.pushBack(ExceptionTester());
}
catch(const Exception&)
{
++caught;
}
semaphore.notify();
}
}
struct MoveTester
{
bool moved;
size_t& moveCounter;
size_t value;
explicit MoveTester(size_t& counter, size_t val)
: moved(false), moveCounter(counter), value(val)
{
}
explicit MoveTester(const MoveTester& rhs) = delete;
MoveTester&
operator=(const MoveTester& rhs) = delete;
explicit MoveTester(MoveTester&& rhs)
: moved(false), moveCounter(rhs.moveCounter), value(rhs.value)
{
rhs.moved = true;
moveCounter++;
}
MoveTester&
operator=(MoveTester&& rhs)
{
value = rhs.value;
rhs.moved = true;
moveCounter = rhs.moveCounter;
moveCounter++;
return *this;
}
};
TEST(TestQueue, single)
{
ObjQueue queue(1u);
ASSERT_EQ(0u, queue.size());
ASSERT_EQ(1u, queue.capacity());
}
TEST(TestQueue, breathing)
{
static constexpr size_t DEFAULT_CAP = 10 * 1000;
ObjQueue queue(DEFAULT_CAP);
ASSERT_EQ(0u, queue.size());
ASSERT_EQ(DEFAULT_CAP, queue.capacity());
Element e1(1.0);
Element e2(2.0);
Element e3(3.0);
queue.pushBack(e1);
queue.pushBack(e2);
queue.pushBack(e3);
Element p1 = queue.popFront();
Element p2 = queue.popFront();
Element p3 = queue.popFront();
ASSERT_EQ(e1, p1);
ASSERT_EQ(e2, p2);
ASSERT_EQ(e3, p3);
}
TEST(TestQueue, singleProducerManyConsumer)
{
static constexpr size_t iterations = 100 * 1000;
static constexpr size_t numThreads = 5;
std::array< std::thread, numThreads > threads;
Args args{iterations};
LockGuard lock(args.mutex);
for(size_t i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
args.startCond.wait(lock, [&args, i]() { return args.count == (i + 1); });
}
args.runSignal++;
args.runCond.notify_all();
lock.unlock();
for(size_t i = 0; i < iterations; ++i)
{
Element e{static_cast< double >(i)};
args.queue.pushBack(e);
}
for(size_t i = 0; i < numThreads; ++i)
{
Element e{0.0, true};
args.queue.pushBack(e);
}
for(size_t i = 0; i < numThreads; ++i)
{
threads[i].join();
}
ASSERT_EQ(0u, args.queue.size());
}
TEST(TestQueue, manyProducerManyConsumer)
{
static constexpr size_t iterations = 100 * 1000;
static constexpr size_t numThreads = 5;
std::array< std::thread, numThreads * 2 > threads;
Args args{iterations};
LockGuard lock(args.mutex);
for(size_t i = 0; i < numThreads; ++i)
{
threads[i] = std::thread(std::bind(&popFrontTester, std::ref(args)));
args.startCond.wait(lock, [&args, i]() { return args.count == (i + 1); });
}
for(size_t i = 0; i < numThreads; ++i)
{
threads[i + numThreads] =
std::thread(std::bind(&pushBackTester, std::ref(args)));
args.startCond.wait(
lock, [&args, i]() { return args.count == (numThreads + i + 1); });
}
args.runSignal++;
args.runCond.notify_all();
lock.unlock();
for(auto& thread : threads)
{
thread.join();
}
ASSERT_EQ(0u, args.queue.size());
}
TEST(TestQueue, ABAEmpty)
{
// Verify we avoid the ABA problem, where multiple threads try to push an
// object to the same "empty" position in the queue.
static constexpr size_t numThreads = 50;
static constexpr size_t numValues = 6;
static constexpr size_t numIterations = 1000;
static constexpr size_t numEntries = numThreads * numValues;
char block[numEntries];
for(size_t i = 0; i < numIterations; ++i)
{
util::Barrier barrier{numThreads + 1};
Queue< char* > queue{numEntries + 1};
std::array< std::thread, numThreads + 1 > threads;
char* nextValue[numThreads];
char* lastValue[numThreads];
for(size_t j = 0; j < numThreads; ++j)
{
nextValue[j] = block + (numValues * j);
lastValue[j] = block + (numValues * (j + 1)) - 1;
threads[j] = std::thread(std::bind(&abaThread, nextValue[j], lastValue[j],
std::ref(queue), std::ref(barrier)));
}
threads[numThreads] =
std::thread(std::bind(&sleepNWait, 100, std::ref(barrier)));
for(size_t j = 0; j < numEntries; ++j)
{
char* val = queue.popFront();
size_t k = 0;
for(k = 0; k < numThreads; ++k)
{
if(val == nextValue[k])
{
nextValue[k] += (val == lastValue[k] ? 0 : 1);
ASSERT_LE(nextValue[k], lastValue[k]);
break;
}
}
ASSERT_LT(k, numThreads);
}
for(auto& thread : threads)
{
thread.join();
}
ASSERT_EQ(0u, queue.size());
}
}
TEST(TestQueue, generationCount)
{
// Verify functionality after running through a full cycle (and invoking the
// generation rollover logic).
// For a queue of size 3, this is currently 508 cycles, implying we need to go
// through at least 3048 objects (3 * 508 * 2) to trigger this logic twice.
static constexpr size_t numThreads = 6;
static constexpr size_t queueSize = 3;
static constexpr size_t numEntries = 3060;
static constexpr size_t numValues = numEntries / numThreads;
char block[numEntries];
util::Barrier barrier{numThreads + 1};
Queue< char* > queue{queueSize};
std::array< std::thread, numThreads + 1 > threads;
char* nextValue[numThreads];
char* lastValue[numThreads];
for(size_t j = 0; j < numThreads; ++j)
{
nextValue[j] = block + (numValues * j);
lastValue[j] = block + (numValues * (j + 1)) - 1;
threads[j] = std::thread(std::bind(&abaThread, nextValue[j], lastValue[j],
std::ref(queue), std::ref(barrier)));
}
threads[numThreads] =
std::thread(std::bind(&sleepNWait, 100, std::ref(barrier)));
for(size_t j = 0; j < numEntries; ++j)
{
char* val = queue.popFront();
size_t k = 0;
for(k = 0; k < numThreads; ++k)
{
if(val == nextValue[k])
{
nextValue[k] += (val == lastValue[k] ? 0 : 1);
ASSERT_LE(nextValue[k], lastValue[k]);
break;
}
}
ASSERT_LT(k, numThreads);
}
for(auto& thread : threads)
{
thread.join();
}
ASSERT_EQ(0u, queue.size());
}
TEST(TestQueue, basicExceptionSafety)
{
ExceptionTester::throwFrom = std::this_thread::get_id();
Queue< ExceptionTester > queue{1};
ASSERT_THROW(queue.pushBack(ExceptionTester()), Exception);
ExceptionTester::throwFrom = std::thread::id();
}
TEST(TestQueue, exceptionSafety)
{
ExceptionTester::throwFrom = std::thread::id();
static constexpr size_t queueSize = 3;
Queue< ExceptionTester > queue{queueSize};
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_NE(QueueReturn::Success, queue.tryPushBack(ExceptionTester()));
util::Semaphore semaphore{0};
std::atomic_size_t caught = 0;
std::thread producer{std::bind(&exceptionProducer, std::ref(queue),
std::ref(semaphore), std::ref(caught))};
ExceptionTester::throwFrom = std::this_thread::get_id();
ASSERT_THROW({ (void)queue.popFront(); }, Exception);
// Now the queue is not full, and the producer thread can start adding items.
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_EQ(queueSize, queue.size());
ASSERT_THROW({ (void)queue.popFront(); }, Exception);
// Now the queue is not full, and the producer thread can start adding items.
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_EQ(queueSize, queue.size());
// Pushing into the queue with exception empties the queue.
ExceptionTester::throwFrom = producer.get_id();
// pop an item to unblock the pusher
(void)queue.popFront();
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_EQ(1u, caught);
ASSERT_EQ(0u, queue.size());
ASSERT_TRUE(queue.empty());
// after throwing, the queue works fine.
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_EQ(QueueReturn::Success, queue.pushBack(ExceptionTester()));
ASSERT_NE(QueueReturn::Success, queue.tryPushBack(ExceptionTester()));
ExceptionTester::throwFrom = std::thread::id();
producer.join();
}
TEST(TestQueue, moveIt)
{
static constexpr size_t queueSize = 40;
Queue< MoveTester > queue{queueSize};
size_t counter = 0;
queue.pushBack(MoveTester{counter, 0});
ASSERT_EQ(1u, counter);
MoveTester tester2(counter, 2);
queue.pushBack(std::move(tester2));
ASSERT_TRUE(tester2.moved);
ASSERT_EQ(2u, counter);
ASSERT_EQ(QueueReturn::Success, queue.tryPushBack(MoveTester{counter, 3}));
ASSERT_EQ(3u, counter);
MoveTester tester4(counter, 4);
ASSERT_EQ(QueueReturn::Success, queue.tryPushBack(std::move(tester4)));
ASSERT_TRUE(tester4.moved);
ASSERT_EQ(4u, counter);
MoveTester popped = queue.popFront();
(void)popped;
ASSERT_EQ(5u, counter);
std::optional< MoveTester > optPopped = queue.tryPopFront();
ASSERT_TRUE(optPopped.has_value());
// Moved twice here to construct the optional.
ASSERT_EQ(6u, counter);
}
Loading…
Cancel
Save