|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
|
|
|
|
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
|
|
|
|
from __future__ import print_function
|
|
|
|
|
|
|
|
from itertools import chain
|
|
|
|
|
|
|
|
from migrationitem import MigrationItem
|
|
|
|
|
|
|
|
|
|
|
|
class MalformedHintException(Exception):
    """Raised when a hint cannot be accepted as written.

    Hint.check() raises this when the packages of a hint are versioned
    although the hint type requires unversioned packages, or vice versa.
    """
|
|
|
|
|
|
|
|
|
|
|
|
class HintCollection(object):
    """An insertion-ordered collection of hints with attribute filtering."""

    def __init__(self):
        # Hints are kept in the order in which add_hint() received them.
        self._hints = []

    def __getitem__(self, type=None):
        """Return the active hints of the given type (see search())."""
        return self.search(type=type)

    def search(self, type=None, onlyactive=True, package=None,
               version=None, removal=None):
        """Return the hints matching every criterion that was given.

        A criterion left at None is not applied.  The package, version and
        removal criteria are compared against the *first* package of each
        hint only.

        :param type: Required hint type, or None for any type
        :param onlyactive: If true (the default), skip inactive hints
        :param package: Required package name of the hint's first package
        :param version: Required version of the hint's first package
        :param removal: Required "is_removal" value of the hint's first package
        :return: A new list with the matching hints, in insertion order
        """
        selected = []
        for candidate in self._hints:
            if type is not None and not (type == candidate.type):
                continue
            if not candidate.active and onlyactive:
                continue
            if package is not None and not (package == candidate.packages[0].package):
                continue
            if version is not None and not (version == candidate.packages[0].version):
                continue
            if removal is not None and not (removal == candidate.packages[0].is_removal):
                continue
            selected.append(candidate)
        return selected

    def add_hint(self, hint):
        """Append a hint to the collection."""
        self._hints.append(hint)
|
|
|
|
|
|
|
|
|
|
|
|
class Hint(object):
    """A single hint: a hint type applied to one or more migration items.

    Two hints compare equal when they have the same type and the same set
    of packages; the user and the active flag are not part of the
    comparison.
    """

    # Hint types whose packages must be given *without* a version; every
    # other hint type requires versioned packages (see check()).
    NO_VERSION = ['block', 'block-all', 'block-udeb']

    def __init__(self, user, hint_type, packages):
        """Create a hint and validate it.

        :param user: Identifier of whoever provided the hint
        :param hint_type: The name of the hint (e.g. "block")
        :param packages: Either a single string of space-separated items or
          an iterable of strings; each string is turned into a MigrationItem
        :raises MalformedHintException: If check() rejects the packages
        """
        self._user = user
        self._active = True
        self._type = hint_type

        if isinstance(packages, str):
            packages = packages.split(' ')
        self._packages = [MigrationItem(x) for x in packages]

        self.check()

    def check(self):
        """Verify that the packages are (un)versioned as the type requires.

        :raises MalformedHintException: If a hint type listed in NO_VERSION
          has a versioned package, or any other type has an unversioned one
        """
        # The requirement depends only on the hint type, so decide it once.
        wants_unversioned = self.type in self.__class__.NO_VERSION
        for package in self.packages:
            if wants_unversioned:
                if package.version is not None:
                    raise MalformedHintException("\"%s\" needs unversioned packages, got \"%s\"" % (self.type, package))
            else:
                if package.version is None:
                    raise MalformedHintException("\"%s\" needs versioned packages, got \"%s\"" % (self.type, package))

    def set_active(self, active):
        """Set whether the hint is active."""
        self._active = active

    def __str__(self):
        # Unversioned hint types are rendered with each item's "uvname",
        # all other types with each item's "name".
        if self.type in self.__class__.NO_VERSION:
            return '%s %s' % (self._type, ' '.join(x.uvname for x in self._packages))
        else:
            return '%s %s' % (self._type, ' '.join(x.name for x in self._packages))

    def __eq__(self, other):
        # Comparing against a non-Hint (e.g. None) must not blow up with an
        # AttributeError; returning NotImplemented lets Python fall back to
        # its default handling, which yields "not equal".
        if not isinstance(other, Hint):
            return NotImplemented
        if self.type != other.type:
            return False
        else:
            # Package order is irrelevant for equality.
            return frozenset(self.packages) == frozenset(other.packages)

    @property
    def type(self):
        """The hint type given at construction."""
        return self._type

    @property
    def packages(self):
        """The list of migration items the hint applies to."""
        return self._packages

    @property
    def active(self):
        """Whether the hint is active (True until set_active() changes it)."""
        return self._active

    @property
    def user(self):
        """Identifier of whoever provided the hint."""
        return self._user

    @property
    def package(self):
        """Package name of the only item, or None if there are no items.

        Only meaningful for single-package hints; asserts that the hint
        does not have more than one item.
        """
        if self.packages:
            assert len(self.packages) == 1, self.packages
            return self.packages[0].package
        else:
            return None

    @property
    def version(self):
        """Version of the only item, or None if there are no items.

        Only meaningful for single-package hints; asserts that the hint
        does not have more than one item.
        """
        if self.packages:
            assert len(self.packages) == 1, self.packages
            return self.packages[0].version
        else:
            return None
|
|
|
|
|
|
|
|
|
|
|
|
def split_into_one_hint_per_package(hints, who, hint_name, *args):
    """Add a separate hint of type ``hint_name`` for each argument.

    :param hints: The hint collection receiving the new hints
    :param who: Identifier of the entity providing the hint
    :param hint_name: The hint type given to every created hint
    :param args: One package string per hint to create
    """
    for item in args:
        register = hints.add_hint
        register(Hint(who, hint_name, item))
|
|
|
|
|
|
|
|
|
|
|
|
def single_hint_taking_list_of_packages(hints, who, hint_type, *args):
    """Add exactly one hint of type ``hint_type`` covering all arguments.

    :param hints: The hint collection receiving the new hint
    :param who: Identifier of the entity providing the hint
    :param hint_type: The hint type given to the created hint
    :param args: The package strings that together form the single hint
    """
    register = hints.add_hint
    register(Hint(who, hint_type, args))
|
|
|
|
|
|
|
|
|
|
|
|
class HintParser(object):
    """Parse hint lines and feed the resulting hints into a HintCollection."""

    def __init__(self, britney):
        """Set up the parser with the built-in hint types.

        :param britney: Object whose "log(msg, type=...)" method receives the
          parser's diagnostics
        """
        self._britney = britney
        self.hints = HintCollection()
        # Maps hint name -> (minimum number of arguments, parser function).
        self._hint_table = {
            'remark': (0, lambda *x: None),

            # Migration grouping hints
            'easy': (2, single_hint_taking_list_of_packages),  # Easy needs at least 2 to make sense
            'force-hint': (1, single_hint_taking_list_of_packages),
            'hint': (1, single_hint_taking_list_of_packages),

            # Block / freeze related hints
            'block': (1, split_into_one_hint_per_package),
            'block-all': (1, split_into_one_hint_per_package),
            'block-udeb': (1, split_into_one_hint_per_package),
            'unblock': (1, split_into_one_hint_per_package),
            'unblock-udeb': (1, split_into_one_hint_per_package),

            # test related hints
            'force-badtest': (1, split_into_one_hint_per_package),
            'force-skiptest': (1, split_into_one_hint_per_package),

            # Other
            'remove': (1, split_into_one_hint_per_package),
            'force': (1, split_into_one_hint_per_package),
        }
        # Maps alias -> canonical hint name.
        self._aliases = {
            'approve': 'unblock',
        }

    @property
    def registered_hints(self):
        """A set of all known hints (and aliases thereof)"""
        return set(chain(self._hint_table, self._aliases))

    def register_hint_type(self, hint_name, parser_function, *, min_args=1, aliases=None):
        """Register a new hint that is supported by the parser

        This registers a new hint that can be parsed by the hint parser.  All hints are single words with a
        space-separated list of arguments (on a single line).  The hint parser will do some basic processing,
        the permission checking and minor validation on the hint before passing it on to the parser function
        given.

        The parser_function will receive the following arguments:
         * A hint collection
         * Identifier of the entity providing the hint
         * The hint_name (aliases will be mapped to the hint_name)
         * Zero or more string arguments for the hint (so the function needs to use *args)

        The parser_function will then have to process the arguments and call the hint collection's "add_hint"
        as needed.  Example implementations include "split_into_one_hint_per_package", which is used by almost
        all policy hints.

        Nothing is registered if any of the checks below fails.

        :param hint_name: The name of the hint
        :param parser_function: A function to add the hint
        :param min_args: An optional positive integer (or 0) denoting the minimum number of arguments the
          hint takes.
        :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility)
        :raises ValueError: If min_args is negative, or if hint_name or one of the aliases is already
          registered as a hint type or as an alias
        """
        # 0 is a legitimate value (the built-in "remark" hint uses it).
        if min_args < 0:
            raise ValueError("min_args must not be negative")
        if hint_name in self._hint_table:
            raise ValueError("The hint type %s is already registered" % hint_name)
        if hint_name in self._aliases:
            raise ValueError("The hint type %s is already registered as an alias of %s" % (
                hint_name, self._aliases[hint_name]))

        # Validate every alias *before* touching any state.  Aliases are
        # resolved before hint names during parsing, so a colliding alias
        # would silently hijack an existing hint.
        alias_list = list(aliases) if aliases else []
        for alias in alias_list:
            if alias in self._hint_table:
                raise ValueError("The alias %s is already registered as a hint type" % alias)
            if alias in self._aliases:
                raise ValueError("The alias %s is already registered as an alias of %s" % (
                    alias, self._aliases[alias]))

        self._hint_table[hint_name] = (min_args, parser_function)
        for alias in alias_list:
            self._aliases[alias] = hint_name

    def parse_hints(self, who, permitted_hints, filename, lines):
        """Parse hint lines and add the resulting hints to self.hints

        Blank lines and lines starting with "#" are skipped.  Parsing stops at a line whose first word is
        "finished".  Unknown, unpermitted and malformed hints are logged and skipped; they do not abort
        the parsing.

        :param who: Identifier of the entity providing the hints
        :param permitted_hints: Container of hint names "who" may use; if it contains 'ALL', every
          registered hint is permitted
        :param filename: Name of the source of the lines (only used in log messages)
        :param lines: An iterable of hint lines
        """
        hint_table = self._hint_table
        hints = self.hints
        aliases = self._aliases
        for line_no, line in enumerate(lines, 1):
            line = line.strip()
            if line == "" or line.startswith('#'):
                continue
            words = line.split()
            # Aliases are mapped to their canonical name before any lookup.
            hint_name = aliases.get(words[0], words[0])
            hint_args = words[1:]
            if hint_name == 'finished':
                break
            if hint_name not in hint_table:
                self.log("Unknown hint found in %s (line %d): '%s'" % (filename, line_no, line), type="W")
                continue
            if hint_name not in permitted_hints and 'ALL' not in permitted_hints:
                reason = 'The hint is not a part of the permitted hints for ' + who
                self.log("Ignoring \"%s\" hint from %s found in %s (line %d): %s" % (
                    hint_name, who, filename, line_no, reason), type="I")
                continue
            min_args, hint_parser_impl = hint_table[hint_name]
            if len(hint_args) < min_args:
                self.log("Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d" % (
                    filename, line_no, min_args, len(hint_args)), type="W")
                continue
            try:
                hint_parser_impl(hints, who, hint_name, *hint_args)
            except MalformedHintException as e:
                self.log("Malformed hint found in %s (line %d): \"%s\"" % (
                    filename, line_no, e.args[0]), type="W")
                continue

    def log(self, msg, type="I"):
        """Forward a message to the log method of the britney object."""
        self._britney.log(msg, type=type)
|