diff options
Diffstat (limited to 'clang-tools-extra/clangd/Threading.cpp')
-rw-r--r-- | clang-tools-extra/clangd/Threading.cpp | 97 |
1 files changed, 48 insertions, 49 deletions
diff --git a/clang-tools-extra/clangd/Threading.cpp b/clang-tools-extra/clangd/Threading.cpp index 3c0c74bb803..94bb76c5f1f 100644 --- a/clang-tools-extra/clangd/Threading.cpp +++ b/clang-tools-extra/clangd/Threading.cpp @@ -1,63 +1,62 @@ #include "Threading.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" +#include <thread> namespace clang { namespace clangd { -ThreadPool::ThreadPool(unsigned AsyncThreadsCount) - : RunSynchronously(AsyncThreadsCount == 0) { - if (RunSynchronously) { - // Don't start the worker thread if we're running synchronously - return; - } - Workers.reserve(AsyncThreadsCount); - for (unsigned I = 0; I < AsyncThreadsCount; ++I) { - Workers.push_back(std::thread([this, I]() { - llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); - while (true) { - UniqueFunction<void()> Request; - Context Ctx; - - // Pick request from the queue - { - std::unique_lock<std::mutex> Lock(Mutex); - // Wait for more requests. - RequestCV.wait(Lock, - [this] { return !RequestQueue.empty() || Done; }); - if (RequestQueue.empty()) { - assert(Done); - return; - } - - // We process requests starting from the front of the queue. Users of - // ThreadPool have a way to prioritise their requests by putting - // them to the either side of the queue (using either addToEnd or - // addToFront). - std::tie(Request, Ctx) = std::move(RequestQueue.front()); - RequestQueue.pop_front(); - } // unlock Mutex - - WithContext WithCtx(std::move(Ctx)); - Request(); - } - })); - } +CancellationFlag::CancellationFlag() + : WasCancelled(std::make_shared<std::atomic<bool>>(false)) {} + +Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {} + +void Semaphore::lock() { + std::unique_lock<std::mutex> Lock(Mutex); + SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; }); + --FreeSlots; } -ThreadPool::~ThreadPool() { - if (RunSynchronously) - return; // no worker thread is running in that case +void Semaphore::unlock() { + std::unique_lock<std::mutex> Lock(Mutex); + ++FreeSlots; + Lock.unlock(); + + SlotsChanged.notify_one(); +} +AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } + +void AsyncTaskRunner::waitForAll() { + std::unique_lock<std::mutex> Lock(Mutex); + TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); +} + +void AsyncTaskRunner::runAsync(UniqueFunction<void()> Action) { { - std::lock_guard<std::mutex> Lock(Mutex); - // Wake up the worker thread - Done = true; - } // unlock Mutex - RequestCV.notify_all(); - - for (auto &Worker : Workers) - Worker.join(); + std::unique_lock<std::mutex> Lock(Mutex); + ++InFlightTasks; + } + + auto CleanupTask = llvm::make_scope_exit([this]() { + std::unique_lock<std::mutex> Lock(Mutex); + int NewTasksCnt = --InFlightTasks; + Lock.unlock(); + + if (NewTasksCnt == 0) + TasksReachedZero.notify_one(); + }); + + std::thread( + [](decltype(Action) Action, decltype(CleanupTask)) { + Action(); + // Make sure function stored by Action is destroyed before CleanupTask + // is run. + Action = nullptr; + }, + std::move(Action), std::move(CleanupTask)) + .detach(); } } // namespace clangd } // namespace clang |