#!/usr/bin/env python3 """WEVIA SYSTEMATIC CONTROL v1.0 Auto-discovers and tests EVERYTHING on ALL 4 machines: - ALL ports, ALL pages, ALL APIs, ALL crons, ALL Docker - ALL providers with REAL code generation test - ALL authentication flows - Updates wiki + L99 dynamically Cron: */2h """ import subprocess as sp, json, os, time, glob from datetime import datetime from pathlib import Path LOG = "/var/log/wevia-systematic.log" STATUS = "/var/www/html/api/wevia-systematic-status.json" KB_API = "https://weval-consulting.com/api/wevia-action-engine.php" ts = datetime.now() R = {"timestamp": ts.isoformat(), "sections": {}, "discoveries": [], "alerts": []} def lg(m): l = f"[{datetime.now().strftime('%H:%M:%S')}] {m}" print(l, flush=True) with open(LOG, "a") as f: f.write(l + "\n") def cmd(c, t=10): try: r = sp.run(c, shell=True, capture_output=True, text=True, timeout=t, errors='replace') return r.stdout.strip() except: return "" def curl_code(url, t=5): try: r = sp.run(["curl","-sk","-o","/dev/null","-w","%{http_code}",url,"--max-time",str(t)], capture_output=True, text=True, timeout=t+3) return int(r.stdout.strip()) except: return 0 def kb_add(cat, fact): try: import urllib.parse as up sp.run(["curl","-sk",f"{KB_API}?action=kb_add&cat={cat}&fact={up.quote(fact[:250])}","--max-time","3"], capture_output=True, timeout=5) except: pass lg("=" * 60) lg(f"SYSTEMATIC CONTROL — {ts}") # ═══════════════════════════════════════════ # 1. AUTO-DISCOVER ALL PORTS ON ALL SERVERS # ═══════════════════════════════════════════ lg("═══ 1. PORT DISCOVERY ═══") ports = {} # S204 s204_ports = cmd("ss -tlnp | grep LISTEN | awk '{print $4}' | grep -oP ':\\K\\d+' | sort -un") ports["S204"] = s204_ports.split("\n") if s204_ports else [] lg(f" S204: {len(ports['S204'])} ports") # S95 s95_ports = cmd("curl -sf --max-time 5 'http://10.1.0.3:5890/api/sentinel-brain.php?action=exec&cmd=ss+-tlnp+|+grep+LISTEN+|+awk+%27{print+$4}%27+|+grep+-oP+%27:\\\\K\\\\d%2B%27+|+sort+-un'") try: d = json.loads(s95_ports) ports["S95"] = d.get("output","").split("\n") except: ports["S95"] = [] lg(f" S95: {len(ports['S95'])} ports") # S151 s151_code = curl_code("http://151.80.235.110/", 5) ports["S151"] = ["80"] if s151_code > 0 else [] lg(f" S151: reachable={s151_code > 0}") R["sections"]["ports"] = ports # ═══════════════════════════════════════════ # 2. AUTO-DISCOVER NEW PAGES + APIs # ═══════════════════════════════════════════ lg("═══ 2. PAGE/API DISCOVERY ═══") current_pages = sorted(glob.glob("/var/www/html/*.html")) current_apis = sorted(glob.glob("/var/www/html/api/*.php")) # Check for pages added since last scan last_scan = "/var/www/html/api/wevia-systematic-last.json" if os.path.exists(last_scan): old = json.load(open(last_scan)) old_pages = set(old.get("pages", [])) old_apis = set(old.get("apis", [])) new_pages = [p for p in current_pages if os.path.basename(p) not in old_pages] new_apis = [a for a in current_apis if os.path.basename(a) not in old_apis] if new_pages: R["discoveries"].append(f"{len(new_pages)} new pages: {[os.path.basename(p) for p in new_pages[:5]]}") lg(f" NEW PAGES: {[os.path.basename(p) for p in new_pages[:5]]}") for p in new_pages: kb_add("DISCOVERY", f"New page detected: {os.path.basename(p)}") if new_apis: R["discoveries"].append(f"{len(new_apis)} new APIs: {[os.path.basename(a) for a in new_apis[:5]]}") lg(f" NEW APIs: {[os.path.basename(a) for a in new_apis[:5]]}") for a in new_apis: kb_add("DISCOVERY", f"New API detected: {os.path.basename(a)}") # Save current state for next run json.dump({ "pages": [os.path.basename(p) for p in current_pages], "apis": [os.path.basename(a) for a in current_apis], "timestamp": ts.isoformat() }, open(last_scan, "w")) R["sections"]["inventory"] = {"pages": len(current_pages), "apis": len(current_apis)} lg(f" Pages: {len(current_pages)} | APIs: {len(current_apis)}") # ═══════════════════════════════════════════ # 3. ALL CRONS CHECK (root + www-data + cron.d) # ═══════════════════════════════════════════ lg("═══ 3. CRONS ═══") crons = {} crons["root"] = int(cmd("crontab -l 2>/dev/null | grep -v '^#' | grep -v '^$' | wc -l") or 0) crons["www_data"] = int(cmd("sudo -u www-data crontab -l 2>/dev/null | grep -v '^#' | grep -v '^$' | wc -l") or 0) crons["cron_d"] = int(cmd("ls /etc/cron.d/ 2>/dev/null | wc -l") or 0) # Check each cron is actually running (not stuck) cron_list = cmd("crontab -l 2>/dev/null | grep -v '^#' | grep -v '^$'") crons["total"] = crons["root"] + crons["www_data"] + crons["cron_d"] R["sections"]["crons"] = crons lg(f" root={crons['root']} www-data={crons['www_data']} cron.d={crons['cron_d']} total={crons['total']}") kb_add("INFRA", f"Crons: root={crons['root']} www-data={crons['www_data']} cron.d={crons['cron_d']} total={crons['total']}") # ═══════════════════════════════════════════ # 4. ALL DOCKER CONTAINERS # ═══════════════════════════════════════════ lg("═══ 4. DOCKER ═══") docker = [] r = cmd("docker ps --format '{{.Names}}|{{.Status}}|{{.Ports}}'") for line in r.split("\n"): if "|" in line: parts = line.split("|") docker.append({"name": parts[0], "status": parts[1][:20], "ports": parts[2][:50] if len(parts)>2 else ""}) R["sections"]["docker"] = docker lg(f" {len(docker)} containers running") # ═══════════════════════════════════════════ # 5. AI PROVIDERS FUNCTIONAL TEST # ═══════════════════════════════════════════ lg("═══ 5. PROVIDER FUNCTIONAL TEST ═══") providers = [] # Load secrets secrets = {} try: for line in open("/etc/weval/secrets.env"): if "=" in line and not line.startswith("#"): k, v = line.strip().split("=", 1) secrets[k] = v except: pass # Test each provider with a REAL code generation request provider_tests = [ ("Cerebras", "CEREBRAS_API_KEY", "https://api.cerebras.ai/v1/chat/completions", '{"model":"qwen-3-32b","messages":[{"role":"user","content":"Write a Python function to compute factorial. Return ONLY the code."}],"max_tokens":200}'), ("Groq", "GROQ_KEY", "https://api.groq.com/openai/v1/chat/completions", '{"model":"llama-3.3-70b-versatile","messages":[{"role":"user","content":"Write a Python function to compute factorial. Return ONLY the code."}],"max_tokens":200}'), ("Gemini", "GEMINI_KEY", None, None), ("DeepSeek", "DEEPSEEK_KEY", "https://api.deepseek.com/chat/completions", '{"model":"deepseek-chat","messages":[{"role":"user","content":"Write a Python function to compute factorial. Return ONLY the code."}],"max_tokens":200}'), ("Mistral", "MISTRAL_KEY", "https://api.mistral.ai/v1/chat/completions", '{"model":"open-mistral-7b","messages":[{"role":"user","content":"Write a Python function to compute factorial. Return ONLY the code."}],"max_tokens":200}'), ] for name, key_env, url, payload in provider_tests: key = secrets.get(key_env, "") if not key or not url: providers.append({"name": name, "status": "NO_KEY", "ok": False}) continue if name == "Gemini": # Gemini uses different URL format gem_url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={key}" gem_body = '{"contents":[{"parts":[{"text":"Write a Python function to compute factorial. Return ONLY the code, at least 10 lines."}]}]}' try: r = sp.run(["curl","-sf","--max-time","15","-X","POST","-H","Content-Type: application/json", "-d",gem_body,gem_url], capture_output=True, text=True, timeout=18) d = json.loads(r.stdout) text = d.get("candidates",[{}])[0].get("content",{}).get("parts",[{}])[0].get("text","") has_def = "def " in text length = len(text) truncated = length < 100 providers.append({"name": name, "status": "OK" if has_def and not truncated else "TRUNCATED", "ok": has_def and not truncated, "length": length, "has_code": has_def}) if truncated: R["alerts"].append(f"{name}: response truncated ({length}c) — may need model change") lg(f" ⚠️ {name}: TRUNCATED {length}c") except Exception as e: providers.append({"name": name, "status": "ERROR", "ok": False, "error": str(e)[:30]}) continue try: r = sp.run(["curl","-sf","--max-time","15","-X","POST", "-H","Content-Type: application/json","-H",f"Authorization: Bearer {key}", "-d",payload,url], capture_output=True, text=True, timeout=18) d = json.loads(r.stdout) text = d.get("choices",[{}])[0].get("message",{}).get("content","") has_def = "def " in text length = len(text) truncated = length < 80 providers.append({"name": name, "status": "OK" if has_def else "NO_CODE", "ok": has_def and not truncated, "length": length}) if truncated: R["alerts"].append(f"{name}: response truncated ({length}c)") lg(f" ⚠️ {name}: SHORT {length}c") else: lg(f" ✅ {name}: {length}c {'has code' if has_def else 'no code'}") except Exception as e: providers.append({"name": name, "status": "ERROR", "ok": False, "error": str(e)[:30]}) lg(f" ❌ {name}: {e}") time.sleep(1) R["sections"]["providers"] = providers ok_providers = sum(1 for p in providers if p.get("ok")) lg(f" Providers: {ok_providers}/{len(providers)} functional") # ═══════════════════════════════════════════ # 6. AUTHENTICATION CHECK — ALL PROTECTED PAGES # ═══════════════════════════════════════════ lg("═══ 6. AUTHENTICATION ═══") auth_checks = [] # Find all auth_request protected pages in nginx nginx_auth = cmd("grep -rh 'auth_request.*goauthentik' /etc/nginx/sites-enabled/ | wc -l") auth_checks.append({"name": "nginx_auth_rules", "count": int(nginx_auth or 0)}) # Check each protected page returns 302 (redirect to SSO) without auth protected = cmd("grep -B2 'auth_request.*goauthentik' /etc/nginx/sites-enabled/weval-consulting 2>/dev/null | grep 'location' | grep -oP '/[a-z_-]+\\.html'") for pg in protected.split("\n"): if not pg: continue code = curl_code(f"https://weval-consulting.com{pg}", 5) ok = code in [200, 302] # 302 = SSO redirect = correct auth_checks.append({"name": pg, "code": code, "ok": ok}) if not ok: R["alerts"].append(f"Auth: {pg} returns HTTP {code} (expected 302)") R["sections"]["authentication"] = auth_checks lg(f" Auth pages: {len(auth_checks)} checked") # ═══════════════════════════════════════════ # 7. REFERENTIAL CHECK — All registered apps # ═══════════════════════════════════════════ lg("═══ 7. REFERENTIALS ═══") refs = {} # Paperclip/Twenty CRM code = curl_code("https://paperclip.weval-consulting.com/auth/sign-in", 5) refs["paperclip"] = {"ok": code == 200, "detail": f"HTTP {code}"} # Enterprise Model code = curl_code("https://weval-consulting.com/enterprise-model.html", 5) refs["enterprise_model"] = {"ok": code == 200, "detail": f"HTTP {code}"} # DeerFlow code = curl_code("https://deerflow.weval-consulting.com/", 5) refs["deerflow"] = {"ok": code in [200, 302], "detail": f"HTTP {code}"} # Register/Fleet try: r = sp.run(["curl","-sk","https://weval-consulting.com/api/wevia-fleet.php","--max-time","5"], capture_output=True, text=True, timeout=8) d = json.loads(r.stdout) refs["fleet"] = {"ok": d.get("agents",0) >= 2, "detail": f"{d.get('agents',0)} agents"} except: refs["fleet"] = {"ok": False} R["sections"]["referentials"] = refs lg(f" Referentials: {sum(1 for v in refs.values() if v.get('ok'))}/{len(refs)}") # ═══════════════════════════════════════════ # 8. SECURITY SCAN — Nuclei-style basic checks # ═══════════════════════════════════════════ lg("═══ 8. SECURITY ═══") security = [] # Check exposed sensitive files sensitive = [".env", ".git/config", "wp-admin", "phpinfo.php", "server-status", ".htaccess"] for path in sensitive: code = curl_code(f"https://weval-consulting.com/{path}", 3) exposed = code == 200 security.append({"name": path, "exposed": exposed, "ok": not exposed}) if exposed: R["alerts"].append(f"SECURITY: {path} is publicly accessible!") # Check security headers headers = cmd("curl -skI https://weval-consulting.com/ --max-time 5 | head -20") has_hsts = "strict-transport" in headers.lower() has_xframe = "x-frame-options" in headers.lower() security.append({"name": "HSTS", "ok": has_hsts}) security.append({"name": "X-Frame-Options", "ok": has_xframe}) R["sections"]["security"] = security lg(f" Security: {sum(1 for s in security if s.get('ok'))}/{len(security)}") # ═══════════════════════════════════════════ # 9. UPDATE WIKI WITH REAL STATE # ═══════════════════════════════════════════ lg("═══ 9. WIKI UPDATE ═══") wiki_entries = [ ("SYSTEMATIC", f"Ports: S204={len(ports.get('S204',[]))} S95={len(ports.get('S95',[]))}. Pages={len(current_pages)} APIs={len(current_apis)}. Docker={len(docker)}."), ("SYSTEMATIC", f"Crons: {crons['total']} (root={crons['root']} www={crons['www_data']} cron.d={crons['cron_d']}). Providers: {ok_providers}/{len(providers)} functional."), ("SYSTEMATIC", f"Auth: {len(auth_checks)} protected pages checked. Security: {sum(1 for s in security if s.get('ok'))}/{len(security)} OK."), ] if R["alerts"]: wiki_entries.append(("ALERT", f"Alerts: {'; '.join(R['alerts'][:3])}")) for cat, fact in wiki_entries: kb_add(cat, fact) lg(f" {len(wiki_entries)} KB entries pushed") # ═══ SAVE ═══ # Count totals total_checks = (len(ports.get("S204",[])) + len(docker) + len(providers) + len(auth_checks) + len(refs) + len(security) + 3) # +3 for cron checks ok_checks = (len([p for p in ports.get("S204",[]) if p]) + len([d for d in docker if "Up" in d.get("status","")]) + ok_providers + sum(1 for c in auth_checks if c.get("ok",True)) + sum(1 for v in refs.values() if v.get("ok")) + sum(1 for s in security if s.get("ok")) + 3) R["total"] = total_checks R["pass"] = ok_checks R["pct"] = round(ok_checks/total_checks*100, 1) if total_checks else 0 R["alerts_count"] = len(R["alerts"]) json.dump(R, open(STATUS, "w"), indent=2) lg(f"{'='*60}") lg(f"SYSTEMATIC: {ok_checks}/{total_checks} ({R['pct']}%) | Alerts: {len(R['alerts'])}") if R["alerts"]: for a in R["alerts"]: lg(f" ⚠️ {a}") lg(f"{'='*60}")