diff --git a/britney2/policies/email.py b/britney2/policies/email.py index c488e58..9e11fd9 100644 --- a/britney2/policies/email.py +++ b/britney2/policies/email.py @@ -92,6 +92,7 @@ class EmailPolicy(BasePolicy, Rest): # self.cache contains self.emails_by_pkg from previous run self.cache = {} self.dry_run = dry_run + self.email_host = getattr(self.options, 'email_host', 'localhost') def initialise(self, britney): """Load cached source ppa data""" @@ -192,7 +193,7 @@ class EmailPolicy(BasePolicy, Rest): try: self.log("%s/%s stuck for %d days, emailing %s" % (source_name, version, age, recipients)) - server = smtplib.SMTP('localhost') + server = smtplib.SMTP(self.email_host) server.sendmail('noreply@canonical.com', emails, msg) server.quit() sent_age = age diff --git a/tests/mock_smtpd.py b/tests/mock_smtpd.py new file mode 100644 index 0000000..46e046a --- /dev/null +++ b/tests/mock_smtpd.py @@ -0,0 +1,45 @@ +#!/usr/bin/python3 + +# (C) 2017 Canonical Ltd. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +from collections import defaultdict + +import asyncore +import smtpd +import threading + +class FakeSMTPServer(smtpd.SMTPServer): + """A fake smtp server""" + + def __init__(self, host, port): + # ((localhost, port), remoteaddr + # remoteaddr is an address to relay to, which isn't relevant for us + super().__init__((host, port), None, decode_data=False) + + # to -> (from, data) + self.emails = defaultdict(list) + + def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): + #print('received email: %s, %s, %s' % (mailfrom, rcpttos, data)) + for rcpt in rcpttos: + self.emails[rcpt].append(data) + pass + + def get_emails(self): + '''Get a list of the people that were emailed''' + return list(self.emails.keys()) + + def run(self): + self.thread = threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} ) + self.thread.start() + + +# support standalone running +if __name__ == "__main__": + smtp_server = FakeSMTPServer('localhost', 1337) + smtp_server.run() diff --git a/tests/test_email.py b/tests/test_email.py index fde6405..6618e64 100755 --- a/tests/test_email.py +++ b/tests/test_email.py @@ -6,9 +6,15 @@ # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. +from collections import defaultdict + +import fileinput +import json import os +import pprint import sys import unittest +import yaml from unittest.mock import DEFAULT, patch, call PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -18,7 +24,8 @@ from britney2.policies.policy import PolicyVerdict from britney2.policies.email import EmailPolicy, person_chooser, address_chooser from tests.test_sourceppa import FakeOptions - +from tests import TestBase +from tests.mock_smtpd import FakeSMTPServer # Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1 # https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835 @@ -115,6 +122,8 @@ class FakeSourceData: class FakeExcuse: is_valid = True daysold = 0 + reason = [] + current_policy_verdict = PolicyVerdict.PASS class T(unittest.TestCase): @@ -237,5 +246,106 @@ class T(unittest.TestCase): ]) +class ET(TestBase): + ''' Test sending mail through a mocked SMTP server ''' + @classmethod + def setUpClass(cls): + cls.smtpd = FakeSMTPServer('localhost', 1337) + cls.smtpd.run() + + @classmethod + def tearDownClass(cls): + cls.smtpd.close() + + def setUp(self): + super().setUp() + # disable ADT, not relevant for us + for line in fileinput.input(self.britney_conf, inplace=True): + if line.startswith('ADT_ENABLE'): + print('ADT_ENABLE = no') + elif line.startswith('MINDAYS_EMERGENCY'): + print('MINDAYS_EMERGENCY = 10') + elif not line.startswith('ADT_AMQP') and not line.startswith('ADT_SWIFT_URL'): + sys.stdout.write(line) + # and set up a fake smtpd + with open(self.britney_conf, 'a') as f: + f.write('EMAIL_HOST = localhost:1337') + self.sourceppa_cache = {} + self.email_cache = {} + + self.data.add('libc6', False) + + def do_test(self, unstable_add, expect_emails): + '''Run britney with some unstable packages and verify excuses. + + unstable_add is a list of (binpkgname, field_dict, daysold, emails) + + expect_emails is a list that is checked against the emails sent during + this do_test run. + + Return (output, excuses_dict, excuses_html, emails). + ''' + ET.smtpd.emails.clear() + age_file = os.path.join(self.data.path, + 'data', + 'series', + 'Dates') + email_cache_file = os.path.join(self.data.path, + 'data', + 'series-proposed', + 'EmailCache') + for (pkg, fields, daysold, emails) in unstable_add: + self.data.add(pkg, True, fields, True, None) + self.sourceppa_cache.setdefault(pkg, {}) + if fields['Version'] not in self.sourceppa_cache[pkg]: + self.sourceppa_cache[pkg][fields['Version']] = '' + with open(age_file, 'w') as f: + import time + do = time.time() - (60 * 60 * 24 * daysold) + f.write('%s %s %d' % (pkg, fields['Version'], do)) + + with open(email_cache_file, 'w') as f: + d = defaultdict(dict) + d[pkg][fields['Version']] = (emails, 0) + f.write(json.dumps(d)) + + # Set up sourceppa cache for testing + sourceppa_path = os.path.join(self.data.dirs[True], 'SourcePPA') + with open(sourceppa_path, 'w', encoding='utf-8') as sourceppa: + sourceppa.write(json.dumps(self.sourceppa_cache)) + + (excuses_yaml, excuses_html, out) = self.run_britney() + + # convert excuses to source indexed dict + excuses_dict = {} + for s in yaml.load(excuses_yaml)['sources']: + excuses_dict[s['source']] = s + + if 'SHOW_EXCUSES' in os.environ: + print('------- excuses -----') + pprint.pprint(excuses_dict, width=200) + if 'SHOW_HTML' in os.environ: + print('------- excuses.html -----\n%s\n' % excuses_html) + if 'SHOW_OUTPUT' in os.environ: + print('------- output -----\n%s\n' % out) + + self.assertNotIn('FIXME', out) + # check all the emails that we asked for are there + for email in expect_emails: + self.assertIn(email, ET.smtpd.get_emails()) + self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails)) + + return (out, excuses_dict, excuses_html, ET.smtpd.emails) + + def test_email_sent(self): + '''Test that an email is sent through the SMTP server''' + pkg = ('libc6', {'Version': '2', + 'Depends': 'notavailable (>= 2)'}, + 6, + ['foo@bar.com']) + + self.do_test([pkg], ["foo@bar.com"]) + + if __name__ == '__main__': unittest.main()