mirror of
				https://github.com/lubuntu-team/ppa-britney.git
				synced 2025-10-26 06:04:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			498 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			498 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #! /usr/bin/python
 | |
| 
 | |
| # Copyright (C) 2012 Canonical Ltd.
 | |
| # Author: Colin Watson <cjwatson@ubuntu.com>
 | |
| 
 | |
| # This program is free software: you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License as published by
 | |
| # the Free Software Foundation; version 3 of the License.
 | |
| #
 | |
| # This program is distributed in the hope that it will be useful,
 | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| # GNU General Public License for more details.
 | |
| #
 | |
| # You should have received a copy of the GNU General Public License
 | |
| # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| """Manipulate Ubuntu upload queues."""
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| import collections
 | |
| from datetime import datetime
 | |
| from operator import attrgetter
 | |
| from optparse import OptionParser, SUPPRESS_HELP
 | |
| import os
 | |
| import sys
 | |
| try:
 | |
|     from urllib.parse import unquote, urlsplit
 | |
|     from urllib.request import urlretrieve
 | |
| except ImportError:
 | |
|     from urllib import unquote, urlretrieve
 | |
|     from urlparse import urlsplit
 | |
| 
 | |
| from launchpadlib.launchpad import Launchpad
 | |
| import pytz
 | |
| 
 | |
| import lputils
 | |
| 
 | |
| 
 | |
# Application name handed to Launchpad.login_with() when authenticating.
CONSUMER_KEY = "queue"

# Upload queue statuses, in the order the "report" action lists them.
queue_names = ("New", "Unapproved", "Accepted", "Done", "Rejected")
 | |
| 
 | |
| 
 | |
# Timezone-aware (UTC) reference instant, captured once at import time so
# that every age computed during a run is measured from the same moment.
now = datetime.now(tz=pytz.timezone("UTC"))
 | |
| 
 | |
| 
 | |
def queue_item(options, queue_id):
    """Load a single upload by its numeric ID.

    The upload URL is assembled from the Launchpad root URI, the selected
    distribution name, the selected series name and ``queue_id``, and is
    then loaded through ``options.launchpad``.
    """
    root = options.launchpad._root_uri.ensureSlash()
    location = '%s%s/%s/+upload/%s' % (
        root, options.distribution.name, options.series.name, queue_id)
    return options.launchpad.load(location)
 | |
| 
 | |
| 
 | |
def queue_item_allowed(options, item):
    """Return True if ``item`` matches the source/binary selection.

    An item counts as sourceful when it contains source or is a copy.
    Instead of consulting item.contains_build, everything that does not
    contain source is counted as binaryful; that way copies of custom
    uploads (which set none of contains_source, contains_copy and
    contains_build) are still treated as binaryful.  Copies may carry both
    source and binaries, so they count as both.
    """
    sourceful = item.contains_source or item.contains_copy
    binaryful = item.contains_copy or not item.contains_source
    wanted_as_source = options.source and sourceful
    wanted_as_binary = options.binary and binaryful
    return bool(wanted_as_source or wanted_as_binary)
 | |
| 
 | |
| 
 | |
def queue_items(options, args):
    """Collect the queue items selected by ``args``.

    Each argument is either a numeric upload ID, a ``name/version`` pair, a
    bare package name, or empty (no filter).  With no arguments at all, a
    single unfiltered search is performed.

    Numeric IDs are loaded directly and must be in the requested queue,
    series and pocket; otherwise ValueError is raised.  Other arguments
    are passed as filters to ``options.series.getPackageUploads``.

    Returns an OrderedDict used as an ordered set: its keys are the items
    that pass queue_item_allowed, and every value is 1.
    """
    selected = collections.OrderedDict()
    for spec in (args or ['']):
        spec = spec.strip()

        if not spec.isdigit():
            # Name-based (or unfiltered) search.
            filters = {}
            if "/" in spec:
                filters["name"], filters["version"] = spec.split("/")
            elif spec:
                filters["name"] = spec
            matches = options.series.getPackageUploads(
                archive=options.archive, pocket=options.pocket,
                status=options.queue, exact_match=options.exact_match,
                **filters)
            for upload in matches:
                if queue_item_allowed(options, upload):
                    selected[upload] = 1
            continue

        # Direct lookup by numeric ID.
        upload = queue_item(options, spec)
        if upload in selected:
            continue
        if upload.status != options.queue:
            raise ValueError(
                "Item %s is in queue %s, not requested queue %s" %
                (upload.id, upload.status, options.queue))
        if (upload.distroseries != options.series or
                upload.pocket != options.pocket):
            series_name = upload.distroseries.name
            if upload.pocket == "Release":
                item_suite = series_name
            else:
                item_suite = "%s-%s" % (series_name, upload.pocket.lower())
            raise ValueError("Item %s is in %s/%s not in %s/%s" % (
                upload.id, upload.distroseries.distribution.name,
                item_suite, options.distribution.name, options.suite))
        if queue_item_allowed(options, upload):
            selected[upload] = 1

    return selected
 | |
| 
 | |
| 
 | |
#XXX cprov 2006-09-19: We need to use template engine instead of hardcoded
# format variables.
# Horizontal rule: five runs of dashes separated by "|" column markers.
HEAD = "|".join(("-" * 9, "----", "-" * 22, "-" * 22, "-" * 15))
# Left padding printed before the item count in the listing footer.
FOOT_MARGIN = " " * sum((9, 6, 1, 22, 1, 22, 2))
 | |
| 
 | |
| 
 | |
def make_tag(item):
    """Return the two-character content tag for a queue item.

    Copies are tagged "X-".  Otherwise the first character is "S" when the
    item contains source and the second is "B" when it contains a build;
    a "-" stands in for whichever is absent.
    """
    if item.contains_copy:
        return "X-"
    source_flag = "S" if item.contains_source else "-"
    build_flag = "B" if item.contains_build else "-"
    return source_flag + build_flag
 | |
| 
 | |
| 
 | |
def approximate_age(time, reference=None):
    """Return a nicely-formatted approximate age.

    ``time`` is the datetime of the event.  ``reference`` is the datetime
    the age is measured from; when omitted, the module-level ``now``
    (captured at import time) is used, which is the historical behaviour.
    Both datetimes must be mutually subtractable (both aware or both
    naive).

    The age is reported in the coarsest unit that fits: seconds below one
    minute, minutes below one hour, hours below 48 hours, days below 14
    days, and weeks beyond that.  Each step rounds the previous unit.
    """
    if reference is None:
        reference = now

    seconds = int((reference - time).total_seconds())
    if seconds == 1:
        return "1 second"
    elif seconds < 60:
        return "%d seconds" % seconds

    minutes = int(round(seconds / 60.0))
    if minutes == 1:
        return "1 minute"
    elif minutes < 60:
        return "%d minutes" % minutes

    hours = int(round(minutes / 60.0))
    if hours == 1:
        return "1 hour"
    elif hours < 48:
        return "%d hours" % hours

    # Hours are shown up to 47, so days start at 2 here; the singular
    # branch is kept only as a guard.
    days = int(round(hours / 24.0))
    if days == 1:
        return "1 day"
    elif days < 14:
        return "%d days" % days

    # Likewise days are shown up to 13, so weeks start at 2.
    weeks = int(round(days / 7.0))
    if weeks == 1:
        return "1 week"
    else:
        return "%d weeks" % weeks
 | |
| 
 | |
| 
 | |
def show_item_main(item):
    """Print the one-line summary row for a queue item.

    The row holds the item ID, its content tag, the display name and
    display version (each padded or cut to exactly 20 characters), and the
    approximate age of the item.
    """
    tag = make_tag(item)
    # TODO truncation sucks
    name_column = item.display_name.ljust(20)[:20]
    version_column = item.display_version.ljust(20)[:20]
    age = approximate_age(item.date_created)
    row = (item.id, tag, name_column, version_column, age)
    print("%8d | %s | %s | %s | %s" % row)
 | |
| 
 | |
| 
 | |
def show_source(source):
    """Print the source detail line: name/version, component and section."""
    details = (
        source.package_name,
        source.package_version,
        source.component_name,
        source.section_name,
    )
    print("\t | * %s/%s Component: %s Section: %s" % details)
 | |
| 
 | |
| 
 | |
def show_binary(binary):
    """Print the detail line for one binary-properties dictionary.

    Entries carrying a "customformat" key are printed with just their name
    and format.  All other entries are printed with a status flag ("N" if
    the entry is marked new, otherwise "*"), name/version/architecture,
    component, section and priority.
    """
    if "customformat" in binary:
        print("\t | * %s Format: %s" % (
              binary["name"], binary["customformat"]))
        return

    status_flag = "N" if binary["is_new"] else "*"
    columns = ("name", "version", "architecture",
               "component", "section", "priority")
    values = (status_flag,) + tuple(binary[column] for column in columns)
    print("\t | %s %s/%s/%s "
          "Component: %s Section: %s Priority: %s" % values)
 | |
| 
 | |
| 
 | |
def show_item(item):
    """Print a queue item: its summary row followed by its detail lines.

    A source detail line is printed for copies and source uploads, and one
    binary detail line is printed per binary-properties entry when the
    item contains a build.
    """
    show_item_main(item)

    has_source_details = item.contains_copy or item.contains_source
    if has_source_details:
        show_source(item)

    if not item.contains_build:
        return
    for properties in item.getBinaryProperties():
        show_binary(properties)
 | |
| 
 | |
| 
 | |
def display_name(item):
    """Return "name/version" for an item, plus " (arches)" for builds."""
    pieces = ["%s/%s" % (item.display_name, item.display_version)]
    if item.contains_build:
        pieces.append(" (%s)" % item.display_arches)
    return "".join(pieces)
 | |
| 
 | |
| 
 | |
def info(options, args):
    """Show information on queue items.

    Prints a heading with the distribution, suite, queue and item count,
    then every selected item between two rules, and finally the item count
    again.  Always returns 0.
    """
    items = queue_items(options, args)
    count = len(items)

    heading = (options.distribution.name, options.suite, options.queue, count)
    print("Listing %s/%s (%s) %s" % heading)

    print(HEAD)
    for item in items:
        show_item(item)
    print(HEAD)

    print(FOOT_MARGIN + str(count))
    return 0
 | |
| 
 | |
| 
 | |
# Get librarian URLs for source_package_publishing_history or package_upload
# objects
def urls(options, item):
    """Return the list of download URLs for ``item``.

    ``item`` is either a package_upload or a
    source_package_publishing_history object.  For an upload that is a
    copy, the matching publication is looked up in the archive it was
    copied from and that publication's URLs are returned; if none is
    found, an error is printed and an empty list is returned.

    The changes file URL is always included.  Custom file URLs are added
    when the object provides them, source file URLs when
    ``options.source`` is set, and binary file URLs when
    ``options.binary`` is set.  Empty or None URLs are dropped.
    """
    # Probe only the attribute that tells the two object kinds apart, so
    # that an AttributeError raised while resolving a copy is reported
    # where it happens instead of being silently swallowed.
    try:
        is_copy = item.contains_copy
    except AttributeError:
        # Not a package_upload
        is_copy = False

    if is_copy:
        archive = item.copy_source_archive
        sources = archive.getPublishedSources(
            exact_match=True, source_name=item.package_name,
            version=item.package_version)
        if sources:
            return urls(options, sources[0])
        print("Error: Can't find source package for copy")
        return []

    ret = []
    try:
        ret.append(item.changes_file_url)
        ret.extend(item.customFileUrls())
    except AttributeError:  # Copies won't have this
        ret.append(item.changesFileUrl())
    if options.source:
        ret.extend(item.sourceFileUrls())
    if options.binary:
        ret.extend(item.binaryFileUrls())
    # On staging we may get None URLs due to missing library files; filter
    # these out.
    return [url for url in ret if url]
 | |
| 
 | |
| 
 | |
def fetch(options, args):
    """Fetch the contents of a queue item.

    Downloads the files of every selected item.  Returns 0 if at least one
    item was processed and 1 if nothing matched.
    """
    fetched_any = False
    for item in queue_items(options, args):
        print("Fetching %s" % display_name(item))
        fetch_item(options, item)
        fetched_any = True
    return 0 if fetched_any else 1
 | |
| 
 | |
| 
 | |
def fetch_item(options, item):
    """Download every URL of ``item`` into the current directory.

    The local file name is the unquoted last path segment of the URL.  An
    existing file is left untouched unless ``options.overwrite`` is set.
    """
    for url in urls(options, item):
        url_path = urlsplit(url).path
        filename = unquote(url_path.rsplit("/", 1)[-1])
        if os.path.exists(filename) and not options.overwrite:
            print("Not overwriting existing %s with %s" %
                  (filename, url))
            continue
        print("Constructing %s (%s)" % (filename, url))
        urlretrieve(url, filename)
 | |
| 
 | |
| 
 | |
def show_urls(options, args):
    """Show the URLs from which a queue item may be downloaded.

    Prints one URL per line for every selected item.  Returns 0 if any
    item was selected and 1 otherwise.
    """
    items = queue_items(options, args)
    for item in items:
        for location in urls(options, item):
            print(location)
    if items:
        return 0
    return 1
 | |
| 
 | |
| 
 | |
def accept(options, args):
    """Accept a queue item.

    Selected items are handled in ascending ID order.  With
    ``options.dry_run`` set, the items are only listed.  Returns 0 if any
    item was selected and 1 otherwise.
    """
    items = queue_items(options, args)
    for item in sorted(items, key=attrgetter("id")):
        label = display_name(item)
        if not options.dry_run:
            print("Accepting %s" % label)
            item.acceptFromQueue()
        else:
            print("Would accept %s" % label)
    if items:
        return 0
    return 1
 | |
| 
 | |
| 
 | |
def reject(options, args):
    """Reject a queue item.

    Selected items are handled in ascending ID order and rejected with
    ``options.reject_comment``.  With ``options.dry_run`` set, the items
    are only listed.  Returns 0 if any item was selected and 1 otherwise.
    """
    items = queue_items(options, args)
    for item in sorted(items, key=attrgetter("id")):
        label = display_name(item)
        if not options.dry_run:
            print("Rejecting %s" % label)
            item.rejectFromQueue(comment=options.reject_comment)
        else:
            print("Would reject %s" % label)
    if items:
        return 0
    return 1
 | |
| 
 | |
| 
 | |
def override_source(options, item):
    """Override properties of source packages in a queue item.

    The new component and section are taken from ``options.component`` and
    ``options.section``; whichever is unset is left alone.  With
    ``options.dry_run`` set, the intended override is only announced and
    nothing is modified, in line with the accept and reject actions.

    Returns a set holding the item's package name, so the caller can tell
    which requested names were matched.
    """
    current = (item.package_name, item.package_version,
               item.component_name, item.section_name)

    if options.dry_run:
        # --dry-run promises not to make any modifications.
        print("Would override %s_%s (%s/%s)" % current)
        return set((item.package_name,))

    kwargs = {}
    if options.component:
        kwargs["new_component"] = options.component
    if options.section:
        kwargs["new_section"] = options.section

    print("Overriding %s_%s (%s/%s)" % current)
    item.overrideSource(**kwargs)
    # Reload the item so the displayed values reflect the override.
    show_item(options.launchpad.load(item.self_link))
    return set((item.package_name,))
 | |
| 
 | |
| 
 | |
def override_binary(options, args, item):
    """Override properties of binary packages in a queue item.

    Only binaries whose name appears in ``args`` are touched.  For each of
    them the component, section and priority are changed to the values
    given in ``options`` (options left as None are not changed).  With
    ``options.dry_run`` set, the intended overrides are only announced and
    nothing is modified, in line with the accept and reject actions.

    Returns the set of binary names that were matched.
    """
    overridden = set()
    changes = []
    verb = "Would override" if options.dry_run else "Overriding"
    for binary in item.getBinaryProperties():
        if binary["name"] not in args:
            continue
        overridden.add(binary["name"])
        print("%s %s_%s (%s/%s/%s)" % (
            verb, binary["name"], binary["version"],
            binary["component"], binary["section"], binary["priority"]))
        change = {"name": binary["name"]}
        if options.component is not None:
            change["component"] = options.component
        if options.section is not None:
            change["section"] = options.section
        if options.priority is not None:
            change["priority"] = options.priority
        changes.append(change)

    # Nothing matched, or --dry-run promised not to modify anything.
    if not changes or options.dry_run:
        return overridden

    item.overrideBinaries(changes=changes)
    # Show the affected binaries again, re-reading their properties.
    show_item_main(item)
    for binary in item.getBinaryProperties():
        if binary["name"] in overridden:
            show_binary(binary)
    return overridden
 | |
| 
 | |
| 
 | |
def override(options, args):
    """Override properties of packages in the queue.

    You may override the component (-c) or the section (-x).  In the case of
    binary packages, you may also override the priority (-p).

    Returns 0 when every argument matched an overridden package; otherwise
    the unmatched arguments are reported and 1 is returned.
    """
    handled = set()
    for item in queue_items(options, args):
        if item.contains_source or item.contains_copy:
            handled.update(override_source(options, item))
        if item.contains_build:
            handled.update(override_binary(options, args, item))

    unmatched = set(args).difference(handled)
    if not unmatched:
        return 0
    print("No matches for %s" % ",".join(sorted(unmatched)))
    return 1
 | |
| 
 | |
| 
 | |
def report(options, args):
    """Show a report on the sizes of available queues.

    Prints the number of uploads in each known queue for the selected
    archive, series and pocket.  ``args`` is not used.  Always returns 0.
    """
    print("Report for %s/%s" % (options.distribution.name, options.suite))
    for status in queue_names:
        uploads = options.series.getPackageUploads(
            archive=options.archive, pocket=options.pocket, status=status)
        summary = (status, len(uploads))
        print(" %s -> %s entries" % summary)
    return 0
 | |
| 
 | |
| 
 | |
# Dispatch table mapping each command-line ACTION word to its handler.
# Every handler is called as handler(options, args) and returns the
# process exit status.
queue_actions = dict((
    ("info", info),
    ("fetch", fetch),
    ("show-urls", show_urls),
    ("accept", accept),
    ("reject", reject),
    ("override", override),
    ("report", report),
))
 | |
| 
 | |
| 
 | |
def main():
    """Parse the command line, log in to Launchpad and run the action.

    The first positional argument selects the action; the remaining ones
    are passed to its handler.  The process exits with the handler's
    return value, or with 1 after printing the message if the handler
    raises ValueError.
    """
    parser = OptionParser(
        usage="usage: %prog [options] ACTION [...]",
        description=(
            "ACTION may be one of info, fetch, show-urls, accept, reject, "
            "override, or report."),
        epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION)
    add = parser.add_option

    # Where to look.
    add("-l", "--launchpad", dest="launchpad_instance", default="production")
    add("-A", "--archive", help="look in ARCHIVE")
    add("-s", "--suite", dest="suite", metavar="SUITE",
        help="look in suite SUITE")
    add("-Q", "--queue", dest="queue", metavar="QUEUE", default="new",
        help="consider packages in QUEUE")
    add("-n", "--dry-run", dest="dry_run", default=False, action="store_true",
        help="don't make any modifications")

    # How name filters are matched.
    add("-e", "--exact-match", dest="exact_match",
        default=True, action="store_true",
        help="treat name filter as an exact match")
    add("-E", "--no-exact-match", dest="exact_match", action="store_false",
        help="treat name filter as a prefix match")

    # Override targets.
    add("-c", "--component", dest="component", metavar="COMPONENT",
        help="when overriding, move package to COMPONENT")
    add("-x", "--section", dest="section", metavar="SECTION",
        help="when overriding, move package to SECTION")
    add("-p", "--priority", dest="priority", metavar="PRIORITY",
        help="when overriding, move package to PRIORITY")

    # Item selection and per-action switches.
    add("--source", dest="source", default=False, action="store_true",
        help="only operate on source packages")
    add("--binary", dest="binary", default=False, action="store_true",
        help="only operate on binary packages")
    add("--overwrite", dest="overwrite", default=False, action="store_true",
        help="when fetching, overwrite existing files")
    add("-m", "--reject-comment", help="rejection comment")

    # Deprecated in favour of -A.
    add("-d", "--distribution", dest="distribution", default="ubuntu",
        help=SUPPRESS_HELP)
    add("--ppa", help=SUPPRESS_HELP)
    add("--ppa-name", help=SUPPRESS_HELP)
    add("-j", "--partner", default=False, action="store_true",
        help=SUPPRESS_HELP)

    options, args = parser.parse_args()

    if not args:
        parser.error("must select an action")
    action = args.pop(0)
    if action not in queue_actions:
        parser.error("unknown action: %s" % action)
    queue_action = queue_actions[action]

    if action == "reject" and options.reject_comment is None:
        parser.error("rejections must supply a rejection comment")

    options.launchpad = Launchpad.login_with(
        CONSUMER_KEY, options.launchpad_instance, version="devel")

    # Queue names are given in any case on the command line; the handlers
    # compare against the title-cased form.
    options.queue = options.queue.title()
    lputils.setup_location(options, default_pocket="Proposed")

    # With neither --source nor --binary given, operate on both kinds.
    if not (options.source or options.binary):
        options.source = True
        options.binary = True

    try:
        status = queue_action(options, args)
    except ValueError as x:
        print(x)
        status = 1
    sys.exit(status)


if __name__ == '__main__':
    main()
 |