#!/usr/bin/env python3
# fempkg - a simple package manager
# Copyright (C) 2026 Gabriel Di Martino
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Download, cache and manifest helpers for fempkg.

Importing this module has side effects: it creates PKG_DIR and BUILD_DIR and
turns /tmp/fempkg and /tmp/fempkgbuild into symlinks under /var/tmp.
"""

import os
import shutil
import ssl
import subprocess
import errno
import urllib
import urllib.request  # `import urllib` alone does not load the submodule
from urllib.parse import urlparse

from tqdm import tqdm
import requests

# Directories
BUILD_DIR = "/tmp/fempkgbuild"
PKG_DIR = "/var/lib/fempkg/pkgs"
RECIPE_CACHE_DIR = os.path.expanduser("/var/lib/fempkg/repo")
MANIFEST_CACHE_DIR = "/var/lib/fempkg/manifests"  # current manifests (pkgname.txt)
VERSIONED_MANIFEST_DIR = "/var/lib/fempkg/manifest-versions"  # versioned manifests pkgname-version.txt
LOCAL_MANIFESTS_DIR = "/var/lib/fempkg/local-manifests"  # temporary snapshots used during install
BINPKG_CACHE_DIR = "/var/lib/fempkg/binpkg"

# Default repository endpoints
DEFAULT_RECIPES_URL = "https://rocketleaguechatp.duckdns.org/recipes"
DEFAULT_MANIFESTS_URL = "https://rocketleaguechatp.duckdns.org/manifests"
DEFAULT_BINPKG_URL = "https://rocketleaguechatp.duckdns.org/binpkg"

# Seconds to wait for a connection / for the next bytes of a response.
# Without a timeout a stalled server would hang the package manager forever.
HTTP_TIMEOUT = 60

os.makedirs(PKG_DIR, exist_ok=True)
os.makedirs(BUILD_DIR, exist_ok=True)


# Utility functions
def _ensure_dir(path):
    """Create *path* (and parents), ignoring "already exists" errors.

    Any OSError other than EEXIST is re-raised.
    """
    try:
        os.makedirs(path, exist_ok=True)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def version_satisfies(installed, latest):
    """Return True when *installed* is at least as new as *latest*.

    Both version strings are split on "." and "-".  Numeric components are
    compared as integers and non-numeric components as strings.  When a
    numeric component meets a non-numeric one at the same position, the
    numeric one sorts first; comparing them directly would raise TypeError.
    """
    def parse(v):
        key = []
        for part in v.replace("-", ".").split("."):
            # isdecimal() (unlike isdigit()) only accepts what int() can parse.
            if part.isdecimal():
                key.append((0, int(part), ""))
            else:
                key.append((1, 0, part))
        return key

    return parse(installed) >= parse(latest)


def _install_insecure_opener_if_requested():
    """Sets up insecure SSL handling if FEMPKG_INSECURE=1"""
    if os.environ.get("FEMPKG_INSECURE") == "1":
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ctx))
        urllib.request.install_opener(opener)


def _safe_basename_from_url(url):
    """Return the last path component of *url*, or "downloaded.file".

    The fallback is also used for "." and "..", which must never be joined
    onto a cache directory.
    """
    parsed = urlparse(url)
    name = os.path.basename(parsed.path)
    if name in ("", ".", ".."):
        return "downloaded.file"
    return name


def _strip_archive_ext(name):
    """Strip a known tar archive suffix from *name*, else its last extension."""
    for ext in [".tar.gz", ".tar.bz2", ".tar.xz", ".tgz", ".tar"]:
        if name.endswith(ext):
            return name[:-len(ext)]
    return os.path.splitext(name)[0]


def _clone_git_repo(url):
    """Clone a git repository into BUILD_DIR and return the checkout path.

    Any existing checkout with the same name is removed first.

    Raises:
        ValueError: if no usable repository name can be derived from *url*.
        subprocess.CalledProcessError: if ``git clone`` fails.
    """
    # A trailing slash would give an empty name, making dest == BUILD_DIR
    # and wiping the whole build directory below.
    repo_name = os.path.splitext(os.path.basename(url.rstrip("/")))[0]
    if repo_name in ("", ".", ".."):
        raise ValueError(f"Cannot derive a repository name from URL: {url!r}")
    dest = os.path.join(BUILD_DIR, repo_name)
    if os.path.exists(dest):
        shutil.rmtree(dest)
    os.makedirs(BUILD_DIR, exist_ok=True)
    # "--" stops git from treating a URL that starts with "-" as an option.
    subprocess.check_call(["git", "clone", "--", url, dest])
    return dest


# Download function with progress bar (ABI-safe)
def download_to(url, dest):
    """Stream *url* into the file *dest*, showing a progress bar.

    Data is written to "<dest>.part" and renamed on success, so an
    interrupted download never leaves a truncated file at *dest*.
    Certificate verification is disabled when FEMPKG_INSECURE=1.

    Raises:
        requests.RequestException: on network or HTTP errors.
        OSError: if the destination cannot be written.
    """
    _install_insecure_opener_if_requested()
    dest_dir = os.path.dirname(dest)
    if dest_dir:  # a bare filename has no directory to create
        os.makedirs(dest_dir, exist_ok=True)

    # Handle insecure SSL if requested
    verify_ssl = os.environ.get("FEMPKG_INSECURE") != "1"

    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
    }

    partial = f"{dest}.part"
    chunk_size = 1024 * 1024  # 1 MB
    try:
        with requests.get(url, headers=headers, stream=True, verify=verify_ssl,
                          timeout=HTTP_TIMEOUT) as r:
            r.raise_for_status()
            try:
                total_size = int(r.headers.get('content-length', 0))
            except ValueError:
                total_size = 0

            with open(partial, "wb") as f, tqdm(
                total=total_size or None,  # None = unknown length
                unit='B',
                unit_scale=True,
                desc=f"Downloading {os.path.basename(dest)}",
            ) as pbar:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
                        pbar.update(len(chunk))
        os.replace(partial, dest)
    except BaseException:
        # Also covers Ctrl-C: drop the partial file, then propagate.
        try:
            os.remove(partial)
        except OSError:
            pass
        raise


# Main download + prepare directory
def download_extract(url, source_type):
    """Fetch the source for *url* and return a fresh build directory.

    For ``source_type == "git"`` the repository is cloned and the checkout
    path is returned.  Otherwise the file is downloaded into PKG_DIR (unless
    already cached there) and an empty directory named after the archive is
    (re)created under BUILD_DIR.  Unpacking is left to the caller.
    """
    _install_insecure_opener_if_requested()

    if source_type == "git":
        return _clone_git_repo(url)

    basename = _safe_basename_from_url(url)
    tarball_path = os.path.join(PKG_DIR, basename)

    if not os.path.exists(tarball_path):
        download_to(url, tarball_path)

    # A file literally named ".tar.gz" strips to ""; fall back to the full
    # name so dest_dir can never be BUILD_DIR itself.
    stem = _strip_archive_ext(basename) or basename
    dest_dir = os.path.join(BUILD_DIR, stem)
    if os.path.exists(dest_dir):
        shutil.rmtree(dest_dir)
    os.makedirs(dest_dir)
    return dest_dir


# --- helpers ---
def safe_rmtree(path):
    """Remove path recursively or symlink target."""
    if os.path.islink(path):
        real_path = os.path.realpath(path)
        if os.path.exists(real_path):
            shutil.rmtree(real_path, ignore_errors=True)
        os.unlink(path)
    elif os.path.exists(path):
        shutil.rmtree(path, ignore_errors=True)


def _ensure_symlink(src: str, target: str):
    """Make *src* a symlink pointing at *target*.

    A symlink that already points at *target* is left alone.  Anything else
    at *src* (other symlink, directory tree, plain file) is removed first.
    *target* is created as a directory if needed.
    """
    if os.path.islink(src):
        if os.readlink(src) == target:
            return
        os.unlink(src)
    elif os.path.isdir(src):
        shutil.rmtree(src, ignore_errors=True)
    elif os.path.exists(src):
        # rmtree cannot delete a plain file; os.symlink would then fail.
        os.remove(src)

    os.makedirs(target, exist_ok=True)
    os.symlink(target, src)


_ensure_symlink("/tmp/fempkg", "/var/tmp/fempkg")
_ensure_symlink("/tmp/fempkgbuild", "/var/tmp/fempkgbuild")


# --------------------------
# Fetch recipes & manifests
# --------------------------
def _resolve_cache_path(cache_dir, entry):
    """Return the path for *entry* inside *cache_dir*, or None if it escapes.

    Index entries come from the network, so names such as "../../etc/x.txt"
    or absolute paths must not be allowed to write outside the cache.
    """
    root = os.path.realpath(cache_dir)
    candidate = os.path.realpath(os.path.join(root, entry))
    if candidate == root or os.path.commonpath([root, candidate]) != root:
        return None
    return candidate


def _fetch_indexed_files(repo_url, cache_dir, suffix, kind, desc):
    """Download every index.txt entry ending in *suffix* into *cache_dir*.

    *kind* ("recipe"/"manifest") is used in messages and *desc* labels the
    progress bar.  Errors are printed rather than raised; the first failure
    stops the run.  Returns True on success, False otherwise.
    """
    index_url = f"{repo_url}/index.txt"
    print(f"Fetching {kind} index from {index_url}")
    try:
        os.makedirs(cache_dir, exist_ok=True)
        with requests.Session() as session:
            resp = session.get(index_url, timeout=HTTP_TIMEOUT)
            resp.raise_for_status()
            names = (line.strip() for line in resp.text.splitlines())
            entries = [e for e in names if e.endswith(suffix)]

            for entry in tqdm(entries, desc=desc, unit="file"):
                dest_path = _resolve_cache_path(cache_dir, entry)
                if dest_path is None:
                    print(f"[fempkg] Warning: skipping unsafe index entry {entry!r}")
                    continue
                r = session.get(f"{repo_url}/{entry}", timeout=HTTP_TIMEOUT)
                r.raise_for_status()
                os.makedirs(os.path.dirname(dest_path), exist_ok=True)
                with open(dest_path, "wb") as f:
                    f.write(r.content)
    except Exception as e:  # best-effort: report and let the caller carry on
        print(f"Error fetching index/{kind}s: {e}")
        return False
    return True


def fetch_all_recipes(repo_url="https://rocketleaguechatp.duckdns.org/recipes"):
    """Download all "*.recipe.py" files listed in the repo index.

    Files are stored in RECIPE_CACHE_DIR.  Returns True on success, False if
    an error was printed.
    """
    return _fetch_indexed_files(repo_url, RECIPE_CACHE_DIR, ".recipe.py", "recipe", "Recipes")


def fetch_all_manifests(repo_url="https://rocketleaguechatp.duckdns.org/manifests"):
    """Download all "*.txt" manifests listed in the repo index.

    Files are stored in MANIFEST_CACHE_DIR.  Returns True on success, False
    if an error was printed.
    """
    return _fetch_indexed_files(repo_url, MANIFEST_CACHE_DIR, ".txt", "manifest", "Manifests")


def fetch_binpkg_index(repo_url="https://rocketleaguechatp.duckdns.org/binpkg"):
    """Download the binpkg index.txt into BINPKG_CACHE_DIR.

    Returns True on success, False if an error was printed.
    """
    index_url = f"{repo_url}/index.txt"
    print(f"Fetching binpkg index from {index_url}")
    try:
        os.makedirs(BINPKG_CACHE_DIR, exist_ok=True)
        with requests.Session() as session:
            resp = session.get(index_url, timeout=HTTP_TIMEOUT)
            resp.raise_for_status()
            dest_path = os.path.join(BINPKG_CACHE_DIR, "index.txt")
            with open(dest_path, "wb") as f:
                f.write(resp.content)
        print("Binpkg index downloaded successfully.")
    except Exception as e:  # best-effort: report and let the caller carry on
        print(f"Error fetching binpkg index: {e}")
        return False
    return True


def _clear_directory(path):
    """Delete the non-hidden entries of *path*, ignoring errors.

    Mirrors ``rm -rf path/*``: the directory itself and dot-entries are kept.
    """
    try:
        names = os.listdir(path)
    except OSError:
        return
    for name in names:
        if name.startswith("."):
            continue
        full = os.path.join(path, name)
        if os.path.isdir(full) and not os.path.islink(full):
            shutil.rmtree(full, ignore_errors=True)
        else:
            try:
                os.remove(full)
            except OSError:
                pass


def fetch_all(repo_url_recipes=None, repo_url_manifests=None, repo_url_binpkg=None):
    """Refresh the recipe cache, manifests and binpkg index.

    The recipe cache is emptied first.  Each ``repo_url_*`` argument falls
    back to the default repository when None or empty.  The success message
    is printed only when every fetch succeeded.
    """
    _clear_directory(RECIPE_CACHE_DIR)
    # Built as a list so all three fetches run even if an earlier one fails.
    results = [
        fetch_all_recipes(repo_url_recipes or DEFAULT_RECIPES_URL),
        fetch_all_manifests(repo_url_manifests or DEFAULT_MANIFESTS_URL),
        fetch_binpkg_index(repo_url_binpkg or DEFAULT_BINPKG_URL),
    ]
    if all(results):
        print("All recipes, manifests and the binpkg index have been fetched successfully.")
    else:
        print("[fempkg] Warning: some repository data could not be fetched; see errors above.")


# --------------------------
# Manifest/version utilities
# --------------------------
def save_local_manifest_snapshot(pkgname, version, src_manifest_path):
    """Copy the current manifest to local-manifests/<pkgname>-<version>.txt (temporary snapshot).

    Returns the path of the snapshot.
    """
    os.makedirs(LOCAL_MANIFESTS_DIR, exist_ok=True)
    dest = os.path.join(LOCAL_MANIFESTS_DIR, f"{pkgname}-{version}.txt")
    shutil.copy(src_manifest_path, dest)
    return dest


def promote_local_to_versioned(pkgname, version):
    """Move local snapshot to the versioned directory and update the current manifest.

    Returns the versioned manifest path, or None when no snapshot exists.
    """
    local = os.path.join(LOCAL_MANIFESTS_DIR, f"{pkgname}-{version}.txt")
    if not os.path.exists(local):
        return None
    os.makedirs(VERSIONED_MANIFEST_DIR, exist_ok=True)
    versioned = os.path.join(VERSIONED_MANIFEST_DIR, f"{pkgname}-{version}.txt")
    shutil.move(local, versioned)
    # copy the versioned file to be the current manifest too (manifests/pkgname.txt)
    os.makedirs(MANIFEST_CACHE_DIR, exist_ok=True)
    current = os.path.join(MANIFEST_CACHE_DIR, f"{pkgname}.txt")
    shutil.copy(versioned, current)
    return versioned


def remove_versioned_manifest(pkgname, version):
    """Delete the versioned manifest for *pkgname*-*version* if it exists."""
    p = os.path.join(VERSIONED_MANIFEST_DIR, f"{pkgname}-{version}.txt")
    try:
        os.remove(p)
    except FileNotFoundError:
        pass


def read_manifest_paths(manifest_path):
    """Return a list of absolute paths listed in manifest_path.

    Blank lines are skipped; a missing manifest yields an empty list.
    """
    try:
        with open(manifest_path) as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        return []


def delete_file_and_prune_dirs(path):
    """Remove a file and try to prune empty parents up to / (but stop at /).

    A directory *path* is removed only if empty; when it is not empty,
    nothing else is touched.  Failures are printed as warnings, not raised.
    """
    try:
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
        elif os.path.isdir(path):
            # If entire directory path listed, try rmdir (only if empty)
            try:
                os.rmdir(path)
            except OSError:
                # not empty
                return
        # prune parents: rmdir only succeeds on empty directories, so the
        # walk stops at the first ancestor that still has content
        parent = os.path.dirname(path)
        while parent and parent != "/":
            try:
                os.rmdir(parent)
            except OSError:
                break
            parent = os.path.dirname(parent)
    except Exception as e:
        print(f"[fempkg] Warning: failed to remove {path}: {e}")