mirror of
				https://git.launchpad.net/ubuntu-dev-tools
				synced 2025-11-04 16:04:02 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			783 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			783 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
#
 | 
						|
# Copyright (C) 2008-2010 Martin Pitt <martin.pitt@canonical.com>,
 | 
						|
#               2010      Benjamin Drung <bdrung@ubuntu.com>,
 | 
						|
#               2010-2011 Stefano Rivera <stefanor@ubuntu.com>
 | 
						|
#
 | 
						|
# ##################################################################
 | 
						|
#
 | 
						|
# This program is free software; you can redistribute it and/or
 | 
						|
# modify it under the terms of the GNU General Public License
 | 
						|
# as published by the Free Software Foundation; version 3.
 | 
						|
#
 | 
						|
# This program is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# See file /usr/share/common-licenses/GPL-3 for more details.
 | 
						|
#
 | 
						|
# ##################################################################
 | 
						|
 | 
						|
import argparse
 | 
						|
import fnmatch
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import shutil
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import textwrap
 | 
						|
import urllib.request
 | 
						|
 | 
						|
from lazr.restfulclient.errors import HTTPError
 | 
						|
 | 
						|
from ubuntutools import getLogger
 | 
						|
from ubuntutools.archive import DebianSourcePackage, DownloadError, UbuntuSourcePackage
 | 
						|
from ubuntutools.config import UDTConfig, ubu_email
 | 
						|
from ubuntutools.lp import udtexceptions
 | 
						|
from ubuntutools.lp.lpapicache import (
 | 
						|
    Distribution,
 | 
						|
    Launchpad,
 | 
						|
    PersonTeam,
 | 
						|
    SourcePackagePublishingHistory,
 | 
						|
)
 | 
						|
from ubuntutools.misc import split_release_pocket
 | 
						|
from ubuntutools.question import YesNoQuestion
 | 
						|
from ubuntutools.requestsync.lp import get_debian_srcpkg, get_ubuntu_srcpkg
 | 
						|
from ubuntutools.requestsync.mail import get_debian_srcpkg as requestsync_mail_get_debian_srcpkg
 | 
						|
from ubuntutools.version import Version
 | 
						|
 | 
						|
# Module-level logger obtained from ubuntutools; used by every function below.
Logger = getLogger()
 | 
						|
 | 
						|
 | 
						|
def remove_signature(dscname):
    """Remove the clearsign wrapper from a .dsc file, rewriting it in place.

    If the first line of the file is the "BEGIN PGP SIGNED MESSAGE" marker,
    the file is replaced by its signed body only: the armor header lines up
    to the first blank line are skipped, and the body is everything from
    there up to the next blank line. Files that do not start with the
    marker are left untouched.

    Args:
        dscname: Path of the .dsc file to process.
    """
    # Context managers guarantee the handle is released on every path,
    # including the "not signed" early return and any I/O error.
    with open(dscname, encoding="utf-8") as dsc_file:
        if dsc_file.readline().strip() != "-----BEGIN PGP SIGNED MESSAGE-----":
            return

        # Skip the armor headers; they end at the first blank line.
        for line in dsc_file:
            if not line.strip():
                break

        # Collect the body, which ends at the next blank line.
        body = []
        for line in dsc_file:
            if not line.strip():
                break
            body.append(line)

    with open(dscname, "w", encoding="utf-8") as dsc_file:
        dsc_file.writelines(body)
 | 
						|
 | 
						|
 | 
						|
def add_fixed_bugs(changes, bugs):
    """Merge extra Launchpad bug numbers into the text of a .changes file.

    Blank lines are dropped from *changes*. If a "Launchpad-Bugs-Fixed:"
    field already exists, its bug numbers are merged with *bugs*; otherwise
    a new field is appended as the last line. Duplicates are removed and the
    bug numbers are emitted in ascending numeric order so that the result is
    deterministic.

    Args:
        changes: Contents of the .changes file as a single string.
        bugs: Iterable of bug numbers (ints or strings) to add.

    Returns:
        The updated .changes text, terminated by a newline.
    """
    field = "Launchpad-Bugs-Fixed:"
    lines = [line for line in changes.split("\n") if line.strip()]
    fixed = {str(bug) for bug in bugs}

    def render():
        # Numeric entries first, in numeric order; anything else afterwards.
        ordered = sorted(
            fixed,
            key=lambda bug: (not bug.isdecimal(), int(bug) if bug.isdecimal() else 0, bug),
        )
        return f"{field} {' '.join(ordered)}"

    for index, line in enumerate(lines):
        if line.startswith(field):
            # split() without arguments ignores repeated blanks and never
            # yields an empty "bug" for an empty field.
            fixed.update(line[len(field):].split())
            lines[index] = render()
            break
    else:
        # No existing field (this also covers completely empty input).
        lines.append(render())

    return "\n".join(lines + [""])
 | 
						|
 | 
						|
 | 
						|
def sync_dsc(
    src_pkg,
    debian_dist,
    release,
    name,
    email,
    bugs,
    ubuntu_mirror,
    keyid=None,
    simulate=False,
    force=False,
    fakesync=False,
):
    """Local sync, trying to emulate sync-source.py

    Grabs a source package, replaces the .orig.tar with the one from Ubuntu,
    if necessary, writes a sync-appropriate .changes file, and signs it.

    Args:
        src_pkg: Source package object to sync; its ``dsc``, ``source`` and
            ``dsc_name`` attributes and ``pull()``/``unpack()`` methods are
            used.
        debian_dist: Debian distribution used for the Origin field; when
            None it is read from the first line of debian/changelog.
        release: Target Ubuntu release, optionally with a pocket suffix.
        name: Uploader's full name.
        email: Uploader's e-mail address.
        bugs: Launchpad bug numbers fixed by this upload.
        ubuntu_mirror: Mirror passed to UbuntuSourcePackage.
        keyid: Signing key. In the regular path None runs debsign without
            ``-k`` and False skips signing; in the fakesync path any falsy
            value just omits ``-k`` from debuild.
        simulate: If true, return after the version checks without
            downloading anything.
        force: Allow overwriting a version that was modified in Ubuntu.
        fakesync: Build a "fakesync1" upload with dch/debuild instead of
            generating a .changes file for the unmodified Debian source.

    The function calls sys.exit(1) on download failures, when --force or
    --fakesync would be needed (or is wrongly given), and when debuild
    fails. It changes the current working directory into the unpacked
    source tree; only the non-fakesync path changes back to the parent.
    """

    uploader = name + " <" + email + ">"

    new_ver = Version(src_pkg.dsc["Version"])

    try:
        ubuntu_series, ubuntu_pocket = split_release_pocket(release)
        ubuntu_source = get_ubuntu_srcpkg(src_pkg.source, ubuntu_series, ubuntu_pocket)
        ubuntu_ver = Version(ubuntu_source.getVersion())
        ubu_pkg = UbuntuSourcePackage(
            src_pkg.source,
            ubuntu_ver.full_version,
            ubuntu_source.getComponent(),
            mirrors=[ubuntu_mirror],
        )
        need_orig = ubuntu_ver.upstream_version != new_ver.upstream_version
    except udtexceptions.PackageNotFoundException:
        ubuntu_ver = Version("~")
        ubu_pkg = None
        need_orig = True
        # Report the package that is missing, not the uploader's name.
        Logger.info("%s does not exist in Ubuntu.", src_pkg.source)

    Logger.debug(
        "Source %s: current version %s, new version %s", src_pkg.source, ubuntu_ver, new_ver
    )
    Logger.debug("Needs source tarball: %s", str(need_orig))

    cur_ver = ubuntu_ver.get_related_debian_version()
    if ubuntu_ver.is_modified_in_ubuntu():
        if not force:
            Logger.error("--force is required to discard Ubuntu changes.")
            sys.exit(1)

        Logger.warning(
            "Overwriting modified Ubuntu version %s, setting current version to %s",
            ubuntu_ver.full_version,
            cur_ver.full_version,
        )
    if simulate:
        return

    try:
        src_pkg.pull()
    except DownloadError as e:
        Logger.error("Failed to download: %s", str(e))
        sys.exit(1)
    src_pkg.unpack()

    # ubu_pkg is None only when need_orig is True, so the short-circuit
    # keeps verify_orig() from being called on None.
    needs_fakesync = not (need_orig or ubu_pkg.verify_orig())

    if needs_fakesync and fakesync:
        Logger.warning("Performing a fakesync")
    elif not needs_fakesync and fakesync:
        Logger.error("Fakesync not required, aborting.")
        sys.exit(1)
    elif needs_fakesync and not fakesync:
        Logger.error(
            "The checksums of the Debian and Ubuntu packages "
            "mismatch. A fake sync using --fakesync is required."
        )
        sys.exit(1)

    if fakesync:
        # Download Ubuntu files (override Debian source tarballs)
        try:
            ubu_pkg.pull()
        except DownloadError as e:
            Logger.error("Failed to download: %s", str(e))
            sys.exit(1)

    # change into package directory
    directory = src_pkg.source + "-" + new_ver.upstream_version
    Logger.debug("cd %s", directory)
    os.chdir(directory)

    # read Debian distribution from debian/changelog if not specified
    if debian_dist is None:
        with open("debian/changelog", encoding="utf-8") as changelog:
            line = changelog.readline()
        debian_dist = line.split(" ")[2].strip(";")

    if not fakesync:
        # create the changes file
        changes_filename = f"{src_pkg.source}_{new_ver.strip_epoch()}_source.changes"
        cmd = [
            "dpkg-genchanges",
            "-S",
            "-v" + cur_ver.full_version,
            "-DDistribution=" + release,
            "-DOrigin=debian/" + debian_dist,
            "-e" + uploader,
        ]
        if need_orig:
            cmd.append("-sa")
        else:
            cmd.append("-sd")
        if not Logger.isEnabledFor(logging.DEBUG):
            cmd += ["-q"]
        Logger.debug("%s> ../%s", " ".join(cmd), changes_filename)
        changes = subprocess.check_output(cmd, encoding="utf-8")

        # Add additional bug numbers
        if len(bugs) > 0:
            changes = add_fixed_bugs(changes, bugs)

        # remove extracted (temporary) files
        Logger.debug("cd ..")
        os.chdir("..")
        shutil.rmtree(directory, True)

        # write changes file
        with open(changes_filename, "w", encoding="utf-8") as changes_file:
            changes_file.write(changes)

        # remove signature and sign package
        remove_signature(src_pkg.dsc_name)
        if keyid is not False:
            cmd = ["debsign", changes_filename]
            if keyid is not None:
                cmd.insert(1, "-k" + keyid)
            Logger.debug(" ".join(cmd))
            subprocess.check_call(cmd)
    else:
        # Create fakesync changelog entry
        new_ver = Version(new_ver.full_version + "fakesync1")
        changes_filename = f"{src_pkg.source}_{new_ver.strip_epoch()}_source.changes"
        if len(bugs) > 0:
            bug_numbers = [f"#{b}" for b in bugs]
            message = f"Fake sync due to mismatching orig tarball (LP: {', '.join(bug_numbers)})."
        else:
            message = "Fake sync due to mismatching orig tarball."
        cmd = ["dch", "-v", new_ver.full_version, "--force-distribution", "-D", release, message]
        # NOTE(review): this replaces the whole environment for dch (no PATH,
        # HOME, ...) rather than extending os.environ - confirm it is intended.
        env = {"DEBFULLNAME": name, "DEBEMAIL": email}
        Logger.debug(" ".join(cmd))
        subprocess.check_call(cmd, env=env)

        # update the Maintainer field
        cmd = ["update-maintainer"]
        if not Logger.isEnabledFor(logging.DEBUG):
            cmd.append("-q")
        Logger.debug(" ".join(cmd))
        subprocess.check_call(cmd)

        # Build source package
        cmd = ["debuild", "--no-lintian", "-nc", "-S", "-v" + cur_ver.full_version]
        if need_orig:
            cmd += ["-sa"]
        if keyid:
            cmd += ["-k" + keyid]
        Logger.debug(" ".join(cmd))
        returncode = subprocess.call(cmd)
        if returncode != 0:
            Logger.error("Source-only build with debuild failed. Please check build log above.")
            sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
def fetch_source_pkg(package, dist, version, component, ubuntu_release, mirror):
    """Build the DebianSourcePackage that should be synced.

    *package* is either a path/URL ending in ".dsc" (used directly) or a
    source package name. Any of dist, version, component and mirror may be
    None: dist falls back to "unstable", and a missing version or component
    is looked up in Debian. During that lookup the Debian version is
    compared against the one in *ubuntu_release*; the script exits with
    status 1 if Debian is not newer, or if the package/series lookup fails.
    """
    mirrors = [] if mirror is None else [mirror]

    # A .dsc reference needs no further resolution.
    if package.endswith(".dsc"):
        return DebianSourcePackage(dscfile=package, mirrors=mirrors)

    if dist is None:
        dist = "unstable"

    requested_version = version
    if isinstance(version, str):
        version = Version(version)

    lookup_needed = version is None or component is None
    if lookup_needed:
        try:
            debian_srcpkg = get_debian_srcpkg(package, dist)
        except (
            udtexceptions.PackageNotFoundException,
            udtexceptions.SeriesNotFoundException,
        ) as error:
            Logger.error(str(error))
            sys.exit(1)

        if version is None:
            version = Version(debian_srcpkg.getVersion())

        try:
            series, pocket = split_release_pocket(ubuntu_release)
            in_ubuntu = get_ubuntu_srcpkg(package, series, pocket)
            ubuntu_version = Version(in_ubuntu.getVersion())
        except udtexceptions.PackageNotFoundException:
            # Not in Ubuntu yet: "~" is used as the version to compare against.
            ubuntu_version = Version("~")
        except udtexceptions.SeriesNotFoundException as error:
            Logger.error(str(error))
            sys.exit(1)

        if not ubuntu_version < version:
            # The LP importer is maybe out of date
            debian_srcpkg = requestsync_mail_get_debian_srcpkg(package, dist)
            if requested_version is None:
                version = Version(debian_srcpkg.getVersion())
            if not ubuntu_version < version:
                Logger.error(
                    "Version in Debian %s (%s) isn't newer than Ubuntu %s (%s)",
                    version,
                    dist,
                    ubuntu_version,
                    ubuntu_release,
                )
                sys.exit(1)

        if component is None:
            component = debian_srcpkg.getComponent()

    assert component in ("main", "contrib", "non-free", "non-free-firmware")

    return DebianSourcePackage(package, version.full_version, component, mirrors=mirrors)
 | 
						|
 | 
						|
 | 
						|
def copy(src_pkg, release, bugs, sponsoree=None, simulate=False, force=False):
    """Ask Launchpad to copy a Debian source package into Ubuntu.

    The Debian version must already be published in Launchpad. If the
    package exists in the target series/pocket, Ubuntu modifications are
    only discarded with *force*, and a .dsc checksum mismatch aborts
    (a fakesync is needed). The new changelog entries are logged; unless
    *simulate* is set, the user is asked for confirmation, the copy is
    requested and, optionally, the given *bugs* are closed. Failures exit
    the script with status 1.
    """
    ubuntu_distro = Distribution("ubuntu")
    debian_archive = Distribution("debian").getArchive()
    ubuntu_archive = ubuntu_distro.getArchive()

    if release is not None:
        ubuntu_series, ubuntu_pocket = split_release_pocket(release)
    else:
        ubuntu_series = ubuntu_distro.getDevelopmentSeries().name
        ubuntu_pocket = "Release"

    # Ensure that the provided Debian version actually exists.
    try:
        published = debian_archive.getPublishedSources(
            source_name=src_pkg.source, version=src_pkg.version.full_version, exact_match=True
        )
        debian_spph = SourcePackagePublishingHistory(published[0])
    except IndexError:
        Logger.error(
            "Debian version %s has not been picked up by LP yet. Please try again later.",
            src_pkg.version,
        )
        sys.exit(1)

    try:
        ubuntu_spph = get_ubuntu_srcpkg(src_pkg.source, ubuntu_series, ubuntu_pocket)
        ubuntu_pkg = UbuntuSourcePackage(
            src_pkg.source, ubuntu_spph.getVersion(), ubuntu_spph.getComponent(), mirrors=[]
        )

        Logger.info(
            "Source %s -> %s/%s: current version %s, new version %s",
            src_pkg.source,
            ubuntu_series,
            ubuntu_pocket,
            ubuntu_pkg.version,
            src_pkg.version,
        )

        current = Version(ubuntu_pkg.version.full_version)
        base_version = current.get_related_debian_version()
        if not force and current.is_modified_in_ubuntu():
            Logger.error("--force is required to discard Ubuntu changes.")
            sys.exit(1)

        # Check whether a fakesync would be required.
        dsc_matches = src_pkg.dsc.compare_dsc(ubuntu_pkg.dsc)
        if not dsc_matches:
            Logger.error(
                "The checksums of the Debian and Ubuntu packages "
                "mismatch. A fake sync using --fakesync is required."
            )
            sys.exit(1)
    except udtexceptions.PackageNotFoundException:
        # New package: "~" is used as the base version for the changelog.
        base_version = Version("~")
        Logger.info(
            "Source %s -> %s/%s: not in Ubuntu, new version %s",
            src_pkg.source,
            ubuntu_series,
            ubuntu_pocket,
            src_pkg.version,
        )

    changes = debian_spph.getChangelog(since_version=base_version)
    if changes:
        changes = changes.strip()
        Logger.info("New changes:\n%s", changes)

    if simulate:
        return

    if sponsoree:
        Logger.info("Sponsoring this sync for %s (%s)", sponsoree.display_name, sponsoree.name)
    if YesNoQuestion().ask("Sync this package", "no") != "yes":
        return

    try:
        ubuntu_archive.copyPackage(
            source_name=src_pkg.source,
            version=src_pkg.version.full_version,
            from_archive=debian_archive,
            to_series=ubuntu_series,
            to_pocket=ubuntu_pocket,
            include_binaries=False,
            sponsored=sponsoree,
        )
    except HTTPError as error:
        Logger.error("HTTP Error %s: %s", error.response.status, error.response.reason)
        Logger.error(error.content)
        sys.exit(1)

    Logger.info("Request succeeded; you should get an e-mail once it is processed.")

    unique_bugs = sorted(set(bugs))
    if not unique_bugs:
        return

    Logger.info("Launchpad bugs to be closed: %s", ", ".join(str(bug) for bug in unique_bugs))
    Logger.info("Please wait for the sync to be successful before closing bugs.")
    if YesNoQuestion().ask("Close bugs", "yes") == "yes":
        close_bugs(unique_bugs, src_pkg.source, src_pkg.version.full_version, changes, sponsoree)
 | 
						|
 | 
						|
 | 
						|
def is_blacklisted(query):
    """Check whether source package *query* is on a sync blacklist.

    Two sources are consulted: the Launchpad differences recorded for the
    current Ubuntu series, and the old sync-blacklist.txt file, whose
    entries are matched as fnmatch patterns.

    Returns a tuple (blacklisted, comments) where blacklisted is one of
    False, 'CURRENT', 'ALWAYS' and comments is a list of strings.
    """
    series = Launchpad.distributions["ubuntu"].current_series

    comments = []
    for entry in series.getDifferenceComments(source_package_name=query):
        text = f"{entry.body_text}\n  -- {entry.comment_author.name}"
        stamp = entry.comment_date.strftime("%a, %d %b %Y %H:%M:%S +0000")
        comments.append(f"{text}  {stamp}")

    blacklisted = False
    for diff in series.getDifferencesTo(source_package_name_filter=query):
        status = diff.status
        if status == "Blacklisted always":
            blacklisted = "ALWAYS"
        elif status == "Blacklisted current version" and blacklisted != "ALWAYS":
            # Never downgrade an earlier "ALWAYS" result.
            blacklisted = "CURRENT"

    # Old blacklist:
    url = "https://ubuntu-archive-team.ubuntu.com/sync-blacklist.txt"
    with urllib.request.urlopen(url) as response:
        # Lines of the current paragraph; a blank line starts a new one.
        paragraph = []
        for raw in response:
            text = raw.decode("utf-8")
            if not text.strip():
                paragraph = []
                continue
            paragraph.append(text)
            # Everything after "#" is a comment, the rest is the pattern.
            pattern = text.partition("#")[0].strip()
            if pattern and fnmatch.fnmatch(query, pattern):
                comments.append("From sync-blacklist.txt:")
                comments.extend(paragraph)
                blacklisted = "ALWAYS"
                break

    return (blacklisted, comments)
 | 
						|
 | 
						|
 | 
						|
def close_bugs(bugs, package, version, changes, sponsoree):
    """Mark the Ubuntu task of each bug as "Fix Released" and comment on it.

    For every bug (following ``duplicate_of`` once when set), the first task
    targeting Ubuntu itself or *package* in Ubuntu is selected. If that task
    is not yet "Fix Released" it is updated and a message naming the
    package, version, optional sponsoree and *changes* is posted. An error
    is logged for bugs without such a task.
    """
    ubuntu = Launchpad.distributions["ubuntu"]

    parts = [f"This bug was fixed in the package {package} - {version}"]
    if sponsoree:
        parts.append(f"\nSponsored for {sponsoree.display_name} ({sponsoree.name})")
    if changes:
        parts.append("\n\n---------------\n" + changes)
    message = "".join(parts)

    for bug_id in bugs:
        bug = Launchpad.bugs[bug_id]
        if bug.duplicate_of is not None:
            bug = bug.duplicate_of

        for task in bug.bug_tasks:
            target = task.target
            is_ubuntu_task = target == ubuntu or (
                target.name == package and getattr(target, "distribution", None) == ubuntu
            )
            if not is_ubuntu_task:
                continue
            # Only the first matching task is handled.
            if task.status != "Fix Released":
                Logger.info("Closed bug %s", task.web_link)
                task.status = "Fix Released"
                task.lp_save()
                bug.newMessage(content=message)
            break
        else:
            Logger.error("Cannot find any tasks on LP: #%i to close.", bug.id)
 | 
						|
 | 
						|
 | 
						|
def parse():
    """Parse and validate the command-line parameters.

    Returns:
        The argparse namespace. ``bugs`` is converted to a list of ints and
        ``lp`` is forced to False when --fakesync is given.

    Invalid input (non-numeric bug numbers, an unknown Debian component,
    uploader overrides or .dsc arguments without --no-lp) is reported via
    ``parser.error``, which prints the usage message and exits.
    """

    usage = "%(prog)s [options] <.dsc URL/path or package name(s)>"
    epilog = f"See {os.path.basename(sys.argv[0])}(1) for more info."
    parser = argparse.ArgumentParser(usage=usage, epilog=epilog)

    parser.add_argument("-d", "--distribution", help="Debian distribution to sync from.")
    parser.add_argument("-r", "--release", help="Specify target Ubuntu release.")
    parser.add_argument("-V", "--debian-version", help="Specify the version to sync from.")
    parser.add_argument("-c", "--component", help="Specify the Debian component to sync from.")
    parser.add_argument(
        "-b",
        "--bug",
        metavar="BUG",
        dest="bugs",
        action="append",
        default=[],
        help="Mark Launchpad bug BUG as being fixed by this upload.",
    )
    parser.add_argument(
        "-s",
        "--sponsor",
        metavar="USERNAME",
        dest="sponsoree",
        help="Sponsor the sync for USERNAME (a Launchpad username).",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Display more progress information."
    )
    parser.add_argument(
        "-F",
        "--fakesync",
        action="store_true",
        help="Perform a fakesync (a sync where Debian and "
        "Ubuntu have a .orig.tar mismatch). "
        "This implies --no-lp and will leave a signed "
        ".changes file for you to upload.",
    )
    parser.add_argument(
        "-f", "--force", action="store_true", help="Force sync over the top of Ubuntu changes."
    )
    parser.add_argument(
        "--no-conf", action="store_true", help="Don't read config files or environment variables."
    )
    parser.add_argument(
        "-l",
        "--lpinstance",
        metavar="INSTANCE",
        help="Launchpad instance to connect to (default: production).",
    )
    parser.add_argument(
        "--simulate",
        action="store_true",
        help="Show what would be done, but don't actually do it.",
    )

    no_lp = parser.add_argument_group(
        "Local sync preparation options",
        "Options that only apply when using --no-lp.  "
        "WARNING: The use of --no-lp is not recommended for uploads "
        "targeted at Ubuntu. "
        "The archive-admins discourage its use, except for fakesyncs.",
    )
    no_lp.add_argument(
        "--no-lp",
        dest="lp",
        action="store_false",
        help="Construct sync locally, rather than letting "
        "Launchpad copy the package directly. "
        "It will leave a signed .changes file for you to "
        "upload.",
    )
    no_lp.add_argument(
        "-n",
        "--uploader-name",
        help="Use UPLOADER_NAME as the name of the maintainer for this upload.",
    )
    no_lp.add_argument(
        "-e",
        "--uploader-email",
        help="Use UPLOADER_EMAIL as email address of the maintainer for this upload.",
    )
    # -k and --dont-sign share dest "keyid": a key ID string selects the key,
    # False disables signing.
    no_lp.add_argument(
        "-k", "--key", dest="keyid", help="Specify the key ID to be used for signing."
    )
    no_lp.add_argument(
        "--dont-sign", dest="keyid", action="store_false", help="Do not sign the upload."
    )
    no_lp.add_argument(
        "-D",
        "--debian-mirror",
        metavar="DEBIAN_MIRROR",
        help=f"Preferred Debian mirror (default: {UDTConfig.defaults['DEBIAN_MIRROR']})",
    )
    no_lp.add_argument(
        "-U",
        "--ubuntu-mirror",
        metavar="UBUNTU_MIRROR",
        help=f"Preferred Ubuntu mirror (default: {UDTConfig.defaults['UBUNTU_MIRROR']})",
    )
    parser.add_argument("package", nargs="*", help=argparse.SUPPRESS)
    args = parser.parse_args()

    if args.fakesync:
        args.lp = False

    try:
        args.bugs = [int(b) for b in args.bugs]
    except (TypeError, ValueError):
        # int() raises ValueError for a non-numeric string; catching only
        # TypeError let "--bug foo" escape as an unhandled traceback.
        parser.error("Invalid bug number(s) specified.")

    if args.component not in (None, "main", "contrib", "non-free", "non-free-firmware"):
        parser.error(
            f"{args.component} is not a valid Debian component. "
            "It should be one of main, contrib, non-free, or non-free-firmware."
        )

    if args.lp and args.uploader_name:
        parser.error("Uploader name can only be overridden using --no-lp.")
    if args.lp and args.uploader_email:
        parser.error("Uploader email address can only be overridden using --no-lp.")
    # --key, --dont-sign, --debian-mirror, and --ubuntu-mirror are just
    # ignored with args.lp, and do not require warnings.

    if args.lp:
        for package in args.package:
            if package.endswith(".dsc"):
                parser.error(".dsc files can only be synced using --no-lp.")

    return args
 | 
						|
 | 
						|
 | 
						|
def main():
    """Run syncpackage: resolve options, log in to Launchpad, sync each package.

    Options that were not supplied on the command line (mirrors, key id and
    Launchpad instance) are read from the UDT configuration. Every requested
    package is fetched, checked against the sync blacklist, and then either
    copied through Launchpad or synced via ``sync_dsc`` when ``--no-lp`` (or
    ``--fakesync``) is in effect. The process exits with status 1 when the
    Launchpad login fails, the sponsoree is unknown, a required sponsoree
    e-mail address is unavailable, or a package is blacklisted.
    """
    args = parse()

    if args.verbose:
        Logger.setLevel("DEBUG")

    # Unset command-line options fall back to the configuration, looked up in
    # this fixed order.
    config = UDTConfig(args.no_conf)
    config_fallbacks = (
        ("debian_mirror", "DEBIAN_MIRROR"),
        ("ubuntu_mirror", "UBUNTU_MIRROR"),
        ("keyid", "KEYID"),
        ("lpinstance", "LPINSTANCE"),
    )
    for attribute, config_key in config_fallbacks:
        if getattr(args, attribute) is None:
            setattr(args, attribute, config.get_value(config_key))

    # devel for copyPackage and changelogUrl
    login_options = {"service": args.lpinstance, "api_version": "devel"}
    try:
        # Only a real (non-simulated) Launchpad copy uses the authenticated
        # login; every other mode logs in anonymously.
        if args.lp and not args.simulate:
            Launchpad.login(**login_options)
        else:
            Launchpad.login_anonymously(**login_options)
    except IOError as error:
        Logger.error("Could not authenticate to LP: %s", str(error))
        sys.exit(1)

    if args.release is None:
        # Default target: the -proposed pocket of the current Ubuntu series.
        current_series = Launchpad.distributions["ubuntu"].current_series
        args.release = f"{current_series.name}-proposed"

    if not (args.fakesync or args.lp):
        Logger.warning(
            "The use of --no-lp is not recommended for uploads "
            "targeted at Ubuntu. "
            "The archive-admins discourage its use, except for "
            "fakesyncs."
        )

    sponsoree = None
    if args.sponsoree:
        try:
            sponsoree = PersonTeam(args.sponsoree)
        except KeyError:
            Logger.error('Cannot find the username "%s" in Launchpad.', args.sponsoree)
            sys.exit(1)

    # Uploader identity: an explicit option wins, then the sponsoree's
    # Launchpad details, then the local user's identity from ubu_email().
    if args.uploader_name is None:
        if sponsoree:
            args.uploader_name = sponsoree.display_name
        else:
            args.uploader_name = ubu_email(export=False)[0]

    if args.uploader_email is None:
        if sponsoree:
            try:
                args.uploader_email = sponsoree.preferred_email_address.email
            except ValueError:
                # The missing address is only fatal for --no-lp; with args.lp
                # the e-mail is simply left unset.
                if not args.lp:
                    Logger.error(
                        "%s doesn't have a publicly visible e-mail "
                        "address in LP, please provide one "
                        "--uploader-email option",
                        sponsoree.display_name,
                    )
                    sys.exit(1)
        else:
            args.uploader_email = ubu_email(export=False)[1]

    for package in args.package:
        src_pkg = fetch_source_pkg(
            package,
            args.distribution,
            args.debian_version,
            args.component,
            args.release,
            args.debian_mirror,
        )

        blacklist_state, blacklist_comments = is_blacklisted(src_pkg.source)
        refuse_sync = False
        if blacklist_state:
            hints = []

            if blacklist_state == "CURRENT":
                Logger.debug(
                    "Source package %s is temporarily blacklisted "
                    "(blacklisted_current). "
                    "Ubuntu ignores these for now. "
                    "See also LP: #841372",
                    src_pkg.source,
                )
            elif args.fakesync:
                hints.append("Doing a fakesync, overriding blacklist.")
            else:
                refuse_sync = True
                hints.extend(
                    [
                        "If this package needs a fakesync, use --fakesync",
                        "If you think this package shouldn't be "
                        "blacklisted, please file a bug explaining your "
                        "reasoning and subscribe ~ubuntu-archive.",
                    ]
                )

            if refuse_sync:
                Logger.error("Source package %s is blacklisted.", src_pkg.source)
            elif blacklist_state == "ALWAYS":
                Logger.info("Source package %s is blacklisted.", src_pkg.source)

            for hint in hints:
                for wrapped in textwrap.wrap(hint):
                    Logger.info(wrapped)

        if blacklist_comments:
            Logger.info("Blacklist Comments:")
            for remark in blacklist_comments:
                for wrapped in textwrap.wrap(remark):
                    Logger.info("  %s", wrapped)

        # Exit only after the blacklist comments have been printed.
        if refuse_sync:
            sys.exit(1)

        if args.lp:
            copy(src_pkg, args.release, args.bugs, sponsoree, args.simulate, args.force)
            continue

        os.environ["DEB_VENDOR"] = "Ubuntu"
        sync_dsc(
            src_pkg,
            args.distribution,
            args.release,
            args.uploader_name,
            args.uploader_email,
            args.bugs,
            args.ubuntu_mirror,
            args.keyid,
            args.simulate,
            args.force,
            args.fakesync,
        )
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Script entry point: run main() and turn Ctrl-C into a short log message
    # instead of a KeyboardInterrupt traceback.
    try:
        main()
    except KeyboardInterrupt:
        Logger.info("User abort.")
 |