ubuntutools: add pull-* --no-verify-signature option, don't fail if no pub key

Change dsc verification to fail only if the public key was available, but
signature verification failed.  If no public key is available for the dsc,
print warning only. (LP: #1700846)

Also add pull-* parameter --no-verify-signature to manually prevent failure
when signature verification fails.
This commit is contained in:
Dan Streetman 2018-02-28 15:47:53 -05:00
parent 41a6c47ac2
commit 3a413760f3
3 changed files with 82 additions and 41 deletions

View File

@ -129,7 +129,7 @@ class SourcePackage(object):
def __init__(self, package=None, version=None, component=None,
dscfile=None, lp=None, mirrors=(), workdir='.', quiet=False,
series=None, pocket=None):
series=None, pocket=None, verify_signature=False):
"""Can be initialised using either package or dscfile.
If package is specified, either the version or series can also be
specified; using version will get the specific package version,
@ -149,6 +149,7 @@ class SourcePackage(object):
self._use_series = True
self._pocket = pocket
self._dsc_source = dscfile
self._verify_signature = verify_signature
# Cached values:
self._distribution = None
@ -321,7 +322,7 @@ class SourcePackage(object):
continue
yield (bpph.getFileName(), bpph.getUrl(), 0)
def pull_dsc(self, verify_signature=False):
def pull_dsc(self):
"Retrieve dscfile and parse"
if self._dsc_source:
parsed = urlparse(self._dsc_source)
@ -333,16 +334,18 @@ class SourcePackage(object):
url = self._lp_url(self.dsc_name)
self._download_dsc(url)
self._check_dsc(verify_signature=verify_signature)
self._check_dsc()
def _download_dsc(self, url):
"Download specified dscfile and parse"
parsed = urlparse(url)
if parsed.scheme == 'file':
with open(parsed.path, 'r') as f:
Logger.debug("Using dsc file '%s'" % parsed.path)
with open(parsed.path, 'rb') as f:
body = f.read()
else:
try:
Logger.debug("Trying dsc url '%s'" % url)
response, body = httplib2.Http().request(url)
except httplib2.HttpLib2Error as e:
raise DownloadError(e)
@ -351,13 +354,14 @@ class SourcePackage(object):
response.reason))
self._dsc = Dsc(body)
def _check_dsc(self, verify_signature=False):
def _check_dsc(self):
"Check that the dsc matches what we are expecting"
assert self._dsc is not None
self.source = self.dsc['Source']
self._version = Version(self.dsc['Version'])
valid = False
no_pub_key = False
message = None
gpg_info = None
try:
@ -381,19 +385,24 @@ class SourcePackage(object):
% (gpg_info['GOODSIG'][1], gpg_info['GOODSIG'][0]))
elif 'VALIDSIG' in gpg_info:
message = 'Valid signature by 0x%s' % gpg_info['VALIDSIG'][0]
if verify_signature:
elif 'NO_PUBKEY' in gpg_info:
no_pub_key = True
message = 'Public key not found, could not verify signature'
if self._verify_signature:
if valid:
Logger.normal(message)
elif no_pub_key:
Logger.warn(message)
else:
Logger.error(message)
raise DownloadError(message)
else:
Logger.info(message)
def _write_dsc(self, verify_signature=True):
def _write_dsc(self):
"Write dsc file to workdir"
if self._dsc is None:
self.pull_dsc(verify_signature=verify_signature)
self.pull_dsc()
with open(self.dsc_pathname, 'wb') as f:
f.write(self.dsc.raw_text)
@ -477,9 +486,9 @@ class SourcePackage(object):
return False
return True
def pull(self, verify_signature=False):
def pull(self):
"Pull into workdir"
self._write_dsc(verify_signature=verify_signature)
self._write_dsc()
for entry in self.dsc['Files']:
name = entry['name']
for url in self._source_urls(name):
@ -648,10 +657,10 @@ class DebianSourcePackage(SourcePackage):
continue
yield (f.name, f.getUrl(), f.size)
def pull_dsc(self, verify_signature=True):
def pull_dsc(self):
"Retrieve dscfile and parse"
try:
super(DebianSourcePackage, self).pull_dsc(verify_signature)
super(DebianSourcePackage, self).pull_dsc()
return
except DownloadError:
pass
@ -666,7 +675,7 @@ class DebianSourcePackage(SourcePackage):
break
else:
raise DownloadError('dsc could not be found anywhere')
self._check_dsc(verify_signature=verify_signature)
self._check_dsc()
# Local methods:
@property

View File

@ -83,6 +83,8 @@ def create_argparser(default_pull=None, default_distro=None, default_arch=None):
help='Preferred mirror(s)')
parser.add_argument('--no-conf', action='store_true',
help="Don't read config files or environment variables")
parser.add_argument('--no-verify-signature', action='store_true',
help="Don't fail if dsc signature can't be verified")
parser.add_argument('-a', '--arch', default=default_arch,
help=help_default_arch)
parser.add_argument('-p', '--pull', default=default_pull,
@ -177,6 +179,7 @@ def pull(options):
assert hasattr(options, 'verbose')
assert hasattr(options, 'download_only')
assert hasattr(options, 'no_conf')
assert hasattr(options, 'no_verify_signature')
# these are type string
assert hasattr(options, 'arch')
assert hasattr(options, 'pull')
@ -234,7 +237,8 @@ def pull(options):
pkgcls = DISTRO_PKG_CLASS[distro]
srcpkg = pkgcls(package=package, version=version,
series=release, pocket=pocket,
mirrors=mirrors, dscfile=dscfile)
mirrors=mirrors, dscfile=dscfile,
verify_signature=(not options.no_verify_signature))
spph = srcpkg.lp_spph
except PackageNotFoundException as e:
Logger.error(str(e))

View File

@ -145,11 +145,14 @@ class LocalSourcePackageTestCase(unittest.TestCase):
return self.request_404(url)
def test_local_copy(self):
pkg = self.SourcePackage('example', '1.0-1', 'main',
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
workdir=self.workdir,
verify_signature=False)
pkg.quiet = True
pkg.pull(verify_signature=False)
pkg.pull()
pkg.unpack()
def test_workdir_srcpkg_noinfo(self):
@ -159,9 +162,10 @@ class LocalSourcePackageTestCase(unittest.TestCase):
pkg = self.SourcePackage(dscfile=os.path.join(self.workdir,
'example_1.0-1.dsc'),
workdir=self.workdir)
workdir=self.workdir,
verify_signature=False)
pkg.quiet = True
pkg.pull(verify_signature=False)
pkg.pull()
pkg.unpack()
def test_workdir_srcpkg_info(self):
@ -169,12 +173,14 @@ class LocalSourcePackageTestCase(unittest.TestCase):
shutil.copy2('test-data/example_1.0.orig.tar.gz', self.workdir)
shutil.copy2('test-data/example_1.0-1.debian.tar.xz', self.workdir)
pkg = self.SourcePackage('example', '1.0-1', 'main',
pkg = self.SourcePackage(package='example', version='1.0-1',
component='main',
dscfile=os.path.join(self.workdir,
'example_1.0-1.dsc'),
workdir=self.workdir)
workdir=self.workdir,
verify_signature=False)
pkg.quiet = True
pkg.pull(verify_signature=False)
pkg.pull()
pkg.unpack()
def test_verification(self):
@ -185,19 +191,25 @@ class LocalSourcePackageTestCase(unittest.TestCase):
'r+b') as f:
f.write(b'CORRUPTION')
pkg = self.SourcePackage('example', '1.0-1', 'main',
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
workdir=self.workdir,
verify_signature=False)
pkg.quiet = True
pkg.pull(verify_signature=False)
pkg.pull()
def test_pull(self):
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
workdir=self.workdir,
verify_signature=False)
pkg.url_opener = self.url_opener
pkg.quiet = True
pkg.pull(verify_signature=False)
pkg.pull()
def test_mirrors(self):
mirror = 'http://mirror'
@ -209,15 +221,21 @@ class LocalSourcePackageTestCase(unittest.TestCase):
url_opener = mock.MagicMock(spec=OpenerDirector)
url_opener.open.side_effect = _callable_iter
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
workdir=self.workdir,
mirrors=[mirror],
verify_signature=False)
pkg.url_opener = url_opener
pkg.quiet = True
pkg.pull(verify_signature=False)
pkg.pull()
def test_dsc_missing(self):
self.mock_http.side_effect = self.request_404
pkg = self.SourcePackage('example', '1.0-1', 'main',
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
workdir=self.workdir)
pkg.quiet = True
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
@ -243,12 +261,15 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
url_opener = mock.MagicMock(spec=OpenerDirector)
url_opener.open.side_effect = _callable_iter
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[debian_mirror,
debsec_mirror])
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
workdir=self.workdir,
mirrors=[debian_mirror, debsec_mirror],
verify_signature=False)
pkg.quiet = True
pkg.url_opener = url_opener
pkg.pull(verify_signature=False)
pkg.pull()
pkg.unpack()
def test_dsc_missing(self):
@ -262,10 +283,14 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
'[GNUPG:] GOODSIG DEADBEEF Joe Developer '
'<joe@example.net>')
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
workdir=self.workdir,
mirrors=[mirror],
verify_signature=False)
pkg.url_opener = self.url_opener
pkg.pull(verify_signature=False)
pkg.pull()
def test_dsc_badsig(self):
mirror = 'http://mirror'
@ -277,8 +302,11 @@ class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
mock_gpg_info.return_value = debian.deb822.GpgInfo.from_output(
'[GNUPG:] ERRSIG DEADBEEF')
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg = self.SourcePackage(package='example',
version='1.0-1',
component='main',
workdir=self.workdir,
mirrors=[mirror])
try:
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
except URLError: