From fed562405d76ab85e2aab96ec0357457eebafcb2 Mon Sep 17 00:00:00 2001 From: Dan Streetman Date: Wed, 25 Mar 2020 15:49:53 -0400 Subject: [PATCH] archive: simplify handling of dsc file There are only 2 options for getting the dsc file: 1) it's provided as 'dscfile' param 2) we look it up via lp_spph For case #1, update the constructor to immediately read the provided file to create the self._dsc object, and set package and version from that. For case #2, simplify various functions to simply use the normal _download_file_from_urls() function to get the dsc file from the normal locations (mirrors, masters, directly from lp), using the same process as any other file, including checking checksum(s). Signed-off-by: Dan Streetman --- ubuntutools/archive.py | 126 +++++++++++++---------------------------- 1 file changed, 38 insertions(+), 88 deletions(-) diff --git a/ubuntutools/archive.py b/ubuntutools/archive.py index 1f23cb9..cfb0da3 100644 --- a/ubuntutools/archive.py +++ b/ubuntutools/archive.py @@ -40,7 +40,6 @@ import sys from debian.changelog import Changelog import debian.deb822 -import httplib2 from contextlib import closing @@ -169,9 +168,17 @@ class SourcePackage(object): suffix))) self.masters = list(filter(None, masters)) - # if a dsc was specified, pull it to get the source/version info + # If provided a dscfile, process it now to set our source and version if self._dsc_source: - self.pull_dsc() + filename = os.path.basename(urlparse(self._dsc_source).path) + dst = os.path.join(self.workdir, filename) + download(self._dsc_source, dst) + with open(dst, 'rb') as f: + self._dsc = Dsc(f.read()) + self.source = self._dsc['Source'] + self._version = Version(self._dsc['Version']) + if self._verify_signature: + self._check_dsc_signature() @property def lp_spph(self): @@ -270,15 +277,18 @@ class SourcePackage(object): @property def dsc(self): - "Return a the Dsc" - if self._dsc is None: + "Return the Dsc" + if not self._dsc: self.pull_dsc() + with open(self.dsc_pathname, 'rb') as f: + self._dsc = Dsc(f.read()) + if self._verify_signature: + self._check_dsc_signature() return self._dsc def getDistribution(self): if not self._distribution: self._distribution = Distribution(self.distribution) - return self._distribution def getArchive(self): @@ -320,95 +330,31 @@ class SourcePackage(object): if bpph.getUrl(): yield bpph.getUrl() - def pull_dsc(self): - "Retrieve dscfile and parse" - if self._dsc_source: - parsed = urlparse(self._dsc_source) - if parsed.scheme == '': - self._dsc_source = 'file://' + os.path.abspath(self._dsc_source) - parsed = urlparse(self._dsc_source) - self._download_dsc(self._dsc_source) - else: - for url in self._source_urls(self.dsc_name): - dlerr = None - try: - self._download_dsc(url) - break - except DownloadError as e: - dlerr = e - else: - raise dlerr - self._check_dsc() - - def _download_dsc(self, url): - "Download specified dscfile and parse" - parsed = urlparse(url) - if parsed.scheme == 'file': - Logger.debug("Using dsc file '%s'" % parsed.path) - with open(parsed.path, 'rb') as f: - body = f.read() - else: - try: - Logger.debug("Trying dsc url '%s'" % url) - response, body = httplib2.Http().request(url) - except httplib2.HttpLib2Error as e: - raise DownloadError(e) - if response.status != 200: - raise DownloadError("%s: %s %s" % (url, response.status, - response.reason)) - self._dsc = Dsc(body) - - def _check_dsc(self): - "Check that the dsc matches what we are expecting" - assert self._dsc is not None - self.source = self.dsc['Source'] - self._version = Version(self.dsc['Version']) - - valid = False - no_pub_key = False - message = None - gpg_info = None + def _check_dsc_signature(self): + "Check that the dsc signature matches what we are expecting" try: gpg_info = self.dsc.get_gpg_info(( '/usr/share/keyrings/debian-keyring.gpg', '/usr/share/keyrings/debian-maintainers.gpg', )) - valid = gpg_info.valid() except IOError: - message = ('Signature on %s could not be verified, install ' - 'debian-keyring' % self.dsc_name) - if message is None: - if valid: - message = 'Valid signature' - else: - message = ('Signature on %s could not be verified' - % self.dsc_name) - if gpg_info is not None: + Logger.debug('Signature on %s could not be verified, install ' + 'debian-keyring' % self.dsc_name) + return + if gpg_info.valid(): if 'GOODSIG' in gpg_info: - message = ('Good signature by %s (0x%s)' - % (gpg_info['GOODSIG'][1], gpg_info['GOODSIG'][0])) + Logger.info('Good signature by %s (0x%s)' + % (gpg_info['GOODSIG'][1], gpg_info['GOODSIG'][0])) elif 'VALIDSIG' in gpg_info: - message = 'Valid signature by 0x%s' % gpg_info['VALIDSIG'][0] - elif 'NO_PUBKEY' in gpg_info: - no_pub_key = True - message = 'Public key not found, could not verify signature' - if self._verify_signature: - if valid: - Logger.info(message) - elif no_pub_key: - Logger.warning(message) + Logger.info('Valid signature by 0x%s' % gpg_info['VALIDSIG'][0]) else: - Logger.error(message) - raise DownloadError(message) + Logger.info('Valid signature') + elif 'NO_PUBKEY' in gpg_info: + Logger.warning('Public key not found, could not verify signature') + elif 'NODATA' in gpg_info: + Logger.warning('Package is not signed') else: - Logger.debug(message) - - def _write_dsc(self): - "Write dsc file to workdir" - if self._dsc is None: - self.pull_dsc() - with open(self.dsc_pathname, 'wb') as f: - f.write(self.dsc.raw_text) + Logger.warning('Signature on %s could not be verified' % self.dsc_name) def _verify_file(self, pathname, dscverify=False, sha1sum=False, sha256sum=False, size=0): if not os.path.exists(pathname): @@ -423,7 +369,7 @@ class SourcePackage(object): return False return True - def _download_file(self, url, filename, size, dscverify=False, sha1sum=None, sha256sum=None): + def _download_file(self, url, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None): "Download url to filename in workdir." pathname = os.path.join(self.workdir, filename) @@ -443,7 +389,7 @@ class SourcePackage(object): return self._verify_file(pathname, dscverify, sha1sum, sha256sum, size) - def _download_file_from_urls(self, urls, filename, size, dscverify=False, + def _download_file_from_urls(self, urls, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None): "Try to download a file from a list of urls." for url in urls: @@ -461,9 +407,13 @@ class SourcePackage(object): Logger.error('URL Error: %s', e.reason) raise DownloadError('Failed to download %s' % filename) + def pull_dsc(self): + "Pull dscfile into workdir" + urls = self._source_urls(self.dsc_name) + self._download_file_from_urls(urls, self.dsc_name) + def pull(self): "Pull into workdir" - self._write_dsc() for entry in self.dsc['Files']: name = entry['name'] urls = self._source_urls(name)