Meh, don't use futures

This commit is contained in:
Simon Quigley 2025-01-27 18:46:53 -06:00
parent b94ec17fd3
commit 7f3b73a857

View File

@ -639,69 +639,63 @@ std::string CiLogic::queue_pull_tarball(std::vector<std::shared_ptr<PackageConf>
std::mutex task_assignment_mutex; std::mutex task_assignment_mutex;
try { try {
std::vector<std::future<void>> futures;
for (auto &r : repos) { for (auto &r : repos) {
futures.push_back(std::async(std::launch::async, [this, &task_queue, &job_statuses, {
&encountered_items, &task_assignment_mutex, std::lock_guard<std::mutex> lock(task_assignment_mutex);
&r]() mutable { auto found_it = encountered_items.find(r->package->name);
{ if (found_it != encountered_items.end()) {
std::lock_guard<std::mutex> lock(task_assignment_mutex); // GHOST pull
auto found_it = encountered_items.find(r->package->name); auto existing_item = found_it->second;
if (found_it != encountered_items.end()) {
// GHOST pull
auto existing_item = found_it->second;
// Assign tasks (reuse the same Task objects for "pull"/"tarball") // Assign tasks (reuse the same Task objects for "pull"/"tarball")
r->assign_task(job_statuses->at("pull"), existing_item->first_pull_task, r); r->assign_task(job_statuses->at("pull"), existing_item->first_pull_task, r);
r->assign_task(job_statuses->at("tarball"), existing_item->first_tarball_task, r); r->assign_task(job_statuses->at("tarball"), existing_item->first_tarball_task, r);
// Point packaging_commit/upstream_commit to the real pkgconf's pointers // Point packaging_commit/upstream_commit to the real pkgconf's pointers
r->packaging_commit = existing_item->first_pkgconf->packaging_commit; r->packaging_commit = existing_item->first_pkgconf->packaging_commit;
r->upstream_commit = existing_item->first_pkgconf->upstream_commit; r->upstream_commit = existing_item->first_pkgconf->upstream_commit;
r->sync(); r->sync();
return; return;
}
} }
// REAL pull }
auto new_item = std::make_shared<package_conf_item>(); // REAL pull
new_item->first_pkgconf = r; auto new_item = std::make_shared<package_conf_item>();
r->sync(); new_item->first_pkgconf = r;
r->sync();
// Enqueue "pull" // Enqueue "pull"
task_queue->enqueue( task_queue->enqueue(
job_statuses->at("pull"), job_statuses->at("pull"),
[this, r](std::shared_ptr<Log> log) mutable { [this, r](std::shared_ptr<Log> log) mutable {
pull_project(r, log); pull_project(r, log);
r->sync(); r->sync();
}, },
r r
); );
{ {
std::lock_guard<std::mutex> lock(task_assignment_mutex); std::lock_guard<std::mutex> lock(task_assignment_mutex);
new_item->first_pull_task = r->get_task_by_jobstatus(job_statuses->at("pull")); new_item->first_pull_task = r->get_task_by_jobstatus(job_statuses->at("pull"));
} }
// Enqueue "tarball" // Enqueue "tarball"
task_queue->enqueue( task_queue->enqueue(
job_statuses->at("tarball"), job_statuses->at("tarball"),
[this, r](std::shared_ptr<Log> log) mutable { [this, r](std::shared_ptr<Log> log) mutable {
create_project_tarball(r, log); create_project_tarball(r, log);
r->sync(); r->sync();
}, },
r r
); );
{ {
std::lock_guard<std::mutex> lock(task_assignment_mutex); std::lock_guard<std::mutex> lock(task_assignment_mutex);
new_item->first_tarball_task = r->get_task_by_jobstatus(job_statuses->at("tarball")); new_item->first_tarball_task = r->get_task_by_jobstatus(job_statuses->at("tarball"));
encountered_items[r->package->name] = new_item; encountered_items[r->package->name] = new_item;
} }
r->sync(); r->sync();
}));
} }
for (auto& future : futures) future.get();
msg = "Succeeded"; msg = "Succeeded";
} catch (...) { } catch (...) {
msg = "Failed"; msg = "Failed";