-rw-r--r--  clang-tools-extra/clangd/CMakeLists.txt | 1
-rw-r--r--  clang-tools-extra/clangd/ClangdServer.h | 1
-rw-r--r--  clang-tools-extra/clangd/ClangdUnit.h | 4
-rw-r--r--  clang-tools-extra/clangd/ClangdUnitStore.cpp | 37
-rw-r--r--  clang-tools-extra/clangd/ClangdUnitStore.h | 73
-rw-r--r--  clang-tools-extra/clangd/TUScheduler.cpp | 418
-rw-r--r--  clang-tools-extra/clangd/TUScheduler.h | 17
-rw-r--r--  clang-tools-extra/clangd/Threading.cpp | 97
-rw-r--r--  clang-tools-extra/clangd/Threading.h | 99
-rw-r--r--  clang-tools-extra/unittests/clangd/CMakeLists.txt | 1
-rw-r--r--  clang-tools-extra/unittests/clangd/ThreadingTests.cpp | 61
11 files changed, 529 insertions, 280 deletions
diff --git a/clang-tools-extra/clangd/CMakeLists.txt b/clang-tools-extra/clangd/CMakeLists.txt
index 9c424391dd9..67530270ebb 100644
--- a/clang-tools-extra/clangd/CMakeLists.txt
+++ b/clang-tools-extra/clangd/CMakeLists.txt
@@ -6,7 +6,6 @@ add_clang_library(clangDaemon
ClangdLSPServer.cpp
ClangdServer.cpp
ClangdUnit.cpp
- ClangdUnitStore.cpp
CodeComplete.cpp
CodeCompletionStrings.cpp
CompileArgsCache.cpp
diff --git a/clang-tools-extra/clangd/ClangdServer.h b/clang-tools-extra/clangd/ClangdServer.h
index 79dcf278457..fffd46f15cb 100644
--- a/clang-tools-extra/clangd/ClangdServer.h
+++ b/clang-tools-extra/clangd/ClangdServer.h
@@ -11,7 +11,6 @@
#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H
#include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
#include "CodeComplete.h"
#include "CompileArgsCache.h"
#include "DraftStore.h"
diff --git a/clang-tools-extra/clangd/ClangdUnit.h b/clang-tools-extra/clangd/ClangdUnit.h
index bf1aced1009..12228bcfc7c 100644
--- a/clang-tools-extra/clangd/ClangdUnit.h
+++ b/clang-tools-extra/clangd/ClangdUnit.h
@@ -151,6 +151,8 @@ using ASTParsedCallback = std::function<void(PathRef Path, ParsedAST *)>;
/// Manages resources, required by clangd. Allows to rebuild file with new
/// contents, and provides AST and Preamble for it.
+/// NOTE: Threading-related bits of CppFile are now deprecated and will be
+/// removed soon.
class CppFile : public std::enable_shared_from_this<CppFile> {
public:
// We only allow to create CppFile as shared_ptr, because a future returned by
@@ -178,6 +180,7 @@ public:
/// that will wait for any ongoing rebuilds to finish and actually set the AST
/// and Preamble to nulls. It can be run on a different thread. This function
/// is useful to cancel ongoing rebuilds, if any, before removing CppFile.
+ /// DEPRECATED. This function will be removed soon; please do not use it.
UniqueFunction<void()> deferCancelRebuild();
/// Rebuild AST and Preamble synchronously on the calling thread.
@@ -200,6 +203,7 @@ public:
/// The future to finish rebuild returns a list of diagnostics built during
/// reparse, or None, if another deferRebuild was called before this
/// rebuild was finished.
+ /// DEPRECATED. This function will be removed soon; please do not use it.
UniqueFunction<llvm::Optional<std::vector<DiagWithFixIts>>()>
deferRebuild(ParseInputs &&Inputs);
diff --git a/clang-tools-extra/clangd/ClangdUnitStore.cpp b/clang-tools-extra/clangd/ClangdUnitStore.cpp
deleted file mode 100644
index bc2479d669e..00000000000
--- a/clang-tools-extra/clangd/ClangdUnitStore.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===//
-//
-// The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===----------------------------------------------------------------------===//
-
-#include "ClangdUnitStore.h"
-#include "llvm/Support/Path.h"
-#include <algorithm>
-
-using namespace clang::clangd;
-using namespace clang;
-
-std::shared_ptr<CppFile> CppFileCollection::removeIfPresent(PathRef File) {
- std::lock_guard<std::mutex> Lock(Mutex);
-
- auto It = OpenedFiles.find(File);
- if (It == OpenedFiles.end())
- return nullptr;
-
- std::shared_ptr<CppFile> Result = It->second;
- OpenedFiles.erase(It);
- return Result;
-}
-std::vector<std::pair<Path, std::size_t>>
-CppFileCollection::getUsedBytesPerFile() const {
- std::lock_guard<std::mutex> Lock(Mutex);
- std::vector<std::pair<Path, std::size_t>> Result;
- Result.reserve(OpenedFiles.size());
- for (auto &&PathAndFile : OpenedFiles)
- Result.push_back(
- {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()});
- return Result;
-}
diff --git a/clang-tools-extra/clangd/ClangdUnitStore.h b/clang-tools-extra/clangd/ClangdUnitStore.h
deleted file mode 100644
index 6ec03023299..00000000000
--- a/clang-tools-extra/clangd/ClangdUnitStore.h
+++ /dev/null
@@ -1,73 +0,0 @@
-//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===//
-//
-// The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===---------------------------------------------------------------------===//
-
-#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-
-#include "ClangdUnit.h"
-#include "GlobalCompilationDatabase.h"
-#include "Logger.h"
-#include "Path.h"
-#include "clang/Tooling/CompilationDatabase.h"
-#include <mutex>
-
-namespace clang {
-namespace clangd {
-
-class Logger;
-
-/// Thread-safe mapping from FileNames to CppFile.
-class CppFileCollection {
-public:
- /// \p ASTCallback is called when a file is parsed synchronously. This should
- /// not be expensive since it blocks diagnostics.
- explicit CppFileCollection(bool StorePreamblesInMemory,
- std::shared_ptr<PCHContainerOperations> PCHs,
- ASTParsedCallback ASTCallback)
- : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)),
- StorePreamblesInMemory(StorePreamblesInMemory) {}
-
- std::shared_ptr<CppFile> getOrCreateFile(PathRef File) {
- std::lock_guard<std::mutex> Lock(Mutex);
- auto It = OpenedFiles.find(File);
- if (It == OpenedFiles.end()) {
- It = OpenedFiles
- .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory,
- PCHs, ASTCallback))
- .first;
- }
- return It->second;
- }
-
- std::shared_ptr<CppFile> getFile(PathRef File) const {
- std::lock_guard<std::mutex> Lock(Mutex);
- auto It = OpenedFiles.find(File);
- if (It == OpenedFiles.end())
- return nullptr;
- return It->second;
- }
-
- /// Removes a CppFile, stored for \p File, if it's inside collection and
- /// returns it.
- std::shared_ptr<CppFile> removeIfPresent(PathRef File);
-
- /// Gets used memory for each of the stored files.
- std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
-
-private:
- mutable std::mutex Mutex;
- llvm::StringMap<std::shared_ptr<CppFile>> OpenedFiles;
- ASTParsedCallback ASTCallback;
- std::shared_ptr<PCHContainerOperations> PCHs;
- bool StorePreamblesInMemory;
-};
-} // namespace clangd
-} // namespace clang
-
-#endif
diff --git a/clang-tools-extra/clangd/TUScheduler.cpp b/clang-tools-extra/clangd/TUScheduler.cpp
index 4c18dcdab6b..e2393658ba0 100644
--- a/clang-tools-extra/clangd/TUScheduler.cpp
+++ b/clang-tools-extra/clangd/TUScheduler.cpp
@@ -1,9 +1,303 @@
+//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+// For each file managed by TUScheduler, we create a single ASTWorker that
+// manages an AST for that file. All operations that modify or read the AST are
+// run on a separate dedicated thread asynchronously in FIFO order.
+//
+// We start processing each update immediately after we receive it. If two or
+// more updates arrive in a row without reads in between, we attempt to drop
+// an older one to avoid wasting time building ASTs we don't need.
+//
+// The processing thread of the ASTWorker is also responsible for building the
+// preamble. However, unlike the AST, the same preamble can be read
+// concurrently, so we run each async preamble read on its own thread.
+//
+// To limit the concurrent load that clangd produces, we maintain a semaphore
+// that prevents more than a fixed number of threads from running concurrently.
+//
+// Rationale for cancelling updates.
+// LSP clients can send updates to clangd on each keystroke. Some files take
+// significant time to parse (e.g. a few seconds) and clangd can get starved by
+// the updates to those files. Therefore we try to process only the last update,
+// if possible.
+// Our current strategy to do that is the following:
+//  - For each update we immediately schedule a rebuild of the AST.
+//  - The rebuild checks whether it was cancelled before doing any actual work.
+//    If it was, it skips the actual rebuild and only reports llvm::None to the
+//    callback.
+//  - When adding an update, we cancel the last update in the queue if it
+//    didn't have any reads.
+// There are probably more optimal ways to do this. One approach we might take
+// is the following:
+//  - For each update we remember the pending inputs, but delay the rebuild of
+//    the AST for some timeout.
+//  - If subsequent updates arrive before the rebuild has started, we replace
+//    the pending inputs and reset the timer.
+//  - If any reads of the AST are scheduled, we start building the AST
+//    immediately.
+
#include "TUScheduler.h"
#include "clang/Frontend/PCHContainerOperations.h"
#include "llvm/Support/Errc.h"
+#include <memory>
+#include <queue>
namespace clang {
namespace clangd {
+namespace {
+class ASTWorkerHandle;
+
+/// Owns one instance of the AST, schedules updates and reads of it.
+/// Also responsible for building and providing access to the preamble.
+/// Each ASTWorker processes the async requests sent to it on a separate
+/// dedicated thread.
+/// The ASTWorker that manages the AST is shared by both the processing thread
+/// and the TUScheduler. The TUScheduler should discard an ASTWorker when
+/// remove() is called, but its thread may be busy and we don't want to block.
+/// So the workers are accessed via an ASTWorkerHandle. Destroying the handle
+/// signals the worker to exit its run loop and gives up shared ownership of the
+/// worker.
+class ASTWorker {
+ friend class ASTWorkerHandle;
+ ASTWorker(Semaphore &Barrier, std::shared_ptr<CppFile> AST, bool RunSync);
+
+public:
+ /// Create a new ASTWorker and return a handle to it.
+ /// The processing thread is spawned using \p Tasks. However, when \p Tasks
+ /// is null, all requests will be processed on the calling thread
+ /// synchronously instead. \p Barrier is acquired when processing each
+ /// request; it is used to limit the number of actively running threads.
+ static ASTWorkerHandle Create(AsyncTaskRunner *Tasks, Semaphore &Barrier,
+ std::shared_ptr<CppFile> AST);
+ ~ASTWorker();
+
+ void update(ParseInputs Inputs,
+ UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
+ OnUpdated);
+ void runWithAST(UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action);
+
+ std::shared_ptr<const PreambleData> getPossiblyStalePreamble() const;
+ std::size_t getUsedBytes() const;
+
+private:
+ // Must be called exactly once on the processing thread. Will return after
+ // stop() is called on a separate thread and all pending requests are
+ // processed.
+ void run();
+ /// Signal that run() should finish processing pending requests and exit.
+ void stop();
+ /// Adds a new task to the end of the request queue.
+ void startTask(UniqueFunction<void()> Task, bool isUpdate,
+ llvm::Optional<CancellationFlag> CF);
+
+ using RequestWithCtx = std::pair<UniqueFunction<void()>, Context>;
+
+ const bool RunSync;
+ Semaphore &Barrier;
+ // AST and FileInputs are only accessed on the processing thread from run().
+ const std::shared_ptr<CppFile> AST;
+ // Inputs, corresponding to the current state of AST.
+ ParseInputs FileInputs;
+ // Guards members used by both TUScheduler and the worker thread.
+ mutable std::mutex Mutex;
+ // Set to true to signal run() to finish processing.
+ bool Done; /* GUARDED_BY(Mutex) */
+ std::queue<RequestWithCtx> Requests; /* GUARDED_BY(Mutex) */
+ // Only set when the last request is an update. This allows us to cancel an
+ // update that was never read if a subsequent update comes in.
+ llvm::Optional<CancellationFlag> LastUpdateCF; /* GUARDED_BY(Mutex) */
+ std::condition_variable RequestsCV;
+};
+
+/// A smart-pointer-like class that points to an active ASTWorker.
+/// Its destructor signals the underlying ASTWorker that no new requests will
+/// be sent and that the processing loop may exit (after running all pending
+/// requests).
+class ASTWorkerHandle {
+ friend class ASTWorker;
+ ASTWorkerHandle(std::shared_ptr<ASTWorker> Worker)
+ : Worker(std::move(Worker)) {
+ assert(this->Worker);
+ }
+
+public:
+ ASTWorkerHandle(const ASTWorkerHandle &) = delete;
+ ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete;
+ ASTWorkerHandle(ASTWorkerHandle &&) = default;
+ ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default;
+
+ ~ASTWorkerHandle() {
+ if (Worker)
+ Worker->stop();
+ }
+
+ ASTWorker &operator*() {
+ assert(Worker && "Handle was moved from");
+ return *Worker;
+ }
+
+ ASTWorker *operator->() {
+ assert(Worker && "Handle was moved from");
+ return Worker.get();
+ }
+
+ /// Returns an owning reference to the underlying ASTWorker that can outlive
+ /// the ASTWorkerHandle. However, no new requests to an active ASTWorker can
+ /// be scheduled via the returned reference, i.e. only reads of the preamble
+ /// are possible.
+ std::shared_ptr<const ASTWorker> lock() { return Worker; }
+
+private:
+ std::shared_ptr<ASTWorker> Worker;
+};
+
+ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner *Tasks, Semaphore &Barrier,
+ std::shared_ptr<CppFile> AST) {
+ std::shared_ptr<ASTWorker> Worker(
+ new ASTWorker(Barrier, std::move(AST), /*RunSync=*/!Tasks));
+ if (Tasks)
+ Tasks->runAsync([Worker]() { Worker->run(); });
+
+ return ASTWorkerHandle(std::move(Worker));
+}
+
+ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr<CppFile> AST,
+ bool RunSync)
+ : RunSync(RunSync), Barrier(Barrier), AST(std::move(AST)), Done(false) {
+ if (RunSync)
+ return;
+}
+
+ASTWorker::~ASTWorker() {
+#ifndef NDEBUG
+ std::lock_guard<std::mutex> Lock(Mutex);
+ assert(Done && "handle was not destroyed");
+ assert(Requests.empty() && "unprocessed requests when destroying ASTWorker");
+#endif
+}
+
+void ASTWorker::update(
+ ParseInputs Inputs,
+ UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
+ OnUpdated) {
+ auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable {
+ if (CF.isCancelled()) {
+ OnUpdated(llvm::None);
+ return;
+ }
+ FileInputs = Inputs;
+ auto Diags = AST->rebuild(std::move(Inputs));
+ // We want to report the diagnostics even if this update was cancelled.
+ // It seems more useful than making the clients wait indefinitely if they
+ // spam us with updates.
+ OnUpdated(std::move(Diags));
+ };
+
+ CancellationFlag UpdateCF;
+ startTask(BindWithForward(Task, UpdateCF, std::move(OnUpdated)),
+ /*isUpdate=*/true, UpdateCF);
+}
+
+void ASTWorker::runWithAST(
+ UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
+ auto Task = [=](decltype(Action) Action) {
+ auto ASTWrapper = this->AST->getAST().get();
+ // FIXME: there is no need to lock here; clean up the CppFile interface to
+ // get rid of the locks.
+ ASTWrapper->runUnderLock([&](ParsedAST *AST) {
+ if (!AST) {
+ Action(llvm::make_error<llvm::StringError>(
+ "invalid AST", llvm::errc::invalid_argument));
+ return;
+ }
+ Action(InputsAndAST{FileInputs, *AST});
+ });
+ };
+
+ startTask(BindWithForward(Task, std::move(Action)), /*isUpdate=*/false,
+ llvm::None);
+}
+
+std::shared_ptr<const PreambleData>
+ASTWorker::getPossiblyStalePreamble() const {
+ return AST->getPossiblyStalePreamble();
+}
+
+std::size_t ASTWorker::getUsedBytes() const {
+ // FIXME(ibiryukov): we'll need to take locks here after we remove
+ // thread-safety from CppFile. For now, CppFile is thread-safe and we can
+ // safely call methods on it without acquiring a lock.
+ return AST->getUsedBytes();
+}
+
+void ASTWorker::stop() {
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ assert(!Done && "stop() called twice");
+ Done = true;
+ }
+ RequestsCV.notify_one();
+}
+
+void ASTWorker::startTask(UniqueFunction<void()> Task, bool isUpdate,
+ llvm::Optional<CancellationFlag> CF) {
+ assert(isUpdate == CF.hasValue() &&
+ "Only updates are expected to pass CancellationFlag");
+
+ if (RunSync) {
+ assert(!Done && "running a task after stop()");
+ Task();
+ return;
+ }
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ assert(!Done && "running a task after stop()");
+ if (isUpdate) {
+ if (!Requests.empty() && LastUpdateCF) {
+ // There were no reads of the last unprocessed update, so cancel it to
+ // avoid wasting time on it.
+ LastUpdateCF->cancel();
+ }
+ LastUpdateCF = std::move(*CF);
+ } else {
+ LastUpdateCF = llvm::None;
+ }
+ Requests.emplace(std::move(Task), Context::current().clone());
+ } // unlock Mutex.
+ RequestsCV.notify_one();
+}
+
+void ASTWorker::run() {
+ while (true) {
+ RequestWithCtx Req;
+ {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); });
+ if (Requests.empty()) {
+ assert(Done);
+ return;
+ }
+ // Even when Done is true, we finish processing all pending requests
+ // before exiting the processing loop.
+
+ Req = std::move(Requests.front());
+ Requests.pop();
+ } // unlock Mutex
+
+ std::lock_guard<Semaphore> BarrierLock(Barrier);
+ WithContext Guard(std::move(Req.second));
+ Req.first();
+ }
+}
+} // namespace
+
unsigned getDefaultAsyncThreadsCount() {
unsigned HardwareConcurrency = std::thread::hardware_concurrency();
// C++ standard says that hardware_concurrency()
@@ -14,110 +308,114 @@ unsigned getDefaultAsyncThreadsCount() {
return HardwareConcurrency;
}
+struct TUScheduler::FileData {
+ /// Latest inputs, passed to TUScheduler::update().
+ ParseInputs Inputs;
+ ASTWorkerHandle Worker;
+};
+
TUScheduler::TUScheduler(unsigned AsyncThreadsCount,
bool StorePreamblesInMemory,
ASTParsedCallback ASTCallback)
+ : StorePreamblesInMemory(StorePreamblesInMemory),
+ PCHOps(std::make_shared<PCHContainerOperations>()),
+ ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) {
+ if (0 < AsyncThreadsCount)
+ Tasks.emplace();
+}
+
+TUScheduler::~TUScheduler() {
+ // Notify all workers that they need to stop.
+ Files.clear();
- : Files(StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
- std::move(ASTCallback)),
- Threads(AsyncThreadsCount) {}
+ // Wait for all in-flight tasks to finish.
+ if (Tasks)
+ Tasks->waitForAll();
+}
void TUScheduler::update(
PathRef File, ParseInputs Inputs,
UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
OnUpdated) {
- CachedInputs[File] = Inputs;
-
- auto Resources = Files.getOrCreateFile(File);
- auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
-
- Threads.addToFront(
- [](decltype(OnUpdated) OnUpdated,
- decltype(DeferredRebuild) DeferredRebuild) {
- auto Diags = DeferredRebuild();
- OnUpdated(Diags);
- },
- std::move(OnUpdated), std::move(DeferredRebuild));
+ std::unique_ptr<FileData> &FD = Files[File];
+ if (!FD) {
+ // Create a new worker to process the AST-related tasks.
+ ASTWorkerHandle Worker = ASTWorker::Create(
+ Tasks ? Tasks.getPointer() : nullptr, Barrier,
+ CppFile::Create(File, StorePreamblesInMemory, PCHOps, ASTCallback));
+ FD = std::unique_ptr<FileData>(new FileData{Inputs, std::move(Worker)});
+ } else {
+ FD->Inputs = Inputs;
+ }
+ FD->Worker->update(std::move(Inputs), std::move(OnUpdated));
}
void TUScheduler::remove(PathRef File,
UniqueFunction<void(llvm::Error)> Action) {
- CachedInputs.erase(File);
-
- auto Resources = Files.removeIfPresent(File);
- if (!Resources) {
+ auto It = Files.find(File);
+ if (It == Files.end()) {
Action(llvm::make_error<llvm::StringError>(
"trying to remove non-added document", llvm::errc::invalid_argument));
return;
}
-
- auto DeferredCancel = Resources->deferCancelRebuild();
- Threads.addToFront(
- [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
- DeferredCancel();
- Action(llvm::Error::success());
- },
- std::move(Action), std::move(DeferredCancel));
+ Files.erase(It);
}
void TUScheduler::runWithAST(
PathRef File, UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
- auto Resources = Files.getFile(File);
- if (!Resources) {
+ auto It = Files.find(File);
+ if (It == Files.end()) {
Action(llvm::make_error<llvm::StringError>(
"trying to get AST for non-added document",
llvm::errc::invalid_argument));
return;
}
- const ParseInputs &Inputs = getInputs(File);
- // We currently block the calling thread until AST is available and run the
- // action on the calling thread to avoid inconsistent states coming from
- // subsequent updates.
- // FIXME(ibiryukov): this should be moved to the worker threads.
- Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
- if (AST)
- Action(InputsAndAST{Inputs, *AST});
- else
- Action(llvm::make_error<llvm::StringError>(
- "Could not build AST for the latest file update",
- llvm::errc::invalid_argument));
- });
+ It->second->Worker->runWithAST(std::move(Action));
}
void TUScheduler::runWithPreamble(
PathRef File,
UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action) {
- std::shared_ptr<CppFile> Resources = Files.getFile(File);
- if (!Resources) {
+ auto It = Files.find(File);
+ if (It == Files.end()) {
Action(llvm::make_error<llvm::StringError>(
"trying to get preamble for non-added document",
llvm::errc::invalid_argument));
return;
}
- const ParseInputs &Inputs = getInputs(File);
- std::shared_ptr<const PreambleData> Preamble =
- Resources->getPossiblyStalePreamble();
- Threads.addToFront(
- [Resources, Preamble, Inputs](decltype(Action) Action) mutable {
- if (!Preamble)
- Preamble = Resources->getPossiblyStalePreamble();
+ if (!Tasks) {
+ std::shared_ptr<const PreambleData> Preamble =
+ It->second->Worker->getPossiblyStalePreamble();
+ Action(InputsAndPreamble{It->second->Inputs, Preamble.get()});
+ return;
+ }
- Action(InputsAndPreamble{Inputs, Preamble.get()});
- },
- std::move(Action));
-}
+ ParseInputs InputsCopy = It->second->Inputs;
+ std::shared_ptr<const ASTWorker> Worker = It->second->Worker.lock();
+ auto Task = [InputsCopy, Worker, this](Context Ctx,
+ decltype(Action) Action) mutable {
+ std::lock_guard<Semaphore> BarrierLock(Barrier);
+ WithContext Guard(std::move(Ctx));
+ std::shared_ptr<const PreambleData> Preamble =
+ Worker->getPossiblyStalePreamble();
+ Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+ };
-const ParseInputs &TUScheduler::getInputs(PathRef File) {
- auto It = CachedInputs.find(File);
- assert(It != CachedInputs.end());
- return It->second;
+ Tasks->runAsync(
+ BindWithForward(Task, Context::current().clone(), std::move(Action)));
}
std::vector<std::pair<Path, std::size_t>>
TUScheduler::getUsedBytesPerFile() const {
- return Files.getUsedBytesPerFile();
+ std::vector<std::pair<Path, std::size_t>> Result;
+ Result.reserve(Files.size());
+ for (auto &&PathAndFile : Files)
+ Result.push_back(
+ {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()});
+ return Result;
}
+
} // namespace clangd
} // namespace clang
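
The cancellation policy described in the header comment of TUScheduler.cpp can be shown in isolation. The sketch below is not part of the patch: it relies only on the CancellationFlag primitive added in Threading.h, while UpdateQueue, pushUpdate and pushRead are illustrative names that mirror what ASTWorker::startTask does with LastUpdateCF.

#include "Threading.h"          // clang::clangd::CancellationFlag
#include "llvm/ADT/Optional.h"
#include <functional>
#include <queue>
#include <utility>

// Keep a flag for the last enqueued update; cancel it if another update
// arrives before any read depended on its result.
struct UpdateQueue {
  std::queue<std::function<void()>> Requests;
  llvm::Optional<clang::clangd::CancellationFlag> LastUpdateCF;

  void
  pushUpdate(std::function<void(clang::clangd::CancellationFlag)> Rebuild) {
    if (LastUpdateCF)
      LastUpdateCF->cancel(); // The previous update was never read; skip it.
    clang::clangd::CancellationFlag CF;
    LastUpdateCF = CF;
    // The rebuild itself checks CF.isCancelled() before doing any real work.
    Requests.push([Rebuild, CF] { Rebuild(CF); });
  }

  void pushRead(std::function<void()> Read) {
    LastUpdateCF = llvm::None; // A read depends on the last update; keep it.
    Requests.push(std::move(Read));
  }
};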
diff --git a/clang-tools-extra/clangd/TUScheduler.h b/clang-tools-extra/clangd/TUScheduler.h
index c7df8c4dba1..41562ebff45 100644
--- a/clang-tools-extra/clangd/TUScheduler.h
+++ b/clang-tools-extra/clangd/TUScheduler.h
@@ -11,9 +11,9 @@
#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H
#include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
#include "Function.h"
#include "Threading.h"
+#include "llvm/ADT/StringMap.h"
namespace clang {
namespace clangd {
@@ -42,6 +42,7 @@ class TUScheduler {
public:
TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
ASTParsedCallback ASTCallback);
+ ~TUScheduler();
/// Returns estimated memory usage for each of the currently open files.
/// The order of results is unspecified.
@@ -81,11 +82,17 @@ public:
UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);
private:
- const ParseInputs &getInputs(PathRef File);
+ /// This class stores per-file data in the Files map.
+ struct FileData;
- llvm::StringMap<ParseInputs> CachedInputs;
- CppFileCollection Files;
- ThreadPool Threads;
+ const bool StorePreamblesInMemory;
+ const std::shared_ptr<PCHContainerOperations> PCHOps;
+ const ASTParsedCallback ASTCallback;
+ Semaphore Barrier;
+ llvm::StringMap<std::unique_ptr<FileData>> Files;
+ // None when running tasks synchronously and non-None when running tasks
+ // asynchronously.
+ llvm::Optional<AsyncTaskRunner> Tasks;
};
} // namespace clangd
} // namespace clang
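
A hedged usage sketch of the TUScheduler interface declared above; it is not taken from clangd. ParseInputs is accepted as a parameter because its construction is defined elsewhere, and publishDiagnostics plus onDocumentChanged are hypothetical names used only for illustration.

#include "TUScheduler.h"
#include "llvm/Support/Error.h"
#include <utility>
#include <vector>

using namespace clang::clangd;

// Hypothetical sink for diagnostics; stands in for the real LSP publishing.
void publishDiagnostics(const std::vector<DiagWithFixIts> &Diags);

void onDocumentChanged(TUScheduler &S, PathRef File, ParseInputs Inputs) {
  // Schedule a rebuild. llvm::None means this update was superseded by a
  // newer one and its rebuild was cancelled.
  S.update(File, std::move(Inputs),
           [](llvm::Optional<std::vector<DiagWithFixIts>> Diags) {
             if (Diags)
               publishDiagnostics(*Diags);
           });
  // Reads run on the file's worker thread, strictly after the update above.
  S.runWithAST(File, [](llvm::Expected<InputsAndAST> AST) {
    if (!AST) {
      llvm::consumeError(AST.takeError());
      return;
    }
    // AST->AST is the ParsedAST built from AST->Inputs.
  });
}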
diff --git a/clang-tools-extra/clangd/Threading.cpp b/clang-tools-extra/clangd/Threading.cpp
index 3c0c74bb803..94bb76c5f1f 100644
--- a/clang-tools-extra/clangd/Threading.cpp
+++ b/clang-tools-extra/clangd/Threading.cpp
@@ -1,63 +1,62 @@
#include "Threading.h"
+#include "llvm/ADT/ScopeExit.h"
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/Threading.h"
+#include <thread>
namespace clang {
namespace clangd {
-ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
- : RunSynchronously(AsyncThreadsCount == 0) {
- if (RunSynchronously) {
- // Don't start the worker thread if we're running synchronously
- return;
- }
- Workers.reserve(AsyncThreadsCount);
- for (unsigned I = 0; I < AsyncThreadsCount; ++I) {
- Workers.push_back(std::thread([this, I]() {
- llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
- while (true) {
- UniqueFunction<void()> Request;
- Context Ctx;
-
- // Pick request from the queue
- {
- std::unique_lock<std::mutex> Lock(Mutex);
- // Wait for more requests.
- RequestCV.wait(Lock,
- [this] { return !RequestQueue.empty() || Done; });
- if (RequestQueue.empty()) {
- assert(Done);
- return;
- }
-
- // We process requests starting from the front of the queue. Users of
- // ThreadPool have a way to prioritise their requests by putting
- // them to the either side of the queue (using either addToEnd or
- // addToFront).
- std::tie(Request, Ctx) = std::move(RequestQueue.front());
- RequestQueue.pop_front();
- } // unlock Mutex
-
- WithContext WithCtx(std::move(Ctx));
- Request();
- }
- }));
- }
+CancellationFlag::CancellationFlag()
+ : WasCancelled(std::make_shared<std::atomic<bool>>(false)) {}
+
+Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {}
+
+void Semaphore::lock() {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; });
+ --FreeSlots;
}
-ThreadPool::~ThreadPool() {
- if (RunSynchronously)
- return; // no worker thread is running in that case
+void Semaphore::unlock() {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ ++FreeSlots;
+ Lock.unlock();
+
+ SlotsChanged.notify_one();
+}
+AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); }
+
+void AsyncTaskRunner::waitForAll() {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; });
+}
+
+void AsyncTaskRunner::runAsync(UniqueFunction<void()> Action) {
{
- std::lock_guard<std::mutex> Lock(Mutex);
- // Wake up the worker thread
- Done = true;
- } // unlock Mutex
- RequestCV.notify_all();
-
- for (auto &Worker : Workers)
- Worker.join();
+ std::unique_lock<std::mutex> Lock(Mutex);
+ ++InFlightTasks;
+ }
+
+ auto CleanupTask = llvm::make_scope_exit([this]() {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ int NewTasksCnt = --InFlightTasks;
+ Lock.unlock();
+
+ if (NewTasksCnt == 0)
+ TasksReachedZero.notify_one();
+ });
+
+ std::thread(
+ [](decltype(Action) Action, decltype(CleanupTask)) {
+ Action();
+ // Make sure function stored by Action is destroyed before CleanupTask
+ // is run.
+ Action = nullptr;
+ },
+ std::move(Action), std::move(CleanupTask))
+ .detach();
}
} // namespace clangd
} // namespace clang
diff --git a/clang-tools-extra/clangd/Threading.h b/clang-tools-extra/clangd/Threading.h
index 123d17964ef..a24eed7bc5b 100644
--- a/clang-tools-extra/clangd/Threading.h
+++ b/clang-tools-extra/clangd/Threading.h
@@ -12,74 +12,65 @@
#include "Context.h"
#include "Function.h"
+#include <atomic>
+#include <cassert>
#include <condition_variable>
-#include <deque>
+#include <memory>
#include <mutex>
-#include <thread>
#include <vector>
namespace clang {
namespace clangd {
-/// A simple fixed-size thread pool implementation.
-class ThreadPool {
+
+/// A shared boolean flag indicating whether the computation was cancelled.
+/// Once cancelled, it cannot be reset to the previous state.
+class CancellationFlag {
public:
- /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
- /// will be processed synchronously on the calling thread.
- // Otherwise, \p AsyncThreadsCount threads will be created to schedule the
- // requests.
- ThreadPool(unsigned AsyncThreadsCount);
- /// Destructor blocks until all requests are processed and worker threads are
- /// terminated.
- ~ThreadPool();
+ CancellationFlag();
- /// Add a new request to run function \p F with args \p As to the start of the
- /// queue. The request will be run on a separate thread.
- template <class Func, class... Args>
- void addToFront(Func &&F, Args &&... As) {
- if (RunSynchronously) {
- std::forward<Func>(F)(std::forward<Args>(As)...);
- return;
- }
+ void cancel() {
+ assert(WasCancelled && "the object was moved");
+ WasCancelled->store(true);
+ }
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- RequestQueue.emplace_front(
- BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
- Context::current().clone());
- }
- RequestCV.notify_one();
+ bool isCancelled() const {
+ assert(WasCancelled && "the object was moved");
+ return WasCancelled->load();
}
- /// Add a new request to run function \p F with args \p As to the end of the
- /// queue. The request will be run on a separate thread.
- template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
- if (RunSynchronously) {
- std::forward<Func>(F)(std::forward<Args>(As)...);
- return;
- }
+private:
+ std::shared_ptr<std::atomic<bool>> WasCancelled;
+};
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- RequestQueue.emplace_back(
- BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
- Context::current().clone());
- }
- RequestCV.notify_one();
- }
+/// Limits the number of threads that can acquire the lock at the same time.
+class Semaphore {
+public:
+ Semaphore(std::size_t MaxLocks);
+
+ void lock();
+ void unlock();
+
+private:
+ std::mutex Mutex;
+ std::condition_variable SlotsChanged;
+ std::size_t FreeSlots;
+};
+
+/// Runs tasks on separate (detached) threads and waits for all tasks to
+/// finish. Objects that need to spawn threads can own an AsyncTaskRunner to
+/// ensure all spawned tasks complete on its destruction.
+class AsyncTaskRunner {
+public:
+ /// Destructor waits for all pending tasks to finish.
+ ~AsyncTaskRunner();
+
+ void waitForAll();
+ void runAsync(UniqueFunction<void()> Action);
private:
- bool RunSynchronously;
- mutable std::mutex Mutex;
- /// We run some tasks on separate threads(parsing, CppFile cleanup).
- /// These threads looks into RequestQueue to find requests to handle and
- /// terminate when Done is set to true.
- std::vector<std::thread> Workers;
- /// Setting Done to true will make the worker threads terminate.
- bool Done = false;
- /// A queue of requests.
- std::deque<std::pair<UniqueFunction<void()>, Context>> RequestQueue;
- /// Condition variable to wake up worker threads.
- std::condition_variable RequestCV;
+ std::mutex Mutex;
+ std::condition_variable TasksReachedZero;
+ std::size_t InFlightTasks = 0;
};
} // namespace clangd
} // namespace clang
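
A minimal sketch (assuming only the primitives declared above) of how Semaphore and AsyncTaskRunner combine to run many tasks with bounded concurrency; this mirrors the pattern TUScheduler uses with its Barrier member, and processAll is an illustrative name, not clangd API.

#include "Threading.h"
#include <cstddef>
#include <mutex>
#include <vector>

using namespace clang::clangd;

void processAll(std::vector<UniqueFunction<void()>> &Work,
                std::size_t MaxConcurrency) {
  Semaphore Barrier(MaxConcurrency);
  AsyncTaskRunner Tasks; // Destroyed before Barrier, so Barrier outlives tasks.
  for (auto &Item : Work) {
    UniqueFunction<void()> *Task = &Item; // Work outlives Tasks (see below).
    // Each task gets its own detached thread, but the semaphore lets at most
    // MaxConcurrency of them do work at any moment.
    Tasks.runAsync([&Barrier, Task] {
      std::lock_guard<Semaphore> Lock(Barrier); // Semaphore is BasicLockable.
      (*Task)();
    });
  }
  // ~AsyncTaskRunner (equivalently Tasks.waitForAll()) blocks here until every
  // scheduled task has finished, so capturing &Barrier and Task is safe.
}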
diff --git a/clang-tools-extra/unittests/clangd/CMakeLists.txt b/clang-tools-extra/unittests/clangd/CMakeLists.txt
index 8f6125e192d..c0cba6c817f 100644
--- a/clang-tools-extra/unittests/clangd/CMakeLists.txt
+++ b/clang-tools-extra/unittests/clangd/CMakeLists.txt
@@ -21,6 +21,7 @@ add_extra_unittest(ClangdTests
JSONExprTests.cpp
URITests.cpp
TestFS.cpp
+ ThreadingTests.cpp
TraceTests.cpp
TUSchedulerTests.cpp
SourceCodeTests.cpp
diff --git a/clang-tools-extra/unittests/clangd/ThreadingTests.cpp b/clang-tools-extra/unittests/clangd/ThreadingTests.cpp
new file mode 100644
index 00000000000..ffa12887c82
--- /dev/null
+++ b/clang-tools-extra/unittests/clangd/ThreadingTests.cpp
@@ -0,0 +1,61 @@
+//===-- ThreadingTests.cpp --------------------------------------*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "Threading.h"
+#include "gtest/gtest.h"
+#include <mutex>
+
+namespace clang {
+namespace clangd {
+class ThreadingTest : public ::testing::Test {};
+
+TEST_F(ThreadingTest, TaskRunner) {
+ const int TasksCnt = 100;
+ const int IncrementsPerTask = 1000;
+
+ std::mutex Mutex;
+ int Counter(0); /* GUARDED_BY(Mutex) */
+ {
+ AsyncTaskRunner Tasks;
+ auto scheduleIncrements = [&]() {
+ for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) {
+ Tasks.runAsync([&Counter, &Mutex]() {
+ for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ ++Counter;
+ }
+ });
+ }
+ };
+
+ {
+ // Make sure runAsync is not running tasks synchronously on the same
+ // thread by locking the Mutex used for increments.
+ std::lock_guard<std::mutex> Lock(Mutex);
+ scheduleIncrements();
+ }
+
+ Tasks.waitForAll();
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
+ }
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ Counter = 0;
+ scheduleIncrements();
+ }
+ }
+ // Check that destructor has waited for tasks to finish.
+ std::lock_guard<std::mutex> Lock(Mutex);
+ ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
+}
+} // namespace clangd
+} // namespace clang