// Copyright (c) 2019-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "../Defines.h"
#include "../extras/ScopeExit.h"
#include "../math/Util.h"
#include "../process/Util.h"
#include "../profiler/IProfiler.h"
#include "../../omni/extras/ContainerHelper.h"
#if CARB_PLATFORM_WINDOWS
# include "../CarbWindows.h"
# include "../extras/Unicode.h"
#elif CARB_POSIX
# include <sys/syscall.h>
# include <pthread.h>
# include <sched.h>
# include <unistd.h>
# include <time.h>
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
#if CARB_PLATFORM_MACOS
# pragma push_macro("min")
# pragma push_macro("max")
# undef min
# undef max
# include <mach/thread_policy.h>
# include <mach/thread_act.h>
# pragma pop_macro("max")
# pragma pop_macro("min")
#endif
#include <atomic>
#include <cinttypes>
#include <climits>
#include <cstring>
#include <limits>
#include <string>
#include <thread>
#include <vector>
namespace carb
{
namespace thread
{
using ProcessId = process::ProcessId;
using ThreadId = uint32_t;
using CpuMaskVector = std::vector<uint64_t>;
constexpr uint64_t kCpusPerMask = std::numeric_limits<CpuMaskVector::value_type>::digits;
#if CARB_PLATFORM_WINDOWS
static_assert(sizeof(ThreadId) >= sizeof(DWORD), "ThreadId type is too small");
#elif CARB_POSIX
static_assert(sizeof(ThreadId) >= sizeof(pid_t), "ThreadId type is too small");
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
#define OMNI_PRItid PRIu32
#define OMNI_PRIxtid PRIx32
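// Example (illustrative): printing a thread id with the matching format macro:
//     printf("thread %" OMNI_PRItid " started\n", carb::this_thread::getId());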
#if CARB_PLATFORM_WINDOWS
# ifndef DOXYGEN_SHOULD_SKIP_THIS
namespace detail
{
const DWORD MS_VC_EXCEPTION = 0x406D1388;
# pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType;
LPCSTR szName;
DWORD dwThreadID;
DWORD dwFlags;
} THREADNAME_INFO;
# pragma pack(pop)
inline void setDebuggerThreadName(DWORD threadId, LPCSTR name)
{
// Do it the old way, which is only really useful if the debugger is running
if (::IsDebuggerPresent())
{
detail::THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = name;
info.dwThreadID = threadId;
info.dwFlags = 0;
# pragma warning(push)
# pragma warning(disable : 6320 6322)
__try
{
::RaiseException(detail::MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
}
__except (CARBWIN_EXCEPTION_EXECUTE_HANDLER)
{
}
# pragma warning(pop)
}
}
} // namespace detail
# endif
using NativeHandleType = HANDLE;
#elif CARB_POSIX
using NativeHandleType = pthread_t;
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
inline void setName(NativeHandleType h, const char* name)
{
#if CARB_PLATFORM_WINDOWS
// Emulate CARB_NAME_THREAD but don't include Profile.h which would create a circular dependency.
if (auto profiler = g_carbProfiler.load(std::memory_order_acquire))
profiler->nameThreadDynamic(::GetThreadId(h), "%s", name);
// SetThreadDescription is only available starting with Windows 10 1607
using PSetThreadDescription = HRESULT(CARBWIN_WINAPI*)(HANDLE, PCWSTR);
static PSetThreadDescription SetThreadDescription =
(PSetThreadDescription)::GetProcAddress(::GetModuleHandleW(L"kernel32.dll"), "SetThreadDescription");
if (SetThreadDescription)
{
bool b = CARBWIN_SUCCEEDED(SetThreadDescription(h, extras::convertUtf8ToWide(name).c_str()));
CARB_UNUSED(b);
CARB_ASSERT(b);
}
else
{
detail::setDebuggerThreadName(::GetThreadId(h), name);
}
#elif CARB_PLATFORM_LINUX
if (h == pthread_self())
{
// Emulate CARB_NAME_THREAD but don't include Profile.h which would create a circular dependency.
if (auto prof = g_carbProfiler.load(std::memory_order_acquire))
prof->nameThreadDynamic(0, "%s", name);
}
if (pthread_setname_np(h, name) != 0)
{
// This is limited to 16 characters including NUL according to the man page.
char buffer[16];
strncpy(buffer, name, 15);
buffer[15] = '\0';
pthread_setname_np(h, buffer);
}
#elif CARB_PLATFORM_MACOS
if (h == pthread_self())
{
pthread_setname_np(name);
}
// It is not possible to name a thread other than the current thread on macOS.
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
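// Example usage (an illustrative sketch; `worker` is an assumed std::thread whose native handle
// matches NativeHandleType on each platform):
//     std::thread worker([] { /* ... */ });
//     carb::thread::setName(worker.native_handle(), "IO-Worker");
//     std::string n = carb::thread::getName(worker.native_handle()); // "IO-Worker" where supported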
inline std::string getName(NativeHandleType h)
{
#if CARB_PLATFORM_WINDOWS
// GetThreadDescription is only available starting with Windows 10 1607
using PGetThreadDescription = HRESULT(CARBWIN_WINAPI*)(HANDLE, PWSTR*);
static PGetThreadDescription GetThreadDescription =
(PGetThreadDescription)::GetProcAddress(::GetModuleHandleW(L"kernel32.dll"), "GetThreadDescription");
if (GetThreadDescription)
{
PWSTR threadName;
if (CARBWIN_SUCCEEDED(GetThreadDescription(h, &threadName)))
{
std::string s = extras::convertWideToUtf8(threadName);
::LocalFree(threadName);
return s;
}
}
return std::string();
#elif CARB_PLATFORM_LINUX || CARB_PLATFORM_MACOS
char buffer[64];
if (pthread_getname_np(h, buffer, CARB_COUNTOF(buffer)) == 0)
{
return std::string(buffer);
}
return std::string();
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
inline void setAffinity(NativeHandleType h, size_t mask)
{
#if CARB_PLATFORM_WINDOWS
::SetThreadAffinityMask(h, mask);
#elif CARB_PLATFORM_LINUX
// From the man page: The cpu_set_t data type is implemented as a bit mask. However, the data structure should be
// treated as opaque: all manipulation of the CPU sets should be done via the macros described in this page.
if (!mask)
return;
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
static_assert(sizeof(cpuSet) >= sizeof(mask), "Invalid assumption: use CPU_ALLOC");
do
{
int bit = __builtin_ctzll(mask);
CPU_SET(bit, &cpuSet);
mask &= ~(size_t(1) << bit);
} while (mask != 0);
pthread_setaffinity_np(h, sizeof(cpu_set_t), &cpuSet);
#elif CARB_PLATFORM_MACOS
thread_affinity_policy policy{ static_cast<integer_t>(mask) };
thread_policy_set(pthread_mach_thread_np(h), THREAD_AFFINITY_POLICY, reinterpret_cast<thread_policy_t>(&policy),
THREAD_AFFINITY_POLICY_COUNT);
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
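// Example (illustrative): pin the current thread to the first four logical CPUs (mask bits 0-3):
//     carb::this_thread::setAffinity(0xF);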
inline bool setAffinity(NativeHandleType h, const CpuMaskVector& masks)
{
if (masks.empty())
{
return false;
}
#if CARB_PLATFORM_WINDOWS
// Find the lowest mask with a value set. That is the CPU Group that we'll set the affinity for.
for (uint64_t i = 0; i < masks.size(); ++i)
{
if (masks[i])
{
CARBWIN_GROUP_AFFINITY affinity{};
affinity.Group = (WORD)i;
affinity.Mask = masks[i];
return ::SetThreadGroupAffinity(h, (const GROUP_AFFINITY*)&affinity, nullptr);
}
}
// Would only reach here if no affinity mask had a cpu set.
return false;
#elif CARB_PLATFORM_LINUX
uint64_t numCpus = kCpusPerMask * masks.size();
cpu_set_t* cpuSet = CPU_ALLOC(numCpus);
if (!cpuSet)
{
return false;
}
CARB_SCOPE_EXIT
{
CPU_FREE(cpuSet);
};
CPU_ZERO_S(CPU_ALLOC_SIZE(numCpus), cpuSet);
for (uint64_t i = 0; i < masks.size(); ++i)
{
CpuMaskVector::value_type mask = masks[i];
while (mask != 0)
{
int bit = cpp::countr_zero(mask);
// Use the _S variant since cpuSet was dynamically allocated with CPU_ALLOC; plain CPU_SET
// silently ignores CPU indices at or above the static cpu_set_t limit (1024).
CPU_SET_S(bit + (i * kCpusPerMask), CPU_ALLOC_SIZE(numCpus), cpuSet);
mask &= ~(CpuMaskVector::value_type(1) << bit);
}
}
if (pthread_setaffinity_np(h, CPU_ALLOC_SIZE(numCpus), cpuSet) != 0)
{
return false;
}
else
{
return true;
}
#elif CARB_PLATFORM_MACOS
size_t mask = 0;
for (uint64_t i = 0; i < masks.size(); ++i)
{
mask |= 1ULL << masks[i];
}
setAffinity(h, mask);
return true;
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
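// Example (an illustrative sketch): select CPU 65 on a machine with more than 64 logical CPUs.
// Each CpuMaskVector entry covers kCpusPerMask (64) CPUs, so CPU 65 is bit 1 of masks[1]:
//     carb::thread::CpuMaskVector masks(2, 0);
//     masks[1] = uint64_t(1) << 1;
//     bool ok = carb::this_thread::setAffinity(masks);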
inline CpuMaskVector getAffinity(NativeHandleType h)
{
CpuMaskVector results;
#if CARB_PLATFORM_WINDOWS
CARBWIN_GROUP_AFFINITY affinity;
if (!::GetThreadGroupAffinity(h, (PGROUP_AFFINITY)&affinity))
{
return results;
}
results.resize(affinity.Group + 1, 0);
results.back() = affinity.Mask;
return results;
#elif CARB_PLATFORM_LINUX
// Get the current affinity
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
if (pthread_getaffinity_np(h, sizeof(cpu_set_t), &cpuSet) != 0)
{
return results;
}
// Convert the cpu_set_t to a CpuMaskVector
results.reserve(sizeof(cpu_set_t) / sizeof(CpuMaskVector::value_type));
CpuMaskVector::value_type* ptr = reinterpret_cast<CpuMaskVector::value_type*>(&cpuSet);
for (uint64_t i = 0; i < (sizeof(cpu_set_t) / sizeof(CpuMaskVector::value_type)); i++)
{
results.push_back(ptr[i]);
}
return results;
#elif CARB_PLATFORM_MACOS
boolean_t def = false; // if the value retrieved was the default
mach_msg_type_number_t count = 0; // the length of the returned struct in integer_t
thread_affinity_policy policy{ 0 };
int res = thread_policy_get(
pthread_mach_thread_np(h), THREAD_AFFINITY_POLICY, reinterpret_cast<thread_policy_t>(&policy), &count, &def);
if (res != 0 || def)
{
return results;
}
for (uint64_t i = 0; i < (sizeof(policy.affinity_tag) * CHAR_BIT); i++)
{
if ((policy.affinity_tag & (1ULL << i)) != 0)
{
results.push_back(i);
}
}
return results;
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
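// Example (illustrative, Windows/Linux semantics; assumes carb::cpp::popcount from Carbonite's
// <bit> backport, alongside the countr_zero used above):
//     carb::thread::CpuMaskVector masks = carb::this_thread::getAffinity();
//     size_t cpuCount = 0;
//     for (auto mask : masks)
//         cpuCount += carb::cpp::popcount(mask); // number of CPUs this thread may run on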
template <size_t PausesBeforeYield = 16>
class AtomicBackoff
{
public:
static constexpr size_t kPausesBeforeYield = PausesBeforeYield;
static_assert(carb::cpp::has_single_bit(kPausesBeforeYield), "Must be a power of 2");
constexpr AtomicBackoff() noexcept = default;
CARB_PREVENT_COPY_AND_MOVE(AtomicBackoff);
static void pauseLoop(size_t count) noexcept
{
while (count-- > 0)
CARB_HARDWARE_PAUSE();
}
void reset() noexcept
{
m_growth = 1;
}
void pause() noexcept
{
if (m_growth <= kPausesBeforeYield)
{
// Execute pauses
pauseLoop(m_growth);
// Pause twice as many times next time
m_growth *= 2;
}
else
{
// Too much contention; just yield to the OS
std::this_thread::yield();
}
}
bool pauseWithoutYield() noexcept
{
pauseLoop(m_growth);
if (m_growth >= kPausesBeforeYield)
return false;
// Pause twice as many times next time
m_growth *= 2;
return true;
}
private:
size_t m_growth{ 1 };
};
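// Example usage (an illustrative sketch; `flag` is an assumed std::atomic<bool> set elsewhere):
//     carb::thread::AtomicBackoff<> backoff;
//     while (!flag.load(std::memory_order_acquire))
//         backoff.pause(); // spins 1, 2, 4, ... 16 times, then yields once contention persists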
inline unsigned hardware_concurrency() noexcept
{
#if CARB_PLATFORM_LINUX
static auto dockerLimit = omni::extras::getDockerCpuLimit();
if (dockerLimit > 0)
{
return unsigned(dockerLimit);
}
#endif
return std::thread::hardware_concurrency();
}
} // namespace thread
namespace this_thread
{
inline void sleepForUs(uint32_t microseconds) noexcept
{
#if CARB_PLATFORM_WINDOWS
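// Note: Sleep() has millisecond granularity, so sub-millisecond requests round down to
// Sleep(0), which merely yields the remainder of the caller's time slice.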
::Sleep(microseconds / 1000);
#elif CARB_POSIX
uint64_t nanos = uint64_t(microseconds) * 1'000;
struct timespec rem, req{ time_t(nanos / 1'000'000'000), long(nanos % 1'000'000'000) };
while (nanosleep(&req, &rem) != 0 && errno == EINTR)
req = rem; // Complete remaining sleep
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
#ifndef DOXYGEN_SHOULD_SKIP_THIS
namespace detail
{
inline unsigned contentionSpins()
{
// These must be power-of-two-minus-one so that they function as bitmasks
constexpr static unsigned kSpinsMax = 128 - 1;
constexpr static unsigned kSpinsMin = 32 - 1;
// Use randomness to prevent threads from resonating at the same frequency and permanently contending. Use a
// simple LCG for randomness.
static std::atomic_uint _seed; // Zero-initialized; the starting seed value is unimportant
unsigned int next = _seed.load(std::memory_order_relaxed);
_seed.store(next * 1103515245 + 12345, std::memory_order_relaxed);
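// The high byte of the LCG state is the most random; masking with kSpinsMax and OR-ing with
// kSpinsMin yields a spin count in [kSpinsMin, kSpinsMax].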
return ((next >> 24) & kSpinsMax) | kSpinsMin;
}
// This function name breaks naming paradigms so that it shows up prominently in stack traces. As the name implies, this
// function waits until f() returns true.
template <class Func>
void __CONTENDED_WAIT__(Func&& f) noexcept(noexcept(f()))
{
thread::AtomicBackoff<> backoff;
while (CARB_UNLIKELY(!f()))
backoff.pause();
}
# if CARB_PLATFORM_LINUX
using TidReaderFunc = thread::ThreadId (*)(void) noexcept;
inline thread::ThreadId staticOffsetTidReader() noexcept
{
constexpr static size_t kOffset = 180; // offsetof(struct pthread, tid) / sizeof(uint32_t)
// Should correspond to struct pthread's tid member. See nptl/descr.h in GLIBC source
return reinterpret_cast<const uint32_t*>(pthread_self())[kOffset];
}
inline thread::ThreadId syscallTidReader() noexcept
{
return (thread::ThreadId)(pid_t)syscall(SYS_gettid);
}
inline TidReaderFunc determineTidReader() noexcept
{
// The thread ID is stored internally within the pthread_t, but this is opaque and there is no public API for
// retrieving it. There is however a syscall for retrieving it, but this takes on average 700 ns per call on an
// Intel Core i9-7900X running kernel 5.15.0-92. Its offset however hasn't changed in over a decade between glibc
// versions 2.17 and 2.35, so we can read it with some caution. The memory backing the pthread_t is mmap'd and
// initialized to zeros, so it is highly improbable that we read a value that happens to be at the same location
// that randomly matches our thread ID.
// macOS has pthread_threadid_np(), and efforts to add a similar pthread_gettid_np() to glibc
// can be tracked here: https://sourceware.org/bugzilla/show_bug.cgi?id=27880
// Sanity-check the static TID reader, and if it matches, return it.
if (staticOffsetTidReader() == syscallTidReader())
return &staticOffsetTidReader;
// NOTE: We could dynamically try to find the TID within the pthread_t block, but this is problematic especially
// when running in docker containers where TID is usually a very small number.
return &syscallTidReader;
}
# endif
} // namespace detail
#endif
inline thread::NativeHandleType get()
{
#if CARB_PLATFORM_WINDOWS
return ::GetCurrentThread();
#elif CARB_POSIX
return pthread_self();
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
CARB_DEPRECATED("Use this_process::getId() instead") static inline thread::ProcessId getProcessId()
{
return this_process::getId();
}
CARB_DEPRECATED("Use this_process::getIdCached() instead") static inline thread::ProcessId getProcessIdCached()
{
return this_process::getIdCached();
}
inline thread::ThreadId getId()
{
#if CARB_PLATFORM_WINDOWS
return thread::ThreadId(::GetCurrentThreadId());
#elif CARB_PLATFORM_LINUX
// Determine the best way to read the thread ID and use that.
// NOTE: We do not store this in a thread_local because on older versions of glibc (especially 2.17, which is what
// Centos7 uses), this will require a lock that is also shared with loading shared libraries, which can cause a
// deadlock.
static detail::TidReaderFunc reader = detail::determineTidReader();
return reader();
#elif CARB_PLATFORM_MACOS
return thread::ThreadId(pthread_mach_thread_np(pthread_self()));
#else
CARB_UNSUPPORTED_PLATFORM();
#endif
}
inline void setName(const char* name)
{
thread::setName(get(), name);
}
inline std::string getName()
{
return thread::getName(get());
}
inline void setAffinity(size_t mask)
{
thread::setAffinity(get(), mask);
}
inline bool setAffinity(const thread::CpuMaskVector& masks)
{
return thread::setAffinity(get(), masks);
}
inline thread::CpuMaskVector getAffinity()
{
return thread::getAffinity(get());
}
template <class Func>
void spinWait(Func&& f) noexcept(noexcept(f()))
{
while (CARB_UNLIKELY(!f()))
{
CARB_HARDWARE_PAUSE();
}
}
template <class Func>
void spinWaitWithBackoff(Func&& f) noexcept(noexcept(f()))
{
if (CARB_UNLIKELY(!f()))
{
detail::__CONTENDED_WAIT__(std::forward<Func>(f));
}
}
template <class Func>
bool spinTryWait(Func&& f) noexcept(noexcept(f()))
{
thread::AtomicBackoff<> backoff;
while (CARB_UNLIKELY(!f()))
if (!backoff.pauseWithoutYield()) // give up at the point where the backoff would otherwise yield
return false;
return true;
}
template <class Func>
bool spinTryWaitWithBackoff(Func&& f) noexcept(noexcept(f()))
{
if (CARB_LIKELY(f()))
return true;
int count = 1;
for (; count < 32; count *= 2)
{
// Do a bunch of pause instructions
thread::AtomicBackoff<>::pauseLoop(count);
if (CARB_LIKELY(f()))
return true;
}
for (; count < 64; ++count)
{
// Do some yielding
std::this_thread::yield();
if (CARB_LIKELY(f()))
return true;
}
return false;
}
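// Example usage (an illustrative sketch; `ready` is an assumed flag published by another thread):
//     std::atomic<bool> ready{ false };
//     // ... a producer thread eventually does: ready.store(true, std::memory_order_release);
//     carb::this_thread::spinWaitWithBackoff([&] { return ready.load(std::memory_order_acquire); });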
inline void atomic_fence_seq_cst() noexcept
{
#if CARB_X86_64 && CARB_COMPILER_GNUC && __GNUC__ < 11
// On x86_64 CPUs we can use any lock-prefixed instruction as a StoreLoad operation to achieve sequential
// consistency (see https://shipilev.net/blog/2014/on-the-fence-with-dependencies/). The 'notb' instruction here has
// the added benefit of not affecting flags or other registers (see https://www.felixcloutier.com/x86/not).
// It is also likely that our 'unused' variable at the top of the stack is in L1 cache.
unsigned char unused{};
__asm__ __volatile__("lock; notb %0" : "+m"(unused)::"memory");
#else
std::atomic_thread_fence(std::memory_order_seq_cst);
#endif
}
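// Example (an illustrative sketch; `x` and `y` are assumed std::atomic<int> shared with a peer
// thread running the mirror-image code, where acquire/release alone cannot prevent the
// store-then-load reordering):
//     x.store(1, std::memory_order_relaxed);
//     carb::this_thread::atomic_fence_seq_cst(); // full barrier: the store cannot pass the load below
//     int seen = y.load(std::memory_order_relaxed);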
} // namespace this_thread
} // namespace carb