diff --git a/autopkgtest.py b/autopkgtest.py index 951b946..28563dc 100644 --- a/autopkgtest.py +++ b/autopkgtest.py @@ -57,10 +57,8 @@ class AutoPackageTest(object): self.excludes = set() self.test_state_dir = os.path.join(britney.options.unstable, 'autopkgtest') - # map of requested tests from request() + # tests requested in this and previous runs # trigger -> src -> [arch] - self.requested_tests = {} - # same map for tests requested in previous runs self.pending_tests = None self.pending_tests_file = os.path.join(self.test_state_dir, 'pending.json') @@ -89,6 +87,32 @@ class AutoPackageTest(object): self.log_verbose('%s does not exist, re-downloading all results ' 'from swift' % self.results_cache_file) + self.setup_amqp() + + def setup_amqp(self): + '''Initialize AMQP connection''' + + self.amqp_channel = None + self.amqp_file = None + if self.britney.options.dry_run: + # in dry-run mode, don't issue any requests + return + + amqp_url = self.britney.options.adt_amqp + + if amqp_url.startswith('amqp://'): + # in production mode, connect to AMQP server + creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) + self.amqp_con = amqp.Connection(creds.hostname, userid=creds.username, + password=creds.password) + self.amqp_channel = self.amqp_con.channel() + self.log_verbose('Connected to AMQP server %s' % amqp_url) + elif amqp_url.startswith('file://'): + # in testing mode, adt_amqp will be a file:// URL + self.amqp_file = amqp_url[7:] + else: + raise RuntimeError('Unknown ADT_AMQP schema in %s' % amqp_url) + def log_verbose(self, msg): if self.britney.options.verbose: print('I: [%s] - %s' % (time.asctime(), msg)) @@ -251,55 +275,6 @@ class AutoPackageTest(object): self.log_verbose('Read pending requested tests from %s: %s' % (self.pending_tests_file, self.pending_tests)) - def update_pending_tests(self): - '''Update pending tests after submitting requested tests''' - - # merge requested_tests into pending_tests - for trigger, srcmap in self.requested_tests.items(): - for src, archlist in srcmap.items(): - try: - arches = set(self.pending_tests[trigger][src]) - except KeyError: - arches = set() - arches.update(archlist) - self.pending_tests.setdefault(trigger, {})[src] = sorted(arches) - self.requested_tests.clear() - - # write it - with open(self.pending_tests_file + '.new', 'w') as f: - json.dump(self.pending_tests, f, indent=2) - os.rename(self.pending_tests_file + '.new', self.pending_tests_file) - self.log_verbose('Updated pending requested tests in %s' % - self.pending_tests_file) - - def add_test_request(self, src, arch, trigger): - '''Add one test request to the local self.requested_tests queue - - trigger is "pkgname/version" of the package that triggers the testing - of src. - - This will only be done if that test wasn't already requested in a - previous run (i. e. not already in self.pending_tests) or there already - is a result for it. - ''' - # Don't re-request if we already have a result - try: - self.test_results[trigger][src][arch] - self.log_verbose('There already is a result for %s/%s triggered by %s' % - (src, arch, trigger)) - return - except KeyError: - pass - - # Don't re-request if it's already pending - if arch in self.pending_tests.get(trigger, {}).get(src, []): - self.log_verbose('test %s/%s for %s is already pending, not queueing' % - (src, arch, trigger)) - else: - arch_list = self.requested_tests.setdefault(trigger, {}).setdefault(src, []) - assert arch not in arch_list - arch_list.append(arch) - def latest_run_for_package(self, src, arch): '''Return latest run ID for src on arch''' @@ -326,6 +301,13 @@ class AutoPackageTest(object): def fetch_swift_results(self, swift_url, src, arch): '''Download new results for source package/arch from swift''' + # Download results for one particular src/arch at most once in every + # run, as this is expensive + done_entry = src + '/' + arch + if done_entry in self.fetch_swift_results._done: + return + self.fetch_swift_results._done.add(done_entry) + # prepare query: get all runs with a timestamp later than the latest # run_id for this package/arch; '@' is at the end of each run id, to # mark the end of a test run directory path @@ -361,6 +343,8 @@ class AutoPackageTest(object): self.fetch_one_result( os.path.join(swift_url, 'autopkgtest-' + self.series, p, 'result.tar'), src, arch) + fetch_swift_results._done = set() + def fetch_one_result(self, url, src, arch): '''Download one result URL for source/arch @@ -414,17 +398,16 @@ class AutoPackageTest(object): # remove matching test requests for trigger in result_triggers: - for request_map in [self.requested_tests, self.pending_tests]: - try: - arch_list = request_map[trigger][src] - arch_list.remove(arch) - if not arch_list: - del request_map[trigger][src] - if not request_map[trigger]: - del request_map[trigger] - self.log_verbose('-> matches pending request %s/%s for trigger %s' % (src, arch, trigger)) - except (KeyError, ValueError): - self.log_verbose('-> does not match any pending request for %s/%s' % (src, arch)) + try: + arch_list = self.pending_tests[trigger][src] + arch_list.remove(arch) + if not arch_list: + del self.pending_tests[trigger][src] + if not self.pending_tests[trigger]: + del self.pending_tests[trigger] + self.log_verbose('-> matches pending request %s/%s for trigger %s' % (src, arch, trigger)) + except (KeyError, ValueError): + self.log_verbose('-> does not match any pending request for %s/%s' % (src, arch)) # add this result for trigger in result_triggers: @@ -445,15 +428,65 @@ class AutoPackageTest(object): if stamp > result[2]: result[2] = stamp - def failed_tests_for_trigger(self, trigger): - '''Return (src, arch) set for failed tests for given trigger''' + def send_test_request(self, src, arch, trigger): + '''Send out AMQP request for testing src/arch for trigger''' + + if self.britney.options.dry_run: + return + + params = {'triggers': [trigger]} + if self.britney.options.adt_ppas: + params['ppas'] = self.britney.options.adt_ppas + params = json.dumps(params) + qname = 'debci-%s-%s' % (self.series, arch) - failed = set() - for src, srcinfo in self.test_results.get(trigger, {}).items(): - for arch, result in srcinfo.items(): - if not result[0]: - failed.add((src, arch)) - return failed + if self.amqp_channel: + self.amqp_channel.basic_publish(amqp.Message(src + '\n' + params), routing_key=qname) + else: + assert self.amqp_file + with open(self.amqp_file, 'a') as f: + f.write('%s:%s %s\n' % (qname, src, params)) + + def pkg_test_request(self, src, arch, trigger): + '''Request one package test for one particular trigger + + trigger is "pkgname/version" of the package that triggers the testing + of src. + + This will only be done if that test wasn't already requested in a + previous run (i. e. not already in self.pending_tests) or there already + is a result for it. This ensures to download current results for this + package before requesting any test. + ''' + # Don't re-request if we already have a result + try: + passed = self.test_results[trigger][src][arch][0] + if passed: + self.log_verbose('%s/%s triggered by %s already passed' % (src, arch, trigger)) + return + self.log_verbose('Checking for new results for failed %s/%s for trigger %s' % + (src, arch, trigger)) + raise KeyError # fall through + except KeyError: + self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) + # do we have one now? + try: + self.test_results[trigger][src][arch] + return + except KeyError: + pass + + # Don't re-request if it's already pending + arch_list = self.pending_tests.setdefault(trigger, {}).setdefault(src, []) + if arch in arch_list: + self.log_verbose('Test %s/%s for %s is already pending, not queueing' % + (src, arch, trigger)) + else: + self.log_verbose('Requesting %s autopkgtest on %s to verify %s' % + (src, arch, trigger)) + arch_list.append(arch) + arch_list.sort() + self.send_test_request(src, arch, trigger) def check_ever_passed(self, src, arch): '''Check if tests for src ever passed on arch''' @@ -472,114 +505,36 @@ class AutoPackageTest(object): # def request(self, packages, excludes=None): + '''Request test runs for verifying packages + + "packages" is a list of (trigsrc, trigver) pairs with the packages in + unstable (the "triggers") that need to be tested against their reverse + dependencies (and also their own tests). + + "excludes" is an iterable of packages that britney determined to be + uninstallable. + ''' if excludes: self.excludes.update(excludes) - self.log_verbose('Requested autopkgtests for %s, exclusions: %s' % + self.log_verbose('Requesting autopkgtests for %s, exclusions: %s' % (['%s/%s' % i for i in packages], str(self.excludes))) for src, ver in packages: for arch in self.britney.options.adt_arches: - for (testsrc, testver) in self.tests_for_source(src, ver, arch): - self.add_test_request(testsrc, arch, src + '/' + ver) - - if self.britney.options.verbose: - for trigger, srcmap in self.requested_tests.items(): - for src, archlist in srcmap.items(): - self.log_verbose('Requesting %s autopkgtest on %s to verify %s' % - (src, ' '.join(archlist), trigger)) - - def submit(self): - # build per-queue request strings for new test requests - # TODO: Once we support version constraints in AMQP requests, add them - # queue_name -> [(pkg, params), ...]) - queues = {} - for trigger, srcmap in self.requested_tests.items(): - params = {'triggers': [trigger]} - if self.britney.options.adt_ppas: - params['ppas'] = self.britney.options.adt_ppas - for src, archlist in srcmap.items(): - for arch in archlist: - qname = 'debci-%s-%s' % (self.series, arch) - queues.setdefault(qname, []).append((src, json.dumps(params))) - - amqp_url = self.britney.options.adt_amqp + for (testsrc, _) in self.tests_for_source(src, ver, arch): + self.pkg_test_request(testsrc, arch, src + '/' + ver) - if amqp_url.startswith('amqp://'): - # in production mode, send them out via AMQP - creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) - with amqp.Connection(creds.hostname, userid=creds.username, - password=creds.password) as amqp_con: - with amqp_con.channel() as ch: - for queue, requests in queues.items(): - for (pkg, params) in requests: - ch.basic_publish(amqp.Message(pkg + '\n' + params), - routing_key=queue) - elif amqp_url.startswith('file://'): - # in testing mode, adt_amqp will be a file:// URL - with open(amqp_url[7:], 'a') as f: - for queue, requests in queues.items(): - for (pkg, params) in requests: - f.write('%s:%s %s\n' % (queue, pkg, params)) - else: - self.log_error('Unknown ADT_AMQP schema in %s' % - self.britney.options.adt_amqp) - - # mark them as pending now - self.update_pending_tests() - - def collect_requested(self): - '''Update results from swift for all requested packages - - This is normally redundant with collect(), but avoids actually - sending test requests if results are already available. This mostly - happens when you have to blow away results.cache and let it rebuild - from scratch. - ''' - # build src -> arch -> triggers inverted map - requests_by_src = {} - for trigger, srcmap in self.requested_tests.items(): - for src, archlist in srcmap.items(): - for arch in archlist: - requests_by_src.setdefault(src, {}).setdefault(arch, set()).add(trigger) - - for src, archmap in requests_by_src.items(): - for arch, triggers in archmap.items(): - self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) - - def collect(self, packages): - '''Update results from swift for all pending packages - - Remove pending tests for which we have results. - ''' - # build src -> arch -> triggers inverted map - requests_by_src = {} - for trigger, srcmap in self.pending_tests.items(): - for src, archlist in srcmap.items(): - for arch in archlist: - requests_by_src.setdefault(src, {}).setdefault(arch, set()).add(trigger) - - for src, archmap in requests_by_src.items(): - for arch, triggers in archmap.items(): - self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) - - # also update results for excuses whose tests failed, in case a - # manual retry worked - for (trigpkg, trigver) in packages: - trigger = trigpkg + '/' + trigver - for (src, arch) in self.failed_tests_for_trigger(trigger): - if arch not in self.pending_tests.get(trigger, {}).get(src, []): - self.log_verbose('Checking for new results for failed %s on %s for trigger %s' % - (src, arch, trigger)) - self.fetch_swift_results(self.britney.options.adt_swift_url, src, arch) - - # update the results cache + # update the results on-disk cache + self.log_verbose('Updating results cache') with open(self.results_cache_file + '.new', 'w') as f: json.dump(self.test_results, f, indent=2) os.rename(self.results_cache_file + '.new', self.results_cache_file) - self.log_verbose('Updated results cache') - # new results remove pending requests, update the on-disk cache - self.update_pending_tests() + # update the pending tests on-disk cache + self.log_verbose('Updated pending requested tests in %s' % self.pending_tests_file) + with open(self.pending_tests_file + '.new', 'w') as f: + json.dump(self.pending_tests, f, indent=2) + os.rename(self.pending_tests_file + '.new', self.pending_tests_file) def results(self, trigsrc, trigver): '''Return test results for triggering package diff --git a/britney.py b/britney.py index 727c5b0..e19b16a 100755 --- a/britney.py +++ b/britney.py @@ -1931,10 +1931,6 @@ class Britney(object): autopkgtest_excuses.append(e) autopkgtest_packages.append((e.name, e.ver[1])) autopkgtest.request(autopkgtest_packages, autopkgtest_excludes) - if not self.options.dry_run: - autopkgtest.collect_requested() - autopkgtest.submit() - autopkgtest.collect(autopkgtest_packages) cloud_url = "http://autopkgtest.ubuntu.com/packages/%(h)s/%(s)s/%(r)s/%(a)s" for e in autopkgtest_excuses: adtpass = True