#! /usr/bin/python3 # Copyright 2012 Canonical Ltd. # Author: Colin Watson # Based loosely but rather distantly on Launchpad's sync-source.py. # TODO: This should share more code with syncpackage. # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Sync all packages without Ubuntu-specific modifications from Debian.""" from __future__ import print_function import atexit from contextlib import closing import errno import fnmatch from functools import cmp_to_key import gzip from optparse import OptionParser, Values import os import re import shutil import ssl import subprocess import sys import tempfile import time try: from urllib.error import HTTPError from urllib.request import urlopen except ImportError: from urllib2 import HTTPError, urlopen import apt_pkg from debian import deb822 from launchpadlib.launchpad import Launchpad from lazr.restfulclient.errors import ServerError from ubuntutools.archive import DownloadError, SourcePackage import lputils CONSUMER_KEY = "auto-sync" default_suite = { # TODO: map from unstable "debian": "sid", } class Percentages: """Helper to compute percentage ratios compared to a fixed total.""" def __init__(self, total): self.total = total def get_ratio(self, number): """Report the ratio of `number` to `self.total`, as a percentage.""" return (float(number) / self.total) * 100 def read_blacklist(url): """Parse resource at given URL as a 'blacklist'. Format: {{{ # [comment] # [comment] }}} Return a list of patterns (fnmatch-style) matching blacklisted source package names. Return an empty list if the given URL doesn't exist. """ # TODO: The blacklist should migrate into LP, at which point this # function will be unnecessary. blacklist = [] try: with closing(urlopen(url)) as url_file: for line in url_file: try: line = line[:line.index(b'#')] except ValueError: pass line = line.strip() if not line: continue blacklist.append(line.decode('utf-8')) pass except HTTPError as e: if e.code != 404: raise return blacklist def is_blacklisted(blacklist, src): for pattern in blacklist: if fnmatch.fnmatch(src, pattern): return True return False tempdir = None def ensure_tempdir(): global tempdir if not tempdir: tempdir = tempfile.mkdtemp(prefix='auto-sync') atexit.register(shutil.rmtree, tempdir) def read_ubuntu_sources(options): """Read information from the Ubuntu Sources files. Returns a sequence of: * a mapping of source package names to versions * a mapping of binary package names to (source, version) tuples """ if options.target.distribution.name != 'ubuntu': return print("Reading Ubuntu sources ...") source_map = {} binary_map = {} ensure_tempdir() suites = [options.target.suite] if options.target.pocket != "Release": suites.insert(0, options.target.series.name) for suite in suites: for component in ("main", "restricted", "universe", "multiverse"): url = ("http://archive.ubuntu.com/ubuntu/dists/%s/%s/source/" "Sources.gz" % (suite, component)) sources_path = os.path.join( tempdir, "Ubuntu_%s_%s_Sources" % (suite, component)) with closing(urlopen(url)) as url_file: with open("%s.gz" % sources_path, "wb") as comp_file: comp_file.write(url_file.read()) with closing(gzip.GzipFile("%s.gz" % sources_path)) as gz_file: with open(sources_path, "wb") as out_file: out_file.write(gz_file.read()) with open(sources_path) as sources_file: apt_sources = apt_pkg.TagFile(sources_file) for section in apt_sources: src = section["Package"] ver = section["Version"] if (src not in source_map or apt_pkg.version_compare(source_map[src], ver) < 0): source_map[src] = ver binaries = apt_pkg.parse_depends( section.get("Binary", src)) for pkg in [b[0][0] for b in binaries]: if (pkg not in binary_map or apt_pkg.version_compare( binary_map[pkg][1], ver) < 0): binary_map[pkg] = (src, ver) return source_map, binary_map def read_debian_sources(options): """Read information from the Debian Sources files. Returns a mapping of source package names to (version, set of architectures) tuples. """ if options.source.distribution.name != 'debian': return print("Reading Debian sources ...") source_map = {} ensure_tempdir() for component in ("main", "contrib", "non-free"): url = ("http://ftp.debian.org/debian/dists/%s/%s/source/" "Sources.gz" % (options.source.suite, component)) sources_path = os.path.join( tempdir, "Debian_%s_%s_Sources" % (options.source.suite, component)) with closing(urlopen(url)) as url_file: with open("%s.gz" % sources_path, "wb") as compressed_file: compressed_file.write(url_file.read()) with closing(gzip.GzipFile("%s.gz" % sources_path)) as gz_file: with open(sources_path, "wb") as out_file: out_file.write(gz_file.read()) with open(sources_path) as sources_file: apt_sources = apt_pkg.TagFile(sources_file) for section in apt_sources: src = section["Package"] ver = section["Version"] if (src not in source_map or apt_pkg.version_compare(source_map[src][0], ver) < 0): source_map[src] = ( ver, set(section.get("Architecture", "").split())) return source_map def read_new_queue(options): """Return the set of packages already in the NEW queue.""" new_queue = options.target.series.getPackageUploads( archive=options.target.archive, status="New") return set([pu.package_name for pu in new_queue if pu.contains_source or pu.contains_copy]) def question(options, message, choices, default): choices = "/".join([c.upper() if c == default else c for c in choices]) if options.batch: print("%s (%s)? %s" % (message, choices, default.lower())) return default.lower() else: sys.stdout.write("%s (%s)? " % (message, choices)) sys.stdout.flush() return sys.stdin.readline().rstrip().lower() def filter_pockets(spphs): """Filter SourcePackagePublishingHistory entries to useful pockets.""" return [spph for spph in spphs if spph.pocket in ("Release", "Proposed")] def version_sort_spphs(spphs): """Sort a list of SourcePackagePublishingHistory entries by version. We return the list in reversed form (highest version first), since that's what the consumers of this function prefer anyway. """ def version_compare(x, y): return apt_pkg.version_compare( x.source_package_version, y.source_package_version) return sorted( spphs, key=cmp_to_key(version_compare), reverse=True) class FakeDifference: """A partial stub for DistroSeriesDifference. Used when the destination series was initialised with a different parent, so we don't get real DSDs. """ def __init__(self, options, src, ver): self.options = options self.status = "Needs attention" self.sourcepackagename = src self.source_version = ver self.real_parent_source_version = None self.fetched_parent_source_version = False @property def parent_source_version(self): """The version in the parent series. We can't take this directly from read_debian_sources, since we need the version imported into Launchpad and Launchpad may be behind; so we have to call Archive.getPublishedSources to find this out. As such, this is expensive, so we only do it when necessary. """ if not self.fetched_parent_source_version: spphs = self.options.source.archive.getPublishedSources( distro_series=self.options.source.series, pocket=self.options.source.pocket, source_name=self.sourcepackagename, exact_match=True, status="Published") spphs = version_sort_spphs(spphs) if spphs: self.real_parent_source_version = \ spphs[0].source_package_version self.fetched_parent_source_version = True return self.real_parent_source_version def get_differences(options, ubuntu_sources, debian_sources): # DSDs are not quite sufficiently reliable for us to use them here, # regardless of the parent series. See: # https://bugs.launchpad.net/launchpad/+bug/1003969 # Also, how would this work with non-Release pockets? # if options.source.series in options.target.series.getParentSeries(): if False: for status in ( "Needs attention", "Blacklisted current version", "Blacklisted always", ): for difference in options.target.series.getDifferencesTo( parent_series=options.source.series, status=status): yield difference else: # Hack around missing DSDs if the series was initialised with a # different parent. for src in sorted(debian_sources): if (src not in ubuntu_sources or apt_pkg.version_compare( ubuntu_sources[src], debian_sources[src][0]) < 0): yield FakeDifference(options, src, ubuntu_sources.get(src)) def published_in_source_series(options, difference): # Oddly, sometimes packages seem to show up as a difference without # actually being published in options.source.series. Filter those out. src = difference.sourcepackagename from_version = difference.parent_source_version from_src = options.source.archive.getPublishedSources( distro_series=options.source.series, pocket=options.source.pocket, source_name=src, version=from_version, exact_match=True, status="Published") if not from_src: if options.verbose: print( "No published sources for %s_%s in %s/%s?" % ( src, from_version, options.source.distribution.display_name, options.source.suite), file=sys.stderr) return False else: return True def already_in_target_series(options, difference): # The published Sources files may be out of date, and if we're # particularly unlucky with timing relative to a proposed-migration run # it's possible for them to miss something that's in the process of # being moved between pockets. To make sure, check whether an equal or # higher version has already been removed from the destination archive. src = difference.sourcepackagename from_version = difference.parent_source_version to_src = version_sort_spphs(filter_pockets( options.target.archive.getPublishedSources( distro_series=options.target.series, source_name=src, exact_match=True))) if (to_src and apt_pkg.version_compare( from_version, to_src[0].source_package_version) <= 0): return True else: return False def architectures_allowed(dsc, target): """Return True if the architecture set dsc is compatible with target.""" if dsc == set(["all"]): return True for dsc_arch in dsc: for target_arch in target: command = [ "dpkg-architecture", "-a%s" % target_arch, "-i%s" % dsc_arch] env = dict(os.environ) env["CC"] = "true" if subprocess.call(command, env=env) == 0: return True return False def retry_errors(func): for retry_count in range(7): try: return func() except ssl.SSLError: pass except DownloadError as e: # These are unpleasantly difficult to parse, but we have little # choice since the exception object lacks useful metadata. code = None match = re.match(r".*?: (.*?) ", str(e)) if match is not None: try: code = int(match.group(1)) except ValueError: pass if code in (502, 503): time.sleep(int(2 ** (retry_count - 1))) else: raise def sync_one_difference(options, binary_map, difference, source_names): src = difference.sourcepackagename print(" * Trying to add %s ..." % src) # We use SourcePackage directly here to avoid having to hardcode Debian # and Ubuntu, and because we got the package list from # DistroSeries.getDifferencesTo() so we can guarantee that Launchpad # knows about all of them. from_srcpkg = SourcePackage( package=src, version=difference.parent_source_version, lp=options.launchpad) from_srcpkg.distribution = options.source.distribution.name retry_errors(from_srcpkg.pull_dsc) if difference.source_version is not None: # Check whether this will require a fakesync. to_srcpkg = SourcePackage( package=src, version=difference.source_version, lp=options.launchpad) to_srcpkg.distribution = options.target.distribution.name retry_errors(to_srcpkg.pull_dsc) if not from_srcpkg.dsc.compare_dsc(to_srcpkg.dsc): print("[Skipping (requires fakesync)] %s_%s (vs %s)" % ( src, difference.parent_source_version, difference.source_version)) return False from_binary = deb822.PkgRelation.parse_relations( from_srcpkg.dsc["binary"]) pkgs = [entry[0]["name"] for entry in from_binary] for pkg in pkgs: if pkg in binary_map: current_src, current_ver = binary_map[pkg] # TODO: Check that a non-main source package is not trying to # override a main binary package (we don't know binary # components yet). # Check that a source package is not trying to override an # Ubuntu-modified binary package. if "ubuntu" in current_ver: answer = question( options, "%s_%s is trying to override modified binary %s_%s. " "OK" % ( src, difference.parent_source_version, pkg, current_ver), "yn", "n") if answer != "y": return False print("I: %s -> %s_%s." % (src, pkg, current_ver)) source_names.append(src) return True failed_copy = None def copy_packages(options, source_names): global failed_copy if failed_copy is not None and len(source_names) >= failed_copy: source_names_left = source_names[:len(source_names) // 2] source_names_right = source_names[len(source_names) // 2:] copy_packages(options, source_names_left) copy_packages(options, source_names_right) return try: options.target.archive.copyPackages( source_names=source_names, from_archive=options.source.archive, from_series=options.source.series.name, to_series=options.target.series.name, to_pocket=options.target.pocket, include_binaries=False, sponsored=options.requestor, auto_approve=True, silent=True) except ServerError as e: if len(source_names) < 100: raise if e.response.status != 503: raise print("Cannot copy %d packages at once; bisecting ..." % len(source_names)) failed_copy = len(source_names) source_names_left = source_names[:len(source_names) // 2] source_names_right = source_names[len(source_names) // 2:] copy_packages(options, source_names_left) copy_packages(options, source_names_right) def sync_differences(options): stat_us = 0 stat_cant_update = 0 stat_updated = 0 stat_uptodate_modified = 0 stat_uptodate = 0 stat_count = 0 stat_blacklisted = 0 blacklist = read_blacklist( "http://people.canonical.com/~ubuntu-archive/sync-blacklist.txt") ubuntu_sources, binary_map = read_ubuntu_sources(options) debian_sources = read_debian_sources(options) new_queue = read_new_queue(options) print("Getting differences between %s/%s and %s/%s ..." % ( options.source.distribution.display_name, options.source.suite, options.target.distribution.display_name, options.target.suite)) new_differences = [] updated_source_names = [] new_source_names = [] seen_differences = set() for difference in get_differences(options, ubuntu_sources, debian_sources): status = difference.status if status == "Resolved": stat_uptodate += 1 continue stat_count += 1 src = difference.sourcepackagename if src in seen_differences: continue seen_differences.add(src) to_version = difference.source_version if to_version is None: src_ver = src else: src_ver = "%s_%s" % (src, to_version) src_is_blacklisted = is_blacklisted(blacklist, src) if src_is_blacklisted or status == "Blacklisted always": if options.verbose: if src_is_blacklisted: print("[BLACKLISTED] %s" % src_ver) else: comments = options.target.series.getDifferenceComments( source_package_name=src) if comments: print("""[BLACKLISTED] %s (%s: "%s")""" % ( src_ver, comments[-1].comment_author.name, comments[-1].body_text)) else: print("[BLACKLISTED] %s" % src_ver) stat_blacklisted += 1 # "Blacklisted current version" is supposed to mean that the version # in options.target.series is higher than that in # options.source.series. However, I've seen cases that suggest that # this status isn't necessarily always kept up to date properly. # Since we're perfectly capable of checking the versions for # ourselves anyway, let's just be cautious and check everything with # both plausible statuses. elif status in ("Needs attention", "Blacklisted current version"): from_version = difference.parent_source_version if from_version is None: if options.verbose: print("[Ubuntu Specific] %s" % src_ver) stat_us += 1 continue if to_version is None: if not published_in_source_series(options, difference): continue # Handle new packages at the end, since they require more # interaction. if options.new: new_differences.append(difference) continue elif options.new_only: stat_uptodate += 1 elif apt_pkg.version_compare(to_version, from_version) < 0: if "ubuntu" in to_version: if options.verbose: print("[NOT Updating - Modified] %s (vs %s)" % ( src_ver, from_version)) stat_cant_update += 1 else: if not published_in_source_series(options, difference): continue if already_in_target_series(options, difference): continue print("[Updating] %s (%s [%s] < %s [%s])" % ( src, to_version, options.target.distribution.display_name, from_version, options.source.distribution.display_name)) if sync_one_difference( options, binary_map, difference, updated_source_names): stat_updated += 1 else: stat_cant_update += 1 elif "ubuntu" in to_version: if options.verbose: print("[Nothing to update (Modified)] %s (vs %s)" % ( src_ver, from_version)) stat_uptodate_modified += 1 else: if options.verbose: print("[Nothing to update] %s (%s [%s] >= %s [%s])" % ( src, to_version, options.target.distribution.display_name, from_version, options.source.distribution.display_name)) stat_uptodate += 1 else: print("[Unknown status] %s (%s)" % (src_ver, status), file=sys.stderr) target_architectures = set( a.architecture_tag for a in options.target.architectures) for difference in new_differences: src = difference.sourcepackagename from_version = difference.parent_source_version if src in new_queue: print("[Skipping (already in NEW)] %s_%s" % (src, from_version)) continue if not architectures_allowed( debian_sources[src][1], target_architectures): if options.verbose: print( "[Skipping (not built on any target architecture)] %s_%s" % (src, from_version)) continue to_src = version_sort_spphs(filter_pockets( options.target.archive.getPublishedSources( source_name=src, exact_match=True))) if (to_src and apt_pkg.version_compare( from_version, to_src[0].source_package_version) <= 0): # Equal or higher version already removed from destination # distribution. continue print("[New] %s_%s" % (src, from_version)) if to_src: print("Previous publications in %s:" % options.target.distribution.display_name) for spph in to_src[:10]: desc = " %s (%s): %s" % ( spph.source_package_version, spph.distro_series.name, spph.status) if (spph.status == "Deleted" and spph.removed_by is not None and spph.removal_comment is not None): desc += " (removed by %s: %s)" % ( spph.removed_by.display_name, spph.removal_comment) print(desc) if len(to_src) > 10: history_url = "%s/+source/%s/+publishinghistory" % ( options.target.distribution.web_link, src) print(" ... plus %d more; see %s" % (len(to_src) - 10, history_url)) else: print("No previous publications in %s" % options.target.distribution.display_name) answer = question(options, "OK", "yn", "y") new_ok = (answer != "n") if new_ok: if sync_one_difference( options, binary_map, difference, new_source_names): stat_updated += 1 else: stat_cant_update += 1 else: stat_blacklisted += 1 percentages = Percentages(stat_count) print() print("Out-of-date BUT modified: %3d (%.2f%%)" % ( stat_cant_update, percentages.get_ratio(stat_cant_update))) print("Updated: %3d (%.2f%%)" % ( stat_updated, percentages.get_ratio(stat_updated))) print("Ubuntu Specific: %3d (%.2f%%)" % ( stat_us, percentages.get_ratio(stat_us))) print("Up-to-date [Modified]: %3d (%.2f%%)" % ( stat_uptodate_modified, percentages.get_ratio(stat_uptodate_modified))) print("Up-to-date: %3d (%.2f%%)" % ( stat_uptodate, percentages.get_ratio(stat_uptodate))) print("Blacklisted: %3d (%.2f%%)" % ( stat_blacklisted, percentages.get_ratio(stat_blacklisted))) print(" -----------") print("Total: %s" % stat_count) if updated_source_names + new_source_names: print() if updated_source_names: print("Updating: %s" % " ".join(updated_source_names)) if new_source_names: print("New: %s" % " ".join(new_source_names)) if options.dry_run: print("Not copying packages in dry-run mode.") else: answer = question(options, "OK", "yn", "y") if answer != "n": copy_packages(options, updated_source_names + new_source_names) def main(): if sys.version >= '3': # Force encoding to UTF-8 even in non-UTF-8 locales. import io sys.stdout = io.TextIOWrapper( sys.stdout.detach(), encoding="UTF-8", line_buffering=True) else: # Avoid having to do .encode('UTF-8') everywhere. This is a pain; I # wish Python supported something like # "sys.stdout.encoding = 'UTF-8'". def fix_stdout(): import codecs sys.stdout = codecs.EncodedFile(sys.stdout, 'UTF-8') def null_decode(input, errors='strict'): return input, len(input) sys.stdout.decode = null_decode fix_stdout() parser = OptionParser(usage="usage: %prog [options]") parser.add_option( "-l", "--launchpad", dest="launchpad_instance", default="production") parser.add_option( "-v", "--verbose", dest="verbose", default=False, action="store_true", help="be more verbose") parser.add_option( "--log-directory", help="log to a file under this directory") parser.add_option( "-d", "--to-distro", dest="todistro", default="ubuntu", metavar="DISTRO", help="sync to DISTRO") parser.add_option( "-s", "--to-suite", dest="tosuite", metavar="SUITE", help="sync to SUITE") parser.add_option( "-D", "--from-distro", dest="fromdistro", default="debian", metavar="DISTRO", help="sync from DISTRO") parser.add_option( "-S", "--from-suite", dest="fromsuite", metavar="SUITE", help="sync from SUITE") parser.add_option( "--new-only", dest="new_only", default=False, action="store_true", help="only sync new packages") parser.add_option( "--no-new", dest="new", default=True, action="store_false", help="don't sync new packages") parser.add_option( "--batch", dest="batch", default=False, action="store_true", help="assume default answer to all questions") parser.add_option( "--dry-run", default=False, action="store_true", help="only show what would be done; don't copy packages") options, args = parser.parse_args() if args: parser.error("This program does not accept any non-option arguments.") apt_pkg.init() options.launchpad = Launchpad.login_with( CONSUMER_KEY, options.launchpad_instance, version="devel") if options.log_directory is not None: now = time.gmtime() log_relative_path = os.path.join( time.strftime("%F", now), "%s.log" % time.strftime("%T", now)) log_file = os.path.join(options.log_directory, log_relative_path) if not os.path.isdir(os.path.dirname(log_file)): os.makedirs(os.path.dirname(log_file)) sys.stdout = open(log_file, "w", buffering=1) else: log_file = None options.source = Values() options.source.launchpad = options.launchpad options.source.distribution = options.fromdistro options.source.suite = options.fromsuite if options.source.suite is None and options.fromdistro in default_suite: options.source.suite = default_suite[options.fromdistro] lputils.setup_location(options.source) options.target = Values() options.target.launchpad = options.launchpad options.target.distribution = options.todistro options.target.suite = options.tosuite lputils.setup_location(options.target, default_pocket="Proposed") # This is a very crude check, and easily bypassed. It's simply here to # discourage people from causing havoc by mistake. A mass auto-sync is # a disruptive operation, and, generally speaking, archive # administrators know when it's OK to do one. If you aren't an archive # administrator, you should think very hard, and ask on #ubuntu-release # if options.target.distribution is Ubuntu, before disabling this check. owner = options.target.archive.owner if (not options.dry_run and options.launchpad.me != owner and options.launchpad.me not in owner.participants): print("You are not an archive administrator for %s. Exiting." % options.target.distribution.display_name, file=sys.stderr) sys.exit(1) options.requestor = options.launchpad.people["katie"] sync_differences(options) if options.log_directory is not None: sys.stdout.close() current_link = os.path.join(options.log_directory, "current.log") try: os.unlink(current_link) except OSError as e: if e.errno != errno.ENOENT: raise os.symlink(log_relative_path, current_link) if __name__ == '__main__': main()