CloudPolicy: Refactor to use adt_ppas

Cloud policy now uses the ADT_PPAS field as the source but will only
take private PPAS due to requiring a fingerprint. If ADT_PPAS is empty
then the 'proposed' archive is used.

Adds logic to try and retrieve the install source of a package from the
logs. This is useful if multiple PPAs are defined since the policy won't
explicitly know when contains the package under test.
sil2100/cloud-only-run-once
Aleksa Svitlica 2 years ago
parent 1c6f56ab79
commit 2bd1584c84

@ -125,11 +125,6 @@ PIUPARTS_ENABLE = no
# run cloud tests on packages # run cloud tests on packages
CLOUD_ENABLE = no CLOUD_ENABLE = no
# Where the package can be downloaded from in the cloud. Either an archive or ppa.
# For PPAs it should be defined in the format <ppa-url>=<fingerprint>
CLOUD_SOURCE = proposed
# Can be either 'archive' or 'ppa'
CLOUD_SOURCE_TYPE = archive
# A directory to store Cloud test results and logs. Is created at the start of # A directory to store Cloud test results and logs. Is created at the start of
# each policy run and deleted after test results are parsed. # each policy run and deleted after test results are parsed.
CLOUD_WORK_DIR = cloud_tests CLOUD_WORK_DIR = cloud_tests

@ -18,7 +18,7 @@ class MissingURNException(Exception):
FAIL_MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com> FAIL_MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com>
To: {recipients} To: {recipients}
X-Proposed-Migration: notice X-Proposed-Migration: notice
Subject: [proposed-migration] {package} {version} in {series}, {source} failed Cloud tests. Subject: [proposed-migration] {package} {version} in {series} failed Cloud tests.
Hi, Hi,
@ -36,7 +36,7 @@ Regards, Ubuntu Release Team.
ERR_MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com> ERR_MESSAGE = """From: Ubuntu Release Team <noreply+proposed-migration@ubuntu.com>
To: {recipients} To: {recipients}
X-Proposed-Migration: notice X-Proposed-Migration: notice
Subject: [proposed-migration] {package} {version} in {series}, {source} had errors running Cloud Tests. Subject: [proposed-migration] {package} {version} in {series} had errors running Cloud Tests.
Hi, Hi,
@ -51,6 +51,7 @@ Regards, Ubuntu Release Team.
class CloudPolicy(BasePolicy): class CloudPolicy(BasePolicy):
PACKAGE_SET_FILE = "cloud_package_set" PACKAGE_SET_FILE = "cloud_package_set"
DEFAULT_EMAILS = ["cpc@canonical.com"] DEFAULT_EMAILS = ["cpc@canonical.com"]
TEST_LOG_FILE = "CTF.log"
def __init__(self, options, suite_info, dry_run=False): def __init__(self, options, suite_info, dry_run=False):
super().__init__( super().__init__(
@ -68,8 +69,15 @@ class CloudPolicy(BasePolicy):
self.failure_emails = getattr(self.options, "cloud_failure_emails", self.DEFAULT_EMAILS) self.failure_emails = getattr(self.options, "cloud_failure_emails", self.DEFAULT_EMAILS)
self.error_emails = getattr(self.options, "cloud_error_emails", self.DEFAULT_EMAILS) self.error_emails = getattr(self.options, "cloud_error_emails", self.DEFAULT_EMAILS)
self.source = getattr(self.options, "cloud_source") adt_ppas = getattr(self.options, "adt_ppas", [])
self.source_type = getattr(self.options, "cloud_source_type") ppas = self._parse_ppas(adt_ppas)
if len(ppas) == 0:
self.sources = ["proposed"]
self.source_type = "archive"
else:
self.sources = ppas
self.source_type = "ppa"
self.failures = {} self.failures = {}
self.errors = {} self.errors = {}
@ -85,7 +93,7 @@ class CloudPolicy(BasePolicy):
if self.dry_run: if self.dry_run:
self.logger.info( self.logger.info(
"Cloud Policy: Dry run would test {} in {}, {}".format(item.package , self.options.series, self.source) "Cloud Policy: Dry run would test {} in {}".format(item.package , self.options.series)
) )
return PolicyVerdict.PASS return PolicyVerdict.PASS
@ -93,10 +101,10 @@ class CloudPolicy(BasePolicy):
self.failures = {} self.failures = {}
self.errors = {} self.errors = {}
self._run_cloud_tests(item.package, self.options.series, self.source, self.source_type) self._run_cloud_tests(item.package, self.options.series, self.sources, self.source_type)
if len(self.failures) > 0 or len(self.errors) > 0: if len(self.failures) > 0 or len(self.errors) > 0:
self._send_emails_if_needed(item.package, source_data_srcdist.version, self.options.series, self.source) self._send_emails_if_needed(item.package, source_data_srcdist.version, self.options.series)
self._cleanup_work_directory() self._cleanup_work_directory()
verdict = PolicyVerdict.REJECTED_PERMANENTLY verdict = PolicyVerdict.REJECTED_PERMANENTLY
@ -124,24 +132,23 @@ class CloudPolicy(BasePolicy):
return package_set return package_set
def _run_cloud_tests(self, package, series, source, source_type): def _run_cloud_tests(self, package, series, sources, source_type):
"""Runs any cloud tests for the given package. """Runs any cloud tests for the given package.
Nothing is returned but test failures and errors are stored in instance variables. Nothing is returned but test failures and errors are stored in instance variables.
:param package The name of the package to test :param package The name of the package to test
:param series The Ubuntu codename for the series (e.g. jammy) :param series The Ubuntu codename for the series (e.g. jammy)
:param source Where the package should be installed from (e.g. proposed or a PPA) :param sources List of sources where the package should be installed from (e.g. [proposed] or PPAs)
:param source_type Either 'archive' or 'ppa' :param source_type Either 'archive' or 'ppa'
""" """
self._run_azure_tests(package, series, source, source_type) self._run_azure_tests(package, series, sources, source_type)
def _send_emails_if_needed(self, package, version, series, source): def _send_emails_if_needed(self, package, version, series):
"""Sends email(s) if there are test failures and/or errors """Sends email(s) if there are test failures and/or errors
:param package The name of the package that was tested :param package The name of the package that was tested
:param version The version number of the package :param version The version number of the package
:param series The Ubuntu codename for the series (e.g. jammy) :param series The Ubuntu codename for the series (e.g. jammy)
:param source Where the package should be installed from (e.g. proposed or a PPA)
""" """
if len(self.failures) > 0: if len(self.failures) > 0:
emails = self.failure_emails emails = self.failure_emails
@ -159,34 +166,42 @@ class CloudPolicy(BasePolicy):
self.logger.info("Cloud Policy: Sending error email for {}, to {}".format(package, emails)) self.logger.info("Cloud Policy: Sending error email for {}, to {}".format(package, emails))
self._send_email(emails, message) self._send_email(emails, message)
def _run_azure_tests(self, package, series, source, source_type): def _run_azure_tests(self, package, series, sources, source_type):
"""Runs Azure's required package tests. """Runs Azure's required package tests.
:param package The name of the package to test :param package The name of the package to test
:param series The Ubuntu codename for the series (e.g. jammy) :param series The Ubuntu codename for the series (e.g. jammy)
:param source Where the package should be installed from (e.g. proposed or a PPA) :param sources List of sources where the package should be installed from (e.g. [proposed] or PPAs)
:param source_type Either 'archive' or 'ppa' :param source_type Either 'archive' or 'ppa'
""" """
urn = self._retrieve_urn(series) urn = self._retrieve_urn(series)
install_flag = self._determine_install_flag(source_type)
self.logger.info("Cloud Policy: Running Azure tests for: {} in {}, {}".format(package, series, source)) self.logger.info("Cloud Policy: Running Azure tests for: {} in {}".format(package, series))
subprocess.run( params = [
[
"/snap/bin/cloud-test-framework", "/snap/bin/cloud-test-framework",
"--instance-prefix", "britney-{}-{}-{}".format(package, series, source), "--instance-prefix", "britney-{}-{}".format(package, series)
install_flag, "{}/{}".format(package, source), ]
params.extend(self._format_install_flags(package, sources, source_type))
params.extend(
[
"azure_gen2", "azure_gen2",
"--location", "westeurope", "--location", getattr(self.options, "cloud_azure_location", "westeurope"),
"--vm-size", "Standard_D2s_v5", "--vm-size", getattr(self.options, "cloud_azure_vm_size", "Standard_D2s_v5"),
"--urn", urn, "--urn", urn,
"run-test", "package-install-with-reboot", "run-test", "package-install-with-reboot",
], ]
cwd=self.work_dir )
with open(PurePath(self.work_dir, self.TEST_LOG_FILE), "w") as file:
subprocess.run(
params,
cwd=self.work_dir,
stdout=file
) )
results_file_paths = self._find_results_files(r"TEST-NetworkTests-[0-9]*.xml") results_file_paths = self._find_results_files(r"TEST-NetworkTests-[0-9]*.xml")
self._parse_xunit_test_results("Azure", results_file_paths) self._parse_xunit_test_results("Azure", results_file_paths)
self._store_extra_test_result_info(self, package)
def _retrieve_urn(self, series): def _retrieve_urn(self, series):
"""Retrieves an URN from the configuration options based on series. """Retrieves an URN from the configuration options based on series.
@ -243,18 +258,18 @@ class CloudPolicy(BasePolicy):
type = e.attrib.get('type') type = e.attrib.get('type')
message = e.attrib.get('message') message = e.attrib.get('message')
info = "{}: {}".format(type, message) info = "{}: {}".format(type, message)
self.store_test_result( self._store_test_result(
self.failures, cloud, el.attrib.get('name'), info self.failures, cloud, el.attrib.get('name'), info
) )
if e.tag == "error": if e.tag == "error":
type = e.attrib.get('type') type = e.attrib.get('type')
message = e.attrib.get('message') message = e.attrib.get('message')
info = "{}: {}".format(type, message) info = "{}: {}".format(type, message)
self.store_test_result( self._store_test_result(
self.errors, cloud, el.attrib.get('name'), info self.errors, cloud, el.attrib.get('name'), info
) )
def store_test_result(self, results, cloud, test_name, message): def _store_test_result(self, results, cloud, test_name, message):
"""Adds the test to the results hash under the given cloud. """Adds the test to the results hash under the given cloud.
Results format: Results format:
@ -275,10 +290,61 @@ class CloudPolicy(BasePolicy):
results[cloud][test_name] = message results[cloud][test_name] = message
def _store_extra_test_result_info(self, cloud, package):
"""Stores any information beyond the test results and stores it in the results dicts
under Cloud->extra_info
Stores any information retrieved under the cloud's section in failures/errors but will
store nothing if failures/errors are empty.
:param cloud The name of the cloud
:param package The name of the package to test
"""
if len(self.failures) == 0 and len(self.errors) == 0:
return
extra_info = {}
install_source = self._retrieve_package_install_source_from_test_output(package)
if install_source:
extra_info["install_source"] = install_source
if len(self.failures.get(cloud, {})) > 0:
self._store_test_result(self.failures, cloud, "extra_info", extra_info)
if len(self.errors.get(cloud, {})) > 0:
self._store_test_result(self.errors, cloud, "extra_info", extra_info)
def _retrieve_package_install_source_from_test_output(self, package):
"""Checks the test logs for apt logs which show where the package was installed from.
Useful if multiple PPA sources are defined since we won't explicitly know the exact source.
Will return nothing unless exactly one matching line is found.
:param package The name of the package to test
"""
possible_locations = []
with open(PurePath(self.work_dir, self.TEST_LOG_FILE), "r") as file:
for line in file:
if package not in line:
continue
if "Get:" not in line:
continue
if " {} ".format(package) not in line:
continue
possible_locations.append(line)
if len(possible_locations) == 1:
return possible_locations[0]
else:
return None
def _format_email_message(self, template, emails, package, version, test_results): def _format_email_message(self, template, emails, package, version, test_results):
"""Insert given parameters into the email template.""" """Insert given parameters into the email template."""
series = self.options.series series = self.options.series
source = self.source
results = json.dumps(test_results, indent=4) results = json.dumps(test_results, indent=4)
recipients = ", ".join(emails) recipients = ", ".join(emails)
message = template.format(**locals()) message = template.format(**locals())
@ -312,14 +378,56 @@ class CloudPolicy(BasePolicy):
return info return info
def _determine_install_flag(self, source_type): def _format_install_flags(self, package, sources, source_type):
"""Determine the flags required to install the package from the given sources
:param package The name of the package to test
:param sources List of sources where the package should be installed from (e.g. [proposed] or PPAs)
:param source_type Either 'archive' or 'ppa'
"""
install_flags = []
for source in sources:
if source_type == "archive": if source_type == "archive":
return "--install-archive-package" install_flags.append("--install-archive-package")
install_flags.append("{}/{}".format(package, source))
elif source_type == "ppa": elif source_type == "ppa":
return "--install-ppa-package" install_flags.append("--install-ppa-package")
install_flags.append("{}/{}".format(package, source))
else: else:
raise RuntimeError("Cloud Policy: Unexpected source type, {}".format(source_type)) raise RuntimeError("Cloud Policy: Unexpected source type, {}".format(source_type))
return install_flags
def _parse_ppas(self, ppas):
"""Parse PPA list to store in format expected by cloud tests
Since currently only private PPAs in Britney are provided with fingerprints we will
only use those.
Britney private PPA format:
'user:token@team/name:fingerprint'
Cloud private PPA format:
'https://user:token@private-ppa.launchpadcontent.net/team/name/ubuntu=fingerprint
:param ppas List of PPAs in Britney approved format
:return A list of PPAs in valid cloud test format. Can return an empty list if none found.
"""
cloud_ppas = []
for ppa in ppas:
if '@' in ppa:
match = re.match("^(?P<auth>.+:.+)@(?P<name>.+):(?P<fingerprint>.+$)", ppa)
if not match:
raise RuntimeError('Private PPA %s not following required format (user:token@team/name:fingerprint)', ppa)
formatted_ppa = "https://{}@private-ppa.launchpadcontent.net/{}/ubuntu={}".format(
match.group("auth"), match.group("name"), match.group("fingerprint")
)
cloud_ppas.append(formatted_ppa)
return cloud_ppas
def _setup_work_directory(self): def _setup_work_directory(self):
"""Create a directory for tests to be run in.""" """Create a directory for tests to be run in."""
self._cleanup_work_directory() self._cleanup_work_directory()

@ -6,10 +6,10 @@
# the Free Software Foundation; either version 2 of the License, or # the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version. # (at your option) any later version.
import json
import os import os
import pathlib import pathlib
import sys import sys
from types import SimpleNamespace
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -26,18 +26,18 @@ class FakeItem:
class FakeSourceData: class FakeSourceData:
version = "55.0" version = "55.0"
class FakeOptions:
distribution = "testbuntu"
series = "zazzy"
unstable = "/tmp"
verbose = False
cloud_source = "zazzy-proposed"
cloud_source_type = "archive"
cloud_azure_zazzy_urn = "fake-urn-value"
class T(unittest.TestCase): class T(unittest.TestCase):
def setUp(self): def setUp(self):
self.policy = CloudPolicy(FakeOptions, {}) self.fake_options = SimpleNamespace(
distrubtion = "testbuntu",
series = "zazzy",
unstable = "/tmp",
verbose = False,
cloud_source = "zazzy-proposed",
cloud_source_type = "archive",
cloud_azure_zazzy_urn = "fake-urn-value"
)
self.policy = CloudPolicy(self.fake_options, {})
self.policy._setup_work_directory() self.policy._setup_work_directory()
def tearDown(self): def tearDown(self):
@ -49,14 +49,13 @@ class T(unittest.TestCase):
""" """
self.policy.package_set = set(["chromium-browser"]) self.policy.package_set = set(["chromium-browser"])
self.policy.options.series = "jammy" self.policy.options.series = "jammy"
self.policy.source = "jammy-proposed"
self.policy.apply_src_policy_impl( self.policy.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, None None, FakeItem, None, FakeSourceData, None
) )
mock_run.assert_called_once_with( mock_run.assert_called_once_with(
"chromium-browser", "jammy", "jammy-proposed", "archive" "chromium-browser", "jammy", ["proposed"], "archive"
) )
@patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests") @patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests")
@ -65,7 +64,6 @@ class T(unittest.TestCase):
self.policy.package_set = set(["vim"]) self.policy.package_set = set(["vim"])
self.policy.options.series = "jammy" self.policy.options.series = "jammy"
self.policy.source = "jammy-proposed"
self.policy.apply_src_policy_impl( self.policy.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, None None, FakeItem, None, FakeSourceData, None
@ -76,7 +74,7 @@ class T(unittest.TestCase):
@patch("britney2.policies.cloud.smtplib") @patch("britney2.policies.cloud.smtplib")
@patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests") @patch("britney2.policies.cloud.CloudPolicy._run_cloud_tests")
def test_no_tests_run_during_dry_run(self, mock_run, smtp): def test_no_tests_run_during_dry_run(self, mock_run, smtp):
self.policy = CloudPolicy(FakeOptions, {}, dry_run=True) self.policy = CloudPolicy(self.fake_options, {}, dry_run=True)
self.policy.package_set = set(["chromium-browser"]) self.policy.package_set = set(["chromium-browser"])
self.policy.options.series = "jammy" self.policy.options.series = "jammy"
self.policy.source = "jammy-proposed" self.policy.source = "jammy-proposed"
@ -173,15 +171,94 @@ class T(unittest.TestCase):
self.assertIn(expected_failure_info, info) self.assertIn(expected_failure_info, info)
self.assertIn(expected_error_info, info) self.assertIn(expected_error_info, info)
def test_determine_install_flag(self): def test_format_install_flags_with_ppas(self):
"""Ensure the correct flag is determined and errors are raised for unknown source types""" """Ensure the correct flags are returned with PPA sources"""
install_flag = self.policy._determine_install_flag("archive") expected_flags = [
self.assertEqual(install_flag, "--install-archive-package") "--install-ppa-package", "tmux/ppa_url=fingerprint",
"--install-ppa-package", "tmux/ppa_url2=fingerprint"
]
install_flags = self.policy._format_install_flags(
"tmux", ["ppa_url=fingerprint", "ppa_url2=fingerprint"], "ppa"
)
self.assertListEqual(install_flags, expected_flags)
def test_format_install_flags_with_archive(self):
"""Ensure the correct flags are returned with archive sources"""
expected_flags = ["--install-archive-package", "tmux/proposed"]
install_flags = self.policy._format_install_flags("tmux", ["proposed"], "archive")
self.assertListEqual(install_flags, expected_flags)
def test_format_install_flags_with_incorrect_type(self):
"""Ensure errors are raised for unknown source types"""
self.assertRaises(RuntimeError, self.policy._format_install_flags, "tmux", ["a_source"], "something")
def test_parse_ppas(self):
"""Ensure correct conversion from Britney format to cloud test format
Also check that public PPAs are not used due to fingerprint requirement for cloud
tests.
"""
input_ppas = [
"deadsnakes/ppa",
"user:token@team/name:fingerprint"
]
expected_ppas = [
"https://user:token@private-ppa.launchpadcontent.net/team/name/ubuntu=fingerprint"
]
output_ppas = self.policy._parse_ppas(input_ppas)
self.assertListEqual(output_ppas, expected_ppas)
install_flag = self.policy._determine_install_flag("ppa") def test_errors_raised_if_invalid_ppa_input(self):
self.assertEqual(install_flag, "--install-ppa-package") """Test that error are raised if input PPAs don't match expected format"""
self.assertRaises(
RuntimeError, self.policy._parse_ppas, ["user:token@team/name"]
)
self.assertRaises(
RuntimeError, self.policy._parse_ppas, ["user:token@team=fingerprint"]
)
def test_retrieve_package_install_source_from_test_output(self):
"""Ensure retrieving the package install source from apt output only returns the line we
want and not other lines containing the package name.
Ensure it returns nothing if multiple candidates are found because that means the parsing
needs to be updated.
"""
package = "tmux"
self.assertRaises(RuntimeError, self.policy._determine_install_flag, "something") with open(pathlib.PurePath(self.policy.work_dir, self.policy.TEST_LOG_FILE), "w") as file:
file.write("Get: something \n".format(package))
file.write("Get: lib-{} \n".format(package))
install_source = self.policy._retrieve_package_install_source_from_test_output(package)
self.assertIsNone(install_source)
with open(pathlib.PurePath(self.policy.work_dir, self.policy.TEST_LOG_FILE), "a") as file:
file.write("Get: {} \n".format(package))
install_source = self.policy._retrieve_package_install_source_from_test_output(package)
self.assertEqual(install_source, "Get: tmux \n")
@patch("britney2.policies.cloud.CloudPolicy._retrieve_package_install_source_from_test_output")
def test_store_extra_test_result_info(self, mock):
"""Ensure nothing is done if there are no failures/errors.
Ensure that if there are failures/errors that any extra info retrieved is stored in the
results dict Results -> Cloud -> extra_info
"""
self.policy._store_extra_test_result_info("FakeCloud", "tmux")
mock.assert_not_called()
self.policy.failures = {"FakeCloud": {"failing_test": "failure reason"}}
mock.return_value = "source information"
self.policy._store_extra_test_result_info("FakeCloud", "tmux")
self.assertEqual(
self.policy.failures["FakeCloud"]["extra_info"]["install_source"], "source information"
)
def _create_fake_test_result_file(self, num_pass=1, num_err=0, num_fail=0): def _create_fake_test_result_file(self, num_pass=1, num_err=0, num_fail=0):
"""Helper function to generate an xunit test result file. """Helper function to generate an xunit test result file.

Loading…
Cancel
Save