#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (C) 2008-2010 Martin Pitt , # 2010 Benjamin Drung , # 2010-2011 Stefano Rivera # # ################################################################## # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # See file /usr/share/common-licenses/GPL-3 for more details. # # ################################################################## import argparse import fnmatch import logging import os import shutil import subprocess import sys import textwrap import urllib.request from lazr.restfulclient.errors import HTTPError from ubuntutools import getLogger from ubuntutools.archive import DebianSourcePackage, DownloadError, UbuntuSourcePackage from ubuntutools.config import UDTConfig, ubu_email from ubuntutools.lp import udtexceptions from ubuntutools.lp.lpapicache import ( Distribution, Launchpad, PersonTeam, SourcePackagePublishingHistory, ) from ubuntutools.misc import split_release_pocket from ubuntutools.question import YesNoQuestion from ubuntutools.requestsync.lp import get_debian_srcpkg, get_ubuntu_srcpkg from ubuntutools.requestsync.mail import get_debian_srcpkg as requestsync_mail_get_debian_srcpkg from ubuntutools.version import Version Logger = getLogger() def remove_signature(dscname): """Removes the signature from a .dsc file if the .dsc file is signed.""" dsc_file = open(dscname, encoding="utf-8") if dsc_file.readline().strip() == "-----BEGIN PGP SIGNED MESSAGE-----": unsigned_file = [] # search until begin of body found for line in dsc_file: if line.strip() == "": break # search for end of body for line in dsc_file: if line.strip() == "": break unsigned_file.append(line) dsc_file.close() dsc_file = open(dscname, "w", encoding="utf-8") dsc_file.writelines(unsigned_file) dsc_file.close() def add_fixed_bugs(changes, bugs): """Add additional Launchpad bugs to the list of fixed bugs in changes file.""" changes = [line for line in changes.split("\n") if line.strip() != ""] # Remove duplicates bugs = set(str(bug) for bug in bugs) for i, change in enumerate(changes): if change.startswith("Launchpad-Bugs-Fixed:"): bugs.update(changes[i][22:].strip().split(" ")) changes[i] = f"Launchpad-Bugs-Fixed: {' '.join(bugs)}" break if i == len(changes) - 1: # Launchpad-Bugs-Fixed entry does not exist in changes file line = f"Launchpad-Bugs-Fixed: {' '.join(bugs)}" changes.append(line) return "\n".join(changes + [""]) def sync_dsc( src_pkg, debian_dist, release, name, email, bugs, ubuntu_mirror, keyid=None, simulate=False, force=False, fakesync=False, ): """Local sync, trying to emulate sync-source.py Grabs a source package, replaces the .orig.tar with the one from Ubuntu, if necessary, writes a sync-appropriate .changes file, and signs it. """ uploader = name + " <" + email + ">" new_ver = Version(src_pkg.dsc["Version"]) try: ubuntu_series, ubuntu_pocket = split_release_pocket(release) ubuntu_source = get_ubuntu_srcpkg(src_pkg.source, ubuntu_series, ubuntu_pocket) ubuntu_ver = Version(ubuntu_source.getVersion()) ubu_pkg = UbuntuSourcePackage( src_pkg.source, ubuntu_ver.full_version, ubuntu_source.getComponent(), mirrors=[ubuntu_mirror], ) need_orig = ubuntu_ver.upstream_version != new_ver.upstream_version except udtexceptions.PackageNotFoundException: ubuntu_ver = Version("~") ubu_pkg = None need_orig = True Logger.info("%s does not exist in Ubuntu.", name) Logger.debug( "Source %s: current version %s, new version %s", src_pkg.source, ubuntu_ver, new_ver ) Logger.debug("Needs source tarball: %s", str(need_orig)) cur_ver = ubuntu_ver.get_related_debian_version() if ubuntu_ver.is_modified_in_ubuntu(): if not force: Logger.error("--force is required to discard Ubuntu changes.") sys.exit(1) Logger.warning( "Overwriting modified Ubuntu version %s, setting current version to %s", ubuntu_ver.full_version, cur_ver.full_version, ) if simulate: return try: src_pkg.pull() except DownloadError as e: Logger.error("Failed to download: %s", str(e)) sys.exit(1) src_pkg.unpack() needs_fakesync = not (need_orig or ubu_pkg.verify_orig()) if needs_fakesync and fakesync: Logger.warning("Performing a fakesync") elif not needs_fakesync and fakesync: Logger.error("Fakesync not required, aborting.") sys.exit(1) elif needs_fakesync and not fakesync: Logger.error( "The checksums of the Debian and Ubuntu packages " "mismatch. A fake sync using --fakesync is required." ) sys.exit(1) if fakesync: # Download Ubuntu files (override Debian source tarballs) try: ubu_pkg.pull() except DownloadError as e: Logger.error("Failed to download: %s", str(e)) sys.exit(1) # change into package directory directory = src_pkg.source + "-" + new_ver.upstream_version Logger.debug("cd %s", directory) os.chdir(directory) # read Debian distribution from debian/changelog if not specified if debian_dist is None: line = open("debian/changelog", encoding="utf-8").readline() debian_dist = line.split(" ")[2].strip(";") if not fakesync: # create the changes file changes_filename = f"{src_pkg.source}_{new_ver.strip_epoch()}_source.changes" cmd = [ "dpkg-genchanges", "-S", "-v" + cur_ver.full_version, "-DDistribution=" + release, "-DOrigin=debian/" + debian_dist, "-e" + uploader, ] if need_orig: cmd.append("-sa") else: cmd.append("-sd") if not Logger.isEnabledFor(logging.DEBUG): cmd += ["-q"] Logger.debug("%s> ../%s", " ".join(cmd), changes_filename) changes = subprocess.check_output(cmd, encoding="utf-8") # Add additional bug numbers if len(bugs) > 0: changes = add_fixed_bugs(changes, bugs) # remove extracted (temporary) files Logger.debug("cd ..") os.chdir("..") shutil.rmtree(directory, True) # write changes file changes_file = open(changes_filename, "w", encoding="utf-8") changes_file.writelines(changes) changes_file.close() # remove signature and sign package remove_signature(src_pkg.dsc_name) if keyid is not False: cmd = ["debsign", changes_filename] if keyid is not None: cmd.insert(1, "-k" + keyid) Logger.debug(" ".join(cmd)) subprocess.check_call(cmd) else: # Create fakesync changelog entry new_ver = Version(new_ver.full_version + "fakesync1") changes_filename = f"{src_pkg.source}_{new_ver.strip_epoch()}_source.changes" if len(bugs) > 0: bug_numbers = [f"#{b}" for b in bugs] message = f"Fake sync due to mismatching orig tarball (LP: {', '.join(bug_numbers)})." else: message = "Fake sync due to mismatching orig tarball." cmd = ["dch", "-v", new_ver.full_version, "--force-distribution", "-D", release, message] env = {"DEBFULLNAME": name, "DEBEMAIL": email} Logger.debug(" ".join(cmd)) subprocess.check_call(cmd, env=env) # update the Maintainer field cmd = ["update-maintainer"] if not Logger.isEnabledFor(logging.DEBUG): cmd.append("-q") Logger.debug(" ".join(cmd)) subprocess.check_call(cmd) # Build source package cmd = ["debuild", "--no-lintian", "-nc", "-S", "-v" + cur_ver.full_version] if need_orig: cmd += ["-sa"] if keyid: cmd += ["-k" + keyid] Logger.debug(" ".join(cmd)) returncode = subprocess.call(cmd) if returncode != 0: Logger.error("Source-only build with debuild failed. Please check build log above.") sys.exit(1) def fetch_source_pkg(package, dist, version, component, ubuntu_release, mirror): """Download the specified source package. dist, version, component, mirror can all be None. """ if mirror is None: mirrors = [] else: mirrors = [mirror] if package.endswith(".dsc"): return DebianSourcePackage(dscfile=package, mirrors=mirrors) if dist is None: dist = "unstable" requested_version = version if isinstance(version, str): version = Version(version) if version is None or component is None: try: debian_srcpkg = get_debian_srcpkg(package, dist) except ( udtexceptions.PackageNotFoundException, udtexceptions.SeriesNotFoundException, ) as e: Logger.error(str(e)) sys.exit(1) if version is None: version = Version(debian_srcpkg.getVersion()) try: ubuntu_series, ubuntu_pocket = split_release_pocket(ubuntu_release) ubuntu_srcpkg = get_ubuntu_srcpkg(package, ubuntu_series, ubuntu_pocket) ubuntu_version = Version(ubuntu_srcpkg.getVersion()) except udtexceptions.PackageNotFoundException: ubuntu_version = Version("~") except udtexceptions.SeriesNotFoundException as e: Logger.error(str(e)) sys.exit(1) if ubuntu_version >= version: # The LP importer is maybe out of date debian_srcpkg = requestsync_mail_get_debian_srcpkg(package, dist) if requested_version is None: version = Version(debian_srcpkg.getVersion()) if ubuntu_version >= version: Logger.error( "Version in Debian %s (%s) isn't newer than Ubuntu %s (%s)", version, dist, ubuntu_version, ubuntu_release, ) sys.exit(1) if component is None: component = debian_srcpkg.getComponent() assert component in ("main", "contrib", "non-free", "non-free-firmware") return DebianSourcePackage(package, version.full_version, component, mirrors=mirrors) def copy(src_pkg, release, bugs, sponsoree=None, simulate=False, force=False): """Copy a source package from Debian to Ubuntu using the Launchpad API.""" ubuntu = Distribution("ubuntu") debian_archive = Distribution("debian").getArchive() ubuntu_archive = ubuntu.getArchive() if release is None: ubuntu_series = ubuntu.getDevelopmentSeries().name ubuntu_pocket = "Release" else: ubuntu_series, ubuntu_pocket = split_release_pocket(release) # Ensure that the provided Debian version actually exists. try: debian_spph = SourcePackagePublishingHistory( debian_archive.getPublishedSources( source_name=src_pkg.source, version=src_pkg.version.full_version, exact_match=True )[0] ) except IndexError: Logger.error( "Debian version %s has not been picked up by LP yet. Please try again later.", src_pkg.version, ) sys.exit(1) try: ubuntu_spph = get_ubuntu_srcpkg(src_pkg.source, ubuntu_series, ubuntu_pocket) ubuntu_pkg = UbuntuSourcePackage( src_pkg.source, ubuntu_spph.getVersion(), ubuntu_spph.getComponent(), mirrors=[] ) Logger.info( "Source %s -> %s/%s: current version %s, new version %s", src_pkg.source, ubuntu_series, ubuntu_pocket, ubuntu_pkg.version, src_pkg.version, ) ubuntu_version = Version(ubuntu_pkg.version.full_version) base_version = ubuntu_version.get_related_debian_version() if not force and ubuntu_version.is_modified_in_ubuntu(): Logger.error("--force is required to discard Ubuntu changes.") sys.exit(1) # Check whether a fakesync would be required. if not src_pkg.dsc.compare_dsc(ubuntu_pkg.dsc): Logger.error( "The checksums of the Debian and Ubuntu packages " "mismatch. A fake sync using --fakesync is required." ) sys.exit(1) except udtexceptions.PackageNotFoundException: base_version = Version("~") Logger.info( "Source %s -> %s/%s: not in Ubuntu, new version %s", src_pkg.source, ubuntu_series, ubuntu_pocket, src_pkg.version, ) changes = debian_spph.getChangelog(since_version=base_version) if changes: changes = changes.strip() Logger.info("New changes:\n%s", changes) if simulate: return if sponsoree: Logger.info("Sponsoring this sync for %s (%s)", sponsoree.display_name, sponsoree.name) answer = YesNoQuestion().ask("Sync this package", "no") if answer != "yes": return try: ubuntu_archive.copyPackage( source_name=src_pkg.source, version=src_pkg.version.full_version, from_archive=debian_archive, to_series=ubuntu_series, to_pocket=ubuntu_pocket, include_binaries=False, sponsored=sponsoree, ) except HTTPError as error: Logger.error("HTTP Error %s: %s", error.response.status, error.response.reason) Logger.error(error.content) sys.exit(1) Logger.info("Request succeeded; you should get an e-mail once it is processed.") bugs = sorted(set(bugs)) if bugs: Logger.info("Launchpad bugs to be closed: %s", ", ".join(str(bug) for bug in bugs)) Logger.info("Please wait for the sync to be successful before closing bugs.") answer = YesNoQuestion().ask("Close bugs", "yes") if answer == "yes": close_bugs(bugs, src_pkg.source, src_pkg.version.full_version, changes, sponsoree) def is_blacklisted(query): """Determine if package "query" is in the sync blacklist Returns tuple of (blacklisted, comments) blacklisted is one of False, 'CURRENT', 'ALWAYS' """ series = Launchpad.distributions["ubuntu"].current_series lp_comments = series.getDifferenceComments(source_package_name=query) blacklisted = False comments = [ f"{c.body_text}\n -- {c.comment_author.name}" f" {c.comment_date.strftime('%a, %d %b %Y %H:%M:%S +0000')}" for c in lp_comments ] for diff in series.getDifferencesTo(source_package_name_filter=query): if diff.status == "Blacklisted current version" and blacklisted != "ALWAYS": blacklisted = "CURRENT" if diff.status == "Blacklisted always": blacklisted = "ALWAYS" # Old blacklist: url = "https://ubuntu-archive-team.ubuntu.com/sync-blacklist.txt" with urllib.request.urlopen(url) as f: applicable_lines = [] for line in f: line = line.decode("utf-8") if not line.strip(): applicable_lines = [] continue applicable_lines.append(line) try: line = line[: line.index("#")] except ValueError: pass source = line.strip() if source and fnmatch.fnmatch(query, source): comments += ["From sync-blacklist.txt:"] + applicable_lines blacklisted = "ALWAYS" break return (blacklisted, comments) def close_bugs(bugs, package, version, changes, sponsoree): """Close the correct task on all bugs, with changes""" ubuntu = Launchpad.distributions["ubuntu"] message = f"This bug was fixed in the package {package} - {version}" if sponsoree: message += f"\nSponsored for {sponsoree.display_name} ({sponsoree.name})" if changes: message += "\n\n---------------\n" + changes for bug in bugs: bug = Launchpad.bugs[bug] if bug.duplicate_of is not None: bug = bug.duplicate_of for task in bug.bug_tasks: target = task.target if target == ubuntu or ( target.name == package and getattr(target, "distribution", None) == ubuntu ): if task.status != "Fix Released": Logger.info("Closed bug %s", task.web_link) task.status = "Fix Released" task.lp_save() bug.newMessage(content=message) break else: Logger.error("Cannot find any tasks on LP: #%i to close.", bug.id) def parse(): """Parse given command-line parameters.""" usage = "%(prog)s [options] <.dsc URL/path or package name(s)>" epilog = f"See {os.path.basename(sys.argv[0])}(1) for more info." parser = argparse.ArgumentParser(usage=usage, epilog=epilog) parser.add_argument("-d", "--distribution", help="Debian distribution to sync from.") parser.add_argument("-r", "--release", help="Specify target Ubuntu release.") parser.add_argument("-V", "--debian-version", help="Specify the version to sync from.") parser.add_argument("-c", "--component", help="Specify the Debian component to sync from.") parser.add_argument( "-b", "--bug", metavar="BUG", dest="bugs", action="append", default=[], help="Mark Launchpad bug BUG as being fixed by this upload.", ) parser.add_argument( "-s", "--sponsor", metavar="USERNAME", dest="sponsoree", help="Sponsor the sync for USERNAME (a Launchpad username).", ) parser.add_argument( "-v", "--verbose", action="store_true", help="Display more progress information." ) parser.add_argument( "-F", "--fakesync", action="store_true", help="Perform a fakesync (a sync where Debian and " "Ubuntu have a .orig.tar mismatch). " "This implies --no-lp and will leave a signed " ".changes file for you to upload.", ) parser.add_argument( "-f", "--force", action="store_true", help="Force sync over the top of Ubuntu changes." ) parser.add_argument( "--no-conf", action="store_true", help="Don't read config files or environment variables." ) parser.add_argument( "-l", "--lpinstance", metavar="INSTANCE", help="Launchpad instance to connect to (default: production).", ) parser.add_argument( "--simulate", action="store_true", help="Show what would be done, but don't actually do it.", ) no_lp = parser.add_argument_group( "Local sync preparation options", "Options that only apply when using --no-lp. " "WARNING: The use of --no-lp is not recommended for uploads " "targeted at Ubuntu. " "The archive-admins discourage its use, except for fakesyncs.", ) no_lp.add_argument( "--no-lp", dest="lp", action="store_false", help="Construct sync locally, rather than letting " "Launchpad copy the package directly. " "It will leave a signed .changes file for you to " "upload.", ) no_lp.add_argument( "-n", "--uploader-name", help="Use UPLOADER_NAME as the name of the maintainer for this upload.", ) no_lp.add_argument( "-e", "--uploader-email", help="Use UPLOADER_EMAIL as email address of the maintainer for this upload.", ) no_lp.add_argument( "-k", "--key", dest="keyid", help="Specify the key ID to be used for signing." ) no_lp.add_argument( "--dont-sign", dest="keyid", action="store_false", help="Do not sign the upload." ) no_lp.add_argument( "-D", "--debian-mirror", metavar="DEBIAN_MIRROR", help=f"Preferred Debian mirror (default: {UDTConfig.defaults['DEBIAN_MIRROR']})", ) no_lp.add_argument( "-U", "--ubuntu-mirror", metavar="UBUNTU_MIRROR", help=f"Preferred Ubuntu mirror (default: {UDTConfig.defaults['UBUNTU_MIRROR']})", ) parser.add_argument("package", nargs="*", help=argparse.SUPPRESS) args = parser.parse_args() if args.fakesync: args.lp = False try: args.bugs = [int(b) for b in args.bugs] except TypeError: parser.error("Invalid bug number(s) specified.") if args.component not in (None, "main", "contrib", "non-free", "non-free-firmware"): parser.error( f"{args.component} is not a valid Debian component. " f"It should be one of main, contrib, non-free, or non-free-firmware." ) if args.lp and args.uploader_name: parser.error("Uploader name can only be overridden using --no-lp.") if args.lp and args.uploader_email: parser.error("Uploader email address can only be overridden using --no-lp.") # --key, --dont-sign, --debian-mirror, and --ubuntu-mirror are just # ignored with args.lp, and do not require warnings. if args.lp: for package in args.package: if package.endswith(".dsc"): parser.error(".dsc files can only be synced using --no-lp.") return args def main(): """Handle parameters and get the ball rolling""" args = parse() if args.verbose: Logger.setLevel("DEBUG") config = UDTConfig(args.no_conf) if args.debian_mirror is None: args.debian_mirror = config.get_value("DEBIAN_MIRROR") if args.ubuntu_mirror is None: args.ubuntu_mirror = config.get_value("UBUNTU_MIRROR") if args.keyid is None: args.keyid = config.get_value("KEYID") if args.lpinstance is None: args.lpinstance = config.get_value("LPINSTANCE") # devel for copyPackage and changelogUrl kwargs = {"service": args.lpinstance, "api_version": "devel"} try: if args.lp and not args.simulate: Launchpad.login(**kwargs) else: Launchpad.login_anonymously(**kwargs) except IOError as e: Logger.error("Could not authenticate to LP: %s", str(e)) sys.exit(1) if args.release is None: ubuntu = Launchpad.distributions["ubuntu"] args.release = f"{ubuntu.current_series.name}-proposed" if not args.fakesync and not args.lp: Logger.warning( "The use of --no-lp is not recommended for uploads " "targeted at Ubuntu. " "The archive-admins discourage its use, except for " "fakesyncs." ) sponsoree = None if args.sponsoree: try: sponsoree = PersonTeam(args.sponsoree) except KeyError: Logger.error('Cannot find the username "%s" in Launchpad.', args.sponsoree) sys.exit(1) if sponsoree and args.uploader_name is None: args.uploader_name = sponsoree.display_name elif args.uploader_name is None: args.uploader_name = ubu_email(export=False)[0] if sponsoree and args.uploader_email is None: try: args.uploader_email = sponsoree.preferred_email_address.email except ValueError: if not args.lp: Logger.error( "%s doesn't have a publicly visible e-mail " "address in LP, please provide one " "--uploader-email option", sponsoree.display_name, ) sys.exit(1) elif args.uploader_email is None: args.uploader_email = ubu_email(export=False)[1] for package in args.package: src_pkg = fetch_source_pkg( package, args.distribution, args.debian_version, args.component, args.release, args.debian_mirror, ) blacklisted, comments = is_blacklisted(src_pkg.source) blacklist_fail = False if blacklisted: messages = [] if blacklisted == "CURRENT": Logger.debug( "Source package %s is temporarily blacklisted " "(blacklisted_current). " "Ubuntu ignores these for now. " "See also LP: #841372", src_pkg.source, ) else: if args.fakesync: messages += ["Doing a fakesync, overriding blacklist."] else: blacklist_fail = True messages += [ "If this package needs a fakesync, use --fakesync", "If you think this package shouldn't be " "blacklisted, please file a bug explaining your " "reasoning and subscribe ~ubuntu-archive.", ] if blacklist_fail: Logger.error("Source package %s is blacklisted.", src_pkg.source) elif blacklisted == "ALWAYS": Logger.info("Source package %s is blacklisted.", src_pkg.source) if messages: for message in messages: for line in textwrap.wrap(message): Logger.info(line) if comments: Logger.info("Blacklist Comments:") for comment in comments: for line in textwrap.wrap(comment): Logger.info(" %s", line) if blacklist_fail: continue if args.lp: copy(src_pkg, args.release, args.bugs, sponsoree, args.simulate, args.force) else: os.environ["DEB_VENDOR"] = "Ubuntu" sync_dsc( src_pkg, args.distribution, args.release, args.uploader_name, args.uploader_email, args.bugs, args.ubuntu_mirror, args.keyid, args.simulate, args.force, args.fakesync, ) if __name__ == "__main__": try: main() except KeyboardInterrupt: Logger.info("User abort.")