#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (C) 2008-2010 Martin Pitt , # 2010 Benjamin Drung , # 2010-2011 Stefano Rivera # # ################################################################## # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # See file /usr/share/common-licenses/GPL-3 for more details. # # ################################################################## import argparse import fnmatch import logging import os import shutil import subprocess import sys import textwrap import urllib.request from lazr.restfulclient.errors import HTTPError from ubuntutools import getLogger from ubuntutools.archive import DebianSourcePackage, DownloadError, UbuntuSourcePackage from ubuntutools.config import UDTConfig, ubu_email from ubuntutools.lp import udtexceptions from ubuntutools.lp.lpapicache import ( Distribution, Launchpad, PersonTeam, SourcePackagePublishingHistory, ) from ubuntutools.misc import split_release_pocket from ubuntutools.question import YesNoQuestion from ubuntutools.requestsync.lp import get_debian_srcpkg, get_ubuntu_srcpkg from ubuntutools.requestsync.mail import get_debian_srcpkg as requestsync_mail_get_debian_srcpkg from ubuntutools.version import Version Logger = getLogger() cached_sync_blocklist = None def remove_signature(dscname): """Removes the signature from a .dsc file if the .dsc file is signed.""" dsc_file = open(dscname, encoding="utf-8") if dsc_file.readline().strip() == "-----BEGIN PGP SIGNED MESSAGE-----": unsigned_file = [] # search until begin of body found for line in dsc_file: if line.strip() == "": break # search for end of body for line in dsc_file: if line.strip() == "": break unsigned_file.append(line) dsc_file.close() dsc_file = open(dscname, "w", encoding="utf-8") dsc_file.writelines(unsigned_file) dsc_file.close() def add_fixed_bugs(changes, bugs): """Add additional Launchpad bugs to the list of fixed bugs in changes file.""" changes = [line for line in changes.split("\n") if line.strip() != ""] # Remove duplicates bugs = set(str(bug) for bug in bugs) for i, change in enumerate(changes): if change.startswith("Launchpad-Bugs-Fixed:"): bugs.update(changes[i][22:].strip().split(" ")) changes[i] = f"Launchpad-Bugs-Fixed: {' '.join(bugs)}" break if i == len(changes) - 1: # Launchpad-Bugs-Fixed entry does not exist in changes file line = f"Launchpad-Bugs-Fixed: {' '.join(bugs)}" changes.append(line) return "\n".join(changes + [""]) def sync_dsc( src_pkg, debian_dist, release, name, email, bugs, ubuntu_mirror, keyid=None, simulate=False, force=False, fakesync=False, ): """Local sync, trying to emulate sync-source.py Grabs a source package, replaces the .orig.tar with the one from Ubuntu, if necessary, writes a sync-appropriate .changes file, and signs it. """ uploader = name + " <" + email + ">" new_ver = Version(src_pkg.dsc["Version"]) try: ubuntu_series, ubuntu_pocket = split_release_pocket(release) ubuntu_source = get_ubuntu_srcpkg(src_pkg.source, ubuntu_series, ubuntu_pocket) ubuntu_ver = Version(ubuntu_source.getVersion()) ubu_pkg = UbuntuSourcePackage( src_pkg.source, ubuntu_ver.full_version, ubuntu_source.getComponent(), mirrors=[ubuntu_mirror], ) need_orig = ubuntu_ver.upstream_version != new_ver.upstream_version except udtexceptions.PackageNotFoundException: ubuntu_ver = Version("~") ubu_pkg = None need_orig = True Logger.info("%s does not exist in Ubuntu.", name) Logger.debug( "Source %s: current version %s, new version %s", src_pkg.source, ubuntu_ver, new_ver ) Logger.debug("Needs source tarball: %s", str(need_orig)) cur_ver = ubuntu_ver.get_related_debian_version() if ubuntu_ver.is_modified_in_ubuntu(): if not force: Logger.error("--force is required to discard Ubuntu changes.") return None Logger.warning( "Overwriting modified Ubuntu version %s, setting current version to %s", ubuntu_ver.full_version, cur_ver.full_version, ) if simulate: return try: src_pkg.pull() except DownloadError as e: Logger.error("Failed to download: %s", str(e)) return None src_pkg.unpack() needs_fakesync = not (need_orig or ubu_pkg.verify_orig()) if needs_fakesync and fakesync: Logger.warning("Performing a fakesync") elif not needs_fakesync and fakesync: Logger.error("Fakesync not required, aborting.") return None elif needs_fakesync and not fakesync: Logger.error( "The checksums of the Debian and Ubuntu packages " "mismatch. A fake sync using --fakesync is required." ) return None if fakesync: # Download Ubuntu files (override Debian source tarballs) try: ubu_pkg.pull() except DownloadError as e: Logger.error("Failed to download: %s", str(e)) return None # change into package directory directory = src_pkg.source + "-" + new_ver.upstream_version Logger.debug("cd %s", directory) os.chdir(directory) # read Debian distribution from debian/changelog if not specified if debian_dist is None: line = open("debian/changelog", encoding="utf-8").readline() debian_dist = line.split(" ")[2].strip(";") if not fakesync: # create the changes file changes_filename = f"{src_pkg.source}_{new_ver.strip_epoch()}_source.changes" cmd = [ "dpkg-genchanges", "-S", "-v" + cur_ver.full_version, "-DDistribution=" + release, "-DOrigin=debian/" + debian_dist, "-e" + uploader, ] if need_orig: cmd.append("-sa") else: cmd.append("-sd") if not Logger.isEnabledFor(logging.DEBUG): cmd += ["-q"] Logger.debug("%s> ../%s", " ".join(cmd), changes_filename) changes = subprocess.check_output(cmd, encoding="utf-8") # Add additional bug numbers if len(bugs) > 0: changes = add_fixed_bugs(changes, bugs) # remove extracted (temporary) files Logger.debug("cd ..") os.chdir("..") shutil.rmtree(directory, True) # write changes file changes_file = open(changes_filename, "w", encoding="utf-8") changes_file.writelines(changes) changes_file.close() # remove signature and sign package remove_signature(src_pkg.dsc_name) if keyid is not False: cmd = ["debsign", changes_filename] if keyid is not None: cmd.insert(1, "-k" + keyid) Logger.debug(" ".join(cmd)) subprocess.check_call(cmd) else: # Create fakesync changelog entry new_ver = Version(new_ver.full_version + "fakesync1") changes_filename = f"{src_pkg.source}_{new_ver.strip_epoch()}_source.changes" if len(bugs) > 0: bug_numbers = [f"#{b}" for b in bugs] message = f"Fake sync due to mismatching orig tarball (LP: {', '.join(bug_numbers)})." else: message = "Fake sync due to mismatching orig tarball." cmd = ["dch", "-v", new_ver.full_version, "--force-distribution", "-D", release, message] env = {"DEBFULLNAME": name, "DEBEMAIL": email} Logger.debug(" ".join(cmd)) subprocess.check_call(cmd, env=env) # update the Maintainer field cmd = ["update-maintainer"] if not Logger.isEnabledFor(logging.DEBUG): cmd.append("-q") Logger.debug(" ".join(cmd)) subprocess.check_call(cmd) # Build source package cmd = ["debuild", "--no-lintian", "-nc", "-S", "-v" + cur_ver.full_version] if need_orig: cmd += ["-sa"] if keyid: cmd += ["-k" + keyid] Logger.debug(" ".join(cmd)) returncode = subprocess.call(cmd) if returncode != 0: Logger.error("Source-only build with debuild failed. Please check build log above.") return None def fetch_source_pkg(package, dist, version, component, ubuntu_release, mirror): """Download the specified source package. dist, version, component, mirror can all be None. """ if mirror is None: mirrors = [] else: mirrors = [mirror] if package.endswith(".dsc"): return DebianSourcePackage(dscfile=package, mirrors=mirrors) if dist is None: dist = "unstable" requested_version = version if isinstance(version, str): version = Version(version) if version is None or component is None: try: debian_srcpkg = get_debian_srcpkg(package, dist) except ( udtexceptions.PackageNotFoundException, udtexceptions.SeriesNotFoundException, ) as e: Logger.error(str(e)) return None if version is None: version = Version(debian_srcpkg.getVersion()) try: ubuntu_series, ubuntu_pocket = split_release_pocket(ubuntu_release) ubuntu_srcpkg = get_ubuntu_srcpkg(package, ubuntu_series, ubuntu_pocket) ubuntu_version = Version(ubuntu_srcpkg.getVersion()) except udtexceptions.PackageNotFoundException: ubuntu_version = Version("~") except udtexceptions.SeriesNotFoundException as e: Logger.error(str(e)) return None if ubuntu_version >= version: # The LP importer is maybe out of date debian_srcpkg = requestsync_mail_get_debian_srcpkg(package, dist) if requested_version is None: version = Version(debian_srcpkg.getVersion()) if ubuntu_version >= version: Logger.error( "Version in Debian %s (%s) isn't newer than Ubuntu %s (%s)", version, dist, ubuntu_version, ubuntu_release, ) return None if component is None: component = debian_srcpkg.getComponent() assert component in ("main", "contrib", "non-free", "non-free-firmware") return DebianSourcePackage(package, version.full_version, component, mirrors=mirrors) def copy(src_pkg, release, bugs, sponsoree=None, simulate=False, force=False, yes=False): """Copy a source package from Debian to Ubuntu using the Launchpad API.""" ubuntu = Distribution("ubuntu") debian_archive = Distribution("debian").getArchive() ubuntu_archive = ubuntu.getArchive() if release is None: ubuntu_series = ubuntu.getDevelopmentSeries().name ubuntu_pocket = "Release" else: ubuntu_series, ubuntu_pocket = split_release_pocket(release) # Ensure that the provided Debian version actually exists. try: debian_spph = SourcePackagePublishingHistory( debian_archive.getPublishedSources( source_name=src_pkg.source, version=src_pkg.version.full_version, exact_match=True )[0] ) except IndexError: Logger.error( "Debian version %s has not been picked up by LP yet. Please try again later.", src_pkg.version, ) return None try: ubuntu_spph = get_ubuntu_srcpkg(src_pkg.source, ubuntu_series, ubuntu_pocket) ubuntu_pkg = UbuntuSourcePackage( src_pkg.source, ubuntu_spph.getVersion(), ubuntu_spph.getComponent(), mirrors=[] ) Logger.info( "Source %s -> %s/%s: current version %s, new version %s", src_pkg.source, ubuntu_series, ubuntu_pocket, ubuntu_pkg.version, src_pkg.version, ) ubuntu_version = Version(ubuntu_pkg.version.full_version) base_version = ubuntu_version.get_related_debian_version() if not force and ubuntu_version.is_modified_in_ubuntu(): Logger.error("--force is required to discard Ubuntu changes.") return None # Check whether a fakesync would be required. if not src_pkg.dsc.compare_dsc(ubuntu_pkg.dsc): Logger.error( "The checksums of the Debian and Ubuntu packages " "mismatch. A fake sync using --fakesync is required." ) return None except udtexceptions.PackageNotFoundException: base_version = Version("~") Logger.info( "Source %s -> %s/%s: not in Ubuntu, new version %s", src_pkg.source, ubuntu_series, ubuntu_pocket, src_pkg.version, ) changes = debian_spph.getChangelog(since_version=base_version) if changes: changes = changes.strip() Logger.info("New changes:\n%s", changes) if simulate: return if sponsoree: Logger.info("Sponsoring this sync for %s (%s)", sponsoree.display_name, sponsoree.name) if not yes: answer = YesNoQuestion().ask("Sync this package", "no") if answer != "yes": return try: ubuntu_archive.copyPackage( source_name=src_pkg.source, version=src_pkg.version.full_version, from_archive=debian_archive, to_series=ubuntu_series, to_pocket=ubuntu_pocket, include_binaries=False, sponsored=sponsoree, ) except HTTPError as error: Logger.error("HTTP Error %s: %s", error.response.status, error.response.reason) Logger.error(error.content) return None Logger.info("Request succeeded; you should get an e-mail once it is processed.") bugs = sorted(set(bugs)) if bugs: Logger.info("Launchpad bugs to be closed: %s", ", ".join(str(bug) for bug in bugs)) Logger.info("Please wait for the sync to be successful before closing bugs.") if yes: close_bugs(bugs, src_pkg.source, src_pkg.version.full_version, changes, sponsoree) else: answer = YesNoQuestion().ask("Close bugs", "yes") if answer == "yes": close_bugs(bugs, src_pkg.source, src_pkg.version.full_version, changes, sponsoree) def is_blocklisted(query): """Determine if package "query" is in the sync blocklist Returns tuple of (blocklisted, comments) blocklisted is one of False, 'CURRENT', 'ALWAYS' """ series = Launchpad.distributions["ubuntu"].current_series lp_comments = series.getDifferenceComments(source_package_name=query) blocklisted = False comments = [ f"{c.body_text}\n -- {c.comment_author.name}" f" {c.comment_date.strftime('%a, %d %b %Y %H:%M:%S +0000')}" for c in lp_comments ] for diff in series.getDifferencesTo(source_package_name_filter=query): if diff.status == "Blacklisted current version" and blocklisted != "ALWAYS": blocklisted = "CURRENT" if diff.status == "Blacklisted always": blocklisted = "ALWAYS" global cached_sync_blocklist if not cached_sync_blocklist: url = "https://ubuntu-archive-team.ubuntu.com/sync-blocklist.txt" try: with urllib.request.urlopen(url) as f: cached_sync_blocklist = f.read().decode("utf-8") except: print("WARNING: unable to download the sync blocklist. Erring on the side of caution.") return ("ALWAYS", "INTERNAL ERROR: Unable to fetch sync blocklist") applicable_lines = [] for line in cached_sync_blocklist.splitlines(): if not line.strip(): applicable_lines = [] continue applicable_lines.append(line) try: line = line[:line.index("#")] except ValueError: pass source = line.strip() if source and fnmatch.fnmatch(query, source): comments += ["From sync-blocklist.txt:"] + applicable_lines blocklisted = "ALWAYS" break return (blocklisted, comments) def close_bugs(bugs, package, version, changes, sponsoree): """Close the correct task on all bugs, with changes""" ubuntu = Launchpad.distributions["ubuntu"] message = f"This bug was fixed in the package {package} - {version}" if sponsoree: message += f"\nSponsored for {sponsoree.display_name} ({sponsoree.name})" if changes: message += "\n\n---------------\n" + changes for bug in bugs: bug = Launchpad.bugs[bug] if bug.duplicate_of is not None: bug = bug.duplicate_of for task in bug.bug_tasks: target = task.target if target == ubuntu or ( target.name == package and getattr(target, "distribution", None) == ubuntu ): if task.status != "Fix Released": Logger.info("Closed bug %s", task.web_link) task.status = "Fix Released" task.lp_save() bug.newMessage(content=message) break else: Logger.error("Cannot find any tasks on LP: #%i to close.", bug.id) def parse(): """Parse given command-line parameters.""" usage = "%(prog)s [options] <.dsc URL/path or package name(s)>" epilog = f"See {os.path.basename(sys.argv[0])}(1) for more info." parser = argparse.ArgumentParser(usage=usage, epilog=epilog) parser.add_argument( "-y", "--yes", action="store_true", help="Automatically sync without prompting. Use with caution and care." ) parser.add_argument("-d", "--distribution", help="Debian distribution to sync from.") parser.add_argument("-r", "--release", help="Specify target Ubuntu release.") parser.add_argument("-V", "--debian-version", help="Specify the version to sync from.") parser.add_argument("-c", "--component", help="Specify the Debian component to sync from.") parser.add_argument( "-b", "--bug", metavar="BUG", dest="bugs", action="append", default=[], help="Mark Launchpad bug BUG as being fixed by this upload.", ) parser.add_argument( "-s", "--sponsor", metavar="USERNAME", dest="sponsoree", help="Sponsor the sync for USERNAME (a Launchpad username).", ) parser.add_argument( "-v", "--verbose", action="store_true", help="Display more progress information." ) parser.add_argument( "-F", "--fakesync", action="store_true", help="Perform a fakesync (a sync where Debian and " "Ubuntu have a .orig.tar mismatch). " "This implies --no-lp and will leave a signed " ".changes file for you to upload.", ) parser.add_argument( "-f", "--force", action="store_true", help="Force sync over the top of Ubuntu changes." ) parser.add_argument( "--no-conf", action="store_true", help="Don't read config files or environment variables." ) parser.add_argument( "-l", "--lpinstance", metavar="INSTANCE", help="Launchpad instance to connect to (default: production).", ) parser.add_argument( "--simulate", action="store_true", help="Show what would be done, but don't actually do it.", ) no_lp = parser.add_argument_group( "Local sync preparation options", "Options that only apply when using --no-lp. " "WARNING: The use of --no-lp is not recommended for uploads " "targeted at Ubuntu. " "The archive-admins discourage its use, except for fakesyncs.", ) no_lp.add_argument( "--no-lp", dest="lp", action="store_false", help="Construct sync locally, rather than letting " "Launchpad copy the package directly. " "It will leave a signed .changes file for you to " "upload.", ) no_lp.add_argument( "-n", "--uploader-name", help="Use UPLOADER_NAME as the name of the maintainer for this upload.", ) no_lp.add_argument( "-e", "--uploader-email", help="Use UPLOADER_EMAIL as email address of the maintainer for this upload.", ) no_lp.add_argument( "-k", "--key", dest="keyid", help="Specify the key ID to be used for signing." ) no_lp.add_argument( "--dont-sign", dest="keyid", action="store_false", help="Do not sign the upload." ) no_lp.add_argument( "-D", "--debian-mirror", metavar="DEBIAN_MIRROR", help=f"Preferred Debian mirror (default: {UDTConfig.defaults['DEBIAN_MIRROR']})", ) no_lp.add_argument( "-U", "--ubuntu-mirror", metavar="UBUNTU_MIRROR", help=f"Preferred Ubuntu mirror (default: {UDTConfig.defaults['UBUNTU_MIRROR']})", ) parser.add_argument("package", nargs="*", help=argparse.SUPPRESS) args = parser.parse_args() if args.fakesync: args.lp = False try: args.bugs = [int(b) for b in args.bugs] except TypeError: parser.error("Invalid bug number(s) specified.") if args.component not in (None, "main", "contrib", "non-free", "non-free-firmware"): parser.error( f"{args.component} is not a valid Debian component. " f"It should be one of main, contrib, non-free, or non-free-firmware." ) if args.lp and args.uploader_name: parser.error("Uploader name can only be overridden using --no-lp.") if args.lp and args.uploader_email: parser.error("Uploader email address can only be overridden using --no-lp.") # --key, --dont-sign, --debian-mirror, and --ubuntu-mirror are just # ignored with args.lp, and do not require warnings. if args.lp: for package in args.package: if package.endswith(".dsc"): parser.error(".dsc files can only be synced using --no-lp.") return args def main(): """Handle parameters and get the ball rolling""" args = parse() if args.verbose: Logger.setLevel("DEBUG") config = UDTConfig(args.no_conf) if args.debian_mirror is None: args.debian_mirror = config.get_value("DEBIAN_MIRROR") if args.ubuntu_mirror is None: args.ubuntu_mirror = config.get_value("UBUNTU_MIRROR") if args.keyid is None: args.keyid = config.get_value("KEYID") if args.lpinstance is None: args.lpinstance = config.get_value("LPINSTANCE") # devel for copyPackage and changelogUrl kwargs = {"service": args.lpinstance, "api_version": "devel"} try: if args.lp and not args.simulate: Launchpad.login(**kwargs) else: Launchpad.login_anonymously(**kwargs) except IOError as e: Logger.error("Could not authenticate to LP: %s", str(e)) sys.exit(1) if args.release is None: ubuntu = Launchpad.distributions["ubuntu"] args.release = f"{ubuntu.current_series.name}-proposed" if not args.fakesync and not args.lp: Logger.warning( "The use of --no-lp is not recommended for uploads " "targeted at Ubuntu. " "The archive-admins discourage its use, except for " "fakesyncs." ) sponsoree = None if args.sponsoree: try: sponsoree = PersonTeam(args.sponsoree) except KeyError: Logger.error('Cannot find the username "%s" in Launchpad.', args.sponsoree) sys.exit(1) if sponsoree and args.uploader_name is None: args.uploader_name = sponsoree.display_name elif args.uploader_name is None: args.uploader_name = ubu_email(export=False)[0] if sponsoree and args.uploader_email is None: try: args.uploader_email = sponsoree.preferred_email_address.email except ValueError: if not args.lp: Logger.error( "%s doesn't have a publicly visible e-mail " "address in LP, please provide one " "--uploader-email option", sponsoree.display_name, ) sys.exit(1) elif args.uploader_email is None: args.uploader_email = ubu_email(export=False)[1] for package in args.package: src_pkg = fetch_source_pkg( package, args.distribution, args.debian_version, args.component, args.release, args.debian_mirror, ) if not src_pkg: continue blocklisted, comments = is_blocklisted(src_pkg.source) blocklist_fail = False if blocklisted: messages = [] if blocklisted == "CURRENT": Logger.debug( "Source package %s is temporarily blocklisted " "(blocklisted_current). " "Ubuntu ignores these for now. " "See also LP: #841372", src_pkg.source, ) else: if args.fakesync: messages += ["Doing a fakesync, overriding blocklist."] else: blocklist_fail = True messages += [ "If this package needs a fakesync, use --fakesync", "If you think this package shouldn't be " "blocklisted, please file a bug explaining your " "reasoning and subscribe ~ubuntu-archive.", ] if blocklist_fail: Logger.error("Source package %s is blocklisted.", src_pkg.source) elif blocklisted == "ALWAYS": Logger.info("Source package %s is blocklisted.", src_pkg.source) if messages: for message in messages: for line in textwrap.wrap(message): Logger.info(line) if comments: Logger.info("Blacklist Comments:") for comment in comments: for line in textwrap.wrap(comment): Logger.info(" %s", line) if blocklist_fail: continue if args.lp: if not copy(src_pkg, args.release, args.bugs, sponsoree, args.simulate, args.force, args.yes): continue else: os.environ["DEB_VENDOR"] = "Ubuntu" if not sync_dsc( src_pkg, args.distribution, args.release, args.uploader_name, args.uploader_email, args.bugs, args.ubuntu_mirror, args.keyid, args.simulate, args.force, args.fakesync, ): continue if __name__ == "__main__": try: main() except KeyboardInterrupt: Logger.info("User abort.")