From 8af3fe19b407cb7054559ab90e58cc4dfede6293 Mon Sep 17 00:00:00 2001 From: Simon Quigley <simon@tsimonq2.net> Date: Mon, 27 Jan 2025 00:10:50 -0600 Subject: [PATCH] Use futures for queuing pull and tarball tasks --- cpp/ci_logic.cpp | 96 +++++++++++++++++++++++++----------------------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/cpp/ci_logic.cpp b/cpp/ci_logic.cpp index 0181733..7a703f4 100644 --- a/cpp/ci_logic.cpp +++ b/cpp/ci_logic.cpp @@ -639,60 +639,66 @@ std::string CiLogic::queue_pull_tarball(std::vector<std::shared_ptr<PackageConf> std::mutex task_assignment_mutex; try { + std::vector<std::future<void>> futures; for (auto &r : repos) { - { - std::lock_guard<std::mutex> lock(task_assignment_mutex); - auto found_it = encountered_items.find(r->package->name); - if (found_it != encountered_items.end()) { - // GHOST pull - auto existing_item = found_it->second; + futures.push_back(std::async(std::launch::async, [this, &task_queue, &job_statuses, + &encountered_items, &task_assignment_mutex, + &r]() mutable { + { + std::lock_guard<std::mutex> lock(task_assignment_mutex); + auto found_it = encountered_items.find(r->package->name); + if (found_it != encountered_items.end()) { + // GHOST pull + auto existing_item = found_it->second; - // Assign tasks (reuse the same Task objects for "pull"/"tarball") - r->assign_task(job_statuses->at("pull"), existing_item->first_pull_task, r); - r->assign_task(job_statuses->at("tarball"), existing_item->first_tarball_task, r); + // Assign tasks (reuse the same Task objects for "pull"/"tarball") + r->assign_task(job_statuses->at("pull"), existing_item->first_pull_task, r); + r->assign_task(job_statuses->at("tarball"), existing_item->first_tarball_task, r); - // Point packaging_commit/upstream_commit to the real pkgconf's pointers - r->packaging_commit = existing_item->first_pkgconf->packaging_commit; - r->upstream_commit = existing_item->first_pkgconf->upstream_commit; - r->sync(); - continue; + // Point packaging_commit/upstream_commit to the real pkgconf's pointers + r->packaging_commit = existing_item->first_pkgconf->packaging_commit; + r->upstream_commit = existing_item->first_pkgconf->upstream_commit; + r->sync(); + return; + } } - } - // REAL pull - auto new_item = std::make_shared<package_conf_item>(); - new_item->first_pkgconf = r; + // REAL pull + auto new_item = std::make_shared<package_conf_item>(); + new_item->first_pkgconf = r; - // Enqueue "pull" - task_queue->enqueue( - job_statuses->at("pull"), - [this, r](std::shared_ptr<Log> log) mutable { - pull_project(r, log); - r->sync(); - }, - r - ); + // Enqueue "pull" + task_queue->enqueue( + job_statuses->at("pull"), + [this, r](std::shared_ptr<Log> log) mutable { + pull_project(r, log); + r->sync(); + }, + r + ); - { - std::lock_guard<std::mutex> lock(task_assignment_mutex); - new_item->first_pull_task = r->get_task_by_jobstatus(job_statuses->at("pull")); - } + { + std::lock_guard<std::mutex> lock(task_assignment_mutex); + new_item->first_pull_task = r->get_task_by_jobstatus(job_statuses->at("pull")); + } - // Enqueue "tarball" - task_queue->enqueue( - job_statuses->at("tarball"), - [this, r](std::shared_ptr<Log> log) mutable { - create_project_tarball(r, log); - r->sync(); - }, - r - ); + // Enqueue "tarball" + task_queue->enqueue( + job_statuses->at("tarball"), + [this, r](std::shared_ptr<Log> log) mutable { + create_project_tarball(r, log); + r->sync(); + }, + r + ); - { - std::lock_guard<std::mutex> lock(task_assignment_mutex); - new_item->first_tarball_task = r->get_task_by_jobstatus(job_statuses->at("tarball")); - encountered_items[r->package->name] = new_item; - } + { + std::lock_guard<std::mutex> lock(task_assignment_mutex); + new_item->first_tarball_task = r->get_task_by_jobstatus(job_statuses->at("tarball")); + encountered_items[r->package->name] = new_item; + } + })); } + for (auto& future : futures) future.get(); msg = "Succeeded"; } catch (...) { msg = "Failed";