New source package downloading framework in ubuntutools.archive. Use in many scripts.

pull-lp-source: str() exceptions before passing to Logger (LP: #695523)
This commit is contained in:
Benjamin Drung 2011-01-20 00:37:16 +01:00
commit 76876a11f0
29 changed files with 1267 additions and 618 deletions

View File

@ -24,17 +24,15 @@ import shutil
import subprocess
import sys
import tempfile
import urllib
from debian.deb822 import Dsc
from launchpadlib.launchpad import Launchpad
import lsb_release
from ubuntutools.archive import UbuntuSourcePackage, DownloadError
from ubuntutools.config import UDTConfig, ubu_email
from ubuntutools.builder import get_builder
from ubuntutools.logger import Logger
from ubuntutools.question import YesNoQuestion
from ubuntutools.misc import dsc_url
def error(msg):
Logger.error(msg)
@ -44,7 +42,7 @@ def check_call(cmd, *args, **kwargs):
Logger.command(cmd)
ret = subprocess.call(cmd, *args, **kwargs)
if ret != 0:
error('%s returned %d.' % (cmd, ret))
error('%s returned %d.' % (cmd[0], ret))
def parse(args):
usage = 'Usage: %prog [options] <source package name or .dsc URL/file>'
@ -167,63 +165,26 @@ def find_release_package(launchpad, package, version, source_release):
return srcpkg
def find_version_package(launchpad, package, version):
ubuntu = launchpad.distributions['ubuntu']
archive = ubuntu.main_archive
try:
# Might get more than one (i.e. same version in multiple
# releases), but they should all be identical
return archive.getPublishedSources(source_name=package,
version=version)[0]
except IndexError:
error('Version %s of package %s was never published in Ubuntu.' %
(version, package))
def find_package(launchpad, mirror, workdir, package, version, source_release):
"Returns the SourcePackage"
if package.endswith('.dsc'):
return UbuntuSourcePackage(version=version, dscfile=package,
workdir=workdir, lp=launchpad,
mirrors=[mirror])
def dscurls_from_package(launchpad, mirror, package, version, source_release):
if not source_release and not version:
source_release = launchpad.distributions['ubuntu'].current_series.name
component = None
# If source_release is specified, then version is just for verification
if source_release:
srcpkg = find_release_package(launchpad, package, version,
source_release)
else:
srcpkg = find_version_package(launchpad, package, version)
version = srcpkg.source_package_version
component = srcpkg.component_name
urls = []
if mirror:
urls.append(dsc_url(mirror, srcpkg.component_name, package,
srcpkg.source_package_version))
for source_file in srcpkg.sourceFileUrls():
if source_file.endswith('.dsc'):
urls.append(urllib.unquote(source_file))
return urls
else:
error('Package %s contains no .dsc file.' % package)
def dscurl_from_dsc(package):
path = os.path.abspath(os.path.expanduser(package))
if os.path.exists(path):
return 'file://%s' % path
else:
# Can't resolve it as a local path? Let's just hope it's good as-is
return package
def fetch_package(launchpad, mirror, workdir, package, version, source_release):
# Returns the path to the .dsc file that was fetched
if package.endswith('.dsc'):
dscs = [dscurl_from_dsc(package)]
else:
dscs = dscurls_from_package(launchpad, mirror, package, version,
source_release)
for dsc in dscs:
cmd = ('dget', '--download-only', '--allow-unauthenticated', dsc)
Logger.command(cmd)
ret = subprocess.call(cmd, cwd=workdir)
if ret == 0:
return os.path.join(workdir, os.path.basename(dsc))
return UbuntuSourcePackage(package, version, component, workdir=workdir,
lp=launchpad, mirrors=[mirror])
def get_backport_version(version, suffix, upload, release):
backport_version = version + ('~%s1' % release)
@ -239,7 +200,7 @@ def get_backport_dist(upload, release):
else:
return release
def do_build(workdir, package, release, bp_version, builder, update):
def do_build(workdir, dsc, release, builder, update):
builder = get_builder(builder)
if not builder:
return
@ -248,12 +209,11 @@ def do_build(workdir, package, release, bp_version, builder, update):
if 0 != builder.update(release):
sys.exit(1)
return builder.build(os.path.join(workdir,
'%s_%s.dsc' % (package, bp_version)),
return builder.build(os.path.join(workdir, dsc),
release,
os.path.join(workdir, "buildresult"))
def do_upload(workdir, package, bp_version, upload, prompt):
def do_upload(workdir, package, bp_version, changes, upload, prompt):
print 'Please check %s %s in file://%s carefully!' % \
(package, bp_version, workdir)
if prompt or upload == 'ubuntu':
@ -262,20 +222,21 @@ def do_upload(workdir, package, bp_version, upload, prompt):
if answer == "no":
return
changes_file = '%s_%s_source.changes' % (package, bp_version)
check_call(['dput', upload, changes_file], cwd=workdir)
check_call(['dput', upload, changes], cwd=workdir)
def do_backport(workdir, pkg, suffix, release, build, builder, update, upload,
prompt):
dirname = '%s-%s' % (pkg.source, release)
pkg.unpack(dirname)
srcdir = os.path.join(workdir, dirname)
def do_backport(workdir, package, dscfile, version, suffix, release, build,
builder, update, upload, prompt):
check_call(['dpkg-source', '-x', dscfile, package], cwd=workdir)
srcdir = os.path.join(workdir, package)
bp_version = get_backport_version(version, suffix, upload, release)
bp_version = get_backport_version(pkg.version.full_version, suffix,
upload, release)
bp_dist = get_backport_dist(upload, release)
check_call(['dch',
'--force-bad-version',
'--allow-lower-version',
'--force-distribution',
'--preserve',
'--newversion', bp_version,
'--distribution', bp_dist,
@ -283,15 +244,14 @@ def do_backport(workdir, package, dscfile, version, suffix, release, build,
cwd=srcdir)
check_call(['debuild', '--no-lintian', '-S', '-sa'], cwd=srcdir)
if ':' in bp_version:
bp_version = bp_version[bp_version.find(':')+1:]
fn_base = pkg.source + '_' + bp_version.split(':', 1)[-1]
if build:
if 0 != do_build(workdir, package, release, bp_version, builder,
update):
if 0 != do_build(workdir, fn_base + '.dsc', release, builder, update):
sys.exit(1)
if upload:
do_upload(workdir, package, bp_version, upload, prompt)
do_upload(workdir, pkg.source, bp_version, fn_base + '.changes',
upload, prompt)
shutil.rmtree(srcdir)
@ -320,22 +280,17 @@ def main(args):
os.makedirs(workdir)
try:
dscfile = fetch_package(launchpad,
opts.ubuntu_mirror,
workdir,
package_or_dsc,
opts.version,
opts.source_release)
dsc = Dsc(open(os.path.join(workdir, dscfile)))
package = dsc['Source']
version = dsc['Version']
pkg = find_package(launchpad,
opts.ubuntu_mirror,
workdir,
package_or_dsc,
opts.version,
opts.source_release)
pkg.pull()
for release in opts.dest_releases:
do_backport(workdir,
package,
dscfile,
version,
pkg,
opts.suffix,
release,
opts.build,
@ -343,6 +298,8 @@ def main(args):
opts.update,
opts.upload,
opts.prompt)
except DownloadError, e:
error(str(e))
finally:
if not opts.workdir:
shutil.rmtree(workdir)

8
debian/changelog vendored
View File

@ -1,9 +1,15 @@
ubuntu-dev-tools (0.112) UNRELEASED; urgency=low
[ Robert Collins ]
* manage-credentials: Finish migrating away from the Launchpad 'edge' service
root. (LP: #704657)
-- Robert Collins <robertc@robertcollins.net> Wed, 19 Jan 2011 12:03:40 +1300
[ Stefano Rivera ]
* New source package downloading framework in ubuntutools.archive. Use in
many scripts.
* pull-lp-source: str() exceptions before passing to Logger (LP: #695523)
-- Benjamin Drung <bdrung@debian.org> Thu, 20 Jan 2011 00:36:35 +0100
ubuntu-dev-tools (0.111) natty; urgency=low

1
debian/clean vendored
View File

@ -1 +1,2 @@
*.egg-info/*
test-data/example_*

4
debian/control vendored
View File

@ -56,7 +56,9 @@ Recommends: bzr,
python-magic,
python-soappy,
reportbug (>= 3.39ubuntu1)
Suggests: python-simplejson | python (>= 2.7), qemu-kvm-extras-static
Suggests: debian-keyring,
python-simplejson | python (>= 2.7),
qemu-kvm-extras-static
Breaks: ${python:Breaks}
Description: useful tools for Ubuntu developers
This is a collection of useful tools that Ubuntu developers use to make their

11
debian/copyright vendored
View File

@ -27,6 +27,7 @@ Copyright: 2007, Albert Damen <albrt@gmx.net>
2010, Evan Broder <evan@ebroder.net>
2006-2007, Luke Yelavich <themuso@ubuntu.com>
2009-2010, Michael Bienia <geser@ubuntu.com>
2010-2011, Stefano Rivera <stefanor@ubuntu.com>
2008, Stephan Hermann <sh@sourcecode.de>
2007, Steve Kowalik <stevenk@ubuntu.com>
License: GPL-2
@ -183,6 +184,8 @@ Files: doc/pull-debian-debdiff.1,
pull-debian-debdiff,
sponsor-patch,
suspicious-source,
test-data/*,
ubuntutools/archive.py,
ubuntutools/builder.py,
ubuntutools/config.py,
ubuntutools/control.py,
@ -193,10 +196,10 @@ Files: doc/pull-debian-debdiff.1,
ubuntutools/update_maintainer.py,
update-maintainer,
wrap-and-sort
Copyright: 2010, Benjamin Drung <bdrung@ubuntu.com>
2010, Evan Broder <evan@ebroder.net>
2008, Siegfried-Angel Gevatter Pujals <rainct@ubuntu.com>
2010, Stefano Rivera <stefanor@ubuntu.com>
Copyright: 2010, Benjamin Drung <bdrung@ubuntu.com>
2010, Evan Broder <evan@ebroder.net>
2008, Siegfried-Angel Gevatter Pujals <rainct@ubuntu.com>
2010-2011, Stefano Rivera <stefanor@ubuntu.com>
License: ISC
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@ -94,6 +94,13 @@ the default of "production".
Do not read any configuration files, or configuration from environment
variables.
.SH ENVIRONMENT
.TP
.BR DEBFULLNAME ", " DEBEMAIL ", " UBUMAIL
Used to determine the uploader (if not supplied as options).
See
.BR ubuntu\-dev\-tools (5)
for details.
.P
All of the \fBCONFIGURATION VARIABLES\fR below are also supported as
environment variables.
Variables in the environment take precedence to those in configuration

View File

@ -121,6 +121,7 @@ The default value for \fB--lpinstance\fR.
.SH SEE ALSO
.BR rmadison (1),
.BR syncpackage (1),
.BR ubuntu\-dev\-tools (5)
.SH AUTHOR

View File

@ -12,7 +12,8 @@ primary archive or PPA starting from a pristine Debian package.
Debian ones, as the common script used by Ubuntu archive administrators does,
this way you can preserve source files integrity between the two distributions.
.PP
\fBsyncpackage\fR will detect source tarballs with mismatching checksums and will automatically create fake syncs instead.
\fBsyncpackage\fR will detect source tarballs with mismatching checksums
and will automatically create fake syncs instead.
.SH WARNING
The use of \fBsyncpackage\fR is discouraged by the Ubuntu Archive
Administrators, as it introduces an unnecessary window for error.
@ -40,12 +41,15 @@ Specify the version to sync from.
Specify the component to sync from.
.TP
\fB\-v\fR, \fB\-\-verbose\fR
print more information
Display more progress information.
.TP
\fB\-n\fI UPLOADER\fR, \fB\-\-uploader\fR=\fIUPLOADER\fR
Use UPLOADER as the name and email address of the
maintainer for this upload instead of evaluating
DEBFULLNAME and DEBEMAIL.
\fB\-n\fI UPLOADER_NAME\fR, \fB\-\-uploader\-name\fR=\fIUPLOADER_NAME\fR
Use UPLOADER_NAME as the name of the maintainer for this upload instead
of evaluating DEBFULLNAME and UBUMAIL.
.TP
\fB\-e\fI UPLOADER_EMAIL\fR, \fB\-\-uploader\-email\fR=\fIUPLOADER_EMAIL\fR
Use UPLOADER_EMAIL as the email address of the maintainer for this
upload instead of evaluating DEBEMAIL and UBUMAIL.
.TP
\fB\-k\fI KEYID\fR, \fB\-\-key\fR=\fIKEYID\fR
Specify the key ID to be used for signing.
@ -55,7 +59,49 @@ Do not sign the upload.
.TP
\fB\-b\fI BUG\fR, \fB\-\-bug\fR=\fIBUG\fR
Mark a Launchpad bug as being fixed by this upload.
.PP
.TP
.B \-d \fIDEBIAN_MIRROR\fR, \fB\-\-debian\-mirror\fR=\fIDEBIAN_MIRROR\fR
Use the specified mirror.
Should be in the form \fBhttp://ftp.debian.org/debian\fR.
If the package isn't found on this mirror, \fBsyncpackage\fR will fall
back to the default mirror.
.TP
.B \-s \fIUBUNTU_MIRROR\fR, \fB\-\-debsec\-mirror\fR=\fIUBUNTU_MIRROR\fR
Use the specified Debian security mirror.
Should be in the form \fBhttp://archive.ubuntu.com/ubuntu\fR.
If the package isn't found on this mirror, \fBsyncpackage\fR will fall
back to the default mirror.
.TP
.B \-\-no\-conf
Do not read any configuration files, or configuration from environment
variables.
.SH ENVIRONMENT
.TP
.BR DEBFULLNAME ", " DEBEMAIL ", " UBUMAIL
Used to determine the uploader (if not supplied as options).
See
.BR ubuntu\-dev\-tools (5)
for details.
.P
All of the \fBCONFIGURATION VARIABLES\fR below are also supported as
environment variables.
Variables in the environment take precedence to those in configuration
files.
.SH CONFIGURATION VARIABLES
The following variables can be set in the environment or in
.BR ubuntu\-dev\-tools (5)
configuration files.
In each case, the script\-specific variable takes precedence over the
package\-wide variable.
.TP
.BR SYNCPACKAGE_DEBIAN_MIRROR ", " UBUNTUTOOLS_DEBIAN_MIRROR
The default value for \fB\-\-debian\-mirror\fR.
.TP
.BR SYNCPACKAGE_UBUNTU_MIRROR ", " UBUNTUTOOLS_DEBSEC_MIRROR
The default value for \fB\-\-ubuntu\-mirror\fR.
.SH SEE ALSO
.BR requestsync (1),
.BR ubuntu\-dev\-tools (5)
.SH AUTHOR
\fBsyncpackage\fR was written by Martin Pitt <martin.pitt@canonical.com> and Benjamin Drung <bdrung@ubuntu.com>.
.PP

View File

@ -43,6 +43,8 @@ In addition, several scripts use the following environment variables:
.B UBUMAIL
Overrides \fBDEBEMAIL\fR and \fBDEBFULLNAME\fR when the target is
clearly Ubuntu.
Can either contain an e-mail address or \fBFull Name
<email@example.org>\fR.
.TP
.BR DEBEMAIL ", " DEBFULLNAME

View File

@ -2,7 +2,7 @@
# pull-debian-debdiff - find and download a specific version of a Debian
# package and its immediate parent to generate a debdiff.
#
# Copyright (C) 2010, Stefano Rivera <stefanor@ubuntu.com>
# Copyright (C) 2010-2011, Stefano Rivera <stefanor@ubuntu.com>
# Inspired by a tool of the same name by Kees Cook.
#
# Permission to use, copy, modify, and/or distribute this software for any
@ -17,119 +17,19 @@
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
import hashlib
import optparse
import os.path
import subprocess
import sys
import urllib2
import debian.debian_support
import debian.changelog
from ubuntutools.archive import DebianSourcePackage, DownloadError
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
from ubuntutools.misc import dsc_name, dsc_url
DEFAULT_DEBIAN_MIRROR = 'http://ftp.debian.org/debian'
DEFAULT_DEBSEC_MIRROR = 'http://security.debian.org'
def pull(package, version, opts, unpack=False):
"Download Debian source package version version"
urls = []
# TODO: Not all packages are main :)
# Practically this is fine, as it'll be found on snapshot, but still ugly.
if opts.debsec_mirror and opts.debsec_mirror != DEFAULT_DEBSEC_MIRROR:
urls.append(dsc_url(opts.debsec_mirror, 'main', package, version))
urls.append(dsc_url(DEFAULT_DEBSEC_MIRROR, 'main', package, version))
if opts.debian_mirror and opts.debian_mirror != DEFAULT_DEBIAN_MIRROR:
urls.append(dsc_url(opts.debian_mirror, 'main', package, version))
urls.append(dsc_url(DEFAULT_DEBIAN_MIRROR, 'main', package, version))
for url in urls:
cmd = ('dget', '-u' + ('x' if unpack else 'd'), url)
Logger.command(cmd)
return_code = subprocess.call(cmd)
if return_code == 0:
return True
Logger.normal('Trying snapshot.debian.org')
return pull_from_snapshot(package, version, unpack)
def pull_from_snapshot(package, version, unpack=False):
"Download Debian source package version version from snapshot.debian.org"
try:
import json
except ImportError:
import simplejson as json
except ImportError:
Logger.error("Please install python-simplejson.")
sys.exit(1)
try:
srcfiles = json.load(urllib2.urlopen(
'http://snapshot.debian.org/mr/package/%s/%s/srcfiles'
% (package, version)))
except urllib2.HTTPError:
Logger.error('Version %s of %s not found on snapshot.debian.org',
version, package)
return False
for hash_ in srcfiles['result']:
hash_ = hash_['hash']
try:
info = json.load(urllib2.urlopen(
'http://snapshot.debian.org/mr/file/%s/info' % hash_))
except urllib2.URLError:
Logger.error('Unable to dowload info for hash.')
return False
filename = info['result'][0]['name']
if '/' in filename:
Logger.error('Unacceptable file name: %s', filename)
return False
if os.path.exists(filename):
source_file = open(filename, 'r')
sha1 = hashlib.sha1()
sha1.update(source_file.read())
source_file.close()
if sha1.hexdigest() == hash_:
Logger.normal('Using existing %s', filename)
continue
Logger.normal('Downloading: %s (%0.3f MiB)', filename,
info['result'][0]['size'] / 1024.0 / 1024)
try:
in_ = urllib2.urlopen('http://snapshot.debian.org/file/%s' % hash_)
out = open(filename, 'w')
while True:
block = in_.read(10240)
if block == '':
break
out.write(block)
sys.stdout.write('.')
sys.stdout.flush()
sys.stdout.write('\n')
sys.stdout.flush()
out.close()
except urllib2.URLError:
Logger.error('Error downloading %s', filename)
return False
if unpack:
cmd = ('dpkg-source', '--no-check', '-x', dsc_name(package, version))
Logger.command(cmd)
subprocess.check_call(cmd)
return True
def previous_version(package, version, distance):
"Given an (extracted) package, determine the version distance versions ago"
upver = version
if ':' in upver:
upver = upver.split(':', 1)[1]
upver = upver.split('-')[0]
upver = debian.debian_support.Version(version).upstream_version
filename = '%s-%s/debian/changelog' % (package, upver)
changelog_file = open(filename, 'r')
changelog = debian.changelog.Changelog(changelog_file.read())
@ -175,11 +75,17 @@ def main():
opts.debian_mirror = config.get_value('DEBIAN_MIRROR')
if opts.debsec_mirror is None:
opts.debsec_mirror = config.get_value('DEBSEC_MIRROR')
mirrors = [opts.debsec_mirror, opts.debian_mirror]
Logger.normal('Downloading %s %s', package, version)
if not pull(package, version, opts, unpack=not opts.fetch_only):
Logger.error("Couldn't locate version %s of %s.", version, package)
newpkg = DebianSourcePackage(package, version, mirrors=mirrors)
try:
newpkg.pull()
except DownloadError, e:
Logger.error(str(e))
sys.exit(1)
newpkg.unpack()
if opts.fetch_only:
sys.exit(0)
@ -189,22 +95,15 @@ def main():
Logger.error('No previous version could be found')
sys.exit(1)
Logger.normal('Downloading %s %s', package, oldversion)
if not pull(package, oldversion, opts, unpack=True):
Logger.error("Couldn't locate version %s of %s.", oldversion, package)
sys.exit(1)
cmd = ('debdiff', dsc_name(package, oldversion), dsc_name(package, version))
Logger.command(cmd)
difffn = dsc_name(package, version)[:-3] + 'debdiff'
debdiff_file = open(difffn, 'w')
if subprocess.call(cmd, stdout=debdiff_file) > 2:
Logger.error('Debdiff failed.')
oldpkg = DebianSourcePackage(package, oldversion, mirrors=mirrors)
try:
oldpkg.pull()
except DownloadError, e:
Logger.error(str(e))
sys.exit(1)
debdiff_file.close()
cmd = ('diffstat', '-p0', difffn)
Logger.command(cmd)
subprocess.check_call(cmd)
print difffn
oldpkg.unpack()
print 'file://' + oldpkg.debdiff(newpkg, diffstat=True)
if __name__ == '__main__':
main()

View File

@ -25,16 +25,16 @@
import os
import sys
import subprocess
import urllib
from optparse import OptionParser
from ubuntutools.archive import UbuntuSourcePackage
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
from ubuntutools.lp.lpapicache import Distribution, Launchpad
from ubuntutools.lp.udtexceptions import (SeriesNotFoundException,
PackageNotFoundException, PocketDoesNotExistError)
from ubuntutools.misc import split_release_pocket, dsc_url
PackageNotFoundException,
PocketDoesNotExistError)
from ubuntutools.misc import split_release_pocket
def main():
usage = "Usage: %prog <package> [release]"
@ -71,39 +71,24 @@ def main():
try:
(release, pocket) = split_release_pocket(release)
except PocketDoesNotExistError, error:
Logger.error(error)
except PocketDoesNotExistError, e:
Logger.error(str(e))
sys.exit(1)
try:
spph = Distribution('ubuntu').getArchive().getSourcePackage(package,
release,
pocket)
except (SeriesNotFoundException, PackageNotFoundException), error:
Logger.error(error)
except (SeriesNotFoundException, PackageNotFoundException), e:
Logger.error(str(e))
sys.exit(1)
urls = []
if options.ubuntu_mirror:
urls.append(dsc_url(options.ubuntu_mirror, spph.getComponent(),
package, spph.getVersion()))
dsc_url = [url for url in spph.sourceFileUrls() if url.endswith('.dsc')]
assert dsc_url, 'No .dsc file found'
urls.append(urllib.unquote(dsc_url[0]))
Logger.normal('Fetching the source for %s from %s (%s)...',
package, release.capitalize(), pocket)
for url in urls:
cmd = ('dget', '-u' + ('d' if options.download_only else 'x'), url)
Logger.command(cmd)
return_code = subprocess.call(cmd)
if return_code == 0:
Logger.normal("Success!")
sys.exit(0)
Logger.error('Failed to fetch and extrace the source. '
'Please check the output for the error.')
sys.exit(1)
srcpkg = UbuntuSourcePackage(package, spph.getVersion(),
component=spph.getComponent(),
mirrors=[options.ubuntu_mirror])
srcpkg.pull()
if not options.download_only:
srcpkg.unpack()
if __name__ == '__main__':
main()

View File

@ -1,8 +1,9 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2010 Martin Pitt <martin.pitt@canonical.com>
# 2010 Benjamin Drung <bdrung@ubuntu.com>
# Copyright (C) 2008-2010 Martin Pitt <martin.pitt@canonical.com>,
# 2010 Benjamin Drung <bdrung@ubuntu.com>,
# 2010 Stefano Rivera <stefanor@ubuntu.com>
#
# ##################################################################
#
@ -21,67 +22,21 @@
import debian.deb822
import debian.debian_support
import hashlib
import optparse
import os
import re
import shutil
import subprocess
import sys
import urllib
# ubuntu-dev-tools modules
from ubuntutools.archive import DebianSourcePackage, UbuntuSourcePackage
from ubuntutools.config import UDTConfig, ubu_email
from ubuntutools.requestsync.mail import getDebianSrcPkg \
as requestsync_mail_getDebianSrcPkg
from ubuntutools.requestsync.lp import getDebianSrcPkg, getUbuntuSrcPkg
from ubuntutools.logger import Logger
from ubuntutools.lp import udtexceptions
from ubuntutools.lp.lpapicache import Launchpad
class File(object):
def __init__(self, url, checksum, size):
self.url = url
self.name = os.path.basename(url)
self.checksum = checksum
self.size = size
def __repr__(self):
return self.name + " (" + self.checksum + " " + self.size + \
") source " + str(bool(self.is_source_file()))
def __eq__(self, other):
return self.name == other.name and self.checksum == other.checksum and \
self.size == other.size
def get_name(self):
return self.name
def is_source_file(self):
return re.match(".*\.orig.*\.tar\..*", self.name)
def download(self, script_name=None, verbose=False):
'''Download file (by URL) to the current directory.
If the file is already present, this function does nothing.'''
file_exists = os.path.exists(self.name)
if file_exists:
# Check for correct checksum
md5 = hashlib.md5()
md5.update(open(self.name).read())
file_exists = md5.hexdigest() == self.checksum
if not file_exists:
if verbose:
print '%s: I: Downloading %s...' % (script_name, self.url)
try:
urllib.urlretrieve(self.url, self.name)
except IOError as err:
parameters = (script_name, self.name, err.errno, err.strerror)
print >> sys.stderr, ("%s: Error: Failed to download %s "
"[Errno %i]: %s.") % parameters
sys.exit(1)
class Version(debian.debian_support.Version):
def strip_epoch(self):
@ -107,19 +62,9 @@ class Version(debian.debian_support.Version):
return Version(related_debian_version)
def is_modified_in_ubuntu(self):
return self.full_version.find('ubuntu') > 0
return 'ubuntu' in self.full_version
def quote_parameter(parameter):
if parameter.find(" ") >= 0:
return '"' + parameter + '"'
else:
return parameter
def print_command(script_name, cmd):
cmd = [quote_parameter(x) for x in cmd]
print "%s: I: %s" % (script_name, " ".join(cmd))
def remove_signature(dscname):
'''Removes the signature from a .dsc file if the .dsc file is signed.'''
@ -142,25 +87,6 @@ def remove_signature(dscname):
dsc_file.writelines(unsigned_file)
dsc_file.close()
def dsc_getfiles(dscurl, script_name):
'''Return list of files in a .dsc file (excluding the .dsc file itself).'''
basepath = os.path.dirname(dscurl)
dsc = debian.deb822.Dsc(urllib.urlopen(dscurl))
if 'Files' not in dsc:
parameters = (script_name, os.path.basename(dscurl))
print >> sys.stderr, ("%s: Error: No Files field found in the dsc "
"file. Please check %s!") % parameters
sys.exit(1)
files = []
for source_file in dsc['Files']:
url = os.path.join(basepath, source_file['name'])
if not source_file['name'].endswith('.dsc'):
files.append(File(url, source_file['md5sum'], source_file['size']))
return files
def add_fixed_bugs(changes, bugs):
'''Add additional Launchpad bugs to the list of fixed bugs in changes
file.'''
@ -181,120 +107,51 @@ def add_fixed_bugs(changes, bugs):
return "\n".join(changes + [""])
def sync_dsc(script_name, dscurl, debian_dist, release, name, email, bugs,
keyid=None, verbose=False):
assert dscurl.endswith(".dsc")
dscname = os.path.basename(dscurl)
basepath = os.path.dirname(dscurl)
(srcpkg, new_ver) = dscname.split('_')
def sync_dsc(src_pkg, debian_dist, release, name, email, bugs, ubuntu_mirror,
keyid=None):
uploader = name + " <" + email + ">"
if os.path.exists(os.path.join(basepath, dscname)):
dscfile = dscurl
else:
try:
urllib.urlretrieve(dscurl, dscname)
except IOError as error:
parameters = (script_name, dscname, error.errno, error.strerror)
print >> sys.stderr, ("%s: Error: Failed to download %s "
"[Errno %i]: %s.") % parameters
sys.exit(1)
dscfile = debian.deb822.Dsc(file(dscname))
if "Version" not in dscfile:
print >> sys.stderr, ("%s: Error: No Version field found in the dsc "
"file. Please check %s!") % (script_name, dscname)
sys.exit(1)
new_ver = Version(dscfile["Version"])
src_pkg.pull_dsc()
new_ver = Version(src_pkg.dsc["Version"])
try:
ubuntu_source = getUbuntuSrcPkg(srcpkg, release)
ubuntu_source = getUbuntuSrcPkg(src_pkg.source, release)
ubuntu_ver = Version(ubuntu_source.getVersion())
ubuntu_dsc = [f for f in ubuntu_source.sourceFileUrls()
if f.endswith(".dsc")]
assert len(ubuntu_dsc) == 1
ubuntu_dsc = ubuntu_dsc[0]
ubu_pkg = UbuntuSourcePackage(src_pkg.source, ubuntu_ver.full_version,
ubuntu_source.getComponent(),
mirrors=[ubuntu_mirror])
ubu_pkg.pull_dsc()
need_orig = ubuntu_ver.upstream_version != new_ver.upstream_version
except udtexceptions.PackageNotFoundException:
ubuntu_ver = Version('~')
ubuntu_dsc = None
ubu_pkg = None
need_orig = True
Logger.info('%s does not exist in Ubuntu.', name)
# No need to continue if version is not greater than current one
if new_ver <= ubuntu_ver:
parameters = (script_name, srcpkg, new_ver, ubuntu_ver)
print >> sys.stderr, ("%s: Error: %s version %s is not greater than "
"already available %s") % parameters
sys.exit(1)
if verbose:
print '%s: D: Source %s: current version %s, new version %s' % \
(script_name, srcpkg, ubuntu_ver, new_ver)
files = dsc_getfiles(dscurl, script_name)
source_files = [f for f in files if f.is_source_file()]
if verbose:
print '%s: D: Files: %s' % (script_name,
str([x.get_name() for x in files]))
print '%s: D: Source files: %s' % \
(script_name, str([x.get_name() for x in source_files]))
[f.download(script_name, verbose) for f in files]
if ubuntu_dsc is None:
ubuntu_files = None
else:
ubuntu_files = dsc_getfiles(ubuntu_dsc, script_name)
# do we need the orig.tar.gz?
need_orig = True
fakesync_files = []
if ubuntu_ver.upstream_version == new_ver.upstream_version:
# We need to check if all .orig*.tar.* tarballs exist in Ubuntu
need_orig = False
for source_file in source_files:
ubuntu_file = [f for f in ubuntu_files
if f.get_name() == source_file.get_name()]
if len(ubuntu_file) == 0:
# The source file does not exist in Ubuntu
if verbose:
parameters = (script_name, source_file.get_name())
print "%s: I: %s does not exist in Ubuntu." % parameters
need_orig = True
elif not ubuntu_file[0] == source_file:
# The checksum of the files mismatch -> We need a fake sync
parameters = (script_name, source_file.get_name())
print ("%s: Warning: The checksum of the file %s mismatch. "
"A fake sync is required.") % parameters
fakesync_files.append(ubuntu_file[0])
if verbose:
print "%s: D: Ubuntu version: %s" % (script_name,
ubuntu_file[0])
print "%s: D: Debian version: %s" % (script_name,
source_file)
if verbose:
print '%s: D: needs source tarball: %s' % (script_name, str(need_orig))
Logger.debug('Source %s: current version %s, new version %s',
src_pkg.source, ubuntu_ver, new_ver)
Logger.debug('Needs source tarball: %s', str(need_orig))
cur_ver = ubuntu_ver.get_related_debian_version()
if ubuntu_ver.is_modified_in_ubuntu():
params = (script_name, ubuntu_ver.full_version, cur_ver.full_version)
print ('%s: Warning: Overwriting modified Ubuntu version %s, '
'setting current version to %s') % params
Logger.warn('Overwriting modified Ubuntu version %s, '
'setting current version to %s',
ubuntu_ver.full_version, cur_ver.full_version)
# extract package
cmd = ['dpkg-source', '-x', dscname]
env = os.environ
env['DEB_VENDOR'] = 'Ubuntu'
if not verbose:
cmd.insert(1, "-q")
if verbose:
print_command(script_name, cmd)
subprocess.check_call(cmd, env=env)
src_pkg.pull()
src_pkg.unpack()
# Do a fake sync if required
if len(fakesync_files) > 0:
fakesync = not ubu_pkg.verify_orig()
if fakesync:
Logger.warn('The checksums of the Debian and Ubuntu packages mismatch. '
'A fake sync is required.')
# Download Ubuntu files (override Debian source tarballs)
[f.download(script_name, verbose) for f in fakesync_files]
ubu_pkg.pull()
# change into package directory
directory = srcpkg + '-' + new_ver.upstream_version
if verbose:
print_command(script_name, ["cd", directory])
directory = src_pkg.source + '-' + new_ver.upstream_version
Logger.command(('cd', directory))
os.chdir(directory)
# read Debian distribution from debian/changelog if not specified
@ -302,10 +159,10 @@ def sync_dsc(script_name, dscurl, debian_dist, release, name, email, bugs,
line = open("debian/changelog").readline()
debian_dist = line.split(" ")[2].strip(";")
if len(fakesync_files) == 0:
if not fakesync:
# create the changes file
changes_filename = "%s_%s_source.changes" % \
(srcpkg, new_ver.strip_epoch())
(src_pkg.source, new_ver.strip_epoch())
cmd = ["dpkg-genchanges", "-S", "-v" + cur_ver.full_version,
"-DDistribution=" + release,
"-DOrigin=debian/" + debian_dist,
@ -314,20 +171,17 @@ def sync_dsc(script_name, dscurl, debian_dist, release, name, email, bugs,
cmd.append("-sa")
else:
cmd.append("-sd")
if not verbose:
if not Logger.verbose:
cmd += ["-q"]
if verbose:
print_command(script_name, cmd + [">", "../" + changes_filename])
changes = subprocess.Popen(cmd, stdout=subprocess.PIPE,
env={"DEB_VENDOR": "Ubuntu"}).communicate()[0]
Logger.command(cmd + ['>', '../' + changes_filename])
changes = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
# Add additional bug numbers
if len(bugs) > 0:
changes = add_fixed_bugs(changes, bugs)
# remove extracted (temporary) files
if verbose:
print_command(script_name, ["cd", ".."])
Logger.command(('cd', '..'))
os.chdir('..')
shutil.rmtree(directory, True)
@ -337,181 +191,191 @@ def sync_dsc(script_name, dscurl, debian_dist, release, name, email, bugs,
changes_file.close()
# remove signature and sign package
remove_signature(dscname)
remove_signature(src_pkg.dsc_name)
if keyid is not False:
cmd = ["debsign", changes_filename]
if not keyid is None:
cmd.insert(1, "-k" + keyid)
if verbose:
print_command(script_name, cmd)
Logger.command(cmd)
subprocess.check_call(cmd)
else:
# Create fakesync changelog entry
new_ver = Version(new_ver.full_version + "fakesync1")
changes_filename = "%s_%s_source.changes" % \
(srcpkg, new_ver.strip_epoch())
(src_pkg.source, new_ver.strip_epoch())
if len(bugs) > 0:
message = "Fake sync due to mismatching orig tarball (LP: %s)." % \
(", ".join(["#" + str(b) for b in bugs]))
else:
message = "Fake sync due to mismatching orig tarball."
cmd = ["dch", "-v", new_ver.full_version, "-D", release, message]
env = {"DEBFULLNAME": name, "DEBEMAIL": email}
if verbose:
print_command(script_name, cmd)
cmd = ['dch', '-v', new_ver.full_version, '--force-distribution',
'-D', release, message]
env = {'DEBFULLNAME': name, 'DEBEMAIL': email}
Logger.command(cmd)
subprocess.check_call(cmd, env=env)
# update the Maintainer field
cmd = ["update-maintainer"]
if not verbose:
if not Logger.verbose:
cmd.append("-q")
if verbose:
print_command(script_name, cmd)
Logger.command(cmd)
subprocess.check_call(cmd)
# Build source package
cmd = ["debuild", "--no-lintian", "-S", "-v" + cur_ver.full_version]
env = os.environ
env['DEB_VENDOR'] = 'Ubuntu'
if need_orig:
cmd += ['-sa']
if keyid:
cmd += ["-k" + keyid]
if verbose:
print_command(script_name, cmd)
returncode = subprocess.call(cmd, env=env)
Logger.command(cmd)
returncode = subprocess.call(cmd)
if returncode != 0:
print >> sys.stderr, ("%s: Error: Source-only build with debuild "
"failed. Please check build log above.") % \
(script_name)
Logger.error('Source-only build with debuild failed. '
'Please check build log above.')
sys.exit(1)
def get_debian_dscurl(package, dist, release, version=None, component=None):
def fetch_source_pkg(package, dist, version, component, ubuntu_release, mirror):
"""Download the specified source package.
dist, version, component, mirror can all be None.
"""
if package.endswith('.dsc'):
return DebianSourcePackage(dscfile=package, mirrors=[mirror])
if dist is None:
dist = "unstable"
requested_version = version
if type(version) == str:
version = Version(version)
if version is None or component is None:
debian_srcpkg = getDebianSrcPkg(package, dist)
try:
src_pkg = getUbuntuSrcPkg(package, release)
ubuntu_version = Version(src_pkg.getVersion())
except udtexceptions.PackageNotFoundException:
ubuntu_version = Version('~')
if ubuntu_version >= Version(debian_srcpkg.getVersion()):
# The LP importer is maybe out of date
debian_srcpkg = requestsync_mail_getDebianSrcPkg(package, dist)
debian_srcpkg = getDebianSrcPkg(package, dist)
except (udtexceptions.PackageNotFoundException,
udtexceptions.SeriesNotFoundException), e:
Logger.error(str(e))
sys.exit(1)
if version is None:
version = Version(debian_srcpkg.getVersion())
try:
ubuntu_srcpkg = getUbuntuSrcPkg(package, ubuntu_release)
ubuntu_version = Version(ubuntu_srcpkg.getVersion())
except udtexceptions.PackageNotFoundException:
ubuntu_version = Version('~')
except udtexceptions.SeriesNotFoundException, e:
Logger.error(str(e))
sys.exit(1)
if ubuntu_version >= version:
# The LP importer is maybe out of date
debian_srcpkg = requestsync_mail_getDebianSrcPkg(package, dist)
if requested_version is None:
version = Version(debian_srcpkg.getVersion())
if ubuntu_version >= version:
Logger.error("Version in Debian %s (%s) isn't newer than "
"Ubuntu %s (%s)",
version, dist, ubuntu_version, ubuntu_release)
sys.exit(1)
if component is None:
component = debian_srcpkg.getComponent()
assert component in ("main", "contrib", "non-free")
assert component in ('main', 'contrib', 'non-free')
if package.startswith("lib"):
group = package[0:4]
else:
group = package[0]
dsc_file = package + "_" + version.strip_epoch() + ".dsc"
dscurl = os.path.join("http://ftp.debian.org/debian/pool", component, group,
package, dsc_file)
return dscurl
return DebianSourcePackage(package, version.full_version, component,
mirrors=[mirror])
def main():
script_name = os.path.basename(sys.argv[0])
usage = "%s [options] <.dsc URL/path or package name>" % (script_name)
epilog = "See %s(1) for more info." % (script_name)
usage = "%prog [options] <.dsc URL/path or package name>"
epilog = "See %s(1) for more info." % os.path.basename(sys.argv[0])
parser = optparse.OptionParser(usage=usage, epilog=epilog)
parser.add_option("-d", "--distribution", type="string",
parser.add_option("-d", "--distribution",
dest="dist", default=None,
help="Debian distribution to sync from.")
parser.add_option("-r", "--release", dest="release", default=None,
parser.add_option("-r", "--release",
dest="release", default=None,
help="Specify target Ubuntu release.")
parser.add_option("-V", "--debian-version", dest="debversion", default=None,
parser.add_option("-V", "--debian-version",
dest="debversion", default=None,
help="Specify the version to sync from.")
parser.add_option("-c", "--component", dest="component", default=None,
parser.add_option("-c", "--component",
dest="component", default=None,
help="Specify the Debian component to sync from.")
parser.add_option("-v", "--verbose", help="print more information",
dest="verbose", action="store_true", default=False)
parser.add_option("-n", "--uploader-name", dest="uploader_name",
parser.add_option("-v", "--verbose",
dest="verbose", action="store_true", default=False,
help="Display more progress information.")
parser.add_option("-n", "--uploader-name",
dest="uploader_name", default=None,
help="Use UPLOADER_NAME as the name of the maintainer "
"for this upload instead of evaluating DEBFULLNAME.",
default=None)
parser.add_option("-e", "--uploader-email", dest="uploader_email",
"for this upload.")
parser.add_option("-e", "--uploader-email",
dest="uploader_email", default=None,
help="Use UPLOADER_EMAIL as email address of the "
"maintainer for this upload instead of evaluating "
"DEBEMAIL.", default=None)
parser.add_option("-k", "--key", dest="keyid", default=None,
"maintainer for this upload.")
parser.add_option("-k", "--key",
dest="keyid", default=None,
help="Specify the key ID to be used for signing.")
parser.add_option('--dont-sign', dest='keyid', action='store_false',
help='Do not sign the upload')
parser.add_option('--dont-sign',
dest='keyid', action='store_false',
help='Do not sign the upload.')
parser.add_option("-b", "--bug", metavar="BUG",
help="Mark a Launchpad bug as being fixed by this upload",
dest="bugs", action="append", default=list())
dest="bugs", action="append", default=list(),
help="Mark Launchpad bug BUG as being fixed by this "
"upload.")
parser.add_option('-D', '--debian-mirror', metavar='DEBIAN_MIRROR',
dest='debian_mirror',
help='Preferred Debian mirror '
'(default: %s)'
% UDTConfig.defaults['DEBIAN_MIRROR'])
parser.add_option('-U', '--ubuntu-mirror', metavar='UBUNTU_MIRROR',
dest='ubuntu_mirror',
help='Prefeed Ubuntu mirror '
'(default: %s)'
% UDTConfig.defaults['UBUNTU_MIRROR'])
parser.add_option('--no-conf',
dest='no_conf', default=False, action='store_true',
help="Don't read config files or environment variables.")
(options, args) = parser.parse_args()
if len(args) == 0:
print >> sys.stderr, ("%s: Error: No .dsc URL/path or package name "
"specified.") % (script_name)
sys.exit(1)
elif len(args) > 1:
parameters = (script_name, ", ".join(args))
print >> sys.stderr, ("%s: Error: Multiple .dsc URLs/paths or "
"package names specified: %s") % parameters
sys.exit(1)
parser.error('No .dsc URL/path or package name specified.')
if len(args) > 1:
parser.error('Multiple .dsc URLs/paths or package names specified: '
+ ', '.join(args))
invalid_bug_numbers = [bug for bug in options.bugs if not bug.isdigit()]
if len(invalid_bug_numbers) > 0:
print >> sys.stderr, "%s: Error: Invalid bug number(s) specified: %s" \
% (script_name, ", ".join(invalid_bug_numbers))
sys.exit(1)
parser.error('Invalid bug number(s) specified: '
+ ', '.join(invalid_bug_numbers))
if options.component not in (None, "main", "contrib", "non-free"):
parser.error('%s is not a valid Debian component. '
'It should be one of main, contrib, or non-free.'
% options.component)
Logger.verbose = options.verbose
config = UDTConfig(options.no_conf)
if options.debian_mirror is None:
options.debian_mirror = config.get_value('DEBIAN_MIRROR')
if options.ubuntu_mirror is None:
options.ubuntu_mirror = config.get_value('UBUNTU_MIRROR')
if options.uploader_name is None:
if "DEBFULLNAME" in os.environ:
options.uploader_name = os.environ["DEBFULLNAME"]
else:
print >> sys.stderr, ("%s: Error: No uploader name specified. You "
"must pass the --uploader-name option or set "
"the DEBFULLNAME environment variable.") % \
(script_name)
sys.exit(1)
options.uploader_name = ubu_email(export=False)[0]
if options.uploader_email is None:
if "DEBEMAIL" in os.environ:
options.uploader_email = os.environ["DEBEMAIL"]
else:
print >> sys.stderr, ("%s: Error: No uploader email address "
"specified. You must pass the "
"--uploader-email option or set the DEBEMAIL"
" environment variable.") % (script_name)
sys.exit(1)
options.uploader_email = ubu_email(export=False)[1]
Launchpad.login_anonymously()
if options.release is None:
options.release = Launchpad.distributions["ubuntu"].current_series.name
if args[0].endswith(".dsc"):
dscurl = args[0]
else:
if options.component not in (None, "main", "contrib", "non-free"):
parameters = (script_name, options.component)
print >> sys.stderr, ("%s: Error: %s is not a valid Debian "
"component. It should be one of main, "
"contrib, or non-free.") % parameters
sys.exit(1)
dscurl = get_debian_dscurl(args[0], options.dist, options.release,
options.debversion, options.component)
os.environ['DEB_VENDOR'] = 'Ubuntu'
if options.verbose:
print "%s: D: .dsc url: %s" % (script_name, dscurl)
sync_dsc(script_name, dscurl, options.dist, options.release,
options.uploader_name, options.uploader_email, options.bugs,
options.keyid, options.verbose)
src_pkg = fetch_source_pkg(args[0], options.dist, options.debversion,
options.component, options.release,
options.debian_mirror)
sync_dsc(src_pkg, options.dist, options.release, options.uploader_name,
options.uploader_email, options.bugs, options.ubuntu_mirror,
options.keyid)
if __name__ == "__main__":
main()

View File

@ -0,0 +1 @@
upstream

View File

@ -0,0 +1 @@
7

View File

@ -0,0 +1,12 @@
Source: example
Section: misc
Priority: extra
Maintainer: Ubuntu Developers <ubuntu-dev-team@lists.alioth.debian.org>
Build-Depends: debhelper (>= 7.0.50~)
Standards-Version: 3.9.1
Package: example
Architecture: all
Depends: ${misc:Depends}, ${shlibs:Depends}
Description: Example package for testing purposes
An example package used by the test suite. Useless.

View File

@ -0,0 +1,17 @@
Format: http://svn.debian.org/wsvn/dep/web/deps/dep5.mdwn?op=file&rev=152
Source: https://launchpad.net/ubuntu-dev-tools
Files: *
Copyright: 2010-2011, Stefano Rivera <stefanor@ubuntu.com>
License: ISC
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

View File

@ -0,0 +1,4 @@
#!/usr/bin/make -f
%:
dh $@

View File

@ -0,0 +1 @@
3.0 (quilt)

View File

@ -0,0 +1 @@
abort-on-upstream-changes

525
ubuntutools/archive.py Normal file
View File

@ -0,0 +1,525 @@
# archive.py - Functions for dealing with Debian source packages, archives,
# and mirrors.
#
# Copyright (C) 2010-2011, Stefano Rivera <stefanor@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
"""Pull source packages from archives.
Approach:
1. Pull dsc from Launchpad (this is over https and can authenticate the
rest of the source package)
2. Attempt to pull the remaining files from:
1. existing files
2. mirrors
3. Launchpad
3. Verify checksums.
"""
from __future__ import with_statement
import hashlib
import os.path
import subprocess
import urllib2
import urlparse
import re
import sys
import debian.deb822
import debian.debian_support
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
from ubuntutools.lp.lpapicache import (Launchpad, Distribution,
SourcePackagePublishingHistory)
class DownloadError(Exception):
"Unable to pull a source package"
pass
class Dsc(debian.deb822.Dsc):
"Extend deb822's Dsc with checksum verification abilities"
def get_strongest_checksum(self):
"Return alg, dict by filename of size, hash_ pairs"
if 'Checksums-Sha256' in self:
return ('sha256',
dict((entry['name'], (int(entry['size']), entry['sha256']))
for entry in self['Checksums-Sha256']))
if 'Checksums-Sha1' in self:
return ('sha1',
dict((entry['name'], (int(entry['size']), entry['sha1']))
for entry in self['Checksums-Sha1']))
return ('md5',
dict((entry['name'], (int(entry['size']), entry['md5sum']))
for entry in self['Files']))
def verify_file(self, pathname):
"Verify that pathname matches the checksums in the dsc"
if os.path.isfile(pathname):
alg, checksums = self.get_strongest_checksum()
size, digest = checksums[os.path.basename(pathname)]
if os.path.getsize(pathname) != size:
return False
hash_func = getattr(hashlib, alg)()
f = open(pathname, 'rb')
while True:
buf = f.read(hash_func.block_size)
if buf == '':
break
hash_func.update(buf)
f.close()
return hash_func.hexdigest() == digest
return False
class SourcePackage(object):
"""Base class for source package downloading.
Use DebianSourcePackage or UbuntuSourcePackage instead of using this
directly.
"""
distribution = 'unknown'
def __init__(self, package=None, version=None, component=None,
dscfile=None, lp=None, mirrors=(), workdir='.'):
"Can be initialised either using package, version or dscfile"
assert ((package is not None and version is not None)
or dscfile is not None)
self.source = package
self._lp = lp
self.workdir = workdir
# Cached values:
self._component = component
self._dsc = None
self._spph = None
# State:
self._dsc_fetched = False
# Mirrors
self._dsc_source = dscfile
self.mirrors = list(mirrors)
self.masters = [UDTConfig.defaults['%s_MIRROR'
% self.distribution.upper()]]
if dscfile is not None:
if self.source is None:
self.source = 'unknown'
if version is None:
version = 'unknown'
self.version = debian.debian_support.Version(version)
@property
def lp_spph(self):
"Return the LP Source Package Publishing History entry"
if not self._spph:
if not Launchpad.logged_in:
if self._lp:
Launchpad.login_existing(self._lp)
else:
Launchpad.login_anonymously()
spph = (Distribution(self.distribution).getArchive()
.getPublishedSources(
source_name=self.source,
version=self.version.full_version,
exact_match=True,
))
self._spph = SourcePackagePublishingHistory(spph[0])
return self._spph
@property
def component(self):
"Cached archive component, in available"
if not self._component:
Logger.debug('Determining component from Launchpad')
self._component = self.lp_spph.getComponent()
return self._component
@property
def dsc_name(self):
"Return the source package dsc filename for the given package"
version = self.version.upstream_version
if self.version.debian_version:
version += '-' + self.version.debian_version
return '%s_%s.dsc' % (self.source, version)
@property
def dsc_pathname(self):
"Return the dsc_name, with the workdir path"
return os.path.join(self.workdir, self.dsc_name)
@property
def dsc(self):
"Return a the Dsc"
if not self._dsc:
if self._dsc_fetched:
self._dsc = Dsc(file(self.dsc_pathname, 'rb').read())
return self._dsc
def _mirror_url(self, mirror, filename):
"Build a source package URL on a mirror"
if self.source.startswith('lib'):
group = self.source[:4]
else:
group = self.source[0]
return os.path.join(mirror, 'pool', self.component, group,
self.source, filename)
def _lp_url(self, filename):
"Build a source package URL on Launchpad"
return os.path.join('https://launchpad.net', self.distribution,
'+archive', 'primary', '+files', filename)
def _source_urls(self, name):
"Generator of sources for name"
if self._dsc_source:
yield os.path.join(os.path.dirname(self._dsc_source), name)
for mirror in self.mirrors:
yield self._mirror_url(mirror, name)
for mirror in self.masters:
if mirror not in self.mirrors:
yield self._mirror_url(mirror, name)
yield self._lp_url(name)
def pull_dsc(self):
"Retrieve dscfile and parse"
if self._dsc_source:
parsed = urlparse.urlparse(self._dsc_source)
if parsed.scheme == '':
self._dsc_source = 'file://' + os.path.abspath(self._dsc_source)
parsed = urlparse.urlparse(self._dsc_source)
source_is_workdir = (os.path.realpath(os.path.dirname(parsed.path))
== os.path.realpath(self.workdir))
if not (parsed.scheme == 'file' and source_is_workdir):
if not self._download_file(self._dsc_source, self.dsc_name):
raise DownloadError('dsc not found')
else:
if not self._download_file(self._lp_url(self.dsc_name),
self.dsc_name):
raise DownloadError('dsc not found')
self._check_dsc()
def _check_dsc(self, verify_signature=False):
"Check that the dsc matches what we are expecting"
assert os.path.exists(self.dsc_pathname)
self._dsc_fetched = True
old_pathname = self.dsc_pathname
self.source = self.dsc['Source']
self.version = debian.debian_support.Version(self.dsc['Version'])
# If source or version was previously unknown
if self.dsc_pathname != old_pathname:
os.rename(old_pathname, self.dsc_pathname)
valid = False
message = None
gpg_info = None
try:
gpg_info = self.dsc.get_gpg_info()
valid = gpg_info.valid()
except IOError:
message = ('Signature on %s could not be verified, install '
'debian-keyring' % self.dsc_name)
if message is None:
if valid:
message = 'Valid signature'
else:
message = ('Signature on %s could not be verified'
% self.dsc_name)
if gpg_info is not None:
if 'GOODSIG' in gpg_info:
message = ('Good signature by %s (0x%s)'
% (gpg_info['GOODSIG'][1], gpg_info['GOODSIG'][0]))
elif 'VALIDSIG' in gpg_info:
message = 'Valid signature by 0x%s' % gpg_info['VALIDSIG'][0]
if verify_signature:
if valid:
Logger.normal(message)
else:
Logger.error(message)
raise DownloadError(message)
else:
Logger.info(message)
def _download_file(self, url, filename):
"Download url to filename in workdir."
logurl = url
if os.path.basename(url) != filename:
logurl += ' -> ' + filename
pathname = os.path.join(self.workdir, filename)
if self.dsc and not url.endswith('.dsc'):
if self.dsc.verify_file(pathname):
Logger.debug('Using existing %s', filename)
return True
size = [entry['size'] for entry in self.dsc['Files']
if entry['name'] == filename]
assert len(size) == 1
size = int(size[0])
Logger.normal('Downloading %s (%0.3f MiB)', logurl,
size / 1024.0 / 1024)
else:
Logger.normal('Downloading %s', logurl)
try:
in_ = urllib2.urlopen(url)
except urllib2.URLError:
return False
with open(pathname, 'wb') as out:
while True:
block = in_.read(10240)
if block == '':
break
out.write(block)
Logger.stdout.write('.')
Logger.stdout.flush()
in_.close()
Logger.stdout.write(' done\n')
Logger.stdout.flush()
if self.dsc and not url.endswith('.dsc'):
if not self.dsc.verify_file(pathname):
Logger.error('Checksum does not match.')
return False
return True
def pull(self):
"Pull into workdir"
if self.dsc is None:
self.pull_dsc()
for entry in self.dsc['Files']:
name = entry['name']
for url in self._source_urls(name):
try:
if self._download_file(url, name):
break
except urllib2.HTTPError, e:
Logger.normal('HTTP Error %i: %s', e.code, str(e))
except urllib2.URLError, e:
Logger.normal('URL Error: %s', e.reason)
else:
raise DownloadError('File %s could not be found' % name)
def verify(self):
"""Verify that the source package in workdir matches the dsc.
Return boolean
"""
return all(self.dsc.verify_file(os.path.join(self.workdir,
entry['name']))
for entry in self.dsc['Files'])
def verify_orig(self):
"""Verify that the .orig files in workdir match the dsc.
Return boolean
"""
orig_re = re.compile(r'.*\.orig(-[^.]+)?\.tar\.[^.]+$')
return all(self.dsc.verify_file(os.path.join(self.workdir,
entry['name']))
for entry in self.dsc['Files']
if orig_re.match(entry['name']))
def unpack(self, destdir=None):
"Unpack in workdir"
cmd = ['dpkg-source', '-x', self.dsc_name]
if destdir:
cmd.append(destdir)
Logger.command(cmd)
if subprocess.call(cmd, cwd=self.workdir):
Logger.error('Source unpack failed.')
sys.exit(1)
def debdiff(self, newpkg, diffstat=False):
"""Write a debdiff comparing this src pkg to a newer one.
Optionally print diffstat.
Return the debdiff filename.
"""
cmd = ['debdiff', self.dsc_name, newpkg.dsc_name]
difffn = newpkg.dsc_name[:-3] + 'debdiff'
Logger.command(cmd + ['> %s' % difffn])
with open(difffn, 'w') as f:
if subprocess.call(cmd, stdout=f, cwd=self.workdir) > 2:
Logger.error('Debdiff failed.')
sys.exit(1)
if diffstat:
cmd = ('diffstat', '-p1', difffn)
Logger.command(cmd)
if subprocess.call(cmd):
Logger.error('diffstat failed.')
sys.exit(1)
return os.path.abspath(difffn)
class DebianSourcePackage(SourcePackage):
"Download / unpack a Debian source package"
distribution = 'debian'
def __init__(self, *args, **kwargs):
super(DebianSourcePackage, self).__init__(*args, **kwargs)
self.masters.append(UDTConfig.defaults['DEBSEC_MIRROR'])
# Cached values:
self._snapshot_list = None
# Overridden methods:
@property
def lp_spph(self):
"Return the LP Source Package Publishing History entry"
if not self._spph:
try:
return super(DebianSourcePackage, self).lp_spph
except IndexError:
pass
Logger.normal('Using rmadison for component determination')
comp = 'main'
for record in rmadison(self.distribution, self.source):
if record.get('source') != self.source:
continue
comp = record['component']
if record['version'] == self.version.full_version:
self._spph = FakeSPPH(record['source'], record['version'],
comp)
return self._spph
Logger.normal('Guessing component from most recent upload')
self._spph = FakeSPPH(self.source, self.version.full_version, comp)
return self._spph
def _source_urls(self, name):
"Generator of sources for name"
wrapped_iterator = super(DebianSourcePackage, self)._source_urls(name)
while True:
try:
yield wrapped_iterator.next()
except StopIteration:
break
if self.snapshot_list:
yield self._snapshot_url(name)
def pull_dsc(self):
"Retrieve dscfile and parse"
try:
super(DebianSourcePackage, self).pull_dsc()
return
except DownloadError:
pass
# Not all Debian Source packages get imported to LP
# (or the importer could be lagging)
for url in self._source_urls(self.dsc_name):
if self._download_file(url, self.dsc_name):
break
else:
raise DownloadError('dsc could not be found anywhere')
self._check_dsc(verify_signature=True)
# Local methods:
@property
def snapshot_list(self):
"Return a filename -> hash dictionary from snapshot.debian.org"
if self._snapshot_list is None:
try:
import json
except ImportError:
import simplejson as json
except ImportError:
Logger.error("Please install python-simplejson.")
raise DownloadError("Unable to dowload from "
"snapshot.debian.org without "
"python-simplejson")
try:
srcfiles = json.load(urllib2.urlopen(
'http://snapshot.debian.org'
'/mr/package/%s/%s/srcfiles?fileinfo=1'
% (self.source, self.version.full_version)))
except urllib2.HTTPError:
Logger.error('Version %s of %s not found on '
'snapshot.debian.org',
self.version.full_version, self.source)
self._snapshot_list = False
return False
self._snapshot_list = dict((info[0]['name'], hash_)
for hash_, info
in srcfiles['fileinfo'].iteritems())
return self._snapshot_list
def _snapshot_url(self, name):
"Return the snapshot.debian.org URL for name"
return os.path.join('http://snapshot.debian.org/file',
self.snapshot_list[name])
class UbuntuSourcePackage(SourcePackage):
"Download / unpack an Ubuntu source package"
distribution = 'ubuntu'
class FakeSPPH(object):
"""Provide the same interface as
ubuntutools.lpapicache.SourcePackagePublishingHistory
"""
def __init__(self, name, version, component):
self.name = name
self.version = version
self.component = component
def getPackageName(self):
return self.name
def getVersion(self):
return self.version
def getComponent(self):
return self.component
def rmadison(url, package, suite=None, arch=None):
"Call rmadison and parse the result"
cmd = ['rmadison', '-u', url]
if suite:
cmd += ['-s', suite]
if arch:
cmd += ['-a', arch]
cmd.append(package)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
output = process.communicate()[0]
assert process.wait() == 0
for line in output.strip().splitlines():
pkg, ver, dist, archs = [x.strip() for x in line.split('|')]
comp = 'main'
if '/' in dist:
dist, comp = dist.split('/')
archs = set(x.strip() for x in archs.split(','))
if 'source' in archs:
yield {
'source': pkg,
'version': ver,
'suite': dist,
'component': comp,
}
archs.discard('source')
if archs:
yield {
'binary': pkg,
'version': ver,
'suite': dist,
'component': comp,
'architectures': archs,
}

View File

@ -34,11 +34,11 @@ class UDTConfig(object):
# These are reqired to be used by at least two scripts.
defaults = {
'BUILDER': 'pbuilder',
'DEBIAN_MIRROR': None,
'DEBSEC_MIRROR': None,
'DEBIAN_MIRROR': 'http://ftp.debian.org/debian',
'DEBSEC_MIRROR': 'http://security.debian.org',
'LPINSTANCE': 'production',
'MIRROR_FALLBACK': True,
'UBUNTU_MIRROR': None,
'UBUNTU_MIRROR': 'http://archive.ubuntu.com/ubuntu',
'UPDATE_BUILDER': False,
'WORKDIR': None,
}
@ -74,7 +74,7 @@ class UDTConfig(object):
f.close()
return config
def get_value(self, key, default=None, boolean=False, compat_keys=[]):
def get_value(self, key, default=None, boolean=False, compat_keys=()):
"""Retrieve a value from the environment or configuration files.
keys are prefixed with the script name, falling back to UBUNTUTOOLS for
package-wide keys.

View File

@ -20,6 +20,12 @@
import os
import sys
def escape_arg(arg):
"Shell-escpae arg, if necessary"
if ' ' not in arg:
return arg
return '"%s"' % arg.replace('\\', r'\\').replace('"', r'\"')
class Logger(object):
script_name = os.path.basename(sys.argv[0])
verbose = False
@ -30,10 +36,9 @@ class Logger(object):
@classmethod
def command(cls, cmd):
if cls.verbose:
for i in xrange(len(cmd)):
if cmd[i].find(" ") >= 0:
cmd[i] = '"' + cmd[i] + '"'
print >> cls.stdout, "%s: I: %s" % (cls.script_name, " ".join(cmd))
print >> cls.stdout, "%s: I: %s" % (cls.script_name,
" ".join(escape_arg(arg)
for arg in cmd))
@classmethod
def debug(cls, message, *args):

View File

@ -56,7 +56,7 @@ class _Launchpad(object):
def login(self, service=service):
'''Enforce a non-anonymous login.'''
if '_Launchpad__lp' not in self.__dict__:
if not self.logged_in:
try:
self.__lp = libsupport.get_launchpad('ubuntu-dev-tools',
server=service)
@ -68,14 +68,26 @@ class _Launchpad(object):
def login_anonymously(self, service=service, api_version=api_version):
'''Enforce an anonymous login.'''
if '_Launchpad__lp' not in self.__dict__:
if not self.logged_in:
self.__lp = launchpad.Launchpad.login_anonymously(
'ubuntu-dev-tools', service_root=service, version=api_version)
else:
raise AlreadyLoggedInError('Already logged in to Launchpad.')
def login_existing(self, lp):
'''Use an already logged in Launchpad object'''
if not self.logged_in:
self.__lp = lp
else:
raise AlreadyLoggedInError('Already logged in to Launchpad.')
@property
def logged_in(self):
'''Are we logged in?'''
return '_Launchpad__lp' in self.__dict__
def __getattr__(self, attr):
if '_Launchpad__lp' not in self.__dict__:
if not self.logged_in:
self.login()
return getattr(self.__lp, attr)

View File

@ -119,15 +119,3 @@ def split_release_pocket(release):
pocket)
return (release, pocket)
def dsc_name(package, version):
"Return the source package dsc filename for the given package"
if ':' in version:
version = version.split(':', 1)[1]
return '%s_%s.dsc' % (package, version)
def dsc_url(mirror, component, package, version):
"Build a source package URL"
group = package[:4] if package.startswith('lib') else package[0]
filename = dsc_name(package, version)
return os.path.join(mirror, 'pool', component, group, package, filename)

View File

@ -2,7 +2,8 @@
#
# mail.py - methods used by requestsync when used in "mail" mode
#
# Copyright © 2009 Michael Bienia <geser@ubuntu.com>
# Copyright © 2009 Michael Bienia <geser@ubuntu.com>,
# 2011 Stefano Rivera <stefanor@ubuntu.com>
#
# This module may contain code written by other authors/contributors to
# the main requestsync script. See there for their names.
@ -25,6 +26,7 @@ import subprocess
import smtplib
import socket
from debian.changelog import Version
from ubuntutools.archive import rmadison, FakeSPPH
from ubuntutools.requestsync.common import raw_input_exit_on_ctrlc
from ubuntutools.lp.udtexceptions import PackageNotFoundException
@ -36,68 +38,15 @@ __all__ = [
'mailBug',
]
class SourcePackagePublishingHistory(object):
'''
Simulate a SourcePackagePublishingHistory class from the LP API caching
module.
'''
def __init__(self, name, version, component):
self.name = name
self.version = version
self.component = component
def getPackageName(self):
return self.name
def getVersion(self):
return self.version
def getComponent(self):
return self.component
def rmadison(distro, package, release):
# Map 'sid' and 'squeeze' to their releasenames else rmadison gets a python
# traceback back from the remote script
releasenames = {
'sid': 'unstable',
'squeeze': 'testing', # Needs updating after each Debian release
}
release = releasenames.get(release, release)
rmadison_cmd = subprocess.Popen(['rmadison', '-u', distro, '-a', 'source',
'-s', release, package],
stdout=subprocess.PIPE)
rmadison_out = rmadison_cmd.communicate()[0]
assert (rmadison_cmd.returncode == 0)
# Return the most recent source line
lines = rmadison_out.splitlines()
if not lines:
# no output
return None
lines = [map(lambda x: x.strip(), line.split('|')) for line in lines]
lines = [line for line in lines if line[3].find('source') != -1]
if lines:
return max(lines, key = lambda x: Version(x[1]))
else:
# no source line
return None
def getSrcPkg(distro, name, release):
out = rmadison(distro, name, release)
if not out:
lines = list(rmadison(distro, name, suite=release, arch='source'))
if not lines:
raise PackageNotFoundException("'%s' doesn't appear to exist "
"in %s '%s'"
% (name, distro.capitalize(), release))
pkg = max(lines, key=lambda x: Version(x['version']))
version = out[1]
component = 'main'
raw_comp = out[2].split('/')
if len(raw_comp) == 2:
component = raw_comp[1]
return SourcePackagePublishingHistory(name, version, component)
return FakeSPPH(pkg['source'], pkg['version'], pkg['component'])
def getDebianSrcPkg(name, release):
return getSrcPkg('debian', name, release)

View File

@ -0,0 +1,63 @@
# example_package.py - Creates an example package
#
# Copyright (C) 2010-2011, Stefano Rivera <stefanor@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
import os.path
import shutil
import subprocess
import debian.debian_support
class ExamplePackage(object):
def __init__(self, source='example', version='1.0-1'):
self.source = source
self.version = debian.debian_support.Version(version)
self.srcdir = os.path.join('test-data', '%s-%s' % (source,
self.version.upstream_version))
if os.path.exists(self.srcdir):
shutil.rmtree(self.srcdir)
shutil.copytree('test-data/blank-example', self.srcdir)
def create_orig(self):
"Create .orig.tar.gz"
orig = '%s_%s.orig.tar.gz' % (self.source,
self.version.upstream_version)
subprocess.check_call(('tar', '-czf', orig,
os.path.basename(self.srcdir),
'--exclude', 'debian'),
cwd='test-data')
def changelog_entry(self, version=None, create=False):
"Add a changelog entry"
cmd = ['dch', '--noconf', '--preserve', '--package', self.source]
if create:
cmd.append('--create')
cmd += ['--newversion', version or self.version.full_version]
cmd.append('')
env = dict(os.environ)
env['DEBFULLNAME'] = 'Example'
env['DEBEMAIL'] = 'example@example.net'
subprocess.check_call(cmd, env=env, cwd=self.srcdir)
def create(self):
"Build source package"
self.changelog_entry(create=True)
subprocess.check_call(('dpkg-buildpackage', '-rfakeroot', '-S',
'-uc', '-us'),
cwd=self.srcdir)
def cleanup(self):
"Remove srcdir"
shutil.rmtree(self.srcdir)

View File

@ -2,7 +2,8 @@
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
ignored-classes=Launchpad,BaseWrapper,PersonTeam,Distribution,Consumer,Credentials
# lpapicache classes, urlparse
ignored-classes=Launchpad,BaseWrapper,PersonTeam,Distribution,Consumer,Credentials,ParseResult
[FORMAT]
@ -12,3 +13,8 @@ max-line-length=80
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
[BASIC]
# Allow variables called e, f, lp
good-names=i,j,k,ex,Run,_,e,f,lp

View File

@ -0,0 +1,289 @@
# test_archive.py - Test suite for ubuntutools.archive
#
# Copyright (C) 2010, Stefano Rivera <stefanor@ubuntu.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
from __future__ import with_statement
import __builtin__
import os.path
import shutil
import StringIO
import tempfile
import urllib2
import debian.deb822
import mox
import ubuntutools.archive
from ubuntutools.config import UDTConfig
from ubuntutools.logger import Logger
from ubuntutools.test import unittest
from ubuntutools.test.example_package import ExamplePackage
def setUpModule():
if not os.path.exists('test-data/example-0.1-1.dsc'):
ex_pkg = ExamplePackage()
ex_pkg.create_orig()
ex_pkg.create()
ex_pkg.cleanup()
class DscVerificationTestCase(mox.MoxTestBase, unittest.TestCase):
def setUp(self):
super(DscVerificationTestCase, self).setUp()
with open('test-data/example_1.0-1.dsc', 'rb') as f:
self.dsc = ubuntutools.archive.Dsc(f.read())
def tearDown(self):
super(DscVerificationTestCase, self).tearDown()
def test_good(self):
self.assertTrue(self.dsc.verify_file(
'test-data/example_1.0.orig.tar.gz'))
self.assertTrue(self.dsc.verify_file(
'test-data/example_1.0-1.debian.tar.gz'))
def test_missing(self):
self.assertFalse(self.dsc.verify_file(
'test-data/does.not.exist'))
def test_bad(self):
fn = 'test-data/example_1.0.orig.tar.gz'
with open(fn, 'rb') as f:
data = f.read()
data = data[:-1] + chr(ord(data[-1]) ^ 8)
self.mox.StubOutWithMock(__builtin__, 'open')
open(fn, 'rb').AndReturn(StringIO.StringIO(data))
self.mox.ReplayAll()
self.assertFalse(self.dsc.verify_file(fn))
def test_sha1(self):
del self.dsc['Checksums-Sha256']
self.test_good()
self.test_bad()
def test_md5(self):
del self.dsc['Checksums-Sha256']
del self.dsc['Checksums-Sha1']
self.test_good()
self.test_bad()
class LocalSourcePackageTestCase(mox.MoxTestBase, unittest.TestCase):
SourcePackage = ubuntutools.archive.UbuntuSourcePackage
def setUp(self):
super(LocalSourcePackageTestCase, self).setUp()
self.workdir = tempfile.mkdtemp(prefix='udt-test')
self.mox.StubOutWithMock(ubuntutools.archive, 'Distribution')
self.mox.StubOutWithMock(ubuntutools.archive, 'rmadison')
self.urlopen = urllib2.urlopen
self.mox.StubOutWithMock(urllib2, 'urlopen')
# Silence the tests a little:
self.mox.stubs.Set(Logger, 'stdout', StringIO.StringIO())
self.mox.stubs.Set(Logger, 'stderr', StringIO.StringIO())
def tearDown(self):
super(LocalSourcePackageTestCase, self).tearDown()
shutil.rmtree(self.workdir)
def urlopen_proxy(self, url, destname=None):
"Grab the file from test-data"
if destname is None:
destname = os.path.basename(url)
return self.urlopen('file://'
+ os.path.join(os.path.abspath('test-data'),
destname))
def urlopen_file(self, filename):
"Wrapper for urlopen_proxy for named files"
return lambda url: self.urlopen_proxy(url, filename)
def urlopen_null(self, url):
"urlopen for zero length files"
return StringIO.StringIO('')
def urlopen_404(self, url):
"urlopen for errors"
raise urllib2.HTTPError(url, 404, "Not Found", {}, None)
def test_local_copy(self):
urllib2.urlopen(mox.Regex('^file://.*\.dsc$')
).WithSideEffects(self.urlopen)
urllib2.urlopen(mox.Regex('^file://.*\.orig\.tar\.gz$')
).WithSideEffects(self.urlopen)
urllib2.urlopen(mox.Regex('^file://.*\.debian\.tar\.gz$')
).WithSideEffects(self.urlopen)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
pkg.pull()
pkg.unpack()
def test_verification(self):
shutil.copy2('test-data/example_1.0-1.dsc', self.workdir)
shutil.copy2('test-data/example_1.0.orig.tar.gz', self.workdir)
shutil.copy2('test-data/example_1.0-1.debian.tar.gz', self.workdir)
with open(os.path.join(self.workdir, 'example_1.0-1.debian.tar.gz'),
'r+b') as f:
f.write('CORRUPTION')
urllib2.urlopen(mox.Regex('^file://.*\.dsc$')
).WithSideEffects(self.urlopen)
urllib2.urlopen(mox.Regex('^file://.*\.debian\.tar\.gz$')
).WithSideEffects(self.urlopen)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
dscfile='test-data/example_1.0-1.dsc',
workdir=self.workdir)
pkg.pull()
def test_pull(self):
dist = self.SourcePackage.distribution
mirror = UDTConfig.defaults['%s_MIRROR' % dist.upper()]
urlbase = '/pool/main/e/example/'
urllib2.urlopen('https://launchpad.net/%s/+archive/primary/'
'+files/example_1.0-1.dsc' % dist
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(mirror + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(mirror + urlbase + 'example_1.0-1.debian.tar.gz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
pkg.pull()
def test_mirrors(self):
master = UDTConfig.defaults['UBUNTU_MIRROR']
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/'
urlbase = '/pool/main/e/example/'
urllib2.urlopen(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(mirror + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_null)
urllib2.urlopen(master + urlbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen(lpbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(mirror + urlbase + 'example_1.0-1.debian.tar.gz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg.pull()
def test_dsc_missing(self):
lpbase = 'https://launchpad.net/ubuntu/+archive/primary/+files/'
urllib2.urlopen(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_404)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir)
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)
class DebianLocalSourcePackageTestCase(LocalSourcePackageTestCase):
SourcePackage = ubuntutools.archive.DebianSourcePackage
def test_mirrors(self):
debian_master = UDTConfig.defaults['DEBIAN_MIRROR']
debsec_master = UDTConfig.defaults['DEBSEC_MIRROR']
debian_mirror = 'http://mirror/debian'
debsec_mirror = 'http://mirror/debsec'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
urllib2.urlopen(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(debian_mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_null)
urllib2.urlopen(debsec_mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen(debian_master + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen(debsec_master + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen(lpbase + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen('http://snapshot.debian.org/mr/package/example/1.0-1/'
'srcfiles?fileinfo=1'
).WithSideEffects(lambda x: StringIO.StringIO(
'{"fileinfo": {"hashabc": [{"name": "example_1.0.orig.tar.gz"}]}}'
))
urllib2.urlopen('http://snapshot.debian.org/file/hashabc'
).WithSideEffects(self.urlopen_file(
'example_1.0.orig.tar.gz'))
urllib2.urlopen(debian_mirror + base + 'example_1.0-1.debian.tar.gz'
).WithSideEffects(self.urlopen_proxy)
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[debian_mirror,
debsec_mirror])
pkg.pull()
pkg.unpack()
def test_dsc_missing(self):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
urllib2.urlopen(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen(mirror + base + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(mirror + base + 'example_1.0.orig.tar.gz'
).WithSideEffects(self.urlopen_proxy)
urllib2.urlopen(mirror + base + 'example_1.0-1.debian.tar.gz'
).WithSideEffects(self.urlopen_proxy)
self.mox.StubOutWithMock(debian.deb822.GpgInfo, 'from_sequence')
debian.deb822.GpgInfo.from_sequence(mox.IsA(str)).WithSideEffects(
lambda x: debian.deb822.GpgInfo.from_output(
'[GNUPG:] GOODSIG DEADBEEF Joe Developer '
'<joe@example.net>'))
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
pkg.pull()
def test_dsc_badsig(self):
mirror = 'http://mirror'
lpbase = 'https://launchpad.net/debian/+archive/primary/+files/'
base = '/pool/main/e/example/'
urllib2.urlopen(lpbase + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_404)
urllib2.urlopen(mirror + base + 'example_1.0-1.dsc'
).WithSideEffects(self.urlopen_proxy)
self.mox.StubOutWithMock(debian.deb822.GpgInfo, 'from_sequence')
debian.deb822.GpgInfo.from_sequence(mox.IsA(str)).WithSideEffects(
lambda x: debian.deb822.GpgInfo.from_output(
'[GNUPG:] ERRSIG DEADBEEF'))
self.mox.ReplayAll()
pkg = self.SourcePackage('example', '1.0-1', 'main',
workdir=self.workdir, mirrors=[mirror])
self.assertRaises(ubuntutools.archive.DownloadError, pkg.pull)

View File

@ -25,6 +25,8 @@ WHITELIST = [re.compile(': %s$' % x) for x in (
r"No name '\w+Error' in module 'launchpadlib\.errors'",
# http://www.logilab.org/ticket/51250:
r"Module 'hashlib' has no '(md5|sha(1|224|256|384|512))' member",
# mox:
r"Instance of 'GpgInfo' has no 'WithSideEffects' member",
)]
class PylintTestCase(unittest.TestCase):