Files
weval-l99/wevia-control-tower.py.PAUSED
2026-04-13 12:43:21 +02:00

294 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""WEVIA CONTROL TOWER v1.0 — Meta-agent: root cause analysis
Tests from OUTSIDE (real user path through Cloudflare→nginx→Authentik→app)
NOT localhost. Catches what other agents miss.
Every 3 hours: full external scan + root cause + meeting report
"""
import subprocess as sp,json,os,time,glob
from datetime import datetime
from pathlib import Path
# Output locations: the plain-text run log and the two JSON reports.
LOG = "/var/log/wevia-control-tower.log"
RESULT = "/var/www/html/api/wevia-control-tower.json"
MEETING = "/var/www/html/api/wevia-meeting-report.json"

# Start time of this run; reused for the timestamps written to the reports.
ts = datetime.now()

# Run state filled in by T() and root_cause().
tests = []
issues = []
root_causes = []
fixes_applied = []  # not appended to in the visible code; only its length is reported

# Pass / fail / warn counters, incremented by T().
P, F, W = 0, 0, 0
def lg(m):
    """Print a timestamped message and append it to the run log.

    The emitted line is ``[HH:MM:SS] <m>``. It is printed to stdout (flushed
    immediately) and appended to the file named by the module-level ``LOG``.

    Args:
        m: Message text; interpolated into the line with an f-string.
    """
    line = f"[{datetime.now().strftime('%H:%M:%S')}] {m}"
    print(line, flush=True)
    try:
        # Messages contain box-drawing characters and emoji, so the encoding
        # is pinned rather than left to the process locale.
        with open(LOG, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
    except OSError as exc:
        # An unwritable log must not abort the scan; the line itself was
        # already printed above.
        import sys
        print(f"[lg] cannot write {LOG}: {exc}", file=sys.stderr, flush=True)
def T(layer, name, ok, detail=""):
    """Record the outcome of one check.

    Args:
        layer: Group label stored with the result (e.g. "EXT-DOMAIN").
        name: Name of the individual check.
        ok: Anything equal to ``True`` counts as a pass, the string ``"warn"``
            as a warning, every other value as a failure.
        detail: Extra context; stringified and cut to 80 characters.

    Side effects: bumps one of the global counters P / W / F, appends to
    ``tests`` and, for failures only, to ``issues``.
    """
    global P, F, W
    summary = str(detail)[:80]
    if ok == True:  # noqa: E712 - equality, not identity, is the existing contract
        status = "P"
        P += 1
    elif ok == "warn":
        status = "W"
        W += 1
    else:
        status = "F"
        F += 1
        issues.append({"layer": layer, "name": name, "detail": summary})
    tests.append({"layer": layer, "name": name, "status": status, "detail": summary})
def ext_curl(url, t=8):
    """Test from EXTERNAL path — real user experience.

    Runs ``curl -sk -L`` (at most 5 redirects, body discarded to /dev/null)
    against *url* and parses the ``-w`` metrics line it prints.

    Args:
        url: URL to request.
        t: Value for curl's ``--max-time`` in seconds; the subprocess itself
            is allowed ``t + 3`` seconds.

    Returns:
        dict with ``code`` (int HTTP status), ``size`` (int, curl's
        ``size_download``), ``url`` (curl's ``url_effective``) and
        ``redirects`` (int). All fields are zero/empty when curl cannot be
        started, times out, or prints something unparseable.
    """
    try:
        r = sp.run(
            ["curl", "-sk", "-L", "--max-redirs", "5", "-o", "/dev/null",
             "-w", "%{http_code}|%{size_download}|%{url_effective}|%{num_redirects}",
             url, "--max-time", str(t)],
            capture_output=True, text=True, timeout=t + 3,
        )
        # The effective URL sits between the numeric fields and may itself
        # contain "|", so peel the numbers off both ends instead of doing a
        # plain split.
        code, size, rest = r.stdout.strip().split("|", 2)
        final_url, redirects = rest.rsplit("|", 1)
        return {"code": int(code), "size": int(size), "url": final_url,
                "redirects": int(redirects)}
    except Exception:
        # Best-effort probe: any failure is reported as an all-zero result.
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit through.
        return {"code": 0, "size": 0, "url": "", "redirects": 0}
def ext_content(url, t=8):
    """Get actual page content from external.

    Runs ``curl -sk -L`` (at most 5 redirects) against *url* and returns the
    body that curl writes to stdout.

    Args:
        url: URL to fetch.
        t: Value for curl's ``--max-time`` in seconds; the subprocess itself
            is allowed ``t + 3`` seconds.

    Returns:
        The response body as text (undecodable bytes are replaced), or ``""``
        when curl cannot be started or times out.
    """
    try:
        r = sp.run(
            ["curl", "-sk", "-L", "--max-redirs", "5", url, "--max-time", str(t)],
            capture_output=True, text=True, timeout=t + 3, errors="replace",
        )
    except Exception:
        # Best-effort fetch; ``Exception`` (not a bare except) keeps Ctrl-C
        # and SystemExit working while the scan is running.
        return ""
    return r.stdout
def root_cause(issue, analysis):
    """Append one root-cause record to the module-level ``root_causes`` list.

    Args:
        issue: Short description of what was observed.
        analysis: Suggested cause / remediation text.

    The record also carries ``ts``, the current local time in ISO format.
    """
    record = dict(issue=issue, analysis=analysis, ts=datetime.now().isoformat())
    root_causes.append(record)
# Run banner: separator, run start time, scope reminder, separator.
for _banner_line in (
    "="*60,
    f"CONTROL TOWER — {ts}",
    "EXTERNAL TESTS (real user path, NOT localhost)",
    "="*60,
):
    lg(_banner_line)
# ═══════════════════════════════════════
# 1. ALL DOMAINS — EXTERNAL FULL CHAIN
# ═══════════════════════════════════════
lg("═══ 1. EXTERNAL DOMAIN SCAN ═══")
# (host, expected HTTP status, minimum body size in bytes; 0 disables the size check)
domains = [
    ("weval-consulting.com", 200, 5000),
    ("analytics.weval-consulting.com", 302, 0),
    ("mm.weval-consulting.com", 302, 0),
    ("n8n.weval-consulting.com", 302, 0),
    ("auth.weval-consulting.com", 302, 0),
    ("paperclip.weval-consulting.com", 200, 500),
    ("mirofish.weval-consulting.com", 200, 100),
    ("crm.weval-consulting.com", 302, 0),
    ("code.weval-consulting.com", 200, 100),
    ("deerflow.weval-consulting.com", 302, 0),
    ("monitor.weval-consulting.com", 302, 0),
    ("wevads.weval-consulting.com", 302, 0),
    ("consent.wevup.app", 200, 500),
    ("ethica.wevup.app", 302, 0),
]
for dom, exp_code, min_size in domains:
    r = ext_curl(f"https://{dom}/")
    # Any of 200/301/302 is accepted in addition to the expected status.
    ok = r["code"] == exp_code or r["code"] in (200, 301, 302)
    detail = f"HTTP {r['code']} size={r['size']} redir={r['redirects']}"
    if not ok:
        T("EXT-DOMAIN", dom, False, detail)
        root_cause(f"{dom} returns HTTP {r['code']}",
                   "Check nginx config + upstream service + Cloudflare DNS")
    elif r["code"] == 200 and r["size"] < min_size:
        # min_size used to be unpacked and then ignored, so an empty 200 page
        # passed. A short body is reported as a warning, not a hard failure.
        T("EXT-DOMAIN", dom, "warn", f"{detail} (< min {min_size})")
    else:
        T("EXT-DOMAIN", dom, True, detail)
# ═══════════════════════════════════════
# 2. SSO FLOW — REAL AUTHENTIK TEST
# ═══════════════════════════════════════
lg("═══ 2. SSO FLOW (external) ═══")
sso_domains = ["analytics", "mm", "n8n", "monitor", "crm", "deerflow", "wevads"]
for dom in sso_domains:
    full = f"{dom}.weval-consulting.com"
    # Test the SSO flow page loads (not blank, not error)
    content = ext_content(f"https://{full}/if/flow/default-authentication-flow/", 10)
    lowered = content.lower()  # computed once; reused by the case-insensitive checks
    has_form = any(marker in lowered for marker in ("input", "password", "authentik"))
    has_error = ("api.context.404" in content or "Cannot read" in content
                 or "Unexpected token" in content or "stack trace" in lowered)
    is_blank = len(content) < 200
    if has_error:
        T("SSO-FLOW", full, False, f"SSO ERROR in page ({len(content)}c)")
        # Root cause: classify the error by the text found in the page
        if "404" in content:
            root_cause(f"{full} SSO 404", "Missing /api/v3/ proxy to Authentik in nginx")
        elif "Unexpected" in content:
            root_cause(f"{full} SSO JSON error", "nginx returns HTML instead of JSON for /api/v3/ path")
        elif "undefined" in content:
            root_cause(f"{full} SSO undefined", "JS variable missing — API response malformed")
        else:
            # "Cannot read" / "stack trace" hits used to be recorded as a
            # failure with no root cause, so they never became an action item.
            root_cause(f"{full} SSO page error",
                       "Unclassified error text in SSO page — inspect Authentik flow response + nginx proxy config")
    elif is_blank:
        T("SSO-FLOW", full, False, f"SSO BLANK PAGE ({len(content)}c)")
        root_cause(f"{full} blank SSO", "Authentik static assets not proxied — check /static/(authentik|dist)/ in nginx")
    elif has_form:
        T("SSO-FLOW", full, True, f"SSO form OK ({len(content)}c)")
    else:
        T("SSO-FLOW", full, "warn", f"SSO unclear ({len(content)}c)")
# ═══════════════════════════════════════
# 3. PUBLIC PAGES — CONTENT CHECK
# ═══════════════════════════════════════
lg("═══ 3. PUBLIC PAGES (external) ═══")
# (page, minimum size in characters, keywords that must appear, markers that must not)
public_checks = [
    ("index.html", 10000, ["weval", "consulting"], ["fatal", "exception", "error"]),
    ("use-cases.html", 10000, ["mission", "SAP"], ["fatal", "error"]),
    ("wevia.html", 3000, [], ["fatal"]),
    ("booking.html", 1000, [], ["fatal"]),
    ("oss-discovery.html", 1000, [], ["Cannot read", "undefined", "fatal"]),
    ("login.html", 1000, [], ["fatal"]),
    ("enterprise-model.html", 1000, [], ["fatal"]),
]
for pg, min_size, must_have, must_not in public_checks:
    content = ext_content(f"https://weval-consulting.com/{pg}", 8)
    size = len(content)
    lowered = content.lower()  # lower-cased once instead of once per keyword
    head = lowered[:5000]      # error markers are only looked for in this window
    missing = [kw for kw in must_have if kw.lower() not in lowered]
    found_errors = [err for err in must_not if err.lower() in head]
    has_required = not missing
    has_errors = bool(found_errors)
    ok = size >= min_size and has_required and not has_errors
    detail = f"size={size}"
    if not has_required:
        detail += f" MISSING:{missing}"
    if has_errors:
        # Report exactly the markers that triggered the failure (same window
        # as the detection), not every marker found anywhere in the page.
        detail += f" ERROR:{found_errors}"
        root_cause(f"{pg} has JS error", "Check API dependencies + cache JSON files")
    elif not ok:
        # Too-small pages and missing keywords previously failed without a
        # root cause and therefore never produced an action item.
        root_cause(f"{pg} content check failed ({detail})",
                   "Page smaller than expected or required keywords missing — check deployed file + upstream response")
    T("EXT-PAGE", pg, ok, detail)
# ═══════════════════════════════════════
# 4. API FUNCTIONAL — EXTERNAL
# ═══════════════════════════════════════
lg("═══ 4. API FUNCTIONAL (external) ═══")
# (path under /api/, HTTP method, POST body or None, substrings expected in the reply)
api_tests = [
    ("weval-ia-fast.php", "POST", '{"message":"ping"}', ["response"]),
    ("wevia-action-engine.php?action=help", "GET", None, ["actions"]),
    ("wevia-dashboard.php", "GET", None, ["system"]),
    ("wevia-fleet.php", "GET", None, ["agents"]),
    ("wevia-capabilities.php?cap=health", "GET", None, ["services"]),
]
for path, method, data, keys in api_tests:
    label = path.split("?")[0]
    try:
        cmd = ["curl", "-sk"]
        if method == "POST":
            cmd += ["-X", "POST", "-H", "Content-Type: application/json", "-d", data]
        cmd += ["--max-time", "15", f"https://weval-consulting.com/api/{path}"]
        r = sp.run(cmd, capture_output=True, text=True, timeout=18)
        body = r.stdout
        # Actually parse the body: a first-character test rejects valid JSON
        # with leading whitespace / BOM and accepts "{" followed by garbage.
        try:
            parsed = json.loads(body.lstrip("\ufeff"))
            is_json = isinstance(parsed, (dict, list))
        except ValueError:
            is_json = False
        # Keys are still matched as substrings, so nested keys keep counting.
        has_keys = all(k in body for k in keys) if keys else True
        ok = is_json and has_keys and len(body) > 20
        T("EXT-API", label, ok,
          f"{'JSON' if is_json else 'NOT JSON'} {len(body)}c keys={'OK' if has_keys else 'MISS'}")
        if not ok:
            root_cause(f"API {path} failed", "Check PHP syntax + dependencies + database connection")
    except Exception as e:
        T("EXT-API", label, False, str(e)[:40])
    # Pause between consecutive API calls.
    time.sleep(1)
# ═══════════════════════════════════════
# 5. CHATBOT — EXTERNAL REAL TEST
# ═══════════════════════════════════════
lg("═══ 5. CHATBOT (external) ═══")
try:
    r = sp.run(["curl", "-sk", "--max-time", "20", "-X", "POST", "-H", "Content-Type: application/json",
                "-d", '{"message":"Quels sont les services de WEVAL Consulting?"}',
                "https://weval-consulting.com/api/weval-ia-fast.php"],
               capture_output=True, text=True, timeout=25)
    d = json.loads(r.stdout)
    # A JSON null for either field used to raise (None.lower() / None[:15])
    # and was then reported as an opaque exception; normalise to text first.
    resp = str(d.get("response") or "")
    provider = str(d.get("provider") or "?")
    is_maintenance = "maintenance" in resp.lower()
    is_short = len(resp) < 50
    is_truncated = resp.endswith("...") or len(resp) < 100
    if is_maintenance:
        T("CHATBOT", "response", False, "MAINTENANCE MODE")
        root_cause("Chatbot maintenance", "PHP Fatal error — check duplicate functions + cognitive-wire syntax")
    elif is_short:
        T("CHATBOT", "response", False, f"TOO SHORT ({len(resp)}c) provider={provider}")
        root_cause("Chatbot short response", f"Provider {provider} may be truncating — check token limits")
    elif is_truncated:
        # The truncation flag was computed but never acted on; surface it as
        # a warning so it is visible without counting as a failure.
        T("CHATBOT", "response", "warn", f"POSSIBLY TRUNCATED ({len(resp)}c) provider={provider}")
    else:
        T("CHATBOT", "response", True, f"{provider[:15]} {len(resp)}c")
except Exception as e:
    # curl could not run / timed out, or the reply was not a JSON object.
    T("CHATBOT", "response", False, str(e)[:40])
# ═══════════════════════════════════════
# 6. CROSS-CHECK AGENT OUTPUTS
# ═══════════════════════════════════════
lg("═══ 6. AGENT CROSS-CHECK ═══")
# (status file, label, key to read, expected value: exact match for bools, minimum for numbers)
agent_files = [
    ("/var/www/html/api/wevia-quality-status.json", "QUALITY", "global_rate", 90),
    ("/var/www/html/api/wevia-antiregression-status.json", "ANTIREG", "healthy", True),
    ("/var/www/html/api/wevia-auth-status.json", "AUTH", "healthy", True),
    ("/var/www/html/api/l99-ux-results.json", "L99-UX", "pass", 140),
    ("/var/www/html/api/l99-deep-scan.json", "DEEP-SCAN", "pass", 100),
]
for path, name, key, min_val in agent_files:
    try:
        # Context manager closes the handle; the original left it open.
        with open(path, encoding="utf-8") as fh:
            d = json.load(fh)
    except FileNotFoundError:
        T("AGENT-CHECK", name, "warn", "file missing")
        continue
    except (OSError, ValueError) as exc:
        # Permission problems and invalid JSON are reported as what they are
        # instead of being mislabelled "file missing".
        T("AGENT-CHECK", name, "warn", f"unreadable: {exc}")
        continue
    if not isinstance(d, dict):
        T("AGENT-CHECK", name, "warn", "unexpected JSON shape")
        continue
    val = d.get(key, 0)
    if isinstance(min_val, bool):
        ok = val == min_val
    elif isinstance(val, (int, float)):
        ok = val >= min_val
    else:
        # Non-numeric values pass as long as they are non-empty when stringified.
        ok = len(str(val)) > 0
    T("AGENT-CHECK", name, ok, f"{key}={val}")
    if not ok:
        root_cause(f"Agent {name}: {key}={val} (expected >={min_val})",
                   "Agent may be stale or detecting real issues — investigate")
# ═══════════════════════════════════════
# 7. INFRA HEALTH
# ═══════════════════════════════════════
lg("═══ 7. INFRA ═══")


def _probe_int(cmd):
    """Run a shell pipeline (3 s limit) and return ``(value, None)`` or ``(None, error_text)``.

    Empty output counts as 0. A timeout, a failure to start the shell, or
    non-numeric output yields the error form instead of raising.
    """
    try:
        out = sp.run(cmd, shell=True, capture_output=True, text=True, timeout=3).stdout.strip()
        return int(out or 0), None
    except (OSError, sp.SubprocessError, ValueError) as exc:
        return None, str(exc)[:60]


# A hung or missing tool used to raise here and abort the run before any
# report was written; a failed probe is now recorded as a warning instead.
disk, _err = _probe_int("df -h /|tail -1|awk '{print $5}'|tr -d '%'")
if _err is None:
    T("INFRA", "disk", disk < 85, f"{disk}%")
    if disk >= 85:
        root_cause(f"Disk {disk}%", "Run docker system prune + clean old logs + remove unused images")
else:
    T("INFRA", "disk", "warn", f"probe failed: {_err}")

docker_count, _err = _probe_int("docker ps -q|wc -l")
if _err is None:
    T("INFRA", "docker", docker_count >= 18, f"{docker_count} containers")
else:
    T("INFRA", "docker", "warn", f"probe failed: {_err}")

try:
    _nginx_run = sp.run(["nginx", "-t"], capture_output=True, text=True, timeout=3)
except (OSError, sp.SubprocessError) as exc:
    # nginx binary missing or the check timed out: unknown, not a syntax error.
    nginx_ok = None
    T("INFRA", "nginx", "warn", f"probe failed: {str(exc)[:60]}")
else:
    # The verdict is read from stderr, where the word "successful" is expected.
    nginx_ok = "successful" in _nginx_run.stderr
    T("INFRA", "nginx", nginx_ok, "syntax")
    if not nginx_ok:
        root_cause("Nginx syntax error", "Check recent config changes — revert if needed")
# ═══════════════════════════════════════
# 8. MEETING REPORT
# ═══════════════════════════════════════
lg("═══ 8. MEETING REPORT ═══")
total = P + F + W
meeting = {
    "timestamp": ts.isoformat(),
    "type": "control_tower_3h",
    "score": f"{P}/{total}",
    "pct": round(P / total * 100, 1) if total else 0,
    "issues_found": len(issues),
    "root_causes_identified": len(root_causes),
    "fixes_applied": len(fixes_applied),
    "issues": issues,
    "root_causes": root_causes,
    "action_items": [],
    # Only HH:MM is emitted, so wrapping the hour modulo 24 is sufficient.
    "next_meeting": (ts.replace(hour=(ts.hour + 3) % 24)).strftime("%H:%M"),
}
# Generate action items from root causes
for rc in root_causes:
    meeting["action_items"].append({
        "issue": rc["issue"],
        "action": rc["analysis"],
        # SSO and chatbot problems are HIGH priority, everything else MEDIUM.
        "priority": "HIGH" if "SSO" in rc["issue"] or "chatbot" in rc["issue"].lower() else "MEDIUM",
        # Anything whose analysis mentions nginx goes to the infra squad.
        "owner": "SQUAD_INFRA" if "nginx" in rc["analysis"].lower() else "SQUAD_QA",
    })
try:
    # Context manager guarantees the file is flushed and closed.
    with open(MEETING, "w") as fh:
        json.dump(meeting, fh, indent=2)
except OSError as exc:
    # Keep going so the main result file and the summary are still produced.
    lg(f"Cannot write {MEETING}: {exc}")
# ═══ SAVE ═══
# Full run result: every test record, the counters, and the meeting report.
result = {
    "tests": tests, "timestamp": ts.isoformat(), "type": "control-tower",
    "pass": P, "fail": F, "warn": W, "total": total,
    "pct": round(P / total * 100, 1) if total else 0,
    "issues": issues, "root_causes": root_causes,
    "meeting": meeting,
}
try:
    # Context manager guarantees the file is flushed and closed.
    with open(RESULT, "w") as fh:
        json.dump(result, fh, indent=2)
except OSError as exc:
    # Do not lose the console/log summary just because the file is unwritable.
    lg(f"Cannot write {RESULT}: {exc}")
# Push to KB
# Best effort: a one-line summary is sent as a URL-encoded query parameter;
# a failure is logged but never stops the run.
try:
    import urllib.parse as up
    fact = f"Control Tower {ts.strftime('%H:%M')}: {P}/{total} ({result['pct']}%). Issues:{len(issues)}. RootCauses:{len(root_causes)}."
    sp.run(["curl", "-sk",
            f"https://weval-consulting.com/api/wevia-action-engine.php?action=kb_add&cat=CONTROL-TOWER&fact={up.quote(fact)}",
            "--max-time", "3"],
           capture_output=True, timeout=5)
except Exception as exc:
    # Previously ``except: pass`` — it hid every failure and also swallowed
    # KeyboardInterrupt / SystemExit.
    lg(f"KB push failed: {exc}")
# Final summary: score line, counts, each root cause, next run time.
lg(f"\n{'='*60}")
lg(f"CONTROL TOWER: {P}/{total} ({result['pct']}%)")
lg(f"Issues: {len(issues)} | Root Causes: {len(root_causes)}")
if len(root_causes) > 0:
    lg("ROOT CAUSES:")
for rc in root_causes:
    # Two lines per root cause: the issue, then its analysis.
    lg(f" ⚠️ {rc['issue']}")
    lg(f"{rc['analysis']}")
lg(f"Next meeting: {meeting['next_meeting']}")
lg(f"{'='*60}")