mirror of
				https://github.com/lubuntu-team/ppa-britney.git
				synced 2025-10-25 05:34:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			369 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			369 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #! /usr/bin/python
 | |
| # Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 | |
| # GNU Affero General Public License version 3.
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| from collections import defaultdict
 | |
| import logging
 | |
| import optparse
 | |
| import os
 | |
| import re
 | |
| import subprocess
 | |
| import sys
 | |
| import tempfile
 | |
| 
 | |
| import apt_pkg
 | |
| from launchpadlib.errors import HTTPError
 | |
| from launchpadlib.launchpad import Launchpad
 | |
| 
 | |
| 
 | |
| re_extract_src_version = re.compile(r"(\S+)\s*\((.*)\)")
 | |
| 
 | |
| 
 | |
| class ArchiveCruftCheckerError(Exception):
 | |
|     """ArchiveCruftChecker specific exception.
 | |
| 
 | |
|     Mostly used to describe errors in the initialization of this object.
 | |
|     """
 | |
| 
 | |
| 
 | |
| class TagFileNotFound(Exception):
 | |
|     """Raised when an archive tag file could not be found."""
 | |
| 
 | |
| 
 | |
| class ArchiveCruftChecker:
 | |
|     """Perform overall checks to identify and remove obsolete records.
 | |
| 
 | |
|     Use initialize() method to validate passed parameters and build the
 | |
|     infrastructure variables. It will raise ArchiveCruftCheckerError if
 | |
|     something goes wrong.
 | |
|     """
 | |
| 
 | |
|     # XXX cprov 2006-05-15: the default archive path should come
 | |
|     # from the config.
 | |
|     def __init__(self, launchpad_instance='production',
 | |
|                  distribution_name='ubuntu', suite=None,
 | |
|                  archive_path='/srv/launchpad.net/ubuntu-archive'):
 | |
|         """Store passed arguments.
 | |
| 
 | |
|         Also initialize empty variables for storing preliminary results.
 | |
|         """
 | |
|         self.launchpad = Launchpad.login_anonymously(
 | |
|             'archive-cruft-check', launchpad_instance)
 | |
|         self.distribution_name = distribution_name
 | |
|         self.suite = suite
 | |
|         self.archive_path = archive_path
 | |
|         # initialize a group of variables to store temporary results
 | |
|         # available versions of published sources
 | |
|         self.source_versions = {}
 | |
|         # available binaries produced by published sources
 | |
|         self.source_binaries = {}
 | |
|         # 'Not Build From Source' binaries
 | |
|         self.nbs = defaultdict(lambda: defaultdict(dict))
 | |
|         # published binary package names
 | |
|         self.bin_pkgs = defaultdict(list)
 | |
|         # Architecture specific binary packages
 | |
|         self.arch_any = defaultdict(lambda: "0")
 | |
|         # proposed NBS (before clean up)
 | |
|         self.dubious_nbs = defaultdict(lambda: defaultdict(set))
 | |
|         # NBS after clean up
 | |
|         self.real_nbs = defaultdict(lambda: defaultdict(set))
 | |
|         # definitive NBS organized for clean up
 | |
|         self.nbs_to_remove = []
 | |
| 
 | |
|     @property
 | |
|     def components_and_di(self):
 | |
|         components_and_di = []
 | |
|         for component in self.components:
 | |
|             components_and_di.append(component)
 | |
|             components_and_di.append('%s/debian-installer' % (component))
 | |
|         return components_and_di
 | |
| 
 | |
|     @property
 | |
|     def dist_archive(self):
 | |
|         return os.path.join(
 | |
|             self.archive_path, self.distro.name, 'dists', self.suite)
 | |
| 
 | |
|     def gunzipTagFileContent(self, filename):
 | |
|         """Gunzip the contents of passed filename.
 | |
| 
 | |
|         Check filename presence, if not present in the filesystem,
 | |
|         raises ArchiveCruftCheckerError. Use an tempfile.mkstemp()
 | |
|         to store the uncompressed content. Invoke system available
 | |
|         gunzip`, raises ArchiveCruftCheckError if it fails.
 | |
| 
 | |
|         This method doesn't close the file descriptor used and does not
 | |
|         remove the temporary file from the filesystem, those actions
 | |
|         are required in the callsite. (apt_pkg.TagFile is lazy)
 | |
| 
 | |
|         Return a tuple containing:
 | |
|          * temp file descriptor
 | |
|          * temp filename
 | |
|          * the contents parsed by apt_pkg.TagFile()
 | |
|         """
 | |
|         if not os.path.exists(filename):
 | |
|             raise TagFileNotFound("File does not exist: %s" % filename)
 | |
| 
 | |
|         temp_fd, temp_filename = tempfile.mkstemp()
 | |
|         subprocess.check_call(['gunzip', '-c', filename], stdout=temp_fd)
 | |
| 
 | |
|         os.lseek(temp_fd, 0, os.SEEK_SET)
 | |
|         temp_file = os.fdopen(temp_fd)
 | |
|         # XXX cprov 2006-05-15: maybe we need some sort of data integrity
 | |
|         # check at this point, and maybe keep the uncompressed file
 | |
|         # for debug purposes, let's see how it behaves in real conditions.
 | |
|         parsed_contents = apt_pkg.TagFile(temp_file)
 | |
| 
 | |
|         return temp_file, temp_filename, parsed_contents
 | |
| 
 | |
|     def processSources(self):
 | |
|         """Process archive sources index.
 | |
| 
 | |
|         Build source_binaries, source_versions and bin_pkgs lists.
 | |
|         """
 | |
|         logging.debug("Considering Sources:")
 | |
|         for component in self.components:
 | |
|             filename = os.path.join(
 | |
|                 self.dist_archive, "%s/source/Sources.gz" % component)
 | |
| 
 | |
|             logging.debug("Processing %s" % filename)
 | |
|             try:
 | |
|                 temp_fd, temp_filename, parsed_sources = (
 | |
|                     self.gunzipTagFileContent(filename))
 | |
|             except TagFileNotFound as warning:
 | |
|                 logging.warning(warning)
 | |
|                 return
 | |
|             try:
 | |
|                 for section in parsed_sources:
 | |
|                     source = section.find("Package")
 | |
|                     source_version = section.find("Version")
 | |
|                     binaries = section.find("Binary")
 | |
|                     for binary in [
 | |
|                             item.strip() for item in binaries.split(',')]:
 | |
|                         self.bin_pkgs[binary].append(source)
 | |
| 
 | |
|                     self.source_binaries[source] = binaries
 | |
|                     self.source_versions[source] = source_version
 | |
|             finally:
 | |
|                 # close fd and remove temporary file used to store
 | |
|                 # uncompressed tag file content from the filesystem.
 | |
|                 temp_fd.close()
 | |
|                 os.unlink(temp_filename)
 | |
| 
 | |
|     def buildNBS(self):
 | |
|         """Build the group of 'not build from source' binaries"""
 | |
|         # Checks based on the Packages files
 | |
|         logging.debug("Building not built from source list (NBS):")
 | |
|         for component in self.components_and_di:
 | |
|             for architecture in self.architectures:
 | |
|                 self.buildArchNBS(component, architecture)
 | |
| 
 | |
|     def buildArchNBS(self, component, architecture):
 | |
|         """Build NBS per architecture.
 | |
| 
 | |
|         Store results in self.nbs, also build architecture specific
 | |
|         binaries group (stored in self.arch_any)
 | |
|         """
 | |
|         filename = os.path.join(
 | |
|             self.dist_archive,
 | |
|             "%s/binary-%s/Packages.gz" % (component, architecture))
 | |
| 
 | |
|         logging.debug("Processing %s" % filename)
 | |
|         try:
 | |
|             temp_fd, temp_filename, parsed_packages = (
 | |
|                 self.gunzipTagFileContent(filename))
 | |
|         except TagFileNotFound as warning:
 | |
|             logging.warn(warning)
 | |
|             return
 | |
| 
 | |
|         try:
 | |
|             for section in parsed_packages:
 | |
|                 package = section.find('Package')
 | |
|                 source = section.find('Source', "")
 | |
|                 version = section.find('Version')
 | |
|                 architecture = section.find('Architecture')
 | |
| 
 | |
|                 if source == "":
 | |
|                     source = package
 | |
| 
 | |
|                 if source.find("(") != -1:
 | |
|                     m = re_extract_src_version.match(source)
 | |
|                     source = m.group(1)
 | |
|                     version = m.group(2)
 | |
| 
 | |
|                 if package not in self.bin_pkgs:
 | |
|                     self.nbs[source][package][version] = ""
 | |
| 
 | |
|                 if architecture != "all":
 | |
|                     if apt_pkg.version_compare(
 | |
|                             version, self.arch_any[package]) < 1:
 | |
|                         self.arch_any[package] = version
 | |
|         finally:
 | |
|             # close fd and remove temporary file used to store uncompressed
 | |
|             # tag file content from the filesystem.
 | |
|             temp_fd.close()
 | |
|             os.unlink(temp_filename)
 | |
| 
 | |
|     def addNBS(self, nbs_d, source, version, package):
 | |
|         """Add a new entry in given organized nbs_d list
 | |
| 
 | |
|         Ensure the package is still published in the suite before add.
 | |
|         """
 | |
|         result = self.archive.getPublishedBinaries(
 | |
|             binary_name=package, exact_match=True, status='Published')
 | |
|         result = [bpph for bpph in result
 | |
|                   if bpph.distro_arch_series_link in self.das_urls]
 | |
| 
 | |
|         if result:
 | |
|             nbs_d[source][version].add(package)
 | |
| 
 | |
|     def refineNBS(self):
 | |
|         """ Distinguish dubious from real NBS.
 | |
| 
 | |
|         They are 'dubious' if the version numbers match and 'real'
 | |
|         if the versions don't match.
 | |
|         It stores results in self.dubious_nbs and self.real_nbs.
 | |
|         """
 | |
|         for source in self.nbs:
 | |
|             for package in self.nbs[source]:
 | |
|                 versions = sorted(
 | |
|                     self.nbs[source][package], cmp=apt_pkg.version_compare)
 | |
|                 latest_version = versions.pop()
 | |
| 
 | |
|                 source_version = self.source_versions.get(source, "0")
 | |
| 
 | |
|                 if apt_pkg.version_compare(latest_version,
 | |
|                                            source_version) == 0:
 | |
|                     # We don't actually do anything with dubious_nbs for
 | |
|                     # now, so let's not waste time computing it.
 | |
|                     #self.addNBS(self.dubious_nbs, source, latest_version,
 | |
|                     #            package)
 | |
|                     pass
 | |
|                 else:
 | |
|                     self.addNBS(self.real_nbs, source, latest_version,
 | |
|                                 package)
 | |
| 
 | |
|     def outputNBS(self):
 | |
|         """Properly display built NBS entries.
 | |
| 
 | |
|         Also organize the 'real' NBSs for removal in self.nbs_to_remove
 | |
|         attribute.
 | |
|         """
 | |
|         output = "Not Built from Source\n"
 | |
|         output += "---------------------\n\n"
 | |
| 
 | |
|         nbs_keys = sorted(self.real_nbs)
 | |
| 
 | |
|         for source in nbs_keys:
 | |
|             proposed_bin = self.source_binaries.get(
 | |
|                 source, "(source does not exist)")
 | |
|             proposed_version = self.source_versions.get(source, "??")
 | |
|             output += (" * %s_%s builds: %s\n"
 | |
|                        % (source, proposed_version, proposed_bin))
 | |
|             output += "\tbut no longer builds:\n"
 | |
|             versions = sorted(
 | |
|                 self.real_nbs[source], cmp=apt_pkg.version_compare)
 | |
| 
 | |
|             for version in versions:
 | |
|                 packages = sorted(self.real_nbs[source][version])
 | |
| 
 | |
|                 for pkg in packages:
 | |
|                     self.nbs_to_remove.append(pkg)
 | |
| 
 | |
|                 output += "        o %s: %s\n" % (
 | |
|                     version, ", ".join(packages))
 | |
| 
 | |
|             output += "\n"
 | |
| 
 | |
|         if self.nbs_to_remove:
 | |
|             print(output)
 | |
|         else:
 | |
|             logging.debug("No NBS found")
 | |
| 
 | |
|     def run(self):
 | |
|         """Initialize and build required lists of obsolete entries in archive.
 | |
| 
 | |
|         Check integrity of passed parameters and store organised data.
 | |
|         The result list is the self.nbs_to_remove which should contain
 | |
|         obsolete packages not currently able to be built from again.
 | |
|         Another preliminary lists can be inspected in order to have better
 | |
|         idea of what was computed.
 | |
|         If anything goes wrong mid-process, it raises ArchiveCruftCheckError,
 | |
|         otherwise a list of packages to be removes is printed.
 | |
|         """
 | |
|         try:
 | |
|             self.distro = self.launchpad.distributions[
 | |
|                 self.distribution_name]
 | |
|         except KeyError:
 | |
|             raise ArchiveCruftCheckerError(
 | |
|                 "Invalid distribution: '%s'" % self.distribution_name)
 | |
| 
 | |
|         if not self.suite:
 | |
|             self.distroseries = self.distro.current_series
 | |
|             self.suite = self.distroseries.name
 | |
|         else:
 | |
|             try:
 | |
|                 self.distroseries = self.distro.getSeries(
 | |
|                     name_or_version=self.suite.split('-')[0])
 | |
|             except HTTPError:
 | |
|                 raise ArchiveCruftCheckerError(
 | |
|                     "Invalid suite: '%s'" % self.suite)
 | |
| 
 | |
|         if not os.path.exists(self.dist_archive):
 | |
|             raise ArchiveCruftCheckerError(
 | |
|                 "Invalid archive path: '%s'" % self.dist_archive)
 | |
| 
 | |
|         self.archive = self.distro.main_archive
 | |
|         self.distroarchseries = list(self.distroseries.architectures)
 | |
|         self.das_urls = [das.self_link for das in self.distroarchseries]
 | |
|         self.architectures = [a.architecture_tag
 | |
|                               for a in self.distroarchseries]
 | |
|         self.components = self.distroseries.component_names
 | |
| 
 | |
|         apt_pkg.init()
 | |
|         self.processSources()
 | |
|         self.buildNBS()
 | |
|         self.refineNBS()
 | |
|         self.outputNBS()
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     parser = optparse.OptionParser()
 | |
| 
 | |
|     parser.add_option(
 | |
|         "-l", "--launchpad", dest="launchpad_instance", default="production")
 | |
|     parser.add_option(
 | |
|         "-d", "--distro", dest="distro", default="ubuntu", help="check DISTRO")
 | |
|     parser.add_option(
 | |
|         "-s", "--suite", dest="suite", help="only act on SUITE")
 | |
|     parser.add_option(
 | |
|         "-n", "--no-action", dest="action", default=True, action="store_false",
 | |
|         help="unused compatibility option")
 | |
|     parser.add_option(
 | |
|         "-v", "--verbose", dest="verbose", default=False, action="store_true",
 | |
|         help="emit verbose debugging messages")
 | |
| 
 | |
|     options, args = parser.parse_args()
 | |
| 
 | |
|     if args:
 | |
|         archive_path = args[0]
 | |
|     else:
 | |
|         logging.error('Archive path is required')
 | |
|         return 1
 | |
| 
 | |
|     if options.verbose:
 | |
|         logging.basicConfig(level=logging.DEBUG)
 | |
| 
 | |
|     checker = ArchiveCruftChecker(
 | |
|         launchpad_instance=options.launchpad_instance,
 | |
|         distribution_name=options.distro, suite=options.suite,
 | |
|         archive_path=archive_path)
 | |
|     checker.run()
 | |
| 
 | |
|     return 0
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     sys.exit(main())
 |