#!/usr/bin/python # -*- coding: UTF-8 -*- # Copyright (C) 2008 Terence Simpson # License: # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # This script simulates «dget»'s behaviour for files hosted at # launchpadlibrarian.net. # # Detailed description: # This script attempts to download the source package in the same # way as dget does, but from launchpadlibrarian.net, which doesn't # store all the files in the same directory. It (the script) assumes # that the files are stored in sequential directories on Launchpad # Librarian and attempts to download and then unpack them. # This is a Python rewrite of the original bash script import cStringIO import email.feedparser import hashlib import optparse import os import sys import urllib2 try: import GnuPGInterface except ImportError: print >> sys.stderr, ("Please install 'python-gnupginterface' in order to " "use this utility.") sys.exit(1) from ubuntutools import subprocess import ubuntutools.misc USAGE = u"""Usage: %prog [-d|(-v|-q)] This scripts simulates «dget»'s behaviour for files hosted at launchpadlibrarian.net. If you specify the -d option then it won't do anything, except download the .dsc file, but just print the commands it would run otherwise. Example: %prog http://launchpadlibrarian.net/10348157/coreutils_5.97-5.4ubuntu1.dsc """ BASE_URL = "http://launchpadlibrarian.net/" Debug = Verbose = Quiet = False def unsign(data): if data.splitlines()[0] != "-----BEGIN PGP SIGNED MESSAGE-----": return data oldstdout = sys.stdout oldstderr = sys.stderr sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ gpg = GnuPGInterface.GnuPG() proc = gpg.run(["--decrypt"], create_fhs=['stdin', 'stdout']) proc.handles['stdin'].write(data) proc.handles['stdin'].close() plain = proc.handles['stdout'].read() proc.handles['stdout'].close() try: proc.wait() except: pass sys.stdout = oldstdout sys.stderr = oldstderr return plain def get_entries(data): parser = email.feedparser.FeedParser() parser.feed(data) return parser.close() class DscParse(object): """Attempt to get the file list from the .dsc file""" def __init__(self, data): """ __init__(data) Given the contents of a .dsc, parse it and extract it's content """ self.entries = get_entries(unsign(data)) self.files = [x.strip().split() for x in self.entries['Files'].splitlines()] def verify_all(self): """ verify_all() Verifies all the files, first checking the size, then the md5 sum. Currently not used in this utility. """ assert self.files, "I have no files" ret = [] for f in self.files: ret.append(self.verify(f)) return ret def verify(self, name): """ verify(name) Verify the file 'name', first checking the size, then the md5 sum. """ assert self.files, "I have no files" f = None if isinstance(name, list): f = name else: for i in self.files: if i[2] == name: f = i if not f: raise ValueError, "%s is not in the .dsc" % name (md5sum, size, name) = tuple(f) stat = os.stat(name) if str(stat.st_size) != size: return (False, name, "Expected a size of %s, got %s" % \ (size, stat.st_size)) return self.getsum(name, md5sum) def getsum(self, name, md5sum=None): """ getsum(name[, md5sum]) Read the file 'name' (in 1MB chunks) and generate an md5 sum, then compares that to the md5 sum in the .dsc file. """ chunk_size = 1073741824 fd = open(name, 'rb') res = hashlib.md5() if not md5sum: assert self.files, "I have no files" md5sum = [x[0] for x in self.files if x[2] == name][0] data = fd.read(chunk_size) while data: res.update(data) data = fd.read(chunk_size) if res.hexdigest() != md5sum: return (False, name, "Expected md5sum of %r, got %r" % \ (md5sum, res.hexdigest())) return (True, name, None) def is_native(self): """ is_native() Returns True if this .dsc describes a native debian package; else false. """ return len(self.files) == 1 # Access to fields in the .dsc via a dict-like interface def __getitem__(self, item): """ x.__getitem(item) -> x[item] """ return self.entries.__getitem__(item) def __contains__(self, item): """ x.__contains__(item) -> item in x """ return self.entries.__contains__(item) def __getattr__(self, attr): """ x.__getattr__(attr) -> item.attr """ return getattr(self.entries, attr) def error(ret, msg, *args): """Prints an error message, unless quiet is set, and exits with ret""" if not Quiet: print >> sys.stderr, msg % args sys.exit(ret) def debug(msg, *args): """If debugging is enabled, print a message""" if Debug: print >> sys.stderr, msg % args def info(msg, *args): """If verbose is enabled, print a message""" if Verbose: print msg % tuple(args) def status(msg, *args): """Prints a message, unless quiet is enabled""" if not Quiet: print msg % tuple(args) def download(dscinfo, number, filename, verify=True): """download filename""" ftype = filename.endswith(".diff.gz") and "diff.gz" or \ filename.endswith(".orig.tar.gz") and "orig.tar.gz" or \ filename.endswith(".dsc") and "dsc" or "tar.gz" if verify and os.path.exists(filename): info('Verifying "%s"', filename) res = dscinfo.verify(filename) if not res[0]: error(104, "Verification of %s failed: %s", filename, res[2]) status("Getting %s", filename) debug("%s%s/%s", BASE_URL, number, filename) try: fd = urllib2.urlopen("%s%s/%s" % (BASE_URL, number, filename)) outfd = open(filename, 'wb') outfd.write(fd.read()) fd.close() outfd.close() except urllib2.HTTPError, err: status(u"Failed to fetch «%s» file, aborting.", ftype) error(106, "Error: (%d %s)", err.code, err.msg) except urllib2.URLError, err: status(u"Failed to fetch «%s» file, aborting.", ftype) error(105, "Error: %s", err) except IOError, err: status('Could not create "%s"', filename) error(107, "Error: %s", err) def unpack(filename): out = open('/dev/null', 'w') err = open('/dev/null', 'w') cmd = ["dpkg-source", "-x", filename] ret = subprocess.call(cmd, stdout=out, stderr=err) out.close() err.close() if ret: status("Failed to unpack source, aborting.") sys.exit(108) def get_host(url): return urllib2.splithost(urllib2.splittype(url)[1])[0] def main(): global Debug, Verbose, Quiet parser = optparse.OptionParser(usage=USAGE) parser.add_option("-d", "--debug", action="store_true", dest="debug", default=False, help="Enable debugging") parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False, help="Enable verbose output") parser.add_option("-q", "--quiet", action="store_true", dest="quiet", default=False, help="Never print any output") (options, args) = parser.parse_args() ubuntutools.misc.require_utf8() if len(args) != 1: parser.error("Missing URL") Debug = options.debug Verbose = options.verbose Quiet = options.quiet if Verbose and Quiet: error(4, "Specifying both --verbose and --quiet does not make sense") if Quiet: sys.stderr = cStringIO.StringIO() sys.stdout = cStringIO.StringIO() url = args[0] if url.startswith("https://"): url = url.replace("https://", "http://", 1) if not url.startswith("http://"): url = "http://" + url if get_host(url).startswith("www."): url = url.replace("www.", "", 1) if get_host(url) != get_host(BASE_URL): error(1, "Error: This utility only works for files on %s.\n" "Maybe you want to try dget?", BASE_URL) (number, filename) = url.split('/')[3:] if not filename.endswith('.dsc'): error(2, "You have to provide the URL for the .dsc file.") try: number = int(number) except: error(3, "Bad URL format") if os.path.exists(filename): os.remove(filename) download(None, number, filename, False) try: fd = open(filename) dsc_data = fd.read() fd.close() except Exception: status("Error: Please report this bug, providing the URL and attach" " the following backtrace") raise dscinfo = DscParse(dsc_data) # launchpadlibrarian.net seems to store in this order: # For native packages: # /.changes # +1/.tar.gz # +2/.dsc # For non-native packages: # /.changes # +1/.orig.tar.gz # +2/.diff.gz # +3/.dsc ## # *Assuming* this does not change, we can figure out where the files are on # launchpadlibrarian.net relative to the .dsc file we're given. # Only one file listed in the .dsc means it's native package if len(dscinfo.files) == 1: download(dscinfo, number-1, dscinfo.files[0][-1]) # .tar.gz else: download(dscinfo, number-1, dscinfo.files[1][-1]) # .diff.gz download(dscinfo, number-2, dscinfo.files[0][-1]) # .orig.tar.gz status("Unpacking") unpack(filename) if __name__ == "__main__": main()