Centralize exec() in db_common for gradual backoff retries

main
Simon Quigley 1 week ago
parent 8dc972b4de
commit 966bdcd1cb

@ -56,7 +56,7 @@ Release Release::get_release_by_id(int id) {
query.prepare("SELECT id, version, codename, isDefault FROM release WHERE id = ? LIMIT 1");
query.bindValue(0, id);
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error executing query:" << query.lastError().text();
return Release();
}
@ -107,7 +107,7 @@ bool Release::set_releases(YAML::Node& releases) {
query.bindValue(0, version);
query.bindValue(1, QString::fromStdString(release));
query.bindValue(2, is_last);
if (!query.exec()) { return false; }
if (!ci_query_exec(&query)) { return false; }
}
// Remove the deletions
@ -115,7 +115,7 @@ bool Release::set_releases(YAML::Node& releases) {
QSqlQuery query(get_thread_connection());
query.prepare("DELETE FROM release WHERE codename = ?");
query.bindValue(0, QString::fromStdString(release));
if (!query.exec()) { return false; }
if (!ci_query_exec(&query)) { return false; }
}
return true;
@ -151,7 +151,7 @@ Package Package::get_package_by_id(int id) {
QSqlQuery query(get_thread_connection());
query.prepare("SELECT id, name, large, upstream_url, packaging_branch, packaging_url FROM package WHERE id = ? LIMIT 1");
query.bindValue(0, id);
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error executing query:" << query.lastError().text();
return Package();
}
@ -222,7 +222,7 @@ bool Package::set_packages(YAML::Node& packages) {
query.bindValue(2, upstream_url);
query.bindValue(3, packaging_branch);
query.bindValue(4, packaging_url);
if (!query.exec()) { return false; }
if (!ci_query_exec(&query)) { return false; }
}
// Remove the deletions
@ -230,7 +230,7 @@ bool Package::set_packages(YAML::Node& packages) {
QSqlQuery query(get_thread_connection());
query.prepare("DELETE FROM package WHERE name = ?");
query.bindValue(0, QString::fromStdString(package));
if (!query.exec()) { return false; }
if (!ci_query_exec(&query)) { return false; }
}
return true;
@ -291,7 +291,7 @@ Branch Branch::get_branch_by_id(int id) {
QSqlQuery query(get_thread_connection());
query.prepare("SELECT id, name, upload_target, upload_target_ssh FROM branch WHERE id = ? LIMIT 1");
query.bindValue(0, id);
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error executing query:" << query.lastError().text();
return Branch();
}
@ -353,7 +353,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(std::ma
query_local.bindValue(0, package_id);
query_local.bindValue(1, release_id);
query_local.bindValue(2, branch_id);
if (!query_local.exec()) {
if (!ci_query_exec(&query_local)) {
qDebug() << "Failed to get packageconf:" << query_local.lastError().text()
<< package_id << release_id << branch_id;
}
@ -416,7 +416,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(std::ma
ON
t.id = pjs.task_id
)");
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Failed to load tasks:" << query.lastError().text();
}
@ -546,7 +546,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
query_local.bindValue(0, package_id);
query_local.bindValue(1, release_id);
query_local.bindValue(2, branch_id);
if (!query_local.exec()) {
if (!ci_query_exec(&query_local)) {
qDebug() << "Failed to get packageconf:" << query_local.lastError().text()
<< package_id << release_id << branch_id;
}
@ -595,7 +595,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
finish_time, successful, log
FROM task
)");
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Failed to load tasks:" << query.lastError().text();
}
@ -605,7 +605,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
{
QSqlQuery q2(get_thread_connection());
q2.prepare("SELECT id FROM jobstatus");
if (!q2.exec()) {
if (!ci_query_exec(&q2)) {
qDebug() << "Failed to load jobstatus list:" << q2.lastError().text();
}
while (q2.next()) {
@ -717,7 +717,7 @@ bool PackageConf::set_package_confs() {
// Fetch current PackageConf entries from the database
QSqlQuery query(get_thread_connection());
query.prepare("SELECT package_id, release_id, branch_id FROM packageconf");
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Failed to fetch existing packageconfs:" << query.lastError().text();
return false;
}
@ -788,7 +788,7 @@ bool PackageConf::set_package_confs() {
insert_query.addBindValue(conf.release_id);
insert_query.addBindValue(conf.branch_id);
if (!insert_query.exec()) {
if (!ci_query_exec(&insert_query)) {
log_error("Failed to insert PackageConf: "
+ insert_query.lastError().text().toStdString()
+ " Package ID " + std::to_string(conf.package_id)
@ -811,7 +811,7 @@ bool PackageConf::set_package_confs() {
delete_query.addBindValue(conf.release_id);
delete_query.addBindValue(conf.branch_id);
if (!delete_query.exec()) {
if (!ci_query_exec(&delete_query)) {
qDebug() << "Failed to delete packageconf:" << delete_query.lastError().text();
return false;
}
@ -824,9 +824,9 @@ bool PackageConf::set_package_confs() {
}
void PackageConf::sync() {
bool task_succeeded = true;
int attempt = 0;
while (!task_succeeded) {
bool oneshot = true;
while (oneshot) {
oneshot = false;
try {
QSqlQuery query(get_thread_connection());
@ -851,14 +851,7 @@ void PackageConf::sync() {
query.addBindValue(branch->id);
query.addBindValue(release->id);
attempt++;
task_succeeded = query.exec();
if (!task_succeeded) {
if (query.lastError().text().contains("database is locked")) {
int delay = 1000 * static_cast<int>(std::pow(2, attempt - 1));
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
} else task_succeeded = true;
}
if (!ci_query_exec(&query)) break;
} catch (...) {}
}
@ -969,7 +962,7 @@ GitCommit::GitCommit(
insert_query.addBindValue(QString::fromStdString(commit_author)); // Text
insert_query.addBindValue(QString::fromStdString(commit_committer)); // Text
if (!insert_query.exec()) {
if (!ci_query_exec(&insert_query)) {
// Log error with relevant details
log_error("Failed to insert GitCommit: " + insert_query.lastError().text().toStdString());
return;
@ -1027,7 +1020,7 @@ GitCommit GitCommit::get_commit_by_id(int id) {
);
query.bindValue(0, id);
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error executing query:" << query.lastError().text();
return GitCommit();
}
@ -1071,7 +1064,7 @@ std::optional<GitCommit> GitCommit::get_commit_by_hash(const std::string commit_
);
query.bindValue(0, QString::fromStdString(commit_hash));
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error executing query:" << query.lastError().text();
return GitCommit();
}
@ -1115,7 +1108,7 @@ JobStatus::JobStatus(int id) : id(id) {
);
query.bindValue(0, id);
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error executing query:" << query.lastError().text();
} else if (query.next()) {
id = query.value("id").toInt();
@ -1138,7 +1131,7 @@ Task::Task(std::shared_ptr<JobStatus> jobstatus, std::int64_t time, std::shared_
build_score = jobstatus->build_score;
if (!insert_query.exec()) {
if (!ci_query_exec(&insert_query)) {
// Log error with relevant details
log_error("Failed to insert Task: " + insert_query.lastError().text().toStdString());
return;
@ -1200,7 +1193,7 @@ std::set<std::shared_ptr<Task>> Task::get_completed_tasks(std::vector<std::share
query.bindValue(0, per_page);
query.bindValue(1, page);
if (!query.exec()) {
if (!ci_query_exec(&query)) {
qDebug() << "Error getting completed tasks:" << query.lastError().text();
} while (query.next()) {
std::shared_ptr<Log> log = std::make_shared<Log>();
@ -1232,28 +1225,16 @@ std::set<std::shared_ptr<Task>> Task::get_completed_tasks(std::vector<std::share
}
void Task::save(int _packageconf_id) {
bool task_succeeded = false;
int attempt = 0;
while (!task_succeeded) {
QSqlQuery query(get_thread_connection());
query.prepare("UPDATE task SET jobstatus_id = ?, queue_time = ?, start_time = ?, finish_time = ?, successful = ?, log = ? WHERE id = ?");
query.addBindValue(jobstatus->id);
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(queue_time)));
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(start_time)));
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(finish_time)));
query.addBindValue(successful);
query.addBindValue(QString::fromStdString(std::regex_replace(log->get(), std::regex(R"(^\s+)"), "")));
query.addBindValue(id);
task_succeeded = query.exec();
attempt++;
if (!task_succeeded) {
if (query.lastError().text().contains("database is locked")) {
int delay = 1000 * static_cast<int>(std::pow(2, attempt - 1));
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
} else task_succeeded = true;
}
}
QSqlQuery query(get_thread_connection());
query.prepare("UPDATE task SET jobstatus_id = ?, queue_time = ?, start_time = ?, finish_time = ?, successful = ?, log = ? WHERE id = ?");
query.addBindValue(jobstatus->id);
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(queue_time)));
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(start_time)));
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(finish_time)));
query.addBindValue(successful);
query.addBindValue(QString::fromStdString(std::regex_replace(log->get(), std::regex(R"(^\s+)"), "")));
query.addBindValue(id);
ci_query_exec(&query);
QSqlQuery link_query(get_thread_connection());
int packageconf_id;
@ -1275,7 +1256,7 @@ void Task::save(int _packageconf_id) {
link_query.bindValue(":packageconf_id", packageconf_id);
link_query.bindValue(":jobstatus_id", jobstatus->id);
if (!link_query.exec()) {
if (!ci_query_exec(&link_query)) {
qDebug() << "Failed to update packageconf_jobstatus_id for task" << id << ":"
<< link_query.lastError().text();
qDebug() << "packageconf_id:" << packageconf_id << "jobstatus_id:" << jobstatus->id
@ -1290,7 +1271,7 @@ void Task::save(int _packageconf_id) {
link_query.bindValue(":jobstatus_id", jobstatus->id);
link_query.bindValue(":task_id", id);
if (!link_query.exec()) {
if (!ci_query_exec(&link_query)) {
qDebug() << "Failed to insert into packageconf_jobstatus_id for task" << id << ":"
<< link_query.lastError().text();
qDebug() << "packageconf_id:" << packageconf_id << "jobstatus_id:" << jobstatus->id

@ -14,13 +14,18 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#include "db_common.h"
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
#include <QSqlDatabase>
#include <QSqlError>
#include <QSqlQuery>
#include <QString>
#include <atomic>
#include <mutex>
// get_thread_connection and init_database
static std::mutex connection_mutex_;
static std::atomic<unsigned int> thread_id_counter{1};
static QString shared_database_path;
@ -51,6 +56,23 @@ QSqlDatabase get_thread_connection() {
return thread_db;
}
bool ci_query_exec(QSqlQuery* query) {
bool passed = false;
int attempt = 0;
while (passed) {
passed = query->exec();
if (passed) return true;
attempt++;
QSqlError error = query->lastError();
if (error.text().contains("database is locked")) {
int delay = 1000 * static_cast<int>(std::pow(2, attempt - 1));
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
} else break;
}
return false;
}
bool init_database(const QString& database_path) {
shared_database_path = database_path;

@ -17,8 +17,10 @@
#define DB_COMMON_H
#include <QSqlDatabase>
#include <QSqlQuery>
QSqlDatabase get_thread_connection();
bool ci_query_exec(QSqlQuery* query);
bool init_database(const QString& database_path);
#endif // DB_COMMON_H

@ -174,7 +174,7 @@ bool WebServer::start_server(quint16 port) {
{
QSqlQuery load_tokens(get_thread_connection());
load_tokens.prepare("SELECT person.id, person.username, person.logo_url, person_token.token, person_token.expiry_date FROM person INNER JOIN person_token ON person.id = person_token.person_id");
load_tokens.exec();
ci_query_exec(&load_tokens);
while (load_tokens.next()) {
int person_id = load_tokens.value(0).toInt();
QString username = load_tokens.value(1).toString();
@ -194,7 +194,7 @@ bool WebServer::start_server(quint16 port) {
expired_tokens.prepare("DELETE FROM person_token WHERE expiry_date < :current_time");
expired_tokens.bindValue(":current_time", QDateTime::currentDateTime().toString(Qt::ISODate));
expired_tokens.exec();
ci_query_exec(&expired_tokens);
for (auto it = _active_tokens.begin(); it != _active_tokens.end();) {
if (it.value() <= QDateTime::currentDateTime()) it = _active_tokens.erase(it);
else ++it;
@ -371,7 +371,7 @@ bool WebServer::start_server(quint16 port) {
QSqlQuery get_person(get_thread_connection());
get_person.prepare("SELECT id, username, logo_url FROM person WHERE username = ?");
get_person.bindValue(0, QString::fromStdString(username));
if (!get_person.exec()) { qDebug() << "Error executing SELECT query for person:" << get_person.lastError(); }
if (!ci_query_exec(&get_person)) { qDebug() << "Error executing SELECT query for person:" << get_person.lastError(); }
if (get_person.next()) {
person = Person(get_person.value(0).toInt(), get_person.value(1).toString().toStdString(),
@ -381,7 +381,7 @@ bool WebServer::start_server(quint16 port) {
insert_person.prepare("INSERT INTO person (username, logo_url) VALUES (?, ?)");
insert_person.bindValue(0, QString::fromStdString(username));
insert_person.bindValue(1, QString::fromStdString("https://api.launchpad.net/devel/~" + username + "/logo"));
if (!insert_person.exec()) { qDebug() << "Error executing INSERT query for person:" << insert_person.lastError(); }
if (!ci_query_exec(&insert_person)) { qDebug() << "Error executing INSERT query for person:" << insert_person.lastError(); }
QVariant last_id = insert_person.lastInsertId();
if (last_id.isValid()) {
@ -401,7 +401,7 @@ bool WebServer::start_server(quint16 port) {
insert_token.bindValue(0, person.id);
insert_token.bindValue(1, token);
insert_token.bindValue(2, one_day.toString(Qt::ISODate));
if (!insert_token.exec()) { qDebug() << "Error executing INSERT query for token:" << insert_token.lastError(); }
if (!ci_query_exec(&insert_token)) { qDebug() << "Error executing INSERT query for token:" << insert_token.lastError(); }
}
QString final_html = QString(R"(

Loading…
Cancel
Save