#!/usr/bin/env python3
"""Build and upload Lubuntu CI source packages.

For every package listed in the YAML configuration this script clones (or
updates) the upstream and packaging repositories, creates an orig tarball,
builds one source package per configured release, optionally uploads the
results with dput, and stores Lintian output for the first configured release.
"""

import argparse
import fnmatch
import io
import logging
import os
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from datetime import datetime, timezone

import yaml
from debian.copyright import Header, Copyright
from git import Repo, GitCommandError

from common import clean_old_logs

BASE_DIR = "/srv/lubuntu-ci/repos"
DEBFULLNAME = "Lugito"
DEBEMAIL = "info@lubuntu.me"
OUTPUT_DIR = os.path.join(BASE_DIR, "build_output")
SUPPRESSED_LINTIAN_TAGS = [
    "orig-tarball-missing-upstream-signature",
    "package-has-long-file-name",
    "adopted-extended-field"
]
BASE_OUTPUT_DIR = "/srv/lubuntu-ci/output"
LOG_DIR = os.path.join(BASE_OUTPUT_DIR, "logs", "source_builds")
# Lintian results are collected in a uniquely named scratch directory and only
# copied into REAL_LINTIAN_DIR by publish_lintian() at the end of the run.
BASE_LINTIAN_DIR = os.path.join(BASE_OUTPUT_DIR, f".lintian.tmp.{str(uuid.uuid4())[:8]}")
REAL_LINTIAN_DIR = os.path.join(BASE_OUTPUT_DIR, "lintian")

os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(BASE_LINTIAN_DIR, exist_ok=True)

# datetime.utcnow() is deprecated; an aware UTC timestamp formats identically
# because the format string carries no offset directive.
current_time = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
log_file = os.path.join(LOG_DIR, f"{current_time}.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("TimeBasedLogger")


def run_command(cmd, cwd=None, env=None, show_output=False):
    """Run *cmd* (an argument list) and log the outcome.

    Args:
        cmd: Command and arguments as a list of strings.
        cwd: Working directory for the command, or None for the current one.
        env: Environment mapping for the command, or None to inherit.
        show_output: When true, log captured stdout/stderr on success.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero; its
            stderr and stdout are logged before re-raising.
    """
    logging.info(f"Executing: {' '.join(cmd)} in {cwd or 'current directory'}")
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            env=env,
            check=True,
            capture_output=True,
            text=True
        )
        if show_output:
            if result.stdout:
                logging.info(f"Output: {result.stdout.strip()}")
            if result.stderr:
                logging.warning(f"Output: {result.stderr.strip()}")
        logging.info(f"Command succeeded: {' '.join(cmd)}")
    except subprocess.CalledProcessError as e:
        logging.error(f"Command failed: {' '.join(cmd)}")
        logging.error(e.stderr)
        logging.error(e.stdout)
        raise


def parse_version(changelog_path):
    """Derive the CI version string from the first line of a Debian changelog.

    The version between the first pair of parentheses is reduced to its
    upstream part (epoch kept, Debian revision and any trailing
    ``+git<digits>`` / ``~<letters>`` suffix dropped) and ``+git<timestamp>``
    is appended, using the current local time.

    Args:
        changelog_path: Path to ``debian/changelog``.

    Returns:
        The new version string, prefixed with ``<epoch>:`` when the changelog
        version has an epoch.

    Raises:
        IndexError: If the first line contains no parenthesised version.
        FileNotFoundError: If the changelog does not exist.
    """
    try:
        epoch = None
        with open(changelog_path, "r", encoding="utf-8") as f:
            first_line = f.readline().strip()
        version_match = first_line.split("(")[1].split(")")[0]
        upstream_version = version_match
        if ":" in version_match:
            epoch, upstream_version = version_match.split(":", 1)
        # The Debian revision follows the LAST hyphen; the upstream version
        # itself may contain hyphens, so only strip the final component.
        if "-" in upstream_version:
            upstream_version = upstream_version.rsplit("-", 1)[0]
        upstream_version = re.sub(r"(\+git[0-9]+)?(~[a-z]+)?$", "", upstream_version)

        # Log the results
        current_date = datetime.now().strftime("%Y%m%d%H%M")
        if epoch:
            version = f"{epoch}:{upstream_version}+git{current_date}"
            logging.info(f"Upstream version extracted: {epoch}:{upstream_version}")
            logging.info(f"Going to use: {version}")
        else:
            version = f"{upstream_version}+git{current_date}"
            logging.info(f"Upstream version extracted: {upstream_version}")
            logging.info(f"Going to use: {version}")

        return version
    except (IndexError, FileNotFoundError) as e:
        logging.error(f"Error parsing version from {changelog_path}: {e}")
        raise


def get_exclusions(packaging):
    """Return the Files-Excluded entries from ``debian/copyright``.

    Args:
        packaging: Path to the packaging checkout containing ``debian/``.

    Returns:
        A list of the entries in the header paragraph's ``files_excluded``
        field; empty when the field is absent.
    """
    exclusions = []
    with io.open(os.path.join(packaging, "debian/copyright"), "rt", encoding="utf-8") as f:
        copyright_obj = Copyright(f)
        for paragraph in copyright_obj.all_paragraphs():
            if isinstance(paragraph, Header):
                if paragraph.files_excluded:
                    exclusions.extend(paragraph.files_excluded)
                # Only the header paragraph is inspected, so stop here.
                break
    return exclusions


def create_tarball(name, source_dir, exclusions=None):
    """Create ``<name>_MAIN.orig.tar.gz`` in the current directory.

    Args:
        name: Source package name used for the tarball file name.
        source_dir: Directory to archive; its basename becomes the top-level
            directory inside the tarball.
        exclusions: Optional iterable of substrings; any member whose archive
            path contains one of them is left out. The caller's object is
            never modified.
    """
    tar_filename = f"{name}_MAIN.orig.tar.gz"
    logging.info(f"Creating tarball: {tar_filename}")

    # Copy so neither a shared default nor the caller's list is mutated, and
    # drop empty patterns, which would otherwise match every path.
    patterns = [pattern for pattern in (exclusions or []) if pattern]
    patterns.append(".git/")

    def exclusion_func(tarinfo):
        # ".git/" only matches entries *below* a .git directory; also drop the
        # .git entry itself so no empty directory (or submodule pointer file)
        # ends up in the tarball.
        if os.path.basename(tarinfo.name) == ".git":
            return None
        if any(pattern in tarinfo.name for pattern in patterns):
            return None
        return tarinfo

    with tarfile.open(tar_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=os.path.basename(source_dir), filter=exclusion_func)
    logging.info(f"Tarball created and compressed: {tar_filename}")


def update_changelog(packaging_dir, release, version, env):
    """Reset ``debian/changelog`` and add a CI entry for *release*.

    Args:
        packaging_dir: Packaging checkout; its basename is the package name.
        release: Distribution name passed to ``dch --distribution``.
        version: Version (without Debian revision); ``-0ubuntu1~ppa1`` is
            appended.
        env: Environment mapping passed to ``dch``.
    """
    name = os.path.basename(packaging_dir)
    logging.info(f"Updating changelog for {name} to version {version}-0ubuntu1~ppa1")
    # Discard any entry left over from a previous release iteration.
    run_command(["git", "checkout", "debian/changelog"], cwd=packaging_dir)
    cmd = [
        "dch",
        "--distribution", release,
        "--package", name,
        "--newversion", f"{version}-0ubuntu1~ppa1",
        "--urgency", "low",
        "CI upload."
    ]
    run_command(cmd, cwd=packaging_dir, env=env)


def build_package(packaging_dir, env, large):
    """Build a source package in a scratch directory and collect the results.

    Args:
        packaging_dir: Packaging checkout; its basename is the package name.
        env: Environment mapping; ``env['VERSION']`` is the epoch-less version
            used in the orig tarball and output file names.
        large: When true, build under OUTPUT_DIR instead of the system
            temporary directory.

    Returns:
        Path of the ``_source.changes`` file copied into OUTPUT_DIR.

    Raises:
        FileNotFoundError: If no changes file was produced.
        subprocess.CalledProcessError: If ``debuild`` or ``git`` fails.
    """
    name = os.path.basename(packaging_dir)
    logging.info(f"Building source package for {name}")

    if large:
        temp_dir = os.path.join(OUTPUT_DIR, f".tmp_{name}_{env['VERSION']}")
        logging.warning(f"{name} is quite large and will not fit in /tmp, building at {temp_dir}")
        os.makedirs(temp_dir, exist_ok=True)
    else:
        temp_dir = tempfile.mkdtemp()

    try:
        temp_packaging_dir = os.path.join(temp_dir, name)
        os.makedirs(temp_packaging_dir, exist_ok=True)
        shutil.copytree(packaging_dir + "/debian", temp_packaging_dir + "/debian")

        tarball_name = f"{name}_{env['VERSION']}.orig.tar.gz"
        tarball_source = os.path.join(BASE_DIR, tarball_name)
        tarball_dest = os.path.join(temp_dir, tarball_name)
        shutil.copyfile(tarball_source, tarball_dest)

        cmd_build = ["debuild", "--no-lintian", "-S", "-d", "-sa", "-nc"]
        run_command(cmd_build, cwd=temp_packaging_dir, env=env)
        run_command(["git", "checkout", "debian/changelog"], cwd=packaging_dir)

        pattern = f"{name}_{env['VERSION']}*"
        for filename in os.listdir(temp_dir):
            if fnmatch.fnmatch(filename, pattern):
                source_file = os.path.join(temp_dir, filename)
                dest_file = os.path.join(OUTPUT_DIR, filename)
                shutil.copyfile(source_file, dest_file)
                logging.info(f"Copied {filename} to {OUTPUT_DIR}")

        # os.listdir() order is arbitrary; sort so the pick is deterministic.
        changes_files = sorted(
            f for f in os.listdir(OUTPUT_DIR)
            if f.startswith(f"{name}_{env['VERSION']}") and f.endswith("_source.changes")
        )
    finally:
        shutil.rmtree(temp_dir)

    if changes_files:
        changes_file = os.path.join(OUTPUT_DIR, changes_files[-1])
        logging.info(f"Built package, changes file: {changes_file}")
        return changes_file
    else:
        logging.error("No changes file found after build.")
        raise FileNotFoundError("Changes file not found.")


def load_config(config_path):
    """Load the YAML configuration, exiting the process on any error.

    Args:
        config_path: Path to the YAML file.

    Returns:
        The parsed mapping, guaranteed to contain ``packages`` and
        ``releases`` keys.
    """
    try:
        with open(config_path, "r") as f:
            config = yaml.safe_load(f)
        # An empty file parses to None; report it as a validation error
        # instead of an opaque TypeError from the membership test.
        if not isinstance(config, dict) or "packages" not in config or "releases" not in config:
            raise ValueError("Config file must contain 'packages' and 'releases' sections.")
        return config
    except Exception as e:
        logging.error(f"Error loading config file: {e}")
        sys.exit(1)


def clone_or_update_repo(destination, repo_url, repo_branch=None):
    """Make *destination* an up-to-date checkout of *repo_url*.

    An existing checkout is hard-reset and pulled when its origin URL and
    branch match; in every other case (different remote or branch, failed
    pull, any other error) it is removed and cloned afresh.

    Args:
        destination: Local checkout path.
        repo_url: Expected URL of the ``origin`` remote.
        repo_branch: Branch to check out, or None to keep the current/default
            branch.

    Raises:
        GitCommandError: If the fresh clone or branch checkout fails.
    """
    if os.path.exists(destination):
        logging.info(f"Repository already exists at {destination}, checking branch and remote URL.")
        try:
            repo = Repo(destination)
            current_remote_url = repo.remotes.origin.url
            if current_remote_url != repo_url:
                logging.info(f"Remote URL differs for {destination}. Removing and recloning.")
                shutil.rmtree(destination)
            else:
                repo.git.reset("--hard", "HEAD")
                current_branch = repo.active_branch.name
                if repo_branch and current_branch != repo_branch:
                    logging.info(f"Branch differs for {destination}. Removing and recloning.")
                    shutil.rmtree(destination)
                else:
                    logging.info("Repository matches desired remote and branch, pulling updates.")
                    repo.git.checkout(repo_branch or current_branch)
                    try:
                        repo.remotes.origin.pull()
                        repo.submodule_update(recursive=True)
                        logging.info(f"Pulled latest changes for {destination}")
                    except GitCommandError as e:
                        if 'non-fast-forward' in str(e):
                            logging.error(f"Pull failed due to non-fast-forward update: {e}")
                            logging.info(f"Removing repository {destination} and cloning again.")
                            shutil.rmtree(destination)
                        else:
                            # Re-raised into the outer handler, which also
                            # falls back to a fresh clone.
                            logging.error(f"Pull failed for {destination}: {e}")
                            raise
                    else:
                        return
        except Exception as e:
            logging.error(f"Error updating repository {destination}: {e}")
            logging.info(f"Removing repository {destination} and cloning again.")
            # The checkout may already be (partly) gone; never let the
            # cleanup itself abort the fallback clone.
            shutil.rmtree(destination, ignore_errors=True)

    try:
        logging.info(f"Cloning repository {repo_url} into {destination}")
        repo = Repo.clone_from(repo_url, destination, recurse_submodules=True)
        if repo_branch:
            repo.git.checkout(repo_branch)
            logging.info(f"Checked out {repo_branch} in {destination}")
    except GitCommandError as e:
        logging.error(f"Git clone failed for {repo_url}: {e}")
        raise


def publish_lintian():
    """Copy collected Lintian reports into REAL_LINTIAN_DIR and drop the scratch dir."""
    if os.path.exists(BASE_LINTIAN_DIR):
        for root, dirs, files in os.walk(BASE_LINTIAN_DIR):
            for file in files:
                # Determine the source and destination paths
                src_path = os.path.join(root, file)
                rel_path = os.path.relpath(src_path, BASE_LINTIAN_DIR)
                dest_path = os.path.join(REAL_LINTIAN_DIR, rel_path)

                # Ensure the destination directory exists
                os.makedirs(os.path.dirname(dest_path), exist_ok=True)

                # Copy the file
                shutil.copy2(src_path, dest_path)

        # Remove the temporary directory
        shutil.rmtree(BASE_LINTIAN_DIR)


def run_source_lintian(name, sources_path):
    """Run Lintian on a source changes file and store its output.

    Output (stderr followed by stdout, de-duplicated when identical) is
    appended to ``<BASE_LINTIAN_DIR>/<name>/source.txt``; nothing is written
    when Lintian prints nothing. Lintian's exit status is not checked.

    Args:
        name: Package name, used as the report sub-directory.
        sources_path: Path to the ``_source.changes`` file.
    """
    logging.info(f"Running Lintian for {name}")
    with tempfile.NamedTemporaryFile(mode='w+', suffix='.txt') as temp_file:
        temp_file.write("\n".join(SUPPRESSED_LINTIAN_TAGS))
        temp_file.flush()
        temp_file_path = temp_file.name

        cmd = [
            "lintian",
            "-EvIL",
            "+pedantic",
            "--suppress-tags-from-file",
            f"{temp_file_path}",
            sources_path
        ]

        # Must run while the temporary tag file still exists.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True
        )

    stderr = result.stderr.strip() if result.stderr else ""
    stdout = result.stdout.strip() if result.stdout else ""
    if stderr == stdout:
        lintian_output = stderr
    else:
        # Join only the streams that produced text so a missing stream never
        # shows up as the literal string "None" in the report.
        lintian_output = "\n".join(part for part in (stderr, stdout) if part)

    if lintian_output:
        pkgdir = os.path.join(BASE_LINTIAN_DIR, name)
        # Several Lintian tasks may run concurrently for the same package.
        os.makedirs(pkgdir, exist_ok=True)
        output_file = os.path.join(pkgdir, "source.txt")
        with open(output_file, "a") as f:
            # Trailing newline keeps appended reports from running together.
            f.write(lintian_output + "\n")

    logging.info(f"Lintian run for {name} is complete")


def main():
    """Parse arguments and run the prepare → build → upload/Lintian pipeline."""
    parser = argparse.ArgumentParser(description="Automate Lubuntu package builds.")
    parser.add_argument("config", help="Path to the YAML configuration file.")
    parser.add_argument("--skip-dput", action="store_true", help="Skip the dput upload step.")
    parser.add_argument("--skip-cleanup", action="store_true", help="Skip removal of build_output.")
    args = parser.parse_args()

    config = load_config(args.config)
    packages = config["packages"]
    releases = config["releases"]

    os.makedirs(BASE_DIR, exist_ok=True)
    logging.info(f"Using base directory: {BASE_DIR}")
    # create_tarball() writes relative to the working directory.
    os.chdir(BASE_DIR)

    with ThreadPoolExecutor(max_workers=5) as executor:
        # Worker tasks schedule follow-up tasks, so the set of outstanding
        # futures is shared between threads. It is created BEFORE anything is
        # submitted and every access goes through the lock.
        futures = set()
        futures_lock = threading.Lock()

        def submit(fn, *fn_args):
            """Schedule *fn* on the pool and track its future."""
            with futures_lock:
                futures.add(executor.submit(fn, *fn_args))

        def dput_source(name, upload_target, changes_files, devel_changes_files):
            """Upload the changes files, then queue Lintian for the devel ones."""
            if changes_files:
                hr_changes = ", ".join(changes_files)
                logging.info(f"Uploading {hr_changes} to {upload_target} using dput")
                cmd_upload = ["dput", upload_target] + changes_files
                run_command(cmd_upload, cwd=OUTPUT_DIR)
                logging.info(f"Completed upload of {hr_changes} to {upload_target}")

            for file in devel_changes_files:
                if file:
                    submit(run_source_lintian, name, file)

        def prepare_package(pkg):
            """Fetch both repositories, create the orig tarball, queue the build."""
            name = pkg.get("name")
            if not name:
                logging.warning(f"Skipping package due to missing name: {pkg}")
                return
            upstream_url = pkg.get("upstream_url") or f"https://github.com/lxqt/{name}.git"
            upstream_destination = os.path.join(BASE_DIR, f"upstream-{name}")
            clone_or_update_repo(upstream_destination, upstream_url)
            packaging_url = pkg.get("packaging_url") or f"https://git.lubuntu.me/Lubuntu/{name}-packaging.git"
            # Parenthesised so an explicitly configured branch is honoured
            # even when no releases are configured.
            packaging_branch = pkg.get("packaging_branch") or (
                f"ubuntu/{releases[0]}" if releases else None
            )
            packaging_destination = os.path.join(BASE_DIR, name)
            clone_or_update_repo(packaging_destination, packaging_url, packaging_branch)
            exclusions = get_exclusions(packaging_destination)
            create_tarball(name, upstream_destination, exclusions)
            run_command(["update-maintainer"], cwd=packaging_destination)

            submit(process_package, pkg)

        def process_package(pkg):
            """Build the package for every release, then queue upload or Lintian."""
            name = pkg.get("name")
            upload_target = pkg.get("upload_target", "ppa:lubuntu-ci/unstable-ci-proposed")

            if not name:
                logging.warning(f"Skipping package due to missing name: {pkg}")
                return []

            package_changes = []

            packaging_destination = os.path.join(BASE_DIR, name)
            changelog_path = os.path.join(packaging_destination, "debian", "changelog")
            version = parse_version(changelog_path)
            main_tarball = os.path.join(BASE_DIR, f"{name}_MAIN.orig.tar.gz")

            for release in releases:
                logging.info(f"Building {name} for {release}")
                # File names never carry the epoch; the changelog version does.
                epoch, bare_version = None, version
                if ":" in version:
                    epoch, bare_version = version.split(":", 1)
                bare_version = f"{bare_version}~{release}"
                full_version = f"{epoch}:{bare_version}" if epoch else bare_version
                tarball_dest = os.path.join(BASE_DIR, f"{name}_{bare_version}.orig.tar.gz")

                try:
                    shutil.copyfile(main_tarball, tarball_dest)

                    env = os.environ.copy()
                    env["DEBFULLNAME"] = DEBFULLNAME
                    env["DEBEMAIL"] = DEBEMAIL
                    env["VERSION"] = full_version
                    env["UPLOAD_TARGET"] = upload_target

                    # Update changelog and build package
                    update_changelog(packaging_destination, release, full_version, env)
                    env["VERSION"] = bare_version
                    changes_file = build_package(packaging_destination, env, pkg.get("large", False))
                    if changes_file:
                        package_changes.append((changes_file, env))
                except Exception as e:
                    logging.error(f"Error processing package '{name}' for release '{release}': {e}")
                finally:
                    # Remove the per-release tarball even when the build
                    # failed, so failed runs do not pile up in BASE_DIR.
                    if os.path.exists(tarball_dest):
                        os.remove(tarball_dest)

            changes_files = [os.path.basename(cf) for cf, _ in package_changes]
            # Lintian only runs on the first (development) release.
            devel_changes_files = {
                os.path.join(OUTPUT_DIR, file)
                for file in changes_files
                if releases[0] in file
            }
            if args.skip_dput:
                for changes_file in devel_changes_files:
                    submit(run_source_lintian, name, changes_file)
            else:
                if package_changes:
                    upload_target = package_changes[0][1]["UPLOAD_TARGET"]
                    submit(dput_source, name, upload_target, changes_files, devel_changes_files)

            os.remove(main_tarball)

        for pkg in packages:
            submit(prepare_package, pkg)

        while True:
            # Wait on a snapshot: workers may add futures concurrently. A task
            # registers its follow-ups before it finishes, so an empty
            # snapshot means the whole pipeline has drained.
            with futures_lock:
                pending = set(futures)
            if not pending:
                break
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                try:
                    future.result()
                except Exception:
                    logging.exception("Task generated an exception")
                finally:
                    with futures_lock:
                        futures.discard(future)

    if not args.skip_cleanup:
        shutil.rmtree(OUTPUT_DIR)
    logging.info("Publishing Lintian output...")
    publish_lintian()
    clean_old_logs(LOG_DIR)

    logging.info("Script completed successfully.")


if __name__ == "__main__":
    main()