carb/thread/FutexImpl.h
// Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "../Defines.h"
#include "../cpp/Bit.h"
#include "../math/Util.h"
#include "../thread/Util.h"
#include "../detail/TSan.h"
#include <atomic>
#if CARB_PLATFORM_WINDOWS
# pragma comment(lib, "synchronization.lib") // must link with synchronization.lib
# include "../CarbWindows.h"
#elif CARB_PLATFORM_LINUX
# include <linux/futex.h>
# include <sys/syscall.h>
# include <sys/time.h>
# include <unistd.h>
#elif CARB_PLATFORM_MACOS
/* nothing for now */
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
namespace carb
{
namespace thread
{
namespace detail
{
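// Maps a type T to a signed integer type of the same size (1, 2, 4 or 8 bytes) so that values of any
// waitable type can be compared bitwise by the futex/ParkingLot machinery below.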
template <class T, size_t S = sizeof(T)>
struct to_integral
{
};
template <class T>
struct to_integral<T, 1>
{
using type = int8_t;
};
template <class T>
struct to_integral<T, 2>
{
using type = int16_t;
};
template <class T>
struct to_integral<T, 4>
{
using type = int32_t;
};
template <class T>
struct to_integral<T, 8>
{
using type = int64_t;
};
template <class T>
using to_integral_t = typename to_integral<T>::type;
template <class As, class T>
CARB_NODISCARD std::enable_if_t<std::is_integral<T>::value && sizeof(As) == sizeof(T), As> reinterpret_as(const T& in) noexcept
{
static_assert(std::is_integral<As>::value, "Must be integral type");
return static_cast<As>(in);
}
template <class As, class T>
CARB_NODISCARD std::enable_if_t<std::is_pointer<T>::value && sizeof(As) == sizeof(T), As> reinterpret_as(const T& in) noexcept
{
static_assert(std::is_integral<As>::value, "Must be integral type");
return reinterpret_cast<As>(in);
}
template <class As, class T>
CARB_NODISCARD std::enable_if_t<(!std::is_pointer<T>::value && !std::is_integral<T>::value) || sizeof(As) != sizeof(T), As> reinterpret_as(
const T& in) noexcept
{
static_assert(std::is_integral<As>::value, "Must be integral type");
As out{}; // Init to zero
memcpy(&out, std::addressof(in), sizeof(in));
return out;
}
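// Clamps a duration to [0, 0x7fffffff milliseconds] (roughly 24.8 days) so that it can safely be
// converted to the various kernel timeout representations used below.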
template <class Duration>
Duration clampDuration(Duration offset)
{
using namespace std::chrono;
constexpr static Duration Max = duration_cast<Duration>(milliseconds(0x7fffffff));
return ::carb_max(Duration(0), ::carb_min(Max, offset));
}
#if CARB_PLATFORM_WINDOWS
// Windows WaitOnAddress() supports values of 1, 2, 4 or 8 bytes, so the ParkingLot is not needed. It can
// still be enabled here for testing the ParkingLot or for specific modules.
# ifndef CARB_USE_PARKINGLOT
# define CARB_USE_PARKINGLOT 0
# endif
using hundrednanos = std::chrono::duration<int64_t, std::ratio<1, 10'000'000>>;
inline bool WaitOnAddress(volatile const void* val, const void* compare, size_t size, int64_t* timeout) noexcept
{
// Use the NTDLL version of this function since we can give it relative or absolute times in 100ns units
using RtlWaitOnAddressFn = NTSTATUS(__stdcall*)(volatile const void*, const void*, size_t, int64_t*);
static RtlWaitOnAddressFn RtlWaitOnAddress =
(RtlWaitOnAddressFn)GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "RtlWaitOnAddress");
switch (NTSTATUS ret = RtlWaitOnAddress(val, compare, size, timeout))
{
case CARBWIN_STATUS_SUCCESS:
return true;
default:
CARB_FATAL_UNLESS(
0, "Unexpected result from RtlWaitOnAddress: 0x%lx, GetLastError=%u", ret, ::GetLastError());
CARB_FALLTHROUGH; // (not really, but the compiler doesn't know that CARB_FATAL_UNLESS doesn't return)
case CARBWIN_STATUS_TIMEOUT:
return false;
}
}
template <class T>
inline bool WaitOnAddress(const std::atomic<T>& val, T compare, int64_t* timeout) noexcept
{
static_assert(sizeof(val) == sizeof(compare), "Invalid assumption about atomic");
return WaitOnAddress(std::addressof(val), std::addressof(compare), sizeof(T), timeout);
}
inline bool WaitForAlertByThreadId(const void* addr, int64_t* timeout) noexcept
{
// Uses an undocumented API that is unlikely to change since so many things are based on it.
// https://ntdoc.m417z.com/ntwaitforalertbythreadid
// https://dennisbabkin.com/blog/?t=how-to-put-thread-into-kernel-wait-and-to-wake-it-by-thread-id
using NtWaitForAlertByThreadIdFn = NTSTATUS(__stdcall*)(const void*, int64_t*);
static auto NtWaitForAlertByThreadId =
(NtWaitForAlertByThreadIdFn)GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "NtWaitForAlertByThreadId");
switch (NTSTATUS ret = NtWaitForAlertByThreadId(addr, timeout))
{
case CARBWIN_STATUS_SUCCESS:
case CARBWIN_STATUS_ALERTED:
return true;
default:
CARB_FATAL_UNLESS(
0, "Unexpected result from NtWaitForAlertByThreadId: 0x%lx, GetLastError=%u", ret, ::GetLastError());
CARB_FALLTHROUGH; // (not really, but the compiler doesn't know that CARB_FATAL_UNLESS doesn't return)
case CARBWIN_STATUS_TIMEOUT:
return false;
}
}
inline void AlertThreadByThreadId(ThreadId threadId) noexcept
{
// Uses an undocumented API that is unlikely to change since so many things are based on it.
// https://ntdoc.m417z.com/ntalertthreadbythreadid
// https://dennisbabkin.com/blog/?t=how-to-put-thread-into-kernel-wait-and-to-wake-it-by-thread-id
using NtAlertThreadByThreadIdFn = NTSTATUS(__stdcall*)(HANDLE); // the thread ID, passed cast to HANDLE
static auto NtAlertThreadByThreadId =
(NtAlertThreadByThreadIdFn)GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "NtAlertThreadByThreadId");
NTSTATUS ret = NtAlertThreadByThreadId((HANDLE)(UINT_PTR)threadId);
CARB_ASSERT(ret == CARBWIN_STATUS_SUCCESS);
CARB_UNUSED(ret);
}
template <class TimePoint1, class TimePoint2>
TimePoint1 convertToSystemClock(TimePoint1 tp, TimePoint2, std::true_type) // system_clock
{
// Already in terms of system clock
return tp;
}
template <class TimePoint1, class TimePoint2>
auto convertToSystemClock(TimePoint1 tp, TimePoint2 now, std::false_type) // other Clock
{
// Not using the system clock, so we need to convert to the system clock
return tp - now + std::chrono::system_clock::now();
}
template <class Clock, class Duration>
bool winConvertAbsTime(std::chrono::time_point<Clock, Duration> time_point, int64_t& absTimeOut)
{
auto now = Clock::now();
// Kernel calls are quite slow to return if the time has already elapsed. It's much faster for us to check first.
if (time_point <= now)
{
return false;
}
// Constrain the time to something that is well before the heat death of the universe
auto tp = now + clampDuration(time_point - now);
// Convert the timepoint to the system clock if necessary
auto systemTp = convertToSystemClock(tp, now, std::is_same<Clock, std::chrono::system_clock>{});
// According to https://github.com/microsoft/STL/blob/master/stl/inc/chrono, the system_clock appears to
// use GetSystemTimePreciseAsFileTime minus an epoch value so that it lines up with 1/1/1970 midnight GMT.
// Unfortunately there's not an easy way to check for it here, but we have a unittest in TestSemaphore.cpp.
absTimeOut = std::chrono::duration_cast<detail::hundrednanos>(systemTp.time_since_epoch()).count();
// Epoch value from https://github.com/microsoft/STL/blob/master/stl/src/xtime.cpp
// This is the number of 100ns units between 1 January 1601 00:00 GMT and 1 January 1970 00:00 GMT
constexpr int64_t kFiletimeEpochToUnixEpochIn100nsUnits = 0x19DB1DED53E8000LL;
absTimeOut += kFiletimeEpochToUnixEpochIn100nsUnits;
return true;
}
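// Worked example (illustrative, not part of the original header): the conversion above maps the Unix
// epoch (1 January 1970 00:00 GMT, i.e. a system_clock time_since_epoch() of 0) to
//   0 + kFiletimeEpochToUnixEpochIn100nsUnits = 0x19DB1DED53E8000
// 100ns units, i.e. the FILETIME-style count since 1 January 1601; each later second adds another
// 10'000'000 units.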
template <class T>
inline void threadId_wait(const std::atomic<T>& val) noexcept
{
WaitForAlertByThreadId(std::addressof(val), nullptr);
}
template <class T, class Rep, class Period>
inline bool threadId_wait_for(const std::atomic<T>& val, std::chrono::duration<Rep, Period> duration)
{
// NtWaitForAlertByThreadId treats a negative timeout as relative time (a positive one would be absolute time)
int64_t timeout = -std::chrono::duration_cast<hundrednanos>(clampDuration(duration)).count();
// Duration must be negative for relative time, otherwise we calculated a time in the past.
return timeout < 0 ? WaitForAlertByThreadId(std::addressof(val), &timeout) : false;
}
template <class T, class Clock, class Duration>
inline bool threadId_wait_until(const std::atomic<T>& val, std::chrono::time_point<Clock, Duration> time_point)
{
int64_t absTime;
if (!winConvertAbsTime(time_point, absTime))
return false;
CARB_ASSERT(absTime >= 0);
return WaitForAlertByThreadId(std::addressof(val), &absTime);
}
inline void threadId_wake(ThreadId id) noexcept
{
AlertThreadByThreadId(id);
}
template <class T>
inline void futex_wait(const std::atomic<T>& val, T compare) noexcept
{
WaitOnAddress(val, compare, nullptr);
}
template <class T, class Rep, class Period>
inline bool futex_wait_for(const std::atomic<T>& val, T compare, std::chrono::duration<Rep, Period> duration)
{
// RtlWaitOnAddress treats a negative timeout as relative time (a positive one would be absolute time)
int64_t timeout = -std::chrono::duration_cast<hundrednanos>(clampDuration(duration)).count();
// Duration must be negative for relative time, otherwise we calculated a time in the past.
return timeout < 0 ? WaitOnAddress(val, compare, &timeout) : false;
}
template <class T, class Clock, class Duration>
inline bool futex_wait_until(const std::atomic<T>& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
{
int64_t absTime;
if (!winConvertAbsTime(time_point, absTime))
return false;
CARB_ASSERT(absTime >= 0);
return detail::WaitOnAddress(val, compare, &absTime);
}
template <class T>
inline void futex_wake_one(std::atomic<T>& val) noexcept
{
WakeByAddressSingle(std::addressof(val));
}
template <class T>
inline void futex_wake_n(std::atomic<T>& val, size_t n) noexcept
{
while (n--)
futex_wake_one(val);
}
template <class T>
inline void futex_wake_all(std::atomic<T>& val) noexcept
{
WakeByAddressAll(std::addressof(val));
}
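// Windows implementation of LowLevelLock: a thin wrapper over the SRWLOCK exclusive-lock API.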
class LowLevelLock
{
public:
constexpr LowLevelLock() noexcept = default;
void lock() noexcept
{
AcquireSRWLockExclusive((PSRWLOCK)&m_lock);
}
bool try_lock() noexcept
{
return !!TryAcquireSRWLockExclusive((PSRWLOCK)&m_lock);
}
void unlock() noexcept
{
ReleaseSRWLockExclusive((PSRWLOCK)&m_lock);
}
bool is_locked() const noexcept
{
return m_lock.Ptr != nullptr;
}
private:
CARBWIN_SRWLOCK m_lock = CARBWIN_SRWLOCK_INIT;
};
# if !CARB_USE_PARKINGLOT
template <class T, size_t S = sizeof(T)>
class Futex
{
static_assert(S == 1 || S == 2 || S == 4 || S == 8, "Unsupported size");
public:
using AtomicType = typename std::atomic<T>;
using Type = T;
static inline void wait(const AtomicType& val, Type compare) noexcept
{
futex_wait(val, compare);
}
template <class Rep, class Period>
static inline bool wait_for(const AtomicType& val, Type compare, std::chrono::duration<Rep, Period> duration)
{
return futex_wait_for(val, compare, duration);
}
template <class Clock, class Duration>
static inline bool wait_until(const AtomicType& val, Type compare, std::chrono::time_point<Clock, Duration> time_point)
{
return futex_wait_until(val, compare, time_point);
}
static inline void notify_one(AtomicType& a) noexcept
{
futex_wake_one(a);
}
static inline void notify_n(AtomicType& a, size_t n) noexcept
{
futex_wake_n(a, n);
}
static inline void notify_all(AtomicType& a) noexcept
{
futex_wake_all(a);
}
};
# endif
#elif CARB_PLATFORM_LINUX
# define CARB_USE_PARKINGLOT 1 // Linux only supports 4 byte futex so it must use the ParkingLot
constexpr int64_t kNsPerSec = 1'000'000'000;
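// Thin wrapper over the futex(2) system call. Returns the (non-negative) syscall result on success and
// -errno on failure, so callers can switch on the negated error code.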
inline int futex(const std::atomic_uint32_t& aval,
int futex_op,
uint32_t val,
const struct timespec* timeout,
uint32_t* uaddr2,
int val3) noexcept
{
static_assert(sizeof(aval) == sizeof(uint32_t), "Invalid assumption about atomic");
auto ret = syscall(SYS_futex, std::addressof(aval), futex_op, val, timeout, uaddr2, val3);
return ret >= 0 ? int(ret) : -errno;
}
inline void futex_wait(const std::atomic_uint32_t& val, uint32_t compare) noexcept
{
for (;;)
{
int ret = futex(val, FUTEX_WAIT_BITSET_PRIVATE, compare, nullptr, nullptr, FUTEX_BITSET_MATCH_ANY);
switch (ret)
{
case 0:
case -EAGAIN: // Valid or spurious wakeup
return;
case -ETIMEDOUT:
// Apparently on Windows Subsystem for Linux, calls to the kernel can time out even when a timeout value
// was not specified. Fall through.
case -EINTR: // Interrupted by signal; loop again
break;
default:
CARB_FATAL_UNLESS(0, "Unexpected result from futex(): %d/%s", -ret, strerror(-ret));
}
}
}
template <class Rep, class Period>
inline bool futex_wait_for(const std::atomic_uint32_t& val, uint32_t compare, std::chrono::duration<Rep, Period> duration)
{
// Relative time
int64_t ns = std::chrono::duration_cast<std::chrono::nanoseconds>(clampDuration(duration)).count();
if (ns <= 0)
{
return false;
}
struct timespec ts;
ts.tv_sec = time_t(ns / detail::kNsPerSec);
ts.tv_nsec = long(ns % detail::kNsPerSec);
// Since we're using relative time here, we can use FUTEX_WAIT_PRIVATE (see futex() man page)
int ret = futex(val, FUTEX_WAIT_PRIVATE, compare, &ts, nullptr, 0);
switch (ret)
{
case 0: // Valid wakeup
case -EAGAIN: // Valid or spurious wakeup
case -EINTR: // Interrupted by signal; treat as a spurious wakeup
return true;
default:
CARB_FATAL_UNLESS(0, "Unexpected result from futex(): %d/%s", -ret, strerror(-ret));
CARB_FALLTHROUGH; // (not really but the compiler doesn't know that the above won't return)
case -ETIMEDOUT:
return false;
}
}
template <class Clock, class Duration>
inline bool futex_wait_until(const std::atomic_uint32_t& val,
uint32_t compare,
std::chrono::time_point<Clock, Duration> time_point)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
// Constrain the time to something that is well before the heat death of the universe
auto now = Clock::now();
auto tp = now + clampDuration(time_point - now);
// Get the number of nanoseconds to go
int64_t ns = std::chrono::duration_cast<std::chrono::nanoseconds>(tp - now).count();
if (ns <= 0)
{
return false;
}
ts.tv_sec += time_t(ns / kNsPerSec);
ts.tv_nsec += long(ns % kNsPerSec);
// Handle rollover
if (ts.tv_nsec >= kNsPerSec)
{
++ts.tv_sec;
ts.tv_nsec -= kNsPerSec;
}
for (;;)
{
// Since we're using absolute monotonic time, we use FUTEX_WAIT_BITSET_PRIVATE. See the man page for futex for
// more info.
int ret = futex(val, FUTEX_WAIT_BITSET_PRIVATE, compare, &ts, nullptr, FUTEX_BITSET_MATCH_ANY);
switch (ret)
{
case 0: // Valid wakeup
case -EAGAIN: // Valid or spurious wakeup
return true;
case -EINTR: // Interrupted by signal; loop again
break;
default:
CARB_FATAL_UNLESS(0, "Unexpected result from futex(): %d/%s", -ret, strerror(-ret));
CARB_FALLTHROUGH; // (not really but the compiler doesn't know that the above won't return)
case -ETIMEDOUT:
return false;
}
}
}
inline void futex_wake_n(std::atomic_uint32_t& val, unsigned count) noexcept
{
int ret = futex(val, FUTEX_WAKE_BITSET_PRIVATE, count, nullptr, nullptr, FUTEX_BITSET_MATCH_ANY);
CARB_ASSERT(ret >= 0, "futex(FUTEX_WAKE) failed with errno=%d/%s", -ret, strerror(-ret));
CARB_UNUSED(ret);
}
inline void futex_wake_one(std::atomic_uint32_t& val) noexcept
{
futex_wake_n(val, 1);
}
inline void futex_wake_all(std::atomic_uint32_t& val) noexcept
{
futex_wake_n(val, INT_MAX);
}
#elif CARB_PLATFORM_MACOS
# define CARB_USE_PARKINGLOT 1
# define UL_COMPARE_AND_WAIT 1
# define UL_UNFAIR_LOCK 2
# define UL_COMPARE_AND_WAIT_SHARED 3
# define UL_UNFAIR_LOCK64_SHARED 4
# define UL_COMPARE_AND_WAIT64 5
# define UL_COMPARE_AND_WAIT64_SHARED 6
# define ULF_WAKE_ALL 0x00000100
# define ULF_WAKE_THREAD 0x00000200
# define ULF_NO_ERRNO 0x01000000
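// Declarations for the private XNU ulock syscalls used below. With ULF_NO_ERRNO, errors are reported as
// a negative return value instead of through errno, which is why callers check `rc >= 0` and switch on `-rc`.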
extern "C" int __ulock_wait(uint32_t operation, void* addr, uint64_t value, uint32_t timeout);
extern "C" int __ulock_wake(uint32_t operation, void* addr, uint64_t wake_value);
inline void futex_wait(const std::atomic_uint32_t& val, uint32_t compare) noexcept
{
for (;;)
{
int rc = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
const_cast<uint32_t*>(reinterpret_cast<const uint32_t*>(std::addressof(val))), compare, 0);
if (rc >= 0)
{
// According to XNU source, the non-negative return value is the number of remaining waiters.
// See ulock_wait_cleanup in sys_ulock.c
return;
}
switch (-rc)
{
case EINTR: // According to XNU source, EINTR can be returned.
continue;
case ETIMEDOUT:
CARB_FALLTHROUGH;
case EFAULT:
CARB_FALLTHROUGH;
default:
CARB_FATAL_UNLESS(0, "Unexpected result from __ulock_wait: %d/%s", -rc, strerror(-rc));
}
}
}
template <class Rep, class Period>
inline bool futex_wait_for(const std::atomic_uint32_t& val, uint32_t compare, std::chrono::duration<Rep, Period> duration)
{
// Relative time
int64_t usec = std::chrono::duration_cast<std::chrono::microseconds>(clampDuration(duration)).count();
if (usec <= 0)
{
return false;
}
if (usec > UINT32_MAX)
{
usec = UINT32_MAX;
}
int rc = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
const_cast<uint32_t*>(reinterpret_cast<const uint32_t*>(std::addressof(val))), compare, usec);
if (rc >= 0)
{
// According to XNU source, the non-negative return value is the number of remaining waiters.
// See ulock_wait_cleanup in sys_ulock.c
return true;
}
switch (-rc)
{
case EINTR: // Treat signal interrupt as a spurious wakeup
return true;
case ETIMEDOUT:
return false;
default:
CARB_FATAL_UNLESS(0, "Unexpected result from __ulock_wait: %d/%s", -rc, strerror(-rc));
}
}
template <class Clock, class Duration>
inline bool futex_wait_until(const std::atomic_uint32_t& val,
uint32_t compare,
std::chrono::time_point<Clock, Duration> time_point)
{
// Constrain the time to something that is well before the heat death of the universe
auto now = Clock::now();
auto tp = now + clampDuration(time_point - now);
// Convert to number of microseconds from now
int64_t usec = std::chrono::duration_cast<std::chrono::microseconds>(tp - now).count();
if (usec <= 0)
{
return false;
}
if (usec > UINT32_MAX)
{
usec = UINT32_MAX;
}
int rc = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
const_cast<uint32_t*>(reinterpret_cast<const uint32_t*>(std::addressof(val))), compare, usec);
if (rc >= 0)
{
// According to XNU source, the non-negative return value is the number of remaining waiters.
// See ulock_wait_cleanup in sys_ulock.c
return true;
}
switch (-rc)
{
case EINTR: // Treat signal interrupt as a spurious wakeup
return true;
case ETIMEDOUT:
return false;
default:
CARB_FATAL_UNLESS(0, "Unexpected result from __ulock_wait: %d/%s", -rc, strerror(-rc));
}
}
inline void futex_wake_n(std::atomic_uint32_t& val, unsigned count) noexcept
{
for (unsigned i = 0; i < count; i++)
{
__ulock_wake(UL_COMPARE_AND_WAIT, std::addressof(val), 0);
}
}
inline void futex_wake_one(std::atomic_uint32_t& val) noexcept
{
__ulock_wake(UL_COMPARE_AND_WAIT, std::addressof(val), 0);
}
inline void futex_wake_all(std::atomic_uint32_t& val) noexcept
{
__ulock_wake(UL_COMPARE_AND_WAIT | ULF_WAKE_ALL, std::addressof(val), 0);
}
#endif
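// NativeFutex exposes the per-platform futex_* primitives above behind a uniform interface for 32-bit
// atomics. The non-Windows LowLevelLock and the ParkingLot's per-waiter wakeup flag below are built on it.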
class NativeFutex
{
public:
using AtomicType = std::atomic_uint32_t;
using Type = uint32_t;
static inline void wait(const AtomicType& val, Type compare) noexcept
{
futex_wait(val, compare);
}
template <class Rep, class Period>
static inline bool wait_for(const AtomicType& val, Type compare, std::chrono::duration<Rep, Period> duration)
{
return futex_wait_for(val, compare, duration);
}
template <class Clock, class Duration>
static inline bool wait_until(const AtomicType& val, Type compare, std::chrono::time_point<Clock, Duration> time_point)
{
return futex_wait_until(val, compare, time_point);
}
static inline void notify_one(AtomicType& a) noexcept
{
futex_wake_one(a);
}
static inline void notify_n(AtomicType& a, size_t n) noexcept
{
futex_wake_n(a, unsigned(carb_min<size_t>(UINT_MAX, n)));
}
static inline void notify_all(AtomicType& a) noexcept
{
futex_wake_all(a);
}
};
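// Illustrative usage sketch (not part of the original header), assuming a caller-owned 32-bit flag:
//
//   carb::thread::detail::NativeFutex::AtomicType flag{ 0 };
//   // Waiter: wakeups may be spurious, so always re-check the predicate in a loop.
//   while (flag.load(std::memory_order_acquire) == 0)
//       carb::thread::detail::NativeFutex::wait(flag, 0u);
//   // Notifier:
//   flag.store(1, std::memory_order_release);
//   carb::thread::detail::NativeFutex::notify_one(flag);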
#if !CARB_PLATFORM_WINDOWS
// This class requires that it fits entirely within the same cache line.
class alignas(2 * sizeof(NativeFutex::Type)) LowLevelLock
{
public:
CARB_IF_NOT_TSAN(constexpr) LowLevelLock() noexcept
{
__tsan_mutex_create(this, __tsan_mutex_not_static);
}
CARB_IF_TSAN(~LowLevelLock() { __tsan_mutex_destroy(this, __tsan_mutex_not_static); })
void lock() noexcept
{
__tsan_mutex_pre_lock(this, 0);
// seq_cst so subsequent loads can be relaxed
while (m_lock.exchange(kLocked, std::memory_order_seq_cst) == kLocked)
{
if (!this_thread::spinTryWaitWithBackoff([&] { return !is_locked(); }))
{
// NativeFutex operations are comparatively slow, so m_waiters will track whether any threads are
// actually waiting.
++m_waiters;
while (is_locked())
NativeFutex::wait(m_lock, kLocked);
--m_waiters;
}
}
__tsan_mutex_post_lock(this, 0, 0);
}
bool try_lock() noexcept
{
__tsan_mutex_pre_lock(this, __tsan_mutex_try_lock);
if (m_lock.exchange(kLocked, std::memory_order_seq_cst) == kUnlocked)
{
__tsan_mutex_post_lock(this, __tsan_mutex_try_lock, 0);
return true;
}
__tsan_mutex_post_lock(this, __tsan_mutex_try_lock_failed, 0);
return false;
}
void unlock() noexcept
{
__tsan_mutex_pre_unlock(this, 0);
CARB_ASSERT(is_locked());
// This operation requires that both members be in the same cache line to work properly. By doing an exchange()
// which is RMW, we have acquire/release semantics, which allows the subsequent load to be relaxed.
m_lock.exchange(kUnlocked, std::memory_order_seq_cst);
if (m_waiters.load(std::memory_order_relaxed))
{
__tsan_mutex_pre_signal(this, 0);
NativeFutex::notify_one(m_lock);
__tsan_mutex_post_signal(this, 0);
}
__tsan_mutex_post_unlock(this, 0);
}
bool is_locked() const noexcept
{
return m_lock.load(std::memory_order_relaxed) == kLocked;
}
private:
constexpr static NativeFutex::Type kUnlocked = 0;
constexpr static NativeFutex::Type kLocked = 1;
NativeFutex::AtomicType m_lock{ kUnlocked };
NativeFutex::AtomicType m_waiters{ 0 };
};
#endif
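// Illustrative sketch (not part of the original header): on every platform LowLevelLock satisfies the
// BasicLockable requirements, so the standard lock helpers work with it:
//
//   carb::thread::detail::LowLevelLock lock;
//   {
//       std::lock_guard<carb::thread::detail::LowLevelLock> guard(lock); // or std::unique_lock
//       // ... critical section ...
//   }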
struct ParkingLot
{
struct WaitEntry
{
enum Bits : uint32_t
{
kNoBits = 0,
kNotifyBit = 1,
kWaitBit = 2,
};
const void* addr;
WaitEntry* next{ nullptr };
WaitEntry* prev{ nullptr };
uint32_t changeId{ 0 };
NativeFutex::AtomicType wakeup{ kNoBits };
#if CARB_PLATFORM_WINDOWS
ThreadId threadId{ this_thread::getId() };
#endif
};
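// A fixed-size cache of waiters to wake after the bucket lock is released. It also abstracts the
// per-platform wake primitive: on Windows waiters are woken by thread ID, elsewhere via the NativeFutex
// wakeup flag embedded in each WaitEntry.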
class WakeCache
{
public:
constexpr static size_t kWakeCacheSize = 128;
WakeCache() noexcept = default;
~WakeCache()
{
wakeAll();
}
#if CARB_PLATFORM_WINDOWS
using Entry = ThreadId;
static Entry fromWaitEntry(WaitEntry& we) noexcept
{
return we.threadId;
}
static void doWake(Entry val) noexcept
{
threadId_wake(val);
}
static void doWait(WaitEntry& we) noexcept
{
threadId_wait(we.wakeup);
}
template <class Duration>
static bool doWaitFor(WaitEntry& we, Duration duration)
{
return threadId_wait_for(we.wakeup, duration);
}
template <class TimePoint>
static bool doWaitUntil(WaitEntry& we, TimePoint timePoint)
{
return threadId_wait_until(we.wakeup, timePoint);
}
#else
using Entry = NativeFutex::AtomicType*;
static Entry fromWaitEntry(WaitEntry& we) noexcept
{
return &we.wakeup;
}
static void doWake(Entry val) noexcept
{
NativeFutex::notify_one(*val);
}
static void doWait(WaitEntry& we) noexcept
{
NativeFutex::wait(we.wakeup, WaitEntry::kNoBits);
}
template <class Duration>
static bool doWaitFor(WaitEntry& we, Duration duration)
{
return NativeFutex::wait_for(we.wakeup, WaitEntry::kNoBits, duration);
}
template <class TimePoint>
static bool doWaitUntil(WaitEntry& we, TimePoint timePoint)
{
return NativeFutex::wait_until(we.wakeup, WaitEntry::kNoBits, timePoint);
}
#endif
CARB_NODISCARD bool add(WaitEntry& we) noexcept
{
CARB_ASSERT(m_end < pastTheEnd());
*(m_end++) = fromWaitEntry(we);
return m_end == pastTheEnd(); // true if the cache is now full and must be flushed before further adds
}
void wakeAll() noexcept
{
for (Entry* p = &m_entries[0]; p != m_end; ++p)
doWake(*p);
m_end = &m_entries[0];
}
private:
const Entry* pastTheEnd() const noexcept
{
return &m_entries[kWakeCacheSize];
}
Entry m_entries[kWakeCacheSize];
Entry* m_end = &m_entries[0];
};
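// A hash bucket of waiters: an intrusive doubly-linked list of WaitEntry nodes protected by a
// LowLevelLock, plus a change counter used to detect notifications that race with the unlocked
// pre-wait checks.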
class WaitBucket
{
LowLevelLock m_lock;
WaitEntry* m_head{ nullptr };
WaitEntry* m_tail{ nullptr };
std::atomic_uint32_t m_changeTracker{ 0 };
void assertLockState() const noexcept
{
CARB_ASSERT(m_lock.is_locked());
}
public:
CARB_IF_NOT_TSAN(constexpr) WaitBucket() noexcept = default;
CARB_NODISCARD std::unique_lock<LowLevelLock> lock() noexcept
{
return std::unique_lock<LowLevelLock>(m_lock);
}
void incChangeTracker() noexcept
{
assertLockState(); // under lock
m_changeTracker.store(m_changeTracker.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
}
bool appearsEmpty() const noexcept
{
// fence-fence synchronization with wait functions, forces visibility
this_thread::atomic_fence_seq_cst();
// cpp::atomic_ref() is dependent on this file, so we can't use it.
return reinterpret_cast<const std::atomic<WaitEntry*>&>(m_head).load(std::memory_order_relaxed) == nullptr;
}
std::atomic_uint32_t& changeTracker() noexcept
{
return m_changeTracker;
}
WaitEntry* head() noexcept
{
assertLockState();
return m_head;
}
void append(WaitEntry* e) noexcept
{
assertLockState();
e->prev = std::exchange(m_tail, e);
e->next = nullptr;
if (e->prev)
{
e->prev->next = e;
}
else
{
CARB_ASSERT(m_head == nullptr);
m_head = e;
}
}
void remove(WaitEntry* e) noexcept
{
assertLockState();
if (e->next)
{
e->next->prev = e->prev;
}
else
{
CARB_ASSERT(m_tail == e);
m_tail = e->prev;
}
if (e->prev)
{
e->prev->next = e->next;
}
else
{
CARB_ASSERT(m_head == e);
m_head = e->next;
}
}
constexpr static size_t kNumWaitBuckets = 2048; // Must be a power of two
static_assert(carb::cpp::has_single_bit(kNumWaitBuckets), "Invalid assumption");
static inline WaitBucket& bucket(const void* addr) noexcept
{
static WaitBucket waitBuckets[kNumWaitBuckets];
#if 1
// FNV-1a hash is fast with really good distribution
// In "futex buckets" test, about ~70% on Windows and ~80% on Linux
auto hash = carb::hashBuffer(&addr, sizeof(addr));
return waitBuckets[hash & (kNumWaitBuckets - 1)];
#else
// Simple bitshift method
// In "futex buckets" test:
// >> 4 bits: ~71% on Windows, ~72% on Linux
// >> 5 bits: ~42% on Windows, ~71% on Linux
return waitBuckets[(size_t(addr) >> 4) & (kNumWaitBuckets - 1)];
#endif
}
};
template <typename T>
static void wait(const std::atomic<T>& val, T compare) noexcept
{
WaitEntry entry{ std::addressof(val) };
using I = to_integral_t<T>;
// Check before waiting
if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
{
return;
}
WaitBucket& b = WaitBucket::bucket(std::addressof(val));
{
auto lock = b.lock();
// Check inside the lock to reduce spurious wakeups
if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
{
return;
}
entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
b.append(&entry);
}
// Do the wait if everything is consistent
NativeFutex::Type v = WaitEntry::kNoBits;
if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
{
// Do the wait
WakeCache::doWait(entry);
// Full fence to force visibility on entry.wakeup, which can be a relaxed load
this_thread::atomic_fence_seq_cst();
v = entry.wakeup.load(std::memory_order_relaxed);
// See if already removed without a wait bit
if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
return;
}
// May not have been removed
if (!(v & WaitEntry::kNotifyBit))
{
// Need to remove
auto lock = b.lock();
// Check again under the lock (relaxed because we're under the lock)
v = entry.wakeup.load(std::memory_order_relaxed);
if (CARB_LIKELY(v == WaitEntry::kNoBits))
{
b.remove(&entry);
return;
}
}
// Spin briefly while the wait bit is set, though this should be rare
if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
}
template <typename T, typename Rep, typename Period>
static bool wait_for(const std::atomic<T>& val, T compare, std::chrono::duration<Rep, Period> duration)
{
WaitEntry entry{ std::addressof(val) };
using I = to_integral_t<T>;
// Check before waiting
if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
{
return true;
}
WaitBucket& b = WaitBucket::bucket(std::addressof(val));
{
auto lock = b.lock();
// Check inside the lock to reduce spurious wakeups
if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
{
return true;
}
entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
b.append(&entry);
}
// Do the wait if everything is consistent
bool wasNotified = true;
NativeFutex::Type v = WaitEntry::kNoBits;
if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
{
// Do the wait
wasNotified = WakeCache::doWaitFor(entry, duration);
if (wasNotified) // specifically woken
{
// Full fence to force visibility on entry.wakeup, which can be a relaxed load
this_thread::atomic_fence_seq_cst();
v = entry.wakeup.load(std::memory_order_relaxed);
// See if already removed without a wait bit
if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
return true;
}
}
// May not have been removed.
if (!(v & WaitEntry::kNotifyBit))
{
// Need to remove
auto lock = b.lock();
// Check again under the lock (relaxed because we're under the lock)
v = entry.wakeup.load(std::memory_order_relaxed);
if (CARB_LIKELY(v == WaitEntry::kNoBits))
{
b.remove(&entry);
return wasNotified;
}
else
{
// Already removed, so treat it as a notification rather than a timeout.
wasNotified = true;
}
}
// Spin briefly while the wait bit is set, though this should be rare
if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
return wasNotified;
}
template <typename T, typename Clock, typename Duration>
static bool wait_until(const std::atomic<T>& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
{
WaitEntry entry{ std::addressof(val) };
using I = to_integral_t<T>;
// Check before waiting
if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
{
return true;
}
WaitBucket& b = WaitBucket::bucket(std::addressof(val));
{
auto lock = b.lock();
// Check inside the lock to reduce spurious wakeups
if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
{
return true;
}
entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
b.append(&entry);
}
// Do the wait if everything is consistent
bool wasNotified = true;
NativeFutex::Type v = WaitEntry::kNoBits;
if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
{
// Do the wait
wasNotified = WakeCache::doWaitUntil(entry, time_point);
if (wasNotified)
{
// Full fence to force visibility on entry.wakeup, which can be a relaxed load
this_thread::atomic_fence_seq_cst();
v = entry.wakeup.load(std::memory_order_relaxed);
// See if already removed without a wait bit
if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
return true;
}
}
// May not have been removed.
if (!(v & WaitEntry::kNotifyBit))
{
// Need to remove
auto lock = b.lock();
// Check again under the lock (relaxed because we're under the lock)
v = entry.wakeup.load(std::memory_order_relaxed);
if (CARB_LIKELY(v == WaitEntry::kNoBits))
{
b.remove(&entry);
return wasNotified;
}
else
{
// Already removed, so treat it as a notification rather than a timeout.
wasNotified = true;
}
}
// Spin briefly while the wait bit is set, though this should be rare
if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
return wasNotified;
}
static void notify_one(void* addr) noexcept
{
WaitBucket& b = WaitBucket::bucket(addr);
// Read empty state with full fence to avoid locking
if (b.appearsEmpty())
return;
auto lock = b.lock();
b.incChangeTracker();
for (WaitEntry* e = b.head(); e; e = e->next)
{
if (e->addr == addr)
{
// Remove before setting the wakeup flag
b.remove(e);
// `e` may be destroyed after we unlock, so record the value that we need here.
auto val = WakeCache::fromWaitEntry(*e);
// release semantics despite being under the lock because we want `val` read before store.
// No access to `e` allowed after this store.
e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
// Unlock and issue the wake
lock.unlock();
WakeCache::doWake(val);
return;
}
}
}
private:
static CARB_NOINLINE void notify_n_slow(
std::unique_lock<LowLevelLock> lock, void* addr, size_t n, WaitBucket& b, WaitEntry* e, WakeCache& wakeCache) noexcept
{
// We are waking a lot of things and the wakeCache is completely full. Now we need to start building a list
WaitEntry *wake = nullptr, *end = nullptr;
for (WaitEntry* next; e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
e->next = nullptr;
// Don't bother with prev pointers
if (end)
end->next = e;
else
wake = e;
end = e;
// Relaxed because we're under the lock.
// Need to set the wait bit since we're still reading/writing to the WaitEntry
e->wakeup.store(WaitEntry::kWaitBit | WaitEntry::kNotifyBit, std::memory_order_relaxed);
if (!--n)
break;
}
}
lock.unlock();
// Wake the entire cache since we know it's full
wakeCache.wakeAll();
for (WaitEntry* next; wake; wake = next)
{
next = wake->next;
auto val = WakeCache::fromWaitEntry(*wake);
// Clear the wait bit so that only the wake bit is set. No accesses to `wake` are allowed after this since
// the waiting thread is free to destroy it. Release to synchronize-with spin-waiting on the wait bit.
wake->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
WakeCache::doWake(val);
}
}
static CARB_NOINLINE void notify_all_slow(
std::unique_lock<LowLevelLock> lock, void* addr, WaitBucket& b, WaitEntry* e, WakeCache& wakeCache) noexcept
{
// We are waking a lot of things and the wakeCache is completely full. Now we need to start building a list
WaitEntry *wake = nullptr, *end = nullptr;
for (WaitEntry* next; e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
e->next = nullptr;
// Don't bother with prev pointers
if (end)
end->next = e;
else
wake = e;
end = e;
// Relaxed because we're under the lock.
// Need to set the wait bit since we're still reading/writing to the WaitEntry
e->wakeup.store(WaitEntry::kWaitBit | WaitEntry::kNotifyBit, std::memory_order_relaxed);
}
}
lock.unlock();
// Wake the entire cache since we know it's full
wakeCache.wakeAll();
for (WaitEntry* next; wake; wake = next)
{
next = wake->next;
auto val = WakeCache::fromWaitEntry(*wake);
// Clear the wait bit so that only the wake bit is set. No accesses to `wake` are allowed after this since
// the waiting thread is free to destroy it. Release to synchronize-with spin-waiting on the wait bit.
wake->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
WakeCache::doWake(val);
}
}
public:
static void notify_n(void* addr, size_t n) noexcept
{
if (CARB_UNLIKELY(n == 0))
{
return;
}
WaitBucket& b = WaitBucket::bucket(addr);
// Read empty state with full fence to avoid locking
if (b.appearsEmpty())
return;
// It is much faster overall to not set kWaitBit and force the woken threads to wait until we're clear of their
// WaitEntry, so keep a local cache of addresses to wake here that don't require a WaitEntry.
// Note that we do retain a pointer to the std::atomic_uint32_t *contained in* the WaitEntry and tell the
// underlying OS to wake by that address, but this is safe as it does not read that memory. The address
// is used as a lookup to find any waiters that registered with that address. If the memory was quickly reused
// for a different wait operation, this will cause a spurious wakeup which is allowed, but this should be
// exceedingly rare.
// Must be constructed before the lock is taken
WakeCache wakeCache;
auto lock = b.lock();
b.incChangeTracker();
for (WaitEntry *next, *e = b.head(); e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
bool nowFull = wakeCache.add(*e);
// release semantics despite being under the lock because we want add() complete before store.
// No access to `e` allowed after this store.
e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
if (!--n)
break;
if (CARB_UNLIKELY(nowFull))
{
// Cache is full => transition to a list for the rest
notify_n_slow(std::move(lock), addr, n, b, next, wakeCache);
return;
}
}
}
// Lock is unlocked (important that this happens first), and then wakeCache destructs and issues all of its wakes
}
static void notify_all(void* addr) noexcept
{
WaitBucket& b = WaitBucket::bucket(addr);
// Read empty state with full fence to avoid locking
if (b.appearsEmpty())
return;
// It is much faster overall to not set kWaitBit and force the woken threads to wait until we're clear of their
// WaitEntry, so keep a local cache of addresses to wake here that don't require a WaitEntry.
// Note that we do retain a pointer to the std::atomic_uint32_t *contained in* the WaitEntry and tell the
// underlying OS to wake by that address, but this is safe as it does not read that memory. The address
// is used as a lookup to find any waiters that registered with that address. If the memory was quickly reused
// for a different wait operation, this will cause a spurious wakeup which is allowed, but this should be
// exceedingly rare.
// Must be constructed before the lock is taken
WakeCache wakeCache;
auto lock = b.lock();
b.incChangeTracker();
for (WaitEntry *next, *e = b.head(); e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
bool nowFull = wakeCache.add(*e);
// release semantics despite being under the lock because we want add() complete before store.
// No access to `e` allowed after this store.
e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
if (CARB_UNLIKELY(nowFull))
{
// Full => transition to a list for the rest of the items
notify_all_slow(std::move(lock), addr, b, next, wakeCache);
return;
}
}
}
// Lock is unlocked (important that this happens first), and then wakeCache destructs and issues all of its wakes
}
}; // struct ParkingLot
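// Illustrative sketch (not part of the original header): callers normally go through the Futex wrapper,
// which forwards to the ParkingLot when CARB_USE_PARKINGLOT is enabled:
//
//   std::atomic<uint8_t> state{ 0 };
//   // Waiter: blocks while state == 0; wakeups may be spurious, hence the loop.
//   while (state.load(std::memory_order_acquire) == 0)
//       carb::thread::detail::Futex<uint8_t>::wait(state, uint8_t(0));
//   // Notifier:
//   state.store(1, std::memory_order_release);
//   carb::thread::detail::Futex<uint8_t>::notify_all(state);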
template <class T, size_t S = sizeof(T)>
class ParkingLotFutex
{
static_assert(S == 1 || S == 2 || S == 4 || S == 8, "Unsupported size");
public:
using AtomicType = typename std::atomic<T>;
using Type = T;
static void wait(const AtomicType& val, T compare) noexcept
{
ParkingLot::wait(val, compare);
}
template <class Rep, class Period>
static bool wait_for(const AtomicType& val, T compare, std::chrono::duration<Rep, Period> duration)
{
return ParkingLot::wait_for(val, compare, duration);
}
template <class Clock, class Duration>
static bool wait_until(const AtomicType& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
{
return ParkingLot::wait_until(val, compare, time_point);
}
static void notify_one(AtomicType& val) noexcept
{
ParkingLot::notify_one(std::addressof(val));
}
static void notify_n(AtomicType& val, size_t count) noexcept
{
ParkingLot::notify_n(std::addressof(val), count);
}
static void notify_all(AtomicType& val) noexcept
{
ParkingLot::notify_all(std::addressof(val));
}
};
// Futex types that must use the ParkingLot
#if CARB_USE_PARKINGLOT
template <class T, size_t S = sizeof(T)>
using Futex = ParkingLotFutex<T, S>;
#endif
} // namespace detail
} // namespace thread
} // namespace carb