#!/usr/bin/python3
# -*- coding: utf-8 -*-

# Copyright (C) 2008, 2009, 2010, 2011, 2012 Canonical Ltd.
# Copyright (C) 2010 Stéphane Graber
# Copyright (C) 2010 Michael Bienia
# Copyright (C) 2011 Iain Lane
# Copyright (C) 2011 Soren Hansen
# Copyright (C) 2012 Stefano Rivera

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Edit uploader permissions for the Ubuntu distro in Launchpad."""

from __future__ import print_function

import argparse
import sys

import launchpadlib.errors
from launchpadlib.launchpad import Launchpad

import lputils


# Compare the numeric major version: comparing the version *string*
# against '3' is lexical and would misclassify e.g. a "10.x" interpreter.
if sys.version_info[0] < 3:
    input = raw_input


CONSUMER_KEY = "edit-acl"


def print_perms(perms, series=None):
    """Print one descriptive line for each permission in perms.

    When series is given, permissions that carry a distro_series_name
    different from series.name are skipped.

    NOTE(review): the filter reads series.name, but do_query also passes
    the plain series-name strings yielded by get_source_components; that
    only works while those permissions have no distro_series_name.
    """
    for perm in perms:
        if (series is not None and perm.distro_series_name is not None and
                series.name != perm.distro_series_name):
            continue
        desc = []
        desc.append("archive '%s'" % perm.archive.name)
        if perm.component_name:
            desc.append("component '%s'" % perm.component_name)
            if series:
                desc[-1] += ' in %s' % series
        if perm.package_set_name:
            desc.append("package set '%s' in %s" % (
                perm.package_set_name, perm.distro_series_name))
        if perm.source_package_name:
            desc.append("source package '%s'" % perm.source_package_name)
        if perm.pocket:
            desc.append("pocket '%s'" % perm.pocket)
            if perm.distro_series_name is not None:
                desc[-1] += " in %s" % perm.distro_series_name
        print("%s for %s: %s" % (
            perm.permission, perm.person.name, ', '.join(desc)))


def multiline_input(prompt):
    """Print prompt, then read lines from stdin until a lone '.' line.

    Returns the lines read before the terminator, joined with newlines.
    """
    print(prompt)
    print("End with a line containing only a full-stop '.'")
    buf = []
    while True:
        line = input()
        if line == '.':
            return '\n'.join(buf)
        buf.append(line)


def get_archive(args, launchpad):
    """Resolve the archive selected by args.archive and args.distro.

    Raises AssertionError when a bare archive name matches none of the
    distribution's archives.
    """
    # We default to looking up by archive reference (ubuntu,
    # ubuntu/partner or ~owner/ubuntu/ppa).
    if args.archive is not None:
        archive = launchpad.archives.getByReference(reference=args.archive)
        if archive is not None:
            return archive

    # But we also still support combining a distro name in -d and an
    # archive name or old PPA reference in -A (-d ubuntu,
    # -d ubuntu -A partner, or -d ubuntu -A owner/ppa).
    distro = launchpad.distributions[args.distro]
    if args.archive is None:
        return distro.main_archive
    else:
        if '/' in args.archive:
            owner, ppa_name = args.archive.split('/')
            return launchpad.people[owner].getPPAByName(
                distribution=distro, name=ppa_name)
        for archive in distro.archives:
            if archive.name == args.archive:
                return archive
        raise AssertionError("No such archive in Ubuntu: %s" % args.archive)


def get_source_components(args, launchpad, archive, source):
    """Yield (series name, component name) for source's newest versions.

    Only 'Published' publications in active series are considered
    (restricted to args.series when set); for each series the component
    of the highest version wins.  Pairs are yielded in ascending order
    of that version.
    """
    try:
        from debian import debian_support
    except ImportError:
        from debian_bundle import debian_support

    kwargs = {}
    if args.series:
        kwargs['distro_series'] = args.series

    # Maps series name -> (newest version seen, its component name).
    newest = {}
    for spph in archive.getPublishedSources(
            source_name=source, exact_match=True, status='Published',
            **kwargs):
        if not spph.distro_series.active:
            continue
        new_version = debian_support.Version(spph.source_package_version)
        if (spph.distro_series.name not in newest or
                new_version > newest[spph.distro_series.name][0]):
            newest[spph.distro_series.name] = (
                new_version, spph.component_name)

    for series in sorted(newest, key=lambda s: newest[s][0]):
        yield series, newest[series][1]


permission_names = dict(upload='Archive Upload Rights',
                        admin='Queue Administration Rights')


def do_query(args):
    """Query existing permissions and show on stdout.

    Exits with status 1 when a requested person cannot be found.
    """
    # Querying the distribution's main archive covers all of its archives.
    if args.archive.self_link == args.distro.main_archive_link:
        archives = args.distro.archives
    else:
        archives = [args.archive]

    if args.person:
        for person in args.person:
            if '@' in person:
                lp_person = launchpad.people.getByEmail(email=person)
                # Guard against an unknown address instead of crashing
                # on lp_person.name below.
                if lp_person is None:
                    print("Person '%s' doesn't exist." % person)
                    sys.exit(1)
            else:
                try:
                    lp_person = launchpad.people[person]
                except KeyError:
                    print("Person '%s' doesn't exist." % person)
                    sys.exit(1)
            perms = []
            for archive in archives:
                perms.extend(archive.getPermissionsForPerson(
                    person=lp_person))
            if args.acl_type:
                perm_name = permission_names[args.acl_type]
                perms = [p for p in perms if p.permission == perm_name]
            print("== All rights for %s ==" % lp_person.name)
            print_perms(perms, args.series)

    if args.component:
        perms = []
        if not args.acl_type or args.acl_type == 'upload':
            for archive in archives:
                perms.extend(archive.getUploadersForComponent(
                    component_name=args.component))
        if not args.acl_type or args.acl_type == 'admin':
            for archive in archives:
                perms.extend(archive.getQueueAdminsForComponent(
                    component_name=args.component))
        print("== All rights for component '%s' ==" % args.component)
        print_perms(perms, args.series)

    if args.packageset:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)

            perms = []
            for archive in archives:
                perms.extend(archive.getUploadersForPackageset(
                    packageset=lp_set))
            print(("== All uploaders for package set '%s' in '%s' "
                   "(owned by '%s') ==" %
                   (packageset, args.series.name,
                    lp_set.owner.display_name)))
            print_perms(perms, args.series)

            sources = sorted(lp_set.getSourcesIncluded(direct_inclusion=True))
            if sources:
                print()
                print("== All source packages in package set '%s' "
                      "in '%s' ==" % (packageset, args.series.name))
                for source in sources:
                    print(source)
            child_sets = list(lp_set.setsIncluded(direct_inclusion=True))
            if child_sets:
                print()
                print("== All package sets in package set '%s' in '%s' ==" %
                      (packageset, args.series.name))
                for child_set in child_sets:
                    print(child_set.name)

    if args.source:
        for source in args.source:
            perms = []
            perms_set = []
            for archive in archives:
                perms.extend(archive.getUploadersForPackage(
                    source_package_name=source))
                perms_set.extend(archive.getPackagesetsForSource(
                    sourcepackagename=source))
            print("== All uploaders for package '%s' ==" % source)
            print_perms(perms, args.series)
            print_perms(perms_set, args.series)
            # Also list uploaders of the component(s) the package is
            # currently published in.
            for archive in archives:
                for series, component in get_source_components(
                        args, launchpad, archive, source):
                    perms_component = archive.getUploadersForComponent(
                        component_name=component)
                    print_perms(perms_component, series=series)

    if args.pocket:
        perms = []
        if not args.acl_type or args.acl_type == 'upload':
            for archive in archives:
                perms.extend(archive.getUploadersForPocket(pocket=args.pocket))
        if not args.acl_type or args.acl_type == 'admin':
            for archive in archives:
                perms.extend(archive.getQueueAdminsForPocket(
                    pocket=args.pocket))
        print("== All rights for pocket '%s' ==" % args.pocket)
        print_perms(perms, args.series)

    if (not args.person and not args.component and not args.packageset and
            not args.source and not args.pocket):
        perms = []
        for archive in archives:
            perms.extend(archive.getAllPermissions())
        if args.acl_type:
            perm_name = permission_names[args.acl_type]
            perms = [p for p in perms if p.permission == perm_name]
        print("== All rights ==")
        print_perms(perms, args.series)


def validate_add_delete_options(args, requires_person=True):
    """Check the option combination for the add and delete actions.

    Prints an explanation and returns False when the combination is not
    acceptable; returns True otherwise.
    """
    if args.packageset and args.source:
        # Special options to manage package sets, bodged into this tool
        # since they aren't entirely inconvenient here.
        if args.component or args.person:
            print("-P <packageset> -s <source> cannot be used with a "
                  "component or person as well")
            return False
        return True

    if requires_person and not args.person:
        print("You must specify at least one person to (de-)authorise.")
        return False

    count = 0
    if args.component:
        count += 1
    if args.packageset:
        count += 1
    if args.source:
        count += 1
    if args.pocket:
        count += 1
    if count > 1:
        print("You can only specify one of package set, source, component, "
              "or pocket")
        return False

    if count == 0:
        print("You must specify one of package set, source, component, or "
              "pocket")
        return False

    return True


def do_add(args):
    """Add a new permission.

    With both package sets and sources given, the sources are added to
    the package sets instead.  Returns False when option validation
    fails, None otherwise.
    """
    if not validate_add_delete_options(args):
        return False

    if args.packageset and args.source:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            lp_set.addSources(names=args.source)
            print("Added:")
            for source in args.source:
                print(source)
        return

    people = [launchpad.people[person] for person in args.person]

    if args.source:
        for source in args.source:
            for person in people:
                perm = args.archive.newPackageUploader(
                    person=person, source_package_name=source)
                print("Added:")
                print_perms([perm])
        return

    if args.packageset:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            for person in people:
                perm = args.archive.newPackagesetUploader(
                    person=person, packageset=lp_set)
                print("Added:")
                print_perms([perm])
        return

    if args.component:
        for person in people:
            if not args.acl_type or args.acl_type == 'upload':
                perm = args.archive.newComponentUploader(
                    person=person, component_name=args.component)
            else:
                perm = args.archive.newQueueAdmin(
                    person=person, component_name=args.component)
            print("Added:")
            print_perms([perm])
        return

    if args.pocket:
        admin_kwargs = {}
        if args.series:
            admin_kwargs["distroseries"] = args.series
        for person in people:
            if not args.acl_type or args.acl_type == 'upload':
                perm = args.archive.newPocketUploader(
                    person=person, pocket=args.pocket)
            else:
                perm = args.archive.newPocketQueueAdmin(
                    person=person, pocket=args.pocket, **admin_kwargs)
            print("Added:")
            print_perms([perm])
        return


def do_delete(args):
    """Delete a permission.

    With package sets and sources given, the sources are removed from
    the package sets; with package sets and no person, the package sets
    themselves are deleted after interactive confirmation.  Returns
    False when option validation fails, None otherwise.
    """
    # We kind of hacked packageset management into here.
    # Deleting packagesets doesn't require a person...
    requires_person = not (args.packageset and not args.source)
    if not validate_add_delete_options(args, requires_person):
        return False

    if args.packageset and args.source:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            lp_set.removeSources(names=args.source)
            print("Deleted:")
            for source in args.source:
                print(source)
        return

    if args.packageset and not args.person:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            uploaders = args.archive.getUploadersForPackageset(
                direct_permissions=True, packageset=lp_set)
            if len(uploaders) > 0:
                print("Cannot delete packageset with defined uploaders")
                print("Current uploaders:")
                for permission in uploaders:
                    print(" %s" % permission.person.name)
                continue
            print("Confirm removal of packageset '%s'" % lp_set.name)
            print("Description:")
            print(" " + lp_set.description.replace("\n", "\n "))
            print("Containing Sources:")
            for member in lp_set.getSourcesIncluded():
                print(" %s" % member)
            print("Containing packagesets:")
            for member in lp_set.setsIncluded():
                print(" %s" % member.name)
            ack = input("Remove? (y/N): ")
            if ack.lower() == 'y':
                lp_set.lp_delete()
                print("Deleted %s/%s" % (lp_set.name, args.series.name))
        return

    lp_people = [launchpad.people[person] for person in args.person]

    if args.source:
        for source in args.source:
            for lp_person in lp_people:
                # Best effort: report a failed deletion and carry on
                # with the remaining person/source pairs.
                try:
                    args.archive.deletePackageUploader(
                        person=lp_person, source_package_name=source)
                    print("Deleted %s/%s" % (lp_person.name, source))
                except Exception:
                    print("Failed to delete %s/%s" % (lp_person.name, source))
        return

    if args.packageset:
        for packageset in args.packageset:
            lp_set = launchpad.packagesets.getByName(
                name=packageset, distroseries=args.series)
            for lp_person in lp_people:
                args.archive.deletePackagesetUploader(
                    person=lp_person, packageset=lp_set)
                print("Deleted %s/%s/%s" % (lp_person.name, packageset,
                                            args.series.name))
        return

    if args.component:
        for lp_person in lp_people:
            if not args.acl_type or args.acl_type == 'upload':
                args.archive.deleteComponentUploader(
                    person=lp_person, component_name=args.component)
                print("Deleted %s/%s" % (lp_person.name, args.component))
            else:
                args.archive.deleteQueueAdmin(
                    person=lp_person, component_name=args.component)
                print("Deleted %s/%s (admin)" % (lp_person.name,
                                                 args.component))
        return

    if args.pocket:
        admin_kwargs = {}
        if args.series:
            admin_kwargs["distroseries"] = args.series
        for lp_person in lp_people:
            if not args.acl_type or args.acl_type == 'upload':
                args.archive.deletePocketUploader(
                    person=lp_person, pocket=args.pocket)
                print("Deleted %s/%s" % (lp_person.name, args.pocket))
            else:
                args.archive.deletePocketQueueAdmin(
                    person=lp_person, pocket=args.pocket, **admin_kwargs)
                if args.series:
                    print(
                        "Deleted %s/%s/%s (admin)" %
                        (lp_person.name, args.pocket, args.series.name))
                else:
                    print("Deleted %s/%s (admin)" %
                          (lp_person.name, args.pocket))
        return


def do_create(args):
    """Create the requested package sets, owned by the one given person.

    The description of each new set is read interactively.  Package sets
    that already exist are reported and skipped.  Returns False when the
    options are not acceptable, None otherwise.
    """
    if not args.packageset:
        print("You can only create a package set, not something else.")
        return False

    if not args.person or len(args.person) != 1:
        print("You must specify exactly one person to own the new package "
              "set.")
        return False

    distro_series = args.series or args.distro.current_series
    lp_person = launchpad.people[args.person[0]]

    for packageset in args.packageset:
        # A failing lookup is treated as "does not exist yet".
        try:
            if launchpad.packagesets.getByName(
                    name=packageset, distroseries=distro_series):
                print("Package set %s already exists" % packageset)
                continue
        except (TypeError, launchpadlib.errors.HTTPError):
            pass
        desc = multiline_input("Description for new package set %s:"
                               % packageset)
        ps = launchpad.packagesets.new(
            name=packageset, description=desc, distroseries=distro_series,
            owner=lp_person)
        print(ps)


def do_modify(args):
    """Change the owner, description or name of package sets.

    With a person given, that person becomes the owner.  Otherwise a new
    description is asked for and, if that is left blank, a new name.
    Only the first change made is saved for each package set.  Returns
    False when the options are not acceptable, None otherwise.
    """
    if not args.packageset:
        print("You can only modify a package set, not something else.")
        return False

    if args.person and len(args.person) > 1:
        print("You can only specify one person as the new packageset owner.")
        return False

    distro_series = args.series or args.distro.current_series

    lp_person = None
    if args.person:
        lp_person = launchpad.people[args.person[0]]

    for packageset in args.packageset:
        lp_set = launchpad.packagesets.getByName(
            name=packageset, distroseries=distro_series)
        if lp_person:
            print("Making %s the owner of %s/%s"
                  % (lp_person.name, lp_set.name, distro_series.name))
            lp_set.owner = lp_person
            lp_set.lp_save()
            continue

        print("Current description of %s:" % lp_set.name)
        print(" " + lp_set.description.replace("\n", "\n "))
        desc = multiline_input("New description [blank=leave unmodified]:")
        if desc:
            print("Modifying description of %s/%s"
                  % (lp_set.name, distro_series.name))
            lp_set.description = desc
            lp_set.lp_save()
            continue

        rename = input("Rename %s to? [blank=don't rename]: " % lp_set.name)
        if rename:
            print("Renaming %s/%s -> %s"
                  % (lp_set.name, distro_series.name, rename))
            lp_set.name = rename
            lp_set.lp_save()
            continue


def do_copy(args):
    """Copy package sets, with sources and uploaders, to another series.

    The destination series name is read interactively.  Package sets
    that do not exist in the source series are reported and skipped.
    Returns False when no package set was given, None otherwise.
    """
    if args.archive.self_link == args.distro.main_archive_link:
        archives = args.distro.archives
    else:
        archives = [args.archive]

    if not args.packageset:
        print("You can only copy a package set, not something else.")
        return False

    distro_series = args.series or args.distro.current_series

    dst = input("Name of the destination series: ")
    dst_series = args.distro.getSeries(name_or_version=dst)

    for packageset in args.packageset:
        src_pkgset = launchpad.packagesets.getByName(
            name=packageset, distroseries=distro_series)
        if not src_pkgset:
            print("Package set %s doesn't exist" % packageset)
            # Nothing to copy from; do not dereference the missing set.
            continue

        ps = launchpad.packagesets.new(
            name=packageset, description=src_pkgset.description,
            distroseries=dst_series, owner=src_pkgset.owner_link,
            related_set=src_pkgset)
        print(ps)

        ps.addSources(names=src_pkgset.getSourcesIncluded())

        perms = []
        for archive in archives:
            perms.extend(archive.getUploadersForPackageset(
                packageset=src_pkgset))

        for perm in perms:
            perm.archive.newPackagesetUploader(
                person=perm.person_link, packageset=ps)


def do_check(args):
    """Check if a person can upload a package.

    Every person/source combination is checked against the selected
    pocket ('Release' by default) and the outcome is printed.  Returns
    False when no person or no source was given, None otherwise.
    """
    if not args.person:
        print("A person needs to be specified to check.")
        return False
    if not args.source:
        print("A source package needs to be specified to check.")
        return False

    people = [launchpad.people[person] for person in args.person]
    distro_series = args.series or args.distro.current_series

    if args.pocket:
        pocket = args.pocket
    else:
        pocket = 'Release'

    for person in people:
        for srcpkg in args.source:
            try:
                spph = args.archive.getPublishedSources(
                    distro_series=distro_series,
                    exact_match=True,
                    pocket=pocket,
                    source_name=srcpkg,
                    status='Published',
                )[0]
            except IndexError:
                if not args.pocket:
                    raise
                # Not yet in args.pocket, but maybe in Release?
                spph = args.archive.getPublishedSources(
                    distro_series=distro_series,
                    exact_match=True,
                    pocket='Release',
                    source_name=srcpkg,
                    status='Published',
                )[0]
            try:
                args.archive.checkUpload(
                    component=spph.component_name,
                    distroseries=distro_series,
                    person=person,
                    pocket=pocket,
                    sourcepackagename=srcpkg,
                )
                print("%s (%s) can upload %s to %s/%s" % (
                    person.display_name, person.name,
                    srcpkg, distro_series.displayname, pocket))
            except launchpadlib.errors.HTTPError as e:
                # A 403 response is reported as "cannot upload"; any
                # other status is shown as an error.
                if e.response.status == 403:
                    print("%s (%s) cannot upload %s to %s/%s" % (
                        person.display_name, person.name,
                        srcpkg, distro_series.displayname, pocket))
                else:
                    print("There was a %s error:" % e.response.status)
                    print(e.content)


def main(args):
    """Dispatch to the handler for args.action.

    Raises AssertionError for an unknown action.
    """
    if args.action == "query":
        do_query(args)
    elif args.action == "add":
        do_add(args)
    elif args.action in ("delete", "remove"):
        do_delete(args)
    elif args.action == "create":
        do_create(args)
    elif args.action == "modify":
        do_modify(args)
    elif args.action == "copy":
        do_copy(args)
    elif args.action == "check":
        do_check(args)
    else:
        raise AssertionError("Invalid action %s" % args.action)
    return


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        usage="%(prog)s [options] "
              "query|add|delete|create|modify|copy|check",
        epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION)

    parser.add_argument(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_argument("-A", "--archive", dest="archive")
    parser.add_argument("-S", "--series", dest="series")
    parser.add_argument("-p", "--person", dest="person", action="append")
    parser.add_argument("-c", "--component", dest="component")
    parser.add_argument(
        "-P", "--packageset", dest="packageset", action="append")
    parser.add_argument("-s", "--source", dest="source", action="append")
    parser.add_argument("--pocket", dest="pocket")
    parser.add_argument(
        "-t", "--acl-type", dest="acl_type",
        help="ACL type: upload or admin")
    parser.add_argument(
        "--anon", dest="anon_login", action="store_true", default=False,
        help="Login anonymously to Launchpad")
    parser.add_argument(
        "action",
        choices=(
            "query", "add", "delete", "create", "modify", "copy", "check"),
        help="action to perform")

    # Deprecated in favour of -A.
    parser.add_argument(
        "-d", "--distribution", dest="distro", default="ubuntu",
        help=argparse.SUPPRESS)

    args = parser.parse_args()

    if args.anon_login and args.action not in ('query', 'check'):
        print("E: Anonymous login not supported for this action.")
        sys.exit(1)

    if (args.action != 'query' and
            not args.person and not args.component and
            not args.packageset and not args.source and
            not args.pocket):
        parser.error("Provide at least one of "
                     "person/component/packageset/source/pocket")
    if args.packageset and not args.series:
        parser.error("Package set requires an associated series")
    if args.acl_type and args.acl_type not in ('upload', 'admin'):
        parser.error("Invalid ACL type '%s' (valid: 'upload', 'admin')" %
                     args.acl_type)
    if args.acl_type == 'admin' and args.packageset:
        parser.error("ACL type admin not allowed for package sets")
    if args.acl_type == 'admin' and args.source:
        parser.error("ACL type admin not allowed for source packages")
    if args.pocket:
        args.pocket = args.pocket.title()

    # 'launchpad' is deliberately a module-level name: the do_* handlers
    # above read it as a global.
    if args.anon_login:
        launchpad = Launchpad.login_anonymously(
            CONSUMER_KEY, args.launchpad_instance, version="devel")
    else:
        launchpad = Launchpad.login_with(
            CONSUMER_KEY, args.launchpad_instance, version="devel")

    # Replace the raw option strings with the Launchpad objects they name.
    args.archive = get_archive(args, launchpad)
    args.distro = args.archive.distribution
    if args.series is not None:
        args.series = args.distro.getSeries(name_or_version=args.series)

    try:
        main(args)
    except launchpadlib.errors.HTTPError as err:
        print("There was a %s error:" % err.response.status)
        print(err.content)