From ea1c335992a3d79f7eaeb8d4da88ec183416ac43 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Tue, 1 Dec 2015 23:57:52 +0100 Subject: [PATCH] Autopkgtest: Simplify test request and result retrieval workflow Splitting up the processes of request(), submit(), and collect() makes our data structures, house keeping, and code unnecessarily complicated. Drop the latter two and now do all of it in just request(). This avoids having to have a separate requested_test map, having to fetch test results twice, and gets rid of some state keeping. --- autopkgtest.py | 291 +++++++++++++++++++++---------------------------- britney.py | 4 - 2 files changed, 123 insertions(+), 172 deletions(-) diff --git a/autopkgtest.py b/autopkgtest.py index 951b946..28563dc 100644 --- a/autopkgtest.py +++ b/autopkgtest.py @@ -57,10 +57,8 @@ class AutoPackageTest(object): self.excludes = set() self.test_state_dir = os.path.join(britney.options.unstable, 'autopkgtest') - # map of requested tests from request() + # tests requested in this and previous runs # trigger -> src -> [arch] - self.requested_tests = {} - # same map for tests requested in previous runs self.pending_tests = None self.pending_tests_file = os.path.join(self.test_state_dir, 'pending.json') @@ -89,6 +87,32 @@ class AutoPackageTest(object): self.log_verbose('%s does not exist, re-downloading all results ' 'from swift' % self.results_cache_file) + self.setup_amqp() + + def setup_amqp(self): + '''Initialize AMQP connection''' + + self.amqp_channel = None + self.amqp_file = None + if self.britney.options.dry_run: + # in dry-run mode, don't issue any requests + return + + amqp_url = self.britney.options.adt_amqp + + if amqp_url.startswith('amqp://'): + # in production mode, connect to AMQP server + creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) + self.amqp_con = amqp.Connection(creds.hostname, userid=creds.username, + password=creds.password) + self.amqp_channel = self.amqp_con.channel() + self.log_verbose('Connected to AMQP server %s' % amqp_url) + elif amqp_url.startswith('file://'): + # in testing mode, adt_amqp will be a file:// URL + self.amqp_file = amqp_url[7:] + else: + raise RuntimeError('Unknown ADT_AMQP schema in %s' % amqp_url) + def log_verbose(self, msg): if self.britney.options.verbose: print('I: [%s] - %s' % (time.asctime(), msg)) @@ -251,55 +275,6 @@ class AutoPackageTest(object): self.log_verbose('Read pending requested tests from %s: %s' % (self.pending_tests_file, self.pending_tests)) - def update_pending_tests(self): - '''Update pending tests after submitting requested tests''' - - # merge requested_tests into pending_tests - for trigger, srcmap in self.requested_tests.items(): - for src, archlist in srcmap.items(): - try: - arches = set(self.pending_tests[trigger][src]) - except KeyError: - arches = set() - arches.update(archlist) - self.pending_tests.setdefault(trigger, {})[src] = sorted(arches) - self.requested_tests.clear() - - # write it - with open(self.pending_tests_file + '.new', 'w') as f: - json.dump(self.pending_tests, f, indent=2) - os.rename(self.pending_tests_file + '.new', self.pending_tests_file) - self.log_verbose('Updated pending requested tests in %s' % - self.pending_tests_file) - - def add_test_request(self, src, arch, trigger): - '''Add one test request to the local self.requested_tests queue - - trigger is "pkgname/version" of the package that triggers the testing - of src. - - This will only be done if that test wasn't already requested in a - previous run (i. e. not already in self.pending_tests) or there already - is a result for it. - ''' - # Don't re-request if we already have a result - try: - self.test_results[trigger][src][arch] - self.log_verbose('There already is a result for %s/%s triggered by %s' % - (src, arch, trigger)) - return - except KeyError: - pass - - # Don't re-request if it's already pending - if arch in self.pending_tests.get(trigger, {}).get(src, []): - self.log_verbose('test %s/%s for %s is already pending, not queueing' % - (src, arch, trigger)) - else: - arch_list = self.requested_tests.setdefault(trigger, {}).setdefault(src, []) - assert arch not in arch_list - arch_list.append(arch) - def latest_run_for_package(self, src, arch): '''Return latest run ID for src on arch''' @@ -326,6 +301,13 @@ class AutoPackageTest(object): def fetch_swift_results(self, swift_url, src, arch): '''Download new results for source package/arch from swift''' + # Download results for one particular src/arch at most once in every + # run, as this is expensive + done_entry = src + '/' + arch + if done_entry in self.fetch_swift_results._done: + return + self.fetch_swift_results._done.add(done_entry) + # prepare query: get all runs with a timestamp later than the latest # run_id for this package/arch; '@' is at the end of each run id, to # mark the end of a test run directory path @@ -361,6 +343,8 @@ class AutoPackageTest(object): self.fetch_one_result( os.path.join(swift_url, 'autopkgtest-' + self.series, p, 'result.tar'), src, arch) + fetch_swift_results._done = set() + def fetch_one_result(self, url, src, arch): '''Download one result URL for source/arch @@ -414,17 +398,16 @@ class AutoPackageTest(object): # remove matching test requests for trigger in result_triggers: - for request_map in [self.requested_tests, self.pending_tests]: - try: - arch_list = request_map[trigger][src] - arch_list.remove(arch) - if not arch_list: - del request_map[trigger][src] - if not request_map[trigger]: - del request_map[trigger] - self.log_verbose('-> matches pending request %s/%s for trigger %s' % (src, arch, trigger)) - except (KeyError, ValueError): - self.log_verbose('-> does not match any pending request for %s/%s' % (src, arch)) + try: + arch_list = self.pending_tests[trigger][src] + arch_list.remove(arch) + if not arch_list: + del self.pending_tests[trigger][src] + if not self.pending_tests[trigger]: + del self.pending_tests[trigger] + self.log_verbose('-> matches pending request %s/%s for trigger %s' % (src, arch, trigger)) + except (KeyError, ValueError): + self.log_verbose('-> does not match any pending request for %s/%s' % (src, arch)) # add this result for trigger in result_triggers: @@ -445,15 +428,65 @@ class AutoPackageTest(object): if stamp > result[2]: result[2] = stamp - def failed_tests_for_trigger(self, trigger): - '''Return (src, arch) set for failed tests for given trigger''' + def send_test_request(self, src, arch, trigger): + '''Send out AMQP request for testing src/arch for trigger''' + + if self.britney.options.dry_run: + return + + params = {'triggers': [trigger]} + if self.britney.options.adt_ppas: + params['ppas'] = self.britney.options.adt_ppas + params = json.dumps(params) + qname = 'debci-%s-%s' % (self.series, arch) - failed = set() - for src, srcinfo in self.test_results.get(trigger, {}).items(): - for arch, result in srcinfo.items(): - if not result[0]: - failed.add((src, arch)) - return failed + if self.amqp_channel: + self.amqp_channel.basic_publish(amqp.Message(src + '\n' + params), routing_key=qname) + else: + assert self.amqp_file + with open(self.amqp_file, 'a') as f: + f.write('%s:%s %s\n' % (qname, src, params)) + + def pkg_test_request(self, src, arch, trigger): + '''Request one package test for one particular trigger + + trigger is "pkgname/version" of the package that triggers the testing + of src. + + This will only be done if that test wasn't already requested in a + previous run (i. e. not already in self.pending_tests) or there already + is a result for it. This ensures to download current results for this + package before requesting any test. + ''' + # Don't re-request if we already have a result + try: + passed = self.test_results[trigger][src][arch][0] + if passed: + self.log_verbose('%s/%s triggered by %s already passed' % (src, arch, trigger)) + return + self.log_verbose('Checking for new results for failed %s/%s for trigger %s' % + (src, arch, trigger)) + raise KeyError # fall through + except KeyError: + self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) + # do we have one now? + try: + self.test_results[trigger][src][arch] + return + except KeyError: + pass + + # Don't re-request if it's already pending + arch_list = self.pending_tests.setdefault(trigger, {}).setdefault(src, []) + if arch in arch_list: + self.log_verbose('Test %s/%s for %s is already pending, not queueing' % + (src, arch, trigger)) + else: + self.log_verbose('Requesting %s autopkgtest on %s to verify %s' % + (src, arch, trigger)) + arch_list.append(arch) + arch_list.sort() + self.send_test_request(src, arch, trigger) def check_ever_passed(self, src, arch): '''Check if tests for src ever passed on arch''' @@ -472,114 +505,36 @@ class AutoPackageTest(object): # def request(self, packages, excludes=None): + '''Request test runs for verifying packages + + "packages" is a list of (trigsrc, trigver) pairs with the packages in + unstable (the "triggers") that need to be tested against their reverse + dependencies (and also their own tests). + + "excludes" is an iterable of packages that britney determined to be + uninstallable. + ''' if excludes: self.excludes.update(excludes) - self.log_verbose('Requested autopkgtests for %s, exclusions: %s' % + self.log_verbose('Requesting autopkgtests for %s, exclusions: %s' % (['%s/%s' % i for i in packages], str(self.excludes))) for src, ver in packages: for arch in self.britney.options.adt_arches: - for (testsrc, testver) in self.tests_for_source(src, ver, arch): - self.add_test_request(testsrc, arch, src + '/' + ver) - - if self.britney.options.verbose: - for trigger, srcmap in self.requested_tests.items(): - for src, archlist in srcmap.items(): - self.log_verbose('Requesting %s autopkgtest on %s to verify %s' % - (src, ' '.join(archlist), trigger)) - - def submit(self): - # build per-queue request strings for new test requests - # TODO: Once we support version constraints in AMQP requests, add them - # queue_name -> [(pkg, params), ...]) - queues = {} - for trigger, srcmap in self.requested_tests.items(): - params = {'triggers': [trigger]} - if self.britney.options.adt_ppas: - params['ppas'] = self.britney.options.adt_ppas - for src, archlist in srcmap.items(): - for arch in archlist: - qname = 'debci-%s-%s' % (self.series, arch) - queues.setdefault(qname, []).append((src, json.dumps(params))) - - amqp_url = self.britney.options.adt_amqp + for (testsrc, _) in self.tests_for_source(src, ver, arch): + self.pkg_test_request(testsrc, arch, src + '/' + ver) - if amqp_url.startswith('amqp://'): - # in production mode, send them out via AMQP - creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) - with amqp.Connection(creds.hostname, userid=creds.username, - password=creds.password) as amqp_con: - with amqp_con.channel() as ch: - for queue, requests in queues.items(): - for (pkg, params) in requests: - ch.basic_publish(amqp.Message(pkg + '\n' + params), - routing_key=queue) - elif amqp_url.startswith('file://'): - # in testing mode, adt_amqp will be a file:// URL - with open(amqp_url[7:], 'a') as f: - for queue, requests in queues.items(): - for (pkg, params) in requests: - f.write('%s:%s %s\n' % (queue, pkg, params)) - else: - self.log_error('Unknown ADT_AMQP schema in %s' % - self.britney.options.adt_amqp) - - # mark them as pending now - self.update_pending_tests() - - def collect_requested(self): - '''Update results from swift for all requested packages - - This is normally redundant with collect(), but avoids actually - sending test requests if results are already available. This mostly - happens when you have to blow away results.cache and let it rebuild - from scratch. - ''' - # build src -> arch -> triggers inverted map - requests_by_src = {} - for trigger, srcmap in self.requested_tests.items(): - for src, archlist in srcmap.items(): - for arch in archlist: - requests_by_src.setdefault(src, {}).setdefault(arch, set()).add(trigger) - - for src, archmap in requests_by_src.items(): - for arch, triggers in archmap.items(): - self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) - - def collect(self, packages): - '''Update results from swift for all pending packages - - Remove pending tests for which we have results. - ''' - # build src -> arch -> triggers inverted map - requests_by_src = {} - for trigger, srcmap in self.pending_tests.items(): - for src, archlist in srcmap.items(): - for arch in archlist: - requests_by_src.setdefault(src, {}).setdefault(arch, set()).add(trigger) - - for src, archmap in requests_by_src.items(): - for arch, triggers in archmap.items(): - self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) - - # also update results for excuses whose tests failed, in case a - # manual retry worked - for (trigpkg, trigver) in packages: - trigger = trigpkg + '/' + trigver - for (src, arch) in self.failed_tests_for_trigger(trigger): - if arch not in self.pending_tests.get(trigger, {}).get(src, []): - self.log_verbose('Checking for new results for failed %s on %s for trigger %s' % - (src, arch, trigger)) - self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) - - # update the results cache + # update the results on-disk cache + self.log_verbose('Updating results cache') with open(self.results_cache_file + '.new', 'w') as f: json.dump(self.test_results, f, indent=2) os.rename(self.results_cache_file + '.new', self.results_cache_file) - self.log_verbose('Updated results cache') - # new results remove pending requests, update the on-disk cache - self.update_pending_tests() + # update the pending tests on-disk cache + self.log_verbose('Updated pending requested tests in %s' % self.pending_tests_file) + with open(self.pending_tests_file + '.new', 'w') as f: + json.dump(self.pending_tests, f, indent=2) + os.rename(self.pending_tests_file + '.new', self.pending_tests_file) def results(self, trigsrc, trigver): '''Return test results for triggering package diff --git a/britney.py b/britney.py index 727c5b0..e19b16a 100755 --- a/britney.py +++ b/britney.py @@ -1931,10 +1931,6 @@ class Britney(object): autopkgtest_excuses.append(e) autopkgtest_packages.append((e.name, e.ver[1])) autopkgtest.request(autopkgtest_packages, autopkgtest_excludes) - if not self.options.dry_run: - autopkgtest.collect_requested() - autopkgtest.submit() - autopkgtest.collect(autopkgtest_packages) cloud_url = "http://autopkgtest.ubuntu.com/packages/%(h)s/%(s)s/%(r)s/%(a)s" for e in autopkgtest_excuses: adtpass = True