carb/thread/Util.h

File members: carb/thread/Util.h

// Copyright (c) 2019-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//

#pragma once

#include "../Defines.h"

#include "../extras/ScopeExit.h"
#include "../math/Util.h"
#include "../process/Util.h"
#include "../profiler/IProfiler.h"
#include "../../omni/extras/ContainerHelper.h"

#if CARB_PLATFORM_WINDOWS
#    include "../CarbWindows.h"
#    include "../extras/Unicode.h"
#elif CARB_POSIX
#    include <sys/syscall.h>

#    include <pthread.h>
#    include <sched.h>
#    include <unistd.h>
#    include <time.h>
#else
CARB_UNSUPPORTED_PLATFORM();
#endif

#if CARB_PLATFORM_MACOS
#    pragma push_macro("min")
#    pragma push_macro("max")
#    undef min
#    undef max
#    include <mach/thread_policy.h>
#    include <mach/thread_act.h>
#    pragma pop_macro("max")
#    pragma pop_macro("min")
#endif

#include <atomic>
#include <cerrno>
#include <cinttypes>
#include <climits>
#include <cstdint>
#include <cstring>
#include <limits>
#include <string>
#include <thread>
#include <vector>

namespace carb
{

namespace thread
{

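// Basic types used by the utilities in this header. ProcessId is carb::process's process-id type.
// ThreadId is a 32-bit integer wide enough to hold the platform's native thread id (DWORD on Windows,
// pid_t on Linux; see the static_asserts below). CpuMaskVector holds one or more 64-bit CPU masks,
// where element i covers CPUs [i * kCpusPerMask, (i + 1) * kCpusPerMask) and kCpusPerMask is the
// number of CPUs represented by each element (64).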
using ProcessId = process::ProcessId;

using ThreadId = uint32_t;

using CpuMaskVector = std::vector<uint64_t>;

constexpr uint64_t kCpusPerMask = std::numeric_limits<CpuMaskVector::value_type>::digits;

#if CARB_PLATFORM_WINDOWS
static_assert(sizeof(ThreadId) >= sizeof(DWORD), "ThreadId type is too small");
#elif CARB_POSIX
static_assert(sizeof(ThreadId) >= sizeof(pid_t), "ThreadId type is too small");
#else
CARB_UNSUPPORTED_PLATFORM();
#endif

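// printf-style format specifier macros for ThreadId, in decimal (OMNI_PRItid) and hexadecimal
// (OMNI_PRIxtid) form.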
#define OMNI_PRItid PRIu32

#define OMNI_PRIxtid PRIx32

#if CARB_PLATFORM_WINDOWS
#    ifndef DOXYGEN_SHOULD_SKIP_THIS
namespace detail
{
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#        pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO
{
    DWORD dwType;
    LPCSTR szName;
    DWORD dwThreadID;
    DWORD dwFlags;
} THREADNAME_INFO;
#        pragma pack(pop)

inline void setDebuggerThreadName(DWORD threadId, LPCSTR name)
{
    // Do it the old way, which is only really useful if the debugger is running
    if (::IsDebuggerPresent())
    {
        detail::THREADNAME_INFO info;
        info.dwType = 0x1000;
        info.szName = name;
        info.dwThreadID = threadId;
        info.dwFlags = 0;
#        pragma warning(push)
#        pragma warning(disable : 6320 6322)
        __try
        {
            ::RaiseException(detail::MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
        }
        __except (CARBWIN_EXCEPTION_EXECUTE_HANDLER)
        {
        }
#        pragma warning(pop)
    }
}

} // namespace detail
#    endif

using NativeHandleType = HANDLE;
#elif CARB_POSIX
using NativeHandleType = pthread_t;

#else
CARB_UNSUPPORTED_PLATFORM();
#endif

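// Sets a human-readable name for the thread identified by native handle `h`, and forwards the name to
// the attached profiler where supported. On Windows this prefers SetThreadDescription (Windows 10 1607+)
// and falls back to the debugger-exception scheme; on Linux names longer than 15 characters are truncated
// to fit pthread_setname_np's limit; on macOS only the calling thread can be named.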
inline void setName(NativeHandleType h, const char* name)
{
#if CARB_PLATFORM_WINDOWS
    // Emulate CARB_NAME_THREAD but don't include Profile.h which would create a circular dependency.
    if (auto profiler = g_carbProfiler.load(std::memory_order_acquire))
        profiler->nameThreadDynamic(::GetThreadId(h), "%s", name);
    // SetThreadDescription is only available starting with Windows 10 1607
    using PSetThreadDescription = HRESULT(CARBWIN_WINAPI*)(HANDLE, PCWSTR);
    static PSetThreadDescription SetThreadDescription =
        (PSetThreadDescription)::GetProcAddress(::GetModuleHandleW(L"kernel32.dll"), "SetThreadDescription");
    if (SetThreadDescription)
    {
        bool b = CARBWIN_SUCCEEDED(SetThreadDescription(h, extras::convertUtf8ToWide(name).c_str()));
        CARB_UNUSED(b);
        CARB_ASSERT(b);
    }
    else
    {
        detail::setDebuggerThreadName(::GetThreadId(h), name);
    }
#elif CARB_PLATFORM_LINUX
    if (h == pthread_self())
    {
        // Emulate CARB_NAME_THREAD but don't include Profile.h which would create a circular dependency.
        if (auto prof = g_carbProfiler.load(std::memory_order_acquire))
            prof->nameThreadDynamic(0, "%s", name);
    }
    if (pthread_setname_np(h, name) != 0)
    {
        // This is limited to 16 characters including NUL according to the man page.
        char buffer[16];
        strncpy(buffer, name, 15);
        buffer[15] = '\0';
        pthread_setname_np(h, buffer);
    }
#elif CARB_PLATFORM_MACOS
    if (h == pthread_self())
    {
        pthread_setname_np(name);
    }

    // macOS provides no way to name a thread other than the calling thread.
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

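// Returns the name previously assigned to the thread identified by `h`, or an empty string if the name
// cannot be retrieved (for example, when GetThreadDescription is unavailable before Windows 10 1607).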
inline std::string getName(NativeHandleType h)
{
#if CARB_PLATFORM_WINDOWS
    // GetThreadDescription is only available starting with Windows 10 1607
    using PGetThreadDescription = HRESULT(CARBWIN_WINAPI*)(HANDLE, PWSTR*);
    static PGetThreadDescription GetThreadDescription =
        (PGetThreadDescription)::GetProcAddress(::GetModuleHandleW(L"kernel32.dll"), "GetThreadDescription");
    if (GetThreadDescription)
    {
        PWSTR threadName;
        if (CARBWIN_SUCCEEDED(GetThreadDescription(h, &threadName)))
        {
            std::string s = extras::convertWideToUtf8(threadName);
            ::LocalFree(threadName);
            return s;
        }
    }
    return std::string();
#elif CARB_PLATFORM_LINUX || CARB_PLATFORM_MACOS
    char buffer[64];
    if (pthread_getname_np(h, buffer, CARB_COUNTOF(buffer)) == 0)
    {
        return std::string(buffer);
    }
    return std::string();
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

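// Sets the CPU affinity of the thread identified by `h` from a bitmask where bit N enables CPU N; this
// overload can only address the first `sizeof(size_t) * 8` CPUs. On macOS the mask is applied as an
// advisory affinity tag rather than a strict binding.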
inline void setAffinity(NativeHandleType h, size_t mask)
{
#if CARB_PLATFORM_WINDOWS
    ::SetThreadAffinityMask(h, mask);
#elif CARB_PLATFORM_LINUX
    // From the man page: The cpu_set_t data type is implemented as a bit mask. However, the data structure should be
    // treated as opaque: all manipulation of the CPU sets should be done via the macros described in this page.
    if (!mask)
        return;

    cpu_set_t cpuSet;
    CPU_ZERO(&cpuSet);
    static_assert(sizeof(cpuSet) >= sizeof(mask), "Invalid assumption: use CPU_ALLOC");

    do
    {
        int bit = __builtin_ctzll(mask);
        CPU_SET(bit, &cpuSet);
        mask &= ~(size_t(1) << bit);
    } while (mask != 0);

    pthread_setaffinity_np(h, sizeof(cpu_set_t), &cpuSet);
#elif CARB_PLATFORM_MACOS
    thread_affinity_policy policy{ static_cast<integer_t>(mask) };
    thread_policy_set(pthread_mach_thread_np(h), THREAD_AFFINITY_POLICY, reinterpret_cast<thread_policy_t>(&policy),
                      THREAD_AFFINITY_POLICY_COUNT);
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

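// Sets the CPU affinity of the thread identified by `h` from a CpuMaskVector, allowing systems with more
// than 64 CPUs to be addressed. Returns false if `masks` is empty or the underlying call fails. On Windows
// only the first non-zero element is applied, as the affinity for that processor group; on macOS the
// elements are folded into a single advisory affinity tag.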
inline bool setAffinity(NativeHandleType h, const CpuMaskVector& masks)
{
    if (masks.empty())
    {
        return false;
    }
#if CARB_PLATFORM_WINDOWS
    // Find the lowest mask with a value set. That is the CPU Group that we'll set the affinity for.
    for (uint64_t i = 0; i < masks.size(); ++i)
    {
        if (masks[i])
        {
            CARBWIN_GROUP_AFFINITY affinity{};
            affinity.Group = (WORD)i;
            affinity.Mask = masks[i];

            return ::SetThreadGroupAffinity(h, (const GROUP_AFFINITY*)&affinity, nullptr);
        }
    }

    // Only reached if none of the provided masks had any CPU bits set.
    return false;
#elif CARB_PLATFORM_LINUX
    uint64_t numCpus = kCpusPerMask * masks.size();

    cpu_set_t* cpuSet = CPU_ALLOC(numCpus);
    if (!cpuSet)
    {
        return false;
    }

    CARB_SCOPE_EXIT
    {
        CPU_FREE(cpuSet);
    };

    CPU_ZERO_S(CPU_ALLOC_SIZE(numCpus), cpuSet);

    for (uint64_t i = 0; i < masks.size(); ++i)
    {
        CpuMaskVector::value_type mask = masks[i];
        while (mask != 0)
        {
            int bit = cpp::countr_zero(mask);
            CPU_SET(bit + (i * kCpusPerMask), cpuSet);
            mask &= ~(CpuMaskVector::value_type(1) << bit);
        }
    }

    if (pthread_setaffinity_np(h, CPU_ALLOC_SIZE(numCpus), cpuSet) != 0)
    {
        return false;
    }
    else
    {
        return true;
    }
#elif CARB_PLATFORM_MACOS
    size_t mask = 0;
    for (uint64_t i = 0; i < masks.size(); ++i)
    {
        mask |= 1ULL << masks[i];
    }

    setAffinity(h, mask);

    return true;
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

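// Returns the current CPU affinity of the thread identified by `h` as a CpuMaskVector, or an empty vector
// on failure. On Windows only the thread's current processor group is reported; on macOS the result is
// derived from the thread's affinity tag and is only meaningful if a non-default tag has been set.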
inline CpuMaskVector getAffinity(NativeHandleType h)
{
    CpuMaskVector results;
#if CARB_PLATFORM_WINDOWS
    CARBWIN_GROUP_AFFINITY affinity;
    if (!::GetThreadGroupAffinity(h, (PGROUP_AFFINITY)&affinity))
    {
        return results;
    }

    results.resize(affinity.Group + 1, 0);
    results.back() = affinity.Mask;

    return results;
#elif CARB_PLATFORM_LINUX
    // Get the current affinity
    cpu_set_t cpuSet;
    CPU_ZERO(&cpuSet);
    if (pthread_getaffinity_np(h, sizeof(cpu_set_t), &cpuSet) != 0)
    {
        return results;
    }

    // Convert the cpu_set_t to a CpuMaskVector
    results.reserve(sizeof(cpu_set_t) / sizeof(CpuMaskVector::value_type));
    CpuMaskVector::value_type* ptr = reinterpret_cast<CpuMaskVector::value_type*>(&cpuSet);
    for (uint64_t i = 0; i < (sizeof(cpu_set_t) / sizeof(CpuMaskVector::value_type)); i++)
    {
        results.push_back(ptr[i]);
    }

    return results;
#elif CARB_PLATFORM_MACOS
    boolean_t def = false; // if the value retrieved was the default
    mach_msg_type_number_t count = 0; // the length of the returned struct in integer_t
    thread_affinity_policy policy{ 0 };

    int res = thread_policy_get(
        pthread_mach_thread_np(h), THREAD_AFFINITY_POLICY, reinterpret_cast<thread_policy_t>(&policy), &count, &def);

    if (res != 0 || def)
    {
        return results;
    }

    for (uint64_t i = 0; i < (sizeof(policy.affinity_tag) * CHAR_BIT); i++)
    {
        if ((policy.affinity_tag & (1ULL << i)) != 0)
        {
            results.push_back(i);
        }
    }

    return results;
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}
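
// Illustrative usage sketch (not part of the original header): pinning the calling thread to the first
// four CPUs with the CpuMaskVector overload. carb::this_thread::setAffinity (declared later in this file)
// forwards to the handle-based overload above.
//
//     carb::thread::CpuMaskVector masks(1, 0); // one 64-bit group, initially empty
//     masks[0] = 0xF;                          // CPUs 0-3 (bits 0-3 of group 0)
//     bool ok = carb::this_thread::setAffinity(masks);
//     CARB_UNUSED(ok); // can fail, e.g. if the request is rejected by the OS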

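// Exponential-backoff helper for spin loops. Each call to pause() executes twice as many
// CARB_HARDWARE_PAUSE instructions as the previous call; once PausesBeforeYield pauses have been reached,
// subsequent calls yield the remainder of the timeslice to the OS instead. pauseWithoutYield() never
// yields and returns false once the limit has been reached, and reset() restarts the progression.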
template <size_t PausesBeforeYield = 16>
class AtomicBackoff
{
public:
    static constexpr size_t kPausesBeforeYield = PausesBeforeYield;
    static_assert(carb::cpp::has_single_bit(kPausesBeforeYield), "Must be a power of 2");

    constexpr AtomicBackoff() noexcept = default;

    CARB_PREVENT_COPY_AND_MOVE(AtomicBackoff);

    static void pauseLoop(size_t count) noexcept
    {
        while (count-- > 0)
            CARB_HARDWARE_PAUSE();
    }

    void reset() noexcept
    {
        m_growth = 1;
    }

    void pause() noexcept
    {
        if (m_growth <= kPausesBeforeYield)
        {
            // Execute pauses
            pauseLoop(m_growth);
            // Pause twice as many times next time
            m_growth *= 2;
        }
        else
        {
            // Too much contention; just yield to the OS
            std::this_thread::yield();
        }
    }

    bool pauseWithoutYield() noexcept
    {
        pauseLoop(m_growth);
        if (m_growth >= kPausesBeforeYield)
            return false;
        // Pause twice as many times next time
        m_growth *= 2;
        return true;
    }

private:
    size_t m_growth{ 1 };
};
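
// Illustrative usage sketch (not part of the original header): spinning on a simple atomic flag with
// AtomicBackoff. The `flag` variable is hypothetical.
//
//     std::atomic_bool flag{ false };
//     carb::thread::AtomicBackoff<> backoff;
//     while (!flag.load(std::memory_order_acquire))
//         backoff.pause(); // pauses 1, 2, 4, ... times, then yields once the limit is reached
//     backoff.reset(); // the same object can be reused for the next wait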

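// Similar to std::thread::hardware_concurrency(), but on Linux honors container CPU limits (see
// omni::extras::getDockerCpuLimit) so that containers report the number of CPUs actually available to them.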
inline unsigned hardware_concurrency() noexcept
{
#if CARB_PLATFORM_LINUX
    static auto dockerLimit = omni::extras::getDockerCpuLimit();
    if (dockerLimit > 0)
    {
        return unsigned(dockerLimit);
    }
#endif
    return std::thread::hardware_concurrency();
}

} // namespace thread

namespace this_thread
{

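// Sleeps the calling thread for at least the given number of microseconds. On Windows the granularity is
// limited to milliseconds (the value is rounded down); on POSIX the sleep is resumed if interrupted by a
// signal, so the full duration always elapses.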
inline void sleepForUs(uint32_t microseconds) noexcept
{
#if CARB_PLATFORM_WINDOWS
    ::Sleep(microseconds / 1000);
#elif CARB_POSIX
    uint64_t nanos = uint64_t(microseconds) * 1'000;
    struct timespec rem, req{ time_t(nanos / 1'000'000'000), long(nanos % 1'000'000'000) };
    while (nanosleep(&req, &rem) != 0 && errno == EINTR)
        req = rem; // Complete remaining sleep
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

#ifndef DOXYGEN_SHOULD_SKIP_THIS
namespace detail
{

inline unsigned contentionSpins()
{
    // These must be power-of-two-minus-one so that they function as bitmasks
    constexpr static unsigned kSpinsMax = 128 - 1;
    constexpr static unsigned kSpinsMin = 32 - 1;

    // Use randomness to prevent threads from resonating at the same frequency and permanently contending. Use a
    // simple LCG for randomness.
    static std::atomic_uint _seed; // Shared LCG state; the separate relaxed load and store below are not a
                                   // single atomic RMW, so threads may race on the same value. That is harmless here.
    unsigned int next = _seed.load(std::memory_order_relaxed);
    _seed.store(next * 1103515245 + 12345, std::memory_order_relaxed);
    return ((next >> 24) & kSpinsMax) | kSpinsMin;
}

// This function name breaks naming paradigms so that it shows up prominently in stack traces. As the name implies, this
// function waits until f() returns true.
template <class Func>
void __CONTENDED_WAIT__(Func&& f) noexcept(noexcept(f()))
{
    thread::AtomicBackoff<> backoff;
    while (CARB_UNLIKELY(!f()))
        backoff.pause();
}

#    if CARB_PLATFORM_LINUX
using TidReaderFunc = thread::ThreadId (*)(void) noexcept;

inline thread::ThreadId staticOffsetTidReader() noexcept
{
    constexpr static size_t kOffset = 180; // offsetof(struct pthread, tid) / sizeof(uint32_t)

    // Should correspond to struct pthread's tid member. See nptl/descr.h in GLIBC source
    return reinterpret_cast<const uint32_t*>(pthread_self())[kOffset];
}

inline thread::ThreadId syscallTidReader() noexcept
{
    return (thread::ThreadId)(pid_t)syscall(SYS_gettid);
}

inline TidReaderFunc determineTidReader() noexcept
{
    // The thread ID is stored internally within the pthread_t, but this is opaque and there is no public API for
    // retrieving it. There is however a syscall for retrieving it, but this takes on average 700 ns per call on an
    // Intel Core i9-7900X running kernel 5.15.0-92. Its offset however hasn't changed in over a decade between glibc
    // versions 2.17 and 2.35, so we can read it with some caution. The memory backing the pthread_t is mmap'd and
    // initialized to zeros, so it is highly improbable that we read a value that happens to be at the same location
    // that randomly matches our thread ID.

    // macOS has pthread_gettid_np(), and efforts to add it to glibc can be tracked here:
    // https://sourceware.org/bugzilla/show_bug.cgi?id=27880

    // Sanity-check the static TID reader, and if it matches, return it.
    if (staticOffsetTidReader() == syscallTidReader())
        return &staticOffsetTidReader;

    // NOTE: We could dynamically try to find the TID within the pthread_t block, but this is problematic especially
    // when running in docker containers where TID is usually a very small number.
    return &syscallTidReader;
}
#    endif

} // namespace detail
#endif

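// Returns the native handle of the calling thread (the GetCurrentThread() pseudo-handle on Windows,
// pthread_self() on POSIX).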
inline thread::NativeHandleType get()
{
#if CARB_PLATFORM_WINDOWS
    return ::GetCurrentThread();
#elif CARB_POSIX
    return pthread_self();
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

CARB_DEPRECATED("Use this_process::getId() instead") static inline thread::ProcessId getProcessId()
{
    return this_process::getId();
}

CARB_DEPRECATED("Use this_process::getIdCached() instead") static inline thread::ProcessId getProcessIdCached()
{
    return this_process::getIdCached();
}

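// Returns the ThreadId of the calling thread. On Linux the id is read directly out of the pthread
// structure when that layout can be verified against the gettid syscall (see detail::determineTidReader
// above), since the syscall itself is comparatively expensive.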
inline thread::ThreadId getId()
{
#if CARB_PLATFORM_WINDOWS
    return thread::ThreadId(::GetCurrentThreadId());
#elif CARB_PLATFORM_LINUX
    // Determine the best way to read the thread ID and use that.
    // NOTE: We do not store this in a thread_local because on older versions of glibc (especially 2.17, which is what
    // Centos7 uses), this will require a lock that is also shared with loading shared libraries, which can cause a
    // deadlock.
    static detail::TidReaderFunc reader = detail::determineTidReader();
    return reader();
#elif CARB_PLATFORM_MACOS
    return thread::ThreadId(pthread_mach_thread_np(pthread_self()));
#else
    CARB_UNSUPPORTED_PLATFORM();
#endif
}

inline void setName(const char* name)
{
    thread::setName(get(), name);
}

inline std::string getName()
{
    return thread::getName(get());
}
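
// Illustrative usage sketch (not part of the original header, and assuming <cstdio> is available): naming
// the calling thread and logging its id with the format macros defined above.
//
//     carb::this_thread::setName("io-worker");
//     std::printf("thread '%s' has id %" OMNI_PRItid "\n",
//                 carb::this_thread::getName().c_str(), carb::this_thread::getId());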

inline void setAffinity(size_t mask)
{
    thread::setAffinity(get(), mask);
}

inline bool setAffinity(const thread::CpuMaskVector& masks)
{
    return thread::setAffinity(get(), masks);
}

inline thread::CpuMaskVector getAffinity()
{
    return thread::getAffinity(get());
}

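// The spinWait* family below repeatedly evaluates a predicate until it returns true. spinWait() spins with
// only CARB_HARDWARE_PAUSE; spinWaitWithBackoff() adds exponential backoff and eventually yields;
// spinTryWait() gives up after a short bounded spin and returns whether the predicate succeeded;
// spinTryWaitWithBackoff() additionally yields a number of times before giving up. A usage sketch follows
// the definitions.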
template <class Func>
void spinWait(Func&& f) noexcept(noexcept(f()))
{
    while (!CARB_LIKELY(f()))
    {
        CARB_HARDWARE_PAUSE();
    }
}

template <class Func>
void spinWaitWithBackoff(Func&& f) noexcept(noexcept(f()))
{
    if (CARB_UNLIKELY(!f()))
    {
        detail::__CONTENDED_WAIT__(std::forward<Func>(f));
    }
}

template <class Func>
bool spinTryWait(Func&& f) noexcept(noexcept(f()))
{
    thread::AtomicBackoff<> backoff;
    while (CARB_UNLIKELY(!f()))
        if (!backoff.pauseWithoutYield()) // not really much of a backoff
            return false;
    return true;
}

template <class Func>
bool spinTryWaitWithBackoff(Func&& f) noexcept(noexcept(f()))
{
    if (CARB_LIKELY(f()))
        return true;

    int count = 1;
    for (; count < 32; count *= 2)
    {
        // Do a bunch of pause instructions
        thread::AtomicBackoff<>::pauseLoop(count);
        if (CARB_LIKELY(f()))
            return true;
    }
    for (; count < 64; ++count)
    {
        // Do some yielding
        std::this_thread::yield();
        if (CARB_LIKELY(f()))
            return true;
    }
    return false;
}
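
// Illustrative usage sketch (not part of the original header): bounded spinning on an atomic flag before
// falling back to a heavier, yield-based wait. The `ready` flag is hypothetical.
//
//     std::atomic_bool ready{ false };
//     auto isReady = [&] { return ready.load(std::memory_order_acquire); };
//     if (!carb::this_thread::spinTryWait(isReady))
//     {
//         // Still not set after a short bounded spin; fall back to waiting with backoff and yields.
//         carb::this_thread::spinWaitWithBackoff(isReady);
//     }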

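// Issues a full sequentially-consistent memory fence, equivalent to
// std::atomic_thread_fence(std::memory_order_seq_cst). On x86-64 with GCC prior to 11, a lock-prefixed
// instruction is used directly, which provides the required StoreLoad barrier more cheaply than the
// mfence-based sequence those older compilers emit for the standard fence.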
inline void atomic_fence_seq_cst() noexcept
{
#if CARB_X86_64 && CARB_COMPILER_GNUC && __GNUC__ < 11
    // On x86_64 CPUs we can use any lock-prefixed instruction as a StoreLoad operation to achieve sequential
    // consistency (see https://shipilev.net/blog/2014/on-the-fence-with-dependencies/). The 'notb' instruction here has
    // the added benefit of not affecting flags or other registers (see https://www.felixcloutier.com/x86/not).
    // It is also likely that our 'unused' variable at the top of the stack is in L1 cache.
    unsigned char unused{};
    __asm__ __volatile__("lock; notb %0" : "+m"(unused)::"memory");
#else
    std::atomic_thread_fence(std::memory_order_seq_cst);
#endif
}

} // namespace this_thread
} // namespace carb