#!/usr/bin/env python3
"""
BRAIN SERVER OPTIMIZER

Meant to be run every 10 minutes (scheduling is external to this script):
- Check the health of every active server
- Compute KPIs (cost, efficiency)
- Auto-actions: restart / pause / mark for deletion depending on status
- Detect stuck queues
- Update scores
"""
import ipaddress
import json
import os
import socket
import subprocess
from datetime import datetime, timedelta

import psycopg2
import requests

# Database connection settings. Each value may be overridden through an
# environment variable so credentials need not live in source control; the
# defaults are the values this script has always used.
DB_CONFIG = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}

# Action thresholds
THRESHOLDS = {
    'spam_rate_warning': 20,       # % - print a warning
    'spam_rate_critical': 40,      # % - pause the server
    'inbox_rate_minimum': 50,      # % - below = problem (not read by this script yet)
    'queue_stuck_minutes': 30,     # minutes without movement = stuck (not read by this script yet)
    'blacklist_auto_delete': True,  # mark blacklisted servers for deletion
    'max_cost_per_1k': 0.50,       # $ - at or above this the cost component scores 0
    'min_efficiency_score': 30,    # below = flagged for review
}

# Cost per provider ($/hour). Keys are lower-case provider names.
PROVIDER_COSTS = {
    'hetzner': 0.007,  # CX11 ~5 EUR/month
    'scaleway': 0.01,
    'huawei': 0.02,
    'ovh': 0.015,
    'vultr': 0.02,
}

# DNSBL zones queried by check_blacklists() (main ones only).
CRITICAL_BLACKLISTS = (
    'zen.spamhaus.org',
    'b.barracudacentral.org',
)


def get_db():
    """Open a new psycopg2 connection using DB_CONFIG."""
    return psycopg2.connect(**DB_CONFIG)


def get_pmta_stats(ip, port=8080):
    """Return PMTA status information for ``ip``.

    First tries PMTA's HTTP status endpoint on ``ip:port`` (only works if that
    endpoint is enabled on the target host). Falls back to running
    ``pmta show status`` locally.

    Returns:
        The decoded JSON from the HTTP endpoint; a placeholder
        ``{'queue': 0, 'sent': 0}`` when only the local command succeeded
        (its output is not parsed yet); or ``None`` if both sources failed.
    """
    try:
        response = requests.get(f"http://{ip}:{port}/status", timeout=5)
        if response.status_code == 200:
            return response.json()
    except (requests.RequestException, ValueError):
        # Endpoint unreachable or body is not JSON: fall back to the CLI.
        pass

    try:
        subprocess.run(
            ['pmta', 'show', 'status'],
            capture_output=True, text=True, timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    # TODO: parse the command output. NOTE(review): the local command reports
    # on the host running this script, which is not necessarily ``ip``.
    return {'queue': 0, 'sent': 0}


def _reverse_ipv4(ip):
    """Return dotted-quad ``ip`` with its octets reversed (DNSBL query form)."""
    return '.'.join(reversed(ip.split('.')))


def check_blacklists(ip):
    """Return the zones in CRITICAL_BLACKLISTS that currently list ``ip``.

    A zone counts as listing the address only when the DNS lookup resolves to
    an address in 127.0.0.0/24. DNSBLs such as Spamhaus answer in
    127.255.255.0/24 to report query *errors* (e.g. lookups through a public
    resolver), and those must not be mistaken for listings — otherwise every
    server would be marked burned. Addresses that are not IPv4 (or not
    parseable) are not checked and yield an empty list. Any resolver error
    for a zone is treated as "not listed" so one flaky zone cannot abort
    the run.
    """
    try:
        # ip_interface also accepts Postgres inet text such as "1.2.3.4/32".
        addr = ipaddress.ip_interface(ip).ip
    except ValueError:
        return []
    if addr.version != 4:
        return []

    reversed_ip = _reverse_ipv4(str(addr))
    listings = []
    for zone in CRITICAL_BLACKLISTS:
        try:
            answer = socket.gethostbyname(f"{reversed_ip}.{zone}")
        except OSError:
            # socket.gaierror (NXDOMAIN = not listed) and other resolver errors.
            continue
        if answer.startswith('127.0.0.'):
            listings.append(zone)
    return listings


def calculate_efficiency(server):
    """Return an efficiency score in [0, 100] for a server row.

    Score components:
      * delivery rate  - up to 40 points (delivered / sent);
      * cost per 1k    - up to 30 points, 0 at THRESHOLDS['max_cost_per_1k'];
      * complaint rate - up to 30 points, 0 at a 3% complaint rate.

    ``server`` must provide ``total_sent``, ``total_delivered``,
    ``total_complaints`` and ``total_cost``; ``None`` counts as 0. Servers
    that have not sent anything get a neutral score of 50.
    """
    sent = server['total_sent'] or 0
    if sent == 0:
        return 50  # Neutral for new servers

    delivered = server['total_delivered'] or 0
    complaints = server['total_complaints'] or 0
    # float(): a NUMERIC column arrives as Decimal, which cannot be divided
    # by a float.
    cost = float(server['total_cost'] or 0)

    # Delivery rate (40%)
    delivery_rate = delivered / sent * 100
    delivery_score = min(40, delivery_rate * 0.4)

    # Cost efficiency (30%) - lower cost per 1k = better
    cost_per_1k = cost / (sent / 1000)
    cost_slope = 30 / THRESHOLDS['max_cost_per_1k']
    cost_score = max(0, 30 - cost_per_1k * cost_slope)

    # Complaint rate (30%) - lower = better
    complaint_rate = complaints / sent * 100
    complaint_score = max(0, 30 - complaint_rate * 10)

    return round(delivery_score + cost_score + complaint_score, 2)


def _seed_placement_rates(cur):
    """Return ``(inbox_rate, spam_rate)`` in % over the last 24h of seed tracking.

    NOTE(review): this is NOT filtered per server — it aggregates every
    sender listed in admin.office_accounts, so every server receives the same
    rates (and one critical global spam rate pauses every server). A
    per-server filter (e.g. on an ``assigned_server_id`` column) was sketched
    in the original code but never enabled.
    """
    cur.execute("""
        SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END) AS inbox,
            SUM(CASE WHEN inbox_placement = 'spam' THEN 1 ELSE 0 END) AS spam
        FROM admin.seed_tracking
        WHERE sender_email IN (
            SELECT admin_email FROM admin.office_accounts
        )
        AND sent_at > NOW() - INTERVAL '24 hours'
    """)
    total, inbox, spam = cur.fetchone()
    if not total:
        return 0, 0
    return (inbox or 0) / total * 100, (spam or 0) / total * 100


def update_server_kpi(conn, server_id):
    """Recompute and persist the KPIs of one server, then snapshot them.

    Updates uptime, accumulated cost, cost per 1k emails, efficiency score
    and inbox/spam rates on admin.servers, inserts one row into
    admin.server_kpi_history, and commits.

    Returns:
        A dict with keys ``server_id``, ``efficiency``, ``cost``,
        ``cost_per_1k``, ``inbox_rate`` and ``spam_rate``; ``None`` if the
        server does not exist.
    """
    with conn.cursor() as cur:
        # Get current stats
        cur.execute("""
            SELECT s.*, p.name as provider
            FROM admin.servers s
            LEFT JOIN admin.cloud_providers p ON s.provider_name = p.name
            WHERE s.id = %s
        """, (server_id,))
        row = cur.fetchone()
        if not row:
            return None
        columns = [desc[0] for desc in cur.description]
        server = dict(zip(columns, row))

        # Uptime in hours. Use a "now" with the same tzinfo as started_at so
        # naive and timezone-aware timestamps both subtract cleanly.
        started_at = server['started_at']
        if started_at:
            now = datetime.now(started_at.tzinfo)
            uptime = (now - started_at).total_seconds() / 3600
        else:
            uptime = 0

        # Accumulated cost (unknown providers default to $0.01/hour).
        provider = (server['provider_name'] or '').lower()
        hourly_cost = PROVIDER_COSTS.get(provider, 0.01)
        total_cost = uptime * hourly_cost

        # Cost per 1k emails (only meaningful once >1000 emails were sent).
        sent = server['total_sent'] or 0
        delivered = server['total_delivered'] or 0
        cost_per_1k = (total_cost / (sent / 1000)) if sent > 1000 else None

        # Score with the values computed in this run, not the stale ones
        # read from the database.
        server['uptime_hours'] = uptime
        server['total_cost'] = total_cost
        efficiency = calculate_efficiency(server)

        inbox_rate, spam_rate = _seed_placement_rates(cur)

        cur.execute("""
            UPDATE admin.servers SET
                uptime_hours = %s,
                total_cost = %s,
                cost_per_1k_emails = %s,
                efficiency_score = %s,
                inbox_rate = %s,
                spam_rate = %s
            WHERE id = %s
        """, (uptime, total_cost, cost_per_1k, efficiency, inbox_rate,
              spam_rate, server_id))

        # Save KPI snapshot
        cur.execute("""
            INSERT INTO admin.server_kpi_history
                (server_id, emails_sent, emails_delivered, emails_bounced,
                 queue_size, delivery_rate, inbox_rate, spam_rate,
                 cost_accumulated, cost_per_1k, is_blacklisted,
                 reputation_score, efficiency_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            server_id,
            sent,
            server['total_delivered'],
            server['total_bounced'],
            server['queue_size'],
            (delivered / sent * 100) if sent > 0 else 0,
            inbox_rate,
            spam_rate,
            total_cost,
            cost_per_1k,
            server['is_blacklisted'],
            server['ip_reputation_score'],
            efficiency,
        ))

    conn.commit()
    return {
        'server_id': server_id,
        'efficiency': efficiency,
        'cost': total_cost,
        'cost_per_1k': cost_per_1k,
        'inbox_rate': inbox_rate,
        'spam_rate': spam_rate,
    }


def check_and_act(conn, server):
    """Check one server's health, execute the resulting actions, and commit.

    ``server`` is a dict of admin.servers columns, optionally merged with the
    output of update_server_kpi(). Checks, in order: DNS blacklists, spam
    rate, stuck queue, efficiency score.

    Returns:
        The list of ``(action, reason)`` tuples that were executed.
    """
    server_id = server['id']
    ip = str(server['ip_address'])
    actions_taken = []

    with conn.cursor() as cur:
        # 1. Check blacklists
        blacklists = check_blacklists(ip)
        if blacklists:
            print(f" 🚨 BLACKLISTED on: {blacklists}")
            cur.execute("""
                UPDATE admin.servers SET
                    is_blacklisted = true,
                    blacklist_sources = %s,
                    status = 'burned'
                WHERE id = %s
            """, (blacklists, server_id))
            log_action(conn, server_id, 'blacklist_detected',
                       f"Listed on: {blacklists}", server)
            if THRESHOLDS['blacklist_auto_delete']:
                actions_taken.append(('delete', 'Blacklisted'))

        # 2. Check spam rate
        spam_rate = server.get('spam_rate', 0) or 0
        if spam_rate > THRESHOLDS['spam_rate_critical']:
            print(f" 🚨 CRITICAL SPAM RATE: {spam_rate}%")
            actions_taken.append(
                ('pause', f'Spam rate {spam_rate}% > {THRESHOLDS["spam_rate_critical"]}%'))
        elif spam_rate > THRESHOLDS['spam_rate_warning']:
            print(f" ⚠️ HIGH SPAM RATE: {spam_rate}%")

        # 3. Check for a stuck queue: a large queue whose size did not change
        # between the two most recent KPI snapshots (i.e. across one run).
        queue_size = server.get('queue_size', 0) or 0
        if queue_size > 1000:
            cur.execute("""
                SELECT queue_size FROM admin.server_kpi_history
                WHERE server_id = %s
                ORDER BY snapshot_at DESC
                LIMIT 2
            """, (server_id,))
            history = cur.fetchall()
            if len(history) >= 2 and history[0][0] == history[1][0]:
                print(f" ⚠️ QUEUE STUCK at {queue_size}")
                actions_taken.append(('restart', f'Queue stuck at {queue_size}'))

    # 4. Check efficiency. Only a missing score defaults to 100; a real score
    # of 0 must still be flagged (``or 100`` used to hide it).
    efficiency = server.get('efficiency_score')
    if efficiency is None:
        efficiency = 100
    if efficiency < THRESHOLDS['min_efficiency_score']:
        print(f" ⚠️ LOW EFFICIENCY: {efficiency}")
        actions_taken.append(
            ('review', f'Efficiency {efficiency} < {THRESHOLDS["min_efficiency_score"]}'))

    # Execute actions
    for action, reason in actions_taken:
        execute_action(conn, server_id, action, reason)

    conn.commit()
    return actions_taken


def execute_action(conn, server_id, action, reason):
    """Execute one automatic action and record it in the action log.

    Actions:
      * ``restart`` - restart the ``pmta`` systemd unit; the restart
        bookkeeping columns are only updated when the restart succeeded.
      * ``pause``   - set status to 'paused'.
      * ``delete``  - set status to 'burned'; actual destruction is handled
        by a separate cleanup cron.
      * anything else (e.g. ``review``) - only logged.
    """
    print(f" ⚡ ACTION: {action} - {reason}")

    with conn.cursor() as cur:
        if action == 'restart':
            # NOTE(review): this restarts PMTA on the host running this script,
            # not on the server identified by server_id — confirm intended.
            try:
                result = subprocess.run(['systemctl', 'restart', 'pmta'], timeout=30)
                success = result.returncode == 0
            except (OSError, subprocess.SubprocessError):
                success = False
            if success:
                cur.execute("""
                    UPDATE admin.servers SET
                        last_restart = NOW(),
                        restart_count = restart_count + 1,
                        queue_stuck = false
                    WHERE id = %s
                """, (server_id,))
        elif action == 'pause':
            cur.execute("""
                UPDATE admin.servers SET status = 'paused' WHERE id = %s
            """, (server_id,))
            success = True
        elif action == 'delete':
            cur.execute("""
                UPDATE admin.servers SET status = 'burned' WHERE id = %s
            """, (server_id,))
            success = True
            # Note: actual destruction handled by cleanup cron
        else:
            success = True

    log_action(conn, server_id, action, reason, None, success)


def log_action(conn, server_id, action, reason, metrics=None, success=True):
    """Insert one row into admin.auto_actions_log and commit.

    ``metrics`` (typically a server row dict) is stored as JSON; values JSON
    cannot encode natively (datetime, Decimal, ...) are stringified so that a
    DB row can always be logged.
    """
    metrics_json = json.dumps(metrics, default=str) if metrics else None
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO admin.auto_actions_log
                (server_id, action_type, reason, metrics_before, success)
            VALUES (%s, %s, %s, %s, %s)
        """, (server_id, action, reason, metrics_json, success))
    conn.commit()


def optimize_all_servers():
    """Main optimization loop: refresh KPIs and act on every active server.

    Active servers are those whose status is 'running', 'provisioning' or
    'paused'. Prints a per-server report and a summary; the database
    connection is always closed, even if a step raises.
    """
    print("=" * 70)
    print("🧠 BRAIN SERVER OPTIMIZER")
    print(f"Time: {datetime.now()}")
    print("=" * 70)

    conn = get_db()
    try:
        # Get all active servers
        with conn.cursor() as cur:
            cur.execute("""
                SELECT * FROM admin.servers
                WHERE status IN ('running', 'provisioning', 'paused')
            """)
            columns = [desc[0] for desc in cur.description]
            servers = [dict(zip(columns, row)) for row in cur.fetchall()]

        print(f"\nChecking {len(servers)} servers...")

        total_cost = 0
        total_sent = 0
        issues = []

        for server in servers:
            print(f"\n📊 {server['name']} ({server['ip_address']})")

            # Update KPIs
            kpi = update_server_kpi(conn, server['id'])
            if kpi:
                total_cost += kpi['cost'] or 0
                total_sent += server['total_sent'] or 0
                print(f" Efficiency: {kpi['efficiency']} | Cost: ${kpi['cost']:.2f} | Inbox: {kpi['inbox_rate']}%")
                # Merge fresh KPIs into the row. check_and_act reads
                # 'efficiency_score', which the KPI dict calls 'efficiency'.
                server.update(kpi)
                server['efficiency_score'] = kpi['efficiency']

            # Check and act
            actions = check_and_act(conn, server)
            if actions:
                issues.append((server['name'], actions))

        # Summary
        print("\n" + "=" * 70)
        print("📋 SUMMARY")
        print("=" * 70)
        print(f"Total Servers: {len(servers)}")
        print(f"Total Cost: ${total_cost:.2f}")
        print(f"Total Sent: {total_sent:,}")
        if total_sent > 0:
            print(f"Avg Cost/1K: ${(total_cost / (total_sent/1000)):.4f}")

        if issues:
            print(f"\n⚠️ Issues detected on {len(issues)} servers:")
            for name, actions in issues:
                print(f" {name}: {[a[0] for a in actions]}")
    finally:
        conn.close()

    print("\n✅ Optimization complete!")


if __name__ == '__main__':
    optimize_all_servers()