#!/usr/bin/python3 # (C) 2017 Canonical Ltd. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. from collections import defaultdict import fileinput import json import os import pprint import sys import unittest import yaml from unittest.mock import DEFAULT, patch, call PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, PROJECT_DIR) from britney2.policies.policy import PolicyVerdict from britney2.policies.email import EmailPolicy, person_chooser, address_chooser from tests.test_sourceppa import FakeOptions from tests import TestBase from tests.mock_smtpd import FakeSMTPServer # Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835 UPLOAD = dict( creator_link=None, package_creator_link='https://api.launchpad.net/1.0/~zulcss', package_signer_link='https://api.launchpad.net/1.0/~zulcss', sponsor_link=None, ) # Example of a sponsored upload: kerneloops 0.12+git20140509-2ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7013597 SPONSORED_UPLOAD = dict( creator_link=None, package_creator_link='https://api.launchpad.net/1.0/~smb', package_signer_link='https://api.launchpad.net/1.0/~apw', sponsor_link=None, ) # Example of a bileto upload: autopilot 1.6.0+17.04.20170302-0ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7525085 # (dobey clicked 'build' and sil2100 clicked 'publish') BILETO = dict( creator_link='https://api.launchpad.net/1.0/~sil2100', package_creator_link='https://api.launchpad.net/1.0/~dobey', package_signer_link='https://api.launchpad.net/1.0/~ci-train-bot', sponsor_link='https://api.launchpad.net/1.0/~ubuntu-archive-robot', ) # Example of a non-sponsored copy: linux 4.10.0-11.13 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522481 # (the upload to the PPA was sponsored but the copy was done directly) UNSPONSORED_COPY = dict( creator_link='https://api.launchpad.net/1.0/~timg-tpi', package_creator_link='https://api.launchpad.net/1.0/~sforshee', package_signer_link='https://api.launchpad.net/1.0/~timg-tpi', sponsor_link=None, ) # Example of a sponsored copy: pagein 0.00.03-1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7533336 SPONSORED_COPY = dict( creator_link='https://api.launchpad.net/1.0/~colin-king', package_creator_link='https://api.launchpad.net/1.0/~colin-king', package_signer_link=None, sponsor_link='https://api.launchpad.net/1.0/~mapreri', ) # Example of a manual debian sync: systemd 232-19 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522736 MANUAL_SYNC = dict( creator_link='https://api.launchpad.net/1.0/~costamagnagianfranco', package_creator_link='https://api.launchpad.net/1.0/~pkg-systemd-maintainers', package_signer_link=None, sponsor_link=None, ) # Example of a sponsored manual debian sync: python-pymysql 0.7.9-2 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7487820 SPONSORED_MANUAL_SYNC = dict( creator_link='https://api.launchpad.net/1.0/~lars-tangvald', package_creator_link='https://api.launchpad.net/1.0/~openstack-1.0', package_signer_link=None, sponsor_link='https://api.launchpad.net/1.0/~racb', ) # Example of an automatic debian sync: gem2deb 0.33.1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7255529 AUTO_SYNC = dict( creator_link='https://api.launchpad.net/1.0/~katie', package_creator_link='https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers', package_signer_link=None, sponsor_link='https://api.launchpad.net/1.0/~ubuntu-archive-robot', ) PROMOTED_AUTO_SYNC = [ dict( creator_link=None, package_creator_link='https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers', package_signer_link=None, sponsor_link=None,), dict( creator_link='https://api.launchpad.net/1.0/~katie', package_creator_link='https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers', package_signer_link=None, sponsor_link='https://api.launchpad.net/1.0/~ubuntu-archive-robot',) ] # address lists UBUNTU = ['personal@gmail.com', 'ubuntu@ubuntu.com', 'work@canonical.com'] CANONICAL = ['personal@gmail.com', 'work@canonical.com'] COMMUNITY = ['personal@gmail.com', 'other@gmail.com'] def retvals(retvals): """Return different retvals on different calls of mock.""" def returner(*args, **kwargs): return retvals.pop() return returner class FakeSourceData: version = '55.0' class FakeExcuse: is_valid = True daysold = 0 reason = [] current_policy_verdict = PolicyVerdict.PASS class T(unittest.TestCase): maxDiff = None def test_person_chooser(self): """Find the correct person to blame for an upload.""" self.assertEqual(person_chooser(UPLOAD), { 'https://api.launchpad.net/1.0/~zulcss', }) self.assertEqual(person_chooser(SPONSORED_UPLOAD), { 'https://api.launchpad.net/1.0/~apw', 'https://api.launchpad.net/1.0/~smb' }) self.assertEqual(person_chooser(BILETO), { 'https://api.launchpad.net/1.0/~dobey', 'https://api.launchpad.net/1.0/~sil2100', }) self.assertEqual(person_chooser(UNSPONSORED_COPY), { 'https://api.launchpad.net/1.0/~timg-tpi', }) self.assertEqual(person_chooser(SPONSORED_COPY), { 'https://api.launchpad.net/1.0/~colin-king', 'https://api.launchpad.net/1.0/~mapreri', }) self.assertEqual(person_chooser(MANUAL_SYNC), { 'https://api.launchpad.net/1.0/~costamagnagianfranco', }) self.assertSequenceEqual(person_chooser(SPONSORED_MANUAL_SYNC), { 'https://api.launchpad.net/1.0/~lars-tangvald', 'https://api.launchpad.net/1.0/~racb', }) self.assertEqual(person_chooser(AUTO_SYNC), set()) def test_address_chooser(self): """Prioritize email addresses correctly.""" self.assertEqual(address_chooser(UBUNTU), 'ubuntu@ubuntu.com') self.assertEqual(address_chooser(CANONICAL), 'work@canonical.com') self.assertEqual(address_chooser(COMMUNITY), 'personal@gmail.com') @patch('britney2.policies.email.EmailPolicy.query_rest_api') @patch('britney2.policies.email.EmailPolicy.query_lp_rest_api') def test_email_promoted_package(self, lp, rest): """When a package has been promoted in proposed, we find the older SPPH and use its details - in the case of an autosync to not email.""" lp.return_value = dict(entries=PROMOTED_AUTO_SYNC) e = EmailPolicy(FakeOptions, None) self.assertEqual(e.lp_get_emails('openstack-doct-tools', '1.5.0-0ubuntu1'), []) self.assertSequenceEqual(lp.mock_calls, [ call('testbuntu/+archive/primary', { 'distro_series': '/testbuntu/zazzy', 'exact_match': 'true', 'order_by_date': 'true', 'pocket': 'Proposed', 'source_name': 'openstack-doct-tools', 'version': '1.5.0-0ubuntu1', 'ws.op': 'getPublishedSources', }) ]) self.assertSequenceEqual(rest.mock_calls, []) @patch('britney2.policies.email.EmailPolicy.query_rest_api') @patch('britney2.policies.email.EmailPolicy.query_lp_rest_api') def test_email_scraping(self, lp, rest): """Poke correct REST APIs to find email addresses.""" lp.side_effect = retvals([ dict(entries=[dict(fingerprint='DEFACED_ED1F1CE')]), dict(entries=[UPLOAD]), ]) rest.return_value = 'uid:Defaced Edifice :12345::' e = EmailPolicy(FakeOptions, None) self.assertEqual(e.lp_get_emails('openstack-doct-tools', '1.5.0-0ubuntu1'), ['ex@example.com']) self.assertSequenceEqual(lp.mock_calls, [ call('testbuntu/+archive/primary', { 'distro_series': '/testbuntu/zazzy', 'exact_match': 'true', 'order_by_date': 'true', 'pocket': 'Proposed', 'source_name': 'openstack-doct-tools', 'version': '1.5.0-0ubuntu1', 'ws.op': 'getPublishedSources', }), call('https://api.launchpad.net/1.0/~zulcss/gpg_keys', {}) ]) self.assertSequenceEqual(rest.mock_calls, [ call('http://keyserver.ubuntu.com/pks/lookup', { 'exact': 'on', 'op': 'index', 'options': 'mr', 'search': '0xDEFACED_ED1F1CE', }) ]) @patch('britney2.policies.email.EmailPolicy.lp_get_emails') @patch('britney2.policies.email.smtplib') def test_smtp_not_sent(self, smtp, lp): """Know when not to send any emails.""" lp.return_value = ['example@email.com'] e = EmailPolicy(FakeOptions, None) FakeExcuse.daysold = 0.002 e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse) FakeExcuse.daysold = 2.98 e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse) # Would email but no address found FakeExcuse.daysold = 10.12 lp.return_value = [] e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse) self.assertEqual(smtp.mock_calls, []) @patch('britney2.policies.email.EmailPolicy.lp_get_emails') @patch('britney2.policies.email.smtplib') def test_smtp_sent(self, smtp, lp): """Send emails correctly.""" lp.return_value = ['email@address.com'] e = EmailPolicy(FakeOptions, None) FakeExcuse.is_valid = False FakeExcuse.daysold = 100 e.apply_policy_impl(None, None, 'chromium-browser', None, FakeSourceData, FakeExcuse) smtp.SMTP.assert_called_once_with('localhost') @patch('britney2.policies.email.EmailPolicy.lp_get_emails') @patch('britney2.policies.email.smtplib', autospec=True) def smtp_repetition(self, smtp, lp, valid, expected): """Resend mails periodically, with decreasing frequency.""" if not isinstance(valid,list): valid = [valid]*len(expected) FakeExcuse.is_valid = valid lp.return_value = ['email@address.com'] sendmail = smtp.SMTP().sendmail e = EmailPolicy(FakeOptions, None) called = [] e.cache = {} for hours in range(0, 5000): previous = sendmail.call_count age = hours / 24 FakeExcuse.daysold = age try: FakeExcuse.is_valid = valid[len(called)] except IndexError: # we've already gotten all the mails we expect pass e.apply_policy_impl(None, None, 'unity8', None, FakeSourceData, FakeExcuse) if sendmail.call_count > previous: e.initialise(None) # Refill e.cache from disk called.append(age) name, args, kwargs = sendmail.mock_calls[-1] text = args[2] self.assertNotIn(' 1 days.', text) self.assertSequenceEqual(called, expected) def test_smtp_repetition(self): """Confirm that emails are sent at appropriate intervals.""" # Emails were sent when daysold reached these values: self.smtp_repetition(valid=False, expected=[ 1, 3, 7, 15, 31, 61, 91, 121, 151, 181 ]) self.smtp_repetition(valid=True, expected=[ 5, 7, 11, 19, 35, 65, 95, 125, 155, 185 ]) self.smtp_repetition(valid=[False, False, True], expected=[ 1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185 ]) self.smtp_repetition(valid=[False, False, True, False, True], expected=[ 1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185 ]) class ET(TestBase): ''' Test sending mail through a mocked SMTP server ''' @classmethod def setUpClass(cls): cls.smtpd = FakeSMTPServer('localhost', 1337) cls.smtpd.run() @classmethod def tearDownClass(cls): cls.smtpd.close() def setUp(self): super().setUp() # disable ADT, not relevant for us for line in fileinput.input(self.britney_conf, inplace=True): if line.startswith('ADT_ENABLE'): print('ADT_ENABLE = no') elif line.startswith('MINDAYS_EMERGENCY'): print('MINDAYS_EMERGENCY = 10') elif not line.startswith('ADT_AMQP') and not line.startswith('ADT_SWIFT_URL'): sys.stdout.write(line) # and set up a fake smtpd with open(self.britney_conf, 'a') as f: f.write('EMAIL_HOST = localhost:1337') self.sourceppa_cache = {} self.email_cache = {} self.data.add('libc6', False) def do_test(self, unstable_add, expect_emails): '''Run britney with some unstable packages and verify excuses. unstable_add is a list of (binpkgname, field_dict, daysold, emails) expect_emails is a list that is checked against the emails sent during this do_test run. Return (output, excuses_dict, excuses_html, emails). ''' ET.smtpd.emails.clear() age_file = os.path.join(self.data.path, 'data', 'series', 'Dates') email_cache_file = os.path.join(self.data.path, 'data', 'series-proposed', 'EmailCache') for (pkg, fields, daysold, emails) in unstable_add: self.data.add(pkg, True, fields, True, None) self.sourceppa_cache.setdefault(pkg, {}) if fields['Version'] not in self.sourceppa_cache[pkg]: self.sourceppa_cache[pkg][fields['Version']] = '' with open(age_file, 'w') as f: import time do = time.time() - (60 * 60 * 24 * daysold) f.write('%s %s %d' % (pkg, fields['Version'], do)) with open(email_cache_file, 'w') as f: d = defaultdict(dict) d[pkg][fields['Version']] = (emails, 0) f.write(json.dumps(d)) # Set up sourceppa cache for testing sourceppa_path = os.path.join(self.data.dirs[True], 'SourcePPA') with open(sourceppa_path, 'w', encoding='utf-8') as sourceppa: sourceppa.write(json.dumps(self.sourceppa_cache)) (excuses_yaml, excuses_html, out) = self.run_britney() # convert excuses to source indexed dict excuses_dict = {} for s in yaml.load(excuses_yaml)['sources']: excuses_dict[s['source']] = s if 'SHOW_EXCUSES' in os.environ: print('------- excuses -----') pprint.pprint(excuses_dict, width=200) if 'SHOW_HTML' in os.environ: print('------- excuses.html -----\n%s\n' % excuses_html) if 'SHOW_OUTPUT' in os.environ: print('------- output -----\n%s\n' % out) self.assertNotIn('FIXME', out) # check all the emails that we asked for are there for email in expect_emails: self.assertIn(email, ET.smtpd.get_emails()) self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails)) return (out, excuses_dict, excuses_html, ET.smtpd.emails) def test_email_sent(self): '''Test that an email is sent through the SMTP server''' pkg = ('libc6', {'Version': '2', 'Depends': 'notavailable (>= 2)'}, 6, ['foo@bar.com']) self.do_test([pkg], ["foo@bar.com"]) def test_email_not_sent_block_all_source(self): '''Test that an email is not sent if the package is blocked by a block-all source hint''' self.create_hint('freeze', 'block-all source') pkg = ('libc6', {'Version': '2'}, 6, # daysold ['foo@bar.com']) self.do_test([pkg], []) def test_email_not_sent_blocked(self): '''Test that an email is not sent if the package is blocked by a block hint''' self.create_hint('freeze', 'block libc6') pkg = ('libc6', {'Version': '2'}, 6, # daysold ['foo@bar.com']) self.do_test([pkg], []) def test_email_sent_unblocked(self): '''Test that an email is sent if the package is unblocked''' self.create_hint('freeze', 'block libc6') self.create_hint('laney', 'unblock libc6/2') pkg = ('libc6', {'Version': '2', 'Depends': 'notavailable (>= 2)'}, 6, # daysold ['foo@bar.com']) self.do_test([pkg], ['foo@bar.com']) def test_email_not_sent_rejected_temporarily(self): '''Test that an email is not sent if the package is REJECTED_TEMPORARILY''' urgency_file = os.path.join(self.data.path, 'data', 'series', 'Urgency') with open(urgency_file, 'w') as f: # we specified in setUp() that emergency has a 10 day delay, and # age rejections are REJECTED_TEMPORARILY f.write('libc6 2 emergency') pkg = ('libc6', {'Version': '2', 'Depends': 'notavailable (>= 2)'}, 6, # daysold ['foo@bar.com']) self.do_test([pkg], []) if __name__ == '__main__': unittest.main()