Port ubuntutools module to python3.

This commit is contained in:
Dimitri John Ledkov 2014-12-19 22:37:04 +00:00
commit 646093d74a
25 changed files with 464 additions and 333 deletions

3
debian/changelog vendored
View File

@ -7,6 +7,9 @@ ubuntu-dev-tools (0.154) UNRELEASED; urgency=medium
* mk-sbuild: better message for cross build so that new start have
correct sbuild command from the last message of mk-sbuild.
[ Dimitri John Ledkov ]
* Port ubuntutools module to python3.
-- Logan Rosen <logan@ubuntu.com> Wed, 23 Apr 2014 17:24:12 -0400
ubuntu-dev-tools (0.153) unstable; urgency=medium

33
debian/control vendored
View File

@ -8,6 +8,7 @@ Vcs-Bzr: lp:ubuntu-dev-tools
Vcs-Browser: https://code.launchpad.net/~ubuntu-dev/ubuntu-dev-tools/trunk
Build-Depends: dctrl-tools,
debhelper (>= 9),
dh-python,
devscripts (>= 2.11.0~),
distro-info (>= 0.2~),
libwww-perl,
@ -19,11 +20,19 @@ Build-Depends: dctrl-tools,
python-distro-info (>= 0.4~),
python-httplib2,
python-launchpadlib (>= 1.5.7),
python-mox,
python-setuptools,
python-soappy,
python-unittest2
python-unittest2,
python3-all,
python3-apt,
python3-debian,
python3-distro-info,
python3-httplib2,
python3-launchpadlib,
python3-setuptools,
python3-soappy,
X-Python-Version: >= 2.6
X-Python3-Version: >= 3.2
Homepage: https://launchpad.net/ubuntu-dev-tools
Standards-Version: 3.9.5
@ -114,3 +123,23 @@ Description: useful tools for Ubuntu developers
- ubuntu-upload-permission - query / list the upload permissions for a
package.
- update-maintainer - script to update maintainer field in ubuntu packages.
Package: python-ubuntutools
Architecture: all
Depends: ${python:Depends}
Breaks: ubuntu-dev-tools (<< 0.154)
Replaces: ubuntu-dev-tools (<< 0.154)
Description: useful library of APIs for Ubuntu developer tools (Python 2)
This package ships a collection of APIs, helpers and wrappers used to
develop useful utiliteis for Ubuntu developers.
.
Python 2 variant.
Package: python3-ubuntutools
Architecture: all
Depends: ${python3:Depends}
Description: useful library of APIs for Ubuntu developer tools
This package ships a collection of APIs, helpers and wrappers used to
develop useful utiliteis for Ubuntu developers.
.
Python 3 variant.

19
debian/rules vendored
View File

@ -1,12 +1,13 @@
#!/usr/bin/make -f
%:
dh $@ --with python2
export PYBUILD_NAME=ubuntutools
ifeq (,$(filter nocheck,$(DEB_BUILD_OPTIONS)))
override_dh_auto_test:
set -e; \
for python in $(shell pyversions -r); do \
$$python setup.py test; \
done
endif
%:
dh $@ --with python2,python3 --buildsystem=pybuild
override_dh_install:
dh_install
mkdir -p debian/ubuntu-dev-tools/usr
mv debian/python-ubuntutools/etc debian/ubuntu-dev-tools
mv debian/python-ubuntutools/usr/bin debian/ubuntu-dev-tools/usr/
mv debian/python-ubuntutools/usr/share debian/ubuntu-dev-tools/usr/

View File

@ -4,6 +4,7 @@ from setuptools import setup
import glob
import os
import re
import sys
# look/set what version we have
changelog = "debian/changelog"
@ -13,7 +14,11 @@ if os.path.exists(changelog):
if match:
version = match.group(1)
scripts = ['404main',
if sys.version_info[0] >= 3:
scripts = []
data_files = []
else:
scripts = ['404main',
'backportpackage',
'bitesize',
'check-mir',
@ -46,6 +51,12 @@ scripts = ['404main',
'ubuntu-upload-permission',
'update-maintainer',
]
data_files = [
('/etc/bash_completion.d', glob.glob("bash_completion/*")),
('share/man/man1', glob.glob("doc/*.1")),
('share/man/man5', glob.glob("doc/*.5")),
('share/ubuntu-dev-tools', ['enforced-editing-wrapper']),
]
if __name__ == '__main__':
setup(name='ubuntu-dev-tools',
@ -57,11 +68,6 @@ if __name__ == '__main__':
'ubuntutools/sponsor_patch',
'ubuntutools/test',
],
data_files=[('/etc/bash_completion.d',
glob.glob("bash_completion/*")),
('share/man/man1', glob.glob("doc/*.1")),
('share/man/man5', glob.glob("doc/*.5")),
('share/ubuntu-dev-tools', ['enforced-editing-wrapper']),
],
data_files=data_files,
test_suite='ubuntutools.test.discover',
)

View File

@ -27,18 +27,28 @@ Approach:
3. Verify checksums.
"""
from __future__ import with_statement
from __future__ import with_statement, print_function
import hashlib
import os.path
import urllib2
import urlparse
try:
from urllib.request import ProxyHandler, build_opener, urlopen
from urllib.parse import urlparse
from urllib.error import URLError, HTTPError
except ImportError:
from urllib2 import ProxyHandler, build_opener, urlopen
from urlparse import urlparse
from urllib2 import URLError, HTTPError
import re
import sys
if sys.version_info[0] >= 3:
basestring = str
unicode = str
from debian.changelog import Changelog, Version
import debian.deb822
import debian.debian_support
import codecs
import httplib2
from ubuntutools.config import UDTConfig
@ -81,7 +91,7 @@ class Dsc(debian.deb822.Dsc):
f = open(pathname, 'rb')
while True:
buf = f.read(hash_func.block_size)
if buf == '':
if buf == b'':
break
hash_func.update(buf)
f.close()
@ -102,7 +112,7 @@ class Dsc(debian.deb822.Dsc):
their_checksums = \
dict((entry['name'], (int(entry['size']), entry[key]))
for entry in other[field])
for name, (size, checksum) in our_checksums.iteritems():
for name, (size, checksum) in our_checksums.items():
if name not in their_checksums:
# file only in one dsc
continue
@ -154,8 +164,8 @@ class SourcePackage(object):
self.version = debian.debian_support.Version(version)
# uses default proxies from the environment
proxy = urllib2.ProxyHandler()
self.url_opener = urllib2.build_opener(proxy)
proxy = ProxyHandler()
self.url_opener = build_opener(proxy)
@property
def lp_spph(self):
@ -231,10 +241,10 @@ class SourcePackage(object):
def pull_dsc(self):
"Retrieve dscfile and parse"
if self._dsc_source:
parsed = urlparse.urlparse(self._dsc_source)
parsed = urlparse(self._dsc_source)
if parsed.scheme == '':
self._dsc_source = 'file://' + os.path.abspath(self._dsc_source)
parsed = urlparse.urlparse(self._dsc_source)
parsed = urlparse(self._dsc_source)
url = self._dsc_source
else:
url = self._lp_url(self.dsc_name)
@ -244,14 +254,14 @@ class SourcePackage(object):
def _download_dsc(self, url):
"Download specified dscfile and parse"
parsed = urlparse.urlparse(url)
parsed = urlparse(url)
if parsed.scheme == 'file':
with open(parsed.path, 'r') as f:
body = f.read()
else:
try:
response, body = httplib2.Http().request(url)
except httplib2.HttpLib2Error, e:
except httplib2.HttpLib2Error as e:
raise DownloadError(e)
if response.status != 200:
raise DownloadError("%s: %s %s" % (url, response.status,
@ -299,7 +309,7 @@ class SourcePackage(object):
"Write dsc file to workdir"
if self._dsc is None:
self.pull_dsc()
with open(self.dsc_pathname, 'w') as f:
with open(self.dsc_pathname, 'wb') as f:
f.write(self.dsc.raw_text)
def _download_file(self, url, filename):
@ -312,17 +322,17 @@ class SourcePackage(object):
if entry['name'] == filename]
assert len(size) == 1
size = int(size[0])
parsed = urlparse.urlparse(url)
parsed = urlparse(url)
if not self.quiet:
Logger.normal('Downloading %s from %s (%0.3f MiB)',
filename, parsed.hostname, size / 1024.0 / 1024)
if parsed.scheme == 'file':
in_ = open(parsed.path, 'r')
in_ = open(parsed.path, 'rb')
else:
try:
in_ = self.url_opener.open(url)
except urllib2.URLError:
except URLError:
return False
downloaded = 0
@ -331,10 +341,10 @@ class SourcePackage(object):
with open(pathname, 'wb') as out:
while True:
block = in_.read(10240)
if block == '':
if block == b'':
break
downloaded += len(block)
out.write(block)
out.write(block)
if not self.quiet:
percent = downloaded * 100 // size
bar = '=' * int(round(downloaded * bar_width / size))
@ -360,9 +370,9 @@ class SourcePackage(object):
try:
if self._download_file(url, name):
break
except urllib2.HTTPError, e:
except HTTPError as e:
Logger.normal('HTTP Error %i: %s', e.code, str(e))
except urllib2.URLError, e:
except URLError as e:
Logger.normal('URL Error: %s', e.reason)
else:
raise DownloadError('File %s could not be found' % name)
@ -457,7 +467,7 @@ class DebianSourcePackage(SourcePackage):
wrapped_iterator = super(DebianSourcePackage, self)._source_urls(name)
while True:
try:
yield wrapped_iterator.next()
yield next(wrapped_iterator)
except StopIteration:
break
if self.snapshot_list:
@ -499,11 +509,14 @@ class DebianSourcePackage(SourcePackage):
"python-simplejson")
try:
srcfiles = json.load(self.url_opener.open(
data = self.url_opener.open(
'http://snapshot.debian.org'
'/mr/package/%s/%s/srcfiles?fileinfo=1'
% (self.source, self.version.full_version)))
except urllib2.HTTPError:
% (self.source, self.version.full_version))
reader = codecs.getreader('utf-8')
srcfiles = json.load(reader(data))
except HTTPError:
Logger.error('Version %s of %s not found on '
'snapshot.debian.org',
self.version.full_version, self.source)
@ -511,7 +524,7 @@ class DebianSourcePackage(SourcePackage):
return False
self._snapshot_list = dict((info[0]['name'], hash_)
for hash_, info
in srcfiles['fileinfo'].iteritems())
in srcfiles['fileinfo'].items())
return self._snapshot_list
def _snapshot_url(self, name):
@ -569,9 +582,9 @@ class FakeSPPH(object):
self.name + '_' + pkgversion,
'changelog' + extension)
try:
self._changelog = urllib2.urlopen(url).read()
except urllib2.HTTPError, error:
print >> sys.stderr, ('%s: %s' % (url, error))
self._changelog = urlopen(url).read()
except HTTPError as error:
print(('%s: %s' % (url, error)), file=sys.stderr)
return None
if since_version is None:

View File

@ -179,6 +179,6 @@ def ubu_email(name=None, email=None, export=True):
encoding = locale.getdefaultlocale()[1]
if not encoding:
encoding = 'utf-8'
if name:
if name and isinstance(name, bytes):
name = name.decode(encoding)
return name, email

View File

@ -14,7 +14,12 @@
import json
import os.path
import sys
import urllib2
try:
from urllib.request import urlopen
from urllib.error import URLError
except ImportError:
from urllib2 import urlopen
from urllib2 import URLError
from ubuntutools.logger import Logger
@ -32,11 +37,11 @@ class Harvest(object):
def _get_data(self):
try:
sock = urllib2.urlopen(self.data_url)
sock = urlopen(self.data_url)
except IOError:
try:
urllib2.urlopen(BASE_URL)
except urllib2.URLError:
urlopen(BASE_URL)
except URLError:
Logger.error("Harvest is down.")
sys.exit(1)
return None
@ -45,9 +50,7 @@ class Harvest(object):
return json.loads(response)
def opportunity_summary(self):
l = []
for key in filter(lambda a: a != "total", self.data.keys()):
l += ["%s (%s)" % (key, self.data[key])]
l = ["%s (%s)" % (k,v) for (k,v) in self.data.items() if k != "total"]
return ", ".join(l)
def report(self):

View File

@ -19,8 +19,11 @@
#
# Modules.
import urllib
import urlparse
try:
from urllib.parse import urlsplit, urlencode, urlunsplit
except ImportError:
from urllib import urlencode
from urlparse import urlsplit, urlunsplit
def query_to_dict(query_string):
result = dict()
@ -31,7 +34,7 @@ def query_to_dict(query_string):
return result
def translate_web_api(url, launchpad):
scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
scheme, netloc, path, query, fragment = urlsplit(url)
query = query_to_dict(query)
differences = set(netloc.split('.')).symmetric_difference(
@ -44,8 +47,8 @@ def translate_web_api(url, launchpad):
if "ws.op" in query:
raise ValueError("Invalid web url, url: %s" %url)
query["ws.op"] = "searchTasks"
scheme, netloc, api_path, _, _ = urlparse.urlsplit(str(launchpad._root_uri))
query = urllib.urlencode(query)
url = urlparse.urlunsplit((scheme, netloc, api_path + path.lstrip("/"),
scheme, netloc, api_path, _, _ = urlsplit(str(launchpad._root_uri))
query = urlencode(query)
url = urlunsplit((scheme, netloc, api_path + path.lstrip("/"),
query, fragment))
return url

View File

@ -21,12 +21,34 @@
#
# Based on code written by Jonathan Davies <jpds@ubuntu.com>
from __future__ import print_function
# Uncomment for tracing LP API calls
#import httplib2
#httplib2.debuglevel = 1
import sys
if sys.version_info[0] >= 3:
basestring = str
unicode = str
#Shameless steal from python-six
def add_metaclass(metaclass):
"""Class decorator for creating a class with a metaclass."""
def wrapper(cls):
orig_vars = cls.__dict__.copy()
slots = orig_vars.get('__slots__')
if slots is not None:
if isinstance(slots, str):
slots = [slots]
for slots_var in slots:
orig_vars.pop(slots_var)
orig_vars.pop('__dict__', None)
orig_vars.pop('__weakref__', None)
return metaclass(cls.__name__, cls.__bases__, orig_vars)
return wrapper
from debian.changelog import Changelog, Version
from httplib2 import Http, HttpLib2Error
from launchpadlib.launchpad import Launchpad as LP
@ -39,6 +61,7 @@ from ubuntutools.lp.udtexceptions import (AlreadyLoggedInError,
PackageNotFoundException,
PocketDoesNotExistError,
SeriesNotFoundException)
import collections
__all__ = [
'Archive',
@ -64,8 +87,8 @@ class _Launchpad(object):
try:
self.__lp = LP.login_with('ubuntu-dev-tools', service,
version=api_version)
except IOError, error:
print >> sys.stderr, 'E: %s' % error
except IOError as error:
print('E: %s' % error, file=sys.stderr)
raise
else:
raise AlreadyLoggedInError('Already logged in to Launchpad.')
@ -112,11 +135,11 @@ class MetaWrapper(type):
cls._cache = dict()
@add_metaclass(MetaWrapper)
class BaseWrapper(object):
'''
A base class from which other wrapper classes are derived.
'''
__metaclass__ = MetaWrapper
resource_type = None # it's a base class after all
def __new__(cls, data):
@ -149,7 +172,7 @@ class BaseWrapper(object):
cls._cache[data.self_link] = cached
# add additional class specific caching (if available)
cache = getattr(cls, 'cache', None)
if callable(cache):
if isinstance(cache, collections.Callable):
cache(cached)
return cached
else:
@ -158,7 +181,7 @@ class BaseWrapper(object):
else:
# not a LP API representation, let the specific class handle it
fetch = getattr(cls, 'fetch', None)
if callable(fetch):
if isinstance(fetch, collections.Callable):
return fetch(data)
else:
raise NotImplementedError("Don't know how to fetch '%s' from LP"
@ -502,19 +525,19 @@ class SourcePackagePublishingHistory(BaseWrapper):
if self._changelog is None:
url = self._lpobject.changelogUrl()
if url is None:
print >> sys.stderr, ('E: No changelog available for %s %s',
print(('E: No changelog available for %s %s',
(self.getPackageName(),
self.getVersion()))
self.getVersion())), file=sys.stderr)
return None
try:
response, changelog = Http().request(url)
except HttpLib2Error, e:
print >> sys.stderr, str(e)
except HttpLib2Error as e:
print(str(e), file=sys.stderr)
return None
if response.status != 200:
print >> sys.stderr, ('%s: %s %s' % (url, response.status,
response.reason))
print(('%s: %s %s' % (url, response.status,
response.reason)), file=sys.stderr)
return None
self._changelog = changelog
@ -627,7 +650,7 @@ class MetaPersonTeam(MetaWrapper):
if '_me' not in cls.__dict__:
try:
cls._me = PersonTeam(Launchpad.me)
except HTTPError, error:
except HTTPError as error:
if error.response.status == 401:
# Anonymous login
cls._me = None
@ -636,11 +659,11 @@ class MetaPersonTeam(MetaWrapper):
return cls._me
@add_metaclass(MetaPersonTeam)
class PersonTeam(BaseWrapper):
'''
Wrapper class around a LP person or team object.
'''
__metaclass__ = MetaPersonTeam
resource_type = (
'person',
@ -716,7 +739,7 @@ class PersonTeam(BaseWrapper):
sourcepackagename=package,
)
canUpload = True
except HTTPError, e:
except HTTPError as e:
if e.response.status == 403:
canUpload = False
else:

View File

@ -22,6 +22,8 @@
#
# ##################################################################
from __future__ import print_function
# Modules.
import locale
import os
@ -66,8 +68,8 @@ def system_distribution_chain():
break
_system_distribution_chain.append(parent)
except Exception:
print ('Error: Could not determine the parent of the '
'distribution %s' % _system_distribution_chain[-1])
print(('Error: Could not determine the parent of the '
'distribution %s' % _system_distribution_chain[-1]))
return []
return _system_distribution_chain
@ -92,8 +94,8 @@ def host_architecture():
stderr=PIPE).communicate()[0].split()
if not arch or 'not found' in arch[0]:
print 'Error: Not running on a Debian based system; could not ' \
'detect its architecture.'
print('Error: Not running on a Debian based system; could not ' \
'detect its architecture.')
return None
return arch[0]
@ -106,13 +108,13 @@ def readlist(filename, uniq=True):
"""
if not os.path.isfile(filename):
print 'File "%s" does not exist.' % filename
print('File "%s" does not exist.' % filename)
return False
content = open(filename).read().replace('\n', ' ').replace(',', ' ')
if not content.strip():
print 'File "%s" is empty.' % filename
print('File "%s" is empty.' % filename)
return False
items = [item for item in content.split() if item]
@ -149,8 +151,8 @@ def split_release_pocket(release, default='Release'):
def require_utf8():
'''Can be called by programs that only function in UTF-8 locales'''
if locale.getpreferredencoding() != 'UTF-8':
print >> sys.stderr, ("This program only functions in a UTF-8 locale. "
"Aborting.")
print(("This program only functions in a UTF-8 locale. "
"Aborting."), file=sys.stderr)
sys.exit(1)

View File

@ -16,10 +16,15 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import tempfile
import os
import re
import sys
if sys.version_info[0] < 3:
input = raw_input
import ubuntutools.subprocess
@ -56,9 +61,9 @@ class Question(object):
selected = None
while selected not in self.options:
try:
selected = raw_input(question).strip().lower()
selected = input(question).strip().lower()
except (EOFError, KeyboardInterrupt):
print '\nAborting as requested.'
print('\nAborting as requested.')
sys.exit(1)
if selected == "":
selected = default
@ -68,8 +73,8 @@ class Question(object):
if selected == option[0]:
selected = option
if selected not in self.options:
print "Please answer the question with " + \
self.get_options() + "."
print("Please answer the question with " + \
self.get_options() + ".")
return selected
@ -86,9 +91,9 @@ def input_number(question, min_number, max_number, default=None):
selected = None
while selected < min_number or selected > max_number:
try:
selected = raw_input(question).strip()
selected = input(question).strip()
except (EOFError, KeyboardInterrupt):
print '\nAborting as requested.'
print('\nAborting as requested.')
sys.exit(1)
if default and selected == "":
selected = default
@ -96,10 +101,10 @@ def input_number(question, min_number, max_number, default=None):
try:
selected = int(selected)
if selected < min_number or selected > max_number:
print "Please input a number between %i and %i." % \
(min_number, max_number)
print("Please input a number between %i and %i." % \
(min_number, max_number))
except ValueError:
print "Please input a number."
print("Please input a number.")
assert type(selected) == int
return selected
@ -113,9 +118,9 @@ def confirmation_prompt(message=None, action=None):
action = 'continue'
message = 'Press [Enter] to %s. Press [Ctrl-C] to abort now.' % action
try:
raw_input(message)
input(message)
except (EOFError, KeyboardInterrupt):
print '\nAborting as requested.'
print('\nAborting as requested.')
sys.exit(1)
@ -129,9 +134,9 @@ class EditFile(object):
def edit(self, optional=False):
if optional:
print "\n\nCurrently the %s looks like:" % self.description
print("\n\nCurrently the %s looks like:" % self.description)
with open(self.filename, 'r') as f:
print f.read()
print(f.read())
if YesNoQuestion().ask("Edit", "no") == "no":
return
@ -150,12 +155,12 @@ class EditFile(object):
placeholders_present = True
if placeholders_present:
print ("Placeholders still present in the %s. "
"Please replace them with useful information."
% self.description)
print("Placeholders still present in the %s. "
"Please replace them with useful information."
% self.description)
confirmation_prompt(action='edit again')
elif not modified:
print "The %s was not modified" % self.description
print("The %s was not modified" % self.description)
if YesNoQuestion().ask("Edit again", "yes") == "no":
done = True
elif self.check_edit():
@ -189,8 +194,8 @@ class EditBugReport(EditFile):
report = f.read().decode('utf-8')
if self.split_re.match(report) is None:
print ("The %s doesn't start with 'Summary:' and 'Description:' "
"blocks" % self.description)
print("The %s doesn't start with 'Summary:' and 'Description:' "
"blocks" % self.description)
confirmation_prompt('edit again')
return False
return True

View File

@ -20,6 +20,8 @@
# Please see the /usr/share/common-licenses/GPL-2 file for the full text
# of the GNU General Public License license.
from __future__ import print_function
import re
from debian.deb822 import Changes
@ -39,7 +41,7 @@ def get_debian_srcpkg(name, release):
try:
release = DebianDistroInfo().codename(release, None, release)
except DistroDataOutdated, e:
except DistroDataOutdated as e:
Logger.warn(e)
return debian_archive.getSourcePackage(name, release)
@ -71,11 +73,11 @@ def need_sponsorship(name, component, release):
need_sponsor = not PersonTeam.me.canUploadPackage(archive, distroseries,
name, component)
if need_sponsor:
print '''You are not able to upload this package directly to Ubuntu.
print('''You are not able to upload this package directly to Ubuntu.
Your sync request shall require an approval by a member of the appropriate
sponsorship team, who shall be subscribed to this bug report.
This must be done before it can be processed by a member of the Ubuntu Archive
team.'''
team.''')
confirmation_prompt()
return need_sponsor
@ -98,12 +100,12 @@ def check_existing_reports(srcpkg):
for bug in pkg_bug_list:
# check for Sync or sync and the package name
if not bug.is_complete and 'ync %s' % srcpkg in bug.title:
print ('The following bug could be a possible duplicate sync bug '
'on Launchpad:\n'
' * %s (%s)\n'
'Please check the above URL to verify this before '
'continuing.'
% (bug.title, bug.web_link))
print('The following bug could be a possible duplicate sync bug '
'on Launchpad:\n'
' * %s (%s)\n'
'Please check the above URL to verify this before '
'continuing.'
% (bug.title, bug.web_link))
confirmation_prompt()
def get_ubuntu_delta_changelog(srcpkg):
@ -127,7 +129,7 @@ def get_ubuntu_delta_changelog(srcpkg):
break
try:
response, body = Http().request(changes_url)
except HttpLib2Error, e:
except HttpLib2Error as e:
Logger.error(str(e))
break
if response.status != 200:
@ -156,8 +158,8 @@ def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
Use the LP API to file the sync request.
'''
print ('The final report is:\nSummary: %s\nDescription:\n%s\n'
% (bugtitle, bugtext))
print('The final report is:\nSummary: %s\nDescription:\n%s\n'
% (bugtitle, bugtext))
confirmation_prompt()
if srcpkg:
@ -181,5 +183,5 @@ def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
bug.subscribe(person = PersonTeam(subscribe)())
print ('Sync request filed as bug #%i: %s'
% (bug.id, bug.web_link))
print('Sync request filed as bug #%i: %s'
% (bug.id, bug.web_link))

View File

@ -20,6 +20,8 @@
# Please see the /usr/share/common-licenses/GPL-2 file for the full text
# of the GNU General Public License license.
from __future__ import print_function
import os
import re
import sys
@ -27,6 +29,10 @@ import smtplib
import socket
import tempfile
if sys.version_info[0] >= 3:
basestring = str
unicode = str
from debian.changelog import Changelog, Version
from distro_info import DebianDistroInfo, DistroDataOutdated
@ -51,7 +57,7 @@ def _get_srcpkg(distro, name, release):
debian_info = DebianDistroInfo()
try:
release = debian_info.codename(release, default=release)
except DistroDataOutdated, e:
except DistroDataOutdated as e:
Logger.warn(e)
lines = list(rmadison(distro, name, suite=release, arch='source'))
@ -86,9 +92,9 @@ def check_existing_reports(srcpkg):
'''
Point the user to the URL to manually check for duplicate bug reports.
'''
print ('Please check on '
'https://bugs.launchpad.net/ubuntu/+source/%s/+bugs\n'
'for duplicate sync requests before continuing.' % srcpkg)
print('Please check on '
'https://bugs.launchpad.net/ubuntu/+source/%s/+bugs\n'
'for duplicate sync requests before continuing.' % srcpkg)
confirmation_prompt()
def get_ubuntu_delta_changelog(srcpkg):
@ -164,7 +170,7 @@ Content-Type: text/plain; charset=UTF-8
%s''' % (myemailaddr, to, bugtitle, signed_report)
print 'The final report is:\n%s' % mail
print('The final report is:\n%s' % mail)
confirmation_prompt()
# save mail in temporary file
@ -184,11 +190,11 @@ Content-Type: text/plain; charset=UTF-8
mailserver_port)
s = smtplib.SMTP(mailserver_host, mailserver_port)
break
except socket.error, s:
except socket.error as s:
Logger.error('Could not connect to %s:%s: %s (%i)',
mailserver_host, mailserver_port, s[1], s[0])
return
except smtplib.SMTPConnectError, s:
except smtplib.SMTPConnectError as s:
Logger.error('Could not connect to %s:%s: %s (%i)',
mailserver_host, mailserver_port, s[1], s[0])
if s.smtp_code == 421:
@ -215,7 +221,7 @@ Content-Type: text/plain; charset=UTF-8
os.remove(backup.name)
Logger.normal('Sync request mailed.')
break
except smtplib.SMTPRecipientsRefused, smtperror:
except smtplib.SMTPRecipientsRefused as smtperror:
smtp_code, smtp_message = smtperror.recipients[to]
Logger.error('Error while sending: %i, %s', smtp_code, smtp_message)
if smtp_code == 450:
@ -223,7 +229,7 @@ Content-Type: text/plain; charset=UTF-8
'[Enter] to retry. Press [Ctrl-C] to abort now.')
else:
return
except smtplib.SMTPResponseException, e:
except smtplib.SMTPResponseException as e:
Logger.error('Error while sending: %i, %s',
e.smtp_code, e.smtp_error)
return

View File

@ -17,7 +17,11 @@
import os
import re
import urllib
try:
from urllib.parse import unquote
from urllib.request import urlretrieve
except ImportError:
from urllib import unquote, urlretrieve
import debian.debian_support
import distro_info
@ -63,7 +67,7 @@ class BugTask(object):
source_files = self.get_source().sourceFileUrls()
dsc_file = ""
for url in source_files:
filename = urllib.unquote(os.path.basename(url))
filename = unquote(os.path.basename(url))
Logger.info("Downloading %s..." % (filename))
# HttpLib2 isn't suitable for large files (it reads into memory),
# but we want its https certificate validation on the .dsc
@ -75,7 +79,7 @@ class BugTask(object):
dsc_file = os.path.join(os.getcwd(), filename)
else:
urllib.urlretrieve(url, filename)
urlretrieve(url, filename)
assert os.path.isfile(dsc_file), "%s does not exist." % (dsc_file)
return dsc_file

View File

@ -21,6 +21,7 @@ import re
from ubuntutools import subprocess
from ubuntutools.logger import Logger
from ubuntutools.sponsor_patch.question import ask_for_manual_fixing
from functools import reduce
class Patch(object):
"""This object represents a patch that can be downloaded from Launchpad."""

View File

@ -15,6 +15,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import sys
from ubuntutools.question import Question, YesNoQuestion
@ -43,5 +45,5 @@ def ask_for_manual_fixing():
def user_abort():
"""Print abort and quit the program."""
print "User abort."
print("User abort.")
sys.exit(2)

View File

@ -15,6 +15,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import os
import re
import sys
@ -301,7 +303,7 @@ class SourcePackage(object):
bug title."""
if not task.title_contains(self._version):
print "Bug #%i title: %s" % (bug_number, task.get_bug_title())
print("Bug #%i title: %s" % (bug_number, task.get_bug_title()))
msg = "Is %s %s the version that should be synced" % (self._package,
self._version)
answer = YesNoQuestion().ask(msg, "no")
@ -349,7 +351,7 @@ class SourcePackage(object):
assert os.path.isfile(self._changes_file), "%s does not exist." % \
(self._changes_file)
changes = debian.deb822.Changes(file(self._changes_file))
changes = debian.deb822.Changes(open(self._changes_file))
fixed_bugs = []
if "Launchpad-Bugs-Fixed" in changes:
fixed_bugs = changes["Launchpad-Bugs-Fixed"].split(" ")
@ -370,16 +372,16 @@ class SourcePackage(object):
"""Print things that should be checked before uploading a package."""
lintian_filename = self._run_lintian()
print "\nPlease check %s %s carefully:" % (self._package, self._version)
print("\nPlease check %s %s carefully:" % (self._package, self._version))
if os.path.isfile(self._debdiff_filename):
print "file://" + self._debdiff_filename
print "file://" + lintian_filename
print("file://" + self._debdiff_filename)
print("file://" + lintian_filename)
if self._build_log:
print "file://" + self._build_log
print("file://" + self._build_log)
harvest = Harvest(self._package)
if harvest.data:
print harvest.report()
print(harvest.report())
def reload_changelog(self):
"""Reloads debian/changelog and updates the version.
@ -391,9 +393,9 @@ class SourcePackage(object):
# Check the changelog
self._changelog = debian.changelog.Changelog()
try:
self._changelog.parse_changelog(file("debian/changelog"),
self._changelog.parse_changelog(open("debian/changelog"),
max_blocks=1, strict=True)
except debian.changelog.ChangelogParseError, error:
except debian.changelog.ChangelogParseError as error:
Logger.error("The changelog entry doesn't validate: %s", str(error))
ask_for_manual_fixing()
return False

View File

@ -15,11 +15,16 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import os
import pwd
import shutil
import sys
if sys.version_info[0] < 3:
range = xrange
from distro_info import UbuntuDistroInfo
from launchpadlib.launchpad import Launchpad
@ -81,11 +86,11 @@ def edit_source():
# Spawn shell to allow modifications
cmd = [get_user_shell()]
Logger.command(cmd)
print """An interactive shell was launched in
print("""An interactive shell was launched in
file://%s
Edit your files. When you are done, exit the shell. If you wish to abort the
process, exit the shell such that it returns an exit code other than zero.
""" % (os.getcwd()),
""" % (os.getcwd()), end=' ')
returncode = subprocess.call(cmd)
if returncode != 0:
Logger.error("Shell exited with exit value %i." % (returncode))
@ -113,10 +118,10 @@ def ask_for_patch_or_branch(bug, attached_patches, linked_branches):
i = 0
for linked_branch in linked_branches:
i += 1
print "%i) %s" % (i, linked_branch.display_name)
print("%i) %s" % (i, linked_branch.display_name))
for attached_patch in attached_patches:
i += 1
print "%i) %s" % (i, attached_patch.title)
print("%i) %s" % (i, attached_patch.title))
selected = input_number("Which branch or patch do you want to download",
1, i, i)
if selected <= len(linked_branches):
@ -220,9 +225,9 @@ def get_open_ubuntu_bug_task(launchpad, bug, branch=None):
else:
Logger.normal("https://launchpad.net/bugs/%i has %i Ubuntu tasks:" \
% (bug_id, len(ubuntu_tasks)))
for i in xrange(len(ubuntu_tasks)):
print "%i) %s" % (i + 1,
ubuntu_tasks[i].get_package_and_series())
for i in range(len(ubuntu_tasks)):
print("%i) %s" % (i + 1,
ubuntu_tasks[i].get_package_and_series()))
selected = input_number("To which Ubuntu task does the patch belong",
1, len(ubuntu_tasks))
task = ubuntu_tasks[selected - 1]
@ -235,7 +240,7 @@ def _create_and_change_into(workdir):
if not os.path.isdir(workdir):
try:
os.makedirs(workdir)
except os.error, error:
except os.error as error:
Logger.error("Failed to create the working directory %s [Errno " \
"%i]: %s." % (workdir, error.errno, error.strerror))
sys.exit(1)
@ -248,7 +253,7 @@ def _update_maintainer_field():
Logger.command(["update-maintainer"])
try:
update_maintainer("debian", Logger.verbose)
except MaintainerUpdateException, e:
except MaintainerUpdateException as e:
Logger.error("update-maintainer failed: %s", str(e))
sys.exit(1)

View File

@ -14,19 +14,30 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
from __future__ import with_statement
import __builtin__
try:
import builtins
except ImportError:
import __builtin__
import os.path
import shutil
import StringIO
try:
from StringIO import StringIO
except:
from io import StringIO
from io import BytesIO
import tempfile
import types
import urllib2
try:
from urllib.request import OpenerDirector, urlopen
from urllib.error import HTTPError, URLError
except ImportError:
from urllib2 import OpenerDirector, urlopen
from urllib2 import HTTPError, URLError
import debian.deb822
import httplib2
import mox
import sys
import mock
import ubuntutools.archive
from ubuntutools.config import UDTConfig
@ -44,15 +55,11 @@ def setUpModule():
ex_pkg.cleanup()
class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase):
class DscVerificationTestCase(unittest.TestCase):
def setUp(self):
super(DscVerificationTestCase, self).setUp()
with open('test-data/example_1.0-1.dsc', 'rb') as f:
self.dsc = ubuntutools.archive.Dsc(f.read())
def tearDown(self):
super(DscVerificationTestCase, self).tearDown()
def test_good(self):
self.assertTrue(self.dsc.verify_file(
'test-data/example_1.0.orig.tar.gz'))
@ -67,11 +74,19 @@ class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase):
fn = 'test-data/example_1.0.orig.tar.gz'
with open(fn, 'rb') as f:
data = f.read()
data = data[:-1] + chr(ord(data[-1]) ^ 8)
self.mox.StubOutWithMock(__builtin__, 'open')
open(fn, 'rb').AndReturn(StringIO.StringIO(data))
self.mox.ReplayAll()
self.assertFalse(self.dsc.verify_file(fn))
if sys.version_info[0] >= 3:
last_byte = chr(data[-1] ^ 8).encode()
else:
last_byte = chr(ord(data[-1]) ^ 8)
data = data[:-1] + last_byte
m = mock.MagicMock(name='open', spec=open)
m.return_value = BytesIO(data)
if sys.version_info[0] >= 3:
target = 'builtins.open'
else:
target = '__builtin__.open'
with mock.patch(target, m):
self.assertFalse(self.dsc.verify_file(fn))
def test_sha1(self):
del self.dsc['Checksums-Sha256']
@ -85,26 +100,31 @@ class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase):
self.test_bad()
class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
class LocalSourcePackageTestCase(unittest.TestCase):
SourcePackage = ubuntutools.archive.UbuntuSourcePackage
def setUp(self):
super(LocalSourcePackageTestCase, self).setUp()
self.workdir = tempfile.mkdtemp(prefix='udt-test')
self.mox.StubOutWithMock(ubuntutools.archive, 'Distribution')
self.mox.StubOutWithMock(ubuntutools.archive, 'rmadison')
self._stubout('ubuntutools.archive.Distribution')
self._stubout('ubuntutools.archive.rmadison')
self.real_http = httplib2.Http()
self.mox.StubOutWithMock(httplib2, 'Http')
self.mock_http = self.mox.CreateMock(httplib2.Http)
self.mock_http = self._stubout('httplib2.Http.request')
self.mock_http.side_effect = self.request_proxy
self.url_opener = mock.MagicMock(spec=OpenerDirector)
self.url_opener.open.side_effect = self.urlopen_proxy
# Silence the tests a little:
self.mox.stubs.Set(Logger, 'stdout', StringIO.StringIO())
self.mox.stubs.Set(Logger, 'stderr', StringIO.StringIO())
self._stubout('ubuntutools.logger.Logger.stdout')
self._stubout('ubuntutools.logger.Logger.stderr')
def _stubout(self, stub):
patcher = mock.patch(stub)
self.addCleanup(patcher.stop)
return patcher.start()
def tearDown(self):
super(LocalSourcePackageTestCase, self).tearDown()
shutil.rmtree(self.workdir)
def urlopen_proxy(self, url, destname=None):
@ -112,7 +132,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
if destname is None:
destname = os.path.basename(url)
destpath = os.path.join(os.path.abspath('test-data'), destname)
return urllib2.urlopen('file://' + destpath)
return urlopen('file://' + destpath)
def urlopen_file(self, filename):
"Wrapper for urlopen_proxy for named files"
@ -120,11 +140,11 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
def urlopen_null(self, url):
"urlopen for zero length files"
return StringIO.StringIO('')
return BytesIO(b'')
def urlopen_404(self, url):
"urlopen for errors"
raise urllib2.HTTPError(url, 404, "Not Found", {}, None)
raise HTTPError(url, 404, "Not Found", {}, None)
def request_proxy(self, url, destname=None):
"httplib2 proxy for grabbing the file from test-data"
@ -132,7 +152,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
destname = os.path.basename(url)
destpath = os.path.join(os.path.abspath('test-data'), destname)
response = httplib2.Response({})
with open(destpath, 'r') as f:
with open(destpath, 'rb') as f:
body = f.read()
return response, body
@ -141,10 +161,17 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
response = httplib2.Response({'status': 404})
return response, "I'm a 404 Error"
def request_404_then_proxy(self, url, destname=None):
"mock side_effect callable to chain request 404 & proxy"
if self.mock_http.called:
return self.request_proxy(url, destname)
return self.request_404(url)
def test_local_copy(self):
pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
pkg.quiet = True
pkg.pull()
pkg.unpack()
@ -156,6 +183,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
pkg = self.SourcePackage(dscfile=os.path.join(self.workdir,
'example_1.0-1.dsc'),
workdir=self.workdir)
pkg.quiet = True
pkg.pull()
pkg.unpack()
@ -168,6 +196,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
dscfile=os.path.join(self.workdir,
'example_1.0-1.dsc'),
workdir=self.workdir)
pkg.quiet = True
pkg.pull()
pkg.unpack()
@ -177,31 +206,24 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
shutil.copy2('test-data/example_1.0-1.debian.tar.xz', self.workdir)
with open(os.path.join(self.workdir, 'example_1.0-1.debian.tar.xz'),
'r+b') as f:
f.write('CORRUPTION')
f.write(b'CORRUPTION')
pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
pkg.quiet = True
pkg.pull()
def test_pull(self):
dist = self.SourcePackage.distribution
mirror = UDTConfig.defaults['%s_MIRROR' % dist.upper()]
urlbase = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request('https://launchpad.net/%s/+archive/primary/'
'+files/example_1.0-1.dsc' % dist
).WithSideEffects(self.request_proxy)
url_opener = self.mox.CreateMock(urllib2.OpenerDirector)
url_opener.open(mirror + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
url_opener.open(mirror + urlbase + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
pkg.url_opener = url_opener
pkg.url_opener = self.url_opener
pkg.quiet = True
pkg.pull()
def test_mirrors(self):
@ -209,34 +231,24 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/'
urlbase = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_proxy)
url_opener = self.mox.CreateMock(urllib2.OpenerDirector)
url_opener.open(mirror + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_null)
url_opener.open(master + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
url_opener.open(lpbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
url_opener.open(mirror + urlbase + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
sequence = [self.urlopen_null, self.urlopen_404, self.urlopen_proxy,
self.urlopen_proxy]
def _callable_iter(*args, **kwargs):
return sequence.pop(0)(*args, **kwargs)
url_opener = mock.MagicMock(spec=OpenerDirector)
url_opener.open.side_effect = _callable_iter
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg.url_opener = url_opener
pkg.quiet = True
pkg.pull()
def test_dsc_missing(self):
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_404)
self.mox.ReplayAll()
self.mock_http.side_effect = self.request_404
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
pkg.quiet = True
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
@ -251,35 +263,24 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_proxy)
url_opener = self.mox.CreateMock(urllib2.OpenerDirector)
url_opener.open(debian_mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_null)
url_opener.open(debsec_mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
url_opener.open(debian_master + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
url_opener.open(debsec_master + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
url_opener.open(lpbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
url_opener.open('http://snapshot.debian.org/mr/package/example/1.0-1/'
'srcfiles?fileinfo=1'
).WithSideEffects(lambda x: StringIO.StringIO(
'{"fileinfo": {"hashabc": [{"name": "example_1.0.orig.tar.gz"}]}}'
))
url_opener.open('http://snapshot.debian.org/file/hashabc'
).WithSideEffects(self.urlopen_file(
'example_1.0.orig.tar.gz'))
url_opener.open(debian_mirror + base + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
sequence = [self.urlopen_null,
self.urlopen_404,
self.urlopen_404,
self.urlopen_404,
self.urlopen_404,
lambda x: BytesIO(
b'{"fileinfo": {"hashabc": [{"name": "example_1.0.orig.tar.gz"}]}}'),
self.urlopen_file('example_1.0.orig.tar.gz'),
self.urlopen_proxy]
def _callable_iter(*args, **kwargs):
return sequence.pop(0)(*args, **kwargs)
url_opener = mock.MagicMock(spec=OpenerDirector)
url_opener.open.side_effect = _callable_iter
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[debian_mirror,
debsec_mirror])
pkg.quiet = True
pkg.url_opener = url_opener
pkg.pull()
pkg.unpack()
@ -288,61 +289,35 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_404)
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(mirror + base + 'example_1.0-1.dsc'
).WithSideEffects(self.request_proxy)
url_opener = self.mox.CreateMock(urllib2.OpenerDirector)
url_opener.open(mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
url_opener.open(mirror + base + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mock_http.side_effect = self.request_404_then_proxy
def fake_gpg_info(self, message, keyrings=None):
return debian.deb822.GpgInfo.from_output(
'[GNUPG:] GOODSIG DEADBEEF Joe Developer '
'<joe@example.net>')
# We have to stub this out without mox because there some versions of
# python-debian will pass keyrings=None, others won't.
# http://code.google.com/p/pymox/issues/detail?id=37
self.mox.stubs.Set(debian.deb822.GpgInfo, 'from_sequence',
types.MethodType(fake_gpg_info,
debian.deb822.GpgInfo,
debian.deb822.GpgInfo))
self.mox.ReplayAll()
patcher = mock.patch.object(debian.deb822.GpgInfo, 'from_sequence')
self.addCleanup(patcher.stop)
mock_gpg_info = patcher.start()
mock_gpg_info.return_value = debian.deb822.GpgInfo.from_output(
'[GNUPG:] GOODSIG DEADBEEF Joe Developer '
'<joe@example.net>')
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg.url_opener = url_opener
pkg.url_opener = self.url_opener
pkg.pull()
def test_dsc_badsig(self):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_404)
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(mirror + base + 'example_1.0-1.dsc'
).WithSideEffects(self.request_proxy)
def fake_gpg_info(self, message, keyrings=None):
return debian.deb822.GpgInfo.from_output(
'[GNUPG:] ERRSIG DEADBEEF')
# We have to stub this out without mox because there some versions of
# python-debian will pass keyrings=None, others won't.
# http://code.google.com/p/pymox/issues/detail?id=37
self.mox.stubs.Set(debian.deb822.GpgInfo, 'from_sequence',
types.MethodType(fake_gpg_info,
debian.deb822.GpgInfo,
debian.deb822.GpgInfo))
self.mox.ReplayAll()
self.mock_http.side_effect = self.request_404_then_proxy
patcher = mock.patch.object(debian.deb822.GpgInfo, 'from_sequence')
self.addCleanup(patcher.stop)
mock_gpg_info = patcher.start()
mock_gpg_info.return_value = debian.deb822.GpgInfo.from_output(
'[GNUPG:] ERRSIG DEADBEEF')
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
try:
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
except URLError:
raise unittest.SkipTest('Test needs addr resolution to work')

View File

@ -15,19 +15,25 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
import __builtin__
try:
import builtins
except ImportError:
import __builtin__
import os
import sys
import locale
from StringIO import StringIO
try:
from StringIO import StringIO
except:
from io import StringIO
import mox
import mock
from ubuntutools.config import UDTConfig, ubu_email
from ubuntutools.logger import Logger
from ubuntutools.test import unittest
class ConfigTestCase(mox.MoxTestBase, unittest.TestCase):
class ConfigTestCase(unittest.TestCase):
_config_files = {
'system': '',
'user': '',
@ -46,7 +52,18 @@ class ConfigTestCase(mox.MoxTestBase, unittest.TestCase):
def setUp(self):
super(ConfigTestCase, self).setUp()
self.mox.stubs.Set(__builtin__, 'open', self._fake_open)
if sys.version_info[0] < 3:
self.assertRegex = self.assertRegexpMatches
m = mock.mock_open()
m.side_effect = self._fake_open
if sys.version_info[0] >= 3:
target = 'builtins.open'
else:
target = '__builtin__.open'
patcher = mock.patch(target, m)
self.addCleanup(patcher.stop)
patcher.start()
Logger.stdout = StringIO()
Logger.stderr = StringIO()
@ -63,7 +80,7 @@ class ConfigTestCase(mox.MoxTestBase, unittest.TestCase):
def clean_environment(self):
self._config_files['system'] = ''
self._config_files['user'] = ''
for k in os.environ.keys():
for k in list(os.environ.keys()):
if k.startswith(('UBUNTUTOOLS_', 'TEST_')):
del os.environ[k]
@ -97,8 +114,8 @@ REPEAT=yes
errs = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO()
self.assertEqual(len(errs.splitlines()), 1)
self.assertRegexpMatches(errs,
r'Warning: Cannot parse.*\bCOMMAND_EXECUTION=a')
self.assertRegex(errs,
r'Warning: Cannot parse.*\bCOMMAND_EXECUTION=a')
def get_value(self, *args, **kwargs):
config = UDTConfig(prefix='TEST')
@ -137,8 +154,8 @@ REPEAT=yes
errs = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO()
self.assertEqual(len(errs.splitlines()), 1)
self.assertRegexpMatches(errs,
r'deprecated.*\bCOMPATFOOBAR\b.*\bTEST_QUX\b')
self.assertRegex(errs,
r'deprecated.*\bCOMPATFOOBAR\b.*\bTEST_QUX\b')
def test_boolean(self):
self._config_files['user'] = "TEST_BOOLEAN=yes"
@ -150,7 +167,7 @@ REPEAT=yes
def test_nonpackagewide(self):
self._config_files['user'] = 'UBUNTUTOOLS_FOOBAR=a'
self.assertEquals(self.get_value('FOOBAR'), None)
self.assertEqual(self.get_value('FOOBAR'), None)
class UbuEmailTestCase(unittest.TestCase):
@ -217,7 +234,11 @@ class UbuEmailTestCase(unittest.TestCase):
encoding = locale.getdefaultlocale()[1]
if not encoding:
encoding = 'utf-8'
name = 'Jöe Déveloper'.decode('utf-8')
os.environ['DEBFULLNAME'] = name.encode(encoding)
name = 'Jöe Déveloper'
env_name = name
if isinstance(name, bytes):
name = 'Jöe Déveloper'.decode('utf-8')
env_name = name.encode(encoding)
os.environ['DEBFULLNAME'] = env_name
os.environ['DEBEMAIL'] = email = 'joe@example.net'
self.assertEqual(ubu_email(), (name, email))

View File

@ -45,6 +45,7 @@ class HelpTestCase(unittest.TestCase):
null = open('/dev/null', 'r')
process = subprocess.Popen(['./' + script, '--help'],
close_fds=True, stdin=null,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
started = time.time()
@ -57,7 +58,10 @@ class HelpTestCase(unittest.TestCase):
while time.time() - started < TIMEOUT:
for fd in select.select(fds, [], fds, TIMEOUT)[0]:
out.append(os.read(fd, 1024))
output = os.read(fd, 1024)
if not isinstance(output, str):
output = output.decode('utf-8')
out.append(output)
if process.poll() is not None:
break

View File

@ -14,7 +14,10 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
import StringIO
try:
from StringIO import StringIO
except:
from io import StringIO
import sys
from ubuntutools.logger import Logger
@ -23,8 +26,8 @@ from ubuntutools.test import unittest
class LoggerTestCase(unittest.TestCase):
def setUp(self):
Logger.stdout = StringIO.StringIO()
Logger.stderr = StringIO.StringIO()
Logger.stdout = StringIO()
Logger.stderr = StringIO()
self._script_name = Logger.script_name
Logger.script_name = 'test'
self._verbose = Logger.verbose

View File

@ -25,9 +25,6 @@ WHITELIST = [re.compile(': %s$' % x) for x in (
r"No name '\w+Error' in module 'launchpadlib\.errors'",
# http://www.logilab.org/ticket/51250:
r"Module 'hashlib' has no '(md5|sha(1|224|256|384|512))' member",
# mox:
r"Instance of '.+' has no '(WithSideEffects|MultipleTimes|AndReturn)' "
r"member",
# pylint doesn't like *args/**kwargs
r"Instance of 'Popen' has no '.*' member",
)]

View File

@ -16,12 +16,19 @@
"""Test suite for ubuntutools.update_maintainer"""
import __builtin__
try:
import builtins
except ImportError:
import __builtin__
try:
from StringIO import StringIO
except:
from io import StringIO
import os
import StringIO
import sys
import mox
import mock
from ubuntutools.logger import Logger
from ubuntutools.test import unittest
@ -186,7 +193,7 @@ Package: seahorse-plugins
"""
#pylint: disable=R0904
class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
class UpdateMaintainerTestCase(unittest.TestCase):
"""TestCase object for ubuntutools.update_maintainer"""
_directory = "/"
@ -210,18 +217,30 @@ class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
(mode == "r" and self._files[base] is None)):
raise IOError("No such file or directory: '%s'" % filename)
if mode == "w":
self._files[base] = StringIO.StringIO()
self._files[base] = StringIO()
self._files[base].close = lambda: None
return self._files[base]
#pylint: disable=C0103
def setUp(self):
super(UpdateMaintainerTestCase, self).setUp()
self.mox.stubs.Set(__builtin__, 'open', self._fake_open)
self.mox.stubs.Set(os.path, 'isfile', self._fake_isfile)
self._files["rules"] = StringIO.StringIO(_SIMPLE_RULES)
Logger.stdout = StringIO.StringIO()
Logger.stderr = StringIO.StringIO()
if sys.version_info[0] < 3:
self.assertRegex = self.assertRegexpMatches
m = mock.mock_open()
m.side_effect = self._fake_open
if sys.version_info[0] >= 3:
target = 'builtins.open'
else:
target = '__builtin__.open'
patcher = mock.patch(target, m)
self.addCleanup(patcher.stop)
patcher.start()
m = mock.MagicMock(side_effect=self._fake_isfile)
patcher = mock.patch('os.path.isfile', m)
self.addCleanup(patcher.stop)
patcher.start()
self._files["rules"] = StringIO(_SIMPLE_RULES)
Logger.stdout = StringIO()
Logger.stderr = StringIO()
def tearDown(self):
self.assertEqual(Logger.stdout.getvalue(), '')
@ -236,8 +255,8 @@ class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
#pylint: enable=C0103
def test_debian_package(self):
"""Test: Don't update Maintainer field if target is Debian."""
self._files["changelog"] = StringIO.StringIO(_UNSTABLE_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL)
self._files["changelog"] = StringIO(_UNSTABLE_CHANGELOG)
self._files["control"] = StringIO(_ABP_CONTROL)
update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_CONTROL)
@ -246,52 +265,52 @@ class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
The Maintainer field needs to be update even if
XSBC-Original-Maintainer has an @ubuntu.com address."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_AXIS2C_CONTROL)
self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO(_AXIS2C_CONTROL)
update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _AXIS2C_UPDATED)
warnings = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO.StringIO()
Logger.stderr = StringIO()
self.assertEqual(len(warnings.splitlines()), 1)
self.assertRegexpMatches(warnings, "Warning: Overwriting original "
self.assertRegex(warnings, "Warning: Overwriting original "
"maintainer: Soren Hansen "
"<soren@ubuntu.com>")
def test_update_maintainer(self):
"""Test: Update Maintainer field."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL)
self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO(_ABP_CONTROL)
update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_UPDATED)
def test_update_old_maintainer(self):
"""Test: Update old MOTU address."""
self._files["changelog"] = StringIO.StringIO(_UNSTABLE_CHANGELOG)
self._files["control.in"] = StringIO.StringIO(_ABP_OLD_MAINTAINER)
self._files["changelog"] = StringIO(_UNSTABLE_CHANGELOG)
self._files["control.in"] = StringIO(_ABP_OLD_MAINTAINER)
update_maintainer(self._directory, True)
self.assertEqual(self._files["control.in"].getvalue(), _ABP_UPDATED)
def test_comments_in_control(self):
"""Test: Update Maintainer field in a control file containing
comments."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_SEAHORSE_PLUGINS_CONTROL)
self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO(_SEAHORSE_PLUGINS_CONTROL)
update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(),
_SEAHORSE_PLUGINS_UPDATED)
def test_skip_smart_rules(self):
"""Test: Skip update when XSBC-Original in debian/rules."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL)
self._files["rules"] = StringIO.StringIO(_COMPLEX_RULES)
self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO(_ABP_CONTROL)
self._files["rules"] = StringIO(_COMPLEX_RULES)
update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_CONTROL)
def test_missing_rules(self):
"""Test: Skip XSBC-Original test when debian/rules is missing."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL)
self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO(_ABP_CONTROL)
self._files["rules"] = None
update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_UPDATED)

View File

@ -14,6 +14,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
"""This module is for updating the Maintainer field of an Ubuntu package."""
import os
@ -125,7 +127,7 @@ def _find_files(debian_directory, verbose):
if os.path.isfile(rules_file) and \
'XSBC-Original-' in open(rules_file).read():
if verbose:
print "XSBC-Original is managed by 'rules' file. Doing nothing."
print("XSBC-Original is managed by 'rules' file. Doing nothing.")
control_files = []
return (changelog_file, control_files)
@ -144,7 +146,7 @@ def update_maintainer(debian_directory, verbose=False):
"""
try:
changelog_file, control_files = _find_files(debian_directory, verbose)
except MaintainerUpdateException, e:
except MaintainerUpdateException as e:
Logger.error(str(e))
raise
@ -159,8 +161,8 @@ def update_maintainer(debian_directory, verbose=False):
if original_maintainer.strip().lower() in _PREVIOUS_UBUNTU_MAINTAINER:
if verbose:
print "The old maintainer was: %s" % original_maintainer
print "Resetting as: %s" % _UBUNTU_MAINTAINER
print("The old maintainer was: %s" % original_maintainer)
print("Resetting as: %s" % _UBUNTU_MAINTAINER)
control.set_maintainer(_UBUNTU_MAINTAINER)
control.save()
continue
@ -173,7 +175,7 @@ def update_maintainer(debian_directory, verbose=False):
if distribution in ("stable", "testing", "unstable", "experimental"):
if verbose:
print "The package targets Debian. Doing nothing."
print("The package targets Debian. Doing nothing.")
return
if control.get_original_maintainer() is not None:
@ -181,8 +183,8 @@ def update_maintainer(debian_directory, verbose=False):
control.get_original_maintainer())
if verbose:
print "The original maintainer is: %s" % original_maintainer
print "Resetting as: %s" % _UBUNTU_MAINTAINER
print("The original maintainer is: %s" % original_maintainer)
print("Resetting as: %s" % _UBUNTU_MAINTAINER)
control.set_original_maintainer(original_maintainer)
control.set_maintainer(_UBUNTU_MAINTAINER)
control.save()
@ -194,7 +196,7 @@ def restore_maintainer(debian_directory, verbose=False):
"""Restore the original maintainer"""
try:
changelog_file, control_files = _find_files(debian_directory, verbose)
except MaintainerUpdateException, e:
except MaintainerUpdateException as e:
Logger.error(str(e))
raise
@ -204,7 +206,7 @@ def restore_maintainer(debian_directory, verbose=False):
if not orig_maintainer:
continue
if verbose:
print "Restoring original maintainer: %s" % orig_maintainer
print("Restoring original maintainer: %s" % orig_maintainer)
control.set_maintainer(orig_maintainer)
control.remove_original_maintainer()
control.save()