Ivo De Decker 920d82445d Add allow-smooth-update hint
Hint to allow smooth update, even if the section isn't allowed in the
configuration.

Note that this takes the source name and the source version IN TESTING
of the binaries that must be allowed to stay around to allow a smooth
update.
2020-04-01 16:37:56 +00:00

254 lines
9.5 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import logging
from itertools import chain
class MalformedHintException(Exception):
    """Raised when a hint is malformed.

    In this module it is raised for a hint whose packages do not fit the
    hint type (a version given where none is allowed, or missing where one
    is required).
    """
class HintCollection(object):
    """An ordered store of hints that can be filtered by their attributes."""

    def __init__(self):
        # Hints are kept in the order in which they were added.
        self._hints = []

    @property
    def is_empty(self):
        """True as long as no hint has been added."""
        return len(self._hints) == 0

    def __getitem__(self, type=None):
        """``collection[hint_type]`` is shorthand for ``search(hint_type)``."""
        return self.search(type)

    def search(self, type=None, onlyactive=True, package=None,
               version=None, architecture=None, suite=None, removal=None):
        """Return the list of hints matching every given criterion.

        A criterion left as None is not applied.  The package related
        criteria (package, version, architecture, suite, removal) are only
        compared against the FIRST package of each hint.

        :param type: Only return hints of this hint type
        :param onlyactive: When true (the default), skip inactive hints
        :param package: Required package name of the first package
        :param version: Required version of the first package
        :param architecture: Required architecture of the first package
        :param suite: Required suite of the first package
        :param removal: Required "is_removal" value of the first package
        :return: A new list with the matching hints, in insertion order
        """
        matching = []
        for candidate in self._hints:
            if type is not None and type != candidate.type:
                continue
            if not candidate.active and onlyactive:
                continue
            # The first package is only looked at when a criterion needs it.
            if package is not None and package != candidate.packages[0].package:
                continue
            if version is not None and version != candidate.packages[0].version:
                continue
            if architecture is not None and architecture != candidate.packages[0].architecture:
                continue
            if suite is not None and suite != candidate.packages[0].suite:
                continue
            if removal is not None and removal != candidate.packages[0].is_removal:
                continue
            matching.append(candidate)
        return matching

    def add_hint(self, hint):
        """Append the given hint to the collection."""
        self._hints.append(hint)
class Hint(object):
    """A single hint: its type, who issued it and the packages it covers.

    The hint is validated on construction (see check()).  New hints start
    out active; the flag can be changed with set_active().
    """

    # Hint types whose packages must NOT carry a version.  Every other hint
    # type requires a version on each of its packages.
    NO_VERSION = ['block', 'block-all', 'block-udeb', 'allow-archall-maintainer-upload', 'allow-uninst']

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None

    def __init__(self, user, hint_type, packages):
        """Create and validate a hint.

        :param user: Identifier of whoever provided the hint
        :param hint_type: The name of the hint (e.g. "unblock")
        :param packages: A list of package items the hint applies to
        :raises MalformedHintException: If a package does not fit the hint type
        """
        self._user = user
        self._active = True
        self._type = hint_type
        self._packages = packages
        self.check()

    def check(self):
        """Verify that every package is (un)versioned as the hint type requires.

        :raises MalformedHintException: On the first offending package
        """
        must_be_unversioned = self.type in self.__class__.NO_VERSION
        for package in self.packages:
            # TODO check if hint is allowed to specify architecture
            if must_be_unversioned:
                if package.version is not None:
                    raise MalformedHintException("\"%s\" needs unversioned packages, got \"%s\"" % (self.type, package))
            else:
                if package.version is None:
                    raise MalformedHintException("\"%s\" needs versioned packages, got \"%s\"" % (self.type, package))

    def set_active(self, active):
        """Mark the hint as active or inactive."""
        self._active = active

    def __str__(self):
        # Unversioned hint types are rendered with each package's "uvname",
        # all others with its "name".
        if self.type in self.__class__.NO_VERSION:
            names = (x.uvname for x in self._packages)
        else:
            names = (x.name for x in self._packages)
        return '%s %s' % (self._type, ' '.join(names))

    def __eq__(self, other):
        """Two hints are equal when their type and (sorted) packages match.

        The user and the active flag are not part of the comparison.
        """
        if not isinstance(other, Hint):
            # Let Python try the reflected comparison / fall back to identity
            # instead of failing with AttributeError on foreign objects.
            return NotImplemented
        if self.type != other.type:
            return False
        # we can't use sets, because unversioned items cannot be hashed
        return sorted(self.packages) == sorted(other.packages)

    @property
    def type(self):
        """The name of the hint."""
        return self._type

    @property
    def packages(self):
        """The list of package items given at construction."""
        return self._packages

    @property
    def active(self):
        """Whether the hint is currently active."""
        return self._active

    @property
    def user(self):
        """Identifier of whoever provided the hint."""
        return self._user

    def _only_package(self):
        """Return the sole package of the hint, or None if it has none.

        The single-package accessors below are only meaningful for hints
        with at most one package; anything else is a programming error.
        """
        if not self.packages:
            return None
        assert len(self.packages) == 1, self.packages
        return self.packages[0]

    @property
    def package(self):
        """Package name of the hint's only package (None without packages)."""
        item = self._only_package()
        return None if item is None else item.package

    @property
    def version(self):
        """Version of the hint's only package (None without packages)."""
        item = self._only_package()
        return None if item is None else item.version

    @property
    def architecture(self):
        """Architecture of the hint's only package (None without packages)."""
        item = self._only_package()
        return None if item is None else item.architecture

    @property
    def suite(self):
        """Suite of the hint's only package (None without packages)."""
        item = self._only_package()
        return None if item is None else item.suite
def split_into_one_hint_per_package(mi_factory, hints, who, hint_name, *args):
    """Add one separate hint to the collection for every parsed item.

    :param mi_factory: Object whose "parse_items" turns the arguments into items
    :param hints: The hint collection receiving the new hints
    :param who: Identifier of the entity providing the hint
    :param hint_name: The name of the hint
    :param args: The string arguments given to the hint
    """
    parsed_items = mi_factory.parse_items(*args)
    for single_item in parsed_items:
        per_item_hint = Hint(who, hint_name, [single_item])
        hints.add_hint(per_item_hint)
def single_hint_taking_list_of_packages(mi_factory, hints, who, hint_type, *args):
    """Add ONE hint to the collection that covers all parsed items together.

    :param mi_factory: Object whose "parse_items" turns the arguments into items
    :param hints: The hint collection receiving the new hint
    :param who: Identifier of the entity providing the hint
    :param hint_type: The name of the hint
    :param args: The string arguments given to the hint
    """
    all_items = mi_factory.parse_items(*args)
    combined_hint = Hint(who, hint_type, all_items)
    hints.add_hint(combined_hint)
class HintParser(object):
    """Parser turning lines of hint files into hints in a HintCollection.

    The parsed hints accumulate in the "hints" attribute.  Which hints are
    understood is determined by an internal table that can be extended with
    register_hint_type().
    """

    def __init__(self, mi_factory):
        """Set up the parser with the built-in hint types.

        :param mi_factory: Passed as first argument to every hint parser
          function; the built-in ones call its "parse_items" method.
        """
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.hints = HintCollection()
        self.mi_factory = mi_factory
        # Maps hint name -> (minimum number of arguments, parser function)
        self._hint_table = {
            'remark': (0, lambda *x: None),

            # Migration grouping hints
            'easy': (2, single_hint_taking_list_of_packages),  # Easy needs at least 2 to make sense
            'force-hint': (1, single_hint_taking_list_of_packages),
            'hint': (1, single_hint_taking_list_of_packages),

            # Block / freeze related hints
            'block': (1, split_into_one_hint_per_package),
            'block-all': (1, split_into_one_hint_per_package),
            'block-udeb': (1, split_into_one_hint_per_package),
            'unblock': (1, split_into_one_hint_per_package),
            'unblock-udeb': (1, split_into_one_hint_per_package),

            # Other
            'remove': (1, split_into_one_hint_per_package),
            'force': (1, split_into_one_hint_per_package),
            'allow-uninst': (1, split_into_one_hint_per_package),
            'allow-smooth-update': (1, split_into_one_hint_per_package),
        }
        # Maps alias -> the hint name it stands for
        self._aliases = {
            'approve': 'unblock',
        }

    @property
    def registered_hints(self):
        """A set of all known hints (and aliases thereof)"""
        return set(chain(self._hint_table.keys(), self._aliases.keys()))

    def register_hint_type(self, hint_name, parser_function, *, min_args=1, aliases=None):
        """Register a new hint that is supported by the parser

        This registers a new hint that can be parsed by the hint parser.  All hints are single words with a
        space-separated list of arguments (on a single line).  The hint parser will do some basic processing,
        the permission checking and minor validation on the hint before passing it on to the parser function
        given.

        The parser_function will receive the following arguments:
         * The "mi_factory" the hint parser was created with
         * A hint collection
         * Identifier of the entity providing the hint
         * The hint_name (aliases will be mapped to the hint_name)
         * Zero or more string arguments for the hint (so the function needs to use *args)

        The parser_function will then have to process the arguments and call the hint collection's "add_hint"
        as needed.  Example implementations include "split_into_one_hint_per_package", which is used by almost
        all policy hints.

        Nothing is registered if any of the checks below fails.

        :param hint_name: The name of the hint
        :param parser_function: A function to add the hint
        :param min_args: An optional positive integer (non-zero) denoting the minimum number of arguments
          the hint takes.
        :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility)
        :raises ValueError: If min_args is below 1, or if the hint name or one of the aliases is already
          registered as a hint type or as an alias.
        """
        if min_args < 1:
            raise ValueError("min_args must be at least 1")
        if hint_name in self._hint_table:
            raise ValueError("The hint type %s is already registered" % hint_name)
        if hint_name in self._aliases:
            raise ValueError("The hint type %s is already registered as an alias of %s" % (
                hint_name, self._aliases[hint_name]))

        # Materialise once so that one-shot iterables survive the validation pass.
        alias_names = tuple(aliases) if aliases else ()
        # Aliases are resolved before the hint table is consulted, so a clashing
        # alias would silently redirect an existing hint or alias.
        for alias in alias_names:
            if alias in self._hint_table:
                raise ValueError("The alias %s of %s is already registered as a hint type" % (
                    alias, hint_name))
            if alias in self._aliases:
                raise ValueError("The alias %s of %s is already registered as an alias of %s" % (
                    alias, hint_name, self._aliases[alias]))

        self._hint_table[hint_name] = (min_args, parser_function)
        for alias in alias_names:
            self._aliases[alias] = hint_name

    def parse_hints(self, who, permitted_hints, filename, lines):
        """Parse the lines of one hint file and hand each hint to its parser function

        Blank lines and lines starting with "#" are skipped.  Parsing stops at the first "finished" line.
        Lines with an unknown hint, a hint that is not permitted, too few arguments or a malformed hint
        are logged and skipped; they do not abort the parsing of the remaining lines.

        :param who: Identifier of the entity providing the hints
        :param permitted_hints: Container of the hint names "who" may use; if it contains "ALL", every
          hint is permitted.
        :param filename: Name of the hint file (only used in log messages)
        :param lines: An iterable of the lines to parse
        """
        hint_table = self._hint_table
        hints = self.hints
        aliases = self._aliases
        mi_factory = self.mi_factory

        for line_no, line in enumerate(lines, start=1):
            line = line.strip()
            if line == "" or line.startswith('#'):
                continue

            ln = line.split()
            hint_name = ln[0]

            if hint_name in aliases:
                # Parser functions always see the canonical hint name
                hint_name = aliases[hint_name]
                ln[0] = hint_name

            if hint_name == 'finished':
                break

            if hint_name not in hint_table:
                self.logger.warning("Unknown hint found in %s (line %d): '%s'", filename, line_no, line)
                continue

            if hint_name not in permitted_hints and 'ALL' not in permitted_hints:
                reason = 'The hint is not a part of the permitted hints for ' + who
                self.logger.info("Ignoring \"%s\" hint from %s found in %s (line %d): %s",
                                 hint_name, who, filename, line_no, reason)
                continue

            min_args, hint_parser_impl = hint_table[hint_name]
            # ln[0] is the hint name itself, hence the "- 1"
            if len(ln) - 1 < min_args:
                self.logger.warning("Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d",
                                    filename, line_no, min_args, len(ln) - 1)
                continue

            try:
                hint_parser_impl(mi_factory, hints, who, *ln)
            except MalformedHintException as e:
                self.logger.warning("Malformed hint found in %s (line %d): \"%s\"", filename, line_no, e.args[0])
                continue