#!/usr/bin/env python3
# fempkg - a simple package manager
# Copyright (C) 2026 Gabriel Di Martino
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import os
import shutil
import subprocess

import requests
from tqdm import tqdm

from db import load_db, register_package, is_installed
from utils import (
    download_extract,
    version_satisfies,
    PKG_DIR,
    _ensure_symlink,
    save_local_manifest_snapshot,
    read_manifest_paths,
    delete_file_and_prune_dirs,
    promote_local_to_versioned,
    remove_versioned_manifest,
)

# --- basic dirs ---
TMP_RECIPE_DIR = "/tmp/fempkg"
os.makedirs(TMP_RECIPE_DIR, exist_ok=True)

RECIPE_CACHE_DIR = "/var/lib/fempkg/repo"
MANIFEST_CACHE_DIR = "/var/lib/fempkg/manifests"              # current manifests (pkgname.txt)
VERSIONED_MANIFEST_DIR = "/var/lib/fempkg/manifest-versions"  # versioned manifests (pkgname-version.txt)
LOCAL_MANIFESTS_DIR = "/var/lib/fempkg/local-manifests"       # temporary snapshots used during install
BINPKG_CACHE_DIR = "/var/lib/fempkg/binpkg"

for d in (RECIPE_CACHE_DIR, MANIFEST_CACHE_DIR, VERSIONED_MANIFEST_DIR, LOCAL_MANIFESTS_DIR, BINPKG_CACHE_DIR):
    os.makedirs(d, exist_ok=True)

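# Environment toggles read by build_package(); any of "1", "true" or "yes" enables them:
#   FEMPKG_SOURCE   - force building from source, skipping any prebuilt binary package
#   FEMPKG_NODELETE - keep files of the previously installed version instead of deleting them
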
# --------------------------
# Recipe / manifest helpers
# --------------------------
def fetch_recipe(pkgname):
    path = os.path.join(RECIPE_CACHE_DIR, f"{pkgname}.recipe.py")
    if not os.path.exists(path):
        raise FileNotFoundError(f"Recipe for {pkgname} not found in cache. Run `fempkg update` first.")
    return path


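# Manifests appear to list one installed file path per line; entries may contain a
# {pkgver} placeholder that fetch_manifest() substitutes, e.g. a hypothetical line
# "/usr/lib/libfoo.so.{pkgver}" resolving to "/usr/lib/libfoo.so.1.2.3".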
def fetch_manifest(pkgname, pkgver=None):
    """
    Returns a path to a manifest file. If pkgver is provided, creates a resolved
    temporary manifest replacing {pkgver} placeholders and returns that path.
    """
    path = os.path.join(MANIFEST_CACHE_DIR, f"{pkgname}.txt")
    if not os.path.exists(path):
        raise FileNotFoundError(f"Manifest for {pkgname} not found. Run `fempkg update` first.")
    if not pkgver:
        return path

    temp_path = os.path.join(MANIFEST_CACHE_DIR, f"{pkgname}-resolved.txt")
    with open(path) as f:
        content = f.read()
    content = content.replace("{pkgver}", pkgver)
    with open(temp_path, "w") as f:
        f.write(content)
    return temp_path


# --------------------------
# Rebuild helper
# --------------------------
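# rebuild_package() force-rebuilds packages that are already installed; packages that
# are not installed are skipped. It is called from recipe triggers of the form
# {"rebuild_package": [...]} handled in build_package() below.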
def rebuild_package(packages, repo_dir=None):
    if isinstance(packages, str):
        packages = [packages]

    db = load_db()  # load db once

    for pkg in packages:
        if pkg not in db["installed"]:
            print(f"[fempkg] Skipping rebuild of {pkg}: not installed.")
            continue

        print(f"[fempkg] Rebuilding dependency: {pkg}")

        dep_recipe = None
        if repo_dir:
            dep_recipe = os.path.join(repo_dir, f"{pkg}.recipe.py")
            if not os.path.exists(dep_recipe):
                print(f"Warning: recipe for {pkg} not found in {repo_dir}.")
                dep_recipe = None

        if not dep_recipe:
            dep_recipe = fetch_recipe(pkg)

        build_package(dep_recipe, repo_dir, force_rebuild=True)


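# Note: extraction shells out to `tar` with `--use-compress-program=zstd`, so both
# tar and zstd must be available on PATH; the listing pass below only exists to
# size the progress bar.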
def extract_tar_zst_with_progress(tar_path, dest="/"):
    """
    Extract a .tar.zst archive with a progress bar.
    Uses 'tar' with zstd support and shows the number of files extracted.
    """
    # Step 1: get the total number of files in the archive
    try:
        result = subprocess.run(
            ["tar", "--use-compress-program=zstd", "-tf", tar_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
        files = result.stdout.splitlines()
        total_files = len(files)
    except subprocess.CalledProcessError as e:
        print(f"[fempkg] Failed to list tar.zst archive: {e.stderr}")
        raise

    # Step 2: extract with verbose output and a tqdm progress bar
    with tqdm(total=total_files, unit="file", desc=f"Extracting {tar_path}") as pbar:
        proc = subprocess.Popen(
            ["tar", "--use-compress-program=zstd", "-xvf", tar_path, "-C", dest],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        for line in proc.stdout:
            if line.strip():
                pbar.update(1)
        proc.wait()
        if proc.returncode != 0:
            raise subprocess.CalledProcessError(proc.returncode, proc.args)


# --------------------------
# Main build function
# --------------------------
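# build_package() drives a full install: it loads the recipe, recursively builds
# missing or outdated dependencies, tries a prebuilt binary package (unless
# FEMPKG_SOURCE is set), falls back to building from source, records manifests,
# and finally runs any post-install triggers.
#
# A recipe is a small Python file exec'd into a dict. A minimal sketch using only
# the fields this module reads (names and values below are illustrative, not a
# real package; extraction/build-dir details live in utils.download_extract):
#
#   pkgname = "foo"
#   pkgver = "1.2.3"
#   source = "https://example.org/foo-1.2.3.tar.gz"
#   source_type = "tar"          # passed through to download_extract()
#   deps = ["bar"]
#   atomic = False               # True skips deleting the old version's files before install
#   build = ["./configure && make && make install"]   # shell commands, run in order
#   triggers = ["ldconfig", {"rebuild_package": ["baz"]}]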
def build_package(recipe_file, repo_dir=None, force_rebuild=False, _visited=None):
    _ensure_symlink("/tmp/fempkg", "/var/tmp/fempkg")
    _ensure_symlink("/tmp/fempkgbuild", "/var/tmp/fempkgbuild")

    if _visited is None:
        _visited = set()  # track packages in the current build chain

    db = load_db()
    recipe = {}
    with open(recipe_file, "r") as f:
        exec(f.read(), recipe)

    name, version = recipe["pkgname"], recipe["pkgver"]

    # Prevent infinite recursion from dependency loops
    if name in _visited:
        print(f"[fempkg] Detected circular dependency on {name}, skipping...")
        return
    _visited.add(name)

    source_type = recipe.get("source_type")
    deps = recipe.get("deps", [])
    triggers = recipe.get("triggers", [])
    atomic_upgrade = bool(recipe.get("atomic", False))

    # --- Check for "source-only" mode ---
    source_only = os.environ.get("FEMPKG_SOURCE", "").lower() in ("1", "true", "yes")
    nodelete_env = os.environ.get("FEMPKG_NODELETE", "").lower() in ("1", "true", "yes")

    if not force_rebuild and is_installed(name, version, db=db):
        print(f"[fempkg] {name}-{version} is already installed. Skipping installation.")
        return

    # Build dependencies
    for dep_name in deps:
        dep_recipe = None
        if repo_dir:
            dep_recipe = os.path.join(repo_dir, f"{dep_name}.recipe.py")
            if not os.path.exists(dep_recipe):
                print(f"Warning: recipe for {dep_name} not found in {repo_dir}.")
                dep_recipe = None
        if not dep_recipe:
            dep_recipe = fetch_recipe(dep_name)

        dep_info = {}
        with open(dep_recipe, "r") as f:
            exec(f.read(), dep_info)
        dep_latest_ver = dep_info["pkgver"]

        installed_ver = db["installed"].get(dep_name)
        if installed_ver is None or not version_satisfies(installed_ver, dep_latest_ver):
            print(f"Installing/updating dependency {dep_name} "
                  f"(installed: {installed_ver}, latest: {dep_latest_ver})")
            build_package(dep_recipe, repo_dir, _visited=_visited)  # pass _visited down
        else:
            print(f"Dependency {dep_name} is up-to-date ({installed_ver}). Skipping.")

    # --- BINPKG logic (skip if building from source) ---
    if not source_only:
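        # index.txt is assumed to hold one available binary-package name per line;
        # archives are fetched from the server's binpkg/ directory as
        # <name>-<version>.tar.zst together with a detached .asc GPG signature.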
        binpkg_index = os.path.join(BINPKG_CACHE_DIR, "index.txt")
        binpkg_success = False
        binpkg_list = []
        if os.path.exists(binpkg_index):
            with open(binpkg_index, "r") as f:
                binpkg_list = [line.strip() for line in f if line.strip()]

        if name in binpkg_list:
            print(f"[fempkg] Found prebuilt binary package for {name}. Preparing to install...")
            try:
                binpkg_url = f"https://rocketleaguechatp.duckdns.org/binpkg/{name}-{version}.tar.zst"
                local_path = os.path.join(BINPKG_CACHE_DIR, f"{name}-{version}.tar.zst")

                # download
                with requests.get(binpkg_url, stream=True) as r:
                    r.raise_for_status()
                    total_size = int(r.headers.get("content-length", 0))
                    chunk_size = 1024 * 1024  # 1 MB chunks

                    with open(local_path, "wb") as f, tqdm(
                        total=total_size, unit="B", unit_scale=True,
                        desc=f"Downloading {name}-{version}",
                    ) as pbar:
                        for chunk in r.iter_content(chunk_size=chunk_size):
                            if chunk:
                                f.write(chunk)
                                pbar.update(len(chunk))

                # ---------------------------
                # Manifest / deletion workflow
                # ---------------------------
                try:
                    resolved_manifest = fetch_manifest(name, pkgver=version)
                except FileNotFoundError:
                    resolved_manifest = os.path.join(MANIFEST_CACHE_DIR, f"{name}.txt")
                    if not os.path.exists(resolved_manifest):
                        resolved_manifest = None

                if resolved_manifest:
                    try:
                        os.makedirs(LOCAL_MANIFESTS_DIR, exist_ok=True)
                        local_snap = save_local_manifest_snapshot(name, version, resolved_manifest)
                        print(f"[fempkg] Saved local manifest snapshot: {local_snap}")
                    except Exception as e:
                        print(f"[fempkg] Warning: failed to save local manifest snapshot: {e}")
                else:
                    print(f"[fempkg] No manifest available to snapshot for {name} {version}.")

                old_installed_version = db["installed"].get(name)
                if old_installed_version and (not atomic_upgrade) and (not nodelete_env):
                    old_versioned_path = os.path.join(VERSIONED_MANIFEST_DIR, f"{name}-{old_installed_version}.txt")
                    if os.path.exists(old_versioned_path):
                        print(f"[fempkg] Removing files from old manifest: {old_versioned_path}")
                        old_paths = read_manifest_paths(old_versioned_path)
                        for p in old_paths:
                            if not p:
                                continue
                            if not os.path.isabs(p):
                                print(f"[fempkg] Skipping (not absolute) path from manifest: {p}")
                                continue
                            if os.path.exists(p):
                                print(f"[fempkg] Removing old file: {p}")
                                delete_file_and_prune_dirs(p)
                    else:
                        print(f"[fempkg] No old versioned manifest found at {old_versioned_path}; nothing to remove.")
                else:
                    if not old_installed_version:
                        print(f"[fempkg] {name} not currently installed; nothing to delete.")
                    elif atomic_upgrade:
                        print("[fempkg] Atomic upgrade requested in recipe - skipping deletion of old files.")
                    elif nodelete_env:
                        print("[fempkg] FEMPKG_NODELETE set - skipping deletion of old files.")

                # Verify GPG signature
                asc_path = local_path + ".asc"
                if not os.path.exists(asc_path):
                    binpkg_asc_url = f"https://rocketleaguechatp.duckdns.org/binpkg/{name}-{version}.tar.zst.asc"
                    with requests.get(binpkg_asc_url) as r:
                        r.raise_for_status()
                        with open(asc_path, "wb") as f:
                            f.write(r.content)

                print(f"[fempkg] Verifying GPG signature for {name}-{version}...")
                try:
                    subprocess.run(["gpg", "--verify", asc_path, local_path], check=True)
                    print("[fempkg] Signature verified successfully.")
                except subprocess.CalledProcessError as e:
                    raise RuntimeError(f"[fempkg] GPG verification failed for {name}-{version}: {e}")

                print(f"[fempkg] Extracting binary package to / : {local_path}")
                extract_tar_zst_with_progress(local_path, dest="/")

                register_package(name, version, db=db)
                print(f"[fempkg] Installed {name}-{version} from binary package.")
                os.system(f"rm -rf {local_path} {asc_path}")

                if resolved_manifest:
                    promoted = promote_local_to_versioned(name, version)
                    if promoted:
                        print(f"[fempkg] Promoted manifest to versioned: {promoted}")
                    else:
                        print(f"[fempkg] Warning: promotion of manifest failed for {name}-{version}")

                if old_installed_version:
                    remove_versioned_manifest(name, old_installed_version)
                    print(f"[fempkg] Removed old versioned manifest for {name}-{old_installed_version} (if present).")

                binpkg_success = True

            except Exception as e:
                print(f"[fempkg] Failed to use binary package for {name}: {e}. Falling back to build from source.")
                binpkg_success = False

        if binpkg_success:
            if triggers:
                print(f"[fempkg] Running post triggers for {name}...")
                for trig in triggers:
                    if isinstance(trig, str):
                        print(f"> {trig}")
                        subprocess.run(trig, shell=True, check=True)
                    elif isinstance(trig, dict) and "rebuild_package" in trig:
                        rebuild_package(trig["rebuild_package"], repo_dir)
                    else:
                        print(f"[fempkg] Unknown trigger type: {trig}")
            return

    # --- Source build fallback ---
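    # From here the package is built from source: each command in the recipe's
    # `build` list runs through the shell with /etc/profile sourced and /tmp/fempkg
    # pre-created, and the resolved manifest is copied into the versioned store.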
    print(f"[fempkg] Building {name}-{version} from source...")
    manifest_path = fetch_manifest(name, pkgver=version)
    with open(manifest_path) as f:
        files = sorted(line.strip() for line in f if line.strip())
    print(f"Using manifest for {name} ({len(files)} files)")

    download_extract(recipe["source"], source_type)
    for cmd in recipe.get("build", []):
        print(f"> {cmd}")
        subprocess.run(f". /etc/profile && mkdir -p /tmp/fempkg && {cmd}", shell=True, check=True)

    register_package(name, version, db=db)
    try:
        resolved_manifest = fetch_manifest(name, pkgver=version)
        if resolved_manifest:
            os.makedirs(VERSIONED_MANIFEST_DIR, exist_ok=True)
            versioned_path = os.path.join(VERSIONED_MANIFEST_DIR, f"{name}-{version}.txt")
            shutil.copy(resolved_manifest, versioned_path)
            shutil.copy(versioned_path, os.path.join(MANIFEST_CACHE_DIR, f"{name}.txt"))
            old_installed_version = db["installed"].get(name)
            if old_installed_version and old_installed_version != version:
                remove_versioned_manifest(name, old_installed_version)
    except Exception as e:
        print(f"[fempkg] Warning: failed to save versioned manifest for {name}: {e}")

    for cleanup_path in ["/tmp/fempkg", "/tmp/fempkgbuild"]:
        target = os.path.realpath(cleanup_path)
        if os.path.exists(target):
            shutil.rmtree(target, ignore_errors=True)
            os.makedirs(target, exist_ok=True)

    basename = os.path.basename(recipe["source"])
    tarball_path = os.path.join(PKG_DIR, basename)
    if os.path.exists(tarball_path):
        os.remove(tarball_path)

    if triggers:
        print(f"[fempkg] Running post triggers for {name}...")
        for trig in triggers:
            if isinstance(trig, str):
                print(f"> {trig}")
                subprocess.run(trig, shell=True, check=True)
            elif isinstance(trig, dict) and "rebuild_package" in trig:
                rebuild_package(trig["rebuild_package"], repo_dir)
            else:
                print(f"[fempkg] Unknown trigger type: {trig}")