pullpkg: add support for pulling from upload queue

This commit is contained in:
Dan Streetman 2020-03-04 08:29:02 +01:00
parent 8f42fb976f
commit 8aa2d602c5
2 changed files with 157 additions and 2 deletions

View File

@ -45,6 +45,8 @@ POCKETS = DEFAULT_POCKETS + ('Backports',)
DEFAULT_STATUSES = ('Pending', 'Published')
STATUSES = DEFAULT_STATUSES + ('Superseded', 'Deleted', 'Obsolete')
UPLOAD_QUEUE_STATUSES = ('New', 'Unapproved', 'Accepted', 'Done', 'Rejected')
_system_distribution_chain = []

View File

@ -22,6 +22,7 @@
# ##################################################################
import os
import re
import sys
import errno
@ -30,6 +31,8 @@ from argparse import ArgumentParser
from distro_info import DebianDistroInfo
from urllib.parse import urlparse
from ubuntutools.archive import (UbuntuSourcePackage, DebianSourcePackage,
UbuntuCloudArchiveSourcePackage,
PersonalPackageArchiveSourcePackage)
@ -39,7 +42,11 @@ from ubuntutools.lp.udtexceptions import (SeriesNotFoundException,
PackageNotFoundException,
PocketDoesNotExistError,
InvalidDistroValueError)
from ubuntutools.misc import (split_release_pocket, host_architecture, STATUSES)
from ubuntutools.misc import (split_release_pocket,
host_architecture,
download,
UPLOAD_QUEUE_STATUSES,
STATUSES)
from ubuntutools import _loggingBasicConfig
@ -150,7 +157,9 @@ class PullPkg(object):
if distro == DISTRO_UBUNTU:
parser.add_argument('--security', action='store_true',
help='Check the Ubuntu Security Team PPA')
help='Pull from the Ubuntu Security Team (proposed) PPA')
parser.add_argument('--upload-queue', action='store_true',
help='Pull from the Ubuntu upload queue')
if distro == DISTRO_PPA:
parser.add_argument('--ppa', help='PPA to pull from')
if parser.parse_known_args(args)[0].ppa is None:
@ -295,6 +304,7 @@ class PullPkg(object):
# ensure these are always included so we can just check for None/False later
options['ppa'] = options.get('ppa', None)
options['security'] = options.get('security', False)
options['upload_queue'] = options.get('upload_queue', False)
return options
@ -347,6 +357,22 @@ class PullPkg(object):
params['status'] = STATUSES if 'all' in options['status'] else options['status']
# special handling for upload queue
if options['upload_queue']:
if len(options['status']) > 1:
raise ValueError("Too many --status provided, "
"can only search for a single status or 'all'")
if not options['status']:
params['status'] = None
elif options['status'][0].lower() == 'all':
params['status'] = 'all'
elif options['status'][0].capitalize() in UPLOAD_QUEUE_STATUSES:
params['status'] = options['status'][0].capitalize()
else:
msg = ("Invalid upload queue status '%s': valid values are %s" %
(options['status'][0], ', '.join(UPLOAD_QUEUE_STATUSES)))
raise ValueError(msg)
return params
def pull(self, args=sys.argv[1:]):
@ -369,6 +395,11 @@ class PullPkg(object):
params = self._get_params(options)
package = params['package']
if options['upload_queue']:
# upload queue API is different/simpler
self.pull_upload_queue(pull, arch=options['arch'], **params)
return
# call implementation, and allow exceptions to flow up to caller
srcpkg = DISTRO_PKG_CLASS[distro](**params)
spph = srcpkg.lp_spph
@ -422,3 +453,125 @@ class PullPkg(object):
else:
Logger.error("Internal error: invalid pull value after parse_pull()")
raise InvalidPullValueError("Invalid pull value '%s'" % pull)
def pull_upload_queue(self, pull, **params):
package = params['package']
version = params['version']
arch = params['arch']
if not params['series']:
Logger.error("Using --upload-queue requires specifying series")
return
series = Distribution('ubuntu').getSeries(params['series'])
queueparams = {'name': package}
if params['pocket']:
queueparams['pocket'] = params['pocket']
if params['status'] == 'all':
queueparams['status'] = None
queuetype = 'any'
elif params['status']:
queueparams['status'] = params['status']
queuetype = params['status']
else:
queuetype = 'Unapproved'
packages = [p for p in series.getPackageUploads(**queueparams) if
p.package_version == version or
str(p.id) == version or
not version]
if pull == PULL_SOURCE:
packages = [p for p in packages if p.contains_source]
elif pull in VALID_BINARY_PULLS:
packages = [p for p in packages if
p.contains_build and
(arch in ['all', 'any'] or
arch in p.display_arches.replace(',', '').split())]
if not packages:
msg = ("Package %s not found in %s upload queue for %s" %
(package, queuetype, series.name))
if version:
msg += " with version/id %s" % version
if pull in VALID_BINARY_PULLS:
msg += " for arch %s" % arch
raise PackageNotFoundException(msg)
if pull == PULL_LIST:
for p in packages:
msg = "Found %s %s (ID %s)" % (p.package_name, p.package_version, p.id)
if p.display_arches:
msg += " arch %s" % p.display_arches
Logger.info(msg)
url = p.changesFileUrl()
if url:
Logger.info("Changes file:")
Logger.info(" %s", url)
else:
Logger.info("No changes file")
urls = p.sourceFileUrls()
if urls:
Logger.info("Source files:")
for url in urls:
Logger.info(" %s", url)
else:
Logger.info("No source files")
urls = p.binaryFileUrls()
if urls:
Logger.info("Binary files:")
for url in urls:
Logger.info(" %s", url)
Logger.info(" { %s }" % p.binaryFileProperties(url))
else:
Logger.info("No binary files")
urls = p.customFileUrls()
if urls:
Logger.info("Custom files:")
for url in urls:
Logger.info(" %s", url)
return
if len(packages) > 1:
msg = "Found multiple packages"
if version:
msg += " with version %s, please specify the ID instead" % version
else:
msg += ", please specify the version"
Logger.error("Available package versions/ids are:")
for p in packages:
Logger.error("%s %s (id %s)" % (p.package_name, p.package_version, p.id))
raise PackageNotFoundException(msg)
p = packages[0]
urls = set(p.customFileUrls())
if p.changesFileUrl():
urls.add(p.changesFileUrl())
if pull == PULL_SOURCE:
urls |= set(p.sourceFileUrls())
if not urls:
Logger.error("No source files to download")
for url in urls:
download(url)
else:
name = '.*'
if pull == PULL_DEBS:
name = r'{}(?<!-di)(?<!-dbgsym)$'.format(name)
elif pull == PULL_DDEBS:
name += '-dbgsym$'
elif pull == PULL_UDEBS:
name += '-di$'
else:
raise InvalidPullValueError("Invalid pull value %s" % pull)
urls |= set(p.binaryFileUrls())
if not urls:
Logger.error("No binary files to download")
for url in urls:
filename = os.path.basename(urlparse(url).path)
if re.match(name, filename):
download(url)