Files
wevads-platform/scripts/domain-performance-analyzer.py
2026-02-26 04:53:11 +01:00

280 lines
9.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DOMAIN PERFORMANCE ANALYZER
Analyse les domaines sender par:
- ISP (gmail, hotmail, yahoo...)
- Pays
- Provider (Cloudflare, FreeDNS)
Identifie les meilleurs domaines pour chaque combinaison
"""
import psycopg2
import json
from datetime import datetime, timedelta
DB_CONFIG = {
'host': 'localhost',
'database': 'adx_system',
'user': 'admin',
'password': 'admin123'
}
def get_db():
return psycopg2.connect(**DB_CONFIG)
def extract_domain_from_email(email):
"""Extract domain from sender email"""
if '@' in email:
return email.split('@')[1]
return None
def analyze_domain_performance(conn, days=7):
"""Analyze domain performance from tracking data"""
cur = conn.cursor()
# Get performance by sender domain and recipient ISP
cur.execute("""
SELECT
SPLIT_PART(sender_email, '@', 2) as sender_domain,
seed_isp as recipient_isp,
COUNT(*) as total_sent,
SUM(CASE WHEN inbox_placement = 'inbox' THEN 1 ELSE 0 END) as inbox,
SUM(CASE WHEN inbox_placement = 'spam' THEN 1 ELSE 0 END) as spam,
SUM(CASE WHEN bounced THEN 1 ELSE 0 END) as bounced,
SUM(CASE WHEN opened_at IS NOT NULL THEN 1 ELSE 0 END) as opened,
SUM(CASE WHEN clicked_at IS NOT NULL THEN 1 ELSE 0 END) as clicked
FROM admin.seed_tracking
WHERE sent_at > NOW() - INTERVAL '%s days'
AND sender_email IS NOT NULL
GROUP BY sender_domain, recipient_isp
HAVING COUNT(*) >= 10
ORDER BY sender_domain, recipient_isp
""", (days,))
results = cur.fetchall()
# Aggregate by domain
domain_stats = {}
for row in results:
domain, isp, sent, inbox, spam, bounced, opened, clicked = row
if not domain:
continue
if domain not in domain_stats:
domain_stats[domain] = {
'total_sent': 0,
'total_inbox': 0,
'total_spam': 0,
'total_bounced': 0,
'by_isp': {}
}
domain_stats[domain]['total_sent'] += sent
domain_stats[domain]['total_inbox'] += inbox or 0
domain_stats[domain]['total_spam'] += spam or 0
domain_stats[domain]['total_bounced'] += bounced or 0
inbox_rate = (inbox / sent * 100) if sent > 0 else 0
domain_stats[domain]['by_isp'][isp] = {
'sent': sent,
'inbox': inbox or 0,
'spam': spam or 0,
'inbox_rate': round(inbox_rate, 2),
'opened': opened or 0,
'clicked': clicked or 0
}
return domain_stats
def update_domain_performance(conn, domain_stats):
"""Update domain_performance table"""
cur = conn.cursor()
for domain, stats in domain_stats.items():
# Calculate overall performance score
sent = stats['total_sent']
inbox_rate = (stats['total_inbox'] / sent * 100) if sent > 0 else 0
spam_rate = (stats['total_spam'] / sent * 100) if sent > 0 else 0
# Score = inbox_rate - spam_rate penalty
performance_score = max(0, inbox_rate - (spam_rate * 0.5))
# Upsert domain performance
cur.execute("""
INSERT INTO admin.domain_performance
(domain, total_sent, total_delivered, total_inbox, total_spam, total_bounced,
inbox_rate, spam_rate, stats_by_isp, performance_score, last_updated)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (domain) DO UPDATE SET
total_sent = EXCLUDED.total_sent,
total_inbox = EXCLUDED.total_inbox,
total_spam = EXCLUDED.total_spam,
total_bounced = EXCLUDED.total_bounced,
inbox_rate = EXCLUDED.inbox_rate,
spam_rate = EXCLUDED.spam_rate,
stats_by_isp = EXCLUDED.stats_by_isp,
performance_score = EXCLUDED.performance_score,
last_updated = NOW()
""", (
domain, sent, stats['total_inbox'] + stats['total_spam'],
stats['total_inbox'], stats['total_spam'], stats['total_bounced'],
round(inbox_rate, 2), round(spam_rate, 2),
json.dumps(stats['by_isp']), round(performance_score, 2)
))
conn.commit()
def identify_best_domains(conn):
"""Identify best domains per ISP"""
cur = conn.cursor()
# Clear old rankings
cur.execute("DELETE FROM admin.best_domains")
# Get best domain per ISP
isps = ['gmail', 'hotmail', 'yahoo', 'other']
for isp in isps:
cur.execute("""
SELECT domain,
(stats_by_isp->%s->>'inbox_rate')::decimal as inbox_rate,
(stats_by_isp->%s->>'sent')::int as sent
FROM admin.domain_performance
WHERE stats_by_isp ? %s
AND (stats_by_isp->%s->>'sent')::int >= 10
ORDER BY (stats_by_isp->%s->>'inbox_rate')::decimal DESC
LIMIT 5
""", (isp, isp, isp, isp, isp))
results = cur.fetchall()
for rank, (domain, inbox_rate, sent) in enumerate(results, 1):
# Get domain_id
cur.execute("SELECT id FROM admin.domain_pool WHERE domain = %s", (domain,))
domain_row = cur.fetchone()
domain_id = domain_row[0] if domain_row else None
cur.execute("""
INSERT INTO admin.best_domains
(isp, country_code, domain_id, domain, inbox_rate, total_sent, rank)
VALUES (%s, 'US', %s, %s, %s, %s, %s)
""", (isp, domain_id, domain, inbox_rate, sent, rank))
conn.commit()
def show_performance_report(conn):
"""Display performance report"""
cur = conn.cursor()
print("=" * 80)
print("📊 DOMAIN PERFORMANCE REPORT")
print(f"Time: {datetime.now()}")
print("=" * 80)
# Top domains overall
print("\n🏆 TOP 10 DOMAINS (Overall):")
print("-" * 80)
cur.execute("""
SELECT domain, total_sent, inbox_rate, spam_rate, performance_score
FROM admin.domain_performance
WHERE total_sent >= 50
ORDER BY performance_score DESC
LIMIT 10
""")
print(f"{'Domain':<35} {'Sent':>8} {'Inbox%':>8} {'Spam%':>8} {'Score':>8}")
print("-" * 80)
for domain, sent, inbox, spam, score in cur.fetchall():
print(f"{domain[:35]:<35} {sent:>8} {inbox or 0:>8.1f} {spam or 0:>8.1f} {score or 0:>8.1f}")
# Best per ISP
print("\n🎯 BEST DOMAINS BY ISP:")
print("-" * 80)
for isp in ['gmail', 'hotmail', 'yahoo']:
print(f"\n 📧 {isp.upper()}:")
cur.execute("""
SELECT domain, inbox_rate, total_sent, rank
FROM admin.best_domains
WHERE isp = %s
ORDER BY rank
LIMIT 3
""", (isp,))
for domain, inbox_rate, sent, rank in cur.fetchall():
medal = {1: '🥇', 2: '🥈', 3: '🥉'}.get(rank, ' ')
print(f" {medal} {domain} (Inbox: {inbox_rate}%, Sent: {sent})")
# Problem domains (high spam)
print("\n⚠️ PROBLEM DOMAINS (High Spam Rate):")
print("-" * 80)
cur.execute("""
SELECT domain, total_sent, spam_rate, performance_score
FROM admin.domain_performance
WHERE spam_rate > 20 AND total_sent >= 20
ORDER BY spam_rate DESC
LIMIT 5
""")
for domain, sent, spam, score in cur.fetchall():
print(f"{domain} - Spam: {spam}% (Sent: {sent})")
def update_domain_pool_stats(conn):
"""Update domain_pool table with performance data"""
cur = conn.cursor()
cur.execute("""
UPDATE admin.domain_pool dp
SET
performance_score = perf.performance_score,
total_inbox = perf.total_inbox,
total_spam = perf.total_spam,
inbox_rate = perf.inbox_rate,
emails_sent = perf.total_sent
FROM admin.domain_performance perf
WHERE dp.domain = perf.domain
""")
# Mark best senders
cur.execute("""
UPDATE admin.domain_performance SET is_best_sender = false
""")
cur.execute("""
UPDATE admin.domain_performance dp
SET is_best_sender = true
WHERE dp.domain IN (SELECT domain FROM admin.best_domains WHERE rank <= 3)
""")
conn.commit()
def main():
print("Analyzing domain performance...")
conn = get_db()
# Analyze from tracking data
stats = analyze_domain_performance(conn, days=7)
print(f"Found {len(stats)} domains with data")
# Update performance table
update_domain_performance(conn, stats)
# Identify best domains
identify_best_domains(conn)
# Update domain pool
update_domain_pool_stats(conn)
# Show report
show_performance_report(conn)
conn.close()
print("\n✅ Analysis complete!")
if __name__ == '__main__':
main()