Address pylint complaints

Signed-off-by: Benjamin Drung <benjamin.drung@canonical.com>
This commit is contained in:
Benjamin Drung 2023-01-30 23:10:31 +01:00
parent 8692bc2b1c
commit b1bc7e1cdc
44 changed files with 337 additions and 239 deletions

View File

@ -324,7 +324,7 @@ def orig_needed(upload, workdir, pkg):
version = pkg.version.upstream_version
h = Http()
http = Http()
for filename in glob.glob(os.path.join(workdir, "%s_%s.orig*" % (pkg.source, version))):
url = "https://launchpad.net/~%s/+archive/%s/+sourcefiles/%s/%s/%s" % (
quote(user),
@ -334,7 +334,7 @@ def orig_needed(upload, workdir, pkg):
quote(os.path.basename(filename)),
)
try:
headers, body = h.request(url, "HEAD")
headers, body = http.request(url, "HEAD")
if headers.status != 200 or not headers["content-location"].startswith(
"https://launchpadlibrarian.net"
):

View File

@ -21,6 +21,9 @@
# this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import optparse
import os.path
import sys

View File

@ -22,6 +22,9 @@
# UDT_EDIT_WRAPPER_TEMPLATE_RE: An extra boilerplate-detecting regex.
# UDT_EDIT_WRAPPER_FILE_DESCRIPTION: The type of file being edited.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import optparse
import os
import re

View File

@ -19,6 +19,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import json
import optparse
import sys

View File

@ -21,6 +21,9 @@
#
# ##################################################################
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import argparse
import logging
import re

View File

@ -18,6 +18,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import sys
from debian.changelog import Changelog
@ -59,9 +62,9 @@ def merge_changelog(left_changelog, right_changelog):
right_blocks = iter(right_cl)
clist = sorted(left_versions | right_versions, reverse=True)
ci = len(clist)
remaining = len(clist)
for version in clist:
ci -= 1
remaining -= 1
if version in left_versions:
block = next(left_blocks)
if version in right_versions:
@ -71,7 +74,7 @@ def merge_changelog(left_changelog, right_changelog):
assert block.version == version
Logger.info(str(block).strip() + ("\n" if ci else ""))
Logger.info(str(block).strip() + ("\n" if remaining else ""))
def main():

View File

@ -29,6 +29,9 @@
# configurations. For example, a symlink called pbuilder-hardy will assume
# that the target distribution is always meant to be Ubuntu Hardy.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import os
import os.path
import shutil

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -17,6 +17,9 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import optparse
import sys

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -23,6 +23,9 @@
#
# ##################################################################
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -6,6 +6,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -6,6 +6,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -6,6 +6,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -6,6 +6,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -5,6 +5,9 @@
#
# See pull-pkg
# pylint: disable=invalid-name
# pylint: enable=invalid-name
from ubuntutools.pullpkg import PullPkg
if __name__ == "__main__":

View File

@ -14,6 +14,9 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import argparse
import sys

View File

@ -14,6 +14,9 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import collections
import gzip
import json
@ -35,21 +38,21 @@ def load_index(url):
and read it.
"""
cachedir = os.path.expanduser("~/.cache/ubuntu-dev-tools")
fn = os.path.join(cachedir, "seeded.json.gz")
seeded = os.path.join(cachedir, "seeded.json.gz")
if not os.path.isfile(fn) or time.time() - os.path.getmtime(fn) > 60 * 60 * 2:
if not os.path.isfile(seeded) or time.time() - os.path.getmtime(seeded) > 60 * 60 * 2:
if not os.path.isdir(cachedir):
os.makedirs(cachedir)
urllib.request.urlretrieve(url, fn)
urllib.request.urlretrieve(url, seeded)
try:
with gzip.open(fn, "r") as f:
with gzip.open(seeded, "r") as f:
return json.load(f)
except Exception as e:
Logger.error(
"Unable to parse seed data: %s. Deleting cached data, please try again.", str(e)
)
os.unlink(fn)
os.unlink(seeded)
def resolve_binaries(sources):

View File

@ -14,6 +14,9 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import logging
import optparse
import os

View File

@ -187,8 +187,8 @@ def submit_bugreport(body, debdiff, deb_version, changelog):
def check_reportbug_config():
fn = os.path.expanduser("~/.reportbugrc")
if os.path.exists(fn):
reportbugrc_filename = os.path.expanduser("~/.reportbugrc")
if os.path.exists(reportbugrc_filename):
return
email = ubu_email()[1]
reportbugrc = (
@ -210,7 +210,7 @@ no-cc
% email
)
with open(fn, "w") as f:
with open(reportbugrc_filename, "w") as f:
f.write(reportbugrc)
Logger.info(

View File

@ -22,6 +22,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import sys
from optparse import OptionGroup, OptionParser
@ -145,14 +148,14 @@ def main():
try:
package = str(args[0]).lower()
release = str(args[1]).lower()
op = str(args[2]).lower()
operation = str(args[2]).lower()
except IndexError:
opt_parser.print_help()
sys.exit(1)
# Check our operation.
if op not in ("rescore", "retry", "status"):
Logger.error("Invalid operation: %s." % op)
if operation not in ("rescore", "retry", "status"):
Logger.error("Invalid operation: %s." % operation)
sys.exit(1)
# If the user has specified an architecture to build, we only wish to
@ -199,9 +202,9 @@ def main():
# (retry) or buildd admins (rescore). Check if the proper permissions
# are in place.
me = PersonTeam.me
if op == "rescore":
if operation == "rescore":
necessary_privs = me.isLpTeamMember("launchpad-buildd-admins")
if op == "retry":
if operation == "retry":
necessary_privs = me.canUploadPackage(
ubuntu_archive,
distroseries,
@ -210,11 +213,11 @@ def main():
pocket=pocket,
)
if op in ("rescore", "retry") and not necessary_privs:
if operation in ("rescore", "retry") and not necessary_privs:
Logger.error(
"You cannot perform the %s operation on a %s "
"package as you do not have the permissions "
"to do this action." % (op, component)
"to do this action." % (operation, component)
)
sys.exit(1)
@ -235,7 +238,7 @@ def main():
done = True
Logger.info("%s: %s." % (build.arch_tag, build.buildstate))
if op == "rescore":
if operation == "rescore":
if build.can_be_rescored:
# FIXME: make priority an option
priority = 5000
@ -243,7 +246,7 @@ def main():
build.rescore(score=priority)
else:
Logger.info("Cannot rescore build on %s." % build.arch_tag)
if op == "retry":
if operation == "retry":
if build.can_be_retried:
Logger.info("Retrying build on %s..." % build.arch_tag)
build.retry()

View File

@ -20,6 +20,9 @@
#
# ##################################################################
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import optparse
import subprocess
import sys

View File

@ -14,6 +14,9 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import optparse
import sys

View File

@ -7,7 +7,7 @@ import logging
import sys
def getLogger():
def getLogger(): # pylint: disable=invalid-name
"""Get the logger instance for this module
Quick guide for using this or not: if you want to call ubuntutools

View File

@ -102,12 +102,12 @@ class Dsc(debian.deb822.Dsc):
def verify_file(self, pathname):
"Verify that pathname matches the checksums in the dsc"
p = Path(pathname)
if not p.is_file():
path = Path(pathname)
if not path.is_file():
return False
alg, checksums = self.get_strongest_checksum()
size, digest = checksums[p.name]
return verify_file_checksum(p, alg, digest, size)
size, digest = checksums[path.name]
return verify_file_checksum(path, alg, digest, size)
def compare_dsc(self, other):
"""Check whether any files in these two dscs that have the same name
@ -256,7 +256,7 @@ class SourcePackage(ABC):
# log binary lookup failure, in case it provides hints
Logger.info(str(bpnfe))
# raise the original exception for the source lookup
raise pnfe
raise pnfe from None
self.binary = self.source
self.source = bpph.getSourcePackageName()
@ -312,8 +312,8 @@ class SourcePackage(ABC):
if self._dsc_source:
raise RuntimeError("Internal error: we have a dsc file but dsc not set")
urls = self._source_urls(self.dsc_name)
with tempfile.TemporaryDirectory() as d:
tmpdsc = Path(d) / self.dsc_name
with tempfile.TemporaryDirectory() as tmpdir:
tmpdsc = Path(tmpdir) / self.dsc_name
self._download_file_from_urls(urls, tmpdsc)
self._dsc = Dsc(tmpdsc.read_bytes())
self._check_dsc_signature()
@ -401,35 +401,35 @@ class SourcePackage(ABC):
Logger.warning("Signature on %s could not be verified" % self.dsc_name)
def _verify_file(self, pathname, dscverify=False, sha1sum=None, sha256sum=None, size=0):
p = Path(pathname)
if not p.exists():
path = Path(pathname)
if not path.exists():
return False
if dscverify and not self.dsc.verify_file(p):
if dscverify and not self.dsc.verify_file(path):
return False
checksums = {}
if sha1sum:
checksums["SHA1"] = sha1sum
if sha256sum:
checksums["SHA256"] = sha256sum
if not verify_file_checksums(p, checksums, size):
if not verify_file_checksums(path, checksums, size):
return False
return True
def _download_file(self, url, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None):
"Download url to filename; will be put in workdir unless filename is absolute path."
if Path(filename).is_absolute():
p = Path(filename).expanduser().resolve()
path = Path(filename).expanduser().resolve()
else:
p = self.workdir / filename
path = self.workdir / filename
can_verify = any((dscverify, sha1sum, sha256sum))
if can_verify and self._verify_file(p, dscverify, sha1sum, sha256sum, size):
Logger.info(f"Using existing file {p}")
if can_verify and self._verify_file(path, dscverify, sha1sum, sha256sum, size):
Logger.info(f"Using existing file {path}")
return True
download(url, p, size)
download(url, path, size)
return self._verify_file(p, dscverify, sha1sum, sha256sum, size)
return self._verify_file(path, dscverify, sha1sum, sha256sum, size)
def _download_file_from_urls(
self, urls, filename, size=0, dscverify=False, sha1sum=None, sha256sum=None
@ -698,8 +698,8 @@ class PersonalPackageArchiveSourcePackage(UbuntuSourcePackage):
def team(self):
try:
return PersonTeam.fetch(self._teamname)
except KeyError:
raise ValueError(f"No user/team '{self._teamname}' found on Launchpad")
except KeyError as error:
raise ValueError(f"No user/team '{self._teamname}' found on Launchpad") from error
@functools.lru_cache()
def getArchive(self):
@ -823,9 +823,9 @@ class UbuntuCloudArchiveSourcePackage(PersonalPackageArchiveSourcePackage):
if not any((release, pocket)):
all_ppas = cls.getUbuntuCloudArchiveTeam().getPPAs()
ppas = []
for r in cls.getUbuntuCloudArchiveReleaseNames():
for p in cls.VALID_POCKETS:
name = f"{r}-{p}"
for ppa_release in cls.getUbuntuCloudArchiveReleaseNames():
for valid_pocket in cls.VALID_POCKETS:
name = f"{ppa_release}-{valid_pocket}"
if name in all_ppas:
ppas.append(all_ppas[name])
return ppas
@ -868,27 +868,27 @@ class UbuntuCloudArchiveSourcePackage(PersonalPackageArchiveSourcePackage):
release = release.lower().strip()
# Cases 1 and 2
PATTERN1 = r"^(?P<ucarelease>[a-z]+)(?:-(?P<pocket>[a-z]+))?$"
pattern1 = r"^(?P<ucarelease>[a-z]+)(?:-(?P<pocket>[a-z]+))?$"
# Cases 3 and 4
PATTERN2 = r"^(?P<ubunturelease>[a-z]+)-(?P<ucarelease>[a-z]+)(?:-(?P<pocket>[a-z]+))?$"
pattern2 = r"^(?P<ubunturelease>[a-z]+)-(?P<ucarelease>[a-z]+)(?:-(?P<pocket>[a-z]+))?$"
# Case 5
PATTERN3 = r"^(?P<ubunturelease>[a-z]+)-(?P<pocket>[a-z]+)/(?P<ucarelease>[a-z]+)$"
pattern3 = r"^(?P<ubunturelease>[a-z]+)-(?P<pocket>[a-z]+)/(?P<ucarelease>[a-z]+)$"
for pattern in [PATTERN1, PATTERN2, PATTERN3]:
for pattern in [pattern1, pattern2, pattern3]:
match = re.match(pattern, release)
if match:
r = match.group("ucarelease")
p = match.group("pocket")
uca_release = match.group("ucarelease")
pocket = match.group("pocket")
# For UCA, there is no 'release' pocket, the default is 'updates'
if p and p == "release":
if pocket and pocket == "release":
Logger.warning(
"Ubuntu Cloud Archive does not use 'release' pocket,"
" using 'updates' instead"
)
p = "updates"
if cls.isValidRelease(r) and (not p or p in cls.VALID_POCKETS):
Logger.debug(f"Using Ubuntu Cloud Archive release '{r}'")
return (r, p)
pocket = "updates"
if cls.isValidRelease(uca_release) and (not pocket or pocket in cls.VALID_POCKETS):
Logger.debug(f"Using Ubuntu Cloud Archive release '{uca_release}'")
return (uca_release, pocket)
raise SeriesNotFoundException(f"Ubuntu Cloud Archive release '{release}' not found")
@classmethod
@ -897,14 +897,14 @@ class UbuntuCloudArchiveSourcePackage(PersonalPackageArchiveSourcePackage):
raise SeriesNotFoundException(f"Ubuntu Cloud Archive release '{release}' not found")
if pocket and pocket not in cls.VALID_POCKETS:
raise PocketDoesNotExistError(f"Ubuntu Cloud Archive pocket '{pocket}' is invalid")
DEFAULT = tuple(
default = tuple(
cls.getUbuntuCloudArchivePPAs(release=release or cls.getDevelSeries())[0].name.split(
"-", maxsplit=1
)
)
if not package:
# not much we can do without a package name
return DEFAULT
return default
checked_pocket = False
for ppa in cls.getUbuntuCloudArchivePPAs(release=release):
if pocket and pocket != ppa.name.partition("-")[2]:
@ -918,10 +918,10 @@ class UbuntuCloudArchiveSourcePackage(PersonalPackageArchiveSourcePackage):
if version:
params["version"] = version
if ppa.getPublishedSources(**params):
(r, _, p) = ppa.name.partition("-")
return (r, p)
(ppa_release, _, ppa_pocket) = ppa.name.partition("-")
return (ppa_release, ppa_pocket)
# package/version not found in any ppa
return DEFAULT
return default
class _WebJSON(object):
@ -986,9 +986,9 @@ class _Snapshot(_WebJSON):
url = "/mr/package/{}/{}/srcfiles".format(name, version)
try:
response = self.load("{}?fileinfo=1".format(url))
except HTTPError:
except HTTPError as error:
msg = "Package {} version {} not found"
raise PackageNotFoundException(msg.format(name, version))
raise PackageNotFoundException(msg.format(name, version)) from error
result = response.get("result")
info = response.get("fileinfo")
if len(result) < 1:
@ -998,11 +998,11 @@ class _Snapshot(_WebJSON):
# this expects the 'component' to follow 'pool[-*]' in the path
found_pool = False
component = None
for s in path.split("/"):
for part in path.split("/"):
if found_pool:
component = s
component = part
break
if s.startswith("pool"):
if part.startswith("pool"):
found_pool = True
if not component:
Logger.warning("could not determine component from path %s" % path)
@ -1014,8 +1014,8 @@ class _Snapshot(_WebJSON):
def _get_package(self, name, url, pkginit, version, sort_key):
try:
results = self.load("/mr/{}/{}/".format(url, name))["result"]
except HTTPError:
raise PackageNotFoundException("Package {} not found.".format(name))
except HTTPError as error:
raise PackageNotFoundException("Package {} not found.".format(name)) from error
results = sorted(results, key=lambda r: r[sort_key], reverse=True)
results = [pkginit(r) for r in results if version == r["version"]]
@ -1168,7 +1168,7 @@ class SnapshotBinaryPackage(SnapshotPackage):
class SnapshotFile(object):
def __init__(self, pkg_name, pkg_version, component, obj, h):
def __init__(self, pkg_name, pkg_version, component, obj, h): # pylint: disable=invalid-name
self.package_name = pkg_name
self.package_version = pkg_version
self.component = component

View File

@ -2,5 +2,6 @@
# ubuntu-dev-tools Launchpad Python modules.
#
# pylint: disable=invalid-name
service = "production"
api_version = "devel"

View File

@ -242,9 +242,9 @@ class Distribution(BaseWrapper):
res = self._archives.get(archive)
if not res:
for a in self.archives:
if a.name == archive:
res = Archive(a)
for archive_ in self.archives:
if archive_.name == archive:
res = Archive(archive_)
self._archives[res.name] = res
break
@ -271,9 +271,9 @@ class Distribution(BaseWrapper):
try:
series = DistroSeries(self().getSeries(name_or_version=name_or_version))
except HTTPError:
except HTTPError as error:
message = "Release '%s' is unknown in '%s'." % (name_or_version, self.display_name)
raise SeriesNotFoundException(message)
raise SeriesNotFoundException(message) from error
self._cache_series(series)
return series
@ -293,9 +293,9 @@ class Distribution(BaseWrapper):
Returns a list of all DistroSeries objects.
"""
if not self._have_all_series:
for s in Launchpad.load(self.series_collection_link).entries:
series = DistroSeries(s["self_link"])
self._cache_series(series)
for series in Launchpad.load(self.series_collection_link).entries:
series_link = DistroSeries(series["self_link"])
self._cache_series(series_link)
self._have_all_series = True
allseries = filter(lambda s: s.active, self._series.values())
@ -346,9 +346,9 @@ class DistroSeries(BaseWrapper):
try:
architecture = DistroArchSeries(self().getDistroArchSeries(archtag=archtag))
self._architectures[architecture.architecture_tag] = architecture
except HTTPError:
except HTTPError as error:
message = "Architecture %s is unknown." % archtag
raise ArchSeriesNotFoundException(message)
raise ArchSeriesNotFoundException(message) from error
return self._architectures[archtag]
def getPackageUploads(self, name=None, pocket=None, version=None, status="Unapproved"):
@ -418,9 +418,9 @@ class PackageUpload(BaseWrapper):
urls = self.binaryFileUrls()
props = self.getBinaryProperties()
self._binary_prop_dict = dict(zip(urls, props))
for (k, v) in copy(self._binary_prop_dict).items():
filename = os.path.basename(urlparse(k).path)
self._binary_prop_dict[filename] = v
for (key, value) in copy(self._binary_prop_dict).items():
filename = os.path.basename(urlparse(key).path)
self._binary_prop_dict[filename] = value
return self._binary_prop_dict.get(filename_or_url, {})
@ -583,9 +583,9 @@ class Archive(BaseWrapper):
else:
pockets = tuple(pocket)
for p in pockets:
if p not in POCKETS:
raise PocketDoesNotExistError("Pocket '%s' does not exist." % p)
for pocket_ in pockets:
if pocket_ not in POCKETS:
raise PocketDoesNotExistError("Pocket '%s' does not exist." % pocket_)
if not status:
if version:
@ -599,9 +599,9 @@ class Archive(BaseWrapper):
else:
statuses = tuple(status)
for s in statuses:
if s not in STATUSES:
raise ValueError("Status '%s' is not valid." % s)
for status_ in statuses:
if status_ not in STATUSES:
raise ValueError("Status '%s' is not valid." % status_)
dist = Distribution(self.distribution_link)
@ -685,25 +685,25 @@ class Archive(BaseWrapper):
err_msg = "status %s not in (%s)" % (record.status, ",".join(statuses))
Logger.debug(skipmsg + err_msg)
continue
r = wrapper(record)
if binary and archtag and archtag != r.arch:
err_msg = "arch %s does not match requested arch %s" % (r.arch, archtag)
release = wrapper(record)
if binary and archtag and archtag != release.arch:
err_msg = "arch %s does not match requested arch %s" % (release.arch, archtag)
Logger.debug(skipmsg + err_msg)
continue
# results are ordered so first is latest
cache[index] = r
return r
cache[index] = release
return release
version_with_epoch = None
if version and version == Version(version).strip_epoch() and len(records) == 0:
# a specific version was asked for, but we found none;
# check if one exists with an epoch to give a hint in error msg
for epoch in range(1, 9):
v = Version(version)
v.epoch = epoch
params["version"] = v.full_version
version_ = Version(version)
version_.epoch = epoch
params["version"] = version_.full_version
if len(getattr(self, function)(**params)) > 0:
version_with_epoch = v.full_version
version_with_epoch = version_.full_version
Logger.debug("Found version with epoch %s" % version_with_epoch)
break
@ -957,12 +957,12 @@ class SourcePackagePublishingHistory(BaseWrapper):
Logger.warning(
"SPPH %s_%s has no sourceFileUrls" % (self.getPackageName(), self.getVersion())
)
for u in urls:
for url in urls:
# make sure mandatory fields are present
for field in ["url", "sha1", "sha256", "size"]:
if field not in u:
u[field] = None
u["filename"] = os.path.basename(urlparse(u["url"]).path)
if field not in url:
url[field] = None
url["filename"] = os.path.basename(urlparse(url["url"]).path)
self._source_urls = urls
if include_meta:
@ -1036,11 +1036,11 @@ class SourcePackagePublishingHistory(BaseWrapper):
if self.status in ["Pending", "Published"]:
# Published, great! Directly query the list of binaries
binaries = map(BinaryPackagePublishingHistory, self._lpobject.getPublishedBinaries())
for b in binaries:
a = b.arch
if a not in self._binaries:
self._binaries[a] = {}
self._binaries[a][b.binary_package_name] = b
for binary in binaries:
arch_ = binary.arch
if arch_ not in self._binaries:
self._binaries[arch_] = {}
self._binaries[arch_][binary.binary_package_name] = binary
else:
# we have to go the long way :(
Logger.info("Please wait, this may take some time...")
@ -1050,37 +1050,37 @@ class SourcePackagePublishingHistory(BaseWrapper):
# strip out the URL leading text.
filename = os.path.basename(urlparse(url).path)
# strip the file suffix
(pkgname, _, e) = filename.rpartition(".")
(pkgname, _, extension) = filename.rpartition(".")
# split into name, version, arch
(n, v, a) = pkgname.rsplit("_", 2)
(name_, _, arch_) = pkgname.rsplit("_", 2)
# arch 'all' has separate bpph for each real arch,
# but all point to the same binary url
if a == "all":
a = arch or host_architecture()
if arch_ == "all":
arch_ = arch or host_architecture()
# Only check the arch requested - saves time
if arch and arch != a:
if arch and arch != arch_:
continue
# Only check the name requested - saves time
if name and not re.match(name, n):
if name and not re.match(name, name_):
continue
# Only check the ext requested - saves time
if ext and not re.match(ext, e):
if ext and not re.match(ext, extension):
continue
# If we already have this BPPH, keep going
if a in self._binaries and n in self._binaries[a]:
if arch_ in self._binaries and name_ in self._binaries[arch_]:
continue
# we ignore the version, as it may be missing epoch
# also we can't use series, as some package versions
# span multiple series! (e.g. for different archs)
params = {"name": n, "archtag": a, "version": self.getVersion()}
params = {"name": name_, "archtag": arch_, "version": self.getVersion()}
try:
bpph = archive.getBinaryPackage(**params)
except PackageNotFoundException:
Logger.debug("Could not find pkg in archive: %s" % filename)
continue
if a not in self._binaries:
self._binaries[a] = {}
self._binaries[a][n] = bpph
if arch_ not in self._binaries:
self._binaries[arch_] = {}
self._binaries[arch_][name_] = bpph
if not arch:
bpphs = [b for a in self._binaries.values() for b in a.values()]
@ -1215,21 +1215,21 @@ class BinaryPackagePublishingHistory(BaseWrapper):
if not self._binary_urls:
try:
urls = self._lpobject.binaryFileUrls(include_meta=True)
except AttributeError:
except AttributeError as error:
raise AttributeError(
"binaryFileUrls can only be found in lpapi "
"devel, not 1.0. Login using devel to have it."
)
) from error
if not urls:
Logger.warning(
"BPPH %s_%s has no binaryFileUrls" % (self.getPackageName(), self.getVersion())
)
for u in urls:
for url in urls:
# make sure mandatory fields are present
for field in ["url", "sha1", "sha256", "size"]:
if field not in u:
u[field] = None
u["filename"] = os.path.basename(urlparse(u["url"]).path)
if field not in url:
url[field] = None
url["filename"] = os.path.basename(urlparse(url["url"]).path)
self._binary_urls = urls
if include_meta:
@ -1438,9 +1438,9 @@ class PersonTeam(BaseWrapper, metaclass=MetaPersonTeam):
if pocket not in POCKETS:
raise PocketDoesNotExistError("Pocket '%s' does not exist." % pocket)
canUpload = self._upload.get((archive, distroseries, pocket, package, component))
can_upload = self._upload.get((archive, distroseries, pocket, package, component))
if canUpload is None:
if can_upload is None:
# checkUpload() throws an exception if the person can't upload
try:
archive.checkUpload(
@ -1450,16 +1450,16 @@ class PersonTeam(BaseWrapper, metaclass=MetaPersonTeam):
pocket=pocket,
sourcepackagename=package,
)
canUpload = True
can_upload = True
except HTTPError as e:
if e.response.status == 403:
canUpload = False
can_upload = False
else:
raise e
index = (archive, distroseries, pocket, package, component)
self._upload[index] = canUpload
self._upload[index] = can_upload
return canUpload
return can_upload
def getPPAs(self):
if self._ppas is None:

View File

@ -52,7 +52,7 @@ UPLOAD_QUEUE_STATUSES = ("New", "Unapproved", "Accepted", "Done", "Rejected")
DOWNLOAD_BLOCKSIZE_DEFAULT = 8192
_system_distribution_chain = []
_SYSTEM_DISTRIBUTION_CHAIN = []
class DownloadError(Exception):
@ -74,11 +74,11 @@ def system_distribution_chain():
the distribution chain can't be determined, print an error message
and return an empty list.
"""
global _system_distribution_chain
if len(_system_distribution_chain) == 0:
global _SYSTEM_DISTRIBUTION_CHAIN
if len(_SYSTEM_DISTRIBUTION_CHAIN) == 0:
try:
vendor = check_output(("dpkg-vendor", "--query", "Vendor"), encoding="utf-8").strip()
_system_distribution_chain.append(vendor)
_SYSTEM_DISTRIBUTION_CHAIN.append(vendor)
except CalledProcessError:
Logger.error("Could not determine what distribution you are running.")
return []
@ -89,7 +89,7 @@ def system_distribution_chain():
(
"dpkg-vendor",
"--vendor",
_system_distribution_chain[-1],
_SYSTEM_DISTRIBUTION_CHAIN[-1],
"--query",
"Parent",
),
@ -98,9 +98,9 @@ def system_distribution_chain():
except CalledProcessError:
# Vendor has no parent
break
_system_distribution_chain.append(parent)
_SYSTEM_DISTRIBUTION_CHAIN.append(parent)
return _system_distribution_chain
return _SYSTEM_DISTRIBUTION_CHAIN
def system_distribution():
@ -138,16 +138,16 @@ def readlist(filename, uniq=True):
Read a list of words from the indicated file. If 'uniq' is True, filter
out duplicated words.
"""
p = Path(filename)
path = Path(filename)
if not p.is_file():
Logger.error(f"File {p} does not exist.")
if not path.is_file():
Logger.error(f"File {path} does not exist.")
return False
content = p.read_text().replace("\n", " ").replace(",", " ")
content = path.read_text().replace("\n", " ").replace(",", " ")
if not content.strip():
Logger.error(f"File {p} is empty.")
Logger.error(f"File {path} is empty.")
return False
items = [item for item in content.split() if item]
@ -234,29 +234,31 @@ def verify_file_checksums(pathname, checksums={}, size=0):
Returns True if all checks pass, False otherwise
"""
p = Path(pathname)
path = Path(pathname)
if not p.is_file():
Logger.error(f"File {p} not found")
if not path.is_file():
Logger.error(f"File {path} not found")
return False
filesize = p.stat().st_size
filesize = path.stat().st_size
if size and size != filesize:
Logger.error(f"File {p} incorrect size, got {filesize} expected {size}")
Logger.error(f"File {path} incorrect size, got {filesize} expected {size}")
return False
for (alg, checksum) in checksums.items():
h = hashlib.new(alg)
with p.open("rb") as f:
hash_ = hashlib.new(alg)
with path.open("rb") as f:
while True:
block = f.read(h.block_size)
block = f.read(hash_.block_size)
if len(block) == 0:
break
h.update(block)
digest = h.hexdigest()
hash_.update(block)
digest = hash_.hexdigest()
if digest == checksum:
Logger.debug(f"File {p} checksum ({alg}) verified: {checksum}")
Logger.debug(f"File {path} checksum ({alg}) verified: {checksum}")
else:
Logger.error(f"File {p} checksum ({alg}) mismatch: got {digest} expected {checksum}")
Logger.error(
f"File {path} checksum ({alg}) mismatch: got {digest} expected {checksum}"
)
return False
return True
@ -288,9 +290,13 @@ def extract_authentication(url):
This returns a tuple in the form (url, username, password)
"""
u = urlparse(url)
if u.username or u.password:
return (u._replace(netloc=u.hostname).geturl(), u.username, u.password)
components = urlparse(url)
if components.username or components.password:
return (
components._replace(netloc=components.hostname).geturl(),
components.username,
components.password,
)
return (url, None, None)
@ -339,21 +345,21 @@ def download(src, dst, size=0, *, blocksize=DOWNLOAD_BLOCKSIZE_DEFAULT):
(src, username, password) = extract_authentication(src)
auth = (username, password) if username or password else None
with tempfile.TemporaryDirectory() as d:
tmpdst = Path(d) / "dst"
with tempfile.TemporaryDirectory() as tmpdir:
tmpdst = Path(tmpdir) / "dst"
try:
with requests.get(src, stream=True, auth=auth) as fsrc, tmpdst.open("wb") as fdst:
fsrc.raise_for_status()
_download(fsrc, fdst, size, blocksize=blocksize)
except requests.exceptions.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
raise NotFoundError(f"URL {src} not found: {e}")
raise DownloadError(e)
except requests.exceptions.ConnectionError as e:
except requests.exceptions.HTTPError as error:
if error.response is not None and error.response.status_code == 404:
raise NotFoundError(f"URL {src} not found: {error}") from error
raise DownloadError(error) from error
except requests.exceptions.ConnectionError as error:
# This is likely a archive hostname that doesn't resolve, like 'ftpmaster.internal'
raise NotFoundError(f"URL {src} not found: {e}")
except requests.exceptions.RequestException as e:
raise DownloadError(e)
raise NotFoundError(f"URL {src} not found: {error}") from error
except requests.exceptions.RequestException as error:
raise DownloadError(error) from error
shutil.move(tmpdst, dst)
return dst
@ -440,8 +446,8 @@ def _download(fsrc, fdst, size, *, blocksize):
def _download_text(src, binary, *, blocksize):
with tempfile.TemporaryDirectory() as d:
dst = Path(d) / "dst"
with tempfile.TemporaryDirectory() as tmpdir:
dst = Path(tmpdir) / "dst"
download(src, dst, blocksize=blocksize)
return dst.read_bytes() if binary else dst.read_text()

View File

@ -107,21 +107,21 @@ class PullPkg(object):
unexpected errors will flow up to the caller.
On success, this simply returns.
"""
Logger = ubuntutools_getLogger()
logger = ubuntutools_getLogger()
try:
cls(*args, **kwargs).pull()
return
except KeyboardInterrupt:
Logger.info("User abort.")
logger.info("User abort.")
except (
PackageNotFoundException,
SeriesNotFoundException,
PocketDoesNotExistError,
InvalidDistroValueError,
InvalidPullValueError,
) as e:
Logger.error(str(e))
) as error:
logger.error(str(error))
sys.exit(errno.ENOENT)
def __init__(self, *args, **kwargs):
@ -275,12 +275,12 @@ class PullPkg(object):
if distro == DISTRO_PPA:
# PPAs are part of Ubuntu distribution
d = Distribution(DISTRO_UBUNTU)
distribution = Distribution(DISTRO_UBUNTU)
else:
d = Distribution(distro)
distribution = Distribution(distro)
# let SeriesNotFoundException flow up
d.getSeries(release)
distribution.getSeries(release)
Logger.debug("Using distro '%s' release '%s' pocket '%s'", distro, release, pocket)
return (release, pocket)
@ -340,12 +340,12 @@ class PullPkg(object):
params["package"] = options["package"]
if options["release"]:
(r, v, p) = self.parse_release_and_version(
(release, version, pocket) = self.parse_release_and_version(
distro, options["release"], options["version"]
)
params["series"] = r
params["version"] = v
params["pocket"] = p
params["series"] = release
params["version"] = version
params["pocket"] = pocket
if params["package"].endswith(".dsc") and not params["series"] and not params["version"]:
params["dscfile"] = params["package"]
@ -553,33 +553,33 @@ class PullPkg(object):
raise PackageNotFoundException(msg)
if pull == PULL_LIST:
for p in packages:
msg = "Found %s %s (ID %s)" % (p.package_name, p.package_version, p.id)
if p.display_arches:
msg += " arch %s" % p.display_arches
for pkg in packages:
msg = "Found %s %s (ID %s)" % (pkg.package_name, pkg.package_version, pkg.id)
if pkg.display_arches:
msg += " arch %s" % pkg.display_arches
Logger.info(msg)
url = p.changesFileUrl()
url = pkg.changesFileUrl()
if url:
Logger.info("Changes file:")
Logger.info(" %s", url)
else:
Logger.info("No changes file")
urls = p.sourceFileUrls()
urls = pkg.sourceFileUrls()
if urls:
Logger.info("Source files:")
for url in urls:
Logger.info(" %s", url)
else:
Logger.info("No source files")
urls = p.binaryFileUrls()
urls = pkg.binaryFileUrls()
if urls:
Logger.info("Binary files:")
for url in urls:
Logger.info(" %s", url)
Logger.info(" { %s }" % p.binaryFileProperties(url))
Logger.info(" { %s }" % pkg.binaryFileProperties(url))
else:
Logger.info("No binary files")
urls = p.customFileUrls()
urls = pkg.customFileUrls()
if urls:
Logger.info("Custom files:")
for url in urls:
@ -593,18 +593,18 @@ class PullPkg(object):
else:
msg += ", please specify the version"
Logger.error("Available package versions/ids are:")
for p in packages:
Logger.error("%s %s (id %s)" % (p.package_name, p.package_version, p.id))
for pkg in packages:
Logger.error("%s %s (id %s)" % (pkg.package_name, pkg.package_version, pkg.id))
raise PackageNotFoundException(msg)
p = packages[0]
pkg = packages[0]
urls = set(p.customFileUrls())
if p.changesFileUrl():
urls.add(p.changesFileUrl())
urls = set(pkg.customFileUrls())
if pkg.changesFileUrl():
urls.add(pkg.changesFileUrl())
if pull == PULL_SOURCE:
urls |= set(p.sourceFileUrls())
urls |= set(pkg.sourceFileUrls())
if not urls:
Logger.error("No source files to download")
dscfile = None
@ -636,7 +636,7 @@ class PullPkg(object):
else:
raise InvalidPullValueError("Invalid pull value %s" % pull)
urls |= set(p.binaryFileUrls())
urls |= set(pkg.binaryFileUrls())
if not urls:
Logger.error("No binary files to download")
for url in urls:

View File

@ -149,9 +149,9 @@ def get_ubuntu_delta_changelog(srcpkg):
changes = Changes(Http().request(changes_url)[1])
for line in changes["Changes"].splitlines():
line = line[1:]
m = topline.match(line)
if m:
distribution = m.group(3).split()[0].split("-")[0]
match = topline.match(line)
if match:
distribution = match.group(3).split()[0].split("-")[0]
if debian_info.valid(distribution):
break
if line.startswith(" "):

View File

@ -207,9 +207,9 @@ Content-Type: text/plain; charset=UTF-8
while True:
try:
Logger.info("Connecting to %s:%s ...", mailserver_host, mailserver_port)
s = smtplib.SMTP(mailserver_host, mailserver_port)
smtp = smtplib.SMTP(mailserver_host, mailserver_port)
break
except smtplib.SMTPConnectError as s:
except smtplib.SMTPConnectError as error:
try:
# py2 path
# pylint: disable=unsubscriptable-object
@ -217,8 +217,8 @@ Content-Type: text/plain; charset=UTF-8
"Could not connect to %s:%s: %s (%i)",
mailserver_host,
mailserver_port,
s[1],
s[0],
error[1],
error[0],
)
except TypeError:
# pylint: disable=no-member
@ -226,15 +226,15 @@ Content-Type: text/plain; charset=UTF-8
"Could not connect to %s:%s: %s (%i)",
mailserver_host,
mailserver_port,
s.strerror,
s.errno,
error.strerror,
error.errno,
)
if s.smtp_code == 421:
if error.smtp_code == 421:
confirmation_prompt(
message="This is a temporary error, press [Enter] "
"to retry. Press [Ctrl-C] to abort now."
)
except socket.error as s:
except socket.error as error:
try:
# py2 path
# pylint: disable=unsubscriptable-object
@ -242,8 +242,8 @@ Content-Type: text/plain; charset=UTF-8
"Could not connect to %s:%s: %s (%i)",
mailserver_host,
mailserver_port,
s[1],
s[0],
error[1],
error[0],
)
except TypeError:
# pylint: disable=no-member
@ -251,27 +251,27 @@ Content-Type: text/plain; charset=UTF-8
"Could not connect to %s:%s: %s (%i)",
mailserver_host,
mailserver_port,
s.strerror,
s.errno,
error.strerror,
error.errno,
)
return
if mailserver_user and mailserver_pass:
try:
s.login(mailserver_user, mailserver_pass)
smtp.login(mailserver_user, mailserver_pass)
except smtplib.SMTPAuthenticationError:
Logger.error("Error authenticating to the server: invalid username and password.")
s.quit()
smtp.quit()
return
except smtplib.SMTPException:
Logger.error("Unknown SMTP error.")
s.quit()
smtp.quit()
return
while True:
try:
s.sendmail(myemailaddr, to, mail.encode("utf-8"))
s.quit()
smtp.sendmail(myemailaddr, to, mail.encode("utf-8"))
smtp.quit()
os.remove(backup.name)
Logger.info("Sync request mailed.")
break
@ -285,8 +285,8 @@ Content-Type: text/plain; charset=UTF-8
)
else:
return
except smtplib.SMTPResponseException as e:
Logger.error("Error while sending: %i, %s", e.smtp_code, e.smtp_error)
except smtplib.SMTPResponseException as error:
Logger.error("Error while sending: %i, %s", error.smtp_code, error.smtp_error)
return
except smtplib.SMTPServerDisconnected:
Logger.error("Server disconnected while sending the mail.")

View File

@ -57,11 +57,11 @@ class ExamplePackage(object):
return "my content"
def create(self):
with tempfile.TemporaryDirectory() as d:
self._create(Path(d))
with tempfile.TemporaryDirectory() as tmpdir:
self._create(Path(tmpdir))
def _create(self, d):
pkgdir = d / self.dirname
def _create(self, directory: Path):
pkgdir = directory / self.dirname
pkgdir.mkdir()
(pkgdir / self.content_filename).write_text(self.content_text)
@ -80,13 +80,13 @@ class ExamplePackage(object):
f"dpkg-source -b {self.dirname}".split(),
check=True,
env=self.env,
cwd=str(d),
cwd=str(directory),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# move tarballs and dsc to destdir
self.destdir.mkdir(parents=True, exist_ok=True)
(d / self.orig.name).rename(self.orig)
(d / self.debian.name).rename(self.debian)
(d / self.dsc.name).rename(self.dsc)
(directory / self.orig.name).rename(self.orig)
(directory / self.debian.name).rename(self.debian)
(directory / self.dsc.name).rename(self.dsc)

View File

@ -26,9 +26,9 @@ from ubuntutools.test.example_package import ExamplePackage
class BaseVerificationTestCase(unittest.TestCase):
def setUp(self):
d = tempfile.TemporaryDirectory()
self.addCleanup(d.cleanup)
self.pkg = ExamplePackage(destdir=Path(d.name))
tmpdir = tempfile.TemporaryDirectory()
self.addCleanup(tmpdir.cleanup)
self.pkg = ExamplePackage(destdir=Path(tmpdir.name))
self.pkg.create()
self.dsc = ubuntutools.archive.Dsc(self.pkg.dsc.read_bytes())
@ -65,9 +65,9 @@ class LocalSourcePackageTestCase(BaseVerificationTestCase):
def setUp(self):
super().setUp()
d = tempfile.TemporaryDirectory()
self.addCleanup(d.cleanup)
self.workdir = Path(d.name)
tmpdir = tempfile.TemporaryDirectory()
self.addCleanup(tmpdir.cleanup)
self.workdir = Path(tmpdir.name)
def pull(self, **kwargs):
"""Do the pull from pkg dir to the workdir, return the SourcePackage"""

View File

@ -40,9 +40,9 @@ class ConfigTestCase(unittest.TestCase):
def setUp(self):
super(ConfigTestCase, self).setUp()
m = mock.mock_open()
m.side_effect = self._fake_open
patcher = mock.patch("builtins.open", m)
open_mock = mock.mock_open()
open_mock.side_effect = self._fake_open
patcher = mock.patch("builtins.open", open_mock)
self.addCleanup(patcher.stop)
patcher.start()
@ -230,8 +230,6 @@ class UbuEmailTestCase(unittest.TestCase):
try:
os.environ["DEBFULLNAME"] = env_name
except UnicodeEncodeError:
raise unittest.SkipTest(
"python interpreter is not running in an unicode capable locale"
)
self.skipTest("python interpreter is not running in an unicode capable locale")
os.environ["DEBEMAIL"] = email = "joe@example.net"
self.assertEqual(ubu_email(), (name, email))

View File

@ -14,6 +14,9 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# pylint: disable=invalid-name
# pylint: enable=invalid-name
import optparse
import os
import sys