ppa-britney/ubuntu-archive-tools/edit-acl

718 lines
27 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (C) 2008, 2009, 2010, 2011, 2012 Canonical Ltd.
# Copyright (C) 2010 Stéphane Graber <stgraber@stgraber.org>
# Copyright (C) 2010 Michael Bienia <geser@ubuntu.com>
# Copyright (C) 2011 Iain Lane <laney@ubuntu.com>
# Copyright (C) 2011 Soren Hansen <soren@linux2go.dk>
# Copyright (C) 2012 Stefano Rivera <stefanor@ubuntu.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Edit uploader permissions for the Ubuntu distro in Launchpad."""
from __future__ import print_function
from optparse import OptionParser, SUPPRESS_HELP
import sys
import launchpadlib.errors
from launchpadlib.launchpad import Launchpad
import lputils
# Python 2 compatibility: make input() return the raw line, as it does on
# Python 3.  The structured version tuple is compared instead of the
# version string, because string comparison is lexicographic (for example
# '10' sorts before '3').
if sys.version_info[0] < 3:
    input = raw_input  # noqa: F821 - only exists on Python 2

# Passed as the first argument to the Launchpad login calls in this script.
CONSUMER_KEY = "edit-acl"
def print_perms(perms, series=None):
    """Print one descriptive line per permission in ``perms``.

    Each permission is read for its ``archive.name``, ``component_name``,
    ``package_set_name``, ``source_package_name``, ``pocket``,
    ``distro_series_name``, ``permission`` and ``person.name`` attributes.

    ``series`` optionally restricts and annotates the output.  It may be
    either an object exposing a ``name`` attribute or a plain series name
    string (both forms are used by callers in this file).  Permissions that
    are tied to a different series are skipped; permissions not tied to any
    series are always shown.
    """
    # Normalise to the series *name* so that the comparison and the printed
    # text work for both accepted forms of ``series``.
    series_name = getattr(series, 'name', series)

    for perm in perms:
        if (series is not None and perm.distro_series_name is not None and
                series_name != perm.distro_series_name):
            continue

        desc = ["archive '%s'" % perm.archive.name]
        if perm.component_name:
            desc.append("component '%s'" % perm.component_name)
            if series:
                # Annotate the component with the series being queried.
                desc[-1] += ' in %s' % series_name
        if perm.package_set_name:
            desc.append("package set '%s' in %s" % (perm.package_set_name,
                                                    perm.distro_series_name))
        if perm.source_package_name:
            desc.append("source package '%s'" % perm.source_package_name)
        if perm.pocket:
            desc.append("pocket '%s'" % perm.pocket)
            if perm.distro_series_name is not None:
                desc[-1] += " in %s" % perm.distro_series_name

        print("%s for %s: %s" % (perm.permission, perm.person.name,
                                 ', '.join(desc)))
def multiline_input(prompt):
    """Read lines from the user until a lone '.' and return them joined.

    ``prompt`` and a short instruction are printed first.  The terminating
    '.' line is not part of the result; the collected lines are joined with
    newline characters.
    """
    print(prompt)
    print("End with a line containing only a full-stop '.'")
    # The two-argument form of iter() keeps calling input() and stops as
    # soon as it returns the sentinel '.'.
    return '\n'.join(iter(input, '.'))
def get_archive(options, launchpad):
    """Return the archive selected by ``options.archive``/``options.distro``.

    Lookup order:

    1. ``options.archive`` as an archive reference (ubuntu, ubuntu/partner
       or ~owner/ubuntu/ppa) via ``launchpad.archives.getByReference``.
    2. Legacy form combining a distro name in -d with -A: no -A gives the
       distribution's main archive, ``owner/ppa`` gives that person's PPA,
       and a bare name is matched against the distribution's archives.

    Raises AssertionError when ``options.archive`` matches nothing.
    """
    reference = options.archive

    # We default to looking up by archive reference (ubuntu,
    # ubuntu/partner or ~owner/ubuntu/ppa).
    if reference is not None:
        archive = launchpad.archives.getByReference(reference=reference)
        if archive is not None:
            return archive

    # But we also still support combining a distro name in -d and an
    # archive name or old PPA reference in -A (-d ubuntu,
    # -d ubuntu -A partner, or -d ubuntu -A owner/ppa).
    distro = launchpad.distributions[options.distro]
    if reference is None:
        return distro.main_archive

    parts = reference.split('/')
    if len(parts) == 2:
        owner, ppa_name = parts
        return launchpad.people[owner].getPPAByName(
            distribution=distro, name=ppa_name)
    if len(parts) == 1:
        for archive in distro.archives:
            if archive.name == reference:
                return archive
    # Anything else (including a reference with several slashes that
    # getByReference did not recognise) is reported uniformly, rather than
    # failing with an unrelated unpacking error.
    raise AssertionError("No such archive in Ubuntu: %s" % reference)
def get_source_components(options, launchpad, archive, source):
    """Yield ``(series_name, component_name)`` for ``source`` in ``archive``.

    Only 'Published' publications in active series are considered, limited
    to ``options.series`` when that is set.  For each series the component
    of the highest published version is reported, and the pairs are yielded
    in ascending order of that version.  ``launchpad`` is accepted but not
    used here.
    """
    try:
        from debian import debian_support
    except ImportError:
        from debian_bundle import debian_support

    query = dict(source_name=source, exact_match=True, status='Published')
    if options.series:
        query['distro_series'] = options.series

    # series name -> (highest version seen so far, its component)
    latest = {}
    for publication in archive.getPublishedSources(**query):
        if not publication.distro_series.active:
            continue
        version = debian_support.Version(publication.source_package_version)
        series_name = publication.distro_series.name
        known = latest.get(series_name)
        if known is None or version > known[0]:
            latest[series_name] = (version, publication.component_name)

    ordered = sorted(latest.items(), key=lambda item: item[1][0])
    for series_name, (_, component) in ordered:
        yield series_name, component
# Maps an ACL type option value ('upload' or 'admin') to the permission name
# that is compared against a permission record's ``permission`` attribute.
permission_names = {
    'upload': 'Archive Upload Rights',
    'admin': 'Queue Administration Rights',
}
def _collect_perms(archives, method_name, **kwargs):
    """Call ``method_name(**kwargs)`` on every archive and merge the results."""
    perms = []
    for archive in archives:
        perms.extend(getattr(archive, method_name)(**kwargs))
    return perms


def do_query(options):
    """Query existing permissions and show on stdout.

    When ``options.archive`` is the distribution's main archive, all of the
    distribution's archives are queried; otherwise only ``options.archive``.
    A section is printed for each selector that is set (person, component,
    package set, source, pocket); with no selector at all, every permission
    of the queried archives is listed.  ``options.acl_type`` ('upload' or
    'admin') narrows the person, component, pocket and catch-all sections.

    Exits the process with status 1 if a requested person cannot be found.
    """
    if options.archive.self_link == options.distro.main_archive_link:
        archives = options.distro.archives
    else:
        archives = [options.archive]

    wants_upload = not options.acl_type or options.acl_type == 'upload'
    wants_admin = not options.acl_type or options.acl_type == 'admin'

    for person in options.person or ():
        if '@' in person:
            # NOTE(review): the e-mail lookup is assumed to return None for
            # an unknown address instead of raising - confirm against the
            # Launchpad API documentation.
            lp_person = launchpad.people.getByEmail(email=person)
        else:
            try:
                lp_person = launchpad.people[person]
            except KeyError:
                lp_person = None
        if lp_person is None:
            # Report both kinds of failed lookup the same way, rather than
            # crashing later on a None person.
            print("Person '%s' doesn't exist." % person)
            sys.exit(1)

        perms = _collect_perms(
            archives, 'getPermissionsForPerson', person=lp_person)
        if options.acl_type:
            perm_name = permission_names[options.acl_type]
            perms = [p for p in perms if p.permission == perm_name]
        print("== All rights for %s ==" % lp_person.name)
        print_perms(perms, options.series)

    if options.component:
        perms = []
        if wants_upload:
            perms.extend(_collect_perms(
                archives, 'getUploadersForComponent',
                component_name=options.component))
        if wants_admin:
            perms.extend(_collect_perms(
                archives, 'getQueueAdminsForComponent',
                component_name=options.component))
        print("== All rights for component '%s' ==" % options.component)
        print_perms(perms, options.series)

    for packageset in options.packageset or ():
        lp_set = launchpad.packagesets.getByName(
            name=packageset, distroseries=options.series)

        perms = _collect_perms(
            archives, 'getUploadersForPackageset', packageset=lp_set)
        print(("== All uploaders for package set '%s' in '%s' "
               "(owned by '%s') ==" %
               (packageset, options.series.name,
                lp_set.owner.display_name)))
        print_perms(perms, options.series)

        sources = sorted(lp_set.getSourcesIncluded(direct_inclusion=True))
        if sources:
            print()
            print("== All source packages in package set '%s' "
                  "in '%s' ==" % (packageset, options.series.name))
            for source in sources:
                print(source)

        child_sets = list(lp_set.setsIncluded(direct_inclusion=True))
        if child_sets:
            print()
            print("== All package sets in package set '%s' in '%s' ==" %
                  (packageset, options.series.name))
            for child_set in child_sets:
                print(child_set.name)

    for source in options.source or ():
        perms = []
        perms_set = []
        for archive in archives:
            perms.extend(archive.getUploadersForPackage(
                source_package_name=source))
            perms_set.extend(archive.getPackagesetsForSource(
                sourcepackagename=source))
        print("== All uploaders for package '%s' ==" % source)
        print_perms(perms, options.series)
        print_perms(perms_set, options.series)
        # Also show who may upload to the component(s) the package is
        # currently published in.
        for archive in archives:
            for series, component in get_source_components(
                    options, launchpad, archive, source):
                perms_component = archive.getUploadersForComponent(
                    component_name=component)
                print_perms(perms_component, series=series)

    if options.pocket:
        perms = []
        if wants_upload:
            perms.extend(_collect_perms(
                archives, 'getUploadersForPocket', pocket=options.pocket))
        if wants_admin:
            perms.extend(_collect_perms(
                archives, 'getQueueAdminsForPocket', pocket=options.pocket))
        print("== All rights for pocket '%s' ==" % options.pocket)
        print_perms(perms, options.series)

    if not (options.person or options.component or options.packageset or
            options.source or options.pocket):
        # No selector given: list everything.
        perms = _collect_perms(archives, 'getAllPermissions')
        if options.acl_type:
            perm_name = permission_names[options.acl_type]
            perms = [p for p in perms if p.permission == perm_name]
        print("== All rights ==")
        print_perms(perms, options.series)
def validate_add_delete_options(options, requires_person=True):
    """Check that the selector options form a valid add/delete request.

    A package set combined with sources is the special "manage package set
    contents" form and is valid only without a component or person.
    Otherwise a person is needed (unless ``requires_person`` is false) and
    exactly one of package set, source, component or pocket must be given.

    Prints an explanation and returns False when invalid, True when valid.
    """
    if options.packageset and options.source:
        # Special options to manage package sets, bodged into this tool
        # since they aren't entirely inconvenient here.
        if not (options.component or options.person):
            return True
        print("-P <packageset> -s <source> cannot be used with a "
              "component or person as well")
        return False

    if requires_person and not options.person:
        print("You must specify at least one person to (de-)authorise.")
        return False

    selectors = (options.component, options.packageset,
                 options.source, options.pocket)
    chosen = sum(1 for selector in selectors if selector)

    if chosen > 1:
        print("You can only specify one of package set, source, component, "
              "or pocket")
        return False
    if chosen == 0:
        print("You must specify one of package set, source, component, or "
              "pocket")
        return False
    return True
def do_add(options):
    """Add a new permission.

    With both package sets and sources selected, the sources are added to
    each named package set.  Otherwise every person in ``options.person``
    is granted a permission on ``options.archive`` for the selected source
    packages, package sets, component or pocket, and each new permission is
    printed.

    Returns False when the options do not validate, otherwise None.
    """
    if not validate_add_delete_options(options):
        return False

    if options.packageset and options.source:
        # Package set membership management rather than an ACL change.
        for set_name in options.packageset:
            target_set = launchpad.packagesets.getByName(
                name=set_name, distroseries=options.series)
            target_set.addSources(names=options.source)
            print("Added:")
            for source_name in options.source:
                print(source_name)
        return

    grantees = [launchpad.people[name] for name in options.person]

    if options.source:
        for source_name in options.source:
            for grantee in grantees:
                granted = options.archive.newPackageUploader(
                    person=grantee, source_package_name=source_name)
                print("Added:")
                print_perms([granted])
    elif options.packageset:
        for set_name in options.packageset:
            target_set = launchpad.packagesets.getByName(
                name=set_name, distroseries=options.series)
            for grantee in grantees:
                granted = options.archive.newPackagesetUploader(
                    person=grantee, packageset=target_set)
                print("Added:")
                print_perms([granted])
    elif options.component:
        as_uploader = not options.acl_type or options.acl_type == 'upload'
        for grantee in grantees:
            if as_uploader:
                granted = options.archive.newComponentUploader(
                    person=grantee, component_name=options.component)
            else:
                granted = options.archive.newQueueAdmin(
                    person=grantee, component_name=options.component)
            print("Added:")
            print_perms([granted])
    elif options.pocket:
        # Only the queue-admin call receives the optional series.
        admin_kwargs = (
            {"distroseries": options.series} if options.series else {})
        as_uploader = not options.acl_type or options.acl_type == 'upload'
        for grantee in grantees:
            if as_uploader:
                granted = options.archive.newPocketUploader(
                    person=grantee, pocket=options.pocket)
            else:
                granted = options.archive.newPocketQueueAdmin(
                    person=grantee, pocket=options.pocket, **admin_kwargs)
            print("Added:")
            print_perms([granted])
    return
def do_delete(options):
    """Delete a permission.

    Three modes, chosen from the options:

    * package sets and sources: remove the sources from each package set;
    * package sets without a person: delete each package set after an
      interactive confirmation, refusing sets that still have uploaders;
    * otherwise: revoke the selected source, package set, component or
      pocket permission from every person in ``options.person``.

    Returns False when the options do not validate, otherwise None.
    """
    # We kind of hacked packageset management into here.
    # Deleting packagesets doesn't require a person...
    deleting_whole_set = options.packageset and not options.source
    if not validate_add_delete_options(options, not deleting_whole_set):
        return False

    if options.packageset and options.source:
        for set_name in options.packageset:
            target_set = launchpad.packagesets.getByName(
                name=set_name, distroseries=options.series)
            target_set.removeSources(names=options.source)
            print("Deleted:")
            for source_name in options.source:
                print(source_name)
        return

    if options.packageset and not options.person:
        for set_name in options.packageset:
            target_set = launchpad.packagesets.getByName(
                name=set_name, distroseries=options.series)
            uploaders = options.archive.getUploadersForPackageset(
                direct_permissions=True, packageset=target_set)
            if len(uploaders):
                print("Cannot delete packageset with defined uploaders")
                print("Current uploaders:")
                for permission in uploaders:
                    print(" %s" % permission.person.name)
                continue

            # Show what is about to disappear before asking.
            print("Confirm removal of packageset '%s'" % target_set.name)
            print("Description:")
            print(" " + target_set.description.replace("\n", "\n "))
            print("Containing Sources:")
            for member in target_set.getSourcesIncluded():
                print(" %s" % member)
            print("Containing packagesets:")
            for member in target_set.setsIncluded():
                print(" %s" % member.name)
            answer = input("Remove? (y/N): ")
            if answer.lower() == 'y':
                target_set.lp_delete()
                print("Deleted %s/%s" % (target_set.name,
                                         options.series.name))
        return

    targets = [launchpad.people[name] for name in options.person]

    if options.source:
        for source_name in options.source:
            for target in targets:
                # Best effort: a failure for one person/source pair is
                # reported and the remaining pairs are still attempted.
                try:
                    options.archive.deletePackageUploader(
                        person=target, source_package_name=source_name)
                    print("Deleted %s/%s" % (target.name, source_name))
                except Exception:
                    print("Failed to delete %s/%s" % (target.name,
                                                      source_name))
    elif options.packageset:
        for set_name in options.packageset:
            target_set = launchpad.packagesets.getByName(
                name=set_name, distroseries=options.series)
            for target in targets:
                options.archive.deletePackagesetUploader(
                    person=target, packageset=target_set)
                print("Deleted %s/%s/%s" % (target.name, set_name,
                                            options.series.name))
    elif options.component:
        as_uploader = not options.acl_type or options.acl_type == 'upload'
        for target in targets:
            if as_uploader:
                options.archive.deleteComponentUploader(
                    person=target, component_name=options.component)
                print("Deleted %s/%s" % (target.name, options.component))
            else:
                options.archive.deleteQueueAdmin(
                    person=target, component_name=options.component)
                print("Deleted %s/%s (admin)" % (target.name,
                                                 options.component))
    elif options.pocket:
        # Only the queue-admin call receives the optional series.
        admin_kwargs = (
            {"distroseries": options.series} if options.series else {})
        as_uploader = not options.acl_type or options.acl_type == 'upload'
        for target in targets:
            if as_uploader:
                options.archive.deletePocketUploader(
                    person=target, pocket=options.pocket)
                print("Deleted %s/%s" % (target.name, options.pocket))
                continue
            options.archive.deletePocketQueueAdmin(
                person=target, pocket=options.pocket, **admin_kwargs)
            if options.series:
                print(
                    "Deleted %s/%s/%s (admin)" %
                    (target.name, options.pocket, options.series.name))
            else:
                print("Deleted %s/%s (admin)" %
                      (target.name, options.pocket))
    return
def do_create(options):
    """Create the package sets named in ``options.packageset``.

    Exactly one person must be given; that person becomes the owner.  The
    sets are created in ``options.series``, or in the distribution's current
    series when no series is set.  A description is read interactively for
    each new set, and names that already exist are skipped.

    Returns False when the options are unsuitable, otherwise None.
    """
    if not options.packageset:
        print("You can only create a package set, not something else.")
        return False
    if not options.person or len(options.person) != 1:
        print("You must specify exactly one person to own the new package "
              "set.")
        return False

    distro_series = options.series or options.distro.current_series
    owner = launchpad.people[options.person[0]]

    for set_name in options.packageset:
        # A failed lookup (TypeError or an HTTP error) is treated the same
        # as "no such set yet".
        try:
            existing = launchpad.packagesets.getByName(
                name=set_name, distroseries=distro_series)
        except (TypeError, launchpadlib.errors.HTTPError):
            existing = None
        if existing:
            print("Package set %s already exists" % set_name)
            continue

        description = multiline_input("Description for new package set %s:"
                                      % set_name)
        created = launchpad.packagesets.new(
            name=set_name, description=description,
            distroseries=distro_series, owner=owner)
        print(created)
def do_modify(options):
    """Change the owner, description or name of existing package sets.

    For each set in ``options.packageset`` exactly one change is made:
    if a person was given, that person becomes the owner; otherwise a new
    description is requested, and only if that is left blank is a new name
    requested.  The series is ``options.series`` or, failing that, the
    distribution's current series.

    Returns False when the options are unsuitable, otherwise None.
    """
    if not options.packageset:
        print("You can only modify a package set, not something else.")
        return False
    if options.person and len(options.person) > 1:
        print("You can only specify one person as the new packageset owner.")
        return False

    distro_series = options.series or options.distro.current_series
    new_owner = launchpad.people[options.person[0]] if options.person else None

    for set_name in options.packageset:
        target_set = launchpad.packagesets.getByName(
            name=set_name, distroseries=distro_series)

        if new_owner:
            print("Making %s the owner of %s/%s"
                  % (new_owner.name, target_set.name, distro_series.name))
            target_set.owner = new_owner
            target_set.lp_save()
            continue

        print("Current description of %s:" % target_set.name)
        print(" " + target_set.description.replace("\n", "\n "))
        new_description = multiline_input(
            "New description [blank=leave unmodified]:")
        if new_description:
            print("Modifying description of %s/%s"
                  % (target_set.name, distro_series.name))
            target_set.description = new_description
            target_set.lp_save()
            continue

        new_name = input(
            "Rename %s to? [blank=don't rename]: " % target_set.name)
        if new_name:
            print("Renaming %s/%s -> %s"
                  % (target_set.name, distro_series.name, new_name))
            target_set.name = new_name
            target_set.lp_save()
def do_copy(options):
    """Copy package sets, with their sources and uploaders, to another series.

    The source series is ``options.series`` or, failing that, the
    distribution's current series; the destination series name is read
    interactively.  For every set named in ``options.packageset`` a new set
    with the same name, description and owner is created in the destination
    series (related to the original), filled with the original's sources,
    and given the original's uploaders.  Uploaders are gathered from all of
    the distribution's archives when ``options.archive`` is its main
    archive, otherwise from ``options.archive`` only.

    Package sets that do not exist in the source series are reported and
    skipped.  Returns False when no package set was given, otherwise None.
    """
    if options.archive.self_link == options.distro.main_archive_link:
        archives = options.distro.archives
    else:
        archives = [options.archive]

    if not options.packageset:
        print("You can only copy a package set, not something else.")
        return False

    distro_series = options.series or options.distro.current_series

    dst = input("Name of the destination series: ")
    dst_series = options.distro.getSeries(name_or_version=dst)

    for packageset in options.packageset:
        src_pkgset = launchpad.packagesets.getByName(
            name=packageset, distroseries=distro_series)
        if not src_pkgset:
            print("Package set %s doesn't exist" % packageset)
            # There is nothing to copy from, so move on to the next set
            # instead of dereferencing the missing one below.
            continue

        ps = launchpad.packagesets.new(
            name=packageset, description=src_pkgset.description,
            distroseries=dst_series, owner=src_pkgset.owner_link,
            related_set=src_pkgset)
        print(ps)

        ps.addSources(names=src_pkgset.getSourcesIncluded())

        perms = []
        for archive in archives:
            perms.extend(archive.getUploadersForPackageset(
                packageset=src_pkgset))

        # Re-create each uploader permission against the new set, in the
        # archive the original permission belongs to.
        for perm in perms:
            perm.archive.newPackagesetUploader(
                person=perm.person_link, packageset=ps)
def do_check(options):
    """Check if a person can upload a package.

    For every person/source pair the published source is looked up in
    ``options.pocket`` (default 'Release') of ``options.series`` (default:
    the distribution's current series) to learn its component, then the
    archive's ``checkUpload`` is asked about that upload.  The outcome of
    each check is printed.

    Returns False when no person or no source was given, otherwise None.
    An IndexError from the lookup propagates when nothing is published.
    """
    if not options.person:
        print("A person needs to be specified to check.")
        return False
    if not options.source:
        print("A source package needs to be specified to check.")
        return False

    people = [launchpad.people[person] for person in options.person]
    distro_series = options.series or options.distro.current_series
    pocket = options.pocket or 'Release'

    def first_publication(pocket_name, source_name):
        # First 'Published' record of the source in the given pocket;
        # raises IndexError when there is none.
        return options.archive.getPublishedSources(
            distro_series=distro_series,
            exact_match=True,
            pocket=pocket_name,
            source_name=source_name,
            status='Published',
        )[0]

    for person in people:
        for srcpkg in options.source:
            try:
                spph = first_publication(pocket, srcpkg)
            except IndexError:
                if not options.pocket:
                    raise
                # Not yet in options.pocket, but maybe in Release?
                spph = first_publication('Release', srcpkg)

            try:
                options.archive.checkUpload(
                    component=spph.component_name,
                    distroseries=distro_series,
                    person=person,
                    pocket=pocket,
                    sourcepackagename=srcpkg,
                )
                print("%s (%s) can upload %s to %s/%s" % (
                    person.display_name, person.name,
                    srcpkg, distro_series.displayname, pocket))
            except launchpadlib.errors.HTTPError as e:
                # A 403 is the "not allowed" answer; anything else is
                # reported as an error.
                if e.response.status != 403:
                    print("There was a %s error:" % e.response.status)
                    print(e.content)
                else:
                    print("%s (%s) cannot upload %s to %s/%s" % (
                        person.display_name, person.name,
                        srcpkg, distro_series.displayname, pocket))
def main(options, action):
    """Run the handler for ``action`` with the parsed ``options``.

    Recognised actions are query, add, delete (alias: remove), create,
    modify, copy and check.  Raises AssertionError for anything else.
    """
    # The lambdas defer looking up each handler until it is selected.
    handlers = {
        "query": lambda: do_query(options),
        "add": lambda: do_add(options),
        "delete": lambda: do_delete(options),
        "remove": lambda: do_delete(options),
        "create": lambda: do_create(options),
        "modify": lambda: do_modify(options),
        "copy": lambda: do_copy(options),
        "check": lambda: do_check(options),
    }
    handler = handlers.get(action)
    if handler is None:
        raise AssertionError("Invalid action %s" % action)
    handler()
    return
if __name__ == '__main__':
    # Command-line entry point: parse and sanity-check the options, log in
    # to Launchpad, resolve the archive/distribution/series objects and
    # dispatch to main().
    parser = OptionParser(
        usage="usage: %prog [options] "
              "query|add|delete|create|modify|copy|check",
        epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION)

    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option("-A", "--archive", dest="archive")
    parser.add_option("-S", "--series", dest="series")
    parser.add_option("-p", "--person", dest="person", action="append")
    parser.add_option("-c", "--component", dest="component")
    parser.add_option("-P", "--packageset", dest="packageset", action="append")
    parser.add_option("-s", "--source", dest="source", action="append")
    parser.add_option("--pocket", dest="pocket")
    parser.add_option("-t", "--acl-type", dest="acl_type",
                      help="ACL type: upload or admin")
    parser.add_option("--anon", dest="anon_login", action="store_true",
                      default=False, help="Login anonymously to Launchpad")

    # Deprecated in favour of -A.
    parser.add_option(
        "-d", "--distribution", dest="distro", default="ubuntu",
        help=SUPPRESS_HELP)

    options, args = parser.parse_args()

    # 'modify' is dispatched by main() and advertised in the usage string,
    # so it is listed here as well.
    possible_actions = (
        'query', 'add', 'delete', 'create', 'modify', 'copy', 'check')

    if len(args) != 1:
        parser.error(
            "You must specify an action, one of:\n%s" %
            ", ".join(possible_actions))

    # Reject unknown actions before logging in to Launchpad.  main() also
    # accepts 'remove' as an alias for 'delete'.
    if args[0] not in possible_actions + ('remove',):
        parser.error(
            "Invalid action '%s', expected one of:\n%s" %
            (args[0], ", ".join(possible_actions)))

    if options.anon_login and args[0] not in ('query', 'check'):
        print("E: Anonymous login not supported for this action.")
        sys.exit(1)

    if (args[0] != 'query' and
            not options.person and not options.component and
            not options.packageset and not options.source and
            not options.pocket):
        parser.error("Provide at least one of "
                     "person/component/packageset/source/pocket")
    if options.packageset and not options.series:
        parser.error("Package set requires an associated series")
    if options.acl_type and options.acl_type not in ('upload', 'admin'):
        parser.error("Invalid ACL type '%s' (valid: 'upload', 'admin')" %
                     options.acl_type)
    if options.acl_type == 'admin' and options.packageset:
        parser.error("ACL type admin not allowed for package sets")
    if options.acl_type == 'admin' and options.source:
        parser.error("ACL type admin not allowed for source packages")
    if options.pocket:
        # Normalise the capitalisation, e.g. 'proposed' -> 'Proposed'.
        options.pocket = options.pocket.title()

    # NOTE: 'launchpad' must stay a module-level name; the do_* functions
    # in this file read it as a global.
    if options.anon_login:
        launchpad = Launchpad.login_anonymously(
            CONSUMER_KEY, options.launchpad_instance, version="devel")
    else:
        launchpad = Launchpad.login_with(
            CONSUMER_KEY, options.launchpad_instance, version="devel")

    # Replace the option strings with the Launchpad objects they refer to.
    options.archive = get_archive(options, launchpad)
    options.distro = options.archive.distribution
    if options.series is not None:
        options.series = options.distro.getSeries(
            name_or_version=options.series)

    try:
        main(options, args[0])
    except launchpadlib.errors.HTTPError as err:
        print("There was a %s error:" % err.response.status)
        print(err.content)