Autopkgtest: Simplify test request and result retrieval workflow

Splitting up the processes of request(), submit(), and collect() makes our data
structures, house keeping, and code unnecessarily complicated. Drop the latter
two and now do all of it in just request(). This avoids having to have a
separate requested_test map, having to fetch test results twice, and gets rid
of some state keeping.
bzr-import-20160707
Martin Pitt 9 years ago
parent f94c2c0ade
commit ea1c335992

@ -57,10 +57,8 @@ class AutoPackageTest(object):
self.excludes = set()
self.test_state_dir = os.path.join(britney.options.unstable,
'autopkgtest')
# map of requested tests from request()
# tests requested in this and previous runs
# trigger -> src -> [arch]
self.requested_tests = {}
# same map for tests requested in previous runs
self.pending_tests = None
self.pending_tests_file = os.path.join(self.test_state_dir, 'pending.json')
@ -89,6 +87,32 @@ class AutoPackageTest(object):
self.log_verbose('%s does not exist, re-downloading all results '
'from swift' % self.results_cache_file)
self.setup_amqp()
def setup_amqp(self):
'''Initialize AMQP connection'''
self.amqp_channel = None
self.amqp_file = None
if self.britney.options.dry_run:
# in dry-run mode, don't issue any requests
return
amqp_url = self.britney.options.adt_amqp
if amqp_url.startswith('amqp://'):
# in production mode, connect to AMQP server
creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
self.amqp_con = amqp.Connection(creds.hostname, userid=creds.username,
password=creds.password)
self.amqp_channel = self.amqp_con.channel()
self.log_verbose('Connected to AMQP server %s' % amqp_url)
elif amqp_url.startswith('file://'):
# in testing mode, adt_amqp will be a file:// URL
self.amqp_file = amqp_url[7:]
else:
raise RuntimeError('Unknown ADT_AMQP schema in %s' % amqp_url)
def log_verbose(self, msg):
if self.britney.options.verbose:
print('I: [%s] - %s' % (time.asctime(), msg))
@ -251,55 +275,6 @@ class AutoPackageTest(object):
self.log_verbose('Read pending requested tests from %s: %s' %
(self.pending_tests_file, self.pending_tests))
def update_pending_tests(self):
'''Update pending tests after submitting requested tests'''
# merge requested_tests into pending_tests
for trigger, srcmap in self.requested_tests.items():
for src, archlist in srcmap.items():
try:
arches = set(self.pending_tests[trigger][src])
except KeyError:
arches = set()
arches.update(archlist)
self.pending_tests.setdefault(trigger, {})[src] = sorted(arches)
self.requested_tests.clear()
# write it
with open(self.pending_tests_file + '.new', 'w') as f:
json.dump(self.pending_tests, f, indent=2)
os.rename(self.pending_tests_file + '.new', self.pending_tests_file)
self.log_verbose('Updated pending requested tests in %s' %
self.pending_tests_file)
def add_test_request(self, src, arch, trigger):
'''Add one test request to the local self.requested_tests queue
trigger is "pkgname/version" of the package that triggers the testing
of src.
This will only be done if that test wasn't already requested in a
previous run (i. e. not already in self.pending_tests) or there already
is a result for it.
'''
# Don't re-request if we already have a result
try:
self.test_results[trigger][src][arch]
self.log_verbose('There already is a result for %s/%s triggered by %s' %
(src, arch, trigger))
return
except KeyError:
pass
# Don't re-request if it's already pending
if arch in self.pending_tests.get(trigger, {}).get(src, []):
self.log_verbose('test %s/%s for %s is already pending, not queueing' %
(src, arch, trigger))
else:
arch_list = self.requested_tests.setdefault(trigger, {}).setdefault(src, [])
assert arch not in arch_list
arch_list.append(arch)
def latest_run_for_package(self, src, arch):
'''Return latest run ID for src on arch'''
@ -326,6 +301,13 @@ class AutoPackageTest(object):
def fetch_swift_results(self, swift_url, src, arch):
'''Download new results for source package/arch from swift'''
# Download results for one particular src/arch at most once in every
# run, as this is expensive
done_entry = src + '/' + arch
if done_entry in self.fetch_swift_results._done:
return
self.fetch_swift_results._done.add(done_entry)
# prepare query: get all runs with a timestamp later than the latest
# run_id for this package/arch; '@' is at the end of each run id, to
# mark the end of a test run directory path
@ -361,6 +343,8 @@ class AutoPackageTest(object):
self.fetch_one_result(
os.path.join(swift_url, 'autopkgtest-' + self.series, p, 'result.tar'), src, arch)
fetch_swift_results._done = set()
def fetch_one_result(self, url, src, arch):
'''Download one result URL for source/arch
@ -414,17 +398,16 @@ class AutoPackageTest(object):
# remove matching test requests
for trigger in result_triggers:
for request_map in [self.requested_tests, self.pending_tests]:
try:
arch_list = request_map[trigger][src]
arch_list.remove(arch)
if not arch_list:
del request_map[trigger][src]
if not request_map[trigger]:
del request_map[trigger]
self.log_verbose('-> matches pending request %s/%s for trigger %s' % (src, arch, trigger))
except (KeyError, ValueError):
self.log_verbose('-> does not match any pending request for %s/%s' % (src, arch))
try:
arch_list = self.pending_tests[trigger][src]
arch_list.remove(arch)
if not arch_list:
del self.pending_tests[trigger][src]
if not self.pending_tests[trigger]:
del self.pending_tests[trigger]
self.log_verbose('-> matches pending request %s/%s for trigger %s' % (src, arch, trigger))
except (KeyError, ValueError):
self.log_verbose('-> does not match any pending request for %s/%s' % (src, arch))
# add this result
for trigger in result_triggers:
@ -445,15 +428,65 @@ class AutoPackageTest(object):
if stamp > result[2]:
result[2] = stamp
def failed_tests_for_trigger(self, trigger):
'''Return (src, arch) set for failed tests for given trigger'''
def send_test_request(self, src, arch, trigger):
'''Send out AMQP request for testing src/arch for trigger'''
if self.britney.options.dry_run:
return
params = {'triggers': [trigger]}
if self.britney.options.adt_ppas:
params['ppas'] = self.britney.options.adt_ppas
params = json.dumps(params)
qname = 'debci-%s-%s' % (self.series, arch)
failed = set()
for src, srcinfo in self.test_results.get(trigger, {}).items():
for arch, result in srcinfo.items():
if not result[0]:
failed.add((src, arch))
return failed
if self.amqp_channel:
self.amqp_channel.basic_publish(amqp.Message(src + '\n' + params), routing_key=qname)
else:
assert self.amqp_file
with open(self.amqp_file, 'a') as f:
f.write('%s:%s %s\n' % (qname, src, params))
def pkg_test_request(self, src, arch, trigger):
'''Request one package test for one particular trigger
trigger is "pkgname/version" of the package that triggers the testing
of src.
This will only be done if that test wasn't already requested in a
previous run (i. e. not already in self.pending_tests) or there already
is a result for it. This ensures to download current results for this
package before requesting any test.
'''
# Don't re-request if we already have a result
try:
passed = self.test_results[trigger][src][arch][0]
if passed:
self.log_verbose('%s/%s triggered by %s already passed' % (src, arch, trigger))
return
self.log_verbose('Checking for new results for failed %s/%s for trigger %s' %
(src, arch, trigger))
raise KeyError # fall through
except KeyError:
self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch)
# do we have one now?
try:
self.test_results[trigger][src][arch]
return
except KeyError:
pass
# Don't re-request if it's already pending
arch_list = self.pending_tests.setdefault(trigger, {}).setdefault(src, [])
if arch in arch_list:
self.log_verbose('Test %s/%s for %s is already pending, not queueing' %
(src, arch, trigger))
else:
self.log_verbose('Requesting %s autopkgtest on %s to verify %s' %
(src, arch, trigger))
arch_list.append(arch)
arch_list.sort()
self.send_test_request(src, arch, trigger)
def check_ever_passed(self, src, arch):
'''Check if tests for src ever passed on arch'''
@ -472,114 +505,36 @@ class AutoPackageTest(object):
#
def request(self, packages, excludes=None):
'''Request test runs for verifying packages
"packages" is a list of (trigsrc, trigver) pairs with the packages in
unstable (the "triggers") that need to be tested against their reverse
dependencies (and also their own tests).
"excludes" is an iterable of packages that britney determined to be
uninstallable.
'''
if excludes:
self.excludes.update(excludes)
self.log_verbose('Requested autopkgtests for %s, exclusions: %s' %
self.log_verbose('Requesting autopkgtests for %s, exclusions: %s' %
(['%s/%s' % i for i in packages], str(self.excludes)))
for src, ver in packages:
for arch in self.britney.options.adt_arches:
for (testsrc, testver) in self.tests_for_source(src, ver, arch):
self.add_test_request(testsrc, arch, src + '/' + ver)
if self.britney.options.verbose:
for trigger, srcmap in self.requested_tests.items():
for src, archlist in srcmap.items():
self.log_verbose('Requesting %s autopkgtest on %s to verify %s' %
(src, ' '.join(archlist), trigger))
def submit(self):
# build per-queue request strings for new test requests
# TODO: Once we support version constraints in AMQP requests, add them
# queue_name -> [(pkg, params), ...])
queues = {}
for trigger, srcmap in self.requested_tests.items():
params = {'triggers': [trigger]}
if self.britney.options.adt_ppas:
params['ppas'] = self.britney.options.adt_ppas
for src, archlist in srcmap.items():
for arch in archlist:
qname = 'debci-%s-%s' % (self.series, arch)
queues.setdefault(qname, []).append((src, json.dumps(params)))
amqp_url = self.britney.options.adt_amqp
for (testsrc, _) in self.tests_for_source(src, ver, arch):
self.pkg_test_request(testsrc, arch, src + '/' + ver)
if amqp_url.startswith('amqp://'):
# in production mode, send them out via AMQP
creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
with amqp.Connection(creds.hostname, userid=creds.username,
password=creds.password) as amqp_con:
with amqp_con.channel() as ch:
for queue, requests in queues.items():
for (pkg, params) in requests:
ch.basic_publish(amqp.Message(pkg + '\n' + params),
routing_key=queue)
elif amqp_url.startswith('file://'):
# in testing mode, adt_amqp will be a file:// URL
with open(amqp_url[7:], 'a') as f:
for queue, requests in queues.items():
for (pkg, params) in requests:
f.write('%s:%s %s\n' % (queue, pkg, params))
else:
self.log_error('Unknown ADT_AMQP schema in %s' %
self.britney.options.adt_amqp)
# mark them as pending now
self.update_pending_tests()
def collect_requested(self):
'''Update results from swift for all requested packages
This is normally redundant with collect(), but avoids actually
sending test requests if results are already available. This mostly
happens when you have to blow away results.cache and let it rebuild
from scratch.
'''
# build src -> arch -> triggers inverted map
requests_by_src = {}
for trigger, srcmap in self.requested_tests.items():
for src, archlist in srcmap.items():
for arch in archlist:
requests_by_src.setdefault(src, {}).setdefault(arch, set()).add(trigger)
for src, archmap in requests_by_src.items():
for arch, triggers in archmap.items():
self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch)
def collect(self, packages):
'''Update results from swift for all pending packages
Remove pending tests for which we have results.
'''
# build src -> arch -> triggers inverted map
requests_by_src = {}
for trigger, srcmap in self.pending_tests.items():
for src, archlist in srcmap.items():
for arch in archlist:
requests_by_src.setdefault(src, {}).setdefault(arch, set()).add(trigger)
for src, archmap in requests_by_src.items():
for arch, triggers in archmap.items():
self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch)
# also update results for excuses whose tests failed, in case a
# manual retry worked
for (trigpkg, trigver) in packages:
trigger = trigpkg + '/' + trigver
for (src, arch) in self.failed_tests_for_trigger(trigger):
if arch not in self.pending_tests.get(trigger, {}).get(src, []):
self.log_verbose('Checking for new results for failed %s on %s for trigger %s' %
(src, arch, trigger))
self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch)
# update the results cache
# update the results on-disk cache
self.log_verbose('Updating results cache')
with open(self.results_cache_file + '.new', 'w') as f:
json.dump(self.test_results, f, indent=2)
os.rename(self.results_cache_file + '.new', self.results_cache_file)
self.log_verbose('Updated results cache')
# new results remove pending requests, update the on-disk cache
self.update_pending_tests()
# update the pending tests on-disk cache
self.log_verbose('Updated pending requested tests in %s' % self.pending_tests_file)
with open(self.pending_tests_file + '.new', 'w') as f:
json.dump(self.pending_tests, f, indent=2)
os.rename(self.pending_tests_file + '.new', self.pending_tests_file)
def results(self, trigsrc, trigver):
'''Return test results for triggering package

@ -1931,10 +1931,6 @@ class Britney(object):
autopkgtest_excuses.append(e)
autopkgtest_packages.append((e.name, e.ver[1]))
autopkgtest.request(autopkgtest_packages, autopkgtest_excludes)
if not self.options.dry_run:
autopkgtest.collect_requested()
autopkgtest.submit()
autopkgtest.collect(autopkgtest_packages)
cloud_url = "http://autopkgtest.ubuntu.com/packages/%(h)s/%(s)s/%(r)s/%(a)s"
for e in autopkgtest_excuses:
adtpass = True

Loading…
Cancel
Save