// carb/container/LocklessQueue.h
//
// File members: carb/container/LocklessQueue.h

// Copyright (c) 2019-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//

#pragma once

#include "../Defines.h"
#include "../cpp/Atomic.h"
#include "../thread/Util.h"

#include <thread>

namespace carb
{

namespace container
{

// Forward declarations. LocklessQueueLink<T> is the link object embedded in queued items; LocklessQueue<T, U> is the
// queue itself, where `U` is a pointer-to-member naming the LocklessQueueLink<T> inside `T` that the queue uses.
template <class T>
class LocklessQueueLink;
template <class T, LocklessQueueLink<T> T::*U>
class LocklessQueue;

// Intrusive link used by LocklessQueue. A type `T` that is to be queued embeds a LocklessQueueLink<T> member and
// hands a pointer-to-member for it to the LocklessQueue template.
//
// The only state is the atomic successor pointer `m_next`. It is private; access is granted to `T` and to every
// LocklessQueue instantiation through the friend declarations below.
template <class T>
class LocklessQueueLink
{
public:
    // Defaulted constructor. No initializer is supplied for `m_next` here; the queue's push functions store to the
    // link before publishing the item, so the queue does not depend on the link's initial value.
    constexpr LocklessQueueLink() = default;

    // NOTE(review): macro is defined outside this file; by its name it disables copy and move of the link.
    CARB_PREVENT_COPY_AND_MOVE(LocklessQueueLink);

private:
    // Pointer to the item that follows the owning item in a queue; null for the last item.
    CARB_VIZ std::atomic<T*> m_next;

    friend T;
    template <class U, LocklessQueueLink<U> U::*V>
    friend class LocklessQueue;
};

// Intrusive first-in/first-out queue built from two atomic pointers (head and tail).
//
// Template parameters:
//   T - type of the queued items. The queue only stores `T*`; nothing in this class allocates, copies or frees items.
//   U - pointer-to-member selecting the LocklessQueueLink<T> inside `T` through which items are chained.
//
// Items are appended with push() and removed with popSC() (single consumer) or popMC() (multiple consumers).
template <class T, LocklessQueueLink<T> T::*U>
class CARB_VIZ LocklessQueue
{
public:
    // Constructs an empty queue: head and tail both start out null.
    constexpr LocklessQueue() : m_head(nullptr), m_tail(nullptr)
    {
    }

    CARB_PREVENT_COPY(LocklessQueue);

    // Move constructor: begins as an empty queue and then takes over every item currently held by `rhs`,
    // leaving `rhs` empty.
    LocklessQueue(LocklessQueue&& rhs) : m_head(nullptr), m_tail(nullptr)
    {
        steal(rhs);
    }

    LocklessQueue& operator=(LocklessQueue&&) = delete;

    // Destructor. The queue does not own its items, so destroying it while items are still linked is treated as a
    // usage error and asserted against.
    ~LocklessQueue()
    {
        CARB_ASSERT(this->isEmpty());
    }

    bool isEmpty() const
    {
        // Reading the tail is more efficient because much contention can happen on m_head
        return !m_tail.load(std::memory_order_relaxed);
    }

    // Appends a single item to the end of the queue.
    //
    // p: the item to append; its link is overwritten.
    // Returns true if the queue was empty immediately before this push, false otherwise.
    bool push(T* p)
    {
        // The item becomes the new end of the queue, so it must not carry a stale successor.
        std::atomic<T*>& link = next(p);
        link.store(nullptr, std::memory_order_relaxed);

        // A single item is a chain whose first and last element are the same.
        const bool wasEmpty = _push(p, p);
        return wasEmpty;
    }

#ifndef DOXYGEN_BUILD
    template <class InputItRef,
              std::enable_if_t<std::is_convertible<decltype(std::declval<InputItRef&>()++, *std::declval<InputItRef&>()), T&>::value,
                               bool> = false>
#else
    template <class InputItRef>
#endif
    // Appends every item of the range [begin, end) to the queue, in order, with a single publication.
    // This overload is selected for iterators whose dereference converts to `T&`.
    //
    // The iterators are copied and dereferenced more than once, so they need to be multi-pass.
    // Returns false for an empty range; otherwise true if the queue was empty immediately before the push.
    bool push(InputItRef begin, InputItRef end)
    {
        // An empty range leaves the queue untouched.
        if (begin == end)
            return false;

        // Chain the items together privately first: `prev` trails `cur` by one element and each item's link is
        // pointed at its successor.
        InputItRef prev = begin;
        InputItRef cur = begin;
        cur++;
        while (cur != end)
        {
            next(std::addressof(*prev)).store(std::addressof(*cur), std::memory_order_relaxed);
            prev = cur++;
        }

        // `prev` now designates the final item; terminate the chain there.
        next(std::addressof(*prev)).store(nullptr, std::memory_order_relaxed);

        // Hand the finished chain to the queue.
        const bool wasEmpty = _push(std::addressof(*begin), std::addressof(*prev));
        return wasEmpty;
    }

#ifndef DOXYGEN_BUILD
    template <class InputItPtr,
              std::enable_if_t<std::is_convertible<decltype(std::declval<InputItPtr&>()++, *std::declval<InputItPtr&>()), T*>::value,
                               bool> = true>
#else
    template <class InputItPtr>
#endif
    // Appends every item referenced by the range [begin, end) to the queue, in order, with a single publication.
    // This overload is selected for iterators whose dereference converts to `T*`.
    //
    // The iterators are copied and dereferenced more than once, so they need to be multi-pass.
    // Returns false for an empty range; otherwise true if the queue was empty immediately before the push.
    bool push(InputItPtr begin, InputItPtr end)
    {
        // An empty range leaves the queue untouched.
        if (begin == end)
            return false;

        // Chain the items together privately first: `prev` trails `cur` by one element and each item's link is
        // pointed at its successor.
        InputItPtr prev = begin;
        InputItPtr cur = begin;
        cur++;
        while (cur != end)
        {
            next(*prev).store(*cur, std::memory_order_relaxed);
            prev = cur++;
        }

        // `prev` now designates the final item; terminate the chain there.
        next(*prev).store(nullptr, std::memory_order_relaxed);

        // Hand the finished chain to the queue.
        const bool wasEmpty = _push(*begin, *prev);
        return wasEmpty;
    }

    // Moves the entire contents of this queue into a newly constructed queue and returns it; this queue is left
    // empty. The transfer is performed by the move constructor.
    CARB_NODISCARD LocklessQueue eject()
    {
        LocklessQueue& self = *this;
        return LocklessQueue(std::move(self));
    }

    // Detaches every item currently in `rhs` and appends the whole chain to the end of this queue.
    // If `rhs` is empty nothing is pushed and this queue is unchanged.
    //
    // rhs: the queue to drain; on return its head and tail have been reset to null by this call.
    void steal(LocklessQueue& rhs)
    {
        // Sentinel (all bits set) parked in m_head while a pop-style operation is in progress.
        static T* const mediator = reinterpret_cast<T*>(size_t(-1));

        // Need to pop everything from rhs, so this is similar to popAll()
        T* head;
        for (;;)
        {
            // The mediator acts as both a lock and a signal
            head = rhs.m_head.exchange(mediator, std::memory_order_relaxed);

            if (CARB_UNLIKELY(!head))
            {
                // The head was null, so the queue looked empty and our mediator must be taken back out. A thread in
                // _push() that found the tail null writes m_head unconditionally, so use a cmpxchg here: it only
                // succeeds if the mediator is still in place and therefore never clobbers that write.
                T* m = mediator;
                if (CARB_UNLIKELY(!rhs.m_head.compare_exchange_strong(
                        m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
                {
                    // Couldn't write a nullptr back. Try again.
                    continue;
                }
                if (CARB_UNLIKELY(!!rhs.m_tail.load(std::memory_order_relaxed)))
                {
                    // Head is null but tail is not: a push is part-way through. Spin until the two agree.
                    bool isNull;
                    // Wait for consistency
                    this_thread::spinWaitWithBackoff([&] { return rhs.consistent(isNull); });
                    if (!isNull)
                    {
                        // Items have appeared. Try again.
                        continue;
                    }
                }
                // Nothing on the queue.
                return;
            }

            if (CARB_UNLIKELY(head == mediator))
            {
                // Another thread is in a pop function. Wait until m_head is no longer the mediator.
                this_thread::spinWaitWithBackoff([&] { return rhs.mediated(); });
                // Try again.
                continue;
            }
            // `head` is a real item and rhs.m_head now holds our mediator.
            break;
        }

        // Release our lock and swap with the tail
        rhs.m_head.store(nullptr, std::memory_order_release);
        T* end = rhs.m_tail.exchange(nullptr, std::memory_order_release);

        // Push the detached chain [head, end] onto *this
        _push(head, end);
    }

    // Empties the queue by detaching every item at once. The items are not visited: this function never
    // dereferences them, so their link pointers are left as they were.
    void popAll()
    {
        // Sentinel (all bits set) parked in m_head while a pop-style operation is in progress.
        static T* const mediator = reinterpret_cast<T*>(size_t(-1));

        T* head;
        for (;;)
        {
            // The mediator acts as both a lock and a signal
            head = m_head.exchange(mediator, std::memory_order_relaxed);

            if (CARB_UNLIKELY(!head))
            {
                // The head was null, so the queue looked empty and our mediator must be taken back out. A thread in
                // _push() that found the tail null writes m_head unconditionally, so use a cmpxchg here: it only
                // succeeds if the mediator is still in place and therefore never clobbers that write.
                T* m = mediator;
                if (CARB_UNLIKELY(!m_head.compare_exchange_strong(
                        m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
                {
                    // Couldn't write a nullptr back. Try again.
                    continue;
                }
                if (CARB_UNLIKELY(!!m_tail.load(std::memory_order_relaxed)))
                {
                    // Head is null but tail is not: a push is part-way through. Spin until the two agree.
                    bool isNull;
                    // Wait for consistency
                    this_thread::spinWaitWithBackoff([&] { return this->consistent(isNull); });
                    if (!isNull)
                    {
                        // Items have appeared. Try again.
                        continue;
                    }
                }
                // Nothing on the queue.
                return;
            }

            if (CARB_UNLIKELY(head == mediator))
            {
                // Another thread is in a pop function. Wait until m_head is no longer the mediator.
                this_thread::spinWaitWithBackoff([&] { return this->mediated(); });
                // Try again.
                continue;
            }
            // `head` is a real item and m_head now holds our mediator.
            break;
        }

        // Release our lock and swap with the tail; the detached chain is simply dropped.
        m_head.store(nullptr, std::memory_order_release);
        m_tail.exchange(nullptr, std::memory_order_release);
    }

    // Detaches every item from the queue and then invokes `f(p)` once per detached item, in queue order.
    // The queue is empty (head and tail reset) before the first invocation.
    //
    // f: callable accepting a `T*`. Each item's successor is read before `f` is invoked for it, so the traversal
    //    does not touch an item again after `f` has been called on it.
    template <class Func>
    void forEach(Func&& f)
    {
        // Sentinel (all bits set) parked in m_head while a pop-style operation is in progress.
        static T* const mediator = reinterpret_cast<T*>(size_t(-1));

        T* head;
        for (;;)
        {
            // The mediator acts as both a lock and a signal
            head = m_head.exchange(mediator, std::memory_order_relaxed);

            if (CARB_UNLIKELY(!head))
            {
                // The head was null, so the queue looked empty and our mediator must be taken back out. A thread in
                // _push() that found the tail null writes m_head unconditionally, so use a cmpxchg here: it only
                // succeeds if the mediator is still in place and therefore never clobbers that write.
                T* m = mediator;
                if (CARB_UNLIKELY(!m_head.compare_exchange_strong(
                        m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
                {
                    // Couldn't write a nullptr back. Try again.
                    continue;
                }
                if (CARB_UNLIKELY(!!m_tail.load(std::memory_order_relaxed)))
                {
                    // Head is null but tail is not: a push is part-way through. Spin until the two agree.
                    bool isNull;
                    // Wait for consistency
                    this_thread::spinWaitWithBackoff([&] { return this->consistent(isNull); });
                    if (!isNull)
                    {
                        // Items have appeared. Try again.
                        continue;
                    }
                }
                // Nothing on the queue.
                return;
            }

            if (CARB_UNLIKELY(head == mediator))
            {
                // Another thread is in a pop function. Wait until m_head is no longer the mediator.
                this_thread::spinWaitWithBackoff([&] { return this->mediated(); });
                // Try again.
                continue;
            }
            // `head` is a real item and m_head now holds our mediator.
            break;
        }

        // Release our lock and swap with the tail. `e` is the last item of the chain we now own.
        m_head.store(nullptr, std::memory_order_release);
        T* e = m_tail.exchange(nullptr, std::memory_order_release);
        for (T *p = head, *n; p; p = n)
        {
            // Ensure that we have a next item (except for `e`; the end of the queue). It's possible
            // that a thread is in `push()` and has written the tail at the time of exchange, above,
            // but has not yet written the previous item's next pointer.
            n = next(p).load(std::memory_order_relaxed);
            if (CARB_UNLIKELY(!n && p != e))
                n = waitForEnqueue(next(p));
            // Successor already captured in `n`; hand the item to the callback.
            f(p);
        }
    }

    // Removes and returns the item at the front of the queue, for use when only one thread consumes.
    // Returns nullptr if the queue is empty.
    //
    // When CARB_ASSERT_ENABLED is set, the head is temporarily replaced with a sentinel so that a second thread
    // entering this function at the same time trips the assertion below.
    T* popSC()
    {
#if CARB_ASSERT_ENABLED
        // For debug builds, swap with a mediator to ensure that another thread is not in this function
        static T* const mediator = reinterpret_cast<T*>(size_t(-1));
        T* h = m_head.exchange(mediator, std::memory_order_acquire);
        CARB_ASSERT(
            h != mediator, "LocklessQueue: Another thread is racing with popSC(). Use popMC() for multi-consumer.");
        while (CARB_UNLIKELY(!h))
        {
            // The head was null. Take our mediator back out and look at what was actually there.
            h = m_head.exchange(nullptr, std::memory_order_release);
            if (h == mediator)
            {
                // We successfully swapped a nullptr for the mediator we put there.
                return nullptr;
            }
            // Another thread in push() could've put something else here, so check it again.
        }
#else
        // If head is null the queue is empty
        T* h = m_head.load(std::memory_order_acquire);
        if (CARB_UNLIKELY(!h))
        {
            return nullptr;
        }
#endif

        // Load the next item and store into the head
        T* n = next(h).load(std::memory_order_acquire);
        m_head.store(n, std::memory_order_relaxed);
        // `h` had no successor, so it may be the last item: try to reset the tail from `h` to null. The cmpxchg is
        // only attempted when `n` is null.
        T* e = h;
        if (CARB_UNLIKELY(!n && !m_tail.compare_exchange_strong(
                                    e, nullptr, std::memory_order_release, std::memory_order_relaxed)))
        {
            // The next item was null, but we failed to write null to the tail, so another thread must have added
            // something. Read the next value from `h` (waiting for the pusher to link it if necessary) and store it
            // in m_head.
            n = next(h).load(std::memory_order_acquire);
            if (CARB_UNLIKELY(!n))
                n = waitForEnqueue(next(h));
            m_head.store(n, std::memory_order_relaxed);
        }

        // This isn't really necessary but prevents dangling pointers.
        next(h).store(nullptr, std::memory_order_relaxed);

        return h;
    }

    // Removes and returns the item at the front of the queue; this is the variant intended for multiple consumers.
    // Returns nullptr if the queue is empty.
    //
    // Consumers exclude each other by swapping a sentinel ("mediator") into m_head for the duration of the pop; a
    // consumer that finds the mediator already there spins until it is gone and retries.
    T* popMC()
    {
        // Sentinel (all bits set) parked in m_head while a pop-style operation is in progress.
        static T* const mediator = reinterpret_cast<T*>(size_t(-1));

        T* head;
        for (;;)
        {
            // The mediator acts as both a lock and a signal
            head = m_head.exchange(mediator, std::memory_order_relaxed);

            if (CARB_UNLIKELY(!head))
            {
                // The head was null, so the queue looked empty and our mediator must be taken back out. A thread in
                // _push() that found the tail null writes m_head unconditionally, so use a cmpxchg here: it only
                // succeeds if the mediator is still in place and therefore never clobbers that write.
                T* m = mediator;
                if (CARB_UNLIKELY(!m_head.compare_exchange_strong(
                        m, nullptr, std::memory_order_relaxed, std::memory_order_relaxed)))
                {
                    // Couldn't write a nullptr back. Try again.
                    continue;
                }
                if (CARB_UNLIKELY(!!m_tail.load(std::memory_order_relaxed)))
                {
                    // Head is null but tail is not: a push is part-way through. Spin until the two agree.
                    bool isNull;
                    // Wait for consistency
                    this_thread::spinWaitWithBackoff([&] { return this->consistent(isNull); });
                    if (!isNull)
                    {
                        // Items have appeared. Try again.
                        continue;
                    }
                }
                // Nothing on the queue.
                return nullptr;
            }

            if (CARB_UNLIKELY(head == mediator))
            {
                // Another thread is in a pop function. Wait until m_head is no longer the mediator.
                this_thread::spinWaitWithBackoff([&] { return this->mediated(); });
                // Try again.
                continue;
            }
            // `head` is a real item and m_head now holds our mediator.
            break;
        }

        // Restore the head pointer to a sane value before returning.
        // If 'next' is nullptr, then this item _might_ be the last item.
        T* n = next(head).load(std::memory_order_relaxed);

        if (CARB_UNLIKELY(!n))
        {
            m_head.store(nullptr, std::memory_order_relaxed);
            // Try to clear the tail to ensure the queue is now empty.
            T* h = head;
            if (m_tail.compare_exchange_strong(h, nullptr, std::memory_order_release, std::memory_order_relaxed))
            {
                // Both head and tail are nullptr now.
                // Clear head's next pointer so that it's not dangling
                next(head).store(nullptr, std::memory_order_relaxed);
                return head;
            }
            // The tail was no longer `head`, so a push has happened and there must be a next item now. The pusher
            // may not have linked it yet; wait for the link if necessary.
            n = next(head).load(std::memory_order_acquire);
            if (CARB_UNLIKELY(!n))
                n = waitForEnqueue(next(head));
        }

        // Publish the successor as the new head; this also replaces whatever was parked in m_head.
        m_head.store(n, std::memory_order_relaxed);

        // Clear head's next pointer so that it's not dangling
        next(head).store(nullptr, std::memory_order_relaxed);
        return head;
    }

    // Removes and returns the front item, or nullptr if the queue is empty. Forwards to popMC().
    T* pop()
    {
        T* const item = popMC();
        return item;
    }

    // Appends `p` with push() and then wakes one waiter with notifyOne().
    // Returns what push() returned: true if the queue was empty immediately before the push.
    bool pushNotify(T* p)
    {
        const bool wasEmpty = push(p);
        notifyOne();
        return wasEmpty;
    }

    // Single-consumer pop that blocks: alternates popSC() and wait() until an item is obtained.
    // Never returns nullptr.
    T* popSCWait()
    {
        for (;;)
        {
            if (T* item = popSC())
                return item;

            // Nothing available yet; sleep until the queue may have changed.
            wait();
        }
    }

    // Single-consumer pop that waits for at most `dur`, measured on std::chrono::steady_clock.
    // Returns the popped item, or nullptr if none was obtained before the time limit.
    template <class Rep, class Period>
    T* popSCWaitFor(const std::chrono::duration<Rep, Period>& dur)
    {
        const auto deadline = std::chrono::steady_clock::now() + dur;
        return popSCWaitUntil(deadline);
    }

    // Single-consumer pop that waits until the time point `tp` at the latest.
    // Returns the popped item, or nullptr if none could be obtained.
    template <class Clock, class Duration>
    T* popSCWaitUntil(const std::chrono::time_point<Clock, Duration>& tp)
    {
        for (;;)
        {
            if (T* item = popSC())
                return item;

            // waitUntil() reporting false ends the wait (presumably the deadline passed); make one final attempt
            // and return its result, whatever it is.
            if (!waitUntil(tp))
                return popSC();
        }
    }

    // Multi-consumer pop that blocks: alternates popMC() and wait() until an item is obtained.
    // Never returns nullptr.
    T* popMCWait()
    {
        for (;;)
        {
            if (T* item = popMC())
                return item;

            // Nothing available yet; sleep until the queue may have changed.
            wait();
        }
    }

    // Multi-consumer pop that waits for at most `dur`, measured on std::chrono::steady_clock.
    // Returns the popped item, or nullptr if none was obtained before the time limit.
    template <class Rep, class Period>
    T* popMCWaitFor(const std::chrono::duration<Rep, Period>& dur)
    {
        const auto deadline = std::chrono::steady_clock::now() + dur;
        return popMCWaitUntil(deadline);
    }

    // Multi-consumer pop that waits until the time point `tp` at the latest.
    // Returns the popped item, or nullptr if none could be obtained.
    template <class Clock, class Duration>
    T* popMCWaitUntil(const std::chrono::time_point<Clock, Duration>& tp)
    {
        for (;;)
        {
            if (T* item = popMC())
                return item;

            // waitUntil() reporting false ends the wait (presumably the deadline passed); make one final attempt
            // and return its result, whatever it is.
            if (!waitUntil(tp))
                return popMC();
        }
    }

    void wait()
    {
        m_tail.wait(nullptr, std::memory_order_relaxed);
    }

    template <class Rep, class Period>
    bool waitFor(const std::chrono::duration<Rep, Period>& dur)
    {
        return m_tail.wait_for(nullptr, dur, std::memory_order_relaxed);
    }

    template <class Clock, class Duration>
    bool waitUntil(const std::chrono::time_point<Clock, Duration>& tp)
    {
        return m_tail.wait_until(nullptr, tp, std::memory_order_relaxed);
    }

    void notifyOne()
    {
        m_tail.notify_one();
    }

    void notifyAll()
    {
        m_tail.notify_all();
    }

private:
    // First item in the queue, or null when the queue is empty. Pop-style functions also temporarily store a
    // sentinel here (the "mediator", a pointer value with all bits set) to exclude one another.
    CARB_VIZ cpp::atomic<T*> m_head;
    // Most recently pushed item, or null when the queue is empty. _push() publishes new items by exchanging this
    // pointer, and wait()/notifyOne()/notifyAll() block and wake on it.
    CARB_VIZ cpp::atomic<T*> m_tail;

    // Number of polling iterations waitForEnqueue() performs before handing over to the yielding slow path.
    constexpr static unsigned kWaitSpins = 1024;

    // Appends an already-linked chain of items to the queue.
    //
    // first: first item of the chain.
    // last:  last item of the chain; callers have already set its link to null.
    // Returns true if the queue was empty (tail was null) at the moment of publication, false otherwise.
    bool _push(T* first, T* last)
    {
        // Claim the end of the queue: whoever exchanges the tail owns the job of linking to the previous tail.
        T* const prevTail = m_tail.exchange(last, std::memory_order_release);
        CARB_ASSERT(prevTail != last);

        if (CARB_UNLIKELY(!prevTail))
        {
            // There was no previous tail, so the queue was empty and the chain starts at the head.
            m_head.store(first, std::memory_order_relaxed);
            return true;
        }

        // Hook the chain in behind the item that used to be last.
        next(prevTail).store(first, std::memory_order_relaxed);
        return false;
    }

    // This function name breaks naming paradigms so that it shows up prominently in stack traces.
    //
    // Slow path of waitForEnqueue(): yields the calling thread repeatedly until `ptr` holds a non-null value, then
    // returns that value. There is no upper bound on how long this waits.
    //
    // ptr: the link being waited on.
    CARB_NOINLINE T* __WAIT_FOR_ENQUEUE__(std::atomic<T*>& ptr)
    {
        T* val;
        // Iteration count, kept only as a diagnostic aid (it is otherwise unused). It is unsigned on purpose: the
        // loop has no bound, and an unsigned counter wraps instead of running into signed-overflow undefined
        // behavior.
        size_t spins = 0;
        while ((val = ptr.load(std::memory_order_relaxed)) == nullptr)
        {
            ++spins;
            CARB_UNUSED(spins);
            std::this_thread::yield();
        }
        return val;
    }

    // Waits for `ptr` to become non-null and returns its value. Used when the tail shows that an item has been
    // pushed but the pushing thread has not yet written the link that leads to it.
    //
    // Polls up to kWaitSpins times with a hardware pause between attempts, then falls back to the yielding slow path.
    T* waitForEnqueue(std::atomic<T*>& ptr)
    {
        for (unsigned remaining = kWaitSpins; CARB_UNLIKELY(remaining > 0); --remaining)
        {
            T* const linked = ptr.load(std::memory_order_relaxed);
            if (CARB_LIKELY(linked != nullptr))
                return linked;
            CARB_HARDWARE_PAUSE();
        }

        // Still not linked after the polling budget; continue in the out-of-line function.
        return __WAIT_FOR_ENQUEUE__(ptr);
    }

    // Predicate: returns `false` until m_head and m_tail are both null or non-null
    bool consistent(bool& isNull) const
    {
        T* h = m_head.load(std::memory_order_relaxed);
        T* t = m_tail.load(std::memory_order_relaxed);
        isNull = !t;
        return !h == !t;
    }

    // Predicate: returns Ready when _head is no longer the mediator
    bool mediated() const
    {
        static T* const mediator = reinterpret_cast<T*>(size_t(-1));
        return m_head.load(std::memory_order_relaxed) != mediator;
    }

    // Returns the atomic successor pointer of item `p`, reached through the link member selected by the
    // pointer-to-member template argument `U`.
    static std::atomic<T*>& next(T* p)
    {
        LocklessQueueLink<T>& link = p->*U;
        return link.m_next;
    }
};

} // namespace container

} // namespace carb