#!/usr/bin/python # -*- coding: utf-8 -*- # Copyright (C) 2008, 2009, 2010, 2011, 2012 Canonical Ltd. # Copyright (C) 2010 Stéphane Graber # Copyright (C) 2010 Michael Bienia # Copyright (C) 2011 Iain Lane # Copyright (C) 2011 Soren Hansen # Copyright (C) 2012 Stefano Rivera # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """Edit uploader permissions for the Ubuntu distro in Launchpad.""" from __future__ import print_function from optparse import OptionParser, SUPPRESS_HELP import sys import launchpadlib.errors from launchpadlib.launchpad import Launchpad import lputils if sys.version < '3': input = raw_input CONSUMER_KEY = "edit-acl" def print_perms(perms, series=None): for perm in perms: if (series is not None and perm.distro_series_name is not None and series.name != perm.distro_series_name): continue desc = [] desc.append("archive '%s'" % perm.archive.name) if perm.component_name: desc.append("component '%s'" % perm.component_name) if series: desc[-1] += ' in %s' % series if perm.package_set_name: desc.append("package set '%s' in %s" % (perm.package_set_name, perm.distro_series_name)) if perm.source_package_name: desc.append("source package '%s'" % perm.source_package_name) if perm.pocket: desc.append("pocket '%s'" % perm.pocket) if perm.distro_series_name is not None: desc[-1] += " in %s" % perm.distro_series_name print("%s for %s: %s" % (perm.permission, perm.person.name, ', '.join(desc))) def multiline_input(prompt): print(prompt) print("End with a line containing only a full-stop '.'") buf = [] while True: line = input() if line == '.': return '\n'.join(buf) buf.append(line) def get_archive(options, launchpad): # We default to looking up by archive reference (ubuntu, # ubuntu/partner or ~owner/ubuntu/ppa). if options.archive is not None: archive = launchpad.archives.getByReference(reference=options.archive) if archive is not None: return archive # But we also still support combining a distro name in -d and an # archive name or old PPA reference in -A (-d ubuntu, # -d ubuntu -A partner, or -d ubuntu -A owner/ppa). distro = launchpad.distributions[options.distro] if options.archive is None: return distro.main_archive else: if '/' in options.archive: owner, ppa_name = options.archive.split('/') return launchpad.people[owner].getPPAByName( distribution=distro, name=ppa_name) for archive in distro.archives: if archive.name == options.archive: return archive raise AssertionError("No such archive in Ubuntu: %s" % options.archive) def get_source_components(options, launchpad, archive, source): try: from debian import debian_support except ImportError: from debian_bundle import debian_support args = {} if options.series: args['distro_series'] = options.series newest = {} for spph in archive.getPublishedSources( source_name=source, exact_match=True, status='Published', **args): if not spph.distro_series.active: continue new_version = debian_support.Version(spph.source_package_version) if (spph.distro_series.name not in newest or new_version > newest[spph.distro_series.name][0]): newest[spph.distro_series.name] = (new_version, spph.component_name) for series in sorted(newest, key=lambda s: newest[s][0]): yield series, newest[series][1] permission_names = dict(upload='Archive Upload Rights', admin='Queue Administration Rights') def do_query(options): """Query existing permissions and show on stdout.""" if options.archive.self_link == options.distro.main_archive_link: archives = options.distro.archives else: archives = [options.archive] if options.person: for person in options.person: if '@' in person: lp_person = launchpad.people.getByEmail(email=person) else: try: lp_person = launchpad.people[person] except KeyError: print("Person '%s' doesn't exist." % person) sys.exit(1) perms = [] for archive in archives: perms.extend(archive.getPermissionsForPerson( person=lp_person)) if options.acl_type: perm_name = permission_names[options.acl_type] perms = [p for p in perms if p.permission == perm_name] print("== All rights for %s ==" % lp_person.name) print_perms(perms, options.series) if options.component: perms = [] if not options.acl_type or options.acl_type == 'upload': for archive in archives: perms.extend(archive.getUploadersForComponent( component_name=options.component)) if not options.acl_type or options.acl_type == 'admin': for archive in archives: perms.extend(archive.getQueueAdminsForComponent( component_name=options.component)) print("== All rights for component '%s' ==" % options.component) print_perms(perms, options.series) if options.packageset: for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=options.series) perms = [] for archive in archives: perms.extend(archive.getUploadersForPackageset( packageset=lp_set)) print(("== All uploaders for package set '%s' in '%s' " "(owned by '%s') ==" % (packageset, options.series.name, lp_set.owner.display_name))) print_perms(perms, options.series) sources = sorted(lp_set.getSourcesIncluded(direct_inclusion=True)) if sources: print() print("== All source packages in package set '%s' " "in '%s' ==" % (packageset, options.series.name)) for source in sources: print(source) child_sets = list(lp_set.setsIncluded(direct_inclusion=True)) if child_sets: print() print("== All package sets in package set '%s' in '%s' ==" % (packageset, options.series.name)) for child_set in child_sets: print(child_set.name) if options.source: for source in options.source: perms = [] perms_set = [] for archive in archives: perms.extend(archive.getUploadersForPackage( source_package_name=source)) perms_set.extend(archive.getPackagesetsForSource( sourcepackagename=source)) print("== All uploaders for package '%s' ==" % source) print_perms(perms, options.series) print_perms(perms_set, options.series) for archive in archives: for series, component in get_source_components( options, launchpad, archive, source): perms_component = archive.getUploadersForComponent( component_name=component) print_perms(perms_component, series=series) if options.pocket: perms = [] if not options.acl_type or options.acl_type == 'upload': for archive in archives: perms.extend(archive.getUploadersForPocket( pocket=options.pocket)) if not options.acl_type or options.acl_type == 'admin': for archive in archives: perms.extend(archive.getQueueAdminsForPocket( pocket=options.pocket)) print("== All rights for pocket '%s' ==" % options.pocket) print_perms(perms, options.series) if (not options.person and not options.component and not options.packageset and not options.source and not options.pocket): perms = [] for archive in archives: perms.extend(archive.getAllPermissions()) if options.acl_type: perm_name = permission_names[options.acl_type] perms = [p for p in perms if p.permission == perm_name] print("== All rights ==") print_perms(perms, options.series) def validate_add_delete_options(options, requires_person=True): if options.packageset and options.source: # Special options to manage package sets, bodged into this tool # since they aren't entirely inconvenient here. if options.component or options.person: print("-P -s cannot be used with a " "component or person as well") return False return True if requires_person and not options.person: print("You must specify at least one person to (de-)authorise.") return False count = 0 if options.component: count += 1 if options.packageset: count += 1 if options.source: count += 1 if options.pocket: count += 1 if count > 1: print("You can only specify one of package set, source, component, " "or pocket") return False if count == 0: print("You must specify one of package set, source, component, or " "pocket") return False return True def do_add(options): """Add a new permission.""" if not validate_add_delete_options(options): return False if options.packageset and options.source: for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=options.series) lp_set.addSources(names=options.source) print("Added:") for source in options.source: print(source) return people = [launchpad.people[person] for person in options.person] if options.source: for source in options.source: for person in people: perm = options.archive.newPackageUploader( person=person, source_package_name=source) print("Added:") print_perms([perm]) return if options.packageset: for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=options.series) for person in people: perm = options.archive.newPackagesetUploader( person=person, packageset=lp_set) print("Added:") print_perms([perm]) return if options.component: for person in people: if not options.acl_type or options.acl_type == 'upload': perm = options.archive.newComponentUploader( person=person, component_name=options.component) else: perm = options.archive.newQueueAdmin( person=person, component_name=options.component) print("Added:") print_perms([perm]) return if options.pocket: admin_kwargs = {} if options.series: admin_kwargs["distroseries"] = options.series for person in people: if not options.acl_type or options.acl_type == 'upload': perm = options.archive.newPocketUploader( person=person, pocket=options.pocket) else: perm = options.archive.newPocketQueueAdmin( person=person, pocket=options.pocket, **admin_kwargs) print("Added:") print_perms([perm]) return def do_delete(options): """Delete a permission.""" # We kind of hacked packageset management into here. # Deleting packagesets doesn't require a person... requires_person = not (options.packageset and not options.source) if not validate_add_delete_options(options, requires_person): return False if options.packageset and options.source: for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=options.series) lp_set.removeSources(names=options.source) print("Deleted:") for source in options.source: print(source) return if options.packageset and not options.person: for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=options.series) uploaders = options.archive.getUploadersForPackageset( direct_permissions=True, packageset=lp_set) if len(uploaders) > 0: print("Cannot delete packageset with defined uploaders") print("Current uploaders:") for permission in uploaders: print(" %s" % permission.person.name) continue print("Confirm removal of packageset '%s'" % lp_set.name) print("Description:") print(" " + lp_set.description.replace("\n", "\n ")) print("Containing Sources:") for member in lp_set.getSourcesIncluded(): print(" %s" % member) print("Containing packagesets:") for member in lp_set.setsIncluded(): print(" %s" % member.name) ack = input("Remove? (y/N): ") if ack.lower() == 'y': lp_set.lp_delete() print("Deleted %s/%s" % (lp_set.name, options.series.name)) return lp_people = [launchpad.people[person] for person in options.person] if options.source: for source in options.source: for lp_person in lp_people: try: options.archive.deletePackageUploader( person=lp_person, source_package_name=source) print("Deleted %s/%s" % (lp_person.name, source)) except Exception: print("Failed to delete %s/%s" % (lp_person.name, source)) return if options.packageset: for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=options.series) for lp_person in lp_people: options.archive.deletePackagesetUploader( person=lp_person, packageset=lp_set) print("Deleted %s/%s/%s" % (lp_person.name, packageset, options.series.name)) return if options.component: for lp_person in lp_people: if not options.acl_type or options.acl_type == 'upload': options.archive.deleteComponentUploader( person=lp_person, component_name=options.component) print("Deleted %s/%s" % (lp_person.name, options.component)) else: options.archive.deleteQueueAdmin( person=lp_person, component_name=options.component) print("Deleted %s/%s (admin)" % (lp_person.name, options.component)) return if options.pocket: admin_kwargs = {} if options.series: admin_kwargs["distroseries"] = options.series for lp_person in lp_people: if not options.acl_type or options.acl_type == 'upload': options.archive.deletePocketUploader( person=lp_person, pocket=options.pocket) print("Deleted %s/%s" % (lp_person.name, options.pocket)) else: options.archive.deletePocketQueueAdmin( person=lp_person, pocket=options.pocket, **admin_kwargs) if options.series: print( "Deleted %s/%s/%s (admin)" % (lp_person.name, options.pocket, options.series.name)) else: print("Deleted %s/%s (admin)" % (lp_person.name, options.pocket)) return def do_create(options): if not options.packageset: print("You can only create a package set, not something else.") return False if not options.person or len(options.person) != 1: print("You must specify exactly one person to own the new package " "set.") return False distro_series = options.series or options.distro.current_series lp_person = launchpad.people[options.person[0]] for packageset in options.packageset: try: if launchpad.packagesets.getByName( name=packageset, distroseries=distro_series): print("Package set %s already exists" % packageset) continue except (TypeError, launchpadlib.errors.HTTPError): pass desc = multiline_input("Description for new package set %s:" % packageset) ps = launchpad.packagesets.new( name=packageset, description=desc, distroseries=distro_series, owner=lp_person) print(ps) def do_modify(options): if not options.packageset: print("You can only modify a package set, not something else.") return False if options.person and len(options.person) > 1: print("You can only specify one person as the new packageset owner.") return False distro_series = options.series or options.distro.current_series lp_person = None if options.person: lp_person = launchpad.people[options.person[0]] for packageset in options.packageset: lp_set = launchpad.packagesets.getByName( name=packageset, distroseries=distro_series) if lp_person: print("Making %s the owner of %s/%s" % (lp_person.name, lp_set.name, distro_series.name)) lp_set.owner = lp_person lp_set.lp_save() continue print("Current description of %s:" % lp_set.name) print(" " + lp_set.description.replace("\n", "\n ")) desc = multiline_input("New description [blank=leave unmodified]:") if desc: print("Modifying description of %s/%s" % (lp_set.name, distro_series.name)) lp_set.description = desc lp_set.lp_save() continue rename = input("Rename %s to? [blank=don't rename]: " % lp_set.name) if rename: print("Renaming %s/%s -> %s" % (lp_set.name, distro_series.name, rename)) lp_set.name = rename lp_set.lp_save() continue def do_copy(options): if options.archive.self_link == options.distro.main_archive_link: archives = options.distro.archives else: archives = [options.archive] if not options.packageset: print("You can only copy a package set, not something else.") return False distro_series = options.series or options.distro.current_series dst = input("Name of the destination series: ") dst_series = options.distro.getSeries(name_or_version=dst) for packageset in options.packageset: src_pkgset = launchpad.packagesets.getByName( name=packageset, distroseries=distro_series) if not src_pkgset: print("Package set %s doesn't exist" % packageset) ps = launchpad.packagesets.new( name=packageset, description=src_pkgset.description, distroseries=dst_series, owner=src_pkgset.owner_link, related_set=src_pkgset) print(ps) ps.addSources(names=src_pkgset.getSourcesIncluded()) perms = [] for archive in archives: perms.extend(archive.getUploadersForPackageset( packageset=src_pkgset)) for perm in perms: perm.archive.newPackagesetUploader( person=perm.person_link, packageset=ps) def do_check(options): """Check if a person can upload a package.""" if not options.person: print("A person needs to be specified to check.") return False if not options.source: print("A source package needs to be specified to check.") return False people = [launchpad.people[person] for person in options.person] distro_series = options.series or options.distro.current_series if options.pocket: pocket = options.pocket else: pocket = 'Release' for person in people: for srcpkg in options.source: try: spph = options.archive.getPublishedSources( distro_series=distro_series, exact_match=True, pocket=pocket, source_name=srcpkg, status='Published', )[0] except IndexError: if not options.pocket: raise # Not yet in options.pocket, but maybe in Release? spph = options.archive.getPublishedSources( distro_series=distro_series, exact_match=True, pocket='Release', source_name=srcpkg, status='Published', )[0] try: options.archive.checkUpload( component=spph.component_name, distroseries=distro_series, person=person, pocket=pocket, sourcepackagename=srcpkg, ) print("%s (%s) can upload %s to %s/%s" % ( person.display_name, person.name, srcpkg, distro_series.displayname, pocket)) except launchpadlib.errors.HTTPError as e: if e.response.status == 403: print("%s (%s) cannot upload %s to %s/%s" % ( person.display_name, person.name, srcpkg, distro_series.displayname, pocket)) else: print("There was a %s error:" % e.response.status) print(e.content) def main(options, action): if action == "query": do_query(options) elif action == "add": do_add(options) elif action in ("delete", "remove"): do_delete(options) elif action == "create": do_create(options) elif action == "modify": do_modify(options) elif action == "copy": do_copy(options) elif action == "check": do_check(options) else: raise AssertionError("Invalid action %s" % action) return if __name__ == '__main__': parser = OptionParser( usage="usage: %prog [options] " "query|add|delete|create|modify|copy|check", epilog=lputils.ARCHIVE_REFERENCE_DESCRIPTION) parser.add_option( "-l", "--launchpad", dest="launchpad_instance", default="production") parser.add_option("-A", "--archive", dest="archive") parser.add_option("-S", "--series", dest="series") parser.add_option("-p", "--person", dest="person", action="append") parser.add_option("-c", "--component", dest="component") parser.add_option("-P", "--packageset", dest="packageset", action="append") parser.add_option("-s", "--source", dest="source", action="append") parser.add_option("--pocket", dest="pocket") parser.add_option("-t", "--acl-type", dest="acl_type", help="ACL type: upload or admin") parser.add_option("--anon", dest="anon_login", action="store_true", default=False, help="Login anonymously to Launchpad") # Deprecated in favour of -A. parser.add_option( "-d", "--distribution", dest="distro", default="ubuntu", help=SUPPRESS_HELP) options, args = parser.parse_args() possible_actions = ('query', 'add', 'delete', 'create', 'copy', 'check') if len(args) != 1: parser.error( "You must specify an action, one of:\n%s" % ", ".join(possible_actions)) if options.anon_login and args[0] not in ('query', 'check'): print("E: Anonymous login not supported for this action.") sys.exit(1) if (args[0] != 'query' and not options.person and not options.component and not options.packageset and not options.source and not options.pocket): parser.error("Provide at least one of " "person/component/packageset/source/pocket") if options.packageset and not options.series: parser.error("Package set requires an associated series") if options.acl_type and options.acl_type not in ('upload', 'admin'): parser.error("Invalid ACL type '%s' (valid: 'upload', 'admin')" % options.acl_type) if options.acl_type == 'admin' and options.packageset: parser.error("ACL type admin not allowed for package sets") if options.acl_type == 'admin' and options.source: parser.error("ACL type admin not allowed for source packages") if options.pocket: options.pocket = options.pocket.title() if options.anon_login: launchpad = Launchpad.login_anonymously( CONSUMER_KEY, options.launchpad_instance, version="devel") else: launchpad = Launchpad.login_with( CONSUMER_KEY, options.launchpad_instance, version="devel") options.archive = get_archive(options, launchpad) options.distro = options.archive.distribution if options.series is not None: options.series = options.distro.getSeries( name_or_version=options.series) try: main(options, args[0]) except launchpadlib.errors.HTTPError as err: print("There was a %s error:" % err.response.status) print(err.content)