Use snap cli rather than custom snap-tool (LP: #1864252)
snap-tool was added to support a deprecate cohort-key feature of the snap store. Recent changes in snap assertions have added additional fields which snap-tool is not retrieving. This resulted in snap install failures on first boot. This patch removes snap-tool and returns to using the snap cli. This ensures snap downloads will function without odd incompatibilities.sil2100/appliance-images
parent
d516e68807
commit
ec954a80b9
@ -1,603 +0,0 @@
|
|||||||
#!/usr/bin/python3
|
|
||||||
#-*- encoding: utf-8 -*-
|
|
||||||
"""
|
|
||||||
This script can be used instead of the traditional `snap` command to download
|
|
||||||
snaps and accompanying assertions. It uses the new store API (v2) which allows
|
|
||||||
creating temporary snapshots of the channel map.
|
|
||||||
|
|
||||||
To create such a snapshot run
|
|
||||||
|
|
||||||
snap-tool cohort-create
|
|
||||||
|
|
||||||
This will print a "cohort-key" to stdout, which can then be passed to future
|
|
||||||
invocations of `snap-tool download`. Whenever a cohort key is provided, the
|
|
||||||
store will provide a view of the channel map as it existed when the key was
|
|
||||||
created.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from textwrap import dedent
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import base64
|
|
||||||
import binascii
|
|
||||||
import getopt
|
|
||||||
import hashlib
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import shutil
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import urllib.error
|
|
||||||
import urllib.request
|
|
||||||
|
|
||||||
EXIT_OK = 0
|
|
||||||
EXIT_ERR = 1
|
|
||||||
|
|
||||||
|
|
||||||
class SnapError(Exception):
|
|
||||||
"""Generic error thrown by the Snap class."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class SnapCraftError(SnapError):
|
|
||||||
"""Error thrown on problems with the snapcraft APIs."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class SnapAssertionError(SnapError):
|
|
||||||
"""Error thrown on problems with the assertions API."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ExpBackoffHTTPClient:
|
|
||||||
"""This class is an abstraction layer on top of urllib with additional
|
|
||||||
retry logic for more reliable downloads."""
|
|
||||||
|
|
||||||
class Request:
|
|
||||||
"""This is a convenience wrapper around urllib.request."""
|
|
||||||
|
|
||||||
def __init__(self, request, do_retry, base_interval, num_tries):
|
|
||||||
"""
|
|
||||||
:param request:
|
|
||||||
An urllib.request.Request instance.
|
|
||||||
:param do_retry:
|
|
||||||
Whether to enable the exponential backoff and retry logic.
|
|
||||||
:param base_interval:
|
|
||||||
The initial interval to sleep after a failed attempt.
|
|
||||||
:param num_tries:
|
|
||||||
How many attempts to make.
|
|
||||||
"""
|
|
||||||
self._request = request
|
|
||||||
self._do_retry = do_retry
|
|
||||||
self._base_interval = base_interval
|
|
||||||
self._num_tries = num_tries
|
|
||||||
self._response = None
|
|
||||||
|
|
||||||
def open(self):
|
|
||||||
"""Open the connection."""
|
|
||||||
if not self._response:
|
|
||||||
self._response = self._retry_urlopen()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
"""Close the connection."""
|
|
||||||
if self._response:
|
|
||||||
self._response.close()
|
|
||||||
self._response = None
|
|
||||||
|
|
||||||
def data(self):
|
|
||||||
"""Return the raw response body."""
|
|
||||||
with self:
|
|
||||||
return self.read()
|
|
||||||
|
|
||||||
def json(self):
|
|
||||||
"""Return the deserialized response body interpreted as JSON."""
|
|
||||||
return json.loads(self.data(), encoding="utf-8")
|
|
||||||
|
|
||||||
def text(self):
|
|
||||||
"""Return the response body as a unicode string."""
|
|
||||||
encoding = "utf-8"
|
|
||||||
|
|
||||||
with self:
|
|
||||||
content_type = self._response.getheader("Content-Type", "")
|
|
||||||
|
|
||||||
if content_type == "application/json":
|
|
||||||
encoding = "utf-8"
|
|
||||||
else:
|
|
||||||
m = re.match(r"text/\S+;\s*charset=(?P<charset>\S+)",
|
|
||||||
content_type)
|
|
||||||
if m:
|
|
||||||
encoding=m.group("charset")
|
|
||||||
|
|
||||||
return self.read().decode(encoding)
|
|
||||||
|
|
||||||
def read(self, size=None):
|
|
||||||
"""Read size bytes from the response. If size if not set, the
|
|
||||||
complete response body is read in."""
|
|
||||||
return self._response.read(size)
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
"""Make this class a context manager."""
|
|
||||||
self.open()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, type, value, traceback):
|
|
||||||
"""Make this class a context manager."""
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
def _retry_urlopen(self):
|
|
||||||
"""Try to open the HTTP connection as many times as configured
|
|
||||||
through the constructor. Every time an error occurs, double the
|
|
||||||
time to wait until the next attempt."""
|
|
||||||
for attempt in range(self._num_tries):
|
|
||||||
try:
|
|
||||||
return urllib.request.urlopen(self._request)
|
|
||||||
except Exception as e:
|
|
||||||
if isinstance(e, urllib.error.HTTPError) and e.code < 500:
|
|
||||||
raise
|
|
||||||
if attempt >= self._num_tries - 1:
|
|
||||||
raise
|
|
||||||
sys.stderr.write(
|
|
||||||
"WARNING: failed to open URL '{}': {}\n"
|
|
||||||
.format(self._request.full_url, str(e))
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
sleep_interval = self._base_interval * 2**attempt
|
|
||||||
sys.stderr.write(
|
|
||||||
"Retrying HTTP request in {} seconds...\n"
|
|
||||||
.format(sleep_interval)
|
|
||||||
)
|
|
||||||
time.sleep(sleep_interval)
|
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, do_retry=True, base_interval=2, num_tries=8):
|
|
||||||
"""
|
|
||||||
:param do_retry:
|
|
||||||
Whether to enable the retry logic.
|
|
||||||
:param base_interval:
|
|
||||||
The initial interval to sleep after a failed attempt.
|
|
||||||
:param num_tries:
|
|
||||||
How many attempts to make.
|
|
||||||
"""
|
|
||||||
self._do_retry = do_retry
|
|
||||||
self._base_interval = base_interval
|
|
||||||
self._num_tries = num_tries if do_retry else 1
|
|
||||||
|
|
||||||
def get(self, url, headers=None):
|
|
||||||
"""Create a GET request that can be used to retrieve the resource
|
|
||||||
at the given URL.
|
|
||||||
|
|
||||||
:param url:
|
|
||||||
An HTTP URL.
|
|
||||||
:param headers:
|
|
||||||
A dictionary of extra headers to send along.
|
|
||||||
:return:
|
|
||||||
An ExpBackoffHTTPClient.Request instance.
|
|
||||||
"""
|
|
||||||
return self._prepare_request(url, headers=headers)
|
|
||||||
|
|
||||||
def post(self, url, data=None, json=None, headers=None):
|
|
||||||
"""Create a POST request that can be used to submit data to the
|
|
||||||
endpoint at the given URL."""
|
|
||||||
return self._prepare_request(
|
|
||||||
url, data=data, json_data=json, headers=headers
|
|
||||||
)
|
|
||||||
|
|
||||||
def _prepare_request(self, url, data=None, json_data=None, headers=None):
|
|
||||||
"""Prepare a Request instance that can be used to retrieve data from
|
|
||||||
and/or send data to the endpoint at the given URL.
|
|
||||||
|
|
||||||
:param url:
|
|
||||||
An HTTP URL.
|
|
||||||
:param data:
|
|
||||||
Raw binary data to send along in the request body.
|
|
||||||
:param json_data:
|
|
||||||
A Python data structure to be serialized and sent out in JSON
|
|
||||||
format.
|
|
||||||
:param headers:
|
|
||||||
A dictionary of extra headers to send along.
|
|
||||||
:return:
|
|
||||||
An ExpBackoffHTTPClient.Request instance.
|
|
||||||
"""
|
|
||||||
if data is not None and json_data is not None:
|
|
||||||
raise ValueError(
|
|
||||||
"Parameters 'data' and 'json_data' are mutually exclusive."
|
|
||||||
)
|
|
||||||
|
|
||||||
if json_data:
|
|
||||||
data = json.dumps(json_data, ensure_ascii=False)
|
|
||||||
if headers is None:
|
|
||||||
headers = {}
|
|
||||||
headers["Content-Type"] = "application/json"
|
|
||||||
if isinstance(data, str):
|
|
||||||
data = data.encode("utf-8")
|
|
||||||
|
|
||||||
return ExpBackoffHTTPClient.Request(
|
|
||||||
urllib.request.Request(url, data=data, headers=headers or {}),
|
|
||||||
self._do_retry,
|
|
||||||
self._base_interval,
|
|
||||||
self._num_tries
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Snap:
|
|
||||||
"""This class provides methods to retrieve information about a snap and
|
|
||||||
download it together with its assertions."""
|
|
||||||
|
|
||||||
def __init__(self, name, channel="stable", arch="amd64", series=16,
|
|
||||||
cohort_key=None, assertion_url="https://assertions.ubuntu.com",
|
|
||||||
snapcraft_url="https://api.snapcraft.io", **kwargs):
|
|
||||||
"""
|
|
||||||
:param name:
|
|
||||||
The name of the snap.
|
|
||||||
:param channel:
|
|
||||||
The channel to operate on.
|
|
||||||
:param arch:
|
|
||||||
The Debian architecture of the snap (e.g. amd64, armhf, arm64, ...).
|
|
||||||
:param series:
|
|
||||||
The device series. This should always be 16.
|
|
||||||
:param cohort_key:
|
|
||||||
A cohort key to access a snapshot of the channel map.
|
|
||||||
"""
|
|
||||||
self._name = name
|
|
||||||
self._channel = channel
|
|
||||||
self._arch = arch
|
|
||||||
self._series = series
|
|
||||||
self._cohort_key = cohort_key
|
|
||||||
self._assertion_url = assertion_url
|
|
||||||
self._snapcraft_url = snapcraft_url
|
|
||||||
self._details = None
|
|
||||||
self._assertions = {}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def cohort_create(cls):
|
|
||||||
"""Get a cohort key for the current moment. A cohort key is valid
|
|
||||||
across all snaps, channels and architectures."""
|
|
||||||
return Snap("core")\
|
|
||||||
.get_details(cohort_create=True)\
|
|
||||||
.get("cohort-key")
|
|
||||||
|
|
||||||
def download(self, download_assertions=True):
|
|
||||||
"""Download the snap container. If download_assertions is True, the
|
|
||||||
corresponding assertions will be downloaded, as well."""
|
|
||||||
snap = self.get_details()
|
|
||||||
|
|
||||||
snap_name = snap["name"]
|
|
||||||
snap_revision = snap["revision"]
|
|
||||||
publisher_id = snap["publisher"]["id"]
|
|
||||||
snap_download_url = snap["download"]["url"]
|
|
||||||
snap_byte_size = snap["download"]["size"]
|
|
||||||
filename = snap_name + "_" + str(snap_revision)
|
|
||||||
snap_filename = filename + ".snap"
|
|
||||||
assert_filename = filename + ".assert"
|
|
||||||
|
|
||||||
skip_snap_download = False
|
|
||||||
|
|
||||||
if os.path.exists(snap_filename) and os.path.getsize(snap_filename) \
|
|
||||||
== snap_byte_size:
|
|
||||||
skip_snap_download = True
|
|
||||||
|
|
||||||
headers = {}
|
|
||||||
|
|
||||||
if os.environ.get("SNAPPY_STORE_NO_CDN", "0") != "0":
|
|
||||||
headers.update({
|
|
||||||
"X-Ubuntu-No-Cdn": "true",
|
|
||||||
"Snap-CDN": "none",
|
|
||||||
})
|
|
||||||
|
|
||||||
if not skip_snap_download:
|
|
||||||
http_client = ExpBackoffHTTPClient()
|
|
||||||
response = http_client.get(snap_download_url, headers=headers)
|
|
||||||
with response, open(snap_filename, "wb+") as fp:
|
|
||||||
shutil.copyfileobj(response, fp)
|
|
||||||
|
|
||||||
if os.path.getsize(snap_filename) != snap_byte_size:
|
|
||||||
raise SnapError(
|
|
||||||
"The downloaded snap does not have the expected size."
|
|
||||||
)
|
|
||||||
|
|
||||||
if not download_assertions:
|
|
||||||
return
|
|
||||||
|
|
||||||
required_assertions = [
|
|
||||||
"account-key",
|
|
||||||
"account",
|
|
||||||
"snap-declaration",
|
|
||||||
"snap-revision",
|
|
||||||
]
|
|
||||||
|
|
||||||
if publisher_id == "canonical":
|
|
||||||
required_assertions.remove("account")
|
|
||||||
|
|
||||||
for assertion_name in required_assertions:
|
|
||||||
attr_name = "get_assertion_" + assertion_name.replace("-", "_")
|
|
||||||
# This will populate self._assertions[<assertion_name>].
|
|
||||||
getattr(self, attr_name)()
|
|
||||||
|
|
||||||
with open(assert_filename, "w+", encoding="utf-8") as fp:
|
|
||||||
fp.write("\n".join(self._assertions[a] for a in
|
|
||||||
required_assertions))
|
|
||||||
|
|
||||||
def get_details(self, cohort_create=False):
|
|
||||||
"""Get details about the snap. On subsequent calls, the cached results
|
|
||||||
are returned. If cohort_create is set to True, a cohort key will be
|
|
||||||
created and included in the result."""
|
|
||||||
if self._details and not cohort_create:
|
|
||||||
return self._details
|
|
||||||
|
|
||||||
if self.is_core_snap() and self._channel.startswith("stable/ubuntu-"):
|
|
||||||
sys.stderr.write(
|
|
||||||
"WARNING: switching channel from '{}' to 'stable' for '{}' "
|
|
||||||
"snap.\n".format(self._channel, self._name)
|
|
||||||
)
|
|
||||||
self._channel = "stable"
|
|
||||||
|
|
||||||
path = "/v2/snaps/refresh"
|
|
||||||
|
|
||||||
data = {
|
|
||||||
"context": [],
|
|
||||||
"actions": [
|
|
||||||
{
|
|
||||||
"action": "download",
|
|
||||||
"instance-key": "0",
|
|
||||||
"name": self._name,
|
|
||||||
"channel": self._channel,
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"fields": [
|
|
||||||
"base",
|
|
||||||
"created-at",
|
|
||||||
"download",
|
|
||||||
"license",
|
|
||||||
"name",
|
|
||||||
"prices",
|
|
||||||
"publisher",
|
|
||||||
"revision",
|
|
||||||
"snap-id",
|
|
||||||
"summary",
|
|
||||||
"title",
|
|
||||||
"type",
|
|
||||||
"version",
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
# These are mutually exclusive.
|
|
||||||
if cohort_create:
|
|
||||||
data["actions"][0]["cohort-create"] = True
|
|
||||||
elif self._cohort_key:
|
|
||||||
data["actions"][0]["cohort-key"] = self._cohort_key
|
|
||||||
|
|
||||||
try:
|
|
||||||
response_dict = self._do_snapcraft_request(path, json_data=data)
|
|
||||||
except SnapCraftError as e:
|
|
||||||
raise SnapError("failed to get details for '{}': {}"
|
|
||||||
.format(self._name, str(e)))
|
|
||||||
|
|
||||||
snap_data = response_dict["results"][0]
|
|
||||||
|
|
||||||
if snap_data.get("result") == "error":
|
|
||||||
raise SnapError(
|
|
||||||
"failed to get details for '{}': {}"
|
|
||||||
.format(self._name, snap_data.get("error", {}).get("message"))
|
|
||||||
)
|
|
||||||
|
|
||||||
# Have "base" initialized to something meaningful.
|
|
||||||
if self.is_core_snap():
|
|
||||||
snap_data["snap"]["base"] = ""
|
|
||||||
elif snap_data["snap"].get("base") is None:
|
|
||||||
snap_data["snap"]["base"] = "core"
|
|
||||||
|
|
||||||
# Copy the key into the snap details.
|
|
||||||
if "cohort-key" in snap_data:
|
|
||||||
snap_data["snap"]["cohort-key"] = snap_data["cohort-key"]
|
|
||||||
|
|
||||||
if "error" in snap_data:
|
|
||||||
raise SnapError(
|
|
||||||
"failed to get details for '{}' in '{}' on '{}': {}"
|
|
||||||
.format(self._name, self._channel, self._arch,
|
|
||||||
snap_data["error"]["message"])
|
|
||||||
)
|
|
||||||
|
|
||||||
self._details = snap_data["snap"]
|
|
||||||
return self._details
|
|
||||||
|
|
||||||
def get_assertion_snap_revision(self):
|
|
||||||
"""Download the snap-revision assertion associated with this snap. The
|
|
||||||
assertion is returned as a string."""
|
|
||||||
if "snap-revision" in self._assertions:
|
|
||||||
return self._assertions["snap-revision"]
|
|
||||||
snap = self.get_details()
|
|
||||||
|
|
||||||
snap_sha3_384 = base64.urlsafe_b64encode(
|
|
||||||
binascii.a2b_hex(snap["download"]["sha3-384"])
|
|
||||||
).decode("us-ascii")
|
|
||||||
|
|
||||||
data = self._do_assertion_request("/v1/assertions/snap-revision/{}"
|
|
||||||
.format(snap_sha3_384))
|
|
||||||
self._assertions["snap-revision"] = data
|
|
||||||
return data
|
|
||||||
|
|
||||||
def get_assertion_snap_declaration(self):
|
|
||||||
"""Download the snap-declaration assertion associated with this snap.
|
|
||||||
The assertion is returned as a string."""
|
|
||||||
if "snap-declaration" in self._assertions:
|
|
||||||
return self._assertions["snap-declaration"]
|
|
||||||
snap = self.get_details()
|
|
||||||
series = self._series
|
|
||||||
snap_id = snap["snap-id"]
|
|
||||||
|
|
||||||
data = self._do_assertion_request(
|
|
||||||
"/v1/assertions/snap-declaration/{}/{}"
|
|
||||||
.format(series, snap_id))
|
|
||||||
|
|
||||||
self._assertions["snap-declaration"] = data
|
|
||||||
return data
|
|
||||||
|
|
||||||
def get_assertion_account(self):
|
|
||||||
"""Download the account assertion associated with this snap. The
|
|
||||||
assertion is returned as a string."""
|
|
||||||
if "account" in self._assertions:
|
|
||||||
return self._assertions["account"]
|
|
||||||
snap = self.get_details()
|
|
||||||
publisher_id = snap["publisher"]["id"]
|
|
||||||
data = self._do_assertion_request("/v1/assertions/account/{}"
|
|
||||||
.format(publisher_id))
|
|
||||||
self._assertions["account"] = data
|
|
||||||
return data
|
|
||||||
|
|
||||||
def get_assertion_account_key(self):
|
|
||||||
"""Download the account-key assertion associated with this snap. The
|
|
||||||
assertion will be returned as a string."""
|
|
||||||
if "account-key" in self._assertions:
|
|
||||||
return self._assertions["account-key"]
|
|
||||||
|
|
||||||
declaration_data = self.get_assertion_snap_declaration()
|
|
||||||
sign_key_sha3 = None
|
|
||||||
|
|
||||||
for line in declaration_data.splitlines():
|
|
||||||
if line.startswith("sign-key-sha3-384:"):
|
|
||||||
sign_key_sha3 = line.split(":")[1].strip()
|
|
||||||
|
|
||||||
data = self._do_assertion_request("/v1/assertions/account-key/{}"
|
|
||||||
.format(sign_key_sha3))
|
|
||||||
|
|
||||||
self._assertions["account-key"] = data
|
|
||||||
return data
|
|
||||||
|
|
||||||
def is_core_snap(self):
|
|
||||||
return re.match(r"^core\d*$", self._name) != None
|
|
||||||
|
|
||||||
def _do_assertion_request(self, path):
|
|
||||||
url = self._assertion_url + path
|
|
||||||
|
|
||||||
headers = {
|
|
||||||
"Accept": "application/x.ubuntu.assertion",
|
|
||||||
}
|
|
||||||
|
|
||||||
http_client = ExpBackoffHTTPClient()
|
|
||||||
try:
|
|
||||||
with http_client.get(url, headers=headers) as response:
|
|
||||||
return response.text()
|
|
||||||
except urllib.error.HTTPError as e:
|
|
||||||
raise SnapAssertionError(str(e))
|
|
||||||
|
|
||||||
def _do_snapcraft_request(self, path, json_data=None):
|
|
||||||
url = self._snapcraft_url + "/" + path
|
|
||||||
|
|
||||||
headers = {
|
|
||||||
"Snap-Device-Series": str(self._series),
|
|
||||||
"Snap-Device-Architecture": self._arch,
|
|
||||||
}
|
|
||||||
|
|
||||||
http_client = ExpBackoffHTTPClient()
|
|
||||||
try:
|
|
||||||
response = http_client.post(url, json=json_data, headers=headers)
|
|
||||||
with response:
|
|
||||||
return response.json()
|
|
||||||
except urllib.error.HTTPError as e:
|
|
||||||
raise SnapCraftError(str(e))
|
|
||||||
|
|
||||||
|
|
||||||
class SnapCli:
|
|
||||||
|
|
||||||
def __call__(self, args):
|
|
||||||
"""Parse the command line arguments and execute the selected command."""
|
|
||||||
options = self._parse_opts(args)
|
|
||||||
|
|
||||||
try:
|
|
||||||
options.func(getattr(options, "snap", None), **vars(options))
|
|
||||||
except SnapError as e:
|
|
||||||
sys.stderr.write("snap-tool {}: {}\n".format(
|
|
||||||
options.command, str(e)))
|
|
||||||
return EXIT_ERR
|
|
||||||
return EXIT_OK
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_host_deb_arch():
|
|
||||||
result = subprocess.run(["dpkg", "--print-architecture"],
|
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
||||||
universal_newlines=True, check=True)
|
|
||||||
|
|
||||||
return result.stdout.strip()
|
|
||||||
|
|
||||||
def _parse_opts(self, args):
|
|
||||||
main_parser = argparse.ArgumentParser()
|
|
||||||
subparsers = main_parser.add_subparsers(dest="command")
|
|
||||||
|
|
||||||
parser_cohort_create = subparsers.add_parser("cohort-create",
|
|
||||||
help="Create a cohort key for the snap store channel map.")
|
|
||||||
parser_cohort_create.set_defaults(func=self._cohort_create)
|
|
||||||
|
|
||||||
parser_download = subparsers.add_parser("download",
|
|
||||||
help="Download a snap from the store.")
|
|
||||||
parser_download.set_defaults(func=self._download)
|
|
||||||
|
|
||||||
parser_info = subparsers.add_parser("info",
|
|
||||||
help="Retrieve information about a snap.")
|
|
||||||
parser_info.set_defaults(func=self._info)
|
|
||||||
|
|
||||||
# Add common parameters.
|
|
||||||
for parser in [parser_download, parser_info]:
|
|
||||||
parser.add_argument("--cohort-key", dest="cohort_key",
|
|
||||||
help="A cohort key to pin the channel map to.", type=str)
|
|
||||||
parser.add_argument("--channel", dest="channel",
|
|
||||||
help="The publication channel to query (default: stable).",
|
|
||||||
type=str, default="stable")
|
|
||||||
parser.add_argument("--series", dest="series",
|
|
||||||
help="The device series (default: 16)",
|
|
||||||
type=int, default=16)
|
|
||||||
parser.add_argument("--arch", dest="arch",
|
|
||||||
help="The Debian architecture (default: amd64).",
|
|
||||||
type=str, default=self._get_host_deb_arch())
|
|
||||||
parser.add_argument("snap", help="The name of the snap.")
|
|
||||||
|
|
||||||
if not args:
|
|
||||||
main_parser.print_help()
|
|
||||||
sys.exit(EXIT_ERR)
|
|
||||||
|
|
||||||
return main_parser.parse_args(args)
|
|
||||||
|
|
||||||
def _cohort_create(self, _, **kwargs):
|
|
||||||
print(Snap.cohort_create())
|
|
||||||
|
|
||||||
def _download(self, snap_name, **kwargs):
|
|
||||||
Snap(snap_name, **kwargs).download()
|
|
||||||
|
|
||||||
def _info(self, snap_name, **kwargs):
|
|
||||||
snap = Snap(snap_name, **kwargs)
|
|
||||||
info = snap.get_details()
|
|
||||||
|
|
||||||
print(dedent("""\
|
|
||||||
name: {}
|
|
||||||
summary: {}
|
|
||||||
arch: {}
|
|
||||||
base: {}
|
|
||||||
channel: {}
|
|
||||||
publisher: {}
|
|
||||||
license: {}
|
|
||||||
snap-id: {}
|
|
||||||
revision: {}"""
|
|
||||||
.format(
|
|
||||||
snap_name,
|
|
||||||
info.get("summary", ""),
|
|
||||||
snap._arch,
|
|
||||||
info.get("base"),
|
|
||||||
snap._channel,
|
|
||||||
info.get("publisher", {}).get("display-name", ""),
|
|
||||||
info.get("license", ""),
|
|
||||||
info.get("snap-id", ""),
|
|
||||||
info.get("revision", "")
|
|
||||||
))
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
try:
|
|
||||||
rval = SnapCli()(sys.argv[1:])
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.stderr.write("snap-tool: caught keyboard interrupt, exiting.\n")
|
|
||||||
sys.exit(EXIT_ERR)
|
|
||||||
sys.exit(rval)
|
|
Loading…
Reference in new issue