ppa-britney/ubuntu-archive-tools/archive-cruft-check

369 lines
13 KiB

#! /usr/bin/python2.7
# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3.
from __future__ import print_function

from collections import defaultdict
import functools
import logging
import optparse
import os
import re
import subprocess
import sys
import tempfile

import apt_pkg
from launchpadlib.errors import HTTPError
from launchpadlib.launchpad import Launchpad
# Splits a "Source" field of the form "name (version)" into the source
# package name (group 1) and the text inside the parentheses (group 2).
re_extract_src_version = re.compile(r"(\S+)\s*\((.*)\)")
class ArchiveCruftCheckerError(Exception):
    """Error specific to ArchiveCruftChecker.

    Raised when the checker cannot be set up from the parameters it was
    given, e.g. an invalid distribution, suite or archive path.
    """
class TagFileNotFound(Exception):
    """Signal that an expected archive tag (index) file is missing."""
class ArchiveCruftChecker:
"""Perform overall checks to identify and remove obsolete records.
Use initialize() method to validate passed parameters and build the
infrastructure variables. It will raise ArchiveCruftCheckerError if
something goes wrong.
"""
# XXX cprov 2006-05-15: the default archive path should come
# from the config.
def __init__(self, launchpad_instance='production',
distribution_name='ubuntu', suite=None,
archive_path='/srv/launchpad.net/ubuntu-archive'):
"""Store passed arguments.
Also initialize empty variables for storing preliminary results.
"""
self.launchpad = Launchpad.login_anonymously(
'archive-cruft-check', launchpad_instance)
self.distribution_name = distribution_name
self.suite = suite
self.archive_path = archive_path
# initialize a group of variables to store temporary results
# available versions of published sources
self.source_versions = {}
# available binaries produced by published sources
self.source_binaries = {}
# 'Not Build From Source' binaries
self.nbs = defaultdict(lambda: defaultdict(dict))
# published binary package names
self.bin_pkgs = defaultdict(list)
# Architecture specific binary packages
self.arch_any = defaultdict(lambda: "0")
# proposed NBS (before clean up)
self.dubious_nbs = defaultdict(lambda: defaultdict(set))
# NBS after clean up
self.real_nbs = defaultdict(lambda: defaultdict(set))
# definitive NBS organized for clean up
self.nbs_to_remove = []
@property
def components_and_di(self):
components_and_di = []
for component in self.components:
components_and_di.append(component)
components_and_di.append('%s/debian-installer' % (component))
return components_and_di
@property
def dist_archive(self):
return os.path.join(
self.archive_path, self.distro.name, 'dists', self.suite)
def gunzipTagFileContent(self, filename):
"""Gunzip the contents of passed filename.
Check filename presence, if not present in the filesystem,
raises ArchiveCruftCheckerError. Use an tempfile.mkstemp()
to store the uncompressed content. Invoke system available
gunzip`, raises ArchiveCruftCheckError if it fails.
This method doesn't close the file descriptor used and does not
remove the temporary file from the filesystem, those actions
are required in the callsite. (apt_pkg.TagFile is lazy)
Return a tuple containing:
* temp file descriptor
* temp filename
* the contents parsed by apt_pkg.TagFile()
"""
if not os.path.exists(filename):
raise TagFileNotFound("File does not exist: %s" % filename)
temp_fd, temp_filename = tempfile.mkstemp()
subprocess.check_call(['gunzip', '-c', filename], stdout=temp_fd)
os.lseek(temp_fd, 0, os.SEEK_SET)
temp_file = os.fdopen(temp_fd)
# XXX cprov 2006-05-15: maybe we need some sort of data integrity
# check at this point, and maybe keep the uncompressed file
# for debug purposes, let's see how it behaves in real conditions.
parsed_contents = apt_pkg.TagFile(temp_file)
return temp_file, temp_filename, parsed_contents
def processSources(self):
"""Process archive sources index.
Build source_binaries, source_versions and bin_pkgs lists.
"""
logging.debug("Considering Sources:")
for component in self.components:
filename = os.path.join(
self.dist_archive, "%s/source/Sources.gz" % component)
logging.debug("Processing %s" % filename)
try:
temp_fd, temp_filename, parsed_sources = (
self.gunzipTagFileContent(filename))
except TagFileNotFound as warning:
logging.warning(warning)
return
try:
for section in parsed_sources:
source = section.find("Package")
source_version = section.find("Version")
binaries = section.find("Binary")
for binary in [
item.strip() for item in binaries.split(',')]:
self.bin_pkgs[binary].append(source)
self.source_binaries[source] = binaries
self.source_versions[source] = source_version
finally:
# close fd and remove temporary file used to store
# uncompressed tag file content from the filesystem.
temp_fd.close()
os.unlink(temp_filename)
def buildNBS(self):
"""Build the group of 'not build from source' binaries"""
# Checks based on the Packages files
logging.debug("Building not built from source list (NBS):")
for component in self.components_and_di:
for architecture in self.architectures:
self.buildArchNBS(component, architecture)
def buildArchNBS(self, component, architecture):
"""Build NBS per architecture.
Store results in self.nbs, also build architecture specific
binaries group (stored in self.arch_any)
"""
filename = os.path.join(
self.dist_archive,
"%s/binary-%s/Packages.gz" % (component, architecture))
logging.debug("Processing %s" % filename)
try:
temp_fd, temp_filename, parsed_packages = (
self.gunzipTagFileContent(filename))
except TagFileNotFound as warning:
logging.warn(warning)
return
try:
for section in parsed_packages:
package = section.find('Package')
source = section.find('Source', "")
version = section.find('Version')
architecture = section.find('Architecture')
if source == "":
source = package
if source.find("(") != -1:
m = re_extract_src_version.match(source)
source = m.group(1)
version = m.group(2)
if package not in self.bin_pkgs:
self.nbs[source][package][version] = ""
if architecture != "all":
if apt_pkg.version_compare(
version, self.arch_any[package]) < 1:
self.arch_any[package] = version
finally:
# close fd and remove temporary file used to store uncompressed
# tag file content from the filesystem.
temp_fd.close()
os.unlink(temp_filename)
def addNBS(self, nbs_d, source, version, package):
"""Add a new entry in given organized nbs_d list
Ensure the package is still published in the suite before add.
"""
result = self.archive.getPublishedBinaries(
binary_name=package, exact_match=True, status='Published')
result = [bpph for bpph in result
if bpph.distro_arch_series_link in self.das_urls]
if result:
nbs_d[source][version].add(package)
def refineNBS(self):
""" Distinguish dubious from real NBS.
They are 'dubious' if the version numbers match and 'real'
if the versions don't match.
It stores results in self.dubious_nbs and self.real_nbs.
"""
for source in self.nbs:
for package in self.nbs[source]:
versions = sorted(
self.nbs[source][package], cmp=apt_pkg.version_compare)
latest_version = versions.pop()
source_version = self.source_versions.get(source, "0")
if apt_pkg.version_compare(latest_version,
source_version) == 0:
# We don't actually do anything with dubious_nbs for
# now, so let's not waste time computing it.
#self.addNBS(self.dubious_nbs, source, latest_version,
# package)
pass
else:
self.addNBS(self.real_nbs, source, latest_version,
package)
def outputNBS(self):
"""Properly display built NBS entries.
Also organize the 'real' NBSs for removal in self.nbs_to_remove
attribute.
"""
output = "Not Built from Source\n"
output += "---------------------\n\n"
nbs_keys = sorted(self.real_nbs)
for source in nbs_keys:
proposed_bin = self.source_binaries.get(
source, "(source does not exist)")
proposed_version = self.source_versions.get(source, "??")
output += (" * %s_%s builds: %s\n"
% (source, proposed_version, proposed_bin))
output += "\tbut no longer builds:\n"
versions = sorted(
self.real_nbs[source], cmp=apt_pkg.version_compare)
for version in versions:
packages = sorted(self.real_nbs[source][version])
for pkg in packages:
self.nbs_to_remove.append(pkg)
output += " o %s: %s\n" % (
version, ", ".join(packages))
output += "\n"
if self.nbs_to_remove:
print(output)
else:
logging.debug("No NBS found")
def run(self):
"""Initialize and build required lists of obsolete entries in archive.
Check integrity of passed parameters and store organised data.
The result list is the self.nbs_to_remove which should contain
obsolete packages not currently able to be built from again.
Another preliminary lists can be inspected in order to have better
idea of what was computed.
If anything goes wrong mid-process, it raises ArchiveCruftCheckError,
otherwise a list of packages to be removes is printed.
"""
try:
self.distro = self.launchpad.distributions[
self.distribution_name]
except KeyError:
raise ArchiveCruftCheckerError(
"Invalid distribution: '%s'" % self.distribution_name)
if not self.suite:
self.distroseries = self.distro.current_series
self.suite = self.distroseries.name
else:
try:
self.distroseries = self.distro.getSeries(
name_or_version=self.suite.split('-')[0])
except HTTPError:
raise ArchiveCruftCheckerError(
"Invalid suite: '%s'" % self.suite)
if not os.path.exists(self.dist_archive):
raise ArchiveCruftCheckerError(
"Invalid archive path: '%s'" % self.dist_archive)
self.archive = self.distro.main_archive
self.distroarchseries = list(self.distroseries.architectures)
self.das_urls = [das.self_link for das in self.distroarchseries]
self.architectures = [a.architecture_tag
for a in self.distroarchseries]
self.components = self.distroseries.component_names
apt_pkg.init()
self.processSources()
self.buildNBS()
self.refineNBS()
self.outputNBS()
def main():
    """Parse the command line and run the archive cruft check.

    The first positional argument is the archive path.  Returns the
    process exit status: 0 on success, 1 when the archive path is missing
    or the checker raises ArchiveCruftCheckerError.
    """
    parser = optparse.OptionParser(usage="%prog [options] ARCHIVE_PATH")
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "-d", "--distro", dest="distro", default="ubuntu", help="check DISTRO")
    parser.add_option(
        "-s", "--suite", dest="suite", help="only act on SUITE")
    parser.add_option(
        "-n", "--no-action", dest="action", default=True, action="store_false",
        help="unused compatibility option")
    parser.add_option(
        "-v", "--verbose", dest="verbose", default=False, action="store_true",
        help="emit verbose debugging messages")

    options, args = parser.parse_args()
    if not args:
        logging.error('Archive path is required')
        return 1
    archive_path = args[0]

    if options.verbose:
        logging.basicConfig(level=logging.DEBUG)

    checker = ArchiveCruftChecker(
        launchpad_instance=options.launchpad_instance,
        distribution_name=options.distro, suite=options.suite,
        archive_path=archive_path)
    try:
        checker.run()
    except ArchiveCruftCheckerError as error:
        # Invalid distribution/suite/archive path: report it as a plain
        # error message instead of a traceback; exit status stays 1.
        logging.error(error)
        return 1

    return 0
# Run main() only when executed as a script, using its return value as the
# process exit status.
if __name__ == '__main__':
    sys.exit(main())