#! /usr/bin/python2.7

# Copyright (C) 2009, 2010, 2011, 2012  Canonical Ltd.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.

"""Apply suitable overrides to new kernel binaries, matching previous ones."""

from __future__ import print_function

import atexit
from collections import defaultdict
from contextlib import closing
import gzip
from optparse import OptionParser, Values
import os
import shutil
import sys
import tempfile
try:
    from urllib.request import urlopen
except ImportError:
    from urllib2 import urlopen

import apt_pkg
from launchpadlib.launchpad import Launchpad
from lazr.restfulclient.errors import ServerError
from ubuntutools.question import YesNoQuestion

import lputils


CONSUMER_KEY = "kernel-overrides"


# Scratch directory for downloaded Packages files; created on first use.
tempdir = None


def ensure_tempdir():
    """Create the module-level scratch directory if it does not exist yet.

    The directory is registered for removal at interpreter exit.
    """
    global tempdir
    if not tempdir:
        tempdir = tempfile.mkdtemp(prefix='kernel-overrides')
        atexit.register(shutil.rmtree, tempdir)


class FakeBPPH(object):
    """Minimal stand-in for a published binary record.

    It carries only the three attributes that find_current_binaries reads,
    so that entries parsed from a Packages file can be consumed the same way
    as the objects returned by getPublishedBinaries.
    """

    def __init__(self, pkg, component, das):
        self.binary_package_name = pkg
        self.component_name = component
        self.distro_arch_series = das


def get_published_binaries(options, source):
    """If getPublishedBinaries times out, fall back to doing it by hand.

    Yields the non-debug binaries of SOURCE.  If the request fails with a
    ServerError whose HTTP status is 503, the Packages files of
    options.old.suite are downloaded instead and a FakeBPPH is yielded for
    every stanza whose source package is options.source.  Any other
    ServerError is re-raised.
    """
    try:
        for binary in source.getPublishedBinaries():
            if not binary.is_debug:
                yield binary
    except ServerError as e:
        if e.response.status != 503:
            raise

        print("getPublishedBinaries timed out; fetching Packages instead ...")
        ensure_tempdir()
        for section_name in ("", "debian-installer"):
            for component in ("main", "restricted", "universe", "multiverse"):
                for das in options.old.series.architectures:
                    arch = das.architecture_tag
                    if arch in ("amd64", "i386"):
                        base = "http://archive.ubuntu.com/ubuntu"
                    else:
                        base = "http://ports.ubuntu.com/ubuntu-ports"
                    url = ("%s/dists/%s/%s%s/binary-%s/Packages.gz" %
                           (base, options.old.suite, component,
                            "/%s" % section_name if section_name else "",
                            arch))
                    path = os.path.join(
                        tempdir, "Ubuntu_%s_%s%s_Packages_%s" %
                        (options.old.suite, component,
                         "_%s" % section_name if section_name else "", arch))
                    gz_path = "%s.gz" % path
                    # Stream both the download and the decompression rather
                    # than holding a whole Packages file in memory.
                    with closing(urlopen(url)) as url_file:
                        with open(gz_path, "wb") as comp_file:
                            shutil.copyfileobj(url_file, comp_file)
                    with closing(gzip.GzipFile(gz_path)) as gz_file:
                        with open(path, "wb") as out_file:
                            shutil.copyfileobj(gz_file, out_file)
                    with open(path) as packages_file:
                        apt_packages = apt_pkg.TagFile(packages_file)
                        for section in apt_packages:
                            pkg = section["Package"]
                            # A Source field may carry a trailing
                            # " (version)"; keep only the name.  Without a
                            # Source field the binary name is used.
                            src = section.get("Source", pkg).split(" ", 1)[0]
                            if src != options.source:
                                continue
                            yield FakeBPPH(pkg, component, das)


def find_current_binaries(options):
    """Map the binaries currently published for options.source.

    Returns a dict of the form {architecture_tag: {binary_name: component}}
    built from the first published source that has any binaries, or an empty
    dict if there is none.  One progress dot is printed per binary seen.
    """
    print("Checking existing binaries in %s ..." % options.old.suite,
          file=sys.stderr)
    sources = options.old.archive.getPublishedSources(
        source_name=options.source, distro_series=options.old.series,
        pocket=options.old.pocket, exact_match=True, status="Published")
    for source in sources:
        binaries = defaultdict(dict)
        for binary in get_published_binaries(options, source):
            print(".", end="")
            sys.stdout.flush()
            arch = binary.distro_arch_series.architecture_tag
            name = binary.binary_package_name
            component = binary.component_name
            # The first record seen for a given name wins.
            if name not in binaries[arch]:
                binaries[arch][name] = component
        if binaries:
            print()
            return binaries
    print()
    # Same mapping type as the success path; callers only use "in" and
    # indexing, for which an empty dict behaves like the previous empty list.
    return {}


def find_matching_uploads(options, newabi):
    """Yield (upload, binaries) for queued builds of options.source.

    An upload is yielded when it contains a build, its display name starts
    with options.source, and at least one of its binaries (ignoring entries
    that have a "customformat" key) has NEWABI in its version.
    """
    print("Checking %s uploads to %s ..." %
          (options.queue.lower(), options.suite), file=sys.stderr)
    uploads = options.series.getPackageUploads(
        name=options.source, exact_match=True, archive=options.archive,
        pocket=options.pocket, status=options.queue)
    for upload in uploads:
        if not upload.contains_build:
            continue
        # display_name is inaccurate for the theoretical case of an
        # upload containing multiple builds, but in practice it's close
        # enough.
        source = upload.display_name.split(",")[0]
        if source != options.source:
            continue
        binaries = [b for b in upload.getBinaryProperties()
                    if "customformat" not in b]
        if any(newabi in b["version"] for b in binaries):
            yield upload, binaries


def equal_except_abi(old, new, abi):
    """Are OLD and NEW the same package name aside from ABI?

    Whichever of the two names contains ABI is split around it; the other
    name must start and end with the same surrounding text, and whatever it
    has in place of ABI must begin and end with a digit.  Returns False if
    neither name contains ABI, or if the other name has nothing in place of
    the ABI.
    """
    # Make sure new always contains the ABI.
    if abi in old:
        old, new = new, old
    if abi not in new:
        return False

    left, _, right = new.partition(abi)
    if not old.startswith(left) or not old.endswith(right):
        return False
    old_abi = old[len(left):]
    if right:
        old_abi = old_abi[:-len(right)]
    # The prefix and suffix can abut or overlap in old (for example
    # "linux-image-generic" against "linux-image-<abi>-generic"), leaving
    # nothing in the ABI position; that is not a match, and indexing the
    # empty string below would raise IndexError.
    if not old_abi:
        return False
    return old_abi[0].isdigit() and old_abi[-1].isdigit()


def apply_kernel_overrides(options, newabi):
    """Override components of queued binaries to match the current ones.

    For every queued binary whose name matches a currently published binary
    on the same architecture apart from the ABI, and whose component differs,
    a change to the published component is printed and recorded.  Unless
    options.dry_run is set, the recorded changes are applied with
    overrideBinaries, after a confirmation prompt if options.confirm_all is
    not set.
    """
    current_binaries = find_current_binaries(options)
    all_changes = []

    for upload, binaries in find_matching_uploads(options, newabi):
        print("%s/%s (%s):" %
              (upload.package_name, upload.package_version,
               upload.display_arches.split(",")[0]))
        changes = []
        for binary in binaries:
            if binary["architecture"] not in current_binaries:
                continue
            current_binaries_arch = current_binaries[binary["architecture"]]
            for name, component in current_binaries_arch.items():
                if (binary["component"] != component and
                        equal_except_abi(name, binary["name"], newabi)):
                    print("\t%s: %s -> %s" %
                          (binary["name"], binary["component"], component))
                    changes.append(
                        {"name": binary["name"], "component": component})
        if changes:
            all_changes.append((upload, changes))

    if not all_changes:
        return
    if options.dry_run:
        print("Dry run; no changes made.")
        return
    if not options.confirm_all:
        if YesNoQuestion().ask("Override", "no") == "no":
            return
    for upload, changes in all_changes:
        upload.overrideBinaries(changes=changes)


def main():
    """Parse the command line, log in to Launchpad and apply overrides."""
    parser = OptionParser(usage="usage: %prog [options] NEW-ABI")
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "-d", "--distribution", metavar="DISTRO", default="ubuntu",
        help="look in distribution DISTRO")
    parser.add_option(
        "-S", "--suite", metavar="SUITE",
        help="look in suite SUITE (default: -proposed)")
    parser.add_option(
        "--old-suite", metavar="SUITE",
        help="look for previous binaries in suite SUITE "
             "(default: value of --suite without -proposed)")
    parser.add_option(
        "-s", "--source", metavar="SOURCE", default="linux",
        help="operate on source package SOURCE")
    parser.add_option(
        "-Q", "--queue", metavar="QUEUE", default="new",
        help="consider packages in QUEUE")
    parser.add_option(
        "-n", "--dry-run", default=False, action="store_true",
        help="don't make any modifications")
    parser.add_option(
        "-y", "--confirm-all", default=False, action="store_true",
        help="do not ask for confirmation")
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error("must supply NEW-ABI")
    newabi = args[0]

    options.launchpad = Launchpad.login_with(
        CONSUMER_KEY, options.launchpad_instance, version="devel")

    if options.suite is None:
        distribution = options.launchpad.distributions[options.distribution]
        options.suite = "%s-proposed" % distribution.current_series.name
    if options.old_suite is None:
        # Default the old suite to --suite with any "-proposed" removed.
        proposed_suffix = "-proposed"
        options.old_suite = options.suite
        if options.old_suite.endswith(proposed_suffix):
            options.old_suite = options.old_suite[:-len(proposed_suffix)]
    options.queue = options.queue.title()
    options.version = None
    lputils.setup_location(options)

    # A second options object describing where the previous binaries live.
    options.old = Values()
    options.old.launchpad = options.launchpad
    options.old.distribution = options.distribution
    options.old.suite = options.old_suite
    lputils.setup_location(options.old)

    apply_kernel_overrides(options, newabi)


if __name__ == '__main__':
    main()