349 lines
11 KiB
Python
Executable File
349 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
CONFIG VALIDATOR
|
|
Valide les configurations gagnantes:
|
|
- Teste headers, DNS, body templates
|
|
- Envoie vers seeds de test
|
|
- Mesure inbox rate
|
|
- Marque comme validated ou failed
|
|
"""
|
|
|
|
import psycopg2
|
|
import smtplib
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from datetime import datetime
|
|
import random
|
|
import string
|
|
|
|
DB_CONFIG = {
|
|
'host': 'localhost',
|
|
'database': 'adx_system',
|
|
'user': 'admin',
|
|
'password': 'admin123'
|
|
}
|
|
|
|
TRACKING_BASE = "http://151.80.235.110"
|
|
|
|
def get_db():
|
|
return psycopg2.connect(**DB_CONFIG)
|
|
|
|
def generate_random_string(length=10):
|
|
"""Generate random alphanumeric string"""
|
|
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
|
|
|
|
def replace_placeholders(template, values):
|
|
"""Replace placeholders in template"""
|
|
result = template
|
|
|
|
# Standard placeholders
|
|
placeholders = {
|
|
'[uan_25]': generate_random_string(8),
|
|
'[uan_10]': generate_random_string(6),
|
|
'[uan_14]': generate_random_string(8),
|
|
'[uanu_10]': generate_random_string(6),
|
|
'[uanu_8]': generate_random_string(5),
|
|
'[email]': values.get('to_email', 'test@example.com'),
|
|
'[mail_date]': datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0000'),
|
|
'[first_name]': values.get('first_name', 'User'),
|
|
'[placeholder1]': values.get('placeholder1', 'example.com'),
|
|
'[placeholder2]': values.get('placeholder2', 'sender.com'),
|
|
'[url]': values.get('tracking_url', 'c/test'),
|
|
'[unsub]': values.get('unsub_url', 'u/test'),
|
|
'[open]': values.get('open_url', 'o/test'),
|
|
}
|
|
|
|
for key, value in placeholders.items():
|
|
result = result.replace(key, str(value))
|
|
|
|
# Random placeholders [a_X], [ual_X]
|
|
result = re.sub(r'\[a_\d+\]', lambda m: generate_random_string(8), result)
|
|
result = re.sub(r'\[ual_\d+\]', lambda m: generate_random_string(10), result)
|
|
|
|
return result
|
|
|
|
def build_email_from_config(header_config, body_template, to_email, placeholder1, placeholder2):
|
|
"""Build complete email from winning config"""
|
|
|
|
tracking_code = hashlib.md5(f"{to_email}:{datetime.now().timestamp()}".encode()).hexdigest()[:12]
|
|
|
|
values = {
|
|
'to_email': to_email,
|
|
'placeholder1': placeholder1,
|
|
'placeholder2': placeholder2,
|
|
'tracking_url': f"{TRACKING_BASE}/c/{tracking_code}",
|
|
'unsub_url': f"{TRACKING_BASE}/u/{tracking_code}",
|
|
'open_url': f"{TRACKING_BASE}/o/{tracking_code}",
|
|
'first_name': 'Valued Customer'
|
|
}
|
|
|
|
# Parse headers
|
|
headers = header_config.get('headers_template', {})
|
|
if isinstance(headers, str):
|
|
headers = json.loads(headers)
|
|
|
|
msg = MIMEMultipart('alternative')
|
|
|
|
# Apply headers
|
|
for key, value in headers.items():
|
|
if key not in ['Content-Type', 'MIME-Version']:
|
|
msg[key] = replace_placeholders(value, values)
|
|
|
|
# Required headers
|
|
msg['To'] = to_email
|
|
msg['From'] = f"Notification <noreply@{placeholder2}>"
|
|
msg['Subject'] = replace_placeholders(header_config.get('subject', 'Important Update'), values)
|
|
msg['Date'] = datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0000')
|
|
|
|
# Body
|
|
html_body = replace_placeholders(body_template, values)
|
|
text_body = "Please view this email in HTML format."
|
|
|
|
msg.attach(MIMEText(text_body, 'plain'))
|
|
msg.attach(MIMEText(html_body, 'html'))
|
|
|
|
return msg, tracking_code
|
|
|
|
def get_test_seeds(target_isps, count=5):
|
|
"""Get seeds for testing"""
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
|
|
isp_filter = ' OR '.join([f"isp ILIKE '%{isp}%'" for isp in target_isps])
|
|
|
|
cur.execute(f"""
|
|
SELECT id, email, isp
|
|
FROM admin.brain_seeds
|
|
WHERE is_active = true
|
|
AND ({isp_filter})
|
|
ORDER BY RANDOM()
|
|
LIMIT %s
|
|
""", (count,))
|
|
|
|
seeds = cur.fetchall()
|
|
conn.close()
|
|
return seeds
|
|
|
|
def send_test_email(msg, from_email, to_email):
|
|
"""Send via local PMTA"""
|
|
try:
|
|
with smtplib.SMTP('127.0.0.1', 25) as server:
|
|
server.sendmail(from_email, to_email, msg.as_string())
|
|
return True
|
|
except Exception as e:
|
|
print(f" Send error: {e}")
|
|
return False
|
|
|
|
def validate_header_config(config_id):
|
|
"""Validate a header configuration"""
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
|
|
# Get config
|
|
cur.execute("""
|
|
SELECT config_name, headers_template, example_placeholder1, example_placeholder2, target_isps
|
|
FROM admin.winning_headers WHERE id = %s
|
|
""", (config_id,))
|
|
|
|
row = cur.fetchone()
|
|
if not row:
|
|
print(f"Config {config_id} not found")
|
|
return None
|
|
|
|
name, headers, ph1, ph2, isps = row
|
|
|
|
print(f"\n🧪 Validating: {name}")
|
|
print(f" Targets: {isps}")
|
|
|
|
# Get default body template
|
|
cur.execute("""
|
|
SELECT body_html FROM admin.winning_body_templates
|
|
WHERE target_isps && %s LIMIT 1
|
|
""", (isps,))
|
|
|
|
body_row = cur.fetchone()
|
|
body_template = body_row[0] if body_row else "<html><body>Test</body></html>"
|
|
|
|
# Get test seeds
|
|
seeds = get_test_seeds(isps, 5)
|
|
|
|
if not seeds:
|
|
print(" ❌ No seeds available for target ISPs")
|
|
return None
|
|
|
|
# Parse headers
|
|
if isinstance(headers, str):
|
|
headers = json.loads(headers)
|
|
|
|
config = {
|
|
'headers_template': headers,
|
|
'subject': '✅ Confirm your loan today💰'
|
|
}
|
|
|
|
# Send tests
|
|
sent = 0
|
|
for seed_id, seed_email, seed_isp in seeds:
|
|
msg, tracking_code = build_email_from_config(
|
|
config, body_template, seed_email,
|
|
ph1 or 'tracking.example.com',
|
|
ph2 or 'sender.example.com'
|
|
)
|
|
|
|
from_email = f"noreply@{ph2 or 'sender.example.com'}"
|
|
success = send_test_email(msg, from_email, seed_email)
|
|
|
|
if success:
|
|
sent += 1
|
|
# Log tracking
|
|
cur.execute("""
|
|
INSERT INTO admin.seed_tracking
|
|
(seed_email, seed_isp, sender_email, tracking_code, sent_at, batch_id)
|
|
VALUES (%s, %s, %s, %s, NOW(), %s)
|
|
""", (seed_email, seed_isp, from_email, tracking_code, f"validate_header_{config_id}"))
|
|
print(f" ✅ Sent to {seed_email}")
|
|
|
|
# Update config
|
|
cur.execute("""
|
|
UPDATE admin.winning_headers SET
|
|
times_tested = times_tested + 1,
|
|
total_sent = total_sent + %s,
|
|
last_tested = NOW(),
|
|
status = 'testing'
|
|
WHERE id = %s
|
|
""", (sent, config_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f" 📨 Sent {sent}/{len(seeds)} test emails")
|
|
return sent
|
|
|
|
def analyze_validation_results():
|
|
"""Analyze results of validation tests"""
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
|
|
print("=" * 70)
|
|
print("📊 VALIDATION RESULTS ANALYSIS")
|
|
print("=" * 70)
|
|
|
|
# Check header configs in testing
|
|
cur.execute("""
|
|
SELECT wh.id, wh.config_name, wh.target_isps,
|
|
COUNT(st.id) as total,
|
|
SUM(CASE WHEN st.inbox_placement = 'inbox' THEN 1 ELSE 0 END) as inbox,
|
|
SUM(CASE WHEN st.inbox_placement = 'spam' THEN 1 ELSE 0 END) as spam
|
|
FROM admin.winning_headers wh
|
|
LEFT JOIN admin.seed_tracking st ON st.batch_id = 'validate_header_' || wh.id::text
|
|
WHERE wh.status = 'testing'
|
|
GROUP BY wh.id, wh.config_name, wh.target_isps
|
|
""")
|
|
|
|
for config_id, name, isps, total, inbox, spam in cur.fetchall():
|
|
if total and total >= 3:
|
|
inbox_rate = (inbox / total * 100) if total > 0 else 0
|
|
|
|
# Determine result
|
|
if inbox_rate >= 70:
|
|
new_status = 'validated'
|
|
icon = '✅'
|
|
elif inbox_rate >= 50:
|
|
new_status = 'pending'
|
|
icon = '🟡'
|
|
else:
|
|
new_status = 'failed'
|
|
icon = '❌'
|
|
|
|
print(f"\n{icon} {name}")
|
|
print(f" Inbox: {inbox}/{total} ({inbox_rate:.1f}%)")
|
|
print(f" Status: {new_status}")
|
|
|
|
# Update
|
|
cur.execute("""
|
|
UPDATE admin.winning_headers SET
|
|
inbox_rate = %s,
|
|
last_inbox_rate = %s,
|
|
status = %s,
|
|
validated_at = CASE WHEN %s = 'validated' THEN NOW() ELSE validated_at END,
|
|
needs_revalidation = false
|
|
WHERE id = %s
|
|
""", (inbox_rate, inbox_rate, new_status, new_status, config_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def list_configs():
|
|
"""List all configs with status"""
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
|
|
print("=" * 70)
|
|
print("📋 WINNING CONFIGURATIONS")
|
|
print("=" * 70)
|
|
|
|
print("\n🔧 HEADER CONFIGS:")
|
|
cur.execute("""
|
|
SELECT config_name, target_method, target_isps, inbox_rate, status, last_tested
|
|
FROM admin.winning_headers ORDER BY status, inbox_rate DESC NULLS LAST
|
|
""")
|
|
|
|
for name, method, isps, rate, status, tested in cur.fetchall():
|
|
icon = {'validated': '✅', 'winner': '🏆', 'pending': '⏳', 'testing': '🧪', 'failed': '❌', 'outdated': '⚠️'}.get(status, '❓')
|
|
print(f" {icon} {name[:40]:<40} {status:<10} {rate or 0:>5.1f}% {isps}")
|
|
|
|
print("\n🌐 DNS CONFIGS:")
|
|
cur.execute("""
|
|
SELECT config_name, domain, target_isps, inbox_rate, status
|
|
FROM admin.winning_dns_configs ORDER BY status
|
|
""")
|
|
|
|
for name, domain, isps, rate, status in cur.fetchall():
|
|
icon = {'validated': '✅', 'pending': '⏳'}.get(status, '❓')
|
|
print(f" {icon} {name[:30]:<30} {domain:<25} {status:<10} {isps}")
|
|
|
|
conn.close()
|
|
|
|
def main():
|
|
import sys
|
|
|
|
if len(sys.argv) < 2:
|
|
list_configs()
|
|
return
|
|
|
|
cmd = sys.argv[1]
|
|
|
|
if cmd == 'list':
|
|
list_configs()
|
|
elif cmd == 'validate':
|
|
config_id = int(sys.argv[2]) if len(sys.argv) > 2 else None
|
|
if config_id:
|
|
validate_header_config(config_id)
|
|
else:
|
|
# Validate all pending
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT id FROM admin.winning_headers WHERE status IN ('pending', 'outdated')")
|
|
for (cid,) in cur.fetchall():
|
|
validate_header_config(cid)
|
|
conn.close()
|
|
elif cmd == 'results':
|
|
analyze_validation_results()
|
|
elif cmd == 'all':
|
|
# Full cycle
|
|
list_configs()
|
|
print("\nValidating pending configs...")
|
|
conn = get_db()
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT id FROM admin.winning_headers WHERE status IN ('pending', 'outdated') LIMIT 5")
|
|
for (cid,) in cur.fetchall():
|
|
validate_header_config(cid)
|
|
conn.close()
|
|
else:
|
|
print("Commands: list, validate [id], results, all")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|