britney2-ubuntu/tests/test_email.py
Iain Lane 400a6c57c4 email: When there are multiple SPPHs, use the last one
We were just seeing an interesting crash in britney. It was trying to
look up the gpg_keys of ~python-modules-team, which is a suspended
account - in LP API terms that's a HTTP error 410.

  https://api.launchpad.net/devel/~python-modules-team/gpg_keys

The direct error was fixed in commit 272f41c, but we actually should
*not* have been trying to look up this team's email address in the first
place. This upload was an auto-sync and so should not cause email to be
sent. The problem is that it was synced into universe and then promoted
into main. We were looking at the SPPH for after the promotion, which
has different values in the various signer/creator/sponsor/... fields,
and that made us think that it was a regular upload to email about.

Fix this by always looking at the oldest SPPH which should correspond to
the initial upload and not whatever happened to it afterwards.
2017-11-09 13:04:33 +00:00

458 lines
18 KiB
Python
Executable File

#!/usr/bin/python3
# (C) 2017 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from collections import defaultdict
import fileinput
import json
import os
import pprint
import sys
import unittest
import yaml
from unittest.mock import DEFAULT, patch, call
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)
from britney2.policies.policy import PolicyVerdict
from britney2.policies.email import EmailPolicy, person_chooser, address_chooser
from tests.test_sourceppa import FakeOptions
from tests import TestBase
from tests.mock_smtpd import FakeSMTPServer
# Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835
UPLOAD = dict(
creator_link=None,
package_creator_link='https://api.launchpad.net/1.0/~zulcss',
package_signer_link='https://api.launchpad.net/1.0/~zulcss',
sponsor_link=None,
)
# Example of a sponsored upload: kerneloops 0.12+git20140509-2ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7013597
SPONSORED_UPLOAD = dict(
creator_link=None,
package_creator_link='https://api.launchpad.net/1.0/~smb',
package_signer_link='https://api.launchpad.net/1.0/~apw',
sponsor_link=None,
)
# Example of a bileto upload: autopilot 1.6.0+17.04.20170302-0ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7525085
# (dobey clicked 'build' and sil2100 clicked 'publish')
BILETO = dict(
creator_link='https://api.launchpad.net/1.0/~sil2100',
package_creator_link='https://api.launchpad.net/1.0/~dobey',
package_signer_link='https://api.launchpad.net/1.0/~ci-train-bot',
sponsor_link='https://api.launchpad.net/1.0/~ubuntu-archive-robot',
)
# Example of a non-sponsored copy: linux 4.10.0-11.13
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522481
# (the upload to the PPA was sponsored but the copy was done directly)
UNSPONSORED_COPY = dict(
creator_link='https://api.launchpad.net/1.0/~timg-tpi',
package_creator_link='https://api.launchpad.net/1.0/~sforshee',
package_signer_link='https://api.launchpad.net/1.0/~timg-tpi',
sponsor_link=None,
)
# Example of a sponsored copy: pagein 0.00.03-1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7533336
SPONSORED_COPY = dict(
creator_link='https://api.launchpad.net/1.0/~colin-king',
package_creator_link='https://api.launchpad.net/1.0/~colin-king',
package_signer_link=None,
sponsor_link='https://api.launchpad.net/1.0/~mapreri',
)
# Example of a manual debian sync: systemd 232-19
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522736
MANUAL_SYNC = dict(
creator_link='https://api.launchpad.net/1.0/~costamagnagianfranco',
package_creator_link='https://api.launchpad.net/1.0/~pkg-systemd-maintainers',
package_signer_link=None,
sponsor_link=None,
)
# Example of a sponsored manual debian sync: python-pymysql 0.7.9-2
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7487820
SPONSORED_MANUAL_SYNC = dict(
creator_link='https://api.launchpad.net/1.0/~lars-tangvald',
package_creator_link='https://api.launchpad.net/1.0/~openstack-1.0',
package_signer_link=None,
sponsor_link='https://api.launchpad.net/1.0/~racb',
)
# Example of an automatic debian sync: gem2deb 0.33.1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7255529
AUTO_SYNC = dict(
creator_link='https://api.launchpad.net/1.0/~katie',
package_creator_link='https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers',
package_signer_link=None,
sponsor_link='https://api.launchpad.net/1.0/~ubuntu-archive-robot',
)
PROMOTED_AUTO_SYNC = [
dict(
creator_link=None,
package_creator_link='https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers',
package_signer_link=None,
sponsor_link=None,),
dict(
creator_link='https://api.launchpad.net/1.0/~katie',
package_creator_link='https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers',
package_signer_link=None,
sponsor_link='https://api.launchpad.net/1.0/~ubuntu-archive-robot',)
]
# address lists
UBUNTU = ['personal@gmail.com', 'ubuntu@ubuntu.com', 'work@canonical.com']
CANONICAL = ['personal@gmail.com', 'work@canonical.com']
COMMUNITY = ['personal@gmail.com', 'other@gmail.com']
def retvals(retvals):
"""Return different retvals on different calls of mock."""
def returner(*args, **kwargs):
return retvals.pop()
return returner
class FakeSourceData:
version = '55.0'
class FakeExcuse:
is_valid = True
daysold = 0
reason = []
current_policy_verdict = PolicyVerdict.PASS
class T(unittest.TestCase):
maxDiff = None
def test_person_chooser(self):
"""Find the correct person to blame for an upload."""
self.assertEqual(person_chooser(UPLOAD), {
'https://api.launchpad.net/1.0/~zulcss',
})
self.assertEqual(person_chooser(SPONSORED_UPLOAD), {
'https://api.launchpad.net/1.0/~apw',
'https://api.launchpad.net/1.0/~smb'
})
self.assertEqual(person_chooser(BILETO), {
'https://api.launchpad.net/1.0/~dobey',
'https://api.launchpad.net/1.0/~sil2100',
})
self.assertEqual(person_chooser(UNSPONSORED_COPY), {
'https://api.launchpad.net/1.0/~timg-tpi',
})
self.assertEqual(person_chooser(SPONSORED_COPY), {
'https://api.launchpad.net/1.0/~colin-king',
'https://api.launchpad.net/1.0/~mapreri',
})
self.assertEqual(person_chooser(MANUAL_SYNC), {
'https://api.launchpad.net/1.0/~costamagnagianfranco',
})
self.assertSequenceEqual(person_chooser(SPONSORED_MANUAL_SYNC), {
'https://api.launchpad.net/1.0/~lars-tangvald',
'https://api.launchpad.net/1.0/~racb',
})
self.assertEqual(person_chooser(AUTO_SYNC), set())
def test_address_chooser(self):
"""Prioritize email addresses correctly."""
self.assertEqual(address_chooser(UBUNTU), 'ubuntu@ubuntu.com')
self.assertEqual(address_chooser(CANONICAL), 'work@canonical.com')
self.assertEqual(address_chooser(COMMUNITY), 'personal@gmail.com')
@patch('britney2.policies.email.EmailPolicy.query_rest_api')
@patch('britney2.policies.email.EmailPolicy.query_lp_rest_api')
def test_email_promoted_package(self, lp, rest):
"""When a package has been promoted in proposed, we find the older SPPH
and use its details - in the case of an autosync to not email."""
lp.return_value = dict(entries=PROMOTED_AUTO_SYNC)
e = EmailPolicy(FakeOptions, None)
self.assertEqual(e.lp_get_emails('openstack-doct-tools', '1.5.0-0ubuntu1'), [])
self.assertSequenceEqual(lp.mock_calls, [
call('testbuntu/+archive/primary', {
'distro_series': '/testbuntu/zazzy',
'exact_match': 'true',
'order_by_date': 'true',
'pocket': 'Proposed',
'source_name': 'openstack-doct-tools',
'version': '1.5.0-0ubuntu1',
'ws.op': 'getPublishedSources',
})
])
self.assertSequenceEqual(rest.mock_calls, [])
@patch('britney2.policies.email.EmailPolicy.query_rest_api')
@patch('britney2.policies.email.EmailPolicy.query_lp_rest_api')
def test_email_scraping(self, lp, rest):
"""Poke correct REST APIs to find email addresses."""
lp.side_effect = retvals([
dict(entries=[dict(fingerprint='DEFACED_ED1F1CE')]),
dict(entries=[UPLOAD]),
])
rest.return_value = 'uid:Defaced Edifice <ex@example.com>:12345::'
e = EmailPolicy(FakeOptions, None)
self.assertEqual(e.lp_get_emails('openstack-doct-tools', '1.5.0-0ubuntu1'), ['ex@example.com'])
self.assertSequenceEqual(lp.mock_calls, [
call('testbuntu/+archive/primary', {
'distro_series': '/testbuntu/zazzy',
'exact_match': 'true',
'order_by_date': 'true',
'pocket': 'Proposed',
'source_name': 'openstack-doct-tools',
'version': '1.5.0-0ubuntu1',
'ws.op': 'getPublishedSources',
}),
call('https://api.launchpad.net/1.0/~zulcss/gpg_keys', {})
])
self.assertSequenceEqual(rest.mock_calls, [
call('http://keyserver.ubuntu.com/pks/lookup', {
'exact': 'on',
'op': 'index',
'options': 'mr',
'search': '0xDEFACED_ED1F1CE',
})
])
@patch('britney2.policies.email.EmailPolicy.lp_get_emails')
@patch('britney2.policies.email.smtplib')
def test_smtp_not_sent(self, smtp, lp):
"""Know when not to send any emails."""
lp.return_value = ['example@email.com']
e = EmailPolicy(FakeOptions, None)
FakeExcuse.daysold = 0.002
e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse)
FakeExcuse.daysold = 2.98
e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse)
# Would email but no address found
FakeExcuse.daysold = 10.12
lp.return_value = []
e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse)
self.assertEqual(smtp.mock_calls, [])
@patch('britney2.policies.email.EmailPolicy.lp_get_emails')
@patch('britney2.policies.email.smtplib')
def test_smtp_sent(self, smtp, lp):
"""Send emails correctly."""
lp.return_value = ['email@address.com']
e = EmailPolicy(FakeOptions, None)
FakeExcuse.is_valid = False
FakeExcuse.daysold = 100
e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse)
smtp.SMTP.assert_called_once_with('localhost')
@patch('britney2.policies.email.EmailPolicy.lp_get_emails')
@patch('britney2.policies.email.smtplib', autospec=True)
def smtp_repetition(self, smtp, lp, valid, expected):
"""Resend mails periodically, with decreasing frequency."""
if not isinstance(valid,list):
valid = [valid]*len(expected)
FakeExcuse.is_valid = valid
lp.return_value = ['email@address.com']
sendmail = smtp.SMTP().sendmail
e = EmailPolicy(FakeOptions, None)
called = []
e.cache = {}
for hours in range(0, 5000):
previous = sendmail.call_count
age = hours / 24
FakeExcuse.daysold = age
try:
FakeExcuse.is_valid = valid[len(called)]
except IndexError:
# we've already gotten all the mails we expect
pass
e.apply_policy_impl(None, None, 'unity8', None, FakeSourceData, FakeExcuse)
if sendmail.call_count > previous:
e.initialise(None) # Refill e.cache from disk
called.append(age)
name, args, kwargs = sendmail.mock_calls[-1]
text = args[2]
self.assertNotIn(' 1 days.', text)
self.assertSequenceEqual(called, expected)
def test_smtp_repetition(self):
"""Confirm that emails are sent at appropriate intervals."""
# Emails were sent when daysold reached these values:
self.smtp_repetition(valid=False, expected=[
1, 3, 7, 15, 31, 61, 91, 121, 151, 181
])
self.smtp_repetition(valid=True, expected=[
5, 7, 11, 19, 35, 65, 95, 125, 155, 185
])
self.smtp_repetition(valid=[False, False, True], expected=[
1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185
])
self.smtp_repetition(valid=[False, False, True, False, True], expected=[
1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185
])
class ET(TestBase):
''' Test sending mail through a mocked SMTP server '''
@classmethod
def setUpClass(cls):
cls.smtpd = FakeSMTPServer('localhost', 1337)
cls.smtpd.run()
@classmethod
def tearDownClass(cls):
cls.smtpd.close()
def setUp(self):
super().setUp()
# disable ADT, not relevant for us
for line in fileinput.input(self.britney_conf, inplace=True):
if line.startswith('ADT_ENABLE'):
print('ADT_ENABLE = no')
elif line.startswith('MINDAYS_EMERGENCY'):
print('MINDAYS_EMERGENCY = 10')
elif not line.startswith('ADT_AMQP') and not line.startswith('ADT_SWIFT_URL'):
sys.stdout.write(line)
# and set up a fake smtpd
with open(self.britney_conf, 'a') as f:
f.write('EMAIL_HOST = localhost:1337')
self.sourceppa_cache = {}
self.email_cache = {}
self.data.add('libc6', False)
def do_test(self, unstable_add, expect_emails):
'''Run britney with some unstable packages and verify excuses.
unstable_add is a list of (binpkgname, field_dict, daysold, emails)
expect_emails is a list that is checked against the emails sent during
this do_test run.
Return (output, excuses_dict, excuses_html, emails).
'''
ET.smtpd.emails.clear()
age_file = os.path.join(self.data.path,
'data',
'series',
'Dates')
email_cache_file = os.path.join(self.data.path,
'data',
'series-proposed',
'EmailCache')
for (pkg, fields, daysold, emails) in unstable_add:
self.data.add(pkg, True, fields, True, None)
self.sourceppa_cache.setdefault(pkg, {})
if fields['Version'] not in self.sourceppa_cache[pkg]:
self.sourceppa_cache[pkg][fields['Version']] = ''
with open(age_file, 'w') as f:
import time
do = time.time() - (60 * 60 * 24 * daysold)
f.write('%s %s %d' % (pkg, fields['Version'], do))
with open(email_cache_file, 'w') as f:
d = defaultdict(dict)
d[pkg][fields['Version']] = (emails, 0)
f.write(json.dumps(d))
# Set up sourceppa cache for testing
sourceppa_path = os.path.join(self.data.dirs[True], 'SourcePPA')
with open(sourceppa_path, 'w', encoding='utf-8') as sourceppa:
sourceppa.write(json.dumps(self.sourceppa_cache))
(excuses_yaml, excuses_html, out) = self.run_britney()
# convert excuses to source indexed dict
excuses_dict = {}
for s in yaml.load(excuses_yaml)['sources']:
excuses_dict[s['source']] = s
if 'SHOW_EXCUSES' in os.environ:
print('------- excuses -----')
pprint.pprint(excuses_dict, width=200)
if 'SHOW_HTML' in os.environ:
print('------- excuses.html -----\n%s\n' % excuses_html)
if 'SHOW_OUTPUT' in os.environ:
print('------- output -----\n%s\n' % out)
self.assertNotIn('FIXME', out)
# check all the emails that we asked for are there
for email in expect_emails:
self.assertIn(email, ET.smtpd.get_emails())
self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails))
return (out, excuses_dict, excuses_html, ET.smtpd.emails)
def test_email_sent(self):
'''Test that an email is sent through the SMTP server'''
pkg = ('libc6', {'Version': '2',
'Depends': 'notavailable (>= 2)'},
6,
['foo@bar.com'])
self.do_test([pkg], ["foo@bar.com"])
def test_email_not_sent_block_all_source(self):
'''Test that an email is not sent if the package is blocked by a
block-all source hint'''
self.create_hint('freeze', 'block-all source')
pkg = ('libc6', {'Version': '2'},
6, # daysold
['foo@bar.com'])
self.do_test([pkg], [])
def test_email_not_sent_blocked(self):
'''Test that an email is not sent if the package is blocked by a block hint'''
self.create_hint('freeze', 'block libc6')
pkg = ('libc6', {'Version': '2'},
6, # daysold
['foo@bar.com'])
self.do_test([pkg], [])
def test_email_sent_unblocked(self):
'''Test that an email is sent if the package is unblocked'''
self.create_hint('freeze', 'block libc6')
self.create_hint('laney', 'unblock libc6/2')
pkg = ('libc6', {'Version': '2',
'Depends': 'notavailable (>= 2)'},
6, # daysold
['foo@bar.com'])
self.do_test([pkg], ['foo@bar.com'])
def test_email_not_sent_rejected_temporarily(self):
'''Test that an email is not sent if the package is REJECTED_TEMPORARILY'''
urgency_file = os.path.join(self.data.path,
'data',
'series',
'Urgency')
with open(urgency_file, 'w') as f:
# we specified in setUp() that emergency has a 10 day delay, and
# age rejections are REJECTED_TEMPORARILY
f.write('libc6 2 emergency')
pkg = ('libc6', {'Version': '2',
'Depends': 'notavailable (>= 2)'},
6, # daysold
['foo@bar.com'])
self.do_test([pkg], [])
if __name__ == '__main__':
unittest.main()