283 lines
10 KiB
Python
283 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# fempkg - a simple package manager
|
|
# Copyright (C) 2026 Gabriel Di Martino
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
import shutil
|
|
import ssl
|
|
import subprocess
|
|
import errno
|
|
import urllib
|
|
from urllib.parse import urlparse
|
|
from tqdm import tqdm
|
|
import requests
|
|
|
|
# Filesystem layout used throughout this module.
BUILD_DIR = "/tmp/fempkgbuild"  # git clones and per-archive work directories live here
PKG_DIR = "/var/lib/fempkg/pkgs"  # downloaded source archives are cached here
RECIPE_CACHE_DIR = os.path.expanduser("/var/lib/fempkg/repo")  # fetched recipe files
MANIFEST_CACHE_DIR = "/var/lib/fempkg/manifests"  # current manifests (pkgname.txt)
VERSIONED_MANIFEST_DIR = "/var/lib/fempkg/manifest-versions"  # versioned manifests pkgname-version.txt
LOCAL_MANIFESTS_DIR = "/var/lib/fempkg/local-manifests"  # temporary snapshots used during install
BINPKG_CACHE_DIR = "/var/lib/fempkg/binpkg"  # the binpkg index.txt is stored here

# Import-time side effect: the archive cache and the build area are created
# as soon as the module is loaded. The other directories are not created here.
for _required_dir in (PKG_DIR, BUILD_DIR):
    os.makedirs(_required_dir, exist_ok=True)
|
|
|
|
# Utility functions
|
|
def _ensure_dir(path):
|
|
try:
|
|
os.makedirs(path, exist_ok=True)
|
|
except OSError as e:
|
|
if e.errno != errno.EEXIST:
|
|
raise
|
|
|
|
def version_satisfies(installed, latest):
    """Return True when version *installed* is at least as new as *latest*.

    Both strings are split on "." and "-" and compared component by
    component, left to right. Components made only of decimal digits are
    compared as integers; all other components are compared as text. When a
    numeric component meets a text component at the same position, the
    numeric one is considered newer (so "1.2.0" satisfies "1.2.rc1").

    If one version is a strict prefix of the other, the shorter one is the
    older one ("1.0" does not satisfy "1.0.0").
    """

    def parse(v):
        # Tag every component so an int is never compared with a str
        # directly; on Python 3 that comparison raises TypeError. Text gets
        # the lower tag, which makes it sort before any number.
        # isdecimal() (rather than isdigit()) only accepts characters that
        # int() can actually convert.
        return [
            (1, int(part)) if part.isdecimal() else (0, part)
            for part in v.replace("-", ".").split(".")
        ]

    return parse(installed) >= parse(latest)
|
|
|
|
def _install_insecure_opener_if_requested():
|
|
"""Sets up insecure SSL handling if FEMPKG_INSECURE=1"""
|
|
if os.environ.get("FEMPKG_INSECURE") == "1":
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ctx))
|
|
urllib.request.install_opener(opener)
|
|
|
|
def _safe_basename_from_url(url):
|
|
parsed = urlparse(url)
|
|
name = os.path.basename(parsed.path)
|
|
return name or "downloaded.file"
|
|
|
|
def _strip_archive_ext(name):
|
|
for ext in [".tar.gz", ".tar.bz2", ".tar.xz", ".tgz", ".tar"]:
|
|
if name.endswith(ext):
|
|
return name[:-len(ext)]
|
|
return os.path.splitext(name)[0]
|
|
|
|
def _clone_git_repo(url):
    """Clone the git repository at *url* into a fresh directory under BUILD_DIR.

    The directory name is the last path segment of *url* with its final
    extension removed (for example a trailing ".git"). An existing directory
    of that name is deleted first.

    Returns the path of the clone.

    Raises:
        ValueError: if no usable directory name can be derived from *url*.
        subprocess.CalledProcessError: if ``git clone`` exits non-zero.
    """
    # Drop trailing slashes first: otherwise basename() is empty, dest
    # collapses to BUILD_DIR itself and the rmtree below would wipe the
    # whole build area.
    repo_name = os.path.splitext(os.path.basename(url.rstrip("/")))[0]
    if repo_name in ("", ".", ".."):
        raise ValueError(f"cannot derive a repository name from {url!r}")

    dest = os.path.join(BUILD_DIR, repo_name)
    if os.path.exists(dest):
        shutil.rmtree(dest)
    os.makedirs(BUILD_DIR, exist_ok=True)
    # "--" ends option parsing so a URL starting with "-" cannot be read by
    # git as a command-line option.
    subprocess.check_call(["git", "clone", "--", url, dest])
    return dest
|
|
|
|
# Download function with progress bar (ABI-safe)
|
|
def download_to(url, dest):
    """Download *url* to the file *dest*, showing a tqdm progress bar.

    The body is streamed into "<dest>.part" and renamed onto *dest* only
    after the transfer finished, so an interrupted or failed download never
    leaves a truncated file at *dest* (callers treat an existing *dest* as a
    finished download). Missing parent directories of *dest* are created.

    TLS certificate verification is disabled when the environment variable
    FEMPKG_INSECURE is "1".

    Raises:
        requests.RequestException: on connection errors, timeouts or a
            non-2xx HTTP status.
        OSError: if the destination cannot be written.
    """
    _install_insecure_opener_if_requested()

    # dirname() is "" for a bare file name and os.makedirs("") would fail.
    parent_dir = os.path.dirname(dest)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Handle insecure SSL if requested
    verify_ssl = os.environ.get("FEMPKG_INSECURE") != "1"

    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
    }

    partial_path = f"{dest}.part"
    try:
        # (connect, read) timeouts: without them an unresponsive server
        # blocks the call forever.
        with requests.get(
            url,
            headers=headers,
            stream=True,
            verify=verify_ssl,
            timeout=(10, 60),
        ) as r:
            r.raise_for_status()
            total_size = int(r.headers.get('content-length', 0))
            chunk_size = 1024 * 1024  # 1 MB

            with open(partial_path, "wb") as f, tqdm(
                # None (instead of 0) tells tqdm that the size is unknown.
                total=total_size or None,
                unit='B',
                unit_scale=True,
                desc=f"Downloading {os.path.basename(dest)}",
            ) as pbar:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
                        pbar.update(len(chunk))
        os.replace(partial_path, dest)
    except BaseException:
        # Also covers Ctrl-C: remove the incomplete file, then re-raise.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        raise
|
|
|
|
# Main download + prepare directory
|
|
def download_extract(url, source_type):
    """Fetch the source at *url* and return its working directory.

    For ``source_type == "git"`` the repository is cloned and the clone path
    is returned. Otherwise the archive is downloaded into PKG_DIR unless a
    file of that name is already there, and an empty directory named after
    the archive (extension stripped) is created under BUILD_DIR, replacing
    any existing one. This function does not unpack the archive itself.
    """
    _install_insecure_opener_if_requested()

    if source_type == "git":
        return _clone_git_repo(url)

    archive_name = _safe_basename_from_url(url)
    cached_archive = os.path.join(PKG_DIR, archive_name)
    work_dir = os.path.join(BUILD_DIR, _strip_archive_ext(archive_name))

    # Reuse a previously downloaded archive when present.
    already_cached = os.path.exists(cached_archive)
    if not already_cached:
        download_to(url, cached_archive)

    # Always start from an empty working directory.
    if os.path.exists(work_dir):
        shutil.rmtree(work_dir)
    os.makedirs(work_dir)

    return work_dir
|
|
|
|
# --- helpers ---
|
|
def safe_rmtree(path):
    """Delete the tree at *path*; for a symlink, delete its target and the link.

    If *path* is a symlink, the fully resolved target is removed recursively
    (when it exists) and the link itself is unlinked. Otherwise *path* is
    removed recursively when it exists. Errors raised during the recursive
    removal are ignored. A *path* that does not exist is a no-op.
    """
    if not os.path.islink(path):
        if os.path.exists(path):
            shutil.rmtree(path, ignore_errors=True)
        return

    resolved = os.path.realpath(path)
    if os.path.exists(resolved):
        shutil.rmtree(resolved, ignore_errors=True)
    os.unlink(path)
|
|
|
|
def _ensure_symlink(src: str, target: str):
|
|
if os.path.islink(src):
|
|
if os.readlink(src) == target:
|
|
return
|
|
else:
|
|
os.unlink(src)
|
|
elif os.path.exists(src):
|
|
shutil.rmtree(src, ignore_errors=True)
|
|
os.makedirs(target, exist_ok=True)
|
|
os.symlink(target, src)
|
|
|
|
# Import-time side effect: turn the /tmp working paths into symlinks that
# point at their /var/tmp counterparts.
for _link_path, _real_dir in (
    ("/tmp/fempkg", "/var/tmp/fempkg"),
    ("/tmp/fempkgbuild", "/var/tmp/fempkgbuild"),
):
    _ensure_symlink(_link_path, _real_dir)
|
|
|
|
# --------------------------
|
|
# Fetch recipes & manifests
|
|
# --------------------------
|
|
|
|
def fetch_all_recipes(repo_url="https://rocketleaguechatp.duckdns.org/recipes"):
    """Download every recipe listed in ``<repo_url>/index.txt``.

    Index lines ending in ".recipe.py" are fetched one by one and written to
    RECIPE_CACHE_DIR under the same name. The cache directory is created if
    missing. Lines that contain a path separator are skipped so the index
    cannot cause writes outside the cache directory.

    Failures are reported on stdout and not raised; the function always
    returns None.
    """
    index_url = f"{repo_url}/index.txt"
    print(f"Fetching recipe index from {index_url}")
    try:
        # Nothing else guarantees that the cache directory exists.
        os.makedirs(RECIPE_CACHE_DIR, exist_ok=True)
        with requests.Session() as session:
            # A timeout keeps an unresponsive server from blocking forever.
            resp = session.get(index_url, timeout=30)
            resp.raise_for_status()

            entries = []
            for line in resp.text.splitlines():
                entry = line.strip()
                if not entry.endswith(".recipe.py"):
                    continue
                if os.path.basename(entry) != entry:
                    # e.g. "../x.recipe.py" would escape the cache directory
                    print(f"Skipping unsafe recipe entry: {entry}")
                    continue
                entries.append(entry)

            for entry in tqdm(entries, desc="Recipes", unit="file"):
                url = f"{repo_url}/{entry}"
                dest_path = os.path.join(RECIPE_CACHE_DIR, entry)
                r = session.get(url, timeout=30)
                r.raise_for_status()
                with open(dest_path, "wb") as f:
                    f.write(r.content)
    except Exception as e:
        # Best effort by design: report the problem and return.
        print(f"Error fetching index/recipes: {e}")
|
|
|
|
|
|
def fetch_all_manifests(repo_url="https://rocketleaguechatp.duckdns.org/manifests"):
    """Download every manifest listed in ``<repo_url>/index.txt``.

    Index lines ending in ".txt" are fetched one by one and written to
    MANIFEST_CACHE_DIR under the same name. The cache directory is created
    if missing. Lines that contain a path separator are skipped so the index
    cannot cause writes outside the cache directory.

    Failures are reported on stdout and not raised; the function always
    returns None.
    """
    index_url = f"{repo_url}/index.txt"
    print(f"Fetching manifest index from {index_url}")
    try:
        # Nothing else guarantees that the cache directory exists.
        os.makedirs(MANIFEST_CACHE_DIR, exist_ok=True)
        with requests.Session() as session:
            # A timeout keeps an unresponsive server from blocking forever.
            resp = session.get(index_url, timeout=30)
            resp.raise_for_status()

            entries = []
            for line in resp.text.splitlines():
                entry = line.strip()
                if not entry.endswith(".txt"):
                    continue
                if os.path.basename(entry) != entry:
                    # e.g. "../x.txt" would escape the cache directory
                    print(f"Skipping unsafe manifest entry: {entry}")
                    continue
                entries.append(entry)

            for entry in tqdm(entries, desc="Manifests", unit="file"):
                url = f"{repo_url}/{entry}"
                dest_path = os.path.join(MANIFEST_CACHE_DIR, entry)
                r = session.get(url, timeout=30)
                r.raise_for_status()
                with open(dest_path, "wb") as f:
                    f.write(r.content)
    except Exception as e:
        # Best effort by design: report the problem and return.
        print(f"Error fetching index/manifests: {e}")
|
|
|
|
|
|
def fetch_binpkg_index(repo_url="https://rocketleaguechatp.duckdns.org/binpkg"):
    """Download ``<repo_url>/index.txt`` to BINPKG_CACHE_DIR/index.txt.

    The cache directory is created if missing. Failures are reported on
    stdout and not raised; the function always returns None.
    """
    index_url = f"{repo_url}/index.txt"
    print(f"Fetching binpkg index from {index_url}")
    try:
        # Nothing else guarantees that the cache directory exists.
        os.makedirs(BINPKG_CACHE_DIR, exist_ok=True)
        dest_path = os.path.join(BINPKG_CACHE_DIR, "index.txt")
        with requests.Session() as session:
            # One request is enough: the response that is validated is the
            # one that gets written. The timeout prevents hanging forever.
            resp = session.get(index_url, timeout=30)
            resp.raise_for_status()
            with open(dest_path, "wb") as f:
                f.write(resp.content)
        print("Binpkg index downloaded successfully.")
    except Exception as e:
        # Best effort by design: report the problem and return.
        print(f"Error fetching binpkg index: {e}")
|
|
|
|
|
|
def fetch_all(repo_url_recipes=None, repo_url_manifests=None, repo_url_binpkg=None):
    """Clear the recipe cache, then fetch recipes, manifests and the binpkg index.

    Each ``repo_url_*`` argument overrides the default repository URL for
    that step; ``None`` (or any falsy value) selects the default.
    """
    # Empty the recipe cache without going through a shell. This removes the
    # same entries the previous `rm -rf <dir>/*` did: everything except
    # names starting with ".", and removal errors are ignored.
    try:
        stale_entries = list(os.scandir(RECIPE_CACHE_DIR))
    except (FileNotFoundError, NotADirectoryError):
        stale_entries = []
    for stale in stale_entries:
        if stale.name.startswith("."):
            continue
        if stale.is_dir(follow_symlinks=False):
            shutil.rmtree(stale.path, ignore_errors=True)
        else:
            try:
                os.remove(stale.path)
            except OSError:
                pass

    fetch_all_recipes(repo_url_recipes or "https://rocketleaguechatp.duckdns.org/recipes")
    fetch_all_manifests(repo_url_manifests or "https://rocketleaguechatp.duckdns.org/manifests")
    fetch_binpkg_index(repo_url_binpkg or "https://rocketleaguechatp.duckdns.org/binpkg")
    # NOTE(review): the fetch helpers report their own errors and return
    # None, so this line is printed even when one of them failed.
    print("All recipes, manifests and the binpkg index have been fetched successfully.")
|
|
|
|
# --------------------------
|
|
# Manifest/version utilities
|
|
# --------------------------
|
|
def save_local_manifest_snapshot(pkgname, version, src_manifest_path):
    """Snapshot a manifest as local-manifests/<pkgname>-<version>.txt.

    LOCAL_MANIFESTS_DIR is created when missing, *src_manifest_path* is
    copied into it and the path of the copy is returned.
    """
    snapshot_name = f"{pkgname}-{version}.txt"
    snapshot_path = os.path.join(LOCAL_MANIFESTS_DIR, snapshot_name)
    os.makedirs(LOCAL_MANIFESTS_DIR, exist_ok=True)
    shutil.copy(src_manifest_path, snapshot_path)
    return snapshot_path
|
|
|
|
def promote_local_to_versioned(pkgname, version):
    """Promote a local manifest snapshot to the versioned and current manifests.

    Moves local-manifests/<pkgname>-<version>.txt into
    VERSIONED_MANIFEST_DIR and copies it to MANIFEST_CACHE_DIR/<pkgname>.txt.

    Returns the path of the versioned manifest, or None when no local
    snapshot exists for this package and version.
    """
    local = os.path.join(LOCAL_MANIFESTS_DIR, f"{pkgname}-{version}.txt")
    if not os.path.exists(local):
        return None

    # Create both destination directories before touching the snapshot, so a
    # missing manifests directory cannot make the copy fail after the
    # snapshot has already been moved away.
    os.makedirs(VERSIONED_MANIFEST_DIR, exist_ok=True)
    os.makedirs(MANIFEST_CACHE_DIR, exist_ok=True)

    versioned = os.path.join(VERSIONED_MANIFEST_DIR, f"{pkgname}-{version}.txt")
    shutil.move(local, versioned)

    # copy the versioned file to be the current manifest too (manifests/pkgname.txt)
    current = os.path.join(MANIFEST_CACHE_DIR, f"{pkgname}.txt")
    shutil.copy(versioned, current)
    return versioned
|
|
|
|
def remove_versioned_manifest(pkgname, version):
    """Delete manifest-versions/<pkgname>-<version>.txt if it exists."""
    manifest_name = f"{pkgname}-{version}.txt"
    manifest_file = os.path.join(VERSIONED_MANIFEST_DIR, manifest_name)
    if not os.path.exists(manifest_file):
        return
    os.remove(manifest_file)
|
|
|
|
def read_manifest_paths(manifest_path):
    """Return the non-blank lines of the manifest file, whitespace-stripped.

    A manifest that does not exist yields an empty list.
    """
    if not os.path.exists(manifest_path):
        return []

    listed = []
    with open(manifest_path) as handle:
        for raw_line in handle:
            cleaned = raw_line.strip()
            if cleaned:
                listed.append(cleaned)
    return listed
|
|
|
|
def delete_file_and_prune_dirs(path):
    """Delete *path* and then remove parent directories that became empty.

    Files and symlinks are removed. A directory is removed only when it is
    empty; a non-empty directory is left in place and no pruning happens.
    Afterwards each ancestor is removed in turn until one cannot be removed
    or "/" (or the empty string, for relative paths) is reached.

    Any exception is reported as a warning on stdout and not raised.
    """
    try:
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
        elif os.path.isdir(path):
            # A listed directory is only dropped when it is already empty.
            try:
                os.rmdir(path)
            except OSError:
                return

        # Walk upwards; rmdir() fails on the first ancestor that is not empty.
        ancestor = os.path.dirname(path)
        while ancestor not in ("", "/"):
            try:
                os.rmdir(ancestor)
            except OSError:
                break
            ancestor = os.path.dirname(ancestor)
    except Exception as e:
        print(f"[fempkg] Warning: failed to remove {path}: {e}")