Fix all flake8 issues

This commit is contained in:
Benjamin Drung 2017-05-01 00:20:03 +02:00
parent 3a6cd3ac66
commit cc7170eccb
49 changed files with 476 additions and 444 deletions

23
404main
View File

@ -31,11 +31,12 @@ import apt
from ubuntutools import subprocess
def process_deps(cache, deps):
"""Takes a list of (build) dependencies and processes it."""
for basedep in [d.or_dependencies[0] for d in deps]:
if not packages.has_key(basedep.name) and basedep.name != '':
if basedep.name not in packages and basedep.name != '':
# Check the (build) dependencies recursively
find_main(cache, basedep.name)
@ -79,7 +80,7 @@ def find_main(cache, pack):
packages[pack] = True
return
else:
if not packages.has_key(pack):
if pack not in packages:
packages[pack] = False
# Retrieve package dependencies
@ -108,10 +109,12 @@ def find_main(cache, pack):
process_deps(cache, deps)
def usage(exit_code):
print 'Usage: %s <package name> [<distribution>]' % sys.argv[0]
sys.exit(exit_code)
def main():
global packages, distro
@ -129,9 +132,8 @@ def main():
distro = sys.argv[2]
if not get_package_version(cache, distro, 'bash'):
print u'«%s» is not a valid distribution.' % distro
print ('Remember that for 404main to work with a certain '
'distribution it must be in your /etc/apt/sources.list '
'file.')
print('Remember that for 404main to work with a certain distribution '
'it must be in your /etc/apt/sources.list file.')
sys.exit(1)
else:
cmd = ['lsb_release', '-cs']
@ -139,12 +141,10 @@ def main():
distro = process.stdout.read().strip('\n')
if not get_package_version(cache, distro, sys.argv[1]):
print (u"Can't find package «%s» in distribution «%s»."
% (sys.argv[1], distro))
print(u"Can't find package «%s» in distribution «%s»." % (sys.argv[1], distro))
sys.exit(1)
print (u'Checking package «%s» in distribution «%s»...'
% (sys.argv[1], distro))
print(u'Checking package «%s» in distribution «%s»...' % (sys.argv[1], distro))
find_main(cache, sys.argv[1])
@ -159,8 +159,9 @@ def main():
print ' ', package
if all_in_main:
print (u'Package «%s» and all its dependencies and build dependencies '
u'are in main.') % sys.argv[1]
print((u'Package «%s» and all its dependencies and build dependencies are in main.') %
sys.argv[1])
if __name__ == '__main__':

View File

@ -41,10 +41,12 @@ from ubuntutools.misc import (system_distribution, vendor_to_distroinfo,
from ubuntutools.question import YesNoQuestion
from ubuntutools import subprocess
def error(msg):
Logger.error(msg)
sys.exit(1)
def check_call(cmd, *args, **kwargs):
Logger.command(cmd)
ret = subprocess.call(cmd, *args, **kwargs)
@ -199,6 +201,7 @@ def find_release_package(mirror, workdir, package, version, source_release,
return srcpkg
def find_package(mirror, workdir, package, version, source_release, config):
"Returns the SourcePackage"
if package.endswith('.dsc'):
@ -217,6 +220,7 @@ def find_package(mirror, workdir, package, version, source_release, config):
return srcpkg
def get_backport_version(version, suffix, upload, release):
distribution = codename_to_distribution(release)
if not distribution:
@ -231,6 +235,7 @@ def get_backport_version(version, suffix, upload, release):
backport_version += '~ppa1'
return backport_version
def get_old_version(source, release):
try:
distribution = codename_to_distribution(release)
@ -240,15 +245,17 @@ def get_old_version(source, release):
('Release', 'Security', 'Updates',
'Proposed', 'Backports'))
return pkg.getVersion()
except (SeriesNotFoundException, PackageNotFoundException) as e:
except (SeriesNotFoundException, PackageNotFoundException):
pass
def get_backport_dist(release, release_pocket):
if release_pocket:
return release
else:
return '%s-backports' % release
def do_build(workdir, dsc, release, builder, update):
builder = get_builder(builder)
if not builder:
@ -264,9 +271,9 @@ def do_build(workdir, dsc, release, builder, update):
release,
os.path.join(workdir, "buildresult"))
def do_upload(workdir, package, bp_version, changes, upload, prompt):
print('Please check %s %s in file://%s carefully!' % \
(package, bp_version, workdir))
print('Please check %s %s in file://%s carefully!' % (package, bp_version, workdir))
if prompt or upload == 'ubuntu':
question = 'Do you want to upload the package to %s' % upload
answer = YesNoQuestion().ask(question, "yes")
@ -275,6 +282,7 @@ def do_upload(workdir, package, bp_version, changes, upload, prompt):
check_call(['dput', upload, changes], cwd=workdir)
def orig_needed(upload, workdir, pkg):
'''Avoid a -sa if possible'''
if not upload or not upload.startswith('ppa:'):
@ -285,21 +293,20 @@ def orig_needed(upload, workdir, pkg):
version = pkg.version.upstream_version
h = Http()
for filename in glob.glob(os.path.join(workdir,
'%s_%s.orig*' % (pkg.source, version))):
for filename in glob.glob(os.path.join(workdir, '%s_%s.orig*' % (pkg.source, version))):
url = ('https://launchpad.net/~%s/+archive/%s/+files/%s'
% (user, ppa, filename))
try:
headers, body = h.request(url, 'HEAD')
if (headers.status != 200 or
not headers['content-location'].
startswith('https://launchpadlibrarian.net')):
if (headers.status != 200 or
not headers['content-location'].startswith('https://launchpadlibrarian.net')):
return True
except HttpLib2Error as e:
Logger.info(e)
return True
return False
def do_backport(workdir, pkg, suffix, close, release, release_pocket, build,
builder, update, upload, keyid, prompt):
dirname = '%s-%s' % (pkg.source, release)
@ -362,6 +369,7 @@ def do_backport(workdir, pkg, suffix, close, release, release_pocket, build,
shutil.rmtree(srcdir)
def main(args):
ubu_email()
@ -412,5 +420,6 @@ def main(args):
if not opts.workdir:
shutil.rmtree(workdir)
if __name__ == '__main__':
sys.exit(main(sys.argv))

View File

@ -30,20 +30,24 @@ from launchpadlib.errors import HTTPError
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
def error_out(msg):
Logger.error(msg)
sys.exit(1)
def save_entry(entry):
try:
entry.lp_save()
except HTTPError, error:
error_out(error.content)
def tag_bug(bug):
bug.tags = bug.tags + ['bitesize'] # LP: #254901 workaround
bug.tags = bug.tags + ['bitesize'] # LP: #254901 workaround
save_entry(bug)
def main():
usage = "Usage: %prog <bug number>"
opt_parser = OptionParser(usage)
@ -71,7 +75,7 @@ def main():
bug = launchpad.bugs[args[0]]
except HTTPError, error:
if error.response.status == 401:
error_out("Don't have enough permissions to access bug %s. %s" % \
error_out("Don't have enough permissions to access bug %s. %s" %
(args[0], error.content))
else:
raise
@ -82,7 +86,8 @@ def main():
"for newcomers in Ubuntu development. If you need "
"any help with fixing it, talk to me about it.")
bug.subscribe(person=launchpad.me)
tag_bug(launchpad.bugs[bug.id]) # fresh bug object, LP: #336866 workaround
tag_bug(launchpad.bugs[bug.id]) # fresh bug object, LP: #336866 workaround
if __name__ == '__main__':
main()

View File

@ -153,5 +153,6 @@ def main():
else:
print('All dependencies are supported in main or restricted.')
if __name__ == '__main__':
main()

View File

@ -29,8 +29,9 @@ import ubuntutools.misc
def main():
parser = optparse.OptionParser(usage='%prog [options] [string]',
description='List pending merges from Debian matching string')
parser = optparse.OptionParser(
usage='%prog [options] [string]',
description='List pending merges from Debian matching string')
args = parser.parse_args()[1]
if len(args) > 1:
@ -66,9 +67,9 @@ def main():
if merge.get('uploader'):
uploader = '(%s)' % merge['uploader']
pretty_uploader = u'{} {}'.format(author, uploader)
if (match is None or
match in package or match in author or match in uploader):
if match is None or match in package or match in author or match in uploader:
print '%s\t%s' % (package.encode("utf-8"), pretty_uploader.encode("utf-8"))
if __name__ == '__main__':
main()

View File

@ -36,6 +36,7 @@ from launchpadlib.launchpad import Launchpad
from ubuntutools.lp.libsupport import translate_web_api
def check_args():
howmany = -1
url = ""
@ -65,6 +66,7 @@ def check_args():
return (howmany, url)
def filter_unsolved(task):
# TODO: don't use this filter here, only check status and assignee of
# the given task
@ -73,12 +75,13 @@ def filter_unsolved(task):
# this is expensive, parse name out of self_link instead?
subscriptions = set(s.person.name for s in task.bug.subscriptions)
if (task.status != "Fix Committed" and
(not task.assignee or task.assignee.name in ['motu','desktop-bugs']) and
'ubuntu-sponsors' not in subscriptions and
'ubuntu-archive' not in subscriptions):
(not task.assignee or task.assignee.name in ['motu', 'desktop-bugs']) and
'ubuntu-sponsors' not in subscriptions and
'ubuntu-archive' not in subscriptions):
return True
return False
def main():
(howmany, url) = check_args()
if len(url.split("?", 1)) == 2:

View File

@ -38,6 +38,7 @@ except ImportError:
Logger.error("Please install 'python-soappy' in order to use this utility.")
sys.exit(1)
def main():
bug_re = re.compile(r"bug=(\d+)")
@ -46,8 +47,8 @@ def main():
debbugs = SOAPpy.SOAPProxy(url, namespace)
# debug
#debbugs.config.dumpSOAPOut = 1
#debbugs.config.dumpSOAPIn = 1
# debbugs.config.dumpSOAPOut = 1
# debbugs.config.dumpSOAPIn = 1
parser = OptionParser(usage="%prog [option] bug ...")
parser.add_option("-b", "--browserless",
@ -118,10 +119,10 @@ def main():
ubupackage)
sys.exit(1)
u_bug = launchpad.bugs.createBug(target=target, title=subject,
description='Imported from Debian bug '
'http://bugs.debian.org/%d:\n\n%s'
% (bug_num, summary))
u_bug = launchpad.bugs.createBug(
target=target, title=subject,
description='Imported from Debian bug http://bugs.debian.org/%d:\n\n%s' %
(bug_num, summary))
d_sp = debian.getSourcePackage(name=package)
if d_sp is None and options.package:
d_sp = debian.getSourcePackage(name=options.package)
@ -133,5 +134,6 @@ def main():
if not options.browserless:
webbrowser.open(u_bug.web_link)
if __name__ == '__main__':
main()

View File

@ -21,6 +21,7 @@
import re
import sys
def usage(exit_code=1):
print '''Usage: merge-changelog <left changelog> <right changelog>
@ -35,10 +36,12 @@ Debian release of the package.
# Changelog Management
########################################################################
# Regular expression for top of debian/changelog
CL_RE = re.compile(r'^(\w[-+0-9a-z.]*) \(([^\(\) \t]+)\)((\s+[-0-9a-z]+)+)\;',
re.IGNORECASE)
def merge_changelog(left_changelog, right_changelog):
"""Merge a changelog file."""
@ -60,6 +63,7 @@ def merge_changelog(left_changelog, right_changelog):
return False
def read_changelog(filename):
"""Return a parsed changelog file."""
entries = []
@ -93,6 +97,7 @@ def read_changelog(filename):
return entries
########################################################################
# Version parsing code
########################################################################
@ -214,6 +219,7 @@ def strcut(string, idx, accept):
return (ret, idx)
def deb_order(string, idx):
"""Return the comparison order of two characters."""
if idx >= len(string):
@ -223,6 +229,7 @@ def deb_order(string, idx):
else:
return CMP_TABLE.index(string[idx])
def deb_cmp_str(x, y):
"""Compare two strings in a deb version."""
idx = 0
@ -237,6 +244,7 @@ def deb_cmp_str(x, y):
return 0
def deb_cmp(x, y):
"""Implement the string comparison outlined by Debian policy."""
x_idx = y_idx = 0
@ -270,5 +278,6 @@ def main():
merge_changelog(left_changelog, right_changelog)
sys.exit(0)
if __name__ == '__main__':
main()

View File

@ -490,11 +490,12 @@ def main():
sys.exit(1)
# Execute the pbuilder command
if not '--debug-echo' in args:
if '--debug-echo' not in args:
sys.exit(subprocess.call(app.get_command(args)))
else:
print app.get_command([arg for arg in args if arg != '--debug-echo'])
if __name__ == '__main__':
try:
main()

View File

@ -27,6 +27,7 @@ from ubuntutools.archive import DebianSourcePackage, DownloadError
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
def previous_version(package, version, distance):
"Given an (extracted) package, determine the version distance versions ago"
upver = debian.debian_support.Version(version).upstream_version
@ -43,6 +44,7 @@ def previous_version(package, version, distance):
seen += 1
return False
def main():
parser = optparse.OptionParser('%prog [options] <package> <version> '
'[distance]')
@ -105,6 +107,7 @@ def main():
oldpkg.unpack()
print 'file://' + oldpkg.debdiff(newpkg, diffstat=True)
if __name__ == '__main__':
try:
main()

View File

@ -112,7 +112,7 @@ def main():
line = list(rmadison('debian', package, suite, 'source'))
if not line:
source_package = source_package_for(package, suite)
if source_package != None and package != source_package:
if source_package is not None and package != source_package:
package = source_package
line = list(rmadison('debian', package, suite, 'source'))
if not line:
@ -135,6 +135,7 @@ def main():
if not options.download_only:
srcpkg.unpack()
if __name__ == '__main__':
try:
main()

View File

@ -141,6 +141,7 @@ def main():
if not options.download_only:
srcpkg.unpack()
if __name__ == '__main__':
try:
main()

View File

@ -26,20 +26,13 @@
import re
import json
import os
import sys
import urllib2
from optparse import OptionParser
from distro_info import UbuntuDistroInfo, DistroDataOutdated
from ubuntutools.archive import UbuntuCloudArchiveSourcePackage, DownloadError
from ubuntutools.config import UDTConfig
from ubuntutools.lp.lpapicache import Distribution, Launchpad
from ubuntutools.lp.udtexceptions import (SeriesNotFoundException,
PackageNotFoundException,
PocketDoesNotExistError)
from ubuntutools.lp.lpapicache import Launchpad
from ubuntutools.lp.udtexceptions import PocketDoesNotExistError
from ubuntutools.logger import Logger
from ubuntutools.misc import split_release_pocket
@ -51,13 +44,13 @@ from launchpadlib.launchpad import Launchpad as LP
def showOpenstackReleases(uca):
releases = []
for p in uca.ppas:
if re.match("\w*-staging", p.name):
if re.match(r"\w*-staging", p.name):
releases.append(re.sub("-staging", "", p.name))
Logger.error("Openstack releases are:\n\t%s", ", ".join(releases))
def getSPPH(lp, archive, package, version=None, series=None, pocket=None, try_binary=True):
params = { 'exact_match': True, 'order_by_date': True, }
params = {'exact_match': True, 'order_by_date': True}
if pocket:
params['pocket'] = pocket
if series:
@ -156,6 +149,7 @@ def main():
if not options.download_only:
srcpkg.unpack()
if __name__ == '__main__':
try:
main()

View File

@ -75,12 +75,12 @@ def determine_destinations(source, destination):
def disclaimer():
print ("Ubuntu's backports are not for fixing bugs in stable releases, "
"but for bringing new features to older, stable releases.")
print ("See https://wiki.ubuntu.com/UbuntuBackports for the Ubuntu "
"Backports policy and processes.")
print ("See https://wiki.ubuntu.com/StableReleaseUpdates for the process "
"for fixing bugs in stable releases.")
print("Ubuntu's backports are not for fixing bugs in stable releases, "
"but for bringing new features to older, stable releases.\n"
"See https://wiki.ubuntu.com/UbuntuBackports for the Ubuntu "
"Backports policy and processes.\n"
"See https://wiki.ubuntu.com/StableReleaseUpdates for the process "
"for fixing bugs in stable releases.")
confirmation_prompt()
@ -129,11 +129,9 @@ def find_rdepends(releases, published_binaries):
if rdep['Package'] in published_binaries:
continue
# arch==any queries return Reverse-Build-Deps:
if (arch == 'any' and rdep.get('Architectures', [])
== ['source']):
if arch == 'any' and rdep.get('Architectures', []) == ['source']:
continue
intermediate[binpkg][rdep['Package']] \
.append((release, relationship))
intermediate[binpkg][rdep['Package']].append((release, relationship))
output = []
for binpkg, rdeps in intermediate.iteritems():
@ -240,8 +238,7 @@ def request_backport(package_spph, source, destinations):
+ testing
+ [""]
+ find_rdepends(destinations, published_binaries)
+ [""]
) % subst)
+ [""]) % subst)
editor = EditBugReport(subject, body)
editor.edit()

View File

@ -42,6 +42,7 @@ from ubuntutools.question import confirmation_prompt, EditBugReport
# entry point
#
def main():
# Our usage options.
usage = ('Usage: %prog [options] '
@ -208,9 +209,8 @@ def main():
ubuntu_version = Version('~')
ubuntu_component = None # Set after getting the Debian info
if not newsource:
print ("'%s' doesn't exist in 'Ubuntu %s'.\n"
"Do you want to sync a new package?"
% (srcpkg, release))
print("'%s' doesn't exist in 'Ubuntu %s'.\nDo you want to sync a new package?" %
(srcpkg, release))
confirmation_prompt()
newsource = True
except udtexceptions.SeriesNotFoundException, error:
@ -240,8 +240,7 @@ def main():
# try rmadison
import ubuntutools.requestsync.mail
try:
debian_srcpkg = ubuntutools.requestsync.mail.get_debian_srcpkg(
srcpkg, distro)
debian_srcpkg = ubuntutools.requestsync.mail.get_debian_srcpkg(srcpkg, distro)
debian_version = Version(debian_srcpkg.getVersion())
debian_component = debian_srcpkg.getComponent()
except udtexceptions.PackageNotFoundException, error:
@ -249,9 +248,8 @@ def main():
sys.exit(1)
if ubuntu_version == debian_version:
print >> sys.stderr, ('E: The versions in Debian and Ubuntu are the '
'same already (%s). Aborting.'
% ubuntu_version)
print >> sys.stderr, ('E: The versions in Debian and Ubuntu are the '
'same already (%s). Aborting.' % ubuntu_version)
sys.exit(1)
if ubuntu_version > debian_version:
print >> sys.stderr, ('E: The version in Ubuntu (%s) is newer than '
@ -283,9 +281,9 @@ def main():
if 'ubuntu' in str(ubuntu_version):
need_interaction = True
print ('Changes have been made to the package in Ubuntu.\n'
'Please edit the report and give an explanation.\n'
'Not saving the report file will abort the request.')
print('Changes have been made to the package in Ubuntu.\n'
'Please edit the report and give an explanation.\n'
'Not saving the report file will abort the request.')
report += (u'Explanation of the Ubuntu delta and why it can be '
u'dropped:\n%s\n>>> ENTER_EXPLANATION_HERE <<<\n\n'
% get_ubuntu_delta_changelog(ubuntu_srcpkg))
@ -293,9 +291,9 @@ def main():
if ffe:
need_interaction = True
print ('To approve FeatureFreeze exception, you need to state\n'
'the reason why you feel it is necessary.\n'
'Not saving the report file will abort the request.')
print('To approve FeatureFreeze exception, you need to state\n'
'the reason why you feel it is necessary.\n'
'Not saving the report file will abort the request.')
report += ('Explanation of FeatureFreeze exception:\n'
'>>> ENTER_EXPLANATION_HERE <<<\n\n')
@ -354,6 +352,7 @@ def main():
options.keyid, email_from, mailserver_host, mailserver_port,
mailserver_user, mailserver_pass)
if __name__ == '__main__':
try:
main()

View File

@ -33,7 +33,8 @@ def main():
Logger.warn(e)
default_release = 'unstable'
parser = optparse.OptionParser('%prog [options] package',
parser = optparse.OptionParser(
'%prog [options] package',
description="List reverse-dependencies of package. "
"If the package name is prefixed with src: then the "
"reverse-dependencies of all the binary packages that "
@ -51,8 +52,7 @@ def main():
help='Also consider Suggests relationships')
parser.add_option('-b', '--build-depends',
action='store_const', dest='arch', const='source',
help='Query build dependencies (synonym for '
'--arch=source)')
help='Query build dependencies (synonym for --arch=source)')
parser.add_option('-a', '--arch', metavar='ARCH', default='any',
help='Query dependencies in ARCH. '
'Default: any')

View File

@ -10,7 +10,7 @@ import codecs
# look/set what version we have
changelog = "debian/changelog"
if os.path.exists(changelog):
head=codecs.open(changelog, 'r', 'utf-8', 'replace').readline()
head = codecs.open(changelog, 'r', 'utf-8', 'replace').readline()
match = re.compile(".*\((.*)\).*").match(head)
if match:
version = match.group(1)
@ -19,56 +19,59 @@ if sys.version_info[0] >= 3:
scripts = []
data_files = []
else:
scripts = ['404main',
'backportpackage',
'bitesize',
'check-mir',
'check-symbols',
'dch-repeat',
'grab-merge',
'grep-merges',
'hugdaylist',
'import-bug-from-debian',
'merge-changelog',
'mk-sbuild',
'pbuilder-dist',
'pbuilder-dist-simple',
'pull-debian-debdiff',
'pull-debian-source',
'pull-lp-source',
'pull-revu-source',
'pull-uca-source',
'requestbackport',
'requestsync',
'reverse-build-depends',
'reverse-depends',
'seeded-in-ubuntu',
'setup-packaging-environment',
'sponsor-patch',
'submittodebian',
'syncpackage',
'ubuntu-build',
'ubuntu-iso',
'ubuntu-upload-permission',
'update-maintainer',
]
scripts = [
'404main',
'backportpackage',
'bitesize',
'check-mir',
'check-symbols',
'dch-repeat',
'grab-merge',
'grep-merges',
'hugdaylist',
'import-bug-from-debian',
'merge-changelog',
'mk-sbuild',
'pbuilder-dist',
'pbuilder-dist-simple',
'pull-debian-debdiff',
'pull-debian-source',
'pull-lp-source',
'pull-revu-source',
'pull-uca-source',
'requestbackport',
'requestsync',
'reverse-build-depends',
'reverse-depends',
'seeded-in-ubuntu',
'setup-packaging-environment',
'sponsor-patch',
'submittodebian',
'syncpackage',
'ubuntu-build',
'ubuntu-iso',
'ubuntu-upload-permission',
'update-maintainer',
]
data_files = [
('share/bash-completion/completions', glob.glob("bash_completion/*")),
('share/man/man1', glob.glob("doc/*.1")),
('share/man/man5', glob.glob("doc/*.5")),
('share/ubuntu-dev-tools', ['enforced-editing-wrapper']),
]
]
if __name__ == '__main__':
setup(name='ubuntu-dev-tools',
version=version,
scripts=scripts,
packages=['ubuntutools',
'ubuntutools/lp',
'ubuntutools/requestsync',
'ubuntutools/sponsor_patch',
'ubuntutools/test',
],
data_files=data_files,
test_suite='ubuntutools.test.discover',
setup(
name='ubuntu-dev-tools',
version=version,
scripts=scripts,
packages=[
'ubuntutools',
'ubuntutools/lp',
'ubuntutools/requestsync',
'ubuntutools/sponsor_patch',
'ubuntutools/test',
],
data_files=data_files,
test_suite='ubuntutools.test.discover',
)

View File

@ -23,8 +23,8 @@ import tempfile
from ubuntutools.builder import get_builder
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
from ubuntutools.sponsor_patch.sponsor_patch import (sponsor_patch,
check_dependencies)
from ubuntutools.sponsor_patch.sponsor_patch import sponsor_patch, check_dependencies
def parse(script_name):
"""Parse the command line parameters."""
@ -99,6 +99,7 @@ def parse(script_name):
return (options, bug_number)
def main():
script_name = os.path.basename(sys.argv[0])
(options, bug_number) = parse(script_name)
@ -128,5 +129,6 @@ def main():
if options.workdir is None:
shutil.rmtree(workdir)
if __name__ == "__main__":
main()

View File

@ -39,8 +39,8 @@ from ubuntutools.update_maintainer import update_maintainer, restore_maintainer
try:
from debian.changelog import Changelog
except ImportError:
print (u"This utility requires modules from the «python-debian» package, "
u"which isn't currently installed.")
print(u"This utility requires modules from the «python-debian» package, "
u"which isn't currently installed.")
sys.exit(1)
@ -265,5 +265,6 @@ def main():
os.unlink(debdiff)
shutil.rmtree(tmpdir)
if __name__ == '__main__':
main()

View File

@ -208,7 +208,7 @@ def sync_dsc(src_pkg, debian_dist, release, name, email, bugs, ubuntu_mirror,
remove_signature(src_pkg.dsc_name)
if keyid is not False:
cmd = ["debsign", changes_filename]
if not keyid is None:
if keyid is not None:
cmd.insert(1, "-k" + keyid)
Logger.command(cmd)
subprocess.check_call(cmd)
@ -521,11 +521,12 @@ def parse():
help="Show what would be done, but don't actually do "
"it.")
no_lp = optparse.OptionGroup(parser, "Local sync preparation options",
"Options that only apply when using --no-lp. "
"WARNING: The use of --no-lp is not recommended for uploads "
"targeted at Ubuntu. "
"The archive-admins discourage its use, except for fakesyncs.")
no_lp = optparse.OptionGroup(
parser, "Local sync preparation options",
"Options that only apply when using --no-lp. "
"WARNING: The use of --no-lp is not recommended for uploads "
"targeted at Ubuntu. "
"The archive-admins discourage its use, except for fakesyncs.")
no_lp.add_option("--no-lp",
dest="lp", action="store_false", default=True,
help="Construct sync locally, rather than letting "

View File

@ -32,6 +32,7 @@ from ubuntutools.lp.udtexceptions import (SeriesNotFoundException,
from ubuntutools.lp.lpapicache import Distribution, PersonTeam
from ubuntutools.misc import split_release_pocket
def main():
# Usage.
usage = "%prog <srcpackage> <release> <operation>\n\n"
@ -100,7 +101,7 @@ def main():
try:
package = str(args[0]).lower()
release = str(args[1]).lower()
op = str(args[2]).lower()
op = str(args[2]).lower()
except IndexError:
opt_parser.print_help()
sys.exit(1)
@ -167,8 +168,8 @@ def main():
sys.exit(1)
# Output details.
print ("The source version for '%s' in %s (%s) is at %s."
% (package, release.capitalize(), component, version))
print("The source version for '%s' in %s (%s) is at %s." %
(package, release.capitalize(), component, version))
print "Current build status for this package:"
@ -185,9 +186,8 @@ def main():
if build.can_be_rescored:
# FIXME: make priority an option
priority = 5000
print 'Rescoring build %s to %d...' % \
(build.arch_tag, priority)
build.rescore(score = priority)
print 'Rescoring build %s to %d...' % (build.arch_tag, priority)
build.rescore(score=priority)
else:
print 'Cannot rescore build on %s.' % build.arch_tag
if op == 'retry':
@ -197,13 +197,12 @@ def main():
else:
print 'Cannot retry build on %s.' % build.arch_tag
# We are done
if done:
sys.exit(0)
print ("No builds for '%s' found in the %s release - it may have been "
"built in a former release." % (package, release.capitalize()))
print("No builds for '%s' found in the %s release - it may have been "
"built in a former release." % (package, release.capitalize()))
sys.exit(0)
# Batch mode
@ -272,5 +271,6 @@ def main():
print ''
if __name__ == '__main__':
main()

View File

@ -25,6 +25,7 @@ import sys
from ubuntutools import subprocess
def extract(iso, path):
command = ['isoinfo', '-R', '-i', iso, '-x', path]
pipe = subprocess.Popen(command, stdout=subprocess.PIPE,
@ -37,6 +38,7 @@ def extract(iso, path):
return stdout
def main():
desc = 'Given an ISO, %prog will display the Ubuntu version information'
parser = optparse.OptionParser(usage='%prog [options] iso...',
@ -62,6 +64,7 @@ def main():
if err:
sys.exit(1)
if __name__ == '__main__':
main()
sys.exit(0)

View File

@ -73,7 +73,7 @@ def main():
sys.exit(2)
component = spph.getComponent()
if (options.list_uploaders and (pocket != 'Release' or series.status in
('Experimental', 'Active Development', 'Pre-release Freeze'))):
('Experimental', 'Active Development', 'Pre-release Freeze'))):
component_uploader = archive.getUploadersForComponent(
component_name=component)[0]
@ -110,19 +110,17 @@ def main():
pocket):
print "You can upload %s to %s." % (package, options.release)
else:
print ("You can not upload %s to %s, yourself."
% (package, options.release))
print("You can not upload %s to %s, yourself." % (package, options.release))
if (series.status in ('Current Stable Release', 'Supported', 'Obsolete')
and pocket == 'Release'):
print ("%s is in the '%s' state. "
"You may want to query the %s-proposed pocket."
% (release, series.status, release))
print("%s is in the '%s' state. You may want to query the %s-proposed pocket." %
(release, series.status, release))
else:
print ("But you can still contribute to it via the sponsorship "
"process: https://wiki.ubuntu.com/SponsorshipProcess")
print("But you can still contribute to it via the sponsorship "
"process: https://wiki.ubuntu.com/SponsorshipProcess")
if not options.list_uploaders:
print ("To see who has the necessary upload rights, "
"use the --list-uploaders option.")
print("To see who has the necessary upload rights, "
"use the --list-uploaders option.")
sys.exit(1)
@ -133,9 +131,9 @@ def print_uploaders(uploaders, expand_teams=False, prefix=''):
recursion.
"""
for uploader in sorted(uploaders, key=lambda p: p.display_name):
print ("%s* %s (%s)%s"
% (prefix, uploader.display_name, uploader.name,
' [team]' if uploader.is_team else ''))
print("%s* %s (%s)%s" %
(prefix, uploader.display_name, uploader.name,
' [team]' if uploader.is_team else ''))
if expand_teams and uploader.is_team:
print_uploaders(uploader.participants, True, prefix=prefix + ' ')

View File

@ -29,6 +29,7 @@ Approach:
from __future__ import with_statement, print_function
import codecs
import hashlib
import os.path
try:
@ -36,19 +37,14 @@ try:
from urllib.parse import urlparse
from urllib.error import URLError, HTTPError
except ImportError:
from urllib2 import ProxyHandler, build_opener, urlopen
from urllib2 import ProxyHandler, build_opener, urlopen, URLError, HTTPError
from urlparse import urlparse
from urllib2 import URLError, HTTPError
import re
import sys
if sys.version_info[0] >= 3:
basestring = str
unicode = str
from debian.changelog import Changelog, Version
import debian.deb822
import debian.debian_support
import codecs
import httplib2
from ubuntutools.config import UDTConfig
@ -57,6 +53,10 @@ from ubuntutools.lp.lpapicache import (Launchpad, Distribution,
from ubuntutools.logger import Logger
from ubuntutools import subprocess
if sys.version_info[0] >= 3:
basestring = str
unicode = str
class DownloadError(Exception):
"Unable to pull a source package"
@ -116,8 +116,7 @@ class Dsc(debian.deb822.Dsc):
if name not in their_checksums:
# file only in one dsc
continue
if (size != their_checksums[name][0] or
checksum != their_checksums[name][1]):
if size != their_checksums[name][0] or checksum != their_checksums[name][1]:
return False
return True # one checksum is good enough
return True
@ -176,12 +175,11 @@ class SourcePackage(object):
Launchpad.login_existing(self._lp)
else:
Launchpad.login_anonymously()
spph = (Distribution(self.distribution).getArchive()
.getPublishedSources(
source_name=self.source,
version=self.version.full_version,
exact_match=True,
))
spph = Distribution(self.distribution).getArchive().getPublishedSources(
source_name=self.source,
version=self.version.full_version,
exact_match=True,
)
self._spph = SourcePackagePublishingHistory(spph[0])
return self._spph
@ -344,7 +342,7 @@ class SourcePackage(object):
if block == b'':
break
downloaded += len(block)
out.write(block)
out.write(block)
if not self.quiet:
percent = downloaded * 100 // size
bar = '=' * int(round(downloaded * bar_width / size))
@ -510,9 +508,8 @@ class DebianSourcePackage(SourcePackage):
try:
data = self.url_opener.open(
'http://snapshot.debian.org'
'/mr/package/%s/%s/srcfiles?fileinfo=1'
% (self.source, self.version.full_version))
'http://snapshot.debian.org/mr/package/%s/%s/srcfiles?fileinfo=1' %
(self.source, self.version.full_version))
reader = codecs.getreader('utf-8')
srcfiles = json.load(reader(data))
@ -543,7 +540,7 @@ class UbuntuCloudArchiveSourcePackage(UbuntuSourcePackage):
def __init__(self, uca_release, *args, **kwargs):
super(UbuntuCloudArchiveSourcePackage, self).__init__(*args, **kwargs)
self._uca_release = uca_release
self.masters = [ "http://ubuntu-cloud.archive.canonical.com/ubuntu/" ]
self.masters = ["http://ubuntu-cloud.archive.canonical.com/ubuntu/"]
def _lp_url(self, filename):
"Build a source package URL on Launchpad"
@ -653,23 +650,23 @@ def rmadison(url, package, suite=None, arch=None):
#
# some versions (2.14.1ubuntu0.1) of rmadison return 'sid' when
# asked about 'unstable'. Others return 'unstable'. Accept either.
if (suite and dist != suite and not
if (suite and dist != suite and not
(suite == 'sid' and dist == 'unstable')):
continue
if 'source' in archs:
yield {
'source': pkg,
'version': ver,
'suite': dist,
'component': comp,
}
'source': pkg,
'version': ver,
'suite': dist,
'component': comp,
}
archs.discard('source')
if archs:
yield {
'binary': pkg,
'version': ver,
'suite': dist,
'component': comp,
'architectures': archs,
}
'binary': pkg,
'version': ver,
'suite': dist,
'component': comp,
'architectures': archs,
}

View File

@ -23,11 +23,13 @@ import os
from ubuntutools.logger import Logger
from ubuntutools import subprocess
def _build_preparation(result_directory):
"""prepares the builder for building a package"""
if not os.path.isdir(result_directory):
os.makedirs(result_directory)
class Builder(object):
def __init__(self, name):
self.name = name
@ -37,7 +39,7 @@ class Builder(object):
def _build_failure(self, returncode, dsc_file):
if returncode != 0:
Logger.error("Failed to build %s from source with %s." % \
Logger.error("Failed to build %s from source with %s." %
(os.path.basename(dsc_file), self.name))
return returncode
@ -55,7 +57,7 @@ class Builder(object):
def _update_failure(self, returncode, dist):
if returncode != 0:
Logger.error("Failed to update %s chroot for %s." % \
Logger.error("Failed to update %s chroot for %s." %
(dist, self.name))
return returncode
@ -142,10 +144,10 @@ class Sbuild(Builder):
["sbuild-distupgrade"],
["sbuild-clean", "-a", "-c"]]
for cmd in commands:
#pylint: disable=W0631
# pylint: disable=W0631
Logger.command(cmd + [chroot])
ret = subprocess.call(cmd + [chroot])
#pylint: enable=W0631
# pylint: enable=W0631
if ret != 0:
return self._update_failure(ret, dist)
return 0
@ -159,6 +161,7 @@ _SUPPORTED_BUILDERS = {
"sbuild": lambda: Sbuild(),
}
def get_builder(name):
if name in _SUPPORTED_BUILDERS:
builder = _SUPPORTED_BUILDERS[name]()

View File

@ -25,6 +25,7 @@ import locale
from ubuntutools.logger import Logger
class UDTConfig(object):
"""Ubuntu Dev Tools configuration file (devscripts config file) and
environment variable parsing.
@ -140,12 +141,8 @@ def ubu_email(name=None, email=None, export=True):
if export and not name and not email and 'UBUMAIL' not in os.environ:
export = False
for var, target in (('UBUMAIL', 'email'),
('DEBFULLNAME', 'name'),
('DEBEMAIL', 'email'),
('EMAIL', 'email'),
('NAME', 'name'),
):
for var, target in (('UBUMAIL', 'email'), ('DEBFULLNAME', 'name'), ('DEBEMAIL', 'email'),
('EMAIL', 'email'), ('NAME', 'name')):
if name and email:
break
if var in os.environ:

View File

@ -1,6 +1,6 @@
##
## ubuntu-dev-tools Launchpad Python modules.
##
#
# ubuntu-dev-tools Launchpad Python modules.
#
service = 'production'
api_version = '1.0'

View File

@ -25,6 +25,7 @@ except ImportError:
from urllib import urlencode
from urlparse import urlsplit, urlunsplit
def query_to_dict(query_string):
result = dict()
options = filter(None, query_string.split("&"))
@ -33,6 +34,7 @@ def query_to_dict(query_string):
result.setdefault(key, set()).add(value)
return result
def translate_web_api(url, launchpad):
scheme, netloc, path, query, fragment = urlsplit(url)
query = query_to_dict(query)
@ -45,10 +47,9 @@ def translate_web_api(url, launchpad):
if path.endswith("/+bugs"):
path = path[:-6]
if "ws.op" in query:
raise ValueError("Invalid web url, url: %s" %url)
raise ValueError("Invalid web url, url: %s" % url)
query["ws.op"] = "searchTasks"
scheme, netloc, api_path, _, _ = urlsplit(str(launchpad._root_uri))
query = urlencode(query)
url = urlunsplit((scheme, netloc, api_path + path.lstrip("/"),
query, fragment))
url = urlunsplit((scheme, netloc, api_path + path.lstrip("/"), query, fragment))
return url

View File

@ -24,16 +24,32 @@
from __future__ import print_function
# Uncomment for tracing LP API calls
#import httplib2
#httplib2.debuglevel = 1
# import httplib2
# httplib2.debuglevel = 1
import collections
import sys
from debian.changelog import Changelog, Version
from httplib2 import Http, HttpLib2Error
from launchpadlib.launchpad import Launchpad as LP
from launchpadlib.errors import HTTPError
from lazr.restfulclient.resource import Entry
from ubuntutools.lp import (service, api_version)
from ubuntutools.lp.udtexceptions import (AlreadyLoggedInError,
ArchiveNotFoundException,
ArchSeriesNotFoundException,
PackageNotFoundException,
PocketDoesNotExistError,
SeriesNotFoundException)
if sys.version_info[0] >= 3:
basestring = str
unicode = str
#Shameless steal from python-six
# Shameless steal from python-six
def add_metaclass(metaclass):
"""Class decorator for creating a class with a metaclass."""
def wrapper(cls):
@ -49,20 +65,6 @@ def add_metaclass(metaclass):
return metaclass(cls.__name__, cls.__bases__, orig_vars)
return wrapper
from debian.changelog import Changelog, Version
from httplib2 import Http, HttpLib2Error
from launchpadlib.launchpad import Launchpad as LP
from launchpadlib.errors import HTTPError
from lazr.restfulclient.resource import Entry
from ubuntutools.lp import (service, api_version)
from ubuntutools.lp.udtexceptions import (AlreadyLoggedInError,
ArchiveNotFoundException,
ArchSeriesNotFoundException,
PackageNotFoundException,
PocketDoesNotExistError,
SeriesNotFoundException)
import collections
__all__ = [
'Archive',
@ -121,6 +123,8 @@ class _Launchpad(object):
def __call__(self):
return self
Launchpad = _Launchpad()
@ -131,7 +135,7 @@ class MetaWrapper(type):
def __init__(cls, name, bases, attrd):
super(MetaWrapper, cls).__init__(name, bases, attrd)
if 'resource_type' not in attrd:
raise TypeError('Class "%s" needs an associated resource type' % \
raise TypeError('Class "%s" needs an associated resource type' %
name)
cls._cache = dict()
@ -144,8 +148,7 @@ class BaseWrapper(object):
resource_type = None # it's a base class after all
def __new__(cls, data):
if (isinstance(data, basestring) and
data.startswith(str(Launchpad._root_uri))):
if isinstance(data, basestring) and data.startswith(str(Launchpad._root_uri)):
# looks like a LP API URL
# check if it's already cached
cached = cls._cache.get(data)
@ -161,8 +164,7 @@ class BaseWrapper(object):
if isinstance(data, Entry):
(service_root, resource_type) = data.resource_type_link.split('#')
if (service_root == str(Launchpad._root_uri) and
resource_type in cls.resource_type):
if service_root == str(Launchpad._root_uri) and resource_type in cls.resource_type:
# check if it's already cached
cached = cls._cache.get(data.self_link)
if not cached:
@ -254,7 +256,7 @@ class Distribution(BaseWrapper):
(archive, self.display_name)
raise ArchiveNotFoundException(message)
else:
if not '_main_archive' in self.__dict__:
if '_main_archive' not in self.__dict__:
self._main_archive = Archive(self.main_archive_link)
return self._main_archive
@ -266,8 +268,7 @@ class Distribution(BaseWrapper):
'''
if name_or_version not in self._series:
try:
series = DistroSeries(
self().getSeries(name_or_version=name_or_version))
series = DistroSeries(self().getSeries(name_or_version=name_or_version))
# Cache with name and version
self._series[series.name] = series
self._series[series.version] = series
@ -448,7 +449,7 @@ class Archive(BaseWrapper):
else:
package_type = "package"
msg = ("The %s '%s' does not exist in the %s %s archive" %
(package_type, name, dist.display_name, self.name))
(package_type, name, dist.display_name, self.name))
if archtag is not None and archtag != []:
msg += " for architecture %s" % archtag
pockets = [series.name if pocket == 'Release'
@ -492,10 +493,9 @@ class Archive(BaseWrapper):
'''
if component_name not in self._component_uploaders:
self._component_uploaders[component_name] = sorted(set(
PersonTeam(permission.person_link)
for permission in self._lpobject.getUploadersForComponent(
component_name=component_name
)))
PersonTeam(permission.person_link) for permission in
self._lpobject.getUploadersForComponent(component_name=component_name)
))
return self._component_uploaders[component_name]
def getUploadersForPackage(self, source_package_name):
@ -504,10 +504,9 @@ class Archive(BaseWrapper):
'''
if source_package_name not in self._pkg_uploaders:
self._pkg_uploaders[source_package_name] = sorted(set(
PersonTeam(permission.person_link)
for permission in self._lpobject.getUploadersForPackage(
source_package_name=source_package_name
)))
PersonTeam(permission.person_link) for permission in
self._lpobject.getUploadersForPackage(source_package_name=source_package_name)
))
return self._pkg_uploaders[source_package_name]
def getUploadersForPackageset(self, packageset, direct_permissions=False):
@ -517,11 +516,12 @@ class Archive(BaseWrapper):
key = (packageset, direct_permissions)
if key not in self._pkgset_uploaders:
self._pkgset_uploaders[key] = sorted(set(
PersonTeam(permission.person_link)
for permission in self._lpobject.getUploadersForPackageset(
packageset=packageset._lpobject,
direct_permissions=direct_permissions,
)))
PersonTeam(permission.person_link) for permission in
self._lpobject.getUploadersForPackageset(
packageset=packageset._lpobject,
direct_permissions=direct_permissions,
)
))
return self._pkgset_uploaders[key]
@ -576,9 +576,8 @@ class SourcePackagePublishingHistory(BaseWrapper):
if self._changelog is None:
url = self._lpobject.changelogUrl()
if url is None:
print(('E: No changelog available for %s %s',
(self.getPackageName(),
self.getVersion())), file=sys.stderr)
print('E: No changelog available for %s %s' %
(self.getPackageName(), self.getVersion()), file=sys.stderr)
return None
try:
@ -587,8 +586,7 @@ class SourcePackagePublishingHistory(BaseWrapper):
print(str(e), file=sys.stderr)
return None
if response.status != 200:
print(('%s: %s %s' % (url, response.status,
response.reason)), file=sys.stderr)
print('%s: %s %s' % (url, response.status, response.reason), file=sys.stderr)
return None
self._changelog = changelog
@ -700,7 +698,7 @@ class BinaryPackagePublishingHistory(BaseWrapper):
return self._lpobject.binaryFileUrls()
except AttributeError:
raise AttributeError("binaryFileUrls can only be found in lpapi "
"devel, not 1.0. Login using devel to have it.")
"devel, not 1.0. Login using devel to have it.")
class MetaPersonTeam(MetaWrapper):
@ -727,10 +725,7 @@ class PersonTeam(BaseWrapper):
Wrapper class around a LP person or team object.
'''
resource_type = (
'person',
'team',
)
resource_type = ('person', 'team')
def __init__(self, *args):
# Don't share _upload between different PersonTeams
@ -794,12 +789,12 @@ class PersonTeam(BaseWrapper):
# checkUpload() throws an exception if the person can't upload
try:
archive.checkUpload(
component=component,
distroseries=distroseries(),
person=self(),
pocket=pocket,
sourcepackagename=package,
)
component=component,
distroseries=distroseries(),
person=self(),
pocket=pocket,
sourcepackagename=package,
)
canUpload = True
except HTTPError as e:
if e.response.status == 403:
@ -866,9 +861,7 @@ class Packageset(BaseWrapper):
if distroseries is not None:
params['distroseries'] = distroseries._lpobject
cls._source_sets[key] = [
Packageset(packageset) for packageset
in cls._lp_packagesets.setsIncludingSource(**params)
]
cls._source_sets[key] = [Packageset(packageset) for packageset in
cls._lp_packagesets.setsIncludingSource(**params)]
return cls._source_sets[key]

View File

@ -2,22 +2,27 @@ class PackageNotFoundException(BaseException):
""" Thrown when a package is not found """
pass
class SeriesNotFoundException(BaseException):
""" Thrown when a distroseries is not found """
pass
class PocketDoesNotExistError(Exception):
'''Raised when a invalid pocket is used.'''
pass
class ArchiveNotFoundException(BaseException):
""" Thrown when an archive for a distibution is not found """
pass
class AlreadyLoggedInError(Exception):
'''Raised when a second login is attempted.'''
pass
class ArchSeriesNotFoundException(BaseException):
"""Thrown when a distroarchseries is not found."""
pass

View File

@ -35,6 +35,8 @@ from ubuntutools.lp.udtexceptions import PocketDoesNotExistError
from ubuntutools.subprocess import Popen, PIPE
_system_distribution_chain = []
def system_distribution_chain():
""" system_distribution_chain() -> [string]
@ -51,8 +53,7 @@ def system_distribution_chain():
stdout=PIPE)
_system_distribution_chain.append(p.communicate()[0].strip())
except OSError:
print ('Error: Could not determine what distribution you are '
'running.')
print('Error: Could not determine what distribution you are running.')
return []
while True:
@ -74,6 +75,7 @@ def system_distribution_chain():
return _system_distribution_chain
def system_distribution():
""" system_distro() -> string
@ -83,6 +85,7 @@ def system_distribution():
"""
return system_distribution_chain()[0]
def host_architecture():
""" host_architecture -> string
@ -90,16 +93,16 @@ def host_architecture():
architecture can't be determined, print an error message and return None.
"""
arch = Popen(['dpkg', '--print-architecture'], stdout=PIPE, \
arch = Popen(['dpkg', '--print-architecture'], stdout=PIPE,
stderr=PIPE).communicate()[0].split()
if not arch or 'not found' in arch[0]:
print('Error: Not running on a Debian based system; could not ' \
'detect its architecture.')
print('Error: Not running on a Debian based system; could not detect its architecture.')
return None
return arch[0]
def readlist(filename, uniq=True):
""" readlist(filename, uniq) -> list
@ -124,6 +127,7 @@ def readlist(filename, uniq=True):
return items
def split_release_pocket(release, default='Release'):
'''Splits the release and pocket name.
@ -141,23 +145,23 @@ def split_release_pocket(release, default='Release'):
(release, pocket) = release.rsplit('-', 1)
pocket = pocket.capitalize()
if pocket not in ('Release', 'Security', 'Updates', 'Proposed',
'Backports'):
raise PocketDoesNotExistError("Pocket '%s' does not exist." % \
pocket)
if pocket not in ('Release', 'Security', 'Updates', 'Proposed', 'Backports'):
raise PocketDoesNotExistError("Pocket '%s' does not exist." % pocket)
return (release, pocket)
def require_utf8():
'''Can be called by programs that only function in UTF-8 locales'''
if locale.getpreferredencoding() != 'UTF-8':
print(("This program only functions in a UTF-8 locale. "
"Aborting."), file=sys.stderr)
print("This program only functions in a UTF-8 locale. Aborting.", file=sys.stderr)
sys.exit(1)
_vendor_to_distroinfo = {"Debian": distro_info.DebianDistroInfo,
"Ubuntu": distro_info.UbuntuDistroInfo}
def vendor_to_distroinfo(vendor):
""" vendor_to_distroinfo(string) -> DistroInfo class
@ -167,6 +171,7 @@ def vendor_to_distroinfo(vendor):
"""
return _vendor_to_distroinfo.get(vendor)
def codename_to_distribution(codename):
""" codename_to_distribution(string) -> string

View File

@ -22,12 +22,12 @@ import tempfile
import os
import re
import sys
if sys.version_info[0] < 3:
input = raw_input
import ubuntutools.subprocess
if sys.version_info[0] < 3:
input = raw_input # noqa
class Question(object):
def __init__(self, options, show_help=True):
@ -73,8 +73,7 @@ class Question(object):
if selected == option[0]:
selected = option
if selected not in self.options:
print("Please answer the question with " + \
self.get_options() + ".")
print("Please answer the question with " + self.get_options() + ".")
return selected
@ -101,8 +100,7 @@ def input_number(question, min_number, max_number, default=None):
try:
selected = int(selected)
if selected < min_number or selected > max_number:
print("Please input a number between %i and %i." % \
(min_number, max_number))
print("Please input a number between %i and %i." % (min_number, max_number))
except ValueError:
print("Please input a number.")
assert type(selected) == int

View File

@ -82,6 +82,7 @@ team.''')
return need_sponsor
def check_existing_reports(srcpkg):
'''
Check existing bug reports on Launchpad for a possible sync request.
@ -108,6 +109,7 @@ def check_existing_reports(srcpkg):
% (bug.title, bug.web_link))
confirmation_prompt()
def get_ubuntu_delta_changelog(srcpkg):
'''
Download the Ubuntu changelog and extract the entries since the last sync
@ -153,6 +155,7 @@ def get_ubuntu_delta_changelog(srcpkg):
return '\n'.join(delta)
def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
'''
Use the LP API to file the sync request.
@ -181,7 +184,7 @@ def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
task.status = status
task.lp_save()
bug.subscribe(person = PersonTeam(subscribe)())
bug.subscribe(person=PersonTeam(subscribe)())
print('Sync request filed as bug #%i: %s'
% (bug.id, bug.web_link))

View File

@ -29,10 +29,6 @@ import smtplib
import socket
import tempfile
if sys.version_info[0] >= 3:
basestring = str
unicode = str
from debian.changelog import Changelog, Version
from distro_info import DebianDistroInfo, DistroDataOutdated
@ -42,6 +38,11 @@ from ubuntutools.logger import Logger
from ubuntutools.question import confirmation_prompt, YesNoQuestion
from ubuntutools import subprocess
if sys.version_info[0] >= 3:
basestring = str
unicode = str
__all__ = [
'get_debian_srcpkg',
'get_ubuntu_srcpkg',
@ -51,6 +52,7 @@ __all__ = [
'mail_bug',
]
def _get_srcpkg(distro, name, release):
if distro == 'debian':
# Canonicalise release:
@ -64,33 +66,33 @@ def _get_srcpkg(distro, name, release):
if not lines:
lines = list(rmadison(distro, name, suite=release, arch='source'))
if not lines:
raise PackageNotFoundException("'%s' doesn't appear to exist "
"in %s '%s'"
% (name, distro.capitalize(),
release))
raise PackageNotFoundException("'%s' doesn't appear to exist in %s '%s'" %
(name, distro.capitalize(), release))
pkg = max(lines, key=lambda x: Version(x['version']))
return FakeSPPH(pkg['source'], pkg['version'], pkg['component'], distro)
def get_debian_srcpkg(name, release):
return _get_srcpkg('debian', name, release)
def get_ubuntu_srcpkg(name, release):
return _get_srcpkg('ubuntu', name, release)
def need_sponsorship(name, component, release):
'''
Ask the user if he has upload permissions for the package or the
component.
'''
val = YesNoQuestion().ask("Do you have upload permissions for the "
"'%s' component or the package '%s' in "
"Ubuntu %s?\n"
"If in doubt answer 'n'."
% (component, name, release), 'no')
val = YesNoQuestion().ask("Do you have upload permissions for the '%s' component or "
"the package '%s' in Ubuntu %s?\nIf in doubt answer 'n'." %
(component, name, release), 'no')
return val == 'no'
def check_existing_reports(srcpkg):
'''
Point the user to the URL to manually check for duplicate bug reports.
@ -100,6 +102,7 @@ def check_existing_reports(srcpkg):
'for duplicate sync requests before continuing.' % srcpkg)
confirmation_prompt()
def get_ubuntu_delta_changelog(srcpkg):
'''
Download the Ubuntu changelog and extract the entries since the last sync
@ -119,6 +122,7 @@ def get_ubuntu_delta_changelog(srcpkg):
return u'\n'.join(delta)
def mail_bug(srcpkg, subscribe, status, bugtitle, bugtext, bug_mail_domain,
keyid, myemailaddr, mailserver_host, mailserver_port,
mailserver_user, mailserver_pass):
@ -177,9 +181,11 @@ Content-Type: text/plain; charset=UTF-8
confirmation_prompt()
# save mail in temporary file
backup = tempfile.NamedTemporaryFile(mode='w', delete=False,
prefix='requestsync-' + re.sub(r'[^a-zA-Z0-9_-]', '',
bugtitle.replace(' ', '_')))
backup = tempfile.NamedTemporaryFile(
mode='w',
delete=False,
prefix='requestsync-' + re.sub(r'[^a-zA-Z0-9_-]', '', bugtitle.replace(' ', '_'))
)
with backup:
backup.write(mail)
@ -201,8 +207,8 @@ Content-Type: text/plain; charset=UTF-8
Logger.error('Could not connect to %s:%s: %s (%i)',
mailserver_host, mailserver_port, s[1], s[0])
if s.smtp_code == 421:
confirmation_prompt(message='This is a temporary error, press '
'[Enter] to retry. Press [Ctrl-C] to abort now.')
confirmation_prompt(message='This is a temporary error, press [Enter] '
'to retry. Press [Ctrl-C] to abort now.')
if mailserver_user and mailserver_pass:
try:
@ -228,8 +234,8 @@ Content-Type: text/plain; charset=UTF-8
smtp_code, smtp_message = smtperror.recipients[to]
Logger.error('Error while sending: %i, %s', smtp_code, smtp_message)
if smtp_code == 450:
confirmation_prompt(message='This is a temporary error, press '
'[Enter] to retry. Press [Ctrl-C] to abort now.')
confirmation_prompt(message='This is a temporary error, press [Enter] '
'to retry. Press [Ctrl-C] to abort now.')
else:
return
except smtplib.SMTPResponseException as e:

View File

@ -29,6 +29,7 @@ import httplib2
from ubuntutools.logger import Logger
def is_sync(bug):
"""Checks if a Launchpad bug is a sync request.

View File

@ -23,6 +23,7 @@ from ubuntutools.logger import Logger
from ubuntutools.sponsor_patch.question import ask_for_manual_fixing
from functools import reduce
class Patch(object):
"""This object represents a patch that can be downloaded from Launchpad."""
@ -31,7 +32,7 @@ class Patch(object):
self._patch_file = re.sub(" |/", "_", patch.title)
if not reduce(lambda r, x: r or self._patch.title.endswith(x),
(".debdiff", ".diff", ".patch"), False):
Logger.info("Patch %s does not have a proper file extension." % \
Logger.info("Patch %s does not have a proper file extension." %
(self._patch.title))
self._patch_file += ".patch"
self._full_path = os.path.realpath(self._patch_file)
@ -39,8 +40,7 @@ class Patch(object):
def apply(self, task):
"""Applies the patch in the current directory."""
assert self._changed_files is not None, \
"You forgot to download the patch."
assert self._changed_files is not None, "You forgot to download the patch."
edit = False
if self.is_debdiff():
cmd = ["patch", "--merge", "--force", "-p",
@ -77,8 +77,7 @@ class Patch(object):
def get_strip_level(self):
"""Returns the stript level for the patch."""
assert self._changed_files is not None, \
"You forgot to download the patch."
assert self._changed_files is not None, "You forgot to download the patch."
strip_level = None
if self.is_debdiff():
changelog = [f for f in self._changed_files
@ -88,7 +87,6 @@ class Patch(object):
def is_debdiff(self):
"""Checks if the patch is a debdiff (= modifies debian/changelog)."""
assert self._changed_files is not None, \
"You forgot to download the patch."
assert self._changed_files is not None, "You forgot to download the patch."
return len([f for f in self._changed_files
if f.endswith("debian/changelog")]) > 0

View File

@ -21,6 +21,7 @@ import sys
from ubuntutools.question import Question, YesNoQuestion
def ask_for_ignoring_or_fixing():
"""Ask the user to resolve an issue manually or ignore it.
@ -34,6 +35,7 @@ def ask_for_ignoring_or_fixing():
user_abort()
return answer == "ignore"
def ask_for_manual_fixing():
"""Ask the user to resolve an issue manually."""
@ -42,6 +44,7 @@ def ask_for_manual_fixing():
if answer == "no":
user_abort()
def user_abort():
"""Print abort and quit the program."""

View File

@ -32,16 +32,16 @@ from ubuntutools.sponsor_patch.question import (ask_for_ignoring_or_fixing,
ask_for_manual_fixing,
user_abort)
def _get_series(launchpad):
"""Returns a tuple with the development and list of supported series."""
#pylint: disable=E1101
ubuntu = launchpad.distributions['ubuntu']
#pylint: enable=E1101
devel_series = ubuntu.current_series.name
supported_series = [series.name for series in ubuntu.series
if series.active and series.name != devel_series]
return (devel_series, supported_series)
def strip_epoch(version):
"""Removes the epoch from a Debian version string.
@ -55,6 +55,7 @@ def strip_epoch(version):
version_without_epoch = ':'.join(parts)
return version_without_epoch
class SourcePackage(object):
"""This class represents a source package."""
@ -134,8 +135,7 @@ class SourcePackage(object):
else:
target = upload
question = Question(["yes", "edit", "no"])
answer = question.ask("Do you want to upload the package to %s" % \
target, "no")
answer = question.ask("Do you want to upload the package to %s" % target, "no")
if answer == "edit":
return False
elif answer == "no":
@ -143,7 +143,7 @@ class SourcePackage(object):
cmd = ["dput", "--force", upload, self._changes_file]
Logger.command(cmd)
if subprocess.call(cmd) != 0:
Logger.error("Upload of %s to %s failed." % \
Logger.error("Upload of %s to %s failed." %
(os.path.basename(self._changes_file), upload))
sys.exit(1)
@ -175,8 +175,8 @@ class SourcePackage(object):
if dist is None:
dist = re.sub("-.*$", "", self._changelog.distributions)
build_name = self._package + "_" + strip_epoch(self._version) + \
"_" + self._builder.get_architecture() + ".build"
build_name = "{}_{}_{}.build".format(self._package, strip_epoch(self._version),
self._builder.get_architecture())
self._build_log = os.path.join(self._buildresult, build_name)
successful_built = False
@ -195,8 +195,7 @@ class SourcePackage(object):
self._buildresult)
if result != 0:
question = Question(["yes", "update", "retry", "no"])
answer = question.ask("Do you want to resolve this issue "
"manually", "yes")
answer = question.ask("Do you want to resolve this issue manually", "yes")
if answer == "yes":
break
elif answer == "update":
@ -237,7 +236,7 @@ class SourcePackage(object):
cmd.append("-sd")
else:
cmd.append("-sa")
if not keyid is None:
if keyid is not None:
cmd += ["-k" + keyid]
env = os.environ
if upload == 'ubuntu':
@ -253,9 +252,8 @@ class SourcePackage(object):
@property
def _changes_file(self):
"""Returns the file name of the .changes file."""
return os.path.join(self._workdir, self._package + "_" +
strip_epoch(self._version) +
"_source.changes")
return os.path.join(self._workdir, "{}_{}_source.changes"
.format(self._package, strip_epoch(self._version)))
def check_target(self, upload, launchpad):
"""Make sure that the target is correct.
@ -271,16 +269,14 @@ class SourcePackage(object):
[s + "-proposed" for s in supported_series] + \
[devel_series]
if self._changelog.distributions not in allowed:
Logger.error(("%s is not an allowed series. It needs to be one "
"of %s.") % (self._changelog.distributions,
", ".join(allowed)))
Logger.error("%s is not an allowed series. It needs to be one of %s." %
(self._changelog.distributions, ", ".join(allowed)))
return ask_for_ignoring_or_fixing()
elif upload and upload.startswith("ppa/"):
allowed = supported_series + [devel_series]
if self._changelog.distributions not in allowed:
Logger.error(("%s is not an allowed series. It needs to be one "
"of %s.") % (self._changelog.distributions,
", ".join(allowed)))
Logger.error("%s is not an allowed series. It needs to be one of %s." %
(self._changelog.distributions, ", ".join(allowed)))
return ask_for_ignoring_or_fixing()
return True
@ -303,24 +299,22 @@ class SourcePackage(object):
if not task.title_contains(self._version):
print("Bug #%i title: %s" % (bug_number, task.get_bug_title()))
msg = "Is %s %s the version that should be synced" % (self._package,
self._version)
answer = YesNoQuestion().ask(msg, "no")
msg = "Is %s %s the version that should be synced" % (self._package, self._version)
answer = YesNoQuestion().ask(msg, "no")
if answer == "no":
user_abort()
@property
def _debdiff_filename(self):
"""Returns the file name of the .debdiff file."""
debdiff_name = self._package + "_" + strip_epoch(self._version) + \
".debdiff"
debdiff_name = "{}_{}.debdiff".format(self._package, strip_epoch(self._version))
return os.path.join(self._workdir, debdiff_name)
@property
def _dsc_file(self):
"""Returns the file name of the .dsc file."""
return os.path.join(self._workdir, self._package + "_" +
strip_epoch(self._version) + ".dsc")
return os.path.join(self._workdir, "{}_{}.dsc".format(self._package,
strip_epoch(self._version)))
def generate_debdiff(self, dsc_file):
"""Generates a debdiff between the given .dsc file and this source
@ -348,8 +342,7 @@ class SourcePackage(object):
change something.
"""
assert os.path.isfile(self._changes_file), "%s does not exist." % \
(self._changes_file)
assert os.path.isfile(self._changes_file), "%s does not exist." % (self._changes_file)
changes = debian.deb822.Changes(open(self._changes_file))
fixed_bugs = []
if "Launchpad-Bugs-Fixed" in changes:
@ -362,8 +355,7 @@ class SourcePackage(object):
lp_bug = lp_bug.duplicate_of
if lp_bug.id not in fixed_bugs:
Logger.error("Launchpad bug #%i is not closed by new version." % \
(lp_bug.id))
Logger.error("Launchpad bug #%i is not closed by new version." % (lp_bug.id))
return ask_for_ignoring_or_fixing()
return True
@ -378,7 +370,6 @@ class SourcePackage(object):
if self._build_log:
print("file://" + self._build_log)
def reload_changelog(self):
"""Reloads debian/changelog and updates the version.
@ -400,7 +391,7 @@ class SourcePackage(object):
try:
self._version = self._changelog.get_version()
except IndexError:
Logger.error("Debian package version could not be determined. " \
Logger.error("Debian package version could not be determined. "
"debian/changelog is probably malformed.")
ask_for_manual_fixing()
return False

View File

@ -75,6 +75,7 @@ def get_source_package_name(bug_task):
package = bug_task.bug_target_name.split(" ")[0]
return package
def get_user_shell():
try:
shell = os.environ["SHELL"]
@ -82,6 +83,7 @@ def get_user_shell():
shell = pwd.getpwuid(os.getuid())[6]
return shell
def edit_source():
# Spawn shell to allow modifications
cmd = [get_user_shell()]
@ -96,6 +98,7 @@ process, exit the shell such that it returns an exit code other than zero.
Logger.error("Shell exited with exit value %i." % (returncode))
sys.exit(1)
def ask_for_patch_or_branch(bug, attached_patches, linked_branches):
patch = None
branch = None
@ -130,6 +133,7 @@ def ask_for_patch_or_branch(bug, attached_patches, linked_branches):
patch = Patch(attached_patches[selected - len(linked_branches) - 1])
return (patch, branch)
def get_patch_or_branch(bug):
patch = None
branch = None
@ -155,6 +159,7 @@ def get_patch_or_branch(bug):
linked_branches)
return (patch, branch)
def download_branch(branch):
dir_name = os.path.basename(branch)
if os.path.isdir(dir_name):
@ -166,6 +171,7 @@ def download_branch(branch):
sys.exit(1)
return dir_name
def merge_branch(branch):
edit = False
cmd = ["bzr", "merge", branch]
@ -176,6 +182,7 @@ def merge_branch(branch):
edit = True
return edit
def extract_source(dsc_file, verbose=False):
cmd = ["dpkg-source", "--no-preparation", "-x", dsc_file]
if not verbose:
@ -185,6 +192,7 @@ def extract_source(dsc_file, verbose=False):
Logger.error("Extraction of %s failed." % (os.path.basename(dsc_file)))
sys.exit(1)
def get_open_ubuntu_bug_task(launchpad, bug, branch=None):
"""Returns an open Ubuntu bug task for a given Launchpad bug.
@ -207,12 +215,11 @@ def get_open_ubuntu_bug_task(launchpad, bug, branch=None):
elif len(ubuntu_tasks) == 1:
task = ubuntu_tasks[0]
if len(ubuntu_tasks) > 1 and branch and branch[1] == 'ubuntu':
tasks = [task for task in ubuntu_tasks
if task.get_series() == branch[2]
and task.package == branch[3]]
tasks = [t for t in ubuntu_tasks if
t.get_series() == branch[2] and t.package == branch[3]]
if len(tasks) > 1:
# A bug targeted to the development series?
tasks = [task for task in tasks if task.series is not None]
tasks = [t for t in tasks if t.series is not None]
assert len(tasks) == 1
task = tasks[0]
elif len(ubuntu_tasks) > 1:
@ -223,8 +230,8 @@ def get_open_ubuntu_bug_task(launchpad, bug, branch=None):
if len(open_ubuntu_tasks) == 1:
task = open_ubuntu_tasks[0]
else:
Logger.normal("https://launchpad.net/bugs/%i has %i Ubuntu tasks:" \
% (bug_id, len(ubuntu_tasks)))
Logger.normal("https://launchpad.net/bugs/%i has %i Ubuntu tasks:" %
(bug_id, len(ubuntu_tasks)))
for i in range(len(ubuntu_tasks)):
print("%i) %s" % (i + 1,
ubuntu_tasks[i].get_package_and_series()))
@ -234,6 +241,7 @@ def get_open_ubuntu_bug_task(launchpad, bug, branch=None):
Logger.info("Selected Ubuntu task: %s" % (task.get_short_info()))
return task
def _create_and_change_into(workdir):
"""Create (if it does not exits) and change into given working directory."""
@ -241,13 +249,14 @@ def _create_and_change_into(workdir):
try:
os.makedirs(workdir)
except os.error as error:
Logger.error("Failed to create the working directory %s [Errno " \
"%i]: %s." % (workdir, error.errno, error.strerror))
Logger.error("Failed to create the working directory %s [Errno %i]: %s." %
(workdir, error.errno, error.strerror))
sys.exit(1)
if workdir != os.getcwd():
Logger.command(["cd", workdir])
os.chdir(workdir)
def _update_maintainer_field():
"""Update the Maintainer field in debian/control."""
Logger.command(["update-maintainer"])
@ -257,6 +266,7 @@ def _update_maintainer_field():
Logger.error("update-maintainer failed: %s", str(e))
sys.exit(1)
def _update_timestamp():
"""Run dch to update the timestamp of debian/changelog."""
cmd = ["dch", "--maintmaint", "--release", ""]
@ -264,6 +274,7 @@ def _update_timestamp():
if subprocess.call(cmd) != 0:
Logger.info("Failed to update timestamp in debian/changelog.")
def _download_and_change_into(task, dsc_file, patch, branch):
"""Downloads the patch and branch and changes into the source directory."""
@ -290,15 +301,14 @@ def _download_and_change_into(task, dsc_file, patch, branch):
Logger.command(["cd", directory])
os.chdir(directory)
def sponsor_patch(bug_number, build, builder, edit, keyid, lpinstance, update,
upload, workdir):
workdir = os.path.realpath(os.path.expanduser(workdir))
_create_and_change_into(workdir)
launchpad = Launchpad.login_with("sponsor-patch", lpinstance)
#pylint: disable=E1101
bug = launchpad.bugs[bug_number]
#pylint: enable=E1101
(patch, branch) = get_patch_or_branch(bug)
task = get_open_ubuntu_bug_task(launchpad, bug, branch)

View File

@ -26,10 +26,10 @@ class Popen(subprocess.Popen):
def __init__(self, *args, **kwargs):
kwargs.setdefault('close_fds', True)
if ('restore_signals' not in
inspect.getargspec(subprocess.Popen.__init__)[0]):
if 'restore_signals' not in inspect.getargspec(subprocess.Popen.__init__)[0]:
given_preexec_fn = kwargs.pop('preexec_fn', None)
restore_signals = kwargs.pop('restore_signals', True)
def preexec_fn():
if restore_signals:
for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'):

View File

@ -22,6 +22,7 @@ if sys.version_info < (2, 7):
else:
import unittest
def discover():
# import __main__ triggers code re-execution
__main__ = sys.modules['__main__']

View File

@ -20,6 +20,7 @@ import subprocess
import debian.debian_support
class ExamplePackage(object):
def __init__(self, source='example', version='1.0-1'):
self.source = source

View File

@ -15,33 +15,23 @@
# PERFORMANCE OF THIS SOFTWARE.
try:
import builtins
except ImportError:
import __builtin__
import os.path
import shutil
try:
from StringIO import StringIO
except:
from io import StringIO
from io import BytesIO
import sys
import tempfile
import types
from io import BytesIO
try:
from urllib.request import OpenerDirector, urlopen
from urllib.error import HTTPError, URLError
except ImportError:
from urllib2 import OpenerDirector, urlopen
from urllib2 import HTTPError, URLError
import debian.deb822
import httplib2
import sys
import mock
import debian.deb822
import ubuntutools.archive
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
from ubuntutools.test import unittest
from ubuntutools.test.example_package import ExamplePackage
@ -117,7 +107,7 @@ class LocalSourcePackageTestCase(unittest.TestCase):
# Silence the tests a little:
self._stubout('ubuntutools.logger.Logger.stdout')
self._stubout('ubuntutools.logger.Logger.stderr')
self._stubout('ubuntutools.logger.Logger.stderr')
def _stubout(self, stub):
patcher = mock.patch(stub)
@ -211,14 +201,10 @@ class LocalSourcePackageTestCase(unittest.TestCase):
pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
pkg.quiet = True
pkg.quiet = True
pkg.pull()
def test_pull(self):
dist = self.SourcePackage.distribution
mirror = UDTConfig.defaults['%s_MIRROR' % dist.upper()]
urlbase = '/pool/main/e/example/'
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
@ -227,12 +213,10 @@ class LocalSourcePackageTestCase(unittest.TestCase):
pkg.pull()
def test_mirrors(self):
master = UDTConfig.defaults['UBUNTU_MIRROR']
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/'
urlbase = '/pool/main/e/example/'
sequence = [self.urlopen_null, self.urlopen_404, self.urlopen_proxy,
self.urlopen_proxy]
def _callable_iter(*args, **kwargs):
return sequence.pop(0)(*args, **kwargs)
url_opener = mock.MagicMock(spec=OpenerDirector)
@ -241,14 +225,14 @@ class LocalSourcePackageTestCase(unittest.TestCase):
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg.url_opener = url_opener
pkg.quiet = True
pkg.quiet = True
pkg.pull()
def test_dsc_missing(self):
self.mock_http.side_effect = self.request_404
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
pkg.quiet = True
pkg.quiet = True
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
@ -256,12 +240,8 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
SourcePackage = ubuntutools.archive.DebianSourcePackage
def test_mirrors(self):
debian_master = UDTConfig.defaults['DEBIAN_MIRROR']
debsec_master = UDTConfig.defaults['DEBSEC_MIRROR']
debian_mirror = 'http://mirror/debian'
debsec_mirror = 'http://mirror/debsec'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
sequence = [self.urlopen_null,
self.urlopen_404,
@ -272,6 +252,7 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
b'{"fileinfo": {"hashabc": [{"name": "example_1.0.orig.tar.gz"}]}}'),
self.urlopen_file('example_1.0.orig.tar.gz'),
self.urlopen_proxy]
def _callable_iter(*args, **kwargs):
return sequence.pop(0)(*args, **kwargs)
url_opener = mock.MagicMock(spec=OpenerDirector)
@ -287,8 +268,6 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
def test_dsc_missing(self):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
self.mock_http.side_effect = self.request_404_then_proxy
patcher = mock.patch.object(debian.deb822.GpgInfo, 'from_sequence')
@ -305,16 +284,14 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
def test_dsc_badsig(self):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
self.mock_http.side_effect = self.request_404_then_proxy
self.mock_http.side_effect = self.request_404_then_proxy
patcher = mock.patch.object(debian.deb822.GpgInfo, 'from_sequence')
self.addCleanup(patcher.stop)
mock_gpg_info = patcher.start()
mock_gpg_info.return_value = debian.deb822.GpgInfo.from_output(
'[GNUPG:] ERRSIG DEADBEEF')
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
try:

View File

@ -15,10 +15,6 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
try:
import builtins
except ImportError:
import __builtin__
import os
import sys
import locale
@ -33,6 +29,7 @@ from ubuntutools.config import UDTConfig, ubu_email
from ubuntutools.logger import Logger
from ubuntutools.test import unittest
class ConfigTestCase(unittest.TestCase):
_config_files = {
'system': '',
@ -63,7 +60,7 @@ class ConfigTestCase(unittest.TestCase):
patcher = mock.patch(target, m)
self.addCleanup(patcher.stop)
patcher.start()
Logger.stdout = StringIO()
Logger.stderr = StringIO()
@ -183,22 +180,22 @@ class UbuEmailTestCase(unittest.TestCase):
del os.environ[k]
def test_pristine(self):
os.environ['DEBFULLNAME'] = name = 'Joe Developer'
os.environ['DEBEMAIL'] = email = 'joe@example.net'
os.environ['DEBFULLNAME'] = name = 'Joe Developer'
os.environ['DEBEMAIL'] = email = 'joe@example.net'
self.assertEqual(ubu_email(), (name, email))
def test_two_hat(self):
os.environ['DEBFULLNAME'] = name = 'Joe Developer'
os.environ['DEBEMAIL'] = 'joe@debian.org'
os.environ['UBUMAIL'] = email = 'joe@ubuntu.com'
os.environ['DEBFULLNAME'] = name = 'Joe Developer'
os.environ['DEBEMAIL'] = 'joe@debian.org'
os.environ['UBUMAIL'] = email = 'joe@ubuntu.com'
self.assertEqual(ubu_email(), (name, email))
self.assertEqual(os.environ['DEBFULLNAME'], name)
self.assertEqual(os.environ['DEBEMAIL'], email)
def test_two_hat_cmdlineoverride(self):
os.environ['DEBFULLNAME'] = 'Joe Developer'
os.environ['DEBEMAIL'] = 'joe@debian.org'
os.environ['UBUMAIL'] = 'joe@ubuntu.com'
os.environ['DEBEMAIL'] = 'joe@debian.org'
os.environ['UBUMAIL'] = 'joe@ubuntu.com'
name = 'Foo Bar'
email = 'joe@example.net'
self.assertEqual(ubu_email(name, email), (name, email))
@ -206,16 +203,16 @@ class UbuEmailTestCase(unittest.TestCase):
self.assertEqual(os.environ['DEBEMAIL'], email)
def test_two_hat_noexport(self):
os.environ['DEBFULLNAME'] = name = 'Joe Developer'
os.environ['DEBEMAIL'] = demail = 'joe@debian.org'
os.environ['UBUMAIL'] = uemail = 'joe@ubuntu.com'
os.environ['DEBFULLNAME'] = name = 'Joe Developer'
os.environ['DEBEMAIL'] = demail = 'joe@debian.org'
os.environ['UBUMAIL'] = uemail = 'joe@ubuntu.com'
self.assertEqual(ubu_email(export=False), (name, uemail))
self.assertEqual(os.environ['DEBFULLNAME'], name)
self.assertEqual(os.environ['DEBEMAIL'], demail)
def test_two_hat_with_name(self):
os.environ['DEBFULLNAME'] = 'Joe Developer'
os.environ['DEBEMAIL'] = 'joe@debian.org'
os.environ['DEBEMAIL'] = 'joe@debian.org'
name = 'Joe Ubuntunista'
email = 'joe@ubuntu.com'
os.environ['UBUMAIL'] = '%s <%s>' % (name, email)
@ -242,6 +239,7 @@ class UbuEmailTestCase(unittest.TestCase):
try:
os.environ['DEBFULLNAME'] = env_name
except UnicodeEncodeError:
raise unittest.SkipTest("python interpreter is not running in an unicode capable locale")
os.environ['DEBEMAIL'] = email = 'joe@example.net'
raise unittest.SkipTest("python interpreter is not running in an "
"unicode capable locale")
os.environ['DEBEMAIL'] = email = 'joe@example.net'
self.assertEqual(ubu_email(), (name, email))

View File

@ -26,6 +26,7 @@ from ubuntutools.test import unittest
TIMEOUT = 5
def load_tests(loader, tests, pattern):
"Give HelpTestCase a chance to populate before loading its test cases"
suite = unittest.TestSuite()
@ -33,6 +34,7 @@ def load_tests(loader, tests, pattern):
suite.addTests(loader.loadTestsFromTestCase(HelpTestCase))
return suite
class HelpTestCase(unittest.TestCase):
@classmethod
def populate(cls):

View File

@ -25,7 +25,7 @@ from ubuntutools import subprocess
class PylintTestCase(unittest.TestCase):
def test_pylint(self):
"Test: Run pylint on Python source code"
files = ['ubuntutools']
files = ['ubuntutools', 'setup.py']
for script in setup.scripts:
with open(script, 'r') as script_file:
shebang = script_file.readline()

View File

@ -16,13 +16,9 @@
"""Test suite for ubuntutools.update_maintainer"""
try:
import builtins
except ImportError:
import __builtin__
try:
from StringIO import StringIO
except:
except ImportError:
from io import StringIO
import os
@ -34,7 +30,8 @@ from ubuntutools.logger import Logger
from ubuntutools.test import unittest
from ubuntutools.update_maintainer import update_maintainer
_LUCID_CHANGELOG = """axis2c (1.6.0-0ubuntu8) lucid; urgency=low
_LUCID_CHANGELOG = """\
axis2c (1.6.0-0ubuntu8) lucid; urgency=low
* rebuild rest of main for armel armv7/thumb2 optimization;
UbuntuSpec:mobile-lucid-arm-gcc-v7-thumb2
@ -42,7 +39,8 @@ _LUCID_CHANGELOG = """axis2c (1.6.0-0ubuntu8) lucid; urgency=low
-- Alexander Sack <asac@ubuntu.com> Fri, 05 Mar 2010 03:10:28 +0100
"""
_AXIS2C_CONTROL = """Source: axis2c
_AXIS2C_CONTROL = """\
Source: axis2c
Section: libs
Priority: optional
DM-Upload-Allowed: yes
@ -56,7 +54,8 @@ Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
"""
_AXIS2C_UPDATED = """Source: axis2c
_AXIS2C_UPDATED = """\
Source: axis2c
Section: libs
Priority: optional
DM-Upload-Allowed: yes
@ -70,14 +69,16 @@ Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
"""
_UNSTABLE_CHANGELOG = """adblock-plus (1.3.3-1) unstable; urgency=low
_UNSTABLE_CHANGELOG = """\
adblock-plus (1.3.3-1) unstable; urgency=low
* New upstream release.
-- Benjamin Drung <bdrung@ubuntu.com> Sat, 25 Dec 2010 20:17:41 +0100
"""
_ABP_CONTROL = """Source: adblock-plus
_ABP_CONTROL = """\
Source: adblock-plus
Section: web
Priority: optional
Maintainer: Dmitry E. Oboukhov <unera@debian.org>
@ -93,7 +94,8 @@ VCS-Git: git://git.debian.org/pkg-mozext/adblock-plus.git
Package: xul-ext-adblock-plus
"""
_ABP_UPDATED = """Source: adblock-plus
_ABP_UPDATED = """\
Source: adblock-plus
Section: web
Priority: optional
Maintainer: Ubuntu Developers <ubuntu-devel-discuss@lists.ubuntu.com>
@ -110,7 +112,8 @@ VCS-Git: git://git.debian.org/pkg-mozext/adblock-plus.git
Package: xul-ext-adblock-plus
"""
_ABP_OLD_MAINTAINER = """Source: adblock-plus
_ABP_OLD_MAINTAINER = """\
Source: adblock-plus
Section: web
Priority: optional
Maintainer: Ubuntu MOTU Developers <ubuntu-motu@lists.ubuntu.com>
@ -127,37 +130,40 @@ VCS-Git: git://git.debian.org/pkg-mozext/adblock-plus.git
Package: xul-ext-adblock-plus
"""
_SIMPLE_RULES = """#!/usr/bin/make -f
_SIMPLE_RULES = """\
#!/usr/bin/make -f
%:
dh $@
\tdh $@
"""
_COMPLEX_RULES = """#!/usr/bin/make -f
_COMPLEX_RULES = """\
#!/usr/bin/make -f
... from python2.6 ...
distribution := $(shell lsb_release -is)
control-file:
sed -e "s/@PVER@/$(PVER)/g" \
-e "s/@VER@/$(VER)/g" \
-e "s/@PYSTDDEP@/$(PYSTDDEP)/g" \
-e "s/@PRIO@/$(PY_PRIO)/g" \
-e "s/@MINPRIO@/$(PY_MINPRIO)/g" \
debian/control.in > debian/control.tmp
\tsed -e "s/@PVER@/$(PVER)/g" \\
\t\t-e "s/@VER@/$(VER)/g" \\
\t\t-e "s/@PYSTDDEP@/$(PYSTDDEP)/g" \\
\t\t-e "s/@PRIO@/$(PY_PRIO)/g" \\
\t\t-e "s/@MINPRIO@/$(PY_MINPRIO)/g" \\
\t\tdebian/control.in > debian/control.tmp
ifeq ($(distribution),Ubuntu)
ifneq (,$(findstring ubuntu, $(PKGVERSION)))
m='Ubuntu Core Developers <ubuntu-devel-discuss@lists.ubuntu.com>'; \
sed -i "/^Maintainer:/s/\(.*\)/Maintainer: $$m\nXSBC-Original-\1/" \
debian/control.tmp
\tm='Ubuntu Core Developers <ubuntu-devel-discuss@lists.ubuntu.com>'; \\
\t\tsed -i "/^Maintainer:/s/\\(.*\\)/Maintainer: $$m\nXSBC-Original-\1/" \\
\t\tdebian/control.tmp
endif
endif
[ -e debian/control ] \
&& cmp -s debian/control debian/control.tmp \
&& rm -f debian/control.tmp && exit 0; \
mv debian/control.tmp debian/control
\t[ -e debian/control ] \\
\t\t&& cmp -s debian/control debian/control.tmp \\
\t\t&& rm -f debian/control.tmp && exit 0; \\
\t\tmv debian/control.tmp debian/control
... from python2.6 ...
"""
_SEAHORSE_PLUGINS_CONTROL = """# This file is autogenerated. DO NOT EDIT!
_SEAHORSE_PLUGINS_CONTROL = """\
# This file is autogenerated. DO NOT EDIT!
#
# Modifications should be made to debian/control.in instead.
# This file is regenerated automatically in the clean target.
@ -174,7 +180,8 @@ Homepage: http://live.gnome.org/Seahorse
Package: seahorse-plugins
"""
_SEAHORSE_PLUGINS_UPDATED = """# This file is autogenerated. DO NOT EDIT!
_SEAHORSE_PLUGINS_UPDATED = """\
# This file is autogenerated. DO NOT EDIT!
#
# Modifications should be made to debian/control.in instead.
# This file is regenerated automatically in the clean target.
@ -192,7 +199,8 @@ Homepage: http://live.gnome.org/Seahorse
Package: seahorse-plugins
"""
#pylint: disable=R0904
# pylint: disable=R0904
class UpdateMaintainerTestCase(unittest.TestCase):
"""TestCase object for ubuntutools.update_maintainer"""
@ -214,14 +222,14 @@ class UpdateMaintainerTestCase(unittest.TestCase):
"""Provide StringIO objects instead of real files."""
directory, base = os.path.split(filename)
if (directory != self._directory or base not in self._files or
(mode == "r" and self._files[base] is None)):
(mode == "r" and self._files[base] is None)):
raise IOError("No such file or directory: '%s'" % filename)
if mode == "w":
self._files[base] = StringIO()
self._files[base].close = lambda: None
return self._files[base]
#pylint: disable=C0103
# pylint: disable=C0103
def setUp(self):
if sys.version_info[0] < 3:
self.assertRegex = self.assertRegexpMatches
@ -252,7 +260,7 @@ class UpdateMaintainerTestCase(unittest.TestCase):
Logger.stdout = sys.stdout
Logger.stderr = sys.stderr
#pylint: enable=C0103
# pylint: enable=C0103
def test_debian_package(self):
"""Test: Don't update Maintainer field if target is Debian."""
self._files["changelog"] = StringIO(_UNSTABLE_CHANGELOG)
@ -272,9 +280,8 @@ class UpdateMaintainerTestCase(unittest.TestCase):
warnings = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO()
self.assertEqual(len(warnings.splitlines()), 1)
self.assertRegex(warnings, "Warning: Overwriting original "
"maintainer: Soren Hansen "
"<soren@ubuntu.com>")
self.assertRegex(warnings, "Warning: Overwriting original maintainer: "
"Soren Hansen <soren@ubuntu.com>")
def test_update_maintainer(self):
"""Test: Update Maintainer field."""

View File

@ -169,8 +169,7 @@ def update_maintainer(debian_directory, verbose=False):
if original_maintainer.strip().endswith("ubuntu.com>"):
if verbose:
print ("The Maintainer email is set to an ubuntu.com address. "
"Doing nothing.")
print("The Maintainer email is set to an ubuntu.com address. Doing nothing.")
continue
if distribution in ("stable", "testing", "unstable", "experimental"):

View File

@ -53,5 +53,6 @@ def main():
except MaintainerUpdateException:
sys.exit(1)
if __name__ == "__main__":
main()