mirror of
https://git.launchpad.net/ubuntu-dev-tools
synced 2025-03-13 08:01:09 +00:00
Support the include_meta param for both native BPPH as well as Debian Snapshot emulated BPPH; add a sourceFileUrls() function; add helper functions to get file URLs, checksums, and sizes.
1393 lines
50 KiB
Python
1393 lines
50 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# lpapicache.py - wrapper classes around the LP API implementing caching
|
|
# for usage in the ubuntu-dev-tools package
|
|
#
|
|
# Copyright © 2009-2010 Michael Bienia <geser@ubuntu.com>
|
|
# 2011 Stefano Rivera <stefanor@ubuntu.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 3
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# Please see the /usr/share/common-licenses/GPL file for the full text
|
|
# of the GNU General Public License license.
|
|
#
|
|
# Based on code written by Jonathan Davies <jpds@ubuntu.com>
|
|
|
|
# Uncomment for tracing LP API calls
|
|
# import httplib2
|
|
# httplib2.debuglevel = 1
|
|
|
|
import collections
|
|
import os
|
|
import re
|
|
|
|
from debian.changelog import Changelog
|
|
from httplib2 import Http, HttpLib2Error
|
|
from launchpadlib.launchpad import Launchpad as LP
|
|
from launchpadlib.errors import HTTPError
|
|
from lazr.restfulclient.resource import Entry
|
|
from urllib.parse import urlparse
|
|
|
|
from ubuntutools.version import Version
|
|
from ubuntutools.lp import (service, api_version)
|
|
from ubuntutools.misc import (host_architecture,
|
|
DEFAULT_POCKETS, POCKETS,
|
|
DEFAULT_STATUSES, STATUSES)
|
|
from ubuntutools.lp.udtexceptions import (AlreadyLoggedInError,
|
|
ArchiveNotFoundException,
|
|
ArchSeriesNotFoundException,
|
|
PackageNotFoundException,
|
|
PocketDoesNotExistError,
|
|
SeriesNotFoundException)
|
|
|
|
import logging
|
|
Logger = logging.getLogger(__name__)
|
|
|
|
|
|
__all__ = [
|
|
'Archive',
|
|
'BinaryPackagePublishingHistory',
|
|
'Build',
|
|
'Distribution',
|
|
'DistributionSourcePackage',
|
|
'DistroSeries',
|
|
'DistroArchSeries',
|
|
'Launchpad',
|
|
'PersonTeam',
|
|
'SourcePackagePublishingHistory',
|
|
]
|
|
|
|
|
|
class _Launchpad(object):
    '''Singleton for LP API access.

    The underlying launchpadlib object is created lazily on first use and
    stored in a private attribute; all unknown attribute lookups are
    forwarded to it.
    '''

    def login(self, service=service, api_version=api_version):
        '''Enforce a non-anonymous login.'''
        if self.logged_in:
            raise AlreadyLoggedInError('Already logged in to Launchpad.')
        try:
            self.__lp = LP.login_with('ubuntu-dev-tools', service,
                                      version=api_version)
        except IOError as err:
            Logger.error(str(err))
            raise

    def login_anonymously(self, service=service, api_version=api_version):
        '''Enforce an anonymous login.'''
        if self.logged_in:
            raise AlreadyLoggedInError('Already logged in to Launchpad.')
        self.__lp = LP.login_anonymously('ubuntu-dev-tools', service,
                                         version=api_version)

    def login_existing(self, lp):
        '''Use an already logged in Launchpad object'''
        if self.logged_in:
            raise AlreadyLoggedInError('Already logged in to Launchpad.')
        self.__lp = lp

    @property
    def logged_in(self):
        '''Are we logged in?'''
        # The name-mangled attribute only exists once a login succeeded.
        return '_Launchpad__lp' in vars(self)

    def __getattr__(self, name):
        # Transparently log in (non-anonymously) on first access.
        if not self.logged_in:
            self.login()
        return getattr(self.__lp, name)

    def __call__(self):
        return self


Launchpad = _Launchpad()
|
|
|
|
|
|
class MetaWrapper(type):
    '''
    A meta class used for wrapping LP API objects.

    Every class created with this metaclass must declare a 'resource_type'
    attribute in its own body and receives a private, per-class cache dict.
    '''
    def __init__(cls, name, bases, namespace):
        super(MetaWrapper, cls).__init__(name, bases, namespace)
        if 'resource_type' not in namespace:
            # The resource type must be declared directly on the class,
            # not merely inherited.
            raise TypeError('Class "%s" needs an associated resource type' %
                            name)
        cls._cache = {}
|
|
|
|
|
|
class BaseWrapper(object, metaclass=MetaWrapper):
    '''
    A base class from which other wrapper classes are derived.

    __new__ accepts either a LP API URL string, a lazr Entry object, or
    anything the subclass' fetch() classmethod understands, and returns a
    (possibly cached) wrapper instance holding the LP object in _lpobject.
    '''
    resource_type = None  # it's a base class after all

    def __new__(cls, data):
        if isinstance(data, str) and data.startswith(str(Launchpad._root_uri)):
            # looks like a LP API URL
            # check if it's already cached
            cached = cls._cache.get(data)
            if cached:
                return cached

            # not cached, so try to get it
            try:
                data = Launchpad.load(data)
            except HTTPError:
                # didn't work, fall through and let fetch() (or the
                # TypeError below) deal with the raw string
                pass

        if isinstance(data, Entry):
            (service_root, resource_type) = data.resource_type_link.split('#')
            if service_root == str(Launchpad._root_uri) and resource_type in cls.resource_type:
                # check if it's already cached
                cached = cls._cache.get(data.self_link)
                if not cached:
                    # create a new instance
                    cached = object.__new__(cls)
                    cached._lpobject = data
                    # and add it to our cache
                    cls._cache[data.self_link] = cached
                    Logger.debug("%s: %s" % (cls.__name__, data.self_link))
                    # add additional class specific caching (if available)
                    # NOTE: use the builtin callable() here; the old
                    # isinstance(x, collections.Callable) check breaks on
                    # Python >= 3.10 where the ABC aliases were removed
                    # from the collections module.
                    cache = getattr(cls, 'cache', None)
                    if callable(cache):
                        cache(cached)
                return cached
            else:
                raise TypeError("'%s' is not a '%s' object" %
                                (str(data), str(cls.resource_type)))
        else:
            # not a LP API representation, let the specific class handle it
            fetch = getattr(cls, 'fetch', None)
            if callable(fetch):
                return fetch(data)
            else:
                raise NotImplementedError("Don't know how to fetch '%s' from LP"
                                          % str(data))

    def __call__(self):
        '''Return the wrapped launchpadlib object.'''
        return self._lpobject

    def __getattr__(self, attr):
        # Forward unknown attribute access to the wrapped LP object.
        return getattr(self._lpobject, attr)

    def __repr__(self):
        # str.format always exists on Python 3; the old hasattr() fallback
        # for pre-2.6 interpreters was dead code.
        return '<{0}: {1!r}>'.format(self.__class__.__name__, self._lpobject)
|
|
|
|
|
|
class Distribution(BaseWrapper):
    '''
    Wrapper class around a LP distribution object.
    '''
    resource_type = 'distribution'

    def __init__(self, *args):
        # NOTE: BaseWrapper.__new__ may return a cached instance, in which
        # case __init__ runs again and resets these per-instance caches.
        self._archives = dict()         # archive name -> Archive
        self._series_by_name = dict()   # series name (e.g. 'karmic') -> DistroSeries
        self._series = dict()           # series version (e.g. '9.10') -> DistroSeries
        self._dev_series = None         # cached current development DistroSeries
        self._have_all_series = False   # True once getAllSeries() has run

    def cache(self):
        # Class-specific caching hook called by BaseWrapper.__new__:
        # additionally index this distribution by its name.
        self._cache[self.name] = self

    def _cache_series(self, series):
        '''
        Add the DistroSeries to the cache if needed.
        '''
        if series.version not in self._series:
            self._series_by_name[series.name] = series
            self._series[series.version] = series

    @classmethod
    def fetch(cls, dist):
        '''
        Fetch the distribution object identified by 'dist' from LP.
        '''
        if not isinstance(dist, str):
            raise TypeError("Don't know what do with '%r'" % dist)
        cached = cls._cache.get(dist)
        if not cached:
            # Constructing the wrapper caches it by self_link, and cache()
            # above additionally caches it by name for the next lookup.
            cached = Distribution(Launchpad.distributions[dist])
        return cached

    def getArchive(self, archive=None):
        '''
        Returns an Archive object for the requested archive.
        Raises a ArchiveNotFoundException if the archive doesn't exist.

        If 'archive' is None, return the main archive.
        '''
        if archive:
            res = self._archives.get(archive)

            if not res:
                # linear scan over the distribution's archives; result is
                # cached by name for subsequent calls
                for a in self.archives:
                    if a.name == archive:
                        res = Archive(a)
                        self._archives[res.name] = res
                        break

            if res:
                return res
            else:
                message = "The Archive '%s' doesn't exist in %s" % \
                    (archive, self.display_name)
                raise ArchiveNotFoundException(message)
        else:
            # lazily create and memoize the main archive wrapper
            if '_main_archive' not in self.__dict__:
                self._main_archive = Archive(self.main_archive_link)
            return self._main_archive

    def getSeries(self, name_or_version):
        '''
        Returns a DistroSeries object for a series passed by name
        (e.g. 'karmic') or version (e.g. '9.10').
        If the series is not found: raise SeriesNotFoundException
        '''
        # try both caches before hitting the API
        if name_or_version in self._series:
            return self._series[name_or_version]
        if name_or_version in self._series_by_name:
            return self._series_by_name[name_or_version]

        try:
            series = DistroSeries(self().getSeries(name_or_version=name_or_version))
        except HTTPError:
            message = "Release '%s' is unknown in '%s'." % \
                (name_or_version, self.display_name)
            raise SeriesNotFoundException(message)

        self._cache_series(series)
        return series

    def getDevelopmentSeries(self):
        '''
        Returns a DistroSeries object of the current development series.
        '''
        if not self._dev_series:
            series = DistroSeries(self.current_series_link)
            self._cache_series(series)
            self._dev_series = series
        return self._dev_series

    def getAllSeries(self, active=True):
        '''
        Returns a list of all DistroSeries objects.
        '''
        # populate the series caches once, then serve from them
        if not self._have_all_series:
            for s in Launchpad.load(self.series_collection_link).entries:
                series = DistroSeries(s['self_link'])
                self._cache_series(series)
            self._have_all_series = True

        # only active series, newest first (sorted numerically by version)
        allseries = filter(lambda s: s.active, self._series.values())
        allseries = sorted(allseries,
                           key=lambda s: float(s.version),
                           reverse=True)
        Logger.debug("Found series: %s" % ", ".join(map(lambda s: "%s (%s)" %
                                                        (s.name, s.version),
                                                        allseries)))
        # NOTE: despite the docstring, this returns an ordered name->series
        # mapping, not a plain list.
        return collections.OrderedDict((s.name, s) for s in allseries)
|
|
|
|
|
|
class DistroArchSeries(BaseWrapper):
    '''
    Wrapper class around a LP distro arch series object.
    '''
    resource_type = 'distro_arch_series'

    def getSeries(self):
        '''
        Return the DistroSeries this arch series belongs to.
        '''
        series_link = self._lpobject.distroseries_link
        return DistroSeries(series_link)
|
|
|
|
|
|
class DistroSeries(BaseWrapper):
    '''
    Wrapper class around a LP distro series object.
    '''
    resource_type = 'distro_series'

    def __init__(self, *args):
        # BaseWrapper.__new__ may hand back a cached instance, in which
        # case __init__ runs again; keep the existing architecture cache.
        if "_architectures" not in self.__dict__:
            self._architectures = dict()  # archtag -> DistroArchSeries

    def getArchSeries(self, archtag=None):
        '''
        Returns a DistroArchSeries object for an architecture passed by name
        (e.g. 'amd64').
        If arch is not specified, get the DistroArchSeries for the system arch.
        The special archtag 'all' will get the system arch.
        If the architecture is not found: raise ArchSeriesNotFoundException.
        '''
        if not archtag or archtag == 'all':
            archtag = host_architecture()
        if archtag not in self._architectures:
            try:
                architecture = DistroArchSeries(
                    self().getDistroArchSeries(archtag=archtag))
                # BUGFIX: cache under the requested tag. The entry used to
                # be stored under architecture.architecture_tag, so the
                # lookup below could raise KeyError if LP returned a
                # different tag than the one asked for.
                self._architectures[archtag] = architecture
            except HTTPError:
                message = "Architecture %s is unknown." % archtag
                raise ArchSeriesNotFoundException(message)
        return self._architectures[archtag]
|
|
|
|
|
|
class Archive(BaseWrapper):
    '''
    Wrapper class around a LP archive object.
    '''
    resource_type = 'archive'

    def __init__(self, *args):
        # per-instance lookup caches; keys are described at each use site
        self._binpkgs = {}              # search index -> BPPH wrapper
        self._srcpkgs = {}              # search index -> SPPH wrapper
        self._pkg_uploaders = {}        # source package name -> [PersonTeam]
        self._pkgset_uploaders = {}     # (packageset, direct) -> [PersonTeam]
        self._component_uploaders = {}  # component name -> [PersonTeam]

    def getSourcePackage(self, name, series=None, pocket=None, version=None,
                         status=None, wrapper=None, search_all_series=False):
        '''
        Returns a SourcePackagePublishingHistory object for the most
        recent source package in the distribution 'dist', series and
        pocket.

        series defaults to the current development series if not specified.
        series must be either a series name string, or DistroSeries object.

        version may be specified to get only the exact version requested.

        pocket may be a string or a list.  If no version is provided, it
        defaults to all pockets except 'Backports'; if searching for a
        specific version, it defaults to all pockets.  Pocket strings must
        be capitalized.

        status may be a string or a list.  If no version is provided, it
        defaults to only 'Pending' and 'Published'; if searching for a
        specific version, it defaults to all statuses.  Status strings must
        be capitalized.

        wrapper is the class to return an instance of; defaults to
        SourcePackagePublishingHistory.

        search_all_series is used if series is None.  If False, this will
        search only the latest devel series, and if True all series
        will be searched, in reverse order, starting with the latest
        devel series.  Defaults to False.

        If the requested source package doesn't exist a
        PackageNotFoundException is raised.
        '''
        return self._getPublishedItem(name, series, pocket, cache=self._srcpkgs,
                                      function='getPublishedSources',
                                      name_key='source_name',
                                      wrapper=wrapper or SourcePackagePublishingHistory,
                                      version=version,
                                      status=status,
                                      search_all_series=search_all_series,
                                      binary=False)

    def getBinaryPackage(self, name, archtag=None, series=None, pocket=None,
                         version=None, status=None, wrapper=None,
                         search_all_series=False):
        '''
        Returns a BinaryPackagePublishingHistory object for the most
        recent source package in the distribution 'dist', architecture
        'archtag', series and pocket.

        series defaults to the current development series if not specified.
        series must be either a series name string, or DistroArchSeries object.
        series may be omitted if version is specified.

        version may be specified to get only the exact version requested.

        pocket may be a string or a list.  If no version is provided, it
        defaults to all pockets except 'Backports'; if searching for a
        specific version, it defaults to all pockets.  Pocket strings must
        be capitalized.

        status may be a string or a list.  If no version is provided, it
        defaults to only 'Pending' and 'Published'; if searching for a
        specific version, it defaults to all statuses.  Status strings must
        be capitalized.

        wrapper is the class to return an instance of; defaults to
        BinaryPackagePublishingHistory.

        search_all_series is used if series is None.  If False, this will
        search only the latest devel series, and if True all series
        will be searched, in reverse order, starting with the latest
        devel series.  Defaults to False.

        If the requested binary package doesn't exist a
        PackageNotFoundException is raised.
        '''
        return self._getPublishedItem(name, series, pocket, archtag=archtag,
                                      cache=self._binpkgs,
                                      function='getPublishedBinaries',
                                      name_key='binary_name',
                                      wrapper=wrapper or BinaryPackagePublishingHistory,
                                      version=version,
                                      status=status,
                                      search_all_series=search_all_series,
                                      binary=True)

    def _getPublishedItem(self, name, series, pocket, cache,
                          function, name_key, wrapper, archtag=None,
                          version=None, status=None, search_all_series=False,
                          binary=False):
        '''
        Common code between getSourcePackage and getBinaryPackage.

        Don't use this directly.
        '''
        # --- normalize 'pocket' into a tuple of valid pocket names ---
        if not pocket:
            if version and not series:
                # check ALL pockets if specific version in any series
                pockets = POCKETS
            else:
                # otherwise, check all pockets EXCEPT 'Backports'
                pockets = DEFAULT_POCKETS
        elif isinstance(pocket, str):
            pockets = (pocket,)
        else:
            pockets = tuple(pocket)

        for p in pockets:
            if p not in POCKETS:
                raise PocketDoesNotExistError("Pocket '%s' does not exist." % p)

        # --- normalize 'status' into a tuple of valid status names ---
        if not status:
            if version:
                # check ALL statuses if specific version
                statuses = STATUSES
            else:
                # otherwise, only check 'Pending' and 'Published'
                statuses = DEFAULT_STATUSES
        elif isinstance(status, str):
            statuses = (status,)
        else:
            statuses = tuple(status)

        for s in statuses:
            if s not in STATUSES:
                raise ValueError("Status '%s' is not valid." % s)

        dist = Distribution(self.distribution_link)

        # please don't pass DistroArchSeries as archtag!
        # but, the code was like that before so keep
        # backwards compatibility.
        if isinstance(archtag, DistroArchSeries):
            series = archtag
            archtag = None

        series_to_check = [series]
        if not version and not series:
            # if neither version or series are specified, use either the
            # devel series or search all series
            if search_all_series:
                series_to_check = dist.getAllSeries().values()
            else:
                series_to_check = [dist.getDevelopmentSeries()]

        # check each series - if only version was provided, series will be None
        for series in series_to_check:
            arch_series = None

            # coerce whatever we were given into DistroSeries (and, for
            # binaries, DistroArchSeries) wrappers
            if isinstance(series, DistroArchSeries):
                arch_series = series
                series = series.getSeries()
            elif isinstance(series, DistroSeries):
                pass
            elif series:
                series = dist.getSeries(series)

            if binary:
                if arch_series is None and series:
                    arch_series = series.getArchSeries(archtag=archtag)
                if archtag is None and arch_series:
                    archtag = arch_series.architecture_tag
                if archtag is None:
                    archtag = host_architecture()

            # cache key covering every parameter that affects the result
            index = (name, getattr(series, 'name', None), archtag, pockets, statuses, version)

            if index in cache:
                return cache[index]

            params = {
                name_key: name,
                'exact_match': True,
            }

            if arch_series:
                params['distro_arch_series'] = arch_series()
            elif series:
                params['distro_series'] = series()

            # the LP API accepts a single pocket/status filter; with more
            # than one we fetch everything and filter client-side below
            if len(pockets) == 1:
                params['pocket'] = pockets[0]

            if len(statuses) == 1:
                params['status'] = statuses[0]

            if version:
                params['version'] = version

            Logger.debug('Calling %s(%s)' % (function,
                                             ', '.join(['%s=%s' % (k, v)
                                                        for (k, v) in params.items()])))
            records = getattr(self, function)(**params)

            # default error text; overwritten below with the reason the
            # last record was skipped, and used in the final message
            err_msg = ("does not exist in the %s %s archive" %
                       (dist.display_name, self.name))

            for record in records:
                if binary:
                    rversion = getattr(record, 'binary_package_version', None)
                else:
                    rversion = getattr(record, 'source_package_version', None)
                skipmsg = ('Skipping version %s: ' % rversion)

                if record.pocket not in pockets:
                    err_msg = 'pocket %s not in (%s)' % (record.pocket, ','.join(pockets))
                    Logger.debug(skipmsg + err_msg)
                    continue
                if record.status not in statuses:
                    err_msg = 'status %s not in (%s)' % (record.status, ','.join(statuses))
                    Logger.debug(skipmsg + err_msg)
                    continue
                r = wrapper(record)
                if binary and archtag and archtag != r.arch:
                    err_msg = 'arch %s does not match requested arch %s' % (r.arch, archtag)
                    Logger.debug(skipmsg + err_msg)
                    continue
                # results are ordered so first is latest
                cache[index] = r
                return r

        # nothing matched in any series: build a helpful error message
        version_with_epoch = None
        if version and version == Version(version).strip_epoch() and len(records) == 0:
            # a specific version was asked for, but we found none;
            # check if one exists with an epoch to give a hint in error msg
            for epoch in range(1, 9):
                v = Version(version)
                v.epoch = epoch
                params['version'] = v.full_version
                if len(getattr(self, function)(**params)) > 0:
                    version_with_epoch = v.full_version
                    Logger.debug('Found version with epoch %s' % version_with_epoch)
                    break

        if name_key == 'binary_name':
            package_type = "binary package"
        elif name_key == 'source_name':
            package_type = "source package"
        else:
            package_type = "package"
        msg = "The %s '%s' " % (package_type, name)
        if version:
            msg += "version %s " % version
        msg += err_msg
        if binary and archtag:
            msg += " for architecture %s" % archtag
        if len(series_to_check) > 1:
            msg += " in any release"
            if len(pockets) == 1:
                msg += " for pocket %s" % pockets[0]
            elif len(pockets) != len(POCKETS):
                msg += " for pockets " + ', '.join(pockets)
        elif series:
            msg += " in %s" % series.name
            if len(pockets) == 1:
                msg += "-%s" % pockets[0]
            elif len(pockets) != len(POCKETS):
                msg += " for pockets " + ', '.join(pockets)
        if len(statuses) == 1:
            msg += " with status %s" % statuses[0]
        elif len(statuses) != len(STATUSES):
            msg += " with status in " + ', '.join(statuses)
        if version_with_epoch:
            msg += " (did you forget the epoch? try %s)" % version_with_epoch
        raise PackageNotFoundException(msg)

    def copyPackage(self, source_name, version, from_archive, to_pocket,
                    to_series=None, sponsored=None, include_binaries=False):
        '''Copy a single named source into this archive.

        Asynchronously copy a specific version of a named source to the
        destination archive if necessary.  Calls to this method will return
        immediately if the copy passes basic security checks and the copy
        will happen sometime later with full checking.
        '''

        # unwrap a PersonTeam wrapper into the raw LP object the API expects
        if isinstance(sponsored, PersonTeam):
            sponsored = sponsored._lpobject

        self._lpobject.copyPackage(
            source_name=source_name,
            version=version,
            from_archive=from_archive._lpobject,
            to_pocket=to_pocket,
            to_series=to_series,
            sponsored=sponsored,
            include_binaries=include_binaries
        )

    def getUploadersForComponent(self, component_name):
        '''Get the list of PersonTeams who can upload packages in the
        specified component.
        [Note: the permission records, themselves, aren't exposed]
        '''
        if component_name not in self._component_uploaders:
            # de-duplicate and sort; result is cached per component
            self._component_uploaders[component_name] = sorted(set(
                PersonTeam(permission.person_link) for permission in
                self._lpobject.getUploadersForComponent(component_name=component_name)
            ))
        return self._component_uploaders[component_name]

    def getUploadersForPackage(self, source_package_name):
        '''Get the list of PersonTeams who can upload source_package_name)
        [Note: the permission records, themselves, aren't exposed]
        '''
        if source_package_name not in self._pkg_uploaders:
            # de-duplicate and sort; result is cached per package name
            self._pkg_uploaders[source_package_name] = sorted(set(
                PersonTeam(permission.person_link) for permission in
                self._lpobject.getUploadersForPackage(source_package_name=source_package_name)
            ))
        return self._pkg_uploaders[source_package_name]

    def getUploadersForPackageset(self, packageset, direct_permissions=False):
        '''Get the list of PersonTeams who can upload packages in packageset
        [Note: the permission records, themselves, aren't exposed]
        '''
        key = (packageset, direct_permissions)
        if key not in self._pkgset_uploaders:
            self._pkgset_uploaders[key] = sorted(set(
                PersonTeam(permission.person_link) for permission in
                self._lpobject.getUploadersForPackageset(
                    packageset=packageset._lpobject,
                    direct_permissions=direct_permissions,
                )
            ))
        return self._pkgset_uploaders[key]
|
|
|
|
|
|
class SourcePackagePublishingHistory(BaseWrapper):
    '''
    Wrapper class around a LP source package object.
    '''
    resource_type = 'source_package_publishing_history'

    def __init__(self, *args):
        # NOTE: BaseWrapper.__new__ may return a cached instance, in which
        # case __init__ runs again; these caches (except _builds) are reset.
        self._archive = None        # cached Archive wrapper
        self._changelog = None      # cached raw changelog text
        self._binaries = {}         # archtag -> {binary name -> BPPH}
        self._distro_series = None  # cached DistroSeries wrapper
        self._source_urls = None    # cached list of source file url dicts
        # Don't share _builds between different
        # SourcePackagePublishingHistory objects
        if '_builds' not in self.__dict__:
            self._builds = dict()

    def getDistroSeries(self):
        '''
        Return the DistroSeries.
        '''
        if not self._distro_series:
            self._distro_series = DistroSeries(self._lpobject.distro_series_link)
        return self._distro_series

    def getPackageName(self):
        '''
        Returns the source package name.
        '''
        return self._lpobject.source_package_name

    def getVersion(self):
        '''
        Returns the version of the source package.
        '''
        return self._lpobject.source_package_version

    def getComponent(self):
        '''
        Returns the component of the source package.
        '''
        return self._lpobject.component_name

    def getSeriesName(self):
        '''
        Returns the series

        Named getSeriesName() to avoid confusion with
        getDistroSeries()
        '''
        return self.getDistroSeries().name

    def getSeriesAndPocket(self):
        '''
        Returns a human-readable release-pocket
        '''
        release = self.getSeriesName()
        # the 'Release' pocket has no suffix; others get e.g. '-proposed'
        if self.pocket != 'Release':
            release += '-' + self.pocket.lower()
        return release

    def getArchive(self):
        '''
        Get this SPPH's archive.
        '''
        if not self._archive:
            self._archive = Archive(self._lpobject.archive_link)

        return self._archive

    def getChangelog(self, since_version=None):
        '''
        Return the changelog, optionally since a particular version
        May return None if the changelog isn't available
        Only available in the devel API, not 1.0
        '''
        if self._changelog is None:
            url = self._lpobject.changelogUrl()
            if url is None:
                Logger.error('No changelog available for %s %s' %
                             (self.getPackageName(), self.getVersion()))
                return None

            # fetch and cache the raw changelog; errors are logged, not raised
            try:
                response, changelog = Http().request(url)
            except HttpLib2Error as e:
                Logger.error(str(e))
                return None
            if response.status != 200:
                Logger.error('%s: %s %s' % (url, response.status, response.reason))
                return None
            self._changelog = changelog

        if since_version is None:
            return self._changelog

        if isinstance(since_version, str):
            since_version = Version(since_version)

        # collect entries newer than since_version; blocks are ordered
        # newest first, so stop at the first one that is old enough
        new_entries = []
        for block in Changelog(self._changelog):
            if block.version <= since_version:
                break
            new_entries.append(str(block))
        return ''.join(new_entries)

    def sourceFileUrls(self, include_meta=False):
        '''
        Return the URL for this source publication's files.

        The include_meta param changes the return value;
        when it is False (the default), an array of url strings is
        returned.  When include_meta is True, an array is returned
        with dicts, containing the entries:
          url: the url string
          sha1: the SHA1 checksum of the source file (if provided)
          sha256: the SHA256 checksum of the source file
          size: the size of the source file
        Also, this function adds a 'filename' field:
          filename: the filename parsed from the url path
        '''
        if not self._source_urls:
            # always ask LP for the metadata form, and cache that
            urls = self._lpobject.sourceFileUrls(include_meta=True)
            if not urls:
                Logger.warning('SPPH %s_%s has no sourceFileUrls' %
                               (self.getPackageName(), self.getVersion()))
            for u in urls:
                u['filename'] = os.path.basename(urlparse(u['url']).path)
            self._source_urls = urls

        if include_meta:
            # shallow copy so callers can't mutate our cached list
            return list(self._source_urls)
        return [f['url'] for f in self._source_urls]

    def sourceFileUrl(self, filename):
        '''
        Returns the URL for the specified source filename.

        If the filename is not found in the sourceFileUrls(), this returns None.
        '''
        for f in self.sourceFileUrls(include_meta=True):
            if filename == f['filename']:
                return f['url']
        return None

    def sourceFileSha1(self, url_or_filename):
        '''
        Returns the SHA1 checksum for the specified source file url.

        If the url is not found in the sourceFileUrls(), this returns None.

        The url may be specified as a filename.
        '''
        for f in self.sourceFileUrls(include_meta=True):
            if url_or_filename in [f['url'], f['filename']]:
                return f['sha1']
        return None

    def sourceFileSha256(self, url_or_filename):
        '''
        Returns the SHA256 checksum for the specified source file url.

        If the url is not found in the sourceFileUrls(), this returns None.

        The url may be specified as a filename.
        '''
        for f in self.sourceFileUrls(include_meta=True):
            if url_or_filename in [f['url'], f['filename']]:
                return f['sha256']
        return None

    def sourceFileSize(self, url_or_filename):
        '''
        Returns the size for the specified source file url.

        If the url is not found in the sourceFileUrls(), this returns 0.

        The url may be specified as a filename.
        '''
        for f in self.sourceFileUrls(include_meta=True):
            if url_or_filename in [f['url'], f['filename']]:
                return int(f['size'])
        return 0

    def getBinaries(self, arch=None, name=None, ext=None):
        '''
        Returns the resulting BinaryPackagePublishingHistorys.
        If arch is specified, it returns binaries for only that arch,
        plus any binaries with arch 'all'.  If arch is not specified, or
        if arch is specified as 'all', all archs are returned.

        If name is specified, only returns BPPH matching that (regex) name.

        If ext is specified, only returns BPPH matching that (regex) ext.
        '''
        if arch == 'all':
            arch = None

        if self.status in ["Pending", "Published"]:
            # Published, great!  Directly query the list of binaries
            binaries = map(BinaryPackagePublishingHistory,
                           self._lpobject.getPublishedBinaries())
            for b in binaries:
                a = b.arch
                if a not in self._binaries:
                    self._binaries[a] = {}
                self._binaries[a][b.binary_package_name] = b
        else:
            # we have to go the long way :(
            Logger.info("Please wait, this may take some time...")
            archive = self.getArchive()
            urls = self.binaryFileUrls()
            for url in urls:
                # strip out the URL leading text.
                filename = os.path.basename(urlparse(url).path)
                # strip the file suffix
                (pkgname, _, e) = filename.rpartition('.')
                # split into name, version, arch
                (n, v, a) = pkgname.rsplit('_', 2)
                # arch 'all' has separate bpph for each real arch,
                # but all point to the same binary url
                if a == 'all':
                    a = arch or host_architecture()
                # Only check the arch requested - saves time
                if arch and arch != a:
                    continue
                # Only check the name requested - saves time
                if name and not re.match(name, n):
                    continue
                # Only check the ext requested - saves time
                if ext and not re.match(ext, e):
                    continue
                # If we already have this BPPH, keep going
                if a in self._binaries and n in self._binaries[a]:
                    continue
                # we ignore the version, as it may be missing epoch
                # also we can't use series, as some package versions
                # span multiple series! (e.g. for different archs)
                params = {'name': n,
                          'archtag': a,
                          'version': self.getVersion()}
                try:
                    bpph = archive.getBinaryPackage(**params)
                except PackageNotFoundException:
                    Logger.debug("Could not find pkg in archive: %s" % filename)
                    continue
                if a not in self._binaries:
                    self._binaries[a] = {}
                self._binaries[a][n] = bpph

        # assemble the result from the (now populated) per-arch cache
        if not arch:
            bpphs = [b for a in self._binaries.values() for b in a.values()]
        elif arch in self._binaries:
            bpphs = list(self._binaries[arch].values())
        else:
            return []

        if name:
            bpphs = [b for b in bpphs if re.match(name, b.binary_package_name)]

        if ext:
            bpphs = [b for b in bpphs if re.match(ext, b.getFileExt())]

        return bpphs

    def _fetch_builds(self):
        '''Populate self._builds with the build records.'''
        builds = self.getBuilds()
        for build in builds:
            self._builds[build.arch_tag] = Build(build)

    def getBuildStates(self, archs):
        '''Return a human-readable summary of build states for archs.'''
        res = list()

        if not self._builds:
            self._fetch_builds()

        for arch in archs:
            build = self._builds.get(arch)
            if build:
                res.append('  %s' % build)
        return "Build state(s) for '%s':\n%s" % (
            self.getPackageName(), '\n'.join(res))

    def rescoreBuilds(self, archs, score):
        '''Rescore the builds for archs and return a per-arch summary.'''
        res = list()

        if not self._builds:
            self._fetch_builds()

        for arch in archs:
            build = self._builds.get(arch)
            if build:
                if build.rescore(score):
                    res.append('  %s: done' % arch)
                else:
                    res.append('  %s: failed' % arch)
        return "Rescoring builds of '%s' to %i:\n%s" % (
            self.getPackageName(), score, '\n'.join(res))

    def retryBuilds(self, archs):
        '''Retry the builds for archs and return a per-arch summary.'''
        res = list()

        if not self._builds:
            self._fetch_builds()

        for arch in archs:
            build = self._builds.get(arch)
            if build:
                if build.retry():
                    res.append('  %s: done' % arch)
                else:
                    res.append('  %s: failed' % arch)
        return "Retrying builds of '%s':\n%s" % (
            self.getPackageName(), '\n'.join(res))
|
|
|
|
|
|
class BinaryPackagePublishingHistory(BaseWrapper):
    '''
    Wrapper class around a LP binary package object.
    '''
    resource_type = 'binary_package_publishing_history'

    def __init__(self, *args):
        # Lazily-populated caches, filled by the 'arch' property,
        # getFileExt() and binaryFileUrls() respectively.
        self._arch = None
        self._ext = None
        self._binary_urls = None

    @property
    def arch(self):
        '''The architecture tag of the series this binary was published in
        (resolved from the distro_arch_series link once, then cached).
        '''
        if not self._arch:
            das = DistroArchSeries(self._lpobject.distro_arch_series_link)
            self._arch = das.architecture_tag
        return self._arch

    def getSourcePackageName(self):
        '''
        Returns the source package name.
        '''
        return self.getBuild().source_package_name

    def getPackageName(self):
        '''
        Returns the binary package name.
        '''
        return self._lpobject.binary_package_name

    def getVersion(self):
        '''
        Returns the version of the binary package.
        '''
        return self._lpobject.binary_package_version

    def getComponent(self):
        '''
        Returns the component of the binary package.
        '''
        return self._lpobject.component_name

    def binaryFileUrls(self, include_meta=False):
        '''
        Return the URL for this binary publication's files.
        Only available in the devel API, not 1.0

        The include_meta param changes the return value;
        when it is False (the default), an array of url strings is
        returned (but typically there is only a single url in the array).
        When include_meta is True, an array (again, with typically only one
        entry) is returned with dicts, containing the entries:
            url: the url string
            sha1: the SHA1 checksum of the binary file
            sha256: the SHA256 checksum of the binary file
            size: the size of the binary file
        Also, this function adds a 'filename' field:
            filename: the filename parsed from the url path
        '''
        if not self._binary_urls:
            try:
                # Always fetch with metadata; the simple-url form is
                # derived from the cached metadata below.
                urls = self._lpobject.binaryFileUrls(include_meta=True)
            except AttributeError:
                raise AttributeError("binaryFileUrls can only be found in lpapi "
                                     "devel, not 1.0. Login using devel to have it.")
            if not urls:
                Logger.warning('BPPH %s_%s has no binaryFileUrls' %
                               (self.getPackageName(), self.getVersion()))
            for u in urls:
                u['filename'] = os.path.basename(urlparse(u['url']).path)
            self._binary_urls = urls

        if include_meta:
            # Shallow copy so callers can't mutate our cached list.
            return list(self._binary_urls)
        return [f['url'] for f in self._binary_urls]

    def binaryFileUrl(self, filename):
        '''
        Returns the URL for the specified binary filename.

        If the filename is not found in the binaryFileUrls(), this returns None.
        '''
        for f in self.binaryFileUrls(include_meta=True):
            if filename == f['filename']:
                return f['url']
        return None

    def binaryFileSha1(self, url_or_filename):
        '''
        Returns the SHA1 checksum for the specified binary file url.

        If the url is not found in the binaryFileUrls(), this returns None.

        The url may be specified as a filename.
        '''
        for f in self.binaryFileUrls(include_meta=True):
            if url_or_filename in [f['url'], f['filename']]:
                return f['sha1']
        return None

    def binaryFileSha256(self, url_or_filename):
        '''
        Returns the SHA256 checksum for the specified binary file url.

        If the url is not found in the binaryFileUrls(), this returns None.

        The url may be specified as a filename.
        '''
        for f in self.binaryFileUrls(include_meta=True):
            if url_or_filename in [f['url'], f['filename']]:
                return f['sha256']
        return None

    def binaryFileSize(self, url_or_filename):
        '''
        Returns the size for the specified binary file url.

        If the url is not found in the binaryFileUrls(), this returns 0.

        The url may be specified as a filename.
        '''
        for f in self.binaryFileUrls(include_meta=True):
            if url_or_filename in [f['url'], f['filename']]:
                return int(f['size'])
        return 0

    def getBuild(self):
        '''
        Returns the original build of the binary package.
        '''
        return Build(self._lpobject.build_link)

    def getUrl(self):
        '''
        Returns the original build URL of the binary package.
        '''
        # BUGFIX: the template previously contained a literal placeholder
        # instead of '{filename}', so the passed 'filename' kwarg was never
        # substituted and the returned URL was malformed.
        return "{build}/+files/{filename}".format(build=self.getBuild().getUrl(),
                                                  filename=self.getFileName())

    def getFileVersion(self):
        '''
        Returns the file version, which is the package version without the epoch
        '''
        return Version(self.getVersion()).strip_epoch()

    def getFileArch(self):
        '''
        Returns the file arch, which is 'all' if not arch-specific
        '''
        if bool(self._lpobject.architecture_specific):
            return self.arch
        else:
            return 'all'

    def getFileExt(self):
        '''
        Returns the file extension; "deb", "ddeb", or "udeb".
        '''
        if not self._ext:
            self._ext = self._getFileExt()

        return self._ext

    def _getFileExt(self):
        '''Determine the package file extension, preferring the real URL
        filename and falling back to name-based heuristics.
        '''
        try:
            # this is the best way, from the actual URL filename
            return self.binaryFileUrls()[0].rpartition('.')[2]
        except (AttributeError, IndexError):
            Logger.debug('Could not get file ext from url, trying to guess...')

        # is_debug should be reliable way of detecting ddeb...?
        try:
            if self.is_debug:
                return "ddeb"
        except AttributeError:
            # is_debug only available with api version 'devel'
            if self.getPackageName().endswith("-dbgsym"):
                return "ddeb"

        # is this reliable?
        if self.getPackageName().endswith("-di") or self.getPackageName().endswith("-udeb"):
            return "udeb"

        # everything else - assume regular deb
        return "deb"

    def getFileName(self):
        '''
        Returns the filename for this binary package.
        '''
        return "{name}_{version}_{arch}.{ext}".format(name=self.getPackageName(),
                                                      version=self.getFileVersion(),
                                                      arch=self.getFileArch(),
                                                      ext=self.getFileExt())
|
|
|
|
|
|
class MetaPersonTeam(MetaWrapper):
    @property
    def me(cls):
        '''The PersonTeam object of the currently authenticated LP user or
        None when anonymously logged in.
        '''
        if '_me' not in cls.__dict__:
            try:
                cls._me = PersonTeam(Launchpad.me)
            except HTTPError as e:
                # 401 means we are logged in anonymously; anything else
                # is a genuine error and must propagate.
                if e.response.status != 401:
                    raise
                cls._me = None
        return cls._me
|
|
|
|
|
|
class PersonTeam(BaseWrapper, metaclass=MetaPersonTeam):
    '''
    Wrapper class around a LP person or team object.
    '''

    resource_type = ('person', 'team')

    def __init__(self, *args):
        # Don't share _upload between different PersonTeams
        self._ppas = None
        if '_upload' not in self.__dict__:
            self._upload = dict()

    def __str__(self):
        return u'%s (%s)' % (self.display_name, self.name)

    def cache(self):
        # Index cached instances by their LP name.
        self._cache[self.name] = self

    @classmethod
    def fetch(cls, person_or_team):
        '''
        Fetch the person or team object identified by 'url' from LP.
        '''
        if not isinstance(person_or_team, str):
            raise TypeError("Don't know what do with '%r'" % person_or_team)
        entry = cls._cache.get(person_or_team)
        if not entry:
            entry = PersonTeam(Launchpad.people[person_or_team])
        return entry

    def isLpTeamMember(self, team):
        '''
        Checks if the user is a member of a certain team on Launchpad.

        Returns True if the user is a member of the team otherwise False.
        '''
        return any(t.name == team for t in self.super_teams)

    def canUploadPackage(self, archive, distroseries, package, component,
                         pocket='Release'):
        '''Check if the person or team has upload rights for the source
        package to the specified 'archive' and 'distrorelease'.

        A source package name and a component have to be specified.
        'archive' has to be a Archive object.
        'distroseries' has to be an DistroSeries object.
        '''
        # Validate arguments up front; results are cached per
        # (archive, series, pocket, package, component) tuple.
        if not isinstance(archive, Archive):
            raise TypeError("'%r' is not an Archive object." % archive)
        if not isinstance(distroseries, DistroSeries):
            raise TypeError("'%r' is not a DistroSeries object." % distroseries)
        if package is not None and not isinstance(package, str):
            raise TypeError('A source package name expected.')
        if component is not None and not isinstance(component, str):
            raise TypeError('A component name expected.')
        if package is None and component is None:
            raise ValueError('Either a source package name or a component '
                             'has to be specified.')
        if pocket not in POCKETS:
            raise PocketDoesNotExistError("Pocket '%s' does not exist." %
                                          pocket)

        cache_key = (archive, distroseries, pocket, package, component)
        allowed = self._upload.get(cache_key)

        if allowed is None:
            # checkUpload() throws an exception if the person can't upload
            try:
                archive.checkUpload(
                    component=component,
                    distroseries=distroseries(),
                    person=self(),
                    pocket=pocket,
                    sourcepackagename=package,
                )
                allowed = True
            except HTTPError as e:
                if e.response.status != 403:
                    raise e
                allowed = False
            self._upload[cache_key] = allowed

        return allowed

    def getPPAs(self):
        # Lazily build a name -> Archive map of this person's PPAs.
        if self._ppas is None:
            entries = Launchpad.load(
                self._lpobject.ppas_collection_link).entries
            archives = (Archive(entry['self_link']) for entry in entries)
            self._ppas = {archive.name: archive for archive in archives}
        return self._ppas

    def getPPAByName(self, name):
        return Archive(self._lpobject.getPPAByName(name=name))
|
|
|
|
|
|
class Build(BaseWrapper):
    '''
    Wrapper class around a build object.
    '''
    resource_type = 'build'

    def __str__(self):
        return u'%s: %s' % (self.arch_tag, self.buildstate)

    def getSourcePackagePublishingHistory(self):
        # The publication link may be missing, or 'redacted' for builds
        # that are too old.
        link = self._lpobject.current_source_publication_link
        if not link or re.search('redacted', link):
            return None
        return SourcePackagePublishingHistory(link)

    def getUrl(self):
        return self()

    def rescore(self, score):
        if not self.can_be_rescored:
            return False
        self().rescore(score=score)
        return True

    def retry(self):
        if not self.can_be_retried:
            return False
        self().retry()
        return True
|
|
|
|
|
|
class DistributionSourcePackage(BaseWrapper):
    '''
    Caching wrapper around LP distribution_source_package objects.
    '''
    resource_type = 'distribution_source_package'
|
|
|
|
|
|
class Packageset(BaseWrapper):
    '''
    Caching class for packageset objects.
    '''
    resource_type = 'packageset'
    # Class-level caches shared by all callers.
    _lp_packagesets = None
    _source_sets = {}

    @classmethod
    def setsIncludingSource(cls, sourcepackagename, distroseries=None,
                            direct_inclusion=False):
        '''Get the package sets including sourcepackagename'''
        if cls._lp_packagesets is None:
            cls._lp_packagesets = Launchpad.packagesets

        cache_key = (sourcepackagename, distroseries, direct_inclusion)
        if cache_key not in cls._source_sets:
            query = {
                'sourcepackagename': sourcepackagename,
                'direct_inclusion': direct_inclusion,
            }
            if distroseries is not None:
                query['distroseries'] = distroseries._lpobject

            found = cls._lp_packagesets.setsIncludingSource(**query)
            cls._source_sets[cache_key] = [Packageset(ps) for ps in found]

        return cls._source_sets[cache_key]
|