From 216a54498f4b44fc96049e502dda9c3d819c11bf Mon Sep 17 00:00:00 2001 From: Simon Quigley Date: Fri, 20 Dec 2024 13:07:07 -0600 Subject: [PATCH] Move semaphore logic to common, try using it in lintian-ppa --- cpp/build-packages.cpp | 11 ------ cpp/common.cpp | 1 + cpp/common.h | 9 +++++ cpp/lintian-ppa.cpp | 82 ++++++++---------------------------------- 4 files changed, 24 insertions(+), 79 deletions(-) diff --git a/cpp/build-packages.cpp b/cpp/build-packages.cpp index a0af84a..a79e0f0 100644 --- a/cpp/build-packages.cpp +++ b/cpp/build-packages.cpp @@ -46,16 +46,6 @@ namespace fs = std::filesystem; -// Limit concurrency to 5, ensuring at most 5 processes at a time -static std::counting_semaphore<5> semaphore(5); - -// Helper RAII class to manage semaphore acquisition and release -struct semaphore_guard { - std::counting_semaphore<5> &sem; - semaphore_guard(std::counting_semaphore<5> &s) : sem(s) { sem.acquire(); } - ~semaphore_guard() { sem.release(); } -}; - // Mutex to protect access to the repo_mutexes map static std::mutex repo_map_mutex; @@ -510,7 +500,6 @@ static void update_changelog(const fs::path &packaging_dir, const std::string &r } static std::string build_package(const fs::path &packaging_dir, const std::map &env_vars, bool large, const std::string &pkg_name) { - // Removed semaphore.acquire() to let run_command_silent_on_success handle semaphore log_info("Building source package for " + pkg_name); fs::path temp_dir; std::error_code ec; diff --git a/cpp/common.cpp b/cpp/common.cpp index 4cd6fc0..0a24ac7 100644 --- a/cpp/common.cpp +++ b/cpp/common.cpp @@ -27,6 +27,7 @@ #include namespace fs = std::filesystem; +std::counting_semaphore<5> semaphore(5); static void log_info(const std::string &msg) { std::cout << "[INFO] " << msg << "\n"; diff --git a/cpp/common.h b/cpp/common.h index 75a3574..0b93123 100644 --- a/cpp/common.h +++ b/cpp/common.h @@ -18,9 +18,18 @@ #include #include #include +#include std::string parse_version(const std::filesystem::path &changelog_path); void run_command(const std::vector &cmd, const std::optional &cwd = std::nullopt, bool show_output=false); void clean_old_logs(const std::filesystem::path &log_dir, int max_age_seconds=86400); void create_tarball(const std::string& tarballPath, const std::string& directory, const std::vector& exclusions); std::string get_current_utc_time(); + +static std::counting_semaphore<5> semaphore(5); +struct semaphore_guard { + std::counting_semaphore<5> &sem; + semaphore_guard(std::counting_semaphore<5> &s) : sem(s) { sem.acquire(); } + ~semaphore_guard() { sem.release(); } +}; + diff --git a/cpp/lintian-ppa.cpp b/cpp/lintian-ppa.cpp index 8efea30..7641392 100644 --- a/cpp/lintian-ppa.cpp +++ b/cpp/lintian-ppa.cpp @@ -73,62 +73,6 @@ void log_error_custom(const std::string &msg) { } } -// Simple thread pool implementation -class ThreadPool { -public: - ThreadPool(size_t maxThreads) : stopFlag(false) { - for (size_t i = 0; i < maxThreads; ++i) { - workers.emplace_back([this]() { - while (true) { - std::function task; - - { - std::unique_lock lock(this->queueMutex); - this->condition.wait(lock, [this]() { return this->stopFlag || !this->tasks.empty(); }); - if (this->stopFlag && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - } - - task(); - } - }); - } - } - - // Submit a task to the pool - template - void enqueue(F&& f) { - { - std::lock_guard lock(queueMutex); - if (stopFlag) - throw std::runtime_error("Enqueue on stopped ThreadPool"); - tasks.emplace(std::forward(f)); - } - condition.notify_one(); - } - - // Destructor joins all threads - ~ThreadPool() { - { - std::lock_guard lock(queueMutex); - stopFlag = true; - } - condition.notify_all(); - for (std::thread &worker: workers) - worker.join(); - } - -private: - std::vector workers; - std::queue> tasks; - - std::mutex queueMutex; - std::condition_variable condition; - bool stopFlag; -}; - // Function to parse command-line arguments struct Arguments { std::string user; @@ -441,14 +385,14 @@ int main(int argc, char* argv[]) { fs::create_directories(lintianDir); fs::create_directories(lintianTmpDir); - // Initialize ThreadPool with 5 threads - ThreadPool pool(5); + // Initialize a vector to hold all threads + std::vector threads; // Mutex for managing the published sources iterator std::mutex sourcesMutex; // Function to iterate over published sources and enqueue tasks - auto main_source_iter = [&](ThreadPool& poolRef, std::vector>& futures) { + auto main_source_iter = [&](std::vector& threadsRef) { // Path to .LAST_RUN file fs::path lastRunFile = lintianDir / ".LAST_RUN"; std::chrono::system_clock::time_point lastRunTime = std::chrono::system_clock::now() - std::chrono::hours(24*365); @@ -496,8 +440,9 @@ int main(int argc, char* argv[]) { if (build.buildstate == "Successfully built") { // Assuming build.datebuilt is a std::chrono::system_clock::time_point if (build.datebuilt >= lastRunTime) { - // Enqueue the process_sources task - poolRef.enqueue([=]() { + // Enqueue the process_sources task using semaphore and threads + threadsRef.emplace_back([=]() { + semaphore_guard guard(semaphore); process_sources(build.changesfile_url, fs::path(BASE_OUTPUT_DIR), lintianTmpDir); }); } @@ -506,15 +451,16 @@ int main(int argc, char* argv[]) { } }; - // Start main_source_iter in the thread pool - std::vector> futures; - pool.enqueue([&]() { main_source_iter(pool, futures); }); + // Start the main_source_iter and enqueue tasks + main_source_iter(threads); - // Wait for all tasks to complete by destructing the pool - // The ThreadPool destructor will wait for all tasks to finish - // So no additional synchronization is needed here + // Wait for all threads to complete + for(auto &t : threads) { + if(t.joinable()) { + t.join(); + } + } - // After all tasks are done, perform rsync log_info_custom("All lintian tasks completed. Syncing temporary lintian data to final directory."); rsync_copy(lintianTmpDir, lintianDir);