diff --git a/tests/test_command.py b/tests/test_command.py
new file mode 100644
index 0000000..f07b6e9
--- /dev/null
+++ b/tests/test_command.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import sys
+import unittest
+from ubuntu_archive_assistant.command import AssistantCommand
+# class AssistantCommand():
+# def print_usage(self):
+# def _add_subparser_from_class(self, name, commandclass):
+# def _import_subcommands(self, submodules):
+scratch = 0
+class MyMainCommand(AssistantCommand):
+ def __init__(self):
+ super().__init__(command_id="test", description="test", leaf=False)
+class MySubCommand(AssistantCommand):
+ def __init__(self):
+ super().__init__(command_id="subtest", description="subtest", leaf=False)
+ def update(self, args):
+ super().update(args)
+ self._args.append("extra")
+def do_nothing():
+ return
+def do_something():
+ global scratch
+ scratch = 1337
+def do_crash():
+ raise Exception("unexpected")
+class TestCommand(unittest.TestCase):
+ def test_update_args(self):
+ main = MyMainCommand()
+ sub = MySubCommand()
+ sub._args = ['toto', 'tata']
+ main.commandclass = sub
+ main.func = do_nothing
+ self.assertNotIn('titi', sub._args)
+ main.update(['titi', 'tutu'])
+ main.run_command()
+ self.assertIn('titi', main._args)
+ self.assertIn('titi', sub._args)
+ def test_parse_args(self):
+ main = MyMainCommand()
+ main._args = [ '--debug', 'help' ]
+ main.subcommand = do_nothing
+ main.parse_args()
+ self.assertNotIn('help', main._args)
+ self.assertNotIn('--debug', main._args)
+ self.assertTrue(main.debug)
+ def test_run_command_with_commandclass(self):
+ main = MyMainCommand()
+ sub = MySubCommand()
+ main._args = ['unknown_arg']
+ main.commandclass = sub
+ main.func = do_nothing
+ self.assertEqual(None, sub._args)
+ main.run_command()
+ self.assertIn('extra', sub._args)
+ def test_run_command(self):
+ main = MyMainCommand()
+ sub = MySubCommand()
+ main.func = do_something
+ self.assertEqual(None, sub._args)
+ main.run_command()
+ self.assertEqual(1337, scratch)
+ def test_run_command_crashing(self):
+ main = MyMainCommand()
+ sub = MySubCommand()
+ main.func = do_crash
+ try:
+ main.run_command()
+ self.fail("Did not crash as expected")
+ except Exception as e:
+ self.assertIn('unexpected', e.args)
diff --git a/ubuntu-archive-assistant b/ubuntu-archive-assistant
new file mode 100755
index 0000000..97fd187
--- /dev/null
+++ b/ubuntu-archive-assistant
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import signal
+import sys
+from ubuntu_archive_assistant.core import Assistant
+assistant = Assistant()
+def signal_handler(signal, frame):
+ sys.exit(0)
+signal.signal(signal.SIGINT, signal_handler)
diff --git a/ubuntu_archive_assistant/__init__.py b/ubuntu_archive_assistant/__init__.py
new file mode 100644
index 0000000..27669bc
--- /dev/null
+++ b/ubuntu_archive_assistant/__init__.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
diff --git a/ubuntu_archive_assistant/command.py b/ubuntu_archive_assistant/command.py
new file mode 100644
index 0000000..4549033
--- /dev/null
+++ b/ubuntu_archive_assistant/command.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import sys
+import os
+import argparse
+import subprocess
+import logging
+from ubuntu_archive_assistant.logging import AssistantLogger, AssistantTaskLogger
+class AssistantCommand(argparse.Namespace):
+ def __init__(self, command_id, description, logger=None, leaf=True, testing=False):
+ self.command_id = command_id
+ self.description = description
+ self.leaf_command = leaf
+ self.testing = testing
+ self._args = None
+ self.debug = False
+ self.cache_path = None
+ self.commandclass = None
+ self.subcommands = {}
+ self.subcommand = None
+ self.func = None
+ self.logger = AssistantLogger(module=command_id)
+ self.log = self.logger.log
+ self.task_logger = self.logger
+ self.review = self.task_logger.review
+ self.parser = argparse.ArgumentParser(prog="%s %s" % (sys.argv[0], command_id),
+ description=description,
+ add_help=True)
+ self.parser.add_argument('--debug', action='store_true',
+ help='Enable debug messages')
+ self.parser.add_argument('--verbose', action='store_true',
+ help='Enable debug messages')
+ if not leaf:
+ self.subparsers = self.parser.add_subparsers(title='Available commands',
+ metavar='', dest='subcommand')
+ p_help = self.subparsers.add_parser('help',
+ description='Show this help message',
+ help='Show this help message')
+ p_help.set_defaults(func=self.print_usage)
+ def update(self, args):
+ self._args = args
+ def parse_args(self):
+ ns, self._args = self.parser.parse_known_args(args=self._args, namespace=self)
+ if self.debug:
+ self.logger.setLevel(logging.DEBUG)
+ self.logger.setReviewLevel(logging.DEBUG)
+ if self.verbose:
+ self.logger.setReviewLevel(logging.INFO)
+ if not self.subcommand and not self.leaf_command:
+ print('You need to specify a command', file=sys.stderr)
+ self.print_usage()
+ def run_command(self):
+ if self.commandclass:
+ self.commandclass.update(self._args)
+ if self.leaf_command and 'help' in self._args:
+ self.print_usage()
+ self.func()
+ def print_usage(self):
+ self.parser.print_help(file=sys.stderr)
+ sys.exit(os.EX_USAGE)
+ def _add_subparser_from_class(self, name, commandclass):
+ instance = commandclass(self.logger)
+ self.subcommands[name] = {}
+ self.subcommands[name]['class'] = name
+ self.subcommands[name]['instance'] = instance
+ if instance.testing:
+ if not os.environ.get('ENABLE_TEST_COMMANDS', None):
+ return
+ p = self.subparsers.add_parser(instance.command_id,
+ description=instance.description,
+ help=instance.description,
+ add_help=False)
+ p.set_defaults(func=instance.run, commandclass=instance)
+ self.subcommands[name]['parser'] = p
+ def _import_subcommands(self, submodules):
+ import inspect
+ for name, obj in inspect.getmembers(submodules):
+ if inspect.isclass(obj) and issubclass(obj, AssistantCommand):
+ self._add_subparser_from_class(name, obj)
diff --git a/ubuntu_archive_assistant/commands/__init__.py b/ubuntu_archive_assistant/commands/__init__.py
new file mode 100644
index 0000000..508a03d
--- /dev/null
+++ b/ubuntu_archive_assistant/commands/__init__.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+from ubuntu_archive_assistant.commands.proposed_migration import ProposedMigration
+from ubuntu_archive_assistant.commands.mir import MIRReview
+__all__ = [
+ 'ProposedMigration',
+ 'MIRReview',
diff --git a/ubuntu_archive_assistant/commands/mir.py b/ubuntu_archive_assistant/commands/mir.py
new file mode 100644
index 0000000..92f2d45
--- /dev/null
+++ b/ubuntu_archive_assistant/commands/mir.py
@@ -0,0 +1,201 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import os
+import sys
+import time
+import subprocess
+import tempfile
+import argparse
+import requests
+import logging
+from ubuntu_archive_assistant.command import AssistantCommand
+from ubuntu_archive_assistant.utils import urlhandling, launchpad, bugtools
+from ubuntu_archive_assistant.logging import ReviewResult, AssistantTaskLogger
+class MIRReview(AssistantCommand):
+ def __init__(self, logger):
+ super().__init__(command_id='mir',
+ description='Review Main Inclusion Requests',
+ logger=logger,
+ leaf=True)
+ def run(self):
+ self.parser.add_argument('-b', '--bug', dest='bug',
+ help='the MIR bug to evaluate')
+ self.parser.add_argument('-s', '--source', dest='source',
+ help='the MIR bug to evaluate')
+ self.parser.add_argument('--skip-review', action="store_true",
+ help='skip dropping to a subshell for code review')
+ self.parser.add_argument('--unprocessed', action="store_true",
+ default=False,
+ help='show MIRs accepted but not yet processed')
+ self.func = self.mir_review
+ self.parse_args()
+ self.run_command()
+ def mir_review(self):
+ lp = launchpad.LaunchpadInstance()
+ self.mir_team = lp.lp.people["ubuntu-mir"]
+ if not self.source and not self.bug:
+ self.log.debug("showing MIR report. show unprocessed=%s" % self.unprocessed)
+ bugs = self.get_mir_bugs(show_unprocessed=self.unprocessed)
+ sys.exit(0)
+ else:
+ completed_statuses = ("Won't Fix", "Invalid", "Fix Committed", "Fix Released")
+ if self.bug:
+ self.log.debug("show MIR by bug")
+ bug_no = int(self.bug)
+ bug = lp.lp.bugs[bug_no]
+ for bug_task in bug.bug_tasks:
+ if self.source:
+ if self.source != bug_task.target.name:
+ continue
+ if bug_task.status in completed_statuses:
+ print("MIR for %s is %s\n" % (bug_task.target.name,
+ bug_task.status))
+ continue
+ self.process(bug_task.target, bug_task)
+ else:
+ self.log.debug("show MIR by source")
+ source_pkg = self.get_source_package(self.source)
+ mir_bug = source_pkg.searchTasks(omit_duplicates=True,
+ bug_subscriber=self.mir_team,
+ order_by="id")[0]
+ self.process(source_pkg, mir_bug)
+ def get_source_package(self, binary):
+ lp = launchpad.LaunchpadInstance()
+ cache_name = None
+ name = None
+ source_pkg = lp.ubuntu.getSourcePackage(name=binary)
+ if source_pkg:
+ return source_pkg
+ try:
+ cache_name = subprocess.check_output(
+ "apt-cache show %s | grep Source:" % binary,
+ shell=True, universal_newlines=True)
+ except subprocess.CalledProcessError as e:
+ cache_name = subprocess.check_output(
+ "apt-cache show %s | grep Package:" % binary,
+ shell=True, universal_newlines=True)
+ if cache_name is not None:
+ if source.startswith("Source:") or source.startswith("Package:"):
+ name = source.split()[1]
+ if name:
+ source_pkg = lp.ubuntu.getSourcePackage(name=name)
+ return source_pkg
+ def lp_build_logs(self, source):
+ lp = launchpad.LaunchpadInstance()
+ archive = lp.ubuntu_archive()
+ spph = archive.getPublishedSources(exact_match=True,
+ source_name=source,
+ distro_series=lp.current_series(),
+ pocket="Release",
+ order_by_date=True)
+ builds = spph[0].getBuilds()
+ for build in builds:
+ if "Successfully" not in build.buildstate:
+ print("%s has failed to build" % build.arch_tag)
+ print(build.build_log_url)
+ def process(self, source_pkg, task=None):
+ lp = launchpad.LaunchpadInstance()
+ source_name = source_pkg.name
+ print("== MIR report for source package '%s' ==" % source_name)
+ print("\n=== Details ===")
+ print("LP: %s" % source_pkg.web_link)
+ if task and task.bug:
+ print("MIR bug: %s\n" % task.bug.web_link)
+ print(task.bug.description)
+ print("\n\n=== MIR assessment ===")
+ latest = lp.ubuntu_archive().getPublishedSources(exact_match=True,
+ source_name=source_name,
+ distro_series=lp.current_series())[0]
+ if not source_pkg:
+ print("\n%s does not exist in Ubuntu")
+ sys.exit(1)
+ if latest.pocket is "Proposed":
+ print("\nThere is a version of %s in -proposed: %s" % (source, latest.source_package_version))
+ if task:
+ if task.assignee:
+ print("MIR for %s is assigned to %s (%s)" % (task.target.display_name,
+ task.assignee.display_name,
+ task.status))
+ else:
+ print("MIR for %s is %s" % (task.target.display_name,
+ task.status))
+ print("\nPackage bug subscribers:")
+ for sub in source_pkg.getSubscriptions():
+ sub_text = " - %s" % sub.subscriber.display_name
+ if sub.subscribed_by:
+ sub_text += ", subscribed by %s" % sub.subscribed_by.display_name
+ print(sub_text)
+ print("\nBuild logs:")
+ self.lp_build_logs(source_name)
+ if not self.skip_review:
+ self.open_source_tmpdir(source_name)
+ def get_mir_bugs(self, show_unprocessed=False):
+ bug_statuses = ("New", "Incomplete", "Confirmed", "Triaged",
+ "In Progress")
+ def only_ubuntu(task):
+ if 'ubuntu/+source' not in task.target_link:
+ return True
+ return False
+ if show_unprocessed:
+ unprocessed = self.mir_team.searchTasks(omit_duplicates=True, bug_subscriber=self.mir_team, status="Fix Committed")
+ if any(unprocessed):
+ print("== Open MIRs reviewed but not processed ==")
+ bugtools.list_bugs(print, unprocessed, filter=only_ubuntu, file=sys.stderr)
+ tasks = self.mir_team.searchTasks(omit_duplicates=True, bug_subscriber=self.mir_team, status=bug_statuses)
+ bugtools.list_bugs(print, tasks, filter=only_ubuntu, file=sys.stderr)
+ result = None
+ return result
+ def open_source_tmpdir(self, source_name):
+ print("\nDropping to a shell for code review:\n")
+ with tempfile.TemporaryDirectory() as temp_dir:
+ os.system('cd %s; pull-lp-source %s; bash -l' % (temp_dir, source_name))
diff --git a/ubuntu_archive_assistant/commands/proposed_migration.py b/ubuntu_archive_assistant/commands/proposed_migration.py
new file mode 100644
index 0000000..ee2d45d
--- /dev/null
+++ b/ubuntu_archive_assistant/commands/proposed_migration.py
@@ -0,0 +1,835 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# Author: Mathieu Trudel-Lapierre
+# Author: Łukasz 'sil2100' Zemczak
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+"""Analyze britney's excuses output and suggest a course of action for
+proposed migration.
+# FIXME: Various parts of slangasek's pseudocode (in comments where relevant)
+# are not well implemented.
+import yaml
+import os
+import re
+import sys
+import time
+import math
+import subprocess
+import argparse
+import tempfile
+import logging
+from contextlib import ExitStack
+from enum import Enum
+from collections import defaultdict
+from ubuntu_archive_assistant.command import AssistantCommand
+from ubuntu_archive_assistant.utils import urlhandling, launchpad
+from ubuntu_archive_assistant.logging import ReviewResult, ReviewResultAdapter, AssistantTaskLogger
+HINTS_BRANCH = 'lp:~ubuntu-release/britney/hints-ubuntu'
+ARCHIVE_PAGES = 'https://people.canonical.com/~ubuntu-archive/'
+LAUNCHPAD_URL = 'https://launchpad.net'
+AUTOPKGTEST_URL = 'http://autopkgtest.ubuntu.com'
+MAX_CACHE_AGE = 14400 # excuses cache should not be older than 4 hours
+class ProposedMigration(AssistantCommand):
+ def __init__(self, logger):
+ super().__init__(command_id='proposed',
+ description='Assess next work required for a package\'s proposed migration',
+ logger=logger,
+ leaf=True)
+ self.excuses = {}
+ self.seen = []
+ def run(self):
+ self.parser.add_argument('-s', '--source', dest='source_name',
+ help='the package to evaluate')
+ self.parser.add_argument('--no-cache', dest='do_not_cache', action='store_const',
+ const=True, default=False,
+ help='Do not cache excuses')
+ self.parser.add_argument('--refresh', action='store_const',
+ const=True, default=False,
+ help='Force refresh of cached excuses')
+ self.func = self.proposed_migration
+ self.parse_args()
+ self.run_command()
+ def proposed_migration(self):
+ refresh_due = False
+ with ExitStack() as resources:
+ if self.do_not_cache:
+ fp = resources.enter_context(tempfile.NamedTemporaryFile())
+ self.cache_path = resources.enter_context(
+ tempfile.TemporaryDirectory())
+ refresh_due = True
+ else:
+ xdg_cache = os.getenv('XDG_CACHE_HOME', '~/.cache')
+ self.cache_path = os.path.expanduser(
+ os.path.join(xdg_cache, 'ubuntu-archive-assistant', 'proposed-migration'))
+ excuses_path = os.path.join(self.cache_path, 'excuses.yaml')
+ if os.path.exists(self.cache_path):
+ if not os.path.isdir(self.cache_path):
+ print("The {} cache directory is not a directory, please "
+ "resolve manually and re-run.".format(self.cache_path))
+ exit(1)
+ else:
+ os.makedirs(self.cache_path)
+ try:
+ fp = open(excuses_path, 'r')
+ except FileNotFoundError:
+ refresh_due = True
+ pass
+ finally:
+ fp = open(excuses_path, 'a+')
+ file_state = os.stat(excuses_path)
+ mtime = file_state.st_mtime
+ now = time.time()
+ if (now - mtime) > MAX_CACHE_AGE:
+ refresh_due = True
+ if self.refresh or refresh_due:
+ excuses_url = ARCHIVE_PAGES + 'proposed-migration/update_excuses.yaml'
+ urlhandling.get_with_progress(url=excuses_url, filename=fp.name)
+ fp.seek(0)
+ # Use the C implementation of the SafeLoader, it's noticeably faster, and
+ # here we're dealing with large input files.
+ self.excuses = yaml.load(fp, Loader=yaml.CSafeLoader)
+ if self.source_name is None:
+ print("No source package name was provided. The following packages are "
+ "blocked in proposed:\n")
+ self.source_name = self.choose_blocked_source(self.excuses)
+ self.find_excuses(self.source_name, 0)
+ def get_debian_ci_results(self, source_name, arch):
+ try:
+ url = "https://ci.debian.net/data/packages/unstable/{}/{}/latest.json"
+ results_url = url.format("amd64", self.get_pkg_archive_path(source_name))
+ resp = urlhandling.get(url=results_url)
+ return resp.json()
+ except Exception:
+ return None
+ def find_excuses(self, source_name, level):
+ if source_name in self.seen:
+ return
+ for excuses_item in self.excuses['sources']:
+ item_name = excuses_item.get('item-name')
+ if item_name == source_name:
+ self.selected = excuses_item
+ self.process(level)
+ def get_pkg_archive_path(self, package):
+ try:
+ # TODO: refactor to avoid shell=True
+ path = subprocess.check_output(
+ "apt-cache show %s | grep Filename:" % package,
+ shell=True, universal_newlines=True)
+ path = path.split(' ')[1].split('/')
+ path = "/".join(path[2:4])
+ return path
+ except Exception:
+ return None
+ def get_source_package(self, binary_name):
+ cache_output = None
+ # TODO: refactor to avoid shell=True
+ try:
+ cache_output = subprocess.check_output(
+ "apt-cache show %s | grep Source:" % binary_name,
+ shell=True, universal_newlines=True)
+ except subprocess.CalledProcessError:
+ cache_output = subprocess.check_output(
+ "apt-cache show %s | grep Package:" % binary_name,
+ shell=True, universal_newlines=True)
+ if cache_output is not None:
+ if cache_output.startswith("Source:") or cache_output.startswith("Package:"):
+ source_name = cache_output.split()[1]
+ return source_name
+ return None
+ def package_in_distro(self, package, distro='ubuntu', distroseries='bionic',
+ proposed=False):
+ # TODO: This operation is pretty costly, do caching?
+ if distro == 'debian':
+ distroseries = DEBIAN_CURRENT_SERIES
+ if proposed:
+ distroseries += "-proposed"
+ madison_url = "https://qa.debian.org/cgi-bin/madison.cgi"
+ params = "?package={}&table={}&a=&c=&s={}".format(package,
+ distro,
+ distroseries)
+ url = madison_url + params
+ resp = urlhandling.get(url=url)
+ package_found = {}
+ for line in resp.text.split('\n'):
+ if " {} ".format(package) not in line:
+ continue
+ package_line = line.split(' | ')
+ series_component = package_line[2].split('/')
+ component = 'main'
+ if len(series_component) > 1:
+ component = series_component[1]
+ if '{}'.format(distroseries) in series_component[0]:
+ if distro == 'ubuntu':
+ package_found = {
+ 'version': package_line[1],
+ 'component': component,
+ }
+ else:
+ package_found = {
+ 'version': package_line[1],
+ }
+ return package_found
+ return {}
+ def process_lp_build_results(self, level, uploads, failed):
+ logger = AssistantTaskLogger("lp_builds", self.task_logger)
+ assistant = logger.newTask("lp_builds", level + 1)
+ lp = launchpad.LaunchpadInstance()
+ archive = lp.ubuntu_archive()
+ series = lp.current_series()
+ source_name = self.selected.get('source')
+ spph = archive.getPublishedSources(exact_match=True,
+ source_name=source_name,
+ distro_series=series,
+ pocket="Proposed",
+ order_by_date=True)
+ new_version = series.getPackageUploads(archive=archive,
+ name=source_name,
+ version=self.selected.get('new-version'),
+ pocket="Proposed",
+ exact_match=True)
+ for item in new_version:
+ arch = item.display_arches.split(',')[0]
+ if item.package_version not in uploads:
+ uploads[item.package_version] = {}
+ if arch == 'source':
+ continue
+ uploads[item.package_version][arch] = item.getBinaryProperties()
+ # Only get the builds for the latest publication, this is more likely to
+ # be new source in -proposed, or the most recent upload.
+ builds = spph[0].getBuilds()
+ for build in builds:
+ missing_arches = set()
+ if "Successfully" not in build.buildstate:
+ failed[build.arch_tag] = {
+ 'state': build.buildstate,
+ }
+ if self.logger.getReviewLevel() < logging.ERROR:
+ assistant.error("{} is missing a build on {}:".format(
+ source_name, build.arch_tag),
+ status=ReviewResult.FAIL)
+ log_url = build.build_log_url
+ if not log_url:
+ log_url = ""
+ assistant.warning("[%s] %s" % (build.buildstate,
+ log_url),
+ status=ReviewResult.NONE, depth=1)
+ if any(failed) and self.logger.getReviewLevel() >= logging.ERROR:
+ assistant.critical("Fix missing builds: {}".format(
+ ", ".join(failed.keys())),
+ status=ReviewResult.NONE)
+ assistant.error("{}/ubuntu/+source/{}/{}".format(
+ spph[0].source_package_name,
+ spph[0].source_package_version),
+ status=ReviewResult.INFO, depth=1)
+ def check_mir_status(self, logger, target_package, level):
+ logger = AssistantTaskLogger("mir", logger)
+ assistant = logger.newTask("mir", level + 2)
+ # TODO: Check for MIR bug state
+ # - has the MIR been rejected?
+ # - upload or submit to sponsorship queue to drop the dependency
+ lp = launchpad.LaunchpadInstance()
+ source_name = self.get_source_package(target_package)
+ source_pkg = lp.ubuntu.getSourcePackage(name=source_name)
+ mir_tasks = source_pkg.searchTasks(bug_subscriber=lp.lp.people['ubuntu-mir'],
+ omit_duplicates=True)
+ if not mir_tasks:
+ assistant.error("Please open a MIR bug:",
+ status=ReviewResult.INFO)
+ assistant.error("{}/ubuntu/+source/{}/+filebug?field.title=%5bMIR%5d%20{}".format(
+ LAUNCHPAD_URL, source_name, source_name),
+ status=ReviewResult.NONE, depth=1)
+ last_bug_id = 0
+ for task in mir_tasks:
+ assigned_to = "unassigned"
+ if task.assignee:
+ assigned_to = "assigned to %s" % task.assignee.display_name
+ if task.bug.id != last_bug_id:
+ assistant.error("(LP: #%s) %s" % (task.bug.id, task.bug.title),
+ status=ReviewResult.INFO)
+ last_bug_id = task.bug.id
+ assistant.warning("%s (%s) in %s (%s)" % (task.status,
+ task.importance,
+ task.target.name,
+ assigned_to),
+ status=ReviewResult.NONE, depth=1)
+ if task.status in ("Won't Fix", "Invalid"):
+ assistant.error("This MIR has been rejected; please look into "
+ "dropping the dependency on {} from {}".format(
+ target_package, source_name),
+ status=ReviewResult.INFO, depth=1)
+ def process_unsatisfiable_depends(self, level):
+ logger = AssistantTaskLogger("unsatisfiable", self.task_logger)
+ assistant = logger.newTask("unsatisfiable", level + 1)
+ distroseries = launchpad.LaunchpadInstance().current_series().name
+ affected_sources = set()
+ unsatisfiable = defaultdict(set)
+ depends = self.selected.get('dependencies').get('unsatisfiable-dependencies', {})
+ for arch, signatures in depends.items():
+ for signature in signatures:
+ binary_name = signature.split(' ')[0]
+ unsatisfiable[signature].add(arch)
+ try:
+ pkg = self.get_source_package(binary_name)
+ affected_sources.add(pkg)
+ except Exception:
+ # FIXME: we might be dealing with a new package in proposed
+ # here, but using the binary name instead of the source
+ # name.
+ if any(self.package_in_distro(binary_name, distro='ubuntu',
+ distroseries=distroseries)):
+ affected_sources.add(binary_name)
+ elif any(self.package_in_distro(binary_name,
+ distro='ubuntu',
+ distroseries=distroseries,
+ proposed=True)):
+ affected_sources.add(binary_name)
+ if not affected_sources and not unsatisfiable:
+ return
+ logger.critical("Fix unsatisfiable dependencies in {}:".format(
+ self.selected.get('source')),
+ status=ReviewResult.NONE)
+ # TODO: Check version comparisons for removal requests/fixes
+ # - is the unsatisfied dependency due to a package dropped in Ubuntu,
+ # but not in Debian, which may come back as a sync later
+ # (i.e. not blacklisted)?
+ # - leave in -proposed
+ # - is this package Ubuntu-specific?
+ # - is there an open bug in launchpad about this issue, with no action?
+ # - subscribe ubuntu-archive and request the package's removal
+ # - else
+ # - open a bug report and assign to the package's maintainer
+ # - is the package in Debian, but the dependency is part of Ubuntu delta?
+ # - fix
+ possible_mir = set()
+ for signature, arches in unsatisfiable.items():
+ assistant = logger.newTask("unsatisfiable", level + 2)
+ depends = signature.split(' ')[0]
+ assistant.warning("{} can not be satisfied "
+ "on {}".format(signature, ", ".join(arches)),
+ status=ReviewResult.FAIL)
+ in_archive = self.package_in_distro(depends, distro='ubuntu',
+ distroseries=distroseries)
+ in_proposed = self.package_in_distro(depends, distro='ubuntu',
+ distroseries=distroseries,
+ proposed=True)
+ if any(in_archive) and not any(in_proposed):
+ assistant.info("{}/{} exists "
+ "in the Ubuntu primary archive".format(
+ depends,
+ in_archive.get('version')),
+ status=ReviewResult.FAIL, depth=1)
+ if self.selected.get('component', 'main') != in_archive.get('component'):
+ possible_mir.add(depends)
+ elif not any(in_archive) and any(in_proposed):
+ assistant.info("{} is only in -proposed".format(depends),
+ status=ReviewResult.FAIL, depth=1)
+ assistant.debug("Has this package been dropped in Ubuntu, "
+ "but not in Debian?",
+ status=ReviewResult.INFO, depth=2)
+ elif not any(in_archive) and not any(in_proposed):
+ in_debian = self.package_in_distro(depends, distro='debian',
+ distroseries=distroseries)
+ if any(in_debian):
+ assistant.warning("{} only exists in Debian".format(depends),
+ status=ReviewResult.FAIL, depth=1)
+ assistant.debug("Is this package blacklisted? Should it be synced?",
+ status=ReviewResult.INFO, depth=2)
+ else:
+ assistant.warning("{} is not found".format(depends),
+ status=ReviewResult.FAIL, depth=1)
+ assistant.debug("Has this package been removed?",
+ status=ReviewResult.INFO, depth=2)
+ else:
+ if self.selected.get('component', 'main') != in_archive.get('component'):
+ possible_mir.add(depends)
+ for p_mir in possible_mir:
+ self.check_mir_status(logger, p_mir, level)
+ if affected_sources:
+ for src_name in affected_sources:
+ self.find_excuses(src_name, level+2)
+ def process_autopkgtest(self, level):
+ logger = AssistantTaskLogger("autopkgtest", self.task_logger)
+ assistant = logger.newTask("autopkgtest", level + 1)
+ autopkgtests = self.selected.get('policy_info').get('autopkgtest')
+ assistant.critical("Fix autopkgtests triggered by this package for:",
+ status=ReviewResult.NONE)
+ waiting = 0
+ failed_tests = defaultdict(set)
+ for key, test in autopkgtests.items():
+ logger = AssistantTaskLogger(key, logger)
+ assistant = logger.newTask(key, level + 2)
+ for arch, arch_test in test.items():
+ if 'RUNNING' in arch_test:
+ waiting += 1
+ if 'REGRESSION' in arch_test:
+ assistant.warning("{} {} {}".format(key, arch, arch_test[2]),
+ status=ReviewResult.FAIL)
+ failed_tests[key].add(arch)
+ if arch == "amd64":
+ if '/' in key:
+ pkgname = key.split('/')[0]
+ else:
+ pkgname = key
+ ci_results = self.get_debian_ci_results(pkgname, "amd64")
+ if ci_results is not None:
+ result = ci_results.get('status')
+ status_ci = ReviewResult.FAIL
+ if result == 'pass':
+ status_ci = ReviewResult.PASS
+ assistant.warning("CI tests {} in Debian".format(
+ result),
+ status=status_ci, depth=1)
+ if 'pass' in result:
+ assistant.info("Consider filing a bug "
+ "(usertag: autopkgtest) "
+ "in Debian if none exist",
+ status=ReviewResult.INFO, depth=2)
+ else:
+ # TODO: (cyphermox) detect this case?
+ # check versions?
+ assistant.info("If synced from Debian and "
+ "requires sourceful changes to "
+ "the package, file a bug for "
+ "removal from -proposed",
+ status=ReviewResult.INFO, depth=2)
+ if waiting > 0:
+ assistant.error("{} tests are currently running "
+ "or waiting to be run".format(waiting),
+ status=ReviewResult.INFO)
+ else:
+ if self.logger.getReviewLevel() >= logging.ERROR:
+ for test, arches in failed_tests.items():
+ assistant.error("{}: {}".format(test, ", ".join(arches)),
+ status=ReviewResult.FAIL)
+ assistant.error("{}/packages/p/{}".format(AUTOPKGTEST_URL, test.split('/')[0]),
+ status=ReviewResult.INFO, depth=1)
+ def process_blocking(self, level):
+ assistant = self.task_logger.newTask("blocking", level + 1)
+ lp = launchpad.LaunchpadInstance().lp
+ bugs = self.selected.get('policy_info').get('block-bugs')
+ source_name = self.selected.get('source')
+ if bugs:
+ assistant.critical("Resolve blocking bugs:", status=ReviewResult.NONE)
+ for bug in bugs.keys():
+ lp_bug = lp.bugs[bug]
+ assistant.error("[LP: #{}] {} {}".format(lp_bug.id,
+ lp_bug.title,
+ lp_bug.web_link),
+ status=ReviewResult.NONE)
+ tasks = lp_bug.bug_tasks
+ for task in tasks:
+ value = ReviewResult.FAIL
+ if task.status in ('Fix Committed', 'Fix Released'):
+ value = ReviewResult.PASS
+ elif task.status in ("Won't Fix", 'Invalid'):
+ continue
+ assistant.warning("{}({}) in {}".format(
+ task.status,
+ task.importance,
+ task.bug_target_display_name),
+ status=value)
+ # guesstimate whether this is a removal request
+ if 'emove {}'.format(source_name) in lp_bug.title:
+ assistant.info("This looks like a removal request",
+ status=ReviewResult.INFO)
+ assistant.info("Consider pinging #ubuntu-release for processing",
+ status=ReviewResult.INFO)
+ hints = self.selected.get('hints')
+ if hints is not None:
+ hints_path = os.path.join(self.cache_path, 'hints-ubuntu')
+ self.get_latest_hints(hints_path)
+ assistant.critical("Update manual hinting (contact #ubuntu-release):",
+ status=ReviewResult.NONE)
+ hint_from = hints[0]
+ if hint_from == 'freeze':
+ assistant.error("Package blocked by freeze.")
+ else:
+ version = None
+ unblock_re = re.compile(r'^unblock {}\/(.*)$'.format(source_name))
+ files = [f for f in os.listdir(hints_path) if (os.path.isfile(
+ os.path.join(hints_path, f)) and f != 'freeze')]
+ for hints_file in files:
+ with open(os.path.join(hints_path, hints_file)) as fp:
+ print("Checking {}".format(os.path.join(hints_path, hints_file)))
+ for line in fp:
+ match = unblock_re.match(line)
+ if match:
+ version = match.group(1)
+ break
+ if version:
+ break
+ if version:
+ reason = \
+ ("Unblock request by {} ignored due to version mismatch: "
+ "{}".format(hints_file, version))
+ else:
+ reason = "Missing unblock sequence in the hints file"
+ assistant.error(reason, status=ReviewResult.INFO)
+ def process_dependencies(self, source, level):
+ assistant = self.task_logger.newTask("dependencies", level + 1)
+ dependencies = source.get('dependencies')
+ blocked_by = dependencies.get('blocked-by', None)
+ migrate_after = dependencies.get('migrate-after', None)
+ if blocked_by or migrate_after:
+ assistant.critical("Clear outstanding promotion interdependencies:",
+ status=ReviewResult.NONE)
+ assistant = self.task_logger.newTask("dependencies", level + 2)
+ if migrate_after is not None:
+ assistant.error("{} will migrate after {}".format(
+ source.get('source'), ", ".join(migrate_after)),
+ status=ReviewResult.FAIL)
+ assistant.warning("Investigate what packages are conflicting, "
+ "by looking at 'Trying easy on autohinter' lines in "
+ "update_output.txt for {}".format(
+ source.get('source')),
+ status=ReviewResult.INFO, depth=1)
+ assistant.warning("See {}proposed-migration/update_output.txt".format(
+ status=ReviewResult.INFO, depth=2)
+ if blocked_by is not None:
+ assistant.error("{} is blocked by the migration of {}".format(
+ source.get('source'), ", ".join(blocked_by)),
+ status=ReviewResult.FAIL)
+ for blocker in blocked_by:
+ self.find_excuses(blocker, level+2)
+ def process_missing_builds(self, level):
+ logger = AssistantTaskLogger("missing_builds", self.task_logger)
+ assistant = logger.newTask("missing_builds", level + 1)
+ new_version = self.selected.get('new-version')
+ old_version = self.selected.get('old-version')
+ # TODO: Process missing builds; suggest options
+ #
+ # - missing build on $arch / has no binaries on any arch
+ # - is this an architecture-specific build failure?
+ # - has Debian removed the binaries for this architecture?
+ # - ask AA to remove the binaries as ANAIS
+ # - else
+ # - try to fix
+ #
+ # - is this a build failure on all archs?
+ # - are there bugs filed about this failure in Debian?
+ # - is the package in sync with Debian and does the package require
+ # sourceful changes to fix?
+ # - remove from -proposed
+ #
+ # - does the package fail to build in Debian?
+ # - file a bug in Debian
+ # - is the package in sync with Debian and does the package require
+ # sourceful changes to fix?
+ # - remove from -proposed
+ #
+ # - is this a dep-wait?
+ # - does this package have this build-dependency in Debian?
+ # - is this an architecture-specific dep-wait?
+ # - has Debian removed the binaries for this architecture?
+ # - ask AA to remove the binaries as ANAIS
+ # - else
+ # - try to fix
+ # - does this binary package exist in Debian?
+ # - look what source package provides this binary package in Debian
+ # - is this source package ftbfs or dep-wait in -proposed?
+ # - recurse
+ # - else
+ # - is this source package on the sync blacklist?
+ # - file a bug with the Ubuntu package
+ # - else
+ # - fix by syncing or merging the source
+ # - else
+ # - make sure a bug is filed in Debian about the issue
+ # - was the depended-on package removed from Debian,
+ # and is this a sync?
+ # - ask AA to remove the package from -proposed
+ # - else
+ # - leave the package in -proposed
+ uploads = {}
+ failed = {}
+ new = []
+ new_binaries = set()
+ self.process_lp_build_results(level, uploads, failed)
+ if new_version in uploads:
+ for arch, item in uploads[new_version].items():
+ for binary in item:
+ binary_name = binary.get('name')
+ new_binaries.add(binary_name)
+ if binary.get('is_new'):
+ new.append(binary)
+ if not any(failed):
+ assistant = logger.newTask("old_binaries", level + 1)
+ assistant.warning("No failed builds found", status=ReviewResult.PASS)
+ try:
+ missing_builds = self.selected.get('missing-builds')
+ missing_arches = missing_builds.get('on-architectures')
+ arch_o = []
+ for arch in missing_arches:
+ if arch not in uploads[new_version]:
+ arch_o.append("-a {}".format(arch))
+ if any(arch_o):
+ old_binaries = self.selected.get('old-binaries').get(old_version)
+ assistant.warning("This package has dropped support for "
+ "architectures it previous supported. ",
+ status=ReviewResult.INFO)
+ assistant.warning("Ask in #ubuntu-release for an Archive "
+ "Admin to run:",
+ status=ReviewResult.INFO)
+ assistant.info("remove-package %(arches)s -b %(bins)s"
+ % ({'arches': " ".join(arch_o),
+ 'bins': " ".join(old_binaries),
+ }), status=ReviewResult.NONE, depth=1)
+ except AttributeError:
+ # Ignore a failure here, it just means we don't have
+ # missing-builds to process after all.
+ pass
+ if any(new):
+ assistant = logger.newTask("new", level + 1)
+ assistant.warning("This package has NEW binaries to process:",
+ status=ReviewResult.INFO)
+ for binary in new:
+ assistant.error("NEW: [{}] {}/{}".format(
+ binary.get('architecture'),
+ binary.get('name'),
+ binary.get('version')),
+ status=ReviewResult.FAIL, depth=1)
+ def process(self, level):
+ source_name = self.selected.get('source')
+ reasons = self.selected.get('reason')
+ self.seen.append(source_name)
+ self.task_logger = AssistantTaskLogger(source_name, self.task_logger)
+ assistant = self.task_logger.newTask(source_name, depth=level)
+ text_candidate = "not considered"
+ candidate = ReviewResult.FAIL
+ if self.selected.get('is-candidate'):
+ text_candidate = "a valid candidate"
+ candidate = ReviewResult.PASS
+ assistant.info("{} is {}".format(source_name, text_candidate),
+ status=candidate)
+ assistant.critical("Next steps for {} {}:".format(
+ source_name, self.selected.get('new-version')),
+ status=ReviewResult.NONE)
+ assistant.debug("reasons: {}".format(reasons), status=ReviewResult.NONE)
+ work_needed = False
+ missing_builds = self.selected.get('missing-builds')
+ if missing_builds is not None or 'no-binaries' in reasons:
+ work_needed = True
+ self.process_missing_builds(level)
+ if 'depends' in reasons:
+ work_needed = True
+ self.process_unsatisfiable_depends(level)
+ if 'block' in reasons:
+ work_needed = True
+ self.process_blocking(level)
+ if 'autopkgtest' in reasons:
+ work_needed = True
+ self.process_autopkgtest(level)
+ dependencies = self.selected.get('dependencies')
+ if dependencies is not None:
+ work_needed = True
+ self.process_dependencies(self.selected, level)
+ if work_needed is False:
+ assistant.error("Good job!", status=ReviewResult.PASS)
+ assistant.warning("Investigate if packages are conflicting, "
+ "by looking at 'Trying easy on autohinter' lines in "
+ "update_output.txt"
+ " for {}".format(source_name),
+ status=ReviewResult.INFO)
+ assistant.warning("See {}proposed-migration/update_output.txt".format(
+ status=ReviewResult.INFO)
+ def choose_blocked_source(self, excuses):
+ import pager
+ def pager_callback(pagenum):
+ prompt = "Page -%s-. Press any key for next page or Q to select a " \
+ "package." % pagenum
+ pager.echo(prompt)
+ if pager.getch() in [pager.ESC_, 'q', 'Q']:
+ return False
+ pager.echo('\r' + ' '*(len(prompt)) + '\r')
+ choice = 0
+ options = []
+ entry_list = []
+ sorted_excuses = sorted(
+ self.excuses['sources'],
+ key=lambda e: e.get('policy_info').get('age').get('current-age'),
+ reverse=True)
+ for src_num, item in enumerate(sorted_excuses, start=1):
+ item_name = item.get('item-name')
+ age = math.floor(
+ item.get('policy_info').get('age').get('current-age'))
+ options.append(item_name)
+ entry_list.append("({}) {} (Age: {} days)\n".format(
+ src_num, item_name, age))
+ while True:
+ pager.page(iter(entry_list), pager_callback)
+ num = input("\nWhich package do you want to look at? ")
+ try:
+ choice = int(num)
+ if choice > 0 and choice <= src_num:
+ break
+ except ValueError:
+ # num might be the package name.
+ if num in options:
+ return num
+ return options[choice - 1]
+ def get_latest_hints(self, path):
+ if os.path.exists(path):
+ try:
+ subprocess.check_call(
+ "bzr info %s" % path, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ subprocess.check_call("bzr pull -d %s" % path, shell=True,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ except subprocess.CalledProcessError:
+ print("The {} path either exists but doesn't seem to be a valid "
+ "branch or failed to update it properly.".format(
+ path))
+ exit(1)
+ else:
+ try:
+ subprocess.check_call(
+ "bzr branch %s %s" % (HINTS_BRANCH, path), shell=True,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ except subprocess.CalledProcessError:
+ print("Could not access the hints-ubuntu bzr branch, exiting.")
+ exit(1)
diff --git a/ubuntu_archive_assistant/core.py b/ubuntu_archive_assistant/core.py
new file mode 100644
index 0000000..b3c5495
--- /dev/null
+++ b/ubuntu_archive_assistant/core.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import os
+import logging
+from ubuntu_archive_assistant.command import AssistantCommand
+import ubuntu_archive_assistant.logging as app_logging
+logger = app_logging.AssistantLogger()
+class Assistant(AssistantCommand):
+ def __init__(self):
+ super().__init__(command_id='',
+ description='archive assistant',
+ logger=logger,
+ leaf=False)
+ def parse_args(self):
+ import ubuntu_archive_assistant.commands
+ self._import_subcommands(ubuntu_archive_assistant.commands)
+ super().parse_args()
+ def main(self):
+ self.parse_args()
+ self.run_command()
diff --git a/ubuntu_archive_assistant/logging.py b/ubuntu_archive_assistant/logging.py
new file mode 100644
index 0000000..c4cab54
--- /dev/null
+++ b/ubuntu_archive_assistant/logging.py
@@ -0,0 +1,171 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import os
+import logging
+from enum import Enum
+class ReviewResult(Enum):
+ NONE = 1
+ PASS = 2
+ FAIL = 3
+ INFO = 4
+class ReviewResultAdapter(logging.LoggerAdapter):
+ depth = 0
+ def process(self, msg, kwargs):
+ status = kwargs.pop('status')
+ depth = self.depth + kwargs.pop('depth', 0)
+ # FIXME: identationing may be ugly because of character width
+ if status is ReviewResult.PASS:
+ icon = "\033[92m✔\033[0m"
+ #icon = ""
+ elif status is ReviewResult.FAIL:
+ icon = "\033[91m✘\033[0m"
+ #icon = ""
+ elif status is ReviewResult.INFO:
+ icon = "\033[94m\033[0m"
+ #icon = ""
+ else:
+ icon = ""
+ if depth <= 0:
+ return '%s %s' % (msg, icon), kwargs
+ elif status is ReviewResult.INFO:
+ return '%s%s %s' % (" " * depth * 2, icon, msg), kwargs
+ else:
+ return '%s%s %s' % (" " * depth * 2, msg, icon), kwargs
+ def critical(self, msg, *args, **kwargs):
+ self.depth = self.extra['depth']
+ msg, kwargs = self.process(msg, kwargs)
+ self.logger.critical(msg, *args, **kwargs)
+ def error(self, msg, *args, **kwargs):
+ self.depth = self.extra['depth']
+ msg, kwargs = self.process(msg, kwargs)
+ self.logger.error(msg, *args, **kwargs)
+ def warning(self, msg, *args, **kwargs):
+ self.depth = self.extra['depth']
+ msg, kwargs = self.process(msg, kwargs)
+ self.logger.warning(msg, *args, **kwargs)
+ def info(self, msg, *args, **kwargs):
+ self.depth = self.extra['depth']
+ msg, kwargs = self.process(msg, kwargs)
+ self.logger.info(msg, *args, **kwargs)
+ def debug(self, msg, *args, **kwargs):
+ self.depth = self.extra['depth']
+ msg, kwargs = self.process("DEBUG<{}>: {}".format(self.name, msg), kwargs)
+ self.logger.debug(msg, *args, **kwargs)
+class AssistantLogger(object):
+ class __AssistantLogger(object):
+ def __init__(self):
+ main_root_logger = logging.RootLogger(logging.INFO)
+ self.main_log_manager = logging.Manager(main_root_logger)
+ main_review_logger = logging.RootLogger(logging.ERROR)
+ self.review_log_manager = logging.Manager(main_review_logger)
+ fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ main_handler = logging.StreamHandler()
+ review_handler = logging.StreamHandler()
+ main_handler.setFormatter(fmt)
+ main_root_logger.addHandler(main_handler)
+ main_review_logger.addHandler(review_handler)
+ instance = None
+ def __init__(self, module=None, depth=0):
+ if not AssistantLogger.instance:
+ AssistantLogger.instance = AssistantLogger.__AssistantLogger()
+ if not module:
+ self.log = AssistantLogger.instance.main_log_manager.getLogger('assistant')
+ self.review_logger = AssistantLogger.instance.review_log_manager.getLogger('review')
+ else:
+ self.log = AssistantLogger.instance.main_log_manager.getLogger('assistant.%s' % module)
+ self.review_logger = AssistantLogger.instance.review_log_manager.getLogger('review.%s' % module)
+ self.depth = depth
+ self.review = ReviewResultAdapter(self.review_logger, {'depth': self.depth})
+ def newTask(self, task, depth):
+ review_logger = AssistantLogger.instance.review_log_manager.getLogger("%s.%s" % (self.review.name, task))
+ return ReviewResultAdapter(review_logger, {'depth': self.depth})
+ def setLevel(self, level):
+ self.log.setLevel(level)
+ def setReviewLevel(self, level):
+ self.review.setLevel(level)
+ def getReviewLevel(self):
+ return self.review.getEffectiveLevel()
+ def getReviewLogger(self, name):
+ return AssistantLogger.instance.review_log_manager.getLogger(name)
+class AssistantTask(object):
+ def __init__(self, task, parent=None):
+ self.parent = parent
+ self.log = parent.log
+ if isinstance(parent, AssistantLogger):
+ self.depth = 0
+ else:
+ self.depth = parent.depth + 1
+class AssistantTaskLogger(AssistantTask):
+ def __init__(self, task, logger):
+ super().__init__(task, parent=logger)
+ #self.review = self.parent.newTask(task, logger.depth + 1)
+ def newTask(self, task, depth):
+ review_logger = self.parent.getReviewLogger("%s.%s" % (self.parent.review.name, task))
+ self.review = ReviewResultAdapter(review_logger, {'depth': depth})
+ return self.review
+ def getReviewLogger(self, name):
+ return self.parent.getReviewLogger(name)
+ def critical(self, msg, *args, **kwargs):
+ self.review.critical(msg, *args, **kwargs)
+ def error(self, msg, *args, **kwargs):
+ self.review.error(msg, *args, **kwargs)
+ def warning(self, msg, *args, **kwargs):
+ self.review.warning(msg, *args, **kwargs)
+ def info(self, msg, *args, **kwargs):
+ self.review.info(msg, *args, **kwargs)
+ def debug(self, msg, *args, **kwargs):
+ self.review.debug(msg, *args, **kwargs)
diff --git a/ubuntu_archive_assistant/utils/__init__.py b/ubuntu_archive_assistant/utils/__init__.py
new file mode 100644
index 0000000..27669bc
--- /dev/null
+++ b/ubuntu_archive_assistant/utils/__init__.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
diff --git a/ubuntu_archive_assistant/utils/bugtools.py b/ubuntu_archive_assistant/utils/bugtools.py
new file mode 100644
index 0000000..ffdc8e1
--- /dev/null
+++ b/ubuntu_archive_assistant/utils/bugtools.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+from termcolor import colored
+from ubuntu_archive_assistant.utils import launchpad
+from ubuntu_archive_assistant.logging import AssistantLogger
+def _colorize_status(status):
+ color = 'grey'
+ if status in ('In Progress', 'Fix Committed'):
+ color = 'green'
+ elif status in ('Incomplete'):
+ color = 'red'
+ else:
+ color = 'grey'
+ return colored(status, color)
+def _colorize_priority(importance):
+ color = 'grey'
+ if importance in ('Critical'):
+ color = 'red'
+ elif importance in ('High', 'Medium'):
+ color = 'yellow'
+ elif importance in ('Low'):
+ color = 'green'
+ else:
+ color = 'grey'
+ return colored(importance, color)
+def show_bug(print_func, bug, **kwargs):
+ print_func("(LP: #%s) %s" % (bug.id, bug.title),
+ **kwargs)
+def show_task(print_func, task, show_bug_header=False, **kwargs):
+ assigned_to = "unassigned"
+ if task.assignee:
+ if task.assignee.name in ('ubuntu-security', 'canonical-security'):
+ a_color = 'red'
+ elif task.assignee.name in ('ubuntu-mir'):
+ a_color = 'blue'
+ else:
+ a_color = 'grey'
+ assignee = colored(task.assignee.display_name, a_color)
+ assigned_to = "assigned to %s" % assignee
+ if show_bug_header:
+ show_bug(print_func, task.bug, **kwargs)
+ print_func("\t%s (%s) in %s (%s)" % (_colorize_status(task.status),
+ _colorize_priority(task.importance),
+ task.target.name, assigned_to),
+ **kwargs)
+def list_bugs(print_func, tasks, filter=None, **kwargs):
+ last_bug_id = 0
+ for task in tasks:
+ if filter is not None and filter(task):
+ continue
+ if task.bug.id != last_bug_id:
+ show_bug(print_func, task.bug, **kwargs)
+ last_bug_id = task.bug.id
+ show_task(print_func, task, show_bug_header=False, **kwargs)
\ No newline at end of file
diff --git a/ubuntu_archive_assistant/utils/launchpad.py b/ubuntu_archive_assistant/utils/launchpad.py
new file mode 100644
index 0000000..43872b5
--- /dev/null
+++ b/ubuntu_archive_assistant/utils/launchpad.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import os
+from launchpadlib.launchpad import Launchpad
+from ubuntu_archive_assistant.logging import AssistantLogger
+class LaunchpadInstance(object):
+ class __LaunchpadInstance(object):
+ def __init__(self):
+ self.logger = AssistantLogger()
+ self.lp_cachedir = os.path.expanduser(os.path.join("~", ".launchpadlib/cache"))
+ self.logger.log.debug("Using Launchpad cache dir: \"%s\"" % self.lp_cachedir)
+ self.lp = Launchpad.login_with('ubuntu-archive-assisant',
+ service_root='production',
+ launchpadlib_dir=self.lp_cachedir,
+ version='devel')
+ instance = None
+ def __init__(self, module=None, depth=0):
+ if not LaunchpadInstance.instance:
+ LaunchpadInstance.instance = LaunchpadInstance.__LaunchpadInstance()
+ self.lp = LaunchpadInstance.instance.lp
+ self.ubuntu = self.lp.distributions['ubuntu']
+ def lp(self):
+ return self.lp
+ def ubuntu(self):
+ return self.ubuntu
+ def ubuntu_archive(self):
+ return self.ubuntu.main_archive
+ def current_series(self):
+ return self.ubuntu.current_series
diff --git a/ubuntu_archive_assistant/utils/urlhandling.py b/ubuntu_archive_assistant/utils/urlhandling.py
new file mode 100644
index 0000000..ddb0bad
--- /dev/null
+++ b/ubuntu_archive_assistant/utils/urlhandling.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2018 Canonical Ltd.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import os
+import requests
+from urllib.request import FancyURLopener
+class URLRetrieverWithProgress(object):
+ def __init__(self, url, filename):
+ self.url = url
+ self.filename = filename
+ self.url_opener = FancyURLopener()
+ def get(self):
+ self.url_opener.retrieve(self.url, self.filename, self._report_download)
+ def _report_download(self, blocks_read, block_size, total_size):
+ size_read = blocks_read * block_size
+ percent = size_read/total_size*100
+ if percent <= 100:
+ print("Refreshing %s: %.0f %%" % (os.path.basename(self.filename), percent), end='\r')
+ else:
+ print(" " * 80, end='\r')
+def get_with_progress(url=None, filename=None):
+ retriever = URLRetrieverWithProgress(url, filename)
+ response = retriever.get()
+ return response
+def get(url=None):
+ response = requests.get(url=url)
+ return response