ubuntutools/archive: convert external rmadison call to native implementation

instead of a function that calls the system program rmadison, use
a fully-functional class to interface with the madison api, as well
as the debian snapshot api
This commit is contained in:
Dan Streetman 2017-03-31 11:26:26 -04:00 committed by Dan Streetman
parent f944d3146a
commit b96885f05e
4 changed files with 512 additions and 158 deletions

View File

@ -48,7 +48,9 @@ from ubuntutools.config import UDTConfig
from ubuntutools.lp.lpapicache import (Launchpad, Distribution, from ubuntutools.lp.lpapicache import (Launchpad, Distribution,
SourcePackagePublishingHistory, SourcePackagePublishingHistory,
BinaryPackagePublishingHistory) BinaryPackagePublishingHistory)
from ubuntutools.lp.udtexceptions import PackageNotFoundException from ubuntutools.lp.udtexceptions import (PackageNotFoundException,
SeriesNotFoundException,
InvalidDistroValueError)
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
from ubuntutools.version import Version from ubuntutools.version import Version
@ -123,6 +125,7 @@ class SourcePackage(object):
directly. directly.
""" """
distribution = None distribution = None
spph_class = SourcePackagePublishingHistory
def __init__(self, package=None, version=None, component=None, def __init__(self, package=None, version=None, component=None,
dscfile=None, lp=None, mirrors=(), workdir='.', quiet=False, dscfile=None, lp=None, mirrors=(), workdir='.', quiet=False,
@ -139,6 +142,7 @@ class SourcePackage(object):
self.source = package self.source = package
self._lp = lp self._lp = lp
self.binary = None self.binary = None
self.try_binary = True
self.workdir = workdir self.workdir = workdir
self.quiet = quiet self.quiet = quiet
self._series = series self._series = series
@ -193,10 +197,10 @@ class SourcePackage(object):
params['pocket'] = self._pocket params['pocket'] = self._pocket
spphs = archive.getPublishedSources(source_name=self.source, **params) spphs = archive.getPublishedSources(source_name=self.source, **params)
if spphs: if spphs:
self._spph = SourcePackagePublishingHistory(spphs[0]) self._spph = self.spph_class(spphs[0])
return self._spph return self._spph
if not self.binary: if self.try_binary and not self.binary:
if series: if series:
arch_series = series.getArchSeries() arch_series = series.getArchSeries()
params['distro_arch_series'] = arch_series() params['distro_arch_series'] = arch_series()
@ -310,6 +314,12 @@ class SourcePackage(object):
yield self._lp_url(name) yield self._lp_url(name)
yield default_url yield default_url
def _binary_files_info(self, arch, name):
for bpph in self.lp_spph.getBinaries(arch):
if name and not re.match(name, bpph.binary_package_name):
continue
yield (bpph.getFileName(), bpph.getUrl(), 0)
def pull_dsc(self, verify_signature=False): def pull_dsc(self, verify_signature=False):
"Retrieve dscfile and parse" "Retrieve dscfile and parse"
if self._dsc_source: if self._dsc_source:
@ -423,10 +433,9 @@ class SourcePackage(object):
(downloaded / 1024.0 / 1024, (downloaded / 1024.0 / 1024,
size / 1024.0 / 1024)) size / 1024.0 / 1024))
def _download_file(self, url, filename, verify=True): def _download_file(self, url, filename, verify=True, size=0):
"Download url to filename in workdir." "Download url to filename in workdir."
pathname = os.path.join(self.workdir, filename) pathname = os.path.join(self.workdir, filename)
size = 0
if verify: if verify:
if self.dsc.verify_file(pathname): if self.dsc.verify_file(pathname):
Logger.debug('Using existing %s', filename) Logger.debug('Using existing %s', filename)
@ -494,13 +503,11 @@ class SourcePackage(object):
if not arch: if not arch:
raise RuntimeError("Must specify arch") raise RuntimeError("Must specify arch")
for bpph in self.lp_spph.getBinaries(arch): for (fname, furl, fsize) in self._binary_files_info(arch, name):
if name and not re.match(name, bpph.binary_package_name):
continue
found = False found = False
for url in self._binary_urls(bpph.getFileName(), bpph.getUrl()): for url in self._binary_urls(fname, furl):
try: try:
if self._download_file(url, bpph.getFileName(), verify=False): if self._download_file(url, fname, False, fsize):
found = True found = True
break break
except HTTPError as e: except HTTPError as e:
@ -510,7 +517,7 @@ class SourcePackage(object):
if found: if found:
total += 1 total += 1
else: else:
Logger.normal("Could not download from any location: %s", bpph.getFileName()) Logger.normal("Could not download from any location: %s", fname)
return total return total
def verify(self): def verify(self):
@ -562,42 +569,68 @@ class SourcePackage(object):
return os.path.abspath(difffn) return os.path.abspath(difffn)
class DebianSPPH(SourcePackagePublishingHistory):
"""SPPH with getBinaries() overridden,
as LP doesn't have Debian binaries
"""
resource_type = 'source_package_publishing_history'
def getBinaries(self, arch=None):
if not self._binaries:
Logger.normal('Using Snapshot to find binary packages')
srcpkg = Snapshot.getSourcePackage(self.getPackageName(),
version=self.getVersion())
self._binaries = [b.getBPPH() for b in srcpkg.getBinaryFiles()]
return super(DebianSPPH, self).getBinaries(arch)
class DebianSourcePackage(SourcePackage): class DebianSourcePackage(SourcePackage):
"Download / unpack a Debian source package" "Download / unpack a Debian source package"
distribution = 'debian' distribution = 'debian'
spph_class = DebianSPPH
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(DebianSourcePackage, self).__init__(*args, **kwargs) super(DebianSourcePackage, self).__init__(*args, **kwargs)
self.masters.append(UDTConfig.defaults['DEBSEC_MIRROR']) self.masters.append(UDTConfig.defaults['DEBSEC_MIRROR'])
# Cached values:
self._snapshot_list = None
# Overridden methods: # Cached values:
self._snapshot_package = None
self._snapshot_files = None
# Don't bother searching in LP for debian binaries, they aren't there
self.try_binary = False
# Debian doesn't have 'pockets'
if self._pocket:
if self._pocket.lower() != 'release':
Logger.error("Debian does not use 'pockets', ignoring pocket '%s'",
self._pocket)
self._pocket = None
# Overridden properties/methods:
@property @property
def lp_spph(self): def lp_spph(self):
"Return the LP Source Package Publishing History entry" "Return the LP Source Package Publishing History entry"
if not self._spph: if not self._spph:
try: try:
# superclass will set self._spph
return super(DebianSourcePackage, self).lp_spph return super(DebianSourcePackage, self).lp_spph
except IndexError: except PackageNotFoundException:
pass
except SeriesNotFoundException:
pass pass
Logger.normal('Using rmadison for component determination') Logger.normal('Package not found in Launchpad, using Snapshot')
comp = 'main' self._spph = self.snapshot_package.getSPPH()
for record in rmadison(self.distribution, self.source):
if record.get('source') != self.source:
continue
comp = record['component']
if record['version'] == self.version.full_version:
self._spph = FakeSPPH(record['source'], record['version'],
comp, 'debian')
return self._spph
Logger.normal('Guessing component from most recent upload')
self._spph = FakeSPPH(self.source, self.version.full_version, comp,
'debian')
return self._spph return self._spph
@property
def component(self):
"Cached archive component, in available"
if not self._component:
Logger.debug('Determining component from Snapshot')
self._component = Snapshot.getComponent(self.source, self.version)
return self._component
def _source_urls(self, name): def _source_urls(self, name):
"Generator of sources for name" "Generator of sources for name"
wrapped_iterator = super(DebianSourcePackage, self)._source_urls(name) wrapped_iterator = super(DebianSourcePackage, self)._source_urls(name)
@ -606,8 +639,13 @@ class DebianSourcePackage(SourcePackage):
yield next(wrapped_iterator) yield next(wrapped_iterator)
except StopIteration: except StopIteration:
break break
if self.snapshot_list: yield self.snapshot_files[name]
yield self._snapshot_url(name)
def _binary_files_info(self, arch, name):
for f in self.snapshot_package.getBinaryFiles(arch):
if name and not re.match(name, f.package_name):
continue
yield (f.name, f.getUrl(), f.size)
def pull_dsc(self, verify_signature=True): def pull_dsc(self, verify_signature=True):
"Retrieve dscfile and parse" "Retrieve dscfile and parse"
@ -631,32 +669,37 @@ class DebianSourcePackage(SourcePackage):
# Local methods: # Local methods:
@property @property
def snapshot_list(self): def snapshot_package(self):
"Return a filename -> hash dictionary from snapshot.debian.org" if not self._snapshot_package:
if self._snapshot_list is None: if self._version or self._spph:
# as .version uses .lpph, and our .lpph might use us,
# only use .version if _version or _spph are set
version = self.version.full_version
srcpkg = Snapshot.getSourcePackage(self.source, version=version)
if not srcpkg:
msg = "Package {} {} not found".format(self.source, version)
raise PackageNotFoundException(msg)
self._snapshot_package = srcpkg
else:
# we have neither version nor spph, so look up our version using madison
Logger.normal('Using madison to find latest version number')
series = self._series
params = {'series': series} if series else {}
srcpkg = Madison(self.distribution).getSourcePackage(self.source, **params)
if not srcpkg:
raise PackageNotFoundException("Package {} not found".format(self.source))
if self.source != srcpkg.name:
self.binary = self.source
self.source = srcpkg.name
self._snapshot_package = srcpkg
return self._snapshot_package
try: @property
data = self.url_opener.open( def snapshot_files(self):
'http://snapshot.debian.org/mr/package/%s/%s/srcfiles?fileinfo=1' % if not self._snapshot_files:
(self.source, self.version.full_version)) files = self.snapshot_package.getFiles()
reader = codecs.getreader('utf-8') self._snapshot_files = {f.name: f.getUrl() for f in files}
srcfiles = json.load(reader(data)) return self._snapshot_files
except HTTPError:
Logger.error('Version %s of %s not found on '
'snapshot.debian.org',
self.version.full_version, self.source)
self._snapshot_list = False
return False
self._snapshot_list = dict((info[0]['name'], hash_)
for hash_, info
in srcfiles['fileinfo'].items())
return self._snapshot_list
def _snapshot_url(self, name):
"Return the snapshot.debian.org URL for name"
return os.path.join('http://snapshot.debian.org/file',
self.snapshot_list[name])
class UbuntuSourcePackage(SourcePackage): class UbuntuSourcePackage(SourcePackage):
@ -678,25 +721,342 @@ class UbuntuCloudArchiveSourcePackage(UbuntuSourcePackage):
'+files', filename) '+files', filename)
class FakeSPPH(object): class _WebJSON(object):
"""Provide the same interface as def getHostUrl(self):
ubuntutools.lpapicache.SourcePackagePublishingHistory raise Exception("Not implemented")
"""
def __init__(self, name, version, component, distribution): def load(self, path=''):
reader = codecs.getreader('utf-8')
url = self.getHostUrl() + path
Logger.debug("Loading %s" % url)
with closing(urlopen(url)) as data:
return json.load(reader(data))
# DAKweb madison API
# https://github.com/Debian/dak/blob/master/dakweb/queries/madison.py
# This is really only useful to easily find the latest version of a
# package for a specific series (or unstable). This does not provide
# any details at all for older-than-latest package versions.
class Madison(_WebJSON):
urls = {
'debian': 'https://api.ftp-master.debian.org/madison',
'ubuntu': 'http://people.canonical.com/~ubuntu-archive/madison.cgi',
}
def __init__(self, distro='debian'):
super(Madison, self).__init__()
self._distro = distro
# This currently will NOT work with ubuntu; it doesn't support f=json
if distro != 'debian':
raise InvalidDistroValueError("Madison currently only supports Debian")
def getHostUrl(self):
return self.urls[self._distro]
def getSourcePackage(self, name, series='unstable'):
url = "?f=json&package={name}&s={series}".format(name=name, series=series)
try:
result = self.load(url)
except HTTPError:
result = None
if not result:
msg = "Package {} not found in '{}'".format(name, series)
raise PackageNotFoundException(msg)
versions = list(result[0][name].values())[0]
latest = versions[sorted(versions.keys(), reverse=True)[0]]
return Snapshot.getSourcePackage(name=latest['source'],
version=latest['source_version'])
# Snapshot API
# https://anonscm.debian.org/cgit/mirror/snapshot.debian.org.git/plain/API
class _Snapshot(_WebJSON):
DEBIAN_COMPONENTS = ["main", "contrib", "non-free"]
def getHostUrl(self):
return "http://snapshot.debian.org"
def getComponent(self, name, version):
# unfortunately there is no (easy) way to find the component for older
# package versions (madison only lists the most recent versions).
# so we have to parse the file path to determine the component :(
url = "/mr/package/{}/{}/srcfiles".format(name, version)
try:
response = self.load("{}?fileinfo=1".format(url))
except HTTPError:
msg = "Package {} version {} not found"
raise PackageNotFoundException(msg.format(name, version))
result = response.get('result')
info = response.get('fileinfo')
if len(result) < 1:
msg = "No source files for package {} version {}"
raise PackageNotFoundException(msg.format(name, version))
path = info[result[0]['hash']][0]['path']
# this expects the 'component' to follow 'pool[-*]' in the path
found_pool = False
component = None
for s in path.split('/'):
if found_pool:
component = s
break
if s.startswith('pool'):
found_pool = True
if not component:
Logger.warn("could not determine component from path %s" % path)
return self.DEBIAN_COMPONENTS[0]
if component not in self.DEBIAN_COMPONENTS:
Logger.warn("unexpected component %s" % component)
return component
def _get_package(self, name, url, pkginit, version, sort_key):
try:
results = self.load("/mr/{}/{}/".format(url, name))['result']
except HTTPError:
raise PackageNotFoundException("Package {} not found.".format(name))
results = sorted(results, key=lambda r: r[sort_key], reverse=True)
results = [pkginit(r) for r in results if version == r['version']]
if not results:
msg = "Package {name} version {version} not found."
raise PackageNotFoundException(msg.format(name=name, version=version))
return results
def getSourcePackages(self, name, version):
return self._get_package(name, "package",
lambda obj: SnapshotSourcePackage(obj, name),
version, "version")
def getSourcePackage(self, name, version):
return self.getSourcePackages(name, version)[0]
def getBinaryPackages(self, name, version):
return self._get_package(name, "binary",
lambda obj: SnapshotBinaryPackage(obj),
version, "binary_version")
def getBinaryPackage(self, name, version):
return self.getBinaryPackages(name, version)[0]
Snapshot = _Snapshot()
class SnapshotPackage(object):
def __init__(self, obj):
self._obj = obj
self._files = None
self._component = None
@property
def version(self):
return self._obj['version']
@property
def component(self):
if not self._component:
self._component = Snapshot.getComponent(self.name, self.version)
return self._component
class SnapshotSourcePackage(SnapshotPackage):
def __init__(self, obj, name):
# obj required fields: 'version'
super(SnapshotSourcePackage, self).__init__(obj)
self.name = name self.name = name
self.version = version self._binary_files = None
self._spph = None
def getSPPH(self):
if not self._spph:
self._spph = SnapshotSPPH(self)
return self._spph
def getAllFiles(self):
return self.getFiles() + self.getBinaryFiles()
def getBinaryFiles(self, arch=None):
if not self._binary_files:
url = "/mr/package/{}/{}/allfiles".format(self.name, self.version)
response = Snapshot.load("{}?fileinfo=1".format(url))
info = response['fileinfo']
files = [SnapshotBinaryFile(b['name'], b['version'], self.component,
info[r['hash']][0], r['hash'],
r['architecture'], self.name)
for b in response['result']['binaries'] for r in b['files']]
self._binary_files = files
if not arch:
return list(self._binary_files)
return filter(lambda f: f.isArch(arch), self._binary_files)
def getFiles(self):
if not self._files:
url = "/mr/package/{}/{}/srcfiles".format(self.name, self.version)
response = Snapshot.load("{}?fileinfo=1".format(url))
info = response['fileinfo']
self._files = [SnapshotSourceFile(self.name, self.version, self.component,
info[r['hash']][0], r['hash'])
for r in response['result']]
return list(self._files)
class SnapshotBinaryPackage(SnapshotPackage):
def __init__(self, obj):
# obj required fields: 'version', 'binary_version', 'name', 'source'
super(SnapshotBinaryPackage, self).__init__(obj)
@property
def name(self):
return self._obj['name']
@property
def binary_version(self):
return self._obj['binary_version']
@property
def source(self):
return self._obj['source']
def getBPPH(self, arch):
f = self.getFiles(arch)
if not f:
return None
# Can only be 1 binary file for this pkg name/version/arch
return f[0].getBPPH()
def getFiles(self, arch=None):
if not self._files:
url = "/mr/binary/{}/{}/binfiles".format(self.name, self.version)
response = Snapshot.load("{}?fileinfo=1".format(url))
info = response['fileinfo']
self._files = [SnapshotBinaryFile(self.name, self.version, self.component,
info[r['hash']][0], r['hash'],
r['architecture'], self.source)
for r in response['result']]
if not arch:
return list(self._files)
return filter(lambda f: f.isArch(arch), self._files)
class SnapshotFile(object):
def __init__(self, pkg_name, pkg_version, component, obj, h):
self.package_name = pkg_name
self.package_version = pkg_version
self.component = component self.component = component
self.distribution = distribution self._obj = obj
self._changelog = None self._hash = h
@property
def getType(self):
return None
@property
def archive_name(self):
return self._obj['archive_name']
@property
def name(self):
return self._obj['name']
@property
def path(self):
return self._obj['path']
@property
def size(self):
return self._obj['size']
@property
def date(self):
if 'run' in self._obj:
return self._obj['run']
elif 'first_seen' in self._obj:
return self._obj['first_seen']
else:
Logger.error('File {} has no date information', self.name)
return 'unknown'
def getHash(self):
return self._hash
def getUrl(self):
return "{}/file/{}".format(Snapshot.getHostUrl(), self.getHash())
def __repr__(self):
return "{}/{} {} bytes {}".format(self.path, self.name, self.size, self.date)
class SnapshotSourceFile(SnapshotFile):
def __init__(self, name, version, component, obj, h):
super(SnapshotSourceFile, self).__init__(name, version, component, obj, h)
def getType(self):
return 'source'
class SnapshotBinaryFile(SnapshotFile):
def __init__(self, name, version, component, obj, h, arch, source):
super(SnapshotBinaryFile, self).__init__(name, version, component, obj, h)
self.source = source
self.arch = arch
self._bpph = None
def isArch(self, arch):
if not arch:
return True
if self.arch == 'all':
return True
return arch == self.arch
def getType(self):
return 'binary'
def getBPPH(self):
if not self._bpph:
self._bpph = SnapshotBPPH(self)
return self._bpph
class SnapshotSPPH(object):
"""Provide the same interface as SourcePackagePublishingHistory"""
def __init__(self, snapshot_pkg):
self._pkg = snapshot_pkg
# LP API defined fields
@property
def component_name(self):
return self.getComponent()
@property
def display_name(self):
return ("{name} {version}"
.format(name=self.getPackageName(),
version=self.getVersion()))
@property
def pocket(self):
# Debian does not use 'pockets'
return 'Release'
@property
def source_package_name(self):
return self.getPackageName()
@property
def source_package_version(self):
return self.getVersion()
# SPPH functions
def getPackageName(self): def getPackageName(self):
return self.name return self._pkg.name
def getVersion(self): def getVersion(self):
return self.version return self._pkg.version
def getComponent(self): def getComponent(self):
return self.component return self._pkg.component
def getChangelog(self, since_version=None): def getChangelog(self, since_version=None):
''' '''
@ -704,28 +1064,23 @@ class FakeSPPH(object):
May return None if the changelog isn't available May return None if the changelog isn't available
''' '''
if self._changelog is None: if self._changelog is None:
if self.name.startswith('lib'): name = self.getPackageName()
subdir = 'lib%s' % self.name[3] if name.startswith('lib'):
subdir = 'lib%s' % name[3]
else: else:
subdir = self.name[0] subdir = name[0]
# Strip epoch from version pkgversion = Version(self.getVersion()).strip_epoch()
pkgversion = self.version.split(':', 1)[-1] base = 'http://packages.debian.org/'
extension = ''
if self.distribution == 'debian':
base = 'http://packages.debian.org/'
extension = '.txt'
elif self.distribution == 'ubuntu':
base = 'http://changelogs.ubuntu.com/'
url = os.path.join(base, 'changelogs', 'pool', url = os.path.join(base, 'changelogs', 'pool',
self.component, subdir, self.name, self.getComponent(), subdir, name,
self.name + '_' + pkgversion, name + '_' + pkgversion,
'changelog' + extension) 'changelog.txt')
try: try:
with closing(urlopen(url)) as f: with closing(urlopen(url)) as f:
self._changelog = f.read() self._changelog = f.read()
except HTTPError as error: except HTTPError as error:
print(('%s: %s' % (url, error)), file=sys.stderr) Logger.error('{}: {}'.format(url, error))
return None return None
if since_version is None: if since_version is None:
@ -741,59 +1096,66 @@ class FakeSPPH(object):
new_entries.append(str(block)) new_entries.append(str(block))
return ''.join(new_entries) return ''.join(new_entries)
def getBinaries(self, arch=None):
return [b.getBPPH() for b in self._pkg.getBinaryFiles(arch)]
def rmadison(url, package, suite=None, arch=None):
"Call rmadison and parse the result"
cmd = ['rmadison', '-u', url]
if suite:
cmd += ['-s', suite]
if arch:
cmd += ['-a', arch]
cmd.append(package)
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
output, error_output = process.communicate()
if process.wait() != 0:
if error_output:
Logger.error('rmadison failed with: %s', error_output)
else:
Logger.error('rmadison failed')
sys.exit(1)
# rmadison uses some shorthand class SnapshotBPPH(object):
if suite: """Provide the same interface as BinaryPackagePublishingHistory"""
suite = suite.replace('-proposed-updates', '-p-u') def __init__(self, snapshot_binfile):
self._file = snapshot_binfile
for line in output.strip().splitlines(): # LP API defined fields
pkg, ver, dist, archs = [x.strip() for x in line.split('|')] @property
comp = 'main' def architecture_specific(self):
if '/' in dist: return self._file.arch != 'all'
dist, comp = dist.split('/')
archs = set(x.strip() for x in archs.split(','))
# rmadison returns some results outside the requested set. @property
# It'll include backports, and when given an unknown suite, def binary_package_name(self):
# it ignores that argument return self.getPackageName()
#
# some versions (2.14.1ubuntu0.1) of rmadison return 'sid' when
# asked about 'unstable'. Others return 'unstable'. Accept either.
if (suite and dist != suite and not
(suite == 'sid' and dist == 'unstable')):
continue
if 'source' in archs: @property
yield { def binary_package_version(self):
'source': pkg, return self.getVersion()
'version': ver,
'suite': dist, @property
'component': comp, def component_name(self):
} return self.getComponent()
archs.discard('source')
if archs: @property
yield { def display_name(self):
'binary': pkg, return ("{name} {version}"
'version': ver, .format(name=self.getPackageName(),
'suite': dist, version=self.getVersion()))
'component': comp,
'architectures': archs, @property
} def pocket(self):
# Debian does not use 'pockets'
return 'Release'
# BPPH functions
@property
def arch(self):
return self._file.arch
def getSourcePackageName(self):
return self._file.source
def getPackageName(self):
return self._file.package_name
def getVersion(self):
return self._file.package_version
def getComponent(self):
return self._file.component
def getBuild(self):
return None
def getUrl(self):
return self._file.getUrl()
def getFileName(self):
return self._file.name

View File

@ -26,3 +26,8 @@ class AlreadyLoggedInError(Exception):
class ArchSeriesNotFoundException(BaseException): class ArchSeriesNotFoundException(BaseException):
"""Thrown when a distroarchseries is not found.""" """Thrown when a distroarchseries is not found."""
pass pass
class InvalidDistroValueError(ValueError):
""" Thrown when distro value is invalid """
pass

View File

@ -31,11 +31,10 @@ import tempfile
from debian.changelog import Changelog from debian.changelog import Changelog
from distro_info import DebianDistroInfo, DistroDataOutdated from distro_info import DebianDistroInfo, DistroDataOutdated
from ubuntutools.archive import rmadison, FakeSPPH from ubuntutools.archive import DebianSourcePackage, UbuntuSourcePackage
from ubuntutools.lp.udtexceptions import PackageNotFoundException from ubuntutools.lp.udtexceptions import PackageNotFoundException
from ubuntutools.logger import Logger from ubuntutools.logger import Logger
from ubuntutools.question import confirmation_prompt, YesNoQuestion from ubuntutools.question import confirmation_prompt, YesNoQuestion
from ubuntutools.version import Version
__all__ = [ __all__ = [
@ -48,32 +47,21 @@ __all__ = [
] ]
def _get_srcpkg(distro, name, release):
if distro == 'debian':
# Canonicalise release:
debian_info = DebianDistroInfo()
try:
codename = debian_info.codename(release, default=release)
except DistroDataOutdated as e:
Logger.warn(e)
lines = list(rmadison(distro, name, suite=codename, arch='source'))
if not lines:
lines = list(rmadison(distro, name, suite=release, arch='source'))
if not lines:
raise PackageNotFoundException("'%s' doesn't appear to exist in %s '%s'" %
(name, distro.capitalize(), release))
pkg = max(lines, key=lambda x: Version(x['version']))
return FakeSPPH(pkg['source'], pkg['version'], pkg['component'], distro)
def get_debian_srcpkg(name, release): def get_debian_srcpkg(name, release):
return _get_srcpkg('debian', name, release) # Canonicalise release:
debian_info = DebianDistroInfo()
try:
codename = debian_info.codename(release, default=release)
return DebianSourcePackage(package=name, series=codename).lp_spph
except DistroDataOutdated as e:
Logger.warn(e)
except PackageNotFoundException:
pass
return DebianSourcePackage(package=name, series=release).lp_spph
def get_ubuntu_srcpkg(name, release): def get_ubuntu_srcpkg(name, release):
return _get_srcpkg('ubuntu', name, release) return UbuntuSourcePackage(package=name, series=release).lp_spph
def need_sponsorship(name, component, release): def need_sponsorship(name, component, release):

View File

@ -85,7 +85,6 @@ class LocalSourcePackageTestCase(unittest.TestCase):
self.workdir = tempfile.mkdtemp(prefix='udt-test') self.workdir = tempfile.mkdtemp(prefix='udt-test')
self._stubout('ubuntutools.archive.Distribution') self._stubout('ubuntutools.archive.Distribution')
self._stubout('ubuntutools.archive.rmadison')
self.mock_http = self._stubout('httplib2.Http.request') self.mock_http = self._stubout('httplib2.Http.request')
self.mock_http.side_effect = self.request_proxy self.mock_http.side_effect = self.request_proxy