You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
777 lines
20 KiB
777 lines
20 KiB
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
|
|
file Copyright.txt or https://cmake.org/licensing for details. */
|
|
#include "cmWorkerPool.h"
|
|
|
|
#include <algorithm>
|
|
#include <array>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <deque>
|
|
#include <functional>
|
|
#include <mutex>
|
|
#include <thread>
|
|
|
|
#include <cm/memory>
|
|
|
|
#include <cm3p/uv.h>
|
|
|
|
#include "cmRange.h"
|
|
#include "cmStringAlgorithms.h"
|
|
#include "cmUVHandlePtr.h"
|
|
#include "cmUVSignalHackRAII.h" // IWYU pragma: keep
|
|
|
|
/**
|
|
* @brief libuv pipe buffer class
|
|
*/
|
|
class cmUVPipeBuffer
|
|
{
|
|
public:
|
|
using DataRange = cmRange<const char*>;
|
|
using DataFunction = std::function<void(DataRange)>;
|
|
/// On error the ssize_t argument is a non zero libuv error code
|
|
using EndFunction = std::function<void(ssize_t)>;
|
|
|
|
public:
|
|
/**
|
|
* Reset to construction state
|
|
*/
|
|
void reset();
|
|
|
|
/**
|
|
* Initializes uv_pipe(), uv_stream() and uv_handle()
|
|
* @return true on success
|
|
*/
|
|
bool init(uv_loop_t* uv_loop);
|
|
|
|
/**
|
|
* Start reading
|
|
* @return true on success
|
|
*/
|
|
bool startRead(DataFunction dataFunction, EndFunction endFunction);
|
|
|
|
//! libuv pipe
|
|
uv_pipe_t* uv_pipe() const { return UVPipe_.get(); }
|
|
//! uv_pipe() casted to libuv stream
|
|
uv_stream_t* uv_stream() const { return static_cast<uv_stream_t*>(UVPipe_); }
|
|
//! uv_pipe() casted to libuv handle
|
|
uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(UVPipe_); }
|
|
|
|
private:
|
|
// -- Libuv callbacks
|
|
static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
|
|
uv_buf_t* buf);
|
|
static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
|
|
|
|
private:
|
|
cm::uv_pipe_ptr UVPipe_;
|
|
std::vector<char> Buffer_;
|
|
DataFunction DataFunction_;
|
|
EndFunction EndFunction_;
|
|
};
|
|
|
|
void cmUVPipeBuffer::reset()
|
|
{
|
|
if (UVPipe_.get() != nullptr) {
|
|
EndFunction_ = nullptr;
|
|
DataFunction_ = nullptr;
|
|
Buffer_.clear();
|
|
Buffer_.shrink_to_fit();
|
|
UVPipe_.reset();
|
|
}
|
|
}
|
|
|
|
bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
|
|
{
|
|
reset();
|
|
if (uv_loop == nullptr) {
|
|
return false;
|
|
}
|
|
int ret = UVPipe_.init(*uv_loop, 0, this);
|
|
return (ret == 0);
|
|
}
|
|
|
|
bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
|
|
EndFunction endFunction)
|
|
{
|
|
if (UVPipe_.get() == nullptr) {
|
|
return false;
|
|
}
|
|
if (!dataFunction || !endFunction) {
|
|
return false;
|
|
}
|
|
DataFunction_ = std::move(dataFunction);
|
|
EndFunction_ = std::move(endFunction);
|
|
int ret = uv_read_start(uv_stream(), &cmUVPipeBuffer::UVAlloc,
|
|
&cmUVPipeBuffer::UVData);
|
|
return (ret == 0);
|
|
}
|
|
|
|
void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
|
|
uv_buf_t* buf)
|
|
{
|
|
auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
|
|
pipe.Buffer_.resize(suggestedSize);
|
|
buf->base = pipe.Buffer_.data();
|
|
buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
|
|
}
|
|
|
|
void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
|
|
const uv_buf_t* buf)
|
|
{
|
|
auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
|
|
if (nread > 0) {
|
|
if (buf->base != nullptr) {
|
|
// Call data function
|
|
pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
|
|
}
|
|
} else if (nread < 0) {
|
|
// Save the end function on the stack before resetting the pipe
|
|
EndFunction efunc;
|
|
efunc.swap(pipe.EndFunction_);
|
|
// Reset pipe before calling the end function
|
|
pipe.reset();
|
|
// Call end function
|
|
efunc((nread == UV_EOF) ? 0 : nread);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief External process management class
|
|
*/
|
|
class cmUVReadOnlyProcess
|
|
{
|
|
public:
|
|
// -- Types
|
|
//! @brief Process settings
|
|
struct SetupT
|
|
{
|
|
std::string WorkingDirectory;
|
|
std::vector<std::string> Command;
|
|
cmWorkerPool::ProcessResultT* Result = nullptr;
|
|
bool MergedOutput = false;
|
|
};
|
|
|
|
public:
|
|
// -- Const accessors
|
|
SetupT const& Setup() const { return Setup_; }
|
|
cmWorkerPool::ProcessResultT* Result() const { return Setup_.Result; }
|
|
bool IsStarted() const { return IsStarted_; }
|
|
bool IsFinished() const { return IsFinished_; }
|
|
|
|
// -- Runtime
|
|
void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory = std::string());
|
|
bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
|
|
|
|
private:
|
|
// -- Libuv callbacks
|
|
static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
|
|
void UVPipeOutData(cmUVPipeBuffer::DataRange data);
|
|
void UVPipeOutEnd(ssize_t error);
|
|
void UVPipeErrData(cmUVPipeBuffer::DataRange data);
|
|
void UVPipeErrEnd(ssize_t error);
|
|
void UVTryFinish();
|
|
|
|
private:
|
|
// -- Setup
|
|
SetupT Setup_;
|
|
// -- Runtime
|
|
bool IsStarted_ = false;
|
|
bool IsFinished_ = false;
|
|
std::function<void()> FinishedCallback_;
|
|
std::vector<const char*> CommandPtr_;
|
|
std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
|
|
uv_process_options_t UVOptions_;
|
|
cm::uv_process_ptr UVProcess_;
|
|
cmUVPipeBuffer UVPipeOut_;
|
|
cmUVPipeBuffer UVPipeErr_;
|
|
};
|
|
|
|
void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
|
|
bool mergedOutput,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory)
|
|
{
|
|
Setup_.WorkingDirectory = workingDirectory;
|
|
Setup_.Command = command;
|
|
Setup_.Result = result;
|
|
Setup_.MergedOutput = mergedOutput;
|
|
}
|
|
|
|
bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
|
|
std::function<void()> finishedCallback)
|
|
{
|
|
if (IsStarted() || (Result() == nullptr)) {
|
|
return false;
|
|
}
|
|
|
|
// Reset result before the start
|
|
Result()->reset();
|
|
|
|
// Fill command string pointers
|
|
if (!Setup().Command.empty()) {
|
|
CommandPtr_.reserve(Setup().Command.size() + 1);
|
|
for (std::string const& arg : Setup().Command) {
|
|
CommandPtr_.push_back(arg.c_str());
|
|
}
|
|
CommandPtr_.push_back(nullptr);
|
|
} else {
|
|
Result()->ErrorMessage = "Empty command";
|
|
}
|
|
|
|
if (!Result()->error()) {
|
|
if (!UVPipeOut_.init(uv_loop)) {
|
|
Result()->ErrorMessage = "libuv stdout pipe initialization failed";
|
|
}
|
|
}
|
|
if (!Result()->error()) {
|
|
if (!UVPipeErr_.init(uv_loop)) {
|
|
Result()->ErrorMessage = "libuv stderr pipe initialization failed";
|
|
}
|
|
}
|
|
if (!Result()->error()) {
|
|
// -- Setup process stdio options
|
|
// stdin
|
|
UVOptionsStdIO_[0].flags = UV_IGNORE;
|
|
UVOptionsStdIO_[0].data.stream = nullptr;
|
|
// stdout
|
|
UVOptionsStdIO_[1].flags =
|
|
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
|
|
UVOptionsStdIO_[1].data.stream = UVPipeOut_.uv_stream();
|
|
// stderr
|
|
UVOptionsStdIO_[2].flags =
|
|
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
|
|
UVOptionsStdIO_[2].data.stream = UVPipeErr_.uv_stream();
|
|
|
|
// -- Setup process options
|
|
std::fill_n(reinterpret_cast<char*>(&UVOptions_), sizeof(UVOptions_), 0);
|
|
UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
|
|
UVOptions_.file = CommandPtr_[0];
|
|
UVOptions_.args = const_cast<char**>(CommandPtr_.data());
|
|
UVOptions_.cwd = Setup_.WorkingDirectory.c_str();
|
|
UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
|
|
UVOptions_.stdio_count = static_cast<int>(UVOptionsStdIO_.size());
|
|
UVOptions_.stdio = UVOptionsStdIO_.data();
|
|
|
|
// -- Spawn process
|
|
int uvErrorCode = UVProcess_.spawn(*uv_loop, UVOptions_, this);
|
|
if (uvErrorCode != 0) {
|
|
Result()->ErrorMessage = "libuv process spawn failed";
|
|
if (const char* uvErr = uv_strerror(uvErrorCode)) {
|
|
Result()->ErrorMessage += ": ";
|
|
Result()->ErrorMessage += uvErr;
|
|
}
|
|
}
|
|
}
|
|
// -- Start reading from stdio streams
|
|
if (!Result()->error()) {
|
|
if (!UVPipeOut_.startRead(
|
|
[this](cmUVPipeBuffer::DataRange range) {
|
|
this->UVPipeOutData(range);
|
|
},
|
|
[this](ssize_t error) { this->UVPipeOutEnd(error); })) {
|
|
Result()->ErrorMessage = "libuv start reading from stdout pipe failed";
|
|
}
|
|
}
|
|
if (!Result()->error()) {
|
|
if (!UVPipeErr_.startRead(
|
|
[this](cmUVPipeBuffer::DataRange range) {
|
|
this->UVPipeErrData(range);
|
|
},
|
|
[this](ssize_t error) { this->UVPipeErrEnd(error); })) {
|
|
Result()->ErrorMessage = "libuv start reading from stderr pipe failed";
|
|
}
|
|
}
|
|
|
|
if (!Result()->error()) {
|
|
IsStarted_ = true;
|
|
FinishedCallback_ = std::move(finishedCallback);
|
|
} else {
|
|
// Clear libuv handles and finish
|
|
UVProcess_.reset();
|
|
UVPipeOut_.reset();
|
|
UVPipeErr_.reset();
|
|
CommandPtr_.clear();
|
|
}
|
|
|
|
return IsStarted();
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
|
|
int termSignal)
|
|
{
|
|
auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
|
|
if (proc.IsStarted() && !proc.IsFinished()) {
|
|
// Set error message on demand
|
|
proc.Result()->ExitStatus = exitStatus;
|
|
proc.Result()->TermSignal = termSignal;
|
|
if (!proc.Result()->error()) {
|
|
if (termSignal != 0) {
|
|
proc.Result()->ErrorMessage = cmStrCat(
|
|
"Process was terminated by signal ", proc.Result()->TermSignal);
|
|
} else if (exitStatus != 0) {
|
|
proc.Result()->ErrorMessage = cmStrCat(
|
|
"Process failed with return value ", proc.Result()->ExitStatus);
|
|
}
|
|
}
|
|
|
|
// Reset process handle
|
|
proc.UVProcess_.reset();
|
|
// Try finish
|
|
proc.UVTryFinish();
|
|
}
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data)
|
|
{
|
|
Result()->StdOut.append(data.begin(), data.end());
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
|
|
{
|
|
// Process pipe error
|
|
if ((error != 0) && !Result()->error()) {
|
|
Result()->ErrorMessage = cmStrCat(
|
|
"Reading from stdout pipe failed with libuv error code ", error);
|
|
}
|
|
// Try finish
|
|
UVTryFinish();
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data)
|
|
{
|
|
std::string* str =
|
|
Setup_.MergedOutput ? &Result()->StdOut : &Result()->StdErr;
|
|
str->append(data.begin(), data.end());
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
|
|
{
|
|
// Process pipe error
|
|
if ((error != 0) && !Result()->error()) {
|
|
Result()->ErrorMessage = cmStrCat(
|
|
"Reading from stderr pipe failed with libuv error code ", error);
|
|
}
|
|
// Try finish
|
|
UVTryFinish();
|
|
}
|
|
|
|
void cmUVReadOnlyProcess::UVTryFinish()
|
|
{
|
|
// There still might be data in the pipes after the process has finished.
|
|
// Therefore check if the process is finished AND all pipes are closed
|
|
// before signaling the worker thread to continue.
|
|
if ((UVProcess_.get() != nullptr) || (UVPipeOut_.uv_pipe() != nullptr) ||
|
|
(UVPipeErr_.uv_pipe() != nullptr)) {
|
|
return;
|
|
}
|
|
IsFinished_ = true;
|
|
FinishedCallback_();
|
|
}
|
|
|
|
/**
|
|
* @brief Worker pool worker thread
|
|
*/
|
|
class cmWorkerPoolWorker
|
|
{
|
|
public:
|
|
cmWorkerPoolWorker(uv_loop_t& uvLoop);
|
|
~cmWorkerPoolWorker();
|
|
|
|
cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
|
|
cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
|
|
|
|
/**
|
|
* Set the internal thread
|
|
*/
|
|
void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
|
|
|
|
/**
|
|
* Run an external process
|
|
*/
|
|
bool RunProcess(cmWorkerPool::ProcessResultT& result,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory);
|
|
|
|
private:
|
|
// -- Libuv callbacks
|
|
static void UVProcessStart(uv_async_t* handle);
|
|
void UVProcessFinished();
|
|
|
|
private:
|
|
// -- Process management
|
|
struct
|
|
{
|
|
std::mutex Mutex;
|
|
cm::uv_async_ptr Request;
|
|
std::condition_variable Condition;
|
|
std::unique_ptr<cmUVReadOnlyProcess> ROP;
|
|
} Proc_;
|
|
// -- System thread
|
|
std::thread Thread_;
|
|
};
|
|
|
|
cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
|
|
{
|
|
Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
|
|
}
|
|
|
|
cmWorkerPoolWorker::~cmWorkerPoolWorker()
|
|
{
|
|
if (Thread_.joinable()) {
|
|
Thread_.join();
|
|
}
|
|
}
|
|
|
|
bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory)
|
|
{
|
|
if (command.empty()) {
|
|
return false;
|
|
}
|
|
// Create process instance
|
|
{
|
|
std::lock_guard<std::mutex> lock(Proc_.Mutex);
|
|
Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
|
|
Proc_.ROP->setup(&result, true, command, workingDirectory);
|
|
}
|
|
// Send asynchronous process start request to libuv loop
|
|
Proc_.Request.send();
|
|
// Wait until the process has been finished and destroyed
|
|
{
|
|
std::unique_lock<std::mutex> ulock(Proc_.Mutex);
|
|
while (Proc_.ROP) {
|
|
Proc_.Condition.wait(ulock);
|
|
}
|
|
}
|
|
return !result.error();
|
|
}
|
|
|
|
void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
|
|
{
|
|
auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
|
|
bool startFailed = false;
|
|
{
|
|
auto& Proc = wrk->Proc_;
|
|
std::lock_guard<std::mutex> lock(Proc.Mutex);
|
|
if (Proc.ROP && !Proc.ROP->IsStarted()) {
|
|
startFailed =
|
|
!Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
|
|
}
|
|
}
|
|
// Clean up if starting of the process failed
|
|
if (startFailed) {
|
|
wrk->UVProcessFinished();
|
|
}
|
|
}
|
|
|
|
void cmWorkerPoolWorker::UVProcessFinished()
|
|
{
|
|
std::lock_guard<std::mutex> lock(Proc_.Mutex);
|
|
if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
|
|
Proc_.ROP.reset();
|
|
}
|
|
// Notify idling thread
|
|
Proc_.Condition.notify_one();
|
|
}
|
|
|
|
/**
|
|
* @brief Private worker pool internals
|
|
*/
|
|
class cmWorkerPoolInternal
|
|
{
|
|
public:
|
|
// -- Constructors
|
|
cmWorkerPoolInternal(cmWorkerPool* pool);
|
|
~cmWorkerPoolInternal();
|
|
|
|
/**
|
|
* Runs the libuv loop.
|
|
*/
|
|
bool Process();
|
|
|
|
/**
|
|
* Clear queue and abort threads.
|
|
*/
|
|
void Abort();
|
|
|
|
/**
|
|
* Push a job to the queue and notify a worker.
|
|
*/
|
|
bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
|
|
|
|
/**
|
|
* Worker thread main loop method.
|
|
*/
|
|
void Work(unsigned int workerIndex);
|
|
|
|
// -- Request slots
|
|
static void UVSlotBegin(uv_async_t* handle);
|
|
static void UVSlotEnd(uv_async_t* handle);
|
|
|
|
public:
|
|
// -- UV loop
|
|
#ifdef CMAKE_UV_SIGNAL_HACK
|
|
std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
|
|
#endif
|
|
std::unique_ptr<uv_loop_t> UVLoop;
|
|
cm::uv_async_ptr UVRequestBegin;
|
|
cm::uv_async_ptr UVRequestEnd;
|
|
|
|
// -- Thread pool and job queue
|
|
std::mutex Mutex;
|
|
bool Processing = false;
|
|
bool Aborting = false;
|
|
bool FenceProcessing = false;
|
|
unsigned int WorkersRunning = 0;
|
|
unsigned int WorkersIdle = 0;
|
|
unsigned int JobsProcessing = 0;
|
|
std::deque<cmWorkerPool::JobHandleT> Queue;
|
|
std::condition_variable Condition;
|
|
std::condition_variable ConditionFence;
|
|
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
|
|
|
|
// -- References
|
|
cmWorkerPool* Pool = nullptr;
|
|
};
|
|
|
|
void cmWorkerPool::ProcessResultT::reset()
|
|
{
|
|
ExitStatus = 0;
|
|
TermSignal = 0;
|
|
if (!StdOut.empty()) {
|
|
StdOut.clear();
|
|
StdOut.shrink_to_fit();
|
|
}
|
|
if (!StdErr.empty()) {
|
|
StdErr.clear();
|
|
StdErr.shrink_to_fit();
|
|
}
|
|
if (!ErrorMessage.empty()) {
|
|
ErrorMessage.clear();
|
|
ErrorMessage.shrink_to_fit();
|
|
}
|
|
}
|
|
|
|
cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
|
|
: Pool(pool)
|
|
{
|
|
// Initialize libuv loop
|
|
uv_disable_stdio_inheritance();
|
|
#ifdef CMAKE_UV_SIGNAL_HACK
|
|
UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
|
|
#endif
|
|
UVLoop = cm::make_unique<uv_loop_t>();
|
|
uv_loop_init(UVLoop.get());
|
|
}
|
|
|
|
cmWorkerPoolInternal::~cmWorkerPoolInternal()
|
|
{
|
|
uv_loop_close(UVLoop.get());
|
|
}
|
|
|
|
bool cmWorkerPoolInternal::Process()
|
|
{
|
|
// Reset state flags
|
|
Processing = true;
|
|
Aborting = false;
|
|
// Initialize libuv asynchronous request
|
|
UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this);
|
|
UVRequestEnd.init(*UVLoop, &cmWorkerPoolInternal::UVSlotEnd, this);
|
|
// Send begin request
|
|
UVRequestBegin.send();
|
|
// Run libuv loop
|
|
bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
|
|
// Update state flags
|
|
Processing = false;
|
|
Aborting = false;
|
|
return success;
|
|
}
|
|
|
|
void cmWorkerPoolInternal::Abort()
|
|
{
|
|
// Clear all jobs and set abort flag
|
|
std::lock_guard<std::mutex> guard(Mutex);
|
|
if (!Aborting) {
|
|
// Register abort and clear queue
|
|
Aborting = true;
|
|
Queue.clear();
|
|
Condition.notify_all();
|
|
}
|
|
}
|
|
|
|
inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
|
|
{
|
|
std::lock_guard<std::mutex> guard(Mutex);
|
|
if (Aborting) {
|
|
return false;
|
|
}
|
|
// Append the job to the queue
|
|
Queue.emplace_back(std::move(jobHandle));
|
|
// Notify an idle worker if there's one
|
|
if (WorkersIdle != 0) {
|
|
Condition.notify_one();
|
|
}
|
|
// Return success
|
|
return true;
|
|
}
|
|
|
|
void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
|
|
{
|
|
auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
|
|
// Create worker threads
|
|
{
|
|
unsigned int const num = gint.Pool->ThreadCount();
|
|
// Create workers
|
|
gint.Workers.reserve(num);
|
|
for (unsigned int ii = 0; ii != num; ++ii) {
|
|
gint.Workers.emplace_back(
|
|
cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
|
|
}
|
|
// Start worker threads
|
|
for (unsigned int ii = 0; ii != num; ++ii) {
|
|
gint.Workers[ii]->SetThread(
|
|
std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
|
|
}
|
|
}
|
|
// Destroy begin request
|
|
gint.UVRequestBegin.reset();
|
|
}
|
|
|
|
void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
|
|
{
|
|
auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
|
|
// Join and destroy worker threads
|
|
gint.Workers.clear();
|
|
// Destroy end request
|
|
gint.UVRequestEnd.reset();
|
|
}
|
|
|
|
void cmWorkerPoolInternal::Work(unsigned int workerIndex)
|
|
{
|
|
cmWorkerPool::JobHandleT jobHandle;
|
|
std::unique_lock<std::mutex> uLock(Mutex);
|
|
// Increment running workers count
|
|
++WorkersRunning;
|
|
// Enter worker main loop
|
|
while (true) {
|
|
// Abort on request
|
|
if (Aborting) {
|
|
break;
|
|
}
|
|
// Wait for new jobs on the main CV
|
|
if (Queue.empty()) {
|
|
++WorkersIdle;
|
|
Condition.wait(uLock);
|
|
--WorkersIdle;
|
|
continue;
|
|
}
|
|
|
|
// If there is a fence currently active or waiting,
|
|
// sleep on the main CV and try again.
|
|
if (FenceProcessing) {
|
|
Condition.wait(uLock);
|
|
continue;
|
|
}
|
|
|
|
// Pop next job from queue
|
|
jobHandle = std::move(Queue.front());
|
|
Queue.pop_front();
|
|
|
|
// Check for fence jobs
|
|
bool raisedFence = false;
|
|
if (jobHandle->IsFence()) {
|
|
FenceProcessing = true;
|
|
raisedFence = true;
|
|
// Wait on the Fence CV until all pending jobs are done.
|
|
while (JobsProcessing != 0 && !Aborting) {
|
|
ConditionFence.wait(uLock);
|
|
}
|
|
// When aborting, explicitly kick all threads alive once more.
|
|
if (Aborting) {
|
|
FenceProcessing = false;
|
|
Condition.notify_all();
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Unlocked scope for job processing
|
|
++JobsProcessing;
|
|
{
|
|
uLock.unlock();
|
|
jobHandle->Work(Pool, workerIndex); // Process job
|
|
jobHandle.reset(); // Destroy job
|
|
uLock.lock();
|
|
}
|
|
--JobsProcessing;
|
|
|
|
// If this was the thread that entered fence processing
|
|
// originally, notify all idling workers that the fence
|
|
// is done.
|
|
if (raisedFence) {
|
|
FenceProcessing = false;
|
|
Condition.notify_all();
|
|
}
|
|
// If fence processing is still not done, notify the
|
|
// the fencing worker when all active jobs are done.
|
|
if (FenceProcessing && JobsProcessing == 0) {
|
|
ConditionFence.notify_all();
|
|
}
|
|
}
|
|
|
|
// Decrement running workers count
|
|
if (--WorkersRunning == 0) {
|
|
// Last worker thread about to finish. Send libuv event.
|
|
UVRequestEnd.send();
|
|
}
|
|
}
|
|
|
|
cmWorkerPool::JobT::~JobT() = default;
|
|
|
|
bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
|
|
std::vector<std::string> const& command,
|
|
std::string const& workingDirectory)
|
|
{
|
|
// Get worker by index
|
|
auto* wrk = Pool_->Int_->Workers.at(WorkerIndex_).get();
|
|
return wrk->RunProcess(result, command, workingDirectory);
|
|
}
|
|
|
|
cmWorkerPool::cmWorkerPool()
|
|
: Int_(cm::make_unique<cmWorkerPoolInternal>(this))
|
|
{
|
|
}
|
|
|
|
cmWorkerPool::~cmWorkerPool() = default;
|
|
|
|
void cmWorkerPool::SetThreadCount(unsigned int threadCount)
|
|
{
|
|
if (!Int_->Processing) {
|
|
ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
|
|
}
|
|
}
|
|
|
|
bool cmWorkerPool::Process(void* userData)
|
|
{
|
|
// Setup user data
|
|
UserData_ = userData;
|
|
// Run libuv loop
|
|
bool success = Int_->Process();
|
|
// Clear user data
|
|
UserData_ = nullptr;
|
|
// Return
|
|
return success;
|
|
}
|
|
|
|
bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
|
|
{
|
|
return Int_->PushJob(std::move(jobHandle));
|
|
}
|
|
|
|
void cmWorkerPool::Abort()
|
|
{
|
|
Int_->Abort();
|
|
}
|