#! /usr/bin/python # Copyright 2009-2012 Canonical Ltd. This software is licensed under the # GNU Affero General Public License version 3. from __future__ import print_function from collections import defaultdict import logging import optparse import os import re import subprocess import sys import tempfile import apt_pkg from launchpadlib.errors import HTTPError from launchpadlib.launchpad import Launchpad re_extract_src_version = re.compile(r"(\S+)\s*\((.*)\)") class ArchiveCruftCheckerError(Exception): """ArchiveCruftChecker specific exception. Mostly used to describe errors in the initialization of this object. """ class TagFileNotFound(Exception): """Raised when an archive tag file could not be found.""" class ArchiveCruftChecker: """Perform overall checks to identify and remove obsolete records. Use initialize() method to validate passed parameters and build the infrastructure variables. It will raise ArchiveCruftCheckerError if something goes wrong. """ # XXX cprov 2006-05-15: the default archive path should come # from the config. def __init__(self, launchpad_instance='production', distribution_name='ubuntu', suite=None, archive_path='/srv/launchpad.net/ubuntu-archive'): """Store passed arguments. Also initialize empty variables for storing preliminary results. """ self.launchpad = Launchpad.login_anonymously( 'archive-cruft-check', launchpad_instance) self.distribution_name = distribution_name self.suite = suite self.archive_path = archive_path # initialize a group of variables to store temporary results # available versions of published sources self.source_versions = {} # available binaries produced by published sources self.source_binaries = {} # 'Not Build From Source' binaries self.nbs = defaultdict(lambda: defaultdict(dict)) # published binary package names self.bin_pkgs = defaultdict(list) # Architecture specific binary packages self.arch_any = defaultdict(lambda: "0") # proposed NBS (before clean up) self.dubious_nbs = defaultdict(lambda: defaultdict(set)) # NBS after clean up self.real_nbs = defaultdict(lambda: defaultdict(set)) # definitive NBS organized for clean up self.nbs_to_remove = [] @property def components_and_di(self): components_and_di = [] for component in self.components: components_and_di.append(component) components_and_di.append('%s/debian-installer' % (component)) return components_and_di @property def dist_archive(self): return os.path.join( self.archive_path, self.distro.name, 'dists', self.suite) def gunzipTagFileContent(self, filename): """Gunzip the contents of passed filename. Check filename presence, if not present in the filesystem, raises ArchiveCruftCheckerError. Use an tempfile.mkstemp() to store the uncompressed content. Invoke system available gunzip`, raises ArchiveCruftCheckError if it fails. This method doesn't close the file descriptor used and does not remove the temporary file from the filesystem, those actions are required in the callsite. (apt_pkg.TagFile is lazy) Return a tuple containing: * temp file descriptor * temp filename * the contents parsed by apt_pkg.TagFile() """ if not os.path.exists(filename): raise TagFileNotFound("File does not exist: %s" % filename) temp_fd, temp_filename = tempfile.mkstemp() subprocess.check_call(['gunzip', '-c', filename], stdout=temp_fd) os.lseek(temp_fd, 0, os.SEEK_SET) temp_file = os.fdopen(temp_fd) # XXX cprov 2006-05-15: maybe we need some sort of data integrity # check at this point, and maybe keep the uncompressed file # for debug purposes, let's see how it behaves in real conditions. parsed_contents = apt_pkg.TagFile(temp_file) return temp_file, temp_filename, parsed_contents def processSources(self): """Process archive sources index. Build source_binaries, source_versions and bin_pkgs lists. """ logging.debug("Considering Sources:") for component in self.components: filename = os.path.join( self.dist_archive, "%s/source/Sources.gz" % component) logging.debug("Processing %s" % filename) try: temp_fd, temp_filename, parsed_sources = ( self.gunzipTagFileContent(filename)) except TagFileNotFound as warning: logging.warning(warning) return try: for section in parsed_sources: source = section.find("Package") source_version = section.find("Version") binaries = section.find("Binary") for binary in [ item.strip() for item in binaries.split(',')]: self.bin_pkgs[binary].append(source) self.source_binaries[source] = binaries self.source_versions[source] = source_version finally: # close fd and remove temporary file used to store # uncompressed tag file content from the filesystem. temp_fd.close() os.unlink(temp_filename) def buildNBS(self): """Build the group of 'not build from source' binaries""" # Checks based on the Packages files logging.debug("Building not built from source list (NBS):") for component in self.components_and_di: for architecture in self.architectures: self.buildArchNBS(component, architecture) def buildArchNBS(self, component, architecture): """Build NBS per architecture. Store results in self.nbs, also build architecture specific binaries group (stored in self.arch_any) """ filename = os.path.join( self.dist_archive, "%s/binary-%s/Packages.gz" % (component, architecture)) logging.debug("Processing %s" % filename) try: temp_fd, temp_filename, parsed_packages = ( self.gunzipTagFileContent(filename)) except TagFileNotFound as warning: logging.warn(warning) return try: for section in parsed_packages: package = section.find('Package') source = section.find('Source', "") version = section.find('Version') architecture = section.find('Architecture') if source == "": source = package if source.find("(") != -1: m = re_extract_src_version.match(source) source = m.group(1) version = m.group(2) if package not in self.bin_pkgs: self.nbs[source][package][version] = "" if architecture != "all": if apt_pkg.version_compare( version, self.arch_any[package]) < 1: self.arch_any[package] = version finally: # close fd and remove temporary file used to store uncompressed # tag file content from the filesystem. temp_fd.close() os.unlink(temp_filename) def addNBS(self, nbs_d, source, version, package): """Add a new entry in given organized nbs_d list Ensure the package is still published in the suite before add. """ result = self.archive.getPublishedBinaries( binary_name=package, exact_match=True, status='Published') result = [bpph for bpph in result if bpph.distro_arch_series_link in self.das_urls] if result: nbs_d[source][version].add(package) def refineNBS(self): """ Distinguish dubious from real NBS. They are 'dubious' if the version numbers match and 'real' if the versions don't match. It stores results in self.dubious_nbs and self.real_nbs. """ for source in self.nbs: for package in self.nbs[source]: versions = sorted( self.nbs[source][package], cmp=apt_pkg.version_compare) latest_version = versions.pop() source_version = self.source_versions.get(source, "0") if apt_pkg.version_compare(latest_version, source_version) == 0: # We don't actually do anything with dubious_nbs for # now, so let's not waste time computing it. #self.addNBS(self.dubious_nbs, source, latest_version, # package) pass else: self.addNBS(self.real_nbs, source, latest_version, package) def outputNBS(self): """Properly display built NBS entries. Also organize the 'real' NBSs for removal in self.nbs_to_remove attribute. """ output = "Not Built from Source\n" output += "---------------------\n\n" nbs_keys = sorted(self.real_nbs) for source in nbs_keys: proposed_bin = self.source_binaries.get( source, "(source does not exist)") proposed_version = self.source_versions.get(source, "??") output += (" * %s_%s builds: %s\n" % (source, proposed_version, proposed_bin)) output += "\tbut no longer builds:\n" versions = sorted( self.real_nbs[source], cmp=apt_pkg.version_compare) for version in versions: packages = sorted(self.real_nbs[source][version]) for pkg in packages: self.nbs_to_remove.append(pkg) output += " o %s: %s\n" % ( version, ", ".join(packages)) output += "\n" if self.nbs_to_remove: print(output) else: logging.debug("No NBS found") def run(self): """Initialize and build required lists of obsolete entries in archive. Check integrity of passed parameters and store organised data. The result list is the self.nbs_to_remove which should contain obsolete packages not currently able to be built from again. Another preliminary lists can be inspected in order to have better idea of what was computed. If anything goes wrong mid-process, it raises ArchiveCruftCheckError, otherwise a list of packages to be removes is printed. """ try: self.distro = self.launchpad.distributions[ self.distribution_name] except KeyError: raise ArchiveCruftCheckerError( "Invalid distribution: '%s'" % self.distribution_name) if not self.suite: self.distroseries = self.distro.current_series self.suite = self.distroseries.name else: try: self.distroseries = self.distro.getSeries( name_or_version=self.suite.split('-')[0]) except HTTPError: raise ArchiveCruftCheckerError( "Invalid suite: '%s'" % self.suite) if not os.path.exists(self.dist_archive): raise ArchiveCruftCheckerError( "Invalid archive path: '%s'" % self.dist_archive) self.archive = self.distro.main_archive self.distroarchseries = list(self.distroseries.architectures) self.das_urls = [das.self_link for das in self.distroarchseries] self.architectures = [a.architecture_tag for a in self.distroarchseries] self.components = self.distroseries.component_names apt_pkg.init() self.processSources() self.buildNBS() self.refineNBS() self.outputNBS() def main(): parser = optparse.OptionParser() parser.add_option( "-l", "--launchpad", dest="launchpad_instance", default="production") parser.add_option( "-d", "--distro", dest="distro", default="ubuntu", help="check DISTRO") parser.add_option( "-s", "--suite", dest="suite", help="only act on SUITE") parser.add_option( "-n", "--no-action", dest="action", default=True, action="store_false", help="unused compatibility option") parser.add_option( "-v", "--verbose", dest="verbose", default=False, action="store_true", help="emit verbose debugging messages") options, args = parser.parse_args() if args: archive_path = args[0] else: logging.error('Archive path is required') return 1 if options.verbose: logging.basicConfig(level=logging.DEBUG) checker = ArchiveCruftChecker( launchpad_instance=options.launchpad_instance, distribution_name=options.distro, suite=options.suite, archive_path=archive_path) checker.run() return 0 if __name__ == '__main__': sys.exit(main())