#!/usr/bin/env python3
"""
GOLD auto-purge — doctrine 3 (GOLD-BACKUP before modif) + doctrine 59 (no-delete active)

Keep only the latest KEEP_COUNT backups PER filename pattern (stem) found under
VAULT. Older backups are compressed into ARCHIVE_DIR and the original is removed
only after the archive was written successfully.

Runs daily via cron.
"""
import datetime
import glob
import json
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path

VAULT = "/opt/wevads/vault"
ARCHIVE_DIR = f"{VAULT}/.archive_compressed"
LOG = "/var/log/weval/gold-purge.log"
KEEP_COUNT = 5
STATE_FILE = "/var/lib/weval/gold-purge-state.json"

# Seconds allowed for one tar invocation (single file / whole directory).
FILE_TAR_TIMEOUT = 30
DIR_TAR_TIMEOUT = 300

# Stem = everything before the backup marker. The separator before the marker
# is optional at the very start of the name so that bare "gold-auto-YYYYMMDD"
# directories are recognised too.
# Examples: foo.GOLD-20260417-1430, foo.gold-auto-20260417, gold-auto-20260417
_STEM_RE = re.compile(r"^(.*?)(?:^|[.-])(GOLD|gold[-_]auto)[-_]?\d{8}")


def _ensure_dir(path):
    """Create *path* (and parents) if possible; warn instead of crashing."""
    try:
        Path(path).mkdir(parents=True, exist_ok=True)
    except OSError as e:
        # Not fatal: log() is best-effort and a failed archive never deletes
        # its source, so running without these directories is safe.
        print(f"WARN cannot create {path}: {e}", file=sys.stderr)


_ensure_dir(Path(LOG).parent)
_ensure_dir(ARCHIVE_DIR)
_ensure_dir(Path(STATE_FILE).parent)


def log(m):
    """Print a timestamped line to stdout and append it to LOG (best-effort)."""
    line = f"[{datetime.datetime.now().isoformat()}] {m}\n"
    print(line, end="")
    try:
        with open(LOG, "a", encoding="utf-8", errors="backslashreplace") as f:
            f.write(line)
    except (OSError, ValueError):
        # Logging to file must never abort the purge; stdout already has it.
        pass


def group_by_stem(files):
    """Group backup paths by stem (the name before the GOLD / gold-auto marker).

    Returns a dict mapping stem -> list of paths, preserving input order.

    * ``foo.GOLD-20260417-1430``   -> key ``foo`` (basename only, so equally
      named backups from different directories share one group).
    * ``gold-auto-20260417``       -> key ``<dirname>/gold-auto`` (no stem in
      the name, so the group is scoped to the containing directory).
    * anything without a marker    -> key ``<dirname>/<first 30 chars of name>``.
    """
    groups = {}
    for f in files:
        name = os.path.basename(f)
        m = _STEM_RE.match(name)
        if m is None:
            # Fallback: directory + first 30 characters of the file name.
            stem = os.path.dirname(f) + "/" + name[:30]
        elif m.group(1):
            stem = m.group(1)
        else:
            # Name starts directly with the marker (typically a directory).
            stem = os.path.dirname(f) + "/" + m.group(2)
        groups.setdefault(stem, []).append(f)
    return groups


def get_mtime(f):
    """Return the modification time of *f*, or 0 if it cannot be read."""
    try:
        return os.path.getmtime(f)
    except OSError:
        return 0


def _has_matched_ancestor(item, found):
    """Return True if any parent directory of *item* is itself in *found*."""
    parent = os.path.dirname(item)
    while parent and parent != os.path.dirname(parent):
        if parent in found:
            return True
        parent = os.path.dirname(parent)
    return False


def _unique_archive_path(archive_dir, base):
    """Return '<archive_dir>/<base>.tar.gz', adding '.N' if the name is taken."""
    candidate = f"{archive_dir}/{base}.tar.gz"
    n = 1
    while os.path.lexists(candidate):
        candidate = f"{archive_dir}/{base}.{n}.tar.gz"
        n += 1
    return candidate


def _make_archive(arch_path, tar_args, timeout):
    """Run ``tar czf`` into a temp file and rename it to *arch_path* on success.

    Raises RuntimeError when tar exits non-zero, and lets
    subprocess.TimeoutExpired / OSError propagate. In every failure case the
    partial archive is removed, so *arch_path* only ever holds complete data.
    """
    tmp_path = arch_path + ".partial"
    try:
        r = subprocess.run(
            ["tar", "czf", tmp_path, *tar_args],
            capture_output=True,
            timeout=timeout,
        )
        if r.returncode != 0:
            detail = r.stderr.decode(errors="replace").strip()
            raise RuntimeError(f"tar exit {r.returncode}: {detail}")
        os.replace(tmp_path, arch_path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def _dir_size(path):
    """Return the byte size of *path* as reported by ``du -sb`` (0 on failure)."""
    try:
        r = subprocess.run(["du", "-sb", path], capture_output=True, text=True)
        return int(r.stdout.split()[0])
    except (OSError, ValueError, IndexError):
        # Size is only a statistic; never block archiving because of it.
        return 0


def _archive_item(path):
    """Compress one backup into ARCHIVE_DIR, then delete the original.

    Returns the number of bytes freed, or None when *path* is neither a file
    nor a directory (e.g. it vanished). Raises on any archiving failure, in
    which case the original is left untouched (doctrine 59).
    """
    base = os.path.basename(path)
    if os.path.isfile(path):
        size = os.path.getsize(path)
        stamp = datetime.datetime.now().strftime("%Y%m%d")
        arch_path = _unique_archive_path(ARCHIVE_DIR, f"{base}.{stamp}")
        _make_archive(arch_path, [path], FILE_TAR_TIMEOUT)
        # Delete original only AFTER a verified archive exists.
        os.remove(path)
        return size
    if os.path.isdir(path):
        size = _dir_size(path)
        arch_path = _unique_archive_path(ARCHIVE_DIR, base)
        _make_archive(
            arch_path, ["-C", os.path.dirname(path), base], DIR_TAR_TIMEOUT
        )
        shutil.rmtree(path)
        return size
    return None


def main():
    """Archive all but the newest KEEP_COUNT backups of every stem.

    Writes a summary to the log and to STATE_FILE. Always returns 0; per-item
    failures are counted in the ``errors`` statistic.
    """
    log("=== GOLD PURGE START ===")

    # Find all GOLD backups (file or directory patterns)
    patterns = [
        f"{VAULT}/*.GOLD-*",
        f"{VAULT}/*.gold-auto-*",
        f"{VAULT}/gold-auto-*",  # directories
        f"{VAULT}/**/*.GOLD-*",
        f"{VAULT}/**/*.gold-*",
    ]
    matched = set()
    for p in patterns:
        try:
            matched.update(glob.glob(p, recursive=True))
        except OSError as e:
            log(f"ERR glob {p} {e}")

    # Never purge pieces out of a directory backup: items living inside
    # another matched item are part of that backup, not backups of their own.
    all_items = sorted(
        item for item in matched if not _has_matched_ancestor(item, matched)
    )
    nested = len(matched) - len(all_items)
    if nested:
        log(f"SKIPPED {nested} items nested inside other GOLD backups")

    log(f"FOUND {len(all_items)} GOLD items")

    groups = group_by_stem(all_items)
    log(f"GROUPED into {len(groups)} stems")

    stats = {"kept": 0, "archived": 0, "errors": 0, "bytes_saved": 0}

    for files in groups.values():
        if len(files) <= KEEP_COUNT:
            stats["kept"] += len(files)
            continue

        # Sort by mtime desc (newest first)
        files_sorted = sorted(files, key=get_mtime, reverse=True)
        keep = files_sorted[:KEEP_COUNT]
        archive = files_sorted[KEEP_COUNT:]
        stats["kept"] += len(keep)

        for f in archive:
            try:
                freed = _archive_item(f)
            except Exception as e:
                # Per-item boundary: one bad backup must not stop the run.
                log(f"ERR {f} {e}")
                stats["errors"] += 1
                continue
            if freed is None:
                log(f"SKIP {f} (not a regular file or directory)")
                continue
            stats["bytes_saved"] += freed
            stats["archived"] += 1

    log(f"=== END stats={stats} ===")

    # Save state
    try:
        state = {"last_run": datetime.datetime.now().isoformat(), "stats": stats}
        with open(STATE_FILE, "w") as f:
            json.dump(state, f, indent=2)
    except (OSError, TypeError, ValueError) as e:
        log(f"ERR state {STATE_FILE} {e}")

    return 0


if __name__ == "__main__":
    sys.exit(main())