Centralize the database thread connection code
This commit is contained in:
parent
10d43282b6
commit
775e5bddb7
@ -39,6 +39,7 @@ add_library(lubuntuci_lib SHARED
|
||||
web_server.cpp
|
||||
sources_parser.cpp
|
||||
naive_bayes_classifier.cpp
|
||||
db_common.cpp
|
||||
)
|
||||
|
||||
target_include_directories(lubuntuci_lib PUBLIC
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
#include "ci_database_objs.h"
|
||||
#include "utilities.h"
|
||||
#include "db_common.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <ranges>
|
||||
@ -37,10 +38,10 @@ Person::Person(int id, const std::string username, const std::string logo_url)
|
||||
Release::Release(int id, int version, const std::string& codename, bool isDefault)
|
||||
: id(id), version(version), codename(codename), isDefault(isDefault) {}
|
||||
|
||||
std::vector<Release> Release::get_releases(QSqlDatabase& p_db) {
|
||||
std::vector<Release> Release::get_releases() {
|
||||
std::vector<Release> result;
|
||||
QString query_str = "SELECT id, version, codename, isDefault FROM release;";
|
||||
QSqlQuery query(query_str, p_db);
|
||||
QSqlQuery query(query_str, get_thread_connection());
|
||||
while (query.next()) {
|
||||
Release current_release(query.value("id").toInt(), query.value("version").toInt(),
|
||||
query.value("codename").toString().toStdString(),
|
||||
@ -50,8 +51,8 @@ std::vector<Release> Release::get_releases(QSqlDatabase& p_db) {
|
||||
return result;
|
||||
}
|
||||
|
||||
Release Release::get_release_by_id(QSqlDatabase& p_db, int id) {
|
||||
QSqlQuery query(p_db);
|
||||
Release Release::get_release_by_id(int id) {
|
||||
QSqlQuery query(get_thread_connection());
|
||||
|
||||
query.prepare("SELECT id, version, codename, isDefault FROM release WHERE id = ? LIMIT 1");
|
||||
query.bindValue(0, id);
|
||||
@ -75,8 +76,8 @@ Release Release::get_release_by_id(QSqlDatabase& p_db, int id) {
|
||||
return Release();
|
||||
}
|
||||
|
||||
bool Release::set_releases(QSqlDatabase& p_db, YAML::Node& releases) {
|
||||
std::vector<Release> current_releases = get_releases(p_db);
|
||||
bool Release::set_releases(YAML::Node& releases) {
|
||||
std::vector<Release> current_releases = get_releases();
|
||||
|
||||
// Use set subtraction to determine which releases need to be added and removed
|
||||
// The first operation is releases - current_releases which shows all *additions*
|
||||
@ -101,7 +102,7 @@ bool Release::set_releases(QSqlDatabase& p_db, YAML::Node& releases) {
|
||||
// Insert the additions
|
||||
for (const auto& release : additions) {
|
||||
auto [version, is_last] = get_version_from_codename(release);
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("INSERT INTO release (version, codename, isDefault) VALUES (?, ?, ?)");
|
||||
query.bindValue(0, version);
|
||||
query.bindValue(1, QString::fromStdString(release));
|
||||
@ -111,7 +112,7 @@ bool Release::set_releases(QSqlDatabase& p_db, YAML::Node& releases) {
|
||||
|
||||
// Remove the deletions
|
||||
for (const auto& release : deletions) {
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("DELETE FROM release WHERE codename = ?");
|
||||
query.bindValue(0, QString::fromStdString(release));
|
||||
if (!query.exec()) { return false; }
|
||||
@ -131,10 +132,10 @@ Package::Package(int id, const std::string& name, bool large, const std::string&
|
||||
packaging_browser = transform_url(packaging_url);
|
||||
}
|
||||
|
||||
std::vector<Package> Package::get_packages(QSqlDatabase& p_db) {
|
||||
std::vector<Package> Package::get_packages() {
|
||||
std::vector<Package> result;
|
||||
QString query_str = "SELECT id, name, large, upstream_url, packaging_branch, packaging_url FROM package";
|
||||
QSqlQuery query(query_str, p_db);
|
||||
QSqlQuery query(query_str, get_thread_connection());
|
||||
while (query.next()) {
|
||||
Package current_package(query.value("id").toInt(), query.value("name").toString().toStdString(),
|
||||
query.value("large").toBool(),
|
||||
@ -146,8 +147,8 @@ std::vector<Package> Package::get_packages(QSqlDatabase& p_db) {
|
||||
return result;
|
||||
}
|
||||
|
||||
Package Package::get_package_by_id(QSqlDatabase& p_db, int id) {
|
||||
QSqlQuery query(p_db);
|
||||
Package Package::get_package_by_id(int id) {
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("SELECT id, name, large, upstream_url, packaging_branch, packaging_url FROM package WHERE id = ? LIMIT 1");
|
||||
query.bindValue(0, id);
|
||||
if (!query.exec()) {
|
||||
@ -165,8 +166,8 @@ Package Package::get_package_by_id(QSqlDatabase& p_db, int id) {
|
||||
return Package();
|
||||
}
|
||||
|
||||
bool Package::set_packages(QSqlDatabase& p_db, YAML::Node& packages) {
|
||||
std::vector<Package> current_packages = get_packages(p_db);
|
||||
bool Package::set_packages(YAML::Node& packages) {
|
||||
std::vector<Package> current_packages = get_packages();
|
||||
std::unordered_map<std::string, YAML::Node> packages_map;
|
||||
for (const auto& package : packages) {
|
||||
if (package["name"]) {
|
||||
@ -214,7 +215,7 @@ bool Package::set_packages(QSqlDatabase& p_db, YAML::Node& packages) {
|
||||
? QString::fromStdString(package_node["packaging_branch"].as<std::string>())
|
||||
: QString("");
|
||||
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("INSERT INTO package (name, large, upstream_url, packaging_branch, packaging_url) VALUES (?, ?, ?, ?, ?)");
|
||||
query.bindValue(0, name);
|
||||
query.bindValue(1, large);
|
||||
@ -226,7 +227,7 @@ bool Package::set_packages(QSqlDatabase& p_db, YAML::Node& packages) {
|
||||
|
||||
// Remove the deletions
|
||||
for (const auto& package : deletions) {
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("DELETE FROM package WHERE name = ?");
|
||||
query.bindValue(0, QString::fromStdString(package));
|
||||
if (!query.exec()) { return false; }
|
||||
@ -273,10 +274,10 @@ std::string Package::transform_url(const std::string& url) {
|
||||
Branch::Branch(int id, const std::string& name, const std::string& upload_target, const std::string& upload_target_ssh)
|
||||
: id(id), name(name), upload_target(upload_target), upload_target_ssh(upload_target_ssh) {}
|
||||
|
||||
std::vector<Branch> Branch::get_branches(QSqlDatabase& p_db) {
|
||||
std::vector<Branch> Branch::get_branches() {
|
||||
std::vector<Branch> result;
|
||||
QString query_str = "SELECT id, name, upload_target, upload_target_ssh FROM branch";
|
||||
QSqlQuery query(query_str, p_db);
|
||||
QSqlQuery query(query_str, get_thread_connection());
|
||||
while (query.next()) {
|
||||
Branch current_branch(query.value("id").toInt(), query.value("name").toString().toStdString(),
|
||||
query.value("upload_target").toString().toStdString(),
|
||||
@ -286,8 +287,8 @@ std::vector<Branch> Branch::get_branches(QSqlDatabase& p_db) {
|
||||
return result;
|
||||
}
|
||||
|
||||
Branch Branch::get_branch_by_id(QSqlDatabase& p_db, int id) {
|
||||
QSqlQuery query(p_db);
|
||||
Branch Branch::get_branch_by_id(int id) {
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("SELECT id, name, upload_target, upload_target_ssh FROM branch WHERE id = ? LIMIT 1");
|
||||
query.bindValue(0, id);
|
||||
if (!query.exec()) {
|
||||
@ -311,7 +312,7 @@ PackageConf::PackageConf(int id, std::shared_ptr<Package> package, std::shared_p
|
||||
std::shared_ptr<GitCommit> packaging_commit, std::shared_ptr<GitCommit> upstream_commit)
|
||||
: id(id), package(package), release(release), branch(branch), packaging_commit(packaging_commit), upstream_commit(upstream_commit) {}
|
||||
|
||||
std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(QSqlDatabase& p_db, std::map<std::string, std::shared_ptr<JobStatus>> jobstatus_map) {
|
||||
std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(std::map<std::string, std::shared_ptr<JobStatus>> jobstatus_map) {
|
||||
Branch _tmp_brch = Branch();
|
||||
Package _tmp_pkg = Package();
|
||||
Release _tmp_rel = Release();
|
||||
@ -319,22 +320,22 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(QSqlDat
|
||||
|
||||
// Get the default release for setting the packaging branch
|
||||
std::string default_release;
|
||||
for (const Release& release : _tmp_rel.get_releases(p_db)) {
|
||||
for (const Release& release : _tmp_rel.get_releases()) {
|
||||
if (release.isDefault) {
|
||||
default_release = release.codename;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (const Branch& branch : _tmp_brch.get_branches(p_db)) {
|
||||
for (const Branch& branch : _tmp_brch.get_branches()) {
|
||||
int branch_id = branch.id;
|
||||
std::shared_ptr<Branch> shared_branch = std::make_shared<Branch>(branch);
|
||||
|
||||
for (const Release& release : _tmp_rel.get_releases(p_db)) {
|
||||
for (const Release& release : _tmp_rel.get_releases()) {
|
||||
int release_id = release.id;
|
||||
std::shared_ptr<Release> shared_release = std::make_shared<Release>(release);
|
||||
|
||||
for (const Package& package : _tmp_pkg.get_packages(p_db)) {
|
||||
for (const Package& package : _tmp_pkg.get_packages()) {
|
||||
int package_id = package.id;
|
||||
|
||||
Package new_package = package;
|
||||
@ -343,7 +344,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(QSqlDat
|
||||
}
|
||||
std::shared_ptr<Package> shared_package = std::make_shared<Package>(new_package);
|
||||
|
||||
QSqlQuery query_local(p_db);
|
||||
QSqlQuery query_local(get_thread_connection());
|
||||
query_local.prepare(R"(
|
||||
SELECT id, upstream_version, ppa_revision, package_id, release_id, branch_id, packaging_commit_id, upstream_commit_id
|
||||
FROM packageconf
|
||||
@ -368,13 +369,13 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(QSqlDat
|
||||
|
||||
if (!pkg_commit_variant.isNull()) {
|
||||
int pkg_commit_id = pkg_commit_variant.toInt();
|
||||
GitCommit tmp_pkg_commit = _tmp_commit.get_commit_by_id(p_db, pkg_commit_id);
|
||||
GitCommit tmp_pkg_commit = _tmp_commit.get_commit_by_id(pkg_commit_id);
|
||||
packaging_commit_ptr = std::make_shared<GitCommit>(tmp_pkg_commit);
|
||||
}
|
||||
|
||||
if (!ups_commit_variant.isNull()) {
|
||||
int ups_commit_id = ups_commit_variant.toInt();
|
||||
GitCommit tmp_ups_commit = _tmp_commit.get_commit_by_id(p_db, ups_commit_id);
|
||||
GitCommit tmp_ups_commit = _tmp_commit.get_commit_by_id(ups_commit_id);
|
||||
upstream_commit_ptr = std::make_shared<GitCommit>(tmp_ups_commit);
|
||||
}
|
||||
|
||||
@ -397,7 +398,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(QSqlDat
|
||||
|
||||
{
|
||||
// 1. Query all rows from `task`
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare(R"(
|
||||
SELECT
|
||||
t.id AS id,
|
||||
@ -496,7 +497,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs(QSqlDat
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_package_name(QSqlDatabase& p_db, std::vector<std::shared_ptr<PackageConf>> packageconfs, const std::string& package_name) {
|
||||
std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_package_name(std::vector<std::shared_ptr<PackageConf>> packageconfs, const std::string& package_name) {
|
||||
Branch _tmp_brch = Branch();
|
||||
Package _tmp_pkg = Package();
|
||||
PackageConf _tmp_pkg_conf = PackageConf();
|
||||
@ -513,21 +514,21 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
|
||||
|
||||
// Get the default release for setting the packaging branch
|
||||
std::string default_release;
|
||||
for (const Release& release : _tmp_rel.get_releases(p_db)) {
|
||||
for (const Release& release : _tmp_rel.get_releases()) {
|
||||
if (release.isDefault) {
|
||||
default_release = release.codename;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (const Branch& branch : _tmp_brch.get_branches(p_db)) {
|
||||
for (const Branch& branch : _tmp_brch.get_branches()) {
|
||||
int branch_id = branch.id;
|
||||
std::shared_ptr<Branch> shared_branch = std::make_shared<Branch>(branch);
|
||||
|
||||
for (const Release& release : _tmp_rel.get_releases(p_db)) {
|
||||
for (const Release& release : _tmp_rel.get_releases()) {
|
||||
int release_id = release.id;
|
||||
std::shared_ptr<Release> shared_release = std::make_shared<Release>(release);
|
||||
for (const Package& package : _tmp_pkg.get_packages(p_db)) {
|
||||
for (const Package& package : _tmp_pkg.get_packages()) {
|
||||
int package_id = package.id;
|
||||
|
||||
Package new_package = package;
|
||||
@ -536,7 +537,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
|
||||
}
|
||||
std::shared_ptr<Package> shared_package = std::make_shared<Package>(new_package);
|
||||
|
||||
QSqlQuery query_local(p_db);
|
||||
QSqlQuery query_local(get_thread_connection());
|
||||
query_local.prepare(R"(
|
||||
SELECT id, package_id, release_id, branch_id, packaging_commit_id, upstream_commit_id
|
||||
FROM packageconf
|
||||
@ -561,13 +562,13 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
|
||||
|
||||
if (!pkg_commit_variant.isNull()) {
|
||||
int pkg_commit_id = pkg_commit_variant.toInt();
|
||||
GitCommit tmp_pkg_commit = _tmp_commit.get_commit_by_id(p_db, pkg_commit_id);
|
||||
GitCommit tmp_pkg_commit = _tmp_commit.get_commit_by_id(pkg_commit_id);
|
||||
packaging_commit_ptr = std::make_shared<GitCommit>(tmp_pkg_commit);
|
||||
}
|
||||
|
||||
if (!ups_commit_variant.isNull()) {
|
||||
int ups_commit_id = ups_commit_variant.toInt();
|
||||
GitCommit tmp_ups_commit = _tmp_commit.get_commit_by_id(p_db, ups_commit_id);
|
||||
GitCommit tmp_ups_commit = _tmp_commit.get_commit_by_id(ups_commit_id);
|
||||
upstream_commit_ptr = std::make_shared<GitCommit>(tmp_ups_commit);
|
||||
}
|
||||
|
||||
@ -588,7 +589,7 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
|
||||
|
||||
{
|
||||
// 1. Query all rows from `task`
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare(R"(
|
||||
SELECT id, packageconf_id, jobstatus_id, queue_time, start_time,
|
||||
finish_time, successful, log
|
||||
@ -602,14 +603,14 @@ std::vector<std::shared_ptr<PackageConf>> PackageConf::get_package_confs_by_pack
|
||||
// so we can quickly look up a JobStatus by its ID:
|
||||
std::map<int, std::shared_ptr<JobStatus>> all_jobstatuses;
|
||||
{
|
||||
QSqlQuery q2(p_db);
|
||||
QSqlQuery q2(get_thread_connection());
|
||||
q2.prepare("SELECT id FROM jobstatus");
|
||||
if (!q2.exec()) {
|
||||
qDebug() << "Failed to load jobstatus list:" << q2.lastError().text();
|
||||
}
|
||||
while (q2.next()) {
|
||||
int js_id = q2.value(0).toInt();
|
||||
auto js_ptr = std::make_shared<JobStatus>(JobStatus(p_db, js_id));
|
||||
auto js_ptr = std::make_shared<JobStatus>(JobStatus(js_id));
|
||||
all_jobstatuses[js_id] = js_ptr;
|
||||
}
|
||||
}
|
||||
@ -712,9 +713,9 @@ void PackageConf::assign_task(std::shared_ptr<JobStatus> jobstatus, std::shared_
|
||||
}
|
||||
|
||||
|
||||
bool PackageConf::set_package_confs(QSqlDatabase& p_db) {
|
||||
bool PackageConf::set_package_confs() {
|
||||
// Fetch current PackageConf entries from the database
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("SELECT package_id, release_id, branch_id FROM packageconf");
|
||||
if (!query.exec()) {
|
||||
qDebug() << "Failed to fetch existing packageconfs:" << query.lastError().text();
|
||||
@ -732,15 +733,15 @@ bool PackageConf::set_package_confs(QSqlDatabase& p_db) {
|
||||
}
|
||||
|
||||
// Fetch all package, release, and branch IDs
|
||||
QSqlQuery pkg_query("SELECT id FROM package", p_db);
|
||||
QSqlQuery pkg_query("SELECT id FROM package", get_thread_connection());
|
||||
std::set<int> package_ids;
|
||||
while (pkg_query.next()) { package_ids.insert(pkg_query.value(0).toInt()); }
|
||||
|
||||
QSqlQuery rel_query("SELECT id FROM release", p_db);
|
||||
QSqlQuery rel_query("SELECT id FROM release", get_thread_connection());
|
||||
std::set<int> release_ids;
|
||||
while (rel_query.next()) { release_ids.insert(rel_query.value(0).toInt()); }
|
||||
|
||||
QSqlQuery br_query("SELECT id FROM branch", p_db);
|
||||
QSqlQuery br_query("SELECT id FROM branch", get_thread_connection());
|
||||
std::set<int> branch_ids;
|
||||
while (br_query.next()) { branch_ids.insert(br_query.value(0).toInt()); }
|
||||
|
||||
@ -773,7 +774,7 @@ bool PackageConf::set_package_confs(QSqlDatabase& p_db) {
|
||||
|
||||
// Insert additions, now including packaging_commit_id/upstream_commit_id as NULL
|
||||
for (const auto& conf : additions) {
|
||||
QSqlQuery insert_query(p_db);
|
||||
QSqlQuery insert_query(get_thread_connection());
|
||||
insert_query.prepare(R"(
|
||||
INSERT INTO packageconf (
|
||||
package_id,
|
||||
@ -799,7 +800,7 @@ bool PackageConf::set_package_confs(QSqlDatabase& p_db) {
|
||||
|
||||
// Remove deletions
|
||||
for (const auto& conf : deletions) {
|
||||
QSqlQuery delete_query(p_db);
|
||||
QSqlQuery delete_query(get_thread_connection());
|
||||
delete_query.prepare(R"(
|
||||
DELETE FROM packageconf
|
||||
WHERE package_id = ?
|
||||
@ -822,12 +823,12 @@ bool PackageConf::set_package_confs(QSqlDatabase& p_db) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void PackageConf::sync(QSqlDatabase& p_db) {
|
||||
void PackageConf::sync() {
|
||||
bool task_succeeded = true;
|
||||
int attempt = 0;
|
||||
while (!task_succeeded) {
|
||||
try {
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
|
||||
if ((!packaging_commit || !upstream_commit) || ((!packaging_commit || packaging_commit->id == 0) && (!upstream_commit || upstream_commit->id == 0))) break;
|
||||
else if ((packaging_commit && packaging_commit->id == 0) && (!upstream_commit || upstream_commit->id != 0)) {
|
||||
@ -866,8 +867,8 @@ void PackageConf::sync(QSqlDatabase& p_db) {
|
||||
std::lock_guard<std::mutex> lock(*task_mutex_);
|
||||
for (auto [job_status, task] : jobstatus_task_map_) {
|
||||
if (task) {
|
||||
auto sync_func = [this, task, p_db]() mutable {
|
||||
task->save(p_db, id);
|
||||
auto sync_func = [this, task]() mutable {
|
||||
task->save(id);
|
||||
};
|
||||
sync_func();
|
||||
}
|
||||
@ -939,7 +940,6 @@ bool PackageConf::can_check_builds() {
|
||||
// Start of GitCommit
|
||||
// Constructor which also adds it to the database
|
||||
GitCommit::GitCommit(
|
||||
QSqlDatabase& p_db,
|
||||
const std::string& commit_hash,
|
||||
const std::string& commit_summary,
|
||||
const std::string& commit_message,
|
||||
@ -953,7 +953,7 @@ GitCommit::GitCommit(
|
||||
commit_author(commit_author),
|
||||
commit_committer(commit_committer) {
|
||||
// Insert the entry into the database right away
|
||||
QSqlQuery insert_query(p_db);
|
||||
QSqlQuery insert_query(get_thread_connection());
|
||||
|
||||
// Convert commit_datetime to a string in ISO 8601 format
|
||||
auto sys_time = commit_datetime.get_sys_time();
|
||||
@ -1018,8 +1018,8 @@ std::chrono::zoned_time<std::chrono::seconds> GitCommit::convert_timestr_to_zone
|
||||
return db_commit_datetime;
|
||||
}
|
||||
|
||||
GitCommit GitCommit::get_commit_by_id(QSqlDatabase& p_db, int id) {
|
||||
QSqlQuery query(p_db);
|
||||
GitCommit GitCommit::get_commit_by_id(int id) {
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare(
|
||||
"SELECT id, commit_hash, commit_summary, commit_message, commit_datetime, "
|
||||
" commit_author, commit_committer "
|
||||
@ -1062,8 +1062,8 @@ GitCommit GitCommit::get_commit_by_id(QSqlDatabase& p_db, int id) {
|
||||
return GitCommit();
|
||||
}
|
||||
|
||||
std::optional<GitCommit> GitCommit::get_commit_by_hash(QSqlDatabase& p_db, const std::string commit_hash) {
|
||||
QSqlQuery query(p_db);
|
||||
std::optional<GitCommit> GitCommit::get_commit_by_hash(const std::string commit_hash) {
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare(
|
||||
"SELECT id, commit_hash, commit_summary, commit_message, commit_datetime, "
|
||||
" commit_author, commit_committer "
|
||||
@ -1107,8 +1107,8 @@ std::optional<GitCommit> GitCommit::get_commit_by_hash(QSqlDatabase& p_db, const
|
||||
}
|
||||
// End of GitCommit
|
||||
// Start of JobStatus
|
||||
JobStatus::JobStatus(QSqlDatabase& p_db, int id) : id(id) {
|
||||
QSqlQuery query(p_db);
|
||||
JobStatus::JobStatus(int id) : id(id) {
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare(
|
||||
"SELECT id, build_score, name, display_name "
|
||||
"FROM jobstatus WHERE id = ? LIMIT 1"
|
||||
@ -1126,11 +1126,11 @@ JobStatus::JobStatus(QSqlDatabase& p_db, int id) : id(id) {
|
||||
}
|
||||
// End of JobStatus
|
||||
// Start of Task
|
||||
Task::Task(QSqlDatabase& p_db, std::shared_ptr<JobStatus> jobstatus, std::int64_t time, std::shared_ptr<PackageConf> packageconf)
|
||||
Task::Task(std::shared_ptr<JobStatus> jobstatus, std::int64_t time, std::shared_ptr<PackageConf> packageconf)
|
||||
: jobstatus(jobstatus), queue_time(time), is_running(false), log(std::make_shared<Log>()), parent_packageconf(packageconf)
|
||||
{
|
||||
assert(log != nullptr && "Log pointer should never be null");
|
||||
QSqlQuery insert_query(p_db);
|
||||
QSqlQuery insert_query(get_thread_connection());
|
||||
insert_query.prepare("INSERT INTO task (packageconf_id, jobstatus_id, queue_time) VALUES (?, ?, ?)");
|
||||
insert_query.addBindValue(packageconf->id);
|
||||
insert_query.addBindValue(jobstatus->id);
|
||||
@ -1187,12 +1187,12 @@ bool Task::compare(const std::shared_ptr<Task>& lhs, const std::shared_ptr<Task>
|
||||
return lhs->id < rhs->id; // Earlier id first
|
||||
}
|
||||
|
||||
std::set<std::shared_ptr<Task>> Task::get_completed_tasks(QSqlDatabase& p_db, std::vector<std::shared_ptr<PackageConf>> packageconfs, std::map<std::string, std::shared_ptr<JobStatus>> job_statuses, int page, int per_page) {
|
||||
std::set<std::shared_ptr<Task>> Task::get_completed_tasks(std::vector<std::shared_ptr<PackageConf>> packageconfs, std::map<std::string, std::shared_ptr<JobStatus>> job_statuses, int page, int per_page) {
|
||||
std::set<std::shared_ptr<Task>> result;
|
||||
|
||||
if (per_page < 1) { per_page = 1; }
|
||||
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare(
|
||||
"SELECT id, packageconf_id, jobstatus_id, start_time, finish_time, successful, log "
|
||||
"FROM task WHERE start_time != 0 AND finish_time != 0 ORDER BY finish_time DESC LIMIT ? OFFSET ?"
|
||||
@ -1231,11 +1231,11 @@ std::set<std::shared_ptr<Task>> Task::get_completed_tasks(QSqlDatabase& p_db, st
|
||||
return result;
|
||||
}
|
||||
|
||||
void Task::save(QSqlDatabase& p_db, int _packageconf_id) {
|
||||
void Task::save(int _packageconf_id) {
|
||||
bool task_succeeded = false;
|
||||
int attempt = 0;
|
||||
while (!task_succeeded) {
|
||||
QSqlQuery query(p_db);
|
||||
QSqlQuery query(get_thread_connection());
|
||||
query.prepare("UPDATE task SET jobstatus_id = ?, queue_time = ?, start_time = ?, finish_time = ?, successful = ?, log = ? WHERE id = ?");
|
||||
query.addBindValue(jobstatus->id);
|
||||
query.addBindValue(QVariant::fromValue(static_cast<qlonglong>(queue_time)));
|
||||
@ -1254,7 +1254,7 @@ void Task::save(QSqlDatabase& p_db, int _packageconf_id) {
|
||||
}
|
||||
}
|
||||
|
||||
QSqlQuery link_query(p_db);
|
||||
QSqlQuery link_query(get_thread_connection());
|
||||
|
||||
int packageconf_id;
|
||||
// Max length of int, or default
|
||||
|
@ -46,9 +46,9 @@ public:
|
||||
bool isDefault;
|
||||
|
||||
Release(int id = 0, int version = 0, const std::string& codename = "", bool isDefault = false);
|
||||
std::vector<Release> get_releases(QSqlDatabase& p_db);
|
||||
Release get_release_by_id(QSqlDatabase& p_db, int id);
|
||||
bool set_releases(QSqlDatabase& p_db, YAML::Node& releases);
|
||||
std::vector<Release> get_releases();
|
||||
Release get_release_by_id(int id);
|
||||
bool set_releases(YAML::Node& releases);
|
||||
};
|
||||
|
||||
class Package {
|
||||
@ -63,9 +63,9 @@ public:
|
||||
std::string packaging_url;
|
||||
|
||||
Package(int id = 0, const std::string& name = "", bool large = false, const std::string& upstream_url = "", const std::string& packaging_branch = "", const std::string& packaging_url = "");
|
||||
std::vector<Package> get_packages(QSqlDatabase& p_db);
|
||||
Package get_package_by_id(QSqlDatabase& p_db, int id);
|
||||
bool set_packages(QSqlDatabase& p_db, YAML::Node& packages);
|
||||
std::vector<Package> get_packages();
|
||||
Package get_package_by_id(int id);
|
||||
bool set_packages(YAML::Node& packages);
|
||||
|
||||
private:
|
||||
std::string transform_url(const std::string& url);
|
||||
@ -79,8 +79,8 @@ public:
|
||||
std::string upload_target_ssh;
|
||||
|
||||
Branch(int id = 0, const std::string& name = "", const std::string& upload_target = "", const std::string& upload_target_ssh = "");
|
||||
std::vector<Branch> get_branches(QSqlDatabase& p_db);
|
||||
Branch get_branch_by_id(QSqlDatabase& p_db, int id);
|
||||
std::vector<Branch> get_branches();
|
||||
Branch get_branch_by_id(int id);
|
||||
};
|
||||
|
||||
class GitCommit {
|
||||
@ -94,7 +94,6 @@ public:
|
||||
std::string commit_committer;
|
||||
|
||||
GitCommit(
|
||||
QSqlDatabase& p_db,
|
||||
const std::string& commit_hash = "",
|
||||
const std::string& commit_summary = "",
|
||||
const std::string& commit_message = "",
|
||||
@ -103,7 +102,7 @@ public:
|
||||
const std::string& commit_committer = ""
|
||||
);
|
||||
GitCommit(
|
||||
const int id = 0,
|
||||
const int id,
|
||||
const std::string& commit_hash = "",
|
||||
const std::string& commit_summary = "",
|
||||
const std::string& commit_message = "",
|
||||
@ -112,8 +111,8 @@ public:
|
||||
const std::string& commit_committer = ""
|
||||
);
|
||||
|
||||
GitCommit get_commit_by_id(QSqlDatabase& p_db, int id);
|
||||
std::optional<GitCommit> get_commit_by_hash(QSqlDatabase& p_db, const std::string commit_hash);
|
||||
GitCommit get_commit_by_id(int id);
|
||||
std::optional<GitCommit> get_commit_by_hash(const std::string commit_hash);
|
||||
|
||||
private:
|
||||
std::chrono::zoned_time<std::chrono::seconds> convert_timestr_to_zonedtime(const std::string& datetime_str);
|
||||
@ -126,7 +125,7 @@ public:
|
||||
std::string name;
|
||||
std::string display_name;
|
||||
|
||||
JobStatus(QSqlDatabase& p_db, int id);
|
||||
JobStatus(int id);
|
||||
};
|
||||
|
||||
class PackageConf {
|
||||
@ -158,18 +157,17 @@ public:
|
||||
|
||||
PackageConf(int id = 0, std::shared_ptr<Package> package = NULL, std::shared_ptr<Release> release = NULL, std::shared_ptr<Branch> branch = NULL,
|
||||
std::shared_ptr<GitCommit> packaging_commit = NULL, std::shared_ptr<GitCommit> upstream_commit = NULL);
|
||||
std::vector<std::shared_ptr<PackageConf>> get_package_confs(QSqlDatabase& p_db, std::map<std::string, std::shared_ptr<JobStatus>> jobstatus_map);
|
||||
std::vector<std::shared_ptr<PackageConf>> get_package_confs_by_package_name(QSqlDatabase& p_db,
|
||||
std::vector<std::shared_ptr<PackageConf>> packageconfs,
|
||||
std::vector<std::shared_ptr<PackageConf>> get_package_confs(std::map<std::string, std::shared_ptr<JobStatus>> jobstatus_map);
|
||||
std::vector<std::shared_ptr<PackageConf>> get_package_confs_by_package_name(std::vector<std::shared_ptr<PackageConf>> packageconfs,
|
||||
const std::string& package_name);
|
||||
void assign_task(std::shared_ptr<JobStatus> jobstatus, std::shared_ptr<Task> task_ptr, std::weak_ptr<PackageConf> packageconf_ptr);
|
||||
int successful_task_count();
|
||||
int total_task_count();
|
||||
std::shared_ptr<Task> get_task_by_jobstatus(std::shared_ptr<JobStatus> jobstatus);
|
||||
bool set_package_confs(QSqlDatabase& p_db);
|
||||
bool set_package_confs();
|
||||
bool set_commit_id(const std::string& _commit_id = "");
|
||||
bool set_commit_time(const std::chrono::zoned_time<std::chrono::seconds>& _commit_time = std::chrono::zoned_time<std::chrono::seconds>{});
|
||||
void sync(QSqlDatabase& p_db);
|
||||
void sync();
|
||||
bool can_check_source_upload();
|
||||
bool can_check_builds();
|
||||
|
||||
@ -211,11 +209,11 @@ public:
|
||||
std::weak_ptr<PackageConf> parent_packageconf;
|
||||
bool is_running;
|
||||
|
||||
Task(QSqlDatabase& p_db, std::shared_ptr<JobStatus> jobstatus, std::int64_t time, std::shared_ptr<PackageConf> packageconf);
|
||||
Task(std::shared_ptr<JobStatus> jobstatus, std::int64_t time, std::shared_ptr<PackageConf> packageconf);
|
||||
Task();
|
||||
|
||||
std::set<std::shared_ptr<Task>> get_completed_tasks(QSqlDatabase& p_db, std::vector<std::shared_ptr<PackageConf>> packageconfs, std::map<std::string, std::shared_ptr<JobStatus>> job_statuses, int page, int per_page);
|
||||
void save(QSqlDatabase& p_db, int _packageconf_id = 0);
|
||||
std::set<std::shared_ptr<Task>> get_completed_tasks(std::vector<std::shared_ptr<PackageConf>> packageconfs, std::map<std::string, std::shared_ptr<JobStatus>> job_statuses, int page, int per_page);
|
||||
void save(int _packageconf_id = 0);
|
||||
|
||||
std::shared_ptr<PackageConf> get_parent_packageconf() const {
|
||||
return parent_packageconf.lock();
|
||||
|
@ -67,32 +67,6 @@ static void merge_yaml_nodes(YAML::Node &master, const YAML::Node &partial) {
|
||||
}
|
||||
}
|
||||
|
||||
QSqlDatabase CiLogic::get_thread_connection() {
|
||||
std::lock_guard<std::mutex> lock(connection_mutex_);
|
||||
thread_local unsigned int thread_unique_id = thread_id_counter.fetch_add(1);
|
||||
QString connectionName = QString("LubuntuCIConnection_%1").arg(thread_unique_id);
|
||||
|
||||
// Check if the connection already exists for this thread
|
||||
if (QSqlDatabase::contains(connectionName)) {
|
||||
QSqlDatabase db = QSqlDatabase::database(connectionName);
|
||||
if (!db.isOpen()) {
|
||||
if (!db.open()) {
|
||||
throw std::runtime_error("Failed to open thread-specific database connection: " + db.lastError().text().toStdString());
|
||||
}
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
QSqlDatabase threadDb = QSqlDatabase::addDatabase("QSQLITE", connectionName);
|
||||
threadDb.setDatabaseName("/srv/lubuntu-ci/repos/ci-tools/lubuntu_ci.db");
|
||||
|
||||
if (!threadDb.open()) {
|
||||
throw std::runtime_error("Failed to open new database connection for thread: " + threadDb.lastError().text().toStdString());
|
||||
}
|
||||
|
||||
return threadDb;
|
||||
}
|
||||
|
||||
// This returns the following information about a commit:
|
||||
// 1) commit_hash
|
||||
// 2) commit_summary
|
||||
@ -100,7 +74,7 @@ QSqlDatabase CiLogic::get_thread_connection() {
|
||||
// 4) commit_datetime
|
||||
// 5) commit_author
|
||||
// 6) commit_committer
|
||||
GitCommit get_commit_from_pkg_repo(QSqlDatabase& p_db, const std::string& repo_name, std::shared_ptr<Log> log) {
|
||||
GitCommit get_commit_from_pkg_repo(const std::string& repo_name, std::shared_ptr<Log> log) {
|
||||
// Ensure libgit2 is initialized
|
||||
ensure_git_inited();
|
||||
|
||||
@ -245,7 +219,6 @@ GitCommit get_commit_from_pkg_repo(QSqlDatabase& p_db, const std::string& repo_n
|
||||
|
||||
// Construct and return the GitCommit object with collected data
|
||||
GitCommit git_commit_instance(
|
||||
p_db,
|
||||
commit_hash,
|
||||
current_summary, // Use the current commit summary
|
||||
commit_message,
|
||||
@ -255,7 +228,7 @@ GitCommit get_commit_from_pkg_repo(QSqlDatabase& p_db, const std::string& repo_n
|
||||
);
|
||||
|
||||
// Check if the commit already exists in the DB
|
||||
auto existing_commit = _tmp_commit.get_commit_by_hash(p_db, commit_hash);
|
||||
auto existing_commit = _tmp_commit.get_commit_by_hash(commit_hash);
|
||||
if (existing_commit) {
|
||||
found_valid_commit = true;
|
||||
// Cleanup revwalk and repository before returning
|
||||
@ -517,35 +490,30 @@ void CiLogic::init_global() {
|
||||
|
||||
// Set the packages in the DB
|
||||
YAML::Node yaml_packages = g_config["packages"];
|
||||
auto connection = get_thread_connection();
|
||||
if (!_tmp_pkg.set_packages(connection, yaml_packages)) {
|
||||
if (!_tmp_pkg.set_packages(yaml_packages)) {
|
||||
log_error("Failed to set packages.");
|
||||
}
|
||||
packages = _tmp_pkg.get_packages(connection);
|
||||
packages = _tmp_pkg.get_packages();
|
||||
|
||||
// Set the releases in the DB
|
||||
YAML::Node yaml_releases = g_config["releases"];
|
||||
connection = get_thread_connection();
|
||||
if (!_tmp_rel.set_releases(connection, yaml_releases)) {
|
||||
if (!_tmp_rel.set_releases(yaml_releases)) {
|
||||
log_error("Failed to set releases.");
|
||||
}
|
||||
connection = get_thread_connection();
|
||||
releases = _tmp_rel.get_releases(connection);
|
||||
releases = _tmp_rel.get_releases();
|
||||
|
||||
// Add missing packageconf entries
|
||||
connection = get_thread_connection();
|
||||
if (!_tmp_pkg_conf.set_package_confs(connection)) {
|
||||
if (!_tmp_pkg_conf.set_package_confs()) {
|
||||
log_error("Failed to set package configurations.");
|
||||
}
|
||||
set_packageconfs(_tmp_pkg_conf.get_package_confs(connection, get_job_statuses()));
|
||||
set_packageconfs(_tmp_pkg_conf.get_package_confs(get_job_statuses()));
|
||||
|
||||
// Finally, store the branches
|
||||
connection = get_thread_connection();
|
||||
if (branches.empty()) {
|
||||
branches = _tmp_brnch.get_branches(connection);
|
||||
}
|
||||
if (branches.empty()) {
|
||||
branches = _tmp_brnch.get_branches();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a YAML node to CiProject
|
||||
@ -1000,12 +968,9 @@ bool CiLogic::pull_project(std::shared_ptr<PackageConf> &proj, std::shared_ptr<L
|
||||
|
||||
// Now read the HEAD commits and store them
|
||||
log->append("Fetching complete. Storing Git commit data...\n");
|
||||
auto connection = get_thread_connection();
|
||||
*proj->packaging_commit = get_commit_from_pkg_repo(connection, packaging_dir.string(), log);
|
||||
connection = get_thread_connection();
|
||||
*proj->upstream_commit = get_commit_from_pkg_repo(connection, upstream_dir.string(), log);
|
||||
connection = get_thread_connection();
|
||||
proj->sync(connection);
|
||||
*proj->packaging_commit = get_commit_from_pkg_repo(packaging_dir.string(), log);
|
||||
*proj->upstream_commit = get_commit_from_pkg_repo(upstream_dir.string(), log);
|
||||
proj->sync();
|
||||
|
||||
log->append("Done!");
|
||||
return true;
|
||||
@ -1362,16 +1327,15 @@ std::string CiLogic::queue_pull_tarball(std::vector<std::shared_ptr<PackageConf>
|
||||
std::map<std::string, std::shared_ptr<JobStatus>> CiLogic::get_job_statuses() {
|
||||
if (!_cached_job_statuses.empty()) { return _cached_job_statuses; }
|
||||
|
||||
auto connection = get_thread_connection();
|
||||
static const std::map<std::string, std::shared_ptr<JobStatus>> statuses = {
|
||||
{"pull", std::make_shared<JobStatus>(JobStatus(connection, 1))},
|
||||
{"tarball", std::make_shared<JobStatus>(JobStatus(connection, 2))},
|
||||
{"source_build", std::make_shared<JobStatus>(JobStatus(connection, 3))},
|
||||
{"upload", std::make_shared<JobStatus>(JobStatus(connection, 4))},
|
||||
{"source_check", std::make_shared<JobStatus>(JobStatus(connection, 5))},
|
||||
{"build_check", std::make_shared<JobStatus>(JobStatus(connection, 6))},
|
||||
{"lintian", std::make_shared<JobStatus>(JobStatus(connection, 7))},
|
||||
{"britney", std::make_shared<JobStatus>(JobStatus(connection, 8))}
|
||||
{"pull", std::make_shared<JobStatus>(JobStatus(1))},
|
||||
{"tarball", std::make_shared<JobStatus>(JobStatus(2))},
|
||||
{"source_build", std::make_shared<JobStatus>(JobStatus(3))},
|
||||
{"upload", std::make_shared<JobStatus>(JobStatus(4))},
|
||||
{"source_check", std::make_shared<JobStatus>(JobStatus(5))},
|
||||
{"build_check", std::make_shared<JobStatus>(JobStatus(6))},
|
||||
{"lintian", std::make_shared<JobStatus>(JobStatus(7))},
|
||||
{"britney", std::make_shared<JobStatus>(JobStatus(8))}
|
||||
};
|
||||
_cached_job_statuses = statuses;
|
||||
return statuses;
|
||||
@ -1412,8 +1376,7 @@ void CiLogic::set_packageconfs(std::vector<std::shared_ptr<PackageConf>> _pkgcon
|
||||
|
||||
void CiLogic::sync(std::shared_ptr<PackageConf> pkgconf) {
|
||||
std::lock_guard<std::mutex> lock(packageconfs_mutex_);
|
||||
auto connection = get_thread_connection();
|
||||
pkgconf->sync(connection);
|
||||
pkgconf->sync();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -13,9 +13,6 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
// cpp/ci_logic.h
|
||||
// [License Header as in original]
|
||||
|
||||
#ifndef CI_LOGIC_H
|
||||
#define CI_LOGIC_H
|
||||
|
||||
@ -97,8 +94,6 @@ class CiLogic {
|
||||
void set_packageconfs(std::vector<std::shared_ptr<PackageConf>> _pkgconfs);
|
||||
void sync(std::shared_ptr<PackageConf> pkgconf);
|
||||
|
||||
QSqlDatabase get_thread_connection();
|
||||
|
||||
std::string queue_pull_tarball(std::vector<std::shared_ptr<PackageConf>> repos,
|
||||
std::unique_ptr<TaskQueue>& task_queue,
|
||||
const std::map<std::string, std::shared_ptr<JobStatus>> job_statuses);
|
||||
@ -116,7 +111,6 @@ class CiLogic {
|
||||
|
||||
QSqlDatabase p_db;
|
||||
|
||||
mutable std::mutex connection_mutex_;
|
||||
mutable std::mutex packageconfs_mutex_;
|
||||
std::vector<std::shared_ptr<PackageConf>> packageconfs;
|
||||
std::map<std::string, std::shared_ptr<JobStatus>> _cached_job_statuses;
|
||||
|
50
cpp/db_common.cpp
Normal file
50
cpp/db_common.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
// Copyright (C) 2024-2025 Simon Quigley <tsimonq2@ubuntu.com>
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
#include "db_common.h"
|
||||
#include <QSqlDatabase>
|
||||
#include <QSqlError>
|
||||
#include <QSqlQuery>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
|
||||
std::mutex connection_mutex_;
|
||||
static std::atomic<unsigned int> thread_id_counter{10};
|
||||
|
||||
QSqlDatabase get_thread_connection() {
|
||||
std::lock_guard<std::mutex> lock(connection_mutex_);
|
||||
thread_local unsigned int thread_unique_id = thread_id_counter.fetch_add(1);
|
||||
QString connection_name = QString("LubuntuCIConnection_%1").arg(thread_unique_id);
|
||||
|
||||
// Check if the connection already exists for this thread
|
||||
if (QSqlDatabase::contains(connection_name)) {
|
||||
QSqlDatabase db = QSqlDatabase::database(connection_name);
|
||||
if (!db.isOpen()) {
|
||||
if (!db.open()) {
|
||||
throw std::runtime_error("Failed to open thread-specific database connection: " + db.lastError().text().toStdString());
|
||||
}
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
QSqlDatabase thread_db = QSqlDatabase::addDatabase("QSQLITE", connection_name);
|
||||
thread_db.setDatabaseName("/srv/lubuntu-ci/repos/ci-tools/lubuntu_ci.db");
|
||||
|
||||
if (!thread_db.open()) {
|
||||
throw std::runtime_error("Failed to open new database connection for thread: " + thread_db.lastError().text().toStdString());
|
||||
}
|
||||
|
||||
return thread_db;
|
||||
}
|
23
cpp/db_common.h
Normal file
23
cpp/db_common.h
Normal file
@ -0,0 +1,23 @@
|
||||
// Copyright (C) 2024 Simon Quigley <tsimonq2@ubuntu.com>
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
#ifndef DB_COMMON_H
|
||||
#define DB_COMMON_H
|
||||
|
||||
#include <QSqlDatabase>
|
||||
|
||||
QSqlDatabase get_thread_connection();
|
||||
|
||||
#endif // DB_COMMON_H
|
@ -26,45 +26,16 @@ TaskQueue::~TaskQueue() {
|
||||
stop();
|
||||
}
|
||||
|
||||
// FIXME: copy of CiLogic::get_thread_connection()
|
||||
std::atomic<unsigned int> TaskQueue::thread_id_counter{1200};
|
||||
QSqlDatabase TaskQueue::get_thread_connection() {
|
||||
std::lock_guard<std::mutex> lock(connection_mutex_);
|
||||
thread_local unsigned int thread_unique_id = thread_id_counter.fetch_add(1);
|
||||
QString connectionName = QString("LubuntuCIConnection_%1").arg(thread_unique_id);
|
||||
|
||||
// Check if the connection already exists for this thread
|
||||
if (QSqlDatabase::contains(connectionName)) {
|
||||
QSqlDatabase db = QSqlDatabase::database(connectionName);
|
||||
if (!db.isOpen()) {
|
||||
if (!db.open()) {
|
||||
throw std::runtime_error("Failed to open thread-specific database connection: " + db.lastError().text().toStdString());
|
||||
}
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
QSqlDatabase threadDb = QSqlDatabase::addDatabase("QSQLITE", connectionName);
|
||||
threadDb.setDatabaseName("/srv/lubuntu-ci/repos/ci-tools/lubuntu_ci.db");
|
||||
|
||||
if (!threadDb.open()) {
|
||||
throw std::runtime_error("Failed to open new database connection for thread: " + threadDb.lastError().text().toStdString());
|
||||
}
|
||||
|
||||
return threadDb;
|
||||
}
|
||||
|
||||
void TaskQueue::enqueue(std::shared_ptr<JobStatus> jobstatus,
|
||||
std::function<void(std::shared_ptr<Log> log)> task_func,
|
||||
std::shared_ptr<PackageConf> packageconf) {
|
||||
{
|
||||
auto connection = get_thread_connection();
|
||||
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
|
||||
// Create the task
|
||||
std::shared_ptr<Task> task_ptr = std::make_shared<Task>(connection, jobstatus, now, packageconf);
|
||||
std::shared_ptr<Task> task_ptr = std::make_shared<Task>(jobstatus, now, packageconf);
|
||||
task_ptr->func = [task_func, self_weak = std::weak_ptr<Task>(task_ptr)](std::shared_ptr<Log> log) {
|
||||
std::shared_ptr<Task> task_locked = self_weak.lock();
|
||||
if (task_locked) {
|
||||
@ -163,8 +134,7 @@ void TaskQueue::worker_thread() {
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
task_to_execute->start_time = now;
|
||||
auto connection = get_thread_connection();
|
||||
task_to_execute->save(connection, 0);
|
||||
task_to_execute->save(0);
|
||||
}
|
||||
|
||||
try {
|
||||
@ -186,8 +156,7 @@ void TaskQueue::worker_thread() {
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
task_to_execute->finish_time = now;
|
||||
auto connection = get_thread_connection();
|
||||
task_to_execute->save(connection, 0);
|
||||
task_to_execute->save(0);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -53,12 +53,9 @@ private:
|
||||
std::condition_variable cv_;
|
||||
bool stop_;
|
||||
std::vector<std::thread> workers_;
|
||||
static std::atomic<unsigned int> thread_id_counter;
|
||||
mutable std::mutex connection_mutex_;
|
||||
int max_worker_id = 1;
|
||||
|
||||
void worker_thread();
|
||||
QSqlDatabase get_thread_connection();
|
||||
};
|
||||
|
||||
#endif // TASK_QUEUE_H
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "utilities.h"
|
||||
#include "sources_parser.h"
|
||||
#include "naive_bayes_classifier.h"
|
||||
#include "db_common.h"
|
||||
|
||||
// Qt includes
|
||||
#include <QtHttpServer/QHttpServer>
|
||||
@ -171,7 +172,7 @@ bool WebServer::start_server(quint16 port) {
|
||||
|
||||
// Load initial tokens
|
||||
{
|
||||
QSqlQuery load_tokens(lubuntuci->cilogic.get_thread_connection());
|
||||
QSqlQuery load_tokens(get_thread_connection());
|
||||
load_tokens.prepare("SELECT person.id, person.username, person.logo_url, person_token.token, person_token.expiry_date FROM person INNER JOIN person_token ON person.id = person_token.person_id");
|
||||
load_tokens.exec();
|
||||
while (load_tokens.next()) {
|
||||
@ -188,7 +189,7 @@ bool WebServer::start_server(quint16 port) {
|
||||
}
|
||||
|
||||
expire_tokens_thread_ = std::jthread(run_task_every, 60, [this, lubuntuci] {
|
||||
QSqlQuery expired_tokens(lubuntuci->cilogic.get_thread_connection());
|
||||
QSqlQuery expired_tokens(get_thread_connection());
|
||||
QString current_time = QDateTime::currentDateTime().toString(Qt::ISODate);
|
||||
|
||||
expired_tokens.prepare("DELETE FROM person_token WHERE expiry_date < :current_time");
|
||||
@ -367,7 +368,7 @@ bool WebServer::start_server(quint16 port) {
|
||||
if (found_key_bool) {
|
||||
_token_person.remove(found_key);
|
||||
} else {
|
||||
QSqlQuery get_person(lubuntuci->cilogic.get_thread_connection());
|
||||
QSqlQuery get_person(get_thread_connection());
|
||||
get_person.prepare("SELECT id, username, logo_url FROM person WHERE username = ?");
|
||||
get_person.bindValue(0, QString::fromStdString(username));
|
||||
if (!get_person.exec()) { qDebug() << "Error executing SELECT query for person:" << get_person.lastError(); }
|
||||
@ -376,7 +377,7 @@ bool WebServer::start_server(quint16 port) {
|
||||
person = Person(get_person.value(0).toInt(), get_person.value(1).toString().toStdString(),
|
||||
get_person.value(2).toString().toStdString());
|
||||
} else {
|
||||
QSqlQuery insert_person(lubuntuci->cilogic.get_thread_connection());
|
||||
QSqlQuery insert_person(get_thread_connection());
|
||||
insert_person.prepare("INSERT INTO person (username, logo_url) VALUES (?, ?)");
|
||||
insert_person.bindValue(0, QString::fromStdString(username));
|
||||
insert_person.bindValue(1, QString::fromStdString("https://api.launchpad.net/devel/~" + username + "/logo"));
|
||||
@ -395,7 +396,7 @@ bool WebServer::start_server(quint16 port) {
|
||||
_active_tokens.insert(token, one_day);
|
||||
|
||||
{
|
||||
QSqlQuery insert_token(lubuntuci->cilogic.get_thread_connection());
|
||||
QSqlQuery insert_token(get_thread_connection());
|
||||
insert_token.prepare("INSERT INTO person_token (person_id, token, expiry_date) VALUES (?, ?, ?)");
|
||||
insert_token.bindValue(0, person.id);
|
||||
insert_token.bindValue(1, token);
|
||||
|
Loading…
x
Reference in New Issue
Block a user