You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

498 lines
16 KiB

#! /usr/bin/python3
# Copyright (C) 2012 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Manipulate Ubuntu upload queues."""
from __future__ import print_function
import collections
from datetime import datetime
from operator import attrgetter
from optparse import OptionParser, SUPPRESS_HELP
import os
import sys
try:
from urllib.parse import unquote, urlsplit
from urllib.request import urlretrieve
except ImportError:
from urllib import unquote, urlretrieve
from urlparse import urlsplit
from launchpadlib.launchpad import Launchpad
import pytz
import lputils
# Application name handed to Launchpad.login_with() in main().
CONSUMER_KEY = "queue"
# Upload queue statuses, in the order report() lists them.  main()
# title-cases the --queue option so that it matches this spelling.
queue_names = (
    "New",
    "Unapproved",
    "Accepted",
    "Done",
    "Rejected",
    )
# Timezone-aware (UTC) timestamp captured once at import time; it is the
# reference point approximate_age() measures upload ages against.
now = datetime.now(pytz.timezone("UTC"))
def queue_item(options, queue_id):
    """Load a single queue item from Launchpad by its numeric ID.

    The item URL is assembled from the Launchpad root URI plus the
    distribution and series names held on ``options``.
    """
    root = options.launchpad._root_uri.ensureSlash()
    upload_url = '%s%s/%s/+upload/%s' % (
        root, options.distribution.name, options.series.name, queue_id)
    return options.launchpad.load(upload_url)
def queue_item_allowed(options, item):
    """Return True if ``item`` matches the source/binary selection.

    ``options.source`` and ``options.binary`` say which kinds of upload
    the caller wants; an item is allowed when it is of at least one
    wanted kind.
    """
    # item.contains_build is deliberately not consulted: whatever is not
    # sourceful counts as binaryful.  That way copies of custom uploads
    # (which set none of contains_source, contains_copy or
    # contains_build) are treated as binaryful.  A copy may carry both
    # source and binaries, so it counts as both kinds.
    has_source = item.contains_source
    is_copy = item.contains_copy
    sourceful = has_source or is_copy
    binaryful = not has_source or is_copy
    if options.source and sourceful:
        return True
    return bool(options.binary and binaryful)
def queue_items(options, args):
    """Return the queue items selected by ``args``.

    Each argument is either a numeric upload ID, a package name, or
    ``NAME/VERSION``.  With no arguments the queue is searched without a
    name filter.  Only items accepted by queue_item_allowed() are
    returned, and each upload appears at most once even when several
    arguments select it.

    Raises ValueError if an upload named by ID is in a different queue,
    series or pocket from the one requested, or if a filter contains
    more than one "/".
    """
    if not args:
        # A single empty filter makes the loop below run one unfiltered
        # search.
        args = ['']

    items = []
    # self_link of every item already collected, so that duplicate
    # detection is a set lookup rather than a rebuild per argument.
    seen_links = set()

    for arg in args:
        arg = arg.strip()
        if arg.isdigit():
            item = queue_item(options, arg)
            if item.self_link in seen_links:
                continue
            if item.status != options.queue:
                raise ValueError(
                    "Item %s is in queue %s, not requested queue %s" %
                    (item.id, item.status, options.queue))
            if (item.distroseries != options.series or
                    item.pocket != options.pocket):
                if item.pocket == "Release":
                    item_suite = item.distroseries.name
                else:
                    item_suite = "%s-%s" % (
                        item.distroseries.name, item.pocket.lower())
                raise ValueError("Item %s is in %s/%s not in %s/%s" % (
                    item.id, item.distroseries.distribution.name,
                    item_suite, options.distribution.name,
                    options.suite))
            if queue_item_allowed(options, item):
                items.append(item)
                seen_links.add(item.self_link)
        else:
            kwargs = {}
            if "/" in arg:
                parts = arg.split("/")
                if len(parts) != 2:
                    # Fail with a readable message instead of a bare
                    # tuple-unpacking error.
                    raise ValueError(
                        'Invalid package filter "%s": expected NAME or '
                        'NAME/VERSION' % arg)
                kwargs["name"], kwargs["version"] = parts
            elif arg:
                kwargs["name"] = arg
            new_items = options.series.getPackageUploads(
                archive=options.archive, pocket=options.pocket,
                status=options.queue, exact_match=options.exact_match,
                **kwargs)
            for item in new_items:
                # Overlapping filters (or an ID given earlier) must not
                # yield the same upload twice.
                if item.self_link in seen_links:
                    continue
                if queue_item_allowed(options, item):
                    items.append(item)
                    seen_links.add(item.self_link)
    return items
#XXX cprov 2006-09-19: We need to use template engine instead of hardcoded
# format variables.
# Horizontal rule for the listing: one run of dashes per column, joined by
# the "|" column separators.
HEAD = "|".join(("-" * 9, "-" * 4, "-" * 22, "-" * 22, "-" * 15))
# Left padding that places the item count printed under the listing.
FOOT_MARGIN = " " * sum((9, 6, 1, 22, 1, 22, 2))
def make_tag(item):
    """Return the two-character type tag shown in the listing.

    Copies are tagged "X-".  Otherwise the first character is "S" when
    the item contains source and the second is "B" when it contains a
    build; "-" fills whichever is absent.
    """
    if item.contains_copy:
        return "X-"
    source_flag = "S" if item.contains_source else "-"
    build_flag = "B" if item.contains_build else "-"
    return source_flag + build_flag
def approximate_age(time, reference=None):
    """Return a nicely-formatted approximate age.

    ``time`` is the moment being aged.  ``reference`` is the moment the
    age is measured from; when omitted, the module-level ``now`` is used.
    Both must be datetimes that can be subtracted from one another.

    The age is reported in the largest fitting unit: seconds below one
    minute, minutes below one hour, hours below 48, days below 14, and
    weeks beyond that.  Each unit is derived by rounding the previous
    one.  A ``time`` later than ``reference`` (e.g. through clock skew)
    is reported as "0 seconds" rather than as a negative age.
    """
    if reference is None:
        reference = now
    value = max(0, int((reference - time).total_seconds()))

    # (unit name, first value too large for this unit, divisor that
    # converts to the next unit up).
    steps = (
        ("second", 60, 60.0),
        ("minute", 60, 60.0),
        ("hour", 48, 24.0),
        ("day", 14, 7.0),
        )
    unit = "week"
    for name, limit, divisor in steps:
        if value < limit:
            unit = name
            break
        value = int(round(value / divisor))

    if value == 1:
        return "1 %s" % unit
    return "%d %ss" % (value, unit)
def show_item_main(item):
    """Print the one-line summary row for a queue item.

    The row holds the item ID, its type tag, the display name and
    version (each padded or cut to 20 characters) and the approximate
    age of the upload.
    """
    # TODO truncation sucks
    name_column = item.display_name.ljust(20)[:20]
    version_column = item.display_version.ljust(20)[:20]
    age_column = approximate_age(item.date_created)
    row = (item.id, make_tag(item), name_column, version_column, age_column)
    print("%8d | %s | %s | %s | %s" % row)
def show_source(source):
    """Print the detail line describing an item's source package.

    Shows the package name and version together with its component and
    section.
    """
    details = (
        source.package_name, source.package_version,
        source.component_name, source.section_name)
    print("\t | * %s/%s Component: %s Section: %s" % details)
def show_binary(binary):
    """Print the detail line for one entry of an item's binary properties.

    ``binary`` is a mapping.  Entries with a "customformat" key are
    printed as custom uploads; all others are printed as binary
    packages, flagged "N" when ``is_new`` is set and "*" otherwise.
    """
    if "customformat" in binary:
        print("\t | * %s Format: %s" % (
            binary["name"], binary["customformat"]))
        return

    status_flag = "N" if binary["is_new"] else "*"
    fields = (
        status_flag, binary["name"], binary["version"],
        binary["architecture"], binary["component"],
        binary["section"], binary["priority"])
    print("\t | %s %s/%s/%s "
          "Component: %s Section: %s Priority: %s" % fields)
def show_item(item):
    """Print a queue item: its summary row followed by detail lines.

    A source detail line is printed for copies and source uploads, and
    one detail line per binary property for items containing a build.
    """
    show_item_main(item)
    has_source_details = item.contains_copy or item.contains_source
    if has_source_details:
        show_source(item)
    if not item.contains_build:
        return
    for properties in item.getBinaryProperties():
        show_binary(properties)
def display_name(item):
    """Return "NAME/VERSION" for an item, for use in progress messages.

    Items containing a build also get their architectures appended in
    parentheses.
    """
    label = "%s/%s" % (item.display_name, item.display_version)
    if not item.contains_build:
        return label
    return label + " (%s)" % item.display_arches
def info(options, args):
    """Show information on queue items."""
    items = queue_items(options, args)
    count = len(items)
    print("Listing %s/%s (%s) %s" % (
        options.distribution.name, options.suite, options.queue, count))
    print(HEAD)
    for item in items:
        show_item(item)
    print(HEAD)
    # The footer repeats the item count under the listing.
    print(FOOT_MARGIN + str(count))
    return 0
def urls(options, item):
    """Get librarian URLs for a queue item.

    ``item`` may be a package_upload or a
    source_package_publishing_history object.  For an upload that is a
    copy, the URLs of the matching source publication in the copy's
    source archive are returned instead; if no such publication is
    found, an error is printed and an empty list is returned.

    The changes file URL always comes first, followed by custom file
    URLs (package uploads only), then source file URLs if
    ``options.source`` is set and binary file URLs if ``options.binary``
    is set.  Empty URLs are dropped.
    """
    # Only package_upload objects have contains_copy.  Probe for just
    # that attribute so that an AttributeError raised further down
    # (including in the recursive call) is not silently swallowed.
    if getattr(item, "contains_copy", False):
        archive = item.copy_source_archive
        published = archive.getPublishedSources(
            exact_match=True, source_name=item.package_name,
            version=item.package_version)
        if published:
            return urls(options, published[0])
        print("Error: Can't find source package for copy")
        return []

    ret = []
    try:
        changes_url = item.changes_file_url
    except AttributeError:
        # Source publications expose the changes file through a method
        # rather than an attribute, and have no custom files.
        ret.append(item.changesFileUrl())
    else:
        ret.append(changes_url)
        ret.extend(item.customFileUrls())
    if options.source:
        ret.extend(item.sourceFileUrls())
    if options.binary:
        ret.extend(item.binaryFileUrls())
    # On staging we may get None URLs due to missing library files; filter
    # these out.
    return [url for url in ret if url]
def fetch(options, args):
    """Fetch the contents of a queue item."""
    items = queue_items(options, args)
    for item in items:
        print("Fetching %s" % display_name(item))
        fetch_item(options, item)
    # Exit status 1 signals that nothing matched.
    return 0 if items else 1
def fetch_item(options, item):
    """Download every file belonging to ``item`` into the current directory.

    Each file is saved under the unquoted last component of its URL
    path.  Files that already exist are left alone unless
    ``options.overwrite`` is set.
    """
    for url in urls(options, item):
        last_component = urlsplit(url)[2].split("/")[-1]
        filename = unquote(last_component)
        if os.path.exists(filename) and not options.overwrite:
            print("Not overwriting existing %s with %s" %
                  (filename, url))
            continue
        print("Constructing %s (%s)" % (filename, url))
        urlretrieve(url, filename)
def show_urls(options, args):
    """Show the URLs from which a queue item may be downloaded."""
    items = queue_items(options, args)
    for item in items:
        for url in urls(options, item):
            print(url)
    # Exit status 1 signals that nothing matched.
    if items:
        return 0
    return 1
def accept(options, args):
    """Accept a queue item."""
    items = queue_items(options, args)
    # Items are processed in ascending ID order.
    ordered = sorted(items, key=attrgetter("id"))
    for item in ordered:
        label = display_name(item)
        if options.dry_run:
            print("Would accept %s" % label)
            continue
        print("Accepting %s" % label)
        item.acceptFromQueue()
    # Exit status 1 signals that nothing matched.
    return 1 if not items else 0
def reject(options, args):
    """Reject a queue item."""
    items = queue_items(options, args)
    # Items are processed in ascending ID order.
    ordered = sorted(items, key=attrgetter("id"))
    for item in ordered:
        label = display_name(item)
        if options.dry_run:
            print("Would reject %s" % label)
            continue
        print("Rejecting %s" % label)
        item.rejectFromQueue(comment=options.reject_comment)
    # Exit status 1 signals that nothing matched.
    return 1 if not items else 0
def override_source(options, item):
    """Override properties of source packages in a queue item.

    Applies whichever of ``options.component`` and ``options.section``
    are set, reloads the item from Launchpad and prints it.  Returns a
    set holding the item's package name.
    """
    candidates = (
        ("new_component", options.component),
        ("new_section", options.section),
        )
    # Only options that were actually given are passed on.
    kwargs = dict((key, value) for key, value in candidates if value)
    print("Overriding %s_%s (%s/%s)" % (
        item.package_name, item.package_version,
        item.component_name, item.section_name))
    item.overrideSource(**kwargs)
    # Show a freshly loaded copy of the item after the override.
    refreshed = options.launchpad.load(item.self_link)
    show_item(refreshed)
    return {item.package_name}
def override_binary(options, args, item):
    """Override properties of binary packages in a queue item.

    Every binary of ``item`` whose name appears in ``args`` gets the
    component, section and priority from ``options`` applied (those that
    are not None).  The affected binaries are then printed.  Returns the
    set of binary names that were overridden.
    """
    fields = ("component", "section", "priority")
    overridden = set()
    changes = []
    for binary in item.getBinaryProperties():
        name = binary["name"]
        if name not in args:
            continue
        overridden.add(name)
        print("Overriding %s_%s (%s/%s/%s)" % (
            name, binary["version"],
            binary["component"], binary["section"], binary["priority"]))
        change = {"name": name}
        for field in fields:
            value = getattr(options, field)
            if value is not None:
                change[field] = value
        changes.append(change)

    if changes:
        item.overrideBinaries(changes=changes)
        show_item_main(item)
        # Fetch the properties again so the values printed are those
        # reported after the override.
        for binary in item.getBinaryProperties():
            if binary["name"] in overridden:
                show_binary(binary)
    return overridden
def override(options, args):
    """Override properties of packages in the queue.

    You may override the component (-c) or the section (-x). In the case of
    binary packages, you may also override the priority (-p).
    """
    overridden = set()
    for item in queue_items(options, args):
        if item.contains_source or item.contains_copy:
            overridden |= override_source(options, item)
        if item.contains_build:
            overridden |= override_binary(options, args, item)

    # NOTE(review): the names collected above are package names, so an
    # argument given as a numeric ID or as NAME/VERSION looks like it
    # will always be reported as unmatched here -- confirm whether that
    # is intended.
    unmatched = sorted(set(args) - overridden)
    if not unmatched:
        return 0
    print("No matches for %s" % ",".join(unmatched))
    return 1
def report(options, args):
    """Show a report on the sizes of available queues."""
    suite_label = (options.distribution.name, options.suite)
    print("Report for %s/%s" % suite_label)
    for status in queue_names:
        uploads = options.series.getPackageUploads(
            archive=options.archive, pocket=options.pocket, status=status)
        print(" %s -> %s entries" % (status, len(uploads)))
    return 0
# Dispatch table from the ACTION word given on the command line to the
# function implementing it.  main() calls the selected function with
# (options, args) and uses its return value as the exit status.
queue_actions = {
    'info': info,
    'fetch': fetch,
    'show-urls': show_urls,
    'accept': accept,
    'reject': reject,
    'override': override,
    'report': report,
    }
def _build_parser():
    """Return the OptionParser describing the command-line interface."""
    parser = OptionParser(
        usage="usage: %prog [options] ACTION [...]",
        description=(
            "ACTION may be one of info, fetch, show-urls, accept, reject, "
            "override, or report."),
        epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION)
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option("-A", "--archive", help="look in ARCHIVE")
    parser.add_option(
        "-s", "--suite", dest="suite", metavar="SUITE",
        help="look in suite SUITE")
    parser.add_option(
        "-Q", "--queue", dest="queue", metavar="QUEUE", default="new",
        help="consider packages in QUEUE")
    parser.add_option(
        "-n", "--dry-run", dest="dry_run", default=False, action="store_true",
        help="don't make any modifications")
    parser.add_option(
        "-e", "--exact-match", dest="exact_match",
        default=True, action="store_true",
        help="treat name filter as an exact match")
    parser.add_option(
        "-E", "--no-exact-match", dest="exact_match", action="store_false",
        help="treat name filter as a prefix match")
    parser.add_option(
        "-c", "--component", dest="component", metavar="COMPONENT",
        help="when overriding, move package to COMPONENT")
    parser.add_option(
        "-x", "--section", dest="section", metavar="SECTION",
        help="when overriding, move package to SECTION")
    parser.add_option(
        "-p", "--priority", dest="priority", metavar="PRIORITY",
        help="when overriding, move package to PRIORITY")
    parser.add_option(
        "--source", dest="source", default=False, action="store_true",
        help="only operate on source packages")
    parser.add_option(
        "--binary", dest="binary", default=False, action="store_true",
        help="only operate on binary packages")
    parser.add_option(
        "--overwrite", dest="overwrite", default=False, action="store_true",
        help="when fetching, overwrite existing files")
    parser.add_option("-m", "--reject-comment", help="rejection comment")

    # Deprecated in favour of -A.
    parser.add_option(
        "-d", "--distribution", dest="distribution", default="ubuntu",
        help=SUPPRESS_HELP)
    parser.add_option("--ppa", help=SUPPRESS_HELP)
    parser.add_option("--ppa-name", help=SUPPRESS_HELP)
    parser.add_option(
        "-j", "--partner", default=False, action="store_true",
        help=SUPPRESS_HELP)
    return parser


def main():
    """Parse the command line, log in to Launchpad and run the action.

    The first positional argument selects the action; the remaining
    ones are passed to it.  The process exits with the action's return
    value, or with status 1 after printing the message to stderr if the
    action raises ValueError.
    """
    parser = _build_parser()
    options, args = parser.parse_args()

    if not args:
        parser.error("must select an action")
    action = args.pop(0)
    try:
        queue_action = queue_actions[action]
    except KeyError:
        parser.error("unknown action: %s" % action)

    # Check this before logging in, so that a forgotten comment fails
    # without contacting Launchpad.
    if action == "reject" and options.reject_comment is None:
        parser.error("rejections must supply a rejection comment")

    options.launchpad = Launchpad.login_with(
        CONSUMER_KEY, options.launchpad_instance, version="devel")
    # Queue names are title-cased ("New", "Unapproved", ...).
    options.queue = options.queue.title()
    lputils.setup_location(options, default_pocket="Proposed")

    # With neither --source nor --binary, operate on both kinds.
    if not options.source and not options.binary:
        options.source = True
        options.binary = True

    try:
        sys.exit(queue_action(options, args))
    except ValueError as x:
        # Diagnostics belong on stderr so they do not mix with output
        # such as the URLs printed by show-urls.
        print(x, file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()