#!/usr/bin/python3 -u
#-*- encoding: utf-8 -*-
"""
This script can be called as "lp-in-release" or as "magic-proxy". When called
under the former name, it acts as a CLI tool; when called under the latter name
it acts as a transparent HTTP proxy.

The CLI tool parses the directory listing of

    http://<mirror>/dists/suite/by-hash/SHA256

and figures out which hashes belong to an InRelease file. For example, to list
all available hashes for "cosmic" run

    ./lp-in-release list --suite cosmic

By default the script scans archive.ubuntu.com, but you can tell it to use a
different mirror with the --mirror command line parameter. Analogously, you
can list the hashes for "cosmic-updates" or "cosmic-security". The script can
also find the hash that was valid at a given timestamp via

    ./lp-in-release select --suite cosmic --cutoff-time <timestamp>

Finally, you can use the script to inject inrelease-path settings into a
sources.list file via

    ./lp-in-release inject --cutoff-time <timestamp> /etc/apt/sources.list

The proxy is just an extension to this functionality. Whenever a URL points at
an InRelease file or a path listed in an InRelease file, the proxy will
automatically inject the by-hash URL for the resource according to the timestamp
it was configured for. The proxy works in transparent and non-transparent mode.
"""
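
# Illustrative example (not part of the upstream interface): the proxy is
# started with a cutoff timestamp and can then be used like a plain HTTP
# proxy by the build environment, e.g.
#
#     ./magic-proxy --cutoff-time 1525000000 --address 127.0.0.1 --port 8080
#     http_proxy=http://127.0.0.1:8080 apt-get update
#
# How clients are pointed at the proxy (environment variable, iptables
# redirect, ...) is up to the caller; the flags shown above match the
# argparse definitions in MagicHTTPProxyCli._parse_opts below.
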
from datetime import datetime, timedelta, tzinfo

import argparse
import copy
import fcntl
import getopt
import hashlib
import http.client
import http.server
import json
import os
import pwd
import re
import shutil
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request

EXIT_OK  = 0
EXIT_ERR = 1

class LPInReleaseBaseError(Exception):
    pass

class LPInReleaseIndexError(LPInReleaseBaseError):
    pass

class LPInReleaseCacheError(LPInReleaseBaseError):
    pass

class LPInReleaseProxyError(LPInReleaseBaseError):
    pass

class InRelease:
    """This class represents an InRelease file."""

    def __init__(self, mirror, suite, data, hash_=None, last_modified=None):
        """mirror must contain the proper URL of the package repository up to
        the "dists" folder, e.g.

        http://archive.ubuntu.com/ubuntu

        suite is the name of the suite this InRelease file belongs to, e.g.
        <release>, <release>-updates or <release>-security.

        data must contain the full contents of the InRelease file as a unicode
        string.

        If supplied, then hash_ will be used as the sha256 hexdigest of the
        binary encoding of the InRelease file. If not supplied, the hash will
        be calculated. This is just used as a time-saver, when cache contents
        are read back in.

        last_modified must be a string of format

        Thu, 26 Apr 2018 23:37:48 UTC

        representing the publication time of the InRelease file. If not given,
        the generation time stored in the InRelease file will be used. Below,
        this is set explicitly to correspond to the Last-Modified header spat
        out by the Web server.
        """
        self.mirror = mirror
        self.suite  = suite
        self.data   = data
        self.dict   = {}

        if hash_:
            self.hash = hash_
        else:
            h = hashlib.sha256()
            h.update(data.encode("utf-8"))
            self.hash = h.hexdigest()

        if last_modified:
            self.published = self._parse_datetime(last_modified)
        else:
            self.published = self._extract_timestamp(data)

    @property
    def datetime(self):
        """Return the publication time of this InRelease file as a string in
        YYYY-MM-DD HH:MM:SS ISO format. The result is always in GMT."""
        return datetime \
                .utcfromtimestamp(self.published) \
                .strftime('%Y-%m-%d %H:%M:%S')

    @property
    def normalized_address(self):
        """Return the "normalized" address of the mirror URL, consisting of
        only the hostname and the path. This may be used as an index into an
        LPInReleaseCache."""
        result  = urllib.parse.urlparse(self.mirror)
        address = result.hostname + result.path.rstrip("/")
        return address

    @property
    def contents(self):
        """Return the pure contents of the InRelease file with the signature
        stripped off."""
        return self._split_release_and_sig(self.data)[0]

    @property
    def signature(self):
        """Return the ASCII-armored PGP signature of the InRelease file."""
        return self._split_release_and_sig(self.data)[1]

    def serialize(self):
        """Serializes the InRelease object into Python structures to be stored
        in an LPInReleaseCache."""
        month_names = [ "_ignore_",
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
        ]

        wkday_names = [
            "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun",
        ]

        dt = datetime.utcfromtimestamp(self.published)

        published = "{}, {:02} {} {} {:02}:{:02}:{:02} GMT".format(
            wkday_names[dt.weekday()],
            dt.day,
            month_names[dt.month],
            dt.year,
            dt.hour,
            dt.minute,
            dt.second
        )

        return {
            "mirror":    self.mirror,
            "suite":     self.suite,
            "hash":      self.hash,
            "published": published,
            "data":      self.data,
        }

    def get_hash_for(self, path):
        """Check if the given path is listed in this InRelease file and if so
        return the corresponding hash in hexdigest format. If the path is not
        listed, None is returned."""
        if not self.dict:
            self._parse_contents()
        return self.dict.get(path)

    def _parse_contents(self):
        """This method parses out all lines containing SHA256 hashes and creates
        an internal dict, mapping resources to hashes."""
        regex = re.compile(
            r" (?P<hash>[0-9a-f]{64})\s+(?P<size>\d+)\s+(?P<path>\S+)")

        for line in self.contents.splitlines():
            m = regex.match(line)
            if not m:
                continue
            self.dict[m.group("path")] = m.group("hash")

    def _parse_datetime(self, datetime_string):
        """Because the behavior of Python's strptime's would be
 | 
						|
        locale-dependent, we parse datetime strings of the format found in
        Last-Modified HTTP headers ourselves. This returns an integer
        representing a posix timestamp or None, if the parsing failed."""
        class UTC(tzinfo):
            def utcoffset(self, dt):
                return timedelta(0)

        # we need a map, because strptime would be locale-dependent
        month_name_to_number = {
            "Jan":  1, "Feb":  2, "Mar":  3, "Apr":  4, "May":  5, "Jun":  6,
            "Jul":  7, "Aug":  8, "Sep":  9, "Oct": 10, "Nov": 11, "Dec": 12
        }

        rexpr = r"""^\s*\w+,\s+
                    (?P<day>\d+)   \s+
                    (?P<month>\w+) \s+
                    (?P<year>\d+)  \s+
                    (?P<hour>\d+)  :
                    (?P<min>\d+)   :
                    (?P<sec>\d+)   .*$"""

        m = re.match(rexpr, datetime_string, flags=re.VERBOSE)
        if not m:
            return None

        parts    = list(m.group("year", "month", "day", "hour", "min", "sec"))
        parts[1] = month_name_to_number[m.group("month")]
        parts    = [int(s) for s in parts]
        dt       = datetime(*parts, tzinfo=UTC())
        epoch    = datetime(1970, 1, 1, tzinfo=UTC())
        posix    = (dt - epoch).total_seconds()

        return int(posix)

    def _extract_timestamp(self, data):
        """Parse the contents of the InRelease file to find the time it was
        generated. Returns a POSIX timestamp if found or None otherwise."""
        for line in data.splitlines():
            if line.startswith("Date:"):
                return self._parse_datetime(line.split(":", 1)[1])

        return None

    def _split_release_and_sig(self, data):
        """Split the InRelease file into content and signature parts and return
        a tuple of unicode strings (content, signature)."""
        rexpr = re.escape("-----BEGIN PGP SIGNED MESSAGE-----") + r"\r?\n|" + \
                re.escape("-----BEGIN PGP SIGNATURE-----"     ) + r"\r?\n|" + \
                re.escape("-----END PGP SIGNATURE-----"       )

        # returns content and signature
        return re.split(rexpr, data)[1:3]
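
# Illustrative sketch of how InRelease objects are used elsewhere in this
# file (the file name and values below are made up for the example):
#
#     data = open("InRelease", encoding="utf-8").read()
#     inrel = InRelease("http://archive.ubuntu.com/ubuntu", "cosmic", data)
#     print(inrel.datetime)                          # publication time (GMT)
#     print(inrel.get_hash_for("main/binary-amd64/Packages.gz"))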


class LPInReleaseCache:
    """A cache for InRelease files that can optionally be saved to and
    loaded from disk."""
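
    # Descriptive comment added for clarity: the in-memory (and on-disk JSON)
    # layout built by add() and read by get_one()/get_all() is a nested dict,
    # roughly
    #
    #     {normalized_address: {suite: {sha256_hash: serialized InRelease}}}
    #
    # See InRelease.serialize for the innermost structure.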

    def __init__(self, filename=None):
        """If filename is given, it is the name of the file that cache contents
        will be saved to or loaded from when the save and load methods are
        called, respectively."""
        self._filename = filename
        self._data     = {}
        self._lock     = threading.Lock()

        self.load()

    def load(self):
        """Load the cache contents from disk performing some rudimentary file
        locking to prevent corruption."""
        if not self._filename:
            return

        buf = []
        fd  = None
        try:
            fd = os.open(self._filename, os.O_CREAT | os.O_RDWR)

            fcntl.flock(fd, fcntl.LOCK_EX)

            while True:
                tmp = os.read(fd, 4096)
                if not tmp:
                    break
                buf.append(tmp)

            fcntl.flock(fd, fcntl.LOCK_UN)
        except OSError as e:
            raise LPInReleaseCacheError("Failed to load cache file: {}"
                    .format(str(e)))
        finally:
            if fd:
                os.close(fd)

        cache_data = {} if not buf else json.loads(
                b"".join(buf).decode("utf-8"))

        with self._lock:
            self._data = cache_data

    def save(self):
        """Save the cache contents to disk performing some rudimentary file
        locking to prevent corruption."""
        if not self._filename:
            return

        with self._lock:
            buf = json \
                .dumps(self._data, ensure_ascii=False, indent=4,
                        sort_keys=True) \
                .encode("utf-8")

        fd = None
        try:
            fd = os.open(self._filename, os.O_CREAT | os.O_RDWR)

            fcntl.flock(fd, fcntl.LOCK_EX)

            os.ftruncate(fd, 0)
            os.write(fd, buf)

            fcntl.flock(fd, fcntl.LOCK_UN)
        except OSError as e:
            raise LPInReleaseCacheError("Failed to store cache file: {}"
                    .format(str(e)))
        finally:
            if fd:
                os.close(fd)

    def add(self, inrelease):
        """Add the given InRelease object to the cache."""
        with self._lock:
            self._data \
                .setdefault(inrelease.normalized_address, {}) \
                .setdefault(inrelease.suite, {}) \
                .setdefault(inrelease.hash, inrelease.serialize())

    def get_one(self, mirror, suite, hash_):
        """Return a single InRelease object for the given mirror and suite,
        corresponding to the hash or None if such an entry does not exist."""
        with self._lock:
            url_obj = urllib.parse.urlparse(mirror)
            address = url_obj.hostname + url_obj.path.rstrip("/")

            inrel = self._data\
                .get(address, {})\
                .get(suite, {})\
                .get(hash_)

            if not inrel:
                return None

            return InRelease(
                inrel["mirror"],
                inrel["suite"],
                inrel["data"],
                hash_=inrel["hash"],
                last_modified=inrel["published"]
            )

    def get_all(self, mirror, suite):
        """Retrieve a list of InRelease objects for the given mirror and suite.
 | 
						|
        Return a list of all known InRelease objects for the given mirror and
 | 
						|
        suite."""
 | 
						|
        with self._lock:
            url_obj = urllib.parse.urlparse(mirror)
            address = url_obj.hostname + url_obj.path.rstrip("/")

            inrel_by_hash = self._data\
                .get(address, {})\
                .get(suite, {})

            inrelease_list = []

            for hash_, inrel in inrel_by_hash.items():
                inrelease_list.append(
                    InRelease(
                        inrel["mirror"],
                        inrel["suite"],
                        inrel["data"],
                        hash_=inrel["hash"],
                        last_modified=inrel["published"]
                    )
                )

            return inrelease_list


class LPInReleaseIndex:
    """Abstraction to the build system's view of the "by hash" database.
 | 
						|
    Currently, that interface is the by-hash directory listing of the Web
    server."""

    def __init__(self, mirror, suite, cache=None):
        """The mirror is the base URL of the repository up to the "dists"
        folder, e.g.

        http://archive.ubuntu.com/ubuntu

        suite is the name of the suite this LPInReleaseIndex object operates on,
        e.g. <release>, <release>-updates or <release>-security.

        Optionally, cache can be initialized to an LPInReleaseCache object, in
        which case all look-ups will first go to the cache and only cache
        misses will result in requests to the Web server.
        """
        self._mirror = mirror
        self._suite  = suite
        self._cache  = cache

        self._base_url = "/".join([self._mirror, "dists", self._suite,
            "by-hash/SHA256"])

    def inrelease_files(self):
        """Iterate over all InRelease files found in the archive for the mirror
        and suite this index has been configured to operate on."""
        hashes = self._retrieve_hashes()

        for h in hashes:
            inrelease = None

            if self._cache:
                inrelease = self._cache.get_one(self._mirror,
                                self._suite, hash_=h)
            if not inrelease:
                inrelease = self._retrieve_inrelease(h)
            if not inrelease:
                continue

            yield inrelease

    def get_inrelease_for_timestamp(self, time_gmt):
        """Find and return the InRelease file that was valid at the given Posix
        timestamp."""
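        # Descriptive note: of all snapshots published at or before time_gmt,
        # the most recent one wins. For example, with snapshots published at
        # t1 < t2 <= time_gmt < t3, the snapshot from t2 is returned; if none
        # was published at or before time_gmt, the result is None.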
        candidate = None

        for inrelease in self.inrelease_files():
            if inrelease.published > time_gmt:
                continue
            if not candidate or inrelease.published > candidate.published:
                candidate = inrelease

        return candidate

    def _retrieve_inrelease(self, hash_):
        """Retrieve the contents of the file identified by hash_. Check if the
        file is an InRelease file and return a corresponding InRelease object.
        If the hash_ does not belong to an InRelease file, None is returned."""
        _500KB = 500 * 1024

        buf = b""
        inrelease = None
        url = self._base_url + "/" + hash_

        try:
            with urllib.request.urlopen(url) as response:

                # InRelease files seem to be around 200-300KB

                content_length = response.headers.get("Content-Length")
                last_modified  = response.headers.get("Last-Modified")

                if not content_length:
                    buf = response.read(_500KB + 1)
                    content_length = len(buf)
                else:
                    content_length = int(content_length)

                # Slightly silly heuristic, but does the job

                if content_length > _500KB or content_length < 1024:
                    return None

                buf += response.read()

                content_encoding = self \
                    ._guess_content_encoding_for_response(response)

                # A few additional checks to see if this is an InRelease file

                try:
                    buf = buf.decode(content_encoding)
                except UnicodeError:
                    return None

                if not buf.startswith("-----BEGIN PGP SIGNED MESSAGE-----"):
                    return None

                for kw in ["Origin:", "Label:", "Suite:", "Acquire-By-Hash:"]:
                    if kw not in buf:
                        return None

                inrelease = InRelease(self._mirror, self._suite, buf,
                        hash_=hash_, last_modified=last_modified)

                if self._cache:
                    self._cache.add(inrelease)
        except urllib.error.HTTPError as e:
            if e.code not in [404]:
                raise LPInReleaseIndexError("Error retrieving {}: {}"
                    .format(url, str(e)))

        return inrelease

    def _guess_content_encoding_for_response(self, response):
        """Guess the content encoding of the given HTTPResponse object."""
        content_encoding = response.headers.get("Content-Encoding")
        content_type     = response.headers.get("Content-Type",
                "text/html;charset=UTF-8")

        if not content_encoding:
            m = re.match(r"^.*charset=(\S+)$", content_type)

            if m:
                content_encoding = m.group(1)
            else:
                content_encoding = "UTF-8"

        return content_encoding

    def _retrieve_hashes(self):
        """Retrieve all available by-hashes for the mirror and suite that this
        index is configured to operate on."""
        hashes = []

        if self._cache:
            cache_entry = self._cache.get_all(self._mirror, self._suite)
            if cache_entry:
                return [inrel.hash for inrel in cache_entry]

        try:
            with urllib.request.urlopen(self._base_url) as response:
                content_encoding = self._guess_content_encoding_for_response(
                        response)

                body   = response.read().decode(content_encoding)
                hashes = list(set(re.findall(r"[a-z0-9]{64}", body)))
        except urllib.error.URLError as e:
            raise LPInReleaseIndexError("Could not retrieve hash listing: {}"
                    .format(str(e)))

        return hashes
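
# Illustrative sketch (hypothetical values) of how this index is used by both
# the CLI and the proxy below:
#
#     index = LPInReleaseIndex("http://archive.ubuntu.com/ubuntu", "cosmic")
#     inrel = index.get_inrelease_for_timestamp(1525000000)
#     if inrel:
#         print(inrel.hash, inrel.datetime)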


class LPInReleaseIndexCli:
    """A CLI interface for LPInReleaseIndex."""

    def __init__(self, name):
        self._name      = name
        self._mirror    = None
        self._suite     = None
        self._timestamp = None
        self._cachefile = None
        self._cache     = None
        self._infile    = None
        self._outfile   = None

    def __call__(self, args):
        options = vars(self._parse_opts(args))

        # Copy settings to object attributes
        for key, value in options.items():
            if hasattr(self, "_" + key):
                setattr(self, "_" + key, value)

        if self._cachefile:
            self._cache = LPInReleaseCache(self._cachefile)

        try:
            options["func"]()
        except LPInReleaseIndexError as e:
            sys.stderr.write("{}: {}\n".format(self._name, str(e)))
            sys.exit(EXIT_ERR)

        if self._cache:
            self._cache.save()

    def list(self):
        """List all InRelease hashes for a given mirror and suite."""
        for inrelease in self._list(self._mirror, self._suite):
            if self._timestamp and inrelease.published > self._timestamp:
                continue

            print("{} {} ({})".format(
                inrelease.hash,
                inrelease.datetime,
                inrelease.published,
            ))

    def select(self):
        """Find the hash of the InRelease file valid at a given timestamp."""
        candidate = self._select(self._mirror, self._suite)

        if candidate:
            print("{} {} ({})".format(
                candidate.hash,
                candidate.datetime,
                candidate.published,
            ))

    def inject(self):
        """Inject by-hash and inrelease-path settings into a sources.list."""
        sources_list = self._infile

        if not os.path.exists(sources_list):
            sys.stderr.write("{}: No such file: {}.\n"
                    .format(self._name, sources_list))
            sys.exit(EXIT_ERR)

        with open(sources_list, "r", encoding="utf-8") as fp:
            buf = fp.read()

        rexpr = re.compile(r"""^
            (?P<type>deb(?:-src)?)\s+
            (?P<opts>\[[^\]]+\]\s+)?
            (?P<mirror>(?P<scheme>\S+):\S+)\s+
            (?P<suite>\S+)\s+
            (?P<comps>.*)$""", flags=re.VERBOSE)
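
        # For illustration (hypothetical line): a matching entry such as
        #
        #     deb http://archive.ubuntu.com/ubuntu cosmic main universe
        #
        # is rewritten below to a single line of the form
        #
        #     deb [by-hash=yes inrelease-path=by-hash/SHA256/<sha256>] http://archive.ubuntu.com/ubuntu cosmic main universe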

        lines = buf.splitlines(True)

        for i, line in enumerate(lines):
            line = lines[i]
            m = rexpr.match(line)

            if not m:
                continue
            if m.group("scheme") not in ["http", "https", "ftp"]:
                continue

            opts = {}
            if m.group("opts"):
                for entry in m.group("opts").strip().strip("[]").split():
                    k, v = entry.split("=")
                    opts[k] = v

            inrelease = self._select(m.group("mirror"), m.group("suite"))
            if inrelease:
                opts["by-hash"]        = "yes"
                opts["inrelease-path"] = "by-hash/SHA256/" + inrelease.hash

                groupdict = m.groupdict()
                groupdict["opts"] = " ".join(["{0}={1}".format(*o) for o in
                    opts.items()])

                lines[i] = "{type} [{opts}] {mirror} {suite} {comps}\n"\
                    .format(**groupdict)

        outfile = None
        try:
            if not self._outfile or self._outfile == "-":
                outfile = sys.stdout
            else:
                outfile = open(self._outfile, "w+", encoding="utf-8")
            outfile.write("".join(lines))
        finally:
            if outfile and outfile != sys.stdout:
                outfile.close()

    def _parse_opts(self, args):
        """Parse command line arguments and initialize the CLI object."""
        main_parser = argparse.ArgumentParser()
        subparsers = main_parser.add_subparsers(dest="command")

        parser_inject = subparsers.add_parser("inject",
            help="Rewrite a sources.list file injecting appropriate hashes.")
        parser_list = subparsers.add_parser("list",
            help="List InRelease hashes for a given release and suite.")
        parser_select = subparsers.add_parser("select",
            help="Select hash to use for a given timestamp, release, suite.")

        parser_inject.set_defaults(func=self.inject)
        parser_list.set_defaults(func=self.list)
        parser_select.set_defaults(func=self.select)

        # Options common to all commands
        for parser in [parser_inject, parser_list, parser_select]:
            cutoff_time_required = True if parser != parser_list else False

            parser.add_argument("-t", "--cutoff-time", dest="timestamp",
                type=int, required=cutoff_time_required,
                help="A POSIX timestamp to pin the repo to.")
            parser.add_argument("--cache-file", dest="cachefile", type=str,
                help="A file where to cache intermediate results (optional).")
 | 
						|

        mirror = "http://archive.ubuntu.com/ubuntu"

        # Options common to list, select commands
        for parser in [parser_list, parser_select]:
            parser.add_argument("-m", "--mirror", dest="mirror", type=str,
                default=mirror, help="The URL of the mirror to use.")
            parser.add_argument("-s", "--suite",
                dest="suite", type=str, required=True,
                help="The suite to scan (e.g. 'bionic', 'bionic-updates').")

        # Extra option for inject command
        parser_inject.add_argument("-o", "--output-file", dest="outfile",
 | 
						|
            type=str, help="")
 | 
						|
        parser_inject.add_argument("infile", type=str,
 | 
						|
            help="The sources.list file to modify.")
 | 
						|
 | 
						|
        if not args:
 | 
						|
            main_parser.print_help()
 | 
						|
            sys.exit(EXIT_ERR)
 | 
						|
 | 
						|
        return main_parser.parse_args(args)
 | 
						|
 | 
						|
    def _list(self, mirror, suite):
 | 
						|
        """Internal helper for the list command. This is also used
 | 
						|
        implicitly by the _select method."""
 | 
						|
        index = LPInReleaseIndex(mirror, suite, cache=self._cache)
 | 
						|
 | 
						|
        inrelease_files = \
 | 
						|
            reversed(
 | 
						|
                sorted(
 | 
						|
                    list(index.inrelease_files()),
 | 
						|
                    key=lambda x: x.published
 | 
						|
                )
 | 
						|
            )
 | 
						|
 | 
						|
        return inrelease_files
 | 
						|
 | 
						|
    def _select(self, mirror, suite):
 | 
						|
        """Internal helper for the select command."""
 | 
						|
        candidate = None
 | 
						|
 | 
						|
        for inrelease in self._list(mirror, suite):
 | 
						|
            if inrelease.published > self._timestamp:
 | 
						|
                continue
 | 
						|
            if not candidate or inrelease.published > candidate.published:
 | 
						|
                candidate = inrelease
 | 
						|
 | 
						|
        return candidate
 | 
						|
 | 
						|
 | 
						|
class ProxyingHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
 | 
						|
    """Request handler providing a virtual snapshot of the package
 | 
						|
    repositories."""
 | 
						|
 | 
						|
    def do_HEAD(self):
 | 
						|
        """Process a HEAD request."""
 | 
						|
        self.__get_request(verb="HEAD")
 | 
						|
 | 
						|
    def do_GET(self):
 | 
						|
        """Process a GET request."""
 | 
						|
        self.__get_request()
 | 
						|
 | 
						|
    def __get_request(self, verb="GET"):
 | 
						|
        """Pass all requests on to the destination server 1:1 except when the
 | 
						|
        target is an InRelease file or a resource listed in an InRelease file.

        In that case we silently download the resource via the by-hash URL
        which was most recent at the cutoff (or repo snapshot) time and inject
        it into the response.

        It is important to understand that there is no status 3xx HTTP redirect
        happening here; the client does not know that what it receives is not
        exactly what it requested."""

        host, path = self.__get_host_path()

        m = re.match(
            r"^(?P<base>.*?)/dists/(?P<suite>[^/]+)/(?P<target>.*)$",
            path
        )

        if m:
            mirror = "http://" + host + m.group("base")
            base   = m.group("base")
            suite  = m.group("suite")
            target = m.group("target")

            index = LPInReleaseIndex(mirror, suite,
                        cache=self.server.inrelease_cache)
            inrelease = index.get_inrelease_for_timestamp(
                    self.server.snapshot_stamp)

            if inrelease is None:
                self.__send_error(404, "No InRelease file found for given "
                                       "mirror, suite and timestamp.")
                return

            if target == "InRelease":
                # If target is InRelease, send back contents directly.
                data = inrelease.data.encode("utf-8")

                self.log_message(
                    "Inject InRelease '{}'".format(inrelease.hash))

                self.send_response(200)
                self.send_header("Content-Length", len(data))
                self.end_headers()

                if verb == "GET":
                    self.wfile.write(data)

                return
            else:
                # If target hash is listed, then redirect to by-hash URL.
                hash_ = inrelease.get_hash_for(target)

                if hash_:
                    self.log_message(
                        "Inject {} for {}".format(hash_, target))

                    target_path = target.rsplit("/", 1)[0]

                    path = "{}/dists/{}/{}/by-hash/SHA256/{}"\
                            .format(base, suite, target_path, hash_)
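
                    # Descriptive note: for example, a request for
                    # /ubuntu/dists/cosmic/main/binary-amd64/Packages.gz is
                    # quietly fetched from
                    # /ubuntu/dists/cosmic/main/binary-amd64/by-hash/SHA256/<hash>
                    # instead (paths shown here are illustrative).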

        try:
            client = http.client.HTTPConnection(host)
            client.request(verb, path)
        except Exception as e:
            self.log_error("Failed to retrieve http://{}{}: {}"
                    .format(host, path, str(e)))
            return

        try:
            self.__send_response(client.getresponse())
        except Exception as e:
            self.log_error("Error delivering response: {}".format(str(e)))

    def __get_host_path(self):
        """Figure out the host to contact and the path of the resource that is
        being requested."""
        host = self.headers.get("host")
        url  = urllib.parse.urlparse(self.path)
        path = url.path

        return host, path

    def __send_response(self, response):
        """Pass on upstream response headers and body to the client."""
        self.send_response(response.status)

        for name, value in response.getheaders():
            self.send_header(name, value)

        self.end_headers()
        shutil.copyfileobj(response, self.wfile)

    def __send_error(self, status, message):
        """Return an HTTP error status and a message in the response body."""
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.end_headers()
        self.wfile.write(message.encode("utf-8"))


class MagicHTTPProxy(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Tiny HTTP server using ProxyingHTTPRequestHandler instances to provide
    a snapshot view of the package repositories."""

    def __init__(self, server_address, server_port, cache_file=None,
            repo_snapshot_stamp=time.time(), run_as=None):

        try:
            super(http.server.HTTPServer, self).__init__(
                (server_address, server_port), ProxyingHTTPRequestHandler)
        except OSError as e:
            raise LPInReleaseProxyError(
                "Could not initialize proxy: {}".format(str(e)))

        self.inrelease_cache = LPInReleaseCache(filename=cache_file)
        self.snapshot_stamp  = repo_snapshot_stamp
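
# Illustrative, minimal way to run the proxy programmatically (values are
# hypothetical; MagicHTTPProxyCli below adds daemonizing, logging and
# privilege dropping on top of this):
#
#     proxy = MagicHTTPProxy("127.0.0.1", 8080,
#                            repo_snapshot_stamp=1525000000)
#     proxy.serve_forever()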


class MagicHTTPProxyCli:
    """A CLI interface for the MagicHTTPProxy."""

    def __init__(self, name):
        self._name = name
        self._address = "127.0.0.1"
        self._port = 8080
        self._timestamp = time.time()
        self._run_as = None
        self._pid_file = None
        self._log_file = None
        self._background = False
        self._setsid = False

    def __call__(self, args):
        options = self._parse_opts(args)

        proxy = MagicHTTPProxy(
            options.address,
            options.port,
            cache_file=None,
            repo_snapshot_stamp=options.timestamp
        )

        # Detach, but keep all streams open.
        if options.background:
            pid = os.fork()
            if pid:
                os._exit(EXIT_OK)

        if options.log_file:
            fd = open(options.log_file, "wb+")
            os.dup2(fd.fileno(), sys.stdout.fileno())
            os.dup2(fd.fileno(), sys.stderr.fileno())

        # Become session leader and give up controlling terminal.
        if options.setsid:
            if not options.log_file:
                fd = open(os.devnull, "wb+")
                os.dup2(fd.fileno(), sys.stdout.fileno())
                os.dup2(fd.fileno(), sys.stderr.fileno())
            os.setsid()

        if options.pid_file:
            with open(options.pid_file, "w+", encoding="utf-8") as fp:
                fp.write(str(os.getpid()))

        if options.run_as is not None:
            try:
                uid = pwd.getpwnam(options.run_as).pw_uid
                os.setuid(uid)
            except KeyError as e:
                sys.stderr.write("Failed to lookup {}: {}\n"
                        .format(options.run_as, str(e)))
                sys.exit(EXIT_ERR)
            except PermissionError as e:
                sys.stderr.write("Cannot setuid: {}\n".format(str(e)))
                sys.exit(EXIT_ERR)

        proxy.serve_forever()

    def _parse_opts(self, args):
        """Parse command line arguments and initialize the CLI object."""
        parser = argparse.ArgumentParser()

        parser.add_argument("--address", dest="address", type=str,
                default="127.0.0.1", help="The address of the interface to "
                    "bind to (default: 127.0.0.1)")
        parser.add_argument("--port", dest="port", type=int, default=8080,
                help="The port to listen on (default: 8080)")
        parser.add_argument("-t", "--cutoff-time", dest="timestamp", type=int,
                required=True, help="A POSIX timestamp to pin the repo to.")
        parser.add_argument("--run-as", dest="run_as", type=str,
                help="Drop privileges and run as this user.")
        parser.add_argument("--pid-file", dest="pid_file", type=str,
                help="Store the PID to this file.")
        parser.add_argument("--log-file", dest="log_file", type=str,
                help="Re-direct all streams to this file.")
        parser.add_argument("--background", dest="background",
                action="store_true",
                help="Whether to go into the background.")
        parser.add_argument("--setsid", dest="setsid",
                action="store_true",
                help="Become session leader and drop controlling TTY.")

        return parser.parse_args(args)

if __name__ == "__main__":
    name = os.path.basename(sys.argv[0])

    try:
        if name == "lp-in-release":
            cli = LPInReleaseIndexCli(name)
        else:
            cli = MagicHTTPProxyCli(name)

        cli(sys.argv[1:])
    except LPInReleaseBaseError as e:
        sys.stderr.write("{}: {}\n".format(name, str(e)))
        sys.exit(EXIT_ERR)
    except KeyboardInterrupt:
        sys.stderr.write("{}: Caught keyboard interrupt, exiting...\n"
                .format(name))
        sys.exit(EXIT_ERR)