carb/thread/FutexImpl.h

File members: carb/thread/FutexImpl.h

// Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//

#pragma once

#include "../Defines.h"
#include "../cpp/Bit.h"
#include "../math/Util.h"
#include "../thread/Util.h"
#include "../detail/TSan.h"

#include <atomic>

#if CARB_PLATFORM_WINDOWS
#    pragma comment(lib, "synchronization.lib") // must link with synchronization.lib
#    include "../CarbWindows.h"
#elif CARB_PLATFORM_LINUX
#    include <linux/futex.h>
#    include <sys/syscall.h>
#    include <sys/time.h>

#    include <unistd.h>
#elif CARB_PLATFORM_MACOS
/* nothing for now */
#else
CARB_UNSUPPORTED_PLATFORM();
#endif

namespace carb
{
namespace thread
{
namespace detail
{

template <class T, size_t S = sizeof(T)>
struct to_integral
{
};

template <class T>
struct to_integral<T, 1>
{
    using type = int8_t;
};

template <class T>
struct to_integral<T, 2>
{
    using type = int16_t;
};

template <class T>
struct to_integral<T, 4>
{
    using type = int32_t;
};

template <class T>
struct to_integral<T, 8>
{
    using type = int64_t;
};

template <class T>
using to_integral_t = typename to_integral<T>::type;

template <class As, class T>
CARB_NODISCARD std::enable_if_t<std::is_integral<T>::value && sizeof(As) == sizeof(T), As> reinterpret_as(const T& in) noexcept
{
    static_assert(std::is_integral<As>::value, "Must be integral type");
    return static_cast<As>(in);
}

template <class As, class T>
CARB_NODISCARD std::enable_if_t<std::is_pointer<T>::value && sizeof(As) == sizeof(T), As> reinterpret_as(const T& in) noexcept
{
    static_assert(std::is_integral<As>::value, "Must be integral type");
    return reinterpret_cast<As>(in);
}

template <class As, class T>
CARB_NODISCARD std::enable_if_t<(!std::is_pointer<T>::value && !std::is_integral<T>::value) || sizeof(As) != sizeof(T), As> reinterpret_as(
    const T& in) noexcept
{
    static_assert(std::is_integral<As>::value, "Must be integral type");
    As out{}; // Init to zero
    memcpy(&out, std::addressof(in), sizeof(in));
    return out;
}

template <class Duration>
Duration clampDuration(Duration offset)
{
    using namespace std::chrono;
    constexpr static Duration Max = duration_cast<Duration>(milliseconds(0x7fffffff));
    return ::carb_max(Duration(0), ::carb_min(Max, offset));
}

#if CARB_PLATFORM_WINDOWS
// Windows WaitOnAddress() supports 1, 2, 4 or 8 bytes, so it doesn't need to use ParkingLot. For testing ParkingLot
// or for specific modules this can be enabled though.
#    ifndef CARB_USE_PARKINGLOT
#        define CARB_USE_PARKINGLOT 0
#    endif

using hundrednanos = std::chrono::duration<int64_t, std::ratio<1, 10'000'000>>;

inline bool WaitOnAddress(volatile const void* val, const void* compare, size_t size, int64_t* timeout) noexcept
{
    // Use the NTDLL version of this function since we can give it relative or absolute times in 100ns units
    using RtlWaitOnAddressFn = NTSTATUS(__stdcall*)(volatile const void*, const void*, size_t, int64_t*);
    static RtlWaitOnAddressFn RtlWaitOnAddress =
        (RtlWaitOnAddressFn)GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "RtlWaitOnAddress");

    switch (NTSTATUS ret = RtlWaitOnAddress(val, compare, size, timeout))
    {
        case CARBWIN_STATUS_SUCCESS:
            return true;

        default:
            CARB_FATAL_UNLESS(
                0, "Unexpected result from RtlWaitOnAddress: 0x%lx, GetLastError=%u", ret, ::GetLastError());
            CARB_FALLTHROUGH; // (not really, but the compiler doesn't know that CARB_FATAL_UNLESS doesn't return)
        case CARBWIN_STATUS_TIMEOUT:
            return false;
    }
}

template <class T>
inline bool WaitOnAddress(const std::atomic<T>& val, T compare, int64_t* timeout) noexcept
{
    static_assert(sizeof(val) == sizeof(compare), "Invalid assumption about atomic");
    return WaitOnAddress(std::addressof(val), std::addressof(compare), sizeof(T), timeout);
}

inline bool WaitForAlertByThreadId(const void* addr, int64_t* timeout) noexcept
{
    // Use undocumented API unlikely to change since so many things are based on it.
    // https://ntdoc.m417z.com/ntwaitforalertbythreadid
    // https://dennisbabkin.com/blog/?t=how-to-put-thread-into-kernel-wait-and-to-wake-it-by-thread-id
    using NtWaitForAlertByThreadIdFn = NTSTATUS(__stdcall*)(const void*, int64_t*);
    static auto NtWaitForAlertByThreadId =
        (NtWaitForAlertByThreadIdFn)GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "NtWaitForAlertByThreadId");

    switch (NTSTATUS ret = NtWaitForAlertByThreadId(addr, timeout))
    {
        case CARBWIN_STATUS_SUCCESS:
        case CARBWIN_STATUS_ALERTED:
            return true;

        default:
            CARB_FATAL_UNLESS(
                0, "Unexpected result from NtWaitForAlertByThreadId: 0x%lx, GetLastError=%u", ret, ::GetLastError());
            CARB_FALLTHROUGH; // (not really, but the compiler doesn't know that CARB_FATAL_UNLESS doesn't return)
        case CARBWIN_STATUS_TIMEOUT:
            return false;
    }
}

inline void AlertThreadByThreadId(ThreadId threadId) noexcept
{
    // Use undocumented API unlikely to change since so many things are based on it.
    // https://ntdoc.m417z.com/ntalertthreadbythreadid
    // https://dennisbabkin.com/blog/?t=how-to-put-thread-into-kernel-wait-and-to-wake-it-by-thread-id
    using NtAlertThreadByThreadIdFn = NTSTATUS(__stdcall*)(HANDLE); // HANDLE?
    static auto NtAlertThreadByThreadId =
        (NtAlertThreadByThreadIdFn)GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "NtAlertThreadByThreadId");

    NTSTATUS ret = NtAlertThreadByThreadId((HANDLE)(UINT_PTR)threadId);
    CARB_ASSERT(ret == CARBWIN_STATUS_SUCCESS);
    CARB_UNUSED(ret);
}

template <class TimePoint1, class TimePoint2>
TimePoint1 convertToSystemClock(TimePoint1 tp, TimePoint2, std::true_type) // system_clock
{
    // Already in terms of system clock
    return tp;
}

template <class TimePoint1, class TimePoint2>
auto convertToSystemClock(TimePoint1 tp, TimePoint2 now, std::false_type) // other Clock
{
    // Not using the system clock, so we need to convert to the system clock
    return tp - now + std::chrono::system_clock::now();
}

template <class Clock, class Duration>
bool winConvertAbsTime(std::chrono::time_point<Clock, Duration> time_point, int64_t& absTimeOut)
{
    auto now = Clock::now();

    // Kernel calls are quite slow to return if the time has already elapsed. It's much faster for us to check first.
    if (time_point <= now)
    {
        return false;
    }

    // Constrain the time to something that is well before the heat death of the universe
    auto tp = now + clampDuration(time_point - now);

    // Convert the timepoint to the system clock if necessary
    auto systemTp = convertToSystemClock(tp, now, std::is_same<Clock, std::chrono::system_clock>{});

    // According to https://github.com/microsoft/STL/blob/master/stl/inc/chrono, the system_clock appears to
    // use GetSystemTimePreciseAsFileTime minus an epoch value so that it lines up with 1/1/1970 midnight GMT.
    // Unfortunately there's not an easy way to check for it here, but we have a unittest in TestSemaphore.cpp.
    absTimeOut = std::chrono::duration_cast<detail::hundrednanos>(systemTp.time_since_epoch()).count();

    // Epoch value from https://github.com/microsoft/STL/blob/master/stl/src/xtime.cpp
    // This is the number of 100ns units between 1 January 1601 00:00 GMT and 1 January 1970 00:00 GMT
    constexpr int64_t kFiletimeEpochToUnixEpochIn100nsUnits = 0x19DB1DED53E8000LL;
    absTimeOut += kFiletimeEpochToUnixEpochIn100nsUnits;
    return true;
}

template <class T>
inline void threadId_wait(const std::atomic<T>& val) noexcept
{
    WaitForAlertByThreadId(std::addressof(val), nullptr);
}

template <class T, class Rep, class Period>
inline bool threadId_wait_for(const std::atomic<T>& val, std::chrono::duration<Rep, Period> duration)
{
    // NtWaitForAlertByThreadId treats negative timeouts as positive relative time
    int64_t timeout = -std::chrono::duration_cast<hundrednanos>(clampDuration(duration)).count();
    // Duration must be negative for relative time, otherwise we calculated a time in the past.
    return timeout < 0 ? WaitForAlertByThreadId(std::addressof(val), &timeout) : false;
}

template <class T, class Clock, class Duration>
inline bool threadId_wait_until(const std::atomic<T>& val, std::chrono::time_point<Clock, Duration> time_point)
{
    int64_t absTime;
    if (!winConvertAbsTime(time_point, absTime))
        return false;

    CARB_ASSERT(absTime >= 0);
    return WaitForAlertByThreadId(std::addressof(val), &absTime);
}

inline void threadId_wake(ThreadId id) noexcept
{
    AlertThreadByThreadId(id);
}

template <class T>
inline void futex_wait(const std::atomic<T>& val, T compare) noexcept
{
    WaitOnAddress(val, compare, nullptr);
}

template <class T, class Rep, class Period>
inline bool futex_wait_for(const std::atomic<T>& val, T compare, std::chrono::duration<Rep, Period> duration)
{
    // RtlWaitOnAddress treats negative timeouts as positive relative time
    int64_t timeout = -std::chrono::duration_cast<hundrednanos>(clampDuration(duration)).count();
    // Duration must be negative for relative time, otherwise we calculated a time in the past.
    return timeout < 0 ? WaitOnAddress(val, compare, &timeout) : false;
}

template <class T, class Clock, class Duration>
inline bool futex_wait_until(const std::atomic<T>& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
{
    int64_t absTime;
    if (!winConvertAbsTime(time_point, absTime))
        return false;
    CARB_ASSERT(absTime >= 0);
    return detail::WaitOnAddress(val, compare, &absTime);
}

template <class T>
inline void futex_wake_one(std::atomic<T>& val) noexcept
{
    WakeByAddressSingle(std::addressof(val));
}

template <class T>
inline void futex_wake_n(std::atomic<T>& val, size_t n) noexcept
{
    while (n--)
        futex_wake_one(val);
}

template <class T>
inline void futex_wake_all(std::atomic<T>& val) noexcept
{
    WakeByAddressAll(std::addressof(val));
}

class LowLevelLock
{
public:
    constexpr LowLevelLock() noexcept = default;

    void lock() noexcept
    {
        AcquireSRWLockExclusive((PSRWLOCK)&m_lock);
    }

    bool try_lock() noexcept
    {
        return !!TryAcquireSRWLockExclusive((PSRWLOCK)&m_lock);
    }

    void unlock() noexcept
    {
        ReleaseSRWLockExclusive((PSRWLOCK)&m_lock);
    }

    bool is_locked() const noexcept
    {
        return m_lock.Ptr != nullptr;
    }

private:
    CARBWIN_SRWLOCK m_lock = CARBWIN_SRWLOCK_INIT;
};

#    if !CARB_USE_PARKINGLOT
template <class T, size_t S = sizeof(T)>
class Futex
{
    static_assert(S == 1 || S == 2 || S == 4 || S == 8, "Unsupported size");

public:
    using AtomicType = typename std::atomic<T>;
    using Type = T;
    static inline void wait(const AtomicType& val, Type compare) noexcept
    {
        futex_wait(val, compare);
    }
    template <class Rep, class Period>
    static inline bool wait_for(const AtomicType& val, Type compare, std::chrono::duration<Rep, Period> duration)
    {
        return futex_wait_for(val, compare, duration);
    }
    template <class Clock, class Duration>
    static inline bool wait_until(const AtomicType& val, Type compare, std::chrono::time_point<Clock, Duration> time_point)
    {
        return futex_wait_until(val, compare, time_point);
    }
    static inline void notify_one(AtomicType& a) noexcept
    {
        futex_wake_one(a);
    }
    static inline void notify_n(AtomicType& a, size_t n) noexcept
    {
        futex_wake_n(a, n);
    }
    static inline void notify_all(AtomicType& a) noexcept
    {
        futex_wake_all(a);
    }
};
#    endif

#elif CARB_PLATFORM_LINUX
#    define CARB_USE_PARKINGLOT 1 // Linux only supports 4 byte futex so it must use the ParkingLot

constexpr int64_t kNsPerSec = 1'000'000'000;

inline int futex(const std::atomic_uint32_t& aval,
                 int futex_op,
                 uint32_t val,
                 const struct timespec* timeout,
                 uint32_t* uaddr2,
                 int val3) noexcept
{
    static_assert(sizeof(aval) == sizeof(uint32_t), "Invalid assumption about atomic");
    auto ret = syscall(SYS_futex, std::addressof(aval), futex_op, val, timeout, uaddr2, val3);
    return ret >= 0 ? int(ret) : -errno;
}

inline void futex_wait(const std::atomic_uint32_t& val, uint32_t compare) noexcept
{
    for (;;)
    {
        int ret = futex(val, FUTEX_WAIT_BITSET_PRIVATE, compare, nullptr, nullptr, FUTEX_BITSET_MATCH_ANY);
        switch (ret)
        {
            case 0:
            case -EAGAIN: // Valid or spurious wakeup
                return;

            case -ETIMEDOUT:
                // Apparently on Windows Subsystem for Linux, calls to the kernel can timeout even when a timeout value
                // was not specified. Fall through.
            case -EINTR: // Interrupted by signal; loop again
                break;

            default:
                CARB_FATAL_UNLESS(0, "Unexpected result from futex(): %d/%s", -ret, strerror(-ret));
        }
    }
}

template <class Rep, class Period>
inline bool futex_wait_for(const std::atomic_uint32_t& val, uint32_t compare, std::chrono::duration<Rep, Period> duration)
{
    // Relative time
    int64_t ns = std::chrono::duration_cast<std::chrono::nanoseconds>(clampDuration(duration)).count();
    if (ns <= 0)
    {
        return false;
    }

    struct timespec ts;
    ts.tv_sec = time_t(ns / detail::kNsPerSec);
    ts.tv_nsec = long(ns % detail::kNsPerSec);

    // Since we're using relative time here, we can use FUTEX_WAIT_PRIVATE (see futex() man page)
    int ret = futex(val, FUTEX_WAIT_PRIVATE, compare, &ts, nullptr, 0);
    switch (ret)
    {
        case 0: // Valid wakeup
        case -EAGAIN: // Valid or spurious wakeup
        case -EINTR: // Interrupted by signal; treat as a spurious wakeup
            return true;

        default:
            CARB_FATAL_UNLESS(0, "Unexpected result from futex(): %d/%s", -ret, strerror(-ret));
            CARB_FALLTHROUGH; // (not really but the compiler doesn't know that the above won't return)
        case -ETIMEDOUT:
            return false;
    }
}

template <class Clock, class Duration>
inline bool futex_wait_until(const std::atomic_uint32_t& val,
                             uint32_t compare,
                             std::chrono::time_point<Clock, Duration> time_point)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);

    // Constrain the time to something that is well before the heat death of the universe
    auto now = Clock::now();
    auto tp = now + clampDuration(time_point - now);

    // Get the number of nanoseconds to go
    int64_t ns = std::chrono::duration_cast<std::chrono::nanoseconds>(tp - now).count();
    if (ns <= 0)
    {
        return false;
    }

    ts.tv_sec += time_t(ns / kNsPerSec);
    ts.tv_nsec += long(ns % kNsPerSec);

    // Handle rollover
    if (ts.tv_nsec >= kNsPerSec)
    {
        ++ts.tv_sec;
        ts.tv_nsec -= kNsPerSec;
    }

    for (;;)
    {
        // Since we're using absolute monotonic time, we use FUTEX_WAIT_BITSET_PRIVATE. See the man page for futex for
        // more info.
        int ret = futex(val, FUTEX_WAIT_BITSET_PRIVATE, compare, &ts, nullptr, FUTEX_BITSET_MATCH_ANY);
        switch (ret)
        {
            case 0: // Valid wakeup
            case -EAGAIN: // Valid or spurious wakeup
                return true;

            case -EINTR: // Interrupted by signal; loop again
                break;

            default:
                CARB_FATAL_UNLESS(0, "Unexpected result from futex(): %d/%s", -ret, strerror(-ret));
                CARB_FALLTHROUGH; // (not really but the compiler doesn't know that the above won't return)
            case -ETIMEDOUT:
                return false;
        }
    }
}

inline void futex_wake_n(std::atomic_uint32_t& val, unsigned count) noexcept
{
    int ret = futex(val, FUTEX_WAKE_BITSET_PRIVATE, count, nullptr, nullptr, FUTEX_BITSET_MATCH_ANY);
    CARB_ASSERT(ret >= 0, "futex(FUTEX_WAKE) failed with errno=%d/%s", -ret, strerror(-ret));
    CARB_UNUSED(ret);
}

inline void futex_wake_one(std::atomic_uint32_t& val) noexcept
{
    futex_wake_n(val, 1);
}

inline void futex_wake_all(std::atomic_uint32_t& val) noexcept
{
    futex_wake_n(val, INT_MAX);
}
#elif CARB_PLATFORM_MACOS
#    define CARB_USE_PARKINGLOT 1

#    define UL_COMPARE_AND_WAIT 1
#    define UL_UNFAIR_LOCK 2
#    define UL_COMPARE_AND_WAIT_SHARED 3
#    define UL_UNFAIR_LOCK64_SHARED 4
#    define UL_COMPARE_AND_WAIT64 5
#    define UL_COMPARE_AND_WAIT64_SHARED 6

#    define ULF_WAKE_ALL 0x00000100
#    define ULF_WAKE_THREAD 0x00000200
#    define ULF_NO_ERRNO 0x01000000

extern "C" int __ulock_wait(uint32_t operation, void* addr, uint64_t value, uint32_t timeout);

extern "C" int __ulock_wake(uint32_t operation, void* addr, uint64_t wake_value);

inline void futex_wait(const std::atomic_uint32_t& val, uint32_t compare) noexcept
{
    for (;;)
    {
        int rc = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
                              const_cast<uint32_t*>(reinterpret_cast<const uint32_t*>(std::addressof(val))), compare, 0);
        if (rc >= 0)
        {
            // According to XNU source, the non-negative return value is the number of remaining waiters.
            // See ulock_wait_cleanup in sys_ulock.c
            return;
        }
        switch (-rc)
        {
            case EINTR: // According to XNU source, EINTR can be returned.
                continue;
            case ETIMEDOUT:
                CARB_FALLTHROUGH;
            case EFAULT:
                CARB_FALLTHROUGH;
            default:
                CARB_FATAL_UNLESS(0, "Unexpected result from __ulock_wait: %d/%s", -rc, strerror(-rc));
        }
    }
}

template <class Rep, class Period>
inline bool futex_wait_for(const std::atomic_uint32_t& val, uint32_t compare, std::chrono::duration<Rep, Period> duration)
{
    // Relative time
    int64_t usec = std::chrono::duration_cast<std::chrono::microseconds>(clampDuration(duration)).count();
    if (usec <= 0)
    {
        return false;
    }
    if (usec > UINT32_MAX)
    {
        usec = UINT32_MAX;
    }

    int rc = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
                          const_cast<uint32_t*>(reinterpret_cast<const uint32_t*>(std::addressof(val))), compare, usec);
    if (rc >= 0)
    {
        // According to XNU source, the non-negative return value is the number of remaining waiters.
        // See ulock_wait_cleanup in sys_ulock.c
        return true;
    }
    switch (-rc)
    {
        case EINTR: // Treat signal interrupt as a spurious wakeup
            return true;
        case ETIMEDOUT:
            return false;
        default:
            CARB_FATAL_UNLESS(0, "Unexpected result from __ulock_wait: %d/%s", -rc, strerror(-rc));
    }
}

template <class Clock, class Duration>
inline bool futex_wait_until(const std::atomic_uint32_t& val,
                             uint32_t compare,
                             std::chrono::time_point<Clock, Duration> time_point)
{
    // Constrain the time to something that is well before the heat death of the universe
    auto now = Clock::now();
    auto tp = now + clampDuration(time_point - now);

    // Convert to number of microseconds from now
    int64_t usec = std::chrono::duration_cast<std::chrono::microseconds>(tp - now).count();
    if (usec <= 0)
    {
        return false;
    }
    if (usec > UINT32_MAX)
    {
        usec = UINT32_MAX;
    }

    int rc = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
                          const_cast<uint32_t*>(reinterpret_cast<const uint32_t*>(std::addressof(val))), compare, usec);
    if (rc >= 0)
    {
        // According to XNU source, the non-negative return value is the number of remaining waiters.
        // See ulock_wait_cleanup in sys_ulock.c
        return true;
    }
    switch (-rc)
    {
        case EINTR: // Treat signal interrupt as a spurious wakeup
            return true;
        case ETIMEDOUT:
            return false;
        default:
            CARB_FATAL_UNLESS(0, "Unexpected result from __ulock_wait: %d/%s", -rc, strerror(-rc));
    }
}

inline void futex_wake_n(std::atomic_uint32_t& val, unsigned count) noexcept
{
    for (unsigned i = 0; i < count; i++)
    {
        __ulock_wake(UL_COMPARE_AND_WAIT, std::addressof(val), 0);
    }
}

inline void futex_wake_one(std::atomic_uint32_t& val) noexcept
{
    __ulock_wake(UL_COMPARE_AND_WAIT, std::addressof(val), 0);
}

inline void futex_wake_all(std::atomic_uint32_t& val) noexcept
{
    __ulock_wake(UL_COMPARE_AND_WAIT | ULF_WAKE_ALL, std::addressof(val), 0);
}
#endif

class NativeFutex
{
public:
    using AtomicType = std::atomic_uint32_t;
    using Type = uint32_t;
    static inline void wait(const AtomicType& val, Type compare) noexcept
    {
        futex_wait(val, compare);
    }
    template <class Rep, class Period>
    static inline bool wait_for(const AtomicType& val, Type compare, std::chrono::duration<Rep, Period> duration)
    {
        return futex_wait_for(val, compare, duration);
    }
    template <class Clock, class Duration>
    static inline bool wait_until(const AtomicType& val, Type compare, std::chrono::time_point<Clock, Duration> time_point)
    {
        return futex_wait_until(val, compare, time_point);
    }
    static inline void notify_one(AtomicType& a) noexcept
    {
        futex_wake_one(a);
    }
    static inline void notify_n(AtomicType& a, size_t n) noexcept
    {
        futex_wake_n(a, unsigned(carb_min<size_t>(UINT_MAX, n)));
    }
    static inline void notify_all(AtomicType& a) noexcept
    {
        futex_wake_all(a);
    }
};

#if !CARB_PLATFORM_WINDOWS
// This class requires that it fits entirely within the same cache line.
class alignas(2 * sizeof(NativeFutex::Type)) LowLevelLock
{
public:
    CARB_IF_NOT_TSAN(constexpr) LowLevelLock() noexcept
    {
        __tsan_mutex_create(this, __tsan_mutex_not_static);
    }
    CARB_IF_TSAN(~LowLevelLock() { __tsan_mutex_destroy(this, __tsan_mutex_not_static); })

    void lock() noexcept
    {
        __tsan_mutex_pre_lock(this, 0);
        // seq_cst so subsequent loads can be relaxed
        while (m_lock.exchange(kLocked, std::memory_order_seq_cst) == kLocked)
        {
            if (!this_thread::spinTryWaitWithBackoff([&] { return !is_locked(); }))
            {
                // NativeFutex operations are comparatively slow, so m_waiters will track whether any threads are
                // actually waiting.
                ++m_waiters;
                while (is_locked())
                    NativeFutex::wait(m_lock, kLocked);
                --m_waiters;
            }
        }
        __tsan_mutex_post_lock(this, 0, 0);
    }

    bool try_lock() noexcept
    {
        __tsan_mutex_pre_lock(this, __tsan_mutex_try_lock);
        if (m_lock.exchange(kLocked, std::memory_order_seq_cst) == kUnlocked)
        {
            __tsan_mutex_post_lock(this, __tsan_mutex_try_lock, 0);
            return true;
        }
        __tsan_mutex_post_lock(this, __tsan_mutex_try_lock_failed, 0);
        return false;
    }

    void unlock() noexcept
    {
        __tsan_mutex_pre_unlock(this, 0);
        CARB_ASSERT(is_locked());
        // This operation requires that both members be in the same cache line to work properly. By doing an exchange()
        // which is RMW, we have acquire/release semantics, which allows the subsequent load to be relaxed.
        m_lock.exchange(kUnlocked, std::memory_order_seq_cst);
        if (m_waiters.load(std::memory_order_relaxed))
        {
            __tsan_mutex_pre_signal(this, 0);
            NativeFutex::notify_one(m_lock);
            __tsan_mutex_post_signal(this, 0);
        }
        __tsan_mutex_post_unlock(this, 0);
    }

    bool is_locked() const noexcept
    {
        return m_lock.load(std::memory_order_relaxed) == kLocked;
    }

private:
    constexpr static NativeFutex::Type kUnlocked = 0;
    constexpr static NativeFutex::Type kLocked = 1;

    NativeFutex::AtomicType m_lock{ kUnlocked };
    NativeFutex::AtomicType m_waiters{ 0 };
};
#endif

struct ParkingLot
{
    struct WaitEntry
    {
        enum Bits : uint32_t
        {
            kNoBits = 0,
            kNotifyBit = 1,
            kWaitBit = 2,
        };

        const void* addr;
        WaitEntry* next{ nullptr };
        WaitEntry* prev{ nullptr };
        uint32_t changeId{ 0 };
        NativeFutex::AtomicType wakeup{ kNoBits };
#if CARB_PLATFORM_WINDOWS
        ThreadId threadId{ this_thread::getId() };
#endif
    };

    class WakeCache
    {
    public:
        constexpr static size_t kWakeCacheSize = 128;

        WakeCache() noexcept = default;

        ~WakeCache()
        {
            wakeAll();
        }

#if CARB_PLATFORM_WINDOWS
        using Entry = ThreadId;

        static Entry fromWaitEntry(WaitEntry& we) noexcept
        {
            return we.threadId;
        }

        static void doWake(Entry val) noexcept
        {
            threadId_wake(val);
        }

        static void doWait(WaitEntry& we) noexcept
        {
            threadId_wait(we.wakeup);
        }

        template <class Duration>
        static bool doWaitFor(WaitEntry& we, Duration duration)
        {
            return threadId_wait_for(we.wakeup, duration);
        }

        template <class TimePoint>
        static bool doWaitUntil(WaitEntry& we, TimePoint timePoint)
        {
            return threadId_wait_until(we.wakeup, timePoint);
        }
#else
        using Entry = NativeFutex::AtomicType*;

        static Entry fromWaitEntry(WaitEntry& we) noexcept
        {
            return &we.wakeup;
        }

        static void doWake(Entry val) noexcept
        {
            NativeFutex::notify_one(*val);
        }

        static void doWait(WaitEntry& we) noexcept
        {
            NativeFutex::wait(we.wakeup, WaitEntry::kNoBits);
        }

        template <class Duration>
        static bool doWaitFor(WaitEntry& we, Duration duration)
        {
            return NativeFutex::wait_for(we.wakeup, WaitEntry::kNoBits, duration);
        }

        template <class TimePoint>
        static bool doWaitUntil(WaitEntry& we, TimePoint timePoint)
        {
            return NativeFutex::wait_until(we.wakeup, WaitEntry::kNoBits, timePoint);
        }
#endif

        CARB_NODISCARD bool add(WaitEntry& we) noexcept
        {
            CARB_ASSERT(m_end < pastTheEnd());
            *(m_end++) = fromWaitEntry(we);
            return m_end != pastTheEnd();
        }

        void wakeAll() noexcept
        {
            for (Entry* p = &m_entries[0]; p != m_end; ++p)
                doWake(*p);
            m_end = &m_entries[0];
        }

    private:
        const Entry* pastTheEnd() const noexcept
        {
            return &m_entries[kWakeCacheSize];
        }

        Entry m_entries[kWakeCacheSize];
        Entry* m_end = &m_entries[0];
    };

    class WaitBucket
    {
        LowLevelLock m_lock;
        WaitEntry* m_head{ nullptr };
        WaitEntry* m_tail{ nullptr };
        std::atomic_uint32_t m_changeTracker{ 0 };

        void assertLockState() const noexcept
        {
            CARB_ASSERT(m_lock.is_locked());
        }

    public:
        CARB_IF_NOT_TSAN(constexpr) WaitBucket() noexcept = default;

        CARB_NODISCARD std::unique_lock<LowLevelLock> lock() noexcept
        {
            return std::unique_lock<LowLevelLock>(m_lock);
        }

        void incChangeTracker() noexcept
        {
            assertLockState(); // under lock
            m_changeTracker.store(m_changeTracker.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
        }

        bool appearsEmpty() const noexcept
        {
            // fence-fence synchronization with wait functions, forces visibility
            this_thread::atomic_fence_seq_cst();
            // cpp::atomic_ref() is dependent on this file, so we can't use it.
            return reinterpret_cast<const std::atomic<WaitEntry*>&>(m_head).load(std::memory_order_relaxed) == nullptr;
        }

        std::atomic_uint32_t& changeTracker() noexcept
        {
            return m_changeTracker;
        }

        WaitEntry* head() noexcept
        {
            assertLockState();
            return m_head;
        }

        void append(WaitEntry* e) noexcept
        {
            assertLockState();
            e->prev = std::exchange(m_tail, e);
            e->next = nullptr;
            if (e->prev)
            {
                e->prev->next = e;
            }
            else
            {
                CARB_ASSERT(m_head == nullptr);
                m_head = e;
            }
        }

        void remove(WaitEntry* e) noexcept
        {
            assertLockState();
            if (e->next)
            {
                e->next->prev = e->prev;
            }
            else
            {
                CARB_ASSERT(m_tail == e);
                m_tail = e->prev;
            }
            if (e->prev)
            {
                e->prev->next = e->next;
            }
            else
            {
                CARB_ASSERT(m_head == e);
                m_head = e->next;
            }
        }

        constexpr static size_t kNumWaitBuckets = 2048; // Must be a power of two
        static_assert(carb::cpp::has_single_bit(kNumWaitBuckets), "Invalid assumption");

        static inline WaitBucket& bucket(const void* addr) noexcept
        {
            static WaitBucket waitBuckets[kNumWaitBuckets];

#if 1
            // FNV-1a hash is fast with really good distribution
            // In "futex buckets" test, about ~70% on Windows and ~80% on Linux
            auto hash = carb::hashBuffer(&addr, sizeof(addr));
            return waitBuckets[hash & (kNumWaitBuckets - 1)];
#else
            // Simple bitshift method
            // In "futex buckets" test:
            //  >> 4 bits: ~71% on Windows, ~72% on Linux
            //  >> 5 bits: ~42% on Windows, ~71% on Linux
            return waitBuckets[(size_t(addr) >> 4) & (kNumWaitBuckets - 1)];
#endif
        }
    };

    template <typename T>
    static void wait(const std::atomic<T>& val, T compare) noexcept
    {
        WaitEntry entry{ std::addressof(val) };

        using I = to_integral_t<T>;

        // Check before waiting
        if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
        {
            return;
        }

        WaitBucket& b = WaitBucket::bucket(std::addressof(val));
        {
            auto lock = b.lock();

            // Check inside the lock to reduce spurious wakeups
            if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
            {
                return;
            }

            entry.changeId = b.changeTracker().load(std::memory_order_relaxed);

            b.append(&entry);
        }

        // Do the wait if everything is consistent
        NativeFutex::Type v = WaitEntry::kNoBits;
        if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
                        b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
        {
            // Do the wait
            WakeCache::doWait(entry);

            // Full fence to force visibility on entry.wakeup, which can be a relaxed load
            this_thread::atomic_fence_seq_cst();
            v = entry.wakeup.load(std::memory_order_relaxed);

            // See if already removed without a wait bit
            if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
                return;
        }

        // May not have been removed
        if (!(v & WaitEntry::kNotifyBit))
        {
            // Need to remove
            auto lock = b.lock();
            // Check again under the lock (relaxed because we're under the lock)
            v = entry.wakeup.load(std::memory_order_relaxed);
            if (CARB_LIKELY(v == WaitEntry::kNoBits))
            {
                b.remove(&entry);
                return;
            }
        }

        // Spin briefly while the wait bit is set, though this should be rare
        if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
            this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
    }

    template <typename T, typename Rep, typename Period>
    static bool wait_for(const std::atomic<T>& val, T compare, std::chrono::duration<Rep, Period> duration)
    {
        WaitEntry entry{ std::addressof(val) };

        using I = to_integral_t<T>;

        // Check before waiting
        if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
        {
            return true;
        }

        WaitBucket& b = WaitBucket::bucket(std::addressof(val));
        {
            auto lock = b.lock();

            // Check inside the lock to reduce spurious wakeups
            if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
            {
                return true;
            }

            entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
            b.append(&entry);
        }

        // Do the wait if everything is consistent
        bool wasNotified = true;
        NativeFutex::Type v = WaitEntry::kNoBits;
        if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
                        b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
        {
            // Do the wait
            wasNotified = WakeCache::doWaitFor(entry, duration);

            if (wasNotified) // specifically woken
            {
                // Full fence to force visibility on entry.wakeup, which can be a relaxed load
                this_thread::atomic_fence_seq_cst();
                v = entry.wakeup.load(std::memory_order_relaxed);

                // See if already removed without a wait bit
                if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
                    return true;
            }
        }

        // May not have been removed.
        if (!(v & WaitEntry::kNotifyBit))
        {
            // Need to remove
            auto lock = b.lock();
            // Check again under the lock (relaxed because we're under the lock)
            v = entry.wakeup.load(std::memory_order_relaxed);
            if (CARB_LIKELY(v == WaitEntry::kNoBits))
            {
                b.remove(&entry);
                return wasNotified;
            }
            else
            {
                // Already removed, so treat it as a notification rather than a timeout.
                wasNotified = true;
            }
        }

        // Spin briefly while the wait bit is set, though this should be rare
        if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
            this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });

        return wasNotified;
    }

    template <typename T, typename Clock, typename Duration>
    static bool wait_until(const std::atomic<T>& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
    {
        WaitEntry entry{ std::addressof(val) };

        using I = to_integral_t<T>;

        // Check before waiting
        if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
        {
            return true;
        }

        WaitBucket& b = WaitBucket::bucket(std::addressof(val));
        {
            auto lock = b.lock();

            // Check inside the lock to reduce spurious wakeups
            if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
            {
                return true;
            }

            entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
            b.append(&entry);
        }

        // Do the wait if everything is consistent
        bool wasNotified = true;
        NativeFutex::Type v = WaitEntry::kNoBits;
        if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
                        b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
        {
            // Do the wait
            wasNotified = WakeCache::doWaitUntil(entry, time_point);

            if (wasNotified)
            {
                // Full fence to force visibility on entry.wakeup, which can be a relaxed load
                this_thread::atomic_fence_seq_cst();
                v = entry.wakeup.load(std::memory_order_relaxed);

                // See if already removed without a wait bit
                if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
                    return true;
            }
        }

        // May not have been removed.
        if (!(v & WaitEntry::kNotifyBit))
        {
            // Need to remove
            auto lock = b.lock();
            // Check again under the lock (relaxed because we're under the lock)
            v = entry.wakeup.load(std::memory_order_relaxed);
            if (CARB_LIKELY(v == WaitEntry::kNoBits))
            {
                b.remove(&entry);
                return wasNotified;
            }
            else
            {
                // Already removed, so treat it as a notification rather than a timeout.
                wasNotified = true;
            }
        }

        // Spin briefly while the wait bit is set, though this should be rare
        if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
            this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });

        return wasNotified;
    }

    static void notify_one(void* addr) noexcept
    {
        WaitBucket& b = WaitBucket::bucket(addr);

        // Read empty state with full fence to avoid locking
        if (b.appearsEmpty())
            return;

        auto lock = b.lock();
        b.incChangeTracker();
        for (WaitEntry* e = b.head(); e; e = e->next)
        {
            if (e->addr == addr)
            {
                // Remove before setting the wakeup flag
                b.remove(e);

                // `e` may be destroyed after we unlock, so record the value that we need here.
                auto val = WakeCache::fromWaitEntry(*e);

                // release semantics despite being under the lock because we want `val` read before store.
                // No access to `e` allowed after this store.
                e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);

                // Unlock and issue the wake
                lock.unlock();

                WakeCache::doWake(val);

                return;
            }
        }
    }

private:
    static CARB_NOINLINE void notify_n_slow(
        std::unique_lock<LowLevelLock> lock, void* addr, size_t n, WaitBucket& b, WaitEntry* e, WakeCache& wakeCache) noexcept
    {
        // We are waking a lot of things and the wakeCache is completely full. Now we need to start building a list
        WaitEntry *wake = nullptr, *end = nullptr;

        for (WaitEntry* next; e; e = next)
        {
            next = e->next;
            if (e->addr == addr)
            {
                b.remove(e);
                e->next = nullptr;
                // Don't bother with prev pointers
                if (end)
                    end->next = e;
                else
                    wake = e;
                end = e;

                // Relaxed because we're under the lock.
                // Need to set the wait bit since we're still reading/writing to the WaitEntry
                e->wakeup.store(WaitEntry::kWaitBit | WaitEntry::kNotifyBit, std::memory_order_relaxed);

                if (!--n)
                    break;
            }
        }

        lock.unlock();

        // Wake the entire cache since we know it's full
        wakeCache.wakeAll();

        for (WaitEntry* next; wake; wake = next)
        {
            next = wake->next;

            auto val = WakeCache::fromWaitEntry(*wake);

            // Clear the wait bit so that only the wake bit is set. No accesses to `wake` are allowed after this since
            // the waiting thread is free to destroy it. Release to synchronize-with spin-waiting on the wait bit.
            wake->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);

            WakeCache::doWake(val);
        }
    }

    static CARB_NOINLINE void notify_all_slow(
        std::unique_lock<LowLevelLock> lock, void* addr, WaitBucket& b, WaitEntry* e, WakeCache& wakeCache) noexcept
    {
        // We are waking a lot of things and the wakeCache is completely full. Now we need to start building a list
        WaitEntry *wake = nullptr, *end = nullptr;

        for (WaitEntry* next; e; e = next)
        {
            next = e->next;
            if (e->addr == addr)
            {
                b.remove(e);
                e->next = nullptr;
                // Don't bother with prev pointers
                if (end)
                    end->next = e;
                else
                    wake = e;
                end = e;

                // Relaxed because we're under the lock.
                // Need to set the wait bit since we're still reading/writing to the WaitEntry
                e->wakeup.store(WaitEntry::kWaitBit | WaitEntry::kNotifyBit, std::memory_order_relaxed);
            }
        }

        lock.unlock();

        // Wake the entire cache since we know it's full
        wakeCache.wakeAll();

        for (WaitEntry* next; wake; wake = next)
        {
            next = wake->next;
            auto val = WakeCache::fromWaitEntry(*wake);

            // Clear the wait bit so that only the wake bit is set. No accesses to `wake` are allowed after this since
            // the waiting thread is free to destroy it. Release to synchronize-with spin-waiting on the wait bit.
            wake->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);

            WakeCache::doWake(val);
        }
    }

public:
    static void notify_n(void* addr, size_t n) noexcept
    {
        if (CARB_UNLIKELY(n == 0))
        {
            return;
        }

        WaitBucket& b = WaitBucket::bucket(addr);

        // Read empty state with full fence to avoid locking
        if (b.appearsEmpty())
            return;

        // It is much faster overall to not set kWaitBit and force the woken threads to wait until we're clear of their
        // WaitEntry, so keep a local cache of addresses to wake here that don't require a WaitEntry.
        // Note that we do retain a pointer to the std::atomic_uint32_t *contained in* the WaitEntry and tell the
        // underlying OS system to wake by that address, but this is safe as it does not read that memory. The address
        // is used as a lookup to find any waiters that registered with that address. If the memory was quickly reused
        // for a different wait operation, this will cause a spurious wakeup which is allowed, but this should be
        // exceedingly rare.
        // Must be constructed before the lock is taken
        WakeCache wakeCache;

        auto lock = b.lock();
        b.incChangeTracker();
        for (WaitEntry *next, *e = b.head(); e; e = next)
        {
            next = e->next;
            if (e->addr == addr)
            {
                b.remove(e);
                bool nowFull = wakeCache.add(*e);

                // release semantics despite being under the lock because we want add() complete before store.
                // No access to `e` allowed after this store.
                e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);

                if (!--n)
                    break;

                if (CARB_UNLIKELY(nowFull))
                {
                    // Cache is full => transition to a list for the rest
                    notify_n_slow(std::move(lock), addr, n, b, next, wakeCache);
                    return;
                }
            }
        }

        // Lock is unlocked (important this happens first), and then wakeCache destructs and issues all of its wakes
    }

    static void notify_all(void* addr) noexcept
    {
        WaitBucket& b = WaitBucket::bucket(addr);

        // Read empty state with full fence to avoid locking
        if (b.appearsEmpty())
            return;

        // It is much faster overall to not set kWaitBit and force the woken threads to wait until we're clear of their
        // WaitEntry, so keep a local cache of addresses to wake here that don't require a WaitEntry.
        // Note that we do retain a pointer to the std::atomic_uint32_t *contained in* the WaitEntry and tell the
        // underlying OS system to wake by that address, but this is safe as it does not read that memory. The address
        // is used as a lookup to find any waiters that registered with that address. If the memory was quickly reused
        // for a different wait operation, this will cause a spurious wakeup which is allowed, but this should be
        // exceedingly rare.
        // Must be constructed before the lock is taken
        WakeCache wakeCache;

        auto lock = b.lock();
        b.incChangeTracker();
        for (WaitEntry *next, *e = b.head(); e; e = next)
        {
            next = e->next;
            if (e->addr == addr)
            {
                b.remove(e);
                bool nowFull = wakeCache.add(*e);

                // release semantics despite being under the lock because we want add() complete before store.
                // No access to `e` allowed after this store.
                e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
                if (CARB_UNLIKELY(nowFull))
                {
                    // Full => transition to a list for the rest of the items
                    notify_all_slow(std::move(lock), addr, b, next, wakeCache);
                    return;
                }
            }
        }

        // Lock is unlocked (important this happens first), and then wakeCache destructs and issues all of its wakes
    }

}; // struct ParkingLot

template <class T, size_t S = sizeof(T)>
class ParkingLotFutex
{
    static_assert(S == 1 || S == 2 || S == 4 || S == 8, "Unsupported size");

public:
    using AtomicType = typename std::atomic<T>;
    using Type = T;
    static void wait(const AtomicType& val, T compare) noexcept
    {
        ParkingLot::wait(val, compare);
    }
    template <class Rep, class Period>
    static bool wait_for(const AtomicType& val, T compare, std::chrono::duration<Rep, Period> duration)
    {
        return ParkingLot::wait_for(val, compare, duration);
    }
    template <class Clock, class Duration>
    static bool wait_until(const AtomicType& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
    {
        return ParkingLot::wait_until(val, compare, time_point);
    }
    static void notify_one(AtomicType& val) noexcept
    {
        ParkingLot::notify_one(std::addressof(val));
    }
    static void notify_n(AtomicType& val, size_t count) noexcept
    {
        ParkingLot::notify_n(std::addressof(val), count);
    }
    static void notify_all(AtomicType& val) noexcept
    {
        ParkingLot::notify_all(std::addressof(val));
    }
};

// Futex types that must use the ParkingLot
#if CARB_USE_PARKINGLOT
template <class T, size_t S = sizeof(T)>
using Futex = ParkingLotFutex<T, S>;
#endif

} // namespace detail
} // namespace thread
} // namespace carb