#!/usr/bin/env python3
# /opt/weval-l99/screens-autodiscovery.py
# Detects new HTML pages, regenerates the cartography page, notifies Telegram.
# Runs every 5 min via cron. Idempotent, anti-regression.
import json
import os
import re
import subprocess  # FIX: was imported twice on the original import line
import sys
import tempfile
import time
from urllib.parse import urlencode
from urllib.request import Request, urlopen

CARTO_FILE = "/var/www/html/cartographie-screens.html"
LAST_SCAN_FILE = "/var/www/html/api/screens-last-scan.json"
TELEGRAM_TOKEN_FILE = "/opt/wevads/vault/telegram_token.txt"
TELEGRAM_CHAT = "7605775322"
TELEGRAM_BOT_FALLBACK = "wevia_cyber_bot"  # not referenced below; kept for operators


def scan_sources():
    """Scan all 3 sources: S204 local, S95 Arsenal, S95 WEVADS.

    Returns a list of screen dicts: {"name", "server", "url", "cat"}.
    Best-effort: a source that errors out or times out is skipped silently
    so one dead server does not blank the whole cartography.
    """
    sources = {
        "S204": {"path": "/var/www/html", "max_depth": 1,
                 "url_prefix": "https://weval-consulting.com/"},
        "S204-PHP": {"path": "/var/www/html", "max_depth": 1,
                     "url_prefix": "https://weval-consulting.com/", "ext": ".php"},
        "S95-Arsenal": {"path": "/opt/wevads-arsenal/public", "max_depth": 1,
                        "url_prefix": "https://wevads.weval-consulting.com/",
                        "ssh": True},
        "S95-WEVADS": {"path": "/var/www/html", "max_depth": 1,
                       "url_prefix": "https://wevads.weval-consulting.com/wv/",
                       "ssh": True},
        # OPUS_ADDED_SUBDOMAINS 16avr - gaps detected via coverage_audit
        "Gitea-API": {"path": "",
                      "url_prefix": "https://git.weval-consulting.com/",
                      "sub_urls": ["explore/repos", "explore/users", "issues",
                                   "notifications", "user/settings",
                                   "api/v1/repos/search?limit=20"],
                      "direct": True},
        "Chat-routes": {"path": "",
                        "url_prefix": "https://chat.weval-consulting.com/",
                        "sub_urls": ["", "login", "admin", "dashboard"],
                        "direct": True},
    }
    screens = []
    for srv, cfg in sources.items():
        # FIX: "direct" sources carry a fixed sub_urls list; the original loop
        # ignored them and ran `ls /*.html` against the empty path, so these
        # entries never produced any screen. Enumerate the sub-URLs directly.
        if cfg.get("direct"):
            for sub in cfg.get("sub_urls", []):
                screens.append({"name": sub or "/", "server": srv,
                                "url": cfg["url_prefix"] + sub,
                                "cat": classify(sub)})
            continue
        ext = cfg.get("ext", ".html")
        p = cfg["path"]
        # NOTE(review): shell=True with concatenated paths is acceptable here
        # because every path comes from the hard-coded config above.
        listing = "ls " + p + "/*" + ext + " 2>/dev/null | xargs -n1 basename"
        if cfg.get("ssh"):
            # Remote listing on S95 over key-based, non-interactive SSH.
            cmd = ("sudo ssh -p 49222 -o StrictHostKeyChecking=no "
                   "-i /var/www/.ssh/wevads_key root@10.1.0.3 '" + listing + "'")
        else:
            cmd = listing
        try:
            out = subprocess.run(cmd, shell=True, capture_output=True,
                                 text=True, timeout=25).stdout
        except Exception:
            continue  # source unreachable: keep scanning the others
        for line in out.strip().split("\n"):
            line = line.strip()
            if not line or not line.endswith(ext):
                continue
            screens.append({"name": line, "server": srv,
                            "url": cfg["url_prefix"] + line,
                            "cat": classify(line)})
    return screens


# Ordered (pattern, category) rules; first match wins. A None category marks
# the brain/IA family, which classify() splits further by agent name.
CAT_RULES = [
    (r"^api[-_]|-api\.|_api\.", "API"),
    (r"brain|hamid|claude|wevia", None),  # special
    (r"offer|convers", "Offers/Conv"),
    (r"monitor|dashboard|health", "Monitoring"),
    (r"admin|config|permissions", "Admin"),
    (r"scrap|harvest|scout|crawl", "Scraping"),
    (r"account|factory|persona", "Accounts"),
    (r"mail|send|email|warmup|pmta|smtp", "Email/Send"),
]


def classify(name):
    """Return the display category for a screen file name (first rule wins)."""
    n = name.lower()
    for pat, cat in CAT_RULES:
        if re.search(pat, n):
            if cat is None:
                # Brain/IA family: refine by the specific agent name.
                if "claude" in n:
                    return "Claude"
                if "wevia" in n:
                    return "WEVIA"
                return "Brain/IA"
            return cat
    return "Autres"


def load_last_scan():
    """Load the previous scan snapshot; empty snapshot if missing or corrupt."""
    if not os.path.exists(LAST_SCAN_FILE):
        return {"urls": []}
    try:
        # FIX: json.load(open(...)) leaked the file handle; use a with-block.
        with open(LAST_SCAN_FILE) as f:
            return json.load(f)
    except Exception:
        return {"urls": []}


def save_last_scan(screens):
    """Atomically persist the current scan snapshot (tmp file + os.replace)."""
    out = {"ts": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
           "total": len(screens),
           "urls": sorted([s["url"] for s in screens])}
    # Temp file lives in the target directory so os.replace stays atomic
    # (same filesystem).
    tmp = tempfile.NamedTemporaryFile(mode="w", delete=False,
                                      dir=os.path.dirname(LAST_SCAN_FILE),
                                      suffix=".tmp")
    json.dump(out, tmp)
    tmp.close()
    os.replace(tmp.name, LAST_SCAN_FILE)


def read_telegram_token():
    """Read the bot token from the vault file; None if unreadable."""
    try:
        with open(TELEGRAM_TOKEN_FILE) as f:
            return f.read().strip()
    except Exception:
        return None


def notify_telegram(text):
    """Send a Markdown message to TELEGRAM_CHAT. Returns True on HTTP 200."""
    token = read_telegram_token()
    if not token:
        print("NO_TELEGRAM_TOKEN, skipping notif")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    data = urlencode({"chat_id": TELEGRAM_CHAT, "text": text,
                      "parse_mode": "Markdown"}).encode()
    try:
        req = Request(url, data=data, method="POST")
        resp = urlopen(req, timeout=5)
        return resp.status == 200
    except Exception as e:
        print(f"TELEGRAM_ERR: {e}")
        return False


def regen_carto(screens):
    """Regenerate DATA and CATS in cartographie-screens.html.

    Append-only merge with a never-shrink guard, then a backed-up rewrite
    with a chattr -i/+i toggle (the live file is kept immutable).
    Returns True if the file was rewritten.
    """
    if not os.path.exists(CARTO_FILE):
        return False
    with open(CARTO_FILE, "r", encoding="utf-8") as f:
        html = f.read()
    # MERGE: append-only, keep existing, only add new URLs.
    # FIX: the original pattern used r"\\[" (a literal backslash followed by a
    # character class), which can never match "const DATA = [...]" — so the
    # merge and the never-shrink guard silently never ran.
    m = re.search(r"const DATA = (\[.*?\]);", html, re.DOTALL)
    if m:
        try:
            existing = json.loads(m.group(1))
            existing_urls = {e["url"] for e in existing}
            new_only = [s for s in screens if s["url"] not in existing_urls]
            # NEVER_SHRINK_GUARD
            if len(screens) < len(existing) * 0.9:
                print("GUARD: scan=%d < existing=%d*0.9 - REFUSING"
                      % (len(screens), len(existing)))
                return False
            if not new_only:
                print(f"No new screens (existing={len(existing)})")
                return False
            screens = existing + new_only
            print(f"MERGE: {len(existing)}+{len(new_only)}={len(screens)}")
        except Exception as e:
            print(f"MERGE_ERR: {e}")
    # Compute counts per category, most populous first.
    cat_counts = {}
    for s in screens:
        cat_counts[s["cat"]] = cat_counts.get(s["cat"], 0) + 1
    cat_sorted = dict(sorted(cat_counts.items(), key=lambda x: -x[1]))
    data_str = json.dumps(screens, ensure_ascii=False, separators=(",", ":"))
    cats_str = json.dumps(cat_sorted, ensure_ascii=False, separators=(",", ":"))
    # FIX: use callable replacements so backslashes/escapes inside the JSON
    # are inserted literally — a plain repl string is escape-processed by re
    # and would mangle (or raise on) sequences like \" or \u in the payload.
    html2 = re.sub(r"const DATA = \[.*?\];",
                   lambda _m: f"const DATA = {data_str};",
                   html, count=1, flags=re.DOTALL)
    html2 = re.sub(r"const CATS = \{.*?\};",
                   lambda _m: f"const CATS = {cats_str};",
                   html2, count=1, flags=re.DOTALL)
    # Update the header count shown on the page.
    html2 = re.sub(r"(\d+) ecrans total reperes",
                   f"{len(screens)} ecrans total reperes", html2, count=1)
    if html2 == html:
        return False
    # Timestamped backup of the old content before touching the live file.
    backup = CARTO_FILE + ".pre-autodisc-" + time.strftime("%Y%m%d_%H%M%S")
    with open(backup, "w", encoding="utf-8") as f:
        f.write(html)
    # chattr toggle: drop the immutable flag, write, restore it no matter what.
    subprocess.run(["sudo", "chattr", "-i", CARTO_FILE],
                   capture_output=True, timeout=5)
    try:
        with open(CARTO_FILE, "w", encoding="utf-8") as f:
            f.write(html2)
    finally:
        subprocess.run(["sudo", "chattr", "+i", CARTO_FILE],
                       capture_output=True, timeout=5)
    return True


def main():
    """Scan, diff against the last snapshot, regenerate + notify on change."""
    screens = scan_sources()
    if not screens:
        print("NO_SCREENS_FOUND")
        sys.exit(1)
    current_urls = set(s["url"] for s in screens)
    last = load_last_scan()
    last_urls = set(last.get("urls", []))
    new_urls = current_urls - last_urls
    removed_urls = last_urls - current_urls
    changed = bool(new_urls or removed_urls)
    print(f"Total: {len(screens)} | New: {len(new_urls)} | Removed: {len(removed_urls)}")
    if changed:
        regenerated = regen_carto(screens)
        print(f"Carto regenerated: {regenerated}")
        # Telegram notif
        msg_parts = [f"*WEVADS Cartographie MAJ* - {len(screens)} ecrans"]
        if new_urls:
            msg_parts.append(f"\nNouveaux ({len(new_urls)}):")
            for u in sorted(new_urls)[:10]:
                msg_parts.append(f"+ {u.replace('https://','')}")
            if len(new_urls) > 10:
                msg_parts.append(f"... et {len(new_urls)-10} autres")
        if removed_urls:
            msg_parts.append(f"\nSupprimes ({len(removed_urls)}):")
            for u in sorted(removed_urls)[:5]:
                msg_parts.append(f"- {u.replace('https://','')}")
        msg = "\n".join(msg_parts)
        notify_telegram(msg)
    save_last_scan(screens)
    print("DONE")


if __name__ == "__main__":
    main()