Files
wevads-platform/scripts/config-validator.py
2026-02-26 04:53:11 +01:00

349 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CONFIG VALIDATOR
Valide les configurations gagnantes:
- Teste headers, DNS, body templates
- Envoie vers seeds de test
- Mesure inbox rate
- Marque comme validated ou failed
"""
import psycopg2
import smtplib
import hashlib
import json
import re
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import random
import string
# Keyword arguments that get_db() forwards to psycopg2.connect().
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store instead.
DB_CONFIG = dict(
    host='localhost',
    database='adx_system',
    user='admin',
    password='admin123',
)

# Base URL to which the click (/c/), unsubscribe (/u/) and open (/o/)
# tracking paths are appended when a test email is built.
TRACKING_BASE = "http://151.80.235.110"
def get_db():
    """Open and return a new database connection built from ``DB_CONFIG``."""
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
def generate_random_string(length=10):
    """Return ``length`` random characters drawn from a-z and 0-9."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)
def replace_placeholders(template, values):
    """Substitute the known ``[name]`` placeholders in ``template``.

    Args:
        template: Text containing placeholders. ``None`` is treated as an
            empty string and any other non-string value is converted with
            ``str`` so the substitution never fails on the input type.
        values: Mapping with the optional keys ``to_email``, ``first_name``,
            ``placeholder1``, ``placeholder2``, ``tracking_url``,
            ``unsub_url`` and ``open_url``. Missing keys fall back to the
            defaults listed in the table below.

    Returns:
        The text with every recognised placeholder replaced. Unknown
        placeholders are left untouched.
    """
    # Function-scope imports: the module header only imports ``datetime``.
    from datetime import timezone
    from email.utils import format_datetime

    result = '' if template is None else str(template)

    # Fixed-name placeholders. The random-string lengths are the ones
    # configured here; they do not follow the number in the placeholder name.
    placeholders = {
        '[uan_25]': generate_random_string(8),
        '[uan_10]': generate_random_string(6),
        '[uan_14]': generate_random_string(8),
        '[uanu_10]': generate_random_string(6),
        '[uanu_8]': generate_random_string(5),
        '[email]': values.get('to_email', 'test@example.com'),
        # Use an aware UTC timestamp so the "+0000" offset in the rendered
        # date is actually true; format_datetime is also locale-independent.
        '[mail_date]': format_datetime(datetime.now(timezone.utc)),
        '[first_name]': values.get('first_name', 'User'),
        '[placeholder1]': values.get('placeholder1', 'example.com'),
        '[placeholder2]': values.get('placeholder2', 'sender.com'),
        '[url]': values.get('tracking_url', 'c/test'),
        '[unsub]': values.get('unsub_url', 'u/test'),
        '[open]': values.get('open_url', 'o/test'),
    }
    for key, value in placeholders.items():
        result = result.replace(key, str(value))

    # Numbered random placeholders: each occurrence gets its own fresh value.
    result = re.sub(r'\[a_\d+\]', lambda m: generate_random_string(8), result)
    result = re.sub(r'\[ual_\d+\]', lambda m: generate_random_string(10), result)
    return result
def build_email_from_config(header_config, body_template, to_email, placeholder1, placeholder2):
    """Build a multipart test message from a header config and a body template.

    Args:
        header_config: Mapping with ``headers_template`` (a dict, or a JSON
            string encoding one; missing/``None`` means no extra headers)
            and an optional ``subject``.
        body_template: HTML body containing placeholders.
        to_email: Recipient address, also used for the ``To`` header.
        placeholder1: Value substituted for ``[placeholder1]``.
        placeholder2: Value substituted for ``[placeholder2]``; also the
            domain of the ``From`` address.

    Returns:
        A ``(msg, tracking_code)`` tuple: the ``MIMEMultipart`` message and
        the 12-character hex code embedded in its tracking URLs.
    """
    # Function-scope imports: the module header only imports ``datetime``.
    from datetime import timezone
    from email.utils import format_datetime

    # One aware UTC timestamp feeds both the tracking code and the Date
    # header, so the "+0000" offset in the header is accurate.
    now = datetime.now(timezone.utc)
    tracking_code = hashlib.md5(f"{to_email}:{now.timestamp()}".encode()).hexdigest()[:12]

    values = {
        'to_email': to_email,
        'placeholder1': placeholder1,
        'placeholder2': placeholder2,
        'tracking_url': f"{TRACKING_BASE}/c/{tracking_code}",
        'unsub_url': f"{TRACKING_BASE}/u/{tracking_code}",
        'open_url': f"{TRACKING_BASE}/o/{tracking_code}",
        'first_name': 'Valued Customer'
    }

    # Parse headers; tolerate a missing or null template.
    headers = header_config.get('headers_template') or {}
    if isinstance(headers, str):
        headers = json.loads(headers)

    msg = MIMEMultipart('alternative')

    # Apply template headers. Content-Type and MIME-Version are owned by the
    # MIME container; header names are case-insensitive, so compare lowered.
    structural = {'content-type', 'mime-version'}
    for key, value in headers.items():
        if key.lower() not in structural:
            msg[key] = replace_placeholders(value, values)

    # Required headers.
    # NOTE(review): assigning msg[...] appends rather than replaces, so a
    # template that already supplies To/From/Subject/Date yields two such
    # headers - confirm whether that is intended.
    msg['To'] = to_email
    msg['From'] = f"Notification <noreply@{placeholder2}>"
    msg['Subject'] = replace_placeholders(header_config.get('subject', 'Important Update'), values)
    msg['Date'] = format_datetime(now)

    # Body: a fixed plain-text fallback followed by the rendered HTML part.
    html_body = replace_placeholders(body_template, values)
    text_body = "Please view this email in HTML format."
    msg.attach(MIMEText(text_body, 'plain'))
    msg.attach(MIMEText(html_body, 'html'))
    return msg, tracking_code
def get_test_seeds(target_isps, count=5):
    """Return up to ``count`` random active seeds matching the target ISPs.

    Args:
        target_isps: Iterable of ISP name fragments; a seed matches when its
            ``isp`` column contains any of them (case-insensitive). ``None``
            or an empty iterable yields no seeds.
        count: Maximum number of rows to return.

    Returns:
        A list of ``(id, email, isp)`` rows from ``admin.brain_seeds``;
        empty when there is nothing to match.
    """
    patterns = [f"%{isp}%" for isp in (target_isps or [])]
    if not patterns:
        # An empty filter would render as "AND ()", which is invalid SQL.
        return []

    # Every ISP fragment is passed as a bound parameter. This avoids SQL
    # injection and keeps the literal "%" wildcards out of the query text,
    # where the driver would try to read them as placeholders.
    isp_filter = ' OR '.join(['isp ILIKE %s'] * len(patterns))
    query = f"""
        SELECT id, email, isp
        FROM admin.brain_seeds
        WHERE is_active = true
        AND ({isp_filter})
        ORDER BY RANDOM()
        LIMIT %s
    """

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute(query, (*patterns, count))
        return cur.fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
def send_test_email(msg, from_email, to_email):
    """Hand ``msg`` to the SMTP server listening on 127.0.0.1:25.

    Returns:
        True when the message was accepted; False when any exception was
        raised (the error is printed, not propagated).
    """
    delivered = False
    try:
        with smtplib.SMTP('127.0.0.1', 25) as smtp:
            payload = msg.as_string()
            smtp.sendmail(from_email, to_email, payload)
        delivered = True
    except Exception as err:
        # Best effort: report the failure and let the caller move on.
        print(f" Send error: {err}")
    return delivered
def validate_header_config(config_id):
    """Send test emails for one header configuration and mark it as testing.

    Loads the config from ``admin.winning_headers``, picks a body template
    whose target ISPs overlap (or a minimal default), sends one message per
    test seed, logs each successful send in ``admin.seed_tracking`` and
    updates the config's counters and status.

    Args:
        config_id: ``id`` of the row in ``admin.winning_headers``.

    Returns:
        The number of test emails sent, or ``None`` when the config does not
        exist or no seeds are available for its target ISPs.
    """
    conn = get_db()
    # try/finally so the connection is released on the early returns below
    # and on any error, not only on the success path.
    try:
        cur = conn.cursor()

        # Get config
        cur.execute("""
            SELECT config_name, headers_template, example_placeholder1, example_placeholder2, target_isps
            FROM admin.winning_headers WHERE id = %s
        """, (config_id,))
        row = cur.fetchone()
        if not row:
            print(f"Config {config_id} not found")
            return None

        name, headers, ph1, ph2, isps = row
        print(f"\n🧪 Validating: {name}")
        print(f" Targets: {isps}")

        # Get default body template
        cur.execute("""
            SELECT body_html FROM admin.winning_body_templates
            WHERE target_isps && %s LIMIT 1
        """, (isps,))
        body_row = cur.fetchone()
        body_template = body_row[0] if body_row else "<html><body>Test</body></html>"

        # Get test seeds
        seeds = get_test_seeds(isps, 5)
        if not seeds:
            print(" ❌ No seeds available for target ISPs")
            return None

        # Parse headers
        if isinstance(headers, str):
            headers = json.loads(headers)
        config = {
            'headers_template': headers,
            'subject': '✅ Confirm your loan today💰'
        }

        # The sender does not depend on the seed, so compute it once.
        tracking_domain = ph1 or 'tracking.example.com'
        sender_domain = ph2 or 'sender.example.com'
        from_email = f"noreply@{sender_domain}"
        batch_id = f"validate_header_{config_id}"

        # Send tests
        sent = 0
        for seed_id, seed_email, seed_isp in seeds:
            msg, tracking_code = build_email_from_config(
                config, body_template, seed_email,
                tracking_domain,
                sender_domain
            )
            if send_test_email(msg, from_email, seed_email):
                sent += 1
                # Log tracking so the results analysis can find this send.
                cur.execute("""
                    INSERT INTO admin.seed_tracking
                    (seed_email, seed_isp, sender_email, tracking_code, sent_at, batch_id)
                    VALUES (%s, %s, %s, %s, NOW(), %s)
                """, (seed_email, seed_isp, from_email, tracking_code, batch_id))
                print(f" ✅ Sent to {seed_email}")

        # Update config
        cur.execute("""
            UPDATE admin.winning_headers SET
            times_tested = times_tested + 1,
            total_sent = total_sent + %s,
            last_tested = NOW(),
            status = 'testing'
            WHERE id = %s
        """, (sent, config_id))
        conn.commit()
    finally:
        conn.close()

    print(f" 📨 Sent {sent}/{len(seeds)} test emails")
    return sent
def analyze_validation_results():
    """Score header configs in 'testing' status and update their status.

    For each such config, counts the ``admin.seed_tracking`` rows of its
    validation batch. Configs with fewer than 3 tracked sends are skipped.
    Otherwise the inbox rate decides the new status: >= 70% 'validated',
    >= 50% 'pending', anything lower 'failed'. Results are printed and
    written back to ``admin.winning_headers``.
    """
    conn = get_db()
    # try/finally so the connection is closed even if a query fails.
    try:
        cur = conn.cursor()
        print("=" * 70)
        print("📊 VALIDATION RESULTS ANALYSIS")
        print("=" * 70)

        # Check header configs in testing
        cur.execute("""
            SELECT wh.id, wh.config_name, wh.target_isps,
            COUNT(st.id) as total,
            SUM(CASE WHEN st.inbox_placement = 'inbox' THEN 1 ELSE 0 END) as inbox,
            SUM(CASE WHEN st.inbox_placement = 'spam' THEN 1 ELSE 0 END) as spam
            FROM admin.winning_headers wh
            LEFT JOIN admin.seed_tracking st ON st.batch_id = 'validate_header_' || wh.id::text
            WHERE wh.status = 'testing'
            GROUP BY wh.id, wh.config_name, wh.target_isps
        """)
        for config_id, name, _isps, total, inbox, _spam in cur.fetchall():
            # Too few tracked sends to judge this config yet.
            if not total or total < 3:
                continue

            # total >= 3 here, so the division is always safe.
            inbox_rate = inbox / total * 100

            # Determine result
            if inbox_rate >= 70:
                new_status = 'validated'
                icon = ''
            elif inbox_rate >= 50:
                new_status = 'pending'
                icon = '🟡'
            else:
                new_status = 'failed'
                icon = ''
            print(f"\n{icon} {name}")
            print(f" Inbox: {inbox}/{total} ({inbox_rate:.1f}%)")
            print(f" Status: {new_status}")

            # Update; validated_at only moves when the config is validated.
            cur.execute("""
                UPDATE admin.winning_headers SET
                inbox_rate = %s,
                last_inbox_rate = %s,
                status = %s,
                validated_at = CASE WHEN %s = 'validated' THEN NOW() ELSE validated_at END,
                needs_revalidation = false
                WHERE id = %s
            """, (inbox_rate, inbox_rate, new_status, new_status, config_id))
        conn.commit()
    finally:
        conn.close()
def list_configs():
    """Print every header config and DNS config together with its status."""
    # Status -> icon lookups; statuses not listed map to an empty string.
    header_icons = {'validated': '', 'winner': '🏆', 'pending': '', 'testing': '🧪', 'failed': '', 'outdated': '⚠️'}
    dns_icons = {'validated': '', 'pending': ''}
    banner = "=" * 70

    conn = get_db()
    cur = conn.cursor()
    print(banner)
    print("📋 WINNING CONFIGURATIONS")
    print(banner)

    # Header configs, grouped by status with the best inbox rate first.
    print("\n🔧 HEADER CONFIGS:")
    cur.execute("""
        SELECT config_name, target_method, target_isps, inbox_rate, status, last_tested
        FROM admin.winning_headers ORDER BY status, inbox_rate DESC NULLS LAST
    """)
    header_rows = cur.fetchall()
    for cfg_name, _method, targets, pct, state, _tested in header_rows:
        badge = header_icons.get(state, '')
        print(f" {badge} {cfg_name[:40]:<40} {state:<10} {pct or 0:>5.1f}% {targets}")

    # DNS configs, ordered by status.
    print("\n🌐 DNS CONFIGS:")
    cur.execute("""
        SELECT config_name, domain, target_isps, inbox_rate, status
        FROM admin.winning_dns_configs ORDER BY status
    """)
    dns_rows = cur.fetchall()
    for cfg_name, zone, targets, _pct, state in dns_rows:
        badge = dns_icons.get(state, '')
        print(f" {badge} {cfg_name[:30]:<30} {zone:<25} {state:<10} {targets}")

    conn.close()
def _validate_pending(limit=None):
    """Validate every header config whose status is pending or outdated.

    Args:
        limit: Optional maximum number of configs to process.
    """
    query = "SELECT id FROM admin.winning_headers WHERE status IN ('pending', 'outdated')"
    params = None
    if limit is not None:
        query += " LIMIT %s"
        params = (limit,)

    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute(query, params)
        config_ids = [cid for (cid,) in cur.fetchall()]
    finally:
        # The ids are fully fetched, so the connection is no longer needed
        # while the (slow) validations run.
        conn.close()

    for cid in config_ids:
        validate_header_config(cid)


def main():
    """Command-line entry point.

    Commands (first argument; no argument behaves like ``list``):
        list            Print all configs.
        validate [id]   Validate one header config, or all pending/outdated
                        ones when no id is given.
        results         Analyse validation results and update statuses.
        all             List configs, then validate up to 5 pending ones.
    """
    import sys

    usage = "Commands: list, validate [id], results, all"
    args = sys.argv[1:]
    if not args:
        list_configs()
        return

    cmd = args[0]
    if cmd == 'list':
        list_configs()
    elif cmd == 'validate':
        if len(args) > 1:
            # Report a malformed id instead of crashing with a traceback.
            try:
                config_id = int(args[1])
            except ValueError:
                print(f"Invalid config id: {args[1]!r}")
                print(usage)
                return
            # An explicit id is always honoured, including 0.
            validate_header_config(config_id)
        else:
            # Validate all pending
            _validate_pending()
    elif cmd == 'results':
        analyze_validation_results()
    elif cmd == 'all':
        # Full cycle
        list_configs()
        print("\nValidating pending configs...")
        _validate_pending(limit=5)
    else:
        print(usage)


if __name__ == '__main__':
    main()