mirror of
				https://github.com/lubuntu-team/ppa-britney.git
				synced 2025-10-26 06:04:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			290 lines
		
	
	
		
			9.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			290 lines
		
	
	
		
			9.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #! /usr/bin/env python
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| import atexit
 | |
| from collections import namedtuple
 | |
| import gzip
 | |
| import optparse
 | |
| import os
 | |
| import re
 | |
| import shutil
 | |
| import subprocess
 | |
| import tempfile
 | |
| try:
 | |
|     from urllib.parse import unquote
 | |
|     from urllib.request import urlretrieve
 | |
| except ImportError:
 | |
|     from urllib import unquote, urlretrieve
 | |
| 
 | |
| import apt_pkg
 | |
| from launchpadlib.launchpad import Launchpad
 | |
| 
 | |
| 
 | |
# from dak, more or less
# Matches a leading "<digits>:" epoch so it can be removed from a version.
re_no_epoch = re.compile(r"^\d+:")
# Matches the trailing "-<revision>" (everything from the last hyphen on) so
# it can be removed from a version.
re_strip_revision = re.compile(r"-[^-]+$")
# Matches a changelog header line of the form "<name> (<version>)" and
# captures the version between the parentheses.
re_changelog_versions = re.compile(r"^\w[-+0-9a-z.]+ \(([^\(\) \t]+)\)")

# Default value of the --mirrors option: local mirror roots joined with ":".
default_mirrors = ":".join([
    '/home/ubuntu-archive/mirror/ubuntu',
    '/srv/archive.ubuntu.com/ubuntu',
])
# Path of the shared scratch directory; set lazily by ensure_tempdir().
tempdir = None

# Cache of series objects keyed by the name passed to get_series().
series_by_name = {}
 | |
| 
 | |
| 
 | |
def ensure_tempdir():
    """Create the shared scratch directory the first time it is needed.

    The path is stored in the module-level ``tempdir`` and its recursive
    removal is registered with ``atexit``.  Calls made once ``tempdir`` is
    set do nothing.
    """
    global tempdir
    if tempdir:
        return
    tempdir = tempfile.mkdtemp(prefix='copy-report')
    atexit.register(shutil.rmtree, tempdir)
 | |
| 
 | |
| 
 | |
def decompress_open(tagfile):
    """Open a tag file for reading in text mode.

    ``tagfile`` may be a local path or an ``http:``, ``https:`` or ``ftp:``
    URL; URLs are downloaded to a local file first.  If the resulting name
    ends in ``.gz`` the content is decompressed into a new file inside the
    shared scratch directory and that file is opened instead.

    Returns an open file object; the caller is responsible for closing it.
    """
    if tagfile.startswith(('http:', 'https:', 'ftp:')):
        tagfile = urlretrieve(tagfile)[0]

    if not tagfile.endswith('.gz'):
        return open(tagfile, 'r')

    ensure_tempdir()
    # mkstemp creates the file atomically, unlike the race-prone mktemp.
    fd, decompressed = tempfile.mkstemp(dir=tempdir)
    with os.fdopen(fd, 'wb') as fout:
        fin = gzip.GzipFile(filename=tagfile)
        try:
            # Stream in chunks rather than holding the whole index in memory.
            shutil.copyfileobj(fin, fout)
        finally:
            fin.close()
    return open(decompressed, 'r')
 | |
| 
 | |
| 
 | |
# One source stanza as stored by tagfiletodict(): its "Version" and
# "Directory" fields plus the file names taken from its "Files" field.
Section = namedtuple("Section", ["version", "directory", "files"])
 | |
| 
 | |
| 
 | |
def tagfiletodict(tagfile):
    """Parse a Sources index into a dict of ``Section`` keyed by package.

    ``tagfile`` is anything ``decompress_open`` accepts.  For every stanza
    the third whitespace-separated column of each ``Files`` line (the file
    name) is collected.  If a package name occurs more than once, the last
    stanza wins.
    """
    suite = {}
    # Close the index once it has been fully parsed instead of leaking the
    # handle until garbage collection.
    with decompress_open(tagfile) as handle:
        for section in apt_pkg.TagFile(handle):
            files = [
                line.split()[2]
                for line in section["Files"].split('\n')
                if line.strip()]  # tolerate blank lines in the field
            suite[section["Package"]] = Section(
                version=section["Version"],
                directory=section["Directory"],
                files=files)
    return suite
 | |
| 
 | |
| 
 | |
def find_dsc(options, pkg, section):
    """Yield paths to the .dsc file of ``pkg`` as described by ``section``.

    First every mirror in ``options.mirrors`` is probed and each existing
    ``<mirror>/<directory>/<dsc>`` path is yielded.  If the consumer keeps
    iterating, the source files of the exactly matching publication are
    downloaded from ``options.archive`` into a fresh directory under the
    scratch directory and the downloaded .dsc path is yielded last.
    """
    dsc_filename = [
        name for name in section.files if name.endswith('.dsc')][0]

    local_paths = (
        '%s/%s/%s' % (mirror, section.directory, dsc_filename)
        for mirror in options.mirrors)
    for path in local_paths:
        if os.path.exists(path):
            yield path

    # Nothing usable locally (or the caller wants another copy): download.
    ensure_tempdir()
    published = options.archive.getPublishedSources(
        source_name=pkg, version=section.version, exact_match=True)
    spph = published[0]
    outdir = tempfile.mkdtemp(dir=tempdir)
    downloaded = []
    for url in spph.sourceFileUrls():
        target = os.path.join(outdir, unquote(os.path.basename(url)))
        urlretrieve(url, target)
        downloaded.append(target)
    yield [name for name in downloaded if name.endswith('.dsc')][0]
 | |
| 
 | |
| 
 | |
class BrokenSourcePackage(Exception):
    """Signals that a source package could not be unpacked for inspection."""
 | |
| 
 | |
| 
 | |
def get_changelog_versions(pkg, dsc, version):
    """Return the set of versions listed in a source package's changelog.

    The package described by ``dsc`` is unpacked with ``dpkg-source`` inside
    the scratch directory, ``debian/changelog`` is scanned for entry header
    lines, and the unpacked tree is removed again.

    ``version`` is used only to derive the name of the unpacked directory
    (``<pkg>-<version without epoch and revision>``).

    Raises BrokenSourcePackage if ``dpkg-source`` fails or the unpacked tree
    or its changelog is missing.
    """
    ensure_tempdir()

    upstream_version = re_strip_revision.sub(
        '', re_no_epoch.sub('', version))
    unpacked = '%s/%s-%s' % (tempdir, pkg, upstream_version)
    changelog_path = '%s/debian/changelog' % unpacked

    try:
        with open(os.devnull, 'w') as devnull:
            ret = subprocess.call(
                ['dpkg-source', '-q', '--no-check', '-sn', '-x', dsc],
                stdout=devnull, cwd=tempdir)
        if ret != 0:
            raise BrokenSourcePackage(dsc)

        # Explicit checks instead of assert: assertions vanish under -O and
        # an AssertionError would not be handled by the caller's fallback.
        if not os.path.isdir(unpacked) or not os.path.exists(changelog_path):
            raise BrokenSourcePackage(dsc)

        versions = set()
        with open(changelog_path) as changelog:
            for line in changelog:
                m = re_changelog_versions.match(line)
                if m:
                    versions.add(m.group(1))
        return versions
    finally:
        # Always clean up, including after a failed or partial unpack.
        if os.path.isdir(unpacked):
            shutil.rmtree(unpacked)
 | |
| 
 | |
| 
 | |
def descended_from(options, pkg, section1, section2):
    """Return whether ``section1`` is a descendant of ``section2``.

    ``section1`` must be strictly newer than ``section2``; otherwise False
    is returned without unpacking anything.  Each .dsc offered by
    ``find_dsc`` is tried in turn until one can be unpacked, and the result
    is whether ``section2.version`` is listed in that changelog.

    Raises BrokenSourcePackage if no candidate .dsc could be unpacked.
    """
    if apt_pkg.version_compare(section1.version, section2.version) <= 0:
        return False

    last_error = None
    for dsc in find_dsc(options, pkg, section1):
        try:
            versions = get_changelog_versions(pkg, dsc, section1.version)
        except BrokenSourcePackage as error:
            # Python 3 unbinds the "as" name when the clause ends, so keep
            # the exception under a separate name for the final raise.
            last_error = error
            continue
        # NOTE(review): the original tested section1.version, which is
        # normally the changelog's own top entry and therefore always
        # present.  Descent means the older version appears in the newer
        # package's history.
        return section2.version in versions

    if last_error is None:
        # find_dsc produced nothing at all; never "raise None".
        last_error = BrokenSourcePackage(pkg)
    raise last_error
 | |
| 
 | |
| 
 | |
# A proposed copy of `package` at `version1` from `suite1` to `suite2`.
# `version2` is the version shown for suite2 in the report, or None.
Candidate = namedtuple(
    "Candidate", ["package", "suite1", "suite2", "version1", "version2"])
 | |
| 
 | |
| 
 | |
def get_series(options, name):
    """Return the series object for ``name``, looking it up at most once.

    Results of ``options.distro.getSeries`` are cached in the module-level
    ``series_by_name`` dict.
    """
    if name in series_by_name:
        return series_by_name[name]
    series_by_name[name] = options.distro.getSeries(name_or_version=name)
    return series_by_name[name]
 | |
| 
 | |
| 
 | |
def already_copied(options, candidate):
    """Return True if the candidate is already in its destination suite.

    ``candidate.suite2`` is split at the first hyphen into a series name and
    a pocket (title-cased); without a hyphen the pocket is "Release".  The
    candidate counts as copied when ``options.archive`` has an exactly
    matching publication of ``version1`` there whose status is "Pending" or
    "Published".
    """
    series_name, hyphen, pocket_name = candidate.suite2.partition("-")
    pocket = pocket_name.title() if hyphen else "Release"
    series = get_series(options, series_name)
    pubs = options.archive.getPublishedSources(
        source_name=candidate.package, version=candidate.version1,
        exact_match=True, distro_series=series, pocket=pocket)
    return any(pub.status in ("Pending", "Published") for pub in pubs)
 | |
| 
 | |
| 
 | |
def copy(options, candidate):
    """Ask ``options.archive`` to copy the candidate into its target suite.

    ``candidate.suite2`` is split at the first hyphen into the destination
    series name and pocket (title-cased); without a hyphen the pocket is
    "Release".  The copy is made within the same archive, includes binaries
    and is auto-approved.
    """
    to_series, hyphen, pocket_name = candidate.suite2.partition("-")
    to_pocket = pocket_name.title() if hyphen else "Release"
    archive = options.archive
    archive.copyPackage(
        source_name=candidate.package,
        version=candidate.version1,
        from_archive=archive,
        to_pocket=to_pocket,
        to_series=to_series,
        include_binaries=True,
        auto_approve=True)
 | |
| 
 | |
| 
 | |
def candidate_string(candidate):
    """Format a candidate as a ``copy-package`` command line.

    When ``candidate.version2`` is not None, a trailing comment naming the
    destination suite and that version is appended.
    """
    command = 'copy-package -y -b -s %s --to-suite %s -e %s %s' % (
        candidate.suite1, candidate.suite2, candidate.version1,
        candidate.package)
    if candidate.version2 is None:
        return command
    return command + '  # %s: %s' % (candidate.suite2, candidate.version2)
 | |
| 
 | |
| 
 | |
def main():
    """Report packages in <suite>-security that could go to <suite>-updates.

    Parses the command line, logs in to Launchpad, compares the Sources
    indexes of the two pockets for every suite and component, and prints up
    to three lists: packages that can be copied safely, packages that need
    a manual descent check, and packages that need a manual merge.  With
    --copy-safe the first list is also copied.
    """
    apt_pkg.init_system()

    parser = optparse.OptionParser(usage="usage: %prog [options] [suites]")
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "--quick", action="store_true", help="don't examine changelogs")
    parser.add_option(
        "--copy-safe", action="store_true",
        help="automatically copy safe candidates")
    parser.add_option(
        "--mirrors", default=default_mirrors,
        help="colon-separated list of local mirrors")
    options, args = parser.parse_args()

    # The options object doubles as a context carrying the Launchpad
    # handles that the helper functions read.
    options.launchpad = Launchpad.login_with(
        "copy-report", options.launchpad_instance, version="devel")
    options.distro = options.launchpad.distributions["ubuntu"]
    options.archive = options.distro.main_archive
    options.mirrors = options.mirrors.split(":")

    if args:
        suites = args
    else:
        # No suites given: use every series whose status is "Supported" or
        # "Current Stable Release", in reverse of the order Launchpad
        # returns them.
        suites = reversed([
            series.name
            for series in options.launchpad.distributions["ubuntu"].series
            if series.status in ("Supported", "Current Stable Release")])

    yes = []    # safe to copy
    maybe = []  # newer, but descent not checked (--quick)
    no = []     # newer, but not a descendant

    for suite in suites:
        for component in 'main', 'restricted', 'universe', 'multiverse':
            # Only the first mirror is used for reading the indexes.
            tagfile1 = '%s/dists/%s-security/%s/source/Sources.gz' % (
                options.mirrors[0], suite, component)
            tagfile2 = '%s/dists/%s-updates/%s/source/Sources.gz' % (
                options.mirrors[0], suite, component)
            name1 = '%s-security' % suite
            name2 = '%s-updates' % suite

            suite1 = tagfiletodict(tagfile1)
            suite2 = tagfiletodict(tagfile2)

            for package in sorted(suite1):
                section1 = suite1[package]
                section2 = suite2.get(package)
                # Safe: absent from -updates, or (unless --quick) the
                # changelog check in descended_from() succeeds.
                if (section2 is None or
                    (not options.quick and
                     descended_from(options, package, section1, section2))):
                    # version2 is None here even when section2 exists, so
                    # no trailing comment is printed for safe candidates.
                    candidate = Candidate(
                        package=package, suite1=name1, suite2=name2,
                        version1=section1.version, version2=None)
                    if not already_copied(options, candidate):
                        yes.append(candidate)
                elif apt_pkg.version_compare(
                        section1.version, section2.version) > 0:
                    candidate = Candidate(
                        package=package, suite1=name1, suite2=name2,
                        version1=section1.version, version2=section2.version)
                    if already_copied(options, candidate):
                        pass
                    elif not options.quick:
                        # The descent check ran above and failed.
                        no.append(candidate)
                    else:
                        # --quick skipped the descent check.
                        maybe.append(candidate)

    if yes:
        print("The following packages can be copied safely:")
        print("--------------------------------------------")
        print()
        for candidate in yes:
            print(candidate_string(candidate))
        print()

        if options.copy_safe:
            for candidate in yes:
                copy(options, candidate)

    if maybe:
        print("Check that these packages are descendants before copying:")
        print("---------------------------------------------------------")
        print()
        # These commands are printed with a leading "#".
        for candidate in maybe:
            print('#%s' % candidate_string(candidate))
        print()

    if no:
        print("The following packages need to be merged by hand:")
        print("-------------------------------------------------")
        print()
        for candidate in no:
            print('#%s' % candidate_string(candidate))
        print()
 | |
| 
 | |
| 
 | |
# Run the report only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
 |