2010-12-03 00:06:43 +01:00

147 lines
5.0 KiB
Python

#
# libsupport.py - functions which add launchpadlib support to the Ubuntu
# Developer Tools package.
#
# Copyright (C) 2009 Markus Korn <thekorn@gmx.de>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# Please see the /usr/share/common-licenses/GPL file for the full text of
# the GNU General Public License license.
#
# Modules.
import glob
import os
import sys
import urllib
import urlparse
import httplib2
try:
from launchpadlib.credentials import Credentials
from launchpadlib.launchpad import Launchpad
from launchpadlib.errors import HTTPError
except ImportError:
print "Unable to import launchpadlib module, is python-launchpadlib installed?"
sys.exit(1)
except:
Credentials = None
Launchpad = None
from ubuntutools.lp import (service, api_version)
def find_credentials(consumer, files, level=None):
""" search for credentials matching 'consumer' in path for given access level. """
if Credentials is None:
raise ImportError
for f in files:
cred = Credentials()
try:
cred.load(open(f))
except:
continue
if cred.consumer.key == consumer:
return cred
raise IOError("No credentials found for '%s', please see the " \
"manage-credentials manpage for help on how to create " \
"one for this consumer." % consumer)
def get_credentials(consumer, cred_file=None, level=None):
files = list()
if cred_file:
files.append(cred_file)
if "LPCREDENTIALS" in os.environ:
files.append(os.environ["LPCREDENTIALS"])
files.append(os.path.join(os.getcwd(), "lp_credentials.txt"))
# Add all files which have our consumer name to file listing.
for x in glob.glob(os.path.expanduser("~/.cache/lp_credentials/%s*.txt" % \
consumer)):
files.append(x)
return find_credentials(consumer, files, level)
def get_launchpad(consumer, server=service, cache=None,
cred_file=None, level=None):
credentials = get_credentials(consumer, cred_file, level)
cache = cache or os.environ.get("LPCACHE", None)
return Launchpad(credentials, server, cache, version=api_version)
def query_to_dict(query_string):
result = dict()
options = filter(None, query_string.split("&"))
for opt in options:
key, value = opt.split("=")
result.setdefault(key, set()).add(value)
return result
def translate_web_api(url, launchpad):
scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
query = query_to_dict(query)
differences = set(netloc.split('.')).symmetric_difference(
set(launchpad._root_uri.host.split('.')))
if ('staging' in differences or 'edge' in differences):
raise ValueError("url conflict (url: %s, root: %s" %(url, launchpad._root_uri))
if path.endswith("/+bugs"):
path = path[:-6]
if "ws.op" in query:
raise ValueError("Invalid web url, url: %s" %url)
query["ws.op"] = "searchTasks"
scheme, netloc, api_path, _, _ = urlparse.urlsplit(str(launchpad._root_uri))
query = urllib.urlencode(query)
url = urlparse.urlunsplit((scheme, netloc, api_path + path.lstrip("/"), query, fragment))
return url
def translate_api_web(self_url):
return self_url.replace("api.", "").replace("%s/" % (api_version), "")
LEVEL = {
0: "UNAUTHORIZED",
1: "READ_PUBLIC",
2: "WRITE_PUBLIC",
3: "READ_PRIVATE",
4: "WRITE_PRIVATE"
}
def approve_application(credentials, email, password, level, web_root,
context):
authorization_url = credentials.get_request_token(context, web_root)
if level in LEVEL:
level = 'field.actions.%s' %LEVEL[level]
elif level in LEVEL.values():
level = 'field.actions.%s' %level
elif str(level).startswith("field.actions") and str(level).split(".")[-1] in LEVEL:
pass
else:
raise ValueError("Unknown access level '%s'" %level)
params = {level: 1,
"oauth_token": credentials._request_token.key,
"lp.context": context or ""}
lp_creds = ":".join((email, password))
basic_auth = "Basic %s" %(lp_creds.encode('base64'))
headers = {'Authorization': basic_auth}
response, content = httplib2.Http().request(authorization_url,
method="POST", body=urllib.urlencode(params), headers=headers)
if int(response["status"]) != 200:
if not 300 <= int(response["status"]) <= 400: # this means redirection
raise HTTPError(response, content)
credentials.exchange_request_token_for_access_token(web_root)
return credentials