Files
wevads-platform/scripts/creative-performance-engine.py
2026-02-26 04:53:11 +01:00

231 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CREATIVE PERFORMANCE ENGINE
- Aggregates send/open/click per creative per offer per country/ISP
- Detects winners automatically
- Feeds back into Quality Guard for smart selection
- Run via cron every hour or on-demand
"""
import psycopg2, json, sys
from datetime import datetime, timedelta
DB = {'host':'localhost','database':'adx_system','user':'admin','password':'admin123'}
def aggregate_performance():
"""Join send_log + tracking_events to compute creative performance"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
print(f"[{datetime.now()}] Aggregating creative performance...")
# Aggregate sends per offer/creative/country/isp
cur.execute("""
INSERT INTO admin.creative_performance (offer_id, creative_id, country, isp, sends, opens, clicks, last_updated)
SELECT
s.offer_id,
s.creative_id,
COALESCE(s.country, 'XX') as country,
s.isp_target as isp,
COUNT(*) as sends,
COUNT(DISTINCT CASE WHEN te.event_type = 'open' THEN te.tracking_id END) as opens,
COUNT(DISTINCT CASE WHEN te.event_type = 'click' THEN te.tracking_id END) as clicks,
NOW()
FROM admin.unified_send_log_new s
LEFT JOIN admin.tracking_events te ON te.tracking_id = s.tracking_id
WHERE s.offer_id IS NOT NULL AND s.creative_id IS NOT NULL
AND s.status = 'sent'
GROUP BY s.offer_id, s.creative_id, s.country, s.isp_target
ON CONFLICT (offer_id, creative_id, country, isp)
DO UPDATE SET
sends = EXCLUDED.sends,
opens = EXCLUDED.opens,
clicks = EXCLUDED.clicks,
open_rate = CASE WHEN EXCLUDED.sends > 0 THEN ROUND(100.0 * EXCLUDED.opens / EXCLUDED.sends, 2) ELSE 0 END,
click_rate = CASE WHEN EXCLUDED.sends > 0 THEN ROUND(100.0 * EXCLUDED.clicks / EXCLUDED.sends, 2) ELSE 0 END,
score = CASE WHEN EXCLUDED.sends > 0
THEN ROUND(50.0 * EXCLUDED.opens / EXCLUDED.sends + 50.0 * EXCLUDED.clicks / GREATEST(EXCLUDED.opens, 1), 2)
ELSE 0 END,
last_updated = NOW()
""")
# Also aggregate subject performance
cur.execute("""
INSERT INTO admin.subject_performance (offer_id, subject_text, country, isp, sends, opens, clicks, last_updated)
SELECT
s.offer_id,
s.subject,
COALESCE(s.country, 'XX') as country,
s.isp_target as isp,
COUNT(*) as sends,
COUNT(DISTINCT CASE WHEN te.event_type = 'open' THEN te.tracking_id END) as opens,
COUNT(DISTINCT CASE WHEN te.event_type = 'click' THEN te.tracking_id END) as clicks,
NOW()
FROM admin.unified_send_log_new s
LEFT JOIN admin.tracking_events te ON te.tracking_id = s.tracking_id
WHERE s.offer_id IS NOT NULL AND s.subject IS NOT NULL AND s.subject != ''
AND s.status = 'sent'
GROUP BY s.offer_id, s.subject, s.country, s.isp_target
ON CONFLICT (offer_id, subject_text, country, isp)
DO UPDATE SET
sends = EXCLUDED.sends,
opens = EXCLUDED.opens,
clicks = EXCLUDED.clicks,
open_rate = CASE WHEN EXCLUDED.sends > 0 THEN ROUND(100.0 * EXCLUDED.opens / EXCLUDED.sends, 2) ELSE 0 END,
last_updated = NOW()
""")
conn.commit()
# Count results
cur.execute("SELECT COUNT(*) FROM admin.creative_performance WHERE sends > 0")
cp_count = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM admin.subject_performance WHERE sends > 0")
sp_count = cur.fetchone()[0]
print(f" Creative records: {cp_count}")
print(f" Subject records: {sp_count}")
cur.close(); conn.close()
return {'creative_records': cp_count, 'subject_records': sp_count}
def detect_winners(min_sends=20):
"""Auto-detect winning creatives per offer/country/ISP"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
print(f"\n[{datetime.now()}] Detecting winners (min {min_sends} sends)...")
# Reset all winners
cur.execute("UPDATE admin.creative_performance SET is_winner = false")
cur.execute("UPDATE admin.subject_performance SET is_winner = false")
# Find best creative per offer+country+isp (by score)
cur.execute("""
WITH ranked AS (
SELECT id, offer_id, creative_id, country, isp, sends, score,
ROW_NUMBER() OVER (PARTITION BY offer_id, country, isp ORDER BY score DESC) as rn
FROM admin.creative_performance
WHERE sends >= %s
)
UPDATE admin.creative_performance cp
SET is_winner = true
FROM ranked r
WHERE cp.id = r.id AND r.rn = 1
""", (min_sends,))
winners_cr = cur.rowcount
# Find best subject per offer+country+isp (by open_rate)
cur.execute("""
WITH ranked AS (
SELECT id, offer_id, subject_text, country, isp, sends, open_rate,
ROW_NUMBER() OVER (PARTITION BY offer_id, country, isp ORDER BY open_rate DESC) as rn
FROM admin.subject_performance
WHERE sends >= %s
)
UPDATE admin.subject_performance sp
SET is_winner = true
FROM ranked r
WHERE sp.id = r.id AND r.rn = 1
""", (min_sends,))
winners_subj = cur.rowcount
conn.commit()
# Show winners
cur.execute("""
SELECT cp.offer_id, o.name, cp.creative_id, cp.country, cp.isp,
cp.sends, cp.opens, cp.clicks, cp.score
FROM admin.creative_performance cp
JOIN affiliate.offers o ON o.id = cp.offer_id
WHERE cp.is_winner = true
ORDER BY cp.score DESC
""")
print(f"\n Creative winners: {winners_cr}")
for row in cur.fetchall():
print(f" Offer #{row[0]} ({row[1][:30]}) | Creative #{row[2]} | {row[3]}/{row[4]} | "
f"Sends:{row[5]} Opens:{row[6]} Clicks:{row[7]} Score:{row[8]}")
cur.execute("""
SELECT sp.offer_id, sp.subject_text, sp.country, sp.isp,
sp.sends, sp.opens, sp.open_rate
FROM admin.subject_performance sp
WHERE sp.is_winner = true
ORDER BY sp.open_rate DESC
""")
print(f"\n Subject winners: {winners_subj}")
for row in cur.fetchall():
print(f" Offer #{row[0]} | \"{row[1][:40]}\" | {row[2]}/{row[3]} | "
f"Sends:{row[4]} Opens:{row[5]} Rate:{row[6]}%")
cur.close(); conn.close()
return {'creative_winners': winners_cr, 'subject_winners': winners_subj}
def report():
"""Generate performance report"""
conn = psycopg2.connect(**DB)
cur = conn.cursor()
print(f"\n{'='*60}")
print(f" CREATIVE PERFORMANCE REPORT")
print(f"{'='*60}")
# Overall stats
cur.execute("""
SELECT
COUNT(DISTINCT offer_id) as offers,
COUNT(DISTINCT creative_id) as creatives,
SUM(sends) as total_sends,
SUM(opens) as total_opens,
SUM(clicks) as total_clicks,
CASE WHEN SUM(sends) > 0 THEN ROUND(100.0 * SUM(opens) / SUM(sends), 2) ELSE 0 END as avg_open_rate,
CASE WHEN SUM(sends) > 0 THEN ROUND(100.0 * SUM(clicks) / SUM(sends), 2) ELSE 0 END as avg_click_rate
FROM admin.creative_performance
""")
row = cur.fetchone()
print(f"\n Offers tracked: {row[0]}")
print(f" Creatives tracked: {row[1]}")
print(f" Total sends: {row[2]}")
print(f" Total opens: {row[3]} ({row[5]}%)")
print(f" Total clicks: {row[4]} ({row[6]}%)")
# Per country
cur.execute("""
SELECT country, SUM(sends), SUM(opens), SUM(clicks),
CASE WHEN SUM(sends) > 0 THEN ROUND(100.0 * SUM(clicks) / SUM(sends), 2) ELSE 0 END as ctr
FROM admin.creative_performance
GROUP BY country ORDER BY SUM(sends) DESC
""")
print(f"\n By Country:")
for row in cur.fetchall():
print(f" {row[0]}: {row[1]} sends, {row[2]} opens, {row[3]} clicks ({row[4]}% CTR)")
# Best performing offers
cur.execute("""
SELECT cp.offer_id, LEFT(o.name, 40), SUM(cp.sends), SUM(cp.clicks),
CASE WHEN SUM(cp.sends) > 0 THEN ROUND(100.0 * SUM(cp.clicks) / SUM(cp.sends), 2) ELSE 0 END as ctr
FROM admin.creative_performance cp
JOIN affiliate.offers o ON o.id = cp.offer_id
GROUP BY cp.offer_id, o.name
ORDER BY ctr DESC
""")
print(f"\n By Offer (CTR):")
for row in cur.fetchall():
print(f" #{row[0]} {row[1]}: {row[2]} sends, {row[3]} clicks ({row[4]}% CTR)")
cur.close(); conn.close()
if __name__ == '__main__':
action = sys.argv[1] if len(sys.argv) > 1 else 'all'
if action in ['all', 'aggregate']:
aggregate_performance()
if action in ['all', 'winners']:
detect_winners(min_sends=int(sys.argv[2]) if len(sys.argv) > 2 else 20)
if action in ['all', 'report']:
report()