Fix warnings found by pylint

Signed-off-by: Benjamin Drung <benjamin.drung@canonical.com>
Benjamin Drung 2023-01-31 15:51:29 +01:00
parent 9fa29f6ad5
commit 4449cf2437
22 changed files with 96 additions and 99 deletions

View File

@@ -338,7 +338,7 @@ def orig_needed(upload, workdir, pkg):
quote(os.path.basename(filename)),
)
try:
headers, body = http.request(url, "HEAD")
headers = http.request(url, "HEAD")[0]
if headers.status != 200 or not headers["content-location"].startswith(
"https://launchpadlibrarian.net"
):
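
Note: the change above keeps only the response object from httplib2's (response, content) tuple, which silences pylint's unused-variable warning for the discarded body. A minimal sketch of the same pattern, with a hypothetical helper name:

import httplib2

def orig_is_on_librarian(url):
    """Hypothetical helper: check a URL via HEAD without reading the body."""
    # Indexing [0] keeps the headers and discards the unused body.
    headers = httplib2.Http().request(url, "HEAD")[0]
    return headers.status == 200 and headers.get(
        "content-location", ""
    ).startswith("https://launchpadlibrarian.net")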

View File

@@ -145,7 +145,7 @@ def main():
sys.exit(1)
# get build dependencies from debian/control
control = apt.apt_pkg.TagFile(open("debian/control"))
control = apt.apt_pkg.TagFile(open("debian/control", encoding="utf-8"))
next(control)
unsupported_build_deps = check_build_dependencies(apt_cache, control)

View File

@@ -51,9 +51,9 @@ Debian release of the package.
def merge_changelog(left_changelog, right_changelog):
"""Merge a changelog file."""
with open(left_changelog) as f:
with open(left_changelog, encoding="utf-8") as f:
left_cl = Changelog(f)
with open(right_changelog) as f:
with open(right_changelog, encoding="utf-8") as f:
right_cl = Changelog(f)
left_versions = set(left_cl.versions)
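
Note: the encoding additions in this and the previous hunk address pylint's unspecified-encoding warning (W1514): without an explicit encoding, open() falls back to the locale's preferred encoding, which varies between systems. A short sketch of the recommended pattern, using a hypothetical path:

changelog_path = "debian/changelog"  # hypothetical example file

# Flagged: the resulting encoding depends on the current locale.
# content = open(changelog_path).read()

# Preferred: name the encoding and let the context manager close the file.
with open(changelog_path, encoding="utf-8") as changelog_file:
    content = changelog_file.read()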

View File

@@ -220,7 +220,7 @@ class PbuilderDist(object):
if self.operation == "build":
dsc_files = [a for a in remaining_arguments if a.strip().endswith(".dsc")]
assert len(dsc_files) == 1
dsc = debian.deb822.Dsc(open(dsc_files[0]))
dsc = debian.deb822.Dsc(open(dsc_files[0], encoding="utf-8"))
version = ubuntutools.version.Version(dsc["Version"])
name = (
dsc["Source"]
@@ -462,7 +462,10 @@ def main():
requested_arch = parts[2]
elif len(args) > 0:
if shutil.which("arch-test") is not None:
if subprocess.run(["arch-test", args[0]], stdout=subprocess.DEVNULL).returncode == 0:
arch_test = subprocess.run(
["arch-test", args[0]], check=False, stdout=subprocess.DEVNULL
)
if arch_test.returncode == 0:
requested_arch = args.pop(0)
elif os.path.isdir("/usr/lib/arch-test") and args[0] in os.listdir(
"/usr/lib/arch-test/"

View File

@@ -37,7 +37,7 @@ def previous_version(package, version, distance):
"Given an (extracted) package, determine the version distance versions ago"
upver = Version(version).upstream_version
filename = "%s-%s/debian/changelog" % (package, upver)
changelog_file = open(filename, "r")
changelog_file = open(filename, "r", encoding="utf-8")
changelog = debian.changelog.Changelog(changelog_file.read())
changelog_file.close()
seen = 0

View File

@@ -113,7 +113,7 @@ def find_rdepends(releases, published_binaries):
# We want to display every published binary, even if it has no rdepends
for binpkg in published_binaries:
intermediate[binpkg]
intermediate[binpkg] # pylint: disable=pointless-statement
for arch in ("any", "source"):
for release in releases:
@@ -261,7 +261,7 @@ def request_backport(package_spph, source, destinations):
series = distro.getSeries(dest)
try:
bug.addTask(target=series.getSourcePackage(name=pkgname))
except Exception:
except Exception: # pylint: disable=broad-except
break
Logger.info("Backport request filed as %s", bug.web_link)

View File

@@ -48,7 +48,7 @@ def load_index(url):
try:
with gzip.open(seeded, "r") as f:
return json.load(f)
except Exception as e:
except Exception as e: # pylint: disable=broad-except
Logger.error(
"Unable to parse seed data: %s. Deleting cached data, please try again.", str(e)
)
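
Note: the broad except Exception here is deliberate (any failure to parse cached seed data should trigger a re-download), so the warning is suppressed with an inline comment that applies only to that line rather than in the project-wide configuration. A generic sketch of the same scoping:

import json

def load_cached(path):
    """Hypothetical helper: return parsed cache content, or None if unusable."""
    try:
        with open(path, encoding="utf-8") as cached:
            return json.load(cached)
    except Exception as error:  # pylint: disable=broad-except
        # Intentionally broad: a corrupt or unreadable cache is always recoverable.
        print(f"Ignoring unusable cache {path}: {error}")
        return None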

View File

@@ -116,6 +116,7 @@ def gen_debdiff(tmpdir, changelog):
with open(debdiff, "w", encoding="utf-8") as debdiff_f:
run(
["filterdiff", "-x", "*changelog*"],
check=False,
stdin=diff.stdout,
stdout=debdiff_f,
encoding="utf-8",
@@ -161,7 +162,7 @@ def submit_bugreport(body, debdiff, deb_version, changelog):
cfgfile = os.path.expanduser(cfgfile)
if not os.path.exists(cfgfile):
continue
with open(cfgfile, "r") as f:
with open(cfgfile, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line in ("gnus", "mutt", "nmh") or line.startswith("mua "):
@@ -215,7 +216,7 @@ no-cc
% email
)
with open(reportbugrc_filename, "w") as f:
with open(reportbugrc_filename, "w", encoding="utf-8") as f:
f.write(reportbugrc)
Logger.info(
@@ -253,7 +254,7 @@ def main():
changelog_file = check_file("debian/changelog", critical=False) or check_file(
"../debian/changelog"
)
with open(changelog_file) as f:
with open(changelog_file, encoding="utf-8") as f:
changelog = Changelog(f.read())
deb_version = get_most_recent_debian_version(changelog)

View File

@@ -35,7 +35,7 @@ Logger = getLogger()
def extract(iso, path):
command = ["isoinfo", "-R", "-i", iso, "-x", path]
pipe = subprocess.run(
command, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.PIPE
command, check=False, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if pipe.returncode != 0:

View File

@@ -151,7 +151,7 @@ class SourcePackage(ABC):
def spph_class(self):
return SourcePackagePublishingHistory
def __init__(self, package=None, version=None, component=None, *args, **kwargs):
def __init__(self, package=None, version=None, component=None, **kwargs):
"""Can be initialised using either package or dscfile.
If package is specified, either the version or series can also be
specified; using version will get the specific package version,
@@ -526,6 +526,7 @@ class SourcePackage(ABC):
Logger.debug(" ".join(cmd))
result = subprocess.run(
cmd,
check=False,
cwd=str(self.workdir),
encoding="utf-8",
stdout=subprocess.PIPE,
@@ -543,7 +544,7 @@ class SourcePackage(ABC):
cmd = ["debdiff", self.dsc_name, newpkg.dsc_name]
difffn = newpkg.dsc_name[:-3] + "debdiff"
Logger.debug("%s > %s", " ".join(cmd), difffn)
with open(difffn, "w") as f:
with open(difffn, "w", encoding="utf-8") as f:
if subprocess.call(cmd, stdout=f, cwd=str(self.workdir)) > 2:
Logger.error("Debdiff failed.")
sys.exit(1)
@@ -692,7 +693,7 @@ class PersonalPackageArchiveSourcePackage(UbuntuSourcePackage):
self.masters = []
@property
@functools.lru_cache(maxsize=None)
@functools.lru_cache()
def team(self):
try:
return PersonTeam.fetch(self._teamname)
@@ -1033,9 +1034,7 @@ class _Snapshot(_WebJSON):
return self.getSourcePackages(name, version)[0]
def getBinaryPackages(self, name, version):
return self._get_package(
name, "binary", lambda obj: SnapshotBinaryPackage(obj), version, "binary_version"
)
return self._get_package(name, "binary", SnapshotBinaryPackage, version, "binary_version")
def getBinaryPackage(self, name, version):
return self.getBinaryPackages(name, version)[0]
@@ -1121,10 +1120,6 @@ class SnapshotSourcePackage(SnapshotPackage):
class SnapshotBinaryPackage(SnapshotPackage):
def __init__(self, obj):
# obj required fields: 'version', 'binary_version', 'name', 'source'
super(SnapshotBinaryPackage, self).__init__(obj)
@property
def name(self):
return self._obj["name"]
@@ -1176,7 +1171,6 @@ class SnapshotFile(object):
self._obj = obj
self._hash = h
@property
def getType(self):
return None
@@ -1221,9 +1215,6 @@ class SnapshotFile(object):
class SnapshotSourceFile(SnapshotFile):
def __init__(self, name, version, component, obj, h):
super(SnapshotSourceFile, self).__init__(name, version, component, obj, h)
def getType(self):
return "source"
@@ -1318,7 +1309,7 @@ class SnapshotSPPH(object):
return f["sha1"]
return None
def sourceFileSha256(self, url_or_filename):
def sourceFileSha256(self, url_or_filename): # pylint: disable=unused-argument
return None
def sourceFileSize(self, url_or_filename):
@@ -1448,7 +1439,7 @@ class SnapshotBPPH(object):
return self._file.getHash()
return None
def binaryFileSha256(self, url_or_filename):
def binaryFileSha256(self, url_or_filename): # pylint: disable=unused-argument
return None
def binaryFileSize(self, url_or_filename):

View File

@@ -150,7 +150,7 @@ class Sbuild(Builder):
def update(self, dist):
cmd = ["schroot", "--list"]
Logger.debug(" ".join(cmd))
process = subprocess.run(cmd, stdout=subprocess.PIPE, encoding="utf-8")
process = subprocess.run(cmd, check=False, stdout=subprocess.PIPE, encoding="utf-8")
chroots, _ = process.stdout.strip().split()
if process.returncode != 0:
return process.returncode
@@ -182,9 +182,9 @@ class Sbuild(Builder):
_SUPPORTED_BUILDERS = {
"cowbuilder": lambda: Pbuilder("cowbuilder"),
"cowbuilder-dist": lambda: Pbuilderdist("cowbuilder-dist"),
"pbuilder": lambda: Pbuilder(),
"pbuilder-dist": lambda: Pbuilderdist(),
"sbuild": lambda: Sbuild(),
"pbuilder": Pbuilder,
"pbuilder-dist": Pbuilderdist,
"sbuild": Sbuild,
}
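
Note: swapping lambda: Pbuilder() for Pbuilder fixes pylint's unnecessary-lambda warning (W0108): a class is already a zero-argument callable that returns an instance, so the wrapper adds nothing. The cowbuilder entries keep their lambdas because they pass a constructor argument. A self-contained sketch with placeholder classes:

class Pbuilder:
    def __init__(self, name="pbuilder"):
        self.name = name

class Sbuild:
    def __init__(self):
        self.name = "sbuild"

_SUPPORTED_BUILDERS = {
    "cowbuilder": lambda: Pbuilder("cowbuilder"),  # lambda still needed: passes an argument
    "pbuilder": Pbuilder,                          # the class itself is the factory
    "sbuild": Sbuild,
}

builder = _SUPPORTED_BUILDERS["sbuild"]()  # call sites are unchanged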

View File

@@ -68,7 +68,7 @@ class UDTConfig(object):
config = {}
for filename in ("/etc/devscripts.conf", "~/.devscripts"):
try:
f = open(os.path.expanduser(filename), "r")
f = open(os.path.expanduser(filename), "r", encoding="utf-8")
except IOError:
continue
for line in f:
@@ -179,7 +179,7 @@ def ubu_email(name=None, email=None, export=True):
if not email:
mailname = socket.getfqdn()
if os.path.isfile("/etc/mailname"):
mailname = open("/etc/mailname", "r").read().strip()
mailname = open("/etc/mailname", "r", encoding="utf-8").read().strip()
email = pwd.getpwuid(os.getuid()).pw_name + "@" + mailname
if export:
@@ -187,7 +187,7 @@
os.environ["DEBEMAIL"] = email
# decode env var or gecos raw string with the current locale's encoding
encoding = locale.getdefaultlocale()[1]
encoding = locale.getlocale()[1]
if not encoding:
encoding = "utf-8"
if name and isinstance(name, bytes):
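
Note: locale.getdefaultlocale() has been deprecated since Python 3.11, which pylint flags as a deprecated method; locale.getlocale() provides a comparable (language, encoding) pair for the current locale. A short sketch of the fallback kept above:

import locale

# getlocale() can return (None, None) in the C/POSIX locale,
# so the explicit UTF-8 fallback keeps decoding deterministic.
encoding = locale.getlocale()[1] or "utf-8"
print(b"J\xc3\xb6e".decode(encoding, errors="replace"))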

View File

@@ -2,6 +2,5 @@
# ubuntu-dev-tools Launchpad Python modules.
#
# pylint: disable=invalid-name
service = "production"
api_version = "devel"
SERVICE = "production"
API_VERSION = "devel"
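
Note: renaming service and api_version to SERVICE and API_VERSION satisfies pylint's invalid-name check (C0103), which expects module-level constants in UPPER_CASE; the import in the next file is adjusted to match. A trivial sketch of the convention:

SERVICE = "production"   # module-level constants are UPPER_CASE
API_VERSION = "devel"

def login(service=SERVICE, api_version=API_VERSION):
    """Keyword defaults can still reference the renamed constants."""
    return f"{service}/{api_version}"

print(login())  # production/devel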

View File

@@ -34,7 +34,7 @@ from launchpadlib.errors import HTTPError
from launchpadlib.launchpad import Launchpad as LP
from lazr.restfulclient.resource import Entry
from ubuntutools.lp import api_version, service
from ubuntutools.lp import API_VERSION, SERVICE
from ubuntutools.lp.udtexceptions import (
AlreadyLoggedInError,
ArchiveNotFoundException,
@@ -76,7 +76,9 @@ __all__ = [
class _Launchpad(object):
"""Singleton for LP API access."""
def login(self, service=service, api_version=api_version):
__lp = None
def login(self, service=SERVICE, api_version=API_VERSION):
"""Enforce a non-anonymous login."""
if not self.logged_in:
self.__lp = LP.login_with("ubuntu-dev-tools", service, version=api_version)
@@ -85,11 +87,11 @@ class _Launchpad(object):
# are valid; which can lead to this 'login' not actually
# logging in.
# So, this forces actual LP access here, to force actual login.
self.__lp.me
self.__lp.me # pylint: disable=pointless-statement
else:
raise AlreadyLoggedInError("Already logged in to Launchpad.")
def login_anonymously(self, service=service, api_version=api_version):
def login_anonymously(self, service=SERVICE, api_version=API_VERSION):
"""Enforce an anonymous login."""
if not self.logged_in:
self.__lp = LP.login_anonymously("ubuntu-dev-tools", service, version=api_version)
@@ -201,12 +203,13 @@ class Distribution(BaseWrapper):
resource_type = "distribution"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
self._archives = dict()
self._series_by_name = dict()
self._series = dict()
self._dev_series = None
self._have_all_series = False
self._main_archive = None
def cache(self):
self._cache[self.name] = self
@@ -254,7 +257,7 @@ class Distribution(BaseWrapper):
message = "The Archive '%s' doesn't exist in %s" % (archive, self.display_name)
raise ArchiveNotFoundException(message)
else:
if "_main_archive" not in self.__dict__:
if self._main_archive is None:
self._main_archive = Archive(self.main_archive_link)
return self._main_archive
@@ -288,7 +291,7 @@
self._dev_series = series
return self._dev_series
def getAllSeries(self, active=True):
def getAllSeries(self, active=True): # pylint: disable=unused-argument
"""
Returns a list of all DistroSeries objects.
"""
@@ -328,7 +331,7 @@ class DistroSeries(BaseWrapper):
resource_type = "distro_series"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
if "_architectures" not in self.__dict__:
self._architectures = dict()
@@ -372,7 +375,7 @@ class PackageUpload(BaseWrapper):
resource_type = "package_upload"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
self._custom_urls = None
self._source_urls = None
self._binary_urls = None
@@ -431,7 +434,7 @@ class Archive(BaseWrapper):
resource_type = "archive"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
self._binpkgs = {}
self._srcpkgs = {}
self._pkg_uploaders = {}
@@ -757,7 +760,7 @@ class Archive(BaseWrapper):
immediately if the copy passes basic security checks and the copy
will happen sometime later with full checking.
"""
# pylint: disable=protected-access
if isinstance(sponsored, PersonTeam):
sponsored = sponsored._lpobject
@@ -812,6 +815,7 @@
self._pkgset_uploaders[key] = sorted(
set(
PersonTeam(permission.person_link)
# pylint: disable=protected-access
for permission in self._lpobject.getUploadersForPackageset(
packageset=packageset._lpobject, direct_permissions=direct_permissions
)
@@ -838,7 +842,7 @@ class SourcePackagePublishingHistory(BaseWrapper):
resource_type = "source_package_publishing_history"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
self._archive = None
self._changelog = None
self._binaries = {}
@@ -1158,7 +1162,7 @@ class BinaryPackagePublishingHistory(BaseWrapper):
resource_type = "binary_package_publishing_history"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
self._arch = None
self._ext = None
self._binary_urls = None
@@ -1386,7 +1390,7 @@ class PersonTeam(BaseWrapper, metaclass=MetaPersonTeam):
resource_type = ("person", "team")
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
# Don't share _upload between different PersonTeams
self._ppas = None
if "_upload" not in self.__dict__:
@@ -1482,7 +1486,7 @@ class Project(BaseWrapper):
resource_type = "project"
def __init__(self, *args):
def __init__(self, *args): # pylint: disable=unused-argument
self._series = None
@property
@@ -1580,7 +1584,7 @@ class Packageset(BaseWrapper):
if key not in cls._source_sets:
params = {"sourcepackagename": sourcepackagename, "direct_inclusion": direct_inclusion}
if distroseries is not None:
params["distroseries"] = distroseries._lpobject
params["distroseries"] = distroseries._lpobject # pylint: disable=protected-access
cls._source_sets[key] = [
Packageset(packageset)

View File

@@ -1,40 +1,26 @@
class PackageNotFoundException(BaseException):
"""Thrown when a package is not found"""
pass
class SeriesNotFoundException(BaseException):
"""Thrown when a distroseries is not found"""
pass
class PocketDoesNotExistError(Exception):
"""Raised when a invalid pocket is used."""
pass
class ArchiveNotFoundException(BaseException):
"""Thrown when an archive for a distibution is not found"""
pass
class AlreadyLoggedInError(Exception):
"""Raised when a second login is attempted."""
pass
class ArchSeriesNotFoundException(BaseException):
"""Thrown when a distroarchseries is not found."""
pass
class InvalidDistroValueError(ValueError):
"""Thrown when distro value is invalid"""
pass

View File

@@ -57,12 +57,10 @@ _SYSTEM_DISTRIBUTION_CHAIN: list[str] = []
class DownloadError(Exception):
"Unable to pull a source package"
pass
class NotFoundError(DownloadError):
"Source package not found"
pass
def system_distribution_chain():
@@ -74,7 +72,6 @@ def system_distribution_chain():
the distribution chain can't be determined, print an error message
and return an empty list.
"""
global _SYSTEM_DISTRIBUTION_CHAIN
if len(_SYSTEM_DISTRIBUTION_CHAIN) == 0:
try:
vendor = check_output(("dpkg-vendor", "--query", "Vendor"), encoding="utf-8").strip()
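
Note: dropping the global statement resolves pylint's global-statement warning (W0603): _SYSTEM_DISTRIBUTION_CHAIN is only mutated in place (appended to), never rebound to a new object, so the declaration was unnecessary. A minimal sketch of the distinction:

_CACHE: list[str] = []

def fill_in_place(value):
    _CACHE.append(value)  # mutating the existing list needs no global declaration

def rebind(value):
    global _CACHE         # rebinding the name is what requires global
    _CACHE = [value]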
@@ -144,7 +141,7 @@ def readlist(filename, uniq=True):
Logger.error("File %s does not exist.", path)
return False
content = path.read_text().replace("\n", " ").replace(",", " ")
content = path.read_text(encoding="utf-8").replace("\n", " ").replace(",", " ")
if not content.strip():
Logger.error("File %s is empty.", path)
@@ -220,7 +217,7 @@ def codename_to_distribution(codename):
return distro
def verify_file_checksums(pathname, checksums={}, size=0):
def verify_file_checksums(pathname, checksums=None, size=0):
"""verify checksums of file
Any failure will log an error.
@@ -234,6 +231,8 @@ def verify_file_checksums(pathname, checksums={}, size=0):
Returns True if all checks pass, False otherwise
"""
if checksums is None:
checksums = {}
path = Path(pathname)
if not path.is_file():
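
Note: changing checksums={} to a None sentinel fixes pylint's dangerous-default-value warning (W0102): default values are evaluated once, at function definition time, so a mutable default is silently shared across calls. A self-contained demonstration:

def buggy_append(item, bucket=[]):   # the flagged form: one list object, reused by every call
    bucket.append(item)
    return bucket

def safe_append(item, bucket=None):  # None sentinel: a fresh list per call
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

print(buggy_append(1), buggy_append(2))  # [1, 2] [1, 2] - state leaks between calls
print(safe_append(1), safe_append(2))    # [1] [2]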
@@ -348,9 +347,10 @@ def download(src, dst, size=0, *, blocksize=DOWNLOAD_BLOCKSIZE_DEFAULT):
with tempfile.TemporaryDirectory() as tmpdir:
tmpdst = Path(tmpdir) / "dst"
try:
with requests.get(src, stream=True, auth=auth) as fsrc, tmpdst.open("wb") as fdst:
fsrc.raise_for_status()
_download(fsrc, fdst, size, blocksize=blocksize)
with requests.get(src, stream=True, timeout=60, auth=auth) as fsrc:
with tmpdst.open("wb") as fdst:
fsrc.raise_for_status()
_download(fsrc, fdst, size, blocksize=blocksize)
except requests.exceptions.HTTPError as error:
if error.response is not None and error.response.status_code == 404:
raise NotFoundError(f"URL {src} not found: {error}") from error
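
Note: adding timeout=60 addresses pylint's missing-timeout warning (W3101): requests.get() otherwise waits indefinitely for a stalled server; the switch to nested with statements appears to be a line-length accommodation once the extra argument is added. A hedged sketch with a placeholder URL:

import requests

URL = "https://example.org/some-file"  # placeholder

try:
    # The timeout bounds the connection and each read, not the whole transfer.
    with requests.get(URL, stream=True, timeout=60) as response:
        response.raise_for_status()
        first_chunk = next(response.iter_content(chunk_size=8192), b"")
        print(f"first chunk: {len(first_chunk)} bytes")
except requests.exceptions.RequestException as error:
    print(f"Download failed: {error}")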
@@ -426,7 +426,7 @@ def _download(fsrc, fdst, size, *, blocksize):
if show_progress:
try:
terminal_width = os.get_terminal_size(sys.stderr.fileno()).columns
except Exception as e:
except Exception as e: # pylint: disable=broad-except
Logger.error("Error finding stderr width, suppressing progress bar: %s", e)
progress_bar = _StderrProgressBar(max_width=terminal_width)

View File

@@ -87,8 +87,6 @@ VALID_DISTROS = DISTRO_PKG_CLASS.keys()
class InvalidPullValueError(ValueError):
"""Thrown when --pull value is invalid"""
pass
class PullPkg(object):
"""Class used to pull file(s) associated with a specific package"""
@@ -124,13 +122,14 @@
logger.error(str(error))
sys.exit(errno.ENOENT)
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs): # pylint: disable=unused-argument
self._default_pull = kwargs.get("pull")
self._default_distro = kwargs.get("distro")
self._default_arch = kwargs.get("arch", host_architecture())
def parse_args(self, args):
args = args[:]
if args is None:
args = sys.argv[1:]
help_default_pull = "What to pull: " + ", ".join(VALID_PULLS)
if self._default_pull:
@@ -403,7 +402,7 @@ class PullPkg(object):
return params
def pull(self, args=sys.argv[1:]):
def pull(self, args=None):
"""Pull (download) specified package file(s)"""
options = self.parse_args(args)
@@ -504,7 +503,7 @@
status=None,
download_only=None,
**kwargs,
):
): # pylint: disable=unused-argument
if not series:
Logger.error("Using --upload-queue requires specifying series")
return
@@ -620,7 +619,11 @@ class PullPkg(object):
cmd = ["dpkg-source", "-x", dscfile.name]
Logger.debug(" ".join(cmd))
result = subprocess.run(
cmd, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.STDOUT
cmd,
check=False,
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
if result.returncode != 0:
Logger.error("Source unpack failed.")

View File

@@ -138,7 +138,7 @@ def get_ubuntu_delta_changelog(srcpkg):
# Native sync
break
try:
response, body = Http().request(changes_url)
response = Http().request(changes_url)[0]
except HttpLib2Error as e:
Logger.error(str(e))
break
@@ -172,6 +172,7 @@ def post_bug(srcpkg, subscribe, status, bugtitle, bugtext):
confirmation_prompt()
if srcpkg:
# pylint: disable=protected-access
bug_target = DistributionSourcePackage(
"%subuntu/+source/%s" % (Launchpad._root_uri, srcpkg)
)

View File

@@ -340,7 +340,7 @@ class SourcePackage(object):
if not Logger.isEnabledFor(logging.DEBUG):
cmd.insert(1, "-q")
Logger.debug("%s > %s", " ".join(cmd), self._debdiff_filename)
with open(self._debdiff_filename, "w") as debdiff_file:
with open(self._debdiff_filename, "w", encoding="utf-8") as debdiff_file:
debdiff = subprocess.run(cmd, check=False, stdout=debdiff_file)
assert debdiff.returncode in (0, 1)
@@ -352,7 +352,7 @@
"""
assert os.path.isfile(self._changes_file), "%s does not exist." % (self._changes_file)
changes = debian.deb822.Changes(open(self._changes_file))
changes = debian.deb822.Changes(open(self._changes_file, encoding="utf-8"))
fixed_bugs = []
if "Launchpad-Bugs-Fixed" in changes:
fixed_bugs = changes["Launchpad-Bugs-Fixed"].split(" ")
@@ -389,7 +389,9 @@ class SourcePackage(object):
# Check the changelog
self._changelog = debian.changelog.Changelog()
try:
self._changelog.parse_changelog(open("debian/changelog"), max_blocks=1, strict=True)
self._changelog.parse_changelog(
open("debian/changelog", encoding="utf-8"), max_blocks=1, strict=True
)
except debian.changelog.ChangelogParseError as error:
Logger.error("The changelog entry doesn't validate: %s", str(error))
ask_for_manual_fixing()
@@ -438,7 +440,7 @@ class SourcePackage(object):
report = subprocess.check_output(cmd, encoding="utf-8")
# write lintian report file
lintian_file = open(lintian_filename, "w")
lintian_file = open(lintian_filename, "w", encoding="utf-8")
lintian_file.writelines(report)
lintian_file.close()

View File

@@ -27,7 +27,8 @@ from ubuntutools.config import UDTConfig, ubu_email
class ConfigTestCase(unittest.TestCase):
_config_files = {"system": "", "user": ""}
def _fake_open(self, filename, mode="r"):
def _fake_open(self, filename, mode="r", encoding=None):
self.assertTrue(encoding, f"encoding for {filename} not specified")
if mode != "r":
raise IOError("Read only fake-file")
files = {
@@ -219,7 +220,7 @@ class UbuEmailTestCase(unittest.TestCase):
self.assertEqual(os.environ["DEBEMAIL"], orig)
def test_unicode_name(self):
encoding = locale.getdefaultlocale()[1]
encoding = locale.getlocale()[1]
if not encoding:
encoding = "utf-8"
name = "Jöe Déveloper"

View File

@@ -207,8 +207,9 @@ class UpdateMaintainerTestCase(unittest.TestCase):
directory == self._directory and base in self._files and self._files[base] is not None
)
def _fake_open(self, filename, mode="r"):
def _fake_open(self, filename, mode="r", encoding=None):
"""Provide StringIO objects instead of real files."""
self.assertTrue(encoding, f"encoding for {filename} not specified")
directory, base = os.path.split(filename)
if (
directory != self._directory

View File

@@ -43,7 +43,7 @@ class Control(object):
def __init__(self, filename):
assert os.path.isfile(filename), "%s does not exist." % (filename)
self._filename = filename
self._content = open(filename).read()
self._content = open(filename, encoding="utf-8").read()
def get_maintainer(self):
"""Returns the value of the Maintainer field."""
@@ -65,7 +65,7 @@
"""Saves the control file."""
if filename:
self._filename = filename
control_file = open(self._filename, "w")
control_file = open(self._filename, "w", encoding="utf-8")
control_file.write(self._content)
control_file.close()
@@ -94,7 +94,9 @@
def _get_distribution(changelog_file):
"""get distribution of latest changelog entry"""
changelog = debian.changelog.Changelog(open(changelog_file), strict=False, max_blocks=1)
changelog = debian.changelog.Changelog(
open(changelog_file, encoding="utf-8"), strict=False, max_blocks=1
)
distribution = changelog.distributions.split()[0]
# Strip things like "-proposed-updates" or "-security" from distribution
return distribution.split("-", 1)[0]
@@ -119,7 +121,10 @@ def _find_files(debian_directory, verbose):
# If the rules file accounts for XSBC-Original-Maintainer, we should not
# touch it in this package (e.g. the python package).
rules_file = os.path.join(debian_directory, "rules")
if os.path.isfile(rules_file) and "XSBC-Original-" in open(rules_file).read():
if (
os.path.isfile(rules_file)
and "XSBC-Original-" in open(rules_file, encoding="utf-8").read()
):
if verbose:
print("XSBC-Original is managed by 'rules' file. Doing nothing.")
control_files = []
@@ -189,7 +194,7 @@
def restore_maintainer(debian_directory, verbose=False):
"""Restore the original maintainer"""
try:
changelog_file, control_files = _find_files(debian_directory, verbose)
control_files = _find_files(debian_directory, verbose)[1]
except MaintainerUpdateException as e:
Logger.error(str(e))
raise
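
Note: as with the [0] indexing in the first hunk of this commit, taking [1] from _find_files() discards the unused changelog path and silences pylint's unused-variable warning. A short sketch of the two common idioms, using a hypothetical helper:

def find_files():
    """Hypothetical stand-in returning (changelog_file, control_files)."""
    return "debian/changelog", ["debian/control"]

control_files = find_files()[1]   # index away the element that is not needed
_, control_files = find_files()   # or unpack into the conventional throwaway name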