diff --git a/debian/changelog b/debian/changelog index c385bad9..7f8a720e 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +livecd-rootfs (2.525.41) UNRELEASED; urgency=medium + + * Use snap cli rather than custom snap-tool (LP: #1864252) + + -- Robert C Jennings Fri, 21 Feb 2020 20:21:31 -0600 + livecd-rootfs (2.525.40) bionic; urgency=medium * Stop building per-pi-flavor images and only build a pi generic image for diff --git a/debian/install b/debian/install index 339c4f2c..f0a66ca6 100644 --- a/debian/install +++ b/debian/install @@ -3,4 +3,3 @@ get-ppa-fingerprint usr/share/livecd-rootfs minimize-manual usr/share/livecd-rootfs magic-proxy usr/share/livecd-rootfs lp-in-release usr/share/livecd-rootfs -snap-tool usr/share/livecd-rootfs diff --git a/live-build/functions b/live-build/functions index 371d0bc0..ea47c8d1 100644 --- a/live-build/functions +++ b/live-build/functions @@ -447,10 +447,7 @@ _snap_preseed() { # Determine if and what core snap is needed local snap_info - snap_info=$(/usr/share/livecd-rootfs/snap-tool info \ - --cohort-key="${COHORT_KEY:-}" \ - --channel="$CHANNEL" "${SNAP_NAME}" \ - ) + snap_info=$(snap info --verbose "${SNAP_NAME}") if [ $? -ne 0 ]; then echo "Failed to retrieve base of $SNAP_NAME!" @@ -470,8 +467,8 @@ _snap_preseed() { sh -c " set -x; cd \"$CHROOT_ROOT/var/lib/snapd/seed\"; - SNAPPY_STORE_NO_CDN=1 /usr/share/livecd-rootfs/snap-tool download \ - --cohort-key=\"${COHORT_KEY:-}\" \ + SNAPPY_STORE_NO_CDN=1 snap download \ + --cohort=\"${COHORT_KEY:-}\" \ --channel=\"$CHANNEL\" \"$SNAP_NAME\"" || snap_download_failed=1 if [ $snap_download_failed = 1 ] ; then echo "If the channel ($CHANNEL) includes '*/ubuntu-##.##' track per " diff --git a/snap-tool b/snap-tool deleted file mode 100755 index cc4aa529..00000000 --- a/snap-tool +++ /dev/null @@ -1,603 +0,0 @@ -#!/usr/bin/python3 -#-*- encoding: utf-8 -*- -""" -This script can be used instead of the traditional `snap` command to download -snaps and accompanying assertions. It uses the new store API (v2) which allows -creating temporary snapshots of the channel map. - -To create such a snapshot run - - snap-tool cohort-create - -This will print a "cohort-key" to stdout, which can then be passed to future -invocations of `snap-tool download`. Whenever a cohort key is provided, the -store will provide a view of the channel map as it existed when the key was -created. -""" - -from textwrap import dedent - -import argparse -import base64 -import binascii -import getopt -import hashlib -import json -import os -import re -import shutil -import subprocess -import sys -import time -import urllib.error -import urllib.request - -EXIT_OK = 0 -EXIT_ERR = 1 - - -class SnapError(Exception): - """Generic error thrown by the Snap class.""" - pass - - -class SnapCraftError(SnapError): - """Error thrown on problems with the snapcraft APIs.""" - pass - - -class SnapAssertionError(SnapError): - """Error thrown on problems with the assertions API.""" - pass - - -class ExpBackoffHTTPClient: - """This class is an abstraction layer on top of urllib with additional - retry logic for more reliable downloads.""" - - class Request: - """This is a convenience wrapper around urllib.request.""" - - def __init__(self, request, do_retry, base_interval, num_tries): - """ - :param request: - An urllib.request.Request instance. - :param do_retry: - Whether to enable the exponential backoff and retry logic. - :param base_interval: - The initial interval to sleep after a failed attempt. - :param num_tries: - How many attempts to make. - """ - self._request = request - self._do_retry = do_retry - self._base_interval = base_interval - self._num_tries = num_tries - self._response = None - - def open(self): - """Open the connection.""" - if not self._response: - self._response = self._retry_urlopen() - - def close(self): - """Close the connection.""" - if self._response: - self._response.close() - self._response = None - - def data(self): - """Return the raw response body.""" - with self: - return self.read() - - def json(self): - """Return the deserialized response body interpreted as JSON.""" - return json.loads(self.data(), encoding="utf-8") - - def text(self): - """Return the response body as a unicode string.""" - encoding = "utf-8" - - with self: - content_type = self._response.getheader("Content-Type", "") - - if content_type == "application/json": - encoding = "utf-8" - else: - m = re.match(r"text/\S+;\s*charset=(?P\S+)", - content_type) - if m: - encoding=m.group("charset") - - return self.read().decode(encoding) - - def read(self, size=None): - """Read size bytes from the response. If size if not set, the - complete response body is read in.""" - return self._response.read(size) - - def __enter__(self): - """Make this class a context manager.""" - self.open() - return self - - def __exit__(self, type, value, traceback): - """Make this class a context manager.""" - self.close() - - def _retry_urlopen(self): - """Try to open the HTTP connection as many times as configured - through the constructor. Every time an error occurs, double the - time to wait until the next attempt.""" - for attempt in range(self._num_tries): - try: - return urllib.request.urlopen(self._request) - except Exception as e: - if isinstance(e, urllib.error.HTTPError) and e.code < 500: - raise - if attempt >= self._num_tries - 1: - raise - sys.stderr.write( - "WARNING: failed to open URL '{}': {}\n" - .format(self._request.full_url, str(e)) - ) - else: - break - - sleep_interval = self._base_interval * 2**attempt - sys.stderr.write( - "Retrying HTTP request in {} seconds...\n" - .format(sleep_interval) - ) - time.sleep(sleep_interval) - - - def __init__(self, do_retry=True, base_interval=2, num_tries=8): - """ - :param do_retry: - Whether to enable the retry logic. - :param base_interval: - The initial interval to sleep after a failed attempt. - :param num_tries: - How many attempts to make. - """ - self._do_retry = do_retry - self._base_interval = base_interval - self._num_tries = num_tries if do_retry else 1 - - def get(self, url, headers=None): - """Create a GET request that can be used to retrieve the resource - at the given URL. - - :param url: - An HTTP URL. - :param headers: - A dictionary of extra headers to send along. - :return: - An ExpBackoffHTTPClient.Request instance. - """ - return self._prepare_request(url, headers=headers) - - def post(self, url, data=None, json=None, headers=None): - """Create a POST request that can be used to submit data to the - endpoint at the given URL.""" - return self._prepare_request( - url, data=data, json_data=json, headers=headers - ) - - def _prepare_request(self, url, data=None, json_data=None, headers=None): - """Prepare a Request instance that can be used to retrieve data from - and/or send data to the endpoint at the given URL. - - :param url: - An HTTP URL. - :param data: - Raw binary data to send along in the request body. - :param json_data: - A Python data structure to be serialized and sent out in JSON - format. - :param headers: - A dictionary of extra headers to send along. - :return: - An ExpBackoffHTTPClient.Request instance. - """ - if data is not None and json_data is not None: - raise ValueError( - "Parameters 'data' and 'json_data' are mutually exclusive." - ) - - if json_data: - data = json.dumps(json_data, ensure_ascii=False) - if headers is None: - headers = {} - headers["Content-Type"] = "application/json" - if isinstance(data, str): - data = data.encode("utf-8") - - return ExpBackoffHTTPClient.Request( - urllib.request.Request(url, data=data, headers=headers or {}), - self._do_retry, - self._base_interval, - self._num_tries - ) - - -class Snap: - """This class provides methods to retrieve information about a snap and - download it together with its assertions.""" - - def __init__(self, name, channel="stable", arch="amd64", series=16, - cohort_key=None, assertion_url="https://assertions.ubuntu.com", - snapcraft_url="https://api.snapcraft.io", **kwargs): - """ - :param name: - The name of the snap. - :param channel: - The channel to operate on. - :param arch: - The Debian architecture of the snap (e.g. amd64, armhf, arm64, ...). - :param series: - The device series. This should always be 16. - :param cohort_key: - A cohort key to access a snapshot of the channel map. - """ - self._name = name - self._channel = channel - self._arch = arch - self._series = series - self._cohort_key = cohort_key - self._assertion_url = assertion_url - self._snapcraft_url = snapcraft_url - self._details = None - self._assertions = {} - - @classmethod - def cohort_create(cls): - """Get a cohort key for the current moment. A cohort key is valid - across all snaps, channels and architectures.""" - return Snap("core")\ - .get_details(cohort_create=True)\ - .get("cohort-key") - - def download(self, download_assertions=True): - """Download the snap container. If download_assertions is True, the - corresponding assertions will be downloaded, as well.""" - snap = self.get_details() - - snap_name = snap["name"] - snap_revision = snap["revision"] - publisher_id = snap["publisher"]["id"] - snap_download_url = snap["download"]["url"] - snap_byte_size = snap["download"]["size"] - filename = snap_name + "_" + str(snap_revision) - snap_filename = filename + ".snap" - assert_filename = filename + ".assert" - - skip_snap_download = False - - if os.path.exists(snap_filename) and os.path.getsize(snap_filename) \ - == snap_byte_size: - skip_snap_download = True - - headers = {} - - if os.environ.get("SNAPPY_STORE_NO_CDN", "0") != "0": - headers.update({ - "X-Ubuntu-No-Cdn": "true", - "Snap-CDN": "none", - }) - - if not skip_snap_download: - http_client = ExpBackoffHTTPClient() - response = http_client.get(snap_download_url, headers=headers) - with response, open(snap_filename, "wb+") as fp: - shutil.copyfileobj(response, fp) - - if os.path.getsize(snap_filename) != snap_byte_size: - raise SnapError( - "The downloaded snap does not have the expected size." - ) - - if not download_assertions: - return - - required_assertions = [ - "account-key", - "account", - "snap-declaration", - "snap-revision", - ] - - if publisher_id == "canonical": - required_assertions.remove("account") - - for assertion_name in required_assertions: - attr_name = "get_assertion_" + assertion_name.replace("-", "_") - # This will populate self._assertions[]. - getattr(self, attr_name)() - - with open(assert_filename, "w+", encoding="utf-8") as fp: - fp.write("\n".join(self._assertions[a] for a in - required_assertions)) - - def get_details(self, cohort_create=False): - """Get details about the snap. On subsequent calls, the cached results - are returned. If cohort_create is set to True, a cohort key will be - created and included in the result.""" - if self._details and not cohort_create: - return self._details - - if self.is_core_snap() and self._channel.startswith("stable/ubuntu-"): - sys.stderr.write( - "WARNING: switching channel from '{}' to 'stable' for '{}' " - "snap.\n".format(self._channel, self._name) - ) - self._channel = "stable" - - path = "/v2/snaps/refresh" - - data = { - "context": [], - "actions": [ - { - "action": "download", - "instance-key": "0", - "name": self._name, - "channel": self._channel, - } - ], - "fields": [ - "base", - "created-at", - "download", - "license", - "name", - "prices", - "publisher", - "revision", - "snap-id", - "summary", - "title", - "type", - "version", - ], - } - - # These are mutually exclusive. - if cohort_create: - data["actions"][0]["cohort-create"] = True - elif self._cohort_key: - data["actions"][0]["cohort-key"] = self._cohort_key - - try: - response_dict = self._do_snapcraft_request(path, json_data=data) - except SnapCraftError as e: - raise SnapError("failed to get details for '{}': {}" - .format(self._name, str(e))) - - snap_data = response_dict["results"][0] - - if snap_data.get("result") == "error": - raise SnapError( - "failed to get details for '{}': {}" - .format(self._name, snap_data.get("error", {}).get("message")) - ) - - # Have "base" initialized to something meaningful. - if self.is_core_snap(): - snap_data["snap"]["base"] = "" - elif snap_data["snap"].get("base") is None: - snap_data["snap"]["base"] = "core" - - # Copy the key into the snap details. - if "cohort-key" in snap_data: - snap_data["snap"]["cohort-key"] = snap_data["cohort-key"] - - if "error" in snap_data: - raise SnapError( - "failed to get details for '{}' in '{}' on '{}': {}" - .format(self._name, self._channel, self._arch, - snap_data["error"]["message"]) - ) - - self._details = snap_data["snap"] - return self._details - - def get_assertion_snap_revision(self): - """Download the snap-revision assertion associated with this snap. The - assertion is returned as a string.""" - if "snap-revision" in self._assertions: - return self._assertions["snap-revision"] - snap = self.get_details() - - snap_sha3_384 = base64.urlsafe_b64encode( - binascii.a2b_hex(snap["download"]["sha3-384"]) - ).decode("us-ascii") - - data = self._do_assertion_request("/v1/assertions/snap-revision/{}" - .format(snap_sha3_384)) - self._assertions["snap-revision"] = data - return data - - def get_assertion_snap_declaration(self): - """Download the snap-declaration assertion associated with this snap. - The assertion is returned as a string.""" - if "snap-declaration" in self._assertions: - return self._assertions["snap-declaration"] - snap = self.get_details() - series = self._series - snap_id = snap["snap-id"] - - data = self._do_assertion_request( - "/v1/assertions/snap-declaration/{}/{}" - .format(series, snap_id)) - - self._assertions["snap-declaration"] = data - return data - - def get_assertion_account(self): - """Download the account assertion associated with this snap. The - assertion is returned as a string.""" - if "account" in self._assertions: - return self._assertions["account"] - snap = self.get_details() - publisher_id = snap["publisher"]["id"] - data = self._do_assertion_request("/v1/assertions/account/{}" - .format(publisher_id)) - self._assertions["account"] = data - return data - - def get_assertion_account_key(self): - """Download the account-key assertion associated with this snap. The - assertion will be returned as a string.""" - if "account-key" in self._assertions: - return self._assertions["account-key"] - - declaration_data = self.get_assertion_snap_declaration() - sign_key_sha3 = None - - for line in declaration_data.splitlines(): - if line.startswith("sign-key-sha3-384:"): - sign_key_sha3 = line.split(":")[1].strip() - - data = self._do_assertion_request("/v1/assertions/account-key/{}" - .format(sign_key_sha3)) - - self._assertions["account-key"] = data - return data - - def is_core_snap(self): - return re.match(r"^core\d*$", self._name) != None - - def _do_assertion_request(self, path): - url = self._assertion_url + path - - headers = { - "Accept": "application/x.ubuntu.assertion", - } - - http_client = ExpBackoffHTTPClient() - try: - with http_client.get(url, headers=headers) as response: - return response.text() - except urllib.error.HTTPError as e: - raise SnapAssertionError(str(e)) - - def _do_snapcraft_request(self, path, json_data=None): - url = self._snapcraft_url + "/" + path - - headers = { - "Snap-Device-Series": str(self._series), - "Snap-Device-Architecture": self._arch, - } - - http_client = ExpBackoffHTTPClient() - try: - response = http_client.post(url, json=json_data, headers=headers) - with response: - return response.json() - except urllib.error.HTTPError as e: - raise SnapCraftError(str(e)) - - -class SnapCli: - - def __call__(self, args): - """Parse the command line arguments and execute the selected command.""" - options = self._parse_opts(args) - - try: - options.func(getattr(options, "snap", None), **vars(options)) - except SnapError as e: - sys.stderr.write("snap-tool {}: {}\n".format( - options.command, str(e))) - return EXIT_ERR - return EXIT_OK - - @staticmethod - def _get_host_deb_arch(): - result = subprocess.run(["dpkg", "--print-architecture"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=True, check=True) - - return result.stdout.strip() - - def _parse_opts(self, args): - main_parser = argparse.ArgumentParser() - subparsers = main_parser.add_subparsers(dest="command") - - parser_cohort_create = subparsers.add_parser("cohort-create", - help="Create a cohort key for the snap store channel map.") - parser_cohort_create.set_defaults(func=self._cohort_create) - - parser_download = subparsers.add_parser("download", - help="Download a snap from the store.") - parser_download.set_defaults(func=self._download) - - parser_info = subparsers.add_parser("info", - help="Retrieve information about a snap.") - parser_info.set_defaults(func=self._info) - - # Add common parameters. - for parser in [parser_download, parser_info]: - parser.add_argument("--cohort-key", dest="cohort_key", - help="A cohort key to pin the channel map to.", type=str) - parser.add_argument("--channel", dest="channel", - help="The publication channel to query (default: stable).", - type=str, default="stable") - parser.add_argument("--series", dest="series", - help="The device series (default: 16)", - type=int, default=16) - parser.add_argument("--arch", dest="arch", - help="The Debian architecture (default: amd64).", - type=str, default=self._get_host_deb_arch()) - parser.add_argument("snap", help="The name of the snap.") - - if not args: - main_parser.print_help() - sys.exit(EXIT_ERR) - - return main_parser.parse_args(args) - - def _cohort_create(self, _, **kwargs): - print(Snap.cohort_create()) - - def _download(self, snap_name, **kwargs): - Snap(snap_name, **kwargs).download() - - def _info(self, snap_name, **kwargs): - snap = Snap(snap_name, **kwargs) - info = snap.get_details() - - print(dedent("""\ - name: {} - summary: {} - arch: {} - base: {} - channel: {} - publisher: {} - license: {} - snap-id: {} - revision: {}""" - .format( - snap_name, - info.get("summary", ""), - snap._arch, - info.get("base"), - snap._channel, - info.get("publisher", {}).get("display-name", ""), - info.get("license", ""), - info.get("snap-id", ""), - info.get("revision", "") - )) - ) - - -if __name__ == "__main__": - try: - rval = SnapCli()(sys.argv[1:]) - except KeyboardInterrupt: - sys.stderr.write("snap-tool: caught keyboard interrupt, exiting.\n") - sys.exit(EXIT_ERR) - sys.exit(rval)