You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

717 lines
26 KiB

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2008, 2009, 2010, 2011, 2012 Canonical Ltd.
# Copyright (C) 2010 Stéphane Graber <stgraber@stgraber.org>
# Copyright (C) 2010 Michael Bienia <geser@ubuntu.com>
# Copyright (C) 2011 Iain Lane <laney@ubuntu.com>
# Copyright (C) 2011 Soren Hansen <soren@linux2go.dk>
# Copyright (C) 2012 Stefano Rivera <stefanor@ubuntu.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Edit uploader permissions for the Ubuntu distro in Launchpad."""
from __future__ import print_function
import argparse
import sys
import launchpadlib.errors
from launchpadlib.launchpad import Launchpad
import lputils
# Python 2 compatibility: make ``input`` return the typed line unevaluated,
# as it does on Python 3.  The numeric major version is compared instead of
# the free-form ``sys.version`` string, which orders lexically (a
# hypothetical "10.0" would sort before "3").
if sys.version_info[0] < 3:
    input = raw_input  # noqa: F821 - name only exists on Python 2

# Application name handed to Launchpad.login_with()/login_anonymously().
CONSUMER_KEY = "edit-acl"
def print_perms(perms, series=None):
    """Print one descriptive line per permission in *perms*.

    perms -- iterable of permission objects; each must expose
        ``permission``, ``person.name``, ``archive.name``,
        ``component_name``, ``package_set_name``, ``source_package_name``,
        ``pocket`` and ``distro_series_name``.
    series -- optional filter.  May be an object with a ``name`` attribute
        or a plain series-name string (both forms are used by the callers
        in this file).  Permissions bound to a different series are
        skipped; permissions without a series are always shown.
    """
    # Normalise once so that both call styles behave the same and the
    # output shows the series name rather than the object's str().
    series_name = getattr(series, 'name', series)

    for perm in perms:
        if (series_name is not None and
                perm.distro_series_name is not None and
                series_name != perm.distro_series_name):
            continue

        desc = ["archive '%s'" % perm.archive.name]
        if perm.component_name:
            desc.append("component '%s'" % perm.component_name)
            # Component permissions carry no series of their own, so the
            # requested series (if any) is appended for context.
            if series_name:
                desc[-1] += ' in %s' % series_name
        if perm.package_set_name:
            desc.append("package set '%s' in %s" % (
                perm.package_set_name, perm.distro_series_name))
        if perm.source_package_name:
            desc.append("source package '%s'" % perm.source_package_name)
        if perm.pocket:
            desc.append("pocket '%s'" % perm.pocket)
            if perm.distro_series_name is not None:
                desc[-1] += " in %s" % perm.distro_series_name

        print("%s for %s: %s" % (
            perm.permission, perm.person.name, ', '.join(desc)))
def multiline_input(prompt):
    """Read lines from standard input until a line consisting of '.'.

    Prints *prompt* and a short usage hint first.  Returns the lines read
    (excluding the terminating '.') joined with newlines; an immediate '.'
    yields the empty string.  ``EOFError`` from ``input()`` is not caught.
    """
    print(prompt)
    print("End with a line containing only a full-stop '.'")
    # iter(callable, sentinel) keeps calling input() until it returns '.'.
    return '\n'.join(iter(input, '.'))
def get_archive(args, launchpad):
    """Resolve the archive selected by the -A / -d command-line options.

    args -- parsed options; ``args.archive`` (string or None) and
        ``args.distro`` (distribution name) are read.
    launchpad -- Launchpad client object.

    Returns the matching archive object.  Raises AssertionError when
    ``args.archive`` names neither a reference, a PPA, nor an archive of
    the distribution.
    """
    # We default to looking up by archive reference (ubuntu,
    # ubuntu/partner or ~owner/ubuntu/ppa).
    if args.archive is not None:
        archive = launchpad.archives.getByReference(reference=args.archive)
        if archive is not None:
            return archive

    # But we also still support combining a distro name in -d and an
    # archive name or old PPA reference in -A (-d ubuntu,
    # -d ubuntu -A partner, or -d ubuntu -A owner/ppa).
    distro = launchpad.distributions[args.distro]
    if args.archive is None:
        return distro.main_archive

    if '/' in args.archive:
        # Old-style "owner/ppa" reference.
        owner, ppa_name = args.archive.split('/')
        return launchpad.people[owner].getPPAByName(
            distribution=distro, name=ppa_name)

    for archive in distro.archives:
        if archive.name == args.archive:
            return archive
    raise AssertionError("No such archive in Ubuntu: %s" % args.archive)
def get_source_components(args, launchpad, archive, source):
    """Yield ``(series_name, component_name)`` for *source* in *archive*.

    For every active series in which *source* is currently published, the
    component of the highest published version is reported.  Results are
    yielded in ascending order of that version.

    args -- parsed options; ``args.series`` restricts the lookup when set.
    launchpad -- unused here; kept for interface compatibility.
    archive -- archive object providing ``getPublishedSources``.
    source -- exact source package name.
    """
    # python-debian was historically shipped as ``debian_bundle``.
    try:
        from debian import debian_support
    except ImportError:
        from debian_bundle import debian_support

    kwargs = {}
    if args.series:
        kwargs['distro_series'] = args.series

    # series name -> (newest Version seen, its component name)
    newest = {}
    for spph in archive.getPublishedSources(
            source_name=source, exact_match=True, status='Published',
            **kwargs):
        if not spph.distro_series.active:
            continue
        series_name = spph.distro_series.name
        new_version = debian_support.Version(spph.source_package_version)
        if (series_name not in newest or
                new_version > newest[series_name][0]):
            newest[series_name] = (new_version, spph.component_name)

    for series in sorted(newest, key=lambda s: newest[s][0]):
        yield series, newest[series][1]
# Maps the -t/--acl-type option value to the ``permission`` string that
# permission objects are compared against when filtering.
permission_names = {
    'upload': 'Archive Upload Rights',
    'admin': 'Queue Administration Rights',
}
def do_query(args):
    """Query existing permissions and show on stdout.

    Each of the person / component / package set / source / pocket
    selectors present in *args* produces its own report section; when none
    is given, every permission of the selected archive(s) is listed.
    ``args.acl_type`` ('upload' or 'admin') narrows the results where it
    applies.  Uses the module-level ``launchpad`` client.  Exits with
    status 1 if a requested person cannot be found.
    """
    # Querying the distribution's main archive means "all its archives".
    if args.archive.self_link == args.distro.main_archive_link:
        archives = args.distro.archives
    else:
        archives = [args.archive]

    if args.person:
        for person in args.person:
            if '@' in person:
                lp_person = launchpad.people.getByEmail(email=person)
            else:
                try:
                    lp_person = launchpad.people[person]
                except KeyError:
                    lp_person = None
            # NOTE(review): getByEmail is assumed to yield None for an
            # unknown address; without this guard lp_person.name below
            # would fail with an AttributeError.
            if lp_person is None:
                print("Person '%s' doesn't exist." % person)
                sys.exit(1)

            perms = []
            for archive in archives:
                perms.extend(archive.getPermissionsForPerson(
                    person=lp_person))
            if args.acl_type:
                perm_name = permission_names[args.acl_type]
                perms = [p for p in perms if p.permission == perm_name]
            print("== All rights for %s ==" % lp_person.name)
            print_perms(perms, args.series)

    if args.component:
        perms = []
        if not args.acl_type or args.acl_type == 'upload':
            for archive in archives:
                perms.extend(archive.getUploadersForComponent(
                    component_name=args.component))
        if not args.acl_type or args.acl_type == 'admin':
            for archive in archives:
                perms.extend(archive.getQueueAdminsForComponent(
                    component_name=args.component))
        print("== All rights for component '%s' ==" % args.component)
        print_perms(perms, args.series)

    if args.packageset:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)

            perms = []
            for archive in archives:
                perms.extend(archive.getUploadersForPackageset(
                    packageset=lp_set))
            print(("== All uploaders for package set '%s' in '%s' "
                   "(owned by '%s') ==" %
                   (packageset, args.series.name,
                    lp_set.owner.display_name)))
            print_perms(perms, args.series)

            sources = sorted(lp_set.getSourcesIncluded(direct_inclusion=True))
            if sources:
                print()
                print("== All source packages in package set '%s' "
                      "in '%s' ==" % (packageset, args.series.name))
                for source in sources:
                    print(source)

            child_sets = list(lp_set.setsIncluded(direct_inclusion=True))
            if child_sets:
                print()
                print("== All package sets in package set '%s' in '%s' ==" %
                      (packageset, args.series.name))
                for child_set in child_sets:
                    print(child_set.name)

    if args.source:
        for source in args.source:
            perms = []
            perms_set = []
            for archive in archives:
                perms.extend(archive.getUploadersForPackage(
                    source_package_name=source))
                perms_set.extend(archive.getPackagesetsForSource(
                    sourcepackagename=source))
            print("== All uploaders for package '%s' ==" % source)
            print_perms(perms, args.series)
            print_perms(perms_set, args.series)
            # Also show who may upload via the component the package is
            # currently published in, per series.
            for archive in archives:
                for series, component in get_source_components(
                        args, launchpad, archive, source):
                    perms_component = archive.getUploadersForComponent(
                        component_name=component)
                    print_perms(perms_component, series=series)

    if args.pocket:
        perms = []
        if not args.acl_type or args.acl_type == 'upload':
            for archive in archives:
                perms.extend(archive.getUploadersForPocket(pocket=args.pocket))
        if not args.acl_type or args.acl_type == 'admin':
            for archive in archives:
                perms.extend(archive.getQueueAdminsForPocket(
                    pocket=args.pocket))
        print("== All rights for pocket '%s' ==" % args.pocket)
        print_perms(perms, args.series)

    # No selector at all: dump everything.
    if (not args.person and not args.component and
            not args.packageset and not args.source and
            not args.pocket):
        perms = []
        for archive in archives:
            perms.extend(archive.getAllPermissions())
        if args.acl_type:
            perm_name = permission_names[args.acl_type]
            perms = [p for p in perms if p.permission == perm_name]
        print("== All rights ==")
        print_perms(perms, args.series)
def validate_add_delete_options(args, requires_person=True):
    """Check the option combination for the add and delete actions.

    args -- parsed options; ``packageset``, ``source``, ``component``,
        ``person`` and ``pocket`` are read.
    requires_person -- when true, at least one person must be given.

    Returns True if the combination is acceptable; otherwise prints an
    explanation and returns False.
    """
    if args.packageset and args.source:
        # Special options to manage package sets, bodged into this tool
        # since they aren't entirely inconvenient here.
        if args.component or args.person:
            print("-P <packageset> -s <source> cannot be used with a "
                  "component or person as well")
            return False
        return True

    if requires_person and not args.person:
        print("You must specify at least one person to (de-)authorise.")
        return False

    # Exactly one kind of target may be selected.
    targets = (args.component, args.packageset, args.source, args.pocket)
    count = sum(1 for target in targets if target)
    if count > 1:
        print("You can only specify one of package set, source, component, "
              "or pocket")
        return False
    if count == 0:
        print("You must specify one of package set, source, component, or "
              "pocket")
        return False

    return True
def do_add(args):
    """Add a new permission.

    With both a package set and sources, the sources are added to the
    package set(s).  Otherwise every person in ``args.person`` is granted
    rights on the single selected target (source, package set, component
    or pocket).  Uses the module-level ``launchpad`` client.

    Returns False if the options are invalid, None otherwise.
    """
    if not validate_add_delete_options(args):
        return False

    # -P <packageset> -s <source>: manage package set membership.
    if args.packageset and args.source:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            lp_set.addSources(names=args.source)
            print("Added:")
            for source in args.source:
                print(source)
        return

    people = [launchpad.people[person] for person in args.person]

    if args.source:
        for source in args.source:
            for person in people:
                perm = args.archive.newPackageUploader(
                    person=person, source_package_name=source)
                print("Added:")
                print_perms([perm])
        return

    if args.packageset:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            for person in people:
                perm = args.archive.newPackagesetUploader(
                    person=person, packageset=lp_set)
                print("Added:")
                print_perms([perm])
        return

    if args.component:
        for person in people:
            # No ACL type given means upload rights.
            if not args.acl_type or args.acl_type == 'upload':
                perm = args.archive.newComponentUploader(
                    person=person, component_name=args.component)
            else:
                perm = args.archive.newQueueAdmin(
                    person=person, component_name=args.component)
            print("Added:")
            print_perms([perm])
        return

    if args.pocket:
        # Only the queue-admin call is given the series.
        admin_kwargs = {}
        if args.series:
            admin_kwargs["distroseries"] = args.series
        for person in people:
            if not args.acl_type or args.acl_type == 'upload':
                perm = args.archive.newPocketUploader(
                    person=person, pocket=args.pocket)
            else:
                perm = args.archive.newPocketQueueAdmin(
                    person=person, pocket=args.pocket, **admin_kwargs)
            print("Added:")
            print_perms([perm])
        return
def do_delete(args):
    """Delete a permission.

    Depending on the options this removes sources from package sets,
    deletes whole package sets (after interactive confirmation), or
    revokes the rights of the given people on a source, package set,
    component or pocket.  Uses the module-level ``launchpad`` client.

    Returns False if the options are invalid, None otherwise.
    """
    # We kind of hacked packageset management into here.
    # Deleting packagesets doesn't require a person...
    requires_person = not (args.packageset and not args.source)
    if not validate_add_delete_options(args, requires_person):
        return False

    # -P <packageset> -s <source>: remove sources from the package set(s).
    if args.packageset and args.source:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            lp_set.removeSources(names=args.source)
            print("Deleted:")
            for source in args.source:
                print(source)
        return

    # -P <packageset> without a person: delete the package set itself.
    if args.packageset and not args.person:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            uploaders = args.archive.getUploadersForPackageset(
                direct_permissions=True, packageset=lp_set)
            if len(uploaders) > 0:
                print("Cannot delete packageset with defined uploaders")
                print("Current uploaders:")
                for permission in uploaders:
                    print(" %s" % permission.person.name)
                continue
            print("Confirm removal of packageset '%s'" % lp_set.name)
            print("Description:")
            print(" " + lp_set.description.replace("\n", "\n "))
            print("Containing Sources:")
            for member in lp_set.getSourcesIncluded():
                print(" %s" % member)
            print("Containing packagesets:")
            for member in lp_set.setsIncluded():
                print(" %s" % member.name)
            ack = input("Remove? (y/N): ")
            if ack.lower() == 'y':
                lp_set.lp_delete()
                print("Deleted %s/%s" % (lp_set.name, args.series.name))
        return

    lp_people = [launchpad.people[person] for person in args.person]

    if args.source:
        for source in args.source:
            for lp_person in lp_people:
                # Best effort: report a failure and carry on with the
                # remaining person/source pairs.
                try:
                    args.archive.deletePackageUploader(
                        person=lp_person, source_package_name=source)
                    print("Deleted %s/%s" % (lp_person.name, source))
                except Exception:
                    print("Failed to delete %s/%s" % (lp_person.name, source))
        return

    if args.packageset:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            for lp_person in lp_people:
                args.archive.deletePackagesetUploader(
                    person=lp_person, packageset=lp_set)
                print("Deleted %s/%s/%s" % (lp_person.name, packageset,
                                            args.series.name))
        return

    if args.component:
        for lp_person in lp_people:
            # No ACL type given means upload rights.
            if not args.acl_type or args.acl_type == 'upload':
                args.archive.deleteComponentUploader(
                    person=lp_person, component_name=args.component)
                print("Deleted %s/%s" % (lp_person.name, args.component))
            else:
                args.archive.deleteQueueAdmin(
                    person=lp_person, component_name=args.component)
                print("Deleted %s/%s (admin)" % (lp_person.name,
                                                 args.component))
        return

    if args.pocket:
        # Only the queue-admin call is given the series.
        admin_kwargs = {}
        if args.series:
            admin_kwargs["distroseries"] = args.series
        for lp_person in lp_people:
            if not args.acl_type or args.acl_type == 'upload':
                args.archive.deletePocketUploader(
                    person=lp_person, pocket=args.pocket)
                print("Deleted %s/%s" % (lp_person.name, args.pocket))
            else:
                args.archive.deletePocketQueueAdmin(
                    person=lp_person, pocket=args.pocket, **admin_kwargs)
                if args.series:
                    print(
                        "Deleted %s/%s/%s (admin)" %
                        (lp_person.name, args.pocket, args.series.name))
                else:
                    print("Deleted %s/%s (admin)" %
                          (lp_person.name, args.pocket))
        return
def do_create(args):
    """Create the package set(s) named in ``args.packageset``.

    Exactly one person must be given; it becomes the owner.  The series is
    ``args.series`` or, failing that, the distribution's current series.
    The description of each new set is read interactively.  Uses the
    module-level ``launchpad`` client.

    Returns False if the options are invalid, None otherwise.
    """
    if not args.packageset:
        print("You can only create a package set, not something else.")
        return False
    if not args.person or len(args.person) != 1:
        print("You must specify exactly one person to own the new package "
              "set.")
        return False

    distro_series = args.series or args.distro.current_series
    lp_person = launchpad.people[args.person[0]]

    for packageset in args.packageset:
        try:
            if launchpad.packagesets.getByName(
                    name=packageset, distroseries=distro_series):
                print("Package set %s already exists" % packageset)
                continue
        except (TypeError, launchpadlib.errors.HTTPError):
            # Deliberately ignored: a failed lookup is taken to mean the
            # package set does not exist yet, so creation proceeds.
            pass
        desc = multiline_input("Description for new package set %s:"
                               % packageset)
        ps = launchpad.packagesets.new(
            name=packageset, description=desc, distroseries=distro_series,
            owner=lp_person)
        print(ps)
def do_modify(args):
    """Change the owner, description or name of package set(s).

    If a person is given, that person becomes the owner of every named
    package set and nothing else is changed.  Otherwise the user is
    prompted for a new description; only when that is left blank is a new
    name asked for.  Uses the module-level ``launchpad`` client.

    Returns False if the options are invalid, None otherwise.
    """
    if not args.packageset:
        print("You can only modify a package set, not something else.")
        return False
    if args.person and len(args.person) > 1:
        print("You can only specify one person as the new packageset owner.")
        return False

    distro_series = args.series or args.distro.current_series

    lp_person = None
    if args.person:
        lp_person = launchpad.people[args.person[0]]

    for packageset in args.packageset:
        lp_set = launchpad.packagesets.getByName(
            name=packageset, distroseries=distro_series)

        if lp_person:
            print("Making %s the owner of %s/%s"
                  % (lp_person.name, lp_set.name, distro_series.name))
            lp_set.owner = lp_person
            lp_set.lp_save()
            continue

        print("Current description of %s:" % lp_set.name)
        print(" " + lp_set.description.replace("\n", "\n "))
        desc = multiline_input("New description [blank=leave unmodified]:")
        if desc:
            print("Modifying description of %s/%s"
                  % (lp_set.name, distro_series.name))
            lp_set.description = desc
            lp_set.lp_save()
            continue

        rename = input("Rename %s to? [blank=don't rename]: " % lp_set.name)
        if rename:
            print("Renaming %s/%s -> %s"
                  % (lp_set.name, distro_series.name, rename))
            lp_set.name = rename
            lp_set.lp_save()
def do_copy(args):
    """Copy package set(s) to another series of the same distribution.

    The destination series name is read interactively.  For each named
    package set a new set with the same name, description and owner is
    created in the destination series (linked via ``related_set``), its
    sources are copied, and the uploaders of the source set are granted
    upload rights on the new set.  Package sets that do not exist in the
    source series are reported and skipped.  Uses the module-level
    ``launchpad`` client.

    Returns False if no package set was given, None otherwise.
    """
    # Copying from the distribution's main archive means "all its archives".
    if args.archive.self_link == args.distro.main_archive_link:
        archives = args.distro.archives
    else:
        archives = [args.archive]

    if not args.packageset:
        print("You can only copy a package set, not something else.")
        return False

    distro_series = args.series or args.distro.current_series

    dst = input("Name of the destination series: ")
    dst_series = args.distro.getSeries(name_or_version=dst)

    for packageset in args.packageset:
        src_pkgset = launchpad.packagesets.getByName(
            name=packageset, distroseries=distro_series)
        if not src_pkgset:
            print("Package set %s doesn't exist" % packageset)
            # Nothing to copy from; carrying on would dereference the
            # missing set below.
            continue

        ps = launchpad.packagesets.new(
            name=packageset, description=src_pkgset.description,
            distroseries=dst_series, owner=src_pkgset.owner_link,
            related_set=src_pkgset)
        print(ps)

        ps.addSources(names=src_pkgset.getSourcesIncluded())

        perms = []
        for archive in archives:
            perms.extend(archive.getUploadersForPackageset(
                packageset=src_pkgset))

        for perm in perms:
            perm.archive.newPackagesetUploader(
                person=perm.person_link, packageset=ps)
def do_check(args):
    """Check if a person can upload a package.

    For every (person, source) pair the currently published source is
    looked up in the requested pocket (default 'Release'), falling back to
    'Release' when a pocket was requested explicitly but has no
    publication yet, and ``checkUpload`` is asked whether the person may
    upload it.  The verdict is printed.  Uses the module-level
    ``launchpad`` client.

    Returns False if person or source is missing, None otherwise.
    Raises IndexError if the source is not published at all.
    """
    if not args.person:
        print("A person needs to be specified to check.")
        return False
    if not args.source:
        print("A source package needs to be specified to check.")
        return False

    people = [launchpad.people[person] for person in args.person]
    distro_series = args.series or args.distro.current_series
    pocket = args.pocket or 'Release'

    for person in people:
        for srcpkg in args.source:
            # Arguments shared by the primary and the fallback lookup.
            lookup = dict(
                distro_series=distro_series,
                exact_match=True,
                source_name=srcpkg,
                status='Published',
            )
            try:
                spph = args.archive.getPublishedSources(
                    pocket=pocket, **lookup)[0]
            except IndexError:
                if not args.pocket:
                    raise
                # Not yet in args.pocket, but maybe in Release?
                spph = args.archive.getPublishedSources(
                    pocket='Release', **lookup)[0]

            try:
                args.archive.checkUpload(
                    component=spph.component_name,
                    distroseries=distro_series,
                    person=person,
                    pocket=pocket,
                    sourcepackagename=srcpkg,
                )
                print("%s (%s) can upload %s to %s/%s" % (
                    person.display_name, person.name,
                    srcpkg, distro_series.displayname, pocket))
            except launchpadlib.errors.HTTPError as e:
                # 403 is reported as "not allowed"; anything else is
                # shown as an error.
                if e.response.status == 403:
                    print("%s (%s) cannot upload %s to %s/%s" % (
                        person.display_name, person.name,
                        srcpkg, distro_series.displayname, pocket))
                else:
                    print("There was a %s error:" % e.response.status)
                    print(e.content)
def main(args):
    """Dispatch to the handler for ``args.action``.

    Raises AssertionError for an unknown action.  The handlers' return
    values are discarded.
    """
    if args.action == "query":
        do_query(args)
    elif args.action == "add":
        do_add(args)
    elif args.action in ("delete", "remove"):
        do_delete(args)
    elif args.action == "create":
        do_create(args)
    elif args.action == "modify":
        do_modify(args)
    elif args.action == "copy":
        do_copy(args)
    elif args.action == "check":
        do_check(args)
    else:
        raise AssertionError("Invalid action %s" % args.action)
if __name__ == '__main__':
    # Command-line entry point: parse and sanity-check the options, log in
    # to Launchpad, resolve archive/distribution/series into API objects
    # (stored back on ``args``) and run the requested action.  ``launchpad``
    # is deliberately a module-level name: the do_* handlers read it.
    parser = argparse.ArgumentParser(
        usage="%(prog)s [options] "
              "query|add|delete|create|modify|copy|check",
        epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION)

    parser.add_argument(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_argument("-A", "--archive", dest="archive")
    parser.add_argument("-S", "--series", dest="series")
    parser.add_argument("-p", "--person", dest="person", action="append")
    parser.add_argument("-c", "--component", dest="component")
    parser.add_argument(
        "-P", "--packageset", dest="packageset", action="append")
    parser.add_argument("-s", "--source", dest="source", action="append")
    parser.add_argument("--pocket", dest="pocket")
    parser.add_argument(
        "-t", "--acl-type", dest="acl_type", help="ACL type: upload or admin")
    parser.add_argument(
        "--anon", dest="anon_login", action="store_true", default=False,
        help="Login anonymously to Launchpad")
    parser.add_argument(
        "action",
        # "remove" is accepted as an alias of "delete", matching main().
        choices=(
            "query", "add", "delete", "remove", "create", "modify", "copy",
            "check"),
        help="action to perform")

    # Deprecated in favour of -A.
    parser.add_argument(
        "-d", "--distribution", dest="distro", default="ubuntu",
        help=argparse.SUPPRESS)

    args = parser.parse_args()

    if args.anon_login and args.action not in ('query', 'check'):
        print("E: Anonymous login not supported for this action.")
        sys.exit(1)

    if (args.action != 'query' and
            not args.person and not args.component and
            not args.packageset and not args.source and
            not args.pocket):
        parser.error("Provide at least one of "
                     "person/component/packageset/source/pocket")
    if args.packageset and not args.series:
        parser.error("Package set requires an associated series")
    if args.acl_type and args.acl_type not in ('upload', 'admin'):
        parser.error("Invalid ACL type '%s' (valid: 'upload', 'admin')" %
                     args.acl_type)
    if args.acl_type == 'admin' and args.packageset:
        parser.error("ACL type admin not allowed for package sets")
    if args.acl_type == 'admin' and args.source:
        parser.error("ACL type admin not allowed for source packages")
    if args.pocket:
        # Normalise capitalisation, e.g. "release" -> "Release".
        args.pocket = args.pocket.title()

    if args.anon_login:
        launchpad = Launchpad.login_anonymously(
            CONSUMER_KEY, args.launchpad_instance, version="devel")
    else:
        launchpad = Launchpad.login_with(
            CONSUMER_KEY, args.launchpad_instance, version="devel")

    # Replace the option strings with the corresponding API objects.
    args.archive = get_archive(args, launchpad)
    args.distro = args.archive.distribution
    if args.series is not None:
        args.series = args.distro.getSeries(name_or_version=args.series)

    try:
        main(args)
    except launchpadlib.errors.HTTPError as err:
        print("There was a %s error:" % err.response.status)
        print(err.content)