#!/usr/bin/python3 # Parse removals.txt file # Copyright (C) 2004, 2005, 2009, 2010, 2011, 2012 Canonical Ltd. # Authors: James Troup , # Colin Watson # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ############################################################################ # What kind of genius logs 3.5 years of removal data in # semi-human-parseable text and nothing else? Hmm. That would be me. :( # MAAAAAAAAAAAAAADNESSSSSSSSSSSSSSSSS ############################################################################ from __future__ import print_function import copy import optparse import os import sys import time try: from urllib.request import urlopen, urlretrieve except ImportError: from urllib import urlretrieve from urllib2 import urlopen import re import logging import gzip import datetime import subprocess import apt_pkg from launchpadlib.launchpad import Launchpad import lputils CONSUMER_KEY = "process-removals" Debian = None def parse_removals_file(options, removals_file): if options.report_ubuntu_delta and options.no_action: me = None else: me = options.launchpad.me.name state = "first separator" removal = {} for line in removals_file: line = line.strip().decode('UTF-8') # skip over spurious empty lines if not line and state in ("first separtor", "next separator", "date"): continue if line == ("=" * 73): if state == "done": state = "next separator" elif state in ("first separator", "next separator"): state = "date" else: raise RuntimeError("found separator but state is %s." % state) # NB: The 'Reason' is an abbreviation, check any referenced bugs for # more elif state == "next separator" and line.startswith("NB:"): state = "first separator" # [Date: Tue, 9 Jan 2001 20:46:15 -0500] [ftpmaster: James Troup] elif state in ("date", "next separator"): try: (date, ftpmaster, unused) = line.split("]") date = date.replace("[Date: ", "") state = "removed from" except: state = "broken date" # Sat, 06 May 2017 08:45:30 +0000] [ftpmaster: Chris Lamb] elif state == "broken date": (date, ftpmaster, unused2) = line.split("]") state = "removed from" # Removed the following packages from unstable: elif state == "removed from": # complete processing of the date from the preceding line try: removal["date"] = time.mktime(time.strptime( date, "%a, %d %b %Y %H:%M:%S %z")) except ValueError: removal["date"] = time.mktime(time.strptime( date, "%a %d %b %H:%M:%S %Z %Y")) removal["ftpmaster"] = ftpmaster.replace("] [ftpmaster: ", "") prefix = "Removed the following packages from " if not line.startswith(prefix): raise RuntimeError( "state = %s, expected '%s', got '%s'" % (state, prefix, line)) line = line.replace(prefix, "")[:-1] line = line.replace(" and", "") line = line.replace(",", "") removal["suites"] = line.split() state = "before packages" elif state == "before packages" or state == "after packages": if line: raise RuntimeError( "state = %s, expected '', got '%s'" % (state, line)) if state == "before packages": state = "packages" elif state == "after packages": state = "reason prefix" # xroach | 4.0-8 | source, alpha, arm, hppa, i386, ia64, m68k, \ # mips, mipsel, powerpc, s390, sparc # Closed bugs: 158188 elif state == "packages": if line.find("|") != -1: package, version, architectures = [ word.strip() for word in line.split("|")] architectures = [ arch.strip() for arch in architectures.split(",")] removal.setdefault("packages", []) removal["packages"].append([package, version, architectures]) elif line.startswith("Closed bugs: "): line = line.replace("Closed bugs: ", "") removal["closed bugs"] = line.split() state = "after packages" elif not line: state = "reason prefix" else: raise RuntimeError( "state = %s, expected package list or 'Closed bugs:', " "got '%s'" % (state, line)) # ------------------- Reason ------------------- elif state == "reason prefix": expected = "------------------- Reason -------------------" if not line == expected: raise RuntimeError( "state = %s, expected '%s', got '%s'" % (state, expected, line)) state = "reason" # RoSRM; license problems. # ---------------------------------------------- elif state == "reason": if line == "----------------------------------------------": state = "done" do_removal(me, options, removal) removal = {} else: removal.setdefault("reason", "") removal["reason"] += "%s\n" % line # nothing should go here def show_reverse_depends(options, package): """Show reverse dependencies. This is mostly done by calling reverse-depends, but with some tweaks to make the output easier to read in context. """ series_name = options.series.name commands = ( ["reverse-depends", "-r", series_name, "src:%s" % package], ["reverse-depends", "-r", series_name, "-a", "source", "src:%s" % package], ) for command in commands: subp = subprocess.Popen(command, stdout=subprocess.PIPE) line = None for line in subp.communicate()[0].splitlines(): line = line.decode('UTF-8').rstrip("\n") if line == "No reverse dependencies found": line = None else: print(line) if line: print() non_meta_re = re.compile(r'^[a-zA-Z0-9+,./:=@_-]+$') def shell_escape(arg): if non_meta_re.match(arg): return arg else: return "'%s'" % arg.replace("'", "'\\''") seen_sources = set() def do_removal(me, options, removal): if options.removed_from and options.removed_from not in removal["suites"]: return if options.date and int(options.date) > int(removal["date"]): return if options.architecture: for architecture in re.split(r'[, ]+', options.architecture): package_list = copy.copy(removal["packages"]) for entry in package_list: (package, version, architectures) = entry if architecture not in architectures: removal["packages"].remove(entry) if not removal["packages"]: return for entry in removal.get("packages", []): (package, version, architectures) = entry if options.source and options.source != package: continue if package in seen_sources: continue seen_sources.add(package) if package in Debian and not (options.force and options.source): logging.info("%s (%s): back in sid - skipping.", package, version) continue spphs = [] for pocket in ("Release", "Proposed"): options.pocket = pocket if pocket == "Release": options.suite = options.series.name else: options.suite = "%s-proposed" % options.series.name try: spphs.append( lputils.find_latest_published_source(options, package)) except lputils.PackageMissing: pass if not spphs: logging.debug("%s (%s): not found", package, version) continue if options.report_ubuntu_delta: if 'ubuntu' in version: print("%s %s" % (package, version)) continue if options.no_action: continue for spph in spphs: logging.info( "%s (%s/%s), removed from Debian on %s", package, version, spph.source_package_version, time.strftime("%Y-%m-%d", time.localtime(removal["date"]))) removal["reason"] = removal["reason"].rstrip('.\n') try: bugs = ';' + ','.join( [" Debian bug #%s" % item for item in removal["closed bugs"]]) except KeyError: bugs = '' reason = "(From Debian) " + removal["reason"] + bugs subprocess.call(['seeded-in-ubuntu', package]) show_reverse_depends(options, package) for spph in spphs: if options.no_action: print("remove-package -s %s%s -e %s -m %s %s" % ( options.series.name, "-proposed" if spph.pocket == "Proposed" else "", spph.source_package_version, shell_escape(reason), package)) else: from ubuntutools.question import YesNoQuestion print("Removing packages:") print("\t%s" % spph.display_name) for binary in spph.getPublishedBinaries(): print("\t\t%s" % binary.display_name) print("Comment: %s" % reason) if YesNoQuestion().ask("Remove", "no") == "yes": spph.requestDeletion(removal_comment=reason) def fetch_removals_file(options): removals = "removals-full.txt" if options.date: thisyear = datetime.datetime.today().year if options.date >= time.mktime(time.strptime(str(thisyear), "%Y")): removals = "removals.txt" logging.debug("Fetching %s" % removals) return urlopen("http://ftp-master.debian.org/%s" % removals) def read_sources(): global Debian Debian = {} base = "http://ftp.debian.org/debian" logging.debug("Reading Debian sources") for component in "main", "contrib", "non-free": filename = "Debian_unstable_%s_Sources" % component if not os.path.exists(filename): url = "%s/dists/unstable/%s/source/Sources.gz" % (base, component) logging.info("Fetching %s" % url) fetched, headers = urlretrieve(url) out = open(filename, 'wb') gzip_handle = gzip.GzipFile(fetched) while True: block = gzip_handle.read(65536) if not block: break out.write(block) out.close() gzip_handle.close() sources_filehandle = open(filename) Sources = apt_pkg.TagFile(sources_filehandle) while Sources.step(): pkg = Sources.section.find("Package") version = Sources.section.find("Version") if (pkg in Debian and apt_pkg.version_compare(Debian[pkg]["version"], version) > 0): continue Debian[pkg] = {} Debian[pkg]["version"] = version sources_filehandle.close() def parse_options(): parser = optparse.OptionParser() parser.add_option("-l", "--launchpad", dest="launchpad_instance", default="production") parser.add_option("-a", "--architecture", metavar="ARCH", help="remove from ARCH") parser.add_option("-d", "--date", metavar="DATE", help="only those removed since DATE (Unix epoch or %Y-%m-%d)") parser.add_option("-r", "--removed-from", metavar="SUITE", help="only those removed from SUITE (aka distroseries)") parser.add_option("-s", "--source", metavar="NAME", help="only source package NAME") parser.add_option("--force", action="store_true", default=False, help="force removal even for packages back in unstable " "(only with -s)") parser.add_option("-f", "--filename", help="parse FILENAME") parser.add_option("-n", "--no-action", action="store_true", help="don't remove packages; just print remove-package " "commands") parser.add_option("-v", "--verbose", action="store_true", help="emit verbose debugging messages") parser.add_option("--report-ubuntu-delta", action="store_true", help="skip and report packages with Ubuntu deltas") options, args = parser.parse_args() if len(args): parser.error("no arguments expected") if options.date: if len(options.date) == 8: print("The format of date should be %Y-%m-%d.") sys.exit(1) try: int(options.date) except ValueError: options.date = time.mktime(time.strptime(options.date, "%Y-%m-%d")) else: options.date = time.mktime(time.strptime("2009-02-01", "%Y-%m-%d")) if not options.removed_from: options.removed_from = "unstable" if not options.architecture: options.architecture = "source" if options.verbose: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) return options, args def main(): apt_pkg.init() """Initialization, including parsing of options.""" options, args = parse_options() if options.report_ubuntu_delta and options.no_action: options.launchpad = Launchpad.login_anonymously( CONSUMER_KEY, options.launchpad_instance) else: options.launchpad = Launchpad.login_with( CONSUMER_KEY, options.launchpad_instance) options.distribution = options.launchpad.distributions["ubuntu"] options.series = options.distribution.current_series options.archive = options.distribution.main_archive options.version = None read_sources() if options.filename: removals_file = open(options.filename, "rb") else: removals_file = fetch_removals_file(options) parse_removals_file(options, removals_file) removals_file.close() if __name__ == '__main__': main()