mirror of
				https://git.launchpad.net/livecd-rootfs
				synced 2025-11-04 02:44:07 +00:00 
			
		
		
		
	Use snap cli rather than custom snap-tool
snap-tool was added to support a deprecate cohort-key feature of the snap store. Recent changes in snap assertions have added additional fields which snap-tool is not retrieving. This resulted in snap install failures on first boot. This patch removes snap-tool and returns to using the snap cli. This ensures snap downloads will function without odd incompatibilities.
This commit is contained in:
		
							parent
							
								
									ccbc6aed83
								
							
						
					
					
						commit
						489f009e49
					
				
							
								
								
									
										6
									
								
								debian/changelog
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								debian/changelog
									
									
									
									
										vendored
									
									
								
							@ -1,3 +1,9 @@
 | 
			
		||||
livecd-rootfs (2.645) UNRELEASED; urgency=medium
 | 
			
		||||
 | 
			
		||||
  * Use snap cli rather than custom snap-tool
 | 
			
		||||
 | 
			
		||||
 -- Robert C Jennings <robert.jennings@canonical.com>  Fri, 21 Feb 2020 14:54:56 -0600
 | 
			
		||||
 | 
			
		||||
livecd-rootfs (2.644) focal; urgency=medium
 | 
			
		||||
 | 
			
		||||
  * Rename the raspi3 SUBARCH to raspi, as we generate universal generic pi
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										1
									
								
								debian/install
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								debian/install
									
									
									
									
										vendored
									
									
								
							@ -4,4 +4,3 @@ get-ppa-fingerprint usr/share/livecd-rootfs
 | 
			
		||||
minimize-manual usr/share/livecd-rootfs
 | 
			
		||||
magic-proxy usr/share/livecd-rootfs
 | 
			
		||||
lp-in-release usr/share/livecd-rootfs
 | 
			
		||||
snap-tool usr/share/livecd-rootfs
 | 
			
		||||
 | 
			
		||||
@ -490,10 +490,7 @@ _snap_preseed() {
 | 
			
		||||
            # Determine if and what core snap is needed
 | 
			
		||||
            local snap_info
 | 
			
		||||
 | 
			
		||||
            snap_info=$(/usr/share/livecd-rootfs/snap-tool info \
 | 
			
		||||
                --cohort-key="${COHORT_KEY:-}" \
 | 
			
		||||
                --channel="$CHANNEL" "${SNAP_NAME}" \
 | 
			
		||||
            )
 | 
			
		||||
            snap_info=$(snap info "${SNAP_NAME}")
 | 
			
		||||
 | 
			
		||||
            if [ $? -ne 0 ]; then
 | 
			
		||||
                echo "Failed to retrieve base of $SNAP_NAME!"
 | 
			
		||||
@ -513,8 +510,8 @@ _snap_preseed() {
 | 
			
		||||
    sh -c "
 | 
			
		||||
        set -x;
 | 
			
		||||
        cd \"$CHROOT_ROOT/var/lib/snapd/seed\";
 | 
			
		||||
        SNAPPY_STORE_NO_CDN=1 /usr/share/livecd-rootfs/snap-tool download \
 | 
			
		||||
            --cohort-key=\"${COHORT_KEY:-}\" \
 | 
			
		||||
        SNAPPY_STORE_NO_CDN=1 snap download \
 | 
			
		||||
            --cohort="${COHORT_KEY:-}" \
 | 
			
		||||
            --channel=\"$CHANNEL\" \"$SNAP_NAME\"" || snap_download_failed=1
 | 
			
		||||
    if [ $snap_download_failed = 1 ] ; then
 | 
			
		||||
        echo "If the channel ($CHANNEL) includes '*/ubuntu-##.##' track per "
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										603
									
								
								snap-tool
									
									
									
									
									
								
							
							
						
						
									
										603
									
								
								snap-tool
									
									
									
									
									
								
							@ -1,603 +0,0 @@
 | 
			
		||||
#!/usr/bin/python3
 | 
			
		||||
#-*- encoding: utf-8 -*-
 | 
			
		||||
"""
 | 
			
		||||
This script can be used instead of the traditional `snap` command to download
 | 
			
		||||
snaps and accompanying assertions. It uses the new store API (v2) which allows
 | 
			
		||||
creating temporary snapshots of the channel map.
 | 
			
		||||
 | 
			
		||||
To create such a snapshot run
 | 
			
		||||
 | 
			
		||||
    snap-tool cohort-create
 | 
			
		||||
 | 
			
		||||
This will print a "cohort-key" to stdout, which can then be passed to future
 | 
			
		||||
invocations of `snap-tool download`. Whenever a cohort key is provided, the
 | 
			
		||||
store will provide a view of the channel map as it existed when the key was
 | 
			
		||||
created.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from textwrap import dedent
 | 
			
		||||
 | 
			
		||||
import argparse
 | 
			
		||||
import base64
 | 
			
		||||
import binascii
 | 
			
		||||
import getopt
 | 
			
		||||
import hashlib
 | 
			
		||||
import json
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
import shutil
 | 
			
		||||
import subprocess
 | 
			
		||||
import sys
 | 
			
		||||
import time
 | 
			
		||||
import urllib.error
 | 
			
		||||
import urllib.request
 | 
			
		||||
 | 
			
		||||
EXIT_OK  = 0
 | 
			
		||||
EXIT_ERR = 1
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SnapError(Exception):
 | 
			
		||||
    """Generic error thrown by the Snap class."""
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SnapCraftError(SnapError):
 | 
			
		||||
    """Error thrown on problems with the snapcraft APIs."""
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SnapAssertionError(SnapError):
 | 
			
		||||
    """Error thrown on problems with the assertions API."""
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ExpBackoffHTTPClient:
 | 
			
		||||
    """This class is an abstraction layer on top of urllib with additional
 | 
			
		||||
    retry logic for more reliable downloads."""
 | 
			
		||||
 | 
			
		||||
    class Request:
 | 
			
		||||
        """This is a convenience wrapper around urllib.request."""
 | 
			
		||||
 | 
			
		||||
        def __init__(self, request, do_retry, base_interval, num_tries):
 | 
			
		||||
            """
 | 
			
		||||
            :param request:
 | 
			
		||||
                An urllib.request.Request instance.
 | 
			
		||||
            :param do_retry:
 | 
			
		||||
                Whether to enable the exponential backoff and retry logic.
 | 
			
		||||
            :param base_interval:
 | 
			
		||||
                The initial interval to sleep after a failed attempt.
 | 
			
		||||
            :param num_tries:
 | 
			
		||||
                How many attempts to make.
 | 
			
		||||
            """
 | 
			
		||||
            self._request       = request
 | 
			
		||||
            self._do_retry      = do_retry
 | 
			
		||||
            self._base_interval = base_interval
 | 
			
		||||
            self._num_tries     = num_tries
 | 
			
		||||
            self._response      = None
 | 
			
		||||
 | 
			
		||||
        def open(self):
 | 
			
		||||
            """Open the connection."""
 | 
			
		||||
            if not self._response:
 | 
			
		||||
                self._response = self._retry_urlopen()
 | 
			
		||||
 | 
			
		||||
        def close(self):
 | 
			
		||||
            """Close the connection."""
 | 
			
		||||
            if self._response:
 | 
			
		||||
                self._response.close()
 | 
			
		||||
                self._response = None
 | 
			
		||||
 | 
			
		||||
        def data(self):
 | 
			
		||||
            """Return the raw response body."""
 | 
			
		||||
            with self:
 | 
			
		||||
                return self.read()
 | 
			
		||||
 | 
			
		||||
        def json(self):
 | 
			
		||||
            """Return the deserialized response body interpreted as JSON."""
 | 
			
		||||
            return json.loads(self.data(), encoding="utf-8")
 | 
			
		||||
 | 
			
		||||
        def text(self):
 | 
			
		||||
            """Return the response body as a unicode string."""
 | 
			
		||||
            encoding = "utf-8"
 | 
			
		||||
 | 
			
		||||
            with self:
 | 
			
		||||
                content_type = self._response.getheader("Content-Type", "")
 | 
			
		||||
 | 
			
		||||
                if content_type == "application/json":
 | 
			
		||||
                    encoding = "utf-8"
 | 
			
		||||
                else:
 | 
			
		||||
                    m = re.match(r"text/\S+;\s*charset=(?P<charset>\S+)",
 | 
			
		||||
                            content_type)
 | 
			
		||||
                    if m:
 | 
			
		||||
                        encoding=m.group("charset")
 | 
			
		||||
 | 
			
		||||
                return self.read().decode(encoding)
 | 
			
		||||
 | 
			
		||||
        def read(self, size=None):
 | 
			
		||||
            """Read size bytes from the response. If size if not set, the
 | 
			
		||||
            complete response body is read in."""
 | 
			
		||||
            return self._response.read(size)
 | 
			
		||||
 | 
			
		||||
        def __enter__(self):
 | 
			
		||||
            """Make this class a context manager."""
 | 
			
		||||
            self.open()
 | 
			
		||||
            return self
 | 
			
		||||
 | 
			
		||||
        def __exit__(self, type, value, traceback):
 | 
			
		||||
            """Make this class a context manager."""
 | 
			
		||||
            self.close()
 | 
			
		||||
 | 
			
		||||
        def _retry_urlopen(self):
 | 
			
		||||
            """Try to open the HTTP connection as many times as configured
 | 
			
		||||
            through the constructor. Every time an error occurs, double the
 | 
			
		||||
            time to wait until the next attempt."""
 | 
			
		||||
            for attempt in range(self._num_tries):
 | 
			
		||||
                try:
 | 
			
		||||
                    return urllib.request.urlopen(self._request)
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    if isinstance(e, urllib.error.HTTPError) and e.code < 500:
 | 
			
		||||
                        raise
 | 
			
		||||
                    if attempt >= self._num_tries - 1:
 | 
			
		||||
                        raise
 | 
			
		||||
                    sys.stderr.write(
 | 
			
		||||
                        "WARNING: failed to open URL '{}': {}\n"
 | 
			
		||||
                        .format(self._request.full_url, str(e))
 | 
			
		||||
                    )
 | 
			
		||||
                else:
 | 
			
		||||
                    break
 | 
			
		||||
 | 
			
		||||
                sleep_interval = self._base_interval * 2**attempt
 | 
			
		||||
                sys.stderr.write(
 | 
			
		||||
                    "Retrying HTTP request in {} seconds...\n"
 | 
			
		||||
                    .format(sleep_interval)
 | 
			
		||||
                )
 | 
			
		||||
                time.sleep(sleep_interval)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def __init__(self, do_retry=True, base_interval=2, num_tries=8):
 | 
			
		||||
        """
 | 
			
		||||
        :param do_retry:
 | 
			
		||||
            Whether to enable the retry logic.
 | 
			
		||||
        :param base_interval:
 | 
			
		||||
            The initial interval to sleep after a failed attempt.
 | 
			
		||||
        :param num_tries:
 | 
			
		||||
            How many attempts to make.
 | 
			
		||||
        """
 | 
			
		||||
        self._do_retry      = do_retry
 | 
			
		||||
        self._base_interval = base_interval
 | 
			
		||||
        self._num_tries     = num_tries if do_retry else 1
 | 
			
		||||
 | 
			
		||||
    def get(self, url, headers=None):
 | 
			
		||||
        """Create a GET request that can be used to retrieve the resource
 | 
			
		||||
        at the given URL.
 | 
			
		||||
 | 
			
		||||
        :param url:
 | 
			
		||||
            An HTTP URL.
 | 
			
		||||
        :param headers:
 | 
			
		||||
            A dictionary of extra headers to send along.
 | 
			
		||||
        :return:
 | 
			
		||||
            An ExpBackoffHTTPClient.Request instance.
 | 
			
		||||
        """
 | 
			
		||||
        return self._prepare_request(url, headers=headers)
 | 
			
		||||
 | 
			
		||||
    def post(self, url, data=None, json=None, headers=None):
 | 
			
		||||
        """Create a POST request that can be used to submit data to the
 | 
			
		||||
        endpoint at the given URL."""
 | 
			
		||||
        return self._prepare_request(
 | 
			
		||||
            url, data=data, json_data=json, headers=headers
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _prepare_request(self, url, data=None, json_data=None, headers=None):
 | 
			
		||||
        """Prepare a Request instance that can be used to retrieve data from
 | 
			
		||||
        and/or send data to the endpoint at the given URL.
 | 
			
		||||
 | 
			
		||||
        :param url:
 | 
			
		||||
            An HTTP URL.
 | 
			
		||||
        :param data:
 | 
			
		||||
            Raw binary data to send along in the request body.
 | 
			
		||||
        :param json_data:
 | 
			
		||||
            A Python data structure to be serialized and sent out in JSON
 | 
			
		||||
            format.
 | 
			
		||||
        :param headers:
 | 
			
		||||
            A dictionary of extra headers to send along.
 | 
			
		||||
        :return:
 | 
			
		||||
            An ExpBackoffHTTPClient.Request instance.
 | 
			
		||||
        """
 | 
			
		||||
        if data is not None and json_data is not None:
 | 
			
		||||
            raise ValueError(
 | 
			
		||||
                "Parameters 'data' and 'json_data' are mutually exclusive."
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if json_data:
 | 
			
		||||
            data = json.dumps(json_data, ensure_ascii=False)
 | 
			
		||||
            if headers is None:
 | 
			
		||||
                headers = {}
 | 
			
		||||
            headers["Content-Type"] = "application/json"
 | 
			
		||||
        if isinstance(data, str):
 | 
			
		||||
            data = data.encode("utf-8")
 | 
			
		||||
 | 
			
		||||
        return ExpBackoffHTTPClient.Request(
 | 
			
		||||
            urllib.request.Request(url, data=data, headers=headers or {}),
 | 
			
		||||
            self._do_retry,
 | 
			
		||||
            self._base_interval,
 | 
			
		||||
            self._num_tries
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Snap:
 | 
			
		||||
    """This class provides methods to retrieve information about a snap and
 | 
			
		||||
    download it together with its assertions."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, name, channel="stable", arch="amd64", series=16,
 | 
			
		||||
            cohort_key=None, assertion_url="https://assertions.ubuntu.com",
 | 
			
		||||
            snapcraft_url="https://api.snapcraft.io", **kwargs):
 | 
			
		||||
        """
 | 
			
		||||
        :param name:
 | 
			
		||||
            The name of the snap.
 | 
			
		||||
        :param channel:
 | 
			
		||||
            The channel to operate on.
 | 
			
		||||
        :param arch:
 | 
			
		||||
            The Debian architecture of the snap (e.g. amd64, armhf, arm64, ...).
 | 
			
		||||
        :param series:
 | 
			
		||||
            The device series. This should always be 16.
 | 
			
		||||
        :param cohort_key:
 | 
			
		||||
            A cohort key to access a snapshot of the channel map.
 | 
			
		||||
        """
 | 
			
		||||
        self._name          = name
 | 
			
		||||
        self._channel       = channel
 | 
			
		||||
        self._arch          = arch
 | 
			
		||||
        self._series        = series
 | 
			
		||||
        self._cohort_key    = cohort_key
 | 
			
		||||
        self._assertion_url = assertion_url
 | 
			
		||||
        self._snapcraft_url = snapcraft_url
 | 
			
		||||
        self._details       = None
 | 
			
		||||
        self._assertions    = {}
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def cohort_create(cls):
 | 
			
		||||
        """Get a cohort key for the current moment. A cohort key is valid
 | 
			
		||||
        across all snaps, channels and architectures."""
 | 
			
		||||
        return Snap("core")\
 | 
			
		||||
            .get_details(cohort_create=True)\
 | 
			
		||||
            .get("cohort-key")
 | 
			
		||||
 | 
			
		||||
    def download(self, download_assertions=True):
 | 
			
		||||
        """Download the snap container. If download_assertions is True, the
 | 
			
		||||
        corresponding assertions will be downloaded, as well."""
 | 
			
		||||
        snap = self.get_details()
 | 
			
		||||
 | 
			
		||||
        snap_name         = snap["name"]
 | 
			
		||||
        snap_revision     = snap["revision"]
 | 
			
		||||
        publisher_id      = snap["publisher"]["id"]
 | 
			
		||||
        snap_download_url = snap["download"]["url"]
 | 
			
		||||
        snap_byte_size    = snap["download"]["size"]
 | 
			
		||||
        filename          = snap_name + "_" + str(snap_revision)
 | 
			
		||||
        snap_filename     = filename + ".snap"
 | 
			
		||||
        assert_filename   = filename + ".assert"
 | 
			
		||||
 | 
			
		||||
        skip_snap_download = False
 | 
			
		||||
 | 
			
		||||
        if os.path.exists(snap_filename) and os.path.getsize(snap_filename) \
 | 
			
		||||
                == snap_byte_size:
 | 
			
		||||
            skip_snap_download = True
 | 
			
		||||
 | 
			
		||||
        headers = {}
 | 
			
		||||
 | 
			
		||||
        if os.environ.get("SNAPPY_STORE_NO_CDN", "0") != "0":
 | 
			
		||||
            headers.update({
 | 
			
		||||
                "X-Ubuntu-No-Cdn": "true",
 | 
			
		||||
                "Snap-CDN": "none",
 | 
			
		||||
            })
 | 
			
		||||
 | 
			
		||||
        if not skip_snap_download:
 | 
			
		||||
            http_client = ExpBackoffHTTPClient()
 | 
			
		||||
            response = http_client.get(snap_download_url, headers=headers)
 | 
			
		||||
            with response, open(snap_filename, "wb+") as fp:
 | 
			
		||||
                shutil.copyfileobj(response, fp)
 | 
			
		||||
 | 
			
		||||
        if os.path.getsize(snap_filename) != snap_byte_size:
 | 
			
		||||
            raise SnapError(
 | 
			
		||||
                "The downloaded snap does not have the expected size."
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if not download_assertions:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        required_assertions = [
 | 
			
		||||
            "account-key",
 | 
			
		||||
            "account",
 | 
			
		||||
            "snap-declaration",
 | 
			
		||||
            "snap-revision",
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        if publisher_id == "canonical":
 | 
			
		||||
            required_assertions.remove("account")
 | 
			
		||||
 | 
			
		||||
        for assertion_name in required_assertions:
 | 
			
		||||
            attr_name = "get_assertion_" + assertion_name.replace("-", "_")
 | 
			
		||||
            # This will populate self._assertions[<assertion_name>].
 | 
			
		||||
            getattr(self, attr_name)()
 | 
			
		||||
 | 
			
		||||
        with open(assert_filename, "w+", encoding="utf-8") as fp:
 | 
			
		||||
            fp.write("\n".join(self._assertions[a] for a in
 | 
			
		||||
                required_assertions))
 | 
			
		||||
 | 
			
		||||
    def get_details(self, cohort_create=False):
 | 
			
		||||
        """Get details about the snap. On subsequent calls, the cached results
 | 
			
		||||
        are returned. If cohort_create is set to True, a cohort key will be
 | 
			
		||||
        created and included in the result."""
 | 
			
		||||
        if self._details and not cohort_create:
 | 
			
		||||
            return self._details
 | 
			
		||||
 | 
			
		||||
        if self.is_core_snap() and self._channel.startswith("stable/ubuntu-"):
 | 
			
		||||
            sys.stderr.write(
 | 
			
		||||
                "WARNING: switching channel from '{}' to 'stable' for '{}' "
 | 
			
		||||
                "snap.\n".format(self._channel, self._name)
 | 
			
		||||
            )
 | 
			
		||||
            self._channel = "stable"
 | 
			
		||||
 | 
			
		||||
        path = "/v2/snaps/refresh"
 | 
			
		||||
 | 
			
		||||
        data = {
 | 
			
		||||
            "context": [],
 | 
			
		||||
            "actions": [
 | 
			
		||||
                {
 | 
			
		||||
                    "action":       "download",
 | 
			
		||||
                    "instance-key": "0",
 | 
			
		||||
                    "name":         self._name,
 | 
			
		||||
                    "channel":      self._channel,
 | 
			
		||||
                }
 | 
			
		||||
            ],
 | 
			
		||||
            "fields": [
 | 
			
		||||
                "base",
 | 
			
		||||
                "created-at",
 | 
			
		||||
                "download",
 | 
			
		||||
                "license",
 | 
			
		||||
                "name",
 | 
			
		||||
                "prices",
 | 
			
		||||
                "publisher",
 | 
			
		||||
                "revision",
 | 
			
		||||
                "snap-id",
 | 
			
		||||
                "summary",
 | 
			
		||||
                "title",
 | 
			
		||||
                "type",
 | 
			
		||||
                "version",
 | 
			
		||||
            ],
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        # These are mutually exclusive.
 | 
			
		||||
        if cohort_create:
 | 
			
		||||
            data["actions"][0]["cohort-create"] = True
 | 
			
		||||
        elif self._cohort_key:
 | 
			
		||||
            data["actions"][0]["cohort-key"] = self._cohort_key
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            response_dict = self._do_snapcraft_request(path, json_data=data)
 | 
			
		||||
        except SnapCraftError as e:
 | 
			
		||||
            raise SnapError("failed to get details for '{}': {}"
 | 
			
		||||
                    .format(self._name, str(e)))
 | 
			
		||||
 | 
			
		||||
        snap_data = response_dict["results"][0]
 | 
			
		||||
 | 
			
		||||
        if snap_data.get("result") == "error":
 | 
			
		||||
            raise SnapError(
 | 
			
		||||
                "failed to get details for '{}': {}"
 | 
			
		||||
                .format(self._name, snap_data.get("error", {}).get("message"))
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Have "base" initialized to something meaningful.
 | 
			
		||||
        if self.is_core_snap():
 | 
			
		||||
            snap_data["snap"]["base"] = ""
 | 
			
		||||
        elif snap_data["snap"].get("base") is None:
 | 
			
		||||
            snap_data["snap"]["base"] = "core"
 | 
			
		||||
 | 
			
		||||
        # Copy the key into the snap details.
 | 
			
		||||
        if "cohort-key" in snap_data:
 | 
			
		||||
            snap_data["snap"]["cohort-key"] = snap_data["cohort-key"]
 | 
			
		||||
 | 
			
		||||
        if "error" in snap_data:
 | 
			
		||||
            raise SnapError(
 | 
			
		||||
                "failed to get details for '{}' in '{}' on '{}': {}"
 | 
			
		||||
                    .format(self._name, self._channel, self._arch,
 | 
			
		||||
                        snap_data["error"]["message"])
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        self._details = snap_data["snap"]
 | 
			
		||||
        return self._details
 | 
			
		||||
 | 
			
		||||
    def get_assertion_snap_revision(self):
 | 
			
		||||
        """Download the snap-revision assertion associated with this snap. The
 | 
			
		||||
        assertion is returned as a string."""
 | 
			
		||||
        if "snap-revision" in self._assertions:
 | 
			
		||||
            return self._assertions["snap-revision"]
 | 
			
		||||
        snap = self.get_details()
 | 
			
		||||
 | 
			
		||||
        snap_sha3_384 = base64.urlsafe_b64encode(
 | 
			
		||||
            binascii.a2b_hex(snap["download"]["sha3-384"])
 | 
			
		||||
        ).decode("us-ascii")
 | 
			
		||||
 | 
			
		||||
        data = self._do_assertion_request("/v1/assertions/snap-revision/{}"
 | 
			
		||||
                .format(snap_sha3_384))
 | 
			
		||||
        self._assertions["snap-revision"] = data
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def get_assertion_snap_declaration(self):
 | 
			
		||||
        """Download the snap-declaration assertion associated with this snap.
 | 
			
		||||
        The assertion is returned as a string."""
 | 
			
		||||
        if "snap-declaration" in self._assertions:
 | 
			
		||||
            return self._assertions["snap-declaration"]
 | 
			
		||||
        snap = self.get_details()
 | 
			
		||||
        series = self._series
 | 
			
		||||
        snap_id = snap["snap-id"]
 | 
			
		||||
 | 
			
		||||
        data = self._do_assertion_request(
 | 
			
		||||
                "/v1/assertions/snap-declaration/{}/{}"
 | 
			
		||||
                    .format(series, snap_id))
 | 
			
		||||
 | 
			
		||||
        self._assertions["snap-declaration"] = data
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def get_assertion_account(self):
 | 
			
		||||
        """Download the account assertion associated with this snap. The
 | 
			
		||||
        assertion is returned as a string."""
 | 
			
		||||
        if "account" in self._assertions:
 | 
			
		||||
            return self._assertions["account"]
 | 
			
		||||
        snap = self.get_details()
 | 
			
		||||
        publisher_id = snap["publisher"]["id"]
 | 
			
		||||
        data = self._do_assertion_request("/v1/assertions/account/{}"
 | 
			
		||||
                .format(publisher_id))
 | 
			
		||||
        self._assertions["account"] = data
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def get_assertion_account_key(self):
 | 
			
		||||
        """Download the account-key assertion associated with this snap. The
 | 
			
		||||
        assertion will be returned as a string."""
 | 
			
		||||
        if "account-key" in self._assertions:
 | 
			
		||||
            return self._assertions["account-key"]
 | 
			
		||||
 | 
			
		||||
        declaration_data = self.get_assertion_snap_declaration()
 | 
			
		||||
        sign_key_sha3 = None
 | 
			
		||||
 | 
			
		||||
        for line in declaration_data.splitlines():
 | 
			
		||||
            if line.startswith("sign-key-sha3-384:"):
 | 
			
		||||
                sign_key_sha3 = line.split(":")[1].strip()
 | 
			
		||||
 | 
			
		||||
        data = self._do_assertion_request("/v1/assertions/account-key/{}"
 | 
			
		||||
                .format(sign_key_sha3))
 | 
			
		||||
 | 
			
		||||
        self._assertions["account-key"] = data
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def is_core_snap(self):
 | 
			
		||||
        return re.match(r"^core\d*$", self._name) != None
 | 
			
		||||
 | 
			
		||||
    def _do_assertion_request(self, path):
 | 
			
		||||
        url = self._assertion_url + path
 | 
			
		||||
 | 
			
		||||
        headers = {
 | 
			
		||||
            "Accept": "application/x.ubuntu.assertion",
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        http_client = ExpBackoffHTTPClient()
 | 
			
		||||
        try:
 | 
			
		||||
            with http_client.get(url, headers=headers) as response:
 | 
			
		||||
                return response.text()
 | 
			
		||||
        except urllib.error.HTTPError as e:
 | 
			
		||||
            raise SnapAssertionError(str(e))
 | 
			
		||||
 | 
			
		||||
    def _do_snapcraft_request(self, path, json_data=None):
 | 
			
		||||
        url = self._snapcraft_url + "/" + path
 | 
			
		||||
 | 
			
		||||
        headers = {
 | 
			
		||||
            "Snap-Device-Series": str(self._series),
 | 
			
		||||
            "Snap-Device-Architecture": self._arch,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        http_client = ExpBackoffHTTPClient()
 | 
			
		||||
        try:
 | 
			
		||||
            response = http_client.post(url, json=json_data, headers=headers)
 | 
			
		||||
            with response:
 | 
			
		||||
                return response.json()
 | 
			
		||||
        except urllib.error.HTTPError as e:
 | 
			
		||||
            raise SnapCraftError(str(e))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SnapCli:
 | 
			
		||||
 | 
			
		||||
    def __call__(self, args):
 | 
			
		||||
        """Parse the command line arguments and execute the selected command."""
 | 
			
		||||
        options = self._parse_opts(args)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            options.func(getattr(options, "snap", None), **vars(options))
 | 
			
		||||
        except SnapError as e:
 | 
			
		||||
            sys.stderr.write("snap-tool {}: {}\n".format(
 | 
			
		||||
                options.command, str(e)))
 | 
			
		||||
            return EXIT_ERR
 | 
			
		||||
        return EXIT_OK
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_host_deb_arch():
 | 
			
		||||
        result = subprocess.run(["dpkg", "--print-architecture"],
 | 
			
		||||
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 | 
			
		||||
                universal_newlines=True, check=True)
 | 
			
		||||
 | 
			
		||||
        return result.stdout.strip()
 | 
			
		||||
 | 
			
		||||
    def _parse_opts(self, args):
 | 
			
		||||
        main_parser = argparse.ArgumentParser()
 | 
			
		||||
        subparsers = main_parser.add_subparsers(dest="command")
 | 
			
		||||
 | 
			
		||||
        parser_cohort_create = subparsers.add_parser("cohort-create",
 | 
			
		||||
                help="Create a cohort key for the snap store channel map.")
 | 
			
		||||
        parser_cohort_create.set_defaults(func=self._cohort_create)
 | 
			
		||||
 | 
			
		||||
        parser_download = subparsers.add_parser("download",
 | 
			
		||||
                help="Download a snap from the store.")
 | 
			
		||||
        parser_download.set_defaults(func=self._download)
 | 
			
		||||
 | 
			
		||||
        parser_info = subparsers.add_parser("info",
 | 
			
		||||
                help="Retrieve information about a snap.")
 | 
			
		||||
        parser_info.set_defaults(func=self._info)
 | 
			
		||||
 | 
			
		||||
        # Add common parameters.
 | 
			
		||||
        for parser in [parser_download, parser_info]:
 | 
			
		||||
            parser.add_argument("--cohort-key", dest="cohort_key",
 | 
			
		||||
                help="A cohort key to pin the channel map to.", type=str)
 | 
			
		||||
            parser.add_argument("--channel", dest="channel",
 | 
			
		||||
                help="The publication channel to query (default: stable).",
 | 
			
		||||
                type=str, default="stable")
 | 
			
		||||
            parser.add_argument("--series", dest="series",
 | 
			
		||||
                help="The device series (default: 16)",
 | 
			
		||||
                type=int, default=16)
 | 
			
		||||
            parser.add_argument("--arch", dest="arch",
 | 
			
		||||
                help="The Debian architecture (default: amd64).",
 | 
			
		||||
                type=str, default=self._get_host_deb_arch())
 | 
			
		||||
            parser.add_argument("snap", help="The name of the snap.")
 | 
			
		||||
 | 
			
		||||
        if not args:
 | 
			
		||||
            main_parser.print_help()
 | 
			
		||||
            sys.exit(EXIT_ERR)
 | 
			
		||||
 | 
			
		||||
        return main_parser.parse_args(args)
 | 
			
		||||
 | 
			
		||||
    def _cohort_create(self, _, **kwargs):
 | 
			
		||||
        print(Snap.cohort_create())
 | 
			
		||||
 | 
			
		||||
    def _download(self, snap_name, **kwargs):
 | 
			
		||||
        Snap(snap_name, **kwargs).download()
 | 
			
		||||
 | 
			
		||||
    def _info(self, snap_name, **kwargs):
 | 
			
		||||
        snap = Snap(snap_name, **kwargs)
 | 
			
		||||
        info = snap.get_details()
 | 
			
		||||
 | 
			
		||||
        print(dedent("""\
 | 
			
		||||
            name:      {}
 | 
			
		||||
            summary:   {}
 | 
			
		||||
            arch:      {}
 | 
			
		||||
            base:      {}
 | 
			
		||||
            channel:   {}
 | 
			
		||||
            publisher: {}
 | 
			
		||||
            license:   {}
 | 
			
		||||
            snap-id:   {}
 | 
			
		||||
            revision:  {}"""
 | 
			
		||||
            .format(
 | 
			
		||||
                snap_name,
 | 
			
		||||
                info.get("summary", ""),
 | 
			
		||||
                snap._arch,
 | 
			
		||||
                info.get("base"),
 | 
			
		||||
                snap._channel,
 | 
			
		||||
                info.get("publisher", {}).get("display-name", ""),
 | 
			
		||||
                info.get("license", ""),
 | 
			
		||||
                info.get("snap-id", ""),
 | 
			
		||||
                info.get("revision", "")
 | 
			
		||||
            ))
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    try:
 | 
			
		||||
        rval = SnapCli()(sys.argv[1:])
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        sys.stderr.write("snap-tool: caught keyboard interrupt, exiting.\n")
 | 
			
		||||
        sys.exit(EXIT_ERR)
 | 
			
		||||
    sys.exit(rval)
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user