Email Policy, send emails when packages are rejected.

ubuntu/rebased
Robert Bruce Park 8 years ago committed by Iain Lane
parent 82a5e1f5a8
commit 84892de81a
No known key found for this signature in database
GPG Key ID: E352D5C51C5041D4

@ -217,6 +217,7 @@ from britney2.policies.policy import (AgePolicy,
) )
from britney2.policies.autopkgtest import AutopkgtestPolicy from britney2.policies.autopkgtest import AutopkgtestPolicy
from britney2.policies.sourceppa import SourcePPAPolicy from britney2.policies.sourceppa import SourcePPAPolicy
from britney2.policies.email import EmailPolicy
from britney2.utils import (log_and_format_old_libraries, from britney2.utils import (log_and_format_old_libraries,
read_nuninst, write_nuninst, write_heidi, read_nuninst, write_nuninst, write_heidi,
format_and_log_uninst, newly_uninst, format_and_log_uninst, newly_uninst,
@ -525,6 +526,11 @@ class Britney(object):
self._policy_engine.add_policy(BuiltOnBuilddPolicy(self.options, self.suite_info)) self._policy_engine.add_policy(BuiltOnBuilddPolicy(self.options, self.suite_info))
self._policy_engine.add_policy(LPBlockBugPolicy(self.options, self.suite_info)) self._policy_engine.add_policy(LPBlockBugPolicy(self.options, self.suite_info))
self._policy_engine.add_policy(SourcePPAPolicy(self.options, self.suite_info)) self._policy_engine.add_policy(SourcePPAPolicy(self.options, self.suite_info))
add_email_policy = getattr(self.options, 'email_enable', 'no')
if add_email_policy in ('yes', 'dry-run'):
self._policy_engine.add_policy(EmailPolicy(self.options,
self.suite_info,
dry_run=add_email_policy == 'dry-run'))
@property @property
def hints(self): def hints(self):

@ -219,6 +219,19 @@ class Excuse(object):
value = PolicyVerdict.PASS_HINTED value = PolicyVerdict.PASS_HINTED
self._policy_verdict = value self._policy_verdict = value
@property
def tentative_policy_verdict(self):
"""If we've not finished running all of the policies, we can find out
what all of the policies that have run so far said."""
all_verdicts = {
info["verdict"]
if isinstance(info["verdict"], PolicyVerdict)
else PolicyVerdict[info["verdict"]]
for info in self.policy_info.values()
}
return max(all_verdicts)
def set_vers(self, tver, uver): def set_vers(self, tver, uver):
"""Set the versions of the item from target and source suite""" """Set the versions of the item from target and source suite"""
if tver and uver: if tver and uver:

@ -0,0 +1,315 @@
import os
import re
import json
import math
import socket
import smtplib
from urllib.error import HTTPError
from urllib.parse import unquote
from collections import defaultdict
from britney2 import SuiteClass
from britney2.policies.rest import Rest
from britney2.policies.policy import BasePolicy, PolicyVerdict
# Recurring emails should never be more than this many days apart
MAX_INTERVAL = 30
API_PREFIX = "https://api.launchpad.net/1.0/"
USER = API_PREFIX + "~"
# Don't send emails to these bots
BOTS = {
USER + "ci-train-bot",
USER + "bileto-bot",
USER + "ubuntu-archive-robot",
USER + "katie",
}
MESSAGE = """From: Ubuntu Release Team <noreply@canonical.com>
To: {recipients}
X-Proposed-Migration: notice
Subject: [proposed-migration] {source_name} {version} stuck in {series}-proposed for {age} day{plural}.
Hi,
{source_name} {version} needs attention.
It has been stuck in {series}-proposed for {age} day{plural}.
You either sponsored or uploaded this package, please investigate why it hasn't been approved for migration.
http://people.canonical.com/~ubuntu-archive/proposed-migration/{series}/update_excuses.html#{source_name}
https://wiki.ubuntu.com/ProposedMigration
If you have any questions about this email, please ask them in #ubuntu-release channel on Freenode IRC.
Regards, Ubuntu Release Team.
"""
def person_chooser(source):
"""Assign blame for the current source package."""
people = (
{
source["package_signer_link"],
source["sponsor_link"],
source["creator_link"],
}
- {None}
- BOTS
)
# some bots (e.g. bileto) generate uploads that are otherwise manual. We
# want to email the people that the bot was acting on behalf of.
bot = source["package_signer_link"] in BOTS
# direct uploads
regular = not source["creator_link"] and not source["sponsor_link"]
if bot or regular:
people.add(source["package_creator_link"])
return people
def address_chooser(addresses):
"""Prefer @ubuntu and @canonical addresses."""
first = ""
canonical = ""
for address in addresses:
if address.endswith("@ubuntu.com"):
return address
if address.endswith("@canonical.com"):
canonical = address
if not first:
first = address
return canonical or first
class EmailPolicy(BasePolicy, Rest):
"""Send an email when a package has been rejected."""
def __init__(self, options, suite_info, dry_run=False):
super().__init__(
"email", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
)
self.filename = os.path.join(options.unstable, "EmailCache")
# Maps lp username -> email address
self.addresses = {}
# Dict of dicts; maps pkg name -> pkg version -> boolean
self.emails_by_pkg = defaultdict(dict)
# self.cache contains self.emails_by_pkg from previous run
self.cache = {}
self.dry_run = dry_run
self.email_host = getattr(self.options, "email_host", "localhost")
self.logger.info(
"EmailPolicy: will send emails to: %s", self.email_host
)
def initialise(self, britney):
"""Load cached source ppa data"""
super().initialise(britney)
if os.path.exists(self.filename):
with open(self.filename, encoding="utf-8") as data:
self.cache = json.load(data)
self.logger.info("Loaded cached email data from %s" % self.filename)
tmp = self.filename + ".new"
if os.path.exists(tmp):
# if we find a record on disk of emails sent from an incomplete
# britney run, merge them in now.
with open(tmp, encoding="utf-8") as data:
self.cache.update(json.load(data))
self._save_progress(self.cache)
self.save_state()
def _scrape_gpg_emails(self, person):
"""Find email addresses from one person's GPG keys."""
if person in self.addresses:
return self.addresses[person]
addresses = []
try:
gpg = self.query_lp_rest_api(person + "/gpg_keys", {})
for key in gpg["entries"]:
details = self.query_rest_api(
"http://keyserver.ubuntu.com/pks/lookup",
{
"op": "index",
"search": "0x" + key["fingerprint"],
"exact": "on",
"options": "mr",
},
)
for line in details.splitlines():
parts = line.split(":")
if parts[0] == "info":
if int(parts[1]) != 1 or int(parts[2]) > 1:
break
if parts[0] == "uid":
flags = parts[4]
if "e" in flags or "r" in flags:
continue
uid = unquote(parts[1])
match = re.match(r"^.*<(.+@.+)>$", uid)
if match:
addresses.append(match.group(1))
address = self.addresses[person] = address_chooser(addresses)
return address
except HTTPError as e:
if e.code != 410: # suspended user
raise
self.logger.info(
"Ignoring person %s as suspended in Launchpad" % person
)
return None
def scrape_gpg_emails(self, people):
"""Find email addresses from GPG keys."""
emails = [self._scrape_gpg_emails(person) for person in (people or [])]
return [email for email in emails if email is not None]
def lp_get_emails(self, pkg, version):
"""Ask LP who uploaded this package."""
data = self.query_lp_rest_api(
"%s/+archive/primary" % self.options.distribution,
{
"ws.op": "getPublishedSources",
"distro_series": "/%s/%s"
% (self.options.distribution, self.options.series),
"exact_match": "true",
"order_by_date": "true",
"pocket": "Proposed",
"source_name": pkg,
"version": version,
},
)
try:
source = next(reversed(data["entries"]))
# IndexError means no packages in -proposed matched this name/version,
# which is expected to happen when bileto runs britney.
except StopIteration:
self.logger.info(
"Email getPublishedSources IndexError (%s %s)" % (pkg, version)
)
return []
return self.scrape_gpg_emails(person_chooser(source))
def apply_src_policy_impl(
self, email_info, item, source_data_tdist, source_data_srcdist, excuse
):
"""Send email if package is rejected."""
source_name = item.package
max_age = 5 if excuse.is_valid else 1
series = self.options.series
version = source_data_srcdist.version
age = int(excuse.daysold) or 0
plural = "" if age == 1 else "s"
# an item is stuck if it's
# - old enough
# - not blocked
# - not temporarily rejected (e.g. by the autopkgtest policy when tests
# are still running)
stuck = (
age >= max_age
and "block" not in excuse.reason
and excuse.tentative_policy_verdict
!= PolicyVerdict.REJECTED_TEMPORARILY
)
if self.dry_run:
self.logger.info(
"[email dry run] Considering: %s/%s: %s"
% (source_name, version, "stuck" if stuck else "not stuck")
)
if not stuck:
return PolicyVerdict.PASS
cached = self.cache.get(source_name, {}).get(version)
try:
emails, last_sent = cached
# migration of older data
last_sent = int(last_sent)
# Find out whether we are due to send another email by calculating
# the most recent age at which we should have sent one. A
# sequence of doubling intervals (0 + 1 = 1, 1 + 2 = 3, 3 + 4 = 7)
# is equivalent to 2^n-1, or 2^n + (max_age - 1) - 1.
# 2^(floor(log2(age))) straightforwardly calculates the most
# recent age at which we wanted to send an email.
last_due = int(
math.pow(2, int(math.log(age + 2 - max_age, 2))) + max_age - 2
)
# Don't let the interval double without bounds.
if last_due - max_age >= MAX_INTERVAL:
last_due = (
int((age - max_age - MAX_INTERVAL) / MAX_INTERVAL)
* MAX_INTERVAL
+ max_age
+ MAX_INTERVAL
)
# And don't send emails before we've reached the minimum age
# threshold.
if last_due < max_age:
last_due = max_age
except TypeError:
# This exception happens when source_name, version never seen before
emails = []
last_sent = 0
last_due = max_age
if self.dry_run:
self.logger.info(
"[email dry run] Age %d >= threshold %d: would email: %s"
% (age, max_age, self.lp_get_emails(source_name, version))
)
# don't update the cache file in dry run mode; we'll see all output each time
return PolicyVerdict.PASS
if last_sent < last_due:
if not emails:
emails = self.lp_get_emails(source_name, version)
if emails:
recipients = ", ".join(emails)
msg = MESSAGE.format(**locals())
try:
self.logger.info(
"%s/%s stuck for %d days (email last sent at %d days old, "
"threshold for sending %d days), emailing %s"
% (
source_name,
version,
age,
last_sent,
last_due,
recipients,
)
)
server = smtplib.SMTP(self.email_host)
server.sendmail("noreply@canonical.com", emails, msg)
server.quit()
# record the age at which the mail should have been sent
last_sent = last_due
except socket.error as err:
self.logger.error(
"Failed to send mail! Is SMTP server running?"
)
self.logger.error(err)
self.emails_by_pkg[source_name][version] = (emails, last_sent)
self._save_progress(self.emails_by_pkg)
return PolicyVerdict.PASS
def _save_progress(self, my_data):
"""Checkpoint after each sent mail"""
tmp = self.filename + ".new"
with open(tmp, "w", encoding="utf-8") as data:
json.dump(my_data, data)
return tmp
def save_state(self, britney=None):
"""Save email notification status of all pending packages"""
if not self.dry_run:
try:
os.rename(self.filename + ".new", self.filename)
# if we haven't written any cache, don't clobber the old one
except FileNotFoundError:
pass
if britney:
self.logger.info("Wrote email data to %s" % self.filename)

@ -402,6 +402,8 @@ ADT_SUCCESS_BOUNTY =
ADT_REGRESSION_PENALTY = ADT_REGRESSION_PENALTY =
ADT_BASELINE = ADT_BASELINE =
ADT_RETRY_OLDER_THAN = ADT_RETRY_OLDER_THAN =
EMAIL_ENABLE = yes
''') ''')
assert os.path.exists(self.britney) assert os.path.exists(self.britney)

@ -0,0 +1,48 @@
#!/usr/bin/python3
# (C) 2017 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from collections import defaultdict
import asyncore
import smtpd
import threading
class FakeSMTPServer(smtpd.SMTPServer):
"""A fake smtp server"""
def __init__(self, host, port):
# ((localhost, port), remoteaddr
# remoteaddr is an address to relay to, which isn't relevant for us
super().__init__((host, port), None, decode_data=False)
# to -> (from, data)
self.emails = defaultdict(list)
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
# print('received email: %s, %s, %s' % (mailfrom, rcpttos, data))
for rcpt in rcpttos:
self.emails[rcpt].append(data)
pass
def get_emails(self):
"""Get a list of the people that were emailed"""
return list(self.emails.keys())
def run(self):
self.thread = threading.Thread(
target=asyncore.loop, kwargs={"timeout": 1}
)
self.thread.start()
# support standalone running
if __name__ == "__main__":
smtp_server = FakeSMTPServer("localhost", 1337)
smtp_server.run()

@ -65,6 +65,12 @@ class TestAutopkgtestBase(TestBase):
'newgreen': {'2': ''}, 'newgreen': {'2': ''},
} }
self.email_cache = {}
for pkg, vals in self.sourceppa_cache.items():
for version, empty in vals.items():
self.email_cache.setdefault(pkg, {})
self.email_cache[pkg][version] = True
self.email_cache = {} self.email_cache = {}
for pkg, vals in self.sourceppa_cache.items(): for pkg, vals in self.sourceppa_cache.items():
for version, empty in vals.items(): for version, empty in vals.items():

@ -0,0 +1,516 @@
#!/usr/bin/python3
# (C) 2017 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from collections import defaultdict
import fileinput
import json
import os
import pprint
import sys
import unittest
import yaml
from unittest.mock import DEFAULT, patch, call
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)
from britney2.policies.policy import PolicyVerdict
from britney2.policies.email import EmailPolicy, person_chooser, address_chooser
from tests.test_sourceppa import FakeOptions
from tests import TestBase
from tests.mock_smtpd import FakeSMTPServer
# Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835
UPLOAD = dict(
creator_link=None,
package_creator_link="https://api.launchpad.net/1.0/~zulcss",
package_signer_link="https://api.launchpad.net/1.0/~zulcss",
sponsor_link=None,
)
# Example of a sponsored upload: kerneloops 0.12+git20140509-2ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7013597
SPONSORED_UPLOAD = dict(
creator_link=None,
package_creator_link="https://api.launchpad.net/1.0/~smb",
package_signer_link="https://api.launchpad.net/1.0/~apw",
sponsor_link=None,
)
# Example of a bileto upload: autopilot 1.6.0+17.04.20170302-0ubuntu1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7525085
# (dobey clicked 'build' and sil2100 clicked 'publish')
BILETO = dict(
creator_link="https://api.launchpad.net/1.0/~sil2100",
package_creator_link="https://api.launchpad.net/1.0/~dobey",
package_signer_link="https://api.launchpad.net/1.0/~ci-train-bot",
sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot",
)
# Example of a non-sponsored copy: linux 4.10.0-11.13
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522481
# (the upload to the PPA was sponsored but the copy was done directly)
UNSPONSORED_COPY = dict(
creator_link="https://api.launchpad.net/1.0/~timg-tpi",
package_creator_link="https://api.launchpad.net/1.0/~sforshee",
package_signer_link="https://api.launchpad.net/1.0/~timg-tpi",
sponsor_link=None,
)
# Example of a sponsored copy: pagein 0.00.03-1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7533336
SPONSORED_COPY = dict(
creator_link="https://api.launchpad.net/1.0/~colin-king",
package_creator_link="https://api.launchpad.net/1.0/~colin-king",
package_signer_link=None,
sponsor_link="https://api.launchpad.net/1.0/~mapreri",
)
# Example of a manual debian sync: systemd 232-19
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522736
MANUAL_SYNC = dict(
creator_link="https://api.launchpad.net/1.0/~costamagnagianfranco",
package_creator_link="https://api.launchpad.net/1.0/~pkg-systemd-maintainers",
package_signer_link=None,
sponsor_link=None,
)
# Example of a sponsored manual debian sync: python-pymysql 0.7.9-2
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7487820
SPONSORED_MANUAL_SYNC = dict(
creator_link="https://api.launchpad.net/1.0/~lars-tangvald",
package_creator_link="https://api.launchpad.net/1.0/~openstack-1.0",
package_signer_link=None,
sponsor_link="https://api.launchpad.net/1.0/~racb",
)
# Example of an automatic debian sync: gem2deb 0.33.1
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7255529
AUTO_SYNC = dict(
creator_link="https://api.launchpad.net/1.0/~katie",
package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers",
package_signer_link=None,
sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot",
)
PROMOTED_AUTO_SYNC = [
dict(
creator_link=None,
package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers",
package_signer_link=None,
sponsor_link=None,
),
dict(
creator_link="https://api.launchpad.net/1.0/~katie",
package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers",
package_signer_link=None,
sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot",
),
]
# address lists
UBUNTU = ["personal@gmail.com", "ubuntu@ubuntu.com", "work@canonical.com"]
CANONICAL = ["personal@gmail.com", "work@canonical.com"]
COMMUNITY = ["personal@gmail.com", "other@gmail.com"]
def retvals(retvals):
"""Return different retvals on different calls of mock."""
def returner(*args, **kwargs):
return retvals.pop()
return returner
class FakeItem:
package = "chromium-browser"
class FakeSourceData:
version = "55.0"
FakeItem,
class FakeExcuse:
is_valid = True
daysold = 0
reason = []
tentative_policy_verdict = PolicyVerdict.PASS
class T(unittest.TestCase):
maxDiff = None
def test_person_chooser(self):
"""Find the correct person to blame for an upload."""
self.assertEqual(
person_chooser(UPLOAD), {"https://api.launchpad.net/1.0/~zulcss"}
)
self.assertEqual(
person_chooser(SPONSORED_UPLOAD),
{
"https://api.launchpad.net/1.0/~apw",
"https://api.launchpad.net/1.0/~smb",
},
)
self.assertEqual(
person_chooser(BILETO),
{
"https://api.launchpad.net/1.0/~dobey",
"https://api.launchpad.net/1.0/~sil2100",
},
)
self.assertEqual(
person_chooser(UNSPONSORED_COPY),
{"https://api.launchpad.net/1.0/~timg-tpi"},
)
self.assertEqual(
person_chooser(SPONSORED_COPY),
{
"https://api.launchpad.net/1.0/~colin-king",
"https://api.launchpad.net/1.0/~mapreri",
},
)
self.assertEqual(
person_chooser(MANUAL_SYNC),
{"https://api.launchpad.net/1.0/~costamagnagianfranco"},
)
self.assertSequenceEqual(
person_chooser(SPONSORED_MANUAL_SYNC),
{
"https://api.launchpad.net/1.0/~lars-tangvald",
"https://api.launchpad.net/1.0/~racb",
},
)
self.assertEqual(person_chooser(AUTO_SYNC), set())
def test_address_chooser(self):
"""Prioritize email addresses correctly."""
self.assertEqual(address_chooser(UBUNTU), "ubuntu@ubuntu.com")
self.assertEqual(address_chooser(CANONICAL), "work@canonical.com")
self.assertEqual(address_chooser(COMMUNITY), "personal@gmail.com")
@patch("britney2.policies.email.EmailPolicy.query_rest_api")
@patch("britney2.policies.email.EmailPolicy.query_lp_rest_api")
def test_email_promoted_package(self, lp, rest):
"""When a package has been promoted in proposed, we find the older SPPH
and use its details - in the case of an autosync to not email."""
lp.return_value = dict(entries=PROMOTED_AUTO_SYNC)
e = EmailPolicy(FakeOptions, None)
self.assertEqual(
e.lp_get_emails("openstack-doct-tools", "1.5.0-0ubuntu1"), []
)
self.assertSequenceEqual(
lp.mock_calls,
[
call(
"testbuntu/+archive/primary",
{
"distro_series": "/testbuntu/zazzy",
"exact_match": "true",
"order_by_date": "true",
"pocket": "Proposed",
"source_name": "openstack-doct-tools",
"version": "1.5.0-0ubuntu1",
"ws.op": "getPublishedSources",
},
)
],
)
self.assertSequenceEqual(rest.mock_calls, [])
@patch("britney2.policies.email.EmailPolicy.query_rest_api")
@patch("britney2.policies.email.EmailPolicy.query_lp_rest_api")
def test_email_scraping(self, lp, rest):
"""Poke correct REST APIs to find email addresses."""
lp.side_effect = retvals(
[
dict(entries=[dict(fingerprint="DEFACED_ED1F1CE")]),
dict(entries=[UPLOAD]),
]
)
rest.return_value = "uid:Defaced Edifice <ex@example.com>:12345::"
e = EmailPolicy(FakeOptions, None)
self.assertEqual(
e.lp_get_emails("openstack-doct-tools", "1.5.0-0ubuntu1"),
["ex@example.com"],
)
self.assertSequenceEqual(
lp.mock_calls,
[
call(
"testbuntu/+archive/primary",
{
"distro_series": "/testbuntu/zazzy",
"exact_match": "true",
"order_by_date": "true",
"pocket": "Proposed",
"source_name": "openstack-doct-tools",
"version": "1.5.0-0ubuntu1",
"ws.op": "getPublishedSources",
},
),
call("https://api.launchpad.net/1.0/~zulcss/gpg_keys", {}),
],
)
self.assertSequenceEqual(
rest.mock_calls,
[
call(
"http://keyserver.ubuntu.com/pks/lookup",
{
"exact": "on",
"op": "index",
"options": "mr",
"search": "0xDEFACED_ED1F1CE",
},
)
],
)
@patch("britney2.policies.email.EmailPolicy.lp_get_emails")
@patch("britney2.policies.email.smtplib")
def test_smtp_not_sent(self, smtp, lp):
"""Know when not to send any emails."""
lp.return_value = ["example@email.com"]
e = EmailPolicy(FakeOptions, None)
FakeExcuse.daysold = 0.002
e.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, FakeExcuse
)
FakeExcuse.daysold = 2.98
e.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, FakeExcuse
)
# Would email but no address found
FakeExcuse.daysold = 10.12
lp.return_value = []
e.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, FakeExcuse
)
self.assertEqual(smtp.mock_calls, [])
@patch("britney2.policies.email.EmailPolicy.lp_get_emails")
@patch("britney2.policies.email.smtplib")
def test_smtp_sent(self, smtp, lp):
"""Send emails correctly."""
lp.return_value = ["email@address.com"]
e = EmailPolicy(FakeOptions, None)
FakeExcuse.is_valid = False
FakeExcuse.daysold = 100
e.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, FakeExcuse
)
smtp.SMTP.assert_called_once_with("localhost")
@patch("britney2.policies.email.EmailPolicy.lp_get_emails")
@patch("britney2.policies.email.smtplib", autospec=True)
def smtp_repetition(self, smtp, lp, valid, expected):
"""Resend mails periodically, with decreasing frequency."""
if not isinstance(valid, list):
valid = [valid] * len(expected)
FakeExcuse.is_valid = valid
lp.return_value = ["email@address.com"]
sendmail = smtp.SMTP().sendmail
e = EmailPolicy(FakeOptions, None)
called = []
e.cache = {}
for hours in range(0, 5000):
previous = sendmail.call_count
age = hours / 24
FakeExcuse.daysold = age
try:
FakeExcuse.is_valid = valid[len(called)]
except IndexError:
# we've already gotten all the mails we expect
pass
e.apply_src_policy_impl(
None, FakeItem, None, FakeSourceData, FakeExcuse
)
if sendmail.call_count > previous:
e.initialise(None) # Refill e.cache from disk
called.append(age)
name, args, kwargs = sendmail.mock_calls[-1]
text = args[2]
self.assertNotIn(" 1 days.", text)
self.assertSequenceEqual(called, expected)
def test_smtp_repetition(self):
"""Confirm that emails are sent at appropriate intervals."""
# Emails were sent when daysold reached these values:
self.smtp_repetition(
valid=False, expected=[1, 3, 7, 15, 31, 61, 91, 121, 151, 181]
)
self.smtp_repetition(
valid=True, expected=[5, 7, 11, 19, 35, 65, 95, 125, 155, 185]
)
self.smtp_repetition(
valid=[False, False, True],
expected=[1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185],
)
self.smtp_repetition(
valid=[False, False, True, False, True],
expected=[1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185],
)
class ET(TestBase):
""" Test sending mail through a mocked SMTP server """
@classmethod
def setUpClass(cls):
cls.smtpd = FakeSMTPServer("localhost", 1337)
cls.smtpd.run()
@classmethod
def tearDownClass(cls):
cls.smtpd.close()
def setUp(self):
super().setUp()
# disable ADT, not relevant for us
for line in fileinput.input(self.britney_conf, inplace=True):
if line.startswith("ADT_ENABLE"):
print("ADT_ENABLE = no")
elif line.startswith("MINDAYS_EMERGENCY"):
print("MINDAYS_EMERGENCY = 10")
elif not line.startswith("ADT_AMQP") and not line.startswith(
"ADT_SWIFT_URL"
):
sys.stdout.write(line)
# and set up a fake smtpd
with open(self.britney_conf, "a") as f:
f.write("EMAIL_HOST = localhost:1337")
self.age_file = os.path.join(self.data.dirs[False], "Dates")
self.urgency_file = os.path.join(self.data.dirs[False], "Urgency")
self.email_cache_file = os.path.join(self.data.dirs[True], "EmailCache")
self.sourceppa_cache = {}
self.email_cache = {}
self.data.add("libc6", False)
def do_test(self, unstable_add, expect_emails):
"""Run britney with some unstable packages and verify excuses.
unstable_add is a list of (binpkgname, field_dict, daysold, emails)
expect_emails is a list that is checked against the emails sent during
this do_test run.
Return (output, excuses_dict, excuses_html, emails).
"""
ET.smtpd.emails.clear()
for (pkg, fields, daysold, emails) in unstable_add:
self.data.add(pkg, True, fields, True, None)
self.sourceppa_cache.setdefault(pkg, {})
if fields["Version"] not in self.sourceppa_cache[pkg]:
self.sourceppa_cache[pkg][fields["Version"]] = ""
with open(self.age_file, "w") as f:
import time
do = time.time() - (60 * 60 * 24 * daysold)
f.write("%s %s %d" % (pkg, fields["Version"], do))
with open(self.email_cache_file, "w") as f:
d = defaultdict(dict)
d[pkg][fields["Version"]] = (emails, 0)
f.write(json.dumps(d))
# Set up sourceppa cache for testing
sourceppa_path = os.path.join(self.data.dirs[True], "SourcePPA")
with open(sourceppa_path, "w", encoding="utf-8") as sourceppa:
sourceppa.write(json.dumps(self.sourceppa_cache))
(excuses_yaml, excuses_html, out) = self.run_britney()
# convert excuses to source indexed dict
excuses_dict = {}
for s in yaml.safe_load(excuses_yaml)["sources"]:
excuses_dict[s["source"]] = s
if "SHOW_EXCUSES" in os.environ:
print("------- excuses -----")
pprint.pprint(excuses_dict, width=200)
if "SHOW_HTML" in os.environ:
print("------- excuses.html -----\n%s\n" % excuses_html)
if "SHOW_OUTPUT" in os.environ:
print("------- output -----\n%s\n" % out)
self.assertNotIn("FIXME", out)
# check all the emails that we asked for are there
for email in expect_emails:
self.assertIn(email, ET.smtpd.get_emails())
self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails))
return (out, excuses_dict, excuses_html, ET.smtpd.emails)
def test_email_sent(self):
"""Test that an email is sent through the SMTP server"""
pkg = (
"libc6",
{"Version": "2", "Depends": "notavailable (>= 2)"},
6,
["foo@bar.com"],
)
self.do_test([pkg], ["foo@bar.com"])
def test_email_not_sent_block_all_source(self):
"""Test that an email is not sent if the package is blocked by a
block-all source hint"""
self.create_hint("freeze", "block-all source")
pkg = ("libc6", {"Version": "2"}, 6, ["foo@bar.com"]) # daysold
self.do_test([pkg], [])
def test_email_not_sent_blocked(self):
"""Test that an email is not sent if the package is blocked by a block hint"""
self.create_hint("freeze", "block libc6")
pkg = ("libc6", {"Version": "2"}, 6, ["foo@bar.com"]) # daysold
self.do_test([pkg], [])
def test_email_sent_unblocked(self):
"""Test that an email is sent if the package is unblocked"""
self.create_hint("freeze", "block libc6")
self.create_hint("freeze-exception", "unblock libc6/2")
pkg = (
"libc6",
{"Version": "2"},
6, # daysold
["foo@bar.com"],
)
self.do_test([pkg], ["foo@bar.com"])
def test_email_not_sent_rejected_temporarily(self):
"""Test that an email is not sent if the package is REJECTED_TEMPORARILY"""
with open(self.urgency_file, "w") as f:
# we specified in setUp() that emergency has a 10 day delay, and
# age rejections are REJECTED_TEMPORARILY
f.write("libc6 2 emergency")
pkg = (
"libc6",
{"Version": "2"},
6, # daysold
["foo@bar.com"],
)
self.do_test([pkg], [])
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save