#!/usr/bin/env python3
import json,os,glob,time,subprocess,urllib.request,ssl
# Local-time timestamp captured once at start-up; every log line and JSON
# field written during this run reuses the same value.
TS = time.strftime("%Y-%m-%dT%H:%M:%S")

# Text log that log() appends to.
LOG = "/var/log/l99-autonomous.log"

# JSON documents read and/or rewritten by this script.
STATE = "/var/www/html/api/l99-state.json"
NR = "/var/www/html/api/nonreg-latest.json"
REPORT = "/var/www/html/api/l99-autonomous-report.json"

def log(msg):
    """Append *msg* to the run log and echo it on stdout.

    The line written to ``LOG`` is prefixed with the module-level ``TS``
    timestamp and terminated with a newline; the copy printed to stdout is
    the bare message.
    """
    with open(LOG, "a") as sink:
        sink.write(" ".join((TS, msg)) + "\n")
    print(msg)


log("=== L99 AUTONOMOUS START ===")

# --- Discovery ---------------------------------------------------------------
# Site-relative paths of the HTML pages.  Top-level files are always kept;
# files found by the recursive walk are dropped when their path mentions
# "node_modules" or "archive".  The set removes the overlap between the two
# globs before sorting.
top_level_html = glob.glob("/var/www/html/*.html")
nested_html = [
    found
    for found in glob.glob("/var/www/html/**/*.html", recursive=True)
    if not ("node_modules" in found or "archive" in found)
]
html_pages = sorted(
    {found.replace("/var/www/html", "") for found in top_level_html + nested_html}
)

# File names (not paths) of the PHP endpoints directly inside the api directory.
php_apis = sorted(
    entry for entry in os.listdir("/var/www/html/api") if entry.endswith(".php")
)

# Counts of captured artefacts anywhere below /opt/weval-l99.
screenshots = len(glob.glob("/opt/weval-l99/**/*.png", recursive=True))
videos = sum(
    len(glob.glob(pattern, recursive=True))
    for pattern in ("/opt/weval-l99/**/*.webm", "/opt/weval-l99/**/*.mp4")
)

log(
    "DISCOVER: %d HTML, %d APIs, %d SS, %d vid"
    % (len(html_pages), len(php_apis), screenshots, videos)
)

# TLS context with certificate and hostname verification disabled.  The probes
# connect to 127.0.0.1 and pass the site name in a Host header, so the served
# certificate would presumably not validate against the loopback address.
ctx = ssl._create_unverified_context()

# Accumulator for this run: counters plus per-page and per-API entries.
results = {
    "pass": 0,
    "fail": 0,
    "warn": 0,
    "pages": [],
    "apis": [],
    "new_pages": [],
    "dead_pages": [],
}

# Paths recorded in the previous report, used to spot new and vanished pages.
# A missing report is normal and silent; an unreadable or malformed one is
# logged and treated as "no history" instead of being swallowed by a bare
# except (which would also hide KeyboardInterrupt).
prev_pages = []
try:
    # The with-block closes the file; the original left that to the garbage
    # collector.
    with open(REPORT) as report_file:
        prev_pages = [entry["path"] for entry in json.load(report_file).get("pages", [])]
except FileNotFoundError:
    pass
except (OSError, ValueError, KeyError, TypeError, AttributeError) as exc:
    # ValueError covers json.JSONDecodeError; the remaining types cover a
    # report whose structure is not {"pages": [{"path": ...}, ...]}.
    log("WARN: previous report unreadable: %s" % str(exc)[:80])

# --- Page checks -------------------------------------------------------------
# Probe at most the first 50 discovered pages over loopback HTTPS.  A page
# passes when the response status is 200/301/302 and the body is larger than
# 500 bytes.  Any exception (timeout, connection error, HTTP error raised by
# urlopen) counts as a failure and marks the page as dead.
# NOTE(review): urlopen follows redirects by default, so 301/302 are unlikely
# to be observed here.
for page in html_pages[:50]:
    url = "https://127.0.0.1" + page
    try:
        req = urllib.request.Request(
            url,
            headers={"Host": "weval-consulting.com", "User-Agent": "L99-Auto/1.0"},
        )
        # The with-block closes the connection even when read() raises.
        with urllib.request.urlopen(req, timeout=5, context=ctx) as resp:
            code = resp.getcode()
            size = len(resp.read())
    except Exception as e:
        results["fail"] += 1
        results["pages"].append(
            {"path": page, "code": 0, "size": 0, "status": "F", "error": str(e)[:40]}
        )
        results["dead_pages"].append(page)
        continue

    ok = code in (200, 301, 302) and size > 500
    if ok:
        results["pass"] += 1
    else:
        results["fail"] += 1
    results["pages"].append(
        {"path": page, "code": code, "size": size, "status": "P" if ok else "F"}
    )
    if page not in prev_pages:
        results["new_pages"].append(page)

# A previously reported page is dead only when it is no longer on disk.
# Comparing against the probed pages alone would flag pages that still exist
# but fell outside the 50-page cap.  Every probed page comes from html_pages,
# so one set, built once, covers both.
still_present = set(html_pages)
results["dead_pages"].extend(p for p in prev_pages if p not in still_present)

log(
    "TEST: %dP/%dF out of %d"
    % (results["pass"], results["fail"], len(results["pages"]))
)

# --- API checks --------------------------------------------------------------
# Endpoints probed on every run, as (short name, site-relative path).
critical_apis = [
    ("nonreg", "/api/nonreg-api.php?cat=all"),
    ("health", "/api/ecosystem-health.php"),
    ("deep-test", "/api/wevia-deep-test.php"),
    ("autoheal", "/api/wevia-master-autoheal.php"),
    ("archi", "/api/architecture-index.json"),
    ("l99-state", "/api/l99-state.json"),
]
for name, path in critical_apis:
    try:
        req = urllib.request.Request(
            "https://127.0.0.1" + path,
            headers={"Host": "weval-consulting.com", "User-Agent": "L99-Auto/1.0"},
        )
        # Only the status is needed; the with-block releases the connection
        # instead of leaving it to garbage collection.
        with urllib.request.urlopen(req, timeout=10, context=ctx) as resp:
            status_code = resp.getcode()
    except Exception as e:
        results["apis"].append({"name": name, "code": 0, "ok": False, "error": str(e)[:40]})
    else:
        # Recorded outside the with-block so an error while closing cannot
        # produce two entries for the same endpoint.
        results["apis"].append({"name": name, "code": status_code, "ok": True})

api_pass = sum(1 for a in results["apis"] if a["ok"])
log("APIs: %d/%d" % (api_pass, len(results["apis"])))

# --- State file --------------------------------------------------------------
# Load the state document, refresh the fields set below and write it back.
try:
    with open(STATE) as state_file:
        state = json.load(state_file)
except FileNotFoundError:
    # No state file yet: start from an empty document, mirroring how a missing
    # NR file is tolerated.  Malformed JSON is deliberately left to raise so
    # an existing file is never replaced by a partial one.
    log("WARN: state file missing, starting from empty state")
    state = {}

state["screenshots"] = screenshots
state["videos"] = videos
state["pages_html"] = len(html_pages)
state["apis_php"] = len(php_apis)
state["last_update"] = TS
state["timestamp"] = TS

# Non-regression summary; when the file is absent the counters fall back to 0.
try:
    with open(NR) as nr_file:
        nr = json.load(nr_file)
except FileNotFoundError:
    nr = {}
state["tests_pass"] = nr.get("pass", 0)
state["tests_total"] = nr.get("total", 0)

# Defects per million opportunities derived from the non-regression pass
# ratio; max(1, ...) avoids dividing by zero when the total is 0.
dpmo = round((1 - nr.get("pass", 0) / max(1, nr.get("total", 1))) * 1000000)

# Map DPMO to a sigma level: every threshold that dpmo undercuts raises the
# level, so the last matching entry wins.
sigma = "<3"
for ceiling, level in ((66810, "3"), (6210, "4"), (233, "5"), (4, "6")):
    if dpmo < ceiling:
        sigma = level

state["sigma"] = {
    "dpmo": dpmo,
    "level": sigma,
    "nr": "%s/%s" % (nr.get("pass", "?"), nr.get("total", "?")),
}
state["autonomous"] = {
    "last_run": TS,
    "pages_tested": len(results["pages"]),
    "pages_pass": results["pass"],
    "pages_fail": results["fail"],
    # Only the first ten entries of each list are kept in the state file.
    "new_pages": results["new_pages"][:10],
    "dead_pages": results["dead_pages"][:10],
    "apis_tested": len(results["apis"]),
    "apis_pass": api_pass,
}

# Explicit close guarantees the data is flushed before the script continues.
with open(STATE, "w") as state_file:
    json.dump(state, state_file, indent=2)

# --- Report ------------------------------------------------------------------
results["ts"] = TS
results["total"] = len(results["pages"])
# max(1, ...) yields a score of 0.0 instead of a ZeroDivisionError when no page
# was probed.
results["score"] = round(results["pass"] / max(1, results["total"]) * 100, 1)
results["sigma"] = state["sigma"]
results["apis_score"] = "%d/%d" % (api_pass, len(results["apis"]))

# The with-block closes (and therefore flushes) the report deterministically
# rather than relying on garbage collection of an anonymous file object.
with open(REPORT, "w") as report_file:
    json.dump(results, report_file, indent=2)

log(
    "DONE: %.1f%% Sigma %s | %d new | %d dead"
    % (results["score"], sigma, len(results["new_pages"]), len(results["dead_pages"]))
)

# One-line JSON summary on stdout.
print(
    json.dumps(
        {
            "pages": results["pass"],
            "total": results["total"],
            "score": results["score"],
            "sigma": sigma,
            "dpmo": dpmo,
            "new": len(results["new_pages"]),
            "dead": len(results["dead_pages"]),
            "apis": results["apis_score"],
        }
    )
)