Run autopkgtests for valid candidates, and wait for the results.

bzr-import-20160707
Colin Watson 12 years ago
parent 111ab78d7d
commit f961f74ef3

@ -0,0 +1,168 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>
# Partly based on code in auto-package-testing by
# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
from __future__ import print_function
from collections import defaultdict
from contextlib import closing
import logging
import os
import subprocess
import tempfile
from textwrap import dedent
adt_britney = os.path.expanduser("~/auto-package-testing/jenkins/adt-britney")
class AutoPackageTest(object):
"""autopkgtest integration
Look for autopkgtest jobs to run for each update that is otherwise a
valid candidate, and collect the results. If an update causes any
autopkgtest jobs to be run, then they must all pass before the update is
accepted.
"""
def __init__(self, series, debug=False):
self.series = series
self.debug = debug
self.read()
self.rc_path = None
def _ensure_rc_file(self):
if self.rc_path:
return
self.rc_path = os.path.expanduser(
"~/proposed-migration/autopkgtest/rc.%s" % self.series)
with open(self.rc_path, "w") as rc_file:
home = os.path.expanduser("~")
print(dedent("""\
aptroot: ~/.chdist/%s-proposed-amd64/
apturi: file:%s/mirror/ubuntu
components: main restricted universe multiverse
rsync_host: rsync://10.189.74.2/adt/
datadir: ~/proposed-migration/autopkgtest/data""" %
(self.series, home)), file=rc_file)
@property
def _request_path(self):
return os.path.expanduser(
"~/proposed-migration/autopkgtest/work/adt.request.%s" %
self.series)
@property
def _result_path(self):
return os.path.expanduser(
"~/proposed-migration/autopkgtest/work/adt.result.%s" %
self.series)
def _parse(self, path):
if os.path.exists(path):
with open(path) as f:
for line in f:
line = line.strip()
if line.startswith("Suite:") or line.startswith("Date:"):
continue
linebits = line.split()
if len(linebits) < 2:
logging.warning(
"Invalid line format: '%s', skipped" % line)
continue
yield linebits
def read(self):
self.pkglist = defaultdict(dict)
self.pkgcauses = defaultdict(lambda: defaultdict(list))
for linebits in self._parse(self._result_path):
src = linebits.pop(0)
ver = linebits.pop(0)
self.pkglist[src][ver] = {
"status": "NEW",
"causes": {},
}
try:
status = linebits.pop(0).upper()
self.pkglist[src][ver]["status"] = status
while True:
trigsrc = linebits.pop(0)
trigver = linebits.pop(0)
self.pkglist[src][ver]["causes"][trigsrc] = trigver
self.pkgcauses[trigsrc][trigver].append((status, src, ver))
except IndexError:
# End of the list
pass
def _adt_britney(self, *args):
command = [
adt_britney,
"-c", self.rc_path, "-r", self.series, "-a", "amd64", "-PU",
]
if self.debug:
command.append("-d")
command.extend(args)
subprocess.check_call(command)
def request(self, packages):
self._ensure_rc_file()
request_path = self._request_path
if os.path.exists(request_path):
os.unlink(request_path)
with closing(tempfile.NamedTemporaryFile(mode="w")) as request_file:
for src, ver in packages:
if src in self.pkglist and ver in self.pkglist[src]:
continue
print("%s %s" % (src, ver), file=request_file)
request_file.flush()
self._adt_britney("request", "-O", request_path, request_file.name)
for linebits in self._parse(request_path):
# Make sure that there's an entry in pkgcauses for each new
# request, so that results() gives useful information without
# relying on the submit/collect cycle. This improves behaviour
# in dry-run mode.
src = linebits.pop(0)
ver = linebits.pop(0)
try:
status = linebits.pop(0).upper()
while True:
trigsrc = linebits.pop(0)
trigver = linebits.pop(0)
for status, csrc, cver in self.pkgcauses[src][ver]:
if csrc == trigsrc and cver == trigver:
break
else:
self.pkgcauses[src][ver].append(
(status, trigsrc, trigver))
except IndexError:
# End of the list
pass
def submit(self):
self._ensure_rc_file()
request_path = self._request_path
if os.path.exists(request_path):
self._adt_britney("submit", request_path)
def collect(self):
self._ensure_rc_file()
result_path = self._result_path
self._adt_britney("collect", "-O", result_path)
self.read()
def results(self, trigsrc, trigver):
for status, src, ver in self.pkgcauses[trigsrc][trigver]:
yield status, src, ver

@ -52,3 +52,8 @@ HINTS_STGRABER = ALL
SMOOTH_UPDATES = badgers
REMOVE_OBSOLETE = no
# autopkgtest needs to know the series name; set to the empty string to
# disable autopkgtest
ADT_SERIES = saucy
ADT_DEBUG = no

@ -213,6 +213,7 @@ from excuse import Excuse
from migrationitem import MigrationItem, HintItem
from hints import HintCollection
from britney import buildSystem
from autopkgtest import AutoPackageTest
__author__ = 'Fabio Tranchitella and the Debian Release Team'
@ -916,7 +917,7 @@ class Britney(object):
elif len(l) == 1:
# All current hints require at least one argument
self.__log("Malformed hint found in %s: '%s'" % (filename, line), type="W")
elif l[0] in ["approve", "block", "block-all", "block-udeb", "unblock", "unblock-udeb", "force", "urgent", "remove"]:
elif l[0] in ["approve", "block", "block-all", "block-udeb", "unblock", "unblock-udeb", "force", "force-autopkgtest", "urgent", "remove"]:
for package in l[1:]:
hints.add_hint('%s %s' % (l[0], package), who)
elif l[0] in ["age-days"]:
@ -925,7 +926,7 @@ class Britney(object):
else:
hints.add_hint(l, who)
for x in ["approve", "block", "block-all", "block-udeb", "unblock", "unblock-udeb", "force", "urgent", "remove", "age-days"]:
for x in ["approve", "block", "block-all", "block-udeb", "unblock", "unblock-udeb", "force", "force-autopkgtest", "urgent", "remove", "age-days"]:
z = {}
for hint in hints[x]:
package = hint.package
@ -1755,6 +1756,49 @@ class Britney(object):
# extract the not considered packages, which are in the excuses but not in upgrade_me
unconsidered = [e.name for e in self.excuses if e.name not in upgrade_me]
if self.options.adt_series:
# trigger autopkgtests for valid candidates
adt_debug = getattr(self.options, "adt_debug", "no") == "yes"
autopkgtest = AutoPackageTest(
self.options.adt_series, debug=adt_debug)
autopkgtest_packages = []
autopkgtest_excuses = []
for e in self.excuses:
if e.name not in upgrade_me:
continue
# skip removals, binary-only candidates, and proposed-updates
if e.name.startswith("-") or "/" in e.name or "_" in e.name:
pass
if e.ver[1] == "-":
continue
autopkgtest_excuses.append(e)
autopkgtest_packages.append((e.name, e.ver[1]))
autopkgtest.request(autopkgtest_packages)
if not self.options.dry_run:
autopkgtest.submit()
autopkgtest.collect()
for e in autopkgtest_excuses:
adtpass = True
for status, adtsrc, adtver in autopkgtest.results(
e.name, e.ver[1]):
e.addhtml(
"autopkgtest for %s %s: %s" % (adtsrc, adtver, status))
if status != "PASS":
adtpass = False
if not adtpass:
forces = [
x for x in self.hints.search(
'force-autopkgtest', package=e.name)
if self.same_source(e.ver[1], x.version) ]
if forces:
e.addhtml(
"Should ignore, but forced by %s" % forces[0].user)
else:
upgrade_me.remove(e.name)
unconsidered.append(e.name)
e.addhtml("Not considered")
e.is_valid = False
# invalidate impossible excuses
for e in self.excuses:
# parts[0] == package name

Loading…
Cancel
Save