Split objects out of sponsor-patch script.

This commit is contained in:
Benjamin Drung 2010-12-17 17:32:38 +01:00
parent cec070446f
commit fbe27a7ce4
5 changed files with 267 additions and 208 deletions

View File

@ -21,163 +21,21 @@ import re
import shutil
import subprocess
import sys
import urllib
import debian.changelog
import debian.deb822
import debian.debian_support
import launchpadlib.launchpad
from ubuntutools.builder import getBuilder
import ubuntutools.update_maintainer
from ubuntutools.logger import Logger
from ubuntutools.question import input_number, boolean_question, yes_edit_no_question
from ubuntutools.sponsor_patch.bugtask import BugTask
from ubuntutools.sponsor_patch.patch import Patch
USER_ABORT = 2
class BugTask(object):
def __init__(self, bug_task, launchpad):
self.bug_task = bug_task
self.launchpad = launchpad
components = re.split(" \(| ", self.bug_task.bug_target_name.strip(")"))
assert len(components) >= 1 and len(components) <= 3
if len(components) == 1:
self.package = None
self.project = components[0]
self.series = None
elif len(components) == 2:
self.package = components[0]
self.project = components[1].lower()
self.series = None
elif len(components) == 3:
self.package = components[0]
self.project = components[1].lower()
self.series = components[2].lower()
def download_source(self):
source_files = self.get_source().sourceFileUrls()
dsc_file = None
for url in source_files:
filename = urllib.unquote(os.path.basename(url))
Logger.info("Downloading %s..." % (filename))
urllib.urlretrieve(url, filename)
if url.endswith(".dsc"):
dsc_file = filename
return os.path.join(os.getcwd(), dsc_file)
def get_branch_link(self):
return "lp:" + self.project + "/" + self.get_series() + "/" + \
self.package
def get_long_info(self):
return "Bug task: " + str(self.bug_task) + "\n" + \
"Package: " + str(self.package) + "\n" + \
"Project: " + str(self.project) + "\n" + \
"Series: " + str(self.series)
def get_package_and_series(self):
result = self.package
if self.series:
result += " (" + self.series + ")"
return result
def get_previous_version(self):
if self.is_merge():
previous_version = self.get_latest_released_version()
else:
previous_version = self.get_version()
return previous_version
def get_series(self, latest_release=False):
if self.series is None or latest_release:
dist = self.launchpad.distributions[self.project]
return dist.current_series.name
else:
return self.series
def get_short_info(self):
return self.bug_task.bug_target_name + ": " + self.bug_task.status
def get_source(self, latest_release=False):
assert self.package is not None
if self.is_merge() and not latest_release:
project = "debian"
title = self.bug_task.title.lower().split()
if "experimental" in title:
series = "experimental"
elif "testing" in title:
# TODO: Do not hard code series!
series = "squeeze"
else:
series = "sid"
status = "Pending"
else:
project = self.project
series = self.get_series(latest_release)
status = "Published"
dist = self.launchpad.distributions[project]
archive = dist.getArchive(name="primary")
distro_series = dist.getSeries(name_or_version=series)
published = archive.getPublishedSources(source_name=self.package,
distro_series=distro_series,
status=status, exact_match=True)
latest_source = None
for source in published:
if source.pocket in ('Release', 'Security', 'Updates', 'Proposed'):
latest_source = source
break
return latest_source
def get_version(self):
source_package_version = self.get_source().source_package_version
return debian.debian_support.Version(source_package_version)
def get_latest_released_version(self):
version = self.get_source(True).source_package_version
return debian.debian_support.Version(version)
def is_complete(self):
return self.bug_task.is_complete
def is_merge(self):
bug = self.bug_task.bug
return "merge" in bug.title.lower().split(" ") or "merge" in bug.tags
def is_ubuntu_task(self):
return self.project == "ubuntu"
class Patch(object):
def __init__(self, patch_file):
self.patch_file = patch_file
self.full_path = os.path.realpath(self.patch_file)
assert os.path.isfile(self.full_path), "%s does not exist." % \
(self.full_path)
cmd = ["diffstat", "-l", "-p0", self.full_path]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
changed_files = process.communicate()[0]
self.changed_files = filter(lambda l: l != "",
changed_files.split("\n"))
def get_name(self):
return self.patch_file
def get_strip_level(self):
strip_level = None
if self.is_debdiff():
changelog = filter(lambda f: f.endswith("debian/changelog"),
self.changed_files)[0]
strip_level = len(changelog.split(os.sep)) - 2
return strip_level
def is_debdiff(self):
return len(filter(lambda f: f.endswith("debian/changelog"),
self.changed_files)) > 0
def get_source_package_name(bug_task):
package = None
if bug_task.bug_target_name != "ubuntu":
@ -192,68 +50,6 @@ def get_user_shell():
shell = pwd.getpwuid(os.getuid())[6]
return shell
def input_number(question, min_number, max_number, default=None):
if default:
question += " [%i]? " % (default)
else:
question += "? "
selected = None
while selected < min_number or selected > max_number:
selected = raw_input(question).strip()
if default and selected == "":
selected = default
else:
try:
selected = int(selected)
if selected < min_number or selected > max_number:
print "Please input a number between %i and %i." % \
(min_number, max_number)
except ValueError:
print "Please input a number."
assert type(selected) == int
return selected
def boolean_question(question, default):
if default is True:
question += " [Y/n]? "
else:
question += " [y/N]? "
selected = None
while type(selected) != bool:
selected = raw_input(question).strip().lower()
if selected == "":
selected = default
elif selected in ("y", "yes"):
selected = True
elif selected in ("n", "no"):
selected = False
else:
print "Please answer the question with yes or no."
return selected
def yes_edit_no_question(question, default):
assert default in ("yes", "edit", "no")
if default == "yes":
question += " [Y/e/n]? "
elif default == "edit":
question += " [y/E/n]? "
else:
question += " [y/e/N]? "
selected = None
while selected not in ("yes", "edit", "no"):
selected = raw_input(question).strip().lower()
if selected == "":
selected = default
elif selected in ("y", "yes"):
selected = "yes"
elif selected in ("e", "edit"):
selected = "edit"
elif selected in ("n", "no"):
selected = "no"
else:
print "Please answer the question with yes, edit, or no."
return selected
def edit_source():
# Spawn shell to allow modifications
cmd = [get_user_shell()]

78
ubuntutools/question.py Normal file
View File

@ -0,0 +1,78 @@
#
# question.py - Helper class for asking questions
#
# Copyright (C) 2010, Benjamin Drung <bdrung@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
def input_number(question, min_number, max_number, default=None):
if default:
question += " [%i]? " % (default)
else:
question += "? "
selected = None
while selected < min_number or selected > max_number:
selected = raw_input(question).strip()
if default and selected == "":
selected = default
else:
try:
selected = int(selected)
if selected < min_number or selected > max_number:
print "Please input a number between %i and %i." % \
(min_number, max_number)
except ValueError:
print "Please input a number."
assert type(selected) == int
return selected
def boolean_question(question, default):
if default is True:
question += " [Y/n]? "
else:
question += " [y/N]? "
selected = None
while type(selected) != bool:
selected = raw_input(question).strip().lower()
if selected == "":
selected = default
elif selected in ("y", "yes"):
selected = True
elif selected in ("n", "no"):
selected = False
else:
print "Please answer the question with yes or no."
return selected
def yes_edit_no_question(question, default):
assert default in ("yes", "edit", "no")
if default == "yes":
question += " [Y/e/n]? "
elif default == "edit":
question += " [y/E/n]? "
else:
question += " [y/e/N]? "
selected = None
while selected not in ("yes", "edit", "no"):
selected = raw_input(question).strip().lower()
if selected == "":
selected = default
elif selected in ("y", "yes"):
selected = "yes"
elif selected in ("e", "edit"):
selected = "edit"
elif selected in ("n", "no"):
selected = "no"
else:
print "Please answer the question with yes, edit, or no."
return selected

View File

View File

@ -0,0 +1,139 @@
#
# bugtask.py - Internal helper class for sponsor-patch
#
# Copyright (C) 2010, Benjamin Drung <bdrung@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import os
import re
import urllib
import debian.debian_support
from ubuntutools.logger import Logger
class BugTask(object):
def __init__(self, bug_task, launchpad):
self.bug_task = bug_task
self.launchpad = launchpad
components = re.split(" \(| ", self.bug_task.bug_target_name.strip(")"))
assert len(components) >= 1 and len(components) <= 3
if len(components) == 1:
self.package = None
self.project = components[0]
self.series = None
elif len(components) == 2:
self.package = components[0]
self.project = components[1].lower()
self.series = None
elif len(components) == 3:
self.package = components[0]
self.project = components[1].lower()
self.series = components[2].lower()
def download_source(self):
source_files = self.get_source().sourceFileUrls()
dsc_file = None
for url in source_files:
filename = urllib.unquote(os.path.basename(url))
Logger.info("Downloading %s..." % (filename))
urllib.urlretrieve(url, filename)
if url.endswith(".dsc"):
dsc_file = filename
return os.path.join(os.getcwd(), dsc_file)
def get_branch_link(self):
return "lp:" + self.project + "/" + self.get_series() + "/" + \
self.package
def get_long_info(self):
return "Bug task: " + str(self.bug_task) + "\n" + \
"Package: " + str(self.package) + "\n" + \
"Project: " + str(self.project) + "\n" + \
"Series: " + str(self.series)
def get_package_and_series(self):
result = self.package
if self.series:
result += " (" + self.series + ")"
return result
def get_previous_version(self):
if self.is_merge():
previous_version = self.get_latest_released_version()
else:
previous_version = self.get_version()
return previous_version
def get_series(self, latest_release=False):
if self.series is None or latest_release:
dist = self.launchpad.distributions[self.project]
return dist.current_series.name
else:
return self.series
def get_short_info(self):
return self.bug_task.bug_target_name + ": " + self.bug_task.status
def get_source(self, latest_release=False):
assert self.package is not None
if self.is_merge() and not latest_release:
project = "debian"
title = self.bug_task.title.lower().split()
if "experimental" in title:
series = "experimental"
elif "testing" in title:
# TODO: Do not hard code series!
series = "squeeze"
else:
series = "sid"
status = "Pending"
else:
project = self.project
series = self.get_series(latest_release)
status = "Published"
dist = self.launchpad.distributions[project]
archive = dist.getArchive(name="primary")
distro_series = dist.getSeries(name_or_version=series)
published = archive.getPublishedSources(source_name=self.package,
distro_series=distro_series,
status=status, exact_match=True)
latest_source = None
for source in published:
if source.pocket in ('Release', 'Security', 'Updates', 'Proposed'):
latest_source = source
break
return latest_source
def get_version(self):
source_package_version = self.get_source().source_package_version
return debian.debian_support.Version(source_package_version)
def get_latest_released_version(self):
version = self.get_source(True).source_package_version
return debian.debian_support.Version(version)
def is_complete(self):
return self.bug_task.is_complete
def is_merge(self):
bug = self.bug_task.bug
return "merge" in bug.title.lower().split(" ") or "merge" in bug.tags
def is_ubuntu_task(self):
return self.project == "ubuntu"

View File

@ -0,0 +1,46 @@
#
# patch.py - Internal helper class for sponsor-patch
#
# Copyright (C) 2010, Benjamin Drung <bdrung@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import os
import subprocess
class Patch(object):
def __init__(self, patch_file):
self.patch_file = patch_file
self.full_path = os.path.realpath(self.patch_file)
assert os.path.isfile(self.full_path), "%s does not exist." % \
(self.full_path)
cmd = ["diffstat", "-l", "-p0", self.full_path]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
changed_files = process.communicate()[0]
self.changed_files = filter(lambda l: l != "",
changed_files.split("\n"))
def get_name(self):
return self.patch_file
def get_strip_level(self):
strip_level = None
if self.is_debdiff():
changelog = filter(lambda f: f.endswith("debian/changelog"),
self.changed_files)[0]
strip_level = len(changelog.split(os.sep)) - 2
return strip_level
def is_debdiff(self):
return len(filter(lambda f: f.endswith("debian/changelog"),
self.changed_files)) > 0