#! /usr/bin/python3 # Copyright (C) 2012 Canonical Ltd. # Author: Colin Watson # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """Manipulate Ubuntu upload queues.""" from __future__ import print_function import collections from datetime import datetime from operator import attrgetter from optparse import OptionParser, SUPPRESS_HELP import os import sys try: from urllib.parse import unquote, urlsplit from urllib.request import urlretrieve except ImportError: from urllib import unquote, urlretrieve from urlparse import urlsplit from launchpadlib.launchpad import Launchpad import pytz import lputils CONSUMER_KEY = "queue" queue_names = ( "New", "Unapproved", "Accepted", "Done", "Rejected", ) now = datetime.now(pytz.timezone("UTC")) def queue_item(options, queue_id): """Load a queue item by its numeric ID.""" return options.launchpad.load('%s%s/%s/+upload/%s' % ( options.launchpad._root_uri.ensureSlash(), options.distribution.name, options.series.name, queue_id)) def queue_item_allowed(options, item): # Rather than using item.contains_build, treat anything that isn't # sourceful as binaryful. This allows us to treat copies of custom # uploads (which have none of contains_source, contains_copy, or # contains_build) as binaryful. However, copies may contain both source # and binaries. 
sourceful = item.contains_source or item.contains_copy binaryful = not item.contains_source or item.contains_copy if options.source and sourceful: return True elif options.binary and binaryful: return True else: return False def queue_items(options, args): if not args: args = [''] items = [] for arg in args: arg = arg.strip() if arg.isdigit(): item = queue_item(options, arg) if item.self_link in {item.self_link for item in items}: continue if item.status != options.queue: raise ValueError( "Item %s is in queue %s, not requested queue %s" % (item.id, item.status, options.queue)) if (item.distroseries != options.series or item.pocket != options.pocket): if item.pocket == "Release": item_suite = item.distroseries.name else: item_suite = "%s-%s" % ( item.distroseries.name, item.pocket.lower()) raise ValueError("Item %s is in %s/%s not in %s/%s" % ( item.id, item.distroseries.distribution.name, item_suite, options.distribution.name, options.suite)) if queue_item_allowed(options, item): items.append(item) else: kwargs = {} if "/" in arg: kwargs["name"], kwargs["version"] = arg.split("/") elif arg: kwargs["name"] = arg new_items = options.series.getPackageUploads( archive=options.archive, pocket=options.pocket, status=options.queue, exact_match=options.exact_match, **kwargs) for item in new_items: if queue_item_allowed(options, item): items.append(item) return items #XXX cprov 2006-09-19: We need to use template engine instead of hardcoded # format variables. 
# Horizontal rule whose "|" separators line up with the columns printed by
# show_item_main().
HEAD = "-" * 9 + "|----|" + "-" * 22 + "|" + "-" * 22 + "|" + "-" * 15
# Left padding used to place the item count on the footer line.
FOOT_MARGIN = " " * (9 + 6 + 1 + 22 + 1 + 22 + 2)


def make_tag(item):
    """Return the two-character tag for item.

    Copies are tagged "X-"; otherwise the tag is "S" or "-" for source
    followed by "B" or "-" for builds.
    """
    if item.contains_copy:
        return "X-"
    else:
        return (("S" if item.contains_source else "-") +
                ("B" if item.contains_build else "-"))


def approximate_age(time):
    """Return a nicely-formatted approximate age.

    The age is measured against the module-level `now`.  A timestamp later
    than `now` is reported as "0 seconds" rather than as a negative age.
    """
    # `now` is fixed at import time, so an item created after that (or
    # stamped by a clock that runs ahead) would otherwise come out negative.
    seconds = max(0, int((now - time).total_seconds()))
    if seconds == 1:
        return "1 second"
    elif seconds < 60:
        return "%d seconds" % seconds

    minutes = int(round(seconds / 60.0))
    if minutes == 1:
        return "1 minute"
    elif minutes < 60:
        return "%d minutes" % minutes

    hours = int(round(minutes / 60.0))
    if hours == 1:
        return "1 hour"
    elif hours < 48:
        return "%d hours" % hours

    days = int(round(hours / 24.0))
    if days == 1:
        return "1 day"
    elif days < 14:
        return "%d days" % days

    weeks = int(round(days / 7.0))
    if weeks == 1:
        return "1 week"
    else:
        return "%d weeks" % weeks


def show_item_main(item):
    """Print the one-line summary (ID, tag, name, version, age) for item."""
    tag = make_tag(item)
    # TODO truncation sucks
    print("%8d | %s | %s | %s | %s" %
          (item.id, tag, item.display_name.ljust(20)[:20],
           item.display_version.ljust(20)[:20],
           approximate_age(item.date_created)))


def show_source(source):
    """Print the source detail line (name/version, component, section)."""
    print("\t | * %s/%s Component: %s Section: %s" %
          (source.package_name, source.package_version,
           source.component_name, source.section_name))


def show_binary(binary):
    """Print the detail line for one entry of getBinaryProperties().

    Entries carrying a "customformat" key are shown with their format only;
    other entries are shown in full, flagged "N" when "is_new" is true and
    "*" otherwise.
    """
    if "customformat" in binary:
        print("\t | * %s Format: %s" % (
            binary["name"], binary["customformat"]))
    else:
        if binary["is_new"]:
            status_flag = "N"
        else:
            status_flag = "*"
        print("\t | %s %s/%s/%s "
              "Component: %s Section: %s Priority: %s" % (
                  status_flag, binary["name"], binary["version"],
                  binary["architecture"], binary["component"],
                  binary["section"], binary["priority"]))


def show_item(item):
    """Print the summary line for item followed by its detail lines."""
    show_item_main(item)
    if item.contains_copy or item.contains_source:
        show_source(item)
    if item.contains_build:
        for binary in item.getBinaryProperties():
            show_binary(binary)


def display_name(item):
    """Return "name/version" for item, plus "(arches)" if it has builds."""
    display = "%s/%s" % (item.display_name, item.display_version)
    if item.contains_build:
        display += " (%s)" % item.display_arches
    return display


def info(options, args):
    """Show information on queue items."""
    items = queue_items(options, args)
    print("Listing %s/%s (%s) %s" %
          (options.distribution.name, options.suite, options.queue,
           len(items)))
    print(HEAD)
    for item in items:
        show_item(item)
    print(HEAD)
    print(FOOT_MARGIN + str(len(items)))
    return 0


# Get librarian URLs for source_package_publishing_history or package_upload
# objects
def urls(options, item):
    """Return the list of download URLs for item.

    For a copy, the URLs are those of the first matching published source
    in the copy's source archive; if none is found an error is printed and
    an empty list is returned.  Source and binary file URLs are included
    according to options.source and options.binary.
    """
    try:
        if item.contains_copy:
            archive = item.copy_source_archive
            item = archive.getPublishedSources(
                exact_match=True, source_name=item.package_name,
                version=item.package_version)
            if item:
                return urls(options, item[0])
            else:
                print("Error: Can't find source package for copy")
                return []
    except AttributeError:
        # Not a package_upload
        pass

    ret = []
    try:
        ret.append(item.changes_file_url)
        ret.extend(item.customFileUrls())
    except AttributeError:
        # Copies won't have this
        ret.append(item.changesFileUrl())
    if options.source:
        ret.extend(item.sourceFileUrls())
    if options.binary:
        ret.extend(item.binaryFileUrls())
    # On staging we may get None URLs due to missing library files; filter
    # these out.
    ret = list(filter(None, ret))
    return ret


def fetch(options, args):
    """Fetch the contents of a queue item."""
    ret = 1
    items = queue_items(options, args)
    for item in items:
        print("Fetching %s" % display_name(item))
        fetch_item(options, item)
        ret = 0
    return ret


def _local_filename(url):
    """Return the bare file name under which to save the file at url."""
    path = urlsplit(url)[2]
    # Take the basename only after unquoting, so that an encoded "/" in the
    # last path segment cannot direct the download outside the current
    # directory.
    return os.path.basename(unquote(path.split("/")[-1]))


def fetch_item(options, item):
    """Download every file of item into the current directory.

    Existing files are left alone unless options.overwrite is set.  Raises
    ValueError if no local file name can be derived from a URL.
    """
    for url in urls(options, item):
        filename = _local_filename(url)
        if not filename:
            # urlretrieve() would silently write to a temporary file.
            raise ValueError(
                "Cannot derive a local file name from %s" % url)
        if options.overwrite or not os.path.exists(filename):
            print("Constructing %s (%s)" % (filename, url))
            urlretrieve(url, filename)
        else:
            print("Not overwriting existing %s with %s" % (filename, url))


def show_urls(options, args):
    """Show the URLs from which a queue item may be downloaded."""
    items = queue_items(options, args)
    for item in items:
        for url in urls(options, item):
            print(url)
    return 0 if items else 1


def accept(options, args):
    """Accept a queue item."""
    items = queue_items(options, args)
    for item in sorted(items, key=attrgetter("id")):
        if options.dry_run:
            print("Would accept %s" % display_name(item))
        else:
            print("Accepting %s" % display_name(item))
            item.acceptFromQueue()
    return 0 if items else 1


def reject(options, args):
    """Reject a queue item."""
    items = queue_items(options, args)
    for item in sorted(items, key=attrgetter("id")):
        if options.dry_run:
            print("Would reject %s" % display_name(item))
        else:
            print("Rejecting %s" % display_name(item))
            item.rejectFromQueue(comment=options.reject_comment)
    return 0 if items else 1


def override_source(options, item):
    """Override properties of source packages in a queue item."""
    kwargs = {}
    if options.component:
        kwargs["new_component"] = options.component
    if options.section:
        kwargs["new_section"] = options.section

    print("Overriding %s_%s (%s/%s)" % (
        item.package_name, item.package_version,
        item.component_name, item.section_name))
    item.overrideSource(**kwargs)
    # Reload the item so that the values shown are the ones now stored.
    show_item(options.launchpad.load(item.self_link))
    return set((item.package_name,))


def override_binary(options, args, item):
    """Override properties of binary packages in a queue item."""
    overridden = set()
    changes = []
    for binary in item.getBinaryProperties():
        if binary["name"] in args:
            overridden.add(binary["name"])
            print("Overriding %s_%s (%s/%s/%s)" % (
                binary["name"], binary["version"],
                binary["component"], binary["section"], binary["priority"]))
            change = {"name": binary["name"]}
            if options.component is not None:
                change["component"] = options.component
            if options.section is not None:
                change["section"] = options.section
            if options.priority is not None:
                change["priority"] = options.priority
            changes.append(change)
    if changes:
        item.overrideBinaries(changes=changes)
        show_item_main(item)
        # Query the properties again to display the values after the change.
        for binary in item.getBinaryProperties():
            if binary["name"] in overridden:
                show_binary(binary)
    return overridden


def override(options, args):
    """Override properties of packages in the queue.

    You may override the component (-c) or the section (-x). In the case of
    binary packages, you may also override the priority (-p).
    """
    overridden = set()
    items = queue_items(options, args)
    for item in items:
        if item.contains_source or item.contains_copy:
            overridden.update(override_source(options, item))
        if item.contains_build:
            overridden.update(override_binary(options, args, item))
    not_overridden = set(args) - overridden
    if not_overridden:
        print("No matches for %s" % ",".join(sorted(not_overridden)))
        return 1
    else:
        return 0


def report(options, args):
    """Show a report on the sizes of available queues."""
    print("Report for %s/%s" % (options.distribution.name, options.suite))
    for queue_name in queue_names:
        items = options.series.getPackageUploads(
            archive=options.archive, pocket=options.pocket, status=queue_name)
        print(" %s -> %s entries" % (queue_name, len(items)))
    return 0


# Maps each ACTION name accepted on the command line to its implementation.
queue_actions = {
    'info': info,
    'fetch': fetch,
    'show-urls': show_urls,
    'accept': accept,
    'reject': reject,
    'override': override,
    'report': report,
}


def main():
    """Parse the command line, log in to Launchpad and run the action.

    Exits with the action's return value, or with 1 after printing the
    message if the action raises ValueError.
    """
    parser = OptionParser(
        usage="usage: %prog [options] ACTION [...]",
        description=(
            "ACTION may be one of info, fetch, show-urls, accept, reject, "
            "override, or report."),
        epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION)
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option("-A", "--archive", help="look in ARCHIVE")
    parser.add_option(
        "-s", "--suite", dest="suite", metavar="SUITE",
        help="look in suite SUITE")
    parser.add_option(
        "-Q", "--queue", dest="queue", metavar="QUEUE", default="new",
        help="consider packages in QUEUE")
    parser.add_option(
        "-n", "--dry-run", dest="dry_run", default=False, action="store_true",
        help="don't make any modifications")
    parser.add_option(
        "-e", "--exact-match", dest="exact_match", default=True,
        action="store_true", help="treat name filter as an exact match")
    parser.add_option(
        "-E", "--no-exact-match", dest="exact_match", action="store_false",
        help="treat name filter as a prefix match")
    parser.add_option(
        "-c", "--component", dest="component", metavar="COMPONENT",
        help="when overriding, move package to COMPONENT")
    parser.add_option(
        "-x", "--section", dest="section", metavar="SECTION",
        help="when overriding, move package to SECTION")
    parser.add_option(
        "-p", "--priority", dest="priority", metavar="PRIORITY",
        help="when overriding, move package to PRIORITY")
    parser.add_option(
        "--source", dest="source", default=False, action="store_true",
        help="only operate on source packages")
    parser.add_option(
        "--binary", dest="binary", default=False, action="store_true",
        help="only operate on binary packages")
    parser.add_option(
        "--overwrite", dest="overwrite", default=False, action="store_true",
        help="when fetching, overwrite existing files")
    parser.add_option("-m", "--reject-comment", help="rejection comment")

    # Deprecated in favour of -A.
    parser.add_option(
        "-d", "--distribution", dest="distribution", default="ubuntu",
        help=SUPPRESS_HELP)
    parser.add_option("--ppa", help=SUPPRESS_HELP)
    parser.add_option("--ppa-name", help=SUPPRESS_HELP)
    parser.add_option(
        "-j", "--partner", default=False, action="store_true",
        help=SUPPRESS_HELP)
    options, args = parser.parse_args()

    if not args:
        parser.error("must select an action")
    action = args.pop(0)
    try:
        queue_action = queue_actions[action]
    except KeyError:
        parser.error("unknown action: %s" % action)

    if action == "reject" and options.reject_comment is None:
        parser.error("rejections must supply a rejection comment")

    # Normalise e.g. "new" to "New" and reject unknown queues before
    # logging in to Launchpad.
    options.queue = options.queue.title()
    if options.queue not in queue_names:
        parser.error("unknown queue: %s (expected one of %s)" % (
            options.queue, ", ".join(queue_names)))

    options.launchpad = Launchpad.login_with(
        CONSUMER_KEY, options.launchpad_instance, version="devel")

    lputils.setup_location(options, default_pocket="Proposed")

    # With neither --source nor --binary given, operate on both.
    if not options.source and not options.binary:
        options.source = True
        options.binary = True

    try:
        sys.exit(queue_action(options, args))
    except ValueError as x:
        print(x)
        sys.exit(1)


if __name__ == '__main__':
    main()