You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
782 lines
22 KiB
782 lines
22 KiB
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
   file Copyright.txt or https://cmake.org/licensing for details. */
|
|
#include "cmWorkerPool.h"
|
|
|
|
#include <algorithm>
|
|
#include <array>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <deque>
|
|
#include <functional>
|
|
#include <mutex>
|
|
#include <thread>
|
|
|
|
#include <cm/memory>
|
|
|
|
#include <cm3p/uv.h>
|
|
|
|
#include "cmRange.h"
|
|
#include "cmStringAlgorithms.h"
|
|
#include "cmUVHandlePtr.h"
|
|
#include "cmUVSignalHackRAII.h" // IWYU pragma: keep
|
|
|
|
/**
|
|
* @brief libuv pipe buffer class
|
|
*/
|
|
class cmUVPipeBuffer
|
|
{
|
|
public:
|
|
using DataRange = cmRange<const char*>;
|
|
using DataFunction = std::function<void(DataRange)>;
|
|
/// On error the ssize_t argument is a non zero libuv error code
|
|
using EndFunction = std::function<void(ssize_t)>;
|
|
|
|
/**
|
|
* Reset to construction state
|
|
*/
|
|
void reset();
|
|
|
|
/**
|
|
* Initializes uv_pipe(), uv_stream() and uv_handle()
|
|
* @return true on success
|
|
*/
|
|
bool init(uv_loop_t* uv_loop);
|
|
|
|
/**
|
|
* Start reading
|
|
* @return true on success
|
|
*/
|
|
bool startRead(DataFunction dataFunction, EndFunction endFunction);
|
|
|
|
//! libuv pipe
|
|
uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
|
|
//! uv_pipe() casted to libuv stream
|
|
uv_stream_t* uv_stream() const
|
|
{
|
|
return static_cast<uv_stream_t*>(this->UVPipe_);
|
|
}
|
|
//! uv_pipe() casted to libuv handle
|
|
uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
|
|
|
|
private:
|
|
// -- Libuv callbacks
|
|
static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
|
|
uv_buf_t* buf);
|
|
static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
|
|
|
|
cm::uv_pipe_ptr UVPipe_;
|
|
std::vector<char> Buffer_;
|
|
DataFunction DataFunction_;
|
|
EndFunction EndFunction_;
|
|
};
|
|
|
|
void cmUVPipeBuffer::reset()
|
|
{
|
|
if (this->UVPipe_.get() != nullptr) {
|
|
this->EndFunction_ = nullptr;
|
|
this->DataFunction_ = nullptr;
|
|
this->Buffer_.clear();
|
|
this->Buffer_.shrink_to_fit();
|
|
this->UVPipe_.reset();
|
|
}
|
|
}
|
|
|
|
/**
 * Reset this buffer and create a fresh pipe on @a uv_loop.
 * @return true when a loop was given and the pipe initialization returned 0
 */
bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
{
  // Always start from a clean state, even if no loop was passed
  this->reset();

  // The pipe is only initialized when a loop is available
  return (uv_loop != nullptr) &&
    (this->UVPipe_.init(*uv_loop, 0, this) == 0);
}
|
|
|
|
/**
 * Store the callbacks and ask libuv to start reading from the pipe.
 * @return false when the pipe is not initialized, when one of the callbacks
 *         is empty, or when uv_read_start() reports an error
 */
bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
                               EndFunction endFunction)
{
  // Reading requires an initialized pipe and two callable callbacks
  bool const havePipe = (this->UVPipe_.get() != nullptr);
  if (!havePipe || !dataFunction || !endFunction) {
    return false;
  }

  this->DataFunction_ = std::move(dataFunction);
  this->EndFunction_ = std::move(endFunction);

  return uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
                       &cmUVPipeBuffer::UVData) == 0;
}
|
|
|
|
/**
 * libuv allocation callback: resize the internal buffer to the suggested
 * size and hand its memory to libuv.
 */
void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
                             uv_buf_t* buf)
{
  // The handle's data pointer refers to the owning pipe buffer
  auto* self = reinterpret_cast<cmUVPipeBuffer*>(handle->data);
  std::vector<char>& storage = self->Buffer_;

  storage.resize(suggestedSize);
  buf->base = storage.data();
  buf->len = static_cast<unsigned long>(storage.size());
}
|
|
|
|
/**
 * libuv read callback.
 *
 * A positive @a nread forwards the received bytes to the data callback.
 * A negative @a nread ends reading: the pipe is reset and the end callback
 * is invoked with 0 for UV_EOF or with the libuv error code otherwise.
 * A zero @a nread is ignored.
 */
void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
                            const uv_buf_t* buf)
{
  auto* self = reinterpret_cast<cmUVPipeBuffer*>(stream->data);

  if (nread > 0) {
    // Forward the received bytes
    if (buf->base != nullptr) {
      self->DataFunction_(DataRange(buf->base, buf->base + nread));
    }
    return;
  }
  if (nread == 0) {
    return;
  }

  // Take the end callback out of the pipe before the pipe gets reset
  EndFunction onEnd;
  onEnd.swap(self->EndFunction_);
  // The pipe must be reset before the end callback runs
  self->reset();
  // Report the end of the stream
  onEnd((nread == UV_EOF) ? 0 : nread);
}
|
|
|
|
/**
|
|
* @brief External process management class
|
|
*/
|
|
class cmUVReadOnlyProcess
|
|
{
|
|
public:
|
|
// -- Types
|
|
//! @brief Process settings
|
|
struct SetupT
|
|
{
|
|
std::string WorkingDirectory;
|
|
std::vector<std::string> Command;
|
|
cmWorkerPool::ProcessResultT* Result = nullptr;
|
|
bool MergedOutput = false;
|
|
};
|
|
|
|
// -- Const accessors
|
|
SetupT const& Setup() const { return this->Setup_; }
|
|
cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
|
|
bool IsStarted() const { return this->IsStarted_; }
|
|
bool IsFinished() const { return this->IsFinished_; }
|
|
|
|
// -- Runtime
|
|
void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory = std::string());
|
|
bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
|
|
|
|
private:
|
|
// -- Libuv callbacks
|
|
static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
|
|
void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
|
|
void UVPipeOutEnd(ssize_t error);
|
|
void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
|
|
void UVPipeErrEnd(ssize_t error);
|
|
void UVTryFinish();
|
|
|
|
// -- Setup
|
|
SetupT Setup_;
|
|
// -- Runtime
|
|
bool IsStarted_ = false;
|
|
bool IsFinished_ = false;
|
|
std::function<void()> FinishedCallback_;
|
|
std::vector<const char*> CommandPtr_;
|
|
std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
|
|
uv_process_options_t UVOptions_;
|
|
cm::uv_process_ptr UVProcess_;
|
|
cmUVPipeBuffer UVPipeOut_;
|
|
cmUVPipeBuffer UVPipeErr_;
|
|
};
|
|
|
|
/**
 * Store the process settings. Nothing is started here.
 * @param result receives exit status, output and error message
 * @param mergedOutput when true, stderr data is appended to the stdout text
 * @param command executable followed by its arguments
 * @param workingDirectory directory passed to libuv as the process cwd
 */
void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
                                bool mergedOutput,
                                std::vector<std::string> const& command,
                                std::string const& workingDirectory)
{
  SetupT& settings = this->Setup_;
  settings.WorkingDirectory = workingDirectory;
  settings.Command = command;
  settings.Result = result;
  settings.MergedOutput = mergedOutput;
}
|
|
|
|
/**
 * Spawn the configured process on @a uv_loop and start reading its output.
 *
 * The steps run in sequence and each step is skipped as soon as the result
 * reports an error. On failure all libuv handles held by this object are
 * released and the error message is left in the result.
 *
 * @param uv_loop loop to run the process and the pipes on
 * @param finishedCallback stored on success and invoked by UVTryFinish()
 * @return true if the process was started
 */
bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
                                std::function<void()> finishedCallback)
{
  cmWorkerPool::ProcessResultT* const result = this->Result();
  if ((result == nullptr) || this->IsStarted()) {
    return false;
  }

  // Begin with a clean result
  result->reset();

  // Collect the null terminated list of argument pointers
  std::vector<std::string> const& command = this->Setup().Command;
  if (command.empty()) {
    result->ErrorMessage = "Empty command";
  } else {
    this->CommandPtr_.reserve(command.size() + 1);
    for (std::string const& arg : command) {
      this->CommandPtr_.push_back(arg.c_str());
    }
    this->CommandPtr_.push_back(nullptr);
  }

  // Initialize the output pipes
  if (!result->error() && !this->UVPipeOut_.init(uv_loop)) {
    result->ErrorMessage = "libuv stdout pipe initialization failed";
  }
  if (!result->error() && !this->UVPipeErr_.init(uv_loop)) {
    result->ErrorMessage = "libuv stderr pipe initialization failed";
  }

  if (!result->error()) {
    // -- Process stdio options
    auto const pipeFlags =
      static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
    // stdin is not connected
    uv_stdio_container_t& ioIn = this->UVOptionsStdIO_[0];
    ioIn.flags = UV_IGNORE;
    ioIn.data.stream = nullptr;
    // stdout goes to the first pipe
    uv_stdio_container_t& ioOut = this->UVOptionsStdIO_[1];
    ioOut.flags = pipeFlags;
    ioOut.data.stream = this->UVPipeOut_.uv_stream();
    // stderr goes to the second pipe
    uv_stdio_container_t& ioErr = this->UVOptionsStdIO_[2];
    ioErr.flags = pipeFlags;
    ioErr.data.stream = this->UVPipeErr_.uv_stream();

    // -- Process options, zeroed first
    uv_process_options_t& options = this->UVOptions_;
    std::fill_n(reinterpret_cast<char*>(&options), sizeof(options), 0);
    options.exit_cb = &cmUVReadOnlyProcess::UVExit;
    options.file = this->CommandPtr_[0];
    options.args = const_cast<char**>(this->CommandPtr_.data());
    options.cwd = this->Setup_.WorkingDirectory.c_str();
    options.flags = UV_PROCESS_WINDOWS_HIDE;
    options.stdio_count = static_cast<int>(this->UVOptionsStdIO_.size());
    options.stdio = this->UVOptionsStdIO_.data();

    // -- Spawn the process
    int const uvErrorCode = this->UVProcess_.spawn(*uv_loop, options, this);
    if (uvErrorCode != 0) {
      result->ErrorMessage = "libuv process spawn failed";
      if (const char* uvErr = uv_strerror(uvErrorCode)) {
        result->ErrorMessage += ": ";
        result->ErrorMessage += uvErr;
      }
    }
  }

  // -- Start reading from the stdio streams
  if (!result->error()) {
    bool const reading = this->UVPipeOut_.startRead(
      [this](cmUVPipeBuffer::DataRange range) {
        this->UVPipeOutData(range);
      },
      [this](ssize_t error) { this->UVPipeOutEnd(error); });
    if (!reading) {
      result->ErrorMessage = "libuv start reading from stdout pipe failed";
    }
  }
  if (!result->error()) {
    bool const reading = this->UVPipeErr_.startRead(
      [this](cmUVPipeBuffer::DataRange range) {
        this->UVPipeErrData(range);
      },
      [this](ssize_t error) { this->UVPipeErrEnd(error); });
    if (!reading) {
      result->ErrorMessage = "libuv start reading from stderr pipe failed";
    }
  }

  if (result->error()) {
    // Something failed: release the libuv handles again
    this->UVProcess_.reset();
    this->UVPipeOut_.reset();
    this->UVPipeErr_.reset();
    this->CommandPtr_.clear();
  } else {
    this->IsStarted_ = true;
    this->FinishedCallback_ = std::move(finishedCallback);
  }

  return this->IsStarted();
}
|
|
|
|
/**
 * libuv process exit callback: record exit status and termination signal,
 * derive an error message if none is set yet, release the process handle
 * and try to finish.
 */
void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
                                 int termSignal)
{
  auto* self = reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
  // Only a started and not yet finished process is handled
  if (!self->IsStarted() || self->IsFinished()) {
    return;
  }

  cmWorkerPool::ProcessResultT* const result = self->Result();
  result->ExitStatus = exitStatus;
  result->TermSignal = termSignal;

  // Compose an error message unless one is already present
  if (!result->error()) {
    if (termSignal != 0) {
      result->ErrorMessage =
        cmStrCat("Process was terminated by signal ", result->TermSignal);
    } else if (exitStatus != 0) {
      result->ErrorMessage =
        cmStrCat("Process failed with return value ", result->ExitStatus);
    }
  }

  // The process handle is no longer needed
  self->UVProcess_.reset();
  // Finish if the pipes are closed, too
  self->UVTryFinish();
}
|
|
|
|
/**
 * Append data received on the stdout pipe to the result's StdOut text.
 */
void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
{
  std::string& target = this->Result()->StdOut;
  target.append(data.begin(), data.end());
}
|
|
|
|
void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
|
|
{
|
|
// Process pipe error
|
|
if ((error != 0) && !this->Result()->error()) {
|
|
this->Result()->ErrorMessage = cmStrCat(
|
|
"Reading from stdout pipe failed with libuv error code ", error);
|
|
}
|
|
// Try finish
|
|
this->UVTryFinish();
|
|
}
|
|
|
|
/**
 * Append data received on the stderr pipe to the result. With merged output
 * enabled the data goes to StdOut, otherwise to StdErr.
 */
void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
{
  cmWorkerPool::ProcessResultT* const result = this->Result();
  if (this->Setup_.MergedOutput) {
    result->StdOut.append(data.begin(), data.end());
  } else {
    result->StdErr.append(data.begin(), data.end());
  }
}
|
|
|
|
void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
|
|
{
|
|
// Process pipe error
|
|
if ((error != 0) && !this->Result()->error()) {
|
|
this->Result()->ErrorMessage = cmStrCat(
|
|
"Reading from stderr pipe failed with libuv error code ", error);
|
|
}
|
|
// Try finish
|
|
this->UVTryFinish();
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVTryFinish()
|
|
{
|
|
// There still might be data in the pipes after the process has finished.
|
|
// Therefore check if the process is finished AND all pipes are closed
|
|
// before signaling the worker thread to continue.
|
|
if ((this->UVProcess_.get() != nullptr) ||
|
|
(this->UVPipeOut_.uv_pipe() != nullptr) ||
|
|
(this->UVPipeErr_.uv_pipe() != nullptr)) {
|
|
return;
|
|
}
|
|
this->IsFinished_ = true;
|
|
this->FinishedCallback_();
|
|
}
|
|
|
|
/**
|
|
* @brief Worker pool worker thread
|
|
*/
|
|
class cmWorkerPoolWorker
|
|
{
|
|
public:
|
|
cmWorkerPoolWorker(uv_loop_t& uvLoop);
|
|
~cmWorkerPoolWorker();
|
|
|
|
cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
|
|
cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
|
|
|
|
/**
|
|
* Set the internal thread
|
|
*/
|
|
void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
|
|
|
|
/**
|
|
* Run an external process
|
|
*/
|
|
bool RunProcess(cmWorkerPool::ProcessResultT& result,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory);
|
|
|
|
private:
|
|
// -- Libuv callbacks
|
|
static void UVProcessStart(uv_async_t* handle);
|
|
void UVProcessFinished();
|
|
|
|
// -- Process management
|
|
struct
|
|
{
|
|
std::mutex Mutex;
|
|
cm::uv_async_ptr Request;
|
|
std::condition_variable Condition;
|
|
std::unique_ptr<cmUVReadOnlyProcess> ROP;
|
|
} Proc_;
|
|
// -- System thread
|
|
std::thread Thread_;
|
|
};
|
|
|
|
/**
 * Register the asynchronous "start process" request of this worker on the
 * given libuv loop.
 */
cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
{
  cm::uv_async_ptr& request = this->Proc_.Request;
  request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
}
|
|
|
|
/**
 * Wait for the worker thread to end, if there is one.
 */
cmWorkerPoolWorker::~cmWorkerPoolWorker()
{
  if (!this->Thread_.joinable()) {
    return;
  }
  this->Thread_.join();
}
|
|
|
|
/**
 * Run @a command in @a workingDirectory and block until it has ended.
 *
 * The process is started through an asynchronous request to the libuv
 * loop. The calling thread sleeps on the condition variable until the
 * process instance has been destroyed again.
 *
 * @return false for an empty command or when the result reports an error
 */
bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
                                    std::vector<std::string> const& command,
                                    std::string const& workingDirectory)
{
  if (command.empty()) {
    return false;
  }

  // Set up a new process instance
  {
    std::lock_guard<std::mutex> guard(this->Proc_.Mutex);
    this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
    this->Proc_.ROP->setup(&result, true, command, workingDirectory);
  }

  // Ask the libuv loop to start the process
  this->Proc_.Request.send();

  // Sleep until the process instance is gone
  {
    std::unique_lock<std::mutex> waitLock(this->Proc_.Mutex);
    this->Proc_.Condition.wait(waitLock,
                               [this] { return !this->Proc_.ROP; });
  }

  return !result.error();
}
|
|
|
|
/**
 * libuv async callback: start the pending process of the worker.
 * If starting fails, the process is cleaned up right away.
 */
void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
{
  auto* worker = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);

  bool started = true;
  {
    std::lock_guard<std::mutex> guard(worker->Proc_.Mutex);
    std::unique_ptr<cmUVReadOnlyProcess>& rop = worker->Proc_.ROP;
    // Only a present and not yet started process gets started
    if (rop && !rop->IsStarted()) {
      started =
        rop->start(handle->loop, [worker] { worker->UVProcessFinished(); });
    }
  }

  // The mutex is released here; clean up after a failed start
  if (!started) {
    worker->UVProcessFinished();
  }
}
|
|
|
|
void cmWorkerPoolWorker::UVProcessFinished()
|
|
{
|
|
std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
|
|
if (this->Proc_.ROP &&
|
|
(this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
|
|
this->Proc_.ROP.reset();
|
|
}
|
|
// Notify idling thread
|
|
this->Proc_.Condition.notify_one();
|
|
}
|
|
|
|
/**
|
|
* @brief Private worker pool internals
|
|
*/
|
|
class cmWorkerPoolInternal
|
|
{
|
|
public:
|
|
// -- Constructors
|
|
cmWorkerPoolInternal(cmWorkerPool* pool);
|
|
~cmWorkerPoolInternal();
|
|
|
|
/**
|
|
* Runs the libuv loop.
|
|
*/
|
|
bool Process();
|
|
|
|
/**
|
|
* Clear queue and abort threads.
|
|
*/
|
|
void Abort();
|
|
|
|
/**
|
|
* Push a job to the queue and notify a worker.
|
|
*/
|
|
bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
|
|
|
|
/**
|
|
* Worker thread main loop method.
|
|
*/
|
|
void Work(unsigned int workerIndex);
|
|
|
|
// -- Request slots
|
|
static void UVSlotBegin(uv_async_t* handle);
|
|
static void UVSlotEnd(uv_async_t* handle);
|
|
|
|
// -- UV loop
|
|
#ifdef CMAKE_UV_SIGNAL_HACK
|
|
std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
|
|
#endif
|
|
std::unique_ptr<uv_loop_t> UVLoop;
|
|
cm::uv_async_ptr UVRequestBegin;
|
|
cm::uv_async_ptr UVRequestEnd;
|
|
|
|
// -- Thread pool and job queue
|
|
std::mutex Mutex;
|
|
bool Processing = false;
|
|
bool Aborting = false;
|
|
bool FenceProcessing = false;
|
|
unsigned int WorkersRunning = 0;
|
|
unsigned int WorkersIdle = 0;
|
|
unsigned int JobsProcessing = 0;
|
|
std::deque<cmWorkerPool::JobHandleT> Queue;
|
|
std::condition_variable Condition;
|
|
std::condition_variable ConditionFence;
|
|
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
|
|
|
|
// -- References
|
|
cmWorkerPool* Pool = nullptr;
|
|
};
|
|
|
|
void cmWorkerPool::ProcessResultT::reset()
|
|
{
|
|
this->ExitStatus = 0;
|
|
this->TermSignal = 0;
|
|
if (!this->StdOut.empty()) {
|
|
this->StdOut.clear();
|
|
this->StdOut.shrink_to_fit();
|
|
}
|
|
if (!this->StdErr.empty()) {
|
|
this->StdErr.clear();
|
|
this->StdErr.shrink_to_fit();
|
|
}
|
|
if (!this->ErrorMessage.empty()) {
|
|
this->ErrorMessage.clear();
|
|
this->ErrorMessage.shrink_to_fit();
|
|
}
|
|
}
|
|
|
|
/**
 * Remember the owning pool and create and initialize the libuv loop.
 */
cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
  : Pool(pool)
{
  // Set up libuv before the loop is created
  uv_disable_stdio_inheritance();
#ifdef CMAKE_UV_SIGNAL_HACK
  this->UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
#endif
  // Create and initialize the loop
  this->UVLoop = cm::make_unique<uv_loop_t>();
  uv_loop_t* const loop = this->UVLoop.get();
  uv_loop_init(loop);
}
|
|
|
|
/**
 * Close the libuv loop.
 */
cmWorkerPoolInternal::~cmWorkerPoolInternal()
{
  uv_loop_t* const loop = this->UVLoop.get();
  uv_loop_close(loop);
}
|
|
|
|
bool cmWorkerPoolInternal::Process()
|
|
{
|
|
// Reset state flags
|
|
this->Processing = true;
|
|
this->Aborting = false;
|
|
// Initialize libuv asynchronous request
|
|
this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
|
|
this);
|
|
this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
|
|
this);
|
|
// Send begin request
|
|
this->UVRequestBegin.send();
|
|
// Run libuv loop
|
|
bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
|
|
// Update state flags
|
|
this->Processing = false;
|
|
this->Aborting = false;
|
|
return success;
|
|
}
|
|
|
|
void cmWorkerPoolInternal::Abort()
|
|
{
|
|
// Clear all jobs and set abort flag
|
|
std::lock_guard<std::mutex> guard(this->Mutex);
|
|
if (!this->Aborting) {
|
|
// Register abort and clear queue
|
|
this->Aborting = true;
|
|
this->Queue.clear();
|
|
this->Condition.notify_all();
|
|
}
|
|
}
|
|
|
|
/**
 * Append a job to the queue and wake one idle worker, if there is any.
 * @return false if the pool is aborting, in which case the job is not queued
 */
inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
{
  std::lock_guard<std::mutex> guard(this->Mutex);

  // No new jobs are accepted while aborting
  if (this->Aborting) {
    return false;
  }

  // Queue the job
  this->Queue.emplace_back(std::move(jobHandle));

  // Wake one worker only when at least one is idle
  bool const haveIdleWorker = (this->WorkersIdle != 0);
  if (haveIdleWorker) {
    this->Condition.notify_one();
  }

  return true;
}
|
|
|
|
/**
 * libuv async callback for the begin request: create all workers, start
 * their threads and release the begin request.
 */
void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
{
  auto* self = reinterpret_cast<cmWorkerPoolInternal*>(handle->data);

  unsigned int const workerCount = self->Pool->ThreadCount();

  // All workers are created before the first thread is started
  self->Workers.reserve(workerCount);
  for (unsigned int idx = 0; idx < workerCount; ++idx) {
    self->Workers.emplace_back(
      cm::make_unique<cmWorkerPoolWorker>(*self->UVLoop));
  }

  // Start one thread per worker
  for (unsigned int idx = 0; idx < workerCount; ++idx) {
    self->Workers[idx]->SetThread(
      std::thread(&cmWorkerPoolInternal::Work, self, idx));
  }

  // The begin request is not needed any more
  self->UVRequestBegin.reset();
}
|
|
|
|
/**
 * libuv async callback for the end request: destroy all workers, which
 * joins their threads, and release the end request.
 */
void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
{
  auto* self = reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  // Destroying the workers joins their threads
  self->Workers.clear();
  // The end request is not needed any more
  self->UVRequestEnd.reset();
}
|
|
|
|
void cmWorkerPoolInternal::Work(unsigned int workerIndex)
|
|
{
|
|
cmWorkerPool::JobHandleT jobHandle;
|
|
std::unique_lock<std::mutex> uLock(this->Mutex);
|
|
// Increment running workers count
|
|
++this->WorkersRunning;
|
|
// Enter worker main loop
|
|
while (true) {
|
|
// Abort on request
|
|
if (this->Aborting) {
|
|
break;
|
|
}
|
|
// Wait for new jobs on the main CV
|
|
if (this->Queue.empty()) {
|
|
++this->WorkersIdle;
|
|
this->Condition.wait(uLock);
|
|
--this->WorkersIdle;
|
|
continue;
|
|
}
|
|
|
|
// If there is a fence currently active or waiting,
|
|
// sleep on the main CV and try again.
|
|
if (this->FenceProcessing) {
|
|
this->Condition.wait(uLock);
|
|
continue;
|
|
}
|
|
|
|
// Pop next job from queue
|
|
jobHandle = std::move(this->Queue.front());
|
|
this->Queue.pop_front();
|
|
|
|
// Check for fence jobs
|
|
bool raisedFence = false;
|
|
if (jobHandle->IsFence()) {
|
|
this->FenceProcessing = true;
|
|
raisedFence = true;
|
|
// Wait on the Fence CV until all pending jobs are done.
|
|
while (this->JobsProcessing != 0 && !this->Aborting) {
|
|
this->ConditionFence.wait(uLock);
|
|
}
|
|
// When aborting, explicitly kick all threads alive once more.
|
|
if (this->Aborting) {
|
|
this->FenceProcessing = false;
|
|
this->Condition.notify_all();
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Unlocked scope for job processing
|
|
++this->JobsProcessing;
|
|
{
|
|
uLock.unlock();
|
|
jobHandle->Work(this->Pool, workerIndex); // Process job
|
|
jobHandle.reset(); // Destroy job
|
|
uLock.lock();
|
|
}
|
|
--this->JobsProcessing;
|
|
|
|
// If this was the thread that entered fence processing
|
|
// originally, notify all idling workers that the fence
|
|
// is done.
|
|
if (raisedFence) {
|
|
this->FenceProcessing = false;
|
|
this->Condition.notify_all();
|
|
}
|
|
// If fence processing is still not done, notify the
|
|
// the fencing worker when all active jobs are done.
|
|
if (this->FenceProcessing && this->JobsProcessing == 0) {
|
|
this->ConditionFence.notify_all();
|
|
}
|
|
}
|
|
|
|
// Decrement running workers count
|
|
if (--this->WorkersRunning == 0) {
|
|
// Last worker thread about to finish. Send libuv event.
|
|
this->UVRequestEnd.send();
|
|
}
|
|
}
|
|
|
|
// Destructor of the job base type, defaulted out of line in this file.
cmWorkerPool::JobT::~JobT() = default;
|
|
|
|
/**
 * Run an external process on the worker that executes this job.
 * The worker is looked up by the stored worker index; an index outside the
 * worker list makes the bounds checked lookup throw.
 */
bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
                                    std::vector<std::string> const& command,
                                    std::string const& workingDirectory)
{
  // Look up the worker by index
  auto& workers = this->Pool_->Int_->Workers;
  cmWorkerPoolWorker* const worker = workers.at(this->WorkerIndex_).get();
  return worker->RunProcess(result, command, workingDirectory);
}
|
|
|
|
/**
 * Create the pool together with its internals, which keep a pointer back
 * to this pool.
 */
cmWorkerPool::cmWorkerPool()
  : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
{
}
|
|
|
|
// Defaulted out of line in this file, where cmWorkerPoolInternal is fully
// defined. NOTE(review): presumably required for destroying Int_ - confirm
// against the declaration in the header.
cmWorkerPool::~cmWorkerPool() = default;
|
|
|
|
void cmWorkerPool::SetThreadCount(unsigned int threadCount)
|
|
{
|
|
if (!this->Int_->Processing) {
|
|
this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
|
|
}
|
|
}
|
|
|
|
bool cmWorkerPool::Process(void* userData)
|
|
{
|
|
// Setup user data
|
|
this->UserData_ = userData;
|
|
// Run libuv loop
|
|
bool success = this->Int_->Process();
|
|
// Clear user data
|
|
this->UserData_ = nullptr;
|
|
// Return
|
|
return success;
|
|
}
|
|
|
|
/**
 * Hand a job over to the internal job queue.
 * @return the result of the internal PushJob() call
 */
bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
{
  bool const queued = this->Int_->PushJob(std::move(jobHandle));
  return queued;
}
|
|
|
|
void cmWorkerPool::Abort()
|
|
{
|
|
this->Int_->Abort();
|
|
}
|