carb/container/LocklessQueue.h
// Copyright (c) 2019-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "../Defines.h"
#include "../cpp/Atomic.h"
#include "../thread/Util.h"
#include <thread>
namespace carb
{
namespace container
{
template <class T>
class LocklessQueueLink;
template <class T, LocklessQueueLink<T> T::*U>
class LocklessQueue;
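//! Defines the intrusive link that an item must contain in order to be stored
//! in a @ref LocklessQueue. Add a member of this type to the item type `T`
//! and pass a pointer to that member as the `U` template parameter of
//! @ref LocklessQueue.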
template <class T>
class LocklessQueueLink
{
public:
constexpr LocklessQueueLink() = default;
CARB_PREVENT_COPY_AND_MOVE(LocklessQueueLink);
private:
CARB_VIZ std::atomic<T*> m_next;
friend T;
template <class U, LocklessQueueLink<U> U::*V>
friend class LocklessQueue;
};
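//! Implements an intrusive lockless FIFO queue: pushes and pops coordinate
//! purely through atomic operations and brief spin-waits, never a mutex.
//! Each item of type `T` carries a @ref LocklessQueueLink member, so the
//! queue itself never allocates memory.
//!
//! Any number of threads may push() concurrently. For popping, use popSC()
//! when exactly one thread consumes, or popMC() when multiple threads may
//! consume concurrently.
//!
//! A brief usage sketch; the item type and member names below are
//! illustrative, not part of this header:
//! @code
//! struct MyItem
//! {
//!     carb::container::LocklessQueueLink<MyItem> link;
//!     int value;
//! };
//!
//! carb::container::LocklessQueue<MyItem, &MyItem::link> queue;
//!
//! MyItem item;
//! item.value = 42;
//! queue.push(&item);         // any thread may push
//! MyItem* p = queue.popMC(); // multi-consumer pop; nullptr if empty
//! @endcode
//!
//! @tparam T The type of item stored in the queue.
//! @tparam U Pointer to the @ref LocklessQueueLink member of `T` that links
//!     items into this queue.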
template <class T, LocklessQueueLink<T> T::*U>
class CARB_VIZ LocklessQueue
{
public:
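//! Constructs an empty queue.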
constexpr LocklessQueue() : m_head(nullptr), m_tail(nullptr)
{
}
CARB_PREVENT_COPY(LocklessQueue);
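//! Move constructor: steals all items from @p rhs, leaving it empty.
//! @param rhs The queue to take items from.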
LocklessQueue(LocklessQueue&& rhs) : m_head(nullptr), m_tail(nullptr)
{
this->steal(rhs);
}
LocklessQueue& operator=(LocklessQueue&&) = delete;
~LocklessQueue()
{
// It is not safe to destroy the queue while it still contains items.
CARB_ASSERT(isEmpty());
}
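//! Indicates whether the queue was empty at the moment of the call; with
//! concurrent producers or consumers the answer may be stale immediately.
//! @returns `true` if the queue appeared empty; `false` otherwise.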
bool isEmpty() const
{
// Reading the tail is more efficient because m_head can experience heavy contention.
return !m_tail.load(std::memory_order_relaxed);
}
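//! Pushes a single item onto the end of the queue. May be called concurrently
//! from any number of threads.
//! @param p The item to push; it must not currently be in any queue.
//! @returns `true` if the queue was previously empty; `false` otherwise.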
bool push(T* p)
{
// Make sure the node isn't already pointing at something
next(p).store(nullptr, std::memory_order_relaxed);
return _push(p, p);
}
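//! Pushes a range of items onto the end of the queue, for iterators that
//! dereference to `T&`. The items are linked to each other first, then the
//! whole chain is appended with a single atomic exchange on the tail.
//! @param begin Iterator to the first item to push.
//! @param end Iterator one past the last item to push.
//! @returns `true` if the queue was previously empty; `false` otherwise
//!     (including when the range is empty).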
#ifndef DOXYGEN_BUILD
template <class InputItRef,
std::enable_if_t<std::is_convertible<decltype(std::declval<InputItRef&>()++, *std::declval<InputItRef&>()), T&>::value,
bool> = false>
#else
template <class InputItRef>
#endif
bool push(InputItRef begin, InputItRef end)
{
// Handle empty list
if (begin == end)
return false;
// Walk through iterators and have them point to each other
InputItRef last = begin;
InputItRef iter = begin;
iter++;
for (; iter != end; last = iter++)
{
next(std::addressof(*last)).store(std::addressof(*iter), std::memory_order_relaxed);
}
next(std::addressof(*last)).store(nullptr, std::memory_order_relaxed);
return _push(std::addressof(*begin), std::addressof(*last));
}
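//! As the range push() above, but for iterators that dereference to `T*`
//! rather than `T&`.
//! @param begin Iterator to the first item to push.
//! @param end Iterator one past the last item to push.
//! @returns `true` if the queue was previously empty; `false` otherwise
//!     (including when the range is empty).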
#ifndef DOXYGEN_BUILD
template <class InputItPtr,
std::enable_if_t<std::is_convertible<decltype(std::declval<InputItPtr&>()++, *std::declval<InputItPtr&>()), T*>::value,
bool> = true>
#else
template <class InputItPtr>
#endif
bool push(InputItPtr begin, InputItPtr end)
{
// Handle empty list
if (begin == end)
return false;
// Walk through iterators and have them point to each other
InputItPtr last = begin;
InputItPtr iter = begin;
iter++;
for (; iter != end; last = iter++)
{
next(*last).store(*iter, std::memory_order_relaxed);
}
next(*last).store(nullptr, std::memory_order_relaxed);
return _push(*begin, *last);
}
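//! Removes all items from this queue and returns them, in order, in a new
//! LocklessQueue, leaving this queue empty.
//! @returns A queue now owning the items formerly in this queue.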
CARB_NODISCARD LocklessQueue eject()
{
return LocklessQueue(std::move(*this));
}
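//! Removes all items from @p rhs and pushes them onto the end of this queue,
//! preserving their order.
//! @param rhs The queue to take items from; it is left empty.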
void steal(LocklessQueue& rhs)
{
static T* const mediator = reinterpret_cast<T*>(size_t(-1));
// Need to pop everything from rhs, so this is similar to popAll()
T* head;
for (;;)
{
// The mediator acts as both a lock and a signal
head = rhs.m_head.exchange(mediator, std::memory_order_relaxed);
if (CARB_UNLIKELY(!head))
{
// A pusher that exchanged a null m_tail in _push() is allowed to blindly
// write m_head, so restore the nullptr with a cmpxchg rather than a store.
T* m = mediator;
if (CARB_UNLIKELY(!rhs.m_head.compare_exchange_strong(
m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
{
// Couldn't write a nullptr back. Try again.
continue;
}
if (CARB_UNLIKELY(!!rhs.m_tail.load(std::memory_order_relaxed)))
{
bool isNull;
// Wait for consistency
this_thread::spinWaitWithBackoff([&] { return rhs.consistent(isNull); });
if (!isNull)
{
// Try again.
continue;
}
}
// Nothing on the queue.
return;
}
if (CARB_UNLIKELY(head == mediator))
{
// Another thread is in a pop function. Wait until m_head is no longer the mediator.
this_thread::spinWaitWithBackoff([&] { return rhs.mediated(); });
// Try again.
continue;
}
break;
}
// Release our lock and swap with the tail
rhs.m_head.store(nullptr, std::memory_order_release);
T* end = rhs.m_tail.exchange(nullptr, std::memory_order_release);
// Push onto *this
_push(head, end);
}
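//! Quickly empties the queue, discarding all items. To take an action on each
//! item as it is removed, use forEach() instead.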
void popAll()
{
static T* const mediator = reinterpret_cast<T*>(size_t(-1));
T* head;
for (;;)
{
// The mediator acts as both a lock and a signal
head = m_head.exchange(mediator, std::memory_order_relaxed);
if (CARB_UNLIKELY(!head))
{
// A pusher that exchanged a null m_tail in _push() is allowed to blindly
// write m_head, so restore the nullptr with a cmpxchg rather than a store.
T* m = mediator;
if (CARB_UNLIKELY(!m_head.compare_exchange_strong(
m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
{
// Couldn't write a nullptr back. Try again.
continue;
}
if (CARB_UNLIKELY(!!m_tail.load(std::memory_order_relaxed)))
{
bool isNull;
// Wait for consistency
this_thread::spinWaitWithBackoff([&] { return this->consistent(isNull); });
if (!isNull)
{
// Try again.
continue;
}
}
// Nothing on the queue.
return;
}
if (CARB_UNLIKELY(head == mediator))
{
// Another thread is in a pop function. Wait until m_head is no longer the mediator.
this_thread::spinWaitWithBackoff([&] { return this->mediated(); });
// Try again.
continue;
}
break;
}
// Release our lock and swap with the tail
m_head.store(nullptr, std::memory_order_release);
m_tail.exchange(nullptr, std::memory_order_release);
}
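//! Pops every item from the queue and invokes @p f on each, in order.
//!
//! A sketch, assuming an item type like the illustrative `MyItem` from the
//! class documentation:
//! @code
//! queue.forEach([](MyItem* item) { process(item); }); // process() is hypothetical
//! @endcode
//!
//! @param f A callable invoked as `f(T*)` for each popped item.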
template <class Func>
void forEach(Func&& f)
{
static T* const mediator = reinterpret_cast<T*>(size_t(-1));
T* head;
for (;;)
{
// The mediator acts as both a lock and a signal
head = m_head.exchange(mediator, std::memory_order_relaxed);
if (CARB_UNLIKELY(!head))
{
// A pusher that exchanged a null m_tail in _push() is allowed to blindly
// write m_head, so restore the nullptr with a cmpxchg rather than a store.
T* m = mediator;
if (CARB_UNLIKELY(!m_head.compare_exchange_strong(
m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
{
// Couldn't write a nullptr back. Try again.
continue;
}
if (CARB_UNLIKELY(!!m_tail.load(std::memory_order_relaxed)))
{
bool isNull;
// Wait for consistency
this_thread::spinWaitWithBackoff([&] { return this->consistent(isNull); });
if (!isNull)
{
// Try again.
continue;
}
}
// Nothing on the queue.
return;
}
if (CARB_UNLIKELY(head == mediator))
{
// Another thread is in a pop function. Wait until m_head is no longer the mediator.
this_thread::spinWaitWithBackoff([&] { return this->mediated(); });
// Try again.
continue;
}
break;
}
// Release our lock and swap with the tail
m_head.store(nullptr, std::memory_order_release);
T* e = m_tail.exchange(nullptr, std::memory_order_release);
for (T *p = head, *n; p; p = n)
{
// Ensure that we have a next item (except for `e`; the end of the queue). It's possible
// that a thread is in `push()` and has written the tail at the time of exchange, above,
// but has not yet written the previous item's next pointer.
n = next(p).load(std::memory_order_relaxed);
if (CARB_UNLIKELY(!n && p != e))
n = waitForEnqueue(next(p));
f(p);
}
}
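//! Pops the item at the front of the queue, if any. Single-consumer: at most
//! one thread may be popping at any given time; use popMC() if multiple
//! threads may pop concurrently. Assertion-enabled builds detect a racing
//! consumer.
//! @returns The popped item, or `nullptr` if the queue was empty.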
T* popSC()
{
#if CARB_ASSERT_ENABLED
// For debug builds, swap with a mediator to ensure that another thread is not in this function
static T* const mediator = reinterpret_cast<T*>(size_t(-1));
T* h = m_head.exchange(mediator, std::memory_order_acquire);
CARB_ASSERT(
h != mediator, "LocklessQueue: Another thread is racing with popSC(). Use popMC() for multi-consumer.");
while (CARB_UNLIKELY(!h))
{
h = m_head.exchange(nullptr, std::memory_order_release);
if (h == mediator)
{
// We successfully swapped a nullptr for the mediator we put there.
return nullptr;
}
// Another thread in push() could've put something else here, so check it again.
}
#else
// If head is null the queue is empty
T* h = m_head.load(std::memory_order_acquire);
if (CARB_UNLIKELY(!h))
{
return nullptr;
}
#endif
// Load the next item and store into the head
T* n = next(h).load(std::memory_order_acquire);
m_head.store(n, std::memory_order_relaxed);
T* e = h;
if (CARB_UNLIKELY(!n && !m_tail.compare_exchange_strong(
e, nullptr, std::memory_order_release, std::memory_order_relaxed)))
{
// The next item was null, but we failed to write null to the tail, so another thread must have
// added something. Read the next value from `h` and store it in m_head.
n = next(h).load(std::memory_order_acquire);
if (CARB_UNLIKELY(!n))
n = waitForEnqueue(next(h));
m_head.store(n, std::memory_order_relaxed);
}
// This isn't really necessary but prevents dangling pointers.
next(h).store(nullptr, std::memory_order_relaxed);
return h;
}
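//! Pops the item at the front of the queue, if any. Multi-consumer: safe to
//! call concurrently from any number of threads.
//! @returns The popped item, or `nullptr` if the queue was empty.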
T* popMC()
{
static T* const mediator = reinterpret_cast<T*>(size_t(-1));
T* head;
for (;;)
{
// The mediator acts as both a lock and a signal
head = m_head.exchange(mediator, std::memory_order_relaxed);
if (CARB_UNLIKELY(!head))
{
// A pusher that exchanged a null m_tail in _push() is allowed to blindly
// write m_head, so restore the nullptr with a cmpxchg rather than a store.
T* m = mediator;
if (CARB_UNLIKELY(!m_head.compare_exchange_strong(
m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
{
// Couldn't write a nullptr back. Try again.
continue;
}
if (CARB_UNLIKELY(!!m_tail.load(std::memory_order_relaxed)))
{
bool isNull;
// Wait for consistency
this_thread::spinWaitWithBackoff([&] { return this->consistent(isNull); });
if (!isNull)
{
// Try again.
continue;
}
}
// Nothing on the queue.
return nullptr;
}
if (CARB_UNLIKELY(head == mediator))
{
// Another thread is in a pop function. Wait until m_head is no longer the mediator.
this_thread::spinWaitWithBackoff([&] { return this->mediated(); });
// Try again.
continue;
}
break;
}
// Restore the head pointer to a sane value before returning.
// If 'next' is nullptr, then this item _might_ be the last item.
T* n = next(head).load(std::memory_order_relaxed);
if (CARB_UNLIKELY(!n))
{
m_head.store(nullptr, std::memory_order_relaxed);
// Try to clear the tail to ensure the queue is now empty.
T* h = head;
if (m_tail.compare_exchange_strong(h, nullptr, std::memory_order_release, std::memory_order_relaxed))
{
// Both head and tail are nullptr now.
// Clear head's next pointer so that it's not dangling
next(head).store(nullptr, std::memory_order_relaxed);
return head;
}
// There must be a next item now.
n = next(head).load(std::memory_order_acquire);
if (CARB_UNLIKELY(!n))
n = waitForEnqueue(next(head));
}
m_head.store(n, std::memory_order_relaxed);
// Clear head's next pointer so that it's not dangling
next(head).store(nullptr, std::memory_order_relaxed);
return head;
}
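//! Pops the item at the front of the queue, if any; synonym for popMC().
//! @returns The popped item, or `nullptr` if the queue was empty.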
T* pop()
{
return popMC();
}
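//! Pushes an item as with push(), then wakes one waiting thread via
//! notifyOne().
//!
//! A producer/consumer sketch (illustrative, reusing the hypothetical
//! `MyItem` from the class documentation):
//! @code
//! // Producer thread:
//! queue.pushNotify(&item);
//!
//! // Consumer thread:
//! MyItem* p = queue.popMCWait(); // blocks until an item is available
//! @endcode
//!
//! @param p The item to push; it must not currently be in any queue.
//! @returns `true` if the queue was previously empty; `false` otherwise.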
bool pushNotify(T* p)
{
bool b = push(p);
notifyOne();
return b;
}
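//! Blocks until an item can be popped and returns it (single-consumer; see
//! popSC()).
//! @returns The popped item; never `nullptr`.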
T* popSCWait()
{
T* p = popSC();
while (!p)
{
wait();
p = popSC();
}
return p;
}
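//! As popSCWait(), but gives up after @p dur has elapsed (one final pop is
//! attempted on timeout).
//! @param dur The maximum time to wait.
//! @returns The popped item, or `nullptr` if the queue was still empty when
//!     the duration elapsed.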
template <class Rep, class Period>
T* popSCWaitFor(const std::chrono::duration<Rep, Period>& dur)
{
return popSCWaitUntil(std::chrono::steady_clock::now() + dur);
}
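//! As popSCWait(), but gives up once @p tp is reached (one final pop is
//! attempted on timeout).
//! @param tp The time point after which to give up.
//! @returns The popped item, or `nullptr` if the queue was still empty at
//!     @p tp.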
template <class Clock, class Duration>
T* popSCWaitUntil(const std::chrono::time_point<Clock, Duration>& tp)
{
T* p = popSC();
while (!p)
{
if (!waitUntil(tp))
{
return popSC();
}
p = popSC();
}
return p;
}
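//! Blocks until an item can be popped and returns it (multi-consumer; see
//! popMC()).
//! @returns The popped item; never `nullptr`.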
T* popMCWait()
{
T* p = popMC();
while (!p)
{
wait();
p = popMC();
}
return p;
}
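//! As popMCWait(), but gives up after @p dur has elapsed (one final pop is
//! attempted on timeout).
//! @param dur The maximum time to wait.
//! @returns The popped item, or `nullptr` if the queue was still empty when
//!     the duration elapsed.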
template <class Rep, class Period>
T* popMCWaitFor(const std::chrono::duration<Rep, Period>& dur)
{
return popMCWaitUntil(std::chrono::steady_clock::now() + dur);
}
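//! As popMCWait(), but gives up once @p tp is reached (one final pop is
//! attempted on timeout).
//! @param tp The time point after which to give up.
//! @returns The popped item, or `nullptr` if the queue was still empty at
//!     @p tp.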
template <class Clock, class Duration>
T* popMCWaitUntil(const std::chrono::time_point<Clock, Duration>& tp)
{
T* p = popMC();
while (!p)
{
if (!waitUntil(tp))
{
return popMC();
}
p = popMC();
}
return p;
}
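//! Blocks the calling thread until the queue appears non-empty. Note that
//! another consumer may empty the queue again before this thread can pop;
//! the pop-and-wait functions handle this by retrying in a loop.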
void wait()
{
m_tail.wait(nullptr, std::memory_order_relaxed);
}
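//! As wait(), but gives up after @p dur has elapsed.
//! @param dur The maximum time to wait.
//! @returns `true` if the queue appeared non-empty; `false` on timeout.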
template <class Rep, class Period>
bool waitFor(const std::chrono::duration<Rep, Period>& dur)
{
return m_tail.wait_for(nullptr, dur, std::memory_order_relaxed);
}
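//! As wait(), but gives up once @p tp is reached.
//! @param tp The time point after which to give up.
//! @returns `true` if the queue appeared non-empty; `false` on timeout.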
template <class Clock, class Duration>
bool waitUntil(const std::chrono::time_point<Clock, Duration>& tp)
{
return m_tail.wait_until(nullptr, tp, std::memory_order_relaxed);
}
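//! Wakes one thread blocked in wait(), waitFor(), waitUntil() or one of the
//! pop-and-wait functions.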
void notifyOne()
{
m_tail.notify_one();
}
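//! Wakes all threads blocked in wait(), waitFor(), waitUntil() or the
//! pop-and-wait functions.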
void notifyAll()
{
m_tail.notify_all();
}
private:
CARB_VIZ cpp::atomic<T*> m_head;
CARB_VIZ cpp::atomic<T*> m_tail;
constexpr static unsigned kWaitSpins = 1024;
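// Appends an already-linked chain [first, last] with a single exchange on
// m_tail; returns `true` if the queue was previously empty.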
bool _push(T* first, T* last)
{
// Swap the tail with our new last item
T* token = m_tail.exchange(last, std::memory_order_release);
CARB_ASSERT(token != last);
if (CARB_LIKELY(token))
{
// The previous tail item now points to our new first item.
next(token).store(first, std::memory_order_relaxed);
return false;
}
else
{
// Queue was empty; head points to our first item
m_head.store(first, std::memory_order_relaxed);
return true;
}
}
// This function name breaks naming paradigms so that it shows up prominently in stack traces.
CARB_NOINLINE T* __WAIT_FOR_ENQUEUE__(std::atomic<T*>& ptr)
{
T* val;
int spins = 0;
while ((val = ptr.load(std::memory_order_relaxed)) == nullptr)
{
spins++;
CARB_UNUSED(spins);
std::this_thread::yield();
}
return val;
}
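// Spins briefly waiting for a pusher to publish the next pointer, then falls
// back to yielding in __WAIT_FOR_ENQUEUE__.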
T* waitForEnqueue(std::atomic<T*>& ptr)
{
unsigned spins = kWaitSpins;
T* val;
while (CARB_UNLIKELY(spins-- > 0))
{
if (CARB_LIKELY((val = ptr.load(std::memory_order_relaxed)) != nullptr))
return val;
CARB_HARDWARE_PAUSE();
}
return __WAIT_FOR_ENQUEUE__(ptr);
}
// Predicate: returns `false` until m_head and m_tail are both null or non-null
bool consistent(bool& isNull) const
{
T* h = m_head.load(std::memory_order_relaxed);
T* t = m_tail.load(std::memory_order_relaxed);
isNull = !t;
return !h == !t;
}
// Predicate: returns `true` once m_head is no longer the mediator
bool mediated() const
{
static T* const mediator = reinterpret_cast<T*>(size_t(-1));
return m_head.load(std::memory_order_relaxed) != mediator;
}
static std::atomic<T*>& next(T* p)
{
return (p->*U).m_next;
}
};
} // namespace container
} // namespace carb