archive: convert to use pathlib instead of os.path

Mostly convert to using pathlib
This commit is contained in:
Dan Streetman 2021-07-13 15:54:37 -04:00
parent be6e09a02b
commit 1b12d8b4e3

View File

@ -31,7 +31,7 @@ from urllib.request import urlopen
import codecs
import functools
import json
import os.path
import os
import re
import subprocess
import sys
@ -44,6 +44,8 @@ import debian.deb822
from contextlib import closing
from pathlib import Path
from ubuntutools.config import UDTConfig
from ubuntutools.lp.lpapicache import (Launchpad,
Distribution,
@ -86,11 +88,12 @@ class Dsc(debian.deb822.Dsc):
def verify_file(self, pathname):
"Verify that pathname matches the checksums in the dsc"
if not os.path.isfile(pathname):
p = Path(pathname)
if not p.is_file():
return False
alg, checksums = self.get_strongest_checksum()
size, digest = checksums[os.path.basename(pathname)]
return verify_file_checksum(pathname, alg, digest, size)
size, digest = checksums[p.name]
return verify_file_checksum(p, alg, digest, size)
def compare_dsc(self, other):
"""Check whether any files in these two dscs that have the same name
@ -142,7 +145,7 @@ class SourcePackage(ABC):
"""
dscfile = kwargs.get('dscfile')
mirrors = kwargs.get('mirrors', ())
workdir = kwargs.get('workdir', '.')
workdir = kwargs.get('workdir')
series = kwargs.get('series')
pocket = kwargs.get('pocket')
status = kwargs.get('status')
@ -161,11 +164,11 @@ class SourcePackage(ABC):
self.source = package
self.binary = None
self.try_binary = try_binary
self.workdir = workdir
self.workdir = Path(workdir) if workdir else Path('.')
self._series = series
self._pocket = pocket
self._status = status
self._dsc_source = dscfile
self._dsc_source = Path(dscfile) if dscfile else None
self._verify_signature = verify_signature
# Cached values:
@ -270,12 +273,12 @@ class SourcePackage(ABC):
@property
def dsc_name(self):
"Return the source package dsc filename for the given package"
return '%s_%s.dsc' % (self.source, self.version.strip_epoch())
return f'{self.source}_{self.version.strip_epoch()}.dsc'
@property
def dsc_pathname(self):
"Return the dsc_name, with the workdir path"
return os.path.join(self.workdir, self.dsc_name)
return str(self.workdir / self.dsc_name)
@property
def dsc(self):
@ -285,10 +288,9 @@ class SourcePackage(ABC):
raise RuntimeError('Internal error: we have a dsc file but dsc not set')
urls = self._source_urls(self.dsc_name)
with tempfile.TemporaryDirectory() as d:
tmpdsc = os.path.join(d, self.dsc_name)
tmpdsc = Path(d) / self.dsc_name
self._download_file_from_urls(urls, tmpdsc)
with open(tmpdsc, 'rb') as f:
self._dsc = Dsc(f.read())
self._dsc = Dsc(tmpdsc.read_bytes())
self._check_dsc_signature()
return self._dsc
@ -321,7 +323,7 @@ class SourcePackage(ABC):
def _source_urls(self, name):
"Generator of sources for name"
if self._dsc_source:
yield os.path.join(os.path.dirname(self._dsc_source), name)
yield str(self._dsc_source.parent / name)
for server in self._archive_servers():
yield self._mirror_url(server, name)
if self.lp_spph.sourceFileUrl(name):
@ -365,34 +367,35 @@ class SourcePackage(ABC):
Logger.warning('Signature on %s could not be verified' % self.dsc_name)
def _verify_file(self, pathname, dscverify=False, sha1sum=None, sha256sum=None, size=0):
if not os.path.exists(pathname):
p = Path(pathname)
if not p.exists():
return False
if dscverify and not self.dsc.verify_file(pathname):
if dscverify and not self.dsc.verify_file(p):
return False
checksums = {}
if sha1sum:
checksums['SHA1'] = sha1sum
if sha256sum:
checksums['SHA256'] = sha256sum
if not verify_file_checksums(pathname, checksums, size):
if not verify_file_checksums(p, checksums, size):
return False
return True
def _download_file(self, url, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None):
"Download url to filename; will be put in workdir unless filename is absolute path."
if os.path.isabs(filename):
pathname = filename
if Path(filename).is_absolute():
p = Path(filename).expanduser().resolve()
else:
pathname = os.path.join(self.workdir, filename)
p = self.workdir / filename
can_verify = any((dscverify, sha1sum, sha256sum))
if can_verify and self._verify_file(pathname, dscverify, sha1sum, sha256sum, size):
Logger.info('Using existing file %s', filename)
if can_verify and self._verify_file(p, dscverify, sha1sum, sha256sum, size):
Logger.info(f'Using existing file {p}')
return True
download(url, pathname, size)
download(url, p, size)
return self._verify_file(pathname, dscverify, sha1sum, sha256sum, size)
return self._verify_file(p, dscverify, sha1sum, sha256sum, size)
def _download_file_from_urls(self, urls, filename, size=0, dscverify=False,
sha1sum=None, sha256sum=None):
@ -425,8 +428,7 @@ class SourcePackage(ABC):
def pull(self):
"Pull into workdir"
with open(self.dsc_pathname, 'wb') as f:
f.write(self.dsc.raw_text)
Path(self.dsc_pathname).write_bytes(self.dsc.raw_text)
for entry in self.dsc['Files']:
name = entry['name']
urls = self._source_urls(name)
@ -468,8 +470,7 @@ class SourcePackage(ABC):
"""Verify that the source package in workdir matches the dsc.
Return boolean
"""
return all(self.dsc.verify_file(os.path.join(self.workdir,
entry['name']))
return all(self.dsc.verify_file(self.workdir / entry['name'])
for entry in self.dsc['Files'])
def verify_orig(self):
@ -477,8 +478,7 @@ class SourcePackage(ABC):
Return boolean
"""
orig_re = re.compile(r'.*\.orig(-[^.]+)?\.tar\.[^.]+$')
return all(self.dsc.verify_file(os.path.join(self.workdir,
entry['name']))
return all(self.dsc.verify_file(self.workdir / entry['name'])
for entry in self.dsc['Files']
if orig_re.match(entry['name']))
@ -488,7 +488,9 @@ class SourcePackage(ABC):
if destdir:
cmd.append(destdir)
Logger.debug(' '.join(cmd))
if subprocess.call(cmd, cwd=self.workdir):
result = subprocess.run(cmd, cwd=str(self.workdir), encoding='utf-8',
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
Logger.error('Source unpack failed.')
sys.exit(1)
@ -501,7 +503,7 @@ class SourcePackage(ABC):
difffn = newpkg.dsc_name[:-3] + 'debdiff'
Logger.debug(' '.join(cmd) + ('> %s' % difffn))
with open(difffn, 'w') as f:
if subprocess.call(cmd, stdout=f, cwd=self.workdir) > 2:
if subprocess.call(cmd, stdout=f, cwd=str(self.workdir)) > 2:
Logger.error('Debdiff failed.')
sys.exit(1)
if diffstat: