Update ubuntu-archive-tools.

master
Simon Quigley 4 years ago
parent 64dd1fc405
commit 363622addf

@ -1 +1 @@
1209 cjwatson@canonical.com-20190220074107-dvkdscxl2y2ww9j6
1324 steve.langasek@canonical.com-20200516195933-ymljxy32gq2m13p9

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2.7
# Check for override mismatches between architectures
# Copyright (C) 2005, 2008, 2009, 2010, 2011, 2012 Canonical Ltd.

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python2.7
# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3.

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python2.7
# Copyright (C) 2016 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python3
# Copyright (C) 2012 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>
@ -19,67 +19,74 @@
from __future__ import print_function
from optparse import OptionParser
import argparse
from launchpadlib.launchpad import Launchpad
import lazr.restfulclient.errors
def branch_livefses(options, owner):
for livefs in list(options.launchpad.livefses):
def branch_livefses(args, owner):
for livefs in list(args.launchpad.livefses):
if (livefs.owner == owner and
livefs.distro_series == options.source_series):
livefs.distro_series == args.source_series):
print("Branching %s for %s ..." % (
livefs.web_link, options.dest_series.name))
new_livefs = options.launchpad.livefses.new(
owner=owner, distro_series=options.dest_series,
name=livefs.name, metadata=livefs.metadata)
livefs.web_link, args.dest_series.name))
try:
new_livefs = args.launchpad.livefses.getByName(
owner=owner, distro_series=args.dest_series,
name=livefs.name)
except lazr.restfulclient.errors.NotFound:
new_livefs = args.launchpad.livefses.new(
owner=owner, distro_series=args.dest_series,
name=livefs.name, metadata=livefs.metadata)
new_livefs.require_virtualized = livefs.require_virtualized
new_livefs.relative_build_score = livefs.relative_build_score
new_livefs.lp_save()
try:
new_livefs.lp_save()
except lazr.restfulclient.errors.Unauthorized:
print("Could not devirt ppa, ask Launchpad team for support.")
pass
print(" %s" % new_livefs.web_link)
def main():
parser = OptionParser(usage="usage: %prog [options] OWNER")
parser.add_option(
parser = argparse.ArgumentParser()
parser.add_argument(
"-l", "--launchpad", dest="launchpad_instance", default="production")
parser.add_option(
parser.add_argument(
"-d", "--distribution", default="ubuntu", metavar="DISTRIBUTION",
help="branch live filesystems for DISTRIBUTION")
parser.add_option(
parser.add_argument(
"--source-series",
help="source series (default: current stable release)")
parser.add_option(
parser.add_argument(
"--dest-series",
help="destination series (default: series in pre-release freeze)")
options, args = parser.parse_args()
if not args:
parser.error(
"You must specify an owner whose live filesystems you want to "
"copy.")
options.launchpad = Launchpad.login_with(
"branch-livefses", options.launchpad_instance, version="devel")
distro = options.launchpad.distributions[options.distribution]
if options.source_series is None:
options.source_series = [
parser.add_argument("owner", help="owner of live filesystems to copy")
args = parser.parse_args()
args.launchpad = Launchpad.login_with(
"branch-livefses", args.launchpad_instance, version="devel")
distro = args.launchpad.distributions[args.distribution]
if args.source_series is None:
args.source_series = [
series for series in distro.series
if series.status == "Current Stable Release"][0]
else:
options.source_series = distro.getSeries(
name_or_version=options.source_series)
if options.dest_series is None:
options.dest_series = [
args.source_series = distro.getSeries(
name_or_version=args.source_series)
if args.dest_series is None:
args.dest_series = [
series for series in distro.series
if series.status == "Pre-release Freeze"][0]
else:
options.dest_series = distro.getSeries(
name_or_version=options.dest_series)
args.dest_series = distro.getSeries(
name_or_version=args.dest_series)
owner = options.launchpad.people[args[0]]
owner = args.launchpad.people[args.owner]
branch_livefses(options, owner)
branch_livefses(args, owner)
if __name__ == '__main__':
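
The rework above makes branching idempotent: instead of always creating a new live filesystem, it first asks Launchpad whether one already exists in the destination series and only creates it on NotFound, and it tolerates Unauthorized when saving privileged attributes. A minimal sketch of that get-or-create pattern, assuming a logged-in launchpadlib object `lp` plus `owner`, `dest_series` and a source `livefs` already in scope:

import lazr.restfulclient.errors

# Reuse an existing livefs in the destination series if one is already there.
try:
    new_livefs = lp.livefses.getByName(
        owner=owner, distro_series=dest_series, name=livefs.name)
except lazr.restfulclient.errors.NotFound:
    new_livefs = lp.livefses.new(
        owner=owner, distro_series=dest_series,
        name=livefs.name, metadata=livefs.metadata)

new_livefs.require_virtualized = livefs.require_virtualized
new_livefs.relative_build_score = livefs.relative_build_score
try:
    new_livefs.lp_save()
except lazr.restfulclient.errors.Unauthorized:
    # Saving these attributes can require extra privileges; carry on and
    # let an archive admin adjust the livefs later.
    print("Could not save privileged livefs attributes; ask the Launchpad team.")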

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python2.7
# Copyright (C) 2012 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>
@ -22,6 +22,7 @@ from __future__ import print_function
from optparse import OptionParser
import os
import re
import time
import subprocess
try:
from urllib.parse import urlparse
@ -134,8 +135,18 @@ def branch(options, collection):
if "git.launchpad.net" in remote:
lp_git_repo = options.launchpad.git_repositories.getByPath(
path=urlparse(remote).path.lstrip("/"))
lp_git_repo.default_branch = options.dest_series
lp_git_repo.lp_save()
new_ref = "refs/heads/%s" % options.dest_series
# Sometimes it takes LP a while to notice the new ref
for i in range(10):
if lp_git_repo.getRefByPath(path=new_ref):
lp_git_repo.default_branch = new_ref
lp_git_repo.lp_save()
break
time.sleep(1)
else:
raise Exception(
"Was unable to set default_branch of %s after "
"multiple retries - proceed manually." % remote)
else:
raise Exception(
"Git remote URL must be on git.launchpad.net.")

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python3
# Copyright 2012 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python2.7
# Copyright (C) 2009, 2010, 2011, 2012 Canonical Ltd.
@ -17,18 +17,16 @@
from __future__ import print_function
from collections import defaultdict
import gzip
import optparse
import os
import re
import sys
import tempfile
import apt_pkg
from utils import read_tag_file
default_base = '/home/ubuntu-archive/mirror/ubuntu'
default_suite = 'disco'
default_suite = 'groovy'
components = ('main', 'restricted', 'universe', 'multiverse')
# Cut-down RE from deb822.PkgRelation.
@ -65,34 +63,6 @@ def ports_arches(suite):
return ('arm64', 'armhf', 'ppc64el', 's390x')
def read_tag_file(path):
tmp = tempfile.NamedTemporaryFile(prefix='checkrdepends.', delete=False)
try:
compressed = gzip.open(path)
try:
tmp.write(compressed.read())
finally:
compressed.close()
tmp.close()
with open(tmp.name) as uncompressed:
tag_file = apt_pkg.TagFile(uncompressed)
prev_name = None
prev_stanza = None
for stanza in tag_file:
try:
name = stanza['package']
except KeyError:
continue
if name != prev_name and prev_stanza is not None:
yield prev_stanza
prev_name = name
prev_stanza = stanza
if prev_stanza is not None:
yield prev_stanza
finally:
os.unlink(tmp.name)
def read_sources(path):
ret = {
'binary': {},

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Sync a suite with a Seed list.
# Copyright (C) 2004, 2005, 2009, 2010, 2011, 2012 Canonical Ltd.
@ -22,8 +22,6 @@
# listed for promotion at once
# i.e. to allow 'change-override -S' usage
from __future__ import print_function
__metaclass__ = type
import atexit
@ -36,6 +34,7 @@ try:
except ImportError:
from cgi import escape
import json
from operator import attrgetter
from optparse import OptionParser
import os
import shutil
@ -43,10 +42,7 @@ import sys
import tempfile
from textwrap import dedent
import time
try:
from urllib.parse import quote_plus
except ImportError:
from urllib import quote_plus
from urllib.parse import quote_plus
import apt_pkg
from launchpadlib.launchpad import Launchpad
@ -128,7 +124,7 @@ def read_current_source(options):
archive_source[pkg] = (
version, component.split("/")[0])
for pkg, (version, component) in archive_source.items():
for pkg, (version, component) in list(archive_source.items()):
if component in options.components:
current_source[pkg] = (version, component)
@ -140,9 +136,7 @@ def read_current_binary(options):
components_with_di.append('%s/debian-installer' % component)
for suite in options.suites:
for component in components_with_di:
for arch in [
"i386", "amd64", "armhf", "arm64", "ppc64el",
"s390x"]:
for arch in options.architectures:
binaries_path = "%s/dists/%s/%s/binary-%s/Packages.gz" % (
options.archive_dir, suite, component, arch)
for section in apt_pkg.TagFile(decompress_open(binaries_path)):
@ -161,7 +155,7 @@ def read_current_binary(options):
archive_binary[pkg][0], version) < 0:
archive_binary[pkg] = (version, component, src)
for pkg, (version, component, src) in archive_binary.items():
for pkg, (version, component, src) in list(archive_binary.items()):
if component in options.components:
current_binary[pkg] = (version, component, src)
@ -183,8 +177,7 @@ def read_germinate(options):
# ideally supported+build-depends too, but Launchpad's
# cron.germinate doesn't save this
for arch in ["i386", "amd64", "armhf", "arm64", "ppc64el",
"s390x"]:
for arch in options.architectures:
for seed in seeds:
filename = "%s/%s_%s_%s_%s" % (
options.germinate_path, seed, flavour, options.suite, arch)
@ -264,13 +257,13 @@ def find_signer(options, source):
exact_match=True)
if not publications:
return('no publications found', '')
sorted_pubs = sorted([(ps.date_published, ps)
for ps in publications
if ps.date_published is not None], reverse=True)
sorted_pubs = sorted(
[ps for ps in publications if ps.date_published is not None],
key=attrgetter('date_published'), reverse=True)
for pub in sorted_pubs:
if pub[1].package_signer:
signer = pub[1].package_signer.name
web_link = pub[1].package_signer.web_link
if pub.package_signer:
signer = pub.package_signer.name
web_link = pub.package_signer.web_link
return(signer, web_link)
else:
signer = ''
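
The old code sorted (date_published, spph) tuples, which on Python 3 can raise TypeError when two publications share a date because the lazr entry objects in the second position are not orderable; sorting the publications directly with an attrgetter key sidesteps that and reads more cleanly. A short sketch, assuming `publications` is the result of getPublishedSources as above:

from operator import attrgetter

# Newest publication first; entries without a publication date are skipped.
sorted_pubs = sorted(
    (ps for ps in publications if ps.date_published is not None),
    key=attrgetter('date_published'), reverse=True)
for pub in sorted_pubs:
    if pub.package_signer:
        print(pub.package_signer.name, pub.package_signer.web_link)
        break
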
@ -343,8 +336,8 @@ def do_dot(why, fd, mir_bugs, suite):
fd.write(
'digraph "component-mismatches: movements to main/restricted" {\n')
for s, binwhy in why.iteritems():
for binary, why in binwhy.iteritems():
for s, binwhy in why.items():
for binary, why in binwhy.items():
# ignore binaries from this source, and "rescued"
if why in binwhy or why.startswith('Rescued'):
continue
@ -472,7 +465,7 @@ def get_teams(options, source):
if os.path.exists(options.package_team_mapping):
with open(options.package_team_mapping) as ptm_file:
for team, packages in json.load(ptm_file).items():
for team, packages in list(json.load(ptm_file).items()):
if team == "unsubscribed":
continue
for package in packages:
@ -575,7 +568,7 @@ def do_output(options,
package_team_mapping = defaultdict(set)
if os.path.exists(options.package_team_mapping):
with open(options.package_team_mapping) as ptm_file:
for team, packages in json.load(ptm_file).items():
for team, packages in (json.load(ptm_file).items()):
if team == "unsubscribed":
continue
for package in packages:
@ -741,7 +734,7 @@ def do_output(options,
}
results["source demotions"] += len(output)
for title, output_spec in all_output.items():
for title, output_spec in list(all_output.items()):
source_and_binary = output_spec.get("source_and_binary", False)
binary_only = output_spec.get("binary_only", False)
print_section_text(
@ -882,6 +875,9 @@ def main():
else:
options.suites = [options.suite]
options.series = options.distro.getSeries(name_or_version=options.suites[0])
options.architectures = [a.architecture_tag for a in options.series.architectures]
if options.output_file is not None:
sys.stdout = open('%s.new' % options.output_file, 'w')
if options.html_output_file is not None:
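
The hardcoded architecture lists are replaced by the set of architectures Launchpad reports for the series being processed, so new or dropped ports no longer need source edits. A minimal standalone sketch of the same lookup:

from launchpadlib.launchpad import Launchpad

lp = Launchpad.login_anonymously(
    "component-mismatches-example", "production", version="devel")
series = lp.distributions["ubuntu"].getSeries(name_or_version="groovy")
architectures = [a.architecture_tag for a in series.architectures]
print(architectures)  # e.g. ['amd64', 'arm64', 'armhf', ...]
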
@ -889,6 +885,11 @@ def main():
else:
options.html_output = None
# Force encoding to UTF-8 even in non-UTF-8 locales.
import io
sys.stdout = io.TextIOWrapper(
sys.stdout.detach(), encoding="UTF-8", line_buffering=True)
options.time = time.time()
options.timestamp = time.strftime(
'%a %b %e %H:%M:%S %Z %Y', time.gmtime(options.time))

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python3
# Copyright (C) 2012 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/python3
# Copyright (C) 2011, 2012 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@canonical.com>
@ -24,87 +24,429 @@ USAGE:
from __future__ import print_function
import argparse
from contextlib import contextmanager
from copy import copy
from io import StringIO
import sys
import unittest
from launchpadlib.launchpad import Launchpad
from kernel_series import KernelSeries
class TestBase(unittest.TestCase):
class FakeArgs:
def __init__(self, **kwargs):
self.testing = True
self.series = None
self.source = None
self.ppa2 = False
self.security = False
self.security2 = False
self.esm = False
self.fips = False
self.ibmgt = False
self.to_signing = False
self.from_signing = False
self.no_auto = False
self.update(**kwargs)
def update(self, **kwargs):
for (key, value) in kwargs.items():
setattr(self, key, value)
return self
@contextmanager
def capture(self):
new_out, new_err = StringIO(), StringIO()
old_out, old_err = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = new_out, new_err
yield sys.stdout, sys.stderr
finally:
sys.stdout, sys.stderr = old_out, old_err
@classmethod
def setUpClass(cls):
data = """
defaults:
routing-table:
default:
security-build:
- ['ppa:canonical-kernel-security-team/ubuntu/ppa', 'Release' ]
- ['ppa:canonical-kernel-security-team/ubuntu/ppa2', 'Release' ]
build:
- ['ppa:canonical-kernel-team/ubuntu/ppa', 'Release' ]
proposed:
- ['ubuntu', 'Proposed' ]
esm:
security-build:
- ['ppa:canonical-kernel-security-team/ubuntu/esm', 'Release']
build:
- ['ppa:canonical-kernel-esm/ubuntu/ppa', 'Release']
signing:
- ['ppa:canonical-signing/ubuntu/esm', 'Release']
proposed:
- ['ppa:canonical-kernel-esm/ubuntu/proposed', 'Release']
14.04:
codename: trusty
supported: true
esm: true
sources:
linux:
packages:
linux:
linux-signed:
type: signed
linux-meta:
type: meta
16.04:
codename: xenial
supported: true
sources:
linux-fips:
routing:
security-build:
- ['ppa:canonical-kernel-security-team/ubuntu/ppa', 'Release']
- ['ppa:canonical-kernel-security-team/ubuntu/ppa2', 'Release']
build:
- ['ppa:fips-cc-stig/ubuntu/fips-build', 'Release']
signing:
- ['ppa:canonical-signing/ubuntu/fips', 'Release']
proposed:
- ['ppa:ubuntu-advantage/ubuntu/fips-proposed', 'Release']
packages:
linux-fips:
linux-meta-fips:
type: meta
linux-signed-fips:
type: signed
18.04:
codename: bionic
supported: true
sources:
linux:
packages:
linux:
linux-signed:
type: signed
linux-meta:
type: meta
linux-ibm-gt:
routing:
security-build:
- ['ppa:canonical-kernel-security-team/ubuntu/ppa', 'Release']
- ['ppa:canonical-kernel-security-team/ubuntu/ppa2', 'Release']
build:
- ['ppa:ibm-cloud/ubuntu/build', 'Release']
proposed:
- ['ppa:ibm-cloud/ubuntu/proposed', 'Release']
packages:
linux-ibm-gt:
linux-meta-ibm-gt:
type: meta
"""
cls.ks = KernelSeries(data=data)
class TestRouting(TestBase):
def test_default(self):
expected = (['ppa:canonical-kernel-team/ubuntu/ppa', 'Release'], ['ubuntu', 'Proposed'], False)
result = routing(self.FakeArgs(series='bionic', source='linux'), self.ks)
self.assertEqual(expected, result)
def test_security(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/ppa', 'Release'], ['ubuntu', 'Proposed'], True)
result = routing(self.FakeArgs(series='bionic', source='linux', security=True), self.ks)
self.assertEqual(expected, result)
def test_security2(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/ppa2', 'Release'], ['ubuntu', 'Proposed'], True)
result = routing(self.FakeArgs(series='bionic', source='linux', security2=True), self.ks)
self.assertEqual(expected, result)
def test_to_signing(self):
expected = (['ppa:canonical-kernel-team/ubuntu/ppa', 'Release'], None, False)
result = routing(self.FakeArgs(series='bionic', source='linux', to_signing=True), self.ks)
self.assertEqual(expected, result)
def test_from_signing(self):
expected = (None, ['ubuntu', 'Proposed'], False)
result = routing(self.FakeArgs(series='bionic', source='linux', from_signing=True), self.ks)
self.assertEqual(expected, result)
def test_esm(self):
expected = (['ppa:canonical-kernel-esm/ubuntu/ppa', 'Release'], ['ppa:canonical-signing/ubuntu/esm', 'Release'], False)
result = routing(self.FakeArgs(series='trusty', source='linux'), self.ks)
self.assertEqual(expected, result)
def test_esm_security(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/esm', 'Release'], ['ppa:canonical-signing/ubuntu/esm', 'Release'], False)
result = routing(self.FakeArgs(series='trusty', source='linux', security=True), self.ks)
self.assertEqual(expected, result)
def test_esm_security2(self):
with self.assertRaises(SystemExit), self.capture() as (out, err):
expected = (None, ['ppa:canonical-kernel-esm/ubuntu/proposed', 'Release'], False)
result = routing(self.FakeArgs(series='trusty', source='linux', security2=True), self.ks)
self.assertEqual(expected, result)
def test_esm_to_signing(self):
expected = (['ppa:canonical-kernel-esm/ubuntu/ppa', 'Release'], ['ppa:canonical-signing/ubuntu/esm', 'Release'], False)
result = routing(self.FakeArgs(series='trusty', source='linux', esm=True, to_signing=True), self.ks)
self.assertEqual(expected, result)
def test_esm_from_signing(self):
expected = (['ppa:canonical-signing/ubuntu/esm', 'Release'], ['ppa:canonical-kernel-esm/ubuntu/proposed', 'Release'], False)
result = routing(self.FakeArgs(series='trusty', source='linux', esm=True, from_signing=True), self.ks)
self.assertEqual(expected, result)
# Autorouting will enable to_signing; the user will then want to switch us
# to from_signing in order to perform phase-two copies. To ensure this is
# simple we make from_signing take precedence over to_signing. Test this
# is honoured correctly.
def test_esm_from_signing_override_to_signing(self):
expected = (['ppa:canonical-signing/ubuntu/esm', 'Release'], ['ppa:canonical-kernel-esm/ubuntu/proposed', 'Release'], False)
result = routing(self.FakeArgs(series='trusty', source='linux', esm=True, to_signing=True, from_signing=True), self.ks)
self.assertEqual(expected, result)
def test_fips(self):
expected = (['ppa:fips-cc-stig/ubuntu/fips-build', 'Release'], ['ppa:canonical-signing/ubuntu/fips', 'Release'], False)
result = routing(self.FakeArgs(series='xenial', source='linux-fips'), self.ks)
self.assertEqual(expected, result)
def test_fips_security(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/ppa', 'Release'], ['ppa:canonical-signing/ubuntu/fips', 'Release'], False)
result = routing(self.FakeArgs(series='xenial', source='linux-fips', security=True), self.ks)
self.assertEqual(expected, result)
def test_fips_security2(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/ppa2', 'Release'], ['ppa:canonical-signing/ubuntu/fips', 'Release'], False)
result = routing(self.FakeArgs(series='xenial', source='linux-fips', security2=True), self.ks)
self.assertEqual(expected, result)
def test_fips_to_signing(self):
expected = (['ppa:fips-cc-stig/ubuntu/fips-build', 'Release'], ['ppa:canonical-signing/ubuntu/fips', 'Release'], False)
result = routing(self.FakeArgs(series='xenial', source='linux-fips', to_signing=True), self.ks)
self.assertEqual(expected, result)
def test_fips_from_signing(self):
expected = (['ppa:canonical-signing/ubuntu/fips', 'Release'], ['ppa:ubuntu-advantage/ubuntu/fips-proposed', 'Release'], False)
result = routing(self.FakeArgs(series='xenial', source='linux-fips', from_signing=True), self.ks)
self.assertEqual(expected, result)
def test_ibmgt(self):
expected = (['ppa:ibm-cloud/ubuntu/build', 'Release'], ['ppa:ibm-cloud/ubuntu/proposed', 'Release'], False)
result = routing(self.FakeArgs(series='bionic', source='linux-ibm-gt'), self.ks)
self.assertEqual(expected, result)
def test_ibmgt_security(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/ppa', 'Release'], ['ppa:ibm-cloud/ubuntu/proposed', 'Release'], False)
result = routing(self.FakeArgs(series='bionic', source='linux-ibm-gt', security=True), self.ks)
self.assertEqual(expected, result)
def test_ibmgt_security2(self):
expected = (['ppa:canonical-kernel-security-team/ubuntu/ppa2', 'Release'], ['ppa:ibm-cloud/ubuntu/proposed', 'Release'], False)
result = routing(self.FakeArgs(series='bionic', source='linux-ibm-gt', security2=True), self.ks)
self.assertEqual(expected, result)
def routing(args, ks):
series_name = args.series
package_name = args.source
series = ks.lookup_series(codename=series_name)
if series is None:
print("ERROR: {} -- series unknown".format(series_name))
sys.exit(1)
package = None
package_signed = None
for source_srch in series.sources:
package_signed = None
for package_srch in source_srch.packages:
if package_srch.name == package_name:
package = package_srch
if (package_srch.name.startswith('linux-signed-') or
package_srch.name == 'linux-signed'):
package_signed = package_srch
if package is not None:
break
if package is None:
print("ERROR: {}/{} -- package unknown".format(series_name, package_name))
sys.exit(1)
source = package.source
routing = source.routing
if routing is None:
print("ERROR: {}/{} -- package has no routing".format(series_name, package_name))
sys.exit(1)
build_archives = routing.lookup_destination('build')
security_archives = routing.lookup_destination('security-build')
proposed_archive = routing.lookup_destination('proposed', primary=True)
signing_archive = routing.lookup_destination('signing', primary=True)
if build_archives is None or len(build_archives) < 1:
print("ERROR: {}/{} -- package has no primary build archive".format(series_name, package_name))
sys.exit(1)
if args.ppa2 and (build_archives is None or len(build_archives) < 2):
print("ERROR: {}/{} -- package has no secondary build archive".format(series_name, package_name))
sys.exit(1)
if build_archives is None:
print("ERROR: {}/{} -- package has no build archive".format(series_name, package_name))
sys.exit(1)
if proposed_archive is None:
print("ERROR: {}/{} -- package has no proposed archive".format(series_name, package_name))
sys.exit(1)
if args.security and (security_archives is None or len(security_archives) < 1):
print("ERROR: {}/{} -- package has no primary security archive".format(series_name, package_name))
sys.exit(1)
if args.security2 and (security_archives is None or len(security_archives) < 2):
print("ERROR: {}/{} -- package has no secondary security archive".format(series_name, package_name))
sys.exit(1)
# Default route build -> proposed
if args.ppa2:
from_archive = build_archives[1]
else:
from_archive = build_archives[0]
to_archive = proposed_archive
unembargo = False
# Handle security routing.
if args.security:
from_archive = security_archives[0]
if args.security2:
from_archive = security_archives[1]
# Allow us to unembargo when releasing from security to ubuntu.
if (args.security or args.security2) and to_archive[0] == 'ubuntu':
unembargo = True
# Handle signing routing.
if args.from_signing:
from_archive = signing_archive
elif args.to_signing:
to_archive = signing_archive
# Automatically route to signing by default.
elif args.no_auto is False and signing_archive is not None and package_signed is not None:
to_archive = signing_archive
# Announce the routing if needed.
if (args.testing is False and (routing.name != 'default' or from_archive == signing_archive or to_archive == signing_archive)):
msg = "NOTE: directing copy using {} routes".format(routing.name)
if from_archive == signing_archive:
msg += ' from signing'
elif to_archive == signing_archive:
msg += ' to signing'
print(msg)
return (from_archive, to_archive, unembargo)
# SELF-TESTS:
if len(sys.argv) >= 2 and sys.argv[1] == '--self-test':
unittest.main(argv=[sys.argv[0]] + sys.argv[2:])
sys.exit(0)
parser = argparse.ArgumentParser(description='Copy a proposed kernel to the appropriate archive pocket')
parser.set_defaults(testing=False)
parser.add_argument('--dry-run', action='store_true', help='Do everything but actually copy the package')
parser.add_argument('--ppa2', action='store_true', help='Copy from the kernel build PPA2')
parser.add_argument('--security', '-S', action='store_true', help='Copy from the kernel security PPA')
parser.add_argument('--security2', action='store_true', help='Copy from the kernel security PPA2')
parser.add_argument('--esm', '-E', action='store_true', help='Copy from the kernel ESM PPA and to the kernel ESM proposed PPA')
parser.add_argument('--fips', action='store_true', help='Copy from the kernel FIPS PPA and to the kernel FIPS proposed PPA')
parser.add_argument('--ibmgt', action='store_true', help='Copy from the kernel IBM-GT build PPA to the corresponding proposed PPA')
parser.add_argument('--no-auto', action='store_true', help='Turn off automatic detection of ESM et al based on series')
parser.add_argument('--to-signing', action='store_true', help='Copy from the kernel ESM/FIPS PPA to the ESM/FIPS signing PPA')
parser.add_argument('--from-signing', action='store_true', help='Copy from the ESM/FIPS signing PPA to the ESM/FIPS proposed PPA')
parser.add_argument('series', action='store', help='The series the source package is in')
parser.add_argument('source', action='store', help='The source package name')
parser.add_argument('source', action='store', nargs='+', help='The source package name')
args = parser.parse_args()
to = 'ubuntu'
ppa_name = '~canonical-kernel-team/ubuntu/ppa'
security = False
# If we are allowed to intuit destinations do so:
# 1) precise is now destined for the ESM PPAs
if not args.no_auto:
if args.series == 'precise' and not args.esm:
print("NOTE: directing copy from and to ESM for precise")
args.esm = True
if args.esm:
ppa_name = '~canonical-kernel-esm/ubuntu/ppa'
to = '~canonical-kernel-esm/ubuntu/proposed'
to_pocket = 'release'
if args.security:
ppa_name = '~canonical-kernel-security-team/ubuntu/ppa'
if not args.esm:
security = True
else:
ppa_name = '~canonical-kernel-security-team/ubuntu/esm'
if args.security2:
ppa_name = '~canonical-kernel-security-team/ubuntu/ppa2'
if not args.esm:
security = True
if args.esm or args.fips or args.ibmgt:
print("NOTE: flags --esm, --fips, and --ibmgt are now deprecated")
(release, pkg) = (args.series, args.source)
release = args.series
ks = KernelSeries()
launchpad = Launchpad.login_with(
'ubuntu-archive-tools', 'production', version='devel')
ubuntu = launchpad.distributions['ubuntu']
distro_series = ubuntu.getSeries(name_or_version=release)
kernel_ppa = launchpad.archives.getByReference(
reference=ppa_name)
# get current version in PPA for that series
versions = kernel_ppa.getPublishedSources(
source_name=pkg, exact_match=True, status='Published', pocket='Release',
distro_series=distro_series)
assert versions.total_size == 1
version = versions[0].source_package_version
include_binaries = (pkg not in ('debian-installer')
and not pkg.startswith('linux-signed'))
# Grab a reference to the 'to' archive and select a pocket.
to_archive = launchpad.archives.getByReference(reference=to)
if to == 'ubuntu':
to_pocket = 'proposed'
else:
to_pocket = 'release'
print("""Copying {}/{}:
From: {} release
To: {} {}""".format(pkg, version, kernel_ppa, to_archive, to_pocket))
copies = []
for pkg in list(args.source):
# BODGE: routing should just take release/pkg.
args.source = pkg
(from_archive, to_archive, security) = routing(args, ks)
##print("from_archive<{}> to_archive<{}>".format(from_archive, to_archive))
if from_archive is None:
print("ERROR: bad source PPA")
sys.exit(1)
if to_archive is None:
print("ERROR: bad destination")
sys.exit(1)
(from_reference, from_pocket) = from_archive
(to_reference, to_pocket) = to_archive
# Grab a reference to the 'from' archive.
from_archive = launchpad.archives.getByReference(
reference=from_reference)
# Grab a reference to the 'to' archive.
to_archive = launchpad.archives.getByReference(reference=to_reference)
# get current version in PPA for that series
versions = from_archive.getPublishedSources(
source_name=pkg, exact_match=True, status='Published', pocket=from_pocket,
distro_series=distro_series)
version = None
if versions.total_size == 1:
version = versions[0].source_package_version
include_binaries = (pkg not in ('debian-installer')
and not pkg.startswith('linux-signed'))
if args.from_signing:
include_binaries = True
print("""Copying {}/{}:
From: {} {}
To: {} {}
Binaries: {}""".format(pkg, version, from_archive.reference, from_pocket, to_archive.reference, to_pocket, include_binaries))
if not version:
print("ERROR: no version to copy")
sys.exit(1)
copies.append({
'from_archive': from_archive,
'include_binaries': include_binaries,
'source_name': pkg,
'to_series': release,
'to_pocket': to_pocket,
'version': version,
'auto_approve': True,
'unembargo': security,
})
if args.dry_run:
print("Dry run; no packages copied.")
sys.exit(0)
# Finally ready to actually copy this.
to_archive.copyPackage(
from_archive=kernel_ppa, include_binaries=include_binaries,
source_name=pkg, to_series=release, to_pocket=to_pocket, version=version,
auto_approve=True, unembargo=security)
for copy in copies:
# We found valid packages for each requested element, actually copy them.
to_archive.copyPackage(**copy)
# TODO: adjust this script to use find-bin-overrides or rewrite
# find-bin-overrides to use lpapi and use it here.
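
The script now resolves archives through KernelSeries routing and accepts several source packages per run, validating every requested copy before any copyPackage call is made (so --dry-run can print the full plan). For signed kernels the flow is two-phase: the default route lands in the signing PPA, and a later run with --from-signing moves the signed result on to proposed. A sketch of how routing() reports those phases, assuming `routing` and the KernelSeries test instance `ks` from the self-tests above are in scope (fake_args is a hypothetical helper mirroring the FakeArgs defaults):

import argparse

def fake_args(**kw):
    ns = argparse.Namespace(
        testing=True, series=None, source=None, ppa2=False, security=False,
        security2=False, esm=False, fips=False, ibmgt=False,
        to_signing=False, from_signing=False, no_auto=False)
    for key, value in kw.items():
        setattr(ns, key, value)
    return ns

# Phase one: build PPA -> signing PPA (signing is auto-selected because a
# linux-signed package exists for this source).
print(routing(fake_args(series='trusty', source='linux'), ks))
# Phase two: signing PPA -> ESM proposed PPA.
print(routing(fake_args(series='trusty', source='linux', from_signing=True), ks))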

@ -1,10 +1,10 @@
#! /usr/bin/env python
#! /usr/bin/env python2.7
from __future__ import print_function
import atexit
import bz2
from collections import namedtuple
import gzip
import optparse
import os
import re
@ -13,12 +13,13 @@ import subprocess
import tempfile
try:
from urllib.parse import unquote
from urllib.request import urlretrieve
except ImportError:
from urllib import unquote, urlretrieve
from urllib import unquote
import apt_pkg
from launchpadlib.launchpad import Launchpad
import lzma
import requests
# from dak, more or less
@ -44,18 +45,42 @@ def ensure_tempdir():
def decompress_open(tagfile):
if tagfile.startswith('http:') or tagfile.startswith('ftp:'):
url = tagfile
tagfile = urlretrieve(url)[0]
if tagfile.endswith('.gz'):
ensure_tempdir()
decompressed = tempfile.mktemp(dir=tempdir)
fin = gzip.GzipFile(filename=tagfile)
with open(decompressed, 'wb') as fout:
fout.write(fin.read())
return open(decompressed, 'r')
response = requests.get(tagfile, stream=True)
if response.status_code == 404:
response.close()
tagfile = tagfile.replace('.xz', '.bz2')
response = requests.get(tagfile, stream=True)
response.raise_for_status()
if '.' in tagfile:
suffix = '.' + tagfile.rsplit('.', 1)[1]
else:
suffix = ''
fd, tagfile = tempfile.mkstemp(suffix=suffix, dir=tempdir)
with os.fdopen(fd, 'wb') as f:
f.write(response.raw.read())
response.close()
elif not os.path.exists(tagfile):
tagfile = tagfile.replace('.xz', '.bz2')
if tagfile.endswith('.xz'):
decompressor = lzma.LZMAFile
elif tagfile.endswith('.bz2'):
decompressor = bz2.BZ2File
else:
decompressor = None
if decompressor is not None:
fd, decompressed = tempfile.mkstemp(dir=tempdir)
dcf = decompressor(tagfile)
try:
with os.fdopen(fd, 'wb') as f:
f.write(dcf.read())
finally:
dcf.close()
return open(decompressed, 'rb')
else:
return open(tagfile, 'r')
return open(tagfile, 'rb')
Section = namedtuple("Section", ["version", "directory", "files"])
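
decompress_open now fetches remote tag files with requests, falls back from .xz to .bz2 when the first URL 404s, decompresses xz or bz2 into a temporary file, and hands back a binary file object that apt_pkg.TagFile can consume. A minimal usage sketch, assuming decompress_open from this module is in scope (the URL is only an example):

import apt_pkg

url = ("http://archive.ubuntu.com/ubuntu/dists/"
       "focal-security/main/source/Sources.xz")
count = 0
for stanza in apt_pkg.TagFile(decompress_open(url)):
    count += 1
print("%d source stanzas" % count)
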
@ -84,7 +109,11 @@ def find_dsc(options, pkg, section):
filenames = []
for url in spph.sourceFileUrls():
filename = os.path.join(outdir, unquote(os.path.basename(url)))
urlretrieve(url, filename)
response = requests.get(url, stream=True)
response.raise_for_status()
with open(filename, 'wb') as f:
f.write(response.raw.read())
response.close()
filenames.append(filename)
yield [s for s in filenames if s.endswith('.dsc')][0]
@ -132,7 +161,8 @@ def descended_from(options, pkg, section1, section2):
for dsc in find_dsc(options, pkg, section1):
try:
versions = get_changelog_versions(pkg, dsc, section1.version)
except BrokenSourcePackage as exception:
except BrokenSourcePackage as e:
exception = e
continue
return section1.version in versions
raise exception
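
The rebinding in the except clause looks redundant but matters on Python 3: the name bound by "except ... as e" is deleted when the except block ends, so the old code's later "raise exception" would fail with an unbound name. Copying the exception to another name keeps the last failure available for re-raising once the loop runs out of candidate .dsc files. A tiny illustration of the scoping rule:

exception = None
for attempt in range(3):
    try:
        raise ValueError("attempt %d failed" % attempt)
    except ValueError as e:
        exception = e   # 'e' itself is unbound again once this block exits
print(repr(exception))  # the last failure is still available here
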
@ -223,9 +253,9 @@ def main():
for suite in suites:
for component in 'main', 'restricted', 'universe', 'multiverse':
tagfile1 = '%s/dists/%s-security/%s/source/Sources.gz' % (
tagfile1 = '%s/dists/%s-security/%s/source/Sources.xz' % (
options.mirrors[0], suite, component)
tagfile2 = '%s/dists/%s-updates/%s/source/Sources.gz' % (
tagfile2 = '%s/dists/%s-updates/%s/source/Sources.xz' % (
options.mirrors[0], suite, component)
name1 = '%s-security' % suite
name2 = '%s-updates' % suite

@ -20,7 +20,7 @@ set -e
MIRROR=$HOME/mirror
DISTRIBUTION="${DISTRIBUTION:-ubuntu}"
RELEASE="${RELEASE:-disco}"
RELEASE="${RELEASE:-groovy}"
OUTDIR="${OUTDIR:-$HOME/public_html/NBS}"
OUTFILE="${OUTFILE:-$HOME/public_html/nbs.html}"
@ -42,6 +42,6 @@ checkrdepends -B "$MIRROR/$DISTRIBUTION" -s $RELEASE -b -d "$D" $CHECK
rsync -a --delete "$D/" "$OUTDIR/"
nbs-report -d "$DISTRIBUTION" -s "$RELEASE" --csv "${OUTFILE%.html}.csv" \
nbs-report -B "$MIRROR/$DISTRIBUTION" -d "$DISTRIBUTION" -s "$RELEASE" --csv "${OUTFILE%.html}.csv" \
"$OUTDIR/" >"$OUTFILE.new" && \
mv "$OUTFILE.new" "$OUTFILE"

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python3
# Copyright (C) 2013 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>

@ -1,4 +1,4 @@
#! /usr/bin/python
#! /usr/bin/python2.7
# Copyright (C) 2014 Canonical Ltd.
# Author: Colin Watson <cjwatson@ubuntu.com>

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2008, 2009, 2010, 2011, 2012 Canonical Ltd.
@ -24,7 +24,7 @@
from __future__ import print_function
from optparse import OptionParser, SUPPRESS_HELP
import argparse
import sys
import launchpadlib.errors
@ -74,44 +74,45 @@ def multiline_input(prompt):
buf.append(line)
def get_archive(options, launchpad):
def get_archive(args, launchpad):
# We default to looking up by archive reference (ubuntu,
# ubuntu/partner or ~owner/ubuntu/ppa).
if options.archive is not None:
archive = launchpad.archives.getByReference(reference=options.archive)
if args.archive is not None:
archive = launchpad.archives.getByReference(reference=args.archive)
if archive is not None:
return archive
# But we also still support combining a distro name in -d and an
# archive name or old PPA reference in -A (-d ubuntu,
# -d ubuntu -A partner, or -d ubuntu -A owner/ppa).
distro = launchpad.distributions[options.distro]
if options.archive is None:
distro = launchpad.distributions[args.distro]
if args.archive is None:
return distro.main_archive
else:
if '/' in options.archive:
owner, ppa_name = options.archive.split('/')
if '/' in args.archive:
owner, ppa_name = args.archive.split('/')
return launchpad.people[owner].getPPAByName(
distribution=distro, name=ppa_name)
for archive in distro.archives:
if archive.name == options.archive:
if archive.name == args.archive:
return archive
raise AssertionError("No such archive in Ubuntu: %s" % options.archive)
raise AssertionError("No such archive in Ubuntu: %s" % args.archive)
def get_source_components(options, launchpad, archive, source):
def get_source_components(args, launchpad, archive, source):
try:
from debian import debian_support
except ImportError:
from debian_bundle import debian_support
args = {}
if options.series:
args['distro_series'] = options.series
kwargs = {}
if args.series:
kwargs['distro_series'] = args.series
newest = {}
for spph in archive.getPublishedSources(
source_name=source, exact_match=True, status='Published', **args):
source_name=source, exact_match=True, status='Published',
**kwargs):
if not spph.distro_series.active:
continue
new_version = debian_support.Version(spph.source_package_version)
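
With the switch to argparse the parsed namespace is called `args` throughout, so this helper's local dict of optional filters was renamed to `kwargs` to avoid shadowing it; the filters are still expanded into the Launchpad call exactly as before. A minimal sketch of the pattern, assuming `archive`, `source` and an optional `series` are in scope:

kwargs = {}
if series:
    # Only constrain by series when one was requested on the command line.
    kwargs['distro_series'] = series
for spph in archive.getPublishedSources(
        source_name=source, exact_match=True, status='Published', **kwargs):
    print(spph.source_package_version, spph.component_name)
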
@ -128,15 +129,15 @@ permission_names = dict(upload='Archive Upload Rights',
admin='Queue Administration Rights')
def do_query(options):
def do_query(args):
"""Query existing permissions and show on stdout."""
if options.archive.self_link == options.distro.main_archive_link:
archives = options.distro.archives
if args.archive.self_link == args.distro.main_archive_link:
archives = args.distro.archives
else:
archives = [options.archive]
archives = [args.archive]
if options.person:
for person in options.person:
if args.person:
for person in args.person:
if '@' in person:
lp_person = launchpad.people.getByEmail(email=person)
else:
@ -149,29 +150,29 @@ def do_query(options):
for archive in archives:
perms.extend(archive.getPermissionsForPerson(
person=lp_person))
if options.acl_type:
perm_name = permission_names[options.acl_type]
if args.acl_type:
perm_name = permission_names[args.acl_type]
perms = [p for p in perms if p.permission == perm_name]
print("== All rights for %s ==" % lp_person.name)
print_perms(perms, options.series)
print_perms(perms, args.series)
if options.component:
if args.component:
perms = []
if not options.acl_type or options.acl_type == 'upload':
if not args.acl_type or args.acl_type == 'upload':
for archive in archives:
perms.extend(archive.getUploadersForComponent(
component_name=options.component))
if not options.acl_type or options.acl_type == 'admin':
component_name=args.component))
if not args.acl_type or args.acl_type == 'admin':
for archive in archives:
perms.extend(archive.getQueueAdminsForComponent(
component_name=options.component))
print("== All rights for component '%s' ==" % options.component)
print_perms(perms, options.series)
component_name=args.component))
print("== All rights for component '%s' ==" % args.component)
print_perms(perms, args.series)
if options.packageset:
for packageset in options.packageset:
if args.packageset:
for packageset in args.packageset:
lp_set = launchpad.packagesets.getByName(
name=packageset, distroseries=options.series)
name=packageset, distroseries=args.series)
perms = []
for archive in archives:
@ -179,27 +180,27 @@ def do_query(options):
packageset=lp_set))
print(("== All uploaders for package set '%s' in '%s' "
"(owned by '%s') ==" %
(packageset, options.series.name,
(packageset, args.series.name,
lp_set.owner.display_name)))
print_perms(perms, options.series)
print_perms(perms, args.series)
sources = sorted(lp_set.getSourcesIncluded(direct_inclusion=True))
if sources:
print()
print("== All source packages in package set '%s' "
"in '%s' ==" % (packageset, options.series.name))
"in '%s' ==" % (packageset, args.series.name))
for source in sources:
print(source)
child_sets = list(lp_set.setsIncluded(direct_inclusion=True))
if child_sets:
print()
print("== All package sets in package set '%s' in '%s' ==" %
(packageset, options.series.name))
(packageset, args.series.name))
for child_set in child_sets:
print(child_set.name)
if options.source:
for source in options.source:
if args.source:
for source in args.source:
perms = []
perms_set = []
for archive in archives:
@ -208,63 +209,62 @@ def do_query(options):
perms_set.extend(archive.getPackagesetsForSource(
sourcepackagename=source))
print("== All uploaders for package '%s' ==" % source)
print_perms(perms, options.series)
print_perms(perms_set, options.series)
print_perms(perms, args.series)
print_perms(perms_set, args.series)
for archive in archives:
for series, component in get_source_components(
options, launchpad, archive, source):
args, launchpad, archive, source):
perms_component = archive.getUploadersForComponent(
component_name=component)
print_perms(perms_component, series=series)
if options.pocket:
if args.pocket:
perms = []
if not options.acl_type or options.acl_type == 'upload':
if not args.acl_type or args.acl_type == 'upload':
for archive in archives:
perms.extend(archive.getUploadersForPocket(
pocket=options.pocket))
if not options.acl_type or options.acl_type == 'admin':
perms.extend(archive.getUploadersForPocket(pocket=args.pocket))
if not args.acl_type or args.acl_type == 'admin':
for archive in archives:
perms.extend(archive.getQueueAdminsForPocket(
pocket=options.pocket))
print("== All rights for pocket '%s' ==" % options.pocket)
print_perms(perms, options.series)
pocket=args.pocket))
print("== All rights for pocket '%s' ==" % args.pocket)
print_perms(perms, args.series)
if (not options.person and not options.component and
not options.packageset and not options.source and
not options.pocket):
if (not args.person and not args.component and
not args.packageset and not args.source and
not args.pocket):
perms = []
for archive in archives:
perms.extend(archive.getAllPermissions())
if options.acl_type:
perm_name = permission_names[options.acl_type]
if args.acl_type:
perm_name = permission_names[args.acl_type]
perms = [p for p in perms if p.permission == perm_name]
print("== All rights ==")
print_perms(perms, options.series)
print_perms(perms, args.series)
def validate_add_delete_options(options, requires_person=True):
if options.packageset and options.source:
def validate_add_delete_options(args, requires_person=True):
if args.packageset and args.source:
# Special options to manage package sets, bodged into this tool
# since they aren't entirely inconvenient here.
if options.component or options.person:
if args.component or args.person:
print("-P <packageset> -s <source> cannot be used with a "
"component or person as well")
return False
return True
if requires_person and not options.person:
if requires_person and not args.person:
print("You must specify at least one person to (de-)authorise.")
return False
count = 0
if options.component:
if args.component:
count += 1
if options.packageset:
if args.packageset:
count += 1
if options.source:
if args.source:
count += 1
if options.pocket:
if args.pocket:
count += 1
if count > 1:
print("You can only specify one of package set, source, component, "
@ -279,94 +279,94 @@ def validate_add_delete_options(options, requires_person=True):
return True
def do_add(options):
def do_add(args):
"""Add a new permission."""
if not validate_add_delete_options(options):
if not validate_add_delete_options(args):
return False
if options.packageset and options.source:
for packageset in options.packageset:
if args.packageset and args.source:
for packageset in args.packageset:
lp_set = launchpad.packagesets.getByName(
name=packageset, distroseries=options.series)
lp_set.addSources(names=options.source)
name=packageset, distroseries=args.series)
lp_set.addSources(names=args.source)
print("Added:")
for source in options.source:
for source in args.source:
print(source)
return
people = [launchpad.people[person] for person in options.person]
people = [launchpad.people[person] for person in args.person]
if options.source:
for source in options.source:
if args.source:
for source in args.source:
for person in people:
perm = options.archive.newPackageUploader(
perm = args.archive.newPackageUploader(
person=person, source_package_name=source)
print("Added:")
print_perms([perm])
return
if options.packageset:
for packageset in options.packageset:
if args.packageset:
for packageset in args.packageset:
lp_set = launchpad.packagesets.getByName(
name=packageset, distroseries=options.series)
name=packageset, distroseries=args.series)
for person in people:
perm = options.archive.newPackagesetUploader(
perm = args.archive.newPackagesetUploader(
person=person, packageset=lp_set)
print("Added:")
print_perms([perm])
return
if options.component:
if args.component:
for person in people:
if not options.acl_type or options.acl_type == 'upload':
perm = options.archive.newComponentUploader(
person=person, component_name=options.component)
if not args.acl_type or args.acl_type == 'upload':
perm = args.archive.newComponentUploader(
person=person, component_name=args.component)
else:
perm = options.archive.newQueueAdmin(
person=person, component_name=options.component)
perm = args.archive.newQueueAdmin(
person=person, component_name=args.component)
print("Added:")
print_perms([perm])
return
if options.pocket:
if args.pocket:
admin_kwargs = {}
if options.series:
admin_kwargs["distroseries"] = options.series
if args.series:
admin_kwargs["distroseries"] = args.series
for person in people:
if not options.acl_type or options.acl_type == 'upload':
perm = options.archive.newPocketUploader(
person=person, pocket=options.pocket)
if not args.acl_type or args.acl_type == 'upload':
perm = args.archive.newPocketUploader(
person=person, pocket=args.pocket)
else:
perm = options.archive.newPocketQueueAdmin(
person=person, pocket=options.pocket, **admin_kwargs)
perm = args.archive.newPocketQueueAdmin(
person=person, pocket=args.pocket, **admin_kwargs)
print("Added:")
print_perms([perm])
return
def do_delete(options):
def do_delete(args):
"""Delete a permission."""
# We kind of hacked packageset management into here.
# Deleting packagesets doesn't require a person...
requires_person = not (options.packageset and not options.source)
if not validate_add_delete_options(options, requires_person):
requires_person = not (args.packageset and not args.source)
if not validate_add_delete_options(args, requires_person):
return False
if options.packageset and options.source:
for packageset in options.packageset:
if args.packageset and args.source:
for packageset in args.packageset:
lp_set = launchpad.packagesets.getByName(
name=packageset, distroseries=options.series)
lp_set.removeSources(names=options.source)
name=packageset, distroseries=args.series)
lp_set.removeSources(names=args.source)
print("Deleted:")
for source in options.source:
for source in args.source:
print(source)
return
if options.packageset and not options.person:
for packageset in options.packageset:
if args.packageset and not args.person:
for packageset in args.packageset:
lp_set = launchpad.packagesets.getByName(
name=packageset, distroseries=options.series)
uploaders = options.archive.getUploadersForPackageset(
name=packageset, distroseries=args.series)
uploaders = args.archive.getUploadersForPackageset(
direct_permissions=True, packageset=lp_set)
if len(uploaders) > 0:
print("Cannot delete packageset with defined uploaders")
@ -386,82 +386,82 @@ def do_delete(options):
ack = input("Remove? (y/N): ")
if ack.lower() == 'y':
lp_set.lp_delete()
print("Deleted %s/%s" % (lp_set.name, options.series.name))
print("Deleted %s/%s" % (lp_set.name, args.series.name))
return
lp_people = [launchpad.people[person] for person in options.person]
lp_people = [launchpad.people[person] for person in args.person]
if options.source:
for source in options.source:
if args.source:
for source in args.source:
for lp_person in lp_people:
try:
options.archive.deletePackageUploader(
args.archive.deletePackageUploader(
person=lp_person, source_package_name=source)
print("Deleted %s/%s" % (lp_person.name, source))
except Exception:
print("Failed to delete %s/%s" % (lp_person.name, source))
return
if options.packageset:
for packageset in options.packageset:
if args.packageset:
for packageset in args.packageset:
lp_set = launchpad.packagesets.getByName(
name=packageset, distroseries=options.series)
name=packageset, distroseries=args.series)
for lp_person in lp_people:
options.archive.deletePackagesetUploader(
args.archive.deletePackagesetUploader(
person=lp_person, packageset=lp_set)
print("Deleted %s/%s/%s" % (lp_person.name, packageset,
options.series.name))
args.series.name))
return
if options.component:
if args.component:
for lp_person in lp_people:
if not options.acl_type or options.acl_type == 'upload':
options.archive.deleteComponentUploader(
person=lp_person, component_name=options.component)
print("Deleted %s/%s" % (lp_person.name, options.component))
if not args.acl_type or args.acl_type == 'upload':
args.archive.deleteComponentUploader(
person=lp_person, component_name=args.component)
print("Deleted %s/%s" % (lp_person.name, args.component))
else:
options.archive.deleteQueueAdmin(
person=lp_person, component_name=options.component)
args.archive.deleteQueueAdmin(
person=lp_person, component_name=args.component)
print("Deleted %s/%s (admin)" % (lp_person.name,
options.component))
args.component))
return
if options.pocket:
if args.pocket:
admin_kwargs = {}
if options.series:
admin_kwargs["distroseries"] = options.series
if args.series:
admin_kwargs["distroseries"] = args.series
for lp_person in lp_people:
if not options.acl_type or options.acl_type == 'upload':
options.archive.deletePocketUploader(
person=lp_person, pocket=options.pocket)
print("Deleted %s/%s" % (lp_person.name, options.pocket))
if not args.acl_type or args.acl_type == 'upload':
args.archive.deletePocketUploader(
person=lp_person, pocket=args.pocket)
print("Deleted %s/%s" % (lp_person.name, args.pocket))
else:
options.archive.deletePocketQueueAdmin(
person=lp_person, pocket=options.pocket, **admin_kwargs)
if options.series:
args.archive.deletePocketQueueAdmin(
person=lp_person, pocket=args.pocket, **admin_kwargs)