From fcc8014fcca663ae6073afd9ef4c3e272acdd4e1 Mon Sep 17 00:00:00 2001 From: Ilya Biryukov Date: Tue, 6 Feb 2018 15:53:42 +0000 Subject: [clangd] The new threading implementation Summary: In the new threading model clangd creates one thread per file to manage the AST and one thread to process each of the incoming requests. The number of actively running threads is bounded by the semaphore to avoid overloading the system. Reviewers: sammccall Reviewed By: sammccall Subscribers: klimek, mgorny, jkorous-apple, ioeric, hintonda, cfe-commits Differential Revision: https://reviews.llvm.org/D42573 --- clang-tools-extra/clangd/Threading.cpp | 97 +++++++++++++++++----------------- 1 file changed, 48 insertions(+), 49 deletions(-) (limited to 'clang-tools-extra/clangd/Threading.cpp') diff --git a/clang-tools-extra/clangd/Threading.cpp b/clang-tools-extra/clangd/Threading.cpp index 3c0c74bb803..94bb76c5f1f 100644 --- a/clang-tools-extra/clangd/Threading.cpp +++ b/clang-tools-extra/clangd/Threading.cpp @@ -1,63 +1,62 @@ #include "Threading.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" +#include namespace clang { namespace clangd { -ThreadPool::ThreadPool(unsigned AsyncThreadsCount) - : RunSynchronously(AsyncThreadsCount == 0) { - if (RunSynchronously) { - // Don't start the worker thread if we're running synchronously - return; - } - Workers.reserve(AsyncThreadsCount); - for (unsigned I = 0; I < AsyncThreadsCount; ++I) { - Workers.push_back(std::thread([this, I]() { - llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); - while (true) { - UniqueFunction Request; - Context Ctx; - - // Pick request from the queue - { - std::unique_lock Lock(Mutex); - // Wait for more requests. - RequestCV.wait(Lock, - [this] { return !RequestQueue.empty() || Done; }); - if (RequestQueue.empty()) { - assert(Done); - return; - } - - // We process requests starting from the front of the queue. Users of - // ThreadPool have a way to prioritise their requests by putting - // them to the either side of the queue (using either addToEnd or - // addToFront). - std::tie(Request, Ctx) = std::move(RequestQueue.front()); - RequestQueue.pop_front(); - } // unlock Mutex - - WithContext WithCtx(std::move(Ctx)); - Request(); - } - })); - } +CancellationFlag::CancellationFlag() + : WasCancelled(std::make_shared>(false)) {} + +Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {} + +void Semaphore::lock() { + std::unique_lock Lock(Mutex); + SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; }); + --FreeSlots; } -ThreadPool::~ThreadPool() { - if (RunSynchronously) - return; // no worker thread is running in that case +void Semaphore::unlock() { + std::unique_lock Lock(Mutex); + ++FreeSlots; + Lock.unlock(); + + SlotsChanged.notify_one(); +} +AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } + +void AsyncTaskRunner::waitForAll() { + std::unique_lock Lock(Mutex); + TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); +} + +void AsyncTaskRunner::runAsync(UniqueFunction Action) { { - std::lock_guard Lock(Mutex); - // Wake up the worker thread - Done = true; - } // unlock Mutex - RequestCV.notify_all(); - - for (auto &Worker : Workers) - Worker.join(); + std::unique_lock Lock(Mutex); + ++InFlightTasks; + } + + auto CleanupTask = llvm::make_scope_exit([this]() { + std::unique_lock Lock(Mutex); + int NewTasksCnt = --InFlightTasks; + Lock.unlock(); + + if (NewTasksCnt == 0) + TasksReachedZero.notify_one(); + }); + + std::thread( + [](decltype(Action) Action, decltype(CleanupTask)) { + Action(); + // Make sure function stored by Action is destroyed before CleanupTask + // is run. + Action = nullptr; + }, + std::move(Action), std::move(CleanupTask)) + .detach(); } } // namespace clangd } // namespace clang -- cgit v1.2.3