Port ubuntutools module to python3.

This commit is contained in:
Dimitri John Ledkov 2014-12-23 16:12:18 +00:00
commit 427ac0d620
25 changed files with 465 additions and 333 deletions

3
debian/changelog vendored
View File

@ -7,6 +7,9 @@ ubuntu-dev-tools (0.154) UNRELEASED; urgency=medium
* mk-sbuild: better message for cross build so that new start have * mk-sbuild: better message for cross build so that new start have
correct sbuild command from the last message of mk-sbuild. correct sbuild command from the last message of mk-sbuild.
[ Dimitri John Ledkov ]
* Port ubuntutools module to python3.
-- Logan Rosen <logan@ubuntu.com> Wed, 23 Apr 2014 17:24:12 -0400 -- Logan Rosen <logan@ubuntu.com> Wed, 23 Apr 2014 17:24:12 -0400
ubuntu-dev-tools (0.153) unstable; urgency=medium ubuntu-dev-tools (0.153) unstable; urgency=medium

34
debian/control vendored
View File

@ -8,6 +8,7 @@ Vcs-Bzr: lp:ubuntu-dev-tools
Vcs-Browser: https://code.launchpad.net/~ubuntu-dev/ubuntu-dev-tools/trunk Vcs-Browser: https://code.launchpad.net/~ubuntu-dev/ubuntu-dev-tools/trunk
Build-Depends: dctrl-tools, Build-Depends: dctrl-tools,
debhelper (>= 9), debhelper (>= 9),
dh-python,
devscripts (>= 2.11.0~), devscripts (>= 2.11.0~),
distro-info (>= 0.2~), distro-info (>= 0.2~),
libwww-perl, libwww-perl,
@ -19,11 +20,20 @@ Build-Depends: dctrl-tools,
python-distro-info (>= 0.4~), python-distro-info (>= 0.4~),
python-httplib2, python-httplib2,
python-launchpadlib (>= 1.5.7), python-launchpadlib (>= 1.5.7),
python-mox,
python-setuptools, python-setuptools,
python-soappy, python-soappy,
python-unittest2 python-unittest2,
python-mock,
python3-all,
python3-apt,
python3-debian,
python3-distro-info,
python3-httplib2,
python3-launchpadlib,
python3-setuptools,
python3-mock,
X-Python-Version: >= 2.6 X-Python-Version: >= 2.6
X-Python3-Version: >= 3.2
Homepage: https://launchpad.net/ubuntu-dev-tools Homepage: https://launchpad.net/ubuntu-dev-tools
Standards-Version: 3.9.5 Standards-Version: 3.9.5
@ -114,3 +124,23 @@ Description: useful tools for Ubuntu developers
- ubuntu-upload-permission - query / list the upload permissions for a - ubuntu-upload-permission - query / list the upload permissions for a
package. package.
- update-maintainer - script to update maintainer field in ubuntu packages. - update-maintainer - script to update maintainer field in ubuntu packages.
Package: python-ubuntutools
Architecture: all
Depends: ${python:Depends}
Breaks: ubuntu-dev-tools (<< 0.154)
Replaces: ubuntu-dev-tools (<< 0.154)
Description: useful library of APIs for Ubuntu developer tools (Python 2)
This package ships a collection of APIs, helpers and wrappers used to
develop useful utiliteis for Ubuntu developers.
.
Python 2 variant.
Package: python3-ubuntutools
Architecture: all
Depends: ${python3:Depends}
Description: useful library of APIs for Ubuntu developer tools
This package ships a collection of APIs, helpers and wrappers used to
develop useful utiliteis for Ubuntu developers.
.
Python 3 variant.

19
debian/rules vendored
View File

@ -1,12 +1,13 @@
#!/usr/bin/make -f #!/usr/bin/make -f
%: export PYBUILD_NAME=ubuntutools
dh $@ --with python2
ifeq (,$(filter nocheck,$(DEB_BUILD_OPTIONS))) %:
override_dh_auto_test: dh $@ --with python2,python3 --buildsystem=pybuild
set -e; \
for python in $(shell pyversions -r); do \ override_dh_install:
$$python setup.py test; \ dh_install
done mkdir -p debian/ubuntu-dev-tools/usr
endif mv debian/python-ubuntutools/etc debian/ubuntu-dev-tools
mv debian/python-ubuntutools/usr/bin debian/ubuntu-dev-tools/usr/
mv debian/python-ubuntutools/usr/share debian/ubuntu-dev-tools/usr/

View File

@ -4,6 +4,7 @@ from setuptools import setup
import glob import glob
import os import os
import re import re
import sys
# look/set what version we have # look/set what version we have
changelog = "debian/changelog" changelog = "debian/changelog"
@ -13,7 +14,11 @@ if os.path.exists(changelog):
if match: if match:
version = match.group(1) version = match.group(1)
scripts = ['404main', if sys.version_info[0] >= 3:
scripts = []
data_files = []
else:
scripts = ['404main',
'backportpackage', 'backportpackage',
'bitesize', 'bitesize',
'check-mir', 'check-mir',
@ -46,6 +51,12 @@ scripts = ['404main',
'ubuntu-upload-permission', 'ubuntu-upload-permission',
'update-maintainer', 'update-maintainer',
] ]
data_files = [
('/etc/bash_completion.d', glob.glob("bash_completion/*")),
('share/man/man1', glob.glob("doc/*.1")),
('share/man/man5', glob.glob("doc/*.5")),
('share/ubuntu-dev-tools', ['enforced-editing-wrapper']),
]
if __name__ == '__main__': if __name__ == '__main__':
setup(name='ubuntu-dev-tools', setup(name='ubuntu-dev-tools',
@ -57,11 +68,6 @@ if __name__ == '__main__':
'ubuntutools/sponsor_patch', 'ubuntutools/sponsor_patch',
'ubuntutools/test', 'ubuntutools/test',
], ],
data_files=[('/etc/bash_completion.d', data_files=data_files,
glob.glob("bash_completion/*")),
('share/man/man1', glob.glob("doc/*.1")),
('share/man/man5', glob.glob("doc/*.5")),
('share/ubuntu-dev-tools', ['enforced-editing-wrapper']),
],
test_suite='ubuntutools.test.discover', test_suite='ubuntutools.test.discover',
) )

View File

@ -27,18 +27,28 @@ Approach:
3. Verify checksums. 3. Verify checksums.
""" """
from __future__ import with_statement from __future__ import with_statement, print_function
import hashlib import hashlib
import os.path import os.path
import urllib2 try:
import urlparse from urllib.request import ProxyHandler, build_opener, urlopen
from urllib.parse import urlparse
from urllib.error import URLError, HTTPError
except ImportError:
from urllib2 import ProxyHandler, build_opener, urlopen
from urlparse import urlparse
from urllib2 import URLError, HTTPError
import re import re
import sys import sys
if sys.version_info[0] >= 3:
basestring = str
unicode = str
from debian.changelog import Changelog, Version from debian.changelog import Changelog, Version
import debian.deb822 import debian.deb822
import debian.debian_support import debian.debian_support
import codecs
import httplib2 import httplib2
from ubuntutools.config import UDTConfig from ubuntutools.config import UDTConfig
@ -81,7 +91,7 @@ class Dsc(debian.deb822.Dsc):
f = open(pathname, 'rb') f = open(pathname, 'rb')
while True: while True:
buf = f.read(hash_func.block_size) buf = f.read(hash_func.block_size)
if buf == '': if buf == b'':
break break
hash_func.update(buf) hash_func.update(buf)
f.close() f.close()
@ -102,7 +112,7 @@ class Dsc(debian.deb822.Dsc):
their_checksums = \ their_checksums = \
dict((entry['name'], (int(entry['size']), entry[key])) dict((entry['name'], (int(entry['size']), entry[key]))
for entry in other[field]) for entry in other[field])
for name, (size, checksum) in our_checksums.iteritems(): for name, (size, checksum) in our_checksums.items():
if name not in their_checksums: if name not in their_checksums:
# file only in one dsc # file only in one dsc
continue continue
@ -154,8 +164,8 @@ class SourcePackage(object):
self.version = debian.debian_support.Version(version) self.version = debian.debian_support.Version(version)
# uses default proxies from the environment # uses default proxies from the environment
proxy = urllib2.ProxyHandler() proxy = ProxyHandler()
self.url_opener = urllib2.build_opener(proxy) self.url_opener = build_opener(proxy)
@property @property
def lp_spph(self): def lp_spph(self):
@ -231,10 +241,10 @@ class SourcePackage(object):
def pull_dsc(self): def pull_dsc(self):
"Retrieve dscfile and parse" "Retrieve dscfile and parse"
if self._dsc_source: if self._dsc_source:
parsed = urlparse.urlparse(self._dsc_source) parsed = urlparse(self._dsc_source)
if parsed.scheme == '': if parsed.scheme == '':
self._dsc_source = 'file://' + os.path.abspath(self._dsc_source) self._dsc_source = 'file://' + os.path.abspath(self._dsc_source)
parsed = urlparse.urlparse(self._dsc_source) parsed = urlparse(self._dsc_source)
url = self._dsc_source url = self._dsc_source
else: else:
url = self._lp_url(self.dsc_name) url = self._lp_url(self.dsc_name)
@ -244,14 +254,14 @@ class SourcePackage(object):
def _download_dsc(self, url): def _download_dsc(self, url):
"Download specified dscfile and parse" "Download specified dscfile and parse"
parsed = urlparse.urlparse(url) parsed = urlparse(url)
if parsed.scheme == 'file': if parsed.scheme == 'file':
with open(parsed.path, 'r') as f: with open(parsed.path, 'r') as f:
body = f.read() body = f.read()
else: else:
try: try:
response, body = httplib2.Http().request(url) response, body = httplib2.Http().request(url)
except httplib2.HttpLib2Error, e: except httplib2.HttpLib2Error as e:
raise DownloadError(e) raise DownloadError(e)
if response.status != 200: if response.status != 200:
raise DownloadError("%s: %s %s" % (url, response.status, raise DownloadError("%s: %s %s" % (url, response.status,
@ -299,7 +309,7 @@ class SourcePackage(object):
"Write dsc file to workdir" "Write dsc file to workdir"
if self._dsc is None: if self._dsc is None:
self.pull_dsc() self.pull_dsc()
with open(self.dsc_pathname, 'w') as f: with open(self.dsc_pathname, 'wb') as f:
f.write(self.dsc.raw_text) f.write(self.dsc.raw_text)
def _download_file(self, url, filename): def _download_file(self, url, filename):
@ -312,17 +322,17 @@ class SourcePackage(object):
if entry['name'] == filename] if entry['name'] == filename]
assert len(size) == 1 assert len(size) == 1
size = int(size[0]) size = int(size[0])
parsed = urlparse.urlparse(url) parsed = urlparse(url)
if not self.quiet: if not self.quiet:
Logger.normal('Downloading %s from %s (%0.3f MiB)', Logger.normal('Downloading %s from %s (%0.3f MiB)',
filename, parsed.hostname, size / 1024.0 / 1024) filename, parsed.hostname, size / 1024.0 / 1024)
if parsed.scheme == 'file': if parsed.scheme == 'file':
in_ = open(parsed.path, 'r') in_ = open(parsed.path, 'rb')
else: else:
try: try:
in_ = self.url_opener.open(url) in_ = self.url_opener.open(url)
except urllib2.URLError: except URLError:
return False return False
downloaded = 0 downloaded = 0
@ -331,7 +341,7 @@ class SourcePackage(object):
with open(pathname, 'wb') as out: with open(pathname, 'wb') as out:
while True: while True:
block = in_.read(10240) block = in_.read(10240)
if block == '': if block == b'':
break break
downloaded += len(block) downloaded += len(block)
out.write(block) out.write(block)
@ -360,9 +370,9 @@ class SourcePackage(object):
try: try:
if self._download_file(url, name): if self._download_file(url, name):
break break
except urllib2.HTTPError, e: except HTTPError as e:
Logger.normal('HTTP Error %i: %s', e.code, str(e)) Logger.normal('HTTP Error %i: %s', e.code, str(e))
except urllib2.URLError, e: except URLError as e:
Logger.normal('URL Error: %s', e.reason) Logger.normal('URL Error: %s', e.reason)
else: else:
raise DownloadError('File %s could not be found' % name) raise DownloadError('File %s could not be found' % name)
@ -457,7 +467,7 @@ class DebianSourcePackage(SourcePackage):
wrapped_iterator = super(DebianSourcePackage, self)._source_urls(name) wrapped_iterator = super(DebianSourcePackage, self)._source_urls(name)
while True: while True:
try: try:
yield wrapped_iterator.next() yield next(wrapped_iterator)
except StopIteration: except StopIteration:
break break
if self.snapshot_list: if self.snapshot_list:
@ -499,11 +509,14 @@ class DebianSourcePackage(SourcePackage):
"python-simplejson") "python-simplejson")
try: try:
srcfiles = json.load(self.url_opener.open( data = self.url_opener.open(
'http://snapshot.debian.org' 'http://snapshot.debian.org'
'/mr/package/%s/%s/srcfiles?fileinfo=1' '/mr/package/%s/%s/srcfiles?fileinfo=1'
% (self.source, self.version.full_version))) % (self.source, self.version.full_version))
except urllib2.HTTPError: reader = codecs.getreader('utf-8')
srcfiles = json.load(reader(data))
except HTTPError:
Logger.error('Version %s of %s not found on ' Logger.error('Version %s of %s not found on '
'snapshot.debian.org', 'snapshot.debian.org',
self.version.full_version, self.source) self.version.full_version, self.source)
@ -511,7 +524,7 @@ class DebianSourcePackage(SourcePackage):
return False return False
self._snapshot_list = dict((info[0]['name'], hash_) self._snapshot_list = dict((info[0]['name'], hash_)
for hash_, info for hash_, info
in srcfiles['fileinfo'].iteritems()) in srcfiles['fileinfo'].items())
return self._snapshot_list return self._snapshot_list
def _snapshot_url(self, name): def _snapshot_url(self, name):
@ -569,9 +582,9 @@ class FakeSPPH(object):
self.name + '_' + pkgversion, self.name + '_' + pkgversion,
'changelog' + extension) 'changelog' + extension)
try: try:
self._changelog = urllib2.urlopen(url).read() self._changelog = urlopen(url).read()
except urllib2.HTTPError, error: except HTTPError as error:
print >> sys.stderr, ('%s: %s' % (url, error)) print(('%s: %s' % (url, error)), file=sys.stderr)
return None return None
if since_version is None: if since_version is None:

View File

@ -179,6 +179,6 @@ def ubu_email(name=None, email=None, export=True):
encoding = locale.getdefaultlocale()[1] encoding = locale.getdefaultlocale()[1]
if not encoding: if not encoding:
encoding = 'utf-8' encoding = 'utf-8'
if name: if name and isinstance(name, bytes):
name = name.decode(encoding) name = name.decode(encoding)
return name, email return name, email

View File

@ -14,7 +14,12 @@
import json import json
import os.path import os.path
import sys import sys
import urllib2 try:
from urllib.request import urlopen
from urllib.error import URLError
except ImportError:
from urllib2 import urlopen
from urllib2 import URLError
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
@ -32,11 +37,11 @@ class Harvest(object):
def _get_data(self): def _get_data(self):
try: try:
sock = urllib2.urlopen(self.data_url) sock = urlopen(self.data_url)
except IOError: except IOError:
try: try:
urllib2.urlopen(BASE_URL) urlopen(BASE_URL)
except urllib2.URLError: except URLError:
Logger.error("Harvest is down.") Logger.error("Harvest is down.")
sys.exit(1) sys.exit(1)
return None return None
@ -45,9 +50,7 @@ class Harvest(object):
return json.loads(response) return json.loads(response)
def opportunity_summary(self): def opportunity_summary(self):
l = [] l = ["%s (%s)" % (k,v) for (k,v) in self.data.items() if k != "total"]
for key in filter(lambda a: a != "total", self.data.keys()):
l += ["%s (%s)" % (key, self.data[key])]
return ", ".join(l) return ", ".join(l)
def report(self): def report(self):

View File

@ -19,8 +19,11 @@
# #
# Modules. # Modules.
import urllib try:
import urlparse from urllib.parse import urlsplit, urlencode, urlunsplit
except ImportError:
from urllib import urlencode
from urlparse import urlsplit, urlunsplit
def query_to_dict(query_string): def query_to_dict(query_string):
result = dict() result = dict()
@ -31,7 +34,7 @@ def query_to_dict(query_string):
return result return result
def translate_web_api(url, launchpad): def translate_web_api(url, launchpad):
scheme, netloc, path, query, fragment = urlparse.urlsplit(url) scheme, netloc, path, query, fragment = urlsplit(url)
query = query_to_dict(query) query = query_to_dict(query)
differences = set(netloc.split('.')).symmetric_difference( differences = set(netloc.split('.')).symmetric_difference(
@ -44,8 +47,8 @@ def translate_web_api(url, launchpad):
if "ws.op" in query: if "ws.op" in query:
raise ValueError("Invalid web url, url: %s" %url) raise ValueError("Invalid web url, url: %s" %url)
query["ws.op"] = "searchTasks" query["ws.op"] = "searchTasks"
scheme, netloc, api_path, _, _ = urlparse.urlsplit(str(launchpad._root_uri)) scheme, netloc, api_path, _, _ = urlsplit(str(launchpad._root_uri))
query = urllib.urlencode(query) query = urlencode(query)
url = urlparse.urlunsplit((scheme, netloc, api_path + path.lstrip("/"), url = urlunsplit((scheme, netloc, api_path + path.lstrip("/"),
query, fragment)) query, fragment))
return url return url

View File

@ -21,12 +21,34 @@
# #
# Based on code written by Jonathan Davies <jpds@ubuntu.com> # Based on code written by Jonathan Davies <jpds@ubuntu.com>
from __future__ import print_function
# Uncomment for tracing LP API calls # Uncomment for tracing LP API calls
#import httplib2 #import httplib2
#httplib2.debuglevel = 1 #httplib2.debuglevel = 1
import sys import sys
if sys.version_info[0] >= 3:
basestring = str
unicode = str
#Shameless steal from python-six
def add_metaclass(metaclass):
"""Class decorator for creating a class with a metaclass."""
def wrapper(cls):
orig_vars = cls.__dict__.copy()
slots = orig_vars.get('__slots__')
if slots is not None:
if isinstance(slots, str):
slots = [slots]
for slots_var in slots:
orig_vars.pop(slots_var)
orig_vars.pop('__dict__', None)
orig_vars.pop('__weakref__', None)
return metaclass(cls.__name__, cls.__bases__, orig_vars)
return wrapper
from debian.changelog import Changelog, Version from debian.changelog import Changelog, Version
from httplib2 import Http, HttpLib2Error from httplib2 import Http, HttpLib2Error
from launchpadlib.launchpad import Launchpad as LP from launchpadlib.launchpad import Launchpad as LP
@ -39,6 +61,7 @@ from ubuntutools.lp.udtexceptions import (AlreadyLoggedInError,
PackageNotFoundException, PackageNotFoundException,
PocketDoesNotExistError, PocketDoesNotExistError,
SeriesNotFoundException) SeriesNotFoundException)
import collections
__all__ = [ __all__ = [
'Archive', 'Archive',
@ -64,8 +87,8 @@ class _Launchpad(object):
try: try:
self.__lp = LP.login_with('ubuntu-dev-tools', service, self.__lp = LP.login_with('ubuntu-dev-tools', service,
version=api_version) version=api_version)
except IOError, error: except IOError as error:
print >> sys.stderr, 'E: %s' % error print('E: %s' % error, file=sys.stderr)
raise raise
else: else:
raise AlreadyLoggedInError('Already logged in to Launchpad.') raise AlreadyLoggedInError('Already logged in to Launchpad.')
@ -112,11 +135,11 @@ class MetaWrapper(type):
cls._cache = dict() cls._cache = dict()
@add_metaclass(MetaWrapper)
class BaseWrapper(object): class BaseWrapper(object):
''' '''
A base class from which other wrapper classes are derived. A base class from which other wrapper classes are derived.
''' '''
__metaclass__ = MetaWrapper
resource_type = None # it's a base class after all resource_type = None # it's a base class after all
def __new__(cls, data): def __new__(cls, data):
@ -149,7 +172,7 @@ class BaseWrapper(object):
cls._cache[data.self_link] = cached cls._cache[data.self_link] = cached
# add additional class specific caching (if available) # add additional class specific caching (if available)
cache = getattr(cls, 'cache', None) cache = getattr(cls, 'cache', None)
if callable(cache): if isinstance(cache, collections.Callable):
cache(cached) cache(cached)
return cached return cached
else: else:
@ -158,7 +181,7 @@ class BaseWrapper(object):
else: else:
# not a LP API representation, let the specific class handle it # not a LP API representation, let the specific class handle it
fetch = getattr(cls, 'fetch', None) fetch = getattr(cls, 'fetch', None)
if callable(fetch): if isinstance(fetch, collections.Callable):
return fetch(data) return fetch(data)
else: else:
raise NotImplementedError("Don't know how to fetch '%s' from LP" raise NotImplementedError("Don't know how to fetch '%s' from LP"
@ -502,19 +525,19 @@ class SourcePackagePublishingHistory(BaseWrapper):
if self._changelog is None: if self._changelog is None:
url = self._lpobject.changelogUrl() url = self._lpobject.changelogUrl()
if url is None: if url is None:
print >> sys.stderr, ('E: No changelog available for %s %s', print(('E: No changelog available for %s %s',
(self.getPackageName(), (self.getPackageName(),
self.getVersion())) self.getVersion())), file=sys.stderr)
return None return None
try: try:
response, changelog = Http().request(url) response, changelog = Http().request(url)
except HttpLib2Error, e: except HttpLib2Error as e:
print >> sys.stderr, str(e) print(str(e), file=sys.stderr)
return None return None
if response.status != 200: if response.status != 200:
print >> sys.stderr, ('%s: %s %s' % (url, response.status, print(('%s: %s %s' % (url, response.status,
response.reason)) response.reason)), file=sys.stderr)
return None return None
self._changelog = changelog self._changelog = changelog
@ -627,7 +650,7 @@ class MetaPersonTeam(MetaWrapper):
if '_me' not in cls.__dict__: if '_me' not in cls.__dict__:
try: try:
cls._me = PersonTeam(Launchpad.me) cls._me = PersonTeam(Launchpad.me)
except HTTPError, error: except HTTPError as error:
if error.response.status == 401: if error.response.status == 401:
# Anonymous login # Anonymous login
cls._me = None cls._me = None
@ -636,11 +659,11 @@ class MetaPersonTeam(MetaWrapper):
return cls._me return cls._me
@add_metaclass(MetaPersonTeam)
class PersonTeam(BaseWrapper): class PersonTeam(BaseWrapper):
''' '''
Wrapper class around a LP person or team object. Wrapper class around a LP person or team object.
''' '''
__metaclass__ = MetaPersonTeam
resource_type = ( resource_type = (
'person', 'person',
@ -716,7 +739,7 @@ class PersonTeam(BaseWrapper):
sourcepackagename=package, sourcepackagename=package,
) )
canUpload = True canUpload = True
except HTTPError, e: except HTTPError as e:
if e.response.status == 403: if e.response.status == 403:
canUpload = False canUpload = False
else: else:

View File

@ -22,6 +22,8 @@
# #
# ################################################################## # ##################################################################
from __future__ import print_function
# Modules. # Modules.
import locale import locale
import os import os
@ -66,8 +68,8 @@ def system_distribution_chain():
break break
_system_distribution_chain.append(parent) _system_distribution_chain.append(parent)
except Exception: except Exception:
print ('Error: Could not determine the parent of the ' print(('Error: Could not determine the parent of the '
'distribution %s' % _system_distribution_chain[-1]) 'distribution %s' % _system_distribution_chain[-1]))
return [] return []
return _system_distribution_chain return _system_distribution_chain
@ -92,8 +94,8 @@ def host_architecture():
stderr=PIPE).communicate()[0].split() stderr=PIPE).communicate()[0].split()
if not arch or 'not found' in arch[0]: if not arch or 'not found' in arch[0]:
print 'Error: Not running on a Debian based system; could not ' \ print('Error: Not running on a Debian based system; could not ' \
'detect its architecture.' 'detect its architecture.')
return None return None
return arch[0] return arch[0]
@ -106,13 +108,13 @@ def readlist(filename, uniq=True):
""" """
if not os.path.isfile(filename): if not os.path.isfile(filename):
print 'File "%s" does not exist.' % filename print('File "%s" does not exist.' % filename)
return False return False
content = open(filename).read().replace('\n', ' ').replace(',', ' ') content = open(filename).read().replace('\n', ' ').replace(',', ' ')
if not content.strip(): if not content.strip():
print 'File "%s" is empty.' % filename print('File "%s" is empty.' % filename)
return False return False
items = [item for item in content.split() if item] items = [item for item in content.split() if item]
@ -149,8 +151,8 @@ def split_release_pocket(release, default='Release'):
def require_utf8(): def require_utf8():
'''Can be called by programs that only function in UTF-8 locales''' '''Can be called by programs that only function in UTF-8 locales'''
if locale.getpreferredencoding() != 'UTF-8': if locale.getpreferredencoding() != 'UTF-8':
print >> sys.stderr, ("This program only functions in a UTF-8 locale. " print(("This program only functions in a UTF-8 locale. "
"Aborting.") "Aborting."), file=sys.stderr)
sys.exit(1) sys.exit(1)

View File

@ -16,10 +16,15 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import tempfile import tempfile
import os import os
import re import re
import sys import sys
if sys.version_info[0] < 3:
input = raw_input
import ubuntutools.subprocess import ubuntutools.subprocess
@ -56,9 +61,9 @@ class Question(object):
selected = None selected = None
while selected not in self.options: while selected not in self.options:
try: try:
selected = raw_input(question).strip().lower() selected = input(question).strip().lower()
except (EOFError, KeyboardInterrupt): except (EOFError, KeyboardInterrupt):
print '\nAborting as requested.' print('\nAborting as requested.')
sys.exit(1) sys.exit(1)
if selected == "": if selected == "":
selected = default selected = default
@ -68,8 +73,8 @@ class Question(object):
if selected == option[0]: if selected == option[0]:
selected = option selected = option
if selected not in self.options: if selected not in self.options:
print "Please answer the question with " + \ print("Please answer the question with " + \
self.get_options() + "." self.get_options() + ".")
return selected return selected
@ -86,9 +91,9 @@ def input_number(question, min_number, max_number, default=None):
selected = None selected = None
while selected < min_number or selected > max_number: while selected < min_number or selected > max_number:
try: try:
selected = raw_input(question).strip() selected = input(question).strip()
except (EOFError, KeyboardInterrupt): except (EOFError, KeyboardInterrupt):
print '\nAborting as requested.' print('\nAborting as requested.')
sys.exit(1) sys.exit(1)
if default and selected == "": if default and selected == "":
selected = default selected = default
@ -96,10 +101,10 @@ def input_number(question, min_number, max_number, default=None):
try: try:
selected = int(selected) selected = int(selected)
if selected < min_number or selected > max_number: if selected < min_number or selected > max_number:
print "Please input a number between %i and %i." % \ print("Please input a number between %i and %i." % \
(min_number, max_number) (min_number, max_number))
except ValueError: except ValueError:
print "Please input a number." print("Please input a number.")
assert type(selected) == int assert type(selected) == int
return selected return selected
@ -113,9 +118,9 @@ def confirmation_prompt(message=None, action=None):
action = 'continue' action = 'continue'
message = 'Press [Enter] to %s. Press [Ctrl-C] to abort now.' % action message = 'Press [Enter] to %s. Press [Ctrl-C] to abort now.' % action
try: try:
raw_input(message) input(message)
except (EOFError, KeyboardInterrupt): except (EOFError, KeyboardInterrupt):
print '\nAborting as requested.' print('\nAborting as requested.')
sys.exit(1) sys.exit(1)
@ -129,9 +134,9 @@ class EditFile(object):
def edit(self, optional=False): def edit(self, optional=False):
if optional: if optional:
print "\n\nCurrently the %s looks like:" % self.description print("\n\nCurrently the %s looks like:" % self.description)
with open(self.filename, 'r') as f: with open(self.filename, 'r') as f:
print f.read() print(f.read())
if YesNoQuestion().ask("Edit", "no") == "no": if YesNoQuestion().ask("Edit", "no") == "no":
return return
@ -150,12 +155,12 @@ class EditFile(object):
placeholders_present = True placeholders_present = True
if placeholders_present: if placeholders_present:
print ("Placeholders still present in the %s. " print("Placeholders still present in the %s. "
"Please replace them with useful information." "Please replace them with useful information."
% self.description) % self.description)
confirmation_prompt(action='edit again') confirmation_prompt(action='edit again')
elif not modified: elif not modified:
print "The %s was not modified" % self.description print("The %s was not modified" % self.description)
if YesNoQuestion().ask("Edit again", "yes") == "no": if YesNoQuestion().ask("Edit again", "yes") == "no":
done = True done = True
elif self.check_edit(): elif self.check_edit():
@ -189,7 +194,7 @@ class EditBugReport(EditFile):
report = f.read().decode('utf-8') report = f.read().decode('utf-8')
if self.split_re.match(report) is None: if self.split_re.match(report) is None:
print ("The %s doesn't start with 'Summary:' and 'Description:' " print("The %s doesn't start with 'Summary:' and 'Description:' "
"blocks" % self.description) "blocks" % self.description)
confirmation_prompt('edit again') confirmation_prompt('edit again')
return False return False

View File

@ -20,6 +20,8 @@
# Please see the /usr/share/common-licenses/GPL-2 file for the full text # Please see the /usr/share/common-licenses/GPL-2 file for the full text
# of the GNU General Public License license. # of the GNU General Public License license.
from __future__ import print_function
import re import re
from debian.deb822 import Changes from debian.deb822 import Changes
@ -39,7 +41,7 @@ def get_debian_srcpkg(name, release):
try: try:
release = DebianDistroInfo().codename(release, None, release) release = DebianDistroInfo().codename(release, None, release)
except DistroDataOutdated, e: except DistroDataOutdated as e:
Logger.warn(e) Logger.warn(e)
return debian_archive.getSourcePackage(name, release) return debian_archive.getSourcePackage(name, release)
@ -71,11 +73,11 @@ def need_sponsorship(name, component, release):
need_sponsor = not PersonTeam.me.canUploadPackage(archive, distroseries, need_sponsor = not PersonTeam.me.canUploadPackage(archive, distroseries,
name, component) name, component)
if need_sponsor: if need_sponsor:
print '''You are not able to upload this package directly to Ubuntu. print('''You are not able to upload this package directly to Ubuntu.
Your sync request shall require an approval by a member of the appropriate Your sync request shall require an approval by a member of the appropriate
sponsorship team, who shall be subscribed to this bug report. sponsorship team, who shall be subscribed to this bug report.
This must be done before it can be processed by a member of the Ubuntu Archive This must be done before it can be processed by a member of the Ubuntu Archive
team.''' team.''')
confirmation_prompt() confirmation_prompt()
return need_sponsor return need_sponsor
@ -98,7 +100,7 @@ def check_existing_reports(srcpkg):
for bug in pkg_bug_list: for bug in pkg_bug_list:
# check for Sync or sync and the package name # check for Sync or sync and the package name
if not bug.is_complete and 'ync %s' % srcpkg in bug.title: if not bug.is_complete and 'ync %s' % srcpkg in bug.title:
print ('The following bug could be a possible duplicate sync bug ' print('The following bug could be a possible duplicate sync bug '
'on Launchpad:\n' 'on Launchpad:\n'
' * %s (%s)\n' ' * %s (%s)\n'
'Please check the above URL to verify this before ' 'Please check the above URL to verify this before '
@ -127,7 +129,7 @@ def get_ubuntu_delta_changelog(srcpkg):
break break
try: try:
response, body = Http().request(changes_url) response, body = Http().request(changes_url)
except HttpLib2Error, e: except HttpLib2Error as e:
Logger.error(str(e)) Logger.error(str(e))
break break
if response.status != 200: if response.status != 200:
@ -156,7 +158,7 @@ def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
Use the LP API to file the sync request. Use the LP API to file the sync request.
''' '''
print ('The final report is:\nSummary: %s\nDescription:\n%s\n' print('The final report is:\nSummary: %s\nDescription:\n%s\n'
% (bugtitle, bugtext)) % (bugtitle, bugtext))
confirmation_prompt() confirmation_prompt()
@ -181,5 +183,5 @@ def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
bug.subscribe(person = PersonTeam(subscribe)()) bug.subscribe(person = PersonTeam(subscribe)())
print ('Sync request filed as bug #%i: %s' print('Sync request filed as bug #%i: %s'
% (bug.id, bug.web_link)) % (bug.id, bug.web_link))

View File

@ -20,6 +20,8 @@
# Please see the /usr/share/common-licenses/GPL-2 file for the full text # Please see the /usr/share/common-licenses/GPL-2 file for the full text
# of the GNU General Public License license. # of the GNU General Public License license.
from __future__ import print_function
import os import os
import re import re
import sys import sys
@ -27,6 +29,10 @@ import smtplib
import socket import socket
import tempfile import tempfile
if sys.version_info[0] >= 3:
basestring = str
unicode = str
from debian.changelog import Changelog, Version from debian.changelog import Changelog, Version
from distro_info import DebianDistroInfo, DistroDataOutdated from distro_info import DebianDistroInfo, DistroDataOutdated
@ -51,7 +57,7 @@ def _get_srcpkg(distro, name, release):
debian_info = DebianDistroInfo() debian_info = DebianDistroInfo()
try: try:
release = debian_info.codename(release, default=release) release = debian_info.codename(release, default=release)
except DistroDataOutdated, e: except DistroDataOutdated as e:
Logger.warn(e) Logger.warn(e)
lines = list(rmadison(distro, name, suite=release, arch='source')) lines = list(rmadison(distro, name, suite=release, arch='source'))
@ -86,7 +92,7 @@ def check_existing_reports(srcpkg):
''' '''
Point the user to the URL to manually check for duplicate bug reports. Point the user to the URL to manually check for duplicate bug reports.
''' '''
print ('Please check on ' print('Please check on '
'https://bugs.launchpad.net/ubuntu/+source/%s/+bugs\n' 'https://bugs.launchpad.net/ubuntu/+source/%s/+bugs\n'
'for duplicate sync requests before continuing.' % srcpkg) 'for duplicate sync requests before continuing.' % srcpkg)
confirmation_prompt() confirmation_prompt()
@ -164,7 +170,7 @@ Content-Type: text/plain; charset=UTF-8
%s''' % (myemailaddr, to, bugtitle, signed_report) %s''' % (myemailaddr, to, bugtitle, signed_report)
print 'The final report is:\n%s' % mail print('The final report is:\n%s' % mail)
confirmation_prompt() confirmation_prompt()
# save mail in temporary file # save mail in temporary file
@ -184,11 +190,11 @@ Content-Type: text/plain; charset=UTF-8
mailserver_port) mailserver_port)
s = smtplib.SMTP(mailserver_host, mailserver_port) s = smtplib.SMTP(mailserver_host, mailserver_port)
break break
except socket.error, s: except socket.error as s:
Logger.error('Could not connect to %s:%s: %s (%i)', Logger.error('Could not connect to %s:%s: %s (%i)',
mailserver_host, mailserver_port, s[1], s[0]) mailserver_host, mailserver_port, s[1], s[0])
return return
except smtplib.SMTPConnectError, s: except smtplib.SMTPConnectError as s:
Logger.error('Could not connect to %s:%s: %s (%i)', Logger.error('Could not connect to %s:%s: %s (%i)',
mailserver_host, mailserver_port, s[1], s[0]) mailserver_host, mailserver_port, s[1], s[0])
if s.smtp_code == 421: if s.smtp_code == 421:
@ -215,7 +221,7 @@ Content-Type: text/plain; charset=UTF-8
os.remove(backup.name) os.remove(backup.name)
Logger.normal('Sync request mailed.') Logger.normal('Sync request mailed.')
break break
except smtplib.SMTPRecipientsRefused, smtperror: except smtplib.SMTPRecipientsRefused as smtperror:
smtp_code, smtp_message = smtperror.recipients[to] smtp_code, smtp_message = smtperror.recipients[to]
Logger.error('Error while sending: %i, %s', smtp_code, smtp_message) Logger.error('Error while sending: %i, %s', smtp_code, smtp_message)
if smtp_code == 450: if smtp_code == 450:
@ -223,7 +229,7 @@ Content-Type: text/plain; charset=UTF-8
'[Enter] to retry. Press [Ctrl-C] to abort now.') '[Enter] to retry. Press [Ctrl-C] to abort now.')
else: else:
return return
except smtplib.SMTPResponseException, e: except smtplib.SMTPResponseException as e:
Logger.error('Error while sending: %i, %s', Logger.error('Error while sending: %i, %s',
e.smtp_code, e.smtp_error) e.smtp_code, e.smtp_error)
return return

View File

@ -17,7 +17,11 @@
import os import os
import re import re
import urllib try:
from urllib.parse import unquote
from urllib.request import urlretrieve
except ImportError:
from urllib import unquote, urlretrieve
import debian.debian_support import debian.debian_support
import distro_info import distro_info
@ -63,7 +67,7 @@ class BugTask(object):
source_files = self.get_source().sourceFileUrls() source_files = self.get_source().sourceFileUrls()
dsc_file = "" dsc_file = ""
for url in source_files: for url in source_files:
filename = urllib.unquote(os.path.basename(url)) filename = unquote(os.path.basename(url))
Logger.info("Downloading %s..." % (filename)) Logger.info("Downloading %s..." % (filename))
# HttpLib2 isn't suitable for large files (it reads into memory), # HttpLib2 isn't suitable for large files (it reads into memory),
# but we want its https certificate validation on the .dsc # but we want its https certificate validation on the .dsc
@ -75,7 +79,7 @@ class BugTask(object):
dsc_file = os.path.join(os.getcwd(), filename) dsc_file = os.path.join(os.getcwd(), filename)
else: else:
urllib.urlretrieve(url, filename) urlretrieve(url, filename)
assert os.path.isfile(dsc_file), "%s does not exist." % (dsc_file) assert os.path.isfile(dsc_file), "%s does not exist." % (dsc_file)
return dsc_file return dsc_file

View File

@ -21,6 +21,7 @@ import re
from ubuntutools import subprocess from ubuntutools import subprocess
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
from ubuntutools.sponsor_patch.question import ask_for_manual_fixing from ubuntutools.sponsor_patch.question import ask_for_manual_fixing
from functools import reduce
class Patch(object): class Patch(object):
"""This object represents a patch that can be downloaded from Launchpad.""" """This object represents a patch that can be downloaded from Launchpad."""

View File

@ -15,6 +15,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import sys import sys
from ubuntutools.question import Question, YesNoQuestion from ubuntutools.question import Question, YesNoQuestion
@ -43,5 +45,5 @@ def ask_for_manual_fixing():
def user_abort(): def user_abort():
"""Print abort and quit the program.""" """Print abort and quit the program."""
print "User abort." print("User abort.")
sys.exit(2) sys.exit(2)

View File

@ -15,6 +15,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import os import os
import re import re
import sys import sys
@ -301,7 +303,7 @@ class SourcePackage(object):
bug title.""" bug title."""
if not task.title_contains(self._version): if not task.title_contains(self._version):
print "Bug #%i title: %s" % (bug_number, task.get_bug_title()) print("Bug #%i title: %s" % (bug_number, task.get_bug_title()))
msg = "Is %s %s the version that should be synced" % (self._package, msg = "Is %s %s the version that should be synced" % (self._package,
self._version) self._version)
answer = YesNoQuestion().ask(msg, "no") answer = YesNoQuestion().ask(msg, "no")
@ -349,7 +351,7 @@ class SourcePackage(object):
assert os.path.isfile(self._changes_file), "%s does not exist." % \ assert os.path.isfile(self._changes_file), "%s does not exist." % \
(self._changes_file) (self._changes_file)
changes = debian.deb822.Changes(file(self._changes_file)) changes = debian.deb822.Changes(open(self._changes_file))
fixed_bugs = [] fixed_bugs = []
if "Launchpad-Bugs-Fixed" in changes: if "Launchpad-Bugs-Fixed" in changes:
fixed_bugs = changes["Launchpad-Bugs-Fixed"].split(" ") fixed_bugs = changes["Launchpad-Bugs-Fixed"].split(" ")
@ -370,16 +372,16 @@ class SourcePackage(object):
"""Print things that should be checked before uploading a package.""" """Print things that should be checked before uploading a package."""
lintian_filename = self._run_lintian() lintian_filename = self._run_lintian()
print "\nPlease check %s %s carefully:" % (self._package, self._version) print("\nPlease check %s %s carefully:" % (self._package, self._version))
if os.path.isfile(self._debdiff_filename): if os.path.isfile(self._debdiff_filename):
print "file://" + self._debdiff_filename print("file://" + self._debdiff_filename)
print "file://" + lintian_filename print("file://" + lintian_filename)
if self._build_log: if self._build_log:
print "file://" + self._build_log print("file://" + self._build_log)
harvest = Harvest(self._package) harvest = Harvest(self._package)
if harvest.data: if harvest.data:
print harvest.report() print(harvest.report())
def reload_changelog(self): def reload_changelog(self):
"""Reloads debian/changelog and updates the version. """Reloads debian/changelog and updates the version.
@ -391,9 +393,9 @@ class SourcePackage(object):
# Check the changelog # Check the changelog
self._changelog = debian.changelog.Changelog() self._changelog = debian.changelog.Changelog()
try: try:
self._changelog.parse_changelog(file("debian/changelog"), self._changelog.parse_changelog(open("debian/changelog"),
max_blocks=1, strict=True) max_blocks=1, strict=True)
except debian.changelog.ChangelogParseError, error: except debian.changelog.ChangelogParseError as error:
Logger.error("The changelog entry doesn't validate: %s", str(error)) Logger.error("The changelog entry doesn't validate: %s", str(error))
ask_for_manual_fixing() ask_for_manual_fixing()
return False return False

View File

@ -15,11 +15,16 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
import os import os
import pwd import pwd
import shutil import shutil
import sys import sys
if sys.version_info[0] < 3:
range = xrange
from distro_info import UbuntuDistroInfo from distro_info import UbuntuDistroInfo
from launchpadlib.launchpad import Launchpad from launchpadlib.launchpad import Launchpad
@ -81,11 +86,11 @@ def edit_source():
# Spawn shell to allow modifications # Spawn shell to allow modifications
cmd = [get_user_shell()] cmd = [get_user_shell()]
Logger.command(cmd) Logger.command(cmd)
print """An interactive shell was launched in print("""An interactive shell was launched in
file://%s file://%s
Edit your files. When you are done, exit the shell. If you wish to abort the Edit your files. When you are done, exit the shell. If you wish to abort the
process, exit the shell such that it returns an exit code other than zero. process, exit the shell such that it returns an exit code other than zero.
""" % (os.getcwd()), """ % (os.getcwd()), end=' ')
returncode = subprocess.call(cmd) returncode = subprocess.call(cmd)
if returncode != 0: if returncode != 0:
Logger.error("Shell exited with exit value %i." % (returncode)) Logger.error("Shell exited with exit value %i." % (returncode))
@ -113,10 +118,10 @@ def ask_for_patch_or_branch(bug, attached_patches, linked_branches):
i = 0 i = 0
for linked_branch in linked_branches: for linked_branch in linked_branches:
i += 1 i += 1
print "%i) %s" % (i, linked_branch.display_name) print("%i) %s" % (i, linked_branch.display_name))
for attached_patch in attached_patches: for attached_patch in attached_patches:
i += 1 i += 1
print "%i) %s" % (i, attached_patch.title) print("%i) %s" % (i, attached_patch.title))
selected = input_number("Which branch or patch do you want to download", selected = input_number("Which branch or patch do you want to download",
1, i, i) 1, i, i)
if selected <= len(linked_branches): if selected <= len(linked_branches):
@ -220,9 +225,9 @@ def get_open_ubuntu_bug_task(launchpad, bug, branch=None):
else: else:
Logger.normal("https://launchpad.net/bugs/%i has %i Ubuntu tasks:" \ Logger.normal("https://launchpad.net/bugs/%i has %i Ubuntu tasks:" \
% (bug_id, len(ubuntu_tasks))) % (bug_id, len(ubuntu_tasks)))
for i in xrange(len(ubuntu_tasks)): for i in range(len(ubuntu_tasks)):
print "%i) %s" % (i + 1, print("%i) %s" % (i + 1,
ubuntu_tasks[i].get_package_and_series()) ubuntu_tasks[i].get_package_and_series()))
selected = input_number("To which Ubuntu task does the patch belong", selected = input_number("To which Ubuntu task does the patch belong",
1, len(ubuntu_tasks)) 1, len(ubuntu_tasks))
task = ubuntu_tasks[selected - 1] task = ubuntu_tasks[selected - 1]
@ -235,7 +240,7 @@ def _create_and_change_into(workdir):
if not os.path.isdir(workdir): if not os.path.isdir(workdir):
try: try:
os.makedirs(workdir) os.makedirs(workdir)
except os.error, error: except os.error as error:
Logger.error("Failed to create the working directory %s [Errno " \ Logger.error("Failed to create the working directory %s [Errno " \
"%i]: %s." % (workdir, error.errno, error.strerror)) "%i]: %s." % (workdir, error.errno, error.strerror))
sys.exit(1) sys.exit(1)
@ -248,7 +253,7 @@ def _update_maintainer_field():
Logger.command(["update-maintainer"]) Logger.command(["update-maintainer"])
try: try:
update_maintainer("debian", Logger.verbose) update_maintainer("debian", Logger.verbose)
except MaintainerUpdateException, e: except MaintainerUpdateException as e:
Logger.error("update-maintainer failed: %s", str(e)) Logger.error("update-maintainer failed: %s", str(e))
sys.exit(1) sys.exit(1)

View File

@ -14,19 +14,30 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR # OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE. # PERFORMANCE OF THIS SOFTWARE.
from __future__ import with_statement
import __builtin__ try:
import builtins
except ImportError:
import __builtin__
import os.path import os.path
import shutil import shutil
import StringIO try:
from StringIO import StringIO
except:
from io import StringIO
from io import BytesIO
import tempfile import tempfile
import types import types
import urllib2 try:
from urllib.request import OpenerDirector, urlopen
from urllib.error import HTTPError, URLError
except ImportError:
from urllib2 import OpenerDirector, urlopen
from urllib2 import HTTPError, URLError
import debian.deb822 import debian.deb822
import httplib2 import httplib2
import mox import sys
import mock
import ubuntutools.archive import ubuntutools.archive
from ubuntutools.config import UDTConfig from ubuntutools.config import UDTConfig
@ -44,15 +55,11 @@ def setUpModule():
ex_pkg.cleanup() ex_pkg.cleanup()
class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase): class DscVerificationTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
super(DscVerificationTestCase, self).setUp()
with open('test-data/example_1.0-1.dsc', 'rb') as f: with open('test-data/example_1.0-1.dsc', 'rb') as f:
self.dsc = ubuntutools.archive.Dsc(f.read()) self.dsc = ubuntutools.archive.Dsc(f.read())
def tearDown(self):
super(DscVerificationTestCase, self).tearDown()
def test_good(self): def test_good(self):
self.assertTrue(self.dsc.verify_file( self.assertTrue(self.dsc.verify_file(
'test-data/example_1.0.orig.tar.gz')) 'test-data/example_1.0.orig.tar.gz'))
@ -67,10 +74,18 @@ class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase):
fn = 'test-data/example_1.0.orig.tar.gz' fn = 'test-data/example_1.0.orig.tar.gz'
with open(fn, 'rb') as f: with open(fn, 'rb') as f:
data = f.read() data = f.read()
data = data[:-1] + chr(ord(data[-1]) ^ 8) if sys.version_info[0] >= 3:
self.mox.StubOutWithMock(__builtin__, 'open') last_byte = chr(data[-1] ^ 8).encode()
open(fn, 'rb').AndReturn(StringIO.StringIO(data)) else:
self.mox.ReplayAll() last_byte = chr(ord(data[-1]) ^ 8)
data = data[:-1] + last_byte
m = mock.MagicMock(name='open', spec=open)
m.return_value = BytesIO(data)
if sys.version_info[0] >= 3:
target = 'builtins.open'
else:
target = '__builtin__.open'
with mock.patch(target, m):
self.assertFalse(self.dsc.verify_file(fn)) self.assertFalse(self.dsc.verify_file(fn))
def test_sha1(self): def test_sha1(self):
@ -85,26 +100,31 @@ class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase):
self.test_bad() self.test_bad()
class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase): class LocalSourcePackageTestCase(unittest.TestCase):
SourcePackage = ubuntutools.archive.UbuntuSourcePackage SourcePackage = ubuntutools.archive.UbuntuSourcePackage
def setUp(self): def setUp(self):
super(LocalSourcePackageTestCase, self).setUp()
self.workdir = tempfile.mkdtemp(prefix='udt-test') self.workdir = tempfile.mkdtemp(prefix='udt-test')
self.mox.StubOutWithMock(ubuntutools.archive, 'Distribution') self._stubout('ubuntutools.archive.Distribution')
self.mox.StubOutWithMock(ubuntutools.archive, 'rmadison') self._stubout('ubuntutools.archive.rmadison')
self.real_http = httplib2.Http() self.mock_http = self._stubout('httplib2.Http.request')
self.mox.StubOutWithMock(httplib2, 'Http') self.mock_http.side_effect = self.request_proxy
self.mock_http = self.mox.CreateMock(httplib2.Http)
self.url_opener = mock.MagicMock(spec=OpenerDirector)
self.url_opener.open.side_effect = self.urlopen_proxy
# Silence the tests a little: # Silence the tests a little:
self.mox.stubs.Set(Logger, 'stdout', StringIO.StringIO()) self._stubout('ubuntutools.logger.Logger.stdout')
self.mox.stubs.Set(Logger, 'stderr', StringIO.StringIO()) self._stubout('ubuntutools.logger.Logger.stderr')
def _stubout(self, stub):
patcher = mock.patch(stub)
self.addCleanup(patcher.stop)
return patcher.start()
def tearDown(self): def tearDown(self):
super(LocalSourcePackageTestCase, self).tearDown()
shutil.rmtree(self.workdir) shutil.rmtree(self.workdir)
def urlopen_proxy(self, url, destname=None): def urlopen_proxy(self, url, destname=None):
@ -112,7 +132,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
if destname is None: if destname is None:
destname = os.path.basename(url) destname = os.path.basename(url)
destpath = os.path.join(os.path.abspath('test-data'), destname) destpath = os.path.join(os.path.abspath('test-data'), destname)
return urllib2.urlopen('file://' + destpath) return urlopen('file://' + destpath)
def urlopen_file(self, filename): def urlopen_file(self, filename):
"Wrapper for urlopen_proxy for named files" "Wrapper for urlopen_proxy for named files"
@ -120,11 +140,11 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
def urlopen_null(self, url): def urlopen_null(self, url):
"urlopen for zero length files" "urlopen for zero length files"
return StringIO.StringIO('') return BytesIO(b'')
def urlopen_404(self, url): def urlopen_404(self, url):
"urlopen for errors" "urlopen for errors"
raise urllib2.HTTPError(url, 404, "Not Found", {}, None) raise HTTPError(url, 404, "Not Found", {}, None)
def request_proxy(self, url, destname=None): def request_proxy(self, url, destname=None):
"httplib2 proxy for grabbing the file from test-data" "httplib2 proxy for grabbing the file from test-data"
@ -132,7 +152,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
destname = os.path.basename(url) destname = os.path.basename(url)
destpath = os.path.join(os.path.abspath('test-data'), destname) destpath = os.path.join(os.path.abspath('test-data'), destname)
response = httplib2.Response({}) response = httplib2.Response({})
with open(destpath, 'r') as f: with open(destpath, 'rb') as f:
body = f.read() body = f.read()
return response, body return response, body
@ -141,10 +161,17 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
response = httplib2.Response({'status': 404}) response = httplib2.Response({'status': 404})
return response, "I'm a 404 Error" return response, "I'm a 404 Error"
def request_404_then_proxy(self, url, destname=None):
"mock side_effect callable to chain request 404 & proxy"
if self.mock_http.called:
return self.request_proxy(url, destname)
return self.request_404(url)
def test_local_copy(self): def test_local_copy(self):
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc', dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir) workdir=self.workdir)
pkg.quiet = True
pkg.pull() pkg.pull()
pkg.unpack() pkg.unpack()
@ -156,6 +183,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
pkg = self.SourcePackage(dscfile=os.path.join(self.workdir, pkg = self.SourcePackage(dscfile=os.path.join(self.workdir,
'example_1.0-1.dsc'), 'example_1.0-1.dsc'),
workdir=self.workdir) workdir=self.workdir)
pkg.quiet = True
pkg.pull() pkg.pull()
pkg.unpack() pkg.unpack()
@ -168,6 +196,7 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
dscfile=os.path.join(self.workdir, dscfile=os.path.join(self.workdir,
'example_1.0-1.dsc'), 'example_1.0-1.dsc'),
workdir=self.workdir) workdir=self.workdir)
pkg.quiet = True
pkg.pull() pkg.pull()
pkg.unpack() pkg.unpack()
@ -177,31 +206,24 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
shutil.copy2('test-data/example_1.0-1.debian.tar.xz', self.workdir) shutil.copy2('test-data/example_1.0-1.debian.tar.xz', self.workdir)
with open(os.path.join(self.workdir, 'example_1.0-1.debian.tar.xz'), with open(os.path.join(self.workdir, 'example_1.0-1.debian.tar.xz'),
'r+b') as f: 'r+b') as f:
f.write('CORRUPTION') f.write(b'CORRUPTION')
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc', dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir) workdir=self.workdir)
pkg.quiet = True
pkg.pull() pkg.pull()
def test_pull(self): def test_pull(self):
dist = self.SourcePackage.distribution dist = self.SourcePackage.distribution
mirror = UDTConfig.defaults['%s_MIRROR' % dist.upper()] mirror = UDTConfig.defaults['%s_MIRROR' % dist.upper()]
urlbase = '/pool/main/e/example/' urlbase = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request('https://launchpad.net/%s/+archive/primary/'
'+files/example_1.0-1.dsc' % dist
).WithSideEffects(self.request_proxy)
url_opener = self.mox.CreateMock(urllib2.OpenerDirector)
url_opener.open(mirror + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
url_opener.open(mirror + urlbase + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir) workdir=self.workdir)
pkg.url_opener = url_opener
pkg.url_opener = self.url_opener
pkg.quiet = True
pkg.pull() pkg.pull()
def test_mirrors(self): def test_mirrors(self):
@ -209,34 +231,24 @@ class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
mirror = 'http://mirror' mirror = 'http://mirror'
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/' lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/'
urlbase = '/pool/main/e/example/' urlbase = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http) sequence = [self.urlopen_null, self.urlopen_404, self.urlopen_proxy,
self.mock_http.request(lpbase + 'example_1.0-1.dsc' self.urlopen_proxy]
).WithSideEffects(self.request_proxy) def _callable_iter(*args, **kwargs):
url_opener = self.mox.CreateMock(urllib2.OpenerDirector) return sequence.pop(0)(*args, **kwargs)
url_opener.open(mirror + urlbase + 'example_1.0.orig.tar.gz' url_opener = mock.MagicMock(spec=OpenerDirector)
).WithSideEffects(self.urlopen_null) url_opener.open.side_effect = _callable_iter
url_opener.open(master + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
url_opener.open(lpbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
url_opener.open(mirror + urlbase + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror]) workdir=self.workdir, mirrors=[mirror])
pkg.url_opener = url_opener pkg.url_opener = url_opener
pkg.quiet = True
pkg.pull() pkg.pull()
def test_dsc_missing(self): def test_dsc_missing(self):
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/' self.mock_http.side_effect = self.request_404
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_404)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir) workdir=self.workdir)
pkg.quiet = True
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull) self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
@ -251,35 +263,24 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/' lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/' base = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http) sequence = [self.urlopen_null,
self.mock_http.request(lpbase + 'example_1.0-1.dsc' self.urlopen_404,
).WithSideEffects(self.request_proxy) self.urlopen_404,
url_opener = self.mox.CreateMock(urllib2.OpenerDirector) self.urlopen_404,
url_opener.open(debian_mirror + base + 'example_1.0.orig.tar.gz' self.urlopen_404,
).WithSideEffects(self.urlopen_null) lambda x: BytesIO(
url_opener.open(debsec_mirror + base + 'example_1.0.orig.tar.gz' b'{"fileinfo": {"hashabc": [{"name": "example_1.0.orig.tar.gz"}]}}'),
).WithSideEffects(self.urlopen_404) self.urlopen_file('example_1.0.orig.tar.gz'),
url_opener.open(debian_master + base + 'example_1.0.orig.tar.gz' self.urlopen_proxy]
).WithSideEffects(self.urlopen_404) def _callable_iter(*args, **kwargs):
url_opener.open(debsec_master + base + 'example_1.0.orig.tar.gz' return sequence.pop(0)(*args, **kwargs)
).WithSideEffects(self.urlopen_404) url_opener = mock.MagicMock(spec=OpenerDirector)
url_opener.open(lpbase + 'example_1.0.orig.tar.gz' url_opener.open.side_effect = _callable_iter
).WithSideEffects(self.urlopen_404)
url_opener.open('http://snapshot.debian.org/mr/package/example/1.0-1/'
'srcfiles?fileinfo=1'
).WithSideEffects(lambda x: StringIO.StringIO(
'{"fileinfo": {"hashabc": [{"name": "example_1.0.orig.tar.gz"}]}}'
))
url_opener.open('http://snapshot.debian.org/file/hashabc'
).WithSideEffects(self.urlopen_file(
'example_1.0.orig.tar.gz'))
url_opener.open(debian_mirror + base + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[debian_mirror, workdir=self.workdir, mirrors=[debian_mirror,
debsec_mirror]) debsec_mirror])
pkg.quiet = True
pkg.url_opener = url_opener pkg.url_opener = url_opener
pkg.pull() pkg.pull()
pkg.unpack() pkg.unpack()
@ -288,61 +289,35 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
mirror = 'http://mirror' mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/' lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/' base = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http) self.mock_http.side_effect = self.request_404_then_proxy
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_404)
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(mirror + base + 'example_1.0-1.dsc'
).WithSideEffects(self.request_proxy)
url_opener = self.mox.CreateMock(urllib2.OpenerDirector)
url_opener.open(mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
url_opener.open(mirror + base + 'example_1.0-1.debian.tar.xz'
).WithSideEffects(self.urlopen_proxy)
def fake_gpg_info(self, message, keyrings=None): patcher = mock.patch.object(debian.deb822.GpgInfo, 'from_sequence')
return debian.deb822.GpgInfo.from_output( self.addCleanup(patcher.stop)
mock_gpg_info = patcher.start()
mock_gpg_info.return_value = debian.deb822.GpgInfo.from_output(
'[GNUPG:] GOODSIG DEADBEEF Joe Developer ' '[GNUPG:] GOODSIG DEADBEEF Joe Developer '
'<joe@example.net>') '<joe@example.net>')
# We have to stub this out without mox because there some versions of
# python-debian will pass keyrings=None, others won't.
# http://code.google.com/p/pymox/issues/detail?id=37
self.mox.stubs.Set(debian.deb822.GpgInfo, 'from_sequence',
types.MethodType(fake_gpg_info,
debian.deb822.GpgInfo,
debian.deb822.GpgInfo))
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror]) workdir=self.workdir, mirrors=[mirror])
pkg.url_opener = url_opener pkg.url_opener = self.url_opener
pkg.pull() pkg.pull()
def test_dsc_badsig(self): def test_dsc_badsig(self):
mirror = 'http://mirror' mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/' lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/' base = '/pool/main/e/example/'
httplib2.Http().AndReturn(self.mock_http) self.mock_http.side_effect = self.request_404_then_proxy
self.mock_http.request(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.request_404)
httplib2.Http().AndReturn(self.mock_http)
self.mock_http.request(mirror + base + 'example_1.0-1.dsc'
).WithSideEffects(self.request_proxy)
def fake_gpg_info(self, message, keyrings=None): patcher = mock.patch.object(debian.deb822.GpgInfo, 'from_sequence')
return debian.deb822.GpgInfo.from_output( self.addCleanup(patcher.stop)
mock_gpg_info = patcher.start()
mock_gpg_info.return_value = debian.deb822.GpgInfo.from_output(
'[GNUPG:] ERRSIG DEADBEEF') '[GNUPG:] ERRSIG DEADBEEF')
# We have to stub this out without mox because there some versions of
# python-debian will pass keyrings=None, others won't.
# http://code.google.com/p/pymox/issues/detail?id=37
self.mox.stubs.Set(debian.deb822.GpgInfo, 'from_sequence',
types.MethodType(fake_gpg_info,
debian.deb822.GpgInfo,
debian.deb822.GpgInfo))
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main', pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror]) workdir=self.workdir, mirrors=[mirror])
try:
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull) self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
except URLError:
raise unittest.SkipTest('Test needs addr resolution to work')

View File

@ -15,19 +15,25 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR # OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE. # PERFORMANCE OF THIS SOFTWARE.
import __builtin__ try:
import builtins
except ImportError:
import __builtin__
import os import os
import sys import sys
import locale import locale
from StringIO import StringIO try:
from StringIO import StringIO
except:
from io import StringIO
import mox import mock
from ubuntutools.config import UDTConfig, ubu_email from ubuntutools.config import UDTConfig, ubu_email
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
from ubuntutools.test import unittest from ubuntutools.test import unittest
class ConfigTestCase(mox.MoxTestBase, unittest.TestCase): class ConfigTestCase(unittest.TestCase):
_config_files = { _config_files = {
'system': '', 'system': '',
'user': '', 'user': '',
@ -46,7 +52,18 @@ class ConfigTestCase(mox.MoxTestBase, unittest.TestCase):
def setUp(self): def setUp(self):
super(ConfigTestCase, self).setUp() super(ConfigTestCase, self).setUp()
self.mox.stubs.Set(__builtin__, 'open', self._fake_open) if sys.version_info[0] < 3:
self.assertRegex = self.assertRegexpMatches
m = mock.mock_open()
m.side_effect = self._fake_open
if sys.version_info[0] >= 3:
target = 'builtins.open'
else:
target = '__builtin__.open'
patcher = mock.patch(target, m)
self.addCleanup(patcher.stop)
patcher.start()
Logger.stdout = StringIO() Logger.stdout = StringIO()
Logger.stderr = StringIO() Logger.stderr = StringIO()
@ -63,7 +80,7 @@ class ConfigTestCase(mox.MoxTestBase, unittest.TestCase):
def clean_environment(self): def clean_environment(self):
self._config_files['system'] = '' self._config_files['system'] = ''
self._config_files['user'] = '' self._config_files['user'] = ''
for k in os.environ.keys(): for k in list(os.environ.keys()):
if k.startswith(('UBUNTUTOOLS_', 'TEST_')): if k.startswith(('UBUNTUTOOLS_', 'TEST_')):
del os.environ[k] del os.environ[k]
@ -97,7 +114,7 @@ REPEAT=yes
errs = Logger.stderr.getvalue().strip() errs = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO() Logger.stderr = StringIO()
self.assertEqual(len(errs.splitlines()), 1) self.assertEqual(len(errs.splitlines()), 1)
self.assertRegexpMatches(errs, self.assertRegex(errs,
r'Warning: Cannot parse.*\bCOMMAND_EXECUTION=a') r'Warning: Cannot parse.*\bCOMMAND_EXECUTION=a')
def get_value(self, *args, **kwargs): def get_value(self, *args, **kwargs):
@ -137,7 +154,7 @@ REPEAT=yes
errs = Logger.stderr.getvalue().strip() errs = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO() Logger.stderr = StringIO()
self.assertEqual(len(errs.splitlines()), 1) self.assertEqual(len(errs.splitlines()), 1)
self.assertRegexpMatches(errs, self.assertRegex(errs,
r'deprecated.*\bCOMPATFOOBAR\b.*\bTEST_QUX\b') r'deprecated.*\bCOMPATFOOBAR\b.*\bTEST_QUX\b')
def test_boolean(self): def test_boolean(self):
@ -150,7 +167,7 @@ REPEAT=yes
def test_nonpackagewide(self): def test_nonpackagewide(self):
self._config_files['user'] = 'UBUNTUTOOLS_FOOBAR=a' self._config_files['user'] = 'UBUNTUTOOLS_FOOBAR=a'
self.assertEquals(self.get_value('FOOBAR'), None) self.assertEqual(self.get_value('FOOBAR'), None)
class UbuEmailTestCase(unittest.TestCase): class UbuEmailTestCase(unittest.TestCase):
@ -217,7 +234,11 @@ class UbuEmailTestCase(unittest.TestCase):
encoding = locale.getdefaultlocale()[1] encoding = locale.getdefaultlocale()[1]
if not encoding: if not encoding:
encoding = 'utf-8' encoding = 'utf-8'
name = 'Jöe Déveloper'
env_name = name
if isinstance(name, bytes):
name = 'Jöe Déveloper'.decode('utf-8') name = 'Jöe Déveloper'.decode('utf-8')
os.environ['DEBFULLNAME'] = name.encode(encoding) env_name = name.encode(encoding)
os.environ['DEBFULLNAME'] = env_name
os.environ['DEBEMAIL'] = email = 'joe@example.net' os.environ['DEBEMAIL'] = email = 'joe@example.net'
self.assertEqual(ubu_email(), (name, email)) self.assertEqual(ubu_email(), (name, email))

View File

@ -45,6 +45,7 @@ class HelpTestCase(unittest.TestCase):
null = open('/dev/null', 'r') null = open('/dev/null', 'r')
process = subprocess.Popen(['./' + script, '--help'], process = subprocess.Popen(['./' + script, '--help'],
close_fds=True, stdin=null, close_fds=True, stdin=null,
universal_newlines=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
started = time.time() started = time.time()
@ -57,7 +58,10 @@ class HelpTestCase(unittest.TestCase):
while time.time() - started < TIMEOUT: while time.time() - started < TIMEOUT:
for fd in select.select(fds, [], fds, TIMEOUT)[0]: for fd in select.select(fds, [], fds, TIMEOUT)[0]:
out.append(os.read(fd, 1024)) output = os.read(fd, 1024)
if not isinstance(output, str):
output = output.decode('utf-8')
out.append(output)
if process.poll() is not None: if process.poll() is not None:
break break

View File

@ -14,7 +14,10 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR # OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE. # PERFORMANCE OF THIS SOFTWARE.
import StringIO try:
from StringIO import StringIO
except:
from io import StringIO
import sys import sys
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
@ -23,8 +26,8 @@ from ubuntutools.test import unittest
class LoggerTestCase(unittest.TestCase): class LoggerTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
Logger.stdout = StringIO.StringIO() Logger.stdout = StringIO()
Logger.stderr = StringIO.StringIO() Logger.stderr = StringIO()
self._script_name = Logger.script_name self._script_name = Logger.script_name
Logger.script_name = 'test' Logger.script_name = 'test'
self._verbose = Logger.verbose self._verbose = Logger.verbose

View File

@ -25,9 +25,6 @@ WHITELIST = [re.compile(': %s$' % x) for x in (
r"No name '\w+Error' in module 'launchpadlib\.errors'", r"No name '\w+Error' in module 'launchpadlib\.errors'",
# http://www.logilab.org/ticket/51250: # http://www.logilab.org/ticket/51250:
r"Module 'hashlib' has no '(md5|sha(1|224|256|384|512))' member", r"Module 'hashlib' has no '(md5|sha(1|224|256|384|512))' member",
# mox:
r"Instance of '.+' has no '(WithSideEffects|MultipleTimes|AndReturn)' "
r"member",
# pylint doesn't like *args/**kwargs # pylint doesn't like *args/**kwargs
r"Instance of 'Popen' has no '.*' member", r"Instance of 'Popen' has no '.*' member",
)] )]

View File

@ -16,12 +16,19 @@
"""Test suite for ubuntutools.update_maintainer""" """Test suite for ubuntutools.update_maintainer"""
import __builtin__ try:
import builtins
except ImportError:
import __builtin__
try:
from StringIO import StringIO
except:
from io import StringIO
import os import os
import StringIO
import sys import sys
import mox import mock
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
from ubuntutools.test import unittest from ubuntutools.test import unittest
@ -186,7 +193,7 @@ Package: seahorse-plugins
""" """
#pylint: disable=R0904 #pylint: disable=R0904
class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase): class UpdateMaintainerTestCase(unittest.TestCase):
"""TestCase object for ubuntutools.update_maintainer""" """TestCase object for ubuntutools.update_maintainer"""
_directory = "/" _directory = "/"
@ -210,18 +217,30 @@ class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
(mode == "r" and self._files[base] is None)): (mode == "r" and self._files[base] is None)):
raise IOError("No such file or directory: '%s'" % filename) raise IOError("No such file or directory: '%s'" % filename)
if mode == "w": if mode == "w":
self._files[base] = StringIO.StringIO() self._files[base] = StringIO()
self._files[base].close = lambda: None self._files[base].close = lambda: None
return self._files[base] return self._files[base]
#pylint: disable=C0103 #pylint: disable=C0103
def setUp(self): def setUp(self):
super(UpdateMaintainerTestCase, self).setUp() if sys.version_info[0] < 3:
self.mox.stubs.Set(__builtin__, 'open', self._fake_open) self.assertRegex = self.assertRegexpMatches
self.mox.stubs.Set(os.path, 'isfile', self._fake_isfile) m = mock.mock_open()
self._files["rules"] = StringIO.StringIO(_SIMPLE_RULES) m.side_effect = self._fake_open
Logger.stdout = StringIO.StringIO() if sys.version_info[0] >= 3:
Logger.stderr = StringIO.StringIO() target = 'builtins.open'
else:
target = '__builtin__.open'
patcher = mock.patch(target, m)
self.addCleanup(patcher.stop)
patcher.start()
m = mock.MagicMock(side_effect=self._fake_isfile)
patcher = mock.patch('os.path.isfile', m)
self.addCleanup(patcher.stop)
patcher.start()
self._files["rules"] = StringIO(_SIMPLE_RULES)
Logger.stdout = StringIO()
Logger.stderr = StringIO()
def tearDown(self): def tearDown(self):
self.assertEqual(Logger.stdout.getvalue(), '') self.assertEqual(Logger.stdout.getvalue(), '')
@ -236,8 +255,8 @@ class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
#pylint: enable=C0103 #pylint: enable=C0103
def test_debian_package(self): def test_debian_package(self):
"""Test: Don't update Maintainer field if target is Debian.""" """Test: Don't update Maintainer field if target is Debian."""
self._files["changelog"] = StringIO.StringIO(_UNSTABLE_CHANGELOG) self._files["changelog"] = StringIO(_UNSTABLE_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL) self._files["control"] = StringIO(_ABP_CONTROL)
update_maintainer(self._directory) update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_CONTROL) self.assertEqual(self._files["control"].getvalue(), _ABP_CONTROL)
@ -246,52 +265,52 @@ class UpdateMaintainerTestCase(mox.MoxTestBase, unittest.TestCase):
The Maintainer field needs to be update even if The Maintainer field needs to be update even if
XSBC-Original-Maintainer has an @ubuntu.com address.""" XSBC-Original-Maintainer has an @ubuntu.com address."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG) self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_AXIS2C_CONTROL) self._files["control"] = StringIO(_AXIS2C_CONTROL)
update_maintainer(self._directory) update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _AXIS2C_UPDATED) self.assertEqual(self._files["control"].getvalue(), _AXIS2C_UPDATED)
warnings = Logger.stderr.getvalue().strip() warnings = Logger.stderr.getvalue().strip()
Logger.stderr = StringIO.StringIO() Logger.stderr = StringIO()
self.assertEqual(len(warnings.splitlines()), 1) self.assertEqual(len(warnings.splitlines()), 1)
self.assertRegexpMatches(warnings, "Warning: Overwriting original " self.assertRegex(warnings, "Warning: Overwriting original "
"maintainer: Soren Hansen " "maintainer: Soren Hansen "
"<soren@ubuntu.com>") "<soren@ubuntu.com>")
def test_update_maintainer(self): def test_update_maintainer(self):
"""Test: Update Maintainer field.""" """Test: Update Maintainer field."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG) self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL) self._files["control"] = StringIO(_ABP_CONTROL)
update_maintainer(self._directory) update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_UPDATED) self.assertEqual(self._files["control"].getvalue(), _ABP_UPDATED)
def test_update_old_maintainer(self): def test_update_old_maintainer(self):
"""Test: Update old MOTU address.""" """Test: Update old MOTU address."""
self._files["changelog"] = StringIO.StringIO(_UNSTABLE_CHANGELOG) self._files["changelog"] = StringIO(_UNSTABLE_CHANGELOG)
self._files["control.in"] = StringIO.StringIO(_ABP_OLD_MAINTAINER) self._files["control.in"] = StringIO(_ABP_OLD_MAINTAINER)
update_maintainer(self._directory, True) update_maintainer(self._directory, True)
self.assertEqual(self._files["control.in"].getvalue(), _ABP_UPDATED) self.assertEqual(self._files["control.in"].getvalue(), _ABP_UPDATED)
def test_comments_in_control(self): def test_comments_in_control(self):
"""Test: Update Maintainer field in a control file containing """Test: Update Maintainer field in a control file containing
comments.""" comments."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG) self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_SEAHORSE_PLUGINS_CONTROL) self._files["control"] = StringIO(_SEAHORSE_PLUGINS_CONTROL)
update_maintainer(self._directory) update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), self.assertEqual(self._files["control"].getvalue(),
_SEAHORSE_PLUGINS_UPDATED) _SEAHORSE_PLUGINS_UPDATED)
def test_skip_smart_rules(self): def test_skip_smart_rules(self):
"""Test: Skip update when XSBC-Original in debian/rules.""" """Test: Skip update when XSBC-Original in debian/rules."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG) self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL) self._files["control"] = StringIO(_ABP_CONTROL)
self._files["rules"] = StringIO.StringIO(_COMPLEX_RULES) self._files["rules"] = StringIO(_COMPLEX_RULES)
update_maintainer(self._directory) update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_CONTROL) self.assertEqual(self._files["control"].getvalue(), _ABP_CONTROL)
def test_missing_rules(self): def test_missing_rules(self):
"""Test: Skip XSBC-Original test when debian/rules is missing.""" """Test: Skip XSBC-Original test when debian/rules is missing."""
self._files["changelog"] = StringIO.StringIO(_LUCID_CHANGELOG) self._files["changelog"] = StringIO(_LUCID_CHANGELOG)
self._files["control"] = StringIO.StringIO(_ABP_CONTROL) self._files["control"] = StringIO(_ABP_CONTROL)
self._files["rules"] = None self._files["rules"] = None
update_maintainer(self._directory) update_maintainer(self._directory)
self.assertEqual(self._files["control"].getvalue(), _ABP_UPDATED) self.assertEqual(self._files["control"].getvalue(), _ABP_UPDATED)

View File

@ -14,6 +14,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import print_function
"""This module is for updating the Maintainer field of an Ubuntu package.""" """This module is for updating the Maintainer field of an Ubuntu package."""
import os import os
@ -125,7 +127,7 @@ def _find_files(debian_directory, verbose):
if os.path.isfile(rules_file) and \ if os.path.isfile(rules_file) and \
'XSBC-Original-' in open(rules_file).read(): 'XSBC-Original-' in open(rules_file).read():
if verbose: if verbose:
print "XSBC-Original is managed by 'rules' file. Doing nothing." print("XSBC-Original is managed by 'rules' file. Doing nothing.")
control_files = [] control_files = []
return (changelog_file, control_files) return (changelog_file, control_files)
@ -144,7 +146,7 @@ def update_maintainer(debian_directory, verbose=False):
""" """
try: try:
changelog_file, control_files = _find_files(debian_directory, verbose) changelog_file, control_files = _find_files(debian_directory, verbose)
except MaintainerUpdateException, e: except MaintainerUpdateException as e:
Logger.error(str(e)) Logger.error(str(e))
raise raise
@ -159,8 +161,8 @@ def update_maintainer(debian_directory, verbose=False):
if original_maintainer.strip().lower() in _PREVIOUS_UBUNTU_MAINTAINER: if original_maintainer.strip().lower() in _PREVIOUS_UBUNTU_MAINTAINER:
if verbose: if verbose:
print "The old maintainer was: %s" % original_maintainer print("The old maintainer was: %s" % original_maintainer)
print "Resetting as: %s" % _UBUNTU_MAINTAINER print("Resetting as: %s" % _UBUNTU_MAINTAINER)
control.set_maintainer(_UBUNTU_MAINTAINER) control.set_maintainer(_UBUNTU_MAINTAINER)
control.save() control.save()
continue continue
@ -173,7 +175,7 @@ def update_maintainer(debian_directory, verbose=False):
if distribution in ("stable", "testing", "unstable", "experimental"): if distribution in ("stable", "testing", "unstable", "experimental"):
if verbose: if verbose:
print "The package targets Debian. Doing nothing." print("The package targets Debian. Doing nothing.")
return return
if control.get_original_maintainer() is not None: if control.get_original_maintainer() is not None:
@ -181,8 +183,8 @@ def update_maintainer(debian_directory, verbose=False):
control.get_original_maintainer()) control.get_original_maintainer())
if verbose: if verbose:
print "The original maintainer is: %s" % original_maintainer print("The original maintainer is: %s" % original_maintainer)
print "Resetting as: %s" % _UBUNTU_MAINTAINER print("Resetting as: %s" % _UBUNTU_MAINTAINER)
control.set_original_maintainer(original_maintainer) control.set_original_maintainer(original_maintainer)
control.set_maintainer(_UBUNTU_MAINTAINER) control.set_maintainer(_UBUNTU_MAINTAINER)
control.save() control.save()
@ -194,7 +196,7 @@ def restore_maintainer(debian_directory, verbose=False):
"""Restore the original maintainer""" """Restore the original maintainer"""
try: try:
changelog_file, control_files = _find_files(debian_directory, verbose) changelog_file, control_files = _find_files(debian_directory, verbose)
except MaintainerUpdateException, e: except MaintainerUpdateException as e:
Logger.error(str(e)) Logger.error(str(e))
raise raise
@ -204,7 +206,7 @@ def restore_maintainer(debian_directory, verbose=False):
if not orig_maintainer: if not orig_maintainer:
continue continue
if verbose: if verbose:
print "Restoring original maintainer: %s" % orig_maintainer print("Restoring original maintainer: %s" % orig_maintainer)
control.set_maintainer(orig_maintainer) control.set_maintainer(orig_maintainer)
control.remove_original_maintainer() control.remove_original_maintainer()
control.save() control.save()