Move semaphore logic to common, try using it in lintian-ppa

main
Simon Quigley 1 month ago
parent 3a4e2fd78a
commit 216a54498f

@ -46,16 +46,6 @@
namespace fs = std::filesystem;
// Limit concurrency to 5, ensuring at most 5 processes at a time
static std::counting_semaphore<5> semaphore(5);
// Helper RAII class to manage semaphore acquisition and release
struct semaphore_guard {
std::counting_semaphore<5> &sem;
semaphore_guard(std::counting_semaphore<5> &s) : sem(s) { sem.acquire(); }
~semaphore_guard() { sem.release(); }
};
// Mutex to protect access to the repo_mutexes map
static std::mutex repo_map_mutex;
@ -510,7 +500,6 @@ static void update_changelog(const fs::path &packaging_dir, const std::string &r
}
static std::string build_package(const fs::path &packaging_dir, const std::map<std::string, std::string> &env_vars, bool large, const std::string &pkg_name) {
// Removed semaphore.acquire() to let run_command_silent_on_success handle semaphore
log_info("Building source package for " + pkg_name);
fs::path temp_dir;
std::error_code ec;

@ -27,6 +27,7 @@
#include <regex>
namespace fs = std::filesystem;
std::counting_semaphore<5> semaphore(5);
static void log_info(const std::string &msg) {
std::cout << "[INFO] " << msg << "\n";

@ -18,9 +18,18 @@
#include <vector>
#include <filesystem>
#include <optional>
#include <semaphore>
std::string parse_version(const std::filesystem::path &changelog_path);
void run_command(const std::vector<std::string> &cmd, const std::optional<std::filesystem::path> &cwd = std::nullopt, bool show_output=false);
void clean_old_logs(const std::filesystem::path &log_dir, int max_age_seconds=86400);
void create_tarball(const std::string& tarballPath, const std::string& directory, const std::vector<std::string>& exclusions);
std::string get_current_utc_time();
static std::counting_semaphore<5> semaphore(5);
struct semaphore_guard {
std::counting_semaphore<5> &sem;
semaphore_guard(std::counting_semaphore<5> &s) : sem(s) { sem.acquire(); }
~semaphore_guard() { sem.release(); }
};

@ -73,62 +73,6 @@ void log_error_custom(const std::string &msg) {
}
}
// Simple thread pool implementation
class ThreadPool {
public:
ThreadPool(size_t maxThreads) : stopFlag(false) {
for (size_t i = 0; i < maxThreads; ++i) {
workers.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queueMutex);
this->condition.wait(lock, [this]() { return this->stopFlag || !this->tasks.empty(); });
if (this->stopFlag && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
// Submit a task to the pool
template<class F>
void enqueue(F&& f) {
{
std::lock_guard<std::mutex> lock(queueMutex);
if (stopFlag)
throw std::runtime_error("Enqueue on stopped ThreadPool");
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
// Destructor joins all threads
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(queueMutex);
stopFlag = true;
}
condition.notify_all();
for (std::thread &worker: workers)
worker.join();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stopFlag;
};
// Function to parse command-line arguments
struct Arguments {
std::string user;
@ -441,14 +385,14 @@ int main(int argc, char* argv[]) {
fs::create_directories(lintianDir);
fs::create_directories(lintianTmpDir);
// Initialize ThreadPool with 5 threads
ThreadPool pool(5);
// Initialize a vector to hold all threads
std::vector<std::thread> threads;
// Mutex for managing the published sources iterator
std::mutex sourcesMutex;
// Function to iterate over published sources and enqueue tasks
auto main_source_iter = [&](ThreadPool& poolRef, std::vector<std::future<void>>& futures) {
auto main_source_iter = [&](std::vector<std::thread>& threadsRef) {
// Path to .LAST_RUN file
fs::path lastRunFile = lintianDir / ".LAST_RUN";
std::chrono::system_clock::time_point lastRunTime = std::chrono::system_clock::now() - std::chrono::hours(24*365);
@ -496,8 +440,9 @@ int main(int argc, char* argv[]) {
if (build.buildstate == "Successfully built") {
// Assuming build.datebuilt is a std::chrono::system_clock::time_point
if (build.datebuilt >= lastRunTime) {
// Enqueue the process_sources task
poolRef.enqueue([=]() {
// Enqueue the process_sources task using semaphore and threads
threadsRef.emplace_back([=]() {
semaphore_guard guard(semaphore);
process_sources(build.changesfile_url, fs::path(BASE_OUTPUT_DIR), lintianTmpDir);
});
}
@ -506,15 +451,16 @@ int main(int argc, char* argv[]) {
}
};
// Start main_source_iter in the thread pool
std::vector<std::future<void>> futures;
pool.enqueue([&]() { main_source_iter(pool, futures); });
// Start the main_source_iter and enqueue tasks
main_source_iter(threads);
// Wait for all tasks to complete by destructing the pool
// The ThreadPool destructor will wait for all tasks to finish
// So no additional synchronization is needed here
// Wait for all threads to complete
for(auto &t : threads) {
if(t.joinable()) {
t.join();
}
}
// After all tasks are done, perform rsync
log_info_custom("All lintian tasks completed. Syncing temporary lintian data to final directory.");
rsync_copy(lintianTmpDir, lintianDir);

Loading…
Cancel
Save