You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

320 lines
10 KiB

#! /usr/bin/env python2.7
from __future__ import print_function
import atexit
import bz2
from collections import namedtuple
import optparse
import os
import re
import shutil
import subprocess
import tempfile
try:
from urllib.parse import unquote
except ImportError:
from urllib import unquote
import apt_pkg
from launchpadlib.launchpad import Launchpad
import lzma
import requests
# Version-string patterns, taken from dak (more or less).
# A changelog entry header line; group 1 is the version inside the parens.
re_changelog_versions = re.compile(r"^\w[-+0-9a-z.]+ \(([^\(\) \t]+)\)")
# The trailing "-<revision>" part of a version string.
re_strip_revision = re.compile(r"-[^-]+$")
# A leading "<digits>:" epoch prefix.
re_no_epoch = re.compile(r"^\d+:")

# Default value for --mirrors: a colon-separated list of local mirror roots.
default_mirrors = ":".join((
    '/home/ubuntu-archive/mirror/ubuntu',
    '/srv/archive.ubuntu.com/ubuntu',
))

# Cache of series objects filled in by get_series(), keyed by series name.
series_by_name = {}
# Path of the scratch directory; stays None until ensure_tempdir() runs.
tempdir = None
def ensure_tempdir():
    """Create the shared scratch directory the first time it is needed.

    The path is stored in the module-level ``tempdir`` and its recursive
    removal is registered with ``atexit``.  When ``tempdir`` is already
    set the call does nothing.
    """
    global tempdir
    if tempdir:
        return
    tempdir = tempfile.mkdtemp(prefix='copy-report')
    atexit.register(shutil.rmtree, tempdir)
def decompress_open(tagfile):
    """Open an index file for binary reading, fetching/decompressing it first.

    ``tagfile`` is either a local path or an ``http:``, ``https:`` or
    ``ftp:`` URL.  A URL is downloaded into the scratch directory; if the
    server answers 404, ``.xz`` in the URL is replaced by ``.bz2`` and the
    download is retried once.  A local path that does not exist gets the
    same ``.xz`` -> ``.bz2`` substitution.

    A file whose name ends in ``.xz`` or ``.bz2`` is decompressed into a
    temporary file inside the scratch directory, and that file is opened
    instead.

    Returns a file object opened with mode ``'rb'``; the caller is
    responsible for closing it.  A failed download propagates the
    exception raised by ``response.raise_for_status()``; a missing local
    file propagates the error raised when opening it.
    """
    if tagfile.startswith(('http:', 'https:', 'ftp:')):
        ensure_tempdir()
        response = requests.get(tagfile, stream=True)
        try:
            if response.status_code == 404:
                response.close()
                tagfile = tagfile.replace('.xz', '.bz2')
                response = requests.get(tagfile, stream=True)
            response.raise_for_status()
            if '.' in tagfile:
                # Keep the extension so the decompressor choice below works.
                suffix = '.' + tagfile.rsplit('.', 1)[1]
            else:
                suffix = ''
            fd, tagfile = tempfile.mkstemp(suffix=suffix, dir=tempdir)
            with os.fdopen(fd, 'wb') as f:
                # Stream the raw body to disk instead of buffering it whole.
                shutil.copyfileobj(response.raw, f)
        finally:
            # Release the connection even when the download fails.
            response.close()
    elif not os.path.exists(tagfile):
        tagfile = tagfile.replace('.xz', '.bz2')

    if tagfile.endswith('.xz'):
        decompressor = lzma.LZMAFile
    elif tagfile.endswith('.bz2'):
        decompressor = bz2.BZ2File
    else:
        return open(tagfile, 'rb')

    # The decompressed copy must live in the scratch directory so that it
    # is removed at exit; make sure that directory exists for local input
    # too (otherwise mkstemp would fall back to the system temp dir and the
    # file would never be cleaned up).
    ensure_tempdir()
    dcf = decompressor(tagfile)
    try:
        fd, decompressed = tempfile.mkstemp(dir=tempdir)
        with os.fdopen(fd, 'wb') as f:
            shutil.copyfileobj(dcf, f)
    finally:
        dcf.close()
    return open(decompressed, 'rb')
# One source-package stanza as kept by tagfiletodict(): the stanza's
# Version and Directory fields plus the file names from its Files field.
Section = namedtuple("Section", ("version", "directory", "files"))
def tagfiletodict(tagfile):
    """Parse a source index into a dict mapping package name to Section.

    ``tagfile`` is handed to ``decompress_open``, so it may be anything
    that function accepts.  For every stanza the ``Package``, ``Version``,
    ``Directory`` and ``Files`` fields are read; the third
    whitespace-separated column of each ``Files`` line is taken as the
    file name.

    Returns a dict keyed by the ``Package`` field whose values are
    ``Section`` tuples.  A stanza missing one of those fields raises
    ``KeyError``.
    """
    suite = {}
    # Close the index file once parsing is done instead of leaking it.
    with decompress_open(tagfile) as tagfile_obj:
        for section in apt_pkg.TagFile(tagfile_obj):
            files = [
                line.split()[2]
                for line in section["Files"].split('\n')
                # Ignore blank lines rather than failing on them.
                if line.strip()]
            suite[section["Package"]] = Section(
                version=section["Version"], directory=section["Directory"],
                files=files)
    return suite
def find_dsc(options, pkg, section):
    """Yield paths of candidate .dsc files for ``pkg`` at ``section.version``.

    This is a generator.  It first yields the path of the .dsc under
    ``section.directory`` for every entry of ``options.mirrors`` where that
    file exists.  If the consumer keeps iterating, all source files of the
    matching publication are downloaded from ``options.archive`` into a
    fresh directory below the scratch directory, and the downloaded .dsc is
    yielded last.

    Raises ``IndexError`` if ``section.files`` names no .dsc, if the
    archive has no matching publication, or if the publication has no
    .dsc; a failed download propagates the exception raised by
    ``response.raise_for_status()``.
    """
    dsc_filename = [s for s in section.files if s.endswith('.dsc')][0]
    for mirror in options.mirrors:
        path = '%s/%s/%s' % (mirror, section.directory, dsc_filename)
        if os.path.exists(path):
            yield path

    # No (more) local copies: fall back to downloading the source files.
    ensure_tempdir()
    spph = options.archive.getPublishedSources(
        source_name=pkg, version=section.version, exact_match=True)[0]
    outdir = tempfile.mkdtemp(dir=tempdir)
    filenames = []
    for url in spph.sourceFileUrls():
        filename = os.path.join(outdir, unquote(os.path.basename(url)))
        response = requests.get(url, stream=True)
        try:
            response.raise_for_status()
            with open(filename, 'wb') as f:
                # Stream to disk; source tarballs can be large.
                shutil.copyfileobj(response.raw, f)
        finally:
            # Release the connection even if the download failed.
            response.close()
        filenames.append(filename)
    yield [s for s in filenames if s.endswith('.dsc')][0]
class BrokenSourcePackage(Exception):
    """Raised when ``dpkg-source`` fails to unpack a source package."""
def get_changelog_versions(pkg, dsc, version):
    """Return the set of versions found in a source package's changelog.

    The package described by ``dsc`` is unpacked with ``dpkg-source`` inside
    the scratch directory.  The unpacked tree is expected at
    ``<tempdir>/<pkg>-<version without epoch and revision>``; its
    ``debian/changelog`` is scanned for entry header lines and the version
    of each one is collected.  The unpacked tree is removed afterwards.

    Raises ``BrokenSourcePackage(dsc)`` when ``dpkg-source`` exits non-zero.
    """
    ensure_tempdir()
    upstream_version = re_strip_revision.sub(
        '', re_no_epoch.sub('', version))

    with open(os.devnull, 'w') as sink:
        status = subprocess.call(
            ['dpkg-source', '-q', '--no-check', '-sn', '-x', dsc],
            stdout=sink, cwd=tempdir)

    # It's in the archive, so these assertions must hold.
    if status != 0:
        raise BrokenSourcePackage(dsc)

    unpacked = '%s/%s-%s' % (tempdir, pkg, upstream_version)
    assert os.path.isdir(unpacked)
    changelog_path = '%s/debian/changelog' % unpacked
    assert os.path.exists(changelog_path)

    with open(changelog_path) as changelog:
        matches = (re_changelog_versions.match(line) for line in changelog)
        versions = set(m.group(1) for m in matches if m)
    shutil.rmtree(unpacked)
    return versions
def descended_from(options, pkg, section1, section2):
    """Return True if ``section1`` is a newer descendant of ``section2``.

    ``section1`` counts as a descendant when its version compares strictly
    greater than ``section2``'s and ``section2.version`` appears in the
    changelog of ``section1``'s source package.

    Each .dsc offered by ``find_dsc`` is tried in turn; the first one that
    unpacks decides the result.  If every candidate fails to unpack, the
    last ``BrokenSourcePackage`` is re-raised.
    """
    if apt_pkg.version_compare(section1.version, section2.version) <= 0:
        return False
    exception = None
    for dsc in find_dsc(options, pkg, section1):
        try:
            versions = get_changelog_versions(pkg, dsc, section1.version)
        except BrokenSourcePackage as e:
            exception = e
            continue
        # The changelog examined is section1's own, so looking for
        # section1.version there would be trivially true; the ancestor's
        # version is what has to be present.
        return section2.version in versions
    if exception is None:
        # find_dsc produced nothing at all; avoid "raise None".
        raise BrokenSourcePackage(
            'no .dsc found for %s %s' % (pkg, section1.version))
    raise exception
# A proposed copy of ``package`` at ``version1`` from ``suite1`` to
# ``suite2``; ``version2`` is the version recorded for suite2, or None.
Candidate = namedtuple(
    "Candidate", ("package", "suite1", "suite2", "version1", "version2"))
def get_series(options, name):
    """Return the series object for ``name``, looking it up at most once.

    Results of ``options.distro.getSeries`` are cached in the module-level
    ``series_by_name`` dict.
    """
    if name in series_by_name:
        return series_by_name[name]
    series = options.distro.getSeries(name_or_version=name)
    series_by_name[name] = series
    return series
def already_copied(options, candidate):
    """Tell whether ``candidate`` is already present in its target suite.

    ``candidate.suite2`` is split at the first dash into a series name and
    a pocket (title-cased); without a dash the pocket is ``"Release"``.
    Returns True if ``options.archive`` has a publication of
    ``candidate.package`` at ``candidate.version1`` in that series and
    pocket whose status is ``Pending`` or ``Published``.
    """
    series_name, dash, pocket = candidate.suite2.partition("-")
    pocket = pocket.title() if dash else "Release"
    distro_series = get_series(options, series_name)
    publications = options.archive.getPublishedSources(
        source_name=candidate.package, version=candidate.version1,
        exact_match=True, distro_series=distro_series, pocket=pocket)
    return any(
        pub.status in ("Pending", "Published") for pub in publications)
def copy(options, candidate):
    """Ask ``options.archive`` to copy ``candidate`` into its target suite.

    ``candidate.suite2`` is split at the first dash into the target series
    and pocket (title-cased); without a dash the pocket is ``"Release"``.
    The copy is requested within the same archive, with binaries included
    and auto-approval enabled.
    """
    to_series, dash, pocket_part = candidate.suite2.partition("-")
    if dash:
        to_pocket = pocket_part.title()
    else:
        to_pocket = "Release"
    options.archive.copyPackage(
        source_name=candidate.package,
        version=candidate.version1,
        from_archive=options.archive,
        to_pocket=to_pocket,
        to_series=to_series,
        include_binaries=True,
        auto_approve=True)
def candidate_string(candidate):
    """Format ``candidate`` as a ``copy-package`` command line.

    When ``candidate.version2`` is not None, a trailing comment naming the
    target suite and that version is appended.
    """
    parts = ['copy-package -y -b -s %s --to-suite %s -e %s %s' % (
        candidate.suite1, candidate.suite2, candidate.version1,
        candidate.package)]
    if candidate.version2 is not None:
        parts.append(' # %s: %s' % (candidate.suite2, candidate.version2))
    return ''.join(parts)
def _parse_options():
    """Parse the command line and attach Launchpad handles to the options.

    Returns ``(options, args)``.  Besides the parsed flags, ``options``
    gains ``launchpad``, ``distro`` and ``archive`` attributes, and
    ``options.mirrors`` is turned into a list by splitting on ``":"``.
    """
    parser = optparse.OptionParser(usage="usage: %prog [options] [suites]")
    parser.add_option(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_option(
        "--quick", action="store_true", help="don't examine changelogs")
    parser.add_option(
        "--copy-safe", action="store_true",
        help="automatically copy safe candidates")
    parser.add_option(
        "--mirrors", default=default_mirrors,
        help="colon-separated list of local mirrors")
    options, args = parser.parse_args()

    options.launchpad = Launchpad.login_with(
        "copy-report", options.launchpad_instance, version="devel")
    options.distro = options.launchpad.distributions["ubuntu"]
    options.archive = options.distro.main_archive
    options.mirrors = options.mirrors.split(":")
    return options, args


def _classify_component(options, suite, component):
    """Yield ``(verdict, candidate)`` pairs for one suite/component.

    Compares the ``<suite>-security`` and ``<suite>-updates`` source
    indexes found under the first mirror.  ``verdict`` is ``'yes'``,
    ``'maybe'`` or ``'no'``; packages already copied, or not newer in
    -security, produce nothing.
    """
    tagfile1 = '%s/dists/%s-security/%s/source/Sources.xz' % (
        options.mirrors[0], suite, component)
    tagfile2 = '%s/dists/%s-updates/%s/source/Sources.xz' % (
        options.mirrors[0], suite, component)
    name1 = '%s-security' % suite
    name2 = '%s-updates' % suite
    suite1 = tagfiletodict(tagfile1)
    suite2 = tagfiletodict(tagfile2)

    for package in sorted(suite1):
        section1 = suite1[package]
        section2 = suite2.get(package)
        if (section2 is None or
                (not options.quick and
                 descended_from(options, package, section1, section2))):
            candidate = Candidate(
                package=package, suite1=name1, suite2=name2,
                version1=section1.version, version2=None)
            if not already_copied(options, candidate):
                yield 'yes', candidate
        elif apt_pkg.version_compare(
                section1.version, section2.version) > 0:
            candidate = Candidate(
                package=package, suite1=name1, suite2=name2,
                version1=section1.version, version2=section2.version)
            if already_copied(options, candidate):
                continue
            # With changelogs examined, reaching this branch means the
            # descent check failed; in --quick mode it was never run.
            yield ('maybe' if options.quick else 'no'), candidate


def _print_candidates(heading, underline, candidates, line_format):
    """Print one report section; print nothing when ``candidates`` is empty."""
    if not candidates:
        return
    print(heading)
    print(underline)
    print()
    for candidate in candidates:
        print(line_format % candidate_string(candidate))
    print()


def main():
    """Report packages in -security that could be copied to -updates.

    Suites are taken from the command-line arguments; with none given, the
    names of all Ubuntu series whose status is "Supported" or "Current
    Stable Release" are used, in reverse of the order Launchpad lists them.
    For each suite and archive component the candidates are sorted into
    three groups, which are printed at the end.  With ``--copy-safe`` the
    candidates in the first group are also copied.
    """
    apt_pkg.init_system()
    options, args = _parse_options()

    if args:
        suites = args
    else:
        suites = reversed([
            series.name
            for series in options.distro.series
            if series.status in ("Supported", "Current Stable Release")])

    buckets = {'yes': [], 'maybe': [], 'no': []}
    for suite in suites:
        for component in 'main', 'restricted', 'universe', 'multiverse':
            for verdict, candidate in _classify_component(
                    options, suite, component):
                buckets[verdict].append(candidate)

    _print_candidates(
        "The following packages can be copied safely:",
        "--------------------------------------------",
        buckets['yes'], '%s')
    if options.copy_safe:
        for candidate in buckets['yes']:
            copy(options, candidate)

    # The remaining groups are emitted commented out ('#' prefix).
    _print_candidates(
        "Check that these packages are descendants before copying:",
        "---------------------------------------------------------",
        buckets['maybe'], '#%s')
    _print_candidates(
        "The following packages need to be merged by hand:",
        "-------------------------------------------------",
        buckets['no'], '#%s')
# Run the report only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()