Files
weval-l99/gap-detector.py
2026-04-13 12:43:21 +02:00

122 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""WEVIA GAP DETECTOR v1.0 — Cross-reference /opt/ tools vs fast.php routes
Detects installed tools NOT wired into the chatbot. Reports to Mattermost.
Runs as CORTEX check #14 or standalone via cron."""
import json, os, re, subprocess, urllib.request, ssl
from datetime import datetime
ssl._create_default_https_context = ssl._create_unverified_context
MATTERMOST = "http://localhost:8065/hooks/pt54hzthf3b6pe6rgp1ionipnh"
FAST_PHP = "/var/www/html/api/weval-ia-fast.php"
RESULTS_FILE = "/var/www/html/api/gap-detector.json"
def log(msg):
print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
def main():
log("═══ WEVIA GAP DETECTOR v1.0 ═══")
# 1. Read fast.php content (all routes)
fast_content = open(FAST_PHP).read().lower() if os.path.exists(FAST_PHP) else ""
route_count = fast_content.count("// route")
log(f"fast.php: {route_count} routes")
# 2. Scan /opt/ for installed tools
opt_dirs = []
skip_patterns = ['-data', 'vault', 'node-v', 'node18', 'isolated', 'containerd', 'guard', 'backups', 'loki-', 'mattermost', 'authentik', 'google', 'hf-spaces']
for d in sorted(os.listdir("/opt/")):
path = f"/opt/{d}"
if not os.path.isdir(path): continue
if any(s in d.lower() for s in skip_patterns): continue
file_count = len(os.listdir(path))
if file_count < 3: continue
opt_dirs.append({"name": d, "files": file_count, "path": path})
log(f"/opt/ tools: {len(opt_dirs)}")
# 3. Cross-reference: is each tool mentioned in fast.php?
wired = []
not_wired = []
for tool in opt_dirs:
name = tool["name"].lower().replace("-", "").replace("_", "")
# Check various name patterns
patterns = [tool["name"].lower(), name, tool["name"].lower()[:6]]
found = any(p in fast_content for p in patterns if len(p) > 3)
if found:
wired.append(tool["name"])
else:
not_wired.append(tool)
log(f"Wired: {len(wired)} | NOT wired: {len(not_wired)}")
# 4. Check Docker services vs routes
docker_services = []
try:
r = subprocess.run(["docker", "ps", "--format", "{{.Names}}:{{.Ports}}"], capture_output=True, text=True, timeout=5)
for line in r.stdout.strip().split("\n"):
if not line: continue
parts = line.split(":")
name = parts[0]
docker_services.append(name)
except: pass
docker_not_wired = [s for s in docker_services if s.lower().replace("-","") not in fast_content and len(s) > 3]
log(f"Docker: {len(docker_services)} running | {len(docker_not_wired)} not in fast.php")
# 5. Check running ports vs routes
ports_not_wired = []
for port_name, port in [("flowise", 3002), ("nocodb", 8234), ("uptime-kuma", 3088), ("prometheus", 9090)]:
if port_name not in fast_content:
ports_not_wired.append(f"{port_name}:{port}")
# 6. Build report
report = {
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M'),
"fast_php_routes": route_count,
"opt_tools_total": len(opt_dirs),
"wired": len(wired),
"not_wired_count": len(not_wired),
"not_wired_tools": [{"name": t["name"], "files": t["files"]} for t in not_wired],
"docker_total": len(docker_services),
"docker_not_wired": docker_not_wired,
"score": round(len(wired) / max(len(opt_dirs), 1) * 100, 1),
}
# 7. Save JSON report
try:
with open(RESULTS_FILE, "w") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
log(f"Report saved: {RESULTS_FILE}")
except: pass
# 8. Mattermost alert if gaps > 5
if len(not_wired) > 5:
emoji = "🔴" if len(not_wired) > 15 else "🟡"
mm_text = f"""{emoji} **WEVIA GAP DETECTOR** — {report['timestamp']}
**Routes:** {route_count} | **Tools /opt/:** {len(opt_dirs)} | **Wired:** {len(wired)} | **Score:** {report['score']}%
**NOT WIRED ({len(not_wired)}):**
{chr(10).join(['- ' + t['name'] + ' (' + str(t['files']) + ' files)' for t in not_wired[:10]])}
{'...' if len(not_wired) > 10 else ''}
**Docker NOT WIRED:** {', '.join(docker_not_wired[:5]) if docker_not_wired else 'all OK'}"""
try:
req = urllib.request.Request(MATTERMOST, json.dumps({"text": mm_text}).encode(),
headers={"Content-Type": "application/json"})
urllib.request.urlopen(req, timeout=10)
log("Mattermost alert sent ✅")
except: pass
# 9. Print summary
log(f"SCORE: {report['score']}% ({len(wired)}/{len(opt_dirs)} tools wired)")
if not_wired:
log("NOT WIRED:")
for t in not_wired:
log(f"{t['name']:35s} ({t['files']} files)")
log("DONE ✅")
try:
main()
except Exception as e:
print(f"GAP DETECTOR CRASHED: {e}")
import traceback; traceback.print_exc()