From 1b12d8b4e3315de3bf417b40a3c66279f309d72c Mon Sep 17 00:00:00 2001 From: Dan Streetman Date: Tue, 13 Jul 2021 15:54:37 -0400 Subject: [PATCH] archive: convert to use pathlib instead of os.path Mostly convert to using pathlib --- ubuntutools/archive.py | 64 ++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/ubuntutools/archive.py b/ubuntutools/archive.py index 75902b5..4efac42 100644 --- a/ubuntutools/archive.py +++ b/ubuntutools/archive.py @@ -31,7 +31,7 @@ from urllib.request import urlopen import codecs import functools import json -import os.path +import os import re import subprocess import sys @@ -44,6 +44,8 @@ import debian.deb822 from contextlib import closing +from pathlib import Path + from ubuntutools.config import UDTConfig from ubuntutools.lp.lpapicache import (Launchpad, Distribution, @@ -86,11 +88,12 @@ class Dsc(debian.deb822.Dsc): def verify_file(self, pathname): "Verify that pathname matches the checksums in the dsc" - if not os.path.isfile(pathname): + p = Path(pathname) + if not p.is_file(): return False alg, checksums = self.get_strongest_checksum() - size, digest = checksums[os.path.basename(pathname)] - return verify_file_checksum(pathname, alg, digest, size) + size, digest = checksums[p.name] + return verify_file_checksum(p, alg, digest, size) def compare_dsc(self, other): """Check whether any files in these two dscs that have the same name @@ -142,7 +145,7 @@ class SourcePackage(ABC): """ dscfile = kwargs.get('dscfile') mirrors = kwargs.get('mirrors', ()) - workdir = kwargs.get('workdir', '.') + workdir = kwargs.get('workdir') series = kwargs.get('series') pocket = kwargs.get('pocket') status = kwargs.get('status') @@ -161,11 +164,11 @@ class SourcePackage(ABC): self.source = package self.binary = None self.try_binary = try_binary - self.workdir = workdir + self.workdir = Path(workdir) if workdir else Path('.') self._series = series self._pocket = pocket self._status = status - self._dsc_source = dscfile + self._dsc_source = Path(dscfile) if dscfile else None self._verify_signature = verify_signature # Cached values: @@ -270,12 +273,12 @@ class SourcePackage(ABC): @property def dsc_name(self): "Return the source package dsc filename for the given package" - return '%s_%s.dsc' % (self.source, self.version.strip_epoch()) + return f'{self.source}_{self.version.strip_epoch()}.dsc' @property def dsc_pathname(self): "Return the dsc_name, with the workdir path" - return os.path.join(self.workdir, self.dsc_name) + return str(self.workdir / self.dsc_name) @property def dsc(self): @@ -285,10 +288,9 @@ class SourcePackage(ABC): raise RuntimeError('Internal error: we have a dsc file but dsc not set') urls = self._source_urls(self.dsc_name) with tempfile.TemporaryDirectory() as d: - tmpdsc = os.path.join(d, self.dsc_name) + tmpdsc = Path(d) / self.dsc_name self._download_file_from_urls(urls, tmpdsc) - with open(tmpdsc, 'rb') as f: - self._dsc = Dsc(f.read()) + self._dsc = Dsc(tmpdsc.read_bytes()) self._check_dsc_signature() return self._dsc @@ -321,7 +323,7 @@ class SourcePackage(ABC): def _source_urls(self, name): "Generator of sources for name" if self._dsc_source: - yield os.path.join(os.path.dirname(self._dsc_source), name) + yield str(self._dsc_source.parent / name) for server in self._archive_servers(): yield self._mirror_url(server, name) if self.lp_spph.sourceFileUrl(name): @@ -365,34 +367,35 @@ class SourcePackage(ABC): Logger.warning('Signature on %s could not be verified' % self.dsc_name) def _verify_file(self, pathname, dscverify=False, sha1sum=None, sha256sum=None, size=0): - if not os.path.exists(pathname): + p = Path(pathname) + if not p.exists(): return False - if dscverify and not self.dsc.verify_file(pathname): + if dscverify and not self.dsc.verify_file(p): return False checksums = {} if sha1sum: checksums['SHA1'] = sha1sum if sha256sum: checksums['SHA256'] = sha256sum - if not verify_file_checksums(pathname, checksums, size): + if not verify_file_checksums(p, checksums, size): return False return True def _download_file(self, url, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None): "Download url to filename; will be put in workdir unless filename is absolute path." - if os.path.isabs(filename): - pathname = filename + if Path(filename).is_absolute(): + p = Path(filename).expanduser().resolve() else: - pathname = os.path.join(self.workdir, filename) + p = self.workdir / filename can_verify = any((dscverify, sha1sum, sha256sum)) - if can_verify and self._verify_file(pathname, dscverify, sha1sum, sha256sum, size): - Logger.info('Using existing file %s', filename) + if can_verify and self._verify_file(p, dscverify, sha1sum, sha256sum, size): + Logger.info(f'Using existing file {p}') return True - download(url, pathname, size) + download(url, p, size) - return self._verify_file(pathname, dscverify, sha1sum, sha256sum, size) + return self._verify_file(p, dscverify, sha1sum, sha256sum, size) def _download_file_from_urls(self, urls, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None): @@ -425,8 +428,7 @@ class SourcePackage(ABC): def pull(self): "Pull into workdir" - with open(self.dsc_pathname, 'wb') as f: - f.write(self.dsc.raw_text) + Path(self.dsc_pathname).write_bytes(self.dsc.raw_text) for entry in self.dsc['Files']: name = entry['name'] urls = self._source_urls(name) @@ -468,8 +470,7 @@ class SourcePackage(ABC): """Verify that the source package in workdir matches the dsc. Return boolean """ - return all(self.dsc.verify_file(os.path.join(self.workdir, - entry['name'])) + return all(self.dsc.verify_file(self.workdir / entry['name']) for entry in self.dsc['Files']) def verify_orig(self): @@ -477,8 +478,7 @@ class SourcePackage(ABC): Return boolean """ orig_re = re.compile(r'.*\.orig(-[^.]+)?\.tar\.[^.]+$') - return all(self.dsc.verify_file(os.path.join(self.workdir, - entry['name'])) + return all(self.dsc.verify_file(self.workdir / entry['name']) for entry in self.dsc['Files'] if orig_re.match(entry['name'])) @@ -488,7 +488,9 @@ class SourcePackage(ABC): if destdir: cmd.append(destdir) Logger.debug(' '.join(cmd)) - if subprocess.call(cmd, cwd=self.workdir): + result = subprocess.run(cmd, cwd=str(self.workdir), encoding='utf-8', + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + if result.returncode != 0: Logger.error('Source unpack failed.') sys.exit(1) @@ -501,7 +503,7 @@ class SourcePackage(ABC): difffn = newpkg.dsc_name[:-3] + 'debdiff' Logger.debug(' '.join(cmd) + ('> %s' % difffn)) with open(difffn, 'w') as f: - if subprocess.call(cmd, stdout=f, cwd=self.workdir) > 2: + if subprocess.call(cmd, stdout=f, cwd=str(self.workdir)) > 2: Logger.error('Debdiff failed.') sys.exit(1) if diffstat: