LowLevelLock.h

Fully qualified name: carb/thread/detail/LowLevelLock.h

// Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//

#pragma once

#include "../../Defines.h"
#include "../../cpp/Bit.h"
#include "../../cpp/detail/AtomicOps.h"
#include "../../clock/TscClock.h"
#include "../../detail/TSan.h"
#include "NativeFutex.h"

#include <atomic>

#define CARBLOCAL_DEBUG 0
#define CARBLOCAL_WIN_USE_THREADID 1

namespace carb
{
namespace thread
{
namespace detail
{

inline int initCyclesPerPause() noexcept
{
    // TODO: Perhaps have this calculated someday?
    return 1;
}

inline int cyclesPerPause() noexcept
{
    static const int kCyclesPerPause = initCyclesPerPause();
    return kCyclesPerPause;
}

class Backoff
{
public:
    constexpr Backoff() noexcept = default;

    CARB_NOINLINE void backoff() noexcept
    {
        std::atomic_int dummy{ 0 };
        unsigned thisDelay = delay + unsigned(clock::detail::readTsc() & (delay - 1));
        const int kCyclesPerPause = detail::cyclesPerPause();
        for (int i = int(thisDelay) / kCyclesPerPause; i > 0; --i)
        {
            // Waste some time
            (void)dummy.load();
            (void)dummy.load();
            (void)dummy.load();
            CARB_HARDWARE_PAUSE();
        }
        if (delay < (kMaxDelay - 1))
        {
            delay <<= 1; // double it
            CARB_ASSERT(cpp::has_single_bit(delay)); // Always a power of two
        }
    }

private:
    constexpr static unsigned kInitialDelay = 64;
    constexpr static unsigned kMaxDelay = 8192;
    static_assert(cpp::has_single_bit(kInitialDelay), "Must be power of 2");

    unsigned delay = kInitialDelay;
};
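
// Illustrative sketch (assumption: not part of the original header): how Backoff is meant to be
// used around a failed compare-exchange, mirroring the retry loops in LowLevelLock::lock_slow()
// below. The helper name `exampleAcquireBitWithBackoff` is hypothetical and for exposition only.
inline void exampleAcquireBitWithBackoff(std::atomic_uintptr_t& word) noexcept
{
    Backoff backoff;
    uintptr_t expected = word.load(std::memory_order_acquire) & ~uintptr_t(1);
    // Keep trying to set bit 0 (0 -> 1); on each failure, progressively lengthen the jittered
    // delay between attempts so that competing threads spread out their retries.
    while (!word.compare_exchange_strong(expected, expected | 1))
    {
        backoff.backoff();
        expected &= ~uintptr_t(1);
    }
}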

class LowLevelLock
{
public:
    CARB_IF_NOT_TSAN(constexpr) LowLevelLock() noexcept
    {
        __tsan_mutex_create(this, __tsan_mutex_not_static);
    }
    ~LowLevelLock()
    {
        __tsan_mutex_destroy(this, __tsan_mutex_not_static);
    }

    void lock() noexcept
    {
        __tsan_mutex_pre_lock(this, 0);
        // If we successfully set the lock bit (was previously unset), we're locked
        CARB_LIKELY_IF(!cpp::detail::AtomicOps<uintptr_t>::test_bit_and_set(m_data, kLockBit, std::memory_order_seq_cst))
        {
            __tsan_mutex_post_lock(this, 0, 0);
            return;
        }

        lock_slow();
        __tsan_mutex_post_lock(this, 0, 0);
    }

    bool try_lock() noexcept
    {
        __tsan_mutex_pre_lock(this, __tsan_mutex_try_lock);
        // If we successfully set the lock bit (was previously unset), we're locked
        bool b = !cpp::detail::AtomicOps<uintptr_t>::test_bit_and_set(m_data, kLockBit, std::memory_order_seq_cst);
        __tsan_mutex_post_lock(this, b ? __tsan_mutex_try_lock : __tsan_mutex_try_lock_failed, 0);
        return b;
    }

    void unlock() noexcept
    {
        __tsan_mutex_pre_unlock(this, 0);
        uintptr_t old = m_data.fetch_sub(kLock, std::memory_order_seq_cst);

        CARB_ASSERT(old & kLock);
        CARB_ASSERT((old & kWaiting) || (old & kPtrMask) == 0);

        // If we have waiters and no one else is waking, try to set the Waking flag and if successful, wake.
        CARB_UNLIKELY_IF((old & (kWaiting | kWaking)) == kWaiting)
        {
            old -= kLock;
            auto newVal = old + kWaking;
            if (m_data.compare_exchange_strong(old, newVal, std::memory_order_acquire, std::memory_order_relaxed))
                wake(newVal);
        }

        __tsan_mutex_post_unlock(this, 0);
    }

    bool is_locked() const noexcept
    {
        return m_data.load(std::memory_order_acquire) & kLock;
    }

private:
    constexpr static int64_t kLockBit = 0;
    constexpr static intptr_t kLock = intptr_t(1) << kLockBit;
    constexpr static uintptr_t kWaiting = 0x2;
    constexpr static int64_t kWakingBit = 2;
    constexpr static uintptr_t kWaking = uintptr_t(1) << kWakingBit;
    constexpr static uintptr_t kPtrMask = uintptr_t(~0x7);

    constexpr static int kSpinCycles = 10200;

    struct WaitBlock
    {
        constexpr static unsigned kSpinningBit = 0;
        constexpr static unsigned kSpinning = 0x1;
        constexpr static unsigned kWakeup = 0x2;

        WaitBlock* next = nullptr;
        WaitBlock* last = nullptr; // pointer to the last item in the list, when optimized
        WaitBlock* prev = nullptr; // pointer to the previous item, when optimized
        std::atomic_uint32_t flags{ kSpinning };
#if CARB_PLATFORM_WINDOWS && CARBLOCAL_WIN_USE_THREADID
        thread::ThreadId threadId{ this_thread::getId() };
#endif
#if CARBLOCAL_DEBUG
        bool signaled = false;
        uintptr_t oldVal;
        uintptr_t newVal;
        LowLevelLock* lock;
#endif
    };
    static_assert(alignof(WaitBlock) > ~kPtrMask, "WaitBlock alignment too low for bit count");

    CARB_NOINLINE void lock_slow() noexcept
    {
        Backoff backoff;
        for (;;)
        {
            uintptr_t oldVal = m_data.load(std::memory_order_acquire);

            // If the lock is not currently held, try to acquire it immediately; if it is held,
            // fall through and register ourselves as a waiter.
            if (!(oldVal & kLock))
            {
                if (m_data.compare_exchange_strong(oldVal, oldVal + kLock))
                    break;

                // Back-off if there is too much contention.
                backoff.backoff();
                continue;
            }

            WaitBlock waitBlock;

            bool optimize = false;

            uintptr_t newVal;
            if (oldVal & kWaiting)
            {
                // If there are already waiters, add ourselves to the list (we become the new head), but we will
                // need to optimize the list.
                waitBlock.last = nullptr;
                waitBlock.next = cpp::bit_cast<WaitBlock*>(oldVal & kPtrMask);
                newVal = cpp::bit_cast<uintptr_t>(&waitBlock) | (oldVal & ~kPtrMask) | kWaiting | kWaking | kLock;
                if (!(oldVal & kWaking))
                    optimize = true;
            }
            else
            {
                waitBlock.last = &waitBlock;
                newVal = cpp::bit_cast<uintptr_t>(&waitBlock) | kWaiting | kLock;
            }

#if CARBLOCAL_DEBUG
            waitBlock.oldVal = oldVal;
            waitBlock.newVal = newVal;
            waitBlock.lock = this;
#endif
            CARB_ASSERT(newVal & kWaiting);
            CARB_ASSERT(oldVal & kLock);
            CARB_ASSERT(newVal & kLock);

            if (!m_data.compare_exchange_strong(oldVal, newVal))
            {
                // Back off if there is contention.
                backoff.backoff();
                continue;
            }

            // Optimize the list if need be
            if (optimize)
                optimize_wait_list(newVal);

            // We will spin for a bit to see if we will get woken without the much more expensive kernel wait.
            const int kCyclesPerPause = detail::cyclesPerPause();
            for (int i = kSpinCycles / kCyclesPerPause; i > 0; --i)
            {
                if (!(waitBlock.flags.load(std::memory_order_acquire) & WaitBlock::kSpinning))
                {
#if CARBLOCAL_DEBUG
                    CARB_CHECK(waitBlock.signaled);
#endif
                    break;
                }
                CARB_HARDWARE_PAUSE();
            }

            // Atomically clear the Spinning flag, and if it was still set then we will wait in the kernel.
            if (cpp::detail::AtomicOps<uint32_t>::test_bit_and_reset(waitBlock.flags, WaitBlock::kSpinningBit))
            {
                do
                {
                    // Only the wakeup flag may be set
                    CARB_ASSERT((waitBlock.flags.load(std::memory_order_relaxed) & ~WaitBlock::kWakeup) == 0);
#if CARB_PLATFORM_WINDOWS && CARBLOCAL_WIN_USE_THREADID
                    thread::detail::threadId_wait(waitBlock.flags);
#else
                    detail::NativeFutex::wait(waitBlock.flags, 0);
#endif
                } while (!(waitBlock.flags.load(std::memory_order_acquire) & WaitBlock::kWakeup));
#if CARBLOCAL_DEBUG
                CARB_CHECK(waitBlock.signaled);
#endif
            }
            // loop and try again
        }
    }

    CARB_NOINLINE void wake(uintptr_t oldVal) noexcept
    {
        WaitBlock* waitBlock;

        for (;;)
        {
            // Both Waiting and Waking flags must be set
            CARB_ASSERT((oldVal & (kWaiting | kWaking)) == (kWaiting | kWaking));

            // No point waking if we're locked
            while (oldVal & kLock)
            {
                const auto newVal = oldVal - kWaking; // clear the waking flag
                CARB_ASSERT(!(newVal & kWaking));
                CARB_ASSERT(newVal & kLock);
                CARB_ASSERT(newVal & kWaiting);
                if (m_data.compare_exchange_strong(oldVal, newVal))
                    return;
                // oldVal was reloaded by the failed cmpxchg
            }

            WaitBlock* firstWaitBlock = waitBlock = cpp::bit_cast<WaitBlock*>(oldVal & kPtrMask);
            WaitBlock* prevWaitBlock;

            // If the optimization pass was aborted (or skipped), finish it here.
            for (;;)
            {
                if (auto nextWaitBlock = waitBlock->last)
                {
                    waitBlock = nextWaitBlock;
                    break;
                }

                prevWaitBlock = waitBlock;
                waitBlock = waitBlock->next;
                waitBlock->prev = prevWaitBlock;
            }

            if ((prevWaitBlock = waitBlock->prev) != nullptr)
            {
                // Pop the last waiter, but there are other waiters.
                firstWaitBlock->last = prevWaitBlock;

                waitBlock->prev = nullptr;

                CARB_ASSERT(firstWaitBlock != waitBlock);
                CARB_ASSERT(m_data.load(std::memory_order_relaxed) & kWaiting);

                cpp::detail::AtomicOps<uintptr_t>::test_bit_and_reset(m_data, kWakingBit, std::memory_order_release);
                break;
            }
            else
            {
                // Current waiter is the only waiter, so clear everything.
                CARB_ASSERT(!(oldVal & kLock));
                if (m_data.compare_exchange_strong(oldVal, 0))
                    break;
                // oldVal was reloaded by the failed cmpxchg
            }
        }

        __tsan_mutex_pre_signal(this, 0);
#if CARBLOCAL_DEBUG
        CARB_CHECK(!waitBlock->signaled);
        waitBlock->signaled = true;
#endif

#if CARB_PLATFORM_WINDOWS && CARBLOCAL_WIN_USE_THREADID
        // `waitBlock` may not be accessed once kWakeup is set as the waiting thread may destroy it, so read this here.
        auto threadId = waitBlock->threadId;
#endif

        // Atomically clear the Spinning flag and set the Wakeup flag.
        uint32_t prevFlags = waitBlock->flags.exchange(WaitBlock::kWakeup);
        CARB_ASSERT((prevFlags & ~WaitBlock::kSpinning) == 0);
        if (!(prevFlags & WaitBlock::kSpinning))
        {
            // If the Spinning flag was already cleared by the waiting thread we need to wake the thread.
            // If we cleared the Spinning flag, the thread will not wait.
#if CARB_PLATFORM_WINDOWS && CARBLOCAL_WIN_USE_THREADID
            thread::detail::threadId_wake(threadId);
#else
            detail::NativeFutex::notify_one(waitBlock->flags); // Note that this does not access waitBlock
#endif
        }
        __tsan_mutex_post_signal(this, 0);
    }

    void optimize_wait_list(uintptr_t oldVal) noexcept
    {
        // Optimization in this context finds the 'last' entry and writes it to the first entry (the one referenced in
        // the pointer bits of m_data) since we want to wake threads in reverse order (wake oldest waiter first). It
        // also assigns all of the 'prev' pointers that it encounters so that the list can be walked backwards.
        // Optimization is optional since wake() will essentially perform the same actions, but it makes sense to defer
        // the work to the thread that is about to wait rather than the thread that will be waking.
        for (;;)
        {
            // If not locked, wake the list rather than optimize
            if (!(oldVal & kLock))
            {
                wake(oldVal);
                break;
            }

            auto firstWaitBlock = cpp::bit_cast<WaitBlock*>(oldVal & kPtrMask);

            auto waitBlock = firstWaitBlock;
            for (;;)
            {
                if (auto nextWaitBlock = waitBlock->last)
                {
                    firstWaitBlock->last = nextWaitBlock;
                    break;
                }

                auto prevWaitBlock = waitBlock;
                waitBlock = waitBlock->next;
                waitBlock->prev = prevWaitBlock;
            }

            uintptr_t newVal = oldVal - kWaking;
            CARB_ASSERT(newVal & kLock);
            CARB_ASSERT(!(newVal & kWaking));
            if (m_data.compare_exchange_strong(oldVal, newVal))
                break;
            // oldVal was reloaded by failed cmpxchg, try again
        }
    }

    // This value packs three control bits together with an optional pointer to a list of WaitBlock
    // objects (see the sanity checks below).
    // bit 0: locked state (kLock above)
    // bit 1: waiting state (kWaiting above)
    // bit 2: waking state (kWaking above)
    // The remaining bits hold the WaitBlock pointer, which is zero if nothing is waiting.
    std::atomic_uintptr_t m_data{ 0 };
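
    // Sanity checks on the bit layout documented above. These static_asserts are an illustrative
    // addition (assumption: not part of the original header); they verify that the three control
    // bits are distinct and together occupy exactly the bits excluded by kPtrMask.
    static_assert((uintptr_t(kLock) & kWaiting) == 0 && (uintptr_t(kLock) & kWaking) == 0 &&
                      (kWaiting & kWaking) == 0,
                  "Control bits must be distinct");
    static_assert((uintptr_t(kLock) | kWaiting | kWaking) == ~kPtrMask,
                  "Control bits must exactly occupy the non-pointer bits");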
};
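
// Illustrative usage sketch (assumption: not part of the original header). LowLevelLock exposes
// lock()/try_lock()/unlock(), so it can guard a critical section directly; unlock() wakes the
// oldest waiter, if any. The helper names below are hypothetical.
inline void exampleLockedIncrement(LowLevelLock& lock, int& sharedCounter) noexcept
{
    lock.lock(); // uncontended: a single test-and-set; contended: spin briefly, then wait in the kernel
    ++sharedCounter; // exclusive access to the shared state
    lock.unlock();
}

inline bool exampleTryLockedIncrement(LowLevelLock& lock, int& sharedCounter) noexcept
{
    if (!lock.try_lock()) // never blocks
        return false;
    ++sharedCounter;
    lock.unlock();
    return true;
}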

} // namespace detail
} // namespace thread
} // namespace carb

#undef CARBLOCAL_DEBUG
#undef CARBLOCAL_WIN_USE_THREADID