mirror of
				https://git.launchpad.net/ubuntu-dev-tools
				synced 2025-10-20 16:44:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			152 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #
 | |
| #   libsupport.py - functions which add launchpadlib support to the Ubuntu
 | |
| #                     Developer Tools package.
 | |
| #
 | |
| #   Copyright (C) 2009 Markus Korn <thekorn@gmx.de>
 | |
| #
 | |
| #   This program is free software; you can redistribute it and/or
 | |
| #   modify it under the terms of the GNU General Public License
 | |
| #   as published by the Free Software Foundation; either version 3
 | |
| #   of the License, or (at your option) any later version.
 | |
| #
 | |
| #   This program is distributed in the hope that it will be useful,
 | |
| #   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| #   GNU General Public License for more details.
 | |
| #
 | |
| #   Please see the /usr/share/common-licenses/GPL file for the full text of
 | |
| #   the GNU General Public License license.
 | |
| #
 | |
| 
 | |
| # Modules.
 | |
| import glob
 | |
| import os
 | |
| import sys
 | |
| import urllib
 | |
| import urlparse
 | |
| import httplib2
 | |
| 
 | |
| try:
 | |
|     from launchpadlib.credentials import Credentials
 | |
|     from launchpadlib.launchpad import Launchpad
 | |
|     from launchpadlib.errors import HTTPError
 | |
| except ImportError:
 | |
|     print ("Unable to import launchpadlib module, is python-launchpadlib "
 | |
|            "installed?")
 | |
|     sys.exit(1)
 | |
| except:
 | |
|     Credentials = None
 | |
|     Launchpad = None
 | |
| 
 | |
| from ubuntutools.lp import (service, api_version)
 | |
| 
 | |
| def find_credentials(consumer, files, level=None):
 | |
|     """ search for credentials matching 'consumer' in path for given access
 | |
|         level. """
 | |
|     if Credentials is None:
 | |
|         raise ImportError
 | |
| 
 | |
|     for f in files:
 | |
|         cred = Credentials()
 | |
|         try:
 | |
|             cred.load(open(f))
 | |
|         except:
 | |
|             continue
 | |
|         if cred.consumer.key == consumer:
 | |
|             return cred
 | |
| 
 | |
|     raise IOError("No credentials found for '%s', please see the "
 | |
|                   "manage-credentials manpage for help on how to create "
 | |
|                   "one for this consumer." % consumer)
 | |
| 
 | |
| def get_credentials(consumer, cred_file=None, level=None):
 | |
|     files = list()
 | |
| 
 | |
|     if cred_file:
 | |
|         files.append(cred_file)
 | |
| 
 | |
|     if "LPCREDENTIALS" in os.environ:
 | |
|         files.append(os.environ["LPCREDENTIALS"])
 | |
| 
 | |
|     files.append(os.path.join(os.getcwd(), "lp_credentials.txt"))
 | |
| 
 | |
|     # Add all files which have our consumer name to file listing.
 | |
|     for x in glob.glob(os.path.expanduser("~/.cache/lp_credentials/%s*.txt" %
 | |
|                                           consumer)):
 | |
|         files.append(x)
 | |
| 
 | |
|     return find_credentials(consumer, files, level)
 | |
| 
 | |
| def get_launchpad(consumer, server=service, cache=None,
 | |
|                   cred_file=None, level=None):
 | |
|     credentials = get_credentials(consumer, cred_file, level)
 | |
|     cache = cache or os.environ.get("LPCACHE", None)
 | |
|     return Launchpad(credentials, server, cache, version=api_version)
 | |
| 
 | |
| def query_to_dict(query_string):
 | |
|     result = dict()
 | |
|     options = filter(None, query_string.split("&"))
 | |
|     for opt in options:
 | |
|         key, value = opt.split("=")
 | |
|         result.setdefault(key, set()).add(value)
 | |
|     return result
 | |
| 
 | |
| def translate_web_api(url, launchpad):
 | |
|     scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
 | |
|     query = query_to_dict(query)
 | |
| 
 | |
|     differences = set(netloc.split('.')).symmetric_difference(
 | |
|             set(launchpad._root_uri.host.split('.')))
 | |
|     if ('staging' in differences or 'edge' in differences):
 | |
|         raise ValueError("url conflict (url: %s, root: %s" %
 | |
|                          (url, launchpad._root_uri))
 | |
|     if path.endswith("/+bugs"):
 | |
|         path = path[:-6]
 | |
|         if "ws.op" in query:
 | |
|             raise ValueError("Invalid web url, url: %s" %url)
 | |
|         query["ws.op"] = "searchTasks"
 | |
|     scheme, netloc, api_path, _, _ = urlparse.urlsplit(str(launchpad._root_uri))
 | |
|     query = urllib.urlencode(query)
 | |
|     url = urlparse.urlunsplit((scheme, netloc, api_path + path.lstrip("/"),
 | |
|                                query, fragment))
 | |
|     return url
 | |
| 
 | |
| def translate_api_web(self_url):
 | |
|     return self_url.replace("api.", "").replace("%s/" % (api_version), "")
 | |
| 
 | |
| LEVEL = {
 | |
|     0: "UNAUTHORIZED",
 | |
|     1: "READ_PUBLIC",
 | |
|     2: "WRITE_PUBLIC",
 | |
|     3: "READ_PRIVATE",
 | |
|     4: "WRITE_PRIVATE"
 | |
| }
 | |
| 
 | |
| def approve_application(credentials, email, password, level, web_root,
 | |
|         context):
 | |
|     authorization_url = credentials.get_request_token(context, web_root)
 | |
|     if level in LEVEL:
 | |
|         level = 'field.actions.%s' % LEVEL[level]
 | |
|     elif level in LEVEL.values():
 | |
|         level = 'field.actions.%s' % level
 | |
|     elif (str(level).startswith("field.actions") and
 | |
|           str(level).split(".")[-1] in LEVEL):
 | |
|         pass
 | |
|     else:
 | |
|         raise ValueError("Unknown access level '%s'" %level)
 | |
| 
 | |
|     params = {level: 1,
 | |
|         "oauth_token": credentials._request_token.key,
 | |
|         "lp.context": context or ""}
 | |
| 
 | |
|     lp_creds = ":".join((email, password))
 | |
|     basic_auth = "Basic %s" % (lp_creds.encode('base64'))
 | |
|     headers = {'Authorization': basic_auth}
 | |
|     response, content = httplib2.Http().request(authorization_url,
 | |
|         method="POST", body=urllib.urlencode(params), headers=headers)
 | |
|     if int(response["status"]) != 200:
 | |
|         if not 300 <= int(response["status"]) <= 400: # this means redirection
 | |
|             raise HTTPError(response, content)
 | |
|     credentials.exchange_request_token_for_access_token(web_root)
 | |
|     return credentials
 |