CI Fixes, squash

pull/2213/head
dr7ana 7 months ago
parent c18ad4c618
commit 7314c2a22a

@ -11,7 +11,7 @@ find_library(FOUNDATION Foundation REQUIRED)
find_library(NETEXT NetworkExtension REQUIRED)
find_library(COREFOUNDATION CoreFoundation REQUIRED)
target_link_libraries(lokinet-util PUBLIC ${FOUNDATION})
target_link_libraries(lokinet-base PUBLIC ${FOUNDATION})
target_sources(lokinet-platform PRIVATE vpn_platform.cpp vpn_interface.cpp route_manager.cpp context_wrapper.cpp)
@ -27,7 +27,7 @@ enable_lto(lokinet-extension)
target_compile_options(lokinet-extension PRIVATE -fobjc-arc)
if(MACOS_SYSTEM_EXTENSION)
target_compile_definitions(lokinet-extension PRIVATE MACOS_SYSTEM_EXTENSION)
target_compile_definitions(lokinet-util PUBLIC MACOS_SYSTEM_EXTENSION)
target_compile_definitions(lokinet-base PUBLIC MACOS_SYSTEM_EXTENSION)
else()
target_link_options(lokinet-extension PRIVATE -e _NSExtensionMain)
endif()
@ -41,7 +41,7 @@ else()
endif()
target_link_libraries(lokinet-extension PRIVATE
lokinet-amalgum
lokinet-core
${COREFOUNDATION}
${NETEXT})

@ -174,7 +174,7 @@ namespace llarp
mutable util::Mutex m; // protects persisting_conns
// sessions to persist -> timestamp to end persist at
std::unordered_map<RouterID, llarp_time_t> persisting_conns GUARDED_BY(_mutex);
std::unordered_map<RouterID, llarp_time_t> persisting_conns;
// holds any messages we attempt to send while connections are establishing
std::unordered_map<RouterID, MessageQueue> pending_conn_msg_queue;

@ -7,7 +7,7 @@ namespace llarp
/*
TODO:
- change these parameters to ustringviews and ustrings where needed after bumping oxenc
- nuke seq_no's
- change std::string sig(64, '\0') --> std::array<unsigned char, 64> sig
*/
namespace ObtainExitMessage

@ -6,7 +6,6 @@
#include "router_id.hpp"
#include "util/common.hpp"
#include "util/fs.hpp"
#include "util/thread/annotations.hpp"
#include "util/thread/threading.hpp"
#include <llarp/router/router.hpp>

@ -105,13 +105,13 @@ namespace llarp
using Lock_t = util::NullLock;
Mutex_t first; // protects second
TransitHopsMap_t second GUARDED_BY(first);
TransitHopsMap_t second;
/// Invokes a callback for each transit path; visit must be invokable with a `const
/// TransitHop_ptr&` argument.
template <typename TransitHopVisitor>
void
ForEach(TransitHopVisitor&& visit) EXCLUDES(first)
ForEach(TransitHopVisitor&& visit)
{
Lock_t lock(first);
for (const auto& item : second)
@ -125,7 +125,7 @@ namespace llarp
struct SyncOwnedPathsMap_t
{
util::Mutex first; // protects second
OwnedPathsMap_t second GUARDED_BY(first);
OwnedPathsMap_t second;
/// Invokes a callback for each owned path; visit must be invokable with a `const Path_ptr&`
/// argument.

@ -3,7 +3,6 @@
#include "path/path.hpp"
#include "router_id.hpp"
#include "util/bencode.hpp"
#include "util/thread/annotations.hpp"
#include "util/thread/threading.hpp"
#include <map>
@ -68,46 +67,45 @@ namespace llarp
/// generic variant
bool
IsBad(const RouterID& r, uint64_t chances = profiling_chances) EXCLUDES(m_ProfilesMutex);
IsBad(const RouterID& r, uint64_t chances = profiling_chances);
/// check if this router should have paths built over it
bool
IsBadForPath(const RouterID& r, uint64_t chances = profiling_chances) EXCLUDES(m_ProfilesMutex);
IsBadForPath(const RouterID& r, uint64_t chances = profiling_chances);
/// check if this router should be connected directly to
bool
IsBadForConnect(const RouterID& r, uint64_t chances = profiling_chances)
EXCLUDES(m_ProfilesMutex);
IsBadForConnect(const RouterID& r, uint64_t chances = profiling_chances);
void
MarkConnectTimeout(const RouterID& r) EXCLUDES(m_ProfilesMutex);
MarkConnectTimeout(const RouterID& r);
void
MarkConnectSuccess(const RouterID& r) EXCLUDES(m_ProfilesMutex);
MarkConnectSuccess(const RouterID& r);
void
MarkPathTimeout(path::Path* p) EXCLUDES(m_ProfilesMutex);
MarkPathTimeout(path::Path* p);
void
MarkPathFail(path::Path* p) EXCLUDES(m_ProfilesMutex);
MarkPathFail(path::Path* p);
void
MarkPathSuccess(path::Path* p) EXCLUDES(m_ProfilesMutex);
MarkPathSuccess(path::Path* p);
void
MarkHopFail(const RouterID& r) EXCLUDES(m_ProfilesMutex);
MarkHopFail(const RouterID& r);
void
ClearProfile(const RouterID& r) EXCLUDES(m_ProfilesMutex);
ClearProfile(const RouterID& r);
void
Tick() EXCLUDES(m_ProfilesMutex);
Tick();
bool
Load(const fs::path fname) EXCLUDES(m_ProfilesMutex);
Load(const fs::path fname);
bool
Save(const fs::path fname) EXCLUDES(m_ProfilesMutex);
Save(const fs::path fname);
bool
ShouldSave(llarp_time_t now) const;
@ -126,7 +124,7 @@ namespace llarp
BDecode(oxenc::bt_dict_consumer dict);
mutable util::Mutex m_ProfilesMutex; // protects m_Profiles
std::map<RouterID, RouterProfile> m_Profiles GUARDED_BY(m_ProfilesMutex);
std::map<RouterID, RouterProfile> m_Profiles;
llarp_time_t m_LastSave = 0s;
std::atomic<bool> m_DisableProfiling;
};

@ -136,11 +136,11 @@ namespace llarp
std::atomic<bool> isServiceNode = false;
// whitelist = active routers
std::unordered_set<RouterID> router_whitelist GUARDED_BY(_mutex);
std::unordered_set<RouterID> router_whitelist;
// greylist = fully funded, but decommissioned routers
std::unordered_set<RouterID> router_greylist GUARDED_BY(_mutex);
std::unordered_set<RouterID> router_greylist;
// greenlist = registered but not fully-staked routers
std::unordered_set<RouterID> router_greenlist GUARDED_BY(_mutex);
std::unordered_set<RouterID> router_greenlist;
};
} // namespace llarp

@ -1,53 +0,0 @@
#pragma once
#include <ciso646>
// Clang thread safety analysis macros. Does nothing under non-clang compilers.
// Enable thread safety attributes only with clang and libc++ (the latter
// because we are using stl mutexes, which don't have the required annotations
// under stdlibc++). The attributes can be safely erased when compiling with
// other compilers.
#if defined(__clang__) && defined(_LIBCPP_VERSION) && !defined(SWIG)
#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
#else
#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
#endif
#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)
#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))
#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))
#define ACQUIRED_BEFORE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))
#define ACQUIRED_AFTER(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))
#define REQUIRES(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))
#define REQUIRES_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))
#define ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))
#define ACQUIRE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
#define RELEASE(...) THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))
#define RELEASE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
#define TRY_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))
#define TRY_ACQUIRE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))
#define EXCLUDES(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))
#define ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))
#define ASSERT_SHARED_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))
#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))
#define NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)

@ -7,556 +7,553 @@
#include <optional>
#include <tuple>
namespace llarp
namespace llarp::thread
{
namespace thread
template <typename Type>
class QueuePushGuard;
template <typename Type>
class QueuePopGuard;
template <typename Type>
class Queue
{
template <typename Type>
class QueuePushGuard;
template <typename Type>
class QueuePopGuard;
// This class provides a thread-safe, lock-free, fixed-size queue.
public:
static constexpr size_t Alignment = 64;
template <typename Type>
class Queue
{
// This class provides a thread-safe, lock-free, fixed-size queue.
public:
static constexpr size_t Alignment = 64;
private:
Type* m_data;
const char m_dataPadding[Alignment - sizeof(Type*)];
private:
Type* m_data;
const char m_dataPadding[Alignment - sizeof(Type*)];
QueueManager m_manager;
QueueManager m_manager;
std::atomic<std::uint32_t> m_waitingPoppers;
util::Semaphore m_popSemaphore;
const char m_popSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
std::atomic<std::uint32_t> m_waitingPoppers;
util::Semaphore m_popSemaphore;
const char m_popSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
std::atomic<std::uint32_t> m_waitingPushers;
util::Semaphore m_pushSemaphore;
const char m_pushSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
std::atomic<std::uint32_t> m_waitingPushers;
util::Semaphore m_pushSemaphore;
const char m_pushSemaphorePadding[(2u * Alignment) - sizeof(util::Semaphore)];
friend QueuePopGuard<Type>;
friend QueuePushGuard<Type>;
friend QueuePopGuard<Type>;
friend QueuePushGuard<Type>;
public:
explicit Queue(size_t capacity);
public:
explicit Queue(size_t capacity);
~Queue();
~Queue();
Queue(const Queue&) = delete;
Queue&
operator=(const Queue&) = delete;
Queue(const Queue&) = delete;
Queue&
operator=(const Queue&) = delete;
// Push back to the queue, blocking until space is available (if
// required). Will fail if the queue is disabled (or becomes disabled
// while waiting for space on the queue).
QueueReturn
pushBack(const Type& value);
// Push back to the queue, blocking until space is available (if
// required). Will fail if the queue is disabled (or becomes disabled
// while waiting for space on the queue).
QueueReturn
pushBack(const Type& value);
QueueReturn
pushBack(Type&& value);
QueueReturn
pushBack(Type&& value);
// Try to push back to the queue. Return false if the queue is full or
// disabled.
QueueReturn
tryPushBack(const Type& value);
// Try to push back to the queue. Return false if the queue is full or
// disabled.
QueueReturn
tryPushBack(const Type& value);
QueueReturn
tryPushBack(Type&& value);
QueueReturn
tryPushBack(Type&& value);
// Remove an element from the queue. Block until an element is available
Type
popFront();
// Remove an element from the queue. Block until an element is available
Type
popFront();
// Remove an element from the queue. Block until an element is available
// or until <timeout> microseconds have elapsed
std::optional<Type>
popFrontWithTimeout(std::chrono::microseconds timeout);
// Remove an element from the queue. Block until an element is available
// or until <timeout> microseconds have elapsed
std::optional<Type>
popFrontWithTimeout(std::chrono::microseconds timeout);
std::optional<Type>
tryPopFront();
std::optional<Type>
tryPopFront();
// Remove all elements from the queue. Note this is not atomic, and if
// other threads `pushBack` onto the queue during this call, the `size` of
// the queue is not guaranteed to be 0.
void
removeAll();
// Remove all elements from the queue. Note this is not atomic, and if
// other threads `pushBack` onto the queue during this call, the `size` of
// the queue is not guaranteed to be 0.
void
removeAll();
// Disable the queue. All push operations will fail "fast" (including
// blocked operations). Calling this method on a disabled queue has no
// effect.
void
disable();
// Disable the queue. All push operations will fail "fast" (including
// blocked operations). Calling this method on a disabled queue has no
// effect.
void
disable();
// Enable the queue. Calling this method on a disabled queue has no
// effect.
void
enable();
// Enable the queue. Calling this method on a disabled queue has no
// effect.
void
enable();
size_t
capacity() const;
size_t
capacity() const;
size_t
size() const;
size_t
size() const;
bool
enabled() const;
bool
enabled() const;
bool
full() const;
bool
full() const;
bool
empty() const;
};
// Provide a guard class to provide exception safety for pushing to a queue.
// On destruction, unless the `release` method has been called, will remove
// and destroy all elements from the queue, putting the queue into an empty
// state.
template <typename Type>
class QueuePushGuard
{
private:
Queue<Type>* m_queue;
uint32_t m_generation;
uint32_t m_index;
bool
empty() const;
};
public:
QueuePushGuard(Queue<Type>& queue, uint32_t generation, uint32_t index)
: m_queue(&queue), m_generation(generation), m_index(index)
{}
// Provide a guard class to provide exception safety for pushing to a queue.
// On destruction, unless the `release` method has been called, will remove
// and destroy all elements from the queue, putting the queue into an empty
// state.
template <typename Type>
class QueuePushGuard
{
private:
Queue<Type>* m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePushGuard(Queue<Type>& queue, uint32_t generation, uint32_t index)
: m_queue(&queue), m_generation(generation), m_index(index)
{}
~QueuePushGuard();
void
release();
};
// Provide a guard class to provide exception safety for popping from a
// queue. On destruction, this will pop the the given element from the
// queue.
template <typename Type>
class QueuePopGuard
{
private:
Queue<Type>& m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePopGuard(Queue<Type>& queue, uint32_t generation, uint32_t index)
: m_queue(queue), m_generation(generation), m_index(index)
{}
~QueuePopGuard();
};
template <typename Type>
Queue<Type>::Queue(size_t capacity)
: m_data(nullptr)
, m_dataPadding()
, m_manager(capacity)
, m_waitingPoppers(0)
, m_popSemaphore(0)
, m_popSemaphorePadding()
, m_waitingPushers(0)
, m_pushSemaphore(0)
, m_pushSemaphorePadding()
{
m_data = static_cast<Type*>(::operator new(capacity * sizeof(Type)));
}
~QueuePushGuard();
template <typename Type>
Queue<Type>::~Queue()
{
removeAll();
void
release();
};
// Provide a guard class to provide exception safety for popping from a
// queue. On destruction, this will pop the the given element from the
// queue.
template <typename Type>
class QueuePopGuard
{
private:
Queue<Type>& m_queue;
uint32_t m_generation;
uint32_t m_index;
public:
QueuePopGuard(Queue<Type>& queue, uint32_t generation, uint32_t index)
: m_queue(queue), m_generation(generation), m_index(index)
{}
~QueuePopGuard();
};
template <typename Type>
Queue<Type>::Queue(size_t capacity)
: m_data(nullptr)
, m_dataPadding()
, m_manager(capacity)
, m_waitingPoppers(0)
, m_popSemaphore(0)
, m_popSemaphorePadding()
, m_waitingPushers(0)
, m_pushSemaphore(0)
, m_pushSemaphorePadding()
{
m_data = static_cast<Type*>(::operator new(capacity * sizeof(Type)));
}
// We have already deleted the queue members above, free as (void *)
::operator delete(static_cast<void*>(m_data));
}
template <typename Type>
Queue<Type>::~Queue()
{
removeAll();
template <typename Type>
QueueReturn
Queue<Type>::tryPushBack(const Type& value)
{
uint32_t generation = 0;
uint32_t index = 0;
// We have already deleted the queue members above, free as (void *)
::operator delete(static_cast<void*>(m_data));
}
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read to the waiting poppers
// count sees any waiting poppers from Sync point B.
template <typename Type>
QueueReturn
Queue<Type>::tryPushBack(const Type& value)
{
uint32_t generation = 0;
uint32_t index = 0;
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read to the waiting poppers
// count sees any waiting poppers from Sync point B.
if (retVal != QueueReturn::Success)
{
return retVal;
}
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
if (retVal != QueueReturn::Success)
{
return retVal;
}
QueuePushGuard<Type> pushGuard(*this, generation, index);
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
// Construct in place.
::new (&m_data[index]) Type(value);
QueuePushGuard<Type> pushGuard(*this, generation, index);
pushGuard.release();
// Construct in place.
::new (&m_data[index]) Type(value);
m_manager.commitPushIndex(generation, index);
pushGuard.release();
if (m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
m_manager.commitPushIndex(generation, index);
return QueueReturn::Success;
if (m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
template <typename Type>
QueueReturn
Queue<Type>::tryPushBack(Type&& value)
{
uint32_t generation = 0;
uint32_t index = 0;
return QueueReturn::Success;
}
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read to the waiting poppers
// count sees any waiting poppers from Sync point B.
template <typename Type>
QueueReturn
Queue<Type>::tryPushBack(Type&& value)
{
uint32_t generation = 0;
uint32_t index = 0;
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
// Sync point A
//
// The next call writes with full sequential consistency to the push
// index, which guarantees that the relaxed read to the waiting poppers
// count sees any waiting poppers from Sync point B.
if (retVal != QueueReturn::Success)
{
return retVal;
}
QueueReturn retVal = m_manager.reservePushIndex(generation, index);
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
if (retVal != QueueReturn::Success)
{
return retVal;
}
QueuePushGuard<Type> pushGuard(*this, generation, index);
// Copy into the array. If the copy constructor throws, the pushGuard will
// roll the reserve back.
Type& dummy = value;
QueuePushGuard<Type> pushGuard(*this, generation, index);
// Construct in place.
::new (&m_data[index]) Type(std::move(dummy));
Type& dummy = value;
pushGuard.release();
// Construct in place.
::new (&m_data[index]) Type(std::move(dummy));
m_manager.commitPushIndex(generation, index);
pushGuard.release();
if (m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
m_manager.commitPushIndex(generation, index);
return QueueReturn::Success;
if (m_waitingPoppers > 0)
{
m_popSemaphore.notify();
}
template <typename Type>
std::optional<Type>
Queue<Type>::tryPopFront()
return QueueReturn::Success;
}
template <typename Type>
std::optional<Type>
Queue<Type>::tryPopFront()
{
uint32_t generation;
uint32_t index;
// Sync Point C.
//
// The call to reservePopIndex writes with full *sequential* consistency,
// which guarantees the relaxed read to waiting poppers is synchronized
// with Sync Point D.
QueueReturn retVal = m_manager.reservePopIndex(generation, index);
if (retVal != QueueReturn::Success)
{
uint32_t generation;
uint32_t index;
return {};
}
// Sync Point C.
//
// The call to reservePopIndex writes with full *sequential* consistency,
// which guarantees the relaxed read to waiting poppers is synchronized
// with Sync Point D.
// Pop guard will (even if the move/copy constructor throws)
// - destroy the original object
// - update the queue
// - notify any waiting pushers
QueueReturn retVal = m_manager.reservePopIndex(generation, index);
QueuePopGuard<Type> popGuard(*this, generation, index);
return std::optional<Type>(std::move(m_data[index]));
}
template <typename Type>
QueueReturn
Queue<Type>::pushBack(const Type& value)
{
for (;;)
{
QueueReturn retVal = tryPushBack(value);
if (retVal != QueueReturn::Success)
switch (retVal)
{
return {};
// Queue disabled.
case QueueReturn::QueueDisabled:
// We pushed the value back
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
// Pop guard will (even if the move/copy constructor throws)
// - destroy the original object
// - update the queue
// - notify any waiting pushers
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
QueuePopGuard<Type> popGuard(*this, generation, index);
return std::optional<Type>(std::move(m_data[index]));
}
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which gives visibility of the change above to
// waiting pushers in Synchronisation Point B.
template <typename Type>
QueueReturn
Queue<Type>::pushBack(const Type& value)
{
for (;;)
if (full() && enabled())
{
QueueReturn retVal = tryPushBack(value);
switch (retVal)
{
// Queue disabled.
case QueueReturn::QueueDisabled:
// We pushed the value back
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which gives visibility of the change above to
// waiting pushers in Synchronisation Point B.
if (full() && enabled())
{
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
}
}
template <typename Type>
QueueReturn
Queue<Type>::pushBack(Type&& value)
template <typename Type>
QueueReturn
Queue<Type>::pushBack(Type&& value)
{
for (;;)
{
for (;;)
QueueReturn retVal = tryPushBack(std::move(value));
switch (retVal)
{
QueueReturn retVal = tryPushBack(std::move(value));
switch (retVal)
{
// Queue disabled.
case QueueReturn::QueueDisabled:
// We pushed the value back
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which gives visibility of the change above to
// waiting pushers in Synchronisation Point C.
if (full() && enabled())
{
m_pushSemaphore.wait();
}
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
// Queue disabled.
case QueueReturn::QueueDisabled:
// We pushed the value back
case QueueReturn::Success:
return retVal;
default:
// continue on.
break;
}
}
template <typename Type>
Type
Queue<Type>::popFront()
{
uint32_t generation = 0;
uint32_t index = 0;
while (m_manager.reservePopIndex(generation, index) != QueueReturn::Success)
{
m_waitingPoppers.fetch_add(1, std::memory_order_relaxed);
m_waitingPushers.fetch_add(1, std::memory_order_relaxed);
if (empty())
{
m_popSemaphore.wait();
}
// Sync Point B.
//
// The call to `full` below loads the push index with full *sequential*
// consistency, which gives visibility of the change above to
// waiting pushers in Synchronisation Point C.
m_waitingPoppers.fetch_sub(1, std::memory_order_relaxed);
if (full() && enabled())
{
m_pushSemaphore.wait();
}
QueuePopGuard<Type> popGuard(*this, generation, index);
return Type(std::move(m_data[index]));
m_waitingPushers.fetch_add(-1, std::memory_order_relaxed);
}
}
template <typename Type>
std::optional<Type>
Queue<Type>::popFrontWithTimeout(std::chrono::microseconds timeout)
template <typename Type>
Type
Queue<Type>::popFront()
{
uint32_t generation = 0;
uint32_t index = 0;
while (m_manager.reservePopIndex(generation, index) != QueueReturn::Success)
{
uint32_t generation = 0;
uint32_t index = 0;
bool secondTry = false;
bool success = false;
for (;;)
m_waitingPoppers.fetch_add(1, std::memory_order_relaxed);
if (empty())
{
success = m_manager.reservePopIndex(generation, index) == QueueReturn::Success;
m_popSemaphore.wait();
}
if (secondTry || success)
break;
m_waitingPoppers.fetch_sub(1, std::memory_order_relaxed);
}
m_waitingPoppers.fetch_add(1, std::memory_order_relaxed);
QueuePopGuard<Type> popGuard(*this, generation, index);
return Type(std::move(m_data[index]));
}
if (empty())
{
m_popSemaphore.waitFor(timeout);
secondTry = true;
}
template <typename Type>
std::optional<Type>
Queue<Type>::popFrontWithTimeout(std::chrono::microseconds timeout)
{
uint32_t generation = 0;
uint32_t index = 0;
bool secondTry = false;
bool success = false;
for (;;)
{
success = m_manager.reservePopIndex(generation, index) == QueueReturn::Success;
m_waitingPoppers.fetch_sub(1, std::memory_order_relaxed);
}
if (secondTry || success)
break;
if (success)
m_waitingPoppers.fetch_add(1, std::memory_order_relaxed);
if (empty())
{
QueuePopGuard<Type> popGuard(*this, generation, index);
return Type(std::move(m_data[index]));
m_popSemaphore.waitFor(timeout);
secondTry = true;
}
return {};
m_waitingPoppers.fetch_sub(1, std::memory_order_relaxed);
}
template <typename Type>
void
Queue<Type>::removeAll()
if (success)
{
size_t elemCount = size();
uint32_t poppedItems = 0;
QueuePopGuard<Type> popGuard(*this, generation, index);
return Type(std::move(m_data[index]));
}
while (poppedItems++ < elemCount)
{
uint32_t generation = 0;
uint32_t index = 0;
return {};
}
if (m_manager.reservePopIndex(generation, index) != QueueReturn::Success)
{
break;
}
template <typename Type>
void
Queue<Type>::removeAll()
{
size_t elemCount = size();
m_data[index].~Type();
m_manager.commitPopIndex(generation, index);
}
uint32_t poppedItems = 0;
size_t wakeups = std::min(poppedItems, m_waitingPushers.load());
while (poppedItems++ < elemCount)
{
uint32_t generation = 0;
uint32_t index = 0;
while (wakeups--)
if (m_manager.reservePopIndex(generation, index) != QueueReturn::Success)
{
m_pushSemaphore.notify();
break;
}
m_data[index].~Type();
m_manager.commitPopIndex(generation, index);
}
template <typename Type>
void
Queue<Type>::disable()
size_t wakeups = std::min(poppedItems, m_waitingPushers.load());
while (wakeups--)
{
m_manager.disable();
m_pushSemaphore.notify();
}
}
uint32_t numWaiting = m_waitingPushers;
template <typename Type>
void
Queue<Type>::disable()
{
m_manager.disable();
while (numWaiting--)
{
m_pushSemaphore.notify();
}
}
uint32_t numWaiting = m_waitingPushers;
template <typename Type>
void
Queue<Type>::enable()
while (numWaiting--)
{
m_manager.enable();
m_pushSemaphore.notify();
}
}
template <typename Type>
size_t
Queue<Type>::capacity() const
{
return m_manager.capacity();
}
template <typename Type>
void
Queue<Type>::enable()
{
m_manager.enable();
}
template <typename Type>
size_t
Queue<Type>::size() const
{
return m_manager.size();
}
template <typename Type>
size_t
Queue<Type>::capacity() const
{
return m_manager.capacity();
}
template <typename Type>
bool
Queue<Type>::enabled() const
{
return m_manager.enabled();
}
template <typename Type>
size_t
Queue<Type>::size() const
{
return m_manager.size();
}
template <typename Type>
bool
Queue<Type>::full() const
{
return (capacity() <= size());
}
template <typename Type>
bool
Queue<Type>::enabled() const
{
return m_manager.enabled();
}
template <typename Type>
bool
Queue<Type>::empty() const
{
return (0 >= size());
}
template <typename Type>
bool
Queue<Type>::full() const
{
return (capacity() <= size());
}
template <typename Type>
bool
Queue<Type>::empty() const
{
return (0 >= size());
}
template <typename Type>
QueuePushGuard<Type>::~QueuePushGuard()
template <typename Type>
QueuePushGuard<Type>::~QueuePushGuard()
{
if (m_queue)
{
if (m_queue)
{
// Thread currently has the cell at index/generation. Dispose of it.
// Thread currently has the cell at index/generation. Dispose of it.
uint32_t generation = 0;
uint32_t index = 0;
uint32_t generation = 0;
uint32_t index = 0;
// We should always have at least one item to pop.
size_t poppedItems = 1;
// We should always have at least one item to pop.
size_t poppedItems = 1;
while (m_queue->m_manager.reservePopForClear(generation, index, m_generation, m_index))
{
m_queue->m_data[index].~Type();
while (m_queue->m_manager.reservePopForClear(generation, index, m_generation, m_index))
{
m_queue->m_data[index].~Type();
poppedItems++;
poppedItems++;
m_queue->m_manager.commitPopIndex(generation, index);
}
m_queue->m_manager.commitPopIndex(generation, index);
}
// And release
// And release
m_queue->m_manager.abortPushIndexReservation(m_generation, m_index);
m_queue->m_manager.abortPushIndexReservation(m_generation, m_index);
while (poppedItems--)
{
m_queue->m_pushSemaphore.notify();
}
while (poppedItems--)
{
m_queue->m_pushSemaphore.notify();
}
}
}
template <typename Type>
void
QueuePushGuard<Type>::release()
{
m_queue = nullptr;
}
template <typename Type>
void
QueuePushGuard<Type>::release()
{
m_queue = nullptr;
}
template <typename Type>
QueuePopGuard<Type>::~QueuePopGuard()
{
m_queue.m_data[m_index].~Type();
m_queue.m_manager.commitPopIndex(m_generation, m_index);
template <typename Type>
QueuePopGuard<Type>::~QueuePopGuard()
{
m_queue.m_data[m_index].~Type();
m_queue.m_manager.commitPopIndex(m_generation, m_index);
// Notify a pusher
if (m_queue.m_waitingPushers > 0)
{
m_queue.m_pushSemaphore.notify();
}
// Notify a pusher
if (m_queue.m_waitingPushers > 0)
{
m_queue.m_pushSemaphore.notify();
}
}
} // namespace thread
} // namespace llarp
} // namespace llarp::thread

@ -1,7 +1,5 @@
#pragma once
#include "annotations.hpp"
#include <condition_variable>
#include <iostream>
#include <mutex>
@ -18,165 +16,141 @@ using pid_t = int;
#include <unistd.h>
#endif
#ifdef TRACY_ENABLE
#include <Tracy.hpp>
#define DECLARE_LOCK(type, var, ...) TracyLockable(type, var)
#else
#define DECLARE_LOCK(type, var, ...) type var __VA_ARGS__
#endif
namespace llarp
namespace llarp::util
{
namespace util
/// a mutex that does nothing
///
/// this exists to convert mutexes that were initially in use (but may no
/// longer be necessary) into no-op placeholders (except in debug mode
/// where they complain loudly when they are actually accessed across
/// different threads; see below).
///
/// the idea is to "turn off" the mutexes and see where they are actually
/// needed.
struct NullMutex
{
/// a mutex that does nothing
///
/// this exists to convert mutexes that were initially in use (but may no
/// longer be necessary) into no-op placeholders (except in debug mode
/// where they complain loudly when they are actually accessed across
/// different threads; see below).
///
/// the idea is to "turn off" the mutexes and see where they are actually
/// needed.
struct CAPABILITY("mutex") NullMutex
{
#ifdef LOKINET_DEBUG
/// in debug mode, we implement lock() to enforce that any lock is only
/// used from a single thread. the point of this is to identify locks that
/// are actually needed by dying a painful death when used across threads
mutable std::optional<std::thread::id> m_id;
void
lock() const
{
if (!m_id)
{
m_id = std::this_thread::get_id();
}
else if (*m_id != std::this_thread::get_id())
{
std::cerr << "NullMutex " << this << " was used across threads: locked by "
<< std::this_thread::get_id() << " and was previously locked by " << *m_id
<< "\n";
// if you're encountering this abort() call, you may have discovered a
// case where a NullMutex should be reverted to a "real mutex"
std::abort();
}
}
#else
void
lock() const
{}
#endif
// Does nothing; once locked the mutex belongs to that thread forever
void
unlock() const
{}
};
/// a lock that does nothing
struct SCOPED_CAPABILITY NullLock
/// in debug mode, we implement lock() to enforce that any lock is only
/// used from a single thread. the point of this is to identify locks that
/// are actually needed by dying a painful death when used across threads
mutable std::optional<std::thread::id> m_id;
void
lock() const
{
NullLock(NullMutex& mtx) ACQUIRE(mtx)
if (!m_id)
{
mtx.lock();
m_id = std::this_thread::get_id();
}
~NullLock() RELEASE()
else if (*m_id != std::this_thread::get_id())
{
(void)this; // trick clang-tidy
std::cerr << "NullMutex " << this << " was used across threads: locked by "
<< std::this_thread::get_id() << " and was previously locked by " << *m_id
<< "\n";
// if you're encountering this abort() call, you may have discovered a
// case where a NullMutex should be reverted to a "real mutex"
std::abort();
}
};
/// Default mutex type, supporting shared and exclusive locks.
using Mutex = std::shared_timed_mutex;
/// Basic RAII lock type for the default mutex type.
using Lock = std::lock_guard<Mutex>;
}
#else
void
lock() const
{}
#endif
// Does nothing; once locked the mutex belongs to that thread forever
void
unlock() const
{}
};
/// Obtains multiple unique locks simultaneously and atomically. Returns a
/// tuple of all the held locks.
template <typename... Mutex>
[[nodiscard]] auto
unique_locks(Mutex&... lockables)
/// a lock that does nothing
struct NullLock
{
NullLock(NullMutex& mtx)
{
std::lock(lockables...);
return std::make_tuple(std::unique_lock{lockables, std::adopt_lock}...);
mtx.lock();
}
class Semaphore
~NullLock()
{
private:
std::mutex m_mutex; // protects m_count
size_t m_count GUARDED_BY(m_mutex);
std::condition_variable m_cv;
(void)this; // trick clang-tidy
}
};
public:
Semaphore(size_t count) : m_count(count)
{}
/// Default mutex type, supporting shared and exclusive locks.
using Mutex = std::shared_timed_mutex;
void
notify() EXCLUDES(m_mutex)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_count++;
}
m_cv.notify_one();
}
/// Basic RAII lock type for the default mutex type.
using Lock = std::lock_guard<Mutex>;
void
wait() EXCLUDES(m_mutex)
{
std::unique_lock lock{m_mutex};
m_cv.wait(lock, [this] { return m_count > 0; });
m_count--;
}
class Semaphore
{
private:
std::mutex m_mutex; // protects m_count
size_t m_count;
std::condition_variable m_cv;
bool
waitFor(std::chrono::microseconds timeout) EXCLUDES(m_mutex)
{
std::unique_lock lock{m_mutex};
if (!m_cv.wait_for(lock, timeout, [this] { return m_count > 0; }))
return false;
public:
Semaphore(size_t count) : m_count(count)
{}
m_count--;
return true;
void
notify()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_count++;
}
};
m_cv.notify_one();
}
void
SetThreadName(const std::string& name);
wait()
{
std::unique_lock lock{m_mutex};
m_cv.wait(lock, [this] { return m_count > 0; });
m_count--;
}
inline pid_t
GetPid()
bool
waitFor(std::chrono::microseconds timeout)
{
std::unique_lock lock{m_mutex};
if (!m_cv.wait_for(lock, timeout, [this] { return m_count > 0; }))
return false;
m_count--;
return true;
}
};
void
SetThreadName(const std::string& name);
inline pid_t
GetPid()
{
#ifdef WIN32
return _getpid();
return _getpid();
#else
return ::getpid();
return ::getpid();
#endif
}
}
// type for detecting contention on a resource
struct ContentionKiller
// type for detecting contention on a resource
struct ContentionKiller
{
template <typename F>
void
TryAccess(F visit) const
{
template <typename F>
void
TryAccess(F visit) const
#if defined(LOKINET_DEBUG)
EXCLUDES(_access)
NullLock lock(_access);
#endif
{
#if defined(LOKINET_DEBUG)
NullLock lock(_access);
#endif
visit();
}
visit();
}
#if defined(LOKINET_DEBUG)
private:
mutable NullMutex _access;
private:
mutable NullMutex _access;
#endif
};
} // namespace util
} // namespace llarp
};
} // namespace llarp::util

Loading…
Cancel
Save