#! /usr/bin/python3 # Copyright 2013-2019 Canonical Ltd. # Author: Colin Watson # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """Manage build base images.""" from __future__ import print_function __metaclass__ = type import argparse from collections import OrderedDict import hashlib import subprocess import sys from launchpadlib.launchpad import Launchpad from launchpadlib.uris import web_root_for_service_root from six.moves import shlex_quote from six.moves.urllib.parse import ( unquote, urlparse, ) from ubuntutools.question import YesNoQuestion import lputils # Convenience aliases. image_types = OrderedDict([ ("chroot", "Chroot tarball"), ("lxd", "LXD image"), ]) # Affordance for --from-livefs. 
image_types_by_name = {
    "livecd.ubuntu-base.rootfs.tar.gz": "Chroot tarball",
    "livecd.ubuntu-base.lxd.tar.gz": "LXD image",
    }


def adjust_lp_url(parser, args, url):
    """Normalise `url` relative to the Launchpad instance in use.

    A URL without a scheme, or one on the API (service) host, is returned
    unchanged.  A URL on the corresponding web host is reduced to its path.
    Any other host is reported through `parser.error`.
    """
    parsed_url = urlparse(url)
    if parsed_url.scheme == "":
        return url
    root_uri = args.launchpad._root_uri
    service_host = root_uri.host
    web_host = urlparse(web_root_for_service_root(str(root_uri))).hostname
    if parsed_url.hostname == service_host:
        return url
    elif parsed_url.hostname == web_host:
        return parsed_url.path
    else:
        parser.error(
            "%s is not on this Launchpad instance (%s)" % (url, web_host))


def describe_image_type(image_type):
    """Return a human-readable description of `image_type`.

    Raises ValueError if `image_type` is not one of the known type names.
    """
    if image_type == "Chroot tarball":
        return "base chroot tarball"
    elif image_type == "LXD image":
        return "base LXD image"
    else:
        raise ValueError("unknown image type '%s'" % image_type)


def get_chroot(args):
    """Download the current base image for the first selected architecture.

    Returns 1 if there is no such image, otherwise 0.  In dry-run mode the
    URL is only printed.
    """
    das = args.architectures[0]
    suite_arch = "%s/%s" % (args.suite, das.architecture_tag)
    url = das.getChrootURL(pocket=args.pocket, image_type=args.image_type)
    if url is None:
        print("No %s for %s" % (
            describe_image_type(args.image_type), suite_arch))
        return 1
    if args.dry_run:
        print("Would fetch %s" % url)
    else:
        # We use wget here to save on having to implement a progress bar
        # with urlretrieve.
        command = ["wget"]
        if args.filepath is not None:
            command.extend(["-O", args.filepath])
        command.append(url)
        subprocess.check_call(command)
    return 0


def info_chroot(args):
    """Print the URL of the current base image, if there is one."""
    das = args.architectures[0]
    url = das.getChrootURL(pocket=args.pocket, image_type=args.image_type)
    if url is not None:
        print(url)
    return 0


def remove_chroot(args):
    """Remove the base image for the first selected architecture.

    Asks for confirmation unless --confirm-all was given; does nothing
    beyond printing in dry-run mode.
    """
    das = args.architectures[0]
    previous_url = das.getChrootURL(
        pocket=args.pocket, image_type=args.image_type)
    if previous_url is not None:
        print("Previous %s: %s" % (
            describe_image_type(args.image_type), previous_url))
    suite_arch = "%s/%s" % (args.suite, das.architecture_tag)
    if args.dry_run:
        print("Would remove %s from %s" % (
            describe_image_type(args.image_type), suite_arch))
    else:
        if not args.confirm_all:
            if YesNoQuestion().ask(
                    "Remove %s from %s" % (
                        describe_image_type(args.image_type), suite_arch),
                    "no") == "no":
                return 0
        das.removeChroot(pocket=args.pocket, image_type=args.image_type)
    return 0


def get_last_livefs_builds(livefs, architectures):
    """Get the most recent build for each of `architectures` in `livefs`.

    The first build seen in `livefs.completed_builds` for each architecture
    is kept.  The result is a list of builds ordered by architecture tag.
    """
    arch_tags = {das.self_link: das.architecture_tag for das in architectures}
    wanted_tags = set(arch_tags.values())
    builds = {}
    for build in livefs.completed_builds:
        arch_tag = arch_tags.get(build.distro_arch_series_link)
        if arch_tag is not None and arch_tag not in builds:
            builds[arch_tag] = build
            # Stop iterating as soon as every architecture has a build.
            if set(builds) == wanted_tags:
                break
    return [build for _, build in sorted(builds.items())]


def set_chroots_from_livefs(args):
    """Set a whole batch of base images at once, for convenience.

    If `args.image_type` is None, every image type recognised in
    `image_types_by_name` is set; otherwise only files of that image type
    are considered.
    """
    # Use a distinct local name here: assigning to "image_types" would
    # shadow the module-level mapping and make it unreadable in this
    # function.
    if args.image_type is None:
        wanted_types = set(image_types_by_name.values())
    else:
        wanted_types = {args.image_type}

    livefs = args.launchpad.load(args.livefs_url)
    builds = get_last_livefs_builds(livefs, args.architectures)
    todo = []
    for build in builds:
        das = build.distro_arch_series
        suite_arch = "%s/%s" % (args.suite, das.architecture_tag)
        for image_url in build.getFileUrls():
            image_name = unquote(urlparse(image_url).path).split('/')[-1]
            image_type = image_types_by_name.get(image_name)
            if image_type is None or image_type not in wanted_types:
                continue
            previous_url = das.getChrootURL(
                pocket=args.pocket, image_type=image_type)
            if previous_url is not None:
                print("Previous %s for %s: %s" % (
                    describe_image_type(image_type), suite_arch,
                    previous_url))
            print("New %s for %s: %s" % (
                describe_image_type(image_type), suite_arch, image_url))
            todo.append(
                (das, build.self_link, image_name, args.pocket, image_type))

    if todo:
        if args.dry_run:
            print("Not setting base images in dry-run mode.")
        else:
            if not args.confirm_all:
                if YesNoQuestion().ask("Set these base images", "no") == "no":
                    return 0
            for das, build_url, image_name, pocket, image_type in todo:
                das.setChrootFromBuild(
                    livefsbuild=build_url, filename=image_name, pocket=pocket,
                    image_type=image_type)
            print()
            print(
                "The following commands will roll back to these images if a "
                "future set is broken:")
            base_command = [
                "manage-chroot",
                "-l", args.launchpad_instance,
                "-d", args.distribution.name,
                "-s", args.suite,
                ]
            for das, build_url, image_name, _, image_type in todo:
                command = base_command + [
                    "-a", das.architecture_tag,
                    "-i", image_type,
                    "--from-build", build_url,
                    "-f", image_name,
                    "set",
                    ]
                print(" ".join(shlex_quote(arg) for arg in command))

    return 0


def set_chroot(args):
    """Set the base image for the first selected architecture.

    Delegates to `set_chroots_from_livefs` when --from-livefs was given.
    Otherwise the image comes either from a file of a live filesystem build
    (--from-build) or from a local file, which is uploaded together with its
    SHA-1 digest.
    """
    if args.livefs_url is not None:
        return set_chroots_from_livefs(args)

    das = args.architectures[0]
    previous_url = das.getChrootURL(
        pocket=args.pocket, image_type=args.image_type)
    if previous_url is not None:
        print("Previous %s: %s" % (
            describe_image_type(args.image_type), previous_url))
    suite_arch = "%s/%s" % (args.suite, das.architecture_tag)
    if args.build_url:
        target = "%s from %s" % (args.filepath, args.build_url)
    else:
        target = args.filepath
    if args.dry_run:
        print("Would set %s for %s to %s" % (
            describe_image_type(args.image_type), suite_arch, target))
    else:
        if not args.confirm_all:
            if YesNoQuestion().ask(
                    "Set %s for %s to %s" % (
                        describe_image_type(args.image_type), suite_arch,
                        target),
                    "no") == "no":
                return 0
        if args.build_url:
            das.setChrootFromBuild(
                livefsbuild=args.build_url, filename=args.filepath,
                pocket=args.pocket, image_type=args.image_type)
        else:
            with open(args.filepath, "rb") as f:
                data = f.read()
                sha1sum = hashlib.sha1(data).hexdigest()
                das.setChroot(
                    data=data, sha1sum=sha1sum, pocket=args.pocket,
                    image_type=args.image_type)
    return 0


# Dispatch table from command-line command name to implementation.
commands = {
    "get": get_chroot,
    "info": info_chroot,
    "remove": remove_chroot,
    "set": set_chroot}


def main():
    """Parse the command line, log in to Launchpad and run the command.

    Returns the selected command's return value, which the entry point
    passes to `sys.exit`.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-l", "--launchpad", dest="launchpad_instance", default="production")
    parser.add_argument(
        "-n", "--dry-run", default=False, action="store_true",
        help="only show the actions that would be performed")
    parser.add_argument(
        "-y", "--confirm-all", default=False, action="store_true",
        help="do not ask for confirmation")
    parser.add_argument(
        "-d", "--distribution", default="ubuntu",
        metavar="DISTRIBUTION", help="manage base images for DISTRIBUTION")
    parser.add_argument(
        "-s", "--suite", "--series", dest="suite", metavar="SUITE",
        help="manage base images for SUITE")
    parser.add_argument(
        "-a", "--architecture", metavar="ARCHITECTURE",
        help="manage base images for ARCHITECTURE")
    parser.add_argument(
        "-i", "--image-type", metavar="TYPE",
        help="manage base images of type TYPE")
    parser.add_argument(
        "--from-livefs", dest="livefs_url", metavar="URL",
        help=(
            "Live filesystem to set base images from (sets base images for "
            "all available architectures and image types)"))
    parser.add_argument(
        "--from-build", dest="build_url", metavar="URL",
        help="Live filesystem build URL to set base image from")
    parser.add_argument(
        "-f", "--filepath", metavar="PATH",
        help="Base image file path (or file name if --from-build is given)")
    parser.add_argument("command", choices=sorted(commands.keys()))
    args = parser.parse_args()

    from_livefs = args.command == "set" and args.livefs_url is not None

    if args.command == "set" and args.livefs_url is None:
        if args.architecture is None:
            parser.error("The set command requires an architecture (-a).")

        if args.filepath is None:
            parser.error(
                "The set command requires a base image file path (-f).")

    # With --from-livefs, leaving the image type unset means "all image
    # types"; everywhere else it defaults to the chroot tarball.
    if args.image_type is None and not from_livefs:
        args.image_type = "Chroot tarball"
    # Validate any explicit type (also for --from-livefs), accepting either
    # the full type name or one of the short aliases.
    if (args.image_type is not None and
            args.image_type not in image_types.values()):
        image_type = image_types.get(args.image_type.lower())
        if image_type is not None:
            args.image_type = image_type
        else:
            parser.error("Unknown image type '%s'." % args.image_type)

    # Read-only commands work anonymously; the others need credentials.
    if args.command in ("get", "info"):
        login_method = Launchpad.login_anonymously
    else:
        login_method = Launchpad.login_with
    args.launchpad = login_method(
        "manage-chroot", args.launchpad_instance, version="devel")
    lputils.setup_location(args)

    if args.command == "set":
        if args.livefs_url is not None:
            args.livefs_url = adjust_lp_url(parser, args, args.livefs_url)
        if args.build_url is not None:
            args.build_url = adjust_lp_url(parser, args, args.build_url)

    return commands[args.command](args)


if __name__ == '__main__':
    sys.exit(main())