From 966bdcd1cb941cd1bb3105dc8f202e5cc2302e95 Mon Sep 17 00:00:00 2001 From: Simon Quigley Date: Sat, 25 Jan 2025 13:08:19 -0600 Subject: [PATCH] Centralize exec() in db_common for gradual backoff retries --- cpp/ci_database_objs.cpp | 93 ++++++++++++++++------------------------ cpp/db_common.cpp | 26 ++++++++++- cpp/db_common.h | 2 + cpp/web_server.cpp | 10 ++--- 4 files changed, 68 insertions(+), 63 deletions(-) diff --git a/cpp/ci_database_objs.cpp b/cpp/ci_database_objs.cpp index f919afe..c2d3805 100644 --- a/cpp/ci_database_objs.cpp +++ b/cpp/ci_database_objs.cpp @@ -56,7 +56,7 @@ Release Release::get_release_by_id(int id) { query.prepare("SELECT id, version, codename, isDefault FROM release WHERE id = ? LIMIT 1"); query.bindValue(0, id); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Error executing query:" << query.lastError().text(); return Release(); } @@ -107,7 +107,7 @@ bool Release::set_releases(YAML::Node& releases) { query.bindValue(0, version); query.bindValue(1, QString::fromStdString(release)); query.bindValue(2, is_last); - if (!query.exec()) { return false; } + if (!ci_query_exec(&query)) { return false; } } // Remove the deletions @@ -115,7 +115,7 @@ bool Release::set_releases(YAML::Node& releases) { QSqlQuery query(get_thread_connection()); query.prepare("DELETE FROM release WHERE codename = ?"); query.bindValue(0, QString::fromStdString(release)); - if (!query.exec()) { return false; } + if (!ci_query_exec(&query)) { return false; } } return true; @@ -151,7 +151,7 @@ Package Package::get_package_by_id(int id) { QSqlQuery query(get_thread_connection()); query.prepare("SELECT id, name, large, upstream_url, packaging_branch, packaging_url FROM package WHERE id = ? LIMIT 1"); query.bindValue(0, id); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Error executing query:" << query.lastError().text(); return Package(); } @@ -222,7 +222,7 @@ bool Package::set_packages(YAML::Node& packages) { query.bindValue(2, upstream_url); query.bindValue(3, packaging_branch); query.bindValue(4, packaging_url); - if (!query.exec()) { return false; } + if (!ci_query_exec(&query)) { return false; } } // Remove the deletions @@ -230,7 +230,7 @@ bool Package::set_packages(YAML::Node& packages) { QSqlQuery query(get_thread_connection()); query.prepare("DELETE FROM package WHERE name = ?"); query.bindValue(0, QString::fromStdString(package)); - if (!query.exec()) { return false; } + if (!ci_query_exec(&query)) { return false; } } return true; @@ -291,7 +291,7 @@ Branch Branch::get_branch_by_id(int id) { QSqlQuery query(get_thread_connection()); query.prepare("SELECT id, name, upload_target, upload_target_ssh FROM branch WHERE id = ? LIMIT 1"); query.bindValue(0, id); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Error executing query:" << query.lastError().text(); return Branch(); } @@ -353,7 +353,7 @@ std::vector> PackageConf::get_package_confs(std::ma query_local.bindValue(0, package_id); query_local.bindValue(1, release_id); query_local.bindValue(2, branch_id); - if (!query_local.exec()) { + if (!ci_query_exec(&query_local)) { qDebug() << "Failed to get packageconf:" << query_local.lastError().text() << package_id << release_id << branch_id; } @@ -416,7 +416,7 @@ std::vector> PackageConf::get_package_confs(std::ma ON t.id = pjs.task_id )"); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Failed to load tasks:" << query.lastError().text(); } @@ -546,7 +546,7 @@ std::vector> PackageConf::get_package_confs_by_pack query_local.bindValue(0, package_id); query_local.bindValue(1, release_id); query_local.bindValue(2, branch_id); - if (!query_local.exec()) { + if (!ci_query_exec(&query_local)) { qDebug() << "Failed to get packageconf:" << query_local.lastError().text() << package_id << release_id << branch_id; } @@ -595,7 +595,7 @@ std::vector> PackageConf::get_package_confs_by_pack finish_time, successful, log FROM task )"); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Failed to load tasks:" << query.lastError().text(); } @@ -605,7 +605,7 @@ std::vector> PackageConf::get_package_confs_by_pack { QSqlQuery q2(get_thread_connection()); q2.prepare("SELECT id FROM jobstatus"); - if (!q2.exec()) { + if (!ci_query_exec(&q2)) { qDebug() << "Failed to load jobstatus list:" << q2.lastError().text(); } while (q2.next()) { @@ -717,7 +717,7 @@ bool PackageConf::set_package_confs() { // Fetch current PackageConf entries from the database QSqlQuery query(get_thread_connection()); query.prepare("SELECT package_id, release_id, branch_id FROM packageconf"); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Failed to fetch existing packageconfs:" << query.lastError().text(); return false; } @@ -788,7 +788,7 @@ bool PackageConf::set_package_confs() { insert_query.addBindValue(conf.release_id); insert_query.addBindValue(conf.branch_id); - if (!insert_query.exec()) { + if (!ci_query_exec(&insert_query)) { log_error("Failed to insert PackageConf: " + insert_query.lastError().text().toStdString() + " Package ID " + std::to_string(conf.package_id) @@ -811,7 +811,7 @@ bool PackageConf::set_package_confs() { delete_query.addBindValue(conf.release_id); delete_query.addBindValue(conf.branch_id); - if (!delete_query.exec()) { + if (!ci_query_exec(&delete_query)) { qDebug() << "Failed to delete packageconf:" << delete_query.lastError().text(); return false; } @@ -824,9 +824,9 @@ bool PackageConf::set_package_confs() { } void PackageConf::sync() { - bool task_succeeded = true; - int attempt = 0; - while (!task_succeeded) { + bool oneshot = true; + while (oneshot) { + oneshot = false; try { QSqlQuery query(get_thread_connection()); @@ -851,14 +851,7 @@ void PackageConf::sync() { query.addBindValue(branch->id); query.addBindValue(release->id); - attempt++; - task_succeeded = query.exec(); - if (!task_succeeded) { - if (query.lastError().text().contains("database is locked")) { - int delay = 1000 * static_cast(std::pow(2, attempt - 1)); - std::this_thread::sleep_for(std::chrono::milliseconds(delay)); - } else task_succeeded = true; - } + if (!ci_query_exec(&query)) break; } catch (...) {} } @@ -969,7 +962,7 @@ GitCommit::GitCommit( insert_query.addBindValue(QString::fromStdString(commit_author)); // Text insert_query.addBindValue(QString::fromStdString(commit_committer)); // Text - if (!insert_query.exec()) { + if (!ci_query_exec(&insert_query)) { // Log error with relevant details log_error("Failed to insert GitCommit: " + insert_query.lastError().text().toStdString()); return; @@ -1027,7 +1020,7 @@ GitCommit GitCommit::get_commit_by_id(int id) { ); query.bindValue(0, id); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Error executing query:" << query.lastError().text(); return GitCommit(); } @@ -1071,7 +1064,7 @@ std::optional GitCommit::get_commit_by_hash(const std::string commit_ ); query.bindValue(0, QString::fromStdString(commit_hash)); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Error executing query:" << query.lastError().text(); return GitCommit(); } @@ -1115,7 +1108,7 @@ JobStatus::JobStatus(int id) : id(id) { ); query.bindValue(0, id); - if (!query.exec()) { + if (!ci_query_exec(&query)) { qDebug() << "Error executing query:" << query.lastError().text(); } else if (query.next()) { id = query.value("id").toInt(); @@ -1138,7 +1131,7 @@ Task::Task(std::shared_ptr jobstatus, std::int64_t time, std::shared_ build_score = jobstatus->build_score; - if (!insert_query.exec()) { + if (!ci_query_exec(&insert_query)) { // Log error with relevant details log_error("Failed to insert Task: " + insert_query.lastError().text().toStdString()); return; @@ -1200,7 +1193,7 @@ std::set> Task::get_completed_tasks(std::vector log = std::make_shared(); @@ -1232,28 +1225,16 @@ std::set> Task::get_completed_tasks(std::vectorid); - query.addBindValue(QVariant::fromValue(static_cast(queue_time))); - query.addBindValue(QVariant::fromValue(static_cast(start_time))); - query.addBindValue(QVariant::fromValue(static_cast(finish_time))); - query.addBindValue(successful); - query.addBindValue(QString::fromStdString(std::regex_replace(log->get(), std::regex(R"(^\s+)"), ""))); - query.addBindValue(id); - task_succeeded = query.exec(); - attempt++; - if (!task_succeeded) { - if (query.lastError().text().contains("database is locked")) { - int delay = 1000 * static_cast(std::pow(2, attempt - 1)); - std::this_thread::sleep_for(std::chrono::milliseconds(delay)); - } else task_succeeded = true; - } - } - + QSqlQuery query(get_thread_connection()); + query.prepare("UPDATE task SET jobstatus_id = ?, queue_time = ?, start_time = ?, finish_time = ?, successful = ?, log = ? WHERE id = ?"); + query.addBindValue(jobstatus->id); + query.addBindValue(QVariant::fromValue(static_cast(queue_time))); + query.addBindValue(QVariant::fromValue(static_cast(start_time))); + query.addBindValue(QVariant::fromValue(static_cast(finish_time))); + query.addBindValue(successful); + query.addBindValue(QString::fromStdString(std::regex_replace(log->get(), std::regex(R"(^\s+)"), ""))); + query.addBindValue(id); + ci_query_exec(&query); QSqlQuery link_query(get_thread_connection()); int packageconf_id; @@ -1275,7 +1256,7 @@ void Task::save(int _packageconf_id) { link_query.bindValue(":packageconf_id", packageconf_id); link_query.bindValue(":jobstatus_id", jobstatus->id); - if (!link_query.exec()) { + if (!ci_query_exec(&link_query)) { qDebug() << "Failed to update packageconf_jobstatus_id for task" << id << ":" << link_query.lastError().text(); qDebug() << "packageconf_id:" << packageconf_id << "jobstatus_id:" << jobstatus->id @@ -1290,7 +1271,7 @@ void Task::save(int _packageconf_id) { link_query.bindValue(":jobstatus_id", jobstatus->id); link_query.bindValue(":task_id", id); - if (!link_query.exec()) { + if (!ci_query_exec(&link_query)) { qDebug() << "Failed to insert into packageconf_jobstatus_id for task" << id << ":" << link_query.lastError().text(); qDebug() << "packageconf_id:" << packageconf_id << "jobstatus_id:" << jobstatus->id diff --git a/cpp/db_common.cpp b/cpp/db_common.cpp index b6c7819..bc90892 100644 --- a/cpp/db_common.cpp +++ b/cpp/db_common.cpp @@ -14,13 +14,18 @@ // along with this program. If not, see . #include "db_common.h" + +#include +#include +#include +#include + #include #include #include #include -#include -#include +// get_thread_connection and init_database static std::mutex connection_mutex_; static std::atomic thread_id_counter{1}; static QString shared_database_path; @@ -51,6 +56,23 @@ QSqlDatabase get_thread_connection() { return thread_db; } +bool ci_query_exec(QSqlQuery* query) { + bool passed = false; + int attempt = 0; + while (passed) { + passed = query->exec(); + if (passed) return true; + attempt++; + + QSqlError error = query->lastError(); + if (error.text().contains("database is locked")) { + int delay = 1000 * static_cast(std::pow(2, attempt - 1)); + std::this_thread::sleep_for(std::chrono::milliseconds(delay)); + } else break; + } + return false; +} + bool init_database(const QString& database_path) { shared_database_path = database_path; diff --git a/cpp/db_common.h b/cpp/db_common.h index 63c6141..01d43aa 100644 --- a/cpp/db_common.h +++ b/cpp/db_common.h @@ -17,8 +17,10 @@ #define DB_COMMON_H #include +#include QSqlDatabase get_thread_connection(); +bool ci_query_exec(QSqlQuery* query); bool init_database(const QString& database_path); #endif // DB_COMMON_H diff --git a/cpp/web_server.cpp b/cpp/web_server.cpp index 738cc7b..4cf4b35 100644 --- a/cpp/web_server.cpp +++ b/cpp/web_server.cpp @@ -174,7 +174,7 @@ bool WebServer::start_server(quint16 port) { { QSqlQuery load_tokens(get_thread_connection()); load_tokens.prepare("SELECT person.id, person.username, person.logo_url, person_token.token, person_token.expiry_date FROM person INNER JOIN person_token ON person.id = person_token.person_id"); - load_tokens.exec(); + ci_query_exec(&load_tokens); while (load_tokens.next()) { int person_id = load_tokens.value(0).toInt(); QString username = load_tokens.value(1).toString(); @@ -194,7 +194,7 @@ bool WebServer::start_server(quint16 port) { expired_tokens.prepare("DELETE FROM person_token WHERE expiry_date < :current_time"); expired_tokens.bindValue(":current_time", QDateTime::currentDateTime().toString(Qt::ISODate)); - expired_tokens.exec(); + ci_query_exec(&expired_tokens); for (auto it = _active_tokens.begin(); it != _active_tokens.end();) { if (it.value() <= QDateTime::currentDateTime()) it = _active_tokens.erase(it); else ++it; @@ -371,7 +371,7 @@ bool WebServer::start_server(quint16 port) { QSqlQuery get_person(get_thread_connection()); get_person.prepare("SELECT id, username, logo_url FROM person WHERE username = ?"); get_person.bindValue(0, QString::fromStdString(username)); - if (!get_person.exec()) { qDebug() << "Error executing SELECT query for person:" << get_person.lastError(); } + if (!ci_query_exec(&get_person)) { qDebug() << "Error executing SELECT query for person:" << get_person.lastError(); } if (get_person.next()) { person = Person(get_person.value(0).toInt(), get_person.value(1).toString().toStdString(), @@ -381,7 +381,7 @@ bool WebServer::start_server(quint16 port) { insert_person.prepare("INSERT INTO person (username, logo_url) VALUES (?, ?)"); insert_person.bindValue(0, QString::fromStdString(username)); insert_person.bindValue(1, QString::fromStdString("https://api.launchpad.net/devel/~" + username + "/logo")); - if (!insert_person.exec()) { qDebug() << "Error executing INSERT query for person:" << insert_person.lastError(); } + if (!ci_query_exec(&insert_person)) { qDebug() << "Error executing INSERT query for person:" << insert_person.lastError(); } QVariant last_id = insert_person.lastInsertId(); if (last_id.isValid()) { @@ -401,7 +401,7 @@ bool WebServer::start_server(quint16 port) { insert_token.bindValue(0, person.id); insert_token.bindValue(1, token); insert_token.bindValue(2, one_day.toString(Qt::ISODate)); - if (!insert_token.exec()) { qDebug() << "Error executing INSERT query for token:" << insert_token.lastError(); } + if (!ci_query_exec(&insert_token)) { qDebug() << "Error executing INSERT query for token:" << insert_token.lastError(); } } QString final_html = QString(R"(