[ThreadPool] add ability to group tasks into separate groups
This is needed for parallelizing the loading of module symbols in LLDB (D122975). Currently LLDB can parallelize indexing symbols when loading a module, but modules are loaded sequentially. If the LLDB index cache is enabled, this means that the cache loading is not parallelized, even though it could be. However, doing that creates a threadpool-within-threadpool situation, so the number of threads would not be properly limited. This change adds ThreadPoolTaskGroup as a simple type that can be used with ThreadPool calls to put tasks into groups that can be independently waited for (even recursively from within a task) but still run in the same thread pool. Differential Revision: https://reviews.llvm.org/D123225
This commit is contained in:
parent
764676b737
commit
8ef5710e63
|
@ -13,26 +13,42 @@
|
||||||
#ifndef LLVM_SUPPORT_THREADPOOL_H
|
#ifndef LLVM_SUPPORT_THREADPOOL_H
|
||||||
#define LLVM_SUPPORT_THREADPOOL_H
|
#define LLVM_SUPPORT_THREADPOOL_H
|
||||||
|
|
||||||
|
#include "llvm/ADT/DenseMap.h"
|
||||||
#include "llvm/Config/llvm-config.h"
|
#include "llvm/Config/llvm-config.h"
|
||||||
|
#include "llvm/Support/RWMutex.h"
|
||||||
#include "llvm/Support/Threading.h"
|
#include "llvm/Support/Threading.h"
|
||||||
#include "llvm/Support/thread.h"
|
#include "llvm/Support/thread.h"
|
||||||
|
|
||||||
#include <future>
|
#include <future>
|
||||||
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <deque>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <queue>
|
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
namespace llvm {
|
namespace llvm {
|
||||||
|
|
||||||
|
class ThreadPoolTaskGroup;
|
||||||
|
|
||||||
/// A ThreadPool for asynchronous parallel execution on a defined number of
|
/// A ThreadPool for asynchronous parallel execution on a defined number of
|
||||||
/// threads.
|
/// threads.
|
||||||
///
|
///
|
||||||
/// The pool keeps a vector of threads alive, waiting on a condition variable
|
/// The pool keeps a vector of threads alive, waiting on a condition variable
|
||||||
/// for some work to become available.
|
/// for some work to become available.
|
||||||
|
///
|
||||||
|
/// It is possible to reuse one thread pool for different groups of tasks
|
||||||
|
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
|
||||||
|
/// the same queue, but it is possible to wait only for a specific group of
|
||||||
|
/// tasks to finish.
|
||||||
|
///
|
||||||
|
/// It is also possible for worker threads to submit new tasks and wait for
|
||||||
|
/// them. Note that this may result in a deadlock in cases such as when a task
|
||||||
|
/// (directly or indirectly) tries to wait for its own completion, or when all
|
||||||
|
/// available threads are used up by tasks waiting for a task that has no thread
|
||||||
|
/// left to run on (this includes waiting on the returned future). It should be
|
||||||
|
/// generally safe to wait() for a group as long as groups do not form a cycle.
|
||||||
class ThreadPool {
|
class ThreadPool {
|
||||||
public:
|
public:
|
||||||
/// Construct a pool using the hardware strategy \p S for mapping hardware
|
/// Construct a pool using the hardware strategy \p S for mapping hardware
|
||||||
|
@ -47,23 +63,47 @@ public:
|
||||||
/// Asynchronous submission of a task to the pool. The returned future can be
|
/// Asynchronous submission of a task to the pool. The returned future can be
|
||||||
/// used to wait for the task to finish and is *non-blocking* on destruction.
|
/// used to wait for the task to finish and is *non-blocking* on destruction.
|
||||||
template <typename Function, typename... Args>
|
template <typename Function, typename... Args>
|
||||||
inline auto async(Function &&F, Args &&...ArgList) {
|
auto async(Function &&F, Args &&...ArgList) {
|
||||||
auto Task =
|
auto Task =
|
||||||
std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
|
std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
|
||||||
return async(std::move(Task));
|
return async(std::move(Task));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Overload, task will be in the given task group.
|
||||||
|
template <typename Function, typename... Args>
|
||||||
|
auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
|
||||||
|
auto Task =
|
||||||
|
std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
|
||||||
|
return async(Group, std::move(Task));
|
||||||
|
}
|
||||||
|
|
||||||
/// Asynchronous submission of a task to the pool. The returned future can be
|
/// Asynchronous submission of a task to the pool. The returned future can be
|
||||||
/// used to wait for the task to finish and is *non-blocking* on destruction.
|
/// used to wait for the task to finish and is *non-blocking* on destruction.
|
||||||
template <typename Func>
|
template <typename Func>
|
||||||
auto async(Func &&F) -> std::shared_future<decltype(F())> {
|
auto async(Func &&F) -> std::shared_future<decltype(F())> {
|
||||||
return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
|
return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
|
||||||
|
nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Func>
|
||||||
|
auto async(ThreadPoolTaskGroup &Group, Func &&F)
|
||||||
|
-> std::shared_future<decltype(F())> {
|
||||||
|
return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
|
||||||
|
&Group);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Blocking wait for all the threads to complete and the queue to be empty.
|
/// Blocking wait for all the threads to complete and the queue to be empty.
|
||||||
/// It is an error to try to add new tasks while blocking on this call.
|
/// It is an error to try to add new tasks while blocking on this call.
|
||||||
|
/// Calling wait() from a task would deadlock waiting for itself.
|
||||||
void wait();
|
void wait();
|
||||||
|
|
||||||
|
/// Blocking wait for only the tasks in the given group to complete.
|
||||||
|
/// It is possible to wait even inside a task, but waiting (directly or
|
||||||
|
/// indirectly) on itself will deadlock. If called from a task running on a
|
||||||
|
/// worker thread, the call may process pending tasks while waiting in order
|
||||||
|
/// not to waste the thread.
|
||||||
|
void wait(ThreadPoolTaskGroup &Group);
|
||||||
|
|
||||||
// TODO: misleading legacy name warning!
|
// TODO: misleading legacy name warning!
|
||||||
// Returns the maximum number of worker threads in the pool, not the current
|
// Returns the maximum number of worker threads in the pool, not the current
|
||||||
// number of threads!
|
// number of threads!
|
||||||
|
@ -98,12 +138,15 @@ private:
|
||||||
std::move(F)};
|
std::move(F)};
|
||||||
}
|
}
|
||||||
|
|
||||||
bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
|
/// Returns true if all tasks in the given group have finished (nullptr means
|
||||||
|
/// all tasks regardless of their group). QueueLock must be locked.
|
||||||
|
bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
|
||||||
|
|
||||||
/// Asynchronous submission of a task to the pool. The returned future can be
|
/// Asynchronous submission of a task to the pool. The returned future can be
|
||||||
/// used to wait for the task to finish and is *non-blocking* on destruction.
|
/// used to wait for the task to finish and is *non-blocking* on destruction.
|
||||||
template <typename ResTy>
|
template <typename ResTy>
|
||||||
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
|
std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
|
||||||
|
ThreadPoolTaskGroup *Group) {
|
||||||
|
|
||||||
#if LLVM_ENABLE_THREADS
|
#if LLVM_ENABLE_THREADS
|
||||||
/// Wrap the Task in a std::function<void()> that sets the result of the
|
/// Wrap the Task in a std::function<void()> that sets the result of the
|
||||||
|
@ -117,7 +160,7 @@ private:
|
||||||
|
|
||||||
// Don't allow enqueueing after disabling the pool
|
// Don't allow enqueueing after disabling the pool
|
||||||
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
|
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
|
||||||
Tasks.push(std::move(R.first));
|
Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
|
||||||
requestedThreads = ActiveThreads + Tasks.size();
|
requestedThreads = ActiveThreads + Tasks.size();
|
||||||
}
|
}
|
||||||
QueueCondition.notify_one();
|
QueueCondition.notify_one();
|
||||||
|
@ -130,7 +173,7 @@ private:
|
||||||
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
|
auto Future = std::async(std::launch::deferred, std::move(Task)).share();
|
||||||
// Wrap the future so that both ThreadPool::wait() can operate and the
|
// Wrap the future so that both ThreadPool::wait() can operate and the
|
||||||
// returned future can be sync'ed on.
|
// returned future can be sync'ed on.
|
||||||
Tasks.push([Future]() { Future.get(); });
|
Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
|
||||||
return Future;
|
return Future;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -139,25 +182,29 @@ private:
|
||||||
// Grow to ensure that we have at least `requested` Threads, but do not go
|
// Grow to ensure that we have at least `requested` Threads, but do not go
|
||||||
// over MaxThreadCount.
|
// over MaxThreadCount.
|
||||||
void grow(int requested);
|
void grow(int requested);
|
||||||
|
|
||||||
|
void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/// Threads in flight
|
/// Threads in flight
|
||||||
std::vector<llvm::thread> Threads;
|
std::vector<llvm::thread> Threads;
|
||||||
/// Lock protecting access to the Threads vector.
|
/// Lock protecting access to the Threads vector.
|
||||||
mutable std::mutex ThreadsLock;
|
mutable llvm::sys::RWMutex ThreadsLock;
|
||||||
|
|
||||||
/// Tasks waiting for execution in the pool.
|
/// Tasks waiting for execution in the pool.
|
||||||
std::queue<std::function<void()>> Tasks;
|
std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
|
||||||
|
|
||||||
/// Locking and signaling for accessing the Tasks queue.
|
/// Locking and signaling for accessing the Tasks queue.
|
||||||
std::mutex QueueLock;
|
std::mutex QueueLock;
|
||||||
std::condition_variable QueueCondition;
|
std::condition_variable QueueCondition;
|
||||||
|
|
||||||
/// Signaling for job completion
|
/// Signaling for job completion (all tasks or all tasks in a group).
|
||||||
std::condition_variable CompletionCondition;
|
std::condition_variable CompletionCondition;
|
||||||
|
|
||||||
/// Keep track of the number of thread actually busy
|
/// Keep track of the number of thread actually busy
|
||||||
unsigned ActiveThreads = 0;
|
unsigned ActiveThreads = 0;
|
||||||
|
/// Number of threads active for tasks in the given group (only non-zero).
|
||||||
|
DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
|
||||||
|
|
||||||
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
|
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
|
||||||
/// Signal for the destruction of the pool, asking thread to exit.
|
/// Signal for the destruction of the pool, asking thread to exit.
|
||||||
|
@ -169,6 +216,34 @@ private:
|
||||||
/// Maximum number of threads to potentially grow this pool to.
|
/// Maximum number of threads to potentially grow this pool to.
|
||||||
const unsigned MaxThreadCount;
|
const unsigned MaxThreadCount;
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
/// A group of tasks to be run on a thread pool. Thread pool tasks in different
|
||||||
|
/// groups can run on the same threadpool but can be waited for separately.
|
||||||
|
/// It is even possible for tasks of one group to submit and wait for tasks
|
||||||
|
/// of another group, as long as this does not form a loop.
|
||||||
|
class ThreadPoolTaskGroup {
|
||||||
|
public:
|
||||||
|
/// The ThreadPool argument is the thread pool to forward calls to.
|
||||||
|
ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
|
||||||
|
|
||||||
|
/// Blocking destructor: will wait for all the tasks in the group to complete
|
||||||
|
/// by calling ThreadPool::wait().
|
||||||
|
~ThreadPoolTaskGroup() { wait(); }
|
||||||
|
|
||||||
|
/// Calls ThreadPool::async() for this group.
|
||||||
|
template <typename Function, typename... Args>
|
||||||
|
inline auto async(Function &&F, Args &&...ArgList) {
|
||||||
|
return Pool.async(*this, std::forward<Function>(F),
|
||||||
|
std::forward<Args>(ArgList)...);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Calls ThreadPool::wait() for this group.
|
||||||
|
void wait() { Pool.wait(*this); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
ThreadPool &Pool;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace llvm
|
||||||
|
|
||||||
#endif // LLVM_SUPPORT_THREADPOOL_H
|
#endif // LLVM_SUPPORT_THREADPOOL_H
|
||||||
|
|
|
@ -24,11 +24,19 @@ using namespace llvm;
|
||||||
|
|
||||||
#if LLVM_ENABLE_THREADS
|
#if LLVM_ENABLE_THREADS
|
||||||
|
|
||||||
|
// A note on thread groups: Tasks are by default in no group (represented
|
||||||
|
// by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality
|
||||||
|
// here normally works on all tasks regardless of their group (functions
|
||||||
|
// in that case receive nullptr ThreadPoolTaskGroup pointer as argument).
|
||||||
|
// A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks
|
||||||
|
// queue, and functions called to work only on tasks from one group take that
|
||||||
|
// pointer.
|
||||||
|
|
||||||
ThreadPool::ThreadPool(ThreadPoolStrategy S)
|
ThreadPool::ThreadPool(ThreadPoolStrategy S)
|
||||||
: Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
|
: Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
|
||||||
|
|
||||||
void ThreadPool::grow(int requested) {
|
void ThreadPool::grow(int requested) {
|
||||||
std::unique_lock<std::mutex> LockGuard(ThreadsLock);
|
llvm::sys::ScopedWriter LockGuard(ThreadsLock);
|
||||||
if (Threads.size() >= MaxThreadCount)
|
if (Threads.size() >= MaxThreadCount)
|
||||||
return; // Already hit the max thread pool size.
|
return; // Already hit the max thread pool size.
|
||||||
int newThreadCount = std::min<int>(requested, MaxThreadCount);
|
int newThreadCount = std::min<int>(requested, MaxThreadCount);
|
||||||
|
@ -36,52 +44,125 @@ void ThreadPool::grow(int requested) {
|
||||||
int ThreadID = Threads.size();
|
int ThreadID = Threads.size();
|
||||||
Threads.emplace_back([this, ThreadID] {
|
Threads.emplace_back([this, ThreadID] {
|
||||||
Strategy.apply_thread_strategy(ThreadID);
|
Strategy.apply_thread_strategy(ThreadID);
|
||||||
while (true) {
|
processTasks(nullptr);
|
||||||
std::function<void()> Task;
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> LockGuard(QueueLock);
|
|
||||||
// Wait for tasks to be pushed in the queue
|
|
||||||
QueueCondition.wait(LockGuard,
|
|
||||||
[&] { return !EnableFlag || !Tasks.empty(); });
|
|
||||||
// Exit condition
|
|
||||||
if (!EnableFlag && Tasks.empty())
|
|
||||||
return;
|
|
||||||
// Yeah, we have a task, grab it and release the lock on the queue
|
|
||||||
|
|
||||||
// We first need to signal that we are active before popping the queue
|
|
||||||
// in order for wait() to properly detect that even if the queue is
|
|
||||||
// empty, there is still a task in flight.
|
|
||||||
++ActiveThreads;
|
|
||||||
Task = std::move(Tasks.front());
|
|
||||||
Tasks.pop();
|
|
||||||
}
|
|
||||||
// Run the task we just grabbed
|
|
||||||
Task();
|
|
||||||
|
|
||||||
bool Notify;
|
|
||||||
{
|
|
||||||
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
|
|
||||||
std::lock_guard<std::mutex> LockGuard(QueueLock);
|
|
||||||
--ActiveThreads;
|
|
||||||
Notify = workCompletedUnlocked();
|
|
||||||
}
|
|
||||||
// Notify task completion if this is the last active thread, in case
|
|
||||||
// someone waits on ThreadPool::wait().
|
|
||||||
if (Notify)
|
|
||||||
CompletionCondition.notify_all();
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef NDEBUG
|
||||||
|
// The group of the tasks run by the current thread.
|
||||||
|
static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *>
|
||||||
|
*CurrentThreadTaskGroups = nullptr;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// WaitingForGroup == nullptr means all tasks regardless of their group.
|
||||||
|
void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) {
|
||||||
|
while (true) {
|
||||||
|
std::function<void()> Task;
|
||||||
|
ThreadPoolTaskGroup *GroupOfTask;
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> LockGuard(QueueLock);
|
||||||
|
bool workCompletedForGroup = false; // Result of workCompletedUnlocked()
|
||||||
|
// Wait for tasks to be pushed in the queue
|
||||||
|
QueueCondition.wait(LockGuard, [&] {
|
||||||
|
return !EnableFlag || !Tasks.empty() ||
|
||||||
|
(WaitingForGroup != nullptr &&
|
||||||
|
(workCompletedForGroup =
|
||||||
|
workCompletedUnlocked(WaitingForGroup)));
|
||||||
|
});
|
||||||
|
// Exit condition
|
||||||
|
if (!EnableFlag && Tasks.empty())
|
||||||
|
return;
|
||||||
|
if (WaitingForGroup != nullptr && workCompletedForGroup)
|
||||||
|
return;
|
||||||
|
// Yeah, we have a task, grab it and release the lock on the queue
|
||||||
|
|
||||||
|
// We first need to signal that we are active before popping the queue
|
||||||
|
// in order for wait() to properly detect that even if the queue is
|
||||||
|
// empty, there is still a task in flight.
|
||||||
|
++ActiveThreads;
|
||||||
|
Task = std::move(Tasks.front().first);
|
||||||
|
GroupOfTask = Tasks.front().second;
|
||||||
|
// Need to count active threads in each group separately, ActiveThreads
|
||||||
|
// would never be 0 if waiting for another group inside a wait.
|
||||||
|
if (GroupOfTask != nullptr)
|
||||||
|
++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item
|
||||||
|
Tasks.pop_front();
|
||||||
|
}
|
||||||
|
#ifndef NDEBUG
|
||||||
|
if (CurrentThreadTaskGroups == nullptr)
|
||||||
|
CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>;
|
||||||
|
CurrentThreadTaskGroups->push_back(GroupOfTask);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// Run the task we just grabbed
|
||||||
|
Task();
|
||||||
|
|
||||||
|
#ifndef NDEBUG
|
||||||
|
CurrentThreadTaskGroups->pop_back();
|
||||||
|
#endif
|
||||||
|
|
||||||
|
bool Notify;
|
||||||
|
bool NotifyGroup;
|
||||||
|
{
|
||||||
|
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
|
||||||
|
std::lock_guard<std::mutex> LockGuard(QueueLock);
|
||||||
|
--ActiveThreads;
|
||||||
|
if (GroupOfTask != nullptr) {
|
||||||
|
auto A = ActiveGroups.find(GroupOfTask);
|
||||||
|
if (--(A->second) == 0)
|
||||||
|
ActiveGroups.erase(A);
|
||||||
|
}
|
||||||
|
Notify = workCompletedUnlocked(GroupOfTask);
|
||||||
|
NotifyGroup = GroupOfTask != nullptr && Notify;
|
||||||
|
}
|
||||||
|
// Notify task completion if this is the last active thread, in case
|
||||||
|
// someone waits on ThreadPool::wait().
|
||||||
|
if (Notify)
|
||||||
|
CompletionCondition.notify_all();
|
||||||
|
// If this was a task in a group, notify also threads waiting for tasks
|
||||||
|
// in this function on QueueCondition, to make a recursive wait() return
|
||||||
|
// after the group it's been waiting for has finished.
|
||||||
|
if (NotifyGroup)
|
||||||
|
QueueCondition.notify_all();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true when no task of Group is running or queued. A nullptr Group
// asks the same question about all tasks: no thread is active and the queue is
// empty. Reads ActiveThreads, ActiveGroups and Tasks without taking a lock
// itself.
bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const {
  if (Group == nullptr)
    return !ActiveThreads && Tasks.empty();
  // Some thread is still running a task of this group.
  if (ActiveGroups.count(Group) != 0)
    return false;
  // Nothing is running; the group is done unless one of its tasks is queued.
  for (const auto &Queued : Tasks)
    if (Queued.second == Group)
      return false;
  return true;
}
|
||||||
|
|
||||||
void ThreadPool::wait() {
|
void ThreadPool::wait() {
|
||||||
|
assert(!isWorkerThread()); // Would deadlock waiting for itself.
|
||||||
// Wait for all threads to complete and the queue to be empty
|
// Wait for all threads to complete and the queue to be empty
|
||||||
std::unique_lock<std::mutex> LockGuard(QueueLock);
|
std::unique_lock<std::mutex> LockGuard(QueueLock);
|
||||||
CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
|
CompletionCondition.wait(LockGuard,
|
||||||
|
[&] { return workCompletedUnlocked(nullptr); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::wait(ThreadPoolTaskGroup &Group) {
|
||||||
|
// Wait for all threads in the group to complete.
|
||||||
|
if (!isWorkerThread()) {
|
||||||
|
std::unique_lock<std::mutex> LockGuard(QueueLock);
|
||||||
|
CompletionCondition.wait(LockGuard,
|
||||||
|
[&] { return workCompletedUnlocked(&Group); });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Make sure to not deadlock waiting for oneself.
|
||||||
|
assert(CurrentThreadTaskGroups == nullptr ||
|
||||||
|
!llvm::is_contained(*CurrentThreadTaskGroups, &Group));
|
||||||
|
// Handle the case of recursive call from another task in a different group,
|
||||||
|
// in which case process tasks while waiting to keep the thread busy and avoid
|
||||||
|
// possible deadlock.
|
||||||
|
processTasks(&Group);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ThreadPool::isWorkerThread() const {
|
bool ThreadPool::isWorkerThread() const {
|
||||||
std::unique_lock<std::mutex> LockGuard(ThreadsLock);
|
llvm::sys::ScopedReader LockGuard(ThreadsLock);
|
||||||
llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
|
llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
|
||||||
for (const llvm::thread &Thread : Threads)
|
for (const llvm::thread &Thread : Threads)
|
||||||
if (CurrentThreadId == Thread.get_id())
|
if (CurrentThreadId == Thread.get_id())
|
||||||
|
@ -96,7 +177,7 @@ ThreadPool::~ThreadPool() {
|
||||||
EnableFlag = false;
|
EnableFlag = false;
|
||||||
}
|
}
|
||||||
QueueCondition.notify_all();
|
QueueCondition.notify_all();
|
||||||
std::unique_lock<std::mutex> LockGuard(ThreadsLock);
|
llvm::sys::ScopedReader LockGuard(ThreadsLock);
|
||||||
for (auto &Worker : Threads)
|
for (auto &Worker : Threads)
|
||||||
Worker.join();
|
Worker.join();
|
||||||
}
|
}
|
||||||
|
@ -115,12 +196,18 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) {
|
||||||
void ThreadPool::wait() {
|
void ThreadPool::wait() {
|
||||||
// Sequential implementation running the tasks
|
// Sequential implementation running the tasks
|
||||||
while (!Tasks.empty()) {
|
while (!Tasks.empty()) {
|
||||||
auto Task = std::move(Tasks.front());
|
auto Task = std::move(Tasks.front().first);
|
||||||
Tasks.pop();
|
Tasks.pop_front();
|
||||||
Task();
|
Task();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Group-specific wait that ignores its (unnamed) group argument and delegates
// to the no-argument wait().
// NOTE(review): this looks like the variant used when LLVM is built without
// threads, where wait() runs the queued tasks itself - confirm against the
// surrounding preprocessor conditionals.
void ThreadPool::wait(ThreadPoolTaskGroup &) {
|
||||||
|
// Simply wait for all, this works even if recursive (the running task
|
||||||
|
// is already removed from the queue).
|
||||||
|
wait();
|
||||||
|
}
|
||||||
|
|
||||||
bool ThreadPool::isWorkerThread() const {
|
bool ThreadPool::isWorkerThread() const {
|
||||||
report_fatal_error("LLVM compiled without multithreading");
|
report_fatal_error("LLVM compiled without multithreading");
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@
|
||||||
#include "llvm/Support/WithColor.h"
|
#include "llvm/Support/WithColor.h"
|
||||||
#include "llvm/Support/raw_ostream.h"
|
#include "llvm/Support/raw_ostream.h"
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
using namespace llvm;
|
using namespace llvm;
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
#include "llvm/Support/TargetSelect.h"
|
#include "llvm/Support/TargetSelect.h"
|
||||||
#include "llvm/Support/Threading.h"
|
#include "llvm/Support/Threading.h"
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "gtest/gtest.h"
|
#include "gtest/gtest.h"
|
||||||
|
|
||||||
using namespace llvm;
|
using namespace llvm;
|
||||||
|
@ -29,6 +32,7 @@ class ThreadPoolTest : public testing::Test {
|
||||||
SmallVector<Triple::ArchType, 4> UnsupportedArchs;
|
SmallVector<Triple::ArchType, 4> UnsupportedArchs;
|
||||||
SmallVector<Triple::OSType, 4> UnsupportedOSs;
|
SmallVector<Triple::OSType, 4> UnsupportedOSs;
|
||||||
SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
|
SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// This is intended for platform as a temporary "XFAIL"
|
// This is intended for platform as a temporary "XFAIL"
|
||||||
bool isUnsupportedOSOrEnvironment() {
|
bool isUnsupportedOSOrEnvironment() {
|
||||||
|
@ -57,27 +61,45 @@ protected:
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Make sure this thread not progress faster than the main thread.
|
/// Make sure this thread not progress faster than the main thread.
|
||||||
void waitForMainThread() {
|
void waitForMainThread() { waitForPhase(1); }
|
||||||
std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
|
|
||||||
WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the readiness of the main thread.
|
/// Set the readiness of the main thread.
|
||||||
void setMainThreadReady() {
|
void setMainThreadReady() { setPhase(1); }
|
||||||
|
|
||||||
|
/// Wait until given phase is set using setPhase(); first "main" phase is 1.
|
||||||
|
/// See also PhaseResetHelper below.
|
||||||
|
void waitForPhase(int Phase) {
|
||||||
|
std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
|
||||||
|
CurrentPhaseCondition.wait(
|
||||||
|
LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
|
||||||
|
}
|
||||||
|
/// If a thread waits on another phase, the test could bail out on a failed
|
||||||
|
/// assertion and ThreadPool destructor would wait() on all threads, which
|
||||||
|
/// would deadlock on the task waiting. Create this helper to automatically
|
||||||
|
/// reset the phase and unblock such threads.
|
||||||
|
struct PhaseResetHelper {
|
||||||
|
PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
|
||||||
|
~PhaseResetHelper() { test->setPhase(-1); }
|
||||||
|
ThreadPoolTest *test;
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Advance to the given phase.
|
||||||
|
void setPhase(int Phase) {
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
|
std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
|
||||||
MainThreadReady = true;
|
assert(Phase == CurrentPhase + 1 || Phase < 0);
|
||||||
|
CurrentPhase = Phase;
|
||||||
}
|
}
|
||||||
WaitMainThread.notify_all();
|
CurrentPhaseCondition.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SetUp() override { MainThreadReady = false; }
|
void SetUp() override { CurrentPhase = 0; }
|
||||||
|
|
||||||
std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
|
std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
|
||||||
|
|
||||||
std::condition_variable WaitMainThread;
|
std::condition_variable CurrentPhaseCondition;
|
||||||
std::mutex WaitMainThreadMutex;
|
std::mutex CurrentPhaseMutex;
|
||||||
bool MainThreadReady = false;
|
int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
|
||||||
};
|
};
|
||||||
|
|
||||||
#define CHECK_UNSUPPORTED() \
|
#define CHECK_UNSUPPORTED() \
|
||||||
|
@ -194,6 +216,125 @@ TEST_F(ThreadPoolTest, PoolDestruction) {
|
||||||
ASSERT_EQ(5, checked_in);
|
ASSERT_EQ(5, checked_in);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check running tasks in different groups.
|
||||||
|
TEST_F(ThreadPoolTest, Groups) {
|
||||||
|
CHECK_UNSUPPORTED();
|
||||||
|
// Need at least two threads, as the task in group2
|
||||||
|
// might block a thread until all tasks in group1 finish.
|
||||||
|
ThreadPoolStrategy S = hardware_concurrency(2);
|
||||||
|
if (S.compute_thread_count() < 2)
|
||||||
|
return;
|
||||||
|
ThreadPool Pool(S);
|
||||||
|
PhaseResetHelper Helper(this);
|
||||||
|
ThreadPoolTaskGroup Group1(Pool);
|
||||||
|
ThreadPoolTaskGroup Group2(Pool);
|
||||||
|
|
||||||
|
// Check that waiting for an empty group is a no-op.
|
||||||
|
Group1.wait();
|
||||||
|
|
||||||
|
std::atomic_int checked_in1{0};
|
||||||
|
std::atomic_int checked_in2{0};
|
||||||
|
|
||||||
|
for (size_t i = 0; i < 5; ++i) {
|
||||||
|
Group1.async([this, &checked_in1] {
|
||||||
|
waitForMainThread();
|
||||||
|
++checked_in1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Group2.async([this, &checked_in2] {
|
||||||
|
waitForPhase(2);
|
||||||
|
++checked_in2;
|
||||||
|
});
|
||||||
|
ASSERT_EQ(0, checked_in1);
|
||||||
|
ASSERT_EQ(0, checked_in2);
|
||||||
|
// Start first group and wait for it.
|
||||||
|
setMainThreadReady();
|
||||||
|
Group1.wait();
|
||||||
|
ASSERT_EQ(5, checked_in1);
|
||||||
|
// Second group has not yet finished, start it and wait for it.
|
||||||
|
ASSERT_EQ(0, checked_in2);
|
||||||
|
setPhase(2);
|
||||||
|
Group2.wait();
|
||||||
|
ASSERT_EQ(5, checked_in1);
|
||||||
|
ASSERT_EQ(1, checked_in2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check recursive tasks.
|
||||||
|
TEST_F(ThreadPoolTest, RecursiveGroups) {
|
||||||
|
CHECK_UNSUPPORTED();
|
||||||
|
ThreadPool Pool;
|
||||||
|
ThreadPoolTaskGroup Group(Pool);
|
||||||
|
|
||||||
|
std::atomic_int checked_in1{0};
|
||||||
|
|
||||||
|
for (size_t i = 0; i < 5; ++i) {
|
||||||
|
Group.async([this, &Pool, &checked_in1] {
|
||||||
|
waitForMainThread();
|
||||||
|
|
||||||
|
ThreadPoolTaskGroup LocalGroup(Pool);
|
||||||
|
|
||||||
|
// Check that waiting for an empty group is a no-op.
|
||||||
|
LocalGroup.wait();
|
||||||
|
|
||||||
|
std::atomic_int checked_in2{0};
|
||||||
|
for (size_t i = 0; i < 5; ++i) {
|
||||||
|
LocalGroup.async([&checked_in2] { ++checked_in2; });
|
||||||
|
}
|
||||||
|
LocalGroup.wait();
|
||||||
|
ASSERT_EQ(5, checked_in2);
|
||||||
|
|
||||||
|
++checked_in1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ASSERT_EQ(0, checked_in1);
|
||||||
|
setMainThreadReady();
|
||||||
|
Group.wait();
|
||||||
|
ASSERT_EQ(5, checked_in1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
  CHECK_UNSUPPORTED();
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    return;
  ThreadPool Pool(S);
  ThreadPoolTaskGroup Group(Pool);
  // Declared after the group so that it is destroyed before the blocking
  // group and pool destructors run.
  PhaseResetHelper Helper(this);

  // Test that a task which calls wait() for a group from a worker thread
  // returns when the last task of that group finishes on a different thread,
  // even though the waiting thread found no task to process and is suspended
  // waiting for more tasks.

  // Task A runs in the first thread. It finishes and leaves
  // the background thread waiting for more tasks.
  Group.async([this] {
    waitForMainThread();
    setPhase(2);
  });
  // Task B is run in a second thread, it launches yet another
  // task C in a different group, which will be handled by the waiting
  // thread started above.
  Group.async([this, &Pool] {
    waitForPhase(2);
    ThreadPoolTaskGroup LocalGroup(Pool);
    LocalGroup.async([this] {
      waitForPhase(3);
      // Give the other thread enough time to check that there's no task
      // to process and suspend waiting for a notification. This is indeed racy,
      // but probably the best that can be done.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
    // And task B only now will wait for the tasks in the group (=task C)
    // to finish. This test checks that it does not deadlock. If the
    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
    // this task B would be stuck waiting for tasks to arrive.
    setPhase(3);
    LocalGroup.wait();
  });
  setMainThreadReady();
  Group.wait();
}
|
||||||
|
|
||||||
#if LLVM_ENABLE_THREADS == 1
|
#if LLVM_ENABLE_THREADS == 1
|
||||||
|
|
||||||
// FIXME: Skip some tests below on non-Windows because multi-socket systems
|
// FIXME: Skip some tests below on non-Windows because multi-socket systems
|
||||||
|
|
Loading…
Reference in New Issue