Files
wevads-platform/scripts/brain-server-optimizer.py
2026-02-26 04:53:11 +01:00

389 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
BRAIN SERVER OPTIMIZER
Toutes les 10 minutes:
- Check santé de tous les serveurs
- Calcule KPIs (cost, efficiency)
- Auto-actions: restart/delete/repair selon status
- Détecte queues bloquées
- Met à jour scores
"""
import ipaddress
import json
import os
import socket
import subprocess
from datetime import datetime, timedelta

import psycopg2
import requests
# Database connection settings. Each value can be overridden through an
# environment variable so credentials do not have to live in source control;
# the previous hard-coded values stay as defaults for backward compatibility.
# TODO(security): drop the default password once every deployment sets
# BRAIN_DB_PASSWORD.
DB_CONFIG = {
    'host': os.environ.get('BRAIN_DB_HOST', 'localhost'),
    'database': os.environ.get('BRAIN_DB_NAME', 'adx_system'),
    'user': os.environ.get('BRAIN_DB_USER', 'admin'),
    'password': os.environ.get('BRAIN_DB_PASSWORD', 'admin123'),
}

# Action thresholds.
THRESHOLDS = {
    # % of seed placements landing in spam -> print a warning.
    'spam_rate_warning': 20,
    # % of seed placements landing in spam -> pause the server.
    'spam_rate_critical': 40,
    # % inbox placement below which there is a problem.
    # NOTE(review): appears unused by this script.
    'inbox_rate_minimum': 50,
    # Minutes without queue movement = stuck.
    # NOTE(review): appears unused; the stuck check compares the two most
    # recent KPI snapshots instead.
    'queue_stuck_minutes': 30,
    # Queue a 'delete' (status 'burned') for blacklisted servers.
    'blacklist_auto_delete': True,
    # $ per 1k emails above which a server is inefficient. calculate_efficiency
    # hard-codes the same $0.50 cut-off in its cost component.
    'max_cost_per_1k': 0.50,
    # Efficiency score below which the server is flagged for 'review'.
    'min_efficiency_score': 30,
}

# Provider costs in $/hour; update_server_kpi falls back to 0.01 for
# providers missing from this table.
PROVIDER_COSTS = {
    'hetzner': 0.007,  # CX11, ~5 EUR/month
    'scaleway': 0.01,
    'huawei': 0.02,
    'ovh': 0.015,
    'vultr': 0.02,
}
def get_db():
    """Open a new PostgreSQL connection using the settings in DB_CONFIG.

    Every call returns a fresh psycopg2 connection; the caller is expected
    to close it when done.
    """
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def get_pmta_stats(ip, port=8080):
    """Fetch PowerMTA status for a server.

    First tries the HTTP endpoint ``http://{ip}:{port}/status`` (only works
    if PMTA's HTTP interface is configured and answers with JSON). If that
    fails, falls back to running ``pmta show status`` locally.

    Args:
        ip: address of the server to query over HTTP.
        port: HTTP port of the status endpoint.

    Returns:
        The decoded JSON body on an HTTP 200. When the local command succeeds,
        the placeholder ``{'queue': 0, 'sent': 0}``, because its output is not
        parsed yet. ``None`` when both methods fail.
    """
    try:
        response = requests.get(f"http://{ip}:{port}/status", timeout=5)
        if response.status_code == 200:
            return response.json()
    except (requests.RequestException, ValueError):
        # Unreachable host, timeout or a non-JSON body: use the fallback.
        pass

    # Fallback: local pmta command.
    # NOTE(review): this queries the PMTA instance on the machine running this
    # script, not the one at `ip`; confirm that is intended.
    try:
        result = subprocess.run(
            ['pmta', 'show', 'status'],
            capture_output=True, text=True, timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # pmta binary missing / not executable, or the command timed out.
        return None
    if result.returncode != 0:
        # Don't report fake "empty queue" stats for a failing PMTA.
        return None
    # TODO: parse result.stdout; until then the counters are placeholders.
    return {'queue': 0, 'sent': 0}
def check_blacklists(ip):
    """Return the critical DNSBL zones that currently list ``ip``.

    Standard DNSBL lookup: the address is reversed (nibble format for IPv6),
    prefixed to each zone and resolved. An A record inside 127.0.0.0/8 means
    "listed". Answers in 127.255.255.0/24 are zone error codes; Spamhaus
    returns them, for example, for queries sent through public/open resolvers.
    Answers outside 127.0.0.0/8 typically come from resolvers that rewrite
    NXDOMAIN. Neither kind is counted as a listing, because a false positive
    here gets the server marked as burned.

    Args:
        ip: IPv4/IPv6 address as a string; a trailing "/prefix" is ignored.

    Returns:
        List of zone names listing the address, in check order. Empty when
        ``ip`` is not a valid address or no lookup yields a listing.
    """
    critical_lists = [
        'zen.spamhaus.org',
        'b.barracudacentral.org',
    ]
    listed_range = ipaddress.ip_network('127.0.0.0/8')
    error_codes = ipaddress.ip_network('127.255.255.0/24')

    try:
        address = ipaddress.ip_interface(str(ip).strip()).ip
    except ValueError:
        # e.g. "None" when the server row has no IP: nothing to look up.
        return []
    # reverse_pointer is "<reversed>.in-addr.arpa" / "<nibbles>.ip6.arpa";
    # a DNSBL query uses only the reversed part.
    reversed_name = address.reverse_pointer.rsplit('.', 2)[0]

    listings = []
    for bl in critical_lists:
        query = f"{reversed_name}.{bl}"
        try:
            answer = ipaddress.ip_address(socket.gethostbyname(query))
        except (OSError, ValueError):
            # NXDOMAIN (not listed), resolver failure/timeout, or an invalid
            # name (UnicodeError is a ValueError): best effort, not listed.
            continue
        if answer in error_codes:
            print(f" ⚠️ {bl} returned error code {answer} for {address}")
            continue
        if answer in listed_range:
            listings.append(bl)
    return listings
def calculate_efficiency(server):
    """Compute a 0-100 efficiency score for a server.

    The score is the sum of three capped components:
      * delivery (up to 40 pts): delivery rate in % weighted by 0.4;
      * cost (up to 30 pts): 30 minus 60 pts per dollar spent per 1k emails,
        so $0.50 per 1k or more scores 0;
      * complaints (up to 30 pts): 30 minus 10 pts per percentage point of
        complaint rate, so 3% or more scores 0.

    Args:
        server: mapping with ``total_sent``, ``total_delivered``,
            ``total_complaints`` and ``total_cost``. Missing or NULL values
            count as 0. Values may be int, float or Decimal.

    Returns:
        The score rounded to 2 decimals, or 50 (neutral) for a server that
        has not sent anything yet.
    """
    def counter(key):
        # psycopg2 returns NUMERIC columns as Decimal, and Decimal cannot be
        # combined with float arithmetic (TypeError); normalize to float.
        return float(server.get(key) or 0)

    sent = counter('total_sent')
    if sent <= 0:
        return 50  # Neutral for new servers

    delivered = counter('total_delivered')
    complaints = counter('total_complaints')
    cost = counter('total_cost')

    # Delivery rate (40%)
    delivery_rate = delivered / sent * 100
    delivery_score = min(40, delivery_rate * 0.4)

    # Cost efficiency (30%) - lower cost per 1k = better
    cost_per_1k = cost / (sent / 1000)
    cost_score = max(0, 30 - (cost_per_1k * 60))  # $0.50/1k = 0 score

    # Complaint rate (30%) - lower = better
    complaint_rate = complaints / sent * 100
    complaint_score = max(0, 30 - (complaint_rate * 10))

    return round(delivery_score + cost_score + complaint_score, 2)
def update_server_kpi(conn, server_id):
    """Recompute, store and snapshot the KPIs of one server.

    The function derives the following values:
      * uptime, from ``started_at``;
      * accumulated cost, as uptime x the provider's hourly rate from
        PROVIDER_COSTS (0.01 $/h for unknown providers);
      * cost per 1k emails, only once more than 1,000 emails were sent;
      * the efficiency score;
      * the inbox/spam placement rates over the last 24 h.

    It writes them to ``admin.servers``, appends a row to
    ``admin.server_kpi_history`` and commits.

    Args:
        conn: open psycopg2 connection.
        server_id: id of the row in ``admin.servers``.

    Returns:
        dict with ``server_id``, ``efficiency``, ``cost``, ``cost_per_1k``,
        ``inbox_rate`` and ``spam_rate``, or ``None`` if no server has that id.
    """
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM admin.servers WHERE id = %s", (server_id,))
        row = cur.fetchone()
        if row is None:
            return None
        columns = [desc[0] for desc in cur.description]
        server = dict(zip(columns, row))

        # Uptime in hours. "now" must have the same tz-awareness as started_at:
        # subtracting a naive datetime from an aware one raises TypeError.
        started_at = server['started_at']
        if started_at:
            now = datetime.now(started_at.tzinfo)
            uptime = (now - started_at).total_seconds() / 3600
        else:
            uptime = 0

        # Accumulated cost (PROVIDER_COSTS keys are lower-case).
        provider = (server['provider_name'] or '').lower()
        hourly_cost = PROVIDER_COSTS.get(provider, 0.01)
        total_cost = uptime * hourly_cost

        # Cost per 1k emails and delivery rate.
        sent = server['total_sent'] or 0
        cost_per_1k = (total_cost / (sent / 1000)) if sent > 1000 else None
        delivered = server['total_delivered'] or 0
        delivery_rate = (delivered / sent * 100) if sent > 0 else 0

        # Score with the cost/uptime just computed; the row still holds the
        # values written by the previous run.
        efficiency = calculate_efficiency(
            {**server, 'total_cost': total_cost, 'uptime_hours': uptime})

        # Inbox/spam rates from seed tracking data.
        # NOTE(review): this aggregates placements of *all* office accounts
        # over the last 24 h, not just this server's, so every server gets the
        # same rates (and check_and_act may pause all of them at once). Filter
        # by server once sends are attributed to servers.
        cur.execute("""
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END) AS inbox,
                SUM(CASE WHEN inbox_placement = 'spam' THEN 1 ELSE 0 END) AS spam
            FROM admin.seed_tracking
            WHERE sender_email IN (
                SELECT admin_email FROM admin.office_accounts
            )
            AND sent_at > NOW() - INTERVAL '24 hours'
        """)
        total, inbox, spam = cur.fetchone()
        inbox_rate = (inbox / total * 100) if total > 0 else 0
        spam_rate = (spam / total * 100) if total > 0 else 0

        # Update server
        cur.execute("""
            UPDATE admin.servers SET
                uptime_hours = %s,
                total_cost = %s,
                cost_per_1k_emails = %s,
                efficiency_score = %s,
                inbox_rate = %s,
                spam_rate = %s
            WHERE id = %s
        """, (uptime, total_cost, cost_per_1k, efficiency, inbox_rate, spam_rate, server_id))

        # Save KPI snapshot
        cur.execute("""
            INSERT INTO admin.server_kpi_history
                (server_id, emails_sent, emails_delivered, emails_bounced, queue_size,
                 delivery_rate, inbox_rate, spam_rate, cost_accumulated, cost_per_1k,
                 is_blacklisted, reputation_score, efficiency_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            server_id, sent, server['total_delivered'], server['total_bounced'],
            server['queue_size'],
            delivery_rate,
            inbox_rate, spam_rate, total_cost, cost_per_1k,
            server['is_blacklisted'], server['ip_reputation_score'], efficiency
        ))
    conn.commit()

    return {
        'server_id': server_id,
        'efficiency': efficiency,
        'cost': total_cost,
        'cost_per_1k': cost_per_1k,
        'inbox_rate': inbox_rate,
        'spam_rate': spam_rate
    }
def check_and_act(conn, server):
    """Check one server's health and run the resulting automatic actions.

    The checks run in this order:
      1. DNSBL listings (-> 'delete' if THRESHOLDS['blacklist_auto_delete']);
      2. spam placement rate (-> 'pause' above the critical threshold);
      3. a PMTA queue over 1000 that did not move (-> 'restart');
      4. the efficiency score (-> 'review').

    The queued actions are executed via execute_action and committed.

    Args:
        conn: open psycopg2 connection.
        server: server row as a dict, optionally merged with the dict returned
            by update_server_kpi (keys such as ``efficiency`` and ``spam_rate``).

    Returns:
        List of (action, reason) tuples that were executed (possibly empty).
    """
    server_id = server['id']
    ip = str(server['ip_address'])
    actions_taken = []

    with conn.cursor() as cur:
        # 1. Check blacklists
        blacklists = check_blacklists(ip)
        if blacklists:
            print(f" 🚨 BLACKLISTED on: {blacklists}")
            # Only record the listing here. Switching the status to 'burned' is
            # the job of the 'delete' action, so that setting
            # blacklist_auto_delete to False really keeps the server in service.
            cur.execute("""
                UPDATE admin.servers SET
                    is_blacklisted = true,
                    blacklist_sources = %s
                WHERE id = %s
            """, (blacklists, server_id))
            log_action(conn, server_id, 'blacklist_detected',
                       f"Listed on: {blacklists}", server)
            if THRESHOLDS['blacklist_auto_delete']:
                actions_taken.append(('delete', 'Blacklisted'))

        # 2. Check spam rate
        spam_rate = server.get('spam_rate') or 0
        if spam_rate > THRESHOLDS['spam_rate_critical']:
            print(f" 🚨 CRITICAL SPAM RATE: {spam_rate}%")
            actions_taken.append(('pause', f'Spam rate {spam_rate}% > {THRESHOLDS["spam_rate_critical"]}%'))
        elif spam_rate > THRESHOLDS['spam_rate_warning']:
            print(f" ⚠️ HIGH SPAM RATE: {spam_rate}%")

        # 3. Check queue stuck
        queue_size = server.get('queue_size') or 0
        if queue_size > 1000:
            # NOTE(review): compares only the two most recent KPI snapshots
            # (the newest is the one update_server_kpi just wrote, if it ran);
            # THRESHOLDS['queue_stuck_minutes'] is not taken into account.
            cur.execute("""
                SELECT queue_size FROM admin.server_kpi_history
                WHERE server_id = %s
                ORDER BY snapshot_at DESC
                LIMIT 2
            """, (server_id,))
            history = cur.fetchall()
            if len(history) >= 2 and history[0][0] == history[1][0]:
                print(f" ⚠️ QUEUE STUCK at {queue_size}")
                actions_taken.append(('restart', f'Queue stuck at {queue_size}'))

    # 4. Check efficiency. Prefer the score update_server_kpi just computed
    # ('efficiency') over the stored column, and keep a real 0 score (the old
    # `or 100` turned 0 into 100 and never flagged it).
    efficiency = server.get('efficiency')
    if efficiency is None:
        efficiency = server.get('efficiency_score')
    if efficiency is not None and efficiency < THRESHOLDS['min_efficiency_score']:
        print(f" ⚠️ LOW EFFICIENCY: {efficiency}")
        actions_taken.append(('review', f'Efficiency {efficiency} < {THRESHOLDS["min_efficiency_score"]}'))

    # 'delete' (status 'burned') is terminal. A later 'pause' would overwrite
    # the status with 'paused' and keep the server in the active pool, and
    # restarting PMTA on a retired server is pointless.
    if any(action == 'delete' for action, _ in actions_taken):
        actions_taken = [(action, reason) for action, reason in actions_taken
                         if action not in ('pause', 'restart')]

    # Execute actions
    for action, reason in actions_taken:
        execute_action(conn, server_id, action, reason)
    conn.commit()
    return actions_taken
def execute_action(conn, server_id, action, reason):
    """Apply one automatic action to a server and log the outcome.

    Supported actions:
      * 'restart': run ``systemctl restart pmta`` and record the attempt
        (last_restart, restart_count, queue_stuck) on the server row;
      * 'pause': set the server status to 'paused';
      * 'delete': set the server status to 'burned' (actual destruction is
        handled by a separate cleanup cron);
      * anything else (e.g. 'review'): only logged.

    The row updates are committed by the log_action call at the end.
    """
    print(f" ⚡ ACTION: {action} - {reason}")
    # Status each simple action moves the server to.
    new_status = {'pause': 'paused', 'delete': 'burned'}
    success = True

    with conn.cursor() as cur:
        if action == 'restart':
            # NOTE(review): this restarts PMTA on the machine running this
            # script, not on the server identified by server_id; confirm
            # intent (a remote restart would need e.g. SSH).
            try:
                result = subprocess.run(['systemctl', 'restart', 'pmta'], timeout=30)
            except (OSError, subprocess.SubprocessError) as exc:
                # systemctl missing, or the restart timed out.
                print(f" ❌ restart failed: {exc}")
                success = False
            else:
                # systemctl reports failure through its exit status.
                success = result.returncode == 0
            # The attempt is recorded even when it failed.
            cur.execute("""
                UPDATE admin.servers SET
                    last_restart = NOW(),
                    restart_count = restart_count + 1,
                    queue_stuck = false
                WHERE id = %s
            """, (server_id,))
        elif action in new_status:
            cur.execute(
                "UPDATE admin.servers SET status = %s WHERE id = %s",
                (new_status[action], server_id),
            )

    log_action(conn, server_id, action, reason, None, success)
def log_action(conn, server_id, action, reason, metrics=None, success=True):
    """Record an automatic action in ``admin.auto_actions_log`` and commit.

    The commit is issued on ``conn`` itself, so it also commits any statement
    the caller left pending on the same connection.

    Args:
        conn: open psycopg2 connection.
        server_id: id of the server the action concerns.
        action: action type, e.g. 'restart' or 'blacklist_detected'.
        reason: human-readable reason.
        metrics: optional mapping stored as JSON in ``metrics_before``. Falsy
            values store NULL. Values JSON cannot encode natively (such as the
            datetime columns of a server row) are stored as their ``str()``.
        success: whether the action succeeded.
    """
    # Without default=str, json.dumps raises TypeError on datetime values,
    # which a full server row (e.g. started_at) contains.
    metrics_json = json.dumps(metrics, default=str) if metrics else None
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO admin.auto_actions_log
                (server_id, action_type, reason, metrics_before, success)
            VALUES (%s, %s, %s, %s, %s)
        """, (server_id, action, reason, metrics_json, success))
    conn.commit()
def optimize_all_servers():
    """Run one optimization pass over every active server.

    For each server whose status is running, provisioning or paused, the pass:
      1. refreshes its KPIs (update_server_kpi);
      2. checks its health and runs the automatic actions (check_and_act).

    A failure on one server is reported and rolled back, so the remaining
    servers are still processed. A cost/volume summary is printed at the end,
    and the database connection is always closed.
    """
    print("=" * 70)
    print("🧠 BRAIN SERVER OPTIMIZER")
    print(f"Time: {datetime.now()}")
    print("=" * 70)

    conn = get_db()
    try:
        # Get all active servers
        with conn.cursor() as cur:
            cur.execute("""
                SELECT * FROM admin.servers
                WHERE status IN ('running', 'provisioning', 'paused')
            """)
            columns = [desc[0] for desc in cur.description]
            servers = [dict(zip(columns, row)) for row in cur.fetchall()]
        print(f"\nChecking {len(servers)} servers...")

        total_cost = 0
        total_sent = 0
        issues = []
        for server in servers:
            print(f"\n📊 {server['name']} ({server['ip_address']})")
            try:
                # Update KPIs
                kpi = update_server_kpi(conn, server['id'])
                if kpi:
                    total_cost += kpi['cost'] or 0
                    total_sent += server['total_sent'] or 0
                    print(f" Efficiency: {kpi['efficiency']} | Cost: ${kpi['cost']:.2f} | Inbox: {kpi['inbox_rate']}%")
                # Merge KPI into server dict
                server.update(kpi or {})
                # Check and act
                actions = check_and_act(conn, server)
            except Exception as exc:  # per-server boundary: report, keep going
                # A failed statement leaves the transaction aborted; roll back
                # so the next server's queries can run.
                conn.rollback()
                print(f" ❌ Error: {exc}")
                issues.append((server['name'], [('error', str(exc))]))
                continue
            if actions:
                issues.append((server['name'], actions))

        # Summary
        print("\n" + "=" * 70)
        print("📋 SUMMARY")
        print("=" * 70)
        print(f"Total Servers: {len(servers)}")
        print(f"Total Cost: ${total_cost:.2f}")
        print(f"Total Sent: {total_sent:,}")
        if total_sent > 0:
            print(f"Avg Cost/1K: ${(total_cost / (total_sent/1000)):.4f}")
        if issues:
            print(f"\n⚠️ Issues detected on {len(issues)} servers:")
            for name, actions in issues:
                print(f" {name}: {[a[0] for a in actions]}")
    finally:
        conn.close()
    print("\n✅ Optimization complete!")


if __name__ == '__main__':
    optimize_all_servers()