ThreadPoolUtils.h#
Fully qualified name: carb/tasking/ThreadPoolUtils.h
File members: carb/tasking/ThreadPoolUtils.h
// Copyright (c) 2019-2023, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "../cpp/Tuple.h"
#include "../logging/Log.h"
#include "IThreadPool.h"
#include <future>
namespace carb
{
namespace tasking
{
#ifndef DOXYGEN_BUILD
namespace detail
{
template <class ReturnType>
struct ApplyWithPromise
{
template <class Callable, class Tuple>
void operator()(std::promise<ReturnType>& promise, Callable&& f, Tuple&& t)
{
promise.set_value(std::forward<ReturnType>(cpp::apply(std::forward<Callable>(f), std::forward<Tuple>(t))));
}
};
template <>
struct ApplyWithPromise<void>
{
template <class Callable, class Tuple>
void operator()(std::promise<void>& promise, Callable& f, Tuple&& t)
{
cpp::apply(std::forward<Callable>(f), std::forward<Tuple>(t));
promise.set_value();
}
};
} // namespace detail
#endif
class ThreadPoolWrapper
{
public:
ThreadPoolWrapper(IThreadPool* poolInterface, size_t workerCount = 0) : m_interface(poolInterface)
{
if (m_interface == nullptr)
{
CARB_LOG_ERROR("IThreadPool interface used to create a thread pool wrapper is null.");
return;
}
if (workerCount == 0)
{
workerCount = m_interface->getDefaultWorkerCount();
}
m_pool = m_interface->createEx(workerCount);
if (m_pool == nullptr)
{
CARB_LOG_ERROR("Couldn't create a new thread pool.");
}
}
size_t getWorkerCount() const
{
if (!isValid())
{
CARB_LOG_ERROR("Attempt to call the 'getWorkerCount' method of an invalid thread pool wrapper.");
return 0;
}
return m_interface->getWorkerCount(m_pool);
}
template <class Callable, class... Args>
auto enqueueJob(Callable&& task, Args&&... args)
{
using ReturnType = typename cpp::invoke_result_t<Callable, Args...>;
using Future = std::future<ReturnType>;
using Tuple = std::tuple<std::decay_t<Args>...>;
struct Data
{
std::promise<ReturnType> promise{};
Callable f;
Tuple args;
Data(Callable&& f_, Args&&... args_) : f(std::forward<Callable>(f_)), args(std::forward<Args>(args_)...)
{
}
void callAndDelete()
{
detail::ApplyWithPromise<ReturnType>{}(promise, f, args);
delete this;
}
};
if (!isValid())
{
CARB_LOG_ERROR("Attempt to call the 'enqueueJob' method of an invalid thread pool wrapper.");
return Future{};
}
Data* pData = new (std::nothrow) Data{ std::forward<Callable>(task), std::forward<Args>(args)... };
if (!pData)
{
CARB_LOG_ERROR("ThreadPoolWrapper: No memory for job");
return Future{};
}
Future result = pData->promise.get_future();
if (CARB_LIKELY(m_interface->enqueueJob(
m_pool, [](void* userData) { static_cast<Data*>(userData)->callAndDelete(); }, pData)))
{
return result;
}
CARB_LOG_ERROR("ThreadPoolWrapper: failed to enqueue job");
delete pData;
return Future{};
}
size_t getCurrentlyRunningJobCount() const
{
if (!isValid())
{
CARB_LOG_ERROR("Attempt to call the 'getCurrentlyRunningJobCount' method of an invalid thread pool wrapper.");
return 0;
}
return m_interface->getCurrentlyRunningJobCount(m_pool);
}
void waitUntilFinished() const
{
if (!isValid())
{
CARB_LOG_ERROR("Attempt to call the 'waitUntilFinished' method of an invalid thread pool wrapper.");
return;
}
m_interface->waitUntilFinished(m_pool);
}
bool isValid() const
{
return m_pool != nullptr;
}
~ThreadPoolWrapper()
{
if (isValid())
{
m_interface->destroy(m_pool);
}
}
CARB_PREVENT_COPY_AND_MOVE(ThreadPoolWrapper);
private:
// ThreadPoolWrapper private members and functions
IThreadPool* m_interface = nullptr;
ThreadPool* m_pool = nullptr;
};
} // namespace tasking
} // namespace carb