parent
7abcf481da
commit
47f5f0da72
@ -0,0 +1,115 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
import urllib.request
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from policies.policy import BasePolicy, PolicyVerdict
|
||||||
|
|
||||||
|
|
||||||
|
LAUNCHPAD_URL = 'https://api.launchpad.net/1.0/'
|
||||||
|
PRIMARY = LAUNCHPAD_URL + 'ubuntu/+archive/primary'
|
||||||
|
|
||||||
|
|
||||||
|
def query_lp_rest_api(obj, query):
|
||||||
|
"""Do a Launchpad REST request
|
||||||
|
|
||||||
|
Request <LAUNCHPAD_URL><obj>?<query>.
|
||||||
|
|
||||||
|
Returns dict of parsed json result from launchpad.
|
||||||
|
Raises HTTPError, ValueError, or ConnectionError based on different
|
||||||
|
transient failures connecting to launchpad.
|
||||||
|
"""
|
||||||
|
url = '%s%s?%s' % (LAUNCHPAD_URL, obj, urllib.parse.urlencode(query))
|
||||||
|
with urllib.request.urlopen(url, timeout=10) as req:
|
||||||
|
code = req.getcode()
|
||||||
|
if 200 <= code < 300:
|
||||||
|
return json.loads(req.read().decode('UTF-8'))
|
||||||
|
raise ConnectionError('Failed to reach launchpad, HTTP %s' % code)
|
||||||
|
|
||||||
|
|
||||||
|
class SourcePPAPolicy(BasePolicy):
|
||||||
|
"""Migrate packages copied from same source PPA together
|
||||||
|
|
||||||
|
This policy will query launchpad to determine what source PPA packages
|
||||||
|
were copied from, and ensure that all packages from the same PPA migrate
|
||||||
|
together.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, options):
|
||||||
|
super().__init__('source-ppa', options, {'unstable'})
|
||||||
|
self.filename = os.path.join(options.unstable, 'SourcePPA')
|
||||||
|
# Dict of dicts; maps pkg name -> pkg version -> source PPA URL
|
||||||
|
self.source_ppas_by_pkg = defaultdict(dict)
|
||||||
|
# Dict of sets; maps source PPA URL -> set of source names
|
||||||
|
self.pkgs_by_source_ppa = defaultdict(set)
|
||||||
|
self.britney = None
|
||||||
|
# self.cache contains self.source_ppas_by_pkg from previous run
|
||||||
|
self.cache = {}
|
||||||
|
|
||||||
|
def lp_get_source_ppa(self, pkg, version):
|
||||||
|
"""Ask LP what source PPA pkg was copied from"""
|
||||||
|
cached = self.cache.get(pkg, {}).get(version)
|
||||||
|
if cached is not None:
|
||||||
|
return cached
|
||||||
|
|
||||||
|
data = query_lp_rest_api('%s/%s' % (self.options.distribution, self.options.series), {
|
||||||
|
'ws.op': 'getPackageUploads',
|
||||||
|
'archive': PRIMARY,
|
||||||
|
'pocket': 'Proposed',
|
||||||
|
'name': pkg,
|
||||||
|
'version': version,
|
||||||
|
'exact_match': 'true',
|
||||||
|
})
|
||||||
|
try:
|
||||||
|
return data['entries'][0]['copy_source_archive_link'] or ''
|
||||||
|
# IndexError means no packages in -proposed matched this name/version,
|
||||||
|
# which is expected to happen when bileto runs britney.
|
||||||
|
except IndexError:
|
||||||
|
return ''
|
||||||
|
|
||||||
|
def initialise(self, britney):
|
||||||
|
"""Load and discover all source ppa data"""
|
||||||
|
super().initialise(britney)
|
||||||
|
self.britney = britney
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(self.filename, encoding='utf-8') as data:
|
||||||
|
self.cache = json.load(data)
|
||||||
|
self.log("Loaded cached source ppa data from %s" % self.filename)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def apply_policy_impl(self, sourceppa_info, suite, source_name, source_data_tdist, source_data_srcdist, excuse):
|
||||||
|
"""Reject package if any other package copied from same PPA is invalid"""
|
||||||
|
accept = excuse.is_valid
|
||||||
|
britney_excuses = self.britney.excuses
|
||||||
|
version = source_data_srcdist.version
|
||||||
|
sourceppa = self.lp_get_source_ppa(source_name, version)
|
||||||
|
self.source_ppas_by_pkg[source_name][version] = sourceppa
|
||||||
|
if sourceppa:
|
||||||
|
# Check for other packages that might invalidate this one
|
||||||
|
for friend in self.pkgs_by_source_ppa[sourceppa]:
|
||||||
|
if not britney_excuses[friend].is_valid:
|
||||||
|
accept = False
|
||||||
|
self.pkgs_by_source_ppa[sourceppa].add(source_name)
|
||||||
|
if not accept:
|
||||||
|
# Invalidate all packages in this source ppa
|
||||||
|
shortppa = sourceppa.replace(LAUNCHPAD_URL, '')
|
||||||
|
for friend in self.pkgs_by_source_ppa[sourceppa]:
|
||||||
|
friend_exc = britney_excuses.get(friend, excuse)
|
||||||
|
friend_exc.is_valid = False
|
||||||
|
friend_exc.addreason('source-ppa')
|
||||||
|
self.log("Blocking %s because %s from %s" % (friend, source_name, shortppa))
|
||||||
|
sourceppa_info[friend] = shortppa
|
||||||
|
return PolicyVerdict.REJECTED_PERMANENTLY
|
||||||
|
return PolicyVerdict.PASS
|
||||||
|
|
||||||
|
def save_state(self, britney):
|
||||||
|
"""Write source ppa data to disk"""
|
||||||
|
tmp = self.filename + '.tmp'
|
||||||
|
with open(tmp, 'w', encoding='utf-8') as data:
|
||||||
|
json.dump(self.source_ppas_by_pkg, data)
|
||||||
|
os.rename(tmp, self.filename)
|
||||||
|
self.log("Wrote source ppa data to %s" % self.filename)
|
@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"pal": {"2.0": "https://api.launchpad.net/1.0/team/ubuntu/ppa"},
|
||||||
|
"buddy": {"2.0": "https://api.launchpad.net/1.0/team/ubuntu/ppa"},
|
||||||
|
"friend": {"2.0": "https://api.launchpad.net/1.0/team/ubuntu/ppa"},
|
||||||
|
"noppa": {"2.0": ""},
|
||||||
|
"unused": {"2.0": ""}
|
||||||
|
}
|
@ -0,0 +1,133 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# (C) 2016 Canonical Ltd.
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
sys.path.insert(0, PROJECT_DIR)
|
||||||
|
|
||||||
|
from policies.policy import PolicyVerdict
|
||||||
|
from policies.sourceppa import LAUNCHPAD_URL, SourcePPAPolicy
|
||||||
|
|
||||||
|
|
||||||
|
CACHE_FILE = os.path.join(PROJECT_DIR, 'tests', 'data', 'sourceppa.json')
|
||||||
|
|
||||||
|
|
||||||
|
class FakeOptions:
|
||||||
|
distribution = 'testbuntu'
|
||||||
|
series = 'zazzy'
|
||||||
|
unstable = '/tmp'
|
||||||
|
verbose = False
|
||||||
|
|
||||||
|
|
||||||
|
class FakeExcuse:
|
||||||
|
ver = ('1.0', '2.0')
|
||||||
|
is_valid = True
|
||||||
|
|
||||||
|
def addreason(self, reason):
|
||||||
|
"""Ignore reasons."""
|
||||||
|
|
||||||
|
|
||||||
|
class FakeBritney:
|
||||||
|
def __init__(self):
|
||||||
|
self.excuses = dict(
|
||||||
|
pal=FakeExcuse(),
|
||||||
|
buddy=FakeExcuse(),
|
||||||
|
friend=FakeExcuse(),
|
||||||
|
noppa=FakeExcuse())
|
||||||
|
|
||||||
|
|
||||||
|
class FakeData:
|
||||||
|
version = '2.0'
|
||||||
|
|
||||||
|
|
||||||
|
class T(unittest.TestCase):
|
||||||
|
maxDiff = None
|
||||||
|
|
||||||
|
@patch('policies.sourceppa.urllib.request.urlopen')
|
||||||
|
def test_lp_rest_api_no_entries(self, urlopen):
|
||||||
|
"""Don't explode if LP reports no entries match pkg/version"""
|
||||||
|
context = urlopen.return_value.__enter__.return_value
|
||||||
|
context.getcode.return_value = 200
|
||||||
|
context.read.return_value = b'{"entries": []}'
|
||||||
|
pol = SourcePPAPolicy(FakeOptions)
|
||||||
|
self.assertEqual(pol.lp_get_source_ppa('hello', '1.0'), '')
|
||||||
|
|
||||||
|
@patch('policies.sourceppa.urllib.request.urlopen')
|
||||||
|
def test_lp_rest_api_no_source_ppa(self, urlopen):
|
||||||
|
"""Identify when package has no source PPA"""
|
||||||
|
context = urlopen.return_value.__enter__.return_value
|
||||||
|
context.getcode.return_value = 200
|
||||||
|
context.read.return_value = b'{"entries": [{"copy_source_archive_link": null, "other_stuff": "ignored"}]}'
|
||||||
|
pol = SourcePPAPolicy(FakeOptions)
|
||||||
|
self.assertEqual(pol.lp_get_source_ppa('hello', '1.0'), '')
|
||||||
|
|
||||||
|
@patch('policies.sourceppa.urllib.request.urlopen')
|
||||||
|
def test_lp_rest_api_with_source_ppa(self, urlopen):
|
||||||
|
"""Identify source PPA"""
|
||||||
|
context = urlopen.return_value.__enter__.return_value
|
||||||
|
context.getcode.return_value = 200
|
||||||
|
context.read.return_value = b'{"entries": [{"copy_source_archive_link": "https://api.launchpad.net/1.0/team/ubuntu/ppa", "other_stuff": "ignored"}]}'
|
||||||
|
pol = SourcePPAPolicy(FakeOptions)
|
||||||
|
self.assertEqual(pol.lp_get_source_ppa('hello', '1.0'), 'https://api.launchpad.net/1.0/team/ubuntu/ppa')
|
||||||
|
|
||||||
|
@patch('policies.sourceppa.urllib.request.urlopen')
|
||||||
|
def test_lp_rest_api_errors(self, urlopen):
|
||||||
|
"""Report errors instead of swallowing them"""
|
||||||
|
context = urlopen.return_value.__enter__.return_value
|
||||||
|
context.getcode.return_value = 500
|
||||||
|
context.read.return_value = b''
|
||||||
|
pol = SourcePPAPolicy(FakeOptions)
|
||||||
|
with self.assertRaisesRegex(ConnectionError, 'HTTP 500'):
|
||||||
|
pol.lp_get_source_ppa('hello', '1.0')
|
||||||
|
# Yes, I have really seen "success with no json returned" in the wild
|
||||||
|
context.getcode.return_value = 200
|
||||||
|
context.read.return_value = b''
|
||||||
|
with self.assertRaisesRegex(ValueError, 'Expecting value'):
|
||||||
|
pol.lp_get_source_ppa('hello', '1.0')
|
||||||
|
|
||||||
|
def test_approve_ppa(self):
|
||||||
|
"""Approve packages by their PPA."""
|
||||||
|
pol = SourcePPAPolicy(FakeOptions)
|
||||||
|
pol.filename = CACHE_FILE
|
||||||
|
pol.initialise(FakeBritney())
|
||||||
|
output = {}
|
||||||
|
for pkg in ('pal', 'buddy', 'friend', 'noppa'):
|
||||||
|
self.assertEqual(pol.apply_policy_impl(output, None, pkg, None, FakeData, FakeExcuse), PolicyVerdict.PASS)
|
||||||
|
self.assertEqual(output, {})
|
||||||
|
|
||||||
|
def test_reject_ppa(self):
|
||||||
|
"""Reject packages by their PPA."""
|
||||||
|
shortppa = 'team/ubuntu/ppa'
|
||||||
|
pol = SourcePPAPolicy(FakeOptions)
|
||||||
|
pol.filename = CACHE_FILE
|
||||||
|
brit = FakeBritney()
|
||||||
|
brit.excuses['buddy'].is_valid = False # Just buddy is invalid but whole ppa fails
|
||||||
|
pol.initialise(brit)
|
||||||
|
output = {}
|
||||||
|
# This one passes because the rejection isn't known yet
|
||||||
|
self.assertEqual(pol.apply_policy_impl(output, None, 'pal', None, FakeData, brit.excuses['pal']), PolicyVerdict.PASS)
|
||||||
|
# This one fails because it is itself invalid.
|
||||||
|
self.assertEqual(pol.apply_policy_impl(output, None, 'buddy', None, FakeData, brit.excuses['buddy']), PolicyVerdict.REJECTED_PERMANENTLY)
|
||||||
|
# This one fails because buddy failed before it.
|
||||||
|
self.assertEqual(pol.apply_policy_impl(output, None, 'friend', None, FakeData, brit.excuses['friend']), PolicyVerdict.REJECTED_PERMANENTLY)
|
||||||
|
# 'noppa' not from PPA so not rejected
|
||||||
|
self.assertEqual(pol.apply_policy_impl(output, None, 'noppa', None, FakeData, FakeExcuse), PolicyVerdict.PASS)
|
||||||
|
# All are rejected however
|
||||||
|
for pkg in ('pal', 'buddy', 'friend'):
|
||||||
|
self.assertFalse(brit.excuses[pkg].is_valid)
|
||||||
|
self.assertDictEqual(pol.pkgs_by_source_ppa, {
|
||||||
|
LAUNCHPAD_URL + shortppa: {'pal', 'buddy', 'friend'}})
|
||||||
|
self.assertEqual(output, dict(pal=shortppa, buddy=shortppa, friend=shortppa))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Loading…
Reference in new issue