125 lines
4.5 KiB
Python
Executable File
125 lines
4.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# /opt/weval-l99/screens-health-check.py v2
|
|
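# Statuses: UP / SLOW (2xx-3xx slower than SLOW_MS, 429, 502) / BROKEN (other 5xx) / DOWN (timeout/DNS or any unlisted code) / PROTECTED (400/401/403) / NOT_FOUND (404) / ERROR (the check itself raised)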
|
|
import json, os, sys, time, re, tempfile
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import URLError, HTTPError
|
|
from urllib.parse import quote, urlparse, urlunparse
|
|
|
|
# Input: HTML page whose inline script declares the screen list as `const DATA = [...]`.
CARTO_FILE = "/var/www/html/cartographie-screens.html"
# Output: JSON health report written by main().
OUT_FILE = "/var/www/html/api/screens-health.json"

# Number of threads in the pool that probes the screens.
WORKERS = 40
# Per-request timeout handed to urlopen, in seconds.
TIMEOUT = 10
# A 2xx/3xx response that took longer than this many milliseconds is reported as SLOW.
SLOW_MS = 8_000
|
|
|
|
def extract_data(path=None):
    """Load the screen list embedded in the cartography HTML page.

    The page is expected to contain a ``const DATA = [...]`` declaration
    whose right-hand side is a JSON array.

    Args:
        path: HTML file to read. Defaults to the module-level ``CARTO_FILE``.

    Returns:
        The decoded JSON array as a list, or ``[]`` when the file cannot be
        opened, the declaration is absent, or the array is not valid JSON.
    """
    if path is None:
        path = CARTO_FILE
    # EAFP: open directly instead of exists()+open(), which races with
    # the file being removed or replaced in between.
    try:
        with open(path, "r", encoding="utf-8") as f:
            html = f.read()
    except OSError:
        return []
    # The regex only locates the opening bracket. The JSON decoder then finds
    # the real end of the array, so a "];" inside a string value or a missing
    # trailing semicolon no longer truncates or hides the data.
    m = re.search(r"const DATA = (?=\[)", html)
    if not m:
        return []
    try:
        data, _end = json.JSONDecoder().raw_decode(html[m.end():])
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return []
    return data
|
|
|
|
def classify(code, elapsed_ms):
    """Map an HTTP status code and response time to a health label.

    Args:
        code: HTTP status code, or 0 when no HTTP response was obtained.
        elapsed_ms: Time the check took, in milliseconds.

    Returns:
        One of "UP", "SLOW", "PROTECTED", "NOT_FOUND", "BROKEN" or "DOWN".
    """
    # Status codes that always map to the same label, whatever the timing.
    fixed_labels = {
        400: "PROTECTED",  # endpoint exists, expects POST/specific input
        401: "PROTECTED",
        403: "PROTECTED",
        404: "NOT_FOUND",
        405: "UP",         # method not allowed = page exists but rejected the method
        429: "SLOW",       # rate limited but endpoint alive
        501: "UP",
        502: "SLOW",       # upstream transient
    }

    if code == 0:
        return "DOWN"
    if 200 <= code < 400:
        # Success or redirect: only the response time separates SLOW from UP.
        return "SLOW" if SLOW_MS < elapsed_ms else "UP"
    label = fixed_labels.get(code)
    if label is not None:
        return label
    # Remaining 5xx codes are server failures; anything else counts as DOWN.
    return "BROKEN" if 500 <= code < 600 else "DOWN"
|
|
|
|
def check_one(entry):
    """Probe one screen URL and classify the outcome.

    Args:
        entry: Mapping describing a screen. The "url", "name" and "server"
            keys are read with ``.get``, so missing keys yield ``None``.

    Returns:
        dict with keys "name", "url" (the URL actually requested, i.e. with a
        percent-encoded path when the original path held non-ASCII
        characters), "server", "status" (the label from ``classify``),
        "code" (HTTP status, or 0 when no HTTP response was obtained) and
        "ms" (elapsed milliseconds, including the retry when one happens).
    """
    url = entry.get("url")
    name = entry.get("name")
    # Percent-encode non-ASCII chars in path (fixes DOWN on URLs like méthodologie, adhérence)
    if isinstance(url, str):
        try:
            p = urlparse(url)
            if any(ord(c) > 127 for c in p.path):
                # "%" is kept safe so escapes already present in the path
                # (e.g. "%C3%A9") are not double-encoded into "%25C3%25A9".
                encoded_path = quote(p.path, safe="/%")
                url = urlunparse((p.scheme, p.netloc, encoded_path, p.params, p.query, p.fragment))
        except ValueError:
            # Unparseable URL: request it unchanged and let the probe report it.
            pass

    base_headers = {"User-Agent": "WEVIA-HealthCheck/2.0"}
    # Monotonic clock: the measurement must not jump if the wall clock is adjusted.
    t0 = time.monotonic()
    code = 0
    try:
        # First try GET with Range (PHP handles GET better than HEAD; Range avoids loading full body)
        code = _fetch_status(url, {**base_headers, "Range": "bytes=0-0"})
        # Retry once WITHOUT the Range header. Repeating the identical request
        # proves nothing, whereas dropping Range covers servers that reject
        # the range itself (416 is the reply to "bytes=0-0" on an empty
        # resource). The body is never read; the response is closed at once.
        if code in (405, 416, 501):
            try:
                code = _fetch_status(url, base_headers)
            except Exception:
                # Best effort: keep the status obtained from the first request.
                pass
    except Exception:
        # Timeout, DNS failure, refused connection, invalid URL, ...: no HTTP
        # status is available, which classify() reports as DOWN.
        code = 0
    elapsed_ms = int((time.monotonic() - t0) * 1000)
    status = classify(code, elapsed_ms)
    return {"name": name, "url": url, "server": entry.get("server"), "status": status, "code": code, "ms": elapsed_ms}


def _fetch_status(url, headers):
    """Send one GET request and return its HTTP status, closing the response.

    An ``HTTPError`` (4xx/5xx) is turned into its status code; any other
    exception propagates to the caller.
    """
    req = Request(url, method="GET", headers=headers)
    try:
        with urlopen(req, timeout=TIMEOUT) as resp:
            return resp.status
    except HTTPError as err:
        # HTTPError wraps the server response; close it so the socket is
        # released immediately instead of waiting for garbage collection.
        try:
            return err.code
        finally:
            err.close()
|
|
|
|
def main():
    """Check every screen concurrently and publish the JSON health report.

    Loads the screen list with ``extract_data``, probes each entry with
    ``check_one`` on a pool of ``WORKERS`` threads, aggregates the results
    and atomically replaces ``OUT_FILE``. Progress is printed to stdout.

    Raises:
        SystemExit: with status 1 when no screen data could be loaded.
    """
    data = extract_data()
    if not data:
        print("NO_DATA")
        sys.exit(1)
    print(f"Checking {len(data)} screens, {WORKERS} workers, timeout {TIMEOUT}s")
    t0 = time.monotonic()
    results = []
    with ThreadPoolExecutor(max_workers=WORKERS) as ex:
        futures = {ex.submit(check_one, e): e for e in data}
        for fut in as_completed(futures):
            try:
                results.append(fut.result())
            except Exception:
                # The check itself failed. Still emit a row for the entry; a
                # malformed (non-dict) entry must not crash this fallback too.
                entry = futures[fut]
                fields = entry if isinstance(entry, dict) else {}
                results.append({"name": fields.get("name"), "url": fields.get("url"), "server": fields.get("server"), "status": "ERROR", "code": 0, "ms": 0})
    elapsed = time.monotonic() - t0

    counts = {}
    for r in results:
        counts[r["status"]] = counts.get(r["status"], 0) + 1

    out = {
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "elapsed_sec": round(elapsed, 1),
        "total": len(results),
        "counts": counts,
        "by_url": {r["url"]: {"status": r["status"], "code": r["code"], "ms": r["ms"]} for r in results},
    }

    # Write next to OUT_FILE so os.replace() stays on one filesystem (atomic).
    out_dir = os.path.dirname(OUT_FILE) or "."
    tmp = tempfile.NamedTemporaryFile(mode="w", delete=False, dir=out_dir, suffix=".tmp", encoding="utf-8")
    try:
        with tmp:
            json.dump(out, tmp, separators=(",", ":"))
        # Set the final mode BEFORE publishing: the temp file is created
        # owner-only, so chmod after the rename would briefly expose an
        # unreadable report to other users such as the web server.
        os.chmod(tmp.name, 0o644)
        os.replace(tmp.name, OUT_FILE)
    except BaseException:
        # Do not leave a stray *.tmp file behind in the output directory.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise

    summary = " ".join(f"{k}={v}" for k, v in sorted(counts.items()))
    print(f"DONE in {elapsed:.1f}s: {summary}")
    print(f"Written: {OUT_FILE}")
|
|
|
|
# Run the health check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|