ubuntu-dev-tools/sponsor-patch

700 lines
22 KiB
Python
Executable File

#!/usr/bin/python
#
# Copyright (C) 2010, Benjamin Drung <bdrung@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import debian.changelog
import debian.deb822
import debian.debian_support
import launchpadlib.launchpad
import optparse
import os
import pwd
import re
import shutil
import subprocess
import sys
import urllib
# Exit status passed to sys.exit() when the user declines to continue at an
# interactive prompt.
USER_ABORT = 2
class BugTask(object):
    """Wrapper around a Launchpad bug task.

    The task's bug_target_name is split into package, project and series so
    that the matching source package can be looked up and downloaded.
    """

    def __init__(self, bug_task, launchpad):
        """Store the task and parse its target name.

        bug_task  -- Launchpad bug task object (bug_target_name is read here)
        launchpad -- Launchpad connection used for later archive queries
        """
        self.bug_task = bug_task
        self.launchpad = launchpad

        # The target name is "project", "package (Project)" or
        # "package (Project Series)"; split on " (" and on plain spaces.
        components = re.split(r" \(| ", self.bug_task.bug_target_name.strip(")"))
        assert len(components) >= 1 and len(components) <= 3
        if len(components) == 1:
            self.package = None
            self.project = components[0]
            self.series = None
        elif len(components) == 2:
            self.package = components[0]
            self.project = components[1].lower()
            self.series = None
        elif len(components) == 3:
            self.package = components[0]
            self.project = components[1].lower()
            self.series = components[2].lower()

    def download_source(self):
        """Download all files of the source package into the current directory.

        Returns the absolute path of the downloaded .dsc file. Exits with
        status 1 if no published source or no .dsc file could be found.
        """
        source = self.get_source()
        if source is None:
            Print.error("No published source package found for %s." %
                        (self.package))
            sys.exit(1)
        dsc_file = None
        for url in source.sourceFileUrls():
            filename = urllib.unquote(os.path.basename(url))
            Print.info("Downloading %s..." % (filename))
            urllib.urlretrieve(url, filename)
            if url.endswith(".dsc"):
                dsc_file = filename
        if dsc_file is None:
            Print.error("No .dsc file found in the source files of %s." %
                        (self.package))
            sys.exit(1)
        return os.path.join(os.getcwd(), dsc_file)

    def get_branch_link(self):
        """Return the "lp:project/series/package" branch identifier."""
        return "lp:" + self.project + "/" + self.get_series() + "/" + self.package

    def get_long_info(self):
        """Return a multi-line description of the task and its parsed fields."""
        return "Bug task: " + str(self.bug_task) + "\n" + \
               "Package: " + str(self.package) + "\n" + \
               "Project: " + str(self.project) + "\n" + \
               "Series: " + str(self.series)

    def get_package_and_series(self):
        """Return "package" or, if a series is set, "package (series)"."""
        result = self.package
        if self.series:
            result += " (" + self.series + ")"
        return result

    def get_previous_version(self):
        """Return the version the new upload should be compared against.

        For merges this is the latest released version in the task's own
        project, otherwise the version returned by get_version().
        """
        if self.is_merge():
            previous_version = self.get_latest_released_version()
        else:
            previous_version = self.get_version()
        return previous_version

    def get_series(self, latest_release=False):
        """Return the series name of the task.

        Falls back to the current series of the project's distribution when
        the task names no series or when latest_release is true.
        """
        if self.series is None or latest_release:
            return self.launchpad.distributions[self.project].current_series.name
        else:
            return self.series

    def get_short_info(self):
        """Return "<target name>: <status>" for the task."""
        return self.bug_task.bug_target_name + ": " + self.bug_task.status

    def get_source(self, latest_release=False):
        """Look up the published source package for this task.

        For merge requests (unless latest_release is true) the package is
        searched in Debian with status "Pending"; the series is taken from
        the task title. Otherwise the task's own project and series are
        searched with status "Published".

        Returns the first publication found in the Release, Security, Updates
        or Proposed pocket, or None if there is none.
        """
        assert self.package is not None

        if self.is_merge() and not latest_release:
            project = "debian"
            title = self.bug_task.title.lower().split()
            if "experimental" in title:
                series = "experimental"
            elif "testing" in title:
                # TODO: Do not hard code series!
                series = "squeeze"
            else:
                series = "sid"
            status = "Pending"
        else:
            project = self.project
            series = self.get_series(latest_release)
            status = "Published"

        dist = self.launchpad.distributions[project]
        archive = dist.getArchive(name="primary")
        distro_series = dist.getSeries(name_or_version=series)
        published = archive.getPublishedSources(source_name=self.package,
                distro_series=distro_series, status=status, exact_match=True)

        latest_source = None
        for source in published:
            if source.pocket in ('Release', 'Security', 'Updates', 'Proposed'):
                latest_source = source
                break
        return latest_source

    def get_version(self):
        """Return the Version of the source found by get_source()."""
        return debian.debian_support.Version(self.get_source().source_package_version)

    def get_latest_released_version(self):
        """Return the Version of the source found by get_source(True)."""
        version = self.get_source(True).source_package_version
        return debian.debian_support.Version(version)

    def is_complete(self):
        """Return the is_complete flag of the underlying bug task."""
        return self.bug_task.is_complete

    def is_merge(self):
        """Return True if the bug title contains "merge" or has a "merge" tag."""
        bug = self.bug_task.bug
        return "merge" in bug.title.lower().split(" ") or "merge" in bug.tags

    def is_ubuntu_task(self):
        """Return True if the task's project is "ubuntu"."""
        return self.project == "ubuntu"
class Patch(object):
    """A patch file on disk together with the names of the files it changes."""

    def __init__(self, patch_file):
        """Resolve patch_file and record which files it touches.

        The file list is taken from the output of "diffstat -l -p0", one
        entry per non-empty output line.
        """
        self.patch_file = patch_file
        self.full_path = os.path.realpath(self.patch_file)
        assert os.path.isfile(self.full_path), "%s does not exist." % (self.full_path)
        diffstat = subprocess.Popen(["diffstat", "-l", "-p0", self.full_path],
                                    stdout=subprocess.PIPE)
        output = diffstat.communicate()[0]
        self.changed_files = [name for name in output.split("\n") if name != ""]

    def get_name(self):
        """Return the patch file name as it was passed to the constructor."""
        return self.patch_file

    def get_strip_level(self):
        """Return the -p level needed to apply a debdiff, or None.

        The level is the number of path components in front of
        "debian/changelog" in the first matching changed file. None is
        returned when the patch does not touch debian/changelog.
        """
        if not self.is_debdiff():
            return None
        changelogs = [name for name in self.changed_files
                      if name.endswith("debian/changelog")]
        return len(changelogs[0].split(os.sep)) - 2

    def is_debdiff(self):
        """Return True if any changed file ends with "debian/changelog"."""
        return any(name.endswith("debian/changelog")
                   for name in self.changed_files)
class Print(object):
    """Console output helpers that prefix every message with the script name.

    All methods are classmethods; verbosity is stored on the class itself.
    """

    # Prefix used for every message.
    script_name = os.path.basename(sys.argv[0])
    # When False, command(), debug() and info() print nothing.
    verbose = False

    @classmethod
    def command(cls, cmd):
        """Print a command line (list of arguments) in verbose mode.

        Arguments containing spaces are shown in double quotes. The quoting
        is done on a copy so that the caller's list, which is usually passed
        on to subprocess afterwards, is left untouched.
        """
        if cls.verbose:
            quoted = []
            for argument in cmd:
                if " " in argument:
                    argument = '"' + argument + '"'
                quoted.append(argument)
            print("%s: I: %s" % (cls.script_name, " ".join(quoted)))

    @classmethod
    def debug(cls, message):
        """Print a debug message in verbose mode."""
        if cls.verbose:
            print("%s: D: %s" % (cls.script_name, message))

    @classmethod
    def error(cls, message):
        """Print an error message to stderr (regardless of verbosity)."""
        sys.stderr.write("%s: Error: %s\n" % (cls.script_name, message))

    @classmethod
    def info(cls, message):
        """Print an informational message in verbose mode."""
        if cls.verbose:
            print("%s: I: %s" % (cls.script_name, message))

    @classmethod
    def normal(cls, message):
        """Print a message regardless of verbosity."""
        print("%s: %s" % (cls.script_name, message))

    @classmethod
    def set_verbosity(cls, verbose):
        """Enable or disable the output of command(), debug() and info()."""
        cls.verbose = verbose
def get_source_package_name(bug_task):
    """Return the source package name of an Ubuntu bug task.

    Returns None when the task targets "ubuntu" itself. Otherwise the target
    name must end with "(Ubuntu)" and its first space-separated word is
    returned.
    """
    target = bug_task.bug_target_name
    if target == "ubuntu":
        return None
    assert target.endswith("(Ubuntu)")
    return target.split(" ")[0]
def get_user_shell():
    """Return the user's shell.

    The SHELL environment variable is preferred; without it the shell field
    of the current user's password database entry is used.
    """
    if "SHELL" in os.environ:
        return os.environ["SHELL"]
    return pwd.getpwuid(os.getuid())[6]
def input_number(question, min_number, max_number, default=None):
    """Ask the user for an integer between min_number and max_number.

    question   -- prompt text; "? " (or " [default]? ") is appended
    min_number -- smallest accepted value (inclusive)
    max_number -- largest accepted value (inclusive)
    default    -- value used when the user just presses enter; None (the
                  default) means that an empty answer is not accepted

    The question is repeated until a valid number was entered. Returns the
    selected number.
    """
    # Compare against None so that 0 is usable as a default value.
    has_default = default is not None
    if has_default:
        question += " [%i]? " % (default)
    else:
        question += "? "

    while True:
        answer = raw_input(question).strip()
        if has_default and answer == "":
            selected = default
        else:
            try:
                selected = int(answer)
            except ValueError:
                print("Please input a number.")
                continue
        if min_number <= selected <= max_number:
            return selected
        print("Please input a number between %i and %i." %
              (min_number, max_number))
def boolean_question(question, default):
    """Ask a yes/no question and return the answer as a bool.

    " [Y/n]? " is appended to the question when default is True, otherwise
    " [y/N]? ". An empty answer selects default; "y"/"yes" and "n"/"no" are
    accepted case-insensitively. The question is repeated until the result
    is a bool.
    """
    question += " [Y/n]? " if default is True else " [y/N]? "
    replies = {"y": True, "yes": True, "n": False, "no": False}

    while True:
        reply = raw_input(question).strip().lower()
        if reply == "":
            result = default
        elif reply in replies:
            result = replies[reply]
        else:
            print("Please answer the question with yes or no.")
            continue
        if type(result) == bool:
            return result
def yes_edit_no_question(question, default):
    """Ask a question that is answered with yes, edit or no.

    default must be "yes", "edit" or "no"; it is shown capitalised in the
    prompt and selected by an empty answer. Single-letter answers and full
    words are accepted case-insensitively. Returns "yes", "edit" or "no".
    """
    valid = ("yes", "edit", "no")
    assert default in valid

    prompts = {"yes": " [Y/e/n]? ", "edit": " [y/E/n]? "}
    question += prompts.get(default, " [y/e/N]? ")
    aliases = {"y": "yes", "yes": "yes",
               "e": "edit", "edit": "edit",
               "n": "no", "no": "no"}

    while True:
        reply = raw_input(question).strip().lower()
        if reply == "":
            choice = default
        elif reply in aliases:
            choice = aliases[reply]
        else:
            print("Please answer the question with yes, edit, or no.")
            continue
        if choice in valid:
            return choice
def edit_source():
    """Launch the user's shell in the current directory for manual edits.

    Returns when the shell exits with status zero; any other exit status
    terminates the script with exit code 1.
    """
    shell_cmd = [get_user_shell()]
    Print.command(shell_cmd)
    print("""An interactive shell in file://%s was launched.
Edit your files. When you are done, exit the shell. If you wish to abort the
process, exit the shell such that it returns an exit code other than zero.""" %
          (os.getcwd()))
    exit_status = subprocess.call(shell_cmd)
    if exit_status == 0:
        return
    Print.error("Shell exited with exit value %i." % (exit_status))
    sys.exit(1)
def get_fixed_lauchpad_bugs(changes_file):
    """Return the Launchpad bug numbers closed by a .changes file.

    changes_file -- path of an existing .changes file

    Returns the numbers listed in the Launchpad-Bugs-Fixed field as a list of
    ints; the list is empty when the field is missing.
    """
    assert os.path.isfile(changes_file), "%s does not exist." % (changes_file)
    # Close the file again once the control data has been parsed.
    with open(changes_file) as changes_fd:
        changes = debian.deb822.Changes(changes_fd)
    if "Launchpad-Bugs-Fixed" not in changes:
        return []
    # split() without argument tolerates repeated or trailing whitespace.
    return [int(bug) for bug in changes["Launchpad-Bugs-Fixed"].split()]
def strip_epoch(version):
    '''Returns the full version string of a version object without its epoch.

    Everything up to and including the first colon of version.full_version
    is dropped: a version of 1:1.52-1 yields "1.52-1", while 1.1.3-1 (no
    colon) is returned unchanged as "1.1.3-1".'''

    full_version = version.full_version
    epoch, colon, remainder = full_version.partition(':')
    if colon:
        return remainder
    return full_version
def ask_for_manual_fixing():
    """Ask whether the user wants to fix the current problem by hand.

    Returns normally on a positive answer; otherwise prints "Abort." and
    exits with USER_ABORT.
    """
    wants_manual_fix = boolean_question(
        "Do you want to resolve this issue manually", True)
    if wants_manual_fix:
        return
    print("Abort.")
    sys.exit(USER_ABORT)
def get_patch_or_branch(bug):
    """Select the patch or branch of a bug that should be sponsored.

    Attachments of type "Patch" and linked branches are the candidates. A
    single candidate is selected automatically; with several candidates the
    user is asked to pick one. Without any candidate an error is printed and
    the script exits with status 1.

    Returns a (patch, branch) tuple in which exactly one element is set: the
    attachment object, or the bzr identity of the branch.
    """
    attached_patches = [a for a in bug.attachments if a.type == "Patch"]
    linked_branches = [link.branch for link in bug.linked_branches]
    patch_count = len(attached_patches)
    branch_count = len(linked_branches)

    if patch_count == 0 and branch_count == 0:
        if len(bug.attachments) == 0:
            Print.error("No attachment and no linked branch found on bug #%i." % \
                        (bug.id))
        else:
            Print.error(("No attached patch and no linked branch found. Go to"
                         " https://launchpad.net/bugs/%i and mark an attachment as"
                         " patch.") % (bug.id))
        sys.exit(1)

    # Exactly one candidate: nothing to ask.
    if patch_count == 1 and branch_count == 0:
        return (attached_patches[0], None)
    if patch_count == 0 and branch_count == 1:
        return (None, linked_branches[0].bzr_identity)

    # Several candidates: show a numbered list, branches first.
    if patch_count == 0:
        Print.normal("https://launchpad.net/bugs/%i has %i branches linked:" % \
                     (bug.id, branch_count))
    elif branch_count == 0:
        Print.normal("https://launchpad.net/bugs/%i has %i patches attached:" % \
                     (bug.id, patch_count))
    else:
        Print.normal("https://launchpad.net/bugs/%i has %i branch(es) linked and %i patch(es) attached:" % \
                     (bug.id, branch_count, patch_count))
    for number, linked_branch in enumerate(linked_branches, 1):
        print("%i) %s" % (number, linked_branch.display_name))
    for number, attached_patch in enumerate(attached_patches, branch_count + 1):
        print("%i) %s" % (number, attached_patch.title))

    # The last entry is offered as the default choice.
    last = branch_count + patch_count
    selected = input_number("Which branch or patch do you want to download",
                            1, last, last)
    if selected <= branch_count:
        return (None, linked_branches[selected - 1].bzr_identity)
    return (attached_patches[selected - branch_count - 1], None)
def download_patch(patch):
    """Save a bug attachment in the current directory.

    patch -- attachment object; its title is used for the file name and its
             data is written to that file

    Spaces in the title are replaced by underscores and ".patch" is appended
    when the title has none of the extensions .debdiff, .diff or .patch.
    Returns a Patch object for the written file.
    """
    # The title comes from the bug tracker: replace path separators as well
    # so that the file can only be created inside the current directory.
    patch_file = patch.title.replace(" ", "_").replace(os.sep, "_")
    if not patch.title.endswith((".debdiff", ".diff", ".patch")):
        Print.info("Patch %s does not have a proper file extension." % (patch.title))
        patch_file += ".patch"

    Print.info("Downloading %s." % (patch_file))
    with open(patch_file, "w") as f:
        f.write(patch.data.open().read())
    return Patch(patch_file)
def download_branch(branch):
    """Fetch a branch with "bzr branch" into the current directory.

    An existing directory with the branch's base name is removed first.
    Returns that directory name; exits with status 1 if bzr fails.
    """
    dir_name = os.path.basename(branch)
    if os.path.isdir(dir_name):
        shutil.rmtree(dir_name)

    branch_cmd = ["bzr", "branch", branch]
    Print.command(branch_cmd)
    if subprocess.call(branch_cmd) == 0:
        return dir_name

    Print.error("Failed to download branch %s." % (branch))
    sys.exit(1)
def merge_branch(branch):
    """Merge a branch into the current directory with "bzr merge".

    Returns False when the merge succeeded. When it failed the user is asked
    whether to fix the problem manually and True is returned to request an
    editing session.
    """
    merge_cmd = ["bzr", "merge", branch]
    Print.command(merge_cmd)
    if subprocess.call(merge_cmd) == 0:
        return False

    Print.error("Failed to merge branch %s." % (branch))
    ask_for_manual_fixing()
    return True
def extract_source(dsc_file, verbose=False):
    """Unpack a source package with "dpkg-source -x".

    dsc_file -- path of the .dsc file
    verbose  -- when false, dpkg-source is run with -q

    Exits with status 1 if the extraction fails.
    """
    if verbose:
        extract_cmd = ["dpkg-source", "-x", dsc_file]
    else:
        extract_cmd = ["dpkg-source", "-q", "-x", dsc_file]
    Print.command(extract_cmd)
    if subprocess.call(extract_cmd) == 0:
        return
    Print.error("Extraction of %s failed." % (os.path.basename(dsc_file)))
    sys.exit(1)
def apply_patch(task, patch):
    """Apply a patch to the source tree in the current directory.

    Debdiffs are applied with patch(1) using the strip level computed by the
    Patch object; all other patches are handed to edit-patch.

    Returns False when the patch applied cleanly. On failure the user is
    asked whether to fix the problem manually and True is returned to
    request an editing session.
    """
    if patch.is_debdiff():
        apply_cmd = ["patch", "--merge", "--force", "-p",
                     str(patch.get_strip_level()), "-i", patch.full_path]
        failure = "Failed to apply debdiff %s to %s %s."
    else:
        # FIXME: edit-patch needs a non-interactive mode
        # https://launchpad.net/bugs/612566
        apply_cmd = ["edit-patch", patch.full_path]
        failure = "Failed to apply diff %s to %s %s."

    Print.command(apply_cmd)
    if subprocess.call(apply_cmd) == 0:
        return False

    Print.error(failure % (patch.get_name(), task.package, task.get_version()))
    ask_for_manual_fixing()
    return True
def main(script_name, bug_number, build, edit, keyid, upload, verbose=False):
    """Fetch the fix attached to a bug, build it and optionally upload it.

    script_name -- ignored; the name is always derived from sys.argv[0]
    bug_number  -- Launchpad bug number (int)
    build       -- build the package with pbuilder and run lintian
    edit        -- open an interactive shell before the first build
    keyid       -- key ID passed to debuild for signing, or None
    upload      -- dput upload target ("ubuntu", "ppa/...") or None
    verbose     -- run helper tools without their quiet flag

    The work happens in $SPONSOR_PATCH_WORKDIR if that variable is set,
    otherwise in the current directory. Whenever a check fails the user may
    fix the source in a shell, after which all steps are repeated. The
    script exits with status 1 on fatal errors and with USER_ABORT when the
    user aborts.
    """
    if "SPONSOR_PATCH_WORKDIR" in os.environ:
        # FIXME: add command line parameter
        workdir = os.path.abspath(os.environ["SPONSOR_PATCH_WORKDIR"])
        if not os.path.isdir(workdir):
            # FIXME: Print nice error message on failure
            os.makedirs(workdir)
        Print.command(["cd", workdir])
        os.chdir(workdir)
    else:
        workdir = os.getcwd()

    script_name = os.path.basename(sys.argv[0])
    launchpad = launchpadlib.launchpad.Launchpad.login_anonymously(script_name, "production")
    bug = launchpad.bugs[bug_number]

    (patch, branch) = get_patch_or_branch(bug)

    # Find the Ubuntu task the patch belongs to.
    bug_tasks = [BugTask(x, launchpad) for x in bug.bug_tasks]
    ubuntu_tasks = [x for x in bug_tasks if x.is_ubuntu_task()]
    if len(ubuntu_tasks) == 0:
        Print.error("No Ubuntu bug task found on bug #%i." % (bug_number))
        sys.exit(1)
    elif len(ubuntu_tasks) == 1:
        task = ubuntu_tasks[0]
    else:
        if verbose:
            Print.info("%i Ubuntu tasks exist for bug #%i." % \
                       (len(ubuntu_tasks), bug_number))
            for ubuntu_task in ubuntu_tasks:
                print(ubuntu_task.get_short_info())
        # Prefer the only task that is still open; otherwise ask.
        open_ubuntu_tasks = [x for x in ubuntu_tasks if not x.is_complete()]
        if len(open_ubuntu_tasks) == 1:
            task = open_ubuntu_tasks[0]
        else:
            Print.normal("https://launchpad.net/bugs/%i has %i Ubuntu tasks:" % \
                         (bug_number, len(ubuntu_tasks)))
            for i, ubuntu_task in enumerate(ubuntu_tasks):
                print("%i) %s" % (i + 1, ubuntu_task.get_package_and_series()))
            selected = input_number("To which Ubuntu tasks do the patch belong",
                                    1, len(ubuntu_tasks))
            task = ubuntu_tasks[selected - 1]
    Print.info("Selected Ubuntu task: %s" % (task.get_short_info()))

    dsc_file = task.download_source()
    assert os.path.isfile(dsc_file), "%s does not exist." % (dsc_file)

    if patch:
        patch = download_patch(patch)

        Print.info("Ubuntu package: %s" % (task.package))
        if task.is_merge():
            Print.info("The task is a merge request.")

        extract_source(dsc_file, verbose)

        # change directory
        directory = task.package + '-' + task.get_version().upstream_version
        Print.command(["cd", directory])
        os.chdir(directory)

        edit |= apply_patch(task, patch)
    elif branch:
        branch_dir = download_branch(task.get_branch_link())

        # change directory
        Print.command(["cd", branch_dir])
        os.chdir(branch_dir)

        edit |= merge_branch(branch)

    while True:
        if edit:
            edit_source()
        # All following loop executions require manual editing.
        edit = True

        # update the Maintainer field
        cmd = ["update-maintainer"]
        if not verbose:
            cmd.append("-q")
        Print.command(cmd)
        subprocess.check_call(cmd)

        # Get new version of package
        with open("debian/changelog") as changelog_file:
            changelog = debian.changelog.Changelog(changelog_file)
        try:
            new_version = changelog.get_version()
        except IndexError:
            Print.error("Debian package version could not be determined. debian/changelog is probably malformed.")
            ask_for_manual_fixing()
            continue

        # Check if version of the new package is greater than the version in the archive.
        if new_version <= task.get_version():
            Print.error("The version %s is not greater than the already available %s." % \
                        (new_version, task.get_version()))
            ask_for_manual_fixing()
            continue

        cmd = ["dch", "--maintmaint", "--edit", ""]
        Print.command(cmd)
        if subprocess.call(cmd) != 0:
            Print.info("Failed to update timestamp in debian/changelog.")

        # Build source package
        cmd = ["debuild", "--no-lintian", "-S"]
        previous_version = task.get_previous_version()
        cmd.append("-v" + previous_version.full_version)
        # -sd leaves the original source out of the upload, -sa includes it.
        if previous_version.upstream_version == changelog.upstream_version \
           and upload == "ubuntu":
            cmd.append("-sd")
        else:
            cmd.append("-sa")
        if keyid is not None:
            cmd += ["-k" + keyid]
        Print.command(cmd)
        if subprocess.call(cmd) != 0:
            Print.error("Failed to build source tarball.")
            # TODO: Add a "retry" option
            ask_for_manual_fixing()
            continue

        # Generate debdiff
        new_dsc_file = os.path.join(workdir,
                task.package + "_" + strip_epoch(new_version) + ".dsc")
        assert os.path.isfile(dsc_file), "%s does not exist." % (dsc_file)
        assert os.path.isfile(new_dsc_file), "%s does not exist." % (new_dsc_file)
        cmd = ["debdiff", dsc_file, new_dsc_file]
        debdiff_file = os.path.join(workdir,
                task.package + "_" + strip_epoch(new_version) + ".debdiff")
        if not verbose:
            cmd.insert(1, "-q")
        Print.command(cmd + [">", debdiff_file])
        debdiff = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]

        # write debdiff file
        with open(debdiff_file, "w") as f:
            f.write(debdiff)

        # Make sure that the Launchpad bug will be closed
        changes_file = new_dsc_file[:-4] + "_source.changes"
        if bug_number not in get_fixed_lauchpad_bugs(changes_file):
            Print.error("Launchpad bug #%i is not closed by new version." % \
                        (bug_number))
            ask_for_manual_fixing()
            continue

        ubuntu = launchpad.distributions['ubuntu']
        devel_series = ubuntu.current_series.name
        supported_series = [series.name for series in ubuntu.series
                            if series.active and series.name != devel_series]
        # Make sure that the target is correct
        if upload == "ubuntu":
            allowed = [s + "-proposed" for s in supported_series] + \
                      [devel_series]
            if changelog.distributions not in allowed:
                Print.error("%s is not an allowed series. It needs to be one of %s." \
                            % (changelog.distributions, ", ".join(allowed)))
                ask_for_manual_fixing()
                continue
        elif upload and upload.startswith("ppa/"):
            allowed = supported_series + [devel_series]
            if changelog.distributions not in allowed:
                Print.error("%s is not an allowed series. It needs to be one of %s." \
                            % (changelog.distributions, ", ".join(allowed)))
                ask_for_manual_fixing()
                continue

        if build:
            buildresult = os.path.join(workdir, task.package + "-buildresult")
            if not os.path.isdir(buildresult):
                os.makedirs(buildresult)
            cmd = ["dpkg-architecture", "-qDEB_BUILD_ARCH_CPU"]
            architecture = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0].strip()

            # build package
            dist = re.sub("-.*$", "", changelog.distributions)
            # TODO: Do not rely on a specific pbuilder configuration.
            cmd = ["sudo", "-E", "DIST=" + dist, "pbuilder", "--build",
                   "--distribution", dist, "--buildresult", buildresult,
                   "--architecture", architecture, new_dsc_file]
            Print.command(cmd)
            if subprocess.call(cmd) != 0:
                Print.error("Failed to build %s from source." % \
                            (os.path.basename(new_dsc_file)))
                # TODO: Add "retry" and "update" option
                ask_for_manual_fixing()
                continue

            # Check lintian
            build_changes = os.path.join(buildresult, task.package + "_" + \
                    strip_epoch(new_version) + "_" + architecture + ".changes")
            assert os.path.isfile(build_changes), "%s does not exist." % (build_changes)
            cmd = ["lintian", "-IE", "--pedantic", "-q", build_changes]
            lintian_file = os.path.join(workdir,
                    task.package + "_" + strip_epoch(new_version) + ".lintian")
            Print.command(cmd + [">", lintian_file])
            report = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]

            # write lintian report file
            with open(lintian_file, "w") as f:
                f.write(report)

        # Upload package
        if upload:
            if upload == "ubuntu":
                # The lintian report and the build log only exist when the
                # package was built in this run.
                files_to_check = [debdiff_file]
                if build:
                    build_log = os.path.join(buildresult, task.package + "_" + \
                            strip_epoch(new_version) + "_" + architecture + ".build")
                    files_to_check += [lintian_file, build_log]
                print("Please check %s %s carefully:\n%s" % \
                      (task.package, new_version,
                       "\n".join(["file://" + name for name in files_to_check])))
                answer = yes_edit_no_question("Do you want to upload the package to the official Ubuntu archive", "yes")
                if answer == "edit":
                    continue
                elif answer == "no":
                    print("Abort.")
                    sys.exit(USER_ABORT)
            cmd = ["dput", "--force", upload, changes_file]
            Print.command(cmd)
            if subprocess.call(cmd) != 0:
                Print.error("Upload of %s to %s failed." % \
                            (os.path.basename(changes_file), upload))
                sys.exit(1)

        # Leave while loop if everything worked
        break
if __name__ == "__main__":
    # Command line entry point: parse the options, validate the bug number
    # and hand everything over to main().
    script_name = os.path.basename(sys.argv[0])
    usage = "%s [options] <bug number>" % (script_name)
    epilog = "See %s(1) for more info." % (script_name)
    parser = optparse.OptionParser(usage=usage, epilog=epilog)

    parser.add_option("-b", "--build", dest="build", action="store_true",
                      default=False, help="Build the package with pbuilder.")
    parser.add_option("-e", "--edit", dest="edit", action="store_true",
                      default=False,
                      help="launch sub-shell to allow editing of the patch")
    parser.add_option("-k", "--key", dest="keyid", default=None,
                      help="Specify the key ID to be used for signing.")
    parser.add_option("-s", "--sponsor", dest="sponsoring",
                      action="store_true", default=False,
                      help="sponsoring; equals -b -u ubuntu")
    parser.add_option("-u", "--upload", dest="upload", default=None,
                      help="Specify an upload destination (default none).")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      default=False, help="print more information")

    (options, args) = parser.parse_args()
    Print.set_verbosity(options.verbose)

    # Exactly one positional argument is expected: the bug number.
    if not args:
        Print.error("No bug number specified.")
        sys.exit(1)
    if len(args) > 1:
        Print.error("Multiple bug numbers specified: %s" % (", ".join(args)))
        sys.exit(1)

    bug_number = args[0]
    if not bug_number.isdigit():
        Print.error("Invalid bug number specified: %s" % (bug_number))
        sys.exit(1)
    bug_number = int(bug_number)

    # -s is a shortcut for "-b -u ubuntu".
    if options.sponsoring:
        options.build = True
        options.upload = "ubuntu"

    main(script_name, bug_number, options.build, options.edit, options.keyid,
         options.upload, options.verbose)
# vim: set noet: