mirror of
				https://git.launchpad.net/ubuntu-dev-tools
				synced 2025-11-04 07:54:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			332 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			332 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python
 | 
						|
# -*- coding: UTF-8 -*-
 | 
						|
# Copyright (C) 2008 Terence Simpson <tsimpson@ubuntu.com>
 | 
						|
# License:
 | 
						|
#  This program is free software; you can redistribute it and/or modify
 | 
						|
#  it under the terms of the GNU General Public License as published by
 | 
						|
#  the Free Software Foundation; either version 2 of the License, or
 | 
						|
#  (at your option) any later version.
 | 
						|
#
 | 
						|
#  This program is distributed in the hope that it will be useful,
 | 
						|
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
#  GNU General Public License for more details.
 | 
						|
#
 | 
						|
#  You should have received a copy of the GNU General Public License along
 | 
						|
#  with this program; if not, write to the Free Software Foundation, Inc.,
 | 
						|
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 | 
						|
#
 | 
						|
# This script simulates «dget»'s behaviour for files hosted at
 | 
						|
# launchpadlibrarian.net.
 | 
						|
#
 | 
						|
# Detailed description:
 | 
						|
# This script attempts to download the source package in the same
 | 
						|
# way as dget does, but from launchpadlibrarian.net, which doesn't
 | 
						|
# store all the files in the same directory. It (the script) assumes
 | 
						|
# that the files are stored in sequential directories on Launchpad
 | 
						|
# Librarian and attempts to download and then unpack them.
 | 
						|
# This is a Python rewrite of the original bash script
 | 
						|
 | 
						|
import cStringIO
 | 
						|
import email.feedparser
 | 
						|
import hashlib
 | 
						|
import optparse
 | 
						|
import os
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import urllib2
 | 
						|
 | 
						|
try:
 | 
						|
    import GnuPGInterface
 | 
						|
except ImportError:
 | 
						|
    print >> sys.stderr, ("Please install 'python-gnupginterface' in order to "
 | 
						|
                          "use this utility.")
 | 
						|
    sys.exit(1)
 | 
						|
 | 
						|
USAGE = u"""Usage: %prog [-d|(-v|-q)] <Launchpad URL>
 | 
						|
 | 
						|
This scripts simulates «dget»'s behaviour for files hosted at
 | 
						|
launchpadlibrarian.net.
 | 
						|
 | 
						|
If you specify the -d option then it won't do anything, except download the
 | 
						|
.dsc file, but just print the commands it would run otherwise.
 | 
						|
 | 
						|
Example:
 | 
						|
    %prog http://launchpadlibrarian.net/10348157/coreutils_5.97-5.4ubuntu1.dsc
 | 
						|
"""
 | 
						|
 | 
						|
BASE_URL = "http://launchpadlibrarian.net/"
 | 
						|
 | 
						|
Debug = Verbose = Quiet = False
 | 
						|
 | 
						|
def unsign(data):
 | 
						|
    if data.splitlines()[0] != "-----BEGIN PGP SIGNED MESSAGE-----":
 | 
						|
        return data
 | 
						|
    oldstdout = sys.stdout
 | 
						|
    oldstderr = sys.stderr
 | 
						|
    sys.stdout = sys.__stdout__
 | 
						|
    sys.stderr = sys.__stderr__
 | 
						|
    gpg = GnuPGInterface.GnuPG()
 | 
						|
    proc = gpg.run(["--decrypt"], create_fhs=['stdin', 'stdout'])
 | 
						|
    proc.handles['stdin'].write(data)
 | 
						|
    proc.handles['stdin'].close()
 | 
						|
    plain = proc.handles['stdout'].read()
 | 
						|
    proc.handles['stdout'].close()
 | 
						|
    try:
 | 
						|
        proc.wait()
 | 
						|
    except:
 | 
						|
        pass
 | 
						|
    sys.stdout = oldstdout
 | 
						|
    sys.stderr = oldstderr
 | 
						|
    return plain
 | 
						|
 | 
						|
def get_entries(data):
 | 
						|
    parser = email.feedparser.FeedParser()
 | 
						|
    parser.feed(data)
 | 
						|
    return parser.close()
 | 
						|
 | 
						|
class DscParse(object):
 | 
						|
    """Attempt to get the file list from the .dsc file"""
 | 
						|
    def __init__(self, data):
 | 
						|
        """
 | 
						|
        __init__(data)
 | 
						|
        Given the contents of a .dsc, parse it and extract it's content
 | 
						|
        """
 | 
						|
        self.entries = get_entries(unsign(data))
 | 
						|
        self.files = [x.strip().split() for x in
 | 
						|
                      self.entries['Files'].splitlines()]
 | 
						|
 | 
						|
    def verify_all(self):
 | 
						|
        """
 | 
						|
        verify_all()
 | 
						|
        Verifies all the files, first checking the size, then the md5 sum.
 | 
						|
        Currently not used in this utility.
 | 
						|
        """
 | 
						|
        assert self.files, "I have no files"
 | 
						|
        ret = []
 | 
						|
        for f in self.files:
 | 
						|
            ret.append(self.verify(f))
 | 
						|
        return ret
 | 
						|
 | 
						|
    def verify(self, name):
 | 
						|
        """
 | 
						|
        verify(name)
 | 
						|
        Verify the file 'name', first checking the size, then the md5 sum.
 | 
						|
        """
 | 
						|
        assert self.files, "I have no files"
 | 
						|
        f = None
 | 
						|
        if isinstance(name, list):
 | 
						|
            f = name
 | 
						|
        else:
 | 
						|
            for i in self.files:
 | 
						|
                if i[2] == name:
 | 
						|
                    f = i
 | 
						|
        if not f:
 | 
						|
            raise ValueError, "%s is not in the .dsc" % name
 | 
						|
        (md5sum, size, name) = tuple(f)
 | 
						|
        stat = os.stat(name)
 | 
						|
        if str(stat.st_size) != size:
 | 
						|
            return (False, name, "Expected a size of %s, got %s" % \
 | 
						|
                (size, stat.st_size))
 | 
						|
        return self.getsum(name, md5sum)
 | 
						|
 | 
						|
    def getsum(self, name, md5sum=None):
 | 
						|
        """
 | 
						|
        getsum(name[, md5sum])
 | 
						|
        Read the file 'name' (in 1MB chunks) and generate an md5 sum,
 | 
						|
        then compares that to the md5 sum in the .dsc file.
 | 
						|
        """
 | 
						|
        chunk_size = 1073741824
 | 
						|
        fd = open(name, 'rb')
 | 
						|
        res = hashlib.md5()
 | 
						|
        if not md5sum:
 | 
						|
            assert self.files, "I have no files"
 | 
						|
            md5sum = [x[0] for x in self.files if x[2] == name][0]
 | 
						|
        data = fd.read(chunk_size)
 | 
						|
        while data:
 | 
						|
            res.update(data)
 | 
						|
            data = fd.read(chunk_size)
 | 
						|
        if res.hexdigest() != md5sum:
 | 
						|
            return (False, name, "Expected md5sum of %r, got %r" % \
 | 
						|
                                 (md5sum, res.hexdigest()))
 | 
						|
        return (True, name, None)
 | 
						|
 | 
						|
    def is_native(self):
 | 
						|
        """
 | 
						|
        is_native()
 | 
						|
        Returns True if this .dsc describes a native debian package;
 | 
						|
        else false.
 | 
						|
        """
 | 
						|
        return len(self.files) == 1
 | 
						|
 | 
						|
    # Access to fields in the .dsc via a dict-like interface
 | 
						|
    def __getitem__(self, item):
 | 
						|
        """
 | 
						|
        x.__getitem(item) -> x[item]
 | 
						|
        """
 | 
						|
        return self.entries.__getitem__(item)
 | 
						|
 | 
						|
    def __contains__(self, item):
 | 
						|
        """
 | 
						|
        x.__contains__(item) -> item in x
 | 
						|
        """
 | 
						|
        return self.entries.__contains__(item)
 | 
						|
 | 
						|
    def __getattr__(self, attr):
 | 
						|
        """
 | 
						|
        x.__getattr__(attr) -> item.attr
 | 
						|
        """
 | 
						|
        return getattr(self.entries, attr)
 | 
						|
 | 
						|
def error(ret, msg, *args):
 | 
						|
    """Prints an error message, unless quiet is set, and exits with ret"""
 | 
						|
    if not Quiet:
 | 
						|
        print >> sys.stderr, msg % args
 | 
						|
    sys.exit(ret)
 | 
						|
 | 
						|
def debug(msg, *args):
 | 
						|
    """If debugging is enabled, print a message"""
 | 
						|
    if Debug:
 | 
						|
        print >> sys.stderr, msg % args
 | 
						|
 | 
						|
def info(msg, *args):
 | 
						|
    """If verbose is enabled, print a message"""
 | 
						|
    if Verbose:
 | 
						|
        print msg % tuple(args)
 | 
						|
 | 
						|
def status(msg, *args):
 | 
						|
    """Prints a message, unless quiet is enabled"""
 | 
						|
    if not Quiet:
 | 
						|
        print msg % tuple(args)
 | 
						|
 | 
						|
def download(dscinfo, number, filename, verify=True):
 | 
						|
    """download filename"""
 | 
						|
    ftype = filename.endswith(".diff.gz") and "diff.gz" or \
 | 
						|
        filename.endswith(".orig.tar.gz") and "orig.tar.gz" or \
 | 
						|
        filename.endswith(".dsc") and "dsc" or "tar.gz"
 | 
						|
    if verify and os.path.exists(filename):
 | 
						|
        info('Verifying "%s"', filename)
 | 
						|
        res = dscinfo.verify(filename)
 | 
						|
        if not res[0]:
 | 
						|
            error(104, "Verification of %s failed: %s", filename, res[2])
 | 
						|
    status("Getting %s", filename)
 | 
						|
    debug("%s%s/%s", BASE_URL, number, filename)
 | 
						|
    try:
 | 
						|
        fd = urllib2.urlopen("%s%s/%s" % (BASE_URL, number, filename))
 | 
						|
        outfd = open(filename, 'wb')
 | 
						|
        outfd.write(fd.read())
 | 
						|
        fd.close()
 | 
						|
        outfd.close()
 | 
						|
    except urllib2.HTTPError, err:
 | 
						|
        status("Failed to fetch «%s» file, aborting.", ftype)
 | 
						|
        error(106, "Error: (%d %s)", err.code, err.msg)
 | 
						|
    except urllib2.URLError, err:
 | 
						|
        status("Failed to fetch «%s» file, aborting.", ftype)
 | 
						|
        error(105, "Error: %s", err)
 | 
						|
    except IOError, err:
 | 
						|
        status('Could not create "%s"', filename)
 | 
						|
        error(107, "Error: %s", err)
 | 
						|
 | 
						|
def unpack(filename):
 | 
						|
    out = open('/dev/null', 'w')
 | 
						|
    err = open('/dev/null', 'w')
 | 
						|
    cmd = ["dpkg-source", "-x", filename]
 | 
						|
    ret = subprocess.call(cmd, stdout=out, stderr=err)
 | 
						|
    out.close()
 | 
						|
    err.close()
 | 
						|
    if ret:
 | 
						|
        status("Failed to unpack source, aborting.")
 | 
						|
        sys.exit(108)
 | 
						|
 | 
						|
def get_host(url):
 | 
						|
    return urllib2.splithost(urllib2.splittype(url)[1])[0]
 | 
						|
 | 
						|
def main():
 | 
						|
    global Debug, Verbose, Quiet
 | 
						|
    parser = optparse.OptionParser(usage=USAGE)
 | 
						|
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
 | 
						|
                      default=False, help="Enable debugging")
 | 
						|
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
 | 
						|
                      default=False, help="Enable verbose output")
 | 
						|
    parser.add_option("-q", "--quiet", action="store_true", dest="quiet",
 | 
						|
                      default=False, help="Never print any output")
 | 
						|
 | 
						|
    (options, args) = parser.parse_args()
 | 
						|
    if len(args) != 1:
 | 
						|
        parser.error("Missing URL")
 | 
						|
    Debug = options.debug
 | 
						|
    Verbose = options.verbose
 | 
						|
    Quiet = options.quiet
 | 
						|
    if Verbose and Quiet:
 | 
						|
        error(4, "Specifying both --verbose and --quiet does not make sense")
 | 
						|
    if Quiet:
 | 
						|
        sys.stderr = cStringIO.StringIO()
 | 
						|
        sys.stdout = cStringIO.StringIO()
 | 
						|
 | 
						|
    url = args[0]
 | 
						|
 | 
						|
    if url.startswith("https://"):
 | 
						|
        url = url.replace("https://", "http://", 1)
 | 
						|
 | 
						|
    if not url.startswith("http://"):
 | 
						|
        url = "http://" + url
 | 
						|
 | 
						|
    if get_host(url).startswith("www."):
 | 
						|
        url = url.replace("www.", "", 1)
 | 
						|
 | 
						|
    if get_host(url) != get_host(BASE_URL):
 | 
						|
        error(1, "Error: This utility only works for files on %s.\n"
 | 
						|
                 "Maybe you want to try dget?", BASE_URL)
 | 
						|
 | 
						|
    (number, filename) = url.split('/')[3:]
 | 
						|
 | 
						|
    if not filename.endswith('.dsc'):
 | 
						|
        error(2, "You have to provide the URL for the .dsc file.")
 | 
						|
 | 
						|
    try:
 | 
						|
        number = int(number)
 | 
						|
    except:
 | 
						|
        error(3, "Bad URL format")
 | 
						|
 | 
						|
    if os.path.exists(filename):
 | 
						|
        os.remove(filename)
 | 
						|
 | 
						|
    download(None, number, filename, False)
 | 
						|
    try:
 | 
						|
        fd = open(filename)
 | 
						|
        dsc_data = fd.read()
 | 
						|
        fd.close()
 | 
						|
    except Exception:
 | 
						|
        status("Error: Please report this bug, providing the URL and attach"
 | 
						|
               " the following backtrace")
 | 
						|
        raise
 | 
						|
 | 
						|
    dscinfo = DscParse(dsc_data)
 | 
						|
 | 
						|
# launchpadlibrarian.net seems to store in this order:
 | 
						|
# For native packages:
 | 
						|
# <number>/.changes
 | 
						|
# <number>+1/.tar.gz
 | 
						|
# <number>+2/.dsc
 | 
						|
# For non-native packages:
 | 
						|
# <number>/.changes
 | 
						|
# <number>+1/.orig.tar.gz
 | 
						|
# <number>+2/.diff.gz
 | 
						|
# <number>+3/.dsc
 | 
						|
##
 | 
						|
# *Assuming* this does not change, we can figure out where the files are on
 | 
						|
# launchpadlibrarian.net relative to the .dsc file we're given.
 | 
						|
 | 
						|
# Only one file listed in the .dsc means it's native package
 | 
						|
    if len(dscinfo.files) == 1:
 | 
						|
        download(dscinfo, number-1, dscinfo.files[0][-1]) # .tar.gz
 | 
						|
    else:
 | 
						|
        download(dscinfo, number-1, dscinfo.files[1][-1]) # .diff.gz
 | 
						|
        download(dscinfo, number-2, dscinfo.files[0][-1]) # .orig.tar.gz
 | 
						|
 | 
						|
    status("Unpacking")
 | 
						|
    unpack(filename)
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    main()
 |