#! /usr/bin/env python2.7 from __future__ import print_function import atexit import bz2 from collections import namedtuple import optparse import os import re import shutil import subprocess import tempfile try: from urllib.parse import unquote except ImportError: from urllib import unquote import apt_pkg from launchpadlib.launchpad import Launchpad import lzma import requests # from dak, more or less re_no_epoch = re.compile(r"^\d+:") re_strip_revision = re.compile(r"-[^-]+$") re_changelog_versions = re.compile(r"^\w[-+0-9a-z.]+ \(([^\(\) \t]+)\)") default_mirrors = ":".join([ '/home/ubuntu-archive/mirror/ubuntu', '/srv/archive.ubuntu.com/ubuntu', ]) tempdir = None series_by_name = {} def ensure_tempdir(): global tempdir if not tempdir: tempdir = tempfile.mkdtemp(prefix='copy-report') atexit.register(shutil.rmtree, tempdir) def decompress_open(tagfile): if tagfile.startswith('http:') or tagfile.startswith('ftp:'): ensure_tempdir() response = requests.get(tagfile, stream=True) if response.status_code == 404: response.close() tagfile = tagfile.replace('.xz', '.bz2') response = requests.get(tagfile, stream=True) response.raise_for_status() if '.' in tagfile: suffix = '.' + tagfile.rsplit('.', 1)[1] else: suffix = '' fd, tagfile = tempfile.mkstemp(suffix=suffix, dir=tempdir) with os.fdopen(fd, 'wb') as f: f.write(response.raw.read()) response.close() elif not os.path.exists(tagfile): tagfile = tagfile.replace('.xz', '.bz2') if tagfile.endswith('.xz'): decompressor = lzma.LZMAFile elif tagfile.endswith('.bz2'): decompressor = bz2.BZ2File else: decompressor = None if decompressor is not None: fd, decompressed = tempfile.mkstemp(dir=tempdir) dcf = decompressor(tagfile) try: with os.fdopen(fd, 'wb') as f: f.write(dcf.read()) finally: dcf.close() return open(decompressed, 'rb') else: return open(tagfile, 'rb') Section = namedtuple("Section", ["version", "directory", "files"]) def tagfiletodict(tagfile): suite = {} for section in apt_pkg.TagFile(decompress_open(tagfile)): files = [s.strip().split()[2] for s in section["Files"].split('\n')] suite[section["Package"]] = Section( version=section["Version"], directory=section["Directory"], files=files) return suite def find_dsc(options, pkg, section): dsc_filename = [s for s in section.files if s.endswith('.dsc')][0] for mirror in options.mirrors: path = '%s/%s/%s' % (mirror, section.directory, dsc_filename) if os.path.exists(path): yield path ensure_tempdir() spph = options.archive.getPublishedSources( source_name=pkg, version=section.version, exact_match=True)[0] outdir = tempfile.mkdtemp(dir=tempdir) filenames = [] for url in spph.sourceFileUrls(): filename = os.path.join(outdir, unquote(os.path.basename(url))) response = requests.get(url, stream=True) response.raise_for_status() with open(filename, 'wb') as f: f.write(response.raw.read()) response.close() filenames.append(filename) yield [s for s in filenames if s.endswith('.dsc')][0] class BrokenSourcePackage(Exception): pass def get_changelog_versions(pkg, dsc, version): ensure_tempdir() upstream_version = re_no_epoch.sub('', version) upstream_version = re_strip_revision.sub('', upstream_version) with open(os.devnull, 'w') as devnull: ret = subprocess.call( ['dpkg-source', '-q', '--no-check', '-sn', '-x', dsc], stdout=devnull, cwd=tempdir) # It's in the archive, so these assertions must hold. if ret != 0: raise BrokenSourcePackage(dsc) unpacked = '%s/%s-%s' % (tempdir, pkg, upstream_version) assert os.path.isdir(unpacked) changelog_path = '%s/debian/changelog' % unpacked assert os.path.exists(changelog_path) with open(changelog_path) as changelog: versions = set() for line in changelog: m = re_changelog_versions.match(line) if m: versions.add(m.group(1)) shutil.rmtree(unpacked) return versions def descended_from(options, pkg, section1, section2): if apt_pkg.version_compare(section1.version, section2.version) <= 0: return False exception = None for dsc in find_dsc(options, pkg, section1): try: versions = get_changelog_versions(pkg, dsc, section1.version) except BrokenSourcePackage as e: exception = e continue return section1.version in versions raise exception Candidate = namedtuple( "Candidate", ["package", "suite1", "suite2", "version1", "version2"]) def get_series(options, name): if name not in series_by_name: series_by_name[name] = options.distro.getSeries(name_or_version=name) return series_by_name[name] def already_copied(options, candidate): if "-" in candidate.suite2: series, pocket = candidate.suite2.split("-", 1) pocket = pocket.title() else: series = candidate.suite2 pocket = "Release" series = get_series(options, series) pubs = options.archive.getPublishedSources( source_name=candidate.package, version=candidate.version1, exact_match=True, distro_series=series, pocket=pocket) for pub in pubs: if pub.status in ("Pending", "Published"): return True return False def copy(options, candidate): if "-" in candidate.suite2: to_series, to_pocket = candidate.suite2.split("-", 1) to_pocket = to_pocket.title() else: to_series = candidate.suite2 to_pocket = "Release" options.archive.copyPackage( source_name=candidate.package, version=candidate.version1, from_archive=options.archive, to_pocket=to_pocket, to_series=to_series, include_binaries=True, auto_approve=True) def candidate_string(candidate): string = ('copy-package -y -b -s %s --to-suite %s -e %s %s' % (candidate.suite1, candidate.suite2, candidate.version1, candidate.package)) if candidate.version2 is not None: string += ' # %s: %s' % (candidate.suite2, candidate.version2) return string def main(): apt_pkg.init_system() parser = optparse.OptionParser(usage="usage: %prog [options] [suites]") parser.add_option( "-l", "--launchpad", dest="launchpad_instance", default="production") parser.add_option( "--quick", action="store_true", help="don't examine changelogs") parser.add_option( "--copy-safe", action="store_true", help="automatically copy safe candidates") parser.add_option( "--mirrors", default=default_mirrors, help="colon-separated list of local mirrors") options, args = parser.parse_args() options.launchpad = Launchpad.login_with( "copy-report", options.launchpad_instance, version="devel") options.distro = options.launchpad.distributions["ubuntu"] options.archive = options.distro.main_archive options.mirrors = options.mirrors.split(":") if args: suites = args else: suites = reversed([ series.name for series in options.launchpad.distributions["ubuntu"].series if series.status in ("Supported", "Current Stable Release")]) yes = [] maybe = [] no = [] for suite in suites: for component in 'main', 'restricted', 'universe', 'multiverse': tagfile1 = '%s/dists/%s-security/%s/source/Sources.xz' % ( options.mirrors[0], suite, component) tagfile2 = '%s/dists/%s-updates/%s/source/Sources.xz' % ( options.mirrors[0], suite, component) name1 = '%s-security' % suite name2 = '%s-updates' % suite suite1 = tagfiletodict(tagfile1) suite2 = tagfiletodict(tagfile2) for package in sorted(suite1): section1 = suite1[package] section2 = suite2.get(package) if (section2 is None or (not options.quick and descended_from(options, package, section1, section2))): candidate = Candidate( package=package, suite1=name1, suite2=name2, version1=section1.version, version2=None) if not already_copied(options, candidate): yes.append(candidate) elif apt_pkg.version_compare( section1.version, section2.version) > 0: candidate = Candidate( package=package, suite1=name1, suite2=name2, version1=section1.version, version2=section2.version) if already_copied(options, candidate): pass elif not options.quick: no.append(candidate) else: maybe.append(candidate) if yes: print("The following packages can be copied safely:") print("--------------------------------------------") print() for candidate in yes: print(candidate_string(candidate)) print() if options.copy_safe: for candidate in yes: copy(options, candidate) if maybe: print("Check that these packages are descendants before copying:") print("---------------------------------------------------------") print() for candidate in maybe: print('#%s' % candidate_string(candidate)) print() if no: print("The following packages need to be merged by hand:") print("-------------------------------------------------") print() for candidate in no: print('#%s' % candidate_string(candidate)) print() if __name__ == '__main__': main()