#!/usr/bin/env python3
"""
CONFIG VALIDATOR

Validates the winning configurations:
- Tests headers, DNS, body templates
- Sends to test seeds
- Measures inbox rate
- Marks as validated or failed
"""

import hashlib
import json
import os
import random
import re
import smtplib
import string
import sys
from contextlib import closing
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import format_datetime, formataddr

# Connection settings. Each value can be overridden through the environment
# so credentials do not have to live in the source; the defaults are the
# values the script has always used.
DB_CONFIG = {
    'host': os.environ.get('ADX_DB_HOST', 'localhost'),
    'database': os.environ.get('ADX_DB_NAME', 'adx_system'),
    'user': os.environ.get('ADX_DB_USER', 'admin'),
    'password': os.environ.get('ADX_DB_PASSWORD', 'admin123'),
}

TRACKING_BASE = "http://151.80.235.110"

# NOTE(review): several icon literals in the print() calls below look like
# UTF-8 emoji that were decoded with the wrong code page. They are runtime
# output and are deliberately left untouched here - confirm against the
# original file before repairing them.

_RANDOM_ALPHABET = string.ascii_lowercase + string.digits
_A_PLACEHOLDER = re.compile(r'\[a_\d+\]')
_UAL_PLACEHOLDER = re.compile(r'\[ual_\d+\]')


def get_db():
    """Open a new PostgreSQL connection using DB_CONFIG.

    The caller owns the connection and is responsible for closing it.
    """
    # Imported lazily so the pure helpers (placeholder expansion, message
    # building) can be imported and tested without the database driver.
    import psycopg2

    return psycopg2.connect(**DB_CONFIG)


def _utc_mail_date():
    """Return the current time as an RFC 5322 date string in UTC."""
    # An aware UTC datetime makes the "+0000" suffix truthful; the previous
    # code formatted naive local time and hard-coded "+0000".
    return format_datetime(datetime.now(timezone.utc))


def generate_random_string(length=10):
    """Generate a random lowercase alphanumeric string of `length` chars."""
    return ''.join(random.choices(_RANDOM_ALPHABET, k=length))


def replace_placeholders(template, values):
    """Replace placeholders in `template`.

    Args:
        template: text containing `[name]` style placeholders.
        values: dict supplying `to_email`, `first_name`, `placeholder1`,
            `placeholder2`, `tracking_url`, `unsub_url` and `open_url`;
            every key is optional and has a built-in default.

    Returns:
        The template with all known placeholders substituted.
    """
    # Standard placeholders. Insertion order is significant: substitutions
    # are applied sequentially, exactly in this order.
    placeholders = {
        '[uan_25]': generate_random_string(8),
        '[uan_10]': generate_random_string(6),
        '[uan_14]': generate_random_string(8),
        '[uanu_10]': generate_random_string(6),
        '[uanu_8]': generate_random_string(5),
        '[email]': values.get('to_email', 'test@example.com'),
        '[mail_date]': _utc_mail_date(),
        '[first_name]': values.get('first_name', 'User'),
        '[placeholder1]': values.get('placeholder1', 'example.com'),
        '[placeholder2]': values.get('placeholder2', 'sender.com'),
        '[url]': values.get('tracking_url', 'c/test'),
        '[unsub]': values.get('unsub_url', 'u/test'),
        '[open]': values.get('open_url', 'o/test'),
    }

    result = template
    for key, value in placeholders.items():
        result = result.replace(key, str(value))

    # Random placeholders [a_X], [ual_X]: every occurrence gets a fresh value.
    result = _A_PLACEHOLDER.sub(lambda m: generate_random_string(8), result)
    result = _UAL_PLACEHOLDER.sub(lambda m: generate_random_string(10), result)
    return result


def build_email_from_config(header_config, body_template, to_email,
                            placeholder1, placeholder2):
    """Build a complete email message from a winning config.

    Args:
        header_config: dict with `headers_template` (a dict, or a JSON
            string encoding one) and an optional `subject`.
        body_template: HTML body containing placeholders.
        to_email: recipient address.
        placeholder1: value substituted for `[placeholder1]`.
        placeholder2: value substituted for `[placeholder2]`; also used as
            the domain of the From address.

    Returns:
        A `(message, tracking_code)` tuple, where `message` is a
        `MIMEMultipart('alternative')` and `tracking_code` is a 12-character
        hex string embedded in the tracking URLs.
    """
    # The hash only needs to be unique per recipient and send time; it is
    # not used for anything security-sensitive.
    seed = f"{to_email}:{datetime.now().timestamp()}"
    tracking_code = hashlib.md5(seed.encode()).hexdigest()[:12]

    values = {
        'to_email': to_email,
        'placeholder1': placeholder1,
        'placeholder2': placeholder2,
        'tracking_url': f"{TRACKING_BASE}/c/{tracking_code}",
        'unsub_url': f"{TRACKING_BASE}/u/{tracking_code}",
        'open_url': f"{TRACKING_BASE}/o/{tracking_code}",
        'first_name': 'Valued Customer',
    }

    # Parse headers (the DB may hand them back as a JSON string).
    headers = header_config.get('headers_template', {})
    if isinstance(headers, str):
        headers = json.loads(headers)

    msg = MIMEMultipart('alternative')

    # Apply template headers. Content-Type / MIME-Version are owned by the
    # MIME container and must not be duplicated.
    # NOTE(review): if the template also defines To/From/Subject/Date, the
    # message ends up with duplicates of those headers - confirm which
    # source should win before changing this.
    for key, value in (headers or {}).items():
        if key in ('Content-Type', 'MIME-Version'):
            continue
        # str() guards against non-string JSON values (numbers, booleans).
        msg[key] = replace_placeholders(str(value), values)

    # Required headers
    msg['To'] = to_email
    # A From header needs a mailbox, not just a display name. This is the
    # address the caller in this file uses as the envelope sender.
    msg['From'] = formataddr(('Notification', f"noreply@{placeholder2}"))
    msg['Subject'] = replace_placeholders(
        header_config.get('subject', 'Important Update'), values)
    msg['Date'] = _utc_mail_date()

    # Body: plain-text fallback first, HTML last (preferred alternative).
    html_body = replace_placeholders(body_template, values)
    text_body = "Please view this email in HTML format."
    msg.attach(MIMEText(text_body, 'plain'))
    msg.attach(MIMEText(html_body, 'html'))

    return msg, tracking_code


def get_test_seeds(target_isps, count=5):
    """Get up to `count` random active seeds whose ISP matches a target.

    Args:
        target_isps: iterable of ISP name fragments, matched
            case-insensitively as substrings of `isp`.
        count: maximum number of seeds to return.

    Returns:
        A list of `(id, email, isp)` rows; empty when no ISP is given.
    """
    patterns = [f"%{isp}%" for isp in (target_isps or [])]
    if not patterns:
        # Nothing to match against; the old string-built filter produced
        # invalid SQL ("AND ()") in this case.
        return []

    with closing(get_db()) as conn:
        cur = conn.cursor()
        # The patterns are bound as an array parameter: no SQL is built from
        # input, and no literal '%' appears in the query text (which would
        # clash with psycopg2's own %-style parameter formatting).
        cur.execute("""
            SELECT id, email, isp
            FROM admin.brain_seeds
            WHERE is_active = true AND isp ILIKE ANY(%s)
            ORDER BY RANDOM()
            LIMIT %s
        """, (patterns, count))
        return cur.fetchall()


def send_test_email(msg, from_email, to_email):
    """Send `msg` via the local PMTA on 127.0.0.1:25.

    Returns:
        True when the message was handed to the server, False on any error
        (the error is printed; sending is best-effort by design).
    """
    try:
        with smtplib.SMTP('127.0.0.1', 25) as server:
            server.sendmail(from_email, to_email, msg.as_string())
        return True
    except Exception as e:
        print(f" Send error: {e}")
        return False


def validate_header_config(config_id):
    """Send test emails for one header configuration and record them.

    Loads the config, picks a body template and up to 5 seeds for its
    target ISPs, sends one message per seed, logs each successful send in
    `admin.seed_tracking` and marks the config as 'testing'.

    Returns:
        The number of emails sent, or None when the config does not exist
        or no seeds are available.
    """
    # closing() guarantees the connection is released on every exit path,
    # including the early returns below.
    with closing(get_db()) as conn:
        cur = conn.cursor()

        # Get config
        cur.execute("""
            SELECT config_name, headers_template, example_placeholder1,
                   example_placeholder2, target_isps
            FROM admin.winning_headers
            WHERE id = %s
        """, (config_id,))
        row = cur.fetchone()
        if not row:
            print(f"Config {config_id} not found")
            return None

        name, headers, ph1, ph2, isps = row
        print(f"\nπŸ§ͺ Validating: {name}")
        print(f" Targets: {isps}")

        # Get default body template
        cur.execute("""
            SELECT body_html
            FROM admin.winning_body_templates
            WHERE target_isps && %s
            LIMIT 1
        """, (isps,))
        body_row = cur.fetchone()
        body_template = body_row[0] if body_row else "Test"

        # Get test seeds
        seeds = get_test_seeds(isps, 5)
        if not seeds:
            print(" ❌ No seeds available for target ISPs")
            return None

        # Parse headers
        if isinstance(headers, str):
            headers = json.loads(headers)

        config = {
            'headers_template': headers,
            'subject': 'βœ… Confirm your loan todayπŸ’°',
        }
        tracking_domain = ph1 or 'tracking.example.com'
        sender_domain = ph2 or 'sender.example.com'
        from_email = f"noreply@{sender_domain}"
        batch_id = f"validate_header_{config_id}"

        # Send tests
        sent = 0
        for seed_id, seed_email, seed_isp in seeds:
            msg, tracking_code = build_email_from_config(
                config, body_template, seed_email,
                tracking_domain, sender_domain,
            )
            if not send_test_email(msg, from_email, seed_email):
                continue
            sent += 1

            # Log tracking
            cur.execute("""
                INSERT INTO admin.seed_tracking
                    (seed_email, seed_isp, sender_email, tracking_code,
                     sent_at, batch_id)
                VALUES (%s, %s, %s, %s, NOW(), %s)
            """, (seed_email, seed_isp, from_email, tracking_code, batch_id))
            print(f" βœ… Sent to {seed_email}")

        # Update config
        cur.execute("""
            UPDATE admin.winning_headers
            SET times_tested = times_tested + 1,
                total_sent = total_sent + %s,
                last_tested = NOW(),
                status = 'testing'
            WHERE id = %s
        """, (sent, config_id))
        conn.commit()

    print(f" πŸ“¨ Sent {sent}/{len(seeds)} test emails")
    return sent


def analyze_validation_results():
    """Analyze results of validation tests.

    For every header config in status 'testing' with at least 3 tracked
    sends, computes the inbox rate and sets the status to 'validated'
    (>= 70%), 'pending' (>= 50%) or 'failed' (otherwise).
    """
    with closing(get_db()) as conn:
        cur = conn.cursor()

        print("=" * 70)
        print("πŸ“Š VALIDATION RESULTS ANALYSIS")
        print("=" * 70)

        # Check header configs in testing.
        # NOTE(review): `total` counts every tracking row of the batch,
        # including rows whose inbox_placement is neither 'inbox' nor
        # 'spam' - confirm that unclassified sends should lower the rate.
        cur.execute("""
            SELECT wh.id, wh.config_name, wh.target_isps,
                   COUNT(st.id) as total,
                   SUM(CASE WHEN st.inbox_placement = 'inbox' THEN 1 ELSE 0 END) as inbox,
                   SUM(CASE WHEN st.inbox_placement = 'spam' THEN 1 ELSE 0 END) as spam
            FROM admin.winning_headers wh
            LEFT JOIN admin.seed_tracking st
                ON st.batch_id = 'validate_header_' || wh.id::text
            WHERE wh.status = 'testing'
            GROUP BY wh.id, wh.config_name, wh.target_isps
        """)

        for config_id, name, isps, total, inbox, spam in cur.fetchall():
            # Too few samples to draw a conclusion yet.
            if not total or total < 3:
                continue

            inbox = inbox or 0
            inbox_rate = inbox / total * 100

            # Determine result
            if inbox_rate >= 70:
                new_status, icon = 'validated', 'βœ…'
            elif inbox_rate >= 50:
                new_status, icon = 'pending', '🟑'
            else:
                new_status, icon = 'failed', '❌'

            print(f"\n{icon} {name}")
            print(f" Inbox: {inbox}/{total} ({inbox_rate:.1f}%)")
            print(f" Status: {new_status}")

            # Update
            cur.execute("""
                UPDATE admin.winning_headers
                SET inbox_rate = %s,
                    last_inbox_rate = %s,
                    status = %s,
                    validated_at = CASE WHEN %s = 'validated' THEN NOW() ELSE validated_at END,
                    needs_revalidation = false
                WHERE id = %s
            """, (inbox_rate, inbox_rate, new_status, new_status, config_id))

        conn.commit()


def list_configs():
    """List all header and DNS configs with their status."""
    header_icons = {
        'validated': 'βœ…', 'winner': 'πŸ†', 'pending': '⏳',
        'testing': 'πŸ§ͺ', 'failed': '❌', 'outdated': '⚠️',
    }
    dns_icons = {'validated': 'βœ…', 'pending': '⏳'}

    with closing(get_db()) as conn:
        cur = conn.cursor()

        print("=" * 70)
        print("πŸ“‹ WINNING CONFIGURATIONS")
        print("=" * 70)

        print("\nπŸ”§ HEADER CONFIGS:")
        cur.execute("""
            SELECT config_name, target_method, target_isps, inbox_rate,
                   status, last_tested
            FROM admin.winning_headers
            ORDER BY status, inbox_rate DESC NULLS LAST
        """)
        for name, method, isps, rate, status, tested in cur.fetchall():
            icon = header_icons.get(status, '❓')
            # NULL columns arrive as None, which the string format specs
            # below reject; fall back to an empty string.
            name = name or ''
            status = status or ''
            print(f" {icon} {name[:40]:<40} {status:<10} {rate or 0:>5.1f}% {isps}")

        print("\n🌐 DNS CONFIGS:")
        cur.execute("""
            SELECT config_name, domain, target_isps, inbox_rate, status
            FROM admin.winning_dns_configs
            ORDER BY status
        """)
        for name, domain, isps, rate, status in cur.fetchall():
            icon = dns_icons.get(status, '❓')
            name = name or ''
            domain = domain or ''
            status = status or ''
            print(f" {icon} {name[:30]:<30} {domain:<25} {status:<10} {isps}")


def _pending_config_ids(limit=None):
    """Return ids of header configs in status 'pending' or 'outdated'."""
    query = ("SELECT id FROM admin.winning_headers "
             "WHERE status IN ('pending', 'outdated')")
    with closing(get_db()) as conn:
        cur = conn.cursor()
        if limit is None:
            cur.execute(query)
        else:
            cur.execute(query + " LIMIT %s", (limit,))
        return [cid for (cid,) in cur.fetchall()]


def _validate_pending(limit=None):
    """Run validate_header_config on every pending/outdated config."""
    # The ids are fetched first and that connection is closed before
    # validation starts, so no idle connection is held during the sends.
    for cid in _pending_config_ids(limit):
        validate_header_config(cid)


def main():
    """Command-line entry point: list, validate [id], results, all."""
    args = sys.argv[1:]
    if not args:
        list_configs()
        return

    cmd = args[0]
    if cmd == 'list':
        list_configs()
    elif cmd == 'validate':
        if len(args) > 1:
            try:
                config_id = int(args[1])
            except ValueError:
                print("Commands: list, validate [id], results, all")
                return
            validate_header_config(config_id)
        else:
            # Validate all pending
            _validate_pending()
    elif cmd == 'results':
        analyze_validation_results()
    elif cmd == 'all':
        # Full cycle
        list_configs()
        print("\nValidating pending configs...")
        _validate_pending(limit=5)
    else:
        print("Commands: list, validate [id], results, all")


if __name__ == '__main__':
    main()