ParkingLot.h
Fully qualified name: carb/thread/detail/ParkingLot.h
File members: carb::thread::detail::ParkingLot, carb::thread::detail::ParkingLotFutex
// Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "NativeFutex.h"
#include "LowLevelLock.h"
#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
namespace carb
{
namespace thread
{
namespace detail
{
// NOTE: We cannot use thread ID wakeup on Windows while LowLevelLock is used (and it also uses thread ID wakeup) as
// this will lead to hard-to-debug hangs (See OVCC-1662)
#define CARBLOCAL_WIN_USE_THREADID 0
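// ParkingLot implements a futex-style wait/notify service for atomics of any size: a waiting thread
// registers a stack-allocated WaitEntry in a WaitBucket chosen by hashing the atomic's address, and a
// notifying thread walks the matching bucket to wake it. All members are static; there is no instance state.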
struct ParkingLot
{
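// Per-waiting-thread node linked into a WaitBucket. `wakeup` starts at kNoBits; a notifier sets
// kNotifyBit once it has finished touching the entry, and keeps kWaitBit set while it still needs the
// entry to stay alive (the waiter spins until kWaitBit clears before returning and destroying the entry).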
struct WaitEntry
{
enum Bits : uint32_t
{
kNoBits = 0,
kNotifyBit = 1,
kWaitBit = 2,
};
const void* addr;
WaitEntry* next{ nullptr };
WaitEntry* prev{ nullptr };
uint32_t changeId{ 0 };
NativeFutex::AtomicType wakeup{ kNoBits };
#if CARB_PLATFORM_WINDOWS && CARBLOCAL_WIN_USE_THREADID
ThreadId threadId{ this_thread::getId() };
#endif
};
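// WakeCache batches wake targets so that the actual OS wake calls happen outside the bucket lock. It
// holds up to kWakeCacheSize targets; add() reports when the cache fills so callers can fall back to the
// slow path, and wakeAll() (also run by the destructor) issues the deferred wakes. The #if branches
// select the wake primitive: thread-ID wakeup on Windows when CARBLOCAL_WIN_USE_THREADID is enabled,
// NativeFutex otherwise.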
class WakeCache
{
public:
constexpr static size_t kWakeCacheSize = 128;
WakeCache() noexcept = default;
~WakeCache()
{
wakeAll();
}
#if CARB_PLATFORM_WINDOWS && CARBLOCAL_WIN_USE_THREADID
using Entry = ThreadId;
static Entry fromWaitEntry(WaitEntry& we) noexcept
{
return we.threadId;
}
static void doWake(Entry val) noexcept
{
threadId_wake(val);
}
static void doWait(WaitEntry& we) noexcept
{
threadId_wait(we.wakeup);
}
template <class Duration>
static bool doWaitFor(WaitEntry& we, Duration duration)
{
return threadId_wait_for(we.wakeup, duration);
}
template <class TimePoint>
static bool doWaitUntil(WaitEntry& we, TimePoint timePoint)
{
return threadId_wait_until(we.wakeup, timePoint);
}
#else
using Entry = NativeFutex::AtomicType*;
static Entry fromWaitEntry(WaitEntry& we) noexcept
{
return &we.wakeup;
}
static void doWake(Entry val) noexcept
{
NativeFutex::notify_one(*val);
}
static void doWait(WaitEntry& we) noexcept
{
NativeFutex::wait(we.wakeup, WaitEntry::kNoBits);
}
template <class Duration>
static bool doWaitFor(WaitEntry& we, Duration duration)
{
return NativeFutex::wait_for(we.wakeup, WaitEntry::kNoBits, duration);
}
template <class TimePoint>
static bool doWaitUntil(WaitEntry& we, TimePoint timePoint)
{
return NativeFutex::wait_until(we.wakeup, WaitEntry::kNoBits, timePoint);
}
#endif
// Records a wake target; returns true if the cache is now full and must be flushed.
CARB_NODISCARD bool add(WaitEntry& we) noexcept
{
CARB_ASSERT(m_end < pastTheEnd());
*(m_end++) = fromWaitEntry(we);
return m_end == pastTheEnd();
}
void wakeAll() noexcept
{
for (Entry* p = &m_entries[0]; p != m_end; ++p)
doWake(*p);
m_end = &m_entries[0];
}
private:
const Entry* pastTheEnd() const noexcept
{
return &m_entries[kWakeCacheSize];
}
Entry m_entries[kWakeCacheSize];
Entry* m_end = &m_entries[0];
};
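// WaitBucket is an intrusive doubly-linked list of WaitEntry nodes guarded by a LowLevelLock. The change
// tracker is bumped by every notify so a waiter can detect a notification that raced with its
// registration; appearsEmpty() lets notifiers skip the lock entirely when no one is parked here.
// bucket() hashes an address into a fixed table of kNumWaitBuckets buckets.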
class WaitBucket
{
LowLevelLock m_lock;
WaitEntry* m_head{ nullptr };
WaitEntry* m_tail{ nullptr };
std::atomic_uint32_t m_changeTracker{ 0 };
void assertLockState() const noexcept
{
CARB_ASSERT(m_lock.is_locked());
}
public:
CARB_IF_NOT_TSAN(constexpr) WaitBucket() noexcept = default;
CARB_NODISCARD std::unique_lock<LowLevelLock> lock() noexcept
{
return std::unique_lock<LowLevelLock>(m_lock);
}
void incChangeTracker() noexcept
{
assertLockState(); // under lock
m_changeTracker.store(m_changeTracker.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
}
bool appearsEmpty() const noexcept
{
// fence-fence synchronization with wait functions, forces visibility
this_thread::atomic_fence_seq_cst();
// cpp::atomic_ref() is dependent on this file, so we can't use it.
return reinterpret_cast<const std::atomic<WaitEntry*>&>(m_head).load(std::memory_order_relaxed) == nullptr;
}
std::atomic_uint32_t& changeTracker() noexcept
{
return m_changeTracker;
}
WaitEntry* head() noexcept
{
assertLockState();
return m_head;
}
void append(WaitEntry* e) noexcept
{
assertLockState();
e->prev = std::exchange(m_tail, e);
e->next = nullptr;
if (e->prev)
{
e->prev->next = e;
}
else
{
CARB_ASSERT(m_head == nullptr);
m_head = e;
}
}
void remove(WaitEntry* e) noexcept
{
assertLockState();
if (e->next)
{
e->next->prev = e->prev;
}
else
{
CARB_ASSERT(m_tail == e);
m_tail = e->prev;
}
if (e->prev)
{
e->prev->next = e->next;
}
else
{
CARB_ASSERT(m_head == e);
m_head = e->next;
}
}
constexpr static size_t kNumWaitBuckets = 2048; // Must be a power of two
static_assert(carb::cpp::has_single_bit(kNumWaitBuckets), "Invalid assumption");
static inline WaitBucket& bucket(const void* addr) noexcept
{
static WaitBucket waitBuckets[kNumWaitBuckets];
#if 1
// FNV-1a hash is fast with really good distribution
// In "futex buckets" test, about ~70% on Windows and ~80% on Linux
auto hash = carb::hashBuffer(&addr, sizeof(addr));
return waitBuckets[hash & (kNumWaitBuckets - 1)];
#else
// Simple bitshift method
// In "futex buckets" test:
// >> 4 bits: ~71% on Windows, ~72% on Linux
// >> 5 bits: ~42% on Windows, ~71% on Linux
return waitBuckets[(size_t(addr) >> 4) & (kNumWaitBuckets - 1)];
#endif
}
};
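// Blocks while `val` still compares equal to `compare`. The protocol: check the value, register the
// WaitEntry under the bucket lock, re-check the value and the bucket's change counter, then sleep on the
// entry's wakeup word. Spurious returns are possible, so callers normally re-test their predicate in a
// loop. Illustrative sketch (the `flag` atomic below is hypothetical, not part of this header):
//
//   std::atomic<uint32_t> flag{ 0 };
//   while (flag.load(std::memory_order_acquire) == 0)
//       ParkingLot::wait(flag, uint32_t(0)); // returns when flag != 0, or spuriously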
template <typename T>
static void wait(const std::atomic<T>& val, T compare) noexcept
{
WaitEntry entry{ std::addressof(val) };
using I = to_integral_t<T>;
// Check before waiting
if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
{
return;
}
WaitBucket& b = WaitBucket::bucket(std::addressof(val));
{
auto lock = b.lock();
// Check inside the lock to reduce spurious wakeups
if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
{
return;
}
entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
b.append(&entry);
}
// Do the wait if everything is consistent
NativeFutex::Type v = WaitEntry::kNoBits;
if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
{
// Do the wait
WakeCache::doWait(entry);
// Full fence to force visibility on entry.wakeup, which can be a relaxed load
this_thread::atomic_fence_seq_cst();
v = entry.wakeup.load(std::memory_order_relaxed);
// See if already removed without a wait bit
if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
return;
}
// May not have been removed
if (!(v & WaitEntry::kNotifyBit))
{
// Need to remove
auto lock = b.lock();
// Check again under the lock (relaxed because we're under the lock)
v = entry.wakeup.load(std::memory_order_relaxed);
if (CARB_LIKELY(v == WaitEntry::kNoBits))
{
b.remove(&entry);
return;
}
}
// Spin briefly while the wait bit is set, though this should be rare
if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
}
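// Same registration/re-check protocol as wait(), but bounded by `duration`. Returns true if the waiter
// was notified (or the value already differed, or a notifier had already claimed the entry) and false
// only on a genuine timeout. Hypothetical sketch, reusing the `flag` atomic from the comment above:
//
//   using namespace std::chrono_literals;
//   bool notified = ParkingLot::wait_for(flag, uint32_t(0), 10ms); // false => timed out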
template <typename T, typename Rep, typename Period>
static bool wait_for(const std::atomic<T>& val, T compare, const std::chrono::duration<Rep, Period>& duration)
{
WaitEntry entry{ std::addressof(val) };
using I = to_integral_t<T>;
// Check before waiting
if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
{
return true;
}
WaitBucket& b = WaitBucket::bucket(std::addressof(val));
{
auto lock = b.lock();
// Check inside the lock to reduce spurious wakeups
if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
{
return true;
}
entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
b.append(&entry);
}
// Do the wait if everything is consistent
bool wasNotified = true;
NativeFutex::Type v = WaitEntry::kNoBits;
if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
{
// Do the wait
wasNotified = WakeCache::doWaitFor(entry, duration);
if (wasNotified) // specifically woken
{
// Full fence to force visibility on entry.wakeup, which can be a relaxed load
this_thread::atomic_fence_seq_cst();
v = entry.wakeup.load(std::memory_order_relaxed);
// See if already removed without a wait bit
if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
return true;
}
}
// May not have been removed.
if (!(v & WaitEntry::kNotifyBit))
{
// Need to remove
auto lock = b.lock();
// Check again under the lock (relaxed because we're under the lock)
v = entry.wakeup.load(std::memory_order_relaxed);
if (CARB_LIKELY(v == WaitEntry::kNoBits))
{
b.remove(&entry);
return wasNotified;
}
else
{
// Already removed, so treat it as a notification rather than a timeout.
wasNotified = true;
}
}
// Spin briefly while the wait bit is set, though this should be rare
if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
return wasNotified;
}
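// Deadline-based variant of wait_for(): the caller's time_point is converted to std::chrono::steady_clock
// before sleeping, and the return value has the same meaning (true = notified or value changed, false = timed out).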
template <typename T, typename FromClock, typename Duration>
static bool wait_until(const std::atomic<T>& val, T compare, const std::chrono::time_point<FromClock, Duration>& until)
{
WaitEntry entry{ std::addressof(val) };
using I = to_integral_t<T>;
// Check before waiting
if (reinterpret_as<I>(val.load(std::memory_order_acquire)) != reinterpret_as<I>(compare))
{
return true;
}
// Convert to a steady clock
auto when = cpp::detail::convertToClock<std::chrono::steady_clock>(until);
WaitBucket& b = WaitBucket::bucket(std::addressof(val));
{
auto lock = b.lock();
// Check inside the lock to reduce spurious wakeups
if (CARB_UNLIKELY(reinterpret_as<I>(val.load(std::memory_order_relaxed)) != reinterpret_as<I>(compare)))
{
return true;
}
entry.changeId = b.changeTracker().load(std::memory_order_relaxed);
b.append(&entry);
}
// Do the wait if everything is consistent
bool wasNotified = true;
NativeFutex::Type v = WaitEntry::kNoBits;
if (CARB_LIKELY(reinterpret_as<I>(val.load(std::memory_order_acquire)) == reinterpret_as<I>(compare) &&
b.changeTracker().load(std::memory_order_relaxed) == entry.changeId))
{
// Do the wait
wasNotified = WakeCache::doWaitUntil(entry, when);
if (wasNotified)
{
// Full fence to force visibility on entry.wakeup, which can be a relaxed load
this_thread::atomic_fence_seq_cst();
v = entry.wakeup.load(std::memory_order_relaxed);
// See if already removed without a wait bit
if (CARB_LIKELY(v == WaitEntry::kNotifyBit))
return true;
}
}
// May not have been removed.
if (!(v & WaitEntry::kNotifyBit))
{
// Need to remove
auto lock = b.lock();
// Check again under the lock (relaxed because we're under the lock)
v = entry.wakeup.load(std::memory_order_relaxed);
if (CARB_LIKELY(v == WaitEntry::kNoBits))
{
b.remove(&entry);
return wasNotified;
}
else
{
// Already removed, so treat it as a notification rather than a timeout.
wasNotified = true;
}
}
// Spin briefly while the wait bit is set, though this should be rare
if (CARB_UNLIKELY(v & WaitEntry::kWaitBit))
this_thread::spinWait([&] { return !(entry.wakeup.load(std::memory_order_acquire) & WaitEntry::kWaitBit); });
return wasNotified;
}
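// Wakes at most one thread waiting on `addr`. The entry is unlinked and its kNotifyBit stored under the
// bucket lock, but the wake itself is issued after unlocking so the woken thread never contends on the lock.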
static void notify_one(void* addr) noexcept
{
WaitBucket& b = WaitBucket::bucket(addr);
// Read empty state with full fence to avoid locking
if (b.appearsEmpty())
return;
auto lock = b.lock();
b.incChangeTracker();
for (WaitEntry* e = b.head(); e; e = e->next)
{
if (e->addr == addr)
{
// Remove before setting the wakeup flag
b.remove(e);
// `e` may be destroyed after we unlock, so record the value that we need here.
auto val = WakeCache::fromWaitEntry(*e);
// release semantics despite being under the lock because we want `val` read before store.
// No access to `e` allowed after this store.
e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
// Unlock and issue the wake
lock.unlock();
WakeCache::doWake(val);
return;
}
}
}
private:
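// Slow paths for notify_n()/notify_all(), taken only when the WakeCache fills up. The remaining matching
// entries are unlinked and chained through their `next` pointers with kWaitBit | kNotifyBit set, which
// keeps each waiter from destroying its WaitEntry until the bit is cleared here after the lock is released.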
static CARB_NOINLINE void notify_n_slow(
std::unique_lock<LowLevelLock> lock, void* addr, size_t n, WaitBucket& b, WaitEntry* e, WakeCache& wakeCache) noexcept
{
// We are waking a lot of things and the wakeCache is completely full. Now we need to start building a list
WaitEntry *wake = nullptr, *end = nullptr;
for (WaitEntry* next; e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
e->next = nullptr;
// Don't bother with prev pointers
if (end)
end->next = e;
else
wake = e;
end = e;
// Relaxed because we're under the lock.
// Need to set the wait bit since we're still reading/writing to the WaitEntry
e->wakeup.store(WaitEntry::kWaitBit | WaitEntry::kNotifyBit, std::memory_order_relaxed);
if (!--n)
break;
}
}
lock.unlock();
// Wake the entire cache since we know it's full
wakeCache.wakeAll();
for (WaitEntry* next; wake; wake = next)
{
next = wake->next;
auto val = WakeCache::fromWaitEntry(*wake);
// Clear the wait bit so that only the wake bit is set. No accesses to `wake` are allowed after this since
// the waiting thread is free to destroy it. Release to synchronize-with spin-waiting on the wait bit.
wake->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
WakeCache::doWake(val);
}
}
static CARB_NOINLINE void notify_all_slow(
std::unique_lock<LowLevelLock> lock, void* addr, WaitBucket& b, WaitEntry* e, WakeCache& wakeCache) noexcept
{
// We are waking a lot of things and the wakeCache is completely full. Now we need to start building a list
WaitEntry *wake = nullptr, *end = nullptr;
for (WaitEntry* next; e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
e->next = nullptr;
// Don't bother with prev pointers
if (end)
end->next = e;
else
wake = e;
end = e;
// Relaxed because we're under the lock.
// Need to set the wait bit since we're still reading/writing to the WaitEntry
e->wakeup.store(WaitEntry::kWaitBit | WaitEntry::kNotifyBit, std::memory_order_relaxed);
}
}
lock.unlock();
// Wake the entire cache since we know it's full
wakeCache.wakeAll();
for (WaitEntry* next; wake; wake = next)
{
next = wake->next;
auto val = WakeCache::fromWaitEntry(*wake);
// Clear the wait bit so that only the wake bit is set. No accesses to `wake` are allowed after this since
// the waiting thread is free to destroy it. Release to synchronize-with spin-waiting on the wait bit.
wake->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
WakeCache::doWake(val);
}
}
public:
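// Wakes up to `n` threads waiting on `addr`. Matching entries are unlinked and marked under the bucket
// lock while their wake targets are collected in the WakeCache; the wakes themselves are issued after the
// lock is released (by the cache's destructor, or by the slow path if the cache fills).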
static void notify_n(void* addr, size_t n) noexcept
{
if (CARB_UNLIKELY(n == 0))
{
return;
}
WaitBucket& b = WaitBucket::bucket(addr);
// Read empty state with full fence to avoid locking
if (b.appearsEmpty())
return;
// It is much faster overall to not set kWaitBit and force the woken threads to wait until we're clear of their
// WaitEntry, so keep a local cache of addresses to wake here that don't require a WaitEntry.
// Note that we do retain a pointer to the std::atomic_uint32_t *contained in* the WaitEntry and tell the
// underlying OS system to wake by that address, but this is safe as it does not read that memory. The address
// is used as a lookup to find any waiters that registered with that address. If the memory was quickly reused
// for a different wait operation, this will cause a spurious wakeup which is allowed, but this should be
// exceedingly rare.
// Must be constructed before the lock is taken
WakeCache wakeCache;
auto lock = b.lock();
b.incChangeTracker();
for (WaitEntry *next, *e = b.head(); e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
bool nowFull = wakeCache.add(*e);
// release semantics despite being under the lock because we want add() complete before store.
// No access to `e` allowed after this store.
e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
if (!--n)
break;
if (CARB_UNLIKELY(nowFull))
{
// Cache is full => transition to a list for the rest
notify_n_slow(std::move(lock), addr, n, b, next, wakeCache);
return;
}
}
}
// Lock is unlocked (important this happens first), and then wakeCache destructs and issues all of its wakes
}
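// Wakes every thread waiting on `addr`, using the same deferred-wake scheme as notify_n().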
static void notify_all(void* addr) noexcept
{
WaitBucket& b = WaitBucket::bucket(addr);
// Read empty state with full fence to avoid locking
if (b.appearsEmpty())
return;
// It is much faster overall to not set kWaitBit and force the woken threads to wait until we're clear of their
// WaitEntry, so keep a local cache of addresses to wake here that don't require a WaitEntry.
// Note that we do retain a pointer to the std::atomic_uint32_t *contained in* the WaitEntry and tell the
// underlying OS system to wake by that address, but this is safe as it does not read that memory. The address
// is used as a lookup to find any waiters that registered with that address. If the memory was quickly reused
// for a different wait operation, this will cause a spurious wakeup which is allowed, but this should be
// exceedingly rare.
// Must be constructed before the lock is taken
WakeCache wakeCache;
auto lock = b.lock();
b.incChangeTracker();
for (WaitEntry *next, *e = b.head(); e; e = next)
{
next = e->next;
if (e->addr == addr)
{
b.remove(e);
bool nowFull = wakeCache.add(*e);
// release semantics despite being under the lock because we want add() complete before store.
// No access to `e` allowed after this store.
e->wakeup.store(WaitEntry::kNotifyBit, std::memory_order_release);
if (CARB_UNLIKELY(nowFull))
{
// Full => transition to a list for the rest of the items
notify_all_slow(std::move(lock), addr, b, next, wakeCache);
return;
}
}
}
// Lock is unlocked (important this happens first), and then wakeCache destructs and issues all of its wakes
}
}; // struct ParkingLot
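// ParkingLotFutex wraps the static ParkingLot functions behind a NativeFutex-style wait/notify interface
// keyed on a std::atomic<T>. Illustrative sketch of the intended pairing (the `done` atomic and the 0/1
// values are hypothetical, not part of this header):
//
//   std::atomic<uint32_t> done{ 0 };
//
//   // Waiter: park while `done` still reads 0.
//   while (done.load(std::memory_order_acquire) == 0)
//       ParkingLotFutex<uint32_t>::wait(done, 0u);
//
//   // Notifier: publish the new value first, then wake.
//   done.store(1, std::memory_order_release);
//   ParkingLotFutex<uint32_t>::notify_one(done);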
template <class T, size_t S = sizeof(T)>
class ParkingLotFutex
{
static_assert(S == 1 || S == 2 || S == 4 || S == 8, "Unsupported size");
public:
using AtomicType = std::atomic<T>;
using Type = T;
static void wait(const AtomicType& val, T compare) noexcept
{
ParkingLot::wait(val, compare);
}
template <class Rep, class Period>
static bool wait_for(const AtomicType& val, T compare, std::chrono::duration<Rep, Period> duration)
{
return ParkingLot::wait_for(val, compare, duration);
}
template <class Clock, class Duration>
static bool wait_until(const AtomicType& val, T compare, std::chrono::time_point<Clock, Duration> time_point)
{
return ParkingLot::wait_until(val, compare, time_point);
}
static void notify_one(AtomicType& val) noexcept
{
ParkingLot::notify_one(std::addressof(val));
}
static void notify_n(AtomicType& val, size_t count) noexcept
{
ParkingLot::notify_n(std::addressof(val), count);
}
static void notify_all(AtomicType& val) noexcept
{
ParkingLot::notify_all(std::addressof(val));
}
};
#undef CARBLOCAL_WIN_USE_THREADID
} // namespace detail
} // namespace thread
} // namespace carb