You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

658 lines
18 KiB

#!/usr/bin/env python2.7
#
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
import os
import yaml
class KernelRoutingEntry:
def __init__(self, ks, source, data):
name = "{}:{}".format(source.series.codename, source.name)
if isinstance(data, str):
name = data
table = source.series.routing_table
if table is None:
raise ValueError("unable to map routing alias {}, "
"no series routing table".format(data))
if data not in table:
raise ValueError("unable to map routing alias {}, "
"not listed in series routing table".format(data))
data = table[data]
# Clear out any entries that have been overriden to None.
for entry in dict(data):
if data[entry] is None:
del data[entry]
self._ks = ks
self._source = source
self._name = name
self._data = data if data else {}
@property
def source(self):
return self._source
@property
def name(self):
return self._name
def __eq__(self, other):
if isinstance(self, other.__class__):
return list(self) == list(other)
return False
def __ne__(self, other):
return not self.__eq__(other)
def __iter__(self):
return iter(self._data.items())
def __getitem__(self, which):
return self._data[which]
def lookup_destination(self, dest, primary=False):
data = self._data.get(dest, None)
if primary is False or data is None:
return data
return data[0]
def __str__(self):
return str(self._data)
class KernelRepoEntry:
def __init__(self, ks, owner, data):
if isinstance(data, list):
new_data = {'url': data[0]}
if len(data) == 1:
new_data['branch'] = 'master'
elif len(data) == 2:
new_data['branch'] = data[1]
data = new_data
self._ks = ks
self._owner = owner
self._data = data if data else {}
@property
def owner(self):
return self._owner
# XXX: should this object have a name ?
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.url == other.url and self.branch == other.branch
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def url(self):
return self._data['url']
@property
def branch(self):
return self._data.get('branch', None)
def __str__(self):
return "{} {}".format(self.url, self.branch)
class KernelSnapEntry:
def __init__(self, ks, source, name, data):
self._ks = ks
self._source = source
self._name = name
self._data = data if data else {}
# Convert arches/track to publish-to form.
if 'publish-to' not in self._data:
if 'arches' in self._data:
publish_to = {}
for arch in self._data['arches']:
publish_to[arch] = [self._data.get('track', 'latest')]
self._data['publish-to'] = publish_to
# Convert stable to promote-to form.
if 'promote-to' not in self._data and 'stable' in self._data:
if self._data['stable'] is True:
self._data['promote-to'] = 'stable'
else:
self._data['promote-to'] = 'candidate'
# Assume no promote-to data to mean just to edge.
promote_to = self._data.get('promote-to', 'edge')
if isinstance(promote_to, str):
expand_promote_to = []
for risk in ('edge', 'beta', 'candidate', 'stable'):
expand_promote_to.append(risk)
if risk == promote_to:
break
self._data['promote-to'] = expand_promote_to
# Ensure we have stable when promote-to is present.
if 'promote-to' in self._data and 'stable' not in self._data:
if 'stable' in self._data['promote-to']:
self._data['stable'] = True
else:
self._data['stable'] = False
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.name == other.name and self.source == other.source
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def series(self):
return self._source.series
@property
def source(self):
return self._source
@property
def name(self):
return self._name
@property
def repo(self):
data = self._data.get('repo', None)
if not data:
return None
return KernelRepoEntry(self._ks, self, data)
@property
def primary(self):
return self._data.get('primary', False)
@property
def gated(self):
return self._data.get('gated', False)
@property
def stable(self):
return self._data.get('stable', False)
@property
def qa(self):
return self._data.get('qa', False)
@property
def hw_cert(self):
return self._data.get('hw-cert', False)
@property
def arches(self):
# XXX: should this be []
return self._data.get('arches', None)
@property
def track(self):
return self._data.get('track', None)
@property
def publish_to(self):
return self._data.get('publish-to', None)
@property
def promote_to(self):
return self._data.get('promote-to', None)
def promote_to_risk(self, risk):
return risk in self._data.get('promote-to', [])
def __str__(self):
return "{} {}".format(str(self.source), self.name)
class KernelPackageEntry:
def __init__(self, ks, source, name, data):
self._ks = ks
self._source = source
self._name = name
self._data = data if data else {}
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.name == other.name and self.source == other.source
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def series(self):
return self._source.series
@property
def source(self):
return self._source
@property
def name(self):
return self._name
@property
def type(self):
return self._data.get('type', None)
@property
def repo(self):
data = self._data.get('repo', None)
if not data:
return None
return KernelRepoEntry(self._ks, self, data)
def __str__(self):
return "{} {} {}".format(str(self.source), self.name, self.type)
class KernelSourceEntry:
def __init__(self, ks, series, name, data):
self._ks = ks
self._series = series
self._name = name
self._data = data if data else {}
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.name == other.name and self.series == other.series
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def name(self):
return self._name
@property
def series(self):
return self._series
@property
def versions(self):
if 'versions' in self._data:
return self._data['versions']
derived_from = self.derived_from
if derived_from is not None:
return derived_from.versions
copy_forward = self.copy_forward
if copy_forward is not None:
return copy_forward.versions
# XXX: should this be []
return None
@property
def version(self):
versions = self.versions
if not versions:
return None
return versions[-1]
@property
def development(self):
return self._data.get('development', self.series.development)
@property
def supported(self):
return self._data.get('supported', self.series.supported)
@property
def severe_only(self):
return self._data.get('severe-only', False)
@property
def stakeholder(self):
return self._data.get('stakeholder', None)
@property
def packages(self):
# XXX: should this return None when empty
result = []
packages = self._data.get('packages')
if packages:
for package_key, package in packages.items():
result.append(KernelPackageEntry(self._ks, self, package_key, package))
return result
def lookup_package(self, package_key):
packages = self._data.get('packages')
if not packages or package_key not in packages:
return None
return KernelPackageEntry(self._ks, self, package_key, packages[package_key])
@property
def snaps(self):
# XXX: should this return None when empty
result = []
snaps = self._data.get('snaps')
if snaps:
for snap_key, snap in snaps.items():
result.append(KernelSnapEntry(self._ks, self, snap_key, snap))
return result
def lookup_snap(self, snap_key):
snaps = self._data.get('snaps')
if not snaps or snap_key not in snaps:
return None
return KernelSnapEntry(self._ks, self, snap_key, snaps[snap_key])
@property
def derived_from(self):
if 'derived-from' not in self._data:
return None
(series_key, source_key) = self._data['derived-from']
series = self._ks.lookup_series(series_key)
source = series.lookup_source(source_key)
return source
@property
def testable_flavours(self):
retval = []
if (self._data.get('testing') is not None and
self._data['testing'].get('flavours') is not None
):
for flavour in self._data['testing']['flavours'].keys():
fdata = self._data['testing']['flavours'][flavour]
# If we have neither arches nor clouds we represent a noop
if not fdata:
continue
arches = fdata.get('arches', None)
arches = arches if arches is not None else []
clouds = fdata.get('clouds', None)
clouds = clouds if clouds is not None else []
retval.append(KernelSourceTestingFlavourEntry(flavour, arches, clouds))
return retval
@property
def invalid_tasks(self):
retval = self._data.get('invalid-tasks', [])
if retval is None:
retval = []
return retval
@property
def copy_forward(self):
if 'copy-forward' not in self._data:
return None
# XXX: backwards compatibility.
if self._data['copy-forward'] is False:
return None
if self._data['copy-forward'] is True:
derived_from = self.derived_from
if derived_from is None:
return True
return self.derived_from
(series_key, source_key) = self._data['copy-forward']
series = self._ks.lookup_series(series_key)
source = series.lookup_source(source_key)
return source
@property
def backport(self):
return self._data.get('backport', False)
@property
def routing(self):
default = 'default'
if self.series.development:
default = 'devel'
if self.series.esm:
default = 'esm'
data = self._data.get('routing', default)
if data is None:
return data
return KernelRoutingEntry(self._ks, self, data)
@property
def swm_data(self):
return self._data.get('swm')
@property
def private(self):
return self._data.get('private', False)
def __str__(self):
return "{} {}".format(self.series.name, self.name)
class KernelSourceTestingFlavourEntry:
def __init__(self, name, arches, clouds):
self._name = name
self._arches = arches
self._clouds = clouds
@property
def name(self):
return self._name
@property
def arches(self):
return self._arches
@property
def clouds(self):
return self._clouds
class KernelSeriesEntry:
def __init__(self, ks, name, data, defaults=None):
self._ks = ks
self._name = name
self._data = {}
if defaults is not None:
self._data.update(defaults)
if data is not None:
self._data.update(data)
def __eq__(self, other):
if isinstance(self, other.__class__):
return self.name == other.name
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def name(self):
return self._name
@property
def codename(self):
return self._data.get('codename', None)
@property
def opening(self):
if 'opening' in self._data:
if self._data['opening'] is not False:
return True
return False
def opening_ready(self, *flags):
if 'opening' not in self._data:
return True
allow = self._data['opening']
if allow is None:
return False
if allow in (True, False):
return not allow
for flag in flags:
flag_allow = allow.get(flag, False)
if flag_allow is None or flag_allow is False:
return False
return True
opening_allow = opening_ready
@property
def development(self):
return self._data.get('development', False)
@property
def supported(self):
return self._data.get('supported', False)
@property
def lts(self):
return self._data.get('lts', False)
@property
def esm(self):
return self._data.get('esm', False)
def __str__(self):
return "{} ({})".format(self.name, self.codename)
@property
def sources(self):
result = []
sources = self._data.get('sources')
if sources:
for source_key, source in sources.items():
result.append(KernelSourceEntry(
self._ks, self, source_key, source))
return result
@property
def routing_table(self):
return self._data.get('routing-table', None)
def lookup_source(self, source_key):
sources = self._data.get('sources')
if not sources or source_key not in sources:
return None
return KernelSourceEntry(self._ks, self, source_key, sources[source_key])
# KernelSeries
#
class KernelSeries:
_url = 'https://git.launchpad.net/~canonical-kernel/' \
'+git/kteam-tools/plain/info/kernel-series.yaml'
_url_local = 'file://' + os.path.realpath(os.path.join(os.path.dirname(__file__),
'..', 'info', 'kernel-series.yaml'))
#_url = 'file:///home/apw/git2/kteam-tools/info/kernel-series.yaml'
#_url = 'file:///home/work/kteam-tools/info/kernel-series.yaml'
_data_txt = {}
@classmethod
def __load_once(cls, url):
if url not in cls._data_txt:
response = urlopen(url)
data = response.read()
if not isinstance(data, str):
data = data.decode('utf-8')
cls._data_txt[url] = data
return cls._data_txt[url]
def __init__(self, url=None, data=None, use_local=os.getenv("USE_LOCAL_KERNEL_SERIES_YAML", False)):
if data or url:
if url:
response = urlopen(url)
data = response.read()
if not isinstance(data, str):
data = data.decode('utf-8')
else:
data = self.__load_once(self._url_local if use_local else self._url)
self._data = yaml.safe_load(data)
self._development_series = None
self._codename_to_series = {}
for series_key, series in self._data.items():
if not series:
continue
if series.get('development', False):
self._development_series = series_key
if 'codename' in series:
self._codename_to_series[series['codename']] = series_key
# Pull out the defaults.
self._defaults_series = {}
if 'defaults' in self._data:
self._defaults_series = self._data['defaults']
del self._data['defaults']
@staticmethod
def key_series_name(series):
return [int(x) for x in series.name.split('.')]
@property
def series(self):
return [KernelSeriesEntry(self, series_key, series,
defaults=self._defaults_series)
for series_key, series in self._data.items()]
def lookup_series(self, series=None, codename=None, development=False):
if not series and not codename and not development:
raise ValueError("series/codename/development required")
if not series and codename:
if codename not in self._codename_to_series:
return None
series = self._codename_to_series[codename]
if not series and development:
if not self._development_series:
return None
series = self._development_series
if series and series not in self._data:
return None
return KernelSeriesEntry(self, series, self._data[series],
defaults=self._defaults_series)
if __name__ == '__main__':
db = KernelSeries()
series = db.lookup_series('16.04')
if series.name != '16.04':
print('series.name != 16.04')
if series.codename != 'xenial':
print('series.codename != xenial')
series2 = db.lookup_series(codename='xenial')
if series2.name != '16.04':
print('series2.name != 16.04')
if series2.codename != 'xenial':
print('series2.codename != xenial')
series3 = db.lookup_series(development=True)
if series3.name != '18.04':
print('series3.name != 18.04')
if series3.codename != 'bionic':
print('series3.codename != bionic')
print(str(series), str(series2), str(series3))
for series2 in sorted(db.series, key=db.key_series_name):
print(series2)
for source in series.sources:
print(str(source), source.series.name, source.name)
print(source.derived_from)
print(source.versions)
for package in source.packages:
print("PACKAGE", str(package))
for snap in source.snaps:
print("SNAP", str(snap), snap.arches)
# vi:set ts=4 sw=4 expandtab: