Backport improvements to snap seeding from trunk.

Bug-Ubuntu: https://bugs.launchpad.net/bugs/1831675

Signed-off-by: Steve Langasek <steve.langasek@canonical.com>
sil2100/backport-datasource-raspi3
Tobias Koch 6 years ago committed by Steve Langasek
parent f85e411e6e
commit 31e4f4fe02

12
debian/changelog vendored

@ -1,3 +1,15 @@
livecd-rootfs (2.525.27) UNRELEASED; urgency=medium
* Backport improvements to snap seeding from trunk. LP: #1831675.
[ Tobias Koch ]
* Replace "snap download" with tool that uses snap store's coherence
feature.
* Detect which core snaps are required and install them on-the-fly.
* If image has core18 snaps only, automatically preseed snapd.
-- Steve Langasek <steve.langasek@ubuntu.com> Wed, 5 Jun 2019 16:35:15 +0700
livecd-rootfs (2.525.26) bionic; urgency=medium
* Strip translation files out of the minimal images, another thing that

1
debian/install vendored

@ -3,3 +3,4 @@ get-ppa-fingerprint usr/share/livecd-rootfs
minimize-manual usr/share/livecd-rootfs
magic-proxy usr/share/livecd-rootfs
lp-in-release usr/share/livecd-rootfs
snap-tool usr/share/livecd-rootfs

@ -378,6 +378,47 @@ release_ver() {
distro-info --series="$LB_DISTRIBUTION" -r | awk '{ print $1 }'
}
_snap_post_process() {
# Look for the 'core' snap. If it is not present, assume that the image
# contains only snaps with bases >= core18. In that case snapd is
# preseeded. However, when 'core' is being installed and snapd has not
# been installed by a call to 'snap_preseed' (see below) then it is
# removed again.
local CHROOT_ROOT=$1
local SNAP_NAME=$2
local seed_dir="$CHROOT_ROOT/var/lib/snapd/seed"
local snaps_dir="$seed_dir/snaps"
local seed_yaml="$seed_dir/seed.yaml"
local assertions_dir="$seed_dir/assertions"
local snapd_install_stamp="$seed_dir/.snapd-explicit-install-stamp"
case $SNAP_NAME in
core[0-9]*)
# If the 'core' snap is not present, assume we are coreXX-only and
# install the snapd snap.
if [ ! -f ${snaps_dir}/core_[0-9]*.snap ]; then
_snap_preseed $CHROOT_ROOT snapd stable
fi
;;
core)
# If the snapd snap has been seeded, but not marked as explicitly
# installed (see snap_preseed below), then remove it.
if [ -f ${snaps_dir}/snapd_[0-9]*.snap ] && \
[ ! -f "$snapd_install_stamp" ]
then
# Remove snap, assertions and entry in seed.yaml
rm -f ${snaps_dir}/snapd_[0-9]*.snap
rm -f ${assertions_dir}/snapd_[0-9]*.assert
sed -i -e'N;/name: snapd/,+2d' $seed_yaml
fi
;;
*)
# ignore
;;
esac
}
_snap_preseed() {
# Download the snap/assertion and add to the preseed
local CHROOT_ROOT=$1
@ -392,11 +433,38 @@ _snap_preseed() {
# Download the snap & assertion
local snap_download_failed=0
chroot $CHROOT_ROOT sh -c "
# Preseed a snap only once
if [ -f ${snaps_dir}/${SNAP_NAME}_[0-9]*.snap ]; then
return
fi
case $SNAP_NAME in
snapd)
# snapd is self-contained, ignore base
;;
*)
# Determine if and what core snap is needed
local core_snap=$(/usr/share/livecd-rootfs/snap-tool info \
--cohort-key="${COHORT_KEY:-}" \
--channel="$CHANNEL" "$SNAP_NAME" | \
grep '^base:' | awk '{print $2}'
)
# If $core_snap is not the empty string then SNAP itself is not a core
# snap and we must additionally seed the core snap.
if [ -n "$core_snap" ]; then
_snap_preseed $CHROOT_ROOT $core_snap stable
fi
;;
esac
sh -c "
set -x;
cd /var/lib/snapd/seed;
SNAPPY_STORE_NO_CDN=1 snap download \
--channel=$CHANNEL \"$SNAP_NAME\"" || snap_download_failed=1
cd \"$CHROOT_ROOT/var/lib/snapd/seed\";
SNAPPY_STORE_NO_CDN=1 /usr/share/livecd-rootfs/snap-tool download \
--cohort-key=\"${COHORT_KEY:-}\" \
--channel=\"$CHANNEL\" \"$SNAP_NAME\"" || snap_download_failed=1
if [ $snap_download_failed = 1 ] ; then
echo "If the channel ($CHANNEL) includes '*/ubuntu-##.##' track per "
echo "Ubuntu policy (ex. stable/ubuntu-18.04) the publisher will need "
@ -421,6 +489,12 @@ EOF
echo -n " file: " >> $seed_yaml
(cd $snaps_dir; ls -1 ${SNAP_NAME}_*.snap) >> $seed_yaml
# If $core_snap is the empty string then SNAP itself *may be* a core snap,
# and we run some post-processing logic.
if [ -z "$core_snap" ]; then
_snap_post_process $CHROOT_ROOT $SNAP_NAME
fi
}
snap_prepare_assertions() {
@ -476,21 +550,34 @@ snap_prepare() {
local snaps_dir="$seed_dir/snaps"
snap_prepare_assertions "$CHROOT_ROOT" "$CUSTOM_BRAND_MODEL"
# Download the core snap
if ! [ -f $snaps_dir/core_[0-9]*.snap ] ; then
_snap_preseed $CHROOT_ROOT core stable
fi
}
snap_preseed() {
# Preseed a snap in the image
local CHROOT_ROOT=$1
local SNAP=$2
local SNAP_NAME=${SNAP%/*}
# Per Ubuntu policy, all seeded snaps (with the exception of the core
# snap) must pull from stable/ubuntu-$(release_ver) as their channel.
local CHANNEL=${3:-"stable/ubuntu-$(release_ver)"}
snap_prepare $CHROOT_ROOT
_snap_preseed $CHROOT_ROOT $SNAP $CHANNEL
# Mark this image as having snapd installed explicitly.
case $SNAP_NAME in
snapd)
touch "$CHROOT_ROOT/var/lib/snapd/seed/.snapd-explicit-install-stamp"
;;
esac
}
is_live_layer () {
local pass=$1
for livepass in $LIVE_PASSES; do
[ "$livepass" != "$pass" ] && continue
return 0
done
return 1
}

@ -0,0 +1,433 @@
#!/usr/bin/python3
#-*- encoding: utf-8 -*-
"""
This script can be used instead of the traditional `snap` command to download
snaps and accompanying assertions. It uses the new store API (v2) which allows
creating temporary snapshots of the channel map.
To create such a snapshot run
snap-tool cohort-create
This will print a "cohort-key" to stdout, which can then be passed to future
invocations of `snap-tool download`. Whenever a cohort key is provided, the
store will provide a view of the channel map as it existed when the key was
created.
"""
from textwrap import dedent
import argparse
import base64
import binascii
import getopt
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import urllib.error
import urllib.request
EXIT_OK = 0
EXIT_ERR = 1
class SnapError(Exception):
"""Generic error thrown by the Snap class."""
pass
class SnapCraftError(SnapError):
"""Error thrown on problems with the snapcraft APIs."""
pass
class SnapAssertionError(SnapError):
"""Error thrown on problems with the assertions API."""
pass
class Snap:
"""This class provides methods to retrieve information about a snap and
download it together with its assertions."""
def __init__(self, name, channel="stable", arch="amd64", series=16,
cohort_key=None, assertion_url="https://assertions.ubuntu.com",
snapcraft_url="https://api.snapcraft.io", **kwargs):
"""
:param name:
The name of the snap.
:param channel:
The channel to operate on.
:param arch:
The Debian architecture of the snap (e.g. amd64, armhf, arm64, ...).
:param series:
The device series. This should always be 16.
:param cohort_key:
A cohort key to access a snapshot of the channel map.
"""
self._name = name
self._channel = channel
self._arch = arch
self._series = series
self._cohort_key = cohort_key
self._assertion_url = assertion_url
self._snapcraft_url = snapcraft_url
self._details = None
self._assertions = {}
@classmethod
def cohort_create(cls):
"""Get a cohort key for the current moment. A cohort key is valid
across all snaps, channels and architectures."""
return Snap("core")\
.get_details(cohort_create=True)\
.get("cohort-key")
def download(self, download_assertions=True):
"""Download the snap container. If download_assertions is True, the
corresponding assertions will be downloaded, as well."""
snap = self.get_details()
snap_name = snap["name"]
snap_revision = snap["revision"]
publisher_id = snap["publisher"]["id"]
snap_download_url = snap["download"]["url"]
snap_byte_size = snap["download"]["size"]
filename = snap_name + "_" + str(snap_revision)
snap_filename = filename + ".snap"
assert_filename = filename + ".assert"
skip_snap_download = False
if os.path.exists(snap_filename) and os.path.getsize(snap_filename) \
== snap_byte_size:
skip_snap_download = True
headers = {}
if os.environ.get("SNAPPY_STORE_NO_CDN", "0") != "0":
headers.update({
"X-Ubuntu-No-Cdn": "true",
"Snap-CDN": "none",
})
request = urllib.request.Request(snap_download_url, headers=headers)
if not skip_snap_download:
with urllib.request.urlopen(request) as response, \
open(snap_filename, "wb+") as fp:
shutil.copyfileobj(response, fp)
if not download_assertions:
return
required_assertions = [
"account-key",
"account",
"snap-declaration",
"snap-revision",
]
if publisher_id == "canonical":
required_assertions.remove("account")
for assertion_name in required_assertions:
attr_name = "get_assertion_" + assertion_name.replace("-", "_")
# This will populate self._assertions[<assertion_name>].
getattr(self, attr_name)()
with open(assert_filename, "w+", encoding="utf-8") as fp:
fp.write("\n".join(self._assertions[a] for a in
required_assertions))
def get_details(self, cohort_create=False):
"""Get details about the snap. On subsequent calls, the cached results
are returned. If cohort_create is set to True, a cohort key will be
created and included in the result."""
if self._details and not cohort_create:
return self._details
if self.is_core_snap() and self._channel.startswith("stable/ubuntu-"):
sys.stderr.write(
"WARNING: switching channel from '{}' to 'stable' for '{}' "
"snap.\n".format(self._channel, self._name)
)
self._channel = "stable"
path = "/v2/snaps/refresh"
data = {
"context": [],
"actions": [
{
"action": "download",
"instance-key": "0",
"name": self._name,
"channel": self._channel,
}
],
"fields": [
"base",
"created-at",
"download",
"license",
"name",
"prices",
"publisher",
"revision",
"snap-id",
"summary",
"title",
"type",
"version",
],
}
# These are mutually exclusive.
if cohort_create:
data["actions"][0]["cohort-create"] = True
elif self._cohort_key:
data["actions"][0]["cohort-key"] = self._cohort_key
request_json = json.dumps(data, ensure_ascii=False).encode("utf-8")
try:
response_dict = self._do_snapcraft_request(path, data=request_json)
except SnapCraftError as e:
raise SnapError("failed to get details for '{}': {}"
.format(self._name, str(e)))
snap_data = response_dict["results"][0]
# Have "base" initialized to something meaningful.
if self.is_core_snap():
snap_data["snap"]["base"] = ""
elif snap_data["snap"].get("base") is None:
snap_data["snap"]["base"] = "core"
# Copy the key into the snap details.
if "cohort-key" in snap_data:
snap_data["snap"]["cohort-key"] = snap_data["cohort-key"]
if "error" in snap_data:
raise SnapError(
"failed to get details for '{}' in '{}' on '{}': {}"
.format(self._name, self._channel, self._arch,
snap_data["error"]["message"])
)
self._details = snap_data["snap"]
return self._details
def get_assertion_snap_revision(self):
"""Download the snap-revision assertion associated with this snap. The
assertion is returned as a string."""
if "snap-revision" in self._assertions:
return self._assertions["snap-revision"]
snap = self.get_details()
snap_sha3_384 = base64.urlsafe_b64encode(
binascii.a2b_hex(snap["download"]["sha3-384"])
).decode("us-ascii")
data = self._do_assertion_request("/v1/assertions/snap-revision/{}"
.format(snap_sha3_384))
self._assertions["snap-revision"] = data
return data
def get_assertion_snap_declaration(self):
"""Download the snap-declaration assertion associated with this snap.
The assertion is returned as a string."""
if "snap-declaration" in self._assertions:
return self._assertions["snap-declaration"]
snap = self.get_details()
series = self._series
snap_id = snap["snap-id"]
data = self._do_assertion_request(
"/v1/assertions/snap-declaration/{}/{}"
.format(series, snap_id))
self._assertions["snap-declaration"] = data
return data
def get_assertion_account(self):
"""Download the account assertion associated with this snap. The
assertion is returned as a string."""
if "account" in self._assertions:
return self._assertions["account"]
snap = self.get_details()
publisher_id = snap["publisher"]["id"]
data = self._do_assertion_request("/v1/assertions/account/{}"
.format(publisher_id))
self._assertions["account"] = data
return data
def get_assertion_account_key(self):
"""Download the account-key assertion associated with this snap. The
assertion will be returned as a string."""
if "account-key" in self._assertions:
return self._assertions["account-key"]
declaration_data = self.get_assertion_snap_declaration()
sign_key_sha3 = None
for line in declaration_data.splitlines():
if line.startswith("sign-key-sha3-384:"):
sign_key_sha3 = line.split(":")[1].strip()
data = self._do_assertion_request("/v1/assertions/account-key/{}"
.format(sign_key_sha3))
self._assertions["account-key"] = data
return data
def is_core_snap(self):
return re.match(r"^core\d*$", self._name) != None
def _do_assertion_request(self, path):
url = self._assertion_url + path
headers = {
"Accept": "application/x.ubuntu.assertion",
}
request = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(request) as response:
body = response.read()
except urllib.error.HTTPError as e:
raise SnapAssertionError(str(e))
return body.decode("utf-8")
def _do_snapcraft_request(self, path, data=None):
url = self._snapcraft_url + "/" + path
headers = {
"Snap-Device-Series": str(self._series),
"Snap-Device-Architecture": self._arch,
"Content-Type": "application/json",
}
request = urllib.request.Request(url, data=data, headers=headers)
try:
with urllib.request.urlopen(request) as response:
body = response.read()
except urllib.error.HTTPError as e:
raise SnapCraftError(str(e))
try:
response_data = json.loads(body, encoding="utf-8")
except json.JSONDecodeError as e:
raise SnapCraftError("failed to decode response body: " + str(e))
return response_data
class SnapCli:
def __call__(self, args):
"""Parse the command line arguments and execute the selected command."""
options = self._parse_opts(args)
try:
options.func(getattr(options, "snap", None), **vars(options))
except SnapError as e:
sys.stderr.write("snap-tool {}: {}\n".format(
options.command, str(e)))
return EXIT_ERR
return EXIT_OK
@staticmethod
def _get_host_deb_arch():
result = subprocess.run(["dpkg", "--print-architecture"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, check=True)
return result.stdout.strip()
def _parse_opts(self, args):
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(dest="command")
parser_cohort_create = subparsers.add_parser("cohort-create",
help="Create a cohort key for the snap store channel map.")
parser_cohort_create.set_defaults(func=self._cohort_create)
parser_download = subparsers.add_parser("download",
help="Download a snap from the store.")
parser_download.set_defaults(func=self._download)
parser_info = subparsers.add_parser("info",
help="Retrieve information about a snap.")
parser_info.set_defaults(func=self._info)
# Add common parameters.
for parser in [parser_download, parser_info]:
parser.add_argument("--cohort-key", dest="cohort_key",
help="A cohort key to pin the channel map to.", type=str)
parser.add_argument("--channel", dest="channel",
help="The publication channel to query (default: stable).",
type=str, default="stable")
parser.add_argument("--series", dest="series",
help="The device series (default: 16)",
type=int, default=16)
parser.add_argument("--arch", dest="arch",
help="The Debian architecture (default: amd64).",
type=str, default=self._get_host_deb_arch())
parser.add_argument("snap", help="The name of the snap.")
if not args:
main_parser.print_help()
sys.exit(EXIT_ERR)
return main_parser.parse_args(args)
def _cohort_create(self, _, **kwargs):
print(Snap.cohort_create())
def _download(self, snap_name, **kwargs):
Snap(snap_name, **kwargs).download()
def _info(self, snap_name, **kwargs):
snap = Snap(snap_name, **kwargs)
info = snap.get_details()
print(dedent("""\
name: {}
summary: {}
arch: {}
base: {}
channel: {}
publisher: {}
license: {}
snap-id: {}
revision: {}"""
.format(
snap_name,
info.get("summary", ""),
snap._arch,
info.get("base"),
snap._channel,
info.get("publisher", {}).get("display-name", ""),
info.get("license", ""),
info.get("snap-id", ""),
info.get("revision", "")
))
)
if __name__ == "__main__":
try:
rval = SnapCli()(sys.argv[1:])
except KeyboardInterrupt:
sys.stderr.write("snap-tool: caught keyboard interrupt, exiting.\n")
sys.exit(EXIT_ERR)
sys.exit(rval)
Loading…
Cancel
Save