You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
16 KiB

1 month ago
#!/usr/bin/env python3
import subprocess
import io
import os
import sys
import yaml
import argparse
import logging
from datetime import datetime
import tarfile
import shutil
from git import Repo, GitCommandError
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import fnmatch
import re
from debian.copyright import Header, Copyright
import uuid
from common import clean_old_logs
BASE_DIR = "/srv/lubuntu-ci/repos"
DEBFULLNAME = "Lugito"
DEBEMAIL = "info@lubuntu.me"
OUTPUT_DIR = os.path.join(BASE_DIR, "build_output")
SUPPRESSED_LINTIAN_TAGS = [
"orig-tarball-missing-upstream-signature",
"package-has-long-file-name",
"adopted-extended-field"
]
BASE_OUTPUT_DIR = "/srv/lubuntu-ci/output"
LOG_DIR = os.path.join(BASE_OUTPUT_DIR, "logs", "source_builds")
BASE_LINTIAN_DIR = os.path.join(BASE_OUTPUT_DIR, f"lintian.tmp.{str(uuid.uuid4())[:8]}")
REAL_LINTIAN_DIR = os.path.join(BASE_OUTPUT_DIR, "lintian")
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(BASE_LINTIAN_DIR, exist_ok=True)
current_time = datetime.utcnow().strftime("%H-%M-%S")
log_file = os.path.join(LOG_DIR, f"{current_time}.log")
1 month ago
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
logger = logging.getLogger("TimeBasedLogger")
def run_command(cmd, cwd=None, env=None, show_output=False):
logging.info(f"Executing: {' '.join(cmd)} in {cwd or 'current directory'}")
try:
result = subprocess.run(
cmd,
cwd=cwd,
env=env,
check=True,
capture_output=True,
text=True
)
if show_output:
if result.stdout:
logging.info(f"Output: {result.stdout.strip()}")
if result.stderr:
logging.warning(f"Output: {result.stderr.strip()}")
logging.info(f"Command succeeded: {' '.join(cmd)}")
except subprocess.CalledProcessError as e:
logging.error(f"Command failed: {' '.join(cmd)}")
logging.error(e.stderr)
raise
def parse_version(changelog_path):
try:
with open(changelog_path, "r") as f:
first_line = f.readline().strip()
version_match = first_line.split("(")[1].split(")")[0]
# Remove Debian revision
upstream_version = version_match.split("-")[0]
# Remove '+git...' and '~release' if present
upstream_version = re.sub(r'(\+git[0-9]+)?(~[a-z]+)?$', '', upstream_version)
logging.info(f"Upstream version extracted: {upstream_version}")
current_date = datetime.now().strftime("%Y%m%d%H%M")
version = f"{upstream_version}+git{current_date}"
logging.info(f"Parsed VERSION: {version}")
return version
except (IndexError, FileNotFoundError) as e:
logging.error(f"Error parsing version from {changelog_path}: {e}")
raise
def get_exclusions(packaging):
exclusions = []
with io.open(os.path.join(packaging, "debian/copyright"), "rt", encoding="utf-8") as f:
copyright_obj = Copyright(f)
for paragraph in copyright_obj.all_paragraphs():
if isinstance(paragraph, Header):
if paragraph.files_excluded:
for file_name in paragraph.files_excluded:
exclusions.append(file_name)
break
return exclusions
def create_tarball(name, source_dir, exclusions=[]):
tar_filename = f"{name}_MAIN.orig.tar.gz"
logging.info(f"Creating tarball: {tar_filename}")
exclusions.append(".git/")
def exclusion_func(tarinfo):
for exclusion in exclusions:
if exclusion in tarinfo.name:
return None
return tarinfo
with tarfile.open(tar_filename, "w:gz") as tar:
tar.add(source_dir, arcname=os.path.basename(source_dir), filter=exclusion_func)
logging.info(f"Tarball created and compressed: {tar_filename}")
def update_changelog(packaging_dir, release, version, env):
name = os.path.basename(packaging_dir)
logging.info(f"Updating changelog for {name} to version {version}-0ubuntu1~ppa1")
run_command(["git", "checkout", "debian/changelog"], cwd=packaging_dir)
cmd = [
"dch",
"--distribution", release,
"--package", name,
"--newversion", f"{version}-0ubuntu1~ppa1",
"--urgency", "low",
"CI upload."
]
run_command(cmd, cwd=packaging_dir, env=env)
def build_package(packaging_dir, env):
name = os.path.basename(packaging_dir)
logging.info(f"Building source package for {name}")
temp_dir = tempfile.mkdtemp()
try:
temp_packaging_dir = os.path.join(temp_dir, name)
os.makedirs(temp_packaging_dir, exist_ok=True)
shutil.copytree(packaging_dir + "/debian", temp_packaging_dir + "/debian")
tarball_name = f"{name}_{env['VERSION']}.orig.tar.gz"
tarball_source = os.path.join(BASE_DIR, tarball_name)
tarball_dest = os.path.join(temp_dir, tarball_name)
shutil.copyfile(tarball_source, tarball_dest)
cmd_build = ["debuild", "--no-lintian", "-S", "-d", "-sa"]
run_command(cmd_build, cwd=temp_packaging_dir, env=env)
run_command(["git", "checkout", "debian/changelog"], cwd=packaging_dir)
pattern = f"{name}_{env['VERSION']}*"
for filename in os.listdir(temp_dir):
if fnmatch.fnmatch(filename, pattern):
source_file = os.path.join(temp_dir, filename)
dest_file = os.path.join(OUTPUT_DIR, filename)
shutil.copyfile(source_file, dest_file)
logging.info(f"Copied {filename} to {OUTPUT_DIR}")
changes_files = [f for f in os.listdir(OUTPUT_DIR) if f.startswith(f"{name}_{env['VERSION']}") and f.endswith("_source.changes")]
if changes_files:
changes_file = os.path.join(OUTPUT_DIR, changes_files[-1])
logging.info(f"Built package, changes file: {changes_file}")
return changes_file
else:
logging.error("No changes file found after build.")
raise FileNotFoundError("Changes file not found.")
finally:
shutil.rmtree(temp_dir)
def load_config(config_path):
try:
with open(config_path, "r") as f:
config = yaml.safe_load(f)
if "packages" not in config or "releases" not in config:
raise ValueError("Config file must contain 'packages' and 'releases' sections.")
return config
except Exception as e:
logging.error(f"Error loading config file: {e}")
sys.exit(1)
def clone_or_update_repo(destination, repo_url, repo_branch=None):
if os.path.exists(destination):
logging.info(f"Repository already exists at {destination}, checking branch and remote URL.")
try:
repo = Repo(destination)
current_remote_url = repo.remotes.origin.url
if current_remote_url != repo_url:
logging.info(f"Remote URL differs for {destination}. Removing and recloning.")
shutil.rmtree(destination)
else:
repo.git.reset("--hard", "HEAD")
current_branch = repo.active_branch.name
if repo_branch and current_branch != repo_branch:
logging.info(f"Branch differs for {destination}. Removing and recloning.")
shutil.rmtree(destination)
else:
logging.info(f"Repository matches desired remote and branch, pulling updates.")
repo.git.checkout(repo_branch or current_branch)
try:
repo.remotes.origin.pull()
repo.submodule_update(recursive=True)
logging.info(f"Pulled latest changes for {destination}")
except GitCommandError as e:
if 'non-fast-forward' in str(e):
logging.error(f"Pull failed due to non-fast-forward update: {e}")
logging.info(f"Removing repository {destination} and cloning again.")
shutil.rmtree(destination)
else:
logging.error(f"Pull failed for {destination}: {e}")
raise
else:
return
except Exception as e:
logging.error(f"Error updating repository {destination}: {e}")
logging.info(f"Removing repository {destination} and cloning again.")
shutil.rmtree(destination)
try:
logging.info(f"Cloning repository {repo_url} into {destination}")
repo = Repo.clone_from(repo_url, destination, recurse_submodules=True)
if repo_branch:
repo.git.checkout(repo_branch)
logging.info(f"Checked out {repo_branch} in {destination}")
except GitCommandError as e:
logging.error(f"Git clone failed for {repo_url}: {e}")
raise
def publish_lintian():
if os.path.exists(BASE_LINTIAN_DIR):
for root, dirs, files in os.walk(BASE_LINTIAN_DIR):
for file in files:
# Determine the source and destination paths
src_path = os.path.join(root, file)
rel_path = os.path.relpath(src_path, BASE_LINTIAN_DIR)
dest_path = os.path.join(REAL_LINTIAN_DIR, rel_path)
# Ensure the destination directory exists
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
# Copy the file
shutil.copy2(src_path, dest_path)
# Remove the temporary directory
shutil.rmtree(BASE_LINTIAN_DIR)
def run_source_lintian(name, sources_path):
logging.info(f"Running Lintian for {name}")
with tempfile.NamedTemporaryFile(mode='w+', suffix='.txt') as temp_file:
temp_file.write("\n".join(SUPPRESSED_LINTIAN_TAGS))
temp_file.flush()
temp_file_path = temp_file.name
cmd = [
"lintian",
"-EvIL",
"+pedantic",
"--suppress-tags-from-file",
f"{temp_file_path}",
sources_path
]
result = subprocess.run(
cmd,
capture_output=True,
text=True
)
stderr, stdout = None, None
if result.stderr:
stderr = result.stderr.strip()
if result.stdout:
stdout = result.stdout.strip()
lintian_output = None
if stderr == stdout:
lintian_output = stderr
else:
lintian_output = f"{stderr}\n{stdout}".strip()
if lintian_output:
pkgdir = os.path.join(BASE_LINTIAN_DIR, name)
if not os.path.exists(pkgdir):
os.mkdir(pkgdir)
output_file = os.path.join(pkgdir, "source.txt")
with open(output_file, "a") as f:
f.write(lintian_output)
logging.info(f"Lintian run for {name} is complete")
def main():
parser = argparse.ArgumentParser(description="Automate Lubuntu package builds.")
parser.add_argument("config", help="Path to the YAML configuration file.")
parser.add_argument("--skip-dput", action="store_true", help="Skip the dput upload step.")
parser.add_argument("--skip-cleanup", action="store_true", help="Skip removal of build_output.")
args = parser.parse_args()
config = load_config(args.config)
packages = config["packages"]
releases = config["releases"]
os.makedirs(BASE_DIR, exist_ok=True)
logging.info(f"Using base directory: {BASE_DIR}")
os.chdir(BASE_DIR)
with ThreadPoolExecutor(max_workers=5) as executor:
def dput_source(name, upload_target, changes_files, devel_changes_files):
if changes_files:
hr_changes = ", ".join(changes_files)
logging.info(f"Uploading {hr_changes} to {upload_target} using dput")
cmd_upload = ["dput", upload_target] + changes_files
run_command(cmd_upload, cwd=OUTPUT_DIR)
logging.info(f"Completed upload of {hr_changes} to {upload_target}")
for file in devel_changes_files:
if file:
futures.add(executor.submit(run_source_lintian, name, file))
def prepare_package(pkg):
name = pkg.get("name")
if not name:
logging.warning(f"Skipping package due to missing name: {pkg}")
return
upstream_url = pkg.get("upstream_url") or f"https://github.com/lxqt/{name}.git"
upstream_destination = os.path.join(BASE_DIR, f"upstream-{name}")
clone_or_update_repo(upstream_destination, upstream_url)
packaging_url = pkg.get("packaging_url") or f"https://git.lubuntu.me/Lubuntu/{name}-packaging.git"
packaging_branch = pkg.get("packaging_branch") or f"ubuntu/{releases[0]}" if releases else None
packaging_destination = os.path.join(BASE_DIR, name)
clone_or_update_repo(packaging_destination, packaging_url, packaging_branch)
exclusions = get_exclusions(packaging_destination)
create_tarball(name, upstream_destination, exclusions)
run_command(["update-maintainer"], cwd=packaging_destination)
futures.add(executor.submit(process_package, pkg))
def process_package(pkg):
name = pkg.get("name")
upload_target = pkg.get("upload_target", "ppa:lubuntu-ci/unstable-ci-proposed")
if not name:
logging.warning(f"Skipping package due to missing name: {pkg}")
return []
package_changes = []
packaging_destination = os.path.join(BASE_DIR, name)
changelog_path = os.path.join(packaging_destination, "debian", "changelog")
version = parse_version(changelog_path)
for release in releases:
logging.info(f"Building {name} for {release}")
try:
release_version = f"{version}~{release}"
tarball_name = f"{name}_{release_version}.orig.tar.gz"
tarball_source = os.path.join(BASE_DIR, f"{name}_MAIN.orig.tar.gz")
tarball_dest = os.path.join(BASE_DIR, tarball_name)
shutil.copyfile(tarball_source, tarball_dest)
env = os.environ.copy()
env["DEBFULLNAME"] = DEBFULLNAME
env["DEBEMAIL"] = DEBEMAIL
env["VERSION"] = release_version
env["UPLOAD_TARGET"] = upload_target
# Update changelog and build package
update_changelog(packaging_destination, release, release_version, env)
changes_file = build_package(packaging_destination, env)
if changes_file:
package_changes.append((changes_file, env))
os.remove(os.path.join(BASE_DIR, tarball_name))
except Exception as e:
logging.error(f"Error processing package '{name}' for release '{release}': {e}")
changes_files = [os.path.basename(cf) for cf, env in package_changes]
devel_changes_files = set(os.path.join(OUTPUT_DIR, file) if releases[0] in file else None for file in changes_files)
if args.skip_dput:
for changes_file in devel_changes_files:
if changes_file:
futures.add(executor.submit(run_source_lintian, name, changes_file))
else:
upload_target = package_changes[0][1]["UPLOAD_TARGET"]
futures.add(executor.submit(dput_source, name, upload_target, changes_files, devel_changes_files))
os.remove(os.path.join(BASE_DIR, f"{name}_MAIN.orig.tar.gz"))
futures = set(executor.submit(prepare_package, pkg) for pkg in packages)
while futures:
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
try:
result = future.result()
except Exception as e:
logging.exception("Task generated an exception")
finally:
futures.remove(future)
if not args.skip_cleanup:
shutil.rmtree(OUTPUT_DIR)
logging.info("Publishing Lintian output...")
publish_lintian()
clean_old_logs(LOG_DIR)
logging.info("Script completed successfully.")
if __name__ == "__main__":
main()