- Deploy /widgets/audit-banner.js (non-destructif, 3056 bytes) - Wire architecture-map.html + security-hub.html + security-dashboard.html (GOLD backups) - Deploy /live-status.php + /api/linkedin-alignment-kpi.php - Inject 2 intents WEVIA Master: audit_linkedin + vague1_execute - 10 sections audit embedded in Qdrant wevia_kb_768 - L99 score: 304/304 (zero regression) - chattr +i restored on all modified files
190 lines
8.0 KiB
Python
Executable File
190 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# /opt/weval-l99/screens-autodiscovery.py
|
|
# Detecte nouvelles pages HTML, regenere cartographie, notifie Telegram.
|
|
# Tourne toutes les 5min via cron. Idempotent, anti-regression.
|
|
import json, os, subprocess, sys, re, time, subprocess, tempfile
|
|
from urllib.request import Request, urlopen
|
|
from urllib.parse import urlencode
|
|
|
|
CARTO_FILE = "/var/www/html/cartographie-screens.html"
|
|
LAST_SCAN_FILE = "/var/www/html/api/screens-last-scan.json"
|
|
TELEGRAM_TOKEN_FILE = "/opt/wevads/vault/telegram_token.txt"
|
|
TELEGRAM_CHAT = "7605775322"
|
|
TELEGRAM_BOT_FALLBACK = "wevia_cyber_bot"
|
|
|
|
def scan_sources():
|
|
"""Scan all 3 sources: S204 local, S95 Arsenal, S95 WEVADS."""
|
|
sources = {
|
|
"S204": {"path": "/var/www/html", "max_depth": 1, "url_prefix": "https://weval-consulting.com/"},
|
|
"S204-PHP": {"path": "/var/www/html", "max_depth": 1, "url_prefix": "https://weval-consulting.com/", "ext": ".php"},
|
|
"S95-Arsenal": {"path": "/opt/wevads-arsenal/public", "max_depth": 1, "url_prefix": "https://wevads.weval-consulting.com/", "ssh": True},
|
|
"S95-WEVADS": {"path": "/var/www/html", "max_depth": 1, "url_prefix": "https://wevads.weval-consulting.com/wv/", "ssh": True},
|
|
# OPUS_ADDED_SUBDOMAINS 16avr - gaps detected via coverage_audit
|
|
"Gitea-API": {"path": "", "url_prefix": "https://git.weval-consulting.com/", "sub_urls": ["explore/repos", "explore/users", "issues", "notifications", "user/settings", "api/v1/repos/search?limit=20"], "direct": True},
|
|
"Chat-routes": {"path": "", "url_prefix": "https://chat.weval-consulting.com/", "sub_urls": ["", "login", "admin", "dashboard"], "direct": True},
|
|
}
|
|
screens = []
|
|
for srv, cfg in sources.items():
|
|
ext = cfg.get("ext", ".html")
|
|
p = cfg["path"]
|
|
if cfg.get("ssh"):
|
|
cmd = "sudo ssh -p 49222 -o StrictHostKeyChecking=no -i /var/www/.ssh/wevads_key root@10.1.0.3 'ls " + p + "/*" + ext + " 2>/dev/null | xargs -n1 basename'"
|
|
else:
|
|
cmd = "ls " + p + "/*" + ext + " 2>/dev/null | xargs -n1 basename"
|
|
try:
|
|
out = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=25).stdout
|
|
for line in out.strip().split("\n"):
|
|
line = line.strip()
|
|
if not line or not line.endswith(ext):
|
|
continue
|
|
screens.append({"name": line, "server": srv, "url": cfg["url_prefix"] + line, "cat": classify(line)})
|
|
except Exception:
|
|
continue
|
|
return screens
|
|
|
|
CAT_RULES = [
|
|
(r"^api[-_]|-api\.|_api\.", "API"),
|
|
(r"brain|hamid|claude|wevia", None), # special
|
|
(r"offer|convers", "Offers/Conv"),
|
|
(r"monitor|dashboard|health", "Monitoring"),
|
|
(r"admin|config|permissions", "Admin"),
|
|
(r"scrap|harvest|scout|crawl", "Scraping"),
|
|
(r"account|factory|persona", "Accounts"),
|
|
(r"mail|send|email|warmup|pmta|smtp", "Email/Send"),
|
|
]
|
|
def classify(name):
|
|
n = name.lower()
|
|
for pat, cat in CAT_RULES:
|
|
if re.search(pat, n):
|
|
if cat is None:
|
|
if "claude" in n: return "Claude"
|
|
if "wevia" in n: return "WEVIA"
|
|
return "Brain/IA"
|
|
return cat
|
|
return "Autres"
|
|
|
|
def load_last_scan():
|
|
if not os.path.exists(LAST_SCAN_FILE):
|
|
return {"urls": []}
|
|
try:
|
|
return json.load(open(LAST_SCAN_FILE))
|
|
except Exception:
|
|
return {"urls": []}
|
|
|
|
def save_last_scan(screens):
|
|
out = {"ts": time.strftime("%Y-%m-%dT%H:%M:%S%z"), "total": len(screens), "urls": sorted([s["url"] for s in screens])}
|
|
tmp = tempfile.NamedTemporaryFile(mode="w", delete=False, dir=os.path.dirname(LAST_SCAN_FILE), suffix=".tmp")
|
|
json.dump(out, tmp)
|
|
tmp.close()
|
|
os.replace(tmp.name, LAST_SCAN_FILE)
|
|
|
|
def read_telegram_token():
|
|
try:
|
|
with open(TELEGRAM_TOKEN_FILE) as f:
|
|
return f.read().strip()
|
|
except Exception:
|
|
return None
|
|
|
|
def notify_telegram(text):
|
|
token = read_telegram_token()
|
|
if not token:
|
|
print("NO_TELEGRAM_TOKEN, skipping notif")
|
|
return False
|
|
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
|
data = urlencode({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "Markdown"}).encode()
|
|
try:
|
|
req = Request(url, data=data, method="POST")
|
|
resp = urlopen(req, timeout=5)
|
|
return resp.status == 200
|
|
except Exception as e:
|
|
print(f"TELEGRAM_ERR: {e}")
|
|
return False
|
|
|
|
def regen_carto(screens):
|
|
"""Regenerate DATA and CATS in cartographie-screens.html."""
|
|
if not os.path.exists(CARTO_FILE):
|
|
return False
|
|
with open(CARTO_FILE, "r", encoding="utf-8") as f:
|
|
html = f.read()
|
|
# MERGE: append-only, keep existing, only add new URLs
|
|
import re as _re
|
|
_m = _re.search(r"const DATA = (\\[.*?\\]);", html, _re.DOTALL)
|
|
if _m:
|
|
try:
|
|
_existing = __import__("json").loads(_m.group(1))
|
|
_existing_urls = {e["url"] for e in _existing}
|
|
_new_only = [s for s in screens if s["url"] not in _existing_urls]
|
|
# NEVER_SHRINK_GUARD
|
|
if len(screens) < len(_existing) * 0.9:
|
|
print("GUARD: scan=%d < existing=%d*0.9 - REFUSING" % (len(screens), len(_existing)))
|
|
return False
|
|
if not _new_only:
|
|
print(f"No new screens (existing={len(_existing)})")
|
|
return False
|
|
screens = _existing + _new_only
|
|
print(f"MERGE: {len(_existing)}+{len(_new_only)}={len(screens)}")
|
|
except Exception as e:
|
|
print(f"MERGE_ERR: {e}")
|
|
# Compute counts per category
|
|
cat_counts = {}
|
|
for s in screens:
|
|
cat_counts[s["cat"]] = cat_counts.get(s["cat"], 0) + 1
|
|
cat_sorted = dict(sorted(cat_counts.items(), key=lambda x: -x[1]))
|
|
data_str = json.dumps(screens, ensure_ascii=False, separators=(",", ":"))
|
|
cats_str = json.dumps(cat_sorted, ensure_ascii=False, separators=(",", ":"))
|
|
# Replace DATA
|
|
html2 = re.sub(r"const DATA = \[.*?\];", f"const DATA = {data_str};", html, count=1, flags=re.DOTALL)
|
|
# Replace CATS
|
|
html2 = re.sub(r"const CATS = \{.*?\};", f"const CATS = {cats_str};", html2, count=1, flags=re.DOTALL)
|
|
# Update header count
|
|
html2 = re.sub(r"(\d+) ecrans total reperes", f"{len(screens)} ecrans total reperes", html2, count=1)
|
|
if html2 == html:
|
|
return False
|
|
# Atomic write
|
|
backup = CARTO_FILE + ".pre-autodisc-" + time.strftime("%Y%m%d_%H%M%S")
|
|
with open(backup, "w", encoding="utf-8") as f:
|
|
f.write(html)
|
|
# chattr toggle for immutable files
|
|
subprocess.run(["sudo", "chattr", "-i", CARTO_FILE], capture_output=True, timeout=5)
|
|
try:
|
|
with open(CARTO_FILE, "w", encoding="utf-8") as f:
|
|
f.write(html2)
|
|
finally:
|
|
subprocess.run(["sudo", "chattr", "+i", CARTO_FILE], capture_output=True, timeout=5)
|
|
return True
|
|
|
|
def main():
|
|
screens = scan_sources()
|
|
if not screens:
|
|
print("NO_SCREENS_FOUND")
|
|
sys.exit(1)
|
|
current_urls = set(s["url"] for s in screens)
|
|
last = load_last_scan()
|
|
last_urls = set(last.get("urls", []))
|
|
new_urls = current_urls - last_urls
|
|
removed_urls = last_urls - current_urls
|
|
changed = bool(new_urls or removed_urls)
|
|
print(f"Total: {len(screens)} | New: {len(new_urls)} | Removed: {len(removed_urls)}")
|
|
if changed:
|
|
regenerated = regen_carto(screens)
|
|
print(f"Carto regenerated: {regenerated}")
|
|
# Telegram notif
|
|
msg_parts = [f"*WEVADS Cartographie MAJ* - {len(screens)} ecrans"]
|
|
if new_urls:
|
|
msg_parts.append(f"\nNouveaux ({len(new_urls)}):")
|
|
for u in sorted(new_urls)[:10]:
|
|
msg_parts.append(f"+ {u.replace('https://','')}")
|
|
if len(new_urls) > 10:
|
|
msg_parts.append(f"... et {len(new_urls)-10} autres")
|
|
if removed_urls:
|
|
msg_parts.append(f"\nSupprimes ({len(removed_urls)}):")
|
|
for u in sorted(removed_urls)[:5]:
|
|
msg_parts.append(f"- {u.replace('https://','')}")
|
|
msg = "\n".join(msg_parts)
|
|
notify_telegram(msg)
|
|
save_last_scan(screens)
|
|
print("DONE")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|