Files
fempkg/utils.py
Gabriel Di Martino 35d4041b46 2/2025/2026
Signed-off-by: Gabriel Di Martino <gabrys_world13@proton.me>
2026-01-01 09:59:08 +01:00

283 lines
10 KiB
Python

#!/usr/bin/env python3
# fempkg - a simple package manager
# Copyright (C) 2026 Gabriel Di Martino
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import shutil
import ssl
import subprocess
import errno
import urllib
from urllib.parse import urlparse
from tqdm import tqdm
import requests
# Directories
BUILD_DIR = "/tmp/fempkgbuild"
PKG_DIR = "/var/lib/fempkg/pkgs"
RECIPE_CACHE_DIR = os.path.expanduser("/var/lib/fempkg/repo")
MANIFEST_CACHE_DIR = "/var/lib/fempkg/manifests" # current manifests (pkgname.txt)
VERSIONED_MANIFEST_DIR = "/var/lib/fempkg/manifest-versions" # versioned manifests pkgname-version.txt
LOCAL_MANIFESTS_DIR = "/var/lib/fempkg/local-manifests" # temporary snapshots used during install
BINPKG_CACHE_DIR = "/var/lib/fempkg/binpkg"
os.makedirs(PKG_DIR, exist_ok=True)
os.makedirs(BUILD_DIR, exist_ok=True)
# Utility functions
def _ensure_dir(path):
try:
os.makedirs(path, exist_ok=True)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def version_satisfies(installed, latest):
def parse(v):
parts = v.replace("-", ".").split(".")
parsed = []
for p in parts:
parsed.append(int(p) if p.isdigit() else p)
return parsed
return parse(installed) >= parse(latest)
def _install_insecure_opener_if_requested():
"""Sets up insecure SSL handling if FEMPKG_INSECURE=1"""
if os.environ.get("FEMPKG_INSECURE") == "1":
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ctx))
urllib.request.install_opener(opener)
def _safe_basename_from_url(url):
parsed = urlparse(url)
name = os.path.basename(parsed.path)
return name or "downloaded.file"
def _strip_archive_ext(name):
for ext in [".tar.gz", ".tar.bz2", ".tar.xz", ".tgz", ".tar"]:
if name.endswith(ext):
return name[:-len(ext)]
return os.path.splitext(name)[0]
def _clone_git_repo(url):
"""Clone a git repository into BUILD_DIR"""
repo_name = os.path.splitext(os.path.basename(url))[0]
dest = os.path.join(BUILD_DIR, repo_name)
if os.path.exists(dest):
shutil.rmtree(dest)
os.makedirs(BUILD_DIR, exist_ok=True)
subprocess.check_call(["git", "clone", url, dest])
return dest
# Download function with progress bar (ABI-safe)
def download_to(url, dest):
_install_insecure_opener_if_requested()
os.makedirs(os.path.dirname(dest), exist_ok=True)
# Handle insecure SSL if requested
verify_ssl = os.environ.get("FEMPKG_INSECURE") != "1"
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
}
with requests.get(url, headers=headers, stream=True, verify=verify_ssl) as r:
r.raise_for_status()
total_size = int(r.headers.get('content-length', 0))
chunk_size = 1024 * 1024 # 1 MB
with open(dest, "wb") as f, tqdm(
total=total_size,
unit='B',
unit_scale=True,
desc=f"Downloading {os.path.basename(dest)}",
) as pbar:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive chunks
f.write(chunk)
pbar.update(len(chunk))
# Main download + prepare directory
def download_extract(url, source_type):
_install_insecure_opener_if_requested()
if source_type == "git":
return _clone_git_repo(url)
basename = _safe_basename_from_url(url)
tarball_path = os.path.join(PKG_DIR, basename)
if not os.path.exists(tarball_path):
download_to(url, tarball_path)
dest_dir = os.path.join(BUILD_DIR, _strip_archive_ext(basename))
if os.path.exists(dest_dir):
shutil.rmtree(dest_dir)
os.makedirs(dest_dir)
return dest_dir
# --- helpers ---
def safe_rmtree(path):
"""Remove path recursively or symlink target."""
if os.path.islink(path):
real_path = os.path.realpath(path)
if os.path.exists(real_path):
shutil.rmtree(real_path, ignore_errors=True)
os.unlink(path)
elif os.path.exists(path):
shutil.rmtree(path, ignore_errors=True)
def _ensure_symlink(src: str, target: str):
if os.path.islink(src):
if os.readlink(src) == target:
return
else:
os.unlink(src)
elif os.path.exists(src):
shutil.rmtree(src, ignore_errors=True)
os.makedirs(target, exist_ok=True)
os.symlink(target, src)
_ensure_symlink("/tmp/fempkg", "/var/tmp/fempkg")
_ensure_symlink("/tmp/fempkgbuild", "/var/tmp/fempkgbuild")
# --------------------------
# Fetch recipes & manifests
# --------------------------
def fetch_all_recipes(repo_url="https://rocketleaguechatp.duckdns.org/recipes"):
index_url = f"{repo_url}/index.txt"
print(f"Fetching recipe index from {index_url}")
try:
with requests.Session() as session:
resp = session.get(index_url)
resp.raise_for_status()
entries = [e for e in resp.text.splitlines() if e.endswith(".recipe.py")]
for entry in tqdm(entries, desc="Recipes", unit="file"):
url = f"{repo_url}/{entry}"
dest_path = os.path.join(RECIPE_CACHE_DIR, entry)
r = session.get(url)
r.raise_for_status()
with open(dest_path, "wb") as f:
f.write(r.content)
except Exception as e:
print(f"Error fetching index/recipes: {e}")
def fetch_all_manifests(repo_url="https://rocketleaguechatp.duckdns.org/manifests"):
index_url = f"{repo_url}/index.txt"
print(f"Fetching manifest index from {index_url}")
try:
with requests.Session() as session:
resp = session.get(index_url)
resp.raise_for_status()
entries = [e for e in resp.text.splitlines() if e.endswith(".txt")]
for entry in tqdm(entries, desc="Manifests", unit="file"):
url = f"{repo_url}/{entry}"
dest_path = os.path.join(MANIFEST_CACHE_DIR, entry)
r = session.get(url)
r.raise_for_status()
with open(dest_path, "wb") as f:
f.write(r.content)
except Exception as e:
print(f"Error fetching index/manifests: {e}")
def fetch_binpkg_index(repo_url="https://rocketleaguechatp.duckdns.org/binpkg"):
index_url = f"{repo_url}/index.txt"
print(f"Fetching binpkg index from {index_url}")
try:
with requests.Session() as session:
resp = session.get(index_url)
resp.raise_for_status()
dest_path = os.path.join(BINPKG_CACHE_DIR, "index.txt")
r = session.get(index_url)
r.raise_for_status()
with open(dest_path, "wb") as f:
f.write(r.content)
print("Binpkg index downloaded successfully.")
except Exception as e:
print(f"Error fetching binpkg index: {e}")
def fetch_all(repo_url_recipes=None, repo_url_manifests=None, repo_url_binpkg=None):
os.system(f"rm -rf {RECIPE_CACHE_DIR}/*")
fetch_all_recipes(repo_url_recipes or "https://rocketleaguechatp.duckdns.org/recipes")
fetch_all_manifests(repo_url_manifests or "https://rocketleaguechatp.duckdns.org/manifests")
fetch_binpkg_index(repo_url_binpkg or "https://rocketleaguechatp.duckdns.org/binpkg")
print("All recipes, manifests and the binpkg index have been fetched successfully.")
# --------------------------
# Manifest/version utilities
# --------------------------
def save_local_manifest_snapshot(pkgname, version, src_manifest_path):
"""Copy the current manifest to local-manifests/<pkg>-<version>.txt (temporary snapshot)."""
os.makedirs(LOCAL_MANIFESTS_DIR, exist_ok=True)
dest = os.path.join(LOCAL_MANIFESTS_DIR, f"{pkgname}-{version}.txt")
shutil.copy(src_manifest_path, dest)
return dest
def promote_local_to_versioned(pkgname, version):
"""Move local snapshot to the versioned directory and update the current manifest."""
local = os.path.join(LOCAL_MANIFESTS_DIR, f"{pkgname}-{version}.txt")
if not os.path.exists(local):
return None
os.makedirs(VERSIONED_MANIFEST_DIR, exist_ok=True)
versioned = os.path.join(VERSIONED_MANIFEST_DIR, f"{pkgname}-{version}.txt")
shutil.move(local, versioned)
# copy the versioned file to be the current manifest too (manifests/pkgname.txt)
current = os.path.join(MANIFEST_CACHE_DIR, f"{pkgname}.txt")
shutil.copy(versioned, current)
return versioned
def remove_versioned_manifest(pkgname, version):
p = os.path.join(VERSIONED_MANIFEST_DIR, f"{pkgname}-{version}.txt")
if os.path.exists(p):
os.remove(p)
def read_manifest_paths(manifest_path):
"""Return a list of absolute paths listed in manifest_path."""
if not os.path.exists(manifest_path):
return []
with open(manifest_path) as f:
return [line.strip() for line in f if line.strip()]
def delete_file_and_prune_dirs(path):
"""Remove a file and try to prune empty parents up to / (but stop at /)."""
try:
if os.path.islink(path) or os.path.isfile(path):
os.remove(path)
elif os.path.isdir(path):
# If entire directory path listed, try rmdir (only if empty)
try:
os.rmdir(path)
except OSError:
# not empty
return
# prune parents
parent = os.path.dirname(path)
while parent and parent != "/" and parent != "":
try:
os.rmdir(parent)
except OSError:
break
parent = os.path.dirname(parent)
except Exception as e:
print(f"[fempkg] Warning: failed to remove {path}: {e}")