Wrap all long lines in ubuntutools.

This commit is contained in:
Benjamin Drung 2010-12-23 20:42:21 +01:00
parent fec33a6a41
commit c28ddf5698
5 changed files with 82 additions and 50 deletions

View File

@ -31,7 +31,8 @@ try:
from launchpadlib.launchpad import Launchpad
from launchpadlib.errors import HTTPError
except ImportError:
print "Unable to import launchpadlib module, is python-launchpadlib installed?"
print ("Unable to import launchpadlib module, is python-launchpadlib "
"installed?")
sys.exit(1)
except:
Credentials = None
@ -40,7 +41,8 @@ except:
from ubuntutools.lp import (service, api_version)
def find_credentials(consumer, files, level=None):
""" search for credentials matching 'consumer' in path for given access level. """
""" search for credentials matching 'consumer' in path for given access
level. """
if Credentials is None:
raise ImportError
@ -53,8 +55,8 @@ def find_credentials(consumer, files, level=None):
if cred.consumer.key == consumer:
return cred
raise IOError("No credentials found for '%s', please see the " \
"manage-credentials manpage for help on how to create " \
raise IOError("No credentials found for '%s', please see the "
"manage-credentials manpage for help on how to create "
"one for this consumer." % consumer)
def get_credentials(consumer, cred_file=None, level=None):
@ -69,7 +71,7 @@ def get_credentials(consumer, cred_file=None, level=None):
files.append(os.path.join(os.getcwd(), "lp_credentials.txt"))
# Add all files which have our consumer name to file listing.
for x in glob.glob(os.path.expanduser("~/.cache/lp_credentials/%s*.txt" % \
for x in glob.glob(os.path.expanduser("~/.cache/lp_credentials/%s*.txt" %
consumer)):
files.append(x)
@ -96,7 +98,8 @@ def translate_web_api(url, launchpad):
differences = set(netloc.split('.')).symmetric_difference(
set(launchpad._root_uri.host.split('.')))
if ('staging' in differences or 'edge' in differences):
raise ValueError("url conflict (url: %s, root: %s" %(url, launchpad._root_uri))
raise ValueError("url conflict (url: %s, root: %s" %
(url, launchpad._root_uri))
if path.endswith("/+bugs"):
path = path[:-6]
if "ws.op" in query:
@ -104,7 +107,8 @@ def translate_web_api(url, launchpad):
query["ws.op"] = "searchTasks"
scheme, netloc, api_path, _, _ = urlparse.urlsplit(str(launchpad._root_uri))
query = urllib.urlencode(query)
url = urlparse.urlunsplit((scheme, netloc, api_path + path.lstrip("/"), query, fragment))
url = urlparse.urlunsplit((scheme, netloc, api_path + path.lstrip("/"),
query, fragment))
return url
def translate_api_web(self_url):
@ -122,10 +126,11 @@ def approve_application(credentials, email, password, level, web_root,
context):
authorization_url = credentials.get_request_token(context, web_root)
if level in LEVEL:
level = 'field.actions.%s' %LEVEL[level]
level = 'field.actions.%s' % LEVEL[level]
elif level in LEVEL.values():
level = 'field.actions.%s' %level
elif str(level).startswith("field.actions") and str(level).split(".")[-1] in LEVEL:
level = 'field.actions.%s' % level
elif (str(level).startswith("field.actions") and
str(level).split(".")[-1] in LEVEL):
pass
else:
raise ValueError("Unknown access level '%s'" %level)
@ -135,7 +140,7 @@ def approve_application(credentials, email, password, level, web_root,
"lp.context": context or ""}
lp_creds = ":".join((email, password))
basic_auth = "Basic %s" %(lp_creds.encode('base64'))
basic_auth = "Basic %s" % (lp_creds.encode('base64'))
headers = {'Authorization': basic_auth}
response, content = httplib2.Http().request(authorization_url,
method="POST", body=urllib.urlencode(params), headers=headers)

View File

@ -49,6 +49,8 @@ __all__ = [
'SourcePackagePublishingHistory',
]
_POCKETS = ('Release', 'Security', 'Updates', 'Proposed', 'Backports')
class Launchpad(object):
'''Singleton for LP API access.'''
@ -67,8 +69,8 @@ class Launchpad(object):
def login_anonymously(self, service=service, api_version=api_version):
'''Enforce an anonymous login.'''
if '_Launchpad__lp' not in self.__dict__:
self.__lp = launchpad.Launchpad.login_anonymously('ubuntu-dev-tools',
service_root=service, version=api_version)
self.__lp = launchpad.Launchpad.login_anonymously(
'ubuntu-dev-tools', service_root=service, version=api_version)
else:
raise AlreadyLoggedInError('Already logged in to Launchpad.')
@ -89,7 +91,8 @@ class MetaWrapper(type):
def __init__(cls, name, bases, attrd):
super(MetaWrapper, cls).__init__(name, bases, attrd)
if 'resource_type' not in attrd:
raise TypeError('Class "%s" needs an associated resource type' % name)
raise TypeError('Class "%s" needs an associated resource type' % \
name)
cls._cache = dict()
@ -101,7 +104,8 @@ class BaseWrapper(object):
resource_type = None # it's a base class after all
def __new__(cls, data):
if isinstance(data, basestring) and data.startswith(str(Launchpad._root_uri)):
if (isinstance(data, basestring) and
data.startswith(str(Launchpad._root_uri))):
# looks like a LP API URL
# check if it's already cached
cached = cls._cache.get(data)
@ -117,7 +121,8 @@ class BaseWrapper(object):
if isinstance(data, Entry):
(service_root, resource_type) = data.resource_type_link.split('#')
if service_root == str(Launchpad._root_uri) and resource_type in cls.resource_type:
if (service_root == str(Launchpad._root_uri) and
resource_type in cls.resource_type):
# check if it's already cached
cached = cls._cache.get(data.self_link)
if not cached:
@ -132,14 +137,16 @@ class BaseWrapper(object):
cache(cached)
return cached
else:
raise TypeError("'%s' is not a '%s' object" % (str(data), str(cls.resource_type)))
raise TypeError("'%s' is not a '%s' object" %
(str(data), str(cls.resource_type)))
else:
# not a LP API representation, let the specific class handle it
fetch = getattr(cls, 'fetch', None)
if callable(fetch):
return fetch(data)
else:
raise NotImplementedError("Don't know how to fetch '%s' from LP" % str(data))
raise NotImplementedError("Don't know how to fetch '%s' from LP"
% str(data))
def __call__(self):
return self._lpobject
@ -149,7 +156,8 @@ class BaseWrapper(object):
def __repr__(self):
if hasattr(str, 'format'):
return '<{0}: {1!r}>'.format(self.__class__.__name__, self._lpobject)
return '<{0}: {1!r}>'.format(self.__class__.__name__,
self._lpobject)
else:
return '<%s: %r>' % (self.__class__.__name__, self._lpobject)
@ -202,7 +210,9 @@ class Distribution(BaseWrapper):
if res:
return res
else:
raise ArchiveNotFoundException("The Archive '%s' doesn't exist in %s" % (archive, self.display_name))
message = "The Archive '%s' doesn't exist in %s" % \
(archive, self.display_name)
raise ArchiveNotFoundException(message)
else:
if not '_main_archive' in self.__dict__:
self._main_archive = Archive(self.main_archive_link)
@ -216,12 +226,14 @@ class Distribution(BaseWrapper):
'''
if name_or_version not in self._series:
try:
series = DistroSeries(self().getSeries(name_or_version = name_or_version))
series = DistroSeries(self().getSeries(name_or_version))
# Cache with name and version
self._series[series.name] = series
self._series[series.version] = series
except HTTPError:
raise SeriesNotFoundException("Release '%s' is unknown in '%s'." % (name_or_version, self.display_name))
message = "Release '%s' is unknown in '%s'." % \
(name_or_version, self.display_name)
raise SeriesNotFoundException(message)
return self._series[name_or_version]
def getDevelopmentSeries(self):
@ -266,8 +278,9 @@ class Archive(BaseWrapper):
PackageNotFoundException is raised.
'''
# Check if pocket has a valid value
if pocket not in ('Release', 'Security', 'Updates', 'Proposed', 'Backports'):
raise PocketDoesNotExistError("Pocket '%s' does not exist." % pocket)
if pocket not in _POCKETS:
raise PocketDoesNotExistError("Pocket '%s' does not exist." %
pocket)
dist = Distribution(self.distribution_link)
# Check if series is already a DistoSeries object or not
@ -278,9 +291,10 @@ class Archive(BaseWrapper):
series = dist.getDevelopmentSeries()
# NOTE:
# For Debian all source publication are in the state 'Pending' so filter on this
# instead of 'Published'. As the result is sorted also by date the first result
# will be the most recent one (i.e. the one we are interested in).
# For Debian all source publication are in the state 'Pending' so
# filter on this instead of 'Published'. As the result is sorted
# also by date the first result will be the most recent one
# (i.e. the one we are interested in).
if dist.name in ('debian',):
state = 'Pending'
else:
@ -288,17 +302,20 @@ class Archive(BaseWrapper):
if (name, series.name, pocket) not in self._srcpkgs:
try:
srcpkg = self.getPublishedSources(
source_name = name, distro_series = series(), pocket = pocket,
status = state, exact_match = True)[0]
self._srcpkgs[(name, series.name, pocket)] = SourcePackagePublishingHistory(srcpkg)
srcpkg = self.getPublishedSources(source_name=name,
distro_series=series(),
pocket=pocket,
status=state,
exact_match=True)[0]
index = (name, series.name, pocket)
self._srcpkgs[index] = SourcePackagePublishingHistory(srcpkg)
except IndexError:
msg = "The package '%s' does not exist in the %s %s archive" % \
(name, dist.display_name, self.name)
if pocket == 'Release':
msg = "The package '%s' does not exist in the %s %s archive in '%s'" % \
(name, dist.display_name, self.name, series.name)
msg += " in '%s'" % series.name
else:
msg = "The package '%s' does not exist in the %s %s archive in '%s-%s'" % \
(name, dist.display_name, self.name, series.name, pocket.lower())
msg += " in '%s-%s'" % (series.name, pocket.lower())
raise PackageNotFoundException(msg)
return self._srcpkgs[(name, series.name, pocket)]
@ -311,7 +328,8 @@ class SourcePackagePublishingHistory(BaseWrapper):
resource_type = 'source_package_publishing_history'
def __init__(self, *args):
# Don't share _builds between different SourcePackagePublishingHistory objects
# Don't share _builds between different
# SourcePackagePublishingHistory objects
if '_builds' not in self.__dict__:
self._builds = dict()
@ -444,7 +462,8 @@ class PersonTeam(BaseWrapper):
'''
return any(t.name == team for t in self.super_teams)
def canUploadPackage(self, archive, distroseries, package, component, pocket='Release'):
def canUploadPackage(self, archive, distroseries, package, component,
pocket='Release'):
'''Check if the person or team has upload rights for the source
package to the specified 'archive' and 'distrorelease'.
@ -461,11 +480,14 @@ class PersonTeam(BaseWrapper):
if component is not None and not isinstance(component, basestring):
raise TypeError('A component name expected.')
if package is None and component is None:
raise ValueError('Either a source package name or a component has to be specified.')
if pocket not in ('Release', 'Security', 'Updates', 'Proposed', 'Backports'):
raise PocketDoesNotExistError("Pocket '%s' does not exist." % pocket)
raise ValueError('Either a source package name or a component has '
'to be specified.')
if pocket not in _POCKETS:
raise PocketDoesNotExistError("Pocket '%s' does not exist." %
pocket)
canUpload = self._upload.get((archive, distroseries, pocket, package, component))
canUpload = self._upload.get((archive, distroseries, pocket, package,
component))
if canUpload is None:
# checkUpload() throws an exception if the person can't upload
@ -483,7 +505,8 @@ class PersonTeam(BaseWrapper):
canUpload = False
else:
raise e
self._upload[(archive, distroseries, pocket, package, component)] = canUpload
index = (archive, distroseries, pocket, package, component)
self._upload[index] = canUpload
return canUpload

View File

@ -43,7 +43,8 @@ def system_distribution():
p = Popen(('lsb_release', '-cs'), stdout=PIPE)
output = p.communicate()[0]
except OSError:
print 'Error: Could not determine what distribution you are running.'
print ('Error: Could not determine what distribution you are '
'running.')
return None
if p.returncode != 0:
print 'Error determininng system distribution'
@ -111,6 +112,7 @@ def splitReleasePocket(release):
if pocket not in ('Release', 'Security', 'Updates', 'Proposed',
'Backports'):
raise PocketDoesNotExistError("Pocket '%s' does not exist." % pocket)
raise PocketDoesNotExistError("Pocket '%s' does not exist." % \
pocket)
return (release, pocket)

View File

@ -58,10 +58,11 @@ def checkSourceExists(package, release):
pocket = 'release'
try:
page = urllib2.urlopen('https://launchpad.net/ubuntu/+source/' + package).read()
url = 'https://launchpad.net/ubuntu/+source/' + package
page = urllib2.urlopen(url).read()
m = re.search('<td>%s</td>\s*\n.*"/ubuntu/%s/\+source/%s/(\d[^"]+)"' % (
pocket, release, package.replace('+', '\+')), page)
m = re.search('<td>%s</td>\s*\n.*"/ubuntu/%s/\+source/%s/(\d[^"]+)"' % \
(pocket, release, package.replace('+', '\+')), page)
if not m:
print >> sys.stderr, "Unable to find source package '%s' in " \
"the %s-%s pocket." % (package, release.capitalize(), pocket)

View File

@ -29,7 +29,8 @@ BLACKLIST = {
'edit-patch': 'No Help',
'get-build-deps': 'No Help, runs sudo',
'grep-merges': 'No Help',
'lp-project-upload': 'Returns non-zero after help. Leaving u-d-t in LP: #524680',
'lp-project-upload': 'Returns non-zero after help. '
'Leaving u-d-t in LP: #524680',
'mk-sbuild': 'Fires up apt-get before showing help',
'pbuilder-dist-simple': 'No Help',
'setup-packaging-environment': 'Throws Error',