#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2010 Martin Pitt,
#               2010      Benjamin Drung,
#               2010-2011 Stefano Rivera
#
# ##################################################################
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# See file /usr/share/common-licenses/GPL-3 for more details.
#
# ##################################################################

import codecs
import optparse
import os
import re
import shutil
import sys
import textwrap
import urllib

import debian.debian_support
from devscripts.logger import Logger
from distro_info import UbuntuDistroInfo
from lazr.restfulclient.errors import HTTPError

from ubuntutools.archive import (DebianSourcePackage, UbuntuSourcePackage,
                                 DownloadError)
from ubuntutools.config import UDTConfig, ubu_email
from ubuntutools.requestsync.mail import (
    get_debian_srcpkg as requestsync_mail_get_debian_srcpkg)
from ubuntutools.requestsync.lp import get_debian_srcpkg, get_ubuntu_srcpkg
from ubuntutools.lp import udtexceptions
from ubuntutools.lp.lpapicache import (Distribution, Launchpad, PersonTeam,
                                       SourcePackagePublishingHistory)
from ubuntutools.misc import split_release_pocket
from ubuntutools.question import YesNoQuestion
from ubuntutools import subprocess


class Version(debian.debian_support.Version):
    '''Debian version with helpers for Ubuntu-specific version suffixes.'''

    def strip_epoch(self):
        '''Removes the epoch from a Debian version string.

        strip_epoch(1:1.52-1) will return "1.52-1" and strip_epoch(1.1.3-1)
        will return "1.1.3-1".
        '''
        parts = self.full_version.split(':')
        if len(parts) > 1:
            # Only the first colon-separated field is dropped; any further
            # colons are preserved by the join below.
            del parts[0]
        return ':'.join(parts)

    def get_related_debian_version(self):
        '''Strip the ubuntu-specific bits off the version

        Everything from the first "ubuntu", and then from the first "build",
        is cut off. Returns a new Version object.
        '''
        related_debian_version = self.full_version
        # "> 0" (not ">= 0"): a marker at the very start is left untouched.
        uidx = related_debian_version.find('ubuntu')
        if uidx > 0:
            related_debian_version = related_debian_version[:uidx]
        uidx = related_debian_version.find('build')
        if uidx > 0:
            related_debian_version = related_debian_version[:uidx]
        return Version(related_debian_version)

    def is_modified_in_ubuntu(self):
        '''Did Ubuntu modify this (and mark the version appropriately)?'''
        return 'ubuntu' in self.full_version


def remove_signature(dscname):
    '''Removes the signature from a .dsc file if the .dsc file is signed.

    The file is rewritten in place with only the signed body. Files whose
    first line is not the PGP header are left untouched.
    '''
    with open(dscname) as dsc_file:
        first_line = dsc_file.readline().strip()
        if first_line != "-----BEGIN PGP SIGNED MESSAGE-----":
            return

        # search until begin of body found
        for line in dsc_file:
            if line.strip() == "":
                break

        # search for end of body
        unsigned_file = []
        for line in dsc_file:
            if line.strip() == "":
                break
            unsigned_file.append(line)

    with open(dscname, "w") as dsc_file:
        dsc_file.writelines(unsigned_file)


def _bug_sort_key(bug):
    '''Sort key placing numeric bug IDs first, in numeric order.'''
    if bug.isdigit():
        return (0, int(bug), bug)
    return (1, 0, bug)


def add_fixed_bugs(changes, bugs):
    '''Add additional Launchpad bugs to the list of fixed bugs in changes
    file.

    changes is the text of a .changes file and bugs an iterable of bug
    numbers. Blank lines are dropped from the text. The given bugs are merged
    (without duplicates) into an existing Launchpad-Bugs-Fixed field, or a
    new field is appended when none exists. Returns the new text, terminated
    by a newline.
    '''
    prefix = "Launchpad-Bugs-Fixed:"
    lines = [l for l in changes.split("\n") if l.strip() != ""]

    # Remove duplicates
    bug_ids = set(str(bug) for bug in bugs)

    for index, line in enumerate(lines):
        if line.startswith(prefix):
            # split() without arguments ignores repeated/missing whitespace
            # and never yields an empty bug ID.
            bug_ids.update(line[len(prefix):].split())
            lines[index] = "%s %s" % (
                prefix, " ".join(sorted(bug_ids, key=_bug_sort_key)))
            break
    else:
        # Launchpad-Bugs-Fixed entry does not exist in changes file
        lines.append("%s %s" % (
            prefix, " ".join(sorted(bug_ids, key=_bug_sort_key))))

    return "\n".join(lines + [""])


def sync_dsc(src_pkg, debian_dist, release, name, email, bugs, ubuntu_mirror,
             keyid=None, simulate=False, force=False, fakesync=False):
    '''Local sync, trying to emulate sync-source.py

    Grabs a source package, replaces the .orig.tar with the one from Ubuntu,
    if necessary, writes a sync-appropriate .changes file, and signs it.

    src_pkg      -- source package object to sync
    debian_dist  -- Debian distribution; read from debian/changelog if None
    release      -- target Ubuntu release (the part before "-" is the series)
    name, email  -- uploader identity
    bugs         -- Launchpad bug numbers to mark as fixed
    keyid        -- signing key ID; None lets debsign choose, False disables
                    signing of the generated .changes file
    simulate     -- return before downloading or changing anything
    force        -- allow discarding Ubuntu modifications
    fakesync     -- perform a fakesync (build a new source package locally)

    Exits the process with status 1 on any failure.
    '''

    uploader = name + " <" + email + ">"

    src_pkg.pull_dsc()
    new_ver = Version(src_pkg.dsc["Version"])

    try:
        series = release.split("-")[0]
        ubuntu_source = get_ubuntu_srcpkg(src_pkg.source, series)
        ubuntu_ver = Version(ubuntu_source.getVersion())
        ubu_pkg = UbuntuSourcePackage(src_pkg.source, ubuntu_ver.full_version,
                                      ubuntu_source.getComponent(),
                                      mirrors=[ubuntu_mirror])
        ubu_pkg.pull_dsc()
        need_orig = ubuntu_ver.upstream_version != new_ver.upstream_version
    except udtexceptions.PackageNotFoundException:
        ubuntu_ver = Version('~')
        ubu_pkg = None
        need_orig = True
        # Report the source package, not the uploader's name.
        Logger.normal('%s does not exist in Ubuntu.', src_pkg.source)

    Logger.debug('Source %s: current version %s, new version %s',
                 src_pkg.source, ubuntu_ver, new_ver)
    Logger.debug('Needs source tarball: %s', str(need_orig))

    cur_ver = ubuntu_ver.get_related_debian_version()
    if ubuntu_ver.is_modified_in_ubuntu():
        if not force:
            Logger.error('--force is required to discard Ubuntu changes.')
            sys.exit(1)

        Logger.warn('Overwriting modified Ubuntu version %s, '
                    'setting current version to %s',
                    ubuntu_ver.full_version, cur_ver.full_version)
    if simulate:
        return

    try:
        src_pkg.pull()
    except DownloadError as e:
        Logger.error('Failed to download: %s', str(e))
        sys.exit(1)
    src_pkg.unpack()

    # ubu_pkg is None only when need_orig is True, so the short-circuit
    # keeps verify_orig() from being called on None.
    needs_fakesync = not (need_orig or ubu_pkg.verify_orig())

    if needs_fakesync and fakesync:
        Logger.warn('Performing a fakesync')
    elif not needs_fakesync and fakesync:
        Logger.error('Fakesync not required, aborting.')
        sys.exit(1)
    elif needs_fakesync and not fakesync:
        Logger.error('The checksums of the Debian and Ubuntu packages '
                     'mismatch. A fake sync using --fakesync is required.')
        sys.exit(1)

    if fakesync:
        # Download Ubuntu files (override Debian source tarballs)
        try:
            ubu_pkg.pull()
        except DownloadError as e:
            Logger.error('Failed to download: %s', str(e))
            sys.exit(1)

    # change into package directory
    directory = src_pkg.source + '-' + new_ver.upstream_version
    Logger.command(('cd', directory))
    os.chdir(directory)

    # read Debian distribution from debian/changelog if not specified
    if debian_dist is None:
        with open("debian/changelog") as changelog:
            line = changelog.readline()
        debian_dist = line.split(" ")[2].strip(";")

    if not fakesync:
        # create the changes file
        changes_filename = "%s_%s_source.changes" % \
                           (src_pkg.source, new_ver.strip_epoch())
        cmd = ["dpkg-genchanges", "-S", "-v" + cur_ver.full_version,
               "-DDistribution=" + release,
               "-DOrigin=debian/" + debian_dist,
               "-e" + uploader]
        if need_orig:
            cmd.append("-sa")
        else:
            cmd.append("-sd")
        if not Logger.verbose:
            cmd += ["-q"]
        Logger.command(cmd + ['>', '../' + changes_filename])
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        changes = process.communicate()[0]

        # Add additional bug numbers
        if len(bugs) > 0:
            changes = add_fixed_bugs(changes, bugs)

        # remove extracted (temporary) files
        Logger.command(('cd', '..'))
        os.chdir('..')
        shutil.rmtree(directory, True)

        # write changes file
        with open(changes_filename, "w") as changes_file:
            changes_file.write(changes)

        # remove signature and sign package
        remove_signature(src_pkg.dsc_name)
        if keyid is not False:
            cmd = ["debsign", changes_filename]
            if keyid is not None:
                cmd.insert(1, "-k" + keyid)
            Logger.command(cmd)
            subprocess.check_call(cmd)
    else:
        # Create fakesync changelog entry
        new_ver = Version(new_ver.full_version + "fakesync1")
        changes_filename = "%s_%s_source.changes" % \
                           (src_pkg.source, new_ver.strip_epoch())
        if len(bugs) > 0:
            message = "Fake sync due to mismatching orig tarball (LP: %s)." % \
                      (", ".join(["#" + str(b) for b in bugs]))
        else:
            message = "Fake sync due to mismatching orig tarball."
        cmd = ['dch', '-v', new_ver.full_version, '--force-distribution',
               '-D', release, message]
        # NOTE: this is the complete environment handed to dch, not an
        # addition to os.environ.
        env = {'DEBFULLNAME': name, 'DEBEMAIL': email}
        Logger.command(cmd)
        subprocess.check_call(cmd, env=env)

        # update the Maintainer field
        cmd = ["update-maintainer"]
        if not Logger.verbose:
            cmd.append("-q")
        Logger.command(cmd)
        subprocess.check_call(cmd)

        # Build source package
        cmd = ["debuild", "--no-lintian", "-S", "-v" + cur_ver.full_version]
        if need_orig:
            cmd += ['-sa']
        if keyid:
            cmd += ["-k" + keyid]
        Logger.command(cmd)
        returncode = subprocess.call(cmd)
        if returncode != 0:
            Logger.error('Source-only build with debuild failed. '
                         'Please check build log above.')
            sys.exit(1)


def fetch_source_pkg(package, dist, version, component, ubuntu_release,
                     mirror):
    """Build the DebianSourcePackage object for the requested package.

    dist, version, component, mirror can all be None.

    package is either a source package name or a .dsc path/URL (anything
    ending in ".dsc"). Missing version/component values are looked up in
    Debian; the process exits with status 1 if the lookup fails or if the
    Debian version is not newer than the one in ubuntu_release.
    """
    if mirror is None:
        mirrors = []
    else:
        mirrors = [mirror]

    if package.endswith('.dsc'):
        return DebianSourcePackage(dscfile=package, mirrors=mirrors)

    if dist is None:
        ubu_info = UbuntuDistroInfo()
        dist = 'testing' if ubu_info.is_lts(ubu_info.devel()) else 'unstable'

    requested_version = version
    if isinstance(version, str):
        version = Version(version)

    if version is None or component is None:
        try:
            debian_srcpkg = get_debian_srcpkg(package, dist)
        except (udtexceptions.PackageNotFoundException,
                udtexceptions.SeriesNotFoundException) as e:
            Logger.error(str(e))
            sys.exit(1)
        if version is None:
            version = Version(debian_srcpkg.getVersion())
        try:
            ubuntu_srcpkg = get_ubuntu_srcpkg(package, ubuntu_release)
            ubuntu_version = Version(ubuntu_srcpkg.getVersion())
        except udtexceptions.PackageNotFoundException:
            ubuntu_version = Version('~')
        except udtexceptions.SeriesNotFoundException as e:
            Logger.error(str(e))
            sys.exit(1)
        if ubuntu_version >= version:
            # The LP importer is maybe out of date
            debian_srcpkg = requestsync_mail_get_debian_srcpkg(package, dist)
            if requested_version is None:
                version = Version(debian_srcpkg.getVersion())
            if ubuntu_version >= version:
                Logger.error("Version in Debian %s (%s) isn't newer than "
                             "Ubuntu %s (%s)",
                             version, dist, ubuntu_version, ubuntu_release)
                sys.exit(1)
        if component is None:
            component = debian_srcpkg.getComponent()

    assert component in ('main', 'contrib', 'non-free')

    return DebianSourcePackage(package, version.full_version, component,
                               mirrors=mirrors)


def copy(src_pkg, release, bugs, sponsoree=None, simulate=False, force=False):
    """Copy a source package from Debian to Ubuntu using the Launchpad API.

    src_pkg    -- source package object to copy
    release    -- target "series[-pocket]"; None selects the development
                  series and the Release pocket
    bugs       -- Launchpad bug numbers to offer for closing afterwards
    sponsoree  -- person the sync is sponsored for, or None
    simulate   -- return after showing the changes, before asking or copying
    force      -- allow discarding Ubuntu modifications

    Exits the process with status 1 on errors. Asks for confirmation before
    copying and before closing bugs.
    """
    ubuntu = Distribution('ubuntu')
    debian_archive = Distribution('debian').getArchive()
    ubuntu_archive = ubuntu.getArchive()
    if release is None:
        ubuntu_series = ubuntu.getDevelopmentSeries().name
        ubuntu_pocket = 'Release'
    else:
        ubuntu_series, ubuntu_pocket = split_release_pocket(release)

    # Ensure that the provided Debian version actually exists.
    try:
        debian_spph = SourcePackagePublishingHistory(
            debian_archive.getPublishedSources(
                source_name=src_pkg.source,
                version=src_pkg.version.full_version,
                exact_match=True)[0]
        )
    except IndexError:
        Logger.error('Debian version %s does not exist!', src_pkg.version)
        sys.exit(1)

    try:
        ubuntu_spph = get_ubuntu_srcpkg(src_pkg.source,
                                        ubuntu_series, ubuntu_pocket)
        ubuntu_pkg = UbuntuSourcePackage(src_pkg.source,
                                         ubuntu_spph.getVersion(),
                                         ubuntu_spph.getComponent(),
                                         mirrors=[])

        Logger.normal('Source %s -> %s/%s: current version %s, new version %s',
                      src_pkg.source, ubuntu_series, ubuntu_pocket,
                      ubuntu_pkg.version, src_pkg.version)

        ubuntu_version = Version(ubuntu_pkg.version.full_version)
        base_version = ubuntu_version.get_related_debian_version()
        if not force and ubuntu_version.is_modified_in_ubuntu():
            Logger.error('--force is required to discard Ubuntu changes.')
            sys.exit(1)

        # Check whether a fakesync would be required.
        src_pkg.pull_dsc()
        ubuntu_pkg.pull_dsc()
        if not src_pkg.dsc.compare_dsc(ubuntu_pkg.dsc):
            Logger.error('The checksums of the Debian and Ubuntu packages '
                         'mismatch. A fake sync using --fakesync is required.')
            sys.exit(1)
    except udtexceptions.PackageNotFoundException:
        base_version = Version('~')
        Logger.normal('Source %s -> %s/%s: not in Ubuntu, new version %s',
                      src_pkg.source, ubuntu_series, ubuntu_pocket,
                      src_pkg.version)

    changes = debian_spph.getChangelog(since_version=base_version)
    if changes:
        changes = changes.strip()
        Logger.normal("New changes:\n%s", changes)

    if simulate:
        return

    if sponsoree:
        Logger.normal("Sponsoring this sync for %s (%s)",
                      sponsoree.display_name, sponsoree.name)
    answer = YesNoQuestion().ask("Sync this package", "no")
    if answer != "yes":
        return

    try:
        ubuntu_archive.copyPackage(
            source_name=src_pkg.source,
            version=src_pkg.version.full_version,
            from_archive=debian_archive,
            to_series=ubuntu_series,
            to_pocket=ubuntu_pocket,
            include_binaries=False,
            sponsored=sponsoree)
    except HTTPError as error:
        Logger.error("HTTP Error %s: %s", error.response.status,
                     error.response.reason)
        Logger.error(error.content)
        sys.exit(1)

    Logger.normal('Request succeeded; you should get an e-mail once it is '
                  'processed.')
    bugs = sorted(set(bugs))
    if bugs:
        Logger.normal("Launchpad bugs to be closed: %s",
                      ', '.join(str(bug) for bug in bugs))
        Logger.normal('Please wait for the sync to be successful before '
                      'closing bugs.')
        answer = YesNoQuestion().ask("Close bugs", "yes")
        if answer == "yes":
            close_bugs(bugs, src_pkg.source, src_pkg.version.full_version,
                       changes)


def is_blacklisted(query):
    """Determine if package "query" is in the sync blacklist

    Returns tuple of (blacklisted, comments)
    blacklisted is one of False, 'CURRENT', 'ALWAYS'
    """
    series = Launchpad.distributions['ubuntu'].current_series
    lp_comments = series.getDifferenceComments(source_package_name=query)
    blacklisted = False
    comments = [u'%s\n -- %s %s'
                % (c.body_text, c.comment_author.name,
                   c.comment_date.strftime('%a, %d %b %Y %H:%M:%S +0000'))
                for c in lp_comments]

    for diff in series.getDifferencesTo(source_package_name_filter=query):
        # 'ALWAYS' wins over 'CURRENT' regardless of iteration order.
        if (diff.status == 'Blacklisted current version'
                and blacklisted != 'ALWAYS'):
            blacklisted = 'CURRENT'
        if diff.status == 'Blacklisted always':
            blacklisted = 'ALWAYS'

    # Old blacklist:
    url = 'http://people.canonical.com/~ubuntu-archive/sync-blacklist.txt'
    with codecs.EncodedFile(urllib.urlopen(url), 'UTF-8') as f:
        # Lines of the current paragraph; a blank line starts a new one.
        applicable_lines = []
        for line in f:
            if not line.strip():
                applicable_lines = []
                continue
            m = re.match(r'^\s*([a-z0-9.+-]+)?', line)
            # group(1) is the package name without the leading whitespace
            # the pattern skips; it is None when the line has no name.
            source = m.group(1)
            applicable_lines.append(line)
            if source and query == source:
                comments += ["From sync-blacklist.txt:"] + applicable_lines
                blacklisted = 'ALWAYS'
                break

    return (blacklisted, comments)


def close_bugs(bugs, package, version, changes):
    """Close the correct task on all bugs, with changes

    For every bug (or the bug it is a duplicate of), the first task that
    targets Ubuntu itself or the given package in Ubuntu is set to
    'Fix Released' and a message naming package and version (plus changes,
    if any) is posted. An error is logged when no such task exists.
    """
    ubuntu = Launchpad.distributions['ubuntu']
    message = ("This bug was fixed in the package %s - %s"
               % (package, version))
    if changes:
        message += "\n\n---------------\n" + changes
    for bug in bugs:
        bug = Launchpad.bugs[bug]
        if bug.duplicate_of is not None:
            bug = bug.duplicate_of
        for task in bug.bug_tasks:
            target = task.target
            if target == ubuntu or (target.name == package and
               getattr(target, 'distribution', None) == ubuntu):
                if task.status != 'Fix Released':
                    Logger.normal("Closed bug %s", task.web_link)
                    task.status = 'Fix Released'
                    task.lp_save()
                    bug.newMessage(content=message)
                break
        else:
            Logger.error(u"Cannot find any tasks on LP: #%i to close.",
                         bug.id)


def parse():
    """Parse given command-line parameters.

    Returns a tuple (options, package) where package is the single
    positional argument. Invalid input is reported through parser.error().
    """

    usage = "%prog [options] <.dsc URL/path or package name>"
    epilog = "See %s(1) for more info." % os.path.basename(sys.argv[0])
    parser = optparse.OptionParser(usage=usage, epilog=epilog)

    parser.add_option("-d", "--distribution",
                      help="Debian distribution to sync from.")
    parser.add_option("-r", "--release",
                      help="Specify target Ubuntu release.")
    parser.add_option("-V", "--debian-version",
                      help="Specify the version to sync from.")
    parser.add_option("-c", "--component",
                      help="Specify the Debian component to sync from.")
    parser.add_option("-b", "--bug", metavar="BUG",
                      dest="bugs", action="append", default=list(),
                      help="Mark Launchpad bug BUG as being fixed by this "
                           "upload.")
    parser.add_option("-s", "--sponsor", metavar="USERNAME",
                      dest="sponsoree", default=None,
                      help="Sponsor the sync for USERNAME (a Launchpad "
                           "username).")
    parser.add_option("-v", "--verbose",
                      action="store_true", default=False,
                      help="Display more progress information.")
    parser.add_option("-F", "--fakesync",
                      action="store_true", default=False,
                      help="Perform a fakesync (a sync where Debian and "
                           "Ubuntu have a .orig.tar mismatch). "
                           "This implies --no-lp and will leave a signed "
                           ".changes file for you to upload.")
    parser.add_option("-f", "--force",
                      action="store_true", default=False,
                      help="Force sync over the top of Ubuntu changes.")
    parser.add_option('--no-conf',
                      default=False, action='store_true',
                      help="Don't read config files or environment variables.")
    parser.add_option('-l', '--lpinstance', metavar='INSTANCE',
                      help='Launchpad instance to connect to '
                           '(default: production).')
    parser.add_option('--simulate',
                      default=False, action='store_true',
                      help="Show what would be done, but don't actually do "
                           "it.")

    no_lp = optparse.OptionGroup(
        parser, "Local sync preparation options",
        "Options that only apply when using --no-lp. "
        "WARNING: The use of --no-lp is not recommended for uploads "
        "targeted at Ubuntu. "
        "The archive-admins discourage its use, except for fakesyncs.")
    no_lp.add_option("--no-lp",
                     dest="lp", action="store_false", default=True,
                     help="Construct sync locally, rather than letting "
                          "Launchpad copy the package directly. "
                          "It will leave a signed .changes file for you to "
                          "upload.")
    no_lp.add_option("-n", "--uploader-name",
                     help="Use UPLOADER_NAME as the name of the maintainer "
                          "for this upload.")
    no_lp.add_option("-e", "--uploader-email",
                     help="Use UPLOADER_EMAIL as email address of the "
                          "maintainer for this upload.")
    no_lp.add_option("-k", "--key",
                     dest="keyid",
                     help="Specify the key ID to be used for signing.")
    # Shares dest with --key: keyid becomes False to mean "do not sign".
    no_lp.add_option('--dont-sign',
                     dest='keyid', action='store_false',
                     help='Do not sign the upload.')
    no_lp.add_option('-D', '--debian-mirror', metavar='DEBIAN_MIRROR',
                     help='Preferred Debian mirror '
                          '(default: %s)'
                          % UDTConfig.defaults['DEBIAN_MIRROR'])
    no_lp.add_option('-U', '--ubuntu-mirror', metavar='UBUNTU_MIRROR',
                     help='Preferred Ubuntu mirror '
                          '(default: %s)'
                          % UDTConfig.defaults['UBUNTU_MIRROR'])
    parser.add_option_group(no_lp)

    (options, args) = parser.parse_args()

    if options.fakesync:
        options.lp = False

    if len(args) == 0:
        parser.error('No .dsc URL/path or package name specified.')
    if len(args) > 1:
        parser.error('Multiple .dsc URLs/paths or package names specified: '
                     + ', '.join(args))

    try:
        options.bugs = [int(b) for b in options.bugs]
    except (TypeError, ValueError):
        # int() raises ValueError for non-numeric strings such as "abc".
        parser.error('Invalid bug number(s) specified.')

    if options.component not in (None, "main", "contrib", "non-free"):
        parser.error('%s is not a valid Debian component. '
                     'It should be one of main, contrib, or non-free.'
                     % options.component)

    if options.lp and options.uploader_name:
        parser.error('Uploader name can only be overridden using --no-lp.')
    if options.lp and options.uploader_email:
        parser.error('Uploader email address can only be overridden using '
                     '--no-lp.')
    # --key, --dont-sign, --debian-mirror, and --ubuntu-mirror are just
    # ignored with options.lp, and do not require warnings.

    if options.lp:
        if args[0].endswith('.dsc'):
            parser.error('.dsc files can only be synced using --no-lp.')

    return (options, args[0])


def main():
    '''Handle parameters and get the ball rolling

    Parses the command line, fills unset options from the configuration,
    logs in to Launchpad, checks the sync blacklist and then either copies
    the package through Launchpad or prepares a local sync.
    '''
    (options, package) = parse()

    Logger.verbose = options.verbose
    config = UDTConfig(options.no_conf)
    if options.debian_mirror is None:
        options.debian_mirror = config.get_value('DEBIAN_MIRROR')
    if options.ubuntu_mirror is None:
        options.ubuntu_mirror = config.get_value('UBUNTU_MIRROR')

    if options.lpinstance is None:
        options.lpinstance = config.get_value('LPINSTANCE')

    try:
        # devel for copyPackage and changelogUrl
        Launchpad.login(service=options.lpinstance, api_version='devel')
    except IOError:
        sys.exit(1)

    if options.release is None:
        ubuntu = Launchpad.distributions["ubuntu"]
        options.release = ubuntu.current_series.name

    if not options.fakesync and not options.lp:
        Logger.warn("The use of --no-lp is not recommended for uploads "
                    "targeted at Ubuntu. "
                    "The archive-admins discourage its use, except for "
                    "fakesyncs.")

    sponsoree = None
    if options.sponsoree:
        try:
            sponsoree = PersonTeam(options.sponsoree)
        except KeyError:
            Logger.error('Cannot find the username "%s" in Launchpad.',
                         options.sponsoree)
            sys.exit(1)

    if sponsoree and options.uploader_name is None:
        options.uploader_name = sponsoree.display_name
    elif options.uploader_name is None:
        options.uploader_name = ubu_email(export=False)[0]

    if sponsoree and options.uploader_email is None:
        try:
            options.uploader_email = sponsoree.preferred_email_address.email
        except ValueError:
            # Only fatal for local syncs; with options.lp the missing
            # address is ignored here.
            if not options.lp:
                Logger.error("%s doesn't have a publicly visible e-mail "
                             "address in LP, please provide one "
                             "--uploader-email option",
                             sponsoree.display_name)
                sys.exit(1)
    elif options.uploader_email is None:
        options.uploader_email = ubu_email(export=False)[1]

    src_pkg = fetch_source_pkg(package, options.distribution,
                               options.debian_version,
                               options.component,
                               options.release.split("-")[0],
                               options.debian_mirror)

    blacklisted, comments = is_blacklisted(src_pkg.source)
    blacklist_fail = False
    if blacklisted:
        messages = []

        if blacklisted == 'CURRENT':
            Logger.debug("Source package %s is temporarily blacklisted "
                         "(blacklisted_current). "
                         "Ubuntu ignores these for now. "
                         "See also LP: #841372", src_pkg.source)
        else:
            if options.fakesync:
                messages += ["Doing a fakesync, overriding blacklist."]
            else:
                blacklist_fail = True
                messages += ["If this package needs a fakesync, "
                             "use --fakesync",
                             "If you think this package shouldn't be "
                             "blacklisted, please file a bug explaining your "
                             "reasoning and subscribe ~ubuntu-archive."]

        if blacklist_fail:
            Logger.error(u"Source package %s is blacklisted.", src_pkg.source)
        elif blacklisted == 'ALWAYS':
            Logger.normal(u"Source package %s is blacklisted.", src_pkg.source)
        if messages:
            for message in messages:
                for line in textwrap.wrap(message):
                    Logger.normal(line)

    if comments:
        Logger.normal("Blacklist Comments:")
        for comment in comments:
            for line in textwrap.wrap(comment):
                Logger.normal(u" " + line)

    if blacklist_fail:
        sys.exit(1)

    if options.lp:
        copy(src_pkg, options.release, options.bugs, sponsoree,
             options.simulate, options.force)
    else:
        os.environ['DEB_VENDOR'] = 'Ubuntu'
        sync_dsc(src_pkg, options.distribution, options.release,
                 options.uploader_name, options.uploader_email, options.bugs,
                 options.ubuntu_mirror, options.keyid, options.simulate,
                 options.force, options.fakesync)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        Logger.normal('User abort.')