ubuntutools: use file extension when possible to determine deb/ddeb/udeb

This commit is contained in:
Dan Streetman 2019-11-25 16:40:18 -05:00
parent 1a30454d1e
commit 82c8c438f7
4 changed files with 111 additions and 41 deletions

View File

@ -314,7 +314,7 @@ class SourcePackage(object):
yield self._mirror_url(mirror, name)
yield self._lp_url(name)
def _binary_urls(self, name, default_url):
def _binary_urls(self, name, default_urls):
"Generator of URLs for name"
for mirror in self.mirrors:
yield self._mirror_url(mirror, name)
@ -322,11 +322,13 @@ class SourcePackage(object):
if mirror not in self.mirrors:
yield self._mirror_url(mirror, name)
yield self._lp_url(name)
yield default_url
for url in default_urls:
yield url
def _binary_files_info(self, arch, name):
for bpph in self.lp_spph.getBinaries(arch=arch, name=name):
yield (bpph.getFileName(), bpph.getUrl(), 0)
def _binary_files_info(self, arch, name, ext):
for bpph in self.lp_spph.getBinaries(arch=arch, name=name, ext=ext):
urls = bpph.binaryFileUrls() + [bpph.getUrl()]
yield (bpph.getFileName(), urls, 0)
def pull_dsc(self):
"Retrieve dscfile and parse"
@ -508,20 +510,26 @@ class SourcePackage(object):
else:
raise DownloadError('File %s could not be found' % name)
def pull_binaries(self, arch, name=None):
def pull_binaries(self, arch, name=None, ext=None):
"""Pull binary debs into workdir.
If name is specified, only binary packages matching the regex are included.
If ext is specified, only binary packages with that ext are included; for
example to only download dbgsym ddebs, specify ext='ddeb'.
Must specify arch, or use 'all' to pull all archs.
Returns the number of files downloaded.
"""
total = 0
Logger.debug("pull_binaries(arch=%s, name=%s, ext=%s)" % (arch, name, ext))
if not arch:
raise RuntimeError("Must specify arch")
for (fname, furl, fsize) in self._binary_files_info(arch, name):
for (fname, furls, fsize) in self._binary_files_info(arch, name, ext):
found = False
for url in self._binary_urls(fname, furl):
for url in self._binary_urls(fname, furls):
try:
if self._download_file(url, fname, False, fsize):
found = True
@ -591,11 +599,11 @@ class DebianSPPH(SourcePackagePublishingHistory):
"""
resource_type = 'source_package_publishing_history'
def getBinaries(self, arch, name=None):
def getBinaries(self, arch, name=None, ext=None):
Logger.info('Using Snapshot to find binary packages')
srcpkg = Snapshot.getSourcePackage(self.getPackageName(),
version=self.getVersion())
return srcpkg.getSPPH().getBinaries(arch=arch, name=name)
return srcpkg.getSPPH().getBinaries(arch=arch, name=name, ext=ext)
class DebianSourcePackage(SourcePackage):
@ -655,9 +663,9 @@ class DebianSourcePackage(SourcePackage):
break
yield self.snapshot_files[name]
def _binary_files_info(self, arch, name):
for f in self.snapshot_package.getBinaryFiles(arch=arch, name=name):
yield (f.name, f.getUrl(), f.size)
def _binary_files_info(self, arch, name, ext):
for f in self.snapshot_package.getBinaryFiles(arch=arch, name=name, ext=ext):
yield (f.name, [f.getUrl()], f.size)
def pull_dsc(self):
"Retrieve dscfile and parse"
@ -973,7 +981,7 @@ class SnapshotSourcePackage(SnapshotPackage):
def getAllFiles(self):
return self.getFiles() + self.getBinaryFiles()
def getBinaryFiles(self, arch=None, name=None):
def getBinaryFiles(self, arch=None, name=None, ext=None):
if not self._binary_files:
url = "/mr/package/{}/{}/allfiles".format(self.name, self.version)
response = Snapshot.load("{}?fileinfo=1".format(url))
@ -985,9 +993,11 @@ class SnapshotSourcePackage(SnapshotPackage):
self._binary_files = files
bins = list(self._binary_files)
if arch:
bins = filter(lambda b: b.isArch(arch), bins)
bins = [b for b in bins if b.isArch(arch)]
if name:
bins = filter(lambda b: re.match(name, b.name), bins)
bins = [b for b in bins if re.match(name, b.package_name)]
if ext:
bins = [b for b in bins if re.match(ext, b.ext)]
return bins
def getFiles(self):
@ -1038,7 +1048,7 @@ class SnapshotBinaryPackage(SnapshotPackage):
for r in response['result']]
if not arch:
return list(self._files)
return filter(lambda f: f.isArch(arch), self._files)
return [f for f in self._files if f.isArch(arch)]
class SnapshotFile(object):
@ -1061,6 +1071,10 @@ class SnapshotFile(object):
def name(self):
return self._obj['name']
@property
def ext(self):
return self.name.rpartition('.')[2]
@property
def path(self):
return self._obj['path']
@ -1199,9 +1213,9 @@ class SnapshotSPPH(object):
new_entries.append(str(block))
return ''.join(new_entries)
def getBinaries(self, arch, name=None):
def getBinaries(self, arch, name=None, ext=None):
return [b.getBPPH()
for b in self._pkg.getBinaryFiles(arch=arch, name=name)]
for b in self._pkg.getBinaryFiles(arch=arch, name=name, ext=ext)]
class SnapshotBPPH(object):
@ -1255,11 +1269,23 @@ class SnapshotBPPH(object):
def getComponent(self):
return self._file.component
def binaryFileUrls(self):
return [self.getUrl()]
def getBuild(self):
return None
def getUrl(self):
return self._file.getUrl()
def getFileVersion(self):
return self.getVersion()
def getFileArch(self):
return self.arch
def getFileExt(self):
return self._file.ext
def getFileName(self):
return self._file.name

View File

@ -3,4 +3,4 @@
#
service = 'production'
api_version = '1.0'
api_version = 'devel'

View File

@ -795,11 +795,14 @@ class SourcePackagePublishingHistory(BaseWrapper):
new_entries.append(str(block))
return ''.join(new_entries)
def getBinaries(self, arch, name=None):
def getBinaries(self, arch, name=None, ext=None):
'''
Returns the resulting BinaryPackagePublishingHistorys.
Must specify arch, or use 'all' to get all archs.
If name is specified, only returns BPPH matching that (regex) name.
If ext is specified, only returns BPPH matching that (regex) ext.
'''
if not arch:
raise RuntimeError("Must specify arch")
@ -834,7 +837,7 @@ class SourcePackagePublishingHistory(BaseWrapper):
# strip out the URL leading text.
filename = url.rsplit('/', 1)[1]
# strip the file suffix
pkgname = filename.rsplit('.', 1)[0]
(pkgname, _, e) = filename.rpartition('.')
# split into name, version, arch
(n, v, a) = pkgname.rsplit('_', 2)
if a == 'all':
@ -845,6 +848,9 @@ class SourcePackagePublishingHistory(BaseWrapper):
# Only check the name requested - saves time
if name and not re.match(name, n):
continue
# Only check the ext requested - saves time
if ext and not re.match(ext, e):
continue
# If we already have this BPPH, keep going
if a in self._binaries and n in self._binaries[a]:
continue
@ -862,7 +868,7 @@ class SourcePackagePublishingHistory(BaseWrapper):
if a not in self._binaries:
self._binaries[a] = {}
self._binaries[a][n] = bpph
if not name and arch == 'all':
if not name and not ext and arch == 'all':
# We must have got them all
self._have_all_binaries = True
@ -874,7 +880,10 @@ class SourcePackagePublishingHistory(BaseWrapper):
bpphs = self._binaries[arch].copy().values()
if name:
bpphs = filter(lambda b: re.match(name, b.binary_package_name), bpphs)
bpphs = [b for b in bpphs if re.match(name, b.binary_package_name)]
if ext:
bpphs = [b for b in bpphs if re.match(ext, b.getFileExt())]
return bpphs
@ -938,6 +947,7 @@ class BinaryPackagePublishingHistory(BaseWrapper):
def __init__(self, *args):
self._arch = None
self._ext = None
@property
def arch(self):
@ -976,7 +986,11 @@ class BinaryPackagePublishingHistory(BaseWrapper):
Only available in the devel API, not 1.0
'''
try:
return self._lpobject.binaryFileUrls()
urls = self._lpobject.binaryFileUrls()
if not urls:
Logger.warning('BPPH %s_%s has no binaryFileUrls' %
(self.getPackageName(), self.getVersion()))
return urls
except AttributeError:
raise AttributeError("binaryFileUrls can only be found in lpapi "
"devel, not 1.0. Login using devel to have it.")
@ -1013,12 +1027,33 @@ class BinaryPackagePublishingHistory(BaseWrapper):
'''
Returns the file extension; "deb", "ddeb", or "udeb".
'''
if self.getPackageName().endswith("-dbgsym"):
return "ddeb"
elif self.getPackageName().endswith("-di"):
if not self._ext:
self._ext = self._getFileExt()
return self._ext
def _getFileExt(self):
try:
# this is the best way, from the actual URL filename
return self.binaryFileUrls()[0].rpartition('.')[2]
except (AttributeError, IndexError):
Logger.debug('Could not get file ext from url, trying to guess...')
# is_debug should be reliable way of detecting ddeb...?
try:
if self.is_debug:
return "ddeb"
except AttributeError:
# is_debug only available with api version 'devel'
if self.getPackageName().endswith("-dbgsym"):
return "ddeb"
# is this reliable?
if self.getPackageName().endswith("-di") or self.getPackageName().endswith("-udeb"):
return "udeb"
else:
return "deb"
# everything else - assume regular deb
return "deb"
def getFileName(self):
'''

View File

@ -53,6 +53,7 @@ PULL_UDEBS = 'udebs'
PULL_LIST = 'list'
VALID_PULLS = [PULL_SOURCE, PULL_DEBS, PULL_DDEBS, PULL_UDEBS, PULL_LIST]
VALID_BINARY_PULLS = [PULL_DEBS, PULL_DDEBS, PULL_UDEBS]
DISTRO_DEBIAN = 'debian'
DISTRO_UBUNTU = 'ubuntu'
@ -388,24 +389,32 @@ class PullPkg(object):
Logger.debug("--download-only specified, not extracting")
else:
srcpkg.unpack()
else:
name = '.*'
elif pull in VALID_BINARY_PULLS:
name = None
if package != spph.getPackageName():
Logger.info("Pulling only binary package '%s'", package)
Logger.info("Use package name '%s' to pull all binary packages",
spph.getPackageName())
name = package
if pull == PULL_DEBS:
name = r'{}(?<!-di)(?<!-dbgsym)$'.format(name)
elif pull == PULL_DDEBS:
name += '-dbgsym$'
elif pull == PULL_UDEBS:
name += '-di$'
else:
raise InvalidPullValueError("Invalid pull value %s" % pull)
# e.g. 'debs' -> 'deb'
ext = pull.rstrip('s')
if distro == DISTRO_DEBIAN:
# Debian ddebs don't use .ddeb extension, unfortunately :(
if pull in [PULL_DEBS, PULL_DDEBS]:
name = name or '.*'
ext = 'deb'
if pull == PULL_DEBS:
name += r'(?<!-dbgsym)$'
if pull == PULL_DDEBS:
name += r'-dbgsym$'
# allow DownloadError to flow up to caller
total = srcpkg.pull_binaries(name=name, arch=options['arch'])
total = srcpkg.pull_binaries(name=name, ext=ext, arch=options['arch'])
if total < 1:
Logger.error("No %s found for %s %s", pull,
package, spph.getVersion())
else:
Logger.error("Internal error: invalid pull value after parse_pull()")
raise InvalidPullValueError("Invalid pull value '%s'" % pull)