From 83506943488aa24e7953d5ebaa968053f4a3dfea Mon Sep 17 00:00:00 2001 From: Iain Lane Date: Wed, 22 Mar 2017 16:21:11 +0000 Subject: [PATCH] email: Add tests that send email through a mocked SMTP server I want to fix two bugs in interactions between other parts of britney and the email policy. It's not currently easy to do so because we just run the policy itself manually by creating some fake excuses. Steal part of the machinery from the autopkgtest tests, and run a few tests through britney completely. Use a fake SMTP server to record which emails we sent. (The port is hardcoded - that might not be so smart.) --- britney2/policies/email.py | 3 +- tests/mock_smtpd.py | 45 +++++++++++++++ tests/test_email.py | 112 ++++++++++++++++++++++++++++++++++++- 3 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 tests/mock_smtpd.py diff --git a/britney2/policies/email.py b/britney2/policies/email.py index c488e58..9e11fd9 100644 --- a/britney2/policies/email.py +++ b/britney2/policies/email.py @@ -92,6 +92,7 @@ class EmailPolicy(BasePolicy, Rest): # self.cache contains self.emails_by_pkg from previous run self.cache = {} self.dry_run = dry_run + self.email_host = getattr(self.options, 'email_host', 'localhost') def initialise(self, britney): """Load cached source ppa data""" @@ -192,7 +193,7 @@ class EmailPolicy(BasePolicy, Rest): try: self.log("%s/%s stuck for %d days, emailing %s" % (source_name, version, age, recipients)) - server = smtplib.SMTP('localhost') + server = smtplib.SMTP(self.email_host) server.sendmail('noreply@canonical.com', emails, msg) server.quit() sent_age = age diff --git a/tests/mock_smtpd.py b/tests/mock_smtpd.py new file mode 100644 index 0000000..46e046a --- /dev/null +++ b/tests/mock_smtpd.py @@ -0,0 +1,45 @@ +#!/usr/bin/python3 + +# (C) 2017 Canonical Ltd. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +from collections import defaultdict + +import asyncore +import smtpd +import threading + +class FakeSMTPServer(smtpd.SMTPServer): + """A fake smtp server""" + + def __init__(self, host, port): + # ((localhost, port), remoteaddr + # remoteaddr is an address to relay to, which isn't relevant for us + super().__init__((host, port), None, decode_data=False) + + # to -> (from, data) + self.emails = defaultdict(list) + + def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): + #print('received email: %s, %s, %s' % (mailfrom, rcpttos, data)) + for rcpt in rcpttos: + self.emails[rcpt].append(data) + pass + + def get_emails(self): + '''Get a list of the people that were emailed''' + return list(self.emails.keys()) + + def run(self): + self.thread = threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} ) + self.thread.start() + + +# support standalone running +if __name__ == "__main__": + smtp_server = FakeSMTPServer('localhost', 1337) + smtp_server.run() diff --git a/tests/test_email.py b/tests/test_email.py index fde6405..6618e64 100755 --- a/tests/test_email.py +++ b/tests/test_email.py @@ -6,9 +6,15 @@ # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. +from collections import defaultdict + +import fileinput +import json import os +import pprint import sys import unittest +import yaml from unittest.mock import DEFAULT, patch, call PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -18,7 +24,8 @@ from britney2.policies.policy import PolicyVerdict from britney2.policies.email import EmailPolicy, person_chooser, address_chooser from tests.test_sourceppa import FakeOptions - +from tests import TestBase +from tests.mock_smtpd import FakeSMTPServer # Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835 @@ -115,6 +122,8 @@ class FakeSourceData: class FakeExcuse: is_valid = True daysold = 0 + reason = [] + current_policy_verdict = PolicyVerdict.PASS class T(unittest.TestCase): @@ -237,5 +246,106 @@ class T(unittest.TestCase): ]) +class ET(TestBase): + ''' Test sending mail through a mocked SMTP server ''' + @classmethod + def setUpClass(cls): + cls.smtpd = FakeSMTPServer('localhost', 1337) + cls.smtpd.run() + + @classmethod + def tearDownClass(cls): + cls.smtpd.close() + + def setUp(self): + super().setUp() + # disable ADT, not relevant for us + for line in fileinput.input(self.britney_conf, inplace=True): + if line.startswith('ADT_ENABLE'): + print('ADT_ENABLE = no') + elif line.startswith('MINDAYS_EMERGENCY'): + print('MINDAYS_EMERGENCY = 10') + elif not line.startswith('ADT_AMQP') and not line.startswith('ADT_SWIFT_URL'): + sys.stdout.write(line) + # and set up a fake smtpd + with open(self.britney_conf, 'a') as f: + f.write('EMAIL_HOST = localhost:1337') + self.sourceppa_cache = {} + self.email_cache = {} + + self.data.add('libc6', False) + + def do_test(self, unstable_add, expect_emails): + '''Run britney with some unstable packages and verify excuses. + + unstable_add is a list of (binpkgname, field_dict, daysold, emails) + + expect_emails is a list that is checked against the emails sent during + this do_test run. + + Return (output, excuses_dict, excuses_html, emails). + ''' + ET.smtpd.emails.clear() + age_file = os.path.join(self.data.path, + 'data', + 'series', + 'Dates') + email_cache_file = os.path.join(self.data.path, + 'data', + 'series-proposed', + 'EmailCache') + for (pkg, fields, daysold, emails) in unstable_add: + self.data.add(pkg, True, fields, True, None) + self.sourceppa_cache.setdefault(pkg, {}) + if fields['Version'] not in self.sourceppa_cache[pkg]: + self.sourceppa_cache[pkg][fields['Version']] = '' + with open(age_file, 'w') as f: + import time + do = time.time() - (60 * 60 * 24 * daysold) + f.write('%s %s %d' % (pkg, fields['Version'], do)) + + with open(email_cache_file, 'w') as f: + d = defaultdict(dict) + d[pkg][fields['Version']] = (emails, 0) + f.write(json.dumps(d)) + + # Set up sourceppa cache for testing + sourceppa_path = os.path.join(self.data.dirs[True], 'SourcePPA') + with open(sourceppa_path, 'w', encoding='utf-8') as sourceppa: + sourceppa.write(json.dumps(self.sourceppa_cache)) + + (excuses_yaml, excuses_html, out) = self.run_britney() + + # convert excuses to source indexed dict + excuses_dict = {} + for s in yaml.load(excuses_yaml)['sources']: + excuses_dict[s['source']] = s + + if 'SHOW_EXCUSES' in os.environ: + print('------- excuses -----') + pprint.pprint(excuses_dict, width=200) + if 'SHOW_HTML' in os.environ: + print('------- excuses.html -----\n%s\n' % excuses_html) + if 'SHOW_OUTPUT' in os.environ: + print('------- output -----\n%s\n' % out) + + self.assertNotIn('FIXME', out) + # check all the emails that we asked for are there + for email in expect_emails: + self.assertIn(email, ET.smtpd.get_emails()) + self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails)) + + return (out, excuses_dict, excuses_html, ET.smtpd.emails) + + def test_email_sent(self): + '''Test that an email is sent through the SMTP server''' + pkg = ('libc6', {'Version': '2', + 'Depends': 'notavailable (>= 2)'}, + 6, + ['foo@bar.com']) + + self.do_test([pkg], ["foo@bar.com"]) + + if __name__ == '__main__': unittest.main()