# Listing metadata from the source viewer: 187 lines, 7.5 KiB, Python
import glob
import json
import os
import ssl
import subprocess as sp
import time
import urllib.error
import urllib.request

# --- Scan configuration ------------------------------------------------------
# Site root that the auth and header checks prepend to each page path.
BASE = "https://weval-consulting.com"
# File the JSON report is written to once all scans have run.
RESULTS = "/var/www/html/api/l99-security.json"

# TLS context handed to urllib. Hostname checking must be switched off before
# verify_mode can be set to CERT_NONE; with both disabled, requests succeed
# regardless of the certificate the server presents.
# NOTE(review): verification is disabled on purpose here — confirm this is
# still wanted for a security scanner.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

# Report accumulator: every scan stores its outcome under security["scans"].
# The timestamp is local time without a timezone suffix.
security = dict(
    scans={},
    timestamp=time.strftime("%Y-%m-%dT%H:%M:%S"),
)

# === 1. NUCLEI SCAN (critical, high and medium severities) ===
# Runs nuclei against a fixed target list, reads back the exported findings
# and stores a summary under security["scans"]["nuclei"]. Status is FAIL when
# any critical/high finding exists, PASS otherwise, ERROR if the scan raised.
print("=== NUCLEI SCAN ===")
try:
    targets = [
        "https://weval-consulting.com",
        "https://wevads.weval-consulting.com",
        "https://consent.wevup.app",
    ]
    targets_file = "/tmp/nuclei-targets.txt"
    results_file = "/tmp/nuclei-results.json"

    # Write targets file (closed deterministically so nuclei sees all of it).
    with open(targets_file, "w") as fh:
        fh.write("\n".join(targets))

    # Drop output left by an earlier run: if this run writes nothing, stale
    # findings must not be reported as current.
    if os.path.exists(results_file):
        os.remove(results_file)

    sp.run([
        "nuclei", "-l", targets_file,
        "-severity", "critical,high,medium",
        "-t", "http/", "-t", "ssl/", "-t", "misconfiguration/",
        "-silent", "-timeout", "5", "-retries", "1",
        "-json-export", results_file,
        "-rate-limit", "10",
    ], capture_output=True, text=True, timeout=120)

    # Read the export, if nuclei produced one.
    raw = ""
    if os.path.exists(results_file):
        with open(results_file) as fh:
            raw = fh.read()

    # NOTE(review): `-json-export` is documented as a JSON export, while
    # `-jsonl-export` is the line-delimited variant. Both shapes are accepted
    # here — confirm against the installed nuclei version.
    try:
        parsed = json.loads(raw) if raw.strip() else []
        entries = parsed if isinstance(parsed, list) else [parsed]
    except ValueError:
        # Not a single JSON document: fall back to one JSON object per line,
        # skipping lines that do not parse (best-effort, as before).
        entries = []
        for line in raw.splitlines():
            if not line.strip():
                continue
            try:
                entries.append(json.loads(line))
            except ValueError:
                continue

    vulns = []
    for v in entries:
        if not isinstance(v, dict):
            continue
        info = v.get("info")
        if not isinstance(info, dict):
            info = {}
        vulns.append({
            "template": v.get("template-id", "?"),
            "severity": info.get("severity", "?"),
            "name": info.get("name", "?"),
            "host": v.get("host", "?"),
            "matched": v.get("matched-at", "?"),
            "type": v.get("type", "?"),
        })

    counts = {
        level: sum(1 for v in vulns if v["severity"] == level)
        for level in ("critical", "high", "medium")
    }
    security["scans"]["nuclei"] = {
        "vulns": vulns,
        "count": len(vulns),
        "critical": counts["critical"],
        "high": counts["high"],
        "medium": counts["medium"],
        "status": "FAIL" if counts["critical"] or counts["high"] else "PASS",
    }
    print(f" Nuclei: {len(vulns)} vulns ({counts['critical']}C/{counts['high']}H/{counts['medium']}M)")
except Exception as e:
    security["scans"]["nuclei"] = {"error": str(e)[:100], "status": "ERROR"}
    print(f" Nuclei ERROR: {e}")

# === 2. PORT SCAN (listening TCP sockets via ss) ===
# Lists listening TCP sockets with `ss -tlnp`, counts those bound to a
# wildcard address and stores the result under security["scans"]["ports"].
# Status is WARN when more than 10 sockets are wildcard-bound, else PASS.
print("=== PORT SCAN ===")


def _binds_all_interfaces(addr):
    """Return True when an ss local address is a wildcard bind.

    Accepts the forms 0.0.0.0:PORT, *:PORT, [::]:PORT and :::PORT. Loopback
    addresses such as [::1]:PORT or 127.0.0.1:PORT are not wildcard binds.
    """
    host = addr.rpartition(":")[0]      # drop the trailing ":port"
    host = host.split("%", 1)[0]        # drop an interface scope ("%eth0")
    host = host.strip("[]")             # drop IPv6 brackets
    return host in ("0.0.0.0", "*", "::")


try:
    r = sp.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=10)
    ports = []
    for line in r.stdout.splitlines():
        if "LISTEN" not in line:
            continue
        parts = line.split()
        addr = parts[3] if len(parts) > 3 else ""
        # Columns 0-4 are state, queues, local and peer address; anything
        # after that is the process column, which ss leaves out when it is
        # not allowed to see the owning process.
        proc = " ".join(parts[5:])
        ports.append({"addr": addr, "process": proc[:50]})

    # Flag sockets reachable on every interface.
    exposed = [p for p in ports if _binds_all_interfaces(p["addr"])]
    security["scans"]["ports"] = {
        "total": len(ports),
        "exposed": len(exposed),
        "ports": ports[:30],
        "status": "WARN" if len(exposed) > 10 else "PASS",
    }
    print(f" Ports: {len(ports)} total, {len(exposed)} exposed to 0.0.0.0")
except Exception as e:
    # Same error shape as the nuclei scan so the summary shows ERROR, not "?".
    security["scans"]["ports"] = {"error": str(e)[:100], "status": "ERROR"}
    print(f" Ports ERROR: {e}")

# === 3. SSL/TLS CHECK ===
# Connects with `openssl s_client` and records whether the server presented a
# certificate, plus its expiry line when one can be determined. "valid" only
# means a PEM certificate appeared in the output; the chain is not verified.
print("=== SSL CHECK ===")
try:
    # input must be str because text=True; an empty stdin makes s_client
    # exit right after the handshake instead of waiting for input.
    r = sp.run(
        ["openssl", "s_client", "-connect", "weval-consulting.com:443",
         "-servername", "weval-consulting.com"],
        input="", capture_output=True, text=True, timeout=10,
    )
    has_cert = "BEGIN CERTIFICATE" in r.stdout

    expiry = ""
    for line in r.stdout.split("\n"):
        if "Not After" in line:
            expiry = line.strip()

    # s_client output often has no "Not After" line, so ask `openssl x509`
    # for the end date of the first PEM certificate in the output instead.
    if has_cert and not expiry:
        begin_marker = "-----BEGIN CERTIFICATE-----"
        end_marker = "-----END CERTIFICATE-----"
        begin = r.stdout.find(begin_marker)
        end = r.stdout.find(end_marker, begin) if begin != -1 else -1
        if end != -1:
            pem = r.stdout[begin:end + len(end_marker)] + "\n"
            dates = sp.run(
                ["openssl", "x509", "-noout", "-enddate"],
                input=pem, capture_output=True, text=True, timeout=10,
            )
            expiry = dates.stdout.strip()

    security["scans"]["ssl"] = {
        "valid": has_cert,
        "expiry": expiry,
        "status": "PASS" if has_cert else "FAIL",
    }
    print(f" SSL: valid={has_cert} {expiry}")
except Exception as e:
    # Same error shape as the nuclei scan so the summary shows ERROR, not "?".
    security["scans"]["ssl"] = {"error": str(e)[:100], "status": "ERROR"}
    print(f" SSL ERROR: {e}")

# === 4. AUTH CHECK (SSO protection) ===
# Requests each page anonymously and decides whether it is protected. A page
# counts as protected when the final URL (after redirects) contains
# "authentik" or "if/flow", or when the server answers 302/401/403.
print("=== AUTH CHECK ===")
protected_pages = [
    "/blade-ai.html", "/crm.html", "/ops-center.html", "/monitoring.html",
    "/l99.html", "/ai-benchmark.html", "/command-center.html",
]
auth_results = []
for page in protected_pages:
    try:
        req = urllib.request.Request(BASE + page)
        # Context manager closes the connection once status and URL are read.
        with urllib.request.urlopen(req, timeout=5, context=ctx) as resp:
            code = resp.getcode()
            url = resp.geturl()
        final_url = url.lower()
        protected = "authentik" in final_url or "if/flow" in final_url or code == 302
        auth_results.append({"page": page, "protected": protected, "code": code})
    except urllib.error.HTTPError as e:
        auth_results.append({"page": page, "protected": e.code in (302, 401, 403), "code": e.code})
    except Exception as e:
        # NOTE(review): a page that cannot be fetched at all (timeout, DNS,
        # TLS failure) is still counted as protected, as before. The error is
        # now recorded and such pages are listed under "unreachable" so a PASS
        # with unverified pages is visible.
        auth_results.append({"page": page, "protected": True, "code": 0, "error": str(e)[:100]})

unprotected = [a for a in auth_results if not a["protected"]]
unreachable = [a["page"] for a in auth_results if a["code"] == 0]
security["scans"]["auth"] = {
    "total": len(auth_results),
    "protected": len(auth_results) - len(unprotected),
    "unprotected": [a["page"] for a in unprotected],
    "unreachable": unreachable,
    "status": "PASS" if not unprotected else "FAIL",
}
print(f" Auth: {len(auth_results)-len(unprotected)}/{len(auth_results)} protected | Exposed: {[a['page'] for a in unprotected]}")
if unreachable:
    print(f" Auth: could not verify (unreachable): {unreachable}")

# === 5. SECURITY HEADERS ===
# Fetches the site root and checks which of the required security response
# headers are present (names compared case-insensitively). Status is PASS
# when at most one header is missing, WARN otherwise, ERROR if the fetch fails.
print("=== HEADERS CHECK ===")
try:
    req = urllib.request.Request(BASE)
    # Context manager closes the connection once the headers are copied.
    with urllib.request.urlopen(req, timeout=5, context=ctx) as resp:
        headers = dict(resp.headers)
    required = ["X-Frame-Options", "X-Content-Type-Options", "Strict-Transport-Security",
                "Content-Security-Policy", "X-XSS-Protection"]
    # Lower-case the received names once instead of rescanning per header.
    received = {name.lower() for name in headers}
    present = [h for h in required if h.lower() in received]
    missing = [h for h in required if h not in present]
    security["scans"]["headers"] = {
        "present": present,
        "missing": missing,
        "status": "PASS" if len(missing) <= 1 else "WARN",
    }
    print(f" Headers: {len(present)}/{len(required)} present | Missing: {missing}")
except Exception as e:
    # Same error shape as the nuclei scan so the summary shows ERROR, not "?".
    security["scans"]["headers"] = {"error": str(e)[:100], "status": "ERROR"}
    print(f" Headers ERROR: {e}")

# === 6. OSS TOOLS EVALUATION ===
# Records, for a fixed list of tools, whether the install path exists on this
# host. The result list is stored under security["oss_tools"].
print("=== OSS TOOLS ===")
oss_eval = []
for name, path, useful_for in [
    ("Nuclei", "/usr/local/bin/nuclei", "vuln scanning, API testing, misconfig detection"),
    ("Wazuh", "/opt/wazuh", "SIEM, log analysis, intrusion detection"),
    ("Prometheus", "/opt/prometheus", "metrics, alerting, infrastructure monitoring"),
    ("KeyHacks", "/opt/keyhacks", "API key validation, secret detection"),
    ("EvoMaster", "/opt/evo*", "API fuzzing, REST testing"),
    ("DeerFlow", "/opt/deer-flow", "research, web search, documentation"),
    ("Flowise", "docker:flowise", "visual LLM workflows, chain testing"),
    ("SkillSmith", "/opt/skillsmith", "skill creation for Claude agents"),
    ("Paperclip", "/opt/paperclip-weval", "150 agents, multi-task orchestration"),
    ("SuperMemory", "/opt/supermemory", "memory/RAG, context persistence"),
    ("Uptime-Kuma", "docker:uptime-kuma", "uptime monitoring, status pages"),
    ("AEGIS", "/opt/guard", "file protection, integrity monitoring"),
]:
    if path.startswith("docker:"):
        # NOTE(review): docker-hosted tools are assumed to be present; nothing
        # here checks that the container actually exists or is running.
        exists = True
    elif any(ch in path for ch in "*?["):
        # os.path.exists() does not expand wildcards, so a pattern such as
        # "/opt/evo*" has to be expanded with glob first.
        exists = any(os.path.exists(match) for match in glob.glob(path))
    else:
        exists = os.path.exists(path)
    oss_eval.append({"name": name, "exists": exists, "use": useful_for})

security["oss_tools"] = oss_eval
print(f" {sum(1 for o in oss_eval if o['exists'])} OSS tools available")

# Save
# The file is opened as UTF-8 explicitly: ensure_ascii=False writes non-ASCII
# characters as-is, which the locale default encoding may be unable to encode.
# The context manager guarantees the report is flushed and closed.
with open(RESULTS, "w", encoding="utf-8") as fh:
    json.dump(security, fh, indent=2, ensure_ascii=False)

# Summary: one line per scan, colour-tagged by status. Anything other than
# PASS or WARN (including a missing status) is shown as red.
print(f"\n{'='*50}")
print("SECURITY SCAN COMPLETE")
for scan_name, scan_data in security["scans"].items():
    status = scan_data.get("status", "?")
    if status == "PASS":
        tag = "🟢"
    elif status == "WARN":
        tag = "🟡"
    else:
        tag = "🔴"
    print(f" {tag} {scan_name}: {status}")