diff --git a/autopkgtest.py b/autopkgtest.py index 5e72f37..7561613 100644 --- a/autopkgtest.py +++ b/autopkgtest.py @@ -454,13 +454,6 @@ class AutoPackageTest(object): (src, ver, arch, ', '.join(['%s/%s' % i for i in triggers]))) def submit(self): - # send AMQP requests for new test requests - # TODO: Once we support version constraints in AMQP requests, add them - arch_queues = {} - for arch in self.britney.options.adt_arches.split(): - arch_queues[arch] = 'debci-%s-%s' % (self.series, arch) - - amqp_url = self.britney.options.adt_amqp def _arches(verinfo): res = set() @@ -476,23 +469,35 @@ class AutoPackageTest(object): res.add(t) return res + # build per-queue request strings for new test requests + # TODO: Once we support version constraints in AMQP requests, add them + # arch → (queue_name, [(pkg, params), ...]) + arch_queues = {} + for arch in self.britney.options.adt_arches.split(): + requests = [] + for pkg, verinfo in self.requested_tests.items(): + if arch in _arches(verinfo): + params = {'triggers': sorted(_trigsources(verinfo))} + requests.append((pkg, json.dumps(params))) + arch_queues[arch] = ('debci-%s-%s' % (self.series, arch), requests) + + amqp_url = self.britney.options.adt_amqp + if amqp_url.startswith('amqp://'): + # in production mode, send them out via AMQP with kombu.Connection(amqp_url) as conn: - for arch in arch_queues: + for arch, (queue, requests) in arch_queues.items(): # don't use SimpleQueue here as it always declares queues; # ACLs might not allow that - with kombu.Producer(conn, routing_key=arch_queues[arch], auto_declare=False) as p: - for pkg, verinfo in self.requested_tests.items(): - if arch in _arches(verinfo): - params = {'triggers': sorted(_trigsources(verinfo))} - p.publish(pkg + '\n' + json.dumps(params)) + with kombu.Producer(conn, routing_key=queue, auto_declare=False) as p: + for (pkg, params) in requests: + p.publish(pkg + '\n' + params) elif amqp_url.startswith('file://'): # in testing mode, adt_amqp will be a file:// URL with open(amqp_url[7:], 'a') as f: - for pkg, verinfo in self.requested_tests.items(): - for arch in _arches(verinfo): - params = {'triggers': sorted(_trigsources(verinfo))} - f.write('%s:%s %s\n' % (arch_queues[arch], pkg, json.dumps(params))) + for arch, (queue, requests) in arch_queues.items(): + for (pkg, params) in requests: + f.write('%s:%s %s\n' % (queue, pkg, params)) else: self.log_error('Unknown ADT_AMQP schema in %s' % self.britney.options.adt_amqp)