#!/usr/bin/python3 # (C) 2017 Canonical Ltd. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. from collections import defaultdict import fileinput import json import os import pprint import sys import unittest import yaml from unittest.mock import DEFAULT, patch, call PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, PROJECT_DIR) from britney2.policies.policy import PolicyVerdict from britney2.policies.email import EmailPolicy, person_chooser, address_chooser from tests.test_sourceppa import FakeOptions from tests import TestBase from tests.mock_smtpd import FakeSMTPServer # Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835 UPLOAD = dict( creator_link=None, package_creator_link="https://api.launchpad.net/1.0/~zulcss", package_signer_link="https://api.launchpad.net/1.0/~zulcss", sponsor_link=None, ) # Example of a sponsored upload: kerneloops 0.12+git20140509-2ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7013597 SPONSORED_UPLOAD = dict( creator_link=None, package_creator_link="https://api.launchpad.net/1.0/~smb", package_signer_link="https://api.launchpad.net/1.0/~apw", sponsor_link=None, ) # Example of a bileto upload: autopilot 1.6.0+17.04.20170302-0ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7525085 # (dobey clicked 'build' and sil2100 clicked 'publish') BILETO = dict( creator_link="https://api.launchpad.net/1.0/~sil2100", package_creator_link="https://api.launchpad.net/1.0/~dobey", package_signer_link="https://api.launchpad.net/1.0/~ci-train-bot", sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot", ) # Example of a non-sponsored copy: linux 4.10.0-11.13 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522481 # (the upload to the PPA was sponsored but the copy was done directly) UNSPONSORED_COPY = dict( creator_link="https://api.launchpad.net/1.0/~timg-tpi", package_creator_link="https://api.launchpad.net/1.0/~sforshee", package_signer_link="https://api.launchpad.net/1.0/~timg-tpi", sponsor_link=None, ) # Example of a sponsored copy: pagein 0.00.03-1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7533336 SPONSORED_COPY = dict( creator_link="https://api.launchpad.net/1.0/~colin-king", package_creator_link="https://api.launchpad.net/1.0/~colin-king", package_signer_link=None, sponsor_link="https://api.launchpad.net/1.0/~mapreri", ) # Example of a manual debian sync: systemd 232-19 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522736 MANUAL_SYNC = dict( creator_link="https://api.launchpad.net/1.0/~costamagnagianfranco", package_creator_link="https://api.launchpad.net/1.0/~pkg-systemd-maintainers", package_signer_link=None, sponsor_link=None, ) # Example of a sponsored manual debian sync: python-pymysql 0.7.9-2 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7487820 SPONSORED_MANUAL_SYNC = dict( creator_link="https://api.launchpad.net/1.0/~lars-tangvald", package_creator_link="https://api.launchpad.net/1.0/~openstack-1.0", package_signer_link=None, sponsor_link="https://api.launchpad.net/1.0/~racb", ) # Example of an automatic debian sync: gem2deb 0.33.1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7255529 AUTO_SYNC = dict( creator_link="https://api.launchpad.net/1.0/~katie", package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers", package_signer_link=None, sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot", ) PROMOTED_AUTO_SYNC = [ dict( creator_link=None, package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers", package_signer_link=None, sponsor_link=None, ), dict( creator_link="https://api.launchpad.net/1.0/~katie", package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers", package_signer_link=None, sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot", ), ] # address lists UBUNTU = ["personal@gmail.com", "ubuntu@ubuntu.com", "work@canonical.com"] CANONICAL = ["personal@gmail.com", "work@canonical.com"] COMMUNITY = ["personal@gmail.com", "other@gmail.com"] def retvals(retvals): """Return different retvals on different calls of mock.""" def returner(*args, **kwargs): return retvals.pop() return returner class FakeItem: package = "chromium-browser" class FakeSourceData: version = "55.0" FakeItem, class FakeExcuse: is_valid = True daysold = 0 reason = [] tentative_policy_verdict = PolicyVerdict.PASS class T(unittest.TestCase): maxDiff = None def test_person_chooser(self): """Find the correct person to blame for an upload.""" self.assertEqual( person_chooser(UPLOAD), {"https://api.launchpad.net/1.0/~zulcss"} ) self.assertEqual( person_chooser(SPONSORED_UPLOAD), { "https://api.launchpad.net/1.0/~apw", "https://api.launchpad.net/1.0/~smb", }, ) self.assertEqual( person_chooser(BILETO), { "https://api.launchpad.net/1.0/~dobey", "https://api.launchpad.net/1.0/~sil2100", }, ) self.assertEqual( person_chooser(UNSPONSORED_COPY), {"https://api.launchpad.net/1.0/~timg-tpi"}, ) self.assertEqual( person_chooser(SPONSORED_COPY), { "https://api.launchpad.net/1.0/~colin-king", "https://api.launchpad.net/1.0/~mapreri", }, ) self.assertEqual( person_chooser(MANUAL_SYNC), {"https://api.launchpad.net/1.0/~costamagnagianfranco"}, ) self.assertSequenceEqual( person_chooser(SPONSORED_MANUAL_SYNC), { "https://api.launchpad.net/1.0/~lars-tangvald", "https://api.launchpad.net/1.0/~racb", }, ) self.assertEqual(person_chooser(AUTO_SYNC), set()) def test_address_chooser(self): """Prioritize email addresses correctly.""" self.assertEqual(address_chooser(UBUNTU), "ubuntu@ubuntu.com") self.assertEqual(address_chooser(CANONICAL), "work@canonical.com") self.assertEqual(address_chooser(COMMUNITY), "personal@gmail.com") @patch("britney2.policies.email.EmailPolicy.query_rest_api") @patch("britney2.policies.email.EmailPolicy.query_lp_rest_api") def test_email_promoted_package(self, lp, rest): """When a package has been promoted in proposed, we find the older SPPH and use its details - in the case of an autosync to not email.""" lp.return_value = dict(entries=PROMOTED_AUTO_SYNC) e = EmailPolicy(FakeOptions, None) self.assertEqual( e.lp_get_emails("openstack-doct-tools", "1.5.0-0ubuntu1"), [] ) self.assertSequenceEqual( lp.mock_calls, [ call( "testbuntu/+archive/primary", { "distro_series": "/testbuntu/zazzy", "exact_match": "true", "order_by_date": "true", "pocket": "Proposed", "source_name": "openstack-doct-tools", "version": "1.5.0-0ubuntu1", "ws.op": "getPublishedSources", }, ) ], ) self.assertSequenceEqual(rest.mock_calls, []) @patch("britney2.policies.email.EmailPolicy.query_rest_api") @patch("britney2.policies.email.EmailPolicy.query_lp_rest_api") def test_email_scraping(self, lp, rest): """Poke correct REST APIs to find email addresses.""" lp.side_effect = retvals( [ dict(entries=[dict(fingerprint="DEFACED_ED1F1CE")]), dict(entries=[UPLOAD]), ] ) rest.return_value = "uid:Defaced Edifice :12345::" e = EmailPolicy(FakeOptions, None) self.assertEqual( e.lp_get_emails("openstack-doct-tools", "1.5.0-0ubuntu1"), ["ex@example.com"], ) self.assertSequenceEqual( lp.mock_calls, [ call( "testbuntu/+archive/primary", { "distro_series": "/testbuntu/zazzy", "exact_match": "true", "order_by_date": "true", "pocket": "Proposed", "source_name": "openstack-doct-tools", "version": "1.5.0-0ubuntu1", "ws.op": "getPublishedSources", }, ), call("https://api.launchpad.net/1.0/~zulcss/gpg_keys", {}), ], ) self.assertSequenceEqual( rest.mock_calls, [ call( "http://keyserver.ubuntu.com/pks/lookup", { "exact": "on", "op": "index", "options": "mr", "search": "0xDEFACED_ED1F1CE", }, ) ], ) @patch("britney2.policies.email.EmailPolicy.lp_get_emails") @patch("britney2.policies.email.smtplib") def test_smtp_not_sent(self, smtp, lp): """Know when not to send any emails.""" lp.return_value = ["example@email.com"] e = EmailPolicy(FakeOptions, None) FakeExcuse.daysold = 0.002 e.apply_src_policy_impl( None, FakeItem, None, FakeSourceData, FakeExcuse ) FakeExcuse.daysold = 2.98 e.apply_src_policy_impl( None, FakeItem, None, FakeSourceData, FakeExcuse ) # Would email but no address found FakeExcuse.daysold = 10.12 lp.return_value = [] e.apply_src_policy_impl( None, FakeItem, None, FakeSourceData, FakeExcuse ) self.assertEqual(smtp.mock_calls, []) @patch("britney2.policies.email.EmailPolicy.lp_get_emails") @patch("britney2.policies.email.smtplib") def test_smtp_sent(self, smtp, lp): """Send emails correctly.""" lp.return_value = ["email@address.com"] e = EmailPolicy(FakeOptions, None) FakeExcuse.is_valid = False FakeExcuse.daysold = 100 e.apply_src_policy_impl( None, FakeItem, None, FakeSourceData, FakeExcuse ) smtp.SMTP.assert_called_once_with("localhost") @patch("britney2.policies.email.EmailPolicy.lp_get_emails") @patch("britney2.policies.email.smtplib", autospec=True) def smtp_repetition(self, smtp, lp, valid, expected): """Resend mails periodically, with decreasing frequency.""" if not isinstance(valid, list): valid = [valid] * len(expected) FakeExcuse.is_valid = valid lp.return_value = ["email@address.com"] sendmail = smtp.SMTP().sendmail e = EmailPolicy(FakeOptions, None) called = [] e.cache = {} for hours in range(0, 5000): previous = sendmail.call_count age = hours / 24 FakeExcuse.daysold = age try: FakeExcuse.is_valid = valid[len(called)] except IndexError: # we've already gotten all the mails we expect pass e.apply_src_policy_impl( None, FakeItem, None, FakeSourceData, FakeExcuse ) if sendmail.call_count > previous: e.initialise(None) # Refill e.cache from disk called.append(age) name, args, kwargs = sendmail.mock_calls[-1] text = args[2] self.assertNotIn(" 1 days.", text) self.assertSequenceEqual(called, expected) def test_smtp_repetition(self): """Confirm that emails are sent at appropriate intervals.""" # Emails were sent when daysold reached these values: self.smtp_repetition( valid=False, expected=[1, 3, 7, 15, 31, 61, 91, 121, 151, 181] ) self.smtp_repetition( valid=True, expected=[5, 7, 11, 19, 35, 65, 95, 125, 155, 185] ) self.smtp_repetition( valid=[False, False, True], expected=[1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185], ) self.smtp_repetition( valid=[False, False, True, False, True], expected=[1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185], ) class ET(TestBase): """ Test sending mail through a mocked SMTP server """ @classmethod def setUpClass(cls): cls.smtpd = FakeSMTPServer("localhost", 1337) cls.smtpd.run() @classmethod def tearDownClass(cls): cls.smtpd.close() def setUp(self): super().setUp() # disable ADT, not relevant for us for line in fileinput.input(self.britney_conf, inplace=True): if line.startswith("ADT_ENABLE"): print("ADT_ENABLE = no") elif line.startswith("MINDAYS_EMERGENCY"): print("MINDAYS_EMERGENCY = 10") elif not line.startswith("ADT_AMQP") and not line.startswith( "ADT_SWIFT_URL" ): sys.stdout.write(line) # and set up a fake smtpd with open(self.britney_conf, "a") as f: f.write("EMAIL_HOST = localhost:1337") self.age_file = os.path.join(self.data.dirs[False], "Dates") self.urgency_file = os.path.join(self.data.dirs[False], "Urgency") self.email_cache_file = os.path.join(self.data.dirs[True], "EmailCache") self.sourceppa_cache = {} self.email_cache = {} self.data.add("libc6", False) def do_test(self, unstable_add, expect_emails): """Run britney with some unstable packages and verify excuses. unstable_add is a list of (binpkgname, field_dict, daysold, emails) expect_emails is a list that is checked against the emails sent during this do_test run. Return (output, excuses_dict, excuses_html, emails). """ ET.smtpd.emails.clear() for (pkg, fields, daysold, emails) in unstable_add: self.data.add(pkg, True, fields, True, None) self.sourceppa_cache.setdefault(pkg, {}) if fields["Version"] not in self.sourceppa_cache[pkg]: self.sourceppa_cache[pkg][fields["Version"]] = "" with open(self.age_file, "w") as f: import time do = time.time() - (60 * 60 * 24 * daysold) f.write("%s %s %d" % (pkg, fields["Version"], do)) with open(self.email_cache_file, "w") as f: d = defaultdict(dict) d[pkg][fields["Version"]] = (emails, 0) f.write(json.dumps(d)) # Set up sourceppa cache for testing sourceppa_path = os.path.join(self.data.dirs[True], "SourcePPA") with open(sourceppa_path, "w", encoding="utf-8") as sourceppa: sourceppa.write(json.dumps(self.sourceppa_cache)) (excuses_yaml, excuses_html, out) = self.run_britney() # convert excuses to source indexed dict excuses_dict = {} for s in yaml.safe_load(excuses_yaml)["sources"]: excuses_dict[s["source"]] = s if "SHOW_EXCUSES" in os.environ: print("------- excuses -----") pprint.pprint(excuses_dict, width=200) if "SHOW_HTML" in os.environ: print("------- excuses.html -----\n%s\n" % excuses_html) if "SHOW_OUTPUT" in os.environ: print("------- output -----\n%s\n" % out) self.assertNotIn("FIXME", out) # check all the emails that we asked for are there for email in expect_emails: self.assertIn(email, ET.smtpd.get_emails()) self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails)) return (out, excuses_dict, excuses_html, ET.smtpd.emails) def test_email_sent(self): """Test that an email is sent through the SMTP server""" pkg = ( "libc6", {"Version": "2", "Depends": "notavailable (>= 2)"}, 6, ["foo@bar.com"], ) self.do_test([pkg], ["foo@bar.com"]) def test_email_not_sent_block_all_source(self): """Test that an email is not sent if the package is blocked by a block-all source hint""" self.create_hint("freeze", "block-all source") pkg = ("libc6", {"Version": "2"}, 6, ["foo@bar.com"]) # daysold self.do_test([pkg], []) def test_email_not_sent_blocked(self): """Test that an email is not sent if the package is blocked by a block hint""" self.create_hint("freeze", "block libc6") pkg = ("libc6", {"Version": "2"}, 6, ["foo@bar.com"]) # daysold self.do_test([pkg], []) def test_email_sent_unblocked(self): """Test that an email is sent if the package is unblocked""" self.create_hint("freeze", "block libc6") self.create_hint("freeze-exception", "unblock libc6/2") pkg = ( "libc6", {"Version": "2"}, 6, # daysold ["foo@bar.com"], ) self.do_test([pkg], ["foo@bar.com"]) def test_email_not_sent_rejected_temporarily(self): """Test that an email is not sent if the package is REJECTED_TEMPORARILY""" with open(self.urgency_file, "w") as f: # we specified in setUp() that emergency has a 10 day delay, and # age rejections are REJECTED_TEMPORARILY f.write("libc6 2 emergency") pkg = ( "libc6", {"Version": "2"}, 6, # daysold ["foo@bar.com"], ) self.do_test([pkg], []) if __name__ == "__main__": unittest.main()