archive: simplify handling of dsc file

There are only 2 options for getting the dsc file:
1) it's provided as 'dscfile' param
2) we look it up via lp_spph

For case #1, update the constructor to immediately read the provided file
to create the self._dsc object, and set package and version from that.

For case #2, simplify various functions to simply use the normal
_download_file_from_urls() function to get the dsc file from the
normal locations (mirrors, masters, directly from lp), using the same
process as any other file, including checking checksum(s).

Signed-off-by: Dan Streetman <ddstreet@canonical.com>
This commit is contained in:
Dan Streetman 2020-03-25 15:49:53 -04:00
parent 248cf38d79
commit fed562405d

View File

@ -40,7 +40,6 @@ import sys
from debian.changelog import Changelog from debian.changelog import Changelog
import debian.deb822 import debian.deb822
import httplib2
from contextlib import closing from contextlib import closing
@ -169,9 +168,17 @@ class SourcePackage(object):
suffix))) suffix)))
self.masters = list(filter(None, masters)) self.masters = list(filter(None, masters))
# if a dsc was specified, pull it to get the source/version info # If provided a dscfile, process it now to set our source and version
if self._dsc_source: if self._dsc_source:
self.pull_dsc() filename = os.path.basename(urlparse(self._dsc_source).path)
dst = os.path.join(self.workdir, filename)
download(self._dsc_source, dst)
with open(dst, 'rb') as f:
self._dsc = Dsc(f.read())
self.source = self._dsc['Source']
self._version = Version(self._dsc['Version'])
if self._verify_signature:
self._check_dsc_signature()
@property @property
def lp_spph(self): def lp_spph(self):
@ -270,15 +277,18 @@ class SourcePackage(object):
@property @property
def dsc(self): def dsc(self):
"Return a the Dsc" "Return the Dsc"
if self._dsc is None: if not self._dsc:
self.pull_dsc() self.pull_dsc()
with open(self.dsc_pathname, 'rb') as f:
self._dsc = Dsc(f.read())
if self._verify_signature:
self._check_dsc_signature()
return self._dsc return self._dsc
def getDistribution(self): def getDistribution(self):
if not self._distribution: if not self._distribution:
self._distribution = Distribution(self.distribution) self._distribution = Distribution(self.distribution)
return self._distribution return self._distribution
def getArchive(self): def getArchive(self):
@ -320,95 +330,31 @@ class SourcePackage(object):
if bpph.getUrl(): if bpph.getUrl():
yield bpph.getUrl() yield bpph.getUrl()
def pull_dsc(self): def _check_dsc_signature(self):
"Retrieve dscfile and parse" "Check that the dsc signature matches what we are expecting"
if self._dsc_source:
parsed = urlparse(self._dsc_source)
if parsed.scheme == '':
self._dsc_source = 'file://' + os.path.abspath(self._dsc_source)
parsed = urlparse(self._dsc_source)
self._download_dsc(self._dsc_source)
else:
for url in self._source_urls(self.dsc_name):
dlerr = None
try:
self._download_dsc(url)
break
except DownloadError as e:
dlerr = e
else:
raise dlerr
self._check_dsc()
def _download_dsc(self, url):
"Download specified dscfile and parse"
parsed = urlparse(url)
if parsed.scheme == 'file':
Logger.debug("Using dsc file '%s'" % parsed.path)
with open(parsed.path, 'rb') as f:
body = f.read()
else:
try:
Logger.debug("Trying dsc url '%s'" % url)
response, body = httplib2.Http().request(url)
except httplib2.HttpLib2Error as e:
raise DownloadError(e)
if response.status != 200:
raise DownloadError("%s: %s %s" % (url, response.status,
response.reason))
self._dsc = Dsc(body)
def _check_dsc(self):
"Check that the dsc matches what we are expecting"
assert self._dsc is not None
self.source = self.dsc['Source']
self._version = Version(self.dsc['Version'])
valid = False
no_pub_key = False
message = None
gpg_info = None
try: try:
gpg_info = self.dsc.get_gpg_info(( gpg_info = self.dsc.get_gpg_info((
'/usr/share/keyrings/debian-keyring.gpg', '/usr/share/keyrings/debian-keyring.gpg',
'/usr/share/keyrings/debian-maintainers.gpg', '/usr/share/keyrings/debian-maintainers.gpg',
)) ))
valid = gpg_info.valid()
except IOError: except IOError:
message = ('Signature on %s could not be verified, install ' Logger.debug('Signature on %s could not be verified, install '
'debian-keyring' % self.dsc_name) 'debian-keyring' % self.dsc_name)
if message is None: return
if valid: if gpg_info.valid():
message = 'Valid signature'
else:
message = ('Signature on %s could not be verified'
% self.dsc_name)
if gpg_info is not None:
if 'GOODSIG' in gpg_info: if 'GOODSIG' in gpg_info:
message = ('Good signature by %s (0x%s)' Logger.info('Good signature by %s (0x%s)'
% (gpg_info['GOODSIG'][1], gpg_info['GOODSIG'][0])) % (gpg_info['GOODSIG'][1], gpg_info['GOODSIG'][0]))
elif 'VALIDSIG' in gpg_info: elif 'VALIDSIG' in gpg_info:
message = 'Valid signature by 0x%s' % gpg_info['VALIDSIG'][0] Logger.info('Valid signature by 0x%s' % gpg_info['VALIDSIG'][0])
else:
Logger.info('Valid signature')
elif 'NO_PUBKEY' in gpg_info: elif 'NO_PUBKEY' in gpg_info:
no_pub_key = True Logger.warning('Public key not found, could not verify signature')
message = 'Public key not found, could not verify signature' elif 'NODATA' in gpg_info:
if self._verify_signature: Logger.warning('Package is not signed')
if valid:
Logger.info(message)
elif no_pub_key:
Logger.warning(message)
else: else:
Logger.error(message) Logger.warning('Signature on %s could not be verified' % self.dsc_name)
raise DownloadError(message)
else:
Logger.debug(message)
def _write_dsc(self):
"Write dsc file to workdir"
if self._dsc is None:
self.pull_dsc()
with open(self.dsc_pathname, 'wb') as f:
f.write(self.dsc.raw_text)
def _verify_file(self, pathname, dscverify=False, sha1sum=False, sha256sum=False, size=0): def _verify_file(self, pathname, dscverify=False, sha1sum=False, sha256sum=False, size=0):
if not os.path.exists(pathname): if not os.path.exists(pathname):
@ -423,7 +369,7 @@ class SourcePackage(object):
return False return False
return True return True
def _download_file(self, url, filename, size, dscverify=False, sha1sum=None, sha256sum=None): def _download_file(self, url, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None):
"Download url to filename in workdir." "Download url to filename in workdir."
pathname = os.path.join(self.workdir, filename) pathname = os.path.join(self.workdir, filename)
@ -443,7 +389,7 @@ class SourcePackage(object):
return self._verify_file(pathname, dscverify, sha1sum, sha256sum, size) return self._verify_file(pathname, dscverify, sha1sum, sha256sum, size)
def _download_file_from_urls(self, urls, filename, size, dscverify=False, def _download_file_from_urls(self, urls, filename, size=0, dscverify=False,
sha1sum=None, sha256sum=None): sha1sum=None, sha256sum=None):
"Try to download a file from a list of urls." "Try to download a file from a list of urls."
for url in urls: for url in urls:
@ -461,9 +407,13 @@ class SourcePackage(object):
Logger.error('URL Error: %s', e.reason) Logger.error('URL Error: %s', e.reason)
raise DownloadError('Failed to download %s' % filename) raise DownloadError('Failed to download %s' % filename)
def pull_dsc(self):
"Pull dscfile into workdir"
urls = self._source_urls(self.dsc_name)
self._download_file_from_urls(urls, self.dsc_name)
def pull(self): def pull(self):
"Pull into workdir" "Pull into workdir"
self._write_dsc()
for entry in self.dsc['Files']: for entry in self.dsc['Files']:
name = entry['name'] name = entry['name']
urls = self._source_urls(name) urls = self._source_urls(name)