#!/usr/bin/env python3
# /opt/weval-l99/screens-autodiscovery.py
"""Detect new HTML/PHP pages, regenerate the screens cartography, notify Telegram.

Intended to run every 5 minutes from cron.  A run compares the URLs found on
all sources with the previous scan (LAST_SCAN_FILE); when the set changed it
rewrites the DATA/CATS constants embedded in CARTO_FILE and sends a Telegram
message.  If a source cannot be scanned, nothing is written so that a
transient outage is never recorded as a mass removal.
"""
import json
import os
import re
import subprocess
import sys
import tempfile
import time
from urllib.parse import urlencode
from urllib.request import Request, urlopen

CARTO_FILE = "/var/www/html/cartographie-screens.html"
LAST_SCAN_FILE = "/var/www/html/api/screens-last-scan.json"
TELEGRAM_TOKEN_FILE = "/opt/wevads/vault/telegram_token.txt"
TELEGRAM_CHAT = "7605775322"
TELEGRAM_BOT_FALLBACK = "wevia_cyber_bot"

# ssh(1) exits with 255 when ssh itself fails (connection, authentication);
# any other status comes from the remote command.
SSH_FAILURE_STATUS = 255

# How many URLs of each kind are listed in the Telegram message.
MAX_NEW_LISTED = 10
MAX_REMOVED_LISTED = 5


def scan_sources(failed=None):
    """Scan all 3 sources: S204 local, S95 Arsenal, S95 WEVADS.

    Args:
        failed: optional list; the name of every source that could not be
            scanned (timeout, spawn error, undecodable output, ssh failure)
            is appended to it.  Callers that omit it get the historical
            best-effort behaviour, i.e. failing sources are just skipped.

    Returns:
        A list of dicts with the keys "name", "server", "url" and "cat",
        one per file whose name ends with the source's extension.
    """
    sources = {
        "S204": {"path": "/var/www/html", "max_depth": 1,
                 "url_prefix": "https://weval-consulting.com/"},
        "S204-PHP": {"path": "/var/www/html", "max_depth": 1,
                     "url_prefix": "https://weval-consulting.com/", "ext": ".php"},
        "S95-Arsenal": {"path": "/opt/wevads-arsenal/public", "max_depth": 1,
                        "url_prefix": "https://wevads.weval-consulting.com/", "ssh": True},
        "S95-WEVADS": {"path": "/var/www/html", "max_depth": 1,
                       "url_prefix": "https://wevads.weval-consulting.com/wv/", "ssh": True},
    }
    screens = []
    for srv, cfg in sources.items():
        ext = cfg.get("ext", ".html")
        via_ssh = bool(cfg.get("ssh"))
        # The paths and extensions are the constants above, never external
        # input, so building a shell string is acceptable here.
        listing = "ls " + cfg["path"] + "/*" + ext + " 2>/dev/null | xargs -n1 basename"
        if via_ssh:
            # NOTE(review): StrictHostKeyChecking=no disables host key
            # verification for a root login - confirm this is intended.
            cmd = ("sudo ssh -p 49222 -o StrictHostKeyChecking=no "
                   "-i /var/www/.ssh/wevads_key root@10.1.0.3 '" + listing + "'")
        else:
            cmd = listing

        try:
            proc = subprocess.run(cmd, shell=True, capture_output=True,
                                  text=True, timeout=25)
        except (subprocess.SubprocessError, OSError, UnicodeDecodeError) as exc:
            print(f"SCAN_ERR {srv}: {exc}", file=sys.stderr)
            if failed is not None:
                failed.append(srv)
            continue

        if via_ssh and proc.returncode == SSH_FAILURE_STATUS:
            print(f"SCAN_ERR {srv}: ssh failed: {proc.stderr.strip()}", file=sys.stderr)
            if failed is not None:
                failed.append(srv)
            continue

        for line in proc.stdout.strip().split("\n"):
            line = line.strip()
            # Also drops the empty string produced when nothing was listed.
            if not line or not line.endswith(ext):
                continue
            screens.append({
                "name": line,
                "server": srv,
                "url": cfg["url_prefix"] + line,
                "cat": classify(line),
            })
    return screens


# (regex, category) pairs, tried in order against the lower-cased file name.
# A category of None marks the "AI" family, refined further in classify().
CAT_RULES = [
    (r"^api[-_]|-api\.|_api\.", "API"),
    (r"brain|hamid|claude|wevia", None),  # special
    (r"offer|convers", "Offers/Conv"),
    (r"monitor|dashboard|health", "Monitoring"),
    (r"admin|config|permissions", "Admin"),
    (r"scrap|harvest|scout|crawl", "Scraping"),
    (r"account|factory|persona", "Accounts"),
    (r"mail|send|email|warmup|pmta|smtp", "Email/Send"),
]


def classify(name):
    """Return the category of a file name: first matching CAT_RULES entry wins.

    Names that match no rule are classified as "Autres".
    """
    n = name.lower()
    for pattern, category in CAT_RULES:
        if not re.search(pattern, n):
            continue
        if category is not None:
            return category
        # Special AI family: pick the most specific label.
        if "claude" in n:
            return "Claude"
        if "wevia" in n:
            return "WEVIA"
        return "Brain/IA"
    return "Autres"


def _atomic_write_text(path, text):
    """Write *text* to *path* through a temp file in the same directory.

    The destination is swapped in with os.replace so readers never observe a
    partially written file.  When the destination already exists its mode is
    kept and its owner is kept when the process is allowed to chown; a brand
    new file keeps the private mode given by tempfile.  The temp file is
    removed if anything fails.
    """
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
        try:
            previous = os.stat(path)
        except FileNotFoundError:
            previous = None
        if previous is not None:
            try:
                os.chown(tmp_path, previous.st_uid, previous.st_gid)
            except OSError:
                pass  # unprivileged run: keep our own ownership
            os.chmod(tmp_path, previous.st_mode & 0o7777)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def load_last_scan():
    """Return the previous scan as a dict with a list under "urls".

    A missing, unreadable or malformed file yields {"urls": []}, which makes
    every current URL count as new.
    """
    try:
        with open(LAST_SCAN_FILE, encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError):  # ValueError covers JSON and decoding errors
        return {"urls": []}
    if not isinstance(data, dict) or not isinstance(data.get("urls"), list):
        return {"urls": []}
    return data


def save_last_scan(screens):
    """Persist timestamp, total and sorted URL list of *screens* to LAST_SCAN_FILE."""
    snapshot = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "total": len(screens),
        "urls": sorted(s["url"] for s in screens),
    }
    _atomic_write_text(LAST_SCAN_FILE, json.dumps(snapshot))


def read_telegram_token():
    """Return the stripped content of TELEGRAM_TOKEN_FILE, or None if unreadable."""
    try:
        with open(TELEGRAM_TOKEN_FILE) as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError):
        return None


def escape_telegram_markdown(text):
    """Backslash-escape the characters that open an entity in Telegram Markdown."""
    return re.sub(r"([_*`\[])", r"\\\1", text)


def build_change_message(total, new_urls, removed_urls):
    """Build the Markdown notification text for a change of the URL set.

    Args:
        total: number of screens found by the current scan.
        new_urls: iterable of URLs absent from the previous scan.
        removed_urls: iterable of URLs that disappeared since the previous scan.

    Returns:
        The message; each section lists a bounded number of URLs (scheme
        stripped, Markdown-escaped) followed by a count of the omitted ones.
    """
    parts = [f"*WEVADS Cartographie MAJ* - {total} ecrans"]
    sections = (
        ("Nouveaux", "+", sorted(new_urls), MAX_NEW_LISTED),
        ("Supprimes", "-", sorted(removed_urls), MAX_REMOVED_LISTED),
    )
    for title, sign, urls, limit in sections:
        if not urls:
            continue
        parts.append(f"\n{title} ({len(urls)}):")
        for u in urls[:limit]:
            # Unescaped "_" or "*" in a file name would make Telegram reject
            # the whole message as malformed Markdown.
            parts.append(f"{sign} {escape_telegram_markdown(u.replace('https://', ''))}")
        if len(urls) > limit:
            parts.append(f"... et {len(urls) - limit} autres")
    return "\n".join(parts)


def notify_telegram(text):
    """Send *text* (Telegram Markdown) to TELEGRAM_CHAT.

    Returns:
        True when the API answered with HTTP 200, False when the token is
        missing or the request failed.  Never raises: notification is
        best-effort and must not abort the scan.
    """
    token = read_telegram_token()
    if not token:
        print("NO_TELEGRAM_TOKEN, skipping notif")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    data = urlencode({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "Markdown"}).encode()
    try:
        with urlopen(Request(url, data=data, method="POST"), timeout=5) as resp:
            return resp.status == 200
    except Exception as e:  # deliberate best-effort boundary
        print(f"TELEGRAM_ERR: {e}")
        return False


def render_carto(html, screens):
    """Return *html* with its DATA, CATS and header count rebuilt from *screens*.

    Pure function: only the first occurrence of each of the three patterns is
    replaced, and text that matches none of them is returned unchanged.
    """
    cat_counts = {}
    for s in screens:
        cat_counts[s["cat"]] = cat_counts.get(s["cat"], 0) + 1
    # Most frequent category first; ties keep first-seen order (stable sort).
    cat_sorted = dict(sorted(cat_counts.items(), key=lambda item: -item[1]))

    data_js = "const DATA = " + json.dumps(screens, ensure_ascii=False, separators=(",", ":")) + ";"
    cats_js = "const CATS = " + json.dumps(cat_sorted, ensure_ascii=False, separators=(",", ":")) + ";"

    # Callable replacements: a string replacement would be parsed as a regex
    # template and mangle (or reject) the backslash escapes present in JSON.
    html = re.sub(r"const DATA = \[.*?\];", lambda _m: data_js, html, count=1, flags=re.DOTALL)
    html = re.sub(r"const CATS = \{.*?\};", lambda _m: cats_js, html, count=1, flags=re.DOTALL)
    html = re.sub(r"(\d+) ecrans total reperes", f"{len(screens)} ecrans total reperes", html, count=1)
    return html


def regen_carto(screens):
    """Regenerate DATA and CATS in cartographie-screens.html.

    Returns:
        True when the file was rewritten, False when it does not exist or
        when the regenerated content is identical to the current one.
    """
    try:
        with open(CARTO_FILE, "r", encoding="utf-8") as f:
            html = f.read()
    except FileNotFoundError:
        return False

    updated = render_carto(html, screens)
    if updated == html:
        return False

    # Timestamped copy of the previous version, written before the swap.
    # NOTE(review): these backups are never pruned by this script.
    backup = CARTO_FILE + ".pre-autodisc-" + time.strftime("%Y%m%d_%H%M%S")
    with open(backup, "w", encoding="utf-8") as f:
        f.write(html)
    _atomic_write_text(CARTO_FILE, updated)
    return True


def main():
    """Run one scan: diff against the last one, regenerate, notify, persist.

    Exits with status 1 when no screen was found at all and with status 2
    when at least one source could not be scanned; in both cases neither the
    cartography nor the last-scan file is modified.
    """
    failed_sources = []
    screens = scan_sources(failed_sources)
    if not screens:
        print("NO_SCREENS_FOUND")
        sys.exit(1)
    if failed_sources:
        # Going on would report every page of the failed source as removed
        # now and as new again on the next successful run.
        print(f"SCAN_INCOMPLETE: {', '.join(failed_sources)} - state left untouched")
        sys.exit(2)

    current_urls = {s["url"] for s in screens}
    last_urls = set(load_last_scan().get("urls", []))
    new_urls = current_urls - last_urls
    removed_urls = last_urls - current_urls
    print(f"Total: {len(screens)} | New: {len(new_urls)} | Removed: {len(removed_urls)}")

    if new_urls or removed_urls:
        regenerated = regen_carto(screens)
        print(f"Carto regenerated: {regenerated}")
        # Telegram notif
        notify_telegram(build_change_message(len(screens), new_urls, removed_urls))

    save_last_scan(screens)
    print("DONE")


if __name__ == "__main__":
    main()