email: Add tests that send email through a mocked SMTP server

I want to fix two bugs in interactions between other parts of britney
and the email policy. It's not currently easy to do so because we just
run the policy itself manually by creating some fake excuses.

Steal part of the machinery from the autopkgtest tests, and run a few
tests through britney completely. Use a fake SMTP server to record which
emails we sent.

(The port is hardcoded - that might not be so smart.)
master
Iain Lane 7 years ago
parent 63573fa24a
commit 8350694348

@ -92,6 +92,7 @@ class EmailPolicy(BasePolicy, Rest):
# self.cache contains self.emails_by_pkg from previous run
self.cache = {}
self.dry_run = dry_run
self.email_host = getattr(self.options, 'email_host', 'localhost')
def initialise(self, britney):
"""Load cached source ppa data"""
@ -192,7 +193,7 @@ class EmailPolicy(BasePolicy, Rest):
try:
self.log("%s/%s stuck for %d days, emailing %s" %
(source_name, version, age, recipients))
server = smtplib.SMTP('localhost')
server = smtplib.SMTP(self.email_host)
server.sendmail('noreply@canonical.com', emails, msg)
server.quit()
sent_age = age

@ -0,0 +1,45 @@
#!/usr/bin/python3
# (C) 2017 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from collections import defaultdict
import asyncore
import smtpd
import threading
class FakeSMTPServer(smtpd.SMTPServer):
"""A fake smtp server"""
def __init__(self, host, port):
# ((localhost, port), remoteaddr
# remoteaddr is an address to relay to, which isn't relevant for us
super().__init__((host, port), None, decode_data=False)
# to -> (from, data)
self.emails = defaultdict(list)
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
#print('received email: %s, %s, %s' % (mailfrom, rcpttos, data))
for rcpt in rcpttos:
self.emails[rcpt].append(data)
pass
def get_emails(self):
'''Get a list of the people that were emailed'''
return list(self.emails.keys())
def run(self):
self.thread = threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} )
self.thread.start()
# support standalone running
if __name__ == "__main__":
smtp_server = FakeSMTPServer('localhost', 1337)
smtp_server.run()

@ -6,9 +6,15 @@
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from collections import defaultdict
import fileinput
import json
import os
import pprint
import sys
import unittest
import yaml
from unittest.mock import DEFAULT, patch, call
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@ -18,7 +24,8 @@ from britney2.policies.policy import PolicyVerdict
from britney2.policies.email import EmailPolicy, person_chooser, address_chooser
from tests.test_sourceppa import FakeOptions
from tests import TestBase
from tests.mock_smtpd import FakeSMTPServer
# Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835
@ -115,6 +122,8 @@ class FakeSourceData:
class FakeExcuse:
is_valid = True
daysold = 0
reason = []
current_policy_verdict = PolicyVerdict.PASS
class T(unittest.TestCase):
@ -237,5 +246,106 @@ class T(unittest.TestCase):
])
class ET(TestBase):
''' Test sending mail through a mocked SMTP server '''
@classmethod
def setUpClass(cls):
cls.smtpd = FakeSMTPServer('localhost', 1337)
cls.smtpd.run()
@classmethod
def tearDownClass(cls):
cls.smtpd.close()
def setUp(self):
super().setUp()
# disable ADT, not relevant for us
for line in fileinput.input(self.britney_conf, inplace=True):
if line.startswith('ADT_ENABLE'):
print('ADT_ENABLE = no')
elif line.startswith('MINDAYS_EMERGENCY'):
print('MINDAYS_EMERGENCY = 10')
elif not line.startswith('ADT_AMQP') and not line.startswith('ADT_SWIFT_URL'):
sys.stdout.write(line)
# and set up a fake smtpd
with open(self.britney_conf, 'a') as f:
f.write('EMAIL_HOST = localhost:1337')
self.sourceppa_cache = {}
self.email_cache = {}
self.data.add('libc6', False)
def do_test(self, unstable_add, expect_emails):
'''Run britney with some unstable packages and verify excuses.
unstable_add is a list of (binpkgname, field_dict, daysold, emails)
expect_emails is a list that is checked against the emails sent during
this do_test run.
Return (output, excuses_dict, excuses_html, emails).
'''
ET.smtpd.emails.clear()
age_file = os.path.join(self.data.path,
'data',
'series',
'Dates')
email_cache_file = os.path.join(self.data.path,
'data',
'series-proposed',
'EmailCache')
for (pkg, fields, daysold, emails) in unstable_add:
self.data.add(pkg, True, fields, True, None)
self.sourceppa_cache.setdefault(pkg, {})
if fields['Version'] not in self.sourceppa_cache[pkg]:
self.sourceppa_cache[pkg][fields['Version']] = ''
with open(age_file, 'w') as f:
import time
do = time.time() - (60 * 60 * 24 * daysold)
f.write('%s %s %d' % (pkg, fields['Version'], do))
with open(email_cache_file, 'w') as f:
d = defaultdict(dict)
d[pkg][fields['Version']] = (emails, 0)
f.write(json.dumps(d))
# Set up sourceppa cache for testing
sourceppa_path = os.path.join(self.data.dirs[True], 'SourcePPA')
with open(sourceppa_path, 'w', encoding='utf-8') as sourceppa:
sourceppa.write(json.dumps(self.sourceppa_cache))
(excuses_yaml, excuses_html, out) = self.run_britney()
# convert excuses to source indexed dict
excuses_dict = {}
for s in yaml.load(excuses_yaml)['sources']:
excuses_dict[s['source']] = s
if 'SHOW_EXCUSES' in os.environ:
print('------- excuses -----')
pprint.pprint(excuses_dict, width=200)
if 'SHOW_HTML' in os.environ:
print('------- excuses.html -----\n%s\n' % excuses_html)
if 'SHOW_OUTPUT' in os.environ:
print('------- output -----\n%s\n' % out)
self.assertNotIn('FIXME', out)
# check all the emails that we asked for are there
for email in expect_emails:
self.assertIn(email, ET.smtpd.get_emails())
self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails))
return (out, excuses_dict, excuses_html, ET.smtpd.emails)
def test_email_sent(self):
'''Test that an email is sent through the SMTP server'''
pkg = ('libc6', {'Version': '2',
'Depends': 'notavailable (>= 2)'},
6,
['foo@bar.com'])
self.do_test([pkg], ["foo@bar.com"])
if __name__ == '__main__':
unittest.main()

Loading…
Cancel
Save