You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
517 lines
18 KiB
517 lines
18 KiB
#!/usr/bin/python3
|
|
# (C) 2017 Canonical Ltd.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
from collections import defaultdict
|
|
|
|
import fileinput
|
|
import json
|
|
import os
|
|
import pprint
|
|
import sys
|
|
import unittest
|
|
import yaml
|
|
from unittest.mock import DEFAULT, patch, call
|
|
|
|
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.insert(0, PROJECT_DIR)
|
|
|
|
from britney2.policies.policy import PolicyVerdict
|
|
from britney2.policies.email import EmailPolicy, person_chooser, address_chooser
|
|
|
|
from tests.test_sourceppa import FakeOptions
|
|
from tests import TestBase
|
|
from tests.mock_smtpd import FakeSMTPServer
|
|
|
|
# Example of a direct upload by core dev: openstack-doc-tools 1.5.0-0ubuntu1
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7524835
|
|
UPLOAD = dict(
|
|
creator_link=None,
|
|
package_creator_link="https://api.launchpad.net/1.0/~zulcss",
|
|
package_signer_link="https://api.launchpad.net/1.0/~zulcss",
|
|
sponsor_link=None,
|
|
)
|
|
|
|
# Example of a sponsored upload: kerneloops 0.12+git20140509-2ubuntu1
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7013597
|
|
SPONSORED_UPLOAD = dict(
|
|
creator_link=None,
|
|
package_creator_link="https://api.launchpad.net/1.0/~smb",
|
|
package_signer_link="https://api.launchpad.net/1.0/~apw",
|
|
sponsor_link=None,
|
|
)
|
|
|
|
# Example of a bileto upload: autopilot 1.6.0+17.04.20170302-0ubuntu1
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7525085
|
|
# (dobey clicked 'build' and sil2100 clicked 'publish')
|
|
BILETO = dict(
|
|
creator_link="https://api.launchpad.net/1.0/~sil2100",
|
|
package_creator_link="https://api.launchpad.net/1.0/~dobey",
|
|
package_signer_link="https://api.launchpad.net/1.0/~ci-train-bot",
|
|
sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot",
|
|
)
|
|
|
|
# Example of a non-sponsored copy: linux 4.10.0-11.13
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522481
|
|
# (the upload to the PPA was sponsored but the copy was done directly)
|
|
UNSPONSORED_COPY = dict(
|
|
creator_link="https://api.launchpad.net/1.0/~timg-tpi",
|
|
package_creator_link="https://api.launchpad.net/1.0/~sforshee",
|
|
package_signer_link="https://api.launchpad.net/1.0/~timg-tpi",
|
|
sponsor_link=None,
|
|
)
|
|
|
|
# Example of a sponsored copy: pagein 0.00.03-1
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7533336
|
|
SPONSORED_COPY = dict(
|
|
creator_link="https://api.launchpad.net/1.0/~colin-king",
|
|
package_creator_link="https://api.launchpad.net/1.0/~colin-king",
|
|
package_signer_link=None,
|
|
sponsor_link="https://api.launchpad.net/1.0/~mapreri",
|
|
)
|
|
|
|
# Example of a manual debian sync: systemd 232-19
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7522736
|
|
MANUAL_SYNC = dict(
|
|
creator_link="https://api.launchpad.net/1.0/~costamagnagianfranco",
|
|
package_creator_link="https://api.launchpad.net/1.0/~pkg-systemd-maintainers",
|
|
package_signer_link=None,
|
|
sponsor_link=None,
|
|
)
|
|
|
|
# Example of a sponsored manual debian sync: python-pymysql 0.7.9-2
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7487820
|
|
SPONSORED_MANUAL_SYNC = dict(
|
|
creator_link="https://api.launchpad.net/1.0/~lars-tangvald",
|
|
package_creator_link="https://api.launchpad.net/1.0/~openstack-1.0",
|
|
package_signer_link=None,
|
|
sponsor_link="https://api.launchpad.net/1.0/~racb",
|
|
)
|
|
|
|
# Example of an automatic debian sync: gem2deb 0.33.1
|
|
# https://api.launchpad.net/1.0/ubuntu/+archive/primary/+sourcepub/7255529
|
|
AUTO_SYNC = dict(
|
|
creator_link="https://api.launchpad.net/1.0/~katie",
|
|
package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers",
|
|
package_signer_link=None,
|
|
sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot",
|
|
)
|
|
|
|
PROMOTED_AUTO_SYNC = [
|
|
dict(
|
|
creator_link=None,
|
|
package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers",
|
|
package_signer_link=None,
|
|
sponsor_link=None,
|
|
),
|
|
dict(
|
|
creator_link="https://api.launchpad.net/1.0/~katie",
|
|
package_creator_link="https://api.launchpad.net/1.0/~pkg-ruby-extras-maintainers",
|
|
package_signer_link=None,
|
|
sponsor_link="https://api.launchpad.net/1.0/~ubuntu-archive-robot",
|
|
),
|
|
]
|
|
|
|
|
|
# address lists
|
|
UBUNTU = ["personal@gmail.com", "ubuntu@ubuntu.com", "work@canonical.com"]
|
|
CANONICAL = ["personal@gmail.com", "work@canonical.com"]
|
|
COMMUNITY = ["personal@gmail.com", "other@gmail.com"]
|
|
|
|
|
|
def retvals(retvals):
|
|
"""Return different retvals on different calls of mock."""
|
|
|
|
def returner(*args, **kwargs):
|
|
return retvals.pop()
|
|
|
|
return returner
|
|
|
|
|
|
class FakeItem:
|
|
package = "chromium-browser"
|
|
|
|
|
|
class FakeSourceData:
|
|
version = "55.0"
|
|
|
|
|
|
FakeItem,
|
|
|
|
|
|
class FakeExcuse:
|
|
is_valid = True
|
|
daysold = 0
|
|
reason = []
|
|
tentative_policy_verdict = PolicyVerdict.PASS
|
|
|
|
|
|
class T(unittest.TestCase):
|
|
maxDiff = None
|
|
|
|
def test_person_chooser(self):
|
|
"""Find the correct person to blame for an upload."""
|
|
self.assertEqual(
|
|
person_chooser(UPLOAD), {"https://api.launchpad.net/1.0/~zulcss"}
|
|
)
|
|
self.assertEqual(
|
|
person_chooser(SPONSORED_UPLOAD),
|
|
{
|
|
"https://api.launchpad.net/1.0/~apw",
|
|
"https://api.launchpad.net/1.0/~smb",
|
|
},
|
|
)
|
|
self.assertEqual(
|
|
person_chooser(BILETO),
|
|
{
|
|
"https://api.launchpad.net/1.0/~dobey",
|
|
"https://api.launchpad.net/1.0/~sil2100",
|
|
},
|
|
)
|
|
self.assertEqual(
|
|
person_chooser(UNSPONSORED_COPY),
|
|
{"https://api.launchpad.net/1.0/~timg-tpi"},
|
|
)
|
|
self.assertEqual(
|
|
person_chooser(SPONSORED_COPY),
|
|
{
|
|
"https://api.launchpad.net/1.0/~colin-king",
|
|
"https://api.launchpad.net/1.0/~mapreri",
|
|
},
|
|
)
|
|
self.assertEqual(
|
|
person_chooser(MANUAL_SYNC),
|
|
{"https://api.launchpad.net/1.0/~costamagnagianfranco"},
|
|
)
|
|
self.assertSequenceEqual(
|
|
person_chooser(SPONSORED_MANUAL_SYNC),
|
|
{
|
|
"https://api.launchpad.net/1.0/~lars-tangvald",
|
|
"https://api.launchpad.net/1.0/~racb",
|
|
},
|
|
)
|
|
self.assertEqual(person_chooser(AUTO_SYNC), set())
|
|
|
|
def test_address_chooser(self):
|
|
"""Prioritize email addresses correctly."""
|
|
self.assertEqual(address_chooser(UBUNTU), "ubuntu@ubuntu.com")
|
|
self.assertEqual(address_chooser(CANONICAL), "work@canonical.com")
|
|
self.assertEqual(address_chooser(COMMUNITY), "personal@gmail.com")
|
|
|
|
@patch("britney2.policies.email.EmailPolicy.query_rest_api")
|
|
@patch("britney2.policies.email.EmailPolicy.query_lp_rest_api")
|
|
def test_email_promoted_package(self, lp, rest):
|
|
"""When a package has been promoted in proposed, we find the older SPPH
|
|
and use its details - in the case of an autosync to not email."""
|
|
lp.return_value = dict(entries=PROMOTED_AUTO_SYNC)
|
|
e = EmailPolicy(FakeOptions, None)
|
|
self.assertEqual(
|
|
e.lp_get_emails("openstack-doct-tools", "1.5.0-0ubuntu1"), []
|
|
)
|
|
self.assertSequenceEqual(
|
|
lp.mock_calls,
|
|
[
|
|
call(
|
|
"testbuntu/+archive/primary",
|
|
{
|
|
"distro_series": "/testbuntu/zazzy",
|
|
"exact_match": "true",
|
|
"order_by_date": "true",
|
|
"pocket": "Proposed",
|
|
"source_name": "openstack-doct-tools",
|
|
"version": "1.5.0-0ubuntu1",
|
|
"ws.op": "getPublishedSources",
|
|
},
|
|
)
|
|
],
|
|
)
|
|
self.assertSequenceEqual(rest.mock_calls, [])
|
|
|
|
@patch("britney2.policies.email.EmailPolicy.query_rest_api")
|
|
@patch("britney2.policies.email.EmailPolicy.query_lp_rest_api")
|
|
def test_email_scraping(self, lp, rest):
|
|
"""Poke correct REST APIs to find email addresses."""
|
|
lp.side_effect = retvals(
|
|
[
|
|
dict(entries=[dict(fingerprint="DEFACED_ED1F1CE")]),
|
|
dict(entries=[UPLOAD]),
|
|
]
|
|
)
|
|
rest.return_value = "uid:Defaced Edifice <ex@example.com>:12345::"
|
|
e = EmailPolicy(FakeOptions, None)
|
|
self.assertEqual(
|
|
e.lp_get_emails("openstack-doct-tools", "1.5.0-0ubuntu1"),
|
|
["ex@example.com"],
|
|
)
|
|
self.assertSequenceEqual(
|
|
lp.mock_calls,
|
|
[
|
|
call(
|
|
"testbuntu/+archive/primary",
|
|
{
|
|
"distro_series": "/testbuntu/zazzy",
|
|
"exact_match": "true",
|
|
"order_by_date": "true",
|
|
"pocket": "Proposed",
|
|
"source_name": "openstack-doct-tools",
|
|
"version": "1.5.0-0ubuntu1",
|
|
"ws.op": "getPublishedSources",
|
|
},
|
|
),
|
|
call("https://api.launchpad.net/1.0/~zulcss/gpg_keys", {}),
|
|
],
|
|
)
|
|
self.assertSequenceEqual(
|
|
rest.mock_calls,
|
|
[
|
|
call(
|
|
"http://keyserver.ubuntu.com/pks/lookup",
|
|
{
|
|
"exact": "on",
|
|
"op": "index",
|
|
"options": "mr",
|
|
"search": "0xDEFACED_ED1F1CE",
|
|
},
|
|
)
|
|
],
|
|
)
|
|
|
|
@patch("britney2.policies.email.EmailPolicy.lp_get_emails")
|
|
@patch("britney2.policies.email.smtplib")
|
|
def test_smtp_not_sent(self, smtp, lp):
|
|
"""Know when not to send any emails."""
|
|
lp.return_value = ["example@email.com"]
|
|
e = EmailPolicy(FakeOptions, None)
|
|
FakeExcuse.daysold = 0.002
|
|
e.apply_src_policy_impl(
|
|
None, FakeItem, None, FakeSourceData, FakeExcuse
|
|
)
|
|
FakeExcuse.daysold = 2.98
|
|
e.apply_src_policy_impl(
|
|
None, FakeItem, None, FakeSourceData, FakeExcuse
|
|
)
|
|
# Would email but no address found
|
|
FakeExcuse.daysold = 10.12
|
|
lp.return_value = []
|
|
e.apply_src_policy_impl(
|
|
None, FakeItem, None, FakeSourceData, FakeExcuse
|
|
)
|
|
self.assertEqual(smtp.mock_calls, [])
|
|
|
|
@patch("britney2.policies.email.EmailPolicy.lp_get_emails")
|
|
@patch("britney2.policies.email.smtplib")
|
|
def test_smtp_sent(self, smtp, lp):
|
|
"""Send emails correctly."""
|
|
lp.return_value = ["email@address.com"]
|
|
e = EmailPolicy(FakeOptions, None)
|
|
FakeExcuse.is_valid = False
|
|
FakeExcuse.daysold = 100
|
|
e.apply_src_policy_impl(
|
|
None, FakeItem, None, FakeSourceData, FakeExcuse
|
|
)
|
|
smtp.SMTP.assert_called_once_with("localhost")
|
|
|
|
@patch("britney2.policies.email.EmailPolicy.lp_get_emails")
|
|
@patch("britney2.policies.email.smtplib", autospec=True)
|
|
def smtp_repetition(self, smtp, lp, valid, expected):
|
|
"""Resend mails periodically, with decreasing frequency."""
|
|
if not isinstance(valid, list):
|
|
valid = [valid] * len(expected)
|
|
FakeExcuse.is_valid = valid
|
|
lp.return_value = ["email@address.com"]
|
|
sendmail = smtp.SMTP().sendmail
|
|
e = EmailPolicy(FakeOptions, None)
|
|
called = []
|
|
e.cache = {}
|
|
for hours in range(0, 5000):
|
|
previous = sendmail.call_count
|
|
age = hours / 24
|
|
FakeExcuse.daysold = age
|
|
try:
|
|
FakeExcuse.is_valid = valid[len(called)]
|
|
except IndexError:
|
|
# we've already gotten all the mails we expect
|
|
pass
|
|
e.apply_src_policy_impl(
|
|
None, FakeItem, None, FakeSourceData, FakeExcuse
|
|
)
|
|
if sendmail.call_count > previous:
|
|
e.initialise(None) # Refill e.cache from disk
|
|
called.append(age)
|
|
name, args, kwargs = sendmail.mock_calls[-1]
|
|
text = args[2]
|
|
self.assertNotIn(" 1 days.", text)
|
|
self.assertSequenceEqual(called, expected)
|
|
|
|
def test_smtp_repetition(self):
|
|
"""Confirm that emails are sent at appropriate intervals."""
|
|
# Emails were sent when daysold reached these values:
|
|
self.smtp_repetition(
|
|
valid=False, expected=[1, 3, 7, 15, 31, 61, 91, 121, 151, 181]
|
|
)
|
|
self.smtp_repetition(
|
|
valid=True, expected=[5, 7, 11, 19, 35, 65, 95, 125, 155, 185]
|
|
)
|
|
self.smtp_repetition(
|
|
valid=[False, False, True],
|
|
expected=[1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185],
|
|
)
|
|
self.smtp_repetition(
|
|
valid=[False, False, True, False, True],
|
|
expected=[1, 3, 5, 7, 11, 19, 35, 65, 95, 125, 155, 185],
|
|
)
|
|
|
|
|
|
class ET(TestBase):
|
|
""" Test sending mail through a mocked SMTP server """
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.smtpd = FakeSMTPServer("localhost", 1337)
|
|
cls.smtpd.run()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls.smtpd.close()
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
# disable ADT, not relevant for us
|
|
for line in fileinput.input(self.britney_conf, inplace=True):
|
|
if line.startswith("ADT_ENABLE"):
|
|
print("ADT_ENABLE = no")
|
|
elif line.startswith("MINDAYS_EMERGENCY"):
|
|
print("MINDAYS_EMERGENCY = 10")
|
|
elif not line.startswith("ADT_AMQP") and not line.startswith(
|
|
"ADT_SWIFT_URL"
|
|
):
|
|
sys.stdout.write(line)
|
|
# and set up a fake smtpd
|
|
with open(self.britney_conf, "a") as f:
|
|
f.write("EMAIL_HOST = localhost:1337")
|
|
self.age_file = os.path.join(self.data.dirs[False], "Dates")
|
|
self.urgency_file = os.path.join(self.data.dirs[False], "Urgency")
|
|
self.email_cache_file = os.path.join(self.data.dirs[True], "EmailCache")
|
|
self.sourceppa_cache = {}
|
|
self.email_cache = {}
|
|
|
|
self.data.add("libc6", False)
|
|
|
|
def do_test(self, unstable_add, expect_emails):
|
|
"""Run britney with some unstable packages and verify excuses.
|
|
|
|
unstable_add is a list of (binpkgname, field_dict, daysold, emails)
|
|
|
|
expect_emails is a list that is checked against the emails sent during
|
|
this do_test run.
|
|
|
|
Return (output, excuses_dict, excuses_html, emails).
|
|
"""
|
|
ET.smtpd.emails.clear()
|
|
for (pkg, fields, daysold, emails) in unstable_add:
|
|
self.data.add(pkg, True, fields, True, None)
|
|
self.sourceppa_cache.setdefault(pkg, {})
|
|
if fields["Version"] not in self.sourceppa_cache[pkg]:
|
|
self.sourceppa_cache[pkg][fields["Version"]] = ""
|
|
with open(self.age_file, "w") as f:
|
|
import time
|
|
|
|
do = time.time() - (60 * 60 * 24 * daysold)
|
|
f.write("%s %s %d" % (pkg, fields["Version"], do))
|
|
|
|
with open(self.email_cache_file, "w") as f:
|
|
d = defaultdict(dict)
|
|
d[pkg][fields["Version"]] = (emails, 0)
|
|
f.write(json.dumps(d))
|
|
|
|
# Set up sourceppa cache for testing
|
|
sourceppa_path = os.path.join(self.data.dirs[True], "SourcePPA")
|
|
with open(sourceppa_path, "w", encoding="utf-8") as sourceppa:
|
|
sourceppa.write(json.dumps(self.sourceppa_cache))
|
|
|
|
(excuses_yaml, excuses_html, out) = self.run_britney()
|
|
|
|
# convert excuses to source indexed dict
|
|
excuses_dict = {}
|
|
for s in yaml.safe_load(excuses_yaml)["sources"]:
|
|
excuses_dict[s["source"]] = s
|
|
|
|
if "SHOW_EXCUSES" in os.environ:
|
|
print("------- excuses -----")
|
|
pprint.pprint(excuses_dict, width=200)
|
|
if "SHOW_HTML" in os.environ:
|
|
print("------- excuses.html -----\n%s\n" % excuses_html)
|
|
if "SHOW_OUTPUT" in os.environ:
|
|
print("------- output -----\n%s\n" % out)
|
|
|
|
self.assertNotIn("FIXME", out)
|
|
# check all the emails that we asked for are there
|
|
for email in expect_emails:
|
|
self.assertIn(email, ET.smtpd.get_emails())
|
|
self.assertEqual(len(ET.smtpd.get_emails()), len(expect_emails))
|
|
|
|
return (out, excuses_dict, excuses_html, ET.smtpd.emails)
|
|
|
|
def test_email_sent(self):
|
|
"""Test that an email is sent through the SMTP server"""
|
|
pkg = (
|
|
"libc6",
|
|
{"Version": "2", "Depends": "notavailable (>= 2)"},
|
|
6,
|
|
["foo@bar.com"],
|
|
)
|
|
|
|
self.do_test([pkg], ["foo@bar.com"])
|
|
|
|
def test_email_not_sent_block_all_source(self):
|
|
"""Test that an email is not sent if the package is blocked by a
|
|
block-all source hint"""
|
|
self.create_hint("freeze", "block-all source")
|
|
pkg = ("libc6", {"Version": "2"}, 6, ["foo@bar.com"]) # daysold
|
|
|
|
self.do_test([pkg], [])
|
|
|
|
def test_email_not_sent_blocked(self):
|
|
"""Test that an email is not sent if the package is blocked by a block hint"""
|
|
self.create_hint("freeze", "block libc6")
|
|
pkg = ("libc6", {"Version": "2"}, 6, ["foo@bar.com"]) # daysold
|
|
|
|
self.do_test([pkg], [])
|
|
|
|
def test_email_sent_unblocked(self):
|
|
"""Test that an email is sent if the package is unblocked"""
|
|
self.create_hint("freeze", "block libc6")
|
|
self.create_hint("freeze-exception", "unblock libc6/2")
|
|
pkg = (
|
|
"libc6",
|
|
{"Version": "2"},
|
|
6, # daysold
|
|
["foo@bar.com"],
|
|
)
|
|
|
|
self.do_test([pkg], ["foo@bar.com"])
|
|
|
|
def test_email_not_sent_rejected_temporarily(self):
|
|
"""Test that an email is not sent if the package is REJECTED_TEMPORARILY"""
|
|
with open(self.urgency_file, "w") as f:
|
|
# we specified in setUp() that emergency has a 10 day delay, and
|
|
# age rejections are REJECTED_TEMPORARILY
|
|
f.write("libc6 2 emergency")
|
|
|
|
pkg = (
|
|
"libc6",
|
|
{"Version": "2"},
|
|
6, # daysold
|
|
["foo@bar.com"],
|
|
)
|
|
|
|
self.do_test([pkg], [])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|