Merge branch 'feat/add-cloud-policy' of git+ssh://git.launchpad.net/~aleksa-svitlica/cloudware/+git/britney2-ubuntu into sil2100/private-runs

sil2100/cloud-only-run-once
Łukasz 'sil2100' Zemczak 2 years ago
commit be55223a67

@ -57,7 +57,6 @@ BOUNTY_MIN_AGE = 2
HINTSDIR = data/%(SERIES)-proposed/Hints
# hint permissions
HINTS_ADCONRAD = ALL
HINTS_LANEY = ALL
HINTS_STEFANOR = ALL
HINTS_STGRABER = ALL
@ -82,7 +81,7 @@ HINTS_FREEZE = block block-all
#
# naming a non-existent section will effectively disable new smooth
# updates but still allow removals to occur
SMOOTH_UPDATES = badgers
SMOOTH_UPDATES = libs oldlibs
IGNORE_CRUFT = 0
@ -116,6 +115,8 @@ ADT_PRIVATE_URL = https://autopkgtest.ubuntu.com/private-results/
ADT_PRIVATE_RETRY =
# Base URL for autopkgtest site, used for links in the excuses
ADT_CI_URL = https://autopkgtest.ubuntu.com/
# URL for the autopkgtest database, if used
ADT_DB_URL = https://autopkgtest.ubuntu.com/static/autopkgtest.db
ADT_HUGE = 20
# Change to 'yes' for excuses to include all ADT results, even those requiring
# no action (like passing or always-failed)
@ -124,7 +125,7 @@ ADT_SHOW_IRRELEVANT = no
# Autopkgtest results can be used to influence the aging
ADT_REGRESSION_PENALTY =
ADT_SUCCESS_BOUNTY =
ADT_BASELINE =
ADT_BASELINE = reference
ADT_RETRY_URL_MECH =
ADT_RETRY_OLDER_THAN =
ADT_REFERENCE_MAX_AGE =
@ -136,3 +137,22 @@ SRUREGRESSIONEMAIL_ENABLE = no
# we don't run piuparts testing in Ubuntu
PIUPARTS_ENABLE = no
# run cloud tests on packages
CLOUD_ENABLE = no
# A directory to store Cloud test results and logs. Is created at the start of
# each policy run and deleted after test results are parsed.
CLOUD_WORK_DIR = cloud_tests
# Who to notify regarding test failures
CLOUD_FAILURE_EMAILS = cpc@canonical.com
# Who to notify regarding test errors
CLOUD_ERROR_EMAILS = cpc@canonical.com
# A set of Azure specific settings
CLOUD_AZURE_LOCATION = westeurope
CLOUD_AZURE_VM_SIZE = Standard_D2s_v5
CLOUD_AZURE_LUNAR_URN = Canonical:0001-com-ubuntu-server-lunar-daily:23_04-daily-gen2:23.04.202301030
CLOUD_AZURE_KINETIC_URN = Canonical:0001-com-ubuntu-server-kinetic:22_10:22.10.202301040
CLOUD_AZURE_JAMMY_URN = Canonical:0001-com-ubuntu-server-jammy:22_04-lts-gen2:22.04.202212140
CLOUD_AZURE_FOCAL_URN = Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:20.04.202212140
CLOUD_AZURE_BIONIC_URN = Canonical:UbuntuServer:18_04-lts-gen2:18.04.202212090

@ -223,6 +223,7 @@ from britney2.policies.autopkgtest import AutopkgtestPolicy
from britney2.policies.sourceppa import SourcePPAPolicy
from britney2.policies.sruadtregression import SRUADTRegressionPolicy
from britney2.policies.email import EmailPolicy
from britney2.policies.cloud import CloudPolicy
from britney2.policies.lpexcusebugs import LPExcuseBugsPolicy
from britney2.utils import (log_and_format_old_libraries,
read_nuninst, write_nuninst, write_heidi,
@ -446,11 +447,8 @@ class Britney(object):
pass
# integrity checks
if self.options.nuninst_cache and self.options.print_uninst: # pragma: no cover
self.logger.error("nuninst_cache and print_uninst are mutually exclusive!")
sys.exit(1)
# if the configuration file exists, then read it and set the additional options
elif not os.path.isfile(self.options.config): # pragma: no cover
if not os.path.isfile(self.options.config): # pragma: no cover
self.logger.error("Unable to read the configuration file (%s), exiting!", self.options.config)
sys.exit(1)
@ -555,6 +553,11 @@ class Britney(object):
self._policy_engine.add_policy(EmailPolicy(self.options,
self.suite_info,
dry_run=add_email_policy == 'dry-run'))
add_cloud_policy = getattr(self.options, 'cloud_enable', 'no')
if add_cloud_policy in ('yes', 'dry-run'):
self._policy_engine.add_policy(CloudPolicy(self.options,
self.suite_info,
dry_run=add_cloud_policy == 'dry-run'))
@property
def hints(self):

@ -97,18 +97,6 @@ class ExcuseFinder(object):
source_u.section and excuse.set_section(source_u.section)
excuse.set_distribution(self.options.distribution)
# if there is a `remove' hint and the requested version is the same as the
# version in testing, then stop here and return False
# (as a side effect, a removal may generate such excuses for both the source
# package and its binary packages on each architecture)
for hint in self.hints.search('remove', package=src, version=source_t.version):
excuse.add_hint(hint)
excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
excuse.add_verdict_info(excuse.policy_verdict, "Removal request by %s" % (hint.user))
excuse.add_verdict_info(excuse.policy_verdict, "Trying to remove package, not update it")
self.excuses[excuse.name] = excuse
return False
# the starting point is that there is nothing wrong and nothing worth doing
anywrongver = False
anyworthdoing = False

@ -28,6 +28,7 @@ import io
import itertools
import re
import socket
import sqlite3
import sys
import time
import urllib.parse
@ -124,6 +125,7 @@ class AutopkgtestPolicy(BasePolicy):
self.pending_tests_file = os.path.join(self.state_dir, 'autopkgtest-pending.json')
self.testsuite_triggers = {}
self.result_in_baseline_cache = collections.defaultdict(dict)
self.database_path = os.path.join(self.state_dir, 'autopkgtest.db')
# results map: trigger -> src -> arch -> [passed, version, run_id, seen]
# - trigger is "source/version" of an unstable package that triggered
@ -140,6 +142,12 @@ class AutopkgtestPolicy(BasePolicy):
else:
self.results_cache_file = os.path.join(self.state_dir, 'autopkgtest-results.cache')
if hasattr(self.options,'adt_db_url') and self.options.adt_db_url:
if not self.fetch_db():
self.logger.error('No autopkgtest db present, exiting')
sys.exit(1)
self.db = sqlite3.connect(self.database_path)
try:
self.options.adt_ppas = self.options.adt_ppas.strip().split()
# We also allow, for certain other use-cases, passing the PPA
@ -171,6 +179,34 @@ class AutopkgtestPolicy(BasePolicy):
else:
self.logger.info("Ignoring ADT_ARCHES %s as it is not in architectures list", arch)
def fetch_db(self):
f = None
try:
f = self.download_retry(self.options.adt_db_url)
http_code = f.getcode()
# file:/// urls don't have the http niceties
if not http_code or http_code == 200:
new_file = self.database_path + '.new'
with open(new_file,'wb') as f_out:
while True:
data=f.read(2048*1024)
if not data:
break
f_out.write(data)
if http_code and os.path.getsize(new_file) != int(f.getheader('content-length')):
self.logger.info('Short read downloading autopkgtest results')
os.unlink(new_file)
else:
os.rename(new_file, self.database_path)
else:
self.logger.error('Failure to fetch autopkgtest results %s: HTTP code=%d', self.options.adt_db_url, f.getcode())
except IOError as e:
self.logger.error('Failure to fetch autopkgtest results %s: %s', self.options.adt_db_url, str(e))
finally:
if f is not None:
f.close()
return os.path.exists(self.database_path)
def register_hints(self, hint_parser):
hint_parser.register_hint_type('force-badtest', britney2.hints.split_into_one_hint_per_package)
hint_parser.register_hint_type('force-skiptest', britney2.hints.split_into_one_hint_per_package)
@ -495,7 +531,7 @@ class AutopkgtestPolicy(BasePolicy):
history_url = cloud_url % {
'h': srchash(testsrc), 's': testsrc,
'r': self.options.series, 'a': arch}
if status == 'REGRESSION':
if status in ['REGRESSION', 'RUNNING-REFERENCE']:
if self.options.adt_retry_url_mech == 'run_id':
retry_url = self.options.adt_ci_url + 'api/v1/retry/' + run_id
elif self.options.adt_private_retry:
@ -599,6 +635,7 @@ class AutopkgtestPolicy(BasePolicy):
pkg_universe = self.britney.pkg_universe
target_suite = self.suite_info.target_suite
source_suite = item.suite
sources_t = target_suite.sources
sources_s = item.suite.sources
packages_s_a = item.suite.binaries[arch]
source_name = item.package
@ -683,6 +720,8 @@ class AutopkgtestPolicy(BasePolicy):
if binary.architecture == arch:
try:
source_of_bin = packages_s_a[binary.package_name].source
if (sources_t.get(source_of_bin, None) is None or
sources_s[source_of_bin].version != sources_t[source_of_bin].version):
triggers.add(
source_of_bin + '/' +
sources_s[source_of_bin].version)
@ -694,6 +733,8 @@ class AutopkgtestPolicy(BasePolicy):
if binary not in source_data_srcdist.binaries:
for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
try:
if (sources_t.get(tdep_src, None) is None or
sources_s[tdep_src].version != sources_t[tdep_src].version):
triggers.add(
tdep_src + '/' +
sources_s[tdep_src].version)
@ -880,7 +921,7 @@ class AutopkgtestPolicy(BasePolicy):
try:
req = urlopen(url, timeout=30)
code = req.getcode()
if 200 <= code < 300:
if not code or 200 <= code < 300:
return req
except socket.timeout as e:
self.logger.info(
@ -1081,6 +1122,71 @@ class AutopkgtestPolicy(BasePolicy):
for trigger in result_triggers:
self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
def fetch_sqlite_results(self, src, arch):
'''Retrieve new results for source package/arch from sqlite
Remove matching pending_tests entries.
'''
# determine latest run_id from results
latest_run_id = ''
if not self.options.adt_shared_results_cache:
latest_run_id = self.latest_run_for_package(src, arch)
if not latest_run_id:
latest_run_id = ''
cur = self.db.cursor()
for row in cur.execute('SELECT r.exitcode,r.version,r.triggers,'
' r.run_id FROM test AS t '
'LEFT JOIN result AS r ON t.id=r.test_id '
'WHERE t.release=? AND t.arch=? '
'AND t.package=? AND r.run_id > ?',
(self.options.series, arch, src, latest_run_id)):
exitcode, ver, triggers, run_id = row
if not ver:
if exitcode in (4, 12, 20):
# repair it
ver = "unknown"
else:
self.logger.error('%s/%s/%s is damaged, ignoring',
arch, src, run_id)
# ignore this; this will leave an orphaned request
# in autopkgtest-pending.json and thus require
# manual retries after fixing the tmpfail, but we
# can't just blindly attribute it to some pending
# test.
return
# parse recorded triggers in test result
if triggers:
result_triggers = [i for i in triggers.split(' ') if '/' in i]
else:
self.logger.error('%s result has no ADT_TEST_TRIGGERS, ignoring')
continue
# 20200101_000000 is 15 chars long
seen = round(calendar.timegm(time.strptime(run_id[:15], '%Y%m%d_%H%M%S')))
# allow some skipped tests, but nothing else
if exitcode in (0, 2):
result = Result.PASS
elif exitcode == 8:
result = Result.NEUTRAL
else:
result = Result.FAIL
self.logger.info(
'Fetched test result for %s/%s/%s %s (triggers: %s): %s',
src, ver, arch, run_id, result_triggers, result.name.lower())
# remove matching test requests
for trigger in result_triggers:
self.remove_from_pending(trigger, src, arch)
# add this result
for trigger in result_triggers:
self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
def remove_from_pending(self, trigger, src, arch):
try:
arch_list = self.pending_tests[trigger][src]
@ -1180,26 +1286,31 @@ class AutopkgtestPolicy(BasePolicy):
if has_result:
result_state = result[0]
version = result[1]
baseline = self.result_in_baseline(src, arch)
if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
pass
elif result_state == Result.FAIL and \
baseline[0] in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL} and \
self.result_in_baseline(src, arch)[0] in \
{Result.PASS, Result.NEUTRAL, Result.OLD_PASS,
Result.OLD_NEUTRAL} and \
self.options.adt_retry_older_than and \
result[3] + int(self.options.adt_retry_older_than) * SECPERDAY < self._now:
# We might want to retry this failure, so continue
pass
elif not uses_swift:
elif not uses_swift and not hasattr(self,'db'):
# We're done if we don't retrigger and we're not using swift
return
elif result_state in {Result.PASS, Result.NEUTRAL}:
self.logger.debug('%s/%s triggered by %s already known', src, arch, trigger)
return
# Without swift we don't expect new results
if uses_swift:
# Without swift or autopkgtest.db we don't expect new results
if hasattr(self,'db'):
self.fetch_sqlite_results(src, arch)
elif uses_swift:
self.logger.info('Checking for new results for failed %s/%s for trigger %s', src, arch, trigger)
self.fetch_swift_results(self.options.adt_swift_url, src, arch)
if hasattr(self,'db') or uses_swift:
# do we have one now?
try:
self.test_results[trigger][src][arch]
@ -1241,7 +1352,21 @@ class AutopkgtestPolicy(BasePolicy):
result_reference = [Result.NONE, None, '', 0]
if self.options.adt_baseline == 'reference':
try:
try:
result_reference = self.test_results[REF_TRIG][src][arch]
except KeyError:
uses_swift = not self.options.adt_swift_url.startswith('file://')
# Without swift or autopkgtest.db we don't expect new results
if hasattr(self,'db'):
self.logger.info('Checking for new results for %s/%s for trigger %s', src, arch, REF_TRIG)
self.fetch_sqlite_results(src, arch)
elif uses_swift:
self.logger.info('Checking for new results for %s/%s for trigger %s', src, arch, REF_TRIG)
self.fetch_swift_results(self.options.adt_swift_url, src, arch)
# do we have one now?
result_reference = self.test_results[REF_TRIG][src][arch]
self.logger.debug('Found result for src %s in reference: %s',
src, result_reference[0].name)
except KeyError:
@ -1277,12 +1402,20 @@ class AutopkgtestPolicy(BasePolicy):
target_suite = self.suite_info.target_suite
binaries_info = target_suite.binaries[arch]
# determine current test result status
baseline_result = self.result_in_baseline(src, arch)[0]
# determine current test result status
until = self.find_max_lower_force_reset_test(src, ver, arch)
ever_passed = self.check_ever_passed_before(src, ver, arch, until)
# Special-case triggers from linux-meta*: we cannot compare results
# against different kernels, as e. g. a DKMS module might work against
# the default kernel but fail against a different flavor; so for those,
# filter the considered results to only those against our kernel
if trigger.startswith('linux-meta'):
only_trigger = trigger.split('/', 1)[0]
self.logger.info('This is a kernel; we will only look for results triggered by %s when considering regressions',
trigger)
else:
only_trigger = None
ever_passed = self.check_ever_passed_before(src, ver, arch, until, only_trigger=only_trigger)
fail_result = 'REGRESSION' if ever_passed else 'ALWAYSFAIL'
@ -1294,14 +1427,8 @@ class AutopkgtestPolicy(BasePolicy):
run_id = r[2]
if r[0] in {Result.FAIL, Result.OLD_FAIL}:
# Special-case triggers from linux-meta*: we cannot compare
# results against different kernels, as e. g. a DKMS module
# might work against the default kernel but fail against a
# different flavor; so for those, ignore the "ever
# passed" check; FIXME: check against trigsrc only
if self.options.adt_baseline != 'reference' and \
(trigger.startswith('linux-meta') or trigger.startswith('linux/')):
baseline_result = Result.FAIL
# determine current test result status
baseline_result = self.result_in_baseline(src, arch)[0]
if baseline_result == Result.FAIL:
result = 'ALWAYSFAIL'
@ -1362,6 +1489,7 @@ class AutopkgtestPolicy(BasePolicy):
except KeyError:
# no result for src/arch; still running?
if arch in self.pending_tests.get(trigger, {}).get(src, []):
baseline_result = self.result_in_baseline(src, arch)[0]
if baseline_result != Result.FAIL and not self.has_force_badtest(src, ver, arch):
result = 'RUNNING'
else:
@ -1376,7 +1504,7 @@ class AutopkgtestPolicy(BasePolicy):
return (result, ver, run_id, url)
def check_ever_passed_before(self, src, max_ver, arch, min_ver=None):
def check_ever_passed_before(self, src, max_ver, arch, min_ver=None, only_trigger=None):
'''Check if tests for src ever passed on arch for specified range
If min_ver is specified, it checks that all versions in
@ -1384,7 +1512,11 @@ class AutopkgtestPolicy(BasePolicy):
[min_ver, inf) have passed.'''
# FIXME: add caching
for srcmap in self.test_results.values():
for (trigger, srcmap) in self.test_results.items():
if only_trigger:
trig = trigger.split('/', 1)[0]
if only_trigger != trig:
continue
try:
too_high = apt_pkg.version_compare(srcmap[src][arch][1], max_ver) > 0
too_low = apt_pkg.version_compare(srcmap[src][arch][1], min_ver) <= 0 if min_ver else False
@ -1400,12 +1532,15 @@ class AutopkgtestPolicy(BasePolicy):
def find_max_lower_force_reset_test(self, src, ver, arch):
'''Find the maximum force-reset-test hint before/including ver'''
hints = self.hints.search('force-reset-test', package=src)
found_ver = None
if hints:
for hint in hints:
if not hasattr(self, 'reset_hints'):
self.reset_hints = self.hints.search('force-reset-test')
for hint in self.reset_hints:
for mi in hint.packages:
if mi.package != src:
continue
if (mi.architecture in ['source', arch] and
mi.version != 'all' and
apt_pkg.version_compare(mi.version, ver) <= 0 and
@ -1416,12 +1551,15 @@ class AutopkgtestPolicy(BasePolicy):
def has_higher_force_reset_test(self, src, ver, arch):
'''Find if there is a minimum force-reset-test hint after/including ver'''
hints = self.hints.search('force-reset-test', package=src)
if hints:
self.logger.info('Checking hints for %s/%s/%s: %s' % (src, ver, arch, [str(h) for h in hints]))
for hint in hints:
if not hasattr(self, 'reset_hints'):
self.reset_hints = self.hints.search('force-reset-test')
for hint in self.reset_hints:
for mi in hint.packages:
if mi.package != src:
continue
self.logger.info('Checking hints for %s/%s/%s: %s' % (src, ver, arch, str(hint)))
if (mi.architecture in ['source', arch] and
mi.version != 'all' and
apt_pkg.version_compare(mi.version, ver) >= 0):

@ -0,0 +1,453 @@
import json
import os
from pathlib import PurePath
import re
import shutil
import smtplib
import socket
import subprocess
import xml.etree.ElementTree as ET
from britney2 import SuiteClass
from britney2.policies.policy import BasePolicy
from britney2.policies import PolicyVerdict
class MissingURNException(Exception):
pass
FAIL_MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com>
To: {recipients}
X-Proposed-Migration: notice
Subject: [proposed-migration] {package} {version} in {series} failed Cloud tests.
Hi,
{package} {version} needs attention.
This package fails the following tests:
{results}
If you have any questions about this email, please ask them in #ubuntu-release channel on libera.chat.
Regards, Ubuntu Release Team.
"""
ERR_MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com>
To: {recipients}
X-Proposed-Migration: notice
Subject: [proposed-migration] {package} {version} in {series} had errors running Cloud Tests.
Hi,
During Cloud tests of {package} {version} the following errors occurred:
{results}
If you have any questions about this email, please ask them in #ubuntu-release channel on libera.chat.
Regards, Ubuntu Release Team.
"""
class CloudPolicy(BasePolicy):
PACKAGE_SET_FILE = "cloud_package_set"
DEFAULT_EMAILS = ["cpc@canonical.com"]
TEST_LOG_FILE = "CTF.log"
def __init__(self, options, suite_info, dry_run=False):
super().__init__(
"cloud", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
)
self.dry_run = dry_run
if self.dry_run:
self.logger.info("Cloud Policy: Dry-run enabled")
self.email_host = getattr(self.options, "email_host", "localhost")
self.logger.info(
"Cloud Policy: will send emails to: %s", self.email_host
)
self.work_dir = getattr(self.options, "cloud_work_dir", "cloud_tests")
self.failure_emails = getattr(self.options, "cloud_failure_emails", self.DEFAULT_EMAILS)
self.error_emails = getattr(self.options, "cloud_error_emails", self.DEFAULT_EMAILS)
adt_ppas = getattr(self.options, "adt_ppas", "").split()
ppas = self._parse_ppas(adt_ppas)
if len(ppas) == 0:
self.sources = ["proposed"]
self.source_type = "archive"
else:
self.sources = ppas
self.source_type = "ppa"
self.failures = {}
self.errors = {}
def initialise(self, britney):
super().initialise(britney)
self.package_set = self._retrieve_cloud_package_set_for_series(self.options.series)
def apply_src_policy_impl(self, policy_info, item, source_data_tdist, source_data_srcdist, excuse):
if item.package not in self.package_set:
return PolicyVerdict.PASS
if self.dry_run:
self.logger.info(
"Cloud Policy: Dry run would test {} in {}".format(item.package , self.options.series)
)
return PolicyVerdict.PASS
self._setup_work_directory()
self.failures = {}
self.errors = {}
self._run_cloud_tests(item.package, self.options.series, self.sources, self.source_type)
if len(self.failures) > 0 or len(self.errors) > 0:
self._send_emails_if_needed(item.package, source_data_srcdist.version, self.options.series)
self._cleanup_work_directory()
verdict = PolicyVerdict.REJECTED_PERMANENTLY
info = self._generate_verdict_info(self.failures, self.errors)
excuse.add_verdict_info(verdict, info)
return verdict
else:
self._cleanup_work_directory()
return PolicyVerdict.PASS
def _retrieve_cloud_package_set_for_series(self, series):
"""Retrieves a set of packages for the given series in which cloud
tests should be run.
Temporarily a static list retrieved from file. Will be updated to
retrieve from a database at a later date.
:param series The Ubuntu codename for the series (e.g. jammy)
"""
package_set = set()
with open(self.PACKAGE_SET_FILE) as file:
for line in file:
package_set.add(line.strip())
return package_set
def _run_cloud_tests(self, package, series, sources, source_type):
"""Runs any cloud tests for the given package.
Nothing is returned but test failures and errors are stored in instance variables.
:param package The name of the package to test
:param series The Ubuntu codename for the series (e.g. jammy)
:param sources List of sources where the package should be installed from (e.g. [proposed] or PPAs)
:param source_type Either 'archive' or 'ppa'
"""
self._run_azure_tests(package, series, sources, source_type)
def _send_emails_if_needed(self, package, version, series):
"""Sends email(s) if there are test failures and/or errors
:param package The name of the package that was tested
:param version The version number of the package
:param series The Ubuntu codename for the series (e.g. jammy)
"""
if len(self.failures) > 0:
emails = self.failure_emails
message = self._format_email_message(
FAIL_MESSAGE, emails, package, version, self.failures
)
self.logger.info("Cloud Policy: Sending failure email for {}, to {}".format(package, emails))
self._send_email(emails, message)
if len(self.errors) > 0:
emails = self.error_emails
message = self._format_email_message(
ERR_MESSAGE, emails, package, version, self.errors
)
self.logger.info("Cloud Policy: Sending error email for {}, to {}".format(package, emails))
self._send_email(emails, message)
def _run_azure_tests(self, package, series, sources, source_type):
"""Runs Azure's required package tests.
:param package The name of the package to test
:param series The Ubuntu codename for the series (e.g. jammy)
:param sources List of sources where the package should be installed from (e.g. [proposed] or PPAs)
:param source_type Either 'archive' or 'ppa'
"""
urn = self._retrieve_urn(series)
self.logger.info("Cloud Policy: Running Azure tests for: {} in {}".format(package, series))
params = [
"/snap/bin/cloud-test-framework",
"--instance-prefix", "britney-{}-{}".format(package, series)
]
params.extend(self._format_install_flags(package, sources, source_type))
params.extend(
[
"azure_gen2",
"--location", getattr(self.options, "cloud_azure_location", "westeurope"),
"--vm-size", getattr(self.options, "cloud_azure_vm_size", "Standard_D2s_v5"),
"--urn", urn,
"run-test", "package-install-with-reboot",
]
)
with open(PurePath(self.work_dir, self.TEST_LOG_FILE), "w") as file:
subprocess.run(
params,
cwd=self.work_dir,
stdout=file
)
results_file_paths = self._find_results_files(r"TEST-NetworkTests-[0-9]*.xml")
self._parse_xunit_test_results("Azure", results_file_paths)
self._store_extra_test_result_info(self, package)
def _retrieve_urn(self, series):
"""Retrieves an URN from the configuration options based on series.
An URN identifies a unique image in Azure.
:param series The ubuntu codename for the series (e.g. jammy)
"""
urn = getattr(self.options, "cloud_azure_{}_urn".format(series), None)
if urn is None:
raise MissingURNException("No URN configured for {}".format(series))
return urn
def _find_results_files(self, file_regex):
"""Find any test results files that match the given regex pattern.
:param file_regex A regex pattern to use for matching the name of the results file.
"""
file_paths = []
for file in os.listdir(self.work_dir):
if re.fullmatch(file_regex, file):
file_paths.append(PurePath(self.work_dir, file))
return file_paths
def _parse_xunit_test_results(self, cloud, results_file_paths):
"""Parses and stores any failure or error test results.
:param cloud The name of the cloud, use for storing the results.
:param results_file_paths List of paths to results files
"""
for file_path in results_file_paths:
with open(file_path) as file:
xml = ET.parse(file)
root = xml.getroot()
if root.tag == "testsuites":
for testsuite in root:
self._parse_xunit_testsuite(cloud, testsuite)
else:
self._parse_xunit_testsuite(cloud, root)
def _parse_xunit_testsuite(self, cloud, root):
"""Parses the xunit testsuite and stores any failure or error test results.
:param cloud The name of the cloud, used for storing the results.
:param root An XML tree root.
"""
for el in root:
if el.tag == "testcase":
for e in el:
if e.tag == "failure":
type = e.attrib.get('type')
message = e.attrib.get('message')
info = "{}: {}".format(type, message)
self._store_test_result(
self.failures, cloud, el.attrib.get('name'), info
)
if e.tag == "error":
type = e.attrib.get('type')
message = e.attrib.get('message')
info = "{}: {}".format(type, message)
self._store_test_result(
self.errors, cloud, el.attrib.get('name'), info
)
def _store_test_result(self, results, cloud, test_name, message):
"""Adds the test to the results hash under the given cloud.
Results format:
{
cloud1: {
test_name1: message1
test_name2: message2
},
cloud2: ...
}
:param results A hash to add results to
:param cloud The name of the cloud
:param message The exception or assertion error given by the test
"""
if cloud not in results:
results[cloud] = {}
results[cloud][test_name] = message
def _store_extra_test_result_info(self, cloud, package):
"""Stores any information beyond the test results and stores it in the results dicts
under Cloud->extra_info
Stores any information retrieved under the cloud's section in failures/errors but will
store nothing if failures/errors are empty.
:param cloud The name of the cloud
:param package The name of the package to test
"""
if len(self.failures) == 0 and len(self.errors) == 0:
return
extra_info = {}
install_source = self._retrieve_package_install_source_from_test_output(package)
if install_source:
extra_info["install_source"] = install_source
if len(self.failures.get(cloud, {})) > 0:
self._store_test_result(self.failures, cloud, "extra_info", extra_info)
if len(self.errors.get(cloud, {})) > 0:
self._store_test_result(self.errors, cloud, "extra_info", extra_info)
def _retrieve_package_install_source_from_test_output(self, package):
"""Checks the test logs for apt logs which show where the package was installed from.
Useful if multiple PPA sources are defined since we won't explicitly know the exact source.
Will return nothing unless exactly one matching line is found.
:param package The name of the package to test
"""
possible_locations = []
with open(PurePath(self.work_dir, self.TEST_LOG_FILE), "r") as file:
for line in file:
if package not in line:
continue
if "Get:" not in line:
continue
if " {} ".format(package) not in line:
continue
possible_locations.append(line)
if len(possible_locations) == 1:
return possible_locations[0]
else:
return None
def _format_email_message(self, template, emails, package, version, test_results):
"""Insert given parameters into the email template."""
series = self.options.series
results = json.dumps(test_results, indent=4)
recipients = ", ".join(emails)
message = template.format(**locals())
return message
def _send_email(self, emails, message):
"""Send an email
:param emails List of emails to send to
:param message The content of the email
"""
try:
server = smtplib.SMTP(self.email_host)
server.sendmail("noreply+proposed-migration@ubuntu.com", emails, message)
server.quit()
except socket.error as err:
self.logger.error("Cloud Policy: Failed to send mail! Is SMTP server running?")
self.logger.error(err)
def _generate_verdict_info(self, failures, errors):
info = ""
if len(failures) > 0:
fail_clouds = ",".join(list(failures.keys()))
info += "Cloud testing failed for {}.".format(fail_clouds)
if len(errors) > 0:
error_clouds = ",".join(list(errors.keys()))
info += " Cloud testing had errors for {}.".format(error_clouds)
return info
def _format_install_flags(self, package, sources, source_type):
"""Determine the flags required to install the package from the given sources
:param package The name of the package to test
:param sources List of sources where the package should be installed from (e.g. [proposed] or PPAs)
:param source_type Either 'archive' or 'ppa'
"""
install_flags = []
for source in sources:
if source_type == "archive":
install_flags.append("--install-archive-package")
install_flags.append("{}/{}".format(package, source))
elif source_type == "ppa":
install_flags.append("--install-ppa-package")
install_flags.append("{}/{}".format(package, source))
else:
raise RuntimeError("Cloud Policy: Unexpected source type, {}".format(source_type))
return install_flags
def _parse_ppas(self, ppas):
"""Parse PPA list to store in format expected by cloud tests
Only supports PPAs provided with a fingerprint
Britney private PPA format:
'user:token@team/name:fingerprint'
Britney public PPA format:
'team/name:fingerprint'
Cloud private PPA format:
'https://user:token@private-ppa.launchpadcontent.net/team/name/ubuntu=fingerprint
Cloud public PPA format:
'https://ppa.launchpadcontent.net/team/name/ubuntu=fingerprint
:param ppas List of PPAs in Britney approved format
:return A list of PPAs in valid cloud test format. Can return an empty list if none found.
"""
cloud_ppas = []
for ppa in ppas:
if '@' in ppa:
match = re.match("^(?P<auth>.+:.+)@(?P<name>.+):(?P<fingerprint>.+$)", ppa)
if not match:
raise RuntimeError('Private PPA %s not following required format (user:token@team/name:fingerprint)', ppa)
formatted_ppa = "https://{}@private-ppa.launchpadcontent.net/{}/ubuntu={}".format(
match.group("auth"), match.group("name"), match.group("fingerprint")
)
cloud_ppas.append(formatted_ppa)
else:
match = re.match("^(?P<name>.+):(?P<fingerprint>.+$)", ppa)
if not match:
raise RuntimeError('Public PPA %s not following required format (team/name:fingerprint)', ppa)
formatted_ppa = "https://ppa.launchpadcontent.net/{}/ubuntu={}".format(
match.group("name"), match.group("fingerprint")
)
cloud_ppas.append(formatted_ppa)
return cloud_ppas
def _setup_work_directory(self):
"""Create a directory for tests to be run in."""
self._cleanup_work_directory()
os.makedirs(self.work_dir)
def _cleanup_work_directory(self):
"""Delete the the directory used for running tests."""
if os.path.exists(self.work_dir):
shutil.rmtree(self.work_dir)

@ -28,7 +28,7 @@ BOTS = {
USER + "katie",
}
MESSAGE = """From: Ubuntu Release Team <noreply@canonical.com>
MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com>
To: {recipients}
X-Proposed-Migration: notice
Subject: [proposed-migration] {source_name} {version} stuck in {series}-proposed for {age} day{plural}.
@ -45,7 +45,7 @@ http://people.canonical.com/~ubuntu-archive/proposed-migration/{series}/update_e
https://wiki.ubuntu.com/ProposedMigration
If you have any questions about this email, please ask them in #ubuntu-release channel on Freenode IRC.
If you have any questions about this email, please ask them in #ubuntu-release channel on libera.chat.
Regards, Ubuntu Release Team.
"""
@ -287,7 +287,7 @@ class EmailPolicy(BasePolicy, Rest):
)
)
server = smtplib.SMTP(self.email_host)
server.sendmail("noreply@canonical.com", emails, msg)
server.sendmail("noreply+proposed-migration@ubuntu.com", emails, msg)
server.quit()
# record the age at which the mail should have been sent
last_sent = last_due

@ -1523,11 +1523,6 @@ class ImplicitDependencyPolicy(BasePolicy):
# source for pkg not in unstable: candidate for removal
return True
source_t = target_suite.sources[src]
for hint in self.hints.search('remove', package=src, version=source_t.version):
# removal hint for the source in testing: candidate for removal
return True
if target_suite.is_cruft(pkg):
# if pkg is cruft in testing, removal will be tried
return True

@ -15,6 +15,7 @@ from britney2.policies.policy import BasePolicy, PolicyVerdict
LAUNCHPAD_URL = "https://api.launchpad.net/1.0/"
PRIMARY = LAUNCHPAD_URL + "ubuntu/+archive/primary"
INCLUDE = ["~bileto-ppa-service/", "~ci-train-ppa-service/"]
EXCLUDE = ["~ci-train-ppa-service/+archive/ubuntu/4810", "~ci-train-ppa-service/+archive/ubuntu/4813", "~ci-train-ppa-service/+archive/ubuntu/4815", "~ci-train-ppa-service/+archive/ubuntu/4816"]
class SourcePPAPolicy(BasePolicy, Rest):
@ -105,6 +106,8 @@ class SourcePPAPolicy(BasePolicy, Rest):
sourceppa = self.lp_get_source_ppa(source_name, version) or ""
verdict = excuse.policy_verdict
self.source_ppas_by_pkg[source_name][version] = sourceppa
if [team for team in EXCLUDE if team in sourceppa]:
return PolicyVerdict.PASS
if not [team for team in INCLUDE if team in sourceppa]:
return PolicyVerdict.PASS

@ -171,7 +171,7 @@ class SRUADTRegressionPolicy(BasePolicy, Rest):
bug_mail = "%s@bugs.launchpad.net" % bug
server = smtplib.SMTP(self.email_host)
server.sendmail(
"noreply@canonical.com",
"noreply+proposed-migration@ubuntu.com",
bug_mail,
MESSAGE.format(**locals()),
)

@ -57,7 +57,6 @@ BOUNTY_MIN_AGE = 2
HINTSDIR = data/%(SERIES)-proposed/Hints
# hint permissions
HINTS_ADCONRAD = ALL
HINTS_LANEY = ALL
HINTS_STEFANOR = ALL
HINTS_STGRABER = ALL

@ -0,0 +1,600 @@
openssh-client
libmpfr6
python3-problem-report
rsync
libdb5
libattr1
locales
xz-utils
libpam-modules-bin
strace
xkb-data
dosfstools
libpolkit-agent-1-0
usbutils
dmsetup
libfwupdplugin1
ucf
libdbus-1-3
vim-runtime
parted
libxmlsec1-openssl
debconf-i18n
libdevmapper-event1
libjson-c4
libmbim-proxy
sound-theme-freedesktop
python3-urllib3
libgcc-s1
libp11-kit0
libparted2
keyutils
libplymouth5
libsemanage-common
procps
python3-httplib2
libxslt1
glib-networking-common
nano
libkeyutils1
libx11-6
linux-base-sgx
libreadline8
libtext-charwidth-perl
libk5crypto3
pciutils
sensible-utils
init-system-helpers
libatm1
python3-requests
python3-idna
pci
vim-tiny
libnss3
ncurses-bin
udev
cryptsetup-initramfs
libasound2-data
modemmanager
bsdmainutils
libevent-2
xfsprogs
libbsd0
ubuntu-advantage-pro
libfido2-1
libzstd1
chrony
libxmlb2
python3-pyrsistent
gnupg
dmidecode
os-prober
libbz2-1
libfastjson4
efibootmgr
libdconf1
libtdb1
libldap-common
libreadline5
netcat-openbsd
python3-gi
libpng16-16
packagekit-tools
python3-twisted-bin
python3-cryptography
software-properties-common
popularity-contest
sg3-utils
walinuxagent
btrfs-progs
python3-lib2to3
libyaml-0-2
python3-serial
base-passwd
libsigsegv2
keyboard-configuration
libsasl2-2
gpgv
python-apt-common
grub-pc
gpg-wks-client
manpages
python3-gdbm
apport
hdparm
libdns-export1109
vim
xdg-user-dirs
libxmuu1
python3-cffi-backend
gdisk
libstdc
lsb-base
libip6tc2
htop
linux-image-azure
linux-tools-azure
kpartx
libcanberra0
libpam-modules
liberror-perl
motd-news-config
libgpg-error0
libarchive13
squashfs-tools
lshw
python3-incremental
libogg0
telnet
libgstreamer1
libheimntlm0-heimdal
python3-jinja2
libslang2
libpam-systemd
dconf-service
mount
python3-automat
python3-debian
python3-jsonpatch
dbus
ubuntu-minimal
packagekit
python3-more-itertools
python3-distro-info
at
libtext-iconv-perl
libpci3
base-files
libblockdev-part2
libsqlite3-0
libkmod2
libkrb5support0
iputils-ping
libwrap0
libcom-err2
irqbalance
bcache-tools
update-notifier-common
libncurses6
logsave
ubuntu-release-upgrader-core
libmbim-glib4
bash-completion
lxd-agent-loader
python3-json-pointer
usb-modeswitch-data
fdisk
libestr0
libsystemd0
git-man
findutils
libhcrypto4-heimdal
libpsl5
perl-modules-5
python3-importlib-metadata
xxd
libtinfo6
sbsigntool
file
libext2fs2
passwd
sysvinit-utils
libasound2
libunistring2
libksba8
plymouth-theme-ubuntu-text
distro-info-data
libtevent0
libmount1
libsasl2-modules
python3-jwt
libntfs-3g883
gzip
publicsuffix
openssh-server
python3-hamcrest
iputils-tracepath
gpg-agent
iproute2
libmm-glib0
apt
libfwupdplugin5
libtext-wrapi18n-perl
libvolume-key1
libsodium23
kmod
cloud-guest-utils
python3-apport
python3-openssl
linux-cloud-tools-azure
gir1
libx11-data
libuv1
unattended-upgrades
python3-six
run-one
linux-headers-azure
byobu
bc
libudisks2-0
libargon2-1
libdevmapper1
cryptsetup
policykit-1
libcap-ng0
libgusb2
libkrb5-26-heimdal
gsettings-desktop-schemas
ssh-import-id
python3-requests-unixsocket
thin-provisioning-tools
libc6
libefivar1
libusb-1
login
libssl1
libklibc
dpkg
python3-distro
eatmydata
bash
python3-chardet
libsmartcols1
usb
libheimbase1-heimdal
fonts-ubuntu-console
libnss-systemd
libroken18-heimdal
libpolkit-gobject-1-0
zerofree
initramfs-tools
libnettle7
bind9-dnsutils
ubuntu-server
glib-networking-services
linux-tools-common
python3-markupsafe
liburcu6
diffutils
python3-pexpect
libpcap0
ncurses-base
libxml2
libuchardet0
libblkid1
powermgmt-base
dbus-user-session
ca-certificates
sosreport
libtasn1-6
busybox-initramfs
mawk
eject
libblockdev-fs2
e2fsprogs
libdrm-common
python3-secretstorage
vim-common
grub-efi-amd64-bin
libudev1
systemd
python3-certifi
ed
libbrotli1
linux-image-5
libfuse2
python3-click
python3-jsonschema
bind9-libs
libsgutils2-2
distro-info
libssh-4
libtss2-esys0
plymouth
zlib1g
libeatmydata1
dconf-gsettings-backend
libapparmor1
libblockdev-swap2
libfdisk1
libgcrypt20
friendly-recovery
libkrb5-3
libgpm2
gawk
initramfs-tools-core
pollinate
libelf1
gettext-base
kbd
libxmlsec1
gpg
libnetplan0
python3
libcurl3-gnutls
apport-symptoms
libgpgme11
python3-debconf
libnewt0
isc-dhcp-common
language-selector-common
coreutils
grub-common
libsoup2
console-setup
sudo
command-not-found
libdebconfclient0
libjson-glib-1
python3-launchpadlib
grep
cifs-utils
whiptail
linux-cloud-tools-common
libjcat1
libisc-export1105
cron
python3-pkg-resources
libblockdev-part-err2
libnuma1
libxau6
libaudit-common
libglib2
libselinux1
libicu66
git
python3-wadllib
libsepol1
tmux
python3-commandnotfound
isc-dhcp-client
libpython3
dmeventd
liblzma5
python3-setuptools
tpm-udev
libunwind8
grub-efi-amd64-signed
libtalloc2
openssl
libmagic-mgc
libmpdec2
libisns0
libnfnetlink0
libpam0g
linux-modules-5
libffi7
libaio1
klibc-utils
libsmbios-c2
python3-yaml
python3-entrypoints
psmisc
libutempter0
linux-azure
libmaxminddb0
libhx509-5-heimdal
python3-zipp
grub-pc-bin
rsyslog
libfwupd2
python3-update-manager
libgirepository-1
liblz4-1
lsb-release
fwupd-signed
libassuan0
fwupd
screen
python3-distutils
python3-pyasn1-modules
libfl2
usb-modeswitch
libpipeline1
liblocale-gettext-perl
libltdl7
libmagic1
krb5-locales
libaccountsservice0
libsemanage1
libpcre2-8-0
pastebinit
linux-tools-5
groff-base
landscape-common
ubuntu-standard
libgmp10
libproxy1v5
curl
finalrd
ethtool
python3-netifaces
info
libasn1-8-heimdal
libgnutls30
libuuid1
libpam-cap
python3-pymacaroons
libexpat1
busybox-static
shared-mime-info
libwind0-heimdal
open-iscsi
ncurses-term
libparted-fs-resize0
libcbor0
bsdutils
python3-oauthlib
ubuntu-keyring
overlayroot
python3-colorama
mime-support
python3-newt
libsasl2-modules-db
multipath-tools
iso-codes
libidn2-0
perl-base
python3-simplejson
python3-hyperlink
cpio
libnftnl11
fuse
libxcb1
libnetfilter-conntrack3
gnupg-utils
liblmdb0
cloud-initramfs-copymods
libmspack0
libss2
open-vm-tools
ftp
libblockdev-crypto2
perl
accountsservice
iptables
linux-azure-5
gpg-wks-server
hostname
grub-gfxpayload-lists
libnpth0
readline-common
apparmor
libapt-pkg6
gcc-10-base
linux-headers-5
bolt
alsa-ucm-conf
lvm2
libhogweed5
ntfs-3g
systemd-sysv
tzdata
libpython3-stdlib
libwbclient0
pinentry-curses
gnupg-l10n
secureboot-db
libedit2
libcap2
sed
python3-zope
python3-service-identity
libgssapi-krb5-2
python3-keyring
install-info
netplan
python3-software-properties
lsscsi
ufw
libgdbm-compat4
libaudit1
libacl1
python3-lazr
lsof
mtr-tiny
libdw1
libefiboot1
libpopt0
python3-distupgrade
libgssapi3-heimdal
adduser
dirmngr
libvorbis0a
bind9-host
cryptsetup-bin
ltrace
python3-dbus
update-manager-core
python3-configobj
libpam-runtime
python3-systemd
python3-twisted
dash
uuid-runtime
libvorbisfile3
shim-signed
libpcre3
liblzo2-2
debconf
snapd
cloud-init
libblockdev-utils2
linux-base
python3-nacl
mdadm
libcurl4
python3-ptyprocess
networkd-dispatcher
cloud-initramfs-dyn-netconf
util-linux
libgcab-1
libnghttp2-14
gpgsm
libblockdev2
man-db
python3-constantly
liblvm2cmd2
python3-minimal
initramfs-tools-bin
libldap-2
libappstream4
python3-pyasn1
libfribidi0
libblockdev-loop2
tcpdump
udisks2
xauth
libxmlb1
libmnl0
libncursesw6
libc-bin
libseccomp2
tar
libqmi-glib5
grub2-common
alsa-topology-conf
time
libqmi-proxy
libgdbm6
libprocps8
libxtables12
python3-attr
gpgconf
ubuntu-advantage-tools
console-setup-linux
librtmp1
mokutil
glib-networking
libxdmcp6
cryptsetup-run
debianutils
libip4tc2
lz4
python3-blinker
libpackagekit-glib2-18
patch
libstemmer0d
libfreetype6
less
init
systemd-timesyncd
python3-parted
libcrypt1
netbase
libcap2-bin
linux-cloud-tools-5
libcryptsetup12
bzip2
libdrm2
logrotate
libperl5
libatasmart4
libxext6
apt-utils
openssh-sftp-server
libgudev-1
libnspr4
sg3-utils-udev
wget
python3-apt

@ -406,6 +406,7 @@ ADT_PRIVATE_RETRY =
ADT_CI_URL = https://autopkgtest.ubuntu.com/
ADT_HUGE = 20
ADT_SHOW_IRRELEVANT = no
ADT_DB_URL =
ADT_SUCCESS_BOUNTY =
ADT_REGRESSION_PENALTY =

@ -12,6 +12,7 @@ import fileinput
import unittest
import json
import pprint
import sqlite3
import urllib.parse
import apt_pkg
@ -45,11 +46,14 @@ class TestAutopkgtestBase(TestBase):
def setUp(self):
super().setUp()
self.fake_amqp = os.path.join(self.data.path, 'amqp')
self.db_path = os.path.join(self.data.path, 'autopkgtest.db')
# Set fake AMQP and Swift server
# Set fake AMQP and Swift server and autopkgtest.db
for line in fileinput.input(self.britney_conf, inplace=True):
if 'ADT_AMQP' in line:
print('ADT_AMQP = file://%s' % self.fake_amqp)
elif 'ADT_DB_URL' in line:
print('ADT_DB_URL = file://%s' % self.db_path)
else:
sys.stdout.write(line)
@ -83,8 +87,102 @@ class TestAutopkgtestBase(TestBase):
self.swift = mock_swift.AutoPkgTestSwiftServer(port=18085)
self.swift.set_results({})
self.db = self.init_sqlite_db(self.db_path)
def tearDown(self):
del self.swift
self.db.close()
try:
os.unlink(self.db_path)
except FileNotFoundError: pass
# https://git.launchpad.net/autopkgtest-cloud/tree/charms/focal/autopkgtest-web/webcontrol/publish-db,
# https://git.launchpad.net/autopkgtest-cloud/tree/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
def init_sqlite_db(self, path):
"""Create DB if it does not exist, and connect to it"""
db = sqlite3.connect(path)
db.execute("PRAGMA journal_mode = MEMORY")
db.execute(
"CREATE TABLE current_version("
" release CHAR[20], "
" pocket CHAR[40], "
" component CHAR[10],"
" package CHAR[50], "
" version CHAR[120], "
" PRIMARY KEY(release, package))"
)
db.execute("CREATE INDEX IF NOT EXISTS current_version_pocket_ix "
"ON current_version(pocket, component)")
db.execute(
"CREATE TABLE url_last_checked("
" url CHAR[100], "
" timestamp CHAR[50], "
" PRIMARY KEY(url))"
)
db.execute('CREATE TABLE IF NOT EXISTS test ('
' id INTEGER PRIMARY KEY, '
' release CHAR[20], '
' arch CHAR[20], '
' package char[120])')
db.execute('CREATE TABLE IF NOT EXISTS result ('
' test_id INTEGER, '
' run_id CHAR[30], '
' version VARCHAR[200], '
' triggers TEXT, '
' duration INTEGER, '
' exitcode INTEGER, '
' requester TEXT, '
' PRIMARY KEY(test_id, run_id), '
' FOREIGN KEY(test_id) REFERENCES test(id))')
# /packages/<name> mostly benefits from the index on package (0.8s -> 0.01s),
# but adding the other fields improves it a further 50% to 0.005s.
db.execute('CREATE UNIQUE INDEX IF NOT EXISTS test_package_uix ON test('
' package, release, arch)')
db.execute('CREATE INDEX IF NOT EXISTS result_run_ix ON result('
' run_id desc)')
db.commit()
return db
def set_results(self, results):
'''Wrapper to set autopkgtest results in both swift and sqlite3'''
self.swift.set_results(results)
# swift bucket name is irrelevant for sqlite
for i in results.values():
for k,v in i.items():
(series, arch, discard, source, latest) = k.split('/')
retcode = v[0]
if not v[1]:
source_ver = None
else:
source_ver = v[1].split(' ')[1]
try:
trigger = v[2]['custom_environment'][0].split('=')[1]
except (IndexError, KeyError):
trigger = None
try:
self.db.execute('INSERT INTO test (release, arch, package) '
'VALUES (?, ?, ?)',
(series, arch, source))
except sqlite3.IntegrityError:
# Completely normal if we have more than one result for
# the same source package; ignore
pass
self.db.execute('INSERT INTO result '
'(test_id, run_id, version, triggers, '
' exitcode) '
'SELECT test.id, ?, ?, ?, ? FROM test '
'WHERE release=? AND arch=? AND package=?',
(latest, source_ver, trigger, retcode,
series, arch, source))
self.db.commit()
def run_it(self, unstable_add, expect_status, expect_excuses={}):
'''Run britney with some unstable packages and verify excuses.
@ -182,6 +280,29 @@ class AT(TestAutopkgtestBase):
# Tests for generic packages
################################################################
def test_fail_on_missing_database(self):
'''Fails if autopkgtest.db is requested but not available'''
os.unlink(self.db_path)
self.data.add_default_packages(lightgreen=False)
britney_failed = 0
try:
self.run_it(
# uninstallable unstable version
[('lightgreen', {'Version': '1.1~beta', 'Depends': 'libc6 (>= 0.9), libgreen1 (>= 2)'}, 'autopkgtest')],
{'lightgreen': (False, {})},
{'lightgreen': [('old-version', '1'), ('new-version', '1.1~beta'),
('reason', 'depends'),
('excuses', 'uninstallable on arch amd64, not running autopkgtest there')
]
})[1]
except AssertionError as e:
britney_failed = 1
self.assertEqual(britney_failed, 1, "DB missing but britney succeeded")
def test_no_request_for_uninstallable(self):
'''Does not request a test for an uninstallable package'''
@ -216,7 +337,7 @@ class AT(TestAutopkgtestBase):
self.sourceppa_cache['purple'] = {'2': ''}
# The package has passed before on i386
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/p/purple/20150101_100000@': (0, 'purple 1', tr('purple/1')),
'testing/amd64/p/purple/20150101_100000@': (0, 'purple 1', tr('purple/1')),
'testing/amd64/p/purple/20200101_100000@': (0, 'purple 2', tr('purple/2')),
@ -242,7 +363,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(darkgreen=False)
# The package has failed before, and with a trigger too on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (4, 'green 1'),
'testing/amd64/d/darkgreen/20150101_100000@': (4, 'green 1', tr('failedbefore/1')),
}})
@ -287,7 +408,7 @@ class AT(TestAutopkgtestBase):
# green has passed on amd64 before
# lightgreen has passed on i386, therefore we should block on it returning
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/g/green/20150101_100000@': (0, 'green 4', tr('green/1')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
}})
@ -319,7 +440,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# green has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
}})
@ -362,7 +483,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# green has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
}})
@ -380,7 +501,7 @@ class AT(TestAutopkgtestBase):
self.assertNotIn('brittle', exc['green']['policy_info']['autopkgtest'])
# second run collects the results
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100001@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/2')),
@ -424,7 +545,7 @@ class AT(TestAutopkgtestBase):
# third run should not trigger any new tests, should all be in the
# cache
self.swift.set_results({})
self.set_results({})
out = self.run_it(
[],
{'green': (True, {'green/2': {'amd64': 'PASS', 'i386': 'PASS'},
@ -442,7 +563,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# green has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
}})
@ -457,7 +578,7 @@ class AT(TestAutopkgtestBase):
{'green': [('old-version', '1'), ('new-version', '2')]})
# second run collects the results
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
'testing/amd64/l/lightgreen/20150101_100101@': (4, 'lightgreen 1', tr('green/2')),
@ -489,7 +610,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1'),
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1'),
'testing/amd64/l/lightgreen/20150101_100101@': (4, 'lightgreen 1'),
@ -518,7 +639,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
@ -574,7 +695,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/2')),
@ -603,7 +724,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (4, 'lightgreen 1', tr('green/1')),
@ -635,7 +756,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# green has passed before on amd64, doesn't exist on i386
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/g/green64/20150101_100000@': (0, 'green64 0.1', tr('passedbefore/1')),
}})
@ -670,7 +791,7 @@ class AT(TestAutopkgtestBase):
'green': ['amd64', 'i386']}})
# second run collects the results
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100001@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/2')),
@ -761,7 +882,7 @@ class AT(TestAutopkgtestBase):
'Conflicts': 'blue'},
testsuite='autopkgtest', add_src=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/2')),
'testing/i386/g/green/20150101_100200@': (0, 'green 2', tr('green/2')),
@ -793,7 +914,7 @@ class AT(TestAutopkgtestBase):
'Conflicts': 'blue'},
testsuite='autopkgtest', add_src=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/2')),
'testing/i386/g/green/20150101_100200@': (0, 'green 2', tr('green/2')),
@ -818,7 +939,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
# old lightgreen fails, thus new green should be held back
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/1.1')),
'testing/amd64/d/darkgreen/20150101_100001@': (0, 'darkgreen 1', tr('green/1.1')),
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('green/1')),
@ -881,7 +1002,7 @@ class AT(TestAutopkgtestBase):
'debci-testing-i386:lightgreen {"triggers": ["lightgreen/2"]}']))
# next run collects the results
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100200@': (0, 'lightgreen 2', tr('lightgreen/2')),
'testing/amd64/l/lightgreen/20150101_102000@': (0, 'lightgreen 2', tr('lightgreen/2')),
}})
@ -906,7 +1027,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100001@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('green/2')),
@ -947,7 +1068,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
# old lightgreen fails, thus new green should be held back
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/1.1')),
'testing/amd64/d/darkgreen/20150101_100001@': (0, 'darkgreen 1', tr('green/1.1')),
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('green/1')),
@ -982,7 +1103,7 @@ class AT(TestAutopkgtestBase):
self.assertEqual(self.pending_requests, {})
# lightgreen 2 stays unbuilt in britney, but we get a test result for it
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100200@': (0, 'lightgreen 2', tr('green/1.1')),
'testing/amd64/l/lightgreen/20150101_102000@': (0, 'lightgreen 2', tr('green/1.1')),
}})
@ -1012,7 +1133,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100101@': (0, 'lightgreen 1', tr('lightgreen/1')),
}})
@ -1037,7 +1158,7 @@ class AT(TestAutopkgtestBase):
self.assertEqual(len(self.amqp_requests), 6)
# we only get a result for lightgreen 2, not for the requested 1
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100001@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 0.5', tr('green/1')),
@ -1093,7 +1214,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
# green has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
}})
@ -1159,7 +1280,7 @@ class AT(TestAutopkgtestBase):
self.data.add('brown', False, {'Depends': 'grey'}, testsuite='autopkgtest')
self.data.add('brown', True, {'Depends': 'grey'}, testsuite='autopkgtest')
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/b/black/20150101_100000@': (0, 'black 1', tr('black/1')),
'testing/amd64/b/black/20150102_100000@': (99, 'black blacklisted', tr('black/2')),
'testing/amd64/g/grey/20150101_100000@': (99, 'grey blacklisted', tr('grey/1')),
@ -1188,7 +1309,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(black=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/b/black/20150101_100000@': (0, 'black 1', tr('black/1')),
'testing/amd64/b/black/20150102_100000@': (99, 'black blacklisted', tr('black/2')),
'testing/i386/b/black/20150101_100000@': (0, 'black 1', tr('black/1')),
@ -1211,7 +1332,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(black=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/b/black/20150101_100000@': (0, 'black 1', tr('black/1')),
'testing/i386/b/black/20150101_100001@': (0, 'black 1', tr('black/1')),
'testing/amd64/b/black/20150102_100000@': (4, 'black 2', tr('black/2')),
@ -1233,7 +1354,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('newgreen/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('newgreen/2')),
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('newgreen/2')),
@ -1262,7 +1383,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(darkgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('darkgreen/1')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('darkgreen/1')),
}})
@ -1279,7 +1400,7 @@ class AT(TestAutopkgtestBase):
{'darkgreen/2': {'darkgreen': ['amd64', 'i386']}})
# second run gets the results for darkgreen 2
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100010@': (0, 'darkgreen 2', tr('darkgreen/2')),
'testing/amd64/d/darkgreen/20150101_100010@': (0, 'darkgreen 2', tr('darkgreen/2')),
}})
@ -1323,7 +1444,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('green/1')),
'testing/amd64/g/green/20150101_100000@': (0, 'green 1', tr('green/1')),
'testing/i386/g/green/20150101_100010@': (0, 'green 2', tr('green/2')),
@ -1363,7 +1484,7 @@ class AT(TestAutopkgtestBase):
# third run gets the results for green and lightgreen, darkgreen is
# still running
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100020@': (0, 'green 3', tr('green/3')),
'testing/amd64/g/green/20150101_100020@': (0, 'green 3', tr('green/3')),
'testing/i386/l/lightgreen/20150101_100010@': (0, 'lightgreen 1', tr('green/3')),
@ -1381,7 +1502,7 @@ class AT(TestAutopkgtestBase):
{'green/3': {'darkgreen': ['amd64', 'i386']}})
# fourth run finally gets the new darkgreen result
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100010@': (0, 'darkgreen 1', tr('green/3')),
'testing/amd64/d/darkgreen/20150101_100010@': (0, 'darkgreen 1', tr('green/3')),
}})
@ -1400,7 +1521,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('passedbefore/1')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('passedbefore/1')),
}})
@ -1411,7 +1532,7 @@ class AT(TestAutopkgtestBase):
{'green': (False, {'darkgreen': {'amd64': 'RUNNING', 'i386': 'RUNNING'}})})
# second run: i386 result has version 1.1
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100010@': (0, 'darkgreen 1.1', tr('green/2'))
}})
self.run_it(
@ -1421,7 +1542,7 @@ class AT(TestAutopkgtestBase):
})})
# third run: amd64 result has version 1.2
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/d/darkgreen/20150101_100010@': (0, 'darkgreen 1.2', tr('green/2')),
}})
self.run_it(
@ -1436,7 +1557,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
# one tmpfail result without testpkg-version, should be ignored
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/i386/l/lightgreen/20150101_100101@': (16, None, tr('lightgreen/2')),
'testing/amd64/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('lightgreen/1')),
@ -1450,7 +1571,7 @@ class AT(TestAutopkgtestBase):
{'lightgreen/2': {'lightgreen': ['i386']}})
# one more tmpfail result, should not confuse britney with None version
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100201@': (16, None, tr('lightgreen/2')),
}})
self.run_it(
@ -1467,7 +1588,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# first run fails
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 2', tr('green/1')),
'testing/i386/g/green/20150101_100101@': (4, 'green 2', tr('green/2')),
'testing/amd64/g/green/20150101_100000@': (0, 'green 2', tr('green/1')),
@ -1491,7 +1612,7 @@ class AT(TestAutopkgtestBase):
# re-running test manually succeeded (note: darkgreen result should be
# cached already)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100201@': (0, 'green 2', tr('green/2')),
'testing/amd64/g/green/20150101_100201@': (0, 'green 2', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100201@': (0, 'lightgreen 1', tr('green/2')),
@ -1518,7 +1639,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(libc6=False)
# new libc6 works fine with green
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('libc6/2')),
'testing/amd64/g/green/20150101_100000@': (0, 'green 1', tr('libc6/2')),
}})
@ -1542,7 +1663,7 @@ class AT(TestAutopkgtestBase):
# new green fails; that's not libc6's fault though, so it should stay
# valid
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100100@': (4, 'green 2', tr('green/2')),
'testing/amd64/g/green/20150101_100100@': (4, 'green 2', tr('green/2')),
}})
@ -1564,7 +1685,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100101@': (0, 'green 1', tr('green/1')),
'testing/amd64/g/green/20150101_100101@': (0, 'green 1', tr('green/1')),
'testing/i386/g/green/20150101_100201@': (0, 'green 2', tr('green/2')),
@ -1591,7 +1712,7 @@ class AT(TestAutopkgtestBase):
# green
self.data.remove_all(True)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
# add new result for lightgreen 1
'testing/i386/l/lightgreen/20150101_100301@': (0, 'lightgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150101_100301@': (0, 'lightgreen 1', tr('green/2')),
@ -1627,7 +1748,7 @@ class AT(TestAutopkgtestBase):
# # self.data.add_default_packages(lightgreen=False)
# #
# # # lightgreen has passed before on i386 only, therefore ALWAYSFAIL on amd64
# # self.swift.set_results({'autopkgtest-testing': {
# # self.set_results({'autopkgtest-testing': {
# # 'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('passedbefore/1')),
# # }})
# #
@ -1687,7 +1808,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/r/rainbow/20150101_100000@': (0, 'rainbow 1', tr('passedbefore/1')),
}})
@ -1735,7 +1856,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
@ -1763,7 +1884,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
@ -1806,7 +1927,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
@ -1848,7 +1969,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
@ -1889,7 +2010,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
@ -1918,7 +2039,7 @@ class AT(TestAutopkgtestBase):
self.create_hint('autopkgtest', 'force-skiptest green/2')
# regression of green, darkgreen ok, lightgreen running
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
'testing/i386/g/green/20150101_100200@': (4, 'green 2', tr('green/2')),
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
@ -1942,7 +2063,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# green has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
}})
@ -1965,7 +2086,7 @@ class AT(TestAutopkgtestBase):
self.create_hint('freeze', 'block-all source')
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('passedbefore/1')),
'testing/amd64/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('passedbefore/1')),
}})
@ -1975,7 +2096,7 @@ class AT(TestAutopkgtestBase):
{'lightgreen': (False, {'lightgreen': {'amd64': 'RUNNING', 'i386': 'RUNNING'}})}
)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 2', tr('lightgreen/2')),
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 2', tr('lightgreen/2')),
}})
@ -1991,7 +2112,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150101_100101@': (4, 'lightgreen 2', tr('lightgreen/2')),
}})
@ -2012,7 +2133,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150101_100101@': (4, 'lightgreen 2', tr('lightgreen/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('lightgreen/1')),
@ -2035,7 +2156,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100100@': (4, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150102_100101@': (0, 'lightgreen 2', tr('lightgreen/2')),
}})
@ -2056,7 +2177,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100100@': (4, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150102_100101@': (0, 'lightgreen 2', tr('lightgreen/2')),
'testing/amd64/l/lightgreen/20150103_100101@': (4, 'lightgreen 3', tr('lightgreen/3')),
@ -2078,7 +2199,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100100@': (4, 'lightgreen 0.1', tr('lightgreen/0.1')),
'testing/amd64/l/lightgreen/20150102_100101@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150103_100101@': (4, 'lightgreen 1', tr('green/2')),
@ -2100,7 +2221,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150102_100100@': (4, 'lightgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150103_100101@': (0, 'lightgreen 2', tr('lightgreen/2')),
@ -2152,7 +2273,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100101@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150102_100101@': (4, 'lightgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150103_100102@': (0, 'lightgreen 2', tr('lightgreen/2')),
@ -2180,7 +2301,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False, lightgreen=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lightgreen/20150101_100101@': (0, 'lightgreen 1', tr('lightgreen/1')),
'testing/amd64/l/lightgreen/20150102_100101@': (0, 'lightgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150103_100102@': (0, 'lightgreen 2', tr('lightgreen/2')),
@ -2213,7 +2334,7 @@ class AT(TestAutopkgtestBase):
self.data.add('dkms', False, {})
self.data.add('fancy-dkms', False, {'Source': 'fancy', 'Depends': 'dkms (>= 1)'}, testsuite='autopkgtest-pkg-dkms')
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/f/fancy/20150101_100101@': (0, 'fancy 0.1', tr('passedbefore/1'))
}})
@ -2261,7 +2382,7 @@ class AT(TestAutopkgtestBase):
# works against linux-meta and -64only, fails against grumpy i386, no
# result yet for grumpy amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/f/fancy/20150101_100301@': (0, 'fancy 0.5', tr('passedbefore/1')),
'testing/i386/f/fancy/20150101_100101@': (0, 'fancy 1', tr('linux-meta/1')),
'testing/amd64/f/fancy/20150101_100101@': (0, 'fancy 1', tr('linux-meta/1')),
@ -2290,7 +2411,7 @@ class AT(TestAutopkgtestBase):
# works against linux-meta and -64only, fails against grumpy i386, no
# result yet for grumpy amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
# old results without trigger info
'testing/i386/f/fancy/20140101_100101@': (0, 'fancy 1', {}),
'testing/amd64/f/fancy/20140101_100101@': (8, 'fancy 1', {}),
@ -2332,7 +2453,7 @@ class AT(TestAutopkgtestBase):
self.data.add('linux-libc-dev', False, {'Source': 'linux'}, testsuite='autopkgtest')
self.data.add('linux-image', False, {'Source': 'linux-meta', 'Depends': 'linux-image-1'})
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/amd64/l/lxc/20150101_100101@': (0, 'lxc 0.1', tr('passedbefore/1'))
}})
@ -2366,7 +2487,7 @@ class AT(TestAutopkgtestBase):
self.data.add('linux-image-1', False, {'Source': 'linux'}, testsuite='autopkgtest')
self.data.add('linux-firmware', False, {'Source': 'linux-firmware'}, testsuite='autopkgtest')
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/f/fancy/20150101_090000@': (0, 'fancy 0.5', tr('passedbefore/1')),
'testing/i386/l/linux/20150101_100000@': (0, 'linux 2', tr('linux-meta/0.2')),
'testing/amd64/l/linux/20150101_100000@': (0, 'linux 2', tr('linux-meta/0.2')),
@ -2393,7 +2514,7 @@ class AT(TestAutopkgtestBase):
)
# now linux-meta is ready to go
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/f/fancy/20150101_100000@': (0, 'fancy 1', tr('linux-meta/0.2')),
'testing/amd64/f/fancy/20150101_100000@': (0, 'fancy 1', tr('linux-meta/0.2')),
}})
@ -2420,7 +2541,7 @@ class AT(TestAutopkgtestBase):
self.data.add('notme', False, {'Depends': 'libgcc1'}, testsuite='autopkgtest')
# binutils has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/b/binutils/20150101_100000@': (0, 'binutils 1', tr('passedbefore/1')),
}})
@ -2436,7 +2557,7 @@ class AT(TestAutopkgtestBase):
self.data.add('gcc-7', False, {}, testsuite='autopkgtest')
# gcc-7 has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-series': {
self.set_results({'autopkgtest-series': {
'series/i386/g/gcc-7/20150101_100000@': (0, 'gcc-7 1', tr('passedbefore/1')),
}})
@ -2511,7 +2632,7 @@ class AT(TestAutopkgtestBase):
self.assertEqual(len(self.amqp_requests), 2)
# add results to PPA specific swift container
self.swift.set_results({'autopkgtest-testing-awesome-developers-staging': {
self.set_results({'autopkgtest-testing-awesome-developers-staging': {
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 1', tr('passedbefore/1')),
'testing/i386/l/lightgreen/20150101_100100@': (4, 'lightgreen 2', tr('lightgreen/2')),
'testing/amd64/l/lightgreen/20150101_100101@': (0, 'lightgreen 2', tr('lightgreen/2')),
@ -2750,7 +2871,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(lightgreen=False)
# first run to create autopkgtest-results.cache
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100000@': (0, 'lightgreen 2', tr('lightgreen/2')),
'testing/amd64/l/lightgreen/20150101_100000@': (0, 'lightgreen 2', tr('lightgreen/2')),
}})
@ -2775,7 +2896,7 @@ class AT(TestAutopkgtestBase):
sys.stdout.write(line)
# second run, should now not update cache
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 3', tr('lightgreen/3')),
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 3', tr('lightgreen/3')),
}})
@ -2890,7 +3011,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
# green has passed before on i386 only, therefore ALWAYSFAIL on amd64
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/g/green/20150101_100000@': (0, 'green 1', tr('passedbefore/1')),
}})
@ -2908,7 +3029,7 @@ class AT(TestAutopkgtestBase):
self.assertEqual(exc['green']['policy_info']['age']['age-requirement'], 40)
# second run collects the results
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
'testing/amd64/l/lightgreen/20150101_100101@': (4, 'lightgreen 1', tr('green/2')),
@ -2960,7 +3081,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/1')),
'testing/amd64/l/lightgreen/20150101_100101@': (4, 'lightgreen 1', tr('green/2')),
@ -2995,7 +3116,7 @@ class AT(TestAutopkgtestBase):
self.data.add_default_packages(green=False)
self.swift.set_results({'autopkgtest-testing': {
self.set_results({'autopkgtest-testing': {
'testing/i386/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/amd64/d/darkgreen/20150101_100000@': (0, 'darkgreen 1', tr('green/2')),
'testing/i386/l/lightgreen/20150101_100100@': (0, 'lightgreen 1', tr('green/2')),

@ -0,0 +1,310 @@
#!/usr/bin/python3
# (C) 2022 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
import os
import pathlib
import sys
from types import SimpleNamespace
import unittest
from unittest.mock import patch
import xml.etree.ElementTree as ET
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)
from britney2.policies.cloud import CloudPolicy, ERR_MESSAGE, MissingURNException
class FakeItem:
package = "chromium-browser"
version = "0.0.1"
class FakeSourceData:
version = "55.0"
class T(unittest.TestCase):
def setUp(self):
self.fake_options = SimpleNamespace(
distrubtion = "testbuntu",
series = "zazzy",
unstable = "/tmp",
verbose = False,
cloud_source = "zazzy-proposed",
cloud_source_type = "archive",
cloud_azure_zazzy_urn = "fake-urn-value"
)
self.policy = CloudPolicy(self.fake_options, {})
self.policy._setup_work_directory()
def tearDown(self):
self.policy._cleanup_work_directory()
@patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests")
def test_run_cloud_tests_called_for_package_in_manifest(self, mock_run):
"""Cloud tests should run for a package in the cloud package set.
"""
self.policy.package_set = set(["chromium-browser"])
self.policy.options.series = "jammy"
self.policy.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, None
)
mock_run.assert_called_once_with(
"chromium-browser", "jammy", ["proposed"], "archive"
)
@patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests")
def test_run_cloud_tests_not_called_for_package_not_in_manifest(self, mock_run):
"""Cloud tests should not run for packages not in the cloud package set"""
self.policy.package_set = set(["vim"])
self.policy.options.series = "jammy"
self.policy.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, None
)
mock_run.assert_not_called()
@patch("britney2.policies.cloud.smtplib")
@patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests")
def test_no_tests_run_during_dry_run(self, mock_run, smtp):
self.policy = CloudPolicy(self.fake_options, {}, dry_run=True)
self.policy.package_set = set(["chromium-browser"])
self.policy.options.series = "jammy"
self.policy.source = "jammy-proposed"
self.policy.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, None
)
mock_run.assert_not_called()
self.assertEqual(smtp.mock_calls, [])
def test_finding_results_file(self):
"""Ensure result file output from Cloud Test Framework can be found"""
path = pathlib.PurePath(self.policy.work_dir, "TEST-FakeTests-20230101010101.xml")
path2 = pathlib.PurePath(self.policy.work_dir, "Test-OtherTests-20230101010101.xml")
with open(path, "a"): pass
with open(path2, "a"): pass
regex = r"TEST-FakeTests-[0-9]*.xml"
results_file_paths = self.policy._find_results_files(regex)
self.assertEqual(len(results_file_paths), 1)
self.assertEqual(results_file_paths[0], path)
def test_parsing_of_xunit_results_file(self):
"""Test that parser correctly sorts and stores test failures and errors"""
path = self._create_fake_test_result_file(num_pass=4, num_err=2, num_fail=3)
self.policy._parse_xunit_test_results("Azure", [path])
azure_failures = self.policy.failures.get("Azure", {})
azure_errors = self.policy.errors.get("Azure", {})
self.assertEqual(len(azure_failures), 3)
self.assertEqual(len(azure_errors), 2)
test_names = azure_failures.keys()
self.assertIn("failing_test_1", test_names)
self.assertEqual(
azure_failures.get("failing_test_1"), "AssertionError: A useful error message"
)
def test_email_formatting(self):
"""Test that information is inserted correctly in the email template"""
failures = {
"Azure": {
"failing_test1": "Error reason 1",
"failing_test2": "Error reason 2"
}
}
self.policy.options.series = "jammy"
self.policy.source = "jammy-proposed"
message = self.policy._format_email_message(ERR_MESSAGE, ["work@canonical.com"], "vim", "9.0", failures)
self.assertIn("To: work@canonical.com", message)
self.assertIn("vim 9.0", message)
self.assertIn("Error reason 2", message)
def test_urn_retrieval(self):
"""Test that URN retrieval throws the expected error when not configured."""
self.assertRaises(
MissingURNException, self.policy._retrieve_urn, "jammy"
)
urn = self.policy._retrieve_urn("zazzy")
self.assertEqual(urn, "fake-urn-value")
def test_generation_of_verdict_info(self):
"""Test that the verdict info correctly states which clouds had failures and/or errors"""
failures = {
"cloud1": {
"test_name1": "message1",
"test_name2": "message2"
},
"cloud2": {
"test_name3": "message3"
}
}
errors = {
"cloud1": {
"test_name4": "message4",
},
"cloud3": {
"test_name5": "message5"
}
}
info = self.policy._generate_verdict_info(failures, errors)
expected_failure_info = "Cloud testing failed for cloud1,cloud2."
expected_error_info = "Cloud testing had errors for cloud1,cloud3."
self.assertIn(expected_failure_info, info)
self.assertIn(expected_error_info, info)
def test_format_install_flags_with_ppas(self):
"""Ensure the correct flags are returned with PPA sources"""
expected_flags = [
"--install-ppa-package", "tmux/ppa_url=fingerprint",
"--install-ppa-package", "tmux/ppa_url2=fingerprint"
]
install_flags = self.policy._format_install_flags(
"tmux", ["ppa_url=fingerprint", "ppa_url2=fingerprint"], "ppa"
)
self.assertListEqual(install_flags, expected_flags)
def test_format_install_flags_with_archive(self):
"""Ensure the correct flags are returned with archive sources"""
expected_flags = ["--install-archive-package", "tmux/proposed"]
install_flags = self.policy._format_install_flags("tmux", ["proposed"], "archive")
self.assertListEqual(install_flags, expected_flags)
def test_format_install_flags_with_incorrect_type(self):
"""Ensure errors are raised for unknown source types"""
self.assertRaises(RuntimeError, self.policy._format_install_flags, "tmux", ["a_source"], "something")
def test_parse_ppas(self):
"""Ensure correct conversion from Britney format to cloud test format
Also check that public PPAs are not used due to fingerprint requirement for cloud
tests.
"""
input_ppas = [
"deadsnakes/ppa:fingerprint",
"user:token@team/name:fingerprint"
]
expected_ppas = [
"https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu=fingerprint",
"https://user:token@private-ppa.launchpadcontent.net/team/name/ubuntu=fingerprint"
]
output_ppas = self.policy._parse_ppas(input_ppas)
self.assertListEqual(output_ppas, expected_ppas)
def test_errors_raised_if_invalid_ppa_input(self):
"""Test that error are raised if input PPAs don't match expected format"""
self.assertRaises(
RuntimeError, self.policy._parse_ppas, ["team/name"]
)
self.assertRaises(
RuntimeError, self.policy._parse_ppas, ["user:token@team/name"]
)
self.assertRaises(
RuntimeError, self.policy._parse_ppas, ["user:token@team=fingerprint"]
)
def test_retrieve_package_install_source_from_test_output(self):
"""Ensure retrieving the package install source from apt output only returns the line we
want and not other lines containing the package name.
Ensure it returns nothing if multiple candidates are found because that means the parsing
needs to be updated.
"""
package = "tmux"
with open(pathlib.PurePath(self.policy.work_dir, self.policy.TEST_LOG_FILE), "w") as file:
file.write("Get: something \n".format(package))
file.write("Get: lib-{} \n".format(package))
install_source = self.policy._retrieve_package_install_source_from_test_output(package)
self.assertIsNone(install_source)
with open(pathlib.PurePath(self.policy.work_dir, self.policy.TEST_LOG_FILE), "a") as file:
file.write("Get: {} \n".format(package))
install_source = self.policy._retrieve_package_install_source_from_test_output(package)
self.assertEqual(install_source, "Get: tmux \n")
@patch("britney2.policies.cloud.CloudPolicy._retrieve_package_install_source_from_test_output")
def test_store_extra_test_result_info(self, mock):
"""Ensure nothing is done if there are no failures/errors.
Ensure that if there are failures/errors that any extra info retrieved is stored in the
results dict Results -> Cloud -> extra_info
"""
self.policy._store_extra_test_result_info("FakeCloud", "tmux")
mock.assert_not_called()
self.policy.failures = {"FakeCloud": {"failing_test": "failure reason"}}
mock.return_value = "source information"
self.policy._store_extra_test_result_info("FakeCloud", "tmux")
self.assertEqual(
self.policy.failures["FakeCloud"]["extra_info"]["install_source"], "source information"
)
def _create_fake_test_result_file(self, num_pass=1, num_err=0, num_fail=0):
"""Helper function to generate an xunit test result file.
:param num_pass The number of passing tests to include
:param num_err The number of erroring tests to include
:param num_fail The number of failing tests to include
Returns the path to the created file.
"""
os.makedirs(self.policy.work_dir, exist_ok=True)
path = pathlib.PurePath(self.policy.work_dir, "TEST-FakeTests-20230101010101.xml")
root = ET.Element("testsuite", attrib={"name": "FakeTests-1234567890"})
for x in range(0, num_pass):
case_attrib = {"classname": "FakeTests", "name": "passing_test_{}".format(x), "time":"0.001"}
ET.SubElement(root, "testcase", attrib=case_attrib)
for x in range(0, num_err):
case_attrib = {"classname": "FakeTests", "name": "erroring_test_{}".format(x), "time":"0.001"}
testcase = ET.SubElement(root, "testcase", attrib=case_attrib)
err_attrib = {"type": "Exception", "message": "A useful error message" }
ET.SubElement(testcase, "error", attrib=err_attrib)
for x in range(0, num_fail):
case_attrib = {"classname": "FakeTests", "name": "failing_test_{}".format(x), "time":"0.001"}
testcase = ET.SubElement(root, "testcase", attrib=case_attrib)
fail_attrib = {"type": "AssertionError", "message": "A useful error message" }
ET.SubElement(testcase, "failure", attrib=fail_attrib)
tree = ET.ElementTree(root)
ET.indent(tree, space="\t", level=0)
with open(path, "w") as file:
tree.write(file, encoding="unicode", xml_declaration=True)
return path
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save