Create a MigrationItemFactory and migrate most code to use it

This is a step towards making migration unit-testable.  This step
reduces the need for global state (in the MigrationItem class as class
fields) and with another step we can remove the global state entirely
and enable unit tests to create migration items without having to
worry about other unit tests.

Signed-off-by: Niels Thykier <niels@thykier.net>
ubuntu/rebased
Niels Thykier 6 years ago
parent 584f75bbc0
commit 6174d2c3f9
No known key found for this signature in database
GPG Key ID: A65B78DBE67C7AAC

@ -200,13 +200,13 @@ from britney2.inputs.suiteloader import DebMirrorLikeSuiteContentLoader, Missing
from britney2.installability.builder import build_installability_tester
from britney2.installability.solver import InstallabilitySolver
from britney2.migration import MigrationManager
from britney2.migrationitem import MigrationItem
from britney2.migrationitem import MigrationItem, MigrationItemFactory
from britney2.policies import PolicyVerdict
from britney2.policies.policy import AgePolicy, RCBugPolicy, PiupartsPolicy, BuildDependsPolicy
from britney2.policies.autopkgtest import AutopkgtestPolicy
from britney2.utils import (log_and_format_old_libraries, get_dependency_solvers,
read_nuninst, write_nuninst, write_heidi,
format_and_log_uninst, newly_uninst, make_migrationitem,
format_and_log_uninst, newly_uninst,
write_excuses, write_heidi_delta, write_controlfiles,
old_libraries, is_nuninst_asgood_generous,
clone_nuninst,
@ -283,7 +283,6 @@ class Britney(object):
# parse the command line arguments
self.policies = []
self._hint_parser = HintParser()
self.suite_info = None # Initialized during __parse_arguments
self.__parse_arguments()
MigrationItem.set_architectures(self.options.architectures)
@ -329,6 +328,8 @@ class Britney(object):
target_suite = self.suite_info.target_suite
target_suite.inst_tester = self._inst_tester
self._migration_item_factory = MigrationItemFactory(self.suite_info)
self._hint_parser = HintParser(self._migration_item_factory)
self._migration_manager = MigrationManager(self.options, self.suite_info, self.all_binaries, self.pkg_universe,
self.constraints)
@ -1442,7 +1443,8 @@ class Britney(object):
invalidate_excuses(excuses, upgrade_me, unconsidered)
# sort the list of candidates
self.upgrade_me = sorted(make_migrationitem(x, suite_info) for x in upgrade_me)
mi_factory = self._migration_item_factory
self.upgrade_me = sorted(mi_factory.parse_item(x, versioned=False, auto_correct=False) for x in upgrade_me)
# write excuses to the output file
if not self.options.dry_run:
@ -1591,7 +1593,7 @@ class Britney(object):
if try_removals and self.options.smooth_updates:
self.logger.info("> Removing old packages left in the target suite from smooth updates")
removals = old_libraries(self.suite_info, self.options.outofsync_arches)
removals = old_libraries(self._migration_item_factory, self.suite_info, self.options.outofsync_arches)
if removals:
output_logger.info("Removing packages left in the target suite for smooth updates (%d):", len(removals))
log_and_format_old_libraries(self.output_logger, removals)
@ -1847,11 +1849,12 @@ class Britney(object):
target_suite = self.suite_info.target_suite
sources_t = target_suite.sources
binaries_t = target_suite.binaries
mi_factory = self._migration_item_factory
used = set(binaries_t[arch][binary].source
for arch in binaries_t
for binary in binaries_t[arch]
)
removals = [MigrationItem("-%s/%s" % (source, sources_t[source].version))
removals = [mi_factory.parse_item("-%s/%s" % (source, sources_t[source].version), auto_correct=False)
for source in sources_t if source not in used
]
if removals:
@ -1859,14 +1862,14 @@ class Britney(object):
self.do_all(actions=removals)
# smooth updates
removals = old_libraries(self.suite_info, self.options.outofsync_arches)
removals = old_libraries(self._migration_item_factory, self.suite_info, self.options.outofsync_arches)
if self.options.smooth_updates:
self.logger.info("> Removing old packages left in the target suite from smooth updates")
if removals:
output_logger.info("Removing packages left in the target suite for smooth updates (%d):", len(removals))
log_and_format_old_libraries(self.output_logger, removals)
self.do_all(actions=removals)
removals = old_libraries(self.suite_info, self.options.outofsync_arches)
removals = old_libraries(self._migration_item_factory, self.suite_info, self.options.outofsync_arches)
else:
self.logger.info("> Not removing old packages left in the target suite from smooth updates"
" (smooth-updates disabled)")
@ -1951,9 +1954,9 @@ class Britney(object):
break
# run a hint
elif user_input and user_input[0] in ('easy', 'hint', 'force-hint'):
mi_factory = self._migration_item_factory
try:
self.do_hint(user_input[0], 'hint-tester',
[k.rsplit("/", 1) for k in user_input[1:] if "/" in k])
self.do_hint(user_input[0], 'hint-tester', mi_factory.parse_items(user_input[1:]))
self.printuninstchange()
except KeyboardInterrupt:
continue
@ -1975,21 +1978,17 @@ class Britney(object):
"""
output_logger = self.output_logger
if isinstance(pkgvers[0], tuple) or isinstance(pkgvers[0], list):
_pkgvers = [ MigrationItem('%s/%s' % (p, v)) for (p,v) in pkgvers ]
else:
_pkgvers = pkgvers
suites = self.suite_info
self.logger.info("> Processing '%s' hint from %s", hinttype, who)
output_logger.info("Trying %s from %s: %s", hinttype, who,
" ".join("%s/%s" % (x.uvname, x.version) for x in _pkgvers)
" ".join("%s/%s" % (x.uvname, x.version) for x in pkgvers)
)
issues = []
# loop on the requested packages and versions
for idx in range(len(_pkgvers)):
pkg = _pkgvers[idx]
for idx in range(len(pkgvers)):
pkg = pkgvers[idx]
# skip removal requests
if pkg.is_removal:
continue
@ -2003,7 +2002,7 @@ class Britney(object):
pkg.version) == 0:
suite = s
pkg.suite = s
_pkgvers[idx] = pkg
pkgvers[idx] = pkg
break
if suite.suite_class.is_additional_source:
@ -2022,7 +2021,7 @@ class Britney(object):
output_logger.warning("%s: Not using hint", ", ".join(issues))
return False
self.do_all(hinttype, _pkgvers)
self.do_all(hinttype, pkgvers)
return True
def get_auto_hinter_hints(self, upgrade_me):
@ -2111,9 +2110,11 @@ class Britney(object):
return [ candidates, mincands ]
def run_auto_hinter(self):
mi_factory = self._migration_item_factory
for l in self.get_auto_hinter_hints(self.upgrade_me):
for hint in l:
self.do_hint("easy", "autohinter", [ MigrationItem("%s/%s" % (x[0], x[1])) for x in sorted(hint) ])
self.do_hint("easy", "autohinter", [mi_factory.parse_item("%s/%s" % (x[0], x[1]), auto_correct=False)
for x in sorted(hint)])
def nuninst_arch_report(self, nuninst, arch):
"""Print a report of uninstallable packages for one architecture."""

@ -16,8 +16,6 @@ import logging
from itertools import chain
from britney2.migrationitem import MigrationItem
class MalformedHintException(Exception):
pass
@ -58,11 +56,6 @@ class Hint(object):
self._type = hint_type
self._packages = packages
if isinstance(self._packages, str):
self._packages = self._packages.split(' ')
self._packages = [MigrationItem(x) for x in self._packages]
self.check()
def check(self):
@ -122,21 +115,22 @@ class Hint(object):
return None
def split_into_one_hint_per_package(hints, who, hint_name, *args):
for package in args:
hints.add_hint(Hint(who, hint_name, package))
def split_into_one_hint_per_package(mi_factory, hints, who, hint_name, *args):
for item in mi_factory.parse_items(*args):
hints.add_hint(Hint(who, hint_name, [item]))
def single_hint_taking_list_of_packages(hints, who, hint_type, *args):
hints.add_hint(Hint(who, hint_type, args))
def single_hint_taking_list_of_packages(mi_factory, hints, who, hint_type, *args):
hints.add_hint(Hint(who, hint_type, mi_factory.parse_items(*args)))
class HintParser(object):
def __init__(self):
def __init__(self, mi_factory):
logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
self.logger = logging.getLogger(logger_name)
self.hints = HintCollection()
self.mi_factory = mi_factory
self._hint_table = {
'remark': (0, lambda *x: None),
@ -205,6 +199,7 @@ class HintParser(object):
line_no = 0
hints = self.hints
aliases = self._aliases
mi_factory = self.mi_factory
for line in lines:
line = line.strip()
line_no += 1
@ -231,7 +226,7 @@ class HintParser(object):
filename, line_no, min_args, len(l) - 1)
continue
try:
hint_parser_impl(hints, who, *l)
hint_parser_impl(mi_factory, hints, who, *l)
except MalformedHintException as e:
self.logger.warning("Malformed hint found in %s (line %d): \"%s\"", filename, line_no, e.args[0])
continue

@ -12,6 +12,9 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import logging
from britney2 import SuiteClass
class MigrationItem(object):
_architectures = []
@ -33,7 +36,7 @@ class MigrationItem(object):
def get_suites(cls):
return cls._suites
def __init__(self, name=None, versionned=True):
def __init__(self, name=None, versionned=True, package=None, version=None, architecture=None, uvname=None, suite=None):
self._name = None
self._uvname = None
self._package = None
@ -44,6 +47,16 @@ class MigrationItem(object):
if name:
self.name = name
else:
self._uvname = uvname
self._package = package
self._version = version
self._architecture = architecture
self._suite = suite
if version is not None:
self._name = "%s/%s" % (uvname, version)
else:
self._name = uvname
def __str__(self):
if self._versionned and self.version is not None:
@ -161,6 +174,101 @@ class MigrationItem(object):
return self._uvname
class UnversionnedMigrationItem(MigrationItem):
def __init__(self, name=None):
super().__init__(name=name, versionned=False)
class MigrationItemFactory(object):
def __init__(self, suites):
self._suites = suites
self._all_architectures = frozenset(suites.target_suite.binaries)
logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
self.logger = logging.getLogger(logger_name)
def generate_removal_for_cruft_item(self, pkg_id):
uvname = "-%s/%s" % (pkg_id.package_name, pkg_id.architecture)
return MigrationItem(package=pkg_id.package_name,
version=pkg_id.version,
architecture=pkg_id.architecture,
uvname=uvname,
suite=self._suites.target_suite
)
def parse_item(self, item_text, versioned=True, auto_correct=True):
"""
:param item_text: The string describing the item (e.g. "glibc/2.5")
:param versioned: If true, a two-part item is assumed to be versioned.
otherwise, it is assumed to be versionless. This determines how
items like "foo/bar" is parsed (if versioned, "bar" is assumed to
be a version and otherwise "bar" is assumed to be an architecture).
If in doubt, use versioned=True with auto_correct=True and the
code will figure it out on its own.
:param auto_correct: If True, minor issues are automatically fixed
where possible. This includes handling architecture and version
being in the wrong order and missing/omitting a suite reference
for items. This feature is useful for migration items provided
by humans (e.g. via hints) to avoid rejecting the input over
trivial/minor issues with the input.
When False, there will be no attempt to correct the migration
input.
:return: A MigrationItem matching the spec
"""
suites = self._suites
version = None
architecture = None
is_removal = False
if item_text.startswith('-'):
item_text = item_text[1:]
is_removal = True
parts = item_text.split('/', 3)
package_name = parts[0]
suite_name = suites.primary_source_suite.name
if '_' in package_name:
package_name, suite_name = package_name.split('_', 2)
if len(parts) == 3:
architecture = parts[1]
version = parts[2]
elif len(parts) == 2:
if versioned:
version = parts[1]
else:
architecture = parts[1]
if auto_correct and version in self._all_architectures:
(architecture, version) = (version, architecture)
if architecture is None:
architecture = 'source'
if '_' in architecture:
architecture, suite_name = architecture.split('_', 2)
if is_removal:
suite = suites.target_suite
else:
suite = suites.by_name_or_alias[suite_name]
assert suite.suite_class != SuiteClass.TARGET_SUITE
uvname = self._canonicalise_uvname(item_text, package_name, architecture, suite, is_removal)
return MigrationItem(package=package_name,
version=version,
architecture=architecture,
uvname=uvname,
suite=suite,
)
def parse_items(self, *args, **kwargs):
return [self.parse_item(x, **kwargs) for x in args]
@staticmethod
def _canonicalise_uvname(item_text_sans_removal, package, architecture, suite, is_removal):
parts = item_text_sans_removal.split('/', 3)
if len(parts) == 1 or architecture == 'source':
uvname = package
else:
uvname = "%s/%s" % (package, architecture)
if suite.suite_class.is_additional_source:
uvname = '%s_%s' % (uvname, suite.suite_short_name)
if is_removal:
uvname = '-%s' % (uvname)
return uvname

@ -135,9 +135,9 @@ class IgnoreRCBugHint(SimplePolicyHint):
def simple_policy_hint_parser_function(class_name, converter):
def f(hints, who, hint_name, policy_parameter, *args):
for package in args:
hints.add_hint(class_name(who, hint_name, converter(policy_parameter), package))
def f(mi_factory, hints, who, hint_name, policy_parameter, *args):
for item in mi_factory.parse_items(*args):
hints.add_hint(class_name(who, hint_name, converter(policy_parameter), [item]))
return f

@ -39,9 +39,9 @@ from britney2.consts import (VERSION, PROVIDES, DEPENDS, CONFLICTS,
ARCHITECTURE, SECTION,
SOURCE, MAINTAINER, MULTIARCH,
ESSENTIAL)
from britney2.migrationitem import MigrationItem, UnversionnedMigrationItem
from britney2.policies import PolicyVerdict
class MigrationConstraintException(Exception):
pass
@ -283,17 +283,6 @@ def write_heidi_delta(filename, all_selected):
item.version, item.architecture))
def make_migrationitem(package, suite_info):
"""Convert a textual package specification to a MigrationItem
sources is a list of source packages in each suite, used to determine
the version which should be used for the MigrationItem.
"""
item = UnversionnedMigrationItem(package)
return MigrationItem("%s/%s" % (item.uvname, suite_info[item.suite.name].sources[item.package].version))
def write_excuses(excuselist, dest_file, output_format="yaml"):
"""Write the excuses to dest_file
@ -410,7 +399,7 @@ def write_controlfiles(target_suite):
write_sources(sources_s, os.path.join(basedir, 'Sources'))
def old_libraries(suite_info, outofsync_arches=frozenset()):
def old_libraries(mi_factory, suite_info, outofsync_arches=frozenset()):
"""Detect old libraries left in the target suite for smooth transitions
This method detects old libraries which are in the target suite but no
@ -431,8 +420,7 @@ def old_libraries(suite_info, outofsync_arches=frozenset()):
pkg = binaries_t[arch][pkg_name]
if sources_t[pkg.source].version != pkg.source_version and \
(arch not in outofsync_arches or pkg_name not in binaries_s[arch]):
migration = "-" + "/".join((pkg_name, arch, pkg.source_version))
removals.append(MigrationItem(migration))
removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
return removals

@ -2,7 +2,7 @@ import unittest
from britney2 import Suite, Suites, SuiteClass
from britney2.hints import HintParser, single_hint_taking_list_of_packages
from britney2.migrationitem import MigrationItem
from britney2.migrationitem import MigrationItemFactory
from . import HINTS_ALL, TEST_HINTER
@ -12,11 +12,9 @@ SUITES = Suites(
)
MigrationItem.set_suites(SUITES)
def new_hint_parser():
return HintParser()
mi_factory = MigrationItemFactory(SUITES)
return HintParser(mi_factory)
def parse_should_not_call_this_function(*args, **kwargs):

@ -6,7 +6,7 @@ import unittest
from britney2 import Suites, Suite, SuiteClass, SourcePackage, BinaryPackageId, BinaryPackage
from britney2.excuse import Excuse
from britney2.hints import HintParser
from britney2.migrationitem import MigrationItem
from britney2.migrationitem import MigrationItemFactory
from britney2.policies.policy import AgePolicy, RCBugPolicy, PiupartsPolicy, PolicyVerdict
from britney2.policies.autopkgtest import AutopkgtestPolicy
@ -43,10 +43,10 @@ def initialize_policy(test_name, policy_class, *args, **kwargs):
Suite(SuiteClass.TARGET_SUITE, target, os.path.join(test_dir, target), ''),
[Suite(SuiteClass.PRIMARY_SOURCE_SUITE, 'unstable', os.path.join(test_dir, 'unstable'), '')],
)
MigrationItem.set_suites(suite_info)
mi_factory = MigrationItemFactory(suite_info)
policy = policy_class(options, suite_info, *args)
fake_britney = MockObject(log=lambda x, y='I': None)
hint_parser = HintParser()
hint_parser = HintParser(mi_factory)
policy.initialise(fake_britney)
policy.register_hints(hint_parser)
hint_parser.parse_hints(TEST_HINTER, HINTS_ALL, 'test-%s' % test_name, hints)

Loading…
Cancel
Save