Files
weval-l99/screens-autodiscovery.py.pre-subdom-20260416_163856
2026-04-16 16:39:53 +02:00

163 lines
6.5 KiB
Python

#!/usr/bin/env python3
# /opt/weval-l99/screens-autodiscovery.py
# Detects new HTML pages, regenerates the cartography page, notifies Telegram.
# Runs every 5 min via cron. Idempotent, anti-regression.
import json, os, sys, re, time, subprocess, tempfile
from urllib.request import Request, urlopen
from urllib.parse import urlencode
# Generated cartography page that regen_carto() rewrites in place.
CARTO_FILE = "/var/www/html/cartographie-screens.html"
# JSON state from the previous scan; diff baseline for new/removed URLs.
LAST_SCAN_FILE = "/var/www/html/api/screens-last-scan.json"
# Bot token lives outside the script; read at runtime (may be absent).
TELEGRAM_TOKEN_FILE = "/opt/wevads/vault/telegram_token.txt"
# Destination chat id for change notifications.
TELEGRAM_CHAT = "7605775322"
# NOTE(review): fallback bot name is defined but never used in this file.
TELEGRAM_BOT_FALLBACK = "wevia_cyber_bot"
def scan_sources():
    """Scan all 3 sources: S204 local, S95 Arsenal, S95 WEVADS."""
    sources = {
        "S204": {"path": "/var/www/html", "max_depth": 1, "url_prefix": "https://weval-consulting.com/"},
        "S204-PHP": {"path": "/var/www/html", "max_depth": 1, "url_prefix": "https://weval-consulting.com/", "ext": ".php"},
        "S95-Arsenal": {"path": "/opt/wevads-arsenal/public", "max_depth": 1, "url_prefix": "https://wevads.weval-consulting.com/", "ssh": True},
        "S95-WEVADS": {"path": "/var/www/html", "max_depth": 1, "url_prefix": "https://wevads.weval-consulting.com/wv/", "ssh": True},
    }
    found = []
    for server_name, conf in sources.items():
        suffix = conf.get("ext", ".html")
        base = conf["path"]
        # Same listing pipeline locally or wrapped in SSH to the S95 host.
        listing = "ls " + base + "/*" + suffix + " 2>/dev/null | xargs -n1 basename"
        if conf.get("ssh"):
            listing = (
                "sudo ssh -p 49222 -o StrictHostKeyChecking=no "
                "-i /var/www/.ssh/wevads_key root@10.1.0.3 '" + listing + "'"
            )
        # Best-effort per source: one unreachable server must not kill the scan.
        try:
            out = subprocess.run(listing, shell=True, capture_output=True, text=True, timeout=25).stdout
            for entry in out.strip().split("\n"):
                entry = entry.strip()
                if not entry or not entry.endswith(suffix):
                    continue
                found.append({"name": entry, "server": server_name, "url": conf["url_prefix"] + entry, "cat": classify(entry)})
        except Exception:
            continue
    return found
# Ordered classification rules: the first pattern that matches wins, so
# e.g. "api-claude.html" is "API", not "Claude". A category of None marks
# the brain/IA family, refined further inside classify(). Patterns are
# pre-compiled once at import time since classify() runs once per file.
CAT_RULES = [
    (re.compile(r"^api[-_]|-api\.|_api\."), "API"),
    (re.compile(r"brain|hamid|claude|wevia"), None),  # special: refined in classify()
    (re.compile(r"offer|convers"), "Offers/Conv"),
    (re.compile(r"monitor|dashboard|health"), "Monitoring"),
    (re.compile(r"admin|config|permissions"), "Admin"),
    (re.compile(r"scrap|harvest|scout|crawl"), "Scraping"),
    (re.compile(r"account|factory|persona"), "Accounts"),
    (re.compile(r"mail|send|email|warmup|pmta|smtp"), "Email/Send"),
]
def classify(name):
    """Return the display category for a screen file name.

    Matching is case-insensitive (the name is lowercased first); the first
    CAT_RULES entry that matches decides. The None-category rule picks the
    most specific brain/IA label. Falls back to "Autres" on no match.
    """
    n = name.lower()
    for pat, cat in CAT_RULES:
        if not pat.search(n):
            continue
        if cat is not None:
            return cat
        # Brain/IA family: prefer the most specific product label.
        if "claude" in n:
            return "Claude"
        if "wevia" in n:
            return "WEVIA"
        return "Brain/IA"
    return "Autres"
def load_last_scan():
    """Load the previous scan state; return an empty baseline on any problem.

    Always returns a dict with a "urls" key so callers never special-case
    a first run or a corrupt state file.
    """
    if not os.path.exists(LAST_SCAN_FILE):
        return {"urls": []}
    try:
        # "with" closes the handle even when the JSON is malformed
        # (the original json.load(open(...)) leaked it on parse errors).
        with open(LAST_SCAN_FILE) as f:
            return json.load(f)
    except Exception:
        return {"urls": []}
def save_last_scan(screens):
    """Atomically persist the current scan state (timestamp, count, URLs).

    Writes to a temp file in the same directory, then os.replace(), so
    readers of LAST_SCAN_FILE never observe a partial JSON document.
    """
    out = {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "total": len(screens),
        "urls": sorted([s["url"] for s in screens]),
    }
    tmp = tempfile.NamedTemporaryFile(mode="w", delete=False, dir=os.path.dirname(LAST_SCAN_FILE), suffix=".tmp")
    try:
        with tmp:
            json.dump(out, tmp)
        os.replace(tmp.name, LAST_SCAN_FILE)
    except Exception:
        # Don't leave orphan .tmp files behind when dump/replace fails
        # (the original leaked the temp file in that case).
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
def read_telegram_token():
    """Read the bot token from the vault file; None when unreadable."""
    try:
        with open(TELEGRAM_TOKEN_FILE) as fh:
            raw = fh.read()
    except Exception:
        return None
    return raw.strip()
def notify_telegram(text):
    """Send *text* to the configured Telegram chat (Markdown parse mode).

    Returns True on HTTP 200, False when no token is available or the API
    call fails; never raises (the cron job must not die on notif errors).
    """
    token = read_telegram_token()
    if not token:
        print("NO_TELEGRAM_TOKEN, skipping notif")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    data = urlencode({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "Markdown"}).encode()
    try:
        req = Request(url, data=data, method="POST")
        # Context manager closes the HTTP response (the original leaked it).
        with urlopen(req, timeout=5) as resp:
            return resp.status == 200
    except Exception as e:
        print(f"TELEGRAM_ERR: {e}")
        return False
def regen_carto(screens):
    """Regenerate DATA and CATS in cartographie-screens.html.

    Returns True when the page changed and was rewritten, False when the
    page is missing or the substitutions produced no change. Keeps a
    timestamped backup of the previous version before rewriting.
    """
    if not os.path.exists(CARTO_FILE):
        return False
    with open(CARTO_FILE, "r", encoding="utf-8") as f:
        html = f.read()
    # Compute counts per category, largest first (drives display order).
    cat_counts = {}
    for s in screens:
        cat_counts[s["cat"]] = cat_counts.get(s["cat"], 0) + 1
    cat_sorted = dict(sorted(cat_counts.items(), key=lambda x: -x[1]))
    data_str = json.dumps(screens, ensure_ascii=False, separators=(",", ":"))
    cats_str = json.dumps(cat_sorted, ensure_ascii=False, separators=(",", ":"))
    # Replace the embedded JS constants. Use a callable replacement so that
    # backslashes in the JSON (e.g. \" produced by json.dumps escaping) are
    # inserted literally -- passing the string directly to re.sub would make
    # it interpret them as replacement escapes and raise "bad escape".
    html2 = re.sub(r"const DATA = \[.*?\];", lambda _m: f"const DATA = {data_str};", html, count=1, flags=re.DOTALL)
    html2 = re.sub(r"const CATS = \{.*?\};", lambda _m: f"const CATS = {cats_str};", html2, count=1, flags=re.DOTALL)
    # Update header count
    html2 = re.sub(r"(\d+) ecrans total reperes", f"{len(screens)} ecrans total reperes", html2, count=1)
    if html2 == html:
        return False
    # Timestamped backup of the previous version (anti-regression).
    backup = CARTO_FILE + ".pre-autodisc-" + time.strftime("%Y%m%d_%H%M%S")
    with open(backup, "w", encoding="utf-8") as f:
        f.write(html)
    # Atomic write: temp file in the same directory, then rename over the
    # original, so web clients never see a half-written page (the previous
    # version rewrote CARTO_FILE in place, which was not atomic).
    tmp = tempfile.NamedTemporaryFile(mode="w", delete=False, dir=os.path.dirname(CARTO_FILE), suffix=".tmp", encoding="utf-8")
    try:
        with tmp:
            tmp.write(html2)
        os.replace(tmp.name, CARTO_FILE)
    except Exception:
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
    return True
def main():
    """One cron cycle: scan, diff against last run, regen + notify on change."""
    screens = scan_sources()
    if not screens:
        print("NO_SCREENS_FOUND")
        sys.exit(1)
    seen_now = set(s["url"] for s in screens)
    previous = load_last_scan()
    seen_before = set(previous.get("urls", []))
    added = seen_now - seen_before
    gone = seen_before - seen_now
    print(f"Total: {len(screens)} | New: {len(added)} | Removed: {len(gone)}")
    if added or gone:
        regenerated = regen_carto(screens)
        print(f"Carto regenerated: {regenerated}")
        # Telegram notif
        lines = [f"*WEVADS Cartographie MAJ* - {len(screens)} ecrans"]
        if added:
            lines.append(f"\nNouveaux ({len(added)}):")
            lines.extend(f"+ {u.replace('https://','')}" for u in sorted(added)[:10])
            if len(added) > 10:
                lines.append(f"... et {len(added)-10} autres")
        if gone:
            lines.append(f"\nSupprimes ({len(gone)}):")
            lines.extend(f"- {u.replace('https://','')}" for u in sorted(gone)[:5])
        notify_telegram("\n".join(lines))
    save_last_scan(screens)
    print("DONE")


if __name__ == "__main__":
    main()