Queue source and build checks as system tasks

This commit is contained in:
Simon Quigley 2025-02-16 13:14:11 -06:00
parent cd5ad6c3a9
commit ee26b576c4
5 changed files with 86 additions and 59 deletions

View File

@ -1124,8 +1124,11 @@ Task::Task(std::shared_ptr<JobStatus> jobstatus, std::int64_t time, std::shared_
std::lock_guard<std::mutex> sync_lock(*sync_mutex_);
assert(log != nullptr && "Log pointer should never be null");
QSqlQuery insert_query(get_thread_connection());
if (packageconf) {
insert_query.prepare("INSERT INTO task (packageconf_id, jobstatus_id, queue_time) VALUES (?, ?, ?)");
insert_query.addBindValue(packageconf->id);
} else insert_query.prepare("INSERT INTO task (jobstatus_id, queue_time) VALUES (?, ?)");
insert_query.addBindValue(jobstatus->id);
insert_query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(time)));
@ -1255,7 +1258,7 @@ void Task::save(int _packageconf_id) {
WHERE packageconf_id = :packageconf_id AND jobstatus_id = :jobstatus_id
)");
link_query.bindValue(":task_id", id);
link_query.bindValue(":packageconf_id", packageconf_id);
link_query.bindValue(":packageconf_id", (packageconf_id == 0) ? QVariant(QMetaType::fromType<int>()) : packageconf_id);
link_query.bindValue(":jobstatus_id", jobstatus->id);
if (!ci_query_exec(&link_query)) {
@ -1269,7 +1272,7 @@ void Task::save(int _packageconf_id) {
INSERT INTO packageconf_jobstatus_id (packageconf_id, jobstatus_id, task_id)
VALUES (:packageconf_id, :jobstatus_id, :task_id)
)");
link_query.bindValue(":packageconf_id", packageconf_id);
link_query.bindValue(":packageconf_id", (packageconf_id == 0) ? QVariant(QMetaType::fromType<int>()) : packageconf_id);
link_query.bindValue(":jobstatus_id", jobstatus->id);
link_query.bindValue(":task_id", id);

View File

@ -760,14 +760,15 @@ std::shared_ptr<std::map<std::string, std::shared_ptr<JobStatus>>> CiLogic::get_
static const auto statuses = std::make_shared<std::map<std::string, std::shared_ptr<JobStatus>>>(
std::map<std::string, std::shared_ptr<JobStatus>>{
{"pull", std::make_shared<JobStatus>(JobStatus(1))},
{"tarball", std::make_shared<JobStatus>(JobStatus(2))},
{"source_build", std::make_shared<JobStatus>(JobStatus(3))},
{"upload", std::make_shared<JobStatus>(JobStatus(4))},
{"source_check", std::make_shared<JobStatus>(JobStatus(5))},
{"build_check", std::make_shared<JobStatus>(JobStatus(6))},
{"lintian", std::make_shared<JobStatus>(JobStatus(7))},
{"britney", std::make_shared<JobStatus>(JobStatus(8))}
{"system", std::make_shared<JobStatus>(JobStatus(1))},
{"pull", std::make_shared<JobStatus>(JobStatus(2))},
{"tarball", std::make_shared<JobStatus>(JobStatus(3))},
{"source_build", std::make_shared<JobStatus>(JobStatus(4))},
{"upload", std::make_shared<JobStatus>(JobStatus(5))},
{"source_check", std::make_shared<JobStatus>(JobStatus(6))},
{"build_check", std::make_shared<JobStatus>(JobStatus(7))},
{"lintian", std::make_shared<JobStatus>(JobStatus(8))},
{"britney", std::make_shared<JobStatus>(JobStatus(9))}
}
);

View File

@ -200,6 +200,7 @@ bool init_database(const QString& database_path) {
INSERT OR IGNORE INTO jobstatus (build_score, name, display_name)
VALUES
(250, 'system', 'System Task'),
(80, 'pull', 'Pull'),
(70, 'tarball', 'Create Tarball'),
(60, 'source_build', 'Source Build'),
@ -211,7 +212,7 @@ bool init_database(const QString& database_path) {
CREATE TABLE IF NOT EXISTS task (
id INTEGER PRIMARY KEY,
packageconf_id INTEGER NOT NULL,
packageconf_id INTEGER DEFAULT NULL,
jobstatus_id INTEGER NOT NULL,
queue_time INTEGER DEFAULT 0,
start_time INTEGER DEFAULT 0,

View File

@ -38,7 +38,8 @@ void TaskQueue::enqueue(std::shared_ptr<JobStatus> jobstatus,
if (auto task_locked = self_weak.lock())
task_func(log);
};
packageconf->assign_task(jobstatus, task_ptr, packageconf);
if (jobstatus->name != "system") packageconf->assign_task(jobstatus, task_ptr, packageconf);
std::unique_lock<std::mutex> lock(tasks_mutex_);
tasks_.emplace(task_ptr);
}
@ -85,6 +86,11 @@ void TaskQueue::worker_thread() {
if (stop_ && tasks_.empty()) return;
auto it = tasks_.begin();
while (it != tasks_.end()) {
if (!(*it)->get_parent_packageconf()) {
task_to_execute = *it;
tasks_.erase(it);
break;
}
int package_id = (*it)->get_parent_packageconf()->package->id;
{
std::lock_guard<std::mutex> pkg_lock(running_packages_mutex_);
@ -101,7 +107,7 @@ void TaskQueue::worker_thread() {
}
}
if (!task_to_execute || !task_to_execute->func) continue;
{
else if (task_to_execute->get_parent_packageconf()) {
std::lock_guard<std::mutex> pkg_lock(running_packages_mutex_);
running_packages_.insert(task_to_execute->get_parent_packageconf()->package);
}
@ -137,7 +143,7 @@ void TaskQueue::worker_thread() {
running_tasks_.erase(it);
}
}
{
if (task_to_execute->get_parent_packageconf()) {
std::lock_guard<std::mutex> pkg_lock(running_packages_mutex_);
int package_id = task_to_execute->get_parent_packageconf()->package->id;
auto it = std::find_if(running_packages_.begin(), running_packages_.end(),

View File

@ -216,12 +216,18 @@ bool WebServer::start_server(quint16 port) {
});
process_sources_thread_ = std::jthread(run_task_every, 10, [this, all_repos, proposed, cilogic, job_statuses] {
std::shared_ptr<PackageConf> null_pkgconf;
task_queue->enqueue(
job_statuses->at("system"),
[this, all_repos, proposed, job_statuses](std::shared_ptr<Log> log) mutable {
for (auto pkgconf : all_repos) {
if (!pkgconf->can_check_source_upload()) continue;
std::string package_version = pkgconf->upstream_version + "-0ubuntu0~ppa" + std::to_string(pkgconf->ppa_revision);
log->append(std::format("Enqueueing build check for {}/{}", pkgconf->package->name, package_version));
task_queue->enqueue(
job_statuses->at("source_check"),
[this, pkgconf, proposed](std::shared_ptr<Log> log) mutable {
std::string package_version = pkgconf->upstream_version + "-0ubuntu0~ppa" + std::to_string(pkgconf->ppa_revision);
[this, package_version, pkgconf, proposed](std::shared_ptr<Log> log) mutable {
pkgconf->sync();
bool found_in_ppa = false;
for (auto spph : proposed.getPublishedSources("", "", std::nullopt, true, true, "", pkgconf->package->name, "", package_version)) {
found_in_ppa = true;
@ -232,17 +238,25 @@ bool WebServer::start_server(quint16 port) {
},
pkgconf
);
pkgconf->sync();
}
},
null_pkgconf
);
});
process_binaries_thread_ = std::jthread(run_task_every, 15, [this, all_repos, proposed, cilogic, job_statuses] {
std::shared_ptr<PackageConf> null_pkgconf;
task_queue->enqueue(
job_statuses->at("system"),
[this, all_repos, job_statuses, proposed](std::shared_ptr<Log> log) mutable {
for (auto pkgconf : all_repos) {
if (!pkgconf->can_check_builds()) continue;
std::string package_version = pkgconf->upstream_version + "-0ubuntu0~ppa" + std::to_string(pkgconf->ppa_revision);
log->append(std::format("Enqueueing build check for {}/{}", pkgconf->package->name, package_version));
task_queue->enqueue(
job_statuses->at("build_check"),
[this, pkgconf, proposed](std::shared_ptr<Log> log) mutable {
std::string package_version = pkgconf->upstream_version + "-0ubuntu0~ppa" + std::to_string(pkgconf->ppa_revision);
[this, proposed, pkgconf, package_version](std::shared_ptr<Log> log) mutable {
pkgconf->sync();
bool found_in_ppa = false;
source_package_publishing_history target_spph;
for (auto spph : proposed.getPublishedSources("", "", std::nullopt, true, true, "", pkgconf->package->name, "", package_version)) {
@ -262,12 +276,14 @@ bool WebServer::start_server(quint16 port) {
}
if (!all_builds_passed) throw std::runtime_error("Build(s) pending or failed, job is not successful.");
},
pkgconf
);
pkgconf->sync();
}
},
null_pkgconf
);
});
////////////////////////////////////////////////////////////////