Add 'ubuntu-archive-assistant' tool for proposed-migration / mir review

This commit is contained in:
Mathieu Trudel-Lapierre 2018-09-20 09:05:01 +02:00
parent c5bc971f17
commit 5ebd1eaa8d
13 changed files with 1753 additions and 0 deletions

111
tests/test_command.py Normal file
View File

@ -0,0 +1,111 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import unittest
from ubuntu_archive_assistant.command import AssistantCommand
# class AssistantCommand():
# def print_usage(self):
# def _add_subparser_from_class(self, name, commandclass):
# def _import_subcommands(self, submodules):
scratch = 0
class MyMainCommand(AssistantCommand):
def __init__(self):
super().__init__(command_id="test", description="test", leaf=False)
class MySubCommand(AssistantCommand):
def __init__(self):
super().__init__(command_id="subtest", description="subtest", leaf=False)
def update(self, args):
super().update(args)
self._args.append("extra")
def do_nothing():
return
def do_something():
global scratch
scratch = 1337
def do_crash():
raise Exception("unexpected")
class TestCommand(unittest.TestCase):
def test_update_args(self):
main = MyMainCommand()
sub = MySubCommand()
sub._args = ['toto', 'tata']
main.commandclass = sub
main.func = do_nothing
self.assertNotIn('titi', sub._args)
main.update(['titi', 'tutu'])
main.run_command()
self.assertIn('titi', main._args)
self.assertIn('titi', sub._args)
def test_parse_args(self):
main = MyMainCommand()
main._args = [ '--debug', 'help' ]
main.subcommand = do_nothing
main.parse_args()
self.assertNotIn('help', main._args)
self.assertNotIn('--debug', main._args)
self.assertTrue(main.debug)
def test_run_command_with_commandclass(self):
main = MyMainCommand()
sub = MySubCommand()
main._args = ['unknown_arg']
main.commandclass = sub
main.func = do_nothing
self.assertEqual(None, sub._args)
main.run_command()
self.assertIn('extra', sub._args)
def test_run_command(self):
main = MyMainCommand()
sub = MySubCommand()
main.func = do_something
self.assertEqual(None, sub._args)
main.run_command()
self.assertEqual(1337, scratch)
def test_run_command_crashing(self):
main = MyMainCommand()
sub = MySubCommand()
main.func = do_crash
try:
main.run_command()
self.fail("Did not crash as expected")
except Exception as e:
self.assertIn('unexpected', e.args)

28
ubuntu-archive-assistant Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import signal
import sys
from ubuntu_archive_assistant.core import Assistant
assistant = Assistant()
def signal_handler(signal, frame):
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
assistant.main()

View File

@ -0,0 +1,16 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -0,0 +1,114 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import os
import argparse
import subprocess
import logging
from ubuntu_archive_assistant.logging import AssistantLogger, AssistantTaskLogger
class AssistantCommand(argparse.Namespace):
def __init__(self, command_id, description, logger=None, leaf=True, testing=False):
self.command_id = command_id
self.description = description
self.leaf_command = leaf
self.testing = testing
self._args = None
self.debug = False
self.cache_path = None
self.commandclass = None
self.subcommands = {}
self.subcommand = None
self.func = None
self.logger = AssistantLogger(module=command_id)
self.log = self.logger.log
self.task_logger = self.logger
self.review = self.task_logger.review
self.parser = argparse.ArgumentParser(prog="%s %s" % (sys.argv[0], command_id),
description=description,
add_help=True)
self.parser.add_argument('--debug', action='store_true',
help='Enable debug messages')
self.parser.add_argument('--verbose', action='store_true',
help='Enable debug messages')
if not leaf:
self.subparsers = self.parser.add_subparsers(title='Available commands',
metavar='', dest='subcommand')
p_help = self.subparsers.add_parser('help',
description='Show this help message',
help='Show this help message')
p_help.set_defaults(func=self.print_usage)
def update(self, args):
self._args = args
def parse_args(self):
ns, self._args = self.parser.parse_known_args(args=self._args, namespace=self)
if self.debug:
self.logger.setLevel(logging.DEBUG)
self.logger.setReviewLevel(logging.DEBUG)
if self.verbose:
self.logger.setReviewLevel(logging.INFO)
if not self.subcommand and not self.leaf_command:
print('You need to specify a command', file=sys.stderr)
self.print_usage()
def run_command(self):
if self.commandclass:
self.commandclass.update(self._args)
if self.leaf_command and 'help' in self._args:
self.print_usage()
self.func()
def print_usage(self):
self.parser.print_help(file=sys.stderr)
sys.exit(os.EX_USAGE)
def _add_subparser_from_class(self, name, commandclass):
instance = commandclass(self.logger)
self.subcommands[name] = {}
self.subcommands[name]['class'] = name
self.subcommands[name]['instance'] = instance
if instance.testing:
if not os.environ.get('ENABLE_TEST_COMMANDS', None):
return
p = self.subparsers.add_parser(instance.command_id,
description=instance.description,
help=instance.description,
add_help=False)
p.set_defaults(func=instance.run, commandclass=instance)
self.subcommands[name]['parser'] = p
def _import_subcommands(self, submodules):
import inspect
for name, obj in inspect.getmembers(submodules):
if inspect.isclass(obj) and issubclass(obj, AssistantCommand):
self._add_subparser_from_class(name, obj)

View File

@ -0,0 +1,24 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ubuntu_archive_assistant.commands.proposed_migration import ProposedMigration
from ubuntu_archive_assistant.commands.mir import MIRReview
__all__ = [
'ProposedMigration',
'MIRReview',
]

View File

@ -0,0 +1,201 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import time
import subprocess
import tempfile
import argparse
import requests
import logging
from ubuntu_archive_assistant.command import AssistantCommand
from ubuntu_archive_assistant.utils import urlhandling, launchpad, bugtools
from ubuntu_archive_assistant.logging import ReviewResult, AssistantTaskLogger
class MIRReview(AssistantCommand):
def __init__(self, logger):
super().__init__(command_id='mir',
description='Review Main Inclusion Requests',
logger=logger,
leaf=True)
def run(self):
self.parser.add_argument('-b', '--bug', dest='bug',
help='the MIR bug to evaluate')
self.parser.add_argument('-s', '--source', dest='source',
help='the MIR bug to evaluate')
self.parser.add_argument('--skip-review', action="store_true",
help='skip dropping to a subshell for code review')
self.parser.add_argument('--unprocessed', action="store_true",
default=False,
help='show MIRs accepted but not yet processed')
self.func = self.mir_review
self.parse_args()
self.run_command()
def mir_review(self):
lp = launchpad.LaunchpadInstance()
self.mir_team = lp.lp.people["ubuntu-mir"]
if not self.source and not self.bug:
self.log.debug("showing MIR report. show unprocessed=%s" % self.unprocessed)
bugs = self.get_mir_bugs(show_unprocessed=self.unprocessed)
sys.exit(0)
else:
completed_statuses = ("Won't Fix", "Invalid", "Fix Committed", "Fix Released")
if self.bug:
self.log.debug("show MIR by bug")
bug_no = int(self.bug)
bug = lp.lp.bugs[bug_no]
for bug_task in bug.bug_tasks:
if self.source:
if self.source != bug_task.target.name:
continue
if bug_task.status in completed_statuses:
print("MIR for %s is %s\n" % (bug_task.target.name,
bug_task.status))
continue
self.process(bug_task.target, bug_task)
else:
self.log.debug("show MIR by source")
source_pkg = self.get_source_package(self.source)
mir_bug = source_pkg.searchTasks(omit_duplicates=True,
bug_subscriber=self.mir_team,
order_by="id")[0]
self.process(source_pkg, mir_bug)
def get_source_package(self, binary):
lp = launchpad.LaunchpadInstance()
cache_name = None
name = None
source_pkg = lp.ubuntu.getSourcePackage(name=binary)
if source_pkg:
return source_pkg
try:
cache_name = subprocess.check_output(
"apt-cache show %s | grep Source:" % binary,
shell=True, universal_newlines=True)
except subprocess.CalledProcessError as e:
cache_name = subprocess.check_output(
"apt-cache show %s | grep Package:" % binary,
shell=True, universal_newlines=True)
if cache_name is not None:
if source.startswith("Source:") or source.startswith("Package:"):
name = source.split()[1]
if name:
source_pkg = lp.ubuntu.getSourcePackage(name=name)
return source_pkg
def lp_build_logs(self, source):
lp = launchpad.LaunchpadInstance()
archive = lp.ubuntu_archive()
spph = archive.getPublishedSources(exact_match=True,
source_name=source,
distro_series=lp.current_series(),
pocket="Release",
order_by_date=True)
builds = spph[0].getBuilds()
for build in builds:
if "Successfully" not in build.buildstate:
print("%s has failed to build" % build.arch_tag)
print(build.build_log_url)
def process(self, source_pkg, task=None):
lp = launchpad.LaunchpadInstance()
source_name = source_pkg.name
print("== MIR report for source package '%s' ==" % source_name)
print("\n=== Details ===")
print("LP: %s" % source_pkg.web_link)
if task and task.bug:
print("MIR bug: %s\n" % task.bug.web_link)
print(task.bug.description)
print("\n\n=== MIR assessment ===")
latest = lp.ubuntu_archive().getPublishedSources(exact_match=True,
source_name=source_name,
distro_series=lp.current_series())[0]
if not source_pkg:
print("\n%s does not exist in Ubuntu")
sys.exit(1)
if latest.pocket is "Proposed":
print("\nThere is a version of %s in -proposed: %s" % (source, latest.source_package_version))
if task:
if task.assignee:
print("MIR for %s is assigned to %s (%s)" % (task.target.display_name,
task.assignee.display_name,
task.status))
else:
print("MIR for %s is %s" % (task.target.display_name,
task.status))
print("\nPackage bug subscribers:")
for sub in source_pkg.getSubscriptions():
sub_text = " - %s" % sub.subscriber.display_name
if sub.subscribed_by:
sub_text += ", subscribed by %s" % sub.subscribed_by.display_name
print(sub_text)
print("\nBuild logs:")
self.lp_build_logs(source_name)
if not self.skip_review:
self.open_source_tmpdir(source_name)
def get_mir_bugs(self, show_unprocessed=False):
bug_statuses = ("New", "Incomplete", "Confirmed", "Triaged",
"In Progress")
def only_ubuntu(task):
if 'ubuntu/+source' not in task.target_link:
return True
return False
if show_unprocessed:
unprocessed = self.mir_team.searchTasks(omit_duplicates=True, bug_subscriber=self.mir_team, status="Fix Committed")
if any(unprocessed):
print("== Open MIRs reviewed but not processed ==")
bugtools.list_bugs(print, unprocessed, filter=only_ubuntu, file=sys.stderr)
tasks = self.mir_team.searchTasks(omit_duplicates=True, bug_subscriber=self.mir_team, status=bug_statuses)
bugtools.list_bugs(print, tasks, filter=only_ubuntu, file=sys.stderr)
result = None
return result
def open_source_tmpdir(self, source_name):
print("\nDropping to a shell for code review:\n")
with tempfile.TemporaryDirectory() as temp_dir:
os.system('cd %s; pull-lp-source %s; bash -l' % (temp_dir, source_name))

View File

@ -0,0 +1,835 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com>
# Author: Łukasz 'sil2100' Zemczak <lukasz.zemczak@canonical.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Analyze britney's excuses output and suggest a course of action for
proposed migration.
"""
# FIXME: Various parts of slangasek's pseudocode (in comments where relevant)
# are not well implemented.
import yaml
import os
import re
import sys
import time
import math
import subprocess
import argparse
import tempfile
import logging
from contextlib import ExitStack
from enum import Enum
from collections import defaultdict
from ubuntu_archive_assistant.command import AssistantCommand
from ubuntu_archive_assistant.utils import urlhandling, launchpad
from ubuntu_archive_assistant.logging import ReviewResult, ReviewResultAdapter, AssistantTaskLogger
HINTS_BRANCH = 'lp:~ubuntu-release/britney/hints-ubuntu'
DEBIAN_CURRENT_SERIES = 'sid'
ARCHIVE_PAGES = 'https://people.canonical.com/~ubuntu-archive/'
LAUNCHPAD_URL = 'https://launchpad.net'
AUTOPKGTEST_URL = 'http://autopkgtest.ubuntu.com'
MAX_CACHE_AGE = 14400 # excuses cache should not be older than 4 hours
class ProposedMigration(AssistantCommand):
def __init__(self, logger):
super().__init__(command_id='proposed',
description='Assess next work required for a package\'s proposed migration',
logger=logger,
leaf=True)
self.excuses = {}
self.seen = []
def run(self):
self.parser.add_argument('-s', '--source', dest='source_name',
help='the package to evaluate')
self.parser.add_argument('--no-cache', dest='do_not_cache', action='store_const',
const=True, default=False,
help='Do not cache excuses')
self.parser.add_argument('--refresh', action='store_const',
const=True, default=False,
help='Force refresh of cached excuses')
self.func = self.proposed_migration
self.parse_args()
self.run_command()
def proposed_migration(self):
refresh_due = False
with ExitStack() as resources:
if self.do_not_cache:
fp = resources.enter_context(tempfile.NamedTemporaryFile())
self.cache_path = resources.enter_context(
tempfile.TemporaryDirectory())
refresh_due = True
else:
xdg_cache = os.getenv('XDG_CACHE_HOME', '~/.cache')
self.cache_path = os.path.expanduser(
os.path.join(xdg_cache, 'ubuntu-archive-assistant', 'proposed-migration'))
excuses_path = os.path.join(self.cache_path, 'excuses.yaml')
if os.path.exists(self.cache_path):
if not os.path.isdir(self.cache_path):
print("The {} cache directory is not a directory, please "
"resolve manually and re-run.".format(self.cache_path))
exit(1)
else:
os.makedirs(self.cache_path)
try:
fp = open(excuses_path, 'r')
except FileNotFoundError:
refresh_due = True
pass
finally:
fp = open(excuses_path, 'a+')
file_state = os.stat(excuses_path)
mtime = file_state.st_mtime
now = time.time()
if (now - mtime) > MAX_CACHE_AGE:
refresh_due = True
if self.refresh or refresh_due:
excuses_url = ARCHIVE_PAGES + 'proposed-migration/update_excuses.yaml'
urlhandling.get_with_progress(url=excuses_url, filename=fp.name)
fp.seek(0)
# Use the C implementation of the SafeLoader, it's noticeably faster, and
# here we're dealing with large input files.
self.excuses = yaml.load(fp, Loader=yaml.CSafeLoader)
if self.source_name is None:
print("No source package name was provided. The following packages are "
"blocked in proposed:\n")
self.source_name = self.choose_blocked_source(self.excuses)
self.find_excuses(self.source_name, 0)
def get_debian_ci_results(self, source_name, arch):
try:
url = "https://ci.debian.net/data/packages/unstable/{}/{}/latest.json"
results_url = url.format("amd64", self.get_pkg_archive_path(source_name))
resp = urlhandling.get(url=results_url)
return resp.json()
except Exception:
return None
def find_excuses(self, source_name, level):
if source_name in self.seen:
return
for excuses_item in self.excuses['sources']:
item_name = excuses_item.get('item-name')
if item_name == source_name:
self.selected = excuses_item
self.process(level)
def get_pkg_archive_path(self, package):
try:
# TODO: refactor to avoid shell=True
path = subprocess.check_output(
"apt-cache show %s | grep Filename:" % package,
shell=True, universal_newlines=True)
path = path.split(' ')[1].split('/')
path = "/".join(path[2:4])
return path
except Exception:
return None
def get_source_package(self, binary_name):
cache_output = None
# TODO: refactor to avoid shell=True
try:
cache_output = subprocess.check_output(
"apt-cache show %s | grep Source:" % binary_name,
shell=True, universal_newlines=True)
except subprocess.CalledProcessError:
cache_output = subprocess.check_output(
"apt-cache show %s | grep Package:" % binary_name,
shell=True, universal_newlines=True)
if cache_output is not None:
if cache_output.startswith("Source:") or cache_output.startswith("Package:"):
source_name = cache_output.split()[1]
return source_name
return None
def package_in_distro(self, package, distro='ubuntu', distroseries='bionic',
proposed=False):
# TODO: This operation is pretty costly, do caching?
if distro == 'debian':
distroseries = DEBIAN_CURRENT_SERIES
if proposed:
distroseries += "-proposed"
madison_url = "https://qa.debian.org/cgi-bin/madison.cgi"
params = "?package={}&table={}&a=&c=&s={}".format(package,
distro,
distroseries)
url = madison_url + params
resp = urlhandling.get(url=url)
package_found = {}
for line in resp.text.split('\n'):
if " {} ".format(package) not in line:
continue
package_line = line.split(' | ')
series_component = package_line[2].split('/')
component = 'main'
if len(series_component) > 1:
component = series_component[1]
if '{}'.format(distroseries) in series_component[0]:
if distro == 'ubuntu':
package_found = {
'version': package_line[1],
'component': component,
}
else:
package_found = {
'version': package_line[1],
}
return package_found
return {}
def process_lp_build_results(self, level, uploads, failed):
logger = AssistantTaskLogger("lp_builds", self.task_logger)
assistant = logger.newTask("lp_builds", level + 1)
lp = launchpad.LaunchpadInstance()
archive = lp.ubuntu_archive()
series = lp.current_series()
source_name = self.selected.get('source')
spph = archive.getPublishedSources(exact_match=True,
source_name=source_name,
distro_series=series,
pocket="Proposed",
order_by_date=True)
new_version = series.getPackageUploads(archive=archive,
name=source_name,
version=self.selected.get('new-version'),
pocket="Proposed",
exact_match=True)
for item in new_version:
arch = item.display_arches.split(',')[0]
if item.package_version not in uploads:
uploads[item.package_version] = {}
if arch == 'source':
continue
uploads[item.package_version][arch] = item.getBinaryProperties()
# Only get the builds for the latest publication, this is more likely to
# be new source in -proposed, or the most recent upload.
builds = spph[0].getBuilds()
for build in builds:
missing_arches = set()
if "Successfully" not in build.buildstate:
failed[build.arch_tag] = {
'state': build.buildstate,
}
if self.logger.getReviewLevel() < logging.ERROR:
assistant.error("{} is missing a build on {}:".format(
source_name, build.arch_tag),
status=ReviewResult.FAIL)
log_url = build.build_log_url
if not log_url:
log_url = "<No build log available>"
assistant.warning("[%s] %s" % (build.buildstate,
log_url),
status=ReviewResult.NONE, depth=1)
if any(failed) and self.logger.getReviewLevel() >= logging.ERROR:
assistant.critical("Fix missing builds: {}".format(
", ".join(failed.keys())),
status=ReviewResult.NONE)
assistant.error("{}/ubuntu/+source/{}/{}".format(
LAUNCHPAD_URL,
spph[0].source_package_name,
spph[0].source_package_version),
status=ReviewResult.INFO, depth=1)
def check_mir_status(self, logger, target_package, level):
logger = AssistantTaskLogger("mir", logger)
assistant = logger.newTask("mir", level + 2)
# TODO: Check for MIR bug state
# - has the MIR been rejected?
# - upload or submit to sponsorship queue to drop the dependency
lp = launchpad.LaunchpadInstance()
source_name = self.get_source_package(target_package)
source_pkg = lp.ubuntu.getSourcePackage(name=source_name)
mir_tasks = source_pkg.searchTasks(bug_subscriber=lp.lp.people['ubuntu-mir'],
omit_duplicates=True)
if not mir_tasks:
assistant.error("Please open a MIR bug:",
status=ReviewResult.INFO)
assistant.error("{}/ubuntu/+source/{}/+filebug?field.title=%5bMIR%5d%20{}".format(
LAUNCHPAD_URL, source_name, source_name),
status=ReviewResult.NONE, depth=1)
last_bug_id = 0
for task in mir_tasks:
assigned_to = "unassigned"
if task.assignee:
assigned_to = "assigned to %s" % task.assignee.display_name
if task.bug.id != last_bug_id:
assistant.error("(LP: #%s) %s" % (task.bug.id, task.bug.title),
status=ReviewResult.INFO)
last_bug_id = task.bug.id
assistant.warning("%s (%s) in %s (%s)" % (task.status,
task.importance,
task.target.name,
assigned_to),
status=ReviewResult.NONE, depth=1)
if task.status in ("Won't Fix", "Invalid"):
assistant.error("This MIR has been rejected; please look into "
"dropping the dependency on {} from {}".format(
target_package, source_name),
status=ReviewResult.INFO, depth=1)
def process_unsatisfiable_depends(self, level):
logger = AssistantTaskLogger("unsatisfiable", self.task_logger)
assistant = logger.newTask("unsatisfiable", level + 1)
distroseries = launchpad.LaunchpadInstance().current_series().name
affected_sources = set()
unsatisfiable = defaultdict(set)
depends = self.selected.get('dependencies').get('unsatisfiable-dependencies', {})
for arch, signatures in depends.items():
for signature in signatures:
binary_name = signature.split(' ')[0]
unsatisfiable[signature].add(arch)
try:
pkg = self.get_source_package(binary_name)
affected_sources.add(pkg)
except Exception:
# FIXME: we might be dealing with a new package in proposed
# here, but using the binary name instead of the source
# name.
if any(self.package_in_distro(binary_name, distro='ubuntu',
distroseries=distroseries)):
affected_sources.add(binary_name)
elif any(self.package_in_distro(binary_name,
distro='ubuntu',
distroseries=distroseries,
proposed=True)):
affected_sources.add(binary_name)
if not affected_sources and not unsatisfiable:
return
logger.critical("Fix unsatisfiable dependencies in {}:".format(
self.selected.get('source')),
status=ReviewResult.NONE)
# TODO: Check version comparisons for removal requests/fixes
# - is the unsatisfied dependency due to a package dropped in Ubuntu,
# but not in Debian, which may come back as a sync later
# (i.e. not blacklisted)?
# - leave in -proposed
# - is this package Ubuntu-specific?
# - is there an open bug in launchpad about this issue, with no action?
# - subscribe ubuntu-archive and request the package's removal
# - else
# - open a bug report and assign to the package's maintainer
# - is the package in Debian, but the dependency is part of Ubuntu delta?
# - fix
possible_mir = set()
for signature, arches in unsatisfiable.items():
assistant = logger.newTask("unsatisfiable", level + 2)
depends = signature.split(' ')[0]
assistant.warning("{} can not be satisfied "
"on {}".format(signature, ", ".join(arches)),
status=ReviewResult.FAIL)
in_archive = self.package_in_distro(depends, distro='ubuntu',
distroseries=distroseries)
in_proposed = self.package_in_distro(depends, distro='ubuntu',
distroseries=distroseries,
proposed=True)
if any(in_archive) and not any(in_proposed):
assistant.info("{}/{} exists "
"in the Ubuntu primary archive".format(
depends,
in_archive.get('version')),
status=ReviewResult.FAIL, depth=1)
if self.selected.get('component', 'main') != in_archive.get('component'):
possible_mir.add(depends)
elif not any(in_archive) and any(in_proposed):
assistant.info("{} is only in -proposed".format(depends),
status=ReviewResult.FAIL, depth=1)
assistant.debug("Has this package been dropped in Ubuntu, "
"but not in Debian?",
status=ReviewResult.INFO, depth=2)
elif not any(in_archive) and not any(in_proposed):
in_debian = self.package_in_distro(depends, distro='debian',
distroseries=distroseries)
if any(in_debian):
assistant.warning("{} only exists in Debian".format(depends),
status=ReviewResult.FAIL, depth=1)
assistant.debug("Is this package blacklisted? Should it be synced?",
status=ReviewResult.INFO, depth=2)
else:
assistant.warning("{} is not found".format(depends),
status=ReviewResult.FAIL, depth=1)
assistant.debug("Has this package been removed?",
status=ReviewResult.INFO, depth=2)
else:
if self.selected.get('component', 'main') != in_archive.get('component'):
possible_mir.add(depends)
for p_mir in possible_mir:
self.check_mir_status(logger, p_mir, level)
if affected_sources:
for src_name in affected_sources:
self.find_excuses(src_name, level+2)
def process_autopkgtest(self, level):
logger = AssistantTaskLogger("autopkgtest", self.task_logger)
assistant = logger.newTask("autopkgtest", level + 1)
autopkgtests = self.selected.get('policy_info').get('autopkgtest')
assistant.critical("Fix autopkgtests triggered by this package for:",
status=ReviewResult.NONE)
waiting = 0
failed_tests = defaultdict(set)
for key, test in autopkgtests.items():
logger = AssistantTaskLogger(key, logger)
assistant = logger.newTask(key, level + 2)
for arch, arch_test in test.items():
if 'RUNNING' in arch_test:
waiting += 1
if 'REGRESSION' in arch_test:
assistant.warning("{} {} {}".format(key, arch, arch_test[2]),
status=ReviewResult.FAIL)
failed_tests[key].add(arch)
if arch == "amd64":
if '/' in key:
pkgname = key.split('/')[0]
else:
pkgname = key
ci_results = self.get_debian_ci_results(pkgname, "amd64")
if ci_results is not None:
result = ci_results.get('status')
status_ci = ReviewResult.FAIL
if result == 'pass':
status_ci = ReviewResult.PASS
assistant.warning("CI tests {} in Debian".format(
result),
status=status_ci, depth=1)
if 'pass' in result:
assistant.info("Consider filing a bug "
"(usertag: autopkgtest) "
"in Debian if none exist",
status=ReviewResult.INFO, depth=2)
else:
# TODO: (cyphermox) detect this case?
# check versions?
assistant.info("If synced from Debian and "
"requires sourceful changes to "
"the package, file a bug for "
"removal from -proposed",
status=ReviewResult.INFO, depth=2)
if waiting > 0:
assistant.error("{} tests are currently running "
"or waiting to be run".format(waiting),
status=ReviewResult.INFO)
else:
if self.logger.getReviewLevel() >= logging.ERROR:
for test, arches in failed_tests.items():
assistant.error("{}: {}".format(test, ", ".join(arches)),
status=ReviewResult.FAIL)
assistant.error("{}/packages/p/{}".format(AUTOPKGTEST_URL, test.split('/')[0]),
status=ReviewResult.INFO, depth=1)
def process_blocking(self, level):
assistant = self.task_logger.newTask("blocking", level + 1)
lp = launchpad.LaunchpadInstance().lp
bugs = self.selected.get('policy_info').get('block-bugs')
source_name = self.selected.get('source')
if bugs:
assistant.critical("Resolve blocking bugs:", status=ReviewResult.NONE)
for bug in bugs.keys():
lp_bug = lp.bugs[bug]
assistant.error("[LP: #{}] {} {}".format(lp_bug.id,
lp_bug.title,
lp_bug.web_link),
status=ReviewResult.NONE)
tasks = lp_bug.bug_tasks
for task in tasks:
value = ReviewResult.FAIL
if task.status in ('Fix Committed', 'Fix Released'):
value = ReviewResult.PASS
elif task.status in ("Won't Fix", 'Invalid'):
continue
assistant.warning("{}({}) in {}".format(
task.status,
task.importance,
task.bug_target_display_name),
status=value)
# guesstimate whether this is a removal request
if 'emove {}'.format(source_name) in lp_bug.title:
assistant.info("This looks like a removal request",
status=ReviewResult.INFO)
assistant.info("Consider pinging #ubuntu-release for processing",
status=ReviewResult.INFO)
hints = self.selected.get('hints')
if hints is not None:
hints_path = os.path.join(self.cache_path, 'hints-ubuntu')
self.get_latest_hints(hints_path)
assistant.critical("Update manual hinting (contact #ubuntu-release):",
status=ReviewResult.NONE)
hint_from = hints[0]
if hint_from == 'freeze':
assistant.error("Package blocked by freeze.")
else:
version = None
unblock_re = re.compile(r'^unblock {}\/(.*)$'.format(source_name))
files = [f for f in os.listdir(hints_path) if (os.path.isfile(
os.path.join(hints_path, f)) and f != 'freeze')]
for hints_file in files:
with open(os.path.join(hints_path, hints_file)) as fp:
print("Checking {}".format(os.path.join(hints_path, hints_file)))
for line in fp:
match = unblock_re.match(line)
if match:
version = match.group(1)
break
if version:
break
if version:
reason = \
("Unblock request by {} ignored due to version mismatch: "
"{}".format(hints_file, version))
else:
reason = "Missing unblock sequence in the hints file"
assistant.error(reason, status=ReviewResult.INFO)
def process_dependencies(self, source, level):
assistant = self.task_logger.newTask("dependencies", level + 1)
dependencies = source.get('dependencies')
blocked_by = dependencies.get('blocked-by', None)
migrate_after = dependencies.get('migrate-after', None)
if blocked_by or migrate_after:
assistant.critical("Clear outstanding promotion interdependencies:",
status=ReviewResult.NONE)
assistant = self.task_logger.newTask("dependencies", level + 2)
if migrate_after is not None:
assistant.error("{} will migrate after {}".format(
source.get('source'), ", ".join(migrate_after)),
status=ReviewResult.FAIL)
assistant.warning("Investigate what packages are conflicting, "
"by looking at 'Trying easy on autohinter' lines in "
"update_output.txt for {}".format(
source.get('source')),
status=ReviewResult.INFO, depth=1)
assistant.warning("See {}proposed-migration/update_output.txt".format(
ARCHIVE_PAGES),
status=ReviewResult.INFO, depth=2)
if blocked_by is not None:
assistant.error("{} is blocked by the migration of {}".format(
source.get('source'), ", ".join(blocked_by)),
status=ReviewResult.FAIL)
for blocker in blocked_by:
self.find_excuses(blocker, level+2)
def process_missing_builds(self, level):
logger = AssistantTaskLogger("missing_builds", self.task_logger)
assistant = logger.newTask("missing_builds", level + 1)
new_version = self.selected.get('new-version')
old_version = self.selected.get('old-version')
# TODO: Process missing builds; suggest options
#
# - missing build on $arch / has no binaries on any arch
# - is this an architecture-specific build failure?
# - has Debian removed the binaries for this architecture?
# - ask AA to remove the binaries as ANAIS
# - else
# - try to fix
#
# - is this a build failure on all archs?
# - are there bugs filed about this failure in Debian?
# - is the package in sync with Debian and does the package require
# sourceful changes to fix?
# - remove from -proposed
#
# - does the package fail to build in Debian?
# - file a bug in Debian
# - is the package in sync with Debian and does the package require
# sourceful changes to fix?
# - remove from -proposed
#
# - is this a dep-wait?
# - does this package have this build-dependency in Debian?
# - is this an architecture-specific dep-wait?
# - has Debian removed the binaries for this architecture?
# - ask AA to remove the binaries as ANAIS
# - else
# - try to fix
# - does this binary package exist in Debian?
# - look what source package provides this binary package in Debian
# - is this source package ftbfs or dep-wait in -proposed?
# - recurse
# - else
# - is this source package on the sync blacklist?
# - file a bug with the Ubuntu package
# - else
# - fix by syncing or merging the source
# - else
# - make sure a bug is filed in Debian about the issue
# - was the depended-on package removed from Debian,
# and is this a sync?
# - ask AA to remove the package from -proposed
# - else
# - leave the package in -proposed
uploads = {}
failed = {}
new = []
new_binaries = set()
self.process_lp_build_results(level, uploads, failed)
if new_version in uploads:
for arch, item in uploads[new_version].items():
for binary in item:
binary_name = binary.get('name')
new_binaries.add(binary_name)
if binary.get('is_new'):
new.append(binary)
if not any(failed):
assistant = logger.newTask("old_binaries", level + 1)
assistant.warning("No failed builds found", status=ReviewResult.PASS)
try:
missing_builds = self.selected.get('missing-builds')
missing_arches = missing_builds.get('on-architectures')
arch_o = []
for arch in missing_arches:
if arch not in uploads[new_version]:
arch_o.append("-a {}".format(arch))
if any(arch_o):
old_binaries = self.selected.get('old-binaries').get(old_version)
assistant.warning("This package has dropped support for "
"architectures it previous supported. ",
status=ReviewResult.INFO)
assistant.warning("Ask in #ubuntu-release for an Archive "
"Admin to run:",
status=ReviewResult.INFO)
assistant.info("remove-package %(arches)s -b %(bins)s"
% ({'arches': " ".join(arch_o),
'bins': " ".join(old_binaries),
}), status=ReviewResult.NONE, depth=1)
except AttributeError:
# Ignore a failure here, it just means we don't have
# missing-builds to process after all.
pass
if any(new):
assistant = logger.newTask("new", level + 1)
assistant.warning("This package has NEW binaries to process:",
status=ReviewResult.INFO)
for binary in new:
assistant.error("NEW: [{}] {}/{}".format(
binary.get('architecture'),
binary.get('name'),
binary.get('version')),
status=ReviewResult.FAIL, depth=1)
def process(self, level):
source_name = self.selected.get('source')
reasons = self.selected.get('reason')
self.seen.append(source_name)
self.task_logger = AssistantTaskLogger(source_name, self.task_logger)
assistant = self.task_logger.newTask(source_name, depth=level)
text_candidate = "not considered"
candidate = ReviewResult.FAIL
if self.selected.get('is-candidate'):
text_candidate = "a valid candidate"
candidate = ReviewResult.PASS
assistant.info("{} is {}".format(source_name, text_candidate),
status=candidate)
assistant.critical("Next steps for {} {}:".format(
source_name, self.selected.get('new-version')),
status=ReviewResult.NONE)
assistant.debug("reasons: {}".format(reasons), status=ReviewResult.NONE)
work_needed = False
missing_builds = self.selected.get('missing-builds')
if missing_builds is not None or 'no-binaries' in reasons:
work_needed = True
self.process_missing_builds(level)
if 'depends' in reasons:
work_needed = True
self.process_unsatisfiable_depends(level)
if 'block' in reasons:
work_needed = True
self.process_blocking(level)
if 'autopkgtest' in reasons:
work_needed = True
self.process_autopkgtest(level)
dependencies = self.selected.get('dependencies')
if dependencies is not None:
work_needed = True
self.process_dependencies(self.selected, level)
if work_needed is False:
assistant.error("Good job!", status=ReviewResult.PASS)
assistant.warning("Investigate if packages are conflicting, "
"by looking at 'Trying easy on autohinter' lines in "
"update_output.txt"
" for {}".format(source_name),
status=ReviewResult.INFO)
assistant.warning("See {}proposed-migration/update_output.txt".format(
ARCHIVE_PAGES),
status=ReviewResult.INFO)
def choose_blocked_source(self, excuses):
import pager
def pager_callback(pagenum):
prompt = "Page -%s-. Press any key for next page or Q to select a " \
"package." % pagenum
pager.echo(prompt)
if pager.getch() in [pager.ESC_, 'q', 'Q']:
return False
pager.echo('\r' + ' '*(len(prompt)) + '\r')
choice = 0
options = []
entry_list = []
sorted_excuses = sorted(
self.excuses['sources'],
key=lambda e: e.get('policy_info').get('age').get('current-age'),
reverse=True)
for src_num, item in enumerate(sorted_excuses, start=1):
item_name = item.get('item-name')
age = math.floor(
item.get('policy_info').get('age').get('current-age'))
options.append(item_name)
entry_list.append("({}) {} (Age: {} days)\n".format(
src_num, item_name, age))
while True:
pager.page(iter(entry_list), pager_callback)
num = input("\nWhich package do you want to look at? ")
try:
choice = int(num)
if choice > 0 and choice <= src_num:
break
except ValueError:
# num might be the package name.
if num in options:
return num
return options[choice - 1]
def get_latest_hints(self, path):
if os.path.exists(path):
try:
subprocess.check_call(
"bzr info %s" % path, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
subprocess.check_call("bzr pull -d %s" % path, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
print("The {} path either exists but doesn't seem to be a valid "
"branch or failed to update it properly.".format(
path))
exit(1)
else:
try:
subprocess.check_call(
"bzr branch %s %s" % (HINTS_BRANCH, path), shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
print("Could not access the hints-ubuntu bzr branch, exiting.")
exit(1)

View File

@ -0,0 +1,45 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
from ubuntu_archive_assistant.command import AssistantCommand
import ubuntu_archive_assistant.logging as app_logging
logger = app_logging.AssistantLogger()
class Assistant(AssistantCommand):
def __init__(self):
super().__init__(command_id='',
description='archive assistant',
logger=logger,
leaf=False)
def parse_args(self):
import ubuntu_archive_assistant.commands
self._import_subcommands(ubuntu_archive_assistant.commands)
super().parse_args()
def main(self):
self.parse_args()
self.run_command()

View File

@ -0,0 +1,171 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
from enum import Enum
class ReviewResult(Enum):
NONE = 1
PASS = 2
FAIL = 3
INFO = 4
class ReviewResultAdapter(logging.LoggerAdapter):
depth = 0
def process(self, msg, kwargs):
status = kwargs.pop('status')
depth = self.depth + kwargs.pop('depth', 0)
# FIXME: identationing may be ugly because of character width
if status is ReviewResult.PASS:
icon = "\033[92m✔\033[0m"
#icon = ""
elif status is ReviewResult.FAIL:
icon = "\033[91m✘\033[0m"
#icon = ""
elif status is ReviewResult.INFO:
icon = "\033[94m\033[0m"
#icon = ""
else:
icon = ""
if depth <= 0:
return '%s %s' % (msg, icon), kwargs
elif status is ReviewResult.INFO:
return '%s%s %s' % (" " * depth * 2, icon, msg), kwargs
else:
return '%s%s %s' % (" " * depth * 2, msg, icon), kwargs
def critical(self, msg, *args, **kwargs):
self.depth = self.extra['depth']
msg, kwargs = self.process(msg, kwargs)
self.logger.critical(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
self.depth = self.extra['depth']
msg, kwargs = self.process(msg, kwargs)
self.logger.error(msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
self.depth = self.extra['depth']
msg, kwargs = self.process(msg, kwargs)
self.logger.warning(msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
self.depth = self.extra['depth']
msg, kwargs = self.process(msg, kwargs)
self.logger.info(msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
self.depth = self.extra['depth']
msg, kwargs = self.process("DEBUG<{}>: {}".format(self.name, msg), kwargs)
self.logger.debug(msg, *args, **kwargs)
class AssistantLogger(object):
class __AssistantLogger(object):
def __init__(self):
main_root_logger = logging.RootLogger(logging.INFO)
self.main_log_manager = logging.Manager(main_root_logger)
main_review_logger = logging.RootLogger(logging.ERROR)
self.review_log_manager = logging.Manager(main_review_logger)
fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
main_handler = logging.StreamHandler()
review_handler = logging.StreamHandler()
main_handler.setFormatter(fmt)
main_root_logger.addHandler(main_handler)
main_review_logger.addHandler(review_handler)
instance = None
def __init__(self, module=None, depth=0):
if not AssistantLogger.instance:
AssistantLogger.instance = AssistantLogger.__AssistantLogger()
if not module:
self.log = AssistantLogger.instance.main_log_manager.getLogger('assistant')
self.review_logger = AssistantLogger.instance.review_log_manager.getLogger('review')
else:
self.log = AssistantLogger.instance.main_log_manager.getLogger('assistant.%s' % module)
self.review_logger = AssistantLogger.instance.review_log_manager.getLogger('review.%s' % module)
self.depth = depth
self.review = ReviewResultAdapter(self.review_logger, {'depth': self.depth})
def newTask(self, task, depth):
review_logger = AssistantLogger.instance.review_log_manager.getLogger("%s.%s" % (self.review.name, task))
return ReviewResultAdapter(review_logger, {'depth': self.depth})
def setLevel(self, level):
self.log.setLevel(level)
def setReviewLevel(self, level):
self.review.setLevel(level)
def getReviewLevel(self):
return self.review.getEffectiveLevel()
def getReviewLogger(self, name):
return AssistantLogger.instance.review_log_manager.getLogger(name)
class AssistantTask(object):
def __init__(self, task, parent=None):
self.parent = parent
self.log = parent.log
if isinstance(parent, AssistantLogger):
self.depth = 0
else:
self.depth = parent.depth + 1
class AssistantTaskLogger(AssistantTask):
def __init__(self, task, logger):
super().__init__(task, parent=logger)
#self.review = self.parent.newTask(task, logger.depth + 1)
def newTask(self, task, depth):
review_logger = self.parent.getReviewLogger("%s.%s" % (self.parent.review.name, task))
self.review = ReviewResultAdapter(review_logger, {'depth': depth})
return self.review
def getReviewLogger(self, name):
return self.parent.getReviewLogger(name)
def critical(self, msg, *args, **kwargs):
self.review.critical(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
self.review.error(msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
self.review.warning(msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
self.review.info(msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
self.review.debug(msg, *args, **kwargs)

View File

@ -0,0 +1,16 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -0,0 +1,82 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from termcolor import colored
from ubuntu_archive_assistant.utils import launchpad
from ubuntu_archive_assistant.logging import AssistantLogger
def _colorize_status(status):
color = 'grey'
if status in ('In Progress', 'Fix Committed'):
color = 'green'
elif status in ('Incomplete'):
color = 'red'
else:
color = 'grey'
return colored(status, color)
def _colorize_priority(importance):
color = 'grey'
if importance in ('Critical'):
color = 'red'
elif importance in ('High', 'Medium'):
color = 'yellow'
elif importance in ('Low'):
color = 'green'
else:
color = 'grey'
return colored(importance, color)
def show_bug(print_func, bug, **kwargs):
print_func("(LP: #%s) %s" % (bug.id, bug.title),
**kwargs)
def show_task(print_func, task, show_bug_header=False, **kwargs):
assigned_to = "unassigned"
if task.assignee:
if task.assignee.name in ('ubuntu-security', 'canonical-security'):
a_color = 'red'
elif task.assignee.name in ('ubuntu-mir'):
a_color = 'blue'
else:
a_color = 'grey'
assignee = colored(task.assignee.display_name, a_color)
assigned_to = "assigned to %s" % assignee
if show_bug_header:
show_bug(print_func, task.bug, **kwargs)
print_func("\t%s (%s) in %s (%s)" % (_colorize_status(task.status),
_colorize_priority(task.importance),
task.target.name, assigned_to),
**kwargs)
def list_bugs(print_func, tasks, filter=None, **kwargs):
last_bug_id = 0
for task in tasks:
if filter is not None and filter(task):
continue
if task.bug.id != last_bug_id:
show_bug(print_func, task.bug, **kwargs)
last_bug_id = task.bug.id
show_task(print_func, task, show_bug_header=False, **kwargs)

View File

@ -0,0 +1,60 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from launchpadlib.launchpad import Launchpad
from ubuntu_archive_assistant.logging import AssistantLogger
class LaunchpadInstance(object):
class __LaunchpadInstance(object):
def __init__(self):
self.logger = AssistantLogger()
self.lp_cachedir = os.path.expanduser(os.path.join("~", ".launchpadlib/cache"))
self.logger.log.debug("Using Launchpad cache dir: \"%s\"" % self.lp_cachedir)
self.lp = Launchpad.login_with('ubuntu-archive-assisant',
service_root='production',
launchpadlib_dir=self.lp_cachedir,
version='devel')
instance = None
def __init__(self, module=None, depth=0):
if not LaunchpadInstance.instance:
LaunchpadInstance.instance = LaunchpadInstance.__LaunchpadInstance()
self.lp = LaunchpadInstance.instance.lp
self.ubuntu = self.lp.distributions['ubuntu']
def lp(self):
return self.lp
def ubuntu(self):
return self.ubuntu
def ubuntu_archive(self):
return self.ubuntu.main_archive
def current_series(self):
return self.ubuntu.current_series

View File

@ -0,0 +1,50 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import requests
from urllib.request import FancyURLopener
class URLRetrieverWithProgress(object):
def __init__(self, url, filename):
self.url = url
self.filename = filename
self.url_opener = FancyURLopener()
def get(self):
self.url_opener.retrieve(self.url, self.filename, self._report_download)
def _report_download(self, blocks_read, block_size, total_size):
size_read = blocks_read * block_size
percent = size_read/total_size*100
if percent <= 100:
print("Refreshing %s: %.0f %%" % (os.path.basename(self.filename), percent), end='\r')
else:
print(" " * 80, end='\r')
def get_with_progress(url=None, filename=None):
retriever = URLRetrieverWithProgress(url, filename)
response = retriever.get()
return response
def get(url=None):
response = requests.get(url=url)
return response